1 /* Copyright (c) 2009-2021. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
#include "smpi_coll.hpp"
#include "smpi_comm.hpp"
#include "smpi_datatype.hpp"
#include "smpi_group.hpp"
#include "smpi_request.hpp"
#include "xbt/replay.hpp"
#include <simgrid/smpi/smpi_replay.hpp>
#include <src/smpi/include/private.hpp>

#include <boost/algorithm/string/join.hpp>

#include <cmath>
#include <limits>
#include <memory>
#include <numeric>
#include <string>
#include <tuple>
#include <unordered_map>
#include <vector>
// Logging category of this file (child of "smpi"); XBT_VERB/XBT_DEBUG/XBT_INFO below log into it.
XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay, smpi, "Trace Replay with SMPI");
// From https://stackoverflow.com/questions/7110301/generic-hash-for-tuples-in-unordered-map-unordered-set
// This is all just to make std::unordered_map work with std::tuple. If we need this in other places,
// this could go into a header file.
namespace hash_tuple {
/** @brief Default hasher: forwards to std::hash for every non-tuple type. */
template <typename TT> class hash {
public:
  size_t operator()(TT const& tt) const { return std::hash<TT>()(tt); }
};

/** @brief Mix the hash of @p v into @p seed (the classic boost::hash_combine recipe). */
template <class T> inline void hash_combine(std::size_t& seed, T const& v)
{
  seed ^= hash_tuple::hash<T>()(v) + 0x9e3779b9 + (seed << 6) + (seed >> 2);
}

// Recursive template code derived from Matthieu M.
/** @brief Combine the hashes of tuple elements 0..Index into @p seed, lowest index first. */
template <class Tuple, size_t Index = std::tuple_size<Tuple>::value - 1> class HashValueImpl {
public:
  static void apply(size_t& seed, Tuple const& tuple)
  {
    HashValueImpl<Tuple, Index - 1>::apply(seed, tuple);
    hash_combine(seed, std::get<Index>(tuple));
  }
};

/** @brief Recursion base case: hash element 0. */
template <class Tuple> class HashValueImpl<Tuple, 0> {
public:
  static void apply(size_t& seed, Tuple const& tuple) { hash_combine(seed, std::get<0>(tuple)); }
};

