1 /* Copyright (c) 2009-2023. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
6 #include "smpi_replay.hpp"
7 #include "simgrid/s4u/Exec.hpp"
8 #include "smpi_coll.hpp"
9 #include "smpi_comm.hpp"
10 #include "smpi_config.hpp"
11 #include "smpi_datatype.hpp"
12 #include "smpi_group.hpp"
13 #include "smpi_request.hpp"
14 #include "src/smpi/include/private.hpp"
15 #include "xbt/replay.hpp"
18 #include <boost/lexical_cast.hpp>
24 #include <unordered_map>
27 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay, smpi, "Trace Replay with SMPI");
// Hashing support for std::tuple keys, adapted from
// https://stackoverflow.com/questions/7110301/generic-hash-for-tuples-in-unordered-map-unordered-set
// The standard library has no std::hash specialization for std::tuple; this namespace provides one so that
// tuples can be used as std::unordered_map keys. Move it to a header if it is ever needed elsewhere.
namespace hash_tuple {
/** Generic hasher: simply forwards to std::hash<TT>. Specialized below for std::tuple. */
template <typename TT> class hash {
public:
  size_t operator()(TT const& tt) const { return std::hash<TT>()(tt); }
};

/** Mixes the hash of @p v into @p seed (boost::hash_combine formula). */
template <class T> inline void hash_combine(std::size_t& seed, T const& v)
{
  seed ^= hash_tuple::hash<T>()(v) + 0x9e3779b9 + (seed << 6) + (seed >> 2);
}

/** Folds the hashes of tuple elements 0..Index into @p seed, lowest index first.
 *  (Recursive scheme derived from Matthieu M.) */
template <class Tuple, size_t Index = std::tuple_size_v<Tuple> - 1> class HashValueImpl {
public:
  static void apply(size_t& seed, Tuple const& tuple)
  {
    if constexpr (Index > 0)
      HashValueImpl<Tuple, Index - 1>::apply(seed, tuple); // lower indices are combined first
    hash_combine(seed, std::get<Index>(tuple));
  }
};

