1 /* Copyright (c) 2009-2017. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
7 #include "smpi_coll.hpp"
8 #include "smpi_comm.hpp"
9 #include "smpi_datatype.hpp"
10 #include "smpi_group.hpp"
11 #include "smpi_process.hpp"
12 #include "smpi_request.hpp"
13 #include "xbt/replay.hpp"
15 #include <boost/algorithm/string/join.hpp>
18 #include <unordered_map>
21 using simgrid::s4u::Actor;
/* Logging category for this file: "smpi_replay", declared as a sub-category of "smpi". */
23 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
/* Written by action_comm_size from the trace; no other visible code reads it. */
25 static int communicator_size = 0;
/* Set to smpi_process_count() in action_init; smpi_replay_main compares it to 0 to detect the last process. */
26 static int active_processes = 0;
/* Pending-request lists, one heap-allocated vector per actor, keyed by the actor PID
 * (see get_reqq_self() / set_reqq_self()). */
27 static std::unordered_map<int, std::vector<MPI_Request>*> reqq;
/* Datatype used by the handlers when the trace line does not name one; chosen in action_init. */
29 static MPI_Datatype MPI_DEFAULT_TYPE;
/* Argument-count guard placed at the top of the replay handlers.
 * Throws arg_error through THROWF when the action line holds fewer than `mandatory + 2` items; the two extra
 * items are the process id and the action name. `optional` is only reported in the error message, it is not
 * enforced. The message uses __FUNCTION__, so it names the handler in which the macro is expanded. */
31 #define CHECK_ACTION_PARAMS(action, mandatory, optional) \
33 if (action.size() < static_cast<unsigned long>(mandatory + 2)) \
34 THROWF(arg_error, 0, "%s replay failed.\n" \
35 "%lu items were given on the line. First two should be process_id and action. " \
36 "This action needs after them %lu mandatory arguments, and accepts %lu optional ones. \n" \
37 "Please contact the Simgrid team if support is needed", \
38 __FUNCTION__, action.size(), static_cast<unsigned long>(mandatory), \
39 static_cast<unsigned long>(optional)); \
/* ReplayActionArg: no member is visible in this listing and none of the visible handlers uses the class.
 * NOTE(review): looks like a placeholder for per-action argument parsing -- confirm before relying on it. */
42 class ReplayActionArg {
46 static void log_timed_action (simgrid::xbt::ReplayAction& action, double clock){
47 if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
48 std::string s = boost::algorithm::join(action, " ");
49 XBT_VERB("%s %f", s.c_str(), smpi_process()->simulated_elapsed()-clock);
53 static std::vector<MPI_Request>* get_reqq_self()
55 return reqq.at(Actor::self()->getPid());
58 static void set_reqq_self(std::vector<MPI_Request> *mpi_request)
60 reqq.insert({Actor::self()->getPid(), mpi_request});
64 static double parse_double(std::string string)
66 return xbt_str_parse_double(string.c_str(), "%s is not a double");
70 //TODO: this logic should be moved inside the datatype class, to support all predefined types and get rid of is_replayable.
71 static MPI_Datatype decode_datatype(std::string action)
73 return simgrid::smpi::Datatype::decode(const_cast<const char* const>(action.c_str()));
/* Return the trace encoding of `datatype`, i.e. datatype->encode().
 * A null datatype is special-cased before the dereference.
 * NOTE(review): the statement executed for the null case is not shown in this listing -- confirm its value. */
76 const char* encode_datatype(MPI_Datatype datatype)
78 if (datatype == nullptr) /* this actually does seem to be possible, had this in the scatter2 test */
81 return datatype->encode();
/* Replay handler for "init".
 * Picks the default datatype, starts the simulated timer of this process, records the process count in
 * active_processes and registers a fresh, empty request list for the calling actor.
 * Accepts one optional argument: when present (action.size() > 2) the default datatype is MPI_DOUBLE.
 * NOTE(review): the MPI_BYTE assignment is presumably the else-branch; the `else` line is not shown here. */
87 static void action_init(simgrid::xbt::ReplayAction& action)
89 XBT_DEBUG("Initialize the counters");
90 CHECK_ACTION_PARAMS(action, 0, 1)
91 if (action.size() > 2)
92 MPI_DEFAULT_TYPE = MPI_DOUBLE; // default MPE datatype
94 MPI_DEFAULT_TYPE = MPI_BYTE; // default TAU datatype
96 /* start a simulated timer */
97 smpi_process()->simulated_start();
98 /*initialize the number of active processes */
99 active_processes = smpi_process_count();
// The vector allocated here is released with `delete get_reqq_self()` in smpi_replay_main.
101 set_reqq_self(new std::vector<MPI_Request>);
/* Replay handler registered for "finalize".
 * NOTE(review): its body is not shown in this listing; the visible finalization work is in smpi_replay_main. */
104 static void action_finalize(simgrid::xbt::ReplayAction& action)
/* Replay handler for "comm_size": stores the value of action[2] in communicator_size and logs the action.
 * NOTE(review): action[2] is read without a CHECK_ACTION_PARAMS guard, unlike the other handlers.
 * The parsed double is implicitly converted to int by the assignment. */
109 static void action_comm_size(simgrid::xbt::ReplayAction& action)
111 communicator_size = parse_double(action[2]);
112 log_timed_action (action, smpi_process()->simulated_elapsed());
/* Replay handler for "comm_split": the visible code only logs the action (elapsed time is 0 by construction,
 * since the current clock is passed as the start time); no communicator is split. */
115 static void action_comm_split(simgrid::xbt::ReplayAction& action)
117 log_timed_action (action, smpi_process()->simulated_elapsed());
/* Replay handler for "comm_dup": the visible code only logs the action (elapsed time is 0 by construction,
 * since the current clock is passed as the start time); no communicator is duplicated. */
120 static void action_comm_dup(simgrid::xbt::ReplayAction& action)
122 log_timed_action (action, smpi_process()->simulated_elapsed());
/* Replay handler for "compute".
 * Line layout: <pid> compute <flops>
 * Executes the given amount of flops with smpi_execute_flops, bracketed by the computing trace events,
 * then logs the action with the simulated time it took. */
125 static void action_compute(simgrid::xbt::ReplayAction& action)
127 CHECK_ACTION_PARAMS(action, 1, 0)
128 double clock = smpi_process()->simulated_elapsed();
129 double flops= parse_double(action[2]);
130 int my_proc_id = Actor::self()->getPid();
132 TRACE_smpi_computing_in(my_proc_id, flops);
133 smpi_execute_flops(flops);
134 TRACE_smpi_computing_out(my_proc_id);
136 log_timed_action (action, clock);
/* Replay handler for "send" (blocking send).
 * Line layout: <pid> send <dst rank> <element count> [datatype id]
 * Sends `size` elements of the selected datatype to rank `to` on MPI_COMM_WORLD with tag 0 and a null
 * buffer. The destination rank is translated to a PID for the trace through the MPI_COMM_WORLD group. */
139 static void action_send(simgrid::xbt::ReplayAction& action)
141 CHECK_ACTION_PARAMS(action, 2, 1)
142 int to = std::stoi(action[2]);
143 double size=parse_double(action[3]);
144 double clock = smpi_process()->simulated_elapsed();
146 MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 4) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
148 int my_proc_id = Actor::self()->getPid();
149 int dst_traced = MPI_COMM_WORLD->group()->actor(to)->getPid();
151 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
152 new simgrid::instr::Pt2PtTIData("send", to, size, MPI_CURRENT_TYPE->encode()));
// The send event is emitted here only when internals are not traced; the traced volume is in bytes
// (element count times datatype size).
153 if (not TRACE_smpi_view_internals())
154 TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, size * MPI_CURRENT_TYPE->size());
156 Request::send(nullptr, size, MPI_CURRENT_TYPE, to , 0, MPI_COMM_WORLD);
158 TRACE_smpi_comm_out(my_proc_id);
160 log_timed_action(action, clock);
/* Replay handler for "Isend" (non-blocking send).
 * Line layout: <pid> Isend <dst rank> <element count> [datatype id]
 * Same as action_send, except that Request::isend is used and the returned request is appended to the
 * calling actor's pending-request list, to be completed by a later test/wait/waitAll action. */
