1 /* Copyright (c) 2009-2021. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
6 #include "smpi_coll.hpp"
7 #include "smpi_comm.hpp"
8 #include "smpi_datatype.hpp"
9 #include "smpi_group.hpp"
10 #include "smpi_request.hpp"
11 #include "xbt/replay.hpp"
12 #include <simgrid/smpi/smpi_replay.hpp>
13 #include <src/smpi/include/private.hpp>
20 #include <unordered_map>
23 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay, smpi, "Trace Replay with SMPI");
// Adapted from https://stackoverflow.com/questions/7110301/generic-hash-for-tuples-in-unordered-map-unordered-set
// std::hash has no specialization for std::tuple, so an std::unordered_map keyed by a tuple needs its own hasher.
// If this is ever needed in other places, it could go into a header file.
namespace hash_tuple {
/** Generic case: any non-tuple type is hashed with std::hash. */
template <typename TT> class hash {
public:
  size_t operator()(TT const& tt) const { return std::hash<TT>()(tt); }
};

/** Mix the hash of @p v into @p seed (the classic boost::hash_combine recipe). */
template <class T> inline void hash_combine(std::size_t& seed, T const& v)
{
  seed ^= hash_tuple::hash<T>()(v) + 0x9e3779b9 + (seed << 6) + (seed >> 2);
}

/** Fold every element of @p tuple into @p seed, starting with element 0 and ending with the last one. */
template <class Tuple, std::size_t... Is>
inline void hash_each_element(std::size_t& seed, Tuple const& tuple, std::index_sequence<Is...>)
{
  // Elements of a braced initializer list are evaluated left to right, which fixes the combination order.
  const int sequencer[] = {0, (hash_combine(seed, std::get<Is>(tuple)), 0)...};
  (void)sequencer;
}

