1 /* Copyright (c) 2009-2018. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
7 #include "smpi_coll.hpp"
8 #include "smpi_comm.hpp"
9 #include "smpi_datatype.hpp"
10 #include "smpi_group.hpp"
11 #include "smpi_process.hpp"
12 #include "smpi_request.hpp"
13 #include "xbt/replay.hpp"
15 #include <boost/algorithm/string/join.hpp>
18 #include <unordered_map>
23 // From https://stackoverflow.com/questions/7110301/generic-hash-for-tuples-in-unordered-map-unordered-set
24 // This is all just to make std::unordered_map work with std::tuple. If we need this in other places,
25 // this could go into a header file.
namespace hash_tuple {

/** Fallback hasher: any type that is not a std::tuple is simply handed to std::hash. */
template <typename TT> struct hash {
  size_t operator()(TT const& value) const
  {
    std::hash<TT> delegate;
    return delegate(value);
  }
};

/** Mixes the hash of @p v into the running @p seed. */
template <class T> inline void hash_combine(std::size_t& seed, T const& v)
{
  const std::size_t element_hash = hash_tuple::hash<T>()(v);
  seed ^= element_hash + 0x9e3779b9 + (seed << 6) + (seed >> 2);
}

// Recursive template code derived from Matthieu M.
/** Hashes the tuple elements 0..Index, in increasing index order. */
template <class Tuple, size_t Index = std::tuple_size<Tuple>::value - 1> struct HashValueImpl {
  static void apply(size_t& seed, Tuple const& tuple)
  {
    using Preceding = HashValueImpl<Tuple, Index - 1>;
    Preceding::apply(seed, tuple);
    hash_combine(seed, std::get<Index>(tuple));
  }
};

/** Recursion anchor: only the first element is left to hash. */
template <class Tuple> struct HashValueImpl<Tuple, 0> {
  static void apply(size_t& seed, Tuple const& tuple) { hash_combine(seed, std::get<0>(tuple)); }
};

