1 /* Copyright (c) 2009-2017. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
7 #include "smpi_coll.hpp"
8 #include "smpi_comm.hpp"
9 #include "smpi_datatype.hpp"
10 #include "smpi_group.hpp"
11 #include "smpi_process.hpp"
12 #include "smpi_request.hpp"
13 #include "xbt/replay.hpp"
15 #include <boost/algorithm/string/join.hpp>
18 #include <unordered_map>
21 using simgrid::s4u::Actor;
23 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
// Number of replaying processes that have not yet finalized; the last one
// (counter == 0) prints the simulation time and frees the shared buffers.
static int active_processes = 0;
// Per-actor (keyed by PID) vector of outstanding asynchronous requests,
// pushed by Isend/Irecv and consumed by wait/waitAll/test actions.
static std::unordered_map<int, std::vector<MPI_Request>*> reqq;

// Datatype used when a trace line does not specify one; set by InitAction
// (MPI_DOUBLE for MPE traces, MPI_BYTE for TAU traces).
static MPI_Datatype MPI_DEFAULT_TYPE;
/* Sanity-check a parsed trace line: action[0] is the process id and action[1]
 * the action name, so a well-formed line carries at least (mandatory + 2)
 * tokens; otherwise throw an arg_error describing what was expected.
 * (No comments inside the body: a // before a trailing backslash would
 * swallow the line continuation.) */
#define CHECK_ACTION_PARAMS(action, mandatory, optional)                                                               \
  if (action.size() < static_cast<unsigned long>(mandatory + 2))                                                       \
    THROWF(arg_error, 0, "%s replay failed.\n"                                                                         \
                         "%lu items were given on the line. First two should be process_id and action. "               \
                         "This action needs after them %lu mandatory arguments, and accepts %lu optional ones. \n"     \
                         "Please contact the Simgrid team if support is needed",                                       \
           __FUNCTION__, action.size(), static_cast<unsigned long>(mandatory),                                         \
           static_cast<unsigned long>(optional));                                                                      \
/* Log the replayed action (verbose level) followed by the simulated time it
 * took, i.e. current simulated time minus the start clock given by the caller. */
static void log_timed_action(simgrid::xbt::ReplayAction& action, double clock)
  if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
    // Re-join the tokenized action line for display.
    std::string s = boost::algorithm::join(action, " ");
    XBT_VERB("%s %f", s.c_str(), smpi_process()->simulated_elapsed() - clock);
/* Pending asynchronous requests of the calling actor.
 * Note: at() throws if InitAction has not registered a vector for this PID yet. */
static std::vector<MPI_Request>* get_reqq_self()
  return reqq.at(Actor::self()->getPid());
54 static void set_reqq_self(std::vector<MPI_Request> *mpi_request)
56 reqq.insert({Actor::self()->getPid(), mpi_request});
60 static double parse_double(std::string string)
62 return xbt_str_parse_double(string.c_str(), "%s is not a double");
/* Base class for per-action argument parsing. The default parse() accepts an
 * action with no arguments beyond the leading process id and action name. */
class ActionArgParser {
  virtual void parse(simgrid::xbt::ReplayAction& action) { CHECK_ACTION_PARAMS(action, 0, 0) }
/* Argument parser for point-to-point actions: <partner> <size> [datatype]. */
class SendRecvParser : public ActionArgParser {
  /* communication partner; if we send, this is the receiver and vice versa */

  MPI_Datatype datatype1 = MPI_DEFAULT_TYPE;

  void parse(simgrid::xbt::ReplayAction& action) override
    CHECK_ACTION_PARAMS(action, 2, 1)
    partner = std::stoi(action[2]);
    size = parse_double(action[3]);
    // Optional trailing argument: the datatype id (otherwise MPI_DEFAULT_TYPE).
    if (action.size() > 4)
      datatype1 = simgrid::smpi::Datatype::decode(action[4]);
/* Argument parser for compute actions: a single flop amount. */
class ComputeParser : public ActionArgParser {
  /* amount of computation to simulate, in flops (parsed from action[2]) */

