1 /* Copyright (c) 2009-2017. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
7 #include "smpi_coll.hpp"
8 #include "smpi_comm.hpp"
9 #include "smpi_datatype.hpp"
10 #include "smpi_group.hpp"
11 #include "smpi_process.hpp"
12 #include "smpi_request.hpp"
13 #include "xbt/replay.hpp"
15 #include <boost/algorithm/string/join.hpp>
18 #include <unordered_map>
21 using simgrid::s4u::Actor;
23 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
// Number of replaying actors still running; the last one to finish prints the simulated time.
static int active_processes = 0;
// Per-actor (keyed by pid) list of pending asynchronous requests (Isend/Irecv),
// consumed later by wait/waitAll/test actions.
static std::unordered_map<int, std::vector<MPI_Request>*> reqq;

// Datatype used when a trace line omits the datatype field; chosen in InitAction
// (MPI_DOUBLE for MPE-style traces, MPI_BYTE for TAU-style traces).
static MPI_Datatype MPI_DEFAULT_TYPE;

// Validate the field count of a trace line: the first two fields are always
// <process_id> <action>, followed by `mandatory` required arguments and up to
// `optional` extra ones. Throws arg_error on malformed lines.
#define CHECK_ACTION_PARAMS(action, mandatory, optional) \
if (action.size() < static_cast<unsigned long>(mandatory + 2)) \
THROWF(arg_error, 0, "%s replay failed.\n" \
"%lu items were given on the line. First two should be process_id and action. " \
"This action needs after them %lu mandatory arguments, and accepts %lu optional ones. \n" \
"Please contact the Simgrid team if support is needed", \
__FUNCTION__, action.size(), static_cast<unsigned long>(mandatory), \
static_cast<unsigned long>(optional)); \
// Log the replayed action (joined back into one line) together with the time it
// took to replay it (`clock` is the simulated start time), at verbose level only.
static void log_timed_action(simgrid::xbt::ReplayAction& action, double clock)
  if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
    std::string s = boost::algorithm::join(action, " ");
    XBT_VERB("%s %f", s.c_str(), smpi_process()->simulated_elapsed() - clock);
// Return the pending-request list of the calling actor.
// Note: at() throws std::out_of_range if InitAction has not run for this actor yet.
static std::vector<MPI_Request>* get_reqq_self()
  return reqq.at(Actor::self()->getPid());
54 static void set_reqq_self(std::vector<MPI_Request> *mpi_request)
56 reqq.insert({Actor::self()->getPid(), mpi_request});
60 static double parse_double(std::string string)
62 return xbt_str_parse_double(string.c_str(), "%s is not a double");
// Base class of the per-action argument parsers. The default parse() accepts
// actions carrying no arguments beyond <process_id> <action>.
class ActionArgParser {
  virtual void parse(simgrid::xbt::ReplayAction& action) { CHECK_ACTION_PARAMS(action, 0, 0) }
// Argument parser for point-to-point actions: <partner> <size> [datatype].
class SendRecvParser : public ActionArgParser {
  /* communication partner; if we send, this is the receiver and vice versa */
  MPI_Datatype datatype1 = MPI_DEFAULT_TYPE;

  void parse(simgrid::xbt::ReplayAction& action) override
    CHECK_ACTION_PARAMS(action, 2, 1)
    partner = std::stoi(action[2]);
    size = parse_double(action[3]);
    if (action.size() > 4)  // optional third field: the datatype id
      datatype1 = simgrid::smpi::Datatype::decode(action[4]);