163 static void action_Isend(simgrid::xbt::ReplayAction& action)
165 CHECK_ACTION_PARAMS(action, 2, 1)
166 int to = std::stoi(action[2]);
167 double size=parse_double(action[3]);
168 double clock = smpi_process()->simulated_elapsed();
170 MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 4) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
172 int my_proc_id = Actor::self()->getPid();
173 int dst_traced = MPI_COMM_WORLD->group()->actor(to)->getPid();
174 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
175 new simgrid::instr::Pt2PtTIData("Isend", to, size, MPI_CURRENT_TYPE->encode()));
176 if (not TRACE_smpi_view_internals())
177 TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, size * MPI_CURRENT_TYPE->size());
179 MPI_Request request = Request::isend(nullptr, size, MPI_CURRENT_TYPE, to, 0, MPI_COMM_WORLD);
181 TRACE_smpi_comm_out(my_proc_id);
183 get_reqq_self()->push_back(request);
185 log_timed_action (action, clock);
/* Replay handler for "recv" (blocking receive).
 * Line layout: <pid> recv <src rank> <element count> [datatype id]
 * Receives from rank `from` on MPI_COMM_WORLD with tag 0 into a null buffer.
 * NOTE(review): the declaration of `status`, the condition guarding the probe and the statement that
 * follows the probe are not shown in this listing; per the comment below, the probe serves the case where
 * the receiver does not know the message size. */
188 static void action_recv(simgrid::xbt::ReplayAction& action)
190 CHECK_ACTION_PARAMS(action, 2, 1)
191 int from = std::stoi(action[2]);
192 double size=parse_double(action[3]);
193 double clock = smpi_process()->simulated_elapsed();
196 MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 4) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
198 int my_proc_id = Actor::self()->getPid();
199 int src_traced = MPI_COMM_WORLD->group()->actor(from)->getPid();
201 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
202 new simgrid::instr::Pt2PtTIData("recv", from, size, MPI_CURRENT_TYPE->encode()));
204 //unknown size from the receiver point of view
206 Request::probe(from, 0, MPI_COMM_WORLD, &status);
210 Request::recv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD, &status);
212 TRACE_smpi_comm_out(my_proc_id);
// The receive event is emitted here only when internals are not traced.
213 if (not TRACE_smpi_view_internals()) {
214 TRACE_smpi_recv(src_traced, my_proc_id, 0);
217 log_timed_action (action, clock);
/* Replay handler for "Irecv" (non-blocking receive).
 * Line layout: <pid> Irecv <src rank> <element count> [datatype id]
 * Posts Request::irecv from rank `from` on MPI_COMM_WORLD with tag 0 and appends the request to the
 * calling actor's pending-request list.
 * NOTE(review): the declaration of `status`, the condition guarding the probe and the statement that
 * follows the probe are not shown in this listing. */
220 static void action_Irecv(simgrid::xbt::ReplayAction& action)
222 CHECK_ACTION_PARAMS(action, 2, 1)
223 int from = std::stoi(action[2]);
224 double size=parse_double(action[3]);
225 double clock = smpi_process()->simulated_elapsed();
227 MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 4) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
229 int my_proc_id = Actor::self()->getPid();
230 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
231 new simgrid::instr::Pt2PtTIData("Irecv", from, size, MPI_CURRENT_TYPE->encode()));
233 //unknow size from the receiver pov
235 Request::probe(from, 0, MPI_COMM_WORLD, &status);
239 MPI_Request request = Request::irecv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD);
241 TRACE_smpi_comm_out(my_proc_id);
242 get_reqq_self()->push_back(request);
244 log_timed_action (action, clock);
/* Replay handler for "test".
 * Pops the most recently queued request of the calling actor, runs Request::test on it when it is not
 * null, and pushes it back so that a later wait can pick it up.
 * NOTE(review): unlike action_wait, no visible check ensures the request list is non-empty before back()
 * is called; the declaration of `status` is not shown in this listing. */