  void parse(simgrid::xbt::ReplayAction& action) override
    CHECK_ACTION_PARAMS(action, 1, 0)
    flops = parse_double(action[2]);
/* Base class of all replay actions. T is the argument-parser type; execute()
 * records the start time, runs the subclass-specific kernel() and logs the
 * elapsed simulated time. */
template <class T> class ReplayAction {
  const std::string name;

  explicit ReplayAction(std::string name) : name(name), my_proc_id(simgrid::s4u::Actor::self()->getPid()) {}

  virtual void execute(simgrid::xbt::ReplayAction& action)
    // Needs to be re-initialized for every action, hence here
    double start_time = smpi_process()->simulated_elapsed();

    log_timed_action(action, start_time);

  // Action-specific behavior, implemented by each concrete subclass.
  virtual void kernel(simgrid::xbt::ReplayAction& action) = 0;
/* Replays a "wait" action: completes the most recently posted Isend/Irecv. */
class WaitAction : public ReplayAction<ActionArgParser> {
  WaitAction() : ReplayAction("Wait") {}
  void kernel(simgrid::xbt::ReplayAction& action) override
    CHECK_ACTION_PARAMS(action, 0, 0)

    std::string s = boost::algorithm::join(action, " ");
    xbt_assert(get_reqq_self()->size(), "action wait not preceded by any irecv or isend: %s", s.c_str());
    // LIFO: wait matches the latest pending request of this actor.
    MPI_Request request = get_reqq_self()->back();
    get_reqq_self()->pop_back();

    if (request == nullptr) {
      /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just
       * return. */

    int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;

    // Must be taken before Request::wait() since the request may be set to
    // MPI_REQUEST_NULL by Request::wait!
    int src = request->comm()->group()->rank(request->src());
    int dst = request->comm()->group()->rank(request->dst());
    bool is_wait_for_receive = (request->flags() & RECV);
    // TODO: Here we take the rank while we normally take the process id (look for my_proc_id)
    TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::NoOpTIData("wait"));

    Request::wait(&request, &status);

    TRACE_smpi_comm_out(rank);
    // Trace the matching receive only after the wait actually completed it.
    if (is_wait_for_receive)
      TRACE_smpi_recv(src, dst, 0);
/* Replays "send" (blocking) and "Isend" (asynchronous) actions towards
 * args.partner; payload buffers are not simulated (nullptr). */
class SendAction : public ReplayAction<SendRecvParser> {
  SendAction() = delete;
  SendAction(std::string name) : ReplayAction(name) {}
  void kernel(simgrid::xbt::ReplayAction& action) override
    int dst_traced = MPI_COMM_WORLD->group()->actor(args.partner)->getPid();

    TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
                                                                                 Datatype::encode(args.datatype1)));
    if (not TRACE_smpi_view_internals())
      TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, args.size * args.datatype1->size());

