1 /* Copyright (c) 2009-2018. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
#include "smpi_coll.hpp"
#include "smpi_comm.hpp"
#include "smpi_datatype.hpp"
#include "smpi_group.hpp"
#include "smpi_process.hpp"
#include "smpi_request.hpp"
#include "xbt/replay.hpp"

#include <boost/algorithm/string/join.hpp>
#include <memory>
#include <numeric>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>
21 using simgrid::s4u::Actor;
23 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
25 static int active_processes = 0;
26 static std::unordered_map<int, std::vector<MPI_Request>*> reqq;
28 static MPI_Datatype MPI_DEFAULT_TYPE;
/* Throws arg_error unless the trace line holds the process id, the action name and at least `mandatory` further
 * fields. `optional` is only reported in the error message.
 * Expands to a braced block, so call sites do not add a trailing semicolon.
 * Every macro parameter is parenthesized so that expression arguments such as `comm_size + 1` are evaluated as a
 * whole before being combined with the surrounding operators. */
#define CHECK_ACTION_PARAMS(action, mandatory, optional)                                                               \
  {                                                                                                                    \
    if ((action).size() < static_cast<unsigned long>((mandatory) + 2))                                                 \
      THROWF(arg_error, 0, "%s replay failed.\n"                                                                       \
                           "%zu items were given on the line. First two should be process_id and action. "             \
                           "This action needs after them %lu mandatory arguments, and accepts %lu optional ones. \n"   \
                           "Please contact the Simgrid team if support is needed",                                     \
             __func__, (action).size(), static_cast<unsigned long>(mandatory),                                         \
             static_cast<unsigned long>(optional));                                                                    \
  }
