X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/blobdiff_plain/f2dfa42a467e09f316eec0f0c948b1609c4e1fb8..4cc5d70a3fe754f95107d487e4efcf5bc7cd85a8:/examples/msg/actions/actions.c diff --git a/examples/msg/actions/actions.c b/examples/msg/actions/actions.c index b6b74897a4..44bfb41b7e 100644 --- a/examples/msg/actions/actions.c +++ b/examples/msg/actions/actions.c @@ -9,15 +9,17 @@ #include #include "msg/msg.h" /* Yeah! If you want to use msg, you need to include msg/msg.h */ #include "xbt.h" /* calloc, printf */ - -typedef enum { - LOCAL = 0, - MAX_CHANNEL -} channel_t; +#include "simgrid_config.h" /* getline */ XBT_LOG_NEW_DEFAULT_CATEGORY(msg_test, "Messages specific for this msg example"); +int communicator_size=0; +typedef struct coll_ctr_t{ + int bcast_counter; + int reduce_counter; + int allReduce_counter; +} *coll_ctr; /* Helper function */ static double parse_double(const char *string) { @@ -30,27 +32,76 @@ static double parse_double(const char *string) { return value; } + /* My actions */ static void send(xbt_dynar_t action) { char *name = xbt_str_join(action, " "); - char *to = xbt_dynar_get_as(action, 2, char *); + char to[250]; char *size = xbt_dynar_get_as(action, 3, char *); - INFO2("Send: %s (size: %lg)", name, parse_double(size)); + double clock = MSG_get_clock(); + sprintf (to,"%s_%s", MSG_process_get_name(MSG_process_self()), + xbt_dynar_get_as(action, 2, char *)); + // char *to = xbt_dynar_get_as(action, 2, char *); + DEBUG2("Entering Send: %s (size: %lg)", name, parse_double(size)); MSG_task_send(MSG_task_create(name, 0, parse_double(size), NULL), to); - INFO1("Sent %s", name); + DEBUG2("%s %f", name, MSG_get_clock()-clock); free(name); } + +static int spawned_send(int argc, char *argv[]) +{ + DEBUG3("%s: Sending %s on %s", MSG_process_get_name(MSG_process_self()), + argv[1],argv[0]); + MSG_task_send(MSG_task_create(argv[0], 0, parse_double(argv[1]), NULL), + argv[0]); + return 0; +} + +static void Isend(xbt_dynar_t action) +{ + char spawn_name[80]; + char to[250]; + // char *to = xbt_dynar_get_as(action, 2, char *); + char *size = xbt_dynar_get_as(action, 3, char *); + char **myargv; + m_process_t comm_helper; + double clock = MSG_get_clock(); + DEBUG1("Isend on %s: spawn process ", + MSG_process_get_name(MSG_process_self())); + + sprintf (to,"%s_%s", MSG_process_get_name(MSG_process_self()), + xbt_dynar_get_as(action, 2, char *)); + myargv = (char**) calloc (3, sizeof (char*)); + + myargv[0] = xbt_strdup(to); + myargv[1] = xbt_strdup(size); + myargv[2] = NULL; + + // sprintf(spawn_name,"%s_wait",MSG_process_get_name(MSG_process_self())); + sprintf(spawn_name,"%s_wait",to); + comm_helper = + MSG_process_create_with_arguments(spawn_name, spawned_send, + NULL, MSG_host_self(), 2, myargv); + DEBUG2("%s %f",xbt_str_join(action, " "), MSG_get_clock()-clock); +} + + static void recv(xbt_dynar_t action) { char *name = xbt_str_join(action, " "); + char mailbox_name[250]; m_task_t task = NULL; - INFO1("Receiving: %s", name); + double clock = MSG_get_clock(); //FIXME: argument of action ignored so far; semantic not clear //char *from=xbt_dynar_get_as(action,2,char*); - MSG_task_receive(&task, MSG_process_get_name(MSG_process_self())); - INFO1("Received %s", MSG_task_get_name(task)); + sprintf (mailbox_name,"%s_%s", xbt_dynar_get_as(action, 2, char *), + MSG_process_get_name(MSG_process_self())); + DEBUG1("Receiving: %s", name); + MSG_task_receive(&task, mailbox_name); + // MSG_task_receive(&task, MSG_process_get_name(MSG_process_self())); + DEBUG2("%s %f", name, MSG_get_clock()-clock); MSG_task_destroy(task); free(name); } @@ -58,64 +109,338 @@ static void recv(xbt_dynar_t action) static int spawned_recv(int argc, char *argv[]) { m_task_t task = NULL; - char* name = (char *) MSG_process_get_data(MSG_process_self()); - INFO1("Receiving on %s", name); - MSG_task_receive(&task, name); - INFO1("Received %s", MSG_task_get_name(task)); - - MSG_task_put(MSG_task_create("waiter", 0, 0, NULL), - MSG_process_get_host(MSG_process_self()), LOCAL); + DEBUG1("Receiving on %s", argv[0]); + MSG_task_receive(&task, argv[0]); + DEBUG1("Received %s", MSG_task_get_name(task)); + DEBUG1("waiter on %s", MSG_process_get_name(MSG_process_self())); + MSG_task_send(MSG_task_create("waiter",0,0,NULL), + MSG_process_get_name(MSG_process_self())); MSG_task_destroy(task); return 0; } + static void Irecv(xbt_dynar_t action) { char *name = xbt_str_join(action, " "); m_process_t comm_helper; - - INFO1("Irecv on %s: spawn process ", + char mailbox_name[250]; + char **myargv; + double clock = MSG_get_clock(); + DEBUG1("Irecv on %s: spawn process ", MSG_process_get_name(MSG_process_self())); - comm_helper = - MSG_process_create("spawned_recv", - spawned_recv, - (void *) MSG_process_get_name(MSG_process_self()), - MSG_process_get_host(MSG_process_self())); + sprintf (mailbox_name,"%s_%s", xbt_dynar_get_as(action, 2, char *), + MSG_process_get_name(MSG_process_self())); + sprintf(name,"%s_wait",MSG_process_get_name(MSG_process_self())); + myargv = (char**) calloc (2, sizeof (char*)); + + myargv[0] = xbt_strdup(mailbox_name); + myargv[1] = NULL; + comm_helper = MSG_process_create_with_arguments(name,spawned_recv, + NULL, MSG_host_self(), + 1, myargv); + + DEBUG2("%s %f", xbt_str_join(action, " "), + MSG_get_clock()-clock); free(name); } -static void wait(xbt_dynar_t action) + +static void wait_action(xbt_dynar_t action) { char *name = xbt_str_join(action, " "); + char task_name[80]; m_task_t task = NULL; - INFO1("wait: %s", name); - MSG_task_get(&(task), LOCAL); - INFO1("waited: %s", name); + double clock = MSG_get_clock(); + + DEBUG1("Entering %s", name); + sprintf(task_name,"%s_wait",MSG_process_get_name(MSG_process_self())); + DEBUG1("wait: %s", task_name); + MSG_task_receive(&task,task_name); + MSG_task_destroy(task); + DEBUG2("%s %f", name, MSG_get_clock()-clock); + free(name); +} + +static void barrier (xbt_dynar_t action) +{ + char *name = xbt_str_join(action, " "); + DEBUG1("barrier: %s", name); + + + free(name); + +} + +static void reduce(xbt_dynar_t action) +{ + int i; + char *name; + char task_name[80]; + char spawn_name[80]; + char **myargv; + char *comm_size = xbt_dynar_get_as(action, 2, char *); + char *comp_size = xbt_dynar_get_as(action, 3, char *); + m_process_t comm_helper=NULL; + m_task_t task=NULL, comp_task=NULL; + const char* process_name; + double clock = MSG_get_clock(); + + coll_ctr counters = (coll_ctr) MSG_process_get_data(MSG_process_self()); + + xbt_assert0(communicator_size, "Size of Communicator is not defined" + ", can't use collective operations"); + + process_name = MSG_process_get_name(MSG_process_self()); + + if (!counters){ + DEBUG0("Initialize the counters"); + counters = (coll_ctr) calloc (1, sizeof(struct coll_ctr_t)); + } + + name = bprintf("reduce_%d", counters->reduce_counter++); + + if (!strcmp(process_name, "p0")){ + DEBUG2("%s: %s is the Root",name, process_name); + for(i=1;ibcast_counter++); + if (!strcmp(process_name, "p0")){ + DEBUG2("%s: %s is the Root",name, process_name); + + for(i=1;iallReduce_counter++); + + if (!strcmp(process_name, "p0")){ + DEBUG2("%s: %s is the Root",name, process_name); + for(i=1;i