247 static void action_test(simgrid::xbt::ReplayAction& action)
249 CHECK_ACTION_PARAMS(action, 0, 0)
250 double clock = smpi_process()->simulated_elapsed();
253 MPI_Request request = get_reqq_self()->back();
254 get_reqq_self()->pop_back();
255 //if request is null here, this may mean that a previous test has succeeded
256 //Different times in traced application and replayed version may lead to this
257 //In this case, ignore the extra calls.
258 if(request!=nullptr){
259 int my_proc_id = Actor::self()->getPid();
260 TRACE_smpi_testing_in(my_proc_id);
262 int flag = Request::test(&request, &status);
264 XBT_DEBUG("MPI_Test result: %d", flag);
265 /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now nullptr.*/
266 get_reqq_self()->push_back(request);
268 TRACE_smpi_testing_out(my_proc_id);
270 log_timed_action (action, clock);
/* Replay handler for "wait".
 * Asserts that at least one request is pending, pops the most recent one and waits for it with
 * Request::wait. A null request (already completed by an earlier test) is special-cased.
 * For a receive request, a receive trace event is emitted after the wait, using the source/destination
 * ranks sampled from the request before waiting.
 * NOTE(review): `rank` falls back to -1 when request->comm() is MPI_COMM_NULL, yet the next statement
 * dereferences request->comm() unconditionally -- confirm a null communicator cannot reach this point.
 * NOTE(review): the declaration of `status` and the body of the null-request branch are not shown here. */
273 static void action_wait(simgrid::xbt::ReplayAction& action)
275 CHECK_ACTION_PARAMS(action, 0, 0)
276 double clock = smpi_process()->simulated_elapsed();
279 std::string s = boost::algorithm::join(action, " ");
280 xbt_assert(get_reqq_self()->size(), "action wait not preceded by any irecv or isend: %s", s.c_str());
281 MPI_Request request = get_reqq_self()->back();
282 get_reqq_self()->pop_back();
284 if (request==nullptr){
285 /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just return.*/
289 int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
291 MPI_Group group = request->comm()->group();
292 int src_traced = group->rank(request->src());
293 int dst_traced = group->rank(request->dst());
294 int is_wait_for_receive = (request->flags() & RECV);
295 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::NoOpTIData("wait"));
297 Request::wait(&request, &status);
299 TRACE_smpi_comm_out(rank);
300 if (is_wait_for_receive)
301 TRACE_smpi_recv(src_traced, dst_traced, 0);
302 log_timed_action (action, clock);
/* Replay handler for "waitAll".
 * When the calling actor has pending requests, records source/destination of every receive request,
 * waits for all of them with Request::waitall, then emits one receive trace event per recorded receive.
 * The value -100 in recvs_snd[] marks entries for which no receive event must be emitted.
 * NOTE(review): the statements that store -100 for non-receive requests are not shown in this listing.
 * NOTE(review): status[], recvs_snd[] and recvs_rcv[] are variable-length arrays (a compiler extension in
 * C++); the request list is not cleared in the visible code after the waitall. */
305 static void action_waitall(simgrid::xbt::ReplayAction& action)
307 CHECK_ACTION_PARAMS(action, 0, 0)
308 double clock = smpi_process()->simulated_elapsed();
309 const unsigned int count_requests = get_reqq_self()->size();
311 if (count_requests>0) {
312 MPI_Status status[count_requests];
314 int my_proc_id_traced = Actor::self()->getPid();
315 TRACE_smpi_comm_in(my_proc_id_traced, __FUNCTION__,
316 new simgrid::instr::Pt2PtTIData("waitAll", -1, count_requests, ""));
317 int recvs_snd[count_requests];
318 int recvs_rcv[count_requests];
319 for (unsigned int i = 0; i < count_requests; i++) {
320 const auto& req = (*get_reqq_self())[i];
321 if (req && (req->flags() & RECV)) {
322 recvs_snd[i] = req->src();
323 recvs_rcv[i] = req->dst();
327 Request::waitall(count_requests, &(*get_reqq_self())[0], status);
329 for (unsigned i = 0; i < count_requests; i++) {
330 if (recvs_snd[i]!=-100)
331 TRACE_smpi_recv(recvs_snd[i], recvs_rcv[i],0);
333 TRACE_smpi_comm_out(my_proc_id_traced);
335 log_timed_action (action, clock);
/* Replay handler for "barrier": runs Colls::barrier on MPI_COMM_WORLD between the comm_in/comm_out trace
 * events and logs the action. It reads no argument from the line and performs no CHECK_ACTION_PARAMS. */
338 static void action_barrier(simgrid::xbt::ReplayAction& action)
340 double clock = smpi_process()->simulated_elapsed();
341 int my_proc_id = Actor::self()->getPid();
342 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::NoOpTIData("barrier"));
344 Colls::barrier(MPI_COMM_WORLD);
346 TRACE_smpi_comm_out(my_proc_id);
347 log_timed_action (action, clock);
/* Replay handler for "bcast".
 * Line layout: <pid> bcast <element count> [root rank] [datatype id]
 * The root defaults to 0 and the datatype to MPI_DEFAULT_TYPE. The broadcast runs on MPI_COMM_WORLD with a
 * scratch buffer of `size` elements obtained from smpi_get_tmp_sendbuffer. The root rank is translated to
 * a PID for the trace record. */
350 static void action_bcast(simgrid::xbt::ReplayAction& action)
352 CHECK_ACTION_PARAMS(action, 1, 2)
353 double size = parse_double(action[2]);
354 double clock = smpi_process()->simulated_elapsed();
355 int root = (action.size() > 3) ? std::stoi(action[3]) : 0;
356 /* Initialize MPI_CURRENT_TYPE in order to decrease the number of the checks */
357 MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 4) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
359 int my_proc_id = Actor::self()->getPid();
360 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
361 new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(root)->getPid(), -1.0, size,
362 -1, MPI_CURRENT_TYPE->encode(), ""));
364 void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
366 Colls::bcast(sendbuf, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
368 TRACE_smpi_comm_out(my_proc_id);
369 log_timed_action (action, clock);
372 static void action_reduce(simgrid::xbt::ReplayAction& action)
374 CHECK_ACTION_PARAMS(action, 2, 2)
375 double comm_size = parse_double(action[2]);
376 double comp_size = parse_double(action[3]);
377 double clock = smpi_process()->simulated_elapsed();
378 int root = (action.size() > 4) ? std::stoi(action[4]) : 0;
380 MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 5) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE;
382 int my_proc_id = Actor::self()->getPid();
383 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
384 new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(root)->getPid(), comp_size,
385 comm_size, -1, MPI_CURRENT_TYPE->encode(), ""));
387 void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
388 void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
389 Colls::reduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
390 smpi_execute_flops(comp_size);
392 TRACE_smpi_comm_out(my_proc_id);
393 log_timed_action (action, clock);
/* Replay handler for "allReduce".
 * Line layout: <pid> allReduce <element count> <flops> [datatype id]
 * Runs Colls::allreduce on MPI_COMM_WORLD with MPI_OP_NULL using the scratch receive/send buffers, then
 * executes `comp_size` flops to account for the reduction computation. */
