1 /* Copyright (c) 2009-2018. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
6 #include "smpi_coll.hpp"
7 #include "smpi_comm.hpp"
8 #include "smpi_datatype.hpp"
9 #include "smpi_group.hpp"
10 #include "smpi_request.hpp"
11 #include "xbt/replay.hpp"
12 #include <simgrid/smpi/replay.hpp>
13 #include <src/smpi/include/private.hpp>
17 #include <unordered_map>
21 // From https://stackoverflow.com/questions/7110301/generic-hash-for-tuples-in-unordered-map-unordered-set
22 // This is all just to make std::unordered_map work with std::tuple. If we need this in other places,
23 // this could go into a header file.
namespace hash_tuple {

/** Default hasher: simply forwards to std::hash<TT>. Specialized below for std::tuple. */
template <typename TT> class hash {
public:
  size_t operator()(TT const& tt) const { return std::hash<TT>()(tt); }
};

/** Mix the hash of @p v into the running value @p seed. */
template <class T> inline void hash_combine(std::size_t& seed, T const& v)
{
  seed ^= hash_tuple::hash<T>()(v) + 0x9e3779b9 + (seed << 6) + (seed >> 2);
}

// Recursive template code derived from Matthieu M.
/** Combines the hashes of tuple elements 0..Index (in that order) into the seed. */
template <class Tuple, size_t Index = std::tuple_size<Tuple>::value - 1> class HashValueImpl {
public:
  static void apply(size_t& seed, Tuple const& tuple)
  {
    // Hash the lower-indexed elements first, then fold in element number Index.
    HashValueImpl<Tuple, Index - 1>::apply(seed, tuple);
    hash_combine(seed, std::get<Index>(tuple));
  }
};

/** End of the recursion: only element 0 is left to be combined. */
template <class Tuple> class HashValueImpl<Tuple, 0> {
public:
  static void apply(size_t& seed, Tuple const& tuple) { hash_combine(seed, std::get<0>(tuple)); }
};

