Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Actor: make the refcount observable, and improve debug messages
[simgrid.git] / src / smpi / internals / smpi_replay.cpp
index d55ff23..464ba2c 100644 (file)
@@ -1,4 +1,4 @@
-/* Copyright (c) 2009-2018. The SimGrid Team. All rights reserved.          */
+/* Copyright (c) 2009-2019. The SimGrid Team. All rights reserved.          */
 
 /* This program is free software; you can redistribute it and/or modify it
  * under the terms of the license (GNU LGPL) which comes with this package. */
@@ -73,7 +73,7 @@ void log_timed_action(simgrid::xbt::ReplayAction& action, double clock)
 }
 
 /* Helper function */
-static double parse_double(std::string string)
+static double parse_double(const std::string& string)
 {
   return xbt_str_parse_double(string.c_str(), "%s is not a double");
 }
@@ -141,7 +141,7 @@ public:
     }
 };
 
-void WaitTestParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
+void WaitTestParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
 {
   CHECK_ACTION_PARAMS(action, 3, 0)
   src = std::stoi(action[2]);
@@ -149,7 +149,7 @@ void WaitTestParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
   tag = std::stoi(action[4]);
 }
 
-void SendRecvParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
+void SendRecvParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
 {
   CHECK_ACTION_PARAMS(action, 3, 1)
   partner = std::stoi(action[2]);
@@ -159,13 +159,13 @@ void SendRecvParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
     datatype1 = simgrid::smpi::Datatype::decode(action[5]);
 }
 
-void ComputeParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
+void ComputeParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
 {
   CHECK_ACTION_PARAMS(action, 1, 0)
   flops = parse_double(action[2]);
 }
 
-void BcastArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
+void BcastArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
 {
   CHECK_ACTION_PARAMS(action, 1, 2)
   size = parse_double(action[2]);
@@ -174,7 +174,7 @@ void BcastArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
     datatype1 = simgrid::smpi::Datatype::decode(action[4]);
 }
 
-void ReduceArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
+void ReduceArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
 {
   CHECK_ACTION_PARAMS(action, 2, 2)
   comm_size = parse_double(action[2]);
@@ -184,7 +184,7 @@ void ReduceArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name
     datatype1 = simgrid::smpi::Datatype::decode(action[5]);
 }
 
-void AllReduceArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
+void AllReduceArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
 {
   CHECK_ACTION_PARAMS(action, 2, 1)
   comm_size = parse_double(action[2]);
@@ -193,7 +193,7 @@ void AllReduceArgParser::parse(simgrid::xbt::ReplayAction& action, std::string n
     datatype1 = simgrid::smpi::Datatype::decode(action[4]);
 }
 
-void AllToAllArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
+void AllToAllArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
 {
   CHECK_ACTION_PARAMS(action, 2, 1)
   comm_size = MPI_COMM_WORLD->size();
@@ -206,7 +206,7 @@ void AllToAllArgParser::parse(simgrid::xbt::ReplayAction& action, std::string na
     datatype2 = simgrid::smpi::Datatype::decode(action[5]);
 }
 
-void GatherArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
+void GatherArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string& name)
 {
   /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
         0 gather 68 68 0 0 0
@@ -236,7 +236,7 @@ void GatherArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name
   }
 }
 
