1 /* Copyright (c) 2009-2018. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
6 #include "smpi_coll.hpp"
7 #include "smpi_comm.hpp"
8 #include "smpi_datatype.hpp"
9 #include "smpi_group.hpp"
10 #include "smpi_request.hpp"
11 #include "xbt/replay.hpp"
12 #include <simgrid/smpi/replay.hpp>
13 #include <src/smpi/include/private.hpp>
17 #include <unordered_map>
// Log category used by every XBT_* logging call in this file: "smpi_replay", a subcategory of "smpi".
22 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay, smpi, "Trace Replay with SMPI");
// Hashing support for std::tuple keys, adapted from
// https://stackoverflow.com/questions/7110301/generic-hash-for-tuples-in-unordered-map-unordered-set
// The standard library has no std::hash specialization for std::tuple, so this namespace provides one
// that std::unordered_map can use. If other places need it, it could move into a header file.
namespace hash_tuple {

/** Generic hasher: anything that is not a tuple is simply forwarded to std::hash. */
template <typename Value> struct hash {
  std::size_t operator()(Value const& value) const { return std::hash<Value>()(value); }
};

/** Mixes the hash of @p value into @p seed (the classic boost::hash_combine formula). */
template <class Value> inline void hash_combine(std::size_t& seed, Value const& value)
{
  seed ^= hash_tuple::hash<Value>()(value) + 0x9e3779b9 + (seed << 6) + (seed >> 2);
}

// Recursive template code derived from Matthieu M.
/** Folds tuple elements 0..Index into @p seed, lowest index first. */
template <class Tuple, std::size_t Index = std::tuple_size<Tuple>::value - 1> struct HashValueImpl {
  static void apply(std::size_t& seed, Tuple const& tuple)
  {
    HashValueImpl<Tuple, Index - 1>::apply(seed, tuple); // earlier elements are combined first
    hash_combine(seed, std::get<Index>(tuple));
  }
};

/** Recursion base case: only element 0 remains. */
template <class Tuple> struct HashValueImpl<Tuple, 0> {
  static void apply(std::size_t& seed, Tuple const& tuple) { hash_combine(seed, std::get<0>(tuple)); }
};

/** Tuple hasher: starts from a zero seed and combines the hash of every element in order. */
template <typename... Elems> struct hash<std::tuple<Elems...>> {
  std::size_t operator()(std::tuple<Elems...> const& tuple) const
  {
    std::size_t seed = 0;
    HashValueImpl<std::tuple<Elems...>>::apply(seed, tuple);
    return seed;
  }
};

} // namespace hash_tuple
64 typedef std::tuple</*sender*/ int, /* reciever */ int, /* tag */int> req_key_t;
65 typedef std::unordered_map<req_key_t, MPI_Request, hash_tuple::hash<std::tuple<int,int,int>>> req_storage_t;
67 void log_timed_action(simgrid::xbt::ReplayAction& action, double clock)
69 if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
70 std::string s = boost::algorithm::join(action, " ");
71 XBT_VERB("%s %f", s.c_str(), smpi_process()->simulated_elapsed() - clock);
76 static double parse_double(std::string string)
78 return xbt_str_parse_double(string.c_str(), "%s is not a double");
// Datatype used by replayed actions whose line does not name one. InitAction::kernel sets it to
// MPI_DOUBLE (MPE traces) or MPI_BYTE (TAU traces) depending on the arguments of the "init" line.
85 MPI_Datatype MPI_DEFAULT_TYPE;
/* Bookkeeping of the point-to-point requests created while replaying one actor's trace, keyed by
 * (sender, receiver, tag) -- see req_key_t. One instance exists per actor (see the `storage` map
 * indexed by pid near the end of this file). */
