1 /* Copyright (c) 2009-2018. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
7 #include "smpi_coll.hpp"
8 #include "smpi_comm.hpp"
9 #include "smpi_datatype.hpp"
10 #include "smpi_group.hpp"
11 #include "smpi_process.hpp"
12 #include "smpi_request.hpp"
13 #include "xbt/replay.hpp"
15 #include <boost/algorithm/string/join.hpp>
18 #include <unordered_map>
21 using simgrid::s4u::Actor;
23 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
25 static std::unordered_map<int, std::vector<MPI_Request>*> reqq;
27 static MPI_Datatype MPI_DEFAULT_TYPE;
29 #define CHECK_ACTION_PARAMS(action, mandatory, optional) \
31 if (action.size() < static_cast<unsigned long>(mandatory + 2)) \
32 THROWF(arg_error, 0, "%s replay failed.\n" \
33 "%zu items were given on the line. First two should be process_id and action. " \
34 "This action needs after them %lu mandatory arguments, and accepts %lu optional ones. \n" \
35 "Please contact the Simgrid team if support is needed", \
36 __func__, action.size(), static_cast<unsigned long>(mandatory), static_cast<unsigned long>(optional)); \
39 static void log_timed_action(simgrid::xbt::ReplayAction& action, double clock)
41 if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
42 std::string s = boost::algorithm::join(action, " ");
43 XBT_VERB("%s %f", s.c_str(), smpi_process()->simulated_elapsed() - clock);
47 static std::vector<MPI_Request>* get_reqq_self()
49 return reqq.at(simgrid::s4u::this_actor::getPid());
52 static void set_reqq_self(std::vector<MPI_Request> *mpi_request)
54 reqq.insert({simgrid::s4u::this_actor::getPid(), mpi_request});
58 static double parse_double(std::string string)
60 return xbt_str_parse_double(string.c_str(), "%s is not a double");
67 class ActionArgParser {
69 virtual ~ActionArgParser() = default;
70 virtual void parse(simgrid::xbt::ReplayAction& action, std::string name) { CHECK_ACTION_PARAMS(action, 0, 0) }
73 class SendRecvParser : public ActionArgParser {
75 /* communication partner; if we send, this is the receiver and vice versa */
78 MPI_Datatype datatype1 = MPI_DEFAULT_TYPE;
80 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
82 CHECK_ACTION_PARAMS(action, 2, 1)
83 partner = std::stoi(action[2]);
84 size = parse_double(action[3]);
85 if (action.size() > 4)
86 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
90 class ComputeParser : public ActionArgParser {
/* amount of computation to replay, parsed from the single mandatory field of the action line */
95 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
97 CHECK_ACTION_PARAMS(action, 1, 0)
98 flops = parse_double(action[2]);
102 class CollCommParser : public ActionArgParser {
110 MPI_Datatype datatype1 = MPI_DEFAULT_TYPE;
111 MPI_Datatype datatype2 = MPI_DEFAULT_TYPE;
114 class BcastArgParser : public CollCommParser {
116 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
118 CHECK_ACTION_PARAMS(action, 1, 2)
119 size = parse_double(action[2]);
120 root = (action.size() > 3) ? std::stoi(action[3]) : 0;
121 if (action.size() > 4)
122 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
126 class ReduceArgParser : public CollCommParser {
128 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
130 CHECK_ACTION_PARAMS(action, 2, 2)
131 comm_size = parse_double(action[2]);
132 comp_size = parse_double(action[3]);
133 root = (action.size() > 4) ? std::stoi(action[4]) : 0;
134 if (action.size() > 5)
135 datatype1 = simgrid::smpi::Datatype::decode(action[5]);
139 class AllReduceArgParser : public CollCommParser {
141 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
143 CHECK_ACTION_PARAMS(action, 2, 1)
144 comm_size = parse_double(action[2]);
145 comp_size = parse_double(action[3]);
146 if (action.size() > 4)
147 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
151 class AllToAllArgParser : public CollCommParser {
153 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
155 CHECK_ACTION_PARAMS(action, 2, 1)
156 comm_size = MPI_COMM_WORLD->size();
157 send_size = parse_double(action[2]);
158 recv_size = parse_double(action[3]);
160 if (action.size() > 4)
161 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
162 if (action.size() > 5)
163 datatype2 = simgrid::smpi::Datatype::decode(action[5]);
167 class GatherArgParser : public CollCommParser {
169 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
171 /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
174 1) 68 is the sendcounts
175 2) 68 is the recvcounts
176 3) 0 is the root node
177 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
178 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
180 CHECK_ACTION_PARAMS(action, 2, 3)
181 comm_size = MPI_COMM_WORLD->size();
182 send_size = parse_double(action[2]);
183 recv_size = parse_double(action[3]);
185 if (name == "gather") {
186 root = (action.size() > 4) ? std::stoi(action[4]) : 0;
187 if (action.size() > 5)
188 datatype1 = simgrid::smpi::Datatype::decode(action[5]);
189 if (action.size() > 6)
190 datatype2 = simgrid::smpi::Datatype::decode(action[6]);
193 if (action.size() > 4)
194 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
195 if (action.size() > 5)
196 datatype2 = simgrid::smpi::Datatype::decode(action[5]);
201 class GatherVArgParser : public CollCommParser {
204 std::shared_ptr<std::vector<int>> recvcounts;
205 std::vector<int> disps;
206 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
208 /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
209 0 gather 68 68 10 10 10 0 0 0
211 1) 68 is the sendcount
212 2) 68 10 10 10 is the recvcounts
213 3) 0 is the root node
214 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
215 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
217 comm_size = MPI_COMM_WORLD->size();
218 CHECK_ACTION_PARAMS(action, comm_size+1, 2)
219 send_size = parse_double(action[2]);
220 disps = std::vector<int>(comm_size, 0);
221 recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
223 if (name == "gatherV") {
224 root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
225 if (action.size() > 4 + comm_size)
226 datatype1 = simgrid::smpi::Datatype::decode(action[4 + comm_size]);
227 if (action.size() > 5 + comm_size)
228 datatype2 = simgrid::smpi::Datatype::decode(action[5 + comm_size]);
231 int datatype_index = 0;
233 /* The 3 comes from "0 gather <sendcount>", which must always be present.
234 * The + comm_size is the recvcounts array, which must also be present
236 if (action.size() > 3 + comm_size + comm_size) { /* datatype + disp are specified */
237 datatype_index = 3 + comm_size;
238 disp_index = datatype_index + 1;
239 datatype1 = simgrid::smpi::Datatype::decode(action[datatype_index]);
240 datatype2 = simgrid::smpi::Datatype::decode(action[datatype_index]);
241 } else if (action.size() > 3 + comm_size + 2) { /* disps specified; datatype is not specified; use the default one */
242 disp_index = 3 + comm_size;
243 } else if (action.size() > 3 + comm_size) { /* only datatype, no disp specified */
244 datatype_index = 3 + comm_size;
245 datatype1 = simgrid::smpi::Datatype::decode(action[datatype_index]);
246 datatype2 = simgrid::smpi::Datatype::decode(action[datatype_index]);
249 if (disp_index != 0) {
250 for (unsigned int i = 0; i < comm_size; i++)
251 disps[i] = std::stoi(action[disp_index + i]);
255 for (unsigned int i = 0; i < comm_size; i++) {
256 (*recvcounts)[i] = std::stoi(action[i + 3]);
258 recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
262 class ScatterArgParser : public CollCommParser {
264 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
266 /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
269 1) 68 is the sendcounts
270 2) 68 is the recvcounts
271 3) 0 is the root node
272 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
273 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
275 CHECK_ACTION_PARAMS(action, 2, 3)
276 comm_size = MPI_COMM_WORLD->size();
277 send_size = parse_double(action[2]);
278 recv_size = parse_double(action[3]);
279 root = (action.size() > 4) ? std::stoi(action[4]) : 0;
280 if (action.size() > 5)
281 datatype1 = simgrid::smpi::Datatype::decode(action[5]);
282 if (action.size() > 6)
283 datatype2 = simgrid::smpi::Datatype::decode(action[6]);
287 class ScatterVArgParser : public CollCommParser {
291 std::shared_ptr<std::vector<int>> sendcounts;
292 std::vector<int> disps;
293 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
295 /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
296 0 gather 68 10 10 10 68 0 0 0
298 1) 68 10 10 10 is the sendcounts
299 2) 68 is the recvcount
300 3) 0 is the root node
301 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
302 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
304 CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
305 recv_size = parse_double(action[2 + comm_size]);
306 disps = std::vector<int>(comm_size, 0);
307 sendcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
309 if (action.size() > 5 + comm_size)
310 datatype1 = simgrid::smpi::Datatype::decode(action[4 + comm_size]);
311 if (action.size() > 5 + comm_size)
312 datatype2 = simgrid::smpi::Datatype::decode(action[5]);
314 for (unsigned int i = 0; i < comm_size; i++) {
315 (*sendcounts)[i] = std::stoi(action[i + 2]);
317 send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
318 root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
322 class ReduceScatterArgParser : public CollCommParser {
325 std::shared_ptr<std::vector<int>> recvcounts;
326 std::vector<int> disps;
327 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
329 /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
330 0 reduceScatter 275427 275427 275427 204020 11346849 0
332 1) The first four values after the name of the action declare the recvcounts array
333 2) The value 11346849 is the amount of instructions
334 3) The last value corresponds to the datatype, see simgrid::smpi::Datatype::decode().
336 comm_size = MPI_COMM_WORLD->size();
337 CHECK_ACTION_PARAMS(action, comm_size+1, 1)
338 comp_size = parse_double(action[2+comm_size]);
339 recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
340 if (action.size() > 3 + comm_size)
341 datatype1 = simgrid::smpi::Datatype::decode(action[3 + comm_size]);
343 for (unsigned int i = 0; i < comm_size; i++) {
344 recvcounts->push_back(std::stoi(action[i + 2]));
346 recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
350 class AllToAllVArgParser : public CollCommParser {
354 std::shared_ptr<std::vector<int>> recvcounts;
355 std::shared_ptr<std::vector<int>> sendcounts;
356 std::vector<int> senddisps;
357 std::vector<int> recvdisps;
360 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
362 /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
363 0 allToAllV 100 1 7 10 12 100 1 70 10 5
365 1) 100 is the size of the send buffer *sizeof(int),
366 2) 1 7 10 12 is the sendcounts array
367 3) 100*sizeof(int) is the size of the receiver buffer
368 4) 1 70 10 5 is the recvcounts array
370 comm_size = MPI_COMM_WORLD->size();
371 CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
372 sendcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
373 recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
374 senddisps = std::vector<int>(comm_size, 0);
375 recvdisps = std::vector<int>(comm_size, 0);
377 if (action.size() > 5 + 2 * comm_size)
378 datatype1 = simgrid::smpi::Datatype::decode(action[4 + 2 * comm_size]);
379 if (action.size() > 5 + 2 * comm_size)
380 datatype2 = simgrid::smpi::Datatype::decode(action[5 + 2 * comm_size]);
382 send_buf_size=parse_double(action[2]);
383 recv_buf_size=parse_double(action[3+comm_size]);
384 for (unsigned int i = 0; i < comm_size; i++) {
385 (*sendcounts)[i] = std::stoi(action[3 + i]);
386 (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
388 send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
389 recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
393 template <class T> class ReplayAction {
395 const std::string name;
396 const int my_proc_id;
400 explicit ReplayAction(std::string name) : name(name), my_proc_id(simgrid::s4u::this_actor::getPid()) {}
401 virtual ~ReplayAction() = default;
403 virtual void execute(simgrid::xbt::ReplayAction& action)
405 // Needs to be re-initialized for every action, hence here
406 double start_time = smpi_process()->simulated_elapsed();
407 args.parse(action, name);
410 log_timed_action(action, start_time);
413 virtual void kernel(simgrid::xbt::ReplayAction& action) = 0;
415 void* send_buffer(int size)
417 return smpi_get_tmp_sendbuffer(size);
420 void* recv_buffer(int size)
422 return smpi_get_tmp_recvbuffer(size);
426 class WaitAction : public ReplayAction<ActionArgParser> {
428 WaitAction() : ReplayAction("Wait") {}
429 void kernel(simgrid::xbt::ReplayAction& action) override
431 std::string s = boost::algorithm::join(action, " ");
432 xbt_assert(get_reqq_self()->size(), "action wait not preceded by any irecv or isend: %s", s.c_str());
433 MPI_Request request = get_reqq_self()->back();
434 get_reqq_self()->pop_back();
436 if (request == nullptr) {
437 /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just
442 int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
444 // Must be taken before Request::wait() since the request may be set to
445 // MPI_REQUEST_NULL by Request::wait!
446 int src = request->comm()->group()->rank(request->src());
447 int dst = request->comm()->group()->rank(request->dst());
448 bool is_wait_for_receive = (request->flags() & RECV);
449 // TODO: Here we take the rank while we normally take the process id (look for my_proc_id)
450 TRACE_smpi_comm_in(rank, __func__, new simgrid::instr::NoOpTIData("wait"));
453 Request::wait(&request, &status);
455 TRACE_smpi_comm_out(rank);
456 if (is_wait_for_receive)
457 TRACE_smpi_recv(src, dst, 0);
461 class SendAction : public ReplayAction<SendRecvParser> {
463 SendAction() = delete;
464 explicit SendAction(std::string name) : ReplayAction(name) {}
465 void kernel(simgrid::xbt::ReplayAction& action) override
467 int dst_traced = MPI_COMM_WORLD->group()->actor(args.partner)->get_pid();
469 TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
470 Datatype::encode(args.datatype1)));
471 if (not TRACE_smpi_view_internals())
472 TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, args.size * args.datatype1->size());
474 if (name == "send") {
475 Request::send(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
476 } else if (name == "Isend") {
477 MPI_Request request = Request::isend(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
478 get_reqq_self()->push_back(request);
480 xbt_die("Don't know this action, %s", name.c_str());
483 TRACE_smpi_comm_out(my_proc_id);
487 class RecvAction : public ReplayAction<SendRecvParser> {
489 RecvAction() = delete;
490 explicit RecvAction(std::string name) : ReplayAction(name) {}
491 void kernel(simgrid::xbt::ReplayAction& action) override
493 int src_traced = MPI_COMM_WORLD->group()->actor(args.partner)->get_pid();
495 TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
496 Datatype::encode(args.datatype1)));
499 // unknown size from the receiver point of view
500 if (args.size <= 0.0) {
501 Request::probe(args.partner, 0, MPI_COMM_WORLD, &status);
502 args.size = status.count;
505 if (name == "recv") {
506 Request::recv(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD, &status);
507 } else if (name == "Irecv") {
508 MPI_Request request = Request::irecv(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
509 get_reqq_self()->push_back(request);
512 TRACE_smpi_comm_out(my_proc_id);
513 // TODO: Check why this was only activated in the "recv" case and not in the "Irecv" case
514 if (name == "recv" && not TRACE_smpi_view_internals()) {
515 TRACE_smpi_recv(src_traced, my_proc_id, 0);
520 class ComputeAction : public ReplayAction<ComputeParser> {
522 ComputeAction() : ReplayAction("compute") {}
523 void kernel(simgrid::xbt::ReplayAction& action) override
525 TRACE_smpi_computing_in(my_proc_id, args.flops);
526 smpi_execute_flops(args.flops);
527 TRACE_smpi_computing_out(my_proc_id);
531 class TestAction : public ReplayAction<ActionArgParser> {
533 TestAction() : ReplayAction("Test") {}
534 void kernel(simgrid::xbt::ReplayAction& action) override
536 MPI_Request request = get_reqq_self()->back();
537 get_reqq_self()->pop_back();
538 // if request is null here, this may mean that a previous test has succeeded
539 // Different times in traced application and replayed version may lead to this
540 // In this case, ignore the extra calls.
541 if (request != nullptr) {
542 TRACE_smpi_testing_in(my_proc_id);
545 int flag = Request::test(&request, &status);
547 XBT_DEBUG("MPI_Test result: %d", flag);
548 /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now
550 get_reqq_self()->push_back(request);
552 TRACE_smpi_testing_out(my_proc_id);
557 class InitAction : public ReplayAction<ActionArgParser> {
559 InitAction() : ReplayAction("Init") {}
560 void kernel(simgrid::xbt::ReplayAction& action) override
562 CHECK_ACTION_PARAMS(action, 0, 1)
563 MPI_DEFAULT_TYPE = (action.size() > 2) ? MPI_DOUBLE // default MPE datatype
564 : MPI_BYTE; // default TAU datatype
566 /* start a simulated timer */
567 smpi_process()->simulated_start();
568 set_reqq_self(new std::vector<MPI_Request>);
572 class CommunicatorAction : public ReplayAction<ActionArgParser> {
574 CommunicatorAction() : ReplayAction("Comm") {}
575 void kernel(simgrid::xbt::ReplayAction& action) override { /* nothing to do */}
578 class WaitAllAction : public ReplayAction<ActionArgParser> {
580 WaitAllAction() : ReplayAction("waitAll") {}
581 void kernel(simgrid::xbt::ReplayAction& action) override
583 const unsigned int count_requests = get_reqq_self()->size();
585 if (count_requests > 0) {
586 TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData("waitAll", -1, count_requests, ""));
587 std::vector<std::pair</*sender*/int,/*recv*/int>> sender_receiver;
588 for (const auto& req : (*get_reqq_self())) {
589 if (req && (req->flags() & RECV)) {
590 sender_receiver.push_back({req->src(), req->dst()});
593 MPI_Status status[count_requests];
594 Request::waitall(count_requests, &(*get_reqq_self())[0], status);
596 for (auto& pair : sender_receiver) {
597 TRACE_smpi_recv(pair.first, pair.second, 0);
599 TRACE_smpi_comm_out(my_proc_id);
604 class BarrierAction : public ReplayAction<ActionArgParser> {
606 BarrierAction() : ReplayAction("barrier") {}
607 void kernel(simgrid::xbt::ReplayAction& action) override
609 TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::NoOpTIData("barrier"));
610 Colls::barrier(MPI_COMM_WORLD);
611 TRACE_smpi_comm_out(my_proc_id);
615 class BcastAction : public ReplayAction<BcastArgParser> {
617 BcastAction() : ReplayAction("bcast") {}
618 void kernel(simgrid::xbt::ReplayAction& action) override
620 TRACE_smpi_comm_in(my_proc_id, "action_bcast",
621 new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(args.root)->get_pid(),
622 -1.0, args.size, -1, Datatype::encode(args.datatype1), ""));
624 Colls::bcast(send_buffer(args.size * args.datatype1->size()), args.size, args.datatype1, args.root, MPI_COMM_WORLD);
626 TRACE_smpi_comm_out(my_proc_id);
630 class ReduceAction : public ReplayAction<ReduceArgParser> {
632 ReduceAction() : ReplayAction("reduce") {}
633 void kernel(simgrid::xbt::ReplayAction& action) override
635 TRACE_smpi_comm_in(my_proc_id, "action_reduce",
636 new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(args.root)->get_pid(),
637 args.comp_size, args.comm_size, -1,
638 Datatype::encode(args.datatype1), ""));
640 Colls::reduce(send_buffer(args.comm_size * args.datatype1->size()),
641 recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL, args.root, MPI_COMM_WORLD);
642 smpi_execute_flops(args.comp_size);
644 TRACE_smpi_comm_out(my_proc_id);
648 class AllReduceAction : public ReplayAction<AllReduceArgParser> {
650 AllReduceAction() : ReplayAction("allReduce") {}
651 void kernel(simgrid::xbt::ReplayAction& action) override
653 TRACE_smpi_comm_in(my_proc_id, "action_allReduce", new simgrid::instr::CollTIData("allReduce", -1, args.comp_size, args.comm_size, -1,
654 Datatype::encode(args.datatype1), ""));
656 Colls::allreduce(send_buffer(args.comm_size * args.datatype1->size()),
657 recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
658 smpi_execute_flops(args.comp_size);
660 TRACE_smpi_comm_out(my_proc_id);
664 class AllToAllAction : public ReplayAction<AllToAllArgParser> {
666 AllToAllAction() : ReplayAction("allToAll") {}
667 void kernel(simgrid::xbt::ReplayAction& action) override
669 TRACE_smpi_comm_in(my_proc_id, "action_allToAll",
670 new simgrid::instr::CollTIData("allToAll", -1, -1.0, args.send_size, args.recv_size,
671 Datatype::encode(args.datatype1),
672 Datatype::encode(args.datatype2)));
674 Colls::alltoall(send_buffer(args.send_size * args.comm_size * args.datatype1->size()), args.send_size,
675 args.datatype1, recv_buffer(args.recv_size * args.comm_size * args.datatype2->size()),
676 args.recv_size, args.datatype2, MPI_COMM_WORLD);
678 TRACE_smpi_comm_out(my_proc_id);
682 class GatherAction : public ReplayAction<GatherArgParser> {
684 explicit GatherAction(std::string name) : ReplayAction(name) {}
685 void kernel(simgrid::xbt::ReplayAction& action) override
687 TRACE_smpi_comm_in(my_proc_id, name.c_str(), new simgrid::instr::CollTIData(name, (name == "gather") ? args.root : -1, -1.0, args.send_size, args.recv_size,
688 Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
690 if (name == "gather") {
691 int rank = MPI_COMM_WORLD->rank();
692 Colls::gather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
693 (rank == args.root) ? recv_buffer(args.recv_size * args.comm_size * args.datatype2->size()) : nullptr, args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
696 Colls::allgather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
697 recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, MPI_COMM_WORLD);
699 TRACE_smpi_comm_out(my_proc_id);
703 class GatherVAction : public ReplayAction<GatherVArgParser> {
705 explicit GatherVAction(std::string name) : ReplayAction(name) {}
706 void kernel(simgrid::xbt::ReplayAction& action) override
708 int rank = MPI_COMM_WORLD->rank();
710 TRACE_smpi_comm_in(my_proc_id, name.c_str(), new simgrid::instr::VarCollTIData(
711 name, (name == "gatherV") ? args.root : -1, args.send_size, nullptr, -1, args.recvcounts,
712 Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
714 if (name == "gatherV") {
715 Colls::gatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
716 (rank == args.root) ? recv_buffer(args.recv_size_sum * args.datatype2->size()) : nullptr,
717 args.recvcounts->data(), args.disps.data(), args.datatype2, args.root, MPI_COMM_WORLD);
720 Colls::allgatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
721 recv_buffer(args.recv_size_sum * args.datatype2->size()), args.recvcounts->data(),
722 args.disps.data(), args.datatype2, MPI_COMM_WORLD);
725 TRACE_smpi_comm_out(my_proc_id);
729 class ScatterAction : public ReplayAction<ScatterArgParser> {
731 ScatterAction() : ReplayAction("scatter") {}
732 void kernel(simgrid::xbt::ReplayAction& action) override
734 int rank = MPI_COMM_WORLD->rank();
735 TRACE_smpi_comm_in(my_proc_id, "action_scatter", new simgrid::instr::CollTIData(name, args.root, -1.0, args.send_size, args.recv_size,
736 Datatype::encode(args.datatype1),
737 Datatype::encode(args.datatype2)));
739 Colls::scatter(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
740 (rank == args.root) ? recv_buffer(args.recv_size * args.datatype2->size()) : nullptr, args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
742 TRACE_smpi_comm_out(my_proc_id);
747 class ScatterVAction : public ReplayAction<ScatterVArgParser> {
749 ScatterVAction() : ReplayAction("scatterV") {}
750 void kernel(simgrid::xbt::ReplayAction& action) override
752 int rank = MPI_COMM_WORLD->rank();
753 TRACE_smpi_comm_in(my_proc_id, "action_scatterv", new simgrid::instr::VarCollTIData(name, args.root, -1, args.sendcounts, args.recv_size,
754 nullptr, Datatype::encode(args.datatype1),
755 Datatype::encode(args.datatype2)));
757 Colls::scatterv((rank == args.root) ? send_buffer(args.send_size_sum * args.datatype1->size()) : nullptr,
758 args.sendcounts->data(), args.disps.data(), args.datatype1,
759 recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, args.root,
762 TRACE_smpi_comm_out(my_proc_id);
766 class ReduceScatterAction : public ReplayAction<ReduceScatterArgParser> {
768 ReduceScatterAction() : ReplayAction("reduceScatter") {}
769 void kernel(simgrid::xbt::ReplayAction& action) override
771 TRACE_smpi_comm_in(my_proc_id, "action_reducescatter",
772 new simgrid::instr::VarCollTIData("reduceScatter", -1, 0, nullptr, -1, args.recvcounts,
773 std::to_string(args.comp_size), /* ugly hack to print comp_size */
774 Datatype::encode(args.datatype1)));
776 Colls::reduce_scatter(send_buffer(args.recv_size_sum * args.datatype1->size()),
777 recv_buffer(args.recv_size_sum * args.datatype1->size()), args.recvcounts->data(),
778 args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
780 smpi_execute_flops(args.comp_size);
781 TRACE_smpi_comm_out(my_proc_id);
785 class AllToAllVAction : public ReplayAction<AllToAllVArgParser> {
787 AllToAllVAction() : ReplayAction("allToAllV") {}
788 void kernel(simgrid::xbt::ReplayAction& action) override
790 TRACE_smpi_comm_in(my_proc_id, __func__,
791 new simgrid::instr::VarCollTIData(
792 "allToAllV", -1, args.send_size_sum, args.sendcounts, args.recv_size_sum, args.recvcounts,
793 Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
795 Colls::alltoallv(send_buffer(args.send_buf_size * args.datatype1->size()), args.sendcounts->data(), args.senddisps.data(), args.datatype1,
796 recv_buffer(args.recv_buf_size * args.datatype2->size()), args.recvcounts->data(), args.recvdisps.data(), args.datatype2, MPI_COMM_WORLD);
798 TRACE_smpi_comm_out(my_proc_id);
801 } // Replay Namespace
802 }} // namespace simgrid::smpi
804 /** @brief Only initialize the replay, don't do it for real */
805 void smpi_replay_init(int* argc, char*** argv)
807 simgrid::smpi::Process::init(argc, argv);
808 smpi_process()->mark_as_initialized();
809 smpi_process()->set_replaying(true);
811 int my_proc_id = simgrid::s4u::this_actor::getPid();
812 TRACE_smpi_init(my_proc_id);
813 TRACE_smpi_computing_init(my_proc_id);
814 TRACE_smpi_comm_in(my_proc_id, "smpi_replay_run_init", new simgrid::instr::NoOpTIData("init"));
815 TRACE_smpi_comm_out(my_proc_id);
816 xbt_replay_action_register("init", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::InitAction().execute(action); });
817 xbt_replay_action_register("finalize", [](simgrid::xbt::ReplayAction& action) { /* nothing to do */ });
818 xbt_replay_action_register("comm_size", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
819 xbt_replay_action_register("comm_split",[](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
820 xbt_replay_action_register("comm_dup", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
822 xbt_replay_action_register("send", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("send").execute(action); });
823 xbt_replay_action_register("Isend", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("Isend").execute(action); });
824 xbt_replay_action_register("recv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("recv").execute(action); });
825 xbt_replay_action_register("Irecv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("Irecv").execute(action); });
826 xbt_replay_action_register("test", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::TestAction().execute(action); });
827 xbt_replay_action_register("wait", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAction().execute(action); });
828 xbt_replay_action_register("waitAll", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAllAction().execute(action); });
829 xbt_replay_action_register("barrier", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BarrierAction().execute(action); });
830 xbt_replay_action_register("bcast", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BcastAction().execute(action); });
831 xbt_replay_action_register("reduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceAction().execute(action); });
832 xbt_replay_action_register("allReduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllReduceAction().execute(action); });
833 xbt_replay_action_register("allToAll", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllAction().execute(action); });
834 xbt_replay_action_register("allToAllV", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllVAction().execute(action); });
835 xbt_replay_action_register("gather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("gather").execute(action); });
836 xbt_replay_action_register("scatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterAction().execute(action); });
837 xbt_replay_action_register("gatherV", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("gatherV").execute(action); });
838 xbt_replay_action_register("scatterV", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterVAction().execute(action); });
839 xbt_replay_action_register("allGather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("allGather").execute(action); });
840 xbt_replay_action_register("allGatherV", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("allGatherV").execute(action); });
841 xbt_replay_action_register("reduceScatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceScatterAction().execute(action); });
842 xbt_replay_action_register("compute", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ComputeAction().execute(action); });
844 //if we have a delayed start, sleep here.
846 double value = xbt_str_parse_double((*argv)[2], "%s is not a double");
847 XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
848 smpi_execute_flops(value);
850 //UGLY: force a context switch to be sure that all MSG_processes begin initialization
851 XBT_DEBUG("Force context switch by smpi_execute_flops - Sleeping for 0.0 flops ");
852 smpi_execute_flops(0.0);
856 /** @brief actually run the replay after initialization */
857 void smpi_replay_main(int* argc, char*** argv)
859 static int active_processes = 0;
861 simgrid::xbt::replay_runner(*argc, *argv);
863 /* and now, finalize everything */
864 /* One active process will stop. Decrease the counter*/
865 XBT_DEBUG("There are %zu elements in reqq[*]", get_reqq_self()->size());
866 if (not get_reqq_self()->empty()) {
867 unsigned int count_requests=get_reqq_self()->size();
868 MPI_Request requests[count_requests];
869 MPI_Status status[count_requests];
872 for (auto const& req : *get_reqq_self()) {
876 simgrid::smpi::Request::waitall(count_requests, requests, status);
878 delete get_reqq_self();
881 if(active_processes==0){
882 /* Last process alive speaking: end the simulated timer */
883 XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
884 smpi_free_replay_tmp_buffers();
887 TRACE_smpi_comm_in(simgrid::s4u::this_actor::getPid(), "smpi_replay_run_finalize",
888 new simgrid::instr::NoOpTIData("finalize"));
890 smpi_process()->finalize();
892 TRACE_smpi_comm_out(simgrid::s4u::this_actor::getPid());
893 TRACE_smpi_finalize(simgrid::s4u::this_actor::getPid());
896 /** @brief chain a replay initialization and a replay start */
897 void smpi_replay_run(int* argc, char*** argv)
899 smpi_replay_init(argc, argv);
900 smpi_replay_main(argc, argv);