1 /* Copyright (c) 2009-2018. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
7 #include "smpi_coll.hpp"
8 #include "smpi_comm.hpp"
9 #include "smpi_datatype.hpp"
10 #include "smpi_group.hpp"
11 #include "smpi_process.hpp"
12 #include "smpi_request.hpp"
13 #include "xbt/replay.hpp"
15 #include <boost/algorithm/string/join.hpp>
18 #include <unordered_map>
21 using simgrid::s4u::Actor;
23 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
25 static int active_processes = 0;
26 static std::unordered_map<int, std::vector<MPI_Request>*> reqq;
28 static MPI_Datatype MPI_DEFAULT_TYPE;
/**
 * @brief Reject a replay line that carries fewer tokens than the action requires.
 *
 * The first two tokens of a line are the process id and the action name, hence the "+ 2".
 * @param action    tokenized trace line (anything exposing size())
 * @param mandatory number of arguments that must follow the two leading tokens
 * @param optional  number of further arguments that may follow; only reported in the error message
 *
 * The macro expands to a braced block, so call sites are written without a trailing semicolon.
 * Macro parameters are parenthesized so that arbitrary expressions can be passed safely.
 */
#define CHECK_ACTION_PARAMS(action, mandatory, optional)                                                               \
  {                                                                                                                    \
    if ((action).size() < static_cast<unsigned long>((mandatory) + 2))                                                 \
      THROWF(arg_error, 0, "%s replay failed.\n"                                                                       \
                           "%zu items were given on the line. First two should be process_id and action. "            \
                           "This action needs after them %lu mandatory arguments, and accepts %lu optional ones. \n"   \
                           "Please contact the Simgrid team if support is needed",                                     \
             __func__, (action).size(), static_cast<unsigned long>(mandatory), static_cast<unsigned long>(optional));  \
  }