// Argument parser for compute actions: a single mandatory <flops> field.
class ComputeParser : public ActionArgParser {
  /* amount of computation to simulate, in flops */
  void parse(simgrid::xbt::ReplayAction& action) override
    CHECK_ACTION_PARAMS(action, 1, 0)
    flops = parse_double(action[2]);
// Base class of every replayed action. T is the argument-parser type (e.g.
// SendRecvParser); kernel() implements the action itself while execute()
// wraps it with timing and verbose logging.
template <class T> class ReplayAction {
  const std::string name;  // action name as written in the trace (e.g. "send", "Wait")

  explicit ReplayAction(std::string name) : name(name), my_proc_id(simgrid::s4u::Actor::self()->getPid()) {}

  virtual void execute(simgrid::xbt::ReplayAction& action)
    // Needs to be re-initialized for every action, hence here
    double start_time = smpi_process()->simulated_elapsed();
    log_timed_action(action, start_time);

  // Per-action behavior, implemented by each subclass.
  virtual void kernel(simgrid::xbt::ReplayAction& action) = 0;
// Replays a "wait" action: pops the most recently posted pending request of
// this actor and waits for its completion, emitting matching trace events.
class WaitAction : public ReplayAction<ActionArgParser> {
  WaitAction() : ReplayAction("Wait") {}
  void kernel(simgrid::xbt::ReplayAction& action) override
    CHECK_ACTION_PARAMS(action, 0, 0)
    std::string s = boost::algorithm::join(action, " ");
    xbt_assert(get_reqq_self()->size(), "action wait not preceded by any irecv or isend: %s", s.c_str());
    MPI_Request request = get_reqq_self()->back();
    get_reqq_self()->pop_back();

    if (request == nullptr) {
      /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just

    int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;

    // Must be taken before Request::wait() since the request may be set to
    // MPI_REQUEST_NULL by Request::wait!
    int src = request->comm()->group()->rank(request->src());
    int dst = request->comm()->group()->rank(request->dst());
    bool is_wait_for_receive = (request->flags() & RECV);
    // TODO: Here we take the rank while we normally take the process id (look for my_proc_id)
    TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::NoOpTIData("wait"));

    Request::wait(&request, &status);

    TRACE_smpi_comm_out(rank);
    if (is_wait_for_receive)  // only receives produce a visible message arrival in the trace
      TRACE_smpi_recv(src, dst, 0);
// Replays "send" (blocking) and "Isend" (non-blocking; the request is pushed
// onto this actor's list for a later wait/waitAll/test).
class SendAction : public ReplayAction<SendRecvParser> {
  SendAction() = delete;
  SendAction(std::string name) : ReplayAction(name) {}
  void kernel(simgrid::xbt::ReplayAction& action) override
    // Translate the partner's trace-local rank into a process id for tracing.
    int dst_traced = MPI_COMM_WORLD->group()->actor(args.partner)->getPid();

    TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
                                                                                 Datatype::encode(args.datatype1)));
    if (not TRACE_smpi_view_internals())
      TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, args.size * args.datatype1->size());

