1 /* Copyright (c) 2009-2018. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
7 #include "smpi_coll.hpp"
8 #include "smpi_comm.hpp"
9 #include "smpi_datatype.hpp"
10 #include "smpi_group.hpp"
11 #include "smpi_process.hpp"
12 #include "smpi_request.hpp"
13 #include "xbt/replay.hpp"
15 #include <boost/algorithm/string/join.hpp>
18 #include <unordered_map>
22 using simgrid::s4u::Actor;
24 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
26 static std::unordered_map<int, std::vector<MPI_Request>*> reqq;
28 static MPI_Datatype MPI_DEFAULT_TYPE;
/* Check that a trace line holds enough fields for the action being parsed.
 *
 * Every line starts with the process id and the action name; they are followed by `mandatory` required arguments
 * and at most `optional` optional ones (that count only appears in the error message). When the line has fewer than
 * `mandatory + 2` fields, an arg_error is thrown through THROWF, quoting the whole line. The expansion is a braced
 * block, so call sites do not put a semicolon after the macro. */
#define CHECK_ACTION_PARAMS(action, mandatory, optional) \
  { \
    if (not(action.size() >= static_cast<unsigned long>(mandatory + 2))) { \
      std::stringstream line_dump; \
      for (const auto& field : action) \
        line_dump << field << " "; \
      THROWF(arg_error, 0, "%s replay failed.\n" \
                           "%zu items were given on the line. First two should be process_id and action. " \
                           "This action needs after them %lu mandatory arguments, and accepts %lu optional ones. \n" \
                           "The full line that was given is:\n %s\n" \
                           "Please contact the Simgrid team if support is needed", \
             __func__, action.size(), static_cast<unsigned long>(mandatory), \
             static_cast<unsigned long>(optional), line_dump.str().c_str()); \
    } \
  }
