1 /* Copyright (c) 2009-2017. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
7 #include "smpi_coll.hpp"
8 #include "smpi_comm.hpp"
9 #include "smpi_datatype.hpp"
10 #include "smpi_group.hpp"
11 #include "smpi_process.hpp"
12 #include "smpi_request.hpp"
13 #include "xbt/replay.hpp"
15 #include <boost/algorithm/string/join.hpp>
18 #include <unordered_map>
21 using simgrid::s4u::Actor;
XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");

// Number of replaying processes still alive; set by the "init" action.
static int active_processes = 0;
// Per-actor (keyed by PID) list of outstanding asynchronous requests (Isend/Irecv).
static std::unordered_map<int, std::vector<MPI_Request>*> reqq;

// Datatype used when a trace line does not specify one. Chosen by InitAction:
// MPI_DOUBLE for MPE traces, MPI_BYTE for TAU traces.
static MPI_Datatype MPI_DEFAULT_TYPE;
// Validate the token count of a replay line: the first two tokens are
// process_id and action name; 'mandatory' more must follow, and up to
// 'optional' additional ones are tolerated. Throws arg_error on violation.
#define CHECK_ACTION_PARAMS(action, mandatory, optional) \
  if (action.size() < static_cast<unsigned long>(mandatory + 2)) \
    THROWF(arg_error, 0, "%s replay failed.\n" \
           "%lu items were given on the line. First two should be process_id and action. " \
           "This action needs after them %lu mandatory arguments, and accepts %lu optional ones. \n" \
           "Please contact the Simgrid team if support is needed", \
           __FUNCTION__, action.size(), static_cast<unsigned long>(mandatory), \
           static_cast<unsigned long>(optional)); \
// Log (verbose level) the replayed action line together with its simulated
// duration, i.e. current simulated time minus 'clock' taken at action start.
static void log_timed_action(simgrid::xbt::ReplayAction& action, double clock)
  if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
    std::string s = boost::algorithm::join(action, " ");
    XBT_VERB("%s %f", s.c_str(), smpi_process()->simulated_elapsed() - clock);
// Return the pending-request vector of the calling actor.
// NOTE(review): uses at(), so it throws if set_reqq_self() was never called
// for this actor (normally done by the "init" action) — confirm callers rely on that.
static std::vector<MPI_Request>* get_reqq_self()
  return reqq.at(Actor::self()->getPid());
// Register 'mpi_request' as the pending-request vector of the calling actor.
// NOTE(review): unordered_map::insert does NOT overwrite an existing entry —
// presumably each actor calls this exactly once (from InitAction); verify.
static void set_reqq_self(std::vector<MPI_Request> *mpi_request)
  reqq.insert({Actor::self()->getPid(), mpi_request});
60 static double parse_double(std::string string)
62 return xbt_str_parse_double(string.c_str(), "%s is not a double");
// Base class for per-action argument parsers. The default parse() accepts no
// arguments beyond the leading process_id and action-name tokens.
class ActionArgParser {
  virtual void parse(simgrid::xbt::ReplayAction& action) { CHECK_ACTION_PARAMS(action, 0, 0) }
// Argument parser for point-to-point actions (send/Isend/recv/Irecv):
// mandatory partner rank and message size, optional datatype id.
class SendRecvParser : public ActionArgParser {
  /* communication partner; if we send, this is the receiver and vice versa */
  MPI_Datatype datatype1 = MPI_DEFAULT_TYPE;

  void parse(simgrid::xbt::ReplayAction& action) override
    CHECK_ACTION_PARAMS(action, 2, 1)
    partner = std::stoi(action[2]);
    size = parse_double(action[3]);
    // Optional third argument: datatype id, decoded via Datatype::decode().
    if (action.size() > 4)
      datatype1 = simgrid::smpi::Datatype::decode(action[4]);