    if (name == "send") {
      Request::send(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
    } else if (name == "Isend") {
      MPI_Request request = Request::isend(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
      get_reqq_self()->push_back(request);
      // NOTE(review): the `else {` introducing this fallback appears elided in this view — confirm against the full file.
      xbt_die("Don't know this action, %s", name.c_str());

    TRACE_smpi_comm_out(my_proc_id);
// Replays "recv" (blocking) and "Irecv" (non-blocking; request pushed for a later wait).
class RecvAction : public ReplayAction<SendRecvParser> {
  RecvAction() = delete;
  explicit RecvAction(std::string name) : ReplayAction(name) {}
  void kernel(simgrid::xbt::ReplayAction& action) override
    int src_traced = MPI_COMM_WORLD->group()->actor(args.partner)->getPid();

    TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
                                                                                 Datatype::encode(args.datatype1)));

    // unknown size from the receiver point of view
    if (args.size <= 0.0) {
      // Probe the matching send to learn the actual message size before receiving.
      Request::probe(args.partner, 0, MPI_COMM_WORLD, &status);
      args.size = status.count;

    if (name == "recv") {
      Request::recv(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD, &status);
    } else if (name == "Irecv") {
      MPI_Request request = Request::irecv(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
      get_reqq_self()->push_back(request);

    TRACE_smpi_comm_out(my_proc_id);
    // TODO: Check why this was only activated in the "recv" case and not in the "Irecv" case
    if (name == "recv" && not TRACE_smpi_view_internals()) {
      TRACE_smpi_recv(src_traced, my_proc_id, 0);
// Replays a "compute" action by burning args.flops simulated flops.
class ComputeAction : public ReplayAction<ComputeParser> {
  ComputeAction() : ReplayAction("compute") {}
  void kernel(simgrid::xbt::ReplayAction& action) override
    TRACE_smpi_computing_in(my_proc_id, args.flops);
    smpi_execute_flops(args.flops);
    TRACE_smpi_computing_out(my_proc_id);
// Replays a "test" action: pops the latest pending request, tests it, and
// pushes it back so a later wait can still find it.
class TestAction : public ReplayAction<ActionArgParser> {
  TestAction() : ReplayAction("Test") {}
  void kernel(simgrid::xbt::ReplayAction& action) override
    MPI_Request request = get_reqq_self()->back();
    get_reqq_self()->pop_back();
    // if request is null here, this may mean that a previous test has succeeded
    // Different times in traced application and replayed version may lead to this
    // In this case, ignore the extra calls.
    if (request != nullptr) {
      TRACE_smpi_testing_in(my_proc_id);

      int flag = Request::test(&request, &status);

      XBT_DEBUG("MPI_Test result: %d", flag);
      /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now
      get_reqq_self()->push_back(request);

      TRACE_smpi_testing_out(my_proc_id);
// Replays "init": selects the trace's default datatype (MPE traces carry an
// extra field -> MPI_DOUBLE; TAU traces don't -> MPI_BYTE), starts the
// simulated clock, counts active processes, and allocates this actor's
// pending-request list.
class InitAction : public ReplayAction<ActionArgParser> {
  InitAction() : ReplayAction("Init") {}
  void kernel(simgrid::xbt::ReplayAction& action) override
    CHECK_ACTION_PARAMS(action, 0, 1)
    MPI_DEFAULT_TYPE = (action.size() > 2) ? MPI_DOUBLE // default MPE datatype
    : MPI_BYTE; // default TAU datatype

    /* start a simulated timer */
    smpi_process()->simulated_start();
    /*initialize the number of active processes */
    active_processes = smpi_process_count();

    set_reqq_self(new std::vector<MPI_Request>);
277 } // Replay Namespace
// "finalize" handler — NOTE(review): its body is elided in this view; the real
// teardown happens in smpi_replay_main().
static void action_finalize(simgrid::xbt::ReplayAction& action)

// Communicator-manipulation actions are not replayed for real: they only
// log the action with the current simulated time.
static void action_comm_size(simgrid::xbt::ReplayAction& action)
  log_timed_action (action, smpi_process()->simulated_elapsed());

static void action_comm_split(simgrid::xbt::ReplayAction& action)
  log_timed_action (action, smpi_process()->simulated_elapsed());

static void action_comm_dup(simgrid::xbt::ReplayAction& action)
  log_timed_action (action, smpi_process()->simulated_elapsed());
// Replays "waitAll": waits at once on every pending request of this actor,
// then emits a TRACE_smpi_recv event for each completed receive.
static void action_waitall(simgrid::xbt::ReplayAction& action)
  CHECK_ACTION_PARAMS(action, 0, 0)
  double clock = smpi_process()->simulated_elapsed();
  const unsigned int count_requests = get_reqq_self()->size();

  if (count_requests>0) {
    MPI_Status status[count_requests];

    int my_proc_id_traced = Actor::self()->getPid();
    TRACE_smpi_comm_in(my_proc_id_traced, __FUNCTION__,
                       new simgrid::instr::Pt2PtTIData("waitAll", -1, count_requests, ""));
    // Record src/dst of receive requests before waitall invalidates them.
    int recvs_snd[count_requests];
    int recvs_rcv[count_requests];
    for (unsigned int i = 0; i < count_requests; i++) {
      const auto& req = (*get_reqq_self())[i];
      if (req && (req->flags() & RECV)) {
        recvs_snd[i] = req->src();
        recvs_rcv[i] = req->dst();
        // NOTE(review): the else-branch filling non-receive slots with the -100
        // sentinel (tested below) appears elided in this view — confirm it exists.

    Request::waitall(count_requests, &(*get_reqq_self())[0], status);

    for (unsigned i = 0; i < count_requests; i++) {
      if (recvs_snd[i]!=-100)  // -100 marks slots that were not receives
        TRACE_smpi_recv(recvs_snd[i], recvs_rcv[i],0);

    TRACE_smpi_comm_out(my_proc_id_traced);

  log_timed_action (action, clock);
// Replays a "barrier" action over MPI_COMM_WORLD.
static void action_barrier(simgrid::xbt::ReplayAction& action)
  double clock = smpi_process()->simulated_elapsed();
  int my_proc_id = Actor::self()->getPid();
  TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::NoOpTIData("barrier"));

  Colls::barrier(MPI_COMM_WORLD);

  TRACE_smpi_comm_out(my_proc_id);
  log_timed_action (action, clock);
// Replays "bcast": fields are <size> [root] [datatype]; root defaults to 0.
static void action_bcast(simgrid::xbt::ReplayAction& action)
  CHECK_ACTION_PARAMS(action, 1, 2)
  double size = parse_double(action[2]);
  double clock = smpi_process()->simulated_elapsed();
  int root = (action.size() > 3) ? std::stoi(action[3]) : 0;
  /* Initialize MPI_CURRENT_TYPE in order to decrease the number of the checks */
  MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 4) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE;

  int my_proc_id = Actor::self()->getPid();
  TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
                     new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(root)->getPid(), -1.0, size,
                                                    -1, Datatype::encode(MPI_CURRENT_TYPE), ""));

  void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());

  Colls::bcast(sendbuf, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);

  TRACE_smpi_comm_out(my_proc_id);
  log_timed_action (action, clock);
// Replays "reduce": fields are <comm_size> <comp_size> [root] [datatype].
// The reduction operator is not recorded in traces, hence MPI_OP_NULL; the
// root's post-reduction computation is simulated via smpi_execute_flops.
static void action_reduce(simgrid::xbt::ReplayAction& action)
  CHECK_ACTION_PARAMS(action, 2, 2)
  double comm_size = parse_double(action[2]);
  double comp_size = parse_double(action[3]);
  double clock = smpi_process()->simulated_elapsed();
  int root = (action.size() > 4) ? std::stoi(action[4]) : 0;

  MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 5) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE;

  int my_proc_id = Actor::self()->getPid();
  TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
                     new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(root)->getPid(), comp_size,
                                                    comm_size, -1, Datatype::encode(MPI_CURRENT_TYPE), ""));

  void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
  void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
  Colls::reduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
  smpi_execute_flops(comp_size);

  TRACE_smpi_comm_out(my_proc_id);
  log_timed_action (action, clock);
// Replays "allReduce": fields are <comm_size> <comp_size> [datatype].
// As with reduce, the operator is not traced (MPI_OP_NULL) and the local
// computation is simulated separately.
static void action_allReduce(simgrid::xbt::ReplayAction& action)
  CHECK_ACTION_PARAMS(action, 2, 1)
  double comm_size = parse_double(action[2]);
  double comp_size = parse_double(action[3]);

  MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 4) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE;

  double clock = smpi_process()->simulated_elapsed();
  int my_proc_id = Actor::self()->getPid();
  TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::CollTIData("allReduce", -1, comp_size, comm_size, -1,
                                                                              Datatype::encode(MPI_CURRENT_TYPE), ""));

  void *recvbuf = smpi_get_tmp_recvbuffer(comm_size* MPI_CURRENT_TYPE->size());
  void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
  Colls::allreduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
  smpi_execute_flops(comp_size);

  TRACE_smpi_comm_out(my_proc_id);
  log_timed_action (action, clock);
// Replays "allToAll": fields are <send_size> <recv_size> [sendtype recvtype].
static void action_allToAll(simgrid::xbt::ReplayAction& action)
  CHECK_ACTION_PARAMS(action, 2, 2) //two mandatory (send and recv volumes) and two optional (corresponding datatypes)
  double clock = smpi_process()->simulated_elapsed();
  unsigned long comm_size = MPI_COMM_WORLD->size();
  int send_size = parse_double(action[2]);
  int recv_size = parse_double(action[3]);
  // NOTE(review): both guards use `> 5`, so action[4] is only decoded when action[5]
  // is also present (both datatypes must be given together) — confirm intended.
  MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 5) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE};
  MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE};

