1 /* Copyright (c) 2009-2018. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
6 #include "smpi_coll.hpp"
7 #include "smpi_comm.hpp"
8 #include "smpi_datatype.hpp"
9 #include "smpi_group.hpp"
10 #include "smpi_request.hpp"
11 #include "xbt/replay.hpp"
12 #include <simgrid/smpi/replay.hpp>
13 #include <src/smpi/include/private.hpp>
17 #include <unordered_map>
22 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay, smpi, "Trace Replay with SMPI");
24 // From https://stackoverflow.com/questions/7110301/generic-hash-for-tuples-in-unordered-map-unordered-set
25 // This is all just to make std::unordered_map work with std::tuple. If we need this in other places,
26 // this could go into a header file.
27 namespace hash_tuple {
28 template <typename TT> class hash {
30 size_t operator()(TT const& tt) const { return std::hash<TT>()(tt); }
33 template <class T> inline void hash_combine(std::size_t& seed, T const& v)
35 seed ^= hash_tuple::hash<T>()(v) + 0x9e3779b9 + (seed << 6) + (seed >> 2);
38 // Recursive template code derived from Matthieu M.
39 template <class Tuple, size_t Index = std::tuple_size<Tuple>::value - 1> class HashValueImpl {
41 static void apply(size_t& seed, Tuple const& tuple)
43 HashValueImpl<Tuple, Index - 1>::apply(seed, tuple);
44 hash_combine(seed, std::get<Index>(tuple));
48 template <class Tuple> class HashValueImpl<Tuple, 0> {
50 static void apply(size_t& seed, Tuple const& tuple) { hash_combine(seed, std::get<0>(tuple)); }
53 template <typename... TT> class hash<std::tuple<TT...>> {
55 size_t operator()(std::tuple<TT...> const& tt) const
58 HashValueImpl<std::tuple<TT...>>::apply(seed, tt);
64 typedef std::tuple</*sender*/ int, /* reciever */ int, /* tag */int> req_key_t;
65 typedef std::unordered_map<req_key_t, MPI_Request, hash_tuple::hash<std::tuple<int,int,int>>> req_storage_t;
68 static void log_timed_action(simgrid::xbt::ReplayAction& action, double clock)
70 if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
71 std::string s = boost::algorithm::join(action, " ");
72 XBT_VERB("%s %f", s.c_str(), smpi_process()->simulated_elapsed() - clock);
77 static double parse_double(std::string string)
79 return xbt_str_parse_double(string.c_str(), "%s is not a double");
86 MPI_Datatype MPI_DEFAULT_TYPE;
// Bookkeeping of pending requests for one process (see the per-pid `storage` vector),
// indexed by a (src, dst, tag) req_key_t.
88 class RequestStorage {
// Mutable access to the underlying map; it is iterated by smpi_replay_main and cleared by WaitAllAction.
99 req_storage_t& get_store()
104 void get_requests(std::vector<MPI_Request>& vec)
106 for (auto& pair : store) {
107 auto& req = pair.second;
108 auto my_proc_id = simgrid::s4u::this_actor::get_pid();
109 if (req != MPI_REQUEST_NULL && (req->src() == my_proc_id || req->dst() == my_proc_id)) {
110 vec.push_back(pair.second);
111 pair.second->print_request("MM");
116 MPI_Request find(int src, int dst, int tag)
118 req_storage_t::iterator it = store.find(req_key_t(src, dst, tag));
119 return (it == store.end()) ? MPI_REQUEST_NULL : it->second;
122 void remove(MPI_Request req)
124 if (req == MPI_REQUEST_NULL) return;
126 store.erase(req_key_t(req->src()-1, req->dst()-1, req->tag()));
129 void add(MPI_Request req)
131 if (req != MPI_REQUEST_NULL) // Can and does happen in the case of TestAction
132 store.insert({req_key_t(req->src()-1, req->dst()-1, req->tag()), req});
135 /* Sometimes we need to re-insert MPI_REQUEST_NULL but we still need src,dst and tag */
136 void addNullRequest(int src, int dst, int tag)
138 store.insert({req_key_t(src, dst, tag), MPI_REQUEST_NULL});
142 void WaitTestParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
144 CHECK_ACTION_PARAMS(action, 3, 0)
145 src = std::stoi(action[2]);
146 dst = std::stoi(action[3]);
147 tag = std::stoi(action[4]);
150 void SendRecvParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
152 CHECK_ACTION_PARAMS(action, 3, 1)
153 partner = std::stoi(action[2]);
154 tag = std::stoi(action[3]);
155 size = parse_double(action[4]);
156 if (action.size() > 5)
157 datatype1 = simgrid::smpi::Datatype::decode(action[5]);
160 void ComputeParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
162 CHECK_ACTION_PARAMS(action, 1, 0)
163 flops = parse_double(action[2]);
166 void BcastArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
168 CHECK_ACTION_PARAMS(action, 1, 2)
169 size = parse_double(action[2]);
170 root = (action.size() > 3) ? std::stoi(action[3]) : 0;
171 if (action.size() > 4)
172 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
175 void ReduceArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
177 CHECK_ACTION_PARAMS(action, 2, 2)
178 comm_size = parse_double(action[2]);
179 comp_size = parse_double(action[3]);
180 root = (action.size() > 4) ? std::stoi(action[4]) : 0;
181 if (action.size() > 5)
182 datatype1 = simgrid::smpi::Datatype::decode(action[5]);
185 void AllReduceArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
187 CHECK_ACTION_PARAMS(action, 2, 1)
188 comm_size = parse_double(action[2]);
189 comp_size = parse_double(action[3]);
190 if (action.size() > 4)
191 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
194 void AllToAllArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
196 CHECK_ACTION_PARAMS(action, 2, 1)
197 comm_size = MPI_COMM_WORLD->size();
198 send_size = parse_double(action[2]);
199 recv_size = parse_double(action[3]);
201 if (action.size() > 4)
202 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
203 if (action.size() > 5)
204 datatype2 = simgrid::smpi::Datatype::decode(action[5]);
207 void GatherArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
209 /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
212 1) 68 is the sendcounts
213 2) 68 is the recvcounts
214 3) 0 is the root node
215 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
216 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
218 CHECK_ACTION_PARAMS(action, 2, 3)
219 comm_size = MPI_COMM_WORLD->size();
220 send_size = parse_double(action[2]);
221 recv_size = parse_double(action[3]);
223 if (name == "gather") {
224 root = (action.size() > 4) ? std::stoi(action[4]) : 0;
225 if (action.size() > 5)
226 datatype1 = simgrid::smpi::Datatype::decode(action[5]);
227 if (action.size() > 6)
228 datatype2 = simgrid::smpi::Datatype::decode(action[6]);
230 if (action.size() > 4)
231 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
232 if (action.size() > 5)
233 datatype2 = simgrid::smpi::Datatype::decode(action[5]);
237 void GatherVArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
239 /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
240 0 gather 68 68 10 10 10 0 0 0
242 1) 68 is the sendcount
243 2) 68 10 10 10 is the recvcounts
244 3) 0 is the root node
245 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
246 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
248 comm_size = MPI_COMM_WORLD->size();
249 CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
250 send_size = parse_double(action[2]);
251 disps = std::vector<int>(comm_size, 0);
252 recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
254 if (name == "gatherV") {
255 root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
256 if (action.size() > 4 + comm_size)
257 datatype1 = simgrid::smpi::Datatype::decode(action[4 + comm_size]);
258 if (action.size() > 5 + comm_size)
259 datatype2 = simgrid::smpi::Datatype::decode(action[5 + comm_size]);
261 int datatype_index = 0;
263 /* The 3 comes from "0 gather <sendcount>", which must always be present.
264 * The + comm_size is the recvcounts array, which must also be present
266 if (action.size() > 3 + comm_size + comm_size) { /* datatype + disp are specified */
267 datatype_index = 3 + comm_size;
268 disp_index = datatype_index + 1;
269 datatype1 = simgrid::smpi::Datatype::decode(action[datatype_index]);
270 datatype2 = simgrid::smpi::Datatype::decode(action[datatype_index]);
271 } else if (action.size() >
272 3 + comm_size + 2) { /* disps specified; datatype is not specified; use the default one */
273 disp_index = 3 + comm_size;
274 } else if (action.size() > 3 + comm_size) { /* only datatype, no disp specified */
275 datatype_index = 3 + comm_size;
276 datatype1 = simgrid::smpi::Datatype::decode(action[datatype_index]);
277 datatype2 = simgrid::smpi::Datatype::decode(action[datatype_index]);
280 if (disp_index != 0) {
281 for (unsigned int i = 0; i < comm_size; i++)
282 disps[i] = std::stoi(action[disp_index + i]);
286 for (unsigned int i = 0; i < comm_size; i++) {
287 (*recvcounts)[i] = std::stoi(action[i + 3]);
289 recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
292 void ScatterArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
294 /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
297 1) 68 is the sendcounts
298 2) 68 is the recvcounts
299 3) 0 is the root node
300 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
301 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
303 CHECK_ACTION_PARAMS(action, 2, 3)
304 comm_size = MPI_COMM_WORLD->size();
305 send_size = parse_double(action[2]);
306 recv_size = parse_double(action[3]);
307 root = (action.size() > 4) ? std::stoi(action[4]) : 0;
308 if (action.size() > 5)
309 datatype1 = simgrid::smpi::Datatype::decode(action[5]);
310 if (action.size() > 6)
311 datatype2 = simgrid::smpi::Datatype::decode(action[6]);
314 void ScatterVArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
316 /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
317 0 gather 68 10 10 10 68 0 0 0
319 1) 68 10 10 10 is the sendcounts
320 2) 68 is the recvcount
321 3) 0 is the root node
322 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
323 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
325 CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
326 recv_size = parse_double(action[2 + comm_size]);
327 disps = std::vector<int>(comm_size, 0);
328 sendcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
330 if (action.size() > 5 + comm_size)
331 datatype1 = simgrid::smpi::Datatype::decode(action[4 + comm_size]);
332 if (action.size() > 5 + comm_size)
333 datatype2 = simgrid::smpi::Datatype::decode(action[5]);
335 for (unsigned int i = 0; i < comm_size; i++) {
336 (*sendcounts)[i] = std::stoi(action[i + 2]);
338 send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
339 root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
342 void ReduceScatterArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
344 /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
345 0 reduceScatter 275427 275427 275427 204020 11346849 0
347 1) The first four values after the name of the action declare the recvcounts array
348 2) The value 11346849 is the amount of instructions
349 3) The last value corresponds to the datatype, see simgrid::smpi::Datatype::decode().
351 comm_size = MPI_COMM_WORLD->size();
352 CHECK_ACTION_PARAMS(action, comm_size + 1, 1)
353 comp_size = parse_double(action[2 + comm_size]);
354 recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
355 if (action.size() > 3 + comm_size)
356 datatype1 = simgrid::smpi::Datatype::decode(action[3 + comm_size]);
358 for (unsigned int i = 0; i < comm_size; i++) {
359 recvcounts->push_back(std::stoi(action[i + 2]));
361 recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
364 void AllToAllVArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
366 /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
367 0 allToAllV 100 1 7 10 12 100 1 70 10 5
369 1) 100 is the size of the send buffer *sizeof(int),
370 2) 1 7 10 12 is the sendcounts array
371 3) 100*sizeof(int) is the size of the receiver buffer
372 4) 1 70 10 5 is the recvcounts array
374 comm_size = MPI_COMM_WORLD->size();
375 CHECK_ACTION_PARAMS(action, 2 * comm_size + 2, 2)
376 sendcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
377 recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
378 senddisps = std::vector<int>(comm_size, 0);
379 recvdisps = std::vector<int>(comm_size, 0);
381 if (action.size() > 5 + 2 * comm_size)
382 datatype1 = simgrid::smpi::Datatype::decode(action[4 + 2 * comm_size]);
383 if (action.size() > 5 + 2 * comm_size)
384 datatype2 = simgrid::smpi::Datatype::decode(action[5 + 2 * comm_size]);
386 send_buf_size = parse_double(action[2]);
387 recv_buf_size = parse_double(action[3 + comm_size]);
388 for (unsigned int i = 0; i < comm_size; i++) {
389 (*sendcounts)[i] = std::stoi(action[3 + i]);
390 (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
392 send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
393 recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
// Common driver of every replay action: parse the trace line into `args`, then log it together
// with the simulated time elapsed since start_time.
// NOTE(review): the template header and the statements between parsing and logging (presumably
// the call to the action-specific kernel) are not visible in this view — confirm in the full file.
397 void ReplayAction<T>::execute(simgrid::xbt::ReplayAction& action)
399 // Needs to be re-initialized for every action, hence here
400 double start_time = smpi_process()->simulated_elapsed();
401 args.parse(action, name);
404 log_timed_action(action, start_time);
// Replay an MPI_Wait: fetch (and unregister) the request stored under (src, dst, tag), wait on it,
// and emit the trace events around the wait.
// NOTE(review): the handling of a null request (lines after the comment below) and the declaration
// of `status` are not visible in this view — presumably an early return and a local MPI_Status.
407 void WaitAction::kernel(simgrid::xbt::ReplayAction& action)
409 std::string s = boost::algorithm::join(action, " ");
410 xbt_assert(req_storage.size(), "action wait not preceded by any irecv or isend: %s", s.c_str());
411 MPI_Request request = req_storage.find(args.src, args.dst, args.tag);
412 req_storage.remove(request);
414 if (request == MPI_REQUEST_NULL) {
415 /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just
420 int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
422 // Must be taken before Request::wait() since the request may be set to
423 // MPI_REQUEST_NULL by Request::wait!
424 bool is_wait_for_receive = (request->flags() & MPI_REQ_RECV);
425 // TODO: Here we take the rank while we normally take the process id (look for my_proc_id)
426 TRACE_smpi_comm_in(rank, __func__, new simgrid::instr::WaitTIData(args.src, args.dst, args.tag));
429 Request::wait(&request, &status);
431 TRACE_smpi_comm_out(rank);
// Only receptions produce a TRACE_smpi_recv event.
432 if (is_wait_for_receive)
433 TRACE_smpi_recv(args.src, args.dst, args.tag);
436 void SendAction::kernel(simgrid::xbt::ReplayAction& action)
438 int dst_traced = MPI_COMM_WORLD->group()->actor(args.partner)->get_pid();
440 TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
441 args.tag, Datatype::encode(args.datatype1)));
442 if (not TRACE_smpi_view_internals())
443 TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, args.tag, args.size * args.datatype1->size());
445 if (name == "send") {
446 Request::send(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
447 } else if (name == "Isend") {
448 MPI_Request request = Request::isend(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
449 req_storage.add(request);
451 xbt_die("Don't know this action, %s", name.c_str());
454 TRACE_smpi_comm_out(my_proc_id);
// Replay a blocking "recv" or a non-blocking "Irecv" from args.partner. When the trace gives no
// positive size, the size is first obtained with a probe. Irecv requests go to req_storage.
// NOTE(review): the declaration of `status` is not visible in this view — presumably a local MPI_Status.
457 void RecvAction::kernel(simgrid::xbt::ReplayAction& action)
459 int src_traced = MPI_COMM_WORLD->group()->actor(args.partner)->get_pid();
461 TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
462 args.tag, Datatype::encode(args.datatype1)));
465 // unknown size from the receiver point of view
466 if (args.size <= 0.0) {
467 Request::probe(args.partner, args.tag, MPI_COMM_WORLD, &status);
468 args.size = status.count;
471 if (name == "recv") {
472 Request::recv(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD, &status);
473 } else if (name == "Irecv") {
474 MPI_Request request = Request::irecv(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
475 req_storage.add(request);
478 TRACE_smpi_comm_out(my_proc_id);
479 // TODO: Check why this was only activated in the "recv" case and not in the "Irecv" case
480 if (name == "recv" && not TRACE_smpi_view_internals()) {
481 TRACE_smpi_recv(src_traced, my_proc_id, args.tag);
485 void ComputeAction::kernel(simgrid::xbt::ReplayAction& action)
487 TRACE_smpi_computing_in(my_proc_id, args.flops);
488 smpi_execute_flops(args.flops);
489 TRACE_smpi_computing_out(my_proc_id);
// Replay an MPI_Test on the request stored under (src, dst, tag). The request is taken out of the
// storage and pushed back afterwards: if the test completed it, a null placeholder is stored under
// the original (src, dst, tag) key so that a later wait still finds an entry.
// NOTE(review): the declaration of `status` and the keyword between the two re-insertions
// (presumably `else`) are not visible in this view.
492 void TestAction::kernel(simgrid::xbt::ReplayAction& action)
494 MPI_Request request = req_storage.find(args.src, args.dst, args.tag);
495 req_storage.remove(request);
496 // if request is null here, this may mean that a previous test has succeeded
497 // Different times in traced application and replayed version may lead to this
498 // In this case, ignore the extra calls.
499 if (request != MPI_REQUEST_NULL) {
500 TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::NoOpTIData("test"));
503 int flag = Request::test(&request, &status);
505 XBT_DEBUG("MPI_Test result: %d", flag);
506 /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now
508 if (request == MPI_REQUEST_NULL)
509 req_storage.addNullRequest(args.src, args.dst, args.tag);
511 req_storage.add(request);
513 TRACE_smpi_comm_out(my_proc_id);
517 void InitAction::kernel(simgrid::xbt::ReplayAction& action)
519 CHECK_ACTION_PARAMS(action, 0, 1)
520 MPI_DEFAULT_TYPE = (action.size() > 2) ? MPI_DOUBLE // default MPE datatype
521 : MPI_BYTE; // default TAU datatype
523 /* start a simulated timer */
524 smpi_process()->simulated_start();
// Kernel shared by the comm_size, comm_split and comm_dup actions (see smpi_replay_init).
// NOTE(review): its body is not visible in this view — presumably a no-op; confirm.
527 void CommunicatorAction::kernel(simgrid::xbt::ReplayAction& action)
532 void WaitAllAction::kernel(simgrid::xbt::ReplayAction& action)
534 const unsigned int count_requests = req_storage.size();
536 if (count_requests > 0) {
537 TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData("waitAll", -1, count_requests, ""));
538 std::vector<std::pair</*sender*/int,/*recv*/int>> sender_receiver;
539 std::vector<MPI_Request> reqs;
540 req_storage.get_requests(reqs);
541 for (const auto& req : reqs) {
542 if (req && (req->flags() & MPI_REQ_RECV)) {
543 sender_receiver.push_back({req->src(), req->dst()});
546 MPI_Status status[count_requests];
547 Request::waitall(count_requests, &(reqs.data())[0], status);
548 req_storage.get_store().clear();
550 for (auto& pair : sender_receiver) {
551 TRACE_smpi_recv(pair.first, pair.second, 0);
553 TRACE_smpi_comm_out(my_proc_id);
557 void BarrierAction::kernel(simgrid::xbt::ReplayAction& action)
559 TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::NoOpTIData("barrier"));
560 Colls::barrier(MPI_COMM_WORLD);
561 TRACE_smpi_comm_out(my_proc_id);
564 void BcastAction::kernel(simgrid::xbt::ReplayAction& action)
566 TRACE_smpi_comm_in(my_proc_id, "action_bcast",
567 new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(args.root)->get_pid(),
568 -1.0, args.size, -1, Datatype::encode(args.datatype1), ""));
570 Colls::bcast(send_buffer(args.size * args.datatype1->size()), args.size, args.datatype1, args.root, MPI_COMM_WORLD);
572 TRACE_smpi_comm_out(my_proc_id);
575 void ReduceAction::kernel(simgrid::xbt::ReplayAction& action)
577 TRACE_smpi_comm_in(my_proc_id, "action_reduce",
578 new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(args.root)->get_pid(),
579 args.comp_size, args.comm_size, -1,
580 Datatype::encode(args.datatype1), ""));
582 Colls::reduce(send_buffer(args.comm_size * args.datatype1->size()),
583 recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL, args.root, MPI_COMM_WORLD);
584 smpi_execute_flops(args.comp_size);
586 TRACE_smpi_comm_out(my_proc_id);
589 void AllReduceAction::kernel(simgrid::xbt::ReplayAction& action)
591 TRACE_smpi_comm_in(my_proc_id, "action_allReduce", new simgrid::instr::CollTIData("allReduce", -1, args.comp_size, args.comm_size, -1,
592 Datatype::encode(args.datatype1), ""));
594 Colls::allreduce(send_buffer(args.comm_size * args.datatype1->size()),
595 recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
596 smpi_execute_flops(args.comp_size);
598 TRACE_smpi_comm_out(my_proc_id);
601 void AllToAllAction::kernel(simgrid::xbt::ReplayAction& action)
603 TRACE_smpi_comm_in(my_proc_id, "action_allToAll",
604 new simgrid::instr::CollTIData("allToAll", -1, -1.0, args.send_size, args.recv_size,
605 Datatype::encode(args.datatype1),
606 Datatype::encode(args.datatype2)));
608 Colls::alltoall(send_buffer(args.send_size * args.comm_size * args.datatype1->size()), args.send_size,
609 args.datatype1, recv_buffer(args.recv_size * args.comm_size * args.datatype2->size()),
610 args.recv_size, args.datatype2, MPI_COMM_WORLD);
612 TRACE_smpi_comm_out(my_proc_id);
615 void GatherAction::kernel(simgrid::xbt::ReplayAction& action)
617 TRACE_smpi_comm_in(my_proc_id, name.c_str(), new simgrid::instr::CollTIData(name, (name == "gather") ? args.root : -1, -1.0, args.send_size, args.recv_size,
618 Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
620 if (name == "gather") {
621 int rank = MPI_COMM_WORLD->rank();
622 Colls::gather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
623 (rank == args.root) ? recv_buffer(args.recv_size * args.comm_size * args.datatype2->size()) : nullptr, args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
626 Colls::allgather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
627 recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, MPI_COMM_WORLD);
629 TRACE_smpi_comm_out(my_proc_id);
632 void GatherVAction::kernel(simgrid::xbt::ReplayAction& action)
634 int rank = MPI_COMM_WORLD->rank();
636 TRACE_smpi_comm_in(my_proc_id, name.c_str(), new simgrid::instr::VarCollTIData(
637 name, (name == "gatherV") ? args.root : -1, args.send_size, nullptr, -1, args.recvcounts,
638 Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
640 if (name == "gatherV") {
641 Colls::gatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
642 (rank == args.root) ? recv_buffer(args.recv_size_sum * args.datatype2->size()) : nullptr,
643 args.recvcounts->data(), args.disps.data(), args.datatype2, args.root, MPI_COMM_WORLD);
646 Colls::allgatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
647 recv_buffer(args.recv_size_sum * args.datatype2->size()), args.recvcounts->data(),
648 args.disps.data(), args.datatype2, MPI_COMM_WORLD);
651 TRACE_smpi_comm_out(my_proc_id);
654 void ScatterAction::kernel(simgrid::xbt::ReplayAction& action)
656 int rank = MPI_COMM_WORLD->rank();
657 TRACE_smpi_comm_in(my_proc_id, "action_scatter", new simgrid::instr::CollTIData(name, args.root, -1.0, args.send_size, args.recv_size,
658 Datatype::encode(args.datatype1),
659 Datatype::encode(args.datatype2)));
661 Colls::scatter(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
662 (rank == args.root) ? recv_buffer(args.recv_size * args.datatype2->size()) : nullptr, args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
664 TRACE_smpi_comm_out(my_proc_id);
667 void ScatterVAction::kernel(simgrid::xbt::ReplayAction& action)
669 int rank = MPI_COMM_WORLD->rank();
670 TRACE_smpi_comm_in(my_proc_id, "action_scatterv", new simgrid::instr::VarCollTIData(name, args.root, -1, args.sendcounts, args.recv_size,
671 nullptr, Datatype::encode(args.datatype1),
672 Datatype::encode(args.datatype2)));
674 Colls::scatterv((rank == args.root) ? send_buffer(args.send_size_sum * args.datatype1->size()) : nullptr,
675 args.sendcounts->data(), args.disps.data(), args.datatype1,
676 recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, args.root,
679 TRACE_smpi_comm_out(my_proc_id);
682 void ReduceScatterAction::kernel(simgrid::xbt::ReplayAction& action)
684 TRACE_smpi_comm_in(my_proc_id, "action_reducescatter",
685 new simgrid::instr::VarCollTIData("reduceScatter", -1, 0, nullptr, -1, args.recvcounts,
686 std::to_string(args.comp_size), /* ugly hack to print comp_size */
687 Datatype::encode(args.datatype1)));
689 Colls::reduce_scatter(send_buffer(args.recv_size_sum * args.datatype1->size()),
690 recv_buffer(args.recv_size_sum * args.datatype1->size()), args.recvcounts->data(),
691 args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
693 smpi_execute_flops(args.comp_size);
694 TRACE_smpi_comm_out(my_proc_id);
697 void AllToAllVAction::kernel(simgrid::xbt::ReplayAction& action)
699 TRACE_smpi_comm_in(my_proc_id, __func__,
700 new simgrid::instr::VarCollTIData(
701 "allToAllV", -1, args.send_size_sum, args.sendcounts, args.recv_size_sum, args.recvcounts,
702 Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
704 Colls::alltoallv(send_buffer(args.send_buf_size * args.datatype1->size()), args.sendcounts->data(), args.senddisps.data(), args.datatype1,
705 recv_buffer(args.recv_buf_size * args.datatype2->size()), args.recvcounts->data(), args.recvdisps.data(), args.datatype2, MPI_COMM_WORLD);
707 TRACE_smpi_comm_out(my_proc_id);
709 } // Replay Namespace
710 }} // namespace simgrid::smpi
// One RequestStorage per process; the action callbacks below index it with get_pid() - 1.
// NOTE(review): this assumes actor pids run from 1 to smpi_process_count() — confirm.
712 static std::vector<simgrid::smpi::replay::RequestStorage> storage;
713 /** @brief Only initialize the replay, don't do it for real */
// Marks the process as initialized and replaying, sizes `storage`, emits the init trace events,
// and registers one callback per trace action name.
714 void smpi_replay_init(int* argc, char*** argv)
716 simgrid::smpi::Process::init(argc, argv);
717 smpi_process()->mark_as_initialized();
718 smpi_process()->set_replaying(true);
720 int my_proc_id = simgrid::s4u::this_actor::get_pid();
721 storage.resize(smpi_process_count());
723 TRACE_smpi_init(my_proc_id);
724 TRACE_smpi_computing_init(my_proc_id);
725 TRACE_smpi_comm_in(my_proc_id, "smpi_replay_run_init", new simgrid::instr::NoOpTIData("init"));
726 TRACE_smpi_comm_out(my_proc_id);
// Point-to-point and wait/test actions receive the calling process' RequestStorage; collectives do not.
727 xbt_replay_action_register("init", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::InitAction().execute(action); });
728 xbt_replay_action_register("finalize", [](simgrid::xbt::ReplayAction& action) { /* nothing to do */ });
729 xbt_replay_action_register("comm_size", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
730 xbt_replay_action_register("comm_split",[](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
731 xbt_replay_action_register("comm_dup", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
732 xbt_replay_action_register("send", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("send", storage[simgrid::s4u::this_actor::get_pid()-1]).execute(action); });
733 xbt_replay_action_register("Isend", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("Isend", storage[simgrid::s4u::this_actor::get_pid()-1]).execute(action); });
734 xbt_replay_action_register("recv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("recv", storage[simgrid::s4u::this_actor::get_pid()-1]).execute(action); });
735 xbt_replay_action_register("Irecv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("Irecv", storage[simgrid::s4u::this_actor::get_pid()-1]).execute(action); });
736 xbt_replay_action_register("test", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::TestAction(storage[simgrid::s4u::this_actor::get_pid()-1]).execute(action); });
737 xbt_replay_action_register("wait", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAction(storage[simgrid::s4u::this_actor::get_pid()-1]).execute(action); });
738 xbt_replay_action_register("waitAll", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAllAction(storage[simgrid::s4u::this_actor::get_pid()-1]).execute(action); });
739 xbt_replay_action_register("barrier", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BarrierAction().execute(action); });
740 xbt_replay_action_register("bcast", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BcastAction().execute(action); });
741 xbt_replay_action_register("reduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceAction().execute(action); });
742 xbt_replay_action_register("allReduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllReduceAction().execute(action); });
743 xbt_replay_action_register("allToAll", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllAction().execute(action); });
744 xbt_replay_action_register("allToAllV", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllVAction().execute(action); });
745 xbt_replay_action_register("gather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("gather").execute(action); });
746 xbt_replay_action_register("scatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterAction().execute(action); });
747 xbt_replay_action_register("gatherV", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("gatherV").execute(action); });
748 xbt_replay_action_register("scatterV", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterVAction().execute(action); });
749 xbt_replay_action_register("allGather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("allGather").execute(action); });
750 xbt_replay_action_register("allGatherV", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("allGatherV").execute(action); });
751 xbt_replay_action_register("reduceScatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceScatterAction().execute(action); });
752 xbt_replay_action_register("compute", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ComputeAction().execute(action); });
// NOTE(review): the condition guarding the delayed start and its alternative branch are not visible
// in this view — presumably the delay is only read when (*argv)[2] exists.
754 //if we have a delayed start, sleep here.
756 double value = xbt_str_parse_double((*argv)[2], "%s is not a double");
757 XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
758 smpi_execute_flops(value);
760 //UGLY: force a context switch to be sure that all MSG_processes begin initialization
761 XBT_DEBUG("Force context switch by smpi_execute_flops - Sleeping for 0.0 flops ");
762 smpi_execute_flops(0.0);
766 /** @brief actually run the replay after initialization */
// Runs the trace with replay_runner, waits for any request still in this process' storage, and
// finalizes. The last active process prints the simulated time and frees the replay buffers.
// NOTE(review): the updates of active_processes and of the index `i` are not visible in this view.
767 void smpi_replay_main(int* argc, char*** argv)
769 static int active_processes = 0;
771 simgrid::xbt::replay_runner(*argc, *argv);
773 /* and now, finalize everything */
774 /* One active process will stop. Decrease the counter*/
775 unsigned int count_requests = storage[simgrid::s4u::this_actor::get_pid() - 1].size();
// NOTE(review): "%ud" prints the count followed by a literal 'd'; "%u" was probably intended.
776 XBT_DEBUG("There are %ud elements in reqq[*]", count_requests);
777 if (count_requests > 0) {
// NOTE(review): these are variable-length arrays (a compiler extension, not ISO C++);
// std::vector would be the portable choice.
778 MPI_Request requests[count_requests];
779 MPI_Status status[count_requests];
// Entries may be MPI_REQUEST_NULL placeholders inserted by TestAction.
782 for (auto const& pair : storage[simgrid::s4u::this_actor::get_pid() - 1].get_store()) {
783 requests[i] = pair.second;
786 simgrid::smpi::Request::waitall(count_requests, requests, status);
790 if(active_processes==0){
791 /* Last process alive speaking: end the simulated timer */
792 XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
793 smpi_free_replay_tmp_buffers();
796 TRACE_smpi_comm_in(simgrid::s4u::this_actor::get_pid(), "smpi_replay_run_finalize",
797 new simgrid::instr::NoOpTIData("finalize"));
799 smpi_process()->finalize();
801 TRACE_smpi_comm_out(simgrid::s4u::this_actor::get_pid());
802 TRACE_smpi_finalize(simgrid::s4u::this_actor::get_pid());
805 /** @brief chain a replay initialization and a replay start */
806 void smpi_replay_run(int* argc, char*** argv)
808 smpi_replay_init(argc, argv);
809 smpi_replay_main(argc, argv);