// Argument parser for the "compute" action: one mandatory flop count.
class ComputeParser : public ActionArgParser {
  // amount of work to simulate, in flops (copy-pasted comment about a
  // "communication partner" removed — it did not apply here)
  void parse(simgrid::xbt::ReplayAction& action) override
    CHECK_ACTION_PARAMS(action, 1, 0)
    flops = parse_double(action[2]);
// Generic replay action: execute() records the simulated start time, runs the
// concrete kernel() (implemented by each subclass), then logs the action with
// its simulated duration. T is the ActionArgParser subclass used to parse the
// trace line's arguments.
template <class T> class ReplayAction {
  const std::string name; // action name, used for tracing/dispatch and logging

  explicit ReplayAction(std::string name) : name(name), my_proc_id(simgrid::s4u::Actor::self()->getPid()) {}

  virtual void execute(simgrid::xbt::ReplayAction& action)
    // Needs to be re-initialized for every action, hence here
    double start_time = smpi_process()->simulated_elapsed();
    log_timed_action(action, start_time);

  // Actual behavior of the action; implemented by each concrete subclass.
  virtual void kernel(simgrid::xbt::ReplayAction& action) = 0;
// Replay of MPI_Wait: pop the most recently posted pending request of this
// actor and wait for its completion, emitting matching trace events.
class WaitAction : public ReplayAction<ActionArgParser> {
  WaitAction() : ReplayAction("Wait") {}
  void kernel(simgrid::xbt::ReplayAction& action) override
    std::string s = boost::algorithm::join(action, " ");
    xbt_assert(get_reqq_self()->size(), "action wait not preceded by any irecv or isend: %s", s.c_str());
    MPI_Request request = get_reqq_self()->back();
    get_reqq_self()->pop_back();

    if (request == nullptr) {
      /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just
      // NOTE(review): the body of this branch is not visible in this excerpt;
      // presumably it returns without waiting — confirm against full source.

    int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;

    // Must be taken before Request::wait() since the request may be set to
    // MPI_REQUEST_NULL by Request::wait!
    int src = request->comm()->group()->rank(request->src());
    int dst = request->comm()->group()->rank(request->dst());
    bool is_wait_for_receive = (request->flags() & RECV);
    // TODO: Here we take the rank while we normally take the process id (look for my_proc_id)
    TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::NoOpTIData("wait"));
    Request::wait(&request, &status);
    TRACE_smpi_comm_out(rank);
    // A completed receive also gets a recv trace event.
    if (is_wait_for_receive)
      TRACE_smpi_recv(src, dst, 0);
// Replay of MPI_Send / MPI_Isend ('name' selects which): send 'args.size'
// elements of 'args.datatype1' to rank 'args.partner' on MPI_COMM_WORLD
// (tag 0). An Isend pushes its request on this actor's pending-request list.
class SendAction : public ReplayAction<SendRecvParser> {
  SendAction() = delete;
  SendAction(std::string name) : ReplayAction(name) {}
  void kernel(simgrid::xbt::ReplayAction& action) override
    int dst_traced = MPI_COMM_WORLD->group()->actor(args.partner)->getPid();

    TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
                                                                                 Datatype::encode(args.datatype1)));
    if (not TRACE_smpi_view_internals())
      TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, args.size * args.datatype1->size());