396 static void action_allReduce(simgrid::xbt::ReplayAction& action)
398 CHECK_ACTION_PARAMS(action, 2, 1)
399 double comm_size = parse_double(action[2]);
400 double comp_size = parse_double(action[3]);
402 MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 4) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
404 double clock = smpi_process()->simulated_elapsed();
405 int my_proc_id = Actor::self()->getPid();
406 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::CollTIData("allReduce", -1, comp_size, comm_size, -1,
407 MPI_CURRENT_TYPE->encode(), ""));
409 void *recvbuf = smpi_get_tmp_recvbuffer(comm_size* MPI_CURRENT_TYPE->size());
410 void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
411 Colls::allreduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
412 smpi_execute_flops(comp_size);
414 TRACE_smpi_comm_out(my_proc_id);
415 log_timed_action (action, clock);
/* Replay handler for "allToAll".
 * Line layout: <pid> allToAll <send count> <recv count> [send datatype id recv datatype id]
 * The two datatype ids are honoured only when both are present (action.size() > 5); otherwise both sides
 * use MPI_DEFAULT_TYPE. The scratch buffers are sized for one block per process of MPI_COMM_WORLD. */
418 static void action_allToAll(simgrid::xbt::ReplayAction& action)
420 CHECK_ACTION_PARAMS(action, 2, 2) //two mandatory (send and recv volumes) and two optional (corresponding datatypes)
421 double clock = smpi_process()->simulated_elapsed();
422 unsigned long comm_size = MPI_COMM_WORLD->size();
423 int send_size = parse_double(action[2]);
424 int recv_size = parse_double(action[3]);
425 MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 5) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE};
426 MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE};
428 void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* MPI_CURRENT_TYPE->size());
429 void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
431 int my_proc_id = Actor::self()->getPid();
432 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
433 new simgrid::instr::CollTIData("allToAll", -1, -1.0, send_size, recv_size,
434 MPI_CURRENT_TYPE->encode(), MPI_CURRENT_TYPE2->encode()));
436 Colls::alltoall(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
438 TRACE_smpi_comm_out(my_proc_id);
439 log_timed_action (action, clock);
/* Replay handler for "gather".
 * Line layout: <pid> gather <send count> <recv count> [root rank] [send datatype id recv datatype id]
 * The root defaults to 0; the datatype ids are honoured only when both are present (action.size() > 6).
 * NOTE(review): the statement guarding the receive-buffer allocation is not shown in this listing
 * (`recv` starts as nullptr, so the allocation is presumably restricted to the root).
 * NOTE(review): the trace is opened with the MPI_COMM_WORLD rank but closed with the actor PID, whereas
 * most other handlers use the PID for both -- confirm these always designate the same trace container. */
442 static void action_gather(simgrid::xbt::ReplayAction& action)
444 /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
447 1) 68 is the sendcounts
448 2) 68 is the recvcounts
449 3) 0 is the root node
450 4) 0 is the send datatype id, see decode_datatype()
451 5) 0 is the recv datatype id, see decode_datatype()
453 CHECK_ACTION_PARAMS(action, 2, 3)
454 double clock = smpi_process()->simulated_elapsed();
455 unsigned long comm_size = MPI_COMM_WORLD->size();
456 int send_size = parse_double(action[2]);
457 int recv_size = parse_double(action[3]);
458 MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 6) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE};
459 MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 6) ? decode_datatype(action[6]) : MPI_DEFAULT_TYPE};
461 void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
462 void *recv = nullptr;
463 int root = (action.size() > 4) ? std::stoi(action[4]) : 0;
464 int rank = MPI_COMM_WORLD->rank();
467 recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
469 TRACE_smpi_comm_in(rank, __FUNCTION__,
470 new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
471 MPI_CURRENT_TYPE->encode(), MPI_CURRENT_TYPE2->encode()));
473 Colls::gather(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
475 TRACE_smpi_comm_out(Actor::self()->getPid());
476 log_timed_action (action, clock);
479 static void action_scatter(simgrid::xbt::ReplayAction& action)
481 /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
484 1) 68 is the sendcounts
485 2) 68 is the recvcounts
486 3) 0 is the root node
487 4) 0 is the send datatype id, see decode_datatype()
488 5) 0 is the recv datatype id, see decode_datatype()
490 CHECK_ACTION_PARAMS(action, 2, 3)
491 double clock = smpi_process()->simulated_elapsed();
492 unsigned long comm_size = MPI_COMM_WORLD->size();
493 int send_size = parse_double(action[2]);
494 int recv_size = parse_double(action[3]);
495 MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 6) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE};
496 MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 6) ? decode_datatype(action[6]) : MPI_DEFAULT_TYPE};
498 void* send = smpi_get_tmp_sendbuffer(send_size * MPI_CURRENT_TYPE->size());
499 void* recv = nullptr;
500 int root = (action.size() > 4) ? std::stoi(action[4]) : 0;
501 int rank = MPI_COMM_WORLD->rank();
504 recv = smpi_get_tmp_recvbuffer(recv_size * comm_size * MPI_CURRENT_TYPE2->size());
506 TRACE_smpi_comm_in(rank, __FUNCTION__,
507 new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
508 MPI_CURRENT_TYPE->encode(), MPI_CURRENT_TYPE2->encode()));
510 Colls::scatter(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
512 TRACE_smpi_comm_out(Actor::self()->getPid());
513 log_timed_action(action, clock);
/* Replay handler for "gatherV".
 * Line layout (N = size of MPI_COMM_WORLD):
 *   <pid> gatherV <send count> <N recv counts> [root rank] [send datatype id recv datatype id]
 * The root defaults to 0; the datatype ids are honoured only when both are present. All displacements
 * passed to Colls::gatherv are 0.
 * NOTE(review): the example line in the comment below spells the action "gather", while the handler is
 * registered under "gatherV" in smpi_replay_init.
 * NOTE(review): the statement guarding the receive-buffer allocation and the tails of two wrapped
 * statements are not shown in this listing. */
516 static void action_gatherv(simgrid::xbt::ReplayAction& action)
518 /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
519 0 gather 68 68 10 10 10 0 0 0
521 1) 68 is the sendcount
522 2) 68 10 10 10 is the recvcounts
523 3) 0 is the root node
524 4) 0 is the send datatype id, see decode_datatype()
525 5) 0 is the recv datatype id, see decode_datatype()
527 double clock = smpi_process()->simulated_elapsed();
528 unsigned long comm_size = MPI_COMM_WORLD->size();
529 CHECK_ACTION_PARAMS(action, comm_size+1, 2)
530 int send_size = parse_double(action[2]);
531 std::vector<int> disps(comm_size, 0);
532 std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
534 MPI_Datatype MPI_CURRENT_TYPE =
535 (action.size() > 5 + comm_size) ? decode_datatype(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
536 MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5 + comm_size) ? decode_datatype(action[5 + comm_size])
539 void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
540 void *recv = nullptr;
541 for (unsigned int i = 0; i < comm_size; i++) {
542 (*recvcounts)[i] = std::stoi(action[i + 3]);
544 int recv_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
546 int root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
547 int rank = MPI_COMM_WORLD->rank();
550 recv = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
552 TRACE_smpi_comm_in(rank, __FUNCTION__,
553 new simgrid::instr::VarCollTIData("gatherV", root, send_size, nullptr, -1, recvcounts,
554 MPI_CURRENT_TYPE->encode(), MPI_CURRENT_TYPE2->encode()));
556 Colls::gatherv(send, send_size, MPI_CURRENT_TYPE, recv, recvcounts->data(), disps.data(), MPI_CURRENT_TYPE2, root,
559 TRACE_smpi_comm_out(Actor::self()->getPid());
560 log_timed_action (action, clock);
563 static void action_scatterv(simgrid::xbt::ReplayAction& action)
565 /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
566 0 gather 68 10 10 10 68 0 0 0
568 1) 68 10 10 10 is the sendcounts
569 2) 68 is the recvcount
570 3) 0 is the root node
571 4) 0 is the send datatype id, see decode_datatype()
572 5) 0 is the recv datatype id, see decode_datatype()
574 double clock = smpi_process()->simulated_elapsed();
575 unsigned long comm_size = MPI_COMM_WORLD->size();
576 CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
577 int recv_size = parse_double(action[2 + comm_size]);
578 std::vector<int> disps(comm_size, 0);
579 std::shared_ptr<std::vector<int>> sendcounts(new std::vector<int>(comm_size));
581 MPI_Datatype MPI_CURRENT_TYPE =
582 (action.size() > 5 + comm_size) ? decode_datatype(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
583 MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5 + comm_size) ? decode_datatype(action[5 + comm_size])
586 void* send = nullptr;
587 void* recv = smpi_get_tmp_recvbuffer(recv_size * MPI_CURRENT_TYPE->size());
588 for (unsigned int i = 0; i < comm_size; i++) {
589 (*sendcounts)[i] = std::stoi(action[i + 2]);
591 int send_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
593 int root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
594 int rank = MPI_COMM_WORLD->rank();
597 send = smpi_get_tmp_sendbuffer(send_sum * MPI_CURRENT_TYPE2->size());
599 TRACE_smpi_comm_in(rank, __FUNCTION__,
600 new simgrid::instr::VarCollTIData("gatherV", root, -1, sendcounts, recv_size, nullptr,
601 MPI_CURRENT_TYPE->encode(), MPI_CURRENT_TYPE2->encode()));
603 Colls::scatterv(send, sendcounts->data(), disps.data(), MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root,
606 TRACE_smpi_comm_out(Actor::self()->getPid());
607 log_timed_action(action, clock);
/* Replay handler for "reduceScatter".
 * Line layout (N = size of MPI_COMM_WORLD): <pid> reduceScatter <N recv counts> <flops> [datatype id]
 * Runs Colls::reduce_scatter on MPI_COMM_WORLD with MPI_OP_NULL, both scratch buffers being sized for the
 * sum of the receive counts, then executes `comp_size` flops. The flops amount is parsed as a double and
 * truncated to int. */
610 static void action_reducescatter(simgrid::xbt::ReplayAction& action)
612 /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
613 0 reduceScatter 275427 275427 275427 204020 11346849 0
615 1) The first four values after the name of the action declare the recvcounts array
616 2) The value 11346849 is the amount of instructions
617 3) The last value corresponds to the datatype, see decode_datatype().
619 double clock = smpi_process()->simulated_elapsed();
620 unsigned long comm_size = MPI_COMM_WORLD->size();
621 CHECK_ACTION_PARAMS(action, comm_size+1, 1)
622 int comp_size = parse_double(action[2+comm_size]);
623 int my_proc_id = Actor::self()->getPid();
624 std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>);
625 MPI_Datatype MPI_CURRENT_TYPE =
626 (action.size() > 3 + comm_size) ? decode_datatype(action[3 + comm_size]) : MPI_DEFAULT_TYPE;
628 for (unsigned int i = 0; i < comm_size; i++) {
629 recvcounts->push_back(std::stoi(action[i + 2]));
631 int size{std::accumulate(recvcounts->begin(), recvcounts->end(), 0)};
633 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
634 new simgrid::instr::VarCollTIData("reduceScatter", -1, 0, nullptr, -1, recvcounts,
635 std::to_string(comp_size), /* ugly hack to print comp_size */
636 MPI_CURRENT_TYPE->encode()));
638 void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
639 void *recvbuf = smpi_get_tmp_recvbuffer(size* MPI_CURRENT_TYPE->size());
641 Colls::reduce_scatter(sendbuf, recvbuf, recvcounts->data(), MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
642 smpi_execute_flops(comp_size);
644 TRACE_smpi_comm_out(my_proc_id);
645 log_timed_action (action, clock);
/* Replay handler for "allGather".
 * Line layout: <pid> allGather <send count> <recv count> [send datatype id recv datatype id]
 * The datatype ids are honoured only when both are present (action.size() > 5).
 * NOTE(review): the receive scratch buffer is sized for `recvcount` elements only, whereas action_allToAll
 * and action_gather multiply the receive size by the communicator size -- confirm this is intended. */
648 static void action_allgather(simgrid::xbt::ReplayAction& action)
650 /* The structure of the allgather action for the rank 0 (total 4 processes) is the following:
651 0 allGather 275427 275427
653 1) 275427 is the sendcount
654 2) 275427 is the recvcount
655 3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().
657 double clock = smpi_process()->simulated_elapsed();
659 CHECK_ACTION_PARAMS(action, 2, 2)
660 int sendcount = std::stoi(action[2]);
661 int recvcount = std::stoi(action[3]);
663 MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 5) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE};
664 MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE};
666 void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
667 void *recvbuf = smpi_get_tmp_recvbuffer(recvcount* MPI_CURRENT_TYPE2->size());
669 int my_proc_id = Actor::self()->getPid();
671 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
672 new simgrid::instr::CollTIData("allGather", -1, -1.0, sendcount, recvcount,
673 MPI_CURRENT_TYPE->encode(), MPI_CURRENT_TYPE2->encode()));
675 Colls::allgather(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcount, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
677 TRACE_smpi_comm_out(my_proc_id);
678 log_timed_action (action, clock);
/* Replay handler for "allGatherV".
 * Line layout (N = size of MPI_COMM_WORLD): <pid> allGatherV <send count> <N recv counts> [extra items]
 * The extra items are interpreted from the length of the line: a datatype id, N displacements, or both.
 * Displacements default to 0 and the datatype to MPI_DEFAULT_TYPE.
 * NOTE(review): the second branch of the length test ("disps specified; datatype is not specified")
 * repeats the condition of the first branch, so it can never be taken; the intended condition cannot be
 * derived from this file alone.
 * NOTE(review): the send and the receive datatypes are both decoded from the same item
 * (action[datatype_index]).
 * NOTE(review): the tail of the wrapped Colls::allgatherv call is not shown in this listing. */
681 static void action_allgatherv(simgrid::xbt::ReplayAction& action)
683 /* The structure of the allgatherv action for the rank 0 (total 4 processes) is the following:
684 0 allGatherV 275427 275427 275427 275427 204020
686 1) 275427 is the sendcount
687 2) The next four elements declare the recvcounts array
688 3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().
690 double clock = smpi_process()->simulated_elapsed();
692 unsigned long comm_size = MPI_COMM_WORLD->size();
693 CHECK_ACTION_PARAMS(action, comm_size+1, 2)
694 int sendcount = std::stoi(action[2]);
695 std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
696 std::vector<int> disps(comm_size, 0);
// An index of 0 means "not present on the line".
698 int datatype_index = 0, disp_index = 0;
699 if (action.size() > 3 + 2 * comm_size) { /* datatype + disp are specified */
700 datatype_index = 3 + comm_size;
701 disp_index = datatype_index + 1;
702 } else if (action.size() > 3 + 2 * comm_size) { /* disps specified; datatype is not specified; use the default one */
704 disp_index = 3 + comm_size;
705 } else if (action.size() > 3 + comm_size) { /* only datatype, no disp specified */
706 datatype_index = 3 + comm_size;
709 if (disp_index != 0) {
710 for (unsigned int i = 0; i < comm_size; i++)
711 disps[i] = std::stoi(action[disp_index + i]);
714 MPI_Datatype MPI_CURRENT_TYPE{(datatype_index > 0) ? decode_datatype(action[datatype_index]) : MPI_DEFAULT_TYPE};
715 MPI_Datatype MPI_CURRENT_TYPE2{(datatype_index > 0) ? decode_datatype(action[datatype_index]) : MPI_DEFAULT_TYPE};
717 void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
719 for (unsigned int i = 0; i < comm_size; i++) {
720 (*recvcounts)[i] = std::stoi(action[i + 3]);
722 int recv_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
723 void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
725 int my_proc_id = Actor::self()->getPid();
727 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
728 new simgrid::instr::VarCollTIData("allGatherV", -1, sendcount, nullptr, -1, recvcounts,
729 MPI_CURRENT_TYPE->encode(), MPI_CURRENT_TYPE2->encode()));
731 Colls::allgatherv(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts->data(), disps.data(), MPI_CURRENT_TYPE2,
734 TRACE_smpi_comm_out(my_proc_id);
735 log_timed_action (action, clock);
738 static void action_allToAllv(simgrid::xbt::ReplayAction& action)
740 /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
741 0 allToAllV 100 1 7 10 12 100 1 70 10 5
743 1) 100 is the size of the send buffer *sizeof(int),
744 2) 1 7 10 12 is the sendcounts array
745 3) 100*sizeof(int) is the size of the receiver buffer
746 4) 1 70 10 5 is the recvcounts array
748 double clock = smpi_process()->simulated_elapsed();
750 unsigned long comm_size = MPI_COMM_WORLD->size();
751 CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
752 std::shared_ptr<std::vector<int>> sendcounts(new std::vector<int>(comm_size));
753 std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
754 std::vector<int> senddisps(comm_size, 0);
755 std::vector<int> recvdisps(comm_size, 0);
757 MPI_Datatype MPI_CURRENT_TYPE =
758 (action.size() > 5 + 2 * comm_size) ? decode_datatype(action[4 + 2 * comm_size]) : MPI_DEFAULT_TYPE;
759 MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5 + 2 * comm_size) ? decode_datatype(action[5 + 2 * comm_size])
762 int send_buf_size=parse_double(action[2]);
763 int recv_buf_size=parse_double(action[3+comm_size]);
764 int my_proc_id = Actor::self()->getPid();
765 void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* MPI_CURRENT_TYPE->size());
766 void *recvbuf = smpi_get_tmp_recvbuffer(recv_buf_size* MPI_CURRENT_TYPE2->size());
768 for (unsigned int i = 0; i < comm_size; i++) {
769 (*sendcounts)[i] = std::stoi(action[3 + i]);
770 (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
772 int send_size = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
773 int recv_size = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
775 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
776 new simgrid::instr::VarCollTIData("allToAllV", -1, send_size, sendcounts, recv_size, recvcounts,
777 MPI_CURRENT_TYPE->encode(), MPI_CURRENT_TYPE2->encode()));
779 Colls::alltoallv(sendbuf, sendcounts->data(), senddisps.data(), MPI_CURRENT_TYPE, recvbuf, recvcounts->data(),
780 recvdisps.data(), MPI_CURRENT_TYPE, MPI_COMM_WORLD);
782 TRACE_smpi_comm_out(my_proc_id);
783 log_timed_action (action, clock);
786 }} // namespace simgrid::smpi
788 /** @brief Only initialize the replay, don't do it for real */
/* Initializes the SMPI process from argc/argv, marks it as initialized and as replaying, emits the
 * "init" trace events, then registers one handler per supported action name.
 * When a delayed start is requested, (*argv)[2] is parsed as a double and that many flops are executed
 * before returning. NOTE(review): the condition guarding the delayed start is not shown in this listing.
 * A final zero-flop execution forces a context switch (see the comment near the end). */
