1 /* Copyright (c) 2009-2018. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
7 #include "smpi_coll.hpp"
8 #include "smpi_comm.hpp"
9 #include "smpi_datatype.hpp"
10 #include "smpi_group.hpp"
11 #include "smpi_process.hpp"
12 #include "smpi_request.hpp"
13 #include "xbt/replay.hpp"
14 #include <simgrid/smpi/replay.hpp>
16 #include <boost/algorithm/string/join.hpp>
19 #include <unordered_map>
23 // From https://stackoverflow.com/questions/7110301/generic-hash-for-tuples-in-unordered-map-unordered-set
24 // This is all just to make std::unordered_map work with std::tuple. If we need this in other places,
25 // this could go into a header file.
// Hashing machinery so std::unordered_map can key on a std::tuple
// (used below for req_key_t = tuple<sender, receiver, tag>).
26 namespace hash_tuple {
// Generic fallback: delegate to std::hash for non-tuple types.
27 template <typename TT> class hash {
29 size_t operator()(TT const& tt) const { return std::hash<TT>()(tt); }
// Mix one value into the running seed (boost::hash_combine constant 0x9e3779b9).
32 template <class T> inline void hash_combine(std::size_t& seed, T const& v)
34 seed ^= hash_tuple::hash<T>()(v) + 0x9e3779b9 + (seed << 6) + (seed >> 2);
37 // Recursive template code derived from Matthieu M.
// Fold every tuple element into the seed, from index 0 up to the last.
38 template <class Tuple, size_t Index = std::tuple_size<Tuple>::value - 1> class HashValueImpl {
40 static void apply(size_t& seed, Tuple const& tuple)
42 HashValueImpl<Tuple, Index - 1>::apply(seed, tuple);
43 hash_combine(seed, std::get<Index>(tuple));
// Recursion base case: hash only element 0.
47 template <class Tuple> class HashValueImpl<Tuple, 0> {
49 static void apply(size_t& seed, Tuple const& tuple) { hash_combine(seed, std::get<0>(tuple)); }
// Specialization of hash<> for tuples: combine all element hashes.
52 template <typename... TT> class hash<std::tuple<TT...>> {
54 size_t operator()(std::tuple<TT...> const& tt) const
57 HashValueImpl<std::tuple<TT...>>::apply(seed, tt);
63 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
// Key identifying a pending request in a trace: (sender, receiver, tag).
65 typedef std::tuple</*sender*/ int, /* receiver */ int, /* tag */int> req_key_t;
// Map from that key to the in-flight MPI_Request, using the tuple hasher above.
66 typedef std::unordered_map<req_key_t, MPI_Request, hash_tuple::hash<std::tuple<int,int,int>>> req_storage_t;
// Log the replayed action and its simulated duration (simulated time elapsed
// since `clock`) — only evaluated when verbose logging is enabled.
69 static void log_timed_action(simgrid::xbt::ReplayAction& action, double clock)
71 if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
72 std::string s = boost::algorithm::join(action, " ");
73 XBT_VERB("%s %f", s.c_str(), smpi_process()->simulated_elapsed() - clock);
78 static double parse_double(std::string string)
80 return xbt_str_parse_double(string.c_str(), "%s is not a double");
// Datatype used when a trace line does not specify one; set by InitAction
// (MPI_DOUBLE for MPE-style traces, MPI_BYTE for TAU-style traces).
87 MPI_Datatype MPI_DEFAULT_TYPE;
// Per-actor container of in-flight requests, keyed by (src, dst, tag) so that
// a later wait/test trace line can find the request its isend/irecv created.
89 class RequestStorage {
// Direct access to the underlying map (used for bulk clear/iteration).
100 req_storage_t& get_store()
// Collect all stored requests that involve the calling actor into `vec`.
105 void get_requests(std::vector<MPI_Request>& vec)
107 for (auto& pair : store) {
108 auto& req = pair.second;
109 auto my_proc_id = simgrid::s4u::this_actor::get_pid();
110 if (req != MPI_REQUEST_NULL && (req->src() == my_proc_id || req->dst() == my_proc_id)) {
111 vec.push_back(pair.second);
112 pair.second->print_request("MM");
// Look up a request by the (src, dst, tag) triple as it appears in the trace.
117 MPI_Request find(int src, int dst, int tag)
119 req_storage_t::iterator it = store.find(req_key_t(src, dst, tag));
120 return (it == store.end()) ? MPI_REQUEST_NULL : it->second;
// Remove the entry corresponding to `req`; no-op for MPI_REQUEST_NULL.
// NOTE(review): remove()/add() key with src()-1/dst()-1 while find() takes the
// trace's raw values — presumably request src/dst are 1-based pids while the
// trace is 0-based; confirm against Request::src()/dst() semantics.
123 void remove(MPI_Request req)
125 if (req == MPI_REQUEST_NULL) return;
127 store.erase(req_key_t(req->src()-1, req->dst()-1, req->tag()));
// Store a freshly created request (ignores MPI_REQUEST_NULL).
130 void add(MPI_Request req)
132 if (req != MPI_REQUEST_NULL) // Can and does happen in the case of TestAction
133 store.insert({req_key_t(req->src()-1, req->dst()-1, req->tag()), req});
136 /* Sometimes we need to re-insert MPI_REQUEST_NULL but we still need src,dst and tag */
137 void addNullRequest(int src, int dst, int tag)
139 store.insert({req_key_t(src, dst, tag), MPI_REQUEST_NULL});
// Parse a wait/test trace line: "<rank> wait <src> <dst> <tag>".
// Fills the src/dst/tag members used to locate the matching request.
143 void WaitTestParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
145 CHECK_ACTION_PARAMS(action, 3, 0)
146 src = std::stoi(action[2]);
147 dst = std::stoi(action[3]);
148 tag = std::stoi(action[4]);
// Parse a send/recv trace line: "<rank> send <partner> <tag> <size> [datatype]".
// The datatype token is optional; MPI_DEFAULT_TYPE is used otherwise.
151 void SendRecvParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
153 CHECK_ACTION_PARAMS(action, 3, 1)
154 partner = std::stoi(action[2]);
155 tag = std::stoi(action[3]);
156 size = parse_double(action[4]);
157 if (action.size() > 5)
158 datatype1 = simgrid::smpi::Datatype::decode(action[5]);
// Parse a compute trace line: "<rank> compute <flops>".
162 void ComputeParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
164 CHECK_ACTION_PARAMS(action, 1, 0)
165 flops = parse_double(action[2]);
// Parse a bcast trace line: "<rank> bcast <size> [root] [datatype]".
// Root defaults to 0 when absent.
168 void BcastArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
170 CHECK_ACTION_PARAMS(action, 1, 2)
171 size = parse_double(action[2])
172 root = (action.size() > 3) ? std::stoi(action[3]) : 0;
173 if (action.size() > 4)
174 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
// Parse a reduce trace line: "<rank> reduce <comm_size> <comp_size> [root] [datatype]".
177 void ReduceArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
179 CHECK_ACTION_PARAMS(action, 2, 2)
180 comm_size = parse_double(action[2]);
181 comp_size = parse_double(action[3]);
182 root = (action.size() > 4) ? std::stoi(action[4]) : 0;
183 if (action.size() > 5)
184 datatype1 = simgrid::smpi::Datatype::decode(action[5]);
// Parse an allReduce trace line: "<rank> allReduce <comm_size> <comp_size> [datatype]".
187 void AllReduceArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
189 CHECK_ACTION_PARAMS(action, 2, 1)
190 comm_size = parse_double(action[2]);
191 comp_size = parse_double(action[3]);
192 if (action.size() > 4)
193 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
// Parse an allToAll trace line: "<rank> allToAll <send_size> <recv_size> [dt1] [dt2]".
// comm_size comes from the world communicator, not from the trace.
196 void AllToAllArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
198 CHECK_ACTION_PARAMS(action, 2, 1)
199 comm_size = MPI_COMM_WORLD->size();
200 send_size = parse_double(action[2]);
201 recv_size = parse_double(action[3]);
203 if (action.size() > 4)
204 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
205 if (action.size() > 5)
206 datatype2 = simgrid::smpi::Datatype::decode(action[5]);
// Parse a gather/allGather trace line. "gather" has a root (and datatypes at
// indices 5/6); "allGather" has no root, so datatypes start at index 4.
209 void GatherArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
211 /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
214 1) 68 is the sendcounts
215 2) 68 is the recvcounts
216 3) 0 is the root node
217 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
218 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
220 CHECK_ACTION_PARAMS(action, 2, 3)
221 comm_size = MPI_COMM_WORLD->size();
222 send_size = parse_double(action[2]);
223 recv_size = parse_double(action[3]);
225 if (name == "gather") {
226 root = (action.size() > 4) ? std::stoi(action[4]) : 0;
227 if (action.size() > 5)
228 datatype1 = simgrid::smpi::Datatype::decode(action[5]);
229 if (action.size() > 6)
230 datatype2 = simgrid::smpi::Datatype::decode(action[6]);
// allGather branch: no root token, optional datatypes at indices 4 and 5.
233 if (action.size() > 4)
234 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
235 if (action.size() > 5)
236 datatype2 = simgrid::smpi::Datatype::decode(action[5]);
// Parse a gatherV/allGatherV trace line. The recvcounts array (one entry per
// rank) starts at index 3; optional root (gatherV only), datatype and disps
// follow, with indices computed from comm_size.
240 void GatherVArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
242 /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
243 0 gather 68 68 10 10 10 0 0 0
245 1) 68 is the sendcount
246 2) 68 10 10 10 is the recvcounts
247 3) 0 is the root node
248 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
249 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
251 comm_size = MPI_COMM_WORLD->size();
252 CHECK_ACTION_PARAMS(action, comm_size+1, 2)
253 send_size = parse_double(action[2]);
254 disps = std::vector<int>(comm_size, 0);
255 recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
257 if (name == "gatherV") {
258 root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
259 if (action.size() > 4 + comm_size)
260 datatype1 = simgrid::smpi::Datatype::decode(action[4 + comm_size]);
261 if (action.size() > 5 + comm_size)
262 datatype2 = simgrid::smpi::Datatype::decode(action[5 + comm_size]);
// allGatherV branch: no root token; figure out which optional fields are present.
265 int datatype_index = 0;
267 /* The 3 comes from "0 gather <sendcount>", which must always be present.
268 * The + comm_size is the recvcounts array, which must also be present
270 if (action.size() > 3 + comm_size + comm_size) { /* datatype + disp are specified */
271 datatype_index = 3 + comm_size;
272 disp_index = datatype_index + 1;
// NOTE(review): datatype2 is decoded from the same token as datatype1 —
// presumably the trace encodes a single datatype here; confirm.
273 datatype1 = simgrid::smpi::Datatype::decode(action[datatype_index]);
274 datatype2 = simgrid::smpi::Datatype::decode(action[datatype_index]);
275 } else if (action.size() > 3 + comm_size + 2) { /* disps specified; datatype is not specified; use the default one */
276 disp_index = 3 + comm_size;
277 } else if (action.size() > 3 + comm_size) { /* only datatype, no disp specified */
278 datatype_index = 3 + comm_size;
279 datatype1 = simgrid::smpi::Datatype::decode(action[datatype_index]);
280 datatype2 = simgrid::smpi::Datatype::decode(action[datatype_index]);
// Read the per-rank displacements when present.
283 if (disp_index != 0) {
284 for (unsigned int i = 0; i < comm_size; i++)
285 disps[i] = std::stoi(action[disp_index + i]);
// Fill recvcounts from the trace and accumulate the total receive size.
289 for (unsigned int i = 0; i < comm_size; i++) {
290 (*recvcounts)[i] = std::stoi(action[i + 3]);
292 recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
// Parse a scatter trace line: "<rank> scatter <send_size> <recv_size> [root] [dt1] [dt2]".
295 void ScatterArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
297 /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
300 1) 68 is the sendcounts
301 2) 68 is the recvcounts
302 3) 0 is the root node
303 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
304 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
306 CHECK_ACTION_PARAMS(action, 2, 3)
307 comm_size = MPI_COMM_WORLD->size();
308 send_size = parse_double(action[2]);
309 recv_size = parse_double(action[3]);
310 root = (action.size() > 4) ? std::stoi(action[4]) : 0;
311 if (action.size() > 5)
312 datatype1 = simgrid::smpi::Datatype::decode(action[5]);
313 if (action.size() > 6)
314 datatype2 = simgrid::smpi::Datatype::decode(action[6]);
317 void ScatterVArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
319 /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
320 0 gather 68 10 10 10 68 0 0 0
322 1) 68 10 10 10 is the sendcounts
323 2) 68 is the recvcount
324 3) 0 is the root node
325 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
326 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
328 CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
329 recv_size = parse_double(action[2 + comm_size]);
330 disps = std::vector<int>(comm_size, 0);
331 sendcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
333 if (action.size() > 5 + comm_size)
334 datatype1 = simgrid::smpi::Datatype::decode(action[4 + comm_size]);
335 if (action.size() > 5 + comm_size)
336 datatype2 = simgrid::smpi::Datatype::decode(action[5]);
338 for (unsigned int i = 0; i < comm_size; i++) {
339 (*sendcounts)[i] = std::stoi(action[i + 2]);
341 send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
342 root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
345 void ReduceScatterArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
347 /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
348 0 reduceScatter 275427 275427 275427 204020 11346849 0
350 1) The first four values after the name of the action declare the recvcounts array
351 2) The value 11346849 is the amount of instructions
352 3) The last value corresponds to the datatype, see simgrid::smpi::Datatype::decode().
354 comm_size = MPI_COMM_WORLD->size();
355 CHECK_ACTION_PARAMS(action, comm_size+1, 1)
356 comp_size = parse_double(action[2+comm_size]);
357 recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
358 if (action.size() > 3 + comm_size)
359 datatype1 = simgrid::smpi::Datatype::decode(action[3 + comm_size]);
361 for (unsigned int i = 0; i < comm_size; i++) {
362 recvcounts->push_back(std::stoi(action[i + 2]));
364 recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
367 void AllToAllVArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
369 /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
370 0 allToAllV 100 1 7 10 12 100 1 70 10 5
372 1) 100 is the size of the send buffer *sizeof(int),
373 2) 1 7 10 12 is the sendcounts array
374 3) 100*sizeof(int) is the size of the receiver buffer
375 4) 1 70 10 5 is the recvcounts array
377 comm_size = MPI_COMM_WORLD->size();
378 CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
379 sendcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
380 recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
381 senddisps = std::vector<int>(comm_size, 0);
382 recvdisps = std::vector<int>(comm_size, 0);
384 if (action.size() > 5 + 2 * comm_size)
385 datatype1 = simgrid::smpi::Datatype::decode(action[4 + 2 * comm_size]);
386 if (action.size() > 5 + 2 * comm_size)
387 datatype2 = simgrid::smpi::Datatype::decode(action[5 + 2 * comm_size]);
389 send_buf_size=parse_double(action[2]);
390 recv_buf_size=parse_double(action[3+comm_size]);
391 for (unsigned int i = 0; i < comm_size; i++) {
392 (*sendcounts)[i] = std::stoi(action[3 + i]);
393 (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
395 send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
396 recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
// Template driver for every replay action: record the simulated start time,
// parse the trace arguments, run the subclass kernel, then log the duration.
400 void ReplayAction<T>::execute(simgrid::xbt::ReplayAction& action)
402 // Needs to be re-initialized for every action, hence here
403 double start_time = smpi_process()->simulated_elapsed();
404 args.parse(action, name);
407 log_timed_action(action, start_time);
// Replays an MPI_Wait: locate the request by (src, dst, tag), remove it from
// storage, and wait on it (unless a prior test already completed it).
410 class WaitAction : public ReplayAction<WaitTestParser> {
412 RequestStorage& req_storage;
415 explicit WaitAction(RequestStorage& storage) : ReplayAction("Wait"), req_storage(storage) {}
416 void kernel(simgrid::xbt::ReplayAction& action) override
418 std::string s = boost::algorithm::join(action, " ");
419 xbt_assert(req_storage.size(), "action wait not preceded by any irecv or isend: %s", s.c_str());
420 MPI_Request request = req_storage.find(args.src, args.dst, args.tag);
421 req_storage.remove(request);
423 if (request == MPI_REQUEST_NULL) {
424 /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just
429 int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
431 // Must be taken before Request::wait() since the request may be set to
432 // MPI_REQUEST_NULL by Request::wait!
433 bool is_wait_for_receive = (request->flags() & RECV);
434 // TODO: Here we take the rank while we normally take the process id (look for my_proc_id)
435 TRACE_smpi_comm_in(rank, __func__, new simgrid::instr::NoOpTIData("wait"));
438 Request::wait(&request, &status);
440 TRACE_smpi_comm_out(rank);
// Only receives are traced as message arrivals.
441 if (is_wait_for_receive)
442 TRACE_smpi_recv(args.src, args.dst, args.tag);
// Replays blocking "send" and non-blocking "Isend"; the Isend request is
// stored so a later wait/test trace line can retrieve it.
446 class SendAction : public ReplayAction<SendRecvParser> {
448 RequestStorage& req_storage;
451 explicit SendAction(std::string name, RequestStorage& storage) : ReplayAction(name), req_storage(storage) {}
452 void kernel(simgrid::xbt::ReplayAction& action) override
454 int dst_traced = MPI_COMM_WORLD->group()->actor(args.partner)->get_pid();
456 TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
457 args.tag, Datatype::encode(args.datatype1)));
458 if (not TRACE_smpi_view_internals())
459 TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, args.tag, args.size * args.datatype1->size());
// Payload is nullptr: replay only simulates timing, no real data is moved.
461 if (name == "send") {
462 Request::send(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
463 } else if (name == "Isend") {
464 MPI_Request request = Request::isend(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
465 req_storage.add(request);
467 xbt_die("Don't know this action, %s", name.c_str());
470 TRACE_smpi_comm_out(my_proc_id);
// Replays blocking "recv" and non-blocking "Irecv"; probes first when the
// trace does not record the message size.
474 class RecvAction : public ReplayAction<SendRecvParser> {
476 RequestStorage& req_storage;
479 explicit RecvAction(std::string name, RequestStorage& storage) : ReplayAction(name), req_storage(storage) {}
480 void kernel(simgrid::xbt::ReplayAction& action) override
482 int src_traced = MPI_COMM_WORLD->group()->actor(args.partner)->get_pid();
484 TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
485 args.tag, Datatype::encode(args.datatype1)));
488 // unknown size from the receiver point of view
489 if (args.size <= 0.0) {
490 Request::probe(args.partner, args.tag, MPI_COMM_WORLD, &status);
491 args.size = status.count;
494 if (name == "recv") {
495 Request::recv(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD, &status);
496 } else if (name == "Irecv") {
497 MPI_Request request = Request::irecv(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
498 req_storage.add(request);
501 TRACE_smpi_comm_out(my_proc_id);
502 // TODO: Check why this was only activated in the "recv" case and not in the "Irecv" case
503 if (name == "recv" && not TRACE_smpi_view_internals()) {
504 TRACE_smpi_recv(src_traced, my_proc_id, args.tag);
// Replays a compute phase by burning the traced number of flops in simulation.
509 class ComputeAction : public ReplayAction<ComputeParser> {
511 ComputeAction() : ReplayAction("compute") {}
512 void kernel(simgrid::xbt::ReplayAction& action) override
514 TRACE_smpi_computing_in(my_proc_id, args.flops);
515 smpi_execute_flops(args.flops);
516 TRACE_smpi_computing_out(my_proc_id);
// Replays an MPI_Test: test the stored request and re-insert it (or a null
// placeholder) so a later wait/test on the same (src, dst, tag) still matches.
520 class TestAction : public ReplayAction<WaitTestParser> {
522 RequestStorage& req_storage;
525 explicit TestAction(RequestStorage& storage) : ReplayAction("Test"), req_storage(storage) {}
526 void kernel(simgrid::xbt::ReplayAction& action) override
528 MPI_Request request = req_storage.find(args.src, args.dst, args.tag);
529 req_storage.remove(request);
530 // if request is null here, this may mean that a previous test has succeeded
531 // Different times in traced application and replayed version may lead to this
532 // In this case, ignore the extra calls.
533 if (request != MPI_REQUEST_NULL) {
534 TRACE_smpi_testing_in(my_proc_id);
537 int flag = Request::test(&request, &status);
539 XBT_DEBUG("MPI_Test result: %d", flag);
540 /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now
542 if (request == MPI_REQUEST_NULL)
543 req_storage.addNullRequest(args.src, args.dst, args.tag);
545 req_storage.add(request);
547 TRACE_smpi_testing_out(my_proc_id);
// Replays MPI_Init: chooses the fallback datatype for the whole trace and
// starts this process's simulated clock.
552 class InitAction : public ReplayAction<ActionArgParser> {
554 InitAction() : ReplayAction("Init") {}
555 void kernel(simgrid::xbt::ReplayAction& action) override
557 CHECK_ACTION_PARAMS(action, 0, 1)
558 MPI_DEFAULT_TYPE = (action.size() > 2) ? MPI_DOUBLE // default MPE datatype
559 : MPI_BYTE; // default TAU datatype
561 /* start a simulated timer */
562 smpi_process()->simulated_start();
// No-op placeholder for communicator management actions (comm_size,
// comm_split, comm_dup): replay ignores communicator bookkeeping.
566 class CommunicatorAction : public ReplayAction<ActionArgParser> {
568 CommunicatorAction() : ReplayAction("Comm") {}
569 void kernel(simgrid::xbt::ReplayAction& action) override { /* nothing to do */}
// Replays MPI_Waitall on every request currently stored for this actor, then
// clears the storage.
572 class WaitAllAction : public ReplayAction<ActionArgParser> {
574 RequestStorage& req_storage;
577 explicit WaitAllAction(RequestStorage& storage) : ReplayAction("waitAll"), req_storage(storage) {}
578 void kernel(simgrid::xbt::ReplayAction& action) override
580 const unsigned int count_requests = req_storage.size();
582 if (count_requests > 0) {
583 TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData("waitAll", -1, count_requests, ""));
// Remember (src, dst) of pending receives before waitall may null them out.
584 std::vector<std::pair</*sender*/int,/*recv*/int>> sender_receiver;
585 std::vector<MPI_Request> reqs;
586 req_storage.get_requests(reqs);
587 for (const auto& req : reqs) {
588 if (req && (req->flags() & RECV)) {
589 sender_receiver.push_back({req->src(), req->dst()});
// NOTE(review): runtime-sized array is a compiler extension (VLA), not
// standard C++ — consider std::vector<MPI_Status>.
592 MPI_Status status[count_requests];
593 Request::waitall(count_requests, &(reqs.data())[0], status);
594 req_storage.get_store().clear();
596 for (auto& pair : sender_receiver) {
597 TRACE_smpi_recv(pair.first, pair.second, 0);
599 TRACE_smpi_comm_out(my_proc_id);
// Replays MPI_Barrier on MPI_COMM_WORLD.
604 class BarrierAction : public ReplayAction<ActionArgParser> {
606 BarrierAction() : ReplayAction("barrier") {}
607 void kernel(simgrid::xbt::ReplayAction& action) override
609 TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::NoOpTIData("barrier"));
610 Colls::barrier(MPI_COMM_WORLD);
611 TRACE_smpi_comm_out(my_proc_id);
// Replays MPI_Bcast with a scratch send buffer of the traced size.
615 class BcastAction : public ReplayAction<BcastArgParser> {
617 BcastAction() : ReplayAction("bcast") {}
618 void kernel(simgrid::xbt::ReplayAction& action) override
620 TRACE_smpi_comm_in(my_proc_id, "action_bcast",
621 new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(args.root)->get_pid(),
622 -1.0, args.size, -1, Datatype::encode(args.datatype1), ""));
624 Colls::bcast(send_buffer(args.size * args.datatype1->size()), args.size, args.datatype1, args.root, MPI_COMM_WORLD);
626 TRACE_smpi_comm_out(my_proc_id);
// Replays MPI_Reduce, then burns the traced computation cost of the reduction
// operator (MPI_OP_NULL: the actual operation is not simulated).
630 class ReduceAction : public ReplayAction<ReduceArgParser> {
632 ReduceAction() : ReplayAction("reduce") {}
633 void kernel(simgrid::xbt::ReplayAction& action) override
635 TRACE_smpi_comm_in(my_proc_id, "action_reduce",
636 new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(args.root)->get_pid(),
637 args.comp_size, args.comm_size, -1,
638 Datatype::encode(args.datatype1), ""));
640 Colls::reduce(send_buffer(args.comm_size * args.datatype1->size()),
641 recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL, args.root, MPI_COMM_WORLD);
642 smpi_execute_flops(args.comp_size);
644 TRACE_smpi_comm_out(my_proc_id);
// Replays MPI_Allreduce plus the traced computation cost of the operator.
648 class AllReduceAction : public ReplayAction<AllReduceArgParser> {
650 AllReduceAction() : ReplayAction("allReduce") {}
651 void kernel(simgrid::xbt::ReplayAction& action) override
653 TRACE_smpi_comm_in(my_proc_id, "action_allReduce", new simgrid::instr::CollTIData("allReduce", -1, args.comp_size, args.comm_size, -1,
654 Datatype::encode(args.datatype1), ""));
656 Colls::allreduce(send_buffer(args.comm_size * args.datatype1->size()),
657 recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
658 smpi_execute_flops(args.comp_size);
660 TRACE_smpi_comm_out(my_proc_id);
// Replays MPI_Alltoall with scratch buffers sized from the traced counts.
664 class AllToAllAction : public ReplayAction<AllToAllArgParser> {
666 AllToAllAction() : ReplayAction("allToAll") {}
667 void kernel(simgrid::xbt::ReplayAction& action) override
669 TRACE_smpi_comm_in(my_proc_id, "action_allToAll",
670 new simgrid::instr::CollTIData("allToAll", -1, -1.0, args.send_size, args.recv_size,
671 Datatype::encode(args.datatype1),
672 Datatype::encode(args.datatype2)));
674 Colls::alltoall(send_buffer(args.send_size * args.comm_size * args.datatype1->size()), args.send_size,
675 args.datatype1, recv_buffer(args.recv_size * args.comm_size * args.datatype2->size()),
676 args.recv_size, args.datatype2, MPI_COMM_WORLD);
678 TRACE_smpi_comm_out(my_proc_id);
// Replays both MPI_Gather ("gather") and MPI_Allgather ("allGather"),
// selected by the action name passed at construction.
682 class GatherAction : public ReplayAction<GatherArgParser> {
684 explicit GatherAction(std::string name) : ReplayAction(name) {}
685 void kernel(simgrid::xbt::ReplayAction& action) override
687 TRACE_smpi_comm_in(my_proc_id, name.c_str(), new simgrid::instr::CollTIData(name, (name == "gather") ? args.root : -1, -1.0, args.send_size, args.recv_size,
688 Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
690 if (name == "gather") {
691 int rank = MPI_COMM_WORLD->rank();
// Only the root needs a receive buffer for a plain gather.
692 Colls::gather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
693 (rank == args.root) ? recv_buffer(args.recv_size * args.comm_size * args.datatype2->size()) : nullptr, args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
696 Colls::allgather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
697 recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, MPI_COMM_WORLD);
699 TRACE_smpi_comm_out(my_proc_id);
// Replays both MPI_Gatherv ("gatherV") and MPI_Allgatherv ("allGatherV"),
// selected by the action name passed at construction.
703 class GatherVAction : public ReplayAction<GatherVArgParser> {
705 explicit GatherVAction(std::string name) : ReplayAction(name) {}
706 void kernel(simgrid::xbt::ReplayAction& action) override
708 int rank = MPI_COMM_WORLD->rank();
710 TRACE_smpi_comm_in(my_proc_id, name.c_str(), new simgrid::instr::VarCollTIData(
711 name, (name == "gatherV") ? args.root : -1, args.send_size, nullptr, -1, args.recvcounts,
712 Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
714 if (name == "gatherV") {
// Only the root needs a receive buffer for a plain gatherv.
715 Colls::gatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
716 (rank == args.root) ? recv_buffer(args.recv_size_sum * args.datatype2->size()) : nullptr,
717 args.recvcounts->data(), args.disps.data(), args.datatype2, args.root, MPI_COMM_WORLD);
720 Colls::allgatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
721 recv_buffer(args.recv_size_sum * args.datatype2->size()), args.recvcounts->data(),
722 args.disps.data(), args.datatype2, MPI_COMM_WORLD);
725 TRACE_smpi_comm_out(my_proc_id);
// Replays MPI_Scatter; only the root owns a (send) buffer of full size.
729 class ScatterAction : public ReplayAction<ScatterArgParser> {
731 ScatterAction() : ReplayAction("scatter") {}
732 void kernel(simgrid::xbt::ReplayAction& action) override
734 int rank = MPI_COMM_WORLD->rank();
735 TRACE_smpi_comm_in(my_proc_id, "action_scatter", new simgrid::instr::CollTIData(name, args.root, -1.0, args.send_size, args.recv_size,
736 Datatype::encode(args.datatype1),
737 Datatype::encode(args.datatype2)));
739 Colls::scatter(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
740 (rank == args.root) ? recv_buffer(args.recv_size * args.datatype2->size()) : nullptr, args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
742 TRACE_smpi_comm_out(my_proc_id);
// Replays MPI_Scatterv; only the root provides the variable-sized send buffer.
747 class ScatterVAction : public ReplayAction<ScatterVArgParser> {
749 ScatterVAction() : ReplayAction("scatterV") {}
750 void kernel(simgrid::xbt::ReplayAction& action) override
752 int rank = MPI_COMM_WORLD->rank();
753 TRACE_smpi_comm_in(my_proc_id, "action_scatterv", new simgrid::instr::VarCollTIData(name, args.root, -1, args.sendcounts, args.recv_size,
754 nullptr, Datatype::encode(args.datatype1),
755 Datatype::encode(args.datatype2)));
757 Colls::scatterv((rank == args.root) ? send_buffer(args.send_size_sum * args.datatype1->size()) : nullptr,
758 args.sendcounts->data(), args.disps.data(), args.datatype1,
759 recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, args.root,
762 TRACE_smpi_comm_out(my_proc_id);
// Replays MPI_Reduce_scatter plus the traced computation cost of the operator.
766 class ReduceScatterAction : public ReplayAction<ReduceScatterArgParser> {
768 ReduceScatterAction() : ReplayAction("reduceScatter") {}
769 void kernel(simgrid::xbt::ReplayAction& action) override
771 TRACE_smpi_comm_in(my_proc_id, "action_reducescatter",
772 new simgrid::instr::VarCollTIData("reduceScatter", -1, 0, nullptr, -1, args.recvcounts,
773 std::to_string(args.comp_size), /* ugly hack to print comp_size */
774 Datatype::encode(args.datatype1)));
776 Colls::reduce_scatter(send_buffer(args.recv_size_sum * args.datatype1->size()),
777 recv_buffer(args.recv_size_sum * args.datatype1->size()), args.recvcounts->data(),
778 args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
780 smpi_execute_flops(args.comp_size);
781 TRACE_smpi_comm_out(my_proc_id);
// Replays MPI_Alltoallv with the traced per-rank counts and zeroed displacements.
785 class AllToAllVAction : public ReplayAction<AllToAllVArgParser> {
787 AllToAllVAction() : ReplayAction("allToAllV") {}
788 void kernel(simgrid::xbt::ReplayAction& action) override
790 TRACE_smpi_comm_in(my_proc_id, __func__,
791 new simgrid::instr::VarCollTIData(
792 "allToAllV", -1, args.send_size_sum, args.sendcounts, args.recv_size_sum, args.recvcounts,
793 Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
795 Colls::alltoallv(send_buffer(args.send_buf_size * args.datatype1->size()), args.sendcounts->data(), args.senddisps.data(), args.datatype1,
796 recv_buffer(args.recv_buf_size * args.datatype2->size()), args.recvcounts->data(), args.recvdisps.data(), args.datatype2, MPI_COMM_WORLD);
798 TRACE_smpi_comm_out(my_proc_id);
801 } // Replay Namespace
802 }} // namespace simgrid::smpi
// One RequestStorage per simulated process, indexed by actor pid - 1
// (sized in smpi_replay_init, looked up in the registered action lambdas).
804 std::vector<simgrid::smpi::replay::RequestStorage> storage;
805 /** @brief Only initialize the replay, don't do it for real */
806 void smpi_replay_init(int* argc, char*** argv)
808 simgrid::smpi::Process::init(argc, argv);
809 smpi_process()->mark_as_initialized();
810 smpi_process()->set_replaying(true);
812 int my_proc_id = simgrid::s4u::this_actor::get_pid();
// One request store per simulated process (indexed by pid - 1 below).
813 storage.resize(smpi_process_count());
815 TRACE_smpi_init(my_proc_id);
816 TRACE_smpi_computing_init(my_proc_id);
817 TRACE_smpi_comm_in(my_proc_id, "smpi_replay_run_init", new simgrid::instr::NoOpTIData("init"));
818 TRACE_smpi_comm_out(my_proc_id);
// Map every trace action name to the replay class that simulates it.
819 xbt_replay_action_register("init", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::InitAction().execute(action); });
820 xbt_replay_action_register("finalize", [](simgrid::xbt::ReplayAction& action) { /* nothing to do */ });
821 xbt_replay_action_register("comm_size", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
822 xbt_replay_action_register("comm_split",[](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
823 xbt_replay_action_register("comm_dup", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
824 xbt_replay_action_register("send", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("send", storage[simgrid::s4u::this_actor::get_pid()-1]).execute(action); });
825 xbt_replay_action_register("Isend", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("Isend", storage[simgrid::s4u::this_actor::get_pid()-1]).execute(action); });
826 xbt_replay_action_register("recv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("recv", storage[simgrid::s4u::this_actor::get_pid()-1]).execute(action); });
827 xbt_replay_action_register("Irecv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("Irecv", storage[simgrid::s4u::this_actor::get_pid()-1]).execute(action); });
828 xbt_replay_action_register("test", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::TestAction(storage[simgrid::s4u::this_actor::get_pid()-1]).execute(action); });
829 xbt_replay_action_register("wait", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAction(storage[simgrid::s4u::this_actor::get_pid()-1]).execute(action); });
830 xbt_replay_action_register("waitAll", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAllAction(storage[simgrid::s4u::this_actor::get_pid()-1]).execute(action); });
831 xbt_replay_action_register("barrier", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BarrierAction().execute(action); });
832 xbt_replay_action_register("bcast", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BcastAction().execute(action); });
833 xbt_replay_action_register("reduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceAction().execute(action); });
834 xbt_replay_action_register("allReduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllReduceAction().execute(action); });
835 xbt_replay_action_register("allToAll", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllAction().execute(action); });
836 xbt_replay_action_register("allToAllV", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllVAction().execute(action); });
837 xbt_replay_action_register("gather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("gather").execute(action); });
838 xbt_replay_action_register("scatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterAction().execute(action); });
839 xbt_replay_action_register("gatherV", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("gatherV").execute(action); });
840 xbt_replay_action_register("scatterV", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterVAction().execute(action); });
841 xbt_replay_action_register("allGather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("allGather").execute(action); });
842 xbt_replay_action_register("allGatherV", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("allGatherV").execute(action); });
843 xbt_replay_action_register("reduceScatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceScatterAction().execute(action); });
844 xbt_replay_action_register("compute", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ComputeAction().execute(action); });
846 //if we have a delayed start, sleep here.
848 double value = xbt_str_parse_double((*argv)[2], "%s is not a double");
849 XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
850 smpi_execute_flops(value);
852 //UGLY: force a context switch to be sure that all MSG_processes begin initialization
853 XBT_DEBUG("Force context switch by smpi_execute_flops - Sleeping for 0.0 flops ");
854 smpi_execute_flops(0.0);
858 /** @brief actually run the replay after initialization */
859 void smpi_replay_main(int* argc, char*** argv)
861 static int active_processes = 0;
862 simgrid::xbt::replay_runner(*argc, *argv);
865 /* and now, finalize everything */
866 /* One active process will stop. Decrease the counter*/
// Drain any requests the trace left pending before finalizing this process.
867 unsigned int count_requests = storage[simgrid::s4u::this_actor::get_pid() - 1].size();
868 XBT_DEBUG("There are %ud elements in reqq[*]", count_requests);
869 if (count_requests > 0) {
870 MPI_Request requests[count_requests];
871 MPI_Status status[count_requests];
874 for (auto const& pair : storage[simgrid::s4u::this_actor::get_pid() - 1].get_store()) {
875 requests[i] = pair.second;
878 simgrid::smpi::Request::waitall(count_requests, requests, status);
882 if(active_processes==0){
883 /* Last process alive speaking: end the simulated timer */
884 XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
885 smpi_free_replay_tmp_buffers();
888 TRACE_smpi_comm_in(simgrid::s4u::this_actor::get_pid(), "smpi_replay_run_finalize",
889 new simgrid::instr::NoOpTIData("finalize"));
891 smpi_process()->finalize();
893 TRACE_smpi_comm_out(simgrid::s4u::this_actor::get_pid());
894 TRACE_smpi_finalize(simgrid::s4u::this_actor::get_pid());
897 /** @brief chain a replay initialization and a replay start */
898 void smpi_replay_run(int* argc, char*** argv)
900 smpi_replay_init(argc, argv);
901 smpi_replay_main(argc, argv);