1 /* Copyright (c) 2009-2022. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
6 #include "smpi_replay.hpp"
7 #include "simgrid/s4u/Exec.hpp"
8 #include "smpi_coll.hpp"
9 #include "smpi_comm.hpp"
10 #include "smpi_config.hpp"
11 #include "smpi_datatype.hpp"
12 #include "smpi_group.hpp"
13 #include "smpi_request.hpp"
14 #include "src/smpi/include/private.hpp"
15 #include "xbt/replay.hpp"
23 #include <unordered_map>
// Declares the "smpi_replay" logging sub-category under "smpi"; log_timed_action() below tests this
// category with XBT_LOG_ISENABLED(smpi_replay, ...). Presumably it is also the default category of the
// XBT_VERB / XBT_DEBUG / XBT_INFO calls in this file (as the macro name suggests) — confirm.
26 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay, smpi, "Trace Replay with SMPI");
27 // From https://stackoverflow.com/questions/7110301/generic-hash-for-tuples-in-unordered-map-unordered-set
28 // This is all just to make std::unordered_map work with std::tuple. If we need this in other places,
29 // this could go into a header file.
namespace hash_tuple {

/** @brief Generic hasher: forwards to std::hash for every type that is not a std::tuple. */
template <typename TT> class hash {
public:
  size_t operator()(TT const& tt) const { return std::hash<TT>()(tt); }
};

/** @brief Mix the hash of @p v into the running @p seed (same formula as boost::hash_combine). */
template <class T> inline void hash_combine(std::size_t& seed, T const& v)
{
  seed ^= hash_tuple::hash<T>()(v) + 0x9e3779b9 + (seed << 6) + (seed >> 2);
}

// Recursive template code derived from Matthieu M.
/** @brief Fold elements 0..Index of a tuple into the seed, in increasing index order. */
template <class Tuple, size_t Index = std::tuple_size<Tuple>::value - 1> class HashValueImpl {
public:
  static void apply(size_t& seed, Tuple const& tuple)
  {
    HashValueImpl<Tuple, Index - 1>::apply(seed, tuple);
    hash_combine(seed, std::get<Index>(tuple));
  }
};

/** @brief End of the recursion: only element 0 is left to combine. */
template <class Tuple> class HashValueImpl<Tuple, 0> {
public:
  static void apply(size_t& seed, Tuple const& tuple) { hash_combine(seed, std::get<0>(tuple)); }
};

/** @brief Hasher for non-empty tuples: combines the hashes of all elements, starting from a seed of 0. */
template <typename... TT> class hash<std::tuple<TT...>> {
public:
  size_t operator()(std::tuple<TT...> const& tt) const
  {
    size_t seed = 0;
    HashValueImpl<std::tuple<TT...>>::apply(seed, tt);
    return seed;
  }
};

/** @brief Hasher for the empty tuple.
 *
 * Without this specialization, the default Index of HashValueImpl (tuple_size - 1) wraps around to SIZE_MAX for
 * std::tuple<> and the recursion cannot be instantiated. An empty tuple simply hashes to the initial seed.
 */
