1 /* Copyright (c) 2009-2017. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
7 #include "smpi_coll.hpp"
8 #include "smpi_comm.hpp"
9 #include "smpi_datatype.hpp"
10 #include "smpi_group.hpp"
11 #include "smpi_process.hpp"
12 #include "smpi_request.hpp"
13 #include "xbt/replay.hpp"
15 #include <boost/algorithm/string/join.hpp>
18 #include <unordered_map>
21 using simgrid::s4u::Actor;
XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");

// Communicator size as announced by the trace's "comm_size" action.
static int communicator_size = 0;
// Number of replaying processes still alive; the last one to finish prints the
// simulated time (see smpi_replay_main).
static int active_processes = 0;
// Per-actor (keyed by pid) list of outstanding asynchronous requests, filled by
// Isend/Irecv and consumed by wait/waitAll/test.
static std::unordered_map<int, std::vector<MPI_Request>*> reqq;

// Datatype used when an action line does not specify one. Set by action_init:
// MPI_DOUBLE for MPE-flavored traces, MPI_BYTE for TAU-flavored ones.
static MPI_Datatype MPI_DEFAULT_TYPE;
// Validate the token count of a replay line: tokens 0 and 1 are always
// <process_id> <action>, followed by `mandatory` required arguments and up to
// `optional` extra ones. Throws arg_error when too few tokens are present.
// NOTE(review): the enclosing braces of the macro body are not visible in this
// excerpt — confirm it is wrapped (e.g. do{...}while(0)) in the full file.
#define CHECK_ACTION_PARAMS(action, mandatory, optional) \
  if (action.size() < static_cast<unsigned long>(mandatory + 2)) \
    THROWF(arg_error, 0, "%s replay failed.\n" \
           "%lu items were given on the line. First two should be process_id and action. " \
           "This action needs after them %lu mandatory arguments, and accepts %lu optional ones. \n" \
           "Please contact the Simgrid team if support is needed", \
           __FUNCTION__, action.size(), static_cast<unsigned long>(mandatory), \
           static_cast<unsigned long>(optional)); \
// Log the replayed action at verbose level, together with its simulated
// duration (current simulated clock minus the start clock `clock`).
static void log_timed_action(simgrid::xbt::ReplayAction& action, double clock)
  if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
    // Rebuild the original trace line for human-readable output.
    std::string s = boost::algorithm::join(action, " ");
    XBT_VERB("%s %f", s.c_str(), smpi_process()->simulated_elapsed() - clock);
50 static std::vector<MPI_Request>* get_reqq_self()
52 return reqq.at(Actor::self()->getPid());
55 static void set_reqq_self(std::vector<MPI_Request> *mpi_request)
57 reqq.insert({Actor::self()->getPid(), mpi_request});
61 static double parse_double(std::string string)
63 return xbt_str_parse_double(string.c_str(), "%s is not a double");
// Base class for per-action argument parsers. Subclasses override parse() to
// pull their arguments out of the tokenized replay line; the default parses
// nothing (actions without arguments).
// NOTE(review): parse() is virtual, so the class is used polymorphically —
// confirm a virtual destructor exists in the part of the class not shown here.
class ActionArgParser {
  virtual void parse(simgrid::xbt::ReplayAction& action){};
// Argument parser shared by the point-to-point actions (send/Isend/recv/Irecv):
//   action[2] = peer rank, action[3] = message size,
//   action[4] (optional) = datatype id, decoded via Datatype::decode().
// NOTE(review): the `partner` and `size` member declarations are on lines not
// visible in this excerpt.
class SendRecvParser : public ActionArgParser {
  MPI_Datatype datatype1 = MPI_DEFAULT_TYPE; // falls back to the trace-wide default

  void parse(simgrid::xbt::ReplayAction& action) override
    CHECK_ACTION_PARAMS(action, 2, 1)
    partner = std::stoi(action[2]);
    size = parse_double(action[3]);
    if (action.size() > 4)
      datatype1 = simgrid::smpi::Datatype::decode(action[4]);
// Generic replay action (template method pattern): execute() records the
// simulated start time, then delegates to the subclass kernel(), and logs the
// action with its simulated duration. T is the argument-parser type.
// NOTE(review): the `start_time`, `my_proc_id` and parser member declarations
// are on lines not visible in this excerpt.
template <class T> class ReplayAction {
  const std::string name; // action name as spelled in the trace (e.g. "Isend")

   * Used to compute the duration of this action.

  explicit ReplayAction(std::string name) : name(name), start_time(0), my_proc_id(simgrid::s4u::Actor::self()->getPid())

  virtual void execute(simgrid::xbt::ReplayAction& action)
    // Needs to be re-initialized for every action, hence here
    start_time = smpi_process()->simulated_elapsed();
    log_timed_action(action, start_time);

  // Concrete behavior of the action, provided by each subclass.
  virtual void kernel(simgrid::xbt::ReplayAction& action) = 0;
// Replays an MPI_Wait on the most recently posted asynchronous request of the
// calling process (LIFO: matches the latest Isend/Irecv).
class WaitAction : public ReplayAction<ActionArgParser> {
  WaitAction() : ReplayAction("Wait") {}
  void kernel(simgrid::xbt::ReplayAction& action) override
    CHECK_ACTION_PARAMS(action, 0, 0)
    std::string s = boost::algorithm::join(action, " ");
    xbt_assert(get_reqq_self()->size(), "action wait not preceded by any irecv or isend: %s", s.c_str());
    MPI_Request request = get_reqq_self()->back();
    get_reqq_self()->pop_back();

    if (request == nullptr) {
      /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just
      // Capture tracing info *before* the wait completes/destroys the request.
      int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
      MPI_Group group = request->comm()->group();
      int src_traced = group->rank(request->src());
      int dst_traced = group->rank(request->dst());
      bool is_wait_for_receive = (request->flags() & RECV);
      // TODO: Here we take the rank while we normally take the process id (look for my_proc_id)
      TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::NoOpTIData("wait"));
      Request::wait(&request, &status);
      TRACE_smpi_comm_out(rank);
      // Only receives produce a matching recv event in the trace output.
      if (is_wait_for_receive)
        TRACE_smpi_recv(src_traced, dst_traced, 0);
// Replays blocking ("send") and non-blocking ("Isend") sends. The payload
// pointer is nullptr: replay simulates timing only, no real data moves.
class SendAction : public ReplayAction<SendRecvParser> {
  SendAction() = delete;
  SendAction(std::string name) : ReplayAction(name) {}
  void kernel(simgrid::xbt::ReplayAction& action) override
    // Translate the peer's MPI_COMM_WORLD rank into an actor pid for tracing.
    int dst_traced = MPI_COMM_WORLD->group()->actor(args.partner)->getPid();

    TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
                                                                                 Datatype::encode(args.datatype1)));
    if (not TRACE_smpi_view_internals())
      TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, args.size * args.datatype1->size());

    // Dispatch on the action name this instance was registered under.
    if (name == "send") {
      Request::send(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
    } else if (name == "Isend") {
      MPI_Request request = Request::isend(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
      // Keep the request so a later wait/waitAll/test can retire it.
      get_reqq_self()->push_back(request);
      xbt_die("Don't know this action, %s", name.c_str());

    TRACE_smpi_comm_out(my_proc_id);
// Replays blocking ("recv") and non-blocking ("Irecv") receives. As with
// SendAction, the payload pointer is nullptr (timing-only simulation).
class RecvAction : public ReplayAction<SendRecvParser> {
  RecvAction() = delete;
  explicit RecvAction(std::string name) : ReplayAction(name) {}
  void kernel(simgrid::xbt::ReplayAction& action) override
    int src_traced = MPI_COMM_WORLD->group()->actor(args.partner)->getPid();

    TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
                                                                                 Datatype::encode(args.datatype1)));

    // unknown size from the receiver point of view
    if (args.size <= 0.0) {
      // Probe the matching send to learn the actual message size.
      Request::probe(args.partner, 0, MPI_COMM_WORLD, &status);
      args.size = status.count;

    if (name == "recv") {
      Request::recv(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD, &status);
    } else if (name == "Irecv") {
      MPI_Request request = Request::irecv(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
      get_reqq_self()->push_back(request);

    TRACE_smpi_comm_out(my_proc_id);
    // TODO: Check why this was only activated in the "recv" case and not in the "Irecv" case
    if (name == "recv" && not TRACE_smpi_view_internals()) {
      TRACE_smpi_recv(src_traced, my_proc_id, 0);
215 } // Replay Namespace
// "init" action: set up the per-process replay state. An optional extra token
// selects the MPE trace flavor (default datatype MPI_DOUBLE); otherwise the
// TAU flavor (MPI_BYTE) is assumed.
static void action_init(simgrid::xbt::ReplayAction& action)
  XBT_DEBUG("Initialize the counters");
  CHECK_ACTION_PARAMS(action, 0, 1)
  if (action.size() > 2)
    MPI_DEFAULT_TYPE = MPI_DOUBLE; // default MPE datatype
    MPI_DEFAULT_TYPE = MPI_BYTE; // default TAU datatype (NOTE: the `else` keyword sits on a line not shown in this excerpt)

  /* start a simulated timer */
  smpi_process()->simulated_start();
  /*initialize the number of active processes */
  active_processes = smpi_process_count();

  // Create the (initially empty) pending-request list for this actor.
  set_reqq_self(new std::vector<MPI_Request>);
