1 /* Copyright (c) 2009-2017. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
7 #include "smpi_coll.hpp"
8 #include "smpi_comm.hpp"
9 #include "smpi_datatype.hpp"
10 #include "smpi_group.hpp"
11 #include "smpi_process.hpp"
12 #include "smpi_request.hpp"
13 #include "xbt/replay.hpp"
15 #include <boost/algorithm/string/join.hpp>
18 #include <unordered_map>
21 using simgrid::s4u::Actor;
23 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
25 static int active_processes = 0;
26 static std::unordered_map<int, std::vector<MPI_Request>*> reqq;
28 static MPI_Datatype MPI_DEFAULT_TYPE;
/* Throws arg_error when the trace line `action` holds fewer than `mandatory` arguments after its first two items
 * (process id and action name). `optional` is only used in the error message.
 * The macro expands to a braced block, so call sites do not add a trailing semicolon. */
#define CHECK_ACTION_PARAMS(action, mandatory, optional)                                                               \
  {                                                                                                                    \
    const unsigned long replay_check_needed_ = static_cast<unsigned long>(mandatory + 2);                             \
    if (action.size() < replay_check_needed_)                                                                          \
      THROWF(arg_error, 0, "%s replay failed.\n"                                                                       \
                           "%lu items were given on the line. First two should be process_id and action. "             \
                           "This action needs after them %lu mandatory arguments, and accepts %lu optional ones. \n"   \
                           "Please contact the Simgrid team if support is needed",                                     \
             __FUNCTION__, action.size(), static_cast<unsigned long>(mandatory),                                       \
             static_cast<unsigned long>(optional));                                                                    \
  }