/** Hasher for std::tuple: starts from a zero seed and combines every element in index order. */
template <typename... TT> class hash<std::tuple<TT...>> {
public:
  size_t operator()(std::tuple<TT...> const& tt) const
  {
    size_t seed = 0;
    HashValueImpl<std::tuple<TT...>>::apply(seed, tt);
    return seed;
  }
};
} // namespace hash_tuple
61 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
63 typedef std::tuple</*sender*/ int, /* reciever */ int, /* tag */int> req_key_t;
64 typedef std::unordered_map<req_key_t, MPI_Request, hash_tuple::hash<std::tuple<int,int,int>>> req_storage_t;
67 static void log_timed_action(simgrid::xbt::ReplayAction& action, double clock)
69 if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
70 std::string s = boost::algorithm::join(action, " ");
71 XBT_VERB("%s %f", s.c_str(), smpi_process()->simulated_elapsed() - clock);
76 static double parse_double(std::string string)
78 return xbt_str_parse_double(string.c_str(), "%s is not a double");
/* Set by InitAction::kernel(): MPI_DOUBLE when the init line carries an extra argument, MPI_BYTE otherwise. */
MPI_Datatype MPI_DEFAULT_TYPE;
87 class RequestStorage {
98 req_storage_t& get_store()
103 void get_requests(std::vector<MPI_Request>& vec)
105 for (auto& pair : store) {
106 auto& req = pair.second;
107 auto my_proc_id = simgrid::s4u::this_actor::get_pid();
108 if (req != MPI_REQUEST_NULL && (req->src() == my_proc_id || req->dst() == my_proc_id)) {
109 vec.push_back(pair.second);
110 pair.second->print_request("MM");
115 MPI_Request find(int src, int dst, int tag)
117 req_storage_t::iterator it = store.find(req_key_t(src, dst, tag));
118 return (it == store.end()) ? MPI_REQUEST_NULL : it->second;
121 void remove(MPI_Request req)
123 if (req == MPI_REQUEST_NULL) return;
125 store.erase(req_key_t(req->src()-1, req->dst()-1, req->tag()));
128 void add(MPI_Request req)
130 if (req != MPI_REQUEST_NULL) // Can and does happen in the case of TestAction
131 store.insert({req_key_t(req->src()-1, req->dst()-1, req->tag()), req});
134 /* Sometimes we need to re-insert MPI_REQUEST_NULL but we still need src,dst and tag */
135 void addNullRequest(int src, int dst, int tag)
137 store.insert({req_key_t(src, dst, tag), MPI_REQUEST_NULL});
141 void WaitTestParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
143 CHECK_ACTION_PARAMS(action, 3, 0)
144 src = std::stoi(action[2]);
145 dst = std::stoi(action[3]);
146 tag = std::stoi(action[4]);
149 void SendRecvParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
151 CHECK_ACTION_PARAMS(action, 3, 1)
152 partner = std::stoi(action[2]);
153 tag = std::stoi(action[3]);
154 size = parse_double(action[4]);
155 if (action.size() > 5)
156 datatype1 = simgrid::smpi::Datatype::decode(action[5]);
159 void ComputeParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
161 CHECK_ACTION_PARAMS(action, 1, 0)
162 flops = parse_double(action[2]);
165 void BcastArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
167 CHECK_ACTION_PARAMS(action, 1, 2)
168 size = parse_double(action[2]);
169 root = (action.size() > 3) ? std::stoi(action[3]) : 0;
170 if (action.size() > 4)
171 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
174 void ReduceArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
176 CHECK_ACTION_PARAMS(action, 2, 2)
177 comm_size = parse_double(action[2]);
178 comp_size = parse_double(action[3]);
179 root = (action.size() > 4) ? std::stoi(action[4]) : 0;
180 if (action.size() > 5)
181 datatype1 = simgrid::smpi::Datatype::decode(action[5]);
184 void AllReduceArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
186 CHECK_ACTION_PARAMS(action, 2, 1)
187 comm_size = parse_double(action[2]);
188 comp_size = parse_double(action[3]);
189 if (action.size() > 4)
190 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
193 void AllToAllArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
195 CHECK_ACTION_PARAMS(action, 2, 1)
196 comm_size = MPI_COMM_WORLD->size();
197 send_size = parse_double(action[2]);
198 recv_size = parse_double(action[3]);
200 if (action.size() > 4)
201 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
202 if (action.size() > 5)
203 datatype2 = simgrid::smpi::Datatype::decode(action[5]);
206 void GatherArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
208 /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
211 1) 68 is the sendcounts
212 2) 68 is the recvcounts
213 3) 0 is the root node
214 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
215 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
217 CHECK_ACTION_PARAMS(action, 2, 3)
218 comm_size = MPI_COMM_WORLD->size();
219 send_size = parse_double(action[2]);
220 recv_size = parse_double(action[3]);
222 if (name == "gather") {
223 root = (action.size() > 4) ? std::stoi(action[4]) : 0;
224 if (action.size() > 5)
225 datatype1 = simgrid::smpi::Datatype::decode(action[5]);
226 if (action.size() > 6)
227 datatype2 = simgrid::smpi::Datatype::decode(action[6]);
229 if (action.size() > 4)
230 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
231 if (action.size() > 5)
232 datatype2 = simgrid::smpi::Datatype::decode(action[5]);
236 void GatherVArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
238 /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
239 0 gather 68 68 10 10 10 0 0 0
241 1) 68 is the sendcount
242 2) 68 10 10 10 is the recvcounts
243 3) 0 is the root node
244 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
245 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
247 comm_size = MPI_COMM_WORLD->size();
248 CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
249 send_size = parse_double(action[2]);
250 disps = std::vector<int>(comm_size, 0);
251 recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
253 if (name == "gatherV") {
254 root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
255 if (action.size() > 4 + comm_size)
256 datatype1 = simgrid::smpi::Datatype::decode(action[4 + comm_size]);
257 if (action.size() > 5 + comm_size)
258 datatype2 = simgrid::smpi::Datatype::decode(action[5 + comm_size]);
260 int datatype_index = 0;
262 /* The 3 comes from "0 gather <sendcount>", which must always be present.
263 * The + comm_size is the recvcounts array, which must also be present
265 if (action.size() > 3 + comm_size + comm_size) { /* datatype + disp are specified */
266 datatype_index = 3 + comm_size;
267 disp_index = datatype_index + 1;
268 datatype1 = simgrid::smpi::Datatype::decode(action[datatype_index]);
269 datatype2 = simgrid::smpi::Datatype::decode(action[datatype_index]);
270 } else if (action.size() >
271 3 + comm_size + 2) { /* disps specified; datatype is not specified; use the default one */
272 disp_index = 3 + comm_size;
273 } else if (action.size() > 3 + comm_size) { /* only datatype, no disp specified */
274 datatype_index = 3 + comm_size;
275 datatype1 = simgrid::smpi::Datatype::decode(action[datatype_index]);
276 datatype2 = simgrid::smpi::Datatype::decode(action[datatype_index]);
279 if (disp_index != 0) {
280 for (unsigned int i = 0; i < comm_size; i++)
281 disps[i] = std::stoi(action[disp_index + i]);
285 for (unsigned int i = 0; i < comm_size; i++) {
286 (*recvcounts)[i] = std::stoi(action[i + 3]);
288 recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
291 void ScatterArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
293 /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
296 1) 68 is the sendcounts
297 2) 68 is the recvcounts
298 3) 0 is the root node
299 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
300 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
302 CHECK_ACTION_PARAMS(action, 2, 3)
303 comm_size = MPI_COMM_WORLD->size();
304 send_size = parse_double(action[2]);
305 recv_size = parse_double(action[3]);
306 root = (action.size() > 4) ? std::stoi(action[4]) : 0;
307 if (action.size() > 5)
308 datatype1 = simgrid::smpi::Datatype::decode(action[5]);
309 if (action.size() > 6)
310 datatype2 = simgrid::smpi::Datatype::decode(action[6]);
313 void ScatterVArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
315 /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
316 0 gather 68 10 10 10 68 0 0 0
318 1) 68 10 10 10 is the sendcounts
319 2) 68 is the recvcount
320 3) 0 is the root node
321 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
322 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
324 CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
325 recv_size = parse_double(action[2 + comm_size]);
326 disps = std::vector<int>(comm_size, 0);
327 sendcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
329 if (action.size() > 5 + comm_size)
330 datatype1 = simgrid::smpi::Datatype::decode(action[4 + comm_size]);
331 if (action.size() > 5 + comm_size)
332 datatype2 = simgrid::smpi::Datatype::decode(action[5]);
334 for (unsigned int i = 0; i < comm_size; i++) {
335 (*sendcounts)[i] = std::stoi(action[i + 2]);
337 send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
338 root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
341 void ReduceScatterArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
343 /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
344 0 reduceScatter 275427 275427 275427 204020 11346849 0
346 1) The first four values after the name of the action declare the recvcounts array
347 2) The value 11346849 is the amount of instructions
348 3) The last value corresponds to the datatype, see simgrid::smpi::Datatype::decode().
350 comm_size = MPI_COMM_WORLD->size();
351 CHECK_ACTION_PARAMS(action, comm_size + 1, 1)
352 comp_size = parse_double(action[2 + comm_size]);
353 recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
354 if (action.size() > 3 + comm_size)
355 datatype1 = simgrid::smpi::Datatype::decode(action[3 + comm_size]);
357 for (unsigned int i = 0; i < comm_size; i++) {
358 recvcounts->push_back(std::stoi(action[i + 2]));
360 recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
363 void AllToAllVArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
365 /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
366 0 allToAllV 100 1 7 10 12 100 1 70 10 5
368 1) 100 is the size of the send buffer *sizeof(int),
369 2) 1 7 10 12 is the sendcounts array
370 3) 100*sizeof(int) is the size of the receiver buffer
371 4) 1 70 10 5 is the recvcounts array
373 comm_size = MPI_COMM_WORLD->size();
374 CHECK_ACTION_PARAMS(action, 2 * comm_size + 2, 2)
375 sendcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
376 recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
377 senddisps = std::vector<int>(comm_size, 0);
378 recvdisps = std::vector<int>(comm_size, 0);
380 if (action.size() > 5 + 2 * comm_size)
381 datatype1 = simgrid::smpi::Datatype::decode(action[4 + 2 * comm_size]);
382 if (action.size() > 5 + 2 * comm_size)
383 datatype2 = simgrid::smpi::Datatype::decode(action[5 + 2 * comm_size]);
385 send_buf_size = parse_double(action[2]);
386 recv_buf_size = parse_double(action[3 + comm_size]);
387 for (unsigned int i = 0; i < comm_size; i++) {
388 (*sendcounts)[i] = std::stoi(action[3 + i]);
389 (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
391 send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
392 recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
396 void ReplayAction<T>::execute(simgrid::xbt::ReplayAction& action)
398 // Needs to be re-initialized for every action, hence here
399 double start_time = smpi_process()->simulated_elapsed();
400 args.parse(action, name);
403 log_timed_action(action, start_time);
406 void WaitAction::kernel(simgrid::xbt::ReplayAction& action)
408 std::string s = boost::algorithm::join(action, " ");
409 xbt_assert(req_storage.size(), "action wait not preceded by any irecv or isend: %s", s.c_str());
410 MPI_Request request = req_storage.find(args.src, args.dst, args.tag);
411 req_storage.remove(request);
413 if (request == MPI_REQUEST_NULL) {
414 /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just
419 int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
421 // Must be taken before Request::wait() since the request may be set to
422 // MPI_REQUEST_NULL by Request::wait!
423 bool is_wait_for_receive = (request->flags() & MPI_REQ_RECV);
424 // TODO: Here we take the rank while we normally take the process id (look for my_proc_id)
425 TRACE_smpi_comm_in(rank, __func__, new simgrid::instr::WaitTIData(args.src, args.dst, args.tag));
428 Request::wait(&request, &status);
430 TRACE_smpi_comm_out(rank);
431 if (is_wait_for_receive)
432 TRACE_smpi_recv(args.src, args.dst, args.tag);
435 void SendAction::kernel(simgrid::xbt::ReplayAction& action)
437 int dst_traced = MPI_COMM_WORLD->group()->actor(args.partner)->get_pid();
439 TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
440 args.tag, Datatype::encode(args.datatype1)));
441 if (not TRACE_smpi_view_internals())
442 TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, args.tag, args.size * args.datatype1->size());
444 if (name == "send") {
445 Request::send(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
446 } else if (name == "Isend") {
447 MPI_Request request = Request::isend(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
448 req_storage.add(request);
450 xbt_die("Don't know this action, %s", name.c_str());
453 TRACE_smpi_comm_out(my_proc_id);
456 void RecvAction::kernel(simgrid::xbt::ReplayAction& action)
458 int src_traced = MPI_COMM_WORLD->group()->actor(args.partner)->get_pid();
460 TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
461 args.tag, Datatype::encode(args.datatype1)));
464 // unknown size from the receiver point of view
465 if (args.size <= 0.0) {
466 Request::probe(args.partner, args.tag, MPI_COMM_WORLD, &status);
467 args.size = status.count;
470 if (name == "recv") {
471 Request::recv(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD, &status);
472 } else if (name == "Irecv") {
473 MPI_Request request = Request::irecv(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
474 req_storage.add(request);
477 TRACE_smpi_comm_out(my_proc_id);
478 // TODO: Check why this was only activated in the "recv" case and not in the "Irecv" case
479 if (name == "recv" && not TRACE_smpi_view_internals()) {
480 TRACE_smpi_recv(src_traced, my_proc_id, args.tag);
484 void ComputeAction::kernel(simgrid::xbt::ReplayAction& action)
486 TRACE_smpi_computing_in(my_proc_id, args.flops);
487 smpi_execute_flops(args.flops);
488 TRACE_smpi_computing_out(my_proc_id);
491 void TestAction::kernel(simgrid::xbt::ReplayAction& action)
493 MPI_Request request = req_storage.find(args.src, args.dst, args.tag);
494 req_storage.remove(request);
495 // if request is null here, this may mean that a previous test has succeeded
496 // Different times in traced application and replayed version may lead to this
497 // In this case, ignore the extra calls.
498 if (request != MPI_REQUEST_NULL) {
499 TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::NoOpTIData("test"));
502 int flag = Request::test(&request, &status);
504 XBT_DEBUG("MPI_Test result: %d", flag);
505 /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now
507 if (request == MPI_REQUEST_NULL)
508 req_storage.addNullRequest(args.src, args.dst, args.tag);
510 req_storage.add(request);
512 TRACE_smpi_comm_out(my_proc_id);
516 void InitAction::kernel(simgrid::xbt::ReplayAction& action)
518 CHECK_ACTION_PARAMS(action, 0, 1)
519 MPI_DEFAULT_TYPE = (action.size() > 2) ? MPI_DOUBLE // default MPE datatype
520 : MPI_BYTE; // default TAU datatype
522 /* start a simulated timer */
523 smpi_process()->simulated_start();
526 void CommunicatorAction::kernel(simgrid::xbt::ReplayAction& action)
531 void WaitAllAction::kernel(simgrid::xbt::ReplayAction& action)
533 const unsigned int count_requests = req_storage.size();
535 if (count_requests > 0) {
536 TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData("waitAll", -1, count_requests, ""));
537 std::vector<std::pair</*sender*/int,/*recv*/int>> sender_receiver;
538 std::vector<MPI_Request> reqs;
539 req_storage.get_requests(reqs);
540 for (const auto& req : reqs) {
541 if (req && (req->flags() & MPI_REQ_RECV)) {
542 sender_receiver.push_back({req->src(), req->dst()});
545 MPI_Status status[count_requests];
546 Request::waitall(count_requests, &(reqs.data())[0], status);
547 req_storage.get_store().clear();
549 for (auto& pair : sender_receiver) {
550 TRACE_smpi_recv(pair.first, pair.second, 0);
552 TRACE_smpi_comm_out(my_proc_id);
556 void BarrierAction::kernel(simgrid::xbt::ReplayAction& action)
558 TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::NoOpTIData("barrier"));
559 Colls::barrier(MPI_COMM_WORLD);
560 TRACE_smpi_comm_out(my_proc_id);
563 void BcastAction::kernel(simgrid::xbt::ReplayAction& action)
565 TRACE_smpi_comm_in(my_proc_id, "action_bcast",
566 new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(args.root)->get_pid(),
567 -1.0, args.size, -1, Datatype::encode(args.datatype1), ""));
569 Colls::bcast(send_buffer(args.size * args.datatype1->size()), args.size, args.datatype1, args.root, MPI_COMM_WORLD);
571 TRACE_smpi_comm_out(my_proc_id);
574 void ReduceAction::kernel(simgrid::xbt::ReplayAction& action)
576 TRACE_smpi_comm_in(my_proc_id, "action_reduce",
577 new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(args.root)->get_pid(),
578 args.comp_size, args.comm_size, -1,
579 Datatype::encode(args.datatype1), ""));
581 Colls::reduce(send_buffer(args.comm_size * args.datatype1->size()),
582 recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL, args.root, MPI_COMM_WORLD);
583 smpi_execute_flops(args.comp_size);
585 TRACE_smpi_comm_out(my_proc_id);
588 void AllReduceAction::kernel(simgrid::xbt::ReplayAction& action)
590 TRACE_smpi_comm_in(my_proc_id, "action_allReduce", new simgrid::instr::CollTIData("allReduce", -1, args.comp_size, args.comm_size, -1,
591 Datatype::encode(args.datatype1), ""));
593 Colls::allreduce(send_buffer(args.comm_size * args.datatype1->size()),
594 recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
595 smpi_execute_flops(args.comp_size);
597 TRACE_smpi_comm_out(my_proc_id);
600 void AllToAllAction::kernel(simgrid::xbt::ReplayAction& action)
602 TRACE_smpi_comm_in(my_proc_id, "action_allToAll",
603 new simgrid::instr::CollTIData("allToAll", -1, -1.0, args.send_size, args.recv_size,
604 Datatype::encode(args.datatype1),
605 Datatype::encode(args.datatype2)));
607 Colls::alltoall(send_buffer(args.send_size * args.comm_size * args.datatype1->size()), args.send_size,
608 args.datatype1, recv_buffer(args.recv_size * args.comm_size * args.datatype2->size()),
609 args.recv_size, args.datatype2, MPI_COMM_WORLD);
611 TRACE_smpi_comm_out(my_proc_id);
614 void GatherAction::kernel(simgrid::xbt::ReplayAction& action)
616 TRACE_smpi_comm_in(my_proc_id, name.c_str(), new simgrid::instr::CollTIData(name, (name == "gather") ? args.root : -1, -1.0, args.send_size, args.recv_size,
617 Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
619 if (name == "gather") {
620 int rank = MPI_COMM_WORLD->rank();
621 Colls::gather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
622 (rank == args.root) ? recv_buffer(args.recv_size * args.comm_size * args.datatype2->size()) : nullptr, args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
625 Colls::allgather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
626 recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, MPI_COMM_WORLD);
628 TRACE_smpi_comm_out(my_proc_id);
631 void GatherVAction::kernel(simgrid::xbt::ReplayAction& action)
633 int rank = MPI_COMM_WORLD->rank();
635 TRACE_smpi_comm_in(my_proc_id, name.c_str(), new simgrid::instr::VarCollTIData(
636 name, (name == "gatherV") ? args.root : -1, args.send_size, nullptr, -1, args.recvcounts,
637 Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
639 if (name == "gatherV") {
640 Colls::gatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
641 (rank == args.root) ? recv_buffer(args.recv_size_sum * args.datatype2->size()) : nullptr,
642 args.recvcounts->data(), args.disps.data(), args.datatype2, args.root, MPI_COMM_WORLD);
645 Colls::allgatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
646 recv_buffer(args.recv_size_sum * args.datatype2->size()), args.recvcounts->data(),
647 args.disps.data(), args.datatype2, MPI_COMM_WORLD);
650 TRACE_smpi_comm_out(my_proc_id);
653 void ScatterAction::kernel(simgrid::xbt::ReplayAction& action)
655 int rank = MPI_COMM_WORLD->rank();
656 TRACE_smpi_comm_in(my_proc_id, "action_scatter", new simgrid::instr::CollTIData(name, args.root, -1.0, args.send_size, args.recv_size,
657 Datatype::encode(args.datatype1),
658 Datatype::encode(args.datatype2)));
660 Colls::scatter(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
661 (rank == args.root) ? recv_buffer(args.recv_size * args.datatype2->size()) : nullptr, args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
663 TRACE_smpi_comm_out(my_proc_id);
666 void ScatterVAction::kernel(simgrid::xbt::ReplayAction& action)
668 int rank = MPI_COMM_WORLD->rank();
669 TRACE_smpi_comm_in(my_proc_id, "action_scatterv", new simgrid::instr::VarCollTIData(name, args.root, -1, args.sendcounts, args.recv_size,
670 nullptr, Datatype::encode(args.datatype1),
671 Datatype::encode(args.datatype2)));
673 Colls::scatterv((rank == args.root) ? send_buffer(args.send_size_sum * args.datatype1->size()) : nullptr,
674 args.sendcounts->data(), args.disps.data(), args.datatype1,
675 recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, args.root,
678 TRACE_smpi_comm_out(my_proc_id);
681 void ReduceScatterAction::kernel(simgrid::xbt::ReplayAction& action)
683 TRACE_smpi_comm_in(my_proc_id, "action_reducescatter",
684 new simgrid::instr::VarCollTIData("reduceScatter", -1, 0, nullptr, -1, args.recvcounts,
685 std::to_string(args.comp_size), /* ugly hack to print comp_size */
686 Datatype::encode(args.datatype1)));
688 Colls::reduce_scatter(send_buffer(args.recv_size_sum * args.datatype1->size()),
689 recv_buffer(args.recv_size_sum * args.datatype1->size()), args.recvcounts->data(),
690 args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
692 smpi_execute_flops(args.comp_size);
693 TRACE_smpi_comm_out(my_proc_id);
696 void AllToAllVAction::kernel(simgrid::xbt::ReplayAction& action)
698 TRACE_smpi_comm_in(my_proc_id, __func__,
699 new simgrid::instr::VarCollTIData(
700 "allToAllV", -1, args.send_size_sum, args.sendcounts, args.recv_size_sum, args.recvcounts,
701 Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
703 Colls::alltoallv(send_buffer(args.send_buf_size * args.datatype1->size()), args.sendcounts->data(), args.senddisps.data(), args.datatype1,
704 recv_buffer(args.recv_buf_size * args.datatype2->size()), args.recvcounts->data(), args.recvdisps.data(), args.datatype2, MPI_COMM_WORLD);
706 TRACE_smpi_comm_out(my_proc_id);
708 } // Replay Namespace
709 }} // namespace simgrid::smpi
/* One RequestStorage per replayed process: sized to smpi_process_count() in smpi_replay_init() and
 * indexed by (pid - 1) of the calling actor. */
std::vector<simgrid::smpi::replay::RequestStorage> storage;
712 /** @brief Only initialize the replay, don't do it for real */
713 void smpi_replay_init(int* argc, char*** argv)
715 simgrid::smpi::Process::init(argc, argv);
716 smpi_process()->mark_as_initialized();
717 smpi_process()->set_replaying(true);
719 int my_proc_id = simgrid::s4u::this_actor::get_pid();
720 storage.resize(smpi_process_count());
722 TRACE_smpi_init(my_proc_id);
723 TRACE_smpi_computing_init(my_proc_id);
724 TRACE_smpi_comm_in(my_proc_id, "smpi_replay_run_init", new simgrid::instr::NoOpTIData("init"));
725 TRACE_smpi_comm_out(my_proc_id);
726 xbt_replay_action_register("init", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::InitAction().execute(action); });
727 xbt_replay_action_register("finalize", [](simgrid::xbt::ReplayAction& action) { /* nothing to do */ });
728 xbt_replay_action_register("comm_size", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
729 xbt_replay_action_register("comm_split",[](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
730 xbt_replay_action_register("comm_dup", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
731 xbt_replay_action_register("send", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("send", storage[simgrid::s4u::this_actor::get_pid()-1]).execute(action); });
732 xbt_replay_action_register("Isend", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("Isend", storage[simgrid::s4u::this_actor::get_pid()-1]).execute(action); });
733 xbt_replay_action_register("recv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("recv", storage[simgrid::s4u::this_actor::get_pid()-1]).execute(action); });
734 xbt_replay_action_register("Irecv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("Irecv", storage[simgrid::s4u::this_actor::get_pid()-1]).execute(action); });
735 xbt_replay_action_register("test", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::TestAction(storage[simgrid::s4u::this_actor::get_pid()-1]).execute(action); });
736 xbt_replay_action_register("wait", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAction(storage[simgrid::s4u::this_actor::get_pid()-1]).execute(action); });
737 xbt_replay_action_register("waitAll", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAllAction(storage[simgrid::s4u::this_actor::get_pid()-1]).execute(action); });
738 xbt_replay_action_register("barrier", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BarrierAction().execute(action); });
739 xbt_replay_action_register("bcast", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BcastAction().execute(action); });
740 xbt_replay_action_register("reduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceAction().execute(action); });
741 xbt_replay_action_register("allReduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllReduceAction().execute(action); });
742 xbt_replay_action_register("allToAll", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllAction().execute(action); });
743 xbt_replay_action_register("allToAllV", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllVAction().execute(action); });
744 xbt_replay_action_register("gather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("gather").execute(action); });
745 xbt_replay_action_register("scatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterAction().execute(action); });
746 xbt_replay_action_register("gatherV", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("gatherV").execute(action); });
747 xbt_replay_action_register("scatterV", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterVAction().execute(action); });
748 xbt_replay_action_register("allGather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("allGather").execute(action); });
749 xbt_replay_action_register("allGatherV", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("allGatherV").execute(action); });
750 xbt_replay_action_register("reduceScatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceScatterAction().execute(action); });
751 xbt_replay_action_register("compute", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ComputeAction().execute(action); });
753 //if we have a delayed start, sleep here.
755 double value = xbt_str_parse_double((*argv)[2], "%s is not a double");
756 XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
757 smpi_execute_flops(value);
759 //UGLY: force a context switch to be sure that all MSG_processes begin initialization
760 XBT_DEBUG("Force context switch by smpi_execute_flops - Sleeping for 0.0 flops ");
761 smpi_execute_flops(0.0);
765 /** @brief actually run the replay after initialization */
766 void smpi_replay_main(int* argc, char*** argv)
768 static int active_processes = 0;
770 simgrid::xbt::replay_runner(*argc, *argv);
772 /* and now, finalize everything */
773 /* One active process will stop. Decrease the counter*/
774 unsigned int count_requests = storage[simgrid::s4u::this_actor::get_pid() - 1].size();
775 XBT_DEBUG("There are %ud elements in reqq[*]", count_requests);
776 if (count_requests > 0) {
777 MPI_Request requests[count_requests];
778 MPI_Status status[count_requests];
781 for (auto const& pair : storage[simgrid::s4u::this_actor::get_pid() - 1].get_store()) {
782 requests[i] = pair.second;
785 simgrid::smpi::Request::waitall(count_requests, requests, status);
789 if(active_processes==0){
790 /* Last process alive speaking: end the simulated timer */
791 XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
792 smpi_free_replay_tmp_buffers();
795 TRACE_smpi_comm_in(simgrid::s4u::this_actor::get_pid(), "smpi_replay_run_finalize",
796 new simgrid::instr::NoOpTIData("finalize"));
798 smpi_process()->finalize();
800 TRACE_smpi_comm_out(simgrid::s4u::this_actor::get_pid());
801 TRACE_smpi_finalize(simgrid::s4u::this_actor::get_pid());
804 /** @brief chain a replay initialization and a replay start */
805 void smpi_replay_run(int* argc, char*** argv)
807 smpi_replay_init(argc, argv);
808 smpi_replay_main(argc, argv);