1 /* Copyright (c) 2009-2018. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
7 #include "smpi_coll.hpp"
8 #include "smpi_comm.hpp"
9 #include "smpi_datatype.hpp"
10 #include "smpi_group.hpp"
11 #include "smpi_process.hpp"
12 #include "smpi_request.hpp"
13 #include "xbt/replay.hpp"
15 #include <boost/algorithm/string/join.hpp>
18 #include <unordered_map>
22 using simgrid::s4u::Actor;
24 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
26 static std::unordered_map<int, std::vector<MPI_Request>*> reqq;
28 static MPI_Datatype MPI_DEFAULT_TYPE;
/* Throws an arg_error (via THROWF) when `action` holds fewer than `mandatory` parameters after its two leading items
 * (process id and action name). The message names the enclosing function (__func__), reports the item count, the
 * mandatory/optional parameter counts and the full offending line.
 * Expands to a braced block, so call sites use it without a trailing semicolon. */
#define CHECK_ACTION_PARAMS(action, mandatory, optional)                                                               \
  {                                                                                                                    \
    const unsigned long expected_items_ = static_cast<unsigned long>((mandatory) + 2);                                 \
    if ((action).size() < expected_items_) {                                                                           \
      std::stringstream full_line_;                                                                                    \
      for (const auto& item_ : (action))                                                                               \
        full_line_ << item_ << " ";                                                                                    \
      THROWF(arg_error, 0,                                                                                             \
             "%s replay failed.\n"                                                                                     \
             "%zu items were given on the line. First two should be process_id and action. "                           \
             "This action needs after them %lu mandatory arguments, and accepts %lu optional ones. \n"                 \
             "The full line that was given is:\n %s\n"                                                                 \
             "Please contact the Simgrid team if support is needed",                                                   \
             __func__, (action).size(), static_cast<unsigned long>(mandatory),                                         \
             static_cast<unsigned long>(optional), full_line_.str().c_str());                                          \
    }                                                                                                                  \
  }
