1 /* Copyright (c) 2009-2017. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
7 #include "smpi_coll.hpp"
8 #include "smpi_comm.hpp"
9 #include "smpi_datatype.hpp"
10 #include "smpi_group.hpp"
11 #include "smpi_process.hpp"
12 #include "smpi_request.hpp"
13 #include "xbt/replay.hpp"
15 #include <boost/algorithm/string/join.hpp>
18 #include <unordered_map>
21 using simgrid::s4u::Actor;
23 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
// Count of processes currently replaying a trace; set from smpi_process_count()
// in action_init and tested against zero at the end of smpi_replay_main.
25 static int active_processes = 0;
// Per-actor queue of outstanding asynchronous requests, keyed by actor pid.
// Vectors are heap-allocated in action_init and deleted in smpi_replay_main.
26 static std::unordered_map<int, std::vector<MPI_Request>*> reqq;
// Datatype used when a trace line does not specify one; chosen in action_init
// (MPI_DOUBLE for MPE traces, MPI_BYTE for TAU traces).
28 static MPI_Datatype MPI_DEFAULT_TYPE;
// Validate a replay action's token count. Tokens 0 and 1 are the process id and
// the action name; the action must carry at least `mandatory` further tokens.
// Note: only the minimum is enforced — `optional` is used for the error message
// but extra tokens beyond mandatory+optional are not rejected.
30 #define CHECK_ACTION_PARAMS(action, mandatory, optional) \
32 if (action.size() < static_cast<unsigned long>(mandatory + 2)) \
33 THROWF(arg_error, 0, "%s replay failed.\n" \
34 "%lu items were given on the line. First two should be process_id and action. " \
35 "This action needs after them %lu mandatory arguments, and accepts %lu optional ones. \n" \
36 "Please contact the Simgrid team if support is needed", \
37 __FUNCTION__, action.size(), static_cast<unsigned long>(mandatory), \
38 static_cast<unsigned long>(optional)); \
// Log the raw action tokens plus the simulated time they consumed (elapsed
// since `clock`). The join is guarded by the log-level check so the string is
// only built when the verbose channel is actually enabled.
41 static void log_timed_action(simgrid::xbt::ReplayAction& action, double clock)
43 if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
44 std::string s = boost::algorithm::join(action, " ");
45 XBT_VERB("%s %f", s.c_str(), smpi_process()->simulated_elapsed() - clock);
// Fetch the pending-request queue of the calling actor.
// at() throws if action_init never registered a queue for this pid.
49 static std::vector<MPI_Request>* get_reqq_self()
51 return reqq.at(Actor::self()->getPid());
54 static void set_reqq_self(std::vector<MPI_Request> *mpi_request)
56 reqq.insert({Actor::self()->getPid(), mpi_request});
60 static double parse_double(std::string string)
62 return xbt_str_parse_double(string.c_str(), "%s is not a double");
// Base class for per-action argument parsers. The default parse() accepts only
// actions with no arguments beyond the process id and the action name;
// subclasses override it to extract their specific fields.
69 class ActionArgParser {
71 virtual void parse(simgrid::xbt::ReplayAction& action) { CHECK_ACTION_PARAMS(action, 0, 0) }
// Argument parser for point-to-point actions: <partner> <size> [datatype].
// NOTE(review): the declarations of the `partner` and `size` members assigned
// below fall on lines elided from this excerpt.
74 class SendRecvParser : public ActionArgParser {
76 /* communication partner; if we send, this is the receiver and vice versa */
79 MPI_Datatype datatype1 = MPI_DEFAULT_TYPE;
81 void parse(simgrid::xbt::ReplayAction& action) override
83 CHECK_ACTION_PARAMS(action, 2, 1)
84 partner = std::stoi(action[2]);
85 size = parse_double(action[3]);
  // The datatype token is optional; fall back to MPI_DEFAULT_TYPE when absent.
