1 /* Copyright (c) 2009-2017. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
7 #include "smpi_coll.hpp"
8 #include "smpi_comm.hpp"
9 #include "smpi_datatype.hpp"
10 #include "smpi_group.hpp"
11 #include "smpi_process.hpp"
12 #include "smpi_request.hpp"
13 #include "xbt/replay.hpp"
15 #include <boost/algorithm/string/join.hpp>
18 #include <unordered_map>
21 using simgrid::s4u::Actor;
23 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
25 static int communicator_size = 0;
26 static int active_processes = 0;
27 static std::unordered_map<int, std::vector<MPI_Request>*> reqq;
29 static MPI_Datatype MPI_DEFAULT_TYPE;
/* Validate an action line: it must carry at least `mandatory` arguments after the leading
 * <process_id> <action> pair (and may carry up to `optional` more). Aborts the replay with
 * an explanatory message otherwise. */
#define CHECK_ACTION_PARAMS(action, mandatory, optional)                                                               \
  {                                                                                                                    \
    if (action.size() < static_cast<unsigned long>(mandatory + 2))                                                     \
      THROWF(arg_error, 0, "%s replay failed.\n"                                                                       \
                           "%lu items were given on the line. First two should be process_id and action. "             \
                           "This action needs after them %lu mandatory arguments, and accepts %lu optional ones. \n"   \
                           "Please contact the Simgrid team if support is needed",                                      \
             __FUNCTION__, action.size(), static_cast<unsigned long>(mandatory),                                       \
             static_cast<unsigned long>(optional));                                                                    \
  }