  void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* MPI_CURRENT_TYPE->size());
  void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());

  int my_proc_id = Actor::self()->getPid();
  TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
                     new simgrid::instr::CollTIData("allToAll", -1, -1.0, send_size, recv_size,
                                                    Datatype::encode(MPI_CURRENT_TYPE),
                                                    Datatype::encode(MPI_CURRENT_TYPE2)));

  Colls::alltoall(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);

  TRACE_smpi_comm_out(my_proc_id);
  log_timed_action (action, clock);
static void action_gather(simgrid::xbt::ReplayAction& action)
  /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
   1) 68 is the sendcounts
   2) 68 is the recvcounts
   3) 0 is the root node
   4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
   5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
  CHECK_ACTION_PARAMS(action, 2, 3)
  double clock = smpi_process()->simulated_elapsed();
  unsigned long comm_size = MPI_COMM_WORLD->size();
  int send_size = parse_double(action[2]);
  int recv_size = parse_double(action[3]);
  // Both datatypes are only decoded when both fields (5 and 6) are present.
  MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE};
  MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[6]) : MPI_DEFAULT_TYPE};

  void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
  void *recv = nullptr;
  int root = (action.size() > 4) ? std::stoi(action[4]) : 0;
  int rank = MPI_COMM_WORLD->rank();

  // Only the root needs a receive buffer (guard appears elided in this view).
  recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());

  TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
                                                                        Datatype::encode(MPI_CURRENT_TYPE),
                                                                        Datatype::encode(MPI_CURRENT_TYPE2)));

  Colls::gather(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);

  TRACE_smpi_comm_out(Actor::self()->getPid());
  log_timed_action (action, clock);
