Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Functions gras_trp_sg_chunk_recv and send added.
author donassbr <donassbr@48e7efb5-ca39-0410-a469-dd3cf9ba447f>
Thu, 3 May 2007 14:35:09 +0000 (14:35 +0000)
committer donassbr <donassbr@48e7efb5-ca39-0410-a469-dd3cf9ba447f>
Thu, 3 May 2007 14:35:09 +0000 (14:35 +0000)
Some memory leaks fixed (socket destroy).
But, I still don't know why the simulation time is different.

git-svn-id: svn+ssh://scm.gforge.inria.fr/svn/simgrid/simgrid/trunk@3480 48e7efb5-ca39-0410-a469-dd3cf9ba447f

src/gras_simix/Msg/gras_simix_sg_msg.c
src/gras_simix/Transport/gras_simix_sg_transport.c
src/gras_simix/Transport/gras_simix_transport.c
src/gras_simix/Transport/gras_simix_transport_plugin_sg.c
src/gras_simix/Virtu/gras_simix_sg_process.c

index bc263f9..89e544b 100644 (file)
@@ -103,33 +103,6 @@ void gras_msg_send_ext(gras_socket_t   sock,
 
        VERB0("Message sent");
 
-/*
-  if (XBT_LOG_ISENABLED(gras_msg,xbt_log_priority_verbose)) {
-     asprintf(&name,"type:'%s';kind:'%s';ID %lu from %s:%d to %s:%d",
-             msg->type->name, e_gras_msg_kind_names[msg->kind], msg->ID,
-             gras_os_myname(),gras_os_myport(),
-             gras_socket_peer_name(sock), gras_socket_peer_port(sock));
-     task=MSG_task_create(name,0,
-                         ((double)whole_payload_size),msg);
-     free(name);
-  } else {
-     task=MSG_task_create(msg->type->name,0,
-                         ((double)whole_payload_size),msg);
-  }
-   
-       sock->bufdata = msg;
-       SIMIX_cond_signal(gras_libdata_by_name_from_remote("trp", sock->data->to_process)->cond);
-  DEBUG1("Prepare to send a message to %s",
-        MSG_host_get_name (sock_data->to_host));
-  if (MSG_task_put_with_timeout(task, sock_data->to_host,sock_data->to_chan,60.0) != MSG_OK) 
-    THROW0(system_error,0,"Problem during the MSG_task_put with timeout 60");
-
-  VERB5("Sent to %s(%d) a message type '%s' kind '%s' ID %lu",
-       MSG_host_get_name(sock_data->to_host),sock_data->to_PID,
-       msg->type->name,
-       e_gras_msg_kind_names[msg->kind],
-       msg->ID);
-*/
 }
 /*
  * receive the next message on the given socket.  
index b7c326b..6066347 100644 (file)
@@ -63,19 +63,17 @@ gras_socket_t gras_trp_select(double timeout) {
        /* Ok, got something. Open a socket back to the expeditor */
 
        /* Try to reuse an already openned socket to that expeditor */
+       DEBUG1("Open sockets size %lu",xbt_dynar_length(pd->sockets));
        xbt_dynar_foreach(pd->sockets,cursor,sock_iter) {
                DEBUG1("Consider %p as outgoing socket to expeditor",sock_iter);
 
                if (sock_iter->meas || !sock_iter->outgoing)
                        continue;
-               //DEBUG4("sock_iter %p port %d active %p port %d",((gras_trp_sg_sock_data_t*)sock_iter->data)->to_process,sock_iter->peer_port,((gras_trp_sg_sock_data_t*)pd->active_socket->data)->from_process, pd->active_socket->port);
-               //DEBUG1("\nFrom process %p", ((gras_trp_sg_sock_data_t*)pd->active_socket->data)->from_process);
                if ((sock_iter->peer_port == active_socket->port) && 
                                (((gras_trp_sg_sock_data_t*)sock_iter->data)->to_host == SIMIX_process_get_host(((gras_trp_sg_sock_data_t*)active_socket->data)->from_process))) {
                        SIMIX_mutex_unlock(pd->mutex);
                        return sock_iter;
                }
-
        }
 
        /* Socket to expeditor not created yet */
