Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
first try at replaying alltoall. Not tested yet
[simgrid.git] / src / smpi / smpi_replay.c
index 636a46e..8431291 100644 (file)
 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
 
 int communicator_size = 0;
+static int active_processes = 0;
+
+static void log_timed_action (const char *const *action, double clock){
+  if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
+    char *name = xbt_str_join_array(action, " ");
+    XBT_VERB("%s %f", name, smpi_process_simulated_elapsed()-clock);
+    free(name);
+  }
+}
 
 typedef struct {
   xbt_dynar_t isends; /* of MPI_Request */
@@ -41,11 +50,12 @@ static void action_init(const char *const *action)
 
   /* start a simulated timer */
   smpi_process_simulated_start();
+  /*initialize the number of active processes */
+  active_processes = smpi_process_count();
 }
 
 static void action_finalize(const char *const *action)
 {
-  double sim_time= 1.;
   smpi_replay_globals_t globals =
       (smpi_replay_globals_t) smpi_process_get_user_data();
 
@@ -56,12 +66,6 @@ static void action_finalize(const char *const *action)
     xbt_dynar_free_container(&(globals->irecvs));
   }
   free(globals);
-  /* end the simulated timer */
-  sim_time = smpi_process_simulated_elapsed();
-  if (!smpi_process_index())
-    XBT_INFO("Simulation time %g", sim_time);
-  smpi_process_finalize();
-  smpi_process_destroy();
 }
 
 static void action_comm_size(const char *const *action)
@@ -69,25 +73,29 @@ static void action_comm_size(const char *const *action)
   double clock = smpi_process_simulated_elapsed();
 
   communicator_size = parse_double(action[2]);
+  log_timed_action (action, clock);
+}
 
-  if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
-    char *name = xbt_str_join_array(action, " ");
-    XBT_VERB("%s %f", name, smpi_process_simulated_elapsed()-clock);
-    free(name);
-  }
+static void action_comm_split(const char *const *action)
+{
+  double clock = smpi_process_simulated_elapsed();
+
+  log_timed_action (action, clock);
 }
 
+static void action_comm_dup(const char *const *action)
+{
+  double clock = smpi_process_simulated_elapsed();
+
+  log_timed_action (action, clock);
+}
 
 static void action_compute(const char *const *action)
 {
   double clock = smpi_process_simulated_elapsed();
   smpi_execute_flops(parse_double(action[2]));
 
-  if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
-    char *name = xbt_str_join_array(action, " ");
-    XBT_VERB("%s %f", name, smpi_process_simulated_elapsed()-clock);
-    free(name);
-  }
+  log_timed_action (action, clock);
 }
 
 static void action_send(const char *const *action)
@@ -105,11 +113,7 @@ static void action_send(const char *const *action)
 
   smpi_mpi_send(NULL, size, MPI_BYTE, to , 0, MPI_COMM_WORLD);
 
-  if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
-    char *name = xbt_str_join_array(action, " ");
-    XBT_VERB("%s %f", name, smpi_process_simulated_elapsed()-clock);
-    free(name);
-  }
+  log_timed_action (action, clock);
 
   #ifdef HAVE_TRACING
   TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
@@ -143,12 +147,7 @@ static void action_Isend(const char *const *action)
 
   xbt_dynar_push(globals->isends,&request);
 
-  //TODO do the asynchronous cleanup
-  if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
-    char *name = xbt_str_join_array(action, " ");
-    XBT_VERB("%s %f", name, smpi_process_simulated_elapsed()-clock);
-    free(name);
-  }
+  log_timed_action (action, clock);
 }
 
 static void action_recv(const char *const *action) {
@@ -172,11 +171,7 @@ static void action_recv(const char *const *action) {
   TRACE_smpi_computing_in(rank);
 #endif
 
-  if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
-    char *name = xbt_str_join_array(action, " ");
-    XBT_VERB("%s %f", name, smpi_process_simulated_elapsed()-clock);
-    free(name);
-  }
+  log_timed_action (action, clock);
 }
 
 static void action_Irecv(const char *const *action)
@@ -201,12 +196,7 @@ static void action_Irecv(const char *const *action)
 #endif
   xbt_dynar_push(globals->irecvs,&request);
 
-  //TODO do the asynchronous cleanup
-  if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
-    char *name = xbt_str_join_array(action, " ");
-    XBT_VERB("%s %f", name, smpi_process_simulated_elapsed()-clock);
-    free(name);
-  }
+  log_timed_action (action, clock);
 }
 
 static void action_wait(const char *const *action){
@@ -241,11 +231,13 @@ static void action_wait(const char *const *action){
   TRACE_smpi_computing_in(rank);
 #endif
 
-  if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
-    char *name = xbt_str_join_array(action, " ");
-    XBT_VERB("%s %f", name, smpi_process_simulated_elapsed()-clock);
-    free(name);
-  }
+  log_timed_action (action, clock);
+}
+
+static void action_waitall(const char *const *action){
+  double clock = smpi_process_simulated_elapsed();
+//  smpi_mpi_waitall(count, requests, status);
+  log_timed_action (action, clock);
 }
 
 static void action_barrier(const char *const *action){
@@ -261,11 +253,7 @@ static void action_barrier(const char *const *action){
   TRACE_smpi_computing_in(rank);
 #endif
 
-  if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
-    char *name = xbt_str_join_array(action, " ");
-    XBT_VERB("%s %f", name, smpi_process_simulated_elapsed()-clock);
-    free(name);
-  }
+  log_timed_action (action, clock);
 }
 
 static void action_bcast(const char *const *action)
