1 /* Copyright (c) 2009-2018. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
7 #include "smpi_coll.hpp"
8 #include "smpi_comm.hpp"
9 #include "smpi_datatype.hpp"
10 #include "smpi_group.hpp"
11 #include "smpi_process.hpp"
12 #include "smpi_request.hpp"
13 #include "xbt/replay.hpp"
15 #include <boost/algorithm/string/join.hpp>
18 #include <unordered_map>
22 using simgrid::s4u::Actor;
// From https://stackoverflow.com/questions/7110301/generic-hash-for-tuples-in-unordered-map-unordered-set
// This is all just to make std::unordered_map work with std::tuple. If we need this in other places,
// this could go into a header file.
namespace hash_tuple {
/** Fallback hasher: anything that is not a std::tuple is hashed by std::hash. */
template <typename TT> struct hash {
  std::size_t operator()(TT const& value) const { return std::hash<TT>{}(value); }
};

/** Folds the hash of @p v into @p seed, using the boost::hash_combine mixing formula. */
template <class T> inline void hash_combine(std::size_t& seed, T const& v)
{
  const std::size_t mixed = hash_tuple::hash<T>()(v) + 0x9e3779b9 + (seed << 6) + (seed >> 2);
  seed ^= mixed;
}

// Recursive template code derived from Matthieu M.
/** Combines elements 0..Index of a tuple into the seed, lowest index first. */
template <class Tuple, std::size_t Index = std::tuple_size<Tuple>::value - 1> struct HashValueImpl {
  static void apply(std::size_t& seed, Tuple const& tuple)
  {
    HashValueImpl<Tuple, Index - 1>::apply(seed, tuple); // earlier elements are mixed in first
    hash_combine(seed, std::get<Index>(tuple));
  }
};

/** End of the recursion: only element 0 remains. */
template <class Tuple> struct HashValueImpl<Tuple, 0> {
  static void apply(std::size_t& seed, Tuple const& tuple) { hash_combine(seed, std::get<0>(tuple)); }
};

/** Hasher for std::tuple, usable as the Hash parameter of std::unordered_map. */
template <typename... TT> struct hash<std::tuple<TT...>> {
  std::size_t operator()(std::tuple<TT...> const& tt) const
  {
    std::size_t acc = 0;
    HashValueImpl<std::tuple<TT...>>::apply(acc, tt);
    return acc;
  }
};
} // namespace hash_tuple
78 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
80 typedef std::tuple</*sender*/ int, /* reciever */ int, /* tag */int> req_key_t;
81 typedef std::unordered_map<req_key_t, MPI_Request, hash_tuple::hash<std::tuple<int,int,int>>> req_storage_t;
83 static MPI_Datatype MPI_DEFAULT_TYPE;
/* CHECK_ACTION_PARAMS(action, mandatory, optional)
 * Throws an arg_error (through THROWF) when the trace line `action` carries fewer than `mandatory`
 * fields after its first two ones (process id and action name). The message echoes the whole line.
 * `optional` only appears in that message: extra fields are never rejected.
 * Callers in this file invoke it without a trailing semicolon. */
