Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
add a flat broadcast action
authorsuter <suter@48e7efb5-ca39-0410-a469-dd3cf9ba447f>
Thu, 29 Oct 2009 20:59:50 +0000 (20:59 +0000)
committersuter <suter@48e7efb5-ca39-0410-a469-dd3cf9ba447f>
Thu, 29 Oct 2009 20:59:50 +0000 (20:59 +0000)
requires a comm_size action to set the size of the MPI_COMM_WORLD
communicator
update the actions_with_isend.txt to include the Isend action

git-svn-id: svn+ssh://scm.gforge.inria.fr/svn/simgrid/simgrid/trunk@6823 48e7efb5-ca39-0410-a469-dd3cf9ba447f

examples/msg/actions/actions.c
examples/msg/actions/actions_bcast.txt [new file with mode: 0644]
examples/msg/actions/actions_with_isend.txt
examples/msg/actions/bcast_deployment.xml [new file with mode: 0644]

index 205aa75..e57369a 100644 (file)
 
 XBT_LOG_NEW_DEFAULT_CATEGORY(msg_test,
                  "Messages specific for this msg example");
 
 XBT_LOG_NEW_DEFAULT_CATEGORY(msg_test,
                  "Messages specific for this msg example");
+int communicator_size=0;
 
 
+typedef struct coll_ctr_t{
+  int bcast_counter;
+} *coll_ctr;
 
 /* Helper function */
 static double parse_double(const char *string) {
 
 /* Helper function */
 static double parse_double(const char *string) {
@@ -45,8 +49,8 @@ static int spawned_send(int argc, char *argv[])
   char *name = xbt_str_join(action, " ");
   char *to = xbt_dynar_get_as(action, 2, char *);
   char *size = xbt_dynar_get_as(action, 3, char *);
   char *name = xbt_str_join(action, " ");
   char *to = xbt_dynar_get_as(action, 2, char *);
   char *size = xbt_dynar_get_as(action, 3, char *);
-  
-  INFO1("Sending on %s", name);
+  INFO3("name is %s, to is %s, sizeis %s", name, to, size);
+  INFO1("Sending on %s\n", name);
   MSG_task_send(MSG_task_create(name, 0, parse_double(size), NULL), to);
   INFO1("Sent %s", name);
   free(name);
   MSG_task_send(MSG_task_create(name, 0, parse_double(size), NULL), to);
   INFO1("Sent %s", name);
   free(name);
@@ -136,6 +140,83 @@ static void barrier (xbt_dynar_t action)
   free(name);
 
 }
   free(name);
 
 }
+
+static int bcast_spawned_send(int argc, char *argv[])
+{
+  char name[80];
+  INFO3("%s: Sending %s on %s", MSG_process_self()->name, 
+       argv[1],argv[0]);
+  MSG_task_send(MSG_task_create(argv[0], 0, parse_double(argv[1]), NULL), 
+               argv[0]);
+  
+  sprintf(name,"%s_wait",argv[0]);
+  return 0;
+}
+
+static void bcast (xbt_dynar_t action)
+{
+  int i;
+  char *name;
+  const char* process_name;
+  char task_name[80];
+  char spawn_name[80];
+  char **myargv;
+  m_process_t comm_helper=NULL;
+  m_task_t task=NULL;
+  char *size = xbt_dynar_get_as(action, 2, char *);
+  coll_ctr counters =  (coll_ctr) MSG_process_get_data(MSG_process_self());
+
+  xbt_assert0(communicator_size, "Size of Communicator is not defined"
+             ", can't use collective operations");
+
+  MSG_process_self()->data=NULL;
+
+  process_name = MSG_process_self()->name;
+  if (!counters){
+    DEBUG0("Initialize the counters");
+    counters = (coll_ctr) calloc (1, sizeof(struct coll_ctr_t));
+  }
+
+  name = bprintf("bcast_%d", counters->bcast_counter++);
+  if (!strcmp(process_name, "process0")){
+    INFO2("%s: %s is the Root",name, process_name);
+
+    for(i=1;i<communicator_size;i++){
+      myargv = (char**) calloc (3, sizeof (char*));
+      myargv[0] = xbt_strdup(name);
+      myargv[1] = xbt_strdup(size);
+      myargv[2] = NULL;
+
+      sprintf(spawn_name,"%s_%d", myargv[0], i);
+      comm_helper = 
+       MSG_process_create_with_arguments(spawn_name, bcast_spawned_send, 
+                                         NULL, MSG_host_self(), 2, myargv);
+    }
+    
+    for(i=1;i<communicator_size;i++){
+      sprintf(task_name,"process%d_wait", i);
+      DEBUG1("get on %s", task_name);
+      MSG_task_receive(&task,task_name);      
+      MSG_task_destroy(task);
+      task=NULL;
+    }
+    INFO2("%s: all messages sent by %s have been received",
+         name, process_name);
+  } else {
+    INFO2("%s: %s receives", name, process_name);
+    MSG_task_receive(&task, name);
+    MSG_task_destroy(task);
+    INFO2("%s: %s has received", name,process_name);
+    sprintf(task_name,"%s_wait", process_name);
+    DEBUG1("put on %s", task_name);
+    MSG_task_send(MSG_task_create("waiter",0,0,NULL),task_name);
+  }
+
+  MSG_process_set_data(MSG_process_self(), (void*)counters);
+  free(name);
+}
+
+
 static void sleep(xbt_dynar_t action)
 {
   char *name = xbt_str_join(action, " ");
 static void sleep(xbt_dynar_t action)
 {
   char *name = xbt_str_join(action, " ");
@@ -146,6 +227,12 @@ static void sleep(xbt_dynar_t action)
   free(name);
 }
 
   free(name);
 }
 