// "finalize" action: intentionally empty — the real cleanup (draining pending
// requests, closing the trace) happens in smpi_replay_main after the runner.
static void action_finalize(simgrid::xbt::ReplayAction& action)
239 static void action_comm_size(simgrid::xbt::ReplayAction& action)
241 communicator_size = parse_double(action[2]);
242 log_timed_action (action, smpi_process()->simulated_elapsed());
245 static void action_comm_split(simgrid::xbt::ReplayAction& action)
247 log_timed_action (action, smpi_process()->simulated_elapsed());
250 static void action_comm_dup(simgrid::xbt::ReplayAction& action)
252 log_timed_action (action, smpi_process()->simulated_elapsed());
255 static void action_compute(simgrid::xbt::ReplayAction& action)
257 CHECK_ACTION_PARAMS(action, 1, 0)
258 double clock = smpi_process()->simulated_elapsed();
259 double flops= parse_double(action[2]);
260 int my_proc_id = Actor::self()->getPid();
262 TRACE_smpi_computing_in(my_proc_id, flops);
263 smpi_execute_flops(flops);
264 TRACE_smpi_computing_out(my_proc_id);
266 log_timed_action (action, clock);
// "test" action: MPI_Test on the most recently posted request. The request is
// pushed back afterwards (as nullptr when the test succeeded) so a subsequent
// wait can still pop a matching entry.
static void action_test(simgrid::xbt::ReplayAction& action)
  CHECK_ACTION_PARAMS(action, 0, 0)
  double clock = smpi_process()->simulated_elapsed();

  MPI_Request request = get_reqq_self()->back();
  get_reqq_self()->pop_back();
  //if request is null here, this may mean that a previous test has succeeded
  //Different times in traced application and replayed version may lead to this
  //In this case, ignore the extra calls.
  if(request!=nullptr){
    int my_proc_id = Actor::self()->getPid();
    TRACE_smpi_testing_in(my_proc_id);

    int flag = Request::test(&request, &status); // NOTE(review): `status` is declared on a line not shown in this excerpt

    XBT_DEBUG("MPI_Test result: %d", flag);
    /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now nullptr.*/
    get_reqq_self()->push_back(request);

    TRACE_smpi_testing_out(my_proc_id);

  log_timed_action (action, clock);
// "waitAll" action: wait for every request currently pending for this actor.
// Source/destination pairs of the receive requests are recorded beforehand so
// the matching recv events can be traced after the waitall completes.
static void action_waitall(simgrid::xbt::ReplayAction& action)
  CHECK_ACTION_PARAMS(action, 0, 0)
  double clock = smpi_process()->simulated_elapsed();
  const unsigned int count_requests = get_reqq_self()->size();

  if (count_requests>0) {
    // NOTE(review): variable-length arrays are a compiler extension, not
    // standard C++ — consider std::vector here.
    MPI_Status status[count_requests];

    int my_proc_id_traced = Actor::self()->getPid();
    TRACE_smpi_comm_in(my_proc_id_traced, __FUNCTION__,
                       new simgrid::instr::Pt2PtTIData("waitAll", -1, count_requests, ""));
    int recvs_snd[count_requests];
    int recvs_rcv[count_requests];
    for (unsigned int i = 0; i < count_requests; i++) {
      const auto& req = (*get_reqq_self())[i];
      // Remember endpoints of receive requests; non-receives get the -100
      // sentinel (assigned in an else branch not shown in this excerpt).
      if (req && (req->flags() & RECV)) {
        recvs_snd[i] = req->src();
        recvs_rcv[i] = req->dst();

    Request::waitall(count_requests, &(*get_reqq_self())[0], status);

    // Emit the recv trace events for every completed receive.
    for (unsigned i = 0; i < count_requests; i++) {
      if (recvs_snd[i]!=-100)
        TRACE_smpi_recv(recvs_snd[i], recvs_rcv[i],0);

    TRACE_smpi_comm_out(my_proc_id_traced);

  log_timed_action (action, clock);
328 static void action_barrier(simgrid::xbt::ReplayAction& action)
330 double clock = smpi_process()->simulated_elapsed();
331 int my_proc_id = Actor::self()->getPid();
332 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::NoOpTIData("barrier"));
334 Colls::barrier(MPI_COMM_WORLD);
336 TRACE_smpi_comm_out(my_proc_id);
337 log_timed_action (action, clock);
340 static void action_bcast(simgrid::xbt::ReplayAction& action)
342 CHECK_ACTION_PARAMS(action, 1, 2)
343 double size = parse_double(action[2]);
344 double clock = smpi_process()->simulated_elapsed();
345 int root = (action.size() > 3) ? std::stoi(action[3]) : 0;
346 /* Initialize MPI_CURRENT_TYPE in order to decrease the number of the checks */
347 MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 4) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE;
349 int my_proc_id = Actor::self()->getPid();
350 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
351 new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(root)->getPid(), -1.0, size,
352 -1, Datatype::encode(MPI_CURRENT_TYPE), ""));
354 void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
356 Colls::bcast(sendbuf, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
358 TRACE_smpi_comm_out(my_proc_id);
359 log_timed_action (action, clock);
362 static void action_reduce(simgrid::xbt::ReplayAction& action)
364 CHECK_ACTION_PARAMS(action, 2, 2)
365 double comm_size = parse_double(action[2]);
366 double comp_size = parse_double(action[3]);
367 double clock = smpi_process()->simulated_elapsed();
368 int root = (action.size() > 4) ? std::stoi(action[4]) : 0;
370 MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 5) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE;
372 int my_proc_id = Actor::self()->getPid();
373 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
374 new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(root)->getPid(), comp_size,
375 comm_size, -1, Datatype::encode(MPI_CURRENT_TYPE), ""));
377 void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
378 void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
379 Colls::reduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
380 smpi_execute_flops(comp_size);
382 TRACE_smpi_comm_out(my_proc_id);
383 log_timed_action (action, clock);
386 static void action_allReduce(simgrid::xbt::ReplayAction& action)
388 CHECK_ACTION_PARAMS(action, 2, 1)
389 double comm_size = parse_double(action[2]);
390 double comp_size = parse_double(action[3]);
392 MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 4) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE;
394 double clock = smpi_process()->simulated_elapsed();
395 int my_proc_id = Actor::self()->getPid();
396 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::CollTIData("allReduce", -1, comp_size, comm_size, -1,
397 Datatype::encode(MPI_CURRENT_TYPE), ""));
399 void *recvbuf = smpi_get_tmp_recvbuffer(comm_size* MPI_CURRENT_TYPE->size());
400 void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
401 Colls::allreduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
402 smpi_execute_flops(comp_size);
404 TRACE_smpi_comm_out(my_proc_id);
405 log_timed_action (action, clock);
// "allToAll" action: <send_size> <recv_size> [sendtype recvtype]. Replays an
// all-to-all exchange over MPI_COMM_WORLD with per-peer counts send_size and
// recv_size.
static void action_allToAll(simgrid::xbt::ReplayAction& action)
  CHECK_ACTION_PARAMS(action, 2, 2) //two mandatory (send and recv volumes) and two optional (corresponding datatypes)
  double clock = smpi_process()->simulated_elapsed();
  unsigned long comm_size = MPI_COMM_WORLD->size();
  int send_size = parse_double(action[2]);
  int recv_size = parse_double(action[3]);
  // Both guards test `> 5`: the optional datatypes are only honored when BOTH
  // are present on the line (a lone action[4] is ignored).
  MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 5) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE};
  MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE};

  // Scratch buffers sized for the full exchange (count * comm_size elements).
  void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* MPI_CURRENT_TYPE->size());
  void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());

  int my_proc_id = Actor::self()->getPid();
  TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
                     new simgrid::instr::CollTIData("allToAll", -1, -1.0, send_size, recv_size,
                                                    Datatype::encode(MPI_CURRENT_TYPE),
                                                    Datatype::encode(MPI_CURRENT_TYPE2)));

  Colls::alltoall(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);

  TRACE_smpi_comm_out(my_proc_id);
  log_timed_action (action, clock);
