1 /* Copyright (c) 2009-2019. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
6 #include "smpi_coll.hpp"
7 #include "smpi_comm.hpp"
8 #include "smpi_datatype.hpp"
9 #include "smpi_group.hpp"
10 #include "smpi_request.hpp"
11 #include "xbt/replay.hpp"
12 #include <simgrid/smpi/replay.hpp>
13 #include <src/smpi/include/private.hpp>
17 #include <unordered_map>
22 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay, smpi, "Trace Replay with SMPI");
24 // From https://stackoverflow.com/questions/7110301/generic-hash-for-tuples-in-unordered-map-unordered-set
25 // This is all just to make std::unordered_map work with std::tuple. If we need this in other places,
26 // this could go into a header file.
namespace hash_tuple {

/** Generic hasher: for any type that is not a std::tuple, simply defer to std::hash. */
template <typename TT> struct hash {
  size_t operator()(TT const& value) const
  {
    std::hash<TT> hasher;
    return hasher(value);
  }
};

/** Mix the hash of @p v into the running @p seed. */
template <class T> inline void hash_combine(std::size_t& seed, T const& v)
{
  // The right-hand side is computed from the previous seed value, then xor-ed in.
  const std::size_t mixed = hash_tuple::hash<T>()(v) + 0x9e3779b9 + (seed << 6) + (seed >> 2);
  seed ^= mixed;
}

/** Recursive helper: combines elements 0..Index of the tuple, lowest index first. */
template <class Tuple, size_t Index = std::tuple_size<Tuple>::value - 1> struct HashValueImpl {
  static void apply(size_t& seed, Tuple const& tuple)
  {
    HashValueImpl<Tuple, Index - 1>::apply(seed, tuple);
    const auto& element = std::get<Index>(tuple);
    hash_combine(seed, element);
  }
};

/** Recursion anchor: only the first element is left to combine. */
template <class Tuple> struct HashValueImpl<Tuple, 0> {
  static void apply(size_t& seed, Tuple const& tuple)
  {
    const auto& first = std::get<0>(tuple);
    hash_combine(seed, first);
  }
};

