1 /* Copyright (c) 2009-2021. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
6 #include "smpi_coll.hpp"
7 #include "smpi_comm.hpp"
8 #include "smpi_datatype.hpp"
9 #include "smpi_group.hpp"
10 #include "smpi_request.hpp"
11 #include "xbt/replay.hpp"
12 #include <simgrid/smpi/smpi_replay.hpp>
13 #include <src/smpi/include/private.hpp>
20 #include <unordered_map>
23 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay, smpi, "Trace Replay with SMPI");
// Adapted from https://stackoverflow.com/questions/7110301/generic-hash-for-tuples-in-unordered-map-unordered-set
// The standard library ships no std::hash specialization for std::tuple; the helpers below provide one so that
// std::unordered_map can be keyed by a tuple. Move this into a header if another file ever needs it.
namespace hash_tuple {
/** Fallback: any non-tuple type is hashed with std::hash. */
template <typename Value> struct hash {
  size_t operator()(Value const& value) const { return std::hash<Value>()(value); }
};

/** Mix the hash of @p value into @p seed (the classic boost::hash_combine recipe). */
template <class Value> inline void hash_combine(std::size_t& seed, Value const& value)
{
  const size_t mixed = hash_tuple::hash<Value>()(value) + 0x9e3779b9 + (seed << 6) + (seed >> 2);
  seed ^= mixed;
}

// Recursive template code derived from Matthieu M.: element Pos is folded in after elements 0 .. Pos-1,
// so the tuple is combined front to back.
template <class Tuple, size_t Pos = std::tuple_size<Tuple>::value - 1> struct HashValueImpl {
  static void apply(size_t& seed, Tuple const& tuple)
  {
    HashValueImpl<Tuple, Pos - 1>::apply(seed, tuple);
    hash_combine(seed, std::get<Pos>(tuple));
  }
};

/** Recursion end: the first element starts the chain. */
template <class Tuple> struct HashValueImpl<Tuple, 0> {
  static void apply(size_t& seed, Tuple const& tuple) { hash_combine(seed, std::get<0>(tuple)); }
};

