1 /* Copyright (c) 2009-2021. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
6 #include "smpi_coll.hpp"
7 #include "smpi_comm.hpp"
8 #include "smpi_datatype.hpp"
9 #include "smpi_group.hpp"
10 #include "smpi_request.hpp"
11 #include "xbt/replay.hpp"
12 #include <simgrid/smpi/smpi_replay.hpp>
13 #include <src/smpi/include/private.hpp>
20 #include <unordered_map>
23 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay, smpi, "Trace Replay with SMPI");
25 // From https://stackoverflow.com/questions/7110301/generic-hash-for-tuples-in-unordered-map-unordered-set
26 // This is all just to make std::unordered_map work with std::tuple. If we need this in other places,
27 // this could go into a header file.
namespace hash_tuple {
/** Fallback hasher: any type that is not a std::tuple is simply forwarded to std::hash. */
template <typename TT> struct hash {
  size_t operator()(TT const& value) const
  {
    std::hash<TT> delegate;
    return delegate(value);
  }
};

/** Fold the hash of @p v into @p seed (boost-style hash_combine mixing). */
template <class T> inline void hash_combine(std::size_t& seed, T const& v)
{
  const std::size_t mixed = hash_tuple::hash<T>()(v) + 0x9e3779b9 + (seed << 6) + (seed >> 2);
  seed ^= mixed;
}

// Recursive template code derived from Matthieu M.
/** Combines the elements 0..Index of a tuple into the seed, lowest index first. */
template <class Tuple, size_t Index = std::tuple_size<Tuple>::value - 1> struct HashValueImpl {
  static void apply(size_t& seed, Tuple const& tuple)
  {
    using LowerElements = HashValueImpl<Tuple, Index - 1>;
    LowerElements::apply(seed, tuple);
    hash_combine(seed, std::get<Index>(tuple));
  }
};

/** End of the recursion: only the first element of the tuple is left to combine. */
template <class Tuple> struct HashValueImpl<Tuple, 0> {
  static void apply(size_t& seed, Tuple const& tuple)
  {
    hash_combine(seed, std::get<0>(tuple));
  }
};

/** Hasher for std::tuple: starts from a zero seed and combines every element in order. */
template <typename... TT> struct hash<std::tuple<TT...>> {
  size_t operator()(std::tuple<TT...> const& tt) const
  {
    size_t seed = 0;
    HashValueImpl<std::tuple<TT...>>::apply(seed, tt);
    return seed;
  }
};
} // namespace hash_tuple
65 void log_timed_action(const simgrid::xbt::ReplayAction& action, double clock)
67 if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
68 std::string s = boost::algorithm::join(action, " ");
69 XBT_VERB("%s %f", s.c_str(), smpi_process()->simulated_elapsed() - clock);
73 /* Helper functions */
74 static double parse_double(const std::string& string)
76 return xbt_str_parse_double(string.c_str(), "not a double");
79 template <typename T> static T parse_integer(const std::string& string)
81 double val = trunc(xbt_str_parse_double(string.c_str(), "not a double"));
82 xbt_assert(static_cast<double>(std::numeric_limits<T>::min()) <= val &&
83 val <= static_cast<double>(std::numeric_limits<T>::max()),
84 "out of range: %g", val);
85 return static_cast<T>(val);
88 static int parse_root(const simgrid::xbt::ReplayAction& action, unsigned i)
90 return i < action.size() ? std::stoi(action[i]) : 0;
93 static MPI_Datatype parse_datatype(const simgrid::xbt::ReplayAction& action, unsigned i)
95 return i < action.size() ? simgrid::smpi::Datatype::decode(action[i]) : simgrid::smpi::replay::MPI_DEFAULT_TYPE;
/* Datatype used by parse_datatype() when a trace line does not specify one.
 * It is assigned by InitAction::kernel() (MPI_DOUBLE or MPI_BYTE depending on the "init" line). */