40 static void log_timed_action(simgrid::xbt::ReplayAction& action, double clock)
42 if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
43 std::string s = boost::algorithm::join(action, " ");
44 XBT_VERB("%s %f", s.c_str(), smpi_process()->simulated_elapsed() - clock);
48 static std::vector<MPI_Request>* get_reqq_self()
50 return reqq.at(Actor::self()->getPid());
53 static void set_reqq_self(std::vector<MPI_Request> *mpi_request)
55 reqq.insert({Actor::self()->getPid(), mpi_request});
59 static double parse_double(std::string string)
61 return xbt_str_parse_double(string.c_str(), "%s is not a double");
68 class ActionArgParser {
70 virtual void parse(simgrid::xbt::ReplayAction& action, std::string name) { CHECK_ACTION_PARAMS(action, 0, 0) }
73 class SendRecvParser : public ActionArgParser {
75 /* communication partner; if we send, this is the receiver and vice versa */
78 MPI_Datatype datatype1 = MPI_DEFAULT_TYPE;
80 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
82 CHECK_ACTION_PARAMS(action, 2, 1)
83 partner = std::stoi(action[2]);
84 size = parse_double(action[3]);
85 if (action.size() > 4)
86 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
90 class ComputeParser : public ActionArgParser {
92 /* communication partner; if we send, this is the receiver and vice versa */
95 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
97 CHECK_ACTION_PARAMS(action, 1, 0)
98 flops = parse_double(action[2]);
102 class CollCommParser : public ActionArgParser {
110 MPI_Datatype datatype1 = MPI_DEFAULT_TYPE;
111 MPI_Datatype datatype2 = MPI_DEFAULT_TYPE;
114 class BcastArgParser : public CollCommParser {
116 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
118 CHECK_ACTION_PARAMS(action, 1, 2)
119 size = parse_double(action[2]);
120 root = (action.size() > 3) ? std::stoi(action[3]) : 0;
121 if (action.size() > 4)
122 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
126 class ReduceArgParser : public CollCommParser {
128 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
130 CHECK_ACTION_PARAMS(action, 2, 2)
131 comm_size = parse_double(action[2]);
132 comp_size = parse_double(action[3]);
133 root = (action.size() > 4) ? std::stoi(action[4]) : 0;
134 if (action.size() > 5)
135 datatype1 = simgrid::smpi::Datatype::decode(action[5]);
139 class AllReduceArgParser : public CollCommParser {
141 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
143 CHECK_ACTION_PARAMS(action, 2, 1)
144 comm_size = parse_double(action[2]);
145 comp_size = parse_double(action[3]);
146 if (action.size() > 4)
147 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
151 class AllToAllArgParser : public CollCommParser {
153 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
155 CHECK_ACTION_PARAMS(action, 2, 1)
156 comm_size = MPI_COMM_WORLD->size();
157 send_size = parse_double(action[2]);
158 recv_size = parse_double(action[3]);
160 if (action.size() > 4)
161 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
162 if (action.size() > 5)
163 datatype2 = simgrid::smpi::Datatype::decode(action[5]);
167 class GatherArgParser : public CollCommParser {
169 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
171 /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
174 1) 68 is the sendcounts
175 2) 68 is the recvcounts
176 3) 0 is the root node
177 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
178 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
180 CHECK_ACTION_PARAMS(action, 2, 3)
181 comm_size = MPI_COMM_WORLD->size();
182 send_size = parse_double(action[2]);
183 recv_size = parse_double(action[3]);
185 if (name == "gather") {
186 root = (action.size() > 4) ? std::stoi(action[4]) : 0;
187 if (action.size() > 5)
188 datatype1 = simgrid::smpi::Datatype::decode(action[5]);
189 if (action.size() > 6)
190 datatype2 = simgrid::smpi::Datatype::decode(action[6]);
193 if (action.size() > 4)
194 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
195 if (action.size() > 5)
196 datatype2 = simgrid::smpi::Datatype::decode(action[5]);
201 class GatherVArgParser : public CollCommParser {
204 std::shared_ptr<std::vector<int>> recvcounts;
205 std::vector<int> disps;
206 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
208 /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
209 0 gather 68 68 10 10 10 0 0 0
211 1) 68 is the sendcount
212 2) 68 10 10 10 is the recvcounts
213 3) 0 is the root node
214 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
215 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
217 comm_size = MPI_COMM_WORLD->size();
218 CHECK_ACTION_PARAMS(action, comm_size+1, 2)
219 send_size = parse_double(action[2]);
220 disps = std::vector<int>(comm_size, 0);
221 recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
223 if (name == "gatherV") {
224 root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
225 if (action.size() > 4 + comm_size)
226 datatype1 = simgrid::smpi::Datatype::decode(action[4 + comm_size]);
227 if (action.size() > 5 + comm_size)
228 datatype2 = simgrid::smpi::Datatype::decode(action[5 + comm_size]);
231 int datatype_index = 0, disp_index = 0;
232 if (action.size() > 3 + 2 * comm_size) { /* datatype + disp are specified */
233 datatype_index = 3 + comm_size;
234 disp_index = datatype_index + 1;
235 } else if (action.size() > 3 + 2 * comm_size) { /* disps specified; datatype is not specified; use the default one */
237 disp_index = 3 + comm_size;
238 } else if (action.size() > 3 + comm_size) { /* only datatype, no disp specified */
239 datatype_index = 3 + comm_size;
242 if (disp_index != 0) {
243 for (unsigned int i = 0; i < comm_size; i++)
244 disps[i] = std::stoi(action[disp_index + i]);
247 datatype1 = simgrid::smpi::Datatype::decode(action[datatype_index]);
248 datatype2 = simgrid::smpi::Datatype::decode(action[datatype_index]);
251 for (unsigned int i = 0; i < comm_size; i++) {
252 (*recvcounts)[i] = std::stoi(action[i + 3]);
254 recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
258 class ScatterArgParser : public CollCommParser {
260 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
262 /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
265 1) 68 is the sendcounts
266 2) 68 is the recvcounts
267 3) 0 is the root node
268 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
269 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
271 CHECK_ACTION_PARAMS(action, 2, 3)
272 comm_size = MPI_COMM_WORLD->size();
273 send_size = parse_double(action[2]);
274 recv_size = parse_double(action[3]);
275 root = (action.size() > 4) ? std::stoi(action[4]) : 0;
276 if (action.size() > 5)
277 datatype1 = simgrid::smpi::Datatype::decode(action[5]);
278 if (action.size() > 6)
279 datatype2 = simgrid::smpi::Datatype::decode(action[6]);
283 class ScatterVArgParser : public CollCommParser {
287 std::shared_ptr<std::vector<int>> sendcounts;
288 std::vector<int> disps;
289 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
291 /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
292 0 gather 68 10 10 10 68 0 0 0
294 1) 68 10 10 10 is the sendcounts
295 2) 68 is the recvcount
296 3) 0 is the root node
297 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
298 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
300 CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
301 recv_size = parse_double(action[2 + comm_size]);
302 disps = std::vector<int>(comm_size, 0);
303 sendcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
305 if (action.size() > 5 + comm_size)
306 datatype1 = simgrid::smpi::Datatype::decode(action[4 + comm_size]);
307 if (action.size() > 5 + comm_size)
308 datatype2 = simgrid::smpi::Datatype::decode(action[5]);
310 for (unsigned int i = 0; i < comm_size; i++) {
311 (*sendcounts)[i] = std::stoi(action[i + 2]);
313 send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
314 root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
318 class ReduceScatterArgParser : public CollCommParser {
321 std::shared_ptr<std::vector<int>> recvcounts;
322 std::vector<int> disps;
323 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
325 /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
326 0 reduceScatter 275427 275427 275427 204020 11346849 0
328 1) The first four values after the name of the action declare the recvcounts array
329 2) The value 11346849 is the amount of instructions
330 3) The last value corresponds to the datatype, see simgrid::smpi::Datatype::decode().
332 comm_size = MPI_COMM_WORLD->size();
333 CHECK_ACTION_PARAMS(action, comm_size+1, 1)
334 comp_size = parse_double(action[2+comm_size]);
335 recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
336 if (action.size() > 3 + comm_size)
337 datatype1 = simgrid::smpi::Datatype::decode(action[3 + comm_size]);
339 for (unsigned int i = 0; i < comm_size; i++) {
340 recvcounts->push_back(std::stoi(action[i + 2]));
342 recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
346 class AllToAllVArgParser : public CollCommParser {
350 std::shared_ptr<std::vector<int>> recvcounts;
351 std::shared_ptr<std::vector<int>> sendcounts;
352 std::vector<int> senddisps;
353 std::vector<int> recvdisps;
356 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
358 /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
359 0 allToAllV 100 1 7 10 12 100 1 70 10 5
361 1) 100 is the size of the send buffer *sizeof(int),
362 2) 1 7 10 12 is the sendcounts array
363 3) 100*sizeof(int) is the size of the receiver buffer
364 4) 1 70 10 5 is the recvcounts array
366 comm_size = MPI_COMM_WORLD->size();
367 CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
368 sendcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
369 recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
370 senddisps = std::vector<int>(comm_size, 0);
371 recvdisps = std::vector<int>(comm_size, 0);
373 if (action.size() > 5 + 2 * comm_size)
374 datatype1 = simgrid::smpi::Datatype::decode(action[4 + 2 * comm_size]);
375 if (action.size() > 5 + 2 * comm_size)
376 datatype2 = simgrid::smpi::Datatype::decode(action[5 + 2 * comm_size]);
378 send_buf_size=parse_double(action[2]);
379 recv_buf_size=parse_double(action[3+comm_size]);
380 for (unsigned int i = 0; i < comm_size; i++) {
381 (*sendcounts)[i] = std::stoi(action[3 + i]);
382 (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
384 send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
385 recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
389 template <class T> class ReplayAction {
391 const std::string name;
397 explicit ReplayAction(std::string name) : name(name), my_proc_id(simgrid::s4u::Actor::self()->getPid()) {}
399 virtual void execute(simgrid::xbt::ReplayAction& action)
401 // Needs to be re-initialized for every action, hence here
402 double start_time = smpi_process()->simulated_elapsed();
403 args.parse(action, name);
406 log_timed_action(action, start_time);
409 virtual void kernel(simgrid::xbt::ReplayAction& action) = 0;
411 void* send_buffer(int size)
413 return smpi_get_tmp_sendbuffer(size);
416 void* recv_buffer(int size)
418 return smpi_get_tmp_recvbuffer(size);
422 class WaitAction : public ReplayAction<ActionArgParser> {
424 WaitAction() : ReplayAction("Wait") {}
425 void kernel(simgrid::xbt::ReplayAction& action) override
427 std::string s = boost::algorithm::join(action, " ");
428 xbt_assert(get_reqq_self()->size(), "action wait not preceded by any irecv or isend: %s", s.c_str());
429 MPI_Request request = get_reqq_self()->back();
430 get_reqq_self()->pop_back();
432 if (request == nullptr) {
433 /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just
438 int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
440 // Must be taken before Request::wait() since the request may be set to
441 // MPI_REQUEST_NULL by Request::wait!
442 int src = request->comm()->group()->rank(request->src());
443 int dst = request->comm()->group()->rank(request->dst());
444 bool is_wait_for_receive = (request->flags() & RECV);
445 // TODO: Here we take the rank while we normally take the process id (look for my_proc_id)
446 TRACE_smpi_comm_in(rank, __func__, new simgrid::instr::NoOpTIData("wait"));
449 Request::wait(&request, &status);
451 TRACE_smpi_comm_out(rank);
452 if (is_wait_for_receive)
453 TRACE_smpi_recv(src, dst, 0);
457 class SendAction : public ReplayAction<SendRecvParser> {
459 SendAction() = delete;
460 SendAction(std::string name) : ReplayAction(name) {}
461 void kernel(simgrid::xbt::ReplayAction& action) override
463 int dst_traced = MPI_COMM_WORLD->group()->actor(args.partner)->getPid();
465 TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
466 Datatype::encode(args.datatype1)));
467 if (not TRACE_smpi_view_internals())
468 TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, args.size * args.datatype1->size());
470 if (name == "send") {
471 Request::send(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
472 } else if (name == "Isend") {
473 MPI_Request request = Request::isend(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
474 get_reqq_self()->push_back(request);
476 xbt_die("Don't know this action, %s", name.c_str());
479 TRACE_smpi_comm_out(my_proc_id);
483 class RecvAction : public ReplayAction<SendRecvParser> {
485 RecvAction() = delete;
486 explicit RecvAction(std::string name) : ReplayAction(name) {}
487 void kernel(simgrid::xbt::ReplayAction& action) override
489 int src_traced = MPI_COMM_WORLD->group()->actor(args.partner)->getPid();
491 TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
492 Datatype::encode(args.datatype1)));
495 // unknown size from the receiver point of view
496 if (args.size <= 0.0) {
497 Request::probe(args.partner, 0, MPI_COMM_WORLD, &status);
498 args.size = status.count;
501 if (name == "recv") {
502 Request::recv(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD, &status);
503 } else if (name == "Irecv") {
504 MPI_Request request = Request::irecv(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
505 get_reqq_self()->push_back(request);
508 TRACE_smpi_comm_out(my_proc_id);
509 // TODO: Check why this was only activated in the "recv" case and not in the "Irecv" case
510 if (name == "recv" && not TRACE_smpi_view_internals()) {
511 TRACE_smpi_recv(src_traced, my_proc_id, 0);
516 class ComputeAction : public ReplayAction<ComputeParser> {
518 ComputeAction() : ReplayAction("compute") {}
519 void kernel(simgrid::xbt::ReplayAction& action) override
521 TRACE_smpi_computing_in(my_proc_id, args.flops);
522 smpi_execute_flops(args.flops);
523 TRACE_smpi_computing_out(my_proc_id);
527 class TestAction : public ReplayAction<ActionArgParser> {
529 TestAction() : ReplayAction("Test") {}
530 void kernel(simgrid::xbt::ReplayAction& action) override
532 MPI_Request request = get_reqq_self()->back();
533 get_reqq_self()->pop_back();
534 // if request is null here, this may mean that a previous test has succeeded
535 // Different times in traced application and replayed version may lead to this
536 // In this case, ignore the extra calls.
537 if (request != nullptr) {
538 TRACE_smpi_testing_in(my_proc_id);
541 int flag = Request::test(&request, &status);
543 XBT_DEBUG("MPI_Test result: %d", flag);
544 /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now
546 get_reqq_self()->push_back(request);
548 TRACE_smpi_testing_out(my_proc_id);
553 class InitAction : public ReplayAction<ActionArgParser> {
555 InitAction() : ReplayAction("Init") {}
556 void kernel(simgrid::xbt::ReplayAction& action) override
558 CHECK_ACTION_PARAMS(action, 0, 1)
559 MPI_DEFAULT_TYPE = (action.size() > 2) ? MPI_DOUBLE // default MPE datatype
560 : MPI_BYTE; // default TAU datatype
562 /* start a simulated timer */
563 smpi_process()->simulated_start();
564 /*initialize the number of active processes */
565 active_processes = smpi_process_count();
567 set_reqq_self(new std::vector<MPI_Request>);
571 class CommunicatorAction : public ReplayAction<ActionArgParser> {
573 CommunicatorAction() : ReplayAction("Comm") {}
574 void kernel(simgrid::xbt::ReplayAction& action) override { /* nothing to do */}
577 class WaitAllAction : public ReplayAction<ActionArgParser> {
579 WaitAllAction() : ReplayAction("waitAll") {}
580 void kernel(simgrid::xbt::ReplayAction& action) override
582 const unsigned int count_requests = get_reqq_self()->size();
584 if (count_requests > 0) {
585 TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData("waitAll", -1, count_requests, ""));
586 std::vector<std::pair</*sender*/int,/*recv*/int>> sender_receiver;
587 for (const auto& req : (*get_reqq_self())) {
588 if (req && (req->flags() & RECV)) {
589 sender_receiver.push_back({req->src(), req->dst()});
592 MPI_Status status[count_requests];
593 Request::waitall(count_requests, &(*get_reqq_self())[0], status);
595 for (auto& pair : sender_receiver) {
596 TRACE_smpi_recv(pair.first, pair.second, 0);
598 TRACE_smpi_comm_out(my_proc_id);
603 class BarrierAction : public ReplayAction<ActionArgParser> {
605 BarrierAction() : ReplayAction("barrier") {}
606 void kernel(simgrid::xbt::ReplayAction& action) override
608 TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::NoOpTIData("barrier"));
609 Colls::barrier(MPI_COMM_WORLD);
610 TRACE_smpi_comm_out(my_proc_id);
614 class BcastAction : public ReplayAction<BcastArgParser> {
616 BcastAction() : ReplayAction("bcast") {}
617 void kernel(simgrid::xbt::ReplayAction& action) override
619 TRACE_smpi_comm_in(my_proc_id, "action_bcast",
620 new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(args.root)->getPid(),
621 -1.0, args.size, -1, Datatype::encode(args.datatype1), ""));
623 Colls::bcast(send_buffer(args.size * args.datatype1->size()), args.size, args.datatype1, args.root, MPI_COMM_WORLD);
625 TRACE_smpi_comm_out(my_proc_id);
629 class ReduceAction : public ReplayAction<ReduceArgParser> {
631 ReduceAction() : ReplayAction("reduce") {}
632 void kernel(simgrid::xbt::ReplayAction& action) override
634 TRACE_smpi_comm_in(my_proc_id, "action_reduce",
635 new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(args.root)->getPid(), args.comp_size,
636 args.comm_size, -1, Datatype::encode(args.datatype1), ""));
638 Colls::reduce(send_buffer(args.comm_size * args.datatype1->size()),
639 recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL, args.root, MPI_COMM_WORLD);
640 smpi_execute_flops(args.comp_size);
642 TRACE_smpi_comm_out(my_proc_id);
646 class AllReduceAction : public ReplayAction<AllReduceArgParser> {
648 AllReduceAction() : ReplayAction("allReduce") {}
649 void kernel(simgrid::xbt::ReplayAction& action) override
651 TRACE_smpi_comm_in(my_proc_id, "action_allReduce", new simgrid::instr::CollTIData("allReduce", -1, args.comp_size, args.comm_size, -1,
652 Datatype::encode(args.datatype1), ""));
654 Colls::allreduce(send_buffer(args.comm_size * args.datatype1->size()),
655 recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
656 smpi_execute_flops(args.comp_size);
658 TRACE_smpi_comm_out(my_proc_id);
662 class AllToAllAction : public ReplayAction<AllToAllArgParser> {
664 AllToAllAction() : ReplayAction("allToAll") {}
665 void kernel(simgrid::xbt::ReplayAction& action) override
667 TRACE_smpi_comm_in(my_proc_id, "action_allToAll",
668 new simgrid::instr::CollTIData("allToAll", -1, -1.0, args.send_size, args.recv_size,
669 Datatype::encode(args.datatype1),
670 Datatype::encode(args.datatype2)));
672 Colls::alltoall(send_buffer(args.send_size*args.comm_size* args.datatype1->size()),
673 args.send_size, args.datatype1, recv_buffer(args.recv_size * args.comm_size * args.datatype2->size()),
674 args.recv_size, args.datatype2, MPI_COMM_WORLD);
676 TRACE_smpi_comm_out(my_proc_id);
680 class GatherAction : public ReplayAction<GatherArgParser> {
682 GatherAction(std::string name) : ReplayAction(name) {}
683 void kernel(simgrid::xbt::ReplayAction& action) override
685 TRACE_smpi_comm_in(my_proc_id, name.c_str(), new simgrid::instr::CollTIData(name, (name == "gather") ? args.root : -1, -1.0, args.send_size, args.recv_size,
686 Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
688 if (name == "gather") {
689 int rank = MPI_COMM_WORLD->rank();
690 Colls::gather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
691 (rank == args.root) ? recv_buffer(args.recv_size * args.comm_size * args.datatype2->size()) : nullptr, args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
694 Colls::allgather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
695 recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, MPI_COMM_WORLD);
697 TRACE_smpi_comm_out(my_proc_id);
701 class GatherVAction : public ReplayAction<GatherVArgParser> {
703 GatherVAction(std::string name) : ReplayAction(name) {}
704 void kernel(simgrid::xbt::ReplayAction& action) override
706 int rank = MPI_COMM_WORLD->rank();
708 TRACE_smpi_comm_in(my_proc_id, name.c_str(), new simgrid::instr::VarCollTIData(
709 name, (name == "gatherV") ? args.root : -1, args.send_size, nullptr, -1, args.recvcounts,
710 Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
712 if (name == "gatherV") {
713 Colls::gatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
714 (rank == args.root) ? recv_buffer(args.recv_size_sum * args.datatype2->size()) : nullptr, args.recvcounts->data(), args.disps.data(), args.datatype2, args.root,
718 Colls::allgatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
719 recv_buffer(args.recv_size_sum * args.datatype2->size()), args.recvcounts->data(), args.disps.data(), args.datatype2,
723 TRACE_smpi_comm_out(my_proc_id);
727 class ScatterAction : public ReplayAction<ScatterArgParser> {
729 ScatterAction() : ReplayAction("scatter") {}
730 void kernel(simgrid::xbt::ReplayAction& action) override
732 int rank = MPI_COMM_WORLD->rank();
733 TRACE_smpi_comm_in(my_proc_id, "action_scatter", new simgrid::instr::CollTIData(name, args.root, -1.0, args.send_size, args.recv_size,
734 Datatype::encode(args.datatype1),
735 Datatype::encode(args.datatype2)));
737 Colls::scatter(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
738 (rank == args.root) ? recv_buffer(args.recv_size * args.datatype2->size()) : nullptr, args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
740 TRACE_smpi_comm_out(my_proc_id);
745 class ScatterVAction : public ReplayAction<ScatterVArgParser> {
747 ScatterVAction() : ReplayAction("scatterV") {}
748 void kernel(simgrid::xbt::ReplayAction& action) override
750 int rank = MPI_COMM_WORLD->rank();
751 TRACE_smpi_comm_in(my_proc_id, "action_scatterv", new simgrid::instr::VarCollTIData(name, args.root, -1, args.sendcounts, args.recv_size,
752 nullptr, Datatype::encode(args.datatype1),
753 Datatype::encode(args.datatype2)));
755 Colls::scatterv((rank == args.root) ? send_buffer(args.send_size_sum * args.datatype1->size()) : nullptr, args.sendcounts->data(), args.disps.data(),
756 args.datatype1, recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, args.root,
759 TRACE_smpi_comm_out(my_proc_id);
763 class ReduceScatterAction : public ReplayAction<ReduceScatterArgParser> {
765 ReduceScatterAction() : ReplayAction("reduceScatter") {}
766 void kernel(simgrid::xbt::ReplayAction& action) override
768 TRACE_smpi_comm_in(my_proc_id, "action_reducescatter",
769 new simgrid::instr::VarCollTIData("reduceScatter", -1, 0, nullptr, -1, args.recvcounts,
770 std::to_string(args.comp_size), /* ugly hack to print comp_size */
771 Datatype::encode(args.datatype1)));
773 Colls::reduce_scatter(send_buffer(args.recv_size_sum * args.datatype1->size()), recv_buffer(args.recv_size_sum * args.datatype1->size()),
774 args.recvcounts->data(), args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
776 smpi_execute_flops(args.comp_size);
777 TRACE_smpi_comm_out(my_proc_id);
781 class AllToAllVAction : public ReplayAction<AllToAllVArgParser> {
783 AllToAllVAction() : ReplayAction("allToAllV") {}
784 void kernel(simgrid::xbt::ReplayAction& action) override
786 TRACE_smpi_comm_in(my_proc_id, __func__,
787 new simgrid::instr::VarCollTIData(
788 "allToAllV", -1, args.send_size_sum, args.sendcounts, args.recv_size_sum, args.recvcounts,
789 Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
791 Colls::alltoallv(send_buffer(args.send_buf_size * args.datatype1->size()), args.sendcounts->data(), args.senddisps.data(), args.datatype1,
792 recv_buffer(args.recv_buf_size * args.datatype2->size()), args.recvcounts->data(), args.recvdisps.data(), args.datatype2, MPI_COMM_WORLD);
794 TRACE_smpi_comm_out(my_proc_id);
797 } // Replay Namespace
798 }} // namespace simgrid::smpi
800 /** @brief Only initialize the replay, don't do it for real */
801 void smpi_replay_init(int* argc, char*** argv)
803 simgrid::smpi::Process::init(argc, argv);
804 smpi_process()->mark_as_initialized();
805 smpi_process()->set_replaying(true);
807 int my_proc_id = Actor::self()->getPid();
808 TRACE_smpi_init(my_proc_id);
809 TRACE_smpi_computing_init(my_proc_id);
810 TRACE_smpi_comm_in(my_proc_id, "smpi_replay_run_init", new simgrid::instr::NoOpTIData("init"));
811 TRACE_smpi_comm_out(my_proc_id);
812 xbt_replay_action_register("init", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::InitAction().execute(action); });
813 xbt_replay_action_register("finalize", [](simgrid::xbt::ReplayAction& action) { /* nothing to do */ });
814 xbt_replay_action_register("comm_size", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::CommunicatorAction().execute(action); });
815 xbt_replay_action_register("comm_split",[](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::CommunicatorAction().execute(action); });
816 xbt_replay_action_register("comm_dup", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::CommunicatorAction().execute(action); });
818 xbt_replay_action_register("send", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::SendAction("send").execute(action); });
819 xbt_replay_action_register("Isend", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::SendAction("Isend").execute(action); });
820 xbt_replay_action_register("recv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::RecvAction("recv").execute(action); });
821 xbt_replay_action_register("Irecv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::RecvAction("Irecv").execute(action); });
822 xbt_replay_action_register("test", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::TestAction().execute(action); });
823 xbt_replay_action_register("wait", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::WaitAction().execute(action); });
824 xbt_replay_action_register("waitAll", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::WaitAllAction().execute(action); });
825 xbt_replay_action_register("barrier", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::BarrierAction().execute(action); });
826 xbt_replay_action_register("bcast", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::BcastAction().execute(action); });
827 xbt_replay_action_register("reduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::ReduceAction().execute(action); });
828 xbt_replay_action_register("allReduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::AllReduceAction().execute(action); });
829 xbt_replay_action_register("allToAll", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::AllToAllAction().execute(action); });
830 xbt_replay_action_register("allToAllV", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::AllToAllVAction().execute(action); });
831 xbt_replay_action_register("gather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::GatherAction("gather").execute(action); });
832 xbt_replay_action_register("scatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::ScatterAction().execute(action); });
833 xbt_replay_action_register("gatherV", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::GatherVAction("gatherV").execute(action); });
834 xbt_replay_action_register("scatterV", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::ScatterVAction().execute(action); });
835 xbt_replay_action_register("allGather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::GatherAction("allGather").execute(action); });
836 xbt_replay_action_register("allGatherV", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::GatherVAction("allGatherV").execute(action); });
837 xbt_replay_action_register("reduceScatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::ReduceScatterAction().execute(action); });
838 xbt_replay_action_register("compute", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::ComputeAction().execute(action); });
840 //if we have a delayed start, sleep here.
842 double value = xbt_str_parse_double((*argv)[2], "%s is not a double");
843 XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
844 smpi_execute_flops(value);
846 //UGLY: force a context switch to be sure that all MSG_processes begin initialization
847 XBT_DEBUG("Force context switch by smpi_execute_flops - Sleeping for 0.0 flops ");
848 smpi_execute_flops(0.0);
852 /** @brief actually run the replay after initialization */
853 void smpi_replay_main(int* argc, char*** argv)
855 simgrid::xbt::replay_runner(*argc, *argv);
857 /* and now, finalize everything */
858 /* One active process will stop. Decrease the counter*/
859 XBT_DEBUG("There are %zu elements in reqq[*]", get_reqq_self()->size());
860 if (not get_reqq_self()->empty()) {
861 unsigned int count_requests=get_reqq_self()->size();
862 MPI_Request requests[count_requests];
863 MPI_Status status[count_requests];
866 for (auto const& req : *get_reqq_self()) {
870 simgrid::smpi::Request::waitall(count_requests, requests, status);
872 delete get_reqq_self();
875 if(active_processes==0){
876 /* Last process alive speaking: end the simulated timer */
877 XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
878 smpi_free_replay_tmp_buffers();
881 TRACE_smpi_comm_in(Actor::self()->getPid(), "smpi_replay_run_finalize", new simgrid::instr::NoOpTIData("finalize"));
883 smpi_process()->finalize();
885 TRACE_smpi_comm_out(Actor::self()->getPid());
886 TRACE_smpi_finalize(Actor::self()->getPid());
889 /** @brief chain a replay initialization and a replay start */
890 void smpi_replay_run(int* argc, char*** argv)
892 smpi_replay_init(argc, argv);
893 smpi_replay_main(argc, argv);