1 /* Copyright (c) 2009-2022. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
6 #include "smpi_replay.hpp"
7 #include "simgrid/s4u/Exec.hpp"
8 #include "smpi_coll.hpp"
9 #include "smpi_comm.hpp"
10 #include "smpi_config.hpp"
11 #include "smpi_datatype.hpp"
12 #include "smpi_group.hpp"
13 #include "smpi_request.hpp"
14 #include "src/smpi/include/private.hpp"
15 #include "xbt/replay.hpp"
23 #include <unordered_map>
26 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay, smpi, "Trace Replay with SMPI");
27 // From https://stackoverflow.com/questions/7110301/generic-hash-for-tuples-in-unordered-map-unordered-set
28 // This is all just to make std::unordered_map work with std::tuple. If we need this in other places,
29 // this could go into a header file.
namespace hash_tuple {

/** Fallback hasher: forwards to std::hash for every type that is not a std::tuple. */
template <typename TT> class hash {
public:
  size_t operator()(TT const& tt) const { return std::hash<TT>()(tt); }
};

/** Mixes the hash of @p v into the running @p seed. */
template <class T> inline void hash_combine(std::size_t& seed, T const& v)
{
  seed ^= hash_tuple::hash<T>()(v) + 0x9e3779b9 + (seed << 6) + (seed >> 2);
}

// Recursive template code derived from Matthieu M.
/** Combines tuple elements 0..Index (in that order) into the seed. */
template <class Tuple, size_t Index = std::tuple_size<Tuple>::value - 1> class HashValueImpl {
public:
  static void apply(size_t& seed, Tuple const& tuple)
  {
    // Lower indices first, so that element order influences the result
    HashValueImpl<Tuple, Index - 1>::apply(seed, tuple);
    hash_combine(seed, std::get<Index>(tuple));
  }
};

/** Recursion anchor: only element 0 is left to combine. */
template <class Tuple> class HashValueImpl<Tuple, 0> {
public:
  static void apply(size_t& seed, Tuple const& tuple) { hash_combine(seed, std::get<0>(tuple)); }
};

