Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
[SMPI] Replay: Cleanup WaitAction a bit
[simgrid.git] / src / smpi / internals / smpi_replay.cpp
index d38ce07..b12e1e1 100644 (file)
@@ -116,7 +116,8 @@ public:
     double start_time = smpi_process()->simulated_elapsed();
     args.parse(action);
     kernel(action);
-    log_timed_action(action, start_time);
+    if (name != "Init")
+      log_timed_action(action, start_time);
   }
 
   virtual void kernel(simgrid::xbt::ReplayAction& action) = 0;
@@ -127,9 +128,6 @@ public:
   WaitAction() : ReplayAction("Wait") {}
   void kernel(simgrid::xbt::ReplayAction& action) override
   {
-    CHECK_ACTION_PARAMS(action, 0, 0)
-    MPI_Status status;
-
     std::string s = boost::algorithm::join(action, " ");
     xbt_assert(get_reqq_self()->size(), "action wait not preceded by any irecv or isend: %s", s.c_str());
     MPI_Request request = get_reqq_self()->back();
@@ -143,18 +141,20 @@ public:
 
     int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
 
-    MPI_Group group          = request->comm()->group();
-    int src_traced           = group->rank(request->src());
-    int dst_traced           = group->rank(request->dst());
+    // Must be taken before Request::wait() since the request may be set to
+    // MPI_REQUEST_NULL by Request::wait!
+    int src                  = request->comm()->group()->rank(request->src());
+    int dst                  = request->comm()->group()->rank(request->dst());
     bool is_wait_for_receive = (request->flags() & RECV);
     // TODO: Here we take the rank while we normally take the process id (look for my_proc_id)
     TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::NoOpTIData("wait"));
 
+    MPI_Status status;
     Request::wait(&request, &status);
 
     TRACE_smpi_comm_out(rank);
     if (is_wait_for_receive)
-      TRACE_smpi_recv(src_traced, dst_traced, 0);
+      TRACE_smpi_recv(src, dst, 0);
   }
 };
 
@@ -254,120 +254,69 @@ public:
   }
 };
 
