1 /* Copyright (c) 2009-2017. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
#include "smpi_coll.hpp"
#include "smpi_comm.hpp"
#include "smpi_datatype.hpp"
#include "smpi_group.hpp"
#include "smpi_process.hpp"
#include "smpi_request.hpp"
#include "xbt/replay.hpp"

#include <boost/algorithm/string/join.hpp>
#include <memory>
#include <numeric>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>
21 using simgrid::s4u::Actor;
23 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
/* Counter set to smpi_process_count() by the "init" action; smpi_replay_main() tests it against zero to detect the
 * last process that finishes. */
static int active_processes = 0;
/* Per-actor queues of outstanding requests, keyed by the pid of the actor (see get_reqq_self()/set_reqq_self()). */
static std::unordered_map<int, std::vector<MPI_Request>*> reqq;

/* Datatype used when a trace line does not name one; the "init" action sets it to MPI_DOUBLE or MPI_BYTE. */
static MPI_Datatype MPI_DEFAULT_TYPE;
/* Check that a replayed line carries enough fields.
 *
 * `action` is the tokenised line: process id, action name, then the arguments. The macro throws arg_error when
 * fewer than `mandatory` arguments follow the first two fields; `optional` is only reported in the message.
 *
 * It expands to a braced block (not an expression), which is why call sites do not add a trailing semicolon.
 * The arguments are parenthesized so that expressions such as `comm_size + 1` expand safely. */
#define CHECK_ACTION_PARAMS(action, mandatory, optional)                                                               \
  {                                                                                                                    \
    if ((action).size() < static_cast<unsigned long>((mandatory) + 2))                                                 \
      THROWF(arg_error, 0, "%s replay failed.\n"                                                                       \
                           "%lu items were given on the line. First two should be process_id and action. "             \
                           "This action needs after them %lu mandatory arguments, and accepts %lu optional ones. \n"   \
                           "Please contact the Simgrid team if support is needed",                                     \
             __FUNCTION__, (action).size(), static_cast<unsigned long>(mandatory),                                     \
             static_cast<unsigned long>(optional));                                                                    \
  }
