1 /* Copyright (c) 2009-2017. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
7 #include "smpi_coll.hpp"
8 #include "smpi_comm.hpp"
9 #include "smpi_datatype.hpp"
10 #include "smpi_group.hpp"
11 #include "smpi_process.hpp"
12 #include "smpi_request.hpp"
13 #include "xbt/replay.hpp"
15 #include <boost/algorithm/string/join.hpp>
18 #include <unordered_map>
21 using simgrid::s4u::Actor;
XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");

// Number of replaying actors still alive; the last one to finish prints the
// simulated time (see smpi_replay_main).
static int active_processes = 0;

// Per-actor queue of pending asynchronous requests, keyed by actor pid.
// Filled by Isend/Irecv actions, drained by wait/waitAll/test actions.
static std::unordered_map<int, std::vector<MPI_Request>*> reqq;

// Datatype assumed when a trace line does not name one; chosen by InitAction
// (MPI_DOUBLE for MPE-style traces, MPI_BYTE for TAU-style traces).
static MPI_Datatype MPI_DEFAULT_TYPE;

// Arity check for a parsed trace line: action[0] is the process id and
// action[1] the action name, so at least `mandatory` + 2 tokens are required.
// (No comments may be placed between the continuation lines below: backslash
// line splicing happens before comment removal in the C++ translation phases.)
#define CHECK_ACTION_PARAMS(action, mandatory, optional) \
if (action.size() < static_cast<unsigned long>(mandatory + 2)) \
THROWF(arg_error, 0, "%s replay failed.\n" \
"%lu items were given on the line. First two should be process_id and action. " \
"This action needs after them %lu mandatory arguments, and accepts %lu optional ones. \n" \
"Please contact the Simgrid team if support is needed", \
__FUNCTION__, action.size(), static_cast<unsigned long>(mandatory), \
static_cast<unsigned long>(optional)); \
41 static void log_timed_action(simgrid::xbt::ReplayAction& action, double clock)
43 if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
44 std::string s = boost::algorithm::join(action, " ");
45 XBT_VERB("%s %f", s.c_str(), smpi_process()->simulated_elapsed() - clock);
49 static std::vector<MPI_Request>* get_reqq_self()
51 return reqq.at(Actor::self()->getPid());
54 static void set_reqq_self(std::vector<MPI_Request> *mpi_request)
56 reqq.insert({Actor::self()->getPid(), mpi_request});
60 static double parse_double(std::string string)
62 return xbt_str_parse_double(string.c_str(), "%s is not a double");
69 class ActionArgParser {
71 virtual void parse(simgrid::xbt::ReplayAction& action) { CHECK_ACTION_PARAMS(action, 0, 0) }
74 class SendRecvParser : public ActionArgParser {
76 /* communication partner; if we send, this is the receiver and vice versa */
79 MPI_Datatype datatype1 = MPI_DEFAULT_TYPE;
81 void parse(simgrid::xbt::ReplayAction& action) override
83 CHECK_ACTION_PARAMS(action, 2, 1)
84 partner = std::stoi(action[2]);
85 size = parse_double(action[3]);
86 if (action.size() > 4)
87 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
91 class ComputeParser : public ActionArgParser {
93 /* communication partner; if we send, this is the receiver and vice versa */
96 void parse(simgrid::xbt::ReplayAction& action) override
98 CHECK_ACTION_PARAMS(action, 1, 0)
99 flops = parse_double(action[2]);
103 class CollCommParser : public ActionArgParser {
109 MPI_Datatype datatype1 = MPI_DEFAULT_TYPE;
110 MPI_Datatype datatype2 = MPI_DEFAULT_TYPE;
113 class BcastArgParser : public CollCommParser {
115 void parse(simgrid::xbt::ReplayAction& action) override
117 CHECK_ACTION_PARAMS(action, 1, 2)
118 size = parse_double(action[2]);
119 root = (action.size() > 3) ? std::stoi(action[3]) : 0;
120 if (action.size() > 4)
121 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
125 class ReduceArgParser : public CollCommParser {
127 void parse(simgrid::xbt::ReplayAction& action) override
129 CHECK_ACTION_PARAMS(action, 2, 2)
130 comm_size = parse_double(action[2]);
131 comp_size = parse_double(action[3]);
132 root = (action.size() > 4) ? std::stoi(action[4]) : 0;
133 if (action.size() > 5)
134 datatype1 = simgrid::smpi::Datatype::decode(action[5]);
138 template <class T> class ReplayAction {
140 const std::string name;
146 explicit ReplayAction(std::string name) : name(name), my_proc_id(simgrid::s4u::Actor::self()->getPid()) {}
148 virtual void execute(simgrid::xbt::ReplayAction& action)
150 // Needs to be re-initialized for every action, hence here
151 double start_time = smpi_process()->simulated_elapsed();
155 log_timed_action(action, start_time);
158 virtual void kernel(simgrid::xbt::ReplayAction& action) = 0;
160 void* send_buffer(int size)
162 return smpi_get_tmp_sendbuffer(size);
165 void* recv_buffer(int size)
167 return smpi_get_tmp_recvbuffer(size);
171 class WaitAction : public ReplayAction<ActionArgParser> {
173 WaitAction() : ReplayAction("Wait") {}
174 void kernel(simgrid::xbt::ReplayAction& action) override
176 std::string s = boost::algorithm::join(action, " ");
177 xbt_assert(get_reqq_self()->size(), "action wait not preceded by any irecv or isend: %s", s.c_str());
178 MPI_Request request = get_reqq_self()->back();
179 get_reqq_self()->pop_back();
181 if (request == nullptr) {
182 /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just
187 int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
189 // Must be taken before Request::wait() since the request may be set to
190 // MPI_REQUEST_NULL by Request::wait!
191 int src = request->comm()->group()->rank(request->src());
192 int dst = request->comm()->group()->rank(request->dst());
193 bool is_wait_for_receive = (request->flags() & RECV);
194 // TODO: Here we take the rank while we normally take the process id (look for my_proc_id)
195 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::NoOpTIData("wait"));
198 Request::wait(&request, &status);
200 TRACE_smpi_comm_out(rank);
201 if (is_wait_for_receive)
202 TRACE_smpi_recv(src, dst, 0);
206 class SendAction : public ReplayAction<SendRecvParser> {
208 SendAction() = delete;
209 SendAction(std::string name) : ReplayAction(name) {}
210 void kernel(simgrid::xbt::ReplayAction& action) override
212 int dst_traced = MPI_COMM_WORLD->group()->actor(args.partner)->getPid();
214 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
215 Datatype::encode(args.datatype1)));
216 if (not TRACE_smpi_view_internals())
217 TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, args.size * args.datatype1->size());
219 if (name == "send") {
220 Request::send(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
221 } else if (name == "Isend") {
222 MPI_Request request = Request::isend(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
223 get_reqq_self()->push_back(request);
225 xbt_die("Don't know this action, %s", name.c_str());
228 TRACE_smpi_comm_out(my_proc_id);
232 class RecvAction : public ReplayAction<SendRecvParser> {
234 RecvAction() = delete;
235 explicit RecvAction(std::string name) : ReplayAction(name) {}
236 void kernel(simgrid::xbt::ReplayAction& action) override
238 int src_traced = MPI_COMM_WORLD->group()->actor(args.partner)->getPid();
240 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
241 Datatype::encode(args.datatype1)));
244 // unknown size from the receiver point of view
245 if (args.size <= 0.0) {
246 Request::probe(args.partner, 0, MPI_COMM_WORLD, &status);
247 args.size = status.count;
250 if (name == "recv") {
251 Request::recv(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD, &status);
252 } else if (name == "Irecv") {
253 MPI_Request request = Request::irecv(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
254 get_reqq_self()->push_back(request);
257 TRACE_smpi_comm_out(my_proc_id);
258 // TODO: Check why this was only activated in the "recv" case and not in the "Irecv" case
259 if (name == "recv" && not TRACE_smpi_view_internals()) {
260 TRACE_smpi_recv(src_traced, my_proc_id, 0);
265 class ComputeAction : public ReplayAction<ComputeParser> {
267 ComputeAction() : ReplayAction("compute") {}
268 void kernel(simgrid::xbt::ReplayAction& action) override
270 TRACE_smpi_computing_in(my_proc_id, args.flops);
271 smpi_execute_flops(args.flops);
272 TRACE_smpi_computing_out(my_proc_id);
276 class TestAction : public ReplayAction<ActionArgParser> {
278 TestAction() : ReplayAction("Test") {}
279 void kernel(simgrid::xbt::ReplayAction& action) override
281 MPI_Request request = get_reqq_self()->back();
282 get_reqq_self()->pop_back();
283 // if request is null here, this may mean that a previous test has succeeded
284 // Different times in traced application and replayed version may lead to this
285 // In this case, ignore the extra calls.
286 if (request != nullptr) {
287 TRACE_smpi_testing_in(my_proc_id);
290 int flag = Request::test(&request, &status);
292 XBT_DEBUG("MPI_Test result: %d", flag);
293 /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now
295 get_reqq_self()->push_back(request);
297 TRACE_smpi_testing_out(my_proc_id);
302 class InitAction : public ReplayAction<ActionArgParser> {
304 InitAction() : ReplayAction("Init") {}
305 void kernel(simgrid::xbt::ReplayAction& action) override
307 CHECK_ACTION_PARAMS(action, 0, 1)
308 MPI_DEFAULT_TYPE = (action.size() > 2) ? MPI_DOUBLE // default MPE datatype
309 : MPI_BYTE; // default TAU datatype
311 /* start a simulated timer */
312 smpi_process()->simulated_start();
313 /*initialize the number of active processes */
314 active_processes = smpi_process_count();
316 set_reqq_self(new std::vector<MPI_Request>);
320 class CommunicatorAction : public ReplayAction<ActionArgParser> {
322 CommunicatorAction() : ReplayAction("Comm") {}
323 void kernel(simgrid::xbt::ReplayAction& action) override { /* nothing to do */}
326 class WaitAllAction : public ReplayAction<ActionArgParser> {
328 WaitAllAction() : ReplayAction("waitAll") {}
329 void kernel(simgrid::xbt::ReplayAction& action) override
331 const unsigned int count_requests = get_reqq_self()->size();
333 if (count_requests > 0) {
334 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
335 new simgrid::instr::Pt2PtTIData("waitAll", -1, count_requests, ""));
336 std::vector<std::pair</*sender*/int,/*recv*/int>> sender_receiver;
337 for (const auto& req : (*get_reqq_self())) {
338 if (req && (req->flags() & RECV)) {
339 sender_receiver.push_back({req->src(), req->dst()});
342 MPI_Status status[count_requests];
343 Request::waitall(count_requests, &(*get_reqq_self())[0], status);
345 for (auto& pair : sender_receiver) {
346 TRACE_smpi_recv(pair.first, pair.second, 0);
348 TRACE_smpi_comm_out(my_proc_id);
353 class BarrierAction : public ReplayAction<ActionArgParser> {
355 BarrierAction() : ReplayAction("barrier") {}
356 void kernel(simgrid::xbt::ReplayAction& action) override
358 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::NoOpTIData("barrier"));
359 Colls::barrier(MPI_COMM_WORLD);
360 TRACE_smpi_comm_out(my_proc_id);
364 class BcastAction : public ReplayAction<BcastArgParser> {
366 BcastAction() : ReplayAction("bcast") {}
367 void kernel(simgrid::xbt::ReplayAction& action) override
369 TRACE_smpi_comm_in(my_proc_id, "action_bcast",
370 new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(args.root)->getPid(),
371 -1.0, args.size, -1, Datatype::encode(args.datatype1), ""));
373 Colls::bcast(send_buffer(args.size * args.datatype1->size()), args.size, args.datatype1, args.root, MPI_COMM_WORLD);
375 TRACE_smpi_comm_out(my_proc_id);
379 class ReduceAction : public ReplayAction<ReduceArgParser> {
381 ReduceAction() : ReplayAction("reduce") {}
382 void kernel(simgrid::xbt::ReplayAction& action) override
384 TRACE_smpi_comm_in(my_proc_id, "action_reduce",
385 new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(args.root)->getPid(), args.comp_size,
386 args.comm_size, -1, Datatype::encode(args.datatype1), ""));
388 Colls::reduce(send_buffer(args.comm_size * args.datatype1->size()),
389 recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL, args.root, MPI_COMM_WORLD);
390 smpi_execute_flops(args.comp_size);
392 TRACE_smpi_comm_out(my_proc_id);
396 class AllReduceAction : public ReplayAction<ActionArgParser> {
398 AllReduceAction() : ReplayAction("barrier") {}
399 void kernel(simgrid::xbt::ReplayAction& action) override
401 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::CollTIData("allReduce", -1, comp_size, comm_size, -1,
402 Datatype::encode(MPI_CURRENT_TYPE), ""));
404 Colls::allreduce(send_buffer(args.comm_size * args.datatype1->size()),
405 recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
406 smpi_execute_flops(args.comp_size);
408 TRACE_smpi_comm_out(my_proc_id);
411 } // Replay Namespace
413 static void action_allToAll(simgrid::xbt::ReplayAction& action)
415 CHECK_ACTION_PARAMS(action, 2, 2) //two mandatory (send and recv volumes) and two optional (corresponding datatypes)
416 double clock = smpi_process()->simulated_elapsed();
417 unsigned long comm_size = MPI_COMM_WORLD->size();
418 int send_size = parse_double(action[2]);
419 int recv_size = parse_double(action[3]);
420 MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 5) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE};
421 MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE};
423 void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* MPI_CURRENT_TYPE->size());
424 void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
426 int my_proc_id = Actor::self()->getPid();
427 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
428 new simgrid::instr::CollTIData("allToAll", -1, -1.0, send_size, recv_size,
429 Datatype::encode(MPI_CURRENT_TYPE),
430 Datatype::encode(MPI_CURRENT_TYPE2)));
432 Colls::alltoall(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
434 TRACE_smpi_comm_out(my_proc_id);
435 log_timed_action (action, clock);
438 static void action_gather(simgrid::xbt::ReplayAction& action)
440 /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
443 1) 68 is the sendcounts
444 2) 68 is the recvcounts
445 3) 0 is the root node
446 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
447 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
449 CHECK_ACTION_PARAMS(action, 2, 3)
450 double clock = smpi_process()->simulated_elapsed();
451 unsigned long comm_size = MPI_COMM_WORLD->size();
452 int send_size = parse_double(action[2]);
453 int recv_size = parse_double(action[3]);
454 MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE};
455 MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[6]) : MPI_DEFAULT_TYPE};
457 void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
458 void *recv = nullptr;
459 int root = (action.size() > 4) ? std::stoi(action[4]) : 0;
460 int rank = MPI_COMM_WORLD->rank();
463 recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
465 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
466 Datatype::encode(MPI_CURRENT_TYPE),
467 Datatype::encode(MPI_CURRENT_TYPE2)));
469 Colls::gather(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
471 TRACE_smpi_comm_out(Actor::self()->getPid());
472 log_timed_action (action, clock);
475 static void action_scatter(simgrid::xbt::ReplayAction& action)
477 /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
480 1) 68 is the sendcounts
481 2) 68 is the recvcounts
482 3) 0 is the root node
483 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
484 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
486 CHECK_ACTION_PARAMS(action, 2, 3)
487 double clock = smpi_process()->simulated_elapsed();
488 unsigned long comm_size = MPI_COMM_WORLD->size();
489 int send_size = parse_double(action[2]);
490 int recv_size = parse_double(action[3]);
491 MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE};
492 MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[6]) : MPI_DEFAULT_TYPE};
494 void* send = smpi_get_tmp_sendbuffer(send_size * MPI_CURRENT_TYPE->size());
495 void* recv = nullptr;
496 int root = (action.size() > 4) ? std::stoi(action[4]) : 0;
497 int rank = MPI_COMM_WORLD->rank();
500 recv = smpi_get_tmp_recvbuffer(recv_size * comm_size * MPI_CURRENT_TYPE2->size());
502 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
503 Datatype::encode(MPI_CURRENT_TYPE),
504 Datatype::encode(MPI_CURRENT_TYPE2)));
506 Colls::scatter(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
508 TRACE_smpi_comm_out(Actor::self()->getPid());
509 log_timed_action(action, clock);
512 static void action_gatherv(simgrid::xbt::ReplayAction& action)
514 /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
515 0 gather 68 68 10 10 10 0 0 0
517 1) 68 is the sendcount
518 2) 68 10 10 10 is the recvcounts
519 3) 0 is the root node
520 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
521 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
523 double clock = smpi_process()->simulated_elapsed();
524 unsigned long comm_size = MPI_COMM_WORLD->size();
525 CHECK_ACTION_PARAMS(action, comm_size+1, 2)
526 int send_size = parse_double(action[2]);
527 std::vector<int> disps(comm_size, 0);
528 std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
530 MPI_Datatype MPI_CURRENT_TYPE =
531 (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
532 MPI_Datatype MPI_CURRENT_TYPE2{
533 (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[5 + comm_size]) : MPI_DEFAULT_TYPE};
535 void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
536 void *recv = nullptr;
537 for (unsigned int i = 0; i < comm_size; i++) {
538 (*recvcounts)[i] = std::stoi(action[i + 3]);
540 int recv_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
542 int root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
543 int rank = MPI_COMM_WORLD->rank();
546 recv = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
548 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData(
549 "gatherV", root, send_size, nullptr, -1, recvcounts,
550 Datatype::encode(MPI_CURRENT_TYPE), Datatype::encode(MPI_CURRENT_TYPE2)));
552 Colls::gatherv(send, send_size, MPI_CURRENT_TYPE, recv, recvcounts->data(), disps.data(), MPI_CURRENT_TYPE2, root,
555 TRACE_smpi_comm_out(Actor::self()->getPid());
556 log_timed_action (action, clock);
559 static void action_scatterv(simgrid::xbt::ReplayAction& action)
561 /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
562 0 gather 68 10 10 10 68 0 0 0
564 1) 68 10 10 10 is the sendcounts
565 2) 68 is the recvcount
566 3) 0 is the root node
567 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
568 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
570 double clock = smpi_process()->simulated_elapsed();
571 unsigned long comm_size = MPI_COMM_WORLD->size();
572 CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
573 int recv_size = parse_double(action[2 + comm_size]);
574 std::vector<int> disps(comm_size, 0);
575 std::shared_ptr<std::vector<int>> sendcounts(new std::vector<int>(comm_size));
577 MPI_Datatype MPI_CURRENT_TYPE =
578 (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
579 MPI_Datatype MPI_CURRENT_TYPE2{
580 (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[5 + comm_size]) : MPI_DEFAULT_TYPE};
582 void* send = nullptr;
583 void* recv = smpi_get_tmp_recvbuffer(recv_size * MPI_CURRENT_TYPE->size());
584 for (unsigned int i = 0; i < comm_size; i++) {
585 (*sendcounts)[i] = std::stoi(action[i + 2]);
587 int send_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
589 int root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
590 int rank = MPI_COMM_WORLD->rank();
593 send = smpi_get_tmp_sendbuffer(send_sum * MPI_CURRENT_TYPE2->size());
595 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData("gatherV", root, -1, sendcounts, recv_size,
596 nullptr, Datatype::encode(MPI_CURRENT_TYPE),
597 Datatype::encode(MPI_CURRENT_TYPE2)));
599 Colls::scatterv(send, sendcounts->data(), disps.data(), MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root,
602 TRACE_smpi_comm_out(Actor::self()->getPid());
603 log_timed_action(action, clock);
606 static void action_reducescatter(simgrid::xbt::ReplayAction& action)
608 /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
609 0 reduceScatter 275427 275427 275427 204020 11346849 0
611 1) The first four values after the name of the action declare the recvcounts array
612 2) The value 11346849 is the amount of instructions
613 3) The last value corresponds to the datatype, see simgrid::smpi::Datatype::decode().
615 double clock = smpi_process()->simulated_elapsed();
616 unsigned long comm_size = MPI_COMM_WORLD->size();
617 CHECK_ACTION_PARAMS(action, comm_size+1, 1)
618 int comp_size = parse_double(action[2+comm_size]);
619 int my_proc_id = Actor::self()->getPid();
620 std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>);
621 MPI_Datatype MPI_CURRENT_TYPE =
622 (action.size() > 3 + comm_size) ? simgrid::smpi::Datatype::decode(action[3 + comm_size]) : MPI_DEFAULT_TYPE;
624 for (unsigned int i = 0; i < comm_size; i++) {
625 recvcounts->push_back(std::stoi(action[i + 2]));
627 int size{std::accumulate(recvcounts->begin(), recvcounts->end(), 0)};
629 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
630 new simgrid::instr::VarCollTIData("reduceScatter", -1, 0, nullptr, -1, recvcounts,
631 std::to_string(comp_size), /* ugly hack to print comp_size */
632 Datatype::encode(MPI_CURRENT_TYPE)));
634 void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
635 void *recvbuf = smpi_get_tmp_recvbuffer(size* MPI_CURRENT_TYPE->size());
637 Colls::reduce_scatter(sendbuf, recvbuf, recvcounts->data(), MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
638 smpi_execute_flops(comp_size);
640 TRACE_smpi_comm_out(my_proc_id);
641 log_timed_action (action, clock);
644 static void action_allgather(simgrid::xbt::ReplayAction& action)
646 /* The structure of the allgather action for the rank 0 (total 4 processes) is the following:
647 0 allGather 275427 275427
649 1) 275427 is the sendcount
650 2) 275427 is the recvcount
651 3) No more values mean that the datatype for sent and receive buffer is the default one, see
652 simgrid::smpi::Datatype::decode().
654 double clock = smpi_process()->simulated_elapsed();
656 CHECK_ACTION_PARAMS(action, 2, 2)
657 int sendcount = std::stoi(action[2]);
658 int recvcount = std::stoi(action[3]);
660 MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 5) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE};
661 MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE};
663 void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
664 void *recvbuf = smpi_get_tmp_recvbuffer(recvcount* MPI_CURRENT_TYPE2->size());
666 int my_proc_id = Actor::self()->getPid();
668 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
669 new simgrid::instr::CollTIData("allGather", -1, -1.0, sendcount, recvcount,
670 Datatype::encode(MPI_CURRENT_TYPE),
671 Datatype::encode(MPI_CURRENT_TYPE2)));
673 Colls::allgather(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcount, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
675 TRACE_smpi_comm_out(my_proc_id);
676 log_timed_action (action, clock);
static void action_allgatherv(simgrid::xbt::ReplayAction& action)
/* The structure of the allgatherv action for the rank 0 (total 4 processes) is the following:
0 allGatherV 275427 275427 275427 275427 204020
  1) 275427 is the sendcount
  2) The next four elements declare the recvcounts array
  3) No more values mean that the datatype for sent and receive buffer is the default one, see
  simgrid::smpi::Datatype::decode().
  double clock = smpi_process()->simulated_elapsed();

  unsigned long comm_size = MPI_COMM_WORLD->size();
  CHECK_ACTION_PARAMS(action, comm_size+1, 2)
  int sendcount = std::stoi(action[2]);
  std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
  std::vector<int> disps(comm_size, 0);

  // Detect which optional trailing fields (datatype, displacements) are present.
  int datatype_index = 0, disp_index = 0;
  if (action.size() > 3 + 2 * comm_size) { /* datatype + disp are specified */
    datatype_index = 3 + comm_size;
    disp_index = datatype_index + 1;
  } else if (action.size() > 3 + 2 * comm_size) { /* disps specified; datatype is not specified; use the default one */
    // NOTE(review): this condition is byte-identical to the first branch, so
    // this arm is unreachable dead code -- the "disps without datatype" layout
    // can never be selected. Confirm the intended threshold before fixing.
    disp_index = 3 + comm_size;
  } else if (action.size() > 3 + comm_size) { /* only datatype, no disp specified */
    datatype_index = 3 + comm_size;

  if (disp_index != 0) {
    for (unsigned int i = 0; i < comm_size; i++)
      disps[i] = std::stoi(action[disp_index + i]);

  // Send and recv datatypes are decoded from the SAME token: the trace format
  // records only one datatype for this action.
  MPI_Datatype MPI_CURRENT_TYPE{(datatype_index > 0) ? simgrid::smpi::Datatype::decode(action[datatype_index])
  MPI_Datatype MPI_CURRENT_TYPE2{(datatype_index > 0) ? simgrid::smpi::Datatype::decode(action[datatype_index])

  void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());

  for (unsigned int i = 0; i < comm_size; i++) {
    (*recvcounts)[i] = std::stoi(action[i + 3]);
  int recv_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
  void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());

  int my_proc_id = Actor::self()->getPid();

  TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
                     new simgrid::instr::VarCollTIData("allGatherV", -1, sendcount, nullptr, -1, recvcounts,
                                                       Datatype::encode(MPI_CURRENT_TYPE),
                                                       Datatype::encode(MPI_CURRENT_TYPE2)));

  Colls::allgatherv(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts->data(), disps.data(), MPI_CURRENT_TYPE2,

  TRACE_smpi_comm_out(my_proc_id);
  log_timed_action (action, clock);
740 static void action_allToAllv(simgrid::xbt::ReplayAction& action)
742 /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
743 0 allToAllV 100 1 7 10 12 100 1 70 10 5
745 1) 100 is the size of the send buffer *sizeof(int),
746 2) 1 7 10 12 is the sendcounts array
747 3) 100*sizeof(int) is the size of the receiver buffer
748 4) 1 70 10 5 is the recvcounts array
750 double clock = smpi_process()->simulated_elapsed();
752 unsigned long comm_size = MPI_COMM_WORLD->size();
753 CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
754 std::shared_ptr<std::vector<int>> sendcounts(new std::vector<int>(comm_size));
755 std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
756 std::vector<int> senddisps(comm_size, 0);
757 std::vector<int> recvdisps(comm_size, 0);
759 MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 5 + 2 * comm_size)
760 ? simgrid::smpi::Datatype::decode(action[4 + 2 * comm_size])
762 MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5 + 2 * comm_size)
763 ? simgrid::smpi::Datatype::decode(action[5 + 2 * comm_size])
766 int send_buf_size=parse_double(action[2]);
767 int recv_buf_size=parse_double(action[3+comm_size]);
768 int my_proc_id = Actor::self()->getPid();
769 void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* MPI_CURRENT_TYPE->size());
770 void *recvbuf = smpi_get_tmp_recvbuffer(recv_buf_size* MPI_CURRENT_TYPE2->size());
772 for (unsigned int i = 0; i < comm_size; i++) {
773 (*sendcounts)[i] = std::stoi(action[3 + i]);
774 (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
776 int send_size = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
777 int recv_size = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
779 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
780 new simgrid::instr::VarCollTIData("allToAllV", -1, send_size, sendcounts, recv_size, recvcounts,
781 Datatype::encode(MPI_CURRENT_TYPE),
782 Datatype::encode(MPI_CURRENT_TYPE2)));
784 Colls::alltoallv(sendbuf, sendcounts->data(), senddisps.data(), MPI_CURRENT_TYPE, recvbuf, recvcounts->data(),
785 recvdisps.data(), MPI_CURRENT_TYPE, MPI_COMM_WORLD);
787 TRACE_smpi_comm_out(my_proc_id);
788 log_timed_action (action, clock);
791 }} // namespace simgrid::smpi
793 /** @brief Only initialize the replay, don't do it for real */
void smpi_replay_init(int* argc, char*** argv)
  // Mark this process as an initialized, replaying SMPI process before any
  // action handler can run.
  simgrid::smpi::Process::init(argc, argv);
  smpi_process()->mark_as_initialized();
  smpi_process()->set_replaying(true);

  int my_proc_id = Actor::self()->getPid();
  TRACE_smpi_init(my_proc_id);
  TRACE_smpi_computing_init(my_proc_id);
  TRACE_smpi_comm_in(my_proc_id, "smpi_replay_run_init", new simgrid::instr::NoOpTIData("init"));
  TRACE_smpi_comm_out(my_proc_id);
  // Register one handler per trace action name. Class-based actions are
  // instantiated per occurrence; the remaining action_* free functions have
  // not been migrated to the class scheme yet.
  xbt_replay_action_register("init", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::InitAction().execute(action); });
  xbt_replay_action_register("finalize", [](simgrid::xbt::ReplayAction& action) { /* nothing to do */ });
  xbt_replay_action_register("comm_size", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::CommunicatorAction().execute(action); });
  xbt_replay_action_register("comm_split",[](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::CommunicatorAction().execute(action); });
  xbt_replay_action_register("comm_dup", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::CommunicatorAction().execute(action); });

  xbt_replay_action_register("send", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::SendAction("send").execute(action); });
  xbt_replay_action_register("Isend", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::SendAction("Isend").execute(action); });
  xbt_replay_action_register("recv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::RecvAction("recv").execute(action); });
  xbt_replay_action_register("Irecv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::RecvAction("Irecv").execute(action); });
  xbt_replay_action_register("test", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::TestAction().execute(action); });
  xbt_replay_action_register("wait", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::WaitAction().execute(action); });
  xbt_replay_action_register("waitAll", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::WaitAllAction().execute(action); });
  xbt_replay_action_register("barrier", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::BarrierAction().execute(action); });
  xbt_replay_action_register("bcast", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::BcastAction().execute(action); });
  xbt_replay_action_register("reduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::ReduceAction().execute(action); });
  // NOTE(review): "allReduce" is registered against a free function
  // action_allReduce that is not visible in this file, while an
  // AllReduceAction class exists above -- confirm which one is current.
  xbt_replay_action_register("allReduce", simgrid::smpi::action_allReduce);
  xbt_replay_action_register("allToAll", simgrid::smpi::action_allToAll);
  xbt_replay_action_register("allToAllV", simgrid::smpi::action_allToAllv);
  xbt_replay_action_register("gather", simgrid::smpi::action_gather);
  xbt_replay_action_register("scatter", simgrid::smpi::action_scatter);
  xbt_replay_action_register("gatherV", simgrid::smpi::action_gatherv);
  xbt_replay_action_register("scatterV", simgrid::smpi::action_scatterv);
  xbt_replay_action_register("allGather", simgrid::smpi::action_allgather);
  xbt_replay_action_register("allGatherV", simgrid::smpi::action_allgatherv);
  xbt_replay_action_register("reduceScatter", simgrid::smpi::action_reducescatter);
  xbt_replay_action_register("compute", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::ComputeAction().execute(action); });

  //if we have a delayed start, sleep here.
  // (The guard checking argv's length is elided in this view; (*argv)[2] is
  // presumably only read when the extra argument exists -- verify upstream.)
    double value = xbt_str_parse_double((*argv)[2], "%s is not a double");
    XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
    smpi_execute_flops(value);
    //UGLY: force a context switch to be sure that all MSG_processes begin initialization
    XBT_DEBUG("Force context switch by smpi_execute_flops - Sleeping for 0.0 flops ");
    smpi_execute_flops(0.0);