-} // Replay Namespace
-
-static void action_init(simgrid::xbt::ReplayAction& action)
-{
-  XBT_DEBUG("Initialize the counters");
-  CHECK_ACTION_PARAMS(action, 0, 1)
-  if (action.size() > 2)
-    MPI_DEFAULT_TYPE = MPI_DOUBLE; // default MPE datatype
-  else
-    MPI_DEFAULT_TYPE = MPI_BYTE; // default TAU datatype
-
-  /* start a simulated timer */
-  smpi_process()->simulated_start();
-  /*initialize the number of active processes */
-  active_processes = smpi_process_count();
-
-  set_reqq_self(new std::vector<MPI_Request>);
-}
-
-static void action_finalize(simgrid::xbt::ReplayAction& action)
-{
-  /* Nothing to do */
-}
-
-static void action_comm_size(simgrid::xbt::ReplayAction& action)
-{
-  log_timed_action (action, smpi_process()->simulated_elapsed());
-}
-
-static void action_comm_split(simgrid::xbt::ReplayAction& action)
-{
-  log_timed_action (action, smpi_process()->simulated_elapsed());
-}
-
-static void action_comm_dup(simgrid::xbt::ReplayAction& action)
-{
-  log_timed_action (action, smpi_process()->simulated_elapsed());
-}
-
-static void action_compute(simgrid::xbt::ReplayAction& action)
-{
-  Replay::ComputeAction().execute(action);
-}
-
-static void action_test(simgrid::xbt::ReplayAction& action)
-{
-  CHECK_ACTION_PARAMS(action, 0, 0)
-  double clock = smpi_process()->simulated_elapsed();
-  MPI_Status status;
-
-  MPI_Request request = get_reqq_self()->back();
-  get_reqq_self()->pop_back();
-  //if request is null here, this may mean that a previous test has succeeded
-  //Different times in traced application and replayed version may lead to this
-  //In this case, ignore the extra calls.
-  if(request!=nullptr){
-    int my_proc_id = Actor::self()->getPid();
-    TRACE_smpi_testing_in(my_proc_id);
-
-    int flag = Request::test(&request, &status);
+class InitAction : public ReplayAction<ActionArgParser> { // Replays the "init" entry: picks the default datatype, starts the simulated timer and installs a new request vector.
+public:
+  InitAction() : ReplayAction("Init") {} // "Init" is the name the visible execute() hunk compares against to skip log_timed_action
+  void kernel(simgrid::xbt::ReplayAction& action) override
+  {
+    CHECK_ACTION_PARAMS(action, 0, 1) // presumably 0 mandatory and 1 optional parameter -- TODO confirm against the macro definition
+    MPI_DEFAULT_TYPE = (action.size() > 2) ? MPI_DOUBLE // default MPE datatype
+                                           : MPI_BYTE;  // default TAU datatype
 
-    XBT_DEBUG("MPI_Test result: %d", flag);
-    /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now nullptr.*/
-    get_reqq_self()->push_back(request);
+    /* start the simulated timer */
+    smpi_process()->simulated_start();
+    /* initialize the number of active processes */
+    active_processes = smpi_process_count();
 
-    TRACE_smpi_testing_out(my_proc_id);
+    set_reqq_self(new std::vector<MPI_Request>); // NOTE(review): raw new; where this vector is released is not visible in this diff
   }
-  log_timed_action (action, clock);
-}
-
-static void action_waitall(simgrid::xbt::ReplayAction& action)
-{
-  CHECK_ACTION_PARAMS(action, 0, 0)
-  double clock = smpi_process()->simulated_elapsed();
-  const unsigned int count_requests = get_reqq_self()->size();
+};
 
-  if (count_requests>0) {
-    MPI_Status status[count_requests];
+class CommunicatorAction : public ReplayAction<ActionArgParser> { // Shared handler registered for the "comm_size", "comm_split" and "comm_dup" entries.
+public:
+  CommunicatorAction() : ReplayAction("Comm") {}
+  void kernel(simgrid::xbt::ReplayAction& action) override { /* nothing to do: the removed action_comm_* functions only called log_timed_action */}
+};
 
-    int my_proc_id_traced = Actor::self()->getPid();
-    TRACE_smpi_comm_in(my_proc_id_traced, __FUNCTION__,
-                       new simgrid::instr::Pt2PtTIData("waitAll", -1, count_requests, ""));
-    int recvs_snd[count_requests];
-    int recvs_rcv[count_requests];
-    for (unsigned int i = 0; i < count_requests; i++) {
-      const auto& req = (*get_reqq_self())[i];
-      if (req && (req->flags() & RECV)) {
-        recvs_snd[i] = req->src();
-        recvs_rcv[i] = req->dst();
-      } else
-        recvs_snd[i] = -100;
-   }
-   Request::waitall(count_requests, &(*get_reqq_self())[0], status);
-
-   for (unsigned i = 0; i < count_requests; i++) {
-     if (recvs_snd[i]!=-100)
-       TRACE_smpi_recv(recvs_snd[i], recvs_rcv[i],0);
-   }
-   TRACE_smpi_comm_out(my_proc_id_traced);
+class WaitAllAction : public ReplayAction<ActionArgParser> { // Replays a "waitAll" entry: waits on every request held in the vector returned by get_reqq_self().
+public:
+  WaitAllAction() : ReplayAction("waitAll") {}
+  void kernel(simgrid::xbt::ReplayAction& action) override
+  {
+    const unsigned int count_requests = get_reqq_self()->size(); // number of requests currently stored in that vector
+
+    if (count_requests > 0) { // nothing is traced or waited for when the vector is empty
+      TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
+                         new simgrid::instr::Pt2PtTIData("waitAll", -1, count_requests, ""));
+      std::vector<std::pair</*sender*/int,/*recv*/int>> sender_receiver;
+      for (const auto& req : (*get_reqq_self())) { // remember src/dst of receive requests before waiting; presumably waitall may reset completed requests -- TODO confirm
+        if (req && (req->flags() & RECV)) {
+          sender_receiver.push_back({req->src(), req->dst()});
+        }
+      }
+      MPI_Status status[count_requests]; // NOTE(review): variable-length array, a compiler extension in C++; std::vector<MPI_Status> would be portable
+      Request::waitall(count_requests, &(*get_reqq_self())[0], status);
+
+      for (auto& pair : sender_receiver) { // one receive trace event per receive request recorded above
+        TRACE_smpi_recv(pair.first, pair.second, 0);
+      }
+      TRACE_smpi_comm_out(my_proc_id);
+    }
   }
-  log_timed_action (action, clock);
-}
-
-static void action_barrier(simgrid::xbt::ReplayAction& action)
-{
-  double clock = smpi_process()->simulated_elapsed();
-  int my_proc_id = Actor::self()->getPid();
-  TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::NoOpTIData("barrier"));
+};
 
-  Colls::barrier(MPI_COMM_WORLD);
+class BarrierAction : public ReplayAction<ActionArgParser> { // Replays a "barrier" entry. NOTE(review): the "barrier" registration further down this diff still names simgrid::smpi::action_barrier, which this diff removes.
+public:
+  BarrierAction() : ReplayAction("barrier") {}
+  void kernel(simgrid::xbt::ReplayAction& action) override
+  {
+    TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::NoOpTIData("barrier")); // my_proc_id is not declared in this diff; presumably a ReplayAction member -- TODO confirm
+    Colls::barrier(MPI_COMM_WORLD); // the barrier is always performed on MPI_COMM_WORLD
+    TRACE_smpi_comm_out(my_proc_id);
+  }
+};
 
