1 /* Copyright (c) 2009-2018. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
7 #include "smpi_coll.hpp"
8 #include "smpi_comm.hpp"
9 #include "smpi_datatype.hpp"
10 #include "smpi_group.hpp"
11 #include "smpi_process.hpp"
12 #include "smpi_request.hpp"
13 #include "xbt/replay.hpp"
14 #include <simgrid/smpi/replay.hpp>
16 #include <boost/algorithm/string/join.hpp>
19 #include <unordered_map>
23 // From https://stackoverflow.com/questions/7110301/generic-hash-for-tuples-in-unordered-map-unordered-set
24 // This is all just to make std::unordered_map work with std::tuple. If we need this in other places,
25 // this could go into a header file.
namespace hash_tuple {
/** Generic hasher: for any type without a dedicated specialization, defer to std::hash. */
template <typename TT> class hash {
public:
  size_t operator()(TT const& tt) const { return std::hash<TT>()(tt); }
};

/** Fold the hash of @p v into the running @p seed. */
template <class T> inline void hash_combine(std::size_t& seed, T const& v)
{
  seed ^= hash_tuple::hash<T>()(v) + 0x9e3779b9 + (seed << 6) + (seed >> 2);
}

// Recursive template code derived from Matthieu M.
/** Combines the elements 0..Index of a tuple into the seed, in increasing index order. */
template <class Tuple, size_t Index = std::tuple_size<Tuple>::value - 1> class HashValueImpl {
public:
  static void apply(size_t& seed, Tuple const& tuple)
  {
    HashValueImpl<Tuple, Index - 1>::apply(seed, tuple);
    hash_combine(seed, std::get<Index>(tuple));
  }
};

/** End of the recursion: only element 0 is left to combine. */
template <class Tuple> class HashValueImpl<Tuple, 0> {
public:
  static void apply(size_t& seed, Tuple const& tuple) { hash_combine(seed, std::get<0>(tuple)); }
};

/** Hasher for non-empty tuples: starts from a zero seed and combines every element. */
template <typename... TT> class hash<std::tuple<TT...>> {
public:
  size_t operator()(std::tuple<TT...> const& tt) const
  {
    size_t seed = 0;
    HashValueImpl<std::tuple<TT...>>::apply(seed, tt);
    return seed;
  }
};

/** Hasher for the empty tuple. Without this specialization the default Index of HashValueImpl
 *  (tuple_size - 1) would wrap around and the generic version would not compile. */