    if (name == "send") {
      Request::send(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
    } else if (name == "Isend") {
      MPI_Request request = Request::isend(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
      // Remember the request so a later wait/waitAll/test action can complete it.
      get_reqq_self()->push_back(request);
      xbt_die("Don't know this action, %s", name.c_str());

    TRACE_smpi_comm_out(my_proc_id);
/* Replays "recv" (blocking) and "Irecv" (asynchronous) actions from
 * args.partner; payload buffers are not simulated (nullptr). */
class RecvAction : public ReplayAction<SendRecvParser> {
  RecvAction() = delete;
  explicit RecvAction(std::string name) : ReplayAction(name) {}
  void kernel(simgrid::xbt::ReplayAction& action) override
    int src_traced = MPI_COMM_WORLD->group()->actor(args.partner)->getPid();

    TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
                                                                                 Datatype::encode(args.datatype1)));

    // unknown size from the receiver point of view
    if (args.size <= 0.0) {
      // Probe the matching message to learn the actual size before receiving.
      Request::probe(args.partner, 0, MPI_COMM_WORLD, &status);
      args.size = status.count;

    if (name == "recv") {
      Request::recv(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD, &status);
    } else if (name == "Irecv") {
      MPI_Request request = Request::irecv(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
      get_reqq_self()->push_back(request);

    TRACE_smpi_comm_out(my_proc_id);
    // TODO: Check why this was only activated in the "recv" case and not in the "Irecv" case
    if (name == "recv" && not TRACE_smpi_view_internals()) {
      TRACE_smpi_recv(src_traced, my_proc_id, 0);
/* Replays a "compute" action: burn args.flops of simulated computation. */
class ComputeAction : public ReplayAction<ComputeParser> {
  ComputeAction() : ReplayAction("compute") {}
  void kernel(simgrid::xbt::ReplayAction& action) override
    TRACE_smpi_computing_in(my_proc_id, args.flops);
    smpi_execute_flops(args.flops);
    TRACE_smpi_computing_out(my_proc_id);
/* Replays a "test" action: non-blocking completion check of the most
 * recently posted request. */
class TestAction : public ReplayAction<ActionArgParser> {
  TestAction() : ReplayAction("Test") {}
  void kernel(simgrid::xbt::ReplayAction& action) override
    MPI_Request request = get_reqq_self()->back();
    get_reqq_self()->pop_back();
    // if request is null here, this may mean that a previous test has succeeded
    // Different times in traced application and replayed version may lead to this
    // In this case, ignore the extra calls.
    if (request != nullptr) {
      TRACE_smpi_testing_in(my_proc_id);

      int flag = Request::test(&request, &status);

      XBT_DEBUG("MPI_Test result: %d", flag);
      /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now
       * nullptr. */
      get_reqq_self()->push_back(request);

      TRACE_smpi_testing_out(my_proc_id);
/* Replays the "init" action: selects the default datatype from the trace
 * flavor, starts the simulated clock and registers this actor's request list. */
class InitAction : public ReplayAction<ActionArgParser> {
  InitAction() : ReplayAction("Init") {}
  void kernel(simgrid::xbt::ReplayAction& action) override
    CHECK_ACTION_PARAMS(action, 0, 1)
    MPI_DEFAULT_TYPE = (action.size() > 2) ? MPI_DOUBLE // default MPE datatype
                                           : MPI_BYTE;  // default TAU datatype

    /* start a simulated timer */
    smpi_process()->simulated_start();
    /*initialize the number of active processes */
    active_processes = smpi_process_count();

    set_reqq_self(new std::vector<MPI_Request>);
/* Handler for communicator-management actions (comm_size/comm_split/comm_dup):
 * these carry no simulation cost, so the kernel is intentionally empty. */
class CommunicatorAction : public ReplayAction<ActionArgParser> {
  CommunicatorAction() : ReplayAction("Comm") {}
  void kernel(simgrid::xbt::ReplayAction& action) override { /* nothing to do */}
283 class WaitAllAction : public ReplayAction<ActionArgParser> {
285 WaitAllAction() : ReplayAction("waitAll") {}
286 void kernel(simgrid::xbt::ReplayAction& action) override
288 const unsigned int count_requests = get_reqq_self()->size();
290 if (count_requests > 0) {
291 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
292 new simgrid::instr::Pt2PtTIData("waitAll", -1, count_requests, ""));
293 std::vector<std::pair</*sender*/int,/*recv*/int>> sender_receiver;
294 for (const auto& req : (*get_reqq_self())) {
295 if (req && (req->flags() & RECV)) {
296 sender_receiver.push_back({req->src(), req->dst()});
299 MPI_Status status[count_requests];
300 Request::waitall(count_requests, &(*get_reqq_self())[0], status);
302 for (auto& pair : sender_receiver) {
303 TRACE_smpi_recv(pair.first, pair.second, 0);
305 TRACE_smpi_comm_out(my_proc_id);
310 } // Replay Namespace
/* Replays a "barrier" action on MPI_COMM_WORLD. */
static void action_barrier(simgrid::xbt::ReplayAction& action)
  double clock = smpi_process()->simulated_elapsed();
  int my_proc_id = Actor::self()->getPid();
  TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::NoOpTIData("barrier"));

  Colls::barrier(MPI_COMM_WORLD);

  TRACE_smpi_comm_out(my_proc_id);
  log_timed_action (action, clock);
/* Replays a "bcast" action: <size> [root] [datatype]. root defaults to 0. */
static void action_bcast(simgrid::xbt::ReplayAction& action)
  CHECK_ACTION_PARAMS(action, 1, 2)
  double size = parse_double(action[2]);
  double clock = smpi_process()->simulated_elapsed();
  int root = (action.size() > 3) ? std::stoi(action[3]) : 0;
  /* Initialize MPI_CURRENT_TYPE in order to decrease the number of the checks */
  MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 4) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE;

  int my_proc_id = Actor::self()->getPid();
  TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
                     new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(root)->getPid(), -1.0, size,
                                                    -1, Datatype::encode(MPI_CURRENT_TYPE), ""));

  void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());

  Colls::bcast(sendbuf, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);

  TRACE_smpi_comm_out(my_proc_id);
  log_timed_action (action, clock);
/* Replays a "reduce" action: <comm_size> <comp_size> [root] [datatype].
 * Only communication volume matters here, hence MPI_OP_NULL as the operator;
 * the computation part is simulated separately via smpi_execute_flops. */
static void action_reduce(simgrid::xbt::ReplayAction& action)
  CHECK_ACTION_PARAMS(action, 2, 2)
  double comm_size = parse_double(action[2]);
  double comp_size = parse_double(action[3]);
  double clock = smpi_process()->simulated_elapsed();
  int root = (action.size() > 4) ? std::stoi(action[4]) : 0;

  MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 5) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE;

  int my_proc_id = Actor::self()->getPid();
  TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
                     new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(root)->getPid(), comp_size,
                                                    comm_size, -1, Datatype::encode(MPI_CURRENT_TYPE), ""));

  void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
  void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
  Colls::reduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
  smpi_execute_flops(comp_size);

  TRACE_smpi_comm_out(my_proc_id);
  log_timed_action (action, clock);
/* Replays an "allReduce" action: <comm_size> <comp_size> [datatype].
 * As for reduce, MPI_OP_NULL is used since only the traffic is simulated. */
static void action_allReduce(simgrid::xbt::ReplayAction& action)
  CHECK_ACTION_PARAMS(action, 2, 1)
  double comm_size = parse_double(action[2]);
  double comp_size = parse_double(action[3]);

  MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 4) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE;

  double clock = smpi_process()->simulated_elapsed();
  int my_proc_id = Actor::self()->getPid();
  TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::CollTIData("allReduce", -1, comp_size, comm_size, -1,
                                                                              Datatype::encode(MPI_CURRENT_TYPE), ""));

  void *recvbuf = smpi_get_tmp_recvbuffer(comm_size* MPI_CURRENT_TYPE->size());
  void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
  Colls::allreduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
  smpi_execute_flops(comp_size);

  TRACE_smpi_comm_out(my_proc_id);
  log_timed_action (action, clock);
/* Replays an "allToAll" action: <send_size> <recv_size> [sendtype] [recvtype]. */
static void action_allToAll(simgrid::xbt::ReplayAction& action)
  CHECK_ACTION_PARAMS(action, 2, 2) //two mandatory (send and recv volumes) and two optional (corresponding datatypes)
  double clock = smpi_process()->simulated_elapsed();
  unsigned long comm_size = MPI_COMM_WORLD->size();
  int send_size = parse_double(action[2]);
  int recv_size = parse_double(action[3]);
  // NOTE(review): both guards test action.size() > 5 although action[4] exists
  // as soon as size > 4 — a line providing only the send datatype falls back
  // to the default for both. Confirm this matches the intended trace format.
  MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 5) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE};
  MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE};

  void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* MPI_CURRENT_TYPE->size());
  void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());

