1 /* Copyright (c) 2009-2018. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
7 #include "smpi_coll.hpp"
8 #include "smpi_comm.hpp"
9 #include "smpi_datatype.hpp"
10 #include "smpi_group.hpp"
11 #include "smpi_process.hpp"
12 #include "smpi_request.hpp"
13 #include "xbt/replay.hpp"
15 #include <boost/algorithm/string/join.hpp>
18 #include <unordered_map>
21 using simgrid::s4u::Actor;
// Declares smpi_replay (a sub-category of smpi) as the default log category of this file.
23 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
// Set by InitAction from smpi_process_count(); smpi_replay_main() compares it with 0 to detect the last process.
25 static int active_processes = 0;
// Request queues keyed by actor PID: filled by set_reqq_self(), looked up by get_reqq_self().
26 static std::unordered_map<int, std::vector<MPI_Request>*> reqq;
// Datatype used by the parsers when a trace line names none; InitAction assigns MPI_DOUBLE or MPI_BYTE.
28 static MPI_Datatype MPI_DEFAULT_TYPE;
// Checks that a replay line carries enough fields for the action being parsed.
// `action` must hold at least `mandatory` + 2 entries (process id, action name, then the mandatory
// arguments); otherwise THROWF raises an arg_error whose message names the calling function (__func__).
// `optional` only appears in the error message; it is not enforced by the test.
30 #define CHECK_ACTION_PARAMS(action, mandatory, optional) \
32 if (action.size() < static_cast<unsigned long>(mandatory + 2)) \
33 THROWF(arg_error, 0, "%s replay failed.\n" \
34 "%zu items were given on the line. First two should be process_id and action. " \
35 "This action needs after them %lu mandatory arguments, and accepts %lu optional ones. \n" \
36 "Please contact the Simgrid team if support is needed", \
37 __func__, action.size(), static_cast<unsigned long>(mandatory), static_cast<unsigned long>(optional)); \
40 static void log_timed_action(simgrid::xbt::ReplayAction& action, double clock)
42 if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
43 std::string s = boost::algorithm::join(action, " ");
44 XBT_VERB("%s %f", s.c_str(), smpi_process()->simulated_elapsed() - clock);
48 static std::vector<MPI_Request>* get_reqq_self()
50 return reqq.at(simgrid::s4u::this_actor::getPid());
53 static void set_reqq_self(std::vector<MPI_Request> *mpi_request)
55 reqq.insert({simgrid::s4u::this_actor::getPid(), mpi_request});
59 static double parse_double(std::string string)
61 return xbt_str_parse_double(string.c_str(), "%s is not a double");
68 class ActionArgParser {
70 virtual ~ActionArgParser() = default;
71 virtual void parse(simgrid::xbt::ReplayAction& action, std::string name) { CHECK_ACTION_PARAMS(action, 0, 0) }
// Parser for the point-to-point actions: action[2] is the partner, action[3] the size and the optional
// action[4] a datatype id decoded with simgrid::smpi::Datatype::decode().
74 class SendRecvParser : public ActionArgParser {
76 /* communication partner; if we send, this is the receiver and vice versa */
// Keeps MPI_DEFAULT_TYPE when the trace line carries no datatype field.
79 MPI_Datatype datatype1 = MPI_DEFAULT_TYPE;
81 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
// Two mandatory arguments (partner, size) and one optional argument (datatype).
83 CHECK_ACTION_PARAMS(action, 2, 1)
84 partner = std::stoi(action[2]);
85 size = parse_double(action[3]);
86 if (action.size() > 4)
87 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
// Parser for the "compute" action: its single mandatory argument, action[2], is stored in flops.
91 class ComputeParser : public ActionArgParser {
93 /* amount of computation, read from action[2] and handed to smpi_execute_flops() by ComputeAction */
96 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
98 CHECK_ACTION_PARAMS(action, 1, 0)
99 flops = parse_double(action[2]);
// Base class of the collective-communication parsers below.
// Both datatypes start as MPI_DEFAULT_TYPE; a derived parse() overwrites them when the trace line names them.
103 class CollCommParser : public ActionArgParser {
111 MPI_Datatype datatype1 = MPI_DEFAULT_TYPE;
112 MPI_Datatype datatype2 = MPI_DEFAULT_TYPE;
115 class BcastArgParser : public CollCommParser {
117 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
119 CHECK_ACTION_PARAMS(action, 1, 2)
120 size = parse_double(action[2]);
121 root = (action.size() > 3) ? std::stoi(action[3]) : 0;
122 if (action.size() > 4)
123 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
127 class ReduceArgParser : public CollCommParser {
129 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
131 CHECK_ACTION_PARAMS(action, 2, 2)
132 comm_size = parse_double(action[2]);
133 comp_size = parse_double(action[3]);
134 root = (action.size() > 4) ? std::stoi(action[4]) : 0;
135 if (action.size() > 5)
136 datatype1 = simgrid::smpi::Datatype::decode(action[5]);
140 class AllReduceArgParser : public CollCommParser {
142 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
144 CHECK_ACTION_PARAMS(action, 2, 1)
145 comm_size = parse_double(action[2]);
146 comp_size = parse_double(action[3]);
147 if (action.size() > 4)
148 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
152 class AllToAllArgParser : public CollCommParser {
154 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
156 CHECK_ACTION_PARAMS(action, 2, 1)
157 comm_size = MPI_COMM_WORLD->size();
158 send_size = parse_double(action[2]);
159 recv_size = parse_double(action[3]);
161 if (action.size() > 4)
162 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
163 if (action.size() > 5)
164 datatype2 = simgrid::smpi::Datatype::decode(action[5]);
// Parser shared by the actions built on GatherAction; the `name` argument selects the field layout.
// For "gather" the root is read from action[4] and the datatypes from action[5] and action[6];
// for any other name no root is read and the datatypes come from action[4] and action[5].
168 class GatherArgParser : public CollCommParser {
170 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
172 /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
175 1) 68 is the sendcounts
176 2) 68 is the recvcounts
177 3) 0 is the root node
178 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
179 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
181 CHECK_ACTION_PARAMS(action, 2, 3)
182 comm_size = MPI_COMM_WORLD->size();
183 send_size = parse_double(action[2]);
184 recv_size = parse_double(action[3]);
186 if (name == "gather") {
187 root = (action.size() > 4) ? std::stoi(action[4]) : 0;
188 if (action.size() > 5)
189 datatype1 = simgrid::smpi::Datatype::decode(action[5]);
190 if (action.size() > 6)
191 datatype2 = simgrid::smpi::Datatype::decode(action[6]);
194 if (action.size() > 4)
195 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
196 if (action.size() > 5)
197 datatype2 = simgrid::smpi::Datatype::decode(action[5]);
// Parser shared by the actions built on GatherVAction. action[2] is the send count and
// action[3 .. 3+comm_size-1] the receive counts; what follows depends on `name` (see the two branches).
202 class GatherVArgParser : public CollCommParser {
205 std::shared_ptr<std::vector<int>> recvcounts;
206 std::vector<int> disps;
207 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
209 /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
210 0 gatherV 68 68 10 10 10 0 0 0
212 1) 68 is the sendcount
213 2) 68 10 10 10 is the recvcounts
214 3) 0 is the root node
215 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
216 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
218 comm_size = MPI_COMM_WORLD->size();
219 CHECK_ACTION_PARAMS(action, comm_size+1, 2)
220 send_size = parse_double(action[2]);
// Displacements default to 0 for every rank; only the second branch below may overwrite them.
221 disps = std::vector<int>(comm_size, 0);
222 recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
// "gatherV" layout: root, send datatype and receive datatype follow the receive counts.
224 if (name == "gatherV") {
225 root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
226 if (action.size() > 4 + comm_size)
227 datatype1 = simgrid::smpi::Datatype::decode(action[4 + comm_size]);
228 if (action.size() > 5 + comm_size)
229 datatype2 = simgrid::smpi::Datatype::decode(action[5 + comm_size]);
// Other names: no root; the remaining fields may hold a datatype and/or a displacement array.
232 int datatype_index = 0;
234 /* The 3 comes from "0 gather <sendcount>", which must always be present.
235 * The + comm_size is the recvcounts array, which must also be present
237 if (action.size() > 3 + comm_size + comm_size) { /* datatype + disp are specified */
238 datatype_index = 3 + comm_size;
239 disp_index = datatype_index + 1;
240 datatype1 = simgrid::smpi::Datatype::decode(action[datatype_index]);
// NOTE(review): both datatypes are decoded from the same field in this branch -- confirm this is intended.
241 datatype2 = simgrid::smpi::Datatype::decode(action[datatype_index]);
242 } else if (action.size() > 3 + comm_size + 2) { /* disps specified; datatype is not specified; use the default one */
243 disp_index = 3 + comm_size;
244 } else if (action.size() > 3 + comm_size) { /* only datatype, no disp specified */
245 datatype_index = 3 + comm_size;
246 datatype1 = simgrid::smpi::Datatype::decode(action[datatype_index]);
247 datatype2 = simgrid::smpi::Datatype::decode(action[datatype_index]);
// A disp_index of 0 means that no displacement array was found on the line.
250 if (disp_index != 0) {
251 for (unsigned int i = 0; i < comm_size; i++)
252 disps[i] = std::stoi(action[disp_index + i]);
256 for (unsigned int i = 0; i < comm_size; i++) {
257 (*recvcounts)[i] = std::stoi(action[i + 3]);
259 recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
// Parser for "scatter": action[2] is the send size, action[3] the receive size, then optionally
// the root (action[4], default 0), the send datatype id (action[5]) and the receive datatype id (action[6]).
263 class ScatterArgParser : public CollCommParser {
265 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
267 /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
270 1) 68 is the sendcounts
271 2) 68 is the recvcounts
272 3) 0 is the root node
273 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
274 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
276 CHECK_ACTION_PARAMS(action, 2, 3)
277 comm_size = MPI_COMM_WORLD->size();
278 send_size = parse_double(action[2]);
279 recv_size = parse_double(action[3]);
280 root = (action.size() > 4) ? std::stoi(action[4]) : 0;
281 if (action.size() > 5)
282 datatype1 = simgrid::smpi::Datatype::decode(action[5]);
283 if (action.size() > 6)
284 datatype2 = simgrid::smpi::Datatype::decode(action[6]);
288 class ScatterVArgParser : public CollCommParser {
292 std::shared_ptr<std::vector<int>> sendcounts;
293 std::vector<int> disps;
294 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
296 /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
297 0 gather 68 10 10 10 68 0 0 0
299 1) 68 10 10 10 is the sendcounts
300 2) 68 is the recvcount
301 3) 0 is the root node
302 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
303 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
305 CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
306 recv_size = parse_double(action[2 + comm_size]);
307 disps = std::vector<int>(comm_size, 0);
308 sendcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
310 if (action.size() > 5 + comm_size)
311 datatype1 = simgrid::smpi::Datatype::decode(action[4 + comm_size]);
312 if (action.size() > 5 + comm_size)
313 datatype2 = simgrid::smpi::Datatype::decode(action[5]);
315 for (unsigned int i = 0; i < comm_size; i++) {
316 (*sendcounts)[i] = std::stoi(action[i + 2]);
318 send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
319 root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
323 class ReduceScatterArgParser : public CollCommParser {
326 std::shared_ptr<std::vector<int>> recvcounts;
327 std::vector<int> disps;
328 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
330 /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
331 0 reduceScatter 275427 275427 275427 204020 11346849 0
333 1) The first four values after the name of the action declare the recvcounts array
334 2) The value 11346849 is the amount of instructions
335 3) The last value corresponds to the datatype, see simgrid::smpi::Datatype::decode().
337 comm_size = MPI_COMM_WORLD->size();
338 CHECK_ACTION_PARAMS(action, comm_size+1, 1)
339 comp_size = parse_double(action[2+comm_size]);
340 recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
341 if (action.size() > 3 + comm_size)
342 datatype1 = simgrid::smpi::Datatype::decode(action[3 + comm_size]);
344 for (unsigned int i = 0; i < comm_size; i++) {
345 recvcounts->push_back(std::stoi(action[i + 2]));
347 recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
// Parser for "allToAllV". action[2] is the send buffer size, action[3 .. 3+comm_size-1] the send counts,
// action[3+comm_size] the receive buffer size and action[4+comm_size .. 4+2*comm_size-1] the receive counts.
351 class AllToAllVArgParser : public CollCommParser {
355 std::shared_ptr<std::vector<int>> recvcounts;
356 std::shared_ptr<std::vector<int>> sendcounts;
357 std::vector<int> senddisps;
358 std::vector<int> recvdisps;
361 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
363 /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
364 0 allToAllV 100 1 7 10 12 100 1 70 10 5
366 1) 100 is the size of the send buffer *sizeof(int),
367 2) 1 7 10 12 is the sendcounts array
368 3) 100*sizeof(int) is the size of the receiver buffer
369 4) 1 70 10 5 is the recvcounts array
371 comm_size = MPI_COMM_WORLD->size();
372 CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
373 sendcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
374 recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
// Displacements are always 0 for every rank; the trace line never overrides them here.
375 senddisps = std::vector<int>(comm_size, 0);
376 recvdisps = std::vector<int>(comm_size, 0);
// NOTE(review): the send datatype is only read when the receive datatype is present as well -- confirm.
378 if (action.size() > 5 + 2 * comm_size)
379 datatype1 = simgrid::smpi::Datatype::decode(action[4 + 2 * comm_size]);
380 if (action.size() > 5 + 2 * comm_size)
381 datatype2 = simgrid::smpi::Datatype::decode(action[5 + 2 * comm_size]);
383 send_buf_size=parse_double(action[2]);
384 recv_buf_size=parse_double(action[3+comm_size]);
385 for (unsigned int i = 0; i < comm_size; i++) {
386 (*sendcounts)[i] = std::stoi(action[3 + i]);
387 (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
389 send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
390 recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
// Common skeleton of every replayed action. T is the parser type used for the action's arguments.
// The constructor stores the action name and the PID of the calling actor.
394 template <class T> class ReplayAction {
396 const std::string name;
397 const int my_proc_id;
401 explicit ReplayAction(std::string name) : name(name), my_proc_id(simgrid::s4u::this_actor::getPid()) {}
402 virtual ~ReplayAction() = default;
// Entry point called for each trace line: records the start time, parses the line into `args`
// and finally logs the line with the elapsed simulated time.
// NOTE(review): the call to kernel(action) is presumably made between parsing and logging; it is not
// visible in this listing.
404 virtual void execute(simgrid::xbt::ReplayAction& action)
406 // Needs to be re-initialized for every action, hence here
407 double start_time = smpi_process()->simulated_elapsed();
408 args.parse(action, name);
411 log_timed_action(action, start_time);
// Action-specific work, implemented by each derived class.
414 virtual void kernel(simgrid::xbt::ReplayAction& action) = 0;
// Thin wrappers around the temporary send/receive buffers provided by SMPI.
416 void* send_buffer(int size)
418 return smpi_get_tmp_sendbuffer(size);
421 void* recv_buffer(int size)
423 return smpi_get_tmp_recvbuffer(size);
// Replays a "wait": takes the most recently queued request of the calling actor and waits for it.
427 class WaitAction : public ReplayAction<ActionArgParser> {
429 WaitAction() : ReplayAction("Wait") {}
430 void kernel(simgrid::xbt::ReplayAction& action) override
432 std::string s = boost::algorithm::join(action, " ");
// An empty queue means that the trace asks to wait for a request that was never started.
433 xbt_assert(get_reqq_self()->size(), "action wait not preceded by any irecv or isend: %s", s.c_str());
434 MPI_Request request = get_reqq_self()->back();
435 get_reqq_self()->pop_back();
437 if (request == nullptr) {
438 /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just
// NOTE(review): rank is guarded against MPI_COMM_NULL, but src and dst below dereference request->comm()
// unconditionally -- confirm that a null communicator cannot reach this point.
443 int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
445 // Must be taken before Request::wait() since the request may be set to
446 // MPI_REQUEST_NULL by Request::wait!
447 int src = request->comm()->group()->rank(request->src());
448 int dst = request->comm()->group()->rank(request->dst());
449 bool is_wait_for_receive = (request->flags() & RECV);
450 // TODO: Here we take the rank while we normally take the process id (look for my_proc_id)
451 TRACE_smpi_comm_in(rank, __func__, new simgrid::instr::NoOpTIData("wait"));
454 Request::wait(&request, &status);
456 TRACE_smpi_comm_out(rank);
// The receive event is only traced when the awaited request was a receive.
457 if (is_wait_for_receive)
458 TRACE_smpi_recv(src, dst, 0);
// Replays "send" (blocking Request::send) and "Isend" (Request::isend, whose request is queued for a later
// wait/test). Any other name aborts through xbt_die.
462 class SendAction : public ReplayAction<SendRecvParser> {
464 SendAction() = delete;
465 explicit SendAction(std::string name) : ReplayAction(name) {}
466 void kernel(simgrid::xbt::ReplayAction& action) override
// PID of the actor holding rank args.partner in MPI_COMM_WORLD; only used for tracing.
468 int dst_traced = MPI_COMM_WORLD->group()->actor(args.partner)->get_pid();
470 TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
471 Datatype::encode(args.datatype1)));
// The send event is traced here unless internals are being viewed.
472 if (not TRACE_smpi_view_internals())
473 TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, args.size * args.datatype1->size());
475 if (name == "send") {
476 Request::send(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
477 } else if (name == "Isend") {
478 MPI_Request request = Request::isend(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
479 get_reqq_self()->push_back(request);
481 xbt_die("Don't know this action, %s", name.c_str());
484 TRACE_smpi_comm_out(my_proc_id);
// Replays "recv" (blocking Request::recv) and "Irecv" (Request::irecv, whose request is queued for a later
// wait/test).
488 class RecvAction : public ReplayAction<SendRecvParser> {
490 RecvAction() = delete;
491 explicit RecvAction(std::string name) : ReplayAction(name) {}
492 void kernel(simgrid::xbt::ReplayAction& action) override
// PID of the actor holding rank args.partner in MPI_COMM_WORLD; only used for tracing.
494 int src_traced = MPI_COMM_WORLD->group()->actor(args.partner)->get_pid();
496 TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
497 Datatype::encode(args.datatype1)));
500 // unknown size from the receiver point of view
// A non-positive size in the trace triggers a probe; the size is then taken from status.count.
501 if (args.size <= 0.0) {
502 Request::probe(args.partner, 0, MPI_COMM_WORLD, &status);
503 args.size = status.count;
506 if (name == "recv") {
507 Request::recv(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD, &status);
508 } else if (name == "Irecv") {
509 MPI_Request request = Request::irecv(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
510 get_reqq_self()->push_back(request);
513 TRACE_smpi_comm_out(my_proc_id);
514 // TODO: Check why this was only activated in the "recv" case and not in the "Irecv" case
515 if (name == "recv" && not TRACE_smpi_view_internals()) {
516 TRACE_smpi_recv(src_traced, my_proc_id, 0);
521 class ComputeAction : public ReplayAction<ComputeParser> {
523 ComputeAction() : ReplayAction("compute") {}
524 void kernel(simgrid::xbt::ReplayAction& action) override
526 TRACE_smpi_computing_in(my_proc_id, args.flops);
527 smpi_execute_flops(args.flops);
528 TRACE_smpi_computing_out(my_proc_id);
532 class TestAction : public ReplayAction<ActionArgParser> {
534 TestAction() : ReplayAction("Test") {}
535 void kernel(simgrid::xbt::ReplayAction& action) override
537 MPI_Request request = get_reqq_self()->back();
538 get_reqq_self()->pop_back();
539 // if request is null here, this may mean that a previous test has succeeded
540 // Different times in traced application and replayed version may lead to this
541 // In this case, ignore the extra calls.
542 if (request != nullptr) {
543 TRACE_smpi_testing_in(my_proc_id);
546 int flag = Request::test(&request, &status);
548 XBT_DEBUG("MPI_Test result: %d", flag);
549 /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now
551 get_reqq_self()->push_back(request);
553 TRACE_smpi_testing_out(my_proc_id);
// Replays "init": selects the default datatype, starts the simulated timer, records the number of
// processes and registers an empty request queue for the calling actor.
558 class InitAction : public ReplayAction<ActionArgParser> {
560 InitAction() : ReplayAction("Init") {}
561 void kernel(simgrid::xbt::ReplayAction& action) override
563 CHECK_ACTION_PARAMS(action, 0, 1)
// The mere presence of a third field on the line selects MPI_DOUBLE; its value is not read here.
564 MPI_DEFAULT_TYPE = (action.size() > 2) ? MPI_DOUBLE // default MPE datatype
565 : MPI_BYTE; // default TAU datatype
567 /* start a simulated timer */
568 smpi_process()->simulated_start();
569 /*initialize the number of active processes */
570 active_processes = smpi_process_count();
// The queue allocated here is released with delete in smpi_replay_main().
572 set_reqq_self(new std::vector<MPI_Request>);
576 class CommunicatorAction : public ReplayAction<ActionArgParser> {
578 CommunicatorAction() : ReplayAction("Comm") {}
579 void kernel(simgrid::xbt::ReplayAction& action) override { /* nothing to do */}
582 class WaitAllAction : public ReplayAction<ActionArgParser> {
584 WaitAllAction() : ReplayAction("waitAll") {}
585 void kernel(simgrid::xbt::ReplayAction& action) override
587 const unsigned int count_requests = get_reqq_self()->size();
589 if (count_requests > 0) {
590 TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData("waitAll", -1, count_requests, ""));
591 std::vector<std::pair</*sender*/int,/*recv*/int>> sender_receiver;
592 for (const auto& req : (*get_reqq_self())) {
593 if (req && (req->flags() & RECV)) {
594 sender_receiver.push_back({req->src(), req->dst()});
597 MPI_Status status[count_requests];
598 Request::waitall(count_requests, &(*get_reqq_self())[0], status);
600 for (auto& pair : sender_receiver) {
601 TRACE_smpi_recv(pair.first, pair.second, 0);
603 TRACE_smpi_comm_out(my_proc_id);
608 class BarrierAction : public ReplayAction<ActionArgParser> {
610 BarrierAction() : ReplayAction("barrier") {}
611 void kernel(simgrid::xbt::ReplayAction& action) override
613 TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::NoOpTIData("barrier"));
614 Colls::barrier(MPI_COMM_WORLD);
615 TRACE_smpi_comm_out(my_proc_id);
// Replays a "bcast" on MPI_COMM_WORLD using the size, root and datatype parsed by BcastArgParser.
619 class BcastAction : public ReplayAction<BcastArgParser> {
621 BcastAction() : ReplayAction("bcast") {}
622 void kernel(simgrid::xbt::ReplayAction& action) override
// The trace entry records the PID of the root actor rather than its rank.
624 TRACE_smpi_comm_in(my_proc_id, "action_bcast",
625 new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(args.root)->get_pid(),
626 -1.0, args.size, -1, Datatype::encode(args.datatype1), ""));
628 Colls::bcast(send_buffer(args.size * args.datatype1->size()), args.size, args.datatype1, args.root, MPI_COMM_WORLD);
630 TRACE_smpi_comm_out(my_proc_id);
// Replays a "reduce" on MPI_COMM_WORLD, then simulates args.comp_size flops of computation.
634 class ReduceAction : public ReplayAction<ReduceArgParser> {
636 ReduceAction() : ReplayAction("reduce") {}
637 void kernel(simgrid::xbt::ReplayAction& action) override
639 TRACE_smpi_comm_in(my_proc_id, "action_reduce",
640 new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(args.root)->get_pid(),
641 args.comp_size, args.comm_size, -1,
642 Datatype::encode(args.datatype1), ""));
// The operation is MPI_OP_NULL: no reduction operator is given to the collective.
644 Colls::reduce(send_buffer(args.comm_size * args.datatype1->size()),
645 recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL, args.root, MPI_COMM_WORLD);
646 smpi_execute_flops(args.comp_size);
648 TRACE_smpi_comm_out(my_proc_id);
// Replays an "allReduce" on MPI_COMM_WORLD, then simulates args.comp_size flops of computation.
652 class AllReduceAction : public ReplayAction<AllReduceArgParser> {
654 AllReduceAction() : ReplayAction("allReduce") {}
655 void kernel(simgrid::xbt::ReplayAction& action) override
657 TRACE_smpi_comm_in(my_proc_id, "action_allReduce", new simgrid::instr::CollTIData("allReduce", -1, args.comp_size, args.comm_size, -1,
658 Datatype::encode(args.datatype1), ""));
// The operation is MPI_OP_NULL: no reduction operator is given to the collective.
660 Colls::allreduce(send_buffer(args.comm_size * args.datatype1->size()),
661 recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
662 smpi_execute_flops(args.comp_size);
664 TRACE_smpi_comm_out(my_proc_id);
// Replays an "allToAll" on MPI_COMM_WORLD; both buffers are sized for comm_size blocks of the parsed sizes.
668 class AllToAllAction : public ReplayAction<AllToAllArgParser> {
670 AllToAllAction() : ReplayAction("allToAll") {}
671 void kernel(simgrid::xbt::ReplayAction& action) override
673 TRACE_smpi_comm_in(my_proc_id, "action_allToAll",
674 new simgrid::instr::CollTIData("allToAll", -1, -1.0, args.send_size, args.recv_size,
675 Datatype::encode(args.datatype1),
676 Datatype::encode(args.datatype2)));
678 Colls::alltoall(send_buffer(args.send_size * args.comm_size * args.datatype1->size()), args.send_size,
679 args.datatype1, recv_buffer(args.recv_size * args.comm_size * args.datatype2->size()),
680 args.recv_size, args.datatype2, MPI_COMM_WORLD);
682 TRACE_smpi_comm_out(my_proc_id);
// Replays "gather" (Colls::gather towards args.root) or, for any other name, Colls::allgather.
// smpi_replay_init() registers this class under the names "gather" and "allGather".
686 class GatherAction : public ReplayAction<GatherArgParser> {
688 explicit GatherAction(std::string name) : ReplayAction(name) {}
689 void kernel(simgrid::xbt::ReplayAction& action) override
// The traced root is -1 for the variant that has no root.
691 TRACE_smpi_comm_in(my_proc_id, name.c_str(), new simgrid::instr::CollTIData(name, (name == "gather") ? args.root : -1, -1.0, args.send_size, args.recv_size,
692 Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
694 if (name == "gather") {
695 int rank = MPI_COMM_WORLD->rank();
// Only the root gets a receive buffer; the other ranks pass nullptr.
696 Colls::gather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
697 (rank == args.root) ? recv_buffer(args.recv_size * args.comm_size * args.datatype2->size()) : nullptr, args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
// NOTE(review): this receive buffer is sized for recv_size elements only, whereas the gather branch
// multiplies by comm_size -- confirm whether comm_size was meant here as well.
700 Colls::allgather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
701 recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, MPI_COMM_WORLD);
703 TRACE_smpi_comm_out(my_proc_id);
// Replays "gatherV" (Colls::gatherv towards args.root) or, for any other name, Colls::allgatherv.
// smpi_replay_init() registers this class under the names "gatherV" and "allGatherV".
707 class GatherVAction : public ReplayAction<GatherVArgParser> {
709 explicit GatherVAction(std::string name) : ReplayAction(name) {}
710 void kernel(simgrid::xbt::ReplayAction& action) override
712 int rank = MPI_COMM_WORLD->rank();
// The traced root is -1 for the variant that has no root.
714 TRACE_smpi_comm_in(my_proc_id, name.c_str(), new simgrid::instr::VarCollTIData(
715 name, (name == "gatherV") ? args.root : -1, args.send_size, nullptr, -1, args.recvcounts,
716 Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
718 if (name == "gatherV") {
// Only the root gets a receive buffer, sized for the sum of all receive counts.
719 Colls::gatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
720 (rank == args.root) ? recv_buffer(args.recv_size_sum * args.datatype2->size()) : nullptr,
721 args.recvcounts->data(), args.disps.data(), args.datatype2, args.root, MPI_COMM_WORLD);
724 Colls::allgatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
725 recv_buffer(args.recv_size_sum * args.datatype2->size()), args.recvcounts->data(),
726 args.disps.data(), args.datatype2, MPI_COMM_WORLD);
729 TRACE_smpi_comm_out(my_proc_id);
// Replays a "scatter" from args.root on MPI_COMM_WORLD.
733 class ScatterAction : public ReplayAction<ScatterArgParser> {
735 ScatterAction() : ReplayAction("scatter") {}
736 void kernel(simgrid::xbt::ReplayAction& action) override
738 int rank = MPI_COMM_WORLD->rank();
739 TRACE_smpi_comm_in(my_proc_id, "action_scatter", new simgrid::instr::CollTIData(name, args.root, -1.0, args.send_size, args.recv_size,
740 Datatype::encode(args.datatype1),
741 Datatype::encode(args.datatype2)));
// NOTE(review): the receive buffer is only provided on the root (nullptr elsewhere), which mirrors the
// gather code -- confirm that this is intended for a scatter.
743 Colls::scatter(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
744 (rank == args.root) ? recv_buffer(args.recv_size * args.datatype2->size()) : nullptr, args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
746 TRACE_smpi_comm_out(my_proc_id);
// Replays a "scatterV" from args.root.
751 class ScatterVAction : public ReplayAction<ScatterVArgParser> {
753 ScatterVAction() : ReplayAction("scatterV") {}
754 void kernel(simgrid::xbt::ReplayAction& action) override
756 int rank = MPI_COMM_WORLD->rank();
757 TRACE_smpi_comm_in(my_proc_id, "action_scatterv", new simgrid::instr::VarCollTIData(name, args.root, -1, args.sendcounts, args.recv_size,
758 nullptr, Datatype::encode(args.datatype1),
759 Datatype::encode(args.datatype2)));
// Only the root gets a send buffer, sized for the sum of all send counts.
// The last line of this call is not visible in this listing; presumably it passes the communicator.
761 Colls::scatterv((rank == args.root) ? send_buffer(args.send_size_sum * args.datatype1->size()) : nullptr,
762 args.sendcounts->data(), args.disps.data(), args.datatype1,
763 recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, args.root,
766 TRACE_smpi_comm_out(my_proc_id);
// Replays a "reduceScatter" on MPI_COMM_WORLD, then simulates args.comp_size flops of computation.
770 class ReduceScatterAction : public ReplayAction<ReduceScatterArgParser> {
772 ReduceScatterAction() : ReplayAction("reduceScatter") {}
773 void kernel(simgrid::xbt::ReplayAction& action) override
775 TRACE_smpi_comm_in(my_proc_id, "action_reducescatter",
776 new simgrid::instr::VarCollTIData("reduceScatter", -1, 0, nullptr, -1, args.recvcounts,
777 std::to_string(args.comp_size), /* ugly hack to print comp_size */
778 Datatype::encode(args.datatype1)));
// Both buffers are sized for the sum of the receive counts; the operation is MPI_OP_NULL.
780 Colls::reduce_scatter(send_buffer(args.recv_size_sum * args.datatype1->size()),
781 recv_buffer(args.recv_size_sum * args.datatype1->size()), args.recvcounts->data(),
782 args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
784 smpi_execute_flops(args.comp_size);
785 TRACE_smpi_comm_out(my_proc_id);
// Replays an "allToAllV" on MPI_COMM_WORLD with the counts and displacements built by AllToAllVArgParser.
789 class AllToAllVAction : public ReplayAction<AllToAllVArgParser> {
791 AllToAllVAction() : ReplayAction("allToAllV") {}
792 void kernel(simgrid::xbt::ReplayAction& action) override
794 TRACE_smpi_comm_in(my_proc_id, __func__,
795 new simgrid::instr::VarCollTIData(
796 "allToAllV", -1, args.send_size_sum, args.sendcounts, args.recv_size_sum, args.recvcounts,
797 Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
// Buffers are sized from the buffer sizes given on the trace line, not from the sums of the counts.
799 Colls::alltoallv(send_buffer(args.send_buf_size * args.datatype1->size()), args.sendcounts->data(), args.senddisps.data(), args.datatype1,
800 recv_buffer(args.recv_buf_size * args.datatype2->size()), args.recvcounts->data(), args.recvdisps.data(), args.datatype2, MPI_COMM_WORLD);
802 TRACE_smpi_comm_out(my_proc_id);
805 } // Replay Namespace
806 }} // namespace simgrid::smpi
808 /** @brief Only initialize the replay, don't do it for real */
// Initialises the SMPI process, marks it as initialised and replaying, emits the "init" trace events and
// registers one handler per supported trace action. Each handler builds a fresh action object and calls
// execute() on it. A delayed start and a forced context switch follow the registrations.
809 void smpi_replay_init(int* argc, char*** argv)
811 simgrid::smpi::Process::init(argc, argv);
812 smpi_process()->mark_as_initialized();
813 smpi_process()->set_replaying(true);
815 int my_proc_id = simgrid::s4u::this_actor::getPid();
816 TRACE_smpi_init(my_proc_id);
817 TRACE_smpi_computing_init(my_proc_id);
818 TRACE_smpi_comm_in(my_proc_id, "smpi_replay_run_init", new simgrid::instr::NoOpTIData("init"));
819 TRACE_smpi_comm_out(my_proc_id);
820 xbt_replay_action_register("init", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::InitAction().execute(action); });
821 xbt_replay_action_register("finalize", [](simgrid::xbt::ReplayAction& action) { /* nothing to do */ });
// The three communicator actions share the same no-op handler class.
822 xbt_replay_action_register("comm_size", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
823 xbt_replay_action_register("comm_split",[](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
824 xbt_replay_action_register("comm_dup", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
826 xbt_replay_action_register("send", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("send").execute(action); });
827 xbt_replay_action_register("Isend", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("Isend").execute(action); });
828 xbt_replay_action_register("recv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("recv").execute(action); });
829 xbt_replay_action_register("Irecv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("Irecv").execute(action); });
830 xbt_replay_action_register("test", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::TestAction().execute(action); });
831 xbt_replay_action_register("wait", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAction().execute(action); });
832 xbt_replay_action_register("waitAll", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAllAction().execute(action); });
833 xbt_replay_action_register("barrier", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BarrierAction().execute(action); });
834 xbt_replay_action_register("bcast", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BcastAction().execute(action); });
835 xbt_replay_action_register("reduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceAction().execute(action); });
836 xbt_replay_action_register("allReduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllReduceAction().execute(action); });
837 xbt_replay_action_register("allToAll", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllAction().execute(action); });
838 xbt_replay_action_register("allToAllV", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllVAction().execute(action); });
839 xbt_replay_action_register("gather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("gather").execute(action); });
840 xbt_replay_action_register("scatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterAction().execute(action); });
841 xbt_replay_action_register("gatherV", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("gatherV").execute(action); });
842 xbt_replay_action_register("scatterV", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterVAction().execute(action); });
// The "all" variants reuse the gather classes; the name given to the constructor selects the behaviour.
843 xbt_replay_action_register("allGather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("allGather").execute(action); });
844 xbt_replay_action_register("allGatherV", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("allGatherV").execute(action); });
845 xbt_replay_action_register("reduceScatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceScatterAction().execute(action); });
846 xbt_replay_action_register("compute", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ComputeAction().execute(action); });
848 //if we have a delayed start, sleep here.
// The delay is read from (*argv)[2] and simulated as that many flops.
// NOTE(review): the condition guarding this delayed start is not visible in this listing.
850 double value = xbt_str_parse_double((*argv)[2], "%s is not a double");
851 XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
852 smpi_execute_flops(value);
854 //UGLY: force a context switch to be sure that all MSG_processes begin initialization
855 XBT_DEBUG("Force context switch by smpi_execute_flops - Sleeping for 0.0 flops ");
856 smpi_execute_flops(0.0);
860 /** @brief actually run the replay after initialization */
// Runs the trace through simgrid::xbt::replay_runner(), then waits for every request still queued by the
// calling actor, frees its queue and finalises the SMPI process. The process that finds active_processes
// at 0 prints the simulated time and frees the temporary replay buffers.
861 void smpi_replay_main(int* argc, char*** argv)
863 simgrid::xbt::replay_runner(*argc, *argv);
865 /* and now, finalize everything */
866 /* One active process will stop. Decrease the counter*/
867 XBT_DEBUG("There are %zu elements in reqq[*]", get_reqq_self()->size());
868 if (not get_reqq_self()->empty()) {
869 unsigned int count_requests=get_reqq_self()->size();
// NOTE(review): these two arrays have a run-time size (variable-length arrays, a compiler extension in C++).
870 MPI_Request requests[count_requests];
871 MPI_Status status[count_requests];
// The body of this loop is not visible in this listing; presumably it copies each request into `requests`.
874 for (auto const& req : *get_reqq_self()) {
878 simgrid::smpi::Request::waitall(count_requests, requests, status);
// Releases the queue allocated by InitAction.
// NOTE(review): no erase of the reqq map entry is visible here, so the stored pointer may dangle -- confirm.
880 delete get_reqq_self();
// NOTE(review): the decrement announced by the comment above is not visible in this listing.
883 if(active_processes==0){
884 /* Last process alive speaking: end the simulated timer */
885 XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
886 smpi_free_replay_tmp_buffers();
889 TRACE_smpi_comm_in(simgrid::s4u::this_actor::getPid(), "smpi_replay_run_finalize",
890 new simgrid::instr::NoOpTIData("finalize"));
892 smpi_process()->finalize();
894 TRACE_smpi_comm_out(simgrid::s4u::this_actor::getPid());
895 TRACE_smpi_finalize(simgrid::s4u::this_actor::getPid());
898 /** @brief chain a replay initialization and a replay start */
899 void smpi_replay_run(int* argc, char*** argv)
901 smpi_replay_init(argc, argv);
902 smpi_replay_main(argc, argv);