/** @brief Hasher for std::tuple: combines all element hashes, starting from a zero seed. */
template <typename... TT> class hash<std::tuple<TT...>> {
public:
  size_t operator()(std::tuple<TT...> const& tt) const
  {
    size_t seed = 0;
    HashValueImpl<std::tuple<TT...>>::apply(seed, tt);
    return seed;
  }
};
} // namespace hash_tuple
65 void log_timed_action(const simgrid::xbt::ReplayAction& action, double clock)
67 if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
68 std::string s = boost::algorithm::join(action, " ");
69 XBT_VERB("%s %f", s.c_str(), smpi_process()->simulated_elapsed() - clock);
73 /* Helper functions */
74 static double parse_double(const std::string& string)
76 return xbt_str_parse_double(string.c_str(), "not a double");
79 static int parse_root(const simgrid::xbt::ReplayAction& action, unsigned i)
81 return i < action.size() ? std::stoi(action[i]) : 0;
84 static MPI_Datatype parse_datatype(const simgrid::xbt::ReplayAction& action, unsigned i)
86 return i < action.size() ? simgrid::smpi::Datatype::decode(action[i]) : simgrid::smpi::replay::MPI_DEFAULT_TYPE;
93 MPI_Datatype MPI_DEFAULT_TYPE;
95 class RequestStorage {
97 using req_key_t = std::tuple</*sender*/ int, /* receiver */ int, /* tag */ int>;
98 using req_storage_t = std::unordered_map<req_key_t, MPI_Request, hash_tuple::hash<std::tuple<int, int, int>>>;
103 RequestStorage() = default;
104 size_t size() const { return store.size(); }
106 req_storage_t& get_store() { return store; }
108 void get_requests(std::vector<MPI_Request>& vec) const
110 for (auto const& pair : store) {
111 auto& req = pair.second;
112 aid_t my_proc_id = simgrid::s4u::this_actor::get_pid();
113 if (req != MPI_REQUEST_NULL && (req->src() == my_proc_id || req->dst() == my_proc_id)) {
114 vec.push_back(pair.second);
115 pair.second->print_request("MM");
120 MPI_Request find(int src, int dst, int tag)
122 auto it = store.find(req_key_t(src, dst, tag));
123 return (it == store.end()) ? MPI_REQUEST_NULL : it->second;
126 void remove(const Request* req)
128 if (req == MPI_REQUEST_NULL) return;
130 store.erase(req_key_t(req->src()-1, req->dst()-1, req->tag()));
133 void add(MPI_Request req)
135 if (req != MPI_REQUEST_NULL) // Can and does happen in the case of TestAction
136 store.insert({req_key_t(req->src()-1, req->dst()-1, req->tag()), req});
139 /* Sometimes we need to re-insert MPI_REQUEST_NULL but we still need src,dst and tag */
140 void addNullRequest(int src, int dst, int tag)
142 store.insert({req_key_t(MPI_COMM_WORLD->group()->actor(src) - 1, MPI_COMM_WORLD->group()->actor(dst) - 1, tag),
147 void WaitTestParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
149 CHECK_ACTION_PARAMS(action, 3, 0)
150 src = std::stoi(action[2]);
151 dst = std::stoi(action[3]);
152 tag = std::stoi(action[4]);
155 void SendRecvParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
157 CHECK_ACTION_PARAMS(action, 3, 1)
158 partner = std::stoi(action[2]);
159 tag = std::stoi(action[3]);
160 size = parse_double(action[4]);
161 datatype1 = parse_datatype(action, 5);
164 void ComputeParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
166 CHECK_ACTION_PARAMS(action, 1, 0)
167 flops = parse_double(action[2]);
170 void SleepParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
172 CHECK_ACTION_PARAMS(action, 1, 0)
173 time = parse_double(action[2]);
176 void LocationParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
178 CHECK_ACTION_PARAMS(action, 2, 0)
179 filename = std::string(action[2]);
180 line = std::stoi(action[3]);
183 void BcastArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
185 CHECK_ACTION_PARAMS(action, 1, 2)
186 size = parse_double(action[2]);
187 root = parse_root(action, 3);
188 datatype1 = parse_datatype(action, 4);
191 void ReduceArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
193 CHECK_ACTION_PARAMS(action, 2, 2)
194 double arg2 = trunc(parse_double(action[2]));
195 xbt_assert(0.0 <= arg2 && arg2 <= static_cast<double>(std::numeric_limits<unsigned>::max()));
196 comm_size = static_cast<unsigned>(arg2);
197 comp_size = parse_double(action[3]);
198 root = parse_root(action, 4);
199 datatype1 = parse_datatype(action, 5);
202 void AllReduceArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
204 CHECK_ACTION_PARAMS(action, 2, 1)
205 double arg2 = trunc(parse_double(action[2]));
206 xbt_assert(0.0 <= arg2 && arg2 <= static_cast<double>(std::numeric_limits<unsigned>::max()));
207 comm_size = static_cast<unsigned>(arg2);
208 comp_size = parse_double(action[3]);
209 datatype1 = parse_datatype(action, 4);
212 void AllToAllArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
214 CHECK_ACTION_PARAMS(action, 2, 1)
215 comm_size = MPI_COMM_WORLD->size();
216 send_size = parse_double(action[2]);
217 recv_size = parse_double(action[3]);
218 datatype1 = parse_datatype(action, 4);
219 datatype2 = parse_datatype(action, 5);
222 void GatherArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string& name)
224 /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
227 1) 68 is the sendcounts
228 2) 68 is the recvcounts
229 3) 0 is the root node
230 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
231 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
233 CHECK_ACTION_PARAMS(action, 2, 3)
234 comm_size = MPI_COMM_WORLD->size();
235 send_size = parse_double(action[2]);
236 recv_size = parse_double(action[3]);
238 if (name == "gather") {
239 root = parse_root(action, 4);
240 datatype1 = parse_datatype(action, 5);
241 datatype2 = parse_datatype(action, 6);
244 datatype1 = parse_datatype(action, 4);
245 datatype2 = parse_datatype(action, 5);
249 void GatherVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string& name)
251 /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
252 0 gather 68 68 10 10 10 0 0 0
254 1) 68 is the sendcount
255 2) 68 10 10 10 is the recvcounts
256 3) 0 is the root node
257 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
258 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
260 comm_size = MPI_COMM_WORLD->size();
261 CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
262 send_size = parse_double(action[2]);
263 disps = std::vector<int>(comm_size, 0);
264 recvcounts = std::make_shared<std::vector<int>>(comm_size);
266 if (name == "gatherv") {
267 root = parse_root(action, 3 + comm_size);
268 datatype1 = parse_datatype(action, 4 + comm_size);
269 datatype2 = parse_datatype(action, 5 + comm_size);
272 unsigned disp_index = 0;
273 /* The 3 comes from "0 gather <sendcount>", which must always be present.
274 * The + comm_size is the recvcounts array, which must also be present
276 if (action.size() > 3 + comm_size + comm_size) {
277 // datatype + disp are specified
278 datatype1 = parse_datatype(action, 3 + comm_size);
279 datatype2 = parse_datatype(action, 4 + comm_size);
280 disp_index = 5 + comm_size;
281 } else if (action.size() > 3 + comm_size + 2) {
282 // disps specified; datatype is not specified; use the default one
283 datatype1 = MPI_DEFAULT_TYPE;
284 datatype2 = MPI_DEFAULT_TYPE;
285 disp_index = 3 + comm_size;
287 // no disp specified, maybe only datatype,
288 datatype1 = parse_datatype(action, 3 + comm_size);
289 datatype2 = parse_datatype(action, 4 + comm_size);
292 if (disp_index != 0) {
293 xbt_assert(disp_index + comm_size <= action.size());
294 for (unsigned i = 0; i < comm_size; i++)
295 disps[i] = std::stoi(action[disp_index + i]);
299 for (unsigned int i = 0; i < comm_size; i++) {
300 (*recvcounts)[i] = std::stoi(action[i + 3]);
302 recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
305 void ScatterArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
307 /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
310 1) 68 is the sendcounts
311 2) 68 is the recvcounts
312 3) 0 is the root node
313 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
314 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
316 CHECK_ACTION_PARAMS(action, 2, 3)
317 comm_size = MPI_COMM_WORLD->size();
318 send_size = parse_double(action[2]);
319 recv_size = parse_double(action[3]);
320 root = parse_root(action, 4);
321 datatype1 = parse_datatype(action, 5);
322 datatype2 = parse_datatype(action, 6);
325 void ScatterVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
327 /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
328 0 gather 68 10 10 10 68 0 0 0
330 1) 68 10 10 10 is the sendcounts
331 2) 68 is the recvcount
332 3) 0 is the root node
333 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
334 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
336 CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
337 recv_size = parse_double(action[2 + comm_size]);
338 disps = std::vector<int>(comm_size, 0);
339 sendcounts = std::make_shared<std::vector<int>>(comm_size);
341 root = parse_root(action, 3 + comm_size);
342 datatype1 = parse_datatype(action, 4 + comm_size);
343 datatype2 = parse_datatype(action, 5 + comm_size);
345 for (unsigned int i = 0; i < comm_size; i++) {
346 (*sendcounts)[i] = std::stoi(action[i + 2]);
348 send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
351 void ReduceScatterArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
353 /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
354 0 reducescatter 275427 275427 275427 204020 11346849 0
356 1) The first four values after the name of the action declare the recvcounts array
357 2) The value 11346849 is the amount of instructions
358 3) The last value corresponds to the datatype, see simgrid::smpi::Datatype::decode().
360 comm_size = MPI_COMM_WORLD->size();
361 CHECK_ACTION_PARAMS(action, comm_size + 1, 1)
362 comp_size = parse_double(action[2 + comm_size]);
363 recvcounts = std::make_shared<std::vector<int>>(comm_size);
364 datatype1 = parse_datatype(action, 3 + comm_size);
366 for (unsigned int i = 0; i < comm_size; i++) {
367 recvcounts->push_back(std::stoi(action[i + 2]));
369 recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
372 void AllToAllVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
374 /* The structure of the alltoallv action for the rank 0 (total 4 processes) is the following:
375 0 alltoallv 100 1 7 10 12 100 1 70 10 5
377 1) 100 is the size of the send buffer *sizeof(int),
378 2) 1 7 10 12 is the sendcounts array
379 3) 100*sizeof(int) is the size of the receiver buffer
380 4) 1 70 10 5 is the recvcounts array
382 comm_size = MPI_COMM_WORLD->size();
383 CHECK_ACTION_PARAMS(action, 2 * comm_size + 2, 2)
384 sendcounts = std::make_shared<std::vector<int>>(comm_size);
385 recvcounts = std::make_shared<std::vector<int>>(comm_size);
386 senddisps = std::vector<int>(comm_size, 0);
387 recvdisps = std::vector<int>(comm_size, 0);
389 datatype1 = parse_datatype(action, 4 + 2 * comm_size);
390 datatype2 = parse_datatype(action, 5 + 2 * comm_size);
392 send_buf_size = parse_double(action[2]);
393 recv_buf_size = parse_double(action[3 + comm_size]);
394 for (unsigned int i = 0; i < comm_size; i++) {
395 (*sendcounts)[i] = std::stoi(action[3 + i]);
396 (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
398 send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
399 recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
402 void WaitAction::kernel(simgrid::xbt::ReplayAction& action)
404 std::string s = boost::algorithm::join(action, " ");
405 xbt_assert(req_storage.size(), "action wait not preceded by any irecv or isend: %s", s.c_str());
406 const WaitTestParser& args = get_args();
407 MPI_Request request = req_storage.find(args.src, args.dst, args.tag);
408 req_storage.remove(request);
410 if (request == MPI_REQUEST_NULL) {
411 /* Assume that the trace is well formed, meaning the comm might have been caught by an MPI_test. Then just
416 aid_t rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
418 // Must be taken before Request::wait() since the request may be set to
419 // MPI_REQUEST_NULL by Request::wait!
420 bool is_wait_for_receive = (request->flags() & MPI_REQ_RECV);
421 // TODO: Here we take the rank while we normally take the process id (look for get_pid())
422 TRACE_smpi_comm_in(rank, __func__, new simgrid::instr::WaitTIData(args.src, args.dst, args.tag));
425 Request::wait(&request, &status);
427 TRACE_smpi_comm_out(rank);
428 if (is_wait_for_receive)
429 TRACE_smpi_recv(args.src, args.dst, args.tag);
432 void SendAction::kernel(simgrid::xbt::ReplayAction&)
434 const SendRecvParser& args = get_args();
435 aid_t dst_traced = MPI_COMM_WORLD->group()->actor(args.partner);
439 new simgrid::instr::Pt2PtTIData(get_name(), args.partner, args.size, args.tag, Datatype::encode(args.datatype1)));
440 if (not TRACE_smpi_view_internals())
441 TRACE_smpi_send(get_pid(), get_pid(), dst_traced, args.tag, args.size * args.datatype1->size());
443 if (get_name() == "send") {
444 Request::send(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
445 } else if (get_name() == "isend") {
446 MPI_Request request = Request::isend(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
447 req_storage.add(request);
449 xbt_die("Don't know this action, %s", get_name().c_str());
452 TRACE_smpi_comm_out(get_pid());
455 void RecvAction::kernel(simgrid::xbt::ReplayAction&)
457 const SendRecvParser& args = get_args();
460 new simgrid::instr::Pt2PtTIData(get_name(), args.partner, args.size, args.tag, Datatype::encode(args.datatype1)));
463 // unknown size from the receiver point of view
464 double arg_size = args.size;
465 if (arg_size <= 0.0) {
466 Request::probe(args.partner, args.tag, MPI_COMM_WORLD, &status);
467 arg_size = status.count;
470 bool is_recv = false; // Help analyzers understanding that status is not used uninitialized
471 if (get_name() == "recv") {
473 Request::recv(nullptr, arg_size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD, &status);
474 } else if (get_name() == "irecv") {
475 MPI_Request request = Request::irecv(nullptr, arg_size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
476 req_storage.add(request);
481 TRACE_smpi_comm_out(get_pid());
482 if (is_recv && not TRACE_smpi_view_internals()) {
483 aid_t src_traced = MPI_COMM_WORLD->group()->actor(status.MPI_SOURCE);
484 TRACE_smpi_recv(src_traced, get_pid(), args.tag);
488 void ComputeAction::kernel(simgrid::xbt::ReplayAction&)
490 const ComputeParser& args = get_args();
491 if (smpi_cfg_simulate_computation()) {
492 smpi_execute_flops(args.flops/smpi_adjust_comp_speed());
496 void SleepAction::kernel(simgrid::xbt::ReplayAction&)
498 const SleepParser& args = get_args();
499 XBT_DEBUG("Sleep for: %lf secs", args.time);
500 aid_t pid = simgrid::s4u::this_actor::get_pid();
501 TRACE_smpi_sleeping_in(pid, args.time);
502 simgrid::s4u::this_actor::sleep_for(args.time/smpi_adjust_comp_speed());
503 TRACE_smpi_sleeping_out(pid);
506 void LocationAction::kernel(simgrid::xbt::ReplayAction&)
508 const LocationParser& args = get_args();
509 smpi_trace_set_call_location(args.filename.c_str(), args.line);
512 void TestAction::kernel(simgrid::xbt::ReplayAction&)
514 const WaitTestParser& args = get_args();
515 MPI_Request request = req_storage.find(args.src, args.dst, args.tag);
516 req_storage.remove(request);
517 // if request is null here, this may mean that a previous test has succeeded
518 // Different times in traced application and replayed version may lead to this
519 // In this case, ignore the extra calls.
520 if (request != MPI_REQUEST_NULL) {
521 TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::NoOpTIData("test"));
525 Request::test(&request, &status, &flag);
527 XBT_DEBUG("MPI_Test result: %d", flag);
528 /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now
530 if (request == MPI_REQUEST_NULL)
531 req_storage.addNullRequest(args.src, args.dst, args.tag);
533 req_storage.add(request);
535 TRACE_smpi_comm_out(get_pid());
539 void InitAction::kernel(simgrid::xbt::ReplayAction& action)
541 CHECK_ACTION_PARAMS(action, 0, 1)
542 MPI_DEFAULT_TYPE = (action.size() > 2) ? MPI_DOUBLE // default MPE datatype
543 : MPI_BYTE; // default TAU datatype
545 /* start a simulated timer */
546 smpi_process()->simulated_start();
549 void CommunicatorAction::kernel(simgrid::xbt::ReplayAction&)
554 void WaitAllAction::kernel(simgrid::xbt::ReplayAction&)
556 const size_t count_requests = req_storage.size();
558 if (count_requests > 0) {
559 TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::Pt2PtTIData("waitall", -1, count_requests, ""));
560 std::vector<std::pair</*sender*/ aid_t, /*recv*/ aid_t>> sender_receiver;
561 std::vector<MPI_Request> reqs;
562 req_storage.get_requests(reqs);
563 for (auto const& req : reqs) {
564 if (req && (req->flags() & MPI_REQ_RECV)) {
565 sender_receiver.emplace_back(req->src(), req->dst());
568 Request::waitall(count_requests, &(reqs.data())[0], MPI_STATUSES_IGNORE);
569 req_storage.get_store().clear();
571 for (auto const& pair : sender_receiver) {
572 TRACE_smpi_recv(pair.first, pair.second, 0);
574 TRACE_smpi_comm_out(get_pid());
578 void BarrierAction::kernel(simgrid::xbt::ReplayAction&)
580 TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::NoOpTIData("barrier"));
581 colls::barrier(MPI_COMM_WORLD);
582 TRACE_smpi_comm_out(get_pid());
585 void BcastAction::kernel(simgrid::xbt::ReplayAction&)
587 const BcastArgParser& args = get_args();
588 TRACE_smpi_comm_in(get_pid(), "action_bcast",
589 new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(args.root), -1.0, args.size,
590 0, Datatype::encode(args.datatype1), ""));
592 colls::bcast(send_buffer(args.size * args.datatype1->size()), args.size, args.datatype1, args.root, MPI_COMM_WORLD);
594 TRACE_smpi_comm_out(get_pid());
597 void ReduceAction::kernel(simgrid::xbt::ReplayAction&)
599 const ReduceArgParser& args = get_args();
600 TRACE_smpi_comm_in(get_pid(), "action_reduce",
601 new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(args.root), args.comp_size,
602 args.comm_size, 0, Datatype::encode(args.datatype1), ""));
604 colls::reduce(send_buffer(args.comm_size * args.datatype1->size()),
605 recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL,
606 args.root, MPI_COMM_WORLD);
607 private_execute_flops(args.comp_size);
609 TRACE_smpi_comm_out(get_pid());
612 void AllReduceAction::kernel(simgrid::xbt::ReplayAction&)
614 const AllReduceArgParser& args = get_args();
615 TRACE_smpi_comm_in(get_pid(), "action_allreduce",
616 new simgrid::instr::CollTIData("allreduce", -1, args.comp_size, args.comm_size, 0,
617 Datatype::encode(args.datatype1), ""));
619 colls::allreduce(send_buffer(args.comm_size * args.datatype1->size()),
620 recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL,
622 private_execute_flops(args.comp_size);
624 TRACE_smpi_comm_out(get_pid());
627 void AllToAllAction::kernel(simgrid::xbt::ReplayAction&)
629 const AllToAllArgParser& args = get_args();
630 TRACE_smpi_comm_in(get_pid(), "action_alltoall",
631 new simgrid::instr::CollTIData("alltoall", -1, -1.0, args.send_size, args.recv_size,
632 Datatype::encode(args.datatype1),
633 Datatype::encode(args.datatype2)));
635 colls::alltoall(send_buffer(args.datatype1->size() * args.send_size * args.comm_size), args.send_size, args.datatype1,
636 recv_buffer(args.datatype2->size() * args.recv_size * args.comm_size), args.recv_size, args.datatype2,
639 TRACE_smpi_comm_out(get_pid());
642 void GatherAction::kernel(simgrid::xbt::ReplayAction&)
644 const GatherArgParser& args = get_args();
645 TRACE_smpi_comm_in(get_pid(), get_name().c_str(),
646 new simgrid::instr::CollTIData(get_name(), (get_name() == "gather") ? args.root : -1, -1.0,
647 args.send_size, args.recv_size, Datatype::encode(args.datatype1),
648 Datatype::encode(args.datatype2)));
650 if (get_name() == "gather") {
651 int rank = MPI_COMM_WORLD->rank();
652 colls::gather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
653 (rank == args.root) ? recv_buffer(args.datatype2->size() * args.recv_size * args.comm_size) : nullptr,
654 args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
656 colls::allgather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
657 recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2,
660 TRACE_smpi_comm_out(get_pid());
663 void GatherVAction::kernel(simgrid::xbt::ReplayAction&)
665 int rank = MPI_COMM_WORLD->rank();
666 const GatherVArgParser& args = get_args();
667 TRACE_smpi_comm_in(get_pid(), get_name().c_str(),
668 new simgrid::instr::VarCollTIData(
669 get_name(), (get_name() == "gatherv") ? args.root : -1, args.send_size, nullptr, 0,
670 args.recvcounts, Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
672 if (get_name() == "gatherv") {
673 colls::gatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
674 (rank == args.root) ? recv_buffer(args.recv_size_sum * args.datatype2->size()) : nullptr,
675 args.recvcounts->data(), args.disps.data(), args.datatype2, args.root, MPI_COMM_WORLD);
677 colls::allgatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
678 recv_buffer(args.recv_size_sum * args.datatype2->size()), args.recvcounts->data(),
679 args.disps.data(), args.datatype2, MPI_COMM_WORLD);
682 TRACE_smpi_comm_out(get_pid());
685 void ScatterAction::kernel(simgrid::xbt::ReplayAction&)
687 int rank = MPI_COMM_WORLD->rank();
688 const ScatterArgParser& args = get_args();
689 TRACE_smpi_comm_in(get_pid(), "action_scatter",
690 new simgrid::instr::CollTIData(get_name(), args.root, -1.0, args.send_size, args.recv_size,
691 Datatype::encode(args.datatype1),
692 Datatype::encode(args.datatype2)));
694 colls::scatter(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
695 (rank == args.root) ? recv_buffer(args.recv_size * args.datatype2->size()) : nullptr, args.recv_size,
696 args.datatype2, args.root, MPI_COMM_WORLD);
698 TRACE_smpi_comm_out(get_pid());
701 void ScatterVAction::kernel(simgrid::xbt::ReplayAction&)
703 int rank = MPI_COMM_WORLD->rank();
704 const ScatterVArgParser& args = get_args();
705 TRACE_smpi_comm_in(get_pid(), "action_scatterv",
706 new simgrid::instr::VarCollTIData(get_name(), args.root, 0, args.sendcounts, args.recv_size,
707 nullptr, Datatype::encode(args.datatype1),
708 Datatype::encode(args.datatype2)));
710 colls::scatterv((rank == args.root) ? send_buffer(args.send_size_sum * args.datatype1->size()) : nullptr,
711 args.sendcounts->data(), args.disps.data(), args.datatype1,
712 recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, args.root,
715 TRACE_smpi_comm_out(get_pid());
718 void ReduceScatterAction::kernel(simgrid::xbt::ReplayAction&)
720 const ReduceScatterArgParser& args = get_args();
722 get_pid(), "action_reducescatter",
723 new simgrid::instr::VarCollTIData("reducescatter", -1, 0, nullptr, 0, args.recvcounts,
724 std::to_string(args.comp_size), /* ugly hack to print comp_size */
725 Datatype::encode(args.datatype1)));
727 colls::reduce_scatter(send_buffer(args.recv_size_sum * args.datatype1->size()),
728 recv_buffer(args.recv_size_sum * args.datatype1->size()), args.recvcounts->data(),
729 args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
731 private_execute_flops(args.comp_size);
732 TRACE_smpi_comm_out(get_pid());
735 void AllToAllVAction::kernel(simgrid::xbt::ReplayAction&)
737 const AllToAllVArgParser& args = get_args();
738 TRACE_smpi_comm_in(get_pid(), __func__,
739 new simgrid::instr::VarCollTIData(
740 "alltoallv", 0, args.send_size_sum, args.sendcounts, args.recv_size_sum, args.recvcounts,
741 Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
743 colls::alltoallv(send_buffer(args.send_buf_size * args.datatype1->size()), args.sendcounts->data(),
744 args.senddisps.data(), args.datatype1, recv_buffer(args.recv_buf_size * args.datatype2->size()),
745 args.recvcounts->data(), args.recvdisps.data(), args.datatype2, MPI_COMM_WORLD);
747 TRACE_smpi_comm_out(get_pid());
749 } // Replay Namespace
750 }} // namespace simgrid::smpi
752 static std::unordered_map<aid_t, simgrid::smpi::replay::RequestStorage> storage;
753 /** @brief Only initialize the replay, don't do it for real */
754 void smpi_replay_init(const char* instance_id, int rank, double start_delay_flops)
756 xbt_assert(not smpi_process()->initializing());
758 simgrid::s4u::Actor::self()->set_property("instance_id", instance_id);
759 simgrid::s4u::Actor::self()->set_property("rank", std::to_string(rank));
760 simgrid::smpi::ActorExt::init();
762 smpi_process()->mark_as_initialized();
763 smpi_process()->set_replaying(true);
765 TRACE_smpi_init(simgrid::s4u::this_actor::get_pid(), "smpi_replay_run_init");
766 xbt_replay_action_register("init", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::InitAction().execute(action); });
767 xbt_replay_action_register("finalize", [](simgrid::xbt::ReplayAction const&) { /* nothing to do */ });
768 xbt_replay_action_register("comm_size", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
769 xbt_replay_action_register("comm_split",[](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
770 xbt_replay_action_register("comm_dup", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
771 xbt_replay_action_register("send", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("send", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
772 xbt_replay_action_register("isend", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("isend", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
773 xbt_replay_action_register("recv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("recv", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
774 xbt_replay_action_register("irecv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("irecv", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
775 xbt_replay_action_register("test", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::TestAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
776 xbt_replay_action_register("wait", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
777 xbt_replay_action_register("waitall", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAllAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
778 xbt_replay_action_register("barrier", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BarrierAction().execute(action); });
779 xbt_replay_action_register("bcast", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BcastAction().execute(action); });
780 xbt_replay_action_register("reduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceAction().execute(action); });
781 xbt_replay_action_register("allreduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllReduceAction().execute(action); });
782 xbt_replay_action_register("alltoall", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllAction().execute(action); });
783 xbt_replay_action_register("alltoallv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllVAction().execute(action); });
784 xbt_replay_action_register("gather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("gather").execute(action); });
785 xbt_replay_action_register("scatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterAction().execute(action); });
786 xbt_replay_action_register("gatherv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("gatherv").execute(action); });
787 xbt_replay_action_register("scatterv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterVAction().execute(action); });
788 xbt_replay_action_register("allgather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("allgather").execute(action); });
789 xbt_replay_action_register("allgatherv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("allgatherv").execute(action); });
790 xbt_replay_action_register("reducescatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceScatterAction().execute(action); });
791 xbt_replay_action_register("compute", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ComputeAction().execute(action); });
792 xbt_replay_action_register("sleep", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SleepAction().execute(action); });
793 xbt_replay_action_register("location", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::LocationAction().execute(action); });
795 //if we have a delayed start, sleep here.
796 if (start_delay_flops > 0) {
797 XBT_VERB("Delayed start for instance - Sleeping for %f flops ", start_delay_flops);
798 private_execute_flops(start_delay_flops);
800 // Wait for the other actors to initialize also
801 simgrid::s4u::this_actor::yield();
805 /** @brief actually run the replay after initialization */
806 void smpi_replay_main(int rank, const char* private_trace_filename)
808 static int active_processes = 0;
810 storage[simgrid::s4u::this_actor::get_pid()] = simgrid::smpi::replay::RequestStorage();
811 std::string rank_string = std::to_string(rank);
812 simgrid::xbt::replay_runner(rank_string.c_str(), private_trace_filename);
814 /* and now, finalize everything */
815 /* One active process will stop. Decrease the counter*/
816 unsigned int count_requests = storage[simgrid::s4u::this_actor::get_pid()].size();
817 XBT_DEBUG("There are %u elements in reqq[*]", count_requests);
818 if (count_requests > 0) {
819 std::vector<MPI_Request> requests(count_requests);
822 for (auto const& pair : storage[simgrid::s4u::this_actor::get_pid()].get_store()) {
823 requests[i] = pair.second;
826 simgrid::smpi::Request::waitall(count_requests, requests.data(), MPI_STATUSES_IGNORE);
830 if(active_processes==0){
831 /* Last process alive speaking: end the simulated timer */
832 XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
833 smpi_free_replay_tmp_buffers();
836 TRACE_smpi_comm_in(simgrid::s4u::this_actor::get_pid(), "smpi_replay_run_finalize",
837 new simgrid::instr::NoOpTIData("finalize"));
839 smpi_process()->finalize();
841 TRACE_smpi_comm_out(simgrid::s4u::this_actor::get_pid());
844 /** @brief chain a replay initialization and a replay start */
845 void smpi_replay_run(const char* instance_id, int rank, double start_delay_flops, const char* private_trace_filename)
847 smpi_replay_init(instance_id, rank, start_delay_flops);
848 smpi_replay_main(rank, private_trace_filename);