42 static void log_timed_action(simgrid::xbt::ReplayAction& action, double clock)
44 if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
45 std::string s = boost::algorithm::join(action, " ");
46 XBT_VERB("%s %f", s.c_str(), smpi_process()->simulated_elapsed() - clock);
50 static std::vector<MPI_Request>* get_reqq_self()
52 return reqq.at(Actor::self()->getPid());
55 static void set_reqq_self(std::vector<MPI_Request> *mpi_request)
57 reqq.insert({Actor::self()->getPid(), mpi_request});
61 static double parse_double(std::string string)
63 return xbt_str_parse_double(string.c_str(), "%s is not a double");
70 class ActionArgParser {
72 virtual void parse(simgrid::xbt::ReplayAction& action){};
75 class SendRecvParser : public ActionArgParser {
77 /* communication partner; if we send, this is the receiver and vice versa */
80 MPI_Datatype datatype1 = MPI_DEFAULT_TYPE;
82 void parse(simgrid::xbt::ReplayAction& action) override
84 CHECK_ACTION_PARAMS(action, 2, 1)
85 partner = std::stoi(action[2]);
86 size = parse_double(action[3]);
87 if (action.size() > 4)
88 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
92 class ComputeParser : public ActionArgParser {
94 /* communication partner; if we send, this is the receiver and vice versa */
97 void parse(simgrid::xbt::ReplayAction& action) override
99 CHECK_ACTION_PARAMS(action, 1, 0)
100 flops = parse_double(action[2]);
104 template <class T> class ReplayAction {
106 const std::string name;
112 explicit ReplayAction(std::string name) : name(name), my_proc_id(simgrid::s4u::Actor::self()->getPid())
116 virtual void execute(simgrid::xbt::ReplayAction& action)
118 // Needs to be re-initialized for every action, hence here
119 double start_time = smpi_process()->simulated_elapsed();
122 log_timed_action(action, start_time);
125 virtual void kernel(simgrid::xbt::ReplayAction& action) = 0;
128 class WaitAction : public ReplayAction<ActionArgParser> {
130 WaitAction() : ReplayAction("Wait") {}
131 void kernel(simgrid::xbt::ReplayAction& action) override
133 CHECK_ACTION_PARAMS(action, 0, 0)
136 std::string s = boost::algorithm::join(action, " ");
137 xbt_assert(get_reqq_self()->size(), "action wait not preceded by any irecv or isend: %s", s.c_str());
138 MPI_Request request = get_reqq_self()->back();
139 get_reqq_self()->pop_back();
141 if (request == nullptr) {
142 /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just
147 int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
149 MPI_Group group = request->comm()->group();
150 int src_traced = group->rank(request->src());
151 int dst_traced = group->rank(request->dst());
152 bool is_wait_for_receive = (request->flags() & RECV);
153 // TODO: Here we take the rank while we normally take the process id (look for my_proc_id)
154 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::NoOpTIData("wait"));
156 Request::wait(&request, &status);
158 TRACE_smpi_comm_out(rank);
159 if (is_wait_for_receive)
160 TRACE_smpi_recv(src_traced, dst_traced, 0);
164 class SendAction : public ReplayAction<SendRecvParser> {
166 SendAction() = delete;
167 SendAction(std::string name) : ReplayAction(name) {}
168 void kernel(simgrid::xbt::ReplayAction& action) override
170 int dst_traced = MPI_COMM_WORLD->group()->actor(args.partner)->getPid();
172 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
173 Datatype::encode(args.datatype1)));
174 if (not TRACE_smpi_view_internals())
175 TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, args.size * args.datatype1->size());
177 if (name == "send") {
178 Request::send(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
179 } else if (name == "Isend") {
180 MPI_Request request = Request::isend(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
181 get_reqq_self()->push_back(request);
183 xbt_die("Don't know this action, %s", name.c_str());
186 TRACE_smpi_comm_out(my_proc_id);
190 class RecvAction : public ReplayAction<SendRecvParser> {
192 RecvAction() = delete;
193 explicit RecvAction(std::string name) : ReplayAction(name) {}
194 void kernel(simgrid::xbt::ReplayAction& action) override
196 int src_traced = MPI_COMM_WORLD->group()->actor(args.partner)->getPid();
198 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
199 Datatype::encode(args.datatype1)));
202 // unknown size from the receiver point of view
203 if (args.size <= 0.0) {
204 Request::probe(args.partner, 0, MPI_COMM_WORLD, &status);
205 args.size = status.count;
208 if (name == "recv") {
209 Request::recv(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD, &status);
210 } else if (name == "Irecv") {
211 MPI_Request request = Request::irecv(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
212 get_reqq_self()->push_back(request);
215 TRACE_smpi_comm_out(my_proc_id);
216 // TODO: Check why this was only activated in the "recv" case and not in the "Irecv" case
217 if (name == "recv" && not TRACE_smpi_view_internals()) {
218 TRACE_smpi_recv(src_traced, my_proc_id, 0);
223 class ComputeAction : public ReplayAction<ComputeParser> {
225 ComputeAction() : ReplayAction("compute") {}
226 void kernel(simgrid::xbt::ReplayAction& action) override
228 TRACE_smpi_computing_in(my_proc_id, args.flops);
229 smpi_execute_flops(args.flops);
230 TRACE_smpi_computing_out(my_proc_id);
234 } // Replay Namespace
236 static void action_init(simgrid::xbt::ReplayAction& action)
238 XBT_DEBUG("Initialize the counters");
239 CHECK_ACTION_PARAMS(action, 0, 1)
240 if (action.size() > 2)
241 MPI_DEFAULT_TYPE = MPI_DOUBLE; // default MPE datatype
243 MPI_DEFAULT_TYPE = MPI_BYTE; // default TAU datatype
245 /* start a simulated timer */
246 smpi_process()->simulated_start();
247 /*initialize the number of active processes */
248 active_processes = smpi_process_count();
250 set_reqq_self(new std::vector<MPI_Request>);
253 static void action_finalize(simgrid::xbt::ReplayAction& action)
258 static void action_comm_size(simgrid::xbt::ReplayAction& action)
260 communicator_size = parse_double(action[2]);
261 log_timed_action (action, smpi_process()->simulated_elapsed());
264 static void action_comm_split(simgrid::xbt::ReplayAction& action)
266 log_timed_action (action, smpi_process()->simulated_elapsed());
269 static void action_comm_dup(simgrid::xbt::ReplayAction& action)
271 log_timed_action (action, smpi_process()->simulated_elapsed());
274 static void action_compute(simgrid::xbt::ReplayAction& action)
276 Replay::ComputeAction().execute(action);
279 static void action_test(simgrid::xbt::ReplayAction& action)
281 CHECK_ACTION_PARAMS(action, 0, 0)
282 double clock = smpi_process()->simulated_elapsed();
285 MPI_Request request = get_reqq_self()->back();
286 get_reqq_self()->pop_back();
287 //if request is null here, this may mean that a previous test has succeeded
288 //Different times in traced application and replayed version may lead to this
289 //In this case, ignore the extra calls.
290 if(request!=nullptr){
291 int my_proc_id = Actor::self()->getPid();
292 TRACE_smpi_testing_in(my_proc_id);
294 int flag = Request::test(&request, &status);
296 XBT_DEBUG("MPI_Test result: %d", flag);
297 /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now nullptr.*/
298 get_reqq_self()->push_back(request);
300 TRACE_smpi_testing_out(my_proc_id);
302 log_timed_action (action, clock);
305 static void action_waitall(simgrid::xbt::ReplayAction& action)
307 CHECK_ACTION_PARAMS(action, 0, 0)
308 double clock = smpi_process()->simulated_elapsed();
309 const unsigned int count_requests = get_reqq_self()->size();
311 if (count_requests>0) {
312 MPI_Status status[count_requests];
314 int my_proc_id_traced = Actor::self()->getPid();
315 TRACE_smpi_comm_in(my_proc_id_traced, __FUNCTION__,
316 new simgrid::instr::Pt2PtTIData("waitAll", -1, count_requests, ""));
317 int recvs_snd[count_requests];
318 int recvs_rcv[count_requests];
319 for (unsigned int i = 0; i < count_requests; i++) {
320 const auto& req = (*get_reqq_self())[i];
321 if (req && (req->flags() & RECV)) {
322 recvs_snd[i] = req->src();
323 recvs_rcv[i] = req->dst();
327 Request::waitall(count_requests, &(*get_reqq_self())[0], status);
329 for (unsigned i = 0; i < count_requests; i++) {
330 if (recvs_snd[i]!=-100)
331 TRACE_smpi_recv(recvs_snd[i], recvs_rcv[i],0);
333 TRACE_smpi_comm_out(my_proc_id_traced);
335 log_timed_action (action, clock);
338 static void action_barrier(simgrid::xbt::ReplayAction& action)
340 double clock = smpi_process()->simulated_elapsed();
341 int my_proc_id = Actor::self()->getPid();
342 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::NoOpTIData("barrier"));
344 Colls::barrier(MPI_COMM_WORLD);
346 TRACE_smpi_comm_out(my_proc_id);
347 log_timed_action (action, clock);
350 static void action_bcast(simgrid::xbt::ReplayAction& action)
352 CHECK_ACTION_PARAMS(action, 1, 2)
353 double size = parse_double(action[2]);
354 double clock = smpi_process()->simulated_elapsed();
355 int root = (action.size() > 3) ? std::stoi(action[3]) : 0;
356 /* Initialize MPI_CURRENT_TYPE in order to decrease the number of the checks */
357 MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 4) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE;
359 int my_proc_id = Actor::self()->getPid();
360 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
361 new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(root)->getPid(), -1.0, size,
362 -1, Datatype::encode(MPI_CURRENT_TYPE), ""));
364 void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
366 Colls::bcast(sendbuf, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
368 TRACE_smpi_comm_out(my_proc_id);
369 log_timed_action (action, clock);
372 static void action_reduce(simgrid::xbt::ReplayAction& action)
374 CHECK_ACTION_PARAMS(action, 2, 2)
375 double comm_size = parse_double(action[2]);
376 double comp_size = parse_double(action[3]);
377 double clock = smpi_process()->simulated_elapsed();
378 int root = (action.size() > 4) ? std::stoi(action[4]) : 0;
380 MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 5) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE;
382 int my_proc_id = Actor::self()->getPid();
383 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
384 new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(root)->getPid(), comp_size,
385 comm_size, -1, Datatype::encode(MPI_CURRENT_TYPE), ""));
387 void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
388 void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
389 Colls::reduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
390 smpi_execute_flops(comp_size);
392 TRACE_smpi_comm_out(my_proc_id);
393 log_timed_action (action, clock);
396 static void action_allReduce(simgrid::xbt::ReplayAction& action)
398 CHECK_ACTION_PARAMS(action, 2, 1)
399 double comm_size = parse_double(action[2]);
400 double comp_size = parse_double(action[3]);
402 MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 4) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE;
404 double clock = smpi_process()->simulated_elapsed();
405 int my_proc_id = Actor::self()->getPid();
406 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::CollTIData("allReduce", -1, comp_size, comm_size, -1,
407 Datatype::encode(MPI_CURRENT_TYPE), ""));
409 void *recvbuf = smpi_get_tmp_recvbuffer(comm_size* MPI_CURRENT_TYPE->size());
410 void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
411 Colls::allreduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
412 smpi_execute_flops(comp_size);
414 TRACE_smpi_comm_out(my_proc_id);
415 log_timed_action (action, clock);
418 static void action_allToAll(simgrid::xbt::ReplayAction& action)
420 CHECK_ACTION_PARAMS(action, 2, 2) //two mandatory (send and recv volumes) and two optional (corresponding datatypes)
421 double clock = smpi_process()->simulated_elapsed();
422 unsigned long comm_size = MPI_COMM_WORLD->size();
423 int send_size = parse_double(action[2]);
424 int recv_size = parse_double(action[3]);
425 MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 5) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE};
426 MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE};
428 void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* MPI_CURRENT_TYPE->size());
429 void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
431 int my_proc_id = Actor::self()->getPid();
432 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
433 new simgrid::instr::CollTIData("allToAll", -1, -1.0, send_size, recv_size,
434 Datatype::encode(MPI_CURRENT_TYPE),
435 Datatype::encode(MPI_CURRENT_TYPE2)));
437 Colls::alltoall(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
439 TRACE_smpi_comm_out(my_proc_id);
440 log_timed_action (action, clock);
443 static void action_gather(simgrid::xbt::ReplayAction& action)
445 /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
448 1) 68 is the sendcounts
449 2) 68 is the recvcounts
450 3) 0 is the root node
451 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
452 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
454 CHECK_ACTION_PARAMS(action, 2, 3)
455 double clock = smpi_process()->simulated_elapsed();
456 unsigned long comm_size = MPI_COMM_WORLD->size();
457 int send_size = parse_double(action[2]);
458 int recv_size = parse_double(action[3]);
459 MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE};
460 MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[6]) : MPI_DEFAULT_TYPE};
462 void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
463 void *recv = nullptr;
464 int root = (action.size() > 4) ? std::stoi(action[4]) : 0;
465 int rank = MPI_COMM_WORLD->rank();
468 recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
470 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
471 Datatype::encode(MPI_CURRENT_TYPE),
472 Datatype::encode(MPI_CURRENT_TYPE2)));
474 Colls::gather(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
476 TRACE_smpi_comm_out(Actor::self()->getPid());
477 log_timed_action (action, clock);
480 static void action_scatter(simgrid::xbt::ReplayAction& action)
482 /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
485 1) 68 is the sendcounts
486 2) 68 is the recvcounts
487 3) 0 is the root node
488 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
489 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
491 CHECK_ACTION_PARAMS(action, 2, 3)
492 double clock = smpi_process()->simulated_elapsed();
493 unsigned long comm_size = MPI_COMM_WORLD->size();
494 int send_size = parse_double(action[2]);
495 int recv_size = parse_double(action[3]);
496 MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE};
497 MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[6]) : MPI_DEFAULT_TYPE};
499 void* send = smpi_get_tmp_sendbuffer(send_size * MPI_CURRENT_TYPE->size());
500 void* recv = nullptr;
501 int root = (action.size() > 4) ? std::stoi(action[4]) : 0;
502 int rank = MPI_COMM_WORLD->rank();
505 recv = smpi_get_tmp_recvbuffer(recv_size * comm_size * MPI_CURRENT_TYPE2->size());
507 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
508 Datatype::encode(MPI_CURRENT_TYPE),
509 Datatype::encode(MPI_CURRENT_TYPE2)));
511 Colls::scatter(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
513 TRACE_smpi_comm_out(Actor::self()->getPid());
514 log_timed_action(action, clock);
517 static void action_gatherv(simgrid::xbt::ReplayAction& action)
519 /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
520 0 gather 68 68 10 10 10 0 0 0
522 1) 68 is the sendcount
523 2) 68 10 10 10 is the recvcounts
524 3) 0 is the root node
525 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
526 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
528 double clock = smpi_process()->simulated_elapsed();
529 unsigned long comm_size = MPI_COMM_WORLD->size();
530 CHECK_ACTION_PARAMS(action, comm_size+1, 2)
531 int send_size = parse_double(action[2]);
532 std::vector<int> disps(comm_size, 0);
533 std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
535 MPI_Datatype MPI_CURRENT_TYPE =
536 (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
537 MPI_Datatype MPI_CURRENT_TYPE2{
538 (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[5 + comm_size]) : MPI_DEFAULT_TYPE};
540 void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
541 void *recv = nullptr;
542 for (unsigned int i = 0; i < comm_size; i++) {
543 (*recvcounts)[i] = std::stoi(action[i + 3]);
545 int recv_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
547 int root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
548 int rank = MPI_COMM_WORLD->rank();
551 recv = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
553 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData(
554 "gatherV", root, send_size, nullptr, -1, recvcounts,
555 Datatype::encode(MPI_CURRENT_TYPE), Datatype::encode(MPI_CURRENT_TYPE2)));
557 Colls::gatherv(send, send_size, MPI_CURRENT_TYPE, recv, recvcounts->data(), disps.data(), MPI_CURRENT_TYPE2, root,
560 TRACE_smpi_comm_out(Actor::self()->getPid());
561 log_timed_action (action, clock);
564 static void action_scatterv(simgrid::xbt::ReplayAction& action)
566 /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
567 0 gather 68 10 10 10 68 0 0 0
569 1) 68 10 10 10 is the sendcounts
570 2) 68 is the recvcount
571 3) 0 is the root node
572 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
573 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
575 double clock = smpi_process()->simulated_elapsed();
576 unsigned long comm_size = MPI_COMM_WORLD->size();
577 CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
578 int recv_size = parse_double(action[2 + comm_size]);
579 std::vector<int> disps(comm_size, 0);
580 std::shared_ptr<std::vector<int>> sendcounts(new std::vector<int>(comm_size));
582 MPI_Datatype MPI_CURRENT_TYPE =
583 (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
584 MPI_Datatype MPI_CURRENT_TYPE2{
585 (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[5 + comm_size]) : MPI_DEFAULT_TYPE};
587 void* send = nullptr;
588 void* recv = smpi_get_tmp_recvbuffer(recv_size * MPI_CURRENT_TYPE->size());
589 for (unsigned int i = 0; i < comm_size; i++) {
590 (*sendcounts)[i] = std::stoi(action[i + 2]);
592 int send_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
594 int root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
595 int rank = MPI_COMM_WORLD->rank();
598 send = smpi_get_tmp_sendbuffer(send_sum * MPI_CURRENT_TYPE2->size());
600 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData("gatherV", root, -1, sendcounts, recv_size,
601 nullptr, Datatype::encode(MPI_CURRENT_TYPE),
602 Datatype::encode(MPI_CURRENT_TYPE2)));
604 Colls::scatterv(send, sendcounts->data(), disps.data(), MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root,
607 TRACE_smpi_comm_out(Actor::self()->getPid());
608 log_timed_action(action, clock);
611 static void action_reducescatter(simgrid::xbt::ReplayAction& action)
613 /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
614 0 reduceScatter 275427 275427 275427 204020 11346849 0
616 1) The first four values after the name of the action declare the recvcounts array
617 2) The value 11346849 is the amount of instructions
618 3) The last value corresponds to the datatype, see simgrid::smpi::Datatype::decode().
620 double clock = smpi_process()->simulated_elapsed();
621 unsigned long comm_size = MPI_COMM_WORLD->size();
622 CHECK_ACTION_PARAMS(action, comm_size+1, 1)
623 int comp_size = parse_double(action[2+comm_size]);
624 int my_proc_id = Actor::self()->getPid();
625 std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>);
626 MPI_Datatype MPI_CURRENT_TYPE =
627 (action.size() > 3 + comm_size) ? simgrid::smpi::Datatype::decode(action[3 + comm_size]) : MPI_DEFAULT_TYPE;
629 for (unsigned int i = 0; i < comm_size; i++) {
630 recvcounts->push_back(std::stoi(action[i + 2]));
632 int size{std::accumulate(recvcounts->begin(), recvcounts->end(), 0)};
634 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
635 new simgrid::instr::VarCollTIData("reduceScatter", -1, 0, nullptr, -1, recvcounts,
636 std::to_string(comp_size), /* ugly hack to print comp_size */
637 Datatype::encode(MPI_CURRENT_TYPE)));
639 void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
640 void *recvbuf = smpi_get_tmp_recvbuffer(size* MPI_CURRENT_TYPE->size());
642 Colls::reduce_scatter(sendbuf, recvbuf, recvcounts->data(), MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
643 smpi_execute_flops(comp_size);
645 TRACE_smpi_comm_out(my_proc_id);
646 log_timed_action (action, clock);
649 static void action_allgather(simgrid::xbt::ReplayAction& action)
651 /* The structure of the allgather action for the rank 0 (total 4 processes) is the following:
652 0 allGather 275427 275427
654 1) 275427 is the sendcount
655 2) 275427 is the recvcount
656 3) No more values mean that the datatype for sent and receive buffer is the default one, see
657 simgrid::smpi::Datatype::decode().
659 double clock = smpi_process()->simulated_elapsed();
661 CHECK_ACTION_PARAMS(action, 2, 2)
662 int sendcount = std::stoi(action[2]);
663 int recvcount = std::stoi(action[3]);
665 MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 5) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE};
666 MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE};
668 void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
669 void *recvbuf = smpi_get_tmp_recvbuffer(recvcount* MPI_CURRENT_TYPE2->size());
671 int my_proc_id = Actor::self()->getPid();
673 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
674 new simgrid::instr::CollTIData("allGather", -1, -1.0, sendcount, recvcount,
675 Datatype::encode(MPI_CURRENT_TYPE),
676 Datatype::encode(MPI_CURRENT_TYPE2)));
678 Colls::allgather(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcount, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
680 TRACE_smpi_comm_out(my_proc_id);
681 log_timed_action (action, clock);
684 static void action_allgatherv(simgrid::xbt::ReplayAction& action)
686 /* The structure of the allgatherv action for the rank 0 (total 4 processes) is the following:
687 0 allGatherV 275427 275427 275427 275427 204020
689 1) 275427 is the sendcount
690 2) The next four elements declare the recvcounts array
691 3) No more values mean that the datatype for sent and receive buffer is the default one, see
692 simgrid::smpi::Datatype::decode().
694 double clock = smpi_process()->simulated_elapsed();
696 unsigned long comm_size = MPI_COMM_WORLD->size();
697 CHECK_ACTION_PARAMS(action, comm_size+1, 2)
698 int sendcount = std::stoi(action[2]);
699 std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
700 std::vector<int> disps(comm_size, 0);
702 int datatype_index = 0, disp_index = 0;
703 if (action.size() > 3 + 2 * comm_size) { /* datatype + disp are specified */
704 datatype_index = 3 + comm_size;
705 disp_index = datatype_index + 1;
706 } else if (action.size() > 3 + 2 * comm_size) { /* disps specified; datatype is not specified; use the default one */
708 disp_index = 3 + comm_size;
709 } else if (action.size() > 3 + comm_size) { /* only datatype, no disp specified */
710 datatype_index = 3 + comm_size;
713 if (disp_index != 0) {
714 for (unsigned int i = 0; i < comm_size; i++)
715 disps[i] = std::stoi(action[disp_index + i]);
718 MPI_Datatype MPI_CURRENT_TYPE{(datatype_index > 0) ? simgrid::smpi::Datatype::decode(action[datatype_index])
720 MPI_Datatype MPI_CURRENT_TYPE2{(datatype_index > 0) ? simgrid::smpi::Datatype::decode(action[datatype_index])
723 void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
725 for (unsigned int i = 0; i < comm_size; i++) {
726 (*recvcounts)[i] = std::stoi(action[i + 3]);
728 int recv_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
729 void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
731 int my_proc_id = Actor::self()->getPid();
733 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
734 new simgrid::instr::VarCollTIData("allGatherV", -1, sendcount, nullptr, -1, recvcounts,
735 Datatype::encode(MPI_CURRENT_TYPE),
736 Datatype::encode(MPI_CURRENT_TYPE2)));
738 Colls::allgatherv(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts->data(), disps.data(), MPI_CURRENT_TYPE2,
741 TRACE_smpi_comm_out(my_proc_id);
742 log_timed_action (action, clock);
745 static void action_allToAllv(simgrid::xbt::ReplayAction& action)
747 /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
748 0 allToAllV 100 1 7 10 12 100 1 70 10 5
750 1) 100 is the size of the send buffer *sizeof(int),
751 2) 1 7 10 12 is the sendcounts array
752 3) 100*sizeof(int) is the size of the receiver buffer
753 4) 1 70 10 5 is the recvcounts array
755 double clock = smpi_process()->simulated_elapsed();
757 unsigned long comm_size = MPI_COMM_WORLD->size();
758 CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
759 std::shared_ptr<std::vector<int>> sendcounts(new std::vector<int>(comm_size));
760 std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
761 std::vector<int> senddisps(comm_size, 0);
762 std::vector<int> recvdisps(comm_size, 0);
764 MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 5 + 2 * comm_size)
765 ? simgrid::smpi::Datatype::decode(action[4 + 2 * comm_size])
767 MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5 + 2 * comm_size)
768 ? simgrid::smpi::Datatype::decode(action[5 + 2 * comm_size])
771 int send_buf_size=parse_double(action[2]);
772 int recv_buf_size=parse_double(action[3+comm_size]);
773 int my_proc_id = Actor::self()->getPid();
774 void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* MPI_CURRENT_TYPE->size());
775 void *recvbuf = smpi_get_tmp_recvbuffer(recv_buf_size* MPI_CURRENT_TYPE2->size());
777 for (unsigned int i = 0; i < comm_size; i++) {
778 (*sendcounts)[i] = std::stoi(action[3 + i]);
779 (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
781 int send_size = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
782 int recv_size = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
784 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
785 new simgrid::instr::VarCollTIData("allToAllV", -1, send_size, sendcounts, recv_size, recvcounts,
786 Datatype::encode(MPI_CURRENT_TYPE),
787 Datatype::encode(MPI_CURRENT_TYPE2)));
789 Colls::alltoallv(sendbuf, sendcounts->data(), senddisps.data(), MPI_CURRENT_TYPE, recvbuf, recvcounts->data(),
790 recvdisps.data(), MPI_CURRENT_TYPE, MPI_COMM_WORLD);
792 TRACE_smpi_comm_out(my_proc_id);
793 log_timed_action (action, clock);
796 }} // namespace simgrid::smpi
798 /** @brief Only initialize the replay, don't do it for real */
799 void smpi_replay_init(int* argc, char*** argv)
801 simgrid::smpi::Process::init(argc, argv);
802 smpi_process()->mark_as_initialized();
803 smpi_process()->set_replaying(true);
805 int my_proc_id = Actor::self()->getPid();
806 TRACE_smpi_init(my_proc_id);
807 TRACE_smpi_computing_init(my_proc_id);
808 TRACE_smpi_comm_in(my_proc_id, "smpi_replay_run_init", new simgrid::instr::NoOpTIData("init"));
809 TRACE_smpi_comm_out(my_proc_id);
810 xbt_replay_action_register("init", simgrid::smpi::action_init);
811 xbt_replay_action_register("finalize", simgrid::smpi::action_finalize);
812 xbt_replay_action_register("comm_size", simgrid::smpi::action_comm_size);
813 xbt_replay_action_register("comm_split", simgrid::smpi::action_comm_split);
814 xbt_replay_action_register("comm_dup", simgrid::smpi::action_comm_dup);
816 std::shared_ptr<simgrid::smpi::Replay::SendAction> isend(new simgrid::smpi::Replay::SendAction("Isend"));
817 std::shared_ptr<simgrid::smpi::Replay::SendAction> send(new simgrid::smpi::Replay::SendAction("send"));
818 std::shared_ptr<simgrid::smpi::Replay::RecvAction> irecv(new simgrid::smpi::Replay::RecvAction("Irecv"));
819 std::shared_ptr<simgrid::smpi::Replay::RecvAction> recv(new simgrid::smpi::Replay::RecvAction("recv"));
820 std::shared_ptr<simgrid::smpi::Replay::WaitAction> wait(new simgrid::smpi::Replay::WaitAction());
822 xbt_replay_action_register("send",
823 std::bind(&simgrid::smpi::Replay::SendAction::execute, send, std::placeholders::_1));
824 xbt_replay_action_register("Isend",
825 std::bind(&simgrid::smpi::Replay::SendAction::execute, isend, std::placeholders::_1));
826 xbt_replay_action_register("recv",
827 std::bind(&simgrid::smpi::Replay::RecvAction::execute, recv, std::placeholders::_1));
828 xbt_replay_action_register("Irecv",
829 std::bind(&simgrid::smpi::Replay::RecvAction::execute, irecv, std::placeholders::_1));
830 xbt_replay_action_register("test", simgrid::smpi::action_test);
831 xbt_replay_action_register("wait",
832 std::bind(&simgrid::smpi::Replay::WaitAction::execute, wait, std::placeholders::_1));
833 xbt_replay_action_register("waitAll", simgrid::smpi::action_waitall);
834 xbt_replay_action_register("barrier", simgrid::smpi::action_barrier);
835 xbt_replay_action_register("bcast", simgrid::smpi::action_bcast);
836 xbt_replay_action_register("reduce", simgrid::smpi::action_reduce);
837 xbt_replay_action_register("allReduce", simgrid::smpi::action_allReduce);
838 xbt_replay_action_register("allToAll", simgrid::smpi::action_allToAll);
839 xbt_replay_action_register("allToAllV", simgrid::smpi::action_allToAllv);
840 xbt_replay_action_register("gather", simgrid::smpi::action_gather);
841 xbt_replay_action_register("scatter", simgrid::smpi::action_scatter);
842 xbt_replay_action_register("gatherV", simgrid::smpi::action_gatherv);
843 xbt_replay_action_register("scatterV", simgrid::smpi::action_scatterv);
844 xbt_replay_action_register("allGather", simgrid::smpi::action_allgather);
845 xbt_replay_action_register("allGatherV", simgrid::smpi::action_allgatherv);
846 xbt_replay_action_register("reduceScatter", simgrid::smpi::action_reducescatter);
847 xbt_replay_action_register("compute", simgrid::smpi::action_compute);
849 //if we have a delayed start, sleep here.
851 double value = xbt_str_parse_double((*argv)[2], "%s is not a double");
852 XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
853 smpi_execute_flops(value);
855 //UGLY: force a context switch to be sure that all MSG_processes begin initialization
856 XBT_DEBUG("Force context switch by smpi_execute_flops - Sleeping for 0.0 flops ");
857 smpi_execute_flops(0.0);
861 /** @brief actually run the replay after initialization */
862 void smpi_replay_main(int* argc, char*** argv)
864 simgrid::xbt::replay_runner(*argc, *argv);
866 /* and now, finalize everything */
867 /* One active process will stop. Decrease the counter*/
868 XBT_DEBUG("There are %zu elements in reqq[*]", get_reqq_self()->size());
869 if (not get_reqq_self()->empty()) {
870 unsigned int count_requests=get_reqq_self()->size();
871 MPI_Request requests[count_requests];
872 MPI_Status status[count_requests];
875 for (auto const& req : *get_reqq_self()) {
879 simgrid::smpi::Request::waitall(count_requests, requests, status);
881 delete get_reqq_self();
884 if(active_processes==0){
885 /* Last process alive speaking: end the simulated timer */
886 XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
887 smpi_free_replay_tmp_buffers();
890 TRACE_smpi_comm_in(Actor::self()->getPid(), "smpi_replay_run_finalize", new simgrid::instr::NoOpTIData("finalize"));
892 smpi_process()->finalize();
894 TRACE_smpi_comm_out(Actor::self()->getPid());
895 TRACE_smpi_finalize(Actor::self()->getPid());
898 /** @brief chain a replay initialization and a replay start */
899 void smpi_replay_run(int* argc, char*** argv)
901 smpi_replay_init(argc, argv);
902 smpi_replay_main(argc, argv);