86 if (action.size() > 4)
87 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
// Argument parser for "compute" actions: a single mandatory flop count.
91 class ComputeParser : public ActionArgParser {
93 /* amount of computation, in flops, that the compute action will simulate */
96 void parse(simgrid::xbt::ReplayAction& action) override
98 CHECK_ACTION_PARAMS(action, 1, 0)
99 flops = parse_double(action[2]);
// Template-method base for replay actions: execute() times the run and logs it,
// delegating the real work to the pure-virtual kernel(). T is the argument
// parser type (the `args` member used by subclasses is declared on a line
// elided from this excerpt).
103 template <class T> class ReplayAction {
105 const std::string name;
111 explicit ReplayAction(std::string name) : name(name), my_proc_id(simgrid::s4u::Actor::self()->getPid()) {}
113 virtual void execute(simgrid::xbt::ReplayAction& action)
115 // Needs to be re-initialized for every action, hence here
116 double start_time = smpi_process()->simulated_elapsed();
119 log_timed_action(action, start_time);
  // Implemented by each concrete action; performs the actual simulated call.
122 virtual void kernel(simgrid::xbt::ReplayAction& action) = 0;
// Replays an MPI_Wait on the most recently posted asynchronous request.
// NOTE(review): the `status` variable passed to Request::wait is declared on a
// line elided from this excerpt.
125 class WaitAction : public ReplayAction<ActionArgParser> {
127 WaitAction() : ReplayAction("Wait") {}
128 void kernel(simgrid::xbt::ReplayAction& action) override
130 CHECK_ACTION_PARAMS(action, 0, 0)
133 std::string s = boost::algorithm::join(action, " ");
134 xbt_assert(get_reqq_self()->size(), "action wait not preceded by any irecv or isend: %s", s.c_str());
  // Pop the request most recently pushed by Isend/Irecv/Test.
135 MPI_Request request = get_reqq_self()->back();
136 get_reqq_self()->pop_back();
  // A null request means a preceding MPI_Test already completed the comm; the
  // elided branch returns without waiting.
138 if (request == nullptr) {
139 /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just
  // Gather tracing info before the wait: the request may be freed by it.
144 int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
146 MPI_Group group = request->comm()->group();
147 int src_traced = group->rank(request->src());
148 int dst_traced = group->rank(request->dst());
149 bool is_wait_for_receive = (request->flags() & RECV);
150 // TODO: Here we take the rank while we normally take the process id (look for my_proc_id)
151 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::NoOpTIData("wait"));
153 Request::wait(&request, &status);
155 TRACE_smpi_comm_out(rank);
  // Only receives produce a matching "recv" event in the trace.
156 if (is_wait_for_receive)
157 TRACE_smpi_recv(src_traced, dst_traced, 0);
161 class SendAction : public ReplayAction<SendRecvParser> {
163 SendAction() = delete;
164 SendAction(std::string name) : ReplayAction(name) {}
165 void kernel(simgrid::xbt::ReplayAction& action) override
167 int dst_traced = MPI_COMM_WORLD->group()->actor(args.partner)->getPid();
169 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
170 Datatype::encode(args.datatype1)));
171 if (not TRACE_smpi_view_internals())
172 TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, args.size * args.datatype1->size());
174 if (name == "send") {
175 Request::send(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
176 } else if (name == "Isend") {
177 MPI_Request request = Request::isend(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
178 get_reqq_self()->push_back(request);
180 xbt_die("Don't know this action, %s", name.c_str());
183 TRACE_smpi_comm_out(my_proc_id);
187 class RecvAction : public ReplayAction<SendRecvParser> {
189 RecvAction() = delete;
190 explicit RecvAction(std::string name) : ReplayAction(name) {}
191 void kernel(simgrid::xbt::ReplayAction& action) override
193 int src_traced = MPI_COMM_WORLD->group()->actor(args.partner)->getPid();
195 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
196 Datatype::encode(args.datatype1)));
199 // unknown size from the receiver point of view
200 if (args.size <= 0.0) {
201 Request::probe(args.partner, 0, MPI_COMM_WORLD, &status);
202 args.size = status.count;
205 if (name == "recv") {
206 Request::recv(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD, &status);
207 } else if (name == "Irecv") {
208 MPI_Request request = Request::irecv(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
209 get_reqq_self()->push_back(request);
212 TRACE_smpi_comm_out(my_proc_id);
213 // TODO: Check why this was only activated in the "recv" case and not in the "Irecv" case
214 if (name == "recv" && not TRACE_smpi_view_internals()) {
215 TRACE_smpi_recv(src_traced, my_proc_id, 0);
220 class ComputeAction : public ReplayAction<ComputeParser> {
222 ComputeAction() : ReplayAction("compute") {}
223 void kernel(simgrid::xbt::ReplayAction& action) override
225 TRACE_smpi_computing_in(my_proc_id, args.flops);
226 smpi_execute_flops(args.flops);
227 TRACE_smpi_computing_out(my_proc_id);
231 class TestAction : public ReplayAction<ActionArgParser> {
233 TestAction() : ReplayAction("Test") {}
234 void kernel(simgrid::xbt::ReplayAction& action) override
236 MPI_Request request = get_reqq_self()->back();
237 get_reqq_self()->pop_back();
238 // if request is null here, this may mean that a previous test has succeeded
239 // Different times in traced application and replayed version may lead to this
240 // In this case, ignore the extra calls.
241 if (request != nullptr) {
242 TRACE_smpi_testing_in(my_proc_id);
245 int flag = Request::test(&request, &status);
247 XBT_DEBUG("MPI_Test result: %d", flag);
248 /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now
250 get_reqq_self()->push_back(request);
252 TRACE_smpi_testing_out(my_proc_id);
257 } // Replay Namespace
// Handles the mandatory "init" trace action: picks the default datatype,
// starts the simulated timer and registers this actor's request queue.
259 static void action_init(simgrid::xbt::ReplayAction& action)
261 XBT_DEBUG("Initialize the counters");
262 CHECK_ACTION_PARAMS(action, 0, 1)
  // An extra token means the trace comes from MPE; otherwise (elided else
  // branch) it is a TAU trace and bytes are the unit.
263 if (action.size() > 2)
264 MPI_DEFAULT_TYPE = MPI_DOUBLE; // default MPE datatype
266 MPI_DEFAULT_TYPE = MPI_BYTE; // default TAU datatype
268 /* start a simulated timer */
269 smpi_process()->simulated_start();
270 /*initialize the number of active processes */
271 active_processes = smpi_process_count();
  // Fresh request queue for this actor; freed in smpi_replay_main.
273 set_reqq_self(new std::vector<MPI_Request>);
// Handler for the "finalize" trace action (body elided in this excerpt; the
// actual teardown happens in smpi_replay_main).
276 static void action_finalize(simgrid::xbt::ReplayAction& action)
// "comm_size" has no simulation-side effect in replay; it is only logged.
281 static void action_comm_size(simgrid::xbt::ReplayAction& action)
283 log_timed_action (action, smpi_process()->simulated_elapsed());
// "comm_split" has no simulation-side effect in replay; it is only logged.
286 static void action_comm_split(simgrid::xbt::ReplayAction& action)
288 log_timed_action (action, smpi_process()->simulated_elapsed());
// "comm_dup" has no simulation-side effect in replay; it is only logged.
291 static void action_comm_dup(simgrid::xbt::ReplayAction& action)
293 log_timed_action (action, smpi_process()->simulated_elapsed());
// Replays MPI_Waitall on every request this actor has queued so far.
296 static void action_waitall(simgrid::xbt::ReplayAction& action)
298 CHECK_ACTION_PARAMS(action, 0, 0)
299 double clock = smpi_process()->simulated_elapsed();
300 const unsigned int count_requests = get_reqq_self()->size();
302 if (count_requests>0) {
  // NOTE(review): variable-length arrays are a compiler extension, not
  // standard C++; a std::vector would be portable.
303 MPI_Status status[count_requests];
305 int my_proc_id_traced = Actor::self()->getPid();
306 TRACE_smpi_comm_in(my_proc_id_traced, __FUNCTION__,
307 new simgrid::instr::Pt2PtTIData("waitAll", -1, count_requests, ""));
  // Record src/dst of receive requests before the wait invalidates them; the
  // -100 sentinel assignment for non-receive slots is on an elided else line.
308 int recvs_snd[count_requests];
309 int recvs_rcv[count_requests];
310 for (unsigned int i = 0; i < count_requests; i++) {
311 const auto& req = (*get_reqq_self())[i];
312 if (req && (req->flags() & RECV)) {
313 recvs_snd[i] = req->src();
314 recvs_rcv[i] = req->dst();
318 Request::waitall(count_requests, &(*get_reqq_self())[0], status);
  // Emit one recv event per completed receive request.
320 for (unsigned i = 0; i < count_requests; i++) {
321 if (recvs_snd[i]!=-100)
322 TRACE_smpi_recv(recvs_snd[i], recvs_rcv[i],0);
324 TRACE_smpi_comm_out(my_proc_id_traced);
326 log_timed_action (action, clock);
// Replays an MPI_Barrier over MPI_COMM_WORLD.
329 static void action_barrier(simgrid::xbt::ReplayAction& action)
331 double clock = smpi_process()->simulated_elapsed();
332 int my_proc_id = Actor::self()->getPid();
333 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::NoOpTIData("barrier"));
335 Colls::barrier(MPI_COMM_WORLD);
337 TRACE_smpi_comm_out(my_proc_id);
338 log_timed_action (action, clock);
// Replays MPI_Bcast: <size> [root] [datatype]; root defaults to rank 0.
341 static void action_bcast(simgrid::xbt::ReplayAction& action)
343 CHECK_ACTION_PARAMS(action, 1, 2)
344 double size = parse_double(action[2]);
345 double clock = smpi_process()->simulated_elapsed();
346 int root = (action.size() > 3) ? std::stoi(action[3]) : 0;
347 /* Initialize MPI_CURRENT_TYPE in order to decrease the number of the checks */
348 MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 4) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE;
350 int my_proc_id = Actor::self()->getPid();
351 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
352 new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(root)->getPid(), -1.0, size,
353 -1, Datatype::encode(MPI_CURRENT_TYPE), ""));
  // Scratch buffer only; the replay never looks at the transferred bytes.
355 void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
357 Colls::bcast(sendbuf, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
359 TRACE_smpi_comm_out(my_proc_id);
360 log_timed_action (action, clock);
// Replays MPI_Reduce: <comm_size> <comp_size> [root] [datatype]; the
// reduction itself is simulated as comp_size flops after the communication.
363 static void action_reduce(simgrid::xbt::ReplayAction& action)
365 CHECK_ACTION_PARAMS(action, 2, 2)
366 double comm_size = parse_double(action[2]);
367 double comp_size = parse_double(action[3]);
368 double clock = smpi_process()->simulated_elapsed();
369 int root = (action.size() > 4) ? std::stoi(action[4]) : 0;
371 MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 5) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE;
373 int my_proc_id = Actor::self()->getPid();
374 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
375 new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(root)->getPid(), comp_size,
376 comm_size, -1, Datatype::encode(MPI_CURRENT_TYPE), ""));
  // MPI_OP_NULL: the operation is irrelevant since only timing is simulated.
378 void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
379 void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
380 Colls::reduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
381 smpi_execute_flops(comp_size);
383 TRACE_smpi_comm_out(my_proc_id);
384 log_timed_action (action, clock);
// Replays MPI_Allreduce: <comm_size> <comp_size> [datatype]; the reduction is
// simulated as comp_size flops after the communication.
387 static void action_allReduce(simgrid::xbt::ReplayAction& action)
389 CHECK_ACTION_PARAMS(action, 2, 1)
390 double comm_size = parse_double(action[2]);
391 double comp_size = parse_double(action[3]);
393 MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 4) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE;
395 double clock = smpi_process()->simulated_elapsed();
396 int my_proc_id = Actor::self()->getPid();
397 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::CollTIData("allReduce", -1, comp_size, comm_size, -1,
398 Datatype::encode(MPI_CURRENT_TYPE), ""));
400 void *recvbuf = smpi_get_tmp_recvbuffer(comm_size* MPI_CURRENT_TYPE->size());
401 void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
402 Colls::allreduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
403 smpi_execute_flops(comp_size);
405 TRACE_smpi_comm_out(my_proc_id);
406 log_timed_action (action, clock);
// Replays MPI_Alltoall: <send_size> <recv_size> [sendtype recvtype].
409 static void action_allToAll(simgrid::xbt::ReplayAction& action)
411 CHECK_ACTION_PARAMS(action, 2, 2) //two mandatory (send and recv volumes) and two optional (corresponding datatypes)
412 double clock = smpi_process()->simulated_elapsed();
413 unsigned long comm_size = MPI_COMM_WORLD->size();
414 int send_size = parse_double(action[2]);
415 int recv_size = parse_double(action[3]);
  // NOTE(review): action[4] is guarded by size > 5, so a single trailing
  // datatype token is silently ignored — confirm this is intended (both
  // datatypes must be given together).
416 MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 5) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE};
417 MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE};
419 void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* MPI_CURRENT_TYPE->size());
420 void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
422 int my_proc_id = Actor::self()->getPid();
423 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
424 new simgrid::instr::CollTIData("allToAll", -1, -1.0, send_size, recv_size,
425 Datatype::encode(MPI_CURRENT_TYPE),
426 Datatype::encode(MPI_CURRENT_TYPE2)));
428 Colls::alltoall(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
430 TRACE_smpi_comm_out(my_proc_id);
431 log_timed_action (action, clock);
// Replays MPI_Gather: <send_size> <recv_size> [root] [sendtype recvtype].
434 static void action_gather(simgrid::xbt::ReplayAction& action)
436 /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
439 1) 68 is the sendcounts
440 2) 68 is the recvcounts
441 3) 0 is the root node
442 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
443 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
445 CHECK_ACTION_PARAMS(action, 2, 3)
446 double clock = smpi_process()->simulated_elapsed();
447 unsigned long comm_size = MPI_COMM_WORLD->size();
448 int send_size = parse_double(action[2]);
449 int recv_size = parse_double(action[3]);
  // Both datatype tokens must be present (indices 5 and 6) to be decoded.
