From 26960d5a5c8b7036f1ea69bd3790978ecfa1da25 Mon Sep 17 00:00:00 2001 From: suter Date: Thu, 5 Nov 2009 22:48:55 +0000 Subject: [PATCH] First implementation of allReduce action (Refactoring will be needed at some point) All involved processes exit at the same time git-svn-id: svn+ssh://scm.gforge.inria.fr/svn/simgrid/simgrid/trunk@6833 48e7efb5-ca39-0410-a469-dd3cf9ba447f --- examples/msg/actions/actions.c | 120 ++++++++++++++++++--- examples/msg/actions/actions_allReduce.txt | 9 ++ examples/msg/actions/actions_bcast.txt | 9 +- 3 files changed, 122 insertions(+), 16 deletions(-) create mode 100644 examples/msg/actions/actions_allReduce.txt diff --git a/examples/msg/actions/actions.c b/examples/msg/actions/actions.c index 9f79b73029..ca1ee5a881 100644 --- a/examples/msg/actions/actions.c +++ b/examples/msg/actions/actions.c @@ -18,6 +18,7 @@ int communicator_size=0; typedef struct coll_ctr_t{ int bcast_counter; int reduce_counter; + int allReduce_counter; } *coll_ctr; /* Helper function */ @@ -63,7 +64,7 @@ static void Isend(xbt_dynar_t action) m_process_t comm_helper; INFO1("Isend on %s: spawn process ", - MSG_process_self()->name); + MSG_process_get_name(MSG_process_self())); myargv = (char**) calloc (3, sizeof (char*)); @@ -71,7 +72,7 @@ static void Isend(xbt_dynar_t action) myargv[1] = xbt_strdup(size); myargv[2] = NULL; - sprintf(spawn_name,"%s_wait",MSG_process_self()->name); + sprintf(spawn_name,"%s_wait",MSG_process_get_name(MSG_process_self())); comm_helper = MSG_process_create_with_arguments(spawn_name, spawned_send, NULL, MSG_host_self(), 2, myargv); @@ -94,12 +95,12 @@ static void recv(xbt_dynar_t action) static int spawned_recv(int argc, char *argv[]) { m_task_t task = NULL; - char* name = (char *) MSG_process_self()->data; + char* name = (char *) MSG_process_get_data(MSG_process_self()); INFO1("Receiving on %s", name); MSG_task_receive(&task, name); INFO1("Received %s", MSG_task_get_name(task)); - INFO1("waiter on %s", MSG_process_self()->name); - MSG_task_send(MSG_task_create("waiter",0,0,NULL),MSG_process_self()->name); + INFO1("waiter on %s", MSG_process_get_name(MSG_process_self())); + MSG_task_send(MSG_task_create("waiter",0,0,NULL),MSG_process_get_name(MSG_process_self())); MSG_task_destroy(task); return 0; @@ -114,9 +115,9 @@ static void Irecv(xbt_dynar_t action) INFO1("Irecv on %s: spawn process ", MSG_process_get_name(MSG_process_self())); - sprintf(name,"%s_wait",MSG_process_self()->name); + sprintf(name,"%s_wait",MSG_process_get_name(MSG_process_self())); comm_helper = MSG_process_create(name,spawned_recv, - (void *) MSG_process_self()->name, + (void *) MSG_process_get_name(MSG_process_self()), MSG_host_self()); @@ -131,7 +132,7 @@ static void wait(xbt_dynar_t action) m_task_t task = NULL; INFO1("wait: %s", name); - sprintf(task_name,"%s_wait",MSG_process_self()->name); + sprintf(task_name,"%s_wait",MSG_process_get_name(MSG_process_self())); MSG_task_receive(&task,task_name); MSG_task_destroy(task); INFO1("waited: %s", name); @@ -165,9 +166,7 @@ static void reduce(xbt_dynar_t action) xbt_assert0(communicator_size, "Size of Communicator is not defined" ", can't use collective operations"); - MSG_process_self()->data=NULL; - - process_name = MSG_process_self()->name; + process_name = MSG_process_get_name(MSG_process_self()); if (!counters){ DEBUG0("Initialize the counters"); @@ -228,9 +227,8 @@ static void bcast (xbt_dynar_t action) xbt_assert0(communicator_size, "Size of Communicator is not defined" ", can't use collective operations"); - MSG_process_self()->data=NULL; - process_name = MSG_process_self()->name; + process_name = MSG_process_get_name(MSG_process_self()); if (!counters){ DEBUG0("Initialize the counters"); counters = (coll_ctr) calloc (1, sizeof(struct coll_ctr_t)); @@ -286,6 +284,101 @@ static void sleep(xbt_dynar_t action) free(name); } +static void allReduce(xbt_dynar_t action) +{ + int i; + char *name; + char task_name[80]; + char spawn_name[80]; + char **myargv; + char *comm_size = xbt_dynar_get_as(action, 2, char *); + char *comp_size = xbt_dynar_get_as(action, 3, char *); + m_process_t comm_helper=NULL; + m_task_t task=NULL, comp_task=NULL; + const char* process_name; + + coll_ctr counters = (coll_ctr) MSG_process_get_data(MSG_process_self()); + + xbt_assert0(communicator_size, "Size of Communicator is not defined" + ", can't use collective operations"); + + process_name = MSG_process_get_name(MSG_process_self()); + + if (!counters){ + DEBUG0("Initialize the counters"); + counters = (coll_ctr) calloc (1, sizeof(struct coll_ctr_t)); + } + + name = bprintf("allReduce_%d", counters->allReduce_counter++); + + if (!strcmp(process_name, "process0")){ + INFO2("%s: %s is the Root",name, process_name); + for(i=1;i