1 /* Copyright (c) 2009-2018. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
7 #include "smpi_coll.hpp"
8 #include "smpi_comm.hpp"
9 #include "smpi_datatype.hpp"
10 #include "smpi_group.hpp"
11 #include "smpi_process.hpp"
12 #include "smpi_request.hpp"
13 #include "xbt/replay.hpp"
15 #include <boost/algorithm/string/join.hpp>
18 #include <unordered_map>
21 using simgrid::s4u::Actor;
23 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
25 static int active_processes = 0;
26 static std::unordered_map<int, std::vector<MPI_Request>*> reqq;
28 static MPI_Datatype MPI_DEFAULT_TYPE;
/* Check that a replayed trace line carries enough items.
 *
 * action:    the tokenized line (items 0 and 1 are the process id and the action name)
 * mandatory: number of arguments that must follow these two items
 * optional:  number of additional arguments that may follow; only used in the error message
 *
 * Throws arg_error (through THROWF) when fewer than mandatory + 2 items are present.
 * Every macro parameter is parenthesized so that any expression can safely be passed. */
#define CHECK_ACTION_PARAMS(action, mandatory, optional)                                                               \
  {                                                                                                                    \
    if ((action).size() < static_cast<unsigned long>((mandatory) + 2))                                                 \
      THROWF(arg_error, 0, "%s replay failed.\n"                                                                       \
                           "%zu items were given on the line. First two should be process_id and action. "            \
                           "This action needs after them %lu mandatory arguments, and accepts %lu optional ones. \n"   \
                           "Please contact the Simgrid team if support is needed",                                     \
             __func__, (action).size(), static_cast<unsigned long>(mandatory),                                         \
             static_cast<unsigned long>(optional));                                                                    \
  }
