1 /* Copyright (c) 2009-2021. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
6 #include "smpi_coll.hpp"
7 #include "smpi_comm.hpp"
8 #include "smpi_datatype.hpp"
9 #include "smpi_group.hpp"
10 #include "smpi_request.hpp"
11 #include "xbt/replay.hpp"
12 #include <simgrid/smpi/smpi_replay.hpp>
13 #include <src/smpi/include/private.hpp>
20 #include <unordered_map>
23 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay, smpi, "Trace Replay with SMPI");
// From https://stackoverflow.com/questions/7110301/generic-hash-for-tuples-in-unordered-map-unordered-set
// This is all just to make std::unordered_map work with std::tuple. If we need this in other places,
// this could go into a header file.
namespace hash_tuple {
/** @brief Generic hasher: forwards to std::hash for every type without a dedicated specialization. */
template <typename TT> class hash {
public:
  size_t operator()(TT const& tt) const { return std::hash<TT>()(tt); }
};

/** @brief Mix the hash of @p v into the running @p seed. */
template <class T> inline void hash_combine(std::size_t& seed, T const& v)
{
  seed ^= hash_tuple::hash<T>()(v) + 0x9e3779b9 + (seed << 6) + (seed >> 2);
}

// Recursive template code derived from Matthieu M.
/** @brief Combines the first @c Count elements of a tuple into a seed, element 0 first.
 *
 * The recursion counts remaining elements rather than indexing from tuple_size - 1,
 * so that an empty tuple is a valid (no-op) case instead of an unsigned underflow.
 */
template <class Tuple, size_t Count = std::tuple_size<Tuple>::value> class HashValueImpl {
public:
  static void apply(size_t& seed, Tuple const& tuple)
  {
    HashValueImpl<Tuple, Count - 1>::apply(seed, tuple);
    hash_combine(seed, std::get<Count - 1>(tuple));
  }
};

/** @brief End of the recursion: no element left to combine. */
template <class Tuple> class HashValueImpl<Tuple, 0> {
public:
  static void apply(size_t&, Tuple const&) { /* nothing to combine */ }
};

