1 /* Copyright (c) 2009-2020. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
6 #include "smpi_coll.hpp"
7 #include "smpi_comm.hpp"
8 #include "smpi_datatype.hpp"
9 #include "smpi_group.hpp"
10 #include "smpi_request.hpp"
11 #include "xbt/replay.hpp"
12 #include <simgrid/smpi/replay.hpp>
13 #include <src/smpi/include/private.hpp>
17 #include <unordered_map>
// Declares the default logging category of this file: "smpi_replay", registered under the "smpi" category.
// XBT_VERB/XBT_DEBUG/XBT_INFO calls below log to it.
XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay, smpi, "Trace Replay with SMPI");
// From https://stackoverflow.com/questions/7110301/generic-hash-for-tuples-in-unordered-map-unordered-set
// std::hash has no specialization for std::tuple, so std::unordered_map cannot key on a tuple out of the box;
// the helpers below provide one. If this is needed elsewhere, it could move to a header file.
namespace hash_tuple {
/** Primary template: any non-tuple type is hashed with std::hash. */
template <typename Key> struct hash {
  std::size_t operator()(Key const& key) const { return std::hash<Key>()(key); }
};

/** Mixes the hash of @p value into @p seed (same recipe as boost::hash_combine). */
template <class Value> inline void hash_combine(std::size_t& seed, Value const& value)
{
  const std::size_t mixed = hash_tuple::hash<Value>()(value) + 0x9e3779b9 + (seed << 6) + (seed >> 2);
  seed ^= mixed;
}

// Recursive template code derived from Matthieu M.: folds tuple elements 0..Index into the seed, lowest index first.
template <class Tuple, std::size_t Index = std::tuple_size<Tuple>::value - 1> struct HashValueImpl {
  static void apply(std::size_t& seed, Tuple const& tuple)
  {
    HashValueImpl<Tuple, Index - 1>::apply(seed, tuple); // combine the earlier elements first
    hash_combine(seed, std::get<Index>(tuple));
  }
};

/** Recursion terminator: only element 0 is left. */
template <class Tuple> struct HashValueImpl<Tuple, 0> {
  static void apply(std::size_t& seed, Tuple const& tuple) { hash_combine(seed, std::get<0>(tuple)); }
};

