Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
It was more difficult to find the bug than I thought, but it seems to be working now.
author donassbr <donassbr@48e7efb5-ca39-0410-a469-dd3cf9ba447f>
Thu, 5 Jul 2007 12:04:38 +0000 (12:04 +0000)
committer donassbr <donassbr@48e7efb5-ca39-0410-a469-dd3cf9ba447f>
Thu, 5 Jul 2007 12:04:38 +0000 (12:04 +0000)
The AMOK test works.

git-svn-id: svn+ssh://scm.gforge.inria.fr/svn/simgrid/simgrid/trunk@3660 48e7efb5-ca39-0410-a469-dd3cf9ba447f

src/gras/Msg/msg.c
src/gras/Msg/msg_interface.h
src/gras/Transport/transport_interface.h
src/gras/Transport/transport_plugin_sg.c
src/gras/Virtu/sg_process.c

index 19adf43..b24b4c5 100644 (file)
@@ -40,6 +40,7 @@ static void *gras_msg_procdata_new() {
    res->cbl_list      = xbt_dynar_new(sizeof(gras_cblist_t *),gras_cbl_free);
    res->timers        = xbt_dynar_new(sizeof(s_gras_timer_t), NULL);
    res->msg_to_receive_queue = xbt_fifo_new();
    res->cbl_list      = xbt_dynar_new(sizeof(gras_cblist_t *),gras_cbl_free);
    res->timers        = xbt_dynar_new(sizeof(s_gras_timer_t), NULL);
    res->msg_to_receive_queue = xbt_fifo_new();
+   res->msg_to_receive_queue_meas = xbt_fifo_new();
    
    return (void*)res;
 }
    
    return (void*)res;
 }
@@ -55,6 +56,7 @@ static void gras_msg_procdata_free(void *data) {
    xbt_dynar_free(&( res->cbl_list ));
    xbt_dynar_free(&( res->timers ));
    xbt_fifo_free( res->msg_to_receive_queue );
    xbt_dynar_free(&( res->cbl_list ));
    xbt_dynar_free(&( res->timers ));
    xbt_fifo_free( res->msg_to_receive_queue );
+   xbt_fifo_free( res->msg_to_receive_queue_meas );
 
    free(res->name);
    free(res);
 
    free(res->name);
    free(res);
index 2cb52fe..9c53b39 100644 (file)
@@ -40,6 +40,7 @@ typedef struct {
        
        /* queue storing the msgs that have to received and the process synchronization made (wait the surf action done) */
        xbt_fifo_t msg_to_receive_queue; /* elm type: s_gras_msg_t */
        
        /* queue storing the msgs that have to received and the process synchronization made (wait the surf action done) */
        xbt_fifo_t msg_to_receive_queue; /* elm type: s_gras_msg_t */
+       xbt_fifo_t msg_to_receive_queue_meas; /* elm type: s_gras_msg_t */
 
 } s_gras_msg_procdata_t,*gras_msg_procdata_t;
 
 
 } s_gras_msg_procdata_t,*gras_msg_procdata_t;
 
index eb6064b..379e5bc 100644 (file)
@@ -105,7 +105,10 @@ typedef struct {
   /* SG only elements. In RL, they are part of the OS ;) */
        smx_cond_t cond;
        smx_mutex_t mutex;
   /* SG only elements. In RL, they are part of the OS ;) */
        smx_cond_t cond;
        smx_mutex_t mutex;
+       smx_cond_t cond_meas;
+       smx_mutex_t mutex_meas;
        xbt_fifo_t active_socket;
        xbt_fifo_t active_socket;
+       xbt_fifo_t active_socket_meas;
    
 } s_gras_trp_procdata_t,*gras_trp_procdata_t;
 
    
 } s_gras_trp_procdata_t,*gras_trp_procdata_t;
 
