1 /* Copyright (c) 2009-2017. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
7 #include "smpi_coll.hpp"
8 #include "smpi_comm.hpp"
9 #include "smpi_datatype.hpp"
10 #include "smpi_group.hpp"
11 #include "smpi_process.hpp"
12 #include "smpi_request.hpp"
13 #include "xbt/replay.hpp"
15 #include <boost/algorithm/string/join.hpp>
18 #include <unordered_map>
21 using simgrid::s4u::Actor;
23 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
// Count of replaying processes still running; set in InitAction::kernel and
// checked at finalize time (the decrement is not visible in this chunk).
25 static int active_processes = 0;
// Per-actor queue of outstanding asynchronous requests (Isend/Irecv), keyed by
// actor pid; consumed by wait/waitAll/test actions.
26 static std::unordered_map<int, std::vector<MPI_Request>*> reqq;
// Datatype used when a trace line omits one; chosen in InitAction::kernel
// (MPI_DOUBLE for MPE traces, MPI_BYTE for TAU traces).
28 static MPI_Datatype MPI_DEFAULT_TYPE;
// Abort the replay with a diagnostic if a parsed trace line does not carry
// enough fields. Field layout is: [process_id, action_name, <mandatory...>,
// <optional...>], hence the "+ 2" below.
30 #define CHECK_ACTION_PARAMS(action, mandatory, optional) \
32 if (action.size() < static_cast<unsigned long>(mandatory + 2)) \
33 THROWF(arg_error, 0, "%s replay failed.\n" \
34 "%lu items were given on the line. First two should be process_id and action. " \
35 "This action needs after them %lu mandatory arguments, and accepts %lu optional ones. \n" \
36 "Please contact the Simgrid team if support is needed", \
37 __FUNCTION__, action.size(), static_cast<unsigned long>(mandatory), \
38 static_cast<unsigned long>(optional)); \
// Log (verbose level) the action that was just replayed, followed by its
// simulated duration; `clock` is the simulated time at which the action began.
41 static void log_timed_action(simgrid::xbt::ReplayAction& action, double clock)
43 if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
44 std::string s = boost::algorithm::join(action, " ");
45 XBT_VERB("%s %f", s.c_str(), smpi_process()->simulated_elapsed() - clock);
// Return the request queue of the calling actor. `at()` throws
// std::out_of_range if the actor never registered one (i.e. the trace did not
// start with an "init" action).
49 static std::vector<MPI_Request>* get_reqq_self()
51 return reqq.at(Actor::self()->getPid());
// Register the given request vector for the calling actor.
// NOTE(review): unordered_map::insert does NOT replace an existing entry, so a
// second call for the same pid would silently leak `mpi_request` — presumably
// each actor runs "init" exactly once; confirm.
54 static void set_reqq_self(std::vector<MPI_Request> *mpi_request)
56 reqq.insert({Actor::self()->getPid(), mpi_request});
// Parse a trace field as a double, aborting with a clear error message on
// malformed input.
60 static double parse_double(std::string string)
62 return xbt_str_parse_double(string.c_str(), "%s is not a double");
// Base class for per-action argument parsers. The default parse() accepts
// actions that carry no arguments beyond the leading process_id and name.
69 class ActionArgParser {
71 virtual void parse(simgrid::xbt::ReplayAction& action) { CHECK_ACTION_PARAMS(action, 0, 0) }
// Arguments of point-to-point actions (send/Isend/recv/Irecv): partner rank,
// message size, and an optional datatype id (field 4).
74 class SendRecvParser : public ActionArgParser {
76 /* communication partner; if we send, this is the receiver and vice versa */
79 MPI_Datatype datatype1 = MPI_DEFAULT_TYPE;
81 void parse(simgrid::xbt::ReplayAction& action) override
83 CHECK_ACTION_PARAMS(action, 2, 1)
84 partner = std::stoi(action[2]);
85 size = parse_double(action[3]);
86 if (action.size() > 4)
87 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
// Argument of the "compute" action: the amount of flops to simulate.
91 class ComputeParser : public ActionArgParser {
93 /* amount of flops to execute (the original comment here was a copy-paste leftover) */
96 void parse(simgrid::xbt::ReplayAction& action) override
98 CHECK_ACTION_PARAMS(action, 1, 0)
99 flops = parse_double(action[2]);
// Shared argument storage for collective actions: sizes, root, and the
// (send/recv) datatypes, defaulting to the trace-wide default type.
103 class CollCommParser : public ActionArgParser {
111 MPI_Datatype datatype1 = MPI_DEFAULT_TYPE;
112 MPI_Datatype datatype2 = MPI_DEFAULT_TYPE;
// "bcast" arguments: size, optional root (default 0), optional datatype.
115 class BcastArgParser : public CollCommParser {
117 void parse(simgrid::xbt::ReplayAction& action) override
119 CHECK_ACTION_PARAMS(action, 1, 2)
120 size = parse_double(action[2])
121 root = (action.size() > 3) ? std::stoi(action[3]) : 0;
122 if (action.size() > 4)
123 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
// "reduce" arguments: communication size, computation size (flops), optional
// root (default 0), optional datatype.
127 class ReduceArgParser : public CollCommParser {
129 void parse(simgrid::xbt::ReplayAction& action) override
131 CHECK_ACTION_PARAMS(action, 2, 2)
132 comm_size = parse_double(action[2]);
133 comp_size = parse_double(action[3]);
134 root = (action.size() > 4) ? std::stoi(action[4]) : 0;
135 if (action.size() > 5)
136 datatype1 = simgrid::smpi::Datatype::decode(action[5]);
// "allReduce" arguments: communication size, computation size (flops),
// optional datatype. No root — the operation is symmetric.
140 class AllReduceArgParser : public CollCommParser {
142 void parse(simgrid::xbt::ReplayAction& action) override
144 CHECK_ACTION_PARAMS(action, 2, 1)
145 comm_size = parse_double(action[2]);
146 comp_size = parse_double(action[3]);
147 if (action.size() > 4)
148 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
// "allToAll" arguments: per-peer send and receive sizes plus optional send and
// receive datatypes; comm_size is taken from MPI_COMM_WORLD, not the trace.
152 class AllToAllArgParser : public CollCommParser {
154 void parse(simgrid::xbt::ReplayAction& action) override
156 CHECK_ACTION_PARAMS(action, 2, 1)
157 comm_size = MPI_COMM_WORLD->size();
158 send_size = parse_double(action[2]);
159 recv_size = parse_double(action[3]);
161 if (action.size() > 4)
162 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
163 if (action.size() > 5)
164 datatype2 = simgrid::smpi::Datatype::decode(action[5]);
// Base class of every replayed action. T is the argument parser; execute()
// times the action-specific kernel() and logs the simulated duration.
168 template <class T> class ReplayAction {
170 const std::string name;
176 explicit ReplayAction(std::string name) : name(name), my_proc_id(simgrid::s4u::Actor::self()->getPid()) {}
178 virtual void execute(simgrid::xbt::ReplayAction& action)
180 // Needs to be re-initialized for every action, hence here
181 double start_time = smpi_process()->simulated_elapsed();
185 log_timed_action(action, start_time);
// Each concrete action implements its semantics here.
188 virtual void kernel(simgrid::xbt::ReplayAction& action) = 0;
// Scratch buffers reused across actions — replay transfers no real payload.
190 void* send_buffer(int size)
192 return smpi_get_tmp_sendbuffer(size);
195 void* recv_buffer(int size)
197 return smpi_get_tmp_recvbuffer(size);
// Replays "wait": pops the most recent pending request and waits on it.
201 class WaitAction : public ReplayAction<ActionArgParser> {
203 WaitAction() : ReplayAction("Wait") {}
204 void kernel(simgrid::xbt::ReplayAction& action) override
206 std::string s = boost::algorithm::join(action, " ");
207 xbt_assert(get_reqq_self()->size(), "action wait not preceded by any irecv or isend: %s", s.c_str());
// LIFO: wait matches the most recently posted Isend/Irecv.
208 MPI_Request request = get_reqq_self()->back();
209 get_reqq_self()->pop_back();
211 if (request == nullptr) {
212 /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just
217 int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
219 // Must be taken before Request::wait() since the request may be set to
220 // MPI_REQUEST_NULL by Request::wait!
// NOTE(review): unlike the `rank` line above, these two dereference
// request->comm() without a MPI_COMM_NULL guard — confirm comm() is
// always valid here.
221 int src = request->comm()->group()->rank(request->src());
222 int dst = request->comm()->group()->rank(request->dst());
223 bool is_wait_for_receive = (request->flags() & RECV);
224 // TODO: Here we take the rank while we normally take the process id (look for my_proc_id)
225 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::NoOpTIData("wait"));
228 Request::wait(&request, &status);
230 TRACE_smpi_comm_out(rank);
// Emit the matching recv event for tracing only when we waited on a receive.
231 if (is_wait_for_receive)
232 TRACE_smpi_recv(src, dst, 0);
// Replays "send" (blocking) and "Isend" (non-blocking, request queued for a
// later wait/test). Dispatch is on the action name passed at construction.
236 class SendAction : public ReplayAction<SendRecvParser> {
238 SendAction() = delete;
239 SendAction(std::string name) : ReplayAction(name) {}
240 void kernel(simgrid::xbt::ReplayAction& action) override
// Translate the partner's rank in MPI_COMM_WORLD into an actor pid for tracing.
242 int dst_traced = MPI_COMM_WORLD->group()->actor(args.partner)->getPid();
244 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
245 Datatype::encode(args.datatype1)));
246 if (not TRACE_smpi_view_internals())
247 TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, args.size * args.datatype1->size());
// nullptr payload: replay simulates timing only, no data is moved.
249 if (name == "send") {
250 Request::send(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
251 } else if (name == "Isend") {
252 MPI_Request request = Request::isend(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
253 get_reqq_self()->push_back(request);
255 xbt_die("Don't know this action, %s", name.c_str());
258 TRACE_smpi_comm_out(my_proc_id);
// Replays "recv" (blocking) and "Irecv" (non-blocking, request queued for a
// later wait/test).
262 class RecvAction : public ReplayAction<SendRecvParser> {
264 RecvAction() = delete;
265 explicit RecvAction(std::string name) : ReplayAction(name) {}
266 void kernel(simgrid::xbt::ReplayAction& action) override
268 int src_traced = MPI_COMM_WORLD->group()->actor(args.partner)->getPid();
270 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
271 Datatype::encode(args.datatype1)));
274 // unknown size from the receiver point of view
275 if (args.size <= 0.0) {
// Probe the matching send to learn the message size before posting the recv.
276 Request::probe(args.partner, 0, MPI_COMM_WORLD, &status);
277 args.size = status.count;
280 if (name == "recv") {
281 Request::recv(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD, &status);
282 } else if (name == "Irecv") {
283 MPI_Request request = Request::irecv(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
284 get_reqq_self()->push_back(request);
287 TRACE_smpi_comm_out(my_proc_id);
288 // TODO: Check why this was only activated in the "recv" case and not in the "Irecv" case
289 if (name == "recv" && not TRACE_smpi_view_internals()) {
290 TRACE_smpi_recv(src_traced, my_proc_id, 0);
// Replays "compute": burn the given amount of simulated flops.
295 class ComputeAction : public ReplayAction<ComputeParser> {
297 ComputeAction() : ReplayAction("compute") {}
298 void kernel(simgrid::xbt::ReplayAction& action) override
300 TRACE_smpi_computing_in(my_proc_id, args.flops);
301 smpi_execute_flops(args.flops);
302 TRACE_smpi_computing_out(my_proc_id);
// Replays "test": MPI_Test on the most recent pending request. The request is
// pushed back (possibly as MPI_REQUEST_NULL after success) so a subsequent
// wait still finds an entry in the queue.
306 class TestAction : public ReplayAction<ActionArgParser> {
308 TestAction() : ReplayAction("Test") {}
309 void kernel(simgrid::xbt::ReplayAction& action) override
311 MPI_Request request = get_reqq_self()->back();
312 get_reqq_self()->pop_back();
313 // if request is null here, this may mean that a previous test has succeeded
314 // Different times in traced application and replayed version may lead to this
315 // In this case, ignore the extra calls.
316 if (request != nullptr) {
317 TRACE_smpi_testing_in(my_proc_id);
320 int flag = Request::test(&request, &status);
322 XBT_DEBUG("MPI_Test result: %d", flag);
323 /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now
325 get_reqq_self()->push_back(request);
327 TRACE_smpi_testing_out(my_proc_id);
// Replays "init": chooses the default datatype (an extra field means an MPE
// trace, none means TAU), starts the simulated timer, records the number of
// active processes and creates this actor's request queue.
332 class InitAction : public ReplayAction<ActionArgParser> {
334 InitAction() : ReplayAction("Init") {}
335 void kernel(simgrid::xbt::ReplayAction& action) override
337 CHECK_ACTION_PARAMS(action, 0, 1)
338 MPI_DEFAULT_TYPE = (action.size() > 2) ? MPI_DOUBLE // default MPE datatype
339 : MPI_BYTE; // default TAU datatype
341 /* start a simulated timer */
342 smpi_process()->simulated_start();
343 /*initialize the number of active processes */
344 active_processes = smpi_process_count();
// Freed in smpi_replay_main() once all pending requests are drained.
346 set_reqq_self(new std::vector<MPI_Request>);
// Replays communicator-management actions (comm_size/comm_split/comm_dup) as
// timed no-ops: the replay only models their duration.
350 class CommunicatorAction : public ReplayAction<ActionArgParser> {
352 CommunicatorAction() : ReplayAction("Comm") {}
353 void kernel(simgrid::xbt::ReplayAction& action) override { /* nothing to do */}
// Replays "waitAll": waits on every pending request of this actor at once,
// then emits a recv trace event for each completed receive.
356 class WaitAllAction : public ReplayAction<ActionArgParser> {
358 WaitAllAction() : ReplayAction("waitAll") {}
359 void kernel(simgrid::xbt::ReplayAction& action) override
361 const unsigned int count_requests = get_reqq_self()->size();
363 if (count_requests > 0) {
364 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
365 new simgrid::instr::Pt2PtTIData("waitAll", -1, count_requests, ""));
// Record (src, dst) of each receive before waitall may null the requests out.
366 std::vector<std::pair</*sender*/int,/*recv*/int>> sender_receiver;
367 for (const auto& req : (*get_reqq_self())) {
368 if (req && (req->flags() & RECV)) {
369 sender_receiver.push_back({req->src(), req->dst()});
// NOTE(review): variable-length array — a GCC/Clang extension, not standard C++.
372 MPI_Status status[count_requests];
373 Request::waitall(count_requests, &(*get_reqq_self())[0], status);
375 for (auto& pair : sender_receiver) {
376 TRACE_smpi_recv(pair.first, pair.second, 0);
378 TRACE_smpi_comm_out(my_proc_id);
// Replays "barrier" on MPI_COMM_WORLD.
383 class BarrierAction : public ReplayAction<ActionArgParser> {
385 BarrierAction() : ReplayAction("barrier") {}
386 void kernel(simgrid::xbt::ReplayAction& action) override
388 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::NoOpTIData("barrier"));
389 Colls::barrier(MPI_COMM_WORLD);
390 TRACE_smpi_comm_out(my_proc_id);
// Replays "bcast" rooted at args.root over MPI_COMM_WORLD.
394 class BcastAction : public ReplayAction<BcastArgParser> {
396 BcastAction() : ReplayAction("bcast") {}
397 void kernel(simgrid::xbt::ReplayAction& action) override
399 TRACE_smpi_comm_in(my_proc_id, "action_bcast",
400 new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(args.root)->getPid(),
401 -1.0, args.size, -1, Datatype::encode(args.datatype1), ""));
403 Colls::bcast(send_buffer(args.size * args.datatype1->size()), args.size, args.datatype1, args.root, MPI_COMM_WORLD);
405 TRACE_smpi_comm_out(my_proc_id);
// Replays "reduce": the collective itself (MPI_OP_NULL — no real reduction is
// computed) followed by the traced local computation in flops.
409 class ReduceAction : public ReplayAction<ReduceArgParser> {
411 ReduceAction() : ReplayAction("reduce") {}
412 void kernel(simgrid::xbt::ReplayAction& action) override
414 TRACE_smpi_comm_in(my_proc_id, "action_reduce",
415 new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(args.root)->getPid(), args.comp_size,
416 args.comm_size, -1, Datatype::encode(args.datatype1), ""));
418 Colls::reduce(send_buffer(args.comm_size * args.datatype1->size()),
419 recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL, args.root, MPI_COMM_WORLD);
420 smpi_execute_flops(args.comp_size);
422 TRACE_smpi_comm_out(my_proc_id);
// Replays "allReduce": the collective (MPI_OP_NULL — timing only) plus the
// traced local computation in flops.
426 class AllReduceAction : public ReplayAction<AllReduceArgParser> {
428 AllReduceAction() : ReplayAction("allReduce") {}
429 void kernel(simgrid::xbt::ReplayAction& action) override
431 TRACE_smpi_comm_in(my_proc_id, "action_allReduce", new simgrid::instr::CollTIData("allReduce", -1, args.comp_size, args.comm_size, -1,
432 Datatype::encode(args.datatype1), ""));
434 Colls::allreduce(send_buffer(args.comm_size * args.datatype1->size()),
435 recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
436 smpi_execute_flops(args.comp_size);
438 TRACE_smpi_comm_out(my_proc_id);
// Replays "allToAll" over MPI_COMM_WORLD; buffers are sized for the whole
// communicator (per-peer size * comm_size * datatype size).
442 class AllToAllAction : public ReplayAction<AllToAllArgParser> {
444 AllToAllAction() : ReplayAction("allToAll") {}
445 void kernel(simgrid::xbt::ReplayAction& action) override
447 TRACE_smpi_comm_in(my_proc_id, "action_allToAll",
448 new simgrid::instr::CollTIData("allToAll", -1, -1.0, args.send_size, args.recv_size,
449 Datatype::encode(args.datatype1),
450 Datatype::encode(args.datatype2)));
452 Colls::alltoall(send_buffer(args.send_size*args.comm_size* args.datatype1->size()),
453 args.send_size, args.datatype1, recv_buffer(args.recv_size * args.comm_size * args.datatype2->size()),
454 args.recv_size, args.datatype2, MPI_COMM_WORLD);
456 TRACE_smpi_comm_out(my_proc_id);
459 } // Replay Namespace
// Replays "gather": fixed-size gather rooted at the optional field 4.
461 static void action_gather(simgrid::xbt::ReplayAction& action)
463 /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
466 1) 68 is the sendcounts
467 2) 68 is the recvcounts
468 3) 0 is the root node
469 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
470 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
472 CHECK_ACTION_PARAMS(action, 2, 3)
473 double clock = smpi_process()->simulated_elapsed();
474 unsigned long comm_size = MPI_COMM_WORLD->size();
475 int send_size = parse_double(action[2]);
476 int recv_size = parse_double(action[3]);
// Both datatypes are only honored when both fields 5 and 6 are present.
477 MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE};
478 MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[6]) : MPI_DEFAULT_TYPE};
480 void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
481 void *recv = nullptr;
482 int root = (action.size() > 4) ? std::stoi(action[4]) : 0;
483 int rank = MPI_COMM_WORLD->rank();
// Receive buffer holds comm_size contributions (presumably root-only — the
// guarding lines are not visible in this chunk).
486 recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
488 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
489 Datatype::encode(MPI_CURRENT_TYPE),
490 Datatype::encode(MPI_CURRENT_TYPE2)));
492 Colls::gather(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
494 TRACE_smpi_comm_out(Actor::self()->getPid());
495 log_timed_action (action, clock);
// Replays "scatter": fixed-size scatter rooted at the optional field 4.
498 static void action_scatter(simgrid::xbt::ReplayAction& action)
500 /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
503 1) 68 is the sendcounts
504 2) 68 is the recvcounts
505 3) 0 is the root node
506 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
507 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
509 CHECK_ACTION_PARAMS(action, 2, 3)
510 double clock = smpi_process()->simulated_elapsed();
511 unsigned long comm_size = MPI_COMM_WORLD->size();
512 int send_size = parse_double(action[2]);
513 int recv_size = parse_double(action[3]);
514 MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE};
515 MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[6]) : MPI_DEFAULT_TYPE};
517 void* send = smpi_get_tmp_sendbuffer(send_size * MPI_CURRENT_TYPE->size());
518 void* recv = nullptr;
519 int root = (action.size() > 4) ? std::stoi(action[4]) : 0;
520 int rank = MPI_COMM_WORLD->rank();
523 recv = smpi_get_tmp_recvbuffer(recv_size * comm_size * MPI_CURRENT_TYPE2->size());
// NOTE(review): the TIData is labeled "gather" although this replays a
// scatter — looks like a copy-paste slip that mislabels the trace event.
525 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
526 Datatype::encode(MPI_CURRENT_TYPE),
527 Datatype::encode(MPI_CURRENT_TYPE2)));
529 Colls::scatter(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
531 TRACE_smpi_comm_out(Actor::self()->getPid());
532 log_timed_action(action, clock);
// Replays "gatherV": variable-count gather; recvcounts come from the trace,
// displacements are all zero.
535 static void action_gatherv(simgrid::xbt::ReplayAction& action)
537 /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
538 0 gather 68 68 10 10 10 0 0 0
540 1) 68 is the sendcount
541 2) 68 10 10 10 is the recvcounts
542 3) 0 is the root node
543 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
544 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
546 double clock = smpi_process()->simulated_elapsed();
547 unsigned long comm_size = MPI_COMM_WORLD->size();
548 CHECK_ACTION_PARAMS(action, comm_size+1, 2)
549 int send_size = parse_double(action[2]);
550 std::vector<int> disps(comm_size, 0);
// shared_ptr because the TIData below keeps a reference for tracing.
551 std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
553 MPI_Datatype MPI_CURRENT_TYPE =
554 (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
555 MPI_Datatype MPI_CURRENT_TYPE2{
556 (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[5 + comm_size]) : MPI_DEFAULT_TYPE};
558 void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
559 void *recv = nullptr;
560 for (unsigned int i = 0; i < comm_size; i++) {
561 (*recvcounts)[i] = std::stoi(action[i + 3]);
563 int recv_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
565 int root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
566 int rank = MPI_COMM_WORLD->rank();
569 recv = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
571 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData(
572 "gatherV", root, send_size, nullptr, -1, recvcounts,
573 Datatype::encode(MPI_CURRENT_TYPE), Datatype::encode(MPI_CURRENT_TYPE2)));
575 Colls::gatherv(send, send_size, MPI_CURRENT_TYPE, recv, recvcounts->data(), disps.data(), MPI_CURRENT_TYPE2, root,
578 TRACE_smpi_comm_out(Actor::self()->getPid());
579 log_timed_action (action, clock);
// Replays "scatterV": variable-count scatter; sendcounts come from the trace,
// displacements are all zero.
582 static void action_scatterv(simgrid::xbt::ReplayAction& action)
584 /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
585 0 gather 68 10 10 10 68 0 0 0
587 1) 68 10 10 10 is the sendcounts
588 2) 68 is the recvcount
589 3) 0 is the root node
590 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
591 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
593 double clock = smpi_process()->simulated_elapsed();
594 unsigned long comm_size = MPI_COMM_WORLD->size();
595 CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
596 int recv_size = parse_double(action[2 + comm_size]);
597 std::vector<int> disps(comm_size, 0);
598 std::shared_ptr<std::vector<int>> sendcounts(new std::vector<int>(comm_size));
600 MPI_Datatype MPI_CURRENT_TYPE =
601 (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
602 MPI_Datatype MPI_CURRENT_TYPE2{
603 (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[5 + comm_size]) : MPI_DEFAULT_TYPE};
605 void* send = nullptr;
606 void* recv = smpi_get_tmp_recvbuffer(recv_size * MPI_CURRENT_TYPE->size());
607 for (unsigned int i = 0; i < comm_size; i++) {
608 (*sendcounts)[i] = std::stoi(action[i + 2]);
610 int send_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
612 int root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
613 int rank = MPI_COMM_WORLD->rank();
616 send = smpi_get_tmp_sendbuffer(send_sum * MPI_CURRENT_TYPE2->size());
// NOTE(review): the TIData is labeled "gatherV" although this replays a
// scatterv — looks like a copy-paste slip that mislabels the trace event.
618 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData("gatherV", root, -1, sendcounts, recv_size,
619 nullptr, Datatype::encode(MPI_CURRENT_TYPE),
620 Datatype::encode(MPI_CURRENT_TYPE2)));
622 Colls::scatterv(send, sendcounts->data(), disps.data(), MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root,
625 TRACE_smpi_comm_out(Actor::self()->getPid());
626 log_timed_action(action, clock);
// Replays "reduceScatter": the collective (MPI_OP_NULL — timing only) followed
// by the traced local computation in flops.
629 static void action_reducescatter(simgrid::xbt::ReplayAction& action)
631 /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
632 0 reduceScatter 275427 275427 275427 204020 11346849 0
634 1) The first four values after the name of the action declare the recvcounts array
635 2) The value 11346849 is the amount of instructions
636 3) The last value corresponds to the datatype, see simgrid::smpi::Datatype::decode().
638 double clock = smpi_process()->simulated_elapsed();
639 unsigned long comm_size = MPI_COMM_WORLD->size();
640 CHECK_ACTION_PARAMS(action, comm_size+1, 1)
641 int comp_size = parse_double(action[2+comm_size]);
642 int my_proc_id = Actor::self()->getPid();
643 std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>);
644 MPI_Datatype MPI_CURRENT_TYPE =
645 (action.size() > 3 + comm_size) ? simgrid::smpi::Datatype::decode(action[3 + comm_size]) : MPI_DEFAULT_TYPE;
647 for (unsigned int i = 0; i < comm_size; i++) {
648 recvcounts->push_back(std::stoi(action[i + 2]));
// Total element count across all ranks, used to size both scratch buffers.
650 int size{std::accumulate(recvcounts->begin(), recvcounts->end(), 0)};
652 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
653 new simgrid::instr::VarCollTIData("reduceScatter", -1, 0, nullptr, -1, recvcounts,
654 std::to_string(comp_size), /* ugly hack to print comp_size */
655 Datatype::encode(MPI_CURRENT_TYPE)));
657 void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
658 void *recvbuf = smpi_get_tmp_recvbuffer(size* MPI_CURRENT_TYPE->size());
660 Colls::reduce_scatter(sendbuf, recvbuf, recvcounts->data(), MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
661 smpi_execute_flops(comp_size);
663 TRACE_smpi_comm_out(my_proc_id);
664 log_timed_action (action, clock);
// Replays "allGather": fixed-count allgather with optional send/recv datatypes.
667 static void action_allgather(simgrid::xbt::ReplayAction& action)
669 /* The structure of the allgather action for the rank 0 (total 4 processes) is the following:
670 0 allGather 275427 275427
672 1) 275427 is the sendcount
673 2) 275427 is the recvcount
674 3) No more values mean that the datatype for sent and receive buffer is the default one, see
675 simgrid::smpi::Datatype::decode().
677 double clock = smpi_process()->simulated_elapsed();
679 CHECK_ACTION_PARAMS(action, 2, 2)
680 int sendcount = std::stoi(action[2]);
681 int recvcount = std::stoi(action[3]);
// Both datatypes are only honored when both fields 4 and 5 are present.
683 MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 5) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE};
684 MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE};
686 void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
687 void *recvbuf = smpi_get_tmp_recvbuffer(recvcount* MPI_CURRENT_TYPE2->size());
689 int my_proc_id = Actor::self()->getPid();
691 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
692 new simgrid::instr::CollTIData("allGather", -1, -1.0, sendcount, recvcount,
693 Datatype::encode(MPI_CURRENT_TYPE),
694 Datatype::encode(MPI_CURRENT_TYPE2)));
696 Colls::allgather(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcount, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
698 TRACE_smpi_comm_out(my_proc_id);
699 log_timed_action (action, clock);
// Replays "allGatherV": variable-count allgather; recvcounts come from the
// trace, with optional trailing datatype and displacement fields.
702 static void action_allgatherv(simgrid::xbt::ReplayAction& action)
704 /* The structure of the allgatherv action for the rank 0 (total 4 processes) is the following:
705 0 allGatherV 275427 275427 275427 275427 204020
707 1) 275427 is the sendcount
708 2) The next four elements declare the recvcounts array
709 3) No more values mean that the datatype for sent and receive buffer is the default one, see
710 simgrid::smpi::Datatype::decode().
712 double clock = smpi_process()->simulated_elapsed();
714 unsigned long comm_size = MPI_COMM_WORLD->size();
715 CHECK_ACTION_PARAMS(action, comm_size+1, 2)
716 int sendcount = std::stoi(action[2]);
717 std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
718 std::vector<int> disps(comm_size, 0);
720 int datatype_index = 0, disp_index = 0;
// Work out which optional trailing fields (datatype and/or disps) are present.
721 if (action.size() > 3 + 2 * comm_size) { /* datatype + disp are specified */
722 datatype_index = 3 + comm_size;
723 disp_index = datatype_index + 1;
// NOTE(review): this condition duplicates the one above, so the
// "disps specified, no datatype" branch can never be taken — looks like the
// intended test was a different field count; confirm against the trace format.
724 } else if (action.size() > 3 + 2 * comm_size) { /* disps specified; datatype is not specified; use the default one */
726 disp_index = 3 + comm_size;
727 } else if (action.size() > 3 + comm_size) { /* only datatype, no disp specified */
728 datatype_index = 3 + comm_size;
731 if (disp_index != 0) {
732 for (unsigned int i = 0; i < comm_size; i++)
733 disps[i] = std::stoi(action[disp_index + i]);
736 MPI_Datatype MPI_CURRENT_TYPE{(datatype_index > 0) ? simgrid::smpi::Datatype::decode(action[datatype_index])
738 MPI_Datatype MPI_CURRENT_TYPE2{(datatype_index > 0) ? simgrid::smpi::Datatype::decode(action[datatype_index])
741 void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
743 for (unsigned int i = 0; i < comm_size; i++) {
744 (*recvcounts)[i] = std::stoi(action[i + 3]);
746 int recv_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
747 void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
749 int my_proc_id = Actor::self()->getPid();
751 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
752 new simgrid::instr::VarCollTIData("allGatherV", -1, sendcount, nullptr, -1, recvcounts,
753 Datatype::encode(MPI_CURRENT_TYPE),
754 Datatype::encode(MPI_CURRENT_TYPE2)));
756 Colls::allgatherv(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts->data(), disps.data(), MPI_CURRENT_TYPE2,
759 TRACE_smpi_comm_out(my_proc_id);
760 log_timed_action (action, clock);
// Replays "allToAllV": variable-count all-to-all; send/recv counts come from
// the trace, displacements are all zero.
763 static void action_allToAllv(simgrid::xbt::ReplayAction& action)
765 /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
766 0 allToAllV 100 1 7 10 12 100 1 70 10 5
768 1) 100 is the size of the send buffer *sizeof(int),
769 2) 1 7 10 12 is the sendcounts array
770 3) 100*sizeof(int) is the size of the receiver buffer
771 4) 1 70 10 5 is the recvcounts array
773 double clock = smpi_process()->simulated_elapsed();
775 unsigned long comm_size = MPI_COMM_WORLD->size();
776 CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
777 std::shared_ptr<std::vector<int>> sendcounts(new std::vector<int>(comm_size));
778 std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
779 std::vector<int> senddisps(comm_size, 0);
780 std::vector<int> recvdisps(comm_size, 0);
782 MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 5 + 2 * comm_size)
783 ? simgrid::smpi::Datatype::decode(action[4 + 2 * comm_size])
785 MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5 + 2 * comm_size)
786 ? simgrid::smpi::Datatype::decode(action[5 + 2 * comm_size])
789 int send_buf_size=parse_double(action[2]);
790 int recv_buf_size=parse_double(action[3+comm_size]);
791 int my_proc_id = Actor::self()->getPid();
792 void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* MPI_CURRENT_TYPE->size());
793 void *recvbuf = smpi_get_tmp_recvbuffer(recv_buf_size* MPI_CURRENT_TYPE2->size());
795 for (unsigned int i = 0; i < comm_size; i++) {
796 (*sendcounts)[i] = std::stoi(action[3 + i]);
797 (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
799 int send_size = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
800 int recv_size = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
802 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
803 new simgrid::instr::VarCollTIData("allToAllV", -1, send_size, sendcounts, recv_size, recvcounts,
804 Datatype::encode(MPI_CURRENT_TYPE),
805 Datatype::encode(MPI_CURRENT_TYPE2)));
// NOTE(review): the receive side passes MPI_CURRENT_TYPE, not
// MPI_CURRENT_TYPE2 — the decoded recv datatype is parsed but never used in
// the call; confirm whether MPI_CURRENT_TYPE2 was intended here.
807 Colls::alltoallv(sendbuf, sendcounts->data(), senddisps.data(), MPI_CURRENT_TYPE, recvbuf, recvcounts->data(),
808 recvdisps.data(), MPI_CURRENT_TYPE, MPI_COMM_WORLD);
810 TRACE_smpi_comm_out(my_proc_id);
811 log_timed_action (action, clock);
814 }} // namespace simgrid::smpi
816 /** @brief Only initialize the replay, don't do it for real
817  *
818  * Marks the process as initialized/replaying, sets up tracing, registers one
819  * handler per trace action name, and honors an optional delayed start given
820  * as flops in argv[2]. */
817 void smpi_replay_init(int* argc, char*** argv)
819 simgrid::smpi::Process::init(argc, argv);
820 smpi_process()->mark_as_initialized();
821 smpi_process()->set_replaying(true);
823 int my_proc_id = Actor::self()->getPid();
824 TRACE_smpi_init(my_proc_id);
825 TRACE_smpi_computing_init(my_proc_id);
826 TRACE_smpi_comm_in(my_proc_id, "smpi_replay_run_init", new simgrid::instr::NoOpTIData("init"));
827 TRACE_smpi_comm_out(my_proc_id);
// Map each trace action keyword to the object (or free function) replaying it.
828 xbt_replay_action_register("init", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::InitAction().execute(action); });
829 xbt_replay_action_register("finalize", [](simgrid::xbt::ReplayAction& action) { /* nothing to do */ });
830 xbt_replay_action_register("comm_size", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::CommunicatorAction().execute(action); });
831 xbt_replay_action_register("comm_split",[](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::CommunicatorAction().execute(action); });
832 xbt_replay_action_register("comm_dup", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::CommunicatorAction().execute(action); });
834 xbt_replay_action_register("send", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::SendAction("send").execute(action); });
835 xbt_replay_action_register("Isend", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::SendAction("Isend").execute(action); });
836 xbt_replay_action_register("recv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::RecvAction("recv").execute(action); });
837 xbt_replay_action_register("Irecv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::RecvAction("Irecv").execute(action); });
838 xbt_replay_action_register("test", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::TestAction().execute(action); });
839 xbt_replay_action_register("wait", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::WaitAction().execute(action); });
840 xbt_replay_action_register("waitAll", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::WaitAllAction().execute(action); });
841 xbt_replay_action_register("barrier", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::BarrierAction().execute(action); });
842 xbt_replay_action_register("bcast", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::BcastAction().execute(action); });
843 xbt_replay_action_register("reduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::ReduceAction().execute(action); });
844 xbt_replay_action_register("allReduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::AllReduceAction().execute(action); });
845 xbt_replay_action_register("allToAll", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::AllToAllAction().execute(action); });
846 xbt_replay_action_register("allToAllV", simgrid::smpi::action_allToAllv);
847 xbt_replay_action_register("gather", simgrid::smpi::action_gather);
848 xbt_replay_action_register("scatter", simgrid::smpi::action_scatter);
849 xbt_replay_action_register("gatherV", simgrid::smpi::action_gatherv);
850 xbt_replay_action_register("scatterV", simgrid::smpi::action_scatterv);
851 xbt_replay_action_register("allGather", simgrid::smpi::action_allgather);
852 xbt_replay_action_register("allGatherV", simgrid::smpi::action_allgatherv);
853 xbt_replay_action_register("reduceScatter", simgrid::smpi::action_reducescatter);
854 xbt_replay_action_register("compute", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::ComputeAction().execute(action); });
856 //if we have a delayed start, sleep here.
858 double value = xbt_str_parse_double((*argv)[2], "%s is not a double");
859 XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
860 smpi_execute_flops(value);
862 //UGLY: force a context switch to be sure that all MSG_processes begin initialization
863 XBT_DEBUG("Force context switch by smpi_execute_flops - Sleeping for 0.0 flops ");
864 smpi_execute_flops(0.0);
868 /** @brief actually run the replay after initialization
869  *
870  * Runs the trace to completion, drains any requests left pending by the
871  * trace, frees this actor's queue and finalizes tracing. The last process
872  * alive prints the total simulated time and frees the shared temp buffers. */
869 void smpi_replay_main(int* argc, char*** argv)
871 simgrid::xbt::replay_runner(*argc, *argv);
873 /* and now, finalize everything */
874 /* One active process will stop. Decrease the counter*/
875 XBT_DEBUG("There are %zu elements in reqq[*]", get_reqq_self()->size());
876 if (not get_reqq_self()->empty()) {
877 unsigned int count_requests=get_reqq_self()->size();
// NOTE(review): variable-length arrays — a GCC/Clang extension, not standard C++.
878 MPI_Request requests[count_requests];
879 MPI_Status status[count_requests];
882 for (auto const& req : *get_reqq_self()) {
886 simgrid::smpi::Request::waitall(count_requests, requests, status);
// Queue was heap-allocated by InitAction via set_reqq_self().
888 delete get_reqq_self();
891 if(active_processes==0){
892 /* Last process alive speaking: end the simulated timer */
893 XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
894 smpi_free_replay_tmp_buffers();
897 TRACE_smpi_comm_in(Actor::self()->getPid(), "smpi_replay_run_finalize", new simgrid::instr::NoOpTIData("finalize"));
899 smpi_process()->finalize();
901 TRACE_smpi_comm_out(Actor::self()->getPid());
902 TRACE_smpi_finalize(Actor::self()->getPid());
905 /** @brief chain a replay initialization and a replay start */
906 void smpi_replay_run(int* argc, char*** argv)
908 smpi_replay_init(argc, argv);
909 smpi_replay_main(argc, argv);