1 /* Copyright (c) 2009-2022. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
6 #include "smpi_replay.hpp"
7 #include "simgrid/s4u/Exec.hpp"
8 #include "smpi_coll.hpp"
9 #include "smpi_comm.hpp"
10 #include "smpi_config.hpp"
11 #include "smpi_datatype.hpp"
12 #include "smpi_group.hpp"
13 #include "smpi_request.hpp"
14 #include "src/smpi/include/private.hpp"
15 #include "xbt/replay.hpp"
23 #include <unordered_map>
/* Logging category "smpi_replay" (a child of "smpi"); it is the default category for the XBT_* logging macros of this
 * file and is also queried explicitly by XBT_LOG_ISENABLED in log_timed_action(). */
XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay, smpi, "Trace Replay with SMPI");
// From https://stackoverflow.com/questions/7110301/generic-hash-for-tuples-in-unordered-map-unordered-set
// This is all just to make std::unordered_map work with std::tuple. If we need this in other places,
// this could go into a header file.
namespace hash_tuple {
/** Hash functor that simply forwards to std::hash; specialized below for std::tuple. */
template <typename TT> class hash {
public:
  size_t operator()(TT const& value) const { return std::hash<TT>{}(value); }
};

/** Mix the hash of @p v into @p seed (golden-ratio constant plus shifted copies of the seed). */
template <class T> inline void hash_combine(std::size_t& seed, T const& v)
{
  const std::size_t mixed = hash_tuple::hash<T>{}(v) + 0x9e3779b9 + (seed << 6) + (seed >> 2);
  seed ^= mixed;
}

// Recursive template code derived from Matthieu M.
/** Fold the elements 0..Index of a tuple into @p seed, lowest index first. */
template <class Tuple, size_t Index = std::tuple_size<Tuple>::value - 1> class HashValueImpl {
public:
  static void apply(size_t& seed, Tuple const& tuple)
  {
    if constexpr (Index > 0)
      HashValueImpl<Tuple, Index - 1>::apply(seed, tuple); // earlier elements are combined first
    hash_combine(seed, std::get<Index>(tuple));
  }
};

/** Hash of a whole tuple: every element is combined, in order, into a seed that starts at 0. */
template <typename... TT> class hash<std::tuple<TT...>> {
public:
  size_t operator()(std::tuple<TT...> const& tt) const
  {
    size_t seed = 0;
    HashValueImpl<std::tuple<TT...>>::apply(seed, tt);
    return seed;
  }
};
} // namespace hash_tuple
67 void log_timed_action(const simgrid::xbt::ReplayAction& action, double clock)
69 if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
70 std::string s = boost::algorithm::join(action, " ");
71 XBT_VERB("%s %f", s.c_str(), smpi_process()->simulated_elapsed() - clock);
75 /* Helper functions */
76 static double parse_double(const std::string& string)
78 return xbt_str_parse_double(string.c_str(), "not a double");
81 template <typename T> static T parse_integer(const std::string& string)
83 double val = trunc(xbt_str_parse_double(string.c_str(), "not a double"));
84 xbt_assert(static_cast<double>(std::numeric_limits<T>::min()) <= val &&
85 val <= static_cast<double>(std::numeric_limits<T>::max()),
86 "out of range: %g", val);
87 return static_cast<T>(val);
90 static int parse_root(const simgrid::xbt::ReplayAction& action, unsigned i)
92 return i < action.size() ? std::stoi(action[i]) : 0;
95 static MPI_Datatype parse_datatype(const simgrid::xbt::ReplayAction& action, unsigned i)
97 return i < action.size() ? simgrid::smpi::Datatype::decode(action[i]) : simgrid::smpi::replay::MPI_DEFAULT_TYPE;
// Datatype used by parse_datatype() when a trace line omits its datatype field. InitAction sets it to MPI_DOUBLE
// when the "init" line carries an extra argument, and to MPI_BYTE otherwise.
MPI_Datatype MPI_DEFAULT_TYPE;
106 class RequestStorage {
108 using req_key_t = std::tuple</*sender*/ int, /* receiver */ int, /* tag */ int>;
109 using req_storage_t = std::unordered_map<req_key_t, std::list<MPI_Request>, hash_tuple::hash<std::tuple<int, int, int>>>;
114 RequestStorage() = default;
115 size_t size() const { return store.size(); }
117 req_storage_t& get_store() { return store; }
119 void get_requests(std::vector<MPI_Request>& vec) const
121 for (auto const& [_, reqs] : store) {
122 aid_t my_proc_id = simgrid::s4u::this_actor::get_pid();
123 for (auto& req: reqs){
124 if (req != MPI_REQUEST_NULL && (req->src() == my_proc_id || req->dst() == my_proc_id)) {
126 req->print_request("MM");
132 MPI_Request pop(int src, int dst, int tag)
134 auto it = store.find(req_key_t(src, dst, tag));
135 if (it == store.end())
136 return MPI_REQUEST_NULL;
137 MPI_Request req = it->second.front();
138 it->second.pop_front();
139 if(it->second.empty())
140 store.erase(req_key_t(src, dst, tag));
144 void add(MPI_Request req)
146 if (req != MPI_REQUEST_NULL){ // Can and does happen in the case of TestAction
147 store[req_key_t(req->src()-1, req->dst()-1, req->tag())].push_back(req);
151 /* Sometimes we need to re-insert MPI_REQUEST_NULL but we still need src,dst and tag */
152 void addNullRequest(int src, int dst, int tag)
154 int src_pid = MPI_COMM_WORLD->group()->actor(src) - 1;
155 int dest_pid = MPI_COMM_WORLD->group()->actor(dst) - 1;
156 store[req_key_t(src_pid, dest_pid, tag)].push_back(MPI_REQUEST_NULL);
160 void WaitTestParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
162 CHECK_ACTION_PARAMS(action, 3, 0)
163 src = std::stoi(action[2]);
164 dst = std::stoi(action[3]);
165 tag = std::stoi(action[4]);
168 void SendRecvParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
170 CHECK_ACTION_PARAMS(action, 3, 1)
171 partner = std::stoi(action[2]);
172 tag = std::stoi(action[3]);
173 size = parse_integer<size_t>(action[4]);
174 datatype1 = parse_datatype(action, 5);
177 void ComputeParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
179 CHECK_ACTION_PARAMS(action, 1, 0)
180 flops = parse_double(action[2]);
183 void SleepParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
185 CHECK_ACTION_PARAMS(action, 1, 0)
186 time = parse_double(action[2]);
189 void LocationParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
191 CHECK_ACTION_PARAMS(action, 2, 0)
192 filename = std::string(action[2]);
193 line = std::stoi(action[3]);
196 void BcastArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
198 CHECK_ACTION_PARAMS(action, 1, 2)
199 size = parse_integer<size_t>(action[2]);
200 root = parse_root(action, 3);
201 datatype1 = parse_datatype(action, 4);
204 void ReduceArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
206 CHECK_ACTION_PARAMS(action, 2, 2)
207 comm_size = parse_integer<unsigned>(action[2]);
208 comp_size = parse_double(action[3]);
209 root = parse_root(action, 4);
210 datatype1 = parse_datatype(action, 5);
213 void AllReduceArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
215 CHECK_ACTION_PARAMS(action, 2, 1)
216 comm_size = parse_integer<unsigned>(action[2]);
217 comp_size = parse_double(action[3]);
218 datatype1 = parse_datatype(action, 4);
221 void AllToAllArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
223 CHECK_ACTION_PARAMS(action, 2, 1)
224 comm_size = MPI_COMM_WORLD->size();
225 send_size = parse_integer<int>(action[2]);
226 recv_size = parse_integer<int>(action[3]);
227 datatype1 = parse_datatype(action, 4);
228 datatype2 = parse_datatype(action, 5);
231 void GatherArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string& name)
233 /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
236 1) 68 is the sendcounts
237 2) 68 is the recvcounts
238 3) 0 is the root node
239 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
240 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
242 CHECK_ACTION_PARAMS(action, 2, 3)
243 comm_size = MPI_COMM_WORLD->size();
244 send_size = parse_integer<int>(action[2]);
245 recv_size = parse_integer<int>(action[3]);
247 if (name == "gather") {
248 root = parse_root(action, 4);
249 datatype1 = parse_datatype(action, 5);
250 datatype2 = parse_datatype(action, 6);
253 datatype1 = parse_datatype(action, 4);
254 datatype2 = parse_datatype(action, 5);
258 void GatherVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string& name)
260 /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
261 0 gather 68 68 10 10 10 0 0 0
263 1) 68 is the sendcount
264 2) 68 10 10 10 is the recvcounts
265 3) 0 is the root node
266 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
267 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
269 comm_size = MPI_COMM_WORLD->size();
270 CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
271 send_size = parse_integer<int>(action[2]);
272 disps = std::vector<int>(comm_size, 0);
273 recvcounts = std::make_shared<std::vector<int>>(comm_size);
275 if (name == "gatherv") {
276 root = parse_root(action, 3 + comm_size);
277 datatype1 = parse_datatype(action, 4 + comm_size);
278 datatype2 = parse_datatype(action, 5 + comm_size);
281 unsigned disp_index = 0;
282 /* The 3 comes from "0 gather <sendcount>", which must always be present.
283 * The + comm_size is the recvcounts array, which must also be present
285 if (action.size() > 3 + comm_size + comm_size) {
286 // datatype + disp are specified
287 datatype1 = parse_datatype(action, 3 + comm_size);
288 datatype2 = parse_datatype(action, 4 + comm_size);
289 disp_index = 5 + comm_size;
290 } else if (action.size() > 3 + comm_size + 2) {
291 // disps specified; datatype is not specified; use the default one
292 datatype1 = MPI_DEFAULT_TYPE;
293 datatype2 = MPI_DEFAULT_TYPE;
294 disp_index = 3 + comm_size;
296 // no disp specified, maybe only datatype,
297 datatype1 = parse_datatype(action, 3 + comm_size);
298 datatype2 = parse_datatype(action, 4 + comm_size);
301 if (disp_index != 0) {
302 xbt_assert(disp_index + comm_size <= action.size());
303 for (unsigned i = 0; i < comm_size; i++)
304 disps[i] = std::stoi(action[disp_index + i]);
308 for (unsigned int i = 0; i < comm_size; i++) {
309 (*recvcounts)[i] = std::stoi(action[i + 3]);
311 recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
314 void ScatterArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
316 /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
319 1) 68 is the sendcounts
320 2) 68 is the recvcounts
321 3) 0 is the root node
322 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
323 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
325 comm_size = MPI_COMM_WORLD->size();
326 CHECK_ACTION_PARAMS(action, 2, 3)
327 comm_size = MPI_COMM_WORLD->size();
328 send_size = parse_integer<int>(action[2]);
329 recv_size = parse_integer<int>(action[3]);
330 root = parse_root(action, 4);
331 datatype1 = parse_datatype(action, 5);
332 datatype2 = parse_datatype(action, 6);
335 void ScatterVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
337 /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
338 0 gather 68 10 10 10 68 0 0 0
340 1) 68 10 10 10 is the sendcounts
341 2) 68 is the recvcount
342 3) 0 is the root node
343 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
344 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
346 comm_size = MPI_COMM_WORLD->size();
347 CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
348 recv_size = parse_integer<int>(action[2 + comm_size]);
349 disps = std::vector<int>(comm_size, 0);
350 sendcounts = std::make_shared<std::vector<int>>(comm_size);
352 root = parse_root(action, 3 + comm_size);
353 datatype1 = parse_datatype(action, 4 + comm_size);
354 datatype2 = parse_datatype(action, 5 + comm_size);
356 for (unsigned int i = 0; i < comm_size; i++) {
357 (*sendcounts)[i] = std::stoi(action[i + 2]);
359 send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
362 void ReduceScatterArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
364 /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
365 0 reducescatter 275427 275427 275427 204020 11346849 0
367 1) The first four values after the name of the action declare the recvcounts array
368 2) The value 11346849 is the amount of instructions
369 3) The last value corresponds to the datatype, see simgrid::smpi::Datatype::decode().
371 comm_size = MPI_COMM_WORLD->size();
372 CHECK_ACTION_PARAMS(action, comm_size + 1, 1)
373 comp_size = parse_double(action[2 + comm_size]);
374 recvcounts = std::make_shared<std::vector<int>>(comm_size);
375 datatype1 = parse_datatype(action, 3 + comm_size);
377 for (unsigned int i = 0; i < comm_size; i++) {
378 (*recvcounts)[i]= std::stoi(action[i + 2]);
380 recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
383 void ScanArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
385 CHECK_ACTION_PARAMS(action, 2, 1)
386 size = parse_integer<size_t>(action[2]);
387 comp_size = parse_double(action[3]);
388 datatype1 = parse_datatype(action, 4);
391 void AllToAllVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
393 /* The structure of the alltoallv action for the rank 0 (total 4 processes) is the following:
394 0 alltoallv 100 1 7 10 12 100 1 70 10 5
396 1) 100 is the size of the send buffer *sizeof(int),
397 2) 1 7 10 12 is the sendcounts array
398 3) 100*sizeof(int) is the size of the receiver buffer
399 4) 1 70 10 5 is the recvcounts array
401 comm_size = MPI_COMM_WORLD->size();
402 CHECK_ACTION_PARAMS(action, 2 * comm_size + 2, 2)
403 sendcounts = std::make_shared<std::vector<int>>(comm_size);
404 recvcounts = std::make_shared<std::vector<int>>(comm_size);
405 senddisps = std::vector<int>(comm_size, 0);
406 recvdisps = std::vector<int>(comm_size, 0);
408 datatype1 = parse_datatype(action, 4 + 2 * comm_size);
409 datatype2 = parse_datatype(action, 5 + 2 * comm_size);
411 send_buf_size = parse_integer<int>(action[2]);
412 recv_buf_size = parse_integer<int>(action[3 + comm_size]);
413 for (unsigned int i = 0; i < comm_size; i++) {
414 (*sendcounts)[i] = std::stoi(action[3 + i]);
415 (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
417 send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
418 recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
421 void WaitAction::kernel(simgrid::xbt::ReplayAction& action)
423 std::string s = boost::algorithm::join(action, " ");
424 xbt_assert(req_storage.size(), "action wait not preceded by any irecv or isend: %s", s.c_str());
425 const WaitTestParser& args = get_args();
426 MPI_Request request = req_storage.pop(args.src, args.dst, args.tag);
428 if (request == MPI_REQUEST_NULL) {
429 /* Assume that the trace is well formed, meaning the comm might have been caught by an MPI_test. Then just
434 // Must be taken before Request::wait() since the request may be set to
435 // MPI_REQUEST_NULL by Request::wait!
436 bool is_wait_for_receive = (request->flags() & MPI_REQ_RECV);
438 TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::WaitTIData("wait", args.src, args.dst, args.tag));
441 Request::wait(&request, &status);
442 if(request!=MPI_REQUEST_NULL)
443 Request::unref(&request);
444 TRACE_smpi_comm_out(get_pid());
445 if (is_wait_for_receive)
446 TRACE_smpi_recv(MPI_COMM_WORLD->group()->actor(args.src), MPI_COMM_WORLD->group()->actor(args.dst), args.tag);
449 void SendAction::kernel(simgrid::xbt::ReplayAction&)
451 const SendRecvParser& args = get_args();
452 aid_t dst_traced = MPI_COMM_WORLD->group()->actor(args.partner);
456 new simgrid::instr::Pt2PtTIData(get_name(), args.partner, args.size, args.tag, Datatype::encode(args.datatype1)));
457 if (not TRACE_smpi_view_internals())
458 TRACE_smpi_send(get_pid(), get_pid(), dst_traced, args.tag, args.size * args.datatype1->size());
460 if (get_name() == "send") {
461 Request::send(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
462 } else if (get_name() == "isend") {
463 MPI_Request request = Request::isend(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
464 req_storage.add(request);
466 xbt_die("Don't know this action, %s", get_name().c_str());
469 TRACE_smpi_comm_out(get_pid());
472 void RecvAction::kernel(simgrid::xbt::ReplayAction&)
474 const SendRecvParser& args = get_args();
477 new simgrid::instr::Pt2PtTIData(get_name(), args.partner, args.size, args.tag, Datatype::encode(args.datatype1)));
480 // unknown size from the receiver point of view
481 size_t arg_size = args.size;
483 Request::probe(args.partner, args.tag, MPI_COMM_WORLD, &status);
484 arg_size = status.count;
487 bool is_recv = false; // Help analyzers understanding that status is not used uninitialized
488 if (get_name() == "recv") {
490 Request::recv(nullptr, arg_size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD, &status);
491 } else if (get_name() == "irecv") {
492 MPI_Request request = Request::irecv(nullptr, arg_size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
493 req_storage.add(request);
498 TRACE_smpi_comm_out(get_pid());
499 if (is_recv && not TRACE_smpi_view_internals()) {
500 aid_t src_traced = MPI_COMM_WORLD->group()->actor(status.MPI_SOURCE);
501 TRACE_smpi_recv(src_traced, get_pid(), args.tag);
505 void ComputeAction::kernel(simgrid::xbt::ReplayAction&)
507 const ComputeParser& args = get_args();
508 if (smpi_cfg_simulate_computation()) {
509 smpi_execute_flops(args.flops/smpi_adjust_comp_speed());
513 void SleepAction::kernel(simgrid::xbt::ReplayAction&)
515 const SleepParser& args = get_args();
516 XBT_DEBUG("Sleep for: %lf secs", args.time);
517 aid_t pid = simgrid::s4u::this_actor::get_pid();
518 TRACE_smpi_sleeping_in(pid, args.time);
519 simgrid::s4u::this_actor::sleep_for(args.time/smpi_adjust_comp_speed());
520 TRACE_smpi_sleeping_out(pid);
523 void LocationAction::kernel(simgrid::xbt::ReplayAction&)
525 const LocationParser& args = get_args();
526 smpi_trace_set_call_location(args.filename.c_str(), args.line);
529 void TestAction::kernel(simgrid::xbt::ReplayAction&)
531 const WaitTestParser& args = get_args();
532 MPI_Request request = req_storage.pop(args.src, args.dst, args.tag);
533 // if request is null here, this may mean that a previous test has succeeded
534 // Different times in traced application and replayed version may lead to this
535 // In this case, ignore the extra calls.
536 if (request != MPI_REQUEST_NULL) {
537 TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::WaitTIData("test", args.src, args.dst, args.tag));
541 Request::test(&request, &status, &flag);
543 XBT_DEBUG("MPI_Test result: %d", flag);
544 /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now
546 if (request == MPI_REQUEST_NULL)
547 req_storage.addNullRequest(args.src, args.dst, args.tag);
549 req_storage.add(request);
551 TRACE_smpi_comm_out(get_pid());
555 void InitAction::kernel(simgrid::xbt::ReplayAction& action)
557 CHECK_ACTION_PARAMS(action, 0, 1)
558 MPI_DEFAULT_TYPE = (action.size() > 2) ? MPI_DOUBLE // default MPE datatype
559 : MPI_BYTE; // default TAU datatype
561 /* start a simulated timer */
562 smpi_process()->simulated_start();
565 void CommunicatorAction::kernel(simgrid::xbt::ReplayAction&)
570 void WaitAllAction::kernel(simgrid::xbt::ReplayAction&)
572 if (req_storage.size() > 0) {
573 std::vector<std::pair</*sender*/ aid_t, /*recv*/ aid_t>> sender_receiver;
574 std::vector<MPI_Request> reqs;
575 req_storage.get_requests(reqs);
576 unsigned long count_requests = reqs.size();
577 TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::CpuTIData("waitall", count_requests));
578 for (auto const& req : reqs) {
579 if (req && (req->flags() & MPI_REQ_RECV)) {
580 sender_receiver.emplace_back(req->src(), req->dst());
583 Request::waitall(count_requests, &(reqs.data())[0], MPI_STATUSES_IGNORE);
584 req_storage.get_store().clear();
586 for (MPI_Request& req : reqs)
587 if (req != MPI_REQUEST_NULL)
588 Request::unref(&req);
590 for (auto const& [src, dst] : sender_receiver) {
591 TRACE_smpi_recv(src, dst, 0);
593 TRACE_smpi_comm_out(get_pid());
597 void BarrierAction::kernel(simgrid::xbt::ReplayAction&)
599 TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::NoOpTIData("barrier"));
600 colls::barrier(MPI_COMM_WORLD);
601 TRACE_smpi_comm_out(get_pid());
604 void BcastAction::kernel(simgrid::xbt::ReplayAction&)
606 const BcastArgParser& args = get_args();
607 TRACE_smpi_comm_in(get_pid(), "action_bcast",
608 new simgrid::instr::CollTIData("bcast", args.root, -1.0, args.size,
609 0, Datatype::encode(args.datatype1), ""));
611 colls::bcast(send_buffer(args.size * args.datatype1->size()), args.size, args.datatype1, args.root, MPI_COMM_WORLD);
613 TRACE_smpi_comm_out(get_pid());
616 void ReduceAction::kernel(simgrid::xbt::ReplayAction&)
618 const ReduceArgParser& args = get_args();
619 TRACE_smpi_comm_in(get_pid(), "action_reduce",
620 new simgrid::instr::CollTIData("reduce", args.root, args.comp_size,
621 args.comm_size, 0, Datatype::encode(args.datatype1), ""));
623 colls::reduce(send_buffer(args.comm_size * args.datatype1->size()),
624 recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL,
625 args.root, MPI_COMM_WORLD);
626 if (args.comp_size != 0.0)
627 simgrid::s4u::this_actor::exec_init(args.comp_size)
628 ->set_name("computation")
632 TRACE_smpi_comm_out(get_pid());
635 void AllReduceAction::kernel(simgrid::xbt::ReplayAction&)
637 const AllReduceArgParser& args = get_args();
638 TRACE_smpi_comm_in(get_pid(), "action_allreduce",
639 new simgrid::instr::CollTIData("allreduce", -1, args.comp_size, args.comm_size, 0,
640 Datatype::encode(args.datatype1), ""));
642 colls::allreduce(send_buffer(args.comm_size * args.datatype1->size()),
643 recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL,
645 if (args.comp_size != 0.0)
646 simgrid::s4u::this_actor::exec_init(args.comp_size)
647 ->set_name("computation")
651 TRACE_smpi_comm_out(get_pid());
654 void AllToAllAction::kernel(simgrid::xbt::ReplayAction&)
656 const AllToAllArgParser& args = get_args();
657 TRACE_smpi_comm_in(get_pid(), "action_alltoall",
658 new simgrid::instr::CollTIData("alltoall", -1, -1.0, args.send_size, args.recv_size,
659 Datatype::encode(args.datatype1),
660 Datatype::encode(args.datatype2)));
662 colls::alltoall(send_buffer(args.datatype1->size() * args.send_size * args.comm_size), args.send_size, args.datatype1,
663 recv_buffer(args.datatype2->size() * args.recv_size * args.comm_size), args.recv_size, args.datatype2,
666 TRACE_smpi_comm_out(get_pid());
669 void GatherAction::kernel(simgrid::xbt::ReplayAction&)
671 const GatherArgParser& args = get_args();
672 TRACE_smpi_comm_in(get_pid(), get_name().c_str(),
673 new simgrid::instr::CollTIData(get_name(), (get_name() == "gather") ? args.root : -1, -1.0,
674 args.send_size, args.recv_size, Datatype::encode(args.datatype1),
675 Datatype::encode(args.datatype2)));
677 if (get_name() == "gather") {
678 int rank = MPI_COMM_WORLD->rank();
679 colls::gather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
680 (rank == args.root) ? recv_buffer(args.datatype2->size() * args.recv_size * args.comm_size) : nullptr,
681 args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
683 colls::allgather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
684 recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2,
687 TRACE_smpi_comm_out(get_pid());
690 void GatherVAction::kernel(simgrid::xbt::ReplayAction&)
692 int rank = MPI_COMM_WORLD->rank();
693 const GatherVArgParser& args = get_args();
694 TRACE_smpi_comm_in(get_pid(), get_name().c_str(),
695 new simgrid::instr::VarCollTIData(
696 get_name(), (get_name() == "gatherv") ? args.root : -1, args.send_size, nullptr, -1,
697 args.recvcounts, Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
699 if (get_name() == "gatherv") {
700 colls::gatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
701 (rank == args.root) ? recv_buffer(args.recv_size_sum * args.datatype2->size()) : nullptr,
702 args.recvcounts->data(), args.disps.data(), args.datatype2, args.root, MPI_COMM_WORLD);
704 colls::allgatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
705 recv_buffer(args.recv_size_sum * args.datatype2->size()), args.recvcounts->data(),
706 args.disps.data(), args.datatype2, MPI_COMM_WORLD);
709 TRACE_smpi_comm_out(get_pid());
712 void ScatterAction::kernel(simgrid::xbt::ReplayAction&)
714 int rank = MPI_COMM_WORLD->rank();
715 const ScatterArgParser& args = get_args();
716 TRACE_smpi_comm_in(get_pid(), "action_scatter",
717 new simgrid::instr::CollTIData(get_name(), args.root, -1.0, args.send_size, args.recv_size,
718 Datatype::encode(args.datatype1),
719 Datatype::encode(args.datatype2)));
721 colls::scatter(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
722 (rank == args.root) ? recv_buffer(args.recv_size * args.datatype2->size()) : nullptr, args.recv_size,
723 args.datatype2, args.root, MPI_COMM_WORLD);
725 TRACE_smpi_comm_out(get_pid());
728 void ScatterVAction::kernel(simgrid::xbt::ReplayAction&)
730 int rank = MPI_COMM_WORLD->rank();
731 const ScatterVArgParser& args = get_args();
732 TRACE_smpi_comm_in(get_pid(), "action_scatterv",
733 new simgrid::instr::VarCollTIData(get_name(), args.root, -1, args.sendcounts, args.recv_size,
734 nullptr, Datatype::encode(args.datatype1),
735 Datatype::encode(args.datatype2)));
737 colls::scatterv((rank == args.root) ? send_buffer(args.send_size_sum * args.datatype1->size()) : nullptr,
738 args.sendcounts->data(), args.disps.data(), args.datatype1,
739 recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, args.root,
742 TRACE_smpi_comm_out(get_pid());
745 void ReduceScatterAction::kernel(simgrid::xbt::ReplayAction&)
747 const ReduceScatterArgParser& args = get_args();
749 get_pid(), "action_reducescatter",
750 new simgrid::instr::VarCollTIData(get_name(), -1, -1, nullptr, -1, args.recvcounts,
751 /* ugly as we use datatype field to pass computation as string */
752 /* and because of the trick to avoid getting 0.000000 when 0 is given */
753 args.comp_size == 0 ? "0" : std::to_string(args.comp_size),
754 Datatype::encode(args.datatype1)));
756 colls::reduce_scatter(send_buffer(args.recv_size_sum * args.datatype1->size()),
757 recv_buffer(args.recv_size_sum * args.datatype1->size()), args.recvcounts->data(),
758 args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
759 if (args.comp_size != 0.0)
760 simgrid::s4u::this_actor::exec_init(args.comp_size)
761 ->set_name("computation")
764 TRACE_smpi_comm_out(get_pid());
767 void ScanAction::kernel(simgrid::xbt::ReplayAction&)
769 const ScanArgParser& args = get_args();
770 TRACE_smpi_comm_in(get_pid(), "action_scan",
771 new simgrid::instr::CollTIData(get_name(), -1, args.comp_size,
772 args.size, 0, Datatype::encode(args.datatype1), ""));
773 if (get_name() == "scan")
774 colls::scan(send_buffer(args.size * args.datatype1->size()),
775 recv_buffer(args.size * args.datatype1->size()), args.size,
776 args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
778 colls::exscan(send_buffer(args.size * args.datatype1->size()),
779 recv_buffer(args.size * args.datatype1->size()), args.size,
780 args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
782 if (args.comp_size != 0.0)
783 simgrid::s4u::this_actor::exec_init(args.comp_size)
784 ->set_name("computation")
787 TRACE_smpi_comm_out(get_pid());
790 void AllToAllVAction::kernel(simgrid::xbt::ReplayAction&)
792 const AllToAllVArgParser& args = get_args();
793 TRACE_smpi_comm_in(get_pid(), __func__,
794 new simgrid::instr::VarCollTIData(
795 "alltoallv", -1, args.send_size_sum, args.sendcounts, args.recv_size_sum, args.recvcounts,
796 Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
798 colls::alltoallv(send_buffer(args.send_buf_size * args.datatype1->size()), args.sendcounts->data(),
799 args.senddisps.data(), args.datatype1, recv_buffer(args.recv_buf_size * args.datatype2->size()),
800 args.recvcounts->data(), args.recvdisps.data(), args.datatype2, MPI_COMM_WORLD);
802 TRACE_smpi_comm_out(get_pid());
} // namespace replay
}} // namespace simgrid::smpi
807 static std::unordered_map<aid_t, simgrid::smpi::replay::RequestStorage> storage;
808 /** @brief Only initialize the replay, don't do it for real */
809 void smpi_replay_init(const char* instance_id, int rank, double start_delay_flops)
811 xbt_assert(not smpi_process()->initializing());
813 simgrid::s4u::Actor::self()->set_property("instance_id", instance_id);
814 simgrid::s4u::Actor::self()->set_property("rank", std::to_string(rank));
815 simgrid::smpi::ActorExt::init();
817 smpi_process()->mark_as_initialized();
818 smpi_process()->set_replaying(true);
820 TRACE_smpi_init(simgrid::s4u::this_actor::get_pid(), "smpi_replay_run_init");
821 xbt_replay_action_register("init", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::InitAction().execute(action); });
822 xbt_replay_action_register("finalize", [](simgrid::xbt::ReplayAction const&) { /* nothing to do */ });
823 xbt_replay_action_register("comm_size", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
824 xbt_replay_action_register("comm_split",[](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
825 xbt_replay_action_register("comm_dup", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
826 xbt_replay_action_register("send", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("send", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
827 xbt_replay_action_register("isend", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("isend", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
828 xbt_replay_action_register("recv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("recv", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
829 xbt_replay_action_register("irecv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("irecv", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
830 xbt_replay_action_register("test", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::TestAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
831 xbt_replay_action_register("wait", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
832 xbt_replay_action_register("waitall", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAllAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
833 xbt_replay_action_register("barrier", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BarrierAction().execute(action); });
834 xbt_replay_action_register("bcast", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BcastAction().execute(action); });
835 xbt_replay_action_register("reduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceAction().execute(action); });
836 xbt_replay_action_register("allreduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllReduceAction().execute(action); });
837 xbt_replay_action_register("alltoall", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllAction().execute(action); });
838 xbt_replay_action_register("alltoallv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllVAction().execute(action); });
839 xbt_replay_action_register("gather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("gather").execute(action); });
840 xbt_replay_action_register("scatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterAction().execute(action); });
841 xbt_replay_action_register("gatherv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("gatherv").execute(action); });
842 xbt_replay_action_register("scatterv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterVAction().execute(action); });
843 xbt_replay_action_register("allgather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("allgather").execute(action); });
844 xbt_replay_action_register("allgatherv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("allgatherv").execute(action); });
845 xbt_replay_action_register("reducescatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceScatterAction().execute(action); });
846 xbt_replay_action_register("scan", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScanAction("scan").execute(action); });
847 xbt_replay_action_register("exscan", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScanAction("exscan").execute(action); });
848 xbt_replay_action_register("compute", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ComputeAction().execute(action); });
849 xbt_replay_action_register("sleep", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SleepAction().execute(action); });
850 xbt_replay_action_register("location", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::LocationAction().execute(action); });
852 //if we have a delayed start, sleep here.
853 if (start_delay_flops > 0) {
854 XBT_VERB("Delayed start for instance - Sleeping for %f flops ", start_delay_flops);
855 private_execute_flops(start_delay_flops);
857 // Wait for the other actors to initialize also
858 simgrid::s4u::this_actor::yield();
860 if(_smpi_init_sleep > 0)
861 simgrid::s4u::this_actor::sleep_for(_smpi_init_sleep);
864 /** @brief actually run the replay after initialization */
865 void smpi_replay_main(int rank, const char* private_trace_filename)
867 static int active_processes = 0;
869 storage[simgrid::s4u::this_actor::get_pid()] = simgrid::smpi::replay::RequestStorage();
870 std::string rank_string = std::to_string(rank);
871 simgrid::xbt::replay_runner(rank_string.c_str(), private_trace_filename);
873 /* and now, finalize everything */
874 /* One active process will stop. Decrease the counter*/
875 unsigned int count_requests = storage[simgrid::s4u::this_actor::get_pid()].size();
876 XBT_DEBUG("There are %u elements in reqq[*]", count_requests);
877 if (count_requests > 0) {
878 std::vector<MPI_Request> requests(count_requests);
881 for (auto const& pair : storage[simgrid::s4u::this_actor::get_pid()].get_store()) {
882 for (auto& req: pair.second){
887 simgrid::smpi::Request::waitall(count_requests, requests.data(), MPI_STATUSES_IGNORE);
890 if(simgrid::config::get_value<bool>("smpi/finalization-barrier"))
891 simgrid::smpi::colls::barrier(MPI_COMM_WORLD);
895 if(active_processes==0){
896 /* Last process alive speaking: end the simulated timer */
897 XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
898 smpi_free_replay_tmp_buffers();
901 TRACE_smpi_comm_in(simgrid::s4u::this_actor::get_pid(), "smpi_replay_run_finalize",
902 new simgrid::instr::NoOpTIData("finalize"));
904 smpi_process()->finalize();
906 TRACE_smpi_comm_out(simgrid::s4u::this_actor::get_pid());
909 /** @brief chain a replay initialization and a replay start */
910 void smpi_replay_run(const char* instance_id, int rank, double start_delay_flops, const char* private_trace_filename)
912 smpi_replay_init(instance_id, rank, start_delay_flops);
913 smpi_replay_main(rank, private_trace_filename);