    // Replay sends no real payload: nullptr buffer, only timing is simulated.
    if (name == "send") {
      Request::send(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
    } else if (name == "Isend") {
      MPI_Request request = Request::isend(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
      get_reqq_self()->push_back(request);
      // Any other name is a registration bug, not a malformed trace.
      xbt_die("Don't know this action, %s", name.c_str());

    TRACE_smpi_comm_out(my_proc_id);
// Replay of MPI_Recv / MPI_Irecv ('name' selects which). A non-positive size
// means "unknown on the receiver side": probe the incoming message first.
class RecvAction : public ReplayAction<SendRecvParser> {
  RecvAction() = delete;
  explicit RecvAction(std::string name) : ReplayAction(name) {}
  void kernel(simgrid::xbt::ReplayAction& action) override
    int src_traced = MPI_COMM_WORLD->group()->actor(args.partner)->getPid();

    TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
                                                                                 Datatype::encode(args.datatype1)));

    // unknown size from the receiver point of view
    if (args.size <= 0.0) {
      Request::probe(args.partner, 0, MPI_COMM_WORLD, &status);
      args.size = status.count;

    if (name == "recv") {
      Request::recv(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD, &status);
    } else if (name == "Irecv") {
      MPI_Request request = Request::irecv(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
      get_reqq_self()->push_back(request);

    TRACE_smpi_comm_out(my_proc_id);
    // TODO: Check why this was only activated in the "recv" case and not in the "Irecv" case
    if (name == "recv" && not TRACE_smpi_view_internals()) {
      TRACE_smpi_recv(src_traced, my_proc_id, 0);
// Replay of a computation phase: burn 'args.flops' simulated flops,
// bracketed by computing trace events.
class ComputeAction : public ReplayAction<ComputeParser> {
  ComputeAction() : ReplayAction("compute") {}
  void kernel(simgrid::xbt::ReplayAction& action) override
    TRACE_smpi_computing_in(my_proc_id, args.flops);
    smpi_execute_flops(args.flops);
    TRACE_smpi_computing_out(my_proc_id);
// Replay of MPI_Test on the most recently posted request of this actor.
class TestAction : public ReplayAction<ActionArgParser> {
  TestAction() : ReplayAction("Test") {}
  void kernel(simgrid::xbt::ReplayAction& action) override
    MPI_Request request = get_reqq_self()->back();
    get_reqq_self()->pop_back();
    // if request is null here, this may mean that a previous test has succeeded
    // Different times in traced application and replayed version may lead to this
    // In this case, ignore the extra calls.
    if (request != nullptr) {
      TRACE_smpi_testing_in(my_proc_id);
      int flag = Request::test(&request, &status);
      XBT_DEBUG("MPI_Test result: %d", flag);
      /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now
      get_reqq_self()->push_back(request);
      TRACE_smpi_testing_out(my_proc_id);
// Replay of MPI_Init: choose the default datatype from the trace flavor
// (an extra token means MPE -> MPI_DOUBLE; none means TAU -> MPI_BYTE),
// start the simulated timer, record the number of active processes and
// allocate this actor's pending-request list.
class InitAction : public ReplayAction<ActionArgParser> {
  InitAction() : ReplayAction("Init") {}
  void kernel(simgrid::xbt::ReplayAction& action) override
    CHECK_ACTION_PARAMS(action, 0, 1)
    MPI_DEFAULT_TYPE = (action.size() > 2) ? MPI_DOUBLE // default MPE datatype
                                           : MPI_BYTE;  // default TAU datatype

    /* start a simulated timer */
    smpi_process()->simulated_start();
    /*initialize the number of active processes */
    active_processes = smpi_process_count();

    set_reqq_self(new std::vector<MPI_Request>);
// Placeholder for communicator-management actions (comm_size, comm_split,
// comm_dup): replay has nothing to simulate for them.
class CommunicatorAction : public ReplayAction<ActionArgParser> {
  CommunicatorAction() : ReplayAction("Comm") {}
  void kernel(simgrid::xbt::ReplayAction& action) override { /* nothing to do */}
// Replay of MPI_Waitall: wait on every pending request of this actor and
// emit a recv trace event for each completed receive.
class WaitAllAction : public ReplayAction<ActionArgParser> {
  WaitAllAction() : ReplayAction("waitAll") {}
  void kernel(simgrid::xbt::ReplayAction& action) override
    const unsigned int count_requests = get_reqq_self()->size();

    if (count_requests > 0) {
      TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
                         new simgrid::instr::Pt2PtTIData("waitAll", -1, count_requests, ""));
      std::vector<std::pair</*sender*/int,/*recv*/int>> sender_receiver;
      // Capture src/dst of receives now: waitall may null out the requests.
      for (const auto& req : (*get_reqq_self())) {
        if (req && (req->flags() & RECV)) {
          sender_receiver.push_back({req->src(), req->dst()});
      MPI_Status status[count_requests];
      Request::waitall(count_requests, &(*get_reqq_self())[0], status);

      for (auto& pair : sender_receiver) {
        TRACE_smpi_recv(pair.first, pair.second, 0);
      TRACE_smpi_comm_out(my_proc_id);
// Replay of MPI_Barrier on MPI_COMM_WORLD.
class BarrierAction : public ReplayAction<ActionArgParser> {
  BarrierAction() : ReplayAction("barrier") {}
  void kernel(simgrid::xbt::ReplayAction& action) override
    TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::NoOpTIData("barrier"));
    Colls::barrier(MPI_COMM_WORLD);
    TRACE_smpi_comm_out(my_proc_id);
319 } // Replay Namespace
// Replay of MPI_Bcast. Trace line: "<proc> bcast <size> [root] [datatype]".
// The payload buffer is a throwaway temp: only timing is simulated.
static void action_bcast(simgrid::xbt::ReplayAction& action)
  CHECK_ACTION_PARAMS(action, 1, 2)
  double size = parse_double(action[2]);
  double clock = smpi_process()->simulated_elapsed();
  int root = (action.size() > 3) ? std::stoi(action[3]) : 0;
  /* Initialize MPI_CURRENT_TYPE in order to decrease the number of the checks */
  MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 4) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE;

  int my_proc_id = Actor::self()->getPid();
  TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
                     new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(root)->getPid(), -1.0, size,
                                                    -1, Datatype::encode(MPI_CURRENT_TYPE), ""));

  void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());

  Colls::bcast(sendbuf, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);

  TRACE_smpi_comm_out(my_proc_id);
  log_timed_action (action, clock);
// Replay of MPI_Reduce. Trace line:
// "<proc> reduce <comm_size> <comp_size> [root] [datatype]".
// MPI_OP_NULL is used since replayed data is meaningless; the reduction's
// computation cost is modeled separately by smpi_execute_flops(comp_size).
static void action_reduce(simgrid::xbt::ReplayAction& action)
  CHECK_ACTION_PARAMS(action, 2, 2)
  double comm_size = parse_double(action[2]);
  double comp_size = parse_double(action[3]);
  double clock = smpi_process()->simulated_elapsed();
  int root = (action.size() > 4) ? std::stoi(action[4]) : 0;

  MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 5) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE;

  int my_proc_id = Actor::self()->getPid();
  TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
                     new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(root)->getPid(), comp_size,
                                                    comm_size, -1, Datatype::encode(MPI_CURRENT_TYPE), ""));

  void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
  void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
  Colls::reduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
  smpi_execute_flops(comp_size);

  TRACE_smpi_comm_out(my_proc_id);
  log_timed_action (action, clock);
// Replay of MPI_Allreduce. Trace line:
// "<proc> allReduce <comm_size> <comp_size> [datatype]".
// Like action_reduce: MPI_OP_NULL for the data, flops burned separately.
static void action_allReduce(simgrid::xbt::ReplayAction& action)
  CHECK_ACTION_PARAMS(action, 2, 1)
  double comm_size = parse_double(action[2]);
  double comp_size = parse_double(action[3]);

  MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 4) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE;

  double clock = smpi_process()->simulated_elapsed();
  int my_proc_id = Actor::self()->getPid();
  TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::CollTIData("allReduce", -1, comp_size, comm_size, -1,
                                                                              Datatype::encode(MPI_CURRENT_TYPE), ""));

  void *recvbuf = smpi_get_tmp_recvbuffer(comm_size* MPI_CURRENT_TYPE->size());
  void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
  Colls::allreduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
  smpi_execute_flops(comp_size);

  TRACE_smpi_comm_out(my_proc_id);
  log_timed_action (action, clock);
// Replay of MPI_Alltoall. Trace line:
// "<proc> allToAll <send_size> <recv_size> [send_dt] [recv_dt]".
static void action_allToAll(simgrid::xbt::ReplayAction& action)
  CHECK_ACTION_PARAMS(action, 2, 2) //two mandatory (send and recv volumes) and two optional (corresponding datatypes)
  double clock = smpi_process()->simulated_elapsed();
  unsigned long comm_size = MPI_COMM_WORLD->size();
  int send_size = parse_double(action[2]);
  int recv_size = parse_double(action[3]);
  // NOTE(review): action[4] is decoded only when BOTH optional datatypes are
  // present (size > 5); a line carrying a single datatype token leaves it
  // unused — confirm this matches the trace format before "fixing" it.
  MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 5) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE};
  MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE};

  void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* MPI_CURRENT_TYPE->size());
  void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());