41 static void log_timed_action(simgrid::xbt::ReplayAction& action, double clock)
43 if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
44 std::string s = boost::algorithm::join(action, " ");
45 XBT_VERB("%s %f", s.c_str(), smpi_process()->simulated_elapsed() - clock);
49 static std::vector<MPI_Request>* get_reqq_self()
51 return reqq.at(Actor::self()->getPid());
54 static void set_reqq_self(std::vector<MPI_Request> *mpi_request)
56 reqq.insert({Actor::self()->getPid(), mpi_request});
60 static double parse_double(std::string string)
62 return xbt_str_parse_double(string.c_str(), "%s is not a double");
69 class ActionArgParser {
71 virtual void parse(simgrid::xbt::ReplayAction& action, std::string name) { CHECK_ACTION_PARAMS(action, 0, 0) }
74 class SendRecvParser : public ActionArgParser {
76 /* communication partner; if we send, this is the receiver and vice versa */
79 MPI_Datatype datatype1 = MPI_DEFAULT_TYPE;
81 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
83 CHECK_ACTION_PARAMS(action, 2, 1)
84 partner = std::stoi(action[2]);
85 size = parse_double(action[3]);
86 if (action.size() > 4)
87 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
91 class ComputeParser : public ActionArgParser {
93 /* communication partner; if we send, this is the receiver and vice versa */
96 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
98 CHECK_ACTION_PARAMS(action, 1, 0)
99 flops = parse_double(action[2]);
103 class CollCommParser : public ActionArgParser {
111 MPI_Datatype datatype1 = MPI_DEFAULT_TYPE;
112 MPI_Datatype datatype2 = MPI_DEFAULT_TYPE;
115 class BcastArgParser : public CollCommParser {
117 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
119 CHECK_ACTION_PARAMS(action, 1, 2)
120 size = parse_double(action[2]);
121 root = (action.size() > 3) ? std::stoi(action[3]) : 0;
122 if (action.size() > 4)
123 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
127 class ReduceArgParser : public CollCommParser {
129 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
131 CHECK_ACTION_PARAMS(action, 2, 2)
132 comm_size = parse_double(action[2]);
133 comp_size = parse_double(action[3]);
134 root = (action.size() > 4) ? std::stoi(action[4]) : 0;
135 if (action.size() > 5)
136 datatype1 = simgrid::smpi::Datatype::decode(action[5]);
140 class AllReduceArgParser : public CollCommParser {
142 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
144 CHECK_ACTION_PARAMS(action, 2, 1)
145 comm_size = parse_double(action[2]);
146 comp_size = parse_double(action[3]);
147 if (action.size() > 4)
148 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
152 class AllToAllArgParser : public CollCommParser {
154 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
156 CHECK_ACTION_PARAMS(action, 2, 1)
157 comm_size = MPI_COMM_WORLD->size();
158 send_size = parse_double(action[2]);
159 recv_size = parse_double(action[3]);
161 if (action.size() > 4)
162 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
163 if (action.size() > 5)
164 datatype2 = simgrid::smpi::Datatype::decode(action[5]);
168 class GatherArgParser : public CollCommParser {
170 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
172 /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
175 1) 68 is the sendcounts
176 2) 68 is the recvcounts
177 3) 0 is the root node
178 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
179 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
181 CHECK_ACTION_PARAMS(action, 2, 3)
182 comm_size = MPI_COMM_WORLD->size();
183 send_size = parse_double(action[2]);
184 recv_size = parse_double(action[3]);
186 if (name == "gather") {
187 root = (action.size() > 4) ? std::stoi(action[4]) : 0;
188 if (action.size() > 5)
189 datatype1 = simgrid::smpi::Datatype::decode(action[5]);
190 if (action.size() > 6)
191 datatype2 = simgrid::smpi::Datatype::decode(action[6]);
194 if (action.size() > 4)
195 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
196 if (action.size() > 5)
197 datatype2 = simgrid::smpi::Datatype::decode(action[5]);
202 class GatherVArgParser : public CollCommParser {
205 std::shared_ptr<std::vector<int>> recvcounts;
206 std::vector<int> disps;
207 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
209 /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
210 0 gather 68 68 10 10 10 0 0 0
212 1) 68 is the sendcount
213 2) 68 10 10 10 is the recvcounts
214 3) 0 is the root node
215 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
216 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
218 comm_size = MPI_COMM_WORLD->size();
219 CHECK_ACTION_PARAMS(action, comm_size+1, 2)
220 send_size = parse_double(action[2]);
221 disps = std::vector<int>(comm_size, 0);
222 recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
224 if (name == "gatherV") {
225 root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
226 if (action.size() > 4 + comm_size)
227 datatype1 = simgrid::smpi::Datatype::decode(action[4 + comm_size]);
228 if (action.size() > 5 + comm_size)
229 datatype2 = simgrid::smpi::Datatype::decode(action[5 + comm_size]);
232 int datatype_index = 0, disp_index = 0;
233 if (action.size() > 3 + 2 * comm_size) { /* datatype + disp are specified */
234 datatype_index = 3 + comm_size;
235 disp_index = datatype_index + 1;
236 } else if (action.size() > 3 + 2 * comm_size) { /* disps specified; datatype is not specified; use the default one */
238 disp_index = 3 + comm_size;
239 } else if (action.size() > 3 + comm_size) { /* only datatype, no disp specified */
240 datatype_index = 3 + comm_size;
243 if (disp_index != 0) {
244 for (unsigned int i = 0; i < comm_size; i++)
245 disps[i] = std::stoi(action[disp_index + i]);
248 datatype1 = simgrid::smpi::Datatype::decode(action[datatype_index]);
249 datatype2 = simgrid::smpi::Datatype::decode(action[datatype_index]);
252 for (unsigned int i = 0; i < comm_size; i++) {
253 (*recvcounts)[i] = std::stoi(action[i + 3]);
255 recv_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
259 class ScatterArgParser : public CollCommParser {
261 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
263 /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
266 1) 68 is the sendcounts
267 2) 68 is the recvcounts
268 3) 0 is the root node
269 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
270 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
272 CHECK_ACTION_PARAMS(action, 2, 3)
273 comm_size = MPI_COMM_WORLD->size();
274 send_size = parse_double(action[2]);
275 recv_size = parse_double(action[3]);
276 root = (action.size() > 4) ? std::stoi(action[4]) : 0;
277 if (action.size() > 5)
278 datatype1 = simgrid::smpi::Datatype::decode(action[5]);
279 if (action.size() > 6)
280 datatype2 = simgrid::smpi::Datatype::decode(action[6]);
284 template <class T> class ReplayAction {
286 const std::string name;
292 explicit ReplayAction(std::string name) : name(name), my_proc_id(simgrid::s4u::Actor::self()->getPid()) {}
294 virtual void execute(simgrid::xbt::ReplayAction& action)
296 // Needs to be re-initialized for every action, hence here
297 double start_time = smpi_process()->simulated_elapsed();
298 args.parse(action, name);
301 log_timed_action(action, start_time);
304 virtual void kernel(simgrid::xbt::ReplayAction& action) = 0;
306 void* send_buffer(int size)
308 return smpi_get_tmp_sendbuffer(size);
311 void* recv_buffer(int size)
313 return smpi_get_tmp_recvbuffer(size);
317 class WaitAction : public ReplayAction<ActionArgParser> {
319 WaitAction() : ReplayAction("Wait") {}
320 void kernel(simgrid::xbt::ReplayAction& action) override
322 std::string s = boost::algorithm::join(action, " ");
323 xbt_assert(get_reqq_self()->size(), "action wait not preceded by any irecv or isend: %s", s.c_str());
324 MPI_Request request = get_reqq_self()->back();
325 get_reqq_self()->pop_back();
327 if (request == nullptr) {
328 /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just
333 int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
335 // Must be taken before Request::wait() since the request may be set to
336 // MPI_REQUEST_NULL by Request::wait!
337 int src = request->comm()->group()->rank(request->src());
338 int dst = request->comm()->group()->rank(request->dst());
339 bool is_wait_for_receive = (request->flags() & RECV);
340 // TODO: Here we take the rank while we normally take the process id (look for my_proc_id)
341 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::NoOpTIData("wait"));
344 Request::wait(&request, &status);
346 TRACE_smpi_comm_out(rank);
347 if (is_wait_for_receive)
348 TRACE_smpi_recv(src, dst, 0);
352 class SendAction : public ReplayAction<SendRecvParser> {
354 SendAction() = delete;
355 SendAction(std::string name) : ReplayAction(name) {}
356 void kernel(simgrid::xbt::ReplayAction& action) override
358 int dst_traced = MPI_COMM_WORLD->group()->actor(args.partner)->getPid();
360 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
361 Datatype::encode(args.datatype1)));
362 if (not TRACE_smpi_view_internals())
363 TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, args.size * args.datatype1->size());
365 if (name == "send") {
366 Request::send(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
367 } else if (name == "Isend") {
368 MPI_Request request = Request::isend(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
369 get_reqq_self()->push_back(request);
371 xbt_die("Don't know this action, %s", name.c_str());
374 TRACE_smpi_comm_out(my_proc_id);
378 class RecvAction : public ReplayAction<SendRecvParser> {
380 RecvAction() = delete;
381 explicit RecvAction(std::string name) : ReplayAction(name) {}
382 void kernel(simgrid::xbt::ReplayAction& action) override
384 int src_traced = MPI_COMM_WORLD->group()->actor(args.partner)->getPid();
386 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
387 Datatype::encode(args.datatype1)));
390 // unknown size from the receiver point of view
391 if (args.size <= 0.0) {
392 Request::probe(args.partner, 0, MPI_COMM_WORLD, &status);
393 args.size = status.count;
396 if (name == "recv") {
397 Request::recv(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD, &status);
398 } else if (name == "Irecv") {
399 MPI_Request request = Request::irecv(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
400 get_reqq_self()->push_back(request);
403 TRACE_smpi_comm_out(my_proc_id);
404 // TODO: Check why this was only activated in the "recv" case and not in the "Irecv" case
405 if (name == "recv" && not TRACE_smpi_view_internals()) {
406 TRACE_smpi_recv(src_traced, my_proc_id, 0);
411 class ComputeAction : public ReplayAction<ComputeParser> {
413 ComputeAction() : ReplayAction("compute") {}
414 void kernel(simgrid::xbt::ReplayAction& action) override
416 TRACE_smpi_computing_in(my_proc_id, args.flops);
417 smpi_execute_flops(args.flops);
418 TRACE_smpi_computing_out(my_proc_id);
422 class TestAction : public ReplayAction<ActionArgParser> {
424 TestAction() : ReplayAction("Test") {}
425 void kernel(simgrid::xbt::ReplayAction& action) override
427 MPI_Request request = get_reqq_self()->back();
428 get_reqq_self()->pop_back();
429 // if request is null here, this may mean that a previous test has succeeded
430 // Different times in traced application and replayed version may lead to this
431 // In this case, ignore the extra calls.
432 if (request != nullptr) {
433 TRACE_smpi_testing_in(my_proc_id);
436 int flag = Request::test(&request, &status);
438 XBT_DEBUG("MPI_Test result: %d", flag);
439 /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now
441 get_reqq_self()->push_back(request);
443 TRACE_smpi_testing_out(my_proc_id);
448 class InitAction : public ReplayAction<ActionArgParser> {
450 InitAction() : ReplayAction("Init") {}
451 void kernel(simgrid::xbt::ReplayAction& action) override
453 CHECK_ACTION_PARAMS(action, 0, 1)
454 MPI_DEFAULT_TYPE = (action.size() > 2) ? MPI_DOUBLE // default MPE datatype
455 : MPI_BYTE; // default TAU datatype
457 /* start a simulated timer */
458 smpi_process()->simulated_start();
459 /*initialize the number of active processes */
460 active_processes = smpi_process_count();
462 set_reqq_self(new std::vector<MPI_Request>);
466 class CommunicatorAction : public ReplayAction<ActionArgParser> {
468 CommunicatorAction() : ReplayAction("Comm") {}
469 void kernel(simgrid::xbt::ReplayAction& action) override { /* nothing to do */}
472 class WaitAllAction : public ReplayAction<ActionArgParser> {
474 WaitAllAction() : ReplayAction("waitAll") {}
475 void kernel(simgrid::xbt::ReplayAction& action) override
477 const unsigned int count_requests = get_reqq_self()->size();
479 if (count_requests > 0) {
480 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
481 new simgrid::instr::Pt2PtTIData("waitAll", -1, count_requests, ""));
482 std::vector<std::pair</*sender*/int,/*recv*/int>> sender_receiver;
483 for (const auto& req : (*get_reqq_self())) {
484 if (req && (req->flags() & RECV)) {
485 sender_receiver.push_back({req->src(), req->dst()});
488 MPI_Status status[count_requests];
489 Request::waitall(count_requests, &(*get_reqq_self())[0], status);
491 for (auto& pair : sender_receiver) {
492 TRACE_smpi_recv(pair.first, pair.second, 0);
494 TRACE_smpi_comm_out(my_proc_id);
499 class BarrierAction : public ReplayAction<ActionArgParser> {
501 BarrierAction() : ReplayAction("barrier") {}
502 void kernel(simgrid::xbt::ReplayAction& action) override
504 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::NoOpTIData("barrier"));
505 Colls::barrier(MPI_COMM_WORLD);
506 TRACE_smpi_comm_out(my_proc_id);
510 class BcastAction : public ReplayAction<BcastArgParser> {
512 BcastAction() : ReplayAction("bcast") {}
513 void kernel(simgrid::xbt::ReplayAction& action) override
515 TRACE_smpi_comm_in(my_proc_id, "action_bcast",
516 new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(args.root)->getPid(),
517 -1.0, args.size, -1, Datatype::encode(args.datatype1), ""));
519 Colls::bcast(send_buffer(args.size * args.datatype1->size()), args.size, args.datatype1, args.root, MPI_COMM_WORLD);
521 TRACE_smpi_comm_out(my_proc_id);
525 class ReduceAction : public ReplayAction<ReduceArgParser> {
527 ReduceAction() : ReplayAction("reduce") {}
528 void kernel(simgrid::xbt::ReplayAction& action) override
530 TRACE_smpi_comm_in(my_proc_id, "action_reduce",
531 new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(args.root)->getPid(), args.comp_size,
532 args.comm_size, -1, Datatype::encode(args.datatype1), ""));
534 Colls::reduce(send_buffer(args.comm_size * args.datatype1->size()),
535 recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL, args.root, MPI_COMM_WORLD);
536 smpi_execute_flops(args.comp_size);
538 TRACE_smpi_comm_out(my_proc_id);
542 class AllReduceAction : public ReplayAction<AllReduceArgParser> {
544 AllReduceAction() : ReplayAction("allReduce") {}
545 void kernel(simgrid::xbt::ReplayAction& action) override
547 TRACE_smpi_comm_in(my_proc_id, "action_allReduce", new simgrid::instr::CollTIData("allReduce", -1, args.comp_size, args.comm_size, -1,
548 Datatype::encode(args.datatype1), ""));
550 Colls::allreduce(send_buffer(args.comm_size * args.datatype1->size()),
551 recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
552 smpi_execute_flops(args.comp_size);
554 TRACE_smpi_comm_out(my_proc_id);
558 class AllToAllAction : public ReplayAction<AllToAllArgParser> {
560 AllToAllAction() : ReplayAction("allToAll") {}
561 void kernel(simgrid::xbt::ReplayAction& action) override
563 TRACE_smpi_comm_in(my_proc_id, "action_allToAll",
564 new simgrid::instr::CollTIData("allToAll", -1, -1.0, args.send_size, args.recv_size,
565 Datatype::encode(args.datatype1),
566 Datatype::encode(args.datatype2)));
568 Colls::alltoall(send_buffer(args.send_size*args.comm_size* args.datatype1->size()),
569 args.send_size, args.datatype1, recv_buffer(args.recv_size * args.comm_size * args.datatype2->size()),
570 args.recv_size, args.datatype2, MPI_COMM_WORLD);
572 TRACE_smpi_comm_out(my_proc_id);
576 class GatherAction : public ReplayAction<GatherArgParser> {
578 GatherAction(std::string name) : ReplayAction(name) {}
579 void kernel(simgrid::xbt::ReplayAction& action) override
581 TRACE_smpi_comm_in(my_proc_id, name.c_str(), new simgrid::instr::CollTIData(name, (name == "gather") ? args.root : -1, -1.0, args.send_size, args.recv_size,
582 Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
584 if (name == "gather") {
585 int rank = MPI_COMM_WORLD->rank();
586 Colls::gather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
587 (rank == args.root) ? recv_buffer(args.recv_size * args.comm_size * args.datatype2->size()) : nullptr, args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
590 Colls::allgather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
591 recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, MPI_COMM_WORLD);
593 TRACE_smpi_comm_out(my_proc_id);
597 class GatherVAction : public ReplayAction<GatherVArgParser> {
599 GatherVAction(std::string name) : ReplayAction(name) {}
600 void kernel(simgrid::xbt::ReplayAction& action) override
602 int rank = MPI_COMM_WORLD->rank();
604 TRACE_smpi_comm_in(my_proc_id, name.c_str(), new simgrid::instr::VarCollTIData(
605 name, (name == "gatherV") ? args.root : -1, args.send_size, nullptr, -1, args.recvcounts,
606 Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
608 if (name == "gatherV") {
609 Colls::gatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
610 (rank == args.root) ? recv_buffer(args.recv_sum * args.datatype2->size()) : nullptr, args.recvcounts->data(), args.disps.data(), args.datatype2, args.root,
614 Colls::allgatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
615 recv_buffer(args.recv_sum * args.datatype2->size()), args.recvcounts->data(), args.disps.data(), args.datatype2,
619 TRACE_smpi_comm_out(my_proc_id);
623 class ScatterAction : public ReplayAction<ScatterArgParser> {
625 ScatterAction() : ReplayAction("scatter") {}
626 void kernel(simgrid::xbt::ReplayAction& action) override
628 int rank = MPI_COMM_WORLD->rank();
629 TRACE_smpi_comm_in(my_proc_id, "action_scatter", new simgrid::instr::CollTIData(name, args.root, -1.0, args.send_size, args.recv_size,
630 Datatype::encode(args.datatype1),
631 Datatype::encode(args.datatype2)));
633 Colls::scatter(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
634 (rank == args.root) ? recv_buffer(args.recv_size * args.datatype2->size()) : nullptr, args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
636 TRACE_smpi_comm_out(my_proc_id);
639 } // Replay Namespace
641 static void action_scatterv(simgrid::xbt::ReplayAction& action)
643 /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
644 0 gather 68 10 10 10 68 0 0 0
646 1) 68 10 10 10 is the sendcounts
647 2) 68 is the recvcount
648 3) 0 is the root node
649 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
650 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
652 double clock = smpi_process()->simulated_elapsed();
653 unsigned long comm_size = MPI_COMM_WORLD->size();
654 CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
655 int recv_size = parse_double(action[2 + comm_size]);
656 std::vector<int> disps(comm_size, 0);
657 std::shared_ptr<std::vector<int>> sendcounts(new std::vector<int>(comm_size));
659 MPI_Datatype MPI_CURRENT_TYPE =
660 (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
661 MPI_Datatype MPI_CURRENT_TYPE2{
662 (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[5 + comm_size]) : MPI_DEFAULT_TYPE};
664 void* send = nullptr;
665 void* recv = smpi_get_tmp_recvbuffer(recv_size * MPI_CURRENT_TYPE->size());
666 for (unsigned int i = 0; i < comm_size; i++) {
667 (*sendcounts)[i] = std::stoi(action[i + 2]);
669 int send_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
671 int root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
672 int rank = MPI_COMM_WORLD->rank();
675 send = smpi_get_tmp_sendbuffer(send_sum * MPI_CURRENT_TYPE2->size());
677 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData("gatherV", root, -1, sendcounts, recv_size,
678 nullptr, Datatype::encode(MPI_CURRENT_TYPE),
679 Datatype::encode(MPI_CURRENT_TYPE2)));
681 Colls::scatterv(send, sendcounts->data(), disps.data(), MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root,
684 TRACE_smpi_comm_out(Actor::self()->getPid());
685 log_timed_action(action, clock);
688 static void action_reducescatter(simgrid::xbt::ReplayAction& action)
690 /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
691 0 reduceScatter 275427 275427 275427 204020 11346849 0
693 1) The first four values after the name of the action declare the recvcounts array
694 2) The value 11346849 is the amount of instructions
695 3) The last value corresponds to the datatype, see simgrid::smpi::Datatype::decode().
697 double clock = smpi_process()->simulated_elapsed();
698 unsigned long comm_size = MPI_COMM_WORLD->size();
699 CHECK_ACTION_PARAMS(action, comm_size+1, 1)
700 int comp_size = parse_double(action[2+comm_size]);
701 int my_proc_id = Actor::self()->getPid();
702 std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>);
703 MPI_Datatype MPI_CURRENT_TYPE =
704 (action.size() > 3 + comm_size) ? simgrid::smpi::Datatype::decode(action[3 + comm_size]) : MPI_DEFAULT_TYPE;
706 for (unsigned int i = 0; i < comm_size; i++) {
707 recvcounts->push_back(std::stoi(action[i + 2]));
709 int size{std::accumulate(recvcounts->begin(), recvcounts->end(), 0)};
711 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
712 new simgrid::instr::VarCollTIData("reduceScatter", -1, 0, nullptr, -1, recvcounts,
713 std::to_string(comp_size), /* ugly hack to print comp_size */
714 Datatype::encode(MPI_CURRENT_TYPE)));
716 void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
717 void *recvbuf = smpi_get_tmp_recvbuffer(size* MPI_CURRENT_TYPE->size());
719 Colls::reduce_scatter(sendbuf, recvbuf, recvcounts->data(), MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
720 smpi_execute_flops(comp_size);
722 TRACE_smpi_comm_out(my_proc_id);
723 log_timed_action (action, clock);
726 static void action_allToAllv(simgrid::xbt::ReplayAction& action)
728 /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
729 0 allToAllV 100 1 7 10 12 100 1 70 10 5
731 1) 100 is the size of the send buffer *sizeof(int),
732 2) 1 7 10 12 is the sendcounts array
733 3) 100*sizeof(int) is the size of the receiver buffer
734 4) 1 70 10 5 is the recvcounts array
736 double clock = smpi_process()->simulated_elapsed();
738 unsigned long comm_size = MPI_COMM_WORLD->size();
739 CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
740 std::shared_ptr<std::vector<int>> sendcounts(new std::vector<int>(comm_size));
741 std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
742 std::vector<int> senddisps(comm_size, 0);
743 std::vector<int> recvdisps(comm_size, 0);
745 MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 5 + 2 * comm_size)
746 ? simgrid::smpi::Datatype::decode(action[4 + 2 * comm_size])
748 MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5 + 2 * comm_size)
749 ? simgrid::smpi::Datatype::decode(action[5 + 2 * comm_size])
752 int send_buf_size=parse_double(action[2]);
753 int recv_buf_size=parse_double(action[3+comm_size]);
754 int my_proc_id = Actor::self()->getPid();
755 void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* MPI_CURRENT_TYPE->size());
756 void *recvbuf = smpi_get_tmp_recvbuffer(recv_buf_size* MPI_CURRENT_TYPE2->size());
758 for (unsigned int i = 0; i < comm_size; i++) {
759 (*sendcounts)[i] = std::stoi(action[3 + i]);
760 (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
762 int send_size = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
763 int recv_size = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
765 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
766 new simgrid::instr::VarCollTIData("allToAllV", -1, send_size, sendcounts, recv_size, recvcounts,
767 Datatype::encode(MPI_CURRENT_TYPE),
768 Datatype::encode(MPI_CURRENT_TYPE2)));
770 Colls::alltoallv(sendbuf, sendcounts->data(), senddisps.data(), MPI_CURRENT_TYPE, recvbuf, recvcounts->data(),
771 recvdisps.data(), MPI_CURRENT_TYPE, MPI_COMM_WORLD);
773 TRACE_smpi_comm_out(my_proc_id);
774 log_timed_action (action, clock);
777 }} // namespace simgrid::smpi
779 /** @brief Only initialize the replay, don't do it for real */
780 void smpi_replay_init(int* argc, char*** argv)
782 simgrid::smpi::Process::init(argc, argv);
783 smpi_process()->mark_as_initialized();
784 smpi_process()->set_replaying(true);
786 int my_proc_id = Actor::self()->getPid();
787 TRACE_smpi_init(my_proc_id);
788 TRACE_smpi_computing_init(my_proc_id);
789 TRACE_smpi_comm_in(my_proc_id, "smpi_replay_run_init", new simgrid::instr::NoOpTIData("init"));
790 TRACE_smpi_comm_out(my_proc_id);
791 xbt_replay_action_register("init", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::InitAction().execute(action); });
792 xbt_replay_action_register("finalize", [](simgrid::xbt::ReplayAction& action) { /* nothing to do */ });
793 xbt_replay_action_register("comm_size", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::CommunicatorAction().execute(action); });
794 xbt_replay_action_register("comm_split",[](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::CommunicatorAction().execute(action); });
795 xbt_replay_action_register("comm_dup", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::CommunicatorAction().execute(action); });
797 xbt_replay_action_register("send", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::SendAction("send").execute(action); });
798 xbt_replay_action_register("Isend", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::SendAction("Isend").execute(action); });
799 xbt_replay_action_register("recv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::RecvAction("recv").execute(action); });
800 xbt_replay_action_register("Irecv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::RecvAction("Irecv").execute(action); });
801 xbt_replay_action_register("test", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::TestAction().execute(action); });
802 xbt_replay_action_register("wait", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::WaitAction().execute(action); });
803 xbt_replay_action_register("waitAll", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::WaitAllAction().execute(action); });
804 xbt_replay_action_register("barrier", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::BarrierAction().execute(action); });
805 xbt_replay_action_register("bcast", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::BcastAction().execute(action); });
806 xbt_replay_action_register("reduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::ReduceAction().execute(action); });
807 xbt_replay_action_register("allReduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::AllReduceAction().execute(action); });
808 xbt_replay_action_register("allToAll", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::AllToAllAction().execute(action); });
809 xbt_replay_action_register("allToAllV", simgrid::smpi::action_allToAllv);
810 xbt_replay_action_register("gather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::GatherAction("gather").execute(action); });
811 xbt_replay_action_register("scatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::ScatterAction().execute(action); });
812 xbt_replay_action_register("gatherV", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::GatherVAction("gatherV").execute(action); });
813 xbt_replay_action_register("scatterV", simgrid::smpi::action_scatterv);
814 xbt_replay_action_register("allGather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::GatherAction("allGather").execute(action); });
815 xbt_replay_action_register("allGatherV", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::GatherVAction("allGatherV").execute(action); });
816 xbt_replay_action_register("reduceScatter", simgrid::smpi::action_reducescatter);
817 xbt_replay_action_register("compute", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::ComputeAction().execute(action); });
819 //if we have a delayed start, sleep here.
821 double value = xbt_str_parse_double((*argv)[2], "%s is not a double");
822 XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
823 smpi_execute_flops(value);
825 //UGLY: force a context switch to be sure that all MSG_processes begin initialization
826 XBT_DEBUG("Force context switch by smpi_execute_flops - Sleeping for 0.0 flops ");
827 smpi_execute_flops(0.0);
831 /** @brief actually run the replay after initialization */
832 void smpi_replay_main(int* argc, char*** argv)
834 simgrid::xbt::replay_runner(*argc, *argv);
836 /* and now, finalize everything */
837 /* One active process will stop. Decrease the counter*/
838 XBT_DEBUG("There are %zu elements in reqq[*]", get_reqq_self()->size());
839 if (not get_reqq_self()->empty()) {
840 unsigned int count_requests=get_reqq_self()->size();
841 MPI_Request requests[count_requests];
842 MPI_Status status[count_requests];
845 for (auto const& req : *get_reqq_self()) {
849 simgrid::smpi::Request::waitall(count_requests, requests, status);
851 delete get_reqq_self();
854 if(active_processes==0){
855 /* Last process alive speaking: end the simulated timer */
856 XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
857 smpi_free_replay_tmp_buffers();
860 TRACE_smpi_comm_in(Actor::self()->getPid(), "smpi_replay_run_finalize", new simgrid::instr::NoOpTIData("finalize"));
862 smpi_process()->finalize();
864 TRACE_smpi_comm_out(Actor::self()->getPid());
865 TRACE_smpi_finalize(Actor::self()->getPid());
868 /** @brief chain a replay initialization and a replay start */
869 void smpi_replay_run(int* argc, char*** argv)
871 smpi_replay_init(argc, argv);
872 smpi_replay_main(argc, argv);