"Messages specific for this msg example");
int communicator_size = 0;
+static void action_Isend(xbt_dynar_t action);
+
typedef struct {
int last_Irecv_sender_id;
int bcast_counter;
int reduce_counter;
int allReduce_counter;
+ xbt_dynar_t isends;
} s_process_globals_t, *process_globals_t;
/* Helper function */
return atoi(&(process_name[1]));
}
+static void asynchronous_cleanup(void) {
+ process_globals_t globals = (process_globals_t) MSG_process_get_data(MSG_process_self());
+
+ /* Destroy any isend which correspond to completed communications */
+ int found;
+ msg_comm_t comm;
+ while ((found = MSG_comm_testany(globals->isends)) != -1) {
+ xbt_dynar_remove_at(globals->isends,found,&comm);
+ MSG_comm_destroy(comm);
+ }
+}
+
/* My actions */
static int spawned_send(int argc, char *argv[])
{
DEBUG2("Entering Send: %s (size: %lg)", name, size);
if (size<65536) {
- char spawn_name[80];
- char **myargv;
- m_process_t comm_helper;
- DEBUG1("Isend on %s: spawn process ",
- MSG_process_get_name(MSG_process_self()));
- myargv = (char **) calloc(3, sizeof(char *));
- myargv[0] = xbt_strdup(to);
- myargv[1] = xbt_strdup(size_str);
- myargv[2] = NULL;
-
- sprintf(spawn_name, "%s_wait", to);
- comm_helper =
- MSG_process_create_with_arguments(spawn_name, spawned_send,
- NULL, MSG_host_self(), 2, myargv);
- }
- else {
+ action_Isend(action);
+ } else {
MSG_task_send(MSG_task_create(name, 0, size, NULL), to);
}
#ifdef HAVE_TRACING
TRACE_smpi_ptp_out(rank, rank, dst_traced, "send");
-#endif
+#endif
+
+ asynchronous_cleanup();
}
static void action_Isend(xbt_dynar_t action)
{
- char spawn_name[80];
char to[250];
// char *to = xbt_dynar_get_as(action, 2, char *);
char *size = xbt_dynar_get_as(action, 3, char *);
- char **myargv;
- m_process_t comm_helper;
double clock = MSG_get_clock();
- DEBUG1("Isend on %s: spawn process ",
- MSG_process_get_name(MSG_process_self()));
+ process_globals_t globals = (process_globals_t) MSG_process_get_data(MSG_process_self());
+
sprintf(to, "%s_%s", MSG_process_get_name(MSG_process_self()),
xbt_dynar_get_as(action, 2, char *));
- myargv = (char **) calloc(3, sizeof(char *));
- myargv[0] = xbt_strdup(to);
- myargv[1] = xbt_strdup(size);
- myargv[2] = NULL;
+ msg_comm_t comm =
+ MSG_task_isend( MSG_task_create(to,0,parse_double(size),NULL), to);
+ xbt_dynar_push(globals->isends,&comm);
- // sprintf(spawn_name,"%s_wait",MSG_process_get_name(MSG_process_self()));
- sprintf(spawn_name, "%s_wait", to);
- comm_helper =
- MSG_process_create_with_arguments(spawn_name, spawned_send,
- NULL, MSG_host_self(), 2, myargv);
+ DEBUG1("Isend on %s", MSG_process_get_name(MSG_process_self()));
VERB2("%s %f", xbt_str_join(action, " "), MSG_get_clock() - clock);
+
+ asynchronous_cleanup();
}
TRACE_smpi_recv(rank, src_traced, rank);
#endif
+ asynchronous_cleanup();
}
static int spawned_recv(int argc, char *argv[])
TRACE_smpi_ptp_out(rank, src_traced, rank, "Irecv");
#endif
+ asynchronous_cleanup();
}
TRACE_smpi_init(get_rank(MSG_process_get_name(MSG_process_self())));
#endif
DEBUG0("Initialize the counters");
- counters = (process_globals_t) calloc(1, sizeof(s_process_globals_t));
+ process_globals_t globals = (process_globals_t) calloc(1, sizeof(s_process_globals_t));
+ globals->isends = xbt_dynar_new(sizeof(msg_comm_t),NULL);
+ MSG_process_set_data(MSG_process_self(),globals);
+
}
static void action_finalize(xbt_dynar_t action)