  int my_proc_id = Actor::self()->getPid();
  TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
                     new simgrid::instr::CollTIData("allToAll", -1, -1.0, send_size, recv_size,
                                                    Datatype::encode(MPI_CURRENT_TYPE),
                                                    Datatype::encode(MPI_CURRENT_TYPE2)));

  Colls::alltoall(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);

  TRACE_smpi_comm_out(my_proc_id);
  log_timed_action (action, clock);
// Replay of MPI_Gather.
static void action_gather(simgrid::xbt::ReplayAction& action)
  /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
     1) 68 is the sendcounts
     2) 68 is the recvcounts
     3) 0 is the root node
     4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
     5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
  */
  CHECK_ACTION_PARAMS(action, 2, 3)
  double clock = smpi_process()->simulated_elapsed();
  unsigned long comm_size = MPI_COMM_WORLD->size();
  int send_size = parse_double(action[2]);
  int recv_size = parse_double(action[3]);
  // Both datatype ids must be present (size > 6) for either to be decoded.
  MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE};
  MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[6]) : MPI_DEFAULT_TYPE};

  void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
  void *recv = nullptr;
  int root = (action.size() > 4) ? std::stoi(action[4]) : 0;
  int rank = MPI_COMM_WORLD->rank();
  // NOTE(review): the guard around this allocation (presumably "if (rank == root)")
  // is not visible in this excerpt — confirm against full source.
  recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());

  TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
                                                                        Datatype::encode(MPI_CURRENT_TYPE),
                                                                        Datatype::encode(MPI_CURRENT_TYPE2)));

  Colls::gather(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);

  TRACE_smpi_comm_out(Actor::self()->getPid());
  log_timed_action (action, clock);
