1 /* Copyright (c) 2009-2021. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
6 #include "smpi_coll.hpp"
7 #include "smpi_comm.hpp"
8 #include "smpi_datatype.hpp"
9 #include "smpi_group.hpp"
10 #include "smpi_request.hpp"
11 #include "smpi_config.hpp"
12 #include "simgrid/s4u/Exec.hpp"
13 #include "xbt/replay.hpp"
14 #include <simgrid/smpi/smpi_replay.hpp>
15 #include <src/smpi/include/private.hpp>
22 #include <unordered_map>
// Declares the "smpi_replay" log subcategory (child of "smpi") and makes it the default category of this file,
// so the XBT_VERB/XBT_DEBUG/XBT_INFO statements below are emitted under it.
25 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay, smpi, "Trace Replay with SMPI");
27 // From https://stackoverflow.com/questions/7110301/generic-hash-for-tuples-in-unordered-map-unordered-set
28 // This is all just to make std::unordered_map work with std::tuple. If we need this in other places,
29 // this could go into a header file.
// The helpers below fold the hash of each tuple element into one seed value; RequestStorage uses
// hash_tuple::hash<std::tuple<int, int, int>> as the hasher of its unordered_map.
30 namespace hash_tuple {
// Generic case: simply forwards to std::hash<TT>.
31 template <typename TT> class hash {
33 size_t operator()(TT const& tt) const { return std::hash<TT>()(tt); }
// Mixes the hash of v into seed: xor with (hash + 0x9e3779b9 + seed shifted left by 6 + seed shifted right by 2).
36 template <class T> inline void hash_combine(std::size_t& seed, T const& v)
38 seed ^= hash_tuple::hash<T>()(v) + 0x9e3779b9 + (seed << 6) + (seed >> 2);
41 // Recursive template code derived from Matthieu M.
// Combines tuple elements 0..Index into seed; the recursion handles the lower indices first, so elements are
// combined in increasing index order. Index defaults to the last element of the tuple.
42 template <class Tuple, size_t Index = std::tuple_size<Tuple>::value - 1> class HashValueImpl {
44 static void apply(size_t& seed, Tuple const& tuple)
46 HashValueImpl<Tuple, Index - 1>::apply(seed, tuple);
47 hash_combine(seed, std::get<Index>(tuple));
// End of the recursion: only element 0 remains to be combined.
51 template <class Tuple> class HashValueImpl<Tuple, 0> {
53 static void apply(size_t& seed, Tuple const& tuple) { hash_combine(seed, std::get<0>(tuple)); }
// Specialization for std::tuple: hashes the tuple by running HashValueImpl over all of its elements.
// NOTE(review): the declaration of `seed` and the return statement are not visible in this excerpt.
56 template <typename... TT> class hash<std::tuple<TT...>> {
58 size_t operator()(std::tuple<TT...> const& tt) const
61 HashValueImpl<std::tuple<TT...>>::apply(seed, tt);
67 void log_timed_action(const simgrid::xbt::ReplayAction& action, double clock)
69 if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
70 std::string s = boost::algorithm::join(action, " ");
71 XBT_VERB("%s %f", s.c_str(), smpi_process()->simulated_elapsed() - clock);
75 /* Helper functions */
76 static double parse_double(const std::string& string)
78 return xbt_str_parse_double(string.c_str(), "not a double");
81 template <typename T> static T parse_integer(const std::string& string)
83 double val = trunc(xbt_str_parse_double(string.c_str(), "not a double"));
84 xbt_assert(static_cast<double>(std::numeric_limits<T>::min()) <= val &&
85 val <= static_cast<double>(std::numeric_limits<T>::max()),
86 "out of range: %g", val);
87 return static_cast<T>(val);
90 static int parse_root(const simgrid::xbt::ReplayAction& action, unsigned i)
92 return i < action.size() ? std::stoi(action[i]) : 0;
95 static MPI_Datatype parse_datatype(const simgrid::xbt::ReplayAction& action, unsigned i)
97 return i < action.size() ? simgrid::smpi::Datatype::decode(action[i]) : simgrid::smpi::replay::MPI_DEFAULT_TYPE;
// Datatype used by parse_datatype() when a trace line carries no datatype token.
// It is assigned in InitAction::kernel (MPI_DOUBLE or MPI_BYTE depending on the length of the init line).
104 MPI_Datatype MPI_DEFAULT_TYPE;
// Table of the requests created by the replayed isend/irecv actions, keyed by (sender, receiver, tag).
// One instance exists per actor pid (see the file-level `storage` map).
// NOTE(review): the declaration of the `store` member is not visible in this excerpt.
106 class RequestStorage {
108 using req_key_t = std::tuple</*sender*/ int, /* receiver */ int, /* tag */ int>;
109 using req_storage_t = std::unordered_map<req_key_t, MPI_Request, hash_tuple::hash<std::tuple<int, int, int>>>;
114 RequestStorage() = default;
// Number of stored entries, whatever the request they hold.
115 size_t size() const { return store.size(); }
// Direct mutable access to the underlying map (used by callers to iterate over or clear it).
117 req_storage_t& get_store() { return store; }
// Appends to vec every stored non-null request whose src() or dst() equals the pid of the calling actor,
// and calls print_request("MM") on each of them.
119 void get_requests(std::vector<MPI_Request>& vec) const
121 for (auto const& pair : store) {
122 auto& req = pair.second;
123 aid_t my_proc_id = simgrid::s4u::this_actor::get_pid();
124 if (req != MPI_REQUEST_NULL && (req->src() == my_proc_id || req->dst() == my_proc_id)) {
125 vec.push_back(pair.second);
126 pair.second->print_request("MM");
// Returns the request stored under (src, dst, tag), or MPI_REQUEST_NULL when there is no such entry.
131 MPI_Request find(int src, int dst, int tag)
133 auto it = store.find(req_key_t(src, dst, tag));
134 return (it == store.end()) ? MPI_REQUEST_NULL : it->second;
// Erases the entry matching req; does nothing for MPI_REQUEST_NULL.
// NOTE(review): add()/remove() build the key from src()-1 / dst()-1 while find() uses its arguments unchanged;
// presumably the request holds actor ids and callers of find() pass ranks. Confirm.
137 void remove(const Request* req)
139 if (req == MPI_REQUEST_NULL) return;
141 store.erase(req_key_t(req->src()-1, req->dst()-1, req->tag()));
// Stores req under (src()-1, dst()-1, tag()); null requests are ignored. insert() keeps an already present entry.
144 void add(MPI_Request req)
146 if (req != MPI_REQUEST_NULL) // Can and does happen in the case of TestAction
147 store.insert({req_key_t(req->src()-1, req->dst()-1, req->tag()), req});
150 /* Sometimes we need to re-insert MPI_REQUEST_NULL but we still need src,dst and tag */
// Here src and dst are translated through MPI_COMM_WORLD->group()->actor() before the -1 offset is applied.
// NOTE(review): the end of this statement (the inserted value) is not visible in this excerpt.
151 void addNullRequest(int src, int dst, int tag)
153 store.insert({req_key_t(MPI_COMM_WORLD->group()->actor(src) - 1, MPI_COMM_WORLD->group()->actor(dst) - 1, tag),
158 void WaitTestParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
160 CHECK_ACTION_PARAMS(action, 3, 0)
161 src = std::stoi(action[2]);
162 dst = std::stoi(action[3]);
163 tag = std::stoi(action[4]);
166 void SendRecvParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
168 CHECK_ACTION_PARAMS(action, 3, 1)
169 partner = std::stoi(action[2]);
170 tag = std::stoi(action[3]);
171 size = parse_integer<size_t>(action[4]);
172 datatype1 = parse_datatype(action, 5);
175 void ComputeParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
177 CHECK_ACTION_PARAMS(action, 1, 0)
178 flops = parse_double(action[2]);
181 void SleepParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
183 CHECK_ACTION_PARAMS(action, 1, 0)
184 time = parse_double(action[2]);
187 void LocationParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
189 CHECK_ACTION_PARAMS(action, 2, 0)
190 filename = std::string(action[2]);
191 line = std::stoi(action[3]);
194 void BcastArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
196 CHECK_ACTION_PARAMS(action, 1, 2)
197 size = parse_integer<size_t>(action[2]);
198 root = parse_root(action, 3);
199 datatype1 = parse_datatype(action, 4);
202 void ReduceArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
204 CHECK_ACTION_PARAMS(action, 2, 2)
205 comm_size = parse_integer<unsigned>(action[2]);
206 comp_size = parse_double(action[3]);
207 root = parse_root(action, 4);
208 datatype1 = parse_datatype(action, 5);
211 void AllReduceArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
213 CHECK_ACTION_PARAMS(action, 2, 1)
214 comm_size = parse_integer<unsigned>(action[2]);
215 comp_size = parse_double(action[3]);
216 datatype1 = parse_datatype(action, 4);
219 void AllToAllArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
221 CHECK_ACTION_PARAMS(action, 2, 1)
222 comm_size = MPI_COMM_WORLD->size();
223 send_size = parse_integer<int>(action[2]);
224 recv_size = parse_integer<int>(action[3]);
225 datatype1 = parse_datatype(action, 4);
226 datatype2 = parse_datatype(action, 5);
229 void GatherArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string& name)
231 /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
234 1) 68 is the sendcounts
235 2) 68 is the recvcounts
236 3) 0 is the root node
237 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
238 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
240 CHECK_ACTION_PARAMS(action, 2, 3)
241 comm_size = MPI_COMM_WORLD->size();
242 send_size = parse_integer<int>(action[2]);
243 recv_size = parse_integer<int>(action[3]);
245 if (name == "gather") {
246 root = parse_root(action, 4);
247 datatype1 = parse_datatype(action, 5);
248 datatype2 = parse_datatype(action, 6);
251 datatype1 = parse_datatype(action, 4);
252 datatype2 = parse_datatype(action, 5);
256 void GatherVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string& name)
258 /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
259 0 gather 68 68 10 10 10 0 0 0
261 1) 68 is the sendcount
262 2) 68 10 10 10 is the recvcounts
263 3) 0 is the root node
264 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
265 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
267 comm_size = MPI_COMM_WORLD->size();
268 CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
269 send_size = parse_integer<int>(action[2]);
270 disps = std::vector<int>(comm_size, 0);
271 recvcounts = std::make_shared<std::vector<int>>(comm_size);
273 if (name == "gatherv") {
274 root = parse_root(action, 3 + comm_size);
275 datatype1 = parse_datatype(action, 4 + comm_size);
276 datatype2 = parse_datatype(action, 5 + comm_size);
279 unsigned disp_index = 0;
280 /* The 3 comes from "0 gather <sendcount>", which must always be present.
281 * The + comm_size is the recvcounts array, which must also be present
283 if (action.size() > 3 + comm_size + comm_size) {
284 // datatype + disp are specified
285 datatype1 = parse_datatype(action, 3 + comm_size);
286 datatype2 = parse_datatype(action, 4 + comm_size);
287 disp_index = 5 + comm_size;
288 } else if (action.size() > 3 + comm_size + 2) {
289 // disps specified; datatype is not specified; use the default one
290 datatype1 = MPI_DEFAULT_TYPE;
291 datatype2 = MPI_DEFAULT_TYPE;
292 disp_index = 3 + comm_size;
294 // no disp specified, maybe only datatype,
295 datatype1 = parse_datatype(action, 3 + comm_size);
296 datatype2 = parse_datatype(action, 4 + comm_size);
299 if (disp_index != 0) {
300 xbt_assert(disp_index + comm_size <= action.size());
301 for (unsigned i = 0; i < comm_size; i++)
302 disps[i] = std::stoi(action[disp_index + i]);
306 for (unsigned int i = 0; i < comm_size; i++) {
307 (*recvcounts)[i] = std::stoi(action[i + 3]);
309 recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
312 void ScatterArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
314 /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
317 1) 68 is the sendcounts
318 2) 68 is the recvcounts
319 3) 0 is the root node
320 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
321 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
323 comm_size = MPI_COMM_WORLD->size();
324 CHECK_ACTION_PARAMS(action, 2, 3)
325 comm_size = MPI_COMM_WORLD->size();
326 send_size = parse_integer<int>(action[2]);
327 recv_size = parse_integer<int>(action[3]);
328 root = parse_root(action, 4);
329 datatype1 = parse_datatype(action, 5);
330 datatype2 = parse_datatype(action, 6);
333 void ScatterVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
335 /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
336 0 gather 68 10 10 10 68 0 0 0
338 1) 68 10 10 10 is the sendcounts
339 2) 68 is the recvcount
340 3) 0 is the root node
341 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
342 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
344 comm_size = MPI_COMM_WORLD->size();
345 CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
346 recv_size = parse_integer<int>(action[2 + comm_size]);
347 disps = std::vector<int>(comm_size, 0);
348 sendcounts = std::make_shared<std::vector<int>>(comm_size);
350 root = parse_root(action, 3 + comm_size);
351 datatype1 = parse_datatype(action, 4 + comm_size);
352 datatype2 = parse_datatype(action, 5 + comm_size);
354 for (unsigned int i = 0; i < comm_size; i++) {
355 (*sendcounts)[i] = std::stoi(action[i + 2]);
357 send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
360 void ReduceScatterArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
362 /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
363 0 reducescatter 275427 275427 275427 204020 11346849 0
365 1) The first four values after the name of the action declare the recvcounts array
366 2) The value 11346849 is the amount of instructions
367 3) The last value corresponds to the datatype, see simgrid::smpi::Datatype::decode().
369 comm_size = MPI_COMM_WORLD->size();
370 CHECK_ACTION_PARAMS(action, comm_size + 1, 1)
371 comp_size = parse_double(action[2 + comm_size]);
372 recvcounts = std::make_shared<std::vector<int>>(comm_size);
373 datatype1 = parse_datatype(action, 3 + comm_size);
375 for (unsigned int i = 0; i < comm_size; i++) {
376 (*recvcounts)[i]= std::stoi(action[i + 2]);
378 recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
381 void ScanArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
383 CHECK_ACTION_PARAMS(action, 2, 1)
384 size = parse_integer<size_t>(action[2]);
385 comp_size = parse_double(action[3]);
386 datatype1 = parse_datatype(action, 4);
389 void AllToAllVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
391 /* The structure of the alltoallv action for the rank 0 (total 4 processes) is the following:
392 0 alltoallv 100 1 7 10 12 100 1 70 10 5
394 1) 100 is the size of the send buffer *sizeof(int),
395 2) 1 7 10 12 is the sendcounts array
396 3) 100*sizeof(int) is the size of the receiver buffer
397 4) 1 70 10 5 is the recvcounts array
399 comm_size = MPI_COMM_WORLD->size();
400 CHECK_ACTION_PARAMS(action, 2 * comm_size + 2, 2)
401 sendcounts = std::make_shared<std::vector<int>>(comm_size);
402 recvcounts = std::make_shared<std::vector<int>>(comm_size);
403 senddisps = std::vector<int>(comm_size, 0);
404 recvdisps = std::vector<int>(comm_size, 0);
406 datatype1 = parse_datatype(action, 4 + 2 * comm_size);
407 datatype2 = parse_datatype(action, 5 + 2 * comm_size);
409 send_buf_size = parse_integer<int>(action[2]);
410 recv_buf_size = parse_integer<int>(action[3 + comm_size]);
411 for (unsigned int i = 0; i < comm_size; i++) {
412 (*sendcounts)[i] = std::stoi(action[3 + i]);
413 (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
415 send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
416 recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
// Replays a "wait" line: looks up the stored request matching (src, dst, tag), removes it from the storage and
// waits for it with Request::wait between the TRACE_smpi_comm_in/out events. For a receive request,
// TRACE_smpi_recv is emitted afterwards.
// NOTE(review): some statements of this function (handling of the null request, declaration of `status`) are not
// visible in this excerpt.
419 void WaitAction::kernel(simgrid::xbt::ReplayAction& action)
// The joined line is only used in the assertion message.
421 std::string s = boost::algorithm::join(action, " ");
422 xbt_assert(req_storage.size(), "action wait not preceded by any irecv or isend: %s", s.c_str());
423 const WaitTestParser& args = get_args();
424 MPI_Request request = req_storage.find(args.src, args.dst, args.tag);
// RequestStorage::remove() ignores MPI_REQUEST_NULL, so this is harmless when find() found nothing.
425 req_storage.remove(request);
427 if (request == MPI_REQUEST_NULL) {
428 /* Assume that the trace is well formed, meaning the comm might have been caught by an MPI_test. Then just
433 // Must be taken before Request::wait() since the request may be set to
434 // MPI_REQUEST_NULL by Request::wait!
435 bool is_wait_for_receive = (request->flags() & MPI_REQ_RECV);
437 TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::WaitTIData(args.src, args.dst, args.tag));
440 Request::wait(&request, &status);
442 TRACE_smpi_comm_out(get_pid());
443 if (is_wait_for_receive)
444 TRACE_smpi_recv(MPI_COMM_WORLD->group()->actor(args.src), MPI_COMM_WORLD->group()->actor(args.dst), args.tag);
// Replays a "send" or "isend" line. "send" goes through the blocking Request::send; "isend" goes through
// Request::isend and the resulting request is recorded in req_storage for a later wait/test/waitall.
// Any other action name aborts through xbt_die. No payload is transferred: the buffer argument is nullptr.
// NOTE(review): the beginning of the tracing call that owns the Pt2PtTIData argument is not visible in this excerpt.
447 void SendAction::kernel(simgrid::xbt::ReplayAction&)
449 const SendRecvParser& args = get_args();
// Actor id of the destination rank, used for the TRACE_smpi_send event below.
450 aid_t dst_traced = MPI_COMM_WORLD->group()->actor(args.partner);
454 new simgrid::instr::Pt2PtTIData(get_name(), args.partner, args.size, args.tag, Datatype::encode(args.datatype1)));
// The send event is emitted here only when internals are not traced; the size is given in bytes
// (element count times datatype size).
455 if (not TRACE_smpi_view_internals())
456 TRACE_smpi_send(get_pid(), get_pid(), dst_traced, args.tag, args.size * args.datatype1->size());
458 if (get_name() == "send") {
459 Request::send(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
460 } else if (get_name() == "isend") {
461 MPI_Request request = Request::isend(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
462 req_storage.add(request);
464 xbt_die("Don't know this action, %s", get_name().c_str());
467 TRACE_smpi_comm_out(get_pid());
// Replays a "recv" or "irecv" line. "recv" goes through the blocking Request::recv; "irecv" goes through
// Request::irecv and the resulting request is recorded in req_storage. No payload is received: the buffer is nullptr.
// NOTE(review): several statements (start of the tracing call, declaration of `status`, the guard around the probe,
// the assignment of `is_recv`, the final else branch) are not visible in this excerpt.
470 void RecvAction::kernel(simgrid::xbt::ReplayAction&)
472 const SendRecvParser& args = get_args();
475 new simgrid::instr::Pt2PtTIData(get_name(), args.partner, args.size, args.tag, Datatype::encode(args.datatype1)));
478 // unknown size from the receiver point of view
479 size_t arg_size = args.size;
// The probe fills `status`; its count then replaces the size read from the trace.
481 Request::probe(args.partner, args.tag, MPI_COMM_WORLD, &status);
482 arg_size = status.count;
485 bool is_recv = false; // Help analyzers understanding that status is not used uninitialized
486 if (get_name() == "recv") {
488 Request::recv(nullptr, arg_size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD, &status);
489 } else if (get_name() == "irecv") {
490 MPI_Request request = Request::irecv(nullptr, arg_size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
491 req_storage.add(request);
496 TRACE_smpi_comm_out(get_pid());
// The receive event uses the actual source reported in `status`, translated to an actor id.
497 if (is_recv && not TRACE_smpi_view_internals()) {
498 aid_t src_traced = MPI_COMM_WORLD->group()->actor(status.MPI_SOURCE);
499 TRACE_smpi_recv(src_traced, get_pid(), args.tag);
503 void ComputeAction::kernel(simgrid::xbt::ReplayAction&)
505 const ComputeParser& args = get_args();
506 if (smpi_cfg_simulate_computation()) {
507 smpi_execute_flops(args.flops/smpi_adjust_comp_speed());
511 void SleepAction::kernel(simgrid::xbt::ReplayAction&)
513 const SleepParser& args = get_args();
514 XBT_DEBUG("Sleep for: %lf secs", args.time);
515 aid_t pid = simgrid::s4u::this_actor::get_pid();
516 TRACE_smpi_sleeping_in(pid, args.time);
517 simgrid::s4u::this_actor::sleep_for(args.time/smpi_adjust_comp_speed());
518 TRACE_smpi_sleeping_out(pid);
521 void LocationAction::kernel(simgrid::xbt::ReplayAction&)
523 const LocationParser& args = get_args();
524 smpi_trace_set_call_location(args.filename.c_str(), args.line);
// Replays a "test" line: takes the stored request matching (src, dst, tag) out of the storage, runs Request::test
// on it, and puts an entry back so that a later wait still finds the (src, dst, tag) key: a null placeholder when
// the request is now MPI_REQUEST_NULL, the request itself otherwise.
// NOTE(review): the declarations of `status` and `flag` and the `else` keyword are not visible in this excerpt.
527 void TestAction::kernel(simgrid::xbt::ReplayAction&)
529 const WaitTestParser& args = get_args();
530 MPI_Request request = req_storage.find(args.src, args.dst, args.tag);
531 req_storage.remove(request);
532 // if request is null here, this may mean that a previous test has succeeded
533 // Different times in traced application and replayed version may lead to this
534 // In this case, ignore the extra calls.
535 if (request != MPI_REQUEST_NULL) {
536 TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::NoOpTIData("test"));
540 Request::test(&request, &status, &flag);
542 XBT_DEBUG("MPI_Test result: %d", flag);
543 /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now
545 if (request == MPI_REQUEST_NULL)
546 req_storage.addNullRequest(args.src, args.dst, args.tag);
548 req_storage.add(request);
550 TRACE_smpi_comm_out(get_pid());
554 void InitAction::kernel(simgrid::xbt::ReplayAction& action)
556 CHECK_ACTION_PARAMS(action, 0, 1)
557 MPI_DEFAULT_TYPE = (action.size() > 2) ? MPI_DOUBLE // default MPE datatype
558 : MPI_BYTE; // default TAU datatype
560 /* start a simulated timer */
561 smpi_process()->simulated_start();
// Kernel shared by the "comm_size", "comm_split" and "comm_dup" actions (see the registrations in smpi_replay_init).
// NOTE(review): the body of this function is not visible in this excerpt.
564 void CommunicatorAction::kernel(simgrid::xbt::ReplayAction&)
569 void WaitAllAction::kernel(simgrid::xbt::ReplayAction&)
571 const size_t count_requests = req_storage.size();
573 if (count_requests > 0) {
574 TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::CpuTIData("waitall", count_requests));
575 std::vector<std::pair</*sender*/ aid_t, /*recv*/ aid_t>> sender_receiver;
576 std::vector<MPI_Request> reqs;
577 req_storage.get_requests(reqs);
578 for (auto const& req : reqs) {
579 if (req && (req->flags() & MPI_REQ_RECV)) {
580 sender_receiver.emplace_back(req->src(), req->dst());
583 Request::waitall(count_requests, &(reqs.data())[0], MPI_STATUSES_IGNORE);
584 req_storage.get_store().clear();
586 for (auto const& pair : sender_receiver) {
587 TRACE_smpi_recv(pair.first, pair.second, 0);
589 TRACE_smpi_comm_out(get_pid());
593 void BarrierAction::kernel(simgrid::xbt::ReplayAction&)
595 TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::NoOpTIData("barrier"));
596 colls::barrier(MPI_COMM_WORLD);
597 TRACE_smpi_comm_out(get_pid());
600 void BcastAction::kernel(simgrid::xbt::ReplayAction&)
602 const BcastArgParser& args = get_args();
603 TRACE_smpi_comm_in(get_pid(), "action_bcast",
604 new simgrid::instr::CollTIData("bcast", args.root, -1.0, args.size,
605 0, Datatype::encode(args.datatype1), ""));
607 colls::bcast(send_buffer(args.size * args.datatype1->size()), args.size, args.datatype1, args.root, MPI_COMM_WORLD);
609 TRACE_smpi_comm_out(get_pid());
// Replays a reduce line: runs colls::reduce on MPI_COMM_WORLD with buffers of comm_size elements of datatype1
// (MPI_OP_NULL is passed as operation), then, when comp_size is non-zero, sets up an execution of comp_size named
// "computation" to account for the reduction cost.
// NOTE(review): the end of the exec_init() call chain is not visible in this excerpt.
612 void ReduceAction::kernel(simgrid::xbt::ReplayAction&)
614 const ReduceArgParser& args = get_args();
615 TRACE_smpi_comm_in(get_pid(), "action_reduce",
616 new simgrid::instr::CollTIData("reduce", args.root, args.comp_size,
617 args.comm_size, 0, Datatype::encode(args.datatype1), ""));
// Buffer sizes are in bytes: element count times datatype size.
619 colls::reduce(send_buffer(args.comm_size * args.datatype1->size()),
620 recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL,
621 args.root, MPI_COMM_WORLD);
622 if (args.comp_size != 0.0)
623 simgrid::s4u::this_actor::exec_init(args.comp_size)
624 ->set_name("computation")
628 TRACE_smpi_comm_out(get_pid());
// Replays an allreduce line: runs colls::allreduce with buffers of comm_size elements of datatype1 (MPI_OP_NULL is
// passed as operation), then, when comp_size is non-zero, sets up an execution of comp_size named "computation".
// The trace data uses -1 as root.
// NOTE(review): the last argument line of the allreduce call and the end of the exec_init() call chain are not
// visible in this excerpt.
631 void AllReduceAction::kernel(simgrid::xbt::ReplayAction&)
633 const AllReduceArgParser& args = get_args();
634 TRACE_smpi_comm_in(get_pid(), "action_allreduce",
635 new simgrid::instr::CollTIData("allreduce", -1, args.comp_size, args.comm_size, 0,
636 Datatype::encode(args.datatype1), ""));
// Buffer sizes are in bytes: element count times datatype size.
638 colls::allreduce(send_buffer(args.comm_size * args.datatype1->size()),
639 recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL,
641 if (args.comp_size != 0.0)
642 simgrid::s4u::this_actor::exec_init(args.comp_size)
643 ->set_name("computation")
647 TRACE_smpi_comm_out(get_pid());
// Replays an alltoall line: runs colls::alltoall with send_size elements of datatype1 sent and recv_size elements
// of datatype2 received per peer. The trace data uses -1 as root and -1.0 as computation size.
// NOTE(review): the last argument line of the alltoall call is not visible in this excerpt.
650 void AllToAllAction::kernel(simgrid::xbt::ReplayAction&)
652 const AllToAllArgParser& args = get_args();
653 TRACE_smpi_comm_in(get_pid(), "action_alltoall",
654 new simgrid::instr::CollTIData("alltoall", -1, -1.0, args.send_size, args.recv_size,
655 Datatype::encode(args.datatype1),
656 Datatype::encode(args.datatype2)));
// Each buffer holds one chunk per process: datatype size * per-peer count * comm_size bytes.
658 colls::alltoall(send_buffer(args.datatype1->size() * args.send_size * args.comm_size), args.send_size, args.datatype1,
659 recv_buffer(args.datatype2->size() * args.recv_size * args.comm_size), args.recv_size, args.datatype2,
662 TRACE_smpi_comm_out(get_pid());
// Replays a "gather" or "allgather" line (the action name given at construction selects the collective).
// The trace data carries args.root for "gather" and -1 otherwise.
// NOTE(review): the branch separator between the gather and allgather calls and the last argument line of the
// allgather call are not visible in this excerpt.
665 void GatherAction::kernel(simgrid::xbt::ReplayAction&)
667 const GatherArgParser& args = get_args();
668 TRACE_smpi_comm_in(get_pid(), get_name().c_str(),
669 new simgrid::instr::CollTIData(get_name(), (get_name() == "gather") ? args.root : -1, -1.0,
670 args.send_size, args.recv_size, Datatype::encode(args.datatype1),
671 Datatype::encode(args.datatype2)));
673 if (get_name() == "gather") {
674 int rank = MPI_COMM_WORLD->rank();
// Only the root gets a receive buffer, sized for one chunk per process; the other ranks pass nullptr.
675 colls::gather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
676 (rank == args.root) ? recv_buffer(args.datatype2->size() * args.recv_size * args.comm_size) : nullptr,
677 args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
// NOTE(review): unlike the gather case, this receive buffer size is not multiplied by comm_size although every
// rank receives one chunk per process in an allgather; confirm whether the size matters in replay mode.
679 colls::allgather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
680 recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2,
683 TRACE_smpi_comm_out(get_pid());
686 void GatherVAction::kernel(simgrid::xbt::ReplayAction&)
688 int rank = MPI_COMM_WORLD->rank();
689 const GatherVArgParser& args = get_args();
690 TRACE_smpi_comm_in(get_pid(), get_name().c_str(),
691 new simgrid::instr::VarCollTIData(
692 get_name(), (get_name() == "gatherv") ? args.root : -1, args.send_size, nullptr, -1,
693 args.recvcounts, Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
695 if (get_name() == "gatherv") {
696 colls::gatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
697 (rank == args.root) ? recv_buffer(args.recv_size_sum * args.datatype2->size()) : nullptr,
698 args.recvcounts->data(), args.disps.data(), args.datatype2, args.root, MPI_COMM_WORLD);
700 colls::allgatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
701 recv_buffer(args.recv_size_sum * args.datatype2->size()), args.recvcounts->data(),
702 args.disps.data(), args.datatype2, MPI_COMM_WORLD);
705 TRACE_smpi_comm_out(get_pid());
708 void ScatterAction::kernel(simgrid::xbt::ReplayAction&)
710 int rank = MPI_COMM_WORLD->rank();
711 const ScatterArgParser& args = get_args();
712 TRACE_smpi_comm_in(get_pid(), "action_scatter",
713 new simgrid::instr::CollTIData(get_name(), args.root, -1.0, args.send_size, args.recv_size,
714 Datatype::encode(args.datatype1),
715 Datatype::encode(args.datatype2)));
717 colls::scatter(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
718 (rank == args.root) ? recv_buffer(args.recv_size * args.datatype2->size()) : nullptr, args.recv_size,
719 args.datatype2, args.root, MPI_COMM_WORLD);
721 TRACE_smpi_comm_out(get_pid());
// Replays a scatterv line: the root distributes sendcounts[i] elements of datatype1 to rank i, and every rank
// receives recv_size elements of datatype2.
// NOTE(review): the last argument line of the scatterv call is not visible in this excerpt.
724 void ScatterVAction::kernel(simgrid::xbt::ReplayAction&)
726 int rank = MPI_COMM_WORLD->rank();
727 const ScatterVArgParser& args = get_args();
728 TRACE_smpi_comm_in(get_pid(), "action_scatterv",
729 new simgrid::instr::VarCollTIData(get_name(), args.root, -1, args.sendcounts, args.recv_size,
730 nullptr, Datatype::encode(args.datatype1),
731 Datatype::encode(args.datatype2)));
// Only the root gets a send buffer (send_size_sum elements); every rank gets a receive buffer.
733 colls::scatterv((rank == args.root) ? send_buffer(args.send_size_sum * args.datatype1->size()) : nullptr,
734 args.sendcounts->data(), args.disps.data(), args.datatype1,
735 recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, args.root,
738 TRACE_smpi_comm_out(get_pid());
// Replays a reducescatter line: runs colls::reduce_scatter with the recorded recvcounts (MPI_OP_NULL is passed as
// operation), then, when comp_size is non-zero, sets up an execution of comp_size named "computation".
// NOTE(review): the beginning of the tracing call and the end of the exec_init() call chain are not visible in
// this excerpt.
741 void ReduceScatterAction::kernel(simgrid::xbt::ReplayAction&)
743 const ReduceScatterArgParser& args = get_args();
745 get_pid(), "action_reducescatter",
// The computation size is passed as a string in the slot that the other VarCollTIData users fill with the send
// datatype; the datatype goes in the following slot.
746 new simgrid::instr::VarCollTIData(get_name(), -1, -1, nullptr, -1, args.recvcounts,
747 std::to_string(args.comp_size),
748 Datatype::encode(args.datatype1)));
// Both buffers are sized for recv_size_sum elements of datatype1.
750 colls::reduce_scatter(send_buffer(args.recv_size_sum * args.datatype1->size()),
751 recv_buffer(args.recv_size_sum * args.datatype1->size()), args.recvcounts->data(),
752 args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
753 if (args.comp_size != 0.0)
754 simgrid::s4u::this_actor::exec_init(args.comp_size)
755 ->set_name("computation")
758 TRACE_smpi_comm_out(get_pid());
// Replays a "scan" or "exscan" line (the action name given at construction selects the collective; MPI_OP_NULL is
// passed as operation), then, when comp_size is non-zero, sets up an execution of comp_size named "computation".
// NOTE(review): the `else` keyword between the two collective calls and the end of the exec_init() call chain are
// not visible in this excerpt.
761 void ScanAction::kernel(simgrid::xbt::ReplayAction&)
763 const ScanArgParser& args = get_args();
764 TRACE_smpi_comm_in(get_pid(), "action_scan",
765 new simgrid::instr::CollTIData(get_name(), -1, args.comp_size,
766 args.size, 0, Datatype::encode(args.datatype1), ""));
// Both variants use buffers of args.size elements of datatype1.
767 if (get_name() == "scan")
768 colls::scan(send_buffer(args.size * args.datatype1->size()),
769 recv_buffer(args.size * args.datatype1->size()), args.size,
770 args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
772 colls::exscan(send_buffer(args.size * args.datatype1->size()),
773 recv_buffer(args.size * args.datatype1->size()), args.size,
774 args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
776 if (args.comp_size != 0.0)
777 simgrid::s4u::this_actor::exec_init(args.comp_size)
778 ->set_name("computation")
781 TRACE_smpi_comm_out(get_pid());
784 void AllToAllVAction::kernel(simgrid::xbt::ReplayAction&)
786 const AllToAllVArgParser& args = get_args();
787 TRACE_smpi_comm_in(get_pid(), __func__,
788 new simgrid::instr::VarCollTIData(
789 "alltoallv", -1, args.send_size_sum, args.sendcounts, args.recv_size_sum, args.recvcounts,
790 Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
792 colls::alltoallv(send_buffer(args.send_buf_size * args.datatype1->size()), args.sendcounts->data(),
793 args.senddisps.data(), args.datatype1, recv_buffer(args.recv_buf_size * args.datatype2->size()),
794 args.recvcounts->data(), args.recvdisps.data(), args.datatype2, MPI_COMM_WORLD);
796 TRACE_smpi_comm_out(get_pid());
798 } // Replay Namespace
799 }} // namespace simgrid::smpi
801 static std::unordered_map<aid_t, simgrid::smpi::replay::RequestStorage> storage;
802 /** @brief Only initialize the replay, don't do it for real */
// Sets the "instance_id" and "rank" properties on the calling actor, initializes its SMPI process data, marks it
// as initialized and replaying, and registers one handler per supported trace action name. Finally it either
// burns start_delay_flops or yields, and sleeps for _smpi_init_sleep when that value is positive.
// NOTE(review): whether the yield() below sits in the else branch of the start-delay test cannot be told from
// this excerpt (brace-only lines are missing).
803 void smpi_replay_init(const char* instance_id, int rank, double start_delay_flops)
805 xbt_assert(not smpi_process()->initializing());
807 simgrid::s4u::Actor::self()->set_property("instance_id", instance_id);
808 simgrid::s4u::Actor::self()->set_property("rank", std::to_string(rank));
809 simgrid::smpi::ActorExt::init();
811 smpi_process()->mark_as_initialized();
812 smpi_process()->set_replaying(true);
814 TRACE_smpi_init(simgrid::s4u::this_actor::get_pid(), "smpi_replay_run_init");
// Each handler builds a fresh action object and executes it on the parsed trace line. The point-to-point and
// wait/test handlers additionally receive the RequestStorage of the calling actor.
815 xbt_replay_action_register("init", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::InitAction().execute(action); });
816 xbt_replay_action_register("finalize", [](simgrid::xbt::ReplayAction const&) { /* nothing to do */ });
// The three communicator actions share the same handler class.
817 xbt_replay_action_register("comm_size", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
818 xbt_replay_action_register("comm_split",[](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
819 xbt_replay_action_register("comm_dup", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
820 xbt_replay_action_register("send", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("send", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
821 xbt_replay_action_register("isend", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("isend", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
822 xbt_replay_action_register("recv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("recv", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
823 xbt_replay_action_register("irecv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("irecv", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
824 xbt_replay_action_register("test", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::TestAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
825 xbt_replay_action_register("wait", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
826 xbt_replay_action_register("waitall", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAllAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
827 xbt_replay_action_register("barrier", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BarrierAction().execute(action); });
828 xbt_replay_action_register("bcast", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BcastAction().execute(action); });
829 xbt_replay_action_register("reduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceAction().execute(action); });
830 xbt_replay_action_register("allreduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllReduceAction().execute(action); });
831 xbt_replay_action_register("alltoall", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllAction().execute(action); });
832 xbt_replay_action_register("alltoallv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllVAction().execute(action); });
// GatherAction, GatherVAction and ScanAction each serve two action names; the name is passed to the constructor.
833 xbt_replay_action_register("gather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("gather").execute(action); });
834 xbt_replay_action_register("scatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterAction().execute(action); });
835 xbt_replay_action_register("gatherv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("gatherv").execute(action); });
836 xbt_replay_action_register("scatterv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterVAction().execute(action); });
837 xbt_replay_action_register("allgather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("allgather").execute(action); });
838 xbt_replay_action_register("allgatherv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("allgatherv").execute(action); });
839 xbt_replay_action_register("reducescatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceScatterAction().execute(action); });
840 xbt_replay_action_register("scan", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScanAction("scan").execute(action); });
841 xbt_replay_action_register("exscan", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScanAction("exscan").execute(action); });
842 xbt_replay_action_register("compute", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ComputeAction().execute(action); });
843 xbt_replay_action_register("sleep", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SleepAction().execute(action); });
844 xbt_replay_action_register("location", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::LocationAction().execute(action); });
846 //if we have a delayed start, sleep here.
847 if (start_delay_flops > 0) {
848 XBT_VERB("Delayed start for instance - Sleeping for %f flops ", start_delay_flops);
849 private_execute_flops(start_delay_flops);
851 // Wait for the other actors to initialize also
852 simgrid::s4u::this_actor::yield();
854 if(_smpi_init_sleep > 0)
855 simgrid::s4u::this_actor::sleep_for(_smpi_init_sleep);
858 /** @brief actually run the replay after initialization */
// Resets the request storage of the calling actor, replays its trace with replay_runner, waits for the requests
// still stored afterwards, optionally runs a final barrier, and finalizes the SMPI process. The actor that finds
// active_processes at 0 logs the simulated time and frees the temporary replay buffers.
// NOTE(review): several statements (the updates of active_processes, the declaration and increment of `i`) are
// not visible in this excerpt.
859 void smpi_replay_main(int rank, const char* private_trace_filename)
// Shared by all the actors running this function.
861 static int active_processes = 0;
863 storage[simgrid::s4u::this_actor::get_pid()] = simgrid::smpi::replay::RequestStorage();
// The rank, as a string, is what replay_runner receives to identify this process.
864 std::string rank_string = std::to_string(rank);
865 simgrid::xbt::replay_runner(rank_string.c_str(), private_trace_filename);
867 /* and now, finalize everything */
868 /* One active process will stop. Decrease the counter*/
869 unsigned int count_requests = storage[simgrid::s4u::this_actor::get_pid()].size();
870 XBT_DEBUG("There are %u elements in reqq[*]", count_requests);
// Requests left in the storage at the end of the trace are copied to a flat array and waited for all at once.
871 if (count_requests > 0) {
872 std::vector<MPI_Request> requests(count_requests);
875 for (auto const& pair : storage[simgrid::s4u::this_actor::get_pid()].get_store()) {
876 requests[i] = pair.second;
879 simgrid::smpi::Request::waitall(count_requests, requests.data(), MPI_STATUSES_IGNORE);
// Optional synchronization of all ranks before finalization, controlled by the "smpi/finalization-barrier" option.
882 if(simgrid::config::get_value<bool>("smpi/finalization-barrier"))
883 simgrid::smpi::colls::barrier(MPI_COMM_WORLD);
887 if(active_processes==0){
888 /* Last process alive speaking: end the simulated timer */
889 XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
890 smpi_free_replay_tmp_buffers();
893 TRACE_smpi_comm_in(simgrid::s4u::this_actor::get_pid(), "smpi_replay_run_finalize",
894 new simgrid::instr::NoOpTIData("finalize"));
896 smpi_process()->finalize();
898 TRACE_smpi_comm_out(simgrid::s4u::this_actor::get_pid());
901 /** @brief chain a replay initialization and a replay start */
902 void smpi_replay_run(const char* instance_id, int rank, double start_delay_flops, const char* private_trace_filename)
904 smpi_replay_init(instance_id, rank, start_delay_flops);
905 smpi_replay_main(rank, private_trace_filename);