1 /* Copyright (c) 2009-2021. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
6 #include "smpi_replay.hpp"
7 #include "simgrid/s4u/Exec.hpp"
8 #include "smpi_coll.hpp"
9 #include "smpi_comm.hpp"
10 #include "smpi_config.hpp"
11 #include "smpi_datatype.hpp"
12 #include "smpi_group.hpp"
13 #include "smpi_request.hpp"
14 #include "src/smpi/include/private.hpp"
15 #include "xbt/replay.hpp"
22 #include <unordered_map>
25 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay, smpi, "Trace Replay with SMPI");
26 // From https://stackoverflow.com/questions/7110301/generic-hash-for-tuples-in-unordered-map-unordered-set
27 // This is all just to make std::unordered_map work with std::tuple. If we need this in other places,
28 // this could go into a header file.
namespace hash_tuple {
/** Default hasher: any type that is not a tuple is simply handed over to std::hash. */
template <typename TT> struct hash {
  size_t operator()(TT const& value) const { return std::hash<TT>()(value); }
};

/** Mix the hash of @p v into the running @p seed. */
template <class T> inline void hash_combine(std::size_t& seed, T const& v)
{
  // The right-hand side is computed from the previous seed before being xor-ed into it
  const std::size_t mixed = hash_tuple::hash<T>()(v) + 0x9e3779b9 + (seed << 6) + (seed >> 2);
  seed ^= mixed;
}

// Recursive template code derived from Matthieu M.
/** Hash the elements 0..Index of a tuple, in increasing index order. */
template <class Tuple, size_t Index = std::tuple_size<Tuple>::value - 1> struct HashValueImpl {
  static void apply(size_t& seed, Tuple const& tuple)
  {
    HashValueImpl<Tuple, Index - 1>::apply(seed, tuple);
    hash_combine(seed, std::get<Index>(tuple));
  }
};

/** End of the recursion: only the first element is left to hash. */
template <class Tuple> struct HashValueImpl<Tuple, 0> {
  static void apply(size_t& seed, Tuple const& tuple) { hash_combine(seed, std::get<0>(tuple)); }
};

