1 /* Copyright (c) 2009-2017. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
#include "smpi_coll.hpp"
#include "smpi_comm.hpp"
#include "smpi_datatype.hpp"
#include "smpi_group.hpp"
#include "smpi_process.hpp"
#include "smpi_request.hpp"
#include "xbt/replay.hpp"

#include <boost/algorithm/string/join.hpp>

#include <memory>
#include <numeric>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>
using simgrid::s4u::Actor;

// Dedicated log channel for the SMPI trace-replay machinery.
XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");

// Number of replaying processes: set from smpi_process_count() in InitAction
// and compared against 0 at finalization time to detect the last process.
static int active_processes = 0;

// Outstanding asynchronous requests of each actor, keyed by pid. Isend/Irecv
// push here; wait/waitAll/test consume. The vector is allocated in InitAction.
static std::unordered_map<int, std::vector<MPI_Request>*> reqq;

// Datatype assumed when a trace line gives none; set once in InitAction
// (MPI_DOUBLE for MPE-style traces, MPI_BYTE for TAU-style ones).
static MPI_Datatype MPI_DEFAULT_TYPE;
// Sanity-check the argument count of a replay line: every line carries
// <process_id> <action> followed by `mandatory` required arguments and up to
// `optional` extra ones. Throws arg_error when mandatory arguments are
// missing. Optional arguments are NOT validated here; each handler tests
// action.size() itself before reading them.
#define CHECK_ACTION_PARAMS(action, mandatory, optional)                                                               \
  {                                                                                                                    \
    if (action.size() < static_cast<unsigned long>(mandatory + 2))                                                     \
      THROWF(arg_error, 0, "%s replay failed.\n"                                                                       \
                           "%lu items were given on the line. First two should be process_id and action. "             \
                           "This action needs after them %lu mandatory arguments, and accepts %lu optional ones. \n"   \
                           "Please contact the Simgrid team if support is needed",                                      \
             __FUNCTION__, action.size(), static_cast<unsigned long>(mandatory),                                       \
             static_cast<unsigned long>(optional));                                                                    \
  }
41 static void log_timed_action(simgrid::xbt::ReplayAction& action, double clock)
43 if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
44 std::string s = boost::algorithm::join(action, " ");
45 XBT_VERB("%s %f", s.c_str(), smpi_process()->simulated_elapsed() - clock);
49 static std::vector<MPI_Request>* get_reqq_self()
51 return reqq.at(Actor::self()->getPid());
54 static void set_reqq_self(std::vector<MPI_Request> *mpi_request)
56 reqq.insert({Actor::self()->getPid(), mpi_request});
60 static double parse_double(std::string string)
62 return xbt_str_parse_double(string.c_str(), "%s is not a double");
69 class ActionArgParser {
71 virtual void parse(simgrid::xbt::ReplayAction& action) { CHECK_ACTION_PARAMS(action, 0, 0) }
74 class SendRecvParser : public ActionArgParser {
76 /* communication partner; if we send, this is the receiver and vice versa */
79 MPI_Datatype datatype1 = MPI_DEFAULT_TYPE;
81 void parse(simgrid::xbt::ReplayAction& action) override
83 CHECK_ACTION_PARAMS(action, 2, 1)
84 partner = std::stoi(action[2]);
85 size = parse_double(action[3]);
86 if (action.size() > 4)
87 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
91 class ComputeParser : public ActionArgParser {
93 /* communication partner; if we send, this is the receiver and vice versa */
96 void parse(simgrid::xbt::ReplayAction& action) override
98 CHECK_ACTION_PARAMS(action, 1, 0)
99 flops = parse_double(action[2]);
103 template <class T> class ReplayAction {
105 const std::string name;
111 explicit ReplayAction(std::string name) : name(name), my_proc_id(simgrid::s4u::Actor::self()->getPid()) {}
113 virtual void execute(simgrid::xbt::ReplayAction& action)
115 // Needs to be re-initialized for every action, hence here
116 double start_time = smpi_process()->simulated_elapsed();
120 log_timed_action(action, start_time);
123 virtual void kernel(simgrid::xbt::ReplayAction& action) = 0;
126 class WaitAction : public ReplayAction<ActionArgParser> {
128 WaitAction() : ReplayAction("Wait") {}
129 void kernel(simgrid::xbt::ReplayAction& action) override
131 CHECK_ACTION_PARAMS(action, 0, 0)
134 std::string s = boost::algorithm::join(action, " ");
135 xbt_assert(get_reqq_self()->size(), "action wait not preceded by any irecv or isend: %s", s.c_str());
136 MPI_Request request = get_reqq_self()->back();
137 get_reqq_self()->pop_back();
139 if (request == nullptr) {
140 /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just
145 int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
147 // Must be taken before Request::wait() since the request may be set to
148 // MPI_REQUEST_NULL by Request::wait!
149 int src = request->comm()->group()->rank(request->src());
150 int dst = request->comm()->group()->rank(request->dst());
151 bool is_wait_for_receive = (request->flags() & RECV);
152 // TODO: Here we take the rank while we normally take the process id (look for my_proc_id)
153 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::NoOpTIData("wait"));
155 Request::wait(&request, &status);
157 TRACE_smpi_comm_out(rank);
158 if (is_wait_for_receive)
159 TRACE_smpi_recv(src, dst, 0);
163 class SendAction : public ReplayAction<SendRecvParser> {
165 SendAction() = delete;
166 SendAction(std::string name) : ReplayAction(name) {}
167 void kernel(simgrid::xbt::ReplayAction& action) override
169 int dst_traced = MPI_COMM_WORLD->group()->actor(args.partner)->getPid();
171 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
172 Datatype::encode(args.datatype1)));
173 if (not TRACE_smpi_view_internals())
174 TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, args.size * args.datatype1->size());
176 if (name == "send") {
177 Request::send(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
178 } else if (name == "Isend") {
179 MPI_Request request = Request::isend(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
180 get_reqq_self()->push_back(request);
182 xbt_die("Don't know this action, %s", name.c_str());
185 TRACE_smpi_comm_out(my_proc_id);
189 class RecvAction : public ReplayAction<SendRecvParser> {
191 RecvAction() = delete;
192 explicit RecvAction(std::string name) : ReplayAction(name) {}
193 void kernel(simgrid::xbt::ReplayAction& action) override
195 int src_traced = MPI_COMM_WORLD->group()->actor(args.partner)->getPid();
197 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
198 Datatype::encode(args.datatype1)));
201 // unknown size from the receiver point of view
202 if (args.size <= 0.0) {
203 Request::probe(args.partner, 0, MPI_COMM_WORLD, &status);
204 args.size = status.count;
207 if (name == "recv") {
208 Request::recv(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD, &status);
209 } else if (name == "Irecv") {
210 MPI_Request request = Request::irecv(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
211 get_reqq_self()->push_back(request);
214 TRACE_smpi_comm_out(my_proc_id);
215 // TODO: Check why this was only activated in the "recv" case and not in the "Irecv" case
216 if (name == "recv" && not TRACE_smpi_view_internals()) {
217 TRACE_smpi_recv(src_traced, my_proc_id, 0);
222 class ComputeAction : public ReplayAction<ComputeParser> {
224 ComputeAction() : ReplayAction("compute") {}
225 void kernel(simgrid::xbt::ReplayAction& action) override
227 TRACE_smpi_computing_in(my_proc_id, args.flops);
228 smpi_execute_flops(args.flops);
229 TRACE_smpi_computing_out(my_proc_id);
233 class TestAction : public ReplayAction<ActionArgParser> {
235 TestAction() : ReplayAction("Test") {}
236 void kernel(simgrid::xbt::ReplayAction& action) override
238 MPI_Request request = get_reqq_self()->back();
239 get_reqq_self()->pop_back();
240 // if request is null here, this may mean that a previous test has succeeded
241 // Different times in traced application and replayed version may lead to this
242 // In this case, ignore the extra calls.
243 if (request != nullptr) {
244 TRACE_smpi_testing_in(my_proc_id);
247 int flag = Request::test(&request, &status);
249 XBT_DEBUG("MPI_Test result: %d", flag);
250 /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now
252 get_reqq_self()->push_back(request);
254 TRACE_smpi_testing_out(my_proc_id);
259 class InitAction : public ReplayAction<ActionArgParser> {
261 InitAction() : ReplayAction("Init") {}
262 void kernel(simgrid::xbt::ReplayAction& action) override
264 CHECK_ACTION_PARAMS(action, 0, 1)
265 MPI_DEFAULT_TYPE = (action.size() > 2) ? MPI_DOUBLE // default MPE datatype
266 : MPI_BYTE; // default TAU datatype
268 /* start a simulated timer */
269 smpi_process()->simulated_start();
270 /*initialize the number of active processes */
271 active_processes = smpi_process_count();
273 set_reqq_self(new std::vector<MPI_Request>);
277 class CommunicatorAction : public ReplayAction<ActionArgParser> {
279 CommunicatorAction() : ReplayAction("Comm") {}
280 void kernel(simgrid::xbt::ReplayAction& action) override { /* nothing to do */}
283 class WaitAllAction : public ReplayAction<ActionArgParser> {
285 WaitAllAction() : ReplayAction("waitAll") {}
286 void kernel(simgrid::xbt::ReplayAction& action) override
288 const unsigned int count_requests = get_reqq_self()->size();
290 if (count_requests > 0) {
291 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
292 new simgrid::instr::Pt2PtTIData("waitAll", -1, count_requests, ""));
293 std::vector<std::pair</*sender*/int,/*recv*/int>> sender_receiver;
294 for (const auto& req : (*get_reqq_self())) {
295 if (req && (req->flags() & RECV)) {
296 sender_receiver.push_back({req->src(), req->dst()});
299 MPI_Status status[count_requests];
300 Request::waitall(count_requests, &(*get_reqq_self())[0], status);
302 for (auto& pair : sender_receiver) {
303 TRACE_smpi_recv(pair.first, pair.second, 0);
305 TRACE_smpi_comm_out(my_proc_id);
310 class BarrierAction : public ReplayAction<ActionArgParser> {
312 BarrierAction() : ReplayAction("barrier") {}
313 void kernel(simgrid::xbt::ReplayAction& action) override
315 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::NoOpTIData("barrier"));
316 Colls::barrier(MPI_COMM_WORLD);
317 TRACE_smpi_comm_out(my_proc_id);
321 } // Replay Namespace
323 static void action_bcast(simgrid::xbt::ReplayAction& action)
325 CHECK_ACTION_PARAMS(action, 1, 2)
326 double size = parse_double(action[2]);
327 double clock = smpi_process()->simulated_elapsed();
328 int root = (action.size() > 3) ? std::stoi(action[3]) : 0;
329 /* Initialize MPI_CURRENT_TYPE in order to decrease the number of the checks */
330 MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 4) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE;
332 int my_proc_id = Actor::self()->getPid();
333 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
334 new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(root)->getPid(), -1.0, size,
335 -1, Datatype::encode(MPI_CURRENT_TYPE), ""));
337 void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
339 Colls::bcast(sendbuf, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
341 TRACE_smpi_comm_out(my_proc_id);
342 log_timed_action (action, clock);
345 static void action_reduce(simgrid::xbt::ReplayAction& action)
347 CHECK_ACTION_PARAMS(action, 2, 2)
348 double comm_size = parse_double(action[2]);
349 double comp_size = parse_double(action[3]);
350 double clock = smpi_process()->simulated_elapsed();
351 int root = (action.size() > 4) ? std::stoi(action[4]) : 0;
353 MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 5) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE;
355 int my_proc_id = Actor::self()->getPid();
356 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
357 new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(root)->getPid(), comp_size,
358 comm_size, -1, Datatype::encode(MPI_CURRENT_TYPE), ""));
360 void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
361 void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
362 Colls::reduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
363 smpi_execute_flops(comp_size);
365 TRACE_smpi_comm_out(my_proc_id);
366 log_timed_action (action, clock);
369 static void action_allReduce(simgrid::xbt::ReplayAction& action)
371 CHECK_ACTION_PARAMS(action, 2, 1)
372 double comm_size = parse_double(action[2]);
373 double comp_size = parse_double(action[3]);
375 MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 4) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE;
377 double clock = smpi_process()->simulated_elapsed();
378 int my_proc_id = Actor::self()->getPid();
379 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::CollTIData("allReduce", -1, comp_size, comm_size, -1,
380 Datatype::encode(MPI_CURRENT_TYPE), ""));
382 void *recvbuf = smpi_get_tmp_recvbuffer(comm_size* MPI_CURRENT_TYPE->size());
383 void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
384 Colls::allreduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
385 smpi_execute_flops(comp_size);
387 TRACE_smpi_comm_out(my_proc_id);
388 log_timed_action (action, clock);
391 static void action_allToAll(simgrid::xbt::ReplayAction& action)
393 CHECK_ACTION_PARAMS(action, 2, 2) //two mandatory (send and recv volumes) and two optional (corresponding datatypes)
394 double clock = smpi_process()->simulated_elapsed();
395 unsigned long comm_size = MPI_COMM_WORLD->size();
396 int send_size = parse_double(action[2]);
397 int recv_size = parse_double(action[3]);
398 MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 5) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE};
399 MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE};
401 void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* MPI_CURRENT_TYPE->size());
402 void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
404 int my_proc_id = Actor::self()->getPid();
405 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
406 new simgrid::instr::CollTIData("allToAll", -1, -1.0, send_size, recv_size,
407 Datatype::encode(MPI_CURRENT_TYPE),
408 Datatype::encode(MPI_CURRENT_TYPE2)));
410 Colls::alltoall(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
412 TRACE_smpi_comm_out(my_proc_id);
413 log_timed_action (action, clock);
416 static void action_gather(simgrid::xbt::ReplayAction& action)
418 /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
421 1) 68 is the sendcounts
422 2) 68 is the recvcounts
423 3) 0 is the root node
424 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
425 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
427 CHECK_ACTION_PARAMS(action, 2, 3)
428 double clock = smpi_process()->simulated_elapsed();
429 unsigned long comm_size = MPI_COMM_WORLD->size();
430 int send_size = parse_double(action[2]);
431 int recv_size = parse_double(action[3]);
432 MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE};
433 MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[6]) : MPI_DEFAULT_TYPE};
435 void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
436 void *recv = nullptr;
437 int root = (action.size() > 4) ? std::stoi(action[4]) : 0;
438 int rank = MPI_COMM_WORLD->rank();
441 recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
443 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
444 Datatype::encode(MPI_CURRENT_TYPE),
445 Datatype::encode(MPI_CURRENT_TYPE2)));
447 Colls::gather(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
449 TRACE_smpi_comm_out(Actor::self()->getPid());
450 log_timed_action (action, clock);
453 static void action_scatter(simgrid::xbt::ReplayAction& action)
455 /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
458 1) 68 is the sendcounts
459 2) 68 is the recvcounts
460 3) 0 is the root node
461 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
462 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
464 CHECK_ACTION_PARAMS(action, 2, 3)
465 double clock = smpi_process()->simulated_elapsed();
466 unsigned long comm_size = MPI_COMM_WORLD->size();
467 int send_size = parse_double(action[2]);
468 int recv_size = parse_double(action[3]);
469 MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE};
470 MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[6]) : MPI_DEFAULT_TYPE};
472 void* send = smpi_get_tmp_sendbuffer(send_size * MPI_CURRENT_TYPE->size());
473 void* recv = nullptr;
474 int root = (action.size() > 4) ? std::stoi(action[4]) : 0;
475 int rank = MPI_COMM_WORLD->rank();
478 recv = smpi_get_tmp_recvbuffer(recv_size * comm_size * MPI_CURRENT_TYPE2->size());
480 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
481 Datatype::encode(MPI_CURRENT_TYPE),
482 Datatype::encode(MPI_CURRENT_TYPE2)));
484 Colls::scatter(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
486 TRACE_smpi_comm_out(Actor::self()->getPid());
487 log_timed_action(action, clock);
490 static void action_gatherv(simgrid::xbt::ReplayAction& action)
492 /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
493 0 gather 68 68 10 10 10 0 0 0
495 1) 68 is the sendcount
496 2) 68 10 10 10 is the recvcounts
497 3) 0 is the root node
498 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
499 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
501 double clock = smpi_process()->simulated_elapsed();
502 unsigned long comm_size = MPI_COMM_WORLD->size();
503 CHECK_ACTION_PARAMS(action, comm_size+1, 2)
504 int send_size = parse_double(action[2]);
505 std::vector<int> disps(comm_size, 0);
506 std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
508 MPI_Datatype MPI_CURRENT_TYPE =
509 (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
510 MPI_Datatype MPI_CURRENT_TYPE2{
511 (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[5 + comm_size]) : MPI_DEFAULT_TYPE};
513 void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
514 void *recv = nullptr;
515 for (unsigned int i = 0; i < comm_size; i++) {
516 (*recvcounts)[i] = std::stoi(action[i + 3]);
518 int recv_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
520 int root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
521 int rank = MPI_COMM_WORLD->rank();
524 recv = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
526 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData(
527 "gatherV", root, send_size, nullptr, -1, recvcounts,
528 Datatype::encode(MPI_CURRENT_TYPE), Datatype::encode(MPI_CURRENT_TYPE2)));
530 Colls::gatherv(send, send_size, MPI_CURRENT_TYPE, recv, recvcounts->data(), disps.data(), MPI_CURRENT_TYPE2, root,
533 TRACE_smpi_comm_out(Actor::self()->getPid());
534 log_timed_action (action, clock);
537 static void action_scatterv(simgrid::xbt::ReplayAction& action)
539 /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
540 0 gather 68 10 10 10 68 0 0 0
542 1) 68 10 10 10 is the sendcounts
543 2) 68 is the recvcount
544 3) 0 is the root node
545 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
546 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
548 double clock = smpi_process()->simulated_elapsed();
549 unsigned long comm_size = MPI_COMM_WORLD->size();
550 CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
551 int recv_size = parse_double(action[2 + comm_size]);
552 std::vector<int> disps(comm_size, 0);
553 std::shared_ptr<std::vector<int>> sendcounts(new std::vector<int>(comm_size));
555 MPI_Datatype MPI_CURRENT_TYPE =
556 (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
557 MPI_Datatype MPI_CURRENT_TYPE2{
558 (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[5 + comm_size]) : MPI_DEFAULT_TYPE};
560 void* send = nullptr;
561 void* recv = smpi_get_tmp_recvbuffer(recv_size * MPI_CURRENT_TYPE->size());
562 for (unsigned int i = 0; i < comm_size; i++) {
563 (*sendcounts)[i] = std::stoi(action[i + 2]);
565 int send_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
567 int root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
568 int rank = MPI_COMM_WORLD->rank();
571 send = smpi_get_tmp_sendbuffer(send_sum * MPI_CURRENT_TYPE2->size());
573 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData("gatherV", root, -1, sendcounts, recv_size,
574 nullptr, Datatype::encode(MPI_CURRENT_TYPE),
575 Datatype::encode(MPI_CURRENT_TYPE2)));
577 Colls::scatterv(send, sendcounts->data(), disps.data(), MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root,
580 TRACE_smpi_comm_out(Actor::self()->getPid());
581 log_timed_action(action, clock);
584 static void action_reducescatter(simgrid::xbt::ReplayAction& action)
586 /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
587 0 reduceScatter 275427 275427 275427 204020 11346849 0
589 1) The first four values after the name of the action declare the recvcounts array
590 2) The value 11346849 is the amount of instructions
591 3) The last value corresponds to the datatype, see simgrid::smpi::Datatype::decode().
593 double clock = smpi_process()->simulated_elapsed();
594 unsigned long comm_size = MPI_COMM_WORLD->size();
595 CHECK_ACTION_PARAMS(action, comm_size+1, 1)
596 int comp_size = parse_double(action[2+comm_size]);
597 int my_proc_id = Actor::self()->getPid();
598 std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>);
599 MPI_Datatype MPI_CURRENT_TYPE =
600 (action.size() > 3 + comm_size) ? simgrid::smpi::Datatype::decode(action[3 + comm_size]) : MPI_DEFAULT_TYPE;
602 for (unsigned int i = 0; i < comm_size; i++) {
603 recvcounts->push_back(std::stoi(action[i + 2]));
605 int size{std::accumulate(recvcounts->begin(), recvcounts->end(), 0)};
607 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
608 new simgrid::instr::VarCollTIData("reduceScatter", -1, 0, nullptr, -1, recvcounts,
609 std::to_string(comp_size), /* ugly hack to print comp_size */
610 Datatype::encode(MPI_CURRENT_TYPE)));
612 void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
613 void *recvbuf = smpi_get_tmp_recvbuffer(size* MPI_CURRENT_TYPE->size());
615 Colls::reduce_scatter(sendbuf, recvbuf, recvcounts->data(), MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
616 smpi_execute_flops(comp_size);
618 TRACE_smpi_comm_out(my_proc_id);
619 log_timed_action (action, clock);
622 static void action_allgather(simgrid::xbt::ReplayAction& action)
624 /* The structure of the allgather action for the rank 0 (total 4 processes) is the following:
625 0 allGather 275427 275427
627 1) 275427 is the sendcount
628 2) 275427 is the recvcount
629 3) No more values mean that the datatype for sent and receive buffer is the default one, see
630 simgrid::smpi::Datatype::decode().
632 double clock = smpi_process()->simulated_elapsed();
634 CHECK_ACTION_PARAMS(action, 2, 2)
635 int sendcount = std::stoi(action[2]);
636 int recvcount = std::stoi(action[3]);
638 MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 5) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE};
639 MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE};
641 void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
642 void *recvbuf = smpi_get_tmp_recvbuffer(recvcount* MPI_CURRENT_TYPE2->size());
644 int my_proc_id = Actor::self()->getPid();
646 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
647 new simgrid::instr::CollTIData("allGather", -1, -1.0, sendcount, recvcount,
648 Datatype::encode(MPI_CURRENT_TYPE),
649 Datatype::encode(MPI_CURRENT_TYPE2)));
651 Colls::allgather(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcount, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
653 TRACE_smpi_comm_out(my_proc_id);
654 log_timed_action (action, clock);
657 static void action_allgatherv(simgrid::xbt::ReplayAction& action)
659 /* The structure of the allgatherv action for the rank 0 (total 4 processes) is the following:
660 0 allGatherV 275427 275427 275427 275427 204020
662 1) 275427 is the sendcount
663 2) The next four elements declare the recvcounts array
664 3) No more values mean that the datatype for sent and receive buffer is the default one, see
665 simgrid::smpi::Datatype::decode().
667 double clock = smpi_process()->simulated_elapsed();
669 unsigned long comm_size = MPI_COMM_WORLD->size();
670 CHECK_ACTION_PARAMS(action, comm_size+1, 2)
671 int sendcount = std::stoi(action[2]);
672 std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
673 std::vector<int> disps(comm_size, 0);
675 int datatype_index = 0, disp_index = 0;
676 if (action.size() > 3 + 2 * comm_size) { /* datatype + disp are specified */
677 datatype_index = 3 + comm_size;
678 disp_index = datatype_index + 1;
679 } else if (action.size() > 3 + 2 * comm_size) { /* disps specified; datatype is not specified; use the default one */
681 disp_index = 3 + comm_size;
682 } else if (action.size() > 3 + comm_size) { /* only datatype, no disp specified */
683 datatype_index = 3 + comm_size;
686 if (disp_index != 0) {
687 for (unsigned int i = 0; i < comm_size; i++)
688 disps[i] = std::stoi(action[disp_index + i]);
691 MPI_Datatype MPI_CURRENT_TYPE{(datatype_index > 0) ? simgrid::smpi::Datatype::decode(action[datatype_index])
693 MPI_Datatype MPI_CURRENT_TYPE2{(datatype_index > 0) ? simgrid::smpi::Datatype::decode(action[datatype_index])
696 void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
698 for (unsigned int i = 0; i < comm_size; i++) {
699 (*recvcounts)[i] = std::stoi(action[i + 3]);
701 int recv_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
702 void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
704 int my_proc_id = Actor::self()->getPid();
706 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
707 new simgrid::instr::VarCollTIData("allGatherV", -1, sendcount, nullptr, -1, recvcounts,
708 Datatype::encode(MPI_CURRENT_TYPE),
709 Datatype::encode(MPI_CURRENT_TYPE2)));
711 Colls::allgatherv(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts->data(), disps.data(), MPI_CURRENT_TYPE2,
714 TRACE_smpi_comm_out(my_proc_id);
715 log_timed_action (action, clock);
718 static void action_allToAllv(simgrid::xbt::ReplayAction& action)
720 /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
721 0 allToAllV 100 1 7 10 12 100 1 70 10 5
723 1) 100 is the size of the send buffer *sizeof(int),
724 2) 1 7 10 12 is the sendcounts array
725 3) 100*sizeof(int) is the size of the receiver buffer
726 4) 1 70 10 5 is the recvcounts array
728 double clock = smpi_process()->simulated_elapsed();
730 unsigned long comm_size = MPI_COMM_WORLD->size();
731 CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
732 std::shared_ptr<std::vector<int>> sendcounts(new std::vector<int>(comm_size));
733 std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
734 std::vector<int> senddisps(comm_size, 0);
735 std::vector<int> recvdisps(comm_size, 0);
737 MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 5 + 2 * comm_size)
738 ? simgrid::smpi::Datatype::decode(action[4 + 2 * comm_size])
740 MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5 + 2 * comm_size)
741 ? simgrid::smpi::Datatype::decode(action[5 + 2 * comm_size])
744 int send_buf_size=parse_double(action[2]);
745 int recv_buf_size=parse_double(action[3+comm_size]);
746 int my_proc_id = Actor::self()->getPid();
747 void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* MPI_CURRENT_TYPE->size());
748 void *recvbuf = smpi_get_tmp_recvbuffer(recv_buf_size* MPI_CURRENT_TYPE2->size());
750 for (unsigned int i = 0; i < comm_size; i++) {
751 (*sendcounts)[i] = std::stoi(action[3 + i]);
752 (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
754 int send_size = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
755 int recv_size = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
757 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
758 new simgrid::instr::VarCollTIData("allToAllV", -1, send_size, sendcounts, recv_size, recvcounts,
759 Datatype::encode(MPI_CURRENT_TYPE),
760 Datatype::encode(MPI_CURRENT_TYPE2)));
762 Colls::alltoallv(sendbuf, sendcounts->data(), senddisps.data(), MPI_CURRENT_TYPE, recvbuf, recvcounts->data(),
763 recvdisps.data(), MPI_CURRENT_TYPE, MPI_COMM_WORLD);
765 TRACE_smpi_comm_out(my_proc_id);
766 log_timed_action (action, clock);
769 }} // namespace simgrid::smpi
771 /** @brief Only initialize the replay, don't do it for real */
772 void smpi_replay_init(int* argc, char*** argv)
774 simgrid::smpi::Process::init(argc, argv);
775 smpi_process()->mark_as_initialized();
776 smpi_process()->set_replaying(true);
778 int my_proc_id = Actor::self()->getPid();
779 TRACE_smpi_init(my_proc_id);
780 TRACE_smpi_computing_init(my_proc_id);
781 TRACE_smpi_comm_in(my_proc_id, "smpi_replay_run_init", new simgrid::instr::NoOpTIData("init"));
782 TRACE_smpi_comm_out(my_proc_id);
783 xbt_replay_action_register("init", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::InitAction().execute(action); });
784 xbt_replay_action_register("finalize", [](simgrid::xbt::ReplayAction& action) { /* nothing to do */ });
785 xbt_replay_action_register("comm_size", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::CommunicatorAction().execute(action); });
786 xbt_replay_action_register("comm_split",[](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::CommunicatorAction().execute(action); });
787 xbt_replay_action_register("comm_dup", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::CommunicatorAction().execute(action); });
789 xbt_replay_action_register("send", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::SendAction("send").execute(action); });
790 xbt_replay_action_register("Isend", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::SendAction("Isend").execute(action); });
791 xbt_replay_action_register("recv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::RecvAction("recv").execute(action); });
792 xbt_replay_action_register("Irecv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::RecvAction("Irecv").execute(action); });
793 xbt_replay_action_register("test", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::TestAction().execute(action); });
794 xbt_replay_action_register("wait", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::WaitAction().execute(action); });
795 xbt_replay_action_register("waitAll", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::WaitAllAction().execute(action); });
796 xbt_replay_action_register("barrier", simgrid::smpi::action_barrier);
797 xbt_replay_action_register("bcast", simgrid::smpi::action_bcast);
798 xbt_replay_action_register("reduce", simgrid::smpi::action_reduce);
799 xbt_replay_action_register("allReduce", simgrid::smpi::action_allReduce);
800 xbt_replay_action_register("allToAll", simgrid::smpi::action_allToAll);
801 xbt_replay_action_register("allToAllV", simgrid::smpi::action_allToAllv);
802 xbt_replay_action_register("gather", simgrid::smpi::action_gather);
803 xbt_replay_action_register("scatter", simgrid::smpi::action_scatter);
804 xbt_replay_action_register("gatherV", simgrid::smpi::action_gatherv);
805 xbt_replay_action_register("scatterV", simgrid::smpi::action_scatterv);
806 xbt_replay_action_register("allGather", simgrid::smpi::action_allgather);
807 xbt_replay_action_register("allGatherV", simgrid::smpi::action_allgatherv);
808 xbt_replay_action_register("reduceScatter", simgrid::smpi::action_reducescatter);
809 xbt_replay_action_register("compute", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::ComputeAction().execute(action); });
811 //if we have a delayed start, sleep here.
813 double value = xbt_str_parse_double((*argv)[2], "%s is not a double");
814 XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
815 smpi_execute_flops(value);
817 //UGLY: force a context switch to be sure that all MSG_processes begin initialization
818 XBT_DEBUG("Force context switch by smpi_execute_flops - Sleeping for 0.0 flops ");
819 smpi_execute_flops(0.0);
823 /** @brief actually run the replay after initialization */
824 void smpi_replay_main(int* argc, char*** argv)
826 simgrid::xbt::replay_runner(*argc, *argv);
828 /* and now, finalize everything */
829 /* One active process will stop. Decrease the counter*/
830 XBT_DEBUG("There are %zu elements in reqq[*]", get_reqq_self()->size());
831 if (not get_reqq_self()->empty()) {
832 unsigned int count_requests=get_reqq_self()->size();
833 MPI_Request requests[count_requests];
834 MPI_Status status[count_requests];
837 for (auto const& req : *get_reqq_self()) {
841 simgrid::smpi::Request::waitall(count_requests, requests, status);
843 delete get_reqq_self();
846 if(active_processes==0){
847 /* Last process alive speaking: end the simulated timer */
848 XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
849 smpi_free_replay_tmp_buffers();
852 TRACE_smpi_comm_in(Actor::self()->getPid(), "smpi_replay_run_finalize", new simgrid::instr::NoOpTIData("finalize"));
854 smpi_process()->finalize();
856 TRACE_smpi_comm_out(Actor::self()->getPid());
857 TRACE_smpi_finalize(Actor::self()->getPid());
860 /** @brief chain a replay initialization and a replay start */
861 void smpi_replay_run(int* argc, char*** argv)
863 smpi_replay_init(argc, argv);
864 smpi_replay_main(argc, argv);