/** Tuple case: combine the hashes of all elements, starting from a zero seed. */
template <typename... TT> class hash<std::tuple<TT...>> {
public:
  size_t operator()(std::tuple<TT...> const& tt) const
  {
    std::size_t seed = 0;
    hash_each_element(seed, tt, std::index_sequence_for<TT...>{});
    return seed;
  }
};
} // namespace hash_tuple
65 void log_timed_action(const simgrid::xbt::ReplayAction& action, double clock)
67 if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
68 std::string s = boost::algorithm::join(action, " ");
69 XBT_VERB("%s %f", s.c_str(), smpi_process()->simulated_elapsed() - clock);
74 static double parse_double(const std::string& string)
76 return xbt_str_parse_double(string.c_str(), "not a double");
83 MPI_Datatype MPI_DEFAULT_TYPE;
85 class RequestStorage {
87 using req_key_t = std::tuple</*sender*/ int, /* receiver */ int, /* tag */ int>;
88 using req_storage_t = std::unordered_map<req_key_t, MPI_Request, hash_tuple::hash<std::tuple<int, int, int>>>;
93 RequestStorage() = default;
94 size_t size() const { return store.size(); }
96 req_storage_t& get_store() { return store; }
98 void get_requests(std::vector<MPI_Request>& vec) const
100 for (auto const& pair : store) {
101 auto& req = pair.second;
102 aid_t my_proc_id = simgrid::s4u::this_actor::get_pid();
103 if (req != MPI_REQUEST_NULL && (req->src() == my_proc_id || req->dst() == my_proc_id)) {
104 vec.push_back(pair.second);
105 pair.second->print_request("MM");
110 MPI_Request find(int src, int dst, int tag)
112 auto it = store.find(req_key_t(src, dst, tag));
113 return (it == store.end()) ? MPI_REQUEST_NULL : it->second;
116 void remove(const Request* req)
118 if (req == MPI_REQUEST_NULL) return;
120 store.erase(req_key_t(req->src()-1, req->dst()-1, req->tag()));
123 void add(MPI_Request req)
125 if (req != MPI_REQUEST_NULL) // Can and does happen in the case of TestAction
126 store.insert({req_key_t(req->src()-1, req->dst()-1, req->tag()), req});
129 /* Sometimes we need to re-insert MPI_REQUEST_NULL but we still need src,dst and tag */
130 void addNullRequest(int src, int dst, int tag)
132 store.insert({req_key_t(MPI_COMM_WORLD->group()->actor(src) - 1, MPI_COMM_WORLD->group()->actor(dst) - 1, tag),
137 void WaitTestParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
139 CHECK_ACTION_PARAMS(action, 3, 0)
140 src = std::stoi(action[2]);
141 dst = std::stoi(action[3]);
142 tag = std::stoi(action[4]);
145 void SendRecvParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
147 CHECK_ACTION_PARAMS(action, 3, 1)
148 partner = std::stoi(action[2]);
149 tag = std::stoi(action[3]);
150 size = parse_double(action[4]);
151 if (action.size() > 5)
152 datatype1 = simgrid::smpi::Datatype::decode(action[5]);
155 void ComputeParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
157 CHECK_ACTION_PARAMS(action, 1, 0)
158 flops = parse_double(action[2]);
161 void SleepParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
163 CHECK_ACTION_PARAMS(action, 1, 0)
164 time = parse_double(action[2]);
167 void LocationParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
169 CHECK_ACTION_PARAMS(action, 2, 0)
170 filename = std::string(action[2]);
171 line = std::stoi(action[3]);
174 void BcastArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
176 CHECK_ACTION_PARAMS(action, 1, 2)
177 size = parse_double(action[2]);
178 root = (action.size() > 3) ? std::stoi(action[3]) : 0;
179 if (action.size() > 4)
180 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
183 void ReduceArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
185 CHECK_ACTION_PARAMS(action, 2, 2)
186 double arg2 = trunc(parse_double(action[2]));
187 xbt_assert(0.0 <= arg2 && arg2 <= static_cast<double>(std::numeric_limits<unsigned>::max()));
188 comm_size = static_cast<unsigned>(arg2);
189 comp_size = parse_double(action[3]);
190 root = (action.size() > 4) ? std::stoi(action[4]) : 0;
191 if (action.size() > 5)
192 datatype1 = simgrid::smpi::Datatype::decode(action[5]);
195 void AllReduceArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
197 CHECK_ACTION_PARAMS(action, 2, 1)
198 double arg2 = trunc(parse_double(action[2]));
199 xbt_assert(0.0 <= arg2 && arg2 <= static_cast<double>(std::numeric_limits<unsigned>::max()));
200 comm_size = static_cast<unsigned>(arg2);
201 comp_size = parse_double(action[3]);
202 if (action.size() > 4)
203 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
206 void AllToAllArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
208 CHECK_ACTION_PARAMS(action, 2, 1)
209 comm_size = MPI_COMM_WORLD->size();
210 send_size = parse_double(action[2]);
211 recv_size = parse_double(action[3]);
213 if (action.size() > 4)
214 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
215 if (action.size() > 5)
216 datatype2 = simgrid::smpi::Datatype::decode(action[5]);
219 void GatherArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string& name)
221 /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
224 1) 68 is the sendcounts
225 2) 68 is the recvcounts
226 3) 0 is the root node
227 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
228 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
230 CHECK_ACTION_PARAMS(action, 2, 3)
231 comm_size = MPI_COMM_WORLD->size();
232 send_size = parse_double(action[2]);
233 recv_size = parse_double(action[3]);
235 if (name == "gather") {
236 root = (action.size() > 4) ? std::stoi(action[4]) : 0;
237 if (action.size() > 5)
238 datatype1 = simgrid::smpi::Datatype::decode(action[5]);
239 if (action.size() > 6)
240 datatype2 = simgrid::smpi::Datatype::decode(action[6]);
242 if (action.size() > 4)
243 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
244 if (action.size() > 5)
245 datatype2 = simgrid::smpi::Datatype::decode(action[5]);
249 void GatherVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string& name)
251 /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
252 0 gather 68 68 10 10 10 0 0 0
254 1) 68 is the sendcount
255 2) 68 10 10 10 is the recvcounts
256 3) 0 is the root node
257 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
258 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
260 comm_size = MPI_COMM_WORLD->size();
261 CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
262 send_size = parse_double(action[2]);
263 disps = std::vector<int>(comm_size, 0);
264 recvcounts = std::make_shared<std::vector<int>>(comm_size);
266 if (name == "gatherv") {
267 root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
268 if (action.size() > 4 + comm_size)
269 datatype1 = simgrid::smpi::Datatype::decode(action[4 + comm_size]);
270 if (action.size() > 5 + comm_size)
271 datatype2 = simgrid::smpi::Datatype::decode(action[5 + comm_size]);
274 /* The 3 comes from "0 gather <sendcount>", which must always be present.
275 * The + comm_size is the recvcounts array, which must also be present
277 if (action.size() > 3 + comm_size + comm_size) { /* datatype + disp are specified */
278 int datatype_index = 3 + comm_size;
279 disp_index = datatype_index + 1;
280 datatype1 = simgrid::smpi::Datatype::decode(action[datatype_index]);
281 datatype2 = simgrid::smpi::Datatype::decode(action[datatype_index]);
282 } else if (action.size() >
283 3 + comm_size + 2) { /* disps specified; datatype is not specified; use the default one */
284 disp_index = 3 + comm_size;
285 } else if (action.size() > 3 + comm_size) { /* only datatype, no disp specified */
286 int datatype_index = 3 + comm_size;
287 datatype1 = simgrid::smpi::Datatype::decode(action[datatype_index]);
288 datatype2 = simgrid::smpi::Datatype::decode(action[datatype_index]);
291 if (disp_index != 0) {
292 for (unsigned int i = 0; i < comm_size; i++)
293 disps[i] = std::stoi(action[disp_index + i]);
297 for (unsigned int i = 0; i < comm_size; i++) {
298 (*recvcounts)[i] = std::stoi(action[i + 3]);
300 recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
303 void ScatterArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
305 /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
308 1) 68 is the sendcounts
309 2) 68 is the recvcounts
310 3) 0 is the root node
311 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
312 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
314 CHECK_ACTION_PARAMS(action, 2, 3)
315 comm_size = MPI_COMM_WORLD->size();
316 send_size = parse_double(action[2]);
317 recv_size = parse_double(action[3]);
318 root = (action.size() > 4) ? std::stoi(action[4]) : 0;
319 if (action.size() > 5)
320 datatype1 = simgrid::smpi::Datatype::decode(action[5]);
321 if (action.size() > 6)
322 datatype2 = simgrid::smpi::Datatype::decode(action[6]);
325 void ScatterVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
327 /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
328 0 gather 68 10 10 10 68 0 0 0
330 1) 68 10 10 10 is the sendcounts
331 2) 68 is the recvcount
332 3) 0 is the root node
333 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
334 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
336 CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
337 recv_size = parse_double(action[2 + comm_size]);
338 disps = std::vector<int>(comm_size, 0);
339 sendcounts = std::make_shared<std::vector<int>>(comm_size);
341 if (action.size() > 5 + comm_size)
342 datatype1 = simgrid::smpi::Datatype::decode(action[4 + comm_size]);
343 if (action.size() > 5 + comm_size)
344 datatype2 = simgrid::smpi::Datatype::decode(action[5]);
346 for (unsigned int i = 0; i < comm_size; i++) {
347 (*sendcounts)[i] = std::stoi(action[i + 2]);
349 send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
350 root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
353 void ReduceScatterArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
355 /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
356 0 reducescatter 275427 275427 275427 204020 11346849 0
358 1) The first four values after the name of the action declare the recvcounts array
359 2) The value 11346849 is the amount of instructions
360 3) The last value corresponds to the datatype, see simgrid::smpi::Datatype::decode().
362 comm_size = MPI_COMM_WORLD->size();
363 CHECK_ACTION_PARAMS(action, comm_size + 1, 1)
364 comp_size = parse_double(action[2 + comm_size]);
365 recvcounts = std::make_shared<std::vector<int>>(comm_size);
366 if (action.size() > 3 + comm_size)
367 datatype1 = simgrid::smpi::Datatype::decode(action[3 + comm_size]);
369 for (unsigned int i = 0; i < comm_size; i++) {
370 recvcounts->push_back(std::stoi(action[i + 2]));
372 recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
375 void AllToAllVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
377 /* The structure of the alltoallv action for the rank 0 (total 4 processes) is the following:
378 0 alltoallv 100 1 7 10 12 100 1 70 10 5
380 1) 100 is the size of the send buffer *sizeof(int),
381 2) 1 7 10 12 is the sendcounts array
382 3) 100*sizeof(int) is the size of the receiver buffer
383 4) 1 70 10 5 is the recvcounts array
385 comm_size = MPI_COMM_WORLD->size();
386 CHECK_ACTION_PARAMS(action, 2 * comm_size + 2, 2)
387 sendcounts = std::make_shared<std::vector<int>>(comm_size);
388 recvcounts = std::make_shared<std::vector<int>>(comm_size);
389 senddisps = std::vector<int>(comm_size, 0);
390 recvdisps = std::vector<int>(comm_size, 0);
392 if (action.size() > 5 + 2 * comm_size)
393 datatype1 = simgrid::smpi::Datatype::decode(action[4 + 2 * comm_size]);
394 if (action.size() > 5 + 2 * comm_size)
395 datatype2 = simgrid::smpi::Datatype::decode(action[5 + 2 * comm_size]);
397 send_buf_size = parse_double(action[2]);
398 recv_buf_size = parse_double(action[3 + comm_size]);
399 for (unsigned int i = 0; i < comm_size; i++) {
400 (*sendcounts)[i] = std::stoi(action[3 + i]);
401 (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
403 send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
404 recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
407 void WaitAction::kernel(simgrid::xbt::ReplayAction& action)
409 std::string s = boost::algorithm::join(action, " ");
410 xbt_assert(req_storage.size(), "action wait not preceded by any irecv or isend: %s", s.c_str());
411 const WaitTestParser& args = get_args();
412 MPI_Request request = req_storage.find(args.src, args.dst, args.tag);
413 req_storage.remove(request);
415 if (request == MPI_REQUEST_NULL) {
416 /* Assume that the trace is well formed, meaning the comm might have been caught by an MPI_test. Then just
421 aid_t rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
423 // Must be taken before Request::wait() since the request may be set to
424 // MPI_REQUEST_NULL by Request::wait!
425 bool is_wait_for_receive = (request->flags() & MPI_REQ_RECV);
426 // TODO: Here we take the rank while we normally take the process id (look for get_pid())
427 TRACE_smpi_comm_in(rank, __func__, new simgrid::instr::WaitTIData(args.src, args.dst, args.tag));
430 Request::wait(&request, &status);
432 TRACE_smpi_comm_out(rank);
433 if (is_wait_for_receive)
434 TRACE_smpi_recv(args.src, args.dst, args.tag);
437 void SendAction::kernel(simgrid::xbt::ReplayAction&)
439 const SendRecvParser& args = get_args();
440 aid_t dst_traced = MPI_COMM_WORLD->group()->actor(args.partner);
444 new simgrid::instr::Pt2PtTIData(get_name(), args.partner, args.size, args.tag, Datatype::encode(args.datatype1)));
445 if (not TRACE_smpi_view_internals())
446 TRACE_smpi_send(get_pid(), get_pid(), dst_traced, args.tag, args.size * args.datatype1->size());
448 if (get_name() == "send") {
449 Request::send(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
450 } else if (get_name() == "isend") {
451 MPI_Request request = Request::isend(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
452 req_storage.add(request);
454 xbt_die("Don't know this action, %s", get_name().c_str());
457 TRACE_smpi_comm_out(get_pid());
460 void RecvAction::kernel(simgrid::xbt::ReplayAction&)
462 const SendRecvParser& args = get_args();
465 new simgrid::instr::Pt2PtTIData(get_name(), args.partner, args.size, args.tag, Datatype::encode(args.datatype1)));
468 // unknown size from the receiver point of view
469 double arg_size = args.size;
470 if (arg_size <= 0.0) {
471 Request::probe(args.partner, args.tag, MPI_COMM_WORLD, &status);
472 arg_size = status.count;
475 bool is_recv = false; // Help analyzers understanding that status is not used uninitialized
476 if (get_name() == "recv") {
478 Request::recv(nullptr, arg_size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD, &status);
479 } else if (get_name() == "irecv") {
480 MPI_Request request = Request::irecv(nullptr, arg_size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
481 req_storage.add(request);
486 TRACE_smpi_comm_out(get_pid());
487 if (is_recv && not TRACE_smpi_view_internals()) {
488 aid_t src_traced = MPI_COMM_WORLD->group()->actor(status.MPI_SOURCE);
489 TRACE_smpi_recv(src_traced, get_pid(), args.tag);
493 void ComputeAction::kernel(simgrid::xbt::ReplayAction&)
495 const ComputeParser& args = get_args();
496 if (smpi_cfg_simulate_computation()) {
497 smpi_execute_flops(args.flops/smpi_adjust_comp_speed());
501 void SleepAction::kernel(simgrid::xbt::ReplayAction&)
503 const SleepParser& args = get_args();
504 XBT_DEBUG("Sleep for: %lf secs", args.time);
505 aid_t pid = simgrid::s4u::this_actor::get_pid();
506 TRACE_smpi_sleeping_in(pid, args.time);
507 simgrid::s4u::this_actor::sleep_for(args.time/smpi_adjust_comp_speed());
508 TRACE_smpi_sleeping_out(pid);
511 void LocationAction::kernel(simgrid::xbt::ReplayAction&)
513 const LocationParser& args = get_args();
514 smpi_trace_set_call_location(args.filename.c_str(), args.line);
517 void TestAction::kernel(simgrid::xbt::ReplayAction&)
519 const WaitTestParser& args = get_args();
520 MPI_Request request = req_storage.find(args.src, args.dst, args.tag);
521 req_storage.remove(request);
522 // if request is null here, this may mean that a previous test has succeeded
523 // Different times in traced application and replayed version may lead to this
524 // In this case, ignore the extra calls.
525 if (request != MPI_REQUEST_NULL) {
526 TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::NoOpTIData("test"));
530 Request::test(&request, &status, &flag);
532 XBT_DEBUG("MPI_Test result: %d", flag);
533 /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now
535 if (request == MPI_REQUEST_NULL)
536 req_storage.addNullRequest(args.src, args.dst, args.tag);
538 req_storage.add(request);
540 TRACE_smpi_comm_out(get_pid());
544 void InitAction::kernel(simgrid::xbt::ReplayAction& action)
546 CHECK_ACTION_PARAMS(action, 0, 1)
547 MPI_DEFAULT_TYPE = (action.size() > 2) ? MPI_DOUBLE // default MPE datatype
548 : MPI_BYTE; // default TAU datatype
550 /* start a simulated timer */
551 smpi_process()->simulated_start();
554 void CommunicatorAction::kernel(simgrid::xbt::ReplayAction&)
559 void WaitAllAction::kernel(simgrid::xbt::ReplayAction&)
561 const size_t count_requests = req_storage.size();
563 if (count_requests > 0) {
564 TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::Pt2PtTIData("waitall", -1, count_requests, ""));
565 std::vector<std::pair</*sender*/ aid_t, /*recv*/ aid_t>> sender_receiver;
566 std::vector<MPI_Request> reqs;
567 req_storage.get_requests(reqs);
568 for (auto const& req : reqs) {
569 if (req && (req->flags() & MPI_REQ_RECV)) {
570 sender_receiver.emplace_back(req->src(), req->dst());
573 Request::waitall(count_requests, &(reqs.data())[0], MPI_STATUSES_IGNORE);
574 req_storage.get_store().clear();
576 for (auto const& pair : sender_receiver) {
577 TRACE_smpi_recv(pair.first, pair.second, 0);
579 TRACE_smpi_comm_out(get_pid());
583 void BarrierAction::kernel(simgrid::xbt::ReplayAction&)
585 TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::NoOpTIData("barrier"));
586 colls::barrier(MPI_COMM_WORLD);
587 TRACE_smpi_comm_out(get_pid());
590 void BcastAction::kernel(simgrid::xbt::ReplayAction&)
592 const BcastArgParser& args = get_args();
593 TRACE_smpi_comm_in(get_pid(), "action_bcast",
594 new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(args.root), -1.0, args.size,
595 0, Datatype::encode(args.datatype1), ""));
597 colls::bcast(send_buffer(args.size * args.datatype1->size()), args.size, args.datatype1, args.root, MPI_COMM_WORLD);
599 TRACE_smpi_comm_out(get_pid());
602 void ReduceAction::kernel(simgrid::xbt::ReplayAction&)
604 const ReduceArgParser& args = get_args();
605 TRACE_smpi_comm_in(get_pid(), "action_reduce",
606 new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(args.root), args.comp_size,
607 args.comm_size, 0, Datatype::encode(args.datatype1), ""));
609 colls::reduce(send_buffer(args.comm_size * args.datatype1->size()),
610 recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL,
611 args.root, MPI_COMM_WORLD);
612 private_execute_flops(args.comp_size);
614 TRACE_smpi_comm_out(get_pid());
617 void AllReduceAction::kernel(simgrid::xbt::ReplayAction&)
619 const AllReduceArgParser& args = get_args();
620 TRACE_smpi_comm_in(get_pid(), "action_allreduce",
621 new simgrid::instr::CollTIData("allreduce", -1, args.comp_size, args.comm_size, 0,
622 Datatype::encode(args.datatype1), ""));
624 colls::allreduce(send_buffer(args.comm_size * args.datatype1->size()),
625 recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL,
627 private_execute_flops(args.comp_size);
629 TRACE_smpi_comm_out(get_pid());
632 void AllToAllAction::kernel(simgrid::xbt::ReplayAction&)
634 const AllToAllArgParser& args = get_args();
635 TRACE_smpi_comm_in(get_pid(), "action_alltoall",
636 new simgrid::instr::CollTIData("alltoall", -1, -1.0, args.send_size, args.recv_size,
637 Datatype::encode(args.datatype1),
638 Datatype::encode(args.datatype2)));
640 colls::alltoall(send_buffer(args.send_size * args.comm_size * args.datatype1->size()), args.send_size, args.datatype1,
641 recv_buffer(args.recv_size * args.comm_size * args.datatype2->size()), args.recv_size, args.datatype2,
644 TRACE_smpi_comm_out(get_pid());
647 void GatherAction::kernel(simgrid::xbt::ReplayAction&)
649 const GatherArgParser& args = get_args();
650 TRACE_smpi_comm_in(get_pid(), get_name().c_str(),
651 new simgrid::instr::CollTIData(get_name(), (get_name() == "gather") ? args.root : -1, -1.0,
652 args.send_size, args.recv_size, Datatype::encode(args.datatype1),
653 Datatype::encode(args.datatype2)));
655 if (get_name() == "gather") {
656 int rank = MPI_COMM_WORLD->rank();
657 colls::gather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
658 (rank == args.root) ? recv_buffer(args.recv_size * args.comm_size * args.datatype2->size()) : nullptr,
659 args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
661 colls::allgather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
662 recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2,
665 TRACE_smpi_comm_out(get_pid());
668 void GatherVAction::kernel(simgrid::xbt::ReplayAction&)
670 int rank = MPI_COMM_WORLD->rank();
671 const GatherVArgParser& args = get_args();
672 TRACE_smpi_comm_in(get_pid(), get_name().c_str(),
673 new simgrid::instr::VarCollTIData(
674 get_name(), (get_name() == "gatherv") ? args.root : -1, args.send_size, nullptr, 0,
675 args.recvcounts, Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
677 if (get_name() == "gatherv") {
678 colls::gatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
679 (rank == args.root) ? recv_buffer(args.recv_size_sum * args.datatype2->size()) : nullptr,
680 args.recvcounts->data(), args.disps.data(), args.datatype2, args.root, MPI_COMM_WORLD);
682 colls::allgatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
683 recv_buffer(args.recv_size_sum * args.datatype2->size()), args.recvcounts->data(),
684 args.disps.data(), args.datatype2, MPI_COMM_WORLD);
687 TRACE_smpi_comm_out(get_pid());
690 void ScatterAction::kernel(simgrid::xbt::ReplayAction&)
692 int rank = MPI_COMM_WORLD->rank();
693 const ScatterArgParser& args = get_args();
694 TRACE_smpi_comm_in(get_pid(), "action_scatter",
695 new simgrid::instr::CollTIData(get_name(), args.root, -1.0, args.send_size, args.recv_size,
696 Datatype::encode(args.datatype1),
697 Datatype::encode(args.datatype2)));
699 colls::scatter(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
700 (rank == args.root) ? recv_buffer(args.recv_size * args.datatype2->size()) : nullptr, args.recv_size,
701 args.datatype2, args.root, MPI_COMM_WORLD);
703 TRACE_smpi_comm_out(get_pid());
706 void ScatterVAction::kernel(simgrid::xbt::ReplayAction&)
708 int rank = MPI_COMM_WORLD->rank();
709 const ScatterVArgParser& args = get_args();
710 TRACE_smpi_comm_in(get_pid(), "action_scatterv",
711 new simgrid::instr::VarCollTIData(get_name(), args.root, 0, args.sendcounts, args.recv_size,
712 nullptr, Datatype::encode(args.datatype1),
713 Datatype::encode(args.datatype2)));
715 colls::scatterv((rank == args.root) ? send_buffer(args.send_size_sum * args.datatype1->size()) : nullptr,
716 args.sendcounts->data(), args.disps.data(), args.datatype1,
717 recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, args.root,
720 TRACE_smpi_comm_out(get_pid());
723 void ReduceScatterAction::kernel(simgrid::xbt::ReplayAction&)
725 const ReduceScatterArgParser& args = get_args();
727 get_pid(), "action_reducescatter",
728 new simgrid::instr::VarCollTIData("reducescatter", -1, 0, nullptr, 0, args.recvcounts,
729 std::to_string(args.comp_size), /* ugly hack to print comp_size */
730 Datatype::encode(args.datatype1)));
732 colls::reduce_scatter(send_buffer(args.recv_size_sum * args.datatype1->size()),
733 recv_buffer(args.recv_size_sum * args.datatype1->size()), args.recvcounts->data(),
734 args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
736 private_execute_flops(args.comp_size);
737 TRACE_smpi_comm_out(get_pid());
740 void AllToAllVAction::kernel(simgrid::xbt::ReplayAction&)
742 const AllToAllVArgParser& args = get_args();
743 TRACE_smpi_comm_in(get_pid(), __func__,
744 new simgrid::instr::VarCollTIData(
745 "alltoallv", 0, args.send_size_sum, args.sendcounts, args.recv_size_sum, args.recvcounts,
746 Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
748 colls::alltoallv(send_buffer(args.send_buf_size * args.datatype1->size()), args.sendcounts->data(),
749 args.senddisps.data(), args.datatype1, recv_buffer(args.recv_buf_size * args.datatype2->size()),
750 args.recvcounts->data(), args.recvdisps.data(), args.datatype2, MPI_COMM_WORLD);
752 TRACE_smpi_comm_out(get_pid());
754 } // Replay Namespace
755 }} // namespace simgrid::smpi
757 static std::unordered_map<aid_t, simgrid::smpi::replay::RequestStorage> storage;
758 /** @brief Only initialize the replay, don't do it for real */
759 void smpi_replay_init(const char* instance_id, int rank, double start_delay_flops)
761 xbt_assert(not smpi_process()->initializing());
763 simgrid::s4u::Actor::self()->set_property("instance_id", instance_id);
764 simgrid::s4u::Actor::self()->set_property("rank", std::to_string(rank));
765 simgrid::smpi::ActorExt::init();
767 smpi_process()->mark_as_initialized();
768 smpi_process()->set_replaying(true);
770 TRACE_smpi_init(simgrid::s4u::this_actor::get_pid(), "smpi_replay_run_init");
771 xbt_replay_action_register("init", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::InitAction().execute(action); });
772 xbt_replay_action_register("finalize", [](simgrid::xbt::ReplayAction const&) { /* nothing to do */ });
773 xbt_replay_action_register("comm_size", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
774 xbt_replay_action_register("comm_split",[](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
775 xbt_replay_action_register("comm_dup", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
776 xbt_replay_action_register("send", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("send", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
777 xbt_replay_action_register("isend", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("isend", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
778 xbt_replay_action_register("recv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("recv", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
779 xbt_replay_action_register("irecv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("irecv", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
780 xbt_replay_action_register("test", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::TestAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
781 xbt_replay_action_register("wait", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
782 xbt_replay_action_register("waitall", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAllAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
783 xbt_replay_action_register("barrier", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BarrierAction().execute(action); });
784 xbt_replay_action_register("bcast", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BcastAction().execute(action); });
785 xbt_replay_action_register("reduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceAction().execute(action); });
786 xbt_replay_action_register("allreduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllReduceAction().execute(action); });
787 xbt_replay_action_register("alltoall", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllAction().execute(action); });
788 xbt_replay_action_register("alltoallv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllVAction().execute(action); });
789 xbt_replay_action_register("gather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("gather").execute(action); });
790 xbt_replay_action_register("scatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterAction().execute(action); });
791 xbt_replay_action_register("gatherv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("gatherv").execute(action); });
792 xbt_replay_action_register("scatterv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterVAction().execute(action); });
793 xbt_replay_action_register("allgather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("allgather").execute(action); });
794 xbt_replay_action_register("allgatherv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("allgatherv").execute(action); });
795 xbt_replay_action_register("reducescatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceScatterAction().execute(action); });
796 xbt_replay_action_register("compute", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ComputeAction().execute(action); });
797 xbt_replay_action_register("sleep", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SleepAction().execute(action); });
798 xbt_replay_action_register("location", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::LocationAction().execute(action); });
800 //if we have a delayed start, sleep here.
801 if (start_delay_flops > 0) {
802 XBT_VERB("Delayed start for instance - Sleeping for %f flops ", start_delay_flops);
803 private_execute_flops(start_delay_flops);
805 // Wait for the other actors to initialize also
806 simgrid::s4u::this_actor::yield();
810 /** @brief actually run the replay after initialization */
811 void smpi_replay_main(int rank, const char* private_trace_filename)
813 static int active_processes = 0;
815 storage[simgrid::s4u::this_actor::get_pid()] = simgrid::smpi::replay::RequestStorage();
816 std::string rank_string = std::to_string(rank);
817 simgrid::xbt::replay_runner(rank_string.c_str(), private_trace_filename);
819 /* and now, finalize everything */
820 /* One active process will stop. Decrease the counter*/
821 unsigned int count_requests = storage[simgrid::s4u::this_actor::get_pid()].size();
822 XBT_DEBUG("There are %u elements in reqq[*]", count_requests);
823 if (count_requests > 0) {
824 std::vector<MPI_Request> requests(count_requests);
827 for (auto const& pair : storage[simgrid::s4u::this_actor::get_pid()].get_store()) {
828 requests[i] = pair.second;
831 simgrid::smpi::Request::waitall(count_requests, requests.data(), MPI_STATUSES_IGNORE);
835 if(active_processes==0){
836 /* Last process alive speaking: end the simulated timer */
837 XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
838 smpi_free_replay_tmp_buffers();
841 TRACE_smpi_comm_in(simgrid::s4u::this_actor::get_pid(), "smpi_replay_run_finalize",
842 new simgrid::instr::NoOpTIData("finalize"));
844 smpi_process()->finalize();
846 TRACE_smpi_comm_out(simgrid::s4u::this_actor::get_pid());
849 /** @brief chain a replay initialization and a replay start */
850 void smpi_replay_run(const char* instance_id, int rank, double start_delay_flops, const char* private_trace_filename)
852 smpi_replay_init(instance_id, rank, start_delay_flops);
853 smpi_replay_main(rank, private_trace_filename);