/** Hasher specialization for std::tuple: folds every element into a seed that starts at 0. */
template <typename... TT> struct hash<std::tuple<TT...>> {
  size_t operator()(std::tuple<TT...> const& tt) const
  {
    size_t seed = 0;
    HashValueImpl<std::tuple<TT...>>::apply(seed, tt);
    return seed;
  }
};
} // namespace hash_tuple
64 typedef std::tuple</*sender*/ int, /* reciever */ int, /* tag */int> req_key_t;
65 typedef std::unordered_map<req_key_t, MPI_Request, hash_tuple::hash<std::tuple<int,int,int>>> req_storage_t;
67 void log_timed_action(simgrid::xbt::ReplayAction& action, double clock)
69 if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
70 std::string s = boost::algorithm::join(action, " ");
71 XBT_VERB("%s %f", s.c_str(), smpi_process()->simulated_elapsed() - clock);
76 static double parse_double(const std::string& string)
78 return xbt_str_parse_double(string.c_str(), "%s is not a double");
// Default datatype of the replay; assigned by InitAction::kernel (MPI_DOUBLE when the init action carries an
// extra parameter, MPI_BYTE otherwise).
85 MPI_Datatype MPI_DEFAULT_TYPE;
87 class RequestStorage {
98 req_storage_t& get_store()
103 void get_requests(std::vector<MPI_Request>& vec)
105 for (auto& pair : store) {
106 auto& req = pair.second;
107 auto my_proc_id = simgrid::s4u::this_actor::get_pid();
108 if (req != MPI_REQUEST_NULL && (req->src() == my_proc_id || req->dst() == my_proc_id)) {
109 vec.push_back(pair.second);
110 pair.second->print_request("MM");
115 MPI_Request find(int src, int dst, int tag)
117 req_storage_t::iterator it = store.find(req_key_t(src, dst, tag));
118 return (it == store.end()) ? MPI_REQUEST_NULL : it->second;
121 void remove(MPI_Request req)
123 if (req == MPI_REQUEST_NULL) return;
125 store.erase(req_key_t(req->src()-1, req->dst()-1, req->tag()));
128 void add(MPI_Request req)
130 if (req != MPI_REQUEST_NULL) // Can and does happen in the case of TestAction
131 store.insert({req_key_t(req->src()-1, req->dst()-1, req->tag()), req});
134 /* Sometimes we need to re-insert MPI_REQUEST_NULL but we still need src,dst and tag */
135 void addNullRequest(int src, int dst, int tag)
137 store.insert({req_key_t(
138 MPI_COMM_WORLD->group()->actor(src)->get_pid()-1,
139 MPI_COMM_WORLD->group()->actor(dst)->get_pid()-1,
140 tag), MPI_REQUEST_NULL});
144 void WaitTestParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
146 CHECK_ACTION_PARAMS(action, 3, 0)
147 src = std::stoi(action[2]);
148 dst = std::stoi(action[3]);
149 tag = std::stoi(action[4]);
152 void SendRecvParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
154 CHECK_ACTION_PARAMS(action, 3, 1)
155 partner = std::stoi(action[2]);
156 tag = std::stoi(action[3]);
157 size = parse_double(action[4]);
158 if (action.size() > 5)
159 datatype1 = simgrid::smpi::Datatype::decode(action[5]);
162 void ComputeParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
164 CHECK_ACTION_PARAMS(action, 1, 0)
165 flops = parse_double(action[2]);
168 void BcastArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
170 CHECK_ACTION_PARAMS(action, 1, 2)
171 size = parse_double(action[2]);
172 root = (action.size() > 3) ? std::stoi(action[3]) : 0;
173 if (action.size() > 4)
174 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
177 void ReduceArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
179 CHECK_ACTION_PARAMS(action, 2, 2)
180 comm_size = parse_double(action[2]);
181 comp_size = parse_double(action[3]);
182 root = (action.size() > 4) ? std::stoi(action[4]) : 0;
183 if (action.size() > 5)
184 datatype1 = simgrid::smpi::Datatype::decode(action[5]);
187 void AllReduceArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
189 CHECK_ACTION_PARAMS(action, 2, 1)
190 comm_size = parse_double(action[2]);
191 comp_size = parse_double(action[3]);
192 if (action.size() > 4)
193 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
196 void AllToAllArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
198 CHECK_ACTION_PARAMS(action, 2, 1)
199 comm_size = MPI_COMM_WORLD->size();
200 send_size = parse_double(action[2]);
201 recv_size = parse_double(action[3]);
203 if (action.size() > 4)
204 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
205 if (action.size() > 5)
206 datatype2 = simgrid::smpi::Datatype::decode(action[5]);
209 void GatherArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string& name)
211 /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
214 1) 68 is the sendcounts
215 2) 68 is the recvcounts
216 3) 0 is the root node
217 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
218 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
220 CHECK_ACTION_PARAMS(action, 2, 3)
221 comm_size = MPI_COMM_WORLD->size();
222 send_size = parse_double(action[2]);
223 recv_size = parse_double(action[3]);
225 if (name == "gather") {
226 root = (action.size() > 4) ? std::stoi(action[4]) : 0;
227 if (action.size() > 5)
228 datatype1 = simgrid::smpi::Datatype::decode(action[5]);
229 if (action.size() > 6)
230 datatype2 = simgrid::smpi::Datatype::decode(action[6]);
232 if (action.size() > 4)
233 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
234 if (action.size() > 5)
235 datatype2 = simgrid::smpi::Datatype::decode(action[5]);
239 void GatherVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string& name)
241 /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
242 0 gather 68 68 10 10 10 0 0 0
244 1) 68 is the sendcount
245 2) 68 10 10 10 is the recvcounts
246 3) 0 is the root node
247 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
248 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
250 comm_size = MPI_COMM_WORLD->size();
251 CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
252 send_size = parse_double(action[2]);
253 disps = std::vector<int>(comm_size, 0);
254 recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
256 if (name == "gatherv") {
257 root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
258 if (action.size() > 4 + comm_size)
259 datatype1 = simgrid::smpi::Datatype::decode(action[4 + comm_size]);
260 if (action.size() > 5 + comm_size)
261 datatype2 = simgrid::smpi::Datatype::decode(action[5 + comm_size]);
263 int datatype_index = 0;
265 /* The 3 comes from "0 gather <sendcount>", which must always be present.
266 * The + comm_size is the recvcounts array, which must also be present
268 if (action.size() > 3 + comm_size + comm_size) { /* datatype + disp are specified */
269 datatype_index = 3 + comm_size;
270 disp_index = datatype_index + 1;
271 datatype1 = simgrid::smpi::Datatype::decode(action[datatype_index]);
272 datatype2 = simgrid::smpi::Datatype::decode(action[datatype_index]);
273 } else if (action.size() >
274 3 + comm_size + 2) { /* disps specified; datatype is not specified; use the default one */
275 disp_index = 3 + comm_size;
276 } else if (action.size() > 3 + comm_size) { /* only datatype, no disp specified */
277 datatype_index = 3 + comm_size;
278 datatype1 = simgrid::smpi::Datatype::decode(action[datatype_index]);
279 datatype2 = simgrid::smpi::Datatype::decode(action[datatype_index]);
282 if (disp_index != 0) {
283 for (unsigned int i = 0; i < comm_size; i++)
284 disps[i] = std::stoi(action[disp_index + i]);
288 for (unsigned int i = 0; i < comm_size; i++) {
289 (*recvcounts)[i] = std::stoi(action[i + 3]);
291 recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
294 void ScatterArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
296 /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
299 1) 68 is the sendcounts
300 2) 68 is the recvcounts
301 3) 0 is the root node
302 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
303 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
305 CHECK_ACTION_PARAMS(action, 2, 3)
306 comm_size = MPI_COMM_WORLD->size();
307 send_size = parse_double(action[2]);
308 recv_size = parse_double(action[3]);
309 root = (action.size() > 4) ? std::stoi(action[4]) : 0;
310 if (action.size() > 5)
311 datatype1 = simgrid::smpi::Datatype::decode(action[5]);
312 if (action.size() > 6)
313 datatype2 = simgrid::smpi::Datatype::decode(action[6]);
316 void ScatterVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
318 /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
319 0 gather 68 10 10 10 68 0 0 0
321 1) 68 10 10 10 is the sendcounts
322 2) 68 is the recvcount
323 3) 0 is the root node
324 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
325 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
327 CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
328 recv_size = parse_double(action[2 + comm_size]);
329 disps = std::vector<int>(comm_size, 0);
330 sendcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
332 if (action.size() > 5 + comm_size)
333 datatype1 = simgrid::smpi::Datatype::decode(action[4 + comm_size]);
334 if (action.size() > 5 + comm_size)
335 datatype2 = simgrid::smpi::Datatype::decode(action[5]);
337 for (unsigned int i = 0; i < comm_size; i++) {
338 (*sendcounts)[i] = std::stoi(action[i + 2]);
340 send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
341 root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
344 void ReduceScatterArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
346 /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
347 0 reducescatter 275427 275427 275427 204020 11346849 0
349 1) The first four values after the name of the action declare the recvcounts array
350 2) The value 11346849 is the amount of instructions
351 3) The last value corresponds to the datatype, see simgrid::smpi::Datatype::decode().
353 comm_size = MPI_COMM_WORLD->size();
354 CHECK_ACTION_PARAMS(action, comm_size + 1, 1)
355 comp_size = parse_double(action[2 + comm_size]);
356 recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
357 if (action.size() > 3 + comm_size)
358 datatype1 = simgrid::smpi::Datatype::decode(action[3 + comm_size]);
360 for (unsigned int i = 0; i < comm_size; i++) {
361 recvcounts->push_back(std::stoi(action[i + 2]));
363 recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
366 void AllToAllVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
368 /* The structure of the alltoallv action for the rank 0 (total 4 processes) is the following:
369 0 alltoallv 100 1 7 10 12 100 1 70 10 5
371 1) 100 is the size of the send buffer *sizeof(int),
372 2) 1 7 10 12 is the sendcounts array
373 3) 100*sizeof(int) is the size of the receiver buffer
374 4) 1 70 10 5 is the recvcounts array
376 comm_size = MPI_COMM_WORLD->size();
377 CHECK_ACTION_PARAMS(action, 2 * comm_size + 2, 2)
378 sendcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
379 recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
380 senddisps = std::vector<int>(comm_size, 0);
381 recvdisps = std::vector<int>(comm_size, 0);
383 if (action.size() > 5 + 2 * comm_size)
384 datatype1 = simgrid::smpi::Datatype::decode(action[4 + 2 * comm_size]);
385 if (action.size() > 5 + 2 * comm_size)
386 datatype2 = simgrid::smpi::Datatype::decode(action[5 + 2 * comm_size]);
388 send_buf_size = parse_double(action[2]);
389 recv_buf_size = parse_double(action[3 + comm_size]);
390 for (unsigned int i = 0; i < comm_size; i++) {
391 (*sendcounts)[i] = std::stoi(action[3 + i]);
392 (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
394 send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
395 recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
398 void WaitAction::kernel(simgrid::xbt::ReplayAction& action)
400 std::string s = boost::algorithm::join(action, " ");
401 xbt_assert(req_storage.size(), "action wait not preceded by any irecv or isend: %s", s.c_str());
402 MPI_Request request = req_storage.find(args.src, args.dst, args.tag);
403 req_storage.remove(request);
405 if (request == MPI_REQUEST_NULL) {
406 /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just
411 int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
413 // Must be taken before Request::wait() since the request may be set to
414 // MPI_REQUEST_NULL by Request::wait!
415 bool is_wait_for_receive = (request->flags() & MPI_REQ_RECV);
416 // TODO: Here we take the rank while we normally take the process id (look for my_proc_id)
417 TRACE_smpi_comm_in(rank, __func__, new simgrid::instr::WaitTIData(args.src, args.dst, args.tag));
420 Request::wait(&request, &status);
422 TRACE_smpi_comm_out(rank);
423 if (is_wait_for_receive)
424 TRACE_smpi_recv(args.src, args.dst, args.tag);
427 void SendAction::kernel(simgrid::xbt::ReplayAction&)
429 int dst_traced = MPI_COMM_WORLD->group()->actor(args.partner)->get_pid();
431 TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
432 args.tag, Datatype::encode(args.datatype1)));
433 if (not TRACE_smpi_view_internals())
434 TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, args.tag, args.size * args.datatype1->size());
436 if (name == "send") {
437 Request::send(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
438 } else if (name == "isend") {
439 MPI_Request request = Request::isend(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
440 req_storage.add(request);
442 xbt_die("Don't know this action, %s", name.c_str());
445 TRACE_smpi_comm_out(my_proc_id);
448 void RecvAction::kernel(simgrid::xbt::ReplayAction&)
450 TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
451 args.tag, Datatype::encode(args.datatype1)));
454 // unknown size from the receiver point of view
455 if (args.size <= 0.0) {
456 Request::probe(args.partner, args.tag, MPI_COMM_WORLD, &status);
457 args.size = status.count;
460 bool is_recv = false; // Help analyzers understanding that status is not used unintialized
461 if (name == "recv") {
463 Request::recv(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD, &status);
464 } else if (name == "irecv") {
465 MPI_Request request = Request::irecv(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
466 req_storage.add(request);
471 TRACE_smpi_comm_out(my_proc_id);
472 if (is_recv && not TRACE_smpi_view_internals()) {
473 int src_traced = MPI_COMM_WORLD->group()->actor(status.MPI_SOURCE)->get_pid();
474 TRACE_smpi_recv(src_traced, my_proc_id, args.tag);
478 void ComputeAction::kernel(simgrid::xbt::ReplayAction&)
480 smpi_execute_flops(args.flops);
483 void TestAction::kernel(simgrid::xbt::ReplayAction&)
485 MPI_Request request = req_storage.find(args.src, args.dst, args.tag);
486 req_storage.remove(request);
487 // if request is null here, this may mean that a previous test has succeeded
488 // Different times in traced application and replayed version may lead to this
489 // In this case, ignore the extra calls.
490 if (request != MPI_REQUEST_NULL) {
491 TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::NoOpTIData("test"));
495 Request::test(&request, &status, &flag);
497 XBT_DEBUG("MPI_Test result: %d", flag);
498 /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now
500 if (request == MPI_REQUEST_NULL)
501 req_storage.addNullRequest(args.src, args.dst, args.tag);
503 req_storage.add(request);
505 TRACE_smpi_comm_out(my_proc_id);
509 void InitAction::kernel(simgrid::xbt::ReplayAction& action)
511 CHECK_ACTION_PARAMS(action, 0, 1)
512 MPI_DEFAULT_TYPE = (action.size() > 2) ? MPI_DOUBLE // default MPE datatype
513 : MPI_BYTE; // default TAU datatype
515 /* start a simulated timer */
516 smpi_process()->simulated_start();
// Kernel shared by the "comm_size", "comm_split" and "comm_dup" actions (see their registration in
// smpi_replay_init).
// NOTE(review): the body is not part of the visible excerpt — confirm what, if anything, it does.
519 void CommunicatorAction::kernel(simgrid::xbt::ReplayAction&)
524 void WaitAllAction::kernel(simgrid::xbt::ReplayAction&)
526 const unsigned int count_requests = req_storage.size();
528 if (count_requests > 0) {
529 TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData("waitall", -1, count_requests, ""));
530 std::vector<std::pair</*sender*/int,/*recv*/int>> sender_receiver;
531 std::vector<MPI_Request> reqs;
532 req_storage.get_requests(reqs);
533 for (const auto& req : reqs) {
534 if (req && (req->flags() & MPI_REQ_RECV)) {
535 sender_receiver.push_back({req->src(), req->dst()});
538 MPI_Status status[count_requests];
539 Request::waitall(count_requests, &(reqs.data())[0], status);
540 req_storage.get_store().clear();
542 for (auto& pair : sender_receiver) {
543 TRACE_smpi_recv(pair.first, pair.second, 0);
545 TRACE_smpi_comm_out(my_proc_id);
549 void BarrierAction::kernel(simgrid::xbt::ReplayAction&)
551 TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::NoOpTIData("barrier"));
552 Colls::barrier(MPI_COMM_WORLD);
553 TRACE_smpi_comm_out(my_proc_id);
556 void BcastAction::kernel(simgrid::xbt::ReplayAction&)
558 TRACE_smpi_comm_in(my_proc_id, "action_bcast",
559 new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(args.root)->get_pid(),
560 -1.0, args.size, -1, Datatype::encode(args.datatype1), ""));
562 Colls::bcast(send_buffer(args.size * args.datatype1->size()), args.size, args.datatype1, args.root, MPI_COMM_WORLD);
564 TRACE_smpi_comm_out(my_proc_id);
567 void ReduceAction::kernel(simgrid::xbt::ReplayAction&)
569 TRACE_smpi_comm_in(my_proc_id, "action_reduce",
570 new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(args.root)->get_pid(),
571 args.comp_size, args.comm_size, -1,
572 Datatype::encode(args.datatype1), ""));
574 Colls::reduce(send_buffer(args.comm_size * args.datatype1->size()),
575 recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL, args.root, MPI_COMM_WORLD);
576 private_execute_flops(args.comp_size);
578 TRACE_smpi_comm_out(my_proc_id);
581 void AllReduceAction::kernel(simgrid::xbt::ReplayAction&)
583 TRACE_smpi_comm_in(my_proc_id, "action_allreduce", new simgrid::instr::CollTIData("allreduce", -1, args.comp_size, args.comm_size, -1,
584 Datatype::encode(args.datatype1), ""));
586 Colls::allreduce(send_buffer(args.comm_size * args.datatype1->size()),
587 recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
588 private_execute_flops(args.comp_size);
590 TRACE_smpi_comm_out(my_proc_id);
593 void AllToAllAction::kernel(simgrid::xbt::ReplayAction&)
595 TRACE_smpi_comm_in(my_proc_id, "action_alltoall",
596 new simgrid::instr::CollTIData("alltoall", -1, -1.0, args.send_size, args.recv_size,
597 Datatype::encode(args.datatype1),
598 Datatype::encode(args.datatype2)));
600 Colls::alltoall(send_buffer(args.send_size * args.comm_size * args.datatype1->size()), args.send_size,
601 args.datatype1, recv_buffer(args.recv_size * args.comm_size * args.datatype2->size()),
602 args.recv_size, args.datatype2, MPI_COMM_WORLD);
604 TRACE_smpi_comm_out(my_proc_id);
607 void GatherAction::kernel(simgrid::xbt::ReplayAction&)
609 TRACE_smpi_comm_in(my_proc_id, name.c_str(), new simgrid::instr::CollTIData(name, (name == "gather") ? args.root : -1, -1.0, args.send_size, args.recv_size,
610 Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
612 if (name == "gather") {
613 int rank = MPI_COMM_WORLD->rank();
614 Colls::gather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
615 (rank == args.root) ? recv_buffer(args.recv_size * args.comm_size * args.datatype2->size()) : nullptr, args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
618 Colls::allgather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
619 recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, MPI_COMM_WORLD);
621 TRACE_smpi_comm_out(my_proc_id);
624 void GatherVAction::kernel(simgrid::xbt::ReplayAction&)
626 int rank = MPI_COMM_WORLD->rank();
628 TRACE_smpi_comm_in(my_proc_id, name.c_str(), new simgrid::instr::VarCollTIData(
629 name, (name == "gatherv") ? args.root : -1, args.send_size, nullptr, -1, args.recvcounts,
630 Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
632 if (name == "gatherv") {
633 Colls::gatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
634 (rank == args.root) ? recv_buffer(args.recv_size_sum * args.datatype2->size()) : nullptr,
635 args.recvcounts->data(), args.disps.data(), args.datatype2, args.root, MPI_COMM_WORLD);
638 Colls::allgatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
639 recv_buffer(args.recv_size_sum * args.datatype2->size()), args.recvcounts->data(),
640 args.disps.data(), args.datatype2, MPI_COMM_WORLD);
643 TRACE_smpi_comm_out(my_proc_id);
646 void ScatterAction::kernel(simgrid::xbt::ReplayAction&)
648 int rank = MPI_COMM_WORLD->rank();
649 TRACE_smpi_comm_in(my_proc_id, "action_scatter", new simgrid::instr::CollTIData(name, args.root, -1.0, args.send_size, args.recv_size,
650 Datatype::encode(args.datatype1),
651 Datatype::encode(args.datatype2)));
653 Colls::scatter(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
654 (rank == args.root) ? recv_buffer(args.recv_size * args.datatype2->size()) : nullptr, args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
656 TRACE_smpi_comm_out(my_proc_id);
659 void ScatterVAction::kernel(simgrid::xbt::ReplayAction&)
661 int rank = MPI_COMM_WORLD->rank();
662 TRACE_smpi_comm_in(my_proc_id, "action_scatterv", new simgrid::instr::VarCollTIData(name, args.root, -1, args.sendcounts, args.recv_size,
663 nullptr, Datatype::encode(args.datatype1),
664 Datatype::encode(args.datatype2)));
666 Colls::scatterv((rank == args.root) ? send_buffer(args.send_size_sum * args.datatype1->size()) : nullptr,
667 args.sendcounts->data(), args.disps.data(), args.datatype1,
668 recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, args.root,
671 TRACE_smpi_comm_out(my_proc_id);
674 void ReduceScatterAction::kernel(simgrid::xbt::ReplayAction&)
676 TRACE_smpi_comm_in(my_proc_id, "action_reducescatter",
677 new simgrid::instr::VarCollTIData("reducescatter", -1, 0, nullptr, -1, args.recvcounts,
678 std::to_string(args.comp_size), /* ugly hack to print comp_size */
679 Datatype::encode(args.datatype1)));
681 Colls::reduce_scatter(send_buffer(args.recv_size_sum * args.datatype1->size()),
682 recv_buffer(args.recv_size_sum * args.datatype1->size()), args.recvcounts->data(),
683 args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
685 private_execute_flops(args.comp_size);
686 TRACE_smpi_comm_out(my_proc_id);
689 void AllToAllVAction::kernel(simgrid::xbt::ReplayAction&)
691 TRACE_smpi_comm_in(my_proc_id, __func__,
692 new simgrid::instr::VarCollTIData(
693 "alltoallv", -1, args.send_size_sum, args.sendcounts, args.recv_size_sum, args.recvcounts,
694 Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
696 Colls::alltoallv(send_buffer(args.send_buf_size * args.datatype1->size()), args.sendcounts->data(), args.senddisps.data(), args.datatype1,
697 recv_buffer(args.recv_buf_size * args.datatype2->size()), args.recvcounts->data(), args.recvdisps.data(), args.datatype2, MPI_COMM_WORLD);
699 TRACE_smpi_comm_out(my_proc_id);
701 } // Replay Namespace
702 }} // namespace simgrid::smpi
// One RequestStorage per actor, indexed by the actor's pid. The point-to-point action handlers look their
// entry up with this_actor::get_pid(), and smpi_replay_main resets the calling actor's entry before replaying.
704 static std::unordered_map<aid_t, simgrid::smpi::replay::RequestStorage> storage;
705 /** @brief Only initialize the replay, don't do it for real */
// Prepare the calling actor for a trace replay without running it: tag the actor with its instance id and
// rank, initialize its SMPI state, emit the "init" tracing events and register one handler per action name.
// instance_id and rank are stored as actor properties; start_delay_flops, when positive, is executed as a
// computation at the end of the initialization.
706 void smpi_replay_init(const char* instance_id, int rank, double start_delay_flops)
// Initializing twice (or concurrently) is a programming error.
708 xbt_assert(not smpi_process()->initializing());
710 simgrid::s4u::Actor::self()->set_property("instance_id", instance_id);
711 simgrid::s4u::Actor::self()->set_property("rank", std::to_string(rank));
712 simgrid::smpi::ActorExt::init();
714 smpi_process()->mark_as_initialized();
715 smpi_process()->set_replaying(true);
717 int my_proc_id = simgrid::s4u::this_actor::get_pid();
719 TRACE_smpi_init(my_proc_id);
720 TRACE_smpi_computing_init(my_proc_id);
721 TRACE_smpi_comm_in(my_proc_id, "smpi_replay_run_init", new simgrid::instr::NoOpTIData("init"));
722 TRACE_smpi_comm_out(my_proc_id);
// Action handlers: each lambda builds a fresh action object and executes it on the parsed trace line.
723 xbt_replay_action_register("init", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::InitAction().execute(action); });
724 xbt_replay_action_register("finalize", [](simgrid::xbt::ReplayAction&) { /* nothing to do */ });
// The three communicator actions share the same CommunicatorAction handler.
725 xbt_replay_action_register("comm_size", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
726 xbt_replay_action_register("comm_split",[](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
727 xbt_replay_action_register("comm_dup", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
// Point-to-point actions are handed the calling actor's own request storage (looked up by pid).
728 xbt_replay_action_register("send", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("send", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
729 xbt_replay_action_register("isend", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("isend", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
730 xbt_replay_action_register("recv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("recv", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
731 xbt_replay_action_register("irecv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("irecv", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
732 xbt_replay_action_register("test", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::TestAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
733 xbt_replay_action_register("wait", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
734 xbt_replay_action_register("waitall", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAllAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
// Collective actions; gather/allgather and gatherv/allgatherv share a class and are told apart by name.
735 xbt_replay_action_register("barrier", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BarrierAction().execute(action); });
736 xbt_replay_action_register("bcast", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BcastAction().execute(action); });
737 xbt_replay_action_register("reduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceAction().execute(action); });
738 xbt_replay_action_register("allreduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllReduceAction().execute(action); });
739 xbt_replay_action_register("alltoall", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllAction().execute(action); });
740 xbt_replay_action_register("alltoallv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllVAction().execute(action); });
741 xbt_replay_action_register("gather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("gather").execute(action); });
742 xbt_replay_action_register("scatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterAction().execute(action); });
743 xbt_replay_action_register("gatherv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("gatherv").execute(action); });
744 xbt_replay_action_register("scatterv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterVAction().execute(action); });
745 xbt_replay_action_register("allgather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("allgather").execute(action); });
746 xbt_replay_action_register("allgatherv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("allgatherv").execute(action); });
747 xbt_replay_action_register("reducescatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceScatterAction().execute(action); });
748 xbt_replay_action_register("compute", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ComputeAction().execute(action); });
750 //if we have a delayed start, sleep here.
751 if (start_delay_flops > 0) {
752 XBT_VERB("Delayed start for instance - Sleeping for %f flops ", start_delay_flops);
753 private_execute_flops(start_delay_flops);
// NOTE(review): the lines closing the branch above are not visible in this excerpt; presumably the yield
// below is the alternative taken when there is no start delay — confirm against the full file.
755 // Wait for the other actors to initialize also
756 simgrid::s4u::this_actor::yield();
760 /** @brief actually run the replay after initialization */
761 void smpi_replay_main(int rank, const char* trace_filename)
763 static int active_processes = 0;
765 storage[simgrid::s4u::this_actor::get_pid()] = simgrid::smpi::replay::RequestStorage();
766 std::string rank_string = std::to_string(rank);
767 simgrid::xbt::replay_runner(rank_string.c_str(), trace_filename);
769 /* and now, finalize everything */
770 /* One active process will stop. Decrease the counter*/
771 unsigned int count_requests = storage[simgrid::s4u::this_actor::get_pid()].size();
772 XBT_DEBUG("There are %ud elements in reqq[*]", count_requests);
773 if (count_requests > 0) {
774 MPI_Request requests[count_requests];
775 MPI_Status status[count_requests];
778 for (auto const& pair : storage[simgrid::s4u::this_actor::get_pid()].get_store()) {
779 requests[i] = pair.second;
782 simgrid::smpi::Request::waitall(count_requests, requests, status);
786 if(active_processes==0){
787 /* Last process alive speaking: end the simulated timer */
788 XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
789 smpi_free_replay_tmp_buffers();
792 TRACE_smpi_comm_in(simgrid::s4u::this_actor::get_pid(), "smpi_replay_run_finalize",
793 new simgrid::instr::NoOpTIData("finalize"));
795 smpi_process()->finalize();
797 TRACE_smpi_comm_out(simgrid::s4u::this_actor::get_pid());
798 TRACE_smpi_finalize(simgrid::s4u::this_actor::get_pid());
801 /** @brief chain a replay initialization and a replay start */
802 void smpi_replay_run(const char* instance_id, int rank, double start_delay_flops, const char* trace_filename)
804 smpi_replay_init(instance_id, rank, start_delay_flops);
805 smpi_replay_main(rank, trace_filename);