1 /* Copyright (c) 2009-2021. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
6 #include "smpi_coll.hpp"
7 #include "smpi_comm.hpp"
8 #include "smpi_datatype.hpp"
9 #include "smpi_group.hpp"
10 #include "smpi_request.hpp"
11 #include "xbt/replay.hpp"
12 #include <simgrid/smpi/smpi_replay.hpp>
13 #include <src/smpi/include/private.hpp>
17 #include <unordered_map>
22 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay, smpi, "Trace Replay with SMPI");
// Hashing support for std::tuple keys in std::unordered_map, adapted from
// https://stackoverflow.com/questions/7110301/generic-hash-for-tuples-in-unordered-map-unordered-set
// If other translation units ever need it, this could move to a header file.
namespace hash_tuple {
/** Primary template: any non-tuple type is hashed with std::hash. */
template <typename TT> struct hash {
  size_t operator()(TT const& tt) const { return std::hash<TT>()(tt); }
};

/** Mixes the hash of @p v into @p seed (the classic boost::hash_combine recipe). */
template <class T> inline void hash_combine(std::size_t& seed, T const& v)
{
  const std::size_t value_hash = hash_tuple::hash<T>()(v);
  seed ^= value_hash + 0x9e3779b9 + (seed << 6) + (seed >> 2);
}

/** Recursive helper (derived from Matthieu M.'s code): mixes elements 0..Index of the tuple, lowest index first. */
template <class Tuple, size_t Index = std::tuple_size<Tuple>::value - 1> struct HashValueImpl {
  static void apply(size_t& seed, Tuple const& tuple)
  {
    HashValueImpl<Tuple, Index - 1>::apply(seed, tuple); // earlier elements go first
    hash_combine(seed, std::get<Index>(tuple));
  }
};

/** End of the recursion: only element 0 is left. */
template <class Tuple> struct HashValueImpl<Tuple, 0> {
  static void apply(size_t& seed, Tuple const& tuple) { hash_combine(seed, std::get<0>(tuple)); }
};

