1 /* Copyright (c) 2009-2022. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
6 #include "smpi_replay.hpp"
7 #include "simgrid/s4u/Exec.hpp"
8 #include "smpi_coll.hpp"
9 #include "smpi_comm.hpp"
10 #include "smpi_config.hpp"
11 #include "smpi_datatype.hpp"
12 #include "smpi_group.hpp"
13 #include "smpi_request.hpp"
14 #include "src/smpi/include/private.hpp"
15 #include "xbt/replay.hpp"
23 #include <unordered_map>
26 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay, smpi, "Trace Replay with SMPI");
27 // From https://stackoverflow.com/questions/7110301/generic-hash-for-tuples-in-unordered-map-unordered-set
28 // This is all just to make std::unordered_map work with std::tuple. If we need this in other places,
29 // this could go into a header file.
namespace hash_tuple {
/** Generic hasher: any type that is not a std::tuple is simply forwarded to std::hash. */
template <typename TT> class hash {
public:
  std::size_t operator()(TT const& tt) const { return std::hash<TT>()(tt); }
};

/** Mix the hash of @p v into the running @p seed. */
template <class T> inline void hash_combine(std::size_t& seed, T const& v)
{
  seed ^= hash_tuple::hash<T>()(v) + 0x9e3779b9 + (seed << 6) + (seed >> 2);
}

// Recursive template code derived from Matthieu M.
// Kept for source compatibility; it combines elements 0..Index in increasing index order.
// It cannot be instantiated with an empty tuple (Index would underflow).
template <class Tuple, std::size_t Index = std::tuple_size<Tuple>::value - 1> class HashValueImpl {
public:
  static void apply(std::size_t& seed, Tuple const& tuple)
  {
    HashValueImpl<Tuple, Index - 1>::apply(seed, tuple);
    hash_combine(seed, std::get<Index>(tuple));
  }
};

template <class Tuple> class HashValueImpl<Tuple, 0> {
public:
  static void apply(std::size_t& seed, Tuple const& tuple) { hash_combine(seed, std::get<0>(tuple)); }
};