-void GatherVArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
+void GatherVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string& name)
 {
   /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
        0 gather 68 68 10 10 10 0 0 0
@@ -291,7 +291,7 @@ void GatherVArgParser::parse(simgrid::xbt::ReplayAction& action, std::string nam
   recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
 }
 
-void ScatterArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
+void ScatterArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
 {
   /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
         0 gather 68 68 0 0 0
@@ -313,7 +313,7 @@ void ScatterArgParser::parse(simgrid::xbt::ReplayAction& action, std::string nam
     datatype2 = simgrid::smpi::Datatype::decode(action[6]);
 }
 
-void ScatterVArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
+void ScatterVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
 {
   /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
      0 gather 68 10 10 10 68 0 0 0
@@ -341,7 +341,7 @@ void ScatterVArgParser::parse(simgrid::xbt::ReplayAction& action, std::string na
   root          = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
 }
 
-void ReduceScatterArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
+void ReduceScatterArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
 {
   /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
        0 reducescatter 275427 275427 275427 204020 11346849 0
@@ -363,7 +363,7 @@ void ReduceScatterArgParser::parse(simgrid::xbt::ReplayAction& action, std::stri
   recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
 }
 
-void AllToAllVArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
+void AllToAllVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
 {
   /* The structure of the alltoallv action for the rank 0 (total 4 processes) is the following:
         0 alltoallv 100 1 7 10 12 100 1 70 10 5
@@ -424,7 +424,7 @@ void WaitAction::kernel(simgrid::xbt::ReplayAction& action)
     TRACE_smpi_recv(args.src, args.dst, args.tag);
 }
 
-void SendAction::kernel(simgrid::xbt::ReplayAction& action)
+void SendAction::kernel(simgrid::xbt::ReplayAction&)
 {
   int dst_traced = MPI_COMM_WORLD->group()->actor(args.partner)->get_pid();
 
@@ -445,10 +445,8 @@ void SendAction::kernel(simgrid::xbt::ReplayAction& action)
   TRACE_smpi_comm_out(my_proc_id);
 }
 
-void RecvAction::kernel(simgrid::xbt::ReplayAction& action)
+void RecvAction::kernel(simgrid::xbt::ReplayAction&)
 {
-  int src_traced = MPI_COMM_WORLD->group()->actor(args.partner)->get_pid();
-
   TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
         args.tag, Datatype::encode(args.datatype1)));
 
@@ -459,28 +457,32 @@ void RecvAction::kernel(simgrid::xbt::ReplayAction& action)
     args.size = status.count;
   }
 
+  bool is_recv = false; // Help analyzers understand that status is not used uninitialized
   if (name == "recv") {
+    is_recv = true;
     Request::recv(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD, &status);
   } else if (name == "irecv") {
     MPI_Request request = Request::irecv(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
     req_storage.add(request);
+  } else {
+    THROW_IMPOSSIBLE;
   }
 
   TRACE_smpi_comm_out(my_proc_id);
-  // TODO: Check why this was only activated in the "recv" case and not in the "irecv" case
-  if (name == "recv" && not TRACE_smpi_view_internals()) {
+  if (is_recv && not TRACE_smpi_view_internals()) {
+    int src_traced = MPI_COMM_WORLD->group()->actor(status.MPI_SOURCE)->get_pid();
     TRACE_smpi_recv(src_traced, my_proc_id, args.tag);
   }
 }
 
-void ComputeAction::kernel(simgrid::xbt::ReplayAction& action)
+void ComputeAction::kernel(simgrid::xbt::ReplayAction&)
 {
   TRACE_smpi_computing_in(my_proc_id, args.flops);
   smpi_execute_flops(args.flops);
   TRACE_smpi_computing_out(my_proc_id);
 }
 
-void TestAction::kernel(simgrid::xbt::ReplayAction& action)
+void TestAction::kernel(simgrid::xbt::ReplayAction&)
 {
   MPI_Request request = req_storage.find(args.src, args.dst, args.tag);
   req_storage.remove(request);
@@ -491,7 +493,8 @@ void TestAction::kernel(simgrid::xbt::ReplayAction& action)
     TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::NoOpTIData("test"));
 
     MPI_Status status;
-    int flag = Request::test(&request, &status);
+    int flag = 0;
+    Request::test(&request, &status, &flag);
 
     XBT_DEBUG("MPI_Test result: %d", flag);
     /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now
@@ -515,12 +518,12 @@ void InitAction::kernel(simgrid::xbt::ReplayAction& action)
   smpi_process()->simulated_start();
 }
 
-void CommunicatorAction::kernel(simgrid::xbt::ReplayAction& action)
+void CommunicatorAction::kernel(simgrid::xbt::ReplayAction&)
 {
   /* nothing to do */
 }
 
-void WaitAllAction::kernel(simgrid::xbt::ReplayAction& action)
+void WaitAllAction::kernel(simgrid::xbt::ReplayAction&)
 {
   const unsigned int count_requests = req_storage.size();
 
@@ -545,14 +548,14 @@ void WaitAllAction::kernel(simgrid::xbt::ReplayAction& action)
   }
 }
 
-void BarrierAction::kernel(simgrid::xbt::ReplayAction& action)
+void BarrierAction::kernel(simgrid::xbt::ReplayAction&)
 {
   TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::NoOpTIData("barrier"));
   Colls::barrier(MPI_COMM_WORLD);
   TRACE_smpi_comm_out(my_proc_id);
 }
 
-void BcastAction::kernel(simgrid::xbt::ReplayAction& action)
+void BcastAction::kernel(simgrid::xbt::ReplayAction&)
 {
   TRACE_smpi_comm_in(my_proc_id, "action_bcast",
       new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(args.root)->get_pid(),
@@ -563,7 +566,7 @@ void BcastAction::kernel(simgrid::xbt::ReplayAction& action)
   TRACE_smpi_comm_out(my_proc_id);
 }
 
-void ReduceAction::kernel(simgrid::xbt::ReplayAction& action)
+void ReduceAction::kernel(simgrid::xbt::ReplayAction&)
 {
   TRACE_smpi_comm_in(my_proc_id, "action_reduce",
       new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(args.root)->get_pid(),
@@ -577,7 +580,7 @@ void ReduceAction::kernel(simgrid::xbt::ReplayAction& action)
   TRACE_smpi_comm_out(my_proc_id);
 }
 
-void AllReduceAction::kernel(simgrid::xbt::ReplayAction& action)
+void AllReduceAction::kernel(simgrid::xbt::ReplayAction&)
 {
   TRACE_smpi_comm_in(my_proc_id, "action_allreduce", new simgrid::instr::CollTIData("allreduce", -1, args.comp_size, args.comm_size, -1,
         Datatype::encode(args.datatype1), ""));
@@ -589,7 +592,7 @@ void AllReduceAction::kernel(simgrid::xbt::ReplayAction& action)
   TRACE_smpi_comm_out(my_proc_id);
 }
 
-void AllToAllAction::kernel(simgrid::xbt::ReplayAction& action)
+void AllToAllAction::kernel(simgrid::xbt::ReplayAction&)
 {
   TRACE_smpi_comm_in(my_proc_id, "action_alltoall",
       new simgrid::instr::CollTIData("alltoall", -1, -1.0, args.send_size, args.recv_size,
@@ -603,7 +606,7 @@ void AllToAllAction::kernel(simgrid::xbt::ReplayAction& action)
   TRACE_smpi_comm_out(my_proc_id);
 }
 
-void GatherAction::kernel(simgrid::xbt::ReplayAction& action)
+void GatherAction::kernel(simgrid::xbt::ReplayAction&)
 {
   TRACE_smpi_comm_in(my_proc_id, name.c_str(), new simgrid::instr::CollTIData(name, (name == "gather") ? args.root : -1, -1.0, args.send_size, args.recv_size,
         Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
@@ -620,7 +623,7 @@ void GatherAction::kernel(simgrid::xbt::ReplayAction& action)
   TRACE_smpi_comm_out(my_proc_id);
 }
 
-void GatherVAction::kernel(simgrid::xbt::ReplayAction& action)
+void GatherVAction::kernel(simgrid::xbt::ReplayAction&)
 {
   int rank = MPI_COMM_WORLD->rank();
 
@@ -642,7 +645,7 @@ void GatherVAction::kernel(simgrid::xbt::ReplayAction& action)
   TRACE_smpi_comm_out(my_proc_id);
 }
 
-void ScatterAction::kernel(simgrid::xbt::ReplayAction& action)
+void ScatterAction::kernel(simgrid::xbt::ReplayAction&)
 {
   int rank = MPI_COMM_WORLD->rank();
   TRACE_smpi_comm_in(my_proc_id, "action_scatter", new simgrid::instr::CollTIData(name, args.root, -1.0, args.send_size, args.recv_size,
@@ -655,7 +658,7 @@ void ScatterAction::kernel(simgrid::xbt::ReplayAction& action)
   TRACE_smpi_comm_out(my_proc_id);
 }
 
-void ScatterVAction::kernel(simgrid::xbt::ReplayAction& action)
+void ScatterVAction::kernel(simgrid::xbt::ReplayAction&)
 {
   int rank = MPI_COMM_WORLD->rank();
   TRACE_smpi_comm_in(my_proc_id, "action_scatterv", new simgrid::instr::VarCollTIData(name, args.root, -1, args.sendcounts, args.recv_size,
@@ -670,7 +673,7 @@ void ScatterVAction::kernel(simgrid::xbt::ReplayAction& action)
   TRACE_smpi_comm_out(my_proc_id);
 }
 
-void ReduceScatterAction::kernel(simgrid::xbt::ReplayAction& action)
+void ReduceScatterAction::kernel(simgrid::xbt::ReplayAction&)
 {
   TRACE_smpi_comm_in(my_proc_id, "action_reducescatter",
       new simgrid::instr::VarCollTIData("reducescatter", -1, 0, nullptr, -1, args.recvcounts,
@@ -685,7 +688,7 @@ void ReduceScatterAction::kernel(simgrid::xbt::ReplayAction& action)
   TRACE_smpi_comm_out(my_proc_id);
 }
 
-void AllToAllVAction::kernel(simgrid::xbt::ReplayAction& action)
+void AllToAllVAction::kernel(simgrid::xbt::ReplayAction&)
 {
   TRACE_smpi_comm_in(my_proc_id, __func__,
       new simgrid::instr::VarCollTIData(
@@ -702,11 +705,14 @@ void AllToAllVAction::kernel(simgrid::xbt::ReplayAction& action)
 
 static std::unordered_map<aid_t, simgrid::smpi::replay::RequestStorage> storage;
 /** @brief Only initialize the replay, don't do it for real */
-void smpi_replay_init(int* argc, char*** argv)
+void smpi_replay_init(const char* instance_id, int rank, double start_delay_flops)
 {
-  if (not smpi_process()->initializing()){
-    simgrid::smpi::ActorExt::init(argc, argv);
-  }
+  xbt_assert(not smpi_process()->initializing());
+
+  simgrid::s4u::Actor::self()->set_property("instance_id", instance_id);
+  simgrid::s4u::Actor::self()->set_property("rank", std::to_string(rank));
+  simgrid::smpi::ActorExt::init();
+
   smpi_process()->mark_as_initialized();
   smpi_process()->set_replaying(true);
 
@@ -717,7 +723,7 @@ void smpi_replay_init(int* argc, char*** argv)
   TRACE_smpi_comm_in(my_proc_id, "smpi_replay_run_init", new simgrid::instr::NoOpTIData("init"));
   TRACE_smpi_comm_out(my_proc_id);
   xbt_replay_action_register("init", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::InitAction().execute(action); });
-  xbt_replay_action_register("finalize", [](simgrid::xbt::ReplayAction& action) { /* nothing to do */ });
+  xbt_replay_action_register("finalize", [](simgrid::xbt::ReplayAction&) { /* nothing to do */ });
   xbt_replay_action_register("comm_size", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
   xbt_replay_action_register("comm_split",[](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
   xbt_replay_action_register("comm_dup",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
@@ -744,10 +750,9 @@ void smpi_replay_init(int* argc, char*** argv)
   xbt_replay_action_register("compute", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ComputeAction().execute(action); });
 
   //if we have a delayed start, sleep here.
-  if(*argc>2){
-    double value = xbt_str_parse_double((*argv)[2], "%s is not a double");
-    XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
-    smpi_execute_flops(value);
+  if (start_delay_flops > 0) {
+    XBT_VERB("Delayed start for instance - Sleeping for %f flops ", start_delay_flops);
+    smpi_execute_flops(start_delay_flops);
   } else {
     // Wait for the other actors to initialize also
     simgrid::s4u::this_actor::yield();
@@ -755,12 +760,13 @@ void smpi_replay_init(int* argc, char*** argv)
 }
 
 /** @brief actually run the replay after initialization */
-void smpi_replay_main(int* argc, char*** argv)
+void smpi_replay_main(int rank, const char* trace_filename)
 {
   static int active_processes = 0;
   active_processes++;
   storage[simgrid::s4u::this_actor::get_pid()] = simgrid::smpi::replay::RequestStorage();
-  simgrid::xbt::replay_runner(*argc, *argv);
+  std::string rank_string                      = std::to_string(rank);
+  simgrid::xbt::replay_runner(rank_string.c_str(), trace_filename);
 
   /* and now, finalize everything */
   /* One active process will stop. Decrease the counter*/
@@ -795,8 +801,8 @@ void smpi_replay_main(int* argc, char*** argv)
 }
 
 /** @brief chain a replay initialization and a replay start */
-void smpi_replay_run(int* argc, char*** argv)
+void smpi_replay_run(const char* instance_id, int rank, double start_delay_flops, const char* trace_filename)
 {
-  smpi_replay_init(argc, argv);
-  smpi_replay_main(argc, argv);
+  smpi_replay_init(instance_id, rank, start_delay_flops);
+  smpi_replay_main(rank, trace_filename);
 }