1 /* Copyright (c) 2009-2022. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
6 #include "smpi_replay.hpp"
7 #include "simgrid/s4u/Exec.hpp"
8 #include "smpi_coll.hpp"
9 #include "smpi_comm.hpp"
10 #include "smpi_config.hpp"
11 #include "smpi_datatype.hpp"
12 #include "smpi_group.hpp"
13 #include "smpi_request.hpp"
14 #include "src/smpi/include/private.hpp"
15 #include "xbt/replay.hpp"
23 #include <unordered_map>
// Default log category of this file ("smpi_replay", a child of "smpi"); log_timed_action() tests it at verbose level.
XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay, smpi, "Trace Replay with SMPI");
27 // From https://stackoverflow.com/questions/7110301/generic-hash-for-tuples-in-unordered-map-unordered-set
28 // This is all just to make std::unordered_map work with std::tuple. If we need this in other places,
29 // this could go into a header file.
30 namespace hash_tuple {
31 template <typename TT> class hash {
33 size_t operator()(TT const& tt) const { return std::hash<TT>()(tt); }
36 template <class T> inline void hash_combine(std::size_t& seed, T const& v)
38 seed ^= hash_tuple::hash<T>()(v) + 0x9e3779b9 + (seed << 6) + (seed >> 2);
41 // Recursive template code derived from Matthieu M.
42 template <class Tuple, size_t Index = std::tuple_size<Tuple>::value - 1> class HashValueImpl {
44 static void apply(size_t& seed, Tuple const& tuple)
46 HashValueImpl<Tuple, Index - 1>::apply(seed, tuple);
47 hash_combine(seed, std::get<Index>(tuple));
51 template <class Tuple> class HashValueImpl<Tuple, 0> {
53 static void apply(size_t& seed, Tuple const& tuple) { hash_combine(seed, std::get<0>(tuple)); }
56 template <typename... TT> class hash<std::tuple<TT...>> {
58 size_t operator()(std::tuple<TT...> const& tt) const
61 HashValueImpl<std::tuple<TT...>>::apply(seed, tt);
67 void log_timed_action(const simgrid::xbt::ReplayAction& action, double clock)
69 if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
70 std::string s = boost::algorithm::join(action, " ");
71 XBT_VERB("%s %f", s.c_str(), smpi_process()->simulated_elapsed() - clock);
75 /* Helper functions */
76 static double parse_double(const std::string& string)
78 return xbt_str_parse_double(string.c_str(), "not a double");
81 template <typename T> static T parse_integer(const std::string& string)
83 double val = trunc(xbt_str_parse_double(string.c_str(), "not a double"));
84 xbt_assert(static_cast<double>(std::numeric_limits<T>::min()) <= val &&
85 val <= static_cast<double>(std::numeric_limits<T>::max()),
86 "out of range: %g", val);
87 return static_cast<T>(val);
90 static int parse_root(const simgrid::xbt::ReplayAction& action, unsigned i)
92 return i < action.size() ? std::stoi(action[i]) : 0;
95 static MPI_Datatype parse_datatype(const simgrid::xbt::ReplayAction& action, unsigned i)
97 return i < action.size() ? simgrid::smpi::Datatype::decode(action[i]) : simgrid::smpi::replay::MPI_DEFAULT_TYPE;
100 namespace simgrid::smpi::replay {
// Datatype used when a trace line omits its datatype field (see parse_datatype()).
// InitAction::kernel() sets it to MPI_DOUBLE or MPI_BYTE depending on the length of the "init" line.
MPI_Datatype MPI_DEFAULT_TYPE;
103 class RequestStorage {
105 using req_key_t = std::tuple</*sender*/ int, /* receiver */ int, /* tag */ int>;
106 using req_storage_t = std::unordered_map<req_key_t, std::list<MPI_Request>, hash_tuple::hash<std::tuple<int, int, int>>>;
111 RequestStorage() = default;
112 size_t size() const { return store.size(); }
114 req_storage_t& get_store() { return store; }
116 void get_requests(std::vector<MPI_Request>& vec) const
118 for (auto const& [_, reqs] : store) {
119 aid_t my_proc_id = simgrid::s4u::this_actor::get_pid();
120 for (auto& req: reqs){
121 if (req != MPI_REQUEST_NULL && (req->src() == my_proc_id || req->dst() == my_proc_id)) {
123 req->print_request("MM");
129 MPI_Request pop(int src, int dst, int tag)
131 auto it = store.find(req_key_t(src, dst, tag));
132 if (it == store.end())
133 return MPI_REQUEST_NULL;
134 MPI_Request req = it->second.front();
135 it->second.pop_front();
136 if(it->second.empty())
137 store.erase(req_key_t(src, dst, tag));
141 void add(MPI_Request req)
143 if (req != MPI_REQUEST_NULL){ // Can and does happen in the case of TestAction
144 store[req_key_t(req->src()-1, req->dst()-1, req->tag())].push_back(req);
148 /* Sometimes we need to re-insert MPI_REQUEST_NULL but we still need src,dst and tag */
149 void addNullRequest(int src, int dst, int tag)
151 int src_pid = MPI_COMM_WORLD->group()->actor(src) - 1;
152 int dest_pid = MPI_COMM_WORLD->group()->actor(dst) - 1;
153 store[req_key_t(src_pid, dest_pid, tag)].push_back(MPI_REQUEST_NULL);
157 void WaitTestParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
159 CHECK_ACTION_PARAMS(action, 3, 0)
160 src = std::stoi(action[2]);
161 dst = std::stoi(action[3]);
162 tag = std::stoi(action[4]);
165 void SendOrRecvParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
167 CHECK_ACTION_PARAMS(action, 3, 1)
168 partner = std::stoi(action[2]);
169 tag = std::stoi(action[3]);
170 size = parse_integer<ssize_t>(action[4]);
171 datatype1 = parse_datatype(action, 5);
174 void ComputeParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
176 CHECK_ACTION_PARAMS(action, 1, 0)
177 flops = parse_double(action[2]);
180 void SleepParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
182 CHECK_ACTION_PARAMS(action, 1, 0)
183 time = parse_double(action[2]);
186 void LocationParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
188 CHECK_ACTION_PARAMS(action, 2, 0)
189 filename = std::string(action[2]);
190 line = std::stoi(action[3]);
193 void SendRecvParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
195 CHECK_ACTION_PARAMS(action, 6, 0)
196 sendcount = parse_integer<int>(action[2]);
197 dst = std::stoi(action[3]);
198 recvcount = parse_integer<int>(action[4]);
199 src = std::stoi(action[5]);
200 datatype1 = parse_datatype(action, 6);
201 datatype2 = parse_datatype(action, 7);
204 void BcastArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
206 CHECK_ACTION_PARAMS(action, 1, 2)
207 size = parse_integer<size_t>(action[2]);
208 root = parse_root(action, 3);
209 datatype1 = parse_datatype(action, 4);
212 void ReduceArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
214 CHECK_ACTION_PARAMS(action, 2, 2)
215 comm_size = parse_integer<unsigned>(action[2]);
216 comp_size = parse_double(action[3]);
217 root = parse_root(action, 4);
218 datatype1 = parse_datatype(action, 5);
221 void AllReduceArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
223 CHECK_ACTION_PARAMS(action, 2, 1)
224 comm_size = parse_integer<unsigned>(action[2]);
225 comp_size = parse_double(action[3]);
226 datatype1 = parse_datatype(action, 4);
229 void AllToAllArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
231 CHECK_ACTION_PARAMS(action, 2, 1)
232 comm_size = MPI_COMM_WORLD->size();
233 send_size = parse_integer<int>(action[2]);
234 recv_size = parse_integer<int>(action[3]);
235 datatype1 = parse_datatype(action, 4);
236 datatype2 = parse_datatype(action, 5);
239 void GatherArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string& name)
241 /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
244 1) 68 is the sendcounts
245 2) 68 is the recvcounts
246 3) 0 is the root node
247 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
248 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
250 CHECK_ACTION_PARAMS(action, 2, 3)
251 comm_size = MPI_COMM_WORLD->size();
252 send_size = parse_integer<int>(action[2]);
253 recv_size = parse_integer<int>(action[3]);
255 if (name == "gather") {
256 root = parse_root(action, 4);
257 datatype1 = parse_datatype(action, 5);
258 datatype2 = parse_datatype(action, 6);
261 datatype1 = parse_datatype(action, 4);
262 datatype2 = parse_datatype(action, 5);
266 void GatherVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string& name)
268 /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
269 0 gather 68 68 10 10 10 0 0 0
271 1) 68 is the sendcount
272 2) 68 10 10 10 is the recvcounts
273 3) 0 is the root node
274 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
275 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
277 comm_size = MPI_COMM_WORLD->size();
278 CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
279 send_size = parse_integer<int>(action[2]);
280 disps = std::vector<int>(comm_size, 0);
281 recvcounts = std::make_shared<std::vector<int>>(comm_size);
283 if (name == "gatherv") {
284 root = parse_root(action, 3 + comm_size);
285 datatype1 = parse_datatype(action, 4 + comm_size);
286 datatype2 = parse_datatype(action, 5 + comm_size);
289 unsigned disp_index = 0;
290 /* The 3 comes from "0 gather <sendcount>", which must always be present.
291 * The + comm_size is the recvcounts array, which must also be present
293 if (action.size() > 3 + comm_size + comm_size) {
294 // datatype + disp are specified
295 datatype1 = parse_datatype(action, 3 + comm_size);
296 datatype2 = parse_datatype(action, 4 + comm_size);
297 disp_index = 5 + comm_size;
298 } else if (action.size() > 3 + comm_size + 2) {
299 // disps specified; datatype is not specified; use the default one
300 datatype1 = MPI_DEFAULT_TYPE;
301 datatype2 = MPI_DEFAULT_TYPE;
302 disp_index = 3 + comm_size;
304 // no disp specified, maybe only datatype,
305 datatype1 = parse_datatype(action, 3 + comm_size);
306 datatype2 = parse_datatype(action, 4 + comm_size);
309 if (disp_index != 0) {
310 xbt_assert(disp_index + comm_size <= action.size());
311 for (unsigned i = 0; i < comm_size; i++)
312 disps[i] = std::stoi(action[disp_index + i]);
316 for (unsigned int i = 0; i < comm_size; i++) {
317 (*recvcounts)[i] = std::stoi(action[i + 3]);
319 recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
322 void ScatterArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
324 /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
327 1) 68 is the sendcounts
328 2) 68 is the recvcounts
329 3) 0 is the root node
330 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
331 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
333 comm_size = MPI_COMM_WORLD->size();
334 CHECK_ACTION_PARAMS(action, 2, 3)
335 comm_size = MPI_COMM_WORLD->size();
336 send_size = parse_integer<int>(action[2]);
337 recv_size = parse_integer<int>(action[3]);
338 root = parse_root(action, 4);
339 datatype1 = parse_datatype(action, 5);
340 datatype2 = parse_datatype(action, 6);
343 void ScatterVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
345 /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
346 0 gather 68 10 10 10 68 0 0 0
348 1) 68 10 10 10 is the sendcounts
349 2) 68 is the recvcount
350 3) 0 is the root node
351 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
352 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
354 comm_size = MPI_COMM_WORLD->size();
355 CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
356 recv_size = parse_integer<int>(action[2 + comm_size]);
357 disps = std::vector<int>(comm_size, 0);
358 sendcounts = std::make_shared<std::vector<int>>(comm_size);
360 root = parse_root(action, 3 + comm_size);
361 datatype1 = parse_datatype(action, 4 + comm_size);
362 datatype2 = parse_datatype(action, 5 + comm_size);
364 for (unsigned int i = 0; i < comm_size; i++) {
365 (*sendcounts)[i] = std::stoi(action[i + 2]);
367 send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
370 void ReduceScatterArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
372 /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
373 0 reducescatter 275427 275427 275427 204020 11346849 0
375 1) The first four values after the name of the action declare the recvcounts array
376 2) The value 11346849 is the amount of instructions
377 3) The last value corresponds to the datatype, see simgrid::smpi::Datatype::decode().
379 comm_size = MPI_COMM_WORLD->size();
380 CHECK_ACTION_PARAMS(action, comm_size + 1, 1)
381 comp_size = parse_double(action[2 + comm_size]);
382 recvcounts = std::make_shared<std::vector<int>>(comm_size);
383 datatype1 = parse_datatype(action, 3 + comm_size);
385 for (unsigned int i = 0; i < comm_size; i++) {
386 (*recvcounts)[i]= std::stoi(action[i + 2]);
388 recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
391 void ScanArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
393 CHECK_ACTION_PARAMS(action, 2, 1)
394 size = parse_integer<size_t>(action[2]);
395 comp_size = parse_double(action[3]);
396 datatype1 = parse_datatype(action, 4);
399 void AllToAllVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
401 /* The structure of the alltoallv action for the rank 0 (total 4 processes) is the following:
402 0 alltoallv 100 1 7 10 12 100 1 70 10 5
404 1) 100 is the size of the send buffer *sizeof(int),
405 2) 1 7 10 12 is the sendcounts array
406 3) 100*sizeof(int) is the size of the receiver buffer
407 4) 1 70 10 5 is the recvcounts array
409 comm_size = MPI_COMM_WORLD->size();
410 CHECK_ACTION_PARAMS(action, 2 * comm_size + 2, 2)
411 sendcounts = std::make_shared<std::vector<int>>(comm_size);
412 recvcounts = std::make_shared<std::vector<int>>(comm_size);
413 senddisps = std::vector<int>(comm_size, 0);
414 recvdisps = std::vector<int>(comm_size, 0);
416 datatype1 = parse_datatype(action, 4 + 2 * comm_size);
417 datatype2 = parse_datatype(action, 5 + 2 * comm_size);
419 send_buf_size = parse_integer<int>(action[2]);
420 recv_buf_size = parse_integer<int>(action[3 + comm_size]);
421 for (unsigned int i = 0; i < comm_size; i++) {
422 (*sendcounts)[i] = std::stoi(action[3 + i]);
423 (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
425 send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
426 recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
429 void WaitAction::kernel(simgrid::xbt::ReplayAction& action)
431 std::string s = boost::algorithm::join(action, " ");
432 xbt_assert(req_storage.size(), "action wait not preceded by any irecv or isend: %s", s.c_str());
433 const WaitTestParser& args = get_args();
434 MPI_Request request = req_storage.pop(args.src, args.dst, args.tag);
436 if (request == MPI_REQUEST_NULL) {
437 /* Assume that the trace is well formed, meaning the comm might have been caught by an MPI_test. Then just
442 // Must be taken before Request::wait() since the request may be set to
443 // MPI_REQUEST_NULL by Request::wait!
444 bool is_wait_for_receive = (request->flags() & MPI_REQ_RECV);
446 TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::WaitTIData("wait", args.src, args.dst, args.tag));
449 Request::wait(&request, &status);
450 if(request!=MPI_REQUEST_NULL)
451 Request::unref(&request);
452 TRACE_smpi_comm_out(get_pid());
453 if (is_wait_for_receive)
454 TRACE_smpi_recv(MPI_COMM_WORLD->group()->actor(args.src), MPI_COMM_WORLD->group()->actor(args.dst), args.tag);
457 void SendAction::kernel(simgrid::xbt::ReplayAction&)
459 const SendOrRecvParser& args = get_args();
460 aid_t dst_traced = MPI_COMM_WORLD->group()->actor(args.partner);
464 new simgrid::instr::Pt2PtTIData(get_name(), args.partner, args.size, args.tag, Datatype::encode(args.datatype1)));
465 if (not TRACE_smpi_view_internals())
466 TRACE_smpi_send(get_pid(), get_pid(), dst_traced, args.tag, args.size * args.datatype1->size());
468 if (get_name() == "send") {
469 Request::send(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
470 } else if (get_name() == "isend") {
471 MPI_Request request = Request::isend(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
472 req_storage.add(request);
474 xbt_die("Don't know this action, %s", get_name().c_str());
477 TRACE_smpi_comm_out(get_pid());
/** Replays "recv" (blocking) and "irecv" (non-blocking) point-to-point receives from args.partner.
 *
 * For "irecv" the resulting request is stored in req_storage so that a later wait/test/waitall can complete it.
 * For "recv" the receive event is traced using status.MPI_SOURCE, unless TRACE_smpi_view_internals() is set.
 */
void RecvAction::kernel(simgrid::xbt::ReplayAction&)
  const SendOrRecvParser& args = get_args();
  // The trace record carries the traced size (args.size), not the possibly probed size computed below.
  new simgrid::instr::Pt2PtTIData(get_name(), args.partner, args.size, args.tag, Datatype::encode(args.datatype1)));
  // unknown size from the receiver point of view
  ssize_t arg_size = args.size;
  // Probe path: the size actually used for the receive is taken from the matched message's status.
  Request::probe(args.partner, args.tag, MPI_COMM_WORLD, &status);
  arg_size = status.count;
  bool is_recv = false; // Help analyzers understanding that status is not used uninitialized
  if (get_name() == "recv") {
    Request::recv(nullptr, arg_size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD, &status);
  } else if (get_name() == "irecv") {
    // Non-blocking: keep the request so that a subsequent wait/test on the same (src, dst, tag) can find it.
    MPI_Request request = Request::irecv(nullptr, arg_size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
    req_storage.add(request);
  TRACE_smpi_comm_out(get_pid());
  // status is only read after a blocking recv, hence the is_recv guard.
  if (is_recv && not TRACE_smpi_view_internals()) {
    aid_t src_traced = MPI_COMM_WORLD->group()->actor(status.MPI_SOURCE);
    TRACE_smpi_recv(src_traced, get_pid(), args.tag);
513 void SendRecvAction::kernel(simgrid::xbt::ReplayAction&)
515 XBT_DEBUG("Enters SendRecv");
516 const SendRecvParser& args = get_args();
517 aid_t my_proc_id = simgrid::s4u::this_actor::get_pid();
518 aid_t src_traced = MPI_COMM_WORLD->group()->actor(args.src);
519 aid_t dst_traced = MPI_COMM_WORLD->group()->actor(args.dst);
525 // FIXME: Hack the way to trace this one
526 auto dst_hack = std::make_shared<std::vector<int>>();
527 auto src_hack = std::make_shared<std::vector<int>>();
528 dst_hack->push_back(dst_traced);
529 src_hack->push_back(src_traced);
530 TRACE_smpi_comm_in(my_proc_id, __func__,
531 new simgrid::instr::VarCollTIData(
532 "sendRecv", -1, args.sendcount,
533 dst_hack, args.recvcount, src_hack,
534 simgrid::smpi::Datatype::encode(args.datatype1), simgrid::smpi::Datatype::encode(args.datatype2)));
536 TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, sendtag, args.sendcount * args.datatype1->size());
538 simgrid::smpi::Request::sendrecv(nullptr, args.sendcount, args.datatype1, args.dst, sendtag, nullptr, args.recvcount, args.datatype2, args.src,
539 recvtag, MPI_COMM_WORLD, &status);
541 TRACE_smpi_recv(src_traced, my_proc_id, recvtag);
542 TRACE_smpi_comm_out(my_proc_id);
543 XBT_DEBUG("Exits SendRecv");
546 void ComputeAction::kernel(simgrid::xbt::ReplayAction&)
548 const ComputeParser& args = get_args();
549 if (smpi_cfg_simulate_computation()) {
550 smpi_execute_flops(args.flops/smpi_adjust_comp_speed());
554 void SleepAction::kernel(simgrid::xbt::ReplayAction&)
556 const SleepParser& args = get_args();
557 XBT_DEBUG("Sleep for: %lf secs", args.time);
558 aid_t pid = simgrid::s4u::this_actor::get_pid();
559 TRACE_smpi_sleeping_in(pid, args.time);
560 simgrid::s4u::this_actor::sleep_for(args.time/smpi_adjust_comp_speed());
561 TRACE_smpi_sleeping_out(pid);
564 void LocationAction::kernel(simgrid::xbt::ReplayAction&)
566 const LocationParser& args = get_args();
567 smpi_trace_set_call_location(args.filename.c_str(), args.line);
570 void TestAction::kernel(simgrid::xbt::ReplayAction&)
572 const WaitTestParser& args = get_args();
573 MPI_Request request = req_storage.pop(args.src, args.dst, args.tag);
574 // if request is null here, this may mean that a previous test has succeeded
575 // Different times in traced application and replayed version may lead to this
576 // In this case, ignore the extra calls.
577 if (request != MPI_REQUEST_NULL) {
578 TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::WaitTIData("test", args.src, args.dst, args.tag));
582 Request::test(&request, &status, &flag);
584 XBT_DEBUG("MPI_Test result: %d", flag);
585 /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now
587 if (request == MPI_REQUEST_NULL)
588 req_storage.addNullRequest(args.src, args.dst, args.tag);
590 req_storage.add(request);
592 TRACE_smpi_comm_out(get_pid());
596 void InitAction::kernel(simgrid::xbt::ReplayAction& action)
598 CHECK_ACTION_PARAMS(action, 0, 1)
599 MPI_DEFAULT_TYPE = (action.size() > 2) ? MPI_DOUBLE // default MPE datatype
600 : MPI_BYTE; // default TAU datatype
602 /* start a simulated timer */
603 smpi_process()->simulated_start();
606 void CommunicatorAction::kernel(simgrid::xbt::ReplayAction&)
611 void WaitAllAction::kernel(simgrid::xbt::ReplayAction&)
613 if (req_storage.size() > 0) {
614 std::vector<std::pair</*sender*/ aid_t, /*recv*/ aid_t>> sender_receiver;
615 std::vector<MPI_Request> reqs;
616 req_storage.get_requests(reqs);
617 unsigned long count_requests = reqs.size();
618 TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::CpuTIData("waitall", count_requests));
619 for (auto const& req : reqs) {
620 if (req && (req->flags() & MPI_REQ_RECV)) {
621 sender_receiver.emplace_back(req->src(), req->dst());
624 Request::waitall(count_requests, &(reqs.data())[0], MPI_STATUSES_IGNORE);
625 req_storage.get_store().clear();
627 for (MPI_Request& req : reqs)
628 if (req != MPI_REQUEST_NULL)
629 Request::unref(&req);
631 for (auto const& [src, dst] : sender_receiver) {
632 TRACE_smpi_recv(src, dst, 0);
634 TRACE_smpi_comm_out(get_pid());
638 void BarrierAction::kernel(simgrid::xbt::ReplayAction&)
640 TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::NoOpTIData("barrier"));
641 colls::barrier(MPI_COMM_WORLD);
642 TRACE_smpi_comm_out(get_pid());
645 void BcastAction::kernel(simgrid::xbt::ReplayAction&)
647 const BcastArgParser& args = get_args();
648 TRACE_smpi_comm_in(get_pid(), "action_bcast",
649 new simgrid::instr::CollTIData("bcast", args.root, -1.0, args.size,
650 0, Datatype::encode(args.datatype1), ""));
652 colls::bcast(send_buffer(args.size * args.datatype1->size()), args.size, args.datatype1, args.root, MPI_COMM_WORLD);
654 TRACE_smpi_comm_out(get_pid());
657 void ReduceAction::kernel(simgrid::xbt::ReplayAction&)
659 const ReduceArgParser& args = get_args();
660 TRACE_smpi_comm_in(get_pid(), "action_reduce",
661 new simgrid::instr::CollTIData("reduce", args.root, args.comp_size,
662 args.comm_size, 0, Datatype::encode(args.datatype1), ""));
664 colls::reduce(send_buffer(args.comm_size * args.datatype1->size()),
665 recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL,
666 args.root, MPI_COMM_WORLD);
667 if (args.comp_size != 0.0)
668 simgrid::s4u::this_actor::exec_init(args.comp_size)
669 ->set_name("computation")
673 TRACE_smpi_comm_out(get_pid());
676 void AllReduceAction::kernel(simgrid::xbt::ReplayAction&)
678 const AllReduceArgParser& args = get_args();
679 TRACE_smpi_comm_in(get_pid(), "action_allreduce",
680 new simgrid::instr::CollTIData("allreduce", -1, args.comp_size, args.comm_size, 0,
681 Datatype::encode(args.datatype1), ""));
683 colls::allreduce(send_buffer(args.comm_size * args.datatype1->size()),
684 recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL,
686 if (args.comp_size != 0.0)
687 simgrid::s4u::this_actor::exec_init(args.comp_size)
688 ->set_name("computation")
692 TRACE_smpi_comm_out(get_pid());
695 void AllToAllAction::kernel(simgrid::xbt::ReplayAction&)
697 const AllToAllArgParser& args = get_args();
698 TRACE_smpi_comm_in(get_pid(), "action_alltoall",
699 new simgrid::instr::CollTIData("alltoall", -1, -1.0, args.send_size, args.recv_size,
700 Datatype::encode(args.datatype1),
701 Datatype::encode(args.datatype2)));
703 colls::alltoall(send_buffer(args.datatype1->size() * args.send_size * args.comm_size), args.send_size, args.datatype1,
704 recv_buffer(args.datatype2->size() * args.recv_size * args.comm_size), args.recv_size, args.datatype2,
707 TRACE_smpi_comm_out(get_pid());
710 void GatherAction::kernel(simgrid::xbt::ReplayAction&)
712 const GatherArgParser& args = get_args();
713 TRACE_smpi_comm_in(get_pid(), get_name().c_str(),
714 new simgrid::instr::CollTIData(get_name(), (get_name() == "gather") ? args.root : -1, -1.0,
715 args.send_size, args.recv_size, Datatype::encode(args.datatype1),
716 Datatype::encode(args.datatype2)));
718 if (get_name() == "gather") {
719 int rank = MPI_COMM_WORLD->rank();
720 colls::gather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
721 (rank == args.root) ? recv_buffer(args.datatype2->size() * args.recv_size * args.comm_size) : nullptr,
722 args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
724 colls::allgather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
725 recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2,
728 TRACE_smpi_comm_out(get_pid());
731 void GatherVAction::kernel(simgrid::xbt::ReplayAction&)
733 int rank = MPI_COMM_WORLD->rank();
734 const GatherVArgParser& args = get_args();
735 TRACE_smpi_comm_in(get_pid(), get_name().c_str(),
736 new simgrid::instr::VarCollTIData(
737 get_name(), (get_name() == "gatherv") ? args.root : -1, args.send_size, nullptr, -1,
738 args.recvcounts, Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
740 if (get_name() == "gatherv") {
741 colls::gatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
742 (rank == args.root) ? recv_buffer(args.recv_size_sum * args.datatype2->size()) : nullptr,
743 args.recvcounts->data(), args.disps.data(), args.datatype2, args.root, MPI_COMM_WORLD);
745 colls::allgatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
746 recv_buffer(args.recv_size_sum * args.datatype2->size()), args.recvcounts->data(),
747 args.disps.data(), args.datatype2, MPI_COMM_WORLD);
750 TRACE_smpi_comm_out(get_pid());
753 void ScatterAction::kernel(simgrid::xbt::ReplayAction&)
755 int rank = MPI_COMM_WORLD->rank();
756 const ScatterArgParser& args = get_args();
757 TRACE_smpi_comm_in(get_pid(), "action_scatter",
758 new simgrid::instr::CollTIData(get_name(), args.root, -1.0, args.send_size, args.recv_size,
759 Datatype::encode(args.datatype1),
760 Datatype::encode(args.datatype2)));
762 colls::scatter(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
763 (rank == args.root) ? recv_buffer(args.recv_size * args.datatype2->size()) : nullptr, args.recv_size,
764 args.datatype2, args.root, MPI_COMM_WORLD);
766 TRACE_smpi_comm_out(get_pid());
769 void ScatterVAction::kernel(simgrid::xbt::ReplayAction&)
771 int rank = MPI_COMM_WORLD->rank();
772 const ScatterVArgParser& args = get_args();
773 TRACE_smpi_comm_in(get_pid(), "action_scatterv",
774 new simgrid::instr::VarCollTIData(get_name(), args.root, -1, args.sendcounts, args.recv_size,
775 nullptr, Datatype::encode(args.datatype1),
776 Datatype::encode(args.datatype2)));
778 colls::scatterv((rank == args.root) ? send_buffer(args.send_size_sum * args.datatype1->size()) : nullptr,
779 args.sendcounts->data(), args.disps.data(), args.datatype1,
780 recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, args.root,
783 TRACE_smpi_comm_out(get_pid());
786 void ReduceScatterAction::kernel(simgrid::xbt::ReplayAction&)
788 const ReduceScatterArgParser& args = get_args();
790 get_pid(), "action_reducescatter",
791 new simgrid::instr::VarCollTIData(get_name(), -1, -1, nullptr, -1, args.recvcounts,
792 /* ugly as we use datatype field to pass computation as string */
793 /* and because of the trick to avoid getting 0.000000 when 0 is given */
794 args.comp_size == 0 ? "0" : std::to_string(args.comp_size),
795 Datatype::encode(args.datatype1)));
797 colls::reduce_scatter(send_buffer(args.recv_size_sum * args.datatype1->size()),
798 recv_buffer(args.recv_size_sum * args.datatype1->size()), args.recvcounts->data(),
799 args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
800 if (args.comp_size != 0.0)
801 simgrid::s4u::this_actor::exec_init(args.comp_size)
802 ->set_name("computation")
805 TRACE_smpi_comm_out(get_pid());
808 void ScanAction::kernel(simgrid::xbt::ReplayAction&)
810 const ScanArgParser& args = get_args();
811 TRACE_smpi_comm_in(get_pid(), "action_scan",
812 new simgrid::instr::CollTIData(get_name(), -1, args.comp_size,
813 args.size, 0, Datatype::encode(args.datatype1), ""));
814 if (get_name() == "scan")
815 colls::scan(send_buffer(args.size * args.datatype1->size()),
816 recv_buffer(args.size * args.datatype1->size()), args.size,
817 args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
819 colls::exscan(send_buffer(args.size * args.datatype1->size()),
820 recv_buffer(args.size * args.datatype1->size()), args.size,
821 args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
823 if (args.comp_size != 0.0)
824 simgrid::s4u::this_actor::exec_init(args.comp_size)
825 ->set_name("computation")
828 TRACE_smpi_comm_out(get_pid());
831 void AllToAllVAction::kernel(simgrid::xbt::ReplayAction&)
833 const AllToAllVArgParser& args = get_args();
834 TRACE_smpi_comm_in(get_pid(), __func__,
835 new simgrid::instr::VarCollTIData(
836 "alltoallv", -1, args.send_size_sum, args.sendcounts, args.recv_size_sum, args.recvcounts,
837 Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
839 colls::alltoallv(send_buffer(args.send_buf_size * args.datatype1->size()), args.sendcounts->data(),
840 args.senddisps.data(), args.datatype1, recv_buffer(args.recv_buf_size * args.datatype2->size()),
841 args.recvcounts->data(), args.recvdisps.data(), args.datatype2, MPI_COMM_WORLD);
843 TRACE_smpi_comm_out(get_pid());
845 } // namespace simgrid::smpi::replay
// One RequestStorage per replaying actor, keyed by pid. The action callbacks registered in smpi_replay_init() select
// the entry of the calling actor with simgrid::s4u::this_actor::get_pid().
static std::unordered_map<aid_t, simgrid::smpi::replay::RequestStorage> storage;
848 /** @brief Only initialize the replay, don't do it for real */
849 void smpi_replay_init(const char* instance_id, int rank, double start_delay_flops)
851 xbt_assert(not smpi_process()->initializing());
853 simgrid::s4u::Actor::self()->set_property("instance_id", instance_id);
854 simgrid::s4u::Actor::self()->set_property("rank", std::to_string(rank));
855 simgrid::smpi::ActorExt::init();
857 smpi_process()->mark_as_initialized();
858 smpi_process()->set_replaying(true);
860 TRACE_smpi_init(simgrid::s4u::this_actor::get_pid(), "smpi_replay_run_init");
861 xbt_replay_action_register("init", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::InitAction().execute(action); });
862 xbt_replay_action_register("finalize", [](simgrid::xbt::ReplayAction const&) { /* nothing to do */ });
863 xbt_replay_action_register("comm_size", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
864 xbt_replay_action_register("comm_split",[](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
865 xbt_replay_action_register("comm_dup", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
866 xbt_replay_action_register("send", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("send", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
867 xbt_replay_action_register("isend", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("isend", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
868 xbt_replay_action_register("recv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("recv", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
869 xbt_replay_action_register("irecv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("irecv", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
870 xbt_replay_action_register("test", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::TestAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
871 xbt_replay_action_register("sendRecv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendRecvAction().execute(action); });
872 xbt_replay_action_register("wait", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
873 xbt_replay_action_register("waitall", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAllAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
874 xbt_replay_action_register("barrier", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BarrierAction().execute(action); });
875 xbt_replay_action_register("bcast", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BcastAction().execute(action); });
876 xbt_replay_action_register("reduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceAction().execute(action); });
877 xbt_replay_action_register("allreduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllReduceAction().execute(action); });
878 xbt_replay_action_register("alltoall", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllAction().execute(action); });
879 xbt_replay_action_register("alltoallv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllVAction().execute(action); });
880 xbt_replay_action_register("gather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("gather").execute(action); });
881 xbt_replay_action_register("scatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterAction().execute(action); });
882 xbt_replay_action_register("gatherv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("gatherv").execute(action); });
883 xbt_replay_action_register("scatterv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterVAction().execute(action); });
884 xbt_replay_action_register("allgather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("allgather").execute(action); });
885 xbt_replay_action_register("allgatherv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("allgatherv").execute(action); });
886 xbt_replay_action_register("reducescatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceScatterAction().execute(action); });
887 xbt_replay_action_register("scan", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScanAction("scan").execute(action); });
888 xbt_replay_action_register("exscan", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScanAction("exscan").execute(action); });
889 xbt_replay_action_register("compute", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ComputeAction().execute(action); });
890 xbt_replay_action_register("sleep", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SleepAction().execute(action); });
891 xbt_replay_action_register("location", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::LocationAction().execute(action); });
893 //if we have a delayed start, sleep here.
894 if (start_delay_flops > 0) {
895 XBT_VERB("Delayed start for instance - Sleeping for %f flops ", start_delay_flops);
896 private_execute_flops(start_delay_flops);
898 // Wait for the other actors to initialize also
899 simgrid::s4u::this_actor::yield();
901 if(_smpi_init_sleep > 0)
902 simgrid::s4u::this_actor::sleep_for(_smpi_init_sleep);
905 /** @brief actually run the replay after initialization */
906 void smpi_replay_main(int rank, const char* private_trace_filename)
908 static int active_processes = 0;
910 storage[simgrid::s4u::this_actor::get_pid()] = simgrid::smpi::replay::RequestStorage();
911 std::string rank_string = std::to_string(rank);
912 simgrid::xbt::replay_runner(rank_string.c_str(), private_trace_filename);
914 /* and now, finalize everything */
915 /* One active process will stop. Decrease the counter*/
916 unsigned int count_requests = storage[simgrid::s4u::this_actor::get_pid()].size();
917 XBT_DEBUG("There are %u elements in reqq[*]", count_requests);
918 if (count_requests > 0) {
919 std::vector<MPI_Request> requests(count_requests);
922 for (auto const& [_, reqs] : storage[simgrid::s4u::this_actor::get_pid()].get_store()) {
923 for (auto& req : reqs) {
924 requests[i] = req; // FIXME: overwritten at each iteration?
928 simgrid::smpi::Request::waitall(count_requests, requests.data(), MPI_STATUSES_IGNORE);
931 if (simgrid::config::get_value<bool>("smpi/barrier-finalization"))
932 simgrid::smpi::colls::barrier(MPI_COMM_WORLD);
936 if(active_processes==0){
937 /* Last process alive speaking: end the simulated timer */
938 XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
939 smpi_free_replay_tmp_buffers();
942 TRACE_smpi_comm_in(simgrid::s4u::this_actor::get_pid(), "smpi_replay_run_finalize",
943 new simgrid::instr::NoOpTIData("finalize"));
945 smpi_process()->finalize();
947 TRACE_smpi_comm_out(simgrid::s4u::this_actor::get_pid());
950 /** @brief chain a replay initialization and a replay start */
951 void smpi_replay_run(const char* instance_id, int rank, double start_delay_flops, const char* private_trace_filename)
953 smpi_replay_init(instance_id, rank, start_delay_flops);
954 smpi_replay_main(rank, private_trace_filename);