1 /* Copyright (c) 2009-2018. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
7 #include "smpi_coll.hpp"
8 #include "smpi_comm.hpp"
9 #include "smpi_datatype.hpp"
10 #include "smpi_group.hpp"
11 #include "smpi_process.hpp"
12 #include "smpi_request.hpp"
13 #include "xbt/replay.hpp"
15 #include <boost/algorithm/string/join.hpp>
18 #include <unordered_map>
22 using simgrid::s4u::Actor;
// From https://stackoverflow.com/questions/7110301/generic-hash-for-tuples-in-unordered-map-unordered-set
// std::hash has no specialization for std::tuple; this namespace provides a hasher that can be given as the Hash
// template argument of std::unordered_map / std::unordered_set. If it is needed elsewhere, it could move to a header.
namespace hash_tuple {

/** @brief Fallback hasher: any non-tuple type is hashed with std::hash. */
template <typename TT> struct hash {
  std::size_t operator()(TT const& tt) const { return std::hash<TT>()(tt); }
};

/** @brief Mixes the hash of @p v into @p seed (the classic boost::hash_combine recipe). */
template <class T> inline void hash_combine(std::size_t& seed, T const& v)
{
  const std::size_t mixed = hash_tuple::hash<T>()(v) + 0x9e3779b9 + (seed << 6) + (seed >> 2);
  seed ^= mixed;
}

// Recursive template code derived from Matthieu M.
/** @brief Folds tuple elements 0..Index into @p seed, lowest index first. */
template <class Tuple, std::size_t Index = std::tuple_size<Tuple>::value - 1> struct HashValueImpl {
  static void apply(std::size_t& seed, Tuple const& tuple)
  {
    HashValueImpl<Tuple, Index - 1>::apply(seed, tuple); // earlier elements are combined first
    hash_combine(seed, std::get<Index>(tuple));
  }
};

/** @brief Recursion terminator: only element 0 is left. */
template <class Tuple> struct HashValueImpl<Tuple, 0> {
  static void apply(std::size_t& seed, Tuple const& tuple) { hash_combine(seed, std::get<0>(tuple)); }
};

/** @brief Tuple hasher: combines the hashes of all elements, starting from a zero seed. */
template <typename... TT> struct hash<std::tuple<TT...>> {
  std::size_t operator()(std::tuple<TT...> const& tt) const
  {
    std::size_t seed = 0;
    HashValueImpl<std::tuple<TT...>>::apply(seed, tt);
    return seed;
  }
};

} // namespace hash_tuple
78 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
80 static std::unordered_map<int, std::vector<MPI_Request>*> reqq;
82 static MPI_Datatype MPI_DEFAULT_TYPE;
/** @brief Rejects a trace line that carries too few arguments.
 *
 * A trace line starts with "<process_id> <action_name>" and is followed by the action's arguments. If fewer than
 * @p mandatory arguments follow those two tokens, an arg_error is thrown (THROWF) whose message echoes the whole line.
 * @p optional only appears in that message. The expansion is a braced block, so call sites write no trailing ';'.
 */
#define CHECK_ACTION_PARAMS(action, mandatory, optional)                                                               \
  {                                                                                                                    \
    const unsigned long expected_tokens = static_cast<unsigned long>(mandatory + 2);                                   \
    if (action.size() < expected_tokens) {                                                                             \
      std::stringstream line_dump;                                                                                     \
      for (const auto& token : action)                                                                                 \
        line_dump << token << " ";                                                                                     \
      THROWF(arg_error, 0,                                                                                             \
             "%s replay failed.\n"                                                                                     \
             "%zu items were given on the line. First two should be process_id and action. "                           \
             "This action needs after them %lu mandatory arguments, and accepts %lu optional ones. \n"                 \
             "The full line that was given is:\n %s\n"                                                                 \
             "Please contact the Simgrid team if support is needed",                                                   \
             __func__, action.size(), static_cast<unsigned long>(mandatory), static_cast<unsigned long>(optional),     \
             line_dump.str().c_str());                                                                                 \
    }                                                                                                                  \
  }
