1 /* Copyright (c) 2009-2019. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
6 #include "smpi_coll.hpp"
7 #include "smpi_comm.hpp"
8 #include "smpi_datatype.hpp"
9 #include "smpi_group.hpp"
10 #include "smpi_request.hpp"
11 #include "xbt/replay.hpp"
12 #include <simgrid/smpi/replay.hpp>
13 #include <src/smpi/include/private.hpp>
17 #include <unordered_map>
22 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay, smpi, "Trace Replay with SMPI");
24 // From https://stackoverflow.com/questions/7110301/generic-hash-for-tuples-in-unordered-map-unordered-set
25 // This is all just to make std::unordered_map work with std::tuple. If we need this in other places,
26 // this could go into a header file.
27 namespace hash_tuple {
28 template <typename TT> class hash {
30 size_t operator()(TT const& tt) const { return std::hash<TT>()(tt); }
33 template <class T> inline void hash_combine(std::size_t& seed, T const& v)
35 seed ^= hash_tuple::hash<T>()(v) + 0x9e3779b9 + (seed << 6) + (seed >> 2);
38 // Recursive template code derived from Matthieu M.
39 template <class Tuple, size_t Index = std::tuple_size<Tuple>::value - 1> class HashValueImpl {
41 static void apply(size_t& seed, Tuple const& tuple)
43 HashValueImpl<Tuple, Index - 1>::apply(seed, tuple);
44 hash_combine(seed, std::get<Index>(tuple));
48 template <class Tuple> class HashValueImpl<Tuple, 0> {
50 static void apply(size_t& seed, Tuple const& tuple) { hash_combine(seed, std::get<0>(tuple)); }
53 template <typename... TT> class hash<std::tuple<TT...>> {
55 size_t operator()(std::tuple<TT...> const& tt) const
58 HashValueImpl<std::tuple<TT...>>::apply(seed, tt);
64 typedef std::tuple</*sender*/ int, /* receiver */ int, /* tag */ int> req_key_t;
65 typedef std::unordered_map<req_key_t, MPI_Request, hash_tuple::hash<std::tuple<int,int,int>>> req_storage_t;
67 void log_timed_action(simgrid::xbt::ReplayAction& action, double clock)
69 if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
70 std::string s = boost::algorithm::join(action, " ");
71 XBT_VERB("%s %f", s.c_str(), smpi_process()->simulated_elapsed() - clock);
76 static double parse_double(const std::string& string)
78 return xbt_str_parse_double(string.c_str(), "%s is not a double");
/* Default datatype of the replay. InitAction::kernel sets it to MPI_DOUBLE when the "init" line carries an extra
 * argument (default MPE datatype) and to MPI_BYTE otherwise (default TAU datatype).
 * NOTE(review): its readers (presumably the parsers' datatype defaults) are not visible here -- confirm. */
