Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Added round trip time contraint to the SDP program, this parameter
[simgrid.git] / src / gras / Transport / transport_plugin_sg.c
index 8c81ca2..b19a9f8 100644 (file)
 #include "transport_private.h"
 #include "gras/Virtu/virtu_sg.h"
 
-XBT_LOG_EXTERNAL_CATEGORY(transport);
-XBT_LOG_NEW_DEFAULT_SUBCATEGORY(trp_sg,transport,"SimGrid pseudo-transport");
+XBT_LOG_NEW_DEFAULT_SUBCATEGORY(gras_trp_sg,gras_trp,"SimGrid pseudo-transport");
 
 /***
  *** Prototypes 
  ***/
-void hexa_print(unsigned char *data, int size);   /* in gras.c */
 
 /* retrieve the port record associated to a numerical port on an host */
 static void find_port(gras_hostdata_t *hd, int port, gras_sg_portrec_t *hpd);
@@ -36,14 +34,17 @@ void gras_trp_sg_socket_server(gras_trp_plugin_t self,
                               /* OUT */ gras_socket_t sock);
 void gras_trp_sg_socket_close(gras_socket_t sd);
 
+void gras_trp_sg_chunk_send_raw(gras_socket_t sd,
+                               const char *data,
+                               unsigned long int size);
 void gras_trp_sg_chunk_send(gras_socket_t sd,
                            const char *data,
-                           unsigned long int size);
+                           unsigned long int size,
+                           int stable_ignored);
 
-void gras_trp_sg_chunk_recv(gras_socket_t sd,
+int gras_trp_sg_chunk_recv(gras_socket_t sd,
                            char *data,
-                           unsigned long int size,
-                           unsigned long int bufsize);
+                           unsigned long int size);
 
 /***
  *** Specific plugin part
@@ -84,8 +85,9 @@ gras_trp_sg_setup(gras_trp_plugin_t plug) {
   plug->socket_server = gras_trp_sg_socket_server;
   plug->socket_close  = gras_trp_sg_socket_close;
 
-  plug->chunk_send    = gras_trp_sg_chunk_send;
-  plug->chunk_recv    = gras_trp_sg_chunk_recv;
+  plug->raw_send = gras_trp_sg_chunk_send_raw;
+  plug->send = gras_trp_sg_chunk_send;
+  plug->raw_recv = plug->recv = gras_trp_sg_chunk_recv;
 
   plug->flush         = NULL; /* nothing cached */
 }