450 MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE};
451 MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[6]) : MPI_DEFAULT_TYPE};
453 void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
454 void *recv = nullptr;
455 int root = (action.size() > 4) ? std::stoi(action[4]) : 0;
456 int rank = MPI_COMM_WORLD->rank();
  // Only the root needs a receive buffer (guard on an elided line).
459 recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
461 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
462 Datatype::encode(MPI_CURRENT_TYPE),
463 Datatype::encode(MPI_CURRENT_TYPE2)));
465 Colls::gather(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
467 TRACE_smpi_comm_out(Actor::self()->getPid());
468 log_timed_action (action, clock);
471 static void action_scatter(simgrid::xbt::ReplayAction& action)
473 /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
476 1) 68 is the sendcounts
477 2) 68 is the recvcounts
478 3) 0 is the root node
479 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
480 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
482 CHECK_ACTION_PARAMS(action, 2, 3)
483 double clock = smpi_process()->simulated_elapsed();
484 unsigned long comm_size = MPI_COMM_WORLD->size();
485 int send_size = parse_double(action[2]);
486 int recv_size = parse_double(action[3]);
487 MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE};
488 MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[6]) : MPI_DEFAULT_TYPE};
490 void* send = smpi_get_tmp_sendbuffer(send_size * MPI_CURRENT_TYPE->size());
491 void* recv = nullptr;
492 int root = (action.size() > 4) ? std::stoi(action[4]) : 0;
493 int rank = MPI_COMM_WORLD->rank();
496 recv = smpi_get_tmp_recvbuffer(recv_size * comm_size * MPI_CURRENT_TYPE2->size());
498 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
499 Datatype::encode(MPI_CURRENT_TYPE),
500 Datatype::encode(MPI_CURRENT_TYPE2)));
502 Colls::scatter(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
504 TRACE_smpi_comm_out(Actor::self()->getPid());
505 log_timed_action(action, clock);
// Replays MPI_Gatherv: <send_size> <recvcounts...> [root] [sendtype recvtype].
508 static void action_gatherv(simgrid::xbt::ReplayAction& action)
510 /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
511 0 gather 68 68 10 10 10 0 0 0
513 1) 68 is the sendcount
514 2) 68 10 10 10 is the recvcounts
515 3) 0 is the root node
516 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
517 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
519 double clock = smpi_process()->simulated_elapsed();
520 unsigned long comm_size = MPI_COMM_WORLD->size();
521 CHECK_ACTION_PARAMS(action, comm_size+1, 2)
522 int send_size = parse_double(action[2]);
  // Displacements are not traced; all-zero disps are used.
