1 /* Copyright (c) 2009-2022. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
6 #include "smpi_replay.hpp"
7 #include "simgrid/s4u/Exec.hpp"
8 #include "smpi_coll.hpp"
9 #include "smpi_comm.hpp"
10 #include "smpi_config.hpp"
11 #include "smpi_datatype.hpp"
12 #include "smpi_group.hpp"
13 #include "smpi_request.hpp"
14 #include "src/smpi/include/private.hpp"
15 #include "xbt/replay.hpp"
23 #include <unordered_map>
26 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay, smpi, "Trace Replay with SMPI");
27 // From https://stackoverflow.com/questions/7110301/generic-hash-for-tuples-in-unordered-map-unordered-set
28 // This is all just to make std::unordered_map work with std::tuple. If we need this in other places,
29 // this could go into a header file.
namespace hash_tuple {
/** Generic hasher: for any non-tuple type, simply forwards to std::hash. */
template <typename TT> class hash {
public:
  size_t operator()(TT const& tt) const { return std::hash<TT>()(tt); }
};

/** Mix the hash of @p v into @p seed (same mixing formula as boost::hash_combine). */
template <class T> inline void hash_combine(std::size_t& seed, T const& v)
{
  seed ^= hash_tuple::hash<T>()(v) + 0x9e3779b9 + (seed << 6) + (seed >> 2);
}

/** Tuple hasher: combines the hashes of all elements, from the first to the last one.
 *
 * The elements are visited with std::apply and a comma fold expression, which evaluates strictly left to
 * right. This yields the same value as the former recursive implementation, and additionally works for the
 * empty tuple (whose hash is the initial seed, 0).
 */