template <> class hash<std::tuple<>> {
public:
  size_t operator()(std::tuple<> const&) const { return 0; }
};
}
63 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
65 typedef std::tuple</*sender*/ int, /* reciever */ int, /* tag */int> req_key_t;
66 typedef std::unordered_map<req_key_t, MPI_Request, hash_tuple::hash<std::tuple<int,int,int>>> req_storage_t;
69 static void log_timed_action(simgrid::xbt::ReplayAction& action, double clock)
71 if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
72 std::string s = boost::algorithm::join(action, " ");
73 XBT_VERB("%s %f", s.c_str(), smpi_process()->simulated_elapsed() - clock);
78 static double parse_double(std::string string)
80 return xbt_str_parse_double(string.c_str(), "%s is not a double");
/* Datatype used by the replay when none is given; assigned by InitAction::kernel (MPI_DOUBLE or MPI_BYTE). */
MPI_Datatype MPI_DEFAULT_TYPE;
89 class RequestStorage {
100 req_storage_t& get_store()
105 void get_requests(std::vector<MPI_Request>& vec)
107 for (auto& pair : store) {
108 auto& req = pair.second;
109 auto my_proc_id = simgrid::s4u::this_actor::get_pid();
110 if (req != MPI_REQUEST_NULL && (req->src() == my_proc_id || req->dst() == my_proc_id)) {
111 vec.push_back(pair.second);
112 pair.second->print_request("MM");
117 MPI_Request find(int src, int dst, int tag)
119 req_storage_t::iterator it = store.find(req_key_t(src, dst, tag));
120 return (it == store.end()) ? MPI_REQUEST_NULL : it->second;
123 void remove(MPI_Request req)
125 if (req == MPI_REQUEST_NULL) return;
127 store.erase(req_key_t(req->src()-1, req->dst()-1, req->tag()));
130 void add(MPI_Request req)
132 if (req != MPI_REQUEST_NULL) // Can and does happen in the case of TestAction
133 store.insert({req_key_t(req->src()-1, req->dst()-1, req->tag()), req});
136 /* Sometimes we need to re-insert MPI_REQUEST_NULL but we still need src,dst and tag */
137 void addNullRequest(int src, int dst, int tag)
139 store.insert({req_key_t(src, dst, tag), MPI_REQUEST_NULL});
143 void WaitTestParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
145 CHECK_ACTION_PARAMS(action, 3, 0)
146 src = std::stoi(action[2]);
147 dst = std::stoi(action[3]);
148 tag = std::stoi(action[4]);
151 void SendRecvParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
153 CHECK_ACTION_PARAMS(action, 3, 1)
154 partner = std::stoi(action[2]);
155 tag = std::stoi(action[3]);
156 size = parse_double(action[4]);
157 if (action.size() > 5)
158 datatype1 = simgrid::smpi::Datatype::decode(action[5]);
161 void ComputeParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
163 CHECK_ACTION_PARAMS(action, 1, 0)
164 flops = parse_double(action[2]);
167 void BcastArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
169 CHECK_ACTION_PARAMS(action, 1, 2)
170 size = parse_double(action[2]);
171 root = (action.size() > 3) ? std::stoi(action[3]) : 0;
172 if (action.size() > 4)
173 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
176 void ReduceArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
178 CHECK_ACTION_PARAMS(action, 2, 2)
179 comm_size = parse_double(action[2]);
180 comp_size = parse_double(action[3]);
181 root = (action.size() > 4) ? std::stoi(action[4]) : 0;
182 if (action.size() > 5)
183 datatype1 = simgrid::smpi::Datatype::decode(action[5]);
186 void AllReduceArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
188 CHECK_ACTION_PARAMS(action, 2, 1)
189 comm_size = parse_double(action[2]);
190 comp_size = parse_double(action[3]);
191 if (action.size() > 4)
192 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
195 void AllToAllArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
197 CHECK_ACTION_PARAMS(action, 2, 1)
198 comm_size = MPI_COMM_WORLD->size();
199 send_size = parse_double(action[2]);
200 recv_size = parse_double(action[3]);
202 if (action.size() > 4)
203 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
204 if (action.size() > 5)
205 datatype2 = simgrid::smpi::Datatype::decode(action[5]);
208 void GatherArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
210 /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
213 1) 68 is the sendcounts
214 2) 68 is the recvcounts
215 3) 0 is the root node
216 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
217 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
219 CHECK_ACTION_PARAMS(action, 2, 3)
220 comm_size = MPI_COMM_WORLD->size();
221 send_size = parse_double(action[2]);
222 recv_size = parse_double(action[3]);
224 if (name == "gather") {
225 root = (action.size() > 4) ? std::stoi(action[4]) : 0;
226 if (action.size() > 5)
227 datatype1 = simgrid::smpi::Datatype::decode(action[5]);
228 if (action.size() > 6)
229 datatype2 = simgrid::smpi::Datatype::decode(action[6]);
231 if (action.size() > 4)
232 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
233 if (action.size() > 5)
234 datatype2 = simgrid::smpi::Datatype::decode(action[5]);
238 void GatherVArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
240 /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
241 0 gather 68 68 10 10 10 0 0 0
243 1) 68 is the sendcount
244 2) 68 10 10 10 is the recvcounts
245 3) 0 is the root node
246 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
247 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
249 comm_size = MPI_COMM_WORLD->size();
250 CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
251 send_size = parse_double(action[2]);
252 disps = std::vector<int>(comm_size, 0);
253 recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
255 if (name == "gatherV") {
256 root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
257 if (action.size() > 4 + comm_size)
258 datatype1 = simgrid::smpi::Datatype::decode(action[4 + comm_size]);
259 if (action.size() > 5 + comm_size)
260 datatype2 = simgrid::smpi::Datatype::decode(action[5 + comm_size]);
262 int datatype_index = 0;
264 /* The 3 comes from "0 gather <sendcount>", which must always be present.
265 * The + comm_size is the recvcounts array, which must also be present
267 if (action.size() > 3 + comm_size + comm_size) { /* datatype + disp are specified */
268 datatype_index = 3 + comm_size;
269 disp_index = datatype_index + 1;
270 datatype1 = simgrid::smpi::Datatype::decode(action[datatype_index]);
271 datatype2 = simgrid::smpi::Datatype::decode(action[datatype_index]);
272 } else if (action.size() >
273 3 + comm_size + 2) { /* disps specified; datatype is not specified; use the default one */
274 disp_index = 3 + comm_size;
275 } else if (action.size() > 3 + comm_size) { /* only datatype, no disp specified */
276 datatype_index = 3 + comm_size;
277 datatype1 = simgrid::smpi::Datatype::decode(action[datatype_index]);
278 datatype2 = simgrid::smpi::Datatype::decode(action[datatype_index]);
281 if (disp_index != 0) {
282 for (unsigned int i = 0; i < comm_size; i++)
283 disps[i] = std::stoi(action[disp_index + i]);
287 for (unsigned int i = 0; i < comm_size; i++) {
288 (*recvcounts)[i] = std::stoi(action[i + 3]);
290 recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
293 void ScatterArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
295 /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
298 1) 68 is the sendcounts
299 2) 68 is the recvcounts
300 3) 0 is the root node
301 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
302 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
304 CHECK_ACTION_PARAMS(action, 2, 3)
305 comm_size = MPI_COMM_WORLD->size();
306 send_size = parse_double(action[2]);
307 recv_size = parse_double(action[3]);
308 root = (action.size() > 4) ? std::stoi(action[4]) : 0;
309 if (action.size() > 5)
310 datatype1 = simgrid::smpi::Datatype::decode(action[5]);
311 if (action.size() > 6)
312 datatype2 = simgrid::smpi::Datatype::decode(action[6]);
315 void ScatterVArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
317 /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
318 0 gather 68 10 10 10 68 0 0 0
320 1) 68 10 10 10 is the sendcounts
321 2) 68 is the recvcount
322 3) 0 is the root node
323 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
324 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
326 CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
327 recv_size = parse_double(action[2 + comm_size]);
328 disps = std::vector<int>(comm_size, 0);
329 sendcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
331 if (action.size() > 5 + comm_size)
332 datatype1 = simgrid::smpi::Datatype::decode(action[4 + comm_size]);
333 if (action.size() > 5 + comm_size)
334 datatype2 = simgrid::smpi::Datatype::decode(action[5]);
336 for (unsigned int i = 0; i < comm_size; i++) {
337 (*sendcounts)[i] = std::stoi(action[i + 2]);
339 send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
340 root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
343 void ReduceScatterArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
345 /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
346 0 reduceScatter 275427 275427 275427 204020 11346849 0
348 1) The first four values after the name of the action declare the recvcounts array
349 2) The value 11346849 is the amount of instructions
350 3) The last value corresponds to the datatype, see simgrid::smpi::Datatype::decode().
352 comm_size = MPI_COMM_WORLD->size();
353 CHECK_ACTION_PARAMS(action, comm_size + 1, 1)
354 comp_size = parse_double(action[2 + comm_size]);
355 recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
356 if (action.size() > 3 + comm_size)
357 datatype1 = simgrid::smpi::Datatype::decode(action[3 + comm_size]);
359 for (unsigned int i = 0; i < comm_size; i++) {
360 recvcounts->push_back(std::stoi(action[i + 2]));
362 recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
365 void AllToAllVArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
367 /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
368 0 allToAllV 100 1 7 10 12 100 1 70 10 5
370 1) 100 is the size of the send buffer *sizeof(int),
371 2) 1 7 10 12 is the sendcounts array
372 3) 100*sizeof(int) is the size of the receiver buffer
373 4) 1 70 10 5 is the recvcounts array
375 comm_size = MPI_COMM_WORLD->size();
376 CHECK_ACTION_PARAMS(action, 2 * comm_size + 2, 2)
377 sendcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
378 recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
379 senddisps = std::vector<int>(comm_size, 0);
380 recvdisps = std::vector<int>(comm_size, 0);
382 if (action.size() > 5 + 2 * comm_size)
383 datatype1 = simgrid::smpi::Datatype::decode(action[4 + 2 * comm_size]);
384 if (action.size() > 5 + 2 * comm_size)
385 datatype2 = simgrid::smpi::Datatype::decode(action[5 + 2 * comm_size]);
387 send_buf_size = parse_double(action[2]);
388 recv_buf_size = parse_double(action[3 + comm_size]);
389 for (unsigned int i = 0; i < comm_size; i++) {
390 (*sendcounts)[i] = std::stoi(action[3 + i]);
391 (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
393 send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
394 recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
398 void ReplayAction<T>::execute(simgrid::xbt::ReplayAction& action)
400 // Needs to be re-initialized for every action, hence here
401 double start_time = smpi_process()->simulated_elapsed();
402 args.parse(action, name);
405 log_timed_action(action, start_time);
408 void WaitAction::kernel(simgrid::xbt::ReplayAction& action)
410 std::string s = boost::algorithm::join(action, " ");
411 xbt_assert(req_storage.size(), "action wait not preceded by any irecv or isend: %s", s.c_str());
412 MPI_Request request = req_storage.find(args.src, args.dst, args.tag);
413 req_storage.remove(request);
415 if (request == MPI_REQUEST_NULL) {
416 /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just
421 int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
423 // Must be taken before Request::wait() since the request may be set to
424 // MPI_REQUEST_NULL by Request::wait!
425 bool is_wait_for_receive = (request->flags() & MPI_REQ_RECV);
426 // TODO: Here we take the rank while we normally take the process id (look for my_proc_id)
427 TRACE_smpi_comm_in(rank, __func__, new simgrid::instr::NoOpTIData("wait"));
430 Request::wait(&request, &status);
432 TRACE_smpi_comm_out(rank);
433 if (is_wait_for_receive)
434 TRACE_smpi_recv(args.src, args.dst, args.tag);
437 void SendAction::kernel(simgrid::xbt::ReplayAction& action)
439 int dst_traced = MPI_COMM_WORLD->group()->actor(args.partner)->get_pid();
441 TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
442 args.tag, Datatype::encode(args.datatype1)));
443 if (not TRACE_smpi_view_internals())
444 TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, args.tag, args.size * args.datatype1->size());
446 if (name == "send") {
447 Request::send(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
448 } else if (name == "Isend") {
449 MPI_Request request = Request::isend(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
450 req_storage.add(request);
452 xbt_die("Don't know this action, %s", name.c_str());
455 TRACE_smpi_comm_out(my_proc_id);
458 void RecvAction::kernel(simgrid::xbt::ReplayAction& action)
460 int src_traced = MPI_COMM_WORLD->group()->actor(args.partner)->get_pid();
462 TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
463 args.tag, Datatype::encode(args.datatype1)));
466 // unknown size from the receiver point of view
467 if (args.size <= 0.0) {
468 Request::probe(args.partner, args.tag, MPI_COMM_WORLD, &status);
469 args.size = status.count;
472 if (name == "recv") {
473 Request::recv(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD, &status);
474 } else if (name == "Irecv") {
475 MPI_Request request = Request::irecv(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
476 req_storage.add(request);
479 TRACE_smpi_comm_out(my_proc_id);
480 // TODO: Check why this was only activated in the "recv" case and not in the "Irecv" case
481 if (name == "recv" && not TRACE_smpi_view_internals()) {
482 TRACE_smpi_recv(src_traced, my_proc_id, args.tag);
486 void ComputeAction::kernel(simgrid::xbt::ReplayAction& action)
488 TRACE_smpi_computing_in(my_proc_id, args.flops);
489 smpi_execute_flops(args.flops);
490 TRACE_smpi_computing_out(my_proc_id);
493 void TestAction::kernel(simgrid::xbt::ReplayAction& action)
495 MPI_Request request = req_storage.find(args.src, args.dst, args.tag);
496 req_storage.remove(request);
497 // if request is null here, this may mean that a previous test has succeeded
498 // Different times in traced application and replayed version may lead to this
499 // In this case, ignore the extra calls.
500 if (request != MPI_REQUEST_NULL) {
501 TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::NoOpTIData("test"));
504 int flag = Request::test(&request, &status);
506 XBT_DEBUG("MPI_Test result: %d", flag);
507 /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now
509 if (request == MPI_REQUEST_NULL)
510 req_storage.addNullRequest(args.src, args.dst, args.tag);
512 req_storage.add(request);
514 TRACE_smpi_comm_out(my_proc_id);
518 void InitAction::kernel(simgrid::xbt::ReplayAction& action)
520 CHECK_ACTION_PARAMS(action, 0, 1)
521 MPI_DEFAULT_TYPE = (action.size() > 2) ? MPI_DOUBLE // default MPE datatype
522 : MPI_BYTE; // default TAU datatype
524 /* start a simulated timer */
525 smpi_process()->simulated_start();
/** Kernel of the communicator-related actions (registered below for comm_size, comm_split and comm_dup). */
void CommunicatorAction::kernel(simgrid::xbt::ReplayAction& action)
{
  /* nothing to do */
}
533 void WaitAllAction::kernel(simgrid::xbt::ReplayAction& action)
535 const unsigned int count_requests = req_storage.size();
537 if (count_requests > 0) {
538 TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData("waitAll", -1, count_requests, ""));
539 std::vector<std::pair</*sender*/int,/*recv*/int>> sender_receiver;
540 std::vector<MPI_Request> reqs;
541 req_storage.get_requests(reqs);
542 for (const auto& req : reqs) {
543 if (req && (req->flags() & MPI_REQ_RECV)) {
544 sender_receiver.push_back({req->src(), req->dst()});
547 MPI_Status status[count_requests];
548 Request::waitall(count_requests, &(reqs.data())[0], status);
549 req_storage.get_store().clear();
551 for (auto& pair : sender_receiver) {
552 TRACE_smpi_recv(pair.first, pair.second, 0);
554 TRACE_smpi_comm_out(my_proc_id);
558 void BarrierAction::kernel(simgrid::xbt::ReplayAction& action)
560 TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::NoOpTIData("barrier"));
561 Colls::barrier(MPI_COMM_WORLD);
562 TRACE_smpi_comm_out(my_proc_id);
565 void BcastAction::kernel(simgrid::xbt::ReplayAction& action)
567 TRACE_smpi_comm_in(my_proc_id, "action_bcast",
568 new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(args.root)->get_pid(),
569 -1.0, args.size, -1, Datatype::encode(args.datatype1), ""));
571 Colls::bcast(send_buffer(args.size * args.datatype1->size()), args.size, args.datatype1, args.root, MPI_COMM_WORLD);
573 TRACE_smpi_comm_out(my_proc_id);
576 void ReduceAction::kernel(simgrid::xbt::ReplayAction& action)
578 TRACE_smpi_comm_in(my_proc_id, "action_reduce",
579 new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(args.root)->get_pid(),
580 args.comp_size, args.comm_size, -1,
581 Datatype::encode(args.datatype1), ""));
583 Colls::reduce(send_buffer(args.comm_size * args.datatype1->size()),
584 recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL, args.root, MPI_COMM_WORLD);
585 smpi_execute_flops(args.comp_size);
587 TRACE_smpi_comm_out(my_proc_id);
590 void AllReduceAction::kernel(simgrid::xbt::ReplayAction& action)
592 TRACE_smpi_comm_in(my_proc_id, "action_allReduce", new simgrid::instr::CollTIData("allReduce", -1, args.comp_size, args.comm_size, -1,
593 Datatype::encode(args.datatype1), ""));
595 Colls::allreduce(send_buffer(args.comm_size * args.datatype1->size()),
596 recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
597 smpi_execute_flops(args.comp_size);
599 TRACE_smpi_comm_out(my_proc_id);
602 void AllToAllAction::kernel(simgrid::xbt::ReplayAction& action)
604 TRACE_smpi_comm_in(my_proc_id, "action_allToAll",
605 new simgrid::instr::CollTIData("allToAll", -1, -1.0, args.send_size, args.recv_size,
606 Datatype::encode(args.datatype1),
607 Datatype::encode(args.datatype2)));
609 Colls::alltoall(send_buffer(args.send_size * args.comm_size * args.datatype1->size()), args.send_size,
610 args.datatype1, recv_buffer(args.recv_size * args.comm_size * args.datatype2->size()),
611 args.recv_size, args.datatype2, MPI_COMM_WORLD);
613 TRACE_smpi_comm_out(my_proc_id);
616 void GatherAction::kernel(simgrid::xbt::ReplayAction& action)
618 TRACE_smpi_comm_in(my_proc_id, name.c_str(), new simgrid::instr::CollTIData(name, (name == "gather") ? args.root : -1, -1.0, args.send_size, args.recv_size,
619 Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
621 if (name == "gather") {
622 int rank = MPI_COMM_WORLD->rank();
623 Colls::gather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
624 (rank == args.root) ? recv_buffer(args.recv_size * args.comm_size * args.datatype2->size()) : nullptr, args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
627 Colls::allgather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
628 recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, MPI_COMM_WORLD);
630 TRACE_smpi_comm_out(my_proc_id);
633 void GatherVAction::kernel(simgrid::xbt::ReplayAction& action)
635 int rank = MPI_COMM_WORLD->rank();
637 TRACE_smpi_comm_in(my_proc_id, name.c_str(), new simgrid::instr::VarCollTIData(
638 name, (name == "gatherV") ? args.root : -1, args.send_size, nullptr, -1, args.recvcounts,
639 Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
641 if (name == "gatherV") {
642 Colls::gatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
643 (rank == args.root) ? recv_buffer(args.recv_size_sum * args.datatype2->size()) : nullptr,
644 args.recvcounts->data(), args.disps.data(), args.datatype2, args.root, MPI_COMM_WORLD);
647 Colls::allgatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
648 recv_buffer(args.recv_size_sum * args.datatype2->size()), args.recvcounts->data(),
649 args.disps.data(), args.datatype2, MPI_COMM_WORLD);
652 TRACE_smpi_comm_out(my_proc_id);
655 void ScatterAction::kernel(simgrid::xbt::ReplayAction& action)
657 int rank = MPI_COMM_WORLD->rank();
658 TRACE_smpi_comm_in(my_proc_id, "action_scatter", new simgrid::instr::CollTIData(name, args.root, -1.0, args.send_size, args.recv_size,
659 Datatype::encode(args.datatype1),
660 Datatype::encode(args.datatype2)));
662 Colls::scatter(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
663 (rank == args.root) ? recv_buffer(args.recv_size * args.datatype2->size()) : nullptr, args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
665 TRACE_smpi_comm_out(my_proc_id);
668 void ScatterVAction::kernel(simgrid::xbt::ReplayAction& action)
670 int rank = MPI_COMM_WORLD->rank();
671 TRACE_smpi_comm_in(my_proc_id, "action_scatterv", new simgrid::instr::VarCollTIData(name, args.root, -1, args.sendcounts, args.recv_size,
672 nullptr, Datatype::encode(args.datatype1),
673 Datatype::encode(args.datatype2)));
675 Colls::scatterv((rank == args.root) ? send_buffer(args.send_size_sum * args.datatype1->size()) : nullptr,
676 args.sendcounts->data(), args.disps.data(), args.datatype1,
677 recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, args.root,
680 TRACE_smpi_comm_out(my_proc_id);
683 void ReduceScatterAction::kernel(simgrid::xbt::ReplayAction& action)
685 TRACE_smpi_comm_in(my_proc_id, "action_reducescatter",
686 new simgrid::instr::VarCollTIData("reduceScatter", -1, 0, nullptr, -1, args.recvcounts,
687 std::to_string(args.comp_size), /* ugly hack to print comp_size */
688 Datatype::encode(args.datatype1)));
690 Colls::reduce_scatter(send_buffer(args.recv_size_sum * args.datatype1->size()),
691 recv_buffer(args.recv_size_sum * args.datatype1->size()), args.recvcounts->data(),
692 args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
694 smpi_execute_flops(args.comp_size);
695 TRACE_smpi_comm_out(my_proc_id);
698 void AllToAllVAction::kernel(simgrid::xbt::ReplayAction& action)
700 TRACE_smpi_comm_in(my_proc_id, __func__,
701 new simgrid::instr::VarCollTIData(
702 "allToAllV", -1, args.send_size_sum, args.sendcounts, args.recv_size_sum, args.recvcounts,
703 Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
705 Colls::alltoallv(send_buffer(args.send_buf_size * args.datatype1->size()), args.sendcounts->data(), args.senddisps.data(), args.datatype1,
706 recv_buffer(args.recv_buf_size * args.datatype2->size()), args.recvcounts->data(), args.recvdisps.data(), args.datatype2, MPI_COMM_WORLD);
708 TRACE_smpi_comm_out(my_proc_id);
710 } // Replay Namespace
711 }} // namespace simgrid::smpi
/* One RequestStorage per process; indexed with (pid - 1) by the actions registered in smpi_replay_init(). */
std::vector<simgrid::smpi::replay::RequestStorage> storage;
/** @brief Only initialize the replay, don't do it for real
 *
 * Initializes the SMPI process in replaying mode, sizes the per-process request storage, emits the "init"
 * trace events and registers one handler per supported trace action. If more than two command-line
 * arguments are given, (*argv)[2] is parsed as a number of flops to execute before starting.
 */
void smpi_replay_init(int* argc, char*** argv)
{
  simgrid::smpi::Process::init(argc, argv);
  smpi_process()->mark_as_initialized();
  smpi_process()->set_replaying(true);

  int my_proc_id = simgrid::s4u::this_actor::get_pid();
  storage.resize(smpi_process_count());

  TRACE_smpi_init(my_proc_id);
  TRACE_smpi_computing_init(my_proc_id);
  TRACE_smpi_comm_in(my_proc_id, "smpi_replay_run_init", new simgrid::instr::NoOpTIData("init"));
  TRACE_smpi_comm_out(my_proc_id);
  // Each handler builds a fresh action object per trace line; the point-to-point ones get the request
  // storage of the calling process (index pid - 1).
  xbt_replay_action_register("init", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::InitAction().execute(action); });
  xbt_replay_action_register("finalize", [](simgrid::xbt::ReplayAction& action) { /* nothing to do */ });
  xbt_replay_action_register("comm_size", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
  xbt_replay_action_register("comm_split",[](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
  xbt_replay_action_register("comm_dup", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
  xbt_replay_action_register("send", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("send", storage[simgrid::s4u::this_actor::get_pid()-1]).execute(action); });
  xbt_replay_action_register("Isend", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("Isend", storage[simgrid::s4u::this_actor::get_pid()-1]).execute(action); });
  xbt_replay_action_register("recv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("recv", storage[simgrid::s4u::this_actor::get_pid()-1]).execute(action); });
  xbt_replay_action_register("Irecv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("Irecv", storage[simgrid::s4u::this_actor::get_pid()-1]).execute(action); });
  xbt_replay_action_register("test", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::TestAction(storage[simgrid::s4u::this_actor::get_pid()-1]).execute(action); });
  xbt_replay_action_register("wait", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAction(storage[simgrid::s4u::this_actor::get_pid()-1]).execute(action); });
  xbt_replay_action_register("waitAll", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAllAction(storage[simgrid::s4u::this_actor::get_pid()-1]).execute(action); });
  xbt_replay_action_register("barrier", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BarrierAction().execute(action); });
  xbt_replay_action_register("bcast", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BcastAction().execute(action); });
  xbt_replay_action_register("reduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceAction().execute(action); });
  xbt_replay_action_register("allReduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllReduceAction().execute(action); });
  xbt_replay_action_register("allToAll", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllAction().execute(action); });
  xbt_replay_action_register("allToAllV", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllVAction().execute(action); });
  xbt_replay_action_register("gather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("gather").execute(action); });
  xbt_replay_action_register("scatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterAction().execute(action); });
  xbt_replay_action_register("gatherV", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("gatherV").execute(action); });
  xbt_replay_action_register("scatterV", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterVAction().execute(action); });
  xbt_replay_action_register("allGather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("allGather").execute(action); });
  xbt_replay_action_register("allGatherV", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("allGatherV").execute(action); });
  xbt_replay_action_register("reduceScatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceScatterAction().execute(action); });
  xbt_replay_action_register("compute", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ComputeAction().execute(action); });

  //if we have a delayed start, sleep here.
  if(*argc>2){
    double value = xbt_str_parse_double((*argv)[2], "%s is not a double");
    XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
    smpi_execute_flops(value);
  } else {
    //UGLY: force a context switch to be sure that all MSG_processes begin initialization
    XBT_DEBUG("Force context switch by smpi_execute_flops - Sleeping for 0.0 flops ");
    smpi_execute_flops(0.0);
  }
}
767 /** @brief actually run the replay after initialization */
768 void smpi_replay_main(int* argc, char*** argv)
770 static int active_processes = 0;
772 simgrid::xbt::replay_runner(*argc, *argv);
774 /* and now, finalize everything */
775 /* One active process will stop. Decrease the counter*/
776 unsigned int count_requests = storage[simgrid::s4u::this_actor::get_pid() - 1].size();
777 XBT_DEBUG("There are %ud elements in reqq[*]", count_requests);
778 if (count_requests > 0) {
779 MPI_Request requests[count_requests];
780 MPI_Status status[count_requests];
783 for (auto const& pair : storage[simgrid::s4u::this_actor::get_pid() - 1].get_store()) {
784 requests[i] = pair.second;
787 simgrid::smpi::Request::waitall(count_requests, requests, status);
791 if(active_processes==0){
792 /* Last process alive speaking: end the simulated timer */
793 XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
794 smpi_free_replay_tmp_buffers();
797 TRACE_smpi_comm_in(simgrid::s4u::this_actor::get_pid(), "smpi_replay_run_finalize",
798 new simgrid::instr::NoOpTIData("finalize"));
800 smpi_process()->finalize();
802 TRACE_smpi_comm_out(simgrid::s4u::this_actor::get_pid());
803 TRACE_smpi_finalize(simgrid::s4u::this_actor::get_pid());
/** @brief chain a replay initialization and a replay start
 *
 * Calls smpi_replay_init() then smpi_replay_main() with the same arguments.
 */
void smpi_replay_run(int* argc, char*** argv)
{
  smpi_replay_init(argc, argv);
  smpi_replay_main(argc, argv);
}