523 std::vector<int> disps(comm_size, 0);
  // shared_ptr because the tracing TIData keeps a reference to recvcounts.
524 std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
526 MPI_Datatype MPI_CURRENT_TYPE =
527 (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
528 MPI_Datatype MPI_CURRENT_TYPE2{
529 (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[5 + comm_size]) : MPI_DEFAULT_TYPE};
531 void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
532 void *recv = nullptr;
533 for (unsigned int i = 0; i < comm_size; i++) {
534 (*recvcounts)[i] = std::stoi(action[i + 3]);
536 int recv_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
538 int root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
539 int rank = MPI_COMM_WORLD->rank();
  // Only the root needs a receive buffer (guard on an elided line).
542 recv = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
544 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData(
545 "gatherV", root, send_size, nullptr, -1, recvcounts,
546 Datatype::encode(MPI_CURRENT_TYPE), Datatype::encode(MPI_CURRENT_TYPE2)));
548 Colls::gatherv(send, send_size, MPI_CURRENT_TYPE, recv, recvcounts->data(), disps.data(), MPI_CURRENT_TYPE2, root,
551 TRACE_smpi_comm_out(Actor::self()->getPid());
552 log_timed_action (action, clock);
555 static void action_scatterv(simgrid::xbt::ReplayAction& action)
557 /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
558 0 gather 68 10 10 10 68 0 0 0
560 1) 68 10 10 10 is the sendcounts
561 2) 68 is the recvcount
562 3) 0 is the root node
563 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
564 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
566 double clock = smpi_process()->simulated_elapsed();
567 unsigned long comm_size = MPI_COMM_WORLD->size();
568 CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
569 int recv_size = parse_double(action[2 + comm_size]);
570 std::vector<int> disps(comm_size, 0);
571 std::shared_ptr<std::vector<int>> sendcounts(new std::vector<int>(comm_size));
573 MPI_Datatype MPI_CURRENT_TYPE =
574 (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
575 MPI_Datatype MPI_CURRENT_TYPE2{
576 (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[5 + comm_size]) : MPI_DEFAULT_TYPE};
578 void* send = nullptr;
579 void* recv = smpi_get_tmp_recvbuffer(recv_size * MPI_CURRENT_TYPE->size());
580 for (unsigned int i = 0; i < comm_size; i++) {
581 (*sendcounts)[i] = std::stoi(action[i + 2]);
583 int send_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
585 int root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
586 int rank = MPI_COMM_WORLD->rank();
589 send = smpi_get_tmp_sendbuffer(send_sum * MPI_CURRENT_TYPE2->size());
591 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData("gatherV", root, -1, sendcounts, recv_size,
592 nullptr, Datatype::encode(MPI_CURRENT_TYPE),
593 Datatype::encode(MPI_CURRENT_TYPE2)));
595 Colls::scatterv(send, sendcounts->data(), disps.data(), MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root,
598 TRACE_smpi_comm_out(Actor::self()->getPid());
599 log_timed_action(action, clock);
// Replays MPI_Reduce_scatter: <recvcounts...> <comp_size> [datatype].
602 static void action_reducescatter(simgrid::xbt::ReplayAction& action)
604 /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
605 0 reduceScatter 275427 275427 275427 204020 11346849 0
607 1) The first four values after the name of the action declare the recvcounts array
608 2) The value 11346849 is the amount of instructions
609 3) The last value corresponds to the datatype, see simgrid::smpi::Datatype::decode().
611 double clock = smpi_process()->simulated_elapsed();
612 unsigned long comm_size = MPI_COMM_WORLD->size();
613 CHECK_ACTION_PARAMS(action, comm_size+1, 1)
614 int comp_size = parse_double(action[2+comm_size]);
615 int my_proc_id = Actor::self()->getPid();
  // shared_ptr because the tracing TIData keeps a reference to recvcounts.
