1 /* Copyright (c) 2009-2021. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
6 #include "smpi_coll.hpp"
7 #include "smpi_comm.hpp"
8 #include "smpi_datatype.hpp"
9 #include "smpi_group.hpp"
10 #include "smpi_request.hpp"
11 #include "xbt/replay.hpp"
12 #include <simgrid/smpi/smpi_replay.hpp>
13 #include <src/smpi/include/private.hpp>
20 #include <unordered_map>
// Declares the `smpi_replay` log subcategory (child of `smpi`); it is the category tested in log_timed_action().
23 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay, smpi, "Trace Replay with SMPI");
25 // From https://stackoverflow.com/questions/7110301/generic-hash-for-tuples-in-unordered-map-unordered-set
26 // This is all just to make std::unordered_map work with std::tuple. If we need this in other places,
27 // this could go into a header file.
// Hash functor family that lets std::tuple be used as an std::unordered_map key (see RequestStorage).
28 namespace hash_tuple {
// Generic case: simply forwards to std::hash<TT>.
29 template <typename TT> class hash {
31 size_t operator()(TT const& tt) const { return std::hash<TT>()(tt); }
// Mixes the hash of `v` into `seed` in place (XOR with the value hash, the constant 0x9e3779b9 and shifted seed).
34 template <class T> inline void hash_combine(std::size_t& seed, T const& v)
36 seed ^= hash_tuple::hash<T>()(v) + 0x9e3779b9 + (seed << 6) + (seed >> 2);
39 // Recursive template code derived from Matthieu M.
// Combines tuple elements 0..Index into `seed`: recurses on Index - 1 first, then combines element Index.
40 template <class Tuple, size_t Index = std::tuple_size<Tuple>::value - 1> class HashValueImpl {
42 static void apply(size_t& seed, Tuple const& tuple)
44 HashValueImpl<Tuple, Index - 1>::apply(seed, tuple);
45 hash_combine(seed, std::get<Index>(tuple));
// End of the recursion: only element 0 is combined.
49 template <class Tuple> class HashValueImpl<Tuple, 0> {
51 static void apply(size_t& seed, Tuple const& tuple) { hash_combine(seed, std::get<0>(tuple)); }
// Specialization for std::tuple: the hash is built by applying HashValueImpl over the tuple's elements.
54 template <typename... TT> class hash<std::tuple<TT...>> {
56 size_t operator()(std::tuple<TT...> const& tt) const
59 HashValueImpl<std::tuple<TT...>>::apply(seed, tt);
// Logs, at verbose level only, the action (its fields joined with spaces) followed by the simulated time
// elapsed since `clock`. The string join is skipped entirely when verbose logging is disabled.
65 void log_timed_action(const simgrid::xbt::ReplayAction& action, double clock)
67 if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
68 std::string s = boost::algorithm::join(action, " ");
69 XBT_VERB("%s %f", s.c_str(), smpi_process()->simulated_elapsed() - clock);
73 /* Helper functions */
// Converts `string` to a double through xbt_str_parse_double, passing "not a double" as its error message.
74 static double parse_double(const std::string& string)
76 return xbt_str_parse_double(string.c_str(), "not a double");
// Converts `string` to the integer type T. The text is parsed as a double and truncated toward zero, so
// inputs such as "1e6" are accepted; the result is asserted to lie within the range of T before the cast.
79 template <typename T> static T parse_integer(const std::string& string)
81 double val = trunc(xbt_str_parse_double(string.c_str(), "not a double"));
82 xbt_assert(static_cast<double>(std::numeric_limits<T>::min()) <= val &&
83 val <= static_cast<double>(std::numeric_limits<T>::max()),
84 "out of range: %g", val);
85 return static_cast<T>(val);
// Returns field `i` of the action parsed with std::stoi, or 0 when the action has no field `i`.
88 static int parse_root(const simgrid::xbt::ReplayAction& action, unsigned i)
90 return i < action.size() ? std::stoi(action[i]) : 0;
// Returns the datatype decoded from field `i` of the action, or MPI_DEFAULT_TYPE when the action has no field `i`.
93 static MPI_Datatype parse_datatype(const simgrid::xbt::ReplayAction& action, unsigned i)
95 return i < action.size() ? simgrid::smpi::Datatype::decode(action[i]) : simgrid::smpi::replay::MPI_DEFAULT_TYPE;
// Datatype used by parse_datatype() when a trace line does not give one; assigned in InitAction::kernel().
102 MPI_Datatype MPI_DEFAULT_TYPE;
// Keeps the pending MPI requests of a replayed actor, indexed by a (sender, receiver, tag) tuple.
// NOTE(review): find() uses src/dst exactly as given, while add()/remove() subtract 1 from req->src()/req->dst()
// and addNullRequest() maps ranks through MPI_COMM_WORLD->group()->actor(); confirm that callers of find()
// pass keys in the same form.
104 class RequestStorage {
106 using req_key_t = std::tuple</*sender*/ int, /* receiver */ int, /* tag */ int>;
107 using req_storage_t = std::unordered_map<req_key_t, MPI_Request, hash_tuple::hash<std::tuple<int, int, int>>>;
112 RequestStorage() = default;
// Number of stored entries (entries holding MPI_REQUEST_NULL are counted too).
113 size_t size() const { return store.size(); }
// Direct mutable access to the underlying map.
115 req_storage_t& get_store() { return store; }
// Appends to `vec` every stored non-null request whose src() or dst() equals the pid of the calling actor,
// and calls print_request("MM") on each of them. `vec` may therefore end up with fewer elements than size().
117 void get_requests(std::vector<MPI_Request>& vec) const
119 for (auto const& pair : store) {
120 auto& req = pair.second;
121 aid_t my_proc_id = simgrid::s4u::this_actor::get_pid();
122 if (req != MPI_REQUEST_NULL && (req->src() == my_proc_id || req->dst() == my_proc_id)) {
123 vec.push_back(pair.second);
124 pair.second->print_request("MM");
// Returns the request stored under (src, dst, tag), or MPI_REQUEST_NULL when there is no such entry.
129 MPI_Request find(int src, int dst, int tag)
131 auto it = store.find(req_key_t(src, dst, tag));
132 return (it == store.end()) ? MPI_REQUEST_NULL : it->second;
// Erases the entry keyed by (req->src()-1, req->dst()-1, req->tag()); does nothing for MPI_REQUEST_NULL.
135 void remove(const Request* req)
137 if (req == MPI_REQUEST_NULL) return;
139 store.erase(req_key_t(req->src()-1, req->dst()-1, req->tag()));
// Inserts `req` under (req->src()-1, req->dst()-1, req->tag()); null requests are ignored.
// insert() leaves an already existing entry with the same key untouched.
142 void add(MPI_Request req)
144 if (req != MPI_REQUEST_NULL) // Can and does happen in the case of TestAction
145 store.insert({req_key_t(req->src()-1, req->dst()-1, req->tag()), req});
148 /* Sometimes we need to re-insert MPI_REQUEST_NULL but we still need src,dst and tag */
// Here src and dst are ranks: they are translated with MPI_COMM_WORLD->group()->actor() before building the key.
149 void addNullRequest(int src, int dst, int tag)
151 store.insert({req_key_t(MPI_COMM_WORLD->group()->actor(src) - 1, MPI_COMM_WORLD->group()->actor(dst) - 1, tag),
// Reads the arguments of a wait/test action: src, dst and tag come from fields 2, 3 and 4 of the trace line.
156 void WaitTestParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
158 CHECK_ACTION_PARAMS(action, 3, 0)
159 src = std::stoi(action[2]);
160 dst = std::stoi(action[3]);
161 tag = std::stoi(action[4]);
// Reads the arguments of a send/recv action: partner (field 2), tag (field 3), size (field 4) and the
// datatype (field 5, falling back to MPI_DEFAULT_TYPE when absent).
164 void SendRecvParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
166 CHECK_ACTION_PARAMS(action, 3, 1)
167 partner = std::stoi(action[2]);
168 tag = std::stoi(action[3]);
169 size = parse_integer<size_t>(action[4]);
170 datatype1 = parse_datatype(action, 5);
// Reads the single argument of a compute action: the amount of flops, in field 2.
173 void ComputeParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
175 CHECK_ACTION_PARAMS(action, 1, 0)
176 flops = parse_double(action[2]);
// Reads the single argument of a sleep action: the duration, in field 2.
179 void SleepParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
181 CHECK_ACTION_PARAMS(action, 1, 0)
182 time = parse_double(action[2]);
// Reads the arguments of a location action: a file name (field 2) and a line number (field 3).
185 void LocationParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
187 CHECK_ACTION_PARAMS(action, 2, 0)
188 filename = std::string(action[2]);
189 line = std::stoi(action[3]);
// Reads the arguments of a bcast action: size (field 2), then root (field 3, 0 when absent) and
// datatype (field 4, MPI_DEFAULT_TYPE when absent).
192 void BcastArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
194 CHECK_ACTION_PARAMS(action, 1, 2)
195 size = parse_integer<size_t>(action[2]);
196 root = parse_root(action, 3);
197 datatype1 = parse_datatype(action, 4);
// Reads the arguments of a reduce action: communication size (field 2), computation size (field 3), then
// root (field 4, 0 when absent) and datatype (field 5, MPI_DEFAULT_TYPE when absent).
200 void ReduceArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
202 CHECK_ACTION_PARAMS(action, 2, 2)
203 comm_size = parse_integer<unsigned>(action[2]);
204 comp_size = parse_double(action[3]);
205 root = parse_root(action, 4);
206 datatype1 = parse_datatype(action, 5);
// Reads the arguments of an allreduce action: communication size (field 2), computation size (field 3)
// and datatype (field 4, MPI_DEFAULT_TYPE when absent).
209 void AllReduceArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
211 CHECK_ACTION_PARAMS(action, 2, 1)
212 comm_size = parse_integer<unsigned>(action[2]);
213 comp_size = parse_double(action[3]);
214 datatype1 = parse_datatype(action, 4);
// Reads the arguments of an alltoall action: send size (field 2), recv size (field 3), then the send and
// recv datatypes (fields 4 and 5, each MPI_DEFAULT_TYPE when absent). comm_size is the size of MPI_COMM_WORLD.
217 void AllToAllArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
219 CHECK_ACTION_PARAMS(action, 2, 1)
220 comm_size = MPI_COMM_WORLD->size();
221 send_size = parse_integer<int>(action[2]);
222 recv_size = parse_integer<int>(action[3]);
223 datatype1 = parse_datatype(action, 4);
224 datatype2 = parse_datatype(action, 5);
// Reads the arguments of a gather-like action. `name` selects the layout: "gather" carries a root field
// before the two datatypes, which shifts the datatype fields by one compared to the other layout.
227 void GatherArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string& name)
229 /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
232 1) 68 is the sendcounts
233 2) 68 is the recvcounts
234 3) 0 is the root node
235 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
236 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
238 CHECK_ACTION_PARAMS(action, 2, 3)
239 comm_size = MPI_COMM_WORLD->size();
240 send_size = parse_integer<int>(action[2]);
241 recv_size = parse_integer<int>(action[3]);
// "gather": root in field 4, datatypes in fields 5 and 6.
243 if (name == "gather") {
244 root = parse_root(action, 4);
245 datatype1 = parse_datatype(action, 5);
246 datatype2 = parse_datatype(action, 6);
// No root field: datatypes in fields 4 and 5 (presumably the path taken for "allgather").
249 datatype1 = parse_datatype(action, 4);
250 datatype2 = parse_datatype(action, 5);
// Reads the arguments of a gatherv-like action: the send count (field 2), one recv count per process
// (fields 3 .. 3+comm_size-1), then layout-dependent trailing fields selected by `name` and by the number
// of fields present. Also computes recv_size_sum, the sum of all recv counts.
254 void GatherVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string& name)
// NOTE(review): the example line below is tagged `gather` although it describes the gatherv layout.
256 /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
257 0 gather 68 68 10 10 10 0 0 0
259 1) 68 is the sendcount
260 2) 68 10 10 10 is the recvcounts
261 3) 0 is the root node
262 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
263 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
265 comm_size = MPI_COMM_WORLD->size();
266 CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
267 send_size = parse_integer<int>(action[2]);
// Displacements default to 0 for every process; recvcounts is pre-sized and filled by index further down.
268 disps = std::vector<int>(comm_size, 0);
269 recvcounts = std::make_shared<std::vector<int>>(comm_size);
// "gatherv": root and both datatypes follow the recvcounts array.
271 if (name == "gatherv") {
272 root = parse_root(action, 3 + comm_size);
273 datatype1 = parse_datatype(action, 4 + comm_size);
274 datatype2 = parse_datatype(action, 5 + comm_size);
// Layout without a root field (presumably the non-"gatherv" path): datatypes and/or displacements may follow
// the recvcounts, and which of them are present is deduced from the total number of fields.
// disp_index == 0 means "no displacements given".
277 unsigned disp_index = 0;
278 /* The 3 comes from "0 gather <sendcount>", which must always be present.
279 * The + comm_size is the recvcounts array, which must also be present
281 if (action.size() > 3 + comm_size + comm_size) {
282 // datatype + disp are specified
283 datatype1 = parse_datatype(action, 3 + comm_size);
284 datatype2 = parse_datatype(action, 4 + comm_size);
285 disp_index = 5 + comm_size;
286 } else if (action.size() > 3 + comm_size + 2) {
287 // disps specified; datatype is not specified; use the default one
288 datatype1 = MPI_DEFAULT_TYPE;
289 datatype2 = MPI_DEFAULT_TYPE;
290 disp_index = 3 + comm_size;
292 // no disp specified, maybe only datatype,
293 datatype1 = parse_datatype(action, 3 + comm_size);
294 datatype2 = parse_datatype(action, 4 + comm_size);
// Read the comm_size displacements, after checking that the line is long enough to hold them.
297 if (disp_index != 0) {
298 xbt_assert(disp_index + comm_size <= action.size());
299 for (unsigned i = 0; i < comm_size; i++)
300 disps[i] = std::stoi(action[disp_index + i]);
// The recv counts start at field 3 in both layouts.
304 for (unsigned int i = 0; i < comm_size; i++) {
305 (*recvcounts)[i] = std::stoi(action[i + 3]);
307 recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
// Reads the arguments of a scatter action: send size (field 2), recv size (field 3), then root (field 4,
// 0 when absent) and the send/recv datatypes (fields 5 and 6, MPI_DEFAULT_TYPE when absent).
310 void ScatterArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
312 /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
315 1) 68 is the sendcounts
316 2) 68 is the recvcounts
317 3) 0 is the root node
318 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
319 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
321 CHECK_ACTION_PARAMS(action, 2, 3)
322 comm_size = MPI_COMM_WORLD->size();
323 send_size = parse_integer<int>(action[2]);
324 recv_size = parse_integer<int>(action[3]);
325 root = parse_root(action, 4);
326 datatype1 = parse_datatype(action, 5);
327 datatype2 = parse_datatype(action, 6);
330 void ScatterVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
332 /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
333 0 gather 68 10 10 10 68 0 0 0
335 1) 68 10 10 10 is the sendcounts
336 2) 68 is the recvcount
337 3) 0 is the root node
338 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
339 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
341 CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
342 recv_size = parse_integer<int>(action[2 + comm_size]);
343 disps = std::vector<int>(comm_size, 0);
344 sendcounts = std::make_shared<std::vector<int>>(comm_size);
346 root = parse_root(action, 3 + comm_size);
347 datatype1 = parse_datatype(action, 4 + comm_size);
348 datatype2 = parse_datatype(action, 5 + comm_size);
350 for (unsigned int i = 0; i < comm_size; i++) {
351 (*sendcounts)[i] = std::stoi(action[i + 2]);
353 send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
356 void ReduceScatterArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
358 /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
359 0 reducescatter 275427 275427 275427 204020 11346849 0
361 1) The first four values after the name of the action declare the recvcounts array
362 2) The value 11346849 is the amount of instructions
363 3) The last value corresponds to the datatype, see simgrid::smpi::Datatype::decode().
365 comm_size = MPI_COMM_WORLD->size();
366 CHECK_ACTION_PARAMS(action, comm_size + 1, 1)
367 comp_size = parse_double(action[2 + comm_size]);
368 recvcounts = std::make_shared<std::vector<int>>(comm_size);
369 datatype1 = parse_datatype(action, 3 + comm_size);
371 for (unsigned int i = 0; i < comm_size; i++) {
372 recvcounts->push_back(std::stoi(action[i + 2]));
374 recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
// Reads the arguments of an alltoallv action: send buffer size (field 2), comm_size send counts (from field 3),
// recv buffer size (field 3+comm_size), comm_size recv counts (from field 4+comm_size), then the optional
// send/recv datatypes. All displacements are set to 0; the sums of both count arrays are stored as well.
377 void AllToAllVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
379 /* The structure of the alltoallv action for the rank 0 (total 4 processes) is the following:
380 0 alltoallv 100 1 7 10 12 100 1 70 10 5
382 1) 100 is the size of the send buffer *sizeof(int),
383 2) 1 7 10 12 is the sendcounts array
384 3) 100*sizeof(int) is the size of the receiver buffer
385 4) 1 70 10 5 is the recvcounts array
387 comm_size = MPI_COMM_WORLD->size();
388 CHECK_ACTION_PARAMS(action, 2 * comm_size + 2, 2)
389 sendcounts = std::make_shared<std::vector<int>>(comm_size);
390 recvcounts = std::make_shared<std::vector<int>>(comm_size);
391 senddisps = std::vector<int>(comm_size, 0);
392 recvdisps = std::vector<int>(comm_size, 0);
// Datatypes come after both count arrays; MPI_DEFAULT_TYPE is used when they are absent.
394 datatype1 = parse_datatype(action, 4 + 2 * comm_size);
395 datatype2 = parse_datatype(action, 5 + 2 * comm_size);
397 send_buf_size = parse_integer<int>(action[2]);
398 recv_buf_size = parse_integer<int>(action[3 + comm_size]);
399 for (unsigned int i = 0; i < comm_size; i++) {
400 (*sendcounts)[i] = std::stoi(action[3 + i]);
401 (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
403 send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
404 recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
// Replays a wait action: looks up the request matching (src, dst, tag), takes it out of the storage, waits
// for it, and traces a receive event when the request was a receive.
407 void WaitAction::kernel(simgrid::xbt::ReplayAction& action)
// The joined action text is only used for the assertion message.
409 std::string s = boost::algorithm::join(action, " ");
410 xbt_assert(req_storage.size(), "action wait not preceded by any irecv or isend: %s", s.c_str());
411 const WaitTestParser& args = get_args();
412 MPI_Request request = req_storage.find(args.src, args.dst, args.tag);
413 req_storage.remove(request);
415 if (request == MPI_REQUEST_NULL) {
416 /* Assume that the trace is well formed, meaning the comm might have been caught by an MPI_test. Then just
// rank is -1 when the request has no communicator.
421 aid_t rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
423 // Must be taken before Request::wait() since the request may be set to
424 // MPI_REQUEST_NULL by Request::wait!
425 bool is_wait_for_receive = (request->flags() & MPI_REQ_RECV);
426 // TODO: Here we take the rank while we normally take the process id (look for get_pid())
427 TRACE_smpi_comm_in(rank, __func__, new simgrid::instr::WaitTIData(args.src, args.dst, args.tag));
430 Request::wait(&request, &status);
432 TRACE_smpi_comm_out(rank);
433 if (is_wait_for_receive)
434 TRACE_smpi_recv(args.src, args.dst, args.tag);
// Replays a send or isend action towards args.partner on MPI_COMM_WORLD. The request of an isend is kept
// in req_storage; any other action name aborts with xbt_die.
437 void SendAction::kernel(simgrid::xbt::ReplayAction&)
439 const SendRecvParser& args = get_args();
// Actor id of the destination rank, used for the trace event below.
440 aid_t dst_traced = MPI_COMM_WORLD->group()->actor(args.partner);
444 new simgrid::instr::Pt2PtTIData(get_name(), args.partner, args.size, args.tag, Datatype::encode(args.datatype1)));
// The send event is traced here only when TRACE_smpi_view_internals() is off; the traced size is in bytes.
445 if (not TRACE_smpi_view_internals())
446 TRACE_smpi_send(get_pid(), get_pid(), dst_traced, args.tag, args.size * args.datatype1->size());
// No payload is passed (nullptr buffer): only the size/datatype matter for the replay.
448 if (get_name() == "send") {
449 Request::send(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
450 } else if (get_name() == "isend") {
451 MPI_Request request = Request::isend(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
452 req_storage.add(request);
454 xbt_die("Don't know this action, %s", get_name().c_str());
457 TRACE_smpi_comm_out(get_pid());
// Replays a recv or irecv action from args.partner on MPI_COMM_WORLD. The request of an irecv is kept in
// req_storage; a receive event is traced afterwards when is_recv is set.
460 void RecvAction::kernel(simgrid::xbt::ReplayAction&)
462 const SendRecvParser& args = get_args();
465 new simgrid::instr::Pt2PtTIData(get_name(), args.partner, args.size, args.tag, Datatype::encode(args.datatype1)));
468 // unknown size from the receiver point of view
469 size_t arg_size = args.size;
// The size can be replaced by the count reported by a probe of the incoming message.
471 Request::probe(args.partner, args.tag, MPI_COMM_WORLD, &status);
472 arg_size = status.count;
475 bool is_recv = false; // Help analyzers understanding that status is not used uninitialized
476 if (get_name() == "recv") {
478 Request::recv(nullptr, arg_size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD, &status);
479 } else if (get_name() == "irecv") {
480 MPI_Request request = Request::irecv(nullptr, arg_size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
481 req_storage.add(request);
486 TRACE_smpi_comm_out(get_pid());
// The traced sender is the actor id of the source rank found in the receive status.
487 if (is_recv && not TRACE_smpi_view_internals()) {
488 aid_t src_traced = MPI_COMM_WORLD->group()->actor(status.MPI_SOURCE);
489 TRACE_smpi_recv(src_traced, get_pid(), args.tag);
// Replays a compute action: executes args.flops divided by smpi_adjust_comp_speed(), but only when
// smpi_cfg_simulate_computation() is true.
493 void ComputeAction::kernel(simgrid::xbt::ReplayAction&)
495 const ComputeParser& args = get_args();
496 if (smpi_cfg_simulate_computation()) {
497 smpi_execute_flops(args.flops/smpi_adjust_comp_speed());
// Replays a sleep action: the actor sleeps for args.time divided by smpi_adjust_comp_speed(), with the
// sleep bracketed by TRACE_smpi_sleeping_in/out. The traced duration is the unscaled args.time.
501 void SleepAction::kernel(simgrid::xbt::ReplayAction&)
503 const SleepParser& args = get_args();
504 XBT_DEBUG("Sleep for: %lf secs", args.time);
505 aid_t pid = simgrid::s4u::this_actor::get_pid();
506 TRACE_smpi_sleeping_in(pid, args.time);
507 simgrid::s4u::this_actor::sleep_for(args.time/smpi_adjust_comp_speed());
508 TRACE_smpi_sleeping_out(pid);
// Replays a location action: forwards the parsed file name and line to smpi_trace_set_call_location().
511 void LocationAction::kernel(simgrid::xbt::ReplayAction&)
513 const LocationParser& args = get_args();
514 smpi_trace_set_call_location(args.filename.c_str(), args.line);
// Replays a test action: the request matching (src, dst, tag) is taken out of the storage, tested, and put
// back afterwards (as a null entry when the test completed it) so that a later wait can still find it.
517 void TestAction::kernel(simgrid::xbt::ReplayAction&)
519 const WaitTestParser& args = get_args();
520 MPI_Request request = req_storage.find(args.src, args.dst, args.tag);
521 req_storage.remove(request);
522 // if request is null here, this may mean that a previous test has succeeded
523 // Different times in traced application and replayed version may lead to this
524 // In this case, ignore the extra calls.
525 if (request != MPI_REQUEST_NULL) {
526 TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::NoOpTIData("test"));
530 Request::test(&request, &status, &flag);
532 XBT_DEBUG("MPI_Test result: %d", flag);
533 /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now
// A null `request` here means Request::test() reset it: keep a placeholder entry under the same key.
535 if (request == MPI_REQUEST_NULL)
536 req_storage.addNullRequest(args.src, args.dst, args.tag);
538 req_storage.add(request);
540 TRACE_smpi_comm_out(get_pid());
// Replays the init action: selects MPI_DEFAULT_TYPE (MPI_DOUBLE when the line has an extra field, MPI_BYTE
// otherwise) and starts the simulated timer of the process.
544 void InitAction::kernel(simgrid::xbt::ReplayAction& action)
546 CHECK_ACTION_PARAMS(action, 0, 1)
547 MPI_DEFAULT_TYPE = (action.size() > 2) ? MPI_DOUBLE // default MPE datatype
548 : MPI_BYTE; // default TAU datatype
550 /* start a simulated timer */
551 smpi_process()->simulated_start();
// Kernel of the communicator-related actions; registered for "comm_size", "comm_split" and "comm_dup".
554 void CommunicatorAction::kernel(simgrid::xbt::ReplayAction&)
559 void WaitAllAction::kernel(simgrid::xbt::ReplayAction&)
561 const size_t count_requests = req_storage.size();
563 if (count_requests > 0) {
564 TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::Pt2PtTIData("waitall", -1, count_requests, ""));
565 std::vector<std::pair</*sender*/ aid_t, /*recv*/ aid_t>> sender_receiver;
566 std::vector<MPI_Request> reqs;
567 req_storage.get_requests(reqs);
568 for (auto const& req : reqs) {
569 if (req && (req->flags() & MPI_REQ_RECV)) {
570 sender_receiver.emplace_back(req->src(), req->dst());
573 Request::waitall(count_requests, &(reqs.data())[0], MPI_STATUSES_IGNORE);
574 req_storage.get_store().clear();
576 for (auto const& pair : sender_receiver) {
577 TRACE_smpi_recv(pair.first, pair.second, 0);
579 TRACE_smpi_comm_out(get_pid());
// Replays a barrier action: a barrier on MPI_COMM_WORLD, bracketed by trace in/out events.
583 void BarrierAction::kernel(simgrid::xbt::ReplayAction&)
585 TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::NoOpTIData("barrier"));
586 colls::barrier(MPI_COMM_WORLD);
587 TRACE_smpi_comm_out(get_pid());
// Replays a bcast action on MPI_COMM_WORLD. The trace event records the actor id of the root rank; the
// buffer handed to the collective is sized as element count times datatype size.
590 void BcastAction::kernel(simgrid::xbt::ReplayAction&)
592 const BcastArgParser& args = get_args();
593 TRACE_smpi_comm_in(get_pid(), "action_bcast",
594 new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(args.root), -1.0, args.size,
595 0, Datatype::encode(args.datatype1), ""));
597 colls::bcast(send_buffer(args.size * args.datatype1->size()), args.size, args.datatype1, args.root, MPI_COMM_WORLD);
599 TRACE_smpi_comm_out(get_pid());
// Replays a reduce action on MPI_COMM_WORLD: the collective is run with MPI_OP_NULL, then the computation
// cost of the reduction (args.comp_size flops) is executed separately.
602 void ReduceAction::kernel(simgrid::xbt::ReplayAction&)
604 const ReduceArgParser& args = get_args();
605 TRACE_smpi_comm_in(get_pid(), "action_reduce",
606 new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(args.root), args.comp_size,
607 args.comm_size, 0, Datatype::encode(args.datatype1), ""));
609 colls::reduce(send_buffer(args.comm_size * args.datatype1->size()),
610 recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL,
611 args.root, MPI_COMM_WORLD);
612 private_execute_flops(args.comp_size);
614 TRACE_smpi_comm_out(get_pid());
// Replays an allreduce action: the collective is run with MPI_OP_NULL, then the computation cost
// (args.comp_size flops) is executed separately.
617 void AllReduceAction::kernel(simgrid::xbt::ReplayAction&)
619 const AllReduceArgParser& args = get_args();
620 TRACE_smpi_comm_in(get_pid(), "action_allreduce",
621 new simgrid::instr::CollTIData("allreduce", -1, args.comp_size, args.comm_size, 0,
622 Datatype::encode(args.datatype1), ""));
624 colls::allreduce(send_buffer(args.comm_size * args.datatype1->size()),
625 recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL,
627 private_execute_flops(args.comp_size);
629 TRACE_smpi_comm_out(get_pid());
// Replays an alltoall action. Both buffers are sized for comm_size blocks of send_size (resp. recv_size)
// elements of the corresponding datatype.
632 void AllToAllAction::kernel(simgrid::xbt::ReplayAction&)
634 const AllToAllArgParser& args = get_args();
635 TRACE_smpi_comm_in(get_pid(), "action_alltoall",
636 new simgrid::instr::CollTIData("alltoall", -1, -1.0, args.send_size, args.recv_size,
637 Datatype::encode(args.datatype1),
638 Datatype::encode(args.datatype2)));
640 colls::alltoall(send_buffer(args.datatype1->size() * args.send_size * args.comm_size), args.send_size, args.datatype1,
641 recv_buffer(args.datatype2->size() * args.recv_size * args.comm_size), args.recv_size, args.datatype2,
644 TRACE_smpi_comm_out(get_pid());
// Replays a gather or allgather action, selected by get_name(). The trace event carries the root only for
// "gather" (-1 otherwise).
647 void GatherAction::kernel(simgrid::xbt::ReplayAction&)
649 const GatherArgParser& args = get_args();
650 TRACE_smpi_comm_in(get_pid(), get_name().c_str(),
651 new simgrid::instr::CollTIData(get_name(), (get_name() == "gather") ? args.root : -1, -1.0,
652 args.send_size, args.recv_size, Datatype::encode(args.datatype1),
653 Datatype::encode(args.datatype2)));
// gather: only the root gets a receive buffer, sized for comm_size blocks; other ranks pass nullptr.
655 if (get_name() == "gather") {
656 int rank = MPI_COMM_WORLD->rank();
657 colls::gather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
658 (rank == args.root) ? recv_buffer(args.datatype2->size() * args.recv_size * args.comm_size) : nullptr,
659 args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
// NOTE(review): unlike the gather branch, the allgather receive buffer is sized for a single block
// (recv_size elements), not comm_size blocks; confirm this is intended.
661 colls::allgather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
662 recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2,
665 TRACE_smpi_comm_out(get_pid());
// Replays a gatherv or allgatherv action, selected by get_name(). Receive buffers are sized from
// recv_size_sum; for gatherv only the root gets one.
668 void GatherVAction::kernel(simgrid::xbt::ReplayAction&)
670 int rank = MPI_COMM_WORLD->rank();
671 const GatherVArgParser& args = get_args();
672 TRACE_smpi_comm_in(get_pid(), get_name().c_str(),
673 new simgrid::instr::VarCollTIData(
674 get_name(), (get_name() == "gatherv") ? args.root : -1, args.send_size, nullptr, 0,
675 args.recvcounts, Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
677 if (get_name() == "gatherv") {
678 colls::gatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
679 (rank == args.root) ? recv_buffer(args.recv_size_sum * args.datatype2->size()) : nullptr,
680 args.recvcounts->data(), args.disps.data(), args.datatype2, args.root, MPI_COMM_WORLD);
682 colls::allgatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
683 recv_buffer(args.recv_size_sum * args.datatype2->size()), args.recvcounts->data(),
684 args.disps.data(), args.datatype2, MPI_COMM_WORLD);
687 TRACE_smpi_comm_out(get_pid());
// Replays a scatter action on MPI_COMM_WORLD with the parsed sizes, datatypes and root.
690 void ScatterAction::kernel(simgrid::xbt::ReplayAction&)
692 int rank = MPI_COMM_WORLD->rank();
693 const ScatterArgParser& args = get_args();
694 TRACE_smpi_comm_in(get_pid(), "action_scatter",
695 new simgrid::instr::CollTIData(get_name(), args.root, -1.0, args.send_size, args.recv_size,
696 Datatype::encode(args.datatype1),
697 Datatype::encode(args.datatype2)));
// NOTE(review): here every rank gets a send buffer and only the root gets a receive buffer, the opposite
// of ScatterVAction::kernel (root-only send buffer, receive buffer on every rank); confirm this is intended.
699 colls::scatter(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
700 (rank == args.root) ? recv_buffer(args.recv_size * args.datatype2->size()) : nullptr, args.recv_size,
701 args.datatype2, args.root, MPI_COMM_WORLD);
703 TRACE_smpi_comm_out(get_pid());
// Replays a scatterv action: only the root gets a send buffer (sized from send_size_sum), every rank gets
// a receive buffer for recv_size elements.
706 void ScatterVAction::kernel(simgrid::xbt::ReplayAction&)
708 int rank = MPI_COMM_WORLD->rank();
709 const ScatterVArgParser& args = get_args();
710 TRACE_smpi_comm_in(get_pid(), "action_scatterv",
711 new simgrid::instr::VarCollTIData(get_name(), args.root, 0, args.sendcounts, args.recv_size,
712 nullptr, Datatype::encode(args.datatype1),
713 Datatype::encode(args.datatype2)));
715 colls::scatterv((rank == args.root) ? send_buffer(args.send_size_sum * args.datatype1->size()) : nullptr,
716 args.sendcounts->data(), args.disps.data(), args.datatype1,
717 recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, args.root,
720 TRACE_smpi_comm_out(get_pid());
// Replays a reducescatter action: the collective is run with MPI_OP_NULL and the per-rank counts from
// args.recvcounts, then the computation cost (args.comp_size flops) is executed separately.
723 void ReduceScatterAction::kernel(simgrid::xbt::ReplayAction&)
725 const ReduceScatterArgParser& args = get_args();
727 get_pid(), "action_reducescatter",
728 new simgrid::instr::VarCollTIData("reducescatter", -1, 0, nullptr, 0, args.recvcounts,
729 std::to_string(args.comp_size), /* ugly hack to print comp_size */
730 Datatype::encode(args.datatype1)));
// Both buffers are sized for the sum of all recvcounts.
732 colls::reduce_scatter(send_buffer(args.recv_size_sum * args.datatype1->size()),
733 recv_buffer(args.recv_size_sum * args.datatype1->size()), args.recvcounts->data(),
734 args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
736 private_execute_flops(args.comp_size);
737 TRACE_smpi_comm_out(get_pid());
// Replays an alltoallv action. Buffers are sized from the send/recv buffer sizes given in the trace,
// while the trace event records the sums of the send and recv counts.
740 void AllToAllVAction::kernel(simgrid::xbt::ReplayAction&)
742 const AllToAllVArgParser& args = get_args();
743 TRACE_smpi_comm_in(get_pid(), __func__,
744 new simgrid::instr::VarCollTIData(
745 "alltoallv", 0, args.send_size_sum, args.sendcounts, args.recv_size_sum, args.recvcounts,
746 Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
748 colls::alltoallv(send_buffer(args.send_buf_size * args.datatype1->size()), args.sendcounts->data(),
749 args.senddisps.data(), args.datatype1, recv_buffer(args.recv_buf_size * args.datatype2->size()),
750 args.recvcounts->data(), args.recvdisps.data(), args.datatype2, MPI_COMM_WORLD);
752 TRACE_smpi_comm_out(get_pid());
754 } // Replay Namespace
755 }} // namespace simgrid::smpi
// One RequestStorage per replayed actor, indexed by actor pid; used by the point-to-point, test and wait
// actions registered in smpi_replay_init().
757 static std::unordered_map<aid_t, simgrid::smpi::replay::RequestStorage> storage;
758 /** @brief Only initialize the replay, don't do it for real */
// Tags the calling actor with its instance id and rank, initializes its SMPI process in replay mode,
// registers one handler per trace action name, then optionally delays the start by `start_delay_flops`.
759 void smpi_replay_init(const char* instance_id, int rank, double start_delay_flops)
761 xbt_assert(not smpi_process()->initializing());
763 simgrid::s4u::Actor::self()->set_property("instance_id", instance_id);
764 simgrid::s4u::Actor::self()->set_property("rank", std::to_string(rank));
765 simgrid::smpi::ActorExt::init();
767 smpi_process()->mark_as_initialized();
768 smpi_process()->set_replaying(true);
770 TRACE_smpi_init(simgrid::s4u::this_actor::get_pid(), "smpi_replay_run_init");
// Each handler builds a fresh action object per trace line. Actions dealing with requests receive the
// RequestStorage of the calling actor (storage[] creates the entry on first access).
771 xbt_replay_action_register("init", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::InitAction().execute(action); });
772 xbt_replay_action_register("finalize", [](simgrid::xbt::ReplayAction const&) { /* nothing to do */ });
773 xbt_replay_action_register("comm_size", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
774 xbt_replay_action_register("comm_split",[](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
775 xbt_replay_action_register("comm_dup", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
776 xbt_replay_action_register("send", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("send", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
777 xbt_replay_action_register("isend", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("isend", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
778 xbt_replay_action_register("recv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("recv", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
779 xbt_replay_action_register("irecv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("irecv", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
780 xbt_replay_action_register("test", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::TestAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
781 xbt_replay_action_register("wait", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
782 xbt_replay_action_register("waitall", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAllAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
783 xbt_replay_action_register("barrier", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BarrierAction().execute(action); });
784 xbt_replay_action_register("bcast", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BcastAction().execute(action); });
785 xbt_replay_action_register("reduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceAction().execute(action); });
786 xbt_replay_action_register("allreduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllReduceAction().execute(action); });
787 xbt_replay_action_register("alltoall", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllAction().execute(action); });
788 xbt_replay_action_register("alltoallv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllVAction().execute(action); });
// gather/allgather and gatherv/allgatherv share one action class each; the name selects the variant.
789 xbt_replay_action_register("gather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("gather").execute(action); });
790 xbt_replay_action_register("scatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterAction().execute(action); });
791 xbt_replay_action_register("gatherv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("gatherv").execute(action); });
792 xbt_replay_action_register("scatterv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterVAction().execute(action); });
793 xbt_replay_action_register("allgather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("allgather").execute(action); });
794 xbt_replay_action_register("allgatherv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("allgatherv").execute(action); });
795 xbt_replay_action_register("reducescatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceScatterAction().execute(action); });
796 xbt_replay_action_register("compute", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ComputeAction().execute(action); });
797 xbt_replay_action_register("sleep", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SleepAction().execute(action); });
798 xbt_replay_action_register("location", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::LocationAction().execute(action); });
800 //if we have a delayed start, sleep here.
// The delay is expressed in flops and simulated by executing that amount of computation.
801 if (start_delay_flops > 0) {
802 XBT_VERB("Delayed start for instance - Sleeping for %f flops ", start_delay_flops);
803 private_execute_flops(start_delay_flops);
805 // Wait for the other actors to initialize also
806 simgrid::s4u::this_actor::yield();
810 /** @brief actually run the replay after initialization */
// Runs the trace of the calling actor (identified by `rank`, read from `private_trace_filename`), waits for
// the requests still pending afterwards, then finalizes the SMPI process.
811 void smpi_replay_main(int rank, const char* private_trace_filename)
// Counter shared by all callers of this function (function-local static).
813 static int active_processes = 0;
// Start from an empty request storage for this actor.
815 storage[simgrid::s4u::this_actor::get_pid()] = simgrid::smpi::replay::RequestStorage();
816 std::string rank_string = std::to_string(rank);
817 simgrid::xbt::replay_runner(rank_string.c_str(), private_trace_filename);
819 /* and now, finalize everything */
820 /* One active process will stop. Decrease the counter*/
// Requests left in the storage once the trace is exhausted are all waited for before finalizing.
821 unsigned int count_requests = storage[simgrid::s4u::this_actor::get_pid()].size();
822 XBT_DEBUG("There are %u elements in reqq[*]", count_requests);
823 if (count_requests > 0) {
824 std::vector<MPI_Request> requests(count_requests);
827 for (auto const& pair : storage[simgrid::s4u::this_actor::get_pid()].get_store()) {
828 requests[i] = pair.second;
831 simgrid::smpi::Request::waitall(count_requests, requests.data(), MPI_STATUSES_IGNORE);
// Optional barrier, controlled by the "smpi/finalization-barrier" configuration flag.
834 if(simgrid::config::get_value<bool>("smpi/finalization-barrier"))
835 simgrid::smpi::colls::barrier(MPI_COMM_WORLD);
// When the counter is 0: report the simulated time and release the temporary replay buffers.
839 if(active_processes==0){
840 /* Last process alive speaking: end the simulated timer */
841 XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
842 smpi_free_replay_tmp_buffers();
845 TRACE_smpi_comm_in(simgrid::s4u::this_actor::get_pid(), "smpi_replay_run_finalize",
846 new simgrid::instr::NoOpTIData("finalize"));
848 smpi_process()->finalize();
850 TRACE_smpi_comm_out(simgrid::s4u::this_actor::get_pid());
853 /** @brief chain a replay initialization and a replay start */
// Convenience entry point: calls smpi_replay_init() then smpi_replay_main() with the same rank.
854 void smpi_replay_run(const char* instance_id, int rank, double start_delay_flops, const char* private_trace_filename)
856 smpi_replay_init(instance_id, rank, start_delay_flops);
857 smpi_replay_main(rank, private_trace_filename);