  int my_proc_id = Actor::self()->getPid();
  TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
                     new simgrid::instr::CollTIData("allToAll", -1, -1.0, send_size, recv_size,
                                                    Datatype::encode(MPI_CURRENT_TYPE),
                                                    Datatype::encode(MPI_CURRENT_TYPE2)));

  Colls::alltoall(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);

  TRACE_smpi_comm_out(my_proc_id);
  log_timed_action (action, clock);
/* Replays a "gather" action towards a root rank. */
static void action_gather(simgrid::xbt::ReplayAction& action)
  /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
     where:
       1) 68 is the sendcounts
       2) 68 is the recvcounts
       3) 0 is the root node
       4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
       5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
  */
  CHECK_ACTION_PARAMS(action, 2, 3)
  double clock = smpi_process()->simulated_elapsed();
  unsigned long comm_size = MPI_COMM_WORLD->size();
  int send_size = parse_double(action[2]);
  int recv_size = parse_double(action[3]);
  MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE};
  MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[6]) : MPI_DEFAULT_TYPE};

  void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
  void *recv = nullptr;
  int root = (action.size() > 4) ? std::stoi(action[4]) : 0;
  int rank = MPI_COMM_WORLD->rank();

    // The receive buffer holds comm_size contributions (root side).
    recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());

  TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
                                                                        Datatype::encode(MPI_CURRENT_TYPE),
                                                                        Datatype::encode(MPI_CURRENT_TYPE2)));

  Colls::gather(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);

  TRACE_smpi_comm_out(Actor::self()->getPid());
  log_timed_action (action, clock);
