1 /* Copyright (c) 2009-2017. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
7 #include "smpi_coll.hpp"
8 #include "smpi_comm.hpp"
9 #include "smpi_datatype.hpp"
10 #include "smpi_group.hpp"
11 #include "smpi_process.hpp"
12 #include "smpi_request.hpp"
13 #include "xbt/replay.hpp"
15 #include <boost/algorithm/string/join.hpp>
18 #include <unordered_map>
21 using simgrid::s4u::Actor;
XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");

// Communicator size as announced by the "comm_size" trace action.
static int communicator_size = 0;
// Count of replaying processes; set from smpi_process_count() in action_init().
// NOTE(review): the decrement is not visible in this chunk -- presumably done
// in smpi_replay_main() when each process finishes; confirm.
static int active_processes = 0;
// Per-actor queue of outstanding asynchronous requests, keyed by actor pid
// (filled by Isend/Irecv, drained by wait/waitAll/test).
static std::unordered_map<int, std::vector<MPI_Request>*> reqq;

// Datatype assumed when a trace line carries no explicit datatype
// (MPI_DOUBLE for MPE traces, MPI_BYTE for TAU traces -- see action_init()).
static MPI_Datatype MPI_DEFAULT_TYPE;
/* Abort the replay with a helpful message when a trace line does not carry
 * enough tokens: the first two tokens are always <process_id> <action_name>,
 * followed by `mandatory` required arguments and up to `optional` extra ones.
 * NOTE(review): as rendered here the macro body is not brace-delimited; the
 * enclosing `{`/`}` lines appear to have been lost in extraction -- confirm
 * against the original file. */
#define CHECK_ACTION_PARAMS(action, mandatory, optional) \
  if (action.size() < static_cast<unsigned long>(mandatory + 2)) \
    THROWF(arg_error, 0, "%s replay failed.\n" \
           "%lu items were given on the line. First two should be process_id and action. " \
           "This action needs after them %lu mandatory arguments, and accepts %lu optional ones. \n" \
           "Please contact the Simgrid team if support is needed", \
           __FUNCTION__, action.size(), static_cast<unsigned long>(mandatory), \
           static_cast<unsigned long>(optional)); \
