1 /* Copyright (c) 2009-2020. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
#include "smpi_coll.hpp"
#include "smpi_comm.hpp"
#include "smpi_datatype.hpp"
#include "smpi_group.hpp"
#include "smpi_request.hpp"
#include "xbt/replay.hpp"
#include <simgrid/smpi/replay.hpp>
#include <src/smpi/include/private.hpp>

#include <memory>
#include <numeric>
#include <string>
#include <tuple>
#include <unordered_map>
#include <vector>
/* Default log category of this file, `smpi_replay`, declared as a sub-category of `smpi`.
 * It backs the XBT_VERB/XBT_DEBUG/XBT_INFO calls and the XBT_LOG_ISENABLED(smpi_replay, ...) check below. */
XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay, smpi, "Trace Replay with SMPI");
// From https://stackoverflow.com/questions/7110301/generic-hash-for-tuples-in-unordered-map-unordered-set
// std::hash has no specialization for std::tuple, so std::unordered_map cannot take a tuple key out of the box.
// This namespace provides a drop-in hasher for that purpose. If we need this in other places,
// this could go into a header file.
namespace hash_tuple {
/** Generic hasher: forwards to std::hash for every type except std::tuple (specialized below). */
template <typename TT> struct hash {
  std::size_t operator()(TT const& tt) const { return std::hash<TT>()(tt); }
};

/** Mixes the hash of @p v into @p seed (the boost::hash_combine recipe). */
template <class T> inline void hash_combine(std::size_t& seed, T const& v)
{
  seed ^= hash_tuple::hash<T>()(v) + 0x9e3779b9 + (seed << 6) + (seed >> 2);
}

/** Folds the first @p Count elements of a tuple into a seed, lowest index first. */
template <class Tuple, std::size_t Count = std::tuple_size<Tuple>::value> struct ElementFolder {
  static void fold(std::size_t& seed, Tuple const& tuple)
  {
    // Fold the preceding elements first so that element 0 is always combined before element 1, and so on.
    ElementFolder<Tuple, Count - 1>::fold(seed, tuple);
    hash_combine(seed, std::get<Count - 1>(tuple));
  }
};

/** Recursion end: nothing left to fold. */
template <class Tuple> struct ElementFolder<Tuple, 0> {
  static void fold(std::size_t&, Tuple const&) {}
};

/** Tuple hasher: starts from a zero seed and combines every element hash in index order. */
template <typename... TT> struct hash<std::tuple<TT...>> {
  std::size_t operator()(std::tuple<TT...> const& tt) const
  {
    std::size_t seed = 0;
    ElementFolder<std::tuple<TT...>>::fold(seed, tt);
    return seed;
  }
};
} // namespace hash_tuple
64 typedef std::tuple</*sender*/ int, /* receiver */ int, /* tag */ int> req_key_t;
65 typedef std::unordered_map<req_key_t, MPI_Request, hash_tuple::hash<std::tuple<int,int,int>>> req_storage_t;
67 void log_timed_action(const simgrid::xbt::ReplayAction& action, double clock)
69 if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
70 std::string s = boost::algorithm::join(action, " ");
71 XBT_VERB("%s %f", s.c_str(), smpi_process()->simulated_elapsed() - clock);
76 static double parse_double(const std::string& string)
78 return xbt_str_parse_double(string.c_str(), "%s is not a double");
85 MPI_Datatype MPI_DEFAULT_TYPE;
87 class RequestStorage {
93 int size() const { return store.size(); }
95 req_storage_t& get_store()
100 void get_requests(std::vector<MPI_Request>& vec) const
102 for (auto const& pair : store) {
103 auto& req = pair.second;
104 auto my_proc_id = simgrid::s4u::this_actor::get_pid();
105 if (req != MPI_REQUEST_NULL && (req->src() == my_proc_id || req->dst() == my_proc_id)) {
106 vec.push_back(pair.second);
107 pair.second->print_request("MM");
112 MPI_Request find(int src, int dst, int tag)
114 req_storage_t::iterator it = store.find(req_key_t(src, dst, tag));
115 return (it == store.end()) ? MPI_REQUEST_NULL : it->second;
118 void remove(const Request* req)
120 if (req == MPI_REQUEST_NULL) return;
122 store.erase(req_key_t(req->src()-1, req->dst()-1, req->tag()));
125 void add(MPI_Request req)
127 if (req != MPI_REQUEST_NULL) // Can and does happen in the case of TestAction
128 store.insert({req_key_t(req->src()-1, req->dst()-1, req->tag()), req});
131 /* Sometimes we need to re-insert MPI_REQUEST_NULL but we still need src,dst and tag */
132 void addNullRequest(int src, int dst, int tag)
134 store.insert({req_key_t(
135 MPI_COMM_WORLD->group()->actor(src)->get_pid()-1,
136 MPI_COMM_WORLD->group()->actor(dst)->get_pid()-1,
137 tag), MPI_REQUEST_NULL});
141 void WaitTestParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
143 CHECK_ACTION_PARAMS(action, 3, 0)
144 src = std::stoi(action[2]);
145 dst = std::stoi(action[3]);
146 tag = std::stoi(action[4]);
149 void SendRecvParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
151 CHECK_ACTION_PARAMS(action, 3, 1)
152 partner = std::stoi(action[2]);
153 tag = std::stoi(action[3]);
154 size = parse_double(action[4]);
155 if (action.size() > 5)
156 datatype1 = simgrid::smpi::Datatype::decode(action[5]);
159 void ComputeParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
161 CHECK_ACTION_PARAMS(action, 1, 0)
162 flops = parse_double(action[2]);
165 void SleepParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
167 CHECK_ACTION_PARAMS(action, 1, 0)
168 time = parse_double(action[2]);
171 void LocationParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
173 CHECK_ACTION_PARAMS(action, 2, 0)
174 filename = std::string(action[2]);
175 line = std::stoi(action[3]);
178 void BcastArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
180 CHECK_ACTION_PARAMS(action, 1, 2)
181 size = parse_double(action[2]);
182 root = (action.size() > 3) ? std::stoi(action[3]) : 0;
183 if (action.size() > 4)
184 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
187 void ReduceArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
189 CHECK_ACTION_PARAMS(action, 2, 2)
190 comm_size = parse_double(action[2]);
191 comp_size = parse_double(action[3]);
192 root = (action.size() > 4) ? std::stoi(action[4]) : 0;
193 if (action.size() > 5)
194 datatype1 = simgrid::smpi::Datatype::decode(action[5]);
197 void AllReduceArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
199 CHECK_ACTION_PARAMS(action, 2, 1)
200 comm_size = parse_double(action[2]);
201 comp_size = parse_double(action[3]);
202 if (action.size() > 4)
203 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
206 void AllToAllArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
208 CHECK_ACTION_PARAMS(action, 2, 1)
209 comm_size = MPI_COMM_WORLD->size();
210 send_size = parse_double(action[2]);
211 recv_size = parse_double(action[3]);
213 if (action.size() > 4)
214 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
215 if (action.size() > 5)
216 datatype2 = simgrid::smpi::Datatype::decode(action[5]);
219 void GatherArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string& name)
221 /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
224 1) 68 is the sendcounts
225 2) 68 is the recvcounts
226 3) 0 is the root node
227 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
228 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
230 CHECK_ACTION_PARAMS(action, 2, 3)
231 comm_size = MPI_COMM_WORLD->size();
232 send_size = parse_double(action[2]);
233 recv_size = parse_double(action[3]);
235 if (name == "gather") {
236 root = (action.size() > 4) ? std::stoi(action[4]) : 0;
237 if (action.size() > 5)
238 datatype1 = simgrid::smpi::Datatype::decode(action[5]);
239 if (action.size() > 6)
240 datatype2 = simgrid::smpi::Datatype::decode(action[6]);
242 if (action.size() > 4)
243 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
244 if (action.size() > 5)
245 datatype2 = simgrid::smpi::Datatype::decode(action[5]);
249 void GatherVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string& name)
251 /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
252 0 gather 68 68 10 10 10 0 0 0
254 1) 68 is the sendcount
255 2) 68 10 10 10 is the recvcounts
256 3) 0 is the root node
257 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
258 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
260 comm_size = MPI_COMM_WORLD->size();
261 CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
262 send_size = parse_double(action[2]);
263 disps = std::vector<int>(comm_size, 0);
264 recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
266 if (name == "gatherv") {
267 root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
268 if (action.size() > 4 + comm_size)
269 datatype1 = simgrid::smpi::Datatype::decode(action[4 + comm_size]);
270 if (action.size() > 5 + comm_size)
271 datatype2 = simgrid::smpi::Datatype::decode(action[5 + comm_size]);
274 /* The 3 comes from "0 gather <sendcount>", which must always be present.
275 * The + comm_size is the recvcounts array, which must also be present
277 if (action.size() > 3 + comm_size + comm_size) { /* datatype + disp are specified */
278 int datatype_index = 3 + comm_size;
279 disp_index = datatype_index + 1;
280 datatype1 = simgrid::smpi::Datatype::decode(action[datatype_index]);
281 datatype2 = simgrid::smpi::Datatype::decode(action[datatype_index]);
282 } else if (action.size() >
283 3 + comm_size + 2) { /* disps specified; datatype is not specified; use the default one */
284 disp_index = 3 + comm_size;
285 } else if (action.size() > 3 + comm_size) { /* only datatype, no disp specified */
286 int datatype_index = 3 + comm_size;
287 datatype1 = simgrid::smpi::Datatype::decode(action[datatype_index]);
288 datatype2 = simgrid::smpi::Datatype::decode(action[datatype_index]);
291 if (disp_index != 0) {
292 for (unsigned int i = 0; i < comm_size; i++)
293 disps[i] = std::stoi(action[disp_index + i]);
297 for (unsigned int i = 0; i < comm_size; i++) {
298 (*recvcounts)[i] = std::stoi(action[i + 3]);
300 recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
303 void ScatterArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
305 /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
308 1) 68 is the sendcounts
309 2) 68 is the recvcounts
310 3) 0 is the root node
311 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
312 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
314 CHECK_ACTION_PARAMS(action, 2, 3)
315 comm_size = MPI_COMM_WORLD->size();
316 send_size = parse_double(action[2]);
317 recv_size = parse_double(action[3]);
318 root = (action.size() > 4) ? std::stoi(action[4]) : 0;
319 if (action.size() > 5)
320 datatype1 = simgrid::smpi::Datatype::decode(action[5]);
321 if (action.size() > 6)
322 datatype2 = simgrid::smpi::Datatype::decode(action[6]);
325 void ScatterVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
327 /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
328 0 gather 68 10 10 10 68 0 0 0
330 1) 68 10 10 10 is the sendcounts
331 2) 68 is the recvcount
332 3) 0 is the root node
333 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
334 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
336 CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
337 recv_size = parse_double(action[2 + comm_size]);
338 disps = std::vector<int>(comm_size, 0);
339 sendcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
341 if (action.size() > 5 + comm_size)
342 datatype1 = simgrid::smpi::Datatype::decode(action[4 + comm_size]);
343 if (action.size() > 5 + comm_size)
344 datatype2 = simgrid::smpi::Datatype::decode(action[5]);
346 for (unsigned int i = 0; i < comm_size; i++) {
347 (*sendcounts)[i] = std::stoi(action[i + 2]);
349 send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
350 root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
353 void ReduceScatterArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
355 /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
356 0 reducescatter 275427 275427 275427 204020 11346849 0
358 1) The first four values after the name of the action declare the recvcounts array
359 2) The value 11346849 is the amount of instructions
360 3) The last value corresponds to the datatype, see simgrid::smpi::Datatype::decode().
362 comm_size = MPI_COMM_WORLD->size();
363 CHECK_ACTION_PARAMS(action, comm_size + 1, 1)
364 comp_size = parse_double(action[2 + comm_size]);
365 recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
366 if (action.size() > 3 + comm_size)
367 datatype1 = simgrid::smpi::Datatype::decode(action[3 + comm_size]);
369 for (unsigned int i = 0; i < comm_size; i++) {
370 recvcounts->push_back(std::stoi(action[i + 2]));
372 recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
375 void AllToAllVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
377 /* The structure of the alltoallv action for the rank 0 (total 4 processes) is the following:
378 0 alltoallv 100 1 7 10 12 100 1 70 10 5
380 1) 100 is the size of the send buffer *sizeof(int),
381 2) 1 7 10 12 is the sendcounts array
382 3) 100*sizeof(int) is the size of the receiver buffer
383 4) 1 70 10 5 is the recvcounts array
385 comm_size = MPI_COMM_WORLD->size();
386 CHECK_ACTION_PARAMS(action, 2 * comm_size + 2, 2)
387 sendcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
388 recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
389 senddisps = std::vector<int>(comm_size, 0);
390 recvdisps = std::vector<int>(comm_size, 0);
392 if (action.size() > 5 + 2 * comm_size)
393 datatype1 = simgrid::smpi::Datatype::decode(action[4 + 2 * comm_size]);
394 if (action.size() > 5 + 2 * comm_size)
395 datatype2 = simgrid::smpi::Datatype::decode(action[5 + 2 * comm_size]);
397 send_buf_size = parse_double(action[2]);
398 recv_buf_size = parse_double(action[3 + comm_size]);
399 for (unsigned int i = 0; i < comm_size; i++) {
400 (*sendcounts)[i] = std::stoi(action[3 + i]);
401 (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
403 send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
404 recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
407 void WaitAction::kernel(simgrid::xbt::ReplayAction& action)
409 std::string s = boost::algorithm::join(action, " ");
410 xbt_assert(req_storage.size(), "action wait not preceded by any irecv or isend: %s", s.c_str());
411 const WaitTestParser& args = get_args();
412 MPI_Request request = req_storage.find(args.src, args.dst, args.tag);
413 req_storage.remove(request);
415 if (request == MPI_REQUEST_NULL) {
416 /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just
421 int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
423 // Must be taken before Request::wait() since the request may be set to
424 // MPI_REQUEST_NULL by Request::wait!
425 bool is_wait_for_receive = (request->flags() & MPI_REQ_RECV);
426 // TODO: Here we take the rank while we normally take the process id (look for get_pid())
427 TRACE_smpi_comm_in(rank, __func__, new simgrid::instr::WaitTIData(args.src, args.dst, args.tag));
430 Request::wait(&request, &status);
432 TRACE_smpi_comm_out(rank);
433 if (is_wait_for_receive)
434 TRACE_smpi_recv(args.src, args.dst, args.tag);
437 void SendAction::kernel(simgrid::xbt::ReplayAction&)
439 const SendRecvParser& args = get_args();
440 int dst_traced = MPI_COMM_WORLD->group()->actor(args.partner)->get_pid();
444 new simgrid::instr::Pt2PtTIData(get_name(), args.partner, args.size, args.tag, Datatype::encode(args.datatype1)));
445 if (not TRACE_smpi_view_internals())
446 TRACE_smpi_send(get_pid(), get_pid(), dst_traced, args.tag, args.size * args.datatype1->size());
448 if (get_name() == "send") {
449 Request::send(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
450 } else if (get_name() == "isend") {
451 MPI_Request request = Request::isend(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
452 req_storage.add(request);
454 xbt_die("Don't know this action, %s", get_name().c_str());
457 TRACE_smpi_comm_out(get_pid());
460 void RecvAction::kernel(simgrid::xbt::ReplayAction&)
462 const SendRecvParser& args = get_args();
465 new simgrid::instr::Pt2PtTIData(get_name(), args.partner, args.size, args.tag, Datatype::encode(args.datatype1)));
468 // unknown size from the receiver point of view
469 double arg_size = args.size;
470 if (arg_size <= 0.0) {
471 Request::probe(args.partner, args.tag, MPI_COMM_WORLD, &status);
472 arg_size = status.count;
475 bool is_recv = false; // Help analyzers understanding that status is not used uninitialized
476 if (get_name() == "recv") {
478 Request::recv(nullptr, arg_size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD, &status);
479 } else if (get_name() == "irecv") {
480 MPI_Request request = Request::irecv(nullptr, arg_size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
481 req_storage.add(request);
486 TRACE_smpi_comm_out(get_pid());
487 if (is_recv && not TRACE_smpi_view_internals()) {
488 int src_traced = MPI_COMM_WORLD->group()->actor(status.MPI_SOURCE)->get_pid();
489 TRACE_smpi_recv(src_traced, get_pid(), args.tag);
493 void ComputeAction::kernel(simgrid::xbt::ReplayAction&)
495 const ComputeParser& args = get_args();
496 if (smpi_cfg_simulate_computation()) {
497 smpi_execute_flops(args.flops/smpi_adjust_comp_speed());
501 void SleepAction::kernel(simgrid::xbt::ReplayAction&)
503 const SleepParser& args = get_args();
504 XBT_DEBUG("Sleep for: %lf secs", args.time);
505 int rank = simgrid::s4u::this_actor::get_pid();
506 TRACE_smpi_sleeping_in(rank, args.time);
507 simgrid::s4u::this_actor::sleep_for(args.time/smpi_adjust_comp_speed());
508 TRACE_smpi_sleeping_out(rank);
511 void LocationAction::kernel(simgrid::xbt::ReplayAction&)
513 const LocationParser& args = get_args();
514 smpi_trace_set_call_location(args.filename.c_str(), args.line);
517 void TestAction::kernel(simgrid::xbt::ReplayAction&)
519 const WaitTestParser& args = get_args();
520 MPI_Request request = req_storage.find(args.src, args.dst, args.tag);
521 req_storage.remove(request);
522 // if request is null here, this may mean that a previous test has succeeded
523 // Different times in traced application and replayed version may lead to this
524 // In this case, ignore the extra calls.
525 if (request != MPI_REQUEST_NULL) {
526 TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::NoOpTIData("test"));
530 Request::test(&request, &status, &flag);
532 XBT_DEBUG("MPI_Test result: %d", flag);
533 /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now
535 if (request == MPI_REQUEST_NULL)
536 req_storage.addNullRequest(args.src, args.dst, args.tag);
538 req_storage.add(request);
540 TRACE_smpi_comm_out(get_pid());
544 void InitAction::kernel(simgrid::xbt::ReplayAction& action)
546 CHECK_ACTION_PARAMS(action, 0, 1)
547 MPI_DEFAULT_TYPE = (action.size() > 2) ? MPI_DOUBLE // default MPE datatype
548 : MPI_BYTE; // default TAU datatype
550 /* start a simulated timer */
551 smpi_process()->simulated_start();
554 void CommunicatorAction::kernel(simgrid::xbt::ReplayAction&)
559 void WaitAllAction::kernel(simgrid::xbt::ReplayAction&)
561 const unsigned int count_requests = req_storage.size();
563 if (count_requests > 0) {
564 TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::Pt2PtTIData("waitall", -1, count_requests, ""));
565 std::vector<std::pair</*sender*/int,/*recv*/int>> sender_receiver;
566 std::vector<MPI_Request> reqs;
567 req_storage.get_requests(reqs);
568 for (auto const& req : reqs) {
569 if (req && (req->flags() & MPI_REQ_RECV)) {
570 sender_receiver.push_back({req->src(), req->dst()});
573 Request::waitall(count_requests, &(reqs.data())[0], MPI_STATUSES_IGNORE);
574 req_storage.get_store().clear();
576 for (auto const& pair : sender_receiver) {
577 TRACE_smpi_recv(pair.first, pair.second, 0);
579 TRACE_smpi_comm_out(get_pid());
583 void BarrierAction::kernel(simgrid::xbt::ReplayAction&)
585 TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::NoOpTIData("barrier"));
586 colls::barrier(MPI_COMM_WORLD);
587 TRACE_smpi_comm_out(get_pid());
590 void BcastAction::kernel(simgrid::xbt::ReplayAction&)
592 const BcastArgParser& args = get_args();
593 TRACE_smpi_comm_in(get_pid(), "action_bcast",
594 new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(args.root)->get_pid(), -1.0,
595 args.size, -1, Datatype::encode(args.datatype1), ""));
597 colls::bcast(send_buffer(args.size * args.datatype1->size()), args.size, args.datatype1, args.root, MPI_COMM_WORLD);
599 TRACE_smpi_comm_out(get_pid());
602 void ReduceAction::kernel(simgrid::xbt::ReplayAction&)
604 const ReduceArgParser& args = get_args();
605 TRACE_smpi_comm_in(get_pid(), "action_reduce",
606 new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(args.root)->get_pid(),
607 args.comp_size, args.comm_size, -1,
608 Datatype::encode(args.datatype1), ""));
610 colls::reduce(send_buffer(args.comm_size * args.datatype1->size()),
611 recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL,
612 args.root, MPI_COMM_WORLD);
613 private_execute_flops(args.comp_size);
615 TRACE_smpi_comm_out(get_pid());
618 void AllReduceAction::kernel(simgrid::xbt::ReplayAction&)
620 const AllReduceArgParser& args = get_args();
621 TRACE_smpi_comm_in(get_pid(), "action_allreduce",
622 new simgrid::instr::CollTIData("allreduce", -1, args.comp_size, args.comm_size, -1,
623 Datatype::encode(args.datatype1), ""));
625 colls::allreduce(send_buffer(args.comm_size * args.datatype1->size()),
626 recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL,
628 private_execute_flops(args.comp_size);
630 TRACE_smpi_comm_out(get_pid());
633 void AllToAllAction::kernel(simgrid::xbt::ReplayAction&)
635 const AllToAllArgParser& args = get_args();
636 TRACE_smpi_comm_in(get_pid(), "action_alltoall",
637 new simgrid::instr::CollTIData("alltoall", -1, -1.0, args.send_size, args.recv_size,
638 Datatype::encode(args.datatype1),
639 Datatype::encode(args.datatype2)));
641 colls::alltoall(send_buffer(args.send_size * args.comm_size * args.datatype1->size()), args.send_size, args.datatype1,
642 recv_buffer(args.recv_size * args.comm_size * args.datatype2->size()), args.recv_size, args.datatype2,
645 TRACE_smpi_comm_out(get_pid());
648 void GatherAction::kernel(simgrid::xbt::ReplayAction&)
650 const GatherArgParser& args = get_args();
651 TRACE_smpi_comm_in(get_pid(), get_name().c_str(),
652 new simgrid::instr::CollTIData(get_name(), (get_name() == "gather") ? args.root : -1, -1.0,
653 args.send_size, args.recv_size, Datatype::encode(args.datatype1),
654 Datatype::encode(args.datatype2)));
656 if (get_name() == "gather") {
657 int rank = MPI_COMM_WORLD->rank();
658 colls::gather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
659 (rank == args.root) ? recv_buffer(args.recv_size * args.comm_size * args.datatype2->size()) : nullptr,
660 args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
662 colls::allgather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
663 recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2,
666 TRACE_smpi_comm_out(get_pid());
669 void GatherVAction::kernel(simgrid::xbt::ReplayAction&)
671 int rank = MPI_COMM_WORLD->rank();
672 const GatherVArgParser& args = get_args();
673 TRACE_smpi_comm_in(get_pid(), get_name().c_str(),
674 new simgrid::instr::VarCollTIData(
675 get_name(), (get_name() == "gatherv") ? args.root : -1, args.send_size, nullptr, -1,
676 args.recvcounts, Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
678 if (get_name() == "gatherv") {
679 colls::gatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
680 (rank == args.root) ? recv_buffer(args.recv_size_sum * args.datatype2->size()) : nullptr,
681 args.recvcounts->data(), args.disps.data(), args.datatype2, args.root, MPI_COMM_WORLD);
683 colls::allgatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
684 recv_buffer(args.recv_size_sum * args.datatype2->size()), args.recvcounts->data(),
685 args.disps.data(), args.datatype2, MPI_COMM_WORLD);
688 TRACE_smpi_comm_out(get_pid());
691 void ScatterAction::kernel(simgrid::xbt::ReplayAction&)
693 int rank = MPI_COMM_WORLD->rank();
694 const ScatterArgParser& args = get_args();
695 TRACE_smpi_comm_in(get_pid(), "action_scatter",
696 new simgrid::instr::CollTIData(get_name(), args.root, -1.0, args.send_size, args.recv_size,
697 Datatype::encode(args.datatype1),
698 Datatype::encode(args.datatype2)));
700 colls::scatter(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
701 (rank == args.root) ? recv_buffer(args.recv_size * args.datatype2->size()) : nullptr, args.recv_size,
702 args.datatype2, args.root, MPI_COMM_WORLD);
704 TRACE_smpi_comm_out(get_pid());
707 void ScatterVAction::kernel(simgrid::xbt::ReplayAction&)
709 int rank = MPI_COMM_WORLD->rank();
710 const ScatterVArgParser& args = get_args();
711 TRACE_smpi_comm_in(get_pid(), "action_scatterv",
712 new simgrid::instr::VarCollTIData(get_name(), args.root, -1, args.sendcounts, args.recv_size,
713 nullptr, Datatype::encode(args.datatype1),
714 Datatype::encode(args.datatype2)));
716 colls::scatterv((rank == args.root) ? send_buffer(args.send_size_sum * args.datatype1->size()) : nullptr,
717 args.sendcounts->data(), args.disps.data(), args.datatype1,
718 recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, args.root,
721 TRACE_smpi_comm_out(get_pid());
724 void ReduceScatterAction::kernel(simgrid::xbt::ReplayAction&)
726 const ReduceScatterArgParser& args = get_args();
728 get_pid(), "action_reducescatter",
729 new simgrid::instr::VarCollTIData("reducescatter", -1, 0, nullptr, -1, args.recvcounts,
730 std::to_string(args.comp_size), /* ugly hack to print comp_size */
731 Datatype::encode(args.datatype1)));
733 colls::reduce_scatter(send_buffer(args.recv_size_sum * args.datatype1->size()),
734 recv_buffer(args.recv_size_sum * args.datatype1->size()), args.recvcounts->data(),
735 args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
737 private_execute_flops(args.comp_size);
738 TRACE_smpi_comm_out(get_pid());
741 void AllToAllVAction::kernel(simgrid::xbt::ReplayAction&)
743 const AllToAllVArgParser& args = get_args();
744 TRACE_smpi_comm_in(get_pid(), __func__,
745 new simgrid::instr::VarCollTIData(
746 "alltoallv", -1, args.send_size_sum, args.sendcounts, args.recv_size_sum, args.recvcounts,
747 Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
749 colls::alltoallv(send_buffer(args.send_buf_size * args.datatype1->size()), args.sendcounts->data(),
750 args.senddisps.data(), args.datatype1, recv_buffer(args.recv_buf_size * args.datatype2->size()),
751 args.recvcounts->data(), args.recvdisps.data(), args.datatype2, MPI_COMM_WORLD);
753 TRACE_smpi_comm_out(get_pid());
755 } // Replay Namespace
756 }} // namespace simgrid::smpi
758 static std::unordered_map<aid_t, simgrid::smpi::replay::RequestStorage> storage;
759 /** @brief Only initialize the replay, don't do it for real */
760 void smpi_replay_init(const char* instance_id, int rank, double start_delay_flops)
762 xbt_assert(not smpi_process()->initializing());
764 simgrid::s4u::Actor::self()->set_property("instance_id", instance_id);
765 simgrid::s4u::Actor::self()->set_property("rank", std::to_string(rank));
766 simgrid::smpi::ActorExt::init();
768 smpi_process()->mark_as_initialized();
769 smpi_process()->set_replaying(true);
771 int my_proc_id = simgrid::s4u::this_actor::get_pid();
773 TRACE_smpi_init(my_proc_id, "smpi_replay_run_init");
774 xbt_replay_action_register("init", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::InitAction().execute(action); });
775 xbt_replay_action_register("finalize", [](simgrid::xbt::ReplayAction const&) { /* nothing to do */ });
776 xbt_replay_action_register("comm_size", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
777 xbt_replay_action_register("comm_split",[](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
778 xbt_replay_action_register("comm_dup", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
779 xbt_replay_action_register("send", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("send", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
780 xbt_replay_action_register("isend", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("isend", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
781 xbt_replay_action_register("recv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("recv", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
782 xbt_replay_action_register("irecv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("irecv", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
783 xbt_replay_action_register("test", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::TestAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
784 xbt_replay_action_register("wait", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
785 xbt_replay_action_register("waitall", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAllAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
786 xbt_replay_action_register("barrier", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BarrierAction().execute(action); });
787 xbt_replay_action_register("bcast", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BcastAction().execute(action); });
788 xbt_replay_action_register("reduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceAction().execute(action); });
789 xbt_replay_action_register("allreduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllReduceAction().execute(action); });
790 xbt_replay_action_register("alltoall", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllAction().execute(action); });
791 xbt_replay_action_register("alltoallv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllVAction().execute(action); });
792 xbt_replay_action_register("gather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("gather").execute(action); });
793 xbt_replay_action_register("scatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterAction().execute(action); });
794 xbt_replay_action_register("gatherv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("gatherv").execute(action); });
795 xbt_replay_action_register("scatterv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterVAction().execute(action); });
796 xbt_replay_action_register("allgather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("allgather").execute(action); });
797 xbt_replay_action_register("allgatherv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("allgatherv").execute(action); });
798 xbt_replay_action_register("reducescatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceScatterAction().execute(action); });
799 xbt_replay_action_register("compute", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ComputeAction().execute(action); });
800 xbt_replay_action_register("sleep", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SleepAction().execute(action); });
801 xbt_replay_action_register("location", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::LocationAction().execute(action); });
803 //if we have a delayed start, sleep here.
804 if (start_delay_flops > 0) {
805 XBT_VERB("Delayed start for instance - Sleeping for %f flops ", start_delay_flops);
806 private_execute_flops(start_delay_flops);
808 // Wait for the other actors to initialize also
809 simgrid::s4u::this_actor::yield();
813 /** @brief actually run the replay after initialization */
814 void smpi_replay_main(int rank, const char* trace_filename)
816 static int active_processes = 0;
818 storage[simgrid::s4u::this_actor::get_pid()] = simgrid::smpi::replay::RequestStorage();
819 std::string rank_string = std::to_string(rank);
820 simgrid::xbt::replay_runner(rank_string.c_str(), trace_filename);
822 /* and now, finalize everything */
823 /* One active process will stop. Decrease the counter*/
824 unsigned int count_requests = storage[simgrid::s4u::this_actor::get_pid()].size();
825 XBT_DEBUG("There are %ud elements in reqq[*]", count_requests);
826 if (count_requests > 0) {
827 MPI_Request* requests= new MPI_Request[count_requests];
830 for (auto const& pair : storage[simgrid::s4u::this_actor::get_pid()].get_store()) {
831 requests[i] = pair.second;
834 simgrid::smpi::Request::waitall(count_requests, requests, MPI_STATUSES_IGNORE);
839 if(active_processes==0){
840 /* Last process alive speaking: end the simulated timer */
841 XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
842 smpi_free_replay_tmp_buffers();
845 TRACE_smpi_comm_in(simgrid::s4u::this_actor::get_pid(), "smpi_replay_run_finalize",
846 new simgrid::instr::NoOpTIData("finalize"));
848 smpi_process()->finalize();
850 TRACE_smpi_comm_out(simgrid::s4u::this_actor::get_pid());
853 /** @brief chain a replay initialization and a replay start */
854 void smpi_replay_run(const char* instance_id, int rank, double start_delay_flops, const char* trace_filename)
856 smpi_replay_init(instance_id, rank, start_delay_flops);
857 smpi_replay_main(rank, trace_filename);