-  TRACE_smpi_comm_out(my_proc_id);
-  log_timed_action (action, clock);
-}
+} // Replay Namespace
 
 static void action_bcast(simgrid::xbt::ReplayAction& action)
 {
@@ -829,30 +778,19 @@ void smpi_replay_init(int* argc, char*** argv)
   TRACE_smpi_computing_init(my_proc_id);
   TRACE_smpi_comm_in(my_proc_id, "smpi_replay_run_init", new simgrid::instr::NoOpTIData("init"));
   TRACE_smpi_comm_out(my_proc_id);
-  xbt_replay_action_register("init",       simgrid::smpi::action_init);
-  xbt_replay_action_register("finalize",   simgrid::smpi::action_finalize);
-  xbt_replay_action_register("comm_size",  simgrid::smpi::action_comm_size);
-  xbt_replay_action_register("comm_split", simgrid::smpi::action_comm_split);
-  xbt_replay_action_register("comm_dup",   simgrid::smpi::action_comm_dup);
-
-  std::shared_ptr<simgrid::smpi::Replay::SendAction> isend(new simgrid::smpi::Replay::SendAction("Isend"));
-  std::shared_ptr<simgrid::smpi::Replay::SendAction> send(new simgrid::smpi::Replay::SendAction("send"));
-  std::shared_ptr<simgrid::smpi::Replay::RecvAction> irecv(new simgrid::smpi::Replay::RecvAction("Irecv"));
-  std::shared_ptr<simgrid::smpi::Replay::RecvAction> recv(new simgrid::smpi::Replay::RecvAction("recv"));
-  std::shared_ptr<simgrid::smpi::Replay::WaitAction> wait(new simgrid::smpi::Replay::WaitAction());
-
-  xbt_replay_action_register("send",
-                             std::bind(&simgrid::smpi::Replay::SendAction::execute, send, std::placeholders::_1));
-  xbt_replay_action_register("Isend",
-                             std::bind(&simgrid::smpi::Replay::SendAction::execute, isend, std::placeholders::_1));
-  xbt_replay_action_register("recv",
-                             std::bind(&simgrid::smpi::Replay::RecvAction::execute, recv, std::placeholders::_1));
-  xbt_replay_action_register("Irecv",
-                             std::bind(&simgrid::smpi::Replay::RecvAction::execute, irecv, std::placeholders::_1));
-  xbt_replay_action_register("test", simgrid::smpi::action_test);
-  xbt_replay_action_register("wait",
-                             std::bind(&simgrid::smpi::Replay::WaitAction::execute, wait, std::placeholders::_1));
-  xbt_replay_action_register("waitAll",    simgrid::smpi::action_waitall);
+  xbt_replay_action_register("init", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::InitAction().execute(action); });
+  xbt_replay_action_register("finalize", [](simgrid::xbt::ReplayAction& action) { /* nothing to do */ });
+  xbt_replay_action_register("comm_size", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::CommunicatorAction().execute(action); });
+  xbt_replay_action_register("comm_split",[](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::CommunicatorAction().execute(action); });
+  xbt_replay_action_register("comm_dup",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::CommunicatorAction().execute(action); });
+
+  xbt_replay_action_register("send",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::SendAction("send").execute(action); });
+  xbt_replay_action_register("Isend", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::SendAction("Isend").execute(action); });
+  xbt_replay_action_register("recv",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::RecvAction("recv").execute(action); });
+  xbt_replay_action_register("Irecv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::RecvAction("Irecv").execute(action); });
+  xbt_replay_action_register("test",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::TestAction().execute(action); });
+  xbt_replay_action_register("wait",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::WaitAction().execute(action); });
+  xbt_replay_action_register("waitAll", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::WaitAllAction().execute(action); });
   xbt_replay_action_register("barrier",    simgrid::smpi::action_barrier);
   xbt_replay_action_register("bcast",      simgrid::smpi::action_bcast);
   xbt_replay_action_register("reduce",     simgrid::smpi::action_reduce);
@@ -866,7 +804,7 @@ void smpi_replay_init(int* argc, char*** argv)
   xbt_replay_action_register("allGather",  simgrid::smpi::action_allgather);
   xbt_replay_action_register("allGatherV", simgrid::smpi::action_allgatherv);
   xbt_replay_action_register("reduceScatter",  simgrid::smpi::action_reducescatter);
-  xbt_replay_action_register("compute", simgrid::smpi::action_compute);
+  xbt_replay_action_register("compute", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::ComputeAction().execute(action); });
 
   //if we have a delayed start, sleep here.
   if(*argc>2){