@@ -151,10 +153,10 @@ void gras_trp_sg_socket_server(gras_trp_plugin_t self,
                               gras_socket_t sock){
 
   gras_hostdata_t *hd=(gras_hostdata_t *)MSG_host_get_data(MSG_host_self());
-  gras_trp_procdata_t pd=(gras_trp_procdata_t)gras_libdata_get("gras_trp");
+  gras_trp_procdata_t pd=(gras_trp_procdata_t)gras_libdata_by_id(gras_trp_libdata_id);
   gras_sg_portrec_t pr;
   gras_trp_sg_sock_data_t *data;
-  int found;
+  volatile int found;
   
   const char *host=MSG_host_get_name(MSG_host_self());
 
@@ -236,7 +238,14 @@ typedef struct {
 
 void gras_trp_sg_chunk_send(gras_socket_t sock,
                            const char *data,
-                           unsigned long int size) {
+                           unsigned long int size,
+                           int stable_ignored) {
+  gras_trp_sg_chunk_send_raw(sock,data,size);
+}
+
+void gras_trp_sg_chunk_send_raw(gras_socket_t sock,
+                               const char *data,
+                               unsigned long int size) {
   m_task_t task=NULL;
   static unsigned int count=0;
   char name[256];
@@ -248,25 +257,28 @@ void gras_trp_sg_chunk_send(gras_socket_t sock,
   sprintf(name,"Chunk[%d]",count++);
 
   task_data=xbt_new(sg_task_data_t,1);
-  task_data->data=(void*)xbt_malloc(size);
   task_data->size = size;
-  memcpy(task_data->data,data,size);
+  if (data) {
+    task_data->data=(void*)xbt_malloc(size);
+    memcpy(task_data->data,data,size);
+  } else {
+    task_data->data = NULL;
+  }
 
-  task=MSG_task_create(name,0,((double)size)/(1024.0*1024.0),task_data);
+  task=MSG_task_create(name,0,((double)size),task_data);
 
   DEBUG5("send chunk %s from %s to  %s:%d (size=%ld)",
         name, MSG_host_get_name(MSG_host_self()),
         MSG_host_get_name(sock_data->to_host), sock_data->to_chan,size);
-  if (MSG_task_put(task, sock_data->to_host,sock_data->to_chan) != MSG_OK) {
-    THROW0(system_error,0,"Problem during the MSG_task_put");
+  if (MSG_task_put_with_timeout(task, sock_data->to_host,sock_data->to_chan,60.0) != MSG_OK) {
+    THROW0(system_error,0,"Problem during the MSG_task_put with timeout 60");
   }
 }
 
-void gras_trp_sg_chunk_recv(gras_socket_t sock,
+int gras_trp_sg_chunk_recv(gras_socket_t sock,
                            char *data,
-                           unsigned long int size,
-                           unsigned long int bufsize){
-  gras_trp_procdata_t pd=(gras_trp_procdata_t)gras_libdata_get("gras_trp");
+                           unsigned long int size){
+  gras_trp_procdata_t pd=(gras_trp_procdata_t)gras_libdata_by_id(gras_trp_libdata_id);
 
   m_task_t task=NULL;
   sg_task_data_t *task_data;
@@ -277,34 +289,29 @@ void gras_trp_sg_chunk_recv(gras_socket_t sock,
   DEBUG4("recv chunk on %s ->  %s:%d (size=%ld)",
         MSG_host_get_name(sock_data->to_host),
         MSG_host_get_name(MSG_host_self()), sock_data->to_chan, size);
-  if (MSG_task_get(&task, (sock->meas ? pd->measChan : pd->chan)) != MSG_OK)
+  if (MSG_task_get_with_time_out(&task, 
+                                (sock->meas ? pd->measChan : pd->chan),
+                                60) != MSG_OK)
     THROW0(system_error,0,"Error in MSG_task_get()");
   DEBUG1("Got chuck %s",MSG_task_get_name(task));
 
   task_data = MSG_task_get_data(task);
-  if (size != -1) {    
-     if (task_data->size != size)
-       THROW5(mismatch_error,0,
-             "Got %d bytes when %ld where expected (in %s->%s:%d)",
-             task_data->size, size,
-             MSG_host_get_name(sock_data->to_host),
-             MSG_host_get_name(MSG_host_self()), sock_data->to_chan);
-     memcpy(data,task_data->data,size);
-  } else {
-     /* damn, the size is embeeded at the begining of the chunk */
-     int netsize;
-     
-     memcpy((char*)&netsize,task_data->data,4);
-     DEBUG1("netsize embeeded = %d",netsize);
-
-     memcpy(data,task_data->data,netsize+4);
-  }
-  free(task_data->data);
+  if (task_data->size != size)
+    THROW5(mismatch_error,0,
+          "Got %d bytes when %ld where expected (in %s->%s:%d)",
+          task_data->size, size,
+          MSG_host_get_name(sock_data->to_host),
+          MSG_host_get_name(MSG_host_self()), sock_data->to_chan);
+  if (data) 
+    memcpy(data,task_data->data,size);
+  if (task_data->data)
+    free(task_data->data);
   free(task_data);
 
   if (MSG_task_destroy(task) != MSG_OK)
     THROW0(system_error,0,"Error in MSG_task_destroy()");
 
   XBT_OUT;
+  return size;
 }