42 static void log_timed_action(simgrid::xbt::ReplayAction& action, double clock)
44 if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
45 std::string s = boost::algorithm::join(action, " ");
46 XBT_VERB("%s %f", s.c_str(), smpi_process()->simulated_elapsed() - clock);
50 static std::vector<MPI_Request>* get_reqq_self()
52 return reqq.at(Actor::self()->getPid());
55 static void set_reqq_self(std::vector<MPI_Request> *mpi_request)
57 reqq.insert({Actor::self()->getPid(), mpi_request});
61 static double parse_double(std::string string)
63 return xbt_str_parse_double(string.c_str(), "%s is not a double");
70 class ActionArgParser {
72 virtual void parse(simgrid::xbt::ReplayAction& action) { CHECK_ACTION_PARAMS(action, 0, 0) }
75 class SendRecvParser : public ActionArgParser {
77 /* communication partner; if we send, this is the receiver and vice versa */
80 MPI_Datatype datatype1 = MPI_DEFAULT_TYPE;
82 void parse(simgrid::xbt::ReplayAction& action) override
84 CHECK_ACTION_PARAMS(action, 2, 1)
85 partner = std::stoi(action[2]);
86 size = parse_double(action[3]);
87 if (action.size() > 4)
88 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
92 class ComputeParser : public ActionArgParser {
94 /* communication partner; if we send, this is the receiver and vice versa */
97 void parse(simgrid::xbt::ReplayAction& action) override
99 CHECK_ACTION_PARAMS(action, 1, 0)
100 flops = parse_double(action[2]);
104 template <class T> class ReplayAction {
106 const std::string name;
112 explicit ReplayAction(std::string name) : name(name), my_proc_id(simgrid::s4u::Actor::self()->getPid()) {}
114 virtual void execute(simgrid::xbt::ReplayAction& action)
116 // Needs to be re-initialized for every action, hence here
117 double start_time = smpi_process()->simulated_elapsed();
120 log_timed_action(action, start_time);
123 virtual void kernel(simgrid::xbt::ReplayAction& action) = 0;
126 class WaitAction : public ReplayAction<ActionArgParser> {
128 WaitAction() : ReplayAction("Wait") {}
129 void kernel(simgrid::xbt::ReplayAction& action) override
131 CHECK_ACTION_PARAMS(action, 0, 0)
134 std::string s = boost::algorithm::join(action, " ");
135 xbt_assert(get_reqq_self()->size(), "action wait not preceded by any irecv or isend: %s", s.c_str());
136 MPI_Request request = get_reqq_self()->back();
137 get_reqq_self()->pop_back();
139 if (request == nullptr) {
140 /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just
145 int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
147 MPI_Group group = request->comm()->group();
148 int src_traced = group->rank(request->src());
149 int dst_traced = group->rank(request->dst());
150 bool is_wait_for_receive = (request->flags() & RECV);
151 // TODO: Here we take the rank while we normally take the process id (look for my_proc_id)
152 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::NoOpTIData("wait"));
154 Request::wait(&request, &status);
156 TRACE_smpi_comm_out(rank);
157 if (is_wait_for_receive)
158 TRACE_smpi_recv(src_traced, dst_traced, 0);
162 class SendAction : public ReplayAction<SendRecvParser> {
164 SendAction() = delete;
165 SendAction(std::string name) : ReplayAction(name) {}
166 void kernel(simgrid::xbt::ReplayAction& action) override
168 int dst_traced = MPI_COMM_WORLD->group()->actor(args.partner)->getPid();
170 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
171 Datatype::encode(args.datatype1)));
172 if (not TRACE_smpi_view_internals())
173 TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, args.size * args.datatype1->size());
175 if (name == "send") {
176 Request::send(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
177 } else if (name == "Isend") {
178 MPI_Request request = Request::isend(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
179 get_reqq_self()->push_back(request);
181 xbt_die("Don't know this action, %s", name.c_str());
184 TRACE_smpi_comm_out(my_proc_id);
188 class RecvAction : public ReplayAction<SendRecvParser> {
190 RecvAction() = delete;
191 explicit RecvAction(std::string name) : ReplayAction(name) {}
192 void kernel(simgrid::xbt::ReplayAction& action) override
194 int src_traced = MPI_COMM_WORLD->group()->actor(args.partner)->getPid();
196 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
197 Datatype::encode(args.datatype1)));
200 // unknown size from the receiver point of view
201 if (args.size <= 0.0) {
202 Request::probe(args.partner, 0, MPI_COMM_WORLD, &status);
203 args.size = status.count;
206 if (name == "recv") {
207 Request::recv(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD, &status);
208 } else if (name == "Irecv") {
209 MPI_Request request = Request::irecv(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
210 get_reqq_self()->push_back(request);
213 TRACE_smpi_comm_out(my_proc_id);
214 // TODO: Check why this was only activated in the "recv" case and not in the "Irecv" case
215 if (name == "recv" && not TRACE_smpi_view_internals()) {
216 TRACE_smpi_recv(src_traced, my_proc_id, 0);
221 class ComputeAction : public ReplayAction<ComputeParser> {
223 ComputeAction() : ReplayAction("compute") {}
224 void kernel(simgrid::xbt::ReplayAction& action) override
226 TRACE_smpi_computing_in(my_proc_id, args.flops);
227 smpi_execute_flops(args.flops);
228 TRACE_smpi_computing_out(my_proc_id);
232 } // Replay Namespace
234 static void action_init(simgrid::xbt::ReplayAction& action)
236 XBT_DEBUG("Initialize the counters");
237 CHECK_ACTION_PARAMS(action, 0, 1)
238 if (action.size() > 2)
239 MPI_DEFAULT_TYPE = MPI_DOUBLE; // default MPE datatype
241 MPI_DEFAULT_TYPE = MPI_BYTE; // default TAU datatype
243 /* start a simulated timer */
244 smpi_process()->simulated_start();
245 /*initialize the number of active processes */
246 active_processes = smpi_process_count();
248 set_reqq_self(new std::vector<MPI_Request>);
251 static void action_finalize(simgrid::xbt::ReplayAction& action)
256 static void action_comm_size(simgrid::xbt::ReplayAction& action)
258 communicator_size = parse_double(action[2]);
259 log_timed_action (action, smpi_process()->simulated_elapsed());
262 static void action_comm_split(simgrid::xbt::ReplayAction& action)
264 log_timed_action (action, smpi_process()->simulated_elapsed());
267 static void action_comm_dup(simgrid::xbt::ReplayAction& action)
269 log_timed_action (action, smpi_process()->simulated_elapsed());
272 static void action_compute(simgrid::xbt::ReplayAction& action)
274 Replay::ComputeAction().execute(action);
277 static void action_test(simgrid::xbt::ReplayAction& action)
279 CHECK_ACTION_PARAMS(action, 0, 0)
280 double clock = smpi_process()->simulated_elapsed();
283 MPI_Request request = get_reqq_self()->back();
284 get_reqq_self()->pop_back();
285 //if request is null here, this may mean that a previous test has succeeded
286 //Different times in traced application and replayed version may lead to this
287 //In this case, ignore the extra calls.
288 if(request!=nullptr){
289 int my_proc_id = Actor::self()->getPid();
290 TRACE_smpi_testing_in(my_proc_id);
292 int flag = Request::test(&request, &status);
294 XBT_DEBUG("MPI_Test result: %d", flag);
295 /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now nullptr.*/
296 get_reqq_self()->push_back(request);
298 TRACE_smpi_testing_out(my_proc_id);
300 log_timed_action (action, clock);
303 static void action_waitall(simgrid::xbt::ReplayAction& action)
305 CHECK_ACTION_PARAMS(action, 0, 0)
306 double clock = smpi_process()->simulated_elapsed();
307 const unsigned int count_requests = get_reqq_self()->size();
309 if (count_requests>0) {
310 MPI_Status status[count_requests];
312 int my_proc_id_traced = Actor::self()->getPid();
313 TRACE_smpi_comm_in(my_proc_id_traced, __FUNCTION__,
314 new simgrid::instr::Pt2PtTIData("waitAll", -1, count_requests, ""));
315 int recvs_snd[count_requests];
316 int recvs_rcv[count_requests];
317 for (unsigned int i = 0; i < count_requests; i++) {
318 const auto& req = (*get_reqq_self())[i];
319 if (req && (req->flags() & RECV)) {
320 recvs_snd[i] = req->src();
321 recvs_rcv[i] = req->dst();
325 Request::waitall(count_requests, &(*get_reqq_self())[0], status);
327 for (unsigned i = 0; i < count_requests; i++) {
328 if (recvs_snd[i]!=-100)
329 TRACE_smpi_recv(recvs_snd[i], recvs_rcv[i],0);
331 TRACE_smpi_comm_out(my_proc_id_traced);
333 log_timed_action (action, clock);
336 static void action_barrier(simgrid::xbt::ReplayAction& action)
338 double clock = smpi_process()->simulated_elapsed();
339 int my_proc_id = Actor::self()->getPid();
340 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::NoOpTIData("barrier"));
342 Colls::barrier(MPI_COMM_WORLD);
344 TRACE_smpi_comm_out(my_proc_id);
345 log_timed_action (action, clock);
348 static void action_bcast(simgrid::xbt::ReplayAction& action)
350 CHECK_ACTION_PARAMS(action, 1, 2)
351 double size = parse_double(action[2]);
352 double clock = smpi_process()->simulated_elapsed();
353 int root = (action.size() > 3) ? std::stoi(action[3]) : 0;
354 /* Initialize MPI_CURRENT_TYPE in order to decrease the number of the checks */
355 MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 4) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE;
357 int my_proc_id = Actor::self()->getPid();
358 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
359 new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(root)->getPid(), -1.0, size,
360 -1, Datatype::encode(MPI_CURRENT_TYPE), ""));
362 void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
364 Colls::bcast(sendbuf, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
366 TRACE_smpi_comm_out(my_proc_id);
367 log_timed_action (action, clock);
370 static void action_reduce(simgrid::xbt::ReplayAction& action)
372 CHECK_ACTION_PARAMS(action, 2, 2)
373 double comm_size = parse_double(action[2]);
374 double comp_size = parse_double(action[3]);
375 double clock = smpi_process()->simulated_elapsed();
376 int root = (action.size() > 4) ? std::stoi(action[4]) : 0;
378 MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 5) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE;
380 int my_proc_id = Actor::self()->getPid();
381 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
382 new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(root)->getPid(), comp_size,
383 comm_size, -1, Datatype::encode(MPI_CURRENT_TYPE), ""));
385 void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
386 void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
387 Colls::reduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
388 smpi_execute_flops(comp_size);
390 TRACE_smpi_comm_out(my_proc_id);
391 log_timed_action (action, clock);
394 static void action_allReduce(simgrid::xbt::ReplayAction& action)
396 CHECK_ACTION_PARAMS(action, 2, 1)
397 double comm_size = parse_double(action[2]);
398 double comp_size = parse_double(action[3]);
400 MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 4) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE;
402 double clock = smpi_process()->simulated_elapsed();
403 int my_proc_id = Actor::self()->getPid();
404 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::CollTIData("allReduce", -1, comp_size, comm_size, -1,
405 Datatype::encode(MPI_CURRENT_TYPE), ""));
407 void *recvbuf = smpi_get_tmp_recvbuffer(comm_size* MPI_CURRENT_TYPE->size());
408 void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
409 Colls::allreduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
410 smpi_execute_flops(comp_size);
412 TRACE_smpi_comm_out(my_proc_id);
413 log_timed_action (action, clock);
416 static void action_allToAll(simgrid::xbt::ReplayAction& action)
418 CHECK_ACTION_PARAMS(action, 2, 2) //two mandatory (send and recv volumes) and two optional (corresponding datatypes)
419 double clock = smpi_process()->simulated_elapsed();
420 unsigned long comm_size = MPI_COMM_WORLD->size();
421 int send_size = parse_double(action[2]);
422 int recv_size = parse_double(action[3]);
423 MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 5) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE};
424 MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE};
426 void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* MPI_CURRENT_TYPE->size());
427 void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
429 int my_proc_id = Actor::self()->getPid();
430 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
431 new simgrid::instr::CollTIData("allToAll", -1, -1.0, send_size, recv_size,
432 Datatype::encode(MPI_CURRENT_TYPE),
433 Datatype::encode(MPI_CURRENT_TYPE2)));
435 Colls::alltoall(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
437 TRACE_smpi_comm_out(my_proc_id);
438 log_timed_action (action, clock);
441 static void action_gather(simgrid::xbt::ReplayAction& action)
443 /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
446 1) 68 is the sendcounts
447 2) 68 is the recvcounts
448 3) 0 is the root node
449 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
450 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
452 CHECK_ACTION_PARAMS(action, 2, 3)
453 double clock = smpi_process()->simulated_elapsed();
454 unsigned long comm_size = MPI_COMM_WORLD->size();
455 int send_size = parse_double(action[2]);
456 int recv_size = parse_double(action[3]);
457 MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE};
458 MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[6]) : MPI_DEFAULT_TYPE};
460 void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
461 void *recv = nullptr;
462 int root = (action.size() > 4) ? std::stoi(action[4]) : 0;
463 int rank = MPI_COMM_WORLD->rank();
466 recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
468 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
469 Datatype::encode(MPI_CURRENT_TYPE),
470 Datatype::encode(MPI_CURRENT_TYPE2)));
472 Colls::gather(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
474 TRACE_smpi_comm_out(Actor::self()->getPid());
475 log_timed_action (action, clock);
478 static void action_scatter(simgrid::xbt::ReplayAction& action)
480 /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
483 1) 68 is the sendcounts
484 2) 68 is the recvcounts
485 3) 0 is the root node
486 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
487 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
489 CHECK_ACTION_PARAMS(action, 2, 3)
490 double clock = smpi_process()->simulated_elapsed();
491 unsigned long comm_size = MPI_COMM_WORLD->size();
492 int send_size = parse_double(action[2]);
493 int recv_size = parse_double(action[3]);
494 MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE};
495 MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[6]) : MPI_DEFAULT_TYPE};
497 void* send = smpi_get_tmp_sendbuffer(send_size * MPI_CURRENT_TYPE->size());
498 void* recv = nullptr;
499 int root = (action.size() > 4) ? std::stoi(action[4]) : 0;
500 int rank = MPI_COMM_WORLD->rank();
503 recv = smpi_get_tmp_recvbuffer(recv_size * comm_size * MPI_CURRENT_TYPE2->size());
505 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
506 Datatype::encode(MPI_CURRENT_TYPE),
507 Datatype::encode(MPI_CURRENT_TYPE2)));
509 Colls::scatter(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
511 TRACE_smpi_comm_out(Actor::self()->getPid());
512 log_timed_action(action, clock);
515 static void action_gatherv(simgrid::xbt::ReplayAction& action)
517 /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
518 0 gather 68 68 10 10 10 0 0 0
520 1) 68 is the sendcount
521 2) 68 10 10 10 is the recvcounts
522 3) 0 is the root node
523 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
524 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
526 double clock = smpi_process()->simulated_elapsed();
527 unsigned long comm_size = MPI_COMM_WORLD->size();
528 CHECK_ACTION_PARAMS(action, comm_size+1, 2)
529 int send_size = parse_double(action[2]);
530 std::vector<int> disps(comm_size, 0);
531 std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
533 MPI_Datatype MPI_CURRENT_TYPE =
534 (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
535 MPI_Datatype MPI_CURRENT_TYPE2{
536 (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[5 + comm_size]) : MPI_DEFAULT_TYPE};
538 void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
539 void *recv = nullptr;
540 for (unsigned int i = 0; i < comm_size; i++) {
541 (*recvcounts)[i] = std::stoi(action[i + 3]);
543 int recv_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
545 int root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
546 int rank = MPI_COMM_WORLD->rank();
549 recv = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
551 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData(
552 "gatherV", root, send_size, nullptr, -1, recvcounts,
553 Datatype::encode(MPI_CURRENT_TYPE), Datatype::encode(MPI_CURRENT_TYPE2)));
555 Colls::gatherv(send, send_size, MPI_CURRENT_TYPE, recv, recvcounts->data(), disps.data(), MPI_CURRENT_TYPE2, root,
558 TRACE_smpi_comm_out(Actor::self()->getPid());
559 log_timed_action (action, clock);
562 static void action_scatterv(simgrid::xbt::ReplayAction& action)
564 /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
565 0 gather 68 10 10 10 68 0 0 0
567 1) 68 10 10 10 is the sendcounts
568 2) 68 is the recvcount
569 3) 0 is the root node
570 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
571 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
573 double clock = smpi_process()->simulated_elapsed();
574 unsigned long comm_size = MPI_COMM_WORLD->size();
575 CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
576 int recv_size = parse_double(action[2 + comm_size]);
577 std::vector<int> disps(comm_size, 0);
578 std::shared_ptr<std::vector<int>> sendcounts(new std::vector<int>(comm_size));
580 MPI_Datatype MPI_CURRENT_TYPE =
581 (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
582 MPI_Datatype MPI_CURRENT_TYPE2{
583 (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[5 + comm_size]) : MPI_DEFAULT_TYPE};
585 void* send = nullptr;
586 void* recv = smpi_get_tmp_recvbuffer(recv_size * MPI_CURRENT_TYPE->size());
587 for (unsigned int i = 0; i < comm_size; i++) {
588 (*sendcounts)[i] = std::stoi(action[i + 2]);
590 int send_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
592 int root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
593 int rank = MPI_COMM_WORLD->rank();
596 send = smpi_get_tmp_sendbuffer(send_sum * MPI_CURRENT_TYPE2->size());
598 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData("gatherV", root, -1, sendcounts, recv_size,
599 nullptr, Datatype::encode(MPI_CURRENT_TYPE),
600 Datatype::encode(MPI_CURRENT_TYPE2)));
602 Colls::scatterv(send, sendcounts->data(), disps.data(), MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root,
605 TRACE_smpi_comm_out(Actor::self()->getPid());
606 log_timed_action(action, clock);
609 static void action_reducescatter(simgrid::xbt::ReplayAction& action)
611 /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
612 0 reduceScatter 275427 275427 275427 204020 11346849 0
614 1) The first four values after the name of the action declare the recvcounts array
615 2) The value 11346849 is the amount of instructions
616 3) The last value corresponds to the datatype, see simgrid::smpi::Datatype::decode().
618 double clock = smpi_process()->simulated_elapsed();
619 unsigned long comm_size = MPI_COMM_WORLD->size();
620 CHECK_ACTION_PARAMS(action, comm_size+1, 1)
621 int comp_size = parse_double(action[2+comm_size]);
622 int my_proc_id = Actor::self()->getPid();
623 std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>);
624 MPI_Datatype MPI_CURRENT_TYPE =
625 (action.size() > 3 + comm_size) ? simgrid::smpi::Datatype::decode(action[3 + comm_size]) : MPI_DEFAULT_TYPE;
627 for (unsigned int i = 0; i < comm_size; i++) {
628 recvcounts->push_back(std::stoi(action[i + 2]));
630 int size{std::accumulate(recvcounts->begin(), recvcounts->end(), 0)};
632 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
633 new simgrid::instr::VarCollTIData("reduceScatter", -1, 0, nullptr, -1, recvcounts,
634 std::to_string(comp_size), /* ugly hack to print comp_size */
635 Datatype::encode(MPI_CURRENT_TYPE)));
637 void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
638 void *recvbuf = smpi_get_tmp_recvbuffer(size* MPI_CURRENT_TYPE->size());
640 Colls::reduce_scatter(sendbuf, recvbuf, recvcounts->data(), MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
641 smpi_execute_flops(comp_size);
643 TRACE_smpi_comm_out(my_proc_id);
644 log_timed_action (action, clock);
647 static void action_allgather(simgrid::xbt::ReplayAction& action)
649 /* The structure of the allgather action for the rank 0 (total 4 processes) is the following:
650 0 allGather 275427 275427
652 1) 275427 is the sendcount
653 2) 275427 is the recvcount
654 3) No more values mean that the datatype for sent and receive buffer is the default one, see
655 simgrid::smpi::Datatype::decode().
657 double clock = smpi_process()->simulated_elapsed();
659 CHECK_ACTION_PARAMS(action, 2, 2)
660 int sendcount = std::stoi(action[2]);
661 int recvcount = std::stoi(action[3]);
663 MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 5) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE};
664 MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE};
666 void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
667 void *recvbuf = smpi_get_tmp_recvbuffer(recvcount* MPI_CURRENT_TYPE2->size());
669 int my_proc_id = Actor::self()->getPid();
671 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
672 new simgrid::instr::CollTIData("allGather", -1, -1.0, sendcount, recvcount,
673 Datatype::encode(MPI_CURRENT_TYPE),
674 Datatype::encode(MPI_CURRENT_TYPE2)));
676 Colls::allgather(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcount, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
678 TRACE_smpi_comm_out(my_proc_id);
679 log_timed_action (action, clock);
682 static void action_allgatherv(simgrid::xbt::ReplayAction& action)
684 /* The structure of the allgatherv action for the rank 0 (total 4 processes) is the following:
685 0 allGatherV 275427 275427 275427 275427 204020
687 1) 275427 is the sendcount
688 2) The next four elements declare the recvcounts array
689 3) No more values mean that the datatype for sent and receive buffer is the default one, see
690 simgrid::smpi::Datatype::decode().
692 double clock = smpi_process()->simulated_elapsed();
694 unsigned long comm_size = MPI_COMM_WORLD->size();
695 CHECK_ACTION_PARAMS(action, comm_size+1, 2)
696 int sendcount = std::stoi(action[2]);
697 std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
698 std::vector<int> disps(comm_size, 0);
700 int datatype_index = 0, disp_index = 0;
701 if (action.size() > 3 + 2 * comm_size) { /* datatype + disp are specified */
702 datatype_index = 3 + comm_size;
703 disp_index = datatype_index + 1;
704 } else if (action.size() > 3 + 2 * comm_size) { /* disps specified; datatype is not specified; use the default one */
706 disp_index = 3 + comm_size;
707 } else if (action.size() > 3 + comm_size) { /* only datatype, no disp specified */
708 datatype_index = 3 + comm_size;
711 if (disp_index != 0) {
712 for (unsigned int i = 0; i < comm_size; i++)
713 disps[i] = std::stoi(action[disp_index + i]);
716 MPI_Datatype MPI_CURRENT_TYPE{(datatype_index > 0) ? simgrid::smpi::Datatype::decode(action[datatype_index])
718 MPI_Datatype MPI_CURRENT_TYPE2{(datatype_index > 0) ? simgrid::smpi::Datatype::decode(action[datatype_index])
721 void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
723 for (unsigned int i = 0; i < comm_size; i++) {
724 (*recvcounts)[i] = std::stoi(action[i + 3]);
726 int recv_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
727 void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
729 int my_proc_id = Actor::self()->getPid();
731 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
732 new simgrid::instr::VarCollTIData("allGatherV", -1, sendcount, nullptr, -1, recvcounts,
733 Datatype::encode(MPI_CURRENT_TYPE),
734 Datatype::encode(MPI_CURRENT_TYPE2)));
736 Colls::allgatherv(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts->data(), disps.data(), MPI_CURRENT_TYPE2,
739 TRACE_smpi_comm_out(my_proc_id);
740 log_timed_action (action, clock);
743 static void action_allToAllv(simgrid::xbt::ReplayAction& action)
745 /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
746 0 allToAllV 100 1 7 10 12 100 1 70 10 5
748 1) 100 is the size of the send buffer *sizeof(int),
749 2) 1 7 10 12 is the sendcounts array
750 3) 100*sizeof(int) is the size of the receiver buffer
751 4) 1 70 10 5 is the recvcounts array
753 double clock = smpi_process()->simulated_elapsed();
755 unsigned long comm_size = MPI_COMM_WORLD->size();
756 CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
757 std::shared_ptr<std::vector<int>> sendcounts(new std::vector<int>(comm_size));
758 std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
759 std::vector<int> senddisps(comm_size, 0);
760 std::vector<int> recvdisps(comm_size, 0);
762 MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 5 + 2 * comm_size)
763 ? simgrid::smpi::Datatype::decode(action[4 + 2 * comm_size])
765 MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5 + 2 * comm_size)
766 ? simgrid::smpi::Datatype::decode(action[5 + 2 * comm_size])
769 int send_buf_size=parse_double(action[2]);
770 int recv_buf_size=parse_double(action[3+comm_size]);
771 int my_proc_id = Actor::self()->getPid();
772 void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* MPI_CURRENT_TYPE->size());
773 void *recvbuf = smpi_get_tmp_recvbuffer(recv_buf_size* MPI_CURRENT_TYPE2->size());
775 for (unsigned int i = 0; i < comm_size; i++) {
776 (*sendcounts)[i] = std::stoi(action[3 + i]);
777 (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
779 int send_size = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
780 int recv_size = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
782 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
783 new simgrid::instr::VarCollTIData("allToAllV", -1, send_size, sendcounts, recv_size, recvcounts,
784 Datatype::encode(MPI_CURRENT_TYPE),
785 Datatype::encode(MPI_CURRENT_TYPE2)));
787 Colls::alltoallv(sendbuf, sendcounts->data(), senddisps.data(), MPI_CURRENT_TYPE, recvbuf, recvcounts->data(),
788 recvdisps.data(), MPI_CURRENT_TYPE, MPI_COMM_WORLD);
790 TRACE_smpi_comm_out(my_proc_id);
791 log_timed_action (action, clock);
794 }} // namespace simgrid::smpi
796 /** @brief Only initialize the replay, don't do it for real */
797 void smpi_replay_init(int* argc, char*** argv)
799 simgrid::smpi::Process::init(argc, argv);
800 smpi_process()->mark_as_initialized();
801 smpi_process()->set_replaying(true);
803 int my_proc_id = Actor::self()->getPid();
804 TRACE_smpi_init(my_proc_id);
805 TRACE_smpi_computing_init(my_proc_id);
806 TRACE_smpi_comm_in(my_proc_id, "smpi_replay_run_init", new simgrid::instr::NoOpTIData("init"));
807 TRACE_smpi_comm_out(my_proc_id);
808 xbt_replay_action_register("init", simgrid::smpi::action_init);
809 xbt_replay_action_register("finalize", simgrid::smpi::action_finalize);
810 xbt_replay_action_register("comm_size", simgrid::smpi::action_comm_size);
811 xbt_replay_action_register("comm_split", simgrid::smpi::action_comm_split);
812 xbt_replay_action_register("comm_dup", simgrid::smpi::action_comm_dup);
814 std::shared_ptr<simgrid::smpi::Replay::SendAction> isend(new simgrid::smpi::Replay::SendAction("Isend"));
815 std::shared_ptr<simgrid::smpi::Replay::SendAction> send(new simgrid::smpi::Replay::SendAction("send"));
816 std::shared_ptr<simgrid::smpi::Replay::RecvAction> irecv(new simgrid::smpi::Replay::RecvAction("Irecv"));
817 std::shared_ptr<simgrid::smpi::Replay::RecvAction> recv(new simgrid::smpi::Replay::RecvAction("recv"));
818 std::shared_ptr<simgrid::smpi::Replay::WaitAction> wait(new simgrid::smpi::Replay::WaitAction());
820 xbt_replay_action_register("send",
821 std::bind(&simgrid::smpi::Replay::SendAction::execute, send, std::placeholders::_1));
822 xbt_replay_action_register("Isend",
823 std::bind(&simgrid::smpi::Replay::SendAction::execute, isend, std::placeholders::_1));
824 xbt_replay_action_register("recv",
825 std::bind(&simgrid::smpi::Replay::RecvAction::execute, recv, std::placeholders::_1));
826 xbt_replay_action_register("Irecv",
827 std::bind(&simgrid::smpi::Replay::RecvAction::execute, irecv, std::placeholders::_1));
828 xbt_replay_action_register("test", simgrid::smpi::action_test);
829 xbt_replay_action_register("wait",
830 std::bind(&simgrid::smpi::Replay::WaitAction::execute, wait, std::placeholders::_1));
831 xbt_replay_action_register("waitAll", simgrid::smpi::action_waitall);
832 xbt_replay_action_register("barrier", simgrid::smpi::action_barrier);
833 xbt_replay_action_register("bcast", simgrid::smpi::action_bcast);
834 xbt_replay_action_register("reduce", simgrid::smpi::action_reduce);
835 xbt_replay_action_register("allReduce", simgrid::smpi::action_allReduce);
836 xbt_replay_action_register("allToAll", simgrid::smpi::action_allToAll);
837 xbt_replay_action_register("allToAllV", simgrid::smpi::action_allToAllv);
838 xbt_replay_action_register("gather", simgrid::smpi::action_gather);
839 xbt_replay_action_register("scatter", simgrid::smpi::action_scatter);
840 xbt_replay_action_register("gatherV", simgrid::smpi::action_gatherv);
841 xbt_replay_action_register("scatterV", simgrid::smpi::action_scatterv);
842 xbt_replay_action_register("allGather", simgrid::smpi::action_allgather);
843 xbt_replay_action_register("allGatherV", simgrid::smpi::action_allgatherv);
844 xbt_replay_action_register("reduceScatter", simgrid::smpi::action_reducescatter);
845 xbt_replay_action_register("compute", simgrid::smpi::action_compute);
847 //if we have a delayed start, sleep here.
849 double value = xbt_str_parse_double((*argv)[2], "%s is not a double");
850 XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
851 smpi_execute_flops(value);
853 //UGLY: force a context switch to be sure that all MSG_processes begin initialization
854 XBT_DEBUG("Force context switch by smpi_execute_flops - Sleeping for 0.0 flops ");
855 smpi_execute_flops(0.0);
859 /** @brief actually run the replay after initialization */
860 void smpi_replay_main(int* argc, char*** argv)
862 simgrid::xbt::replay_runner(*argc, *argv);
864 /* and now, finalize everything */
865 /* One active process will stop. Decrease the counter*/
866 XBT_DEBUG("There are %zu elements in reqq[*]", get_reqq_self()->size());
867 if (not get_reqq_self()->empty()) {
868 unsigned int count_requests=get_reqq_self()->size();
869 MPI_Request requests[count_requests];
870 MPI_Status status[count_requests];
873 for (auto const& req : *get_reqq_self()) {
877 simgrid::smpi::Request::waitall(count_requests, requests, status);
879 delete get_reqq_self();
882 if(active_processes==0){
883 /* Last process alive speaking: end the simulated timer */
884 XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
885 smpi_free_replay_tmp_buffers();
888 TRACE_smpi_comm_in(Actor::self()->getPid(), "smpi_replay_run_finalize", new simgrid::instr::NoOpTIData("finalize"));
890 smpi_process()->finalize();
892 TRACE_smpi_comm_out(Actor::self()->getPid());
893 TRACE_smpi_finalize(Actor::self()->getPid());
896 /** @brief chain a replay initialization and a replay start */
897 void smpi_replay_run(int* argc, char*** argv)
899 smpi_replay_init(argc, argv);
900 smpi_replay_main(argc, argv);