Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Avoid actually malloc'ing the chunk of data when exchanging measurement packets in...
[simgrid.git] / src / gras / Transport / transport_plugin_sg.c
index f3243a2..d90c36f 100644 (file)
@@ -24,7 +24,6 @@ XBT_LOG_NEW_DEFAULT_SUBCATEGORY(trp_sg,transport,"SimGrid pseudo-transport");
 /***
  *** Prototypes 
  ***/
-void hexa_print(unsigned char *data, int size);   /* in gras.c */
 
 /* retrieve the port record associated to a numerical port on an host */
 static void find_port(gras_hostdata_t *hd, int port, gras_sg_portrec_t *hpd);
@@ -36,11 +35,15 @@ void gras_trp_sg_socket_server(gras_trp_plugin_t self,
                               /* OUT */ gras_socket_t sock);
 void gras_trp_sg_socket_close(gras_socket_t sd);
 
+void gras_trp_sg_chunk_send_raw(gras_socket_t sd,
+                               const char *data,
+                               unsigned long int size);
 void gras_trp_sg_chunk_send(gras_socket_t sd,
                            const char *data,
-                           unsigned long int size);
+                           unsigned long int size,
+                           int stable_ignored);
 
-void gras_trp_sg_chunk_recv(gras_socket_t sd,
+int gras_trp_sg_chunk_recv(gras_socket_t sd,
                            char *data,
                            unsigned long int size);
 
@@ -83,8 +86,9 @@ gras_trp_sg_setup(gras_trp_plugin_t plug) {
   plug->socket_server = gras_trp_sg_socket_server;
   plug->socket_close  = gras_trp_sg_socket_close;
 
-  plug->chunk_send    = gras_trp_sg_chunk_send;
-  plug->chunk_recv    = gras_trp_sg_chunk_recv;
+  plug->raw_send = gras_trp_sg_chunk_send_raw;
+  plug->send = gras_trp_sg_chunk_send;
+  plug->raw_recv = plug->recv = gras_trp_sg_chunk_recv;
 
   plug->flush         = NULL; /* nothing cached */
 }
@@ -150,7 +154,7 @@ void gras_trp_sg_socket_server(gras_trp_plugin_t self,
                               gras_socket_t sock){
 
   gras_hostdata_t *hd=(gras_hostdata_t *)MSG_host_get_data(MSG_host_self());
-  gras_trp_procdata_t pd=(gras_trp_procdata_t)gras_libdata_get("gras_trp");
+  gras_trp_procdata_t pd=(gras_trp_procdata_t)gras_libdata_by_id(gras_trp_libdata_id);
   gras_sg_portrec_t pr;
   gras_trp_sg_sock_data_t *data;
   int found;
@@ -235,19 +239,32 @@ typedef struct {
 
 void gras_trp_sg_chunk_send(gras_socket_t sock,
                            const char *data,
-                           unsigned long int size) {
+                           unsigned long int size,
+                           int stable_ignored) {
+  gras_trp_sg_chunk_send_raw(sock,data,size);
+}
+
+void gras_trp_sg_chunk_send_raw(gras_socket_t sock,
+                               const char *data,
+                               unsigned long int size) {
   m_task_t task=NULL;
   static unsigned int count=0;
   char name[256];
   gras_trp_sg_sock_data_t *sock_data = (gras_trp_sg_sock_data_t *)sock->data;
   sg_task_data_t *task_data;
   
+  xbt_assert0(sock->meas, "SG chunk exchange shouldn't be used on non-measurement sockets");
+
   sprintf(name,"Chunk[%d]",count++);
 
   task_data=xbt_new(sg_task_data_t,1);
-  task_data->data=(void*)xbt_malloc(size);
   task_data->size = size;
-  memcpy(task_data->data,data,size);
+  if (data) {
+    task_data->data=(void*)xbt_malloc(size);
+    memcpy(task_data->data,data,size);
+  } else {
+    task_data->data = NULL;
+  }
 
   task=MSG_task_create(name,0,((double)size)/(1024.0*1024.0),task_data);
 
@@ -259,120 +276,43 @@ void gras_trp_sg_chunk_send(gras_socket_t sock,
   }
 }
 
-void gras_trp_sg_chunk_recv(gras_socket_t sock,
+int gras_trp_sg_chunk_recv(gras_socket_t sock,
                            char *data,
                            unsigned long int size){
-  gras_trp_procdata_t pd=(gras_trp_procdata_t)gras_libdata_get("gras_trp");
+  gras_trp_procdata_t pd=(gras_trp_procdata_t)gras_libdata_by_id(gras_trp_libdata_id);
 
   m_task_t task=NULL;
   sg_task_data_t *task_data;
   gras_trp_sg_sock_data_t *sock_data = sock->data;
 
+  xbt_assert0(sock->meas, "SG chunk exchange shouldn't be used on non-measurement sockets");
   XBT_IN;
   DEBUG4("recv chunk on %s ->  %s:%d (size=%ld)",
         MSG_host_get_name(sock_data->to_host),
         MSG_host_get_name(MSG_host_self()), sock_data->to_chan, size);
-  if (MSG_task_get(&task, (sock->meas ? pd->measChan : pd->chan)) != MSG_OK)
+  if (MSG_task_get_with_time_out(&task, 
+                                (sock->meas ? pd->measChan : pd->chan),
+                                60) != MSG_OK)
     THROW0(system_error,0,"Error in MSG_task_get()");
   DEBUG1("Got chuck %s",MSG_task_get_name(task));
 
   task_data = MSG_task_get_data(task);
-  if (size != -1) {    
-     if (task_data->size != size)
-       THROW5(mismatch_error,0,
-             "Got %d bytes when %ld where expected (in %s->%s:%d)",
-             task_data->size, size,
-             MSG_host_get_name(sock_data->to_host),
-             MSG_host_get_name(MSG_host_self()), sock_data->to_chan);
-     memcpy(data,task_data->data,size);
-  } else {
-     /* damn, the size is embeeded at the begining of the chunk */
-     int netsize;
-     
-     memcpy((char*)&netsize,task_data->data,4);
-     DEBUG1("netsize embeeded = %d",netsize);
-
-     memcpy(data,task_data->data,netsize+4);
-  }
-  free(task_data->data);
+  if (task_data->size != size)
+    THROW5(mismatch_error,0,
+          "Got %d bytes when %ld where expected (in %s->%s:%d)",
+          task_data->size, size,
+          MSG_host_get_name(sock_data->to_host),
+          MSG_host_get_name(MSG_host_self()), sock_data->to_chan);
+  if (data) 
+    memcpy(data,task_data->data,size);
+  if (task_data->data)
+    free(task_data->data);
   free(task_data);
 
   if (MSG_task_destroy(task) != MSG_OK)
     THROW0(system_error,0,"Error in MSG_task_destroy()");
 
   XBT_OUT;
+  return size;
 }
 
-#if 0
-/* Data exchange over measurement sockets */
-xbt_error_t gras_socket_meas_exchange(gras_socket_t peer,
-                                     int sender,
-                                     unsigned int timeout,
-                                     unsigned long int expSize,
-                                     unsigned long int msgSize) {
-  unsigned int bytesTotal;
-  static unsigned int count=0;
-  m_task_t task=NULL;
-  char name[256];
-  gras_trp_sg_sock_data_t *sock_data = (gras_trp_sg_sock_data_t *)peer->data;
-  
-  gras_trp_procdata_t pd=(gras_trp_procdata_t)gras_libdata_get("gras_trp");
-  double startTime;
-  
-  startTime=gras_os_time(); /* used only in sender mode */
-
-  for(bytesTotal = 0;  bytesTotal < expSize;  bytesTotal += msgSize) {
-
-    if (sender) {
-    
-      sprintf(name,"meas data[%d]",count++);
-      
-      task=MSG_task_create(name,0,((double)msgSize)/(1024.0*1024.0),NULL);
-
-      DEBUG5("%f:%s: gras_socket_meas_send(%f %s -> %s) BEGIN",
-            gras_os_time(), MSG_process_get_name(MSG_process_self()),
-            ((double)msgSize)/(1024.0*1024.0),
-            MSG_host_get_name( MSG_host_self()), peer->peer_name);
-
-      if (MSG_task_put(task, sock_data->to_host,sock_data->to_chan) != MSG_OK)
-       RAISE0(system_error,"Problem during the MSG_task_put()");
-              
-      DEBUG5("%f:%s: gras_socket_meas_send(%f %s -> %s) END",
-            gras_os_time(), MSG_process_get_name(MSG_process_self()),
-            ((double)msgSize)/(1024.0*1024.0),
-            MSG_host_get_name( MSG_host_self()), peer->peer_name);
-                  
-    } else { /* we are receiver, simulate a select */
-      
-      task=NULL;
-      DEBUG2("%f:%s: gras_socket_meas_recv() BEGIN\n",
-            gras_os_time(), MSG_process_get_name(MSG_process_self()));
-      do {
-       if (MSG_task_Iprobe((m_channel_t) pd->measChan)) {
-         if (MSG_task_get(&task, (m_channel_t) pd->measChan) != MSG_OK) {
-           fprintf(stderr,"GRAS: Error in MSG_task_get()\n");
-           return unknown_error;
-         }
-         
-         if (MSG_task_destroy(task) != MSG_OK) {
-           fprintf(stderr,"GRAS: Error in MSG_task_destroy()\n");
-           return unknown_error;
-         }
-         
-         DEBUG2("%f:%s: gras_socket_meas_recv() END\n",
-                gras_os_time(), MSG_process_get_name(MSG_process_self()));
-         break;
-       } else {
-         MSG_process_sleep(0.0001);
-       }
-       
-      } while (gras_os_time() - startTime < timeout);
-      
-      if (gras_os_time() - startTime > timeout)
-       return timeout_error;
-    } /* receiver part */
-  } /* foreach msg */
-
-  return no_error;
-}
-#endif