/** @brief Hasher for std::tuple: combines the hash of every element, starting from a seed of 0. */
template <typename... TT> class hash<std::tuple<TT...>> {
public:
  size_t operator()(std::tuple<TT...> const& tt) const
  {
    size_t seed = 0;
    HashValueImpl<std::tuple<TT...>>::apply(seed, tt);
    return seed;
  }
};
} // namespace hash_tuple
65 void log_timed_action(const simgrid::xbt::ReplayAction& action, double clock)
67 if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
68 std::string s = boost::algorithm::join(action, " ");
69 XBT_VERB("%s %f", s.c_str(), smpi_process()->simulated_elapsed() - clock);
73 /* Helper functions */
74 static double parse_double(const std::string& string)
76 return xbt_str_parse_double(string.c_str(), "not a double");
79 template <typename T> static T parse_integer(const std::string& string)
81 double val = trunc(xbt_str_parse_double(string.c_str(), "not a double"));
82 xbt_assert(static_cast<double>(std::numeric_limits<T>::min()) <= val &&
83 val <= static_cast<double>(std::numeric_limits<T>::max()),
84 "out of range: %g", val);
85 return static_cast<T>(val);
88 static int parse_root(const simgrid::xbt::ReplayAction& action, unsigned i)
90 return i < action.size() ? std::stoi(action[i]) : 0;
93 static MPI_Datatype parse_datatype(const simgrid::xbt::ReplayAction& action, unsigned i)
95 return i < action.size() ? simgrid::smpi::Datatype::decode(action[i]) : simgrid::smpi::replay::MPI_DEFAULT_TYPE;
102 MPI_Datatype MPI_DEFAULT_TYPE;
104 class RequestStorage {
106 using req_key_t = std::tuple</*sender*/ int, /* receiver */ int, /* tag */ int>;
107 using req_storage_t = std::unordered_map<req_key_t, MPI_Request, hash_tuple::hash<std::tuple<int, int, int>>>;
112 RequestStorage() = default;
113 size_t size() const { return store.size(); }
115 req_storage_t& get_store() { return store; }
117 void get_requests(std::vector<MPI_Request>& vec) const
119 for (auto const& pair : store) {
120 auto& req = pair.second;
121 aid_t my_proc_id = simgrid::s4u::this_actor::get_pid();
122 if (req != MPI_REQUEST_NULL && (req->src() == my_proc_id || req->dst() == my_proc_id)) {
123 vec.push_back(pair.second);
124 pair.second->print_request("MM");
129 MPI_Request find(int src, int dst, int tag)
131 auto it = store.find(req_key_t(src, dst, tag));
132 return (it == store.end()) ? MPI_REQUEST_NULL : it->second;
135 void remove(const Request* req)
137 if (req == MPI_REQUEST_NULL) return;
139 store.erase(req_key_t(req->src()-1, req->dst()-1, req->tag()));
142 void add(MPI_Request req)
144 if (req != MPI_REQUEST_NULL) // Can and does happen in the case of TestAction
145 store.insert({req_key_t(req->src()-1, req->dst()-1, req->tag()), req});
148 /* Sometimes we need to re-insert MPI_REQUEST_NULL but we still need src,dst and tag */
149 void addNullRequest(int src, int dst, int tag)
151 store.insert({req_key_t(MPI_COMM_WORLD->group()->actor(src) - 1, MPI_COMM_WORLD->group()->actor(dst) - 1, tag),
156 void WaitTestParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
158 CHECK_ACTION_PARAMS(action, 3, 0)
159 src = std::stoi(action[2]);
160 dst = std::stoi(action[3]);
161 tag = std::stoi(action[4]);
164 void SendRecvParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
166 CHECK_ACTION_PARAMS(action, 3, 1)
167 partner = std::stoi(action[2]);
168 tag = std::stoi(action[3]);
169 size = parse_integer<size_t>(action[4]);
170 datatype1 = parse_datatype(action, 5);
173 void ComputeParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
175 CHECK_ACTION_PARAMS(action, 1, 0)
176 flops = parse_double(action[2]);
179 void SleepParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
181 CHECK_ACTION_PARAMS(action, 1, 0)
182 time = parse_double(action[2]);
185 void LocationParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
187 CHECK_ACTION_PARAMS(action, 2, 0)
188 filename = std::string(action[2]);
189 line = std::stoi(action[3]);
192 void BcastArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
194 CHECK_ACTION_PARAMS(action, 1, 2)
195 size = parse_integer<size_t>(action[2]);
196 root = parse_root(action, 3);
197 datatype1 = parse_datatype(action, 4);
200 void ReduceArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
202 CHECK_ACTION_PARAMS(action, 2, 2)
203 comm_size = parse_integer<unsigned>(action[2]);
204 comp_size = parse_double(action[3]);
205 root = parse_root(action, 4);
206 datatype1 = parse_datatype(action, 5);
209 void AllReduceArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
211 CHECK_ACTION_PARAMS(action, 2, 1)
212 comm_size = parse_integer<unsigned>(action[2]);
213 comp_size = parse_double(action[3]);
214 datatype1 = parse_datatype(action, 4);
217 void AllToAllArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
219 CHECK_ACTION_PARAMS(action, 2, 1)
220 comm_size = MPI_COMM_WORLD->size();
221 send_size = parse_integer<int>(action[2]);
222 recv_size = parse_integer<int>(action[3]);
223 datatype1 = parse_datatype(action, 4);
224 datatype2 = parse_datatype(action, 5);
227 void GatherArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string& name)
229 /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
232 1) 68 is the sendcounts
233 2) 68 is the recvcounts
234 3) 0 is the root node
235 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
236 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
238 CHECK_ACTION_PARAMS(action, 2, 3)
239 comm_size = MPI_COMM_WORLD->size();
240 send_size = parse_integer<int>(action[2]);
241 recv_size = parse_integer<int>(action[3]);
243 if (name == "gather") {
244 root = parse_root(action, 4);
245 datatype1 = parse_datatype(action, 5);
246 datatype2 = parse_datatype(action, 6);
249 datatype1 = parse_datatype(action, 4);
250 datatype2 = parse_datatype(action, 5);
254 void GatherVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string& name)
256 /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
257 0 gather 68 68 10 10 10 0 0 0
259 1) 68 is the sendcount
260 2) 68 10 10 10 is the recvcounts
261 3) 0 is the root node
262 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
263 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
265 comm_size = MPI_COMM_WORLD->size();
266 CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
267 send_size = parse_integer<int>(action[2]);
268 disps = std::vector<int>(comm_size, 0);
269 recvcounts = std::make_shared<std::vector<int>>(comm_size);
271 if (name == "gatherv") {
272 root = parse_root(action, 3 + comm_size);
273 datatype1 = parse_datatype(action, 4 + comm_size);
274 datatype2 = parse_datatype(action, 5 + comm_size);
277 unsigned disp_index = 0;
278 /* The 3 comes from "0 gather <sendcount>", which must always be present.
279 * The + comm_size is the recvcounts array, which must also be present
281 if (action.size() > 3 + comm_size + comm_size) {
282 // datatype + disp are specified
283 datatype1 = parse_datatype(action, 3 + comm_size);
284 datatype2 = parse_datatype(action, 4 + comm_size);
285 disp_index = 5 + comm_size;
286 } else if (action.size() > 3 + comm_size + 2) {
287 // disps specified; datatype is not specified; use the default one
288 datatype1 = MPI_DEFAULT_TYPE;
289 datatype2 = MPI_DEFAULT_TYPE;
290 disp_index = 3 + comm_size;
292 // no disp specified, maybe only datatype,
293 datatype1 = parse_datatype(action, 3 + comm_size);
294 datatype2 = parse_datatype(action, 4 + comm_size);
297 if (disp_index != 0) {
298 xbt_assert(disp_index + comm_size <= action.size());
299 for (unsigned i = 0; i < comm_size; i++)
300 disps[i] = std::stoi(action[disp_index + i]);
304 for (unsigned int i = 0; i < comm_size; i++) {
305 (*recvcounts)[i] = std::stoi(action[i + 3]);
307 recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
310 void ScatterArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
312 /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
315 1) 68 is the sendcounts
316 2) 68 is the recvcounts
317 3) 0 is the root node
318 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
319 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
321 CHECK_ACTION_PARAMS(action, 2, 3)
322 comm_size = MPI_COMM_WORLD->size();
323 send_size = parse_integer<int>(action[2]);
324 recv_size = parse_integer<int>(action[3]);
325 root = parse_root(action, 4);
326 datatype1 = parse_datatype(action, 5);
327 datatype2 = parse_datatype(action, 6);
330 void ScatterVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
332 /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
333 0 gather 68 10 10 10 68 0 0 0
335 1) 68 10 10 10 is the sendcounts
336 2) 68 is the recvcount
337 3) 0 is the root node
338 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
339 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
341 CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
342 recv_size = parse_integer<int>(action[2 + comm_size]);
343 disps = std::vector<int>(comm_size, 0);
344 sendcounts = std::make_shared<std::vector<int>>(comm_size);
346 root = parse_root(action, 3 + comm_size);
347 datatype1 = parse_datatype(action, 4 + comm_size);
348 datatype2 = parse_datatype(action, 5 + comm_size);
350 for (unsigned int i = 0; i < comm_size; i++) {
351 (*sendcounts)[i] = std::stoi(action[i + 2]);
353 send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
356 void ReduceScatterArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
358 /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
359 0 reducescatter 275427 275427 275427 204020 11346849 0
361 1) The first four values after the name of the action declare the recvcounts array
362 2) The value 11346849 is the amount of instructions
363 3) The last value corresponds to the datatype, see simgrid::smpi::Datatype::decode().
365 comm_size = MPI_COMM_WORLD->size();
366 CHECK_ACTION_PARAMS(action, comm_size + 1, 1)
367 comp_size = parse_double(action[2 + comm_size]);
368 recvcounts = std::make_shared<std::vector<int>>(comm_size);
369 datatype1 = parse_datatype(action, 3 + comm_size);
371 for (unsigned int i = 0; i < comm_size; i++) {
372 recvcounts->push_back(std::stoi(action[i + 2]));
374 recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
377 void AllToAllVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
379 /* The structure of the alltoallv action for the rank 0 (total 4 processes) is the following:
380 0 alltoallv 100 1 7 10 12 100 1 70 10 5
382 1) 100 is the size of the send buffer *sizeof(int),
383 2) 1 7 10 12 is the sendcounts array
384 3) 100*sizeof(int) is the size of the receiver buffer
385 4) 1 70 10 5 is the recvcounts array
387 comm_size = MPI_COMM_WORLD->size();
388 CHECK_ACTION_PARAMS(action, 2 * comm_size + 2, 2)
389 sendcounts = std::make_shared<std::vector<int>>(comm_size);
390 recvcounts = std::make_shared<std::vector<int>>(comm_size);
391 senddisps = std::vector<int>(comm_size, 0);
392 recvdisps = std::vector<int>(comm_size, 0);
394 datatype1 = parse_datatype(action, 4 + 2 * comm_size);
395 datatype2 = parse_datatype(action, 5 + 2 * comm_size);
397 send_buf_size = parse_integer<int>(action[2]);
398 recv_buf_size = parse_integer<int>(action[3 + comm_size]);
399 for (unsigned int i = 0; i < comm_size; i++) {
400 (*sendcounts)[i] = std::stoi(action[3 + i]);
401 (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
403 send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
404 recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
407 void WaitAction::kernel(simgrid::xbt::ReplayAction& action)
409 std::string s = boost::algorithm::join(action, " ");
410 xbt_assert(req_storage.size(), "action wait not preceded by any irecv or isend: %s", s.c_str());
411 const WaitTestParser& args = get_args();
412 MPI_Request request = req_storage.find(args.src, args.dst, args.tag);
413 req_storage.remove(request);
415 if (request == MPI_REQUEST_NULL) {
416 /* Assume that the trace is well formed, meaning the comm might have been caught by an MPI_test. Then just
421 // Must be taken before Request::wait() since the request may be set to
422 // MPI_REQUEST_NULL by Request::wait!
423 bool is_wait_for_receive = (request->flags() & MPI_REQ_RECV);
425 TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::WaitTIData(args.src, args.dst, args.tag));
428 Request::wait(&request, &status);
430 TRACE_smpi_comm_out(get_pid());
431 if (is_wait_for_receive)
432 TRACE_smpi_recv(MPI_COMM_WORLD->group()->actor(args.src), MPI_COMM_WORLD->group()->actor(args.dst), args.tag);
435 void SendAction::kernel(simgrid::xbt::ReplayAction&)
437 const SendRecvParser& args = get_args();
438 aid_t dst_traced = MPI_COMM_WORLD->group()->actor(args.partner);
442 new simgrid::instr::Pt2PtTIData(get_name(), args.partner, args.size, args.tag, Datatype::encode(args.datatype1)));
443 if (not TRACE_smpi_view_internals())
444 TRACE_smpi_send(get_pid(), get_pid(), dst_traced, args.tag, args.size * args.datatype1->size());
446 if (get_name() == "send") {
447 Request::send(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
448 } else if (get_name() == "isend") {
449 MPI_Request request = Request::isend(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
450 req_storage.add(request);
452 xbt_die("Don't know this action, %s", get_name().c_str());
455 TRACE_smpi_comm_out(get_pid());
458 void RecvAction::kernel(simgrid::xbt::ReplayAction&)
460 const SendRecvParser& args = get_args();
463 new simgrid::instr::Pt2PtTIData(get_name(), args.partner, args.size, args.tag, Datatype::encode(args.datatype1)));
466 // unknown size from the receiver point of view
467 size_t arg_size = args.size;
469 Request::probe(args.partner, args.tag, MPI_COMM_WORLD, &status);
470 arg_size = status.count;
473 bool is_recv = false; // Help analyzers understanding that status is not used uninitialized
474 if (get_name() == "recv") {
476 Request::recv(nullptr, arg_size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD, &status);
477 } else if (get_name() == "irecv") {
478 MPI_Request request = Request::irecv(nullptr, arg_size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
479 req_storage.add(request);
484 TRACE_smpi_comm_out(get_pid());
485 if (is_recv && not TRACE_smpi_view_internals()) {
486 aid_t src_traced = MPI_COMM_WORLD->group()->actor(status.MPI_SOURCE);
487 TRACE_smpi_recv(src_traced, get_pid(), args.tag);
491 void ComputeAction::kernel(simgrid::xbt::ReplayAction&)
493 const ComputeParser& args = get_args();
494 if (smpi_cfg_simulate_computation()) {
495 smpi_execute_flops(args.flops/smpi_adjust_comp_speed());
499 void SleepAction::kernel(simgrid::xbt::ReplayAction&)
501 const SleepParser& args = get_args();
502 XBT_DEBUG("Sleep for: %lf secs", args.time);
503 aid_t pid = simgrid::s4u::this_actor::get_pid();
504 TRACE_smpi_sleeping_in(pid, args.time);
505 simgrid::s4u::this_actor::sleep_for(args.time/smpi_adjust_comp_speed());
506 TRACE_smpi_sleeping_out(pid);
509 void LocationAction::kernel(simgrid::xbt::ReplayAction&)
511 const LocationParser& args = get_args();
512 smpi_trace_set_call_location(args.filename.c_str(), args.line);
515 void TestAction::kernel(simgrid::xbt::ReplayAction&)
517 const WaitTestParser& args = get_args();
518 MPI_Request request = req_storage.find(args.src, args.dst, args.tag);
519 req_storage.remove(request);
520 // if request is null here, this may mean that a previous test has succeeded
521 // Different times in traced application and replayed version may lead to this
522 // In this case, ignore the extra calls.
523 if (request != MPI_REQUEST_NULL) {
524 TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::NoOpTIData("test"));
528 Request::test(&request, &status, &flag);
530 XBT_DEBUG("MPI_Test result: %d", flag);
531 /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now
533 if (request == MPI_REQUEST_NULL)
534 req_storage.addNullRequest(args.src, args.dst, args.tag);
536 req_storage.add(request);
538 TRACE_smpi_comm_out(get_pid());
542 void InitAction::kernel(simgrid::xbt::ReplayAction& action)
544 CHECK_ACTION_PARAMS(action, 0, 1)
545 MPI_DEFAULT_TYPE = (action.size() > 2) ? MPI_DOUBLE // default MPE datatype
546 : MPI_BYTE; // default TAU datatype
548 /* start a simulated timer */
549 smpi_process()->simulated_start();
552 void CommunicatorAction::kernel(simgrid::xbt::ReplayAction&)
557 void WaitAllAction::kernel(simgrid::xbt::ReplayAction&)
559 const size_t count_requests = req_storage.size();
561 if (count_requests > 0) {
562 TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::CpuTIData("waitall", count_requests));
563 std::vector<std::pair</*sender*/ aid_t, /*recv*/ aid_t>> sender_receiver;
564 std::vector<MPI_Request> reqs;
565 req_storage.get_requests(reqs);
566 for (auto const& req : reqs) {
567 if (req && (req->flags() & MPI_REQ_RECV)) {
568 sender_receiver.emplace_back(req->src(), req->dst());
571 Request::waitall(count_requests, &(reqs.data())[0], MPI_STATUSES_IGNORE);
572 req_storage.get_store().clear();
574 for (auto const& pair : sender_receiver) {
575 TRACE_smpi_recv(pair.first, pair.second, 0);
577 TRACE_smpi_comm_out(get_pid());
581 void BarrierAction::kernel(simgrid::xbt::ReplayAction&)
583 TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::NoOpTIData("barrier"));
584 colls::barrier(MPI_COMM_WORLD);
585 TRACE_smpi_comm_out(get_pid());
588 void BcastAction::kernel(simgrid::xbt::ReplayAction&)
590 const BcastArgParser& args = get_args();
591 TRACE_smpi_comm_in(get_pid(), "action_bcast",
592 new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->rank(args.root), -1.0, args.size,
593 0, Datatype::encode(args.datatype1), ""));
595 colls::bcast(send_buffer(args.size * args.datatype1->size()), args.size, args.datatype1, args.root, MPI_COMM_WORLD);
597 TRACE_smpi_comm_out(get_pid());
600 void ReduceAction::kernel(simgrid::xbt::ReplayAction&)
602 const ReduceArgParser& args = get_args();
603 TRACE_smpi_comm_in(get_pid(), "action_reduce",
604 new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->rank(args.root), args.comp_size,
605 args.comm_size, 0, Datatype::encode(args.datatype1), ""));
607 colls::reduce(send_buffer(args.comm_size * args.datatype1->size()),
608 recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL,
609 args.root, MPI_COMM_WORLD);
610 if(args.comp_size != 0.0)
611 private_execute_flops(args.comp_size);
613 TRACE_smpi_comm_out(get_pid());
616 void AllReduceAction::kernel(simgrid::xbt::ReplayAction&)
618 const AllReduceArgParser& args = get_args();
619 TRACE_smpi_comm_in(get_pid(), "action_allreduce",
620 new simgrid::instr::CollTIData("allreduce", -1, args.comp_size, args.comm_size, 0,
621 Datatype::encode(args.datatype1), ""));
623 colls::allreduce(send_buffer(args.comm_size * args.datatype1->size()),
624 recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL,
626 if(args.comp_size != 0.0)
627 private_execute_flops(args.comp_size);
629 TRACE_smpi_comm_out(get_pid());
632 void AllToAllAction::kernel(simgrid::xbt::ReplayAction&)
634 const AllToAllArgParser& args = get_args();
635 TRACE_smpi_comm_in(get_pid(), "action_alltoall",
636 new simgrid::instr::CollTIData("alltoall", -1, -1.0, args.send_size, args.recv_size,
637 Datatype::encode(args.datatype1),
638 Datatype::encode(args.datatype2)));
640 colls::alltoall(send_buffer(args.datatype1->size() * args.send_size * args.comm_size), args.send_size, args.datatype1,
641 recv_buffer(args.datatype2->size() * args.recv_size * args.comm_size), args.recv_size, args.datatype2,
644 TRACE_smpi_comm_out(get_pid());
647 void GatherAction::kernel(simgrid::xbt::ReplayAction&)
649 const GatherArgParser& args = get_args();
650 TRACE_smpi_comm_in(get_pid(), get_name().c_str(),
651 new simgrid::instr::CollTIData(get_name(), (get_name() == "gather") ? args.root : -1, -1.0,
652 args.send_size, args.recv_size, Datatype::encode(args.datatype1),
653 Datatype::encode(args.datatype2)));
655 if (get_name() == "gather") {
656 int rank = MPI_COMM_WORLD->rank();
657 colls::gather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
658 (rank == args.root) ? recv_buffer(args.datatype2->size() * args.recv_size * args.comm_size) : nullptr,
659 args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
661 colls::allgather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
662 recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2,
665 TRACE_smpi_comm_out(get_pid());
668 void GatherVAction::kernel(simgrid::xbt::ReplayAction&)
670 int rank = MPI_COMM_WORLD->rank();
671 const GatherVArgParser& args = get_args();
672 TRACE_smpi_comm_in(get_pid(), get_name().c_str(),
673 new simgrid::instr::VarCollTIData(
674 get_name(), (get_name() == "gatherv") ? args.root : -1, args.send_size, nullptr, 0,
675 args.recvcounts, Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
677 if (get_name() == "gatherv") {
678 colls::gatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
679 (rank == args.root) ? recv_buffer(args.recv_size_sum * args.datatype2->size()) : nullptr,
680 args.recvcounts->data(), args.disps.data(), args.datatype2, args.root, MPI_COMM_WORLD);
682 colls::allgatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
683 recv_buffer(args.recv_size_sum * args.datatype2->size()), args.recvcounts->data(),
684 args.disps.data(), args.datatype2, MPI_COMM_WORLD);
687 TRACE_smpi_comm_out(get_pid());
690 void ScatterAction::kernel(simgrid::xbt::ReplayAction&)
692 int rank = MPI_COMM_WORLD->rank();
693 const ScatterArgParser& args = get_args();
694 TRACE_smpi_comm_in(get_pid(), "action_scatter",
695 new simgrid::instr::CollTIData(get_name(), args.root, -1.0, args.send_size, args.recv_size,
696 Datatype::encode(args.datatype1),
697 Datatype::encode(args.datatype2)));
699 colls::scatter(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
700 (rank == args.root) ? recv_buffer(args.recv_size * args.datatype2->size()) : nullptr, args.recv_size,
701 args.datatype2, args.root, MPI_COMM_WORLD);
703 TRACE_smpi_comm_out(get_pid());
706 void ScatterVAction::kernel(simgrid::xbt::ReplayAction&)
708 int rank = MPI_COMM_WORLD->rank();
709 const ScatterVArgParser& args = get_args();
710 TRACE_smpi_comm_in(get_pid(), "action_scatterv",
711 new simgrid::instr::VarCollTIData(get_name(), args.root, 0, args.sendcounts, args.recv_size,
712 nullptr, Datatype::encode(args.datatype1),
713 Datatype::encode(args.datatype2)));
715 colls::scatterv((rank == args.root) ? send_buffer(args.send_size_sum * args.datatype1->size()) : nullptr,
716 args.sendcounts->data(), args.disps.data(), args.datatype1,
717 recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, args.root,
720 TRACE_smpi_comm_out(get_pid());
723 void ReduceScatterAction::kernel(simgrid::xbt::ReplayAction&)
725 const ReduceScatterArgParser& args = get_args();
727 get_pid(), "action_reducescatter",
728 new simgrid::instr::VarCollTIData("reducescatter", -1, 0, nullptr, 0, args.recvcounts,
729 std::to_string(args.comp_size), /* ugly hack to print comp_size */
730 Datatype::encode(args.datatype1)));
732 colls::reduce_scatter(send_buffer(args.recv_size_sum * args.datatype1->size()),
733 recv_buffer(args.recv_size_sum * args.datatype1->size()), args.recvcounts->data(),
734 args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
736 private_execute_flops(args.comp_size);
737 TRACE_smpi_comm_out(get_pid());
740 void AllToAllVAction::kernel(simgrid::xbt::ReplayAction&)
742 const AllToAllVArgParser& args = get_args();
743 TRACE_smpi_comm_in(get_pid(), __func__,
744 new simgrid::instr::VarCollTIData(
745 "alltoallv", 0, args.send_size_sum, args.sendcounts, args.recv_size_sum, args.recvcounts,
746 Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
748 colls::alltoallv(send_buffer(args.send_buf_size * args.datatype1->size()), args.sendcounts->data(),
749 args.senddisps.data(), args.datatype1, recv_buffer(args.recv_buf_size * args.datatype2->size()),
750 args.recvcounts->data(), args.recvdisps.data(), args.datatype2, MPI_COMM_WORLD);
752 TRACE_smpi_comm_out(get_pid());
754 } // Replay Namespace
755 }} // namespace simgrid::smpi
757 static std::unordered_map<aid_t, simgrid::smpi::replay::RequestStorage> storage;
758 /** @brief Only initialize the replay, don't do it for real */
759 void smpi_replay_init(const char* instance_id, int rank, double start_delay_flops)
761 xbt_assert(not smpi_process()->initializing());
763 simgrid::s4u::Actor::self()->set_property("instance_id", instance_id);
764 simgrid::s4u::Actor::self()->set_property("rank", std::to_string(rank));
765 simgrid::smpi::ActorExt::init();
767 smpi_process()->mark_as_initialized();
768 smpi_process()->set_replaying(true);
770 TRACE_smpi_init(simgrid::s4u::this_actor::get_pid(), "smpi_replay_run_init");
771 xbt_replay_action_register("init", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::InitAction().execute(action); });
772 xbt_replay_action_register("finalize", [](simgrid::xbt::ReplayAction const&) { /* nothing to do */ });
773 xbt_replay_action_register("comm_size", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
774 xbt_replay_action_register("comm_split",[](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
775 xbt_replay_action_register("comm_dup", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
776 xbt_replay_action_register("send", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("send", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
777 xbt_replay_action_register("isend", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("isend", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
778 xbt_replay_action_register("recv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("recv", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
779 xbt_replay_action_register("irecv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("irecv", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
780 xbt_replay_action_register("test", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::TestAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
781 xbt_replay_action_register("wait", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
782 xbt_replay_action_register("waitall", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAllAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
783 xbt_replay_action_register("barrier", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BarrierAction().execute(action); });
784 xbt_replay_action_register("bcast", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BcastAction().execute(action); });
785 xbt_replay_action_register("reduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceAction().execute(action); });
786 xbt_replay_action_register("allreduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllReduceAction().execute(action); });
787 xbt_replay_action_register("alltoall", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllAction().execute(action); });
788 xbt_replay_action_register("alltoallv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllVAction().execute(action); });
789 xbt_replay_action_register("gather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("gather").execute(action); });
790 xbt_replay_action_register("scatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterAction().execute(action); });
791 xbt_replay_action_register("gatherv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("gatherv").execute(action); });
792 xbt_replay_action_register("scatterv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterVAction().execute(action); });
793 xbt_replay_action_register("allgather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("allgather").execute(action); });
794 xbt_replay_action_register("allgatherv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("allgatherv").execute(action); });
795 xbt_replay_action_register("reducescatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceScatterAction().execute(action); });
796 xbt_replay_action_register("compute", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ComputeAction().execute(action); });
797 xbt_replay_action_register("sleep", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SleepAction().execute(action); });
798 xbt_replay_action_register("location", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::LocationAction().execute(action); });
800 //if we have a delayed start, sleep here.
801 if (start_delay_flops > 0) {
802 XBT_VERB("Delayed start for instance - Sleeping for %f flops ", start_delay_flops);
803 private_execute_flops(start_delay_flops);
805 // Wait for the other actors to initialize also
806 simgrid::s4u::this_actor::yield();
810 /** @brief actually run the replay after initialization */
811 void smpi_replay_main(int rank, const char* private_trace_filename)
813 static int active_processes = 0;
815 storage[simgrid::s4u::this_actor::get_pid()] = simgrid::smpi::replay::RequestStorage();
816 std::string rank_string = std::to_string(rank);
817 simgrid::xbt::replay_runner(rank_string.c_str(), private_trace_filename);
819 /* and now, finalize everything */
820 /* One active process will stop. Decrease the counter*/
821 unsigned int count_requests = storage[simgrid::s4u::this_actor::get_pid()].size();
822 XBT_DEBUG("There are %u elements in reqq[*]", count_requests);
823 if (count_requests > 0) {
824 std::vector<MPI_Request> requests(count_requests);
827 for (auto const& pair : storage[simgrid::s4u::this_actor::get_pid()].get_store()) {
828 requests[i] = pair.second;
831 simgrid::smpi::Request::waitall(count_requests, requests.data(), MPI_STATUSES_IGNORE);
834 if(simgrid::config::get_value<bool>("smpi/finalization-barrier"))
835 simgrid::smpi::colls::barrier(MPI_COMM_WORLD);
839 if(active_processes==0){
840 /* Last process alive speaking: end the simulated timer */
841 XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
842 smpi_free_replay_tmp_buffers();
845 TRACE_smpi_comm_in(simgrid::s4u::this_actor::get_pid(), "smpi_replay_run_finalize",
846 new simgrid::instr::NoOpTIData("finalize"));
848 smpi_process()->finalize();
850 TRACE_smpi_comm_out(simgrid::s4u::this_actor::get_pid());
853 /** @brief chain a replay initialization and a replay start */
854 void smpi_replay_run(const char* instance_id, int rank, double start_delay_flops, const char* private_trace_filename)
856 smpi_replay_init(instance_id, rank, start_delay_flops);
857 smpi_replay_main(rank, private_trace_filename);