#include "simix/simix.h" /* semaphores for the barrier */
#include "xbt.h" /* calloc, printf */
#include "simgrid_config.h" /* getline */
-#include "instr/private.h"
+#include "instr/instr_private.h"
XBT_LOG_NEW_DEFAULT_CATEGORY(actions,
"Messages specific for this msg example");
int communicator_size = 0;
-typedef struct coll_ctr_t {
+static void action_Isend(xbt_dynar_t action); /* forward declaration: action_send() calls it before its definition */
+
+typedef struct { /* per-process state, attached as the MSG process data by action_init() */
int last_Irecv_sender_id; /* rank of the sender of the latest Irecv; read back by the wait action for tracing */
int bcast_counter; /* incremented on each bcast action; used to build the "bcast_%d" name */
int reduce_counter; /* incremented on each reduce action; used to build the "reduce_%d" name */
int allReduce_counter; /* incremented on each allReduce action; used to build the "allReduce_%d" name */
-} *coll_ctr;
+ xbt_dynar_t isends; /* msg_comm_t handles pushed by action_Isend(); reaped by asynchronous_cleanup() */
+} s_process_globals_t, *process_globals_t;
/* Helper function */
static double parse_double(const char *string)
return atoi(&(process_name[1]));
}
+/* Reap the asynchronous sends of the calling process that have completed:
+ * each finished communication is removed from the process' isends dynar
+ * and destroyed. Sends that are still in flight are left untouched. */
+static void asynchronous_cleanup(void) {
+  process_globals_t state = (process_globals_t) MSG_process_get_data(MSG_process_self());
+
+  for (;;) {
+    msg_comm_t finished;
+    /* MSG_comm_testany() yields the index of a completed comm, or -1 */
+    int idx = MSG_comm_testany(state->isends);
+
+    if (idx == -1)
+      break;
+
+    xbt_dynar_remove_at(state->isends, idx, &finished);
+    MSG_comm_destroy(finished);
+  }
+}
+
/* My actions */
+/* Body of a helper process doing one blocking send.
+ * argv[0] is used both as the task name and as the destination mailbox,
+ * argv[1] is the message size as a string. Always returns 0. */
+static int spawned_send(int argc, char *argv[])
+{
+  const char *mailbox = argv[0];
+  const char *size_str = argv[1];
+
+  DEBUG3("%s: Sending %s on %s", MSG_process_get_name(MSG_process_self()),
+         size_str, mailbox);
+
+  m_task_t task = MSG_task_create(mailbox, 0, parse_double(size_str), NULL);
+  MSG_task_send(task, mailbox);
+  return 0;
+}
+
static void action_send(xbt_dynar_t action)
{
char *name = NULL;
char to[250];
- char *size = xbt_dynar_get_as(action, 3, char *);
- double clock = MSG_get_clock();
-
+ char *size_str = xbt_dynar_get_as(action, 3, char *);
+ double size=parse_double(size_str);
+ double clock = MSG_get_clock(); /* this "call" is free thanks to inlining */
+
sprintf(to, "%s_%s", MSG_process_get_name(MSG_process_self()),
xbt_dynar_get_as(action, 2, char *));
// char *to = xbt_dynar_get_as(action, 2, char *);
TRACE_smpi_send(rank, rank, dst_traced);
#endif
- DEBUG2("Entering Send: %s (size: %lg)", name, parse_double(size));
- MSG_task_send(MSG_task_create(name, 0, parse_double(size), NULL), to);
- VERB2("%s %f", name, MSG_get_clock() - clock);
+ DEBUG2("Entering Send: %s (size: %lg)", name, size);
+ if (size<65536) {
+ action_Isend(action);
+ } else {
+ MSG_task_send(MSG_task_create(name, 0, size, NULL), to);
+ }
+
+ VERB2("%s %f", name, MSG_get_clock() - clock);
if (XBT_LOG_ISENABLED(actions, xbt_log_priority_verbose))
free(name);
#ifdef HAVE_TRACING
TRACE_smpi_ptp_out(rank, rank, dst_traced, "send");
-#endif
-}
-
+#endif
-static int spawned_send(int argc, char *argv[])
-{
- DEBUG3("%s: Sending %s on %s", MSG_process_get_name(MSG_process_self()),
- argv[1], argv[0]);
- MSG_task_send(MSG_task_create(argv[0], 0, parse_double(argv[1]), NULL),
- argv[0]);
- return 0;
+ asynchronous_cleanup();
}
-static void Isend(xbt_dynar_t action)
+/* "Isend" action: start a non-blocking send and return immediately.
+ *
+ * action[2] is the destination, action[3] the message size. The mailbox is
+ * "<sender>_<destination>" and doubles as the task name. The resulting
+ * communication handle is stored in the process' isends dynar so that
+ * asynchronous_cleanup() can destroy it once the transfer has completed.
+ */
+static void action_Isend(xbt_dynar_t action)
{
- char spawn_name[80];
+  char *name = NULL;
char to[250];
// char *to = xbt_dynar_get_as(action, 2, char *);
char *size = xbt_dynar_get_as(action, 3, char *);
- char **myargv;
- m_process_t comm_helper;
double clock = MSG_get_clock();
- DEBUG1("Isend on %s: spawn process ",
- MSG_process_get_name(MSG_process_self()));
+  process_globals_t globals = (process_globals_t) MSG_process_get_data(MSG_process_self());
+
- sprintf(to, "%s_%s", MSG_process_get_name(MSG_process_self()),
- xbt_dynar_get_as(action, 2, char *));
- myargv = (char **) calloc(3, sizeof(char *));
- myargv[0] = xbt_strdup(to);
- myargv[1] = xbt_strdup(size);
- myargv[2] = NULL;
+  /* Bounded formatting: overlong names are truncated instead of overflowing to[] */
+  snprintf(to, sizeof(to), "%s_%s", MSG_process_get_name(MSG_process_self()),
+           xbt_dynar_get_as(action, 2, char *));
+
+  /* Build the printable form of the action only when it will be logged,
+   * and keep the pointer so that it can be released below */
+  if (XBT_LOG_ISENABLED(actions, xbt_log_priority_verbose))
+    name = xbt_str_join(action, " ");
+
+  msg_comm_t comm =
+    MSG_task_isend(MSG_task_create(to, 0, parse_double(size), NULL), to);
+  xbt_dynar_push(globals->isends, &comm);
- // sprintf(spawn_name,"%s_wait",MSG_process_get_name(MSG_process_self()));
- sprintf(spawn_name, "%s_wait", to);
- comm_helper =
- MSG_process_create_with_arguments(spawn_name, spawned_send,
- NULL, MSG_host_self(), 2, myargv);
+  DEBUG1("Isend on %s", MSG_process_get_name(MSG_process_self()));
- VERB2("%s %f", xbt_str_join(action, " "), MSG_get_clock() - clock);
+  VERB2("%s %f", name, MSG_get_clock() - clock);
+  free(name); /* xbt_str_join() allocates; free(NULL) is a no-op */
+
+  asynchronous_cleanup();
}
TRACE_smpi_recv(rank, src_traced, rank);
#endif
+ asynchronous_cleanup();
}
static int spawned_recv(int argc, char *argv[])
}
-static void Irecv(xbt_dynar_t action)
+static void action_Irecv(xbt_dynar_t action)
{
char *name;
m_process_t comm_helper;
#ifdef HAVE_TRACING
int rank = get_rank(MSG_process_get_name(MSG_process_self()));
int src_traced = get_rank(xbt_dynar_get_as(action, 2, char *));
- coll_ctr counters = (coll_ctr) MSG_process_get_data(MSG_process_self());
- if (!counters) {
- DEBUG0("Initialize the counters");
- counters = (coll_ctr) calloc(1, sizeof(struct coll_ctr_t));
- }
+ process_globals_t counters = (process_globals_t) MSG_process_get_data(MSG_process_self());
counters->last_Irecv_sender_id = src_traced;
MSG_process_set_data(MSG_process_self(), (void *) counters);
TRACE_smpi_ptp_out(rank, src_traced, rank, "Irecv");
#endif
+ asynchronous_cleanup();
}
if (XBT_LOG_ISENABLED(actions, xbt_log_priority_verbose))
name = xbt_str_join(action, " ");
#ifdef HAVE_TRACING
- coll_ctr counters = (coll_ctr) MSG_process_get_data(MSG_process_self());
+ process_globals_t counters = (process_globals_t) MSG_process_get_data(MSG_process_self());
int src_traced = counters->last_Irecv_sender_id;
int rank = get_rank(MSG_process_get_name(MSG_process_self()));
TRACE_smpi_ptp_in(rank, src_traced, rank, "wait");
}
/* FIXME: that's a poor man's implementation: we should take the message exchanges into account */
-static void barrier(xbt_dynar_t action)
+static void action_barrier(xbt_dynar_t action)
{
char *name = NULL;
static smx_mutex_t mutex = NULL;
}
-static void reduce(xbt_dynar_t action)
+static void action_reduce(xbt_dynar_t action)
{
int i;
char *name;
const char *process_name;
double clock = MSG_get_clock();
- coll_ctr counters = (coll_ctr) MSG_process_get_data(MSG_process_self());
+ process_globals_t counters = (process_globals_t) MSG_process_get_data(MSG_process_self());
xbt_assert0(communicator_size, "Size of Communicator is not defined"
", can't use collective operations");
process_name = MSG_process_get_name(MSG_process_self());
- if (!counters) {
- DEBUG0("Initialize the counters");
- counters = (coll_ctr) calloc(1, sizeof(struct coll_ctr_t));
- }
-
name = bprintf("reduce_%d", counters->reduce_counter++);
if (!strcmp(process_name, "p0")) {
free(name);
}
-static void bcast(xbt_dynar_t action)
+static void action_bcast(xbt_dynar_t action)
{
int i;
char *name;
m_process_t comm_helper = NULL;
m_task_t task = NULL;
char *size = xbt_dynar_get_as(action, 2, char *);
- coll_ctr counters = (coll_ctr) MSG_process_get_data(MSG_process_self());
+ process_globals_t counters = (process_globals_t) MSG_process_get_data(MSG_process_self());
double clock = MSG_get_clock();
xbt_assert0(communicator_size, "Size of Communicator is not defined"
process_name = MSG_process_get_name(MSG_process_self());
- if (!counters) {
- DEBUG0("Initialize the counters");
- counters = (coll_ctr) calloc(1, sizeof(struct coll_ctr_t));
- }
name = bprintf("bcast_%d", counters->bcast_counter++);
if (!strcmp(process_name, "p0")) {
free(name);
}
-static void allReduce(xbt_dynar_t action)
+static void action_allReduce(xbt_dynar_t action)
{
int i;
char *name;
const char *process_name;
double clock = MSG_get_clock();
- coll_ctr counters = (coll_ctr) MSG_process_get_data(MSG_process_self());
+ process_globals_t counters = (process_globals_t) MSG_process_get_data(MSG_process_self());
xbt_assert0(communicator_size, "Size of Communicator is not defined"
", can't use collective operations");
process_name = MSG_process_get_name(MSG_process_self());
- if (!counters) {
- DEBUG0("Initialize the counters");
- counters = (coll_ctr) calloc(1, sizeof(struct coll_ctr_t));
- }
-
name = bprintf("allReduce_%d", counters->allReduce_counter++);
if (!strcmp(process_name, "p0")) {
free(name);
}
-static void comm_size(xbt_dynar_t action)
+static void action_comm_size(xbt_dynar_t action)
{
char *name = NULL;
char *size = xbt_dynar_get_as(action, 2, char *);
free(name);
}
-static void compute(xbt_dynar_t action)
+static void action_compute(xbt_dynar_t action)
{
char *name = NULL;
char *amout = xbt_dynar_get_as(action, 2, char *);
free(name);
}
-static void init(xbt_dynar_t action)
+/* "init" action: create the per-process state (collective counters and the
+ * list of pending asynchronous sends) and attach it to the calling process.
+ * The other actions fetch this state with MSG_process_get_data() and use it
+ * without checking, so "init" has to be replayed first. */
+static void action_init(xbt_dynar_t action)
{
#ifdef HAVE_TRACING
TRACE_smpi_init(get_rank(MSG_process_get_name(MSG_process_self())));
#endif
+  DEBUG0("Initialize the counters");
+  /* xbt_new0() zero-fills like calloc() but aborts on allocation failure,
+   * so globals is never NULL when dereferenced on the next line */
+  process_globals_t globals = xbt_new0(s_process_globals_t, 1);
+  globals->isends = xbt_dynar_new(sizeof(msg_comm_t), NULL);
+  MSG_process_set_data(MSG_process_self(), globals);
+
}
-static void finalize(xbt_dynar_t action)
+/* "finalize" action: release the per-process state created by action_init().
+ *
+ * Completed asynchronous sends are reaped first, then the isends container
+ * is freed along with the state itself. Sends still in flight at this point
+ * are not waited for; only their slots in the container are released. */
+static void action_finalize(xbt_dynar_t action)
{
#ifdef HAVE_TRACING
TRACE_smpi_finalize(get_rank(MSG_process_get_name(MSG_process_self())));
#endif
- coll_ctr counters = (coll_ctr) MSG_process_get_data(MSG_process_self());
+  process_globals_t counters = (process_globals_t) MSG_process_get_data(MSG_process_self());
- if (counters)
- free(counters);
+  if (counters) {
+    /* destroy the communications that have already completed */
+    asynchronous_cleanup();
+    /* the dynar was allocated in action_init(); freeing only the struct leaked it */
+    xbt_dynar_free(&counters->isends);
+    free(counters);
+  }
}
MSG_launch_application(argv[2]);
/* Action registration */
- MSG_action_register("init", init);
- MSG_action_register("finalize", finalize);
- MSG_action_register("comm_size", comm_size);
- MSG_action_register("send", action_send);
- MSG_action_register("Isend", Isend);
- MSG_action_register("recv", action_recv);
- MSG_action_register("Irecv", Irecv);
- MSG_action_register("wait", action_wait);
- MSG_action_register("barrier", barrier);
- MSG_action_register("bcast", bcast);
- MSG_action_register("reduce", reduce);
- MSG_action_register("allReduce", allReduce);
- MSG_action_register("sleep", action_sleep);
- MSG_action_register("compute", compute);
-
-
+ MSG_action_register("init", action_init);
+ MSG_action_register("finalize", action_finalize);
+ MSG_action_register("comm_size",action_comm_size);
+ MSG_action_register("send", action_send);
+ MSG_action_register("Isend", action_Isend);
+ MSG_action_register("recv", action_recv);
+ MSG_action_register("Irecv", action_Irecv);
+ MSG_action_register("wait", action_wait);
+ MSG_action_register("barrier", action_barrier);
+ MSG_action_register("bcast", action_bcast);
+ MSG_action_register("reduce", action_reduce);
+ MSG_action_register("allReduce",action_allReduce);
+ MSG_action_register("sleep", action_sleep);
+ MSG_action_register("compute", action_compute);
+
+
/* Actually do the simulation using MSG_action_trace_run */
res = MSG_action_trace_run(argv[3]); // it's ok to pass a NULL argument here