Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
[SMPI] Replay: Cleanup WaitAction a bit
[simgrid.git] / src / smpi / internals / smpi_replay.cpp
index b8f9165..4861690 100644 (file)
@@ -22,7 +22,6 @@ using simgrid::s4u::Actor;
 
 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
 
-static int communicator_size = 0;
 static int active_processes  = 0;
 static std::unordered_map<int, std::vector<MPI_Request>*> reqq;
 
@@ -144,9 +143,10 @@ public:
 
     int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
 
-    MPI_Group group          = request->comm()->group();
-    int src_traced           = group->rank(request->src());
-    int dst_traced           = group->rank(request->dst());
+    // Must be taken before Request::wait() since the request may be set to
+    // MPI_REQUEST_NULL by Request::wait!
+    int src                  = request->comm()->group()->rank(request->src());
+    int dst                  = request->comm()->group()->rank(request->dst());
     bool is_wait_for_receive = (request->flags() & RECV);
     // TODO: Here we take the rank while we normally take the process id (look for my_proc_id)
     TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::NoOpTIData("wait"));
@@ -229,6 +229,32 @@ public:
   }
 };
 
+class TestAction : public ReplayAction<ActionArgParser> { // Replays a "Test" action on the last request of get_reqq_self()
+public:
+  TestAction() : ReplayAction("Test") {}
+  void kernel(simgrid::xbt::ReplayAction& action) override
+  {
+    MPI_Request request = get_reqq_self()->back();
+    get_reqq_self()->pop_back();
+    // A null request here may mean that an earlier test on it has already succeeded: timings can differ between
+    // the traced application and the replayed version, so the trace may hold more test calls than the replay needs.
+    // Such extra calls are ignored. NOTE(review): the popped null entry is not pushed back in that case -- confirm.
+    if (request != nullptr) {
+      TRACE_smpi_testing_in(my_proc_id);
+
+      MPI_Status status;
+      int flag = Request::test(&request, &status);
+
+      XBT_DEBUG("MPI_Test result: %d", flag);
+      /* Put the request back into the vector so that a subsequent wait can pick it up. If the test succeeded, the
+       * request is nullptr by now. */
+      get_reqq_self()->push_back(request);
+
+      TRACE_smpi_testing_out(my_proc_id);
+    }
+  }
+};
+
 } // Replay Namespace
 
 static void action_init(simgrid::xbt::ReplayAction& action)
@@ -255,7 +281,6 @@ static void action_finalize(simgrid::xbt::ReplayAction& action)
 
 static void action_comm_size(simgrid::xbt::ReplayAction& action)
 {
-  communicator_size = parse_double(action[2]);
   log_timed_action (action, smpi_process()->simulated_elapsed());
 }
 
@@ -269,37 +294,6 @@ static void action_comm_dup(simgrid::xbt::ReplayAction& action)
   log_timed_action (action, smpi_process()->simulated_elapsed());
 }
 
-static void action_compute(simgrid::xbt::ReplayAction& action)
-{
-  Replay::ComputeAction().execute(action);
-}
-
-static void action_test(simgrid::xbt::ReplayAction& action)
-{
-  CHECK_ACTION_PARAMS(action, 0, 0)
-  double clock = smpi_process()->simulated_elapsed();
-  MPI_Status status;
-
-  MPI_Request request = get_reqq_self()->back();
-  get_reqq_self()->pop_back();
-  //if request is null here, this may mean that a previous test has succeeded
-  //Different times in traced application and replayed version may lead to this
-  //In this case, ignore the extra calls.
-  if(request!=nullptr){
-    int my_proc_id = Actor::self()->getPid();
-    TRACE_smpi_testing_in(my_proc_id);
-
-    int flag = Request::test(&request, &status);
-
-    XBT_DEBUG("MPI_Test result: %d", flag);
-    /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now nullptr.*/
-    get_reqq_self()->push_back(request);
-
-    TRACE_smpi_testing_out(my_proc_id);
-  }
-  log_timed_action (action, clock);
-}
-
 static void action_waitall(simgrid::xbt::ReplayAction& action)
 {
   CHECK_ACTION_PARAMS(action, 0, 0)
@@ -811,23 +805,12 @@ void smpi_replay_init(int* argc, char*** argv)
   xbt_replay_action_register("comm_split", simgrid::smpi::action_comm_split);
   xbt_replay_action_register("comm_dup",   simgrid::smpi::action_comm_dup);
 
-  std::shared_ptr<simgrid::smpi::Replay::SendAction> isend(new simgrid::smpi::Replay::SendAction("Isend"));
-  std::shared_ptr<simgrid::smpi::Replay::SendAction> send(new simgrid::smpi::Replay::SendAction("send"));
-  std::shared_ptr<simgrid::smpi::Replay::RecvAction> irecv(new simgrid::smpi::Replay::RecvAction("Irecv"));
-  std::shared_ptr<simgrid::smpi::Replay::RecvAction> recv(new simgrid::smpi::Replay::RecvAction("recv"));
-  std::shared_ptr<simgrid::smpi::Replay::WaitAction> wait(new simgrid::smpi::Replay::WaitAction());
-
-  xbt_replay_action_register("send",
-                             std::bind(&simgrid::smpi::Replay::SendAction::execute, send, std::placeholders::_1));
-  xbt_replay_action_register("Isend",
-                             std::bind(&simgrid::smpi::Replay::SendAction::execute, isend, std::placeholders::_1));
-  xbt_replay_action_register("recv",
-                             std::bind(&simgrid::smpi::Replay::RecvAction::execute, recv, std::placeholders::_1));
-  xbt_replay_action_register("Irecv",
-                             std::bind(&simgrid::smpi::Replay::RecvAction::execute, irecv, std::placeholders::_1));
-  xbt_replay_action_register("test", simgrid::smpi::action_test);
-  xbt_replay_action_register("wait",
-                             std::bind(&simgrid::smpi::Replay::WaitAction::execute, wait, std::placeholders::_1));
+  xbt_replay_action_register("send",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::SendAction("send").execute(action); });
+  xbt_replay_action_register("Isend", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::SendAction("Isend").execute(action); });
+  xbt_replay_action_register("recv",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::RecvAction("recv").execute(action); });
+  xbt_replay_action_register("Irecv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::RecvAction("Irecv").execute(action); });
+  xbt_replay_action_register("test",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::TestAction().execute(action); });
+  xbt_replay_action_register("wait",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::WaitAction().execute(action); });
   xbt_replay_action_register("waitAll",    simgrid::smpi::action_waitall);
   xbt_replay_action_register("barrier",    simgrid::smpi::action_barrier);
   xbt_replay_action_register("bcast",      simgrid::smpi::action_bcast);
@@ -842,7 +825,7 @@ void smpi_replay_init(int* argc, char*** argv)
   xbt_replay_action_register("allGather",  simgrid::smpi::action_allgather);
   xbt_replay_action_register("allGatherV", simgrid::smpi::action_allgatherv);
   xbt_replay_action_register("reduceScatter",  simgrid::smpi::action_reducescatter);
-  xbt_replay_action_register("compute", simgrid::smpi::action_compute);
+  xbt_replay_action_register("compute", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::ComputeAction().execute(action); });
 
   //if we have a delayed start, sleep here.
   if(*argc>2){