/** Tuple specialisation: combine the hashes of every element, starting from a zero seed. */
template <typename... TT> struct hash<std::tuple<TT...>> {
  size_t operator()(std::tuple<TT...> const& tt) const
  {
    size_t combined = 0;
    HashValueImpl<std::tuple<TT...>>::apply(combined, tt);
    return combined;
  }
};
} // namespace hash_tuple
64 using req_key_t = std::tuple</*sender*/ int, /* receiver */ int, /* tag */ int>;
65 using req_storage_t = std::unordered_map<req_key_t, MPI_Request, hash_tuple::hash<std::tuple<int, int, int>>>;
67 void log_timed_action(const simgrid::xbt::ReplayAction& action, double clock)
69 if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
70 std::string s = boost::algorithm::join(action, " ");
71 XBT_VERB("%s %f", s.c_str(), smpi_process()->simulated_elapsed() - clock);
76 static double parse_double(const std::string& string)
78 return xbt_str_parse_double(string.c_str(), "%s is not a double");
85 MPI_Datatype MPI_DEFAULT_TYPE;
87 class RequestStorage {
92 RequestStorage() = default;
93 int size() const { return store.size(); }
95 req_storage_t& get_store() { return store; }
97 void get_requests(std::vector<MPI_Request>& vec) const
99 for (auto const& pair : store) {
100 auto& req = pair.second;
101 auto my_proc_id = simgrid::s4u::this_actor::get_pid();
102 if (req != MPI_REQUEST_NULL && (req->src() == my_proc_id || req->dst() == my_proc_id)) {
103 vec.push_back(pair.second);
104 pair.second->print_request("MM");
109 MPI_Request find(int src, int dst, int tag)
111 auto it = store.find(req_key_t(src, dst, tag));
112 return (it == store.end()) ? MPI_REQUEST_NULL : it->second;
115 void remove(const Request* req)
117 if (req == MPI_REQUEST_NULL) return;
119 store.erase(req_key_t(req->src()-1, req->dst()-1, req->tag()));
122 void add(MPI_Request req)
124 if (req != MPI_REQUEST_NULL) // Can and does happen in the case of TestAction
125 store.insert({req_key_t(req->src()-1, req->dst()-1, req->tag()), req});
128 /* Sometimes we need to re-insert MPI_REQUEST_NULL but we still need src,dst and tag */
129 void addNullRequest(int src, int dst, int tag)
132 {req_key_t(MPI_COMM_WORLD->group()->actor_pid(src) - 1, MPI_COMM_WORLD->group()->actor_pid(dst) - 1, tag),
137 void WaitTestParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
139 CHECK_ACTION_PARAMS(action, 3, 0)
140 src = std::stoi(action[2]);
141 dst = std::stoi(action[3]);
142 tag = std::stoi(action[4]);
145 void SendRecvParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
147 CHECK_ACTION_PARAMS(action, 3, 1)
148 partner = std::stoi(action[2]);
149 tag = std::stoi(action[3]);
150 size = parse_double(action[4]);
151 if (action.size() > 5)
152 datatype1 = simgrid::smpi::Datatype::decode(action[5]);
155 void ComputeParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
157 CHECK_ACTION_PARAMS(action, 1, 0)
158 flops = parse_double(action[2]);
161 void SleepParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
163 CHECK_ACTION_PARAMS(action, 1, 0)
164 time = parse_double(action[2]);
167 void LocationParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
169 CHECK_ACTION_PARAMS(action, 2, 0)
170 filename = std::string(action[2]);
171 line = std::stoi(action[3]);
174 void BcastArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
176 CHECK_ACTION_PARAMS(action, 1, 2)
177 size = parse_double(action[2]);
178 root = (action.size() > 3) ? std::stoi(action[3]) : 0;
179 if (action.size() > 4)
180 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
183 void ReduceArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
185 CHECK_ACTION_PARAMS(action, 2, 2)
186 comm_size = parse_double(action[2]);
187 comp_size = parse_double(action[3]);
188 root = (action.size() > 4) ? std::stoi(action[4]) : 0;
189 if (action.size() > 5)
190 datatype1 = simgrid::smpi::Datatype::decode(action[5]);
193 void AllReduceArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
195 CHECK_ACTION_PARAMS(action, 2, 1)
196 comm_size = parse_double(action[2]);
197 comp_size = parse_double(action[3]);
198 if (action.size() > 4)
199 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
202 void AllToAllArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
204 CHECK_ACTION_PARAMS(action, 2, 1)
205 comm_size = MPI_COMM_WORLD->size();
206 send_size = parse_double(action[2]);
207 recv_size = parse_double(action[3]);
209 if (action.size() > 4)
210 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
211 if (action.size() > 5)
212 datatype2 = simgrid::smpi::Datatype::decode(action[5]);
215 void GatherArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string& name)
217 /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
220 1) 68 is the sendcounts
221 2) 68 is the recvcounts
222 3) 0 is the root node
223 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
224 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
226 CHECK_ACTION_PARAMS(action, 2, 3)
227 comm_size = MPI_COMM_WORLD->size();
228 send_size = parse_double(action[2]);
229 recv_size = parse_double(action[3]);
231 if (name == "gather") {
232 root = (action.size() > 4) ? std::stoi(action[4]) : 0;
233 if (action.size() > 5)
234 datatype1 = simgrid::smpi::Datatype::decode(action[5]);
235 if (action.size() > 6)
236 datatype2 = simgrid::smpi::Datatype::decode(action[6]);
238 if (action.size() > 4)
239 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
240 if (action.size() > 5)
241 datatype2 = simgrid::smpi::Datatype::decode(action[5]);
245 void GatherVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string& name)
247 /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
248 0 gather 68 68 10 10 10 0 0 0
250 1) 68 is the sendcount
251 2) 68 10 10 10 is the recvcounts
252 3) 0 is the root node
253 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
254 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
256 comm_size = MPI_COMM_WORLD->size();
257 CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
258 send_size = parse_double(action[2]);
259 disps = std::vector<int>(comm_size, 0);
260 recvcounts = std::make_shared<std::vector<int>>(comm_size);
262 if (name == "gatherv") {
263 root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
264 if (action.size() > 4 + comm_size)
265 datatype1 = simgrid::smpi::Datatype::decode(action[4 + comm_size]);
266 if (action.size() > 5 + comm_size)
267 datatype2 = simgrid::smpi::Datatype::decode(action[5 + comm_size]);
270 /* The 3 comes from "0 gather <sendcount>", which must always be present.
271 * The + comm_size is the recvcounts array, which must also be present
273 if (action.size() > 3 + comm_size + comm_size) { /* datatype + disp are specified */
274 int datatype_index = 3 + comm_size;
275 disp_index = datatype_index + 1;
276 datatype1 = simgrid::smpi::Datatype::decode(action[datatype_index]);
277 datatype2 = simgrid::smpi::Datatype::decode(action[datatype_index]);
278 } else if (action.size() >
279 3 + comm_size + 2) { /* disps specified; datatype is not specified; use the default one */
280 disp_index = 3 + comm_size;
281 } else if (action.size() > 3 + comm_size) { /* only datatype, no disp specified */
282 int datatype_index = 3 + comm_size;
283 datatype1 = simgrid::smpi::Datatype::decode(action[datatype_index]);
284 datatype2 = simgrid::smpi::Datatype::decode(action[datatype_index]);
287 if (disp_index != 0) {
288 for (unsigned int i = 0; i < comm_size; i++)
289 disps[i] = std::stoi(action[disp_index + i]);
293 for (unsigned int i = 0; i < comm_size; i++) {
294 (*recvcounts)[i] = std::stoi(action[i + 3]);
296 recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
299 void ScatterArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
301 /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
304 1) 68 is the sendcounts
305 2) 68 is the recvcounts
306 3) 0 is the root node
307 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
308 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
310 CHECK_ACTION_PARAMS(action, 2, 3)
311 comm_size = MPI_COMM_WORLD->size();
312 send_size = parse_double(action[2]);
313 recv_size = parse_double(action[3]);
314 root = (action.size() > 4) ? std::stoi(action[4]) : 0;
315 if (action.size() > 5)
316 datatype1 = simgrid::smpi::Datatype::decode(action[5]);
317 if (action.size() > 6)
318 datatype2 = simgrid::smpi::Datatype::decode(action[6]);
321 void ScatterVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
323 /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
324 0 gather 68 10 10 10 68 0 0 0
326 1) 68 10 10 10 is the sendcounts
327 2) 68 is the recvcount
328 3) 0 is the root node
329 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
330 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
332 CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
333 recv_size = parse_double(action[2 + comm_size]);
334 disps = std::vector<int>(comm_size, 0);
335 sendcounts = std::make_shared<std::vector<int>>(comm_size);
337 if (action.size() > 5 + comm_size)
338 datatype1 = simgrid::smpi::Datatype::decode(action[4 + comm_size]);
339 if (action.size() > 5 + comm_size)
340 datatype2 = simgrid::smpi::Datatype::decode(action[5]);
342 for (unsigned int i = 0; i < comm_size; i++) {
343 (*sendcounts)[i] = std::stoi(action[i + 2]);
345 send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
346 root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
349 void ReduceScatterArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
351 /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
352 0 reducescatter 275427 275427 275427 204020 11346849 0
354 1) The first four values after the name of the action declare the recvcounts array
355 2) The value 11346849 is the amount of instructions
356 3) The last value corresponds to the datatype, see simgrid::smpi::Datatype::decode().
358 comm_size = MPI_COMM_WORLD->size();
359 CHECK_ACTION_PARAMS(action, comm_size + 1, 1)
360 comp_size = parse_double(action[2 + comm_size]);
361 recvcounts = std::make_shared<std::vector<int>>(comm_size);
362 if (action.size() > 3 + comm_size)
363 datatype1 = simgrid::smpi::Datatype::decode(action[3 + comm_size]);
365 for (unsigned int i = 0; i < comm_size; i++) {
366 recvcounts->push_back(std::stoi(action[i + 2]));
368 recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
371 void AllToAllVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
373 /* The structure of the alltoallv action for the rank 0 (total 4 processes) is the following:
374 0 alltoallv 100 1 7 10 12 100 1 70 10 5
376 1) 100 is the size of the send buffer *sizeof(int),
377 2) 1 7 10 12 is the sendcounts array
378 3) 100*sizeof(int) is the size of the receiver buffer
379 4) 1 70 10 5 is the recvcounts array
381 comm_size = MPI_COMM_WORLD->size();
382 CHECK_ACTION_PARAMS(action, 2 * comm_size + 2, 2)
383 sendcounts = std::make_shared<std::vector<int>>(comm_size);
384 recvcounts = std::make_shared<std::vector<int>>(comm_size);
385 senddisps = std::vector<int>(comm_size, 0);
386 recvdisps = std::vector<int>(comm_size, 0);
388 if (action.size() > 5 + 2 * comm_size)
389 datatype1 = simgrid::smpi::Datatype::decode(action[4 + 2 * comm_size]);
390 if (action.size() > 5 + 2 * comm_size)
391 datatype2 = simgrid::smpi::Datatype::decode(action[5 + 2 * comm_size]);
393 send_buf_size = parse_double(action[2]);
394 recv_buf_size = parse_double(action[3 + comm_size]);
395 for (unsigned int i = 0; i < comm_size; i++) {
396 (*sendcounts)[i] = std::stoi(action[3 + i]);
397 (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
399 send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
400 recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
403 void WaitAction::kernel(simgrid::xbt::ReplayAction& action)
405 std::string s = boost::algorithm::join(action, " ");
406 xbt_assert(req_storage.size(), "action wait not preceded by any irecv or isend: %s", s.c_str());
407 const WaitTestParser& args = get_args();
408 MPI_Request request = req_storage.find(args.src, args.dst, args.tag);
409 req_storage.remove(request);
411 if (request == MPI_REQUEST_NULL) {
412 /* Assume that the trace is well formed, meaning the comm might have been caught by an MPI_test. Then just
417 int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
419 // Must be taken before Request::wait() since the request may be set to
420 // MPI_REQUEST_NULL by Request::wait!
421 bool is_wait_for_receive = (request->flags() & MPI_REQ_RECV);
422 // TODO: Here we take the rank while we normally take the process id (look for get_pid())
423 TRACE_smpi_comm_in(rank, __func__, new simgrid::instr::WaitTIData(args.src, args.dst, args.tag));
426 Request::wait(&request, &status);
428 TRACE_smpi_comm_out(rank);
429 if (is_wait_for_receive)
430 TRACE_smpi_recv(args.src, args.dst, args.tag);
433 void SendAction::kernel(simgrid::xbt::ReplayAction&)
435 const SendRecvParser& args = get_args();
436 int dst_traced = MPI_COMM_WORLD->group()->actor_pid(args.partner);
440 new simgrid::instr::Pt2PtTIData(get_name(), args.partner, args.size, args.tag, Datatype::encode(args.datatype1)));
441 if (not TRACE_smpi_view_internals())
442 TRACE_smpi_send(get_pid(), get_pid(), dst_traced, args.tag, args.size * args.datatype1->size());
444 if (get_name() == "send") {
445 Request::send(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
446 } else if (get_name() == "isend") {
447 MPI_Request request = Request::isend(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
448 req_storage.add(request);
450 xbt_die("Don't know this action, %s", get_name().c_str());
453 TRACE_smpi_comm_out(get_pid());
456 void RecvAction::kernel(simgrid::xbt::ReplayAction&)
458 const SendRecvParser& args = get_args();
461 new simgrid::instr::Pt2PtTIData(get_name(), args.partner, args.size, args.tag, Datatype::encode(args.datatype1)));
464 // unknown size from the receiver point of view
465 double arg_size = args.size;
466 if (arg_size <= 0.0) {
467 Request::probe(args.partner, args.tag, MPI_COMM_WORLD, &status);
468 arg_size = status.count;
471 bool is_recv = false; // Help analyzers understanding that status is not used uninitialized
472 if (get_name() == "recv") {
474 Request::recv(nullptr, arg_size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD, &status);
475 } else if (get_name() == "irecv") {
476 MPI_Request request = Request::irecv(nullptr, arg_size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
477 req_storage.add(request);
482 TRACE_smpi_comm_out(get_pid());
483 if (is_recv && not TRACE_smpi_view_internals()) {
484 int src_traced = MPI_COMM_WORLD->group()->actor_pid(status.MPI_SOURCE);
485 TRACE_smpi_recv(src_traced, get_pid(), args.tag);
489 void ComputeAction::kernel(simgrid::xbt::ReplayAction&)
491 const ComputeParser& args = get_args();
492 if (smpi_cfg_simulate_computation()) {
493 smpi_execute_flops(args.flops/smpi_adjust_comp_speed());
497 void SleepAction::kernel(simgrid::xbt::ReplayAction&)
499 const SleepParser& args = get_args();
500 XBT_DEBUG("Sleep for: %lf secs", args.time);
501 int rank = simgrid::s4u::this_actor::get_pid();
502 TRACE_smpi_sleeping_in(rank, args.time);
503 simgrid::s4u::this_actor::sleep_for(args.time/smpi_adjust_comp_speed());
504 TRACE_smpi_sleeping_out(rank);
507 void LocationAction::kernel(simgrid::xbt::ReplayAction&)
509 const LocationParser& args = get_args();
510 smpi_trace_set_call_location(args.filename.c_str(), args.line);
513 void TestAction::kernel(simgrid::xbt::ReplayAction&)
515 const WaitTestParser& args = get_args();
516 MPI_Request request = req_storage.find(args.src, args.dst, args.tag);
517 req_storage.remove(request);
518 // if request is null here, this may mean that a previous test has succeeded
519 // Different times in traced application and replayed version may lead to this
520 // In this case, ignore the extra calls.
521 if (request != MPI_REQUEST_NULL) {
522 TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::NoOpTIData("test"));
526 Request::test(&request, &status, &flag);
528 XBT_DEBUG("MPI_Test result: %d", flag);
529 /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now
531 if (request == MPI_REQUEST_NULL)
532 req_storage.addNullRequest(args.src, args.dst, args.tag);
534 req_storage.add(request);
536 TRACE_smpi_comm_out(get_pid());
540 void InitAction::kernel(simgrid::xbt::ReplayAction& action)
542 CHECK_ACTION_PARAMS(action, 0, 1)
543 MPI_DEFAULT_TYPE = (action.size() > 2) ? MPI_DOUBLE // default MPE datatype
544 : MPI_BYTE; // default TAU datatype
546 /* start a simulated timer */
547 smpi_process()->simulated_start();
550 void CommunicatorAction::kernel(simgrid::xbt::ReplayAction&)
555 void WaitAllAction::kernel(simgrid::xbt::ReplayAction&)
557 const unsigned int count_requests = req_storage.size();
559 if (count_requests > 0) {
560 TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::Pt2PtTIData("waitall", -1, count_requests, ""));
561 std::vector<std::pair</*sender*/int,/*recv*/int>> sender_receiver;
562 std::vector<MPI_Request> reqs;
563 req_storage.get_requests(reqs);
564 for (auto const& req : reqs) {
565 if (req && (req->flags() & MPI_REQ_RECV)) {
566 sender_receiver.emplace_back(req->src(), req->dst());
569 Request::waitall(count_requests, &(reqs.data())[0], MPI_STATUSES_IGNORE);
570 req_storage.get_store().clear();
572 for (auto const& pair : sender_receiver) {
573 TRACE_smpi_recv(pair.first, pair.second, 0);
575 TRACE_smpi_comm_out(get_pid());
579 void BarrierAction::kernel(simgrid::xbt::ReplayAction&)
581 TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::NoOpTIData("barrier"));
582 colls::barrier(MPI_COMM_WORLD);
583 TRACE_smpi_comm_out(get_pid());
586 void BcastAction::kernel(simgrid::xbt::ReplayAction&)
588 const BcastArgParser& args = get_args();
589 TRACE_smpi_comm_in(get_pid(), "action_bcast",
590 new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor_pid(args.root), -1.0,
591 args.size, -1, Datatype::encode(args.datatype1), ""));
593 colls::bcast(send_buffer(args.size * args.datatype1->size()), args.size, args.datatype1, args.root, MPI_COMM_WORLD);
595 TRACE_smpi_comm_out(get_pid());
598 void ReduceAction::kernel(simgrid::xbt::ReplayAction&)
600 const ReduceArgParser& args = get_args();
601 TRACE_smpi_comm_in(get_pid(), "action_reduce",
602 new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor_pid(args.root),
603 args.comp_size, args.comm_size, -1,
604 Datatype::encode(args.datatype1), ""));
606 colls::reduce(send_buffer(args.comm_size * args.datatype1->size()),
607 recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL,
608 args.root, MPI_COMM_WORLD);
609 private_execute_flops(args.comp_size);
611 TRACE_smpi_comm_out(get_pid());
614 void AllReduceAction::kernel(simgrid::xbt::ReplayAction&)
616 const AllReduceArgParser& args = get_args();
617 TRACE_smpi_comm_in(get_pid(), "action_allreduce",
618 new simgrid::instr::CollTIData("allreduce", -1, args.comp_size, args.comm_size, -1,
619 Datatype::encode(args.datatype1), ""));
621 colls::allreduce(send_buffer(args.comm_size * args.datatype1->size()),
622 recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL,
624 private_execute_flops(args.comp_size);
626 TRACE_smpi_comm_out(get_pid());
629 void AllToAllAction::kernel(simgrid::xbt::ReplayAction&)
631 const AllToAllArgParser& args = get_args();
632 TRACE_smpi_comm_in(get_pid(), "action_alltoall",
633 new simgrid::instr::CollTIData("alltoall", -1, -1.0, args.send_size, args.recv_size,
634 Datatype::encode(args.datatype1),
635 Datatype::encode(args.datatype2)));
637 colls::alltoall(send_buffer(args.send_size * args.comm_size * args.datatype1->size()), args.send_size, args.datatype1,
638 recv_buffer(args.recv_size * args.comm_size * args.datatype2->size()), args.recv_size, args.datatype2,
641 TRACE_smpi_comm_out(get_pid());
644 void GatherAction::kernel(simgrid::xbt::ReplayAction&)
646 const GatherArgParser& args = get_args();
647 TRACE_smpi_comm_in(get_pid(), get_name().c_str(),
648 new simgrid::instr::CollTIData(get_name(), (get_name() == "gather") ? args.root : -1, -1.0,
649 args.send_size, args.recv_size, Datatype::encode(args.datatype1),
650 Datatype::encode(args.datatype2)));
652 if (get_name() == "gather") {
653 int rank = MPI_COMM_WORLD->rank();
654 colls::gather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
655 (rank == args.root) ? recv_buffer(args.recv_size * args.comm_size * args.datatype2->size()) : nullptr,
656 args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
658 colls::allgather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
659 recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2,
662 TRACE_smpi_comm_out(get_pid());
665 void GatherVAction::kernel(simgrid::xbt::ReplayAction&)
667 int rank = MPI_COMM_WORLD->rank();
668 const GatherVArgParser& args = get_args();
669 TRACE_smpi_comm_in(get_pid(), get_name().c_str(),
670 new simgrid::instr::VarCollTIData(
671 get_name(), (get_name() == "gatherv") ? args.root : -1, args.send_size, nullptr, -1,
672 args.recvcounts, Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
674 if (get_name() == "gatherv") {
675 colls::gatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
676 (rank == args.root) ? recv_buffer(args.recv_size_sum * args.datatype2->size()) : nullptr,
677 args.recvcounts->data(), args.disps.data(), args.datatype2, args.root, MPI_COMM_WORLD);
679 colls::allgatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
680 recv_buffer(args.recv_size_sum * args.datatype2->size()), args.recvcounts->data(),
681 args.disps.data(), args.datatype2, MPI_COMM_WORLD);
684 TRACE_smpi_comm_out(get_pid());
687 void ScatterAction::kernel(simgrid::xbt::ReplayAction&)
689 int rank = MPI_COMM_WORLD->rank();
690 const ScatterArgParser& args = get_args();
691 TRACE_smpi_comm_in(get_pid(), "action_scatter",
692 new simgrid::instr::CollTIData(get_name(), args.root, -1.0, args.send_size, args.recv_size,
693 Datatype::encode(args.datatype1),
694 Datatype::encode(args.datatype2)));
696 colls::scatter(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
697 (rank == args.root) ? recv_buffer(args.recv_size * args.datatype2->size()) : nullptr, args.recv_size,
698 args.datatype2, args.root, MPI_COMM_WORLD);
700 TRACE_smpi_comm_out(get_pid());
703 void ScatterVAction::kernel(simgrid::xbt::ReplayAction&)
705 int rank = MPI_COMM_WORLD->rank();
706 const ScatterVArgParser& args = get_args();
707 TRACE_smpi_comm_in(get_pid(), "action_scatterv",
708 new simgrid::instr::VarCollTIData(get_name(), args.root, -1, args.sendcounts, args.recv_size,
709 nullptr, Datatype::encode(args.datatype1),
710 Datatype::encode(args.datatype2)));
712 colls::scatterv((rank == args.root) ? send_buffer(args.send_size_sum * args.datatype1->size()) : nullptr,
713 args.sendcounts->data(), args.disps.data(), args.datatype1,
714 recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, args.root,
717 TRACE_smpi_comm_out(get_pid());
720 void ReduceScatterAction::kernel(simgrid::xbt::ReplayAction&)
722 const ReduceScatterArgParser& args = get_args();
724 get_pid(), "action_reducescatter",
725 new simgrid::instr::VarCollTIData("reducescatter", -1, 0, nullptr, -1, args.recvcounts,
726 std::to_string(args.comp_size), /* ugly hack to print comp_size */
727 Datatype::encode(args.datatype1)));
729 colls::reduce_scatter(send_buffer(args.recv_size_sum * args.datatype1->size()),
730 recv_buffer(args.recv_size_sum * args.datatype1->size()), args.recvcounts->data(),
731 args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
733 private_execute_flops(args.comp_size);
734 TRACE_smpi_comm_out(get_pid());
737 void AllToAllVAction::kernel(simgrid::xbt::ReplayAction&)
739 const AllToAllVArgParser& args = get_args();
740 TRACE_smpi_comm_in(get_pid(), __func__,
741 new simgrid::instr::VarCollTIData(
742 "alltoallv", -1, args.send_size_sum, args.sendcounts, args.recv_size_sum, args.recvcounts,
743 Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
745 colls::alltoallv(send_buffer(args.send_buf_size * args.datatype1->size()), args.sendcounts->data(),
746 args.senddisps.data(), args.datatype1, recv_buffer(args.recv_buf_size * args.datatype2->size()),
747 args.recvcounts->data(), args.recvdisps.data(), args.datatype2, MPI_COMM_WORLD);
749 TRACE_smpi_comm_out(get_pid());
751 } // Replay Namespace
752 }} // namespace simgrid::smpi
754 static std::unordered_map<aid_t, simgrid::smpi::replay::RequestStorage> storage;
755 /** @brief Only initialize the replay, don't do it for real */
756 void smpi_replay_init(const char* instance_id, int rank, double start_delay_flops)
758 xbt_assert(not smpi_process()->initializing());
760 simgrid::s4u::Actor::self()->set_property("instance_id", instance_id);
761 simgrid::s4u::Actor::self()->set_property("rank", std::to_string(rank));
762 simgrid::smpi::ActorExt::init();
764 smpi_process()->mark_as_initialized();
765 smpi_process()->set_replaying(true);
767 int my_proc_id = simgrid::s4u::this_actor::get_pid();
769 TRACE_smpi_init(my_proc_id, "smpi_replay_run_init");
770 xbt_replay_action_register("init", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::InitAction().execute(action); });
771 xbt_replay_action_register("finalize", [](simgrid::xbt::ReplayAction const&) { /* nothing to do */ });
772 xbt_replay_action_register("comm_size", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
773 xbt_replay_action_register("comm_split",[](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
774 xbt_replay_action_register("comm_dup", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
775 xbt_replay_action_register("send", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("send", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
776 xbt_replay_action_register("isend", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("isend", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
777 xbt_replay_action_register("recv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("recv", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
778 xbt_replay_action_register("irecv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("irecv", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
779 xbt_replay_action_register("test", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::TestAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
780 xbt_replay_action_register("wait", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
781 xbt_replay_action_register("waitall", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAllAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
782 xbt_replay_action_register("barrier", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BarrierAction().execute(action); });
783 xbt_replay_action_register("bcast", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BcastAction().execute(action); });
784 xbt_replay_action_register("reduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceAction().execute(action); });
785 xbt_replay_action_register("allreduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllReduceAction().execute(action); });
786 xbt_replay_action_register("alltoall", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllAction().execute(action); });
787 xbt_replay_action_register("alltoallv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllVAction().execute(action); });
788 xbt_replay_action_register("gather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("gather").execute(action); });
789 xbt_replay_action_register("scatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterAction().execute(action); });
790 xbt_replay_action_register("gatherv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("gatherv").execute(action); });
791 xbt_replay_action_register("scatterv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterVAction().execute(action); });
792 xbt_replay_action_register("allgather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("allgather").execute(action); });
793 xbt_replay_action_register("allgatherv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("allgatherv").execute(action); });
794 xbt_replay_action_register("reducescatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceScatterAction().execute(action); });
795 xbt_replay_action_register("compute", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ComputeAction().execute(action); });
796 xbt_replay_action_register("sleep", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SleepAction().execute(action); });
797 xbt_replay_action_register("location", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::LocationAction().execute(action); });
799 //if we have a delayed start, sleep here.
800 if (start_delay_flops > 0) {
801 XBT_VERB("Delayed start for instance - Sleeping for %f flops ", start_delay_flops);
802 private_execute_flops(start_delay_flops);
804 // Wait for the other actors to initialize also
805 simgrid::s4u::this_actor::yield();
809 /** @brief actually run the replay after initialization */
810 void smpi_replay_main(int rank, const char* private_trace_filename)
812 static int active_processes = 0;
814 storage[simgrid::s4u::this_actor::get_pid()] = simgrid::smpi::replay::RequestStorage();
815 std::string rank_string = std::to_string(rank);
816 simgrid::xbt::replay_runner(rank_string.c_str(), private_trace_filename);
818 /* and now, finalize everything */
819 /* One active process will stop. Decrease the counter*/
820 unsigned int count_requests = storage[simgrid::s4u::this_actor::get_pid()].size();
821 XBT_DEBUG("There are %ud elements in reqq[*]", count_requests);
822 if (count_requests > 0) {
823 std::vector<MPI_Request> requests(count_requests);
826 for (auto const& pair : storage[simgrid::s4u::this_actor::get_pid()].get_store()) {
827 requests[i] = pair.second;
830 simgrid::smpi::Request::waitall(count_requests, requests.data(), MPI_STATUSES_IGNORE);
834 if(active_processes==0){
835 /* Last process alive speaking: end the simulated timer */
836 XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
837 smpi_free_replay_tmp_buffers();
840 TRACE_smpi_comm_in(simgrid::s4u::this_actor::get_pid(), "smpi_replay_run_finalize",
841 new simgrid::instr::NoOpTIData("finalize"));
843 smpi_process()->finalize();
845 TRACE_smpi_comm_out(simgrid::s4u::this_actor::get_pid());
848 /** @brief chain a replay initialization and a replay start */
849 void smpi_replay_run(const char* instance_id, int rank, double start_delay_flops, const char* private_trace_filename)
851 smpi_replay_init(instance_id, rank, start_delay_flops);
852 smpi_replay_main(rank, private_trace_filename);