1 /* Copyright (c) 2009-2017. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
7 #include "smpi_coll.hpp"
8 #include "smpi_comm.hpp"
9 #include "smpi_datatype.hpp"
10 #include "smpi_group.hpp"
11 #include "smpi_process.hpp"
12 #include "smpi_request.hpp"
13 #include "xbt/replay.hpp"
17 #include <unordered_map>
20 using simgrid::s4u::Actor;
22 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
24 static int communicator_size = 0;
25 static int active_processes = 0;
26 static std::unordered_map<int, std::vector<MPI_Request>*> reqq;
28 static MPI_Datatype MPI_DEFAULT_TYPE;
30 #define CHECK_ACTION_PARAMS(action, mandatory, optional) {\
32 while(action[i]!=nullptr)\
35 THROWF(arg_error, 0, "%s replay failed.\n" \
36 "%d items were given on the line. First two should be process_id and action. " \
37 "This action needs after them %d mandatory arguments, and accepts %d optional ones. \n" \
38 "Please contact the Simgrid team if support is needed", __FUNCTION__, i, mandatory, optional);\
41 class ReplayActionArg {
45 static void log_timed_action (const char *const *action, double clock){
46 if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
47 char *name = xbt_str_join_array(action, " ");
48 XBT_VERB("%s %f", name, smpi_process()->simulated_elapsed()-clock);
53 static std::vector<MPI_Request>* get_reqq_self()
55 return reqq.at(Actor::self()->getPid());
58 static void set_reqq_self(std::vector<MPI_Request> *mpi_request)
60 reqq.insert({Actor::self()->getPid(), mpi_request});
64 static double parse_double(const char *string)
67 double value = strtod(string, &endptr);
69 THROWF(unknown_error, 0, "%s is not a double", string);
74 //TODO: this logic should be moved inside the datatype class, to support all predefined types and get rid of is_replayable.
75 static MPI_Datatype decode_datatype(const char *const action)
77 return simgrid::smpi::Datatype::decode(action);
80 const char* encode_datatype(MPI_Datatype datatype)
82 if (datatype == nullptr) /* this can actually happen; it was observed in the scatter2 test */
85 return datatype->encode();
91 static void action_init(const char *const *action)
93 XBT_DEBUG("Initialize the counters");
94 CHECK_ACTION_PARAMS(action, 0, 1)
96 MPI_DEFAULT_TYPE = MPI_DOUBLE; // default MPE datatype
98 MPI_DEFAULT_TYPE = MPI_BYTE; // default TAU datatype
100 /* start a simulated timer */
101 smpi_process()->simulated_start();
102 /*initialize the number of active processes */
103 active_processes = smpi_process_count();
105 set_reqq_self(new std::vector<MPI_Request>);
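/* Sketch of a hypothetical trace header: each rank's trace begins with an "init" line such as
 * "0 init". The presence of an optional extra field selects MPI_DOUBLE (MPE traces) instead of
 * MPI_BYTE (TAU traces) as MPI_DEFAULT_TYPE, as the assignments above suggest. */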
108 static void action_finalize(const char *const *action)
113 static void action_comm_size(const char *const *action)
115 communicator_size = parse_double(action[2]);
116 log_timed_action (action, smpi_process()->simulated_elapsed());
119 static void action_comm_split(const char *const *action)
121 log_timed_action (action, smpi_process()->simulated_elapsed());
124 static void action_comm_dup(const char *const *action)
126 log_timed_action (action, smpi_process()->simulated_elapsed());
129 static void action_compute(const char *const *action)
131 CHECK_ACTION_PARAMS(action, 1, 0)
132 double clock = smpi_process()->simulated_elapsed();
133 double flops= parse_double(action[2]);
134 int my_proc_id = Actor::self()->getPid();
136 TRACE_smpi_computing_in(my_proc_id, flops);
137 smpi_execute_flops(flops);
138 TRACE_smpi_computing_out(my_proc_id);
140 log_timed_action (action, clock);
143 static void action_send(const char *const *action)
145 CHECK_ACTION_PARAMS(action, 2, 1)
146 int to = std::stoi(action[2]);
147 double size=parse_double(action[3]);
148 double clock = smpi_process()->simulated_elapsed();
150 MPI_Datatype MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
152 int my_proc_id = Actor::self()->getPid();
153 int dst_traced = MPI_COMM_WORLD->group()->actor(to)->getPid();
155 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
156 new simgrid::instr::Pt2PtTIData("send", to, size, MPI_CURRENT_TYPE->encode()));
157 if (not TRACE_smpi_view_internals())
158 TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, size * MPI_CURRENT_TYPE->size());
160 Request::send(nullptr, size, MPI_CURRENT_TYPE, to , 0, MPI_COMM_WORLD);
162 TRACE_smpi_comm_out(my_proc_id);
164 log_timed_action(action, clock);
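/* Illustrative trace line for this handler (values are made up): "0 send 1 1e6" makes rank 0 send
 * 1e6 elements of the default datatype to rank 1; a trailing field, e.g. "0 send 1 1e6 0",
 * selects another datatype through decode_datatype(). */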
167 static void action_Isend(const char *const *action)
169 CHECK_ACTION_PARAMS(action, 2, 1)
170 int to = std::stoi(action[2]);
171 double size=parse_double(action[3]);
172 double clock = smpi_process()->simulated_elapsed();
174 MPI_Datatype MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
176 int my_proc_id = Actor::self()->getPid();
177 int dst_traced = MPI_COMM_WORLD->group()->actor(to)->getPid();
178 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
179 new simgrid::instr::Pt2PtTIData("Isend", to, size, MPI_CURRENT_TYPE->encode()));
180 if (not TRACE_smpi_view_internals())
181 TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, size * MPI_CURRENT_TYPE->size());
183 MPI_Request request = Request::isend(nullptr, size, MPI_CURRENT_TYPE, to, 0, MPI_COMM_WORLD);
185 TRACE_smpi_comm_out(my_proc_id);
187 get_reqq_self()->push_back(request);
189 log_timed_action (action, clock);
192 static void action_recv(const char *const *action) {
193 CHECK_ACTION_PARAMS(action, 2, 1)
194 int from = std::stoi(action[2]);
195 double size=parse_double(action[3]);
196 double clock = smpi_process()->simulated_elapsed();
199 MPI_Datatype MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
201 int my_proc_id = Actor::self()->getPid();
202 int src_traced = MPI_COMM_WORLD->group()->actor(from)->getPid();
204 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
205 new simgrid::instr::Pt2PtTIData("recv", from, size, MPI_CURRENT_TYPE->encode()));
207 // unknown size from the receiver's point of view
209 Request::probe(from, 0, MPI_COMM_WORLD, &status);
213 Request::recv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD, &status);
215 TRACE_smpi_comm_out(my_proc_id);
216 if (not TRACE_smpi_view_internals()) {
217 TRACE_smpi_recv(src_traced, my_proc_id, 0);
220 log_timed_action (action, clock);
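/* Matching receive side, with the same made-up values: "1 recv 0 1e6" makes rank 1 receive 1e6
 * elements of the default datatype from rank 0; the probe above is needed because the actual size
 * is unknown on the receiver side. */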
223 static void action_Irecv(const char *const *action)
225 CHECK_ACTION_PARAMS(action, 2, 1)
226 int from = std::stoi(action[2]);
227 double size=parse_double(action[3]);
228 double clock = smpi_process()->simulated_elapsed();
230 MPI_Datatype MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
232 int my_proc_id = Actor::self()->getPid();
233 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
234 new simgrid::instr::Pt2PtTIData("Irecv", from, size, MPI_CURRENT_TYPE->encode()));
236 // unknown size from the receiver's point of view
238 Request::probe(from, 0, MPI_COMM_WORLD, &status);
242 MPI_Request request = Request::irecv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD);
244 TRACE_smpi_comm_out(my_proc_id);
245 get_reqq_self()->push_back(request);
247 log_timed_action (action, clock);
250 static void action_test(const char* const* action)
252 CHECK_ACTION_PARAMS(action, 0, 0)
253 double clock = smpi_process()->simulated_elapsed();
256 MPI_Request request = get_reqq_self()->back();
257 get_reqq_self()->pop_back();
258 // If the request is nullptr here, a previous test may already have completed it.
259 // Timing differences between the traced application and the replayed run may lead to this.
260 // In that case, ignore the extra calls.
261 if(request!=nullptr){
262 int my_proc_id = Actor::self()->getPid();
263 TRACE_smpi_testing_in(my_proc_id);
265 int flag = Request::test(&request, &status);
267 XBT_DEBUG("MPI_Test result: %d", flag);
268 /* Push the request back into the vector so a subsequent wait can catch it. If the test succeeded, the request is now nullptr. */
269 get_reqq_self()->push_back(request);
271 TRACE_smpi_testing_out(my_proc_id);
273 log_timed_action (action, clock);
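/* Hypothetical non-blocking sequence in a trace: "0 Isend 1 1e6" pushes a request into the
 * per-actor queue, "0 test" pops it, tests it and pushes it back (possibly as nullptr), and a
 * later "0 wait" finally completes it. */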
276 static void action_wait(const char *const *action){
277 CHECK_ACTION_PARAMS(action, 0, 0)
278 double clock = smpi_process()->simulated_elapsed();
281 xbt_assert(get_reqq_self()->size(), "action wait not preceded by any irecv or isend: %s",
282 xbt_str_join_array(action," "));
283 MPI_Request request = get_reqq_self()->back();
284 get_reqq_self()->pop_back();
286 if (request==nullptr){
287 /* Assume that the trace is well formed: the communication may already have been completed by an MPI_Test. Then just return. */
291 int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
293 MPI_Group group = request->comm()->group();
294 int src_traced = group->rank(request->src());
295 int dst_traced = group->rank(request->dst());
296 int is_wait_for_receive = (request->flags() & RECV);
297 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::NoOpTIData("wait"));
299 Request::wait(&request, &status);
301 TRACE_smpi_comm_out(rank);
302 if (is_wait_for_receive)
303 TRACE_smpi_recv(src_traced, dst_traced, 0);
304 log_timed_action (action, clock);
307 static void action_waitall(const char *const *action){
308 CHECK_ACTION_PARAMS(action, 0, 0)
309 double clock = smpi_process()->simulated_elapsed();
310 const unsigned int count_requests = get_reqq_self()->size();
312 if (count_requests>0) {
313 MPI_Status status[count_requests];
315 int my_proc_id_traced = Actor::self()->getPid();
316 TRACE_smpi_comm_in(my_proc_id_traced, __FUNCTION__,
317 new simgrid::instr::Pt2PtTIData("waitAll", -1, count_requests, ""));
318 int recvs_snd[count_requests];
319 int recvs_rcv[count_requests];
320 for (unsigned int i = 0; i < count_requests; i++) {
321 const auto& req = (*get_reqq_self())[i];
322 if (req && (req->flags() & RECV)) {
323 recvs_snd[i] = req->src();
324 recvs_rcv[i] = req->dst();
328 Request::waitall(count_requests, &(*get_reqq_self())[0], status);
330 for (unsigned i = 0; i < count_requests; i++) {
331 if (recvs_snd[i]!=-100)
332 TRACE_smpi_recv(recvs_snd[i], recvs_rcv[i],0);
334 TRACE_smpi_comm_out(my_proc_id_traced);
336 log_timed_action (action, clock);
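/* Hypothetical usage: several "Isend"/"Irecv" lines followed by a single "0 waitAll" line complete
 * every request still pending in this rank's queue at once. */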
339 static void action_barrier(const char *const *action){
340 double clock = smpi_process()->simulated_elapsed();
341 int my_proc_id = Actor::self()->getPid();
342 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::NoOpTIData("barrier"));
344 Colls::barrier(MPI_COMM_WORLD);
346 TRACE_smpi_comm_out(my_proc_id);
347 log_timed_action (action, clock);
350 static void action_bcast(const char *const *action)
352 CHECK_ACTION_PARAMS(action, 1, 2)
353 double size = parse_double(action[2]);
354 double clock = smpi_process()->simulated_elapsed();
355 int root = (action[3]) ? std::stoi(action[3]) : 0;
356 /* Initialize MPI_CURRENT_TYPE in order to decrease the number of checks */
357 MPI_Datatype MPI_CURRENT_TYPE = (action[3] && action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
359 int my_proc_id = Actor::self()->getPid();
360 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
361 new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(root)->getPid(), -1.0, size,
362 -1, MPI_CURRENT_TYPE->encode(), ""));
364 void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
366 Colls::bcast(sendbuf, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
368 TRACE_smpi_comm_out(my_proc_id);
369 log_timed_action (action, clock);
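/* Illustrative line (made-up size): "0 bcast 1e6" broadcasts 1e6 elements from root 0; optional
 * fields give the root and then the datatype id, e.g. "0 bcast 1e6 2 0". */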
372 static void action_reduce(const char *const *action)
374 CHECK_ACTION_PARAMS(action, 2, 2)
375 double comm_size = parse_double(action[2]);
376 double comp_size = parse_double(action[3]);
377 double clock = smpi_process()->simulated_elapsed();
378 int root = (action[4]) ? std::stoi(action[4]) : 0;
380 MPI_Datatype MPI_CURRENT_TYPE = (action[4] && action[5]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE;
382 int my_proc_id = Actor::self()->getPid();
383 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
384 new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(root)->getPid(), comp_size,
385 comm_size, -1, MPI_CURRENT_TYPE->encode(), ""));
387 void *recvbuf = smpi_get_tmp_recvbuffer(comm_size* MPI_CURRENT_TYPE->size());
388 void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
389 Colls::reduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
390 smpi_execute_flops(comp_size);
392 TRACE_smpi_comm_out(my_proc_id);
393 log_timed_action (action, clock);
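/* Illustrative line (made-up values): "0 reduce 1e6 1e9" reduces 1e6 elements towards root 0 and
 * then simulates 1e9 flops of local computation; optional fields give the root and the datatype id. */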
396 static void action_allReduce(const char *const *action) {
397 CHECK_ACTION_PARAMS(action, 2, 1)
398 double comm_size = parse_double(action[2]);
399 double comp_size = parse_double(action[3]);
401 MPI_Datatype MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
403 double clock = smpi_process()->simulated_elapsed();
404 int my_proc_id = Actor::self()->getPid();
405 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::CollTIData("allReduce", -1, comp_size, comm_size, -1,
406 MPI_CURRENT_TYPE->encode(), ""));
408 void *recvbuf = smpi_get_tmp_recvbuffer(comm_size* MPI_CURRENT_TYPE->size());
409 void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
410 Colls::allreduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
411 smpi_execute_flops(comp_size);
413 TRACE_smpi_comm_out(my_proc_id);
414 log_timed_action (action, clock);
417 static void action_allToAll(const char *const *action) {
418 CHECK_ACTION_PARAMS(action, 2, 2) //two mandatory (send and recv volumes) and two optional (corresponding datatypes)
419 double clock = smpi_process()->simulated_elapsed();
420 int comm_size = MPI_COMM_WORLD->size();
421 int send_size = parse_double(action[2]);
422 int recv_size = parse_double(action[3]);
423 MPI_Datatype MPI_CURRENT_TYPE = (action[4] && action[5]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
424 MPI_Datatype MPI_CURRENT_TYPE2{(action[4] && action[5]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE};
426 void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* MPI_CURRENT_TYPE->size());
427 void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
429 int my_proc_id = Actor::self()->getPid();
430 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
431 new simgrid::instr::CollTIData("allToAll", -1, -1.0, send_size, recv_size,
432 MPI_CURRENT_TYPE->encode(), MPI_CURRENT_TYPE2->encode()));
434 Colls::alltoall(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
436 TRACE_smpi_comm_out(my_proc_id);
437 log_timed_action (action, clock);
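/* Illustrative line (made-up values): "0 allToAll 1e5 1e5" exchanges 1e5 elements with every peer
 * in both directions; two optional trailing fields give the send and receive datatype ids. */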
440 static void action_gather(const char *const *action) {
441 /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
444 1) 68 is the sendcount
445 2) 68 is the recvcount
446 3) 0 is the root node
447 4) 0 is the send datatype id, see decode_datatype()
448 5) 0 is the recv datatype id, see decode_datatype()
450 CHECK_ACTION_PARAMS(action, 2, 3)
451 double clock = smpi_process()->simulated_elapsed();
452 int comm_size = MPI_COMM_WORLD->size();
453 int send_size = parse_double(action[2]);
454 int recv_size = parse_double(action[3]);
455 MPI_Datatype MPI_CURRENT_TYPE = (action[5] && action[6]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE;
456 MPI_Datatype MPI_CURRENT_TYPE2{(action[5] && action[6]) ? decode_datatype(action[6]) : MPI_DEFAULT_TYPE};
458 void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
459 void *recv = nullptr;
460 int root = (action[4]) ? std::stoi(action[4]) : 0;
461 int rank = MPI_COMM_WORLD->rank();
464 recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
466 TRACE_smpi_comm_in(rank, __FUNCTION__,
467 new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
468 MPI_CURRENT_TYPE->encode(), MPI_CURRENT_TYPE2->encode()));
470 Colls::gather(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
472 TRACE_smpi_comm_out(Actor::self()->getPid());
473 log_timed_action (action, clock);
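/* A hypothetical line matching the structure documented above: "0 gather 68 68 0 0 0", i.e. 68
 * elements sent by each rank, 68 received per rank at root 0, both datatype ids set to 0. */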
476 static void action_scatter(const char* const* action)
478 /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
481 1) 68 is the sendcount
482 2) 68 is the recvcount
483 3) 0 is the root node
484 4) 0 is the send datatype id, see decode_datatype()
485 5) 0 is the recv datatype id, see decode_datatype()
487 CHECK_ACTION_PARAMS(action, 2, 3)
488 double clock = smpi_process()->simulated_elapsed();
489 int comm_size = MPI_COMM_WORLD->size();
490 int send_size = parse_double(action[2]);
491 int recv_size = parse_double(action[3]);
492 MPI_Datatype MPI_CURRENT_TYPE = (action[5] && action[6]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE;
493 MPI_Datatype MPI_CURRENT_TYPE2{(action[5] && action[6]) ? decode_datatype(action[6]) : MPI_DEFAULT_TYPE};
495 void* send = smpi_get_tmp_sendbuffer(send_size * MPI_CURRENT_TYPE->size());
496 void* recv = nullptr;
497 int root = (action[4]) ? std::stoi(action[4]) : 0;
498 int rank = MPI_COMM_WORLD->rank();
501 recv = smpi_get_tmp_recvbuffer(recv_size * comm_size * MPI_CURRENT_TYPE2->size());
503 TRACE_smpi_comm_in(rank, __FUNCTION__,
504 new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
505 MPI_CURRENT_TYPE->encode(), MPI_CURRENT_TYPE2->encode()));
507 Colls::scatter(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
509 TRACE_smpi_comm_out(Actor::self()->getPid());
510 log_timed_action(action, clock);
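/* Analogous hypothetical line for this handler: "0 scatter 68 68 0 0 0", with the same fields as
 * for gather but the data flowing from the root towards the other ranks. */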
513 static void action_gatherv(const char *const *action) {
514 /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
515 0 gatherV 68 68 10 10 10 0 0 0
517 1) 68 is the sendcount
518 2) 68 10 10 10 is the recvcounts
519 3) 0 is the root node
520 4) 0 is the send datatype id, see decode_datatype()
521 5) 0 is the recv datatype id, see decode_datatype()
523 double clock = smpi_process()->simulated_elapsed();
524 int comm_size = MPI_COMM_WORLD->size();
525 CHECK_ACTION_PARAMS(action, comm_size+1, 2)
526 int send_size = parse_double(action[2]);
527 std::vector<int> disps(comm_size, 0);
528 std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
530 MPI_Datatype MPI_CURRENT_TYPE =
531 (action[4 + comm_size] && action[5 + comm_size]) ? decode_datatype(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
532 MPI_Datatype MPI_CURRENT_TYPE2{
533 (action[4 + comm_size] && action[5 + comm_size]) ? decode_datatype(action[5 + comm_size]) : MPI_DEFAULT_TYPE};
535 void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
536 void *recv = nullptr;
537 for(int i=0;i<comm_size;i++) {
538 (*recvcounts)[i] = std::stoi(action[i + 3]);
540 int recv_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
542 int root = (action[3 + comm_size]) ? std::stoi(action[3 + comm_size]) : 0;
543 int rank = MPI_COMM_WORLD->rank();
546 recv = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
548 TRACE_smpi_comm_in(rank, __FUNCTION__,
549 new simgrid::instr::VarCollTIData("gatherV", root, send_size, nullptr, -1, recvcounts,
550 MPI_CURRENT_TYPE->encode(), MPI_CURRENT_TYPE2->encode()));
552 Colls::gatherv(send, send_size, MPI_CURRENT_TYPE, recv, recvcounts->data(), disps.data(), MPI_CURRENT_TYPE2, root,
555 TRACE_smpi_comm_out(Actor::self()->getPid());
556 log_timed_action (action, clock);
559 static void action_scatterv(const char* const* action)
561 /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
562 0 scatterV 68 10 10 10 68 0 0 0
564 1) 68 10 10 10 is the sendcounts
565 2) 68 is the recvcount
566 3) 0 is the root node
567 4) 0 is the send datatype id, see decode_datatype()
568 5) 0 is the recv datatype id, see decode_datatype()
570 double clock = smpi_process()->simulated_elapsed();
571 int comm_size = MPI_COMM_WORLD->size();
572 CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
573 int recv_size = parse_double(action[2 + comm_size]);
574 std::vector<int> disps(comm_size, 0);
575 std::shared_ptr<std::vector<int>> sendcounts(new std::vector<int>(comm_size));
577 MPI_Datatype MPI_CURRENT_TYPE =
578 (action[4 + comm_size] && action[5 + comm_size]) ? decode_datatype(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
579 MPI_Datatype MPI_CURRENT_TYPE2{
580 (action[4 + comm_size] && action[5 + comm_size]) ? decode_datatype(action[5 + comm_size]) : MPI_DEFAULT_TYPE};
582 void* send = nullptr;
583 void* recv = smpi_get_tmp_recvbuffer(recv_size * MPI_CURRENT_TYPE->size());
584 for (int i = 0; i < comm_size; i++) {
585 (*sendcounts)[i] = std::stoi(action[i + 2]);
587 int send_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
589 int root = (action[3 + comm_size]) ? std::stoi(action[3 + comm_size]) : 0;
590 int rank = MPI_COMM_WORLD->rank();
593 send = smpi_get_tmp_sendbuffer(send_sum * MPI_CURRENT_TYPE2->size());
595 TRACE_smpi_comm_in(rank, __FUNCTION__,
596 new simgrid::instr::VarCollTIData("gatherV", root, -1, sendcounts, recv_size, nullptr,
597 MPI_CURRENT_TYPE->encode(), MPI_CURRENT_TYPE2->encode()));
599 Colls::scatterv(send, sendcounts->data(), disps.data(), MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root,
602 TRACE_smpi_comm_out(Actor::self()->getPid());
603 log_timed_action(action, clock);
606 static void action_reducescatter(const char *const *action) {
607 /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
608 0 reduceScatter 275427 275427 275427 204020 11346849 0
610 1) The first four values after the name of the action declare the recvcounts array
611 2) 11346849 is the number of instructions (flops) to compute
612 3) The last value corresponds to the datatype, see decode_datatype().
614 double clock = smpi_process()->simulated_elapsed();
615 int comm_size = MPI_COMM_WORLD->size();
616 CHECK_ACTION_PARAMS(action, comm_size+1, 1)
617 int comp_size = parse_double(action[2+comm_size]);
618 int my_proc_id = Actor::self()->getPid();
619 std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>);
620 MPI_Datatype MPI_CURRENT_TYPE = (action[3 + comm_size]) ? decode_datatype(action[3 + comm_size]) : MPI_DEFAULT_TYPE;
622 for(int i=0;i<comm_size;i++) {
623 recvcounts->push_back(std::stoi(action[i + 2]));
625 int size{std::accumulate(recvcounts->begin(), recvcounts->end(), 0)};
627 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
628 new simgrid::instr::VarCollTIData("reduceScatter", -1, 0, nullptr, -1, recvcounts,
629 std::to_string(comp_size), /* ugly hack to print comp_size */
630 MPI_CURRENT_TYPE->encode()));
632 void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
633 void *recvbuf = smpi_get_tmp_recvbuffer(size* MPI_CURRENT_TYPE->size());
635 Colls::reduce_scatter(sendbuf, recvbuf, recvcounts->data(), MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
636 smpi_execute_flops(comp_size);
638 TRACE_smpi_comm_out(my_proc_id);
639 log_timed_action (action, clock);
642 static void action_allgather(const char *const *action) {
643 /* The structure of the allgather action for the rank 0 (total 4 processes) is the following:
644 0 allGather 275427 275427
646 1) 275427 is the sendcount
647 2) 275427 is the recvcount
648 3) No more values means that the send and receive datatypes are the default one, see decode_datatype().
650 double clock = smpi_process()->simulated_elapsed();
652 CHECK_ACTION_PARAMS(action, 2, 2)
653 int sendcount = std::stoi(action[2]);
654 int recvcount = std::stoi(action[3]);
656 MPI_Datatype MPI_CURRENT_TYPE = (action[4] && action[5]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
657 MPI_Datatype MPI_CURRENT_TYPE2{(action[4] && action[5]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE};
659 void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
660 void *recvbuf = smpi_get_tmp_recvbuffer(recvcount* MPI_CURRENT_TYPE2->size());
662 int my_proc_id = Actor::self()->getPid();
664 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
665 new simgrid::instr::CollTIData("allGather", -1, -1.0, sendcount, recvcount,
666 MPI_CURRENT_TYPE->encode(), MPI_CURRENT_TYPE2->encode()));
668 Colls::allgather(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcount, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
670 TRACE_smpi_comm_out(my_proc_id);
671 log_timed_action (action, clock);
674 static void action_allgatherv(const char *const *action) {
675 /* The structure of the allgatherv action for the rank 0 (total 4 processes) is the following:
676 0 allGatherV 275427 275427 275427 275427 204020
678 1) 275427 is the sendcount
679 2) The next four elements declare the recvcounts array
680 3) No more values means that the send and receive datatypes are the default one, see decode_datatype().
682 double clock = smpi_process()->simulated_elapsed();
684 int comm_size = MPI_COMM_WORLD->size();
685 CHECK_ACTION_PARAMS(action, comm_size+1, 2)
686 int sendcount = std::stoi(action[2]);
687 std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
688 std::vector<int> disps(comm_size, 0);
690 int datatype_index = 0, disp_index = 0;
691 if (action[3 + 2 * comm_size]) { /* datatype + disp are specified */
692 datatype_index = 3 + comm_size;
693 disp_index = datatype_index + 1;
694 } else if (action[2 + 2 * comm_size]) { /* disps specified; datatype is not specified; use the default one */
696 disp_index = 3 + comm_size;
697 } else if (action[3 + comm_size]) { /* only datatype, no disp specified */
698 datatype_index = 3 + comm_size;
701 if (disp_index != 0) {
702 for (int i = 0; i < comm_size; i++) disps[i] = std::stoi(action[disp_index + i]);
705 MPI_Datatype MPI_CURRENT_TYPE{(datatype_index > 0) ? decode_datatype(action[datatype_index]) : MPI_DEFAULT_TYPE};
706 MPI_Datatype MPI_CURRENT_TYPE2{(datatype_index > 0) ? decode_datatype(action[datatype_index]) : MPI_DEFAULT_TYPE};
708 void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
710 for(int i=0;i<comm_size;i++) {
711 (*recvcounts)[i] = std::stoi(action[i + 3]);
713 int recv_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
714 void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
716 int my_proc_id = Actor::self()->getPid();
718 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
719 new simgrid::instr::VarCollTIData("allGatherV", -1, sendcount, nullptr, -1, recvcounts,
720 MPI_CURRENT_TYPE->encode(), MPI_CURRENT_TYPE2->encode()));
722 Colls::allgatherv(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts->data(), disps.data(), MPI_CURRENT_TYPE2,
725 TRACE_smpi_comm_out(my_proc_id);
726 log_timed_action (action, clock);
729 static void action_allToAllv(const char *const *action) {
730 /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
731 0 allToAllV 100 1 7 10 12 100 1 70 10 5
733 1) 100 is the size of the send buffer *sizeof(int),
734 2) 1 7 10 12 is the sendcounts array
735 3) 100*sizeof(int) is the size of the receive buffer
736 4) 1 70 10 5 is the recvcounts array
738 double clock = smpi_process()->simulated_elapsed();
740 int comm_size = MPI_COMM_WORLD->size();
741 CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
742 std::shared_ptr<std::vector<int>> sendcounts(new std::vector<int>(comm_size));
743 std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
744 std::vector<int> senddisps(comm_size, 0);
745 std::vector<int> recvdisps(comm_size, 0);
747 MPI_Datatype MPI_CURRENT_TYPE = (action[4 + 2 * comm_size] && action[5 + 2 * comm_size])
748 ? decode_datatype(action[4 + 2 * comm_size])
750 MPI_Datatype MPI_CURRENT_TYPE2{(action[4 + 2 * comm_size] && action[5 + 2 * comm_size])
751 ? decode_datatype(action[5 + 2 * comm_size])
754 int send_buf_size=parse_double(action[2]);
755 int recv_buf_size=parse_double(action[3+comm_size]);
756 int my_proc_id = Actor::self()->getPid();
757 void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* MPI_CURRENT_TYPE->size());
758 void *recvbuf = smpi_get_tmp_recvbuffer(recv_buf_size* MPI_CURRENT_TYPE2->size());
760 for(int i=0;i<comm_size;i++) {
761 (*sendcounts)[i] = std::stoi(action[3 + i]);
762 (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
764 int send_size = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
765 int recv_size = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
767 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
768 new simgrid::instr::VarCollTIData("allToAllV", -1, send_size, sendcounts, recv_size, recvcounts,
769 MPI_CURRENT_TYPE->encode(), MPI_CURRENT_TYPE2->encode()));
771 Colls::alltoallv(sendbuf, sendcounts->data(), senddisps.data(), MPI_CURRENT_TYPE, recvbuf, recvcounts->data(),
772 recvdisps.data(), MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
774 TRACE_smpi_comm_out(my_proc_id);
775 log_timed_action (action, clock);
778 }} // namespace simgrid::smpi
780 /** @brief Only initialize the replay; the actual run happens in smpi_replay_main() */
781 void smpi_replay_init(int* argc, char*** argv)
783 simgrid::smpi::Process::init(argc, argv);
784 smpi_process()->mark_as_initialized();
785 smpi_process()->set_replaying(true);
787 int my_proc_id = Actor::self()->getPid();
788 TRACE_smpi_init(my_proc_id);
789 TRACE_smpi_computing_init(my_proc_id);
790 TRACE_smpi_comm_in(my_proc_id, "smpi_replay_run_init", new simgrid::instr::NoOpTIData("init"));
791 TRACE_smpi_comm_out(my_proc_id);
792 xbt_replay_action_register("init", simgrid::smpi::action_init);
793 xbt_replay_action_register("finalize", simgrid::smpi::action_finalize);
794 xbt_replay_action_register("comm_size", simgrid::smpi::action_comm_size);
795 xbt_replay_action_register("comm_split", simgrid::smpi::action_comm_split);
796 xbt_replay_action_register("comm_dup", simgrid::smpi::action_comm_dup);
797 xbt_replay_action_register("send", simgrid::smpi::action_send);
798 xbt_replay_action_register("Isend", simgrid::smpi::action_Isend);
799 xbt_replay_action_register("recv", simgrid::smpi::action_recv);
800 xbt_replay_action_register("Irecv", simgrid::smpi::action_Irecv);
801 xbt_replay_action_register("test", simgrid::smpi::action_test);
802 xbt_replay_action_register("wait", simgrid::smpi::action_wait);
803 xbt_replay_action_register("waitAll", simgrid::smpi::action_waitall);
804 xbt_replay_action_register("barrier", simgrid::smpi::action_barrier);
805 xbt_replay_action_register("bcast", simgrid::smpi::action_bcast);
806 xbt_replay_action_register("reduce", simgrid::smpi::action_reduce);
807 xbt_replay_action_register("allReduce", simgrid::smpi::action_allReduce);
808 xbt_replay_action_register("allToAll", simgrid::smpi::action_allToAll);
809 xbt_replay_action_register("allToAllV", simgrid::smpi::action_allToAllv);
810 xbt_replay_action_register("gather", simgrid::smpi::action_gather);
811 xbt_replay_action_register("scatter", simgrid::smpi::action_scatter);
812 xbt_replay_action_register("gatherV", simgrid::smpi::action_gatherv);
813 xbt_replay_action_register("scatterV", simgrid::smpi::action_scatterv);
814 xbt_replay_action_register("allGather", simgrid::smpi::action_allgather);
815 xbt_replay_action_register("allGatherV", simgrid::smpi::action_allgatherv);
816 xbt_replay_action_register("reduceScatter", simgrid::smpi::action_reducescatter);
817 xbt_replay_action_register("compute", simgrid::smpi::action_compute);
819 //if we have a delayed start, sleep here.
821 double value = xbt_str_parse_double((*argv)[2], "%s is not a double");
822 XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
823 smpi_execute_flops(value);
825 //UGLY: force a context switch to be sure that all MSG_processes begin initialization
826 XBT_DEBUG("Force context switch by smpi_execute_flops - Sleeping for 0.0 flops ");
827 smpi_execute_flops(0.0);
831 /** @brief actually run the replay after initialization */
832 void smpi_replay_main(int* argc, char*** argv)
834 simgrid::xbt::replay_runner(*argc, *argv);
836 /* and now, finalize everything */
837 /* One active process will stop. Decrease the counter*/
838 XBT_DEBUG("There are %zu elements in reqq[*]", get_reqq_self()->size());
839 if (not get_reqq_self()->empty()) {
840 unsigned int count_requests=get_reqq_self()->size();
841 MPI_Request requests[count_requests];
842 MPI_Status status[count_requests];
845 for (auto const& req : *get_reqq_self()) {
849 simgrid::smpi::Request::waitall(count_requests, requests, status);
851 delete get_reqq_self();
854 if(active_processes==0){
855 /* Last process alive speaking: end the simulated timer */
856 XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
857 smpi_free_replay_tmp_buffers();
860 TRACE_smpi_comm_in(Actor::self()->getPid(), "smpi_replay_run_finalize", new simgrid::instr::NoOpTIData("finalize"));
862 smpi_process()->finalize();
864 TRACE_smpi_comm_out(Actor::self()->getPid());
865 TRACE_smpi_finalize(Actor::self()->getPid());
868 /** @brief chain a replay initialization and a replay start */
869 void smpi_replay_run(int* argc, char*** argv)
871 smpi_replay_init(argc, argv);
872 smpi_replay_main(argc, argv);
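/* Usage sketch (an assumption, not part of this file): a replay launcher is typically a tiny
 * program that forwards its arguments here and is started through smpirun with a platform file,
 * a hostfile and the trace description, e.g.:
 *
 *   int main(int argc, char* argv[]) {
 *     smpi_replay_run(&argc, &argv); // initialize, replay this rank's trace, then finalize
 *     return 0;
 *   }
 */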