87 class RequestStorage {
/** Direct access to the underlying map (WaitAllAction uses it to clear all entries at once). */
98 req_storage_t& get_store()
103 void get_requests(std::vector<MPI_Request>& vec)
105 for (auto& pair : store) {
106 auto& req = pair.second;
107 auto my_proc_id = simgrid::s4u::this_actor::get_pid();
108 if (req != MPI_REQUEST_NULL && (req->src() == my_proc_id || req->dst() == my_proc_id)) {
109 vec.push_back(pair.second);
110 pair.second->print_request("MM");
115 MPI_Request find(int src, int dst, int tag)
117 req_storage_t::iterator it = store.find(req_key_t(src, dst, tag));
118 return (it == store.end()) ? MPI_REQUEST_NULL : it->second;
121 void remove(MPI_Request req)
123 if (req == MPI_REQUEST_NULL) return;
125 store.erase(req_key_t(req->src()-1, req->dst()-1, req->tag()));
128 void add(MPI_Request req)
130 if (req != MPI_REQUEST_NULL) // Can and does happen in the case of TestAction
131 store.insert({req_key_t(req->src()-1, req->dst()-1, req->tag()), req});
134 /* Sometimes we need to re-insert MPI_REQUEST_NULL but we still need src,dst and tag */
135 void addNullRequest(int src, int dst, int tag)
137 store.insert({req_key_t(
138 MPI_COMM_WORLD->group()->actor(src)->get_pid()-1,
139 MPI_COMM_WORLD->group()->actor(dst)->get_pid()-1,
140 tag), MPI_REQUEST_NULL});
144 void WaitTestParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
146 CHECK_ACTION_PARAMS(action, 3, 0)
147 src = std::stoi(action[2]);
148 dst = std::stoi(action[3]);
149 tag = std::stoi(action[4]);
152 void SendRecvParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
154 CHECK_ACTION_PARAMS(action, 3, 1)
155 partner = std::stoi(action[2]);
156 tag = std::stoi(action[3]);
157 size = parse_double(action[4]);
158 if (action.size() > 5)
159 datatype1 = simgrid::smpi::Datatype::decode(action[5]);
162 void ComputeParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
164 CHECK_ACTION_PARAMS(action, 1, 0)
165 flops = parse_double(action[2]);
168 void BcastArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
170 CHECK_ACTION_PARAMS(action, 1, 2)
171 size = parse_double(action[2]);
172 root = (action.size() > 3) ? std::stoi(action[3]) : 0;
173 if (action.size() > 4)
174 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
177 void ReduceArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
179 CHECK_ACTION_PARAMS(action, 2, 2)
180 comm_size = parse_double(action[2]);
181 comp_size = parse_double(action[3]);
182 root = (action.size() > 4) ? std::stoi(action[4]) : 0;
183 if (action.size() > 5)
184 datatype1 = simgrid::smpi::Datatype::decode(action[5]);
187 void AllReduceArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
189 CHECK_ACTION_PARAMS(action, 2, 1)
190 comm_size = parse_double(action[2]);
191 comp_size = parse_double(action[3]);
192 if (action.size() > 4)
193 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
196 void AllToAllArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
198 CHECK_ACTION_PARAMS(action, 2, 1)
199 comm_size = MPI_COMM_WORLD->size();
200 send_size = parse_double(action[2]);
201 recv_size = parse_double(action[3]);
203 if (action.size() > 4)
204 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
205 if (action.size() > 5)
206 datatype2 = simgrid::smpi::Datatype::decode(action[5]);
209 void GatherArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
211 /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
214 1) 68 is the sendcounts
215 2) 68 is the recvcounts
216 3) 0 is the root node
217 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
218 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
220 CHECK_ACTION_PARAMS(action, 2, 3)
221 comm_size = MPI_COMM_WORLD->size();
222 send_size = parse_double(action[2]);
223 recv_size = parse_double(action[3]);
225 if (name == "gather") {
226 root = (action.size() > 4) ? std::stoi(action[4]) : 0;
227 if (action.size() > 5)
228 datatype1 = simgrid::smpi::Datatype::decode(action[5]);
229 if (action.size() > 6)
230 datatype2 = simgrid::smpi::Datatype::decode(action[6]);
232 if (action.size() > 4)
233 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
234 if (action.size() > 5)
235 datatype2 = simgrid::smpi::Datatype::decode(action[5]);
239 void GatherVArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
241 /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
242 0 gather 68 68 10 10 10 0 0 0
244 1) 68 is the sendcount
245 2) 68 10 10 10 is the recvcounts
246 3) 0 is the root node
247 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
248 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
250 comm_size = MPI_COMM_WORLD->size();
251 CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
252 send_size = parse_double(action[2]);
253 disps = std::vector<int>(comm_size, 0);
254 recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
256 if (name == "gatherv") {
257 root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
258 if (action.size() > 4 + comm_size)
259 datatype1 = simgrid::smpi::Datatype::decode(action[4 + comm_size]);
260 if (action.size() > 5 + comm_size)
261 datatype2 = simgrid::smpi::Datatype::decode(action[5 + comm_size]);
263 int datatype_index = 0;
265 /* The 3 comes from "0 gather <sendcount>", which must always be present.
266 * The + comm_size is the recvcounts array, which must also be present
268 if (action.size() > 3 + comm_size + comm_size) { /* datatype + disp are specified */
269 datatype_index = 3 + comm_size;
270 disp_index = datatype_index + 1;
271 datatype1 = simgrid::smpi::Datatype::decode(action[datatype_index]);
272 datatype2 = simgrid::smpi::Datatype::decode(action[datatype_index]);
273 } else if (action.size() >
274 3 + comm_size + 2) { /* disps specified; datatype is not specified; use the default one */
275 disp_index = 3 + comm_size;
276 } else if (action.size() > 3 + comm_size) { /* only datatype, no disp specified */
277 datatype_index = 3 + comm_size;
278 datatype1 = simgrid::smpi::Datatype::decode(action[datatype_index]);
279 datatype2 = simgrid::smpi::Datatype::decode(action[datatype_index]);
282 if (disp_index != 0) {
283 for (unsigned int i = 0; i < comm_size; i++)
284 disps[i] = std::stoi(action[disp_index + i]);
288 for (unsigned int i = 0; i < comm_size; i++) {
289 (*recvcounts)[i] = std::stoi(action[i + 3]);
291 recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
294 void ScatterArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
296 /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
299 1) 68 is the sendcounts
300 2) 68 is the recvcounts
301 3) 0 is the root node
302 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
303 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
305 CHECK_ACTION_PARAMS(action, 2, 3)
306 comm_size = MPI_COMM_WORLD->size();
307 send_size = parse_double(action[2]);
308 recv_size = parse_double(action[3]);
309 root = (action.size() > 4) ? std::stoi(action[4]) : 0;
310 if (action.size() > 5)
311 datatype1 = simgrid::smpi::Datatype::decode(action[5]);
312 if (action.size() > 6)
313 datatype2 = simgrid::smpi::Datatype::decode(action[6]);
316 void ScatterVArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
318 /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
319 0 gather 68 10 10 10 68 0 0 0
321 1) 68 10 10 10 is the sendcounts
322 2) 68 is the recvcount
323 3) 0 is the root node
324 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
325 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
327 CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
328 recv_size = parse_double(action[2 + comm_size]);
329 disps = std::vector<int>(comm_size, 0);
330 sendcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
332 if (action.size() > 5 + comm_size)
333 datatype1 = simgrid::smpi::Datatype::decode(action[4 + comm_size]);
334 if (action.size() > 5 + comm_size)
335 datatype2 = simgrid::smpi::Datatype::decode(action[5]);
337 for (unsigned int i = 0; i < comm_size; i++) {
338 (*sendcounts)[i] = std::stoi(action[i + 2]);
340 send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
341 root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
344 void ReduceScatterArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
346 /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
347 0 reducescatter 275427 275427 275427 204020 11346849 0
349 1) The first four values after the name of the action declare the recvcounts array
350 2) The value 11346849 is the amount of instructions
351 3) The last value corresponds to the datatype, see simgrid::smpi::Datatype::decode().
353 comm_size = MPI_COMM_WORLD->size();
354 CHECK_ACTION_PARAMS(action, comm_size + 1, 1)
355 comp_size = parse_double(action[2 + comm_size]);
356 recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
357 if (action.size() > 3 + comm_size)
358 datatype1 = simgrid::smpi::Datatype::decode(action[3 + comm_size]);
360 for (unsigned int i = 0; i < comm_size; i++) {
361 recvcounts->push_back(std::stoi(action[i + 2]));
363 recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
366 void AllToAllVArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
368 /* The structure of the alltoallv action for the rank 0 (total 4 processes) is the following:
369 0 alltoallv 100 1 7 10 12 100 1 70 10 5
371 1) 100 is the size of the send buffer *sizeof(int),
372 2) 1 7 10 12 is the sendcounts array
373 3) 100*sizeof(int) is the size of the receiver buffer
374 4) 1 70 10 5 is the recvcounts array
376 comm_size = MPI_COMM_WORLD->size();
377 CHECK_ACTION_PARAMS(action, 2 * comm_size + 2, 2)
378 sendcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
379 recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
380 senddisps = std::vector<int>(comm_size, 0);
381 recvdisps = std::vector<int>(comm_size, 0);
383 if (action.size() > 5 + 2 * comm_size)
384 datatype1 = simgrid::smpi::Datatype::decode(action[4 + 2 * comm_size]);
385 if (action.size() > 5 + 2 * comm_size)
386 datatype2 = simgrid::smpi::Datatype::decode(action[5 + 2 * comm_size]);
388 send_buf_size = parse_double(action[2]);
389 recv_buf_size = parse_double(action[3 + comm_size]);
390 for (unsigned int i = 0; i < comm_size; i++) {
391 (*sendcounts)[i] = std::stoi(action[3 + i]);
392 (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
394 send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
395 recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
/* Replays a "wait" line: fetches the request stored under (args.src, args.dst, args.tag), removes it from
 * req_storage, and blocks on it with Request::wait between TRACE_smpi_comm_in/out events. */
398 void WaitAction::kernel(simgrid::xbt::ReplayAction& action)
// NOTE(review): `s` is built on every call but only feeds the assertion message below; if xbt_assert
// evaluates its message arguments only on failure (confirm), the join could move into the assert itself.
400 std::string s = boost::algorithm::join(action, " ");
401 xbt_assert(req_storage.size(), "action wait not preceded by any irecv or isend: %s", s.c_str());
402 MPI_Request request = req_storage.find(args.src, args.dst, args.tag);
403 req_storage.remove(request);
// No request under this key: per the comment below, it may already have been completed by an earlier test.
405 if (request == MPI_REQUEST_NULL) {
406 /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just
411 int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
413 // Must be taken before Request::wait() since the request may be set to
414 // MPI_REQUEST_NULL by Request::wait!
415 bool is_wait_for_receive = (request->flags() & MPI_REQ_RECV);
416 // TODO: Here we take the rank while we normally take the process id (look for my_proc_id)
417 TRACE_smpi_comm_in(rank, __func__, new simgrid::instr::WaitTIData(args.src, args.dst, args.tag));
420 Request::wait(&request, &status);
422 TRACE_smpi_comm_out(rank);
// Only waits that completed a receive emit a recv trace event.
423 if (is_wait_for_receive)
424 TRACE_smpi_recv(args.src, args.dst, args.tag);
427 void SendAction::kernel(simgrid::xbt::ReplayAction& action)
429 int dst_traced = MPI_COMM_WORLD->group()->actor(args.partner)->get_pid();
431 TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
432 args.tag, Datatype::encode(args.datatype1)));
433 if (not TRACE_smpi_view_internals())
434 TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, args.tag, args.size * args.datatype1->size());
436 if (name == "send") {
437 Request::send(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
438 } else if (name == "isend") {
439 MPI_Request request = Request::isend(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
440 req_storage.add(request);
442 xbt_die("Don't know this action, %s", name.c_str());
445 TRACE_smpi_comm_out(my_proc_id);
448 void RecvAction::kernel(simgrid::xbt::ReplayAction& action)
450 int src_traced = MPI_COMM_WORLD->group()->actor(args.partner)->get_pid();
452 TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
453 args.tag, Datatype::encode(args.datatype1)));
456 // unknown size from the receiver point of view
457 if (args.size <= 0.0) {
458 Request::probe(args.partner, args.tag, MPI_COMM_WORLD, &status);
459 args.size = status.count;
462 if (name == "recv") {
463 Request::recv(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD, &status);
464 } else if (name == "irecv") {
465 MPI_Request request = Request::irecv(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
466 req_storage.add(request);
469 TRACE_smpi_comm_out(my_proc_id);
470 // TODO: Check why this was only activated in the "recv" case and not in the "irecv" case
471 if (name == "recv" && not TRACE_smpi_view_internals()) {
472 TRACE_smpi_recv(src_traced, my_proc_id, args.tag);
476 void ComputeAction::kernel(simgrid::xbt::ReplayAction& action)
478 TRACE_smpi_computing_in(my_proc_id, args.flops);
479 smpi_execute_flops(args.flops);
480 TRACE_smpi_computing_out(my_proc_id);
483 void TestAction::kernel(simgrid::xbt::ReplayAction& action)
485 MPI_Request request = req_storage.find(args.src, args.dst, args.tag);
486 req_storage.remove(request);
487 // if request is null here, this may mean that a previous test has succeeded
488 // Different times in traced application and replayed version may lead to this
489 // In this case, ignore the extra calls.
490 if (request != MPI_REQUEST_NULL) {
491 TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::NoOpTIData("test"));
494 int flag = Request::test(&request, &status);
496 XBT_DEBUG("MPI_Test result: %d", flag);
497 /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now
499 if (request == MPI_REQUEST_NULL)
500 req_storage.addNullRequest(args.src, args.dst, args.tag);
502 req_storage.add(request);
504 TRACE_smpi_comm_out(my_proc_id);
508 void InitAction::kernel(simgrid::xbt::ReplayAction& action)
510 CHECK_ACTION_PARAMS(action, 0, 1)
511 MPI_DEFAULT_TYPE = (action.size() > 2) ? MPI_DOUBLE // default MPE datatype
512 : MPI_BYTE; // default TAU datatype
514 /* start a simulated timer */
515 smpi_process()->simulated_start();
/* Kernel shared by the "comm_size", "comm_split" and "comm_dup" trace actions
 * (see their xbt_replay_action_register calls in smpi_replay_init). */
518 void CommunicatorAction::kernel(simgrid::xbt::ReplayAction& action)
523 void WaitAllAction::kernel(simgrid::xbt::ReplayAction& action)
525 const unsigned int count_requests = req_storage.size();
527 if (count_requests > 0) {
528 TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData("waitall", -1, count_requests, ""));
529 std::vector<std::pair</*sender*/int,/*recv*/int>> sender_receiver;
530 std::vector<MPI_Request> reqs;
531 req_storage.get_requests(reqs);
532 for (const auto& req : reqs) {
533 if (req && (req->flags() & MPI_REQ_RECV)) {
534 sender_receiver.push_back({req->src(), req->dst()});
537 MPI_Status status[count_requests];
538 Request::waitall(count_requests, &(reqs.data())[0], status);
539 req_storage.get_store().clear();
541 for (auto& pair : sender_receiver) {
542 TRACE_smpi_recv(pair.first, pair.second, 0);
544 TRACE_smpi_comm_out(my_proc_id);
548 void BarrierAction::kernel(simgrid::xbt::ReplayAction& action)
550 TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::NoOpTIData("barrier"));
551 Colls::barrier(MPI_COMM_WORLD);
552 TRACE_smpi_comm_out(my_proc_id);
555 void BcastAction::kernel(simgrid::xbt::ReplayAction& action)
557 TRACE_smpi_comm_in(my_proc_id, "action_bcast",
558 new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(args.root)->get_pid(),
559 -1.0, args.size, -1, Datatype::encode(args.datatype1), ""));
561 Colls::bcast(send_buffer(args.size * args.datatype1->size()), args.size, args.datatype1, args.root, MPI_COMM_WORLD);
563 TRACE_smpi_comm_out(my_proc_id);
566 void ReduceAction::kernel(simgrid::xbt::ReplayAction& action)
568 TRACE_smpi_comm_in(my_proc_id, "action_reduce",
569 new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(args.root)->get_pid(),
570 args.comp_size, args.comm_size, -1,
571 Datatype::encode(args.datatype1), ""));
573 Colls::reduce(send_buffer(args.comm_size * args.datatype1->size()),
574 recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL, args.root, MPI_COMM_WORLD);
575 smpi_execute_flops(args.comp_size);
577 TRACE_smpi_comm_out(my_proc_id);
580 void AllReduceAction::kernel(simgrid::xbt::ReplayAction& action)
582 TRACE_smpi_comm_in(my_proc_id, "action_allreduce", new simgrid::instr::CollTIData("allreduce", -1, args.comp_size, args.comm_size, -1,
583 Datatype::encode(args.datatype1), ""));
585 Colls::allreduce(send_buffer(args.comm_size * args.datatype1->size()),
586 recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
587 smpi_execute_flops(args.comp_size);
589 TRACE_smpi_comm_out(my_proc_id);
592 void AllToAllAction::kernel(simgrid::xbt::ReplayAction& action)
594 TRACE_smpi_comm_in(my_proc_id, "action_alltoall",
595 new simgrid::instr::CollTIData("alltoall", -1, -1.0, args.send_size, args.recv_size,
596 Datatype::encode(args.datatype1),
597 Datatype::encode(args.datatype2)));
599 Colls::alltoall(send_buffer(args.send_size * args.comm_size * args.datatype1->size()), args.send_size,
600 args.datatype1, recv_buffer(args.recv_size * args.comm_size * args.datatype2->size()),
601 args.recv_size, args.datatype2, MPI_COMM_WORLD);
603 TRACE_smpi_comm_out(my_proc_id);
606 void GatherAction::kernel(simgrid::xbt::ReplayAction& action)
608 TRACE_smpi_comm_in(my_proc_id, name.c_str(), new simgrid::instr::CollTIData(name, (name == "gather") ? args.root : -1, -1.0, args.send_size, args.recv_size,
609 Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
611 if (name == "gather") {
612 int rank = MPI_COMM_WORLD->rank();
613 Colls::gather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
614 (rank == args.root) ? recv_buffer(args.recv_size * args.comm_size * args.datatype2->size()) : nullptr, args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
617 Colls::allgather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
618 recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, MPI_COMM_WORLD);
620 TRACE_smpi_comm_out(my_proc_id);
623 void GatherVAction::kernel(simgrid::xbt::ReplayAction& action)
625 int rank = MPI_COMM_WORLD->rank();
627 TRACE_smpi_comm_in(my_proc_id, name.c_str(), new simgrid::instr::VarCollTIData(
628 name, (name == "gatherv") ? args.root : -1, args.send_size, nullptr, -1, args.recvcounts,
629 Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
631 if (name == "gatherv") {
632 Colls::gatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
633 (rank == args.root) ? recv_buffer(args.recv_size_sum * args.datatype2->size()) : nullptr,
634 args.recvcounts->data(), args.disps.data(), args.datatype2, args.root, MPI_COMM_WORLD);
637 Colls::allgatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
638 recv_buffer(args.recv_size_sum * args.datatype2->size()), args.recvcounts->data(),
639 args.disps.data(), args.datatype2, MPI_COMM_WORLD);
642 TRACE_smpi_comm_out(my_proc_id);
645 void ScatterAction::kernel(simgrid::xbt::ReplayAction& action)
647 int rank = MPI_COMM_WORLD->rank();
648 TRACE_smpi_comm_in(my_proc_id, "action_scatter", new simgrid::instr::CollTIData(name, args.root, -1.0, args.send_size, args.recv_size,
649 Datatype::encode(args.datatype1),
650 Datatype::encode(args.datatype2)));
652 Colls::scatter(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
653 (rank == args.root) ? recv_buffer(args.recv_size * args.datatype2->size()) : nullptr, args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
655 TRACE_smpi_comm_out(my_proc_id);
658 void ScatterVAction::kernel(simgrid::xbt::ReplayAction& action)
660 int rank = MPI_COMM_WORLD->rank();
661 TRACE_smpi_comm_in(my_proc_id, "action_scatterv", new simgrid::instr::VarCollTIData(name, args.root, -1, args.sendcounts, args.recv_size,
662 nullptr, Datatype::encode(args.datatype1),
663 Datatype::encode(args.datatype2)));
665 Colls::scatterv((rank == args.root) ? send_buffer(args.send_size_sum * args.datatype1->size()) : nullptr,
666 args.sendcounts->data(), args.disps.data(), args.datatype1,
667 recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, args.root,
670 TRACE_smpi_comm_out(my_proc_id);
673 void ReduceScatterAction::kernel(simgrid::xbt::ReplayAction& action)
675 TRACE_smpi_comm_in(my_proc_id, "action_reducescatter",
676 new simgrid::instr::VarCollTIData("reducescatter", -1, 0, nullptr, -1, args.recvcounts,
677 std::to_string(args.comp_size), /* ugly hack to print comp_size */
678 Datatype::encode(args.datatype1)));
680 Colls::reduce_scatter(send_buffer(args.recv_size_sum * args.datatype1->size()),
681 recv_buffer(args.recv_size_sum * args.datatype1->size()), args.recvcounts->data(),
682 args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
684 smpi_execute_flops(args.comp_size);
685 TRACE_smpi_comm_out(my_proc_id);
688 void AllToAllVAction::kernel(simgrid::xbt::ReplayAction& action)
690 TRACE_smpi_comm_in(my_proc_id, __func__,
691 new simgrid::instr::VarCollTIData(
692 "alltoallv", -1, args.send_size_sum, args.sendcounts, args.recv_size_sum, args.recvcounts,
693 Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
695 Colls::alltoallv(send_buffer(args.send_buf_size * args.datatype1->size()), args.sendcounts->data(), args.senddisps.data(), args.datatype1,
696 recv_buffer(args.recv_buf_size * args.datatype2->size()), args.recvcounts->data(), args.recvdisps.data(), args.datatype2, MPI_COMM_WORLD);
698 TRACE_smpi_comm_out(my_proc_id);
700 } // Replay Namespace
701 }} // namespace simgrid::smpi
// One RequestStorage per replaying actor, indexed by actor pid. smpi_replay_main resets the calling
// actor's entry before replaying, and the point-to-point handlers registered in smpi_replay_init look
// it up with simgrid::s4u::this_actor::get_pid().
703 static std::unordered_map<aid_t, simgrid::smpi::replay::RequestStorage> storage;
704 /** @brief Only initialize the replay, don't do it for real */
705 void smpi_replay_init(int* argc, char*** argv)
// Initialize this actor's SMPI state, unless it is already in the middle of initializing.
707 if (not smpi_process()->initializing()){
708 simgrid::smpi::ActorExt::init(argc, argv);
// Mark the actor as initialized and as running in replay mode.
710 smpi_process()->mark_as_initialized();
711 smpi_process()->set_replaying(true);
// Tracing is keyed by the actor pid.
713 int my_proc_id = simgrid::s4u::this_actor::get_pid();
715 TRACE_smpi_init(my_proc_id);
716 TRACE_smpi_computing_init(my_proc_id);
717 TRACE_smpi_comm_in(my_proc_id, "smpi_replay_run_init", new simgrid::instr::NoOpTIData("init"));
718 TRACE_smpi_comm_out(my_proc_id);
// Map every trace keyword to the kernel that replays it. Point-to-point handlers (send/isend/recv/irecv/
// test/wait/waitall) receive the RequestStorage of the actor executing the line (storage is indexed by pid).
719 xbt_replay_action_register("init", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::InitAction().execute(action); });
720 xbt_replay_action_register("finalize", [](simgrid::xbt::ReplayAction& action) { /* nothing to do */ });
721 xbt_replay_action_register("comm_size", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
722 xbt_replay_action_register("comm_split",[](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
723 xbt_replay_action_register("comm_dup", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
724 xbt_replay_action_register("send", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("send", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
725 xbt_replay_action_register("isend", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("isend", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
726 xbt_replay_action_register("recv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("recv", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
727 xbt_replay_action_register("irecv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("irecv", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
728 xbt_replay_action_register("test", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::TestAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
729 xbt_replay_action_register("wait", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
730 xbt_replay_action_register("waitall", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAllAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
// Collective operations and computation need no request bookkeeping.
731 xbt_replay_action_register("barrier", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BarrierAction().execute(action); });
732 xbt_replay_action_register("bcast", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BcastAction().execute(action); });
733 xbt_replay_action_register("reduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceAction().execute(action); });
734 xbt_replay_action_register("allreduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllReduceAction().execute(action); });
735 xbt_replay_action_register("alltoall", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllAction().execute(action); });
736 xbt_replay_action_register("alltoallv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllVAction().execute(action); });
737 xbt_replay_action_register("gather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("gather").execute(action); });
738 xbt_replay_action_register("scatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterAction().execute(action); });
739 xbt_replay_action_register("gatherv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("gatherv").execute(action); });
740 xbt_replay_action_register("scatterv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterVAction().execute(action); });
741 xbt_replay_action_register("allgather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("allgather").execute(action); });
742 xbt_replay_action_register("allgatherv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("allgatherv").execute(action); });
743 xbt_replay_action_register("reducescatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceScatterAction().execute(action); });
744 xbt_replay_action_register("compute", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ComputeAction().execute(action); });
746 //if we have a delayed start, sleep here.
// NOTE(review): this reads (*argv)[2], so it presumably sits under a guard such as `*argc > 2` -- confirm.
// The delay is expressed in flops and simulated with smpi_execute_flops.
748 double value = xbt_str_parse_double((*argv)[2], "%s is not a double");
749 XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
750 smpi_execute_flops(value);
752 // Wait for the other actors to initialize also
753 simgrid::s4u::this_actor::yield();
757 /** @brief actually run the replay after initialization */
758 void smpi_replay_main(int* argc, char*** argv)
// Number of actors currently inside the replay; the last one to finish prints the total simulated time.
760 static int active_processes = 0;
// Give this actor a fresh request store, then replay its trace.
762 storage[simgrid::s4u::this_actor::get_pid()] = simgrid::smpi::replay::RequestStorage();
763 simgrid::xbt::replay_runner(*argc, *argv);
765 /* and now, finalize everything */
766 /* One active process will stop. Decrease the counter*/
767 unsigned int count_requests = storage[simgrid::s4u::this_actor::get_pid()].size();
// NOTE(review): "%ud" prints the unsigned value followed by a literal 'd'; "%u" was presumably intended.
768 XBT_DEBUG("There are %ud elements in reqq[*]", count_requests);
// Wait on every request still held in this actor's store (entries may be MPI_REQUEST_NULL, see addNullRequest).
// NOTE(review): the two arrays below are variable-length arrays, a GCC/Clang extension rather than standard
// C++; std::vector<MPI_Request> / std::vector<MPI_Status> would be portable.
769 if (count_requests > 0) {
770 MPI_Request requests[count_requests];
771 MPI_Status status[count_requests];
774 for (auto const& pair : storage[simgrid::s4u::this_actor::get_pid()].get_store()) {
775 requests[i] = pair.second;
778 simgrid::smpi::Request::waitall(count_requests, requests, status);
782 if(active_processes==0){
783 /* Last process alive speaking: end the simulated timer */
784 XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
785 smpi_free_replay_tmp_buffers();
// Finalize this actor's SMPI process between "finalize" trace events, then close its trace.
788 TRACE_smpi_comm_in(simgrid::s4u::this_actor::get_pid(), "smpi_replay_run_finalize",
789 new simgrid::instr::NoOpTIData("finalize"));
791 smpi_process()->finalize();
793 TRACE_smpi_comm_out(simgrid::s4u::this_actor::get_pid());
794 TRACE_smpi_finalize(simgrid::s4u::this_actor::get_pid());
797 /** @brief chain a replay initialization and a replay start */
798 void smpi_replay_run(int* argc, char*** argv)
800 smpi_replay_init(argc, argv);
801 smpi_replay_main(argc, argv);