1 /* Copyright (c) 2009-2019. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
6 #include "smpi_coll.hpp"
7 #include "smpi_comm.hpp"
8 #include "smpi_datatype.hpp"
9 #include "smpi_group.hpp"
10 #include "smpi_request.hpp"
11 #include "xbt/replay.hpp"
12 #include <simgrid/smpi/replay.hpp>
13 #include <src/smpi/include/private.hpp>
17 #include <unordered_map>
// Declares the smpi_replay log subcategory (parent: smpi). log_timed_action() tests this category with
// XBT_LOG_ISENABLED before building its message.
22 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay, smpi, "Trace Replay with SMPI");
// From https://stackoverflow.com/questions/7110301/generic-hash-for-tuples-in-unordered-map-unordered-set
// This is all just to make std::unordered_map work with std::tuple. If we need this in other places,
// this could go into a header file.
namespace hash_tuple {
/** Generic hasher: every type that is not a std::tuple is forwarded to std::hash<TT>. */
template <typename TT> class hash {
public:
  size_t operator()(TT const& tt) const { return std::hash<TT>()(tt); }
};

/** Folds the hash of @p v into @p seed (XOR of the element hash, a constant and two shifted copies of the seed). */
template <class T> inline void hash_combine(std::size_t& seed, T const& v)
{
  seed ^= hash_tuple::hash<T>()(v) + 0x9e3779b9 + (seed << 6) + (seed >> 2);
}

// Recursive template code derived from Matthieu M.
/** Combines the elements 0..Index of a tuple into the seed, lowest index first. */
template <class Tuple, size_t Index = std::tuple_size<Tuple>::value - 1> class HashValueImpl {
public:
  static void apply(size_t& seed, Tuple const& tuple)
  {
    HashValueImpl<Tuple, Index - 1>::apply(seed, tuple);
    hash_combine(seed, std::get<Index>(tuple));
  }
};

/** Recursion anchor: only element 0 is left to combine. */
template <class Tuple> class HashValueImpl<Tuple, 0> {
public:
  static void apply(size_t& seed, Tuple const& tuple) { hash_combine(seed, std::get<0>(tuple)); }
};

/** Hasher for non-empty tuples: starts from a zero seed and combines every element in order. */
template <typename... TT> class hash<std::tuple<TT...>> {
public:
  size_t operator()(std::tuple<TT...> const& tt) const
  {
    size_t seed = 0;
    HashValueImpl<std::tuple<TT...>>::apply(seed, tt);
    return seed;
  }
};

/**
 * Hasher for the empty tuple. Without this specialization the default Index (tuple_size - 1) would wrap around
 * and the recursion above could not be instantiated. The result is the untouched zero seed.
 */
