Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Various fixes. Working (in SG) for ForkJoins, but not for Montage.
authormquinson <mquinson@48e7efb5-ca39-0410-a469-dd3cf9ba447f>
Thu, 15 Oct 2009 16:26:07 +0000 (16:26 +0000)
committermquinson <mquinson@48e7efb5-ca39-0410-a469-dd3cf9ba447f>
Thu, 15 Oct 2009 16:26:07 +0000 (16:26 +0000)
* Change synchronization scheme for ending: workers send go to master
  instead of having the master calling amok_pm_group_end() right after
  the setup. This killed the workers ways too early.
* make sure that workers do create all mandatory sockets to others
  since dopar threads have real difficulties speaking to the listener
  thread to add new sockets
* instead of having a callback to the go message in the worker which
  does all the work, do explicitely wait for that message and move the
  code in worker's main()

Like I said, there is still a deadlock when trying to replay
Montage_25, but this may related to the fact that this workflow seems
somehow broken. the "cimages.tbl" file, generated by root and consumed
by ID00021@mImgTbl is mangled in the dot output, and thus presumably
also in memory representation. More to come tomorrow.

git-svn-id: svn+ssh://scm.gforge.inria.fr/svn/simgrid/simgrid/trunk@6792 48e7efb5-ca39-0410-a469-dd3cf9ba447f

examples/gras/replay/replay.c

index f7b23fb..4f79090 100644 (file)
@@ -18,6 +18,8 @@
 #include "amok/peermanagement.h"
 #include <stdio.h>
 
+#include "simix/simix.h"
+
 int master(int argc,char *argv[]);
 int worker(int argc,char *argv[]);
 