616 std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>);
617 MPI_Datatype MPI_CURRENT_TYPE =
618 (action.size() > 3 + comm_size) ? simgrid::smpi::Datatype::decode(action[3 + comm_size]) : MPI_DEFAULT_TYPE;
620 for (unsigned int i = 0; i < comm_size; i++) {
621 recvcounts->push_back(std::stoi(action[i + 2]));
  // Total element count: sizes both scratch buffers below.
623 int size{std::accumulate(recvcounts->begin(), recvcounts->end(), 0)};
625 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
626 new simgrid::instr::VarCollTIData("reduceScatter", -1, 0, nullptr, -1, recvcounts,
627 std::to_string(comp_size), /* ugly hack to print comp_size */
628 Datatype::encode(MPI_CURRENT_TYPE)));
630 void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
631 void *recvbuf = smpi_get_tmp_recvbuffer(size* MPI_CURRENT_TYPE->size());
633 Colls::reduce_scatter(sendbuf, recvbuf, recvcounts->data(), MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
634 smpi_execute_flops(comp_size);
636 TRACE_smpi_comm_out(my_proc_id);
637 log_timed_action (action, clock);
// Replays MPI_Allgather: <sendcount> <recvcount> [sendtype recvtype].
640 static void action_allgather(simgrid::xbt::ReplayAction& action)
642 /* The structure of the allgather action for the rank 0 (total 4 processes) is the following:
643 0 allGather 275427 275427
645 1) 275427 is the sendcount
646 2) 275427 is the recvcount
647 3) No more values mean that the datatype for sent and receive buffer is the default one, see
648 simgrid::smpi::Datatype::decode().
650 double clock = smpi_process()->simulated_elapsed();
652 CHECK_ACTION_PARAMS(action, 2, 2)
653 int sendcount = std::stoi(action[2]);
654 int recvcount = std::stoi(action[3]);
  // NOTE(review): action[4] is guarded by size > 5, so a single trailing
  // datatype token is silently ignored — confirm this is intended.
656 MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 5) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE};
657 MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE};
659 void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
660 void *recvbuf = smpi_get_tmp_recvbuffer(recvcount* MPI_CURRENT_TYPE2->size());
662 int my_proc_id = Actor::self()->getPid();
664 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
665 new simgrid::instr::CollTIData("allGather", -1, -1.0, sendcount, recvcount,
666 Datatype::encode(MPI_CURRENT_TYPE),
667 Datatype::encode(MPI_CURRENT_TYPE2)));
669 Colls::allgather(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcount, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
671 TRACE_smpi_comm_out(my_proc_id);
672 log_timed_action (action, clock);
675 static void action_allgatherv(simgrid::xbt::ReplayAction& action)
677 /* The structure of the allgatherv action for the rank 0 (total 4 processes) is the following:
678 0 allGatherV 275427 275427 275427 275427 204020
680 1) 275427 is the sendcount
681 2) The next four elements declare the recvcounts array
682 3) No more values mean that the datatype for sent and receive buffer is the default one, see
683 simgrid::smpi::Datatype::decode().
685 double clock = smpi_process()->simulated_elapsed();
687 unsigned long comm_size = MPI_COMM_WORLD->size();
688 CHECK_ACTION_PARAMS(action, comm_size+1, 2)
689 int sendcount = std::stoi(action[2]);
690 std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
691 std::vector<int> disps(comm_size, 0);
693 int datatype_index = 0, disp_index = 0;
694 if (action.size() > 3 + 2 * comm_size) { /* datatype + disp are specified */
695 datatype_index = 3 + comm_size;
696 disp_index = datatype_index + 1;
697 } else if (action.size() > 3 + 2 * comm_size) { /* disps specified; datatype is not specified; use the default one */
699 disp_index = 3 + comm_size;
700 } else if (action.size() > 3 + comm_size) { /* only datatype, no disp specified */
701 datatype_index = 3 + comm_size;
704 if (disp_index != 0) {
705 for (unsigned int i = 0; i < comm_size; i++)
706 disps[i] = std::stoi(action[disp_index + i]);
709 MPI_Datatype MPI_CURRENT_TYPE{(datatype_index > 0) ? simgrid::smpi::Datatype::decode(action[datatype_index])
711 MPI_Datatype MPI_CURRENT_TYPE2{(datatype_index > 0) ? simgrid::smpi::Datatype::decode(action[datatype_index])
714 void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
716 for (unsigned int i = 0; i < comm_size; i++) {
717 (*recvcounts)[i] = std::stoi(action[i + 3]);
719 int recv_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
720 void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
722 int my_proc_id = Actor::self()->getPid();
724 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
725 new simgrid::instr::VarCollTIData("allGatherV", -1, sendcount, nullptr, -1, recvcounts,
726 Datatype::encode(MPI_CURRENT_TYPE),
727 Datatype::encode(MPI_CURRENT_TYPE2)));
729 Colls::allgatherv(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts->data(), disps.data(), MPI_CURRENT_TYPE2,
732 TRACE_smpi_comm_out(my_proc_id);
733 log_timed_action (action, clock);
736 static void action_allToAllv(simgrid::xbt::ReplayAction& action)
738 /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
739 0 allToAllV 100 1 7 10 12 100 1 70 10 5
741 1) 100 is the size of the send buffer *sizeof(int),
742 2) 1 7 10 12 is the sendcounts array
743 3) 100*sizeof(int) is the size of the receiver buffer
744 4) 1 70 10 5 is the recvcounts array
746 double clock = smpi_process()->simulated_elapsed();
748 unsigned long comm_size = MPI_COMM_WORLD->size();
749 CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
750 std::shared_ptr<std::vector<int>> sendcounts(new std::vector<int>(comm_size));
751 std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
752 std::vector<int> senddisps(comm_size, 0);
753 std::vector<int> recvdisps(comm_size, 0);
755 MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 5 + 2 * comm_size)
756 ? simgrid::smpi::Datatype::decode(action[4 + 2 * comm_size])
758 MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5 + 2 * comm_size)
759 ? simgrid::smpi::Datatype::decode(action[5 + 2 * comm_size])
762 int send_buf_size=parse_double(action[2]);
763 int recv_buf_size=parse_double(action[3+comm_size]);
764 int my_proc_id = Actor::self()->getPid();
765 void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* MPI_CURRENT_TYPE->size());
766 void *recvbuf = smpi_get_tmp_recvbuffer(recv_buf_size* MPI_CURRENT_TYPE2->size());
768 for (unsigned int i = 0; i < comm_size; i++) {
769 (*sendcounts)[i] = std::stoi(action[3 + i]);
770 (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
772 int send_size = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
773 int recv_size = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
775 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
776 new simgrid::instr::VarCollTIData("allToAllV", -1, send_size, sendcounts, recv_size, recvcounts,
777 Datatype::encode(MPI_CURRENT_TYPE),
778 Datatype::encode(MPI_CURRENT_TYPE2)));
780 Colls::alltoallv(sendbuf, sendcounts->data(), senddisps.data(), MPI_CURRENT_TYPE, recvbuf, recvcounts->data(),
781 recvdisps.data(), MPI_CURRENT_TYPE, MPI_COMM_WORLD);
783 TRACE_smpi_comm_out(my_proc_id);
784 log_timed_action (action, clock);
787 }} // namespace simgrid::smpi
789 /** @brief Only initialize the replay, don't do it for real */
790 void smpi_replay_init(int* argc, char*** argv)
792 simgrid::smpi::Process::init(argc, argv);
793 smpi_process()->mark_as_initialized();
794 smpi_process()->set_replaying(true);
796 int my_proc_id = Actor::self()->getPid();
797 TRACE_smpi_init(my_proc_id);
798 TRACE_smpi_computing_init(my_proc_id);
799 TRACE_smpi_comm_in(my_proc_id, "smpi_replay_run_init", new simgrid::instr::NoOpTIData("init"));
800 TRACE_smpi_comm_out(my_proc_id);
  // Map every trace-action keyword to its handler; free functions for the
  // older handlers, lambdas wrapping the ReplayAction classes for the rest.