MPI_Datatype MPI_DEFAULT_TYPE;
104 class RequestStorage {
106 using req_key_t = std::tuple</*sender*/ int, /* receiver */ int, /* tag */ int>;
107 using req_storage_t = std::unordered_map<req_key_t, MPI_Request, hash_tuple::hash<std::tuple<int, int, int>>>;
112 RequestStorage() = default;
113 size_t size() const { return store.size(); }
115 req_storage_t& get_store() { return store; }
117 void get_requests(std::vector<MPI_Request>& vec) const
119 for (auto const& pair : store) {
120 auto& req = pair.second;
121 aid_t my_proc_id = simgrid::s4u::this_actor::get_pid();
122 if (req != MPI_REQUEST_NULL && (req->src() == my_proc_id || req->dst() == my_proc_id)) {
123 vec.push_back(pair.second);
124 pair.second->print_request("MM");
129 MPI_Request find(int src, int dst, int tag)
131 auto it = store.find(req_key_t(src, dst, tag));
132 return (it == store.end()) ? MPI_REQUEST_NULL : it->second;
135 void remove(const Request* req)
137 if (req == MPI_REQUEST_NULL) return;
139 store.erase(req_key_t(req->src()-1, req->dst()-1, req->tag()));
142 void add(MPI_Request req)
144 if (req != MPI_REQUEST_NULL) // Can and does happen in the case of TestAction
145 store.insert({req_key_t(req->src()-1, req->dst()-1, req->tag()), req});
148 /* Sometimes we need to re-insert MPI_REQUEST_NULL but we still need src,dst and tag */
149 void addNullRequest(int src, int dst, int tag)
151 store.insert({req_key_t(MPI_COMM_WORLD->group()->actor(src) - 1, MPI_COMM_WORLD->group()->actor(dst) - 1, tag),
156 void WaitTestParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
158 CHECK_ACTION_PARAMS(action, 3, 0)
159 src = std::stoi(action[2]);
160 dst = std::stoi(action[3]);
161 tag = std::stoi(action[4]);
164 void SendRecvParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
166 CHECK_ACTION_PARAMS(action, 3, 1)
167 partner = std::stoi(action[2]);
168 tag = std::stoi(action[3]);
169 size = parse_integer<size_t>(action[4]);
170 datatype1 = parse_datatype(action, 5);
173 void ComputeParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
175 CHECK_ACTION_PARAMS(action, 1, 0)
176 flops = parse_double(action[2]);
179 void SleepParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
181 CHECK_ACTION_PARAMS(action, 1, 0)
182 time = parse_double(action[2]);
185 void LocationParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
187 CHECK_ACTION_PARAMS(action, 2, 0)
188 filename = std::string(action[2]);
189 line = std::stoi(action[3]);
192 void BcastArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
194 CHECK_ACTION_PARAMS(action, 1, 2)
195 size = parse_integer<size_t>(action[2]);
196 root = parse_root(action, 3);
197 datatype1 = parse_datatype(action, 4);
200 void ReduceArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
202 CHECK_ACTION_PARAMS(action, 2, 2)
203 comm_size = parse_integer<unsigned>(action[2]);
204 comp_size = parse_double(action[3]);
205 root = parse_root(action, 4);
206 datatype1 = parse_datatype(action, 5);
209 void AllReduceArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
211 CHECK_ACTION_PARAMS(action, 2, 1)
212 comm_size = parse_integer<unsigned>(action[2]);
213 comp_size = parse_double(action[3]);
214 datatype1 = parse_datatype(action, 4);
217 void AllToAllArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
219 CHECK_ACTION_PARAMS(action, 2, 1)
220 comm_size = MPI_COMM_WORLD->size();
221 send_size = parse_integer<int>(action[2]);
222 recv_size = parse_integer<int>(action[3]);
223 datatype1 = parse_datatype(action, 4);
224 datatype2 = parse_datatype(action, 5);
227 void GatherArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string& name)
229 /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
232 1) 68 is the sendcounts
233 2) 68 is the recvcounts
234 3) 0 is the root node
235 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
236 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
238 CHECK_ACTION_PARAMS(action, 2, 3)
239 comm_size = MPI_COMM_WORLD->size();
240 send_size = parse_integer<int>(action[2]);
241 recv_size = parse_integer<int>(action[3]);
243 if (name == "gather") {
244 root = parse_root(action, 4);
245 datatype1 = parse_datatype(action, 5);
246 datatype2 = parse_datatype(action, 6);
249 datatype1 = parse_datatype(action, 4);
250 datatype2 = parse_datatype(action, 5);
254 void GatherVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string& name)
256 /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
257 0 gather 68 68 10 10 10 0 0 0
259 1) 68 is the sendcount
260 2) 68 10 10 10 is the recvcounts
261 3) 0 is the root node
262 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
263 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
265 comm_size = MPI_COMM_WORLD->size();
266 CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
267 send_size = parse_integer<int>(action[2]);
268 disps = std::vector<int>(comm_size, 0);
269 recvcounts = std::make_shared<std::vector<int>>(comm_size);
271 if (name == "gatherv") {
272 root = parse_root(action, 3 + comm_size);
273 datatype1 = parse_datatype(action, 4 + comm_size);
274 datatype2 = parse_datatype(action, 5 + comm_size);
277 unsigned disp_index = 0;
278 /* The 3 comes from "0 gather <sendcount>", which must always be present.
279 * The + comm_size is the recvcounts array, which must also be present
281 if (action.size() > 3 + comm_size + comm_size) {
282 // datatype + disp are specified
283 datatype1 = parse_datatype(action, 3 + comm_size);
284 datatype2 = parse_datatype(action, 4 + comm_size);
285 disp_index = 5 + comm_size;
286 } else if (action.size() > 3 + comm_size + 2) {
287 // disps specified; datatype is not specified; use the default one
288 datatype1 = MPI_DEFAULT_TYPE;
289 datatype2 = MPI_DEFAULT_TYPE;
290 disp_index = 3 + comm_size;
292 // no disp specified, maybe only datatype,
293 datatype1 = parse_datatype(action, 3 + comm_size);
294 datatype2 = parse_datatype(action, 4 + comm_size);
297 if (disp_index != 0) {
298 xbt_assert(disp_index + comm_size <= action.size());
299 for (unsigned i = 0; i < comm_size; i++)
300 disps[i] = std::stoi(action[disp_index + i]);
304 for (unsigned int i = 0; i < comm_size; i++) {
305 (*recvcounts)[i] = std::stoi(action[i + 3]);
307 recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
310 void ScatterArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
312 /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
315 1) 68 is the sendcounts
316 2) 68 is the recvcounts
317 3) 0 is the root node
318 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
319 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
321 comm_size = MPI_COMM_WORLD->size();
322 CHECK_ACTION_PARAMS(action, 2, 3)
323 comm_size = MPI_COMM_WORLD->size();
324 send_size = parse_integer<int>(action[2]);
325 recv_size = parse_integer<int>(action[3]);
326 root = parse_root(action, 4);
327 datatype1 = parse_datatype(action, 5);
328 datatype2 = parse_datatype(action, 6);
331 void ScatterVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
333 /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
334 0 gather 68 10 10 10 68 0 0 0
336 1) 68 10 10 10 is the sendcounts
337 2) 68 is the recvcount
338 3) 0 is the root node
339 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
340 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
342 comm_size = MPI_COMM_WORLD->size();
343 CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
344 recv_size = parse_integer<int>(action[2 + comm_size]);
345 disps = std::vector<int>(comm_size, 0);
346 sendcounts = std::make_shared<std::vector<int>>(comm_size);
348 root = parse_root(action, 3 + comm_size);
349 datatype1 = parse_datatype(action, 4 + comm_size);
350 datatype2 = parse_datatype(action, 5 + comm_size);
352 for (unsigned int i = 0; i < comm_size; i++) {
353 (*sendcounts)[i] = std::stoi(action[i + 2]);
355 send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
358 void ReduceScatterArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
360 /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
361 0 reducescatter 275427 275427 275427 204020 11346849 0
363 1) The first four values after the name of the action declare the recvcounts array
364 2) The value 11346849 is the amount of instructions
365 3) The last value corresponds to the datatype, see simgrid::smpi::Datatype::decode().
367 comm_size = MPI_COMM_WORLD->size();
368 CHECK_ACTION_PARAMS(action, comm_size + 1, 1)
369 comp_size = parse_double(action[2 + comm_size]);
370 recvcounts = std::make_shared<std::vector<int>>(comm_size);
371 datatype1 = parse_datatype(action, 3 + comm_size);
373 for (unsigned int i = 0; i < comm_size; i++) {
374 (*recvcounts)[i]= std::stoi(action[i + 2]);
376 recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
379 void ScanArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
381 CHECK_ACTION_PARAMS(action, 1, 1)
382 size = parse_integer<size_t>(action[2]);
383 datatype1 = parse_datatype(action, 3);
386 void AllToAllVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
388 /* The structure of the alltoallv action for the rank 0 (total 4 processes) is the following:
389 0 alltoallv 100 1 7 10 12 100 1 70 10 5
391 1) 100 is the size of the send buffer *sizeof(int),
392 2) 1 7 10 12 is the sendcounts array
393 3) 100*sizeof(int) is the size of the receiver buffer
394 4) 1 70 10 5 is the recvcounts array
396 comm_size = MPI_COMM_WORLD->size();
397 CHECK_ACTION_PARAMS(action, 2 * comm_size + 2, 2)
398 sendcounts = std::make_shared<std::vector<int>>(comm_size);
399 recvcounts = std::make_shared<std::vector<int>>(comm_size);
400 senddisps = std::vector<int>(comm_size, 0);
401 recvdisps = std::vector<int>(comm_size, 0);
403 datatype1 = parse_datatype(action, 4 + 2 * comm_size);
404 datatype2 = parse_datatype(action, 5 + 2 * comm_size);
406 send_buf_size = parse_integer<int>(action[2]);
407 recv_buf_size = parse_integer<int>(action[3 + comm_size]);
408 for (unsigned int i = 0; i < comm_size; i++) {
409 (*sendcounts)[i] = std::stoi(action[3 + i]);
410 (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
412 send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
413 recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
416 void WaitAction::kernel(simgrid::xbt::ReplayAction& action)
418 std::string s = boost::algorithm::join(action, " ");
419 xbt_assert(req_storage.size(), "action wait not preceded by any irecv or isend: %s", s.c_str());
420 const WaitTestParser& args = get_args();
421 MPI_Request request = req_storage.find(args.src, args.dst, args.tag);
422 req_storage.remove(request);
424 if (request == MPI_REQUEST_NULL) {
425 /* Assume that the trace is well formed, meaning the comm might have been caught by an MPI_test. Then just
430 // Must be taken before Request::wait() since the request may be set to
431 // MPI_REQUEST_NULL by Request::wait!
432 bool is_wait_for_receive = (request->flags() & MPI_REQ_RECV);
434 TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::WaitTIData(args.src, args.dst, args.tag));
437 Request::wait(&request, &status);
439 TRACE_smpi_comm_out(get_pid());
440 if (is_wait_for_receive)
441 TRACE_smpi_recv(MPI_COMM_WORLD->group()->actor(args.src), MPI_COMM_WORLD->group()->actor(args.dst), args.tag);
444 void SendAction::kernel(simgrid::xbt::ReplayAction&)
446 const SendRecvParser& args = get_args();
447 aid_t dst_traced = MPI_COMM_WORLD->group()->actor(args.partner);
451 new simgrid::instr::Pt2PtTIData(get_name(), args.partner, args.size, args.tag, Datatype::encode(args.datatype1)));
452 if (not TRACE_smpi_view_internals())
453 TRACE_smpi_send(get_pid(), get_pid(), dst_traced, args.tag, args.size * args.datatype1->size());
455 if (get_name() == "send") {
456 Request::send(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
457 } else if (get_name() == "isend") {
458 MPI_Request request = Request::isend(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
459 req_storage.add(request);
461 xbt_die("Don't know this action, %s", get_name().c_str());
464 TRACE_smpi_comm_out(get_pid());
467 void RecvAction::kernel(simgrid::xbt::ReplayAction&)
469 const SendRecvParser& args = get_args();
472 new simgrid::instr::Pt2PtTIData(get_name(), args.partner, args.size, args.tag, Datatype::encode(args.datatype1)));
475 // unknown size from the receiver point of view
476 size_t arg_size = args.size;
478 Request::probe(args.partner, args.tag, MPI_COMM_WORLD, &status);
479 arg_size = status.count;
482 bool is_recv = false; // Help analyzers understanding that status is not used uninitialized
483 if (get_name() == "recv") {
485 Request::recv(nullptr, arg_size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD, &status);
486 } else if (get_name() == "irecv") {
487 MPI_Request request = Request::irecv(nullptr, arg_size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
488 req_storage.add(request);
493 TRACE_smpi_comm_out(get_pid());
494 if (is_recv && not TRACE_smpi_view_internals()) {
495 aid_t src_traced = MPI_COMM_WORLD->group()->actor(status.MPI_SOURCE);
496 TRACE_smpi_recv(src_traced, get_pid(), args.tag);
500 void ComputeAction::kernel(simgrid::xbt::ReplayAction&)
502 const ComputeParser& args = get_args();
503 if (smpi_cfg_simulate_computation()) {
504 smpi_execute_flops(args.flops/smpi_adjust_comp_speed());
508 void SleepAction::kernel(simgrid::xbt::ReplayAction&)
510 const SleepParser& args = get_args();
511 XBT_DEBUG("Sleep for: %lf secs", args.time);
512 aid_t pid = simgrid::s4u::this_actor::get_pid();
513 TRACE_smpi_sleeping_in(pid, args.time);
514 simgrid::s4u::this_actor::sleep_for(args.time/smpi_adjust_comp_speed());
515 TRACE_smpi_sleeping_out(pid);
518 void LocationAction::kernel(simgrid::xbt::ReplayAction&)
520 const LocationParser& args = get_args();
521 smpi_trace_set_call_location(args.filename.c_str(), args.line);
524 void TestAction::kernel(simgrid::xbt::ReplayAction&)
526 const WaitTestParser& args = get_args();
527 MPI_Request request = req_storage.find(args.src, args.dst, args.tag);
528 req_storage.remove(request);
529 // if request is null here, this may mean that a previous test has succeeded
530 // Different times in traced application and replayed version may lead to this
531 // In this case, ignore the extra calls.
532 if (request != MPI_REQUEST_NULL) {
533 TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::NoOpTIData("test"));
537 Request::test(&request, &status, &flag);
539 XBT_DEBUG("MPI_Test result: %d", flag);
540 /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now
542 if (request == MPI_REQUEST_NULL)
543 req_storage.addNullRequest(args.src, args.dst, args.tag);
545 req_storage.add(request);
547 TRACE_smpi_comm_out(get_pid());
551 void InitAction::kernel(simgrid::xbt::ReplayAction& action)
553 CHECK_ACTION_PARAMS(action, 0, 1)
554 MPI_DEFAULT_TYPE = (action.size() > 2) ? MPI_DOUBLE // default MPE datatype
555 : MPI_BYTE; // default TAU datatype
557 /* start a simulated timer */
558 smpi_process()->simulated_start();
561 void CommunicatorAction::kernel(simgrid::xbt::ReplayAction&)
566 void WaitAllAction::kernel(simgrid::xbt::ReplayAction&)
568 const size_t count_requests = req_storage.size();
570 if (count_requests > 0) {
571 TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::CpuTIData("waitall", count_requests));
572 std::vector<std::pair</*sender*/ aid_t, /*recv*/ aid_t>> sender_receiver;
573 std::vector<MPI_Request> reqs;
574 req_storage.get_requests(reqs);
575 for (auto const& req : reqs) {
576 if (req && (req->flags() & MPI_REQ_RECV)) {
577 sender_receiver.emplace_back(req->src(), req->dst());
580 Request::waitall(count_requests, &(reqs.data())[0], MPI_STATUSES_IGNORE);
581 req_storage.get_store().clear();
583 for (auto const& pair : sender_receiver) {
584 TRACE_smpi_recv(pair.first, pair.second, 0);
586 TRACE_smpi_comm_out(get_pid());
590 void BarrierAction::kernel(simgrid::xbt::ReplayAction&)
592 TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::NoOpTIData("barrier"));
593 colls::barrier(MPI_COMM_WORLD);
594 TRACE_smpi_comm_out(get_pid());
597 void BcastAction::kernel(simgrid::xbt::ReplayAction&)
599 const BcastArgParser& args = get_args();
600 TRACE_smpi_comm_in(get_pid(), "action_bcast",
601 new simgrid::instr::CollTIData("bcast", args.root, -1.0, args.size,
602 0, Datatype::encode(args.datatype1), ""));
604 colls::bcast(send_buffer(args.size * args.datatype1->size()), args.size, args.datatype1, args.root, MPI_COMM_WORLD);
606 TRACE_smpi_comm_out(get_pid());
609 void ReduceAction::kernel(simgrid::xbt::ReplayAction&)
611 const ReduceArgParser& args = get_args();
612 TRACE_smpi_comm_in(get_pid(), "action_reduce",
613 new simgrid::instr::CollTIData("reduce", args.root, args.comp_size,
614 args.comm_size, 0, Datatype::encode(args.datatype1), ""));
616 colls::reduce(send_buffer(args.comm_size * args.datatype1->size()),
617 recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL,
618 args.root, MPI_COMM_WORLD);
619 if(args.comp_size != 0.0)
620 private_execute_flops(args.comp_size);
622 TRACE_smpi_comm_out(get_pid());
625 void AllReduceAction::kernel(simgrid::xbt::ReplayAction&)
627 const AllReduceArgParser& args = get_args();
628 TRACE_smpi_comm_in(get_pid(), "action_allreduce",
629 new simgrid::instr::CollTIData("allreduce", -1, args.comp_size, args.comm_size, 0,
630 Datatype::encode(args.datatype1), ""));
632 colls::allreduce(send_buffer(args.comm_size * args.datatype1->size()),
633 recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL,
635 if(args.comp_size != 0.0)
636 private_execute_flops(args.comp_size);
638 TRACE_smpi_comm_out(get_pid());
641 void AllToAllAction::kernel(simgrid::xbt::ReplayAction&)
643 const AllToAllArgParser& args = get_args();
644 TRACE_smpi_comm_in(get_pid(), "action_alltoall",
645 new simgrid::instr::CollTIData("alltoall", -1, -1.0, args.send_size, args.recv_size,
646 Datatype::encode(args.datatype1),
647 Datatype::encode(args.datatype2)));
649 colls::alltoall(send_buffer(args.datatype1->size() * args.send_size * args.comm_size), args.send_size, args.datatype1,
650 recv_buffer(args.datatype2->size() * args.recv_size * args.comm_size), args.recv_size, args.datatype2,
653 TRACE_smpi_comm_out(get_pid());
656 void GatherAction::kernel(simgrid::xbt::ReplayAction&)
658 const GatherArgParser& args = get_args();
659 TRACE_smpi_comm_in(get_pid(), get_name().c_str(),
660 new simgrid::instr::CollTIData(get_name(), (get_name() == "gather") ? args.root : -1, -1.0,
661 args.send_size, args.recv_size, Datatype::encode(args.datatype1),
662 Datatype::encode(args.datatype2)));
664 if (get_name() == "gather") {
665 int rank = MPI_COMM_WORLD->rank();
666 colls::gather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
667 (rank == args.root) ? recv_buffer(args.datatype2->size() * args.recv_size * args.comm_size) : nullptr,
668 args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
670 colls::allgather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
671 recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2,
674 TRACE_smpi_comm_out(get_pid());
677 void GatherVAction::kernel(simgrid::xbt::ReplayAction&)
679 int rank = MPI_COMM_WORLD->rank();
680 const GatherVArgParser& args = get_args();
681 TRACE_smpi_comm_in(get_pid(), get_name().c_str(),
682 new simgrid::instr::VarCollTIData(
683 get_name(), (get_name() == "gatherv") ? args.root : -1, args.send_size, nullptr, -1,
684 args.recvcounts, Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
686 if (get_name() == "gatherv") {
687 colls::gatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
688 (rank == args.root) ? recv_buffer(args.recv_size_sum * args.datatype2->size()) : nullptr,
689 args.recvcounts->data(), args.disps.data(), args.datatype2, args.root, MPI_COMM_WORLD);
691 colls::allgatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
692 recv_buffer(args.recv_size_sum * args.datatype2->size()), args.recvcounts->data(),
693 args.disps.data(), args.datatype2, MPI_COMM_WORLD);
696 TRACE_smpi_comm_out(get_pid());
699 void ScatterAction::kernel(simgrid::xbt::ReplayAction&)
701 int rank = MPI_COMM_WORLD->rank();
702 const ScatterArgParser& args = get_args();
703 TRACE_smpi_comm_in(get_pid(), "action_scatter",
704 new simgrid::instr::CollTIData(get_name(), args.root, -1.0, args.send_size, args.recv_size,
705 Datatype::encode(args.datatype1),
706 Datatype::encode(args.datatype2)));
708 colls::scatter(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
709 (rank == args.root) ? recv_buffer(args.recv_size * args.datatype2->size()) : nullptr, args.recv_size,
710 args.datatype2, args.root, MPI_COMM_WORLD);
712 TRACE_smpi_comm_out(get_pid());
715 void ScatterVAction::kernel(simgrid::xbt::ReplayAction&)
717 int rank = MPI_COMM_WORLD->rank();
718 const ScatterVArgParser& args = get_args();
719 TRACE_smpi_comm_in(get_pid(), "action_scatterv",
720 new simgrid::instr::VarCollTIData(get_name(), args.root, -1, args.sendcounts, args.recv_size,
721 nullptr, Datatype::encode(args.datatype1),
722 Datatype::encode(args.datatype2)));
724 colls::scatterv((rank == args.root) ? send_buffer(args.send_size_sum * args.datatype1->size()) : nullptr,
725 args.sendcounts->data(), args.disps.data(), args.datatype1,
726 recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, args.root,
729 TRACE_smpi_comm_out(get_pid());
732 void ReduceScatterAction::kernel(simgrid::xbt::ReplayAction&)
734 const ReduceScatterArgParser& args = get_args();
736 get_pid(), "action_reducescatter",
737 new simgrid::instr::VarCollTIData(get_name(), -1, -1, nullptr, -1, args.recvcounts,
738 std::to_string(args.comp_size), /* ugly hack to print comp_size */
739 Datatype::encode(args.datatype1)));
741 colls::reduce_scatter(send_buffer(args.recv_size_sum * args.datatype1->size()),
742 recv_buffer(args.recv_size_sum * args.datatype1->size()), args.recvcounts->data(),
743 args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
745 private_execute_flops(args.comp_size);
746 TRACE_smpi_comm_out(get_pid());
749 void AllToAllVAction::kernel(simgrid::xbt::ReplayAction&)
751 const AllToAllVArgParser& args = get_args();
752 TRACE_smpi_comm_in(get_pid(), __func__,
753 new simgrid::instr::VarCollTIData(
754 "alltoallv", -1, args.send_size_sum, args.sendcounts, args.recv_size_sum, args.recvcounts,
755 Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
757 colls::alltoallv(send_buffer(args.send_buf_size * args.datatype1->size()), args.sendcounts->data(),
758 args.senddisps.data(), args.datatype1, recv_buffer(args.recv_buf_size * args.datatype2->size()),
759 args.recvcounts->data(), args.recvdisps.data(), args.datatype2, MPI_COMM_WORLD);
761 TRACE_smpi_comm_out(get_pid());
763 } // Replay Namespace
764 }} // namespace simgrid::smpi
/* One RequestStorage per replaying actor, indexed by the actor's pid.
 * The replay actions registered in smpi_replay_init() look up the entry of the calling actor;
 * smpi_replay_main() resets that entry before running the trace. */
static std::unordered_map<aid_t, simgrid::smpi::replay::RequestStorage> storage;
767 /** @brief Only initialize the replay, don't do it for real */
768 void smpi_replay_init(const char* instance_id, int rank, double start_delay_flops)
770 xbt_assert(not smpi_process()->initializing());
772 simgrid::s4u::Actor::self()->set_property("instance_id", instance_id);
773 simgrid::s4u::Actor::self()->set_property("rank", std::to_string(rank));
774 simgrid::smpi::ActorExt::init();
776 smpi_process()->mark_as_initialized();
777 smpi_process()->set_replaying(true);
779 TRACE_smpi_init(simgrid::s4u::this_actor::get_pid(), "smpi_replay_run_init");
780 xbt_replay_action_register("init", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::InitAction().execute(action); });
781 xbt_replay_action_register("finalize", [](simgrid::xbt::ReplayAction const&) { /* nothing to do */ });
782 xbt_replay_action_register("comm_size", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
783 xbt_replay_action_register("comm_split",[](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
784 xbt_replay_action_register("comm_dup", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
785 xbt_replay_action_register("send", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("send", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
786 xbt_replay_action_register("isend", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("isend", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
787 xbt_replay_action_register("recv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("recv", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
788 xbt_replay_action_register("irecv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("irecv", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
789 xbt_replay_action_register("test", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::TestAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
790 xbt_replay_action_register("wait", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
791 xbt_replay_action_register("waitall", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAllAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
792 xbt_replay_action_register("barrier", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BarrierAction().execute(action); });
793 xbt_replay_action_register("bcast", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BcastAction().execute(action); });
794 xbt_replay_action_register("reduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceAction().execute(action); });
795 xbt_replay_action_register("allreduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllReduceAction().execute(action); });
796 xbt_replay_action_register("alltoall", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllAction().execute(action); });
797 xbt_replay_action_register("alltoallv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllVAction().execute(action); });
798 xbt_replay_action_register("gather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("gather").execute(action); });
799 xbt_replay_action_register("scatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterAction().execute(action); });
800 xbt_replay_action_register("gatherv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("gatherv").execute(action); });
801 xbt_replay_action_register("scatterv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterVAction().execute(action); });
802 xbt_replay_action_register("allgather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("allgather").execute(action); });
803 xbt_replay_action_register("allgatherv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("allgatherv").execute(action); });
804 xbt_replay_action_register("reducescatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceScatterAction().execute(action); });
805 xbt_replay_action_register("compute", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ComputeAction().execute(action); });
806 xbt_replay_action_register("sleep", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SleepAction().execute(action); });
807 xbt_replay_action_register("location", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::LocationAction().execute(action); });
809 //if we have a delayed start, sleep here.
810 if (start_delay_flops > 0) {
811 XBT_VERB("Delayed start for instance - Sleeping for %f flops ", start_delay_flops);
812 private_execute_flops(start_delay_flops);
814 // Wait for the other actors to initialize also
815 simgrid::s4u::this_actor::yield();
819 /** @brief actually run the replay after initialization */
820 void smpi_replay_main(int rank, const char* private_trace_filename)
822 static int active_processes = 0;
824 storage[simgrid::s4u::this_actor::get_pid()] = simgrid::smpi::replay::RequestStorage();
825 std::string rank_string = std::to_string(rank);
826 simgrid::xbt::replay_runner(rank_string.c_str(), private_trace_filename);
828 /* and now, finalize everything */
829 /* One active process will stop. Decrease the counter*/
830 unsigned int count_requests = storage[simgrid::s4u::this_actor::get_pid()].size();
831 XBT_DEBUG("There are %u elements in reqq[*]", count_requests);
832 if (count_requests > 0) {
833 std::vector<MPI_Request> requests(count_requests);
836 for (auto const& pair : storage[simgrid::s4u::this_actor::get_pid()].get_store()) {
837 requests[i] = pair.second;
840 simgrid::smpi::Request::waitall(count_requests, requests.data(), MPI_STATUSES_IGNORE);
843 if(simgrid::config::get_value<bool>("smpi/finalization-barrier"))
844 simgrid::smpi::colls::barrier(MPI_COMM_WORLD);
848 if(active_processes==0){
849 /* Last process alive speaking: end the simulated timer */
850 XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
851 smpi_free_replay_tmp_buffers();
854 TRACE_smpi_comm_in(simgrid::s4u::this_actor::get_pid(), "smpi_replay_run_finalize",
855 new simgrid::instr::NoOpTIData("finalize"));
857 smpi_process()->finalize();
859 TRACE_smpi_comm_out(simgrid::s4u::this_actor::get_pid());
862 /** @brief chain a replay initialization and a replay start */
863 void smpi_replay_run(const char* instance_id, int rank, double start_delay_flops, const char* private_trace_filename)
865 smpi_replay_init(instance_id, rank, start_delay_flops);
866 smpi_replay_main(rank, private_trace_filename);