/** Hasher for std::tuple: combines the hashes of all elements, starting from a zero seed. */
template <typename... TT> struct hash<std::tuple<TT...>> {
  size_t operator()(std::tuple<TT...> const& tt) const
  {
    size_t seed = 0;
    HashValueImpl<std::tuple<TT...>>::apply(seed, tt);
    return seed;
  }
};
} // namespace hash_tuple
66 void log_timed_action(const simgrid::xbt::ReplayAction& action, double clock)
68 if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
69 std::string s = boost::algorithm::join(action, " ");
70 XBT_VERB("%s %f", s.c_str(), smpi_process()->simulated_elapsed() - clock);
74 /* Helper functions */
75 static double parse_double(const std::string& string)
77 return xbt_str_parse_double(string.c_str(), "not a double");
80 template <typename T> static T parse_integer(const std::string& string)
82 double val = trunc(xbt_str_parse_double(string.c_str(), "not a double"));
83 xbt_assert(static_cast<double>(std::numeric_limits<T>::min()) <= val &&
84 val <= static_cast<double>(std::numeric_limits<T>::max()),
85 "out of range: %g", val);
86 return static_cast<T>(val);
89 static int parse_root(const simgrid::xbt::ReplayAction& action, unsigned i)
91 return i < action.size() ? std::stoi(action[i]) : 0;
94 static MPI_Datatype parse_datatype(const simgrid::xbt::ReplayAction& action, unsigned i)
96 return i < action.size() ? simgrid::smpi::Datatype::decode(action[i]) : simgrid::smpi::replay::MPI_DEFAULT_TYPE;
103 MPI_Datatype MPI_DEFAULT_TYPE;
105 class RequestStorage {
107 using req_key_t = std::tuple</*sender*/ int, /* receiver */ int, /* tag */ int>;
108 using req_storage_t = std::unordered_map<req_key_t, MPI_Request, hash_tuple::hash<std::tuple<int, int, int>>>;
113 RequestStorage() = default;
114 size_t size() const { return store.size(); }
116 req_storage_t& get_store() { return store; }
118 void get_requests(std::vector<MPI_Request>& vec) const
120 for (auto const& pair : store) {
121 auto& req = pair.second;
122 aid_t my_proc_id = simgrid::s4u::this_actor::get_pid();
123 if (req != MPI_REQUEST_NULL && (req->src() == my_proc_id || req->dst() == my_proc_id)) {
124 vec.push_back(pair.second);
125 pair.second->print_request("MM");
130 MPI_Request find(int src, int dst, int tag)
132 auto it = store.find(req_key_t(src, dst, tag));
133 return (it == store.end()) ? MPI_REQUEST_NULL : it->second;
136 void remove(const Request* req)
138 if (req == MPI_REQUEST_NULL) return;
140 store.erase(req_key_t(req->src()-1, req->dst()-1, req->tag()));
143 void add(MPI_Request req)
145 if (req != MPI_REQUEST_NULL) // Can and does happen in the case of TestAction
146 store.insert({req_key_t(req->src()-1, req->dst()-1, req->tag()), req});
149 /* Sometimes we need to re-insert MPI_REQUEST_NULL but we still need src,dst and tag */
150 void addNullRequest(int src, int dst, int tag)
152 store.insert({req_key_t(MPI_COMM_WORLD->group()->actor(src) - 1, MPI_COMM_WORLD->group()->actor(dst) - 1, tag),
157 void WaitTestParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
159 CHECK_ACTION_PARAMS(action, 3, 0)
160 src = std::stoi(action[2]);
161 dst = std::stoi(action[3]);
162 tag = std::stoi(action[4]);
165 void SendRecvParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
167 CHECK_ACTION_PARAMS(action, 3, 1)
168 partner = std::stoi(action[2]);
169 tag = std::stoi(action[3]);
170 size = parse_integer<size_t>(action[4]);
171 datatype1 = parse_datatype(action, 5);
174 void ComputeParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
176 CHECK_ACTION_PARAMS(action, 1, 0)
177 flops = parse_double(action[2]);
180 void SleepParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
182 CHECK_ACTION_PARAMS(action, 1, 0)
183 time = parse_double(action[2]);
186 void LocationParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
188 CHECK_ACTION_PARAMS(action, 2, 0)
189 filename = std::string(action[2]);
190 line = std::stoi(action[3]);
193 void BcastArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
195 CHECK_ACTION_PARAMS(action, 1, 2)
196 size = parse_integer<size_t>(action[2]);
197 root = parse_root(action, 3);
198 datatype1 = parse_datatype(action, 4);
201 void ReduceArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
203 CHECK_ACTION_PARAMS(action, 2, 2)
204 comm_size = parse_integer<unsigned>(action[2]);
205 comp_size = parse_double(action[3]);
206 root = parse_root(action, 4);
207 datatype1 = parse_datatype(action, 5);
210 void AllReduceArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
212 CHECK_ACTION_PARAMS(action, 2, 1)
213 comm_size = parse_integer<unsigned>(action[2]);
214 comp_size = parse_double(action[3]);
215 datatype1 = parse_datatype(action, 4);
218 void AllToAllArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
220 CHECK_ACTION_PARAMS(action, 2, 1)
221 comm_size = MPI_COMM_WORLD->size();
222 send_size = parse_integer<int>(action[2]);
223 recv_size = parse_integer<int>(action[3]);
224 datatype1 = parse_datatype(action, 4);
225 datatype2 = parse_datatype(action, 5);
228 void GatherArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string& name)
230 /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
233 1) 68 is the sendcounts
234 2) 68 is the recvcounts
235 3) 0 is the root node
236 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
237 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
239 CHECK_ACTION_PARAMS(action, 2, 3)
240 comm_size = MPI_COMM_WORLD->size();
241 send_size = parse_integer<int>(action[2]);
242 recv_size = parse_integer<int>(action[3]);
244 if (name == "gather") {
245 root = parse_root(action, 4);
246 datatype1 = parse_datatype(action, 5);
247 datatype2 = parse_datatype(action, 6);
250 datatype1 = parse_datatype(action, 4);
251 datatype2 = parse_datatype(action, 5);
255 void GatherVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string& name)
257 /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
258 0 gather 68 68 10 10 10 0 0 0
260 1) 68 is the sendcount
261 2) 68 10 10 10 is the recvcounts
262 3) 0 is the root node
263 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
264 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
266 comm_size = MPI_COMM_WORLD->size();
267 CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
268 send_size = parse_integer<int>(action[2]);
269 disps = std::vector<int>(comm_size, 0);
270 recvcounts = std::make_shared<std::vector<int>>(comm_size);
272 if (name == "gatherv") {
273 root = parse_root(action, 3 + comm_size);
274 datatype1 = parse_datatype(action, 4 + comm_size);
275 datatype2 = parse_datatype(action, 5 + comm_size);
278 unsigned disp_index = 0;
279 /* The 3 comes from "0 gather <sendcount>", which must always be present.
280 * The + comm_size is the recvcounts array, which must also be present
282 if (action.size() > 3 + comm_size + comm_size) {
283 // datatype + disp are specified
284 datatype1 = parse_datatype(action, 3 + comm_size);
285 datatype2 = parse_datatype(action, 4 + comm_size);
286 disp_index = 5 + comm_size;
287 } else if (action.size() > 3 + comm_size + 2) {
288 // disps specified; datatype is not specified; use the default one
289 datatype1 = MPI_DEFAULT_TYPE;
290 datatype2 = MPI_DEFAULT_TYPE;
291 disp_index = 3 + comm_size;
293 // no disp specified, maybe only datatype,
294 datatype1 = parse_datatype(action, 3 + comm_size);
295 datatype2 = parse_datatype(action, 4 + comm_size);
298 if (disp_index != 0) {
299 xbt_assert(disp_index + comm_size <= action.size());
300 for (unsigned i = 0; i < comm_size; i++)
301 disps[i] = std::stoi(action[disp_index + i]);
305 for (unsigned int i = 0; i < comm_size; i++) {
306 (*recvcounts)[i] = std::stoi(action[i + 3]);
308 recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
311 void ScatterArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
313 /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
316 1) 68 is the sendcounts
317 2) 68 is the recvcounts
318 3) 0 is the root node
319 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
320 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
322 comm_size = MPI_COMM_WORLD->size();
323 CHECK_ACTION_PARAMS(action, 2, 3)
324 comm_size = MPI_COMM_WORLD->size();
325 send_size = parse_integer<int>(action[2]);
326 recv_size = parse_integer<int>(action[3]);
327 root = parse_root(action, 4);
328 datatype1 = parse_datatype(action, 5);
329 datatype2 = parse_datatype(action, 6);
332 void ScatterVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
334 /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
335 0 gather 68 10 10 10 68 0 0 0
337 1) 68 10 10 10 is the sendcounts
338 2) 68 is the recvcount
339 3) 0 is the root node
340 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
341 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
343 comm_size = MPI_COMM_WORLD->size();
344 CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
345 recv_size = parse_integer<int>(action[2 + comm_size]);
346 disps = std::vector<int>(comm_size, 0);
347 sendcounts = std::make_shared<std::vector<int>>(comm_size);
349 root = parse_root(action, 3 + comm_size);
350 datatype1 = parse_datatype(action, 4 + comm_size);
351 datatype2 = parse_datatype(action, 5 + comm_size);
353 for (unsigned int i = 0; i < comm_size; i++) {
354 (*sendcounts)[i] = std::stoi(action[i + 2]);
356 send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
359 void ReduceScatterArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
361 /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
362 0 reducescatter 275427 275427 275427 204020 11346849 0
364 1) The first four values after the name of the action declare the recvcounts array
365 2) The value 11346849 is the amount of instructions
366 3) The last value corresponds to the datatype, see simgrid::smpi::Datatype::decode().
368 comm_size = MPI_COMM_WORLD->size();
369 CHECK_ACTION_PARAMS(action, comm_size + 1, 1)
370 comp_size = parse_double(action[2 + comm_size]);
371 recvcounts = std::make_shared<std::vector<int>>(comm_size);
372 datatype1 = parse_datatype(action, 3 + comm_size);
374 for (unsigned int i = 0; i < comm_size; i++) {
375 (*recvcounts)[i]= std::stoi(action[i + 2]);
377 recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
380 void ScanArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
382 CHECK_ACTION_PARAMS(action, 2, 1)
383 size = parse_integer<size_t>(action[2]);
384 comp_size = parse_double(action[3]);
385 datatype1 = parse_datatype(action, 4);
388 void AllToAllVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
390 /* The structure of the alltoallv action for the rank 0 (total 4 processes) is the following:
391 0 alltoallv 100 1 7 10 12 100 1 70 10 5
393 1) 100 is the size of the send buffer *sizeof(int),
394 2) 1 7 10 12 is the sendcounts array
395 3) 100*sizeof(int) is the size of the receiver buffer
396 4) 1 70 10 5 is the recvcounts array
398 comm_size = MPI_COMM_WORLD->size();
399 CHECK_ACTION_PARAMS(action, 2 * comm_size + 2, 2)
400 sendcounts = std::make_shared<std::vector<int>>(comm_size);
401 recvcounts = std::make_shared<std::vector<int>>(comm_size);
402 senddisps = std::vector<int>(comm_size, 0);
403 recvdisps = std::vector<int>(comm_size, 0);
405 datatype1 = parse_datatype(action, 4 + 2 * comm_size);
406 datatype2 = parse_datatype(action, 5 + 2 * comm_size);
408 send_buf_size = parse_integer<int>(action[2]);
409 recv_buf_size = parse_integer<int>(action[3 + comm_size]);
410 for (unsigned int i = 0; i < comm_size; i++) {
411 (*sendcounts)[i] = std::stoi(action[3 + i]);
412 (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
414 send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
415 recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
418 void WaitAction::kernel(simgrid::xbt::ReplayAction& action)
420 std::string s = boost::algorithm::join(action, " ");
421 xbt_assert(req_storage.size(), "action wait not preceded by any irecv or isend: %s", s.c_str());
422 const WaitTestParser& args = get_args();
423 MPI_Request request = req_storage.find(args.src, args.dst, args.tag);
424 req_storage.remove(request);
426 if (request == MPI_REQUEST_NULL) {
427 /* Assume that the trace is well formed, meaning the comm might have been caught by an MPI_test. Then just
432 // Must be taken before Request::wait() since the request may be set to
433 // MPI_REQUEST_NULL by Request::wait!
434 bool is_wait_for_receive = (request->flags() & MPI_REQ_RECV);
436 TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::WaitTIData(args.src, args.dst, args.tag));
439 Request::wait(&request, &status);
441 TRACE_smpi_comm_out(get_pid());
442 if (is_wait_for_receive)
443 TRACE_smpi_recv(MPI_COMM_WORLD->group()->actor(args.src), MPI_COMM_WORLD->group()->actor(args.dst), args.tag);
446 void SendAction::kernel(simgrid::xbt::ReplayAction&)
448 const SendRecvParser& args = get_args();
449 aid_t dst_traced = MPI_COMM_WORLD->group()->actor(args.partner);
453 new simgrid::instr::Pt2PtTIData(get_name(), args.partner, args.size, args.tag, Datatype::encode(args.datatype1)));
454 if (not TRACE_smpi_view_internals())
455 TRACE_smpi_send(get_pid(), get_pid(), dst_traced, args.tag, args.size * args.datatype1->size());
457 if (get_name() == "send") {
458 Request::send(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
459 } else if (get_name() == "isend") {
460 MPI_Request request = Request::isend(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
461 req_storage.add(request);
463 xbt_die("Don't know this action, %s", get_name().c_str());
466 TRACE_smpi_comm_out(get_pid());
469 void RecvAction::kernel(simgrid::xbt::ReplayAction&)
471 const SendRecvParser& args = get_args();
474 new simgrid::instr::Pt2PtTIData(get_name(), args.partner, args.size, args.tag, Datatype::encode(args.datatype1)));
477 // unknown size from the receiver point of view
478 size_t arg_size = args.size;
480 Request::probe(args.partner, args.tag, MPI_COMM_WORLD, &status);
481 arg_size = status.count;
484 bool is_recv = false; // Help analyzers understanding that status is not used uninitialized
485 if (get_name() == "recv") {
487 Request::recv(nullptr, arg_size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD, &status);
488 } else if (get_name() == "irecv") {
489 MPI_Request request = Request::irecv(nullptr, arg_size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
490 req_storage.add(request);
495 TRACE_smpi_comm_out(get_pid());
496 if (is_recv && not TRACE_smpi_view_internals()) {
497 aid_t src_traced = MPI_COMM_WORLD->group()->actor(status.MPI_SOURCE);
498 TRACE_smpi_recv(src_traced, get_pid(), args.tag);
502 void ComputeAction::kernel(simgrid::xbt::ReplayAction&)
504 const ComputeParser& args = get_args();
505 if (smpi_cfg_simulate_computation()) {
506 smpi_execute_flops(args.flops/smpi_adjust_comp_speed());
510 void SleepAction::kernel(simgrid::xbt::ReplayAction&)
512 const SleepParser& args = get_args();
513 XBT_DEBUG("Sleep for: %lf secs", args.time);
514 aid_t pid = simgrid::s4u::this_actor::get_pid();
515 TRACE_smpi_sleeping_in(pid, args.time);
516 simgrid::s4u::this_actor::sleep_for(args.time/smpi_adjust_comp_speed());
517 TRACE_smpi_sleeping_out(pid);
520 void LocationAction::kernel(simgrid::xbt::ReplayAction&)
522 const LocationParser& args = get_args();
523 smpi_trace_set_call_location(args.filename.c_str(), args.line);
526 void TestAction::kernel(simgrid::xbt::ReplayAction&)
528 const WaitTestParser& args = get_args();
529 MPI_Request request = req_storage.find(args.src, args.dst, args.tag);
530 req_storage.remove(request);
531 // if request is null here, this may mean that a previous test has succeeded
532 // Different times in traced application and replayed version may lead to this
533 // In this case, ignore the extra calls.
534 if (request != MPI_REQUEST_NULL) {
535 TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::NoOpTIData("test"));
539 Request::test(&request, &status, &flag);
541 XBT_DEBUG("MPI_Test result: %d", flag);
542 /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now
544 if (request == MPI_REQUEST_NULL)
545 req_storage.addNullRequest(args.src, args.dst, args.tag);
547 req_storage.add(request);
549 TRACE_smpi_comm_out(get_pid());
553 void InitAction::kernel(simgrid::xbt::ReplayAction& action)
555 CHECK_ACTION_PARAMS(action, 0, 1)
556 MPI_DEFAULT_TYPE = (action.size() > 2) ? MPI_DOUBLE // default MPE datatype
557 : MPI_BYTE; // default TAU datatype
559 /* start a simulated timer */
560 smpi_process()->simulated_start();
563 void CommunicatorAction::kernel(simgrid::xbt::ReplayAction&)
568 void WaitAllAction::kernel(simgrid::xbt::ReplayAction&)
570 const size_t count_requests = req_storage.size();
572 if (count_requests > 0) {
573 TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::CpuTIData("waitall", count_requests));
574 std::vector<std::pair</*sender*/ aid_t, /*recv*/ aid_t>> sender_receiver;
575 std::vector<MPI_Request> reqs;
576 req_storage.get_requests(reqs);
577 for (auto const& req : reqs) {
578 if (req && (req->flags() & MPI_REQ_RECV)) {
579 sender_receiver.emplace_back(req->src(), req->dst());
582 Request::waitall(count_requests, &(reqs.data())[0], MPI_STATUSES_IGNORE);
583 req_storage.get_store().clear();
585 for (auto const& pair : sender_receiver) {
586 TRACE_smpi_recv(pair.first, pair.second, 0);
588 TRACE_smpi_comm_out(get_pid());
592 void BarrierAction::kernel(simgrid::xbt::ReplayAction&)
594 TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::NoOpTIData("barrier"));
595 colls::barrier(MPI_COMM_WORLD);
596 TRACE_smpi_comm_out(get_pid());
599 void BcastAction::kernel(simgrid::xbt::ReplayAction&)
601 const BcastArgParser& args = get_args();
602 TRACE_smpi_comm_in(get_pid(), "action_bcast",
603 new simgrid::instr::CollTIData("bcast", args.root, -1.0, args.size,
604 0, Datatype::encode(args.datatype1), ""));
606 colls::bcast(send_buffer(args.size * args.datatype1->size()), args.size, args.datatype1, args.root, MPI_COMM_WORLD);
608 TRACE_smpi_comm_out(get_pid());
611 void ReduceAction::kernel(simgrid::xbt::ReplayAction&)
613 const ReduceArgParser& args = get_args();
614 TRACE_smpi_comm_in(get_pid(), "action_reduce",
615 new simgrid::instr::CollTIData("reduce", args.root, args.comp_size,
616 args.comm_size, 0, Datatype::encode(args.datatype1), ""));
618 colls::reduce(send_buffer(args.comm_size * args.datatype1->size()),
619 recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL,
620 args.root, MPI_COMM_WORLD);
621 if (args.comp_size != 0.0)
622 simgrid::s4u::this_actor::exec_init(args.comp_size)
623 ->set_name("computation")
627 TRACE_smpi_comm_out(get_pid());
630 void AllReduceAction::kernel(simgrid::xbt::ReplayAction&)
632 const AllReduceArgParser& args = get_args();
633 TRACE_smpi_comm_in(get_pid(), "action_allreduce",
634 new simgrid::instr::CollTIData("allreduce", -1, args.comp_size, args.comm_size, 0,
635 Datatype::encode(args.datatype1), ""));
637 colls::allreduce(send_buffer(args.comm_size * args.datatype1->size()),
638 recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL,
640 if (args.comp_size != 0.0)
641 simgrid::s4u::this_actor::exec_init(args.comp_size)
642 ->set_name("computation")
646 TRACE_smpi_comm_out(get_pid());
649 void AllToAllAction::kernel(simgrid::xbt::ReplayAction&)
651 const AllToAllArgParser& args = get_args();
652 TRACE_smpi_comm_in(get_pid(), "action_alltoall",
653 new simgrid::instr::CollTIData("alltoall", -1, -1.0, args.send_size, args.recv_size,
654 Datatype::encode(args.datatype1),
655 Datatype::encode(args.datatype2)));
657 colls::alltoall(send_buffer(args.datatype1->size() * args.send_size * args.comm_size), args.send_size, args.datatype1,
658 recv_buffer(args.datatype2->size() * args.recv_size * args.comm_size), args.recv_size, args.datatype2,
661 TRACE_smpi_comm_out(get_pid());
664 void GatherAction::kernel(simgrid::xbt::ReplayAction&)
666 const GatherArgParser& args = get_args();
667 TRACE_smpi_comm_in(get_pid(), get_name().c_str(),
668 new simgrid::instr::CollTIData(get_name(), (get_name() == "gather") ? args.root : -1, -1.0,
669 args.send_size, args.recv_size, Datatype::encode(args.datatype1),
670 Datatype::encode(args.datatype2)));
672 if (get_name() == "gather") {
673 int rank = MPI_COMM_WORLD->rank();
674 colls::gather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
675 (rank == args.root) ? recv_buffer(args.datatype2->size() * args.recv_size * args.comm_size) : nullptr,
676 args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
678 colls::allgather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
679 recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2,
682 TRACE_smpi_comm_out(get_pid());
685 void GatherVAction::kernel(simgrid::xbt::ReplayAction&)
687 int rank = MPI_COMM_WORLD->rank();
688 const GatherVArgParser& args = get_args();
689 TRACE_smpi_comm_in(get_pid(), get_name().c_str(),
690 new simgrid::instr::VarCollTIData(
691 get_name(), (get_name() == "gatherv") ? args.root : -1, args.send_size, nullptr, -1,
692 args.recvcounts, Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
694 if (get_name() == "gatherv") {
695 colls::gatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
696 (rank == args.root) ? recv_buffer(args.recv_size_sum * args.datatype2->size()) : nullptr,
697 args.recvcounts->data(), args.disps.data(), args.datatype2, args.root, MPI_COMM_WORLD);
699 colls::allgatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
700 recv_buffer(args.recv_size_sum * args.datatype2->size()), args.recvcounts->data(),
701 args.disps.data(), args.datatype2, MPI_COMM_WORLD);
704 TRACE_smpi_comm_out(get_pid());
707 void ScatterAction::kernel(simgrid::xbt::ReplayAction&)
709 int rank = MPI_COMM_WORLD->rank();
710 const ScatterArgParser& args = get_args();
711 TRACE_smpi_comm_in(get_pid(), "action_scatter",
712 new simgrid::instr::CollTIData(get_name(), args.root, -1.0, args.send_size, args.recv_size,
713 Datatype::encode(args.datatype1),
714 Datatype::encode(args.datatype2)));
716 colls::scatter(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
717 (rank == args.root) ? recv_buffer(args.recv_size * args.datatype2->size()) : nullptr, args.recv_size,
718 args.datatype2, args.root, MPI_COMM_WORLD);
720 TRACE_smpi_comm_out(get_pid());
723 void ScatterVAction::kernel(simgrid::xbt::ReplayAction&)
725 int rank = MPI_COMM_WORLD->rank();
726 const ScatterVArgParser& args = get_args();
727 TRACE_smpi_comm_in(get_pid(), "action_scatterv",
728 new simgrid::instr::VarCollTIData(get_name(), args.root, -1, args.sendcounts, args.recv_size,
729 nullptr, Datatype::encode(args.datatype1),
730 Datatype::encode(args.datatype2)));
732 colls::scatterv((rank == args.root) ? send_buffer(args.send_size_sum * args.datatype1->size()) : nullptr,
733 args.sendcounts->data(), args.disps.data(), args.datatype1,
734 recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, args.root,
737 TRACE_smpi_comm_out(get_pid());
740 void ReduceScatterAction::kernel(simgrid::xbt::ReplayAction&)
742 const ReduceScatterArgParser& args = get_args();
744 get_pid(), "action_reducescatter",
745 new simgrid::instr::VarCollTIData(get_name(), -1, -1, nullptr, -1, args.recvcounts,
746 /* ugly as we use datatype field to pass computation as string */
747 /* and because of the trick to avoid getting 0.000000 when 0 is given */
748 args.comp_size == 0 ? "0" : std::to_string(args.comp_size),
749 Datatype::encode(args.datatype1)));
751 colls::reduce_scatter(send_buffer(args.recv_size_sum * args.datatype1->size()),
752 recv_buffer(args.recv_size_sum * args.datatype1->size()), args.recvcounts->data(),
753 args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
754 if (args.comp_size != 0.0)
755 simgrid::s4u::this_actor::exec_init(args.comp_size)
756 ->set_name("computation")
759 TRACE_smpi_comm_out(get_pid());
762 void ScanAction::kernel(simgrid::xbt::ReplayAction&)
764 const ScanArgParser& args = get_args();
765 TRACE_smpi_comm_in(get_pid(), "action_scan",
766 new simgrid::instr::CollTIData(get_name(), -1, args.comp_size,
767 args.size, 0, Datatype::encode(args.datatype1), ""));
768 if (get_name() == "scan")
769 colls::scan(send_buffer(args.size * args.datatype1->size()),
770 recv_buffer(args.size * args.datatype1->size()), args.size,
771 args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
773 colls::exscan(send_buffer(args.size * args.datatype1->size()),
774 recv_buffer(args.size * args.datatype1->size()), args.size,
775 args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
777 if (args.comp_size != 0.0)
778 simgrid::s4u::this_actor::exec_init(args.comp_size)
779 ->set_name("computation")
782 TRACE_smpi_comm_out(get_pid());
785 void AllToAllVAction::kernel(simgrid::xbt::ReplayAction&)
787 const AllToAllVArgParser& args = get_args();
788 TRACE_smpi_comm_in(get_pid(), __func__,
789 new simgrid::instr::VarCollTIData(
790 "alltoallv", -1, args.send_size_sum, args.sendcounts, args.recv_size_sum, args.recvcounts,
791 Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
793 colls::alltoallv(send_buffer(args.send_buf_size * args.datatype1->size()), args.sendcounts->data(),
794 args.senddisps.data(), args.datatype1, recv_buffer(args.recv_buf_size * args.datatype2->size()),
795 args.recvcounts->data(), args.recvdisps.data(), args.datatype2, MPI_COMM_WORLD);
797 TRACE_smpi_comm_out(get_pid());
799 } // Replay Namespace
800 }} // namespace simgrid::smpi
802 static std::unordered_map<aid_t, simgrid::smpi::replay::RequestStorage> storage;
803 /** @brief Only initialize the replay, don't do it for real */
804 void smpi_replay_init(const char* instance_id, int rank, double start_delay_flops)
806 xbt_assert(not smpi_process()->initializing());
808 simgrid::s4u::Actor::self()->set_property("instance_id", instance_id);
809 simgrid::s4u::Actor::self()->set_property("rank", std::to_string(rank));
810 simgrid::smpi::ActorExt::init();
812 smpi_process()->mark_as_initialized();
813 smpi_process()->set_replaying(true);
815 TRACE_smpi_init(simgrid::s4u::this_actor::get_pid(), "smpi_replay_run_init");
816 xbt_replay_action_register("init", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::InitAction().execute(action); });
817 xbt_replay_action_register("finalize", [](simgrid::xbt::ReplayAction const&) { /* nothing to do */ });
818 xbt_replay_action_register("comm_size", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
819 xbt_replay_action_register("comm_split",[](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
820 xbt_replay_action_register("comm_dup", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
821 xbt_replay_action_register("send", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("send", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
822 xbt_replay_action_register("isend", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("isend", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
823 xbt_replay_action_register("recv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("recv", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
824 xbt_replay_action_register("irecv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("irecv", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
825 xbt_replay_action_register("test", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::TestAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
826 xbt_replay_action_register("wait", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
827 xbt_replay_action_register("waitall", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAllAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
828 xbt_replay_action_register("barrier", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BarrierAction().execute(action); });
829 xbt_replay_action_register("bcast", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BcastAction().execute(action); });
830 xbt_replay_action_register("reduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceAction().execute(action); });
831 xbt_replay_action_register("allreduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllReduceAction().execute(action); });
832 xbt_replay_action_register("alltoall", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllAction().execute(action); });
833 xbt_replay_action_register("alltoallv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllVAction().execute(action); });
834 xbt_replay_action_register("gather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("gather").execute(action); });
835 xbt_replay_action_register("scatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterAction().execute(action); });
836 xbt_replay_action_register("gatherv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("gatherv").execute(action); });
837 xbt_replay_action_register("scatterv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterVAction().execute(action); });
838 xbt_replay_action_register("allgather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("allgather").execute(action); });
839 xbt_replay_action_register("allgatherv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("allgatherv").execute(action); });
840 xbt_replay_action_register("reducescatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceScatterAction().execute(action); });
841 xbt_replay_action_register("scan", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScanAction("scan").execute(action); });
842 xbt_replay_action_register("exscan", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScanAction("exscan").execute(action); });
843 xbt_replay_action_register("compute", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ComputeAction().execute(action); });
844 xbt_replay_action_register("sleep", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SleepAction().execute(action); });
845 xbt_replay_action_register("location", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::LocationAction().execute(action); });
847 //if we have a delayed start, sleep here.
848 if (start_delay_flops > 0) {
849 XBT_VERB("Delayed start for instance - Sleeping for %f flops ", start_delay_flops);
850 private_execute_flops(start_delay_flops);
852 // Wait for the other actors to initialize also
853 simgrid::s4u::this_actor::yield();
855 if(_smpi_init_sleep > 0)
856 simgrid::s4u::this_actor::sleep_for(_smpi_init_sleep);
859 /** @brief actually run the replay after initialization */
860 void smpi_replay_main(int rank, const char* private_trace_filename)
862 static int active_processes = 0;
864 storage[simgrid::s4u::this_actor::get_pid()] = simgrid::smpi::replay::RequestStorage();
865 std::string rank_string = std::to_string(rank);
866 simgrid::xbt::replay_runner(rank_string.c_str(), private_trace_filename);
868 /* and now, finalize everything */
869 /* One active process will stop. Decrease the counter*/
870 unsigned int count_requests = storage[simgrid::s4u::this_actor::get_pid()].size();
871 XBT_DEBUG("There are %u elements in reqq[*]", count_requests);
872 if (count_requests > 0) {
873 std::vector<MPI_Request> requests(count_requests);
876 for (auto const& pair : storage[simgrid::s4u::this_actor::get_pid()].get_store()) {
877 requests[i] = pair.second;
880 simgrid::smpi::Request::waitall(count_requests, requests.data(), MPI_STATUSES_IGNORE);
883 if(simgrid::config::get_value<bool>("smpi/finalization-barrier"))
884 simgrid::smpi::colls::barrier(MPI_COMM_WORLD);
888 if(active_processes==0){
889 /* Last process alive speaking: end the simulated timer */
890 XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
891 smpi_free_replay_tmp_buffers();
894 TRACE_smpi_comm_in(simgrid::s4u::this_actor::get_pid(), "smpi_replay_run_finalize",
895 new simgrid::instr::NoOpTIData("finalize"));
897 smpi_process()->finalize();
899 TRACE_smpi_comm_out(simgrid::s4u::this_actor::get_pid());
902 /** @brief chain a replay initialization and a replay start */
903 void smpi_replay_run(const char* instance_id, int rank, double start_delay_flops, const char* private_trace_filename)
905 smpi_replay_init(instance_id, rank, start_delay_flops);
906 smpi_replay_main(rank, private_trace_filename);