1 /* Copyright (c) 2009-2018. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
7 #include "smpi_coll.hpp"
8 #include "smpi_comm.hpp"
9 #include "smpi_datatype.hpp"
10 #include "smpi_group.hpp"
11 #include "smpi_process.hpp"
12 #include "smpi_request.hpp"
13 #include "xbt/replay.hpp"
15 #include <boost/algorithm/string/join.hpp>
18 #include <unordered_map>
21 using simgrid::s4u::Actor;
23 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
// Number of replayed processes still alive; the "init" action sets it from smpi_process_count().
25 static int active_processes = 0;
// Pending non-blocking requests of each actor, keyed by actor PID (see get_reqq_self() and set_reqq_self()).
26 static std::unordered_map<int, std::vector<MPI_Request>*> reqq;
// Datatype used when a trace line does not name one; the "init" action picks MPI_DOUBLE or MPI_BYTE.
28 static MPI_Datatype MPI_DEFAULT_TYPE;
// Sanity check on a parsed trace line: raises arg_error through THROWF when the line holds fewer than
// mandatory + 2 tokens (the two extra tokens are the process id and the action name).
// The optional count is only reported in the error message, it is not verified.
30 #define CHECK_ACTION_PARAMS(action, mandatory, optional) \
32 if (action.size() < static_cast<unsigned long>(mandatory + 2)) \
33 THROWF(arg_error, 0, "%s replay failed.\n" \
34 "%zu items were given on the line. First two should be process_id and action. " \
35 "This action needs after them %lu mandatory arguments, and accepts %lu optional ones. \n" \
36 "Please contact the Simgrid team if support is needed", \
37 __func__, action.size(), static_cast<unsigned long>(mandatory), static_cast<unsigned long>(optional)); \
40 static void log_timed_action(simgrid::xbt::ReplayAction& action, double clock)
42 if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
43 std::string s = boost::algorithm::join(action, " ");
44 XBT_VERB("%s %f", s.c_str(), smpi_process()->simulated_elapsed() - clock);
48 static std::vector<MPI_Request>* get_reqq_self()
50 return reqq.at(simgrid::s4u::this_actor::getPid());
53 static void set_reqq_self(std::vector<MPI_Request> *mpi_request)
55 reqq.insert({simgrid::s4u::this_actor::getPid(), mpi_request});
59 static double parse_double(std::string string)
61 return xbt_str_parse_double(string.c_str(), "%s is not a double");
// Base class of the argument parsers: one parser type per family of trace actions.
68 class ActionArgParser {
70 virtual ~ActionArgParser() = default;
// Default behaviour: the action takes no argument, only the two leading tokens are required.
71 virtual void parse(simgrid::xbt::ReplayAction& action, std::string name) { CHECK_ACTION_PARAMS(action, 0, 0) }
// Parser of the point-to-point actions (used by SendAction and RecvAction).
// Tokens expected after the process id and the action name: <partner> <size> [<datatype>]
74 class SendRecvParser : public ActionArgParser {
76 /* communication partner; if we send, this is the receiver and vice versa */
// Stays at the default datatype when the line does not name one
79 MPI_Datatype datatype1 = MPI_DEFAULT_TYPE;
81 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
83 CHECK_ACTION_PARAMS(action, 2, 1)
84 partner = std::stoi(action[2]);
85 size = parse_double(action[3]);
// Fifth token, when present: datatype id
86 if (action.size() > 4)
87 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
// Parser of the "compute" action. Token expected after the process id and the action name: <flops>
91 class ComputeParser : public ActionArgParser {
93 /* amount of flops to execute, as read from the trace line */
96 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
98 CHECK_ACTION_PARAMS(action, 1, 0)
99 flops = parse_double(action[2]);
// Common base of the parsers of collective operations; both datatypes start at the default datatype.
103 class CollCommParser : public ActionArgParser {
// datatype1 is decoded as the send (or only) datatype by the derived parsers
111 MPI_Datatype datatype1 = MPI_DEFAULT_TYPE;
// datatype2 is decoded as the receive datatype by the parsers that handle two of them
112 MPI_Datatype datatype2 = MPI_DEFAULT_TYPE;
// Parser of "bcast" lines. Tokens after the process id and the action name: <size> [<root> [<datatype>]]
115 class BcastArgParser : public CollCommParser {
117 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
119 CHECK_ACTION_PARAMS(action, 1, 2)
120 size = parse_double(action[2]);
// The root is optional and defaults to rank 0
121 root = (action.size() > 3) ? std::stoi(action[3]) : 0;
122 if (action.size() > 4)
123 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
// Parser of "reduce" lines.
// Tokens after the process id and the action name: <comm_size> <comp_size> [<root> [<datatype>]]
127 class ReduceArgParser : public CollCommParser {
129 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
131 CHECK_ACTION_PARAMS(action, 2, 2)
// comm_size is the element count given to Colls::reduce, comp_size the flops executed afterwards (see ReduceAction)
132 comm_size = parse_double(action[2]);
133 comp_size = parse_double(action[3]);
// The root is optional and defaults to rank 0
134 root = (action.size() > 4) ? std::stoi(action[4]) : 0;
135 if (action.size() > 5)
136 datatype1 = simgrid::smpi::Datatype::decode(action[5]);
// Parser of "allReduce" lines. Tokens after the process id and the action name: <comm_size> <comp_size> [<datatype>]
140 class AllReduceArgParser : public CollCommParser {
142 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
144 CHECK_ACTION_PARAMS(action, 2, 1)
// comm_size is the element count given to Colls::allreduce, comp_size the flops executed afterwards (see AllReduceAction)
145 comm_size = parse_double(action[2]);
146 comp_size = parse_double(action[3]);
147 if (action.size() > 4)
148 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
// Parser of "allToAll" lines.
// Tokens after the process id and the action name: <send_size> <recv_size> [<send datatype> [<recv datatype>]]
152 class AllToAllArgParser : public CollCommParser {
154 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
156 CHECK_ACTION_PARAMS(action, 2, 1)
// Here comm_size is the number of processes of MPI_COMM_WORLD, not a message size
157 comm_size = MPI_COMM_WORLD->size();
158 send_size = parse_double(action[2]);
159 recv_size = parse_double(action[3]);
161 if (action.size() > 4)
162 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
163 if (action.size() > 5)
164 datatype2 = simgrid::smpi::Datatype::decode(action[5]);
// Parser shared by the "gather" and "allGather" actions (both are registered with GatherAction).
168 class GatherArgParser : public CollCommParser {
170 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
172 /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
175 1) 68 is the sendcounts
176 2) 68 is the recvcounts
177 3) 0 is the root node
178 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
179 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
181 CHECK_ACTION_PARAMS(action, 2, 3)
182 comm_size = MPI_COMM_WORLD->size();
183 send_size = parse_double(action[2]);
184 recv_size = parse_double(action[3]);
// Only "gather" carries a root token; it shifts the two datatype ids by one position
186 if (name == "gather") {
187 root = (action.size() > 4) ? std::stoi(action[4]) : 0;
188 if (action.size() > 5)
189 datatype1 = simgrid::smpi::Datatype::decode(action[5]);
190 if (action.size() > 6)
191 datatype2 = simgrid::smpi::Datatype::decode(action[6]);
// Layout without root token: the datatype ids directly follow the two counts
194 if (action.size() > 4)
195 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
196 if (action.size() > 5)
197 datatype2 = simgrid::smpi::Datatype::decode(action[5]);
// Parser shared by the "gatherV" and "allGatherV" actions (both are registered with GatherVAction).
202 class GatherVArgParser : public CollCommParser {
// Shared pointer because the same vector is also handed to the tracing data (see GatherVAction)
205 std::shared_ptr<std::vector<int>> recvcounts;
// Displacements, all zero unless the allGatherV branch below finds them on the line
206 std::vector<int> disps;
207 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
209 /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
210 0 gatherV 68 68 10 10 10 0 0 0
212 1) 68 is the sendcount
213 2) 68 10 10 10 is the recvcounts
214 3) 0 is the root node
215 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
216 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
218 comm_size = MPI_COMM_WORLD->size();
219 CHECK_ACTION_PARAMS(action, comm_size+1, 2)
220 send_size = parse_double(action[2]);
221 disps = std::vector<int>(comm_size, 0);
222 recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
// gatherV layout: root and the two datatype ids follow the recvcounts array
224 if (name == "gatherV") {
225 root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
226 if (action.size() > 4 + comm_size)
227 datatype1 = simgrid::smpi::Datatype::decode(action[4 + comm_size]);
228 if (action.size() > 5 + comm_size)
229 datatype2 = simgrid::smpi::Datatype::decode(action[5 + comm_size]);
// Other layout: no root; an optional single datatype id and optional displacements follow the recvcounts
232 int datatype_index = 0;
234 /* The 3 comes from "0 gather <sendcount>", which must always be present.
235 * The + comm_size is the recvcounts array, which must also be present
// In this layout one datatype id is decoded into both datatype1 and datatype2
237 if (action.size() > 3 + comm_size + comm_size) { /* datatype + disp are specified */
238 datatype_index = 3 + comm_size;
239 disp_index = datatype_index + 1;
240 datatype1 = simgrid::smpi::Datatype::decode(action[datatype_index]);
241 datatype2 = simgrid::smpi::Datatype::decode(action[datatype_index]);
// NOTE(review): a line with more than 5 + comm_size but fewer than 3 + 2 * comm_size tokens lands here and the
// displacement loop below then indexes past the end of the line -- the threshold looks too low, to be confirmed
242 } else if (action.size() > 3 + comm_size + 2) { /* disps specified; datatype is not specified; use the default one */
243 disp_index = 3 + comm_size;
244 } else if (action.size() > 3 + comm_size) { /* only datatype, no disp specified */
245 datatype_index = 3 + comm_size;
246 datatype1 = simgrid::smpi::Datatype::decode(action[datatype_index]);
247 datatype2 = simgrid::smpi::Datatype::decode(action[datatype_index]);
// disp_index is still 0 when no displacement was found on the line: disps then keeps its zeros
250 if (disp_index != 0) {
251 for (unsigned int i = 0; i < comm_size; i++)
252 disps[i] = std::stoi(action[disp_index + i]);
// The recvcounts array starts at the fourth token whatever the layout
256 for (unsigned int i = 0; i < comm_size; i++) {
257 (*recvcounts)[i] = std::stoi(action[i + 3]);
259 recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
// Parser of "scatter" lines.
263 class ScatterArgParser : public CollCommParser {
265 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
267 /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
270 1) 68 is the sendcounts
271 2) 68 is the recvcounts
272 3) 0 is the root node
273 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
274 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
276 CHECK_ACTION_PARAMS(action, 2, 3)
277 comm_size = MPI_COMM_WORLD->size();
278 send_size = parse_double(action[2]);
279 recv_size = parse_double(action[3]);
// The root is optional and defaults to rank 0; the two datatype ids may follow it
280 root = (action.size() > 4) ? std::stoi(action[4]) : 0;
281 if (action.size() > 5)
282 datatype1 = simgrid::smpi::Datatype::decode(action[5]);
283 if (action.size() > 6)
284 datatype2 = simgrid::smpi::Datatype::decode(action[6]);
288 class ScatterVArgParser : public CollCommParser {
292 std::shared_ptr<std::vector<int>> sendcounts;
293 std::vector<int> disps;
294 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
296 /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
297 0 gather 68 10 10 10 68 0 0 0
299 1) 68 10 10 10 is the sendcounts
300 2) 68 is the recvcount
301 3) 0 is the root node
302 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
303 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
305 CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
306 recv_size = parse_double(action[2 + comm_size]);
307 disps = std::vector<int>(comm_size, 0);
308 sendcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
310 if (action.size() > 5 + comm_size)
311 datatype1 = simgrid::smpi::Datatype::decode(action[4 + comm_size]);
312 if (action.size() > 5 + comm_size)
313 datatype2 = simgrid::smpi::Datatype::decode(action[5]);
315 for (unsigned int i = 0; i < comm_size; i++) {
316 (*sendcounts)[i] = std::stoi(action[i + 2]);
318 send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
319 root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
323 class ReduceScatterArgParser : public CollCommParser {
326 std::shared_ptr<std::vector<int>> recvcounts;
327 std::vector<int> disps;
328 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
330 /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
331 0 reduceScatter 275427 275427 275427 204020 11346849 0
333 1) The first four values after the name of the action declare the recvcounts array
334 2) The value 11346849 is the amount of instructions
335 3) The last value corresponds to the datatype, see simgrid::smpi::Datatype::decode().
337 comm_size = MPI_COMM_WORLD->size();
338 CHECK_ACTION_PARAMS(action, comm_size+1, 1)
339 comp_size = parse_double(action[2+comm_size]);
340 recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
341 if (action.size() > 3 + comm_size)
342 datatype1 = simgrid::smpi::Datatype::decode(action[3 + comm_size]);
344 for (unsigned int i = 0; i < comm_size; i++) {
345 recvcounts->push_back(std::stoi(action[i + 2]));
347 recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
351 class AllToAllVArgParser : public CollCommParser {
355 std::shared_ptr<std::vector<int>> recvcounts;
356 std::shared_ptr<std::vector<int>> sendcounts;
357 std::vector<int> senddisps;
358 std::vector<int> recvdisps;
361 void parse(simgrid::xbt::ReplayAction& action, std::string name) override
363 /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
364 0 allToAllV 100 1 7 10 12 100 1 70 10 5
366 1) 100 is the size of the send buffer *sizeof(int),
367 2) 1 7 10 12 is the sendcounts array
368 3) 100*sizeof(int) is the size of the receiver buffer
369 4) 1 70 10 5 is the recvcounts array
371 comm_size = MPI_COMM_WORLD->size();
372 CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
373 sendcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
374 recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
375 senddisps = std::vector<int>(comm_size, 0);
376 recvdisps = std::vector<int>(comm_size, 0);
378 if (action.size() > 5 + 2 * comm_size)
379 datatype1 = simgrid::smpi::Datatype::decode(action[4 + 2 * comm_size]);
380 if (action.size() > 5 + 2 * comm_size)
381 datatype2 = simgrid::smpi::Datatype::decode(action[5 + 2 * comm_size]);
383 send_buf_size=parse_double(action[2]);
384 recv_buf_size=parse_double(action[3+comm_size]);
385 for (unsigned int i = 0; i < comm_size; i++) {
386 (*sendcounts)[i] = std::stoi(action[3 + i]);
387 (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
389 send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
390 recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
// Base class template of the replayed actions; T is the parser type used to decode the trace line into args.
394 template <class T> class ReplayAction {
// Action name: handed to the parser and compared against by the actions serving several keywords
396 const std::string name;
// The PID of the actor creating the action is recorded once, at construction
402 explicit ReplayAction(std::string name) : name(name), my_proc_id(simgrid::s4u::this_actor::getPid()) {}
403 virtual ~ReplayAction() = default;
// Entry point called by the registered handlers: records the start time, parses the line, logs the timing
405 virtual void execute(simgrid::xbt::ReplayAction& action)
407 // Needs to be re-initialized for every action, hence here
408 double start_time = smpi_process()->simulated_elapsed();
409 args.parse(action, name);
412 log_timed_action(action, start_time);
// Action-specific work, implemented by each concrete action (presumably invoked from execute() -- to be confirmed)
415 virtual void kernel(simgrid::xbt::ReplayAction& action) = 0;
// Thin wrappers around the SMPI temporary buffers
417 void* send_buffer(int size)
419 return smpi_get_tmp_sendbuffer(size);
422 void* recv_buffer(int size)
424 return smpi_get_tmp_recvbuffer(size);
// Replays a "wait" line: waits for the request most recently pushed on the list of this actor.
428 class WaitAction : public ReplayAction<ActionArgParser> {
430 WaitAction() : ReplayAction("Wait") {}
431 void kernel(simgrid::xbt::ReplayAction& action) override
433 std::string s = boost::algorithm::join(action, " ");
434 xbt_assert(get_reqq_self()->size(), "action wait not preceded by any irecv or isend: %s", s.c_str());
// The request is removed from the list before being waited for
435 MPI_Request request = get_reqq_self()->back();
436 get_reqq_self()->pop_back();
438 if (request == nullptr) {
439 /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just
// NOTE(review): the communicator is compared with MPI_COMM_NULL on the next line but dereferenced
// unconditionally two lines further -- confirm that it can never be null here
444 int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
446 // Must be taken before Request::wait() since the request may be set to
447 // MPI_REQUEST_NULL by Request::wait!
448 int src = request->comm()->group()->rank(request->src());
449 int dst = request->comm()->group()->rank(request->dst());
450 bool is_wait_for_receive = (request->flags() & RECV);
451 // TODO: Here we take the rank while we normally take the process id (look for my_proc_id)
452 TRACE_smpi_comm_in(rank, __func__, new simgrid::instr::NoOpTIData("wait"));
455 Request::wait(&request, &status);
457 TRACE_smpi_comm_out(rank);
// Only completed receives are reported to the tracing layer
458 if (is_wait_for_receive)
459 TRACE_smpi_recv(src, dst, 0);
// Replays the "send" and "Isend" lines; the name given at construction selects the blocking or non-blocking call.
463 class SendAction : public ReplayAction<SendRecvParser> {
465 SendAction() = delete;
466 explicit SendAction(std::string name) : ReplayAction(name) {}
467 void kernel(simgrid::xbt::ReplayAction& action) override
// The tracing calls identify the destination by PID, the MPI calls below by rank
469 int dst_traced = MPI_COMM_WORLD->group()->actor(args.partner)->getPid();
471 TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
472 Datatype::encode(args.datatype1)));
473 if (not TRACE_smpi_view_internals())
474 TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, args.size * args.datatype1->size());
476 if (name == "send") {
477 Request::send(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
478 } else if (name == "Isend") {
// The request is kept so that a later wait, test or waitAll line can complete it
479 MPI_Request request = Request::isend(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
480 get_reqq_self()->push_back(request);
482 xbt_die("Don't know this action, %s", name.c_str());
485 TRACE_smpi_comm_out(my_proc_id);
// Replays the "recv" and "Irecv" lines; the name given at construction selects the blocking or non-blocking call.
489 class RecvAction : public ReplayAction<SendRecvParser> {
491 RecvAction() = delete;
492 explicit RecvAction(std::string name) : ReplayAction(name) {}
493 void kernel(simgrid::xbt::ReplayAction& action) override
// The tracing calls identify the source by PID, the MPI calls below by rank
495 int src_traced = MPI_COMM_WORLD->group()->actor(args.partner)->getPid();
497 TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
498 Datatype::encode(args.datatype1)));
501 // unknown size from the receiver point of view
// A size that is zero or negative is replaced by the count reported by a probe on the partner
502 if (args.size <= 0.0) {
503 Request::probe(args.partner, 0, MPI_COMM_WORLD, &status);
504 args.size = status.count;
507 if (name == "recv") {
508 Request::recv(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD, &status);
509 } else if (name == "Irecv") {
// The request is kept so that a later wait, test or waitAll line can complete it
510 MPI_Request request = Request::irecv(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
511 get_reqq_self()->push_back(request);
514 TRACE_smpi_comm_out(my_proc_id);
515 // TODO: Check why this was only activated in the "recv" case and not in the "Irecv" case
516 if (name == "recv" && not TRACE_smpi_view_internals()) {
517 TRACE_smpi_recv(src_traced, my_proc_id, 0);
// Replays a "compute" line: executes the parsed amount of flops, bracketed by the computing trace events.
522 class ComputeAction : public ReplayAction<ComputeParser> {
524 ComputeAction() : ReplayAction("compute") {}
525 void kernel(simgrid::xbt::ReplayAction& action) override
527 TRACE_smpi_computing_in(my_proc_id, args.flops);
528 smpi_execute_flops(args.flops);
529 TRACE_smpi_computing_out(my_proc_id);
// Replays a "test" line: tests the request most recently pushed on the list of this actor.
533 class TestAction : public ReplayAction<ActionArgParser> {
535 TestAction() : ReplayAction("Test") {}
536 void kernel(simgrid::xbt::ReplayAction& action) override
// NOTE(review): unlike WaitAction, nothing visible here checks that the list is non-empty before back() -- confirm
538 MPI_Request request = get_reqq_self()->back();
539 get_reqq_self()->pop_back();
540 // if request is null here, this may mean that a previous test has succeeded
541 // Different times in traced application and replayed version may lead to this
542 // In this case, ignore the extra calls.
543 if (request != nullptr) {
544 TRACE_smpi_testing_in(my_proc_id);
547 int flag = Request::test(&request, &status);
549 XBT_DEBUG("MPI_Test result: %d", flag);
550 /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now
552 get_reqq_self()->push_back(request);
554 TRACE_smpi_testing_out(my_proc_id);
// Replays an "init" line: selects the default datatype, starts the simulated timer and creates the request list.
559 class InitAction : public ReplayAction<ActionArgParser> {
561 InitAction() : ReplayAction("Init") {}
562 void kernel(simgrid::xbt::ReplayAction& action) override
564 CHECK_ACTION_PARAMS(action, 0, 1)
// Any extra token on the line selects MPI_DOUBLE; its value is not looked at
565 MPI_DEFAULT_TYPE = (action.size() > 2) ? MPI_DOUBLE // default MPE datatype
566 : MPI_BYTE; // default TAU datatype
568 /* start a simulated timer */
569 smpi_process()->simulated_start();
570 /*initialize the number of active processes */
571 active_processes = smpi_process_count();
// The list allocated here is released with delete in smpi_replay_main()
573 set_reqq_self(new std::vector<MPI_Request>);
// Handler of the comm_size, comm_split and comm_dup lines: they are accepted but nothing is replayed for them.
577 class CommunicatorAction : public ReplayAction<ActionArgParser> {
579 CommunicatorAction() : ReplayAction("Comm") {}
580 void kernel(simgrid::xbt::ReplayAction& action) override { /* nothing to do */}
583 class WaitAllAction : public ReplayAction<ActionArgParser> {
585 WaitAllAction() : ReplayAction("waitAll") {}
586 void kernel(simgrid::xbt::ReplayAction& action) override
588 const unsigned int count_requests = get_reqq_self()->size();
590 if (count_requests > 0) {
591 TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData("waitAll", -1, count_requests, ""));
592 std::vector<std::pair</*sender*/int,/*recv*/int>> sender_receiver;
593 for (const auto& req : (*get_reqq_self())) {
594 if (req && (req->flags() & RECV)) {
595 sender_receiver.push_back({req->src(), req->dst()});
598 MPI_Status status[count_requests];
599 Request::waitall(count_requests, &(*get_reqq_self())[0], status);
601 for (auto& pair : sender_receiver) {
602 TRACE_smpi_recv(pair.first, pair.second, 0);
604 TRACE_smpi_comm_out(my_proc_id);
// Replays a "barrier" line with a barrier on MPI_COMM_WORLD, bracketed by the communication trace events.
609 class BarrierAction : public ReplayAction<ActionArgParser> {
611 BarrierAction() : ReplayAction("barrier") {}
612 void kernel(simgrid::xbt::ReplayAction& action) override
614 TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::NoOpTIData("barrier"));
615 Colls::barrier(MPI_COMM_WORLD);
616 TRACE_smpi_comm_out(my_proc_id);
// Replays a "bcast" line on MPI_COMM_WORLD with the size, root and datatype decoded by BcastArgParser.
620 class BcastAction : public ReplayAction<BcastArgParser> {
622 BcastAction() : ReplayAction("bcast") {}
623 void kernel(simgrid::xbt::ReplayAction& action) override
// The trace receives the PID of the root while the collective below receives its rank
625 TRACE_smpi_comm_in(my_proc_id, "action_bcast",
626 new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(args.root)->getPid(),
627 -1.0, args.size, -1, Datatype::encode(args.datatype1), ""));
// The buffer request is expressed in bytes: element count times datatype size
629 Colls::bcast(send_buffer(args.size * args.datatype1->size()), args.size, args.datatype1, args.root, MPI_COMM_WORLD);
631 TRACE_smpi_comm_out(my_proc_id);
// Replays a "reduce" line: the collective on MPI_COMM_WORLD, then the execution of comp_size flops.
635 class ReduceAction : public ReplayAction<ReduceArgParser> {
637 ReduceAction() : ReplayAction("reduce") {}
638 void kernel(simgrid::xbt::ReplayAction& action) override
// The trace receives the PID of the root while the collective below receives its rank
640 TRACE_smpi_comm_in(my_proc_id, "action_reduce",
641 new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(args.root)->getPid(), args.comp_size,
642 args.comm_size, -1, Datatype::encode(args.datatype1), ""));
// The operation passed is MPI_OP_NULL; the computation cost is accounted for by the flops executed afterwards
644 Colls::reduce(send_buffer(args.comm_size * args.datatype1->size()),
645 recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL, args.root, MPI_COMM_WORLD);
646 smpi_execute_flops(args.comp_size);
648 TRACE_smpi_comm_out(my_proc_id);
// Replays an "allReduce" line: the collective on MPI_COMM_WORLD, then the execution of comp_size flops.
652 class AllReduceAction : public ReplayAction<AllReduceArgParser> {
654 AllReduceAction() : ReplayAction("allReduce") {}
655 void kernel(simgrid::xbt::ReplayAction& action) override
657 TRACE_smpi_comm_in(my_proc_id, "action_allReduce", new simgrid::instr::CollTIData("allReduce", -1, args.comp_size, args.comm_size, -1,
658 Datatype::encode(args.datatype1), ""));
// The operation passed is MPI_OP_NULL; the computation cost is accounted for by the flops executed afterwards
660 Colls::allreduce(send_buffer(args.comm_size * args.datatype1->size()),
661 recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
662 smpi_execute_flops(args.comp_size);
664 TRACE_smpi_comm_out(my_proc_id);
// Replays an "allToAll" line on MPI_COMM_WORLD with the counts and datatypes decoded by AllToAllArgParser.
668 class AllToAllAction : public ReplayAction<AllToAllArgParser> {
670 AllToAllAction() : ReplayAction("allToAll") {}
671 void kernel(simgrid::xbt::ReplayAction& action) override
673 TRACE_smpi_comm_in(my_proc_id, "action_allToAll",
674 new simgrid::instr::CollTIData("allToAll", -1, -1.0, args.send_size, args.recv_size,
675 Datatype::encode(args.datatype1),
676 Datatype::encode(args.datatype2)));
// Both buffers are requested for comm_size blocks of the per-process count
678 Colls::alltoall(send_buffer(args.send_size*args.comm_size* args.datatype1->size()),
679 args.send_size, args.datatype1, recv_buffer(args.recv_size * args.comm_size * args.datatype2->size()),
680 args.recv_size, args.datatype2, MPI_COMM_WORLD);
682 TRACE_smpi_comm_out(my_proc_id);
// Replays the "gather" and "allGather" lines; the name given at construction selects the collective.
686 class GatherAction : public ReplayAction<GatherArgParser> {
688 explicit GatherAction(std::string name) : ReplayAction(name) {}
689 void kernel(simgrid::xbt::ReplayAction& action) override
// The root is traced for "gather" only; -1 is traced otherwise
691 TRACE_smpi_comm_in(my_proc_id, name.c_str(), new simgrid::instr::CollTIData(name, (name == "gather") ? args.root : -1, -1.0, args.send_size, args.recv_size,
692 Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
694 if (name == "gather") {
695 int rank = MPI_COMM_WORLD->rank();
// Only the root gets a receive buffer, requested for comm_size blocks
696 Colls::gather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
697 (rank == args.root) ? recv_buffer(args.recv_size * args.comm_size * args.datatype2->size()) : nullptr, args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
// NOTE(review): the receive buffer request below has no comm_size factor, unlike the gather case -- confirm intended
700 Colls::allgather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
701 recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, MPI_COMM_WORLD);
703 TRACE_smpi_comm_out(my_proc_id);
// Replays the "gatherV" and "allGatherV" lines; the name given at construction selects the collective.
707 class GatherVAction : public ReplayAction<GatherVArgParser> {
709 explicit GatherVAction(std::string name) : ReplayAction(name) {}
710 void kernel(simgrid::xbt::ReplayAction& action) override
712 int rank = MPI_COMM_WORLD->rank();
// The root is traced for "gatherV" only; -1 is traced otherwise
714 TRACE_smpi_comm_in(my_proc_id, name.c_str(), new simgrid::instr::VarCollTIData(
715 name, (name == "gatherV") ? args.root : -1, args.send_size, nullptr, -1, args.recvcounts,
716 Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
718 if (name == "gatherV") {
// Only the root gets a receive buffer, requested for the sum of the receive counts
719 Colls::gatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
720 (rank == args.root) ? recv_buffer(args.recv_size_sum * args.datatype2->size()) : nullptr, args.recvcounts->data(), args.disps.data(), args.datatype2, args.root,
// Every process gets a receive buffer in the allgatherv case
724 Colls::allgatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
725 recv_buffer(args.recv_size_sum * args.datatype2->size()), args.recvcounts->data(), args.disps.data(), args.datatype2,
729 TRACE_smpi_comm_out(my_proc_id);
// Replays a "scatter" line on MPI_COMM_WORLD with the counts, root and datatypes decoded by ScatterArgParser.
733 class ScatterAction : public ReplayAction<ScatterArgParser> {
735 ScatterAction() : ReplayAction("scatter") {}
736 void kernel(simgrid::xbt::ReplayAction& action) override
738 int rank = MPI_COMM_WORLD->rank();
739 TRACE_smpi_comm_in(my_proc_id, "action_scatter", new simgrid::instr::CollTIData(name, args.root, -1.0, args.send_size, args.recv_size,
740 Datatype::encode(args.datatype1),
741 Datatype::encode(args.datatype2)));
// NOTE(review): the receive buffer is only provided on the root here, whereas ScatterVAction provides the send
// buffer only on the root -- confirm which side is meant to be root-only
743 Colls::scatter(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
744 (rank == args.root) ? recv_buffer(args.recv_size * args.datatype2->size()) : nullptr, args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
746 TRACE_smpi_comm_out(my_proc_id);
// Replays a "scatterV" line on MPI_COMM_WORLD with the values decoded by ScatterVArgParser.
751 class ScatterVAction : public ReplayAction<ScatterVArgParser> {
753 ScatterVAction() : ReplayAction("scatterV") {}
754 void kernel(simgrid::xbt::ReplayAction& action) override
756 int rank = MPI_COMM_WORLD->rank();
757 TRACE_smpi_comm_in(my_proc_id, "action_scatterv", new simgrid::instr::VarCollTIData(name, args.root, -1, args.sendcounts, args.recv_size,
758 nullptr, Datatype::encode(args.datatype1),
759 Datatype::encode(args.datatype2)));
// Only the root gets a send buffer, requested for the sum of the send counts
761 Colls::scatterv((rank == args.root) ? send_buffer(args.send_size_sum * args.datatype1->size()) : nullptr, args.sendcounts->data(), args.disps.data(),
762 args.datatype1, recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, args.root,
765 TRACE_smpi_comm_out(my_proc_id);
// Replays a "reduceScatter" line: the collective on MPI_COMM_WORLD, then the execution of comp_size flops.
769 class ReduceScatterAction : public ReplayAction<ReduceScatterArgParser> {
771 ReduceScatterAction() : ReplayAction("reduceScatter") {}
772 void kernel(simgrid::xbt::ReplayAction& action) override
774 TRACE_smpi_comm_in(my_proc_id, "action_reducescatter",
775 new simgrid::instr::VarCollTIData("reduceScatter", -1, 0, nullptr, -1, args.recvcounts,
776 std::to_string(args.comp_size), /* ugly hack to print comp_size */
777 Datatype::encode(args.datatype1)));
// Both buffers are requested for the sum of the receive counts; the operation passed is MPI_OP_NULL
779 Colls::reduce_scatter(send_buffer(args.recv_size_sum * args.datatype1->size()), recv_buffer(args.recv_size_sum * args.datatype1->size()),
780 args.recvcounts->data(), args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
782 smpi_execute_flops(args.comp_size);
783 TRACE_smpi_comm_out(my_proc_id);
// Replays an "allToAllV" line on MPI_COMM_WORLD with the values decoded by AllToAllVArgParser.
787 class AllToAllVAction : public ReplayAction<AllToAllVArgParser> {
789 AllToAllVAction() : ReplayAction("allToAllV") {}
790 void kernel(simgrid::xbt::ReplayAction& action) override
792 TRACE_smpi_comm_in(my_proc_id, __func__,
793 new simgrid::instr::VarCollTIData(
794 "allToAllV", -1, args.send_size_sum, args.sendcounts, args.recv_size_sum, args.recvcounts,
795 Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
// The buffers are requested from the buffer sizes given on the line, not from the sums of the counts
797 Colls::alltoallv(send_buffer(args.send_buf_size * args.datatype1->size()), args.sendcounts->data(), args.senddisps.data(), args.datatype1,
798 recv_buffer(args.recv_buf_size * args.datatype2->size()), args.recvcounts->data(), args.recvdisps.data(), args.datatype2, MPI_COMM_WORLD);
800 TRACE_smpi_comm_out(my_proc_id);
803 } // Replay Namespace
804 }} // namespace simgrid::smpi
806 /** @brief Only initialize the replay, don't do it for real */
807 void smpi_replay_init(int* argc, char*** argv)
// Bring up the SMPI process of the calling actor and flag it as replaying
809 simgrid::smpi::Process::init(argc, argv);
810 smpi_process()->mark_as_initialized();
811 smpi_process()->set_replaying(true);
// Tracing is keyed by the PID of the calling actor; an "init" event is emitted right away
813 int my_proc_id = simgrid::s4u::this_actor::getPid();
814 TRACE_smpi_init(my_proc_id);
815 TRACE_smpi_computing_init(my_proc_id);
816 TRACE_smpi_comm_in(my_proc_id, "smpi_replay_run_init", new simgrid::instr::NoOpTIData("init"));
817 TRACE_smpi_comm_out(my_proc_id);
// Associate every action keyword of the trace files with the class replaying it.
// A fresh action object is built for each replayed line.
818 xbt_replay_action_register("init", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::InitAction().execute(action); });
819 xbt_replay_action_register("finalize", [](simgrid::xbt::ReplayAction& action) { /* nothing to do */ });
820 xbt_replay_action_register("comm_size", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
821 xbt_replay_action_register("comm_split",[](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
822 xbt_replay_action_register("comm_dup", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
// Point-to-point and request-completion actions
824 xbt_replay_action_register("send", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("send").execute(action); });
825 xbt_replay_action_register("Isend", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("Isend").execute(action); });
826 xbt_replay_action_register("recv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("recv").execute(action); });
827 xbt_replay_action_register("Irecv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("Irecv").execute(action); });
828 xbt_replay_action_register("test", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::TestAction().execute(action); });
829 xbt_replay_action_register("wait", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAction().execute(action); });
830 xbt_replay_action_register("waitAll", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAllAction().execute(action); });
// Collective actions; the gather-like keywords share one class and are told apart by the name passed to it
831 xbt_replay_action_register("barrier", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BarrierAction().execute(action); });
832 xbt_replay_action_register("bcast", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BcastAction().execute(action); });
833 xbt_replay_action_register("reduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceAction().execute(action); });
834 xbt_replay_action_register("allReduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllReduceAction().execute(action); });
835 xbt_replay_action_register("allToAll", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllAction().execute(action); });
836 xbt_replay_action_register("allToAllV", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllVAction().execute(action); });
837 xbt_replay_action_register("gather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("gather").execute(action); });
838 xbt_replay_action_register("scatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterAction().execute(action); });
839 xbt_replay_action_register("gatherV", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("gatherV").execute(action); });
840 xbt_replay_action_register("scatterV", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterVAction().execute(action); });
841 xbt_replay_action_register("allGather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("allGather").execute(action); });
842 xbt_replay_action_register("allGatherV", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("allGatherV").execute(action); });
843 xbt_replay_action_register("reduceScatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceScatterAction().execute(action); });
844 xbt_replay_action_register("compute", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ComputeAction().execute(action); });
846 //if we have a delayed start, sleep here.
// The delay is read from the third command-line argument and is expressed in flops
848 double value = xbt_str_parse_double((*argv)[2], "%s is not a double");
849 XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
850 smpi_execute_flops(value);
852 //UGLY: force a context switch to be sure that all MSG_processes begin initialization
853 XBT_DEBUG("Force context switch by smpi_execute_flops - Sleeping for 0.0 flops ");
854 smpi_execute_flops(0.0);
858 /** @brief actually run the replay after initialization */
859 void smpi_replay_main(int* argc, char*** argv)
// Replay the trace: every line is dispatched to the handler registered for its action keyword
861 simgrid::xbt::replay_runner(*argc, *argv);
863 /* and now, finalize everything */
864 /* One active process will stop. Decrease the counter*/
865 XBT_DEBUG("There are %zu elements in reqq[*]", get_reqq_self()->size());
// Requests still pending at the end of the trace are waited for before shutting down
866 if (not get_reqq_self()->empty()) {
867 unsigned int count_requests=get_reqq_self()->size();
// NOTE(review): the two arrays below are variable-length arrays, a compiler extension in C++
868 MPI_Request requests[count_requests];
869 MPI_Status status[count_requests];
872 for (auto const& req : *get_reqq_self()) {
876 simgrid::smpi::Request::waitall(count_requests, requests, status);
// NOTE(review): no erase of the reqq entry is visible after this delete, which would leave a dangling pointer -- confirm
878 delete get_reqq_self();
881 if(active_processes==0){
882 /* Last process alive speaking: end the simulated timer */
883 XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
884 smpi_free_replay_tmp_buffers();
// The finalization of the SMPI process is itself traced as a "finalize" event
887 TRACE_smpi_comm_in(simgrid::s4u::this_actor::getPid(), "smpi_replay_run_finalize",
888 new simgrid::instr::NoOpTIData("finalize"));
890 smpi_process()->finalize();
892 TRACE_smpi_comm_out(simgrid::s4u::this_actor::getPid());
893 TRACE_smpi_finalize(simgrid::s4u::this_actor::getPid());
896 /** @brief chain a replay initialization and a replay start */
897 void smpi_replay_run(int* argc, char*** argv)
// Both steps receive the same argc and argv pointers
899 smpi_replay_init(argc, argv);
900 smpi_replay_main(argc, argv);