1 /* Copyright (c) 2009-2017. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
#include "smpi_coll.hpp"
#include "smpi_comm.hpp"
#include "smpi_datatype.hpp"
#include "smpi_group.hpp"
#include "smpi_process.hpp"
#include "smpi_request.hpp"
#include "xbt/replay.hpp"

#include <boost/algorithm/string/join.hpp>
#include <memory>
#include <numeric>
#include <string>
#include <unordered_map>
#include <vector>
21 using simgrid::s4u::Actor;
23 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
25 static int communicator_size = 0;
26 static int active_processes = 0;
27 static std::unordered_map<int, std::vector<MPI_Request>*> reqq;
29 static MPI_Datatype MPI_DEFAULT_TYPE;
/* Raises an arg_error (via THROWF) when `action` carries fewer than `mandatory` arguments after the
 * leading process id and action name. `optional` is only reported in the error message.
 * Expands to a braced block, so call sites need no trailing semicolon.
 * action.size() is cast explicitly: %lu expects unsigned long, which is not size_t on every
 * platform (e.g. 64-bit Windows), and a mismatched vararg is undefined behavior. */
#define CHECK_ACTION_PARAMS(action, mandatory, optional) \
  { \
    if ((action).size() < static_cast<unsigned long>((mandatory) + 2)) \
      THROWF(arg_error, 0, "%s replay failed.\n" \
                           "%lu items were given on the line. First two should be process_id and action. " \
                           "This action needs after them %lu mandatory arguments, and accepts %lu optional ones. \n" \
                           "Please contact the Simgrid team if support is needed", \
             __FUNCTION__, static_cast<unsigned long>((action).size()), static_cast<unsigned long>(mandatory), \
             static_cast<unsigned long>(optional)); \
  }