789 void smpi_replay_init(int* argc, char*** argv)
791 simgrid::smpi::Process::init(argc, argv);
792 smpi_process()->mark_as_initialized();
793 smpi_process()->set_replaying(true);
795 int my_proc_id = Actor::self()->getPid();
796 TRACE_smpi_init(my_proc_id);
797 TRACE_smpi_computing_init(my_proc_id);
798 TRACE_smpi_comm_in(my_proc_id, "smpi_replay_run_init", new simgrid::instr::NoOpTIData("init"));
799 TRACE_smpi_comm_out(my_proc_id);
// Action names are matched against the second item of each trace line (see CHECK_ACTION_PARAMS message).
800 xbt_replay_action_register("init", simgrid::smpi::action_init);
801 xbt_replay_action_register("finalize", simgrid::smpi::action_finalize);
802 xbt_replay_action_register("comm_size", simgrid::smpi::action_comm_size);
803 xbt_replay_action_register("comm_split", simgrid::smpi::action_comm_split);
804 xbt_replay_action_register("comm_dup", simgrid::smpi::action_comm_dup);
805 xbt_replay_action_register("send", simgrid::smpi::action_send);
806 xbt_replay_action_register("Isend", simgrid::smpi::action_Isend);
807 xbt_replay_action_register("recv", simgrid::smpi::action_recv);
808 xbt_replay_action_register("Irecv", simgrid::smpi::action_Irecv);
809 xbt_replay_action_register("test", simgrid::smpi::action_test);
810 xbt_replay_action_register("wait", simgrid::smpi::action_wait);
811 xbt_replay_action_register("waitAll", simgrid::smpi::action_waitall);
812 xbt_replay_action_register("barrier", simgrid::smpi::action_barrier);
813 xbt_replay_action_register("bcast", simgrid::smpi::action_bcast);
814 xbt_replay_action_register("reduce", simgrid::smpi::action_reduce);
815 xbt_replay_action_register("allReduce", simgrid::smpi::action_allReduce);
816 xbt_replay_action_register("allToAll", simgrid::smpi::action_allToAll);
817 xbt_replay_action_register("allToAllV", simgrid::smpi::action_allToAllv);
818 xbt_replay_action_register("gather", simgrid::smpi::action_gather);
819 xbt_replay_action_register("scatter", simgrid::smpi::action_scatter);
820 xbt_replay_action_register("gatherV", simgrid::smpi::action_gatherv);
821 xbt_replay_action_register("scatterV", simgrid::smpi::action_scatterv);
822 xbt_replay_action_register("allGather", simgrid::smpi::action_allgather);
823 xbt_replay_action_register("allGatherV", simgrid::smpi::action_allgatherv);
824 xbt_replay_action_register("reduceScatter", simgrid::smpi::action_reducescatter);
825 xbt_replay_action_register("compute", simgrid::smpi::action_compute);
827 //if we have a delayed start, sleep here.
829 double value = xbt_str_parse_double((*argv)[2], "%s is not a double");
830 XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
831 smpi_execute_flops(value);
833 //UGLY: force a context switch to be sure that all MSG_processes begin initialization
834 XBT_DEBUG("Force context switch by smpi_execute_flops - Sleeping for 0.0 flops ");
835 smpi_execute_flops(0.0);
839 /** @brief actually run the replay after initialization */
/* Runs the trace through simgrid::xbt::replay_runner, then finalizes the calling process:
 *  - waits (Request::waitall) for the requests still pending in this actor's list, if any;
 *  - deletes the request list allocated in action_init;
 *  - when active_processes is 0, prints the simulated time and frees the replay scratch buffers;
 *  - finalizes the SMPI process between the "finalize" trace events.
 * NOTE(review): the statements copying the pending requests into requests[] and the one updating
 * active_processes are not shown in this listing.
 * NOTE(review): requests[] and status[] are variable-length arrays (a compiler extension in C++), and the
 * entry of `reqq` for this actor is not erased in the visible code after the vector is deleted. */