index beff009..f2db951 100644 (file)
@@ -97,10 +97,10 @@ void gras_trp_init(void){
 
 void
 gras_trp_exit(void){
-  xbt_dynar_t sockets = ((gras_trp_procdata_t) gras_libdata_by_id(gras_trp_libdata_id))->sockets;
-  gras_socket_t sock_iter;
-  int cursor;
-
+  //xbt_dynar_t sockets = ((gras_trp_procdata_t) gras_libdata_by_id(gras_trp_libdata_id))->sockets;
+  //gras_socket_t sock_iter;
+  //int cursor;
+       DEBUG1("gras_trp valor %d",_gras_trp_started);
    if (_gras_trp_started == 0) {
       return;
    }
@@ -115,13 +115,15 @@ gras_trp_exit(void){
        }
 #endif
 
-      /* Close all the sockets */
+      /* Close all the sockets, moved to process_close */
+                       /*
+                       DEBUG1("sockets pointer %p",sockets);
       xbt_dynar_foreach(sockets,cursor,sock_iter) {
        VERB1("Closing the socket %p left open on exit. Maybe a socket leak?",
              sock_iter);
        gras_socket_close(sock_iter);
       }
-      
+      */
       /* Delete the plugins */
       xbt_dict_free(&_gras_trp_plugins);
    }
@@ -360,9 +362,11 @@ void gras_socket_close(gras_socket_t sock) {
   }
    
   /* FIXME: Issue an event when the socket is closed */
+       DEBUG1("sockets pointer before %p",sockets);
   if (sock) {
                xbt_dynar_foreach(sockets,cursor,sock_iter) {
                        if (sock == sock_iter) {
+                               DEBUG2("remove sock cursor %d dize %lu\n",cursor,xbt_dynar_length(sockets));
                                xbt_dynar_cursor_rm(sockets,&cursor);
                                if (sock->plugin->socket_close) 
                                        (* sock->plugin->socket_close)(sock);
@@ -611,7 +615,6 @@ void gras_trp_socketset_dump(const char *name) {
  */
 int gras_trp_libdata_id;
 void gras_trp_register() {
-       DEBUG0("\ntrp add\n");
    gras_trp_libdata_id = gras_procdata_add("gras_trp",gras_trp_procdata_new, gras_trp_procdata_free);
 }
 
index 70660f5..df87054 100644 (file)
@@ -13,8 +13,7 @@
 
 #include "xbt/ex.h" 
 
-//#include "msg/msg.h"
-
+#include "gras_simix/Msg/gras_simix_msg_private.h"
 #include "gras_simix_transport_private.h"
 #include "gras_simix/Virtu/gras_simix_virtu_sg.h"
 
@@ -215,7 +214,10 @@ void gras_trp_sg_socket_server(gras_trp_plugin_t self,
 }
 
 void gras_trp_sg_socket_close(gras_socket_t sock){
-
+       xbt_dynar_t sockets = ((gras_trp_procdata_t) gras_libdata_by_name("gras_trp"))->sockets;
+       gras_socket_t sock_iter;
+       int cursor;
+       int found;
   gras_hostdata_t *hd=(gras_hostdata_t *)SIMIX_host_get_data(SIMIX_host_self());
   int cpt;
   gras_sg_portrec_t pr; 
@@ -228,21 +230,32 @@ void gras_trp_sg_socket_close(gras_socket_t sock){
 
   if (sock->data)
     free(sock->data);
-       
-/*
-       SIMIX_cond_destroy(hd->cond_port[sock->port]);
-       hd->cond_port[sock->port] = NULL;
-       SIMIX_mutex_destroy(hd->mutex_port[sock->port]);
-       hd->mutex_port[sock->port] = NULL;
-*/
+
+       /* search for a socket in the list that is using the mutex and condition. It can happen because we create 2 sockets to communicate (incomming and outgoing) */
+       found = 0;
+       xbt_dynar_foreach(sockets,cursor,sock_iter) {
+               if (sock_iter->port == sock->port) {
+                       found = 1;
+                       break;
+               }
+       }
+       /* if not found, it is the last socket opened in this port and we can free the mutex and condition */
+       if (!found) {
+               SIMIX_cond_destroy(hd->cond_port[sock->port]);
+               hd->cond_port[sock->port] = NULL;
+               SIMIX_mutex_destroy(hd->mutex_port[sock->port]);
+               hd->mutex_port[sock->port] = NULL;
+       }
+
+
   if (sock->incoming && !sock->outgoing && sock->port >= 0) {
     /* server mode socket. Unregister it from 'OS' tables */
     xbt_dynar_foreach(hd->ports, cpt, pr) {
       DEBUG2("Check pr %d of %lu", cpt, xbt_dynar_length(hd->ports));
       if (pr.port == sock->port) {
-       xbt_dynar_cursor_rm(hd->ports, &cpt);
-       XBT_OUT;
-        return;
+                               xbt_dynar_cursor_rm(hd->ports, &cpt);
+                               XBT_OUT;
+                               return;
       }
     }
     WARN2("socket_close called on the unknown incoming socket %p (port=%d)",
@@ -267,9 +280,69 @@ void gras_trp_sg_chunk_send(gras_socket_t sock,
 void gras_trp_sg_chunk_send_raw(gras_socket_t sock,
                                const char *data,
                                unsigned long int size) {
+  char name[256];
+  static unsigned int count=0;
+
+       smx_action_t act; /* simix action */
+       gras_trp_sg_sock_data_t *sock_data; 
+       gras_hostdata_t *hd;
+       gras_hostdata_t *remote_hd;
+       gras_trp_procdata_t trp_remote_proc;
+       gras_msg_procdata_t msg_remote_proc;
+       gras_msg_t msg; /* message to send */
+
+       sock_data = (gras_trp_sg_sock_data_t *)sock->data;
+       hd = (gras_hostdata_t *)SIMIX_host_get_data(SIMIX_host_self());
+       remote_hd = (gras_hostdata_t *)SIMIX_host_get_data(sock_data->to_host);
+
+  xbt_assert0(sock->meas, "SG chunk exchange shouldn't be used on non-measurement sockets");
+
+
+  sprintf(name,"Chunk[%d]",count++);
+       /*initialize gras message */
+       msg = xbt_new(s_gras_msg_t,1);
+       msg->expe = sock;
+       msg->payl_size=size;
+
+  if (data) {
+    msg->payl=(void*)xbt_malloc(size);
+    memcpy(msg->payl,data,size);
+  } else {
+    msg->payl = NULL;
+  }
+
+       /* put message on msg_queue */
+       msg_remote_proc = (gras_msg_procdata_t)gras_libdata_by_name_from_remote("gras_msg",sock_data->to_process);
+       xbt_fifo_push(msg_remote_proc->msg_to_receive_queue,msg);
+       
+       /* wake-up the receiver */
+       trp_remote_proc = (gras_trp_procdata_t)gras_libdata_by_name_from_remote("gras_trp",sock_data->to_process);
+       SIMIX_cond_signal(trp_remote_proc->cond);
+
+       SIMIX_mutex_lock(remote_hd->mutex_port[sock->peer_port]);
+       /* wait for the receiver */
+       SIMIX_cond_wait(remote_hd->cond_port[sock->peer_port], remote_hd->mutex_port[sock->peer_port]);
+
+       /* creates simix action and waits its ends, waits in the sender host condition*/
+  DEBUG5("send chunk %s from %s to  %s:%d (size=%ld)",
+        name, SIMIX_host_get_name(SIMIX_host_self()),
+        SIMIX_host_get_name(sock_data->to_host), sock->peer_port,size);
+
+       act = SIMIX_action_communicate(sock_data->to_host, SIMIX_host_self(),name, size, -1);
+       SIMIX_register_action_to_condition(act,remote_hd->cond_port[sock->peer_port]);
+       SIMIX_register_condition_to_action(act,remote_hd->cond_port[sock->peer_port]);
+
+       SIMIX_host_get_name(sock_data->to_host),SIMIX_process_get_name(sock_data->to_process),
+       
+       SIMIX_cond_wait(remote_hd->cond_port[sock->peer_port], remote_hd->mutex_port[sock->peer_port]);
+       /* error treatmeant */
+
+       /* cleanup structures */
+       SIMIX_action_destroy(act);
+       SIMIX_mutex_unlock(remote_hd->mutex_port[sock->peer_port]);
+       SIMIX_cond_signal(remote_hd->cond_port[sock->peer_port]);
                                /*
   m_task_t task=NULL;
-  static unsigned int count=0;
   char name[256];
   gras_trp_sg_sock_data_t *sock_data = (gras_trp_sg_sock_data_t *)sock->data;
   sg_task_data_t *task_data;
@@ -300,6 +373,56 @@ void gras_trp_sg_chunk_send_raw(gras_socket_t sock,
 int gras_trp_sg_chunk_recv(gras_socket_t sock,
                            char *data,
                            unsigned long int size){
+  gras_trp_procdata_t pd=(gras_trp_procdata_t)gras_libdata_by_id(gras_trp_libdata_id);
+       gras_trp_sg_sock_data_t *sock_data; 
+       gras_hostdata_t *remote_hd;
+       gras_hostdata_t *local_hd;
+  gras_msg_t msg_got;
+       gras_msg_procdata_t msg_procdata = (gras_msg_procdata_t)gras_libdata_by_name("gras_msg");
+
+  xbt_assert0(sock->meas, "SG chunk exchange shouldn't be used on non-measurement sockets");
+       
+       SIMIX_mutex_lock(pd->mutex);
+       if (xbt_fifo_size(msg_procdata->msg_to_receive_queue) == 0 ) {
+               SIMIX_cond_wait_timeout(pd->cond,pd->mutex,60);
+       }
+       if (xbt_fifo_size(msg_procdata->msg_to_receive_queue) == 0 ) {
+               THROW0(timeout_error,0,"Timeout");
+       }
+       SIMIX_mutex_unlock(pd->mutex);
+
+       sock_data = (gras_trp_sg_sock_data_t *)sock->data;
+       DEBUG3("Remote host %s, Remote Port: %d Local port %d", SIMIX_host_get_name(sock_data->to_host), sock->peer_port, sock->port);
+       remote_hd = (gras_hostdata_t *)SIMIX_host_get_data(sock_data->to_host);
+       local_hd = (gras_hostdata_t *)SIMIX_host_get_data(SIMIX_host_self());
+
+
+
+  msg_got = xbt_fifo_shift(msg_procdata->msg_to_receive_queue);
+
+       SIMIX_mutex_lock(local_hd->mutex_port[sock->port]);
+/* ok, I'm here, you can continuate the communication */
+       SIMIX_cond_signal(local_hd->cond_port[sock->port]);
+
+/* wait for communication end */
+       SIMIX_cond_wait(local_hd->cond_port[sock->port],local_hd->mutex_port[sock->port]);
+
+
+  if (msg_got->payl_size != size)
+    THROW5(mismatch_error,0,
+          "Got %d bytes when %ld where expected (in %s->%s:%d)",
+          msg_got->payl_size, size,
+          SIMIX_host_get_name(sock_data->to_host),
+          SIMIX_host_get_name(SIMIX_host_self()), sock->peer_port);
+  if (data) {
+    memcpy(data,msg_got->payl,size);
+       }
+       if (msg_got->payl)
+               xbt_free(msg_got->payl);        
+       xbt_free(msg_got);
+       SIMIX_cond_wait(local_hd->cond_port[sock->port],local_hd->mutex_port[sock->port]);
+       SIMIX_mutex_unlock(local_hd->mutex_port[sock->port]);
+
                                        /*
   gras_trp_procdata_t pd=(gras_trp_procdata_t)gras_libdata_by_id(gras_trp_libdata_id);
 
index acb01ff..4a0189b 100644 (file)
@@ -23,11 +23,8 @@ gras_process_init() {
   gras_hostdata_t *hd=(gras_hostdata_t *)SIMIX_host_get_data(SIMIX_host_self());
   gras_procdata_t *pd=xbt_new0(gras_procdata_t,1);
   gras_trp_procdata_t trp_pd;
-  //gras_sg_portrec_t prmeas,pr;
-  //int i;
-  
-  SIMIX_process_set_data(SIMIX_process_self(),(void*)pd);
 
+  SIMIX_process_set_data(SIMIX_process_self(),(void*)pd);
 
 
   gras_procdata_init();
@@ -38,8 +35,6 @@ gras_process_init() {
     hd->refcount = 1;
     hd->ports = xbt_dynar_new(sizeof(gras_sg_portrec_t),NULL);
 
-  //  memset(hd->proc, 0, sizeof(hd->proc[0]) * XBT_MAX_CHANNEL); 
-       
                for (i=0;i<65536;i++) {
                        hd->cond_port[i] =NULL;
                        hd->mutex_port[i] =NULL;
@@ -60,41 +55,6 @@ gras_process_init() {
        trp_pd->mutex = SIMIX_mutex_init();
        trp_pd->cond = SIMIX_cond_init();
        trp_pd->active_socket = xbt_fifo_new();
-  /* take a free channel for this process */
-  /*
-       trp_pd = (gras_trp_procdata_t)gras_libdata_by_name("gras_trp");
-  for (i=0; i<XBT_MAX_CHANNEL && hd->proc[i]; i++);
-  if (i == XBT_MAX_CHANNEL) 
-    THROW2(system_error,0,
-          "Can't add a new process on %s, because all channels are already in use. Please increase MAX CHANNEL (which is %d for now) and recompile GRAS.",
-           MSG_host_get_name(MSG_host_self()),XBT_MAX_CHANNEL);
-
-  trp_pd->chan = i;
-  hd->proc[ i ] = MSG_process_self_PID();
-*/
-  /* regiter it to the ports structure */
- // pr.port = -1;
-  //pr.tochan = i;
-  //pr.meas = 0;
-  //xbt_dynar_push(hd->ports,&pr);
-
-  /* take a free meas channel for this process */
-  /*
-       for (i=0; i<XBT_MAX_CHANNEL && hd->proc[i]; i++);
-  if (i == XBT_MAX_CHANNEL) {
-    THROW2(system_error,0,
-          "Can't add a new process on %s, because all channels are already in use. Please increase MAX CHANNEL (which is %d for now) and recompile GRAS.",
-           MSG_host_get_name(MSG_host_self()),XBT_MAX_CHANNEL);
-  }
-  trp_pd->measChan = i;
-
-  hd->proc[ i ] = MSG_process_self_PID();
-*/
-  /* register it to the ports structure */
-  //prmeas.port = -1;
-  //prmeas.tochan = i;
-  //prmeas.meas = 1;
-  //xbt_dynar_push(hd->ports,&prmeas);
 
   VERB2("Creating process '%s' (%ld)",
           SIMIX_process_get_name(SIMIX_process_self()),
@@ -104,6 +64,9 @@ gras_process_init() {
 void
 gras_process_exit() {
        int i;
+       xbt_dynar_t sockets = ((gras_trp_procdata_t) gras_libdata_by_name("gras_trp"))->sockets;
+  gras_socket_t sock_iter;
+  int cursor;
   gras_hostdata_t *hd=(gras_hostdata_t *)SIMIX_host_get_data(SIMIX_host_self());
   gras_procdata_t *pd=(gras_procdata_t*)SIMIX_process_get_data(SIMIX_process_self());
 
@@ -126,20 +89,12 @@ gras_process_exit() {
     WARN1("process %ld terminated, but some messages are still queued",
          gras_os_getpid());
 
-/*
-  for (cpt=0; cpt< XBT_MAX_CHANNEL; cpt++)
-    if (myPID == hd->proc[cpt])
-      hd->proc[cpt] = 0;
-*/
-
-/* remove ports from host, maybe i can do it on the socket destroy function */
-  /*
-       xbt_dynar_foreach(hd->ports, cpt, pr) {
-    if (pr.port == trp_pd->chan || pr.port == trp_pd->measChan) {
-      xbt_dynar_cursor_rm(hd->ports, &cpt);
-    }
-  }*/
-       
+       /* if each process has its sockets list, we need to close them when the process finish */
+       xbt_dynar_foreach(sockets,cursor,sock_iter) {
+               VERB1("Closing the socket %p left open on exit. Maybe a socket leak?",
+                               sock_iter);
+               gras_socket_close(sock_iter);
+       }
   if ( ! --(hd->refcount)) {
     xbt_dynar_free(&hd->ports);
                for (i=0; i<65536; i++) {