801 xbt_replay_action_register("init", simgrid::smpi::action_init);
802 xbt_replay_action_register("finalize", simgrid::smpi::action_finalize);
803 xbt_replay_action_register("comm_size", simgrid::smpi::action_comm_size);
804 xbt_replay_action_register("comm_split", simgrid::smpi::action_comm_split);
805 xbt_replay_action_register("comm_dup", simgrid::smpi::action_comm_dup);
807 xbt_replay_action_register("send", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::SendAction("send").execute(action); });
808 xbt_replay_action_register("Isend", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::SendAction("Isend").execute(action); });
809 xbt_replay_action_register("recv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::RecvAction("recv").execute(action); });
810 xbt_replay_action_register("Irecv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::RecvAction("Irecv").execute(action); });
811 xbt_replay_action_register("test", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::TestAction().execute(action); });
812 xbt_replay_action_register("wait", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::WaitAction().execute(action); });
813 xbt_replay_action_register("waitAll", simgrid::smpi::action_waitall);
814 xbt_replay_action_register("barrier", simgrid::smpi::action_barrier);
815 xbt_replay_action_register("bcast", simgrid::smpi::action_bcast);
816 xbt_replay_action_register("reduce", simgrid::smpi::action_reduce);
817 xbt_replay_action_register("allReduce", simgrid::smpi::action_allReduce);
818 xbt_replay_action_register("allToAll", simgrid::smpi::action_allToAll);
819 xbt_replay_action_register("allToAllV", simgrid::smpi::action_allToAllv);
820 xbt_replay_action_register("gather", simgrid::smpi::action_gather);
821 xbt_replay_action_register("scatter", simgrid::smpi::action_scatter);
822 xbt_replay_action_register("gatherV", simgrid::smpi::action_gatherv);
823 xbt_replay_action_register("scatterV", simgrid::smpi::action_scatterv);
824 xbt_replay_action_register("allGather", simgrid::smpi::action_allgather);
825 xbt_replay_action_register("allGatherV", simgrid::smpi::action_allgatherv);
826 xbt_replay_action_register("reduceScatter", simgrid::smpi::action_reducescatter);
827 xbt_replay_action_register("compute", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::ComputeAction().execute(action); });
829 //if we have a delayed start, sleep here.
  // The guard reading argc/argv for the optional delay is on an elided line.