46 static void log_timed_action(simgrid::xbt::ReplayAction& action, double clock)
48 if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
49 std::string s = boost::algorithm::join(action, " ");
50 XBT_VERB("%s %f", s.c_str(), smpi_process()->simulated_elapsed() - clock);
54 static std::vector<MPI_Request>* get_reqq_self()
56 return reqq.at(simgrid::s4u::this_actor::get_pid());
59 static void set_reqq_self(std::vector<MPI_Request> *mpi_request)
61 reqq.insert({simgrid::s4u::this_actor::get_pid(), mpi_request});
65 static double parse_double(std::string string)
67 return xbt_str_parse_double(string.c_str(), "%s is not a double");
74 class ActionArgParser {
76 virtual ~ActionArgParser() = default;
77 virtual void parse(simgrid::xbt::ReplayAction& action, std::string name) { CHECK_ACTION_PARAMS(action, 0, 0) }
80 class SendRecvParser : public ActionArgParser {
82 /* communication partner; if we send, this is the receiver and vice versa */
86 MPI_Datatype datatype1 = MPI_DEFAULT_TYPE;
88 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
90 CHECK_ACTION_PARAMS(action, 3, 1)
91 partner = std::stoi(action[2]);
92 tag = std::stoi(action[3]);
93 size = parse_double(action[4]);
94 if (action.size() > 5)
95 datatype1 = simgrid::smpi::Datatype::decode(action[5]);
99 class ComputeParser : public ActionArgParser {
101 /* communication partner; if we send, this is the receiver and vice versa */
104 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
106 CHECK_ACTION_PARAMS(action, 1, 0)
107 flops = parse_double(action[2]);
111 class CollCommParser : public ActionArgParser {
119 MPI_Datatype datatype1 = MPI_DEFAULT_TYPE;
120 MPI_Datatype datatype2 = MPI_DEFAULT_TYPE;
123 class BcastArgParser : public CollCommParser {
125 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
127 CHECK_ACTION_PARAMS(action, 1, 2)
128 size = parse_double(action[2]);
129 root = (action.size() > 3) ? std::stoi(action[3]) : 0;
130 if (action.size() > 4)
131 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
135 class ReduceArgParser : public CollCommParser {
137 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
139 CHECK_ACTION_PARAMS(action, 2, 2)
140 comm_size = parse_double(action[2]);
141 comp_size = parse_double(action[3]);
142 root = (action.size() > 4) ? std::stoi(action[4]) : 0;
143 if (action.size() > 5)
144 datatype1 = simgrid::smpi::Datatype::decode(action[5]);
148 class AllReduceArgParser : public CollCommParser {
150 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
152 CHECK_ACTION_PARAMS(action, 2, 1)
153 comm_size = parse_double(action[2]);
154 comp_size = parse_double(action[3]);
155 if (action.size() > 4)
156 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
160 class AllToAllArgParser : public CollCommParser {
162 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
164 CHECK_ACTION_PARAMS(action, 2, 1)
165 comm_size = MPI_COMM_WORLD->size();
166 send_size = parse_double(action[2]);
167 recv_size = parse_double(action[3]);
169 if (action.size() > 4)
170 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
171 if (action.size() > 5)
172 datatype2 = simgrid::smpi::Datatype::decode(action[5]);
176 class GatherArgParser : public CollCommParser {
178 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
180 /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
183 1) 68 is the sendcounts
184 2) 68 is the recvcounts
185 3) 0 is the root node
186 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
187 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
189 CHECK_ACTION_PARAMS(action, 2, 3)
190 comm_size = MPI_COMM_WORLD->size();
191 send_size = parse_double(action[2]);
192 recv_size = parse_double(action[3]);
194 if (name == "gather") {
195 root = (action.size() > 4) ? std::stoi(action[4]) : 0;
196 if (action.size() > 5)
197 datatype1 = simgrid::smpi::Datatype::decode(action[5]);
198 if (action.size() > 6)
199 datatype2 = simgrid::smpi::Datatype::decode(action[6]);
202 if (action.size() > 4)
203 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
204 if (action.size() > 5)
205 datatype2 = simgrid::smpi::Datatype::decode(action[5]);
210 class GatherVArgParser : public CollCommParser {
213 std::shared_ptr<std::vector<int>> recvcounts;
214 std::vector<int> disps;
215 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
217 /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
218 0 gather 68 68 10 10 10 0 0 0
220 1) 68 is the sendcount
221 2) 68 10 10 10 is the recvcounts
222 3) 0 is the root node
223 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
224 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
226 comm_size = MPI_COMM_WORLD->size();
227 CHECK_ACTION_PARAMS(action, comm_size+1, 2)
228 send_size = parse_double(action[2]);
229 disps = std::vector<int>(comm_size, 0);
230 recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
232 if (name == "gatherV") {
233 root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
234 if (action.size() > 4 + comm_size)
235 datatype1 = simgrid::smpi::Datatype::decode(action[4 + comm_size]);
236 if (action.size() > 5 + comm_size)
237 datatype2 = simgrid::smpi::Datatype::decode(action[5 + comm_size]);
240 int datatype_index = 0;
242 /* The 3 comes from "0 gather <sendcount>", which must always be present.
243 * The + comm_size is the recvcounts array, which must also be present
245 if (action.size() > 3 + comm_size + comm_size) { /* datatype + disp are specified */
246 datatype_index = 3 + comm_size;
247 disp_index = datatype_index + 1;
248 datatype1 = simgrid::smpi::Datatype::decode(action[datatype_index]);
249 datatype2 = simgrid::smpi::Datatype::decode(action[datatype_index]);
250 } else if (action.size() > 3 + comm_size + 2) { /* disps specified; datatype is not specified; use the default one */
251 disp_index = 3 + comm_size;
252 } else if (action.size() > 3 + comm_size) { /* only datatype, no disp specified */
253 datatype_index = 3 + comm_size;
254 datatype1 = simgrid::smpi::Datatype::decode(action[datatype_index]);
255 datatype2 = simgrid::smpi::Datatype::decode(action[datatype_index]);
258 if (disp_index != 0) {
259 for (unsigned int i = 0; i < comm_size; i++)
260 disps[i] = std::stoi(action[disp_index + i]);
264 for (unsigned int i = 0; i < comm_size; i++) {
265 (*recvcounts)[i] = std::stoi(action[i + 3]);
267 recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
271 class ScatterArgParser : public CollCommParser {
273 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
275 /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
278 1) 68 is the sendcounts
279 2) 68 is the recvcounts
280 3) 0 is the root node
281 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
282 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
284 CHECK_ACTION_PARAMS(action, 2, 3)
285 comm_size = MPI_COMM_WORLD->size();
286 send_size = parse_double(action[2]);
287 recv_size = parse_double(action[3]);
288 root = (action.size() > 4) ? std::stoi(action[4]) : 0;
289 if (action.size() > 5)
290 datatype1 = simgrid::smpi::Datatype::decode(action[5]);
291 if (action.size() > 6)
292 datatype2 = simgrid::smpi::Datatype::decode(action[6]);
296 class ScatterVArgParser : public CollCommParser {
300 std::shared_ptr<std::vector<int>> sendcounts;
301 std::vector<int> disps;
302 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
304 /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
305 0 gather 68 10 10 10 68 0 0 0
307 1) 68 10 10 10 is the sendcounts
308 2) 68 is the recvcount
309 3) 0 is the root node
310 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
311 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
313 CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
314 recv_size = parse_double(action[2 + comm_size]);
315 disps = std::vector<int>(comm_size, 0);
316 sendcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
318 if (action.size() > 5 + comm_size)
319 datatype1 = simgrid::smpi::Datatype::decode(action[4 + comm_size]);
320 if (action.size() > 5 + comm_size)
321 datatype2 = simgrid::smpi::Datatype::decode(action[5]);
323 for (unsigned int i = 0; i < comm_size; i++) {
324 (*sendcounts)[i] = std::stoi(action[i + 2]);
326 send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
327 root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
331 class ReduceScatterArgParser : public CollCommParser {
334 std::shared_ptr<std::vector<int>> recvcounts;
335 std::vector<int> disps;
336 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
338 /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
339 0 reduceScatter 275427 275427 275427 204020 11346849 0
341 1) The first four values after the name of the action declare the recvcounts array
342 2) The value 11346849 is the amount of instructions
343 3) The last value corresponds to the datatype, see simgrid::smpi::Datatype::decode().
345 comm_size = MPI_COMM_WORLD->size();
346 CHECK_ACTION_PARAMS(action, comm_size+1, 1)
347 comp_size = parse_double(action[2+comm_size]);
348 recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
349 if (action.size() > 3 + comm_size)
350 datatype1 = simgrid::smpi::Datatype::decode(action[3 + comm_size]);
352 for (unsigned int i = 0; i < comm_size; i++) {
353 recvcounts->push_back(std::stoi(action[i + 2]));
355 recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
359 class AllToAllVArgParser : public CollCommParser {
363 std::shared_ptr<std::vector<int>> recvcounts;
364 std::shared_ptr<std::vector<int>> sendcounts;
365 std::vector<int> senddisps;
366 std::vector<int> recvdisps;
369 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
371 /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
372 0 allToAllV 100 1 7 10 12 100 1 70 10 5
374 1) 100 is the size of the send buffer *sizeof(int),
375 2) 1 7 10 12 is the sendcounts array
376 3) 100*sizeof(int) is the size of the receiver buffer
377 4) 1 70 10 5 is the recvcounts array
379 comm_size = MPI_COMM_WORLD->size();
380 CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
381 sendcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
382 recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
383 senddisps = std::vector<int>(comm_size, 0);
384 recvdisps = std::vector<int>(comm_size, 0);
386 if (action.size() > 5 + 2 * comm_size)
387 datatype1 = simgrid::smpi::Datatype::decode(action[4 + 2 * comm_size]);
388 if (action.size() > 5 + 2 * comm_size)
389 datatype2 = simgrid::smpi::Datatype::decode(action[5 + 2 * comm_size]);
391 send_buf_size=parse_double(action[2]);
392 recv_buf_size=parse_double(action[3+comm_size]);
393 for (unsigned int i = 0; i < comm_size; i++) {
394 (*sendcounts)[i] = std::stoi(action[3 + i]);
395 (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
397 send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
398 recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
402 template <class T> class ReplayAction {
404 const std::string name;
405 const int my_proc_id;
409 explicit ReplayAction(std::string name) : name(name), my_proc_id(simgrid::s4u::this_actor::get_pid()) {}
410 virtual ~ReplayAction() = default;
412 virtual void execute(simgrid::xbt::ReplayAction& action)
414 // Needs to be re-initialized for every action, hence here
415 double start_time = smpi_process()->simulated_elapsed();
416 args.parse(action, name);
419 log_timed_action(action, start_time);
422 virtual void kernel(simgrid::xbt::ReplayAction& action) = 0;
424 void* send_buffer(int size)
426 return smpi_get_tmp_sendbuffer(size);
429 void* recv_buffer(int size)
431 return smpi_get_tmp_recvbuffer(size);
435 class WaitAction : public ReplayAction<ActionArgParser> {
437 WaitAction() : ReplayAction("Wait") {}
438 void kernel(simgrid::xbt::ReplayAction& action) override
440 std::string s = boost::algorithm::join(action, " ");
441 xbt_assert(get_reqq_self()->size(), "action wait not preceded by any irecv or isend: %s", s.c_str());
442 MPI_Request request = get_reqq_self()->back();
443 get_reqq_self()->pop_back();
445 if (request == nullptr) {
446 /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just
451 int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
453 // Must be taken before Request::wait() since the request may be set to
454 // MPI_REQUEST_NULL by Request::wait!
455 int src = request->comm()->group()->rank(request->src());
456 int dst = request->comm()->group()->rank(request->dst());
457 int tag = request->tag();
458 bool is_wait_for_receive = (request->flags() & RECV);
459 // TODO: Here we take the rank while we normally take the process id (look for my_proc_id)
460 TRACE_smpi_comm_in(rank, __func__, new simgrid::instr::NoOpTIData("wait"));
463 Request::wait(&request, &status);
465 TRACE_smpi_comm_out(rank);
466 if (is_wait_for_receive)
467 TRACE_smpi_recv(src, dst, tag);
471 class SendAction : public ReplayAction<SendRecvParser> {
473 SendAction() = delete;
474 explicit SendAction(std::string name) : ReplayAction(name) {}
475 void kernel(simgrid::xbt::ReplayAction& action) override
477 int dst_traced = MPI_COMM_WORLD->group()->actor(args.partner)->get_pid();
479 TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
480 args.tag, Datatype::encode(args.datatype1)));
481 if (not TRACE_smpi_view_internals())
482 TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, args.tag, args.size * args.datatype1->size());
484 if (name == "send") {
485 Request::send(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
486 } else if (name == "Isend") {
487 MPI_Request request = Request::isend(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
488 get_reqq_self()->push_back(request);
490 xbt_die("Don't know this action, %s", name.c_str());
493 TRACE_smpi_comm_out(my_proc_id);
497 class RecvAction : public ReplayAction<SendRecvParser> {
499 RecvAction() = delete;
500 explicit RecvAction(std::string name) : ReplayAction(name) {}
501 void kernel(simgrid::xbt::ReplayAction& action) override
503 int src_traced = MPI_COMM_WORLD->group()->actor(args.partner)->get_pid();
505 TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
506 args.tag, Datatype::encode(args.datatype1)));
509 // unknown size from the receiver point of view
510 if (args.size <= 0.0) {
511 Request::probe(args.partner, args.tag, MPI_COMM_WORLD, &status);
512 args.size = status.count;
515 if (name == "recv") {
516 Request::recv(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD, &status);
517 } else if (name == "Irecv") {
518 MPI_Request request = Request::irecv(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
519 get_reqq_self()->push_back(request);
522 TRACE_smpi_comm_out(my_proc_id);
523 // TODO: Check why this was only activated in the "recv" case and not in the "Irecv" case
524 if (name == "recv" && not TRACE_smpi_view_internals()) {
525 TRACE_smpi_recv(src_traced, my_proc_id, args.tag);
530 class ComputeAction : public ReplayAction<ComputeParser> {
532 ComputeAction() : ReplayAction("compute") {}
533 void kernel(simgrid::xbt::ReplayAction& action) override
535 TRACE_smpi_computing_in(my_proc_id, args.flops);
536 smpi_execute_flops(args.flops);
537 TRACE_smpi_computing_out(my_proc_id);
541 class TestAction : public ReplayAction<ActionArgParser> {
543 TestAction() : ReplayAction("Test") {}
544 void kernel(simgrid::xbt::ReplayAction& action) override
546 MPI_Request request = get_reqq_self()->back();
547 get_reqq_self()->pop_back();
548 // if request is null here, this may mean that a previous test has succeeded
549 // Different times in traced application and replayed version may lead to this
550 // In this case, ignore the extra calls.
551 if (request != nullptr) {
552 TRACE_smpi_testing_in(my_proc_id);
555 int flag = Request::test(&request, &status);
557 XBT_DEBUG("MPI_Test result: %d", flag);
558 /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now
560 get_reqq_self()->push_back(request);
562 TRACE_smpi_testing_out(my_proc_id);
567 class InitAction : public ReplayAction<ActionArgParser> {
569 InitAction() : ReplayAction("Init") {}
570 void kernel(simgrid::xbt::ReplayAction& action) override
572 CHECK_ACTION_PARAMS(action, 0, 1)
573 MPI_DEFAULT_TYPE = (action.size() > 2) ? MPI_DOUBLE // default MPE datatype
574 : MPI_BYTE; // default TAU datatype
576 /* start a simulated timer */
577 smpi_process()->simulated_start();
578 set_reqq_self(new std::vector<MPI_Request>);
582 class CommunicatorAction : public ReplayAction<ActionArgParser> {
584 CommunicatorAction() : ReplayAction("Comm") {}
585 void kernel(simgrid::xbt::ReplayAction& action) override { /* nothing to do */}
588 class WaitAllAction : public ReplayAction<ActionArgParser> {
590 WaitAllAction() : ReplayAction("waitAll") {}
591 void kernel(simgrid::xbt::ReplayAction& action) override
593 const unsigned int count_requests = get_reqq_self()->size();
595 if (count_requests > 0) {
596 TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData("waitAll", -1, count_requests, ""));
597 std::vector<std::pair</*sender*/int,/*recv*/int>> sender_receiver;
598 for (const auto& req : (*get_reqq_self())) {
599 if (req && (req->flags() & RECV)) {
600 sender_receiver.push_back({req->src(), req->dst()});
603 MPI_Status status[count_requests];
604 Request::waitall(count_requests, &(*get_reqq_self())[0], status);
606 for (auto& pair : sender_receiver) {
607 TRACE_smpi_recv(pair.first, pair.second, 0);
609 TRACE_smpi_comm_out(my_proc_id);
614 class BarrierAction : public ReplayAction<ActionArgParser> {
616 BarrierAction() : ReplayAction("barrier") {}
617 void kernel(simgrid::xbt::ReplayAction& action) override
619 TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::NoOpTIData("barrier"));
620 Colls::barrier(MPI_COMM_WORLD);
621 TRACE_smpi_comm_out(my_proc_id);
625 class BcastAction : public ReplayAction<BcastArgParser> {
627 BcastAction() : ReplayAction("bcast") {}
628 void kernel(simgrid::xbt::ReplayAction& action) override
630 TRACE_smpi_comm_in(my_proc_id, "action_bcast",
631 new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(args.root)->get_pid(),
632 -1.0, args.size, -1, Datatype::encode(args.datatype1), ""));
634 Colls::bcast(send_buffer(args.size * args.datatype1->size()), args.size, args.datatype1, args.root, MPI_COMM_WORLD);
636 TRACE_smpi_comm_out(my_proc_id);
640 class ReduceAction : public ReplayAction<ReduceArgParser> {
642 ReduceAction() : ReplayAction("reduce") {}
643 void kernel(simgrid::xbt::ReplayAction& action) override
645 TRACE_smpi_comm_in(my_proc_id, "action_reduce",
646 new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(args.root)->get_pid(),
647 args.comp_size, args.comm_size, -1,
648 Datatype::encode(args.datatype1), ""));
650 Colls::reduce(send_buffer(args.comm_size * args.datatype1->size()),
651 recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL, args.root, MPI_COMM_WORLD);
652 smpi_execute_flops(args.comp_size);
654 TRACE_smpi_comm_out(my_proc_id);
658 class AllReduceAction : public ReplayAction<AllReduceArgParser> {
660 AllReduceAction() : ReplayAction("allReduce") {}
661 void kernel(simgrid::xbt::ReplayAction& action) override
663 TRACE_smpi_comm_in(my_proc_id, "action_allReduce", new simgrid::instr::CollTIData("allReduce", -1, args.comp_size, args.comm_size, -1,
664 Datatype::encode(args.datatype1), ""));
666 Colls::allreduce(send_buffer(args.comm_size * args.datatype1->size()),
667 recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
668 smpi_execute_flops(args.comp_size);
670 TRACE_smpi_comm_out(my_proc_id);
674 class AllToAllAction : public ReplayAction<AllToAllArgParser> {
676 AllToAllAction() : ReplayAction("allToAll") {}
677 void kernel(simgrid::xbt::ReplayAction& action) override
679 TRACE_smpi_comm_in(my_proc_id, "action_allToAll",
680 new simgrid::instr::CollTIData("allToAll", -1, -1.0, args.send_size, args.recv_size,
681 Datatype::encode(args.datatype1),
682 Datatype::encode(args.datatype2)));
684 Colls::alltoall(send_buffer(args.send_size * args.comm_size * args.datatype1->size()), args.send_size,
685 args.datatype1, recv_buffer(args.recv_size * args.comm_size * args.datatype2->size()),
686 args.recv_size, args.datatype2, MPI_COMM_WORLD);
688 TRACE_smpi_comm_out(my_proc_id);
692 class GatherAction : public ReplayAction<GatherArgParser> {
694 explicit GatherAction(std::string name) : ReplayAction(name) {}
695 void kernel(simgrid::xbt::ReplayAction& action) override
697 TRACE_smpi_comm_in(my_proc_id, name.c_str(), new simgrid::instr::CollTIData(name, (name == "gather") ? args.root : -1, -1.0, args.send_size, args.recv_size,
698 Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
700 if (name == "gather") {
701 int rank = MPI_COMM_WORLD->rank();
702 Colls::gather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
703 (rank == args.root) ? recv_buffer(args.recv_size * args.comm_size * args.datatype2->size()) : nullptr, args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
706 Colls::allgather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
707 recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, MPI_COMM_WORLD);
709 TRACE_smpi_comm_out(my_proc_id);
713 class GatherVAction : public ReplayAction<GatherVArgParser> {
715 explicit GatherVAction(std::string name) : ReplayAction(name) {}
716 void kernel(simgrid::xbt::ReplayAction& action) override
718 int rank = MPI_COMM_WORLD->rank();
720 TRACE_smpi_comm_in(my_proc_id, name.c_str(), new simgrid::instr::VarCollTIData(
721 name, (name == "gatherV") ? args.root : -1, args.send_size, nullptr, -1, args.recvcounts,
722 Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
724 if (name == "gatherV") {
725 Colls::gatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
726 (rank == args.root) ? recv_buffer(args.recv_size_sum * args.datatype2->size()) : nullptr,
727 args.recvcounts->data(), args.disps.data(), args.datatype2, args.root, MPI_COMM_WORLD);
730 Colls::allgatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
731 recv_buffer(args.recv_size_sum * args.datatype2->size()), args.recvcounts->data(),
732 args.disps.data(), args.datatype2, MPI_COMM_WORLD);
735 TRACE_smpi_comm_out(my_proc_id);
739 class ScatterAction : public ReplayAction<ScatterArgParser> {
741 ScatterAction() : ReplayAction("scatter") {}
742 void kernel(simgrid::xbt::ReplayAction& action) override
744 int rank = MPI_COMM_WORLD->rank();
745 TRACE_smpi_comm_in(my_proc_id, "action_scatter", new simgrid::instr::CollTIData(name, args.root, -1.0, args.send_size, args.recv_size,
746 Datatype::encode(args.datatype1),
747 Datatype::encode(args.datatype2)));
749 Colls::scatter(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
750 (rank == args.root) ? recv_buffer(args.recv_size * args.datatype2->size()) : nullptr, args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
752 TRACE_smpi_comm_out(my_proc_id);
757 class ScatterVAction : public ReplayAction<ScatterVArgParser> {
759 ScatterVAction() : ReplayAction("scatterV") {}
760 void kernel(simgrid::xbt::ReplayAction& action) override
762 int rank = MPI_COMM_WORLD->rank();
763 TRACE_smpi_comm_in(my_proc_id, "action_scatterv", new simgrid::instr::VarCollTIData(name, args.root, -1, args.sendcounts, args.recv_size,
764 nullptr, Datatype::encode(args.datatype1),
765 Datatype::encode(args.datatype2)));
767 Colls::scatterv((rank == args.root) ? send_buffer(args.send_size_sum * args.datatype1->size()) : nullptr,
768 args.sendcounts->data(), args.disps.data(), args.datatype1,
769 recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, args.root,
772 TRACE_smpi_comm_out(my_proc_id);
776 class ReduceScatterAction : public ReplayAction<ReduceScatterArgParser> {
778 ReduceScatterAction() : ReplayAction("reduceScatter") {}
779 void kernel(simgrid::xbt::ReplayAction& action) override
781 TRACE_smpi_comm_in(my_proc_id, "action_reducescatter",
782 new simgrid::instr::VarCollTIData("reduceScatter", -1, 0, nullptr, -1, args.recvcounts,
783 std::to_string(args.comp_size), /* ugly hack to print comp_size */
784 Datatype::encode(args.datatype1)));
786 Colls::reduce_scatter(send_buffer(args.recv_size_sum * args.datatype1->size()),
787 recv_buffer(args.recv_size_sum * args.datatype1->size()), args.recvcounts->data(),
788 args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
790 smpi_execute_flops(args.comp_size);
791 TRACE_smpi_comm_out(my_proc_id);
795 class AllToAllVAction : public ReplayAction<AllToAllVArgParser> {
797 AllToAllVAction() : ReplayAction("allToAllV") {}
798 void kernel(simgrid::xbt::ReplayAction& action) override
800 TRACE_smpi_comm_in(my_proc_id, __func__,
801 new simgrid::instr::VarCollTIData(
802 "allToAllV", -1, args.send_size_sum, args.sendcounts, args.recv_size_sum, args.recvcounts,
803 Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
805 Colls::alltoallv(send_buffer(args.send_buf_size * args.datatype1->size()), args.sendcounts->data(), args.senddisps.data(), args.datatype1,
806 recv_buffer(args.recv_buf_size * args.datatype2->size()), args.recvcounts->data(), args.recvdisps.data(), args.datatype2, MPI_COMM_WORLD);
808 TRACE_smpi_comm_out(my_proc_id);
811 } // Replay Namespace
812 }} // namespace simgrid::smpi
814 /** @brief Only initialize the replay, don't do it for real */
815 void smpi_replay_init(int* argc, char*** argv)
817 simgrid::smpi::Process::init(argc, argv);
818 smpi_process()->mark_as_initialized();
819 smpi_process()->set_replaying(true);
821 int my_proc_id = simgrid::s4u::this_actor::get_pid();
822 TRACE_smpi_init(my_proc_id);
823 TRACE_smpi_computing_init(my_proc_id);
824 TRACE_smpi_comm_in(my_proc_id, "smpi_replay_run_init", new simgrid::instr::NoOpTIData("init"));
825 TRACE_smpi_comm_out(my_proc_id);
826 xbt_replay_action_register("init", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::InitAction().execute(action); });
827 xbt_replay_action_register("finalize", [](simgrid::xbt::ReplayAction& action) { /* nothing to do */ });
828 xbt_replay_action_register("comm_size", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
829 xbt_replay_action_register("comm_split",[](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
830 xbt_replay_action_register("comm_dup", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
832 xbt_replay_action_register("send", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("send").execute(action); });
833 xbt_replay_action_register("Isend", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("Isend").execute(action); });
834 xbt_replay_action_register("recv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("recv").execute(action); });
835 xbt_replay_action_register("Irecv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("Irecv").execute(action); });
836 xbt_replay_action_register("test", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::TestAction().execute(action); });
837 xbt_replay_action_register("wait", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAction().execute(action); });
838 xbt_replay_action_register("waitAll", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAllAction().execute(action); });
839 xbt_replay_action_register("barrier", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BarrierAction().execute(action); });
840 xbt_replay_action_register("bcast", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BcastAction().execute(action); });
841 xbt_replay_action_register("reduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceAction().execute(action); });
842 xbt_replay_action_register("allReduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllReduceAction().execute(action); });
843 xbt_replay_action_register("allToAll", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllAction().execute(action); });
844 xbt_replay_action_register("allToAllV", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllVAction().execute(action); });
845 xbt_replay_action_register("gather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("gather").execute(action); });
846 xbt_replay_action_register("scatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterAction().execute(action); });
847 xbt_replay_action_register("gatherV", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("gatherV").execute(action); });
848 xbt_replay_action_register("scatterV", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterVAction().execute(action); });
849 xbt_replay_action_register("allGather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("allGather").execute(action); });
850 xbt_replay_action_register("allGatherV", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("allGatherV").execute(action); });
851 xbt_replay_action_register("reduceScatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceScatterAction().execute(action); });
852 xbt_replay_action_register("compute", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ComputeAction().execute(action); });
854 //if we have a delayed start, sleep here.
856 double value = xbt_str_parse_double((*argv)[2], "%s is not a double");
857 XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
858 smpi_execute_flops(value);
860 //UGLY: force a context switch to be sure that all MSG_processes begin initialization
861 XBT_DEBUG("Force context switch by smpi_execute_flops - Sleeping for 0.0 flops ");
862 smpi_execute_flops(0.0);
866 /** @brief actually run the replay after initialization */
867 void smpi_replay_main(int* argc, char*** argv)
869 static int active_processes = 0;
871 simgrid::xbt::replay_runner(*argc, *argv);
873 /* and now, finalize everything */
874 /* One active process will stop. Decrease the counter*/
875 XBT_DEBUG("There are %zu elements in reqq[*]", get_reqq_self()->size());
876 if (not get_reqq_self()->empty()) {
877 unsigned int count_requests=get_reqq_self()->size();
878 MPI_Request requests[count_requests];
879 MPI_Status status[count_requests];
882 for (auto const& req : *get_reqq_self()) {
886 simgrid::smpi::Request::waitall(count_requests, requests, status);
888 delete get_reqq_self();
891 if(active_processes==0){
892 /* Last process alive speaking: end the simulated timer */
893 XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
894 smpi_free_replay_tmp_buffers();
897 TRACE_smpi_comm_in(simgrid::s4u::this_actor::get_pid(), "smpi_replay_run_finalize",
898 new simgrid::instr::NoOpTIData("finalize"));
900 smpi_process()->finalize();
902 TRACE_smpi_comm_out(simgrid::s4u::this_actor::get_pid());
903 TRACE_smpi_finalize(simgrid::s4u::this_actor::get_pid());
906 /** @brief chain a replay initialization and a replay start */
907 void smpi_replay_run(int* argc, char*** argv)
909 smpi_replay_init(argc, argv);
910 smpi_replay_main(argc, argv);