474 static void action_scatter(simgrid::xbt::ReplayAction& action)
476 /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
479 1) 68 is the sendcounts
480 2) 68 is the recvcounts
481 3) 0 is the root node
482 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
483 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
485 CHECK_ACTION_PARAMS(action, 2, 3)
486 double clock = smpi_process()->simulated_elapsed();
487 unsigned long comm_size = MPI_COMM_WORLD->size();
488 int send_size = parse_double(action[2]);
489 int recv_size = parse_double(action[3]);
490 MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE};
491 MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[6]) : MPI_DEFAULT_TYPE};
493 void* send = smpi_get_tmp_sendbuffer(send_size * MPI_CURRENT_TYPE->size());
494 void* recv = nullptr;
495 int root = (action.size() > 4) ? std::stoi(action[4]) : 0;
496 int rank = MPI_COMM_WORLD->rank();
499 recv = smpi_get_tmp_recvbuffer(recv_size * comm_size * MPI_CURRENT_TYPE2->size());
501 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
502 Datatype::encode(MPI_CURRENT_TYPE),
503 Datatype::encode(MPI_CURRENT_TYPE2)));
505 Colls::scatter(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
507 TRACE_smpi_comm_out(Actor::self()->getPid());
508 log_timed_action(action, clock);
static void action_gatherv(simgrid::xbt::ReplayAction& action)
  /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
   0 gather 68 68 10 10 10 0 0 0
   1) 68 is the sendcount
   2) 68 10 10 10 is the recvcounts
   3) 0 is the root node
   4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
   5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
  double clock = smpi_process()->simulated_elapsed();
  unsigned long comm_size = MPI_COMM_WORLD->size();
  CHECK_ACTION_PARAMS(action, comm_size+1, 2)
  int send_size = parse_double(action[2]);
  std::vector<int> disps(comm_size, 0);  // displacements are not traced; all zero
  std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));

  // Both datatypes must be present (fields 4+comm_size and 5+comm_size) to be decoded.
  MPI_Datatype MPI_CURRENT_TYPE =
      (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
  MPI_Datatype MPI_CURRENT_TYPE2{
      (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[5 + comm_size]) : MPI_DEFAULT_TYPE};

  void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
  void *recv = nullptr;
  for (unsigned int i = 0; i < comm_size; i++) {
    (*recvcounts)[i] = std::stoi(action[i + 3]);

  int recv_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);

  int root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
  int rank = MPI_COMM_WORLD->rank();

  // Only the root needs a receive buffer (guard appears elided in this view).
  recv = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());

  TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData(
                                             "gatherV", root, send_size, nullptr, -1, recvcounts,
                                             Datatype::encode(MPI_CURRENT_TYPE), Datatype::encode(MPI_CURRENT_TYPE2)));

  Colls::gatherv(send, send_size, MPI_CURRENT_TYPE, recv, recvcounts->data(), disps.data(), MPI_CURRENT_TYPE2, root,

  TRACE_smpi_comm_out(Actor::self()->getPid());
  log_timed_action (action, clock);
558 static void action_scatterv(simgrid::xbt::ReplayAction& action)
560 /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
561 0 gather 68 10 10 10 68 0 0 0
563 1) 68 10 10 10 is the sendcounts
564 2) 68 is the recvcount
565 3) 0 is the root node
566 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
567 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
569 double clock = smpi_process()->simulated_elapsed();
570 unsigned long comm_size = MPI_COMM_WORLD->size();
571 CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
572 int recv_size = parse_double(action[2 + comm_size]);
573 std::vector<int> disps(comm_size, 0);
574 std::shared_ptr<std::vector<int>> sendcounts(new std::vector<int>(comm_size));
576 MPI_Datatype MPI_CURRENT_TYPE =
577 (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
578 MPI_Datatype MPI_CURRENT_TYPE2{
579 (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[5 + comm_size]) : MPI_DEFAULT_TYPE};
581 void* send = nullptr;
582 void* recv = smpi_get_tmp_recvbuffer(recv_size * MPI_CURRENT_TYPE->size());
583 for (unsigned int i = 0; i < comm_size; i++) {
584 (*sendcounts)[i] = std::stoi(action[i + 2]);
586 int send_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
588 int root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
589 int rank = MPI_COMM_WORLD->rank();
592 send = smpi_get_tmp_sendbuffer(send_sum * MPI_CURRENT_TYPE2->size());
594 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData("gatherV", root, -1, sendcounts, recv_size,
595 nullptr, Datatype::encode(MPI_CURRENT_TYPE),
596 Datatype::encode(MPI_CURRENT_TYPE2)));
598 Colls::scatterv(send, sendcounts->data(), disps.data(), MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root,
601 TRACE_smpi_comm_out(Actor::self()->getPid());
602 log_timed_action(action, clock);
static void action_reducescatter(simgrid::xbt::ReplayAction& action)
  /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
   0 reduceScatter 275427 275427 275427 204020 11346849 0
   1) The first four values after the name of the action declare the recvcounts array
   2) The value 11346849 is the amount of instructions
   3) The last value corresponds to the datatype, see simgrid::smpi::Datatype::decode().
  double clock = smpi_process()->simulated_elapsed();
  unsigned long comm_size = MPI_COMM_WORLD->size();
  CHECK_ACTION_PARAMS(action, comm_size+1, 1)
  int comp_size = parse_double(action[2+comm_size]);
  int my_proc_id = Actor::self()->getPid();
  std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>);
  MPI_Datatype MPI_CURRENT_TYPE =
      (action.size() > 3 + comm_size) ? simgrid::smpi::Datatype::decode(action[3 + comm_size]) : MPI_DEFAULT_TYPE;

  for (unsigned int i = 0; i < comm_size; i++) {
    recvcounts->push_back(std::stoi(action[i + 2]));
  // Total element count: buffers are sized for the whole reduce part.
  int size{std::accumulate(recvcounts->begin(), recvcounts->end(), 0)};

  TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
                     new simgrid::instr::VarCollTIData("reduceScatter", -1, 0, nullptr, -1, recvcounts,
                                                       std::to_string(comp_size), /* ugly hack to print comp_size */
                                                       Datatype::encode(MPI_CURRENT_TYPE)));

  void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
  void *recvbuf = smpi_get_tmp_recvbuffer(size* MPI_CURRENT_TYPE->size());

  Colls::reduce_scatter(sendbuf, recvbuf, recvcounts->data(), MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
  smpi_execute_flops(comp_size);

  TRACE_smpi_comm_out(my_proc_id);
  log_timed_action (action, clock);
static void action_allgather(simgrid::xbt::ReplayAction& action)
  /* The structure of the allgather action for the rank 0 (total 4 processes) is the following:
   0 allGather 275427 275427
   1) 275427 is the sendcount
   2) 275427 is the recvcount
   3) No more values mean that the datatype for sent and receive buffer is the default one, see
   simgrid::smpi::Datatype::decode().
  double clock = smpi_process()->simulated_elapsed();

  CHECK_ACTION_PARAMS(action, 2, 2)
  int sendcount = std::stoi(action[2]);
  int recvcount = std::stoi(action[3]);

  // Both datatypes are only decoded when both fields 4 and 5 are present.
  MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 5) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE};
  MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE};

  void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
  void *recvbuf = smpi_get_tmp_recvbuffer(recvcount* MPI_CURRENT_TYPE2->size());

  int my_proc_id = Actor::self()->getPid();

  TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
                     new simgrid::instr::CollTIData("allGather", -1, -1.0, sendcount, recvcount,
                                                    Datatype::encode(MPI_CURRENT_TYPE),
                                                    Datatype::encode(MPI_CURRENT_TYPE2)));

  Colls::allgather(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcount, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);

  TRACE_smpi_comm_out(my_proc_id);
  log_timed_action (action, clock);
678 static void action_allgatherv(simgrid::xbt::ReplayAction& action)
680 /* The structure of the allgatherv action for the rank 0 (total 4 processes) is the following:
681 0 allGatherV 275427 275427 275427 275427 204020
683 1) 275427 is the sendcount
684 2) The next four elements declare the recvcounts array
685 3) No more values mean that the datatype for sent and receive buffer is the default one, see
686 simgrid::smpi::Datatype::decode().
688 double clock = smpi_process()->simulated_elapsed();
690 unsigned long comm_size = MPI_COMM_WORLD->size();
691 CHECK_ACTION_PARAMS(action, comm_size+1, 2)
692 int sendcount = std::stoi(action[2]);
693 std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
694 std::vector<int> disps(comm_size, 0);
696 int datatype_index = 0, disp_index = 0;
697 if (action.size() > 3 + 2 * comm_size) { /* datatype + disp are specified */
698 datatype_index = 3 + comm_size;
699 disp_index = datatype_index + 1;
700 } else if (action.size() > 3 + 2 * comm_size) { /* disps specified; datatype is not specified; use the default one */
702 disp_index = 3 + comm_size;
703 } else if (action.size() > 3 + comm_size) { /* only datatype, no disp specified */
704 datatype_index = 3 + comm_size;
707 if (disp_index != 0) {
708 for (unsigned int i = 0; i < comm_size; i++)
709 disps[i] = std::stoi(action[disp_index + i]);
712 MPI_Datatype MPI_CURRENT_TYPE{(datatype_index > 0) ? simgrid::smpi::Datatype::decode(action[datatype_index])
714 MPI_Datatype MPI_CURRENT_TYPE2{(datatype_index > 0) ? simgrid::smpi::Datatype::decode(action[datatype_index])
717 void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
719 for (unsigned int i = 0; i < comm_size; i++) {
720 (*recvcounts)[i] = std::stoi(action[i + 3]);
722 int recv_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
723 void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
725 int my_proc_id = Actor::self()->getPid();
727 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
728 new simgrid::instr::VarCollTIData("allGatherV", -1, sendcount, nullptr, -1, recvcounts,
729 Datatype::encode(MPI_CURRENT_TYPE),
730 Datatype::encode(MPI_CURRENT_TYPE2)));
732 Colls::allgatherv(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts->data(), disps.data(), MPI_CURRENT_TYPE2,
735 TRACE_smpi_comm_out(my_proc_id);
736 log_timed_action (action, clock);
739 static void action_allToAllv(simgrid::xbt::ReplayAction& action)
741 /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
742 0 allToAllV 100 1 7 10 12 100 1 70 10 5
744 1) 100 is the size of the send buffer *sizeof(int),
745 2) 1 7 10 12 is the sendcounts array
746 3) 100*sizeof(int) is the size of the receiver buffer
747 4) 1 70 10 5 is the recvcounts array
749 double clock = smpi_process()->simulated_elapsed();
751 unsigned long comm_size = MPI_COMM_WORLD->size();
752 CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
753 std::shared_ptr<std::vector<int>> sendcounts(new std::vector<int>(comm_size));
754 std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
755 std::vector<int> senddisps(comm_size, 0);
756 std::vector<int> recvdisps(comm_size, 0);
758 MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 5 + 2 * comm_size)
759 ? simgrid::smpi::Datatype::decode(action[4 + 2 * comm_size])
761 MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5 + 2 * comm_size)
762 ? simgrid::smpi::Datatype::decode(action[5 + 2 * comm_size])
765 int send_buf_size=parse_double(action[2]);
766 int recv_buf_size=parse_double(action[3+comm_size]);
767 int my_proc_id = Actor::self()->getPid();
768 void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* MPI_CURRENT_TYPE->size());
769 void *recvbuf = smpi_get_tmp_recvbuffer(recv_buf_size* MPI_CURRENT_TYPE2->size());
771 for (unsigned int i = 0; i < comm_size; i++) {
772 (*sendcounts)[i] = std::stoi(action[3 + i]);
773 (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
775 int send_size = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
776 int recv_size = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
778 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
779 new simgrid::instr::VarCollTIData("allToAllV", -1, send_size, sendcounts, recv_size, recvcounts,
780 Datatype::encode(MPI_CURRENT_TYPE),
781 Datatype::encode(MPI_CURRENT_TYPE2)));
783 Colls::alltoallv(sendbuf, sendcounts->data(), senddisps.data(), MPI_CURRENT_TYPE, recvbuf, recvcounts->data(),
784 recvdisps.data(), MPI_CURRENT_TYPE, MPI_COMM_WORLD);
786 TRACE_smpi_comm_out(my_proc_id);
787 log_timed_action (action, clock);
790 }} // namespace simgrid::smpi
/** @brief Only initialize the replay, don't do it for real
 *
 * Marks the process as initialized/replaying, emits the init trace events,
 * registers one handler per trace action name, and honors an optional delayed
 * start (extra argv value, consumed as flops) before forcing a context switch
 * so every process finishes its own initialization.
 */
