1 /* Copyright (c) 2009-2017. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
#include "smpi_coll.hpp"
#include "smpi_comm.hpp"
#include "smpi_datatype.hpp"
#include "smpi_group.hpp"
#include "smpi_process.hpp"
#include "smpi_request.hpp"
#include "xbt/replay.hpp"

#include <boost/algorithm/string/join.hpp>

#include <memory>
#include <numeric>
#include <string>
#include <unordered_map>
#include <vector>
21 using simgrid::s4u::Actor;
23 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
25 static int active_processes = 0;
26 static std::unordered_map<int, std::vector<MPI_Request>*> reqq;
28 static MPI_Datatype MPI_DEFAULT_TYPE;
/* Validate the arity of a replay action line. The first two fields are always
 * <process_id> <action_name>; `mandatory` required arguments must follow, and
 * up to `optional` extra ones are accepted. Throws arg_error otherwise. */
#define CHECK_ACTION_PARAMS(action, mandatory, optional)                                                               \
  {                                                                                                                    \
    if (action.size() < static_cast<unsigned long>(mandatory + 2))                                                     \
      THROWF(arg_error, 0, "%s replay failed.\n"                                                                       \
             "%lu items were given on the line. First two should be process_id and action. "                           \
             "This action needs after them %lu mandatory arguments, and accepts %lu optional ones. \n"                 \
             "Please contact the Simgrid team if support is needed",                                                   \
             __FUNCTION__, action.size(), static_cast<unsigned long>(mandatory),                                       \
             static_cast<unsigned long>(optional));                                                                    \
  }