/** Tuple case: combine the hashes of all elements, starting from a zero seed. */
template <typename... Elems> struct hash<std::tuple<Elems...>> {
  size_t operator()(std::tuple<Elems...> const& tuple) const
  {
    size_t seed = 0;
    HashValueImpl<std::tuple<Elems...>>::apply(seed, tuple);
    return seed;
  }
};
} // namespace hash_tuple
65 void log_timed_action(const simgrid::xbt::ReplayAction& action, double clock)
67 if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
68 std::string s = boost::algorithm::join(action, " ");
69 XBT_VERB("%s %f", s.c_str(), smpi_process()->simulated_elapsed() - clock);
73 /* Helper functions */
74 static double parse_double(const std::string& string)
76 return xbt_str_parse_double(string.c_str(), "not a double");
79 template <typename T> static T parse_integer(const std::string& string)
81 double val = trunc(xbt_str_parse_double(string.c_str(), "not a double"));
82 xbt_assert(static_cast<double>(std::numeric_limits<T>::min()) <= val &&
83 val <= static_cast<double>(std::numeric_limits<T>::max()),
84 "out of range: %g", val);
85 return static_cast<T>(val);
88 static int parse_root(const simgrid::xbt::ReplayAction& action, unsigned i)
90 return i < action.size() ? std::stoi(action[i]) : 0;
93 static MPI_Datatype parse_datatype(const simgrid::xbt::ReplayAction& action, unsigned i)
95 return i < action.size() ? simgrid::smpi::Datatype::decode(action[i]) : simgrid::smpi::replay::MPI_DEFAULT_TYPE;
102 MPI_Datatype MPI_DEFAULT_TYPE;
104 class RequestStorage {
106 using req_key_t = std::tuple</*sender*/ int, /* receiver */ int, /* tag */ int>;
107 using req_storage_t = std::unordered_map<req_key_t, MPI_Request, hash_tuple::hash<std::tuple<int, int, int>>>;
112 RequestStorage() = default;
113 size_t size() const { return store.size(); }
115 req_storage_t& get_store() { return store; }
117 void get_requests(std::vector<MPI_Request>& vec) const
119 for (auto const& pair : store) {
120 auto& req = pair.second;
121 aid_t my_proc_id = simgrid::s4u::this_actor::get_pid();
122 if (req != MPI_REQUEST_NULL && (req->src() == my_proc_id || req->dst() == my_proc_id)) {
123 vec.push_back(pair.second);
124 pair.second->print_request("MM");
129 MPI_Request find(int src, int dst, int tag)
131 auto it = store.find(req_key_t(src, dst, tag));
132 return (it == store.end()) ? MPI_REQUEST_NULL : it->second;
135 void remove(const Request* req)
137 if (req == MPI_REQUEST_NULL) return;
139 store.erase(req_key_t(req->src()-1, req->dst()-1, req->tag()));
142 void add(MPI_Request req)
144 if (req != MPI_REQUEST_NULL) // Can and does happen in the case of TestAction
145 store.insert({req_key_t(req->src()-1, req->dst()-1, req->tag()), req});
148 /* Sometimes we need to re-insert MPI_REQUEST_NULL but we still need src,dst and tag */
149 void addNullRequest(int src, int dst, int tag)
151 store.insert({req_key_t(MPI_COMM_WORLD->group()->actor(src) - 1, MPI_COMM_WORLD->group()->actor(dst) - 1, tag),
156 void WaitTestParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
158 CHECK_ACTION_PARAMS(action, 3, 0)
159 src = std::stoi(action[2]);
160 dst = std::stoi(action[3]);
161 tag = std::stoi(action[4]);
164 void SendRecvParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
166 CHECK_ACTION_PARAMS(action, 3, 1)
167 partner = std::stoi(action[2]);
168 tag = std::stoi(action[3]);
169 size = parse_integer<size_t>(action[4]);
170 datatype1 = parse_datatype(action, 5);
173 void ComputeParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
175 CHECK_ACTION_PARAMS(action, 1, 0)
176 flops = parse_double(action[2]);
179 void SleepParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
181 CHECK_ACTION_PARAMS(action, 1, 0)
182 time = parse_double(action[2]);
185 void LocationParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
187 CHECK_ACTION_PARAMS(action, 2, 0)
188 filename = std::string(action[2]);
189 line = std::stoi(action[3]);
192 void BcastArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
194 CHECK_ACTION_PARAMS(action, 1, 2)
195 size = parse_integer<size_t>(action[2]);
196 root = parse_root(action, 3);
197 datatype1 = parse_datatype(action, 4);
200 void ReduceArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
202 CHECK_ACTION_PARAMS(action, 2, 2)
203 comm_size = parse_integer<unsigned>(action[2]);
204 comp_size = parse_double(action[3]);
205 root = parse_root(action, 4);
206 datatype1 = parse_datatype(action, 5);
209 void AllReduceArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
211 CHECK_ACTION_PARAMS(action, 2, 1)
212 comm_size = parse_integer<unsigned>(action[2]);
213 comp_size = parse_double(action[3]);
214 datatype1 = parse_datatype(action, 4);
217 void AllToAllArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
219 CHECK_ACTION_PARAMS(action, 2, 1)
220 comm_size = MPI_COMM_WORLD->size();
221 send_size = parse_integer<int>(action[2]);
222 recv_size = parse_integer<int>(action[3]);
223 datatype1 = parse_datatype(action, 4);
224 datatype2 = parse_datatype(action, 5);
227 void GatherArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string& name)
229 /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
232 1) 68 is the sendcounts
233 2) 68 is the recvcounts
234 3) 0 is the root node
235 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
236 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
238 CHECK_ACTION_PARAMS(action, 2, 3)
239 comm_size = MPI_COMM_WORLD->size();
240 send_size = parse_integer<int>(action[2]);
241 recv_size = parse_integer<int>(action[3]);
243 if (name == "gather") {
244 root = parse_root(action, 4);
245 datatype1 = parse_datatype(action, 5);
246 datatype2 = parse_datatype(action, 6);
249 datatype1 = parse_datatype(action, 4);
250 datatype2 = parse_datatype(action, 5);
254 void GatherVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string& name)
256 /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
257 0 gather 68 68 10 10 10 0 0 0
259 1) 68 is the sendcount
260 2) 68 10 10 10 is the recvcounts
261 3) 0 is the root node
262 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
263 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
265 comm_size = MPI_COMM_WORLD->size();
266 CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
267 send_size = parse_integer<int>(action[2]);
268 disps = std::vector<int>(comm_size, 0);
269 recvcounts = std::make_shared<std::vector<int>>(comm_size);
271 if (name == "gatherv") {
272 root = parse_root(action, 3 + comm_size);
273 datatype1 = parse_datatype(action, 4 + comm_size);
274 datatype2 = parse_datatype(action, 5 + comm_size);
277 unsigned disp_index = 0;
278 /* The 3 comes from "0 gather <sendcount>", which must always be present.
279 * The + comm_size is the recvcounts array, which must also be present
281 if (action.size() > 3 + comm_size + comm_size) {
282 // datatype + disp are specified
283 datatype1 = parse_datatype(action, 3 + comm_size);
284 datatype2 = parse_datatype(action, 4 + comm_size);
285 disp_index = 5 + comm_size;
286 } else if (action.size() > 3 + comm_size + 2) {
287 // disps specified; datatype is not specified; use the default one
288 datatype1 = MPI_DEFAULT_TYPE;
289 datatype2 = MPI_DEFAULT_TYPE;
290 disp_index = 3 + comm_size;
292 // no disp specified, maybe only datatype,
293 datatype1 = parse_datatype(action, 3 + comm_size);
294 datatype2 = parse_datatype(action, 4 + comm_size);
297 if (disp_index != 0) {
298 xbt_assert(disp_index + comm_size <= action.size());
299 for (unsigned i = 0; i < comm_size; i++)
300 disps[i] = std::stoi(action[disp_index + i]);
304 for (unsigned int i = 0; i < comm_size; i++) {
305 (*recvcounts)[i] = std::stoi(action[i + 3]);
307 recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
310 void ScatterArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
312 /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
315 1) 68 is the sendcounts
316 2) 68 is the recvcounts
317 3) 0 is the root node
318 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
319 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
321 CHECK_ACTION_PARAMS(action, 2, 3)
322 comm_size = MPI_COMM_WORLD->size();
323 send_size = parse_integer<int>(action[2]);
324 recv_size = parse_integer<int>(action[3]);
325 root = parse_root(action, 4);
326 datatype1 = parse_datatype(action, 5);
327 datatype2 = parse_datatype(action, 6);
330 void ScatterVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
332 /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
333 0 gather 68 10 10 10 68 0 0 0
335 1) 68 10 10 10 is the sendcounts
336 2) 68 is the recvcount
337 3) 0 is the root node
338 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
339 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
341 CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
342 recv_size = parse_integer<int>(action[2 + comm_size]);
343 disps = std::vector<int>(comm_size, 0);
344 sendcounts = std::make_shared<std::vector<int>>(comm_size);
346 root = parse_root(action, 3 + comm_size);
347 datatype1 = parse_datatype(action, 4 + comm_size);
348 datatype2 = parse_datatype(action, 5 + comm_size);
350 for (unsigned int i = 0; i < comm_size; i++) {
351 (*sendcounts)[i] = std::stoi(action[i + 2]);
353 send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
356 void ReduceScatterArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
358 /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
359 0 reducescatter 275427 275427 275427 204020 11346849 0
361 1) The first four values after the name of the action declare the recvcounts array
362 2) The value 11346849 is the amount of instructions
363 3) The last value corresponds to the datatype, see simgrid::smpi::Datatype::decode().
365 comm_size = MPI_COMM_WORLD->size();
366 CHECK_ACTION_PARAMS(action, comm_size + 1, 1)
367 comp_size = parse_double(action[2 + comm_size]);
368 recvcounts = std::make_shared<std::vector<int>>(comm_size);
369 datatype1 = parse_datatype(action, 3 + comm_size);
371 for (unsigned int i = 0; i < comm_size; i++) {
372 recvcounts->push_back(std::stoi(action[i + 2]));
374 recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
377 void AllToAllVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
379 /* The structure of the alltoallv action for the rank 0 (total 4 processes) is the following:
380 0 alltoallv 100 1 7 10 12 100 1 70 10 5
382 1) 100 is the size of the send buffer *sizeof(int),
383 2) 1 7 10 12 is the sendcounts array
384 3) 100*sizeof(int) is the size of the receiver buffer
385 4) 1 70 10 5 is the recvcounts array
387 comm_size = MPI_COMM_WORLD->size();
388 CHECK_ACTION_PARAMS(action, 2 * comm_size + 2, 2)
389 sendcounts = std::make_shared<std::vector<int>>(comm_size);
390 recvcounts = std::make_shared<std::vector<int>>(comm_size);
391 senddisps = std::vector<int>(comm_size, 0);
392 recvdisps = std::vector<int>(comm_size, 0);
394 datatype1 = parse_datatype(action, 4 + 2 * comm_size);
395 datatype2 = parse_datatype(action, 5 + 2 * comm_size);
397 send_buf_size = parse_integer<int>(action[2]);
398 recv_buf_size = parse_integer<int>(action[3 + comm_size]);
399 for (unsigned int i = 0; i < comm_size; i++) {
400 (*sendcounts)[i] = std::stoi(action[3 + i]);
401 (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
403 send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
404 recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
407 void WaitAction::kernel(simgrid::xbt::ReplayAction& action)
409 std::string s = boost::algorithm::join(action, " ");
410 xbt_assert(req_storage.size(), "action wait not preceded by any irecv or isend: %s", s.c_str());
411 const WaitTestParser& args = get_args();
412 MPI_Request request = req_storage.find(args.src, args.dst, args.tag);
413 req_storage.remove(request);
415 if (request == MPI_REQUEST_NULL) {
416 /* Assume that the trace is well formed, meaning the comm might have been caught by an MPI_test. Then just
421 aid_t rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
423 // Must be taken before Request::wait() since the request may be set to
424 // MPI_REQUEST_NULL by Request::wait!
425 bool is_wait_for_receive = (request->flags() & MPI_REQ_RECV);
426 // TODO: Here we take the rank while we normally take the process id (look for get_pid())
427 TRACE_smpi_comm_in(rank, __func__, new simgrid::instr::WaitTIData(args.src, args.dst, args.tag));
430 Request::wait(&request, &status);
432 TRACE_smpi_comm_out(rank);
433 if (is_wait_for_receive)
434 TRACE_smpi_recv(args.src, args.dst, args.tag);
437 void SendAction::kernel(simgrid::xbt::ReplayAction&)
439 const SendRecvParser& args = get_args();
440 aid_t dst_traced = MPI_COMM_WORLD->group()->actor(args.partner);
444 new simgrid::instr::Pt2PtTIData(get_name(), args.partner, args.size, args.tag, Datatype::encode(args.datatype1)));
445 if (not TRACE_smpi_view_internals())
446 TRACE_smpi_send(get_pid(), get_pid(), dst_traced, args.tag, args.size * args.datatype1->size());
448 if (get_name() == "send") {
449 Request::send(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
450 } else if (get_name() == "isend") {
451 MPI_Request request = Request::isend(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
452 req_storage.add(request);
454 xbt_die("Don't know this action, %s", get_name().c_str());
457 TRACE_smpi_comm_out(get_pid());
460 void RecvAction::kernel(simgrid::xbt::ReplayAction&)
462 const SendRecvParser& args = get_args();
465 new simgrid::instr::Pt2PtTIData(get_name(), args.partner, args.size, args.tag, Datatype::encode(args.datatype1)));
468 // unknown size from the receiver point of view
469 size_t arg_size = args.size;
471 Request::probe(args.partner, args.tag, MPI_COMM_WORLD, &status);
472 arg_size = status.count;
475 bool is_recv = false; // Help analyzers understanding that status is not used uninitialized
476 if (get_name() == "recv") {
478 Request::recv(nullptr, arg_size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD, &status);
479 } else if (get_name() == "irecv") {
480 MPI_Request request = Request::irecv(nullptr, arg_size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
481 req_storage.add(request);
486 TRACE_smpi_comm_out(get_pid());
487 if (is_recv && not TRACE_smpi_view_internals()) {
488 aid_t src_traced = MPI_COMM_WORLD->group()->actor(status.MPI_SOURCE);
489 TRACE_smpi_recv(src_traced, get_pid(), args.tag);
493 void ComputeAction::kernel(simgrid::xbt::ReplayAction&)
495 const ComputeParser& args = get_args();
496 if (smpi_cfg_simulate_computation()) {
497 smpi_execute_flops(args.flops/smpi_adjust_comp_speed());
501 void SleepAction::kernel(simgrid::xbt::ReplayAction&)
503 const SleepParser& args = get_args();
504 XBT_DEBUG("Sleep for: %lf secs", args.time);
505 aid_t pid = simgrid::s4u::this_actor::get_pid();
506 TRACE_smpi_sleeping_in(pid, args.time);
507 simgrid::s4u::this_actor::sleep_for(args.time/smpi_adjust_comp_speed());
508 TRACE_smpi_sleeping_out(pid);
511 void LocationAction::kernel(simgrid::xbt::ReplayAction&)
513 const LocationParser& args = get_args();
514 smpi_trace_set_call_location(args.filename.c_str(), args.line);
517 void TestAction::kernel(simgrid::xbt::ReplayAction&)
519 const WaitTestParser& args = get_args();
520 MPI_Request request = req_storage.find(args.src, args.dst, args.tag);
521 req_storage.remove(request);
522 // if request is null here, this may mean that a previous test has succeeded
523 // Different times in traced application and replayed version may lead to this
524 // In this case, ignore the extra calls.
525 if (request != MPI_REQUEST_NULL) {
526 TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::NoOpTIData("test"));
530 Request::test(&request, &status, &flag);
532 XBT_DEBUG("MPI_Test result: %d", flag);
533 /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now
535 if (request == MPI_REQUEST_NULL)
536 req_storage.addNullRequest(args.src, args.dst, args.tag);
538 req_storage.add(request);
540 TRACE_smpi_comm_out(get_pid());
544 void InitAction::kernel(simgrid::xbt::ReplayAction& action)
546 CHECK_ACTION_PARAMS(action, 0, 1)
547 MPI_DEFAULT_TYPE = (action.size() > 2) ? MPI_DOUBLE // default MPE datatype
548 : MPI_BYTE; // default TAU datatype
550 /* start a simulated timer */
551 smpi_process()->simulated_start();
554 void CommunicatorAction::kernel(simgrid::xbt::ReplayAction&)
559 void WaitAllAction::kernel(simgrid::xbt::ReplayAction&)
561 const size_t count_requests = req_storage.size();
563 if (count_requests > 0) {
564 TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::CpuTIData("waitall", count_requests));
565 std::vector<std::pair</*sender*/ aid_t, /*recv*/ aid_t>> sender_receiver;
566 std::vector<MPI_Request> reqs;
567 req_storage.get_requests(reqs);
568 for (auto const& req : reqs) {
569 if (req && (req->flags() & MPI_REQ_RECV)) {
570 sender_receiver.emplace_back(req->src(), req->dst());
573 Request::waitall(count_requests, &(reqs.data())[0], MPI_STATUSES_IGNORE);
574 req_storage.get_store().clear();
576 for (auto const& pair : sender_receiver) {
577 TRACE_smpi_recv(pair.first, pair.second, 0);
579 TRACE_smpi_comm_out(get_pid());
583 void BarrierAction::kernel(simgrid::xbt::ReplayAction&)
585 TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::NoOpTIData("barrier"));
586 colls::barrier(MPI_COMM_WORLD);
587 TRACE_smpi_comm_out(get_pid());
590 void BcastAction::kernel(simgrid::xbt::ReplayAction&)
592 const BcastArgParser& args = get_args();
593 TRACE_smpi_comm_in(get_pid(), "action_bcast",
594 new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->rank(args.root), -1.0, args.size,
595 0, Datatype::encode(args.datatype1), ""));
597 colls::bcast(send_buffer(args.size * args.datatype1->size()), args.size, args.datatype1, args.root, MPI_COMM_WORLD);
599 TRACE_smpi_comm_out(get_pid());
602 void ReduceAction::kernel(simgrid::xbt::ReplayAction&)
604 const ReduceArgParser& args = get_args();
605 TRACE_smpi_comm_in(get_pid(), "action_reduce",
606 new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->rank(args.root), args.comp_size,
607 args.comm_size, 0, Datatype::encode(args.datatype1), ""));
609 colls::reduce(send_buffer(args.comm_size * args.datatype1->size()),
610 recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL,
611 args.root, MPI_COMM_WORLD);
612 if(args.comp_size != 0.0)
613 private_execute_flops(args.comp_size);
615 TRACE_smpi_comm_out(get_pid());
618 void AllReduceAction::kernel(simgrid::xbt::ReplayAction&)
620 const AllReduceArgParser& args = get_args();
621 TRACE_smpi_comm_in(get_pid(), "action_allreduce",
622 new simgrid::instr::CollTIData("allreduce", -1, args.comp_size, args.comm_size, 0,
623 Datatype::encode(args.datatype1), ""));
625 colls::allreduce(send_buffer(args.comm_size * args.datatype1->size()),
626 recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL,
628 if(args.comp_size != 0.0)
629 private_execute_flops(args.comp_size);
631 TRACE_smpi_comm_out(get_pid());
634 void AllToAllAction::kernel(simgrid::xbt::ReplayAction&)
636 const AllToAllArgParser& args = get_args();
637 TRACE_smpi_comm_in(get_pid(), "action_alltoall",
638 new simgrid::instr::CollTIData("alltoall", -1, -1.0, args.send_size, args.recv_size,
639 Datatype::encode(args.datatype1),
640 Datatype::encode(args.datatype2)));
642 colls::alltoall(send_buffer(args.datatype1->size() * args.send_size * args.comm_size), args.send_size, args.datatype1,
643 recv_buffer(args.datatype2->size() * args.recv_size * args.comm_size), args.recv_size, args.datatype2,
646 TRACE_smpi_comm_out(get_pid());
649 void GatherAction::kernel(simgrid::xbt::ReplayAction&)
651 const GatherArgParser& args = get_args();
652 TRACE_smpi_comm_in(get_pid(), get_name().c_str(),
653 new simgrid::instr::CollTIData(get_name(), (get_name() == "gather") ? args.root : -1, -1.0,
654 args.send_size, args.recv_size, Datatype::encode(args.datatype1),
655 Datatype::encode(args.datatype2)));
657 if (get_name() == "gather") {
658 int rank = MPI_COMM_WORLD->rank();
659 colls::gather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
660 (rank == args.root) ? recv_buffer(args.datatype2->size() * args.recv_size * args.comm_size) : nullptr,
661 args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
663 colls::allgather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
664 recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2,
667 TRACE_smpi_comm_out(get_pid());
670 void GatherVAction::kernel(simgrid::xbt::ReplayAction&)
672 int rank = MPI_COMM_WORLD->rank();
673 const GatherVArgParser& args = get_args();
674 TRACE_smpi_comm_in(get_pid(), get_name().c_str(),
675 new simgrid::instr::VarCollTIData(
676 get_name(), (get_name() == "gatherv") ? args.root : -1, args.send_size, nullptr, 0,
677 args.recvcounts, Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
679 if (get_name() == "gatherv") {
680 colls::gatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
681 (rank == args.root) ? recv_buffer(args.recv_size_sum * args.datatype2->size()) : nullptr,
682 args.recvcounts->data(), args.disps.data(), args.datatype2, args.root, MPI_COMM_WORLD);
684 colls::allgatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
685 recv_buffer(args.recv_size_sum * args.datatype2->size()), args.recvcounts->data(),
686 args.disps.data(), args.datatype2, MPI_COMM_WORLD);
689 TRACE_smpi_comm_out(get_pid());
692 void ScatterAction::kernel(simgrid::xbt::ReplayAction&)
694 int rank = MPI_COMM_WORLD->rank();
695 const ScatterArgParser& args = get_args();
696 TRACE_smpi_comm_in(get_pid(), "action_scatter",
697 new simgrid::instr::CollTIData(get_name(), args.root, -1.0, args.send_size, args.recv_size,
698 Datatype::encode(args.datatype1),
699 Datatype::encode(args.datatype2)));
701 colls::scatter(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
702 (rank == args.root) ? recv_buffer(args.recv_size * args.datatype2->size()) : nullptr, args.recv_size,
703 args.datatype2, args.root, MPI_COMM_WORLD);
705 TRACE_smpi_comm_out(get_pid());
708 void ScatterVAction::kernel(simgrid::xbt::ReplayAction&)
710 int rank = MPI_COMM_WORLD->rank();
711 const ScatterVArgParser& args = get_args();
712 TRACE_smpi_comm_in(get_pid(), "action_scatterv",
713 new simgrid::instr::VarCollTIData(get_name(), args.root, 0, args.sendcounts, args.recv_size,
714 nullptr, Datatype::encode(args.datatype1),
715 Datatype::encode(args.datatype2)));
717 colls::scatterv((rank == args.root) ? send_buffer(args.send_size_sum * args.datatype1->size()) : nullptr,
718 args.sendcounts->data(), args.disps.data(), args.datatype1,
719 recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, args.root,
722 TRACE_smpi_comm_out(get_pid());
725 void ReduceScatterAction::kernel(simgrid::xbt::ReplayAction&)
727 const ReduceScatterArgParser& args = get_args();
729 get_pid(), "action_reducescatter",
730 new simgrid::instr::VarCollTIData("reducescatter", -1, 0, nullptr, 0, args.recvcounts,
731 std::to_string(args.comp_size), /* ugly hack to print comp_size */
732 Datatype::encode(args.datatype1)));
734 colls::reduce_scatter(send_buffer(args.recv_size_sum * args.datatype1->size()),
735 recv_buffer(args.recv_size_sum * args.datatype1->size()), args.recvcounts->data(),
736 args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
738 private_execute_flops(args.comp_size);
739 TRACE_smpi_comm_out(get_pid());
742 void AllToAllVAction::kernel(simgrid::xbt::ReplayAction&)
744 const AllToAllVArgParser& args = get_args();
745 TRACE_smpi_comm_in(get_pid(), __func__,
746 new simgrid::instr::VarCollTIData(
747 "alltoallv", 0, args.send_size_sum, args.sendcounts, args.recv_size_sum, args.recvcounts,
748 Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
750 colls::alltoallv(send_buffer(args.send_buf_size * args.datatype1->size()), args.sendcounts->data(),
751 args.senddisps.data(), args.datatype1, recv_buffer(args.recv_buf_size * args.datatype2->size()),
752 args.recvcounts->data(), args.recvdisps.data(), args.datatype2, MPI_COMM_WORLD);
754 TRACE_smpi_comm_out(get_pid());
756 } // Replay Namespace
757 }} // namespace simgrid::smpi
759 static std::unordered_map<aid_t, simgrid::smpi::replay::RequestStorage> storage;
760 /** @brief Only initialize the replay, don't do it for real */
761 void smpi_replay_init(const char* instance_id, int rank, double start_delay_flops)
763 xbt_assert(not smpi_process()->initializing());
765 simgrid::s4u::Actor::self()->set_property("instance_id", instance_id);
766 simgrid::s4u::Actor::self()->set_property("rank", std::to_string(rank));
767 simgrid::smpi::ActorExt::init();
769 smpi_process()->mark_as_initialized();
770 smpi_process()->set_replaying(true);
772 TRACE_smpi_init(simgrid::s4u::this_actor::get_pid(), "smpi_replay_run_init");
773 xbt_replay_action_register("init", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::InitAction().execute(action); });
774 xbt_replay_action_register("finalize", [](simgrid::xbt::ReplayAction const&) { /* nothing to do */ });
775 xbt_replay_action_register("comm_size", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
776 xbt_replay_action_register("comm_split",[](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
777 xbt_replay_action_register("comm_dup", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
778 xbt_replay_action_register("send", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("send", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
779 xbt_replay_action_register("isend", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("isend", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
780 xbt_replay_action_register("recv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("recv", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
781 xbt_replay_action_register("irecv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("irecv", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
782 xbt_replay_action_register("test", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::TestAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
783 xbt_replay_action_register("wait", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
784 xbt_replay_action_register("waitall", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAllAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
785 xbt_replay_action_register("barrier", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BarrierAction().execute(action); });
786 xbt_replay_action_register("bcast", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BcastAction().execute(action); });
787 xbt_replay_action_register("reduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceAction().execute(action); });
788 xbt_replay_action_register("allreduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllReduceAction().execute(action); });
789 xbt_replay_action_register("alltoall", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllAction().execute(action); });
790 xbt_replay_action_register("alltoallv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllVAction().execute(action); });
791 xbt_replay_action_register("gather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("gather").execute(action); });
792 xbt_replay_action_register("scatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterAction().execute(action); });
793 xbt_replay_action_register("gatherv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("gatherv").execute(action); });
794 xbt_replay_action_register("scatterv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterVAction().execute(action); });
795 xbt_replay_action_register("allgather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("allgather").execute(action); });
796 xbt_replay_action_register("allgatherv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("allgatherv").execute(action); });
797 xbt_replay_action_register("reducescatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceScatterAction().execute(action); });
798 xbt_replay_action_register("compute", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ComputeAction().execute(action); });
799 xbt_replay_action_register("sleep", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SleepAction().execute(action); });
800 xbt_replay_action_register("location", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::LocationAction().execute(action); });
802 //if we have a delayed start, sleep here.
803 if (start_delay_flops > 0) {
804 XBT_VERB("Delayed start for instance - Sleeping for %f flops ", start_delay_flops);
805 private_execute_flops(start_delay_flops);
807 // Wait for the other actors to initialize also
808 simgrid::s4u::this_actor::yield();
812 /** @brief actually run the replay after initialization */
813 void smpi_replay_main(int rank, const char* private_trace_filename)
815 static int active_processes = 0;
817 storage[simgrid::s4u::this_actor::get_pid()] = simgrid::smpi::replay::RequestStorage();
818 std::string rank_string = std::to_string(rank);
819 simgrid::xbt::replay_runner(rank_string.c_str(), private_trace_filename);
821 /* and now, finalize everything */
822 /* One active process will stop. Decrease the counter*/
823 unsigned int count_requests = storage[simgrid::s4u::this_actor::get_pid()].size();
824 XBT_DEBUG("There are %u elements in reqq[*]", count_requests);
825 if (count_requests > 0) {
826 std::vector<MPI_Request> requests(count_requests);
829 for (auto const& pair : storage[simgrid::s4u::this_actor::get_pid()].get_store()) {
830 requests[i] = pair.second;
833 simgrid::smpi::Request::waitall(count_requests, requests.data(), MPI_STATUSES_IGNORE);
836 if(simgrid::config::get_value<bool>("smpi/finalization-barrier"))
837 simgrid::smpi::colls::barrier(MPI_COMM_WORLD);
841 if(active_processes==0){
842 /* Last process alive speaking: end the simulated timer */
843 XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
844 smpi_free_replay_tmp_buffers();
847 TRACE_smpi_comm_in(simgrid::s4u::this_actor::get_pid(), "smpi_replay_run_finalize",
848 new simgrid::instr::NoOpTIData("finalize"));
850 smpi_process()->finalize();
852 TRACE_smpi_comm_out(simgrid::s4u::this_actor::get_pid());
855 /** @brief chain a replay initialization and a replay start */
856 void smpi_replay_run(const char* instance_id, int rank, double start_delay_flops, const char* private_trace_filename)
858 smpi_replay_init(instance_id, rank, start_delay_flops);
859 smpi_replay_main(rank, private_trace_filename);