454 static void action_scatter(simgrid::xbt::ReplayAction& action)
456 /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
459 1) 68 is the sendcounts
460 2) 68 is the recvcounts
461 3) 0 is the root node
462 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
463 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
465 CHECK_ACTION_PARAMS(action, 2, 3)
466 double clock = smpi_process()->simulated_elapsed();
467 unsigned long comm_size = MPI_COMM_WORLD->size();
468 int send_size = parse_double(action[2]);
469 int recv_size = parse_double(action[3]);
470 MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE};
471 MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[6]) : MPI_DEFAULT_TYPE};
473 void* send = smpi_get_tmp_sendbuffer(send_size * MPI_CURRENT_TYPE->size());
474 void* recv = nullptr;
475 int root = (action.size() > 4) ? std::stoi(action[4]) : 0;
476 int rank = MPI_COMM_WORLD->rank();
479 recv = smpi_get_tmp_recvbuffer(recv_size * comm_size * MPI_CURRENT_TYPE2->size());
481 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
482 Datatype::encode(MPI_CURRENT_TYPE),
483 Datatype::encode(MPI_CURRENT_TYPE2)));
485 Colls::scatter(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
487 TRACE_smpi_comm_out(Actor::self()->getPid());
488 log_timed_action(action, clock);
/* Replays a "gatherV" action: per-rank receive counts towards a root. */
static void action_gatherv(simgrid::xbt::ReplayAction& action)
  /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
     0 gather 68 68 10 10 10 0 0 0
     where:
       1) 68 is the sendcount
       2) 68 10 10 10 is the recvcounts
       3) 0 is the root node
       4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
       5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
  */
  double clock = smpi_process()->simulated_elapsed();
  unsigned long comm_size = MPI_COMM_WORLD->size();
  CHECK_ACTION_PARAMS(action, comm_size+1, 2)
  int send_size = parse_double(action[2]);
  std::vector<int> disps(comm_size, 0);
  std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));

  MPI_Datatype MPI_CURRENT_TYPE =
      (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
  MPI_Datatype MPI_CURRENT_TYPE2{
      (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[5 + comm_size]) : MPI_DEFAULT_TYPE};

  void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
  void *recv = nullptr;
  // Per-rank receive counts start at action[3].
  for (unsigned int i = 0; i < comm_size; i++) {
    (*recvcounts)[i] = std::stoi(action[i + 3]);

  int recv_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);

  int root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
  int rank = MPI_COMM_WORLD->rank();

    recv = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());

  TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData(
                                             "gatherV", root, send_size, nullptr, -1, recvcounts,
                                             Datatype::encode(MPI_CURRENT_TYPE), Datatype::encode(MPI_CURRENT_TYPE2)));

  Colls::gatherv(send, send_size, MPI_CURRENT_TYPE, recv, recvcounts->data(), disps.data(), MPI_CURRENT_TYPE2, root,

  TRACE_smpi_comm_out(Actor::self()->getPid());
  log_timed_action (action, clock);
538 static void action_scatterv(simgrid::xbt::ReplayAction& action)
540 /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
541 0 gather 68 10 10 10 68 0 0 0
543 1) 68 10 10 10 is the sendcounts
544 2) 68 is the recvcount
545 3) 0 is the root node
546 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
547 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
549 double clock = smpi_process()->simulated_elapsed();
550 unsigned long comm_size = MPI_COMM_WORLD->size();
551 CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
552 int recv_size = parse_double(action[2 + comm_size]);
553 std::vector<int> disps(comm_size, 0);
554 std::shared_ptr<std::vector<int>> sendcounts(new std::vector<int>(comm_size));
556 MPI_Datatype MPI_CURRENT_TYPE =
557 (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
558 MPI_Datatype MPI_CURRENT_TYPE2{
559 (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[5 + comm_size]) : MPI_DEFAULT_TYPE};
561 void* send = nullptr;
562 void* recv = smpi_get_tmp_recvbuffer(recv_size * MPI_CURRENT_TYPE->size());
563 for (unsigned int i = 0; i < comm_size; i++) {
564 (*sendcounts)[i] = std::stoi(action[i + 2]);
566 int send_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
568 int root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
569 int rank = MPI_COMM_WORLD->rank();
572 send = smpi_get_tmp_sendbuffer(send_sum * MPI_CURRENT_TYPE2->size());
574 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData("gatherV", root, -1, sendcounts, recv_size,
575 nullptr, Datatype::encode(MPI_CURRENT_TYPE),
576 Datatype::encode(MPI_CURRENT_TYPE2)));
578 Colls::scatterv(send, sendcounts->data(), disps.data(), MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root,
581 TRACE_smpi_comm_out(Actor::self()->getPid());
582 log_timed_action(action, clock);
/* Replays a "reduceScatter" action: per-rank counts plus a flop amount. */
static void action_reducescatter(simgrid::xbt::ReplayAction& action)
  /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
     0 reduceScatter 275427 275427 275427 204020 11346849 0
     where:
       1) The first four values after the name of the action declare the recvcounts array
       2) The value 11346849 is the amount of instructions
       3) The last value corresponds to the datatype, see simgrid::smpi::Datatype::decode().
  */
  double clock = smpi_process()->simulated_elapsed();
  unsigned long comm_size = MPI_COMM_WORLD->size();
  CHECK_ACTION_PARAMS(action, comm_size+1, 1)
  int comp_size = parse_double(action[2+comm_size]);
  int my_proc_id = Actor::self()->getPid();
  std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>);
  MPI_Datatype MPI_CURRENT_TYPE =
      (action.size() > 3 + comm_size) ? simgrid::smpi::Datatype::decode(action[3 + comm_size]) : MPI_DEFAULT_TYPE;

  // Per-rank counts start at action[2]; their sum sizes both scratch buffers.
  for (unsigned int i = 0; i < comm_size; i++) {
    recvcounts->push_back(std::stoi(action[i + 2]));

  int size{std::accumulate(recvcounts->begin(), recvcounts->end(), 0)};

  TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
                     new simgrid::instr::VarCollTIData("reduceScatter", -1, 0, nullptr, -1, recvcounts,
                                                       std::to_string(comp_size), /* ugly hack to print comp_size */
                                                       Datatype::encode(MPI_CURRENT_TYPE)));

  void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
  void *recvbuf = smpi_get_tmp_recvbuffer(size* MPI_CURRENT_TYPE->size());

  Colls::reduce_scatter(sendbuf, recvbuf, recvcounts->data(), MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
  smpi_execute_flops(comp_size);

  TRACE_smpi_comm_out(my_proc_id);
  log_timed_action (action, clock);
/* Replays an "allGather" action: <sendcount> <recvcount> [sendtype] [recvtype]. */
static void action_allgather(simgrid::xbt::ReplayAction& action)
  /* The structure of the allgather action for the rank 0 (total 4 processes) is the following:
     0 allGather 275427 275427
     where:
       1) 275427 is the sendcount
       2) 275427 is the recvcount
       3) No more values mean that the datatype for sent and receive buffer is the default one, see
       simgrid::smpi::Datatype::decode().
  */
  double clock = smpi_process()->simulated_elapsed();

  CHECK_ACTION_PARAMS(action, 2, 2)
  int sendcount = std::stoi(action[2]);
  int recvcount = std::stoi(action[3]);

  // NOTE(review): both guards test action.size() > 5 although action[4] exists
  // as soon as size > 4 — confirm a lone send datatype is meant to be ignored.
  MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 5) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE};
  MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE};

  void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
  void *recvbuf = smpi_get_tmp_recvbuffer(recvcount* MPI_CURRENT_TYPE2->size());

  int my_proc_id = Actor::self()->getPid();

  TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
                     new simgrid::instr::CollTIData("allGather", -1, -1.0, sendcount, recvcount,
                                                    Datatype::encode(MPI_CURRENT_TYPE),
                                                    Datatype::encode(MPI_CURRENT_TYPE2)));

  Colls::allgather(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcount, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);

  TRACE_smpi_comm_out(my_proc_id);
  log_timed_action (action, clock);
