Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
GRAS replayer partially works. Need a real platform to test it now
author mquinson <mquinson@48e7efb5-ca39-0410-a469-dd3cf9ba447f>
Tue, 13 Oct 2009 13:53:16 +0000 (13:53 +0000)
committer mquinson <mquinson@48e7efb5-ca39-0410-a469-dd3cf9ba447f>
Tue, 13 Oct 2009 13:53:16 +0000 (13:53 +0000)
git-svn-id: svn+ssh://scm.gforge.inria.fr/svn/simgrid/simgrid/trunk@6757 48e7efb5-ca39-0410-a469-dd3cf9ba447f

examples/gras/replay/replay.c
examples/gras/replay/replay.xml
examples/gras/replay/workload.h
examples/gras/replay/xbt_workload.c

index 327c818..a7ff823 100644 (file)
@@ -23,44 +23,199 @@ int worker(int argc,char *argv[]);
 XBT_LOG_NEW_DEFAULT_CATEGORY(replay, "Messages specific to this example");
 
 static void declare_msg() {
-  gras_datadesc_type_t command_assignment_type;
-
-
+  amok_pm_init(); /* shared setup: master() and worker() both call declare_msg() right after gras_init() */
+  xbt_workload_declare_datadesc(); /* registers the "xbt_workload_elm_t" and "xbt_workload_data_chunk_t" descriptions looked up by name below */
+  gras_msgtype_declare("go",NULL); /* "go": declared with a NULL payload description */
+  gras_msgtype_declare("commands", /* "commands": a dynar of xbt_workload_elm_t */
+      gras_datadesc_dynar(
+          gras_datadesc_by_name("xbt_workload_elm_t"),xbt_workload_elm_free_voidp)); /* presumably xbt_workload_elm_free_voidp is the per-element free function -- TODO confirm */
+  gras_msgtype_declare("chunk",gras_datadesc_by_name("xbt_workload_data_chunk_t")); /* "chunk": one xbt_workload_data_chunk_t */
 }
 
 int master(int argc,char *argv[]) {
+  /* master process: parses the trace named by argv[1], waits for one peer per distinct name found in it, sends the command list to the peers, then sends them "go" */
   gras_init(&argc,argv);
-  amok_pm_init();
+  declare_msg(); /* message types are declared before the server socket is opened */
+
 
   xbt_assert0(argc==3,"usage: replay_master tracefile port");
   gras_socket_server(atoi(argv[2])); /* open my master socket, even if I don't use it */
   xbt_dynar_t peers = amok_pm_group_new("replay");            /* group of slaves */
+  xbt_peer_t peer;
   xbt_dynar_t cmds = xbt_workload_parse_file(argv[1]);
   xbt_workload_sort_who_date(cmds);
   unsigned int cursor;
   xbt_workload_elm_t cmd;
 
+  xbt_ex_t e;
+  xbt_dict_cursor_t dict_cursor;
+  /* pals_int is used as a set: one key per distinct cmd->who value of the trace */
+  xbt_dict_t pals_int=xbt_dict_new();
   xbt_dynar_foreach(cmds,cursor,cmd) {
-    char *str = xbt_workload_elm_to_string(cmd);
-    INFO1("%s",str);
-    free(str);
+    int *p = xbt_dict_get_or_null(pals_int,cmd->who);
+    if (!p) { /* first command seen for this name */
+      p=(int*)0xBEAF;
+      xbt_dict_set(pals_int,cmd->who,&p,NULL); /* NOTE(review): stores the address of the local p, not 0xBEAF; only its being non-NULL is relied upon in this function */
+    }
   }
 
   /* friends, we're ready. Come and play */
-  INFO0("Wait for peers for 5 sec");
-  gras_msg_handleall(5);
-  INFO1("Got %ld pals", xbt_dynar_length(peers));
-
+  INFO1("Wait for peers for a while. I need %d peers",xbt_dict_size(pals_int));
+  while (xbt_dynar_length(peers)<xbt_dict_size(pals_int)) { /* loop until the group holds as many peers as there are distinct names in the trace */
+    TRY {
+      gras_msg_handle(20); /* presumably a 20 second timeout -- TODO confirm the unit */
+    } CATCH(e) { /* on failure: log the awaited names that are still missing, then propagate the exception */
+      xbt_dynar_foreach(peers,cursor,peer){
+        xbt_dict_remove(pals_int,peer->name); /* drop the names of those who did join */
+      }
+      char *peer_name;
+      void *data;
+      xbt_dict_foreach(pals_int,dict_cursor,peer_name,data) {
+        INFO1("Peer %s didn't showed up",peer_name);
+      }
+      RETHROW;
+    }
+  }
+  INFO1("Got my %ld peers", xbt_dynar_length(peers));
+  xbt_dict_free(&pals_int);
+
+  /* Check who came */
+  xbt_dict_t pals = xbt_dict_new(); /* peer name -> client socket towards that peer */
+  gras_socket_t pal;
+  xbt_dynar_foreach(peers,cursor, peer) {
+    //INFO1("%s is here",peer->name);
+    gras_socket_t sock = gras_socket_client(peer->name,peer->port);
+    xbt_dict_set(pals,peer->name,sock,NULL); /* NOTE(review): NULL free function, and neither the sockets nor the pals dict are released later in this function */
+  }
+  /* check that we have a dude for every element of the trace */
+  xbt_dynar_foreach(cmds,cursor,cmd) {
+    if (0) { /* disabled debug dump of the command */
+      char *str = xbt_workload_elm_to_string(cmd);
+      INFO1("%s",str);
+      free(str);
+    }
+
+    pal=xbt_dict_get_or_null(pals,cmd->who);
+    if (!pal) { /* a name of the trace has no connected peer: clean up and abort the process */
+      CRITICAL1("Process %s didn't came! Abording!",cmd->who);
+      amok_pm_group_shutdown("replay");
+      xbt_dynar_free(&cmds);
+      gras_exit();
+      abort();
+    }
+    gras_msg_send(pal,"commands",&cmds); /* NOTE(review): executed once per command, so a pal receives the whole list as many times as it has commands; once per pal looks intended */
+  }
+  INFO0("Sent commands to every processes. Let them start now");
+  xbt_dict_cursor_t dict_it;
+  char *pal_name;
+  xbt_dict_foreach(pals,dict_it,pal_name,pal) {
+    gras_msg_send(pal,"go",NULL); /* one "go" per connected pal */
+  }
+  INFO0("They should be started by now.");
 
   /* Done, exiting */
+  amok_pm_group_shutdown("replay");
   xbt_dynar_free(&cmds);
   gras_exit();
   return 0;
 }
 