42 static void log_timed_action (simgrid::xbt::ReplayAction& action, double clock){
43 if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
44 std::string s = boost::algorithm::join(action, " ");
45 XBT_VERB("%s %f", s.c_str(), smpi_process()->simulated_elapsed()-clock);
49 static std::vector<MPI_Request>* get_reqq_self()
51 return reqq.at(Actor::self()->getPid());
54 static void set_reqq_self(std::vector<MPI_Request> *mpi_request)
56 reqq.insert({Actor::self()->getPid(), mpi_request});
60 static double parse_double(std::string string)
62 return xbt_str_parse_double(string.c_str(), "%s is not a double");
65 //TODO: this logic should be moved inside the datatype class, to support all predefined types and get rid of is_replayable.
66 static MPI_Datatype decode_datatype(std::string action)
68 return simgrid::smpi::Datatype::decode(const_cast<const char* const>(action.c_str()));
74 static void action_init(simgrid::xbt::ReplayAction& action)
76 XBT_DEBUG("Initialize the counters");
77 CHECK_ACTION_PARAMS(action, 0, 1)
78 if (action.size() > 2)
79 MPI_DEFAULT_TYPE = MPI_DOUBLE; // default MPE datatype
81 MPI_DEFAULT_TYPE = MPI_BYTE; // default TAU datatype
83 /* start a simulated timer */
84 smpi_process()->simulated_start();
85 /*initialize the number of active processes */
86 active_processes = smpi_process_count();
88 set_reqq_self(new std::vector<MPI_Request>);
91 static void action_finalize(simgrid::xbt::ReplayAction& action)
96 static void action_comm_size(simgrid::xbt::ReplayAction& action)
98 communicator_size = parse_double(action[2]);
99 log_timed_action (action, smpi_process()->simulated_elapsed());
102 static void action_comm_split(simgrid::xbt::ReplayAction& action)
104 log_timed_action (action, smpi_process()->simulated_elapsed());
107 static void action_comm_dup(simgrid::xbt::ReplayAction& action)
109 log_timed_action (action, smpi_process()->simulated_elapsed());
112 static void action_compute(simgrid::xbt::ReplayAction& action)
114 CHECK_ACTION_PARAMS(action, 1, 0)
115 double clock = smpi_process()->simulated_elapsed();
116 double flops= parse_double(action[2]);
117 int my_proc_id = Actor::self()->getPid();
119 TRACE_smpi_computing_in(my_proc_id, flops);
120 smpi_execute_flops(flops);
121 TRACE_smpi_computing_out(my_proc_id);
123 log_timed_action (action, clock);
126 static void action_send(simgrid::xbt::ReplayAction& action)
128 CHECK_ACTION_PARAMS(action, 2, 1)
129 int to = std::stoi(action[2]);
130 double size=parse_double(action[3]);
131 double clock = smpi_process()->simulated_elapsed();
133 MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 4) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
135 int my_proc_id = Actor::self()->getPid();
136 int dst_traced = MPI_COMM_WORLD->group()->actor(to)->getPid();
138 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
139 new simgrid::instr::Pt2PtTIData("send", to, size, Datatype::encode(MPI_CURRENT_TYPE)));
140 if (not TRACE_smpi_view_internals())
141 TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, size * MPI_CURRENT_TYPE->size());
143 Request::send(nullptr, size, MPI_CURRENT_TYPE, to , 0, MPI_COMM_WORLD);
145 TRACE_smpi_comm_out(my_proc_id);
147 log_timed_action(action, clock);
150 static void action_Isend(simgrid::xbt::ReplayAction& action)
152 CHECK_ACTION_PARAMS(action, 2, 1)
153 int to = std::stoi(action[2]);
154 double size=parse_double(action[3]);
155 double clock = smpi_process()->simulated_elapsed();
157 MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 4) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
159 int my_proc_id = Actor::self()->getPid();
160 int dst_traced = MPI_COMM_WORLD->group()->actor(to)->getPid();
161 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
162 new simgrid::instr::Pt2PtTIData("Isend", to, size, Datatype::encode(MPI_CURRENT_TYPE)));
163 if (not TRACE_smpi_view_internals())
164 TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, size * MPI_CURRENT_TYPE->size());
166 MPI_Request request = Request::isend(nullptr, size, MPI_CURRENT_TYPE, to, 0, MPI_COMM_WORLD);
168 TRACE_smpi_comm_out(my_proc_id);
170 get_reqq_self()->push_back(request);
172 log_timed_action (action, clock);
175 static void action_recv(simgrid::xbt::ReplayAction& action)
177 CHECK_ACTION_PARAMS(action, 2, 1)
178 int from = std::stoi(action[2]);
179 double size=parse_double(action[3]);
180 double clock = smpi_process()->simulated_elapsed();
183 MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 4) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
185 int my_proc_id = Actor::self()->getPid();
186 int src_traced = MPI_COMM_WORLD->group()->actor(from)->getPid();
188 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
189 new simgrid::instr::Pt2PtTIData("recv", from, size, Datatype::encode(MPI_CURRENT_TYPE)));
191 //unknown size from the receiver point of view
193 Request::probe(from, 0, MPI_COMM_WORLD, &status);
197 Request::recv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD, &status);
199 TRACE_smpi_comm_out(my_proc_id);
200 if (not TRACE_smpi_view_internals()) {
201 TRACE_smpi_recv(src_traced, my_proc_id, 0);
204 log_timed_action (action, clock);
207 static void action_Irecv(simgrid::xbt::ReplayAction& action)
209 CHECK_ACTION_PARAMS(action, 2, 1)
210 int from = std::stoi(action[2]);
211 double size=parse_double(action[3]);
212 double clock = smpi_process()->simulated_elapsed();
214 MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 4) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
216 int my_proc_id = Actor::self()->getPid();
217 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
218 new simgrid::instr::Pt2PtTIData("Irecv", from, size, Datatype::encode(MPI_CURRENT_TYPE)));
220 //unknow size from the receiver pov
222 Request::probe(from, 0, MPI_COMM_WORLD, &status);
226 MPI_Request request = Request::irecv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD);
228 TRACE_smpi_comm_out(my_proc_id);
229 get_reqq_self()->push_back(request);
231 log_timed_action (action, clock);
234 static void action_test(simgrid::xbt::ReplayAction& action)
236 CHECK_ACTION_PARAMS(action, 0, 0)
237 double clock = smpi_process()->simulated_elapsed();
240 MPI_Request request = get_reqq_self()->back();
241 get_reqq_self()->pop_back();
242 //if request is null here, this may mean that a previous test has succeeded
243 //Different times in traced application and replayed version may lead to this
244 //In this case, ignore the extra calls.
245 if(request!=nullptr){
246 int my_proc_id = Actor::self()->getPid();
247 TRACE_smpi_testing_in(my_proc_id);
249 int flag = Request::test(&request, &status);
251 XBT_DEBUG("MPI_Test result: %d", flag);
252 /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now nullptr.*/
253 get_reqq_self()->push_back(request);
255 TRACE_smpi_testing_out(my_proc_id);
257 log_timed_action (action, clock);
260 static void action_wait(simgrid::xbt::ReplayAction& action)
262 CHECK_ACTION_PARAMS(action, 0, 0)
263 double clock = smpi_process()->simulated_elapsed();
266 std::string s = boost::algorithm::join(action, " ");
267 xbt_assert(get_reqq_self()->size(), "action wait not preceded by any irecv or isend: %s", s.c_str());
268 MPI_Request request = get_reqq_self()->back();
269 get_reqq_self()->pop_back();
271 if (request==nullptr){
272 /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just return.*/
276 int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
278 MPI_Group group = request->comm()->group();
279 int src_traced = group->rank(request->src());
280 int dst_traced = group->rank(request->dst());
281 int is_wait_for_receive = (request->flags() & RECV);
282 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::NoOpTIData("wait"));
284 Request::wait(&request, &status);
286 TRACE_smpi_comm_out(rank);
287 if (is_wait_for_receive)
288 TRACE_smpi_recv(src_traced, dst_traced, 0);
289 log_timed_action (action, clock);
292 static void action_waitall(simgrid::xbt::ReplayAction& action)
294 CHECK_ACTION_PARAMS(action, 0, 0)
295 double clock = smpi_process()->simulated_elapsed();
296 const unsigned int count_requests = get_reqq_self()->size();
298 if (count_requests>0) {
299 MPI_Status status[count_requests];
301 int my_proc_id_traced = Actor::self()->getPid();
302 TRACE_smpi_comm_in(my_proc_id_traced, __FUNCTION__,
303 new simgrid::instr::Pt2PtTIData("waitAll", -1, count_requests, ""));
304 int recvs_snd[count_requests];
305 int recvs_rcv[count_requests];
306 for (unsigned int i = 0; i < count_requests; i++) {
307 const auto& req = (*get_reqq_self())[i];
308 if (req && (req->flags() & RECV)) {
309 recvs_snd[i] = req->src();
310 recvs_rcv[i] = req->dst();
314 Request::waitall(count_requests, &(*get_reqq_self())[0], status);
316 for (unsigned i = 0; i < count_requests; i++) {
317 if (recvs_snd[i]!=-100)
318 TRACE_smpi_recv(recvs_snd[i], recvs_rcv[i],0);
320 TRACE_smpi_comm_out(my_proc_id_traced);
322 log_timed_action (action, clock);
325 static void action_barrier(simgrid::xbt::ReplayAction& action)
327 double clock = smpi_process()->simulated_elapsed();
328 int my_proc_id = Actor::self()->getPid();
329 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::NoOpTIData("barrier"));
331 Colls::barrier(MPI_COMM_WORLD);
333 TRACE_smpi_comm_out(my_proc_id);
334 log_timed_action (action, clock);
337 static void action_bcast(simgrid::xbt::ReplayAction& action)
339 CHECK_ACTION_PARAMS(action, 1, 2)
340 double size = parse_double(action[2]);
341 double clock = smpi_process()->simulated_elapsed();
342 int root = (action.size() > 3) ? std::stoi(action[3]) : 0;
343 /* Initialize MPI_CURRENT_TYPE in order to decrease the number of the checks */
344 MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 4) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
346 int my_proc_id = Actor::self()->getPid();
347 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
348 new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(root)->getPid(), -1.0, size,
349 -1, Datatype::encode(MPI_CURRENT_TYPE), ""));
351 void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
353 Colls::bcast(sendbuf, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
355 TRACE_smpi_comm_out(my_proc_id);
356 log_timed_action (action, clock);
359 static void action_reduce(simgrid::xbt::ReplayAction& action)
361 CHECK_ACTION_PARAMS(action, 2, 2)
362 double comm_size = parse_double(action[2]);
363 double comp_size = parse_double(action[3]);
364 double clock = smpi_process()->simulated_elapsed();
365 int root = (action.size() > 4) ? std::stoi(action[4]) : 0;
367 MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 5) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE;
369 int my_proc_id = Actor::self()->getPid();
370 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
371 new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(root)->getPid(), comp_size,
372 comm_size, -1, Datatype::encode(MPI_CURRENT_TYPE), ""));
374 void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
375 void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
376 Colls::reduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
377 smpi_execute_flops(comp_size);
379 TRACE_smpi_comm_out(my_proc_id);
380 log_timed_action (action, clock);
383 static void action_allReduce(simgrid::xbt::ReplayAction& action)
385 CHECK_ACTION_PARAMS(action, 2, 1)
386 double comm_size = parse_double(action[2]);
387 double comp_size = parse_double(action[3]);
389 MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 4) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
391 double clock = smpi_process()->simulated_elapsed();
392 int my_proc_id = Actor::self()->getPid();
393 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::CollTIData("allReduce", -1, comp_size, comm_size, -1,
394 Datatype::encode(MPI_CURRENT_TYPE), ""));
396 void *recvbuf = smpi_get_tmp_recvbuffer(comm_size* MPI_CURRENT_TYPE->size());
397 void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
398 Colls::allreduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
399 smpi_execute_flops(comp_size);
401 TRACE_smpi_comm_out(my_proc_id);
402 log_timed_action (action, clock);
405 static void action_allToAll(simgrid::xbt::ReplayAction& action)
407 CHECK_ACTION_PARAMS(action, 2, 2) //two mandatory (send and recv volumes) and two optional (corresponding datatypes)
408 double clock = smpi_process()->simulated_elapsed();
409 unsigned long comm_size = MPI_COMM_WORLD->size();
410 int send_size = parse_double(action[2]);
411 int recv_size = parse_double(action[3]);
412 MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 5) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE};
413 MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE};
415 void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* MPI_CURRENT_TYPE->size());
416 void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
418 int my_proc_id = Actor::self()->getPid();
419 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
420 new simgrid::instr::CollTIData("allToAll", -1, -1.0, send_size, recv_size,
421 Datatype::encode(MPI_CURRENT_TYPE),
422 Datatype::encode(MPI_CURRENT_TYPE2)));
424 Colls::alltoall(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
426 TRACE_smpi_comm_out(my_proc_id);
427 log_timed_action (action, clock);
430 static void action_gather(simgrid::xbt::ReplayAction& action)
432 /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
435 1) 68 is the sendcounts
436 2) 68 is the recvcounts
437 3) 0 is the root node
438 4) 0 is the send datatype id, see decode_datatype()
439 5) 0 is the recv datatype id, see decode_datatype()
441 CHECK_ACTION_PARAMS(action, 2, 3)
442 double clock = smpi_process()->simulated_elapsed();
443 unsigned long comm_size = MPI_COMM_WORLD->size();
444 int send_size = parse_double(action[2]);
445 int recv_size = parse_double(action[3]);
446 MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 6) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE};
447 MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 6) ? decode_datatype(action[6]) : MPI_DEFAULT_TYPE};
449 void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
450 void *recv = nullptr;
451 int root = (action.size() > 4) ? std::stoi(action[4]) : 0;
452 int rank = MPI_COMM_WORLD->rank();
455 recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
457 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
458 Datatype::encode(MPI_CURRENT_TYPE),
459 Datatype::encode(MPI_CURRENT_TYPE2)));
461 Colls::gather(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
463 TRACE_smpi_comm_out(Actor::self()->getPid());
464 log_timed_action (action, clock);
467 static void action_scatter(simgrid::xbt::ReplayAction& action)
469 /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
472 1) 68 is the sendcounts
473 2) 68 is the recvcounts
474 3) 0 is the root node
475 4) 0 is the send datatype id, see decode_datatype()
476 5) 0 is the recv datatype id, see decode_datatype()
478 CHECK_ACTION_PARAMS(action, 2, 3)
479 double clock = smpi_process()->simulated_elapsed();
480 unsigned long comm_size = MPI_COMM_WORLD->size();
481 int send_size = parse_double(action[2]);
482 int recv_size = parse_double(action[3]);
483 MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 6) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE};
484 MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 6) ? decode_datatype(action[6]) : MPI_DEFAULT_TYPE};
486 void* send = smpi_get_tmp_sendbuffer(send_size * MPI_CURRENT_TYPE->size());
487 void* recv = nullptr;
488 int root = (action.size() > 4) ? std::stoi(action[4]) : 0;
489 int rank = MPI_COMM_WORLD->rank();
492 recv = smpi_get_tmp_recvbuffer(recv_size * comm_size * MPI_CURRENT_TYPE2->size());
494 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
495 Datatype::encode(MPI_CURRENT_TYPE),
496 Datatype::encode(MPI_CURRENT_TYPE2)));
498 Colls::scatter(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
500 TRACE_smpi_comm_out(Actor::self()->getPid());
501 log_timed_action(action, clock);
504 static void action_gatherv(simgrid::xbt::ReplayAction& action)
506 /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
507 0 gather 68 68 10 10 10 0 0 0
509 1) 68 is the sendcount
510 2) 68 10 10 10 is the recvcounts
511 3) 0 is the root node
512 4) 0 is the send datatype id, see decode_datatype()
513 5) 0 is the recv datatype id, see decode_datatype()
515 double clock = smpi_process()->simulated_elapsed();
516 unsigned long comm_size = MPI_COMM_WORLD->size();
517 CHECK_ACTION_PARAMS(action, comm_size+1, 2)
518 int send_size = parse_double(action[2]);
519 std::vector<int> disps(comm_size, 0);
520 std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
522 MPI_Datatype MPI_CURRENT_TYPE =
523 (action.size() > 5 + comm_size) ? decode_datatype(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
524 MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5 + comm_size) ? decode_datatype(action[5 + comm_size])
527 void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
528 void *recv = nullptr;
529 for (unsigned int i = 0; i < comm_size; i++) {
530 (*recvcounts)[i] = std::stoi(action[i + 3]);
532 int recv_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
534 int root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
535 int rank = MPI_COMM_WORLD->rank();
538 recv = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
540 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData(
541 "gatherV", root, send_size, nullptr, -1, recvcounts,
542 Datatype::encode(MPI_CURRENT_TYPE), Datatype::encode(MPI_CURRENT_TYPE2)));
544 Colls::gatherv(send, send_size, MPI_CURRENT_TYPE, recv, recvcounts->data(), disps.data(), MPI_CURRENT_TYPE2, root,
547 TRACE_smpi_comm_out(Actor::self()->getPid());
548 log_timed_action (action, clock);
551 static void action_scatterv(simgrid::xbt::ReplayAction& action)
553 /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
554 0 gather 68 10 10 10 68 0 0 0
556 1) 68 10 10 10 is the sendcounts
557 2) 68 is the recvcount
558 3) 0 is the root node
559 4) 0 is the send datatype id, see decode_datatype()
560 5) 0 is the recv datatype id, see decode_datatype()
562 double clock = smpi_process()->simulated_elapsed();
563 unsigned long comm_size = MPI_COMM_WORLD->size();
564 CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
565 int recv_size = parse_double(action[2 + comm_size]);
566 std::vector<int> disps(comm_size, 0);
567 std::shared_ptr<std::vector<int>> sendcounts(new std::vector<int>(comm_size));
569 MPI_Datatype MPI_CURRENT_TYPE =
570 (action.size() > 5 + comm_size) ? decode_datatype(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
571 MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5 + comm_size) ? decode_datatype(action[5 + comm_size])
574 void* send = nullptr;
575 void* recv = smpi_get_tmp_recvbuffer(recv_size * MPI_CURRENT_TYPE->size());
576 for (unsigned int i = 0; i < comm_size; i++) {
577 (*sendcounts)[i] = std::stoi(action[i + 2]);
579 int send_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
581 int root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
582 int rank = MPI_COMM_WORLD->rank();
585 send = smpi_get_tmp_sendbuffer(send_sum * MPI_CURRENT_TYPE2->size());
587 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData("gatherV", root, -1, sendcounts, recv_size,
588 nullptr, Datatype::encode(MPI_CURRENT_TYPE),
589 Datatype::encode(MPI_CURRENT_TYPE2)));
591 Colls::scatterv(send, sendcounts->data(), disps.data(), MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root,
594 TRACE_smpi_comm_out(Actor::self()->getPid());
595 log_timed_action(action, clock);
598 static void action_reducescatter(simgrid::xbt::ReplayAction& action)
600 /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
601 0 reduceScatter 275427 275427 275427 204020 11346849 0
603 1) The first four values after the name of the action declare the recvcounts array
604 2) The value 11346849 is the amount of instructions
605 3) The last value corresponds to the datatype, see decode_datatype().
607 double clock = smpi_process()->simulated_elapsed();
608 unsigned long comm_size = MPI_COMM_WORLD->size();
609 CHECK_ACTION_PARAMS(action, comm_size+1, 1)
610 int comp_size = parse_double(action[2+comm_size]);
611 int my_proc_id = Actor::self()->getPid();
612 std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>);
613 MPI_Datatype MPI_CURRENT_TYPE =
614 (action.size() > 3 + comm_size) ? decode_datatype(action[3 + comm_size]) : MPI_DEFAULT_TYPE;
616 for (unsigned int i = 0; i < comm_size; i++) {
617 recvcounts->push_back(std::stoi(action[i + 2]));
619 int size{std::accumulate(recvcounts->begin(), recvcounts->end(), 0)};
621 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
622 new simgrid::instr::VarCollTIData("reduceScatter", -1, 0, nullptr, -1, recvcounts,
623 std::to_string(comp_size), /* ugly hack to print comp_size */
624 Datatype::encode(MPI_CURRENT_TYPE)));
626 void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
627 void *recvbuf = smpi_get_tmp_recvbuffer(size* MPI_CURRENT_TYPE->size());
629 Colls::reduce_scatter(sendbuf, recvbuf, recvcounts->data(), MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
630 smpi_execute_flops(comp_size);
632 TRACE_smpi_comm_out(my_proc_id);
633 log_timed_action (action, clock);
636 static void action_allgather(simgrid::xbt::ReplayAction& action)
638 /* The structure of the allgather action for the rank 0 (total 4 processes) is the following:
639 0 allGather 275427 275427
641 1) 275427 is the sendcount
642 2) 275427 is the recvcount
643 3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().
645 double clock = smpi_process()->simulated_elapsed();
647 CHECK_ACTION_PARAMS(action, 2, 2)
648 int sendcount = std::stoi(action[2]);
649 int recvcount = std::stoi(action[3]);
651 MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 5) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE};
652 MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE};
654 void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
655 void *recvbuf = smpi_get_tmp_recvbuffer(recvcount* MPI_CURRENT_TYPE2->size());
657 int my_proc_id = Actor::self()->getPid();
659 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
660 new simgrid::instr::CollTIData("allGather", -1, -1.0, sendcount, recvcount,
661 Datatype::encode(MPI_CURRENT_TYPE),
662 Datatype::encode(MPI_CURRENT_TYPE2)));
664 Colls::allgather(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcount, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
666 TRACE_smpi_comm_out(my_proc_id);
667 log_timed_action (action, clock);
670 static void action_allgatherv(simgrid::xbt::ReplayAction& action)
672 /* The structure of the allgatherv action for the rank 0 (total 4 processes) is the following:
673 0 allGatherV 275427 275427 275427 275427 204020
675 1) 275427 is the sendcount
676 2) The next four elements declare the recvcounts array
677 3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().
679 double clock = smpi_process()->simulated_elapsed();
681 unsigned long comm_size = MPI_COMM_WORLD->size();
682 CHECK_ACTION_PARAMS(action, comm_size+1, 2)
683 int sendcount = std::stoi(action[2]);
684 std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
685 std::vector<int> disps(comm_size, 0);
687 int datatype_index = 0, disp_index = 0;
688 if (action.size() > 3 + 2 * comm_size) { /* datatype + disp are specified */
689 datatype_index = 3 + comm_size;
690 disp_index = datatype_index + 1;
691 } else if (action.size() > 3 + 2 * comm_size) { /* disps specified; datatype is not specified; use the default one */
693 disp_index = 3 + comm_size;
694 } else if (action.size() > 3 + comm_size) { /* only datatype, no disp specified */
695 datatype_index = 3 + comm_size;
698 if (disp_index != 0) {
699 for (unsigned int i = 0; i < comm_size; i++)
700 disps[i] = std::stoi(action[disp_index + i]);
703 MPI_Datatype MPI_CURRENT_TYPE{(datatype_index > 0) ? decode_datatype(action[datatype_index]) : MPI_DEFAULT_TYPE};
704 MPI_Datatype MPI_CURRENT_TYPE2{(datatype_index > 0) ? decode_datatype(action[datatype_index]) : MPI_DEFAULT_TYPE};
706 void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
708 for (unsigned int i = 0; i < comm_size; i++) {
709 (*recvcounts)[i] = std::stoi(action[i + 3]);
711 int recv_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
712 void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
714 int my_proc_id = Actor::self()->getPid();
716 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
717 new simgrid::instr::VarCollTIData("allGatherV", -1, sendcount, nullptr, -1, recvcounts,
718 Datatype::encode(MPI_CURRENT_TYPE),
719 Datatype::encode(MPI_CURRENT_TYPE2)));
721 Colls::allgatherv(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts->data(), disps.data(), MPI_CURRENT_TYPE2,
724 TRACE_smpi_comm_out(my_proc_id);
725 log_timed_action (action, clock);
728 static void action_allToAllv(simgrid::xbt::ReplayAction& action)
730 /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
731 0 allToAllV 100 1 7 10 12 100 1 70 10 5
733 1) 100 is the size of the send buffer *sizeof(int),
734 2) 1 7 10 12 is the sendcounts array
735 3) 100*sizeof(int) is the size of the receiver buffer
736 4) 1 70 10 5 is the recvcounts array
738 double clock = smpi_process()->simulated_elapsed();
740 unsigned long comm_size = MPI_COMM_WORLD->size();
741 CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
742 std::shared_ptr<std::vector<int>> sendcounts(new std::vector<int>(comm_size));
743 std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
744 std::vector<int> senddisps(comm_size, 0);
745 std::vector<int> recvdisps(comm_size, 0);
747 MPI_Datatype MPI_CURRENT_TYPE =
748 (action.size() > 5 + 2 * comm_size) ? decode_datatype(action[4 + 2 * comm_size]) : MPI_DEFAULT_TYPE;
749 MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5 + 2 * comm_size) ? decode_datatype(action[5 + 2 * comm_size])
752 int send_buf_size=parse_double(action[2]);
753 int recv_buf_size=parse_double(action[3+comm_size]);
754 int my_proc_id = Actor::self()->getPid();
755 void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* MPI_CURRENT_TYPE->size());
756 void *recvbuf = smpi_get_tmp_recvbuffer(recv_buf_size* MPI_CURRENT_TYPE2->size());
758 for (unsigned int i = 0; i < comm_size; i++) {
759 (*sendcounts)[i] = std::stoi(action[3 + i]);
760 (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
762 int send_size = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
763 int recv_size = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
765 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
766 new simgrid::instr::VarCollTIData("allToAllV", -1, send_size, sendcounts, recv_size, recvcounts,
767 Datatype::encode(MPI_CURRENT_TYPE),
768 Datatype::encode(MPI_CURRENT_TYPE2)));
770 Colls::alltoallv(sendbuf, sendcounts->data(), senddisps.data(), MPI_CURRENT_TYPE, recvbuf, recvcounts->data(),
771 recvdisps.data(), MPI_CURRENT_TYPE, MPI_COMM_WORLD);
773 TRACE_smpi_comm_out(my_proc_id);
774 log_timed_action (action, clock);
777 }} // namespace simgrid::smpi
779 /** @brief Only initialize the replay, don't do it for real */
780 void smpi_replay_init(int* argc, char*** argv)
782 simgrid::smpi::Process::init(argc, argv);
783 smpi_process()->mark_as_initialized();
784 smpi_process()->set_replaying(true);
786 int my_proc_id = Actor::self()->getPid();
787 TRACE_smpi_init(my_proc_id);
788 TRACE_smpi_computing_init(my_proc_id);
789 TRACE_smpi_comm_in(my_proc_id, "smpi_replay_run_init", new simgrid::instr::NoOpTIData("init"));
790 TRACE_smpi_comm_out(my_proc_id);
791 xbt_replay_action_register("init", simgrid::smpi::action_init);
792 xbt_replay_action_register("finalize", simgrid::smpi::action_finalize);
793 xbt_replay_action_register("comm_size", simgrid::smpi::action_comm_size);
794 xbt_replay_action_register("comm_split", simgrid::smpi::action_comm_split);
795 xbt_replay_action_register("comm_dup", simgrid::smpi::action_comm_dup);
796 xbt_replay_action_register("send", simgrid::smpi::action_send);
797 xbt_replay_action_register("Isend", simgrid::smpi::action_Isend);
798 xbt_replay_action_register("recv", simgrid::smpi::action_recv);
799 xbt_replay_action_register("Irecv", simgrid::smpi::action_Irecv);
800 xbt_replay_action_register("test", simgrid::smpi::action_test);
801 xbt_replay_action_register("wait", simgrid::smpi::action_wait);
802 xbt_replay_action_register("waitAll", simgrid::smpi::action_waitall);
803 xbt_replay_action_register("barrier", simgrid::smpi::action_barrier);
804 xbt_replay_action_register("bcast", simgrid::smpi::action_bcast);
805 xbt_replay_action_register("reduce", simgrid::smpi::action_reduce);
806 xbt_replay_action_register("allReduce", simgrid::smpi::action_allReduce);
807 xbt_replay_action_register("allToAll", simgrid::smpi::action_allToAll);
808 xbt_replay_action_register("allToAllV", simgrid::smpi::action_allToAllv);
809 xbt_replay_action_register("gather", simgrid::smpi::action_gather);
810 xbt_replay_action_register("scatter", simgrid::smpi::action_scatter);
811 xbt_replay_action_register("gatherV", simgrid::smpi::action_gatherv);
812 xbt_replay_action_register("scatterV", simgrid::smpi::action_scatterv);
813 xbt_replay_action_register("allGather", simgrid::smpi::action_allgather);
814 xbt_replay_action_register("allGatherV", simgrid::smpi::action_allgatherv);
815 xbt_replay_action_register("reduceScatter", simgrid::smpi::action_reducescatter);
816 xbt_replay_action_register("compute", simgrid::smpi::action_compute);
818 //if we have a delayed start, sleep here.
820 double value = xbt_str_parse_double((*argv)[2], "%s is not a double");
821 XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
822 smpi_execute_flops(value);
824 //UGLY: force a context switch to be sure that all MSG_processes begin initialization
825 XBT_DEBUG("Force context switch by smpi_execute_flops - Sleeping for 0.0 flops ");
826 smpi_execute_flops(0.0);
830 /** @brief actually run the replay after initialization */
831 void smpi_replay_main(int* argc, char*** argv)
833 simgrid::xbt::replay_runner(*argc, *argv);
835 /* and now, finalize everything */
836 /* One active process will stop. Decrease the counter*/
837 XBT_DEBUG("There are %zu elements in reqq[*]", get_reqq_self()->size());
838 if (not get_reqq_self()->empty()) {
839 unsigned int count_requests=get_reqq_self()->size();
840 MPI_Request requests[count_requests];
841 MPI_Status status[count_requests];
844 for (auto const& req : *get_reqq_self()) {
848 simgrid::smpi::Request::waitall(count_requests, requests, status);
850 delete get_reqq_self();
853 if(active_processes==0){
854 /* Last process alive speaking: end the simulated timer */
855 XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
856 smpi_free_replay_tmp_buffers();
859 TRACE_smpi_comm_in(Actor::self()->getPid(), "smpi_replay_run_finalize", new simgrid::instr::NoOpTIData("finalize"));
861 smpi_process()->finalize();
863 TRACE_smpi_comm_out(Actor::self()->getPid());
864 TRACE_smpi_finalize(Actor::self()->getPid());
867 /** @brief chain a replay initialization and a replay start */
868 void smpi_replay_run(int* argc, char*** argv)
870 smpi_replay_init(argc, argv);
871 smpi_replay_main(argc, argv);