40 static void log_timed_action(simgrid::xbt::ReplayAction& action, double clock)
42 if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
43 std::string s = boost::algorithm::join(action, " ");
44 XBT_VERB("%s %f", s.c_str(), smpi_process()->simulated_elapsed() - clock);
48 static std::vector<MPI_Request>* get_reqq_self()
50 return reqq.at(Actor::self()->getPid());
53 static void set_reqq_self(std::vector<MPI_Request> *mpi_request)
55 reqq.insert({Actor::self()->getPid(), mpi_request});
59 static double parse_double(std::string string)
61 return xbt_str_parse_double(string.c_str(), "%s is not a double");
68 class ActionArgParser {
70 virtual void parse(simgrid::xbt::ReplayAction& action, std::string name) { CHECK_ACTION_PARAMS(action, 0, 0) }
73 class SendRecvParser : public ActionArgParser {
75 /* communication partner; if we send, this is the receiver and vice versa */
78 MPI_Datatype datatype1 = MPI_DEFAULT_TYPE;
80 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
82 CHECK_ACTION_PARAMS(action, 2, 1)
83 partner = std::stoi(action[2]);
84 size = parse_double(action[3]);
85 if (action.size() > 4)
86 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
90 class ComputeParser : public ActionArgParser {
92 /* communication partner; if we send, this is the receiver and vice versa */
95 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
97 CHECK_ACTION_PARAMS(action, 1, 0)
98 flops = parse_double(action[2]);
102 class CollCommParser : public ActionArgParser {
110 MPI_Datatype datatype1 = MPI_DEFAULT_TYPE;
111 MPI_Datatype datatype2 = MPI_DEFAULT_TYPE;
114 class BcastArgParser : public CollCommParser {
116 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
118 CHECK_ACTION_PARAMS(action, 1, 2)
119 size = parse_double(action[2]);
120 root = (action.size() > 3) ? std::stoi(action[3]) : 0;
121 if (action.size() > 4)
122 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
126 class ReduceArgParser : public CollCommParser {
128 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
130 CHECK_ACTION_PARAMS(action, 2, 2)
131 comm_size = parse_double(action[2]);
132 comp_size = parse_double(action[3]);
133 root = (action.size() > 4) ? std::stoi(action[4]) : 0;
134 if (action.size() > 5)
135 datatype1 = simgrid::smpi::Datatype::decode(action[5]);
139 class AllReduceArgParser : public CollCommParser {
141 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
143 CHECK_ACTION_PARAMS(action, 2, 1)
144 comm_size = parse_double(action[2]);
145 comp_size = parse_double(action[3]);
146 if (action.size() > 4)
147 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
151 class AllToAllArgParser : public CollCommParser {
153 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
155 CHECK_ACTION_PARAMS(action, 2, 1)
156 comm_size = MPI_COMM_WORLD->size();
157 send_size = parse_double(action[2]);
158 recv_size = parse_double(action[3]);
160 if (action.size() > 4)
161 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
162 if (action.size() > 5)
163 datatype2 = simgrid::smpi::Datatype::decode(action[5]);
167 class GatherArgParser : public CollCommParser {
169 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
171 /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
174 1) 68 is the sendcounts
175 2) 68 is the recvcounts
176 3) 0 is the root node
177 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
178 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
180 CHECK_ACTION_PARAMS(action, 2, 3)
181 comm_size = MPI_COMM_WORLD->size();
182 send_size = parse_double(action[2]);
183 recv_size = parse_double(action[3]);
185 if (name == "gather") {
186 root = (action.size() > 4) ? std::stoi(action[4]) : 0;
187 if (action.size() > 5)
188 datatype1 = simgrid::smpi::Datatype::decode(action[5]);
189 if (action.size() > 6)
190 datatype2 = simgrid::smpi::Datatype::decode(action[6]);
193 if (action.size() > 4)
194 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
195 if (action.size() > 5)
196 datatype2 = simgrid::smpi::Datatype::decode(action[5]);
201 class GatherVArgParser : public CollCommParser {
204 std::shared_ptr<std::vector<int>> recvcounts;
205 std::vector<int> disps;
206 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
208 /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
209 0 gather 68 68 10 10 10 0 0 0
211 1) 68 is the sendcount
212 2) 68 10 10 10 is the recvcounts
213 3) 0 is the root node
214 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
215 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
217 comm_size = MPI_COMM_WORLD->size();
218 CHECK_ACTION_PARAMS(action, comm_size+1, 2)
219 send_size = parse_double(action[2]);
220 disps = std::vector<int>(comm_size, 0);
221 recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
223 if (name == "gatherV") {
224 root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
225 if (action.size() > 4 + comm_size)
226 datatype1 = simgrid::smpi::Datatype::decode(action[4 + comm_size]);
227 if (action.size() > 5 + comm_size)
228 datatype2 = simgrid::smpi::Datatype::decode(action[5 + comm_size]);
231 int datatype_index = 0, disp_index = 0;
232 /* The 3 comes from "0 gather <sendcount>", which must always be present.
233 * The + comm_size is the recvcounts array, which must also be present
235 if (action.size() > 3 + comm_size + comm_size) { /* datatype + disp are specified */
236 datatype_index = 3 + comm_size;
237 disp_index = datatype_index + 1;
238 datatype1 = simgrid::smpi::Datatype::decode(action[datatype_index]);
239 datatype2 = simgrid::smpi::Datatype::decode(action[datatype_index]);
240 } else if (action.size() > 3 + comm_size + 2) { /* disps specified; datatype is not specified; use the default one */
241 disp_index = 3 + comm_size;
242 } else if (action.size() > 3 + comm_size) { /* only datatype, no disp specified */
243 datatype_index = 3 + comm_size;
244 datatype1 = simgrid::smpi::Datatype::decode(action[datatype_index]);
245 datatype2 = simgrid::smpi::Datatype::decode(action[datatype_index]);
248 if (disp_index != 0) {
249 for (unsigned int i = 0; i < comm_size; i++)
250 disps[i] = std::stoi(action[disp_index + i]);
254 for (unsigned int i = 0; i < comm_size; i++) {
255 (*recvcounts)[i] = std::stoi(action[i + 3]);
257 recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
261 class ScatterArgParser : public CollCommParser {
263 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
265 /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
268 1) 68 is the sendcounts
269 2) 68 is the recvcounts
270 3) 0 is the root node
271 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
272 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
274 CHECK_ACTION_PARAMS(action, 2, 3)
275 comm_size = MPI_COMM_WORLD->size();
276 send_size = parse_double(action[2]);
277 recv_size = parse_double(action[3]);
278 root = (action.size() > 4) ? std::stoi(action[4]) : 0;
279 if (action.size() > 5)
280 datatype1 = simgrid::smpi::Datatype::decode(action[5]);
281 if (action.size() > 6)
282 datatype2 = simgrid::smpi::Datatype::decode(action[6]);
286 class ScatterVArgParser : public CollCommParser {
290 std::shared_ptr<std::vector<int>> sendcounts;
291 std::vector<int> disps;
292 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
294 /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
295 0 gather 68 10 10 10 68 0 0 0
297 1) 68 10 10 10 is the sendcounts
298 2) 68 is the recvcount
299 3) 0 is the root node
300 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
301 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
303 CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
304 recv_size = parse_double(action[2 + comm_size]);
305 disps = std::vector<int>(comm_size, 0);
306 sendcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
308 if (action.size() > 5 + comm_size)
309 datatype1 = simgrid::smpi::Datatype::decode(action[4 + comm_size]);
310 if (action.size() > 5 + comm_size)
311 datatype2 = simgrid::smpi::Datatype::decode(action[5]);
313 for (unsigned int i = 0; i < comm_size; i++) {
314 (*sendcounts)[i] = std::stoi(action[i + 2]);
316 send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
317 root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
321 class ReduceScatterArgParser : public CollCommParser {
324 std::shared_ptr<std::vector<int>> recvcounts;
325 std::vector<int> disps;
326 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
328 /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
329 0 reduceScatter 275427 275427 275427 204020 11346849 0
331 1) The first four values after the name of the action declare the recvcounts array
332 2) The value 11346849 is the amount of instructions
333 3) The last value corresponds to the datatype, see simgrid::smpi::Datatype::decode().
335 comm_size = MPI_COMM_WORLD->size();
336 CHECK_ACTION_PARAMS(action, comm_size+1, 1)
337 comp_size = parse_double(action[2+comm_size]);
338 recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
339 if (action.size() > 3 + comm_size)
340 datatype1 = simgrid::smpi::Datatype::decode(action[3 + comm_size]);
342 for (unsigned int i = 0; i < comm_size; i++) {
343 recvcounts->push_back(std::stoi(action[i + 2]));
345 recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
349 class AllToAllVArgParser : public CollCommParser {
353 std::shared_ptr<std::vector<int>> recvcounts;
354 std::shared_ptr<std::vector<int>> sendcounts;
355 std::vector<int> senddisps;
356 std::vector<int> recvdisps;
359 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
361 /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
362 0 allToAllV 100 1 7 10 12 100 1 70 10 5
364 1) 100 is the size of the send buffer *sizeof(int),
365 2) 1 7 10 12 is the sendcounts array
366 3) 100*sizeof(int) is the size of the receiver buffer
367 4) 1 70 10 5 is the recvcounts array
369 comm_size = MPI_COMM_WORLD->size();
370 CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
371 sendcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
372 recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
373 senddisps = std::vector<int>(comm_size, 0);
374 recvdisps = std::vector<int>(comm_size, 0);
376 if (action.size() > 5 + 2 * comm_size)
377 datatype1 = simgrid::smpi::Datatype::decode(action[4 + 2 * comm_size]);
378 if (action.size() > 5 + 2 * comm_size)
379 datatype2 = simgrid::smpi::Datatype::decode(action[5 + 2 * comm_size]);
381 send_buf_size=parse_double(action[2]);
382 recv_buf_size=parse_double(action[3+comm_size]);
383 for (unsigned int i = 0; i < comm_size; i++) {
384 (*sendcounts)[i] = std::stoi(action[3 + i]);
385 (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
387 send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
388 recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
392 template <class T> class ReplayAction {
394 const std::string name;
400 explicit ReplayAction(std::string name) : name(name), my_proc_id(simgrid::s4u::Actor::self()->getPid()) {}
402 virtual void execute(simgrid::xbt::ReplayAction& action)
404 // Needs to be re-initialized for every action, hence here
405 double start_time = smpi_process()->simulated_elapsed();
406 args.parse(action, name);
409 log_timed_action(action, start_time);
412 virtual void kernel(simgrid::xbt::ReplayAction& action) = 0;
414 void* send_buffer(int size)
416 return smpi_get_tmp_sendbuffer(size);
419 void* recv_buffer(int size)
421 return smpi_get_tmp_recvbuffer(size);
425 class WaitAction : public ReplayAction<ActionArgParser> {
427 WaitAction() : ReplayAction("Wait") {}
428 void kernel(simgrid::xbt::ReplayAction& action) override
430 std::string s = boost::algorithm::join(action, " ");
431 xbt_assert(get_reqq_self()->size(), "action wait not preceded by any irecv or isend: %s", s.c_str());
432 MPI_Request request = get_reqq_self()->back();
433 get_reqq_self()->pop_back();
435 if (request == nullptr) {
436 /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just
441 int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
443 // Must be taken before Request::wait() since the request may be set to
444 // MPI_REQUEST_NULL by Request::wait!
445 int src = request->comm()->group()->rank(request->src());
446 int dst = request->comm()->group()->rank(request->dst());
447 bool is_wait_for_receive = (request->flags() & RECV);
448 // TODO: Here we take the rank while we normally take the process id (look for my_proc_id)
449 TRACE_smpi_comm_in(rank, __func__, new simgrid::instr::NoOpTIData("wait"));
452 Request::wait(&request, &status);
454 TRACE_smpi_comm_out(rank);
455 if (is_wait_for_receive)
456 TRACE_smpi_recv(src, dst, 0);
460 class SendAction : public ReplayAction<SendRecvParser> {
462 SendAction() = delete;
463 SendAction(std::string name) : ReplayAction(name) {}
464 void kernel(simgrid::xbt::ReplayAction& action) override
466 int dst_traced = MPI_COMM_WORLD->group()->actor(args.partner)->getPid();
468 TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
469 Datatype::encode(args.datatype1)));
470 if (not TRACE_smpi_view_internals())
471 TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, args.size * args.datatype1->size());
473 if (name == "send") {
474 Request::send(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
475 } else if (name == "Isend") {
476 MPI_Request request = Request::isend(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
477 get_reqq_self()->push_back(request);
479 xbt_die("Don't know this action, %s", name.c_str());
482 TRACE_smpi_comm_out(my_proc_id);
486 class RecvAction : public ReplayAction<SendRecvParser> {
488 RecvAction() = delete;
489 explicit RecvAction(std::string name) : ReplayAction(name) {}
490 void kernel(simgrid::xbt::ReplayAction& action) override
492 int src_traced = MPI_COMM_WORLD->group()->actor(args.partner)->getPid();
494 TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
495 Datatype::encode(args.datatype1)));
498 // unknown size from the receiver point of view
499 if (args.size <= 0.0) {
500 Request::probe(args.partner, 0, MPI_COMM_WORLD, &status);
501 args.size = status.count;
504 if (name == "recv") {
505 Request::recv(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD, &status);
506 } else if (name == "Irecv") {
507 MPI_Request request = Request::irecv(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
508 get_reqq_self()->push_back(request);
511 TRACE_smpi_comm_out(my_proc_id);
512 // TODO: Check why this was only activated in the "recv" case and not in the "Irecv" case
513 if (name == "recv" && not TRACE_smpi_view_internals()) {
514 TRACE_smpi_recv(src_traced, my_proc_id, 0);
519 class ComputeAction : public ReplayAction<ComputeParser> {
521 ComputeAction() : ReplayAction("compute") {}
522 void kernel(simgrid::xbt::ReplayAction& action) override
524 TRACE_smpi_computing_in(my_proc_id, args.flops);
525 smpi_execute_flops(args.flops);
526 TRACE_smpi_computing_out(my_proc_id);
530 class TestAction : public ReplayAction<ActionArgParser> {
532 TestAction() : ReplayAction("Test") {}
533 void kernel(simgrid::xbt::ReplayAction& action) override
535 MPI_Request request = get_reqq_self()->back();
536 get_reqq_self()->pop_back();
537 // if request is null here, this may mean that a previous test has succeeded
538 // Different times in traced application and replayed version may lead to this
539 // In this case, ignore the extra calls.
540 if (request != nullptr) {
541 TRACE_smpi_testing_in(my_proc_id);
544 int flag = Request::test(&request, &status);
546 XBT_DEBUG("MPI_Test result: %d", flag);
547 /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now
549 get_reqq_self()->push_back(request);
551 TRACE_smpi_testing_out(my_proc_id);
556 class InitAction : public ReplayAction<ActionArgParser> {
558 InitAction() : ReplayAction("Init") {}
559 void kernel(simgrid::xbt::ReplayAction& action) override
561 CHECK_ACTION_PARAMS(action, 0, 1)
562 MPI_DEFAULT_TYPE = (action.size() > 2) ? MPI_DOUBLE // default MPE datatype
563 : MPI_BYTE; // default TAU datatype
565 /* start a simulated timer */
566 smpi_process()->simulated_start();
567 /*initialize the number of active processes */
568 active_processes = smpi_process_count();
570 set_reqq_self(new std::vector<MPI_Request>);
574 class CommunicatorAction : public ReplayAction<ActionArgParser> {
576 CommunicatorAction() : ReplayAction("Comm") {}
577 void kernel(simgrid::xbt::ReplayAction& action) override { /* nothing to do */}
580 class WaitAllAction : public ReplayAction<ActionArgParser> {
582 WaitAllAction() : ReplayAction("waitAll") {}
583 void kernel(simgrid::xbt::ReplayAction& action) override
585 const unsigned int count_requests = get_reqq_self()->size();
587 if (count_requests > 0) {
588 TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData("waitAll", -1, count_requests, ""));
589 std::vector<std::pair</*sender*/int,/*recv*/int>> sender_receiver;
590 for (const auto& req : (*get_reqq_self())) {
591 if (req && (req->flags() & RECV)) {
592 sender_receiver.push_back({req->src(), req->dst()});
595 MPI_Status status[count_requests];
596 Request::waitall(count_requests, &(*get_reqq_self())[0], status);
598 for (auto& pair : sender_receiver) {
599 TRACE_smpi_recv(pair.first, pair.second, 0);
601 TRACE_smpi_comm_out(my_proc_id);
606 class BarrierAction : public ReplayAction<ActionArgParser> {
608 BarrierAction() : ReplayAction("barrier") {}
609 void kernel(simgrid::xbt::ReplayAction& action) override
611 TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::NoOpTIData("barrier"));
612 Colls::barrier(MPI_COMM_WORLD);
613 TRACE_smpi_comm_out(my_proc_id);
617 class BcastAction : public ReplayAction<BcastArgParser> {
619 BcastAction() : ReplayAction("bcast") {}
620 void kernel(simgrid::xbt::ReplayAction& action) override
622 TRACE_smpi_comm_in(my_proc_id, "action_bcast",
623 new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(args.root)->getPid(),
624 -1.0, args.size, -1, Datatype::encode(args.datatype1), ""));
626 Colls::bcast(send_buffer(args.size * args.datatype1->size()), args.size, args.datatype1, args.root, MPI_COMM_WORLD);
628 TRACE_smpi_comm_out(my_proc_id);
632 class ReduceAction : public ReplayAction<ReduceArgParser> {
634 ReduceAction() : ReplayAction("reduce") {}
635 void kernel(simgrid::xbt::ReplayAction& action) override
637 TRACE_smpi_comm_in(my_proc_id, "action_reduce",
638 new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(args.root)->getPid(), args.comp_size,
639 args.comm_size, -1, Datatype::encode(args.datatype1), ""));
641 Colls::reduce(send_buffer(args.comm_size * args.datatype1->size()),
642 recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL, args.root, MPI_COMM_WORLD);
643 smpi_execute_flops(args.comp_size);
645 TRACE_smpi_comm_out(my_proc_id);
649 class AllReduceAction : public ReplayAction<AllReduceArgParser> {
651 AllReduceAction() : ReplayAction("allReduce") {}
652 void kernel(simgrid::xbt::ReplayAction& action) override
654 TRACE_smpi_comm_in(my_proc_id, "action_allReduce", new simgrid::instr::CollTIData("allReduce", -1, args.comp_size, args.comm_size, -1,
655 Datatype::encode(args.datatype1), ""));
657 Colls::allreduce(send_buffer(args.comm_size * args.datatype1->size()),
658 recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
659 smpi_execute_flops(args.comp_size);
661 TRACE_smpi_comm_out(my_proc_id);
665 class AllToAllAction : public ReplayAction<AllToAllArgParser> {
667 AllToAllAction() : ReplayAction("allToAll") {}
668 void kernel(simgrid::xbt::ReplayAction& action) override
670 TRACE_smpi_comm_in(my_proc_id, "action_allToAll",
671 new simgrid::instr::CollTIData("allToAll", -1, -1.0, args.send_size, args.recv_size,
672 Datatype::encode(args.datatype1),
673 Datatype::encode(args.datatype2)));
675 Colls::alltoall(send_buffer(args.send_size*args.comm_size* args.datatype1->size()),
676 args.send_size, args.datatype1, recv_buffer(args.recv_size * args.comm_size * args.datatype2->size()),
677 args.recv_size, args.datatype2, MPI_COMM_WORLD);
679 TRACE_smpi_comm_out(my_proc_id);
683 class GatherAction : public ReplayAction<GatherArgParser> {
685 GatherAction(std::string name) : ReplayAction(name) {}
686 void kernel(simgrid::xbt::ReplayAction& action) override
688 TRACE_smpi_comm_in(my_proc_id, name.c_str(), new simgrid::instr::CollTIData(name, (name == "gather") ? args.root : -1, -1.0, args.send_size, args.recv_size,
689 Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
691 if (name == "gather") {
692 int rank = MPI_COMM_WORLD->rank();
693 Colls::gather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
694 (rank == args.root) ? recv_buffer(args.recv_size * args.comm_size * args.datatype2->size()) : nullptr, args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
697 Colls::allgather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
698 recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, MPI_COMM_WORLD);
700 TRACE_smpi_comm_out(my_proc_id);
704 class GatherVAction : public ReplayAction<GatherVArgParser> {
706 GatherVAction(std::string name) : ReplayAction(name) {}
707 void kernel(simgrid::xbt::ReplayAction& action) override
709 int rank = MPI_COMM_WORLD->rank();
711 TRACE_smpi_comm_in(my_proc_id, name.c_str(), new simgrid::instr::VarCollTIData(
712 name, (name == "gatherV") ? args.root : -1, args.send_size, nullptr, -1, args.recvcounts,
713 Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
715 if (name == "gatherV") {
716 Colls::gatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
717 (rank == args.root) ? recv_buffer(args.recv_size_sum * args.datatype2->size()) : nullptr, args.recvcounts->data(), args.disps.data(), args.datatype2, args.root,
721 Colls::allgatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
722 recv_buffer(args.recv_size_sum * args.datatype2->size()), args.recvcounts->data(), args.disps.data(), args.datatype2,
726 TRACE_smpi_comm_out(my_proc_id);
730 class ScatterAction : public ReplayAction<ScatterArgParser> {
732 ScatterAction() : ReplayAction("scatter") {}
733 void kernel(simgrid::xbt::ReplayAction& action) override
735 int rank = MPI_COMM_WORLD->rank();
736 TRACE_smpi_comm_in(my_proc_id, "action_scatter", new simgrid::instr::CollTIData(name, args.root, -1.0, args.send_size, args.recv_size,
737 Datatype::encode(args.datatype1),
738 Datatype::encode(args.datatype2)));
740 Colls::scatter(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
741 (rank == args.root) ? recv_buffer(args.recv_size * args.datatype2->size()) : nullptr, args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
743 TRACE_smpi_comm_out(my_proc_id);
748 class ScatterVAction : public ReplayAction<ScatterVArgParser> {
750 ScatterVAction() : ReplayAction("scatterV") {}
751 void kernel(simgrid::xbt::ReplayAction& action) override
753 int rank = MPI_COMM_WORLD->rank();
754 TRACE_smpi_comm_in(my_proc_id, "action_scatterv", new simgrid::instr::VarCollTIData(name, args.root, -1, args.sendcounts, args.recv_size,
755 nullptr, Datatype::encode(args.datatype1),
756 Datatype::encode(args.datatype2)));
758 Colls::scatterv((rank == args.root) ? send_buffer(args.send_size_sum * args.datatype1->size()) : nullptr, args.sendcounts->data(), args.disps.data(),
759 args.datatype1, recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, args.root,
762 TRACE_smpi_comm_out(my_proc_id);
766 class ReduceScatterAction : public ReplayAction<ReduceScatterArgParser> {
768 ReduceScatterAction() : ReplayAction("reduceScatter") {}
769 void kernel(simgrid::xbt::ReplayAction& action) override
771 TRACE_smpi_comm_in(my_proc_id, "action_reducescatter",
772 new simgrid::instr::VarCollTIData("reduceScatter", -1, 0, nullptr, -1, args.recvcounts,
773 std::to_string(args.comp_size), /* ugly hack to print comp_size */
774 Datatype::encode(args.datatype1)));
776 Colls::reduce_scatter(send_buffer(args.recv_size_sum * args.datatype1->size()), recv_buffer(args.recv_size_sum * args.datatype1->size()),
777 args.recvcounts->data(), args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
779 smpi_execute_flops(args.comp_size);
780 TRACE_smpi_comm_out(my_proc_id);
784 class AllToAllVAction : public ReplayAction<AllToAllVArgParser> {
786 AllToAllVAction() : ReplayAction("allToAllV") {}
787 void kernel(simgrid::xbt::ReplayAction& action) override
789 TRACE_smpi_comm_in(my_proc_id, __func__,
790 new simgrid::instr::VarCollTIData(
791 "allToAllV", -1, args.send_size_sum, args.sendcounts, args.recv_size_sum, args.recvcounts,
792 Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
794 Colls::alltoallv(send_buffer(args.send_buf_size * args.datatype1->size()), args.sendcounts->data(), args.senddisps.data(), args.datatype1,
795 recv_buffer(args.recv_buf_size * args.datatype2->size()), args.recvcounts->data(), args.recvdisps.data(), args.datatype2, MPI_COMM_WORLD);
797 TRACE_smpi_comm_out(my_proc_id);
800 } // Replay Namespace
801 }} // namespace simgrid::smpi
803 /** @brief Only initialize the replay, don't do it for real */
804 void smpi_replay_init(int* argc, char*** argv)
806 simgrid::smpi::Process::init(argc, argv);
807 smpi_process()->mark_as_initialized();
808 smpi_process()->set_replaying(true);
810 int my_proc_id = Actor::self()->getPid();
811 TRACE_smpi_init(my_proc_id);
812 TRACE_smpi_computing_init(my_proc_id);
813 TRACE_smpi_comm_in(my_proc_id, "smpi_replay_run_init", new simgrid::instr::NoOpTIData("init"));
814 TRACE_smpi_comm_out(my_proc_id);
815 xbt_replay_action_register("init", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::InitAction().execute(action); });
816 xbt_replay_action_register("finalize", [](simgrid::xbt::ReplayAction& action) { /* nothing to do */ });
817 xbt_replay_action_register("comm_size", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::CommunicatorAction().execute(action); });
818 xbt_replay_action_register("comm_split",[](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::CommunicatorAction().execute(action); });
819 xbt_replay_action_register("comm_dup", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::CommunicatorAction().execute(action); });
821 xbt_replay_action_register("send", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::SendAction("send").execute(action); });
822 xbt_replay_action_register("Isend", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::SendAction("Isend").execute(action); });
823 xbt_replay_action_register("recv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::RecvAction("recv").execute(action); });
824 xbt_replay_action_register("Irecv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::RecvAction("Irecv").execute(action); });
825 xbt_replay_action_register("test", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::TestAction().execute(action); });
826 xbt_replay_action_register("wait", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::WaitAction().execute(action); });
827 xbt_replay_action_register("waitAll", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::WaitAllAction().execute(action); });
828 xbt_replay_action_register("barrier", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::BarrierAction().execute(action); });
829 xbt_replay_action_register("bcast", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::BcastAction().execute(action); });
830 xbt_replay_action_register("reduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::ReduceAction().execute(action); });
831 xbt_replay_action_register("allReduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::AllReduceAction().execute(action); });
832 xbt_replay_action_register("allToAll", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::AllToAllAction().execute(action); });
833 xbt_replay_action_register("allToAllV", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::AllToAllVAction().execute(action); });
834 xbt_replay_action_register("gather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::GatherAction("gather").execute(action); });
835 xbt_replay_action_register("scatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::ScatterAction().execute(action); });
836 xbt_replay_action_register("gatherV", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::GatherVAction("gatherV").execute(action); });
837 xbt_replay_action_register("scatterV", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::ScatterVAction().execute(action); });
838 xbt_replay_action_register("allGather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::GatherAction("allGather").execute(action); });
839 xbt_replay_action_register("allGatherV", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::GatherVAction("allGatherV").execute(action); });
840 xbt_replay_action_register("reduceScatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::ReduceScatterAction().execute(action); });
841 xbt_replay_action_register("compute", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::ComputeAction().execute(action); });
843 //if we have a delayed start, sleep here.
845 double value = xbt_str_parse_double((*argv)[2], "%s is not a double");
846 XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
847 smpi_execute_flops(value);
849 //UGLY: force a context switch to be sure that all MSG_processes begin initialization
850 XBT_DEBUG("Force context switch by smpi_execute_flops - Sleeping for 0.0 flops ");
851 smpi_execute_flops(0.0);
855 /** @brief actually run the replay after initialization */
856 void smpi_replay_main(int* argc, char*** argv)
858 simgrid::xbt::replay_runner(*argc, *argv);
860 /* and now, finalize everything */
861 /* One active process will stop. Decrease the counter*/
862 XBT_DEBUG("There are %zu elements in reqq[*]", get_reqq_self()->size());
863 if (not get_reqq_self()->empty()) {
864 unsigned int count_requests=get_reqq_self()->size();
865 MPI_Request requests[count_requests];
866 MPI_Status status[count_requests];
869 for (auto const& req : *get_reqq_self()) {
873 simgrid::smpi::Request::waitall(count_requests, requests, status);
875 delete get_reqq_self();
878 if(active_processes==0){
879 /* Last process alive speaking: end the simulated timer */
880 XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
881 smpi_free_replay_tmp_buffers();
884 TRACE_smpi_comm_in(Actor::self()->getPid(), "smpi_replay_run_finalize", new simgrid::instr::NoOpTIData("finalize"));
886 smpi_process()->finalize();
888 TRACE_smpi_comm_out(Actor::self()->getPid());
889 TRACE_smpi_finalize(Actor::self()->getPid());
892 /** @brief chain a replay initialization and a replay start */
893 void smpi_replay_run(int* argc, char*** argv)
895 smpi_replay_init(argc, argv);
896 smpi_replay_main(argc, argv);