831 double value = xbt_str_parse_double((*argv)[2], "%s is not a double");
832 XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
833 smpi_execute_flops(value);
835 //UGLY: force a context switch to be sure that all MSG_processes begin initialization
836 XBT_DEBUG("Force context switch by smpi_execute_flops - Sleeping for 0.0 flops ");
837 smpi_execute_flops(0.0);
841 /** @brief actually run the replay after initialization */
842 void smpi_replay_main(int* argc, char*** argv)
  // Drive the whole trace through the registered action handlers.
844 simgrid::xbt::replay_runner(*argc, *argv);
846 /* and now, finalize everything */
847 /* One active process will stop. Decrease the counter*/
848 XBT_DEBUG("There are %zu elements in reqq[*]", get_reqq_self()->size());
  // Drain any requests the trace left pending so the simulation can end.
849 if (not get_reqq_self()->empty()) {
850 unsigned int count_requests=get_reqq_self()->size();
  // NOTE(review): variable-length arrays are a compiler extension, not
  // standard C++.
851 MPI_Request requests[count_requests];
852 MPI_Status status[count_requests];
  // Loop body copying the queued requests into `requests` is on elided lines.
855 for (auto const& req : *get_reqq_self()) {
859 simgrid::smpi::Request::waitall(count_requests, requests, status);
  // Free this actor's queue (the reqq map entry itself is not erased here).
861 delete get_reqq_self();
864 if(active_processes==0){
865 /* Last process alive speaking: end the simulated timer */
866 XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
867 smpi_free_replay_tmp_buffers();
870 TRACE_smpi_comm_in(Actor::self()->getPid(), "smpi_replay_run_finalize", new simgrid::instr::NoOpTIData("finalize"));
872 smpi_process()->finalize();
874 TRACE_smpi_comm_out(Actor::self()->getPid());
875 TRACE_smpi_finalize(Actor::self()->getPid());
878 /** @brief chain a replay initialization and a replay start */
879 void smpi_replay_run(int* argc, char*** argv)
  // Convenience entry point: initialization immediately followed by the run.
881 smpi_replay_init(argc, argv);
882 smpi_replay_main(argc, argv);