/** Tuple hasher: combines the hash of every element, starting from a zero seed. */
template <typename... TT> class hash<std::tuple<TT...>> {
public:
  size_t operator()(std::tuple<TT...> const& tt) const
  {
    size_t seed = 0;
    HashValueImpl<std::tuple<TT...>>::apply(seed, tt);
    return seed;
  }
};
} // namespace hash_tuple
68 void log_timed_action(const simgrid::xbt::ReplayAction& action, double clock)
70 if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
71 std::string s = boost::algorithm::join(action, " ");
72 XBT_VERB("%s %f", s.c_str(), smpi_process()->simulated_elapsed() - clock);
76 /* Helper functions */
77 static double parse_double(const std::string& string)
80 return boost::lexical_cast<double>(string);
81 } catch (boost::bad_lexical_cast const&) {
82 throw std::invalid_argument("not a double: " + string);
86 template <typename T> static T parse_integer(const std::string& string)
88 double val = trunc(xbt_str_parse_double(string.c_str(), "not a double"));
89 xbt_assert(static_cast<double>(std::numeric_limits<T>::min()) <= val &&
90 val <= static_cast<double>(std::numeric_limits<T>::max()),
91 "out of range: %g", val);
92 return static_cast<T>(val);
95 static int parse_root(const simgrid::xbt::ReplayAction& action, unsigned i)
97 return i < action.size() ? std::stoi(action[i]) : 0;
100 static MPI_Datatype parse_datatype(const simgrid::xbt::ReplayAction& action, unsigned i)
102 return i < action.size() ? simgrid::smpi::Datatype::decode(action[i]) : simgrid::smpi::replay::MPI_DEFAULT_TYPE;
105 namespace simgrid::smpi::replay {
// Datatype used whenever an action omits its optional datatype field (fallback of parse_datatype).
// Assigned by InitAction::kernel: MPI_DOUBLE when the "init" action has an extra argument, MPI_BYTE otherwise.
106 MPI_Datatype MPI_DEFAULT_TYPE;
108 class RequestStorage {
110 using req_key_t = std::tuple</*sender*/ int, /* receiver */ int, /* tag */ int>;
111 using req_storage_t = std::unordered_map<req_key_t, std::list<MPI_Request>, hash_tuple::hash<std::tuple<int, int, int>>>;
116 RequestStorage() = default;
117 size_t size() const { return store.size(); }
119 req_storage_t& get_store() { return store; }
121 void get_requests(std::vector<MPI_Request>& vec) const
123 for (auto const& [_, reqs] : store) {
124 aid_t my_proc_id = simgrid::s4u::this_actor::get_pid();
125 for (const auto& req : reqs) {
126 if (req != MPI_REQUEST_NULL && (req->src() == my_proc_id || req->dst() == my_proc_id)) {
128 req->print_request("MM");
134 MPI_Request pop(int src, int dst, int tag)
136 auto it = store.find(req_key_t(src, dst, tag));
137 if (it == store.end())
138 return MPI_REQUEST_NULL;
139 MPI_Request req = it->second.front();
140 it->second.pop_front();
141 if(it->second.empty())
142 store.erase(req_key_t(src, dst, tag));
146 void add(MPI_Request req)
148 if (req != MPI_REQUEST_NULL){ // Can and does happen in the case of TestAction
149 store[req_key_t(req->src()-1, req->dst()-1, req->tag())].push_back(req);
153 /* Sometimes we need to re-insert MPI_REQUEST_NULL but we still need src,dst and tag */
154 void addNullRequest(int src, int dst, int tag)
156 int src_pid = MPI_COMM_WORLD->group()->actor(src) - 1;
157 int dest_pid = MPI_COMM_WORLD->group()->actor(dst) - 1;
158 store[req_key_t(src_pid, dest_pid, tag)].push_back(MPI_REQUEST_NULL);
162 void WaitTestParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
164 CHECK_ACTION_PARAMS(action, 3, 0)
165 src = std::stoi(action[2]);
166 dst = std::stoi(action[3]);
167 tag = std::stoi(action[4]);
170 void SendOrRecvParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
172 CHECK_ACTION_PARAMS(action, 3, 1)
173 partner = std::stoi(action[2]);
174 tag = std::stoi(action[3]);
175 size = parse_integer<ssize_t>(action[4]);
176 datatype1 = parse_datatype(action, 5);
179 void ComputeParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
181 CHECK_ACTION_PARAMS(action, 1, 0)
182 flops = parse_double(action[2]);
185 void SleepParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
187 CHECK_ACTION_PARAMS(action, 1, 0)
188 time = parse_double(action[2]);
191 void LocationParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
193 CHECK_ACTION_PARAMS(action, 2, 0)
194 filename = action[2];
195 line = std::stoi(action[3]);
198 void SendRecvParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
200 CHECK_ACTION_PARAMS(action, 6, 0)
201 sendcount = parse_integer<int>(action[2]);
202 dst = std::stoi(action[3]);
203 recvcount = parse_integer<int>(action[4]);
204 src = std::stoi(action[5]);
205 datatype1 = parse_datatype(action, 6);
206 datatype2 = parse_datatype(action, 7);
209 void BcastArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
211 CHECK_ACTION_PARAMS(action, 1, 2)
212 size = parse_integer<size_t>(action[2]);
213 root = parse_root(action, 3);
214 datatype1 = parse_datatype(action, 4);
217 void ReduceArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
219 CHECK_ACTION_PARAMS(action, 2, 2)
220 comm_size = parse_integer<unsigned>(action[2]);
221 comp_size = parse_double(action[3]);
222 root = parse_root(action, 4);
223 datatype1 = parse_datatype(action, 5);
226 void AllReduceArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
228 CHECK_ACTION_PARAMS(action, 2, 1)
229 comm_size = parse_integer<unsigned>(action[2]);
230 comp_size = parse_double(action[3]);
231 datatype1 = parse_datatype(action, 4);
234 void AllToAllArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
236 CHECK_ACTION_PARAMS(action, 2, 1)
237 comm_size = MPI_COMM_WORLD->size();
238 send_size = parse_integer<int>(action[2]);
239 recv_size = parse_integer<int>(action[3]);
240 datatype1 = parse_datatype(action, 4);
241 datatype2 = parse_datatype(action, 5);
244 void GatherArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string& name)
246 /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
249 1) 68 is the sendcounts
250 2) 68 is the recvcounts
251 3) 0 is the root node
252 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
253 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
255 CHECK_ACTION_PARAMS(action, 2, 3)
256 comm_size = MPI_COMM_WORLD->size();
257 send_size = parse_integer<int>(action[2]);
258 recv_size = parse_integer<int>(action[3]);
260 if (name == "gather") {
261 root = parse_root(action, 4);
262 datatype1 = parse_datatype(action, 5);
263 datatype2 = parse_datatype(action, 6);
266 datatype1 = parse_datatype(action, 4);
267 datatype2 = parse_datatype(action, 5);
271 void GatherVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string& name)
273 /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
274 0 gather 68 68 10 10 10 0 0 0
276 1) 68 is the sendcount
277 2) 68 10 10 10 is the recvcounts
278 3) 0 is the root node
279 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
280 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
282 comm_size = MPI_COMM_WORLD->size();
283 CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
284 send_size = parse_integer<int>(action[2]);
285 disps = std::vector<int>(comm_size, 0);
286 recvcounts = std::make_shared<std::vector<int>>(comm_size);
288 if (name == "gatherv") {
289 root = parse_root(action, 3 + comm_size);
290 datatype1 = parse_datatype(action, 4 + comm_size);
291 datatype2 = parse_datatype(action, 5 + comm_size);
294 unsigned disp_index = 0;
295 /* The 3 comes from "0 gather <sendcount>", which must always be present.
296 * The + comm_size is the recvcounts array, which must also be present
298 if (action.size() > 3 + comm_size + comm_size) {
299 // datatype + disp are specified
300 datatype1 = parse_datatype(action, 3 + comm_size);
301 datatype2 = parse_datatype(action, 4 + comm_size);
302 disp_index = 5 + comm_size;
303 } else if (action.size() > 3 + comm_size + 2) {
304 // disps specified; datatype is not specified; use the default one
305 datatype1 = MPI_DEFAULT_TYPE;
306 datatype2 = MPI_DEFAULT_TYPE;
307 disp_index = 3 + comm_size;
309 // no disp specified, maybe only datatype,
310 datatype1 = parse_datatype(action, 3 + comm_size);
311 datatype2 = parse_datatype(action, 4 + comm_size);
314 if (disp_index != 0) {
315 xbt_assert(disp_index + comm_size <= action.size());
316 for (unsigned i = 0; i < comm_size; i++)
317 disps[i] = std::stoi(action[disp_index + i]);
321 for (unsigned int i = 0; i < comm_size; i++) {
322 (*recvcounts)[i] = std::stoi(action[i + 3]);
324 recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
327 void ScatterArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
329 /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
332 1) 68 is the sendcounts
333 2) 68 is the recvcounts
334 3) 0 is the root node
335 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
336 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
338 comm_size = MPI_COMM_WORLD->size();
339 CHECK_ACTION_PARAMS(action, 2, 3)
340 comm_size = MPI_COMM_WORLD->size();
341 send_size = parse_integer<int>(action[2]);
342 recv_size = parse_integer<int>(action[3]);
343 root = parse_root(action, 4);
344 datatype1 = parse_datatype(action, 5);
345 datatype2 = parse_datatype(action, 6);
348 void ScatterVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
350 /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
351 0 gather 68 10 10 10 68 0 0 0
353 1) 68 10 10 10 is the sendcounts
354 2) 68 is the recvcount
355 3) 0 is the root node
356 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
357 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
359 comm_size = MPI_COMM_WORLD->size();
360 CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
361 recv_size = parse_integer<int>(action[2 + comm_size]);
362 disps = std::vector<int>(comm_size, 0);
363 sendcounts = std::make_shared<std::vector<int>>(comm_size);
365 root = parse_root(action, 3 + comm_size);
366 datatype1 = parse_datatype(action, 4 + comm_size);
367 datatype2 = parse_datatype(action, 5 + comm_size);
369 for (unsigned int i = 0; i < comm_size; i++) {
370 (*sendcounts)[i] = std::stoi(action[i + 2]);
372 send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
375 void ReduceScatterArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
377 /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
378 0 reducescatter 275427 275427 275427 204020 11346849 0
380 1) The first four values after the name of the action declare the recvcounts array
381 2) The value 11346849 is the amount of instructions
382 3) The last value corresponds to the datatype, see simgrid::smpi::Datatype::decode().
384 comm_size = MPI_COMM_WORLD->size();
385 CHECK_ACTION_PARAMS(action, comm_size + 1, 1)
386 comp_size = parse_double(action[2 + comm_size]);
387 recvcounts = std::make_shared<std::vector<int>>(comm_size);
388 datatype1 = parse_datatype(action, 3 + comm_size);
390 for (unsigned int i = 0; i < comm_size; i++) {
391 (*recvcounts)[i]= std::stoi(action[i + 2]);
393 recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
396 void ScanArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
398 CHECK_ACTION_PARAMS(action, 2, 1)
399 size = parse_integer<size_t>(action[2]);
400 comp_size = parse_double(action[3]);
401 datatype1 = parse_datatype(action, 4);
404 void AllToAllVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
406 /* The structure of the alltoallv action for the rank 0 (total 4 processes) is the following:
407 0 alltoallv 100 1 7 10 12 100 1 70 10 5
409 1) 100 is the size of the send buffer *sizeof(int),
410 2) 1 7 10 12 is the sendcounts array
411 3) 100*sizeof(int) is the size of the receiver buffer
412 4) 1 70 10 5 is the recvcounts array
414 comm_size = MPI_COMM_WORLD->size();
415 CHECK_ACTION_PARAMS(action, 2 * comm_size + 2, 2)
416 sendcounts = std::make_shared<std::vector<int>>(comm_size);
417 recvcounts = std::make_shared<std::vector<int>>(comm_size);
418 senddisps = std::vector<int>(comm_size, 0);
419 recvdisps = std::vector<int>(comm_size, 0);
421 datatype1 = parse_datatype(action, 4 + 2 * comm_size);
422 datatype2 = parse_datatype(action, 5 + 2 * comm_size);
424 send_buf_size = parse_integer<int>(action[2]);
425 recv_buf_size = parse_integer<int>(action[3 + comm_size]);
426 for (unsigned int i = 0; i < comm_size; i++) {
427 (*sendcounts)[i] = std::stoi(action[3 + i]);
428 (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
430 send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
431 recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
// Replays a "wait" action: pops the oldest pending request stored under (src, dst, tag) and waits for it.
// Aborts through xbt_assert when the request storage is completely empty.
434 void WaitAction::kernel(simgrid::xbt::ReplayAction& action)
// The joined action text is only used in the assertion message.
436 std::string s = boost::algorithm::join(action, " ");
437 xbt_assert(req_storage.size(), "action wait not preceded by any irecv or isend: %s", s.c_str());
438 const WaitTestParser& args = get_args();
439 MPI_Request request = req_storage.pop(args.src, args.dst, args.tag);
441 if (request == MPI_REQUEST_NULL) {
442 /* Assume that the trace is well formed, meaning the comm might have been caught by an MPI_test. Then just
447 // Must be taken before Request::wait() since the request may be set to
448 // MPI_REQUEST_NULL by Request::wait!
449 bool is_wait_for_receive = (request->flags() & MPI_REQ_RECV);
451 TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::WaitTIData("wait", args.src, args.dst, args.tag));
// A request still non-null after the wait is released here.
454 Request::wait(&request, &status);
455 if(request!=MPI_REQUEST_NULL)
456 Request::unref(&request);
457 TRACE_smpi_comm_out(get_pid());
// Only waits on receives emit a TRACE_smpi_recv event (args.src/args.dst are translated to actor pids).
458 if (is_wait_for_receive)
459 TRACE_smpi_recv(MPI_COMM_WORLD->group()->actor(args.src), MPI_COMM_WORLD->group()->actor(args.dst), args.tag);
// Replays a "send" (blocking, Request::send) or "isend" (non-blocking, Request::isend) action towards rank
// args.partner of MPI_COMM_WORLD, without a payload buffer (nullptr). isend requests are kept in req_storage so a
// later wait/test/waitall can complete them. Other action names presumably reach the xbt_die below.
462 void SendAction::kernel(simgrid::xbt::ReplayAction&)
464 const SendOrRecvParser& args = get_args();
465 aid_t dst_traced = MPI_COMM_WORLD->group()->actor(args.partner);
469 new simgrid::instr::Pt2PtTIData(get_name(), args.partner, args.size, args.tag, Datatype::encode(args.datatype1)));
// The send event (size = count * datatype size) is traced here unless internals are traced (TRACE_smpi_view_internals).
470 if (not TRACE_smpi_view_internals())
471 TRACE_smpi_send(get_pid(), get_pid(), dst_traced, args.tag, args.size * args.datatype1->size());
473 if (get_name() == "send") {
474 Request::send(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
475 } else if (get_name() == "isend") {
476 MPI_Request request = Request::isend(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
477 req_storage.add(request);
479 xbt_die("Don't know this action, %s", get_name().c_str());
482 TRACE_smpi_comm_out(get_pid());
// Replays a "recv" (blocking, Request::recv) or "irecv" (non-blocking, Request::irecv) action from rank
// args.partner of MPI_COMM_WORLD, without a payload buffer (nullptr). irecv requests are kept in req_storage
// so a later wait/test/waitall can complete them.
485 void RecvAction::kernel(simgrid::xbt::ReplayAction&)
487 const SendOrRecvParser& args = get_args();
490 new simgrid::instr::Pt2PtTIData(get_name(), args.partner, args.size, args.tag, Datatype::encode(args.datatype1)));
// NOTE(review): the condition guarding the probe is not visible here; presumably it applies only when the
// traced size is unknown, in which case the matching message's status.count is used as the size.
493 // unknown size from the receiver point of view
494 ssize_t arg_size = args.size;
496 Request::probe(args.partner, args.tag, MPI_COMM_WORLD, &status);
497 arg_size = status.count;
500 bool is_recv = false; // Help analyzers understanding that status is not used uninitialized
501 if (get_name() == "recv") {
503 Request::recv(nullptr, arg_size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD, &status);
504 } else if (get_name() == "irecv") {
505 MPI_Request request = Request::irecv(nullptr, arg_size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
506 req_storage.add(request);
511 TRACE_smpi_comm_out(get_pid());
// The receive event is traced (from status.MPI_SOURCE) only when is_recv is set and internals are not traced.
512 if (is_recv && not TRACE_smpi_view_internals()) {
513 aid_t src_traced = MPI_COMM_WORLD->group()->actor(status.MPI_SOURCE);
514 TRACE_smpi_recv(src_traced, get_pid(), args.tag);
// Replays a "sendRecv" action: sends args.sendcount elements to rank args.dst and receives args.recvcount
// elements from rank args.src with a single Request::sendrecv call on MPI_COMM_WORLD, without payload buffers.
518 void SendRecvAction::kernel(simgrid::xbt::ReplayAction&)
520 XBT_DEBUG("Enters SendRecv");
521 const SendRecvParser& args = get_args();
522 aid_t my_proc_id = simgrid::s4u::this_actor::get_pid();
523 aid_t src_traced = MPI_COMM_WORLD->group()->actor(args.src);
524 aid_t dst_traced = MPI_COMM_WORLD->group()->actor(args.dst);
// VarCollTIData takes vector arguments (per-process counts in the collective actions); here each vector just
// carries the single partner pid.
530 // FIXME: Hack the way to trace this one
531 auto dst_hack = std::make_shared<std::vector<int>>();
532 auto src_hack = std::make_shared<std::vector<int>>();
533 dst_hack->push_back(dst_traced);
534 src_hack->push_back(src_traced);
535 TRACE_smpi_comm_in(my_proc_id, __func__,
536 new simgrid::instr::VarCollTIData(
537 "sendRecv", -1, args.sendcount,
538 dst_hack, args.recvcount, src_hack,
539 simgrid::smpi::Datatype::encode(args.datatype1), simgrid::smpi::Datatype::encode(args.datatype2)));
541 TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, sendtag, args.sendcount * args.datatype1->size());
543 simgrid::smpi::Request::sendrecv(nullptr, args.sendcount, args.datatype1, args.dst, sendtag, nullptr, args.recvcount, args.datatype2, args.src,
544 recvtag, MPI_COMM_WORLD, &status);
546 TRACE_smpi_recv(src_traced, my_proc_id, recvtag);
547 TRACE_smpi_comm_out(my_proc_id);
548 XBT_DEBUG("Exits SendRecv");
551 void ComputeAction::kernel(simgrid::xbt::ReplayAction&)
553 const ComputeParser& args = get_args();
554 if (smpi_cfg_simulate_computation()) {
555 smpi_execute_flops(args.flops/smpi_adjust_comp_speed());
559 void SleepAction::kernel(simgrid::xbt::ReplayAction&)
561 const SleepParser& args = get_args();
562 XBT_DEBUG("Sleep for: %lf secs", args.time);
563 aid_t pid = simgrid::s4u::this_actor::get_pid();
564 TRACE_smpi_sleeping_in(pid, args.time);
565 simgrid::s4u::this_actor::sleep_for(args.time/smpi_adjust_comp_speed());
566 TRACE_smpi_sleeping_out(pid);
569 void LocationAction::kernel(simgrid::xbt::ReplayAction&)
571 const LocationParser& args = get_args();
572 smpi_trace_set_call_location(args.filename.c_str(), args.line, "replay_action");
575 void TestAction::kernel(simgrid::xbt::ReplayAction&)
577 const WaitTestParser& args = get_args();
578 MPI_Request request = req_storage.pop(args.src, args.dst, args.tag);
579 // if request is null here, this may mean that a previous test has succeeded
580 // Different times in traced application and replayed version may lead to this
581 // In this case, ignore the extra calls.
582 if (request != MPI_REQUEST_NULL) {
583 TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::WaitTIData("test", args.src, args.dst, args.tag));
587 Request::test(&request, &status, &flag);
589 XBT_DEBUG("MPI_Test result: %d", flag);
590 /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now
592 if (request == MPI_REQUEST_NULL)
593 req_storage.addNullRequest(args.src, args.dst, args.tag);
595 req_storage.add(request);
597 TRACE_smpi_comm_out(get_pid());
601 void InitAction::kernel(simgrid::xbt::ReplayAction& action)
603 CHECK_ACTION_PARAMS(action, 0, 1)
604 MPI_DEFAULT_TYPE = (action.size() > 2) ? MPI_DOUBLE // default MPE datatype
605 : MPI_BYTE; // default TAU datatype
607 /* start a simulated timer */
608 smpi_process()->simulated_start();
// Handler shared by the "comm_size", "comm_split" and "comm_dup" actions (see the registrations in smpi_replay_init).
611 void CommunicatorAction::kernel(simgrid::xbt::ReplayAction&)
616 void WaitAllAction::kernel(simgrid::xbt::ReplayAction&)
618 if (req_storage.size() > 0) {
619 std::vector<std::pair</*sender*/ aid_t, /*recv*/ aid_t>> sender_receiver;
620 std::vector<MPI_Request> reqs;
621 req_storage.get_requests(reqs);
622 unsigned long count_requests = reqs.size();
623 TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::CpuTIData("waitall", count_requests));
624 for (auto const& req : reqs) {
625 if (req && (req->flags() & MPI_REQ_RECV)) {
626 sender_receiver.emplace_back(req->src(), req->dst());
629 Request::waitall(count_requests, &(reqs.data())[0], MPI_STATUSES_IGNORE);
630 req_storage.get_store().clear();
632 for (MPI_Request& req : reqs)
633 if (req != MPI_REQUEST_NULL)
634 Request::unref(&req);
636 for (auto const& [src, dst] : sender_receiver) {
637 TRACE_smpi_recv(src, dst, 0);
639 TRACE_smpi_comm_out(get_pid());
643 void BarrierAction::kernel(simgrid::xbt::ReplayAction&)
645 TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::NoOpTIData("barrier"));
646 colls::barrier(MPI_COMM_WORLD);
647 TRACE_smpi_comm_out(get_pid());
650 void BcastAction::kernel(simgrid::xbt::ReplayAction&)
652 const BcastArgParser& args = get_args();
653 TRACE_smpi_comm_in(get_pid(), "action_bcast",
654 new simgrid::instr::CollTIData("bcast", args.root, -1.0, args.size,
655 0, Datatype::encode(args.datatype1), ""));
657 colls::bcast(send_buffer(args.size * args.datatype1->size()), args.size, args.datatype1, args.root, MPI_COMM_WORLD);
659 TRACE_smpi_comm_out(get_pid());
// Replays a "reduce" action: args.comm_size holds the element count (parsed by ReduceArgParser).
662 void ReduceAction::kernel(simgrid::xbt::ReplayAction&)
664 const ReduceArgParser& args = get_args();
665 TRACE_smpi_comm_in(get_pid(), "action_reduce",
666 new simgrid::instr::CollTIData("reduce", args.root, args.comp_size,
667 args.comm_size, 0, Datatype::encode(args.datatype1), ""));
// MPI_OP_NULL is passed, so no actual reduction operator is applied to the (replay-only) buffers.
669 colls::reduce(send_buffer(args.comm_size * args.datatype1->size()),
670 recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL,
671 args.root, MPI_COMM_WORLD);
// The traced computation cost (when non-zero) is modeled by an exec activity named "computation".
672 if (args.comp_size != 0.0)
673 simgrid::s4u::this_actor::exec_init(args.comp_size)
674 ->set_name("computation")
678 TRACE_smpi_comm_out(get_pid());
// Replays an "allreduce" action: args.comm_size holds the element count (parsed by AllReduceArgParser).
681 void AllReduceAction::kernel(simgrid::xbt::ReplayAction&)
683 const AllReduceArgParser& args = get_args();
684 TRACE_smpi_comm_in(get_pid(), "action_allreduce",
685 new simgrid::instr::CollTIData("allreduce", -1, args.comp_size, args.comm_size, 0,
686 Datatype::encode(args.datatype1), ""));
// MPI_OP_NULL is passed, so no actual reduction operator is applied to the (replay-only) buffers.
688 colls::allreduce(send_buffer(args.comm_size * args.datatype1->size()),
689 recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL,
// The traced computation cost (when non-zero) is modeled by an exec activity named "computation".
691 if (args.comp_size != 0.0)
692 simgrid::s4u::this_actor::exec_init(args.comp_size)
693 ->set_name("computation")
697 TRACE_smpi_comm_out(get_pid());
700 void AllToAllAction::kernel(simgrid::xbt::ReplayAction&)
702 const AllToAllArgParser& args = get_args();
703 TRACE_smpi_comm_in(get_pid(), "action_alltoall",
704 new simgrid::instr::CollTIData("alltoall", -1, -1.0, args.send_size, args.recv_size,
705 Datatype::encode(args.datatype1),
706 Datatype::encode(args.datatype2)));
708 colls::alltoall(send_buffer(args.datatype1->size() * args.send_size * args.comm_size), args.send_size, args.datatype1,
709 recv_buffer(args.datatype2->size() * args.recv_size * args.comm_size), args.recv_size, args.datatype2,
712 TRACE_smpi_comm_out(get_pid());
715 void GatherAction::kernel(simgrid::xbt::ReplayAction&)
717 const GatherArgParser& args = get_args();
718 TRACE_smpi_comm_in(get_pid(), get_name().c_str(),
719 new simgrid::instr::CollTIData(get_name(), (get_name() == "gather") ? args.root : -1, -1.0,
720 args.send_size, args.recv_size, Datatype::encode(args.datatype1),
721 Datatype::encode(args.datatype2)));
723 if (get_name() == "gather") {
724 int rank = MPI_COMM_WORLD->rank();
725 colls::gather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
726 (rank == args.root) ? recv_buffer(args.datatype2->size() * args.recv_size * args.comm_size) : nullptr,
727 args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
729 colls::allgather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
730 recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2,
733 TRACE_smpi_comm_out(get_pid());
736 void GatherVAction::kernel(simgrid::xbt::ReplayAction&)
738 int rank = MPI_COMM_WORLD->rank();
739 const GatherVArgParser& args = get_args();
740 TRACE_smpi_comm_in(get_pid(), get_name().c_str(),
741 new simgrid::instr::VarCollTIData(
742 get_name(), (get_name() == "gatherv") ? args.root : -1, args.send_size, nullptr, -1,
743 args.recvcounts, Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
745 if (get_name() == "gatherv") {
746 colls::gatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
747 (rank == args.root) ? recv_buffer(args.recv_size_sum * args.datatype2->size()) : nullptr,
748 args.recvcounts->data(), args.disps.data(), args.datatype2, args.root, MPI_COMM_WORLD);
750 colls::allgatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
751 recv_buffer(args.recv_size_sum * args.datatype2->size()), args.recvcounts->data(),
752 args.disps.data(), args.datatype2, MPI_COMM_WORLD);
755 TRACE_smpi_comm_out(get_pid());
758 void ScatterAction::kernel(simgrid::xbt::ReplayAction&)
760 int rank = MPI_COMM_WORLD->rank();
761 const ScatterArgParser& args = get_args();
762 TRACE_smpi_comm_in(get_pid(), "action_scatter",
763 new simgrid::instr::CollTIData(get_name(), args.root, -1.0, args.send_size, args.recv_size,
764 Datatype::encode(args.datatype1),
765 Datatype::encode(args.datatype2)));
767 colls::scatter(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
768 (rank == args.root) ? recv_buffer(args.recv_size * args.datatype2->size()) : nullptr, args.recv_size,
769 args.datatype2, args.root, MPI_COMM_WORLD);
771 TRACE_smpi_comm_out(get_pid());
774 void ScatterVAction::kernel(simgrid::xbt::ReplayAction&)
776 int rank = MPI_COMM_WORLD->rank();
777 const ScatterVArgParser& args = get_args();
778 TRACE_smpi_comm_in(get_pid(), "action_scatterv",
779 new simgrid::instr::VarCollTIData(get_name(), args.root, -1, args.sendcounts, args.recv_size,
780 nullptr, Datatype::encode(args.datatype1),
781 Datatype::encode(args.datatype2)));
783 colls::scatterv((rank == args.root) ? send_buffer(args.send_size_sum * args.datatype1->size()) : nullptr,
784 args.sendcounts->data(), args.disps.data(), args.datatype1,
785 recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, args.root,
788 TRACE_smpi_comm_out(get_pid());
// Replays a "reducescatter" action over MPI_COMM_WORLD; both buffers are sized for the sum of all recvcounts.
791 void ReduceScatterAction::kernel(simgrid::xbt::ReplayAction&)
793 const ReduceScatterArgParser& args = get_args();
795 get_pid(), "action_reducescatter",
796 new simgrid::instr::VarCollTIData(get_name(), -1, -1, nullptr, -1, args.recvcounts,
797 /* ugly as we use datatype field to pass computation as string */
798 /* and because of the trick to avoid getting 0.000000 when 0 is given */
799 args.comp_size == 0 ? "0" : std::to_string(args.comp_size),
800 Datatype::encode(args.datatype1)));
// MPI_OP_NULL is passed, so no actual reduction operator is applied to the (replay-only) buffers.
802 colls::reduce_scatter(send_buffer(args.recv_size_sum * args.datatype1->size()),
803 recv_buffer(args.recv_size_sum * args.datatype1->size()), args.recvcounts->data(),
804 args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
// The traced computation cost (when non-zero) is modeled by an exec activity named "computation".
805 if (args.comp_size != 0.0)
806 simgrid::s4u::this_actor::exec_init(args.comp_size)
807 ->set_name("computation")
810 TRACE_smpi_comm_out(get_pid());
// Replays a scan-like action of args.size elements over MPI_COMM_WORLD: colls::scan for the "scan" action name,
// colls::exscan otherwise (presumably for an "exscan" action; the branch keyword is not visible here).
813 void ScanAction::kernel(simgrid::xbt::ReplayAction&)
815 const ScanArgParser& args = get_args();
816 TRACE_smpi_comm_in(get_pid(), "action_scan",
817 new simgrid::instr::CollTIData(get_name(), -1, args.comp_size,
818 args.size, 0, Datatype::encode(args.datatype1), ""));
819 if (get_name() == "scan")
820 colls::scan(send_buffer(args.size * args.datatype1->size()),
821 recv_buffer(args.size * args.datatype1->size()), args.size,
822 args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
824 colls::exscan(send_buffer(args.size * args.datatype1->size()),
825 recv_buffer(args.size * args.datatype1->size()), args.size,
826 args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
// The traced computation cost (when non-zero) is modeled by an exec activity named "computation".
828 if (args.comp_size != 0.0)
829 simgrid::s4u::this_actor::exec_init(args.comp_size)
830 ->set_name("computation")
833 TRACE_smpi_comm_out(get_pid());
836 void AllToAllVAction::kernel(simgrid::xbt::ReplayAction&)
838 const AllToAllVArgParser& args = get_args();
839 TRACE_smpi_comm_in(get_pid(), __func__,
840 new simgrid::instr::VarCollTIData(
841 "alltoallv", -1, args.send_size_sum, args.sendcounts, args.recv_size_sum, args.recvcounts,
842 Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
844 colls::alltoallv(send_buffer(args.send_buf_size * args.datatype1->size()), args.sendcounts->data(),
845 args.senddisps.data(), args.datatype1, recv_buffer(args.recv_buf_size * args.datatype2->size()),
846 args.recvcounts->data(), args.recvdisps.data(), args.datatype2, MPI_COMM_WORLD);
848 TRACE_smpi_comm_out(get_pid());
850 } // namespace simgrid::smpi::replay
// One RequestStorage per actor, keyed by pid. The handlers registered in smpi_replay_init (send, isend, recv,
// irecv, test, wait, waitall) use the caller's entry via storage[simgrid::s4u::this_actor::get_pid()].
852 static std::unordered_map<aid_t, simgrid::smpi::replay::RequestStorage> storage;
853 /** @brief Only initialize the replay, don't do it for real */
854 void smpi_replay_init(const char* instance_id, int rank, double start_delay_flops)
856 xbt_assert(not smpi_process()->initializing());
858 simgrid::s4u::Actor::self()->set_property("instance_id", instance_id);
859 simgrid::s4u::Actor::self()->set_property("rank", std::to_string(rank));
860 simgrid::smpi::ActorExt::init();
862 smpi_process()->mark_as_initialized();
863 smpi_process()->set_replaying(true);
865 TRACE_smpi_init(simgrid::s4u::this_actor::get_pid(), "smpi_replay_run_init");
866 xbt_replay_action_register("init", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::InitAction().execute(action); });
867 xbt_replay_action_register("finalize", [](simgrid::xbt::ReplayAction const&) { /* nothing to do */ });
868 xbt_replay_action_register("comm_size", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
869 xbt_replay_action_register("comm_split",[](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
870 xbt_replay_action_register("comm_dup", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
871 xbt_replay_action_register("send", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("send", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
872 xbt_replay_action_register("isend", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("isend", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
873 xbt_replay_action_register("recv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("recv", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
874 xbt_replay_action_register("irecv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("irecv", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
875 xbt_replay_action_register("test", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::TestAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
876 xbt_replay_action_register("sendRecv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendRecvAction().execute(action); });
877 xbt_replay_action_register("wait", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
878 xbt_replay_action_register("waitall", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAllAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
879 xbt_replay_action_register("barrier", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BarrierAction().execute(action); });
880 xbt_replay_action_register("bcast", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BcastAction().execute(action); });
881 xbt_replay_action_register("reduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceAction().execute(action); });
882 xbt_replay_action_register("allreduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllReduceAction().execute(action); });
883 xbt_replay_action_register("alltoall", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllAction().execute(action); });
884 xbt_replay_action_register("alltoallv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllVAction().execute(action); });
885 xbt_replay_action_register("gather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("gather").execute(action); });
886 xbt_replay_action_register("scatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterAction().execute(action); });
887 xbt_replay_action_register("gatherv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("gatherv").execute(action); });
888 xbt_replay_action_register("scatterv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterVAction().execute(action); });
889 xbt_replay_action_register("allgather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("allgather").execute(action); });
890 xbt_replay_action_register("allgatherv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("allgatherv").execute(action); });
891 xbt_replay_action_register("reducescatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceScatterAction().execute(action); });
892 xbt_replay_action_register("scan", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScanAction("scan").execute(action); });
893 xbt_replay_action_register("exscan", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScanAction("exscan").execute(action); });
894 xbt_replay_action_register("compute", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ComputeAction().execute(action); });
895 xbt_replay_action_register("sleep", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SleepAction().execute(action); });
896 xbt_replay_action_register("location", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::LocationAction().execute(action); });
898 //if we have a delayed start, sleep here.
899 if (start_delay_flops > 0) {
900 XBT_VERB("Delayed start for instance - Sleeping for %f flops ", start_delay_flops);
901 private_execute_flops(start_delay_flops);
903 // Wait for the other actors to initialize also
904 simgrid::s4u::this_actor::yield();
906 if(_smpi_init_sleep > 0)
907 simgrid::s4u::this_actor::sleep_for(_smpi_init_sleep);
910 /** @brief actually run the replay after initialization */
911 void smpi_replay_main(int rank, const char* private_trace_filename)
913 static int active_processes = 0;
915 storage[simgrid::s4u::this_actor::get_pid()] = simgrid::smpi::replay::RequestStorage();
916 std::string rank_string = std::to_string(rank);
917 simgrid::xbt::replay_runner(rank_string.c_str(), private_trace_filename);
919 /* and now, finalize everything */
920 /* One active process will stop. Decrease the counter*/
921 unsigned int count_requests = storage[simgrid::s4u::this_actor::get_pid()].size();
922 XBT_DEBUG("There are %u elements in reqq[*]", count_requests);
923 if (count_requests > 0) {
924 std::vector<MPI_Request> requests(count_requests);
927 for (auto const& [_, reqs] : storage[simgrid::s4u::this_actor::get_pid()].get_store()) {
928 for (const auto& req : reqs) {
929 requests[i] = req; // FIXME: overwritten at each iteration?
933 simgrid::smpi::Request::waitall(count_requests, requests.data(), MPI_STATUSES_IGNORE);
936 if (simgrid::config::get_value<bool>("smpi/barrier-finalization"))
937 simgrid::smpi::colls::barrier(MPI_COMM_WORLD);
941 if(active_processes==0){
942 /* Last process alive speaking: end the simulated timer */
943 XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
944 smpi_free_replay_tmp_buffers();
947 TRACE_smpi_comm_in(simgrid::s4u::this_actor::get_pid(), "smpi_replay_run_finalize",
948 new simgrid::instr::NoOpTIData("finalize"));
950 smpi_process()->finalize();
952 TRACE_smpi_comm_out(simgrid::s4u::this_actor::get_pid());
955 /** @brief chain a replay initialization and a replay start */
956 void smpi_replay_run(const char* instance_id, int rank, double start_delay_flops, const char* private_trace_filename)
958 smpi_replay_init(instance_id, rank, start_delay_flops);
959 smpi_replay_main(rank, private_trace_filename);