41 static void log_timed_action(simgrid::xbt::ReplayAction& action, double clock)
43 if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
44 std::string s = boost::algorithm::join(action, " ");
45 XBT_VERB("%s %f", s.c_str(), smpi_process()->simulated_elapsed() - clock);
49 static std::vector<MPI_Request>* get_reqq_self()
51 return reqq.at(Actor::self()->getPid());
54 static void set_reqq_self(std::vector<MPI_Request> *mpi_request)
56 reqq.insert({Actor::self()->getPid(), mpi_request});
60 static double parse_double(std::string string)
62 return xbt_str_parse_double(string.c_str(), "%s is not a double");
69 class ActionArgParser {
71 virtual void parse(simgrid::xbt::ReplayAction& action, std::string name) { CHECK_ACTION_PARAMS(action, 0, 0) }
74 class SendRecvParser : public ActionArgParser {
76 /* communication partner; if we send, this is the receiver and vice versa */
79 MPI_Datatype datatype1 = MPI_DEFAULT_TYPE;
81 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
83 CHECK_ACTION_PARAMS(action, 2, 1)
84 partner = std::stoi(action[2]);
85 size = parse_double(action[3]);
86 if (action.size() > 4)
87 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
91 class ComputeParser : public ActionArgParser {
93 /* communication partner; if we send, this is the receiver and vice versa */
96 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
98 CHECK_ACTION_PARAMS(action, 1, 0)
99 flops = parse_double(action[2]);
103 class CollCommParser : public ActionArgParser {
111 MPI_Datatype datatype1 = MPI_DEFAULT_TYPE;
112 MPI_Datatype datatype2 = MPI_DEFAULT_TYPE;
115 class BcastArgParser : public CollCommParser {
117 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
119 CHECK_ACTION_PARAMS(action, 1, 2)
120 size = parse_double(action[2]);
121 root = (action.size() > 3) ? std::stoi(action[3]) : 0;
122 if (action.size() > 4)
123 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
127 class ReduceArgParser : public CollCommParser {
129 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
131 CHECK_ACTION_PARAMS(action, 2, 2)
132 comm_size = parse_double(action[2]);
133 comp_size = parse_double(action[3]);
134 root = (action.size() > 4) ? std::stoi(action[4]) : 0;
135 if (action.size() > 5)
136 datatype1 = simgrid::smpi::Datatype::decode(action[5]);
140 class AllReduceArgParser : public CollCommParser {
142 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
144 CHECK_ACTION_PARAMS(action, 2, 1)
145 comm_size = parse_double(action[2]);
146 comp_size = parse_double(action[3]);
147 if (action.size() > 4)
148 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
152 class AllToAllArgParser : public CollCommParser {
154 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
156 CHECK_ACTION_PARAMS(action, 2, 1)
157 comm_size = MPI_COMM_WORLD->size();
158 send_size = parse_double(action[2]);
159 recv_size = parse_double(action[3]);
161 if (action.size() > 4)
162 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
163 if (action.size() > 5)
164 datatype2 = simgrid::smpi::Datatype::decode(action[5]);
168 class GatherArgParser : public CollCommParser {
170 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
172 /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
175 1) 68 is the sendcounts
176 2) 68 is the recvcounts
177 3) 0 is the root node
178 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
179 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
181 CHECK_ACTION_PARAMS(action, 2, 3)
182 comm_size = MPI_COMM_WORLD->size();
183 send_size = parse_double(action[2]);
184 recv_size = parse_double(action[3]);
186 if (name == "gather") {
187 root = (action.size() > 4) ? std::stoi(action[4]) : 0;
188 if (action.size() > 5)
189 datatype1 = simgrid::smpi::Datatype::decode(action[5]);
190 if (action.size() > 6)
191 datatype2 = simgrid::smpi::Datatype::decode(action[6]);
194 if (action.size() > 4)
195 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
196 if (action.size() > 5)
197 datatype2 = simgrid::smpi::Datatype::decode(action[5]);
202 class GatherVArgParser : public CollCommParser {
205 std::shared_ptr<std::vector<int>> recvcounts;
206 std::vector<int> disps;
207 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
209 /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
210 0 gather 68 68 10 10 10 0 0 0
212 1) 68 is the sendcount
213 2) 68 10 10 10 is the recvcounts
214 3) 0 is the root node
215 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
216 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
218 comm_size = MPI_COMM_WORLD->size();
219 CHECK_ACTION_PARAMS(action, comm_size+1, 2)
220 send_size = parse_double(action[2]);
221 disps = std::vector<int>(comm_size, 0);
222 recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
224 if (name == "gatherV") {
225 root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
226 if (action.size() > 4 + comm_size)
227 datatype1 = simgrid::smpi::Datatype::decode(action[4 + comm_size]);
228 if (action.size() > 5 + comm_size)
229 datatype2 = simgrid::smpi::Datatype::decode(action[5 + comm_size]);
232 int datatype_index = 0, disp_index = 0;
233 if (action.size() > 3 + 2 * comm_size) { /* datatype + disp are specified */
234 datatype_index = 3 + comm_size;
235 disp_index = datatype_index + 1;
236 } else if (action.size() > 3 + 2 * comm_size) { /* disps specified; datatype is not specified; use the default one */
238 disp_index = 3 + comm_size;
239 } else if (action.size() > 3 + comm_size) { /* only datatype, no disp specified */
240 datatype_index = 3 + comm_size;
243 if (disp_index != 0) {
244 for (unsigned int i = 0; i < comm_size; i++)
245 disps[i] = std::stoi(action[disp_index + i]);
248 datatype1 = simgrid::smpi::Datatype::decode(action[datatype_index]);
249 datatype2 = simgrid::smpi::Datatype::decode(action[datatype_index]);
252 for (unsigned int i = 0; i < comm_size; i++) {
253 (*recvcounts)[i] = std::stoi(action[i + 3]);
255 recv_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
259 class ScatterArgParser : public CollCommParser {
261 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
263 /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
266 1) 68 is the sendcounts
267 2) 68 is the recvcounts
268 3) 0 is the root node
269 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
270 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
272 CHECK_ACTION_PARAMS(action, 2, 3)
273 comm_size = MPI_COMM_WORLD->size();
274 send_size = parse_double(action[2]);
275 recv_size = parse_double(action[3]);
276 root = (action.size() > 4) ? std::stoi(action[4]) : 0;
277 if (action.size() > 5)
278 datatype1 = simgrid::smpi::Datatype::decode(action[5]);
279 if (action.size() > 6)
280 datatype2 = simgrid::smpi::Datatype::decode(action[6]);
284 class ScatterVArgParser : public CollCommParser {
288 std::shared_ptr<std::vector<int>> sendcounts;
289 std::vector<int> disps;
290 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
292 /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
293 0 gather 68 10 10 10 68 0 0 0
295 1) 68 10 10 10 is the sendcounts
296 2) 68 is the recvcount
297 3) 0 is the root node
298 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
299 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
301 CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
302 recv_size = parse_double(action[2 + comm_size]);
303 disps = std::vector<int>(comm_size, 0);
304 sendcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
306 if (action.size() > 5 + comm_size)
307 datatype1 = simgrid::smpi::Datatype::decode(action[4 + comm_size]);
308 if (action.size() > 5 + comm_size)
309 datatype2 = simgrid::smpi::Datatype::decode(action[5]);
311 for (unsigned int i = 0; i < comm_size; i++) {
312 (*sendcounts)[i] = std::stoi(action[i + 2]);
314 send_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
315 root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
319 class ReduceScatterArgParser : public CollCommParser {
322 std::shared_ptr<std::vector<int>> recvcounts;
323 std::vector<int> disps;
324 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
326 /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
327 0 reduceScatter 275427 275427 275427 204020 11346849 0
329 1) The first four values after the name of the action declare the recvcounts array
330 2) The value 11346849 is the amount of instructions
331 3) The last value corresponds to the datatype, see simgrid::smpi::Datatype::decode().
333 comm_size = MPI_COMM_WORLD->size();
334 CHECK_ACTION_PARAMS(action, comm_size+1, 1)
335 comp_size = parse_double(action[2+comm_size]);
336 recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
337 if (action.size() > 3 + comm_size)
338 datatype1 = simgrid::smpi::Datatype::decode(action[3 + comm_size]);
340 for (unsigned int i = 0; i < comm_size; i++) {
341 recvcounts->push_back(std::stoi(action[i + 2]));
343 recv_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
347 template <class T> class ReplayAction {
349 const std::string name;
355 explicit ReplayAction(std::string name) : name(name), my_proc_id(simgrid::s4u::Actor::self()->getPid()) {}
357 virtual void execute(simgrid::xbt::ReplayAction& action)
359 // Needs to be re-initialized for every action, hence here
360 double start_time = smpi_process()->simulated_elapsed();
361 args.parse(action, name);
364 log_timed_action(action, start_time);
367 virtual void kernel(simgrid::xbt::ReplayAction& action) = 0;
369 void* send_buffer(int size)
371 return smpi_get_tmp_sendbuffer(size);
374 void* recv_buffer(int size)
376 return smpi_get_tmp_recvbuffer(size);
380 class WaitAction : public ReplayAction<ActionArgParser> {
382 WaitAction() : ReplayAction("Wait") {}
383 void kernel(simgrid::xbt::ReplayAction& action) override
385 std::string s = boost::algorithm::join(action, " ");
386 xbt_assert(get_reqq_self()->size(), "action wait not preceded by any irecv or isend: %s", s.c_str());
387 MPI_Request request = get_reqq_self()->back();
388 get_reqq_self()->pop_back();
390 if (request == nullptr) {
391 /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just
396 int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
398 // Must be taken before Request::wait() since the request may be set to
399 // MPI_REQUEST_NULL by Request::wait!
400 int src = request->comm()->group()->rank(request->src());
401 int dst = request->comm()->group()->rank(request->dst());
402 bool is_wait_for_receive = (request->flags() & RECV);
403 // TODO: Here we take the rank while we normally take the process id (look for my_proc_id)
404 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::NoOpTIData("wait"));
407 Request::wait(&request, &status);
409 TRACE_smpi_comm_out(rank);
410 if (is_wait_for_receive)
411 TRACE_smpi_recv(src, dst, 0);
415 class SendAction : public ReplayAction<SendRecvParser> {
417 SendAction() = delete;
418 SendAction(std::string name) : ReplayAction(name) {}
419 void kernel(simgrid::xbt::ReplayAction& action) override
421 int dst_traced = MPI_COMM_WORLD->group()->actor(args.partner)->getPid();
423 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
424 Datatype::encode(args.datatype1)));
425 if (not TRACE_smpi_view_internals())
426 TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, args.size * args.datatype1->size());
428 if (name == "send") {
429 Request::send(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
430 } else if (name == "Isend") {
431 MPI_Request request = Request::isend(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
432 get_reqq_self()->push_back(request);
434 xbt_die("Don't know this action, %s", name.c_str());
437 TRACE_smpi_comm_out(my_proc_id);
441 class RecvAction : public ReplayAction<SendRecvParser> {
443 RecvAction() = delete;
444 explicit RecvAction(std::string name) : ReplayAction(name) {}
445 void kernel(simgrid::xbt::ReplayAction& action) override
447 int src_traced = MPI_COMM_WORLD->group()->actor(args.partner)->getPid();
449 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
450 Datatype::encode(args.datatype1)));
453 // unknown size from the receiver point of view
454 if (args.size <= 0.0) {
455 Request::probe(args.partner, 0, MPI_COMM_WORLD, &status);
456 args.size = status.count;
459 if (name == "recv") {
460 Request::recv(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD, &status);
461 } else if (name == "Irecv") {
462 MPI_Request request = Request::irecv(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
463 get_reqq_self()->push_back(request);
466 TRACE_smpi_comm_out(my_proc_id);
467 // TODO: Check why this was only activated in the "recv" case and not in the "Irecv" case
468 if (name == "recv" && not TRACE_smpi_view_internals()) {
469 TRACE_smpi_recv(src_traced, my_proc_id, 0);
474 class ComputeAction : public ReplayAction<ComputeParser> {
476 ComputeAction() : ReplayAction("compute") {}
477 void kernel(simgrid::xbt::ReplayAction& action) override
479 TRACE_smpi_computing_in(my_proc_id, args.flops);
480 smpi_execute_flops(args.flops);
481 TRACE_smpi_computing_out(my_proc_id);
485 class TestAction : public ReplayAction<ActionArgParser> {
487 TestAction() : ReplayAction("Test") {}
488 void kernel(simgrid::xbt::ReplayAction& action) override
490 MPI_Request request = get_reqq_self()->back();
491 get_reqq_self()->pop_back();
492 // if request is null here, this may mean that a previous test has succeeded
493 // Different times in traced application and replayed version may lead to this
494 // In this case, ignore the extra calls.
495 if (request != nullptr) {
496 TRACE_smpi_testing_in(my_proc_id);
499 int flag = Request::test(&request, &status);
501 XBT_DEBUG("MPI_Test result: %d", flag);
502 /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now
504 get_reqq_self()->push_back(request);
506 TRACE_smpi_testing_out(my_proc_id);
511 class InitAction : public ReplayAction<ActionArgParser> {
513 InitAction() : ReplayAction("Init") {}
514 void kernel(simgrid::xbt::ReplayAction& action) override
516 CHECK_ACTION_PARAMS(action, 0, 1)
517 MPI_DEFAULT_TYPE = (action.size() > 2) ? MPI_DOUBLE // default MPE datatype
518 : MPI_BYTE; // default TAU datatype
520 /* start a simulated timer */
521 smpi_process()->simulated_start();
522 /*initialize the number of active processes */
523 active_processes = smpi_process_count();
525 set_reqq_self(new std::vector<MPI_Request>);
529 class CommunicatorAction : public ReplayAction<ActionArgParser> {
531 CommunicatorAction() : ReplayAction("Comm") {}
532 void kernel(simgrid::xbt::ReplayAction& action) override { /* nothing to do */}
535 class WaitAllAction : public ReplayAction<ActionArgParser> {
537 WaitAllAction() : ReplayAction("waitAll") {}
538 void kernel(simgrid::xbt::ReplayAction& action) override
540 const unsigned int count_requests = get_reqq_self()->size();
542 if (count_requests > 0) {
543 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
544 new simgrid::instr::Pt2PtTIData("waitAll", -1, count_requests, ""));
545 std::vector<std::pair</*sender*/int,/*recv*/int>> sender_receiver;
546 for (const auto& req : (*get_reqq_self())) {
547 if (req && (req->flags() & RECV)) {
548 sender_receiver.push_back({req->src(), req->dst()});
551 MPI_Status status[count_requests];
552 Request::waitall(count_requests, &(*get_reqq_self())[0], status);
554 for (auto& pair : sender_receiver) {
555 TRACE_smpi_recv(pair.first, pair.second, 0);
557 TRACE_smpi_comm_out(my_proc_id);
562 class BarrierAction : public ReplayAction<ActionArgParser> {
564 BarrierAction() : ReplayAction("barrier") {}
565 void kernel(simgrid::xbt::ReplayAction& action) override
567 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::NoOpTIData("barrier"));
568 Colls::barrier(MPI_COMM_WORLD);
569 TRACE_smpi_comm_out(my_proc_id);
573 class BcastAction : public ReplayAction<BcastArgParser> {
575 BcastAction() : ReplayAction("bcast") {}
576 void kernel(simgrid::xbt::ReplayAction& action) override
578 TRACE_smpi_comm_in(my_proc_id, "action_bcast",
579 new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(args.root)->getPid(),
580 -1.0, args.size, -1, Datatype::encode(args.datatype1), ""));
582 Colls::bcast(send_buffer(args.size * args.datatype1->size()), args.size, args.datatype1, args.root, MPI_COMM_WORLD);
584 TRACE_smpi_comm_out(my_proc_id);
588 class ReduceAction : public ReplayAction<ReduceArgParser> {
590 ReduceAction() : ReplayAction("reduce") {}
591 void kernel(simgrid::xbt::ReplayAction& action) override
593 TRACE_smpi_comm_in(my_proc_id, "action_reduce",
594 new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(args.root)->getPid(), args.comp_size,
595 args.comm_size, -1, Datatype::encode(args.datatype1), ""));
597 Colls::reduce(send_buffer(args.comm_size * args.datatype1->size()),
598 recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL, args.root, MPI_COMM_WORLD);
599 smpi_execute_flops(args.comp_size);
601 TRACE_smpi_comm_out(my_proc_id);
605 class AllReduceAction : public ReplayAction<AllReduceArgParser> {
607 AllReduceAction() : ReplayAction("allReduce") {}
608 void kernel(simgrid::xbt::ReplayAction& action) override
610 TRACE_smpi_comm_in(my_proc_id, "action_allReduce", new simgrid::instr::CollTIData("allReduce", -1, args.comp_size, args.comm_size, -1,
611 Datatype::encode(args.datatype1), ""));
613 Colls::allreduce(send_buffer(args.comm_size * args.datatype1->size()),
614 recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
615 smpi_execute_flops(args.comp_size);
617 TRACE_smpi_comm_out(my_proc_id);
621 class AllToAllAction : public ReplayAction<AllToAllArgParser> {
623 AllToAllAction() : ReplayAction("allToAll") {}
624 void kernel(simgrid::xbt::ReplayAction& action) override
626 TRACE_smpi_comm_in(my_proc_id, "action_allToAll",
627 new simgrid::instr::CollTIData("allToAll", -1, -1.0, args.send_size, args.recv_size,
628 Datatype::encode(args.datatype1),
629 Datatype::encode(args.datatype2)));
631 Colls::alltoall(send_buffer(args.send_size*args.comm_size* args.datatype1->size()),
632 args.send_size, args.datatype1, recv_buffer(args.recv_size * args.comm_size * args.datatype2->size()),
633 args.recv_size, args.datatype2, MPI_COMM_WORLD);
635 TRACE_smpi_comm_out(my_proc_id);
639 class GatherAction : public ReplayAction<GatherArgParser> {
641 GatherAction(std::string name) : ReplayAction(name) {}
642 void kernel(simgrid::xbt::ReplayAction& action) override
644 TRACE_smpi_comm_in(my_proc_id, name.c_str(), new simgrid::instr::CollTIData(name, (name == "gather") ? args.root : -1, -1.0, args.send_size, args.recv_size,
645 Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
647 if (name == "gather") {
648 int rank = MPI_COMM_WORLD->rank();
649 Colls::gather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
650 (rank == args.root) ? recv_buffer(args.recv_size * args.comm_size * args.datatype2->size()) : nullptr, args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
653 Colls::allgather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
654 recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, MPI_COMM_WORLD);
656 TRACE_smpi_comm_out(my_proc_id);
660 class GatherVAction : public ReplayAction<GatherVArgParser> {
662 GatherVAction(std::string name) : ReplayAction(name) {}
663 void kernel(simgrid::xbt::ReplayAction& action) override
665 int rank = MPI_COMM_WORLD->rank();
667 TRACE_smpi_comm_in(my_proc_id, name.c_str(), new simgrid::instr::VarCollTIData(
668 name, (name == "gatherV") ? args.root : -1, args.send_size, nullptr, -1, args.recvcounts,
669 Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
671 if (name == "gatherV") {
672 Colls::gatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
673 (rank == args.root) ? recv_buffer(args.recv_sum * args.datatype2->size()) : nullptr, args.recvcounts->data(), args.disps.data(), args.datatype2, args.root,
677 Colls::allgatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
678 recv_buffer(args.recv_sum * args.datatype2->size()), args.recvcounts->data(), args.disps.data(), args.datatype2,
682 TRACE_smpi_comm_out(my_proc_id);
686 class ScatterAction : public ReplayAction<ScatterArgParser> {
688 ScatterAction() : ReplayAction("scatter") {}
689 void kernel(simgrid::xbt::ReplayAction& action) override
691 int rank = MPI_COMM_WORLD->rank();
692 TRACE_smpi_comm_in(my_proc_id, "action_scatter", new simgrid::instr::CollTIData(name, args.root, -1.0, args.send_size, args.recv_size,
693 Datatype::encode(args.datatype1),
694 Datatype::encode(args.datatype2)));
696 Colls::scatter(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
697 (rank == args.root) ? recv_buffer(args.recv_size * args.datatype2->size()) : nullptr, args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
699 TRACE_smpi_comm_out(my_proc_id);
704 class ScatterVAction : public ReplayAction<ScatterVArgParser> {
706 ScatterVAction() : ReplayAction("scatterV") {}
707 void kernel(simgrid::xbt::ReplayAction& action) override
709 int rank = MPI_COMM_WORLD->rank();
710 TRACE_smpi_comm_in(my_proc_id, "action_scatterv", new simgrid::instr::VarCollTIData(name, args.root, -1, args.sendcounts, args.recv_size,
711 nullptr, Datatype::encode(args.datatype1),
712 Datatype::encode(args.datatype2)));
714 Colls::scatterv((rank == args.root) ? send_buffer(args.send_sum * args.datatype1->size()) : nullptr, args.sendcounts->data(), args.disps.data(),
715 args.datatype1, recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, args.root,
718 TRACE_smpi_comm_out(my_proc_id);
722 class ReduceScatterAction : public ReplayAction<ReduceScatterArgParser> {
724 ReduceScatterAction() : ReplayAction("reduceScatter") {}
725 void kernel(simgrid::xbt::ReplayAction& action) override
727 TRACE_smpi_comm_in(my_proc_id, "action_reducescatter",
728 new simgrid::instr::VarCollTIData("reduceScatter", -1, 0, nullptr, -1, args.recvcounts,
729 std::to_string(args.comp_size), /* ugly hack to print comp_size */
730 Datatype::encode(args.datatype1)));
732 Colls::reduce_scatter(send_buffer(args.recv_sum * args.datatype1->size()), recv_buffer(args.recv_sum * args.datatype1->size()),
733 args.recvcounts->data(), args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
735 smpi_execute_flops(args.comp_size);
736 TRACE_smpi_comm_out(my_proc_id);
739 } // Replay Namespace
741 static void action_allToAllv(simgrid::xbt::ReplayAction& action)
743 /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
744 0 allToAllV 100 1 7 10 12 100 1 70 10 5
746 1) 100 is the size of the send buffer *sizeof(int),
747 2) 1 7 10 12 is the sendcounts array
748 3) 100*sizeof(int) is the size of the receiver buffer
749 4) 1 70 10 5 is the recvcounts array
751 double clock = smpi_process()->simulated_elapsed();
753 unsigned long comm_size = MPI_COMM_WORLD->size();
754 CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
755 std::shared_ptr<std::vector<int>> sendcounts(new std::vector<int>(comm_size));
756 std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
757 std::vector<int> senddisps(comm_size, 0);
758 std::vector<int> recvdisps(comm_size, 0);
760 MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 5 + 2 * comm_size)
761 ? simgrid::smpi::Datatype::decode(action[4 + 2 * comm_size])
763 MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5 + 2 * comm_size)
764 ? simgrid::smpi::Datatype::decode(action[5 + 2 * comm_size])
767 int send_buf_size=parse_double(action[2]);
768 int recv_buf_size=parse_double(action[3+comm_size]);
769 int my_proc_id = Actor::self()->getPid();
770 void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* MPI_CURRENT_TYPE->size());
771 void *recvbuf = smpi_get_tmp_recvbuffer(recv_buf_size* MPI_CURRENT_TYPE2->size());
773 for (unsigned int i = 0; i < comm_size; i++) {
774 (*sendcounts)[i] = std::stoi(action[3 + i]);
775 (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
777 int send_size = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
778 int recv_size = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
780 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
781 new simgrid::instr::VarCollTIData("allToAllV", -1, send_size, sendcounts, recv_size, recvcounts,
782 Datatype::encode(MPI_CURRENT_TYPE),
783 Datatype::encode(MPI_CURRENT_TYPE2)));
785 Colls::alltoallv(sendbuf, sendcounts->data(), senddisps.data(), MPI_CURRENT_TYPE, recvbuf, recvcounts->data(),
786 recvdisps.data(), MPI_CURRENT_TYPE, MPI_COMM_WORLD);
788 TRACE_smpi_comm_out(my_proc_id);
789 log_timed_action (action, clock);
792 }} // namespace simgrid::smpi
794 /** @brief Only initialize the replay, don't do it for real */
795 void smpi_replay_init(int* argc, char*** argv)
797 simgrid::smpi::Process::init(argc, argv);
798 smpi_process()->mark_as_initialized();
799 smpi_process()->set_replaying(true);
801 int my_proc_id = Actor::self()->getPid();
802 TRACE_smpi_init(my_proc_id);
803 TRACE_smpi_computing_init(my_proc_id);
804 TRACE_smpi_comm_in(my_proc_id, "smpi_replay_run_init", new simgrid::instr::NoOpTIData("init"));
805 TRACE_smpi_comm_out(my_proc_id);
806 xbt_replay_action_register("init", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::InitAction().execute(action); });
807 xbt_replay_action_register("finalize", [](simgrid::xbt::ReplayAction& action) { /* nothing to do */ });
808 xbt_replay_action_register("comm_size", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::CommunicatorAction().execute(action); });
809 xbt_replay_action_register("comm_split",[](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::CommunicatorAction().execute(action); });
810 xbt_replay_action_register("comm_dup", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::CommunicatorAction().execute(action); });
812 xbt_replay_action_register("send", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::SendAction("send").execute(action); });
813 xbt_replay_action_register("Isend", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::SendAction("Isend").execute(action); });
814 xbt_replay_action_register("recv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::RecvAction("recv").execute(action); });
815 xbt_replay_action_register("Irecv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::RecvAction("Irecv").execute(action); });
816 xbt_replay_action_register("test", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::TestAction().execute(action); });
817 xbt_replay_action_register("wait", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::WaitAction().execute(action); });
818 xbt_replay_action_register("waitAll", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::WaitAllAction().execute(action); });
819 xbt_replay_action_register("barrier", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::BarrierAction().execute(action); });
820 xbt_replay_action_register("bcast", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::BcastAction().execute(action); });
821 xbt_replay_action_register("reduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::ReduceAction().execute(action); });
822 xbt_replay_action_register("allReduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::AllReduceAction().execute(action); });
823 xbt_replay_action_register("allToAll", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::AllToAllAction().execute(action); });
824 xbt_replay_action_register("allToAllV", simgrid::smpi::action_allToAllv);
825 xbt_replay_action_register("gather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::GatherAction("gather").execute(action); });
826 xbt_replay_action_register("scatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::ScatterAction().execute(action); });
827 xbt_replay_action_register("gatherV", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::GatherVAction("gatherV").execute(action); });
828 xbt_replay_action_register("scatterV", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::ScatterVAction().execute(action); });
829 xbt_replay_action_register("allGather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::GatherAction("allGather").execute(action); });
830 xbt_replay_action_register("allGatherV", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::GatherVAction("allGatherV").execute(action); });
831 xbt_replay_action_register("reduceScatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::ReduceScatterAction().execute(action); });
832 xbt_replay_action_register("compute", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::ComputeAction().execute(action); });
834 //if we have a delayed start, sleep here.
836 double value = xbt_str_parse_double((*argv)[2], "%s is not a double");
837 XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
838 smpi_execute_flops(value);
840 //UGLY: force a context switch to be sure that all MSG_processes begin initialization
841 XBT_DEBUG("Force context switch by smpi_execute_flops - Sleeping for 0.0 flops ");
842 smpi_execute_flops(0.0);
846 /** @brief actually run the replay after initialization */
847 void smpi_replay_main(int* argc, char*** argv)
849 simgrid::xbt::replay_runner(*argc, *argv);
851 /* and now, finalize everything */
852 /* One active process will stop. Decrease the counter*/
853 XBT_DEBUG("There are %zu elements in reqq[*]", get_reqq_self()->size());
854 if (not get_reqq_self()->empty()) {
855 unsigned int count_requests=get_reqq_self()->size();
856 MPI_Request requests[count_requests];
857 MPI_Status status[count_requests];
860 for (auto const& req : *get_reqq_self()) {
864 simgrid::smpi::Request::waitall(count_requests, requests, status);
866 delete get_reqq_self();
869 if(active_processes==0){
870 /* Last process alive speaking: end the simulated timer */
871 XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
872 smpi_free_replay_tmp_buffers();
875 TRACE_smpi_comm_in(Actor::self()->getPid(), "smpi_replay_run_finalize", new simgrid::instr::NoOpTIData("finalize"));
877 smpi_process()->finalize();
879 TRACE_smpi_comm_out(Actor::self()->getPid());
880 TRACE_smpi_finalize(Actor::self()->getPid());
883 /** @brief chain a replay initialization and a replay start */
884 void smpi_replay_run(int* argc, char*** argv)
886 smpi_replay_init(argc, argv);
887 smpi_replay_main(argc, argv);