// "gather" action: <sendcount> <recvcount> [root] [sendtype recvtype].
// Replays a gather to `root` (default 0) over MPI_COMM_WORLD.
static void action_gather(simgrid::xbt::ReplayAction& action)
  /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
  1) 68 is the sendcounts
  2) 68 is the recvcounts
  3) 0 is the root node
  4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
  5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
  CHECK_ACTION_PARAMS(action, 2, 3)
  double clock = smpi_process()->simulated_elapsed();
  unsigned long comm_size = MPI_COMM_WORLD->size();
  int send_size = parse_double(action[2]);
  int recv_size = parse_double(action[3]);
  // Both guards test `> 6`: the optional datatypes are only honored when both
  // action[5] and action[6] are present.
  MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE};
  MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[6]) : MPI_DEFAULT_TYPE};

  void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
  void *recv = nullptr;
  int root = (action.size() > 4) ? std::stoi(action[4]) : 0;
  int rank = MPI_COMM_WORLD->rank();

  // Only the root needs a receive buffer (allocation guarded by a rank==root
  // test on a line not shown in this excerpt).
  recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());

  TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
                                                                        Datatype::encode(MPI_CURRENT_TYPE),
                                                                        Datatype::encode(MPI_CURRENT_TYPE2)));

  Colls::gather(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);

  TRACE_smpi_comm_out(Actor::self()->getPid());
  log_timed_action (action, clock);