40 static void log_timed_action(simgrid::xbt::ReplayAction& action, double clock)
42 if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
43 std::string s = boost::algorithm::join(action, " ");
44 XBT_VERB("%s %f", s.c_str(), smpi_process()->simulated_elapsed() - clock);
48 static std::vector<MPI_Request>* get_reqq_self()
50 return reqq.at(simgrid::s4u::this_actor::getPid());
53 static void set_reqq_self(std::vector<MPI_Request> *mpi_request)
55 reqq.insert({simgrid::s4u::this_actor::getPid(), mpi_request});
59 static double parse_double(std::string string)
61 return xbt_str_parse_double(string.c_str(), "%s is not a double");
68 class ActionArgParser {
70 virtual ~ActionArgParser() = default;
71 virtual void parse(simgrid::xbt::ReplayAction& action, std::string name) { CHECK_ACTION_PARAMS(action, 0, 0) }
74 class SendRecvParser : public ActionArgParser {
76 /* communication partner; if we send, this is the receiver and vice versa */
79 MPI_Datatype datatype1 = MPI_DEFAULT_TYPE;
81 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
83 CHECK_ACTION_PARAMS(action, 2, 1)
84 partner = std::stoi(action[2]);
85 size = parse_double(action[3]);
86 if (action.size() > 4)
87 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
91 class ComputeParser : public ActionArgParser {
93 /* communication partner; if we send, this is the receiver and vice versa */
96 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
98 CHECK_ACTION_PARAMS(action, 1, 0)
99 flops = parse_double(action[2]);
// Common base of the collective-communication argument parsers.
// NOTE(review): only the two datatype members are visible in this listing. The derived parsers assign
// fields such as size, comm_size, comp_size, send_size, recv_size and root without declaring them,
// so those are presumably declared here as well -- confirm against the full file.
103 class CollCommParser : public ActionArgParser {
// Both datatypes start out as MPI_DEFAULT_TYPE; derived parsers overwrite them when the trace gives one.
111 MPI_Datatype datatype1 = MPI_DEFAULT_TYPE;
112 MPI_Datatype datatype2 = MPI_DEFAULT_TYPE;
115 class BcastArgParser : public CollCommParser {
117 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
119 CHECK_ACTION_PARAMS(action, 1, 2)
120 size = parse_double(action[2]);
121 root = (action.size() > 3) ? std::stoi(action[3]) : 0;
122 if (action.size() > 4)
123 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
127 class ReduceArgParser : public CollCommParser {
129 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
131 CHECK_ACTION_PARAMS(action, 2, 2)
132 comm_size = parse_double(action[2]);
133 comp_size = parse_double(action[3]);
134 root = (action.size() > 4) ? std::stoi(action[4]) : 0;
135 if (action.size() > 5)
136 datatype1 = simgrid::smpi::Datatype::decode(action[5]);
140 class AllReduceArgParser : public CollCommParser {
142 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
144 CHECK_ACTION_PARAMS(action, 2, 1)
145 comm_size = parse_double(action[2]);
146 comp_size = parse_double(action[3]);
147 if (action.size() > 4)
148 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
152 class AllToAllArgParser : public CollCommParser {
154 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
156 CHECK_ACTION_PARAMS(action, 2, 1)
157 comm_size = MPI_COMM_WORLD->size();
158 send_size = parse_double(action[2]);
159 recv_size = parse_double(action[3]);
161 if (action.size() > 4)
162 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
163 if (action.size() > 5)
164 datatype2 = simgrid::smpi::Datatype::decode(action[5]);
168 class GatherArgParser : public CollCommParser {
170 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
172 /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
175 1) 68 is the sendcounts
176 2) 68 is the recvcounts
177 3) 0 is the root node
178 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
179 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
181 CHECK_ACTION_PARAMS(action, 2, 3)
182 comm_size = MPI_COMM_WORLD->size();
183 send_size = parse_double(action[2]);
184 recv_size = parse_double(action[3]);
186 if (name == "gather") {
187 root = (action.size() > 4) ? std::stoi(action[4]) : 0;
188 if (action.size() > 5)
189 datatype1 = simgrid::smpi::Datatype::decode(action[5]);
190 if (action.size() > 6)
191 datatype2 = simgrid::smpi::Datatype::decode(action[6]);
194 if (action.size() > 4)
195 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
196 if (action.size() > 5)
197 datatype2 = simgrid::smpi::Datatype::decode(action[5]);
// Parser for "gatherV" and for any other action name routed to it (the non-"gatherV" path below).
// After parsing, recvcounts holds one count per rank and recv_size_sum holds their sum.
202 class GatherVArgParser : public CollCommParser {
// One receive count per rank; held in a shared_ptr.
205 std::shared_ptr<std::vector<int>> recvcounts;
// Displacements, one per rank; zero unless the non-"gatherV" path reads them from the line.
206 std::vector<int> disps;
207 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
209 /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
210 0 gather 68 68 10 10 10 0 0 0
212 1) 68 is the sendcount
213 2) 68 10 10 10 is the recvcounts
214 3) 0 is the root node
215 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
216 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
// Here comm_size is the number of ranks in MPI_COMM_WORLD; the line must hold the sendcount plus one recvcount per rank.
218 comm_size = MPI_COMM_WORLD->size();
219 CHECK_ACTION_PARAMS(action, comm_size+1, 2)
220 send_size = parse_double(action[2]);
221 disps = std::vector<int>(comm_size, 0);
222 recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
// "gatherV": optional root, then send and recv datatype ids, all located after the recvcounts.
224 if (name == "gatherV") {
225 root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
226 if (action.size() > 4 + comm_size)
227 datatype1 = simgrid::smpi::Datatype::decode(action[4 + comm_size]);
228 if (action.size() > 5 + comm_size)
229 datatype2 = simgrid::smpi::Datatype::decode(action[5 + comm_size]);
// Other action names: no root; the line may carry a datatype id and/or a displacement array.
232 int datatype_index = 0;
234 /* The 3 comes from "0 gather <sendcount>", which must always be present.
235 * The + comm_size is the recvcounts array, which must also be present
// NOTE(review): in every branch below datatype1 and datatype2 are decoded from the same token -- confirm this is intended.
237 if (action.size() > 3 + comm_size + comm_size) { /* datatype + disp are specified */
238 datatype_index = 3 + comm_size;
239 disp_index = datatype_index + 1;
240 datatype1 = simgrid::smpi::Datatype::decode(action[datatype_index]);
241 datatype2 = simgrid::smpi::Datatype::decode(action[datatype_index]);
// NOTE(review): this test only guarantees 3 tokens after the recvcounts, yet the loop further down reads
// comm_size displacements starting at disp_index -- looks like it can index past the end of the line
// when comm_size > 3; confirm the intended trace layout.
242 } else if (action.size() > 3 + comm_size + 2) { /* disps specified; datatype is not specified; use the default one */
243 disp_index = 3 + comm_size;
244 } else if (action.size() > 3 + comm_size) { /* only datatype, no disp specified */
245 datatype_index = 3 + comm_size;
246 datatype1 = simgrid::smpi::Datatype::decode(action[datatype_index]);
247 datatype2 = simgrid::smpi::Datatype::decode(action[datatype_index]);
// A non-zero disp_index means displacements were found on the line.
250 if (disp_index != 0) {
251 for (unsigned int i = 0; i < comm_size; i++)
252 disps[i] = std::stoi(action[disp_index + i]);
// The recvcounts always start at token 3, right after the sendcount.
256 for (unsigned int i = 0; i < comm_size; i++) {
257 (*recvcounts)[i] = std::stoi(action[i + 3]);
259 recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
263 class ScatterArgParser : public CollCommParser {
265 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
267 /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
270 1) 68 is the sendcounts
271 2) 68 is the recvcounts
272 3) 0 is the root node
273 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
274 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
276 CHECK_ACTION_PARAMS(action, 2, 3)
277 comm_size = MPI_COMM_WORLD->size();
278 send_size = parse_double(action[2]);
279 recv_size = parse_double(action[3]);
280 root = (action.size() > 4) ? std::stoi(action[4]) : 0;
281 if (action.size() > 5)
282 datatype1 = simgrid::smpi::Datatype::decode(action[5]);
283 if (action.size() > 6)
284 datatype2 = simgrid::smpi::Datatype::decode(action[6]);
288 class ScatterVArgParser : public CollCommParser {
292 std::shared_ptr<std::vector<int>> sendcounts;
293 std::vector<int> disps;
294 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
296 /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
297 0 gather 68 10 10 10 68 0 0 0
299 1) 68 10 10 10 is the sendcounts
300 2) 68 is the recvcount
301 3) 0 is the root node
302 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
303 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
305 CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
306 recv_size = parse_double(action[2 + comm_size]);
307 disps = std::vector<int>(comm_size, 0);
308 sendcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
310 if (action.size() > 5 + comm_size)
311 datatype1 = simgrid::smpi::Datatype::decode(action[4 + comm_size]);
312 if (action.size() > 5 + comm_size)
313 datatype2 = simgrid::smpi::Datatype::decode(action[5]);
315 for (unsigned int i = 0; i < comm_size; i++) {
316 (*sendcounts)[i] = std::stoi(action[i + 2]);
318 send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
319 root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
323 class ReduceScatterArgParser : public CollCommParser {
326 std::shared_ptr<std::vector<int>> recvcounts;
327 std::vector<int> disps;
328 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
330 /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
331 0 reduceScatter 275427 275427 275427 204020 11346849 0
333 1) The first four values after the name of the action declare the recvcounts array
334 2) The value 11346849 is the amount of instructions
335 3) The last value corresponds to the datatype, see simgrid::smpi::Datatype::decode().
337 comm_size = MPI_COMM_WORLD->size();
338 CHECK_ACTION_PARAMS(action, comm_size+1, 1)
339 comp_size = parse_double(action[2+comm_size]);
340 recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
341 if (action.size() > 3 + comm_size)
342 datatype1 = simgrid::smpi::Datatype::decode(action[3 + comm_size]);
344 for (unsigned int i = 0; i < comm_size; i++) {
345 recvcounts->push_back(std::stoi(action[i + 2]));
347 recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
351 class AllToAllVArgParser : public CollCommParser {
355 std::shared_ptr<std::vector<int>> recvcounts;
356 std::shared_ptr<std::vector<int>> sendcounts;
357 std::vector<int> senddisps;
358 std::vector<int> recvdisps;
361 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
363 /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
364 0 allToAllV 100 1 7 10 12 100 1 70 10 5
366 1) 100 is the size of the send buffer *sizeof(int),
367 2) 1 7 10 12 is the sendcounts array
368 3) 100*sizeof(int) is the size of the receiver buffer
369 4) 1 70 10 5 is the recvcounts array
371 comm_size = MPI_COMM_WORLD->size();
372 CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
373 sendcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
374 recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
375 senddisps = std::vector<int>(comm_size, 0);
376 recvdisps = std::vector<int>(comm_size, 0);
378 if (action.size() > 5 + 2 * comm_size)
379 datatype1 = simgrid::smpi::Datatype::decode(action[4 + 2 * comm_size]);
380 if (action.size() > 5 + 2 * comm_size)
381 datatype2 = simgrid::smpi::Datatype::decode(action[5 + 2 * comm_size]);
383 send_buf_size=parse_double(action[2]);
384 recv_buf_size=parse_double(action[3+comm_size]);
385 for (unsigned int i = 0; i < comm_size; i++) {
386 (*sendcounts)[i] = std::stoi(action[3 + i]);
387 (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
389 send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
390 recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
// Base class of every replay action. T is the argument-parser type whose parse() is invoked by execute();
// concrete actions implement kernel().
394 template <class T> class ReplayAction {
// Action name given to the constructor; execute() forwards it to the parser.
396 const std::string name;
// PID of the actor that constructed this object (read from this_actor::getPid() in the constructor).
397 const int my_proc_id;
401 explicit ReplayAction(std::string name) : name(name), my_proc_id(simgrid::s4u::this_actor::getPid()) {}
402 virtual ~ReplayAction() = default;
// Samples the simulated clock, parses the line into `args`, and reports the elapsed simulated time
// through log_timed_action().
// NOTE(review): the statements between the parse() call and the logging call are not visible in this
// listing (presumably where kernel() is invoked) -- confirm against the full file.
404 virtual void execute(simgrid::xbt::ReplayAction& action)
406 // Needs to be re-initialized for every action, hence here
407 double start_time = smpi_process()->simulated_elapsed();
408 args.parse(action, name);
411 log_timed_action(action, start_time);
// Action-specific behaviour; every concrete action overrides this.
414 virtual void kernel(simgrid::xbt::ReplayAction& action) = 0;
// Returns whatever smpi_get_tmp_sendbuffer() yields for `size`.
416 void* send_buffer(int size)
418 return smpi_get_tmp_sendbuffer(size);
// Returns whatever smpi_get_tmp_recvbuffer() yields for `size`.
421 void* recv_buffer(int size)
423 return smpi_get_tmp_recvbuffer(size);
427 class WaitAction : public ReplayAction<ActionArgParser> {
429 WaitAction() : ReplayAction("Wait") {}
430 void kernel(simgrid::xbt::ReplayAction& action) override
432 std::string s = boost::algorithm::join(action, " ");
433 xbt_assert(get_reqq_self()->size(), "action wait not preceded by any irecv or isend: %s", s.c_str());
434 MPI_Request request = get_reqq_self()->back();
435 get_reqq_self()->pop_back();
437 if (request == nullptr) {
438 /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just
443 int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
445 // Must be taken before Request::wait() since the request may be set to
446 // MPI_REQUEST_NULL by Request::wait!
447 int src = request->comm()->group()->rank(request->src());
448 int dst = request->comm()->group()->rank(request->dst());
449 bool is_wait_for_receive = (request->flags() & RECV);
450 // TODO: Here we take the rank while we normally take the process id (look for my_proc_id)
451 TRACE_smpi_comm_in(rank, __func__, new simgrid::instr::NoOpTIData("wait"));
454 Request::wait(&request, &status);
456 TRACE_smpi_comm_out(rank);
457 if (is_wait_for_receive)
458 TRACE_smpi_recv(src, dst, 0);
462 class SendAction : public ReplayAction<SendRecvParser> {
464 SendAction() = delete;
465 explicit SendAction(std::string name) : ReplayAction(name) {}
466 void kernel(simgrid::xbt::ReplayAction& action) override
468 int dst_traced = MPI_COMM_WORLD->group()->actor(args.partner)->getPid();
470 TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
471 Datatype::encode(args.datatype1)));
472 if (not TRACE_smpi_view_internals())
473 TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, args.size * args.datatype1->size());
475 if (name == "send") {
476 Request::send(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
477 } else if (name == "Isend") {
478 MPI_Request request = Request::isend(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
479 get_reqq_self()->push_back(request);
481 xbt_die("Don't know this action, %s", name.c_str());
484 TRACE_smpi_comm_out(my_proc_id);
488 class RecvAction : public ReplayAction<SendRecvParser> {
490 RecvAction() = delete;
491 explicit RecvAction(std::string name) : ReplayAction(name) {}
492 void kernel(simgrid::xbt::ReplayAction& action) override
494 int src_traced = MPI_COMM_WORLD->group()->actor(args.partner)->getPid();
496 TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
497 Datatype::encode(args.datatype1)));
500 // unknown size from the receiver point of view
501 if (args.size <= 0.0) {
502 Request::probe(args.partner, 0, MPI_COMM_WORLD, &status);
503 args.size = status.count;
506 if (name == "recv") {
507 Request::recv(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD, &status);
508 } else if (name == "Irecv") {
509 MPI_Request request = Request::irecv(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
510 get_reqq_self()->push_back(request);
513 TRACE_smpi_comm_out(my_proc_id);
514 // TODO: Check why this was only activated in the "recv" case and not in the "Irecv" case
515 if (name == "recv" && not TRACE_smpi_view_internals()) {
516 TRACE_smpi_recv(src_traced, my_proc_id, 0);
521 class ComputeAction : public ReplayAction<ComputeParser> {
523 ComputeAction() : ReplayAction("compute") {}
524 void kernel(simgrid::xbt::ReplayAction& action) override
526 TRACE_smpi_computing_in(my_proc_id, args.flops);
527 smpi_execute_flops(args.flops);
528 TRACE_smpi_computing_out(my_proc_id);
532 class TestAction : public ReplayAction<ActionArgParser> {
534 TestAction() : ReplayAction("Test") {}
535 void kernel(simgrid::xbt::ReplayAction& action) override
537 MPI_Request request = get_reqq_self()->back();
538 get_reqq_self()->pop_back();
539 // if request is null here, this may mean that a previous test has succeeded
540 // Different times in traced application and replayed version may lead to this
541 // In this case, ignore the extra calls.
542 if (request != nullptr) {
543 TRACE_smpi_testing_in(my_proc_id);
546 int flag = Request::test(&request, &status);
548 XBT_DEBUG("MPI_Test result: %d", flag);
549 /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now
551 get_reqq_self()->push_back(request);
553 TRACE_smpi_testing_out(my_proc_id);
558 class InitAction : public ReplayAction<ActionArgParser> {
560 InitAction() : ReplayAction("Init") {}
561 void kernel(simgrid::xbt::ReplayAction& action) override
563 CHECK_ACTION_PARAMS(action, 0, 1)
564 MPI_DEFAULT_TYPE = (action.size() > 2) ? MPI_DOUBLE // default MPE datatype
565 : MPI_BYTE; // default TAU datatype
567 /* start a simulated timer */
568 smpi_process()->simulated_start();
569 /*initialize the number of active processes */
570 active_processes = smpi_process_count();
572 set_reqq_self(new std::vector<MPI_Request>);
576 class CommunicatorAction : public ReplayAction<ActionArgParser> {
578 CommunicatorAction() : ReplayAction("Comm") {}
579 void kernel(simgrid::xbt::ReplayAction& action) override { /* nothing to do */}
582 class WaitAllAction : public ReplayAction<ActionArgParser> {
584 WaitAllAction() : ReplayAction("waitAll") {}
585 void kernel(simgrid::xbt::ReplayAction& action) override
587 const unsigned int count_requests = get_reqq_self()->size();
589 if (count_requests > 0) {
590 TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData("waitAll", -1, count_requests, ""));
591 std::vector<std::pair</*sender*/int,/*recv*/int>> sender_receiver;
592 for (const auto& req : (*get_reqq_self())) {
593 if (req && (req->flags() & RECV)) {
594 sender_receiver.push_back({req->src(), req->dst()});
597 MPI_Status status[count_requests];
598 Request::waitall(count_requests, &(*get_reqq_self())[0], status);
600 for (auto& pair : sender_receiver) {
601 TRACE_smpi_recv(pair.first, pair.second, 0);
603 TRACE_smpi_comm_out(my_proc_id);
608 class BarrierAction : public ReplayAction<ActionArgParser> {
610 BarrierAction() : ReplayAction("barrier") {}
611 void kernel(simgrid::xbt::ReplayAction& action) override
613 TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::NoOpTIData("barrier"));
614 Colls::barrier(MPI_COMM_WORLD);
615 TRACE_smpi_comm_out(my_proc_id);
619 class BcastAction : public ReplayAction<BcastArgParser> {
621 BcastAction() : ReplayAction("bcast") {}
622 void kernel(simgrid::xbt::ReplayAction& action) override
624 TRACE_smpi_comm_in(my_proc_id, "action_bcast",
625 new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(args.root)->getPid(),
626 -1.0, args.size, -1, Datatype::encode(args.datatype1), ""));
628 Colls::bcast(send_buffer(args.size * args.datatype1->size()), args.size, args.datatype1, args.root, MPI_COMM_WORLD);
630 TRACE_smpi_comm_out(my_proc_id);
634 class ReduceAction : public ReplayAction<ReduceArgParser> {
636 ReduceAction() : ReplayAction("reduce") {}
637 void kernel(simgrid::xbt::ReplayAction& action) override
639 TRACE_smpi_comm_in(my_proc_id, "action_reduce",
640 new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(args.root)->getPid(), args.comp_size,
641 args.comm_size, -1, Datatype::encode(args.datatype1), ""));
643 Colls::reduce(send_buffer(args.comm_size * args.datatype1->size()),
644 recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL, args.root, MPI_COMM_WORLD);
645 smpi_execute_flops(args.comp_size);
647 TRACE_smpi_comm_out(my_proc_id);
651 class AllReduceAction : public ReplayAction<AllReduceArgParser> {
653 AllReduceAction() : ReplayAction("allReduce") {}
654 void kernel(simgrid::xbt::ReplayAction& action) override
656 TRACE_smpi_comm_in(my_proc_id, "action_allReduce", new simgrid::instr::CollTIData("allReduce", -1, args.comp_size, args.comm_size, -1,
657 Datatype::encode(args.datatype1), ""));
659 Colls::allreduce(send_buffer(args.comm_size * args.datatype1->size()),
660 recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
661 smpi_execute_flops(args.comp_size);
663 TRACE_smpi_comm_out(my_proc_id);
667 class AllToAllAction : public ReplayAction<AllToAllArgParser> {
669 AllToAllAction() : ReplayAction("allToAll") {}
670 void kernel(simgrid::xbt::ReplayAction& action) override
672 TRACE_smpi_comm_in(my_proc_id, "action_allToAll",
673 new simgrid::instr::CollTIData("allToAll", -1, -1.0, args.send_size, args.recv_size,
674 Datatype::encode(args.datatype1),
675 Datatype::encode(args.datatype2)));
677 Colls::alltoall(send_buffer(args.send_size * args.comm_size * args.datatype1->size()), args.send_size,
678 args.datatype1, recv_buffer(args.recv_size * args.comm_size * args.datatype2->size()),
679 args.recv_size, args.datatype2, MPI_COMM_WORLD);
681 TRACE_smpi_comm_out(my_proc_id);
685 class GatherAction : public ReplayAction<GatherArgParser> {
687 explicit GatherAction(std::string name) : ReplayAction(name) {}
688 void kernel(simgrid::xbt::ReplayAction& action) override
690 TRACE_smpi_comm_in(my_proc_id, name.c_str(), new simgrid::instr::CollTIData(name, (name == "gather") ? args.root : -1, -1.0, args.send_size, args.recv_size,
691 Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
693 if (name == "gather") {
694 int rank = MPI_COMM_WORLD->rank();
695 Colls::gather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
696 (rank == args.root) ? recv_buffer(args.recv_size * args.comm_size * args.datatype2->size()) : nullptr, args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
699 Colls::allgather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
700 recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, MPI_COMM_WORLD);
702 TRACE_smpi_comm_out(my_proc_id);
706 class GatherVAction : public ReplayAction<GatherVArgParser> {
708 explicit GatherVAction(std::string name) : ReplayAction(name) {}
709 void kernel(simgrid::xbt::ReplayAction& action) override
711 int rank = MPI_COMM_WORLD->rank();
713 TRACE_smpi_comm_in(my_proc_id, name.c_str(), new simgrid::instr::VarCollTIData(
714 name, (name == "gatherV") ? args.root : -1, args.send_size, nullptr, -1, args.recvcounts,
715 Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
717 if (name == "gatherV") {
718 Colls::gatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
719 (rank == args.root) ? recv_buffer(args.recv_size_sum * args.datatype2->size()) : nullptr,
720 args.recvcounts->data(), args.disps.data(), args.datatype2, args.root, MPI_COMM_WORLD);
723 Colls::allgatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
724 recv_buffer(args.recv_size_sum * args.datatype2->size()), args.recvcounts->data(),
725 args.disps.data(), args.datatype2, MPI_COMM_WORLD);
728 TRACE_smpi_comm_out(my_proc_id);
732 class ScatterAction : public ReplayAction<ScatterArgParser> {
734 ScatterAction() : ReplayAction("scatter") {}
735 void kernel(simgrid::xbt::ReplayAction& action) override
737 int rank = MPI_COMM_WORLD->rank();
738 TRACE_smpi_comm_in(my_proc_id, "action_scatter", new simgrid::instr::CollTIData(name, args.root, -1.0, args.send_size, args.recv_size,
739 Datatype::encode(args.datatype1),
740 Datatype::encode(args.datatype2)));
742 Colls::scatter(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
743 (rank == args.root) ? recv_buffer(args.recv_size * args.datatype2->size()) : nullptr, args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
745 TRACE_smpi_comm_out(my_proc_id);
750 class ScatterVAction : public ReplayAction<ScatterVArgParser> {
752 ScatterVAction() : ReplayAction("scatterV") {}
753 void kernel(simgrid::xbt::ReplayAction& action) override
755 int rank = MPI_COMM_WORLD->rank();
756 TRACE_smpi_comm_in(my_proc_id, "action_scatterv", new simgrid::instr::VarCollTIData(name, args.root, -1, args.sendcounts, args.recv_size,
757 nullptr, Datatype::encode(args.datatype1),
758 Datatype::encode(args.datatype2)));
760 Colls::scatterv((rank == args.root) ? send_buffer(args.send_size_sum * args.datatype1->size()) : nullptr,
761 args.sendcounts->data(), args.disps.data(), args.datatype1,
762 recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, args.root,
765 TRACE_smpi_comm_out(my_proc_id);
769 class ReduceScatterAction : public ReplayAction<ReduceScatterArgParser> {
771 ReduceScatterAction() : ReplayAction("reduceScatter") {}
772 void kernel(simgrid::xbt::ReplayAction& action) override
774 TRACE_smpi_comm_in(my_proc_id, "action_reducescatter",
775 new simgrid::instr::VarCollTIData("reduceScatter", -1, 0, nullptr, -1, args.recvcounts,
776 std::to_string(args.comp_size), /* ugly hack to print comp_size */
777 Datatype::encode(args.datatype1)));
779 Colls::reduce_scatter(send_buffer(args.recv_size_sum * args.datatype1->size()),
780 recv_buffer(args.recv_size_sum * args.datatype1->size()), args.recvcounts->data(),
781 args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
783 smpi_execute_flops(args.comp_size);
784 TRACE_smpi_comm_out(my_proc_id);
788 class AllToAllVAction : public ReplayAction<AllToAllVArgParser> {
790 AllToAllVAction() : ReplayAction("allToAllV") {}
791 void kernel(simgrid::xbt::ReplayAction& action) override
793 TRACE_smpi_comm_in(my_proc_id, __func__,
794 new simgrid::instr::VarCollTIData(
795 "allToAllV", -1, args.send_size_sum, args.sendcounts, args.recv_size_sum, args.recvcounts,
796 Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
798 Colls::alltoallv(send_buffer(args.send_buf_size * args.datatype1->size()), args.sendcounts->data(), args.senddisps.data(), args.datatype1,
799 recv_buffer(args.recv_buf_size * args.datatype2->size()), args.recvcounts->data(), args.recvdisps.data(), args.datatype2, MPI_COMM_WORLD);
801 TRACE_smpi_comm_out(my_proc_id);
804 } // Replay Namespace
805 }} // namespace simgrid::smpi
807 /** @brief Only initialize the replay, don't do it for real */
// Initializes the SMPI process, flags it as replaying, emits the "init" trace events and registers
// one handler per supported trace action name. argc/argv are forwarded to Process::init().
808 void smpi_replay_init(int* argc, char*** argv)
810 simgrid::smpi::Process::init(argc, argv);
811 smpi_process()->mark_as_initialized();
812 smpi_process()->set_replaying(true);
// Tracing is keyed by the PID of the calling actor.
814 int my_proc_id = simgrid::s4u::this_actor::getPid();
815 TRACE_smpi_init(my_proc_id);
816 TRACE_smpi_computing_init(my_proc_id);
817 TRACE_smpi_comm_in(my_proc_id, "smpi_replay_run_init", new simgrid::instr::NoOpTIData("init"));
818 TRACE_smpi_comm_out(my_proc_id);
// Each handler builds a fresh action object for every replayed line and runs its execute().
819 xbt_replay_action_register("init", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::InitAction().execute(action); });
820 xbt_replay_action_register("finalize", [](simgrid::xbt::ReplayAction& action) { /* nothing to do */ });
// The three communicator entries below share the same CommunicatorAction.
821 xbt_replay_action_register("comm_size", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
822 xbt_replay_action_register("comm_split",[](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
823 xbt_replay_action_register("comm_dup", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
// Point-to-point actions: the string passed to the constructor selects the blocking or non-blocking variant.
825 xbt_replay_action_register("send", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("send").execute(action); });
826 xbt_replay_action_register("Isend", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("Isend").execute(action); });
827 xbt_replay_action_register("recv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("recv").execute(action); });
828 xbt_replay_action_register("Irecv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("Irecv").execute(action); });
829 xbt_replay_action_register("test", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::TestAction().execute(action); });
830 xbt_replay_action_register("wait", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAction().execute(action); });
831 xbt_replay_action_register("waitAll", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAllAction().execute(action); });
// Collective actions.
832 xbt_replay_action_register("barrier", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BarrierAction().execute(action); });
833 xbt_replay_action_register("bcast", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BcastAction().execute(action); });
834 xbt_replay_action_register("reduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceAction().execute(action); });
835 xbt_replay_action_register("allReduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllReduceAction().execute(action); });
836 xbt_replay_action_register("allToAll", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllAction().execute(action); });
837 xbt_replay_action_register("allToAllV", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllVAction().execute(action); });
838 xbt_replay_action_register("gather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("gather").execute(action); });
839 xbt_replay_action_register("scatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterAction().execute(action); });
840 xbt_replay_action_register("gatherV", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("gatherV").execute(action); });
841 xbt_replay_action_register("scatterV", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterVAction().execute(action); });
// allGather / allGatherV reuse the Gather / GatherV actions; the name selects the non-rooted code path.
842 xbt_replay_action_register("allGather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("allGather").execute(action); });
843 xbt_replay_action_register("allGatherV", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("allGatherV").execute(action); });
844 xbt_replay_action_register("reduceScatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceScatterAction().execute(action); });
845 xbt_replay_action_register("compute", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ComputeAction().execute(action); });
// NOTE(review): the condition guarding the delayed start, and whether the zero-flop execution below is an
// alternative to it or always runs, are not visible in this listing -- confirm against the full file.
847 //if we have a delayed start, sleep here.
// The delay is read from the third command-line entry and executed as that many flops.
849 double value = xbt_str_parse_double((*argv)[2], "%s is not a double");
850 XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
851 smpi_execute_flops(value);
853 //UGLY: force a context switch to be sure that all MSG_processes begin initialization
854 XBT_DEBUG("Force context switch by smpi_execute_flops - Sleeping for 0.0 flops ");
855 smpi_execute_flops(0.0);
859 /** @brief actually run the replay after initialization */
860 void smpi_replay_main(int* argc, char*** argv)
862 simgrid::xbt::replay_runner(*argc, *argv);
864 /* and now, finalize everything */
865 /* One active process will stop. Decrease the counter*/
866 XBT_DEBUG("There are %zu elements in reqq[*]", get_reqq_self()->size());
867 if (not get_reqq_self()->empty()) {
868 unsigned int count_requests=get_reqq_self()->size();
869 MPI_Request requests[count_requests];
870 MPI_Status status[count_requests];
873 for (auto const& req : *get_reqq_self()) {
877 simgrid::smpi::Request::waitall(count_requests, requests, status);
879 delete get_reqq_self();
882 if(active_processes==0){
883 /* Last process alive speaking: end the simulated timer */
884 XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
885 smpi_free_replay_tmp_buffers();
888 TRACE_smpi_comm_in(simgrid::s4u::this_actor::getPid(), "smpi_replay_run_finalize",
889 new simgrid::instr::NoOpTIData("finalize"));
891 smpi_process()->finalize();
893 TRACE_smpi_comm_out(simgrid::s4u::this_actor::getPid());
894 TRACE_smpi_finalize(simgrid::s4u::this_actor::getPid());
897 /** @brief chain a replay initialization and a replay start */
898 void smpi_replay_run(int* argc, char*** argv)
900 smpi_replay_init(argc, argv);
901 smpi_replay_main(argc, argv);