41 static void log_timed_action(simgrid::xbt::ReplayAction& action, double clock)
43 if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
44 std::string s = boost::algorithm::join(action, " ");
45 XBT_VERB("%s %f", s.c_str(), smpi_process()->simulated_elapsed() - clock);
49 static std::vector<MPI_Request>* get_reqq_self()
51 return reqq.at(Actor::self()->getPid());
54 static void set_reqq_self(std::vector<MPI_Request> *mpi_request)
56 reqq.insert({Actor::self()->getPid(), mpi_request});
60 static double parse_double(std::string string)
62 return xbt_str_parse_double(string.c_str(), "%s is not a double");
69 class ActionArgParser {
71 virtual void parse(simgrid::xbt::ReplayAction& action) { CHECK_ACTION_PARAMS(action, 0, 0) }
74 class SendRecvParser : public ActionArgParser {
76 /* communication partner; if we send, this is the receiver and vice versa */
79 MPI_Datatype datatype1 = MPI_DEFAULT_TYPE;
81 void parse(simgrid::xbt::ReplayAction& action) override
83 CHECK_ACTION_PARAMS(action, 2, 1)
84 partner = std::stoi(action[2]);
85 size = parse_double(action[3]);
86 if (action.size() > 4)
87 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
91 class ComputeParser : public ActionArgParser {
93 /* communication partner; if we send, this is the receiver and vice versa */
96 void parse(simgrid::xbt::ReplayAction& action) override
98 CHECK_ACTION_PARAMS(action, 1, 0)
99 flops = parse_double(action[2]);
103 template <class T> class ReplayAction {
105 const std::string name;
111 explicit ReplayAction(std::string name) : name(name), my_proc_id(simgrid::s4u::Actor::self()->getPid()) {}
113 virtual void execute(simgrid::xbt::ReplayAction& action)
115 // Needs to be re-initialized for every action, hence here
116 double start_time = smpi_process()->simulated_elapsed();
119 log_timed_action(action, start_time);
122 virtual void kernel(simgrid::xbt::ReplayAction& action) = 0;
125 class WaitAction : public ReplayAction<ActionArgParser> {
127 WaitAction() : ReplayAction("Wait") {}
128 void kernel(simgrid::xbt::ReplayAction& action) override
130 CHECK_ACTION_PARAMS(action, 0, 0)
133 std::string s = boost::algorithm::join(action, " ");
134 xbt_assert(get_reqq_self()->size(), "action wait not preceded by any irecv or isend: %s", s.c_str());
135 MPI_Request request = get_reqq_self()->back();
136 get_reqq_self()->pop_back();
138 if (request == nullptr) {
139 /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just
144 int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
146 // Must be taken before Request::wait() since the request may be set to
147 // MPI_REQUEST_NULL by Request::wait!
148 int src = request->comm()->group()->rank(request->src());
149 int dst = request->comm()->group()->rank(request->dst());
150 bool is_wait_for_receive = (request->flags() & RECV);
151 // TODO: Here we take the rank while we normally take the process id (look for my_proc_id)
152 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::NoOpTIData("wait"));
154 Request::wait(&request, &status);
156 TRACE_smpi_comm_out(rank);
157 if (is_wait_for_receive)
158 TRACE_smpi_recv(src_traced, dst_traced, 0);
162 class SendAction : public ReplayAction<SendRecvParser> {
164 SendAction() = delete;
165 SendAction(std::string name) : ReplayAction(name) {}
166 void kernel(simgrid::xbt::ReplayAction& action) override
168 int dst_traced = MPI_COMM_WORLD->group()->actor(args.partner)->getPid();
170 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
171 Datatype::encode(args.datatype1)));
172 if (not TRACE_smpi_view_internals())
173 TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, args.size * args.datatype1->size());
175 if (name == "send") {
176 Request::send(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
177 } else if (name == "Isend") {
178 MPI_Request request = Request::isend(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
179 get_reqq_self()->push_back(request);
181 xbt_die("Don't know this action, %s", name.c_str());
184 TRACE_smpi_comm_out(my_proc_id);
188 class RecvAction : public ReplayAction<SendRecvParser> {
190 RecvAction() = delete;
191 explicit RecvAction(std::string name) : ReplayAction(name) {}
192 void kernel(simgrid::xbt::ReplayAction& action) override
194 int src_traced = MPI_COMM_WORLD->group()->actor(args.partner)->getPid();
196 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
197 Datatype::encode(args.datatype1)));
200 // unknown size from the receiver point of view
201 if (args.size <= 0.0) {
202 Request::probe(args.partner, 0, MPI_COMM_WORLD, &status);
203 args.size = status.count;
206 if (name == "recv") {
207 Request::recv(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD, &status);
208 } else if (name == "Irecv") {
209 MPI_Request request = Request::irecv(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
210 get_reqq_self()->push_back(request);
213 TRACE_smpi_comm_out(my_proc_id);
214 // TODO: Check why this was only activated in the "recv" case and not in the "Irecv" case
215 if (name == "recv" && not TRACE_smpi_view_internals()) {
216 TRACE_smpi_recv(src_traced, my_proc_id, 0);
221 class ComputeAction : public ReplayAction<ComputeParser> {
223 ComputeAction() : ReplayAction("compute") {}
224 void kernel(simgrid::xbt::ReplayAction& action) override
226 TRACE_smpi_computing_in(my_proc_id, args.flops);
227 smpi_execute_flops(args.flops);
228 TRACE_smpi_computing_out(my_proc_id);
232 class TestAction : public ReplayAction<ActionArgParser> {
234 TestAction() : ReplayAction("Test") {}
235 void kernel(simgrid::xbt::ReplayAction& action) override
237 MPI_Request request = get_reqq_self()->back();
238 get_reqq_self()->pop_back();
239 // if request is null here, this may mean that a previous test has succeeded
240 // Different times in traced application and replayed version may lead to this
241 // In this case, ignore the extra calls.
242 if (request != nullptr) {
243 TRACE_smpi_testing_in(my_proc_id);
246 int flag = Request::test(&request, &status);
248 XBT_DEBUG("MPI_Test result: %d", flag);
249 /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now
251 get_reqq_self()->push_back(request);
253 TRACE_smpi_testing_out(my_proc_id);
258 } // Replay Namespace
260 static void action_init(simgrid::xbt::ReplayAction& action)
262 XBT_DEBUG("Initialize the counters");
263 CHECK_ACTION_PARAMS(action, 0, 1)
264 if (action.size() > 2)
265 MPI_DEFAULT_TYPE = MPI_DOUBLE; // default MPE datatype
267 MPI_DEFAULT_TYPE = MPI_BYTE; // default TAU datatype
269 /* start a simulated timer */
270 smpi_process()->simulated_start();
271 /*initialize the number of active processes */
272 active_processes = smpi_process_count();
274 set_reqq_self(new std::vector<MPI_Request>);
277 static void action_finalize(simgrid::xbt::ReplayAction& action)
282 static void action_comm_size(simgrid::xbt::ReplayAction& action)
284 log_timed_action (action, smpi_process()->simulated_elapsed());
287 static void action_comm_split(simgrid::xbt::ReplayAction& action)
289 log_timed_action (action, smpi_process()->simulated_elapsed());
292 static void action_comm_dup(simgrid::xbt::ReplayAction& action)
294 log_timed_action (action, smpi_process()->simulated_elapsed());
297 static void action_waitall(simgrid::xbt::ReplayAction& action)
299 CHECK_ACTION_PARAMS(action, 0, 0)
300 double clock = smpi_process()->simulated_elapsed();
301 const unsigned int count_requests = get_reqq_self()->size();
303 if (count_requests>0) {
304 MPI_Status status[count_requests];
306 int my_proc_id_traced = Actor::self()->getPid();
307 TRACE_smpi_comm_in(my_proc_id_traced, __FUNCTION__,
308 new simgrid::instr::Pt2PtTIData("waitAll", -1, count_requests, ""));
309 int recvs_snd[count_requests];
310 int recvs_rcv[count_requests];
311 for (unsigned int i = 0; i < count_requests; i++) {
312 const auto& req = (*get_reqq_self())[i];
313 if (req && (req->flags() & RECV)) {
314 recvs_snd[i] = req->src();
315 recvs_rcv[i] = req->dst();
319 Request::waitall(count_requests, &(*get_reqq_self())[0], status);
321 for (unsigned i = 0; i < count_requests; i++) {
322 if (recvs_snd[i]!=-100)
323 TRACE_smpi_recv(recvs_snd[i], recvs_rcv[i],0);
325 TRACE_smpi_comm_out(my_proc_id_traced);
327 log_timed_action (action, clock);
330 static void action_barrier(simgrid::xbt::ReplayAction& action)
332 double clock = smpi_process()->simulated_elapsed();
333 int my_proc_id = Actor::self()->getPid();
334 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::NoOpTIData("barrier"));
336 Colls::barrier(MPI_COMM_WORLD);
338 TRACE_smpi_comm_out(my_proc_id);
339 log_timed_action (action, clock);
342 static void action_bcast(simgrid::xbt::ReplayAction& action)
344 CHECK_ACTION_PARAMS(action, 1, 2)
345 double size = parse_double(action[2]);
346 double clock = smpi_process()->simulated_elapsed();
347 int root = (action.size() > 3) ? std::stoi(action[3]) : 0;
348 /* Initialize MPI_CURRENT_TYPE in order to decrease the number of the checks */
349 MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 4) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE;
351 int my_proc_id = Actor::self()->getPid();
352 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
353 new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(root)->getPid(), -1.0, size,
354 -1, Datatype::encode(MPI_CURRENT_TYPE), ""));
356 void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
358 Colls::bcast(sendbuf, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
360 TRACE_smpi_comm_out(my_proc_id);
361 log_timed_action (action, clock);
364 static void action_reduce(simgrid::xbt::ReplayAction& action)
366 CHECK_ACTION_PARAMS(action, 2, 2)
367 double comm_size = parse_double(action[2]);
368 double comp_size = parse_double(action[3]);
369 double clock = smpi_process()->simulated_elapsed();
370 int root = (action.size() > 4) ? std::stoi(action[4]) : 0;
372 MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 5) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE;
374 int my_proc_id = Actor::self()->getPid();
375 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
376 new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(root)->getPid(), comp_size,
377 comm_size, -1, Datatype::encode(MPI_CURRENT_TYPE), ""));
379 void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
380 void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
381 Colls::reduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
382 smpi_execute_flops(comp_size);
384 TRACE_smpi_comm_out(my_proc_id);
385 log_timed_action (action, clock);
388 static void action_allReduce(simgrid::xbt::ReplayAction& action)
390 CHECK_ACTION_PARAMS(action, 2, 1)
391 double comm_size = parse_double(action[2]);
392 double comp_size = parse_double(action[3]);
394 MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 4) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE;
396 double clock = smpi_process()->simulated_elapsed();
397 int my_proc_id = Actor::self()->getPid();
398 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::CollTIData("allReduce", -1, comp_size, comm_size, -1,
399 Datatype::encode(MPI_CURRENT_TYPE), ""));
401 void *recvbuf = smpi_get_tmp_recvbuffer(comm_size* MPI_CURRENT_TYPE->size());
402 void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
403 Colls::allreduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
404 smpi_execute_flops(comp_size);
406 TRACE_smpi_comm_out(my_proc_id);
407 log_timed_action (action, clock);
410 static void action_allToAll(simgrid::xbt::ReplayAction& action)
412 CHECK_ACTION_PARAMS(action, 2, 2) //two mandatory (send and recv volumes) and two optional (corresponding datatypes)
413 double clock = smpi_process()->simulated_elapsed();
414 unsigned long comm_size = MPI_COMM_WORLD->size();
415 int send_size = parse_double(action[2]);
416 int recv_size = parse_double(action[3]);
417 MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 5) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE};
418 MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE};
420 void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* MPI_CURRENT_TYPE->size());
421 void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
423 int my_proc_id = Actor::self()->getPid();
424 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
425 new simgrid::instr::CollTIData("allToAll", -1, -1.0, send_size, recv_size,
426 Datatype::encode(MPI_CURRENT_TYPE),
427 Datatype::encode(MPI_CURRENT_TYPE2)));
429 Colls::alltoall(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
431 TRACE_smpi_comm_out(my_proc_id);
432 log_timed_action (action, clock);
435 static void action_gather(simgrid::xbt::ReplayAction& action)
437 /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
440 1) 68 is the sendcounts
441 2) 68 is the recvcounts
442 3) 0 is the root node
443 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
444 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
446 CHECK_ACTION_PARAMS(action, 2, 3)
447 double clock = smpi_process()->simulated_elapsed();
448 unsigned long comm_size = MPI_COMM_WORLD->size();
449 int send_size = parse_double(action[2]);
450 int recv_size = parse_double(action[3]);
451 MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE};
452 MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[6]) : MPI_DEFAULT_TYPE};
454 void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
455 void *recv = nullptr;
456 int root = (action.size() > 4) ? std::stoi(action[4]) : 0;
457 int rank = MPI_COMM_WORLD->rank();
460 recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
462 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
463 Datatype::encode(MPI_CURRENT_TYPE),
464 Datatype::encode(MPI_CURRENT_TYPE2)));
466 Colls::gather(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
468 TRACE_smpi_comm_out(Actor::self()->getPid());
469 log_timed_action (action, clock);
472 static void action_scatter(simgrid::xbt::ReplayAction& action)
474 /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
477 1) 68 is the sendcounts
478 2) 68 is the recvcounts
479 3) 0 is the root node
480 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
481 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
483 CHECK_ACTION_PARAMS(action, 2, 3)
484 double clock = smpi_process()->simulated_elapsed();
485 unsigned long comm_size = MPI_COMM_WORLD->size();
486 int send_size = parse_double(action[2]);
487 int recv_size = parse_double(action[3]);
488 MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE};
489 MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[6]) : MPI_DEFAULT_TYPE};
491 void* send = smpi_get_tmp_sendbuffer(send_size * MPI_CURRENT_TYPE->size());
492 void* recv = nullptr;
493 int root = (action.size() > 4) ? std::stoi(action[4]) : 0;
494 int rank = MPI_COMM_WORLD->rank();
497 recv = smpi_get_tmp_recvbuffer(recv_size * comm_size * MPI_CURRENT_TYPE2->size());
499 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
500 Datatype::encode(MPI_CURRENT_TYPE),
501 Datatype::encode(MPI_CURRENT_TYPE2)));
503 Colls::scatter(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
505 TRACE_smpi_comm_out(Actor::self()->getPid());
506 log_timed_action(action, clock);
509 static void action_gatherv(simgrid::xbt::ReplayAction& action)
511 /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
512 0 gather 68 68 10 10 10 0 0 0
514 1) 68 is the sendcount
515 2) 68 10 10 10 is the recvcounts
516 3) 0 is the root node
517 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
518 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
520 double clock = smpi_process()->simulated_elapsed();
521 unsigned long comm_size = MPI_COMM_WORLD->size();
522 CHECK_ACTION_PARAMS(action, comm_size+1, 2)
523 int send_size = parse_double(action[2]);
524 std::vector<int> disps(comm_size, 0);
525 std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
527 MPI_Datatype MPI_CURRENT_TYPE =
528 (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
529 MPI_Datatype MPI_CURRENT_TYPE2{
530 (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[5 + comm_size]) : MPI_DEFAULT_TYPE};
532 void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
533 void *recv = nullptr;
534 for (unsigned int i = 0; i < comm_size; i++) {
535 (*recvcounts)[i] = std::stoi(action[i + 3]);
537 int recv_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
539 int root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
540 int rank = MPI_COMM_WORLD->rank();
543 recv = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
545 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData(
546 "gatherV", root, send_size, nullptr, -1, recvcounts,
547 Datatype::encode(MPI_CURRENT_TYPE), Datatype::encode(MPI_CURRENT_TYPE2)));
549 Colls::gatherv(send, send_size, MPI_CURRENT_TYPE, recv, recvcounts->data(), disps.data(), MPI_CURRENT_TYPE2, root,
552 TRACE_smpi_comm_out(Actor::self()->getPid());
553 log_timed_action (action, clock);
556 static void action_scatterv(simgrid::xbt::ReplayAction& action)
558 /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
559 0 gather 68 10 10 10 68 0 0 0
561 1) 68 10 10 10 is the sendcounts
562 2) 68 is the recvcount
563 3) 0 is the root node
564 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
565 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
567 double clock = smpi_process()->simulated_elapsed();
568 unsigned long comm_size = MPI_COMM_WORLD->size();
569 CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
570 int recv_size = parse_double(action[2 + comm_size]);
571 std::vector<int> disps(comm_size, 0);
572 std::shared_ptr<std::vector<int>> sendcounts(new std::vector<int>(comm_size));
574 MPI_Datatype MPI_CURRENT_TYPE =
575 (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
576 MPI_Datatype MPI_CURRENT_TYPE2{
577 (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[5 + comm_size]) : MPI_DEFAULT_TYPE};
579 void* send = nullptr;
580 void* recv = smpi_get_tmp_recvbuffer(recv_size * MPI_CURRENT_TYPE->size());
581 for (unsigned int i = 0; i < comm_size; i++) {
582 (*sendcounts)[i] = std::stoi(action[i + 2]);
584 int send_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
586 int root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
587 int rank = MPI_COMM_WORLD->rank();
590 send = smpi_get_tmp_sendbuffer(send_sum * MPI_CURRENT_TYPE2->size());
592 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData("gatherV", root, -1, sendcounts, recv_size,
593 nullptr, Datatype::encode(MPI_CURRENT_TYPE),
594 Datatype::encode(MPI_CURRENT_TYPE2)));
596 Colls::scatterv(send, sendcounts->data(), disps.data(), MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root,
599 TRACE_smpi_comm_out(Actor::self()->getPid());
600 log_timed_action(action, clock);
603 static void action_reducescatter(simgrid::xbt::ReplayAction& action)
605 /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
606 0 reduceScatter 275427 275427 275427 204020 11346849 0
608 1) The first four values after the name of the action declare the recvcounts array
609 2) The value 11346849 is the amount of instructions
610 3) The last value corresponds to the datatype, see simgrid::smpi::Datatype::decode().
612 double clock = smpi_process()->simulated_elapsed();
613 unsigned long comm_size = MPI_COMM_WORLD->size();
614 CHECK_ACTION_PARAMS(action, comm_size+1, 1)
615 int comp_size = parse_double(action[2+comm_size]);
616 int my_proc_id = Actor::self()->getPid();
617 std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>);
618 MPI_Datatype MPI_CURRENT_TYPE =
619 (action.size() > 3 + comm_size) ? simgrid::smpi::Datatype::decode(action[3 + comm_size]) : MPI_DEFAULT_TYPE;
621 for (unsigned int i = 0; i < comm_size; i++) {
622 recvcounts->push_back(std::stoi(action[i + 2]));
624 int size{std::accumulate(recvcounts->begin(), recvcounts->end(), 0)};
626 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
627 new simgrid::instr::VarCollTIData("reduceScatter", -1, 0, nullptr, -1, recvcounts,
628 std::to_string(comp_size), /* ugly hack to print comp_size */
629 Datatype::encode(MPI_CURRENT_TYPE)));
631 void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
632 void *recvbuf = smpi_get_tmp_recvbuffer(size* MPI_CURRENT_TYPE->size());
634 Colls::reduce_scatter(sendbuf, recvbuf, recvcounts->data(), MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
635 smpi_execute_flops(comp_size);
637 TRACE_smpi_comm_out(my_proc_id);
638 log_timed_action (action, clock);
641 static void action_allgather(simgrid::xbt::ReplayAction& action)
643 /* The structure of the allgather action for the rank 0 (total 4 processes) is the following:
644 0 allGather 275427 275427
646 1) 275427 is the sendcount
647 2) 275427 is the recvcount
648 3) No more values mean that the datatype for sent and receive buffer is the default one, see
649 simgrid::smpi::Datatype::decode().
651 double clock = smpi_process()->simulated_elapsed();
653 CHECK_ACTION_PARAMS(action, 2, 2)
654 int sendcount = std::stoi(action[2]);
655 int recvcount = std::stoi(action[3]);
657 MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 5) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE};
658 MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE};
660 void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
661 void *recvbuf = smpi_get_tmp_recvbuffer(recvcount* MPI_CURRENT_TYPE2->size());
663 int my_proc_id = Actor::self()->getPid();
665 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
666 new simgrid::instr::CollTIData("allGather", -1, -1.0, sendcount, recvcount,
667 Datatype::encode(MPI_CURRENT_TYPE),
668 Datatype::encode(MPI_CURRENT_TYPE2)));
670 Colls::allgather(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcount, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
672 TRACE_smpi_comm_out(my_proc_id);
673 log_timed_action (action, clock);
676 static void action_allgatherv(simgrid::xbt::ReplayAction& action)
678 /* The structure of the allgatherv action for the rank 0 (total 4 processes) is the following:
679 0 allGatherV 275427 275427 275427 275427 204020
681 1) 275427 is the sendcount
682 2) The next four elements declare the recvcounts array
683 3) No more values mean that the datatype for sent and receive buffer is the default one, see
684 simgrid::smpi::Datatype::decode().
686 double clock = smpi_process()->simulated_elapsed();
688 unsigned long comm_size = MPI_COMM_WORLD->size();
689 CHECK_ACTION_PARAMS(action, comm_size+1, 2)
690 int sendcount = std::stoi(action[2]);
691 std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
692 std::vector<int> disps(comm_size, 0);
694 int datatype_index = 0, disp_index = 0;
695 if (action.size() > 3 + 2 * comm_size) { /* datatype + disp are specified */
696 datatype_index = 3 + comm_size;
697 disp_index = datatype_index + 1;
698 } else if (action.size() > 3 + 2 * comm_size) { /* disps specified; datatype is not specified; use the default one */
700 disp_index = 3 + comm_size;
701 } else if (action.size() > 3 + comm_size) { /* only datatype, no disp specified */
702 datatype_index = 3 + comm_size;
705 if (disp_index != 0) {
706 for (unsigned int i = 0; i < comm_size; i++)
707 disps[i] = std::stoi(action[disp_index + i]);
710 MPI_Datatype MPI_CURRENT_TYPE{(datatype_index > 0) ? simgrid::smpi::Datatype::decode(action[datatype_index])
712 MPI_Datatype MPI_CURRENT_TYPE2{(datatype_index > 0) ? simgrid::smpi::Datatype::decode(action[datatype_index])
715 void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
717 for (unsigned int i = 0; i < comm_size; i++) {
718 (*recvcounts)[i] = std::stoi(action[i + 3]);
720 int recv_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
721 void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
723 int my_proc_id = Actor::self()->getPid();
725 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
726 new simgrid::instr::VarCollTIData("allGatherV", -1, sendcount, nullptr, -1, recvcounts,
727 Datatype::encode(MPI_CURRENT_TYPE),
728 Datatype::encode(MPI_CURRENT_TYPE2)));
730 Colls::allgatherv(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts->data(), disps.data(), MPI_CURRENT_TYPE2,
733 TRACE_smpi_comm_out(my_proc_id);
734 log_timed_action (action, clock);
737 static void action_allToAllv(simgrid::xbt::ReplayAction& action)
739 /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
740 0 allToAllV 100 1 7 10 12 100 1 70 10 5
742 1) 100 is the size of the send buffer *sizeof(int),
743 2) 1 7 10 12 is the sendcounts array
744 3) 100*sizeof(int) is the size of the receiver buffer
745 4) 1 70 10 5 is the recvcounts array
747 double clock = smpi_process()->simulated_elapsed();
749 unsigned long comm_size = MPI_COMM_WORLD->size();
750 CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
751 std::shared_ptr<std::vector<int>> sendcounts(new std::vector<int>(comm_size));
752 std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
753 std::vector<int> senddisps(comm_size, 0);
754 std::vector<int> recvdisps(comm_size, 0);
756 MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 5 + 2 * comm_size)
757 ? simgrid::smpi::Datatype::decode(action[4 + 2 * comm_size])
759 MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5 + 2 * comm_size)
760 ? simgrid::smpi::Datatype::decode(action[5 + 2 * comm_size])
763 int send_buf_size=parse_double(action[2]);
764 int recv_buf_size=parse_double(action[3+comm_size]);
765 int my_proc_id = Actor::self()->getPid();
766 void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* MPI_CURRENT_TYPE->size());
767 void *recvbuf = smpi_get_tmp_recvbuffer(recv_buf_size* MPI_CURRENT_TYPE2->size());
769 for (unsigned int i = 0; i < comm_size; i++) {
770 (*sendcounts)[i] = std::stoi(action[3 + i]);
771 (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
773 int send_size = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
774 int recv_size = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
776 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
777 new simgrid::instr::VarCollTIData("allToAllV", -1, send_size, sendcounts, recv_size, recvcounts,
778 Datatype::encode(MPI_CURRENT_TYPE),
779 Datatype::encode(MPI_CURRENT_TYPE2)));
781 Colls::alltoallv(sendbuf, sendcounts->data(), senddisps.data(), MPI_CURRENT_TYPE, recvbuf, recvcounts->data(),
782 recvdisps.data(), MPI_CURRENT_TYPE, MPI_COMM_WORLD);
784 TRACE_smpi_comm_out(my_proc_id);
785 log_timed_action (action, clock);
788 }} // namespace simgrid::smpi
790 /** @brief Only initialize the replay, don't do it for real */
791 void smpi_replay_init(int* argc, char*** argv)
793 simgrid::smpi::Process::init(argc, argv);
794 smpi_process()->mark_as_initialized();
795 smpi_process()->set_replaying(true);
797 int my_proc_id = Actor::self()->getPid();
798 TRACE_smpi_init(my_proc_id);
799 TRACE_smpi_computing_init(my_proc_id);
800 TRACE_smpi_comm_in(my_proc_id, "smpi_replay_run_init", new simgrid::instr::NoOpTIData("init"));
801 TRACE_smpi_comm_out(my_proc_id);
802 xbt_replay_action_register("init", simgrid::smpi::action_init);
803 xbt_replay_action_register("finalize", simgrid::smpi::action_finalize);
804 xbt_replay_action_register("comm_size", simgrid::smpi::action_comm_size);
805 xbt_replay_action_register("comm_split", simgrid::smpi::action_comm_split);
806 xbt_replay_action_register("comm_dup", simgrid::smpi::action_comm_dup);
808 xbt_replay_action_register("send", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::SendAction("send").execute(action); });
809 xbt_replay_action_register("Isend", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::SendAction("Isend").execute(action); });
810 xbt_replay_action_register("recv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::RecvAction("recv").execute(action); });
811 xbt_replay_action_register("Irecv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::RecvAction("Irecv").execute(action); });
812 xbt_replay_action_register("test", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::TestAction().execute(action); });
813 xbt_replay_action_register("wait", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::WaitAction().execute(action); });
814 xbt_replay_action_register("waitAll", simgrid::smpi::action_waitall);
815 xbt_replay_action_register("barrier", simgrid::smpi::action_barrier);
816 xbt_replay_action_register("bcast", simgrid::smpi::action_bcast);
817 xbt_replay_action_register("reduce", simgrid::smpi::action_reduce);
818 xbt_replay_action_register("allReduce", simgrid::smpi::action_allReduce);
819 xbt_replay_action_register("allToAll", simgrid::smpi::action_allToAll);
820 xbt_replay_action_register("allToAllV", simgrid::smpi::action_allToAllv);
821 xbt_replay_action_register("gather", simgrid::smpi::action_gather);
822 xbt_replay_action_register("scatter", simgrid::smpi::action_scatter);
823 xbt_replay_action_register("gatherV", simgrid::smpi::action_gatherv);
824 xbt_replay_action_register("scatterV", simgrid::smpi::action_scatterv);
825 xbt_replay_action_register("allGather", simgrid::smpi::action_allgather);
826 xbt_replay_action_register("allGatherV", simgrid::smpi::action_allgatherv);
827 xbt_replay_action_register("reduceScatter", simgrid::smpi::action_reducescatter);
828 xbt_replay_action_register("compute", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::ComputeAction().execute(action); });
830 //if we have a delayed start, sleep here.
832 double value = xbt_str_parse_double((*argv)[2], "%s is not a double");
833 XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
834 smpi_execute_flops(value);
836 //UGLY: force a context switch to be sure that all MSG_processes begin initialization
837 XBT_DEBUG("Force context switch by smpi_execute_flops - Sleeping for 0.0 flops ");
838 smpi_execute_flops(0.0);
842 /** @brief actually run the replay after initialization */
843 void smpi_replay_main(int* argc, char*** argv)
845 simgrid::xbt::replay_runner(*argc, *argv);
847 /* and now, finalize everything */
848 /* One active process will stop. Decrease the counter*/
849 XBT_DEBUG("There are %zu elements in reqq[*]", get_reqq_self()->size());
850 if (not get_reqq_self()->empty()) {
851 unsigned int count_requests=get_reqq_self()->size();
852 MPI_Request requests[count_requests];
853 MPI_Status status[count_requests];
856 for (auto const& req : *get_reqq_self()) {
860 simgrid::smpi::Request::waitall(count_requests, requests, status);
862 delete get_reqq_self();
865 if(active_processes==0){
866 /* Last process alive speaking: end the simulated timer */
867 XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
868 smpi_free_replay_tmp_buffers();
871 TRACE_smpi_comm_in(Actor::self()->getPid(), "smpi_replay_run_finalize", new simgrid::instr::NoOpTIData("finalize"));
873 smpi_process()->finalize();
875 TRACE_smpi_comm_out(Actor::self()->getPid());
876 TRACE_smpi_finalize(Actor::self()->getPid());
879 /** @brief chain a replay initialization and a replay start */
880 void smpi_replay_run(int* argc, char*** argv)
882 smpi_replay_init(argc, argv);
883 smpi_replay_main(argc, argv);