// "scatter" action: <sendcount> <recvcount> [root] [sendtype recvtype].
// Replays a scatter from `root` (default 0) over MPI_COMM_WORLD.
static void action_scatter(simgrid::xbt::ReplayAction& action)
  /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
  1) 68 is the sendcounts
  2) 68 is the recvcounts
  3) 0 is the root node
  4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
  5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
  CHECK_ACTION_PARAMS(action, 2, 3)
  double clock = smpi_process()->simulated_elapsed();
  unsigned long comm_size = MPI_COMM_WORLD->size();
  int send_size = parse_double(action[2]);
  int recv_size = parse_double(action[3]);
  MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE};
  MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[6]) : MPI_DEFAULT_TYPE};

  void* send = smpi_get_tmp_sendbuffer(send_size * MPI_CURRENT_TYPE->size());
  void* recv = nullptr;
  int root = (action.size() > 4) ? std::stoi(action[4]) : 0;
  int rank = MPI_COMM_WORLD->rank();

  // Receive-buffer allocation (guarded by a rank test on a line not shown here).
  recv = smpi_get_tmp_recvbuffer(recv_size * comm_size * MPI_CURRENT_TYPE2->size());

  // NOTE(review): the trace event is labeled "gather" — this looks like a
  // copy-paste from action_gather; confirm whether it should say "scatter".
  TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
                                                                        Datatype::encode(MPI_CURRENT_TYPE),
                                                                        Datatype::encode(MPI_CURRENT_TYPE2)));

  Colls::scatter(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);

  TRACE_smpi_comm_out(Actor::self()->getPid());
  log_timed_action(action, clock);