+static void comm_size(xbt_dynar_t action)
+{
+  char *size = xbt_dynar_get_as(action, 2, char *);
+  communicator_size = parse_double(size);
+}
+
 static void compute(xbt_dynar_t action)
 {
   char *name = xbt_str_join(action, " ");
 static void compute(xbt_dynar_t action)
 {
   char *name = xbt_str_join(action, " ");
@@ -162,7 +249,7 @@ static void compute(xbt_dynar_t action)
 int main(int argc, char *argv[])
 {
   MSG_error_t res = MSG_OK;
 int main(int argc, char *argv[])
 {
   MSG_error_t res = MSG_OK;
-
+  
   /* Check the given arguments */
   MSG_global_init(&argc, argv);
   if (argc < 4) {
   /* Check the given arguments */
   MSG_global_init(&argc, argv);
   if (argc < 4) {
@@ -179,12 +266,14 @@ int main(int argc, char *argv[])
   MSG_launch_application(argv[2]);
 
   /*   Action registration */
   MSG_launch_application(argv[2]);
 
   /*   Action registration */
+  MSG_action_register("comm_size", comm_size);
   MSG_action_register("send", send);
   MSG_action_register("Isend", Isend);
   MSG_action_register("recv", recv);
   MSG_action_register("Irecv", Irecv);
   MSG_action_register("wait", wait);
   MSG_action_register("barrier", barrier);
   MSG_action_register("send", send);
   MSG_action_register("Isend", Isend);
   MSG_action_register("recv", recv);
   MSG_action_register("Irecv", Irecv);
   MSG_action_register("wait", wait);
   MSG_action_register("barrier", barrier);
+  MSG_action_register("bcast", bcast);
   MSG_action_register("sleep", sleep);
   MSG_action_register("compute", compute);
 
   MSG_action_register("sleep", sleep);
   MSG_action_register("compute", compute);
 
diff --git a/examples/msg/actions/actions_bcast.txt b/examples/msg/actions/actions_bcast.txt
new file mode 100644 (file)
index 0000000..437ecef
--- /dev/null
@@ -0,0 +1,19 @@
+process0 comm_size 3
+process0 bcast 5e8
+process1 bcast 5e8
+process2 bcast 5e8
+
+process0 compute 5e8
+process1 compute 2e8
+process2 compute 5e8
+
+process0 bcast 5e8
+process1 bcast 5e8
+process2 bcast 5e8
+
+
+process0 bcast 5e8
+process1 bcast 5e8
+process2 bcast 5e8
+
+
index 3e59843..3f0ae37 100644 (file)
@@ -11,4 +11,5 @@ toto send titi 1e9
 titi Irecv
 titi compute 5e8
 titi wait
 titi Irecv
 titi compute 5e8
 titi wait
-titi send tutu 1e9
\ No newline at end of file
+titi Isend tutu 1e9
+titi compute 5e8
diff --git a/examples/msg/actions/bcast_deployment.xml b/examples/msg/actions/bcast_deployment.xml
new file mode 100644 (file)
index 0000000..589bdcb
--- /dev/null
@@ -0,0 +1,7 @@
+<?xml version='1.0'?>
+<!DOCTYPE platform SYSTEM "simgrid.dtd">
+<platform version="2">
+  <process host="Tremblay" function="process0"/>
+  <process host="Jupiter" function="process1"/>
+  <process host="Fafard" function="process2"/>
+</platform>