658 static void action_allgatherv(simgrid::xbt::ReplayAction& action)
660 /* The structure of the allgatherv action for the rank 0 (total 4 processes) is the following:
661 0 allGatherV 275427 275427 275427 275427 204020
663 1) 275427 is the sendcount
664 2) The next four elements declare the recvcounts array
665 3) No more values mean that the datatype for sent and receive buffer is the default one, see
666 simgrid::smpi::Datatype::decode().
668 double clock = smpi_process()->simulated_elapsed();
670 unsigned long comm_size = MPI_COMM_WORLD->size();
671 CHECK_ACTION_PARAMS(action, comm_size+1, 2)
672 int sendcount = std::stoi(action[2]);
673 std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
674 std::vector<int> disps(comm_size, 0);
676 int datatype_index = 0, disp_index = 0;
677 if (action.size() > 3 + 2 * comm_size) { /* datatype + disp are specified */
678 datatype_index = 3 + comm_size;
679 disp_index = datatype_index + 1;
680 } else if (action.size() > 3 + 2 * comm_size) { /* disps specified; datatype is not specified; use the default one */
682 disp_index = 3 + comm_size;
683 } else if (action.size() > 3 + comm_size) { /* only datatype, no disp specified */
684 datatype_index = 3 + comm_size;
687 if (disp_index != 0) {
688 for (unsigned int i = 0; i < comm_size; i++)
689 disps[i] = std::stoi(action[disp_index + i]);
692 MPI_Datatype MPI_CURRENT_TYPE{(datatype_index > 0) ? simgrid::smpi::Datatype::decode(action[datatype_index])
694 MPI_Datatype MPI_CURRENT_TYPE2{(datatype_index > 0) ? simgrid::smpi::Datatype::decode(action[datatype_index])
697 void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
699 for (unsigned int i = 0; i < comm_size; i++) {
700 (*recvcounts)[i] = std::stoi(action[i + 3]);
702 int recv_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
703 void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
705 int my_proc_id = Actor::self()->getPid();
707 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
708 new simgrid::instr::VarCollTIData("allGatherV", -1, sendcount, nullptr, -1, recvcounts,
709 Datatype::encode(MPI_CURRENT_TYPE),
710 Datatype::encode(MPI_CURRENT_TYPE2)));
712 Colls::allgatherv(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts->data(), disps.data(), MPI_CURRENT_TYPE2,
715 TRACE_smpi_comm_out(my_proc_id);
716 log_timed_action (action, clock);
719 static void action_allToAllv(simgrid::xbt::ReplayAction& action)
721 /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
722 0 allToAllV 100 1 7 10 12 100 1 70 10 5
724 1) 100 is the size of the send buffer *sizeof(int),
725 2) 1 7 10 12 is the sendcounts array
726 3) 100*sizeof(int) is the size of the receiver buffer
727 4) 1 70 10 5 is the recvcounts array
729 double clock = smpi_process()->simulated_elapsed();
731 unsigned long comm_size = MPI_COMM_WORLD->size();
732 CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
733 std::shared_ptr<std::vector<int>> sendcounts(new std::vector<int>(comm_size));
734 std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
735 std::vector<int> senddisps(comm_size, 0);
736 std::vector<int> recvdisps(comm_size, 0);
738 MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 5 + 2 * comm_size)
739 ? simgrid::smpi::Datatype::decode(action[4 + 2 * comm_size])
741 MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5 + 2 * comm_size)
742 ? simgrid::smpi::Datatype::decode(action[5 + 2 * comm_size])
745 int send_buf_size=parse_double(action[2]);
746 int recv_buf_size=parse_double(action[3+comm_size]);
747 int my_proc_id = Actor::self()->getPid();
748 void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* MPI_CURRENT_TYPE->size());
749 void *recvbuf = smpi_get_tmp_recvbuffer(recv_buf_size* MPI_CURRENT_TYPE2->size());
751 for (unsigned int i = 0; i < comm_size; i++) {
752 (*sendcounts)[i] = std::stoi(action[3 + i]);
753 (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
755 int send_size = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
756 int recv_size = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
758 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
759 new simgrid::instr::VarCollTIData("allToAllV", -1, send_size, sendcounts, recv_size, recvcounts,
760 Datatype::encode(MPI_CURRENT_TYPE),
761 Datatype::encode(MPI_CURRENT_TYPE2)));
763 Colls::alltoallv(sendbuf, sendcounts->data(), senddisps.data(), MPI_CURRENT_TYPE, recvbuf, recvcounts->data(),
764 recvdisps.data(), MPI_CURRENT_TYPE, MPI_COMM_WORLD);
766 TRACE_smpi_comm_out(my_proc_id);
767 log_timed_action (action, clock);
770 }} // namespace simgrid::smpi
/** @brief Only initialize the replay, don't do it for real
 *
 * Declares the calling process to SMPI/tracing, registers one handler per
 * trace action keyword, then performs the optional delayed start. */
