1 /* Copyright (c) 2009-2018. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
7 #include "smpi_coll.hpp"
8 #include "smpi_comm.hpp"
9 #include "smpi_datatype.hpp"
10 #include "smpi_group.hpp"
11 #include "smpi_process.hpp"
12 #include "smpi_request.hpp"
13 #include "xbt/replay.hpp"
15 #include <boost/algorithm/string/join.hpp>
18 #include <unordered_map>
// From https://stackoverflow.com/questions/7110301/generic-hash-for-tuples-in-unordered-map-unordered-set
// This is all just to make std::unordered_map work with std::tuple. If we need this in other places,
// this could go into a header file.
namespace hash_tuple {
/** Fallback hasher: every non-tuple type is hashed with std::hash. */
template <typename TT> struct hash {
  std::size_t operator()(TT const& tt) const { return std::hash<TT>()(tt); }
};

/** Mix the hash of @a v into @a seed (the golden-ratio mixing step popularized by boost::hash_combine). */
template <class T> inline void hash_combine(std::size_t& seed, T const& v)
{
  // The mix is computed from the old seed first, then folded in with XOR.
  const std::size_t mixed = hash_tuple::hash<T>()(v) + 0x9e3779b9 + (seed << 6) + (seed >> 2);
  seed ^= mixed;
}

// Recursive template code derived from Matthieu M.
/** Fold the hashes of elements 0..Index of a tuple into a seed, lowest index first. */
template <class Tuple, std::size_t Index = std::tuple_size<Tuple>::value - 1> struct HashValueImpl {
  static void apply(std::size_t& seed, Tuple const& tuple)
  {
    HashValueImpl<Tuple, Index - 1>::apply(seed, tuple); // earlier elements are mixed in first
    hash_combine(seed, std::get<Index>(tuple));
  }
};

/** Recursion end: only element 0 is left. */
template <class Tuple> struct HashValueImpl<Tuple, 0> {
  static void apply(std::size_t& seed, Tuple const& tuple) { hash_combine(seed, std::get<0>(tuple)); }
};

/** Tuple hasher: combines the element hashes, starting from a zero seed. */
template <typename... TT> struct hash<std::tuple<TT...>> {
  std::size_t operator()(std::tuple<TT...> const& tt) const
  {
    std::size_t seed = 0;
    HashValueImpl<std::tuple<TT...>>::apply(seed, tt);
    return seed;
  }
};
} // namespace hash_tuple
63 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
65 typedef std::tuple</*sender*/ int, /* reciever */ int, /* tag */int> req_key_t;
66 typedef std::unordered_map<req_key_t, MPI_Request, hash_tuple::hash<std::tuple<int,int,int>>> req_storage_t;
68 static MPI_Datatype MPI_DEFAULT_TYPE;
/* Check that a trace line carries at least the mandatory arguments of its action.
 *
 * Every line starts with the process id and the action name, hence the "+ 2". On failure an arg_error is thrown that
 * quotes the whole line. The expansion is a plain compound statement, so call sites use the macro without a trailing
 * semicolon. Arguments are parenthesized because callers pass expressions such as `comm_size + 1`. */
#define CHECK_ACTION_PARAMS(action, mandatory, optional) \
  { \
    if ((action).size() < static_cast<unsigned long>((mandatory) + 2)) { \
      const std::string check_action_full_line_ = boost::algorithm::join((action), " "); \
      THROWF(arg_error, 0, \
             "%s replay failed.\n" \
             "%zu items were given on the line. First two should be process_id and action. " \
             "This action needs after them %lu mandatory arguments, and accepts %lu optional ones. \n" \
             "The full line that was given is:\n %s\n" \
             "Please contact the Simgrid team if support is needed", \
             __func__, (action).size(), static_cast<unsigned long>(mandatory), \
             static_cast<unsigned long>(optional), check_action_full_line_.c_str()); \
    } \
  }
