1 /* Copyright (c) 2009-2019. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
#include "smpi_coll.hpp"
#include "smpi_comm.hpp"
#include "smpi_datatype.hpp"
#include "smpi_group.hpp"
#include "smpi_request.hpp"
#include "xbt/replay.hpp"
#include <simgrid/smpi/replay.hpp>
#include <src/smpi/include/private.hpp>

#include <functional>
#include <memory>
#include <numeric>
#include <string>
#include <tuple>
#include <unordered_map>
#include <vector>
22 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay, smpi, "Trace Replay with SMPI");
24 // From https://stackoverflow.com/questions/7110301/generic-hash-for-tuples-in-unordered-map-unordered-set
25 // This is all just to make std::unordered_map work with std::tuple. If we need this in other places,
26 // this could go into a header file.
27 namespace hash_tuple {
28 template <typename TT> class hash {
30 size_t operator()(TT const& tt) const { return std::hash<TT>()(tt); }
33 template <class T> inline void hash_combine(std::size_t& seed, T const& v)
35 seed ^= hash_tuple::hash<T>()(v) + 0x9e3779b9 + (seed << 6) + (seed >> 2);
38 // Recursive template code derived from Matthieu M.
39 template <class Tuple, size_t Index = std::tuple_size<Tuple>::value - 1> class HashValueImpl {
41 static void apply(size_t& seed, Tuple const& tuple)
43 HashValueImpl<Tuple, Index - 1>::apply(seed, tuple);
44 hash_combine(seed, std::get<Index>(tuple));
48 template <class Tuple> class HashValueImpl<Tuple, 0> {
50 static void apply(size_t& seed, Tuple const& tuple) { hash_combine(seed, std::get<0>(tuple)); }
53 template <typename... TT> class hash<std::tuple<TT...>> {
55 size_t operator()(std::tuple<TT...> const& tt) const
58 HashValueImpl<std::tuple<TT...>>::apply(seed, tt);
64 typedef std::tuple</*sender*/ int, /* receiver */ int, /* tag */ int> req_key_t;
65 typedef std::unordered_map<req_key_t, MPI_Request, hash_tuple::hash<std::tuple<int,int,int>>> req_storage_t;
67 void log_timed_action(simgrid::xbt::ReplayAction& action, double clock)
69 if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
70 std::string s = boost::algorithm::join(action, " ");
71 XBT_VERB("%s %f", s.c_str(), smpi_process()->simulated_elapsed() - clock);
76 static double parse_double(const std::string& string)
78 return xbt_str_parse_double(string.c_str(), "%s is not a double");
85 MPI_Datatype MPI_DEFAULT_TYPE;
87 class RequestStorage {
98 req_storage_t& get_store()
103 void get_requests(std::vector<MPI_Request>& vec)
105 for (auto& pair : store) {
106 auto& req = pair.second;
107 auto my_proc_id = simgrid::s4u::this_actor::get_pid();
108 if (req != MPI_REQUEST_NULL && (req->src() == my_proc_id || req->dst() == my_proc_id)) {
109 vec.push_back(pair.second);
110 pair.second->print_request("MM");
115 MPI_Request find(int src, int dst, int tag)
117 req_storage_t::iterator it = store.find(req_key_t(src, dst, tag));
118 return (it == store.end()) ? MPI_REQUEST_NULL : it->second;
121 void remove(MPI_Request req)
123 if (req == MPI_REQUEST_NULL) return;
125 store.erase(req_key_t(req->src()-1, req->dst()-1, req->tag()));
128 void add(MPI_Request req)
130 if (req != MPI_REQUEST_NULL) // Can and does happen in the case of TestAction
131 store.insert({req_key_t(req->src()-1, req->dst()-1, req->tag()), req});
134 /* Sometimes we need to re-insert MPI_REQUEST_NULL but we still need src,dst and tag */
135 void addNullRequest(int src, int dst, int tag)
137 store.insert({req_key_t(
138 MPI_COMM_WORLD->group()->actor(src)->get_pid()-1,
139 MPI_COMM_WORLD->group()->actor(dst)->get_pid()-1,
140 tag), MPI_REQUEST_NULL});
144 void WaitTestParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
146 CHECK_ACTION_PARAMS(action, 3, 0)
147 src = std::stoi(action[2]);
148 dst = std::stoi(action[3]);
149 tag = std::stoi(action[4]);
152 void SendRecvParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
154 CHECK_ACTION_PARAMS(action, 3, 1)
155 partner = std::stoi(action[2]);
156 tag = std::stoi(action[3]);
157 size = parse_double(action[4]);
158 if (action.size() > 5)
159 datatype1 = simgrid::smpi::Datatype::decode(action[5]);
162 void ComputeParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
164 CHECK_ACTION_PARAMS(action, 1, 0)
165 flops = parse_double(action[2]);
168 void SleepParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
170 CHECK_ACTION_PARAMS(action, 1, 0)
171 time = parse_double(action[2]);
174 void LocationParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
176 CHECK_ACTION_PARAMS(action, 2, 0)
177 filename = std::string(action[2]);
178 line = std::stoi(action[3]);
181 void BcastArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
183 CHECK_ACTION_PARAMS(action, 1, 2)
184 size = parse_double(action[2]);
185 root = (action.size() > 3) ? std::stoi(action[3]) : 0;
186 if (action.size() > 4)
187 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
190 void ReduceArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
192 CHECK_ACTION_PARAMS(action, 2, 2)
193 comm_size = parse_double(action[2]);
194 comp_size = parse_double(action[3]);
195 root = (action.size() > 4) ? std::stoi(action[4]) : 0;
196 if (action.size() > 5)
197 datatype1 = simgrid::smpi::Datatype::decode(action[5]);
200 void AllReduceArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
202 CHECK_ACTION_PARAMS(action, 2, 1)
203 comm_size = parse_double(action[2]);
204 comp_size = parse_double(action[3]);
205 if (action.size() > 4)
206 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
209 void AllToAllArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
211 CHECK_ACTION_PARAMS(action, 2, 1)
212 comm_size = MPI_COMM_WORLD->size();
213 send_size = parse_double(action[2]);
214 recv_size = parse_double(action[3]);
216 if (action.size() > 4)
217 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
218 if (action.size() > 5)
219 datatype2 = simgrid::smpi::Datatype::decode(action[5]);
222 void GatherArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string& name)
224 /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
227 1) 68 is the sendcounts
228 2) 68 is the recvcounts
229 3) 0 is the root node
230 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
231 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
233 CHECK_ACTION_PARAMS(action, 2, 3)
234 comm_size = MPI_COMM_WORLD->size();
235 send_size = parse_double(action[2]);
236 recv_size = parse_double(action[3]);
238 if (name == "gather") {
239 root = (action.size() > 4) ? std::stoi(action[4]) : 0;
240 if (action.size() > 5)
241 datatype1 = simgrid::smpi::Datatype::decode(action[5]);
242 if (action.size() > 6)
243 datatype2 = simgrid::smpi::Datatype::decode(action[6]);
245 if (action.size() > 4)
246 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
247 if (action.size() > 5)
248 datatype2 = simgrid::smpi::Datatype::decode(action[5]);
252 void GatherVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string& name)
254 /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
255 0 gather 68 68 10 10 10 0 0 0
257 1) 68 is the sendcount
258 2) 68 10 10 10 is the recvcounts
259 3) 0 is the root node
260 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
261 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
263 comm_size = MPI_COMM_WORLD->size();
264 CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
265 send_size = parse_double(action[2]);
266 disps = std::vector<int>(comm_size, 0);
267 recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
269 if (name == "gatherv") {
270 root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
271 if (action.size() > 4 + comm_size)
272 datatype1 = simgrid::smpi::Datatype::decode(action[4 + comm_size]);
273 if (action.size() > 5 + comm_size)
274 datatype2 = simgrid::smpi::Datatype::decode(action[5 + comm_size]);
277 /* The 3 comes from "0 gather <sendcount>", which must always be present.
278 * The + comm_size is the recvcounts array, which must also be present
280 if (action.size() > 3 + comm_size + comm_size) { /* datatype + disp are specified */
281 int datatype_index = 3 + comm_size;
282 disp_index = datatype_index + 1;
283 datatype1 = simgrid::smpi::Datatype::decode(action[datatype_index]);
284 datatype2 = simgrid::smpi::Datatype::decode(action[datatype_index]);
285 } else if (action.size() >
286 3 + comm_size + 2) { /* disps specified; datatype is not specified; use the default one */
287 disp_index = 3 + comm_size;
288 } else if (action.size() > 3 + comm_size) { /* only datatype, no disp specified */
289 int datatype_index = 3 + comm_size;
290 datatype1 = simgrid::smpi::Datatype::decode(action[datatype_index]);
291 datatype2 = simgrid::smpi::Datatype::decode(action[datatype_index]);
294 if (disp_index != 0) {
295 for (unsigned int i = 0; i < comm_size; i++)
296 disps[i] = std::stoi(action[disp_index + i]);
300 for (unsigned int i = 0; i < comm_size; i++) {
301 (*recvcounts)[i] = std::stoi(action[i + 3]);
303 recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
306 void ScatterArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
308 /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
311 1) 68 is the sendcounts
312 2) 68 is the recvcounts
313 3) 0 is the root node
314 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
315 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
317 CHECK_ACTION_PARAMS(action, 2, 3)
318 comm_size = MPI_COMM_WORLD->size();
319 send_size = parse_double(action[2]);
320 recv_size = parse_double(action[3]);
321 root = (action.size() > 4) ? std::stoi(action[4]) : 0;
322 if (action.size() > 5)
323 datatype1 = simgrid::smpi::Datatype::decode(action[5]);
324 if (action.size() > 6)
325 datatype2 = simgrid::smpi::Datatype::decode(action[6]);
328 void ScatterVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
330 /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
331 0 gather 68 10 10 10 68 0 0 0
333 1) 68 10 10 10 is the sendcounts
334 2) 68 is the recvcount
335 3) 0 is the root node
336 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
337 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
339 CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
340 recv_size = parse_double(action[2 + comm_size]);
341 disps = std::vector<int>(comm_size, 0);
342 sendcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
344 if (action.size() > 5 + comm_size)
345 datatype1 = simgrid::smpi::Datatype::decode(action[4 + comm_size]);
346 if (action.size() > 5 + comm_size)
347 datatype2 = simgrid::smpi::Datatype::decode(action[5]);
349 for (unsigned int i = 0; i < comm_size; i++) {
350 (*sendcounts)[i] = std::stoi(action[i + 2]);
352 send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
353 root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
356 void ReduceScatterArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
358 /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
359 0 reducescatter 275427 275427 275427 204020 11346849 0
361 1) The first four values after the name of the action declare the recvcounts array
362 2) The value 11346849 is the amount of instructions
363 3) The last value corresponds to the datatype, see simgrid::smpi::Datatype::decode().
365 comm_size = MPI_COMM_WORLD->size();
366 CHECK_ACTION_PARAMS(action, comm_size + 1, 1)
367 comp_size = parse_double(action[2 + comm_size]);
368 recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
369 if (action.size() > 3 + comm_size)
370 datatype1 = simgrid::smpi::Datatype::decode(action[3 + comm_size]);
372 for (unsigned int i = 0; i < comm_size; i++) {
373 recvcounts->push_back(std::stoi(action[i + 2]));
375 recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
378 void AllToAllVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
380 /* The structure of the alltoallv action for the rank 0 (total 4 processes) is the following:
381 0 alltoallv 100 1 7 10 12 100 1 70 10 5
383 1) 100 is the size of the send buffer *sizeof(int),
384 2) 1 7 10 12 is the sendcounts array
385 3) 100*sizeof(int) is the size of the receiver buffer
386 4) 1 70 10 5 is the recvcounts array
388 comm_size = MPI_COMM_WORLD->size();
389 CHECK_ACTION_PARAMS(action, 2 * comm_size + 2, 2)
390 sendcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
391 recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
392 senddisps = std::vector<int>(comm_size, 0);
393 recvdisps = std::vector<int>(comm_size, 0);
395 if (action.size() > 5 + 2 * comm_size)
396 datatype1 = simgrid::smpi::Datatype::decode(action[4 + 2 * comm_size]);
397 if (action.size() > 5 + 2 * comm_size)
398 datatype2 = simgrid::smpi::Datatype::decode(action[5 + 2 * comm_size]);
400 send_buf_size = parse_double(action[2]);
401 recv_buf_size = parse_double(action[3 + comm_size]);
402 for (unsigned int i = 0; i < comm_size; i++) {
403 (*sendcounts)[i] = std::stoi(action[3 + i]);
404 (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
406 send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
407 recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
410 void WaitAction::kernel(simgrid::xbt::ReplayAction& action)
412 std::string s = boost::algorithm::join(action, " ");
413 xbt_assert(req_storage.size(), "action wait not preceded by any irecv or isend: %s", s.c_str());
414 MPI_Request request = req_storage.find(args.src, args.dst, args.tag);
415 req_storage.remove(request);
417 if (request == MPI_REQUEST_NULL) {
418 /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just
423 int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
425 // Must be taken before Request::wait() since the request may be set to
426 // MPI_REQUEST_NULL by Request::wait!
427 bool is_wait_for_receive = (request->flags() & MPI_REQ_RECV);
428 // TODO: Here we take the rank while we normally take the process id (look for my_proc_id)
429 TRACE_smpi_comm_in(rank, __func__, new simgrid::instr::WaitTIData(args.src, args.dst, args.tag));
432 Request::wait(&request, &status);
434 TRACE_smpi_comm_out(rank);
435 if (is_wait_for_receive)
436 TRACE_smpi_recv(args.src, args.dst, args.tag);
439 void SendAction::kernel(simgrid::xbt::ReplayAction&)
441 int dst_traced = MPI_COMM_WORLD->group()->actor(args.partner)->get_pid();
443 TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
444 args.tag, Datatype::encode(args.datatype1)));
445 if (not TRACE_smpi_view_internals())
446 TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, args.tag, args.size * args.datatype1->size());
448 if (name == "send") {
449 Request::send(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
450 } else if (name == "isend") {
451 MPI_Request request = Request::isend(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
452 req_storage.add(request);
454 xbt_die("Don't know this action, %s", name.c_str());
457 TRACE_smpi_comm_out(my_proc_id);
460 void RecvAction::kernel(simgrid::xbt::ReplayAction&)
462 TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
463 args.tag, Datatype::encode(args.datatype1)));
466 // unknown size from the receiver point of view
467 if (args.size <= 0.0) {
468 Request::probe(args.partner, args.tag, MPI_COMM_WORLD, &status);
469 args.size = status.count;
472 bool is_recv = false; // Help analyzers understanding that status is not used unintialized
473 if (name == "recv") {
475 Request::recv(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD, &status);
476 } else if (name == "irecv") {
477 MPI_Request request = Request::irecv(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
478 req_storage.add(request);
483 TRACE_smpi_comm_out(my_proc_id);
484 if (is_recv && not TRACE_smpi_view_internals()) {
485 int src_traced = MPI_COMM_WORLD->group()->actor(status.MPI_SOURCE)->get_pid();
486 TRACE_smpi_recv(src_traced, my_proc_id, args.tag);
490 void ComputeAction::kernel(simgrid::xbt::ReplayAction&)
492 if (simgrid::config::get_value<bool>("smpi/simulate-computation")) {
493 smpi_execute_flops(args.flops/smpi_adjust_comp_speed());
497 void SleepAction::kernel(simgrid::xbt::ReplayAction&)
499 XBT_DEBUG("Sleep for: %lf secs", args.time);
500 int rank = simgrid::s4u::this_actor::get_pid();
501 TRACE_smpi_sleeping_in(rank, args.time);
502 simgrid::s4u::this_actor::sleep_for(args.time/smpi_adjust_comp_speed());
503 TRACE_smpi_sleeping_out(rank);
506 void LocationAction::kernel(simgrid::xbt::ReplayAction&)
508 smpi_trace_set_call_location(args.filename.c_str(), args.line);
511 void TestAction::kernel(simgrid::xbt::ReplayAction&)
513 MPI_Request request = req_storage.find(args.src, args.dst, args.tag);
514 req_storage.remove(request);
515 // if request is null here, this may mean that a previous test has succeeded
516 // Different times in traced application and replayed version may lead to this
517 // In this case, ignore the extra calls.
518 if (request != MPI_REQUEST_NULL) {
519 TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::NoOpTIData("test"));
523 Request::test(&request, &status, &flag);
525 XBT_DEBUG("MPI_Test result: %d", flag);
526 /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now
528 if (request == MPI_REQUEST_NULL)
529 req_storage.addNullRequest(args.src, args.dst, args.tag);
531 req_storage.add(request);
533 TRACE_smpi_comm_out(my_proc_id);
537 void InitAction::kernel(simgrid::xbt::ReplayAction& action)
539 CHECK_ACTION_PARAMS(action, 0, 1)
540 MPI_DEFAULT_TYPE = (action.size() > 2) ? MPI_DOUBLE // default MPE datatype
541 : MPI_BYTE; // default TAU datatype
543 /* start a simulated timer */
544 smpi_process()->simulated_start();
547 void CommunicatorAction::kernel(simgrid::xbt::ReplayAction&)
552 void WaitAllAction::kernel(simgrid::xbt::ReplayAction&)
554 const unsigned int count_requests = req_storage.size();
556 if (count_requests > 0) {
557 TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData("waitall", -1, count_requests, ""));
558 std::vector<std::pair</*sender*/int,/*recv*/int>> sender_receiver;
559 std::vector<MPI_Request> reqs;
560 req_storage.get_requests(reqs);
561 for (const auto& req : reqs) {
562 if (req && (req->flags() & MPI_REQ_RECV)) {
563 sender_receiver.push_back({req->src(), req->dst()});
566 MPI_Status status[count_requests];
567 Request::waitall(count_requests, &(reqs.data())[0], status);
568 req_storage.get_store().clear();
570 for (auto& pair : sender_receiver) {
571 TRACE_smpi_recv(pair.first, pair.second, 0);
573 TRACE_smpi_comm_out(my_proc_id);
577 void BarrierAction::kernel(simgrid::xbt::ReplayAction&)
579 TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::NoOpTIData("barrier"));
580 Colls::barrier(MPI_COMM_WORLD);
581 TRACE_smpi_comm_out(my_proc_id);
584 void BcastAction::kernel(simgrid::xbt::ReplayAction&)
586 TRACE_smpi_comm_in(my_proc_id, "action_bcast",
587 new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(args.root)->get_pid(),
588 -1.0, args.size, -1, Datatype::encode(args.datatype1), ""));
590 Colls::bcast(send_buffer(args.size * args.datatype1->size()), args.size, args.datatype1, args.root, MPI_COMM_WORLD);
592 TRACE_smpi_comm_out(my_proc_id);
595 void ReduceAction::kernel(simgrid::xbt::ReplayAction&)
597 TRACE_smpi_comm_in(my_proc_id, "action_reduce",
598 new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(args.root)->get_pid(),
599 args.comp_size, args.comm_size, -1,
600 Datatype::encode(args.datatype1), ""));
602 Colls::reduce(send_buffer(args.comm_size * args.datatype1->size()),
603 recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL, args.root, MPI_COMM_WORLD);
604 private_execute_flops(args.comp_size);
606 TRACE_smpi_comm_out(my_proc_id);
609 void AllReduceAction::kernel(simgrid::xbt::ReplayAction&)
611 TRACE_smpi_comm_in(my_proc_id, "action_allreduce", new simgrid::instr::CollTIData("allreduce", -1, args.comp_size, args.comm_size, -1,
612 Datatype::encode(args.datatype1), ""));
614 Colls::allreduce(send_buffer(args.comm_size * args.datatype1->size()),
615 recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
616 private_execute_flops(args.comp_size);
618 TRACE_smpi_comm_out(my_proc_id);
621 void AllToAllAction::kernel(simgrid::xbt::ReplayAction&)
623 TRACE_smpi_comm_in(my_proc_id, "action_alltoall",
624 new simgrid::instr::CollTIData("alltoall", -1, -1.0, args.send_size, args.recv_size,
625 Datatype::encode(args.datatype1),
626 Datatype::encode(args.datatype2)));
628 Colls::alltoall(send_buffer(args.send_size * args.comm_size * args.datatype1->size()), args.send_size,
629 args.datatype1, recv_buffer(args.recv_size * args.comm_size * args.datatype2->size()),
630 args.recv_size, args.datatype2, MPI_COMM_WORLD);
632 TRACE_smpi_comm_out(my_proc_id);
635 void GatherAction::kernel(simgrid::xbt::ReplayAction&)
637 TRACE_smpi_comm_in(my_proc_id, name.c_str(), new simgrid::instr::CollTIData(name, (name == "gather") ? args.root : -1, -1.0, args.send_size, args.recv_size,
638 Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
640 if (name == "gather") {
641 int rank = MPI_COMM_WORLD->rank();
642 Colls::gather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
643 (rank == args.root) ? recv_buffer(args.recv_size * args.comm_size * args.datatype2->size()) : nullptr, args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
646 Colls::allgather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
647 recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, MPI_COMM_WORLD);
649 TRACE_smpi_comm_out(my_proc_id);
652 void GatherVAction::kernel(simgrid::xbt::ReplayAction&)
654 int rank = MPI_COMM_WORLD->rank();
656 TRACE_smpi_comm_in(my_proc_id, name.c_str(), new simgrid::instr::VarCollTIData(
657 name, (name == "gatherv") ? args.root : -1, args.send_size, nullptr, -1, args.recvcounts,
658 Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
660 if (name == "gatherv") {
661 Colls::gatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
662 (rank == args.root) ? recv_buffer(args.recv_size_sum * args.datatype2->size()) : nullptr,
663 args.recvcounts->data(), args.disps.data(), args.datatype2, args.root, MPI_COMM_WORLD);
666 Colls::allgatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
667 recv_buffer(args.recv_size_sum * args.datatype2->size()), args.recvcounts->data(),
668 args.disps.data(), args.datatype2, MPI_COMM_WORLD);
671 TRACE_smpi_comm_out(my_proc_id);
674 void ScatterAction::kernel(simgrid::xbt::ReplayAction&)
676 int rank = MPI_COMM_WORLD->rank();
677 TRACE_smpi_comm_in(my_proc_id, "action_scatter", new simgrid::instr::CollTIData(name, args.root, -1.0, args.send_size, args.recv_size,
678 Datatype::encode(args.datatype1),
679 Datatype::encode(args.datatype2)));
681 Colls::scatter(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
682 (rank == args.root) ? recv_buffer(args.recv_size * args.datatype2->size()) : nullptr, args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
684 TRACE_smpi_comm_out(my_proc_id);
687 void ScatterVAction::kernel(simgrid::xbt::ReplayAction&)
689 int rank = MPI_COMM_WORLD->rank();
690 TRACE_smpi_comm_in(my_proc_id, "action_scatterv", new simgrid::instr::VarCollTIData(name, args.root, -1, args.sendcounts, args.recv_size,
691 nullptr, Datatype::encode(args.datatype1),
692 Datatype::encode(args.datatype2)));
694 Colls::scatterv((rank == args.root) ? send_buffer(args.send_size_sum * args.datatype1->size()) : nullptr,
695 args.sendcounts->data(), args.disps.data(), args.datatype1,
696 recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, args.root,
699 TRACE_smpi_comm_out(my_proc_id);
702 void ReduceScatterAction::kernel(simgrid::xbt::ReplayAction&)
704 TRACE_smpi_comm_in(my_proc_id, "action_reducescatter",
705 new simgrid::instr::VarCollTIData("reducescatter", -1, 0, nullptr, -1, args.recvcounts,
706 std::to_string(args.comp_size), /* ugly hack to print comp_size */
707 Datatype::encode(args.datatype1)));
709 Colls::reduce_scatter(send_buffer(args.recv_size_sum * args.datatype1->size()),
710 recv_buffer(args.recv_size_sum * args.datatype1->size()), args.recvcounts->data(),
711 args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
713 private_execute_flops(args.comp_size);
714 TRACE_smpi_comm_out(my_proc_id);
717 void AllToAllVAction::kernel(simgrid::xbt::ReplayAction&)
719 TRACE_smpi_comm_in(my_proc_id, __func__,
720 new simgrid::instr::VarCollTIData(
721 "alltoallv", -1, args.send_size_sum, args.sendcounts, args.recv_size_sum, args.recvcounts,
722 Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
724 Colls::alltoallv(send_buffer(args.send_buf_size * args.datatype1->size()), args.sendcounts->data(), args.senddisps.data(), args.datatype1,
725 recv_buffer(args.recv_buf_size * args.datatype2->size()), args.recvcounts->data(), args.recvdisps.data(), args.datatype2, MPI_COMM_WORLD);
727 TRACE_smpi_comm_out(my_proc_id);
729 } // Replay Namespace
730 }} // namespace simgrid::smpi
732 static std::unordered_map<aid_t, simgrid::smpi::replay::RequestStorage> storage;
733 /** @brief Only initialize the replay, don't do it for real */
734 void smpi_replay_init(const char* instance_id, int rank, double start_delay_flops)
736 xbt_assert(not smpi_process()->initializing());
738 simgrid::s4u::Actor::self()->set_property("instance_id", instance_id);
739 simgrid::s4u::Actor::self()->set_property("rank", std::to_string(rank));
740 simgrid::smpi::ActorExt::init();
742 smpi_process()->mark_as_initialized();
743 smpi_process()->set_replaying(true);
745 int my_proc_id = simgrid::s4u::this_actor::get_pid();
747 TRACE_smpi_init(my_proc_id);
748 TRACE_smpi_computing_init(my_proc_id);
749 TRACE_smpi_comm_in(my_proc_id, "smpi_replay_run_init", new simgrid::instr::NoOpTIData("init"));
750 TRACE_smpi_comm_out(my_proc_id);
751 xbt_replay_action_register("init", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::InitAction().execute(action); });
752 xbt_replay_action_register("finalize", [](simgrid::xbt::ReplayAction&) { /* nothing to do */ });
753 xbt_replay_action_register("comm_size", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
754 xbt_replay_action_register("comm_split",[](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
755 xbt_replay_action_register("comm_dup", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
756 xbt_replay_action_register("send", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("send", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
757 xbt_replay_action_register("isend", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("isend", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
758 xbt_replay_action_register("recv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("recv", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
759 xbt_replay_action_register("irecv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("irecv", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
760 xbt_replay_action_register("test", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::TestAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
761 xbt_replay_action_register("wait", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
762 xbt_replay_action_register("waitall", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAllAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
763 xbt_replay_action_register("barrier", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BarrierAction().execute(action); });
764 xbt_replay_action_register("bcast", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BcastAction().execute(action); });
765 xbt_replay_action_register("reduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceAction().execute(action); });
766 xbt_replay_action_register("allreduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllReduceAction().execute(action); });
767 xbt_replay_action_register("alltoall", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllAction().execute(action); });
768 xbt_replay_action_register("alltoallv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllVAction().execute(action); });
769 xbt_replay_action_register("gather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("gather").execute(action); });
770 xbt_replay_action_register("scatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterAction().execute(action); });
771 xbt_replay_action_register("gatherv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("gatherv").execute(action); });
772 xbt_replay_action_register("scatterv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterVAction().execute(action); });
773 xbt_replay_action_register("allgather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("allgather").execute(action); });
774 xbt_replay_action_register("allgatherv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("allgatherv").execute(action); });
775 xbt_replay_action_register("reducescatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceScatterAction().execute(action); });
776 xbt_replay_action_register("compute", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ComputeAction().execute(action); });
777 xbt_replay_action_register("sleep", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SleepAction().execute(action); });
778 xbt_replay_action_register("location", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::LocationAction().execute(action); });
780 //if we have a delayed start, sleep here.
781 if (start_delay_flops > 0) {
782 XBT_VERB("Delayed start for instance - Sleeping for %f flops ", start_delay_flops);
783 private_execute_flops(start_delay_flops);
785 // Wait for the other actors to initialize also
786 simgrid::s4u::this_actor::yield();
790 /** @brief actually run the replay after initialization */
791 void smpi_replay_main(int rank, const char* trace_filename)
793 static int active_processes = 0;
795 storage[simgrid::s4u::this_actor::get_pid()] = simgrid::smpi::replay::RequestStorage();
796 std::string rank_string = std::to_string(rank);
797 simgrid::xbt::replay_runner(rank_string.c_str(), trace_filename);
799 /* and now, finalize everything */
800 /* One active process will stop. Decrease the counter*/
801 unsigned int count_requests = storage[simgrid::s4u::this_actor::get_pid()].size();
802 XBT_DEBUG("There are %ud elements in reqq[*]", count_requests);
803 if (count_requests > 0) {
804 MPI_Request requests[count_requests];
805 MPI_Status status[count_requests];
808 for (auto const& pair : storage[simgrid::s4u::this_actor::get_pid()].get_store()) {
809 requests[i] = pair.second;
812 simgrid::smpi::Request::waitall(count_requests, requests, status);
816 if(active_processes==0){
817 /* Last process alive speaking: end the simulated timer */
818 XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
819 smpi_free_replay_tmp_buffers();
822 TRACE_smpi_comm_in(simgrid::s4u::this_actor::get_pid(), "smpi_replay_run_finalize",
823 new simgrid::instr::NoOpTIData("finalize"));
825 smpi_process()->finalize();
827 TRACE_smpi_comm_out(simgrid::s4u::this_actor::get_pid());
830 /** @brief chain a replay initialization and a replay start */
831 void smpi_replay_run(const char* instance_id, int rank, double start_delay_flops, const char* trace_filename)
833 smpi_replay_init(instance_id, rank, start_delay_flops);
834 smpi_replay_main(rank, trace_filename);