@@ -90,12 +92,6 @@ int master(int argc,char *argv[]) {
   }
   /* check that we have a dude for every element of the trace */
   xbt_dynar_foreach(cmds,cursor,cmd) {
-    if (0) {
-      char *str = xbt_workload_elm_to_string(cmd);
-      INFO1("%s",str);
-      free(str);
-    }
-
     pal=xbt_dict_get_or_null(pals,cmd->who);
     if (!pal) {
       CRITICAL1("Process %s didn't came! Abording!",cmd->who);
@@ -104,18 +100,25 @@ int master(int argc,char *argv[]) {
       gras_exit();
       abort();
     }
+  }
+  /* Send the commands to every pal */
+  char *pal_name;
+  xbt_dict_foreach(pals,dict_cursor,pal_name,pal) {
     gras_msg_send(pal,"commands",&cmds);
   }
   INFO0("Sent commands to every processes. Let them start now");
   xbt_dict_cursor_t dict_it;
-  char *pal_name;
   xbt_dict_foreach(pals,dict_it,pal_name,pal) {
     gras_msg_send(pal,"go",NULL);
   }
-  INFO0("They should be started by now.");
+  INFO0("They should be started by now. Wait for their completion.");
+
+  for (cursor=0;cursor<xbt_dynar_length(peers);cursor++) {
+    gras_msg_wait(-1,"go",NULL,NULL);
+  }
 
   /* Done, exiting */
-  amok_pm_group_shutdown("replay");
+  //amok_pm_group_shutdown("replay");
   xbt_dynar_free(&cmds);
   gras_exit();
   return 0;
@@ -132,6 +135,7 @@ static gras_socket_t get_peer_sock(char*peer) {
   worker_data_t g = gras_userdata_get();
   gras_socket_t peer_sock = xbt_dict_get_or_null(g->peers,peer);
   if (!peer_sock) {
+    INFO1("Create a socket to %s",peer);
     peer_sock = gras_socket_client(peer,4000);
     xbt_dict_set(g->peers,peer,peer_sock,NULL);//gras_socket_close_voidp);
   }
@@ -148,66 +152,31 @@ static void do_command(int rank, void*c) {
   xbt_workload_data_chunk_t chunk;
 
   if (cmd->action == XBT_WORKLOAD_SEND) {
+    gras_socket_t sock = get_peer_sock(cmd->str_arg);
     chunk = xbt_workload_data_chunk_new((int)(cmd->d_arg));
-    INFO2("Send %.f bytes to %s",cmd->d_arg,cmd->str_arg);
-    gras_msg_send(get_peer_sock(cmd->str_arg),"chunk",&chunk);
+    INFO4("Send %.f bytes to %s %s:%d",cmd->d_arg,cmd->str_arg,
+        gras_socket_peer_name(sock),gras_socket_peer_port(sock));
+    gras_msg_send_(sock,gras_msgtype_by_name("chunk"),&chunk);
+    INFO2("Done sending %.f bytes to %s",cmd->d_arg,cmd->str_arg);
 
   } else if (cmd->action == XBT_WORKLOAD_RECV) {
     INFO2("Recv %.f bytes from %s",cmd->d_arg,cmd->str_arg);
     TRY {
       gras_msg_wait(1000000,"chunk",NULL,&chunk);
     } CATCH(e) {
+      SIMIX_display_process_status();
       RETHROW2("Exception while waiting for %f bytes from %s: %s",
             cmd->d_arg,cmd->str_arg);
     }
     xbt_workload_data_chunk_free(chunk);
+    INFO2("Done receiving %.f bytes from %s",cmd->d_arg,cmd->str_arg);
 
   } else {
     xbt_die(bprintf("unknown command: %s",xbt_workload_elm_to_string(cmd)));
   }
 }
-static int worker_go_cb(gras_msg_cb_ctx_t ctx, void *payload) {
-  worker_data_t g = gras_userdata_get();
-  unsigned int cursor;
-  xbt_workload_elm_t cmd;
-  const char *myname=gras_os_myname();
-  xbt_dynar_t cmd_to_go = xbt_dynar_new(sizeof(xbt_workload_elm_t),NULL);
-
-  xbt_dynar_foreach(g->commands,cursor,cmd) {
-    if (!strcmp(cmd->who,myname)) {
-      char *c = xbt_workload_elm_to_string(cmd);
-      INFO1("%s",c);
-      free(c);
-
-      switch (cmd->action) {
-      case XBT_WORKLOAD_COMPUTE:
-        /* If any communication were queued, do them in parallel */
-        if (xbt_dynar_length(cmd_to_go)){
-          xbt_dynar_dopar(cmd_to_go,do_command);
-          INFO0("Communications all done");
-          xbt_dynar_reset(cmd_to_go);
-        }
-        INFO1("Compute %.f flops",cmd->d_arg);
-        gras_cpu_burn(cmd->d_arg);
-        break;
-      case XBT_WORKLOAD_SEND:
-      case XBT_WORKLOAD_RECV:
-        /* queue communications for later realization in parallel */
-        xbt_dynar_push(cmd_to_go,&cmd);
-        break;
-      }
-    }
-  }
-  /* do in parallel any communication still queued */
-  if (xbt_dynar_length(cmd_to_go)){
-    xbt_dynar_dopar(cmd_to_go,do_command);
-    xbt_dynar_reset(cmd_to_go);
-  }
-
-  return 0;
-}
-
 int worker(int argc,char *argv[]) {
+  xbt_ex_t e;
   worker_data_t globals;
   gras_init(&argc,argv);
   declare_msg();
@@ -218,7 +187,6 @@ int worker(int argc,char *argv[]) {
   int connected=0;
 
   gras_cb_register("commands", worker_commands_cb);
-  gras_cb_register("go", worker_go_cb);
   globals->peers=xbt_dict_new();
 
   if (gras_if_RL())
@@ -239,7 +207,58 @@ int worker(int argc,char *argv[]) {
   }
   /* Join and run the group */
   amok_pm_group_join(master, "replay", -1);
-  amok_pm_mainloop(600);
+  gras_msg_handle(60); // command message
+  gras_msg_wait(60,"go",NULL,NULL);
+  {
+    worker_data_t g = gras_userdata_get();
+    unsigned int cursor;
+    xbt_workload_elm_t cmd;
+    const char *myname=gras_os_myname();
+    xbt_dynar_t cmd_to_go = xbt_dynar_new(sizeof(xbt_workload_elm_t),NULL);
+
+    xbt_dynar_foreach(g->commands,cursor,cmd) {
+      if (!strcmp(cmd->who,myname)) {
+        char *c = xbt_workload_elm_to_string(cmd);
+  //      INFO1("TODO: %s",c);
+        free(c);
+
+        switch (cmd->action) {
+        case XBT_WORKLOAD_COMPUTE:
+          /* If any communication were queued, do them in parallel */
+          if (xbt_dynar_length(cmd_to_go)){
+            TRY {
+              xbt_dynar_dopar(cmd_to_go,do_command);
+              xbt_dynar_reset(cmd_to_go);
+            } CATCH(e) {
+              SIMIX_display_process_status();
+            }
+            INFO0("Communications all done");
+            xbt_dynar_reset(cmd_to_go);
+          }
+          INFO1("Compute %.f flops",cmd->d_arg);
+          gras_cpu_burn(cmd->d_arg);
+          INFO1("Done computing %.f flops",cmd->d_arg);
+          break;
+        case XBT_WORKLOAD_SEND:
+          /* Create the socket from main thread since it seems to fails when done from dopar thread */
+          get_peer_sock(cmd->str_arg);
+        case XBT_WORKLOAD_RECV:
+          /* queue communications for later realization in parallel */
+          xbt_dynar_push(cmd_to_go,&cmd);
+          break;
+        }
+      }
+    }
+    /* do in parallel any communication still queued */
+    INFO1("Do %ld pending communications after end of TODO list",xbt_dynar_length(cmd_to_go));
+    if (xbt_dynar_length(cmd_to_go)){
+      xbt_dynar_dopar(cmd_to_go,do_command);
+      xbt_dynar_reset(cmd_to_go);
+    }
+  }
+
+  gras_msg_send(master,"go",NULL);
+//  amok_pm_group_leave(master, "replay");
 
   gras_socket_close(globals->mysock);
   xbt_dynar_free(&(globals->commands));