87 static void log_timed_action(simgrid::xbt::ReplayAction& action, double clock)
89 if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
90 std::string s = boost::algorithm::join(action, " ");
91 XBT_VERB("%s %f", s.c_str(), smpi_process()->simulated_elapsed() - clock);
96 static double parse_double(std::string string)
98 return xbt_str_parse_double(string.c_str(), "%s is not a double");
106 class RequestStorage {
117 req_storage_t& get_store()
122 void get_requests(std::vector<MPI_Request>& vec)
124 for (auto& pair : store) {
125 auto& req = pair.second;
126 auto my_proc_id = simgrid::s4u::this_actor::get_pid();
127 if (req != MPI_REQUEST_NULL && (req->src() == my_proc_id || req->dst() == my_proc_id)) {
128 vec.push_back(pair.second);
129 pair.second->print_request("MM");
134 MPI_Request find(int src, int dst, int tag)
136 req_storage_t::iterator it = store.find(req_key_t(src, dst, tag));
137 return (it == store.end()) ? MPI_REQUEST_NULL : it->second;
140 void remove(MPI_Request req)
142 if (req == MPI_REQUEST_NULL) return;
144 store.erase(req_key_t(req->src()-1, req->dst()-1, req->tag()));
147 void add(MPI_Request req)
149 if (req != MPI_REQUEST_NULL) // Can and does happen in the case of TestAction
150 store.insert({req_key_t(req->src()-1, req->dst()-1, req->tag()), req});
153 /* Sometimes we need to re-insert MPI_REQUEST_NULL but we still need src,dst and tag */
154 void addNullRequest(int src, int dst, int tag)
156 store.insert({req_key_t(src, dst, tag), MPI_REQUEST_NULL});
160 class ActionArgParser {
162 virtual ~ActionArgParser() = default;
163 virtual void parse(simgrid::xbt::ReplayAction& action, std::string name) { CHECK_ACTION_PARAMS(action, 0, 0) }
166 class WaitTestParser : public ActionArgParser {
172 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
174 CHECK_ACTION_PARAMS(action, 3, 0)
175 src = std::stoi(action[2]);
176 dst = std::stoi(action[3]);
177 tag = std::stoi(action[4]);
181 class SendRecvParser : public ActionArgParser {
183 /* communication partner; if we send, this is the receiver and vice versa */
187 MPI_Datatype datatype1 = MPI_DEFAULT_TYPE;
189 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
191 CHECK_ACTION_PARAMS(action, 3, 1)
192 partner = std::stoi(action[2]);
193 tag = std::stoi(action[3]);
194 size = parse_double(action[4]);
195 if (action.size() > 5)
196 datatype1 = simgrid::smpi::Datatype::decode(action[5]);
200 class ComputeParser : public ActionArgParser {
202 /* communication partner; if we send, this is the receiver and vice versa */
205 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
207 CHECK_ACTION_PARAMS(action, 1, 0)
208 flops = parse_double(action[2]);
212 class CollCommParser : public ActionArgParser {
220 MPI_Datatype datatype1 = MPI_DEFAULT_TYPE;
221 MPI_Datatype datatype2 = MPI_DEFAULT_TYPE;
224 class BcastArgParser : public CollCommParser {
226 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
228 CHECK_ACTION_PARAMS(action, 1, 2)
229 size = parse_double(action[2]);
230 root = (action.size() > 3) ? std::stoi(action[3]) : 0;
231 if (action.size() > 4)
232 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
236 class ReduceArgParser : public CollCommParser {
238 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
240 CHECK_ACTION_PARAMS(action, 2, 2)
241 comm_size = parse_double(action[2]);
242 comp_size = parse_double(action[3]);
243 root = (action.size() > 4) ? std::stoi(action[4]) : 0;
244 if (action.size() > 5)
245 datatype1 = simgrid::smpi::Datatype::decode(action[5]);
249 class AllReduceArgParser : public CollCommParser {
251 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
253 CHECK_ACTION_PARAMS(action, 2, 1)
254 comm_size = parse_double(action[2]);
255 comp_size = parse_double(action[3]);
256 if (action.size() > 4)
257 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
261 class AllToAllArgParser : public CollCommParser {
263 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
265 CHECK_ACTION_PARAMS(action, 2, 1)
266 comm_size = MPI_COMM_WORLD->size();
267 send_size = parse_double(action[2]);
268 recv_size = parse_double(action[3]);
270 if (action.size() > 4)
271 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
272 if (action.size() > 5)
273 datatype2 = simgrid::smpi::Datatype::decode(action[5]);
277 class GatherArgParser : public CollCommParser {
279 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
281 /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
284 1) 68 is the sendcounts
285 2) 68 is the recvcounts
286 3) 0 is the root node
287 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
288 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
290 CHECK_ACTION_PARAMS(action, 2, 3)
291 comm_size = MPI_COMM_WORLD->size();
292 send_size = parse_double(action[2]);
293 recv_size = parse_double(action[3]);
295 if (name == "gather") {
296 root = (action.size() > 4) ? std::stoi(action[4]) : 0;
297 if (action.size() > 5)
298 datatype1 = simgrid::smpi::Datatype::decode(action[5]);
299 if (action.size() > 6)
300 datatype2 = simgrid::smpi::Datatype::decode(action[6]);
303 if (action.size() > 4)
304 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
305 if (action.size() > 5)
306 datatype2 = simgrid::smpi::Datatype::decode(action[5]);
311 class GatherVArgParser : public CollCommParser {
314 std::shared_ptr<std::vector<int>> recvcounts;
315 std::vector<int> disps;
316 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
318 /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
319 0 gather 68 68 10 10 10 0 0 0
321 1) 68 is the sendcount
322 2) 68 10 10 10 is the recvcounts
323 3) 0 is the root node
324 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
325 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
327 comm_size = MPI_COMM_WORLD->size();
328 CHECK_ACTION_PARAMS(action, comm_size+1, 2)
329 send_size = parse_double(action[2]);
330 disps = std::vector<int>(comm_size, 0);
331 recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
333 if (name == "gatherV") {
334 root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
335 if (action.size() > 4 + comm_size)
336 datatype1 = simgrid::smpi::Datatype::decode(action[4 + comm_size]);
337 if (action.size() > 5 + comm_size)
338 datatype2 = simgrid::smpi::Datatype::decode(action[5 + comm_size]);
341 int datatype_index = 0;
343 /* The 3 comes from "0 gather <sendcount>", which must always be present.
344 * The + comm_size is the recvcounts array, which must also be present
346 if (action.size() > 3 + comm_size + comm_size) { /* datatype + disp are specified */
347 datatype_index = 3 + comm_size;
348 disp_index = datatype_index + 1;
349 datatype1 = simgrid::smpi::Datatype::decode(action[datatype_index]);
350 datatype2 = simgrid::smpi::Datatype::decode(action[datatype_index]);
351 } else if (action.size() > 3 + comm_size + 2) { /* disps specified; datatype is not specified; use the default one */
352 disp_index = 3 + comm_size;
353 } else if (action.size() > 3 + comm_size) { /* only datatype, no disp specified */
354 datatype_index = 3 + comm_size;
355 datatype1 = simgrid::smpi::Datatype::decode(action[datatype_index]);
356 datatype2 = simgrid::smpi::Datatype::decode(action[datatype_index]);
359 if (disp_index != 0) {
360 for (unsigned int i = 0; i < comm_size; i++)
361 disps[i] = std::stoi(action[disp_index + i]);
365 for (unsigned int i = 0; i < comm_size; i++) {
366 (*recvcounts)[i] = std::stoi(action[i + 3]);
368 recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
372 class ScatterArgParser : public CollCommParser {
374 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
376 /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
379 1) 68 is the sendcounts
380 2) 68 is the recvcounts
381 3) 0 is the root node
382 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
383 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
385 CHECK_ACTION_PARAMS(action, 2, 3)
386 comm_size = MPI_COMM_WORLD->size();
387 send_size = parse_double(action[2]);
388 recv_size = parse_double(action[3]);
389 root = (action.size() > 4) ? std::stoi(action[4]) : 0;
390 if (action.size() > 5)
391 datatype1 = simgrid::smpi::Datatype::decode(action[5]);
392 if (action.size() > 6)
393 datatype2 = simgrid::smpi::Datatype::decode(action[6]);
397 class ScatterVArgParser : public CollCommParser {
401 std::shared_ptr<std::vector<int>> sendcounts;
402 std::vector<int> disps;
403 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
405 /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
406 0 gather 68 10 10 10 68 0 0 0
408 1) 68 10 10 10 is the sendcounts
409 2) 68 is the recvcount
410 3) 0 is the root node
411 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
412 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
414 CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
415 recv_size = parse_double(action[2 + comm_size]);
416 disps = std::vector<int>(comm_size, 0);
417 sendcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
419 if (action.size() > 5 + comm_size)
420 datatype1 = simgrid::smpi::Datatype::decode(action[4 + comm_size]);
421 if (action.size() > 5 + comm_size)
422 datatype2 = simgrid::smpi::Datatype::decode(action[5]);
424 for (unsigned int i = 0; i < comm_size; i++) {
425 (*sendcounts)[i] = std::stoi(action[i + 2]);
427 send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
428 root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
432 class ReduceScatterArgParser : public CollCommParser {
435 std::shared_ptr<std::vector<int>> recvcounts;
436 std::vector<int> disps;
437 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
439 /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
440 0 reduceScatter 275427 275427 275427 204020 11346849 0
442 1) The first four values after the name of the action declare the recvcounts array
443 2) The value 11346849 is the amount of instructions
444 3) The last value corresponds to the datatype, see simgrid::smpi::Datatype::decode().
446 comm_size = MPI_COMM_WORLD->size();
447 CHECK_ACTION_PARAMS(action, comm_size+1, 1)
448 comp_size = parse_double(action[2+comm_size]);
449 recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
450 if (action.size() > 3 + comm_size)
451 datatype1 = simgrid::smpi::Datatype::decode(action[3 + comm_size]);
453 for (unsigned int i = 0; i < comm_size; i++) {
454 recvcounts->push_back(std::stoi(action[i + 2]));
456 recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
460 class AllToAllVArgParser : public CollCommParser {
464 std::shared_ptr<std::vector<int>> recvcounts;
465 std::shared_ptr<std::vector<int>> sendcounts;
466 std::vector<int> senddisps;
467 std::vector<int> recvdisps;
470 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
472 /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
473 0 allToAllV 100 1 7 10 12 100 1 70 10 5
475 1) 100 is the size of the send buffer *sizeof(int),
476 2) 1 7 10 12 is the sendcounts array
477 3) 100*sizeof(int) is the size of the receiver buffer
478 4) 1 70 10 5 is the recvcounts array
480 comm_size = MPI_COMM_WORLD->size();
481 CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
482 sendcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
483 recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
484 senddisps = std::vector<int>(comm_size, 0);
485 recvdisps = std::vector<int>(comm_size, 0);
487 if (action.size() > 5 + 2 * comm_size)
488 datatype1 = simgrid::smpi::Datatype::decode(action[4 + 2 * comm_size]);
489 if (action.size() > 5 + 2 * comm_size)
490 datatype2 = simgrid::smpi::Datatype::decode(action[5 + 2 * comm_size]);
492 send_buf_size=parse_double(action[2]);
493 recv_buf_size=parse_double(action[3+comm_size]);
494 for (unsigned int i = 0; i < comm_size; i++) {
495 (*sendcounts)[i] = std::stoi(action[3 + i]);
496 (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
498 send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
499 recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
503 template <class T> class ReplayAction {
505 const std::string name;
506 RequestStorage* req_storage; // Points to the right storage for this process, nullptr except for Send/Recv/Wait/Test actions.
507 const int my_proc_id;
511 explicit ReplayAction(std::string name, RequestStorage& storage) : name(name), req_storage(&storage), my_proc_id(simgrid::s4u::this_actor::get_pid()) {}
512 explicit ReplayAction(std::string name) : name(name), req_storage(nullptr), my_proc_id(simgrid::s4u::this_actor::get_pid()) {}
513 virtual ~ReplayAction() = default;
515 virtual void execute(simgrid::xbt::ReplayAction& action)
517 // Needs to be re-initialized for every action, hence here
518 double start_time = smpi_process()->simulated_elapsed();
519 args.parse(action, name);
522 log_timed_action(action, start_time);
525 virtual void kernel(simgrid::xbt::ReplayAction& action) = 0;
527 void* send_buffer(int size)
529 return smpi_get_tmp_sendbuffer(size);
532 void* recv_buffer(int size)
534 return smpi_get_tmp_recvbuffer(size);
538 class WaitAction : public ReplayAction<WaitTestParser> {
540 explicit WaitAction(RequestStorage& storage) : ReplayAction("Wait", storage) {}
541 void kernel(simgrid::xbt::ReplayAction& action) override
543 std::string s = boost::algorithm::join(action, " ");
544 xbt_assert(req_storage->size(), "action wait not preceded by any irecv or isend: %s", s.c_str());
545 MPI_Request request = req_storage->find(args.src, args.dst, args.tag);
546 req_storage->remove(request);
548 if (request == MPI_REQUEST_NULL) {
549 /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just
554 int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
556 // Must be taken before Request::wait() since the request may be set to
557 // MPI_REQUEST_NULL by Request::wait!
558 bool is_wait_for_receive = (request->flags() & RECV);
559 // TODO: Here we take the rank while we normally take the process id (look for my_proc_id)
560 TRACE_smpi_comm_in(rank, __func__, new simgrid::instr::NoOpTIData("wait"));
563 Request::wait(&request, &status);
565 TRACE_smpi_comm_out(rank);
566 if (is_wait_for_receive)
567 TRACE_smpi_recv(args.src, args.dst, args.tag);
571 class SendAction : public ReplayAction<SendRecvParser> {
573 SendAction() = delete;
574 explicit SendAction(std::string name, RequestStorage& storage) : ReplayAction(name, storage) {}
575 void kernel(simgrid::xbt::ReplayAction& action) override
577 int dst_traced = MPI_COMM_WORLD->group()->actor(args.partner)->get_pid();
579 TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
580 args.tag, Datatype::encode(args.datatype1)));
581 if (not TRACE_smpi_view_internals())
582 TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, args.tag, args.size * args.datatype1->size());
584 if (name == "send") {
585 Request::send(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
586 } else if (name == "Isend") {
587 MPI_Request request = Request::isend(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
588 req_storage->add(request);
590 xbt_die("Don't know this action, %s", name.c_str());
593 TRACE_smpi_comm_out(my_proc_id);
597 class RecvAction : public ReplayAction<SendRecvParser> {
599 RecvAction() = delete;
600 explicit RecvAction(std::string name, RequestStorage& storage) : ReplayAction(name, storage) {}
601 void kernel(simgrid::xbt::ReplayAction& action) override
603 int src_traced = MPI_COMM_WORLD->group()->actor(args.partner)->get_pid();
605 TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
606 args.tag, Datatype::encode(args.datatype1)));
609 // unknown size from the receiver point of view
610 if (args.size <= 0.0) {
611 Request::probe(args.partner, args.tag, MPI_COMM_WORLD, &status);
612 args.size = status.count;
615 if (name == "recv") {
616 Request::recv(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD, &status);
617 } else if (name == "Irecv") {
618 MPI_Request request = Request::irecv(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
619 req_storage->add(request);
622 TRACE_smpi_comm_out(my_proc_id);
623 // TODO: Check why this was only activated in the "recv" case and not in the "Irecv" case
624 if (name == "recv" && not TRACE_smpi_view_internals()) {
625 TRACE_smpi_recv(src_traced, my_proc_id, args.tag);
630 class ComputeAction : public ReplayAction<ComputeParser> {
632 ComputeAction() : ReplayAction("compute") {}
633 void kernel(simgrid::xbt::ReplayAction& action) override
635 TRACE_smpi_computing_in(my_proc_id, args.flops);
636 smpi_execute_flops(args.flops);
637 TRACE_smpi_computing_out(my_proc_id);
641 class TestAction : public ReplayAction<WaitTestParser> {
643 explicit TestAction(RequestStorage& storage) : ReplayAction("Test", storage) {}
644 void kernel(simgrid::xbt::ReplayAction& action) override
646 MPI_Request request = req_storage->find(args.src, args.dst, args.tag);
647 req_storage->remove(request);
648 // if request is null here, this may mean that a previous test has succeeded
649 // Different times in traced application and replayed version may lead to this
650 // In this case, ignore the extra calls.
651 if (request != MPI_REQUEST_NULL) {
652 TRACE_smpi_testing_in(my_proc_id);
655 int flag = Request::test(&request, &status);
657 XBT_DEBUG("MPI_Test result: %d", flag);
658 /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now
660 if (request == MPI_REQUEST_NULL)
661 req_storage->addNullRequest(args.src, args.dst, args.tag);
663 req_storage->add(request);
665 TRACE_smpi_testing_out(my_proc_id);
670 class InitAction : public ReplayAction<ActionArgParser> {
672 InitAction() : ReplayAction("Init") {}
673 void kernel(simgrid::xbt::ReplayAction& action) override
675 CHECK_ACTION_PARAMS(action, 0, 1)
676 MPI_DEFAULT_TYPE = (action.size() > 2) ? MPI_DOUBLE // default MPE datatype
677 : MPI_BYTE; // default TAU datatype
679 /* start a simulated timer */
680 smpi_process()->simulated_start();
684 class CommunicatorAction : public ReplayAction<ActionArgParser> {
686 CommunicatorAction() : ReplayAction("Comm") {}
687 void kernel(simgrid::xbt::ReplayAction& action) override { /* nothing to do */}
690 class WaitAllAction : public ReplayAction<ActionArgParser> {
692 explicit WaitAllAction(RequestStorage& storage) : ReplayAction("waitAll", storage) {}
693 void kernel(simgrid::xbt::ReplayAction& action) override
695 const unsigned int count_requests = req_storage->size();
697 if (count_requests > 0) {
698 TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData("waitAll", -1, count_requests, ""));
699 std::vector<std::pair</*sender*/int,/*recv*/int>> sender_receiver;
700 std::vector<MPI_Request> reqs;
701 req_storage->get_requests(reqs);
702 for (const auto& req : reqs) {
703 if (req && (req->flags() & RECV)) {
704 sender_receiver.push_back({req->src(), req->dst()});
707 MPI_Status status[count_requests];
708 Request::waitall(count_requests, &(reqs.data())[0], status);
709 req_storage->get_store().clear();
711 for (auto& pair : sender_receiver) {
712 TRACE_smpi_recv(pair.first, pair.second, 0);
714 TRACE_smpi_comm_out(my_proc_id);
719 class BarrierAction : public ReplayAction<ActionArgParser> {
721 BarrierAction() : ReplayAction("barrier") {}
722 void kernel(simgrid::xbt::ReplayAction& action) override
724 TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::NoOpTIData("barrier"));
725 Colls::barrier(MPI_COMM_WORLD);
726 TRACE_smpi_comm_out(my_proc_id);
730 class BcastAction : public ReplayAction<BcastArgParser> {
732 BcastAction() : ReplayAction("bcast") {}
733 void kernel(simgrid::xbt::ReplayAction& action) override
735 TRACE_smpi_comm_in(my_proc_id, "action_bcast",
736 new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(args.root)->get_pid(),
737 -1.0, args.size, -1, Datatype::encode(args.datatype1), ""));
739 Colls::bcast(send_buffer(args.size * args.datatype1->size()), args.size, args.datatype1, args.root, MPI_COMM_WORLD);
741 TRACE_smpi_comm_out(my_proc_id);
745 class ReduceAction : public ReplayAction<ReduceArgParser> {
747 ReduceAction() : ReplayAction("reduce") {}
748 void kernel(simgrid::xbt::ReplayAction& action) override
750 TRACE_smpi_comm_in(my_proc_id, "action_reduce",
751 new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(args.root)->get_pid(),
752 args.comp_size, args.comm_size, -1,
753 Datatype::encode(args.datatype1), ""));
755 Colls::reduce(send_buffer(args.comm_size * args.datatype1->size()),
756 recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL, args.root, MPI_COMM_WORLD);
757 smpi_execute_flops(args.comp_size);
759 TRACE_smpi_comm_out(my_proc_id);
763 class AllReduceAction : public ReplayAction<AllReduceArgParser> {
765 AllReduceAction() : ReplayAction("allReduce") {}
766 void kernel(simgrid::xbt::ReplayAction& action) override
768 TRACE_smpi_comm_in(my_proc_id, "action_allReduce", new simgrid::instr::CollTIData("allReduce", -1, args.comp_size, args.comm_size, -1,
769 Datatype::encode(args.datatype1), ""));
771 Colls::allreduce(send_buffer(args.comm_size * args.datatype1->size()),
772 recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
773 smpi_execute_flops(args.comp_size);
775 TRACE_smpi_comm_out(my_proc_id);
779 class AllToAllAction : public ReplayAction<AllToAllArgParser> {
781 AllToAllAction() : ReplayAction("allToAll") {}
782 void kernel(simgrid::xbt::ReplayAction& action) override
784 TRACE_smpi_comm_in(my_proc_id, "action_allToAll",
785 new simgrid::instr::CollTIData("allToAll", -1, -1.0, args.send_size, args.recv_size,
786 Datatype::encode(args.datatype1),
787 Datatype::encode(args.datatype2)));
789 Colls::alltoall(send_buffer(args.send_size * args.comm_size * args.datatype1->size()), args.send_size,
790 args.datatype1, recv_buffer(args.recv_size * args.comm_size * args.datatype2->size()),
791 args.recv_size, args.datatype2, MPI_COMM_WORLD);
793 TRACE_smpi_comm_out(my_proc_id);
797 class GatherAction : public ReplayAction<GatherArgParser> {
799 explicit GatherAction(std::string name) : ReplayAction(name) {}
800 void kernel(simgrid::xbt::ReplayAction& action) override
802 TRACE_smpi_comm_in(my_proc_id, name.c_str(), new simgrid::instr::CollTIData(name, (name == "gather") ? args.root : -1, -1.0, args.send_size, args.recv_size,
803 Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
805 if (name == "gather") {
806 int rank = MPI_COMM_WORLD->rank();
807 Colls::gather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
808 (rank == args.root) ? recv_buffer(args.recv_size * args.comm_size * args.datatype2->size()) : nullptr, args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
811 Colls::allgather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
812 recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, MPI_COMM_WORLD);
814 TRACE_smpi_comm_out(my_proc_id);
818 class GatherVAction : public ReplayAction<GatherVArgParser> {
820 explicit GatherVAction(std::string name) : ReplayAction(name) {}
821 void kernel(simgrid::xbt::ReplayAction& action) override
823 int rank = MPI_COMM_WORLD->rank();
825 TRACE_smpi_comm_in(my_proc_id, name.c_str(), new simgrid::instr::VarCollTIData(
826 name, (name == "gatherV") ? args.root : -1, args.send_size, nullptr, -1, args.recvcounts,
827 Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
829 if (name == "gatherV") {
830 Colls::gatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
831 (rank == args.root) ? recv_buffer(args.recv_size_sum * args.datatype2->size()) : nullptr,
832 args.recvcounts->data(), args.disps.data(), args.datatype2, args.root, MPI_COMM_WORLD);
835 Colls::allgatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
836 recv_buffer(args.recv_size_sum * args.datatype2->size()), args.recvcounts->data(),
837 args.disps.data(), args.datatype2, MPI_COMM_WORLD);
840 TRACE_smpi_comm_out(my_proc_id);
844 class ScatterAction : public ReplayAction<ScatterArgParser> {
846 ScatterAction() : ReplayAction("scatter") {}
847 void kernel(simgrid::xbt::ReplayAction& action) override
849 int rank = MPI_COMM_WORLD->rank();
850 TRACE_smpi_comm_in(my_proc_id, "action_scatter", new simgrid::instr::CollTIData(name, args.root, -1.0, args.send_size, args.recv_size,
851 Datatype::encode(args.datatype1),
852 Datatype::encode(args.datatype2)));
854 Colls::scatter(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
855 (rank == args.root) ? recv_buffer(args.recv_size * args.datatype2->size()) : nullptr, args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
857 TRACE_smpi_comm_out(my_proc_id);
862 class ScatterVAction : public ReplayAction<ScatterVArgParser> {
864 ScatterVAction() : ReplayAction("scatterV") {}
865 void kernel(simgrid::xbt::ReplayAction& action) override
867 int rank = MPI_COMM_WORLD->rank();
868 TRACE_smpi_comm_in(my_proc_id, "action_scatterv", new simgrid::instr::VarCollTIData(name, args.root, -1, args.sendcounts, args.recv_size,
869 nullptr, Datatype::encode(args.datatype1),
870 Datatype::encode(args.datatype2)));
872 Colls::scatterv((rank == args.root) ? send_buffer(args.send_size_sum * args.datatype1->size()) : nullptr,
873 args.sendcounts->data(), args.disps.data(), args.datatype1,
874 recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, args.root,
877 TRACE_smpi_comm_out(my_proc_id);
881 class ReduceScatterAction : public ReplayAction<ReduceScatterArgParser> {
883 ReduceScatterAction() : ReplayAction("reduceScatter") {}
884 void kernel(simgrid::xbt::ReplayAction& action) override
886 TRACE_smpi_comm_in(my_proc_id, "action_reducescatter",
887 new simgrid::instr::VarCollTIData("reduceScatter", -1, 0, nullptr, -1, args.recvcounts,
888 std::to_string(args.comp_size), /* ugly hack to print comp_size */
889 Datatype::encode(args.datatype1)));
891 Colls::reduce_scatter(send_buffer(args.recv_size_sum * args.datatype1->size()),
892 recv_buffer(args.recv_size_sum * args.datatype1->size()), args.recvcounts->data(),
893 args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
895 smpi_execute_flops(args.comp_size);
896 TRACE_smpi_comm_out(my_proc_id);
900 class AllToAllVAction : public ReplayAction<AllToAllVArgParser> {
902 AllToAllVAction() : ReplayAction("allToAllV") {}
903 void kernel(simgrid::xbt::ReplayAction& action) override
905 TRACE_smpi_comm_in(my_proc_id, __func__,
906 new simgrid::instr::VarCollTIData(
907 "allToAllV", -1, args.send_size_sum, args.sendcounts, args.recv_size_sum, args.recvcounts,
908 Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
910 Colls::alltoallv(send_buffer(args.send_buf_size * args.datatype1->size()), args.sendcounts->data(), args.senddisps.data(), args.datatype1,
911 recv_buffer(args.recv_buf_size * args.datatype2->size()), args.recvcounts->data(), args.recvdisps.data(), args.datatype2, MPI_COMM_WORLD);
913 TRACE_smpi_comm_out(my_proc_id);
916 } // Replay Namespace
917 }} // namespace simgrid::smpi
919 std::vector<simgrid::smpi::replay::RequestStorage> storage;
920 /** @brief Only initialize the replay, don't do it for real */
921 void smpi_replay_init(int* argc, char*** argv)
923 simgrid::smpi::Process::init(argc, argv);
924 smpi_process()->mark_as_initialized();
925 smpi_process()->set_replaying(true);
927 int my_proc_id = simgrid::s4u::this_actor::get_pid();
928 storage.resize(smpi_process_count());
930 TRACE_smpi_init(my_proc_id);
931 TRACE_smpi_computing_init(my_proc_id);
932 TRACE_smpi_comm_in(my_proc_id, "smpi_replay_run_init", new simgrid::instr::NoOpTIData("init"));
933 TRACE_smpi_comm_out(my_proc_id);
934 xbt_replay_action_register("init", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::InitAction().execute(action); });
935 xbt_replay_action_register("finalize", [](simgrid::xbt::ReplayAction& action) { /* nothing to do */ });
936 xbt_replay_action_register("comm_size", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
937 xbt_replay_action_register("comm_split",[](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
938 xbt_replay_action_register("comm_dup", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
940 xbt_replay_action_register("send", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("send", storage[simgrid::s4u::this_actor::get_pid()-1]).execute(action); });
941 xbt_replay_action_register("Isend", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("Isend", storage[simgrid::s4u::this_actor::get_pid()-1]).execute(action); });
942 xbt_replay_action_register("recv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("recv", storage[simgrid::s4u::this_actor::get_pid()-1]).execute(action); });
943 xbt_replay_action_register("Irecv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("Irecv", storage[simgrid::s4u::this_actor::get_pid()-1]).execute(action); });
944 xbt_replay_action_register("test", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::TestAction(storage[simgrid::s4u::this_actor::get_pid()-1]).execute(action); });
945 xbt_replay_action_register("wait", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAction(storage[simgrid::s4u::this_actor::get_pid()-1]).execute(action); });
946 xbt_replay_action_register("waitAll", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAllAction(storage[simgrid::s4u::this_actor::get_pid()-1]).execute(action); });
947 xbt_replay_action_register("barrier", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BarrierAction().execute(action); });
948 xbt_replay_action_register("bcast", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BcastAction().execute(action); });
949 xbt_replay_action_register("reduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceAction().execute(action); });
950 xbt_replay_action_register("allReduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllReduceAction().execute(action); });
951 xbt_replay_action_register("allToAll", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllAction().execute(action); });
952 xbt_replay_action_register("allToAllV", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllVAction().execute(action); });
953 xbt_replay_action_register("gather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("gather").execute(action); });
954 xbt_replay_action_register("scatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterAction().execute(action); });
955 xbt_replay_action_register("gatherV", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("gatherV").execute(action); });
956 xbt_replay_action_register("scatterV", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterVAction().execute(action); });
957 xbt_replay_action_register("allGather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("allGather").execute(action); });
958 xbt_replay_action_register("allGatherV", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("allGatherV").execute(action); });
959 xbt_replay_action_register("reduceScatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceScatterAction().execute(action); });
960 xbt_replay_action_register("compute", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ComputeAction().execute(action); });
962 //if we have a delayed start, sleep here.
964 double value = xbt_str_parse_double((*argv)[2], "%s is not a double");
965 XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
966 smpi_execute_flops(value);
968 //UGLY: force a context switch to be sure that all MSG_processes begin initialization
969 XBT_DEBUG("Force context switch by smpi_execute_flops - Sleeping for 0.0 flops ");
970 smpi_execute_flops(0.0);
974 /** @brief actually run the replay after initialization */
975 void smpi_replay_main(int* argc, char*** argv)
977 static int active_processes = 0;
979 simgrid::xbt::replay_runner(*argc, *argv);
981 /* and now, finalize everything */
982 /* One active process will stop. Decrease the counter*/
983 unsigned int count_requests = storage[simgrid::s4u::this_actor::get_pid() - 1].size();
984 XBT_DEBUG("There are %ud elements in reqq[*]", count_requests);
985 if (count_requests > 0) {
986 MPI_Request requests[count_requests];
987 MPI_Status status[count_requests];
990 for (auto const& pair : storage[simgrid::s4u::this_actor::get_pid() - 1].get_store()) {
991 requests[i] = pair.second;
994 simgrid::smpi::Request::waitall(count_requests, requests, status);
998 if(active_processes==0){
999 /* Last process alive speaking: end the simulated timer */
1000 XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
1001 smpi_free_replay_tmp_buffers();
1004 TRACE_smpi_comm_in(simgrid::s4u::this_actor::get_pid(), "smpi_replay_run_finalize",
1005 new simgrid::instr::NoOpTIData("finalize"));
1007 smpi_process()->finalize();
1009 TRACE_smpi_comm_out(simgrid::s4u::this_actor::get_pid());
1010 TRACE_smpi_finalize(simgrid::s4u::this_actor::get_pid());
1013 /** @brief chain a replay initialization and a replay start */
1014 void smpi_replay_run(int* argc, char*** argv)
1016 smpi_replay_init(argc, argv);
1017 smpi_replay_main(argc, argv);