85 #define CHECK_ACTION_PARAMS(action, mandatory, optional) \
87 if (action.size() < static_cast<unsigned long>(mandatory + 2)) { \
88 std::stringstream ss; \
89 for (const auto& elem : action) { \
92 THROWF(arg_error, 0, "%s replay failed.\n" \
93 "%zu items were given on the line. First two should be process_id and action. " \
94 "This action needs after them %lu mandatory arguments, and accepts %lu optional ones. \n" \
95 "The full line that was given is:\n %s\n" \
96 "Please contact the Simgrid team if support is needed", \
97 __func__, action.size(), static_cast<unsigned long>(mandatory), static_cast<unsigned long>(optional), \
102 static void log_timed_action(simgrid::xbt::ReplayAction& action, double clock)
104 if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
105 std::string s = boost::algorithm::join(action, " ");
106 XBT_VERB("%s %f", s.c_str(), smpi_process()->simulated_elapsed() - clock);
110 /* Helper function */
111 static double parse_double(std::string string)
113 return xbt_str_parse_double(string.c_str(), "%s is not a double");
/** Pending MPI requests of one replayed process, keyed by (src, dst, tag).
 *  Filled by the Isend/Irecv actions and by TestAction; drained by the wait actions. */
121 class RequestStorage {
// Gives direct access to the underlying map (WaitAllAction uses it to clear every entry at once).
132 req_storage_t& get_store()
137 void get_requests(std::vector<MPI_Request>& vec)
139 for (auto& pair : store) {
140 auto& req = pair.second;
141 auto my_proc_id = simgrid::s4u::this_actor::get_pid();
142 if (req != MPI_REQUEST_NULL && (req->src() == my_proc_id || req->dst() == my_proc_id)) {
143 vec.push_back(pair.second);
144 pair.second->print_request("MM");
149 MPI_Request find(int src, int dst, int tag)
151 req_storage_t::iterator it = store.find(req_key_t(src, dst, tag));
152 return (it == store.end()) ? MPI_REQUEST_NULL : it->second;
155 void remove(MPI_Request req)
157 if (req == MPI_REQUEST_NULL) return;
159 store.erase(req_key_t(req->src()-1, req->dst()-1, req->tag()));
162 void add(MPI_Request req)
164 if (req != MPI_REQUEST_NULL) // Can and does happen in the case of TestAction
165 store.insert({req_key_t(req->src()-1, req->dst()-1, req->tag()), req});
168 /* Sometimes we need to re-insert MPI_REQUEST_NULL but we still need src,dst and tag */
169 void addNullRequest(int src, int dst, int tag)
171 store.insert({req_key_t(src, dst, tag), MPI_REQUEST_NULL});
175 class ActionArgParser {
177 virtual ~ActionArgParser() = default;
178 virtual void parse(simgrid::xbt::ReplayAction& action, std::string name) { CHECK_ACTION_PARAMS(action, 0, 0) }
/** Arguments of the wait and test actions: "<pid> wait|test <src> <dst> <tag>".
 *  The three values form the key used to look the request up in the RequestStorage. */
181 class WaitTestParser : public ActionArgParser {
187 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
189 CHECK_ACTION_PARAMS(action, 3, 0)
// Fields 2, 3 and 4 are the source, destination and tag of the awaited request.
190 src = std::stoi(action[2]);
191 dst = std::stoi(action[3]);
192 tag = std::stoi(action[4]);
/** Arguments of the point-to-point actions (send, Isend, recv, Irecv):
 *  "<pid> <action> <partner> <tag> <size> [<datatype id>]". */
196 class SendRecvParser : public ActionArgParser {
198 /* communication partner; if we send, this is the receiver and vice versa */
// Defaults to the datatype selected by the init action when the line gives none.
202 MPI_Datatype datatype1 = MPI_DEFAULT_TYPE;
204 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
206 CHECK_ACTION_PARAMS(action, 3, 1)
207 partner = std::stoi(action[2]);
208 tag = std::stoi(action[3]);
// Element count; RecvAction treats a value <= 0 as "unknown" and probes for it.
209 size = parse_double(action[4]);
210 if (action.size() > 5)
211 datatype1 = simgrid::smpi::Datatype::decode(action[5]);
/** Arguments of the compute action: "<pid> compute <flops>". */
215 class ComputeParser : public ActionArgParser {
217 /* flops: amount of computation to simulate, passed to smpi_execute_flops() by ComputeAction */
220 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
222 CHECK_ACTION_PARAMS(action, 1, 0)
223 flops = parse_double(action[2]);
/** Common argument holder of the collective-operation parsers below.
 *  Both datatypes default to MPI_DEFAULT_TYPE (chosen by the init action) when a line omits them;
 *  datatype1 describes the send side and datatype2 the receive side. */
227 class CollCommParser : public ActionArgParser {
235 MPI_Datatype datatype1 = MPI_DEFAULT_TYPE;
236 MPI_Datatype datatype2 = MPI_DEFAULT_TYPE;
239 class BcastArgParser : public CollCommParser {
241 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
243 CHECK_ACTION_PARAMS(action, 1, 2)
244 size = parse_double(action[2]);
245 root = (action.size() > 3) ? std::stoi(action[3]) : 0;
246 if (action.size() > 4)
247 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
251 class ReduceArgParser : public CollCommParser {
253 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
255 CHECK_ACTION_PARAMS(action, 2, 2)
256 comm_size = parse_double(action[2]);
257 comp_size = parse_double(action[3]);
258 root = (action.size() > 4) ? std::stoi(action[4]) : 0;
259 if (action.size() > 5)
260 datatype1 = simgrid::smpi::Datatype::decode(action[5]);
264 class AllReduceArgParser : public CollCommParser {
266 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
268 CHECK_ACTION_PARAMS(action, 2, 1)
269 comm_size = parse_double(action[2]);
270 comp_size = parse_double(action[3]);
271 if (action.size() > 4)
272 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
276 class AllToAllArgParser : public CollCommParser {
278 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
280 CHECK_ACTION_PARAMS(action, 2, 1)
281 comm_size = MPI_COMM_WORLD->size();
282 send_size = parse_double(action[2]);
283 recv_size = parse_double(action[3]);
285 if (action.size() > 4)
286 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
287 if (action.size() > 5)
288 datatype2 = simgrid::smpi::Datatype::decode(action[5]);
292 class GatherArgParser : public CollCommParser {
294 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
296 /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
299 1) 68 is the sendcounts
300 2) 68 is the recvcounts
301 3) 0 is the root node
302 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
303 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
305 CHECK_ACTION_PARAMS(action, 2, 3)
306 comm_size = MPI_COMM_WORLD->size();
307 send_size = parse_double(action[2]);
308 recv_size = parse_double(action[3]);
310 if (name == "gather") {
311 root = (action.size() > 4) ? std::stoi(action[4]) : 0;
312 if (action.size() > 5)
313 datatype1 = simgrid::smpi::Datatype::decode(action[5]);
314 if (action.size() > 6)
315 datatype2 = simgrid::smpi::Datatype::decode(action[6]);
318 if (action.size() > 4)
319 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
320 if (action.size() > 5)
321 datatype2 = simgrid::smpi::Datatype::decode(action[5]);
/** Arguments of gatherV and of the root-less variant handled by the same parser. */
326 class GatherVArgParser : public CollCommParser {
// One receive count per process of MPI_COMM_WORLD, shared with the trace data of GatherVAction.
329 std::shared_ptr<std::vector<int>> recvcounts;
// Receive displacements; all zero unless the line provides them.
330 std::vector<int> disps;
331 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
333 /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
334 0 gather 68 68 10 10 10 0 0 0
336 1) 68 is the sendcount
337 2) 68 10 10 10 is the recvcounts
338 3) 0 is the root node
339 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
340 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
342 comm_size = MPI_COMM_WORLD->size();
343 CHECK_ACTION_PARAMS(action, comm_size+1, 2)
344 send_size = parse_double(action[2]);
345 disps = std::vector<int>(comm_size, 0);
346 recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
348 if (name == "gatherV") {
349 root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
350 if (action.size() > 4 + comm_size)
351 datatype1 = simgrid::smpi::Datatype::decode(action[4 + comm_size]);
352 if (action.size() > 5 + comm_size)
353 datatype2 = simgrid::smpi::Datatype::decode(action[5 + comm_size]);
356 int datatype_index = 0;
358 /* The 3 comes from "0 gather <sendcount>", which must always be present.
359 * The + comm_size is the recvcounts array, which must also be present
361 if (action.size() > 3 + comm_size + comm_size) { /* datatype + disp are specified */
362 datatype_index = 3 + comm_size;
363 disp_index = datatype_index + 1;
364 datatype1 = simgrid::smpi::Datatype::decode(action[datatype_index]);
365 datatype2 = simgrid::smpi::Datatype::decode(action[datatype_index]);
366 } else if (action.size() > 3 + comm_size + 2) { /* disps specified; datatype is not specified; use the default one */
367 disp_index = 3 + comm_size;
368 } else if (action.size() > 3 + comm_size) { /* only datatype, no disp specified */
369 datatype_index = 3 + comm_size;
370 datatype1 = simgrid::smpi::Datatype::decode(action[datatype_index]);
371 datatype2 = simgrid::smpi::Datatype::decode(action[datatype_index]);
374 if (disp_index != 0) {
375 for (unsigned int i = 0; i < comm_size; i++)
376 disps[i] = std::stoi(action[disp_index + i]);
380 for (unsigned int i = 0; i < comm_size; i++) {
381 (*recvcounts)[i] = std::stoi(action[i + 3]);
383 recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
387 class ScatterArgParser : public CollCommParser {
389 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
391 /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
394 1) 68 is the sendcounts
395 2) 68 is the recvcounts
396 3) 0 is the root node
397 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
398 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
400 CHECK_ACTION_PARAMS(action, 2, 3)
401 comm_size = MPI_COMM_WORLD->size();
402 send_size = parse_double(action[2]);
403 recv_size = parse_double(action[3]);
404 root = (action.size() > 4) ? std::stoi(action[4]) : 0;
405 if (action.size() > 5)
406 datatype1 = simgrid::smpi::Datatype::decode(action[5]);
407 if (action.size() > 6)
408 datatype2 = simgrid::smpi::Datatype::decode(action[6]);
/** Arguments of scatterV. */
412 class ScatterVArgParser : public CollCommParser {
// One send count per process of MPI_COMM_WORLD, shared with the trace data of ScatterVAction.
416 std::shared_ptr<std::vector<int>> sendcounts;
// Send displacements; parse() leaves them all at zero.
417 std::vector<int> disps;
418 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
420 /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
421 0 gather 68 10 10 10 68 0 0 0
423 1) 68 10 10 10 is the sendcounts
424 2) 68 is the recvcount
425 3) 0 is the root node
426 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
427 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
429 CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
430 recv_size = parse_double(action[2 + comm_size]);
431 disps = std::vector<int>(comm_size, 0);
432 sendcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
434 if (action.size() > 5 + comm_size)
435 datatype1 = simgrid::smpi::Datatype::decode(action[4 + comm_size]);
436 if (action.size() > 5 + comm_size)
437 datatype2 = simgrid::smpi::Datatype::decode(action[5]);
439 for (unsigned int i = 0; i < comm_size; i++) {
440 (*sendcounts)[i] = std::stoi(action[i + 2]);
442 send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
443 root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
/** Arguments of reduceScatter. */
447 class ReduceScatterArgParser : public CollCommParser {
// One receive count per process of MPI_COMM_WORLD, shared with the trace data of ReduceScatterAction.
450 std::shared_ptr<std::vector<int>> recvcounts;
// Not assigned by parse().
451 std::vector<int> disps;
452 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
454 /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
455 0 reduceScatter 275427 275427 275427 204020 11346849 0
457 1) The first four values after the name of the action declare the recvcounts array
458 2) The value 11346849 is the amount of instructions
459 3) The last value corresponds to the datatype, see simgrid::smpi::Datatype::decode().
461 comm_size = MPI_COMM_WORLD->size();
462 CHECK_ACTION_PARAMS(action, comm_size+1, 1)
463 comp_size = parse_double(action[2+comm_size]);
464 recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
465 if (action.size() > 3 + comm_size)
466 datatype1 = simgrid::smpi::Datatype::decode(action[3 + comm_size]);
468 for (unsigned int i = 0; i < comm_size; i++) {
469 recvcounts->push_back(std::stoi(action[i + 2]));
471 recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
/** Arguments of allToAllV. */
475 class AllToAllVArgParser : public CollCommParser {
// Per-process receive and send counts (comm_size entries each), shared with the trace data.
479 std::shared_ptr<std::vector<int>> recvcounts;
480 std::shared_ptr<std::vector<int>> sendcounts;
// Displacements; parse() leaves them all at zero.
481 std::vector<int> senddisps;
482 std::vector<int> recvdisps;
485 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
487 /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
488 0 allToAllV 100 1 7 10 12 100 1 70 10 5
490 1) 100 is the size of the send buffer *sizeof(int),
491 2) 1 7 10 12 is the sendcounts array
492 3) 100*sizeof(int) is the size of the receiver buffer
493 4) 1 70 10 5 is the recvcounts array
495 comm_size = MPI_COMM_WORLD->size();
496 CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
497 sendcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
498 recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
499 senddisps = std::vector<int>(comm_size, 0);
500 recvdisps = std::vector<int>(comm_size, 0);
502 if (action.size() > 5 + 2 * comm_size)
503 datatype1 = simgrid::smpi::Datatype::decode(action[4 + 2 * comm_size]);
504 if (action.size() > 5 + 2 * comm_size)
505 datatype2 = simgrid::smpi::Datatype::decode(action[5 + 2 * comm_size]);
507 send_buf_size=parse_double(action[2]);
508 recv_buf_size=parse_double(action[3+comm_size]);
509 for (unsigned int i = 0; i < comm_size; i++) {
510 (*sendcounts)[i] = std::stoi(action[3 + i]);
511 (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
513 send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
514 recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
/** Base class of every replayed action.
 *  T is the argument parser of the action; execute() makes it parse the trace line and logs the
 *  simulated time elapsed since start_time. Subclasses implement kernel(). */
518 template <class T> class ReplayAction {
// Action name given by the subclass constructor (e.g. "send", "Isend", "bcast").
520 const std::string name;
521 RequestStorage* req_storage; // Points to the right storage for this process, nullptr except for Send/Recv/Wait/Test/WaitAll actions.
// pid of the actor replaying this action, captured when the action object is built.
522 const int my_proc_id;
526 explicit ReplayAction(std::string name, RequestStorage& storage) : name(name), req_storage(&storage), my_proc_id(simgrid::s4u::this_actor::get_pid()) {}
527 explicit ReplayAction(std::string name) : name(name), req_storage(nullptr), my_proc_id(simgrid::s4u::this_actor::get_pid()) {}
528 virtual ~ReplayAction() = default;
530 virtual void execute(simgrid::xbt::ReplayAction& action)
532 // Needs to be re-initialized for every action, hence here
533 double start_time = smpi_process()->simulated_elapsed();
534 args.parse(action, name);
537 log_timed_action(action, start_time);
// Action-specific behaviour, implemented by each subclass.
540 virtual void kernel(simgrid::xbt::ReplayAction& action) = 0;
// Scratch buffers from smpi_get_tmp_sendbuffer / smpi_get_tmp_recvbuffer. Callers pass a count times the
// datatype size, so `size` presumably is in bytes.
542 void* send_buffer(int size)
544 return smpi_get_tmp_sendbuffer(size);
547 void* recv_buffer(int size)
549 return smpi_get_tmp_recvbuffer(size);
// Replays a "wait" action: waits on the pending request stored under (src, dst, tag), when one is found.
553 class WaitAction : public ReplayAction<WaitTestParser> {
555 WaitAction(RequestStorage& storage) : ReplayAction("Wait", storage) {}
556 void kernel(simgrid::xbt::ReplayAction& action) override
// The joined line is only used in the assertion message below.
558 std::string s = boost::algorithm::join(action, " ");
559 xbt_assert(req_storage->size(), "action wait not preceded by any irecv or isend: %s", s.c_str());
// Take the matching request out of the storage. find() yields MPI_REQUEST_NULL when the key is absent
// or only holds a null placeholder; remove() ignores null requests.
560 MPI_Request request = req_storage->find(args.src, args.dst, args.tag);
561 req_storage->remove(request);
563 if (request == MPI_REQUEST_NULL) {
564 /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just
569 int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
571 // Must be taken before Request::wait() since the request may be set to
572 // MPI_REQUEST_NULL by Request::wait!
573 bool is_wait_for_receive = (request->flags() & RECV);
574 // TODO: Here we take the rank while we normally take the process id (look for my_proc_id)
575 TRACE_smpi_comm_in(rank, __func__, new simgrid::instr::NoOpTIData("wait"));
578 Request::wait(&request, &status);
580 TRACE_smpi_comm_out(rank);
581 if (is_wait_for_receive)
582 TRACE_smpi_recv(args.src, args.dst, args.tag);
586 class SendAction : public ReplayAction<SendRecvParser> {
588 SendAction() = delete;
589 explicit SendAction(std::string name, RequestStorage& storage) : ReplayAction(name, storage) {}
590 void kernel(simgrid::xbt::ReplayAction& action) override
592 int dst_traced = MPI_COMM_WORLD->group()->actor(args.partner)->get_pid();
594 TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
595 args.tag, Datatype::encode(args.datatype1)));
596 if (not TRACE_smpi_view_internals())
597 TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, args.tag, args.size * args.datatype1->size());
599 if (name == "send") {
600 Request::send(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
601 } else if (name == "Isend") {
602 MPI_Request request = Request::isend(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
603 req_storage->add(request);
605 xbt_die("Don't know this action, %s", name.c_str());
608 TRACE_smpi_comm_out(my_proc_id);
612 class RecvAction : public ReplayAction<SendRecvParser> {
614 RecvAction() = delete;
615 explicit RecvAction(std::string name, RequestStorage& storage) : ReplayAction(name, storage) {}
616 void kernel(simgrid::xbt::ReplayAction& action) override
618 int src_traced = MPI_COMM_WORLD->group()->actor(args.partner)->get_pid();
620 TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
621 args.tag, Datatype::encode(args.datatype1)));
624 // unknown size from the receiver point of view
625 if (args.size <= 0.0) {
626 Request::probe(args.partner, args.tag, MPI_COMM_WORLD, &status);
627 args.size = status.count;
630 if (name == "recv") {
631 Request::recv(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD, &status);
632 } else if (name == "Irecv") {
633 MPI_Request request = Request::irecv(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
634 req_storage->add(request);
637 TRACE_smpi_comm_out(my_proc_id);
638 // TODO: Check why this was only activated in the "recv" case and not in the "Irecv" case
639 if (name == "recv" && not TRACE_smpi_view_internals()) {
640 TRACE_smpi_recv(src_traced, my_proc_id, args.tag);
645 class ComputeAction : public ReplayAction<ComputeParser> {
647 ComputeAction() : ReplayAction("compute") {}
648 void kernel(simgrid::xbt::ReplayAction& action) override
650 TRACE_smpi_computing_in(my_proc_id, args.flops);
651 smpi_execute_flops(args.flops);
652 TRACE_smpi_computing_out(my_proc_id);
// Replays a "test" action on the request stored under (src, dst, tag). When the test completes the
// request, a null placeholder is stored under that key so a later wait can still find the entry.
656 class TestAction : public ReplayAction<WaitTestParser> {
658 TestAction(RequestStorage& storage) : ReplayAction("Test", storage) {}
659 void kernel(simgrid::xbt::ReplayAction& action) override
// Take the matching request out of the storage; it is put back below once tested.
661 MPI_Request request = req_storage->find(args.src, args.dst, args.tag);
662 req_storage->remove(request);
663 // if request is null here, this may mean that a previous test has succeeded
664 // Different times in traced application and replayed version may lead to this
665 // In this case, ignore the extra calls.
666 if (request != MPI_REQUEST_NULL) {
667 TRACE_smpi_testing_in(my_proc_id);
670 int flag = Request::test(&request, &status);
672 XBT_DEBUG("MPI_Test result: %d", flag);
673 /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now
675 if (request == MPI_REQUEST_NULL)
676 req_storage->addNullRequest(args.src, args.dst, args.tag);
678 req_storage->add(request);
680 TRACE_smpi_testing_out(my_proc_id);
685 class InitAction : public ReplayAction<ActionArgParser> {
687 InitAction() : ReplayAction("Init") {}
688 void kernel(simgrid::xbt::ReplayAction& action) override
690 CHECK_ACTION_PARAMS(action, 0, 1)
691 MPI_DEFAULT_TYPE = (action.size() > 2) ? MPI_DOUBLE // default MPE datatype
692 : MPI_BYTE; // default TAU datatype
694 /* start a simulated timer */
695 smpi_process()->simulated_start();
699 class CommunicatorAction : public ReplayAction<ActionArgParser> {
701 CommunicatorAction() : ReplayAction("Comm") {}
702 void kernel(simgrid::xbt::ReplayAction& action) override { /* nothing to do */}
705 class WaitAllAction : public ReplayAction<ActionArgParser> {
707 WaitAllAction(RequestStorage& storage) : ReplayAction("waitAll", storage) {}
708 void kernel(simgrid::xbt::ReplayAction& action) override
710 const unsigned int count_requests = req_storage->size();
712 if (count_requests > 0) {
713 TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData("waitAll", -1, count_requests, ""));
714 std::vector<std::pair</*sender*/int,/*recv*/int>> sender_receiver;
715 std::vector<MPI_Request> reqs;
716 req_storage->get_requests(reqs);
717 for (const auto& req : reqs) {
718 if (req && (req->flags() & RECV)) {
719 sender_receiver.push_back({req->src(), req->dst()});
722 MPI_Status status[count_requests];
723 Request::waitall(count_requests, &(reqs.data())[0], status);
724 req_storage->get_store().clear();
726 for (auto& pair : sender_receiver) {
727 TRACE_smpi_recv(pair.first, pair.second, 0);
729 TRACE_smpi_comm_out(my_proc_id);
734 class BarrierAction : public ReplayAction<ActionArgParser> {
736 BarrierAction() : ReplayAction("barrier") {}
737 void kernel(simgrid::xbt::ReplayAction& action) override
739 TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::NoOpTIData("barrier"));
740 Colls::barrier(MPI_COMM_WORLD);
741 TRACE_smpi_comm_out(my_proc_id);
745 class BcastAction : public ReplayAction<BcastArgParser> {
747 BcastAction() : ReplayAction("bcast") {}
748 void kernel(simgrid::xbt::ReplayAction& action) override
750 TRACE_smpi_comm_in(my_proc_id, "action_bcast",
751 new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(args.root)->get_pid(),
752 -1.0, args.size, -1, Datatype::encode(args.datatype1), ""));
754 Colls::bcast(send_buffer(args.size * args.datatype1->size()), args.size, args.datatype1, args.root, MPI_COMM_WORLD);
756 TRACE_smpi_comm_out(my_proc_id);
760 class ReduceAction : public ReplayAction<ReduceArgParser> {
762 ReduceAction() : ReplayAction("reduce") {}
763 void kernel(simgrid::xbt::ReplayAction& action) override
765 TRACE_smpi_comm_in(my_proc_id, "action_reduce",
766 new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(args.root)->get_pid(),
767 args.comp_size, args.comm_size, -1,
768 Datatype::encode(args.datatype1), ""));
770 Colls::reduce(send_buffer(args.comm_size * args.datatype1->size()),
771 recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL, args.root, MPI_COMM_WORLD);
772 smpi_execute_flops(args.comp_size);
774 TRACE_smpi_comm_out(my_proc_id);
778 class AllReduceAction : public ReplayAction<AllReduceArgParser> {
780 AllReduceAction() : ReplayAction("allReduce") {}
781 void kernel(simgrid::xbt::ReplayAction& action) override
783 TRACE_smpi_comm_in(my_proc_id, "action_allReduce", new simgrid::instr::CollTIData("allReduce", -1, args.comp_size, args.comm_size, -1,
784 Datatype::encode(args.datatype1), ""));
786 Colls::allreduce(send_buffer(args.comm_size * args.datatype1->size()),
787 recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
788 smpi_execute_flops(args.comp_size);
790 TRACE_smpi_comm_out(my_proc_id);
794 class AllToAllAction : public ReplayAction<AllToAllArgParser> {
796 AllToAllAction() : ReplayAction("allToAll") {}
797 void kernel(simgrid::xbt::ReplayAction& action) override
799 TRACE_smpi_comm_in(my_proc_id, "action_allToAll",
800 new simgrid::instr::CollTIData("allToAll", -1, -1.0, args.send_size, args.recv_size,
801 Datatype::encode(args.datatype1),
802 Datatype::encode(args.datatype2)));
804 Colls::alltoall(send_buffer(args.send_size * args.comm_size * args.datatype1->size()), args.send_size,
805 args.datatype1, recv_buffer(args.recv_size * args.comm_size * args.datatype2->size()),
806 args.recv_size, args.datatype2, MPI_COMM_WORLD);
808 TRACE_smpi_comm_out(my_proc_id);
812 class GatherAction : public ReplayAction<GatherArgParser> {
814 explicit GatherAction(std::string name) : ReplayAction(name) {}
815 void kernel(simgrid::xbt::ReplayAction& action) override
817 TRACE_smpi_comm_in(my_proc_id, name.c_str(), new simgrid::instr::CollTIData(name, (name == "gather") ? args.root : -1, -1.0, args.send_size, args.recv_size,
818 Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
820 if (name == "gather") {
821 int rank = MPI_COMM_WORLD->rank();
822 Colls::gather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
823 (rank == args.root) ? recv_buffer(args.recv_size * args.comm_size * args.datatype2->size()) : nullptr, args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
826 Colls::allgather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
827 recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, MPI_COMM_WORLD);
829 TRACE_smpi_comm_out(my_proc_id);
833 class GatherVAction : public ReplayAction<GatherVArgParser> {
835 explicit GatherVAction(std::string name) : ReplayAction(name) {}
836 void kernel(simgrid::xbt::ReplayAction& action) override
838 int rank = MPI_COMM_WORLD->rank();
840 TRACE_smpi_comm_in(my_proc_id, name.c_str(), new simgrid::instr::VarCollTIData(
841 name, (name == "gatherV") ? args.root : -1, args.send_size, nullptr, -1, args.recvcounts,
842 Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
844 if (name == "gatherV") {
845 Colls::gatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
846 (rank == args.root) ? recv_buffer(args.recv_size_sum * args.datatype2->size()) : nullptr,
847 args.recvcounts->data(), args.disps.data(), args.datatype2, args.root, MPI_COMM_WORLD);
850 Colls::allgatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
851 recv_buffer(args.recv_size_sum * args.datatype2->size()), args.recvcounts->data(),
852 args.disps.data(), args.datatype2, MPI_COMM_WORLD);
855 TRACE_smpi_comm_out(my_proc_id);
859 class ScatterAction : public ReplayAction<ScatterArgParser> {
861 ScatterAction() : ReplayAction("scatter") {}
862 void kernel(simgrid::xbt::ReplayAction& action) override
864 int rank = MPI_COMM_WORLD->rank();
865 TRACE_smpi_comm_in(my_proc_id, "action_scatter", new simgrid::instr::CollTIData(name, args.root, -1.0, args.send_size, args.recv_size,
866 Datatype::encode(args.datatype1),
867 Datatype::encode(args.datatype2)));
869 Colls::scatter(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
870 (rank == args.root) ? recv_buffer(args.recv_size * args.datatype2->size()) : nullptr, args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
872 TRACE_smpi_comm_out(my_proc_id);
877 class ScatterVAction : public ReplayAction<ScatterVArgParser> {
879 ScatterVAction() : ReplayAction("scatterV") {}
880 void kernel(simgrid::xbt::ReplayAction& action) override
882 int rank = MPI_COMM_WORLD->rank();
883 TRACE_smpi_comm_in(my_proc_id, "action_scatterv", new simgrid::instr::VarCollTIData(name, args.root, -1, args.sendcounts, args.recv_size,
884 nullptr, Datatype::encode(args.datatype1),
885 Datatype::encode(args.datatype2)));
887 Colls::scatterv((rank == args.root) ? send_buffer(args.send_size_sum * args.datatype1->size()) : nullptr,
888 args.sendcounts->data(), args.disps.data(), args.datatype1,
889 recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, args.root,
892 TRACE_smpi_comm_out(my_proc_id);
896 class ReduceScatterAction : public ReplayAction<ReduceScatterArgParser> {
898 ReduceScatterAction() : ReplayAction("reduceScatter") {}
899 void kernel(simgrid::xbt::ReplayAction& action) override
901 TRACE_smpi_comm_in(my_proc_id, "action_reducescatter",
902 new simgrid::instr::VarCollTIData("reduceScatter", -1, 0, nullptr, -1, args.recvcounts,
903 std::to_string(args.comp_size), /* ugly hack to print comp_size */
904 Datatype::encode(args.datatype1)));
906 Colls::reduce_scatter(send_buffer(args.recv_size_sum * args.datatype1->size()),
907 recv_buffer(args.recv_size_sum * args.datatype1->size()), args.recvcounts->data(),
908 args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
910 smpi_execute_flops(args.comp_size);
911 TRACE_smpi_comm_out(my_proc_id);
915 class AllToAllVAction : public ReplayAction<AllToAllVArgParser> {
917 AllToAllVAction() : ReplayAction("allToAllV") {}
918 void kernel(simgrid::xbt::ReplayAction& action) override
920 TRACE_smpi_comm_in(my_proc_id, __func__,
921 new simgrid::instr::VarCollTIData(
922 "allToAllV", -1, args.send_size_sum, args.sendcounts, args.recv_size_sum, args.recvcounts,
923 Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
925 Colls::alltoallv(send_buffer(args.send_buf_size * args.datatype1->size()), args.sendcounts->data(), args.senddisps.data(), args.datatype1,
926 recv_buffer(args.recv_buf_size * args.datatype2->size()), args.recvcounts->data(), args.recvdisps.data(), args.datatype2, MPI_COMM_WORLD);
928 TRACE_smpi_comm_out(my_proc_id);
931 } // Replay Namespace
932 }} // namespace simgrid::smpi
// One RequestStorage per replayed process. smpi_replay_init() sizes it to smpi_process_count(), and the
// registered actions index it with this_actor::get_pid() - 1.
934 std::vector<simgrid::smpi::replay::RequestStorage> storage;
935 /** @brief Only initialize the replay, don't do it for real */
936 void smpi_replay_init(int* argc, char*** argv)
938 simgrid::smpi::Process::init(argc, argv);
939 smpi_process()->mark_as_initialized();
940 smpi_process()->set_replaying(true);
942 int my_proc_id = simgrid::s4u::this_actor::get_pid();
943 storage.resize(smpi_process_count());
945 TRACE_smpi_init(my_proc_id);
946 TRACE_smpi_computing_init(my_proc_id);
947 TRACE_smpi_comm_in(my_proc_id, "smpi_replay_run_init", new simgrid::instr::NoOpTIData("init"));
948 TRACE_smpi_comm_out(my_proc_id);
949 xbt_replay_action_register("init", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::InitAction().execute(action); });
950 xbt_replay_action_register("finalize", [](simgrid::xbt::ReplayAction& action) { /* nothing to do */ });
951 xbt_replay_action_register("comm_size", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
952 xbt_replay_action_register("comm_split",[](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
953 xbt_replay_action_register("comm_dup", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
955 xbt_replay_action_register("send", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("send", storage[simgrid::s4u::this_actor::get_pid()-1]).execute(action); });
956 xbt_replay_action_register("Isend", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("Isend", storage[simgrid::s4u::this_actor::get_pid()-1]).execute(action); });
957 xbt_replay_action_register("recv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("recv", storage[simgrid::s4u::this_actor::get_pid()-1]).execute(action); });
958 xbt_replay_action_register("Irecv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("Irecv", storage[simgrid::s4u::this_actor::get_pid()-1]).execute(action); });
959 xbt_replay_action_register("test", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::TestAction(storage[simgrid::s4u::this_actor::get_pid()-1]).execute(action); });
960 xbt_replay_action_register("wait", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAction(storage[simgrid::s4u::this_actor::get_pid()-1]).execute(action); });
961 xbt_replay_action_register("waitAll", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAllAction(storage[simgrid::s4u::this_actor::get_pid()-1]).execute(action); });
962 xbt_replay_action_register("barrier", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BarrierAction().execute(action); });
963 xbt_replay_action_register("bcast", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BcastAction().execute(action); });
964 xbt_replay_action_register("reduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceAction().execute(action); });
965 xbt_replay_action_register("allReduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllReduceAction().execute(action); });
966 xbt_replay_action_register("allToAll", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllAction().execute(action); });
967 xbt_replay_action_register("allToAllV", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllVAction().execute(action); });
968 xbt_replay_action_register("gather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("gather").execute(action); });
969 xbt_replay_action_register("scatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterAction().execute(action); });
970 xbt_replay_action_register("gatherV", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("gatherV").execute(action); });
971 xbt_replay_action_register("scatterV", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterVAction().execute(action); });
972 xbt_replay_action_register("allGather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("allGather").execute(action); });
973 xbt_replay_action_register("allGatherV", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("allGatherV").execute(action); });
974 xbt_replay_action_register("reduceScatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceScatterAction().execute(action); });
975 xbt_replay_action_register("compute", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ComputeAction().execute(action); });
977 //if we have a delayed start, sleep here.
979 double value = xbt_str_parse_double((*argv)[2], "%s is not a double");
980 XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
981 smpi_execute_flops(value);
983 //UGLY: force a context switch to be sure that all MSG_processes begin initialization
984 XBT_DEBUG("Force context switch by smpi_execute_flops - Sleeping for 0.0 flops ");
985 smpi_execute_flops(0.0);
989 /** @brief actually run the replay after initialization */
990 void smpi_replay_main(int* argc, char*** argv)
992 static int active_processes = 0;
994 simgrid::xbt::replay_runner(*argc, *argv);
996 /* and now, finalize everything */
997 /* One active process will stop. Decrease the counter*/
998 unsigned int count_requests = storage[simgrid::s4u::this_actor::get_pid() - 1].size();
999 XBT_DEBUG("There are %ud elements in reqq[*]", count_requests);
1000 if (count_requests > 0) {
1001 MPI_Request requests[count_requests];
1002 MPI_Status status[count_requests];
1005 for (auto const& pair : storage[simgrid::s4u::this_actor::get_pid() - 1].get_store()) {
1006 requests[i] = pair.second;
1009 simgrid::smpi::Request::waitall(count_requests, requests, status);
1013 if(active_processes==0){
1014 /* Last process alive speaking: end the simulated timer */
1015 XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
1016 smpi_free_replay_tmp_buffers();
1019 TRACE_smpi_comm_in(simgrid::s4u::this_actor::get_pid(), "smpi_replay_run_finalize",
1020 new simgrid::instr::NoOpTIData("finalize"));
1022 smpi_process()->finalize();
1024 TRACE_smpi_comm_out(simgrid::s4u::this_actor::get_pid());
1025 TRACE_smpi_finalize(simgrid::s4u::this_actor::get_pid());
1028 /** @brief chain a replay initialization and a replay start */
1029 void smpi_replay_run(int* argc, char*** argv)
1031 smpi_replay_init(argc, argv);
1032 smpi_replay_main(argc, argv);