Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
[SMPI] Replay: C++ify action_allgatherv
[simgrid.git] / src / smpi / internals / smpi_replay.cpp
index 6e82151..a58bfff 100644 (file)
@@ -12,6 +12,7 @@
 #include "smpi_request.hpp"
 #include "xbt/replay.hpp"
 
+#include <memory>
 #include <numeric>
 #include <unordered_map>
 #include <vector>
@@ -25,13 +26,16 @@ static int active_processes  = 0;
 static std::unordered_map<int, std::vector<MPI_Request>*> reqq;
 
 static MPI_Datatype MPI_DEFAULT_TYPE;
-static MPI_Datatype MPI_CURRENT_TYPE;
 
 static int sendbuffer_size = 0;
 static char* sendbuffer    = nullptr;
 static int recvbuffer_size = 0;
 static char* recvbuffer    = nullptr;
 
+class ReplayActionArg {
+  ReplayActionArg() {}
+};
+
 static void log_timed_action (const char *const *action, double clock){
   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
     char *name = xbt_str_join_array(action, " ");
@@ -214,7 +218,7 @@ static void action_send(const char *const *action)
   double size=parse_double(action[3]);
   double clock = smpi_process()->simulated_elapsed();
 
-  MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
+  MPI_Datatype MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
 
   int my_proc_id = Actor::self()->getPid();
   int dst_traced = MPI_COMM_WORLD->group()->actor(to)->getPid();
@@ -238,7 +242,7 @@ static void action_Isend(const char *const *action)
   double size=parse_double(action[3]);
   double clock = smpi_process()->simulated_elapsed();
 
-  MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
+  MPI_Datatype MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
 
   int my_proc_id = Actor::self()->getPid();
   int dst_traced = MPI_COMM_WORLD->group()->actor(to)->getPid();
@@ -263,7 +267,7 @@ static void action_recv(const char *const *action) {
   double clock = smpi_process()->simulated_elapsed();
   MPI_Status status;
 
-  MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
+  MPI_Datatype MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
 
   int my_proc_id = Actor::self()->getPid();
   int src_traced = MPI_COMM_WORLD->group()->actor(from)->getPid();
@@ -294,7 +298,7 @@ static void action_Irecv(const char *const *action)
   double size=parse_double(action[3]);
   double clock = smpi_process()->simulated_elapsed();
 
-  MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
+  MPI_Datatype MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
 
   int my_proc_id = Actor::self()->getPid();
   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
@@ -421,7 +425,7 @@ static void action_bcast(const char *const *action)
   double clock = smpi_process()->simulated_elapsed();
   int root     = (action[3]) ? atoi(action[3]) : 0;
   /* Initialize MPI_CURRENT_TYPE in order to decrease the number of the checks */
-  MPI_CURRENT_TYPE = (action[3] && action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
+  MPI_Datatype MPI_CURRENT_TYPE = (action[3] && action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
 
   int my_proc_id = Actor::self()->getPid();
   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
@@ -444,7 +448,7 @@ static void action_reduce(const char *const *action)
   double clock = smpi_process()->simulated_elapsed();
   int root         = (action[4]) ? atoi(action[4]) : 0;
 
-  MPI_CURRENT_TYPE = (action[4] && action[5]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE;
+  MPI_Datatype MPI_CURRENT_TYPE = (action[4] && action[5]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE;
 
   int my_proc_id = Actor::self()->getPid();
   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
@@ -465,7 +469,7 @@ static void action_allReduce(const char *const *action) {
   double comm_size = parse_double(action[2]);
   double comp_size = parse_double(action[3]);
 
-  MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
+  MPI_Datatype MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
 
   double clock = smpi_process()->simulated_elapsed();
   int my_proc_id = Actor::self()->getPid();
@@ -487,7 +491,7 @@ static void action_allToAll(const char *const *action) {
   int comm_size = MPI_COMM_WORLD->size();
   int send_size = parse_double(action[2]);
   int recv_size = parse_double(action[3]);
-  MPI_CURRENT_TYPE = (action[4] && action[5]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
+  MPI_Datatype MPI_CURRENT_TYPE = (action[4] && action[5]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
   MPI_Datatype MPI_CURRENT_TYPE2{(action[4] && action[5]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE};
 
   void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* MPI_CURRENT_TYPE->size());
@@ -520,7 +524,7 @@ static void action_gather(const char *const *action) {
   int comm_size = MPI_COMM_WORLD->size();
   int send_size = parse_double(action[2]);
   int recv_size = parse_double(action[3]);
-  MPI_CURRENT_TYPE = (action[5] && action[6]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE;
+  MPI_Datatype MPI_CURRENT_TYPE = (action[5] && action[6]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE;
   MPI_Datatype MPI_CURRENT_TYPE2{(action[5] && action[6]) ? decode_datatype(action[6]) : MPI_DEFAULT_TYPE};
 
   void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
@@ -557,7 +561,7 @@ static void action_scatter(const char* const* action)
   int comm_size                  = MPI_COMM_WORLD->size();
   int send_size                  = parse_double(action[2]);
   int recv_size                  = parse_double(action[3]);
-  MPI_CURRENT_TYPE               = (action[5] && action[6]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE;
+  MPI_Datatype MPI_CURRENT_TYPE  = (action[5] && action[6]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE;
   MPI_Datatype MPI_CURRENT_TYPE2{(action[5] && action[6]) ? decode_datatype(action[6]) : MPI_DEFAULT_TYPE};
 
   void* send = smpi_get_tmp_sendbuffer(send_size * MPI_CURRENT_TYPE->size());
@@ -593,10 +597,9 @@ static void action_gatherv(const char *const *action) {
   CHECK_ACTION_PARAMS(action, comm_size+1, 2)
   int send_size = parse_double(action[2]);
   std::vector<int> disps(comm_size, 0);
-  int recvcounts[comm_size];
-  int recv_sum=0;
+  std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
 
-  MPI_CURRENT_TYPE =
+  MPI_Datatype MPI_CURRENT_TYPE =
       (action[4 + comm_size] && action[5 + comm_size]) ? decode_datatype(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
   MPI_Datatype MPI_CURRENT_TYPE2{
       (action[4 + comm_size] && action[5 + comm_size]) ? decode_datatype(action[5 + comm_size]) : MPI_DEFAULT_TYPE};
@@ -604,9 +607,9 @@ static void action_gatherv(const char *const *action) {
   void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
   void *recv = nullptr;
   for(int i=0;i<comm_size;i++) {
-    recvcounts[i] = atoi(action[i+3]);
-    recv_sum=recv_sum+recvcounts[i];
+    (*recvcounts)[i] = atoi(action[i + 3]);
   }
+  int recv_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
 
   int root = (action[3 + comm_size]) ? atoi(action[3 + comm_size]) : 0;
   int rank = MPI_COMM_WORLD->rank();
@@ -614,13 +617,11 @@ static void action_gatherv(const char *const *action) {
   if(rank==root)
     recv = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
 
-  std::vector<int>* trace_recvcounts = new std::vector<int>(recvcounts, recvcounts + comm_size);
-
   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData(
-                                             "gatherV", root, send_size, nullptr, -1, trace_recvcounts,
+                                             "gatherV", root, send_size, nullptr, -1, recvcounts,
                                              encode_datatype(MPI_CURRENT_TYPE), encode_datatype(MPI_CURRENT_TYPE2)));
 
-  Colls::gatherv(send, send_size, MPI_CURRENT_TYPE, recv, recvcounts, disps.data(), MPI_CURRENT_TYPE2, root,
+  Colls::gatherv(send, send_size, MPI_CURRENT_TYPE, recv, recvcounts->data(), disps.data(), MPI_CURRENT_TYPE2, root,
                  MPI_COMM_WORLD);
 
   TRACE_smpi_comm_out(Actor::self()->getPid());
@@ -643,10 +644,9 @@ static void action_scatterv(const char* const* action)
   CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
   int recv_size = parse_double(action[2 + comm_size]);
   std::vector<int> disps(comm_size, 0);
-  int sendcounts[comm_size];
-  int send_sum = 0;
+  std::shared_ptr<std::vector<int>> sendcounts(new std::vector<int>(comm_size));
 
-  MPI_CURRENT_TYPE =
+  MPI_Datatype MPI_CURRENT_TYPE =
       (action[4 + comm_size] && action[5 + comm_size]) ? decode_datatype(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
   MPI_Datatype MPI_CURRENT_TYPE2{
       (action[4 + comm_size] && action[5 + comm_size]) ? decode_datatype(action[5 + comm_size]) : MPI_DEFAULT_TYPE};
@@ -654,9 +654,9 @@ static void action_scatterv(const char* const* action)
   void* send = nullptr;
   void* recv = smpi_get_tmp_recvbuffer(recv_size * MPI_CURRENT_TYPE->size());
   for (int i = 0; i < comm_size; i++) {
-    sendcounts[i] = atoi(action[i + 2]);
-    send_sum += sendcounts[i];
+    (*sendcounts)[i] = atoi(action[i + 2]);
   }
+  int send_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
 
   int root = (action[3 + comm_size]) ? atoi(action[3 + comm_size]) : 0;
   int rank = MPI_COMM_WORLD->rank();
@@ -664,13 +664,12 @@ static void action_scatterv(const char* const* action)
   if (rank == root)
     send = smpi_get_tmp_sendbuffer(send_sum * MPI_CURRENT_TYPE2->size());
 
-  std::vector<int>* trace_sendcounts = new std::vector<int>(sendcounts, sendcounts + comm_size);
+  TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData("gatherV", root, -1, sendcounts, recv_size,
+                                                                           nullptr, encode_datatype(MPI_CURRENT_TYPE),
+                                                                           encode_datatype(MPI_CURRENT_TYPE2)));
 
-  TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData(
-                                             "gatherV", root, -1, trace_sendcounts, recv_size, nullptr,
-                                             encode_datatype(MPI_CURRENT_TYPE), encode_datatype(MPI_CURRENT_TYPE2)));
-
-  Colls::scatterv(send, sendcounts, disps.data(), MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
+  Colls::scatterv(send, sendcounts->data(), disps.data(), MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root,
+                  MPI_COMM_WORLD);
 
   TRACE_smpi_comm_out(Actor::self()->getPid());
   log_timed_action(action, clock);
@@ -689,23 +688,23 @@ static void action_reducescatter(const char *const *action) {
   CHECK_ACTION_PARAMS(action, comm_size+1, 1)
   int comp_size = parse_double(action[2+comm_size]);
   int my_proc_id                     = Actor::self()->getPid();
-  std::vector<int>* trace_recvcounts = new std::vector<int>;
-  MPI_CURRENT_TYPE = (action[3 + comm_size]) ? decode_datatype(action[3 + comm_size]) : MPI_DEFAULT_TYPE;
+  std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>);
+  MPI_Datatype MPI_CURRENT_TYPE = (action[3 + comm_size]) ? decode_datatype(action[3 + comm_size]) : MPI_DEFAULT_TYPE;
 
   for(int i=0;i<comm_size;i++) {
-    trace_recvcounts->push_back(atoi(action[i + 2]));
+    recvcounts->push_back(atoi(action[i + 2]));
   }
-  int size{std::accumulate(trace_recvcounts->begin(), trace_recvcounts->end(), 0)};
+  int size{std::accumulate(recvcounts->begin(), recvcounts->end(), 0)};
 
   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
-                     new simgrid::instr::VarCollTIData("reduceScatter", -1, 0, nullptr, -1, trace_recvcounts,
+                     new simgrid::instr::VarCollTIData("reduceScatter", -1, 0, nullptr, -1, recvcounts,
                                                        std::to_string(comp_size), /* ugly hack to print comp_size */
                                                        encode_datatype(MPI_CURRENT_TYPE)));
 
   void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
   void *recvbuf = smpi_get_tmp_recvbuffer(size* MPI_CURRENT_TYPE->size());
 
-  Colls::reduce_scatter(sendbuf, recvbuf, trace_recvcounts->data(), MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
+  Colls::reduce_scatter(sendbuf, recvbuf, recvcounts->data(), MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
   smpi_execute_flops(comp_size);
 
   TRACE_smpi_comm_out(my_proc_id);
@@ -726,7 +725,7 @@ static void action_allgather(const char *const *action) {
   int sendcount=atoi(action[2]);
   int recvcount=atoi(action[3]);
 
-  MPI_CURRENT_TYPE = (action[4] && action[5]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
+  MPI_Datatype MPI_CURRENT_TYPE = (action[4] && action[5]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
   MPI_Datatype MPI_CURRENT_TYPE2{(action[4] && action[5]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE};
 
   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
@@ -758,11 +757,10 @@ static void action_allgatherv(const char *const *action) {
   int comm_size = MPI_COMM_WORLD->size();
   CHECK_ACTION_PARAMS(action, comm_size+1, 2)
   int sendcount=atoi(action[2]);
-  int recvcounts[comm_size];
+  std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
   std::vector<int> disps(comm_size, 0);
-  int recv_sum=0;
 
-  MPI_CURRENT_TYPE =
+  MPI_Datatype MPI_CURRENT_TYPE =
       (action[3 + comm_size] && action[4 + comm_size]) ? decode_datatype(action[3 + comm_size]) : MPI_DEFAULT_TYPE;
   MPI_Datatype MPI_CURRENT_TYPE2{
       (action[3 + comm_size] && action[4 + comm_size]) ? decode_datatype(action[4 + comm_size]) : MPI_DEFAULT_TYPE};
@@ -770,22 +768,20 @@ static void action_allgatherv(const char *const *action) {
   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
 
   for(int i=0;i<comm_size;i++) {
-    recvcounts[i] = atoi(action[i+3]);
-    recv_sum=recv_sum+recvcounts[i];
+    (*recvcounts)[i] = atoi(action[i + 3]);
   }
+  int recv_sum  = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
   void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
 
   int my_proc_id = Actor::self()->getPid();
 
-  std::vector<int>* trace_recvcounts = new std::vector<int>(recvcounts, recvcounts + comm_size);
-
   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
-                     new simgrid::instr::VarCollTIData("allGatherV", -1, sendcount, nullptr, -1, trace_recvcounts,
+                     new simgrid::instr::VarCollTIData("allGatherV", -1, sendcount, nullptr, -1, recvcounts,
                                                        encode_datatype(MPI_CURRENT_TYPE),
                                                        encode_datatype(MPI_CURRENT_TYPE2)));
 
-  Colls::allgatherv(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts, disps.data(), MPI_CURRENT_TYPE2,
-                          MPI_COMM_WORLD);
+  Colls::allgatherv(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts->data(), disps.data(), MPI_CURRENT_TYPE2,
+                    MPI_COMM_WORLD);
 
   TRACE_smpi_comm_out(my_proc_id);
   log_timed_action (action, clock);
@@ -804,18 +800,14 @@ static void action_allToAllv(const char *const *action) {
 
   int comm_size = MPI_COMM_WORLD->size();
   CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
-  int send_size = 0;
-  int recv_size = 0;
-  int sendcounts[comm_size];
-  std::vector<int>* trace_sendcounts = new std::vector<int>;
-  int recvcounts[comm_size];
-  std::vector<int>* trace_recvcounts = new std::vector<int>;
+  std::shared_ptr<std::vector<int>> sendcounts(new std::vector<int>(comm_size));
+  std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
   std::vector<int> senddisps(comm_size, 0);
   std::vector<int> recvdisps(comm_size, 0);
 
-  MPI_CURRENT_TYPE = (action[4 + 2 * comm_size] && action[5 + 2 * comm_size])
-                         ? decode_datatype(action[4 + 2 * comm_size])
-                         : MPI_DEFAULT_TYPE;
+  MPI_Datatype MPI_CURRENT_TYPE = (action[4 + 2 * comm_size] && action[5 + 2 * comm_size])
+                                      ? decode_datatype(action[4 + 2 * comm_size])
+                                      : MPI_DEFAULT_TYPE;
   MPI_Datatype MPI_CURRENT_TYPE2{(action[4 + 2 * comm_size] && action[5 + 2 * comm_size])
                                      ? decode_datatype(action[5 + 2 * comm_size])
                                      : MPI_DEFAULT_TYPE};
@@ -827,21 +819,19 @@ static void action_allToAllv(const char *const *action) {
   void *recvbuf  = smpi_get_tmp_recvbuffer(recv_buf_size* MPI_CURRENT_TYPE2->size());
 
   for(int i=0;i<comm_size;i++) {
-    sendcounts[i] = atoi(action[i+3]);
-    trace_sendcounts->push_back(sendcounts[i]);
-    send_size += sendcounts[i];
-    recvcounts[i] = atoi(action[i+4+comm_size]);
-    trace_recvcounts->push_back(recvcounts[i]);
-    recv_size += recvcounts[i];
+    (*sendcounts)[i] = atoi(action[3 + i]);
+    (*recvcounts)[i] = atoi(action[4 + comm_size + i]);
   }
+  int send_size = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
+  int recv_size = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
 
   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
-                     new simgrid::instr::VarCollTIData("allToAllV", -1, send_size, trace_sendcounts, recv_size,
-                                                       trace_recvcounts, encode_datatype(MPI_CURRENT_TYPE),
+                     new simgrid::instr::VarCollTIData("allToAllV", -1, send_size, sendcounts, recv_size, recvcounts,
+                                                       encode_datatype(MPI_CURRENT_TYPE),
                                                        encode_datatype(MPI_CURRENT_TYPE2)));
 
-  Colls::alltoallv(sendbuf, sendcounts, senddisps.data(), MPI_CURRENT_TYPE,recvbuf, recvcounts, recvdisps.data(),
-                         MPI_CURRENT_TYPE, MPI_COMM_WORLD);
+  Colls::alltoallv(sendbuf, sendcounts->data(), senddisps.data(), MPI_CURRENT_TYPE, recvbuf, recvcounts->data(),
+                   recvdisps.data(), MPI_CURRENT_TYPE, MPI_COMM_WORLD);
 
   TRACE_smpi_comm_out(my_proc_id);
   log_timed_action (action, clock);