MPI_Datatype MPI_DEFAULT_TYPE;
87 class RequestStorage {
98 req_storage_t& get_store()
103 void get_requests(std::vector<MPI_Request>& vec)
105 for (auto& pair : store) {
106 auto& req = pair.second;
107 auto my_proc_id = simgrid::s4u::this_actor::get_pid();
108 if (req != MPI_REQUEST_NULL && (req->src() == my_proc_id || req->dst() == my_proc_id)) {
109 vec.push_back(pair.second);
110 pair.second->print_request("MM");
115 MPI_Request find(int src, int dst, int tag)
117 req_storage_t::iterator it = store.find(req_key_t(src, dst, tag));
118 return (it == store.end()) ? MPI_REQUEST_NULL : it->second;
121 void remove(MPI_Request req)
123 if (req == MPI_REQUEST_NULL) return;
125 store.erase(req_key_t(req->src()-1, req->dst()-1, req->tag()));
128 void add(MPI_Request req)
130 if (req != MPI_REQUEST_NULL) // Can and does happen in the case of TestAction
131 store.insert({req_key_t(req->src()-1, req->dst()-1, req->tag()), req});
134 /* Sometimes we need to re-insert MPI_REQUEST_NULL but we still need src,dst and tag */
135 void addNullRequest(int src, int dst, int tag)
137 store.insert({req_key_t(
138 MPI_COMM_WORLD->group()->actor(src)->get_pid()-1,
139 MPI_COMM_WORLD->group()->actor(dst)->get_pid()-1,
140 tag), MPI_REQUEST_NULL});
144 void WaitTestParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
146 CHECK_ACTION_PARAMS(action, 3, 0)
147 src = std::stoi(action[2]);
148 dst = std::stoi(action[3]);
149 tag = std::stoi(action[4]);
152 void SendRecvParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
154 CHECK_ACTION_PARAMS(action, 3, 1)
155 partner = std::stoi(action[2]);
156 tag = std::stoi(action[3]);
157 size = parse_double(action[4]);
158 if (action.size() > 5)
159 datatype1 = simgrid::smpi::Datatype::decode(action[5]);
162 void ComputeParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
164 CHECK_ACTION_PARAMS(action, 1, 0)
165 flops = parse_double(action[2]);
168 void SleepParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
170 CHECK_ACTION_PARAMS(action, 1, 0)
171 time = parse_double(action[2]);
174 void LocationParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
176 CHECK_ACTION_PARAMS(action, 2, 0)
177 filename = std::string(action[2]);
178 line = std::stoi(action[3]);
181 void BcastArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
183 CHECK_ACTION_PARAMS(action, 1, 2)
184 size = parse_double(action[2]);
185 root = (action.size() > 3) ? std::stoi(action[3]) : 0;
186 if (action.size() > 4)
187 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
190 void ReduceArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
192 CHECK_ACTION_PARAMS(action, 2, 2)
193 comm_size = parse_double(action[2]);
194 comp_size = parse_double(action[3]);
195 root = (action.size() > 4) ? std::stoi(action[4]) : 0;
196 if (action.size() > 5)
197 datatype1 = simgrid::smpi::Datatype::decode(action[5]);
200 void AllReduceArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
202 CHECK_ACTION_PARAMS(action, 2, 1)
203 comm_size = parse_double(action[2]);
204 comp_size = parse_double(action[3]);
205 if (action.size() > 4)
206 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
209 void AllToAllArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
211 CHECK_ACTION_PARAMS(action, 2, 1)
212 comm_size = MPI_COMM_WORLD->size();
213 send_size = parse_double(action[2]);
214 recv_size = parse_double(action[3]);
216 if (action.size() > 4)
217 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
218 if (action.size() > 5)
219 datatype2 = simgrid::smpi::Datatype::decode(action[5]);
222 void GatherArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string& name)
224 /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
227 1) 68 is the sendcounts
228 2) 68 is the recvcounts
229 3) 0 is the root node
230 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
231 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
233 CHECK_ACTION_PARAMS(action, 2, 3)
234 comm_size = MPI_COMM_WORLD->size();
235 send_size = parse_double(action[2]);
236 recv_size = parse_double(action[3]);
238 if (name == "gather") {
239 root = (action.size() > 4) ? std::stoi(action[4]) : 0;
240 if (action.size() > 5)
241 datatype1 = simgrid::smpi::Datatype::decode(action[5]);
242 if (action.size() > 6)
243 datatype2 = simgrid::smpi::Datatype::decode(action[6]);
245 if (action.size() > 4)
246 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
247 if (action.size() > 5)
248 datatype2 = simgrid::smpi::Datatype::decode(action[5]);
252 void GatherVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string& name)
254 /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
255 0 gather 68 68 10 10 10 0 0 0
257 1) 68 is the sendcount
258 2) 68 10 10 10 is the recvcounts
259 3) 0 is the root node
260 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
261 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
263 comm_size = MPI_COMM_WORLD->size();
264 CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
265 send_size = parse_double(action[2]);
266 disps = std::vector<int>(comm_size, 0);
267 recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
269 if (name == "gatherv") {
270 root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
271 if (action.size() > 4 + comm_size)
272 datatype1 = simgrid::smpi::Datatype::decode(action[4 + comm_size]);
273 if (action.size() > 5 + comm_size)
274 datatype2 = simgrid::smpi::Datatype::decode(action[5 + comm_size]);
276 int datatype_index = 0;
278 /* The 3 comes from "0 gather <sendcount>", which must always be present.
279 * The + comm_size is the recvcounts array, which must also be present
281 if (action.size() > 3 + comm_size + comm_size) { /* datatype + disp are specified */
282 datatype_index = 3 + comm_size;
283 disp_index = datatype_index + 1;
284 datatype1 = simgrid::smpi::Datatype::decode(action[datatype_index]);
285 datatype2 = simgrid::smpi::Datatype::decode(action[datatype_index]);
286 } else if (action.size() >
287 3 + comm_size + 2) { /* disps specified; datatype is not specified; use the default one */
288 disp_index = 3 + comm_size;
289 } else if (action.size() > 3 + comm_size) { /* only datatype, no disp specified */
290 datatype_index = 3 + comm_size;
291 datatype1 = simgrid::smpi::Datatype::decode(action[datatype_index]);
292 datatype2 = simgrid::smpi::Datatype::decode(action[datatype_index]);
295 if (disp_index != 0) {
296 for (unsigned int i = 0; i < comm_size; i++)
297 disps[i] = std::stoi(action[disp_index + i]);
301 for (unsigned int i = 0; i < comm_size; i++) {
302 (*recvcounts)[i] = std::stoi(action[i + 3]);
304 recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
307 void ScatterArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
309 /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
312 1) 68 is the sendcounts
313 2) 68 is the recvcounts
314 3) 0 is the root node
315 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
316 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
318 CHECK_ACTION_PARAMS(action, 2, 3)
319 comm_size = MPI_COMM_WORLD->size();
320 send_size = parse_double(action[2]);
321 recv_size = parse_double(action[3]);
322 root = (action.size() > 4) ? std::stoi(action[4]) : 0;
323 if (action.size() > 5)
324 datatype1 = simgrid::smpi::Datatype::decode(action[5]);
325 if (action.size() > 6)
326 datatype2 = simgrid::smpi::Datatype::decode(action[6]);
329 void ScatterVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
331 /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
332 0 gather 68 10 10 10 68 0 0 0
334 1) 68 10 10 10 is the sendcounts
335 2) 68 is the recvcount
336 3) 0 is the root node
337 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
338 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
340 CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
341 recv_size = parse_double(action[2 + comm_size]);
342 disps = std::vector<int>(comm_size, 0);
343 sendcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
345 if (action.size() > 5 + comm_size)
346 datatype1 = simgrid::smpi::Datatype::decode(action[4 + comm_size]);
347 if (action.size() > 5 + comm_size)
348 datatype2 = simgrid::smpi::Datatype::decode(action[5]);
350 for (unsigned int i = 0; i < comm_size; i++) {
351 (*sendcounts)[i] = std::stoi(action[i + 2]);
353 send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
354 root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
357 void ReduceScatterArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
359 /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
360 0 reducescatter 275427 275427 275427 204020 11346849 0
362 1) The first four values after the name of the action declare the recvcounts array
363 2) The value 11346849 is the amount of instructions
364 3) The last value corresponds to the datatype, see simgrid::smpi::Datatype::decode().
366 comm_size = MPI_COMM_WORLD->size();
367 CHECK_ACTION_PARAMS(action, comm_size + 1, 1)
368 comp_size = parse_double(action[2 + comm_size]);
369 recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
370 if (action.size() > 3 + comm_size)
371 datatype1 = simgrid::smpi::Datatype::decode(action[3 + comm_size]);
373 for (unsigned int i = 0; i < comm_size; i++) {
374 recvcounts->push_back(std::stoi(action[i + 2]));
376 recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
379 void AllToAllVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
381 /* The structure of the alltoallv action for the rank 0 (total 4 processes) is the following:
382 0 alltoallv 100 1 7 10 12 100 1 70 10 5
384 1) 100 is the size of the send buffer *sizeof(int),
385 2) 1 7 10 12 is the sendcounts array
386 3) 100*sizeof(int) is the size of the receiver buffer
387 4) 1 70 10 5 is the recvcounts array
389 comm_size = MPI_COMM_WORLD->size();
390 CHECK_ACTION_PARAMS(action, 2 * comm_size + 2, 2)
391 sendcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
392 recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
393 senddisps = std::vector<int>(comm_size, 0);
394 recvdisps = std::vector<int>(comm_size, 0);
396 if (action.size() > 5 + 2 * comm_size)
397 datatype1 = simgrid::smpi::Datatype::decode(action[4 + 2 * comm_size]);
398 if (action.size() > 5 + 2 * comm_size)
399 datatype2 = simgrid::smpi::Datatype::decode(action[5 + 2 * comm_size]);
401 send_buf_size = parse_double(action[2]);
402 recv_buf_size = parse_double(action[3 + comm_size]);
403 for (unsigned int i = 0; i < comm_size; i++) {
404 (*sendcounts)[i] = std::stoi(action[3 + i]);
405 (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
407 send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
408 recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
411 void WaitAction::kernel(simgrid::xbt::ReplayAction& action)
413 std::string s = boost::algorithm::join(action, " ");
414 xbt_assert(req_storage.size(), "action wait not preceded by any irecv or isend: %s", s.c_str());
415 MPI_Request request = req_storage.find(args.src, args.dst, args.tag);
416 req_storage.remove(request);
418 if (request == MPI_REQUEST_NULL) {
419 /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just
424 int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
426 // Must be taken before Request::wait() since the request may be set to
427 // MPI_REQUEST_NULL by Request::wait!
428 bool is_wait_for_receive = (request->flags() & MPI_REQ_RECV);
429 // TODO: Here we take the rank while we normally take the process id (look for my_proc_id)
430 TRACE_smpi_comm_in(rank, __func__, new simgrid::instr::WaitTIData(args.src, args.dst, args.tag));
433 Request::wait(&request, &status);
435 TRACE_smpi_comm_out(rank);
436 if (is_wait_for_receive)
437 TRACE_smpi_recv(args.src, args.dst, args.tag);
440 void SendAction::kernel(simgrid::xbt::ReplayAction&)
442 int dst_traced = MPI_COMM_WORLD->group()->actor(args.partner)->get_pid();
444 TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
445 args.tag, Datatype::encode(args.datatype1)));
446 if (not TRACE_smpi_view_internals())
447 TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, args.tag, args.size * args.datatype1->size());
449 if (name == "send") {
450 Request::send(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
451 } else if (name == "isend") {
452 MPI_Request request = Request::isend(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
453 req_storage.add(request);
455 xbt_die("Don't know this action, %s", name.c_str());
458 TRACE_smpi_comm_out(my_proc_id);
461 void RecvAction::kernel(simgrid::xbt::ReplayAction&)
463 TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
464 args.tag, Datatype::encode(args.datatype1)));
467 // unknown size from the receiver point of view
468 if (args.size <= 0.0) {
469 Request::probe(args.partner, args.tag, MPI_COMM_WORLD, &status);
470 args.size = status.count;
473 bool is_recv = false; // Help analyzers understanding that status is not used unintialized
474 if (name == "recv") {
476 Request::recv(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD, &status);
477 } else if (name == "irecv") {
478 MPI_Request request = Request::irecv(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
479 req_storage.add(request);
484 TRACE_smpi_comm_out(my_proc_id);
485 if (is_recv && not TRACE_smpi_view_internals()) {
486 int src_traced = MPI_COMM_WORLD->group()->actor(status.MPI_SOURCE)->get_pid();
487 TRACE_smpi_recv(src_traced, my_proc_id, args.tag);
491 void ComputeAction::kernel(simgrid::xbt::ReplayAction&)
493 if (simgrid::config::get_value<bool>("smpi/simulate-computation")) {
494 smpi_execute_flops(args.flops/smpi_adjust_comp_speed());
498 void SleepAction::kernel(simgrid::xbt::ReplayAction&)
500 XBT_DEBUG("Sleep for: %lf secs", args.time);
501 int rank = simgrid::s4u::this_actor::get_pid();
502 TRACE_smpi_sleeping_in(rank, args.time);
503 simgrid::s4u::this_actor::sleep_for(args.time/smpi_adjust_comp_speed());
504 TRACE_smpi_sleeping_out(rank);
507 void LocationAction::kernel(simgrid::xbt::ReplayAction&)
509 smpi_trace_set_call_location(args.filename.c_str(), args.line);
512 void TestAction::kernel(simgrid::xbt::ReplayAction&)
514 MPI_Request request = req_storage.find(args.src, args.dst, args.tag);
515 req_storage.remove(request);
516 // if request is null here, this may mean that a previous test has succeeded
517 // Different times in traced application and replayed version may lead to this
518 // In this case, ignore the extra calls.
519 if (request != MPI_REQUEST_NULL) {
520 TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::NoOpTIData("test"));
524 Request::test(&request, &status, &flag);
526 XBT_DEBUG("MPI_Test result: %d", flag);
527 /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now
529 if (request == MPI_REQUEST_NULL)
530 req_storage.addNullRequest(args.src, args.dst, args.tag);
532 req_storage.add(request);
534 TRACE_smpi_comm_out(my_proc_id);
538 void InitAction::kernel(simgrid::xbt::ReplayAction& action)
540 CHECK_ACTION_PARAMS(action, 0, 1)
541 MPI_DEFAULT_TYPE = (action.size() > 2) ? MPI_DOUBLE // default MPE datatype
542 : MPI_BYTE; // default TAU datatype
544 /* start a simulated timer */
545 smpi_process()->simulated_start();
548 void CommunicatorAction::kernel(simgrid::xbt::ReplayAction&)
553 void WaitAllAction::kernel(simgrid::xbt::ReplayAction&)
555 const unsigned int count_requests = req_storage.size();
557 if (count_requests > 0) {
558 TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData("waitall", -1, count_requests, ""));
559 std::vector<std::pair</*sender*/int,/*recv*/int>> sender_receiver;
560 std::vector<MPI_Request> reqs;
561 req_storage.get_requests(reqs);
562 for (const auto& req : reqs) {
563 if (req && (req->flags() & MPI_REQ_RECV)) {
564 sender_receiver.push_back({req->src(), req->dst()});
567 MPI_Status status[count_requests];
568 Request::waitall(count_requests, &(reqs.data())[0], status);
569 req_storage.get_store().clear();
571 for (auto& pair : sender_receiver) {
572 TRACE_smpi_recv(pair.first, pair.second, 0);
574 TRACE_smpi_comm_out(my_proc_id);
578 void BarrierAction::kernel(simgrid::xbt::ReplayAction&)
580 TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::NoOpTIData("barrier"));
581 Colls::barrier(MPI_COMM_WORLD);
582 TRACE_smpi_comm_out(my_proc_id);
585 void BcastAction::kernel(simgrid::xbt::ReplayAction&)
587 TRACE_smpi_comm_in(my_proc_id, "action_bcast",
588 new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(args.root)->get_pid(),
589 -1.0, args.size, -1, Datatype::encode(args.datatype1), ""));
591 Colls::bcast(send_buffer(args.size * args.datatype1->size()), args.size, args.datatype1, args.root, MPI_COMM_WORLD);
593 TRACE_smpi_comm_out(my_proc_id);
596 void ReduceAction::kernel(simgrid::xbt::ReplayAction&)
598 TRACE_smpi_comm_in(my_proc_id, "action_reduce",
599 new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(args.root)->get_pid(),
600 args.comp_size, args.comm_size, -1,
601 Datatype::encode(args.datatype1), ""));
603 Colls::reduce(send_buffer(args.comm_size * args.datatype1->size()),
604 recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL, args.root, MPI_COMM_WORLD);
605 private_execute_flops(args.comp_size);
607 TRACE_smpi_comm_out(my_proc_id);
610 void AllReduceAction::kernel(simgrid::xbt::ReplayAction&)
612 TRACE_smpi_comm_in(my_proc_id, "action_allreduce", new simgrid::instr::CollTIData("allreduce", -1, args.comp_size, args.comm_size, -1,
613 Datatype::encode(args.datatype1), ""));
615 Colls::allreduce(send_buffer(args.comm_size * args.datatype1->size()),
616 recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
617 private_execute_flops(args.comp_size);
619 TRACE_smpi_comm_out(my_proc_id);
622 void AllToAllAction::kernel(simgrid::xbt::ReplayAction&)
624 TRACE_smpi_comm_in(my_proc_id, "action_alltoall",
625 new simgrid::instr::CollTIData("alltoall", -1, -1.0, args.send_size, args.recv_size,
626 Datatype::encode(args.datatype1),
627 Datatype::encode(args.datatype2)));
629 Colls::alltoall(send_buffer(args.send_size * args.comm_size * args.datatype1->size()), args.send_size,
630 args.datatype1, recv_buffer(args.recv_size * args.comm_size * args.datatype2->size()),
631 args.recv_size, args.datatype2, MPI_COMM_WORLD);
633 TRACE_smpi_comm_out(my_proc_id);
636 void GatherAction::kernel(simgrid::xbt::ReplayAction&)
638 TRACE_smpi_comm_in(my_proc_id, name.c_str(), new simgrid::instr::CollTIData(name, (name == "gather") ? args.root : -1, -1.0, args.send_size, args.recv_size,
639 Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
641 if (name == "gather") {
642 int rank = MPI_COMM_WORLD->rank();
643 Colls::gather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
644 (rank == args.root) ? recv_buffer(args.recv_size * args.comm_size * args.datatype2->size()) : nullptr, args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
647 Colls::allgather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
648 recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, MPI_COMM_WORLD);
650 TRACE_smpi_comm_out(my_proc_id);
653 void GatherVAction::kernel(simgrid::xbt::ReplayAction&)
655 int rank = MPI_COMM_WORLD->rank();
657 TRACE_smpi_comm_in(my_proc_id, name.c_str(), new simgrid::instr::VarCollTIData(
658 name, (name == "gatherv") ? args.root : -1, args.send_size, nullptr, -1, args.recvcounts,
659 Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
661 if (name == "gatherv") {
662 Colls::gatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
663 (rank == args.root) ? recv_buffer(args.recv_size_sum * args.datatype2->size()) : nullptr,
664 args.recvcounts->data(), args.disps.data(), args.datatype2, args.root, MPI_COMM_WORLD);
667 Colls::allgatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
668 recv_buffer(args.recv_size_sum * args.datatype2->size()), args.recvcounts->data(),
669 args.disps.data(), args.datatype2, MPI_COMM_WORLD);
672 TRACE_smpi_comm_out(my_proc_id);
675 void ScatterAction::kernel(simgrid::xbt::ReplayAction&)
677 int rank = MPI_COMM_WORLD->rank();
678 TRACE_smpi_comm_in(my_proc_id, "action_scatter", new simgrid::instr::CollTIData(name, args.root, -1.0, args.send_size, args.recv_size,
679 Datatype::encode(args.datatype1),
680 Datatype::encode(args.datatype2)));
682 Colls::scatter(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
683 (rank == args.root) ? recv_buffer(args.recv_size * args.datatype2->size()) : nullptr, args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
685 TRACE_smpi_comm_out(my_proc_id);
688 void ScatterVAction::kernel(simgrid::xbt::ReplayAction&)
690 int rank = MPI_COMM_WORLD->rank();
691 TRACE_smpi_comm_in(my_proc_id, "action_scatterv", new simgrid::instr::VarCollTIData(name, args.root, -1, args.sendcounts, args.recv_size,
692 nullptr, Datatype::encode(args.datatype1),
693 Datatype::encode(args.datatype2)));
695 Colls::scatterv((rank == args.root) ? send_buffer(args.send_size_sum * args.datatype1->size()) : nullptr,
696 args.sendcounts->data(), args.disps.data(), args.datatype1,
697 recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, args.root,
700 TRACE_smpi_comm_out(my_proc_id);
703 void ReduceScatterAction::kernel(simgrid::xbt::ReplayAction&)
705 TRACE_smpi_comm_in(my_proc_id, "action_reducescatter",
706 new simgrid::instr::VarCollTIData("reducescatter", -1, 0, nullptr, -1, args.recvcounts,
707 std::to_string(args.comp_size), /* ugly hack to print comp_size */
708 Datatype::encode(args.datatype1)));
710 Colls::reduce_scatter(send_buffer(args.recv_size_sum * args.datatype1->size()),
711 recv_buffer(args.recv_size_sum * args.datatype1->size()), args.recvcounts->data(),
712 args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
714 private_execute_flops(args.comp_size);
715 TRACE_smpi_comm_out(my_proc_id);
718 void AllToAllVAction::kernel(simgrid::xbt::ReplayAction&)
720 TRACE_smpi_comm_in(my_proc_id, __func__,
721 new simgrid::instr::VarCollTIData(
722 "alltoallv", -1, args.send_size_sum, args.sendcounts, args.recv_size_sum, args.recvcounts,
723 Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
725 Colls::alltoallv(send_buffer(args.send_buf_size * args.datatype1->size()), args.sendcounts->data(), args.senddisps.data(), args.datatype1,
726 recv_buffer(args.recv_buf_size * args.datatype2->size()), args.recvcounts->data(), args.recvdisps.data(), args.datatype2, MPI_COMM_WORLD);
728 TRACE_smpi_comm_out(my_proc_id);
730 } // Replay Namespace
731 }} // namespace simgrid::smpi
/* Pending-request tables of the replaying actors, indexed by actor pid. smpi_replay_main resets the caller's
 * entry before replaying; the send/isend/recv/irecv/test/wait/waitall handlers look it up by the current pid. */
static std::unordered_map<aid_t, simgrid::smpi::replay::RequestStorage> storage;
734 /** @brief Only initialize the replay, don't do it for real */
735 void smpi_replay_init(const char* instance_id, int rank, double start_delay_flops)
737 xbt_assert(not smpi_process()->initializing());
739 simgrid::s4u::Actor::self()->set_property("instance_id", instance_id);
740 simgrid::s4u::Actor::self()->set_property("rank", std::to_string(rank));
741 simgrid::smpi::ActorExt::init();
743 smpi_process()->mark_as_initialized();
744 smpi_process()->set_replaying(true);
746 int my_proc_id = simgrid::s4u::this_actor::get_pid();
748 TRACE_smpi_init(my_proc_id);
749 TRACE_smpi_computing_init(my_proc_id);
750 TRACE_smpi_comm_in(my_proc_id, "smpi_replay_run_init", new simgrid::instr::NoOpTIData("init"));
751 TRACE_smpi_comm_out(my_proc_id);
752 xbt_replay_action_register("init", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::InitAction().execute(action); });
753 xbt_replay_action_register("finalize", [](simgrid::xbt::ReplayAction&) { /* nothing to do */ });
754 xbt_replay_action_register("comm_size", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
755 xbt_replay_action_register("comm_split",[](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
756 xbt_replay_action_register("comm_dup", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
757 xbt_replay_action_register("send", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("send", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
758 xbt_replay_action_register("isend", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("isend", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
759 xbt_replay_action_register("recv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("recv", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
760 xbt_replay_action_register("irecv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("irecv", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
761 xbt_replay_action_register("test", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::TestAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
762 xbt_replay_action_register("wait", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
763 xbt_replay_action_register("waitall", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAllAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
764 xbt_replay_action_register("barrier", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BarrierAction().execute(action); });
765 xbt_replay_action_register("bcast", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BcastAction().execute(action); });
766 xbt_replay_action_register("reduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceAction().execute(action); });
767 xbt_replay_action_register("allreduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllReduceAction().execute(action); });
768 xbt_replay_action_register("alltoall", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllAction().execute(action); });
769 xbt_replay_action_register("alltoallv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllVAction().execute(action); });
770 xbt_replay_action_register("gather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("gather").execute(action); });
771 xbt_replay_action_register("scatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterAction().execute(action); });
772 xbt_replay_action_register("gatherv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("gatherv").execute(action); });
773 xbt_replay_action_register("scatterv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterVAction().execute(action); });
774 xbt_replay_action_register("allgather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("allgather").execute(action); });
775 xbt_replay_action_register("allgatherv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("allgatherv").execute(action); });
776 xbt_replay_action_register("reducescatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceScatterAction().execute(action); });
777 xbt_replay_action_register("compute", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ComputeAction().execute(action); });
778 xbt_replay_action_register("sleep", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SleepAction().execute(action); });
779 xbt_replay_action_register("location", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::LocationAction().execute(action); });
781 //if we have a delayed start, sleep here.
782 if (start_delay_flops > 0) {
783 XBT_VERB("Delayed start for instance - Sleeping for %f flops ", start_delay_flops);
784 private_execute_flops(start_delay_flops);
786 // Wait for the other actors to initialize also
787 simgrid::s4u::this_actor::yield();
791 /** @brief actually run the replay after initialization */
792 void smpi_replay_main(int rank, const char* trace_filename)
794 static int active_processes = 0;
796 storage[simgrid::s4u::this_actor::get_pid()] = simgrid::smpi::replay::RequestStorage();
797 std::string rank_string = std::to_string(rank);
798 simgrid::xbt::replay_runner(rank_string.c_str(), trace_filename);
800 /* and now, finalize everything */
801 /* One active process will stop. Decrease the counter*/
802 unsigned int count_requests = storage[simgrid::s4u::this_actor::get_pid()].size();
803 XBT_DEBUG("There are %ud elements in reqq[*]", count_requests);
804 if (count_requests > 0) {
805 MPI_Request requests[count_requests];
806 MPI_Status status[count_requests];
809 for (auto const& pair : storage[simgrid::s4u::this_actor::get_pid()].get_store()) {
810 requests[i] = pair.second;
813 simgrid::smpi::Request::waitall(count_requests, requests, status);
817 if(active_processes==0){
818 /* Last process alive speaking: end the simulated timer */
819 XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
820 smpi_free_replay_tmp_buffers();
823 TRACE_smpi_comm_in(simgrid::s4u::this_actor::get_pid(), "smpi_replay_run_finalize",
824 new simgrid::instr::NoOpTIData("finalize"));
826 smpi_process()->finalize();
828 TRACE_smpi_comm_out(simgrid::s4u::this_actor::get_pid());
831 /** @brief chain a replay initialization and a replay start */
832 void smpi_replay_run(const char* instance_id, int rank, double start_delay_flops, const char* trace_filename)
834 smpi_replay_init(instance_id, rank, start_delay_flops);
835 smpi_replay_main(rank, trace_filename);