840 void smpi_replay_main(int* argc, char*** argv)
842 simgrid::xbt::replay_runner(*argc, *argv);
844 /* and now, finalize everything */
845 /* One active process will stop. Decrease the counter*/
846 XBT_DEBUG("There are %zu elements in reqq[*]", get_reqq_self()->size());
847 if (not get_reqq_self()->empty()) {
848 unsigned int count_requests=get_reqq_self()->size();
849 MPI_Request requests[count_requests];
850 MPI_Status status[count_requests];
853 for (auto const& req : *get_reqq_self()) {
857 simgrid::smpi::Request::waitall(count_requests, requests, status);
859 delete get_reqq_self();
862 if(active_processes==0){
863 /* Last process alive speaking: end the simulated timer */
864 XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
865 smpi_free_replay_tmp_buffers();
868 TRACE_smpi_comm_in(Actor::self()->getPid(), "smpi_replay_run_finalize", new simgrid::instr::NoOpTIData("finalize"));
870 smpi_process()->finalize();
872 TRACE_smpi_comm_out(Actor::self()->getPid());
873 TRACE_smpi_finalize(Actor::self()->getPid());
876 /** @brief chain a replay initialization and a replay start */
/* Convenience entry point: calls smpi_replay_init and then smpi_replay_main with the same argc/argv. */
877 void smpi_replay_run(int* argc, char*** argv)
879 smpi_replay_init(argc, argv);
880 smpi_replay_main(argc, argv);