1 /* Copyright (c) 2009-2018. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
6 #include "smpi_coll.hpp"
7 #include "smpi_comm.hpp"
8 #include "smpi_datatype.hpp"
9 #include "smpi_group.hpp"
10 #include "smpi_request.hpp"
11 #include "xbt/replay.hpp"
12 #include <simgrid/smpi/replay.hpp>
13 #include <src/smpi/include/private.hpp>
17 #include <unordered_map>
22 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay, smpi, "Trace Replay with SMPI");
// From https://stackoverflow.com/questions/7110301/generic-hash-for-tuples-in-unordered-map-unordered-set
// Everything below only exists so that std::tuple can be used as a std::unordered_map key. Should another
// file need it, it could move to a header.
namespace hash_tuple {
/** Hash functor for non-tuple types: simply forwards to std::hash. */
template <typename TT> class hash {
public:
  size_t operator()(TT const& tt) const { return std::hash<TT>()(tt); }
};

/** Mixes the hash of @p v into @p seed (the classic boost::hash_combine recipe). */
template <class T> inline void hash_combine(std::size_t& seed, T const& v)
{
  const std::size_t mixed = hash_tuple::hash<T>()(v) + 0x9e3779b9 + (seed << 6) + (seed >> 2);
  seed ^= mixed;
}

/** Walks the tuple from element 0 upwards, combining each element's hash into the seed.
 *  The specialization below (Done == true) terminates the walk once every element was visited. */
template <class Tuple, std::size_t Index = 0, bool Done = (Index == std::tuple_size<Tuple>::value)>
class HashValueImpl {
public:
  static void apply(size_t& seed, Tuple const& tuple)
  {
    hash_combine(seed, std::get<Index>(tuple));
    HashValueImpl<Tuple, Index + 1>::apply(seed, tuple);
  }
};

template <class Tuple, std::size_t Index> class HashValueImpl<Tuple, Index, true> {
public:
  static void apply(size_t&, Tuple const&) {}
};