template <typename... TT> class hash<std::tuple<TT...>> {
public:
  size_t operator()(std::tuple<TT...> const& tt) const
  {
    size_t seed = 0;
    std::apply([&seed](TT const&... elems) { (hash_tuple::hash_combine(seed, elems), ...); }, tt);
    return seed;
  }
};
} // namespace hash_tuple
67 void log_timed_action(const simgrid::xbt::ReplayAction& action, double clock)
69 if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
70 std::string s = boost::algorithm::join(action, " ");
71 XBT_VERB("%s %f", s.c_str(), smpi_process()->simulated_elapsed() - clock);
75 /* Helper functions */
76 static double parse_double(const std::string& string)
78 return xbt_str_parse_double(string.c_str(), "not a double");
81 template <typename T> static T parse_integer(const std::string& string)
83 double val = trunc(xbt_str_parse_double(string.c_str(), "not a double"));
84 xbt_assert(static_cast<double>(std::numeric_limits<T>::min()) <= val &&
85 val <= static_cast<double>(std::numeric_limits<T>::max()),
86 "out of range: %g", val);
87 return static_cast<T>(val);
90 static int parse_root(const simgrid::xbt::ReplayAction& action, unsigned i)
92 return i < action.size() ? std::stoi(action[i]) : 0;
95 static MPI_Datatype parse_datatype(const simgrid::xbt::ReplayAction& action, unsigned i)
97 return i < action.size() ? simgrid::smpi::Datatype::decode(action[i]) : simgrid::smpi::replay::MPI_DEFAULT_TYPE;
100 namespace simgrid::smpi::replay {
// Datatype used by parse_datatype() when a trace line does not specify one.
// It is assigned by InitAction::kernel() (MPI_DOUBLE or MPI_BYTE, depending on the "init" line).
101 MPI_Datatype MPI_DEFAULT_TYPE;
/** Store of pending requests, keyed by (sender, receiver, tag).
 *  Each key maps to a list used as a queue: add() appends at the back, pop() takes from the front. */
103 class RequestStorage {
105 using req_key_t = std::tuple</*sender*/ int, /* receiver */ int, /* tag */ int>;
106 using req_storage_t = std::unordered_map<req_key_t, std::list<MPI_Request>, hash_tuple::hash<std::tuple<int, int, int>>>;
111 RequestStorage() = default;
// Number of distinct (sender, receiver, tag) keys -- not the total number of stored requests.
112 size_t size() const { return store.size(); }
// Direct, mutable access to the underlying map.
114 req_storage_t& get_store() { return store; }
// Visits every stored request that is non-null and whose src() or dst() is the pid of the calling actor,
// and prints it with the "MM" label.
// NOTE(review): presumably each such request is also appended to vec; that statement is not visible in this
// listing -- confirm.
116 void get_requests(std::vector<MPI_Request>& vec) const
118 for (auto const& [_, reqs] : store) {
119 aid_t my_proc_id = simgrid::s4u::this_actor::get_pid();
120 for (auto& req: reqs){
121 if (req != MPI_REQUEST_NULL && (req->src() == my_proc_id || req->dst() == my_proc_id)) {
123 req->print_request("MM");
// Removes the oldest request stored under (src, dst, tag). Yields MPI_REQUEST_NULL when the key is unknown.
// The key itself is erased once its list becomes empty, so a stored list is never empty.
129 MPI_Request pop(int src, int dst, int tag)
131 auto it = store.find(req_key_t(src, dst, tag));
132 if (it == store.end())
133 return MPI_REQUEST_NULL;
134 MPI_Request req = it->second.front();
135 it->second.pop_front();
136 if(it->second.empty())
137 store.erase(req_key_t(src, dst, tag));
// Appends a non-null request under the key (src() - 1, dst() - 1, tag()); null requests are ignored.
141 void add(MPI_Request req)
143 if (req != MPI_REQUEST_NULL){ // Can and does happen in the case of TestAction
144 store[req_key_t(req->src()-1, req->dst()-1, req->tag())].push_back(req);
148 /* Sometimes we need to re-insert MPI_REQUEST_NULL but we still need src,dst and tag */
// src and dst are ranks in MPI_COMM_WORLD here: they are converted through the group's actor() lookup, minus
// one, to build the same kind of key as add().
149 void addNullRequest(int src, int dst, int tag)
151 int src_pid = MPI_COMM_WORLD->group()->actor(src) - 1;
152 int dest_pid = MPI_COMM_WORLD->group()->actor(dst) - 1;
153 store[req_key_t(src_pid, dest_pid, tag)].push_back(MPI_REQUEST_NULL);
/** Parse a wait/test trace line: fields 2, 3 and 4 are read as src, dst and tag. */
157 void WaitTestParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
// presumably checks the parameter count (3 mandatory, 0 optional) -- the macro is defined elsewhere
159 CHECK_ACTION_PARAMS(action, 3, 0)
160 src = std::stoi(action[2]);
161 dst = std::stoi(action[3]);
162 tag = std::stoi(action[4]);
/** Parse a send/recv-like trace line: partner (field 2), tag (field 3), size (field 4) and an optional
 *  datatype (field 5, MPI_DEFAULT_TYPE when absent). */
165 void SendRecvParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
167 CHECK_ACTION_PARAMS(action, 3, 1)
168 partner = std::stoi(action[2]);
169 tag = std::stoi(action[3]);
170 size = parse_integer<size_t>(action[4]);
171 datatype1 = parse_datatype(action, 5);
/** Parse a compute trace line: field 2 is read as the amount of flops. */
174 void ComputeParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
176 CHECK_ACTION_PARAMS(action, 1, 0)
177 flops = parse_double(action[2]);
/** Parse a sleep trace line: field 2 is read as the sleep duration. */
180 void SleepParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
182 CHECK_ACTION_PARAMS(action, 1, 0)
183 time = parse_double(action[2]);
/** Parse a location trace line: field 2 is the file name and field 3 the line number. */
186 void LocationParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
188 CHECK_ACTION_PARAMS(action, 2, 0)
189 filename = std::string(action[2]);
190 line = std::stoi(action[3]);
/** Parse a bcast trace line: size (field 2), optional root (field 3, default 0) and optional datatype
 *  (field 4, MPI_DEFAULT_TYPE when absent). */
193 void BcastArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
195 CHECK_ACTION_PARAMS(action, 1, 2)
196 size = parse_integer<size_t>(action[2]);
197 root = parse_root(action, 3);
198 datatype1 = parse_datatype(action, 4);
/** Parse a reduce trace line: communication size (field 2), computation amount (field 3), optional root
 *  (field 4, default 0) and optional datatype (field 5, MPI_DEFAULT_TYPE when absent). */
201 void ReduceArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
203 CHECK_ACTION_PARAMS(action, 2, 2)
204 comm_size = parse_integer<unsigned>(action[2]);
205 comp_size = parse_double(action[3]);
206 root = parse_root(action, 4);
207 datatype1 = parse_datatype(action, 5);
/** Parse an allreduce trace line: communication size (field 2), computation amount (field 3) and optional
 *  datatype (field 4, MPI_DEFAULT_TYPE when absent). */
210 void AllReduceArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
212 CHECK_ACTION_PARAMS(action, 2, 1)
213 comm_size = parse_integer<unsigned>(action[2]);
214 comp_size = parse_double(action[3]);
215 datatype1 = parse_datatype(action, 4);
/** Parse an alltoall trace line: send size (field 2), receive size (field 3) and optional send/receive
 *  datatypes (fields 4 and 5). comm_size is taken from MPI_COMM_WORLD, not from the trace. */
218 void AllToAllArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
// NOTE(review): two optional datatype fields are read below while the macro is given 1 as its last
// argument -- confirm what that argument means.
220 CHECK_ACTION_PARAMS(action, 2, 1)
221 comm_size = MPI_COMM_WORLD->size();
222 send_size = parse_integer<int>(action[2]);
223 recv_size = parse_integer<int>(action[3]);
224 datatype1 = parse_datatype(action, 4);
225 datatype2 = parse_datatype(action, 5);
/** Parse a gather-like trace line. Send and receive sizes come from fields 2 and 3. When @p name is "gather",
 *  the root is field 4 and the datatypes fields 5 and 6; the other visible branch reads the datatypes from
 *  fields 4 and 5 and no root. comm_size is taken from MPI_COMM_WORLD. */
228 void GatherArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string& name)
230 /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
233 1) 68 is the sendcounts
234 2) 68 is the recvcounts
235 3) 0 is the root node
236 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
237 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
239 CHECK_ACTION_PARAMS(action, 2, 3)
240 comm_size = MPI_COMM_WORLD->size();
241 send_size = parse_integer<int>(action[2]);
242 recv_size = parse_integer<int>(action[3]);
244 if (name == "gather") {
245 root = parse_root(action, 4);
246 datatype1 = parse_datatype(action, 5);
247 datatype2 = parse_datatype(action, 6);
250 datatype1 = parse_datatype(action, 4);
251 datatype2 = parse_datatype(action, 5);
/** Parse a gatherv-like trace line.
 *
 *  The send size is field 2 and the comm_size receive counts are fields 3 to 3 + comm_size - 1 (comm_size being
 *  the size of MPI_COMM_WORLD). When @p name is "gatherv", the root and the two datatypes follow the receive
 *  counts and the displacements stay at 0. Otherwise, the presence of the optional datatypes and displacements
 *  is deduced from the total number of fields of the line. recv_size_sum is the sum of the receive counts. */
255 void GatherVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string& name)
257 /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
258 0 gather 68 68 10 10 10 0 0 0
260 1) 68 is the sendcount
261 2) 68 10 10 10 is the recvcounts
262 3) 0 is the root node
263 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
264 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
266 comm_size = MPI_COMM_WORLD->size();
267 CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
268 send_size = parse_integer<int>(action[2]);
269 disps = std::vector<int>(comm_size, 0);
270 recvcounts = std::make_shared<std::vector<int>>(comm_size);
272 if (name == "gatherv") {
273 root = parse_root(action, 3 + comm_size);
274 datatype1 = parse_datatype(action, 4 + comm_size);
275 datatype2 = parse_datatype(action, 5 + comm_size);
// disp_index == 0 means that the line carries no displacement
278 unsigned disp_index = 0;
279 /* The 3 comes from "0 gather <sendcount>", which must always be present.
280 * The + comm_size is the recvcounts array, which must also be present
282 if (action.size() > 3 + comm_size + comm_size) {
283 // datatype + disp are specified
284 datatype1 = parse_datatype(action, 3 + comm_size);
285 datatype2 = parse_datatype(action, 4 + comm_size);
286 disp_index = 5 + comm_size;
287 } else if (action.size() > 3 + comm_size + 2) {
288 // disps specified; datatype is not specified; use the default one
289 datatype1 = MPI_DEFAULT_TYPE;
290 datatype2 = MPI_DEFAULT_TYPE;
291 disp_index = 3 + comm_size;
293 // no disp specified, maybe only datatype,
294 datatype1 = parse_datatype(action, 3 + comm_size);
295 datatype2 = parse_datatype(action, 4 + comm_size);
298 if (disp_index != 0) {
// all the comm_size displacements must be present in the line
299 xbt_assert(disp_index + comm_size <= action.size());
300 for (unsigned i = 0; i < comm_size; i++)
301 disps[i] = std::stoi(action[disp_index + i]);
305 for (unsigned int i = 0; i < comm_size; i++) {
306 (*recvcounts)[i] = std::stoi(action[i + 3]);
308 recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
311 void ScatterArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
313 /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
316 1) 68 is the sendcounts
317 2) 68 is the recvcounts
318 3) 0 is the root node
319 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
320 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
322 comm_size = MPI_COMM_WORLD->size();
323 CHECK_ACTION_PARAMS(action, 2, 3)
324 comm_size = MPI_COMM_WORLD->size();
325 send_size = parse_integer<int>(action[2]);
326 recv_size = parse_integer<int>(action[3]);
327 root = parse_root(action, 4);
328 datatype1 = parse_datatype(action, 5);
329 datatype2 = parse_datatype(action, 6);
/** Parse a scatterv trace line: the comm_size send counts are fields 2 to 2 + comm_size - 1, followed by the
 *  receive size, the optional root and the optional send/receive datatypes. Displacements are all left at 0.
 *  send_size_sum is the sum of the send counts; comm_size is the size of MPI_COMM_WORLD. */
332 void ScatterVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
334 /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
335 0 gather 68 10 10 10 68 0 0 0
337 1) 68 10 10 10 is the sendcounts
338 2) 68 is the recvcount
339 3) 0 is the root node
340 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
341 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
343 comm_size = MPI_COMM_WORLD->size();
344 CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
345 recv_size = parse_integer<int>(action[2 + comm_size]);
346 disps = std::vector<int>(comm_size, 0);
347 sendcounts = std::make_shared<std::vector<int>>(comm_size);
349 root = parse_root(action, 3 + comm_size);
350 datatype1 = parse_datatype(action, 4 + comm_size);
351 datatype2 = parse_datatype(action, 5 + comm_size);
353 for (unsigned int i = 0; i < comm_size; i++) {
354 (*sendcounts)[i] = std::stoi(action[i + 2]);
356 send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
/** Parse a reducescatter trace line: the comm_size receive counts are fields 2 to 2 + comm_size - 1, followed
 *  by the computation amount and an optional datatype. recv_size_sum is the sum of the receive counts;
 *  comm_size is the size of MPI_COMM_WORLD. */
359 void ReduceScatterArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
361 /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
362 0 reducescatter 275427 275427 275427 204020 11346849 0
364 1) The first four values after the name of the action declare the recvcounts array
365 2) The value 11346849 is the amount of instructions
366 3) The last value corresponds to the datatype, see simgrid::smpi::Datatype::decode().
368 comm_size = MPI_COMM_WORLD->size();
369 CHECK_ACTION_PARAMS(action, comm_size + 1, 1)
370 comp_size = parse_double(action[2 + comm_size]);
371 recvcounts = std::make_shared<std::vector<int>>(comm_size);
372 datatype1 = parse_datatype(action, 3 + comm_size);
374 for (unsigned int i = 0; i < comm_size; i++) {
375 (*recvcounts)[i]= std::stoi(action[i + 2]);
377 recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
/** Parse a scan-like trace line: size (field 2), computation amount (field 3) and optional datatype
 *  (field 4, MPI_DEFAULT_TYPE when absent). */
380 void ScanArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
382 CHECK_ACTION_PARAMS(action, 2, 1)
383 size = parse_integer<size_t>(action[2]);
384 comp_size = parse_double(action[3]);
385 datatype1 = parse_datatype(action, 4);
/** Parse an alltoallv trace line: send buffer size (field 2), comm_size send counts (from field 3), receive
 *  buffer size (field 3 + comm_size), comm_size receive counts (from field 4 + comm_size), then the optional
 *  send/receive datatypes. All displacements are left at 0. The *_size_sum members are the sums of the
 *  corresponding counts; comm_size is the size of MPI_COMM_WORLD. */
388 void AllToAllVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
390 /* The structure of the alltoallv action for the rank 0 (total 4 processes) is the following:
391 0 alltoallv 100 1 7 10 12 100 1 70 10 5
393 1) 100 is the size of the send buffer *sizeof(int),
394 2) 1 7 10 12 is the sendcounts array
395 3) 100*sizeof(int) is the size of the receiver buffer
396 4) 1 70 10 5 is the recvcounts array
398 comm_size = MPI_COMM_WORLD->size();
399 CHECK_ACTION_PARAMS(action, 2 * comm_size + 2, 2)
400 sendcounts = std::make_shared<std::vector<int>>(comm_size);
401 recvcounts = std::make_shared<std::vector<int>>(comm_size);
402 senddisps = std::vector<int>(comm_size, 0);
403 recvdisps = std::vector<int>(comm_size, 0);
405 datatype1 = parse_datatype(action, 4 + 2 * comm_size);
406 datatype2 = parse_datatype(action, 5 + 2 * comm_size);
408 send_buf_size = parse_integer<int>(action[2]);
409 recv_buf_size = parse_integer<int>(action[3 + comm_size]);
410 for (unsigned int i = 0; i < comm_size; i++) {
411 (*sendcounts)[i] = std::stoi(action[3 + i]);
412 (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
414 send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
415 recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
/** Replay a wait: pops the request stored under (src, dst, tag) and waits for it.
 *  Asserts that the request storage is not empty, reporting the whole action line otherwise. When the waited
 *  request was a receive, a receive event is traced once the wait is over. */
418 void WaitAction::kernel(simgrid::xbt::ReplayAction& action)
// the joined line is only used by the assertion message below
420 std::string s = boost::algorithm::join(action, " ");
421 xbt_assert(req_storage.size(), "action wait not preceded by any irecv or isend: %s", s.c_str());
422 const WaitTestParser& args = get_args();
423 MPI_Request request = req_storage.pop(args.src, args.dst, args.tag);
425 if (request == MPI_REQUEST_NULL) {
426 /* Assume that the trace is well formed, meaning the comm might have been caught by an MPI_test. Then just
431 // Must be taken before Request::wait() since the request may be set to
432 // MPI_REQUEST_NULL by Request::wait!
433 bool is_wait_for_receive = (request->flags() & MPI_REQ_RECV);
435 TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::WaitTIData("wait", args.src, args.dst, args.tag));
438 Request::wait(&request, &status);
// release the request if the wait did not already reset it to MPI_REQUEST_NULL
439 if(request!=MPI_REQUEST_NULL)
440 Request::unref(&request);
441 TRACE_smpi_comm_out(get_pid());
442 if (is_wait_for_receive)
443 TRACE_smpi_recv(MPI_COMM_WORLD->group()->actor(args.src), MPI_COMM_WORLD->group()->actor(args.dst), args.tag);
/** Replay a send: "send" issues a Request::send, "isend" issues a Request::isend whose request is kept in
 *  req_storage; any other action name is fatal. The traced destination is the actor id of the partner rank
 *  in MPI_COMM_WORLD. */
446 void SendAction::kernel(simgrid::xbt::ReplayAction&)
448 const SendRecvParser& args = get_args();
449 aid_t dst_traced = MPI_COMM_WORLD->group()->actor(args.partner);
453 new simgrid::instr::Pt2PtTIData(get_name(), args.partner, args.size, args.tag, Datatype::encode(args.datatype1)));
// the send event is only traced here when internals are not displayed; the traced size is in bytes
454 if (not TRACE_smpi_view_internals())
455 TRACE_smpi_send(get_pid(), get_pid(), dst_traced, args.tag, args.size * args.datatype1->size());
457 if (get_name() == "send") {
458 Request::send(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
459 } else if (get_name() == "isend") {
460 MPI_Request request = Request::isend(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
461 req_storage.add(request);
463 xbt_die("Don't know this action, %s", get_name().c_str());
466 TRACE_smpi_comm_out(get_pid());
/** Replay a receive: "recv" issues a Request::recv, "irecv" issues a Request::irecv whose request is kept in
 *  req_storage. The size can be obtained by probing the partner instead of coming from the trace.
 *  NOTE(review): the condition guarding the probe is not visible in this listing -- confirm when it is used. */
469 void RecvAction::kernel(simgrid::xbt::ReplayAction&)
471 const SendRecvParser& args = get_args();
474 new simgrid::instr::Pt2PtTIData(get_name(), args.partner, args.size, args.tag, Datatype::encode(args.datatype1)));
477 // unknown size from the receiver point of view
478 size_t arg_size = args.size;
480 Request::probe(args.partner, args.tag, MPI_COMM_WORLD, &status);
481 arg_size = status.count;
484 bool is_recv = false; // Help analyzers understanding that status is not used uninitialized
485 if (get_name() == "recv") {
487 Request::recv(nullptr, arg_size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD, &status);
488 } else if (get_name() == "irecv") {
489 MPI_Request request = Request::irecv(nullptr, arg_size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
490 req_storage.add(request);
495 TRACE_smpi_comm_out(get_pid());
// the receive event uses the source reported in the status, converted to an actor id
496 if (is_recv && not TRACE_smpi_view_internals()) {
497 aid_t src_traced = MPI_COMM_WORLD->group()->actor(status.MPI_SOURCE);
498 TRACE_smpi_recv(src_traced, get_pid(), args.tag);
/** Replay a computation: when computation simulation is enabled in the configuration, executes the traced
 *  amount of flops divided by smpi_adjust_comp_speed(). */
502 void ComputeAction::kernel(simgrid::xbt::ReplayAction&)
504 const ComputeParser& args = get_args();
505 if (smpi_cfg_simulate_computation()) {
506 smpi_execute_flops(args.flops/smpi_adjust_comp_speed());
/** Replay a sleep: the calling actor sleeps for the traced time divided by smpi_adjust_comp_speed(), between
 *  two tracing calls. Note that the unscaled time is the one given to the tracing call. */
510 void SleepAction::kernel(simgrid::xbt::ReplayAction&)
512 const SleepParser& args = get_args();
513 XBT_DEBUG("Sleep for: %lf secs", args.time);
514 aid_t pid = simgrid::s4u::this_actor::get_pid();
515 TRACE_smpi_sleeping_in(pid, args.time);
516 simgrid::s4u::this_actor::sleep_for(args.time/smpi_adjust_comp_speed());
517 TRACE_smpi_sleeping_out(pid);
/** Replay a location line: forwards the parsed file name and line number to smpi_trace_set_call_location(). */
520 void LocationAction::kernel(simgrid::xbt::ReplayAction&)
522 const LocationParser& args = get_args();
523 smpi_trace_set_call_location(args.filename.c_str(), args.line);
/** Replay a test: pops the request stored under (src, dst, tag) and, if there is one, tests it and stores the
 *  outcome back so that a later wait on the same key finds an entry. */
526 void TestAction::kernel(simgrid::xbt::ReplayAction&)
528 const WaitTestParser& args = get_args();
529 MPI_Request request = req_storage.pop(args.src, args.dst, args.tag);
530 // if request is null here, this may mean that a previous test has succeeded
531 // Different times in traced application and replayed version may lead to this
532 // In this case, ignore the extra calls.
533 if (request != MPI_REQUEST_NULL) {
534 TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::WaitTIData("test", args.src, args.dst, args.tag));
538 Request::test(&request, &status, &flag);
540 XBT_DEBUG("MPI_Test result: %d", flag);
541 /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now
// a null request carries no src/dst/tag, hence the dedicated re-insertion using the parsed values
543 if (request == MPI_REQUEST_NULL)
544 req_storage.addNullRequest(args.src, args.dst, args.tag);
546 req_storage.add(request);
548 TRACE_smpi_comm_out(get_pid());
/** Replay the init line: selects the default datatype (MPI_DOUBLE when the line has more than 2 fields,
 *  MPI_BYTE otherwise) and starts the simulated timer of the calling process. */
552 void InitAction::kernel(simgrid::xbt::ReplayAction& action)
554 CHECK_ACTION_PARAMS(action, 0, 1)
555 MPI_DEFAULT_TYPE = (action.size() > 2) ? MPI_DOUBLE // default MPE datatype
556 : MPI_BYTE; // default TAU datatype
558 /* start a simulated timer */
559 smpi_process()->simulated_start();
// Kernel of the actions registered as "comm_size", "comm_split" and "comm_dup" in smpi_replay_init().
// NOTE(review): the body is not visible in this listing -- presumably a no-op, confirm.
562 void CommunicatorAction::kernel(simgrid::xbt::ReplayAction&)
/** Replay a waitall: when the storage is not empty, waits for all the requests returned by
 *  req_storage.get_requests(), empties the storage, releases the requests that are still non-null, and traces
 *  one receive event per request that was a receive. */
567 void WaitAllAction::kernel(simgrid::xbt::ReplayAction&)
569 if (req_storage.size() > 0) {
570 std::vector<std::pair</*sender*/ aid_t, /*recv*/ aid_t>> sender_receiver;
571 std::vector<MPI_Request> reqs;
572 req_storage.get_requests(reqs);
573 unsigned long count_requests = reqs.size();
574 TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::CpuTIData("waitall", count_requests));
// remember the endpoints of the receives before waiting, to trace them afterwards
575 for (auto const& req : reqs) {
576 if (req && (req->flags() & MPI_REQ_RECV)) {
577 sender_receiver.emplace_back(req->src(), req->dst());
580 Request::waitall(count_requests, &(reqs.data())[0], MPI_STATUSES_IGNORE);
581 req_storage.get_store().clear();
583 for (MPI_Request& req : reqs)
584 if (req != MPI_REQUEST_NULL)
585 Request::unref(&req);
// NOTE(review): the receive events are traced with tag 0, not with the tag of the request
587 for (auto const& [src, dst] : sender_receiver) {
588 TRACE_smpi_recv(src, dst, 0);
590 TRACE_smpi_comm_out(get_pid());
/** Replay a barrier on MPI_COMM_WORLD, between the tracing calls. */
594 void BarrierAction::kernel(simgrid::xbt::ReplayAction&)
596 TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::NoOpTIData("barrier"));
597 colls::barrier(MPI_COMM_WORLD);
598 TRACE_smpi_comm_out(get_pid());
/** Replay a bcast of args.size elements of args.datatype1 from args.root on MPI_COMM_WORLD.
 *  The buffer comes from send_buffer(), sized in bytes (element count times datatype size). */
601 void BcastAction::kernel(simgrid::xbt::ReplayAction&)
603 const BcastArgParser& args = get_args();
604 TRACE_smpi_comm_in(get_pid(), "action_bcast",
605 new simgrid::instr::CollTIData("bcast", args.root, -1.0, args.size,
606 0, Datatype::encode(args.datatype1), ""));
608 colls::bcast(send_buffer(args.size * args.datatype1->size()), args.size, args.datatype1, args.root, MPI_COMM_WORLD);
610 TRACE_smpi_comm_out(get_pid());
/** Replay a reduce of args.comm_size elements toward args.root on MPI_COMM_WORLD (with MPI_OP_NULL as
 *  operation), followed by an execution named "computation" of args.comp_size when that amount is not zero.
 *  NOTE(review): the end of the exec_init() call chain is not visible in this listing -- presumably the
 *  execution is started and waited for, confirm. */
613 void ReduceAction::kernel(simgrid::xbt::ReplayAction&)
615 const ReduceArgParser& args = get_args();
616 TRACE_smpi_comm_in(get_pid(), "action_reduce",
617 new simgrid::instr::CollTIData("reduce", args.root, args.comp_size,
618 args.comm_size, 0, Datatype::encode(args.datatype1), ""));
620 colls::reduce(send_buffer(args.comm_size * args.datatype1->size()),
621 recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL,
622 args.root, MPI_COMM_WORLD);
623 if (args.comp_size != 0.0)
624 simgrid::s4u::this_actor::exec_init(args.comp_size)
625 ->set_name("computation")
629 TRACE_smpi_comm_out(get_pid());
/** Replay an allreduce of args.comm_size elements (with MPI_OP_NULL as operation), followed by an execution
 *  named "computation" of args.comp_size when that amount is not zero.
 *  NOTE(review): the end of the allreduce call and of the exec_init() chain are not visible in this listing. */
632 void AllReduceAction::kernel(simgrid::xbt::ReplayAction&)
634 const AllReduceArgParser& args = get_args();
635 TRACE_smpi_comm_in(get_pid(), "action_allreduce",
636 new simgrid::instr::CollTIData("allreduce", -1, args.comp_size, args.comm_size, 0,
637 Datatype::encode(args.datatype1), ""));
639 colls::allreduce(send_buffer(args.comm_size * args.datatype1->size()),
640 recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL,
642 if (args.comp_size != 0.0)
643 simgrid::s4u::this_actor::exec_init(args.comp_size)
644 ->set_name("computation")
648 TRACE_smpi_comm_out(get_pid());
/** Replay an alltoall. Both buffers are sized in bytes for one block per member of the communicator
 *  (datatype size times per-peer count times args.comm_size). */
651 void AllToAllAction::kernel(simgrid::xbt::ReplayAction&)
653 const AllToAllArgParser& args = get_args();
654 TRACE_smpi_comm_in(get_pid(), "action_alltoall",
655 new simgrid::instr::CollTIData("alltoall", -1, -1.0, args.send_size, args.recv_size,
656 Datatype::encode(args.datatype1),
657 Datatype::encode(args.datatype2)));
659 colls::alltoall(send_buffer(args.datatype1->size() * args.send_size * args.comm_size), args.send_size, args.datatype1,
660 recv_buffer(args.datatype2->size() * args.recv_size * args.comm_size), args.recv_size, args.datatype2,
663 TRACE_smpi_comm_out(get_pid());
/** Replay a gather (when the action name is "gather") or an allgather (other visible branch).
 *  For gather, only the root gets a receive buffer, sized for args.comm_size blocks; the other ranks pass
 *  nullptr. The traced root is -1 unless the action is "gather". */
666 void GatherAction::kernel(simgrid::xbt::ReplayAction&)
668 const GatherArgParser& args = get_args();
669 TRACE_smpi_comm_in(get_pid(), get_name().c_str(),
670 new simgrid::instr::CollTIData(get_name(), (get_name() == "gather") ? args.root : -1, -1.0,
671 args.send_size, args.recv_size, Datatype::encode(args.datatype1),
672 Datatype::encode(args.datatype2)));
674 if (get_name() == "gather") {
675 int rank = MPI_COMM_WORLD->rank();
676 colls::gather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
677 (rank == args.root) ? recv_buffer(args.datatype2->size() * args.recv_size * args.comm_size) : nullptr,
678 args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
// NOTE(review): unlike the gather branch, this receive buffer size is not multiplied by args.comm_size --
// confirm whether that is intended.
680 colls::allgather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
681 recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2,
684 TRACE_smpi_comm_out(get_pid());
/** Replay a gatherv (when the action name is "gatherv") or an allgatherv (other visible branch).
 *  Receive buffers are sized from args.recv_size_sum; for gatherv only the root gets one, the other ranks pass
 *  nullptr. The traced root is -1 unless the action is "gatherv". */
687 void GatherVAction::kernel(simgrid::xbt::ReplayAction&)
689 int rank = MPI_COMM_WORLD->rank();
690 const GatherVArgParser& args = get_args();
691 TRACE_smpi_comm_in(get_pid(), get_name().c_str(),
692 new simgrid::instr::VarCollTIData(
693 get_name(), (get_name() == "gatherv") ? args.root : -1, args.send_size, nullptr, -1,
694 args.recvcounts, Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
696 if (get_name() == "gatherv") {
697 colls::gatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
698 (rank == args.root) ? recv_buffer(args.recv_size_sum * args.datatype2->size()) : nullptr,
699 args.recvcounts->data(), args.disps.data(), args.datatype2, args.root, MPI_COMM_WORLD);
701 colls::allgatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
702 recv_buffer(args.recv_size_sum * args.datatype2->size()), args.recvcounts->data(),
703 args.disps.data(), args.datatype2, MPI_COMM_WORLD);
706 TRACE_smpi_comm_out(get_pid());
/** Replay a scatter from args.root on MPI_COMM_WORLD.
 *  NOTE(review): here the receive buffer is the one that is only allocated on the root (nullptr elsewhere)
 *  while the send buffer is allocated on every rank; ScatterVAction::kernel does the opposite (send buffer
 *  only on the root). Confirm which one is intended. */
709 void ScatterAction::kernel(simgrid::xbt::ReplayAction&)
711 int rank = MPI_COMM_WORLD->rank();
712 const ScatterArgParser& args = get_args();
713 TRACE_smpi_comm_in(get_pid(), "action_scatter",
714 new simgrid::instr::CollTIData(get_name(), args.root, -1.0, args.send_size, args.recv_size,
715 Datatype::encode(args.datatype1),
716 Datatype::encode(args.datatype2)));
718 colls::scatter(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
719 (rank == args.root) ? recv_buffer(args.recv_size * args.datatype2->size()) : nullptr, args.recv_size,
720 args.datatype2, args.root, MPI_COMM_WORLD);
722 TRACE_smpi_comm_out(get_pid());
/** Replay a scatterv from args.root. Only the root gets a send buffer (sized from args.send_size_sum), the
 *  other ranks pass nullptr; every rank gets a receive buffer for args.recv_size elements. */
725 void ScatterVAction::kernel(simgrid::xbt::ReplayAction&)
727 int rank = MPI_COMM_WORLD->rank();
728 const ScatterVArgParser& args = get_args();
729 TRACE_smpi_comm_in(get_pid(), "action_scatterv",
730 new simgrid::instr::VarCollTIData(get_name(), args.root, -1, args.sendcounts, args.recv_size,
731 nullptr, Datatype::encode(args.datatype1),
732 Datatype::encode(args.datatype2)));
734 colls::scatterv((rank == args.root) ? send_buffer(args.send_size_sum * args.datatype1->size()) : nullptr,
735 args.sendcounts->data(), args.disps.data(), args.datatype1,
736 recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, args.root,
739 TRACE_smpi_comm_out(get_pid());
/** Replay a reduce_scatter (with MPI_OP_NULL as operation) using buffers sized from args.recv_size_sum,
 *  followed by an execution named "computation" of args.comp_size when that amount is not zero.
 *  The computation amount is passed to the tracing data as a string, in the slot that other actions use for
 *  a datatype.
 *  NOTE(review): the end of the exec_init() call chain is not visible in this listing. */
742 void ReduceScatterAction::kernel(simgrid::xbt::ReplayAction&)
744 const ReduceScatterArgParser& args = get_args();
746 get_pid(), "action_reducescatter",
747 new simgrid::instr::VarCollTIData(get_name(), -1, -1, nullptr, -1, args.recvcounts,
748 /* ugly as we use datatype field to pass computation as string */
749 /* and because of the trick to avoid getting 0.000000 when 0 is given */
750 args.comp_size == 0 ? "0" : std::to_string(args.comp_size),
751 Datatype::encode(args.datatype1)));
753 colls::reduce_scatter(send_buffer(args.recv_size_sum * args.datatype1->size()),
754 recv_buffer(args.recv_size_sum * args.datatype1->size()), args.recvcounts->data(),
755 args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
756 if (args.comp_size != 0.0)
757 simgrid::s4u::this_actor::exec_init(args.comp_size)
758 ->set_name("computation")
761 TRACE_smpi_comm_out(get_pid());
/** Replay a scan (when the action name is "scan") or an exscan (other visible branch) of args.size elements,
 *  with MPI_OP_NULL as operation, followed by an execution named "computation" of args.comp_size when that
 *  amount is not zero.
 *  NOTE(review): the end of the exec_init() call chain is not visible in this listing. */
764 void ScanAction::kernel(simgrid::xbt::ReplayAction&)
766 const ScanArgParser& args = get_args();
767 TRACE_smpi_comm_in(get_pid(), "action_scan",
768 new simgrid::instr::CollTIData(get_name(), -1, args.comp_size,
769 args.size, 0, Datatype::encode(args.datatype1), ""));
770 if (get_name() == "scan")
771 colls::scan(send_buffer(args.size * args.datatype1->size()),
772 recv_buffer(args.size * args.datatype1->size()), args.size,
773 args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
775 colls::exscan(send_buffer(args.size * args.datatype1->size()),
776 recv_buffer(args.size * args.datatype1->size()), args.size,
777 args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
779 if (args.comp_size != 0.0)
780 simgrid::s4u::this_actor::exec_init(args.comp_size)
781 ->set_name("computation")
784 TRACE_smpi_comm_out(get_pid());
/** Replay an alltoallv on MPI_COMM_WORLD. The buffers are sized from the traced buffer sizes
 *  (args.send_buf_size / args.recv_buf_size), while the sums of the counts are only given to the tracing data. */
787 void AllToAllVAction::kernel(simgrid::xbt::ReplayAction&)
789 const AllToAllVArgParser& args = get_args();
790 TRACE_smpi_comm_in(get_pid(), __func__,
791 new simgrid::instr::VarCollTIData(
792 "alltoallv", -1, args.send_size_sum, args.sendcounts, args.recv_size_sum, args.recvcounts,
793 Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
795 colls::alltoallv(send_buffer(args.send_buf_size * args.datatype1->size()), args.sendcounts->data(),
796 args.senddisps.data(), args.datatype1, recv_buffer(args.recv_buf_size * args.datatype2->size()),
797 args.recvcounts->data(), args.recvdisps.data(), args.datatype2, MPI_COMM_WORLD);
799 TRACE_smpi_comm_out(get_pid());
801 } // namespace simgrid::smpi::replay
// Pending-request storage of each replaying actor, indexed by actor pid. The entry of the calling actor is
// (re)created by smpi_replay_main() and handed to the point-to-point actions registered in smpi_replay_init().
803 static std::unordered_map<aid_t, simgrid::smpi::replay::RequestStorage> storage;
804 /** @brief Only initialize the replay, don't do it for real */
// Sets the "instance_id" and "rank" properties on the calling actor, initializes its SMPI extension, marks the
// process as initialized and replaying, then registers one handler per trace action name. Finally applies the
// optional start delay (expressed in flops), yields, and applies the optional initial sleep.
805 void smpi_replay_init(const char* instance_id, int rank, double start_delay_flops)
807 xbt_assert(not smpi_process()->initializing());
809 simgrid::s4u::Actor::self()->set_property("instance_id", instance_id);
810 simgrid::s4u::Actor::self()->set_property("rank", std::to_string(rank));
811 simgrid::smpi::ActorExt::init();
813 smpi_process()->mark_as_initialized();
814 smpi_process()->set_replaying(true);
816 TRACE_smpi_init(simgrid::s4u::this_actor::get_pid(), "smpi_replay_run_init");
// Each handler builds a fresh action object for every trace line. The point-to-point, test and wait actions
// additionally receive the request storage of the calling actor.
817 xbt_replay_action_register("init", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::InitAction().execute(action); });
818 xbt_replay_action_register("finalize", [](simgrid::xbt::ReplayAction const&) { /* nothing to do */ });
819 xbt_replay_action_register("comm_size", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
820 xbt_replay_action_register("comm_split",[](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
821 xbt_replay_action_register("comm_dup", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
822 xbt_replay_action_register("send", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("send", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
823 xbt_replay_action_register("isend", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("isend", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
824 xbt_replay_action_register("recv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("recv", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
825 xbt_replay_action_register("irecv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("irecv", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
826 xbt_replay_action_register("test", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::TestAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
827 xbt_replay_action_register("wait", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
828 xbt_replay_action_register("waitall", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAllAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
829 xbt_replay_action_register("barrier", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BarrierAction().execute(action); });
830 xbt_replay_action_register("bcast", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BcastAction().execute(action); });
831 xbt_replay_action_register("reduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceAction().execute(action); });
832 xbt_replay_action_register("allreduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllReduceAction().execute(action); });
833 xbt_replay_action_register("alltoall", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllAction().execute(action); });
834 xbt_replay_action_register("alltoallv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllVAction().execute(action); });
835 xbt_replay_action_register("gather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("gather").execute(action); });
836 xbt_replay_action_register("scatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterAction().execute(action); });
837 xbt_replay_action_register("gatherv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("gatherv").execute(action); });
838 xbt_replay_action_register("scatterv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterVAction().execute(action); });
839 xbt_replay_action_register("allgather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("allgather").execute(action); });
840 xbt_replay_action_register("allgatherv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("allgatherv").execute(action); });
841 xbt_replay_action_register("reducescatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceScatterAction().execute(action); });
842 xbt_replay_action_register("scan", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScanAction("scan").execute(action); });
843 xbt_replay_action_register("exscan", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScanAction("exscan").execute(action); });
844 xbt_replay_action_register("compute", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ComputeAction().execute(action); });
845 xbt_replay_action_register("sleep", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SleepAction().execute(action); });
846 xbt_replay_action_register("location", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::LocationAction().execute(action); });
848 //if we have a delayed start, sleep here.
849 if (start_delay_flops > 0) {
850 XBT_VERB("Delayed start for instance - Sleeping for %f flops ", start_delay_flops);
851 private_execute_flops(start_delay_flops);
853 // Wait for the other actors to initialize also
854 simgrid::s4u::this_actor::yield();
856 if(_smpi_init_sleep > 0)
857 simgrid::s4u::this_actor::sleep_for(_smpi_init_sleep);
860 /** @brief actually run the replay after initialization */
861 void smpi_replay_main(int rank, const char* private_trace_filename)
863 static int active_processes = 0;
865 storage[simgrid::s4u::this_actor::get_pid()] = simgrid::smpi::replay::RequestStorage();
866 std::string rank_string = std::to_string(rank);
867 simgrid::xbt::replay_runner(rank_string.c_str(), private_trace_filename);
869 /* and now, finalize everything */
870 /* One active process will stop. Decrease the counter*/
871 unsigned int count_requests = storage[simgrid::s4u::this_actor::get_pid()].size();
872 XBT_DEBUG("There are %u elements in reqq[*]", count_requests);
873 if (count_requests > 0) {
874 std::vector<MPI_Request> requests(count_requests);
877 for (auto const& [_, reqs] : storage[simgrid::s4u::this_actor::get_pid()].get_store()) {
878 for (auto& req : reqs) {
879 requests[i] = req; // FIXME: overwritten at each iteration?
883 simgrid::smpi::Request::waitall(count_requests, requests.data(), MPI_STATUSES_IGNORE);
886 if(simgrid::config::get_value<bool>("smpi/finalization-barrier"))
887 simgrid::smpi::colls::barrier(MPI_COMM_WORLD);
891 if(active_processes==0){
892 /* Last process alive speaking: end the simulated timer */
893 XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
894 smpi_free_replay_tmp_buffers();
897 TRACE_smpi_comm_in(simgrid::s4u::this_actor::get_pid(), "smpi_replay_run_finalize",
898 new simgrid::instr::NoOpTIData("finalize"));
900 smpi_process()->finalize();
902 TRACE_smpi_comm_out(simgrid::s4u::this_actor::get_pid());
905 /** @brief chain a replay initialization and a replay start */
// Convenience entry point: calls smpi_replay_init() then smpi_replay_main() with the matching arguments.
906 void smpi_replay_run(const char* instance_id, int rank, double start_delay_flops, const char* private_trace_filename)
908 smpi_replay_init(instance_id, rank, start_delay_flops);
909 smpi_replay_main(rank, private_trace_filename);