1 /* Copyright (c) 2009-2018. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
7 #include "smpi_coll.hpp"
8 #include "smpi_comm.hpp"
9 #include "smpi_datatype.hpp"
10 #include "smpi_group.hpp"
11 #include "smpi_process.hpp"
12 #include "smpi_request.hpp"
13 #include "xbt/replay.hpp"
14 #include <simgrid/smpi/replay.hpp>
16 #include <boost/algorithm/string/join.hpp>
19 #include <unordered_map>
23 // From https://stackoverflow.com/questions/7110301/generic-hash-for-tuples-in-unordered-map-unordered-set
24 // This is all just to make std::unordered_map work with std::tuple. If we need this in other places,
25 // this could go into a header file.
// Generic hashing support so std::tuple can be used as an unordered_map key.
// hash<TT> defaults to std::hash; the std::tuple specialization below combines
// the hashes of all tuple elements.
26 namespace hash_tuple {
27 template <typename TT> class hash {
29 size_t operator()(TT const& tt) const { return std::hash<TT>()(tt); }
// Boost-style hash combiner: mixes v's hash into seed (0x9e3779b9 is the
// golden-ratio constant commonly used to spread bits).
32 template <class T> inline void hash_combine(std::size_t& seed, T const& v)
34 seed ^= hash_tuple::hash<T>()(v) + 0x9e3779b9 + (seed << 6) + (seed >> 2);
37 // Recursive template code derived from Matthieu M.
// Recursively folds element hashes from the last tuple index down to 0.
38 template <class Tuple, size_t Index = std::tuple_size<Tuple>::value - 1> class HashValueImpl {
40 static void apply(size_t& seed, Tuple const& tuple)
42 HashValueImpl<Tuple, Index - 1>::apply(seed, tuple);
43 hash_combine(seed, std::get<Index>(tuple));
// Recursion base case: hash the first element.
47 template <class Tuple> class HashValueImpl<Tuple, 0> {
49 static void apply(size_t& seed, Tuple const& tuple) { hash_combine(seed, std::get<0>(tuple)); }
// Specialization making hash<std::tuple<...>> usable as an unordered_map hasher.
52 template <typename... TT> class hash<std::tuple<TT...>> {
54 size_t operator()(std::tuple<TT...> const& tt) const
57 HashValueImpl<std::tuple<TT...>>::apply(seed, tt);
63 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
65 typedef std::tuple</*sender*/ int, /* receiver */ int, /* tag */int> req_key_t;
66 typedef std::unordered_map<req_key_t, MPI_Request, hash_tuple::hash<std::tuple<int,int,int>>> req_storage_t;
// Logs the replayed action together with the (simulated) time it took,
// but only when verbose logging is enabled (the join is otherwise skipped).
69 static void log_timed_action(simgrid::xbt::ReplayAction& action, double clock)
71 if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
72 std::string s = boost::algorithm::join(action, " ");
// `clock` is the action's start time; the difference is its simulated duration.
73 XBT_VERB("%s %f", s.c_str(), smpi_process()->simulated_elapsed() - clock);
78 static double parse_double(std::string string)
80 return xbt_str_parse_double(string.c_str(), "%s is not a double");
87 MPI_Datatype MPI_DEFAULT_TYPE;
// Per-process storage of in-flight MPI_Requests, keyed by (src, dst, tag).
// Used by the replay actions (Isend/Irecv/test/wait/waitAll) to match a later
// wait/test with the request created earlier.
89 class RequestStorage {
// Direct access to the underlying map (used e.g. by waitAll to clear it).
100 req_storage_t& get_store()
// Collects into `vec` all non-null requests in which the calling actor is
// involved as source or destination.
105 void get_requests(std::vector<MPI_Request>& vec)
107 for (auto& pair : store) {
108 auto& req = pair.second;
109 auto my_proc_id = simgrid::s4u::this_actor::get_pid();
110 if (req != MPI_REQUEST_NULL && (req->src() == my_proc_id || req->dst() == my_proc_id)) {
111 vec.push_back(pair.second);
// Debug trace of the matched request.
112 pair.second->print_request("MM");
// Looks up a request by (src, dst, tag); returns MPI_REQUEST_NULL if absent.
117 MPI_Request find(int src, int dst, int tag)
119 req_storage_t::iterator it = store.find(req_key_t(src, dst, tag));
120 return (it == store.end()) ? MPI_REQUEST_NULL : it->second;
// Erases the entry for `req`. NOTE(review): the -1 offsets mirror add() below,
// presumably converting pid-based src/dst back to trace ranks — confirm against
// how find()/addNullRequest() keys are produced by the callers.
123 void remove(MPI_Request req)
125 if (req == MPI_REQUEST_NULL) return;
127 store.erase(req_key_t(req->src()-1, req->dst()-1, req->tag()));
130 void add(MPI_Request req)
132 if (req != MPI_REQUEST_NULL) // Can and does happen in the case of TestAction
133 store.insert({req_key_t(req->src()-1, req->dst()-1, req->tag()), req});
136 /* Sometimes we need to re-insert MPI_REQUEST_NULL but we still need src,dst and tag */
137 void addNullRequest(int src, int dst, int tag)
139 store.insert({req_key_t(src, dst, tag), MPI_REQUEST_NULL});
// Parses "wait"/"test" actions: fields 2..4 are src rank, dst rank and tag,
// used later to look the request up in the RequestStorage.
143 void WaitTestParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
145 CHECK_ACTION_PARAMS(action, 3, 0)
146 src = std::stoi(action[2]);
147 dst = std::stoi(action[3]);
148 tag = std::stoi(action[4]);
// Parses point-to-point actions (send/Isend/recv/Irecv): partner rank, tag,
// message size, and an optional datatype id (defaults handled elsewhere).
151 void SendRecvParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
153 CHECK_ACTION_PARAMS(action, 3, 1)
154 partner = std::stoi(action[2]);
155 tag = std::stoi(action[3]);
156 size = parse_double(action[4]);
// Optional trailing field: the datatype id.
157 if (action.size() > 5)
158 datatype1 = simgrid::smpi::Datatype::decode(action[5]);
// Parses "compute" actions: the single argument is the flop count to simulate.
161 void ComputeParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
163 CHECK_ACTION_PARAMS(action, 1, 0)
164 flops = parse_double(action[2]);
// Parses "bcast" actions: size, optional root (defaults to 0) and optional datatype.
167 void BcastArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
169 CHECK_ACTION_PARAMS(action, 1, 2)
170 size = parse_double(action[2])
// Root defaults to rank 0 when not present in the trace.
171 root = (action.size() > 3) ? std::stoi(action[3]) : 0;
172 if (action.size() > 4)
173 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
// Parses "reduce" actions: element count, computation cost, optional root and datatype.
176 void ReduceArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
178 CHECK_ACTION_PARAMS(action, 2, 2)
179 comm_size = parse_double(action[2]);
180 comp_size = parse_double(action[3]);
// Root defaults to rank 0 when not present in the trace.
181 root = (action.size() > 4) ? std::stoi(action[4]) : 0;
182 if (action.size() > 5)
183 datatype1 = simgrid::smpi::Datatype::decode(action[5]);
// Parses "allReduce" actions: element count, computation cost, optional datatype.
186 void AllReduceArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
188 CHECK_ACTION_PARAMS(action, 2, 1)
189 comm_size = parse_double(action[2]);
190 comp_size = parse_double(action[3]);
191 if (action.size() > 4)
192 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
// Parses "allToAll" actions: per-peer send/recv sizes plus optional send/recv datatypes.
195 void AllToAllArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
197 CHECK_ACTION_PARAMS(action, 2, 1)
// Communicator size comes from the world communicator, not from the trace.
198 comm_size = MPI_COMM_WORLD->size();
199 send_size = parse_double(action[2]);
200 recv_size = parse_double(action[3]);
202 if (action.size() > 4)
203 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
204 if (action.size() > 5)
205 datatype2 = simgrid::smpi::Datatype::decode(action[5]);
// Parses "gather" and "allGather" actions; field layout is documented below.
// For "gather" the root is an explicit field; the other branch (presumably
// "allGather" — confirm with the registration table) has no root field, so the
// datatype ids shift down by one position.
208 void GatherArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
210 /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
213 1) 68 is the sendcounts
214 2) 68 is the recvcounts
215 3) 0 is the root node
216 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
217 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
219 CHECK_ACTION_PARAMS(action, 2, 3)
220 comm_size = MPI_COMM_WORLD->size();
221 send_size = parse_double(action[2]);
222 recv_size = parse_double(action[3]);
224 if (name == "gather") {
// Root defaults to rank 0 when not present in the trace.
225 root = (action.size() > 4) ? std::stoi(action[4]) : 0;
226 if (action.size() > 5)
227 datatype1 = simgrid::smpi::Datatype::decode(action[5]);
228 if (action.size() > 6)
229 datatype2 = simgrid::smpi::Datatype::decode(action[6]);
// No root field in this variant: datatypes start one field earlier.
231 if (action.size() > 4)
232 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
233 if (action.size() > 5)
234 datatype2 = simgrid::smpi::Datatype::decode(action[5]);
// Parses "gatherV"/"allGatherV" actions. Because the recvcounts array occupies
// comm_size fields, the positions of root/datatype/displacement fields are
// computed relative to 3 + comm_size. The index arithmetic below is intricate;
// left byte-identical on purpose.
238 void GatherVArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
240 /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
241 0 gather 68 68 10 10 10 0 0 0
243 1) 68 is the sendcount
244 2) 68 10 10 10 is the recvcounts
245 3) 0 is the root node
246 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
247 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
249 comm_size = MPI_COMM_WORLD->size();
250 CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
251 send_size = parse_double(action[2]);
// Displacements default to all-zero; recvcounts is pre-sized and filled below.
252 disps = std::vector<int>(comm_size, 0);
253 recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
255 if (name == "gatherV") {
// Root field sits right after the recvcounts array; defaults to 0.
256 root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
257 if (action.size() > 4 + comm_size)
258 datatype1 = simgrid::smpi::Datatype::decode(action[4 + comm_size]);
259 if (action.size() > 5 + comm_size)
260 datatype2 = simgrid::smpi::Datatype::decode(action[5 + comm_size]);
262 int datatype_index = 0;
264 /* The 3 comes from "0 gather <sendcount>", which must always be present.
265 * The + comm_size is the recvcounts array, which must also be present
267 if (action.size() > 3 + comm_size + comm_size) { /* datatype + disp are specified */
268 datatype_index = 3 + comm_size;
269 disp_index = datatype_index + 1;
270 datatype1 = simgrid::smpi::Datatype::decode(action[datatype_index]);
271 datatype2 = simgrid::smpi::Datatype::decode(action[datatype_index]);
272 } else if (action.size() >
273 3 + comm_size + 2) { /* disps specified; datatype is not specified; use the default one */
274 disp_index = 3 + comm_size;
275 } else if (action.size() > 3 + comm_size) { /* only datatype, no disp specified */
276 datatype_index = 3 + comm_size;
277 datatype1 = simgrid::smpi::Datatype::decode(action[datatype_index]);
278 datatype2 = simgrid::smpi::Datatype::decode(action[datatype_index]);
// Only read displacements when the branch above located them.
281 if (disp_index != 0) {
282 for (unsigned int i = 0; i < comm_size; i++)
283 disps[i] = std::stoi(action[disp_index + i]);
// Fill the pre-sized recvcounts array and compute its total.
287 for (unsigned int i = 0; i < comm_size; i++) {
288 (*recvcounts)[i] = std::stoi(action[i + 3]);
290 recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
// Parses "scatter" actions: send/recv sizes, optional root and datatypes.
293 void ScatterArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
295 /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
298 1) 68 is the sendcounts
299 2) 68 is the recvcounts
300 3) 0 is the root node
301 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
302 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
304 CHECK_ACTION_PARAMS(action, 2, 3)
305 comm_size = MPI_COMM_WORLD->size();
306 send_size = parse_double(action[2]);
307 recv_size = parse_double(action[3]);
// Root defaults to rank 0 when not present in the trace.
308 root = (action.size() > 4) ? std::stoi(action[4]) : 0;
309 if (action.size() > 5)
310 datatype1 = simgrid::smpi::Datatype::decode(action[5]);
311 if (action.size() > 6)
312 datatype2 = simgrid::smpi::Datatype::decode(action[6]);
315 void ScatterVArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
317 /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
318 0 gather 68 10 10 10 68 0 0 0
320 1) 68 10 10 10 is the sendcounts
321 2) 68 is the recvcount
322 3) 0 is the root node
323 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
324 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
326 CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
327 recv_size = parse_double(action[2 + comm_size]);
328 disps = std::vector<int>(comm_size, 0);
329 sendcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
331 if (action.size() > 5 + comm_size)
332 datatype1 = simgrid::smpi::Datatype::decode(action[4 + comm_size]);
333 if (action.size() > 5 + comm_size)
334 datatype2 = simgrid::smpi::Datatype::decode(action[5]);
336 for (unsigned int i = 0; i < comm_size; i++) {
337 (*sendcounts)[i] = std::stoi(action[i + 2]);
339 send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
340 root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
343 void ReduceScatterArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
345 /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
346 0 reduceScatter 275427 275427 275427 204020 11346849 0
348 1) The first four values after the name of the action declare the recvcounts array
349 2) The value 11346849 is the amount of instructions
350 3) The last value corresponds to the datatype, see simgrid::smpi::Datatype::decode().
352 comm_size = MPI_COMM_WORLD->size();
353 CHECK_ACTION_PARAMS(action, comm_size + 1, 1)
354 comp_size = parse_double(action[2 + comm_size]);
355 recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
356 if (action.size() > 3 + comm_size)
357 datatype1 = simgrid::smpi::Datatype::decode(action[3 + comm_size]);
359 for (unsigned int i = 0; i < comm_size; i++) {
360 recvcounts->push_back(std::stoi(action[i + 2]));
362 recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
365 void AllToAllVArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
367 /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
368 0 allToAllV 100 1 7 10 12 100 1 70 10 5
370 1) 100 is the size of the send buffer *sizeof(int),
371 2) 1 7 10 12 is the sendcounts array
372 3) 100*sizeof(int) is the size of the receiver buffer
373 4) 1 70 10 5 is the recvcounts array
375 comm_size = MPI_COMM_WORLD->size();
376 CHECK_ACTION_PARAMS(action, 2 * comm_size + 2, 2)
377 sendcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
378 recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
379 senddisps = std::vector<int>(comm_size, 0);
380 recvdisps = std::vector<int>(comm_size, 0);
382 if (action.size() > 5 + 2 * comm_size)
383 datatype1 = simgrid::smpi::Datatype::decode(action[4 + 2 * comm_size]);
384 if (action.size() > 5 + 2 * comm_size)
385 datatype2 = simgrid::smpi::Datatype::decode(action[5 + 2 * comm_size]);
387 send_buf_size = parse_double(action[2]);
388 recv_buf_size = parse_double(action[3 + comm_size]);
389 for (unsigned int i = 0; i < comm_size; i++) {
390 (*sendcounts)[i] = std::stoi(action[3 + i]);
391 (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
393 send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
394 recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
// Template-method entry point for every replay action: record the start time,
// parse the trace line into `args`, run the subclass kernel (elided from this
// view), then log the action with its simulated duration.
398 void ReplayAction<T>::execute(simgrid::xbt::ReplayAction& action)
400 // Needs to be re-initialized for every action, hence here
401 double start_time = smpi_process()->simulated_elapsed();
402 args.parse(action, name);
405 log_timed_action(action, start_time);
// Replays MPI_Wait: looks up the pending request by (src, dst, tag), removes it
// from storage, and waits on it (no-op if a prior MPI_Test already completed it).
408 class WaitAction : public ReplayAction<WaitTestParser> {
410 RequestStorage& req_storage;
413 explicit WaitAction(RequestStorage& storage) : ReplayAction("Wait"), req_storage(storage) {}
414 void kernel(simgrid::xbt::ReplayAction& action) override
416 std::string s = boost::algorithm::join(action, " ");
417 xbt_assert(req_storage.size(), "action wait not preceded by any irecv or isend: %s", s.c_str());
418 MPI_Request request = req_storage.find(args.src, args.dst, args.tag);
419 req_storage.remove(request);
421 if (request == MPI_REQUEST_NULL) {
422 /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just
427 int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
429 // Must be taken before Request::wait() since the request may be set to
430 // MPI_REQUEST_NULL by Request::wait!
431 bool is_wait_for_receive = (request->flags() & RECV);
432 // TODO: Here we take the rank while we normally take the process id (look for my_proc_id)
433 TRACE_smpi_comm_in(rank, __func__, new simgrid::instr::NoOpTIData("wait"));
436 Request::wait(&request, &status);
438 TRACE_smpi_comm_out(rank);
// Only receives produce a matching TRACE recv event.
439 if (is_wait_for_receive)
440 TRACE_smpi_recv(args.src, args.dst, args.tag);
// Replays "send" (blocking) and "Isend" (non-blocking, request stored for a
// later wait/test). The payload buffer is nullptr: only timing is simulated.
444 class SendAction : public ReplayAction<SendRecvParser> {
446 RequestStorage& req_storage;
449 explicit SendAction(std::string name, RequestStorage& storage) : ReplayAction(name), req_storage(storage) {}
450 void kernel(simgrid::xbt::ReplayAction& action) override
452 int dst_traced = MPI_COMM_WORLD->group()->actor(args.partner)->get_pid();
454 TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
455 args.tag, Datatype::encode(args.datatype1)));
456 if (not TRACE_smpi_view_internals())
457 TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, args.tag, args.size * args.datatype1->size());
459 if (name == "send") {
460 Request::send(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
461 } else if (name == "Isend") {
462 MPI_Request request = Request::isend(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
// Keep the request so a later wait/test can find it by (src, dst, tag).
463 req_storage.add(request);
// Defensive: the registration table only ever passes "send"/"Isend".
465 xbt_die("Don't know this action, %s", name.c_str());
468 TRACE_smpi_comm_out(my_proc_id);
// Replays "recv" (blocking) and "Irecv" (non-blocking). When the trace does not
// carry a size, it is probed from the matching send.
472 class RecvAction : public ReplayAction<SendRecvParser> {
474 RequestStorage& req_storage;
477 explicit RecvAction(std::string name, RequestStorage& storage) : ReplayAction(name), req_storage(storage) {}
478 void kernel(simgrid::xbt::ReplayAction& action) override
480 int src_traced = MPI_COMM_WORLD->group()->actor(args.partner)->get_pid();
482 TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
483 args.tag, Datatype::encode(args.datatype1)));
486 // unknown size from the receiver point of view
487 if (args.size <= 0.0) {
488 Request::probe(args.partner, args.tag, MPI_COMM_WORLD, &status);
489 args.size = status.count;
492 if (name == "recv") {
493 Request::recv(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD, &status);
494 } else if (name == "Irecv") {
495 MPI_Request request = Request::irecv(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
// Keep the request so a later wait/test can find it by (src, dst, tag).
496 req_storage.add(request);
499 TRACE_smpi_comm_out(my_proc_id);
500 // TODO: Check why this was only activated in the "recv" case and not in the "Irecv" case
501 if (name == "recv" && not TRACE_smpi_view_internals()) {
502 TRACE_smpi_recv(src_traced, my_proc_id, args.tag);
// Replays "compute" actions by burning the traced number of flops in simulation.
507 class ComputeAction : public ReplayAction<ComputeParser> {
509 ComputeAction() : ReplayAction("compute") {}
510 void kernel(simgrid::xbt::ReplayAction& action) override
512 TRACE_smpi_computing_in(my_proc_id, args.flops);
513 smpi_execute_flops(args.flops);
514 TRACE_smpi_computing_out(my_proc_id);
// Replays MPI_Test: tests the stored request and re-inserts it (or a null
// placeholder, keeping src/dst/tag) so a later wait still finds an entry.
518 class TestAction : public ReplayAction<WaitTestParser> {
520 RequestStorage& req_storage;
523 explicit TestAction(RequestStorage& storage) : ReplayAction("Test"), req_storage(storage) {}
524 void kernel(simgrid::xbt::ReplayAction& action) override
526 MPI_Request request = req_storage.find(args.src, args.dst, args.tag);
527 req_storage.remove(request);
528 // if request is null here, this may mean that a previous test has succeeded
529 // Different times in traced application and replayed version may lead to this
530 // In this case, ignore the extra calls.
531 if (request != MPI_REQUEST_NULL) {
532 TRACE_smpi_testing_in(my_proc_id);
535 int flag = Request::test(&request, &status);
537 XBT_DEBUG("MPI_Test result: %d", flag);
538 /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now
// Request::test() nulls the request on success; keep a null placeholder then.
540 if (request == MPI_REQUEST_NULL)
541 req_storage.addNullRequest(args.src, args.dst, args.tag);
543 req_storage.add(request);
545 TRACE_smpi_testing_out(my_proc_id);
// Replays "init": selects the default datatype from the trace flavor and starts
// the simulated timer.
550 class InitAction : public ReplayAction<ActionArgParser> {
552 InitAction() : ReplayAction("Init") {}
553 void kernel(simgrid::xbt::ReplayAction& action) override
555 CHECK_ACTION_PARAMS(action, 0, 1)
556 MPI_DEFAULT_TYPE = (action.size() > 2) ? MPI_DOUBLE // default MPE datatype
557 : MPI_BYTE; // default TAU datatype
559 /* start a simulated timer */
560 smpi_process()->simulated_start();
// Placeholder for communicator-management actions (comm_size/comm_split/comm_dup);
// the replay does not model communicators beyond MPI_COMM_WORLD.
564 class CommunicatorAction : public ReplayAction<ActionArgParser> {
566 CommunicatorAction() : ReplayAction("Comm") {}
567 void kernel(simgrid::xbt::ReplayAction& action) override { /* nothing to do */}
// Replays MPI_Waitall over every request currently held in this process's
// RequestStorage, then clears the storage.
570 class WaitAllAction : public ReplayAction<ActionArgParser> {
572 RequestStorage& req_storage;
575 explicit WaitAllAction(RequestStorage& storage) : ReplayAction("waitAll"), req_storage(storage) {}
576 void kernel(simgrid::xbt::ReplayAction& action) override
578 const unsigned int count_requests = req_storage.size();
580 if (count_requests > 0) {
581 TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData("waitAll", -1, count_requests, ""));
// Remember (src, dst) of receive requests before waitall may null them out.
582 std::vector<std::pair</*sender*/int,/*recv*/int>> sender_receiver;
583 std::vector<MPI_Request> reqs;
584 req_storage.get_requests(reqs);
585 for (const auto& req : reqs) {
586 if (req && (req->flags() & RECV)) {
587 sender_receiver.push_back({req->src(), req->dst()});
// NOTE(review): variable-length array — not standard C++; a std::vector<MPI_Status>
// would be portable. Left as-is (compiler extension in use here).
590 MPI_Status status[count_requests];
591 Request::waitall(count_requests, &(reqs.data())[0], status);
592 req_storage.get_store().clear();
// Emit the recv trace events recorded above.
594 for (auto& pair : sender_receiver) {
595 TRACE_smpi_recv(pair.first, pair.second, 0);
597 TRACE_smpi_comm_out(my_proc_id);
// Replays MPI_Barrier on MPI_COMM_WORLD.
602 class BarrierAction : public ReplayAction<ActionArgParser> {
604 BarrierAction() : ReplayAction("barrier") {}
605 void kernel(simgrid::xbt::ReplayAction& action) override
607 TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::NoOpTIData("barrier"));
608 Colls::barrier(MPI_COMM_WORLD);
609 TRACE_smpi_comm_out(my_proc_id);
// Replays MPI_Bcast with a shared scratch buffer sized from the traced count.
613 class BcastAction : public ReplayAction<BcastArgParser> {
615 BcastAction() : ReplayAction("bcast") {}
616 void kernel(simgrid::xbt::ReplayAction& action) override
618 TRACE_smpi_comm_in(my_proc_id, "action_bcast",
619 new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(args.root)->get_pid(),
620 -1.0, args.size, -1, Datatype::encode(args.datatype1), ""));
622 Colls::bcast(send_buffer(args.size * args.datatype1->size()), args.size, args.datatype1, args.root, MPI_COMM_WORLD);
624 TRACE_smpi_comm_out(my_proc_id);
// Replays MPI_Reduce (MPI_OP_NULL: only communication is simulated), then burns
// the traced computation cost as flops.
628 class ReduceAction : public ReplayAction<ReduceArgParser> {
630 ReduceAction() : ReplayAction("reduce") {}
631 void kernel(simgrid::xbt::ReplayAction& action) override
633 TRACE_smpi_comm_in(my_proc_id, "action_reduce",
634 new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(args.root)->get_pid(),
635 args.comp_size, args.comm_size, -1,
636 Datatype::encode(args.datatype1), ""));
638 Colls::reduce(send_buffer(args.comm_size * args.datatype1->size()),
639 recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL, args.root, MPI_COMM_WORLD);
640 smpi_execute_flops(args.comp_size);
642 TRACE_smpi_comm_out(my_proc_id);
// Replays MPI_Allreduce (MPI_OP_NULL), then burns the traced computation cost.
646 class AllReduceAction : public ReplayAction<AllReduceArgParser> {
648 AllReduceAction() : ReplayAction("allReduce") {}
649 void kernel(simgrid::xbt::ReplayAction& action) override
651 TRACE_smpi_comm_in(my_proc_id, "action_allReduce", new simgrid::instr::CollTIData("allReduce", -1, args.comp_size, args.comm_size, -1,
652 Datatype::encode(args.datatype1), ""));
654 Colls::allreduce(send_buffer(args.comm_size * args.datatype1->size()),
655 recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
656 smpi_execute_flops(args.comp_size);
658 TRACE_smpi_comm_out(my_proc_id);
// Replays MPI_Alltoall with scratch buffers sized for the full communicator.
662 class AllToAllAction : public ReplayAction<AllToAllArgParser> {
664 AllToAllAction() : ReplayAction("allToAll") {}
665 void kernel(simgrid::xbt::ReplayAction& action) override
667 TRACE_smpi_comm_in(my_proc_id, "action_allToAll",
668 new simgrid::instr::CollTIData("allToAll", -1, -1.0, args.send_size, args.recv_size,
669 Datatype::encode(args.datatype1),
670 Datatype::encode(args.datatype2)));
672 Colls::alltoall(send_buffer(args.send_size * args.comm_size * args.datatype1->size()), args.send_size,
673 args.datatype1, recv_buffer(args.recv_size * args.comm_size * args.datatype2->size()),
674 args.recv_size, args.datatype2, MPI_COMM_WORLD);
676 TRACE_smpi_comm_out(my_proc_id);
// Replays MPI_Gather (name == "gather") or MPI_Allgather (other registration,
// presumably "allGather"); only the root gets a receive buffer for gather.
680 class GatherAction : public ReplayAction<GatherArgParser> {
682 explicit GatherAction(std::string name) : ReplayAction(name) {}
683 void kernel(simgrid::xbt::ReplayAction& action) override
685 TRACE_smpi_comm_in(my_proc_id, name.c_str(), new simgrid::instr::CollTIData(name, (name == "gather") ? args.root : -1, -1.0, args.send_size, args.recv_size,
686 Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
688 if (name == "gather") {
689 int rank = MPI_COMM_WORLD->rank();
690 Colls::gather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
691 (rank == args.root) ? recv_buffer(args.recv_size * args.comm_size * args.datatype2->size()) : nullptr, args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
694 Colls::allgather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
695 recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, MPI_COMM_WORLD);
697 TRACE_smpi_comm_out(my_proc_id);
// Replays MPI_Gatherv (name == "gatherV") or MPI_Allgatherv; only the root gets
// a receive buffer for gatherv.
701 class GatherVAction : public ReplayAction<GatherVArgParser> {
703 explicit GatherVAction(std::string name) : ReplayAction(name) {}
704 void kernel(simgrid::xbt::ReplayAction& action) override
706 int rank = MPI_COMM_WORLD->rank();
708 TRACE_smpi_comm_in(my_proc_id, name.c_str(), new simgrid::instr::VarCollTIData(
709 name, (name == "gatherV") ? args.root : -1, args.send_size, nullptr, -1, args.recvcounts,
710 Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
712 if (name == "gatherV") {
713 Colls::gatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
714 (rank == args.root) ? recv_buffer(args.recv_size_sum * args.datatype2->size()) : nullptr,
715 args.recvcounts->data(), args.disps.data(), args.datatype2, args.root, MPI_COMM_WORLD);
718 Colls::allgatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
719 recv_buffer(args.recv_size_sum * args.datatype2->size()), args.recvcounts->data(),
720 args.disps.data(), args.datatype2, MPI_COMM_WORLD);
723 TRACE_smpi_comm_out(my_proc_id);
// Replays MPI_Scatter; only the root needs the full send buffer, non-roots
// receive into a scratch buffer.
727 class ScatterAction : public ReplayAction<ScatterArgParser> {
729 ScatterAction() : ReplayAction("scatter") {}
730 void kernel(simgrid::xbt::ReplayAction& action) override
732 int rank = MPI_COMM_WORLD->rank();
733 TRACE_smpi_comm_in(my_proc_id, "action_scatter", new simgrid::instr::CollTIData(name, args.root, -1.0, args.send_size, args.recv_size,
734 Datatype::encode(args.datatype1),
735 Datatype::encode(args.datatype2)));
737 Colls::scatter(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
738 (rank == args.root) ? recv_buffer(args.recv_size * args.datatype2->size()) : nullptr, args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
740 TRACE_smpi_comm_out(my_proc_id);
// Replays MPI_Scatterv; only the root provides the (variable-sized) send buffer.
745 class ScatterVAction : public ReplayAction<ScatterVArgParser> {
747 ScatterVAction() : ReplayAction("scatterV") {}
748 void kernel(simgrid::xbt::ReplayAction& action) override
750 int rank = MPI_COMM_WORLD->rank();
751 TRACE_smpi_comm_in(my_proc_id, "action_scatterv", new simgrid::instr::VarCollTIData(name, args.root, -1, args.sendcounts, args.recv_size,
752 nullptr, Datatype::encode(args.datatype1),
753 Datatype::encode(args.datatype2)));
755 Colls::scatterv((rank == args.root) ? send_buffer(args.send_size_sum * args.datatype1->size()) : nullptr,
756 args.sendcounts->data(), args.disps.data(), args.datatype1,
757 recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, args.root,
760 TRACE_smpi_comm_out(my_proc_id);
// Replays MPI_Reduce_scatter (MPI_OP_NULL), then burns the traced computation
// cost as flops.
764 class ReduceScatterAction : public ReplayAction<ReduceScatterArgParser> {
766 ReduceScatterAction() : ReplayAction("reduceScatter") {}
767 void kernel(simgrid::xbt::ReplayAction& action) override
769 TRACE_smpi_comm_in(my_proc_id, "action_reducescatter",
770 new simgrid::instr::VarCollTIData("reduceScatter", -1, 0, nullptr, -1, args.recvcounts,
771 std::to_string(args.comp_size), /* ugly hack to print comp_size */
772 Datatype::encode(args.datatype1)));
774 Colls::reduce_scatter(send_buffer(args.recv_size_sum * args.datatype1->size()),
775 recv_buffer(args.recv_size_sum * args.datatype1->size()), args.recvcounts->data(),
776 args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
778 smpi_execute_flops(args.comp_size);
779 TRACE_smpi_comm_out(my_proc_id);
// Replays MPI_Alltoallv using the traced per-peer counts and all-zero displacements.
783 class AllToAllVAction : public ReplayAction<AllToAllVArgParser> {
785 AllToAllVAction() : ReplayAction("allToAllV") {}
786 void kernel(simgrid::xbt::ReplayAction& action) override
788 TRACE_smpi_comm_in(my_proc_id, __func__,
789 new simgrid::instr::VarCollTIData(
790 "allToAllV", -1, args.send_size_sum, args.sendcounts, args.recv_size_sum, args.recvcounts,
791 Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
793 Colls::alltoallv(send_buffer(args.send_buf_size * args.datatype1->size()), args.sendcounts->data(), args.senddisps.data(), args.datatype1,
794 recv_buffer(args.recv_buf_size * args.datatype2->size()), args.recvcounts->data(), args.recvdisps.data(), args.datatype2, MPI_COMM_WORLD);
796 TRACE_smpi_comm_out(my_proc_id);
799 } // Replay Namespace
800 }} // namespace simgrid::smpi
802 std::vector<simgrid::smpi::replay::RequestStorage> storage;
803 /** @brief Only initialize the replay, don't do it for real */
// Initializes the replay for one process: sets up the SMPI process state and
// tracing, allocates per-process request storage, registers the handler for
// every trace action name, and honors an optional delayed start (flops value
// presumably taken from argv[2] — confirm against the CLI contract).
804 void smpi_replay_init(int* argc, char*** argv)
806 simgrid::smpi::Process::init(argc, argv);
807 smpi_process()->mark_as_initialized();
808 smpi_process()->set_replaying(true);
810 int my_proc_id = simgrid::s4u::this_actor::get_pid();
// One RequestStorage slot per simulated process (indexed by pid-1 below).
811 storage.resize(smpi_process_count());
813 TRACE_smpi_init(my_proc_id);
814 TRACE_smpi_computing_init(my_proc_id);
815 TRACE_smpi_comm_in(my_proc_id, "smpi_replay_run_init", new simgrid::instr::NoOpTIData("init"));
816 TRACE_smpi_comm_out(my_proc_id);
// Map every trace action keyword to the replay class that executes it.
817 xbt_replay_action_register("init", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::InitAction().execute(action); });
818 xbt_replay_action_register("finalize", [](simgrid::xbt::ReplayAction& action) { /* nothing to do */ });
819 xbt_replay_action_register("comm_size", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
820 xbt_replay_action_register("comm_split",[](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
821 xbt_replay_action_register("comm_dup", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
822 xbt_replay_action_register("send", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("send", storage[simgrid::s4u::this_actor::get_pid()-1]).execute(action); });
823 xbt_replay_action_register("Isend", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("Isend", storage[simgrid::s4u::this_actor::get_pid()-1]).execute(action); });
824 xbt_replay_action_register("recv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("recv", storage[simgrid::s4u::this_actor::get_pid()-1]).execute(action); });
825 xbt_replay_action_register("Irecv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("Irecv", storage[simgrid::s4u::this_actor::get_pid()-1]).execute(action); });
826 xbt_replay_action_register("test", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::TestAction(storage[simgrid::s4u::this_actor::get_pid()-1]).execute(action); });
827 xbt_replay_action_register("wait", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAction(storage[simgrid::s4u::this_actor::get_pid()-1]).execute(action); });
828 xbt_replay_action_register("waitAll", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAllAction(storage[simgrid::s4u::this_actor::get_pid()-1]).execute(action); });
829 xbt_replay_action_register("barrier", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BarrierAction().execute(action); });
830 xbt_replay_action_register("bcast", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BcastAction().execute(action); });
831 xbt_replay_action_register("reduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceAction().execute(action); });
832 xbt_replay_action_register("allReduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllReduceAction().execute(action); });
833 xbt_replay_action_register("allToAll", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllAction().execute(action); });
834 xbt_replay_action_register("allToAllV", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllVAction().execute(action); });
835 xbt_replay_action_register("gather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("gather").execute(action); });
836 xbt_replay_action_register("scatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterAction().execute(action); });
837 xbt_replay_action_register("gatherV", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("gatherV").execute(action); });
838 xbt_replay_action_register("scatterV", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterVAction().execute(action); });
839 xbt_replay_action_register("allGather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("allGather").execute(action); });
840 xbt_replay_action_register("allGatherV", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("allGatherV").execute(action); });
841 xbt_replay_action_register("reduceScatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceScatterAction().execute(action); });
842 xbt_replay_action_register("compute", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ComputeAction().execute(action); });
844 //if we have a delayed start, sleep here.
846 double value = xbt_str_parse_double((*argv)[2], "%s is not a double");
847 XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
848 smpi_execute_flops(value);
850 //UGLY: force a context switch to be sure that all MSG_processes begin initialization
851 XBT_DEBUG("Force context switch by smpi_execute_flops - Sleeping for 0.0 flops ");
852 smpi_execute_flops(0.0);
856 /** @brief actually run the replay after initialization */
// Runs the trace for this process, then drains any still-pending requests,
// finalizes the process, and (for the last process alive) prints the simulated
// time and frees the shared replay buffers.
857 void smpi_replay_main(int* argc, char*** argv)
859 static int active_processes = 0;
861 simgrid::xbt::replay_runner(*argc, *argv);
863 /* and now, finalize everything */
864 /* One active process will stop. Decrease the counter*/
865 unsigned int count_requests = storage[simgrid::s4u::this_actor::get_pid() - 1].size();
// NOTE(review): "%ud" prints the value then a literal 'd'; the specifier for
// unsigned int should be "%u".
866 XBT_DEBUG("There are %ud elements in reqq[*]", count_requests);
867 if (count_requests > 0) {
// NOTE(review): variable-length arrays are a compiler extension, not standard C++.
868 MPI_Request requests[count_requests];
869 MPI_Status status[count_requests];
872 for (auto const& pair : storage[simgrid::s4u::this_actor::get_pid() - 1].get_store()) {
873 requests[i] = pair.second;
876 simgrid::smpi::Request::waitall(count_requests, requests, status);
880 if(active_processes==0){
881 /* Last process alive speaking: end the simulated timer */
882 XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
883 smpi_free_replay_tmp_buffers();
886 TRACE_smpi_comm_in(simgrid::s4u::this_actor::get_pid(), "smpi_replay_run_finalize",
887 new simgrid::instr::NoOpTIData("finalize"));
889 smpi_process()->finalize();
891 TRACE_smpi_comm_out(simgrid::s4u::this_actor::get_pid());
892 TRACE_smpi_finalize(simgrid::s4u::this_actor::get_pid());
895 /** @brief chain a replay initialization and a replay start */
// Convenience entry point: initialize the replay, then actually run it.
896 void smpi_replay_run(int* argc, char*** argv)
898 smpi_replay_init(argc, argv);
899 smpi_replay_main(argc, argv);