// "gatherV" action: <sendcount> <recvcount_0..recvcount_{n-1}> [root]
// [sendtype recvtype]. Replays a variable-count gather to `root` (default 0).
static void action_gatherv(simgrid::xbt::ReplayAction& action)
  /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
  0 gather 68 68 10 10 10 0 0 0
  1) 68 is the sendcount
  2) 68 10 10 10 is the recvcounts
  3) 0 is the root node
  4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
  5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
  double clock = smpi_process()->simulated_elapsed();
  unsigned long comm_size = MPI_COMM_WORLD->size();
  CHECK_ACTION_PARAMS(action, comm_size+1, 2)
  int send_size = parse_double(action[2]);
  std::vector<int> disps(comm_size, 0); // displacements stay 0: contiguous packing
  std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));

  // Datatypes are only honored when both optional tokens are present.
  MPI_Datatype MPI_CURRENT_TYPE =
      (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
  MPI_Datatype MPI_CURRENT_TYPE2{
      (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[5 + comm_size]) : MPI_DEFAULT_TYPE};

  void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
  void *recv = nullptr;
  // Per-rank receive counts start at token 3.
  for (unsigned int i = 0; i < comm_size; i++) {
    (*recvcounts)[i] = std::stoi(action[i + 3]);
  int recv_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);

  int root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
  int rank = MPI_COMM_WORLD->rank();

  // Receive-buffer allocation (guarded by a rank==root test not shown here).
  recv = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());

  TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData(
                                             "gatherV", root, send_size, nullptr, -1, recvcounts,
                                             Datatype::encode(MPI_CURRENT_TYPE), Datatype::encode(MPI_CURRENT_TYPE2)));

  Colls::gatherv(send, send_size, MPI_CURRENT_TYPE, recv, recvcounts->data(), disps.data(), MPI_CURRENT_TYPE2, root,

  TRACE_smpi_comm_out(Actor::self()->getPid());
  log_timed_action (action, clock);
// "scatterV" action: <sendcount_0..sendcount_{n-1}> <recvcount> [root]
// [sendtype recvtype]. Replays a variable-count scatter from `root`.
static void action_scatterv(simgrid::xbt::ReplayAction& action)
  /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
  0 gather 68 10 10 10 68 0 0 0
  1) 68 10 10 10 is the sendcounts
  2) 68 is the recvcount
  3) 0 is the root node
  4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
  5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
  double clock = smpi_process()->simulated_elapsed();
  unsigned long comm_size = MPI_COMM_WORLD->size();
  CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
  int recv_size = parse_double(action[2 + comm_size]);
  std::vector<int> disps(comm_size, 0); // displacements stay 0: contiguous packing
  std::shared_ptr<std::vector<int>> sendcounts(new std::vector<int>(comm_size));

  MPI_Datatype MPI_CURRENT_TYPE =
      (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
  MPI_Datatype MPI_CURRENT_TYPE2{
      (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[5 + comm_size]) : MPI_DEFAULT_TYPE};

  void* send = nullptr;
  // NOTE(review): the recv buffer is sized with the *send* datatype
  // (MPI_CURRENT_TYPE) — confirm whether MPI_CURRENT_TYPE2 was intended.
  void* recv = smpi_get_tmp_recvbuffer(recv_size * MPI_CURRENT_TYPE->size());
  // Per-rank send counts start at token 2.
  for (unsigned int i = 0; i < comm_size; i++) {
    (*sendcounts)[i] = std::stoi(action[i + 2]);
  int send_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);

  int root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
  int rank = MPI_COMM_WORLD->rank();

  // Send-buffer allocation (guarded by a rank==root test not shown here).
  send = smpi_get_tmp_sendbuffer(send_sum * MPI_CURRENT_TYPE2->size());

  // NOTE(review): the trace event is labeled "gatherV" — looks like a
  // copy-paste from action_gatherv; confirm whether it should say "scatterV".
  TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData("gatherV", root, -1, sendcounts, recv_size,
                                                                           nullptr, Datatype::encode(MPI_CURRENT_TYPE),
                                                                           Datatype::encode(MPI_CURRENT_TYPE2)));

  Colls::scatterv(send, sendcounts->data(), disps.data(), MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root,

  TRACE_smpi_comm_out(Actor::self()->getPid());
  log_timed_action(action, clock);
601 static void action_reducescatter(simgrid::xbt::ReplayAction& action)
603 /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
604 0 reduceScatter 275427 275427 275427 204020 11346849 0
606 1) The first four values after the name of the action declare the recvcounts array
607 2) The value 11346849 is the amount of instructions
608 3) The last value corresponds to the datatype, see simgrid::smpi::Datatype::decode().
610 double clock = smpi_process()->simulated_elapsed();
611 unsigned long comm_size = MPI_COMM_WORLD->size();
612 CHECK_ACTION_PARAMS(action, comm_size+1, 1)
613 int comp_size = parse_double(action[2+comm_size]);
614 int my_proc_id = Actor::self()->getPid();
615 std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>);
616 MPI_Datatype MPI_CURRENT_TYPE =
617 (action.size() > 3 + comm_size) ? simgrid::smpi::Datatype::decode(action[3 + comm_size]) : MPI_DEFAULT_TYPE;
619 for (unsigned int i = 0; i < comm_size; i++) {
620 recvcounts->push_back(std::stoi(action[i + 2]));
622 int size{std::accumulate(recvcounts->begin(), recvcounts->end(), 0)};
624 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
625 new simgrid::instr::VarCollTIData("reduceScatter", -1, 0, nullptr, -1, recvcounts,
626 std::to_string(comp_size), /* ugly hack to print comp_size */
627 Datatype::encode(MPI_CURRENT_TYPE)));
629 void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
630 void *recvbuf = smpi_get_tmp_recvbuffer(size* MPI_CURRENT_TYPE->size());
632 Colls::reduce_scatter(sendbuf, recvbuf, recvcounts->data(), MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
633 smpi_execute_flops(comp_size);
635 TRACE_smpi_comm_out(my_proc_id);
636 log_timed_action (action, clock);
// "allGather" action: <sendcount> <recvcount> [sendtype recvtype].
static void action_allgather(simgrid::xbt::ReplayAction& action)
  /* The structure of the allgather action for the rank 0 (total 4 processes) is the following:
  0 allGather 275427 275427
  1) 275427 is the sendcount
  2) 275427 is the recvcount
  3) No more values mean that the datatype for sent and receive buffer is the default one, see
  simgrid::smpi::Datatype::decode().
  double clock = smpi_process()->simulated_elapsed();

  CHECK_ACTION_PARAMS(action, 2, 2)
  int sendcount = std::stoi(action[2]);
  int recvcount = std::stoi(action[3]);

  // Both guards test `> 5`: the optional datatypes are only honored when both
  // action[4] and action[5] are present on the line.
  MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 5) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE};
  MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE};

  void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
  void *recvbuf = smpi_get_tmp_recvbuffer(recvcount* MPI_CURRENT_TYPE2->size());

  int my_proc_id = Actor::self()->getPid();

  TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
                     new simgrid::instr::CollTIData("allGather", -1, -1.0, sendcount, recvcount,
                                                    Datatype::encode(MPI_CURRENT_TYPE),
                                                    Datatype::encode(MPI_CURRENT_TYPE2)));

  Colls::allgather(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcount, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);

  TRACE_smpi_comm_out(my_proc_id);
  log_timed_action (action, clock);
674 static void action_allgatherv(simgrid::xbt::ReplayAction& action)
676 /* The structure of the allgatherv action for the rank 0 (total 4 processes) is the following:
677 0 allGatherV 275427 275427 275427 275427 204020
679 1) 275427 is the sendcount
680 2) The next four elements declare the recvcounts array
681 3) No more values mean that the datatype for sent and receive buffer is the default one, see
682 simgrid::smpi::Datatype::decode().
684 double clock = smpi_process()->simulated_elapsed();
686 unsigned long comm_size = MPI_COMM_WORLD->size();
687 CHECK_ACTION_PARAMS(action, comm_size+1, 2)
688 int sendcount = std::stoi(action[2]);
689 std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
690 std::vector<int> disps(comm_size, 0);
692 int datatype_index = 0, disp_index = 0;
693 if (action.size() > 3 + 2 * comm_size) { /* datatype + disp are specified */
694 datatype_index = 3 + comm_size;
695 disp_index = datatype_index + 1;
696 } else if (action.size() > 3 + 2 * comm_size) { /* disps specified; datatype is not specified; use the default one */
698 disp_index = 3 + comm_size;
699 } else if (action.size() > 3 + comm_size) { /* only datatype, no disp specified */
700 datatype_index = 3 + comm_size;
703 if (disp_index != 0) {
704 for (unsigned int i = 0; i < comm_size; i++)
705 disps[i] = std::stoi(action[disp_index + i]);
708 MPI_Datatype MPI_CURRENT_TYPE{(datatype_index > 0) ? simgrid::smpi::Datatype::decode(action[datatype_index])
710 MPI_Datatype MPI_CURRENT_TYPE2{(datatype_index > 0) ? simgrid::smpi::Datatype::decode(action[datatype_index])
713 void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
715 for (unsigned int i = 0; i < comm_size; i++) {
716 (*recvcounts)[i] = std::stoi(action[i + 3]);
718 int recv_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
719 void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
721 int my_proc_id = Actor::self()->getPid();
723 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
724 new simgrid::instr::VarCollTIData("allGatherV", -1, sendcount, nullptr, -1, recvcounts,
725 Datatype::encode(MPI_CURRENT_TYPE),
726 Datatype::encode(MPI_CURRENT_TYPE2)));
728 Colls::allgatherv(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts->data(), disps.data(), MPI_CURRENT_TYPE2,
731 TRACE_smpi_comm_out(my_proc_id);
732 log_timed_action (action, clock);
735 static void action_allToAllv(simgrid::xbt::ReplayAction& action)
737 /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
738 0 allToAllV 100 1 7 10 12 100 1 70 10 5
740 1) 100 is the size of the send buffer *sizeof(int),
741 2) 1 7 10 12 is the sendcounts array
742 3) 100*sizeof(int) is the size of the receiver buffer
743 4) 1 70 10 5 is the recvcounts array
745 double clock = smpi_process()->simulated_elapsed();
747 unsigned long comm_size = MPI_COMM_WORLD->size();
748 CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
749 std::shared_ptr<std::vector<int>> sendcounts(new std::vector<int>(comm_size));
750 std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
751 std::vector<int> senddisps(comm_size, 0);
752 std::vector<int> recvdisps(comm_size, 0);
754 MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 5 + 2 * comm_size)
755 ? simgrid::smpi::Datatype::decode(action[4 + 2 * comm_size])
757 MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5 + 2 * comm_size)
758 ? simgrid::smpi::Datatype::decode(action[5 + 2 * comm_size])
761 int send_buf_size=parse_double(action[2]);
762 int recv_buf_size=parse_double(action[3+comm_size]);
763 int my_proc_id = Actor::self()->getPid();
764 void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* MPI_CURRENT_TYPE->size());
765 void *recvbuf = smpi_get_tmp_recvbuffer(recv_buf_size* MPI_CURRENT_TYPE2->size());
767 for (unsigned int i = 0; i < comm_size; i++) {
768 (*sendcounts)[i] = std::stoi(action[3 + i]);
769 (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
771 int send_size = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
772 int recv_size = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
774 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
775 new simgrid::instr::VarCollTIData("allToAllV", -1, send_size, sendcounts, recv_size, recvcounts,
776 Datatype::encode(MPI_CURRENT_TYPE),
777 Datatype::encode(MPI_CURRENT_TYPE2)));
779 Colls::alltoallv(sendbuf, sendcounts->data(), senddisps.data(), MPI_CURRENT_TYPE, recvbuf, recvcounts->data(),
780 recvdisps.data(), MPI_CURRENT_TYPE, MPI_COMM_WORLD);
782 TRACE_smpi_comm_out(my_proc_id);
783 log_timed_action (action, clock);
786 }} // namespace simgrid::smpi
/** @brief Only initialize the replay, don't do it for real */
void smpi_replay_init(int* argc, char*** argv)
  simgrid::smpi::Process::init(argc, argv);
  smpi_process()->mark_as_initialized();
  smpi_process()->set_replaying(true);

  int my_proc_id = Actor::self()->getPid();
  TRACE_smpi_init(my_proc_id);
  TRACE_smpi_computing_init(my_proc_id);
  // Emit a zero-duration init event so the trace starts with a known marker.
  TRACE_smpi_comm_in(my_proc_id, "smpi_replay_run_init", new simgrid::instr::NoOpTIData("init"));
  TRACE_smpi_comm_out(my_proc_id);
  // Register the handler for each action keyword appearing in trace files.
  xbt_replay_action_register("init", simgrid::smpi::action_init);
  xbt_replay_action_register("finalize", simgrid::smpi::action_finalize);
  xbt_replay_action_register("comm_size", simgrid::smpi::action_comm_size);
  xbt_replay_action_register("comm_split", simgrid::smpi::action_comm_split);
  xbt_replay_action_register("comm_dup", simgrid::smpi::action_comm_dup);

  // Point-to-point actions are class-based (ReplayAction subclasses); the
  // shared_ptr keeps each instance alive inside the registered std::bind.
  std::shared_ptr<simgrid::smpi::Replay::SendAction> isend(new simgrid::smpi::Replay::SendAction("Isend"));
  std::shared_ptr<simgrid::smpi::Replay::SendAction> send(new simgrid::smpi::Replay::SendAction("send"));
  std::shared_ptr<simgrid::smpi::Replay::RecvAction> irecv(new simgrid::smpi::Replay::RecvAction("Irecv"));
  std::shared_ptr<simgrid::smpi::Replay::RecvAction> recv(new simgrid::smpi::Replay::RecvAction("recv"));
  std::shared_ptr<simgrid::smpi::Replay::WaitAction> wait(new simgrid::smpi::Replay::WaitAction());

  xbt_replay_action_register("send",
                             std::bind(&simgrid::smpi::Replay::SendAction::execute, send, std::placeholders::_1));
  xbt_replay_action_register("Isend",
                             std::bind(&simgrid::smpi::Replay::SendAction::execute, isend, std::placeholders::_1));
  xbt_replay_action_register("recv",
                             std::bind(&simgrid::smpi::Replay::RecvAction::execute, recv, std::placeholders::_1));
  xbt_replay_action_register("Irecv",
                             std::bind(&simgrid::smpi::Replay::RecvAction::execute, irecv, std::placeholders::_1));
  xbt_replay_action_register("test", simgrid::smpi::action_test);
  xbt_replay_action_register("wait",
                             std::bind(&simgrid::smpi::Replay::WaitAction::execute, wait, std::placeholders::_1));
  xbt_replay_action_register("waitAll", simgrid::smpi::action_waitall);
  xbt_replay_action_register("barrier", simgrid::smpi::action_barrier);
  xbt_replay_action_register("bcast", simgrid::smpi::action_bcast);
  xbt_replay_action_register("reduce", simgrid::smpi::action_reduce);
  xbt_replay_action_register("allReduce", simgrid::smpi::action_allReduce);
  xbt_replay_action_register("allToAll", simgrid::smpi::action_allToAll);
  xbt_replay_action_register("allToAllV", simgrid::smpi::action_allToAllv);
  xbt_replay_action_register("gather", simgrid::smpi::action_gather);
  xbt_replay_action_register("scatter", simgrid::smpi::action_scatter);
  xbt_replay_action_register("gatherV", simgrid::smpi::action_gatherv);
  xbt_replay_action_register("scatterV", simgrid::smpi::action_scatterv);
  xbt_replay_action_register("allGather", simgrid::smpi::action_allgather);
  xbt_replay_action_register("allGatherV", simgrid::smpi::action_allgatherv);
  xbt_replay_action_register("reduceScatter", simgrid::smpi::action_reducescatter);
  xbt_replay_action_register("compute", simgrid::smpi::action_compute);

  //if we have a delayed start, sleep here. (NOTE: the guarding `if` on the
  //extra argv argument sits on a line not shown in this excerpt)
  double value = xbt_str_parse_double((*argv)[2], "%s is not a double");
  XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
  smpi_execute_flops(value);

  //UGLY: force a context switch to be sure that all MSG_processes begin initialization
  XBT_DEBUG("Force context switch by smpi_execute_flops - Sleeping for 0.0 flops ");
  smpi_execute_flops(0.0);
