X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/blobdiff_plain/0a1c995ee798730127d6aed9968a244625e40286..9a877ba41cb963c7d32a71d935b29e37b01f1617:/examples/msg/actions/actions.c diff --git a/examples/msg/actions/actions.c b/examples/msg/actions/actions.c index 59c639de0e..f1f5fbd350 100644 --- a/examples/msg/actions/actions.c +++ b/examples/msg/actions/actions.c @@ -6,77 +6,532 @@ * under the terms of the license (GNU LGPL) which comes with this package. */ #include -#include "msg/msg.h" /* Yeah! If you want to use msg, you need to include msg/msg.h */ -#include "xbt.h" /* calloc, printf */ +#include +#include "msg/msg.h" /* Yeah! If you want to use msg, you need to include msg/msg.h */ +#include "xbt.h" /* calloc, printf */ +#include "simgrid_config.h" /* getline */ + +XBT_LOG_NEW_DEFAULT_CATEGORY(actions, + "Messages specific for this msg example"); +int communicator_size=0; + +typedef struct coll_ctr_t{ + int bcast_counter; + int reduce_counter; + int allReduce_counter; +} *coll_ctr; + +/* Helper function */ +static double parse_double(const char *string) { + double value; + char *endptr; + + value=strtod(string, &endptr); + if (*endptr != '\0') + THROW1(unknown_error, 0, "%s is not a double", string); + return value; +} -XBT_LOG_NEW_DEFAULT_CATEGORY(msg_test,"Messages specific for this msg example"); /* My actions */ -static void send(xbt_dynar_t action) { - char *name=xbt_str_join(action," "); - char *to = xbt_dynar_get_as(action,2,char*); - char *size=xbt_dynar_get_as(action,3,char*); - INFO1("Send: %s",name); - MSG_task_send(MSG_task_create(name, 0, atoi(size), NULL), to); - INFO1("Sent %s",name); +static void send(xbt_dynar_t action) +{ + char *name = NULL; + char to[250]; + char *size = xbt_dynar_get_as(action, 3, char *); + double clock = MSG_get_clock(); + sprintf (to,"%s_%s", MSG_process_get_name(MSG_process_self()), + xbt_dynar_get_as(action, 2, char *)); + // char *to = xbt_dynar_get_as(action, 2, char *); + + if (XBT_LOG_ISENABLED(actions,xbt_log_priority_debug)) + name = xbt_str_join(action, " "); + + DEBUG2("Entering Send: %s (size: %lg)", name, parse_double(size)); + MSG_task_send(MSG_task_create(name, 0, parse_double(size), NULL), to); + DEBUG2("%s %f", name, MSG_get_clock()-clock); + + if (XBT_LOG_ISENABLED(actions,xbt_log_priority_debug)) + free(name); +} + + +static int spawned_send(int argc, char *argv[]) +{ + DEBUG3("%s: Sending %s on %s", MSG_process_get_name(MSG_process_self()), + argv[1],argv[0]); + MSG_task_send(MSG_task_create(argv[0], 0, parse_double(argv[1]), NULL), + argv[0]); + return 0; +} + +static void Isend(xbt_dynar_t action) +{ + char spawn_name[80]; + char to[250]; + // char *to = xbt_dynar_get_as(action, 2, char *); + char *size = xbt_dynar_get_as(action, 3, char *); + char **myargv; + m_process_t comm_helper; + double clock = MSG_get_clock(); + DEBUG1("Isend on %s: spawn process ", + MSG_process_get_name(MSG_process_self())); + + sprintf (to,"%s_%s", MSG_process_get_name(MSG_process_self()), + xbt_dynar_get_as(action, 2, char *)); + myargv = (char**) calloc (3, sizeof (char*)); + + myargv[0] = xbt_strdup(to); + myargv[1] = xbt_strdup(size); + myargv[2] = NULL; + + // sprintf(spawn_name,"%s_wait",MSG_process_get_name(MSG_process_self())); + sprintf(spawn_name,"%s_wait",to); + comm_helper = + MSG_process_create_with_arguments(spawn_name, spawned_send, + NULL, MSG_host_self(), 2, myargv); + DEBUG2("%s %f",xbt_str_join(action, " "), MSG_get_clock()-clock); +} + + +static void recv(xbt_dynar_t action) +{ + char *name = NULL; + char mailbox_name[250]; + m_task_t task = NULL; + double clock = MSG_get_clock(); + //FIXME: argument of action ignored so far; semantic not clear + //char *from=xbt_dynar_get_as(action,2,char*); + sprintf (mailbox_name,"%s_%s", xbt_dynar_get_as(action, 2, char *), + MSG_process_get_name(MSG_process_self())); + + if (XBT_LOG_ISENABLED(actions,xbt_log_priority_debug)) + name = xbt_str_join(action, " "); + + DEBUG1("Receiving: %s", name); + MSG_task_receive(&task, mailbox_name); + // MSG_task_receive(&task, MSG_process_get_name(MSG_process_self())); + DEBUG2("%s %f", name, MSG_get_clock()-clock); + MSG_task_destroy(task); + + if (XBT_LOG_ISENABLED(actions,xbt_log_priority_debug)) + free(name); +} + +static int spawned_recv(int argc, char *argv[]) +{ + m_task_t task = NULL; + DEBUG1("Receiving on %s", argv[0]); + MSG_task_receive(&task, argv[0]); + DEBUG1("Received %s", MSG_task_get_name(task)); + DEBUG1("waiter on %s", MSG_process_get_name(MSG_process_self())); + MSG_task_send(MSG_task_create("waiter",0,0,NULL), + MSG_process_get_name(MSG_process_self())); + + MSG_task_destroy(task); + return 0; +} + + +static void Irecv(xbt_dynar_t action) +{ + char *name; + m_process_t comm_helper; + char mailbox_name[250]; + char **myargv; + double clock = MSG_get_clock(); + DEBUG1("Irecv on %s: spawn process ", + MSG_process_get_name(MSG_process_self())); + + sprintf (mailbox_name,"%s_%s", xbt_dynar_get_as(action, 2, char *), + MSG_process_get_name(MSG_process_self())); + name = bprintf("%s_wait",MSG_process_get_name(MSG_process_self())); + myargv = (char**) calloc (2, sizeof (char*)); + + myargv[0] = xbt_strdup(mailbox_name); + myargv[1] = NULL; + comm_helper = MSG_process_create_with_arguments(name,spawned_recv, + NULL, MSG_host_self(), + 1, myargv); + + DEBUG2("%s %f", xbt_str_join(action, " "), + MSG_get_clock()-clock); + + free(name); +} + + +static void wait_action(xbt_dynar_t action) +{ + char *name = NULL; + char task_name[80]; + m_task_t task = NULL; + double clock = MSG_get_clock(); + + if (XBT_LOG_ISENABLED(actions,xbt_log_priority_debug)) + name = xbt_str_join(action, " "); + + DEBUG1("Entering %s", name); + sprintf(task_name,"%s_wait",MSG_process_get_name(MSG_process_self())); + DEBUG1("wait: %s", task_name); + MSG_task_receive(&task,task_name); + MSG_task_destroy(task); + DEBUG2("%s %f", name, MSG_get_clock()-clock); + if (XBT_LOG_ISENABLED(actions,xbt_log_priority_debug)) + free(name); +} + +/* FIXME: that's a poor man's implementation: we should take the message exchanges into account */ +smx_sem_t barrier_semaphore=NULL; +static void barrier (xbt_dynar_t action) +{ + char *name = NULL; + + if (XBT_LOG_ISENABLED(actions,xbt_log_priority_debug)) + name = xbt_str_join(action, " "); + + DEBUG1("Entering barrier: %s", name); + if (barrier_semaphore == NULL) // first arriving on the barrier + barrier_semaphore = SIMIX_sem_init(0); + + if (SIMIX_sem_get_capacity(barrier_semaphore)==-communicator_size +1) { // last arriving + SIMIX_sem_release_forever(barrier_semaphore); + SIMIX_sem_destroy(barrier_semaphore); + barrier_semaphore = NULL; + } else { // not last + SIMIX_sem_acquire(barrier_semaphore); + } + + DEBUG1("Exiting barrier: %s", name); + + if (XBT_LOG_ISENABLED(actions,xbt_log_priority_debug)) + free(name); + +} + +static void reduce(xbt_dynar_t action) +{ + int i; + char *name; + char task_name[80]; + char spawn_name[80]; + char **myargv; + char *comm_size = xbt_dynar_get_as(action, 2, char *); + char *comp_size = xbt_dynar_get_as(action, 3, char *); + m_process_t comm_helper=NULL; + m_task_t task=NULL, comp_task=NULL; + const char* process_name; + double clock = MSG_get_clock(); + + coll_ctr counters = (coll_ctr) MSG_process_get_data(MSG_process_self()); + + xbt_assert0(communicator_size, "Size of Communicator is not defined" + ", can't use collective operations"); + + process_name = MSG_process_get_name(MSG_process_self()); + + if (!counters){ + DEBUG0("Initialize the counters"); + counters = (coll_ctr) calloc (1, sizeof(struct coll_ctr_t)); + } + + name = bprintf("reduce_%d", counters->reduce_counter++); + + if (!strcmp(process_name, "p0")){ + DEBUG2("%s: %s is the Root",name, process_name); + for(i=1;ibcast_counter++); + if (!strcmp(process_name, "p0")){ + DEBUG2("%s: %s is the Root",name, process_name); + + for(i=1;iallReduce_counter++); + + if (!strcmp(process_name, "p0")){ + DEBUG2("%s: %s is the Root",name, process_name); + for(i=1;i