845 /** @brief actually run the replay after initialization */
void smpi_replay_main(int* argc, char*** argv)
  // Drive the replay loop: reads and dispatches every action of this actor.
  simgrid::xbt::replay_runner(*argc, *argv);

  /* and now, finalize everything */
  /* One active process will stop. Decrease the counter*/
  XBT_DEBUG("There are %zu elements in reqq[*]", get_reqq_self()->size());
  if (not get_reqq_self()->empty()) {
    unsigned int count_requests=get_reqq_self()->size();
    MPI_Request requests[count_requests];
    MPI_Status status[count_requests];

    // Drain every request the trace left pending before tearing down.
    // (The loop body copying *get_reqq_self() into requests[] is elided in
    // this view -- presumably requests[i] = req; verify upstream.)
    for (auto const& req : *get_reqq_self()) {
    simgrid::smpi::Request::waitall(count_requests, requests, status);
  // The queue was heap-allocated by InitAction::kernel (set_reqq_self).
  delete get_reqq_self();

  // NOTE(review): the decrement of active_processes is not visible in this
  // view; it presumably happens in the elided lines just above this check.
  if(active_processes==0){
    /* Last process alive speaking: end the simulated timer */
    XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
    smpi_free_replay_tmp_buffers();

  TRACE_smpi_comm_in(Actor::self()->getPid(), "smpi_replay_run_finalize", new simgrid::instr::NoOpTIData("finalize"));

  smpi_process()->finalize();

  TRACE_smpi_comm_out(Actor::self()->getPid());
  TRACE_smpi_finalize(Actor::self()->getPid());
882 /** @brief chain a replay initialization and a replay start */
883 void smpi_replay_run(int* argc, char*** argv)
885 smpi_replay_init(argc, argv);
886 smpi_replay_main(argc, argv);