1 /* Copyright (c) 2009-2017. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
7 #include "smpi_coll.hpp"
8 #include "smpi_comm.hpp"
9 #include "smpi_datatype.hpp"
10 #include "smpi_group.hpp"
11 #include "smpi_process.hpp"
12 #include "smpi_request.hpp"
13 #include "xbt/replay.hpp"
17 #include <unordered_map>
20 using simgrid::s4u::Actor;
22 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
// Global replay state shared by all action handlers below.
24 static int communicator_size = 0;
// Number of replaying processes still running; decremented as each finalizes.
25 static int active_processes = 0;
// Per-process queue of outstanding asynchronous requests, keyed by actor pid.
26 static std::unordered_map<int, std::vector<MPI_Request>*> reqq;
// Datatype used when an action line omits one; chosen in action_init (MPE vs TAU trace).
28 static MPI_Datatype MPI_DEFAULT_TYPE;
31 class ReplayActionArg {
// Log (at verbose level) the action line that was just replayed, together with
// the simulated time it took (elapsed-now minus the clock taken at action start).
35 static void log_timed_action (const char *const *action, double clock){
36 if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
37 char *name = xbt_str_join_array(action, " ");
38 XBT_VERB("%s %f", name, smpi_process()->simulated_elapsed()-clock);
// Return the calling actor's request queue.
// Uses at(): throws std::out_of_range if action_init was never replayed for this pid.
43 static std::vector<MPI_Request>* get_reqq_self()
45   return reqq.at(Actor::self()->getPid());
// Register the request queue for the calling actor.
// NOTE(review): unordered_map::insert is a no-op when the pid is already present,
// so a second call would leak the new vector and keep the old one — confirm this
// can only ever be called once per pid (from action_init).
48 static void set_reqq_self(std::vector<MPI_Request> *mpi_request)
50    reqq.insert({Actor::self()->getPid(), mpi_request});
// Parse a trace field as a double; raise unknown_error if the string is not
// (entirely) a valid double, rather than silently returning a partial value.
54 static double parse_double(const char *string)
57   double value = strtod(string, &endptr);
59     THROWF(unknown_error, 0, "%s is not a double", string);
64 //TODO: this logic should be moved inside the datatype class, to support all predefined types and get rid of is_replayable.
// Map a trace datatype id (string) to the corresponding MPI_Datatype.
65 static MPI_Datatype decode_datatype(const char *const action)
67   return simgrid::smpi::Datatype::decode(action);
// Inverse of decode_datatype: stringify a datatype for trace output.
// Guards against a null datatype (observed in practice, see comment below).
70 const char* encode_datatype(MPI_Datatype datatype)
72   if (datatype == nullptr) /* this actually does seem to be possible, had this in the scatter2 test */
75   return datatype->encode();
// Validate an action line's arity: counts the (null-terminated) fields of
// `action` and throws arg_error when fewer than `mandatory` arguments (beyond
// process_id and action name) are present, mentioning how many `optional`
// ones are also accepted. No comments inside the macro: '//' before a trailing
// backslash would swallow the line continuation.
78 #define CHECK_ACTION_PARAMS(action, mandatory, optional) {\
80     while(action[i]!=nullptr)\
83       THROWF(arg_error, 0, "%s replay failed.\n" \
84              "%d items were given on the line. First two should be process_id and action.  " \
85              "This action needs after them %d mandatory arguments, and accepts %d optional ones. \n" \
86              "Please contact the Simgrid team if support is needed", __FUNCTION__, i, mandatory, optional);\
// Replay handler for the "init" action: pick the default datatype (MPI_DOUBLE
// for MPE traces, MPI_BYTE for TAU traces, depending on the optional argument),
// start the simulated timer, record the process count, and allocate this
// process' request queue.
92 static void action_init(const char *const *action)
94   XBT_DEBUG("Initialize the counters");
95   CHECK_ACTION_PARAMS(action, 0, 1)
97     MPI_DEFAULT_TYPE = MPI_DOUBLE; // default MPE datatype
99     MPI_DEFAULT_TYPE = MPI_BYTE; // default TAU datatype
101   /* start a simulated timer */
102   smpi_process()->simulated_start();
103   /*initialize the number of active processes */
104   active_processes = smpi_process_count();
106   set_reqq_self(new std::vector<MPI_Request>);
// Replay handler for the "finalize" action (body elided in this view; the
// actual teardown happens in smpi_replay_main after the runner returns).
109 static void action_finalize(const char *const *action)
// Replay handler for "comm_size": record the communicator size announced by
// the trace, then log the (zero-cost) action.
114 static void action_comm_size(const char *const *action)
116   communicator_size = parse_double(action[2]);
117   log_timed_action (action, smpi_process()->simulated_elapsed());
// Replay handler for "comm_split": no simulated work, only logged.
120 static void action_comm_split(const char *const *action)
122   log_timed_action (action, smpi_process()->simulated_elapsed());
// Replay handler for "comm_dup": no simulated work, only logged.
125 static void action_comm_dup(const char *const *action)
127   log_timed_action (action, smpi_process()->simulated_elapsed());
// Replay handler for "compute": burn the given number of flops on the
// simulated host, bracketed by computing-in/out trace events.
130 static void action_compute(const char *const *action)
132   CHECK_ACTION_PARAMS(action, 1, 0)
133   double clock = smpi_process()->simulated_elapsed();
134   double flops= parse_double(action[2]);
135   int my_proc_id = Actor::self()->getPid();
137   TRACE_smpi_computing_in(my_proc_id, flops);
138   smpi_execute_flops(flops);
139   TRACE_smpi_computing_out(my_proc_id);
141   log_timed_action (action, clock);
// Replay handler for a blocking "send": <dst> <size> [datatype].
// A null buffer is passed — replay only simulates timing, no payload exists.
144 static void action_send(const char *const *action)
146   CHECK_ACTION_PARAMS(action, 2, 1)
147   int to = std::stoi(action[2]);
148   double size=parse_double(action[3]);
149   double clock = smpi_process()->simulated_elapsed();
151   MPI_Datatype MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
153   int my_proc_id = Actor::self()->getPid();
154   int dst_traced = MPI_COMM_WORLD->group()->actor(to)->getPid();
156   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
157                      new simgrid::instr::Pt2PtTIData("send", to, size, MPI_CURRENT_TYPE->encode()));
// Only emit the low-level send event when internals are not already traced.
158   if (not TRACE_smpi_view_internals())
159     TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, size * MPI_CURRENT_TYPE->size());
161   Request::send(nullptr, size, MPI_CURRENT_TYPE, to , 0, MPI_COMM_WORLD);
163   TRACE_smpi_comm_out(my_proc_id);
165   log_timed_action(action, clock);
// Replay handler for a non-blocking "Isend": same arguments as "send", but the
// resulting request is pushed onto this process' queue for a later wait/test.
168 static void action_Isend(const char *const *action)
170   CHECK_ACTION_PARAMS(action, 2, 1)
171   int to = std::stoi(action[2]);
172   double size=parse_double(action[3]);
173   double clock = smpi_process()->simulated_elapsed();
175   MPI_Datatype MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
177   int my_proc_id = Actor::self()->getPid();
178   int dst_traced = MPI_COMM_WORLD->group()->actor(to)->getPid();
179   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
180                      new simgrid::instr::Pt2PtTIData("Isend", to, size, MPI_CURRENT_TYPE->encode()));
181   if (not TRACE_smpi_view_internals())
182     TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, size * MPI_CURRENT_TYPE->size());
184   MPI_Request request = Request::isend(nullptr, size, MPI_CURRENT_TYPE, to, 0, MPI_COMM_WORLD);
186   TRACE_smpi_comm_out(my_proc_id);
// Keep the request so a subsequent wait/waitAll/test action can complete it.
188   get_reqq_self()->push_back(request);
190   log_timed_action (action, clock);
// Replay handler for a blocking "recv": <src> <size> [datatype].
// Probes first because the receiver-side trace does not know the real size.
193 static void action_recv(const char *const *action) {
194   CHECK_ACTION_PARAMS(action, 2, 1)
195   int from = std::stoi(action[2]);
196   double size=parse_double(action[3]);
197   double clock = smpi_process()->simulated_elapsed();
200   MPI_Datatype MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
202   int my_proc_id = Actor::self()->getPid();
203   int src_traced = MPI_COMM_WORLD->group()->actor(from)->getPid();
205   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
206                      new simgrid::instr::Pt2PtTIData("recv", from, size, MPI_CURRENT_TYPE->encode()));
208   //unknown size from the receiver point of view
210     Request::probe(from, 0, MPI_COMM_WORLD, &status);
214   Request::recv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD, &status);
216   TRACE_smpi_comm_out(my_proc_id);
217   if (not TRACE_smpi_view_internals()) {
218     TRACE_smpi_recv(src_traced, my_proc_id, 0);
221   log_timed_action (action, clock);
// Replay handler for a non-blocking "Irecv": like "recv" but posts an irecv
// and queues the request for a later wait/test.
224 static void action_Irecv(const char *const *action)
226   CHECK_ACTION_PARAMS(action, 2, 1)
227   int from = std::stoi(action[2]);
228   double size=parse_double(action[3]);
229   double clock = smpi_process()->simulated_elapsed();
231   MPI_Datatype MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
233   int my_proc_id = Actor::self()->getPid();
234   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
235                      new simgrid::instr::Pt2PtTIData("Irecv", from, size, MPI_CURRENT_TYPE->encode()));
237   //unknow size from the receiver pov
239     Request::probe(from, 0, MPI_COMM_WORLD, &status);
243   MPI_Request request = Request::irecv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD);
245   TRACE_smpi_comm_out(my_proc_id);
246   get_reqq_self()->push_back(request);
248   log_timed_action (action, clock);
// Replay handler for "test": pop the most recent request, MPI_Test it, and
// push it back (nullptr after a successful test) so a later wait can find it.
251 static void action_test(const char* const* action)
253   CHECK_ACTION_PARAMS(action, 0, 0)
254   double clock = smpi_process()->simulated_elapsed();
257   MPI_Request request = get_reqq_self()->back();
258   get_reqq_self()->pop_back();
259   //if request is null here, this may mean that a previous test has succeeded
260   //Different times in traced application and replayed version may lead to this
261   //In this case, ignore the extra calls.
262   if(request!=nullptr){
263     int my_proc_id = Actor::self()->getPid();
264     TRACE_smpi_testing_in(my_proc_id);
266     int flag = Request::test(&request, &status);
268     XBT_DEBUG("MPI_Test result: %d", flag);
269     /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now nullptr.*/
270     get_reqq_self()->push_back(request);
272     TRACE_smpi_testing_out(my_proc_id);
274   log_timed_action (action, clock);
// Replay handler for "wait": pop the most recent queued request and wait on it.
// A nullptr request (already completed by a prior "test") is silently skipped.
277 static void action_wait(const char *const *action){
278   CHECK_ACTION_PARAMS(action, 0, 0)
279   double clock = smpi_process()->simulated_elapsed();
282   xbt_assert(get_reqq_self()->size(), "action wait not preceded by any irecv or isend: %s",
283       xbt_str_join_array(action," "));
284   MPI_Request request = get_reqq_self()->back();
285   get_reqq_self()->pop_back();
287   if (request==nullptr){
288     /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just return.*/
// Capture tracing metadata before the wait, since the request is freed by it.
292   int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
294   MPI_Group group = request->comm()->group();
295   int src_traced = group->rank(request->src());
296   int dst_traced = group->rank(request->dst());
297   int is_wait_for_receive = (request->flags() & RECV);
298   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::NoOpTIData("wait"));
300   Request::wait(&request, &status);
302   TRACE_smpi_comm_out(rank);
303   if (is_wait_for_receive)
304     TRACE_smpi_recv(src_traced, dst_traced, 0);
305   log_timed_action (action, clock);
// Replay handler for "waitAll": wait for every queued request at once.
// Source/destination of receive requests are recorded beforehand so the
// matching TRACE_smpi_recv events can be emitted after completion.
308 static void action_waitall(const char *const *action){
309   CHECK_ACTION_PARAMS(action, 0, 0)
310   double clock = smpi_process()->simulated_elapsed();
311   const unsigned int count_requests = get_reqq_self()->size();
313   if (count_requests>0) {
314     MPI_Status status[count_requests];
316     int my_proc_id_traced = Actor::self()->getPid();
317     TRACE_smpi_comm_in(my_proc_id_traced, __FUNCTION__,
318                        new simgrid::instr::Pt2PtTIData("waitAll", -1, count_requests, ""));
319     int recvs_snd[count_requests];
320     int recvs_rcv[count_requests];
// NOTE(review): only RECV entries are filled here; the -100 sentinel checked
// below is presumably set in an elided else-branch — confirm in full source.
321     for (unsigned int i = 0; i < count_requests; i++) {
322       const auto& req = (*get_reqq_self())[i];
323       if (req && (req->flags() & RECV)) {
324         recvs_snd[i] = req->src();
325         recvs_rcv[i] = req->dst();
329     Request::waitall(count_requests, &(*get_reqq_self())[0], status);
331     for (unsigned i = 0; i < count_requests; i++) {
332       if (recvs_snd[i]!=-100)
333         TRACE_smpi_recv(recvs_snd[i], recvs_rcv[i],0);
335     TRACE_smpi_comm_out(my_proc_id_traced);
337   log_timed_action (action, clock);
// Replay handler for "barrier": synchronize all processes of MPI_COMM_WORLD.
340 static void action_barrier(const char *const *action){
341   double clock = smpi_process()->simulated_elapsed();
342   int my_proc_id = Actor::self()->getPid();
343   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::NoOpTIData("barrier"));
345   Colls::barrier(MPI_COMM_WORLD);
347   TRACE_smpi_comm_out(my_proc_id);
348   log_timed_action (action, clock);
// Replay handler for "bcast": <size> [root] [datatype]; root defaults to 0.
// Note the datatype is only honored when the root argument is also present.
351 static void action_bcast(const char *const *action)
353   CHECK_ACTION_PARAMS(action, 1, 2)
354   double size = parse_double(action[2]);
355   double clock = smpi_process()->simulated_elapsed();
356   int root = (action[3]) ? std::stoi(action[3]) : 0;
357   /* Initialize MPI_CURRENT_TYPE in order to decrease the number of the checks */
358   MPI_Datatype MPI_CURRENT_TYPE = (action[3] && action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
360   int my_proc_id = Actor::self()->getPid();
361   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
362                      new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(root)->getPid(), -1.0, size,
363                                                     -1, MPI_CURRENT_TYPE->encode(), ""));
365   void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
367   Colls::bcast(sendbuf, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
369   TRACE_smpi_comm_out(my_proc_id);
370   log_timed_action (action, clock);
// Replay handler for "reduce": <comm_size> <comp_size> [root] [datatype].
// Performs the communication with MPI_OP_NULL (no real data), then burns the
// computation cost separately with smpi_execute_flops.
373 static void action_reduce(const char *const *action)
375   CHECK_ACTION_PARAMS(action, 2, 2)
376   double comm_size = parse_double(action[2]);
377   double comp_size = parse_double(action[3]);
378   double clock = smpi_process()->simulated_elapsed();
379   int root = (action[4]) ? std::stoi(action[4]) : 0;
381   MPI_Datatype MPI_CURRENT_TYPE = (action[4] && action[5]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE;
383   int my_proc_id = Actor::self()->getPid();
384   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
385                      new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(root)->getPid(), comp_size,
386                                                     comm_size, -1, MPI_CURRENT_TYPE->encode(), ""));
388   void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
389   void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
390   Colls::reduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
391   smpi_execute_flops(comp_size);
393   TRACE_smpi_comm_out(my_proc_id);
394   log_timed_action (action, clock);
// Replay handler for "allReduce": <comm_size> <comp_size> [datatype].
// Same pattern as reduce: MPI_OP_NULL communication plus separate flops.
397 static void action_allReduce(const char *const *action) {
398   CHECK_ACTION_PARAMS(action, 2, 1)
399   double comm_size = parse_double(action[2]);
400   double comp_size = parse_double(action[3]);
402   MPI_Datatype MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
404   double clock = smpi_process()->simulated_elapsed();
405   int my_proc_id = Actor::self()->getPid();
406   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::CollTIData("allReduce", -1, comp_size, comm_size, -1,
407                                                                               MPI_CURRENT_TYPE->encode(), ""));
409   void *recvbuf = smpi_get_tmp_recvbuffer(comm_size* MPI_CURRENT_TYPE->size());
410   void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
411   Colls::allreduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
412   smpi_execute_flops(comp_size);
414   TRACE_smpi_comm_out(my_proc_id);
415   log_timed_action (action, clock);
// Replay handler for "allToAll": <send_size> <recv_size> [sendtype recvtype].
// NOTE(review): both datatypes are only decoded when BOTH optional fields are
// present (action[4] && action[5]); a line with a single datatype falls back
// to the default for both — confirm this matches the trace format.
418 static void action_allToAll(const char *const *action) {
419   CHECK_ACTION_PARAMS(action, 2, 2) //two mandatory (send and recv volumes) and two optional (corresponding datatypes)
420   double clock = smpi_process()->simulated_elapsed();
421   int comm_size = MPI_COMM_WORLD->size();
422   int send_size = parse_double(action[2]);
423   int recv_size = parse_double(action[3]);
424   MPI_Datatype MPI_CURRENT_TYPE = (action[4] && action[5]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
425   MPI_Datatype MPI_CURRENT_TYPE2{(action[4] && action[5]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE};
427   void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* MPI_CURRENT_TYPE->size());
428   void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
430   int my_proc_id = Actor::self()->getPid();
431   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
432                      new simgrid::instr::CollTIData("allToAll", -1, -1.0, send_size, recv_size,
433                                                     MPI_CURRENT_TYPE->encode(), MPI_CURRENT_TYPE2->encode()));
435   Colls::alltoall(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
437   TRACE_smpi_comm_out(my_proc_id);
438   log_timed_action (action, clock);
// Replay handler for "gather": <sendcount> <recvcount> [root sendtype recvtype].
// Only the root allocates a receive buffer (comm_size * recvcount elements).
441 static void action_gather(const char *const *action) {
442   /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
445       1) 68 is the sendcounts
446       2) 68 is the recvcounts
447       3) 0 is the root node
448       4) 0 is the send datatype id, see decode_datatype()
449       5) 0 is the recv datatype id, see decode_datatype()
451   CHECK_ACTION_PARAMS(action, 2, 3)
452   double clock = smpi_process()->simulated_elapsed();
453   int comm_size = MPI_COMM_WORLD->size();
454   int send_size = parse_double(action[2]);
455   int recv_size = parse_double(action[3]);
456   MPI_Datatype MPI_CURRENT_TYPE = (action[5] && action[6]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE;
457   MPI_Datatype MPI_CURRENT_TYPE2{(action[5] && action[6]) ? decode_datatype(action[6]) : MPI_DEFAULT_TYPE};
459   void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
460   void *recv = nullptr;
461   int root = (action[4]) ? std::stoi(action[4]) : 0;
462   int rank = MPI_COMM_WORLD->rank();
465     recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
467   TRACE_smpi_comm_in(rank, __FUNCTION__,
468                      new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
469                                                     MPI_CURRENT_TYPE->encode(), MPI_CURRENT_TYPE2->encode()));
471   Colls::gather(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
473   TRACE_smpi_comm_out(Actor::self()->getPid());
474   log_timed_action (action, clock);
// Replay handler for "scatter": mirror image of "gather" — only the root
// allocates the receive buffer used as scatter source.
// NOTE(review): the TIData below is labelled "gather"; this looks like a
// copy-paste from action_gather and probably should say "scatter" — confirm
// against the trace visualization before changing the emitted label.
477 static void action_scatter(const char* const* action)
479   /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
482       1) 68 is the sendcounts
483       2) 68 is the recvcounts
484       3) 0 is the root node
485       4) 0 is the send datatype id, see decode_datatype()
486       5) 0 is the recv datatype id, see decode_datatype()
488   CHECK_ACTION_PARAMS(action, 2, 3)
489   double clock = smpi_process()->simulated_elapsed();
490   int comm_size = MPI_COMM_WORLD->size();
491   int send_size = parse_double(action[2]);
492   int recv_size = parse_double(action[3]);
493   MPI_Datatype MPI_CURRENT_TYPE = (action[5] && action[6]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE;
494   MPI_Datatype MPI_CURRENT_TYPE2{(action[5] && action[6]) ? decode_datatype(action[6]) : MPI_DEFAULT_TYPE};
496   void* send = smpi_get_tmp_sendbuffer(send_size * MPI_CURRENT_TYPE->size());
497   void* recv = nullptr;
498   int root = (action[4]) ? std::stoi(action[4]) : 0;
499   int rank = MPI_COMM_WORLD->rank();
502     recv = smpi_get_tmp_recvbuffer(recv_size * comm_size * MPI_CURRENT_TYPE2->size());
504   TRACE_smpi_comm_in(rank, __FUNCTION__,
505                      new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
506                                                     MPI_CURRENT_TYPE->encode(), MPI_CURRENT_TYPE2->encode()));
508   Colls::scatter(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
510   TRACE_smpi_comm_out(Actor::self()->getPid());
511   log_timed_action(action, clock);
// Replay handler for "gatherV": one sendcount plus a per-rank recvcounts array.
// Displacements are all zero (replay carries no real payload).
514 static void action_gatherv(const char *const *action) {
515   /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
516      0 gather 68 68 10 10 10 0 0 0
518       1) 68 is the sendcount
519       2) 68 10 10 10 is the recvcounts
520       3) 0 is the root node
521       4) 0 is the send datatype id, see decode_datatype()
522       5) 0 is the recv datatype id, see decode_datatype()
524   double clock = smpi_process()->simulated_elapsed();
525   int comm_size = MPI_COMM_WORLD->size();
526   CHECK_ACTION_PARAMS(action, comm_size+1, 2)
527   int send_size = parse_double(action[2]);
528   std::vector<int> disps(comm_size, 0);
// shared_ptr because the TIData object below keeps a reference to recvcounts.
529   std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
531   MPI_Datatype MPI_CURRENT_TYPE =
532       (action[4 + comm_size] && action[5 + comm_size]) ? decode_datatype(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
533   MPI_Datatype MPI_CURRENT_TYPE2{
534       (action[4 + comm_size] && action[5 + comm_size]) ? decode_datatype(action[5 + comm_size]) : MPI_DEFAULT_TYPE};
536   void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
537   void *recv = nullptr;
538   for(int i=0;i<comm_size;i++) {
539     (*recvcounts)[i] = std::stoi(action[i + 3]);
541   int recv_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
543   int root = (action[3 + comm_size]) ? std::stoi(action[3 + comm_size]) : 0;
544   int rank = MPI_COMM_WORLD->rank();
547     recv = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
549   TRACE_smpi_comm_in(rank, __FUNCTION__,
550                      new simgrid::instr::VarCollTIData("gatherV", root, send_size, nullptr, -1, recvcounts,
551                                                        MPI_CURRENT_TYPE->encode(), MPI_CURRENT_TYPE2->encode()));
553   Colls::gatherv(send, send_size, MPI_CURRENT_TYPE, recv, recvcounts->data(), disps.data(), MPI_CURRENT_TYPE2, root,
556   TRACE_smpi_comm_out(Actor::self()->getPid());
557   log_timed_action (action, clock);
// Replay handler for "scatterV": per-rank sendcounts array plus one recvcount.
// NOTE(review): the VarCollTIData below is labelled "gatherV"; this looks like
// a copy-paste from action_gatherv and probably should say "scatterV" —
// confirm against the trace visualization before changing the emitted label.
560 static void action_scatterv(const char* const* action)
562   /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
563      0 gather 68 10 10 10 68 0 0 0
565       1) 68 10 10 10 is the sendcounts
566       2) 68 is the recvcount
567       3) 0 is the root node
568       4) 0 is the send datatype id, see decode_datatype()
569       5) 0 is the recv datatype id, see decode_datatype()
571   double clock = smpi_process()->simulated_elapsed();
572   int comm_size = MPI_COMM_WORLD->size();
573   CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
574   int recv_size = parse_double(action[2 + comm_size]);
575   std::vector<int> disps(comm_size, 0);
576   std::shared_ptr<std::vector<int>> sendcounts(new std::vector<int>(comm_size));
578   MPI_Datatype MPI_CURRENT_TYPE =
579       (action[4 + comm_size] && action[5 + comm_size]) ? decode_datatype(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
580   MPI_Datatype MPI_CURRENT_TYPE2{
581       (action[4 + comm_size] && action[5 + comm_size]) ? decode_datatype(action[5 + comm_size]) : MPI_DEFAULT_TYPE};
583   void* send = nullptr;
584   void* recv = smpi_get_tmp_recvbuffer(recv_size * MPI_CURRENT_TYPE->size());
585   for (int i = 0; i < comm_size; i++) {
586     (*sendcounts)[i] = std::stoi(action[i + 2]);
588   int send_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
590   int root = (action[3 + comm_size]) ? std::stoi(action[3 + comm_size]) : 0;
591   int rank = MPI_COMM_WORLD->rank();
594     send = smpi_get_tmp_sendbuffer(send_sum * MPI_CURRENT_TYPE2->size());
596   TRACE_smpi_comm_in(rank, __FUNCTION__,
597                      new simgrid::instr::VarCollTIData("gatherV", root, -1, sendcounts, recv_size, nullptr,
598                                                        MPI_CURRENT_TYPE->encode(), MPI_CURRENT_TYPE2->encode()));
600   Colls::scatterv(send, sendcounts->data(), disps.data(), MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root,
603   TRACE_smpi_comm_out(Actor::self()->getPid());
604   log_timed_action(action, clock);
// Replay handler for "reduceScatter": per-rank recvcounts array, then a
// computation cost, then an optional datatype. Communicates with MPI_OP_NULL
// and burns the flops separately.
607 static void action_reducescatter(const char *const *action) {
608   /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
609        0 reduceScatter 275427 275427 275427 204020 11346849 0
611       1) The first four values after the name of the action declare the recvcounts array
612       2) The value 11346849 is the amount of instructions
613       3) The last value corresponds to the datatype, see decode_datatype().
615   double clock = smpi_process()->simulated_elapsed();
616   int comm_size = MPI_COMM_WORLD->size();
617   CHECK_ACTION_PARAMS(action, comm_size+1, 1)
618   int comp_size = parse_double(action[2+comm_size]);
619   int my_proc_id                     = Actor::self()->getPid();
620   std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>);
621   MPI_Datatype MPI_CURRENT_TYPE = (action[3 + comm_size]) ? decode_datatype(action[3 + comm_size]) : MPI_DEFAULT_TYPE;
623   for(int i=0;i<comm_size;i++) {
624     recvcounts->push_back(std::stoi(action[i + 2]));
// Total element count across all ranks sizes both temporary buffers.
626   int size{std::accumulate(recvcounts->begin(), recvcounts->end(), 0)};
628   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
629                      new simgrid::instr::VarCollTIData("reduceScatter", -1, 0, nullptr, -1, recvcounts,
630                                                        std::to_string(comp_size), /* ugly hack to print comp_size */
631                                                        MPI_CURRENT_TYPE->encode()));
633   void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
634   void *recvbuf = smpi_get_tmp_recvbuffer(size* MPI_CURRENT_TYPE->size());
636   Colls::reduce_scatter(sendbuf, recvbuf, recvcounts->data(), MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
637   smpi_execute_flops(comp_size);
639   TRACE_smpi_comm_out(my_proc_id);
640   log_timed_action (action, clock);
// Replay handler for "allGather": <sendcount> <recvcount> [sendtype recvtype].
643 static void action_allgather(const char *const *action) {
644   /* The structure of the allgather action for the rank 0 (total 4 processes) is the following:
645      0 allGather 275427 275427
647     1) 275427 is the sendcount
648     2) 275427 is the recvcount
649     3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().
651   double clock = smpi_process()->simulated_elapsed();
653   CHECK_ACTION_PARAMS(action, 2, 2)
654   int sendcount = std::stoi(action[2]);
655   int recvcount = std::stoi(action[3]);
657   MPI_Datatype MPI_CURRENT_TYPE = (action[4] && action[5]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
658   MPI_Datatype MPI_CURRENT_TYPE2{(action[4] && action[5]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE};
660   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
661   void *recvbuf = smpi_get_tmp_recvbuffer(recvcount* MPI_CURRENT_TYPE2->size());
663   int my_proc_id = Actor::self()->getPid();
665   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
666                      new simgrid::instr::CollTIData("allGather", -1, -1.0, sendcount, recvcount,
667                                                     MPI_CURRENT_TYPE->encode(), MPI_CURRENT_TYPE2->encode()));
669   Colls::allgather(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcount, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
671   TRACE_smpi_comm_out(my_proc_id);
672   log_timed_action (action, clock);
// Replay handler for "allGatherV": one sendcount plus per-rank recvcounts,
// with optional trailing datatype and/or displacement fields.
675 static void action_allgatherv(const char *const *action) {
676   /* The structure of the allgatherv action for the rank 0 (total 4 processes) is the following:
677   0 allGatherV 275427 275427 275427 275427 204020
679     1) 275427 is the sendcount
680     2) The next four elements declare the recvcounts array
681     3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().
683   double clock = smpi_process()->simulated_elapsed();
685   int comm_size = MPI_COMM_WORLD->size();
686   CHECK_ACTION_PARAMS(action, comm_size+1, 2)
687   int sendcount = std::stoi(action[2]);
688   std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
689   std::vector<int> disps(comm_size, 0);
691   int datatype_index = 0, disp_index = 0;
// NOTE(review): BUG — the first two branches test the exact same condition
// (action[3 + 2 * comm_size]), so the "disps specified, no datatype" branch is
// unreachable. The first test likely needs a different index (e.g. checking
// that BOTH trailing fields exist) — confirm against the trace format before
// fixing.
692   if (action[3 + 2 * comm_size]) { /* datatype + disp are specified */
693     datatype_index = 3 + comm_size;
694     disp_index     = datatype_index + 1;
695   } else if (action[3 + 2 * comm_size]) { /* disps specified; datatype is not specified; use the default one */
697     disp_index     = 3 + comm_size;
698   } else if (action[3 + comm_size]) { /* only datatype, no disp specified */
699     datatype_index = 3 + comm_size;
702   if (disp_index != 0) {
703     std::copy(action[disp_index], action[disp_index + comm_size], disps.begin());
706   MPI_Datatype MPI_CURRENT_TYPE{(datatype_index > 0) ? decode_datatype(action[datatype_index]) : MPI_DEFAULT_TYPE};
707   MPI_Datatype MPI_CURRENT_TYPE2{(datatype_index > 0) ? decode_datatype(action[datatype_index]) : MPI_DEFAULT_TYPE};
709   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
711   for(int i=0;i<comm_size;i++) {
712     (*recvcounts)[i] = std::stoi(action[i + 3]);
714   int recv_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
715   void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
717   int my_proc_id = Actor::self()->getPid();
719   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
720                      new simgrid::instr::VarCollTIData("allGatherV", -1, sendcount, nullptr, -1, recvcounts,
721                                                        MPI_CURRENT_TYPE->encode(), MPI_CURRENT_TYPE2->encode()));
723   Colls::allgatherv(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts->data(), disps.data(), MPI_CURRENT_TYPE2,
726   TRACE_smpi_comm_out(my_proc_id);
727   log_timed_action (action, clock);
// Replay handler for "allToAllV": send-buffer size, per-rank sendcounts,
// recv-buffer size, per-rank recvcounts, optional sendtype/recvtype.
730 static void action_allToAllv(const char *const *action) {
731   /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
732         0 allToAllV 100 1 7 10 12 100 1 70 10 5
734       1) 100 is the size of the send buffer *sizeof(int),
735       2) 1 7 10 12 is the sendcounts array
736       3) 100*sizeof(int) is the size of the receiver buffer
737       4)  1 70 10 5 is the recvcounts array
739   double clock = smpi_process()->simulated_elapsed();
741   int comm_size = MPI_COMM_WORLD->size();
742   CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
743   std::shared_ptr<std::vector<int>> sendcounts(new std::vector<int>(comm_size));
744   std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
745   std::vector<int> senddisps(comm_size, 0);
746   std::vector<int> recvdisps(comm_size, 0);
748   MPI_Datatype MPI_CURRENT_TYPE = (action[4 + 2 * comm_size] && action[5 + 2 * comm_size])
749                                       ? decode_datatype(action[4 + 2 * comm_size])
751   MPI_Datatype MPI_CURRENT_TYPE2{(action[4 + 2 * comm_size] && action[5 + 2 * comm_size])
752                                      ? decode_datatype(action[5 + 2 * comm_size])
755   int send_buf_size=parse_double(action[2]);
756   int recv_buf_size=parse_double(action[3+comm_size]);
757   int my_proc_id = Actor::self()->getPid();
758   void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* MPI_CURRENT_TYPE->size());
759   void *recvbuf  = smpi_get_tmp_recvbuffer(recv_buf_size* MPI_CURRENT_TYPE2->size());
761   for(int i=0;i<comm_size;i++) {
762     (*sendcounts)[i] = std::stoi(action[3 + i]);
763     (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
765   int send_size = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
766   int recv_size = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
768   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
769                      new simgrid::instr::VarCollTIData("allToAllV", -1, send_size, sendcounts, recv_size, recvcounts,
770                                                        MPI_CURRENT_TYPE->encode(), MPI_CURRENT_TYPE2->encode()));
// NOTE(review): the receive datatype passed to alltoallv below is
// MPI_CURRENT_TYPE; the recv buffer was sized with MPI_CURRENT_TYPE2, so this
// presumably should be MPI_CURRENT_TYPE2 — confirm and fix together.
772   Colls::alltoallv(sendbuf, sendcounts->data(), senddisps.data(), MPI_CURRENT_TYPE, recvbuf, recvcounts->data(),
773                    recvdisps.data(), MPI_CURRENT_TYPE, MPI_COMM_WORLD);
775   TRACE_smpi_comm_out(my_proc_id);
776   log_timed_action (action, clock);
779 }} // namespace simgrid::smpi
781 /** @brief Only initialize the replay, don't do it for real
782  *
783  * Marks the process initialized/replaying, emits the init trace events,
784  * registers every action handler with the replay engine, optionally honors a
785  * delayed start (3rd argv entry, in flops), and forces one context switch so
786  * all processes finish their own initialization before any trace is replayed.
787  */
782 void smpi_replay_init(int* argc, char*** argv)
784   simgrid::smpi::Process::init(argc, argv);
785   smpi_process()->mark_as_initialized();
786   smpi_process()->set_replaying(true);
788   int my_proc_id = Actor::self()->getPid();
789   TRACE_smpi_init(my_proc_id);
790   TRACE_smpi_computing_init(my_proc_id);
791   TRACE_smpi_comm_in(my_proc_id, "smpi_replay_run_init", new simgrid::instr::NoOpTIData("init"));
792   TRACE_smpi_comm_out(my_proc_id);
793   xbt_replay_action_register("init",       simgrid::smpi::action_init);
794   xbt_replay_action_register("finalize",   simgrid::smpi::action_finalize);
795   xbt_replay_action_register("comm_size",  simgrid::smpi::action_comm_size);
796   xbt_replay_action_register("comm_split", simgrid::smpi::action_comm_split);
797   xbt_replay_action_register("comm_dup",   simgrid::smpi::action_comm_dup);
798   xbt_replay_action_register("send",       simgrid::smpi::action_send);
799   xbt_replay_action_register("Isend",      simgrid::smpi::action_Isend);
800   xbt_replay_action_register("recv",       simgrid::smpi::action_recv);
801   xbt_replay_action_register("Irecv",      simgrid::smpi::action_Irecv);
802   xbt_replay_action_register("test",       simgrid::smpi::action_test);
803   xbt_replay_action_register("wait",       simgrid::smpi::action_wait);
804   xbt_replay_action_register("waitAll",    simgrid::smpi::action_waitall);
805   xbt_replay_action_register("barrier",    simgrid::smpi::action_barrier);
806   xbt_replay_action_register("bcast",      simgrid::smpi::action_bcast);
807   xbt_replay_action_register("reduce",     simgrid::smpi::action_reduce);
808   xbt_replay_action_register("allReduce",  simgrid::smpi::action_allReduce);
809   xbt_replay_action_register("allToAll",   simgrid::smpi::action_allToAll);
810   xbt_replay_action_register("allToAllV",  simgrid::smpi::action_allToAllv);
811   xbt_replay_action_register("gather",     simgrid::smpi::action_gather);
812   xbt_replay_action_register("scatter",  simgrid::smpi::action_scatter);
813   xbt_replay_action_register("gatherV",    simgrid::smpi::action_gatherv);
814   xbt_replay_action_register("scatterV",  simgrid::smpi::action_scatterv);
815   xbt_replay_action_register("allGather",  simgrid::smpi::action_allgather);
816   xbt_replay_action_register("allGatherV", simgrid::smpi::action_allgatherv);
817   xbt_replay_action_register("reduceScatter",  simgrid::smpi::action_reducescatter);
818   xbt_replay_action_register("compute",    simgrid::smpi::action_compute);
820   //if we have a delayed start, sleep here.
822       double value = xbt_str_parse_double((*argv)[2], "%s is not a double");
823       XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
824       smpi_execute_flops(value);
826     //UGLY: force a context switch to be sure that all MSG_processes begin initialization
827     XBT_DEBUG("Force context switch by smpi_execute_flops  - Sleeping for 0.0 flops ");
828     smpi_execute_flops(0.0);
832 /** @brief actually run the replay after initialization
833  *
834  * Runs the replay engine on this process' trace, then drains any requests
835  * still pending in the queue, frees the queue, emits the finalize trace
836  * events, and — when this is the last active process — reports the simulated
837  * time and releases the shared temporary buffers.
838  */
833 void smpi_replay_main(int* argc, char*** argv)
835   simgrid::xbt::replay_runner(*argc, *argv);
837   /* and now, finalize everything */
838   /* One active process will stop. Decrease the counter*/
839   XBT_DEBUG("There are %zu elements in reqq[*]", get_reqq_self()->size());
840   if (not get_reqq_self()->empty()) {
841     unsigned int count_requests=get_reqq_self()->size();
842     MPI_Request requests[count_requests];
843     MPI_Status status[count_requests];
846     for (auto const& req : *get_reqq_self()) {
850     simgrid::smpi::Request::waitall(count_requests, requests, status);
852   delete get_reqq_self();
855   if(active_processes==0){
856     /* Last process alive speaking: end the simulated timer */
857     XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
858     smpi_free_replay_tmp_buffers();
861   TRACE_smpi_comm_in(Actor::self()->getPid(), "smpi_replay_run_finalize", new simgrid::instr::NoOpTIData("finalize"));
863   smpi_process()->finalize();
865   TRACE_smpi_comm_out(Actor::self()->getPid());
866   TRACE_smpi_finalize(Actor::self()->getPid());
869 /** @brief chain a replay initialization and a replay start */
870 void smpi_replay_run(int* argc, char*** argv)
872   smpi_replay_init(argc, argv);
873   smpi_replay_main(argc, argv);