void smpi_replay_init(int* argc, char*** argv)
  simgrid::smpi::Process::init(argc, argv);
  smpi_process()->mark_as_initialized();
  smpi_process()->set_replaying(true);

  int my_proc_id = Actor::self()->getPid();
  TRACE_smpi_init(my_proc_id);
  TRACE_smpi_computing_init(my_proc_id);
  TRACE_smpi_comm_in(my_proc_id, "smpi_replay_run_init", new simgrid::instr::NoOpTIData("init"));
  TRACE_smpi_comm_out(my_proc_id);
  xbt_replay_action_register("init", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::InitAction().execute(action); });
  xbt_replay_action_register("finalize", simgrid::smpi::action_finalize);
  xbt_replay_action_register("comm_size", simgrid::smpi::action_comm_size);
  xbt_replay_action_register("comm_split", simgrid::smpi::action_comm_split);
  xbt_replay_action_register("comm_dup", simgrid::smpi::action_comm_dup);

  xbt_replay_action_register("send", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::SendAction("send").execute(action); });
  xbt_replay_action_register("Isend", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::SendAction("Isend").execute(action); });
  xbt_replay_action_register("recv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::RecvAction("recv").execute(action); });
  xbt_replay_action_register("Irecv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::RecvAction("Irecv").execute(action); });
  xbt_replay_action_register("test", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::TestAction().execute(action); });
  xbt_replay_action_register("wait", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::WaitAction().execute(action); });
  xbt_replay_action_register("waitAll", simgrid::smpi::action_waitall);
  xbt_replay_action_register("barrier", simgrid::smpi::action_barrier);
  xbt_replay_action_register("bcast", simgrid::smpi::action_bcast);
  xbt_replay_action_register("reduce", simgrid::smpi::action_reduce);
  xbt_replay_action_register("allReduce", simgrid::smpi::action_allReduce);
  xbt_replay_action_register("allToAll", simgrid::smpi::action_allToAll);
  xbt_replay_action_register("allToAllV", simgrid::smpi::action_allToAllv);
  xbt_replay_action_register("gather", simgrid::smpi::action_gather);
  xbt_replay_action_register("scatter", simgrid::smpi::action_scatter);
  xbt_replay_action_register("gatherV", simgrid::smpi::action_gatherv);
  xbt_replay_action_register("scatterV", simgrid::smpi::action_scatterv);
  xbt_replay_action_register("allGather", simgrid::smpi::action_allgather);
  xbt_replay_action_register("allGatherV", simgrid::smpi::action_allgatherv);
  xbt_replay_action_register("reduceScatter", simgrid::smpi::action_reducescatter);
  xbt_replay_action_register("compute", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::ComputeAction().execute(action); });

  //if we have a delayed start, sleep here.
  // NOTE(review): the argc guard around this read of (*argv)[2] is elided in this view — confirm it exists.
  double value = xbt_str_parse_double((*argv)[2], "%s is not a double");
  XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
  smpi_execute_flops(value);

  //UGLY: force a context switch to be sure that all MSG_processes begin initialization
  XBT_DEBUG("Force context switch by smpi_execute_flops - Sleeping for 0.0 flops ");
  smpi_execute_flops(0.0);