451 static void action_scatter(simgrid::xbt::ReplayAction& action)
453 /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
456 1) 68 is the sendcounts
457 2) 68 is the recvcounts
458 3) 0 is the root node
459 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
460 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
462 CHECK_ACTION_PARAMS(action, 2, 3)
463 double clock = smpi_process()->simulated_elapsed();
464 unsigned long comm_size = MPI_COMM_WORLD->size();
465 int send_size = parse_double(action[2]);
466 int recv_size = parse_double(action[3]);
467 MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE};
468 MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[6]) : MPI_DEFAULT_TYPE};
470 void* send = smpi_get_tmp_sendbuffer(send_size * MPI_CURRENT_TYPE->size());
471 void* recv = nullptr;
472 int root = (action.size() > 4) ? std::stoi(action[4]) : 0;
473 int rank = MPI_COMM_WORLD->rank();
476 recv = smpi_get_tmp_recvbuffer(recv_size * comm_size * MPI_CURRENT_TYPE2->size());
478 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
479 Datatype::encode(MPI_CURRENT_TYPE),
480 Datatype::encode(MPI_CURRENT_TYPE2)));
482 Colls::scatter(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
484 TRACE_smpi_comm_out(Actor::self()->getPid());
485 log_timed_action(action, clock);
// Replay of MPI_Gatherv.
static void action_gatherv(simgrid::xbt::ReplayAction& action)
  /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
     0 gather 68 68 10 10 10 0 0 0
     1) 68 is the sendcount
     2) 68 10 10 10 is the recvcounts
     3) 0 is the root node
     4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
     5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
  */
  double clock = smpi_process()->simulated_elapsed();
  unsigned long comm_size = MPI_COMM_WORLD->size();
  CHECK_ACTION_PARAMS(action, comm_size+1, 2)
  int send_size = parse_double(action[2]);
  std::vector<int> disps(comm_size, 0); // replay never uses real displacements
  std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));

  // Both datatype ids must be present for either to be decoded.
  MPI_Datatype MPI_CURRENT_TYPE =
      (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
  MPI_Datatype MPI_CURRENT_TYPE2{
      (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[5 + comm_size]) : MPI_DEFAULT_TYPE};

  void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
  void *recv = nullptr;
  for (unsigned int i = 0; i < comm_size; i++) {
    (*recvcounts)[i] = std::stoi(action[i + 3]);
  int recv_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);

  int root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
  int rank = MPI_COMM_WORLD->rank();
  // NOTE(review): the "if (rank == root)" guard around this allocation is not
  // visible in this excerpt — confirm against full source.
  recv = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());

  TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData(
                                             "gatherV", root, send_size, nullptr, -1, recvcounts,
                                             Datatype::encode(MPI_CURRENT_TYPE), Datatype::encode(MPI_CURRENT_TYPE2)));

  Colls::gatherv(send, send_size, MPI_CURRENT_TYPE, recv, recvcounts->data(), disps.data(), MPI_CURRENT_TYPE2, root,

  TRACE_smpi_comm_out(Actor::self()->getPid());
  log_timed_action (action, clock);
535 static void action_scatterv(simgrid::xbt::ReplayAction& action)
537 /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
538 0 gather 68 10 10 10 68 0 0 0
540 1) 68 10 10 10 is the sendcounts
541 2) 68 is the recvcount
542 3) 0 is the root node
543 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
544 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
546 double clock = smpi_process()->simulated_elapsed();
547 unsigned long comm_size = MPI_COMM_WORLD->size();
548 CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
549 int recv_size = parse_double(action[2 + comm_size]);
550 std::vector<int> disps(comm_size, 0);
551 std::shared_ptr<std::vector<int>> sendcounts(new std::vector<int>(comm_size));
553 MPI_Datatype MPI_CURRENT_TYPE =
554 (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
555 MPI_Datatype MPI_CURRENT_TYPE2{
556 (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[5 + comm_size]) : MPI_DEFAULT_TYPE};
558 void* send = nullptr;
559 void* recv = smpi_get_tmp_recvbuffer(recv_size * MPI_CURRENT_TYPE->size());
560 for (unsigned int i = 0; i < comm_size; i++) {
561 (*sendcounts)[i] = std::stoi(action[i + 2]);
563 int send_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
565 int root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
566 int rank = MPI_COMM_WORLD->rank();
569 send = smpi_get_tmp_sendbuffer(send_sum * MPI_CURRENT_TYPE2->size());
571 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData("gatherV", root, -1, sendcounts, recv_size,
572 nullptr, Datatype::encode(MPI_CURRENT_TYPE),
573 Datatype::encode(MPI_CURRENT_TYPE2)));
575 Colls::scatterv(send, sendcounts->data(), disps.data(), MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root,
578 TRACE_smpi_comm_out(Actor::self()->getPid());
579 log_timed_action(action, clock);
// Replay of MPI_Reduce_scatter.
static void action_reducescatter(simgrid::xbt::ReplayAction& action)
  /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
     0 reduceScatter 275427 275427 275427 204020 11346849 0
     1) The first four values after the name of the action declare the recvcounts array
     2) The value 11346849 is the amount of instructions
     3) The last value corresponds to the datatype, see simgrid::smpi::Datatype::decode().
  */
  double clock = smpi_process()->simulated_elapsed();
  unsigned long comm_size = MPI_COMM_WORLD->size();
  CHECK_ACTION_PARAMS(action, comm_size+1, 1)
  int comp_size = parse_double(action[2+comm_size]);
  int my_proc_id = Actor::self()->getPid();
  std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>);
  MPI_Datatype MPI_CURRENT_TYPE =
      (action.size() > 3 + comm_size) ? simgrid::smpi::Datatype::decode(action[3 + comm_size]) : MPI_DEFAULT_TYPE;

  for (unsigned int i = 0; i < comm_size; i++) {
    recvcounts->push_back(std::stoi(action[i + 2]));
  // Total element count: both temp buffers are sized for the full reduction.
  int size{std::accumulate(recvcounts->begin(), recvcounts->end(), 0)};

  TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
                     new simgrid::instr::VarCollTIData("reduceScatter", -1, 0, nullptr, -1, recvcounts,
                                                       std::to_string(comp_size), /* ugly hack to print comp_size */
                                                       Datatype::encode(MPI_CURRENT_TYPE)));

  void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
  void *recvbuf = smpi_get_tmp_recvbuffer(size* MPI_CURRENT_TYPE->size());

  // MPI_OP_NULL: data is meaningless in replay; cost modeled via flops below.
  Colls::reduce_scatter(sendbuf, recvbuf, recvcounts->data(), MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
  smpi_execute_flops(comp_size);

  TRACE_smpi_comm_out(my_proc_id);
  log_timed_action (action, clock);
// Replay of MPI_Allgather.
static void action_allgather(simgrid::xbt::ReplayAction& action)
  /* The structure of the allgather action for the rank 0 (total 4 processes) is the following:
     0 allGather 275427 275427
     1) 275427 is the sendcount
     2) 275427 is the recvcount
     3) No more values mean that the datatype for sent and receive buffer is the default one, see
        simgrid::smpi::Datatype::decode().
  */
  double clock = smpi_process()->simulated_elapsed();

  CHECK_ACTION_PARAMS(action, 2, 2)
  int sendcount = std::stoi(action[2]);
  int recvcount = std::stoi(action[3]);

  // NOTE(review): action[4] is decoded only when BOTH optional datatypes are
  // present (size > 5) — same convention as action_allToAll; confirm intended.
  MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 5) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE};
  MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE};

  void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
  void *recvbuf = smpi_get_tmp_recvbuffer(recvcount* MPI_CURRENT_TYPE2->size());

  int my_proc_id = Actor::self()->getPid();

  TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
                     new simgrid::instr::CollTIData("allGather", -1, -1.0, sendcount, recvcount,
                                                    Datatype::encode(MPI_CURRENT_TYPE),
                                                    Datatype::encode(MPI_CURRENT_TYPE2)));

  Colls::allgather(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcount, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);

  TRACE_smpi_comm_out(my_proc_id);
  log_timed_action (action, clock);
