Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Try to do all communications in parallel (not quite working)
authormquinson <mquinson@48e7efb5-ca39-0410-a469-dd3cf9ba447f>
Thu, 15 Oct 2009 11:54:49 +0000 (11:54 +0000)
committermquinson <mquinson@48e7efb5-ca39-0410-a469-dd3cf9ba447f>
Thu, 15 Oct 2009 11:54:49 +0000 (11:54 +0000)
git-svn-id: svn+ssh://scm.gforge.inria.fr/svn/simgrid/simgrid/trunk@6784 48e7efb5-ca39-0410-a469-dd3cf9ba447f

examples/gras/replay/replay.c

index a8e9996..f7b23fb 100644 (file)
@@ -12,6 +12,7 @@
 #include "xbt/log.h"
 #include "xbt/str.h"
 #include "xbt/dynar.h"
 #include "xbt/log.h"
 #include "xbt/str.h"
 #include "xbt/dynar.h"
+#include "xbt/synchro.h"
 #include "workload.h"
 #include "gras.h"
 #include "amok/peermanagement.h"
 #include "workload.h"
 #include "gras.h"
 #include "amok/peermanagement.h"
@@ -132,7 +133,7 @@ static gras_socket_t get_peer_sock(char*peer) {
   gras_socket_t peer_sock = xbt_dict_get_or_null(g->peers,peer);
   if (!peer_sock) {
     peer_sock = gras_socket_client(peer,4000);
   gras_socket_t peer_sock = xbt_dict_get_or_null(g->peers,peer);
   if (!peer_sock) {
     peer_sock = gras_socket_client(peer,4000);
-    xbt_dict_set(g->peers,peer,peer_sock,gras_socket_close_voidp);
+    xbt_dict_set(g->peers,peer,peer_sock,NULL);//gras_socket_close_voidp);
   }
   return peer_sock;
 }
   }
   return peer_sock;
 }
@@ -141,15 +142,38 @@ static int worker_commands_cb(gras_msg_cb_ctx_t ctx, void *payload) {
   g->commands = *(xbt_dynar_t*)payload;
   return 0;
 }
   g->commands = *(xbt_dynar_t*)payload;
   return 0;
 }
+static void do_command(int rank, void*c) {
+  xbt_ex_t e;
+  xbt_workload_elm_t cmd = *(xbt_workload_elm_t*)c;
+  xbt_workload_data_chunk_t chunk;
+
+  if (cmd->action == XBT_WORKLOAD_SEND) {
+    chunk = xbt_workload_data_chunk_new((int)(cmd->d_arg));
+    INFO2("Send %.f bytes to %s",cmd->d_arg,cmd->str_arg);
+    gras_msg_send(get_peer_sock(cmd->str_arg),"chunk",&chunk);
+
+  } else if (cmd->action == XBT_WORKLOAD_RECV) {
+    INFO2("Recv %.f bytes from %s",cmd->d_arg,cmd->str_arg);
+    TRY {
+      gras_msg_wait(1000000,"chunk",NULL,&chunk);
+    } CATCH(e) {
+      RETHROW2("Exception while waiting for %f bytes from %s: %s",
+            cmd->d_arg,cmd->str_arg);
+    }
+    xbt_workload_data_chunk_free(chunk);
+
+  } else {
+    xbt_die(bprintf("unknown command: %s",xbt_workload_elm_to_string(cmd)));
+  }
+}
 static int worker_go_cb(gras_msg_cb_ctx_t ctx, void *payload) {
   worker_data_t g = gras_userdata_get();
   unsigned int cursor;
   xbt_workload_elm_t cmd;
 static int worker_go_cb(gras_msg_cb_ctx_t ctx, void *payload) {
   worker_data_t g = gras_userdata_get();
   unsigned int cursor;
   xbt_workload_elm_t cmd;
-  xbt_ex_t e;
   const char *myname=gras_os_myname();
   const char *myname=gras_os_myname();
-  xbt_dynar_foreach(g->commands,cursor,cmd) {
-    xbt_workload_data_chunk_t chunk;
+  xbt_dynar_t cmd_to_go = xbt_dynar_new(sizeof(xbt_workload_elm_t),NULL);
 
 
+  xbt_dynar_foreach(g->commands,cursor,cmd) {
     if (!strcmp(cmd->who,myname)) {
       char *c = xbt_workload_elm_to_string(cmd);
       INFO1("%s",c);
     if (!strcmp(cmd->who,myname)) {
       char *c = xbt_workload_elm_to_string(cmd);
       INFO1("%s",c);
@@ -157,24 +181,29 @@ static int worker_go_cb(gras_msg_cb_ctx_t ctx, void *payload) {
 
       switch (cmd->action) {
       case XBT_WORKLOAD_COMPUTE:
 
       switch (cmd->action) {
       case XBT_WORKLOAD_COMPUTE:
+        /* If any communication were queued, do them in parallel */
+        if (xbt_dynar_length(cmd_to_go)){
+          xbt_dynar_dopar(cmd_to_go,do_command);
+          INFO0("Communications all done");
+          xbt_dynar_reset(cmd_to_go);
+        }
+        INFO1("Compute %.f flops",cmd->d_arg);
         gras_cpu_burn(cmd->d_arg);
         break;
       case XBT_WORKLOAD_SEND:
         gras_cpu_burn(cmd->d_arg);
         break;
       case XBT_WORKLOAD_SEND:
-        chunk = xbt_workload_data_chunk_new((int)(cmd->d_arg));
-        gras_msg_send(get_peer_sock(cmd->str_arg),"chunk",&chunk);
-        break;
       case XBT_WORKLOAD_RECV:
       case XBT_WORKLOAD_RECV:
-        TRY {
-          gras_msg_wait(1000000,"chunk",NULL,&chunk);
-        } CATCH(e) {
-          RETHROW2("Exception while waiting for %f bytes from %s: %s",
-                cmd->d_arg,cmd->str_arg);
-        }
-        xbt_workload_data_chunk_free(chunk);
+        /* queue communications for later realization in parallel */
+        xbt_dynar_push(cmd_to_go,&cmd);
         break;
       }
     }
   }
         break;
       }
     }
   }
+  /* do in parallel any communication still queued */
+  if (xbt_dynar_length(cmd_to_go)){
+    xbt_dynar_dopar(cmd_to_go,do_command);
+    xbt_dynar_reset(cmd_to_go);
+  }
+
   return 0;
 }
 
   return 0;
 }