1 /* Copyright (c) 2009-2022. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
6 #include "smpi_replay.hpp"
7 #include "simgrid/s4u/Exec.hpp"
8 #include "smpi_coll.hpp"
9 #include "smpi_comm.hpp"
10 #include "smpi_config.hpp"
11 #include "smpi_datatype.hpp"
12 #include "smpi_group.hpp"
13 #include "smpi_request.hpp"
14 #include "src/smpi/include/private.hpp"
15 #include "xbt/replay.hpp"
22 #include <unordered_map>
// Defines "smpi_replay" as the default XBT log category of this file, as a subcategory of "smpi".
// The XBT_VERB / XBT_DEBUG / XBT_INFO calls of this file log into it.
25 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay, smpi, "Trace Replay with SMPI");
26 // From https://stackoverflow.com/questions/7110301/generic-hash-for-tuples-in-unordered-map-unordered-set
27 // This is all just to make std::unordered_map work with std::tuple. If we need this in other places,
28 // this could go into a header file.
29 namespace hash_tuple {
30 template <typename TT> class hash {
32 size_t operator()(TT const& tt) const { return std::hash<TT>()(tt); }
35 template <class T> inline void hash_combine(std::size_t& seed, T const& v)
37 seed ^= hash_tuple::hash<T>()(v) + 0x9e3779b9 + (seed << 6) + (seed >> 2);
40 // Recursive template code derived from Matthieu M.
41 template <class Tuple, size_t Index = std::tuple_size<Tuple>::value - 1> class HashValueImpl {
43 static void apply(size_t& seed, Tuple const& tuple)
45 HashValueImpl<Tuple, Index - 1>::apply(seed, tuple);
46 hash_combine(seed, std::get<Index>(tuple));
50 template <class Tuple> class HashValueImpl<Tuple, 0> {
52 static void apply(size_t& seed, Tuple const& tuple) { hash_combine(seed, std::get<0>(tuple)); }
55 template <typename... TT> class hash<std::tuple<TT...>> {
57 size_t operator()(std::tuple<TT...> const& tt) const
60 HashValueImpl<std::tuple<TT...>>::apply(seed, tt);
66 void log_timed_action(const simgrid::xbt::ReplayAction& action, double clock)
68 if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
69 std::string s = boost::algorithm::join(action, " ");
70 XBT_VERB("%s %f", s.c_str(), smpi_process()->simulated_elapsed() - clock);
74 /* Helper functions */
75 static double parse_double(const std::string& string)
77 return xbt_str_parse_double(string.c_str(), "not a double");
80 template <typename T> static T parse_integer(const std::string& string)
82 double val = trunc(xbt_str_parse_double(string.c_str(), "not a double"));
83 xbt_assert(static_cast<double>(std::numeric_limits<T>::min()) <= val &&
84 val <= static_cast<double>(std::numeric_limits<T>::max()),
85 "out of range: %g", val);
86 return static_cast<T>(val);
89 static int parse_root(const simgrid::xbt::ReplayAction& action, unsigned i)
91 return i < action.size() ? std::stoi(action[i]) : 0;
94 static MPI_Datatype parse_datatype(const simgrid::xbt::ReplayAction& action, unsigned i)
96 return i < action.size() ? simgrid::smpi::Datatype::decode(action[i]) : simgrid::smpi::replay::MPI_DEFAULT_TYPE;
103 MPI_Datatype MPI_DEFAULT_TYPE;
105 class RequestStorage {
107 using req_key_t = std::tuple</*sender*/ int, /* receiver */ int, /* tag */ int>;
108 using req_storage_t = std::unordered_map<req_key_t, std::list<MPI_Request>, hash_tuple::hash<std::tuple<int, int, int>>>;
113 RequestStorage() = default;
114 size_t size() const { return store.size(); }
116 req_storage_t& get_store() { return store; }
118 void get_requests(std::vector<MPI_Request>& vec) const
120 for (auto const& pair : store) {
121 auto& reqs = pair.second;
122 aid_t my_proc_id = simgrid::s4u::this_actor::get_pid();
123 for (auto& req: reqs){
124 if (req != MPI_REQUEST_NULL && (req->src() == my_proc_id || req->dst() == my_proc_id)) {
126 req->print_request("MM");
132 MPI_Request pop(int src, int dst, int tag)
134 auto it = store.find(req_key_t(src, dst, tag));
135 if (it == store.end())
136 return MPI_REQUEST_NULL;
137 MPI_Request req = it->second.front();
138 it->second.pop_front();
139 if(it->second.empty())
140 store.erase(req_key_t(src, dst, tag));
144 void add(MPI_Request req)
146 if (req != MPI_REQUEST_NULL){ // Can and does happen in the case of TestAction
147 auto it = store.find(req_key_t(req->src()-1, req->dst()-1, req->tag()));
148 if (it == store.end())
149 store.insert({req_key_t(req->src()-1, req->dst()-1, req->tag()), std::list<MPI_Request>()});
150 store[req_key_t(req->src()-1, req->dst()-1, req->tag())].push_back(req);
154 /* Sometimes we need to re-insert MPI_REQUEST_NULL but we still need src,dst and tag */
155 void addNullRequest(int src, int dst, int tag)
157 int src_pid = MPI_COMM_WORLD->group()->actor(src) - 1;
158 int dest_pid = MPI_COMM_WORLD->group()->actor(dst) - 1;
159 auto it = store.find(req_key_t(src_pid, dest_pid, tag));
160 if (it == store.end())
161 store.insert({req_key_t(src_pid, dest_pid, tag), std::list<MPI_Request>()});
162 store[req_key_t(src_pid, dest_pid, tag)].push_back(MPI_REQUEST_NULL);
166 void WaitTestParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
168 CHECK_ACTION_PARAMS(action, 3, 0)
169 src = std::stoi(action[2]);
170 dst = std::stoi(action[3]);
171 tag = std::stoi(action[4]);
174 void SendRecvParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
176 CHECK_ACTION_PARAMS(action, 3, 1)
177 partner = std::stoi(action[2]);
178 tag = std::stoi(action[3]);
179 size = parse_integer<size_t>(action[4]);
180 datatype1 = parse_datatype(action, 5);
183 void ComputeParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
185 CHECK_ACTION_PARAMS(action, 1, 0)
186 flops = parse_double(action[2]);
189 void SleepParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
191 CHECK_ACTION_PARAMS(action, 1, 0)
192 time = parse_double(action[2]);
195 void LocationParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
197 CHECK_ACTION_PARAMS(action, 2, 0)
198 filename = std::string(action[2]);
199 line = std::stoi(action[3]);
202 void BcastArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
204 CHECK_ACTION_PARAMS(action, 1, 2)
205 size = parse_integer<size_t>(action[2]);
206 root = parse_root(action, 3);
207 datatype1 = parse_datatype(action, 4);
210 void ReduceArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
212 CHECK_ACTION_PARAMS(action, 2, 2)
213 comm_size = parse_integer<unsigned>(action[2]);
214 comp_size = parse_double(action[3]);
215 root = parse_root(action, 4);
216 datatype1 = parse_datatype(action, 5);
219 void AllReduceArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
221 CHECK_ACTION_PARAMS(action, 2, 1)
222 comm_size = parse_integer<unsigned>(action[2]);
223 comp_size = parse_double(action[3]);
224 datatype1 = parse_datatype(action, 4);
227 void AllToAllArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
229 CHECK_ACTION_PARAMS(action, 2, 1)
230 comm_size = MPI_COMM_WORLD->size();
231 send_size = parse_integer<int>(action[2]);
232 recv_size = parse_integer<int>(action[3]);
233 datatype1 = parse_datatype(action, 4);
234 datatype2 = parse_datatype(action, 5);
237 void GatherArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string& name)
239 /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
242 1) 68 is the sendcounts
243 2) 68 is the recvcounts
244 3) 0 is the root node
245 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
246 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
248 CHECK_ACTION_PARAMS(action, 2, 3)
249 comm_size = MPI_COMM_WORLD->size();
250 send_size = parse_integer<int>(action[2]);
251 recv_size = parse_integer<int>(action[3]);
253 if (name == "gather") {
254 root = parse_root(action, 4);
255 datatype1 = parse_datatype(action, 5);
256 datatype2 = parse_datatype(action, 6);
259 datatype1 = parse_datatype(action, 4);
260 datatype2 = parse_datatype(action, 5);
264 void GatherVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string& name)
266 /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
267 0 gather 68 68 10 10 10 0 0 0
269 1) 68 is the sendcount
270 2) 68 10 10 10 is the recvcounts
271 3) 0 is the root node
272 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
273 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
275 comm_size = MPI_COMM_WORLD->size();
276 CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
277 send_size = parse_integer<int>(action[2]);
278 disps = std::vector<int>(comm_size, 0);
279 recvcounts = std::make_shared<std::vector<int>>(comm_size);
281 if (name == "gatherv") {
282 root = parse_root(action, 3 + comm_size);
283 datatype1 = parse_datatype(action, 4 + comm_size);
284 datatype2 = parse_datatype(action, 5 + comm_size);
287 unsigned disp_index = 0;
288 /* The 3 comes from "0 gather <sendcount>", which must always be present.
289 * The + comm_size is the recvcounts array, which must also be present
291 if (action.size() > 3 + comm_size + comm_size) {
292 // datatype + disp are specified
293 datatype1 = parse_datatype(action, 3 + comm_size);
294 datatype2 = parse_datatype(action, 4 + comm_size);
295 disp_index = 5 + comm_size;
296 } else if (action.size() > 3 + comm_size + 2) {
297 // disps specified; datatype is not specified; use the default one
298 datatype1 = MPI_DEFAULT_TYPE;
299 datatype2 = MPI_DEFAULT_TYPE;
300 disp_index = 3 + comm_size;
302 // no disp specified, maybe only datatype,
303 datatype1 = parse_datatype(action, 3 + comm_size);
304 datatype2 = parse_datatype(action, 4 + comm_size);
307 if (disp_index != 0) {
308 xbt_assert(disp_index + comm_size <= action.size());
309 for (unsigned i = 0; i < comm_size; i++)
310 disps[i] = std::stoi(action[disp_index + i]);
314 for (unsigned int i = 0; i < comm_size; i++) {
315 (*recvcounts)[i] = std::stoi(action[i + 3]);
317 recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
320 void ScatterArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
322 /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
325 1) 68 is the sendcounts
326 2) 68 is the recvcounts
327 3) 0 is the root node
328 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
329 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
331 comm_size = MPI_COMM_WORLD->size();
332 CHECK_ACTION_PARAMS(action, 2, 3)
333 comm_size = MPI_COMM_WORLD->size();
334 send_size = parse_integer<int>(action[2]);
335 recv_size = parse_integer<int>(action[3]);
336 root = parse_root(action, 4);
337 datatype1 = parse_datatype(action, 5);
338 datatype2 = parse_datatype(action, 6);
341 void ScatterVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
343 /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
344 0 gather 68 10 10 10 68 0 0 0
346 1) 68 10 10 10 is the sendcounts
347 2) 68 is the recvcount
348 3) 0 is the root node
349 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
350 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
352 comm_size = MPI_COMM_WORLD->size();
353 CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
354 recv_size = parse_integer<int>(action[2 + comm_size]);
355 disps = std::vector<int>(comm_size, 0);
356 sendcounts = std::make_shared<std::vector<int>>(comm_size);
358 root = parse_root(action, 3 + comm_size);
359 datatype1 = parse_datatype(action, 4 + comm_size);
360 datatype2 = parse_datatype(action, 5 + comm_size);
362 for (unsigned int i = 0; i < comm_size; i++) {
363 (*sendcounts)[i] = std::stoi(action[i + 2]);
365 send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
368 void ReduceScatterArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
370 /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
371 0 reducescatter 275427 275427 275427 204020 11346849 0
373 1) The first four values after the name of the action declare the recvcounts array
374 2) The value 11346849 is the amount of instructions
375 3) The last value corresponds to the datatype, see simgrid::smpi::Datatype::decode().
377 comm_size = MPI_COMM_WORLD->size();
378 CHECK_ACTION_PARAMS(action, comm_size + 1, 1)
379 comp_size = parse_double(action[2 + comm_size]);
380 recvcounts = std::make_shared<std::vector<int>>(comm_size);
381 datatype1 = parse_datatype(action, 3 + comm_size);
383 for (unsigned int i = 0; i < comm_size; i++) {
384 (*recvcounts)[i]= std::stoi(action[i + 2]);
386 recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
389 void ScanArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
391 CHECK_ACTION_PARAMS(action, 2, 1)
392 size = parse_integer<size_t>(action[2]);
393 comp_size = parse_double(action[3]);
394 datatype1 = parse_datatype(action, 4);
397 void AllToAllVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
399 /* The structure of the alltoallv action for the rank 0 (total 4 processes) is the following:
400 0 alltoallv 100 1 7 10 12 100 1 70 10 5
402 1) 100 is the size of the send buffer *sizeof(int),
403 2) 1 7 10 12 is the sendcounts array
404 3) 100*sizeof(int) is the size of the receiver buffer
405 4) 1 70 10 5 is the recvcounts array
407 comm_size = MPI_COMM_WORLD->size();
408 CHECK_ACTION_PARAMS(action, 2 * comm_size + 2, 2)
409 sendcounts = std::make_shared<std::vector<int>>(comm_size);
410 recvcounts = std::make_shared<std::vector<int>>(comm_size);
411 senddisps = std::vector<int>(comm_size, 0);
412 recvdisps = std::vector<int>(comm_size, 0);
414 datatype1 = parse_datatype(action, 4 + 2 * comm_size);
415 datatype2 = parse_datatype(action, 5 + 2 * comm_size);
417 send_buf_size = parse_integer<int>(action[2]);
418 recv_buf_size = parse_integer<int>(action[3 + comm_size]);
419 for (unsigned int i = 0; i < comm_size; i++) {
420 (*sendcounts)[i] = std::stoi(action[3 + i]);
421 (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
423 send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
424 recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
427 void WaitAction::kernel(simgrid::xbt::ReplayAction& action)
429 std::string s = boost::algorithm::join(action, " ");
430 xbt_assert(req_storage.size(), "action wait not preceded by any irecv or isend: %s", s.c_str());
431 const WaitTestParser& args = get_args();
432 MPI_Request request = req_storage.pop(args.src, args.dst, args.tag);
434 if (request == MPI_REQUEST_NULL) {
435 /* Assume that the trace is well formed, meaning the comm might have been caught by an MPI_test. Then just
440 // Must be taken before Request::wait() since the request may be set to
441 // MPI_REQUEST_NULL by Request::wait!
442 bool is_wait_for_receive = (request->flags() & MPI_REQ_RECV);
444 TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::WaitTIData("wait", args.src, args.dst, args.tag));
447 Request::wait(&request, &status);
448 if(request!=MPI_REQUEST_NULL)
449 Request::unref(&request);
450 TRACE_smpi_comm_out(get_pid());
451 if (is_wait_for_receive)
452 TRACE_smpi_recv(MPI_COMM_WORLD->group()->actor(args.src), MPI_COMM_WORLD->group()->actor(args.dst), args.tag);
455 void SendAction::kernel(simgrid::xbt::ReplayAction&)
457 const SendRecvParser& args = get_args();
458 aid_t dst_traced = MPI_COMM_WORLD->group()->actor(args.partner);
462 new simgrid::instr::Pt2PtTIData(get_name(), args.partner, args.size, args.tag, Datatype::encode(args.datatype1)));
463 if (not TRACE_smpi_view_internals())
464 TRACE_smpi_send(get_pid(), get_pid(), dst_traced, args.tag, args.size * args.datatype1->size());
466 if (get_name() == "send") {
467 Request::send(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
468 } else if (get_name() == "isend") {
469 MPI_Request request = Request::isend(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
470 req_storage.add(request);
472 xbt_die("Don't know this action, %s", get_name().c_str());
475 TRACE_smpi_comm_out(get_pid());
478 void RecvAction::kernel(simgrid::xbt::ReplayAction&)
480 const SendRecvParser& args = get_args();
483 new simgrid::instr::Pt2PtTIData(get_name(), args.partner, args.size, args.tag, Datatype::encode(args.datatype1)));
486 // unknown size from the receiver point of view
487 size_t arg_size = args.size;
489 Request::probe(args.partner, args.tag, MPI_COMM_WORLD, &status);
490 arg_size = status.count;
493 bool is_recv = false; // Help analyzers understanding that status is not used uninitialized
494 if (get_name() == "recv") {
496 Request::recv(nullptr, arg_size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD, &status);
497 } else if (get_name() == "irecv") {
498 MPI_Request request = Request::irecv(nullptr, arg_size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
499 req_storage.add(request);
504 TRACE_smpi_comm_out(get_pid());
505 if (is_recv && not TRACE_smpi_view_internals()) {
506 aid_t src_traced = MPI_COMM_WORLD->group()->actor(status.MPI_SOURCE);
507 TRACE_smpi_recv(src_traced, get_pid(), args.tag);
511 void ComputeAction::kernel(simgrid::xbt::ReplayAction&)
513 const ComputeParser& args = get_args();
514 if (smpi_cfg_simulate_computation()) {
515 smpi_execute_flops(args.flops/smpi_adjust_comp_speed());
519 void SleepAction::kernel(simgrid::xbt::ReplayAction&)
521 const SleepParser& args = get_args();
522 XBT_DEBUG("Sleep for: %lf secs", args.time);
523 aid_t pid = simgrid::s4u::this_actor::get_pid();
524 TRACE_smpi_sleeping_in(pid, args.time);
525 simgrid::s4u::this_actor::sleep_for(args.time/smpi_adjust_comp_speed());
526 TRACE_smpi_sleeping_out(pid);
529 void LocationAction::kernel(simgrid::xbt::ReplayAction&)
531 const LocationParser& args = get_args();
532 smpi_trace_set_call_location(args.filename.c_str(), args.line);
535 void TestAction::kernel(simgrid::xbt::ReplayAction&)
537 const WaitTestParser& args = get_args();
538 MPI_Request request = req_storage.pop(args.src, args.dst, args.tag);
539 // if request is null here, this may mean that a previous test has succeeded
540 // Different times in traced application and replayed version may lead to this
541 // In this case, ignore the extra calls.
542 if (request != MPI_REQUEST_NULL) {
543 TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::WaitTIData("test", args.src, args.dst, args.tag));
547 Request::test(&request, &status, &flag);
549 XBT_DEBUG("MPI_Test result: %d", flag);
550 /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now
552 if (request == MPI_REQUEST_NULL)
553 req_storage.addNullRequest(args.src, args.dst, args.tag);
555 req_storage.add(request);
557 TRACE_smpi_comm_out(get_pid());
561 void InitAction::kernel(simgrid::xbt::ReplayAction& action)
563 CHECK_ACTION_PARAMS(action, 0, 1)
564 MPI_DEFAULT_TYPE = (action.size() > 2) ? MPI_DOUBLE // default MPE datatype
565 : MPI_BYTE; // default TAU datatype
567 /* start a simulated timer */
568 smpi_process()->simulated_start();
571 void CommunicatorAction::kernel(simgrid::xbt::ReplayAction&)
576 void WaitAllAction::kernel(simgrid::xbt::ReplayAction&)
578 if (req_storage.size() > 0) {
579 std::vector<std::pair</*sender*/ aid_t, /*recv*/ aid_t>> sender_receiver;
580 std::vector<MPI_Request> reqs;
581 req_storage.get_requests(reqs);
582 unsigned long count_requests = reqs.size();
583 TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::CpuTIData("waitall", count_requests));
584 for (auto const& req : reqs) {
585 if (req && (req->flags() & MPI_REQ_RECV)) {
586 sender_receiver.emplace_back(req->src(), req->dst());
589 Request::waitall(count_requests, &(reqs.data())[0], MPI_STATUSES_IGNORE);
590 req_storage.get_store().clear();
592 for (MPI_Request& req : reqs)
593 if (req != MPI_REQUEST_NULL)
594 Request::unref(&req);
596 for (auto const& pair : sender_receiver) {
597 TRACE_smpi_recv(pair.first, pair.second, 0);
599 TRACE_smpi_comm_out(get_pid());
603 void BarrierAction::kernel(simgrid::xbt::ReplayAction&)
605 TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::NoOpTIData("barrier"));
606 colls::barrier(MPI_COMM_WORLD);
607 TRACE_smpi_comm_out(get_pid());
610 void BcastAction::kernel(simgrid::xbt::ReplayAction&)
612 const BcastArgParser& args = get_args();
613 TRACE_smpi_comm_in(get_pid(), "action_bcast",
614 new simgrid::instr::CollTIData("bcast", args.root, -1.0, args.size,
615 0, Datatype::encode(args.datatype1), ""));
617 colls::bcast(send_buffer(args.size * args.datatype1->size()), args.size, args.datatype1, args.root, MPI_COMM_WORLD);
619 TRACE_smpi_comm_out(get_pid());
622 void ReduceAction::kernel(simgrid::xbt::ReplayAction&)
624 const ReduceArgParser& args = get_args();
625 TRACE_smpi_comm_in(get_pid(), "action_reduce",
626 new simgrid::instr::CollTIData("reduce", args.root, args.comp_size,
627 args.comm_size, 0, Datatype::encode(args.datatype1), ""));
629 colls::reduce(send_buffer(args.comm_size * args.datatype1->size()),
630 recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL,
631 args.root, MPI_COMM_WORLD);
632 if (args.comp_size != 0.0)
633 simgrid::s4u::this_actor::exec_init(args.comp_size)
634 ->set_name("computation")
638 TRACE_smpi_comm_out(get_pid());
641 void AllReduceAction::kernel(simgrid::xbt::ReplayAction&)
643 const AllReduceArgParser& args = get_args();
644 TRACE_smpi_comm_in(get_pid(), "action_allreduce",
645 new simgrid::instr::CollTIData("allreduce", -1, args.comp_size, args.comm_size, 0,
646 Datatype::encode(args.datatype1), ""));
648 colls::allreduce(send_buffer(args.comm_size * args.datatype1->size()),
649 recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL,
651 if (args.comp_size != 0.0)
652 simgrid::s4u::this_actor::exec_init(args.comp_size)
653 ->set_name("computation")
657 TRACE_smpi_comm_out(get_pid());
660 void AllToAllAction::kernel(simgrid::xbt::ReplayAction&)
662 const AllToAllArgParser& args = get_args();
663 TRACE_smpi_comm_in(get_pid(), "action_alltoall",
664 new simgrid::instr::CollTIData("alltoall", -1, -1.0, args.send_size, args.recv_size,
665 Datatype::encode(args.datatype1),
666 Datatype::encode(args.datatype2)));
668 colls::alltoall(send_buffer(args.datatype1->size() * args.send_size * args.comm_size), args.send_size, args.datatype1,
669 recv_buffer(args.datatype2->size() * args.recv_size * args.comm_size), args.recv_size, args.datatype2,
672 TRACE_smpi_comm_out(get_pid());
675 void GatherAction::kernel(simgrid::xbt::ReplayAction&)
677 const GatherArgParser& args = get_args();
678 TRACE_smpi_comm_in(get_pid(), get_name().c_str(),
679 new simgrid::instr::CollTIData(get_name(), (get_name() == "gather") ? args.root : -1, -1.0,
680 args.send_size, args.recv_size, Datatype::encode(args.datatype1),
681 Datatype::encode(args.datatype2)));
683 if (get_name() == "gather") {
684 int rank = MPI_COMM_WORLD->rank();
685 colls::gather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
686 (rank == args.root) ? recv_buffer(args.datatype2->size() * args.recv_size * args.comm_size) : nullptr,
687 args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
689 colls::allgather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
690 recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2,
693 TRACE_smpi_comm_out(get_pid());
696 void GatherVAction::kernel(simgrid::xbt::ReplayAction&)
698 int rank = MPI_COMM_WORLD->rank();
699 const GatherVArgParser& args = get_args();
700 TRACE_smpi_comm_in(get_pid(), get_name().c_str(),
701 new simgrid::instr::VarCollTIData(
702 get_name(), (get_name() == "gatherv") ? args.root : -1, args.send_size, nullptr, -1,
703 args.recvcounts, Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
705 if (get_name() == "gatherv") {
706 colls::gatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
707 (rank == args.root) ? recv_buffer(args.recv_size_sum * args.datatype2->size()) : nullptr,
708 args.recvcounts->data(), args.disps.data(), args.datatype2, args.root, MPI_COMM_WORLD);
710 colls::allgatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
711 recv_buffer(args.recv_size_sum * args.datatype2->size()), args.recvcounts->data(),
712 args.disps.data(), args.datatype2, MPI_COMM_WORLD);
715 TRACE_smpi_comm_out(get_pid());
718 void ScatterAction::kernel(simgrid::xbt::ReplayAction&)
720 int rank = MPI_COMM_WORLD->rank();
721 const ScatterArgParser& args = get_args();
722 TRACE_smpi_comm_in(get_pid(), "action_scatter",
723 new simgrid::instr::CollTIData(get_name(), args.root, -1.0, args.send_size, args.recv_size,
724 Datatype::encode(args.datatype1),
725 Datatype::encode(args.datatype2)));
727 colls::scatter(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
728 (rank == args.root) ? recv_buffer(args.recv_size * args.datatype2->size()) : nullptr, args.recv_size,
729 args.datatype2, args.root, MPI_COMM_WORLD);
731 TRACE_smpi_comm_out(get_pid());
734 void ScatterVAction::kernel(simgrid::xbt::ReplayAction&)
736 int rank = MPI_COMM_WORLD->rank();
737 const ScatterVArgParser& args = get_args();
738 TRACE_smpi_comm_in(get_pid(), "action_scatterv",
739 new simgrid::instr::VarCollTIData(get_name(), args.root, -1, args.sendcounts, args.recv_size,
740 nullptr, Datatype::encode(args.datatype1),
741 Datatype::encode(args.datatype2)));
743 colls::scatterv((rank == args.root) ? send_buffer(args.send_size_sum * args.datatype1->size()) : nullptr,
744 args.sendcounts->data(), args.disps.data(), args.datatype1,
745 recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, args.root,
748 TRACE_smpi_comm_out(get_pid());
751 void ReduceScatterAction::kernel(simgrid::xbt::ReplayAction&)
753 const ReduceScatterArgParser& args = get_args();
755 get_pid(), "action_reducescatter",
756 new simgrid::instr::VarCollTIData(get_name(), -1, -1, nullptr, -1, args.recvcounts,
757 /* ugly as we use datatype field to pass computation as string */
758 /* and because of the trick to avoid getting 0.000000 when 0 is given */
759 args.comp_size == 0 ? "0" : std::to_string(args.comp_size),
760 Datatype::encode(args.datatype1)));
762 colls::reduce_scatter(send_buffer(args.recv_size_sum * args.datatype1->size()),
763 recv_buffer(args.recv_size_sum * args.datatype1->size()), args.recvcounts->data(),
764 args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
765 if (args.comp_size != 0.0)
766 simgrid::s4u::this_actor::exec_init(args.comp_size)
767 ->set_name("computation")
770 TRACE_smpi_comm_out(get_pid());
773 void ScanAction::kernel(simgrid::xbt::ReplayAction&)
775 const ScanArgParser& args = get_args();
776 TRACE_smpi_comm_in(get_pid(), "action_scan",
777 new simgrid::instr::CollTIData(get_name(), -1, args.comp_size,
778 args.size, 0, Datatype::encode(args.datatype1), ""));
779 if (get_name() == "scan")
780 colls::scan(send_buffer(args.size * args.datatype1->size()),
781 recv_buffer(args.size * args.datatype1->size()), args.size,
782 args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
784 colls::exscan(send_buffer(args.size * args.datatype1->size()),
785 recv_buffer(args.size * args.datatype1->size()), args.size,
786 args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
788 if (args.comp_size != 0.0)
789 simgrid::s4u::this_actor::exec_init(args.comp_size)
790 ->set_name("computation")
793 TRACE_smpi_comm_out(get_pid());
796 void AllToAllVAction::kernel(simgrid::xbt::ReplayAction&)
798 const AllToAllVArgParser& args = get_args();
799 TRACE_smpi_comm_in(get_pid(), __func__,
800 new simgrid::instr::VarCollTIData(
801 "alltoallv", -1, args.send_size_sum, args.sendcounts, args.recv_size_sum, args.recvcounts,
802 Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
804 colls::alltoallv(send_buffer(args.send_buf_size * args.datatype1->size()), args.sendcounts->data(),
805 args.senddisps.data(), args.datatype1, recv_buffer(args.recv_buf_size * args.datatype2->size()),
806 args.recvcounts->data(), args.recvdisps.data(), args.datatype2, MPI_COMM_WORLD);
808 TRACE_smpi_comm_out(get_pid());
810 } // namespace replay
811 }} // namespace simgrid::smpi
813 static std::unordered_map<aid_t, simgrid::smpi::replay::RequestStorage> storage;
814 /** @brief Only initialize the replay, don't do it for real */
815 void smpi_replay_init(const char* instance_id, int rank, double start_delay_flops)
817 xbt_assert(not smpi_process()->initializing());
819 simgrid::s4u::Actor::self()->set_property("instance_id", instance_id);
820 simgrid::s4u::Actor::self()->set_property("rank", std::to_string(rank));
821 simgrid::smpi::ActorExt::init();
823 smpi_process()->mark_as_initialized();
824 smpi_process()->set_replaying(true);
826 TRACE_smpi_init(simgrid::s4u::this_actor::get_pid(), "smpi_replay_run_init");
827 xbt_replay_action_register("init", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::InitAction().execute(action); });
828 xbt_replay_action_register("finalize", [](simgrid::xbt::ReplayAction const&) { /* nothing to do */ });
829 xbt_replay_action_register("comm_size", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
830 xbt_replay_action_register("comm_split",[](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
831 xbt_replay_action_register("comm_dup", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
832 xbt_replay_action_register("send", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("send", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
833 xbt_replay_action_register("isend", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("isend", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
834 xbt_replay_action_register("recv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("recv", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
835 xbt_replay_action_register("irecv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("irecv", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
836 xbt_replay_action_register("test", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::TestAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
837 xbt_replay_action_register("wait", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
838 xbt_replay_action_register("waitall", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAllAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
839 xbt_replay_action_register("barrier", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BarrierAction().execute(action); });
840 xbt_replay_action_register("bcast", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BcastAction().execute(action); });
841 xbt_replay_action_register("reduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceAction().execute(action); });
842 xbt_replay_action_register("allreduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllReduceAction().execute(action); });
843 xbt_replay_action_register("alltoall", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllAction().execute(action); });
844 xbt_replay_action_register("alltoallv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllVAction().execute(action); });
845 xbt_replay_action_register("gather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("gather").execute(action); });
846 xbt_replay_action_register("scatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterAction().execute(action); });
847 xbt_replay_action_register("gatherv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("gatherv").execute(action); });
848 xbt_replay_action_register("scatterv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterVAction().execute(action); });
849 xbt_replay_action_register("allgather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("allgather").execute(action); });
850 xbt_replay_action_register("allgatherv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("allgatherv").execute(action); });
851 xbt_replay_action_register("reducescatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceScatterAction().execute(action); });
852 xbt_replay_action_register("scan", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScanAction("scan").execute(action); });
853 xbt_replay_action_register("exscan", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScanAction("exscan").execute(action); });
854 xbt_replay_action_register("compute", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ComputeAction().execute(action); });
855 xbt_replay_action_register("sleep", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SleepAction().execute(action); });
856 xbt_replay_action_register("location", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::LocationAction().execute(action); });
858 //if we have a delayed start, sleep here.
859 if (start_delay_flops > 0) {
860 XBT_VERB("Delayed start for instance - Sleeping for %f flops ", start_delay_flops);
861 private_execute_flops(start_delay_flops);
863 // Wait for the other actors to initialize also
864 simgrid::s4u::this_actor::yield();
866 if(_smpi_init_sleep > 0)
867 simgrid::s4u::this_actor::sleep_for(_smpi_init_sleep);
870 /** @brief actually run the replay after initialization */
871 void smpi_replay_main(int rank, const char* private_trace_filename)
873 static int active_processes = 0;
875 storage[simgrid::s4u::this_actor::get_pid()] = simgrid::smpi::replay::RequestStorage();
876 std::string rank_string = std::to_string(rank);
877 simgrid::xbt::replay_runner(rank_string.c_str(), private_trace_filename);
879 /* and now, finalize everything */
880 /* One active process will stop. Decrease the counter*/
881 unsigned int count_requests = storage[simgrid::s4u::this_actor::get_pid()].size();
882 XBT_DEBUG("There are %u elements in reqq[*]", count_requests);
883 if (count_requests > 0) {
884 std::vector<MPI_Request> requests(count_requests);
887 for (auto const& pair : storage[simgrid::s4u::this_actor::get_pid()].get_store()) {
888 for (auto& req: pair.second){
893 simgrid::smpi::Request::waitall(count_requests, requests.data(), MPI_STATUSES_IGNORE);
896 if(simgrid::config::get_value<bool>("smpi/finalization-barrier"))
897 simgrid::smpi::colls::barrier(MPI_COMM_WORLD);
901 if(active_processes==0){
902 /* Last process alive speaking: end the simulated timer */
903 XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
904 smpi_free_replay_tmp_buffers();
907 TRACE_smpi_comm_in(simgrid::s4u::this_actor::get_pid(), "smpi_replay_run_finalize",
908 new simgrid::instr::NoOpTIData("finalize"));
910 smpi_process()->finalize();
912 TRACE_smpi_comm_out(simgrid::s4u::this_actor::get_pid());
915 /** @brief chain a replay initialization and a replay start */
916 void smpi_replay_run(const char* instance_id, int rank, double start_delay_flops, const char* private_trace_filename)
918 smpi_replay_init(instance_id, rank, start_delay_flops);
919 smpi_replay_main(rank, private_trace_filename);