template <> class hash<std::tuple<>> {
public:
  size_t operator()(std::tuple<> const&) const { return 0; }
};
} // namespace hash_tuple
64 typedef std::tuple</*sender*/ int, /* reciever */ int, /* tag */int> req_key_t;
65 typedef std::unordered_map<req_key_t, MPI_Request, hash_tuple::hash<std::tuple<int,int,int>>> req_storage_t;
67 void log_timed_action(simgrid::xbt::ReplayAction& action, double clock)
69 if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
70 std::string s = boost::algorithm::join(action, " ");
71 XBT_VERB("%s %f", s.c_str(), smpi_process()->simulated_elapsed() - clock);
76 static double parse_double(const std::string& string)
78 return xbt_str_parse_double(string.c_str(), "%s is not a double");
// Datatype assigned by InitAction::kernel(): MPI_DOUBLE when the init action has more than two fields,
// MPI_BYTE otherwise.
85 MPI_Datatype MPI_DEFAULT_TYPE;
// Container of MPI requests stored in `store` under (src, dst, tag) keys (req_key_t).
// One instance per actor pid is kept in the file-level `storage` map.
87 class RequestStorage {
// Accessor returning a req_storage_t reference (callers clear and iterate the map through it).
98 req_storage_t& get_store()
// Appends to vec every stored non-null request whose src() or dst() equals the calling actor's pid,
// and calls print_request("MM") on each appended request.
103 void get_requests(std::vector<MPI_Request>& vec)
105 for (auto& pair : store) {
106 auto& req = pair.second;
107 auto my_proc_id = simgrid::s4u::this_actor::get_pid();
108 if (req != MPI_REQUEST_NULL && (req->src() == my_proc_id || req->dst() == my_proc_id)) {
109 vec.push_back(pair.second);
110 pair.second->print_request("MM");
// Returns the request stored under (src, dst, tag), or MPI_REQUEST_NULL when there is no such entry.
// NOTE(review): find() uses its arguments as the key unchanged while add()/remove() build the key from
// src()-1 and dst()-1 -- presumably callers pass values that already match that offset; confirm.
115 MPI_Request find(int src, int dst, int tag)
117 req_storage_t::iterator it = store.find(req_key_t(src, dst, tag));
118 return (it == store.end()) ? MPI_REQUEST_NULL : it->second;
// Erases the entry keyed by (req->src()-1, req->dst()-1, req->tag()); does nothing for MPI_REQUEST_NULL.
121 void remove(MPI_Request req)
123 if (req == MPI_REQUEST_NULL) return;
125 store.erase(req_key_t(req->src()-1, req->dst()-1, req->tag()));
// Inserts req under (req->src()-1, req->dst()-1, req->tag()); MPI_REQUEST_NULL is silently skipped.
128 void add(MPI_Request req)
130 if (req != MPI_REQUEST_NULL) // Can and does happen in the case of TestAction
131 store.insert({req_key_t(req->src()-1, req->dst()-1, req->tag()), req});
134 /* Sometimes we need to re-insert MPI_REQUEST_NULL but we still need src,dst and tag */
// src and dst are looked up as members of the MPI_COMM_WORLD group; the key uses each actor's pid minus one.
135 void addNullRequest(int src, int dst, int tag)
137 store.insert({req_key_t(
138 MPI_COMM_WORLD->group()->actor(src)->get_pid()-1,
139 MPI_COMM_WORLD->group()->actor(dst)->get_pid()-1,
140 tag), MPI_REQUEST_NULL});
144 void WaitTestParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
146 CHECK_ACTION_PARAMS(action, 3, 0)
147 src = std::stoi(action[2]);
148 dst = std::stoi(action[3]);
149 tag = std::stoi(action[4]);
152 void SendRecvParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
154 CHECK_ACTION_PARAMS(action, 3, 1)
155 partner = std::stoi(action[2]);
156 tag = std::stoi(action[3]);
157 size = parse_double(action[4]);
158 if (action.size() > 5)
159 datatype1 = simgrid::smpi::Datatype::decode(action[5]);
162 void ComputeParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
164 CHECK_ACTION_PARAMS(action, 1, 0)
165 flops = parse_double(action[2]);
168 void LocationParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
170 CHECK_ACTION_PARAMS(action, 2, 0)
171 filename = std::string(action[2]);
172 line = std::stoi(action[3]);
175 void BcastArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
177 CHECK_ACTION_PARAMS(action, 1, 2)
178 size = parse_double(action[2]);
179 root = (action.size() > 3) ? std::stoi(action[3]) : 0;
180 if (action.size() > 4)
181 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
184 void ReduceArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
186 CHECK_ACTION_PARAMS(action, 2, 2)
187 comm_size = parse_double(action[2]);
188 comp_size = parse_double(action[3]);
189 root = (action.size() > 4) ? std::stoi(action[4]) : 0;
190 if (action.size() > 5)
191 datatype1 = simgrid::smpi::Datatype::decode(action[5]);
194 void AllReduceArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
196 CHECK_ACTION_PARAMS(action, 2, 1)
197 comm_size = parse_double(action[2]);
198 comp_size = parse_double(action[3]);
199 if (action.size() > 4)
200 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
203 void AllToAllArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
205 CHECK_ACTION_PARAMS(action, 2, 1)
206 comm_size = MPI_COMM_WORLD->size();
207 send_size = parse_double(action[2]);
208 recv_size = parse_double(action[3]);
210 if (action.size() > 4)
211 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
212 if (action.size() > 5)
213 datatype2 = simgrid::smpi::Datatype::decode(action[5]);
216 void GatherArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string& name)
218 /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
221 1) 68 is the sendcounts
222 2) 68 is the recvcounts
223 3) 0 is the root node
224 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
225 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
227 CHECK_ACTION_PARAMS(action, 2, 3)
228 comm_size = MPI_COMM_WORLD->size();
229 send_size = parse_double(action[2]);
230 recv_size = parse_double(action[3]);
232 if (name == "gather") {
233 root = (action.size() > 4) ? std::stoi(action[4]) : 0;
234 if (action.size() > 5)
235 datatype1 = simgrid::smpi::Datatype::decode(action[5]);
236 if (action.size() > 6)
237 datatype2 = simgrid::smpi::Datatype::decode(action[6]);
239 if (action.size() > 4)
240 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
241 if (action.size() > 5)
242 datatype2 = simgrid::smpi::Datatype::decode(action[5]);
// Parses a gatherv-style action into send_size, recvcounts, disps, the datatypes and (for "gatherv") root.
// Fields read below: action[2] is the send count and action[3 .. 2+comm_size] are the recvcounts.
// For name == "gatherv": root is at 3+comm_size (default 0), datatypes at 4+comm_size and 5+comm_size.
// For other names: optional datatype and displacement fields are located after the recvcounts, depending
// on how many fields the action has. recv_size_sum ends up as the sum of all recvcounts.
// NOTE(review): in the non-"gatherv" branch datatype1 and datatype2 are both decoded from the same field;
// confirm this is intended.
246 void GatherVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string& name)
248 /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
249 0 gather 68 68 10 10 10 0 0 0
251 1) 68 is the sendcount
252 2) 68 10 10 10 is the recvcounts
253 3) 0 is the root node
254 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
255 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
257 comm_size = MPI_COMM_WORLD->size();
258 CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
259 send_size = parse_double(action[2]);
260 disps = std::vector<int>(comm_size, 0);
261 recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
263 if (name == "gatherv") {
264 root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
265 if (action.size() > 4 + comm_size)
266 datatype1 = simgrid::smpi::Datatype::decode(action[4 + comm_size]);
267 if (action.size() > 5 + comm_size)
268 datatype2 = simgrid::smpi::Datatype::decode(action[5 + comm_size]);
270 int datatype_index = 0;
272 /* The 3 comes from "0 gather <sendcount>", which must always be present.
273 * The + comm_size is the recvcounts array, which must also be present
275 if (action.size() > 3 + comm_size + comm_size) { /* datatype + disp are specified */
276 datatype_index = 3 + comm_size;
277 disp_index = datatype_index + 1;
278 datatype1 = simgrid::smpi::Datatype::decode(action[datatype_index]);
279 datatype2 = simgrid::smpi::Datatype::decode(action[datatype_index]);
280 } else if (action.size() >
281 3 + comm_size + 2) { /* disps specified; datatype is not specified; use the default one */
282 disp_index = 3 + comm_size;
283 } else if (action.size() > 3 + comm_size) { /* only datatype, no disp specified */
284 datatype_index = 3 + comm_size;
285 datatype1 = simgrid::smpi::Datatype::decode(action[datatype_index]);
286 datatype2 = simgrid::smpi::Datatype::decode(action[datatype_index]);
// A disp_index of 0 means that no displacement fields were found; disps then keeps its zero fill.
289 if (disp_index != 0) {
290 for (unsigned int i = 0; i < comm_size; i++)
291 disps[i] = std::stoi(action[disp_index + i]);
// The recvcounts always start at field 3, right after the send count.
295 for (unsigned int i = 0; i < comm_size; i++) {
296 (*recvcounts)[i] = std::stoi(action[i + 3]);
298 recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
301 void ScatterArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
303 /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
306 1) 68 is the sendcounts
307 2) 68 is the recvcounts
308 3) 0 is the root node
309 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
310 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
312 CHECK_ACTION_PARAMS(action, 2, 3)
313 comm_size = MPI_COMM_WORLD->size();
314 send_size = parse_double(action[2]);
315 recv_size = parse_double(action[3]);
316 root = (action.size() > 4) ? std::stoi(action[4]) : 0;
317 if (action.size() > 5)
318 datatype1 = simgrid::smpi::Datatype::decode(action[5]);
319 if (action.size() > 6)
320 datatype2 = simgrid::smpi::Datatype::decode(action[6]);
323 void ScatterVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
325 /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
326 0 gather 68 10 10 10 68 0 0 0
328 1) 68 10 10 10 is the sendcounts
329 2) 68 is the recvcount
330 3) 0 is the root node
331 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
332 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
334 CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
335 recv_size = parse_double(action[2 + comm_size]);
336 disps = std::vector<int>(comm_size, 0);
337 sendcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
339 if (action.size() > 5 + comm_size)
340 datatype1 = simgrid::smpi::Datatype::decode(action[4 + comm_size]);
341 if (action.size() > 5 + comm_size)
342 datatype2 = simgrid::smpi::Datatype::decode(action[5]);
344 for (unsigned int i = 0; i < comm_size; i++) {
345 (*sendcounts)[i] = std::stoi(action[i + 2]);
347 send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
348 root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
351 void ReduceScatterArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
353 /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
354 0 reducescatter 275427 275427 275427 204020 11346849 0
356 1) The first four values after the name of the action declare the recvcounts array
357 2) The value 11346849 is the amount of instructions
358 3) The last value corresponds to the datatype, see simgrid::smpi::Datatype::decode().
360 comm_size = MPI_COMM_WORLD->size();
361 CHECK_ACTION_PARAMS(action, comm_size + 1, 1)
362 comp_size = parse_double(action[2 + comm_size]);
363 recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
364 if (action.size() > 3 + comm_size)
365 datatype1 = simgrid::smpi::Datatype::decode(action[3 + comm_size]);
367 for (unsigned int i = 0; i < comm_size; i++) {
368 recvcounts->push_back(std::stoi(action[i + 2]));
370 recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
373 void AllToAllVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
375 /* The structure of the alltoallv action for the rank 0 (total 4 processes) is the following:
376 0 alltoallv 100 1 7 10 12 100 1 70 10 5
378 1) 100 is the size of the send buffer *sizeof(int),
379 2) 1 7 10 12 is the sendcounts array
380 3) 100*sizeof(int) is the size of the receiver buffer
381 4) 1 70 10 5 is the recvcounts array
383 comm_size = MPI_COMM_WORLD->size();
384 CHECK_ACTION_PARAMS(action, 2 * comm_size + 2, 2)
385 sendcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
386 recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
387 senddisps = std::vector<int>(comm_size, 0);
388 recvdisps = std::vector<int>(comm_size, 0);
390 if (action.size() > 5 + 2 * comm_size)
391 datatype1 = simgrid::smpi::Datatype::decode(action[4 + 2 * comm_size]);
392 if (action.size() > 5 + 2 * comm_size)
393 datatype2 = simgrid::smpi::Datatype::decode(action[5 + 2 * comm_size]);
395 send_buf_size = parse_double(action[2]);
396 recv_buf_size = parse_double(action[3 + comm_size]);
397 for (unsigned int i = 0; i < comm_size; i++) {
398 (*sendcounts)[i] = std::stoi(action[3 + i]);
399 (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
401 send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
402 recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
// Replays a wait action: asserts that req_storage is not empty, looks up the request stored under
// (args.src, args.dst, args.tag), removes it from the storage and calls Request::wait() on it between
// TRACE_smpi_comm_in and TRACE_smpi_comm_out. TRACE_smpi_recv is emitted afterwards when the request
// carried the MPI_REQ_RECV flag.
405 void WaitAction::kernel(simgrid::xbt::ReplayAction& action)
// The joined action text is only used in the assertion message below.
407 std::string s = boost::algorithm::join(action, " ");
408 xbt_assert(req_storage.size(), "action wait not preceded by any irecv or isend: %s", s.c_str());
409 MPI_Request request = req_storage.find(args.src, args.dst, args.tag);
410 req_storage.remove(request);
412 if (request == MPI_REQUEST_NULL) {
413 /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just
418 int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
420 // Must be taken before Request::wait() since the request may be set to
421 // MPI_REQUEST_NULL by Request::wait!
422 bool is_wait_for_receive = (request->flags() & MPI_REQ_RECV);
423 // TODO: Here we take the rank while we normally take the process id (look for my_proc_id)
424 TRACE_smpi_comm_in(rank, __func__, new simgrid::instr::WaitTIData(args.src, args.dst, args.tag));
427 Request::wait(&request, &status);
429 TRACE_smpi_comm_out(rank);
430 if (is_wait_for_receive)
431 TRACE_smpi_recv(args.src, args.dst, args.tag);
434 void SendAction::kernel(simgrid::xbt::ReplayAction&)
436 int dst_traced = MPI_COMM_WORLD->group()->actor(args.partner)->get_pid();
438 TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
439 args.tag, Datatype::encode(args.datatype1)));
440 if (not TRACE_smpi_view_internals())
441 TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, args.tag, args.size * args.datatype1->size());
443 if (name == "send") {
444 Request::send(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
445 } else if (name == "isend") {
446 MPI_Request request = Request::isend(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
447 req_storage.add(request);
449 xbt_die("Don't know this action, %s", name.c_str());
452 TRACE_smpi_comm_out(my_proc_id);
// Replays a "recv" or "irecv" action from args.partner. When args.size is not positive, the size is
// first obtained with Request::probe (status.count). "recv" calls the blocking Request::recv; "irecv"
// calls Request::irecv and stores the resulting request in req_storage. After TRACE_smpi_comm_out,
// TRACE_smpi_recv is emitted when is_recv is set and TRACE_smpi_view_internals() is false.
455 void RecvAction::kernel(simgrid::xbt::ReplayAction&)
457 TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
458 args.tag, Datatype::encode(args.datatype1)));
461 // unknown size from the receiver point of view
462 if (args.size <= 0.0) {
463 Request::probe(args.partner, args.tag, MPI_COMM_WORLD, &status);
464 args.size = status.count;
467 bool is_recv = false; // Help analyzers understand that status is not used uninitialized
468 if (name == "recv") {
470 Request::recv(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD, &status);
471 } else if (name == "irecv") {
472 MPI_Request request = Request::irecv(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
473 req_storage.add(request);
478 TRACE_smpi_comm_out(my_proc_id);
479 if (is_recv && not TRACE_smpi_view_internals()) {
// The traced sender is the pid of the actor whose rank is reported in status.MPI_SOURCE.
480 int src_traced = MPI_COMM_WORLD->group()->actor(status.MPI_SOURCE)->get_pid();
481 TRACE_smpi_recv(src_traced, my_proc_id, args.tag);
485 void ComputeAction::kernel(simgrid::xbt::ReplayAction&)
487 smpi_execute_flops(args.flops/smpi_adjust_comp_speed());
490 void LocationAction::kernel(simgrid::xbt::ReplayAction&)
492 smpi_trace_set_call_location(args.filename.c_str(), args.line);
// Replays a test action: takes the request stored under (args.src, args.dst, args.tag) out of
// req_storage, calls Request::test() on it, then puts an entry back in the storage -- a null entry
// through addNullRequest() when the request became MPI_REQUEST_NULL, the request itself otherwise.
// Nothing is done when no request is found.
495 void TestAction::kernel(simgrid::xbt::ReplayAction&)
497 MPI_Request request = req_storage.find(args.src, args.dst, args.tag);
498 req_storage.remove(request);
499 // if request is null here, this may mean that a previous test has succeeded
500 // Different times in traced application and replayed version may lead to this
501 // In this case, ignore the extra calls.
502 if (request != MPI_REQUEST_NULL) {
503 TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::NoOpTIData("test"));
507 Request::test(&request, &status, &flag);
509 XBT_DEBUG("MPI_Test result: %d", flag);
510 /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now
512 if (request == MPI_REQUEST_NULL)
513 req_storage.addNullRequest(args.src, args.dst, args.tag);
515 req_storage.add(request);
517 TRACE_smpi_comm_out(my_proc_id);
521 void InitAction::kernel(simgrid::xbt::ReplayAction& action)
523 CHECK_ACTION_PARAMS(action, 0, 1)
524 MPI_DEFAULT_TYPE = (action.size() > 2) ? MPI_DOUBLE // default MPE datatype
525 : MPI_BYTE; // default TAU datatype
527 /* start a simulated timer */
528 smpi_process()->simulated_start();
// Kernel of CommunicatorAction, the handler registered for the "comm_size", "comm_split" and
// "comm_dup" trace actions in smpi_replay_init().
531 void CommunicatorAction::kernel(simgrid::xbt::ReplayAction&)
536 void WaitAllAction::kernel(simgrid::xbt::ReplayAction&)
538 const unsigned int count_requests = req_storage.size();
540 if (count_requests > 0) {
541 TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData("waitall", -1, count_requests, ""));
542 std::vector<std::pair</*sender*/int,/*recv*/int>> sender_receiver;
543 std::vector<MPI_Request> reqs;
544 req_storage.get_requests(reqs);
545 for (const auto& req : reqs) {
546 if (req && (req->flags() & MPI_REQ_RECV)) {
547 sender_receiver.push_back({req->src(), req->dst()});
550 MPI_Status status[count_requests];
551 Request::waitall(count_requests, &(reqs.data())[0], status);
552 req_storage.get_store().clear();
554 for (auto& pair : sender_receiver) {
555 TRACE_smpi_recv(pair.first, pair.second, 0);
557 TRACE_smpi_comm_out(my_proc_id);
561 void BarrierAction::kernel(simgrid::xbt::ReplayAction&)
563 TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::NoOpTIData("barrier"));
564 Colls::barrier(MPI_COMM_WORLD);
565 TRACE_smpi_comm_out(my_proc_id);
568 void BcastAction::kernel(simgrid::xbt::ReplayAction&)
570 TRACE_smpi_comm_in(my_proc_id, "action_bcast",
571 new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(args.root)->get_pid(),
572 -1.0, args.size, -1, Datatype::encode(args.datatype1), ""));
574 Colls::bcast(send_buffer(args.size * args.datatype1->size()), args.size, args.datatype1, args.root, MPI_COMM_WORLD);
576 TRACE_smpi_comm_out(my_proc_id);
579 void ReduceAction::kernel(simgrid::xbt::ReplayAction&)
581 TRACE_smpi_comm_in(my_proc_id, "action_reduce",
582 new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(args.root)->get_pid(),
583 args.comp_size, args.comm_size, -1,
584 Datatype::encode(args.datatype1), ""));
586 Colls::reduce(send_buffer(args.comm_size * args.datatype1->size()),
587 recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL, args.root, MPI_COMM_WORLD);
588 private_execute_flops(args.comp_size);
590 TRACE_smpi_comm_out(my_proc_id);
593 void AllReduceAction::kernel(simgrid::xbt::ReplayAction&)
595 TRACE_smpi_comm_in(my_proc_id, "action_allreduce", new simgrid::instr::CollTIData("allreduce", -1, args.comp_size, args.comm_size, -1,
596 Datatype::encode(args.datatype1), ""));
598 Colls::allreduce(send_buffer(args.comm_size * args.datatype1->size()),
599 recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
600 private_execute_flops(args.comp_size);
602 TRACE_smpi_comm_out(my_proc_id);
605 void AllToAllAction::kernel(simgrid::xbt::ReplayAction&)
607 TRACE_smpi_comm_in(my_proc_id, "action_alltoall",
608 new simgrid::instr::CollTIData("alltoall", -1, -1.0, args.send_size, args.recv_size,
609 Datatype::encode(args.datatype1),
610 Datatype::encode(args.datatype2)));
612 Colls::alltoall(send_buffer(args.send_size * args.comm_size * args.datatype1->size()), args.send_size,
613 args.datatype1, recv_buffer(args.recv_size * args.comm_size * args.datatype2->size()),
614 args.recv_size, args.datatype2, MPI_COMM_WORLD);
616 TRACE_smpi_comm_out(my_proc_id);
619 void GatherAction::kernel(simgrid::xbt::ReplayAction&)
621 TRACE_smpi_comm_in(my_proc_id, name.c_str(), new simgrid::instr::CollTIData(name, (name == "gather") ? args.root : -1, -1.0, args.send_size, args.recv_size,
622 Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
624 if (name == "gather") {
625 int rank = MPI_COMM_WORLD->rank();
626 Colls::gather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
627 (rank == args.root) ? recv_buffer(args.recv_size * args.comm_size * args.datatype2->size()) : nullptr, args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
630 Colls::allgather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
631 recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, MPI_COMM_WORLD);
633 TRACE_smpi_comm_out(my_proc_id);
636 void GatherVAction::kernel(simgrid::xbt::ReplayAction&)
638 int rank = MPI_COMM_WORLD->rank();
640 TRACE_smpi_comm_in(my_proc_id, name.c_str(), new simgrid::instr::VarCollTIData(
641 name, (name == "gatherv") ? args.root : -1, args.send_size, nullptr, -1, args.recvcounts,
642 Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
644 if (name == "gatherv") {
645 Colls::gatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
646 (rank == args.root) ? recv_buffer(args.recv_size_sum * args.datatype2->size()) : nullptr,
647 args.recvcounts->data(), args.disps.data(), args.datatype2, args.root, MPI_COMM_WORLD);
650 Colls::allgatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
651 recv_buffer(args.recv_size_sum * args.datatype2->size()), args.recvcounts->data(),
652 args.disps.data(), args.datatype2, MPI_COMM_WORLD);
655 TRACE_smpi_comm_out(my_proc_id);
658 void ScatterAction::kernel(simgrid::xbt::ReplayAction&)
660 int rank = MPI_COMM_WORLD->rank();
661 TRACE_smpi_comm_in(my_proc_id, "action_scatter", new simgrid::instr::CollTIData(name, args.root, -1.0, args.send_size, args.recv_size,
662 Datatype::encode(args.datatype1),
663 Datatype::encode(args.datatype2)));
665 Colls::scatter(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
666 (rank == args.root) ? recv_buffer(args.recv_size * args.datatype2->size()) : nullptr, args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
668 TRACE_smpi_comm_out(my_proc_id);
// Replays a scatterv rooted at args.root: only the root rank requests a send buffer (sized from
// args.send_size_sum), every rank requests a receive buffer sized from args.recv_size. The call is
// bracketed by TRACE_smpi_comm_in / TRACE_smpi_comm_out.
671 void ScatterVAction::kernel(simgrid::xbt::ReplayAction&)
673 int rank = MPI_COMM_WORLD->rank();
674 TRACE_smpi_comm_in(my_proc_id, "action_scatterv", new simgrid::instr::VarCollTIData(name, args.root, -1, args.sendcounts, args.recv_size,
675 nullptr, Datatype::encode(args.datatype1),
676 Datatype::encode(args.datatype2)));
678 Colls::scatterv((rank == args.root) ? send_buffer(args.send_size_sum * args.datatype1->size()) : nullptr,
679 args.sendcounts->data(), args.disps.data(), args.datatype1,
680 recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, args.root,
683 TRACE_smpi_comm_out(my_proc_id);
686 void ReduceScatterAction::kernel(simgrid::xbt::ReplayAction&)
688 TRACE_smpi_comm_in(my_proc_id, "action_reducescatter",
689 new simgrid::instr::VarCollTIData("reducescatter", -1, 0, nullptr, -1, args.recvcounts,
690 std::to_string(args.comp_size), /* ugly hack to print comp_size */
691 Datatype::encode(args.datatype1)));
693 Colls::reduce_scatter(send_buffer(args.recv_size_sum * args.datatype1->size()),
694 recv_buffer(args.recv_size_sum * args.datatype1->size()), args.recvcounts->data(),
695 args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
697 private_execute_flops(args.comp_size);
698 TRACE_smpi_comm_out(my_proc_id);
701 void AllToAllVAction::kernel(simgrid::xbt::ReplayAction&)
703 TRACE_smpi_comm_in(my_proc_id, __func__,
704 new simgrid::instr::VarCollTIData(
705 "alltoallv", -1, args.send_size_sum, args.sendcounts, args.recv_size_sum, args.recvcounts,
706 Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
708 Colls::alltoallv(send_buffer(args.send_buf_size * args.datatype1->size()), args.sendcounts->data(), args.senddisps.data(), args.datatype1,
709 recv_buffer(args.recv_buf_size * args.datatype2->size()), args.recvcounts->data(), args.recvdisps.data(), args.datatype2, MPI_COMM_WORLD);
711 TRACE_smpi_comm_out(my_proc_id);
713 } // Replay Namespace
714 }} // namespace simgrid::smpi
// Request storages of the replayed actors, one RequestStorage per actor id. The action handlers
// registered in smpi_replay_init() index it with simgrid::s4u::this_actor::get_pid().
716 static std::unordered_map<aid_t, simgrid::smpi::replay::RequestStorage> storage;
717 /** @brief Only initialize the replay, don't do it for real */
718 void smpi_replay_init(const char* instance_id, int rank, double start_delay_flops)
720 xbt_assert(not smpi_process()->initializing());
722 simgrid::s4u::Actor::self()->set_property("instance_id", instance_id);
723 simgrid::s4u::Actor::self()->set_property("rank", std::to_string(rank));
724 simgrid::smpi::ActorExt::init();
726 smpi_process()->mark_as_initialized();
727 smpi_process()->set_replaying(true);
729 int my_proc_id = simgrid::s4u::this_actor::get_pid();
731 TRACE_smpi_init(my_proc_id);
732 TRACE_smpi_computing_init(my_proc_id);
733 TRACE_smpi_comm_in(my_proc_id, "smpi_replay_run_init", new simgrid::instr::NoOpTIData("init"));
734 TRACE_smpi_comm_out(my_proc_id);
735 xbt_replay_action_register("init", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::InitAction().execute(action); });
736 xbt_replay_action_register("finalize", [](simgrid::xbt::ReplayAction&) { /* nothing to do */ });
737 xbt_replay_action_register("comm_size", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
738 xbt_replay_action_register("comm_split",[](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
739 xbt_replay_action_register("comm_dup", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
740 xbt_replay_action_register("send", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("send", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
741 xbt_replay_action_register("isend", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("isend", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
742 xbt_replay_action_register("recv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("recv", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
743 xbt_replay_action_register("irecv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("irecv", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
744 xbt_replay_action_register("test", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::TestAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
745 xbt_replay_action_register("wait", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
746 xbt_replay_action_register("waitall", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAllAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
747 xbt_replay_action_register("barrier", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BarrierAction().execute(action); });
748 xbt_replay_action_register("bcast", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BcastAction().execute(action); });
749 xbt_replay_action_register("reduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceAction().execute(action); });
750 xbt_replay_action_register("allreduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllReduceAction().execute(action); });
751 xbt_replay_action_register("alltoall", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllAction().execute(action); });
752 xbt_replay_action_register("alltoallv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllVAction().execute(action); });
753 xbt_replay_action_register("gather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("gather").execute(action); });
754 xbt_replay_action_register("scatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterAction().execute(action); });
755 xbt_replay_action_register("gatherv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("gatherv").execute(action); });
756 xbt_replay_action_register("scatterv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterVAction().execute(action); });
757 xbt_replay_action_register("allgather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("allgather").execute(action); });
758 xbt_replay_action_register("allgatherv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("allgatherv").execute(action); });
759 xbt_replay_action_register("reducescatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceScatterAction().execute(action); });
760 xbt_replay_action_register("compute", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ComputeAction().execute(action); });
761 xbt_replay_action_register("location", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::LocationAction().execute(action); });
763 //if we have a delayed start, sleep here.
764 if (start_delay_flops > 0) {
765 XBT_VERB("Delayed start for instance - Sleeping for %f flops ", start_delay_flops);
766 private_execute_flops(start_delay_flops);
768 // Wait for the other actors to initialize also
769 simgrid::s4u::this_actor::yield();
773 /** @brief actually run the replay after initialization */
// Installs a fresh RequestStorage for the calling actor, replays trace_filename through
// simgrid::xbt::replay_runner() using the rank rendered as a string, waits for the requests still
// held in the actor's storage, and finalizes the SMPI process between the finalize trace events.
// When active_processes is 0 the simulated time is logged and the replay buffers are freed.
774 void smpi_replay_main(int rank, const char* trace_filename)
776 static int active_processes = 0;
778 storage[simgrid::s4u::this_actor::get_pid()] = simgrid::smpi::replay::RequestStorage();
779 std::string rank_string = std::to_string(rank);
780 simgrid::xbt::replay_runner(rank_string.c_str(), trace_filename);
782 /* and now, finalize everything */
783 /* One active process will stop. Decrease the counter*/
784 unsigned int count_requests = storage[simgrid::s4u::this_actor::get_pid()].size();
// NOTE(review): "%ud" is the conversion "%u" followed by a literal 'd', so the count is logged with a
// trailing 'd' -- "%u" was probably intended; confirm before changing the message.
785 XBT_DEBUG("There are %ud elements in reqq[*]", count_requests);
786 if (count_requests > 0) {
// Both arrays are sized at run time from count_requests.
787 MPI_Request requests[count_requests];
788 MPI_Status status[count_requests];
// Copy every stored request (null entries included) into the array handed to waitall.
791 for (auto const& pair : storage[simgrid::s4u::this_actor::get_pid()].get_store()) {
792 requests[i] = pair.second;
795 simgrid::smpi::Request::waitall(count_requests, requests, status);
799 if(active_processes==0){
800 /* Last process alive speaking: end the simulated timer */
801 XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
802 smpi_free_replay_tmp_buffers();
805 TRACE_smpi_comm_in(simgrid::s4u::this_actor::get_pid(), "smpi_replay_run_finalize",
806 new simgrid::instr::NoOpTIData("finalize"));
808 smpi_process()->finalize();
810 TRACE_smpi_comm_out(simgrid::s4u::this_actor::get_pid());
813 /** @brief chain a replay initialization and a replay start */
814 void smpi_replay_run(const char* instance_id, int rank, double start_delay_flops, const char* trace_filename)
816 smpi_replay_init(instance_id, rank, start_delay_flops);
817 smpi_replay_main(rank, trace_filename);