template <> class hash<std::tuple<>> {
public:
  size_t operator()(std::tuple<> const&) const { return 0; }
};
}
67 void log_timed_action(const simgrid::xbt::ReplayAction& action, double clock)
69 if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
70 std::string s = boost::algorithm::join(action, " ");
71 XBT_VERB("%s %f", s.c_str(), smpi_process()->simulated_elapsed() - clock);
75 /* Helper functions */
76 static double parse_double(const std::string& string)
78 return xbt_str_parse_double(string.c_str(), "not a double");
81 template <typename T> static T parse_integer(const std::string& string)
83 double val = trunc(xbt_str_parse_double(string.c_str(), "not a double"));
84 xbt_assert(static_cast<double>(std::numeric_limits<T>::min()) <= val &&
85 val <= static_cast<double>(std::numeric_limits<T>::max()),
86 "out of range: %g", val);
87 return static_cast<T>(val);
90 static int parse_root(const simgrid::xbt::ReplayAction& action, unsigned i)
92 return i < action.size() ? std::stoi(action[i]) : 0;
95 static MPI_Datatype parse_datatype(const simgrid::xbt::ReplayAction& action, unsigned i)
97 return i < action.size() ? simgrid::smpi::Datatype::decode(action[i]) : simgrid::smpi::replay::MPI_DEFAULT_TYPE;
// Datatype returned by parse_datatype() when an action line does not specify one.
// It is assigned by InitAction::kernel() (MPI_DOUBLE or MPI_BYTE depending on the "init" action's field count).
104 MPI_Datatype MPI_DEFAULT_TYPE;
106 class RequestStorage {
108 using req_key_t = std::tuple</*sender*/ int, /* receiver */ int, /* tag */ int>;
109 using req_storage_t = std::unordered_map<req_key_t, std::list<MPI_Request>, hash_tuple::hash<std::tuple<int, int, int>>>;
114 RequestStorage() = default;
115 size_t size() const { return store.size(); }
117 req_storage_t& get_store() { return store; }
119 void get_requests(std::vector<MPI_Request>& vec) const
121 for (auto const& pair : store) {
122 auto& reqs = pair.second;
123 aid_t my_proc_id = simgrid::s4u::this_actor::get_pid();
124 for (auto& req: reqs){
125 if (req != MPI_REQUEST_NULL && (req->src() == my_proc_id || req->dst() == my_proc_id)) {
127 req->print_request("MM");
133 MPI_Request pop(int src, int dst, int tag)
135 auto it = store.find(req_key_t(src, dst, tag));
136 if (it == store.end())
137 return MPI_REQUEST_NULL;
138 MPI_Request req = it->second.front();
139 it->second.pop_front();
140 if(it->second.empty())
141 store.erase(req_key_t(src, dst, tag));
145 void add(MPI_Request req)
147 if (req != MPI_REQUEST_NULL){ // Can and does happen in the case of TestAction
148 store[req_key_t(req->src()-1, req->dst()-1, req->tag())].push_back(req);
152 /* Sometimes we need to re-insert MPI_REQUEST_NULL but we still need src,dst and tag */
153 void addNullRequest(int src, int dst, int tag)
155 int src_pid = MPI_COMM_WORLD->group()->actor(src) - 1;
156 int dest_pid = MPI_COMM_WORLD->group()->actor(dst) - 1;
157 store[req_key_t(src_pid, dest_pid, tag)].push_back(MPI_REQUEST_NULL);
161 void WaitTestParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
163 CHECK_ACTION_PARAMS(action, 3, 0)
164 src = std::stoi(action[2]);
165 dst = std::stoi(action[3]);
166 tag = std::stoi(action[4]);
169 void SendRecvParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
171 CHECK_ACTION_PARAMS(action, 3, 1)
172 partner = std::stoi(action[2]);
173 tag = std::stoi(action[3]);
174 size = parse_integer<size_t>(action[4]);
175 datatype1 = parse_datatype(action, 5);
178 void ComputeParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
180 CHECK_ACTION_PARAMS(action, 1, 0)
181 flops = parse_double(action[2]);
184 void SleepParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
186 CHECK_ACTION_PARAMS(action, 1, 0)
187 time = parse_double(action[2]);
190 void LocationParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
192 CHECK_ACTION_PARAMS(action, 2, 0)
193 filename = std::string(action[2]);
194 line = std::stoi(action[3]);
197 void BcastArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
199 CHECK_ACTION_PARAMS(action, 1, 2)
200 size = parse_integer<size_t>(action[2]);
201 root = parse_root(action, 3);
202 datatype1 = parse_datatype(action, 4);
205 void ReduceArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
207 CHECK_ACTION_PARAMS(action, 2, 2)
208 comm_size = parse_integer<unsigned>(action[2]);
209 comp_size = parse_double(action[3]);
210 root = parse_root(action, 4);
211 datatype1 = parse_datatype(action, 5);
214 void AllReduceArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
216 CHECK_ACTION_PARAMS(action, 2, 1)
217 comm_size = parse_integer<unsigned>(action[2]);
218 comp_size = parse_double(action[3]);
219 datatype1 = parse_datatype(action, 4);
222 void AllToAllArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
224 CHECK_ACTION_PARAMS(action, 2, 1)
225 comm_size = MPI_COMM_WORLD->size();
226 send_size = parse_integer<int>(action[2]);
227 recv_size = parse_integer<int>(action[3]);
228 datatype1 = parse_datatype(action, 4);
229 datatype2 = parse_datatype(action, 5);
// Parse the arguments of a gather-like action; `name` tells which one ("gather" is tested explicitly).
// Fields 2 and 3 are the send and receive counts and comm_size comes from MPI_COMM_WORLD.
// For "gather", field 4 is the root and fields 5-6 the send/receive datatypes; for any other name the
// datatypes are read from fields 4-5. Absent trailing fields fall back to the defaults of parse_root()
// and parse_datatype().
// NOTE(review): the embedded numbering jumps from 251 to 254, so the branch separator (presumably an
// `else`) and possibly one more statement are not visible in this listing — confirm against the full file.
232 void GatherArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string& name)
234 /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
237 1) 68 is the sendcounts
238 2) 68 is the recvcounts
239 3) 0 is the root node
240 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
241 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
243 CHECK_ACTION_PARAMS(action, 2, 3)
244 comm_size = MPI_COMM_WORLD->size();
245 send_size = parse_integer<int>(action[2]);
246 recv_size = parse_integer<int>(action[3]);
248 if (name == "gather") {
249 root = parse_root(action, 4);
250 datatype1 = parse_datatype(action, 5);
251 datatype2 = parse_datatype(action, 6);
254 datatype1 = parse_datatype(action, 4);
255 datatype2 = parse_datatype(action, 5);
// Parse the arguments of a gatherv-like action; `name` tells which one ("gatherv" is tested explicitly).
// Layout read by the code: field 2 is the send count and fields 3 .. 3+comm_size-1 are the per-rank
// receive counts (comm_size comes from MPI_COMM_WORLD).
//  - "gatherv": root at 3+comm_size, datatypes at 4+comm_size and 5+comm_size; disps stay all zero.
//  - other names: the total field count decides whether datatypes and/or displacements follow the
//    receive counts; disp_index == 0 means "no displacements given", otherwise comm_size displacements
//    are read starting at disp_index (after an xbt_assert that enough fields exist).
// recv_size_sum is the sum of all receive counts.
// NOTE(review): the sample line in the comment below says "gather" although this parser handles the
// gatherv family. Several short lines (else branches, closing braces) are absent from this listing, so
// the exact nesting should be confirmed against the full file.
259 void GatherVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string& name)
261 /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
262 0 gather 68 68 10 10 10 0 0 0
264 1) 68 is the sendcount
265 2) 68 10 10 10 is the recvcounts
266 3) 0 is the root node
267 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
268 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
270 comm_size = MPI_COMM_WORLD->size();
271 CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
272 send_size = parse_integer<int>(action[2]);
273 disps = std::vector<int>(comm_size, 0);
274 recvcounts = std::make_shared<std::vector<int>>(comm_size);
276 if (name == "gatherv") {
277 root = parse_root(action, 3 + comm_size);
278 datatype1 = parse_datatype(action, 4 + comm_size);
279 datatype2 = parse_datatype(action, 5 + comm_size);
282 unsigned disp_index = 0;
283 /* The 3 comes from "0 gather <sendcount>", which must always be present.
284 * The + comm_size is the recvcounts array, which must also be present
286 if (action.size() > 3 + comm_size + comm_size) {
287 // datatype + disp are specified
288 datatype1 = parse_datatype(action, 3 + comm_size);
289 datatype2 = parse_datatype(action, 4 + comm_size);
290 disp_index = 5 + comm_size;
291 } else if (action.size() > 3 + comm_size + 2) {
292 // disps specified; datatype is not specified; use the default one
293 datatype1 = MPI_DEFAULT_TYPE;
294 datatype2 = MPI_DEFAULT_TYPE;
295 disp_index = 3 + comm_size;
297 // no disp specified, maybe only datatype,
298 datatype1 = parse_datatype(action, 3 + comm_size);
299 datatype2 = parse_datatype(action, 4 + comm_size);
302 if (disp_index != 0) {
303 xbt_assert(disp_index + comm_size <= action.size());
304 for (unsigned i = 0; i < comm_size; i++)
305 disps[i] = std::stoi(action[disp_index + i]);
309 for (unsigned int i = 0; i < comm_size; i++) {
310 (*recvcounts)[i] = std::stoi(action[i + 3]);
312 recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
315 void ScatterArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
317 /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
320 1) 68 is the sendcounts
321 2) 68 is the recvcounts
322 3) 0 is the root node
323 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
324 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
326 comm_size = MPI_COMM_WORLD->size();
327 CHECK_ACTION_PARAMS(action, 2, 3)
328 comm_size = MPI_COMM_WORLD->size();
329 send_size = parse_integer<int>(action[2]);
330 recv_size = parse_integer<int>(action[3]);
331 root = parse_root(action, 4);
332 datatype1 = parse_datatype(action, 5);
333 datatype2 = parse_datatype(action, 6);
336 void ScatterVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
338 /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
339 0 gather 68 10 10 10 68 0 0 0
341 1) 68 10 10 10 is the sendcounts
342 2) 68 is the recvcount
343 3) 0 is the root node
344 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
345 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
347 comm_size = MPI_COMM_WORLD->size();
348 CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
349 recv_size = parse_integer<int>(action[2 + comm_size]);
350 disps = std::vector<int>(comm_size, 0);
351 sendcounts = std::make_shared<std::vector<int>>(comm_size);
353 root = parse_root(action, 3 + comm_size);
354 datatype1 = parse_datatype(action, 4 + comm_size);
355 datatype2 = parse_datatype(action, 5 + comm_size);
357 for (unsigned int i = 0; i < comm_size; i++) {
358 (*sendcounts)[i] = std::stoi(action[i + 2]);
360 send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
363 void ReduceScatterArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
365 /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
366 0 reducescatter 275427 275427 275427 204020 11346849 0
368 1) The first four values after the name of the action declare the recvcounts array
369 2) The value 11346849 is the amount of instructions
370 3) The last value corresponds to the datatype, see simgrid::smpi::Datatype::decode().
372 comm_size = MPI_COMM_WORLD->size();
373 CHECK_ACTION_PARAMS(action, comm_size + 1, 1)
374 comp_size = parse_double(action[2 + comm_size]);
375 recvcounts = std::make_shared<std::vector<int>>(comm_size);
376 datatype1 = parse_datatype(action, 3 + comm_size);
378 for (unsigned int i = 0; i < comm_size; i++) {
379 (*recvcounts)[i]= std::stoi(action[i + 2]);
381 recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
384 void ScanArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
386 CHECK_ACTION_PARAMS(action, 2, 1)
387 size = parse_integer<size_t>(action[2]);
388 comp_size = parse_double(action[3]);
389 datatype1 = parse_datatype(action, 4);
392 void AllToAllVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
394 /* The structure of the alltoallv action for the rank 0 (total 4 processes) is the following:
395 0 alltoallv 100 1 7 10 12 100 1 70 10 5
397 1) 100 is the size of the send buffer *sizeof(int),
398 2) 1 7 10 12 is the sendcounts array
399 3) 100*sizeof(int) is the size of the receiver buffer
400 4) 1 70 10 5 is the recvcounts array
402 comm_size = MPI_COMM_WORLD->size();
403 CHECK_ACTION_PARAMS(action, 2 * comm_size + 2, 2)
404 sendcounts = std::make_shared<std::vector<int>>(comm_size);
405 recvcounts = std::make_shared<std::vector<int>>(comm_size);
406 senddisps = std::vector<int>(comm_size, 0);
407 recvdisps = std::vector<int>(comm_size, 0);
409 datatype1 = parse_datatype(action, 4 + 2 * comm_size);
410 datatype2 = parse_datatype(action, 5 + 2 * comm_size);
412 send_buf_size = parse_integer<int>(action[2]);
413 recv_buf_size = parse_integer<int>(action[3 + comm_size]);
414 for (unsigned int i = 0; i < comm_size; i++) {
415 (*sendcounts)[i] = std::stoi(action[3 + i]);
416 (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
418 send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
419 recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
// Replay a "wait" action: pop the request stored under (src, dst, tag) from req_storage and wait for it.
//  - Asserts that req_storage is not empty; the failure message quotes the whole action line.
//  - A popped MPI_REQUEST_NULL is tolerated: per the comment below, an earlier test may already have
//    completed that communication.
//  - Whether the request is a receive is recorded before Request::wait(), since wait() may reset the handle
//    to MPI_REQUEST_NULL; for a receive, TRACE_smpi_recv() is emitted once the wait is over.
//  - A request that is still non-null after the wait is released with Request::unref().
// NOTE(review): the embedded numbering skips 431-434 and 440-441 — presumably the early `return` of the
// null-request case and the declaration of `status`. Confirm against the complete file.
422 void WaitAction::kernel(simgrid::xbt::ReplayAction& action)
424 std::string s = boost::algorithm::join(action, " ");
425 xbt_assert(req_storage.size(), "action wait not preceded by any irecv or isend: %s", s.c_str());
426 const WaitTestParser& args = get_args();
427 MPI_Request request = req_storage.pop(args.src, args.dst, args.tag);
429 if (request == MPI_REQUEST_NULL) {
430 /* Assume that the trace is well formed, meaning the comm might have been caught by an MPI_test. Then just
435 // Must be taken before Request::wait() since the request may be set to
436 // MPI_REQUEST_NULL by Request::wait!
437 bool is_wait_for_receive = (request->flags() & MPI_REQ_RECV);
439 TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::WaitTIData("wait", args.src, args.dst, args.tag));
442 Request::wait(&request, &status);
443 if(request!=MPI_REQUEST_NULL)
444 Request::unref(&request);
445 TRACE_smpi_comm_out(get_pid());
446 if (is_wait_for_receive)
447 TRACE_smpi_recv(MPI_COMM_WORLD->group()->actor(args.src), MPI_COMM_WORLD->group()->actor(args.dst), args.tag);
// Replay a "send" or "isend" action towards rank args.partner of MPI_COMM_WORLD.
//  - dst_traced is the actor id of the partner rank; it is only used for the TRACE_smpi_send() event, which
//    is emitted when TRACE_smpi_view_internals() is false.
//  - "send" blocks in Request::send(); "isend" starts a request and stores it in req_storage for a later
//    wait/test/waitall. No payload is passed: the buffer argument is nullptr in both cases.
//  - Any other action name aborts through xbt_die().
// NOTE(review): the embedded numbering skips 454-456, so the start of the trace call whose last argument
// appears on line 457 is not visible (presumably TRACE_smpi_comm_in) — confirm against the complete file.
450 void SendAction::kernel(simgrid::xbt::ReplayAction&)
452 const SendRecvParser& args = get_args();
453 aid_t dst_traced = MPI_COMM_WORLD->group()->actor(args.partner);
457 new simgrid::instr::Pt2PtTIData(get_name(), args.partner, args.size, args.tag, Datatype::encode(args.datatype1)));
458 if (not TRACE_smpi_view_internals())
459 TRACE_smpi_send(get_pid(), get_pid(), dst_traced, args.tag, args.size * args.datatype1->size());
461 if (get_name() == "send") {
462 Request::send(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
463 } else if (get_name() == "isend") {
464 MPI_Request request = Request::isend(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
465 req_storage.add(request);
467 xbt_die("Don't know this action, %s", get_name().c_str());
470 TRACE_smpi_comm_out(get_pid());
// Replay a "recv" or "irecv" action from rank args.partner of MPI_COMM_WORLD.
//  - arg_size starts as the size found in the trace; it can be replaced by the count reported by
//    Request::probe() (per the comment below, when the size is unknown on the receiver side).
//  - "recv" blocks in Request::recv(); "irecv" starts a request and stores it in req_storage.
//  - When is_recv is true and TRACE_smpi_view_internals() is false, TRACE_smpi_recv() is emitted with the
//    actor id of status.MPI_SOURCE as sender.
// NOTE(review): many short lines are absent from this listing (declaration of `status`, the condition
// guarding the probe, the assignment of is_recv, the final else branch). Confirm the exact control flow
// against the complete file before relying on this summary.
473 void RecvAction::kernel(simgrid::xbt::ReplayAction&)
475 const SendRecvParser& args = get_args();
478 new simgrid::instr::Pt2PtTIData(get_name(), args.partner, args.size, args.tag, Datatype::encode(args.datatype1)));
481 // unknown size from the receiver point of view
482 size_t arg_size = args.size;
484 Request::probe(args.partner, args.tag, MPI_COMM_WORLD, &status);
485 arg_size = status.count;
488 bool is_recv = false; // Help analyzers understanding that status is not used uninitialized
489 if (get_name() == "recv") {
491 Request::recv(nullptr, arg_size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD, &status);
492 } else if (get_name() == "irecv") {
493 MPI_Request request = Request::irecv(nullptr, arg_size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
494 req_storage.add(request);
499 TRACE_smpi_comm_out(get_pid());
500 if (is_recv && not TRACE_smpi_view_internals()) {
501 aid_t src_traced = MPI_COMM_WORLD->group()->actor(status.MPI_SOURCE);
502 TRACE_smpi_recv(src_traced, get_pid(), args.tag);
506 void ComputeAction::kernel(simgrid::xbt::ReplayAction&)
508 const ComputeParser& args = get_args();
509 if (smpi_cfg_simulate_computation()) {
510 smpi_execute_flops(args.flops/smpi_adjust_comp_speed());
514 void SleepAction::kernel(simgrid::xbt::ReplayAction&)
516 const SleepParser& args = get_args();
517 XBT_DEBUG("Sleep for: %lf secs", args.time);
518 aid_t pid = simgrid::s4u::this_actor::get_pid();
519 TRACE_smpi_sleeping_in(pid, args.time);
520 simgrid::s4u::this_actor::sleep_for(args.time/smpi_adjust_comp_speed());
521 TRACE_smpi_sleeping_out(pid);
524 void LocationAction::kernel(simgrid::xbt::ReplayAction&)
526 const LocationParser& args = get_args();
527 smpi_trace_set_call_location(args.filename.c_str(), args.line);
// Replay a "test" action: pop the request stored under (src, dst, tag) and call Request::test() on it.
//  - A popped MPI_REQUEST_NULL means nothing is left to test (see the comment below) and the action is
//    ignored.
//  - After the test the entry is put back in req_storage so that a later wait can find it: through
//    addNullRequest() when the request became MPI_REQUEST_NULL, through add() otherwise.
// NOTE(review): the declarations of `status` and `flag`, the end of the block comment and the `else`
// between the two re-insertion calls are absent from this listing — confirm against the complete file.
530 void TestAction::kernel(simgrid::xbt::ReplayAction&)
532 const WaitTestParser& args = get_args();
533 MPI_Request request = req_storage.pop(args.src, args.dst, args.tag);
534 // if request is null here, this may mean that a previous test has succeeded
535 // Different times in traced application and replayed version may lead to this
536 // In this case, ignore the extra calls.
537 if (request != MPI_REQUEST_NULL) {
538 TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::WaitTIData("test", args.src, args.dst, args.tag));
542 Request::test(&request, &status, &flag);
544 XBT_DEBUG("MPI_Test result: %d", flag);
545 /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now
547 if (request == MPI_REQUEST_NULL)
548 req_storage.addNullRequest(args.src, args.dst, args.tag);
550 req_storage.add(request);
552 TRACE_smpi_comm_out(get_pid());
556 void InitAction::kernel(simgrid::xbt::ReplayAction& action)
558 CHECK_ACTION_PARAMS(action, 0, 1)
559 MPI_DEFAULT_TYPE = (action.size() > 2) ? MPI_DOUBLE // default MPE datatype
560 : MPI_BYTE; // default TAU datatype
562 /* start a simulated timer */
563 smpi_process()->simulated_start();
// Handler registered for the "comm_size", "comm_split" and "comm_dup" actions (see smpi_replay_init()).
// NOTE(review): only the signature is visible in this listing; the body is presumably a no-op — confirm.
566 void CommunicatorAction::kernel(simgrid::xbt::ReplayAction&)
571 void WaitAllAction::kernel(simgrid::xbt::ReplayAction&)
573 if (req_storage.size() > 0) {
574 std::vector<std::pair</*sender*/ aid_t, /*recv*/ aid_t>> sender_receiver;
575 std::vector<MPI_Request> reqs;
576 req_storage.get_requests(reqs);
577 unsigned long count_requests = reqs.size();
578 TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::CpuTIData("waitall", count_requests));
579 for (auto const& req : reqs) {
580 if (req && (req->flags() & MPI_REQ_RECV)) {
581 sender_receiver.emplace_back(req->src(), req->dst());
584 Request::waitall(count_requests, &(reqs.data())[0], MPI_STATUSES_IGNORE);
585 req_storage.get_store().clear();
587 for (MPI_Request& req : reqs)
588 if (req != MPI_REQUEST_NULL)
589 Request::unref(&req);
591 for (auto const& pair : sender_receiver) {
592 TRACE_smpi_recv(pair.first, pair.second, 0);
594 TRACE_smpi_comm_out(get_pid());
598 void BarrierAction::kernel(simgrid::xbt::ReplayAction&)
600 TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::NoOpTIData("barrier"));
601 colls::barrier(MPI_COMM_WORLD);
602 TRACE_smpi_comm_out(get_pid());
605 void BcastAction::kernel(simgrid::xbt::ReplayAction&)
607 const BcastArgParser& args = get_args();
608 TRACE_smpi_comm_in(get_pid(), "action_bcast",
609 new simgrid::instr::CollTIData("bcast", args.root, -1.0, args.size,
610 0, Datatype::encode(args.datatype1), ""));
612 colls::bcast(send_buffer(args.size * args.datatype1->size()), args.size, args.datatype1, args.root, MPI_COMM_WORLD);
614 TRACE_smpi_comm_out(get_pid());
// Replay a reduce action on MPI_COMM_WORLD: run colls::reduce() with temporary send/receive buffers sized
// args.comm_size elements of args.datatype1 and MPI_OP_NULL as operation, then, when args.comp_size is
// non-zero, set up an execution of args.comp_size named "computation". Trace events bracket the whole.
// NOTE(review): the embedded numbering skips 630-632, so the end of the exec_init() call chain (presumably
// the calls that start and wait for the execution) is not visible — confirm against the complete file.
617 void ReduceAction::kernel(simgrid::xbt::ReplayAction&)
619 const ReduceArgParser& args = get_args();
620 TRACE_smpi_comm_in(get_pid(), "action_reduce",
621 new simgrid::instr::CollTIData("reduce", args.root, args.comp_size,
622 args.comm_size, 0, Datatype::encode(args.datatype1), ""));
624 colls::reduce(send_buffer(args.comm_size * args.datatype1->size()),
625 recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL,
626 args.root, MPI_COMM_WORLD);
627 if (args.comp_size != 0.0)
628 simgrid::s4u::this_actor::exec_init(args.comp_size)
629 ->set_name("computation")
633 TRACE_smpi_comm_out(get_pid());
// Replay an allreduce action: run colls::allreduce() with temporary send/receive buffers sized
// args.comm_size elements of args.datatype1 and MPI_OP_NULL as operation, then, when args.comp_size is
// non-zero, set up an execution of args.comp_size named "computation". Trace events bracket the whole.
// NOTE(review): the last argument of colls::allreduce() (line 645, presumably the communicator) and the
// end of the exec_init() call chain (649-650) are absent from this listing — confirm against the full file.
636 void AllReduceAction::kernel(simgrid::xbt::ReplayAction&)
638 const AllReduceArgParser& args = get_args();
639 TRACE_smpi_comm_in(get_pid(), "action_allreduce",
640 new simgrid::instr::CollTIData("allreduce", -1, args.comp_size, args.comm_size, 0,
641 Datatype::encode(args.datatype1), ""));
643 colls::allreduce(send_buffer(args.comm_size * args.datatype1->size()),
644 recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL,
646 if (args.comp_size != 0.0)
647 simgrid::s4u::this_actor::exec_init(args.comp_size)
648 ->set_name("computation")
652 TRACE_smpi_comm_out(get_pid());
// Replay an alltoall action, bracketed by the matching trace events. Both temporary buffers are sized for
// every rank: element size * per-rank count * args.comm_size.
// NOTE(review): the last argument of colls::alltoall() (line 665, presumably the communicator) is absent
// from this listing — confirm against the complete file.
655 void AllToAllAction::kernel(simgrid::xbt::ReplayAction&)
657 const AllToAllArgParser& args = get_args();
658 TRACE_smpi_comm_in(get_pid(), "action_alltoall",
659 new simgrid::instr::CollTIData("alltoall", -1, -1.0, args.send_size, args.recv_size,
660 Datatype::encode(args.datatype1),
661 Datatype::encode(args.datatype2)));
663 colls::alltoall(send_buffer(args.datatype1->size() * args.send_size * args.comm_size), args.send_size, args.datatype1,
664 recv_buffer(args.datatype2->size() * args.recv_size * args.comm_size), args.recv_size, args.datatype2,
667 TRACE_smpi_comm_out(get_pid());
// Replay a "gather" or "allgather" action (the variant is given by get_name()), bracketed by trace events.
// The root is only meaningful for "gather"; the trace data records -1 as root for the other variant.
//  - gather: every rank passes a send buffer; only the root passes a receive buffer, sized for
//    args.comm_size contributions, the other ranks pass nullptr.
//  - allgather: every rank passes both buffers.
// NOTE(review): in the allgather call the receive buffer is sized for a single contribution
// (recv_size * datatype size) whereas the gather branch multiplies by comm_size; every rank of an
// allgather receives comm_size contributions, so this looks undersized — confirm whether it is intended.
// NOTE(review): the branch separator (683, presumably `} else {`) and the last argument of
// colls::allgather() (686) are absent from this listing — confirm against the complete file.
670 void GatherAction::kernel(simgrid::xbt::ReplayAction&)
672 const GatherArgParser& args = get_args();
673 TRACE_smpi_comm_in(get_pid(), get_name().c_str(),
674 new simgrid::instr::CollTIData(get_name(), (get_name() == "gather") ? args.root : -1, -1.0,
675 args.send_size, args.recv_size, Datatype::encode(args.datatype1),
676 Datatype::encode(args.datatype2)));
678 if (get_name() == "gather") {
679 int rank = MPI_COMM_WORLD->rank();
680 colls::gather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
681 (rank == args.root) ? recv_buffer(args.datatype2->size() * args.recv_size * args.comm_size) : nullptr,
682 args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
684 colls::allgather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
685 recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2,
688 TRACE_smpi_comm_out(get_pid());
// Replay a "gatherv" or "allgatherv" action (the variant is given by get_name()), bracketed by trace events.
// The root is only meaningful for "gatherv"; the trace data records -1 as root for the other variant.
//  - gatherv: only the root passes a receive buffer (sized args.recv_size_sum elements of datatype2), the
//    other ranks pass nullptr.
//  - allgatherv: every rank passes a receive buffer of that size.
// Receive counts and displacements come from the parser (args.recvcounts, args.disps).
// NOTE(review): the branch separator between the two calls (line 704, presumably `} else {`) is absent
// from this listing — confirm against the complete file.
691 void GatherVAction::kernel(simgrid::xbt::ReplayAction&)
693 int rank = MPI_COMM_WORLD->rank();
694 const GatherVArgParser& args = get_args();
695 TRACE_smpi_comm_in(get_pid(), get_name().c_str(),
696 new simgrid::instr::VarCollTIData(
697 get_name(), (get_name() == "gatherv") ? args.root : -1, args.send_size, nullptr, -1,
698 args.recvcounts, Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
700 if (get_name() == "gatherv") {
701 colls::gatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
702 (rank == args.root) ? recv_buffer(args.recv_size_sum * args.datatype2->size()) : nullptr,
703 args.recvcounts->data(), args.disps.data(), args.datatype2, args.root, MPI_COMM_WORLD);
705 colls::allgatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
706 recv_buffer(args.recv_size_sum * args.datatype2->size()), args.recvcounts->data(),
707 args.disps.data(), args.datatype2, MPI_COMM_WORLD);
710 TRACE_smpi_comm_out(get_pid());
713 void ScatterAction::kernel(simgrid::xbt::ReplayAction&)
715 int rank = MPI_COMM_WORLD->rank();
716 const ScatterArgParser& args = get_args();
717 TRACE_smpi_comm_in(get_pid(), "action_scatter",
718 new simgrid::instr::CollTIData(get_name(), args.root, -1.0, args.send_size, args.recv_size,
719 Datatype::encode(args.datatype1),
720 Datatype::encode(args.datatype2)));
722 colls::scatter(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
723 (rank == args.root) ? recv_buffer(args.recv_size * args.datatype2->size()) : nullptr, args.recv_size,
724 args.datatype2, args.root, MPI_COMM_WORLD);
726 TRACE_smpi_comm_out(get_pid());
// Replay a scatterv action, bracketed by the matching trace events.
// Only the root passes a send buffer (sized args.send_size_sum elements of datatype1), the other ranks pass
// nullptr; every rank passes a receive buffer for args.recv_size elements of datatype2.
// Send counts and displacements come from the parser (args.sendcounts, args.disps).
// NOTE(review): the last argument of colls::scatterv() (line 741, presumably the communicator) is absent
// from this listing — confirm against the complete file.
729 void ScatterVAction::kernel(simgrid::xbt::ReplayAction&)
731 int rank = MPI_COMM_WORLD->rank();
732 const ScatterVArgParser& args = get_args();
733 TRACE_smpi_comm_in(get_pid(), "action_scatterv",
734 new simgrid::instr::VarCollTIData(get_name(), args.root, -1, args.sendcounts, args.recv_size,
735 nullptr, Datatype::encode(args.datatype1),
736 Datatype::encode(args.datatype2)));
738 colls::scatterv((rank == args.root) ? send_buffer(args.send_size_sum * args.datatype1->size()) : nullptr,
739 args.sendcounts->data(), args.disps.data(), args.datatype1,
740 recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, args.root,
743 TRACE_smpi_comm_out(get_pid());
// Replay a reducescatter action: run colls::reduce_scatter() with temporary buffers sized
// args.recv_size_sum elements of args.datatype1 and MPI_OP_NULL as operation, then, when args.comp_size is
// non-zero, set up an execution of args.comp_size named "computation".
// The trace data carries the computation amount as a string in the first datatype slot ("0" when
// comp_size is 0, std::to_string(comp_size) otherwise), as the inline comments below explain.
// NOTE(review): the opening of the trace call (line 749) and the end of the exec_init() call chain
// (763-764) are absent from this listing — confirm against the complete file.
746 void ReduceScatterAction::kernel(simgrid::xbt::ReplayAction&)
748 const ReduceScatterArgParser& args = get_args();
750 get_pid(), "action_reducescatter",
751 new simgrid::instr::VarCollTIData(get_name(), -1, -1, nullptr, -1, args.recvcounts,
752 /* ugly as we use datatype field to pass computation as string */
753 /* and because of the trick to avoid getting 0.000000 when 0 is given */
754 args.comp_size == 0 ? "0" : std::to_string(args.comp_size),
755 Datatype::encode(args.datatype1)));
757 colls::reduce_scatter(send_buffer(args.recv_size_sum * args.datatype1->size()),
758 recv_buffer(args.recv_size_sum * args.datatype1->size()), args.recvcounts->data(),
759 args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
760 if (args.comp_size != 0.0)
761 simgrid::s4u::this_actor::exec_init(args.comp_size)
762 ->set_name("computation")
765 TRACE_smpi_comm_out(get_pid());
// Replay a "scan" or "exscan" action (the variant is given by get_name()): run colls::scan() or
// colls::exscan() with temporary buffers sized args.size elements of args.datatype1 and MPI_OP_NULL as
// operation, then, when args.comp_size is non-zero, set up an execution of args.comp_size named
// "computation". Trace events bracket the whole.
// NOTE(review): the `else` between the two collective calls (line 778) and the end of the exec_init()
// call chain (786-787) are absent from this listing — confirm against the complete file.
768 void ScanAction::kernel(simgrid::xbt::ReplayAction&)
770 const ScanArgParser& args = get_args();
771 TRACE_smpi_comm_in(get_pid(), "action_scan",
772 new simgrid::instr::CollTIData(get_name(), -1, args.comp_size,
773 args.size, 0, Datatype::encode(args.datatype1), ""));
774 if (get_name() == "scan")
775 colls::scan(send_buffer(args.size * args.datatype1->size()),
776 recv_buffer(args.size * args.datatype1->size()), args.size,
777 args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
779 colls::exscan(send_buffer(args.size * args.datatype1->size()),
780 recv_buffer(args.size * args.datatype1->size()), args.size,
781 args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
783 if (args.comp_size != 0.0)
784 simgrid::s4u::this_actor::exec_init(args.comp_size)
785 ->set_name("computation")
788 TRACE_smpi_comm_out(get_pid());
791 void AllToAllVAction::kernel(simgrid::xbt::ReplayAction&)
793 const AllToAllVArgParser& args = get_args();
794 TRACE_smpi_comm_in(get_pid(), __func__,
795 new simgrid::instr::VarCollTIData(
796 "alltoallv", -1, args.send_size_sum, args.sendcounts, args.recv_size_sum, args.recvcounts,
797 Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
799 colls::alltoallv(send_buffer(args.send_buf_size * args.datatype1->size()), args.sendcounts->data(),
800 args.senddisps.data(), args.datatype1, recv_buffer(args.recv_buf_size * args.datatype2->size()),
801 args.recvcounts->data(), args.recvdisps.data(), args.datatype2, MPI_COMM_WORLD);
803 TRACE_smpi_comm_out(get_pid());
805 } // Replay Namespace
806 }} // namespace simgrid::smpi
// Request storages of all replayed actors, indexed by actor pid. The action handlers registered in
// smpi_replay_init() look up the calling actor's entry with simgrid::s4u::this_actor::get_pid(), and
// smpi_replay_main() resets that entry before running the trace.
808 static std::unordered_map<aid_t, simgrid::smpi::replay::RequestStorage> storage;
809 /** @brief Only initialize the replay, don't do it for real */
810 void smpi_replay_init(const char* instance_id, int rank, double start_delay_flops)
812 xbt_assert(not smpi_process()->initializing());
814 simgrid::s4u::Actor::self()->set_property("instance_id", instance_id);
815 simgrid::s4u::Actor::self()->set_property("rank", std::to_string(rank));
816 simgrid::smpi::ActorExt::init();
818 smpi_process()->mark_as_initialized();
819 smpi_process()->set_replaying(true);
821 TRACE_smpi_init(simgrid::s4u::this_actor::get_pid(), "smpi_replay_run_init");
822 xbt_replay_action_register("init", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::InitAction().execute(action); });
823 xbt_replay_action_register("finalize", [](simgrid::xbt::ReplayAction const&) { /* nothing to do */ });
824 xbt_replay_action_register("comm_size", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
825 xbt_replay_action_register("comm_split",[](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
826 xbt_replay_action_register("comm_dup", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
827 xbt_replay_action_register("send", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("send", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
828 xbt_replay_action_register("isend", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("isend", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
829 xbt_replay_action_register("recv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("recv", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
830 xbt_replay_action_register("irecv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("irecv", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
831 xbt_replay_action_register("test", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::TestAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
832 xbt_replay_action_register("wait", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
833 xbt_replay_action_register("waitall", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAllAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
834 xbt_replay_action_register("barrier", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BarrierAction().execute(action); });
835 xbt_replay_action_register("bcast", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BcastAction().execute(action); });
836 xbt_replay_action_register("reduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceAction().execute(action); });
837 xbt_replay_action_register("allreduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllReduceAction().execute(action); });
838 xbt_replay_action_register("alltoall", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllAction().execute(action); });
839 xbt_replay_action_register("alltoallv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllVAction().execute(action); });
840 xbt_replay_action_register("gather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("gather").execute(action); });
841 xbt_replay_action_register("scatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterAction().execute(action); });
842 xbt_replay_action_register("gatherv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("gatherv").execute(action); });
843 xbt_replay_action_register("scatterv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterVAction().execute(action); });
844 xbt_replay_action_register("allgather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("allgather").execute(action); });
845 xbt_replay_action_register("allgatherv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("allgatherv").execute(action); });
846 xbt_replay_action_register("reducescatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceScatterAction().execute(action); });
847 xbt_replay_action_register("scan", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScanAction("scan").execute(action); });
848 xbt_replay_action_register("exscan", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScanAction("exscan").execute(action); });
849 xbt_replay_action_register("compute", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ComputeAction().execute(action); });
850 xbt_replay_action_register("sleep", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SleepAction().execute(action); });
851 xbt_replay_action_register("location", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::LocationAction().execute(action); });
853 //if we have a delayed start, sleep here.
854 if (start_delay_flops > 0) {
855 XBT_VERB("Delayed start for instance - Sleeping for %f flops ", start_delay_flops);
856 private_execute_flops(start_delay_flops);
858 // Wait for the other actors to initialize also
859 simgrid::s4u::this_actor::yield();
861 if(_smpi_init_sleep > 0)
862 simgrid::s4u::this_actor::sleep_for(_smpi_init_sleep);
// Runs the trace of one replayed rank, then finalizes the calling actor:
//  1. reset the calling actor's entry in `storage` and run the replay through
//     simgrid::xbt::replay_runner() (rank given as text, plus the optional private trace file);
//  2. if requests are still stored afterwards, gather them and wait for all of them;
//  3. run a barrier on MPI_COMM_WORLD when the "smpi/finalization-barrier" option is set;
//  4. once active_processes is back to 0, log the simulated time and free the replay temporary buffers;
//  5. finalize the SMPI process between the "finalize" trace events.
// NOTE(review): count_requests is RequestStorage::size(), i.e. the number of keys in the map, while the
// loop below visits every request of every per-key list. If a key can hold several requests, the
// `requests` vector may be too small — confirm with the (not visible) loop body.
// NOTE(review): several short lines are absent from this listing (updates of active_processes, the loop
// body filling `requests`, closing braces) — confirm against the complete file.
865 /** @brief actually run the replay after initialization */
866 void smpi_replay_main(int rank, const char* private_trace_filename)
868 static int active_processes = 0;
870 storage[simgrid::s4u::this_actor::get_pid()] = simgrid::smpi::replay::RequestStorage();
871 std::string rank_string = std::to_string(rank);
872 simgrid::xbt::replay_runner(rank_string.c_str(), private_trace_filename);
874 /* and now, finalize everything */
875 /* One active process will stop. Decrease the counter*/
876 unsigned int count_requests = storage[simgrid::s4u::this_actor::get_pid()].size();
877 XBT_DEBUG("There are %u elements in reqq[*]", count_requests);
878 if (count_requests > 0) {
879 std::vector<MPI_Request> requests(count_requests);
882 for (auto const& pair : storage[simgrid::s4u::this_actor::get_pid()].get_store()) {
883 for (auto& req: pair.second){
888 simgrid::smpi::Request::waitall(count_requests, requests.data(), MPI_STATUSES_IGNORE);
891 if(simgrid::config::get_value<bool>("smpi/finalization-barrier"))
892 simgrid::smpi::colls::barrier(MPI_COMM_WORLD);
896 if(active_processes==0){
897 /* Last process alive speaking: end the simulated timer */
898 XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
899 smpi_free_replay_tmp_buffers();
902 TRACE_smpi_comm_in(simgrid::s4u::this_actor::get_pid(), "smpi_replay_run_finalize",
903 new simgrid::instr::NoOpTIData("finalize"));
905 smpi_process()->finalize();
907 TRACE_smpi_comm_out(simgrid::s4u::this_actor::get_pid());
910 /** @brief chain a replay initialization and a replay start */
911 void smpi_replay_run(const char* instance_id, int rank, double start_delay_flops, const char* private_trace_filename)
913 smpi_replay_init(instance_id, rank, start_delay_flops);
914 smpi_replay_main(rank, private_trace_filename);