/** @brief actually run the replay after initialization */
void smpi_replay_main(int* argc, char*** argv)
  simgrid::xbt::replay_runner(*argc, *argv);

  /* and now, finalize everything */
  /* One active process will stop. Decrease the counter*/
  XBT_DEBUG("There are %zu elements in reqq[*]", get_reqq_self()->size());
  // Drain any requests the trace left un-waited before finalizing.
  if (not get_reqq_self()->empty()) {
    unsigned int count_requests=get_reqq_self()->size();
    // NOTE(review): VLAs are a compiler extension, not standard C++.
    MPI_Request requests[count_requests];
    MPI_Status status[count_requests];

    // Copy-loop body (filling `requests`) is on lines not shown in this excerpt.
    for (auto const& req : *get_reqq_self()) {
    simgrid::smpi::Request::waitall(count_requests, requests, status);

  // Free this actor's request list (NOTE(review): the reqq map entry itself is
  // not erased — see set_reqq_self for why a later re-init must overwrite it).
  delete get_reqq_self();

  if(active_processes==0){
    /* Last process alive speaking: end the simulated timer */
    XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
    smpi_free_replay_tmp_buffers();

  TRACE_smpi_comm_in(Actor::self()->getPid(), "smpi_replay_run_finalize", new simgrid::instr::NoOpTIData("finalize"));

  smpi_process()->finalize();

  TRACE_smpi_comm_out(Actor::self()->getPid());
  TRACE_smpi_finalize(Actor::self()->getPid());
/** @brief chain a replay initialization and a replay start */
void smpi_replay_run(int* argc, char*** argv)
  smpi_replay_init(argc, argv);
  smpi_replay_main(argc, argv);