Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
replay : add actions scan and exscan
authorAugustin Degomme <adegomme@users.noreply.github.com>
Sat, 7 Aug 2021 18:38:13 +0000 (20:38 +0200)
committerAugustin Degomme <adegomme@users.noreply.github.com>
Sat, 7 Aug 2021 18:38:13 +0000 (20:38 +0200)
include/simgrid/smpi/smpi_replay.hpp
src/smpi/bindings/smpi_pmpi_coll.cpp
src/smpi/internals/smpi_replay.cpp

index ec856a6..c15f273 100644 (file)
@@ -161,6 +161,11 @@ public:
   void parse(xbt::ReplayAction& action, const std::string& name) override;
 };
 
+class ScanArgParser : public CollCommParser {
+public:
+  void parse(xbt::ReplayAction& action, const std::string& name) override;
+};
+
 class AllToAllVArgParser : public CollCommParser {
 public:
   int recv_size_sum;
@@ -340,6 +345,12 @@ public:
   void kernel(xbt::ReplayAction& action) override;
 };
 
+class ScanAction : public ReplayAction<ScanArgParser> {
+public:
+  using ReplayAction::ReplayAction;
+  void kernel(xbt::ReplayAction& action) override;
+};
+
 class AllToAllVAction : public ReplayAction<AllToAllVArgParser> {
 public:
   explicit AllToAllVAction() : ReplayAction("alltoallv") {}
index ad484c2..5ca09f8 100644 (file)
@@ -559,7 +559,7 @@ int PMPI_Iscan(const void *sendbuf, void *recvbuf, int count, MPI_Datatype datat
   const void* real_sendbuf = smpi_get_in_place_buf(sendbuf, recvbuf, tmp_sendbuf, count, datatype);
 
   TRACE_smpi_comm_in(pid, request == MPI_REQUEST_IGNORED ? "PMPI_Scan" : "PMPI_Iscan",
-                     new simgrid::instr::CollTIData(request == MPI_REQUEST_IGNORED ? "scan" : "iscan", -1, -1.0,
+                     new simgrid::instr::CollTIData(request == MPI_REQUEST_IGNORED ? "scan" : "iscan", -1, 0.0,
                                                     count, 0, simgrid::smpi::Datatype::encode(datatype), ""));
 
   int retval;
@@ -594,8 +594,8 @@ int PMPI_Iexscan(const void *sendbuf, void *recvbuf, int count, MPI_Datatype dat
   const void* real_sendbuf = smpi_get_in_place_buf(sendbuf, recvbuf, tmp_sendbuf, count, datatype);
 
   TRACE_smpi_comm_in(pid, request == MPI_REQUEST_IGNORED ? "PMPI_Exscan" : "PMPI_Iexscan",
-                     new simgrid::instr::Pt2PtTIData(request == MPI_REQUEST_IGNORED ? "exscan" : "iexscan", -1,
-                                                     count, simgrid::smpi::Datatype::encode(datatype)));
+                     new simgrid::instr::CollTIData(request == MPI_REQUEST_IGNORED ? "exscan" : "iexscan", -1, 0.0,
+                                                    count, 0, simgrid::smpi::Datatype::encode(datatype), ""));
 
   int retval;
   if (request == MPI_REQUEST_IGNORED)
index d23817d..966fbdb 100644 (file)
@@ -8,6 +8,7 @@
 #include "smpi_datatype.hpp"
 #include "smpi_group.hpp"
 #include "smpi_request.hpp"
+#include "simgrid/s4u/Exec.hpp"
 #include "xbt/replay.hpp"
 #include <simgrid/smpi/smpi_replay.hpp>
 #include <src/smpi/include/private.hpp>
@@ -378,9 +379,10 @@ void ReduceScatterArgParser::parse(simgrid::xbt::ReplayAction& action, const std
 
 void ScanArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
 {
-  CHECK_ACTION_PARAMS(action, 1, 1)
+  CHECK_ACTION_PARAMS(action, 2, 1)
   size      = parse_integer<size_t>(action[2]);
-  datatype1 = parse_datatype(action, 3);
+  comp_size = parse_double(action[3]);
+  datatype1 = parse_datatype(action, 4);
 }
 
 void AllToAllVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
@@ -616,8 +618,11 @@ void ReduceAction::kernel(simgrid::xbt::ReplayAction&)
   colls::reduce(send_buffer(args.comm_size * args.datatype1->size()),
                 recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL,
                 args.root, MPI_COMM_WORLD);
-  if(args.comp_size != 0.0)
-    private_execute_flops(args.comp_size);
+  if (args.comp_size != 0.0)
+    simgrid::s4u::this_actor::exec_init(args.comp_size)
+      ->set_name("computation")
+      ->start()
+      ->wait();
 
   TRACE_smpi_comm_out(get_pid());
 }
@@ -632,8 +637,11 @@ void AllReduceAction::kernel(simgrid::xbt::ReplayAction&)
   colls::allreduce(send_buffer(args.comm_size * args.datatype1->size()),
                    recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL,
                    MPI_COMM_WORLD);
-  if(args.comp_size != 0.0)
-    private_execute_flops(args.comp_size);
+  if (args.comp_size != 0.0)
+    simgrid::s4u::this_actor::exec_init(args.comp_size)
+      ->set_name("computation")
+      ->start()
+      ->wait();
 
   TRACE_smpi_comm_out(get_pid());
 }
@@ -735,14 +743,40 @@ void ReduceScatterAction::kernel(simgrid::xbt::ReplayAction&)
   TRACE_smpi_comm_in(
       get_pid(), "action_reducescatter",
       new simgrid::instr::VarCollTIData(get_name(), -1, -1, nullptr, -1, args.recvcounts,
-                                        std::to_string(args.comp_size), /* ugly hack to print comp_size */
+                                        std::to_string(args.comp_size),
                                         Datatype::encode(args.datatype1)));
 
   colls::reduce_scatter(send_buffer(args.recv_size_sum * args.datatype1->size()),
                         recv_buffer(args.recv_size_sum * args.datatype1->size()), args.recvcounts->data(),
                         args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
+  if (args.comp_size != 0.0)
+    simgrid::s4u::this_actor::exec_init(args.comp_size)
+      ->set_name("computation")
+      ->start()
+      ->wait();
+  TRACE_smpi_comm_out(get_pid());
+}
 
-  private_execute_flops(args.comp_size);
+void ScanAction::kernel(simgrid::xbt::ReplayAction&)
+{
+  const ScanArgParser& args = get_args();
+  TRACE_smpi_comm_in(get_pid(), "action_scan",
+                     new simgrid::instr::CollTIData(get_name(), -1, args.comp_size,
+                     args.size, 0, Datatype::encode(args.datatype1), ""));
+  if (get_name() == "scan")
+    colls::scan(send_buffer(args.size * args.datatype1->size()),
+              recv_buffer(args.size * args.datatype1->size()), args.size,
+              args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
+  else
+    colls::exscan(send_buffer(args.size * args.datatype1->size()),
+              recv_buffer(args.size * args.datatype1->size()), args.size,
+              args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
+
+  if (args.comp_size != 0.0)
+    simgrid::s4u::this_actor::exec_init(args.comp_size)
+      ->set_name("computation")
+      ->start()
+      ->wait();
   TRACE_smpi_comm_out(get_pid());
 }
 
@@ -802,6 +836,8 @@ void smpi_replay_init(const char* instance_id, int rank, double start_delay_flop
   xbt_replay_action_register("allgather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("allgather").execute(action); });
   xbt_replay_action_register("allgatherv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("allgatherv").execute(action); });
   xbt_replay_action_register("reducescatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceScatterAction().execute(action); });
+  xbt_replay_action_register("scan", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScanAction("scan").execute(action); });
+  xbt_replay_action_register("exscan", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScanAction("exscan").execute(action); });
   xbt_replay_action_register("compute", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ComputeAction().execute(action); });
   xbt_replay_action_register("sleep", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SleepAction().execute(action); });
   xbt_replay_action_register("location", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::LocationAction().execute(action); });