/** @brief actually run the replay after initialization
 *
 * Runs the replay loop for this process, then drains any leftover pending
 * requests, frees this actor's request list, and — for the last process
 * alive — prints the simulated time and releases the shared temp buffers.
 */
void smpi_replay_main(int* argc, char*** argv)
  simgrid::xbt::replay_runner(*argc, *argv);

  /* and now, finalize everything */
  /* One active process will stop. Decrease the counter*/
  XBT_DEBUG("There are %zu elements in reqq[*]", get_reqq_self()->size());
  if (not get_reqq_self()->empty()) {
    unsigned int count_requests=get_reqq_self()->size();
    MPI_Request requests[count_requests];
    MPI_Status status[count_requests];

    // Copy the remaining requests into a flat array and wait on all of them
    // (loop body partially elided in this view).
    for (auto const& req : *get_reqq_self()) {

    simgrid::smpi::Request::waitall(count_requests, requests, status);

  delete get_reqq_self();

  if(active_processes==0){
    /* Last process alive speaking: end the simulated timer */
    XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
    smpi_free_replay_tmp_buffers();

  TRACE_smpi_comm_in(Actor::self()->getPid(), "smpi_replay_run_finalize", new simgrid::instr::NoOpTIData("finalize"));

  smpi_process()->finalize();

  TRACE_smpi_comm_out(Actor::self()->getPid());
  TRACE_smpi_finalize(Actor::self()->getPid());
/** @brief chain a replay initialization and a replay start */
void smpi_replay_run(int* argc, char*** argv)
  smpi_replay_init(argc, argv);
  smpi_replay_main(argc, argv);