101 static void log_timed_action(simgrid::xbt::ReplayAction& action, double clock)
103 if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
104 std::string s = boost::algorithm::join(action, " ");
105 XBT_VERB("%s %f", s.c_str(), smpi_process()->simulated_elapsed() - clock);
109 static std::vector<MPI_Request>* get_reqq_self()
111 return reqq.at(simgrid::s4u::this_actor::get_pid());
114 static void set_reqq_self(std::vector<MPI_Request> *mpi_request)
116 reqq.insert({simgrid::s4u::this_actor::get_pid(), mpi_request});
119 /* Helper function */
120 static double parse_double(std::string string)
122 return xbt_str_parse_double(string.c_str(), "%s is not a double");
129 class ActionArgParser {
131 virtual ~ActionArgParser() = default;
132 virtual void parse(simgrid::xbt::ReplayAction& action, std::string name) { CHECK_ACTION_PARAMS(action, 0, 0) }
135 class SendRecvParser : public ActionArgParser {
137 /* communication partner; if we send, this is the receiver and vice versa */
141 MPI_Datatype datatype1 = MPI_DEFAULT_TYPE;
143 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
145 CHECK_ACTION_PARAMS(action, 3, 1)
146 partner = std::stoi(action[2]);
147 tag = std::stoi(action[3]);
148 size = parse_double(action[4]);
149 if (action.size() > 5)
150 datatype1 = simgrid::smpi::Datatype::decode(action[5]);
154 class ComputeParser : public ActionArgParser {
156 /* communication partner; if we send, this is the receiver and vice versa */
159 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
161 CHECK_ACTION_PARAMS(action, 1, 0)
162 flops = parse_double(action[2]);
166 class CollCommParser : public ActionArgParser {
174 MPI_Datatype datatype1 = MPI_DEFAULT_TYPE;
175 MPI_Datatype datatype2 = MPI_DEFAULT_TYPE;
178 class BcastArgParser : public CollCommParser {
180 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
182 CHECK_ACTION_PARAMS(action, 1, 2)
183 size = parse_double(action[2]);
184 root = (action.size() > 3) ? std::stoi(action[3]) : 0;
185 if (action.size() > 4)
186 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
190 class ReduceArgParser : public CollCommParser {
192 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
194 CHECK_ACTION_PARAMS(action, 2, 2)
195 comm_size = parse_double(action[2]);
196 comp_size = parse_double(action[3]);
197 root = (action.size() > 4) ? std::stoi(action[4]) : 0;
198 if (action.size() > 5)
199 datatype1 = simgrid::smpi::Datatype::decode(action[5]);
203 class AllReduceArgParser : public CollCommParser {
205 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
207 CHECK_ACTION_PARAMS(action, 2, 1)
208 comm_size = parse_double(action[2]);
209 comp_size = parse_double(action[3]);
210 if (action.size() > 4)
211 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
215 class AllToAllArgParser : public CollCommParser {
217 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
219 CHECK_ACTION_PARAMS(action, 2, 1)
220 comm_size = MPI_COMM_WORLD->size();
221 send_size = parse_double(action[2]);
222 recv_size = parse_double(action[3]);
224 if (action.size() > 4)
225 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
226 if (action.size() > 5)
227 datatype2 = simgrid::smpi::Datatype::decode(action[5]);
231 class GatherArgParser : public CollCommParser {
233 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
235 /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
238 1) 68 is the sendcounts
239 2) 68 is the recvcounts
240 3) 0 is the root node
241 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
242 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
244 CHECK_ACTION_PARAMS(action, 2, 3)
245 comm_size = MPI_COMM_WORLD->size();
246 send_size = parse_double(action[2]);
247 recv_size = parse_double(action[3]);
249 if (name == "gather") {
250 root = (action.size() > 4) ? std::stoi(action[4]) : 0;
251 if (action.size() > 5)
252 datatype1 = simgrid::smpi::Datatype::decode(action[5]);
253 if (action.size() > 6)
254 datatype2 = simgrid::smpi::Datatype::decode(action[6]);
257 if (action.size() > 4)
258 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
259 if (action.size() > 5)
260 datatype2 = simgrid::smpi::Datatype::decode(action[5]);
265 class GatherVArgParser : public CollCommParser {
268 std::shared_ptr<std::vector<int>> recvcounts;
269 std::vector<int> disps;
270 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
272 /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
273 0 gather 68 68 10 10 10 0 0 0
275 1) 68 is the sendcount
276 2) 68 10 10 10 is the recvcounts
277 3) 0 is the root node
278 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
279 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
281 comm_size = MPI_COMM_WORLD->size();
282 CHECK_ACTION_PARAMS(action, comm_size+1, 2)
283 send_size = parse_double(action[2]);
284 disps = std::vector<int>(comm_size, 0);
285 recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
287 if (name == "gatherV") {
288 root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
289 if (action.size() > 4 + comm_size)
290 datatype1 = simgrid::smpi::Datatype::decode(action[4 + comm_size]);
291 if (action.size() > 5 + comm_size)
292 datatype2 = simgrid::smpi::Datatype::decode(action[5 + comm_size]);
295 int datatype_index = 0;
297 /* The 3 comes from "0 gather <sendcount>", which must always be present.
298 * The + comm_size is the recvcounts array, which must also be present
300 if (action.size() > 3 + comm_size + comm_size) { /* datatype + disp are specified */
301 datatype_index = 3 + comm_size;
302 disp_index = datatype_index + 1;
303 datatype1 = simgrid::smpi::Datatype::decode(action[datatype_index]);
304 datatype2 = simgrid::smpi::Datatype::decode(action[datatype_index]);
305 } else if (action.size() > 3 + comm_size + 2) { /* disps specified; datatype is not specified; use the default one */
306 disp_index = 3 + comm_size;
307 } else if (action.size() > 3 + comm_size) { /* only datatype, no disp specified */
308 datatype_index = 3 + comm_size;
309 datatype1 = simgrid::smpi::Datatype::decode(action[datatype_index]);
310 datatype2 = simgrid::smpi::Datatype::decode(action[datatype_index]);
313 if (disp_index != 0) {
314 for (unsigned int i = 0; i < comm_size; i++)
315 disps[i] = std::stoi(action[disp_index + i]);
319 for (unsigned int i = 0; i < comm_size; i++) {
320 (*recvcounts)[i] = std::stoi(action[i + 3]);
322 recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
326 class ScatterArgParser : public CollCommParser {
328 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
330 /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
333 1) 68 is the sendcounts
334 2) 68 is the recvcounts
335 3) 0 is the root node
336 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
337 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
339 CHECK_ACTION_PARAMS(action, 2, 3)
340 comm_size = MPI_COMM_WORLD->size();
341 send_size = parse_double(action[2]);
342 recv_size = parse_double(action[3]);
343 root = (action.size() > 4) ? std::stoi(action[4]) : 0;
344 if (action.size() > 5)
345 datatype1 = simgrid::smpi::Datatype::decode(action[5]);
346 if (action.size() > 6)
347 datatype2 = simgrid::smpi::Datatype::decode(action[6]);
351 class ScatterVArgParser : public CollCommParser {
355 std::shared_ptr<std::vector<int>> sendcounts;
356 std::vector<int> disps;
357 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
359 /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
360 0 gather 68 10 10 10 68 0 0 0
362 1) 68 10 10 10 is the sendcounts
363 2) 68 is the recvcount
364 3) 0 is the root node
365 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
366 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
368 CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
369 recv_size = parse_double(action[2 + comm_size]);
370 disps = std::vector<int>(comm_size, 0);
371 sendcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
373 if (action.size() > 5 + comm_size)
374 datatype1 = simgrid::smpi::Datatype::decode(action[4 + comm_size]);
375 if (action.size() > 5 + comm_size)
376 datatype2 = simgrid::smpi::Datatype::decode(action[5]);
378 for (unsigned int i = 0; i < comm_size; i++) {
379 (*sendcounts)[i] = std::stoi(action[i + 2]);
381 send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
382 root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
386 class ReduceScatterArgParser : public CollCommParser {
389 std::shared_ptr<std::vector<int>> recvcounts;
390 std::vector<int> disps;
391 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
393 /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
394 0 reduceScatter 275427 275427 275427 204020 11346849 0
396 1) The first four values after the name of the action declare the recvcounts array
397 2) The value 11346849 is the amount of instructions
398 3) The last value corresponds to the datatype, see simgrid::smpi::Datatype::decode().
400 comm_size = MPI_COMM_WORLD->size();
401 CHECK_ACTION_PARAMS(action, comm_size+1, 1)
402 comp_size = parse_double(action[2+comm_size]);
403 recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
404 if (action.size() > 3 + comm_size)
405 datatype1 = simgrid::smpi::Datatype::decode(action[3 + comm_size]);
407 for (unsigned int i = 0; i < comm_size; i++) {
408 recvcounts->push_back(std::stoi(action[i + 2]));
410 recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
414 class AllToAllVArgParser : public CollCommParser {
418 std::shared_ptr<std::vector<int>> recvcounts;
419 std::shared_ptr<std::vector<int>> sendcounts;
420 std::vector<int> senddisps;
421 std::vector<int> recvdisps;
424 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
426 /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
427 0 allToAllV 100 1 7 10 12 100 1 70 10 5
429 1) 100 is the size of the send buffer *sizeof(int),
430 2) 1 7 10 12 is the sendcounts array
431 3) 100*sizeof(int) is the size of the receiver buffer
432 4) 1 70 10 5 is the recvcounts array
434 comm_size = MPI_COMM_WORLD->size();
435 CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
436 sendcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
437 recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
438 senddisps = std::vector<int>(comm_size, 0);
439 recvdisps = std::vector<int>(comm_size, 0);
441 if (action.size() > 5 + 2 * comm_size)
442 datatype1 = simgrid::smpi::Datatype::decode(action[4 + 2 * comm_size]);
443 if (action.size() > 5 + 2 * comm_size)
444 datatype2 = simgrid::smpi::Datatype::decode(action[5 + 2 * comm_size]);
446 send_buf_size=parse_double(action[2]);
447 recv_buf_size=parse_double(action[3+comm_size]);
448 for (unsigned int i = 0; i < comm_size; i++) {
449 (*sendcounts)[i] = std::stoi(action[3 + i]);
450 (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
452 send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
453 recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
457 template <class T> class ReplayAction {
459 const std::string name;
460 const int my_proc_id;
464 explicit ReplayAction(std::string name) : name(name), my_proc_id(simgrid::s4u::this_actor::get_pid()) {}
465 virtual ~ReplayAction() = default;
467 virtual void execute(simgrid::xbt::ReplayAction& action)
469 // Needs to be re-initialized for every action, hence here
470 double start_time = smpi_process()->simulated_elapsed();
471 args.parse(action, name);
474 log_timed_action(action, start_time);
477 virtual void kernel(simgrid::xbt::ReplayAction& action) = 0;
479 void* send_buffer(int size)
481 return smpi_get_tmp_sendbuffer(size);
484 void* recv_buffer(int size)
486 return smpi_get_tmp_recvbuffer(size);
490 class WaitAction : public ReplayAction<ActionArgParser> {
492 WaitAction() : ReplayAction("Wait") {}
493 void kernel(simgrid::xbt::ReplayAction& action) override
495 std::string s = boost::algorithm::join(action, " ");
496 xbt_assert(get_reqq_self()->size(), "action wait not preceded by any irecv or isend: %s", s.c_str());
497 MPI_Request request = get_reqq_self()->back();
498 get_reqq_self()->pop_back();
500 if (request == nullptr) {
501 /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just
506 int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
508 // Must be taken before Request::wait() since the request may be set to
509 // MPI_REQUEST_NULL by Request::wait!
510 int src = request->comm()->group()->rank(request->src());
511 int dst = request->comm()->group()->rank(request->dst());
512 int tag = request->tag();
513 bool is_wait_for_receive = (request->flags() & RECV);
514 // TODO: Here we take the rank while we normally take the process id (look for my_proc_id)
515 TRACE_smpi_comm_in(rank, __func__, new simgrid::instr::NoOpTIData("wait"));
518 Request::wait(&request, &status);
520 TRACE_smpi_comm_out(rank);
521 if (is_wait_for_receive)
522 TRACE_smpi_recv(src, dst, tag);
526 class SendAction : public ReplayAction<SendRecvParser> {
528 SendAction() = delete;
529 explicit SendAction(std::string name) : ReplayAction(name) {}
530 void kernel(simgrid::xbt::ReplayAction& action) override
532 int dst_traced = MPI_COMM_WORLD->group()->actor(args.partner)->get_pid();
534 TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
535 args.tag, Datatype::encode(args.datatype1)));
536 if (not TRACE_smpi_view_internals())
537 TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, args.tag, args.size * args.datatype1->size());
539 if (name == "send") {
540 Request::send(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
541 } else if (name == "Isend") {
542 MPI_Request request = Request::isend(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
543 get_reqq_self()->push_back(request);
545 xbt_die("Don't know this action, %s", name.c_str());
548 TRACE_smpi_comm_out(my_proc_id);
552 class RecvAction : public ReplayAction<SendRecvParser> {
554 RecvAction() = delete;
555 explicit RecvAction(std::string name) : ReplayAction(name) {}
556 void kernel(simgrid::xbt::ReplayAction& action) override
558 int src_traced = MPI_COMM_WORLD->group()->actor(args.partner)->get_pid();
560 TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
561 args.tag, Datatype::encode(args.datatype1)));
564 // unknown size from the receiver point of view
565 if (args.size <= 0.0) {
566 Request::probe(args.partner, args.tag, MPI_COMM_WORLD, &status);
567 args.size = status.count;
570 if (name == "recv") {
571 Request::recv(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD, &status);
572 } else if (name == "Irecv") {
573 MPI_Request request = Request::irecv(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
574 get_reqq_self()->push_back(request);
577 TRACE_smpi_comm_out(my_proc_id);
578 // TODO: Check why this was only activated in the "recv" case and not in the "Irecv" case
579 if (name == "recv" && not TRACE_smpi_view_internals()) {
580 TRACE_smpi_recv(src_traced, my_proc_id, args.tag);
585 class ComputeAction : public ReplayAction<ComputeParser> {
587 ComputeAction() : ReplayAction("compute") {}
588 void kernel(simgrid::xbt::ReplayAction& action) override
590 TRACE_smpi_computing_in(my_proc_id, args.flops);
591 smpi_execute_flops(args.flops);
592 TRACE_smpi_computing_out(my_proc_id);
596 class TestAction : public ReplayAction<ActionArgParser> {
598 TestAction() : ReplayAction("Test") {}
599 void kernel(simgrid::xbt::ReplayAction& action) override
601 MPI_Request request = get_reqq_self()->back();
602 get_reqq_self()->pop_back();
603 // if request is null here, this may mean that a previous test has succeeded
604 // Different times in traced application and replayed version may lead to this
605 // In this case, ignore the extra calls.
606 if (request != nullptr) {
607 TRACE_smpi_testing_in(my_proc_id);
610 int flag = Request::test(&request, &status);
612 XBT_DEBUG("MPI_Test result: %d", flag);
613 /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now
615 get_reqq_self()->push_back(request);
617 TRACE_smpi_testing_out(my_proc_id);
622 class InitAction : public ReplayAction<ActionArgParser> {
624 InitAction() : ReplayAction("Init") {}
625 void kernel(simgrid::xbt::ReplayAction& action) override
627 CHECK_ACTION_PARAMS(action, 0, 1)
628 MPI_DEFAULT_TYPE = (action.size() > 2) ? MPI_DOUBLE // default MPE datatype
629 : MPI_BYTE; // default TAU datatype
631 /* start a simulated timer */
632 smpi_process()->simulated_start();
633 set_reqq_self(new std::vector<MPI_Request>);
637 class CommunicatorAction : public ReplayAction<ActionArgParser> {
639 CommunicatorAction() : ReplayAction("Comm") {}
640 void kernel(simgrid::xbt::ReplayAction& action) override { /* nothing to do */}
643 class WaitAllAction : public ReplayAction<ActionArgParser> {
645 WaitAllAction() : ReplayAction("waitAll") {}
646 void kernel(simgrid::xbt::ReplayAction& action) override
648 const unsigned int count_requests = get_reqq_self()->size();
650 if (count_requests > 0) {
651 TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData("waitAll", -1, count_requests, ""));
652 std::vector<std::pair</*sender*/int,/*recv*/int>> sender_receiver;
653 for (const auto& req : (*get_reqq_self())) {
654 if (req && (req->flags() & RECV)) {
655 sender_receiver.push_back({req->src(), req->dst()});
658 MPI_Status status[count_requests];
659 Request::waitall(count_requests, &(*get_reqq_self())[0], status);
661 for (auto& pair : sender_receiver) {
662 TRACE_smpi_recv(pair.first, pair.second, 0);
664 TRACE_smpi_comm_out(my_proc_id);
669 class BarrierAction : public ReplayAction<ActionArgParser> {
671 BarrierAction() : ReplayAction("barrier") {}
672 void kernel(simgrid::xbt::ReplayAction& action) override
674 TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::NoOpTIData("barrier"));
675 Colls::barrier(MPI_COMM_WORLD);
676 TRACE_smpi_comm_out(my_proc_id);
680 class BcastAction : public ReplayAction<BcastArgParser> {
682 BcastAction() : ReplayAction("bcast") {}
683 void kernel(simgrid::xbt::ReplayAction& action) override
685 TRACE_smpi_comm_in(my_proc_id, "action_bcast",
686 new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(args.root)->get_pid(),
687 -1.0, args.size, -1, Datatype::encode(args.datatype1), ""));
689 Colls::bcast(send_buffer(args.size * args.datatype1->size()), args.size, args.datatype1, args.root, MPI_COMM_WORLD);
691 TRACE_smpi_comm_out(my_proc_id);
695 class ReduceAction : public ReplayAction<ReduceArgParser> {
697 ReduceAction() : ReplayAction("reduce") {}
698 void kernel(simgrid::xbt::ReplayAction& action) override
700 TRACE_smpi_comm_in(my_proc_id, "action_reduce",
701 new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(args.root)->get_pid(),
702 args.comp_size, args.comm_size, -1,
703 Datatype::encode(args.datatype1), ""));
705 Colls::reduce(send_buffer(args.comm_size * args.datatype1->size()),
706 recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL, args.root, MPI_COMM_WORLD);
707 smpi_execute_flops(args.comp_size);
709 TRACE_smpi_comm_out(my_proc_id);
713 class AllReduceAction : public ReplayAction<AllReduceArgParser> {
715 AllReduceAction() : ReplayAction("allReduce") {}
716 void kernel(simgrid::xbt::ReplayAction& action) override
718 TRACE_smpi_comm_in(my_proc_id, "action_allReduce", new simgrid::instr::CollTIData("allReduce", -1, args.comp_size, args.comm_size, -1,
719 Datatype::encode(args.datatype1), ""));
721 Colls::allreduce(send_buffer(args.comm_size * args.datatype1->size()),
722 recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
723 smpi_execute_flops(args.comp_size);
725 TRACE_smpi_comm_out(my_proc_id);
729 class AllToAllAction : public ReplayAction<AllToAllArgParser> {
731 AllToAllAction() : ReplayAction("allToAll") {}
732 void kernel(simgrid::xbt::ReplayAction& action) override
734 TRACE_smpi_comm_in(my_proc_id, "action_allToAll",
735 new simgrid::instr::CollTIData("allToAll", -1, -1.0, args.send_size, args.recv_size,
736 Datatype::encode(args.datatype1),
737 Datatype::encode(args.datatype2)));
739 Colls::alltoall(send_buffer(args.send_size * args.comm_size * args.datatype1->size()), args.send_size,
740 args.datatype1, recv_buffer(args.recv_size * args.comm_size * args.datatype2->size()),
741 args.recv_size, args.datatype2, MPI_COMM_WORLD);
743 TRACE_smpi_comm_out(my_proc_id);
747 class GatherAction : public ReplayAction<GatherArgParser> {
749 explicit GatherAction(std::string name) : ReplayAction(name) {}
750 void kernel(simgrid::xbt::ReplayAction& action) override
752 TRACE_smpi_comm_in(my_proc_id, name.c_str(), new simgrid::instr::CollTIData(name, (name == "gather") ? args.root : -1, -1.0, args.send_size, args.recv_size,
753 Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
755 if (name == "gather") {
756 int rank = MPI_COMM_WORLD->rank();
757 Colls::gather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
758 (rank == args.root) ? recv_buffer(args.recv_size * args.comm_size * args.datatype2->size()) : nullptr, args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
761 Colls::allgather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
762 recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, MPI_COMM_WORLD);
764 TRACE_smpi_comm_out(my_proc_id);
768 class GatherVAction : public ReplayAction<GatherVArgParser> {
770 explicit GatherVAction(std::string name) : ReplayAction(name) {}
771 void kernel(simgrid::xbt::ReplayAction& action) override
773 int rank = MPI_COMM_WORLD->rank();
775 TRACE_smpi_comm_in(my_proc_id, name.c_str(), new simgrid::instr::VarCollTIData(
776 name, (name == "gatherV") ? args.root : -1, args.send_size, nullptr, -1, args.recvcounts,
777 Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
779 if (name == "gatherV") {
780 Colls::gatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
781 (rank == args.root) ? recv_buffer(args.recv_size_sum * args.datatype2->size()) : nullptr,
782 args.recvcounts->data(), args.disps.data(), args.datatype2, args.root, MPI_COMM_WORLD);
785 Colls::allgatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
786 recv_buffer(args.recv_size_sum * args.datatype2->size()), args.recvcounts->data(),
787 args.disps.data(), args.datatype2, MPI_COMM_WORLD);
790 TRACE_smpi_comm_out(my_proc_id);
794 class ScatterAction : public ReplayAction<ScatterArgParser> {
796 ScatterAction() : ReplayAction("scatter") {}
797 void kernel(simgrid::xbt::ReplayAction& action) override
799 int rank = MPI_COMM_WORLD->rank();
800 TRACE_smpi_comm_in(my_proc_id, "action_scatter", new simgrid::instr::CollTIData(name, args.root, -1.0, args.send_size, args.recv_size,
801 Datatype::encode(args.datatype1),
802 Datatype::encode(args.datatype2)));
804 Colls::scatter(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
805 (rank == args.root) ? recv_buffer(args.recv_size * args.datatype2->size()) : nullptr, args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
807 TRACE_smpi_comm_out(my_proc_id);
812 class ScatterVAction : public ReplayAction<ScatterVArgParser> {
814 ScatterVAction() : ReplayAction("scatterV") {}
815 void kernel(simgrid::xbt::ReplayAction& action) override
817 int rank = MPI_COMM_WORLD->rank();
818 TRACE_smpi_comm_in(my_proc_id, "action_scatterv", new simgrid::instr::VarCollTIData(name, args.root, -1, args.sendcounts, args.recv_size,
819 nullptr, Datatype::encode(args.datatype1),
820 Datatype::encode(args.datatype2)));
822 Colls::scatterv((rank == args.root) ? send_buffer(args.send_size_sum * args.datatype1->size()) : nullptr,
823 args.sendcounts->data(), args.disps.data(), args.datatype1,
824 recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, args.root,
827 TRACE_smpi_comm_out(my_proc_id);
831 class ReduceScatterAction : public ReplayAction<ReduceScatterArgParser> {
833 ReduceScatterAction() : ReplayAction("reduceScatter") {}
834 void kernel(simgrid::xbt::ReplayAction& action) override
836 TRACE_smpi_comm_in(my_proc_id, "action_reducescatter",
837 new simgrid::instr::VarCollTIData("reduceScatter", -1, 0, nullptr, -1, args.recvcounts,
838 std::to_string(args.comp_size), /* ugly hack to print comp_size */
839 Datatype::encode(args.datatype1)));
841 Colls::reduce_scatter(send_buffer(args.recv_size_sum * args.datatype1->size()),
842 recv_buffer(args.recv_size_sum * args.datatype1->size()), args.recvcounts->data(),
843 args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
845 smpi_execute_flops(args.comp_size);
846 TRACE_smpi_comm_out(my_proc_id);
850 class AllToAllVAction : public ReplayAction<AllToAllVArgParser> {
852 AllToAllVAction() : ReplayAction("allToAllV") {}
853 void kernel(simgrid::xbt::ReplayAction& action) override
855 TRACE_smpi_comm_in(my_proc_id, __func__,
856 new simgrid::instr::VarCollTIData(
857 "allToAllV", -1, args.send_size_sum, args.sendcounts, args.recv_size_sum, args.recvcounts,
858 Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
860 Colls::alltoallv(send_buffer(args.send_buf_size * args.datatype1->size()), args.sendcounts->data(), args.senddisps.data(), args.datatype1,
861 recv_buffer(args.recv_buf_size * args.datatype2->size()), args.recvcounts->data(), args.recvdisps.data(), args.datatype2, MPI_COMM_WORLD);
863 TRACE_smpi_comm_out(my_proc_id);
866 } // Replay Namespace
867 }} // namespace simgrid::smpi
869 /** @brief Only initialize the replay, don't do it for real */
870 void smpi_replay_init(int* argc, char*** argv)
872 simgrid::smpi::Process::init(argc, argv);
873 smpi_process()->mark_as_initialized();
874 smpi_process()->set_replaying(true);
876 int my_proc_id = simgrid::s4u::this_actor::get_pid();
877 TRACE_smpi_init(my_proc_id);
878 TRACE_smpi_computing_init(my_proc_id);
879 TRACE_smpi_comm_in(my_proc_id, "smpi_replay_run_init", new simgrid::instr::NoOpTIData("init"));
880 TRACE_smpi_comm_out(my_proc_id);
881 xbt_replay_action_register("init", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::InitAction().execute(action); });
882 xbt_replay_action_register("finalize", [](simgrid::xbt::ReplayAction& action) { /* nothing to do */ });
883 xbt_replay_action_register("comm_size", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
884 xbt_replay_action_register("comm_split",[](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
885 xbt_replay_action_register("comm_dup", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
887 xbt_replay_action_register("send", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("send").execute(action); });
888 xbt_replay_action_register("Isend", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("Isend").execute(action); });
889 xbt_replay_action_register("recv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("recv").execute(action); });
890 xbt_replay_action_register("Irecv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("Irecv").execute(action); });
891 xbt_replay_action_register("test", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::TestAction().execute(action); });
892 xbt_replay_action_register("wait", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAction().execute(action); });
893 xbt_replay_action_register("waitAll", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAllAction().execute(action); });
894 xbt_replay_action_register("barrier", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BarrierAction().execute(action); });
895 xbt_replay_action_register("bcast", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BcastAction().execute(action); });
896 xbt_replay_action_register("reduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceAction().execute(action); });
897 xbt_replay_action_register("allReduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllReduceAction().execute(action); });
898 xbt_replay_action_register("allToAll", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllAction().execute(action); });
899 xbt_replay_action_register("allToAllV", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllVAction().execute(action); });
900 xbt_replay_action_register("gather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("gather").execute(action); });
901 xbt_replay_action_register("scatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterAction().execute(action); });
902 xbt_replay_action_register("gatherV", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("gatherV").execute(action); });
903 xbt_replay_action_register("scatterV", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterVAction().execute(action); });
904 xbt_replay_action_register("allGather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("allGather").execute(action); });
905 xbt_replay_action_register("allGatherV", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("allGatherV").execute(action); });
906 xbt_replay_action_register("reduceScatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceScatterAction().execute(action); });
907 xbt_replay_action_register("compute", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ComputeAction().execute(action); });
909 //if we have a delayed start, sleep here.
911 double value = xbt_str_parse_double((*argv)[2], "%s is not a double");
912 XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
913 smpi_execute_flops(value);
915 //UGLY: force a context switch to be sure that all MSG_processes begin initialization
916 XBT_DEBUG("Force context switch by smpi_execute_flops - Sleeping for 0.0 flops ");
917 smpi_execute_flops(0.0);
921 /** @brief actually run the replay after initialization */
922 void smpi_replay_main(int* argc, char*** argv)
924 static int active_processes = 0;
926 simgrid::xbt::replay_runner(*argc, *argv);
928 /* and now, finalize everything */
929 /* One active process will stop. Decrease the counter*/
930 XBT_DEBUG("There are %zu elements in reqq[*]", get_reqq_self()->size());
931 if (not get_reqq_self()->empty()) {
932 unsigned int count_requests=get_reqq_self()->size();
933 MPI_Request requests[count_requests];
934 MPI_Status status[count_requests];
937 for (auto const& req : *get_reqq_self()) {
941 simgrid::smpi::Request::waitall(count_requests, requests, status);
943 delete get_reqq_self();
946 if(active_processes==0){
947 /* Last process alive speaking: end the simulated timer */
948 XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
949 smpi_free_replay_tmp_buffers();
952 TRACE_smpi_comm_in(simgrid::s4u::this_actor::get_pid(), "smpi_replay_run_finalize",
953 new simgrid::instr::NoOpTIData("finalize"));
955 smpi_process()->finalize();
957 TRACE_smpi_comm_out(simgrid::s4u::this_actor::get_pid());
958 TRACE_smpi_finalize(simgrid::s4u::this_actor::get_pid());
961 /** @brief chain a replay initialization and a replay start */
962 void smpi_replay_run(int* argc, char*** argv)
964 smpi_replay_init(argc, argv);
965 smpi_replay_main(argc, argv);