Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Reduce scope for variables.
[simgrid.git] / src / smpi / internals / smpi_replay.cpp
index 878b3b1..2af987f 100644 (file)
@@ -61,7 +61,7 @@ public:
 };
 }
 
-typedef std::tuple</*sender*/ int, /* reciever */ int, /* tag */int> req_key_t;
+typedef std::tuple</*sender*/ int, /* receiver */ int, /* tag */ int> req_key_t;
 typedef std::unordered_map<req_key_t, MPI_Request, hash_tuple::hash<std::tuple<int,int,int>>> req_storage_t;
 
 void log_timed_action(simgrid::xbt::ReplayAction& action, double clock)
@@ -165,6 +165,19 @@ void ComputeParser::parse(simgrid::xbt::ReplayAction& action, const std::string&
   flops = parse_double(action[2]);
 }
 
+void SleepParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
+{
+  CHECK_ACTION_PARAMS(action, 1, 0)
+  time = parse_double(action[2]);
+}
+
+void LocationParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
+{
+  CHECK_ACTION_PARAMS(action, 2, 0)
+  filename = std::string(action[2]);
+  line = std::stoi(action[3]);
+}
+
 void BcastArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
 {
   CHECK_ACTION_PARAMS(action, 1, 2)
@@ -260,13 +273,12 @@ void GatherVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::stri
     if (action.size() > 5 + comm_size)
       datatype2 = simgrid::smpi::Datatype::decode(action[5 + comm_size]);
   } else {
-    int datatype_index = 0;
     int disp_index     = 0;
     /* The 3 comes from "0 gather <sendcount>", which must always be present.
      * The + comm_size is the recvcounts array, which must also be present
      */
     if (action.size() > 3 + comm_size + comm_size) { /* datatype + disp are specified */
-      datatype_index = 3 + comm_size;
+      int datatype_index = 3 + comm_size;
       disp_index     = datatype_index + 1;
       datatype1      = simgrid::smpi::Datatype::decode(action[datatype_index]);
       datatype2      = simgrid::smpi::Datatype::decode(action[datatype_index]);
@@ -274,7 +286,7 @@ void GatherVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::stri
                3 + comm_size + 2) { /* disps specified; datatype is not specified; use the default one */
       disp_index = 3 + comm_size;
     } else if (action.size() > 3 + comm_size) { /* only datatype, no disp specified */
-      datatype_index = 3 + comm_size;
+      int datatype_index = 3 + comm_size;
       datatype1      = simgrid::smpi::Datatype::decode(action[datatype_index]);
       datatype2      = simgrid::smpi::Datatype::decode(action[datatype_index]);
     }
@@ -477,9 +489,23 @@ void RecvAction::kernel(simgrid::xbt::ReplayAction&)
 
 void ComputeAction::kernel(simgrid::xbt::ReplayAction&)
 {
-  TRACE_smpi_computing_in(my_proc_id, args.flops);
-  smpi_execute_flops(args.flops);
-  TRACE_smpi_computing_out(my_proc_id);
+  if (simgrid::config::get_value<bool>("smpi/simulate-computation")) {
+    smpi_execute_flops(args.flops/smpi_adjust_comp_speed());
+  }
+}
+
+void SleepAction::kernel(simgrid::xbt::ReplayAction&)
+{
+  XBT_DEBUG("Sleep for: %lf secs", args.time);
+  int rank = simgrid::s4u::this_actor::get_pid();
+  TRACE_smpi_sleeping_in(rank, args.time);
+  simgrid::s4u::this_actor::sleep_for(args.time/smpi_adjust_comp_speed());
+  TRACE_smpi_sleeping_out(rank);
+}
+
+void LocationAction::kernel(simgrid::xbt::ReplayAction&)
+{
+  smpi_trace_set_call_location(args.filename.c_str(), args.line);
 }
 
 void TestAction::kernel(simgrid::xbt::ReplayAction&)
@@ -493,7 +519,8 @@ void TestAction::kernel(simgrid::xbt::ReplayAction&)
     TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::NoOpTIData("test"));
 
     MPI_Status status;
-    int flag = Request::test(&request, &status);
+    int flag = 0;
+    Request::test(&request, &status, &flag);
 
     XBT_DEBUG("MPI_Test result: %d", flag);
     /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now
@@ -574,7 +601,7 @@ void ReduceAction::kernel(simgrid::xbt::ReplayAction&)
 
   Colls::reduce(send_buffer(args.comm_size * args.datatype1->size()),
       recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL, args.root, MPI_COMM_WORLD);
-  smpi_execute_flops(args.comp_size);
+  private_execute_flops(args.comp_size);
 
   TRACE_smpi_comm_out(my_proc_id);
 }
@@ -586,7 +613,7 @@ void AllReduceAction::kernel(simgrid::xbt::ReplayAction&)
 
   Colls::allreduce(send_buffer(args.comm_size * args.datatype1->size()),
       recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
-  smpi_execute_flops(args.comp_size);
+  private_execute_flops(args.comp_size);
 
   TRACE_smpi_comm_out(my_proc_id);
 }
@@ -683,7 +710,7 @@ void ReduceScatterAction::kernel(simgrid::xbt::ReplayAction&)
       recv_buffer(args.recv_size_sum * args.datatype1->size()), args.recvcounts->data(),
       args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
 
-  smpi_execute_flops(args.comp_size);
+  private_execute_flops(args.comp_size);
   TRACE_smpi_comm_out(my_proc_id);
 }
 
@@ -706,11 +733,12 @@ static std::unordered_map<aid_t, simgrid::smpi::replay::RequestStorage> storage;
 /** @brief Only initialize the replay, don't do it for real */
 void smpi_replay_init(const char* instance_id, int rank, double start_delay_flops)
 {
-  if (not smpi_process()->initializing()){
-    simgrid::s4u::Actor::self()->set_property("instance_id", instance_id);
-    simgrid::s4u::Actor::self()->set_property("rank", std::to_string(rank));
-    simgrid::smpi::ActorExt::init();
-  }
+  xbt_assert(not smpi_process()->initializing());
+
+  simgrid::s4u::Actor::self()->set_property("instance_id", instance_id);
+  simgrid::s4u::Actor::self()->set_property("rank", std::to_string(rank));
+  simgrid::smpi::ActorExt::init();
+
   smpi_process()->mark_as_initialized();
   smpi_process()->set_replaying(true);
 
@@ -746,11 +774,13 @@ void smpi_replay_init(const char* instance_id, int rank, double start_delay_flop
   xbt_replay_action_register("allgatherv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("allgatherv").execute(action); });
   xbt_replay_action_register("reducescatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceScatterAction().execute(action); });
   xbt_replay_action_register("compute", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ComputeAction().execute(action); });
+  xbt_replay_action_register("sleep", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SleepAction().execute(action); });
+  xbt_replay_action_register("location", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::LocationAction().execute(action); });
 
   //if we have a delayed start, sleep here.
   if (start_delay_flops > 0) {
     XBT_VERB("Delayed start for instance - Sleeping for %f flops ", start_delay_flops);
-    smpi_execute_flops(start_delay_flops);
+    private_execute_flops(start_delay_flops);
   } else {
     // Wait for the other actors to initialize also
     simgrid::s4u::this_actor::yield();
@@ -795,7 +825,6 @@ void smpi_replay_main(int rank, const char* trace_filename)
   smpi_process()->finalize();
 
   TRACE_smpi_comm_out(simgrid::s4u::this_actor::get_pid());
-  TRACE_smpi_finalize(simgrid::s4u::this_actor::get_pid());
 }
 
 /** @brief chain a replay initialization and a replay start */