1 /* Copyright (c) 2009-2017. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
7 #include "smpi_coll.hpp"
8 #include "smpi_comm.hpp"
9 #include "smpi_datatype.hpp"
10 #include "smpi_group.hpp"
11 #include "smpi_process.hpp"
12 #include "smpi_request.hpp"
13 #include "xbt/replay.hpp"
15 #include <boost/algorithm/string/join.hpp>
18 #include <unordered_map>
21 using simgrid::s4u::Actor;
23 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
// Global replay state, shared by all replaying actors.
// Communicator size announced by a "comm_size" trace action (0 until seen).
25 static int communicator_size = 0;
// Number of processes still replaying; checked against 0 in smpi_replay_main()
// to detect the last process (decrement site not visible in this listing).
26 static int active_processes = 0;
// Per-actor vector of outstanding MPI_Request handles, keyed by actor pid.
// Owning raw pointers; each vector is deleted in smpi_replay_main().
27 static std::unordered_map<int, std::vector<MPI_Request>*> reqq;
// Datatype used when a trace line omits one (MPI_DOUBLE for MPE traces,
// MPI_BYTE for TAU traces — chosen in action_init()).
29 static MPI_Datatype MPI_DEFAULT_TYPE;
// Validate a replay action line. `action` entries are [process_id, action_name,
// args...]; throws arg_error unless at least `mandatory` arguments follow the
// first two entries (`optional` is only used in the error message).
// NOTE(review): this listing drops some physical lines (e.g. the macro's
// enclosing braces), so the macro text below is incomplete as shown.
31 #define CHECK_ACTION_PARAMS(action, mandatory, optional) \
33 if (action.size() < static_cast<unsigned long>(mandatory + 2)) \
34 THROWF(arg_error, 0, "%s replay failed.\n" \
35 "%lu items were given on the line. First two should be process_id and action. " \
36 "This action needs after them %lu mandatory arguments, and accepts %lu optional ones. \n" \
37 "Please contact the Simgrid team if support is needed", \
38 __FUNCTION__, action.size(), static_cast<unsigned long>(mandatory), \
39 static_cast<unsigned long>(optional)); \
// Log an action line together with its simulated duration (elapsed time minus
// the `clock` taken when the action started), at verbose log level only.
42 static void log_timed_action(simgrid::xbt::ReplayAction& action, double clock)
44   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
45     std::string s = boost::algorithm::join(action, " ");
46     XBT_VERB("%s %f", s.c_str(), smpi_process()->simulated_elapsed() - clock);
50 static std::vector<MPI_Request>* get_reqq_self()
52 return reqq.at(Actor::self()->getPid());
55 static void set_reqq_self(std::vector<MPI_Request> *mpi_request)
57 reqq.insert({Actor::self()->getPid(), mpi_request});
61 static double parse_double(std::string string)
63 return xbt_str_parse_double(string.c_str(), "%s is not a double");
// Base class for per-action argument parsers (the T in ReplayAction<T>).
// The default parse() accepts any action line without extracting arguments,
// for actions that take none (e.g. Wait).
70 class ActionArgParser {
72   virtual void parse(simgrid::xbt::ReplayAction& action){};
// Parses point-to-point actions: [proc, name, partner, size, [datatype]].
// NOTE(review): the declarations of `partner` and `size` are not visible in
// this listing (physical lines dropped); parse() below assigns both.
75 class SendRecvParser : public ActionArgParser {
77   /* communication partner; if we send, this is the receiver and vice versa */
80   MPI_Datatype datatype1 = MPI_DEFAULT_TYPE;
82   void parse(simgrid::xbt::ReplayAction& action) override
84     CHECK_ACTION_PARAMS(action, 2, 1)
85     partner = std::stoi(action[2]);
86     size = parse_double(action[3]);
// The datatype is optional; fall back to MPI_DEFAULT_TYPE (member initializer).
87     if (action.size() > 4)
88       datatype1 = simgrid::smpi::Datatype::decode(action[4]);
// Parses "compute" actions: [proc, compute, flops].
92 class ComputeParser : public ActionArgParser {
// NOTE(review): original comment here was copy/pasted from SendRecvParser
// ("communication partner"); this field actually holds the flop count.
94   /* amount of work to simulate, in flops (assigned in parse() below) */
97   void parse(simgrid::xbt::ReplayAction& action) override
99     CHECK_ACTION_PARAMS(action, 1, 0)
100     flops = parse_double(action[2]);
// Base class for replay actions. T is the ActionArgParser subclass that
// extracts this action's arguments from the trace line.
104 template <class T> class ReplayAction {
106   const std::string name;
110    * Used to compute the duration of this action.
// Captures the acting actor's pid once, at construction time.
117   explicit ReplayAction(std::string name) : name(name), start_time(0), my_proc_id(simgrid::s4u::Actor::self()->getPid())
// Template method: record the start clock, run kernel(), then log the timing.
// (The args.parse() call is not visible in this listing — lines dropped.)
121   virtual void execute(simgrid::xbt::ReplayAction& action)
123     // Needs to be re-initialized for every action, hence here
124     start_time = smpi_process()->simulated_elapsed();
127     log_timed_action(action, start_time);
// The action-specific behavior, implemented by each subclass.
130   virtual void kernel(simgrid::xbt::ReplayAction& action) = 0;
// Replays a "wait" action: pops the most recent request pushed by an
// Isend/Irecv and waits for its completion.
133 class WaitAction : public ReplayAction<ActionArgParser> {
135   WaitAction() : ReplayAction("Wait") {}
136   void kernel(simgrid::xbt::ReplayAction& action) override
138     CHECK_ACTION_PARAMS(action, 0, 0)
141     std::string s = boost::algorithm::join(action, " ");
// A "wait" with no pending request means the trace is malformed.
142     xbt_assert(get_reqq_self()->size(), "action wait not preceded by any irecv or isend: %s", s.c_str());
// LIFO: waits are matched against the most recently issued request.
143     MPI_Request request = get_reqq_self()->back();
144     get_reqq_self()->pop_back();
146     if (request == nullptr) {
147       /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just
// rank is -1 when the communicator is gone (e.g. finalization race).
152     int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
154     MPI_Group group = request->comm()->group();
155     int src_traced = group->rank(request->src());
156     int dst_traced = group->rank(request->dst());
// Remember before wait(): the request may be freed by Request::wait below.
157     bool is_wait_for_receive = (request->flags() & RECV);
158     // TODO: Here we take the rank while we normally take the process id (look for my_proc_id)
159     TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::NoOpTIData("wait"));
161     Request::wait(&request, &status);
163     TRACE_smpi_comm_out(rank);
// Only receives produce a matching recv event in the trace output.
164     if (is_wait_for_receive)
165       TRACE_smpi_recv(src_traced, dst_traced, 0);
// Replays "send" (blocking) and "Isend" (nonblocking, request queued for a
// later wait/test) actions. The payload buffer is nullptr: only timing and
// volume matter during replay.
169 class SendAction : public ReplayAction<SendRecvParser> {
171   SendAction() = delete;
172   SendAction(std::string name) : ReplayAction(name) {}
173   void kernel(simgrid::xbt::ReplayAction& action) override
// Translate the trace-local rank of the partner into a global actor pid.
175     int dst_traced = MPI_COMM_WORLD->group()->actor(args.partner)->getPid();
177     TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
178                                                                                  Datatype::encode(args.datatype1)));
179     if (not TRACE_smpi_view_internals())
180       TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, args.size * args.datatype1->size());
// Dispatch on the action name given at construction time.
182     if (name == "send") {
183       Request::send(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
184     } else if (name == "Isend") {
185       MPI_Request request = Request::isend(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
// Queue so that a subsequent "wait"/"test" action can find this request.
186       get_reqq_self()->push_back(request);
188       xbt_die("Don't know this action, %s", name.c_str());
191     TRACE_smpi_comm_out(my_proc_id);
// Replays "recv" (blocking) and "Irecv" (nonblocking) actions; mirror of
// SendAction.
195 class RecvAction : public ReplayAction<SendRecvParser> {
197   RecvAction() = delete;
198   explicit RecvAction(std::string name) : ReplayAction(name) {}
199   void kernel(simgrid::xbt::ReplayAction& action) override
201     int src_traced = MPI_COMM_WORLD->group()->actor(args.partner)->getPid();
203     TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
204                                                                                  Datatype::encode(args.datatype1)));
207     // unknown size from the receiver point of view
// Probe the matching send to learn the message size before receiving.
208     if (args.size <= 0.0) {
209       Request::probe(args.partner, 0, MPI_COMM_WORLD, &status);
210       args.size = status.count;
213     if (name == "recv") {
214       Request::recv(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD, &status);
215     } else if (name == "Irecv") {
216       MPI_Request request = Request::irecv(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
// Queue so that a subsequent "wait"/"test" action can find this request.
217       get_reqq_self()->push_back(request);
220     TRACE_smpi_comm_out(my_proc_id);
221     // TODO: Check why this was only activated in the "recv" case and not in the "Irecv" case
222     if (name == "recv" && not TRACE_smpi_view_internals()) {
223       TRACE_smpi_recv(src_traced, my_proc_id, 0);
// Replays a "compute" action: simulates args.flops of local computation,
// bracketed by tracing events.
228 class ComputeAction : public ReplayAction<ComputeParser> {
230   ComputeAction() : ReplayAction("compute") {}
231   void kernel(simgrid::xbt::ReplayAction& action) override
233     TRACE_smpi_computing_in(my_proc_id, args.flops);
// Advances the simulated clock by flops / host speed; no real work is done.
234     smpi_execute_flops(args.flops);
235     TRACE_smpi_computing_out(my_proc_id);
239 } // Replay Namespace
// Replays the "init" action: picks the default datatype (MPE traces give an
// extra argument and use MPI_DOUBLE; TAU traces use MPI_BYTE), starts the
// simulated timer, and creates this actor's pending-request queue.
241 static void action_init(simgrid::xbt::ReplayAction& action)
243   XBT_DEBUG("Initialize the counters");
244   CHECK_ACTION_PARAMS(action, 0, 1)
245   if (action.size() > 2)
246     MPI_DEFAULT_TYPE = MPI_DOUBLE; // default MPE datatype
// NOTE(review): the `else` keyword line is dropped in this listing; the line
// below is the else-branch of the test above.
248     MPI_DEFAULT_TYPE = MPI_BYTE; // default TAU datatype
250   /* start a simulated timer */
251   smpi_process()->simulated_start();
252   /*initialize the number of active processes */
253   active_processes = smpi_process_count();
255   set_reqq_self(new std::vector<MPI_Request>);
// Handles the "finalize" trace action. Its body is not visible in this listing;
// the real teardown (draining requests, decrementing active_processes) is done
// in smpi_replay_main() after the runner returns.
258 static void action_finalize(simgrid::xbt::ReplayAction& action)
// Records the communicator size announced by the trace (stored in the global
// communicator_size) and logs the action timing.
263 static void action_comm_size(simgrid::xbt::ReplayAction& action)
265   communicator_size = parse_double(action[2]);
266   log_timed_action (action, smpi_process()->simulated_elapsed());
// "comm_split" is replayed as a no-op: only the timing is logged.
269 static void action_comm_split(simgrid::xbt::ReplayAction& action)
271   log_timed_action (action, smpi_process()->simulated_elapsed());
// "comm_dup" is replayed as a no-op: only the timing is logged.
274 static void action_comm_dup(simgrid::xbt::ReplayAction& action)
276   log_timed_action (action, smpi_process()->simulated_elapsed());
// Thin adapter: dispatches the "compute" action to the ComputeAction class.
279 static void action_compute(simgrid::xbt::ReplayAction& action)
281   Replay::ComputeAction().execute(action);
// Replays a "test" action: MPI_Test on the most recently queued request.
// The request slot is pushed back afterwards (possibly as nullptr when the
// test succeeded) so a later "wait" still finds a matching entry.
284 static void action_test(simgrid::xbt::ReplayAction& action)
286   CHECK_ACTION_PARAMS(action, 0, 0)
287   double clock = smpi_process()->simulated_elapsed();
290   MPI_Request request = get_reqq_self()->back();
291   get_reqq_self()->pop_back();
292   //if request is null here, this may mean that a previous test has succeeded
293   //Different times in traced application and replayed version may lead to this
294   //In this case, ignore the extra calls.
295   if(request!=nullptr){
296     int my_proc_id = Actor::self()->getPid();
297     TRACE_smpi_testing_in(my_proc_id);
299     int flag = Request::test(&request, &status);
301     XBT_DEBUG("MPI_Test result: %d", flag);
302     /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now nullptr.*/
303     get_reqq_self()->push_back(request);
305     TRACE_smpi_testing_out(my_proc_id);
307   log_timed_action (action, clock);
// Replays "waitAll": waits for every request queued by previous Isend/Irecv
// actions, then emits a recv trace event for each completed receive.
310 static void action_waitall(simgrid::xbt::ReplayAction& action)
312   CHECK_ACTION_PARAMS(action, 0, 0)
313   double clock = smpi_process()->simulated_elapsed();
314   const unsigned int count_requests = get_reqq_self()->size();
316   if (count_requests>0) {
// VLA (GNU extension) sized by the runtime request count.
317     MPI_Status status[count_requests];
319     int my_proc_id_traced = Actor::self()->getPid();
320     TRACE_smpi_comm_in(my_proc_id_traced, __FUNCTION__,
321                        new simgrid::instr::Pt2PtTIData("waitAll", -1, count_requests, ""));
// Record (src,dst) of each receive BEFORE waitall, which may free the requests.
322     int recvs_snd[count_requests];
323     int recvs_rcv[count_requests];
324     for (unsigned int i = 0; i < count_requests; i++) {
325       const auto& req = (*get_reqq_self())[i];
326       if (req && (req->flags() & RECV)) {
327         recvs_snd[i] = req->src();
328         recvs_rcv[i] = req->dst();
// NOTE(review): the else-branch that sets recvs_snd[i] = -100 for non-receives
// is not visible in this listing; the -100 sentinel test below relies on it.
332     Request::waitall(count_requests, &(*get_reqq_self())[0], status);
334     for (unsigned i = 0; i < count_requests; i++) {
335       if (recvs_snd[i]!=-100)
336         TRACE_smpi_recv(recvs_snd[i], recvs_rcv[i],0);
338     TRACE_smpi_comm_out(my_proc_id_traced);
340   log_timed_action (action, clock);
// Replays a "barrier" action via the selected collective implementation.
343 static void action_barrier(simgrid::xbt::ReplayAction& action)
345   double clock = smpi_process()->simulated_elapsed();
346   int my_proc_id = Actor::self()->getPid();
347   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::NoOpTIData("barrier"));
349   Colls::barrier(MPI_COMM_WORLD);
351   TRACE_smpi_comm_out(my_proc_id);
352   log_timed_action (action, clock);
// Replays "bcast": [proc, bcast, size, [root, [datatype]]]. Root defaults to 0
// and datatype to MPI_DEFAULT_TYPE; the payload is a scratch buffer.
355 static void action_bcast(simgrid::xbt::ReplayAction& action)
357   CHECK_ACTION_PARAMS(action, 1, 2)
358   double size = parse_double(action[2]);
359   double clock = smpi_process()->simulated_elapsed();
360   int root = (action.size() > 3) ? std::stoi(action[3]) : 0;
361   /* Initialize MPI_CURRENT_TYPE in order to decrease the number of the checks */
362   MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 4) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE;
364   int my_proc_id = Actor::self()->getPid();
365   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
366                      new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(root)->getPid(), -1.0, size,
367                                                     -1, Datatype::encode(MPI_CURRENT_TYPE), ""));
369   void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
371   Colls::bcast(sendbuf, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
373   TRACE_smpi_comm_out(my_proc_id);
374   log_timed_action (action, clock);
// Replays "reduce": [proc, reduce, comm_size, comp_size, [root, [datatype]]].
// The reduction itself uses MPI_OP_NULL (data content is irrelevant in replay);
// the trailing comp_size models the local reduction computation.
377 static void action_reduce(simgrid::xbt::ReplayAction& action)
379   CHECK_ACTION_PARAMS(action, 2, 2)
380   double comm_size = parse_double(action[2]);
381   double comp_size = parse_double(action[3]);
382   double clock = smpi_process()->simulated_elapsed();
383   int root = (action.size() > 4) ? std::stoi(action[4]) : 0;
385   MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 5) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE;
387   int my_proc_id = Actor::self()->getPid();
388   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
389                      new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(root)->getPid(), comp_size,
390                                                     comm_size, -1, Datatype::encode(MPI_CURRENT_TYPE), ""));
// NOTE(review): recvbuf is taken from the send-buffer pool here (looks like a
// copy/paste of the sendbuf line) — compare action_allReduce, which uses
// smpi_get_tmp_recvbuffer; confirm whether this is intentional.
392   void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
393   void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
394   Colls::reduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
395   smpi_execute_flops(comp_size);
397   TRACE_smpi_comm_out(my_proc_id);
398   log_timed_action (action, clock);
// Replays "allReduce": [proc, allReduce, comm_size, comp_size, [datatype]].
// Communication uses MPI_OP_NULL scratch buffers; comp_size simulates the
// local reduction work afterwards.
401 static void action_allReduce(simgrid::xbt::ReplayAction& action)
403   CHECK_ACTION_PARAMS(action, 2, 1)
404   double comm_size = parse_double(action[2]);
405   double comp_size = parse_double(action[3]);
407   MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 4) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE;
409   double clock = smpi_process()->simulated_elapsed();
410   int my_proc_id = Actor::self()->getPid();
411   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::CollTIData("allReduce", -1, comp_size, comm_size, -1,
412                                                                               Datatype::encode(MPI_CURRENT_TYPE), ""));
414   void *recvbuf = smpi_get_tmp_recvbuffer(comm_size* MPI_CURRENT_TYPE->size());
415   void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
416   Colls::allreduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
417   smpi_execute_flops(comp_size);
419   TRACE_smpi_comm_out(my_proc_id);
420   log_timed_action (action, clock);
423 static void action_allToAll(simgrid::xbt::ReplayAction& action)
425 CHECK_ACTION_PARAMS(action, 2, 2) //two mandatory (send and recv volumes) and two optional (corresponding datatypes)
426 double clock = smpi_process()->simulated_elapsed();
427 unsigned long comm_size = MPI_COMM_WORLD->size();
428 int send_size = parse_double(action[2]);
429 int recv_size = parse_double(action[3]);
430 MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 5) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE};
431 MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE};
433 void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* MPI_CURRENT_TYPE->size());
434 void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
436 int my_proc_id = Actor::self()->getPid();
437 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
438 new simgrid::instr::CollTIData("allToAll", -1, -1.0, send_size, recv_size,
439 Datatype::encode(MPI_CURRENT_TYPE),
440 Datatype::encode(MPI_CURRENT_TYPE2)));
442 Colls::alltoall(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
444 TRACE_smpi_comm_out(my_proc_id);
445 log_timed_action (action, clock);
448 static void action_gather(simgrid::xbt::ReplayAction& action)
450   /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
453       1) 68 is the sendcounts
454       2) 68 is the recvcounts
455       3) 0 is the root node
456       4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
457       5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
459   CHECK_ACTION_PARAMS(action, 2, 3)
460   double clock = smpi_process()->simulated_elapsed();
461   unsigned long comm_size = MPI_COMM_WORLD->size();
462   int send_size = parse_double(action[2]);
463   int recv_size = parse_double(action[3]);
// Both datatypes are decoded only when BOTH are present (entries beyond index 6).
464   MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE};
465   MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[6]) : MPI_DEFAULT_TYPE};
467   void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
// Only the root needs a receive buffer; non-roots pass nullptr to gather.
468   void *recv = nullptr;
469   int root = (action.size() > 4) ? std::stoi(action[4]) : 0;
470   int rank = MPI_COMM_WORLD->rank();
// NOTE(review): the `if (rank == root)` guard line is dropped in this listing;
// the allocation below only happens on the root.
473     recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
475   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
476                                                                         Datatype::encode(MPI_CURRENT_TYPE),
477                                                                         Datatype::encode(MPI_CURRENT_TYPE2)));
479   Colls::gather(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
481   TRACE_smpi_comm_out(Actor::self()->getPid());
482   log_timed_action (action, clock);
485 static void action_scatter(simgrid::xbt::ReplayAction& action)
487 /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
490 1) 68 is the sendcounts
491 2) 68 is the recvcounts
492 3) 0 is the root node
493 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
494 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
496 CHECK_ACTION_PARAMS(action, 2, 3)
497 double clock = smpi_process()->simulated_elapsed();
498 unsigned long comm_size = MPI_COMM_WORLD->size();
499 int send_size = parse_double(action[2]);
500 int recv_size = parse_double(action[3]);
501 MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE};
502 MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[6]) : MPI_DEFAULT_TYPE};
504 void* send = smpi_get_tmp_sendbuffer(send_size * MPI_CURRENT_TYPE->size());
505 void* recv = nullptr;
506 int root = (action.size() > 4) ? std::stoi(action[4]) : 0;
507 int rank = MPI_COMM_WORLD->rank();
510 recv = smpi_get_tmp_recvbuffer(recv_size * comm_size * MPI_CURRENT_TYPE2->size());
512 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
513 Datatype::encode(MPI_CURRENT_TYPE),
514 Datatype::encode(MPI_CURRENT_TYPE2)));
516 Colls::scatter(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
518 TRACE_smpi_comm_out(Actor::self()->getPid());
519 log_timed_action(action, clock);
522 static void action_gatherv(simgrid::xbt::ReplayAction& action)
524   /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
525        0 gather 68 68 10 10 10 0 0 0
527       1) 68 is the sendcount
528       2) 68 10 10 10 is the recvcounts
529       3) 0 is the root node
530       4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
531       5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
533   double clock = smpi_process()->simulated_elapsed();
534   unsigned long comm_size = MPI_COMM_WORLD->size();
// One recvcount per rank is mandatory, hence comm_size+1 mandatory arguments.
535   CHECK_ACTION_PARAMS(action, comm_size+1, 2)
536   int send_size = parse_double(action[2]);
// Displacements are all zero during replay (contiguous scratch buffer).
537   std::vector<int> disps(comm_size, 0);
// shared_ptr because VarCollTIData keeps a reference for tracing output.
538   std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
540   MPI_Datatype MPI_CURRENT_TYPE =
541       (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
542   MPI_Datatype MPI_CURRENT_TYPE2{
543       (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[5 + comm_size]) : MPI_DEFAULT_TYPE};
545   void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
546   void *recv = nullptr;
547   for (unsigned int i = 0; i < comm_size; i++) {
548     (*recvcounts)[i] = std::stoi(action[i + 3]);
550   int recv_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
552   int root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
553   int rank = MPI_COMM_WORLD->rank();
// NOTE(review): the `if (rank == root)` guard line is dropped in this listing;
// only the root allocates the aggregated receive buffer.
556     recv = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
558   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData(
559                                              "gatherV", root, send_size, nullptr, -1, recvcounts,
560                                              Datatype::encode(MPI_CURRENT_TYPE), Datatype::encode(MPI_CURRENT_TYPE2)));
// NOTE(review): the trailing `MPI_COMM_WORLD);` of this call is dropped in
// this listing.
562   Colls::gatherv(send, send_size, MPI_CURRENT_TYPE, recv, recvcounts->data(), disps.data(), MPI_CURRENT_TYPE2, root,
565   TRACE_smpi_comm_out(Actor::self()->getPid());
566   log_timed_action (action, clock);
569 static void action_scatterv(simgrid::xbt::ReplayAction& action)
571 /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
572 0 gather 68 10 10 10 68 0 0 0
574 1) 68 10 10 10 is the sendcounts
575 2) 68 is the recvcount
576 3) 0 is the root node
577 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
578 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
580 double clock = smpi_process()->simulated_elapsed();
581 unsigned long comm_size = MPI_COMM_WORLD->size();
582 CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
583 int recv_size = parse_double(action[2 + comm_size]);
584 std::vector<int> disps(comm_size, 0);
585 std::shared_ptr<std::vector<int>> sendcounts(new std::vector<int>(comm_size));
587 MPI_Datatype MPI_CURRENT_TYPE =
588 (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
589 MPI_Datatype MPI_CURRENT_TYPE2{
590 (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[5 + comm_size]) : MPI_DEFAULT_TYPE};
592 void* send = nullptr;
593 void* recv = smpi_get_tmp_recvbuffer(recv_size * MPI_CURRENT_TYPE->size());
594 for (unsigned int i = 0; i < comm_size; i++) {
595 (*sendcounts)[i] = std::stoi(action[i + 2]);
597 int send_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
599 int root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
600 int rank = MPI_COMM_WORLD->rank();
603 send = smpi_get_tmp_sendbuffer(send_sum * MPI_CURRENT_TYPE2->size());
605 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData("gatherV", root, -1, sendcounts, recv_size,
606 nullptr, Datatype::encode(MPI_CURRENT_TYPE),
607 Datatype::encode(MPI_CURRENT_TYPE2)));
609 Colls::scatterv(send, sendcounts->data(), disps.data(), MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root,
612 TRACE_smpi_comm_out(Actor::self()->getPid());
613 log_timed_action(action, clock);
616 static void action_reducescatter(simgrid::xbt::ReplayAction& action)
618   /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
619        0 reduceScatter 275427 275427 275427 204020 11346849 0
621       1) The first four values after the name of the action declare the recvcounts array
622       2) The value 11346849 is the amount of instructions
623       3) The last value corresponds to the datatype, see simgrid::smpi::Datatype::decode().
625   double clock = smpi_process()->simulated_elapsed();
626   unsigned long comm_size = MPI_COMM_WORLD->size();
627   CHECK_ACTION_PARAMS(action, comm_size+1, 1)
628   int comp_size = parse_double(action[2+comm_size]);
629   int my_proc_id = Actor::self()->getPid();
// shared_ptr because VarCollTIData keeps a reference for tracing output.
630   std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>);
631   MPI_Datatype MPI_CURRENT_TYPE =
632       (action.size() > 3 + comm_size) ? simgrid::smpi::Datatype::decode(action[3 + comm_size]) : MPI_DEFAULT_TYPE;
634   for (unsigned int i = 0; i < comm_size; i++) {
635     recvcounts->push_back(std::stoi(action[i + 2]));
// Total element count across all ranks, used to size both scratch buffers.
637   int size{std::accumulate(recvcounts->begin(), recvcounts->end(), 0)};
639   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
640                      new simgrid::instr::VarCollTIData("reduceScatter", -1, 0, nullptr, -1, recvcounts,
641                                                        std::to_string(comp_size), /* ugly hack to print comp_size */
642                                                        Datatype::encode(MPI_CURRENT_TYPE)));
644   void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
645   void *recvbuf = smpi_get_tmp_recvbuffer(size* MPI_CURRENT_TYPE->size());
647   Colls::reduce_scatter(sendbuf, recvbuf, recvcounts->data(), MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
// Simulate the local reduction work recorded in the trace.
648   smpi_execute_flops(comp_size);
650   TRACE_smpi_comm_out(my_proc_id);
651   log_timed_action (action, clock);
654 static void action_allgather(simgrid::xbt::ReplayAction& action)
656   /* The structure of the allgather action for the rank 0 (total 4 processes) is the following:
657        0 allGather 275427 275427
659       1) 275427 is the sendcount
660       2) 275427 is the recvcount
661       3) No more values mean that the datatype for sent and receive buffer is the default one, see
662          simgrid::smpi::Datatype::decode().
664   double clock = smpi_process()->simulated_elapsed();
666   CHECK_ACTION_PARAMS(action, 2, 2)
667   int sendcount = std::stoi(action[2]);
668   int recvcount = std::stoi(action[3]);
// NOTE(review): both decodes are guarded by `action.size() > 5`, so a line
// carrying only the send datatype (5 entries) ignores it — same pattern as
// action_allToAll; confirm whether single-datatype lines occur in traces.
670   MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 5) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE};
671   MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE};
673   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
674   void *recvbuf = smpi_get_tmp_recvbuffer(recvcount* MPI_CURRENT_TYPE2->size());
676   int my_proc_id = Actor::self()->getPid();
678   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
679                      new simgrid::instr::CollTIData("allGather", -1, -1.0, sendcount, recvcount,
680                                                     Datatype::encode(MPI_CURRENT_TYPE),
681                                                     Datatype::encode(MPI_CURRENT_TYPE2)));
683   Colls::allgather(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcount, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
685   TRACE_smpi_comm_out(my_proc_id);
686   log_timed_action (action, clock);
689 static void action_allgatherv(simgrid::xbt::ReplayAction& action)
691 /* The structure of the allgatherv action for the rank 0 (total 4 processes) is the following:
692 0 allGatherV 275427 275427 275427 275427 204020
694 1) 275427 is the sendcount
695 2) The next four elements declare the recvcounts array
696 3) No more values mean that the datatype for sent and receive buffer is the default one, see
697 simgrid::smpi::Datatype::decode().
699 double clock = smpi_process()->simulated_elapsed();
701 unsigned long comm_size = MPI_COMM_WORLD->size();
702 CHECK_ACTION_PARAMS(action, comm_size+1, 2)
703 int sendcount = std::stoi(action[2]);
704 std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
705 std::vector<int> disps(comm_size, 0);
707 int datatype_index = 0, disp_index = 0;
708 if (action.size() > 3 + 2 * comm_size) { /* datatype + disp are specified */
709 datatype_index = 3 + comm_size;
710 disp_index = datatype_index + 1;
711 } else if (action.size() > 3 + 2 * comm_size) { /* disps specified; datatype is not specified; use the default one */
713 disp_index = 3 + comm_size;
714 } else if (action.size() > 3 + comm_size) { /* only datatype, no disp specified */
715 datatype_index = 3 + comm_size;
718 if (disp_index != 0) {
719 for (unsigned int i = 0; i < comm_size; i++)
720 disps[i] = std::stoi(action[disp_index + i]);
723 MPI_Datatype MPI_CURRENT_TYPE{(datatype_index > 0) ? simgrid::smpi::Datatype::decode(action[datatype_index])
725 MPI_Datatype MPI_CURRENT_TYPE2{(datatype_index > 0) ? simgrid::smpi::Datatype::decode(action[datatype_index])
728 void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
730 for (unsigned int i = 0; i < comm_size; i++) {
731 (*recvcounts)[i] = std::stoi(action[i + 3]);
733 int recv_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
734 void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
736 int my_proc_id = Actor::self()->getPid();
738 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
739 new simgrid::instr::VarCollTIData("allGatherV", -1, sendcount, nullptr, -1, recvcounts,
740 Datatype::encode(MPI_CURRENT_TYPE),
741 Datatype::encode(MPI_CURRENT_TYPE2)));
743 Colls::allgatherv(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts->data(), disps.data(), MPI_CURRENT_TYPE2,
746 TRACE_smpi_comm_out(my_proc_id);
747 log_timed_action (action, clock);
750 static void action_allToAllv(simgrid::xbt::ReplayAction& action)
752 /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
753 0 allToAllV 100 1 7 10 12 100 1 70 10 5
755 1) 100 is the size of the send buffer *sizeof(int),
756 2) 1 7 10 12 is the sendcounts array
757 3) 100*sizeof(int) is the size of the receiver buffer
758 4) 1 70 10 5 is the recvcounts array
760 double clock = smpi_process()->simulated_elapsed();
762 unsigned long comm_size = MPI_COMM_WORLD->size();
763 CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
764 std::shared_ptr<std::vector<int>> sendcounts(new std::vector<int>(comm_size));
765 std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
766 std::vector<int> senddisps(comm_size, 0);
767 std::vector<int> recvdisps(comm_size, 0);
769 MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 5 + 2 * comm_size)
770 ? simgrid::smpi::Datatype::decode(action[4 + 2 * comm_size])
772 MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5 + 2 * comm_size)
773 ? simgrid::smpi::Datatype::decode(action[5 + 2 * comm_size])
776 int send_buf_size=parse_double(action[2]);
777 int recv_buf_size=parse_double(action[3+comm_size]);
778 int my_proc_id = Actor::self()->getPid();
779 void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* MPI_CURRENT_TYPE->size());
780 void *recvbuf = smpi_get_tmp_recvbuffer(recv_buf_size* MPI_CURRENT_TYPE2->size());
782 for (unsigned int i = 0; i < comm_size; i++) {
783 (*sendcounts)[i] = std::stoi(action[3 + i]);
784 (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
786 int send_size = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
787 int recv_size = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
789 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
790 new simgrid::instr::VarCollTIData("allToAllV", -1, send_size, sendcounts, recv_size, recvcounts,
791 Datatype::encode(MPI_CURRENT_TYPE),
792 Datatype::encode(MPI_CURRENT_TYPE2)));
794 Colls::alltoallv(sendbuf, sendcounts->data(), senddisps.data(), MPI_CURRENT_TYPE, recvbuf, recvcounts->data(),
795 recvdisps.data(), MPI_CURRENT_TYPE, MPI_COMM_WORLD);
797 TRACE_smpi_comm_out(my_proc_id);
798 log_timed_action (action, clock);
801 }} // namespace simgrid::smpi
803 /** @brief Only initialize the replay, don't do it for real */
804 void smpi_replay_init(int* argc, char*** argv)
806   simgrid::smpi::Process::init(argc, argv);
807   smpi_process()->mark_as_initialized();
808   smpi_process()->set_replaying(true);
810   int my_proc_id = Actor::self()->getPid();
811   TRACE_smpi_init(my_proc_id);
812   TRACE_smpi_computing_init(my_proc_id);
// Emit an empty init event so the trace has a well-formed opening record.
813   TRACE_smpi_comm_in(my_proc_id, "smpi_replay_run_init", new simgrid::instr::NoOpTIData("init"));
814   TRACE_smpi_comm_out(my_proc_id);
// Plain-function actions are registered directly...
815   xbt_replay_action_register("init", simgrid::smpi::action_init);
816   xbt_replay_action_register("finalize", simgrid::smpi::action_finalize);
817   xbt_replay_action_register("comm_size", simgrid::smpi::action_comm_size);
818   xbt_replay_action_register("comm_split", simgrid::smpi::action_comm_split);
819   xbt_replay_action_register("comm_dup", simgrid::smpi::action_comm_dup);
// ...while class-based actions are instantiated once and bound via std::bind,
// so all actors share each instance.
821   std::shared_ptr<simgrid::smpi::Replay::SendAction> isend(new simgrid::smpi::Replay::SendAction("Isend"));
822   std::shared_ptr<simgrid::smpi::Replay::SendAction> send(new simgrid::smpi::Replay::SendAction("send"));
823   std::shared_ptr<simgrid::smpi::Replay::RecvAction> irecv(new simgrid::smpi::Replay::RecvAction("Irecv"));
824   std::shared_ptr<simgrid::smpi::Replay::RecvAction> recv(new simgrid::smpi::Replay::RecvAction("recv"));
825   std::shared_ptr<simgrid::smpi::Replay::WaitAction> wait(new simgrid::smpi::Replay::WaitAction());
827   xbt_replay_action_register("send",
828                              std::bind(&simgrid::smpi::Replay::SendAction::execute, send, std::placeholders::_1));
829   xbt_replay_action_register("Isend",
830                              std::bind(&simgrid::smpi::Replay::SendAction::execute, isend, std::placeholders::_1));
831   xbt_replay_action_register("recv",
832                              std::bind(&simgrid::smpi::Replay::RecvAction::execute, recv, std::placeholders::_1));
833   xbt_replay_action_register("Irecv",
834                              std::bind(&simgrid::smpi::Replay::RecvAction::execute, irecv, std::placeholders::_1));
835   xbt_replay_action_register("test", simgrid::smpi::action_test);
836   xbt_replay_action_register("wait",
837                              std::bind(&simgrid::smpi::Replay::WaitAction::execute, wait, std::placeholders::_1));
838   xbt_replay_action_register("waitAll", simgrid::smpi::action_waitall);
839   xbt_replay_action_register("barrier", simgrid::smpi::action_barrier);
840   xbt_replay_action_register("bcast", simgrid::smpi::action_bcast);
841   xbt_replay_action_register("reduce", simgrid::smpi::action_reduce);
842   xbt_replay_action_register("allReduce", simgrid::smpi::action_allReduce);
843   xbt_replay_action_register("allToAll", simgrid::smpi::action_allToAll);
844   xbt_replay_action_register("allToAllV", simgrid::smpi::action_allToAllv);
845   xbt_replay_action_register("gather", simgrid::smpi::action_gather);
846   xbt_replay_action_register("scatter", simgrid::smpi::action_scatter);
847   xbt_replay_action_register("gatherV", simgrid::smpi::action_gatherv);
848   xbt_replay_action_register("scatterV", simgrid::smpi::action_scatterv);
849   xbt_replay_action_register("allGather", simgrid::smpi::action_allgather);
850   xbt_replay_action_register("allGatherV", simgrid::smpi::action_allgatherv);
851   xbt_replay_action_register("reduceScatter", simgrid::smpi::action_reducescatter);
852   xbt_replay_action_register("compute", simgrid::smpi::action_compute);
854   //if we have a delayed start, sleep here.
// NOTE(review): the guard testing the argument count before reading (*argv)[2]
// is dropped in this listing; the two lines below are that branch's body.
856     double value = xbt_str_parse_double((*argv)[2], "%s is not a double");
857     XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
858     smpi_execute_flops(value);
860     //UGLY: force a context switch to be sure that all MSG_processes begin initialization
861     XBT_DEBUG("Force context switch by smpi_execute_flops - Sleeping for 0.0 flops ");
862     smpi_execute_flops(0.0);
866 /** @brief actually run the replay after initialization */
867 void smpi_replay_main(int* argc, char*** argv)
869   simgrid::xbt::replay_runner(*argc, *argv);
871   /* and now, finalize everything */
872   /* One active process will stop. Decrease the counter*/
873   XBT_DEBUG("There are %zu elements in reqq[*]", get_reqq_self()->size());
// Drain any requests the trace left pending before tearing down.
874   if (not get_reqq_self()->empty()) {
875     unsigned int count_requests=get_reqq_self()->size();
876     MPI_Request requests[count_requests];
877     MPI_Status status[count_requests];
// NOTE(review): the loop body copying each req into requests[] is dropped in
// this listing.
880     for (auto const& req : *get_reqq_self()) {
884     simgrid::smpi::Request::waitall(count_requests, requests, status);
// Owned by set_reqq_self(); freed here, once this actor is done replaying.
886   delete get_reqq_self();
889   if(active_processes==0){
890     /* Last process alive speaking: end the simulated timer */
891     XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
892     smpi_free_replay_tmp_buffers();
895   TRACE_smpi_comm_in(Actor::self()->getPid(), "smpi_replay_run_finalize", new simgrid::instr::NoOpTIData("finalize"));
897   smpi_process()->finalize();
899   TRACE_smpi_comm_out(Actor::self()->getPid());
900   TRACE_smpi_finalize(Actor::self()->getPid());
903 /** @brief chain a replay initialization and a replay start */
// Convenience entry point used when no extra work is needed between the
// registration phase (smpi_replay_init) and the trace execution/teardown
// phase (smpi_replay_main).
904 void smpi_replay_run(int* argc, char*** argv)
906   smpi_replay_init(argc, argv);
907   smpi_replay_main(argc, argv);