655 static void action_allgatherv(simgrid::xbt::ReplayAction& action)
657 /* The structure of the allgatherv action for the rank 0 (total 4 processes) is the following:
658 0 allGatherV 275427 275427 275427 275427 204020
660 1) 275427 is the sendcount
661 2) The next four elements declare the recvcounts array
662 3) No more values mean that the datatype for sent and receive buffer is the default one, see
663 simgrid::smpi::Datatype::decode().
665 double clock = smpi_process()->simulated_elapsed();
667 unsigned long comm_size = MPI_COMM_WORLD->size();
668 CHECK_ACTION_PARAMS(action, comm_size+1, 2)
669 int sendcount = std::stoi(action[2]);
670 std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
671 std::vector<int> disps(comm_size, 0);
673 int datatype_index = 0, disp_index = 0;
674 if (action.size() > 3 + 2 * comm_size) { /* datatype + disp are specified */
675 datatype_index = 3 + comm_size;
676 disp_index = datatype_index + 1;
677 } else if (action.size() > 3 + 2 * comm_size) { /* disps specified; datatype is not specified; use the default one */
679 disp_index = 3 + comm_size;
680 } else if (action.size() > 3 + comm_size) { /* only datatype, no disp specified */
681 datatype_index = 3 + comm_size;
684 if (disp_index != 0) {
685 for (unsigned int i = 0; i < comm_size; i++)
686 disps[i] = std::stoi(action[disp_index + i]);
689 MPI_Datatype MPI_CURRENT_TYPE{(datatype_index > 0) ? simgrid::smpi::Datatype::decode(action[datatype_index])
691 MPI_Datatype MPI_CURRENT_TYPE2{(datatype_index > 0) ? simgrid::smpi::Datatype::decode(action[datatype_index])
694 void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
696 for (unsigned int i = 0; i < comm_size; i++) {
697 (*recvcounts)[i] = std::stoi(action[i + 3]);
699 int recv_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
700 void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
702 int my_proc_id = Actor::self()->getPid();
704 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
705 new simgrid::instr::VarCollTIData("allGatherV", -1, sendcount, nullptr, -1, recvcounts,
706 Datatype::encode(MPI_CURRENT_TYPE),
707 Datatype::encode(MPI_CURRENT_TYPE2)));
709 Colls::allgatherv(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts->data(), disps.data(), MPI_CURRENT_TYPE2,
712 TRACE_smpi_comm_out(my_proc_id);
713 log_timed_action (action, clock);
716 static void action_allToAllv(simgrid::xbt::ReplayAction& action)
718 /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
719 0 allToAllV 100 1 7 10 12 100 1 70 10 5
721 1) 100 is the size of the send buffer *sizeof(int),
722 2) 1 7 10 12 is the sendcounts array
723 3) 100*sizeof(int) is the size of the receiver buffer
724 4) 1 70 10 5 is the recvcounts array
726 double clock = smpi_process()->simulated_elapsed();
728 unsigned long comm_size = MPI_COMM_WORLD->size();
729 CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
730 std::shared_ptr<std::vector<int>> sendcounts(new std::vector<int>(comm_size));
731 std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
732 std::vector<int> senddisps(comm_size, 0);
733 std::vector<int> recvdisps(comm_size, 0);
735 MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 5 + 2 * comm_size)
736 ? simgrid::smpi::Datatype::decode(action[4 + 2 * comm_size])
738 MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5 + 2 * comm_size)
739 ? simgrid::smpi::Datatype::decode(action[5 + 2 * comm_size])
742 int send_buf_size=parse_double(action[2]);
743 int recv_buf_size=parse_double(action[3+comm_size]);
744 int my_proc_id = Actor::self()->getPid();
745 void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* MPI_CURRENT_TYPE->size());
746 void *recvbuf = smpi_get_tmp_recvbuffer(recv_buf_size* MPI_CURRENT_TYPE2->size());
748 for (unsigned int i = 0; i < comm_size; i++) {
749 (*sendcounts)[i] = std::stoi(action[3 + i]);
750 (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
752 int send_size = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
753 int recv_size = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
755 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
756 new simgrid::instr::VarCollTIData("allToAllV", -1, send_size, sendcounts, recv_size, recvcounts,
757 Datatype::encode(MPI_CURRENT_TYPE),
758 Datatype::encode(MPI_CURRENT_TYPE2)));
760 Colls::alltoallv(sendbuf, sendcounts->data(), senddisps.data(), MPI_CURRENT_TYPE, recvbuf, recvcounts->data(),
761 recvdisps.data(), MPI_CURRENT_TYPE, MPI_COMM_WORLD);
763 TRACE_smpi_comm_out(my_proc_id);
764 log_timed_action (action, clock);
767 }} // namespace simgrid::smpi
/** @brief Only initialize the replay, don't do it for real */
void smpi_replay_init(int* argc, char*** argv)
  simgrid::smpi::Process::init(argc, argv);
  smpi_process()->mark_as_initialized();
  smpi_process()->set_replaying(true);

  int my_proc_id = Actor::self()->getPid();
  TRACE_smpi_init(my_proc_id);
  TRACE_smpi_computing_init(my_proc_id);
  TRACE_smpi_comm_in(my_proc_id, "smpi_replay_run_init", new simgrid::instr::NoOpTIData("init"));
  TRACE_smpi_comm_out(my_proc_id);
  // Register one handler per trace action name. Class-based actions are
  // wrapped in lambdas; collectives still use the free functions above.
  xbt_replay_action_register("init", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::InitAction().execute(action); });
  // "finalize" is a no-op here: actual cleanup happens in smpi_replay_main().
  xbt_replay_action_register("finalize", [](simgrid::xbt::ReplayAction& action) { /* nothing to do */ });
  xbt_replay_action_register("comm_size", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::CommunicatorAction().execute(action); });
  xbt_replay_action_register("comm_split",[](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::CommunicatorAction().execute(action); });
  xbt_replay_action_register("comm_dup", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::CommunicatorAction().execute(action); });

  xbt_replay_action_register("send", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::SendAction("send").execute(action); });
  xbt_replay_action_register("Isend", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::SendAction("Isend").execute(action); });
  xbt_replay_action_register("recv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::RecvAction("recv").execute(action); });
  xbt_replay_action_register("Irecv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::RecvAction("Irecv").execute(action); });
  xbt_replay_action_register("test", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::TestAction().execute(action); });
  xbt_replay_action_register("wait", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::WaitAction().execute(action); });
  xbt_replay_action_register("waitAll", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::WaitAllAction().execute(action); });
  // NOTE(review): this registers a free function simgrid::smpi::action_barrier,
  // but only a BarrierAction class is visible in this excerpt — confirm the
  // free function still exists, or migrate this registration to the class.
  xbt_replay_action_register("barrier", simgrid::smpi::action_barrier);
  xbt_replay_action_register("bcast", simgrid::smpi::action_bcast);
  xbt_replay_action_register("reduce", simgrid::smpi::action_reduce);
  xbt_replay_action_register("allReduce", simgrid::smpi::action_allReduce);
  xbt_replay_action_register("allToAll", simgrid::smpi::action_allToAll);
  xbt_replay_action_register("allToAllV", simgrid::smpi::action_allToAllv);
  xbt_replay_action_register("gather", simgrid::smpi::action_gather);
  xbt_replay_action_register("scatter", simgrid::smpi::action_scatter);
  xbt_replay_action_register("gatherV", simgrid::smpi::action_gatherv);
  xbt_replay_action_register("scatterV", simgrid::smpi::action_scatterv);
  xbt_replay_action_register("allGather", simgrid::smpi::action_allgather);
  xbt_replay_action_register("allGatherV", simgrid::smpi::action_allgatherv);
  xbt_replay_action_register("reduceScatter", simgrid::smpi::action_reducescatter);
  xbt_replay_action_register("compute", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::ComputeAction().execute(action); });

  //if we have a delayed start, sleep here.
  // NOTE(review): the guard checking the argument count before reading
  // (*argv)[2] is not visible in this excerpt — confirm against full source.
  double value = xbt_str_parse_double((*argv)[2], "%s is not a double");
  XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
  smpi_execute_flops(value);

  //UGLY: force a context switch to be sure that all MSG_processes begin initialization
  XBT_DEBUG("Force context switch by smpi_execute_flops - Sleeping for 0.0 flops ");
  smpi_execute_flops(0.0);