+typedef struct { /* per-worker state, allocated with gras_userdata_new() in worker() and read back with gras_userdata_get() */
+  xbt_dynar_t commands; /* command list stored by worker_commands_cb() */
+  xbt_dict_t peers; /* peer name -> socket, filled on demand by get_peer_sock() */
+  gras_socket_t mysock; /* server socket opened by worker() on port 4000 */
+} s_worker_data_t,*worker_data_t;
+
+
+static gras_socket_t get_peer_sock(char*peer) { /* returns the socket towards the worker named peer, opening and caching it on first use */
+  worker_data_t g = gras_userdata_get();
+  gras_socket_t peer_sock = xbt_dict_get_or_null(g->peers,peer); /* NULL when no socket was cached for this name yet */
+  if (!peer_sock) {
+    peer_sock = gras_socket_client(peer,4000); /* 4000 is the port every worker opens in worker() */
+    xbt_dict_set(g->peers,peer,peer_sock,gras_socket_close_voidp); /* presumably the dict closes the socket when the entry is freed -- TODO confirm */
+  }
+  return peer_sock;
+}
+static int worker_commands_cb(gras_msg_cb_ctx_t ctx, void *payload) { /* callback registered for "commands" in worker(): keeps the received list in the worker state */
+  worker_data_t g = gras_userdata_get();
+  g->commands = *(xbt_dynar_t*)payload; /* payload is read as a pointer to the received dynar; NOTE(review): a list stored by an earlier call is overwritten without being freed */
+  return 0;
+}
+static int worker_go_cb(gras_msg_cb_ctx_t ctx, void *payload) { /* callback registered for "go" in worker(): executes, in list order, the commands whose who field names this process */
+  worker_data_t g = gras_userdata_get();
+  unsigned int cursor;
+  xbt_workload_elm_t cmd;
+  xbt_ex_t e;
+  const char *myname=gras_os_myname();
+  xbt_dynar_foreach(g->commands,cursor,cmd) {
+    xbt_workload_data_chunk_t chunk;
+    /* commands addressed to other processes are skipped */
+    if (!strcmp(cmd->who,myname)) {
+      char *c = xbt_workload_elm_to_string(cmd); /* log the command before running it */
+      INFO1("%s",c);
+      free(c);
+
+      switch (cmd->action) { /* no default case: any other action value is silently ignored */
+      case XBT_WORKLOAD_COMPUTE:
+        gras_cpu_burn(cmd->d_arg); /* d_arg is an amount of flops according to the field comment in workload.h */
+        break;
+      case XBT_WORKLOAD_SEND:
+        chunk = xbt_workload_data_chunk_new((int)(cmd->d_arg)); /* d_arg is a byte count here, truncated to int */
+        gras_msg_send(get_peer_sock(cmd->str_arg),"chunk",&chunk); /* str_arg names the receiver; NOTE(review): chunk is not freed after the send in this function */
+        break;
+      case XBT_WORKLOAD_RECV:
+        TRY {
+          gras_msg_wait(1000000,"chunk",NULL,&chunk); /* str_arg (the expected sender) is not passed to the wait, it is only used in the error message below */
+        } CATCH(e) {
+          RETHROW2("Exception while waiting for %f bytes from %s: %s", /* presumably RETHROW2 supplies the original message for the last %s -- TODO confirm */
+                cmd->d_arg,cmd->str_arg);
+        }
+        xbt_workload_data_chunk_free(chunk); /* the received data is discarded */
+        break;
+      }
+    }
+  }
+  return 0;
+}
+
 int worker(int argc,char *argv[]) {
+  worker_data_t globals;
   gras_init(&argc,argv);
-  amok_pm_init();
+  declare_msg();
+  globals = gras_userdata_new(s_worker_data_t);
+  /* Create the connexions */
+  globals->mysock = gras_socket_server(4000); /* FIXME: shouldn't be hardcoded */
+  gras_socket_t master;
+  int connected=0;
+
+  gras_cb_register("commands", worker_commands_cb);
+  gras_cb_register("go", worker_go_cb);
+  globals->peers=xbt_dict_new();
+
+  if (gras_if_RL())
+    INFO2("Sensor %s starting. Connecting to master on %s", gras_os_myname(), argv[1]);
+  while (!connected) {
+    xbt_ex_t e;
+    TRY {
+      master = gras_socket_client_from_string(argv[1]);
+      connected = 1;
+    }
+    CATCH(e) {
+      if (e.category != system_error)
+        RETHROW;
+      xbt_ex_free(e);
+      INFO0("Failed to connect. Retry in 0.5 second");
+      gras_os_sleep(0.5);
+    }
+  }
+  /* Join and run the group */
+  amok_pm_group_join(master, "replay", -1);
+  amok_pm_mainloop(600);
+
+  gras_socket_close(globals->mysock);
+  xbt_dynar_free(&(globals->commands));
+  xbt_dict_free(&(globals->peers));
+  free(globals);
 
   gras_exit();
   return 0;
index 27a3d75..b30850e 100644 (file)
@@ -1,12 +1,34 @@
 <?xml version='1.0'?>
 <!DOCTYPE platform SYSTEM "simgrid.dtd">
 <platform version="2">
-  <process host="Tremblay" function="master">
-     <argument value="Montage_25.xml" /> <!-- trace file -->
-     <argument value="4000"/>            <!-- port number -->
-  </process>
-  <process host="Fafard" function="worker">
-     <argument value="Tremblay"/>   <!-- server host -->
-     <argument value="4000"/>       <!-- port number -->
+  <process host="Dodge" function="master">
+     <argument value="Montage_25.trace" /> <!-- trace file -->
+     <argument value="4500"/>            <!-- port number -->
   </process>
+  <process host="Bescherelle" function="worker"><argument value="Dodge:4500"/></process>
+  <process host="Browne" function="worker"><argument value="Dodge:4500"/></process>
+  <process host="Croteau" function="worker"><argument value="Dodge:4500"/></process>
+  <process host="Dodge" function="worker"><argument value="Dodge:4500"/></process>
+  <process host="Ethernet" function="worker"><argument value="Dodge:4500"/></process>
+  <process host="Fafard" function="worker"><argument value="Dodge:4500"/></process>
+  <process host="Fernand" function="worker"><argument value="Dodge:4500"/></process>
+  <process host="Gaston" function="worker"><argument value="Dodge:4500"/></process>
+  <process host="Intel" function="worker"><argument value="Dodge:4500"/></process>
+  <process host="Jackson" function="worker"><argument value="Dodge:4500"/></process>
+  <process host="Jacquelin" function="worker"><argument value="Dodge:4500"/></process>
+  <process host="Jacques" function="worker"><argument value="Dodge:4500"/></process>
+  <process host="Jean_Yves" function="worker"><argument value="Dodge:4500"/></process>
+  <process host="Jill" function="worker"><argument value="Dodge:4500"/></process>
+  <process host="Jocelyne" function="worker"><argument value="Dodge:4500"/></process>
+  <process host="Juneau" function="worker"><argument value="Dodge:4500"/></process>
+  <process host="Kuenning" function="worker"><argument value="Dodge:4500"/></process>
+  <process host="Laroche" function="worker"><argument value="Dodge:4500"/></process>
+  <process host="Mathematica" function="worker"><argument value="Dodge:4500"/></process>
+  <process host="Monique" function="worker"><argument value="Dodge:4500"/></process>
+  <process host="Olivier" function="worker"><argument value="Dodge:4500"/></process>
+  <process host="Papineau" function="worker"><argument value="Dodge:4500"/></process>
+  <process host="Pronovost" function="worker"><argument value="Dodge:4500"/></process>
+  <process host="Provost" function="worker"><argument value="Dodge:4500"/></process>
+  <process host="Stephen" function="worker"><argument value="Dodge:4500"/></process>
+  <process host="TeX" function="worker"><argument value="Dodge:4500"/></process>
 </platform>
index e4e3543..67f0c0c 100644 (file)
@@ -21,6 +21,8 @@
 
 /* one command to do */
 typedef struct {
+  /* keep it in sync with function xbt_workload_declare_datadesc() */
+
   char *who;      /* the slave who should do it */
   char *comment;  /* a comment placed at the end of the line, if any */
   int action;     /* 0: compute(darg flops); 1: send darg bytes to strarg; 2: recv darg bytes from strarg */
@@ -37,4 +39,14 @@ XBT_PUBLIC(int) xbt_workload_elm_cmp_who_date(const void* _c1, const void* _c2);
 XBT_PUBLIC(void) xbt_workload_sort_who_date(xbt_dynar_t c);
 XBT_PUBLIC(xbt_dynar_t) xbt_workload_parse_file(char *filename);
 
+XBT_PUBLIC(void) xbt_workload_declare_datadesc(void); /* declares the data descriptions named "xbt_workload_elm_t" and "xbt_workload_data_chunk_t" */
+
+
+typedef struct { /* data exchanged in "chunk" messages; keep in sync with xbt_workload_declare_datadesc() */
+  int size; /* described before chunk, and pushed by the data description to give the length of chunk */
+  char *chunk; /* heap buffer, released by xbt_workload_data_chunk_free() */
+} s_xbt_workload_data_chunk_t,*xbt_workload_data_chunk_t;
+XBT_PUBLIC(xbt_workload_data_chunk_t) xbt_workload_data_chunk_new(int size); /* allocates a descriptor and its buffer */
+XBT_PUBLIC(void) xbt_workload_data_chunk_free(xbt_workload_data_chunk_t c); /* frees the buffer, then the descriptor */
+
 #endif /* XBT_WORKLOAD_H_ */
index ab22b50..1dbdff7 100644 (file)
@@ -12,6 +12,7 @@
 #include "xbt/sysdep.h"
 #include "xbt/str.h"
 #include "workload.h"
+#include "gras/datadesc.h"
 
 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(xbt_workload,xbt, "Workload characterisation mecanisms");
 
@@ -154,3 +155,40 @@ xbt_dynar_t xbt_workload_parse_file(char *filename) {
   xbt_dynar_free(&in);
   return cmds;
 }
+
+
+void xbt_workload_declare_datadesc(void) { /* declares the data descriptions of the two workload structures; workload.h asks to keep the fields in sync with this function */
+  gras_datadesc_type_t ddt;
+
+  ddt = gras_datadesc_struct("s_xbt_workload_elm_t"); /* one command of the trace */
+  gras_datadesc_struct_append(ddt,"who",gras_datadesc_by_name("string"));
+  gras_datadesc_struct_append(ddt,"comment",gras_datadesc_by_name("string"));
+  gras_datadesc_struct_append(ddt,"action",gras_datadesc_by_name("int"));
+  gras_datadesc_struct_append(ddt,"date",gras_datadesc_by_name("double"));
+  gras_datadesc_struct_append(ddt,"d_arg",gras_datadesc_by_name("double"));
+  gras_datadesc_struct_append(ddt,"str_arg",gras_datadesc_by_name("string"));
+  gras_datadesc_struct_close(ddt);
+
+  gras_datadesc_ref("xbt_workload_elm_t",ddt); /* presumably declares the pointer type as a reference to the struct above -- TODO confirm */
+
+  ddt = gras_datadesc_struct("s_xbt_workload_data_chunk_t"); /* payload of the "chunk" message */
+  gras_datadesc_struct_append(ddt,"size",gras_datadesc_by_name("int"));
+  gras_datadesc_cb_field_push(ddt, "size"); /* presumably remembers the value of size so that it can give the array length below -- TODO confirm */
+  gras_datadesc_struct_append(ddt,"chunk",gras_datadesc_ref_pop_arr(gras_datadesc_by_name("char"))); /* chunk: reference to a char array whose length is presumably the value pushed above */
+  gras_datadesc_struct_close(ddt);
+
+  gras_datadesc_ref("xbt_workload_data_chunk_t",ddt);
+}
+
+
+
+/**
+ * Allocate a data chunk standing for a message of about `size` bytes.
+ *
+ * The payload length is `size` minus the footprint of the descriptor
+ * (one pointer plus one int), clamped at zero. That length is what gets
+ * stored in res->size, so the field always matches the buffer really
+ * allocated: xbt_workload_declare_datadesc() pushes the `size` field to
+ * give the length of `chunk`, so advertising more than what was allocated
+ * would presumably make the marshalling code read past the buffer.
+ *
+ * The buffer content is left uninitialized. Release the result with
+ * xbt_workload_data_chunk_free().
+ */
+xbt_workload_data_chunk_t xbt_workload_data_chunk_new(int size) {
+  xbt_workload_data_chunk_t res = xbt_new0(s_xbt_workload_data_chunk_t,1);
+  const int overhead = (int)(sizeof(res) + sizeof(int));
+
+  /* requests smaller than the descriptor get an empty payload instead of
+   * an unsigned wrap-around into a gigantic allocation */
+  res->size = size > overhead ? size - overhead : 0;
+  /* always request at least one byte so the allocation is never of size 0 */
+  res->chunk = xbt_new(char, res->size > 0 ? res->size : 1);
+  return res;
+}
+void xbt_workload_data_chunk_free(xbt_workload_data_chunk_t c) { /* releases a chunk; c must not be NULL since c->chunk is read */
+  free(c->chunk); /* the buffer first */
+  free(c); /* then the descriptor itself */
+}