/** Hash functor for std::tuple: folds every element's hash, in order, starting from a zero seed. */
template <typename... TT> class hash<std::tuple<TT...>> {
public:
  size_t operator()(std::tuple<TT...> const& tt) const
  {
    size_t seed = 0;
    HashValueImpl<std::tuple<TT...>>::apply(seed, tt);
    return seed;
  }
};
} // namespace hash_tuple
64 typedef std::tuple</*sender*/ int, /* reciever */ int, /* tag */int> req_key_t;
65 typedef std::unordered_map<req_key_t, MPI_Request, hash_tuple::hash<std::tuple<int,int,int>>> req_storage_t;
67 void log_timed_action(simgrid::xbt::ReplayAction& action, double clock)
69 if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
70 std::string s = boost::algorithm::join(action, " ");
71 XBT_VERB("%s %f", s.c_str(), smpi_process()->simulated_elapsed() - clock);
76 static double parse_double(std::string string)
78 return xbt_str_parse_double(string.c_str(), "%s is not a double");
85 MPI_Datatype MPI_DEFAULT_TYPE;
87 class RequestStorage {
98 req_storage_t& get_store()
103 void get_requests(std::vector<MPI_Request>& vec)
105 for (auto& pair : store) {
106 auto& req = pair.second;
107 auto my_proc_id = simgrid::s4u::this_actor::get_pid();
108 if (req != MPI_REQUEST_NULL && (req->src() == my_proc_id || req->dst() == my_proc_id)) {
109 vec.push_back(pair.second);
110 pair.second->print_request("MM");
115 MPI_Request find(int src, int dst, int tag)
117 req_storage_t::iterator it = store.find(req_key_t(src, dst, tag));
118 return (it == store.end()) ? MPI_REQUEST_NULL : it->second;
121 void remove(MPI_Request req)
123 if (req == MPI_REQUEST_NULL) return;
125 store.erase(req_key_t(req->src()-1, req->dst()-1, req->tag()));
128 void add(MPI_Request req)
130 if (req != MPI_REQUEST_NULL) // Can and does happen in the case of TestAction
131 store.insert({req_key_t(req->src()-1, req->dst()-1, req->tag()), req});
134 /* Sometimes we need to re-insert MPI_REQUEST_NULL but we still need src,dst and tag */
135 void addNullRequest(int src, int dst, int tag)
137 store.insert({req_key_t(
138 MPI_COMM_WORLD->group()->actor(src)->get_pid()-1,
139 MPI_COMM_WORLD->group()->actor(dst)->get_pid()-1,
140 tag), MPI_REQUEST_NULL});
144 void WaitTestParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
146 CHECK_ACTION_PARAMS(action, 3, 0)
147 src = std::stoi(action[2]);
148 dst = std::stoi(action[3]);
149 tag = std::stoi(action[4]);
152 void SendRecvParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
154 CHECK_ACTION_PARAMS(action, 3, 1)
155 partner = std::stoi(action[2]);
156 tag = std::stoi(action[3]);
157 size = parse_double(action[4]);
158 if (action.size() > 5)
159 datatype1 = simgrid::smpi::Datatype::decode(action[5]);
162 void ComputeParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
164 CHECK_ACTION_PARAMS(action, 1, 0)
165 flops = parse_double(action[2]);
168 void BcastArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
170 CHECK_ACTION_PARAMS(action, 1, 2)
171 size = parse_double(action[2]);
172 root = (action.size() > 3) ? std::stoi(action[3]) : 0;
173 if (action.size() > 4)
174 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
177 void ReduceArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
179 CHECK_ACTION_PARAMS(action, 2, 2)
180 comm_size = parse_double(action[2]);
181 comp_size = parse_double(action[3]);
182 root = (action.size() > 4) ? std::stoi(action[4]) : 0;
183 if (action.size() > 5)
184 datatype1 = simgrid::smpi::Datatype::decode(action[5]);
187 void AllReduceArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
189 CHECK_ACTION_PARAMS(action, 2, 1)
190 comm_size = parse_double(action[2]);
191 comp_size = parse_double(action[3]);
192 if (action.size() > 4)
193 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
196 void AllToAllArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
198 CHECK_ACTION_PARAMS(action, 2, 1)
199 comm_size = MPI_COMM_WORLD->size();
200 send_size = parse_double(action[2]);
201 recv_size = parse_double(action[3]);
203 if (action.size() > 4)
204 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
205 if (action.size() > 5)
206 datatype2 = simgrid::smpi::Datatype::decode(action[5]);
209 void GatherArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
211 /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
214 1) 68 is the sendcounts
215 2) 68 is the recvcounts
216 3) 0 is the root node
217 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
218 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
220 CHECK_ACTION_PARAMS(action, 2, 3)
221 comm_size = MPI_COMM_WORLD->size();
222 send_size = parse_double(action[2]);
223 recv_size = parse_double(action[3]);
225 if (name == "gather") {
226 root = (action.size() > 4) ? std::stoi(action[4]) : 0;
227 if (action.size() > 5)
228 datatype1 = simgrid::smpi::Datatype::decode(action[5]);
229 if (action.size() > 6)
230 datatype2 = simgrid::smpi::Datatype::decode(action[6]);
232 if (action.size() > 4)
233 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
234 if (action.size() > 5)
235 datatype2 = simgrid::smpi::Datatype::decode(action[5]);
239 void GatherVArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
241 /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
242 0 gather 68 68 10 10 10 0 0 0
244 1) 68 is the sendcount
245 2) 68 10 10 10 is the recvcounts
246 3) 0 is the root node
247 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
248 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
250 comm_size = MPI_COMM_WORLD->size();
251 CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
252 send_size = parse_double(action[2]);
253 disps = std::vector<int>(comm_size, 0);
254 recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
256 if (name == "gatherv") {
257 root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
258 if (action.size() > 4 + comm_size)
259 datatype1 = simgrid::smpi::Datatype::decode(action[4 + comm_size]);
260 if (action.size() > 5 + comm_size)
261 datatype2 = simgrid::smpi::Datatype::decode(action[5 + comm_size]);
263 int datatype_index = 0;
265 /* The 3 comes from "0 gather <sendcount>", which must always be present.
266 * The + comm_size is the recvcounts array, which must also be present
268 if (action.size() > 3 + comm_size + comm_size) { /* datatype + disp are specified */
269 datatype_index = 3 + comm_size;
270 disp_index = datatype_index + 1;
271 datatype1 = simgrid::smpi::Datatype::decode(action[datatype_index]);
272 datatype2 = simgrid::smpi::Datatype::decode(action[datatype_index]);
273 } else if (action.size() >
274 3 + comm_size + 2) { /* disps specified; datatype is not specified; use the default one */
275 disp_index = 3 + comm_size;
276 } else if (action.size() > 3 + comm_size) { /* only datatype, no disp specified */
277 datatype_index = 3 + comm_size;
278 datatype1 = simgrid::smpi::Datatype::decode(action[datatype_index]);
279 datatype2 = simgrid::smpi::Datatype::decode(action[datatype_index]);
282 if (disp_index != 0) {
283 for (unsigned int i = 0; i < comm_size; i++)
284 disps[i] = std::stoi(action[disp_index + i]);
288 for (unsigned int i = 0; i < comm_size; i++) {
289 (*recvcounts)[i] = std::stoi(action[i + 3]);
291 recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
294 void ScatterArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
296 /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
299 1) 68 is the sendcounts
300 2) 68 is the recvcounts
301 3) 0 is the root node
302 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
303 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
305 CHECK_ACTION_PARAMS(action, 2, 3)
306 comm_size = MPI_COMM_WORLD->size();
307 send_size = parse_double(action[2]);
308 recv_size = parse_double(action[3]);
309 root = (action.size() > 4) ? std::stoi(action[4]) : 0;
310 if (action.size() > 5)
311 datatype1 = simgrid::smpi::Datatype::decode(action[5]);
312 if (action.size() > 6)
313 datatype2 = simgrid::smpi::Datatype::decode(action[6]);
316 void ScatterVArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
318 /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
319 0 gather 68 10 10 10 68 0 0 0
321 1) 68 10 10 10 is the sendcounts
322 2) 68 is the recvcount
323 3) 0 is the root node
324 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
325 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
327 CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
328 recv_size = parse_double(action[2 + comm_size]);
329 disps = std::vector<int>(comm_size, 0);
330 sendcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
332 if (action.size() > 5 + comm_size)
333 datatype1 = simgrid::smpi::Datatype::decode(action[4 + comm_size]);
334 if (action.size() > 5 + comm_size)
335 datatype2 = simgrid::smpi::Datatype::decode(action[5]);
337 for (unsigned int i = 0; i < comm_size; i++) {
338 (*sendcounts)[i] = std::stoi(action[i + 2]);
340 send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
341 root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
344 void ReduceScatterArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
346 /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
347 0 reducescatter 275427 275427 275427 204020 11346849 0
349 1) The first four values after the name of the action declare the recvcounts array
350 2) The value 11346849 is the amount of instructions
351 3) The last value corresponds to the datatype, see simgrid::smpi::Datatype::decode().
353 comm_size = MPI_COMM_WORLD->size();
354 CHECK_ACTION_PARAMS(action, comm_size + 1, 1)
355 comp_size = parse_double(action[2 + comm_size]);
356 recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
357 if (action.size() > 3 + comm_size)
358 datatype1 = simgrid::smpi::Datatype::decode(action[3 + comm_size]);
360 for (unsigned int i = 0; i < comm_size; i++) {
361 recvcounts->push_back(std::stoi(action[i + 2]));
363 recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
366 void AllToAllVArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
368 /* The structure of the alltoallv action for the rank 0 (total 4 processes) is the following:
369 0 alltoallv 100 1 7 10 12 100 1 70 10 5
371 1) 100 is the size of the send buffer *sizeof(int),
372 2) 1 7 10 12 is the sendcounts array
373 3) 100*sizeof(int) is the size of the receiver buffer
374 4) 1 70 10 5 is the recvcounts array
376 comm_size = MPI_COMM_WORLD->size();
377 CHECK_ACTION_PARAMS(action, 2 * comm_size + 2, 2)
378 sendcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
379 recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
380 senddisps = std::vector<int>(comm_size, 0);
381 recvdisps = std::vector<int>(comm_size, 0);
383 if (action.size() > 5 + 2 * comm_size)
384 datatype1 = simgrid::smpi::Datatype::decode(action[4 + 2 * comm_size]);
385 if (action.size() > 5 + 2 * comm_size)
386 datatype2 = simgrid::smpi::Datatype::decode(action[5 + 2 * comm_size]);
388 send_buf_size = parse_double(action[2]);
389 recv_buf_size = parse_double(action[3 + comm_size]);
390 for (unsigned int i = 0; i < comm_size; i++) {
391 (*sendcounts)[i] = std::stoi(action[3 + i]);
392 (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
394 send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
395 recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
398 void WaitAction::kernel(simgrid::xbt::ReplayAction& action)
400 std::string s = boost::algorithm::join(action, " ");
401 xbt_assert(req_storage.size(), "action wait not preceded by any irecv or isend: %s", s.c_str());
402 MPI_Request request = req_storage.find(args.src, args.dst, args.tag);
403 req_storage.remove(request);
405 if (request == MPI_REQUEST_NULL) {
406 /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just
411 int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
413 // Must be taken before Request::wait() since the request may be set to
414 // MPI_REQUEST_NULL by Request::wait!
415 bool is_wait_for_receive = (request->flags() & MPI_REQ_RECV);
416 // TODO: Here we take the rank while we normally take the process id (look for my_proc_id)
417 TRACE_smpi_comm_in(rank, __func__, new simgrid::instr::WaitTIData(args.src, args.dst, args.tag));
420 Request::wait(&request, &status);
422 TRACE_smpi_comm_out(rank);
423 if (is_wait_for_receive)
424 TRACE_smpi_recv(args.src, args.dst, args.tag);
427 void SendAction::kernel(simgrid::xbt::ReplayAction& action)
429 int dst_traced = MPI_COMM_WORLD->group()->actor(args.partner)->get_pid();
431 TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
432 args.tag, Datatype::encode(args.datatype1)));
433 if (not TRACE_smpi_view_internals())
434 TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, args.tag, args.size * args.datatype1->size());
436 if (name == "send") {
437 Request::send(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
438 } else if (name == "isend") {
439 MPI_Request request = Request::isend(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
440 req_storage.add(request);
442 xbt_die("Don't know this action, %s", name.c_str());
445 TRACE_smpi_comm_out(my_proc_id);
448 void RecvAction::kernel(simgrid::xbt::ReplayAction& action)
450 TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
451 args.tag, Datatype::encode(args.datatype1)));
454 // unknown size from the receiver point of view
455 if (args.size <= 0.0) {
456 Request::probe(args.partner, args.tag, MPI_COMM_WORLD, &status);
457 args.size = status.count;
460 if (name == "recv") {
461 Request::recv(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD, &status);
462 } else if (name == "irecv") {
463 MPI_Request request = Request::irecv(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
464 req_storage.add(request);
467 TRACE_smpi_comm_out(my_proc_id);
468 // TODO: Check why this was only activated in the "recv" case and not in the "irecv" case
469 if (name == "recv" && not TRACE_smpi_view_internals()) {
470 int src_traced = MPI_COMM_WORLD->group()->actor(status.MPI_SOURCE)->get_pid();
471 TRACE_smpi_recv(src_traced, my_proc_id, args.tag);
475 void ComputeAction::kernel(simgrid::xbt::ReplayAction& action)
477 TRACE_smpi_computing_in(my_proc_id, args.flops);
478 smpi_execute_flops(args.flops);
479 TRACE_smpi_computing_out(my_proc_id);
482 void TestAction::kernel(simgrid::xbt::ReplayAction& action)
484 MPI_Request request = req_storage.find(args.src, args.dst, args.tag);
485 req_storage.remove(request);
486 // if request is null here, this may mean that a previous test has succeeded
487 // Different times in traced application and replayed version may lead to this
488 // In this case, ignore the extra calls.
489 if (request != MPI_REQUEST_NULL) {
490 TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::NoOpTIData("test"));
493 int flag = Request::test(&request, &status);
495 XBT_DEBUG("MPI_Test result: %d", flag);
496 /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now
498 if (request == MPI_REQUEST_NULL)
499 req_storage.addNullRequest(args.src, args.dst, args.tag);
501 req_storage.add(request);
503 TRACE_smpi_comm_out(my_proc_id);
507 void InitAction::kernel(simgrid::xbt::ReplayAction& action)
509 CHECK_ACTION_PARAMS(action, 0, 1)
510 MPI_DEFAULT_TYPE = (action.size() > 2) ? MPI_DOUBLE // default MPE datatype
511 : MPI_BYTE; // default TAU datatype
513 /* start a simulated timer */
514 smpi_process()->simulated_start();
517 void CommunicatorAction::kernel(simgrid::xbt::ReplayAction& action)
522 void WaitAllAction::kernel(simgrid::xbt::ReplayAction& action)
524 const unsigned int count_requests = req_storage.size();
526 if (count_requests > 0) {
527 TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData("waitall", -1, count_requests, ""));
528 std::vector<std::pair</*sender*/int,/*recv*/int>> sender_receiver;
529 std::vector<MPI_Request> reqs;
530 req_storage.get_requests(reqs);
531 for (const auto& req : reqs) {
532 if (req && (req->flags() & MPI_REQ_RECV)) {
533 sender_receiver.push_back({req->src(), req->dst()});
536 MPI_Status status[count_requests];
537 Request::waitall(count_requests, &(reqs.data())[0], status);
538 req_storage.get_store().clear();
540 for (auto& pair : sender_receiver) {
541 TRACE_smpi_recv(pair.first, pair.second, 0);
543 TRACE_smpi_comm_out(my_proc_id);
547 void BarrierAction::kernel(simgrid::xbt::ReplayAction& action)
549 TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::NoOpTIData("barrier"));
550 Colls::barrier(MPI_COMM_WORLD);
551 TRACE_smpi_comm_out(my_proc_id);
554 void BcastAction::kernel(simgrid::xbt::ReplayAction& action)
556 TRACE_smpi_comm_in(my_proc_id, "action_bcast",
557 new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(args.root)->get_pid(),
558 -1.0, args.size, -1, Datatype::encode(args.datatype1), ""));
560 Colls::bcast(send_buffer(args.size * args.datatype1->size()), args.size, args.datatype1, args.root, MPI_COMM_WORLD);
562 TRACE_smpi_comm_out(my_proc_id);
565 void ReduceAction::kernel(simgrid::xbt::ReplayAction& action)
567 TRACE_smpi_comm_in(my_proc_id, "action_reduce",
568 new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(args.root)->get_pid(),
569 args.comp_size, args.comm_size, -1,
570 Datatype::encode(args.datatype1), ""));
572 Colls::reduce(send_buffer(args.comm_size * args.datatype1->size()),
573 recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL, args.root, MPI_COMM_WORLD);
574 smpi_execute_flops(args.comp_size);
576 TRACE_smpi_comm_out(my_proc_id);
579 void AllReduceAction::kernel(simgrid::xbt::ReplayAction& action)
581 TRACE_smpi_comm_in(my_proc_id, "action_allreduce", new simgrid::instr::CollTIData("allreduce", -1, args.comp_size, args.comm_size, -1,
582 Datatype::encode(args.datatype1), ""));
584 Colls::allreduce(send_buffer(args.comm_size * args.datatype1->size()),
585 recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
586 smpi_execute_flops(args.comp_size);
588 TRACE_smpi_comm_out(my_proc_id);
591 void AllToAllAction::kernel(simgrid::xbt::ReplayAction& action)
593 TRACE_smpi_comm_in(my_proc_id, "action_alltoall",
594 new simgrid::instr::CollTIData("alltoall", -1, -1.0, args.send_size, args.recv_size,
595 Datatype::encode(args.datatype1),
596 Datatype::encode(args.datatype2)));
598 Colls::alltoall(send_buffer(args.send_size * args.comm_size * args.datatype1->size()), args.send_size,
599 args.datatype1, recv_buffer(args.recv_size * args.comm_size * args.datatype2->size()),
600 args.recv_size, args.datatype2, MPI_COMM_WORLD);
602 TRACE_smpi_comm_out(my_proc_id);
605 void GatherAction::kernel(simgrid::xbt::ReplayAction& action)
607 TRACE_smpi_comm_in(my_proc_id, name.c_str(), new simgrid::instr::CollTIData(name, (name == "gather") ? args.root : -1, -1.0, args.send_size, args.recv_size,
608 Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
610 if (name == "gather") {
611 int rank = MPI_COMM_WORLD->rank();
612 Colls::gather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
613 (rank == args.root) ? recv_buffer(args.recv_size * args.comm_size * args.datatype2->size()) : nullptr, args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
616 Colls::allgather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
617 recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, MPI_COMM_WORLD);
619 TRACE_smpi_comm_out(my_proc_id);
622 void GatherVAction::kernel(simgrid::xbt::ReplayAction& action)
624 int rank = MPI_COMM_WORLD->rank();
626 TRACE_smpi_comm_in(my_proc_id, name.c_str(), new simgrid::instr::VarCollTIData(
627 name, (name == "gatherv") ? args.root : -1, args.send_size, nullptr, -1, args.recvcounts,
628 Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
630 if (name == "gatherv") {
631 Colls::gatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
632 (rank == args.root) ? recv_buffer(args.recv_size_sum * args.datatype2->size()) : nullptr,
633 args.recvcounts->data(), args.disps.data(), args.datatype2, args.root, MPI_COMM_WORLD);
636 Colls::allgatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
637 recv_buffer(args.recv_size_sum * args.datatype2->size()), args.recvcounts->data(),
638 args.disps.data(), args.datatype2, MPI_COMM_WORLD);
641 TRACE_smpi_comm_out(my_proc_id);
644 void ScatterAction::kernel(simgrid::xbt::ReplayAction& action)
646 int rank = MPI_COMM_WORLD->rank();
647 TRACE_smpi_comm_in(my_proc_id, "action_scatter", new simgrid::instr::CollTIData(name, args.root, -1.0, args.send_size, args.recv_size,
648 Datatype::encode(args.datatype1),
649 Datatype::encode(args.datatype2)));
651 Colls::scatter(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
652 (rank == args.root) ? recv_buffer(args.recv_size * args.datatype2->size()) : nullptr, args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
654 TRACE_smpi_comm_out(my_proc_id);
657 void ScatterVAction::kernel(simgrid::xbt::ReplayAction& action)
659 int rank = MPI_COMM_WORLD->rank();
660 TRACE_smpi_comm_in(my_proc_id, "action_scatterv", new simgrid::instr::VarCollTIData(name, args.root, -1, args.sendcounts, args.recv_size,
661 nullptr, Datatype::encode(args.datatype1),
662 Datatype::encode(args.datatype2)));
664 Colls::scatterv((rank == args.root) ? send_buffer(args.send_size_sum * args.datatype1->size()) : nullptr,
665 args.sendcounts->data(), args.disps.data(), args.datatype1,
666 recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, args.root,
669 TRACE_smpi_comm_out(my_proc_id);
672 void ReduceScatterAction::kernel(simgrid::xbt::ReplayAction& action)
674 TRACE_smpi_comm_in(my_proc_id, "action_reducescatter",
675 new simgrid::instr::VarCollTIData("reducescatter", -1, 0, nullptr, -1, args.recvcounts,
676 std::to_string(args.comp_size), /* ugly hack to print comp_size */
677 Datatype::encode(args.datatype1)));
679 Colls::reduce_scatter(send_buffer(args.recv_size_sum * args.datatype1->size()),
680 recv_buffer(args.recv_size_sum * args.datatype1->size()), args.recvcounts->data(),
681 args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
683 smpi_execute_flops(args.comp_size);
684 TRACE_smpi_comm_out(my_proc_id);
687 void AllToAllVAction::kernel(simgrid::xbt::ReplayAction& action)
689 TRACE_smpi_comm_in(my_proc_id, __func__,
690 new simgrid::instr::VarCollTIData(
691 "alltoallv", -1, args.send_size_sum, args.sendcounts, args.recv_size_sum, args.recvcounts,
692 Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
694 Colls::alltoallv(send_buffer(args.send_buf_size * args.datatype1->size()), args.sendcounts->data(), args.senddisps.data(), args.datatype1,
695 recv_buffer(args.recv_buf_size * args.datatype2->size()), args.recvcounts->data(), args.recvdisps.data(), args.datatype2, MPI_COMM_WORLD);
697 TRACE_smpi_comm_out(my_proc_id);
699 } // Replay Namespace
700 }} // namespace simgrid::smpi
702 static std::unordered_map<aid_t, simgrid::smpi::replay::RequestStorage> storage;
703 /** @brief Only initialize the replay, don't do it for real */
704 void smpi_replay_init(int* argc, char*** argv)
706 if (not smpi_process()->initializing()){
707 simgrid::smpi::ActorExt::init(argc, argv);
709 smpi_process()->mark_as_initialized();
710 smpi_process()->set_replaying(true);
712 int my_proc_id = simgrid::s4u::this_actor::get_pid();
714 TRACE_smpi_init(my_proc_id);
715 TRACE_smpi_computing_init(my_proc_id);
716 TRACE_smpi_comm_in(my_proc_id, "smpi_replay_run_init", new simgrid::instr::NoOpTIData("init"));
717 TRACE_smpi_comm_out(my_proc_id);
718 xbt_replay_action_register("init", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::InitAction().execute(action); });
719 xbt_replay_action_register("finalize", [](simgrid::xbt::ReplayAction& action) { /* nothing to do */ });
720 xbt_replay_action_register("comm_size", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
721 xbt_replay_action_register("comm_split",[](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
722 xbt_replay_action_register("comm_dup", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
723 xbt_replay_action_register("send", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("send", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
724 xbt_replay_action_register("isend", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("isend", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
725 xbt_replay_action_register("recv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("recv", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
726 xbt_replay_action_register("irecv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("irecv", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
727 xbt_replay_action_register("test", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::TestAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
728 xbt_replay_action_register("wait", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
729 xbt_replay_action_register("waitall", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAllAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
730 xbt_replay_action_register("barrier", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BarrierAction().execute(action); });
731 xbt_replay_action_register("bcast", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BcastAction().execute(action); });
732 xbt_replay_action_register("reduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceAction().execute(action); });
733 xbt_replay_action_register("allreduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllReduceAction().execute(action); });
734 xbt_replay_action_register("alltoall", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllAction().execute(action); });
735 xbt_replay_action_register("alltoallv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllVAction().execute(action); });
736 xbt_replay_action_register("gather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("gather").execute(action); });
737 xbt_replay_action_register("scatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterAction().execute(action); });
738 xbt_replay_action_register("gatherv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("gatherv").execute(action); });
739 xbt_replay_action_register("scatterv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterVAction().execute(action); });
740 xbt_replay_action_register("allgather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("allgather").execute(action); });
741 xbt_replay_action_register("allgatherv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("allgatherv").execute(action); });
742 xbt_replay_action_register("reducescatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceScatterAction().execute(action); });
743 xbt_replay_action_register("compute", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ComputeAction().execute(action); });
745 //if we have a delayed start, sleep here.
747 double value = xbt_str_parse_double((*argv)[2], "%s is not a double");
748 XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
749 smpi_execute_flops(value);
751 // Wait for the other actors to initialize also
752 simgrid::s4u::this_actor::yield();
756 /** @brief actually run the replay after initialization */
757 void smpi_replay_main(int* argc, char*** argv)
759 static int active_processes = 0;
761 storage[simgrid::s4u::this_actor::get_pid()] = simgrid::smpi::replay::RequestStorage();
762 simgrid::xbt::replay_runner(*argc, *argv);
764 /* and now, finalize everything */
765 /* One active process will stop. Decrease the counter*/
766 unsigned int count_requests = storage[simgrid::s4u::this_actor::get_pid()].size();
767 XBT_DEBUG("There are %ud elements in reqq[*]", count_requests);
768 if (count_requests > 0) {
769 MPI_Request requests[count_requests];
770 MPI_Status status[count_requests];
773 for (auto const& pair : storage[simgrid::s4u::this_actor::get_pid()].get_store()) {
774 requests[i] = pair.second;
777 simgrid::smpi::Request::waitall(count_requests, requests, status);
781 if(active_processes==0){
782 /* Last process alive speaking: end the simulated timer */
783 XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
784 smpi_free_replay_tmp_buffers();
787 TRACE_smpi_comm_in(simgrid::s4u::this_actor::get_pid(), "smpi_replay_run_finalize",
788 new simgrid::instr::NoOpTIData("finalize"));
790 smpi_process()->finalize();
792 TRACE_smpi_comm_out(simgrid::s4u::this_actor::get_pid());
793 TRACE_smpi_finalize(simgrid::s4u::this_actor::get_pid());
796 /** @brief chain a replay initialization and a replay start */
797 void smpi_replay_run(int* argc, char*** argv)
799 smpi_replay_init(argc, argv);
800 smpi_replay_main(argc, argv);