void smpi_replay_init(int* argc, char*** argv)
  simgrid::smpi::Process::init(argc, argv);
  smpi_process()->mark_as_initialized();
  smpi_process()->set_replaying(true);

  // Tracing setup: declare this process and emit a no-op "init" state.
  int my_proc_id = Actor::self()->getPid();
  TRACE_smpi_init(my_proc_id);
  TRACE_smpi_computing_init(my_proc_id);
  TRACE_smpi_comm_in(my_proc_id, "smpi_replay_run_init", new simgrid::instr::NoOpTIData("init"));
  TRACE_smpi_comm_out(my_proc_id);
  // Map each trace action keyword to its replay handler.
  xbt_replay_action_register("init", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::InitAction().execute(action); });
  xbt_replay_action_register("finalize", [](simgrid::xbt::ReplayAction& action) { /* nothing to do */ });
  xbt_replay_action_register("comm_size", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::CommunicatorAction().execute(action); });
  xbt_replay_action_register("comm_split",[](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::CommunicatorAction().execute(action); });
  xbt_replay_action_register("comm_dup", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::CommunicatorAction().execute(action); });

  xbt_replay_action_register("send", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::SendAction("send").execute(action); });
  xbt_replay_action_register("Isend", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::SendAction("Isend").execute(action); });
  xbt_replay_action_register("recv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::RecvAction("recv").execute(action); });
  xbt_replay_action_register("Irecv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::RecvAction("Irecv").execute(action); });
  xbt_replay_action_register("test", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::TestAction().execute(action); });
  xbt_replay_action_register("wait", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::WaitAction().execute(action); });
  xbt_replay_action_register("waitAll", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::WaitAllAction().execute(action); });
  xbt_replay_action_register("barrier", simgrid::smpi::action_barrier);
  xbt_replay_action_register("bcast", simgrid::smpi::action_bcast);
  xbt_replay_action_register("reduce", simgrid::smpi::action_reduce);
  xbt_replay_action_register("allReduce", simgrid::smpi::action_allReduce);
  xbt_replay_action_register("allToAll", simgrid::smpi::action_allToAll);
  xbt_replay_action_register("allToAllV", simgrid::smpi::action_allToAllv);
  xbt_replay_action_register("gather", simgrid::smpi::action_gather);
  xbt_replay_action_register("scatter", simgrid::smpi::action_scatter);
  xbt_replay_action_register("gatherV", simgrid::smpi::action_gatherv);
  xbt_replay_action_register("scatterV", simgrid::smpi::action_scatterv);
  xbt_replay_action_register("allGather", simgrid::smpi::action_allgather);
  xbt_replay_action_register("allGatherV", simgrid::smpi::action_allgatherv);
  xbt_replay_action_register("reduceScatter", simgrid::smpi::action_reducescatter);
  xbt_replay_action_register("compute", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::ComputeAction().execute(action); });

  //if we have a delayed start, sleep here.
  double value = xbt_str_parse_double((*argv)[2], "%s is not a double");
  XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
  smpi_execute_flops(value);

  //UGLY: force a context switch to be sure that all MSG_processes begin initialization
  XBT_DEBUG("Force context switch by smpi_execute_flops - Sleeping for 0.0 flops ");
  smpi_execute_flops(0.0);