/** Hasher for std::tuple: starts from a zero seed and combines every element into it. */
template <typename... TT> struct hash<std::tuple<TT...>> {
  size_t operator()(std::tuple<TT...> const& tt) const
  {
    size_t result = 0;
    HashValueImpl<std::tuple<TT...>>::apply(result, tt);
    return result;
  }
};
} // namespace hash_tuple
63 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
65 typedef std::tuple</*sender*/ int, /* reciever */ int, /* tag */int> req_key_t;
66 typedef std::unordered_map<req_key_t, MPI_Request, hash_tuple::hash<std::tuple<int,int,int>>> req_storage_t;
68 static MPI_Datatype MPI_DEFAULT_TYPE;
// Validates the number of items on a replay line.
// `action` is the tokenized line; its first two items are the process id and the action name.
// If fewer than `mandatory + 2` items are present, an arg_error is raised through THROWF with a
// message that reports the item count, the mandatory/optional argument counts and the line itself.
// `optional` is only used in that message.
70 #define CHECK_ACTION_PARAMS(action, mandatory, optional) \
72 if (action.size() < static_cast<unsigned long>(mandatory + 2)) { \
73 std::stringstream ss; \
74 for (const auto& elem : action) { \
77 THROWF(arg_error, 0, "%s replay failed.\n" \
78 "%zu items were given on the line. First two should be process_id and action. " \
79 "This action needs after them %lu mandatory arguments, and accepts %lu optional ones. \n" \
80 "The full line that was given is:\n %s\n" \
81 "Please contact the Simgrid team if support is needed", \
82 __func__, action.size(), static_cast<unsigned long>(mandatory), static_cast<unsigned long>(optional), \
87 static void log_timed_action(simgrid::xbt::ReplayAction& action, double clock)
89 if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
90 std::string s = boost::algorithm::join(action, " ");
91 XBT_VERB("%s %f", s.c_str(), smpi_process()->simulated_elapsed() - clock);
96 static double parse_double(std::string string)
98 return xbt_str_parse_double(string.c_str(), "%s is not a double");
106 class RequestStorage {
117 req_storage_t& get_store()
122 void get_requests(std::vector<MPI_Request>& vec)
124 for (auto& pair : store) {
125 auto& req = pair.second;
126 auto my_proc_id = simgrid::s4u::this_actor::get_pid();
127 if (req != MPI_REQUEST_NULL && (req->src() == my_proc_id || req->dst() == my_proc_id)) {
128 vec.push_back(pair.second);
129 pair.second->print_request("MM");
134 MPI_Request find(int src, int dst, int tag)
136 req_storage_t::iterator it = store.find(req_key_t(src, dst, tag));
137 return (it == store.end()) ? MPI_REQUEST_NULL : it->second;
140 void remove(MPI_Request req)
142 if (req == MPI_REQUEST_NULL) return;
144 store.erase(req_key_t(req->src()-1, req->dst()-1, req->tag()));
147 void add(MPI_Request req)
149 if (req != MPI_REQUEST_NULL) // Can and does happen in the case of TestAction
150 store.insert({req_key_t(req->src()-1, req->dst()-1, req->tag()), req});
153 /* Sometimes we need to re-insert MPI_REQUEST_NULL but we still need src,dst and tag */
154 void addNullRequest(int src, int dst, int tag)
156 store.insert({req_key_t(src, dst, tag), MPI_REQUEST_NULL});
161 * Base class for all parsers.
163 class ActionArgParser {
165 virtual ~ActionArgParser() = default;
166 virtual void parse(simgrid::xbt::ReplayAction& action, std::string name) { CHECK_ACTION_PARAMS(action, 0, 0) }
169 class WaitTestParser : public ActionArgParser {
175 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
177 CHECK_ACTION_PARAMS(action, 3, 0)
178 src = std::stoi(action[2]);
179 dst = std::stoi(action[3]);
180 tag = std::stoi(action[4]);
184 class SendRecvParser : public ActionArgParser {
186 /* communication partner; if we send, this is the receiver and vice versa */
190 MPI_Datatype datatype1 = MPI_DEFAULT_TYPE;
192 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
194 CHECK_ACTION_PARAMS(action, 3, 1)
195 partner = std::stoi(action[2]);
196 tag = std::stoi(action[3]);
197 size = parse_double(action[4]);
198 if (action.size() > 5)
199 datatype1 = simgrid::smpi::Datatype::decode(action[5]);
203 class ComputeParser : public ActionArgParser {
205 /* communication partner; if we send, this is the receiver and vice versa */
208 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
210 CHECK_ACTION_PARAMS(action, 1, 0)
211 flops = parse_double(action[2]);
// Common base of the collective-operation parsers.
// Both datatypes start out as MPI_DEFAULT_TYPE and are overwritten by the derived parsers
// only when the trace line names a datatype.
// NOTE(review): the other fields assigned by the derived parsers (size, comm_size, comp_size,
// send_size, recv_size, root) are presumably declared here too; their declarations are not
// visible in this excerpt.
215 class CollCommParser : public ActionArgParser {
223 MPI_Datatype datatype1 = MPI_DEFAULT_TYPE;
224 MPI_Datatype datatype2 = MPI_DEFAULT_TYPE;
227 class BcastArgParser : public CollCommParser {
229 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
231 CHECK_ACTION_PARAMS(action, 1, 2)
232 size = parse_double(action[2]);
233 root = (action.size() > 3) ? std::stoi(action[3]) : 0;
234 if (action.size() > 4)
235 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
239 class ReduceArgParser : public CollCommParser {
241 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
243 CHECK_ACTION_PARAMS(action, 2, 2)
244 comm_size = parse_double(action[2]);
245 comp_size = parse_double(action[3]);
246 root = (action.size() > 4) ? std::stoi(action[4]) : 0;
247 if (action.size() > 5)
248 datatype1 = simgrid::smpi::Datatype::decode(action[5]);
252 class AllReduceArgParser : public CollCommParser {
254 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
256 CHECK_ACTION_PARAMS(action, 2, 1)
257 comm_size = parse_double(action[2]);
258 comp_size = parse_double(action[3]);
259 if (action.size() > 4)
260 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
264 class AllToAllArgParser : public CollCommParser {
266 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
268 CHECK_ACTION_PARAMS(action, 2, 1)
269 comm_size = MPI_COMM_WORLD->size();
270 send_size = parse_double(action[2]);
271 recv_size = parse_double(action[3]);
273 if (action.size() > 4)
274 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
275 if (action.size() > 5)
276 datatype2 = simgrid::smpi::Datatype::decode(action[5]);
280 class GatherArgParser : public CollCommParser {
282 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
284 /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
287 1) 68 is the sendcounts
288 2) 68 is the recvcounts
289 3) 0 is the root node
290 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
291 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
293 CHECK_ACTION_PARAMS(action, 2, 3)
294 comm_size = MPI_COMM_WORLD->size();
295 send_size = parse_double(action[2]);
296 recv_size = parse_double(action[3]);
298 if (name == "gather") {
299 root = (action.size() > 4) ? std::stoi(action[4]) : 0;
300 if (action.size() > 5)
301 datatype1 = simgrid::smpi::Datatype::decode(action[5]);
302 if (action.size() > 6)
303 datatype2 = simgrid::smpi::Datatype::decode(action[6]);
306 if (action.size() > 4)
307 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
308 if (action.size() > 5)
309 datatype2 = simgrid::smpi::Datatype::decode(action[5]);
// Parser for the variable-count gather lines. With n = MPI_COMM_WORLD->size(), a line is
//   <rank> <name> <sendcount> <recvcount_0 .. recvcount_{n-1}> [optional items]
// For name == "gatherV" the optional items are root, send datatype and recv datatype.
// For any other name they are a datatype and/or n displacements (see the branches below).
// recvcounts is filled from items 3 .. 3+n-1 and recv_size_sum is their sum; disps defaults to zeros.
314 class GatherVArgParser : public CollCommParser {
317 std::shared_ptr<std::vector<int>> recvcounts;
318 std::vector<int> disps;
319 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
321 /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
322 0 gather 68 68 10 10 10 0 0 0
324 1) 68 is the sendcount
325 2) 68 10 10 10 is the recvcounts
326 3) 0 is the root node
327 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
328 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
330 comm_size = MPI_COMM_WORLD->size();
331 CHECK_ACTION_PARAMS(action, comm_size+1, 2)
332 send_size = parse_double(action[2]);
333 disps = std::vector<int>(comm_size, 0);
334 recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
336 if (name == "gatherV") {
337 root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
338 if (action.size() > 4 + comm_size)
339 datatype1 = simgrid::smpi::Datatype::decode(action[4 + comm_size]);
340 if (action.size() > 5 + comm_size)
341 datatype2 = simgrid::smpi::Datatype::decode(action[5 + comm_size]);
344 int datatype_index = 0;
346 /* The 3 comes from "0 gather <sendcount>", which must always be present.
347 * The + comm_size is the recvcounts array, which must also be present
// NOTE(review): in this non-"gatherV" path both datatype1 and datatype2 are decoded from the
// same item (datatype_index) — confirm that a single datatype id is intended here.
349 if (action.size() > 3 + comm_size + comm_size) { /* datatype + disp are specified */
350 datatype_index = 3 + comm_size;
351 disp_index = datatype_index + 1;
352 datatype1 = simgrid::smpi::Datatype::decode(action[datatype_index]);
353 datatype2 = simgrid::smpi::Datatype::decode(action[datatype_index]);
// NOTE(review): this branch is taken for more than 3 + comm_size + 2 items, but the displacement
// loop below reads comm_size items starting at disp_index; a line with fewer than
// 3 + 2 * comm_size items would be indexed past its end — confirm such lines cannot occur.
354 } else if (action.size() > 3 + comm_size + 2) { /* disps specified; datatype is not specified; use the default one */
355 disp_index = 3 + comm_size;
356 } else if (action.size() > 3 + comm_size) { /* only datatype, no disp specified */
357 datatype_index = 3 + comm_size;
358 datatype1 = simgrid::smpi::Datatype::decode(action[datatype_index]);
359 datatype2 = simgrid::smpi::Datatype::decode(action[datatype_index]);
// disp_index stays 0 when no displacement was found on the line; disps then keeps its zeros.
362 if (disp_index != 0) {
363 for (unsigned int i = 0; i < comm_size; i++)
364 disps[i] = std::stoi(action[disp_index + i]);
368 for (unsigned int i = 0; i < comm_size; i++) {
369 (*recvcounts)[i] = std::stoi(action[i + 3]);
371 recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
375 class ScatterArgParser : public CollCommParser {
377 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
379 /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
382 1) 68 is the sendcounts
383 2) 68 is the recvcounts
384 3) 0 is the root node
385 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
386 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
388 CHECK_ACTION_PARAMS(action, 2, 3)
389 comm_size = MPI_COMM_WORLD->size();
390 send_size = parse_double(action[2]);
391 recv_size = parse_double(action[3]);
392 root = (action.size() > 4) ? std::stoi(action[4]) : 0;
393 if (action.size() > 5)
394 datatype1 = simgrid::smpi::Datatype::decode(action[5]);
395 if (action.size() > 6)
396 datatype2 = simgrid::smpi::Datatype::decode(action[6]);
400 class ScatterVArgParser : public CollCommParser {
404 std::shared_ptr<std::vector<int>> sendcounts;
405 std::vector<int> disps;
406 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
408 /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
409 0 gather 68 10 10 10 68 0 0 0
411 1) 68 10 10 10 is the sendcounts
412 2) 68 is the recvcount
413 3) 0 is the root node
414 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
415 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
417 CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
418 recv_size = parse_double(action[2 + comm_size]);
419 disps = std::vector<int>(comm_size, 0);
420 sendcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
422 if (action.size() > 5 + comm_size)
423 datatype1 = simgrid::smpi::Datatype::decode(action[4 + comm_size]);
424 if (action.size() > 5 + comm_size)
425 datatype2 = simgrid::smpi::Datatype::decode(action[5]);
427 for (unsigned int i = 0; i < comm_size; i++) {
428 (*sendcounts)[i] = std::stoi(action[i + 2]);
430 send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
431 root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
435 class ReduceScatterArgParser : public CollCommParser {
438 std::shared_ptr<std::vector<int>> recvcounts;
439 std::vector<int> disps;
440 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
442 /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
443 0 reduceScatter 275427 275427 275427 204020 11346849 0
445 1) The first four values after the name of the action declare the recvcounts array
446 2) The value 11346849 is the amount of instructions
447 3) The last value corresponds to the datatype, see simgrid::smpi::Datatype::decode().
449 comm_size = MPI_COMM_WORLD->size();
450 CHECK_ACTION_PARAMS(action, comm_size+1, 1)
451 comp_size = parse_double(action[2+comm_size]);
452 recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
453 if (action.size() > 3 + comm_size)
454 datatype1 = simgrid::smpi::Datatype::decode(action[3 + comm_size]);
456 for (unsigned int i = 0; i < comm_size; i++) {
457 recvcounts->push_back(std::stoi(action[i + 2]));
459 recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
463 class AllToAllVArgParser : public CollCommParser {
467 std::shared_ptr<std::vector<int>> recvcounts;
468 std::shared_ptr<std::vector<int>> sendcounts;
469 std::vector<int> senddisps;
470 std::vector<int> recvdisps;
473 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
475 /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
476 0 allToAllV 100 1 7 10 12 100 1 70 10 5
478 1) 100 is the size of the send buffer *sizeof(int),
479 2) 1 7 10 12 is the sendcounts array
480 3) 100*sizeof(int) is the size of the receiver buffer
481 4) 1 70 10 5 is the recvcounts array
483 comm_size = MPI_COMM_WORLD->size();
484 CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
485 sendcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
486 recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
487 senddisps = std::vector<int>(comm_size, 0);
488 recvdisps = std::vector<int>(comm_size, 0);
490 if (action.size() > 5 + 2 * comm_size)
491 datatype1 = simgrid::smpi::Datatype::decode(action[4 + 2 * comm_size]);
492 if (action.size() > 5 + 2 * comm_size)
493 datatype2 = simgrid::smpi::Datatype::decode(action[5 + 2 * comm_size]);
495 send_buf_size=parse_double(action[2]);
496 recv_buf_size=parse_double(action[3+comm_size]);
497 for (unsigned int i = 0; i < comm_size; i++) {
498 (*sendcounts)[i] = std::stoi(action[3 + i]);
499 (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
501 send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
502 recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
507 * Base class for all ReplayActions.
508 * Note that this class actually implements the behavior of each action
509 * while the parsing of the replay arguments is done in the @ActionArgParser class.
510 * In other words: The logic goes here, the setup is done by the ActionArgParser.
// Base class of every replayed action. T is the parser type whose parse() fills the arguments
// read from the trace line; derived classes implement kernel().
512 template <class T> class ReplayAction {
// Action name given at construction, and pid of the actor that created the action object.
514 const std::string name;
515 const int my_proc_id;
519 explicit ReplayAction(std::string name) : name(name), my_proc_id(simgrid::s4u::this_actor::get_pid()) {}
520 virtual ~ReplayAction() = default;
// Entry point: records the simulated start time, parses the line and finally logs the timed action.
// NOTE(review): the call to kernel() presumably sits between the parse and the log call; it is not
// visible in this excerpt.
522 virtual void execute(simgrid::xbt::ReplayAction& action)
524 // Needs to be re-initialized for every action, hence here
525 double start_time = smpi_process()->simulated_elapsed();
526 args.parse(action, name);
529 log_timed_action(action, start_time);
// Behavior of the action itself; must be provided by each derived class.
532 virtual void kernel(simgrid::xbt::ReplayAction& action) = 0;
// Thin wrappers around smpi_get_tmp_sendbuffer / smpi_get_tmp_recvbuffer for a buffer of `size`.
534 void* send_buffer(int size)
536 return smpi_get_tmp_sendbuffer(size);
539 void* recv_buffer(int size)
541 return smpi_get_tmp_recvbuffer(size);
// Replays a "wait" line: looks up the request stored under (src, dst, tag), removes it from the
// storage and waits on it, with trace events emitted around the wait.
545 class WaitAction : public ReplayAction<WaitTestParser> {
547 RequestStorage& req_storage;
550 explicit WaitAction(RequestStorage& storage) : ReplayAction("Wait"), req_storage(storage) {}
551 void kernel(simgrid::xbt::ReplayAction& action) override
553 std::string s = boost::algorithm::join(action, " ");
// A wait is only meaningful if some request was stored by an earlier action.
554 xbt_assert(req_storage.size(), "action wait not preceded by any irecv or isend: %s", s.c_str());
555 MPI_Request request = req_storage.find(args.src, args.dst, args.tag);
556 req_storage.remove(request);
558 if (request == MPI_REQUEST_NULL) {
559 /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just
// rank is -1 when the request has no communicator.
564 int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
566 // Must be taken before Request::wait() since the request may be set to
567 // MPI_REQUEST_NULL by Request::wait!
568 bool is_wait_for_receive = (request->flags() & RECV);
569 // TODO: Here we take the rank while we normally take the process id (look for my_proc_id)
570 TRACE_smpi_comm_in(rank, __func__, new simgrid::instr::NoOpTIData("wait"));
573 Request::wait(&request, &status);
575 TRACE_smpi_comm_out(rank);
// The receive trace event is only emitted for requests flagged RECV.
576 if (is_wait_for_receive)
577 TRACE_smpi_recv(args.src, args.dst, args.tag);
// Replays a "send" (blocking) or "Isend" (non-blocking) line towards args.partner on MPI_COMM_WORLD.
// The request created by an Isend is kept in req_storage; any other action name aborts via xbt_die.
581 class SendAction : public ReplayAction<SendRecvParser> {
583 RequestStorage& req_storage;
586 explicit SendAction(std::string name, RequestStorage& storage) : ReplayAction(name), req_storage(storage) {}
587 void kernel(simgrid::xbt::ReplayAction& action) override
// Tracing uses the pid of the destination actor rather than its rank.
589 int dst_traced = MPI_COMM_WORLD->group()->actor(args.partner)->get_pid();
591 TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
592 args.tag, Datatype::encode(args.datatype1)));
593 if (not TRACE_smpi_view_internals())
594 TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, args.tag, args.size * args.datatype1->size());
596 if (name == "send") {
597 Request::send(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
598 } else if (name == "Isend") {
599 MPI_Request request = Request::isend(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
600 req_storage.add(request);
602 xbt_die("Don't know this action, %s", name.c_str());
605 TRACE_smpi_comm_out(my_proc_id);
// Replays a "recv" (blocking) or "Irecv" (non-blocking) line from args.partner on MPI_COMM_WORLD.
// The request created by an Irecv is kept in req_storage.
// NOTE(review): unlike SendAction, no branch for an unknown action name is visible here.
609 class RecvAction : public ReplayAction<SendRecvParser> {
611 RequestStorage& req_storage;
614 explicit RecvAction(std::string name, RequestStorage& storage) : ReplayAction(name), req_storage(storage) {}
615 void kernel(simgrid::xbt::ReplayAction& action) override
// Tracing uses the pid of the source actor rather than its rank.
617 int src_traced = MPI_COMM_WORLD->group()->actor(args.partner)->get_pid();
619 TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
620 args.tag, Datatype::encode(args.datatype1)));
623 // unknown size from the receiver point of view
// A non-positive size is replaced by the count reported by a probe of the incoming message.
624 if (args.size <= 0.0) {
625 Request::probe(args.partner, args.tag, MPI_COMM_WORLD, &status);
626 args.size = status.count;
629 if (name == "recv") {
630 Request::recv(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD, &status);
631 } else if (name == "Irecv") {
632 MPI_Request request = Request::irecv(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
633 req_storage.add(request);
636 TRACE_smpi_comm_out(my_proc_id);
637 // TODO: Check why this was only activated in the "recv" case and not in the "Irecv" case
638 if (name == "recv" && not TRACE_smpi_view_internals()) {
639 TRACE_smpi_recv(src_traced, my_proc_id, args.tag);
644 class ComputeAction : public ReplayAction<ComputeParser> {
646 ComputeAction() : ReplayAction("compute") {}
647 void kernel(simgrid::xbt::ReplayAction& action) override
649 TRACE_smpi_computing_in(my_proc_id, args.flops);
650 smpi_execute_flops(args.flops);
651 TRACE_smpi_computing_out(my_proc_id);
// Replays a "test" line: takes the request stored under (src, dst, tag) out of the storage, tests it
// and stores the outcome again so that a later wait on the same key can find it.
655 class TestAction : public ReplayAction<WaitTestParser> {
657 RequestStorage& req_storage;
660 explicit TestAction(RequestStorage& storage) : ReplayAction("Test"), req_storage(storage) {}
661 void kernel(simgrid::xbt::ReplayAction& action) override
663 MPI_Request request = req_storage.find(args.src, args.dst, args.tag);
664 req_storage.remove(request);
665 // if request is null here, this may mean that a previous test has succeeded
666 // Different times in traced application and replayed version may lead to this
667 // In this case, ignore the extra calls.
668 if (request != MPI_REQUEST_NULL) {
669 TRACE_smpi_testing_in(my_proc_id);
672 int flag = Request::test(&request, &status);
674 XBT_DEBUG("MPI_Test result: %d", flag);
675 /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now
// A null request carries no src/dst/tag, so the key is rebuilt from the parsed arguments.
677 if (request == MPI_REQUEST_NULL)
678 req_storage.addNullRequest(args.src, args.dst, args.tag);
680 req_storage.add(request);
682 TRACE_smpi_testing_out(my_proc_id);
687 class InitAction : public ReplayAction<ActionArgParser> {
689 InitAction() : ReplayAction("Init") {}
690 void kernel(simgrid::xbt::ReplayAction& action) override
692 CHECK_ACTION_PARAMS(action, 0, 1)
693 MPI_DEFAULT_TYPE = (action.size() > 2) ? MPI_DOUBLE // default MPE datatype
694 : MPI_BYTE; // default TAU datatype
696 /* start a simulated timer */
697 smpi_process()->simulated_start();
701 class CommunicatorAction : public ReplayAction<ActionArgParser> {
703 CommunicatorAction() : ReplayAction("Comm") {}
704 void kernel(simgrid::xbt::ReplayAction& action) override { /* nothing to do */}
707 class WaitAllAction : public ReplayAction<ActionArgParser> {
709 RequestStorage& req_storage;
712 explicit WaitAllAction(RequestStorage& storage) : ReplayAction("waitAll"), req_storage(storage) {}
713 void kernel(simgrid::xbt::ReplayAction& action) override
715 const unsigned int count_requests = req_storage.size();
717 if (count_requests > 0) {
718 TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData("waitAll", -1, count_requests, ""));
719 std::vector<std::pair</*sender*/int,/*recv*/int>> sender_receiver;
720 std::vector<MPI_Request> reqs;
721 req_storage.get_requests(reqs);
722 for (const auto& req : reqs) {
723 if (req && (req->flags() & RECV)) {
724 sender_receiver.push_back({req->src(), req->dst()});
727 MPI_Status status[count_requests];
728 Request::waitall(count_requests, &(reqs.data())[0], status);
729 req_storage.get_store().clear();
731 for (auto& pair : sender_receiver) {
732 TRACE_smpi_recv(pair.first, pair.second, 0);
734 TRACE_smpi_comm_out(my_proc_id);
739 class BarrierAction : public ReplayAction<ActionArgParser> {
741 BarrierAction() : ReplayAction("barrier") {}
742 void kernel(simgrid::xbt::ReplayAction& action) override
744 TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::NoOpTIData("barrier"));
745 Colls::barrier(MPI_COMM_WORLD);
746 TRACE_smpi_comm_out(my_proc_id);
// Replays a "bcast" line: broadcasts args.size elements of args.datatype1 from args.root on
// MPI_COMM_WORLD. The trace event reports the pid of the root actor rather than its rank.
750 class BcastAction : public ReplayAction<BcastArgParser> {
752 BcastAction() : ReplayAction("bcast") {}
753 void kernel(simgrid::xbt::ReplayAction& action) override
755 TRACE_smpi_comm_in(my_proc_id, "action_bcast",
756 new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(args.root)->get_pid(),
757 -1.0, args.size, -1, Datatype::encode(args.datatype1), ""));
// The temporary buffer is sized in bytes: element count times datatype size.
759 Colls::bcast(send_buffer(args.size * args.datatype1->size()), args.size, args.datatype1, args.root, MPI_COMM_WORLD);
761 TRACE_smpi_comm_out(my_proc_id);
// Replays a "reduce" line: a reduce of args.comm_size elements towards args.root on MPI_COMM_WORLD,
// followed by the execution of args.comp_size flops. MPI_OP_NULL is passed as the operation.
765 class ReduceAction : public ReplayAction<ReduceArgParser> {
767 ReduceAction() : ReplayAction("reduce") {}
768 void kernel(simgrid::xbt::ReplayAction& action) override
770 TRACE_smpi_comm_in(my_proc_id, "action_reduce",
771 new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(args.root)->get_pid(),
772 args.comp_size, args.comm_size, -1,
773 Datatype::encode(args.datatype1), ""));
775 Colls::reduce(send_buffer(args.comm_size * args.datatype1->size()),
776 recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL, args.root, MPI_COMM_WORLD);
// The computation cost read from the trace is executed after the communication.
777 smpi_execute_flops(args.comp_size);
779 TRACE_smpi_comm_out(my_proc_id);
// Replays an "allReduce" line: an allreduce of args.comm_size elements on MPI_COMM_WORLD, followed
// by the execution of args.comp_size flops. MPI_OP_NULL is passed as the operation.
783 class AllReduceAction : public ReplayAction<AllReduceArgParser> {
785 AllReduceAction() : ReplayAction("allReduce") {}
786 void kernel(simgrid::xbt::ReplayAction& action) override
788 TRACE_smpi_comm_in(my_proc_id, "action_allReduce", new simgrid::instr::CollTIData("allReduce", -1, args.comp_size, args.comm_size, -1,
789 Datatype::encode(args.datatype1), ""));
791 Colls::allreduce(send_buffer(args.comm_size * args.datatype1->size()),
792 recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
// The computation cost read from the trace is executed after the communication.
793 smpi_execute_flops(args.comp_size);
795 TRACE_smpi_comm_out(my_proc_id);
// Replays an "allToAll" line on MPI_COMM_WORLD: args.send_size elements of datatype1 are sent and
// args.recv_size elements of datatype2 are received per peer.
799 class AllToAllAction : public ReplayAction<AllToAllArgParser> {
801 AllToAllAction() : ReplayAction("allToAll") {}
802 void kernel(simgrid::xbt::ReplayAction& action) override
804 TRACE_smpi_comm_in(my_proc_id, "action_allToAll",
805 new simgrid::instr::CollTIData("allToAll", -1, -1.0, args.send_size, args.recv_size,
806 Datatype::encode(args.datatype1),
807 Datatype::encode(args.datatype2)));
// Both temporary buffers are sized for comm_size chunks of the per-peer count.
809 Colls::alltoall(send_buffer(args.send_size * args.comm_size * args.datatype1->size()), args.send_size,
810 args.datatype1, recv_buffer(args.recv_size * args.comm_size * args.datatype2->size()),
811 args.recv_size, args.datatype2, MPI_COMM_WORLD);
813 TRACE_smpi_comm_out(my_proc_id);
// Replays a gather-like line. When the action name is "gather" a rooted Colls::gather is run and
// only the root gets a receive buffer; for any other name Colls::allgather is run instead and the
// traced root is -1.
817 class GatherAction : public ReplayAction<GatherArgParser> {
819 explicit GatherAction(std::string name) : ReplayAction(name) {}
820 void kernel(simgrid::xbt::ReplayAction& action) override
822 TRACE_smpi_comm_in(my_proc_id, name.c_str(), new simgrid::instr::CollTIData(name, (name == "gather") ? args.root : -1, -1.0, args.send_size, args.recv_size,
823 Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
825 if (name == "gather") {
826 int rank = MPI_COMM_WORLD->rank();
827 Colls::gather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
828 (rank == args.root) ? recv_buffer(args.recv_size * args.comm_size * args.datatype2->size()) : nullptr, args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
// NOTE(review): this receive buffer is sized without the comm_size factor used in the gather
// branch above — confirm that this is intended for an allgather.
831 Colls::allgather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
832 recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, MPI_COMM_WORLD);
834 TRACE_smpi_comm_out(my_proc_id);
// Replays a variable-count gather line. When the action name is "gatherV" a rooted Colls::gatherv
// is run and only the root gets a receive buffer; for any other name Colls::allgatherv is run and
// the traced root is -1. The receive buffer is sized from the sum of the recvcounts.
838 class GatherVAction : public ReplayAction<GatherVArgParser> {
840 explicit GatherVAction(std::string name) : ReplayAction(name) {}
841 void kernel(simgrid::xbt::ReplayAction& action) override
843 int rank = MPI_COMM_WORLD->rank();
845 TRACE_smpi_comm_in(my_proc_id, name.c_str(), new simgrid::instr::VarCollTIData(
846 name, (name == "gatherV") ? args.root : -1, args.send_size, nullptr, -1, args.recvcounts,
847 Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
849 if (name == "gatherV") {
850 Colls::gatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
851 (rank == args.root) ? recv_buffer(args.recv_size_sum * args.datatype2->size()) : nullptr,
852 args.recvcounts->data(), args.disps.data(), args.datatype2, args.root, MPI_COMM_WORLD);
855 Colls::allgatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
856 recv_buffer(args.recv_size_sum * args.datatype2->size()), args.recvcounts->data(),
857 args.disps.data(), args.datatype2, MPI_COMM_WORLD);
860 TRACE_smpi_comm_out(my_proc_id);
// Replays a "scatter" line: a Colls::scatter from args.root on MPI_COMM_WORLD.
864 class ScatterAction : public ReplayAction<ScatterArgParser> {
866 ScatterAction() : ReplayAction("scatter") {}
867 void kernel(simgrid::xbt::ReplayAction& action) override
869 int rank = MPI_COMM_WORLD->rank();
870 TRACE_smpi_comm_in(my_proc_id, "action_scatter", new simgrid::instr::CollTIData(name, args.root, -1.0, args.send_size, args.recv_size,
871 Datatype::encode(args.datatype1),
872 Datatype::encode(args.datatype2)));
// NOTE(review): the send buffer is allocated on every rank while the receive buffer is only
// allocated on the root, which looks inverted with respect to scatter semantics (only the root
// sends, every rank receives) — confirm whether this matters in replay mode.
874 Colls::scatter(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
875 (rank == args.root) ? recv_buffer(args.recv_size * args.datatype2->size()) : nullptr, args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
877 TRACE_smpi_comm_out(my_proc_id);
// Replays a "scatterV" line: a Colls::scatterv from args.root. Only the root gets a send buffer,
// sized from the sum of the sendcounts; every rank gets a receive buffer for args.recv_size elements.
882 class ScatterVAction : public ReplayAction<ScatterVArgParser> {
884 ScatterVAction() : ReplayAction("scatterV") {}
885 void kernel(simgrid::xbt::ReplayAction& action) override
887 int rank = MPI_COMM_WORLD->rank();
888 TRACE_smpi_comm_in(my_proc_id, "action_scatterv", new simgrid::instr::VarCollTIData(name, args.root, -1, args.sendcounts, args.recv_size,
889 nullptr, Datatype::encode(args.datatype1),
890 Datatype::encode(args.datatype2)));
892 Colls::scatterv((rank == args.root) ? send_buffer(args.send_size_sum * args.datatype1->size()) : nullptr,
893 args.sendcounts->data(), args.disps.data(), args.datatype1,
894 recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, args.root,
897 TRACE_smpi_comm_out(my_proc_id);
// Replays a "reduceScatter" line: a Colls::reduce_scatter driven by args.recvcounts on
// MPI_COMM_WORLD, followed by the execution of args.comp_size flops. MPI_OP_NULL is passed as the
// operation, and both buffers are sized from the sum of the recvcounts.
901 class ReduceScatterAction : public ReplayAction<ReduceScatterArgParser> {
903 ReduceScatterAction() : ReplayAction("reduceScatter") {}
904 void kernel(simgrid::xbt::ReplayAction& action) override
906 TRACE_smpi_comm_in(my_proc_id, "action_reducescatter",
907 new simgrid::instr::VarCollTIData("reduceScatter", -1, 0, nullptr, -1, args.recvcounts,
908 std::to_string(args.comp_size), /* ugly hack to print comp_size */
909 Datatype::encode(args.datatype1)));
911 Colls::reduce_scatter(send_buffer(args.recv_size_sum * args.datatype1->size()),
912 recv_buffer(args.recv_size_sum * args.datatype1->size()), args.recvcounts->data(),
913 args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
915 smpi_execute_flops(args.comp_size);
916 TRACE_smpi_comm_out(my_proc_id);
// Replays an "allToAllV" line: a Colls::alltoallv on MPI_COMM_WORLD using the parsed send/recv
// counts and displacements. The buffers are sized from the buffer sizes given on the trace line,
// while the trace event reports the sums of the counts.
920 class AllToAllVAction : public ReplayAction<AllToAllVArgParser> {
922 AllToAllVAction() : ReplayAction("allToAllV") {}
923 void kernel(simgrid::xbt::ReplayAction& action) override
925 TRACE_smpi_comm_in(my_proc_id, __func__,
926 new simgrid::instr::VarCollTIData(
927 "allToAllV", -1, args.send_size_sum, args.sendcounts, args.recv_size_sum, args.recvcounts,
928 Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
930 Colls::alltoallv(send_buffer(args.send_buf_size * args.datatype1->size()), args.sendcounts->data(), args.senddisps.data(), args.datatype1,
931 recv_buffer(args.recv_buf_size * args.datatype2->size()), args.recvcounts->data(), args.recvdisps.data(), args.datatype2, MPI_COMM_WORLD);
933 TRACE_smpi_comm_out(my_proc_id);
936 } // Replay Namespace
937 }} // namespace simgrid::smpi
939 std::vector<simgrid::smpi::replay::RequestStorage> storage;
940 /** @brief Only initialize the replay, don't do it for real
 *
 * Initializes the SMPI process, flags it as initialized and replaying, sizes the global `storage`
 * vector to smpi_process_count() entries, emits the "init" trace events and registers one handler
 * per trace action name.
 *
 * @param argc pointer to the argument count, forwarded to Process::init
 * @param argv pointer to the argument vector, forwarded to Process::init; (*argv)[2] is parsed as a
 *             double near the end of this function and passed to smpi_execute_flops
 */
941 void smpi_replay_init(int* argc, char*** argv)
943 simgrid::smpi::Process::init(argc, argv);
944 smpi_process()->mark_as_initialized();
945 smpi_process()->set_replaying(true);
// The actor pid is used as the process identifier for all tracing calls below.
947 int my_proc_id = simgrid::s4u::this_actor::get_pid();
// One RequestStorage slot per SMPI process; handlers index it with get_pid() - 1.
948 storage.resize(smpi_process_count());
950 TRACE_smpi_init(my_proc_id);
951 TRACE_smpi_computing_init(my_proc_id);
952 TRACE_smpi_comm_in(my_proc_id, "smpi_replay_run_init", new simgrid::instr::NoOpTIData("init"));
953 TRACE_smpi_comm_out(my_proc_id);
// Handler registration: each lambda builds a fresh action object and calls execute() on it.
954 xbt_replay_action_register("init", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::InitAction().execute(action); });
955 xbt_replay_action_register("finalize", [](simgrid::xbt::ReplayAction& action) { /* nothing to do */ });
// The three communicator actions share the same handler class.
956 xbt_replay_action_register("comm_size", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
957 xbt_replay_action_register("comm_split",[](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
958 xbt_replay_action_register("comm_dup", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
// Point-to-point and completion actions receive the calling process's slot of `storage`.
959 xbt_replay_action_register("send", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("send", storage[simgrid::s4u::this_actor::get_pid()-1]).execute(action); });
960 xbt_replay_action_register("Isend", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("Isend", storage[simgrid::s4u::this_actor::get_pid()-1]).execute(action); });
961 xbt_replay_action_register("recv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("recv", storage[simgrid::s4u::this_actor::get_pid()-1]).execute(action); });
962 xbt_replay_action_register("Irecv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("Irecv", storage[simgrid::s4u::this_actor::get_pid()-1]).execute(action); });
963 xbt_replay_action_register("test", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::TestAction(storage[simgrid::s4u::this_actor::get_pid()-1]).execute(action); });
964 xbt_replay_action_register("wait", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAction(storage[simgrid::s4u::this_actor::get_pid()-1]).execute(action); });
965 xbt_replay_action_register("waitAll", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAllAction(storage[simgrid::s4u::this_actor::get_pid()-1]).execute(action); });
// Collective actions.
966 xbt_replay_action_register("barrier", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BarrierAction().execute(action); });
967 xbt_replay_action_register("bcast", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BcastAction().execute(action); });
968 xbt_replay_action_register("reduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceAction().execute(action); });
969 xbt_replay_action_register("allReduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllReduceAction().execute(action); });
970 xbt_replay_action_register("allToAll", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllAction().execute(action); });
971 xbt_replay_action_register("allToAllV", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllVAction().execute(action); });
// "gather"/"allGather" and "gatherV"/"allGatherV" share a handler class; the action name is passed to its constructor.
972 xbt_replay_action_register("gather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("gather").execute(action); });
973 xbt_replay_action_register("scatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterAction().execute(action); });
974 xbt_replay_action_register("gatherV", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("gatherV").execute(action); });
975 xbt_replay_action_register("scatterV", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterVAction().execute(action); });
976 xbt_replay_action_register("allGather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("allGather").execute(action); });
977 xbt_replay_action_register("allGatherV", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("allGatherV").execute(action); });
978 xbt_replay_action_register("reduceScatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceScatterAction().execute(action); });
979 xbt_replay_action_register("compute", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ComputeAction().execute(action); });
981 //if we have a delayed start, sleep here.
// The delay is read from the third command-line argument and handed to smpi_execute_flops.
// NOTE(review): the condition guarding the (*argv)[2] access is not visible in this excerpt -- confirm
// that it checks *argc before dereferencing.
983 double value = xbt_str_parse_double((*argv)[2], "%s is not a double");
984 XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
985 smpi_execute_flops(value);
987 //UGLY: force a context switch to be sure that all MSG_processes begin initialization
988 XBT_DEBUG("Force context switch by smpi_execute_flops - Sleeping for 0.0 flops ");
989 smpi_execute_flops(0.0);
993 /** @brief actually run the replay after initialization */
994 void smpi_replay_main(int* argc, char*** argv)
996 static int active_processes = 0;
998 simgrid::xbt::replay_runner(*argc, *argv);
1000 /* and now, finalize everything */
1001 /* One active process will stop. Decrease the counter*/
1002 unsigned int count_requests = storage[simgrid::s4u::this_actor::get_pid() - 1].size();
1003 XBT_DEBUG("There are %ud elements in reqq[*]", count_requests);
1004 if (count_requests > 0) {
1005 MPI_Request requests[count_requests];
1006 MPI_Status status[count_requests];
1009 for (auto const& pair : storage[simgrid::s4u::this_actor::get_pid() - 1].get_store()) {
1010 requests[i] = pair.second;
1013 simgrid::smpi::Request::waitall(count_requests, requests, status);
1017 if(active_processes==0){
1018 /* Last process alive speaking: end the simulated timer */
1019 XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
1020 smpi_free_replay_tmp_buffers();
1023 TRACE_smpi_comm_in(simgrid::s4u::this_actor::get_pid(), "smpi_replay_run_finalize",
1024 new simgrid::instr::NoOpTIData("finalize"));
1026 smpi_process()->finalize();
1028 TRACE_smpi_comm_out(simgrid::s4u::this_actor::get_pid());
1029 TRACE_smpi_finalize(simgrid::s4u::this_actor::get_pid());
1032 /** @brief chain a replay initialization and a replay start */
1033 void smpi_replay_run(int* argc, char*** argv)
1035 smpi_replay_init(argc, argv);
1036 smpi_replay_main(argc, argv);