#include "simix/simix.h" /* semaphores for the barrier */
#include "xbt.h" /* calloc, printf */
#include "simgrid_config.h" /* getline */
+#include "instr/instr_private.h"
XBT_LOG_NEW_DEFAULT_CATEGORY(actions,
"Messages specific for this msg example");
int communicator_size = 0;
typedef struct coll_ctr_t {
+ int last_Irecv_sender_id;
int bcast_counter;
int reduce_counter;
int allReduce_counter;
return value;
}
+/** Extract the numeric rank from a process name.
+ *
+ * The first character of the name is skipped and the remainder is parsed as
+ * a base-10 integer (e.g. "p12" yields 12).
+ *
+ * @param process_name name of the process; may be NULL or empty
+ * @return the parsed rank, or 0 when the name is NULL or empty, has no
+ *         digits after its first character, or does not fit in an int
+ */
+static int get_rank (const char *process_name)
+{
+  const char *digits;
+  char *end = NULL;
+  long rank;
+
+  /* An empty name has nothing after its terminator: do not read there. */
+  if (process_name == NULL || process_name[0] == '\0') {
+    return 0;
+  }
+
+  /* strtol, unlike atoi, has defined behavior on out-of-range input. */
+  digits = process_name + 1;
+  rank = strtol(digits, &end, 10);
+  if (end == digits) {
+    return 0;                   /* no digits at all */
+  }
+  if ((long) (int) rank != rank) {
+    return 0;                   /* value does not fit in an int */
+  }
+  return (int) rank;
+}
/* My actions */
static void action_send(xbt_dynar_t action)
char to[250];
char *size = xbt_dynar_get_as(action, 3, char *);
double clock = MSG_get_clock();
+
sprintf(to, "%s_%s", MSG_process_get_name(MSG_process_self()),
xbt_dynar_get_as(action, 2, char *));
// char *to = xbt_dynar_get_as(action, 2, char *);
if (XBT_LOG_ISENABLED(actions, xbt_log_priority_verbose))
name = xbt_str_join(action, " ");
+#ifdef HAVE_TRACING
+ int rank = get_rank(MSG_process_get_name(MSG_process_self()));
+ int dst_traced = get_rank(xbt_dynar_get_as(action, 2, char *));
+ TRACE_smpi_ptp_in(rank, rank, dst_traced, "send");
+ TRACE_smpi_send(rank, rank, dst_traced);
+#endif
+
DEBUG2("Entering Send: %s (size: %lg)", name, parse_double(size));
MSG_task_send(MSG_task_create(name, 0, parse_double(size), NULL), to);
VERB2("%s %f", name, MSG_get_clock() - clock);
if (XBT_LOG_ISENABLED(actions, xbt_log_priority_verbose))
free(name);
+
+#ifdef HAVE_TRACING
+ TRACE_smpi_ptp_out(rank, rank, dst_traced, "send");
+#endif
}
if (XBT_LOG_ISENABLED(actions, xbt_log_priority_verbose))
name = xbt_str_join(action, " ");
+#ifdef HAVE_TRACING
+ int rank = get_rank(MSG_process_get_name(MSG_process_self()));
+ int src_traced = get_rank(xbt_dynar_get_as(action, 2, char *));
+ TRACE_smpi_ptp_in(rank, src_traced, rank, "recv");
+#endif
+
DEBUG1("Receiving: %s", name);
MSG_task_receive(&task, mailbox_name);
// MSG_task_receive(&task, MSG_process_get_name(MSG_process_self()));
- DEBUG2("%s %f", name, MSG_get_clock() - clock);
+ VERB2("%s %f", name, MSG_get_clock() - clock);
MSG_task_destroy(task);
if (XBT_LOG_ISENABLED(actions, xbt_log_priority_verbose))
free(name);
+#ifdef HAVE_TRACING
+ TRACE_smpi_ptp_out(rank, src_traced, rank, "recv");
+ TRACE_smpi_recv(rank, src_traced, rank);
+#endif
+
}
static int spawned_recv(int argc, char *argv[])
char mailbox_name[250];
char **myargv;
double clock = MSG_get_clock();
+
DEBUG1("Irecv on %s: spawn process ",
MSG_process_get_name(MSG_process_self()));
+#ifdef HAVE_TRACING
+ int rank = get_rank(MSG_process_get_name(MSG_process_self()));
+ int src_traced = get_rank(xbt_dynar_get_as(action, 2, char *));
+ coll_ctr counters = (coll_ctr) MSG_process_get_data(MSG_process_self());
+ if (!counters) {
+ DEBUG0("Initialize the counters");
+ counters = (coll_ctr) calloc(1, sizeof(struct coll_ctr_t));
+ }
+ counters->last_Irecv_sender_id = src_traced;
+ MSG_process_set_data(MSG_process_self(), (void *) counters);
+
+ TRACE_smpi_ptp_in(rank, src_traced, rank, "Irecv");
+#endif
sprintf(mailbox_name, "%s_%s", xbt_dynar_get_as(action, 2, char *),
MSG_process_get_name(MSG_process_self()));
VERB2("%s %f", xbt_str_join(action, " "), MSG_get_clock() - clock);
free(name);
+#ifdef HAVE_TRACING
+ TRACE_smpi_ptp_out(rank, src_traced, rank, "Irecv");
+#endif
+
}
if (XBT_LOG_ISENABLED(actions, xbt_log_priority_verbose))
name = xbt_str_join(action, " ");
+#ifdef HAVE_TRACING
+ coll_ctr counters = (coll_ctr) MSG_process_get_data(MSG_process_self());
+ int src_traced = counters->last_Irecv_sender_id;
+ int rank = get_rank(MSG_process_get_name(MSG_process_self()));
+ TRACE_smpi_ptp_in(rank, src_traced, rank, "wait");
+#endif
DEBUG1("Entering %s", name);
sprintf(task_name, "%s_wait", MSG_process_get_name(MSG_process_self()));
VERB2("%s %f", name, MSG_get_clock() - clock);
if (XBT_LOG_ISENABLED(actions, xbt_log_priority_verbose))
free(name);
+#ifdef HAVE_TRACING
+ TRACE_smpi_ptp_out(rank, src_traced, rank, "wait");
+ TRACE_smpi_recv(rank, src_traced, rank);
+#endif
+
}
/* FIXME: that's a poor man's implementation: we should take the message exchanges into account */
-smx_sem_t barrier_semaphore = NULL;
static void barrier(xbt_dynar_t action)
{
char *name = NULL;
+ static smx_mutex_t mutex = NULL;
+ static smx_cond_t cond = NULL;
+ static int processes_arrived_sofar=0;
if (XBT_LOG_ISENABLED(actions, xbt_log_priority_verbose))
name = xbt_str_join(action, " ");
- DEBUG1("Entering barrier: %s", name);
- if (barrier_semaphore == NULL) // first arriving on the barrier
- barrier_semaphore = SIMIX_sem_init(0);
+ if (mutex == NULL) { // first arriving on the barrier
+ mutex = SIMIX_req_mutex_init();
+ cond = SIMIX_req_cond_init();
+ processes_arrived_sofar=0;
+ }
+ DEBUG2("Entering barrier: %s (%d already there)", name,processes_arrived_sofar);
- if (SIMIX_sem_get_capacity(barrier_semaphore) == -communicator_size + 1) { // last arriving
- SIMIX_sem_release_forever(barrier_semaphore);
- SIMIX_sem_destroy(barrier_semaphore);
- barrier_semaphore = NULL;
- } else { // not last
- SIMIX_sem_acquire(barrier_semaphore);
+ SIMIX_req_mutex_lock(mutex);
+ if (++processes_arrived_sofar == communicator_size) {
+ SIMIX_req_cond_broadcast(cond);
+ SIMIX_req_mutex_unlock(mutex);
+ } else {
+ SIMIX_req_cond_wait(cond,mutex);
+ SIMIX_req_mutex_unlock(mutex);
}
DEBUG1("Exiting barrier: %s", name);
+ processes_arrived_sofar--;
+ if (!processes_arrived_sofar) {
+ SIMIX_req_cond_destroy(cond);
+ SIMIX_req_mutex_destroy(mutex);
+ mutex=NULL;
+ }
+
if (XBT_LOG_ISENABLED(actions, xbt_log_priority_verbose))
free(name);
/* Action handler for "comm_size": parses field 2 of the action line with
 * parse_double() and stores the result in the global communicator_size.
 * When verbose logging is enabled for the "actions" category, the joined
 * action line and the MSG clock time elapsed in this handler are logged. */
static void comm_size(xbt_dynar_t action)
{
+ char *name = NULL; /* joined action text; only built when verbose logging is enabled */
char *size = xbt_dynar_get_as(action, 2, char *); /* field 2 of the action line */
+ double clock = MSG_get_clock(); /* MSG clock on entry, used for the elapsed-time log */
+
+ if (XBT_LOG_ISENABLED(actions, xbt_log_priority_verbose))
+ name = xbt_str_join(action, " ");
communicator_size = parse_double(size); /* parse_double() result assigned to the int global */
+ VERB2("%s %f", name, MSG_get_clock() - clock);
+ if (XBT_LOG_ISENABLED(actions, xbt_log_priority_verbose))
+ free(name); /* released under the same condition it was allocated */
}
static void compute(xbt_dynar_t action)
free(name);
}
+/** Action handler for "init".
+ *
+ * When HAVE_TRACING is defined, emits the tracing init event for the rank
+ * derived from the calling process' name; otherwise does nothing.
+ *
+ * @param action parsed action line; not read by this handler
+ */
+static void init(xbt_dynar_t action)
+{
+#ifdef HAVE_TRACING
+  const char *self_name = MSG_process_get_name(MSG_process_self());
+
+  TRACE_smpi_init(get_rank(self_name));
+#endif
+}
+
+/** Action handler for "finalize".
+ *
+ * When HAVE_TRACING is defined, emits the tracing finalize event for the
+ * rank derived from the calling process' name. In all builds, frees the
+ * counters attached as the calling process' data and detaches them.
+ *
+ * @param action parsed action line; not read by this handler
+ */
+static void finalize(xbt_dynar_t action)
+{
+#ifdef HAVE_TRACING
+  TRACE_smpi_finalize(get_rank(MSG_process_get_name(MSG_process_self())));
+#endif
+  coll_ctr counters = (coll_ctr) MSG_process_get_data(MSG_process_self());
+
+  /* free(NULL) is a no-op, so no guard is needed. */
+  free(counters);
+
+  /* Detach the freed block: other handlers test the process data against
+   * NULL before using it and must not see a dangling pointer. */
+  MSG_process_set_data(MSG_process_self(), NULL);
+}
+
/** Main function */
int main(int argc, char *argv[])
{
MSG_launch_application(argv[2]);
/* Action registration */
+ MSG_action_register("init", init);
+ MSG_action_register("finalize", finalize);
MSG_action_register("comm_size", comm_size);
MSG_action_register("send", action_send);
MSG_action_register("Isend", Isend);
MSG_action_register("sleep", action_sleep);
MSG_action_register("compute", compute);
-
+
/* Actually do the simulation using MSG_action_trace_run */
res = MSG_action_trace_run(argv[3]); // it's ok to pass a NULL argument here