X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/blobdiff_plain/970e2c3003a63371db1104cfcff3f684f4f186af..4cc5d70a3fe754f95107d487e4efcf5bc7cd85a8:/examples/msg/actions/actions.c diff --git a/examples/msg/actions/actions.c b/examples/msg/actions/actions.c index f7c017814a..44bfb41b7e 100644 --- a/examples/msg/actions/actions.c +++ b/examples/msg/actions/actions.c @@ -9,9 +9,17 @@ #include #include "msg/msg.h" /* Yeah! If you want to use msg, you need to include msg/msg.h */ #include "xbt.h" /* calloc, printf */ +#include "simgrid_config.h" /* getline */ XBT_LOG_NEW_DEFAULT_CATEGORY(msg_test, "Messages specific for this msg example"); +int communicator_size=0; + +typedef struct coll_ctr_t{ + int bcast_counter; + int reduce_counter; + int allReduce_counter; +} *coll_ctr; /* Helper function */ static double parse_double(const char *string) { @@ -25,63 +33,414 @@ static double parse_double(const char *string) { } -/* Helper function */ -static double parse_double(const char *string) -{ - double value; - char *endptr; - - value = strtod(string, &endptr); - if (*endptr != '\0') - THROW1(unknown_error, 0, "%s is not a double", string); - return value; -} - - /* My actions */ static void send(xbt_dynar_t action) { char *name = xbt_str_join(action, " "); - char *to = xbt_dynar_get_as(action, 2, char *); + char to[250]; char *size = xbt_dynar_get_as(action, 3, char *); - INFO2("Send: %s (size: %lg)", name, parse_double(size)); + double clock = MSG_get_clock(); + sprintf (to,"%s_%s", MSG_process_get_name(MSG_process_self()), + xbt_dynar_get_as(action, 2, char *)); + // char *to = xbt_dynar_get_as(action, 2, char *); + DEBUG2("Entering Send: %s (size: %lg)", name, parse_double(size)); MSG_task_send(MSG_task_create(name, 0, parse_double(size), NULL), to); - INFO1("Sent %s", name); + DEBUG2("%s %f", name, MSG_get_clock()-clock); free(name); } + +static int spawned_send(int argc, char *argv[]) +{ + DEBUG3("%s: Sending %s on %s", MSG_process_get_name(MSG_process_self()), + argv[1],argv[0]); + MSG_task_send(MSG_task_create(argv[0], 0, parse_double(argv[1]), NULL), + argv[0]); + return 0; +} + +static void Isend(xbt_dynar_t action) +{ + char spawn_name[80]; + char to[250]; + // char *to = xbt_dynar_get_as(action, 2, char *); + char *size = xbt_dynar_get_as(action, 3, char *); + char **myargv; + m_process_t comm_helper; + double clock = MSG_get_clock(); + DEBUG1("Isend on %s: spawn process ", + MSG_process_get_name(MSG_process_self())); + + sprintf (to,"%s_%s", MSG_process_get_name(MSG_process_self()), + xbt_dynar_get_as(action, 2, char *)); + myargv = (char**) calloc (3, sizeof (char*)); + + myargv[0] = xbt_strdup(to); + myargv[1] = xbt_strdup(size); + myargv[2] = NULL; + + // sprintf(spawn_name,"%s_wait",MSG_process_get_name(MSG_process_self())); + sprintf(spawn_name,"%s_wait",to); + comm_helper = + MSG_process_create_with_arguments(spawn_name, spawned_send, + NULL, MSG_host_self(), 2, myargv); + DEBUG2("%s %f",xbt_str_join(action, " "), MSG_get_clock()-clock); +} + + static void recv(xbt_dynar_t action) { char *name = xbt_str_join(action, " "); + char mailbox_name[250]; m_task_t task = NULL; - INFO1("Receiving: %s", name); + double clock = MSG_get_clock(); //FIXME: argument of action ignored so far; semantic not clear //char *from=xbt_dynar_get_as(action,2,char*); - MSG_task_receive(&task, MSG_process_get_name(MSG_process_self())); - INFO1("Received %s", MSG_task_get_name(task)); + sprintf (mailbox_name,"%s_%s", xbt_dynar_get_as(action, 2, char *), + MSG_process_get_name(MSG_process_self())); + DEBUG1("Receiving: %s", name); + MSG_task_receive(&task, mailbox_name); + // MSG_task_receive(&task, MSG_process_get_name(MSG_process_self())); + DEBUG2("%s %f", name, MSG_get_clock()-clock); MSG_task_destroy(task); free(name); } +static int spawned_recv(int argc, char *argv[]) +{ + m_task_t task = NULL; + DEBUG1("Receiving on %s", argv[0]); + MSG_task_receive(&task, argv[0]); + DEBUG1("Received %s", MSG_task_get_name(task)); + DEBUG1("waiter on %s", MSG_process_get_name(MSG_process_self())); + MSG_task_send(MSG_task_create("waiter",0,0,NULL), + MSG_process_get_name(MSG_process_self())); + + MSG_task_destroy(task); + return 0; +} + + +static void Irecv(xbt_dynar_t action) +{ + char *name = xbt_str_join(action, " "); + m_process_t comm_helper; + char mailbox_name[250]; + char **myargv; + double clock = MSG_get_clock(); + DEBUG1("Irecv on %s: spawn process ", + MSG_process_get_name(MSG_process_self())); + + sprintf (mailbox_name,"%s_%s", xbt_dynar_get_as(action, 2, char *), + MSG_process_get_name(MSG_process_self())); + sprintf(name,"%s_wait",MSG_process_get_name(MSG_process_self())); + myargv = (char**) calloc (2, sizeof (char*)); + + myargv[0] = xbt_strdup(mailbox_name); + myargv[1] = NULL; + comm_helper = MSG_process_create_with_arguments(name,spawned_recv, + NULL, MSG_host_self(), + 1, myargv); + + DEBUG2("%s %f", xbt_str_join(action, " "), + MSG_get_clock()-clock); + + free(name); +} + + +static void wait_action(xbt_dynar_t action) +{ + char *name = xbt_str_join(action, " "); + char task_name[80]; + m_task_t task = NULL; + double clock = MSG_get_clock(); + + DEBUG1("Entering %s", name); + sprintf(task_name,"%s_wait",MSG_process_get_name(MSG_process_self())); + DEBUG1("wait: %s", task_name); + MSG_task_receive(&task,task_name); + MSG_task_destroy(task); + DEBUG2("%s %f", name, MSG_get_clock()-clock); + free(name); +} + +static void barrier (xbt_dynar_t action) +{ + char *name = xbt_str_join(action, " "); + DEBUG1("barrier: %s", name); + + + free(name); + +} + +static void reduce(xbt_dynar_t action) +{ + int i; + char *name; + char task_name[80]; + char spawn_name[80]; + char **myargv; + char *comm_size = xbt_dynar_get_as(action, 2, char *); + char *comp_size = xbt_dynar_get_as(action, 3, char *); + m_process_t comm_helper=NULL; + m_task_t task=NULL, comp_task=NULL; + const char* process_name; + double clock = MSG_get_clock(); + + coll_ctr counters = (coll_ctr) MSG_process_get_data(MSG_process_self()); + + xbt_assert0(communicator_size, "Size of Communicator is not defined" + ", can't use collective operations"); + + process_name = MSG_process_get_name(MSG_process_self()); + + if (!counters){ + DEBUG0("Initialize the counters"); + counters = (coll_ctr) calloc (1, sizeof(struct coll_ctr_t)); + } + + name = bprintf("reduce_%d", counters->reduce_counter++); + + if (!strcmp(process_name, "p0")){ + DEBUG2("%s: %s is the Root",name, process_name); + for(i=1;ibcast_counter++); + if (!strcmp(process_name, "p0")){ + DEBUG2("%s: %s is the Root",name, process_name); + + for(i=1;iallReduce_counter++); + + if (!strcmp(process_name, "p0")){ + DEBUG2("%s: %s is the Root",name, process_name); + for(i=1;i