/** @brief actually run the replay after initialization
 *
 * Runs the trace through the registered handlers, then drains any requests
 * still pending for this actor and finalizes the process. */
void smpi_replay_main(int* argc, char*** argv)
  simgrid::xbt::replay_runner(*argc, *argv);

  /* and now, finalize everything */
  /* One active process will stop. Decrease the counter*/
  XBT_DEBUG("There are %zu elements in reqq[*]", get_reqq_self()->size());
  if (not get_reqq_self()->empty()) {
    // Drain leftover asynchronous requests so nothing is left dangling.
    unsigned int count_requests=get_reqq_self()->size();
    MPI_Request requests[count_requests];
    MPI_Status status[count_requests];

    for (auto const& req : *get_reqq_self()) {

    simgrid::smpi::Request::waitall(count_requests, requests, status);

  delete get_reqq_self();

  if(active_processes==0){
    /* Last process alive speaking: end the simulated timer */
    XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
    smpi_free_replay_tmp_buffers();

  TRACE_smpi_comm_in(Actor::self()->getPid(), "smpi_replay_run_finalize", new simgrid::instr::NoOpTIData("finalize"));

  smpi_process()->finalize();

  TRACE_smpi_comm_out(Actor::self()->getPid());
  TRACE_smpi_finalize(Actor::self()->getPid());
/** @brief chain a replay initialization and a replay start
 *
 * Convenience entry point: equivalent to calling smpi_replay_init() then
 * smpi_replay_main() with the same arguments. */
void smpi_replay_run(int* argc, char*** argv)
  smpi_replay_init(argc, argv);
  smpi_replay_main(argc, argv);