/** @brief actually run the replay after initialization */
void smpi_replay_main(int* argc, char*** argv)
  simgrid::xbt::replay_runner(*argc, *argv);

  /* and now, finalize everything */
  /* One active process will stop. Decrease the counter*/
  XBT_DEBUG("There are %zu elements in reqq[*]", get_reqq_self()->size());
  if (not get_reqq_self()->empty()) {
    unsigned int count_requests=get_reqq_self()->size();
    MPI_Request requests[count_requests];
    MPI_Status status[count_requests];
    // NOTE(review): the loop body copying *get_reqq_self() into requests[] is
    // not visible in this excerpt — confirm against full source.
    for (auto const& req : *get_reqq_self()) {
    simgrid::smpi::Request::waitall(count_requests, requests, status);
  // NOTE(review): the vector is deleted but its reqq map entry is not erased
  // here — presumably this actor never calls get_reqq_self() again; verify.
  delete get_reqq_self();

  if(active_processes==0){
    /* Last process alive speaking: end the simulated timer */
    XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
    smpi_free_replay_tmp_buffers();

  TRACE_smpi_comm_in(Actor::self()->getPid(), "smpi_replay_run_finalize", new simgrid::instr::NoOpTIData("finalize"));

  smpi_process()->finalize();

  TRACE_smpi_comm_out(Actor::self()->getPid());
  TRACE_smpi_finalize(Actor::self()->getPid());
/** @brief chain a replay initialization and a replay start */
void smpi_replay_run(int* argc, char*** argv)
  // Convenience entry point: set up all action handlers, then replay the
  // trace and finalize (see smpi_replay_init / smpi_replay_main).
  smpi_replay_init(argc, argv);
  smpi_replay_main(argc, argv);