index 32134a9..ee6de67 100644 (file)
@@ -259,21 +259,15 @@ void gras_trp_sg_chunk_send_raw(gras_socket_t sock,
 
        smx_action_t act; /* simix action */
        gras_trp_sg_sock_data_t *sock_data; 
 
        smx_action_t act; /* simix action */
        gras_trp_sg_sock_data_t *sock_data; 
-       gras_trp_sg_sock_data_t *remote_sock_data; 
-       gras_hostdata_t *hd;
-       gras_hostdata_t *remote_hd;
        gras_trp_procdata_t trp_remote_proc;
        gras_msg_procdata_t msg_remote_proc;
        gras_msg_t msg; /* message to send */
 
        sock_data = (gras_trp_sg_sock_data_t *)sock->data;
        gras_trp_procdata_t trp_remote_proc;
        gras_msg_procdata_t msg_remote_proc;
        gras_msg_t msg; /* message to send */
 
        sock_data = (gras_trp_sg_sock_data_t *)sock->data;
-       remote_sock_data = ((gras_trp_sg_sock_data_t *)sock->data)->to_socket->data;
-       hd = (gras_hostdata_t *)SIMIX_host_get_data(SIMIX_host_self());
-       remote_hd = (gras_hostdata_t *)SIMIX_host_get_data(sock_data->to_host);
 
   xbt_assert0(sock->meas, "SG chunk exchange shouldn't be used on non-measurement sockets");
 
 
   xbt_assert0(sock->meas, "SG chunk exchange shouldn't be used on non-measurement sockets");
 
-
+       SIMIX_mutex_lock(sock_data->mutex);
   sprintf(name,"Chunk[%d]",count++);
        /*initialize gras message */
        msg = xbt_new(s_gras_msg_t,1);
   sprintf(name,"Chunk[%d]",count++);
        /*initialize gras message */
        msg = xbt_new(s_gras_msg_t,1);
@@ -289,17 +283,16 @@ void gras_trp_sg_chunk_send_raw(gras_socket_t sock,
 
        /* put message on msg_queue */
        msg_remote_proc = (gras_msg_procdata_t)gras_libdata_by_name_from_remote("gras_msg",sock_data->to_process);
 
        /* put message on msg_queue */
        msg_remote_proc = (gras_msg_procdata_t)gras_libdata_by_name_from_remote("gras_msg",sock_data->to_process);
-       xbt_fifo_push(msg_remote_proc->msg_to_receive_queue,msg);
-       
-       /* wake-up the receiver */
+       xbt_fifo_push(msg_remote_proc->msg_to_receive_queue_meas,msg);
+       /* put his socket on the active_socket list */
        trp_remote_proc = (gras_trp_procdata_t)gras_libdata_by_name_from_remote("gras_trp",sock_data->to_process);
        trp_remote_proc = (gras_trp_procdata_t)gras_libdata_by_name_from_remote("gras_trp",sock_data->to_process);
-       SIMIX_cond_signal(trp_remote_proc->cond);
+       xbt_fifo_push(trp_remote_proc->active_socket_meas,sock);        
+       /* wake-up the receiver */
+
+       SIMIX_cond_signal(trp_remote_proc->cond_meas);
 
 
-       SIMIX_mutex_lock(remote_sock_data->mutex);
-       //SIMIX_mutex_lock(remote_hd->mutex_port[sock->peer_port]);
        /* wait for the receiver */
        /* wait for the receiver */
-       SIMIX_cond_wait(remote_sock_data->cond,remote_sock_data->mutex);
-       //SIMIX_cond_wait(remote_hd->cond_port[sock->peer_port], remote_hd->mutex_port[sock->peer_port]);
+       SIMIX_cond_wait(sock_data->cond,sock_data->mutex);
 
        /* creates simix action and waits its ends, waits in the sender host condition*/
   DEBUG5("send chunk %s from %s to  %s:%d (size=%ld)",
 
        /* creates simix action and waits its ends, waits in the sender host condition*/
   DEBUG5("send chunk %s from %s to  %s:%d (size=%ld)",
@@ -307,68 +300,52 @@ void gras_trp_sg_chunk_send_raw(gras_socket_t sock,
         SIMIX_host_get_name(sock_data->to_host), sock->peer_port,size);
 
        act = SIMIX_action_communicate(SIMIX_host_self(), sock_data->to_host, name, size, -1);
         SIMIX_host_get_name(sock_data->to_host), sock->peer_port,size);
 
        act = SIMIX_action_communicate(SIMIX_host_self(), sock_data->to_host, name, size, -1);
-       /*
-       SIMIX_register_action_to_condition(act,remote_hd->cond_port[sock->peer_port]);
-       SIMIX_register_condition_to_action(act,remote_hd->cond_port[sock->peer_port]);
-       */
-       SIMIX_register_action_to_condition(act,remote_sock_data->cond);
-       SIMIX_register_condition_to_action(act,remote_sock_data->cond);
-
-       SIMIX_host_get_name(sock_data->to_host),SIMIX_process_get_name(sock_data->to_process),
-       
-       SIMIX_cond_wait(remote_sock_data->cond,remote_sock_data->mutex);
-       //SIMIX_cond_wait(remote_hd->cond_port[sock->peer_port], remote_hd->mutex_port[sock->peer_port]);
+       SIMIX_register_action_to_condition(act,sock_data->cond);
+       SIMIX_register_condition_to_action(act,sock_data->cond);
+
+       SIMIX_cond_wait(sock_data->cond,sock_data->mutex);
        /* error treatmeant */
 
        /* cleanup structures */
        SIMIX_action_destroy(act);
 
        /* error treatmeant */
 
        /* cleanup structures */
        SIMIX_action_destroy(act);
 
-       SIMIX_mutex_unlock(remote_sock_data->mutex);
-       //SIMIX_mutex_unlock(remote_hd->mutex_port[sock->peer_port]);
-       SIMIX_cond_signal(remote_sock_data->cond);
-       //SIMIX_cond_signal(remote_hd->cond_port[sock->peer_port]);
+       SIMIX_mutex_unlock(sock_data->mutex);
 }
 
 int gras_trp_sg_chunk_recv(gras_socket_t sock,
                            char *data,
                            unsigned long int size){
 }
 
 int gras_trp_sg_chunk_recv(gras_socket_t sock,
                            char *data,
                            unsigned long int size){
-  gras_trp_procdata_t pd=(gras_trp_procdata_t)gras_libdata_by_id(gras_trp_libdata_id);
        gras_trp_sg_sock_data_t *sock_data; 
        gras_trp_sg_sock_data_t *sock_data; 
-       gras_hostdata_t *remote_hd;
-       gras_hostdata_t *local_hd;
+       gras_trp_sg_sock_data_t *remote_sock_data; 
+       gras_socket_t remote_socket;
   gras_msg_t msg_got;
        gras_msg_procdata_t msg_procdata = (gras_msg_procdata_t)gras_libdata_by_name("gras_msg");
   gras_msg_t msg_got;
        gras_msg_procdata_t msg_procdata = (gras_msg_procdata_t)gras_libdata_by_name("gras_msg");
+  gras_trp_procdata_t trp_proc=(gras_trp_procdata_t)gras_libdata_by_id(gras_trp_libdata_id);
 
   xbt_assert0(sock->meas, "SG chunk exchange shouldn't be used on non-measurement sockets");
        
 
   xbt_assert0(sock->meas, "SG chunk exchange shouldn't be used on non-measurement sockets");
        
-       SIMIX_mutex_lock(pd->mutex);
-       if (xbt_fifo_size(msg_procdata->msg_to_receive_queue) == 0 ) {
-               SIMIX_cond_wait_timeout(pd->cond,pd->mutex,60);
+       SIMIX_mutex_lock(trp_proc->mutex_meas);
+       if (xbt_fifo_size(msg_procdata->msg_to_receive_queue_meas) == 0 ) {
+               SIMIX_cond_wait_timeout(trp_proc->cond_meas,trp_proc->mutex_meas,60);
        }
        }
-       if (xbt_fifo_size(msg_procdata->msg_to_receive_queue) == 0 ) {
+       if (xbt_fifo_size(msg_procdata->msg_to_receive_queue_meas) == 0 ) {
+               SIMIX_mutex_unlock(trp_proc->mutex_meas);
                THROW0(timeout_error,0,"Timeout");
        }
                THROW0(timeout_error,0,"Timeout");
        }
-       SIMIX_mutex_unlock(pd->mutex);
-
-       sock_data = (gras_trp_sg_sock_data_t *)sock->data;
-       DEBUG3("Remote host %s, Remote Port: %d Local port %d", SIMIX_host_get_name(sock_data->to_host), sock->peer_port, sock->port);
-       remote_hd = (gras_hostdata_t *)SIMIX_host_get_data(sock_data->to_host);
-       local_hd = (gras_hostdata_t *)SIMIX_host_get_data(SIMIX_host_self());
+       SIMIX_mutex_unlock(trp_proc->mutex_meas);
 
 
+       remote_socket = xbt_fifo_shift(trp_proc->active_socket_meas);
+       remote_sock_data = (gras_trp_sg_sock_data_t *)remote_socket->data;
+  msg_got = xbt_fifo_shift(msg_procdata->msg_to_receive_queue_meas);
 
 
+       sock_data = (gras_trp_sg_sock_data_t *)sock->data;
 
 
-  msg_got = xbt_fifo_shift(msg_procdata->msg_to_receive_queue);
-
-       SIMIX_mutex_lock(sock_data->mutex);
-       //SIMIX_mutex_lock(local_hd->mutex_port[sock->port]);
 /* ok, I'm here, you can continue the communication */
 /* ok, I'm here, you can continue the communication */
-       SIMIX_cond_signal(sock_data->cond);
-       //SIMIX_cond_signal(local_hd->cond_port[sock->port]);
+       SIMIX_cond_signal(remote_sock_data->cond);
 
 
+       SIMIX_mutex_lock(remote_sock_data->mutex);
 /* wait for communication end */
 /* wait for communication end */
-       SIMIX_cond_wait(sock_data->cond,sock_data->mutex);
-       //SIMIX_cond_wait(local_hd->cond_port[sock->port],local_hd->mutex_port[sock->port]);
-
+       SIMIX_cond_wait(remote_sock_data->cond,remote_sock_data->mutex);
 
   if (msg_got->payl_size != size)
     THROW5(mismatch_error,0,
 
   if (msg_got->payl_size != size)
     THROW5(mismatch_error,0,
@@ -382,13 +359,7 @@ int gras_trp_sg_chunk_recv(gras_socket_t sock,
        if (msg_got->payl)
                xbt_free(msg_got->payl);        
        xbt_free(msg_got);
        if (msg_got->payl)
                xbt_free(msg_got->payl);        
        xbt_free(msg_got);
-       SIMIX_cond_wait(sock_data->cond,sock_data->mutex);
-       SIMIX_mutex_unlock(sock_data->mutex);
-       //SIMIX_mutex_unlock(local_hd->mutex_port[sock->port]);
-       /*
-       SIMIX_cond_wait(local_hd->cond_port[sock->port],local_hd->mutex_port[sock->port]);
-       SIMIX_mutex_unlock(local_hd->mutex_port[sock->port]);
-       */
+       SIMIX_mutex_unlock(remote_sock_data->mutex);
        return 0;
 }
 
        return 0;
 }
 
index ff44d66..77951a5 100644 (file)
@@ -48,7 +48,10 @@ gras_process_init() {
 
        trp_pd->mutex = SIMIX_mutex_init();
        trp_pd->cond = SIMIX_cond_init();
 
        trp_pd->mutex = SIMIX_mutex_init();
        trp_pd->cond = SIMIX_cond_init();
+       trp_pd->mutex_meas = SIMIX_mutex_init();
+       trp_pd->cond_meas = SIMIX_cond_init();
        trp_pd->active_socket = xbt_fifo_new();
        trp_pd->active_socket = xbt_fifo_new();
+       trp_pd->active_socket_meas = xbt_fifo_new();
 
   VERB2("Creating process '%s' (%d)",
           SIMIX_process_get_name(SIMIX_process_self()),
 
   VERB2("Creating process '%s' (%d)",
           SIMIX_process_get_name(SIMIX_process_self()),
@@ -69,9 +72,10 @@ gras_process_exit() {
        SIMIX_mutex_destroy(trp_pd->mutex);
        SIMIX_cond_destroy(trp_pd->cond);
        xbt_fifo_free(trp_pd->active_socket);
        SIMIX_mutex_destroy(trp_pd->mutex);
        SIMIX_cond_destroy(trp_pd->cond);
        xbt_fifo_free(trp_pd->active_socket);
-  //int myPID=gras_os_getpid();
-  //int cpt;
-  //gras_sg_portrec_t pr;
+       SIMIX_mutex_destroy(trp_pd->mutex_meas);
+       SIMIX_cond_destroy(trp_pd->cond_meas);
+       xbt_fifo_free(trp_pd->active_socket_meas);
+
 
   xbt_assert0(hd,"Run gras_process_init (ie, gras_init)!!");
 
 
   xbt_assert0(hd,"Run gras_process_init (ie, gras_init)!!");