46 static void log_timed_action(simgrid::xbt::ReplayAction& action, double clock)
48 if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
49 std::string s = boost::algorithm::join(action, " ");
50 XBT_VERB("%s %f", s.c_str(), smpi_process()->simulated_elapsed() - clock);
54 static std::vector<MPI_Request>* get_reqq_self()
56 return reqq.at(simgrid::s4u::this_actor::getPid());
59 static void set_reqq_self(std::vector<MPI_Request> *mpi_request)
61 reqq.insert({simgrid::s4u::this_actor::getPid(), mpi_request});
65 static double parse_double(std::string string)
67 return xbt_str_parse_double(string.c_str(), "%s is not a double");
74 class ActionArgParser {
76 virtual ~ActionArgParser() = default;
77 virtual void parse(simgrid::xbt::ReplayAction& action, std::string name) { CHECK_ACTION_PARAMS(action, 0, 0) }
80 class SendRecvParser : public ActionArgParser {
82 /* communication partner; if we send, this is the receiver and vice versa */
85 MPI_Datatype datatype1 = MPI_DEFAULT_TYPE;
87 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
89 CHECK_ACTION_PARAMS(action, 2, 1)
90 partner = std::stoi(action[2]);
91 size = parse_double(action[3]);
92 if (action.size() > 4)
93 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
97 class ComputeParser : public ActionArgParser {
99 /* communication partner; if we send, this is the receiver and vice versa */
102 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
104 CHECK_ACTION_PARAMS(action, 1, 0)
105 flops = parse_double(action[2]);
// Shared argument storage for the collective-operation parsers; each subclass fills the fields it needs in parse().
// NOTE(review): the other members used by subclasses (size, comm_size, comp_size, send_size, recv_size, root) are not
// visible in this view; confirm their declared types against the full file.
109 class CollCommParser : public ActionArgParser {
// Send-side datatype (the only datatype for single-type collectives); defaults to the type selected by "init".
117 MPI_Datatype datatype1 = MPI_DEFAULT_TYPE;
// Receive-side datatype, used by collectives that have distinct send and receive types.
118 MPI_Datatype datatype2 = MPI_DEFAULT_TYPE;
121 class BcastArgParser : public CollCommParser {
123 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
125 CHECK_ACTION_PARAMS(action, 1, 2)
126 size = parse_double(action[2]);
127 root = (action.size() > 3) ? std::stoi(action[3]) : 0;
128 if (action.size() > 4)
129 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
133 class ReduceArgParser : public CollCommParser {
135 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
137 CHECK_ACTION_PARAMS(action, 2, 2)
138 comm_size = parse_double(action[2]);
139 comp_size = parse_double(action[3]);
140 root = (action.size() > 4) ? std::stoi(action[4]) : 0;
141 if (action.size() > 5)
142 datatype1 = simgrid::smpi::Datatype::decode(action[5]);
146 class AllReduceArgParser : public CollCommParser {
148 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
150 CHECK_ACTION_PARAMS(action, 2, 1)
151 comm_size = parse_double(action[2]);
152 comp_size = parse_double(action[3]);
153 if (action.size() > 4)
154 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
158 class AllToAllArgParser : public CollCommParser {
160 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
162 CHECK_ACTION_PARAMS(action, 2, 1)
163 comm_size = MPI_COMM_WORLD->size();
164 send_size = parse_double(action[2]);
165 recv_size = parse_double(action[3]);
167 if (action.size() > 4)
168 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
169 if (action.size() > 5)
170 datatype2 = simgrid::smpi::Datatype::decode(action[5]);
174 class GatherArgParser : public CollCommParser {
176 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
178 /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
181 1) 68 is the sendcounts
182 2) 68 is the recvcounts
183 3) 0 is the root node
184 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
185 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
187 CHECK_ACTION_PARAMS(action, 2, 3)
188 comm_size = MPI_COMM_WORLD->size();
189 send_size = parse_double(action[2]);
190 recv_size = parse_double(action[3]);
192 if (name == "gather") {
193 root = (action.size() > 4) ? std::stoi(action[4]) : 0;
194 if (action.size() > 5)
195 datatype1 = simgrid::smpi::Datatype::decode(action[5]);
196 if (action.size() > 6)
197 datatype2 = simgrid::smpi::Datatype::decode(action[6]);
200 if (action.size() > 4)
201 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
202 if (action.size() > 5)
203 datatype2 = simgrid::smpi::Datatype::decode(action[5]);
// Parses "gatherV" and "allGatherV" lines: a send count followed by one receive count per rank of MPI_COMM_WORLD,
// then optional trailing fields whose layout depends on the action name (see the branches below).
208 class GatherVArgParser : public CollCommParser {
// One receive count per rank; shared with the tracing data (VarCollTIData) built by GatherVAction.
211 std::shared_ptr<std::vector<int>> recvcounts;
// Receive displacements per rank; left at 0 unless an allGatherV line provides them.
212 std::vector<int> disps;
213 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
215 /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
216 0 gather 68 68 10 10 10 0 0 0
218 1) 68 is the sendcount
219 2) 68 10 10 10 is the recvcounts
220 3) 0 is the root node
221 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
222 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
224 comm_size = MPI_COMM_WORLD->size();
225 CHECK_ACTION_PARAMS(action, comm_size+1, 2)
226 send_size = parse_double(action[2]);
227 disps = std::vector<int>(comm_size, 0);
228 recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
// gatherV: optional root at 3 + comm_size, then send and receive datatypes at 4 + comm_size and 5 + comm_size.
230 if (name == "gatherV") {
231 root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
232 if (action.size() > 4 + comm_size)
233 datatype1 = simgrid::smpi::Datatype::decode(action[4 + comm_size]);
234 if (action.size() > 5 + comm_size)
235 datatype2 = simgrid::smpi::Datatype::decode(action[5 + comm_size]);
// allGatherV: the trailing fields are told apart only by the number of items on the line.
// NOTE(review): the declaration of disp_index is not visible in this view.
238 int datatype_index = 0;
240 /* The 3 comes from "0 gather <sendcount>", which must always be present.
241 * The + comm_size is the recvcounts array, which must also be present
// Datatype at 3 + comm_size (used for both send and receive), followed by comm_size displacements.
243 if (action.size() > 3 + comm_size + comm_size) { /* datatype + disp are specified */
244 datatype_index = 3 + comm_size;
245 disp_index = datatype_index + 1;
246 datatype1 = simgrid::smpi::Datatype::decode(action[datatype_index]);
247 datatype2 = simgrid::smpi::Datatype::decode(action[datatype_index]);
// NOTE(review): this branch reads comm_size displacements starting at 3 + comm_size (last index 2 + 2*comm_size),
// but only guarantees size > 5 + comm_size. For comm_size > 3 that can index past the end of `action`; confirm the
// intended bound (probably size > 2 + 2*comm_size).
248 } else if (action.size() > 3 + comm_size + 2) { /* disps specified; datatype is not specified; use the default one */
249 disp_index = 3 + comm_size;
250 } else if (action.size() > 3 + comm_size) { /* only datatype, no disp specified */
251 datatype_index = 3 + comm_size;
252 datatype1 = simgrid::smpi::Datatype::decode(action[datatype_index]);
253 datatype2 = simgrid::smpi::Datatype::decode(action[datatype_index]);
// A zero disp_index means "no displacements on the line": disps keeps its zero initialisation.
256 if (disp_index != 0) {
257 for (unsigned int i = 0; i < comm_size; i++)
258 disps[i] = std::stoi(action[disp_index + i]);
// Receive counts always occupy indices 3 .. 2 + comm_size.
262 for (unsigned int i = 0; i < comm_size; i++) {
263 (*recvcounts)[i] = std::stoi(action[i + 3]);
265 recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
269 class ScatterArgParser : public CollCommParser {
271 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
273 /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
276 1) 68 is the sendcounts
277 2) 68 is the recvcounts
278 3) 0 is the root node
279 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
280 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
282 CHECK_ACTION_PARAMS(action, 2, 3)
283 comm_size = MPI_COMM_WORLD->size();
284 send_size = parse_double(action[2]);
285 recv_size = parse_double(action[3]);
286 root = (action.size() > 4) ? std::stoi(action[4]) : 0;
287 if (action.size() > 5)
288 datatype1 = simgrid::smpi::Datatype::decode(action[5]);
289 if (action.size() > 6)
290 datatype2 = simgrid::smpi::Datatype::decode(action[6]);
294 class ScatterVArgParser : public CollCommParser {
298 std::shared_ptr<std::vector<int>> sendcounts;
299 std::vector<int> disps;
300 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
302 /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
303 0 gather 68 10 10 10 68 0 0 0
305 1) 68 10 10 10 is the sendcounts
306 2) 68 is the recvcount
307 3) 0 is the root node
308 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
309 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
311 CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
312 recv_size = parse_double(action[2 + comm_size]);
313 disps = std::vector<int>(comm_size, 0);
314 sendcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
316 if (action.size() > 5 + comm_size)
317 datatype1 = simgrid::smpi::Datatype::decode(action[4 + comm_size]);
318 if (action.size() > 5 + comm_size)
319 datatype2 = simgrid::smpi::Datatype::decode(action[5]);
321 for (unsigned int i = 0; i < comm_size; i++) {
322 (*sendcounts)[i] = std::stoi(action[i + 2]);
324 send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
325 root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
329 class ReduceScatterArgParser : public CollCommParser {
332 std::shared_ptr<std::vector<int>> recvcounts;
333 std::vector<int> disps;
334 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
336 /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
337 0 reduceScatter 275427 275427 275427 204020 11346849 0
339 1) The first four values after the name of the action declare the recvcounts array
340 2) The value 11346849 is the amount of instructions
341 3) The last value corresponds to the datatype, see simgrid::smpi::Datatype::decode().
343 comm_size = MPI_COMM_WORLD->size();
344 CHECK_ACTION_PARAMS(action, comm_size+1, 1)
345 comp_size = parse_double(action[2+comm_size]);
346 recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
347 if (action.size() > 3 + comm_size)
348 datatype1 = simgrid::smpi::Datatype::decode(action[3 + comm_size]);
350 for (unsigned int i = 0; i < comm_size; i++) {
351 recvcounts->push_back(std::stoi(action[i + 2]));
353 recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
357 class AllToAllVArgParser : public CollCommParser {
361 std::shared_ptr<std::vector<int>> recvcounts;
362 std::shared_ptr<std::vector<int>> sendcounts;
363 std::vector<int> senddisps;
364 std::vector<int> recvdisps;
367 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
369 /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
370 0 allToAllV 100 1 7 10 12 100 1 70 10 5
372 1) 100 is the size of the send buffer *sizeof(int),
373 2) 1 7 10 12 is the sendcounts array
374 3) 100*sizeof(int) is the size of the receiver buffer
375 4) 1 70 10 5 is the recvcounts array
377 comm_size = MPI_COMM_WORLD->size();
378 CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
379 sendcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
380 recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
381 senddisps = std::vector<int>(comm_size, 0);
382 recvdisps = std::vector<int>(comm_size, 0);
384 if (action.size() > 5 + 2 * comm_size)
385 datatype1 = simgrid::smpi::Datatype::decode(action[4 + 2 * comm_size]);
386 if (action.size() > 5 + 2 * comm_size)
387 datatype2 = simgrid::smpi::Datatype::decode(action[5 + 2 * comm_size]);
389 send_buf_size=parse_double(action[2]);
390 recv_buf_size=parse_double(action[3+comm_size]);
391 for (unsigned int i = 0; i < comm_size; i++) {
392 (*sendcounts)[i] = std::stoi(action[3 + i]);
393 (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
395 send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
396 recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
// Base class of every replayed action. T is the argument parser used for the trace line.
// NOTE(review): the parser instance ("args", of type T) is declared on a line not visible in this view.
400 template <class T> class ReplayAction {
// Action name given by the subclass; it is forwarded to the parser and used to pick variants (e.g. "send"/"Isend").
402 const std::string name;
// PID of the actor replaying this action, captured at construction (used for tracing).
403 const int my_proc_id;
407 explicit ReplayAction(std::string name) : name(name), my_proc_id(simgrid::s4u::this_actor::getPid()) {}
408 virtual ~ReplayAction() = default;
// Replays one trace line: records the simulated start time, parses the arguments, then logs the elapsed time.
// NOTE(review): the call to kernel() between parsing and logging is not visible in this view; confirm it is there.
410 virtual void execute(simgrid::xbt::ReplayAction& action)
412 // Needs to be re-initialized for every action, hence here
413 double start_time = smpi_process()->simulated_elapsed();
414 args.parse(action, name);
417 log_timed_action(action, start_time);
// Action-specific behaviour implemented by each subclass.
420 virtual void kernel(simgrid::xbt::ReplayAction& action) = 0;
// Returns a buffer of `size` bytes from smpi_get_tmp_sendbuffer (presumably a reusable scratch buffer - confirm).
422 void* send_buffer(int size)
424 return smpi_get_tmp_sendbuffer(size);
// Returns a buffer of `size` bytes from smpi_get_tmp_recvbuffer (presumably a reusable scratch buffer - confirm).
427 void* recv_buffer(int size)
429 return smpi_get_tmp_recvbuffer(size);
// Replays a "wait": pops the most recently queued request of this actor and waits for it.
433 class WaitAction : public ReplayAction<ActionArgParser> {
435 WaitAction() : ReplayAction("Wait") {}
436 void kernel(simgrid::xbt::ReplayAction& action) override
// The full line is joined up front only so it can appear in the assertion message.
438 std::string s = boost::algorithm::join(action, " ");
439 xbt_assert(get_reqq_self()->size(), "action wait not preceded by any irecv or isend: %s", s.c_str());
440 MPI_Request request = get_reqq_self()->back();
441 get_reqq_self()->pop_back();
// A null entry is what TestAction pushes back after a successful test.
// NOTE(review): the rest of this branch is not visible here; per the comment it returns early.
443 if (request == nullptr) {
444 /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just
449 int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
451 // Must be taken before Request::wait() since the request may be set to
452 // MPI_REQUEST_NULL by Request::wait!
453 int src = request->comm()->group()->rank(request->src());
454 int dst = request->comm()->group()->rank(request->dst());
455 bool is_wait_for_receive = (request->flags() & RECV);
456 // TODO: Here we take the rank while we normally take the process id (look for my_proc_id)
457 TRACE_smpi_comm_in(rank, __func__, new simgrid::instr::NoOpTIData("wait"));
// NOTE(review): the declaration of `status` is not visible in this view.
460 Request::wait(&request, &status);
462 TRACE_smpi_comm_out(rank);
// Only receive completions emit a recv trace event, using the src/dst captured before the wait.
463 if (is_wait_for_receive)
464 TRACE_smpi_recv(src, dst, 0);
468 class SendAction : public ReplayAction<SendRecvParser> {
470 SendAction() = delete;
471 explicit SendAction(std::string name) : ReplayAction(name) {}
472 void kernel(simgrid::xbt::ReplayAction& action) override
474 int dst_traced = MPI_COMM_WORLD->group()->actor(args.partner)->get_pid();
476 TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
477 Datatype::encode(args.datatype1)));
478 if (not TRACE_smpi_view_internals())
479 TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, args.size * args.datatype1->size());
481 if (name == "send") {
482 Request::send(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
483 } else if (name == "Isend") {
484 MPI_Request request = Request::isend(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
485 get_reqq_self()->push_back(request);
487 xbt_die("Don't know this action, %s", name.c_str());
490 TRACE_smpi_comm_out(my_proc_id);
494 class RecvAction : public ReplayAction<SendRecvParser> {
496 RecvAction() = delete;
497 explicit RecvAction(std::string name) : ReplayAction(name) {}
498 void kernel(simgrid::xbt::ReplayAction& action) override
500 int src_traced = MPI_COMM_WORLD->group()->actor(args.partner)->get_pid();
502 TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
503 Datatype::encode(args.datatype1)));
506 // unknown size from the receiver point of view
507 if (args.size <= 0.0) {
508 Request::probe(args.partner, 0, MPI_COMM_WORLD, &status);
509 args.size = status.count;
512 if (name == "recv") {
513 Request::recv(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD, &status);
514 } else if (name == "Irecv") {
515 MPI_Request request = Request::irecv(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
516 get_reqq_self()->push_back(request);
519 TRACE_smpi_comm_out(my_proc_id);
520 // TODO: Check why this was only activated in the "recv" case and not in the "Irecv" case
521 if (name == "recv" && not TRACE_smpi_view_internals()) {
522 TRACE_smpi_recv(src_traced, my_proc_id, 0);
527 class ComputeAction : public ReplayAction<ComputeParser> {
529 ComputeAction() : ReplayAction("compute") {}
530 void kernel(simgrid::xbt::ReplayAction& action) override
532 TRACE_smpi_computing_in(my_proc_id, args.flops);
533 smpi_execute_flops(args.flops);
534 TRACE_smpi_computing_out(my_proc_id);
538 class TestAction : public ReplayAction<ActionArgParser> {
540 TestAction() : ReplayAction("Test") {}
541 void kernel(simgrid::xbt::ReplayAction& action) override
543 MPI_Request request = get_reqq_self()->back();
544 get_reqq_self()->pop_back();
545 // if request is null here, this may mean that a previous test has succeeded
546 // Different times in traced application and replayed version may lead to this
547 // In this case, ignore the extra calls.
548 if (request != nullptr) {
549 TRACE_smpi_testing_in(my_proc_id);
552 int flag = Request::test(&request, &status);
554 XBT_DEBUG("MPI_Test result: %d", flag);
555 /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now
557 get_reqq_self()->push_back(request);
559 TRACE_smpi_testing_out(my_proc_id);
564 class InitAction : public ReplayAction<ActionArgParser> {
566 InitAction() : ReplayAction("Init") {}
567 void kernel(simgrid::xbt::ReplayAction& action) override
569 CHECK_ACTION_PARAMS(action, 0, 1)
570 MPI_DEFAULT_TYPE = (action.size() > 2) ? MPI_DOUBLE // default MPE datatype
571 : MPI_BYTE; // default TAU datatype
573 /* start a simulated timer */
574 smpi_process()->simulated_start();
575 set_reqq_self(new std::vector<MPI_Request>);
579 class CommunicatorAction : public ReplayAction<ActionArgParser> {
581 CommunicatorAction() : ReplayAction("Comm") {}
582 void kernel(simgrid::xbt::ReplayAction& action) override { /* nothing to do */}
585 class WaitAllAction : public ReplayAction<ActionArgParser> {
587 WaitAllAction() : ReplayAction("waitAll") {}
588 void kernel(simgrid::xbt::ReplayAction& action) override
590 const unsigned int count_requests = get_reqq_self()->size();
592 if (count_requests > 0) {
593 TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData("waitAll", -1, count_requests, ""));
594 std::vector<std::pair</*sender*/int,/*recv*/int>> sender_receiver;
595 for (const auto& req : (*get_reqq_self())) {
596 if (req && (req->flags() & RECV)) {
597 sender_receiver.push_back({req->src(), req->dst()});
600 MPI_Status status[count_requests];
601 Request::waitall(count_requests, &(*get_reqq_self())[0], status);
603 for (auto& pair : sender_receiver) {
604 TRACE_smpi_recv(pair.first, pair.second, 0);
606 TRACE_smpi_comm_out(my_proc_id);
611 class BarrierAction : public ReplayAction<ActionArgParser> {
613 BarrierAction() : ReplayAction("barrier") {}
614 void kernel(simgrid::xbt::ReplayAction& action) override
616 TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::NoOpTIData("barrier"));
617 Colls::barrier(MPI_COMM_WORLD);
618 TRACE_smpi_comm_out(my_proc_id);
622 class BcastAction : public ReplayAction<BcastArgParser> {
624 BcastAction() : ReplayAction("bcast") {}
625 void kernel(simgrid::xbt::ReplayAction& action) override
627 TRACE_smpi_comm_in(my_proc_id, "action_bcast",
628 new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(args.root)->get_pid(),
629 -1.0, args.size, -1, Datatype::encode(args.datatype1), ""));
631 Colls::bcast(send_buffer(args.size * args.datatype1->size()), args.size, args.datatype1, args.root, MPI_COMM_WORLD);
633 TRACE_smpi_comm_out(my_proc_id);
637 class ReduceAction : public ReplayAction<ReduceArgParser> {
639 ReduceAction() : ReplayAction("reduce") {}
640 void kernel(simgrid::xbt::ReplayAction& action) override
642 TRACE_smpi_comm_in(my_proc_id, "action_reduce",
643 new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(args.root)->get_pid(),
644 args.comp_size, args.comm_size, -1,
645 Datatype::encode(args.datatype1), ""));
647 Colls::reduce(send_buffer(args.comm_size * args.datatype1->size()),
648 recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL, args.root, MPI_COMM_WORLD);
649 smpi_execute_flops(args.comp_size);
651 TRACE_smpi_comm_out(my_proc_id);
655 class AllReduceAction : public ReplayAction<AllReduceArgParser> {
657 AllReduceAction() : ReplayAction("allReduce") {}
658 void kernel(simgrid::xbt::ReplayAction& action) override
660 TRACE_smpi_comm_in(my_proc_id, "action_allReduce", new simgrid::instr::CollTIData("allReduce", -1, args.comp_size, args.comm_size, -1,
661 Datatype::encode(args.datatype1), ""));
663 Colls::allreduce(send_buffer(args.comm_size * args.datatype1->size()),
664 recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
665 smpi_execute_flops(args.comp_size);
667 TRACE_smpi_comm_out(my_proc_id);
671 class AllToAllAction : public ReplayAction<AllToAllArgParser> {
673 AllToAllAction() : ReplayAction("allToAll") {}
674 void kernel(simgrid::xbt::ReplayAction& action) override
676 TRACE_smpi_comm_in(my_proc_id, "action_allToAll",
677 new simgrid::instr::CollTIData("allToAll", -1, -1.0, args.send_size, args.recv_size,
678 Datatype::encode(args.datatype1),
679 Datatype::encode(args.datatype2)));
681 Colls::alltoall(send_buffer(args.send_size * args.comm_size * args.datatype1->size()), args.send_size,
682 args.datatype1, recv_buffer(args.recv_size * args.comm_size * args.datatype2->size()),
683 args.recv_size, args.datatype2, MPI_COMM_WORLD);
685 TRACE_smpi_comm_out(my_proc_id);
689 class GatherAction : public ReplayAction<GatherArgParser> {
691 explicit GatherAction(std::string name) : ReplayAction(name) {}
692 void kernel(simgrid::xbt::ReplayAction& action) override
694 TRACE_smpi_comm_in(my_proc_id, name.c_str(), new simgrid::instr::CollTIData(name, (name == "gather") ? args.root : -1, -1.0, args.send_size, args.recv_size,
695 Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
697 if (name == "gather") {
698 int rank = MPI_COMM_WORLD->rank();
699 Colls::gather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
700 (rank == args.root) ? recv_buffer(args.recv_size * args.comm_size * args.datatype2->size()) : nullptr, args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
703 Colls::allgather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
704 recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, MPI_COMM_WORLD);
706 TRACE_smpi_comm_out(my_proc_id);
710 class GatherVAction : public ReplayAction<GatherVArgParser> {
712 explicit GatherVAction(std::string name) : ReplayAction(name) {}
713 void kernel(simgrid::xbt::ReplayAction& action) override
715 int rank = MPI_COMM_WORLD->rank();
717 TRACE_smpi_comm_in(my_proc_id, name.c_str(), new simgrid::instr::VarCollTIData(
718 name, (name == "gatherV") ? args.root : -1, args.send_size, nullptr, -1, args.recvcounts,
719 Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
721 if (name == "gatherV") {
722 Colls::gatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
723 (rank == args.root) ? recv_buffer(args.recv_size_sum * args.datatype2->size()) : nullptr,
724 args.recvcounts->data(), args.disps.data(), args.datatype2, args.root, MPI_COMM_WORLD);
727 Colls::allgatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
728 recv_buffer(args.recv_size_sum * args.datatype2->size()), args.recvcounts->data(),
729 args.disps.data(), args.datatype2, MPI_COMM_WORLD);
732 TRACE_smpi_comm_out(my_proc_id);
736 class ScatterAction : public ReplayAction<ScatterArgParser> {
738 ScatterAction() : ReplayAction("scatter") {}
739 void kernel(simgrid::xbt::ReplayAction& action) override
741 int rank = MPI_COMM_WORLD->rank();
742 TRACE_smpi_comm_in(my_proc_id, "action_scatter", new simgrid::instr::CollTIData(name, args.root, -1.0, args.send_size, args.recv_size,
743 Datatype::encode(args.datatype1),
744 Datatype::encode(args.datatype2)));
746 Colls::scatter(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
747 (rank == args.root) ? recv_buffer(args.recv_size * args.datatype2->size()) : nullptr, args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
749 TRACE_smpi_comm_out(my_proc_id);
754 class ScatterVAction : public ReplayAction<ScatterVArgParser> {
756 ScatterVAction() : ReplayAction("scatterV") {}
757 void kernel(simgrid::xbt::ReplayAction& action) override
759 int rank = MPI_COMM_WORLD->rank();
760 TRACE_smpi_comm_in(my_proc_id, "action_scatterv", new simgrid::instr::VarCollTIData(name, args.root, -1, args.sendcounts, args.recv_size,
761 nullptr, Datatype::encode(args.datatype1),
762 Datatype::encode(args.datatype2)));
764 Colls::scatterv((rank == args.root) ? send_buffer(args.send_size_sum * args.datatype1->size()) : nullptr,
765 args.sendcounts->data(), args.disps.data(), args.datatype1,
766 recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, args.root,
769 TRACE_smpi_comm_out(my_proc_id);
773 class ReduceScatterAction : public ReplayAction<ReduceScatterArgParser> {
775 ReduceScatterAction() : ReplayAction("reduceScatter") {}
776 void kernel(simgrid::xbt::ReplayAction& action) override
778 TRACE_smpi_comm_in(my_proc_id, "action_reducescatter",
779 new simgrid::instr::VarCollTIData("reduceScatter", -1, 0, nullptr, -1, args.recvcounts,
780 std::to_string(args.comp_size), /* ugly hack to print comp_size */
781 Datatype::encode(args.datatype1)));
783 Colls::reduce_scatter(send_buffer(args.recv_size_sum * args.datatype1->size()),
784 recv_buffer(args.recv_size_sum * args.datatype1->size()), args.recvcounts->data(),
785 args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
787 smpi_execute_flops(args.comp_size);
788 TRACE_smpi_comm_out(my_proc_id);
792 class AllToAllVAction : public ReplayAction<AllToAllVArgParser> {
794 AllToAllVAction() : ReplayAction("allToAllV") {}
795 void kernel(simgrid::xbt::ReplayAction& action) override
797 TRACE_smpi_comm_in(my_proc_id, __func__,
798 new simgrid::instr::VarCollTIData(
799 "allToAllV", -1, args.send_size_sum, args.sendcounts, args.recv_size_sum, args.recvcounts,
800 Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
802 Colls::alltoallv(send_buffer(args.send_buf_size * args.datatype1->size()), args.sendcounts->data(), args.senddisps.data(), args.datatype1,
803 recv_buffer(args.recv_buf_size * args.datatype2->size()), args.recvcounts->data(), args.recvdisps.data(), args.datatype2, MPI_COMM_WORLD);
805 TRACE_smpi_comm_out(my_proc_id);
808 } // Replay Namespace
809 }} // namespace simgrid::smpi
811 /** @brief Only initialize the replay, don't do it for real */
// Sets up the calling actor as a replaying SMPI process, emits the init trace events, registers one handler per
// trace action name, then either sleeps for a delay parsed from (*argv)[2] or forces a context switch.
// NOTE(review): the condition that chooses between the delayed start and the forced context switch is not visible in
// this view.
812 void smpi_replay_init(int* argc, char*** argv)
814 simgrid::smpi::Process::init(argc, argv);
815 smpi_process()->mark_as_initialized();
816 smpi_process()->set_replaying(true);
818 int my_proc_id = simgrid::s4u::this_actor::getPid();
819 TRACE_smpi_init(my_proc_id);
820 TRACE_smpi_computing_init(my_proc_id);
821 TRACE_smpi_comm_in(my_proc_id, "smpi_replay_run_init", new simgrid::instr::NoOpTIData("init"));
822 TRACE_smpi_comm_out(my_proc_id);
// Each handler builds a fresh action object per trace line and runs it.
823 xbt_replay_action_register("init", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::InitAction().execute(action); });
824 xbt_replay_action_register("finalize", [](simgrid::xbt::ReplayAction& action) { /* nothing to do */ });
825 xbt_replay_action_register("comm_size", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
826 xbt_replay_action_register("comm_split",[](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
827 xbt_replay_action_register("comm_dup", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
829 xbt_replay_action_register("send", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("send").execute(action); });
830 xbt_replay_action_register("Isend", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("Isend").execute(action); });
831 xbt_replay_action_register("recv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("recv").execute(action); });
832 xbt_replay_action_register("Irecv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("Irecv").execute(action); });
833 xbt_replay_action_register("test", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::TestAction().execute(action); });
834 xbt_replay_action_register("wait", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAction().execute(action); });
835 xbt_replay_action_register("waitAll", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAllAction().execute(action); });
836 xbt_replay_action_register("barrier", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BarrierAction().execute(action); });
837 xbt_replay_action_register("bcast", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BcastAction().execute(action); });
838 xbt_replay_action_register("reduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceAction().execute(action); });
839 xbt_replay_action_register("allReduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllReduceAction().execute(action); });
840 xbt_replay_action_register("allToAll", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllAction().execute(action); });
841 xbt_replay_action_register("allToAllV", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllVAction().execute(action); });
842 xbt_replay_action_register("gather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("gather").execute(action); });
843 xbt_replay_action_register("scatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterAction().execute(action); });
844 xbt_replay_action_register("gatherV", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("gatherV").execute(action); });
845 xbt_replay_action_register("scatterV", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterVAction().execute(action); });
846 xbt_replay_action_register("allGather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("allGather").execute(action); });
847 xbt_replay_action_register("allGatherV", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("allGatherV").execute(action); });
848 xbt_replay_action_register("reduceScatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceScatterAction().execute(action); });
849 xbt_replay_action_register("compute", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ComputeAction().execute(action); });
851 //if we have a delayed start, sleep here.
// The delay is read from (*argv)[2] and simulated as that many flops of computation.
853 double value = xbt_str_parse_double((*argv)[2], "%s is not a double");
854 XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
855 smpi_execute_flops(value);
857 //UGLY: force a context switch to be sure that all MSG_processes begin initialization
858 XBT_DEBUG("Force context switch by smpi_execute_flops - Sleeping for 0.0 flops ");
859 smpi_execute_flops(0.0);
863 /** @brief actually run the replay after initialization */
// Runs the trace through xbt's replay runner, then finalizes the calling actor. Still-pending requests are waited
// for, the request queue is freed, and the last active process prints the simulation time and frees the replay
// buffers. Finally the process is finalized with the matching trace events.
// NOTE(review): the updates of active_processes and the copy loop into `requests` are not visible in this view.
864 void smpi_replay_main(int* argc, char*** argv)
// Shared by every replaying actor: counts the processes still running the replay.
866 static int active_processes = 0;
868 simgrid::xbt::replay_runner(*argc, *argv);
870 /* and now, finalize everything */
871 /* One active process will stop. Decrease the counter*/
872 XBT_DEBUG("There are %zu elements in reqq[*]", get_reqq_self()->size());
873 if (not get_reqq_self()->empty()) {
874 unsigned int count_requests=get_reqq_self()->size();
// NOTE(review): these are variable-length arrays, a non-standard C++ extension; std::vector would be portable.
875 MPI_Request requests[count_requests];
876 MPI_Status status[count_requests];
879 for (auto const& req : *get_reqq_self()) {
883 simgrid::smpi::Request::waitall(count_requests, requests, status);
// NOTE(review): the queue is deleted but its pointer stays in reqq, so a later get_reqq_self() for this PID would
// return a dangling pointer; erasing the map entry would be safer.
885 delete get_reqq_self();
888 if(active_processes==0){
889 /* Last process alive speaking: end the simulated timer */
890 XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
891 smpi_free_replay_tmp_buffers();
894 TRACE_smpi_comm_in(simgrid::s4u::this_actor::getPid(), "smpi_replay_run_finalize",
895 new simgrid::instr::NoOpTIData("finalize"));
897 smpi_process()->finalize();
899 TRACE_smpi_comm_out(simgrid::s4u::this_actor::getPid());
900 TRACE_smpi_finalize(simgrid::s4u::this_actor::getPid());
903 /** @brief chain a replay initialization and a replay start */
904 void smpi_replay_run(int* argc, char*** argv)
906 smpi_replay_init(argc, argv);
907 smpi_replay_main(argc, argv);