@@ -285,11 +273,7 @@ static void action_bcast(const char *const *action)
   TRACE_smpi_computing_in(rank);
 #endif
 
-  if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
-    char *name = xbt_str_join_array(action, " ");
-    XBT_VERB("%s %f", name, smpi_process_simulated_elapsed()-clock);
-    free(name);
-  }
+  log_timed_action (action, clock);
 }
 
 static void action_reduce(const char *const *action)
@@ -308,11 +292,7 @@ static void action_reduce(const char *const *action)
   TRACE_smpi_computing_in(rank);
 #endif
 
-  if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
-    char *name = xbt_str_join_array(action, " ");
-    XBT_VERB("%s %f", name, smpi_process_simulated_elapsed()-clock);
-    free(name);
-  }
+  log_timed_action (action, clock);
 }
 
 static void action_allReduce(const char *const *action) {
@@ -332,34 +312,87 @@ static void action_allReduce(const char *const *action) {
   TRACE_smpi_computing_in(rank);
 #endif
 
-  if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
-    char *name = xbt_str_join_array(action, " ");
-    XBT_VERB("%s %f", name, smpi_process_simulated_elapsed()-clock);
-    free(name);
+  log_timed_action (action, clock);
+}
+
+static void action_allToAll(const char *const *action) {
+  double clock = smpi_process_simulated_elapsed();
+  double comm_size = smpi_comm_size(MPI_COMM_WORLD);
+  double send_size = parse_double(action[2]);
+  double recv_size = parse_double(action[3]);
+
+#ifdef HAVE_TRACING
+  int rank = smpi_process_index();
+  TRACE_smpi_computing_out(rank);
+  TRACE_smpi_collective_in(rank, -1, __FUNCTION__);
+#endif
+
+  if (send_size < 200 && comm_size > 12) {
+    smpi_coll_tuned_alltoall_bruck(NULL, send_size, MPI_BYTE,
+                                   NULL, recv_size, MPI_BYTE,
+                                   MPI_COMM_WORLD);
+  } else if (send_size < 3000) {
+    smpi_coll_tuned_alltoall_basic_linear(NULL, send_size, MPI_BYTE,
+                                          NULL, recv_size, MPI_BYTE,
+                                          MPI_COMM_WORLD);
+  } else {
+    smpi_coll_tuned_alltoall_pairwise(NULL, send_size, MPI_BYTE,
+                                      NULL, recv_size, MPI_BYTE,
+                                      MPI_COMM_WORLD);
   }
+
+#ifdef HAVE_TRACING
+  TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
+  TRACE_smpi_computing_in(rank);
+#endif
+
+  log_timed_action (action, clock);
+}
+
+static void action_allToAllv(const char *const *action) {
+  double clock = smpi_process_simulated_elapsed();
+
+  log_timed_action (action, clock);
 }
 
 void smpi_replay_init(int *argc, char***argv){
   PMPI_Init(argc, argv);
-  _xbt_replay_action_init();
-
-  xbt_replay_action_register("init",     action_init);
-  xbt_replay_action_register("finalize", action_finalize);
-  xbt_replay_action_register("comm_size",action_comm_size);
-  xbt_replay_action_register("send",     action_send);
-  xbt_replay_action_register("Isend",    action_Isend);
-  xbt_replay_action_register("recv",     action_recv);
-  xbt_replay_action_register("Irecv",    action_Irecv);
-  xbt_replay_action_register("wait",     action_wait);
-  xbt_replay_action_register("barrier",  action_barrier);
-  xbt_replay_action_register("bcast",    action_bcast);
-  xbt_replay_action_register("reduce",   action_reduce);
-  xbt_replay_action_register("allReduce",action_allReduce);
-  xbt_replay_action_register("compute",  action_compute);
+  if (!smpi_process_index()){
+    _xbt_replay_action_init();
+    xbt_replay_action_register("init",       action_init);
+    xbt_replay_action_register("finalize",   action_finalize);
+    xbt_replay_action_register("comm_size",  action_comm_size);
+    xbt_replay_action_register("comm_split", action_comm_split);
+    xbt_replay_action_register("comm_dup",   action_comm_dup);
+    xbt_replay_action_register("send",       action_send);
+    xbt_replay_action_register("Isend",      action_Isend);
+    xbt_replay_action_register("recv",       action_recv);
+    xbt_replay_action_register("Irecv",      action_Irecv);
+    xbt_replay_action_register("wait",       action_wait);
+    xbt_replay_action_register("waitAll",    action_waitall);
+    xbt_replay_action_register("barrier",    action_barrier);
+    xbt_replay_action_register("bcast",      action_bcast);
+    xbt_replay_action_register("reduce",     action_reduce);
+    xbt_replay_action_register("allReduce",  action_allReduce);
+    xbt_replay_action_register("allToAll",   action_allToAll);
+    xbt_replay_action_register("allToAllV",  action_allToAllv);
+    xbt_replay_action_register("compute",    action_compute);
+  }
 
   xbt_replay_action_runner(*argc, *argv);
 }
 
 int smpi_replay_finalize(){
+  double sim_time= 1.;
+  /* One active process will stop. Decrease the counter*/
+  active_processes--;
+
+  if(!active_processes){
+    /* Last process alive speaking */
+    /* end the simulated timer */
+    sim_time = smpi_process_simulated_elapsed();
+    XBT_INFO("Simulation time %g", sim_time);
+    _xbt_replay_action_exit();
+  }
   return PMPI_Finalize();
 }