/** Hasher for std::tuple: combines the elements from first to last, starting from a zero seed. */
template <typename... TT> class hash<std::tuple<TT...>> {
public:
  std::size_t operator()(std::tuple<TT...> const& tt) const
  {
    std::size_t seed = 0;
    // The comma fold visits the elements left to right, i.e. in the same order as HashValueImpl,
    // and expands to nothing for an empty tuple (which then hashes to 0).
    std::apply([&seed](auto const&... elems) { (hash_combine(seed, elems), ...); }, tt);
    return seed;
  }
};
} // namespace hash_tuple
67 void log_timed_action(const simgrid::xbt::ReplayAction& action, double clock)
69 if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
70 std::string s = boost::algorithm::join(action, " ");
71 XBT_VERB("%s %f", s.c_str(), smpi_process()->simulated_elapsed() - clock);
75 /* Helper functions */
76 static double parse_double(const std::string& string)
78 return xbt_str_parse_double(string.c_str(), "not a double");
81 template <typename T> static T parse_integer(const std::string& string)
83 double val = trunc(xbt_str_parse_double(string.c_str(), "not a double"));
84 xbt_assert(static_cast<double>(std::numeric_limits<T>::min()) <= val &&
85 val <= static_cast<double>(std::numeric_limits<T>::max()),
86 "out of range: %g", val);
87 return static_cast<T>(val);
90 static int parse_root(const simgrid::xbt::ReplayAction& action, unsigned i)
92 return i < action.size() ? std::stoi(action[i]) : 0;
95 static MPI_Datatype parse_datatype(const simgrid::xbt::ReplayAction& action, unsigned i)
97 return i < action.size() ? simgrid::smpi::Datatype::decode(action[i]) : simgrid::smpi::replay::MPI_DEFAULT_TYPE;
100 namespace simgrid::smpi::replay {
// Datatype returned by parse_datatype() when a trace line carries no datatype field.
// It is assigned by InitAction::kernel() (MPI_DOUBLE or MPI_BYTE depending on the init line).
101 MPI_Datatype MPI_DEFAULT_TYPE;
103 class RequestStorage {
105 using req_key_t = std::tuple</*sender*/ int, /* receiver */ int, /* tag */ int>;
106 using req_storage_t = std::unordered_map<req_key_t, std::list<MPI_Request>, hash_tuple::hash<std::tuple<int, int, int>>>;
111 RequestStorage() = default;
112 size_t size() const { return store.size(); }
114 req_storage_t& get_store() { return store; }
116 void get_requests(std::vector<MPI_Request>& vec) const
118 for (auto const& [_, reqs] : store) {
119 aid_t my_proc_id = simgrid::s4u::this_actor::get_pid();
120 for (auto& req: reqs){
121 if (req != MPI_REQUEST_NULL && (req->src() == my_proc_id || req->dst() == my_proc_id)) {
123 req->print_request("MM");
129 MPI_Request pop(int src, int dst, int tag)
131 auto it = store.find(req_key_t(src, dst, tag));
132 if (it == store.end())
133 return MPI_REQUEST_NULL;
134 MPI_Request req = it->second.front();
135 it->second.pop_front();
136 if(it->second.empty())
137 store.erase(req_key_t(src, dst, tag));
141 void add(MPI_Request req)
143 if (req != MPI_REQUEST_NULL){ // Can and does happen in the case of TestAction
144 store[req_key_t(req->src()-1, req->dst()-1, req->tag())].push_back(req);
148 /* Sometimes we need to re-insert MPI_REQUEST_NULL but we still need src,dst and tag */
149 void addNullRequest(int src, int dst, int tag)
151 int src_pid = MPI_COMM_WORLD->group()->actor(src) - 1;
152 int dest_pid = MPI_COMM_WORLD->group()->actor(dst) - 1;
153 store[req_key_t(src_pid, dest_pid, tag)].push_back(MPI_REQUEST_NULL);
157 void WaitTestParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
159 CHECK_ACTION_PARAMS(action, 3, 0)
160 src = std::stoi(action[2]);
161 dst = std::stoi(action[3]);
162 tag = std::stoi(action[4]);
165 void SendOrRecvParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
167 CHECK_ACTION_PARAMS(action, 3, 1)
168 partner = std::stoi(action[2]);
169 tag = std::stoi(action[3]);
170 size = parse_integer<size_t>(action[4]);
171 datatype1 = parse_datatype(action, 5);
174 void ComputeParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
176 CHECK_ACTION_PARAMS(action, 1, 0)
177 flops = parse_double(action[2]);
180 void SleepParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
182 CHECK_ACTION_PARAMS(action, 1, 0)
183 time = parse_double(action[2]);
186 void LocationParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
188 CHECK_ACTION_PARAMS(action, 2, 0)
189 filename = std::string(action[2]);
190 line = std::stoi(action[3]);
193 void SendRecvParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
195 CHECK_ACTION_PARAMS(action, 6, 0)
196 sendcount = parse_integer<int>(action[2]);
197 dst = std::stoi(action[3]);
198 recvcount = parse_integer<int>(action[4]);
199 src = std::stoi(action[5]);
200 datatype1 = parse_datatype(action, 6);
201 datatype2 = parse_datatype(action, 7);
204 void BcastArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
206 CHECK_ACTION_PARAMS(action, 1, 2)
207 size = parse_integer<size_t>(action[2]);
208 root = parse_root(action, 3);
209 datatype1 = parse_datatype(action, 4);
212 void ReduceArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
214 CHECK_ACTION_PARAMS(action, 2, 2)
215 comm_size = parse_integer<unsigned>(action[2]);
216 comp_size = parse_double(action[3]);
217 root = parse_root(action, 4);
218 datatype1 = parse_datatype(action, 5);
221 void AllReduceArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
223 CHECK_ACTION_PARAMS(action, 2, 1)
224 comm_size = parse_integer<unsigned>(action[2]);
225 comp_size = parse_double(action[3]);
226 datatype1 = parse_datatype(action, 4);
229 void AllToAllArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
231 CHECK_ACTION_PARAMS(action, 2, 1)
232 comm_size = MPI_COMM_WORLD->size();
233 send_size = parse_integer<int>(action[2]);
234 recv_size = parse_integer<int>(action[3]);
235 datatype1 = parse_datatype(action, 4);
236 datatype2 = parse_datatype(action, 5);
// Parse the arguments of a gather-like trace line; `name` selects the field layout.
// Fields 2 and 3 are always the send and receive counts, and comm_size is the size of MPI_COMM_WORLD.
// When name is "gather", field 4 is the optional root and fields 5 and 6 the optional datatypes;
// for any other name the datatypes are read from fields 4 and 5 instead.
239 void GatherArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string& name)
241 /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
244 1) 68 is the sendcounts
245 2) 68 is the recvcounts
246 3) 0 is the root node
247 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
248 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
250 CHECK_ACTION_PARAMS(action, 2, 3)
251 comm_size = MPI_COMM_WORLD->size();
252 send_size = parse_integer<int>(action[2]);
253 recv_size = parse_integer<int>(action[3]);
// Layout with an explicit root
255 if (name == "gather") {
256 root = parse_root(action, 4);
257 datatype1 = parse_datatype(action, 5);
258 datatype2 = parse_datatype(action, 6);
// Layout without a root field: the datatypes come right after the counts
261 datatype1 = parse_datatype(action, 4);
262 datatype2 = parse_datatype(action, 5);
// Parse the arguments of a gatherv-like trace line; `name` selects the field layout.
// Field 2 is the send count and fields 3 .. 3+comm_size-1 are the per-rank receive counts.
// When name is "gatherv", the optional root and datatypes follow the receive counts.
// For any other name, the line may carry datatypes and/or a displacement array after the receive counts;
// which of them is present is deduced from the number of fields on the line.
// NOTE(review): the sample line below says "gather" although it describes the gatherv layout.
266 void GatherVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string& name)
268 /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
269 0 gather 68 68 10 10 10 0 0 0
271 1) 68 is the sendcount
272 2) 68 10 10 10 is the recvcounts
273 3) 0 is the root node
274 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
275 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
277 comm_size = MPI_COMM_WORLD->size();
278 CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
279 send_size = parse_integer<int>(action[2]);
// Displacements default to 0 for every rank
280 disps = std::vector<int>(comm_size, 0);
281 recvcounts = std::make_shared<std::vector<int>>(comm_size);
283 if (name == "gatherv") {
284 root = parse_root(action, 3 + comm_size);
285 datatype1 = parse_datatype(action, 4 + comm_size);
286 datatype2 = parse_datatype(action, 5 + comm_size);
// disp_index stays 0 when the line carries no displacement array
289 unsigned disp_index = 0;
290 /* The 3 comes from "0 gather <sendcount>", which must always be present.
291 * The + comm_size is the recvcounts array, which must also be present
293 if (action.size() > 3 + comm_size + comm_size) {
294 // datatype + disp are specified
295 datatype1 = parse_datatype(action, 3 + comm_size);
296 datatype2 = parse_datatype(action, 4 + comm_size);
297 disp_index = 5 + comm_size;
298 } else if (action.size() > 3 + comm_size + 2) {
299 // disps specified; datatype is not specified; use the default one
300 datatype1 = MPI_DEFAULT_TYPE;
301 datatype2 = MPI_DEFAULT_TYPE;
302 disp_index = 3 + comm_size;
304 // no disp specified, maybe only datatype,
305 datatype1 = parse_datatype(action, 3 + comm_size);
306 datatype2 = parse_datatype(action, 4 + comm_size);
// Read the comm_size displacements, after checking that the line is long enough to hold them
309 if (disp_index != 0) {
310 xbt_assert(disp_index + comm_size <= action.size());
311 for (unsigned i = 0; i < comm_size; i++)
312 disps[i] = std::stoi(action[disp_index + i]);
// The receive counts start at field 3, right after the send count
316 for (unsigned int i = 0; i < comm_size; i++) {
317 (*recvcounts)[i] = std::stoi(action[i + 3]);
319 recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
322 void ScatterArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
324 /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
327 1) 68 is the sendcounts
328 2) 68 is the recvcounts
329 3) 0 is the root node
330 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
331 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
333 comm_size = MPI_COMM_WORLD->size();
334 CHECK_ACTION_PARAMS(action, 2, 3)
335 comm_size = MPI_COMM_WORLD->size();
336 send_size = parse_integer<int>(action[2]);
337 recv_size = parse_integer<int>(action[3]);
338 root = parse_root(action, 4);
339 datatype1 = parse_datatype(action, 5);
340 datatype2 = parse_datatype(action, 6);
343 void ScatterVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
345 /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
346 0 gather 68 10 10 10 68 0 0 0
348 1) 68 10 10 10 is the sendcounts
349 2) 68 is the recvcount
350 3) 0 is the root node
351 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
352 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
354 comm_size = MPI_COMM_WORLD->size();
355 CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
356 recv_size = parse_integer<int>(action[2 + comm_size]);
357 disps = std::vector<int>(comm_size, 0);
358 sendcounts = std::make_shared<std::vector<int>>(comm_size);
360 root = parse_root(action, 3 + comm_size);
361 datatype1 = parse_datatype(action, 4 + comm_size);
362 datatype2 = parse_datatype(action, 5 + comm_size);
364 for (unsigned int i = 0; i < comm_size; i++) {
365 (*sendcounts)[i] = std::stoi(action[i + 2]);
367 send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
370 void ReduceScatterArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
372 /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
373 0 reducescatter 275427 275427 275427 204020 11346849 0
375 1) The first four values after the name of the action declare the recvcounts array
376 2) The value 11346849 is the amount of instructions
377 3) The last value corresponds to the datatype, see simgrid::smpi::Datatype::decode().
379 comm_size = MPI_COMM_WORLD->size();
380 CHECK_ACTION_PARAMS(action, comm_size + 1, 1)
381 comp_size = parse_double(action[2 + comm_size]);
382 recvcounts = std::make_shared<std::vector<int>>(comm_size);
383 datatype1 = parse_datatype(action, 3 + comm_size);
385 for (unsigned int i = 0; i < comm_size; i++) {
386 (*recvcounts)[i]= std::stoi(action[i + 2]);
388 recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
391 void ScanArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
393 CHECK_ACTION_PARAMS(action, 2, 1)
394 size = parse_integer<size_t>(action[2]);
395 comp_size = parse_double(action[3]);
396 datatype1 = parse_datatype(action, 4);
399 void AllToAllVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
401 /* The structure of the alltoallv action for the rank 0 (total 4 processes) is the following:
402 0 alltoallv 100 1 7 10 12 100 1 70 10 5
404 1) 100 is the size of the send buffer *sizeof(int),
405 2) 1 7 10 12 is the sendcounts array
406 3) 100*sizeof(int) is the size of the receiver buffer
407 4) 1 70 10 5 is the recvcounts array
409 comm_size = MPI_COMM_WORLD->size();
410 CHECK_ACTION_PARAMS(action, 2 * comm_size + 2, 2)
411 sendcounts = std::make_shared<std::vector<int>>(comm_size);
412 recvcounts = std::make_shared<std::vector<int>>(comm_size);
413 senddisps = std::vector<int>(comm_size, 0);
414 recvdisps = std::vector<int>(comm_size, 0);
416 datatype1 = parse_datatype(action, 4 + 2 * comm_size);
417 datatype2 = parse_datatype(action, 5 + 2 * comm_size);
419 send_buf_size = parse_integer<int>(action[2]);
420 recv_buf_size = parse_integer<int>(action[3 + comm_size]);
421 for (unsigned int i = 0; i < comm_size; i++) {
422 (*sendcounts)[i] = std::stoi(action[3 + i]);
423 (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
425 send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
426 recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
// Replay a "wait" line: pop the request stored under (src, dst, tag) and wait for it.
// Asserts that the request storage is not empty, i.e. that some request was recorded before this wait.
// A receive event is traced afterwards when the popped request carried the MPI_REQ_RECV flag.
429 void WaitAction::kernel(simgrid::xbt::ReplayAction& action)
// The joined line is only used in the assertion message
431 std::string s = boost::algorithm::join(action, " ");
432 xbt_assert(req_storage.size(), "action wait not preceded by any irecv or isend: %s", s.c_str());
433 const WaitTestParser& args = get_args();
434 MPI_Request request = req_storage.pop(args.src, args.dst, args.tag);
436 if (request == MPI_REQUEST_NULL) {
437 /* Assume that the trace is well formed, meaning the comm might have been caught by an MPI_test. Then just
442 // Must be taken before Request::wait() since the request may be set to
443 // MPI_REQUEST_NULL by Request::wait!
444 bool is_wait_for_receive = (request->flags() & MPI_REQ_RECV);
446 TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::WaitTIData("wait", args.src, args.dst, args.tag));
449 Request::wait(&request, &status);
// Drop our reference if wait() left the request non-null
450 if(request!=MPI_REQUEST_NULL)
451 Request::unref(&request);
452 TRACE_smpi_comm_out(get_pid());
453 if (is_wait_for_receive)
454 TRACE_smpi_recv(MPI_COMM_WORLD->group()->actor(args.src), MPI_COMM_WORLD->group()->actor(args.dst), args.tag);
// Replay a "send" or "isend" line, depending on get_name().
// "send" issues a blocking Request::send(); "isend" issues Request::isend() and stores the resulting request
// in req_storage. Any other name aborts with xbt_die().
457 void SendAction::kernel(simgrid::xbt::ReplayAction&)
459 const SendOrRecvParser& args = get_args();
// Actor id of the destination rank, used for tracing only
460 aid_t dst_traced = MPI_COMM_WORLD->group()->actor(args.partner);
464 new simgrid::instr::Pt2PtTIData(get_name(), args.partner, args.size, args.tag, Datatype::encode(args.datatype1)));
// The traced payload is the element count times the datatype size
465 if (not TRACE_smpi_view_internals())
466 TRACE_smpi_send(get_pid(), get_pid(), dst_traced, args.tag, args.size * args.datatype1->size());
468 if (get_name() == "send") {
469 Request::send(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
470 } else if (get_name() == "isend") {
471 MPI_Request request = Request::isend(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
472 req_storage.add(request);
474 xbt_die("Don't know this action, %s", get_name().c_str());
477 TRACE_smpi_comm_out(get_pid());
// Replay a "recv" or "irecv" line, depending on get_name().
// "recv" issues a blocking Request::recv(); "irecv" issues Request::irecv() and stores the resulting request
// in req_storage. The size to receive may be replaced by the count reported by Request::probe().
480 void RecvAction::kernel(simgrid::xbt::ReplayAction&)
482 const SendOrRecvParser& args = get_args();
485 new simgrid::instr::Pt2PtTIData(get_name(), args.partner, args.size, args.tag, Datatype::encode(args.datatype1)));
488 // unknown size from the receiver point of view
489 size_t arg_size = args.size;
491 Request::probe(args.partner, args.tag, MPI_COMM_WORLD, &status);
492 arg_size = status.count;
495 bool is_recv = false; // Help analyzers understanding that status is not used uninitialized
496 if (get_name() == "recv") {
498 Request::recv(nullptr, arg_size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD, &status);
499 } else if (get_name() == "irecv") {
500 MPI_Request request = Request::irecv(nullptr, arg_size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
501 req_storage.add(request);
506 TRACE_smpi_comm_out(get_pid());
// The receive event is traced with the actual source reported in status
507 if (is_recv && not TRACE_smpi_view_internals()) {
508 aid_t src_traced = MPI_COMM_WORLD->group()->actor(status.MPI_SOURCE);
509 TRACE_smpi_recv(src_traced, get_pid(), args.tag);
// Replay a "sendRecv" line with Request::sendrecv(): sendcount elements go to rank args.dst while recvcount
// elements are received from rank args.src. Both buffers are passed as nullptr.
513 void SendRecvAction::kernel(simgrid::xbt::ReplayAction&)
515 XBT_DEBUG("Enters SendRecv");
516 const SendRecvParser& args = get_args();
517 aid_t my_proc_id = simgrid::s4u::this_actor::get_pid();
// Actor ids of the source and destination ranks, used for tracing
518 aid_t src_traced = MPI_COMM_WORLD->group()->actor(args.src);
519 aid_t dst_traced = MPI_COMM_WORLD->group()->actor(args.dst);
525 // FIXME: Hack the way to trace this one
// The single destination and source are wrapped in one-element vectors to fit VarCollTIData
526 auto dst_hack = std::make_shared<std::vector<int>>();
527 auto src_hack = std::make_shared<std::vector<int>>();
528 dst_hack->push_back(dst_traced);
529 src_hack->push_back(src_traced);
530 TRACE_smpi_comm_in(my_proc_id, __func__,
531 new simgrid::instr::VarCollTIData(
532 "sendRecv", -1, args.sendcount,
533 dst_hack, args.recvcount, src_hack,
534 simgrid::smpi::Datatype::encode(args.datatype1), simgrid::smpi::Datatype::encode(args.datatype2)));
536 TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, sendtag, args.sendcount * args.datatype1->size());
538 simgrid::smpi::Request::sendrecv(nullptr, args.sendcount, args.datatype1, args.dst, sendtag, nullptr, args.recvcount, args.datatype2, args.src,
539 recvtag, MPI_COMM_WORLD, &status);
541 TRACE_smpi_recv(src_traced, my_proc_id, recvtag);
542 TRACE_smpi_comm_out(my_proc_id);
543 XBT_DEBUG("Exits SendRecv");
547 void ComputeAction::kernel(simgrid::xbt::ReplayAction&)
549 const ComputeParser& args = get_args();
550 if (smpi_cfg_simulate_computation()) {
551 smpi_execute_flops(args.flops/smpi_adjust_comp_speed());
555 void SleepAction::kernel(simgrid::xbt::ReplayAction&)
557 const SleepParser& args = get_args();
558 XBT_DEBUG("Sleep for: %lf secs", args.time);
559 aid_t pid = simgrid::s4u::this_actor::get_pid();
560 TRACE_smpi_sleeping_in(pid, args.time);
561 simgrid::s4u::this_actor::sleep_for(args.time/smpi_adjust_comp_speed());
562 TRACE_smpi_sleeping_out(pid);
565 void LocationAction::kernel(simgrid::xbt::ReplayAction&)
567 const LocationParser& args = get_args();
568 smpi_trace_set_call_location(args.filename.c_str(), args.line);
// Replay a "test" line: pop the request stored under (src, dst, tag) and call Request::test() on it.
// Whatever the outcome, an entry is pushed back in req_storage afterwards: a null placeholder when the request
// became MPI_REQUEST_NULL, the request itself otherwise.
571 void TestAction::kernel(simgrid::xbt::ReplayAction&)
573 const WaitTestParser& args = get_args();
574 MPI_Request request = req_storage.pop(args.src, args.dst, args.tag);
575 // if request is null here, this may mean that a previous test has succeeded
576 // Different times in traced application and replayed version may lead to this
577 // In this case, ignore the extra calls.
578 if (request != MPI_REQUEST_NULL) {
579 TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::WaitTIData("test", args.src, args.dst, args.tag));
583 Request::test(&request, &status, &flag);
585 XBT_DEBUG("MPI_Test result: %d", flag);
586 /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now
588 if (request == MPI_REQUEST_NULL)
589 req_storage.addNullRequest(args.src, args.dst, args.tag);
591 req_storage.add(request);
593 TRACE_smpi_comm_out(get_pid());
597 void InitAction::kernel(simgrid::xbt::ReplayAction& action)
599 CHECK_ACTION_PARAMS(action, 0, 1)
600 MPI_DEFAULT_TYPE = (action.size() > 2) ? MPI_DOUBLE // default MPE datatype
601 : MPI_BYTE; // default TAU datatype
603 /* start a simulated timer */
604 smpi_process()->simulated_start();
// Kernel shared by the "comm_size", "comm_split" and "comm_dup" actions (see their registration in
// smpi_replay_init()). The action line is received but left unnamed, so it is not used here.
607 void CommunicatorAction::kernel(simgrid::xbt::ReplayAction&)
612 void WaitAllAction::kernel(simgrid::xbt::ReplayAction&)
614 if (req_storage.size() > 0) {
615 std::vector<std::pair</*sender*/ aid_t, /*recv*/ aid_t>> sender_receiver;
616 std::vector<MPI_Request> reqs;
617 req_storage.get_requests(reqs);
618 unsigned long count_requests = reqs.size();
619 TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::CpuTIData("waitall", count_requests));
620 for (auto const& req : reqs) {
621 if (req && (req->flags() & MPI_REQ_RECV)) {
622 sender_receiver.emplace_back(req->src(), req->dst());
625 Request::waitall(count_requests, &(reqs.data())[0], MPI_STATUSES_IGNORE);
626 req_storage.get_store().clear();
628 for (MPI_Request& req : reqs)
629 if (req != MPI_REQUEST_NULL)
630 Request::unref(&req);
632 for (auto const& [src, dst] : sender_receiver) {
633 TRACE_smpi_recv(src, dst, 0);
635 TRACE_smpi_comm_out(get_pid());
639 void BarrierAction::kernel(simgrid::xbt::ReplayAction&)
641 TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::NoOpTIData("barrier"));
642 colls::barrier(MPI_COMM_WORLD);
643 TRACE_smpi_comm_out(get_pid());
646 void BcastAction::kernel(simgrid::xbt::ReplayAction&)
648 const BcastArgParser& args = get_args();
649 TRACE_smpi_comm_in(get_pid(), "action_bcast",
650 new simgrid::instr::CollTIData("bcast", args.root, -1.0, args.size,
651 0, Datatype::encode(args.datatype1), ""));
653 colls::bcast(send_buffer(args.size * args.datatype1->size()), args.size, args.datatype1, args.root, MPI_COMM_WORLD);
655 TRACE_smpi_comm_out(get_pid());
// Replay a "reduce" line: run colls::reduce() of args.comm_size elements toward args.root with MPI_OP_NULL as
// operation, then set up an execution of args.comp_size named "computation" when that size is not zero.
658 void ReduceAction::kernel(simgrid::xbt::ReplayAction&)
660 const ReduceArgParser& args = get_args();
661 TRACE_smpi_comm_in(get_pid(), "action_reduce",
662 new simgrid::instr::CollTIData("reduce", args.root, args.comp_size,
663 args.comm_size, 0, Datatype::encode(args.datatype1), ""));
// Both buffers are sized as element count times datatype size
665 colls::reduce(send_buffer(args.comm_size * args.datatype1->size()),
666 recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL,
667 args.root, MPI_COMM_WORLD);
668 if (args.comp_size != 0.0)
669 simgrid::s4u::this_actor::exec_init(args.comp_size)
670 ->set_name("computation")
674 TRACE_smpi_comm_out(get_pid());
// Replay an "allreduce" line: run colls::allreduce() of args.comm_size elements with MPI_OP_NULL as operation,
// then set up an execution of args.comp_size named "computation" when that size is not zero.
677 void AllReduceAction::kernel(simgrid::xbt::ReplayAction&)
679 const AllReduceArgParser& args = get_args();
// No root for this collective: -1 is traced instead
680 TRACE_smpi_comm_in(get_pid(), "action_allreduce",
681 new simgrid::instr::CollTIData("allreduce", -1, args.comp_size, args.comm_size, 0,
682 Datatype::encode(args.datatype1), ""));
684 colls::allreduce(send_buffer(args.comm_size * args.datatype1->size()),
685 recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL,
687 if (args.comp_size != 0.0)
688 simgrid::s4u::this_actor::exec_init(args.comp_size)
689 ->set_name("computation")
693 TRACE_smpi_comm_out(get_pid());
// Replay an "alltoall" line with colls::alltoall(): args.send_size elements are sent to, and args.recv_size
// elements received from, each rank.
696 void AllToAllAction::kernel(simgrid::xbt::ReplayAction&)
698 const AllToAllArgParser& args = get_args();
699 TRACE_smpi_comm_in(get_pid(), "action_alltoall",
700 new simgrid::instr::CollTIData("alltoall", -1, -1.0, args.send_size, args.recv_size,
701 Datatype::encode(args.datatype1),
702 Datatype::encode(args.datatype2)));
// Each buffer holds one chunk per rank: datatype size * per-rank count * comm_size
704 colls::alltoall(send_buffer(args.datatype1->size() * args.send_size * args.comm_size), args.send_size, args.datatype1,
705 recv_buffer(args.datatype2->size() * args.recv_size * args.comm_size), args.recv_size, args.datatype2,
708 TRACE_smpi_comm_out(get_pid());
// Replay a "gather" or "allgather" line, depending on get_name().
// For "gather", only the root rank gets a receive buffer (sized for comm_size contributions); the other ranks
// pass nullptr. The root is traced as -1 for any name other than "gather".
// NOTE(review): the allgather receive buffer is sized for a single contribution (recv_size elements), unlike the
// gather branch which multiplies by comm_size; confirm this is intended.
711 void GatherAction::kernel(simgrid::xbt::ReplayAction&)
713 const GatherArgParser& args = get_args();
714 TRACE_smpi_comm_in(get_pid(), get_name().c_str(),
715 new simgrid::instr::CollTIData(get_name(), (get_name() == "gather") ? args.root : -1, -1.0,
716 args.send_size, args.recv_size, Datatype::encode(args.datatype1),
717 Datatype::encode(args.datatype2)));
719 if (get_name() == "gather") {
720 int rank = MPI_COMM_WORLD->rank();
721 colls::gather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
722 (rank == args.root) ? recv_buffer(args.datatype2->size() * args.recv_size * args.comm_size) : nullptr,
723 args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
725 colls::allgather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
726 recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2,
729 TRACE_smpi_comm_out(get_pid());
// Replay a "gatherv" or "allgatherv" line, depending on get_name().
// For "gatherv", only the root rank gets a receive buffer; the other ranks pass nullptr.
// Receive buffers are sized from the sum of the per-rank receive counts (args.recv_size_sum).
732 void GatherVAction::kernel(simgrid::xbt::ReplayAction&)
734 int rank = MPI_COMM_WORLD->rank();
735 const GatherVArgParser& args = get_args();
// The root is traced as -1 for any name other than "gatherv"
736 TRACE_smpi_comm_in(get_pid(), get_name().c_str(),
737 new simgrid::instr::VarCollTIData(
738 get_name(), (get_name() == "gatherv") ? args.root : -1, args.send_size, nullptr, -1,
739 args.recvcounts, Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
741 if (get_name() == "gatherv") {
742 colls::gatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
743 (rank == args.root) ? recv_buffer(args.recv_size_sum * args.datatype2->size()) : nullptr,
744 args.recvcounts->data(), args.disps.data(), args.datatype2, args.root, MPI_COMM_WORLD);
746 colls::allgatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
747 recv_buffer(args.recv_size_sum * args.datatype2->size()), args.recvcounts->data(),
748 args.disps.data(), args.datatype2, MPI_COMM_WORLD);
751 TRACE_smpi_comm_out(get_pid());
754 void ScatterAction::kernel(simgrid::xbt::ReplayAction&)
756 int rank = MPI_COMM_WORLD->rank();
757 const ScatterArgParser& args = get_args();
758 TRACE_smpi_comm_in(get_pid(), "action_scatter",
759 new simgrid::instr::CollTIData(get_name(), args.root, -1.0, args.send_size, args.recv_size,
760 Datatype::encode(args.datatype1),
761 Datatype::encode(args.datatype2)));
763 colls::scatter(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
764 (rank == args.root) ? recv_buffer(args.recv_size * args.datatype2->size()) : nullptr, args.recv_size,
765 args.datatype2, args.root, MPI_COMM_WORLD);
767 TRACE_smpi_comm_out(get_pid());
// Replay a "scatterv" line with colls::scatterv(): only the root rank gets a send buffer (sized from the sum of
// the per-rank send counts), the other ranks pass nullptr; every rank gets a receive buffer of recv_size elements.
770 void ScatterVAction::kernel(simgrid::xbt::ReplayAction&)
772 int rank = MPI_COMM_WORLD->rank();
773 const ScatterVArgParser& args = get_args();
774 TRACE_smpi_comm_in(get_pid(), "action_scatterv",
775 new simgrid::instr::VarCollTIData(get_name(), args.root, -1, args.sendcounts, args.recv_size,
776 nullptr, Datatype::encode(args.datatype1),
777 Datatype::encode(args.datatype2)));
779 colls::scatterv((rank == args.root) ? send_buffer(args.send_size_sum * args.datatype1->size()) : nullptr,
780 args.sendcounts->data(), args.disps.data(), args.datatype1,
781 recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, args.root,
784 TRACE_smpi_comm_out(get_pid());
// Replay a "reducescatter" line: run colls::reduce_scatter() with the per-rank receive counts and MPI_OP_NULL as
// operation, then set up an execution of args.comp_size named "computation" when that size is not zero.
// Both buffers are sized from the sum of the receive counts.
787 void ReduceScatterAction::kernel(simgrid::xbt::ReplayAction&)
789 const ReduceScatterArgParser& args = get_args();
791 get_pid(), "action_reducescatter",
792 new simgrid::instr::VarCollTIData(get_name(), -1, -1, nullptr, -1, args.recvcounts,
793 /* ugly as we use datatype field to pass computation as string */
794 /* and because of the trick to avoid getting 0.000000 when 0 is given */
795 args.comp_size == 0 ? "0" : std::to_string(args.comp_size),
796 Datatype::encode(args.datatype1)));
798 colls::reduce_scatter(send_buffer(args.recv_size_sum * args.datatype1->size()),
799 recv_buffer(args.recv_size_sum * args.datatype1->size()), args.recvcounts->data(),
800 args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
801 if (args.comp_size != 0.0)
802 simgrid::s4u::this_actor::exec_init(args.comp_size)
803 ->set_name("computation")
806 TRACE_smpi_comm_out(get_pid());
// Replay a scan-like line: colls::scan() when get_name() is "scan", colls::exscan() being the other call
// present in this function. Both use args.size elements and MPI_OP_NULL as operation. An execution of
// args.comp_size named "computation" is then set up when that size is not zero.
809 void ScanAction::kernel(simgrid::xbt::ReplayAction&)
811 const ScanArgParser& args = get_args();
812 TRACE_smpi_comm_in(get_pid(), "action_scan",
813 new simgrid::instr::CollTIData(get_name(), -1, args.comp_size,
814 args.size, 0, Datatype::encode(args.datatype1), ""));
815 if (get_name() == "scan")
816 colls::scan(send_buffer(args.size * args.datatype1->size()),
817 recv_buffer(args.size * args.datatype1->size()), args.size,
818 args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
820 colls::exscan(send_buffer(args.size * args.datatype1->size()),
821 recv_buffer(args.size * args.datatype1->size()), args.size,
822 args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
824 if (args.comp_size != 0.0)
825 simgrid::s4u::this_actor::exec_init(args.comp_size)
826 ->set_name("computation")
829 TRACE_smpi_comm_out(get_pid());
832 void AllToAllVAction::kernel(simgrid::xbt::ReplayAction&)
834 const AllToAllVArgParser& args = get_args();
835 TRACE_smpi_comm_in(get_pid(), __func__,
836 new simgrid::instr::VarCollTIData(
837 "alltoallv", -1, args.send_size_sum, args.sendcounts, args.recv_size_sum, args.recvcounts,
838 Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
840 colls::alltoallv(send_buffer(args.send_buf_size * args.datatype1->size()), args.sendcounts->data(),
841 args.senddisps.data(), args.datatype1, recv_buffer(args.recv_buf_size * args.datatype2->size()),
842 args.recvcounts->data(), args.recvdisps.data(), args.datatype2, MPI_COMM_WORLD);
844 TRACE_smpi_comm_out(get_pid());
846 } // namespace simgrid::smpi::replay
// One RequestStorage per actor, indexed by actor pid. The action callbacks registered in smpi_replay_init()
// look it up with the pid of the calling actor; operator[] creates the entry on first use.
848 static std::unordered_map<aid_t, simgrid::smpi::replay::RequestStorage> storage;
849 /** @brief Only initialize the replay, don't do it for real */
850 void smpi_replay_init(const char* instance_id, int rank, double start_delay_flops)
852 xbt_assert(not smpi_process()->initializing());
854 simgrid::s4u::Actor::self()->set_property("instance_id", instance_id);
855 simgrid::s4u::Actor::self()->set_property("rank", std::to_string(rank));
856 simgrid::smpi::ActorExt::init();
858 smpi_process()->mark_as_initialized();
859 smpi_process()->set_replaying(true);
861 TRACE_smpi_init(simgrid::s4u::this_actor::get_pid(), "smpi_replay_run_init");
862 xbt_replay_action_register("init", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::InitAction().execute(action); });
863 xbt_replay_action_register("finalize", [](simgrid::xbt::ReplayAction const&) { /* nothing to do */ });
864 xbt_replay_action_register("comm_size", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
865 xbt_replay_action_register("comm_split",[](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
866 xbt_replay_action_register("comm_dup", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
867 xbt_replay_action_register("send", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("send", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
868 xbt_replay_action_register("isend", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("isend", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
869 xbt_replay_action_register("recv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("recv", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
870 xbt_replay_action_register("irecv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("irecv", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
871 xbt_replay_action_register("test", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::TestAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
872 xbt_replay_action_register("sendRecv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendRecvAction().execute(action); });
873 xbt_replay_action_register("wait", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
874 xbt_replay_action_register("waitall", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAllAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
875 xbt_replay_action_register("barrier", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BarrierAction().execute(action); });
876 xbt_replay_action_register("bcast", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BcastAction().execute(action); });
877 xbt_replay_action_register("reduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceAction().execute(action); });
878 xbt_replay_action_register("allreduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllReduceAction().execute(action); });
879 xbt_replay_action_register("alltoall", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllAction().execute(action); });
880 xbt_replay_action_register("alltoallv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllVAction().execute(action); });
881 xbt_replay_action_register("gather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("gather").execute(action); });
882 xbt_replay_action_register("scatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterAction().execute(action); });
883 xbt_replay_action_register("gatherv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("gatherv").execute(action); });
884 xbt_replay_action_register("scatterv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterVAction().execute(action); });
885 xbt_replay_action_register("allgather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("allgather").execute(action); });
886 xbt_replay_action_register("allgatherv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("allgatherv").execute(action); });
887 xbt_replay_action_register("reducescatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceScatterAction().execute(action); });
888 xbt_replay_action_register("scan", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScanAction("scan").execute(action); });
889 xbt_replay_action_register("exscan", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScanAction("exscan").execute(action); });
890 xbt_replay_action_register("compute", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ComputeAction().execute(action); });
891 xbt_replay_action_register("sleep", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SleepAction().execute(action); });
892 xbt_replay_action_register("location", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::LocationAction().execute(action); });
894 //if we have a delayed start, sleep here.
895 if (start_delay_flops > 0) {
896 XBT_VERB("Delayed start for instance - Sleeping for %f flops ", start_delay_flops);
897 private_execute_flops(start_delay_flops);
899 // Wait for the other actors to initialize also
900 simgrid::s4u::this_actor::yield();
902 if(_smpi_init_sleep > 0)
903 simgrid::s4u::this_actor::sleep_for(_smpi_init_sleep);
906 /** @brief actually run the replay after initialization */
907 void smpi_replay_main(int rank, const char* private_trace_filename)
909 static int active_processes = 0;
911 storage[simgrid::s4u::this_actor::get_pid()] = simgrid::smpi::replay::RequestStorage();
912 std::string rank_string = std::to_string(rank);
913 simgrid::xbt::replay_runner(rank_string.c_str(), private_trace_filename);
915 /* and now, finalize everything */
916 /* One active process will stop. Decrease the counter*/
917 unsigned int count_requests = storage[simgrid::s4u::this_actor::get_pid()].size();
918 XBT_DEBUG("There are %u elements in reqq[*]", count_requests);
919 if (count_requests > 0) {
920 std::vector<MPI_Request> requests(count_requests);
923 for (auto const& [_, reqs] : storage[simgrid::s4u::this_actor::get_pid()].get_store()) {
924 for (auto& req : reqs) {
925 requests[i] = req; // FIXME: overwritten at each iteration?
929 simgrid::smpi::Request::waitall(count_requests, requests.data(), MPI_STATUSES_IGNORE);
932 if(simgrid::config::get_value<bool>("smpi/finalization-barrier"))
933 simgrid::smpi::colls::barrier(MPI_COMM_WORLD);
937 if(active_processes==0){
938 /* Last process alive speaking: end the simulated timer */
939 XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
940 smpi_free_replay_tmp_buffers();
943 TRACE_smpi_comm_in(simgrid::s4u::this_actor::get_pid(), "smpi_replay_run_finalize",
944 new simgrid::instr::NoOpTIData("finalize"));
946 smpi_process()->finalize();
948 TRACE_smpi_comm_out(simgrid::s4u::this_actor::get_pid());
951 /** @brief chain a replay initialization and a replay start */
952 void smpi_replay_run(const char* instance_id, int rank, double start_delay_flops, const char* private_trace_filename)
954 smpi_replay_init(instance_id, rank, start_delay_flops);
955 smpi_replay_main(rank, private_trace_filename);