/** Tuple specialization: combines the hashes of all elements, starting from a zero seed. */
template <typename... Elems> struct hash<std::tuple<Elems...>> {
  std::size_t operator()(std::tuple<Elems...> const& key) const
  {
    std::size_t seed = 0;
    HashValueImpl<std::tuple<Elems...>>::apply(seed, key);
    return seed;
  }
};
} // namespace hash_tuple
64 typedef std::tuple</*sender*/ int, /* receiver */ int, /* tag */ int> req_key_t;
65 typedef std::unordered_map<req_key_t, MPI_Request, hash_tuple::hash<std::tuple<int,int,int>>> req_storage_t;
67 void log_timed_action(const simgrid::xbt::ReplayAction& action, double clock)
69 if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
70 std::string s = boost::algorithm::join(action, " ");
71 XBT_VERB("%s %f", s.c_str(), smpi_process()->simulated_elapsed() - clock);
76 static double parse_double(const std::string& string)
78 return xbt_str_parse_double(string.c_str(), "%s is not a double");
// Replay-wide default datatype, assigned by InitAction::kernel (MPI_DOUBLE for MPE traces, MPI_BYTE for TAU traces).
// NOTE(review): presumably the initial value of the parsers' datatype1/datatype2 members (see replay.hpp) — confirm.
MPI_Datatype MPI_DEFAULT_TYPE;
87 class RequestStorage {
92 RequestStorage() = default;
93 int size() const { return store.size(); }
95 req_storage_t& get_store() { return store; }
97 void get_requests(std::vector<MPI_Request>& vec) const
99 for (auto const& pair : store) {
100 auto& req = pair.second;
101 auto my_proc_id = simgrid::s4u::this_actor::get_pid();
102 if (req != MPI_REQUEST_NULL && (req->src() == my_proc_id || req->dst() == my_proc_id)) {
103 vec.push_back(pair.second);
104 pair.second->print_request("MM");
109 MPI_Request find(int src, int dst, int tag)
111 auto it = store.find(req_key_t(src, dst, tag));
112 return (it == store.end()) ? MPI_REQUEST_NULL : it->second;
115 void remove(const Request* req)
117 if (req == MPI_REQUEST_NULL) return;
119 store.erase(req_key_t(req->src()-1, req->dst()-1, req->tag()));
122 void add(MPI_Request req)
124 if (req != MPI_REQUEST_NULL) // Can and does happen in the case of TestAction
125 store.insert({req_key_t(req->src()-1, req->dst()-1, req->tag()), req});
128 /* Sometimes we need to re-insert MPI_REQUEST_NULL but we still need src,dst and tag */
129 void addNullRequest(int src, int dst, int tag)
131 store.insert({req_key_t(
132 MPI_COMM_WORLD->group()->actor(src)->get_pid()-1,
133 MPI_COMM_WORLD->group()->actor(dst)->get_pid()-1,
134 tag), MPI_REQUEST_NULL});
138 void WaitTestParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
140 CHECK_ACTION_PARAMS(action, 3, 0)
141 src = std::stoi(action[2]);
142 dst = std::stoi(action[3]);
143 tag = std::stoi(action[4]);
146 void SendRecvParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
148 CHECK_ACTION_PARAMS(action, 3, 1)
149 partner = std::stoi(action[2]);
150 tag = std::stoi(action[3]);
151 size = parse_double(action[4]);
152 if (action.size() > 5)
153 datatype1 = simgrid::smpi::Datatype::decode(action[5]);
156 void ComputeParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
158 CHECK_ACTION_PARAMS(action, 1, 0)
159 flops = parse_double(action[2]);
162 void SleepParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
164 CHECK_ACTION_PARAMS(action, 1, 0)
165 time = parse_double(action[2]);
168 void LocationParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
170 CHECK_ACTION_PARAMS(action, 2, 0)
171 filename = std::string(action[2]);
172 line = std::stoi(action[3]);
175 void BcastArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
177 CHECK_ACTION_PARAMS(action, 1, 2)
178 size = parse_double(action[2]);
179 root = (action.size() > 3) ? std::stoi(action[3]) : 0;
180 if (action.size() > 4)
181 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
184 void ReduceArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
186 CHECK_ACTION_PARAMS(action, 2, 2)
187 comm_size = parse_double(action[2]);
188 comp_size = parse_double(action[3]);
189 root = (action.size() > 4) ? std::stoi(action[4]) : 0;
190 if (action.size() > 5)
191 datatype1 = simgrid::smpi::Datatype::decode(action[5]);
194 void AllReduceArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
196 CHECK_ACTION_PARAMS(action, 2, 1)
197 comm_size = parse_double(action[2]);
198 comp_size = parse_double(action[3]);
199 if (action.size() > 4)
200 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
203 void AllToAllArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
205 CHECK_ACTION_PARAMS(action, 2, 1)
206 comm_size = MPI_COMM_WORLD->size();
207 send_size = parse_double(action[2]);
208 recv_size = parse_double(action[3]);
210 if (action.size() > 4)
211 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
212 if (action.size() > 5)
213 datatype2 = simgrid::smpi::Datatype::decode(action[5]);
216 void GatherArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string& name)
218 /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
221 1) 68 is the sendcounts
222 2) 68 is the recvcounts
223 3) 0 is the root node
224 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
225 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
227 CHECK_ACTION_PARAMS(action, 2, 3)
228 comm_size = MPI_COMM_WORLD->size();
229 send_size = parse_double(action[2]);
230 recv_size = parse_double(action[3]);
232 if (name == "gather") {
233 root = (action.size() > 4) ? std::stoi(action[4]) : 0;
234 if (action.size() > 5)
235 datatype1 = simgrid::smpi::Datatype::decode(action[5]);
236 if (action.size() > 6)
237 datatype2 = simgrid::smpi::Datatype::decode(action[6]);
239 if (action.size() > 4)
240 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
241 if (action.size() > 5)
242 datatype2 = simgrid::smpi::Datatype::decode(action[5]);
246 void GatherVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string& name)
248 /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
249 0 gather 68 68 10 10 10 0 0 0
251 1) 68 is the sendcount
252 2) 68 10 10 10 is the recvcounts
253 3) 0 is the root node
254 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
255 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
257 comm_size = MPI_COMM_WORLD->size();
258 CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
259 send_size = parse_double(action[2]);
260 disps = std::vector<int>(comm_size, 0);
261 recvcounts = std::make_shared<std::vector<int>>(comm_size);
263 if (name == "gatherv") {
264 root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
265 if (action.size() > 4 + comm_size)
266 datatype1 = simgrid::smpi::Datatype::decode(action[4 + comm_size]);
267 if (action.size() > 5 + comm_size)
268 datatype2 = simgrid::smpi::Datatype::decode(action[5 + comm_size]);
271 /* The 3 comes from "0 gather <sendcount>", which must always be present.
272 * The + comm_size is the recvcounts array, which must also be present
274 if (action.size() > 3 + comm_size + comm_size) { /* datatype + disp are specified */
275 int datatype_index = 3 + comm_size;
276 disp_index = datatype_index + 1;
277 datatype1 = simgrid::smpi::Datatype::decode(action[datatype_index]);
278 datatype2 = simgrid::smpi::Datatype::decode(action[datatype_index]);
279 } else if (action.size() >
280 3 + comm_size + 2) { /* disps specified; datatype is not specified; use the default one */
281 disp_index = 3 + comm_size;
282 } else if (action.size() > 3 + comm_size) { /* only datatype, no disp specified */
283 int datatype_index = 3 + comm_size;
284 datatype1 = simgrid::smpi::Datatype::decode(action[datatype_index]);
285 datatype2 = simgrid::smpi::Datatype::decode(action[datatype_index]);
288 if (disp_index != 0) {
289 for (unsigned int i = 0; i < comm_size; i++)
290 disps[i] = std::stoi(action[disp_index + i]);
294 for (unsigned int i = 0; i < comm_size; i++) {
295 (*recvcounts)[i] = std::stoi(action[i + 3]);
297 recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
300 void ScatterArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
302 /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
305 1) 68 is the sendcounts
306 2) 68 is the recvcounts
307 3) 0 is the root node
308 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
309 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
311 CHECK_ACTION_PARAMS(action, 2, 3)
312 comm_size = MPI_COMM_WORLD->size();
313 send_size = parse_double(action[2]);
314 recv_size = parse_double(action[3]);
315 root = (action.size() > 4) ? std::stoi(action[4]) : 0;
316 if (action.size() > 5)
317 datatype1 = simgrid::smpi::Datatype::decode(action[5]);
318 if (action.size() > 6)
319 datatype2 = simgrid::smpi::Datatype::decode(action[6]);
322 void ScatterVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
324 /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
325 0 gather 68 10 10 10 68 0 0 0
327 1) 68 10 10 10 is the sendcounts
328 2) 68 is the recvcount
329 3) 0 is the root node
330 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
331 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
333 CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
334 recv_size = parse_double(action[2 + comm_size]);
335 disps = std::vector<int>(comm_size, 0);
336 sendcounts = std::make_shared<std::vector<int>>(comm_size);
338 if (action.size() > 5 + comm_size)
339 datatype1 = simgrid::smpi::Datatype::decode(action[4 + comm_size]);
340 if (action.size() > 5 + comm_size)
341 datatype2 = simgrid::smpi::Datatype::decode(action[5]);
343 for (unsigned int i = 0; i < comm_size; i++) {
344 (*sendcounts)[i] = std::stoi(action[i + 2]);
346 send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
347 root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
350 void ReduceScatterArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
352 /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
353 0 reducescatter 275427 275427 275427 204020 11346849 0
355 1) The first four values after the name of the action declare the recvcounts array
356 2) The value 11346849 is the amount of instructions
357 3) The last value corresponds to the datatype, see simgrid::smpi::Datatype::decode().
359 comm_size = MPI_COMM_WORLD->size();
360 CHECK_ACTION_PARAMS(action, comm_size + 1, 1)
361 comp_size = parse_double(action[2 + comm_size]);
362 recvcounts = std::make_shared<std::vector<int>>(comm_size);
363 if (action.size() > 3 + comm_size)
364 datatype1 = simgrid::smpi::Datatype::decode(action[3 + comm_size]);
366 for (unsigned int i = 0; i < comm_size; i++) {
367 recvcounts->push_back(std::stoi(action[i + 2]));
369 recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
372 void AllToAllVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
374 /* The structure of the alltoallv action for the rank 0 (total 4 processes) is the following:
375 0 alltoallv 100 1 7 10 12 100 1 70 10 5
377 1) 100 is the size of the send buffer *sizeof(int),
378 2) 1 7 10 12 is the sendcounts array
379 3) 100*sizeof(int) is the size of the receiver buffer
380 4) 1 70 10 5 is the recvcounts array
382 comm_size = MPI_COMM_WORLD->size();
383 CHECK_ACTION_PARAMS(action, 2 * comm_size + 2, 2)
384 sendcounts = std::make_shared<std::vector<int>>(comm_size);
385 recvcounts = std::make_shared<std::vector<int>>(comm_size);
386 senddisps = std::vector<int>(comm_size, 0);
387 recvdisps = std::vector<int>(comm_size, 0);
389 if (action.size() > 5 + 2 * comm_size)
390 datatype1 = simgrid::smpi::Datatype::decode(action[4 + 2 * comm_size]);
391 if (action.size() > 5 + 2 * comm_size)
392 datatype2 = simgrid::smpi::Datatype::decode(action[5 + 2 * comm_size]);
394 send_buf_size = parse_double(action[2]);
395 recv_buf_size = parse_double(action[3 + comm_size]);
396 for (unsigned int i = 0; i < comm_size; i++) {
397 (*sendcounts)[i] = std::stoi(action[3 + i]);
398 (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
400 send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
401 recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
404 void WaitAction::kernel(simgrid::xbt::ReplayAction& action)
406 std::string s = boost::algorithm::join(action, " ");
407 xbt_assert(req_storage.size(), "action wait not preceded by any irecv or isend: %s", s.c_str());
408 const WaitTestParser& args = get_args();
409 MPI_Request request = req_storage.find(args.src, args.dst, args.tag);
410 req_storage.remove(request);
412 if (request == MPI_REQUEST_NULL) {
413 /* Assume that the trace is well formed, meaning the comm might have been caught by an MPI_test. Then just
418 int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
420 // Must be taken before Request::wait() since the request may be set to
421 // MPI_REQUEST_NULL by Request::wait!
422 bool is_wait_for_receive = (request->flags() & MPI_REQ_RECV);
423 // TODO: Here we take the rank while we normally take the process id (look for get_pid())
424 TRACE_smpi_comm_in(rank, __func__, new simgrid::instr::WaitTIData(args.src, args.dst, args.tag));
427 Request::wait(&request, &status);
429 TRACE_smpi_comm_out(rank);
430 if (is_wait_for_receive)
431 TRACE_smpi_recv(args.src, args.dst, args.tag);
434 void SendAction::kernel(simgrid::xbt::ReplayAction&)
436 const SendRecvParser& args = get_args();
437 int dst_traced = MPI_COMM_WORLD->group()->actor(args.partner)->get_pid();
441 new simgrid::instr::Pt2PtTIData(get_name(), args.partner, args.size, args.tag, Datatype::encode(args.datatype1)));
442 if (not TRACE_smpi_view_internals())
443 TRACE_smpi_send(get_pid(), get_pid(), dst_traced, args.tag, args.size * args.datatype1->size());
445 if (get_name() == "send") {
446 Request::send(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
447 } else if (get_name() == "isend") {
448 MPI_Request request = Request::isend(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
449 req_storage.add(request);
451 xbt_die("Don't know this action, %s", get_name().c_str());
454 TRACE_smpi_comm_out(get_pid());
457 void RecvAction::kernel(simgrid::xbt::ReplayAction&)
459 const SendRecvParser& args = get_args();
462 new simgrid::instr::Pt2PtTIData(get_name(), args.partner, args.size, args.tag, Datatype::encode(args.datatype1)));
465 // unknown size from the receiver point of view
466 double arg_size = args.size;
467 if (arg_size <= 0.0) {
468 Request::probe(args.partner, args.tag, MPI_COMM_WORLD, &status);
469 arg_size = status.count;
472 bool is_recv = false; // Help analyzers understanding that status is not used uninitialized
473 if (get_name() == "recv") {
475 Request::recv(nullptr, arg_size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD, &status);
476 } else if (get_name() == "irecv") {
477 MPI_Request request = Request::irecv(nullptr, arg_size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
478 req_storage.add(request);
483 TRACE_smpi_comm_out(get_pid());
484 if (is_recv && not TRACE_smpi_view_internals()) {
485 int src_traced = MPI_COMM_WORLD->group()->actor(status.MPI_SOURCE)->get_pid();
486 TRACE_smpi_recv(src_traced, get_pid(), args.tag);
490 void ComputeAction::kernel(simgrid::xbt::ReplayAction&)
492 const ComputeParser& args = get_args();
493 if (smpi_cfg_simulate_computation()) {
494 smpi_execute_flops(args.flops/smpi_adjust_comp_speed());
498 void SleepAction::kernel(simgrid::xbt::ReplayAction&)
500 const SleepParser& args = get_args();
501 XBT_DEBUG("Sleep for: %lf secs", args.time);
502 int rank = simgrid::s4u::this_actor::get_pid();
503 TRACE_smpi_sleeping_in(rank, args.time);
504 simgrid::s4u::this_actor::sleep_for(args.time/smpi_adjust_comp_speed());
505 TRACE_smpi_sleeping_out(rank);
508 void LocationAction::kernel(simgrid::xbt::ReplayAction&)
510 const LocationParser& args = get_args();
511 smpi_trace_set_call_location(args.filename.c_str(), args.line);
514 void TestAction::kernel(simgrid::xbt::ReplayAction&)
516 const WaitTestParser& args = get_args();
517 MPI_Request request = req_storage.find(args.src, args.dst, args.tag);
518 req_storage.remove(request);
519 // if request is null here, this may mean that a previous test has succeeded
520 // Different times in traced application and replayed version may lead to this
521 // In this case, ignore the extra calls.
522 if (request != MPI_REQUEST_NULL) {
523 TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::NoOpTIData("test"));
527 Request::test(&request, &status, &flag);
529 XBT_DEBUG("MPI_Test result: %d", flag);
530 /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now
532 if (request == MPI_REQUEST_NULL)
533 req_storage.addNullRequest(args.src, args.dst, args.tag);
535 req_storage.add(request);
537 TRACE_smpi_comm_out(get_pid());
541 void InitAction::kernel(simgrid::xbt::ReplayAction& action)
543 CHECK_ACTION_PARAMS(action, 0, 1)
544 MPI_DEFAULT_TYPE = (action.size() > 2) ? MPI_DOUBLE // default MPE datatype
545 : MPI_BYTE; // default TAU datatype
547 /* start a simulated timer */
548 smpi_process()->simulated_start();
551 void CommunicatorAction::kernel(simgrid::xbt::ReplayAction&)
556 void WaitAllAction::kernel(simgrid::xbt::ReplayAction&)
558 const unsigned int count_requests = req_storage.size();
560 if (count_requests > 0) {
561 TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::Pt2PtTIData("waitall", -1, count_requests, ""));
562 std::vector<std::pair</*sender*/int,/*recv*/int>> sender_receiver;
563 std::vector<MPI_Request> reqs;
564 req_storage.get_requests(reqs);
565 for (auto const& req : reqs) {
566 if (req && (req->flags() & MPI_REQ_RECV)) {
567 sender_receiver.emplace_back(req->src(), req->dst());
570 Request::waitall(count_requests, &(reqs.data())[0], MPI_STATUSES_IGNORE);
571 req_storage.get_store().clear();
573 for (auto const& pair : sender_receiver) {
574 TRACE_smpi_recv(pair.first, pair.second, 0);
576 TRACE_smpi_comm_out(get_pid());
580 void BarrierAction::kernel(simgrid::xbt::ReplayAction&)
582 TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::NoOpTIData("barrier"));
583 colls::barrier(MPI_COMM_WORLD);
584 TRACE_smpi_comm_out(get_pid());
587 void BcastAction::kernel(simgrid::xbt::ReplayAction&)
589 const BcastArgParser& args = get_args();
590 TRACE_smpi_comm_in(get_pid(), "action_bcast",
591 new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(args.root)->get_pid(), -1.0,
592 args.size, -1, Datatype::encode(args.datatype1), ""));
594 colls::bcast(send_buffer(args.size * args.datatype1->size()), args.size, args.datatype1, args.root, MPI_COMM_WORLD);
596 TRACE_smpi_comm_out(get_pid());
599 void ReduceAction::kernel(simgrid::xbt::ReplayAction&)
601 const ReduceArgParser& args = get_args();
602 TRACE_smpi_comm_in(get_pid(), "action_reduce",
603 new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(args.root)->get_pid(),
604 args.comp_size, args.comm_size, -1,
605 Datatype::encode(args.datatype1), ""));
607 colls::reduce(send_buffer(args.comm_size * args.datatype1->size()),
608 recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL,
609 args.root, MPI_COMM_WORLD);
610 private_execute_flops(args.comp_size);
612 TRACE_smpi_comm_out(get_pid());
615 void AllReduceAction::kernel(simgrid::xbt::ReplayAction&)
617 const AllReduceArgParser& args = get_args();
618 TRACE_smpi_comm_in(get_pid(), "action_allreduce",
619 new simgrid::instr::CollTIData("allreduce", -1, args.comp_size, args.comm_size, -1,
620 Datatype::encode(args.datatype1), ""));
622 colls::allreduce(send_buffer(args.comm_size * args.datatype1->size()),
623 recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL,
625 private_execute_flops(args.comp_size);
627 TRACE_smpi_comm_out(get_pid());
630 void AllToAllAction::kernel(simgrid::xbt::ReplayAction&)
632 const AllToAllArgParser& args = get_args();
633 TRACE_smpi_comm_in(get_pid(), "action_alltoall",
634 new simgrid::instr::CollTIData("alltoall", -1, -1.0, args.send_size, args.recv_size,
635 Datatype::encode(args.datatype1),
636 Datatype::encode(args.datatype2)));
638 colls::alltoall(send_buffer(args.send_size * args.comm_size * args.datatype1->size()), args.send_size, args.datatype1,
639 recv_buffer(args.recv_size * args.comm_size * args.datatype2->size()), args.recv_size, args.datatype2,
642 TRACE_smpi_comm_out(get_pid());
645 void GatherAction::kernel(simgrid::xbt::ReplayAction&)
647 const GatherArgParser& args = get_args();
648 TRACE_smpi_comm_in(get_pid(), get_name().c_str(),
649 new simgrid::instr::CollTIData(get_name(), (get_name() == "gather") ? args.root : -1, -1.0,
650 args.send_size, args.recv_size, Datatype::encode(args.datatype1),
651 Datatype::encode(args.datatype2)));
653 if (get_name() == "gather") {
654 int rank = MPI_COMM_WORLD->rank();
655 colls::gather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
656 (rank == args.root) ? recv_buffer(args.recv_size * args.comm_size * args.datatype2->size()) : nullptr,
657 args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
659 colls::allgather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
660 recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2,
663 TRACE_smpi_comm_out(get_pid());
666 void GatherVAction::kernel(simgrid::xbt::ReplayAction&)
668 int rank = MPI_COMM_WORLD->rank();
669 const GatherVArgParser& args = get_args();
670 TRACE_smpi_comm_in(get_pid(), get_name().c_str(),
671 new simgrid::instr::VarCollTIData(
672 get_name(), (get_name() == "gatherv") ? args.root : -1, args.send_size, nullptr, -1,
673 args.recvcounts, Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
675 if (get_name() == "gatherv") {
676 colls::gatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
677 (rank == args.root) ? recv_buffer(args.recv_size_sum * args.datatype2->size()) : nullptr,
678 args.recvcounts->data(), args.disps.data(), args.datatype2, args.root, MPI_COMM_WORLD);
680 colls::allgatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
681 recv_buffer(args.recv_size_sum * args.datatype2->size()), args.recvcounts->data(),
682 args.disps.data(), args.datatype2, MPI_COMM_WORLD);
685 TRACE_smpi_comm_out(get_pid());
688 void ScatterAction::kernel(simgrid::xbt::ReplayAction&)
690 int rank = MPI_COMM_WORLD->rank();
691 const ScatterArgParser& args = get_args();
692 TRACE_smpi_comm_in(get_pid(), "action_scatter",
693 new simgrid::instr::CollTIData(get_name(), args.root, -1.0, args.send_size, args.recv_size,
694 Datatype::encode(args.datatype1),
695 Datatype::encode(args.datatype2)));
697 colls::scatter(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
698 (rank == args.root) ? recv_buffer(args.recv_size * args.datatype2->size()) : nullptr, args.recv_size,
699 args.datatype2, args.root, MPI_COMM_WORLD);
701 TRACE_smpi_comm_out(get_pid());
704 void ScatterVAction::kernel(simgrid::xbt::ReplayAction&)
706 int rank = MPI_COMM_WORLD->rank();
707 const ScatterVArgParser& args = get_args();
708 TRACE_smpi_comm_in(get_pid(), "action_scatterv",
709 new simgrid::instr::VarCollTIData(get_name(), args.root, -1, args.sendcounts, args.recv_size,
710 nullptr, Datatype::encode(args.datatype1),
711 Datatype::encode(args.datatype2)));
713 colls::scatterv((rank == args.root) ? send_buffer(args.send_size_sum * args.datatype1->size()) : nullptr,
714 args.sendcounts->data(), args.disps.data(), args.datatype1,
715 recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, args.root,
718 TRACE_smpi_comm_out(get_pid());
721 void ReduceScatterAction::kernel(simgrid::xbt::ReplayAction&)
723 const ReduceScatterArgParser& args = get_args();
725 get_pid(), "action_reducescatter",
726 new simgrid::instr::VarCollTIData("reducescatter", -1, 0, nullptr, -1, args.recvcounts,
727 std::to_string(args.comp_size), /* ugly hack to print comp_size */
728 Datatype::encode(args.datatype1)));
730 colls::reduce_scatter(send_buffer(args.recv_size_sum * args.datatype1->size()),
731 recv_buffer(args.recv_size_sum * args.datatype1->size()), args.recvcounts->data(),
732 args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
734 private_execute_flops(args.comp_size);
735 TRACE_smpi_comm_out(get_pid());
738 void AllToAllVAction::kernel(simgrid::xbt::ReplayAction&)
740 const AllToAllVArgParser& args = get_args();
741 TRACE_smpi_comm_in(get_pid(), __func__,
742 new simgrid::instr::VarCollTIData(
743 "alltoallv", -1, args.send_size_sum, args.sendcounts, args.recv_size_sum, args.recvcounts,
744 Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
746 colls::alltoallv(send_buffer(args.send_buf_size * args.datatype1->size()), args.sendcounts->data(),
747 args.senddisps.data(), args.datatype1, recv_buffer(args.recv_buf_size * args.datatype2->size()),
748 args.recvcounts->data(), args.recvdisps.data(), args.datatype2, MPI_COMM_WORLD);
750 TRACE_smpi_comm_out(get_pid());
752 } // Replay Namespace
753 }} // namespace simgrid::smpi
// One request table per actor, indexed by actor pid: filled by the send/recv/test/wait/waitall replay actions
// registered in smpi_replay_init() and reset/drained in smpi_replay_main().
static std::unordered_map<aid_t, simgrid::smpi::replay::RequestStorage> storage;
756 /** @brief Only initialize the replay, don't do it for real */
757 void smpi_replay_init(const char* instance_id, int rank, double start_delay_flops)
759 xbt_assert(not smpi_process()->initializing());
761 simgrid::s4u::Actor::self()->set_property("instance_id", instance_id);
762 simgrid::s4u::Actor::self()->set_property("rank", std::to_string(rank));
763 simgrid::smpi::ActorExt::init();
765 smpi_process()->mark_as_initialized();
766 smpi_process()->set_replaying(true);
768 int my_proc_id = simgrid::s4u::this_actor::get_pid();
770 TRACE_smpi_init(my_proc_id, "smpi_replay_run_init");
771 xbt_replay_action_register("init", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::InitAction().execute(action); });
772 xbt_replay_action_register("finalize", [](simgrid::xbt::ReplayAction const&) { /* nothing to do */ });
773 xbt_replay_action_register("comm_size", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
774 xbt_replay_action_register("comm_split",[](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
775 xbt_replay_action_register("comm_dup", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
776 xbt_replay_action_register("send", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("send", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
777 xbt_replay_action_register("isend", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("isend", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
778 xbt_replay_action_register("recv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("recv", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
779 xbt_replay_action_register("irecv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("irecv", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
780 xbt_replay_action_register("test", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::TestAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
781 xbt_replay_action_register("wait", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
782 xbt_replay_action_register("waitall", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAllAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
783 xbt_replay_action_register("barrier", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BarrierAction().execute(action); });
784 xbt_replay_action_register("bcast", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BcastAction().execute(action); });
785 xbt_replay_action_register("reduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceAction().execute(action); });
786 xbt_replay_action_register("allreduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllReduceAction().execute(action); });
787 xbt_replay_action_register("alltoall", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllAction().execute(action); });
788 xbt_replay_action_register("alltoallv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllVAction().execute(action); });
789 xbt_replay_action_register("gather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("gather").execute(action); });
790 xbt_replay_action_register("scatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterAction().execute(action); });
791 xbt_replay_action_register("gatherv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("gatherv").execute(action); });
792 xbt_replay_action_register("scatterv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterVAction().execute(action); });
793 xbt_replay_action_register("allgather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("allgather").execute(action); });
794 xbt_replay_action_register("allgatherv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("allgatherv").execute(action); });
795 xbt_replay_action_register("reducescatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceScatterAction().execute(action); });
796 xbt_replay_action_register("compute", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ComputeAction().execute(action); });
797 xbt_replay_action_register("sleep", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SleepAction().execute(action); });
798 xbt_replay_action_register("location", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::LocationAction().execute(action); });
800 //if we have a delayed start, sleep here.
801 if (start_delay_flops > 0) {
802 XBT_VERB("Delayed start for instance - Sleeping for %f flops ", start_delay_flops);
803 private_execute_flops(start_delay_flops);
805 // Wait for the other actors to initialize also
806 simgrid::s4u::this_actor::yield();
810 /** @brief actually run the replay after initialization */
811 void smpi_replay_main(int rank, const char* trace_filename)
813 static int active_processes = 0;
815 storage[simgrid::s4u::this_actor::get_pid()] = simgrid::smpi::replay::RequestStorage();
816 std::string rank_string = std::to_string(rank);
817 simgrid::xbt::replay_runner(rank_string.c_str(), trace_filename);
819 /* and now, finalize everything */
820 /* One active process will stop. Decrease the counter*/
821 unsigned int count_requests = storage[simgrid::s4u::this_actor::get_pid()].size();
822 XBT_DEBUG("There are %ud elements in reqq[*]", count_requests);
823 if (count_requests > 0) {
824 auto* requests = new MPI_Request[count_requests];
827 for (auto const& pair : storage[simgrid::s4u::this_actor::get_pid()].get_store()) {
828 requests[i] = pair.second;
831 simgrid::smpi::Request::waitall(count_requests, requests, MPI_STATUSES_IGNORE);
836 if(active_processes==0){
837 /* Last process alive speaking: end the simulated timer */
838 XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
839 smpi_free_replay_tmp_buffers();
842 TRACE_smpi_comm_in(simgrid::s4u::this_actor::get_pid(), "smpi_replay_run_finalize",
843 new simgrid::instr::NoOpTIData("finalize"));
845 smpi_process()->finalize();
847 TRACE_smpi_comm_out(simgrid::s4u::this_actor::get_pid());
850 /** @brief chain a replay initialization and a replay start */
851 void smpi_replay_run(const char* instance_id, int rank, double start_delay_flops, const char* trace_filename)
853 smpi_replay_init(instance_id, rank, start_delay_flops);
854 smpi_replay_main(rank, trace_filename);