1 /* Copyright (c) 2009-2017. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
7 #include "smpi_coll.hpp"
8 #include "smpi_comm.hpp"
9 #include "smpi_datatype.hpp"
10 #include "smpi_group.hpp"
11 #include "smpi_process.hpp"
12 #include "smpi_request.hpp"
13 #include "xbt/replay.hpp"
15 #include <boost/algorithm/string/join.hpp>
18 #include <unordered_map>
21 using simgrid::s4u::Actor;
// Trace-replay bookkeeping shared by every replayed actor.
23 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
// Communicator size as announced by the "comm_size" trace action.
25 static int communicator_size = 0;
// Number of replaying processes still running (decremented at finalization).
26 static int active_processes = 0;
// Per-actor (keyed by pid) queue of outstanding asynchronous requests (Isend/Irecv).
27 static std::unordered_map<int, std::vector<MPI_Request>*> reqq;
// Datatype used when a trace line does not specify one; chosen in action_init.
29 static MPI_Datatype MPI_DEFAULT_TYPE;
// Abort the replay with an explanatory error when a trace line carries fewer
// arguments than the action requires. The first two tokens of every line are
// the process id and the action name; `mandatory`/`optional` count the rest.
31 #define CHECK_ACTION_PARAMS(action, mandatory, optional) \
33 if (action.size() < static_cast<unsigned long>(mandatory + 2)) \
34 THROWF(arg_error, 0, "%s replay failed.\n" \
35 "%lu items were given on the line. First two should be process_id and action. " \
36 "This action needs after them %lu mandatory arguments, and accepts %lu optional ones. \n" \
37 "Please contact the Simgrid team if support is needed", \
38 __FUNCTION__, action.size(), static_cast<unsigned long>(mandatory), \
39 static_cast<unsigned long>(optional)); \
42 static void log_timed_action(simgrid::xbt::ReplayAction& action, double clock)
44 if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
45 std::string s = boost::algorithm::join(action, " ");
46 XBT_VERB("%s %f", s.c_str(), smpi_process()->simulated_elapsed() - clock);
50 static std::vector<MPI_Request>* get_reqq_self()
52 return reqq.at(Actor::self()->getPid());
55 static void set_reqq_self(std::vector<MPI_Request> *mpi_request)
57 reqq.insert({Actor::self()->getPid(), mpi_request});
61 static double parse_double(std::string string)
63 return xbt_str_parse_double(string.c_str(), "%s is not a double");
70 class ActionArgParser {
72 virtual void parse(simgrid::xbt::ReplayAction& action){};
75 class SendRecvParser : public ActionArgParser {
79 MPI_Datatype datatype1 = MPI_DEFAULT_TYPE;
81 void parse(simgrid::xbt::ReplayAction& action) override
83 CHECK_ACTION_PARAMS(action, 2, 1)
84 partner = std::stoi(action[2]);
85 size = parse_double(action[3]);
86 if (action.size() > 4)
87 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
91 template <class T> class ReplayAction {
93 const std::string name;
97 * Used to compute the duration of this action.
104 explicit ReplayAction(std::string name) : name(name), start_time(0), my_proc_id(simgrid::s4u::Actor::self()->getPid())
108 virtual void execute(simgrid::xbt::ReplayAction& action)
110 // Needs to be re-initialized for every action, hence here
111 start_time = smpi_process()->simulated_elapsed();
114 log_timed_action(action, start_time);
117 virtual void kernel(simgrid::xbt::ReplayAction& action) = 0;
// Replays a "wait" action: pops the most recently queued request (from a
// preceding Isend/Irecv) and blocks until it completes, tracing the wait.
120 class WaitAction : public ReplayAction<ActionArgParser> {
122   WaitAction() : ReplayAction("Wait") {}
123   void kernel(simgrid::xbt::ReplayAction& action) override
125     CHECK_ACTION_PARAMS(action, 0, 0)
128     std::string s = boost::algorithm::join(action, " ");
129     xbt_assert(get_reqq_self()->size(), "action wait not preceded by any irecv or isend: %s", s.c_str());
130     MPI_Request request = get_reqq_self()->back();
131     get_reqq_self()->pop_back();
// A null request means a previous "test" action already completed it.
133     if (request == nullptr) {
134       /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just
139     int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
// NOTE(review): `rank` guards against MPI_COMM_NULL but the line below
// dereferences request->comm() unconditionally — confirm the comm can
// never be MPI_COMM_NULL when control reaches here.
141     MPI_Group group = request->comm()->group();
142     int src_traced = group->rank(request->src());
143     int dst_traced = group->rank(request->dst());
// Remember before waiting: the request may be freed by Request::wait().
144     bool is_wait_for_receive = (request->flags() & RECV);
145     // TODO: Here we take the rank while we normally take the process id (look for my_proc_id)
146     TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::NoOpTIData("wait"));
148     Request::wait(&request, &status);
150     TRACE_smpi_comm_out(rank);
151     if (is_wait_for_receive)
152       TRACE_smpi_recv(src_traced, dst_traced, 0);
// Replays "send" (blocking) and "Isend" (non-blocking) actions; the
// constructor's name argument selects which one at kernel() time.
156 class SendAction : public ReplayAction<SendRecvParser> {
158   SendAction() = delete;
159   SendAction(std::string name) : ReplayAction(name) {}
160   void kernel(simgrid::xbt::ReplayAction& action) override
162     int dst_traced = MPI_COMM_WORLD->group()->actor(args.partner)->getPid();
164     TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
165                                                                                  Datatype::encode(args.datatype1)));
166     if (not TRACE_smpi_view_internals())
167       TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, args.size * args.datatype1->size());
// Payload is nullptr: replay only simulates the timing, no real data moves.
169     if (name == "send") {
170       Request::send(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
171     } else if (name == "Isend") {
172       MPI_Request request = Request::isend(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
// Queue the request so a later "wait"/"waitAll" action can complete it.
173       get_reqq_self()->push_back(request);
175       xbt_die("Don't know this action, %s", name.c_str());
178     TRACE_smpi_comm_out(my_proc_id);
// Replays "recv" (blocking) and "Irecv" (non-blocking) actions.
182 class RecvAction : public ReplayAction<SendRecvParser> {
184   RecvAction() = delete;
185   explicit RecvAction(std::string name) : ReplayAction(name) {}
186   void kernel(simgrid::xbt::ReplayAction& action) override
188     int src_traced = MPI_COMM_WORLD->group()->actor(args.partner)->getPid();
190     TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
191                                                                                  Datatype::encode(args.datatype1)));
194     // unknown size from the receiver point of view
195     if (args.size <= 0.0) {
// Probe the matching send to learn the actual message size before receiving.
196       Request::probe(args.partner, 0, MPI_COMM_WORLD, &status);
197       args.size = status.count;
200     if (name == "recv") {
201       Request::recv(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD, &status);
202     } else if (name == "Irecv") {
203       MPI_Request request = Request::irecv(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
// Queue the request so a later "wait"/"waitAll" action can complete it.
204       get_reqq_self()->push_back(request);
207     TRACE_smpi_comm_out(my_proc_id);
208     // TODO: Check why this was only activated in the "recv" case and not in the "Irecv" case
209     if (name == "recv" && not TRACE_smpi_view_internals()) {
210       TRACE_smpi_recv(src_traced, my_proc_id, 0);
215 } // Replay Namespace
// Replays the "init" action: choose the default datatype, start the simulated
// timer, count the active processes and create this actor's request queue.
217 static void action_init(simgrid::xbt::ReplayAction& action)
219   XBT_DEBUG("Initialize the counters");
220   CHECK_ACTION_PARAMS(action, 0, 1)
// An extra argument on the init line indicates an MPE-generated trace
// (doubles); otherwise assume a TAU trace (bytes).
221   if (action.size() > 2)
222     MPI_DEFAULT_TYPE = MPI_DOUBLE; // default MPE datatype
224     MPI_DEFAULT_TYPE = MPI_BYTE; // default TAU datatype
226   /* start a simulated timer */
227   smpi_process()->simulated_start();
228   /*initialize the number of active processes */
229   active_processes = smpi_process_count();
231   set_reqq_self(new std::vector<MPI_Request>);
// "finalize" is a no-op here; the real teardown happens in smpi_replay_main.
234 static void action_finalize(simgrid::xbt::ReplayAction& action)
// Record the communicator size announced by the trace.
239 static void action_comm_size(simgrid::xbt::ReplayAction& action)
241   communicator_size = parse_double(action[2]);
242   log_timed_action (action, smpi_process()->simulated_elapsed());
// comm_split/comm_dup only log: replay always runs on MPI_COMM_WORLD.
245 static void action_comm_split(simgrid::xbt::ReplayAction& action)
247   log_timed_action (action, smpi_process()->simulated_elapsed());
250 static void action_comm_dup(simgrid::xbt::ReplayAction& action)
252   log_timed_action (action, smpi_process()->simulated_elapsed());
// Replays a "compute" action: burn the given number of flops on the
// simulated host, bracketed by tracing calls.
255 static void action_compute(simgrid::xbt::ReplayAction& action)
257   CHECK_ACTION_PARAMS(action, 1, 0)
258   double clock = smpi_process()->simulated_elapsed();
259   double flops= parse_double(action[2]);
260   int my_proc_id = Actor::self()->getPid();
262   TRACE_smpi_computing_in(my_proc_id, flops);
263   smpi_execute_flops(flops);
264   TRACE_smpi_computing_out(my_proc_id);
266   log_timed_action (action, clock);
// Thin wrappers adapting the point-to-point ReplayAction classes to the
// plain-function signature expected by xbt_replay_action_register().
269 static void action_send(simgrid::xbt::ReplayAction& action)
271   Replay::SendAction("send").execute(action);
274 static void action_Isend(simgrid::xbt::ReplayAction& action)
276   Replay::SendAction("Isend").execute(action);
279 static void action_recv(simgrid::xbt::ReplayAction& action)
281   Replay::RecvAction("recv").execute(action);
284 static void action_Irecv(simgrid::xbt::ReplayAction& action)
286   Replay::RecvAction("Irecv").execute(action);
// Replays a "test" action: MPI_Test the most recently queued request. The
// (possibly nulled) request is pushed back so a later "wait" still finds it.
289 static void action_test(simgrid::xbt::ReplayAction& action)
291   CHECK_ACTION_PARAMS(action, 0, 0)
292   double clock = smpi_process()->simulated_elapsed();
295   MPI_Request request = get_reqq_self()->back();
296   get_reqq_self()->pop_back();
297   //if request is null here, this may mean that a previous test has succeeded
298   //Different times in traced application and replayed version may lead to this
299   //In this case, ignore the extra calls.
300   if(request!=nullptr){
301     int my_proc_id = Actor::self()->getPid();
302     TRACE_smpi_testing_in(my_proc_id);
304     int flag = Request::test(&request, &status);
306     XBT_DEBUG("MPI_Test result: %d", flag);
307     /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now nullptr.*/
308     get_reqq_self()->push_back(request);
310     TRACE_smpi_testing_out(my_proc_id);
312   log_timed_action (action, clock);
// Replays a "waitAll" action: wait on every queued request at once, then
// emit a recv-trace event for each completed receive.
315 static void action_waitall(simgrid::xbt::ReplayAction& action)
317   CHECK_ACTION_PARAMS(action, 0, 0)
318   double clock = smpi_process()->simulated_elapsed();
319   const unsigned int count_requests = get_reqq_self()->size();
321   if (count_requests>0) {
322     MPI_Status status[count_requests];
324     int my_proc_id_traced = Actor::self()->getPid();
325     TRACE_smpi_comm_in(my_proc_id_traced, __FUNCTION__,
326                        new simgrid::instr::Pt2PtTIData("waitAll", -1, count_requests, ""));
// Record src/dst of receive requests before waitall may free them.
327     int recvs_snd[count_requests];
328     int recvs_rcv[count_requests];
329     for (unsigned int i = 0; i < count_requests; i++) {
330       const auto& req = (*get_reqq_self())[i];
331       if (req && (req->flags() & RECV)) {
332         recvs_snd[i] = req->src();
333         recvs_rcv[i] = req->dst();
// NOTE(review): entries for non-receive requests are compared against the
// -100 sentinel below; presumably an else-branch (not shown here) sets
// them to -100 — verify, otherwise they are read uninitialized.
337     Request::waitall(count_requests, &(*get_reqq_self())[0], status);
339     for (unsigned i = 0; i < count_requests; i++) {
340       if (recvs_snd[i]!=-100)
341         TRACE_smpi_recv(recvs_snd[i], recvs_rcv[i],0);
343     TRACE_smpi_comm_out(my_proc_id_traced);
345   log_timed_action (action, clock);
// Replays a "barrier" action on MPI_COMM_WORLD.
348 static void action_barrier(simgrid::xbt::ReplayAction& action)
350   double clock = smpi_process()->simulated_elapsed();
351   int my_proc_id = Actor::self()->getPid();
352   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::NoOpTIData("barrier"));
354   Colls::barrier(MPI_COMM_WORLD);
356   TRACE_smpi_comm_out(my_proc_id);
357   log_timed_action (action, clock);
// Replays a "bcast" action: size, then optional root (default 0) and
// optional datatype (default MPI_DEFAULT_TYPE).
360 static void action_bcast(simgrid::xbt::ReplayAction& action)
362   CHECK_ACTION_PARAMS(action, 1, 2)
363   double size = parse_double(action[2]);
364   double clock = smpi_process()->simulated_elapsed();
365   int root = (action.size() > 3) ? std::stoi(action[3]) : 0;
366   /* Initialize MPI_CURRENT_TYPE in order to decrease the number of the checks */
367   MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 4) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE;
369   int my_proc_id = Actor::self()->getPid();
370   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
371                      new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(root)->getPid(), -1.0, size,
372                                                     -1, Datatype::encode(MPI_CURRENT_TYPE), ""));
// Scratch buffer only: replay simulates timing, payload content is unused.
374   void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
376   Colls::bcast(sendbuf, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
378   TRACE_smpi_comm_out(my_proc_id);
379   log_timed_action (action, clock);
// Replays a "reduce" action: communication size, computation size, optional
// root and datatype. MPI_OP_NULL is used since only timing matters.
382 static void action_reduce(simgrid::xbt::ReplayAction& action)
384   CHECK_ACTION_PARAMS(action, 2, 2)
385   double comm_size = parse_double(action[2]);
386   double comp_size = parse_double(action[3]);
387   double clock = smpi_process()->simulated_elapsed();
388   int root = (action.size() > 4) ? std::stoi(action[4]) : 0;
390   MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 5) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE;
392   int my_proc_id = Actor::self()->getPid();
393   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
394                      new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(root)->getPid(), comp_size,
395                                                     comm_size, -1, Datatype::encode(MPI_CURRENT_TYPE), ""));
397   void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
398   void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
399   Colls::reduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
// Account for the local computation part of the reduction.
400   smpi_execute_flops(comp_size);
402   TRACE_smpi_comm_out(my_proc_id);
403   log_timed_action (action, clock);
// Replays an "allReduce" action: communication size, computation size,
// optional datatype. MPI_OP_NULL is used since only timing matters.
406 static void action_allReduce(simgrid::xbt::ReplayAction& action)
408   CHECK_ACTION_PARAMS(action, 2, 1)
409   double comm_size = parse_double(action[2]);
410   double comp_size = parse_double(action[3]);
412   MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 4) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE;
414   double clock = smpi_process()->simulated_elapsed();
415   int my_proc_id = Actor::self()->getPid();
416   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::CollTIData("allReduce", -1, comp_size, comm_size, -1,
417                                                                               Datatype::encode(MPI_CURRENT_TYPE), ""));
419   void *recvbuf = smpi_get_tmp_recvbuffer(comm_size* MPI_CURRENT_TYPE->size());
420   void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
421   Colls::allreduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
// Account for the local computation part of the reduction.
422   smpi_execute_flops(comp_size);
424   TRACE_smpi_comm_out(my_proc_id);
425   log_timed_action (action, clock);
// Replays an "allToAll" action: per-peer send and recv volumes plus optional
// send/recv datatypes (both must be present for either to be used).
428 static void action_allToAll(simgrid::xbt::ReplayAction& action)
430   CHECK_ACTION_PARAMS(action, 2, 2) //two mandatory (send and recv volumes) and two optional (corresponding datatypes)
431   double clock = smpi_process()->simulated_elapsed();
432   unsigned long comm_size = MPI_COMM_WORLD->size();
433   int send_size = parse_double(action[2]);
434   int recv_size = parse_double(action[3]);
// NOTE(review): both datatypes require action.size() > 5, i.e. they can only
// be given together — a single extra token is ignored. Confirm intended.
435   MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 5) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE};
436   MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE};
438   void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* MPI_CURRENT_TYPE->size());
439   void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
441   int my_proc_id = Actor::self()->getPid();
442   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
443                      new simgrid::instr::CollTIData("allToAll", -1, -1.0, send_size, recv_size,
444                                                     Datatype::encode(MPI_CURRENT_TYPE),
445                                                     Datatype::encode(MPI_CURRENT_TYPE2)));
447   Colls::alltoall(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
449   TRACE_smpi_comm_out(my_proc_id);
450   log_timed_action (action, clock);
// Replays a "gather" action; only the root allocates the receive buffer.
453 static void action_gather(simgrid::xbt::ReplayAction& action)
455   /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
458       1) 68 is the sendcounts
459       2) 68 is the recvcounts
460       3) 0 is the root node
461       4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
462       5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
464   CHECK_ACTION_PARAMS(action, 2, 3)
465   double clock = smpi_process()->simulated_elapsed();
466   unsigned long comm_size = MPI_COMM_WORLD->size();
467   int send_size = parse_double(action[2]);
468   int recv_size = parse_double(action[3]);
// NOTE(review): both datatypes require action.size() > 6, so they can only be
// supplied together (after the root argument). Confirm intended.
469   MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE};
470   MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[6]) : MPI_DEFAULT_TYPE};
472   void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
473   void *recv = nullptr;
474   int root = (action.size() > 4) ? std::stoi(action[4]) : 0;
475   int rank = MPI_COMM_WORLD->rank();
// Receive buffer is sized for the whole communicator (root gathers all).
478     recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
480   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
481                                                                         Datatype::encode(MPI_CURRENT_TYPE),
482                                                                         Datatype::encode(MPI_CURRENT_TYPE2)));
484   Colls::gather(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
486   TRACE_smpi_comm_out(Actor::self()->getPid());
487   log_timed_action (action, clock);
490 static void action_scatter(simgrid::xbt::ReplayAction& action)
492 /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
495 1) 68 is the sendcounts
496 2) 68 is the recvcounts
497 3) 0 is the root node
498 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
499 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
501 CHECK_ACTION_PARAMS(action, 2, 3)
502 double clock = smpi_process()->simulated_elapsed();
503 unsigned long comm_size = MPI_COMM_WORLD->size();
504 int send_size = parse_double(action[2]);
505 int recv_size = parse_double(action[3]);
506 MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE};
507 MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[6]) : MPI_DEFAULT_TYPE};
509 void* send = smpi_get_tmp_sendbuffer(send_size * MPI_CURRENT_TYPE->size());
510 void* recv = nullptr;
511 int root = (action.size() > 4) ? std::stoi(action[4]) : 0;
512 int rank = MPI_COMM_WORLD->rank();
515 recv = smpi_get_tmp_recvbuffer(recv_size * comm_size * MPI_CURRENT_TYPE2->size());
517 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
518 Datatype::encode(MPI_CURRENT_TYPE),
519 Datatype::encode(MPI_CURRENT_TYPE2)));
521 Colls::scatter(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
523 TRACE_smpi_comm_out(Actor::self()->getPid());
524 log_timed_action(action, clock);
// Replays a "gatherV" action: one sendcount plus a per-rank recvcounts
// array; only the root allocates the receive buffer (displacements are 0).
527 static void action_gatherv(simgrid::xbt::ReplayAction& action)
529   /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
530   0 gather 68 68 10 10 10 0 0 0
532     1) 68 is the sendcount
533     2) 68 10 10 10 is the recvcounts
534     3) 0 is the root node
535     4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
536     5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
538   double clock = smpi_process()->simulated_elapsed();
539   unsigned long comm_size = MPI_COMM_WORLD->size();
540   CHECK_ACTION_PARAMS(action, comm_size+1, 2)
541   int send_size = parse_double(action[2]);
542   std::vector<int> disps(comm_size, 0);
543   std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
545   MPI_Datatype MPI_CURRENT_TYPE =
546       (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
547   MPI_Datatype MPI_CURRENT_TYPE2{
548       (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[5 + comm_size]) : MPI_DEFAULT_TYPE};
550   void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
551   void *recv = nullptr;
552   for (unsigned int i = 0; i < comm_size; i++) {
553     (*recvcounts)[i] = std::stoi(action[i + 3]);
// Total number of elements the root will receive.
555   int recv_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
557   int root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
558   int rank = MPI_COMM_WORLD->rank();
561     recv = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
563   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData(
564                          "gatherV", root, send_size, nullptr, -1, recvcounts,
565                          Datatype::encode(MPI_CURRENT_TYPE), Datatype::encode(MPI_CURRENT_TYPE2)));
567   Colls::gatherv(send, send_size, MPI_CURRENT_TYPE, recv, recvcounts->data(), disps.data(), MPI_CURRENT_TYPE2, root,
570   TRACE_smpi_comm_out(Actor::self()->getPid());
571   log_timed_action (action, clock);
574 static void action_scatterv(simgrid::xbt::ReplayAction& action)
576 /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
577 0 gather 68 10 10 10 68 0 0 0
579 1) 68 10 10 10 is the sendcounts
580 2) 68 is the recvcount
581 3) 0 is the root node
582 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
583 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
585 double clock = smpi_process()->simulated_elapsed();
586 unsigned long comm_size = MPI_COMM_WORLD->size();
587 CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
588 int recv_size = parse_double(action[2 + comm_size]);
589 std::vector<int> disps(comm_size, 0);
590 std::shared_ptr<std::vector<int>> sendcounts(new std::vector<int>(comm_size));
592 MPI_Datatype MPI_CURRENT_TYPE =
593 (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
594 MPI_Datatype MPI_CURRENT_TYPE2{
595 (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[5 + comm_size]) : MPI_DEFAULT_TYPE};
597 void* send = nullptr;
598 void* recv = smpi_get_tmp_recvbuffer(recv_size * MPI_CURRENT_TYPE->size());
599 for (unsigned int i = 0; i < comm_size; i++) {
600 (*sendcounts)[i] = std::stoi(action[i + 2]);
602 int send_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
604 int root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
605 int rank = MPI_COMM_WORLD->rank();
608 send = smpi_get_tmp_sendbuffer(send_sum * MPI_CURRENT_TYPE2->size());
610 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData("gatherV", root, -1, sendcounts, recv_size,
611 nullptr, Datatype::encode(MPI_CURRENT_TYPE),
612 Datatype::encode(MPI_CURRENT_TYPE2)));
614 Colls::scatterv(send, sendcounts->data(), disps.data(), MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root,
617 TRACE_smpi_comm_out(Actor::self()->getPid());
618 log_timed_action(action, clock);
// Replays a "reduceScatter" action: per-rank recvcounts, a flop count, and
// an optional datatype. MPI_OP_NULL is used since only timing matters.
621 static void action_reducescatter(simgrid::xbt::ReplayAction& action)
623   /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
624 0 reduceScatter 275427 275427 275427 204020 11346849 0
626     1) The first four values after the name of the action declare the recvcounts array
627     2) The value 11346849 is the amount of instructions
628     3) The last value corresponds to the datatype, see simgrid::smpi::Datatype::decode().
630   double clock = smpi_process()->simulated_elapsed();
631   unsigned long comm_size = MPI_COMM_WORLD->size();
632   CHECK_ACTION_PARAMS(action, comm_size+1, 1)
633   int comp_size = parse_double(action[2+comm_size]);
634   int my_proc_id                     = Actor::self()->getPid();
635   std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>);
636   MPI_Datatype MPI_CURRENT_TYPE =
637       (action.size() > 3 + comm_size) ? simgrid::smpi::Datatype::decode(action[3 + comm_size]) : MPI_DEFAULT_TYPE;
639   for (unsigned int i = 0; i < comm_size; i++) {
640     recvcounts->push_back(std::stoi(action[i + 2]));
// Total element count across all ranks, used to size both scratch buffers.
642   int size{std::accumulate(recvcounts->begin(), recvcounts->end(), 0)};
644   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
645                      new simgrid::instr::VarCollTIData("reduceScatter", -1, 0, nullptr, -1, recvcounts,
646                                                        std::to_string(comp_size), /* ugly hack to print comp_size */
647                                                        Datatype::encode(MPI_CURRENT_TYPE)));
649   void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
650   void *recvbuf = smpi_get_tmp_recvbuffer(size* MPI_CURRENT_TYPE->size());
652   Colls::reduce_scatter(sendbuf, recvbuf, recvcounts->data(), MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
// Account for the local computation part of the reduction.
653   smpi_execute_flops(comp_size);
655   TRACE_smpi_comm_out(my_proc_id);
656   log_timed_action (action, clock);
// Replays an "allGather" action: sendcount, recvcount, optional datatypes
// (both must be present for either to be used).
659 static void action_allgather(simgrid::xbt::ReplayAction& action)
661   /* The structure of the allgather action for the rank 0 (total 4 processes) is the following:
662   0 allGather 275427 275427
664     1) 275427 is the sendcount
665     2) 275427 is the recvcount
666     3) No more values mean that the datatype for sent and receive buffer is the default one, see
667     simgrid::smpi::Datatype::decode().
669   double clock = smpi_process()->simulated_elapsed();
671   CHECK_ACTION_PARAMS(action, 2, 2)
672   int sendcount = std::stoi(action[2]);
673   int recvcount = std::stoi(action[3]);
675   MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 5) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE};
676   MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE};
678   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
679   void *recvbuf = smpi_get_tmp_recvbuffer(recvcount* MPI_CURRENT_TYPE2->size());
681   int my_proc_id = Actor::self()->getPid();
683   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
684                      new simgrid::instr::CollTIData("allGather", -1, -1.0, sendcount, recvcount,
685                                                     Datatype::encode(MPI_CURRENT_TYPE),
686                                                     Datatype::encode(MPI_CURRENT_TYPE2)));
688   Colls::allgather(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcount, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
690   TRACE_smpi_comm_out(my_proc_id);
691   log_timed_action (action, clock);
694 static void action_allgatherv(simgrid::xbt::ReplayAction& action)
696 /* The structure of the allgatherv action for the rank 0 (total 4 processes) is the following:
697 0 allGatherV 275427 275427 275427 275427 204020
699 1) 275427 is the sendcount
700 2) The next four elements declare the recvcounts array
701 3) No more values mean that the datatype for sent and receive buffer is the default one, see
702 simgrid::smpi::Datatype::decode().
704 double clock = smpi_process()->simulated_elapsed();
706 unsigned long comm_size = MPI_COMM_WORLD->size();
707 CHECK_ACTION_PARAMS(action, comm_size+1, 2)
708 int sendcount = std::stoi(action[2]);
709 std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
710 std::vector<int> disps(comm_size, 0);
712 int datatype_index = 0, disp_index = 0;
713 if (action.size() > 3 + 2 * comm_size) { /* datatype + disp are specified */
714 datatype_index = 3 + comm_size;
715 disp_index = datatype_index + 1;
716 } else if (action.size() > 3 + 2 * comm_size) { /* disps specified; datatype is not specified; use the default one */
718 disp_index = 3 + comm_size;
719 } else if (action.size() > 3 + comm_size) { /* only datatype, no disp specified */
720 datatype_index = 3 + comm_size;
723 if (disp_index != 0) {
724 for (unsigned int i = 0; i < comm_size; i++)
725 disps[i] = std::stoi(action[disp_index + i]);
728 MPI_Datatype MPI_CURRENT_TYPE{(datatype_index > 0) ? simgrid::smpi::Datatype::decode(action[datatype_index])
730 MPI_Datatype MPI_CURRENT_TYPE2{(datatype_index > 0) ? simgrid::smpi::Datatype::decode(action[datatype_index])
733 void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
735 for (unsigned int i = 0; i < comm_size; i++) {
736 (*recvcounts)[i] = std::stoi(action[i + 3]);
738 int recv_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
739 void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
741 int my_proc_id = Actor::self()->getPid();
743 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
744 new simgrid::instr::VarCollTIData("allGatherV", -1, sendcount, nullptr, -1, recvcounts,
745 Datatype::encode(MPI_CURRENT_TYPE),
746 Datatype::encode(MPI_CURRENT_TYPE2)));
748 Colls::allgatherv(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts->data(), disps.data(), MPI_CURRENT_TYPE2,
751 TRACE_smpi_comm_out(my_proc_id);
752 log_timed_action (action, clock);
755 static void action_allToAllv(simgrid::xbt::ReplayAction& action)
757 /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
758 0 allToAllV 100 1 7 10 12 100 1 70 10 5
760 1) 100 is the size of the send buffer *sizeof(int),
761 2) 1 7 10 12 is the sendcounts array
762 3) 100*sizeof(int) is the size of the receiver buffer
763 4) 1 70 10 5 is the recvcounts array
765 double clock = smpi_process()->simulated_elapsed();
767 unsigned long comm_size = MPI_COMM_WORLD->size();
768 CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
769 std::shared_ptr<std::vector<int>> sendcounts(new std::vector<int>(comm_size));
770 std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
771 std::vector<int> senddisps(comm_size, 0);
772 std::vector<int> recvdisps(comm_size, 0);
774 MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 5 + 2 * comm_size)
775 ? simgrid::smpi::Datatype::decode(action[4 + 2 * comm_size])
777 MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5 + 2 * comm_size)
778 ? simgrid::smpi::Datatype::decode(action[5 + 2 * comm_size])
781 int send_buf_size=parse_double(action[2]);
782 int recv_buf_size=parse_double(action[3+comm_size]);
783 int my_proc_id = Actor::self()->getPid();
784 void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* MPI_CURRENT_TYPE->size());
785 void *recvbuf = smpi_get_tmp_recvbuffer(recv_buf_size* MPI_CURRENT_TYPE2->size());
787 for (unsigned int i = 0; i < comm_size; i++) {
788 (*sendcounts)[i] = std::stoi(action[3 + i]);
789 (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
791 int send_size = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
792 int recv_size = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
794 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
795 new simgrid::instr::VarCollTIData("allToAllV", -1, send_size, sendcounts, recv_size, recvcounts,
796 Datatype::encode(MPI_CURRENT_TYPE),
797 Datatype::encode(MPI_CURRENT_TYPE2)));
799 Colls::alltoallv(sendbuf, sendcounts->data(), senddisps.data(), MPI_CURRENT_TYPE, recvbuf, recvcounts->data(),
800 recvdisps.data(), MPI_CURRENT_TYPE, MPI_COMM_WORLD);
802 TRACE_smpi_comm_out(my_proc_id);
803 log_timed_action (action, clock);
806 }} // namespace simgrid::smpi
808 /** @brief Only initialize the replay, don't do it for real */
809 void smpi_replay_init(int* argc, char*** argv)
// Mark the process initialized/replaying before any action runs.
811   simgrid::smpi::Process::init(argc, argv);
812   smpi_process()->mark_as_initialized();
813   smpi_process()->set_replaying(true);
815   int my_proc_id = Actor::self()->getPid();
816   TRACE_smpi_init(my_proc_id);
817   TRACE_smpi_computing_init(my_proc_id);
818   TRACE_smpi_comm_in(my_proc_id, "smpi_replay_run_init", new simgrid::instr::NoOpTIData("init"));
819   TRACE_smpi_comm_out(my_proc_id);
// Map each trace action name to its replay handler.
820   xbt_replay_action_register("init",       simgrid::smpi::action_init);
821   xbt_replay_action_register("finalize",   simgrid::smpi::action_finalize);
822   xbt_replay_action_register("comm_size",  simgrid::smpi::action_comm_size);
823   xbt_replay_action_register("comm_split", simgrid::smpi::action_comm_split);
824   xbt_replay_action_register("comm_dup",   simgrid::smpi::action_comm_dup);
825   xbt_replay_action_register("send",       simgrid::smpi::action_send);
826   xbt_replay_action_register("Isend",      simgrid::smpi::action_Isend);
827   xbt_replay_action_register("recv",       simgrid::smpi::action_recv);
828   xbt_replay_action_register("Irecv",      simgrid::smpi::action_Irecv);
829   xbt_replay_action_register("test",       simgrid::smpi::action_test);
// "wait" uses the class-based WaitAction, bound via shared_ptr + std::bind.
830   std::shared_ptr<simgrid::smpi::Replay::WaitAction> wait(new simgrid::smpi::Replay::WaitAction());
831   xbt_replay_action_register("wait",
832                              std::bind(&simgrid::smpi::Replay::WaitAction::execute, wait, std::placeholders::_1));
833   xbt_replay_action_register("waitAll",    simgrid::smpi::action_waitall);
834   xbt_replay_action_register("barrier",    simgrid::smpi::action_barrier);
835   xbt_replay_action_register("bcast",      simgrid::smpi::action_bcast);
836   xbt_replay_action_register("reduce",     simgrid::smpi::action_reduce);
837   xbt_replay_action_register("allReduce",  simgrid::smpi::action_allReduce);
838   xbt_replay_action_register("allToAll",   simgrid::smpi::action_allToAll);
839   xbt_replay_action_register("allToAllV",  simgrid::smpi::action_allToAllv);
840   xbt_replay_action_register("gather",     simgrid::smpi::action_gather);
841   xbt_replay_action_register("scatter",  simgrid::smpi::action_scatter);
842   xbt_replay_action_register("gatherV",    simgrid::smpi::action_gatherv);
843   xbt_replay_action_register("scatterV",  simgrid::smpi::action_scatterv);
844   xbt_replay_action_register("allGather",  simgrid::smpi::action_allgather);
845   xbt_replay_action_register("allGatherV", simgrid::smpi::action_allgatherv);
846   xbt_replay_action_register("reduceScatter",  simgrid::smpi::action_reducescatter);
847   xbt_replay_action_register("compute",    simgrid::smpi::action_compute);
849   //if we have a delayed start, sleep here.
851     double value = xbt_str_parse_double((*argv)[2], "%s is not a double");
852     XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
853     smpi_execute_flops(value);
855     //UGLY: force a context switch to be sure that all MSG_processes begin initialization
856     XBT_DEBUG("Force context switch by smpi_execute_flops - Sleeping for 0.0 flops ");
857     smpi_execute_flops(0.0);
861 /** @brief actually run the replay after initialization */
862 void smpi_replay_main(int* argc, char*** argv)
864   simgrid::xbt::replay_runner(*argc, *argv);
866   /* and now, finalize everything */
867   /* One active process will stop. Decrease the counter*/
868   XBT_DEBUG("There are %zu elements in reqq[*]", get_reqq_self()->size());
// Drain any requests the trace left pending before finalizing.
869   if (not get_reqq_self()->empty()) {
870     unsigned int count_requests=get_reqq_self()->size();
871     MPI_Request requests[count_requests];
872     MPI_Status status[count_requests];
875     for (auto const& req : *get_reqq_self()) {
879     simgrid::smpi::Request::waitall(count_requests, requests, status);
// Free the per-actor queue allocated by action_init.
881   delete get_reqq_self();
884   if(active_processes==0){
885     /* Last process alive speaking: end the simulated timer */
886     XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
887     smpi_free_replay_tmp_buffers();
890   TRACE_smpi_comm_in(Actor::self()->getPid(), "smpi_replay_run_finalize", new simgrid::instr::NoOpTIData("finalize"));
892   smpi_process()->finalize();
894   TRACE_smpi_comm_out(Actor::self()->getPid());
895   TRACE_smpi_finalize(Actor::self()->getPid());
898 /** @brief chain a replay initialization and a replay start */
899 void smpi_replay_run(int* argc, char*** argv)
901   smpi_replay_init(argc, argv);
902   smpi_replay_main(argc, argv);