Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Rework SMPI initialization to handle argc and argv earlier than in MPI_Init when...
[simgrid.git] / src / smpi / internals / smpi_replay.cpp
index b52b9ba..d55ff23 100644 (file)
@@ -134,7 +134,10 @@ public:
     /* Sometimes we need to re-insert MPI_REQUEST_NULL but we still need src,dst and tag */
     void addNullRequest(int src, int dst, int tag)
     {
-      store.insert({req_key_t(src, dst, tag), MPI_REQUEST_NULL});
+      store.insert({req_key_t(
+            MPI_COMM_WORLD->group()->actor(src)->get_pid()-1,
+            MPI_COMM_WORLD->group()->actor(dst)->get_pid()-1,
+            tag), MPI_REQUEST_NULL});
     }
 };
 
@@ -250,7 +253,7 @@ void GatherVArgParser::parse(simgrid::xbt::ReplayAction& action, std::string nam
   disps      = std::vector<int>(comm_size, 0);
   recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
 
-  if (name == "gatherV") {
+  if (name == "gatherv") {
     root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
     if (action.size() > 4 + comm_size)
       datatype1 = simgrid::smpi::Datatype::decode(action[4 + comm_size]);
@@ -341,7 +344,7 @@ void ScatterVArgParser::parse(simgrid::xbt::ReplayAction& action, std::string na
 void ReduceScatterArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
 {
   /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
-       0 reduceScatter 275427 275427 275427 204020 11346849 0
+       0 reducescatter 275427 275427 275427 204020 11346849 0
      where:
        1) The first four values after the name of the action declare the recvcounts array
        2) The value 11346849 is the amount of instructions
@@ -362,8 +365,8 @@ void ReduceScatterArgParser::parse(simgrid::xbt::ReplayAction& action, std::stri
 
 void AllToAllVArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
 {
-  /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
-        0 allToAllV 100 1 7 10 12 100 1 70 10 5
+  /* The structure of the alltoallv action for the rank 0 (total 4 processes) is the following:
+        0 alltoallv 100 1 7 10 12 100 1 70 10 5
      where:
       1) 100 is the size of the send buffer *sizeof(int),
       2) 1 7 10 12 is the sendcounts array
@@ -432,7 +435,7 @@ void SendAction::kernel(simgrid::xbt::ReplayAction& action)
 
   if (name == "send") {
     Request::send(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
-  } else if (name == "Isend") {
+  } else if (name == "isend") {
     MPI_Request request = Request::isend(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
     req_storage.add(request);
   } else {
@@ -458,13 +461,13 @@ void RecvAction::kernel(simgrid::xbt::ReplayAction& action)
 
   if (name == "recv") {
     Request::recv(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD, &status);
-  } else if (name == "Irecv") {
+  } else if (name == "irecv") {
     MPI_Request request = Request::irecv(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
     req_storage.add(request);
   }
 
   TRACE_smpi_comm_out(my_proc_id);
-  // TODO: Check why this was only activated in the "recv" case and not in the "Irecv" case
+  // TODO: Check why this was only activated in the "recv" case and not in the "irecv" case
   if (name == "recv" && not TRACE_smpi_view_internals()) {
     TRACE_smpi_recv(src_traced, my_proc_id, args.tag);
   }
@@ -588,8 +591,8 @@ void AllReduceAction::kernel(simgrid::xbt::ReplayAction& action)
 
 void AllToAllAction::kernel(simgrid::xbt::ReplayAction& action)
 {
-  TRACE_smpi_comm_in(my_proc_id, "action_allToAll",
-      new simgrid::instr::CollTIData("allToAll", -1, -1.0, args.send_size, args.recv_size,
+  TRACE_smpi_comm_in(my_proc_id, "action_alltoall",
+      new simgrid::instr::CollTIData("alltoall", -1, -1.0, args.send_size, args.recv_size,
         Datatype::encode(args.datatype1),
         Datatype::encode(args.datatype2)));
 
@@ -622,10 +625,10 @@ void GatherVAction::kernel(simgrid::xbt::ReplayAction& action)
   int rank = MPI_COMM_WORLD->rank();
 
   TRACE_smpi_comm_in(my_proc_id, name.c_str(), new simgrid::instr::VarCollTIData(
-        name, (name == "gatherV") ? args.root : -1, args.send_size, nullptr, -1, args.recvcounts,
+        name, (name == "gatherv") ? args.root : -1, args.send_size, nullptr, -1, args.recvcounts,
         Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
 
-  if (name == "gatherV") {
+  if (name == "gatherv") {
     Colls::gatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
         (rank == args.root) ? recv_buffer(args.recv_size_sum * args.datatype2->size()) : nullptr,
         args.recvcounts->data(), args.disps.data(), args.datatype2, args.root, MPI_COMM_WORLD);
@@ -670,7 +673,7 @@ void ScatterVAction::kernel(simgrid::xbt::ReplayAction& action)
 void ReduceScatterAction::kernel(simgrid::xbt::ReplayAction& action)
 {
   TRACE_smpi_comm_in(my_proc_id, "action_reducescatter",
-      new simgrid::instr::VarCollTIData("reduceScatter", -1, 0, nullptr, -1, args.recvcounts,
+      new simgrid::instr::VarCollTIData("reducescatter", -1, 0, nullptr, -1, args.recvcounts,
         std::to_string(args.comp_size), /* ugly hack to print comp_size */
         Datatype::encode(args.datatype1)));
 
@@ -686,7 +689,7 @@ void AllToAllVAction::kernel(simgrid::xbt::ReplayAction& action)
 {
   TRACE_smpi_comm_in(my_proc_id, __func__,
       new simgrid::instr::VarCollTIData(
-        "allToAllV", -1, args.send_size_sum, args.sendcounts, args.recv_size_sum, args.recvcounts,
+        "alltoallv", -1, args.send_size_sum, args.sendcounts, args.recv_size_sum, args.recvcounts,
         Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
 
   Colls::alltoallv(send_buffer(args.send_buf_size * args.datatype1->size()), args.sendcounts->data(), args.senddisps.data(), args.datatype1,
@@ -697,16 +700,17 @@ void AllToAllVAction::kernel(simgrid::xbt::ReplayAction& action)
 } // Replay Namespace
 }} // namespace simgrid::smpi
 
-static std::vector<simgrid::smpi::replay::RequestStorage> storage;
+static std::unordered_map<aid_t, simgrid::smpi::replay::RequestStorage> storage;
 /** @brief Only initialize the replay, don't do it for real */
 void smpi_replay_init(int* argc, char*** argv)
 {
-  simgrid::smpi::Process::init(argc, argv);
+  if (not smpi_process()->initializing()){
+    simgrid::smpi::ActorExt::init(argc, argv);
+  }
   smpi_process()->mark_as_initialized();
   smpi_process()->set_replaying(true);
 
   int my_proc_id = simgrid::s4u::this_actor::get_pid();
-  storage.resize(smpi_process_count());
 
   TRACE_smpi_init(my_proc_id);
   TRACE_smpi_computing_init(my_proc_id);
@@ -717,26 +721,26 @@ void smpi_replay_init(int* argc, char*** argv)
   xbt_replay_action_register("comm_size", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
   xbt_replay_action_register("comm_split",[](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
   xbt_replay_action_register("comm_dup",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
-  xbt_replay_action_register("send",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("send", storage[simgrid::s4u::this_actor::get_pid()-1]).execute(action); });
-  xbt_replay_action_register("Isend", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("Isend", storage[simgrid::s4u::this_actor::get_pid()-1]).execute(action); });
-  xbt_replay_action_register("recv",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("recv", storage[simgrid::s4u::this_actor::get_pid()-1]).execute(action); });
-  xbt_replay_action_register("Irecv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("Irecv", storage[simgrid::s4u::this_actor::get_pid()-1]).execute(action); });
-  xbt_replay_action_register("test",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::TestAction(storage[simgrid::s4u::this_actor::get_pid()-1]).execute(action); });
-  xbt_replay_action_register("wait",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAction(storage[simgrid::s4u::this_actor::get_pid()-1]).execute(action); });
-  xbt_replay_action_register("waitall", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAllAction(storage[simgrid::s4u::this_actor::get_pid()-1]).execute(action); });
+  xbt_replay_action_register("send",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("send", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
+  xbt_replay_action_register("isend", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("isend", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
+  xbt_replay_action_register("recv",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("recv", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
+  xbt_replay_action_register("irecv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("irecv", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
+  xbt_replay_action_register("test",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::TestAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
+  xbt_replay_action_register("wait",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
+  xbt_replay_action_register("waitall", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAllAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
   xbt_replay_action_register("barrier", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BarrierAction().execute(action); });
   xbt_replay_action_register("bcast",   [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BcastAction().execute(action); });
   xbt_replay_action_register("reduce",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceAction().execute(action); });
   xbt_replay_action_register("allreduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllReduceAction().execute(action); });
-  xbt_replay_action_register("allToAll", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllAction().execute(action); });
-  xbt_replay_action_register("allToAllV", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllVAction().execute(action); });
+  xbt_replay_action_register("alltoall", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllAction().execute(action); });
+  xbt_replay_action_register("alltoallv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllVAction().execute(action); });
   xbt_replay_action_register("gather",   [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("gather").execute(action); });
   xbt_replay_action_register("scatter",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterAction().execute(action); });
-  xbt_replay_action_register("gatherV",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("gatherV").execute(action); });
-  xbt_replay_action_register("scatterV", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterVAction().execute(action); });
-  xbt_replay_action_register("allGather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("allGather").execute(action); });
+  xbt_replay_action_register("gatherv",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("gatherv").execute(action); });
+  xbt_replay_action_register("scatterv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterVAction().execute(action); });
+  xbt_replay_action_register("allgather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("allgather").execute(action); });
   xbt_replay_action_register("allgatherv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("allgatherv").execute(action); });
-  xbt_replay_action_register("reduceScatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceScatterAction().execute(action); });
+  xbt_replay_action_register("reducescatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceScatterAction().execute(action); });
   xbt_replay_action_register("compute", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ComputeAction().execute(action); });
 
   //if we have a delayed start, sleep here.
@@ -745,9 +749,8 @@ void smpi_replay_init(int* argc, char*** argv)
     XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
     smpi_execute_flops(value);
   } else {
-    //UGLY: force a context switch to be sure that all MSG_processes begin initialization
-    XBT_DEBUG("Force context switch by smpi_execute_flops  - Sleeping for 0.0 flops ");
-    smpi_execute_flops(0.0);
+    // Wait for the other actors to initialize also
+    simgrid::s4u::this_actor::yield();
   }
 }
 
@@ -756,18 +759,19 @@ void smpi_replay_main(int* argc, char*** argv)
 {
   static int active_processes = 0;
   active_processes++;
+  storage[simgrid::s4u::this_actor::get_pid()] = simgrid::smpi::replay::RequestStorage();
   simgrid::xbt::replay_runner(*argc, *argv);
 
   /* and now, finalize everything */
   /* One active process will stop. Decrease the counter*/
-  unsigned int count_requests = storage[simgrid::s4u::this_actor::get_pid() - 1].size();
+  unsigned int count_requests = storage[simgrid::s4u::this_actor::get_pid()].size();
   XBT_DEBUG("There are %ud elements in reqq[*]", count_requests);
   if (count_requests > 0) {
     MPI_Request requests[count_requests];
     MPI_Status status[count_requests];
     unsigned int i=0;
 
-    for (auto const& pair : storage[simgrid::s4u::this_actor::get_pid() - 1].get_store()) {
+    for (auto const& pair : storage[simgrid::s4u::this_actor::get_pid()].get_store()) {
       requests[i] = pair.second;
       i++;
     }