/** Hasher for std::tuple: starts from a zero seed and folds every element into it. */
template <typename... TT> class hash<std::tuple<TT...>> {
public:
  size_t operator()(std::tuple<TT...> const& tt) const
  {
    size_t seed = 0;
    HashValueImpl<std::tuple<TT...>>::apply(seed, tt);
    return seed;
  }
};
} // namespace hash_tuple
67 void log_timed_action(const simgrid::xbt::ReplayAction& action, double clock)
69 if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
70 std::string s = boost::algorithm::join(action, " ");
71 XBT_VERB("%s %f", s.c_str(), smpi_process()->simulated_elapsed() - clock);
75 /* Helper functions */
76 static double parse_double(const std::string& string)
78 return xbt_str_parse_double(string.c_str(), "not a double");
81 template <typename T> static T parse_integer(const std::string& string)
83 double val = trunc(xbt_str_parse_double(string.c_str(), "not a double"));
84 xbt_assert(static_cast<double>(std::numeric_limits<T>::min()) <= val &&
85 val <= static_cast<double>(std::numeric_limits<T>::max()),
86 "out of range: %g", val);
87 return static_cast<T>(val);
90 static int parse_root(const simgrid::xbt::ReplayAction& action, unsigned i)
92 return i < action.size() ? std::stoi(action[i]) : 0;
95 static MPI_Datatype parse_datatype(const simgrid::xbt::ReplayAction& action, unsigned i)
97 return i < action.size() ? simgrid::smpi::Datatype::decode(action[i]) : simgrid::smpi::replay::MPI_DEFAULT_TYPE;
104 MPI_Datatype MPI_DEFAULT_TYPE;
106 class RequestStorage {
108 using req_key_t = std::tuple</*sender*/ int, /* receiver */ int, /* tag */ int>;
109 using req_storage_t = std::unordered_map<req_key_t, std::list<MPI_Request>, hash_tuple::hash<std::tuple<int, int, int>>>;
114 RequestStorage() = default;
115 size_t size() const { return store.size(); }
117 req_storage_t& get_store() { return store; }
119 void get_requests(std::vector<MPI_Request>& vec) const
121 for (auto const& pair : store) {
122 auto& reqs = pair.second;
123 aid_t my_proc_id = simgrid::s4u::this_actor::get_pid();
124 for (auto& req: reqs){
125 if (req != MPI_REQUEST_NULL && (req->src() == my_proc_id || req->dst() == my_proc_id)) {
127 req->print_request("MM");
133 MPI_Request pop(int src, int dst, int tag)
135 auto it = store.find(req_key_t(src, dst, tag));
136 if (it == store.end())
137 return MPI_REQUEST_NULL;
138 MPI_Request req = it->second.front();
139 it->second.pop_front();
140 if(it->second.empty())
141 store.erase(req_key_t(src, dst, tag));
145 void add(MPI_Request req)
147 if (req != MPI_REQUEST_NULL){ // Can and does happen in the case of TestAction
148 auto it = store.find(req_key_t(req->src()-1, req->dst()-1, req->tag()));
149 if (it == store.end())
150 store.insert({req_key_t(req->src()-1, req->dst()-1, req->tag()), std::list<MPI_Request>()});
151 store[req_key_t(req->src()-1, req->dst()-1, req->tag())].push_back(req);
155 /* Sometimes we need to re-insert MPI_REQUEST_NULL but we still need src,dst and tag */
156 void addNullRequest(int src, int dst, int tag)
158 int src_pid = MPI_COMM_WORLD->group()->actor(src) - 1;
159 int dest_pid = MPI_COMM_WORLD->group()->actor(dst) - 1;
160 auto it = store.find(req_key_t(src_pid, dest_pid, tag));
161 if (it == store.end())
162 store.insert({req_key_t(src_pid, dest_pid, tag), std::list<MPI_Request>()});
163 store[req_key_t(src_pid, dest_pid, tag)].push_back(MPI_REQUEST_NULL);
167 void WaitTestParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
169 CHECK_ACTION_PARAMS(action, 3, 0)
170 src = std::stoi(action[2]);
171 dst = std::stoi(action[3]);
172 tag = std::stoi(action[4]);
175 void SendRecvParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
177 CHECK_ACTION_PARAMS(action, 3, 1)
178 partner = std::stoi(action[2]);
179 tag = std::stoi(action[3]);
180 size = parse_integer<size_t>(action[4]);
181 datatype1 = parse_datatype(action, 5);
184 void ComputeParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
186 CHECK_ACTION_PARAMS(action, 1, 0)
187 flops = parse_double(action[2]);
190 void SleepParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
192 CHECK_ACTION_PARAMS(action, 1, 0)
193 time = parse_double(action[2]);
196 void LocationParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
198 CHECK_ACTION_PARAMS(action, 2, 0)
199 filename = std::string(action[2]);
200 line = std::stoi(action[3]);
203 void BcastArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
205 CHECK_ACTION_PARAMS(action, 1, 2)
206 size = parse_integer<size_t>(action[2]);
207 root = parse_root(action, 3);
208 datatype1 = parse_datatype(action, 4);
211 void ReduceArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
213 CHECK_ACTION_PARAMS(action, 2, 2)
214 comm_size = parse_integer<unsigned>(action[2]);
215 comp_size = parse_double(action[3]);
216 root = parse_root(action, 4);
217 datatype1 = parse_datatype(action, 5);
220 void AllReduceArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
222 CHECK_ACTION_PARAMS(action, 2, 1)
223 comm_size = parse_integer<unsigned>(action[2]);
224 comp_size = parse_double(action[3]);
225 datatype1 = parse_datatype(action, 4);
228 void AllToAllArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
230 CHECK_ACTION_PARAMS(action, 2, 1)
231 comm_size = MPI_COMM_WORLD->size();
232 send_size = parse_integer<int>(action[2]);
233 recv_size = parse_integer<int>(action[3]);
234 datatype1 = parse_datatype(action, 4);
235 datatype2 = parse_datatype(action, 5);
238 void GatherArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string& name)
240 /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
243 1) 68 is the sendcounts
244 2) 68 is the recvcounts
245 3) 0 is the root node
246 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
247 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
249 CHECK_ACTION_PARAMS(action, 2, 3)
250 comm_size = MPI_COMM_WORLD->size();
251 send_size = parse_integer<int>(action[2]);
252 recv_size = parse_integer<int>(action[3]);
254 if (name == "gather") {
255 root = parse_root(action, 4);
256 datatype1 = parse_datatype(action, 5);
257 datatype2 = parse_datatype(action, 6);
260 datatype1 = parse_datatype(action, 4);
261 datatype2 = parse_datatype(action, 5);
265 void GatherVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string& name)
267 /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
268 0 gather 68 68 10 10 10 0 0 0
270 1) 68 is the sendcount
271 2) 68 10 10 10 is the recvcounts
272 3) 0 is the root node
273 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
274 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
276 comm_size = MPI_COMM_WORLD->size();
277 CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
278 send_size = parse_integer<int>(action[2]);
279 disps = std::vector<int>(comm_size, 0);
280 recvcounts = std::make_shared<std::vector<int>>(comm_size);
282 if (name == "gatherv") {
283 root = parse_root(action, 3 + comm_size);
284 datatype1 = parse_datatype(action, 4 + comm_size);
285 datatype2 = parse_datatype(action, 5 + comm_size);
288 unsigned disp_index = 0;
289 /* The 3 comes from "0 gather <sendcount>", which must always be present.
290 * The + comm_size is the recvcounts array, which must also be present
292 if (action.size() > 3 + comm_size + comm_size) {
293 // datatype + disp are specified
294 datatype1 = parse_datatype(action, 3 + comm_size);
295 datatype2 = parse_datatype(action, 4 + comm_size);
296 disp_index = 5 + comm_size;
297 } else if (action.size() > 3 + comm_size + 2) {
298 // disps specified; datatype is not specified; use the default one
299 datatype1 = MPI_DEFAULT_TYPE;
300 datatype2 = MPI_DEFAULT_TYPE;
301 disp_index = 3 + comm_size;
303 // no disp specified, maybe only datatype,
304 datatype1 = parse_datatype(action, 3 + comm_size);
305 datatype2 = parse_datatype(action, 4 + comm_size);
308 if (disp_index != 0) {
309 xbt_assert(disp_index + comm_size <= action.size());
310 for (unsigned i = 0; i < comm_size; i++)
311 disps[i] = std::stoi(action[disp_index + i]);
315 for (unsigned int i = 0; i < comm_size; i++) {
316 (*recvcounts)[i] = std::stoi(action[i + 3]);
318 recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
321 void ScatterArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
323 /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
326 1) 68 is the sendcounts
327 2) 68 is the recvcounts
328 3) 0 is the root node
329 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
330 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
332 comm_size = MPI_COMM_WORLD->size();
333 CHECK_ACTION_PARAMS(action, 2, 3)
334 comm_size = MPI_COMM_WORLD->size();
335 send_size = parse_integer<int>(action[2]);
336 recv_size = parse_integer<int>(action[3]);
337 root = parse_root(action, 4);
338 datatype1 = parse_datatype(action, 5);
339 datatype2 = parse_datatype(action, 6);
342 void ScatterVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
344 /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
345 0 gather 68 10 10 10 68 0 0 0
347 1) 68 10 10 10 is the sendcounts
348 2) 68 is the recvcount
349 3) 0 is the root node
350 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
351 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
353 comm_size = MPI_COMM_WORLD->size();
354 CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
355 recv_size = parse_integer<int>(action[2 + comm_size]);
356 disps = std::vector<int>(comm_size, 0);
357 sendcounts = std::make_shared<std::vector<int>>(comm_size);
359 root = parse_root(action, 3 + comm_size);
360 datatype1 = parse_datatype(action, 4 + comm_size);
361 datatype2 = parse_datatype(action, 5 + comm_size);
363 for (unsigned int i = 0; i < comm_size; i++) {
364 (*sendcounts)[i] = std::stoi(action[i + 2]);
366 send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
369 void ReduceScatterArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
371 /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
372 0 reducescatter 275427 275427 275427 204020 11346849 0
374 1) The first four values after the name of the action declare the recvcounts array
375 2) The value 11346849 is the amount of instructions
376 3) The last value corresponds to the datatype, see simgrid::smpi::Datatype::decode().
378 comm_size = MPI_COMM_WORLD->size();
379 CHECK_ACTION_PARAMS(action, comm_size + 1, 1)
380 comp_size = parse_double(action[2 + comm_size]);
381 recvcounts = std::make_shared<std::vector<int>>(comm_size);
382 datatype1 = parse_datatype(action, 3 + comm_size);
384 for (unsigned int i = 0; i < comm_size; i++) {
385 (*recvcounts)[i]= std::stoi(action[i + 2]);
387 recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
390 void ScanArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
392 CHECK_ACTION_PARAMS(action, 2, 1)
393 size = parse_integer<size_t>(action[2]);
394 comp_size = parse_double(action[3]);
395 datatype1 = parse_datatype(action, 4);
398 void AllToAllVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
400 /* The structure of the alltoallv action for the rank 0 (total 4 processes) is the following:
401 0 alltoallv 100 1 7 10 12 100 1 70 10 5
403 1) 100 is the size of the send buffer *sizeof(int),
404 2) 1 7 10 12 is the sendcounts array
405 3) 100*sizeof(int) is the size of the receiver buffer
406 4) 1 70 10 5 is the recvcounts array
408 comm_size = MPI_COMM_WORLD->size();
409 CHECK_ACTION_PARAMS(action, 2 * comm_size + 2, 2)
410 sendcounts = std::make_shared<std::vector<int>>(comm_size);
411 recvcounts = std::make_shared<std::vector<int>>(comm_size);
412 senddisps = std::vector<int>(comm_size, 0);
413 recvdisps = std::vector<int>(comm_size, 0);
415 datatype1 = parse_datatype(action, 4 + 2 * comm_size);
416 datatype2 = parse_datatype(action, 5 + 2 * comm_size);
418 send_buf_size = parse_integer<int>(action[2]);
419 recv_buf_size = parse_integer<int>(action[3 + comm_size]);
420 for (unsigned int i = 0; i < comm_size; i++) {
421 (*sendcounts)[i] = std::stoi(action[3 + i]);
422 (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
424 send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
425 recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
428 void WaitAction::kernel(simgrid::xbt::ReplayAction& action)
430 std::string s = boost::algorithm::join(action, " ");
431 xbt_assert(req_storage.size(), "action wait not preceded by any irecv or isend: %s", s.c_str());
432 const WaitTestParser& args = get_args();
433 MPI_Request request = req_storage.pop(args.src, args.dst, args.tag);
435 if (request == MPI_REQUEST_NULL) {
436 /* Assume that the trace is well formed, meaning the comm might have been caught by an MPI_test. Then just
441 // Must be taken before Request::wait() since the request may be set to
442 // MPI_REQUEST_NULL by Request::wait!
443 bool is_wait_for_receive = (request->flags() & MPI_REQ_RECV);
445 TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::WaitTIData("wait", args.src, args.dst, args.tag));
448 Request::wait(&request, &status);
449 if(request!=MPI_REQUEST_NULL)
450 Request::unref(&request);
451 TRACE_smpi_comm_out(get_pid());
452 if (is_wait_for_receive)
453 TRACE_smpi_recv(MPI_COMM_WORLD->group()->actor(args.src), MPI_COMM_WORLD->group()->actor(args.dst), args.tag);
456 void SendAction::kernel(simgrid::xbt::ReplayAction&)
458 const SendRecvParser& args = get_args();
459 aid_t dst_traced = MPI_COMM_WORLD->group()->actor(args.partner);
463 new simgrid::instr::Pt2PtTIData(get_name(), args.partner, args.size, args.tag, Datatype::encode(args.datatype1)));
464 if (not TRACE_smpi_view_internals())
465 TRACE_smpi_send(get_pid(), get_pid(), dst_traced, args.tag, args.size * args.datatype1->size());
467 if (get_name() == "send") {
468 Request::send(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
469 } else if (get_name() == "isend") {
470 MPI_Request request = Request::isend(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
471 req_storage.add(request);
473 xbt_die("Don't know this action, %s", get_name().c_str());
476 TRACE_smpi_comm_out(get_pid());
479 void RecvAction::kernel(simgrid::xbt::ReplayAction&)
481 const SendRecvParser& args = get_args();
484 new simgrid::instr::Pt2PtTIData(get_name(), args.partner, args.size, args.tag, Datatype::encode(args.datatype1)));
487 // unknown size from the receiver point of view
488 size_t arg_size = args.size;
490 Request::probe(args.partner, args.tag, MPI_COMM_WORLD, &status);
491 arg_size = status.count;
494 bool is_recv = false; // Help analyzers understanding that status is not used uninitialized
495 if (get_name() == "recv") {
497 Request::recv(nullptr, arg_size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD, &status);
498 } else if (get_name() == "irecv") {
499 MPI_Request request = Request::irecv(nullptr, arg_size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
500 req_storage.add(request);
505 TRACE_smpi_comm_out(get_pid());
506 if (is_recv && not TRACE_smpi_view_internals()) {
507 aid_t src_traced = MPI_COMM_WORLD->group()->actor(status.MPI_SOURCE);
508 TRACE_smpi_recv(src_traced, get_pid(), args.tag);
512 void ComputeAction::kernel(simgrid::xbt::ReplayAction&)
514 const ComputeParser& args = get_args();
515 if (smpi_cfg_simulate_computation()) {
516 smpi_execute_flops(args.flops/smpi_adjust_comp_speed());
520 void SleepAction::kernel(simgrid::xbt::ReplayAction&)
522 const SleepParser& args = get_args();
523 XBT_DEBUG("Sleep for: %lf secs", args.time);
524 aid_t pid = simgrid::s4u::this_actor::get_pid();
525 TRACE_smpi_sleeping_in(pid, args.time);
526 simgrid::s4u::this_actor::sleep_for(args.time/smpi_adjust_comp_speed());
527 TRACE_smpi_sleeping_out(pid);
530 void LocationAction::kernel(simgrid::xbt::ReplayAction&)
532 const LocationParser& args = get_args();
533 smpi_trace_set_call_location(args.filename.c_str(), args.line);
536 void TestAction::kernel(simgrid::xbt::ReplayAction&)
538 const WaitTestParser& args = get_args();
539 MPI_Request request = req_storage.pop(args.src, args.dst, args.tag);
540 // if request is null here, this may mean that a previous test has succeeded
541 // Different times in traced application and replayed version may lead to this
542 // In this case, ignore the extra calls.
543 if (request != MPI_REQUEST_NULL) {
544 TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::WaitTIData("test", args.src, args.dst, args.tag));
548 Request::test(&request, &status, &flag);
550 XBT_DEBUG("MPI_Test result: %d", flag);
551 /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now
553 if (request == MPI_REQUEST_NULL)
554 req_storage.addNullRequest(args.src, args.dst, args.tag);
556 req_storage.add(request);
558 TRACE_smpi_comm_out(get_pid());
562 void InitAction::kernel(simgrid::xbt::ReplayAction& action)
564 CHECK_ACTION_PARAMS(action, 0, 1)
565 MPI_DEFAULT_TYPE = (action.size() > 2) ? MPI_DOUBLE // default MPE datatype
566 : MPI_BYTE; // default TAU datatype
568 /* start a simulated timer */
569 smpi_process()->simulated_start();
572 void CommunicatorAction::kernel(simgrid::xbt::ReplayAction&)
577 void WaitAllAction::kernel(simgrid::xbt::ReplayAction&)
579 if (req_storage.size() > 0) {
580 std::vector<std::pair</*sender*/ aid_t, /*recv*/ aid_t>> sender_receiver;
581 std::vector<MPI_Request> reqs;
582 req_storage.get_requests(reqs);
583 unsigned long count_requests = reqs.size();
584 TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::CpuTIData("waitall", count_requests));
585 for (auto const& req : reqs) {
586 if (req && (req->flags() & MPI_REQ_RECV)) {
587 sender_receiver.emplace_back(req->src(), req->dst());
590 Request::waitall(count_requests, &(reqs.data())[0], MPI_STATUSES_IGNORE);
591 req_storage.get_store().clear();
593 for (MPI_Request& req : reqs)
594 if (req != MPI_REQUEST_NULL)
595 Request::unref(&req);
597 for (auto const& pair : sender_receiver) {
598 TRACE_smpi_recv(pair.first, pair.second, 0);
600 TRACE_smpi_comm_out(get_pid());
604 void BarrierAction::kernel(simgrid::xbt::ReplayAction&)
606 TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::NoOpTIData("barrier"));
607 colls::barrier(MPI_COMM_WORLD);
608 TRACE_smpi_comm_out(get_pid());
611 void BcastAction::kernel(simgrid::xbt::ReplayAction&)
613 const BcastArgParser& args = get_args();
614 TRACE_smpi_comm_in(get_pid(), "action_bcast",
615 new simgrid::instr::CollTIData("bcast", args.root, -1.0, args.size,
616 0, Datatype::encode(args.datatype1), ""));
618 colls::bcast(send_buffer(args.size * args.datatype1->size()), args.size, args.datatype1, args.root, MPI_COMM_WORLD);
620 TRACE_smpi_comm_out(get_pid());
623 void ReduceAction::kernel(simgrid::xbt::ReplayAction&)
625 const ReduceArgParser& args = get_args();
626 TRACE_smpi_comm_in(get_pid(), "action_reduce",
627 new simgrid::instr::CollTIData("reduce", args.root, args.comp_size,
628 args.comm_size, 0, Datatype::encode(args.datatype1), ""));
630 colls::reduce(send_buffer(args.comm_size * args.datatype1->size()),
631 recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL,
632 args.root, MPI_COMM_WORLD);
633 if (args.comp_size != 0.0)
634 simgrid::s4u::this_actor::exec_init(args.comp_size)
635 ->set_name("computation")
639 TRACE_smpi_comm_out(get_pid());
642 void AllReduceAction::kernel(simgrid::xbt::ReplayAction&)
644 const AllReduceArgParser& args = get_args();
645 TRACE_smpi_comm_in(get_pid(), "action_allreduce",
646 new simgrid::instr::CollTIData("allreduce", -1, args.comp_size, args.comm_size, 0,
647 Datatype::encode(args.datatype1), ""));
649 colls::allreduce(send_buffer(args.comm_size * args.datatype1->size()),
650 recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL,
652 if (args.comp_size != 0.0)
653 simgrid::s4u::this_actor::exec_init(args.comp_size)
654 ->set_name("computation")
658 TRACE_smpi_comm_out(get_pid());
661 void AllToAllAction::kernel(simgrid::xbt::ReplayAction&)
663 const AllToAllArgParser& args = get_args();
664 TRACE_smpi_comm_in(get_pid(), "action_alltoall",
665 new simgrid::instr::CollTIData("alltoall", -1, -1.0, args.send_size, args.recv_size,
666 Datatype::encode(args.datatype1),
667 Datatype::encode(args.datatype2)));
669 colls::alltoall(send_buffer(args.datatype1->size() * args.send_size * args.comm_size), args.send_size, args.datatype1,
670 recv_buffer(args.datatype2->size() * args.recv_size * args.comm_size), args.recv_size, args.datatype2,
673 TRACE_smpi_comm_out(get_pid());
676 void GatherAction::kernel(simgrid::xbt::ReplayAction&)
678 const GatherArgParser& args = get_args();
679 TRACE_smpi_comm_in(get_pid(), get_name().c_str(),
680 new simgrid::instr::CollTIData(get_name(), (get_name() == "gather") ? args.root : -1, -1.0,
681 args.send_size, args.recv_size, Datatype::encode(args.datatype1),
682 Datatype::encode(args.datatype2)));
684 if (get_name() == "gather") {
685 int rank = MPI_COMM_WORLD->rank();
686 colls::gather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
687 (rank == args.root) ? recv_buffer(args.datatype2->size() * args.recv_size * args.comm_size) : nullptr,
688 args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
690 colls::allgather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
691 recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2,
694 TRACE_smpi_comm_out(get_pid());
697 void GatherVAction::kernel(simgrid::xbt::ReplayAction&)
699 int rank = MPI_COMM_WORLD->rank();
700 const GatherVArgParser& args = get_args();
701 TRACE_smpi_comm_in(get_pid(), get_name().c_str(),
702 new simgrid::instr::VarCollTIData(
703 get_name(), (get_name() == "gatherv") ? args.root : -1, args.send_size, nullptr, -1,
704 args.recvcounts, Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
706 if (get_name() == "gatherv") {
707 colls::gatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
708 (rank == args.root) ? recv_buffer(args.recv_size_sum * args.datatype2->size()) : nullptr,
709 args.recvcounts->data(), args.disps.data(), args.datatype2, args.root, MPI_COMM_WORLD);
711 colls::allgatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
712 recv_buffer(args.recv_size_sum * args.datatype2->size()), args.recvcounts->data(),
713 args.disps.data(), args.datatype2, MPI_COMM_WORLD);
716 TRACE_smpi_comm_out(get_pid());
719 void ScatterAction::kernel(simgrid::xbt::ReplayAction&)
721 int rank = MPI_COMM_WORLD->rank();
722 const ScatterArgParser& args = get_args();
723 TRACE_smpi_comm_in(get_pid(), "action_scatter",
724 new simgrid::instr::CollTIData(get_name(), args.root, -1.0, args.send_size, args.recv_size,
725 Datatype::encode(args.datatype1),
726 Datatype::encode(args.datatype2)));
728 colls::scatter(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
729 (rank == args.root) ? recv_buffer(args.recv_size * args.datatype2->size()) : nullptr, args.recv_size,
730 args.datatype2, args.root, MPI_COMM_WORLD);
732 TRACE_smpi_comm_out(get_pid());
735 void ScatterVAction::kernel(simgrid::xbt::ReplayAction&)
737 int rank = MPI_COMM_WORLD->rank();
738 const ScatterVArgParser& args = get_args();
739 TRACE_smpi_comm_in(get_pid(), "action_scatterv",
740 new simgrid::instr::VarCollTIData(get_name(), args.root, -1, args.sendcounts, args.recv_size,
741 nullptr, Datatype::encode(args.datatype1),
742 Datatype::encode(args.datatype2)));
744 colls::scatterv((rank == args.root) ? send_buffer(args.send_size_sum * args.datatype1->size()) : nullptr,
745 args.sendcounts->data(), args.disps.data(), args.datatype1,
746 recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, args.root,
749 TRACE_smpi_comm_out(get_pid());
752 void ReduceScatterAction::kernel(simgrid::xbt::ReplayAction&)
754 const ReduceScatterArgParser& args = get_args();
756 get_pid(), "action_reducescatter",
757 new simgrid::instr::VarCollTIData(get_name(), -1, -1, nullptr, -1, args.recvcounts,
758 /* ugly as we use datatype field to pass computation as string */
759 /* and because of the trick to avoid getting 0.000000 when 0 is given */
760 args.comp_size == 0 ? "0" : std::to_string(args.comp_size),
761 Datatype::encode(args.datatype1)));
763 colls::reduce_scatter(send_buffer(args.recv_size_sum * args.datatype1->size()),
764 recv_buffer(args.recv_size_sum * args.datatype1->size()), args.recvcounts->data(),
765 args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
766 if (args.comp_size != 0.0)
767 simgrid::s4u::this_actor::exec_init(args.comp_size)
768 ->set_name("computation")
771 TRACE_smpi_comm_out(get_pid());
774 void ScanAction::kernel(simgrid::xbt::ReplayAction&)
776 const ScanArgParser& args = get_args();
777 TRACE_smpi_comm_in(get_pid(), "action_scan",
778 new simgrid::instr::CollTIData(get_name(), -1, args.comp_size,
779 args.size, 0, Datatype::encode(args.datatype1), ""));
780 if (get_name() == "scan")
781 colls::scan(send_buffer(args.size * args.datatype1->size()),
782 recv_buffer(args.size * args.datatype1->size()), args.size,
783 args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
785 colls::exscan(send_buffer(args.size * args.datatype1->size()),
786 recv_buffer(args.size * args.datatype1->size()), args.size,
787 args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
789 if (args.comp_size != 0.0)
790 simgrid::s4u::this_actor::exec_init(args.comp_size)
791 ->set_name("computation")
794 TRACE_smpi_comm_out(get_pid());
797 void AllToAllVAction::kernel(simgrid::xbt::ReplayAction&)
799 const AllToAllVArgParser& args = get_args();
800 TRACE_smpi_comm_in(get_pid(), __func__,
801 new simgrid::instr::VarCollTIData(
802 "alltoallv", -1, args.send_size_sum, args.sendcounts, args.recv_size_sum, args.recvcounts,
803 Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
805 colls::alltoallv(send_buffer(args.send_buf_size * args.datatype1->size()), args.sendcounts->data(),
806 args.senddisps.data(), args.datatype1, recv_buffer(args.recv_buf_size * args.datatype2->size()),
807 args.recvcounts->data(), args.recvdisps.data(), args.datatype2, MPI_COMM_WORLD);
809 TRACE_smpi_comm_out(get_pid());
811 } // Replay Namespace
812 }} // namespace simgrid::smpi
814 static std::unordered_map<aid_t, simgrid::smpi::replay::RequestStorage> storage;
815 /** @brief Only initialize the replay, don't do it for real */
816 void smpi_replay_init(const char* instance_id, int rank, double start_delay_flops)
818 xbt_assert(not smpi_process()->initializing());
820 simgrid::s4u::Actor::self()->set_property("instance_id", instance_id);
821 simgrid::s4u::Actor::self()->set_property("rank", std::to_string(rank));
822 simgrid::smpi::ActorExt::init();
824 smpi_process()->mark_as_initialized();
825 smpi_process()->set_replaying(true);
827 TRACE_smpi_init(simgrid::s4u::this_actor::get_pid(), "smpi_replay_run_init");
828 xbt_replay_action_register("init", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::InitAction().execute(action); });
829 xbt_replay_action_register("finalize", [](simgrid::xbt::ReplayAction const&) { /* nothing to do */ });
830 xbt_replay_action_register("comm_size", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
831 xbt_replay_action_register("comm_split",[](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
832 xbt_replay_action_register("comm_dup", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
833 xbt_replay_action_register("send", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("send", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
834 xbt_replay_action_register("isend", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("isend", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
835 xbt_replay_action_register("recv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("recv", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
836 xbt_replay_action_register("irecv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("irecv", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
837 xbt_replay_action_register("test", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::TestAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
838 xbt_replay_action_register("wait", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
839 xbt_replay_action_register("waitall", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAllAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
840 xbt_replay_action_register("barrier", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BarrierAction().execute(action); });
841 xbt_replay_action_register("bcast", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BcastAction().execute(action); });
842 xbt_replay_action_register("reduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceAction().execute(action); });
843 xbt_replay_action_register("allreduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllReduceAction().execute(action); });
844 xbt_replay_action_register("alltoall", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllAction().execute(action); });
845 xbt_replay_action_register("alltoallv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllVAction().execute(action); });
846 xbt_replay_action_register("gather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("gather").execute(action); });
847 xbt_replay_action_register("scatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterAction().execute(action); });
848 xbt_replay_action_register("gatherv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("gatherv").execute(action); });
849 xbt_replay_action_register("scatterv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterVAction().execute(action); });
850 xbt_replay_action_register("allgather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("allgather").execute(action); });
851 xbt_replay_action_register("allgatherv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("allgatherv").execute(action); });
852 xbt_replay_action_register("reducescatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceScatterAction().execute(action); });
853 xbt_replay_action_register("scan", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScanAction("scan").execute(action); });
854 xbt_replay_action_register("exscan", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScanAction("exscan").execute(action); });
855 xbt_replay_action_register("compute", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ComputeAction().execute(action); });
856 xbt_replay_action_register("sleep", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SleepAction().execute(action); });
857 xbt_replay_action_register("location", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::LocationAction().execute(action); });
859 //if we have a delayed start, sleep here.
860 if (start_delay_flops > 0) {
861 XBT_VERB("Delayed start for instance - Sleeping for %f flops ", start_delay_flops);
862 private_execute_flops(start_delay_flops);
864 // Wait for the other actors to initialize also
865 simgrid::s4u::this_actor::yield();
867 if(_smpi_init_sleep > 0)
868 simgrid::s4u::this_actor::sleep_for(_smpi_init_sleep);
871 /** @brief actually run the replay after initialization */
872 void smpi_replay_main(int rank, const char* private_trace_filename)
874 static int active_processes = 0;
876 storage[simgrid::s4u::this_actor::get_pid()] = simgrid::smpi::replay::RequestStorage();
877 std::string rank_string = std::to_string(rank);
878 simgrid::xbt::replay_runner(rank_string.c_str(), private_trace_filename);
880 /* and now, finalize everything */
881 /* One active process will stop. Decrease the counter*/
882 unsigned int count_requests = storage[simgrid::s4u::this_actor::get_pid()].size();
883 XBT_DEBUG("There are %u elements in reqq[*]", count_requests);
884 if (count_requests > 0) {
885 std::vector<MPI_Request> requests(count_requests);
888 for (auto const& pair : storage[simgrid::s4u::this_actor::get_pid()].get_store()) {
889 for (auto& req: pair.second){
894 simgrid::smpi::Request::waitall(count_requests, requests.data(), MPI_STATUSES_IGNORE);
897 if(simgrid::config::get_value<bool>("smpi/finalization-barrier"))
898 simgrid::smpi::colls::barrier(MPI_COMM_WORLD);
902 if(active_processes==0){
903 /* Last process alive speaking: end the simulated timer */
904 XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
905 smpi_free_replay_tmp_buffers();
908 TRACE_smpi_comm_in(simgrid::s4u::this_actor::get_pid(), "smpi_replay_run_finalize",
909 new simgrid::instr::NoOpTIData("finalize"));
911 smpi_process()->finalize();
913 TRACE_smpi_comm_out(simgrid::s4u::this_actor::get_pid());
916 /** @brief chain a replay initialization and a replay start */
917 void smpi_replay_run(const char* instance_id, int rank, double start_delay_flops, const char* private_trace_filename)
919 smpi_replay_init(instance_id, rank, start_delay_flops);
920 smpi_replay_main(rank, private_trace_filename);