1 /* Copyright (c) 2009-2017. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
7 #include "smpi_coll.hpp"
8 #include "smpi_comm.hpp"
9 #include "smpi_datatype.hpp"
10 #include "smpi_group.hpp"
11 #include "smpi_process.hpp"
12 #include "smpi_request.hpp"
13 #include "xbt/replay.hpp"
16 #include <unordered_map>
19 using simgrid::s4u::Actor;
21 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
// File-scope replay state shared by all replaying actors of this process image.
23 static int communicator_size = 0;
24 static int active_processes = 0;
// Per-actor queue of outstanding asynchronous requests, keyed by actor pid.
25 static std::unordered_map<int, std::vector<MPI_Request>*> reqq;
// Datatype used when a trace line carries no explicit datatype id (chosen in action_init).
27 static MPI_Datatype MPI_DEFAULT_TYPE;
// Datatype decoded from the trace line currently being replayed.
28 static MPI_Datatype MPI_CURRENT_TYPE;
// Grow-only scratch buffers reused for every replayed send/recv, so replaying
// large messages does not allocate per-message memory.
30 static int sendbuffer_size = 0;
31 static char* sendbuffer = nullptr;
32 static int recvbuffer_size = 0;
33 static char* recvbuffer = nullptr;
// Log a replayed action (already split into words) together with the simulated
// time it consumed, at verbose log level only.
35 static void log_timed_action (const char *const *action, double clock){
36 if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
// Re-join the action words into one printable string.
37 char *name = xbt_str_join_array(action, " ");
38 XBT_VERB("%s %f", name, smpi_process()->simulated_elapsed()-clock);
// Fetch the pending-request vector of the calling actor.
// Uses at(), so it throws std::out_of_range if "init" was never replayed for
// this actor (i.e. set_reqq_self() was not called for this pid).
43 static std::vector<MPI_Request>* get_reqq_self()
45 return reqq.at(Actor::self()->getPid());
// Register the pending-request vector for the calling actor.
// NOTE(review): unordered_map::insert is a no-op when the key already exists,
// so a second call for the same pid would leak mpi_request — confirm that this
// is only called once per actor (from action_init).
48 static void set_reqq_self(std::vector<MPI_Request> *mpi_request)
50 reqq.insert({Actor::self()->getPid(), mpi_request});
53 //allocate a single buffer for all sends, growing it if needed
// Outside of replay mode this degenerates to a plain per-call allocation that
// the caller must release through smpi_free_tmp_buffer().
54 void* smpi_get_tmp_sendbuffer(int size)
56 if (not smpi_process()->replaying())
57 return xbt_malloc(size);
// Grow-only: the shared buffer is enlarged to the high-water mark, never shrunk.
58 if (sendbuffer_size<size){
59 sendbuffer=static_cast<char*>(xbt_realloc(sendbuffer,size));
65 //allocate a single buffer for all recv
// Same grow-only policy as smpi_get_tmp_sendbuffer(), for the receive side.
66 void* smpi_get_tmp_recvbuffer(int size){
67 if (not smpi_process()->replaying())
68 return xbt_malloc(size);
69 if (recvbuffer_size<size){
70 recvbuffer=static_cast<char*>(xbt_realloc(recvbuffer,size));
// Release a buffer obtained from smpi_get_tmp_{send,recv}buffer().
// In replay mode the shared static buffers are kept for reuse; only the
// non-replay case (plain xbt_malloc) presumably frees buf here — the free
// itself is on a line not shown in this view.
76 void smpi_free_tmp_buffer(void* buf){
77 if (not smpi_process()->replaying())
// Parse a trace token as a double via strtod(); raises an unknown_error
// exception when the token is not a valid double (the endptr check guarding
// the THROWF is on a line not shown here).
82 static double parse_double(const char *string)
85 double value = strtod(string, &endptr);
87 THROWF(unknown_error, 0, "%s is not a double", string);
92 //TODO: this logic should be moved inside the datatype class, to support all predefined types and get rid of is_replayable.
// Map a numeric datatype id from the trace line to an MPI datatype.
// Unknown ids fall through to MPI_DEFAULT_TYPE (the individual case bodies are
// on lines not shown in this view).
93 static MPI_Datatype decode_datatype(const char *const action)
95 switch(atoi(action)) {
118 return MPI_DEFAULT_TYPE;
// Inverse of decode_datatype(): map an MPI datatype back to the numeric id
// string used in trace files (each branch's return value is on a line not
// shown in this view).
123 const char* encode_datatype(MPI_Datatype datatype)
125 if (datatype==MPI_BYTE)
127 if(datatype==MPI_DOUBLE)
129 if(datatype==MPI_INT)
131 if(datatype==MPI_CHAR)
133 if(datatype==MPI_SHORT)
135 if(datatype==MPI_LONG)
137 if(datatype==MPI_FLOAT)
139 // default - not implemented.
140 // do not warn here as we pass in this function even for other trace formats
// Validate the argument count of a replay action line: count the non-null
// entries of `action` and raise arg_error when the count does not match
// 2 (process id + action name) plus `mandatory` and up to `optional` extras.
// Kept as a macro so __FUNCTION__ names the calling action handler in the
// error message. (No comments inside the body: '//' would swallow the '\'.)
144 #define CHECK_ACTION_PARAMS(action, mandatory, optional) {\
146 while(action[i]!=nullptr)\
149 THROWF(arg_error, 0, "%s replay failed.\n" \
150 "%d items were given on the line. First two should be process_id and action. " \
151 "This action needs after them %d mandatory arguments, and accepts %d optional ones. \n" \
152 "Please contact the Simgrid team if support is needed", __FUNCTION__, i, mandatory, optional);\
// Replay "init": choose the default trace datatype (MPE vs TAU flavor), start
// the per-actor simulated clock, record the number of active processes, and
// create this actor's pending-request queue.
158 static void action_init(const char *const *action)
160 XBT_DEBUG("Initialize the counters");
161 CHECK_ACTION_PARAMS(action, 0, 1)
// The optional argument selects the trace flavor; the selection test itself is
// on a line not shown in this view.
163 MPI_DEFAULT_TYPE = MPI_DOUBLE; // default MPE datatype
165 MPI_DEFAULT_TYPE = MPI_BYTE; // default TAU datatype
167 /* start a simulated timer */
168 smpi_process()->simulated_start();
169 /*initialize the number of active processes */
170 active_processes = smpi_process_count();
172 set_reqq_self(new std::vector<MPI_Request>);
// Replay "finalize". Body not visible in this view; the actual teardown
// (draining requests, closing the simulated timer) happens in smpi_replay_main().
175 static void action_finalize(const char *const *action)
// Replay "comm_size": record the communicator size announced by the trace.
180 static void action_comm_size(const char *const *action)
182 communicator_size = parse_double(action[2]);
183 log_timed_action (action, smpi_process()->simulated_elapsed());
// Replay "comm_split": only the timing is logged; the split itself is not
// simulated in the visible code.
186 static void action_comm_split(const char *const *action)
188 log_timed_action (action, smpi_process()->simulated_elapsed());
// Replay "comm_dup": only the timing is logged; the duplication itself is not
// simulated in the visible code.
191 static void action_comm_dup(const char *const *action)
193 log_timed_action (action, smpi_process()->simulated_elapsed());
// Replay "compute": burn the given number of flops on the simulated host,
// bracketed by computing-state trace markers.
196 static void action_compute(const char *const *action)
198 CHECK_ACTION_PARAMS(action, 1, 0)
199 double clock = smpi_process()->simulated_elapsed();
200 double flops= parse_double(action[2]);
201 int my_proc_id = Actor::self()->getPid();
203 TRACE_smpi_computing_in(my_proc_id, flops);
204 smpi_execute_flops(flops);
205 TRACE_smpi_computing_out(my_proc_id);
207 log_timed_action (action, clock);
// Replay "send": blocking send of `size` elements of the decoded datatype to
// rank `to` on MPI_COMM_WORLD with tag 0. The payload is nullptr: replay only
// simulates the communication, no real data is transferred.
210 static void action_send(const char *const *action)
212 CHECK_ACTION_PARAMS(action, 2, 1)
213 int to = atoi(action[2]);
214 double size=parse_double(action[3]);
215 double clock = smpi_process()->simulated_elapsed();
// Optional 3rd argument: datatype id; otherwise fall back to the trace default.
217 MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
219 int my_proc_id = Actor::self()->getPid();
220 int dst_traced = MPI_COMM_WORLD->group()->actor(to)->getPid();
222 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
223 new simgrid::instr::Pt2PtTIData("send", to, size, encode_datatype(MPI_CURRENT_TYPE)));
224 if (not TRACE_smpi_view_internals())
225 TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, size * MPI_CURRENT_TYPE->size());
227 Request::send(nullptr, size, MPI_CURRENT_TYPE, to , 0, MPI_COMM_WORLD);
229 TRACE_smpi_comm_out(my_proc_id);
231 log_timed_action(action, clock);
// Replay "Isend": non-blocking counterpart of action_send(); the returned
// request is queued on this actor so a later test/wait/waitAll can consume it.
234 static void action_Isend(const char *const *action)
236 CHECK_ACTION_PARAMS(action, 2, 1)
237 int to = atoi(action[2]);
238 double size=parse_double(action[3]);
239 double clock = smpi_process()->simulated_elapsed();
241 MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
243 int my_proc_id = Actor::self()->getPid();
244 int dst_traced = MPI_COMM_WORLD->group()->actor(to)->getPid();
245 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
246 new simgrid::instr::Pt2PtTIData("Isend", to, size, encode_datatype(MPI_CURRENT_TYPE)));
247 if (not TRACE_smpi_view_internals())
248 TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, size * MPI_CURRENT_TYPE->size());
250 MPI_Request request = Request::isend(nullptr, size, MPI_CURRENT_TYPE, to, 0, MPI_COMM_WORLD);
252 TRACE_smpi_comm_out(my_proc_id);
// Queue for the matching wait/test in a later trace line.
254 get_reqq_self()->push_back(request);
256 log_timed_action (action, clock);
// Replay "recv": blocking receive from rank `from`, tag 0, on MPI_COMM_WORLD.
// A probe precedes the receive since the receiver side of the trace does not
// know the real message size.
259 static void action_recv(const char *const *action) {
260 CHECK_ACTION_PARAMS(action, 2, 1)
261 int from = atoi(action[2]);
262 double size=parse_double(action[3]);
263 double clock = smpi_process()->simulated_elapsed();
266 MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
268 int my_proc_id = Actor::self()->getPid();
269 int src_traced = MPI_COMM_WORLD->group()->actor(from)->getPid();
271 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
272 new simgrid::instr::Pt2PtTIData("recv", from, size, encode_datatype(MPI_CURRENT_TYPE)));
274 //unknown size from the receiver point of view
276 Request::probe(from, 0, MPI_COMM_WORLD, &status);
280 Request::recv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD, &status);
282 TRACE_smpi_comm_out(my_proc_id);
283 if (not TRACE_smpi_view_internals()) {
284 TRACE_smpi_recv(src_traced, my_proc_id, 0);
287 log_timed_action (action, clock);
// Replay "Irecv": non-blocking counterpart of action_recv(); the returned
// request is queued on this actor for a later test/wait/waitAll.
290 static void action_Irecv(const char *const *action)
292 CHECK_ACTION_PARAMS(action, 2, 1)
293 int from = atoi(action[2]);
294 double size=parse_double(action[3]);
295 double clock = smpi_process()->simulated_elapsed();
297 MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
299 int my_proc_id = Actor::self()->getPid();
300 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
301 new simgrid::instr::Pt2PtTIData("Irecv", from, size, encode_datatype(MPI_CURRENT_TYPE)));
303 //unknown size from the receiver point of view
305 Request::probe(from, 0, MPI_COMM_WORLD, &status);
309 MPI_Request request = Request::irecv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD);
311 TRACE_smpi_comm_out(my_proc_id);
312 get_reqq_self()->push_back(request);
314 log_timed_action (action, clock);
// Replay "test": pop the most recent queued request and MPI_Test it, then push
// the (possibly now-null) request back so a subsequent wait can consume it.
317 static void action_test(const char* const* action)
319 CHECK_ACTION_PARAMS(action, 0, 0)
320 double clock = smpi_process()->simulated_elapsed();
323 MPI_Request request = get_reqq_self()->back();
324 get_reqq_self()->pop_back();
325 //if request is null here, this may mean that a previous test has succeeded
326 //Different times in traced application and replayed version may lead to this
327 //In this case, ignore the extra calls.
328 if(request!=nullptr){
329 int my_proc_id = Actor::self()->getPid();
330 TRACE_smpi_testing_in(my_proc_id);
332 int flag = Request::test(&request, &status);
334 XBT_DEBUG("MPI_Test result: %d", flag);
335 /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now nullptr.*/
336 get_reqq_self()->push_back(request);
338 TRACE_smpi_testing_out(my_proc_id);
340 log_timed_action (action, clock);
// Replay "wait": pop the most recent queued request and wait on it. A null
// request (already completed by a replayed "test") is tolerated and skipped.
343 static void action_wait(const char *const *action){
344 CHECK_ACTION_PARAMS(action, 0, 0)
345 double clock = smpi_process()->simulated_elapsed();
348 xbt_assert(get_reqq_self()->size(), "action wait not preceded by any irecv or isend: %s",
349 xbt_str_join_array(action," "));
350 MPI_Request request = get_reqq_self()->back();
351 get_reqq_self()->pop_back();
353 if (request==nullptr){
354 /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just return.*/
// Capture the endpoints before the wait: the request may be freed by it.
358 int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
360 MPI_Group group = request->comm()->group();
361 int src_traced = group->rank(request->src());
362 int dst_traced = group->rank(request->dst());
363 int is_wait_for_receive = (request->flags() & RECV);
364 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::NoOpTIData("wait"));
366 Request::wait(&request, &status);
368 TRACE_smpi_comm_out(rank);
// Only receive completions produce a recv trace event.
369 if (is_wait_for_receive)
370 TRACE_smpi_recv(src_traced, dst_traced, 0);
371 log_timed_action (action, clock);
// Replay "waitAll": wait on every request queued on this actor, recording the
// (src, dst) of receive requests beforehand so their completion can be traced.
// NOTE(review): the status/recvs_* arrays are VLAs (a GCC/Clang extension, not
// standard C++), and the -100 sentinel assignment for non-receive slots is on
// a line not shown in this view — confirm recvs_snd is fully initialized.
374 static void action_waitall(const char *const *action){
375 CHECK_ACTION_PARAMS(action, 0, 0)
376 double clock = smpi_process()->simulated_elapsed();
377 const unsigned int count_requests = get_reqq_self()->size();
379 if (count_requests>0) {
380 MPI_Status status[count_requests];
382 int my_proc_id_traced = Actor::self()->getPid();
383 TRACE_smpi_comm_in(my_proc_id_traced, __FUNCTION__,
384 new simgrid::instr::Pt2PtTIData("waitAll", -1, count_requests, ""));
385 int recvs_snd[count_requests];
386 int recvs_rcv[count_requests];
387 for (unsigned int i = 0; i < count_requests; i++) {
388 const auto& req = (*get_reqq_self())[i];
389 if (req && (req->flags() & RECV)) {
390 recvs_snd[i] = req->src();
391 recvs_rcv[i] = req->dst();
395 Request::waitall(count_requests, &(*get_reqq_self())[0], status);
397 for (unsigned i = 0; i < count_requests; i++) {
// -100 marks slots that were not receive requests.
398 if (recvs_snd[i]!=-100)
399 TRACE_smpi_recv(recvs_snd[i], recvs_rcv[i],0);
401 TRACE_smpi_comm_out(my_proc_id_traced);
403 log_timed_action (action, clock);
// Replay "barrier": a barrier over MPI_COMM_WORLD, bracketed by trace markers.
406 static void action_barrier(const char *const *action){
407 double clock = smpi_process()->simulated_elapsed();
408 int my_proc_id = Actor::self()->getPid();
409 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::NoOpTIData("barrier"));
411 Colls::barrier(MPI_COMM_WORLD);
413 TRACE_smpi_comm_out(my_proc_id);
414 log_timed_action (action, clock);
// Replay "bcast": broadcast of `size` elements from optional root (default 0).
// The datatype id, when present, is only honored together with the root field.
417 static void action_bcast(const char *const *action)
419 CHECK_ACTION_PARAMS(action, 1, 2)
420 double size = parse_double(action[2]);
421 double clock = smpi_process()->simulated_elapsed();
422 int root = (action[3]) ? atoi(action[3]) : 0;
423 /* Initialize MPI_CURRENT_TYPE in order to decrease the number of the checks */
424 MPI_CURRENT_TYPE = (action[3] && action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
426 int my_proc_id = Actor::self()->getPid();
427 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
428 new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(root)->getPid(), -1.0, size,
429 -1, encode_datatype(MPI_CURRENT_TYPE), ""));
431 void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
433 Colls::bcast(sendbuf, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
435 TRACE_smpi_comm_out(my_proc_id);
436 log_timed_action (action, clock);
// Replay "reduce": reduction of comm_size elements to the optional root,
// followed by comp_size flops of local reduction work. MPI_OP_NULL is passed
// since replay does not compute real values.
439 static void action_reduce(const char *const *action)
441 CHECK_ACTION_PARAMS(action, 2, 2)
442 double comm_size = parse_double(action[2]);
443 double comp_size = parse_double(action[3]);
444 double clock = smpi_process()->simulated_elapsed();
445 int root = (action[4]) ? atoi(action[4]) : 0;
447 MPI_CURRENT_TYPE = (action[4] && action[5]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE;
449 int my_proc_id = Actor::self()->getPid();
450 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
451 new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(root)->getPid(), comp_size,
452 comm_size, -1, encode_datatype(MPI_CURRENT_TYPE), ""));
454 void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
455 void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
456 Colls::reduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
// Simulate the local cost of the reduction operator.
457 smpi_execute_flops(comp_size);
459 TRACE_smpi_comm_out(my_proc_id);
460 log_timed_action (action, clock);
// Replay "allReduce": like action_reduce() but with Colls::allreduce, so every
// rank pays the comp_size flops of local reduction work.
463 static void action_allReduce(const char *const *action) {
464 CHECK_ACTION_PARAMS(action, 2, 1)
465 double comm_size = parse_double(action[2]);
466 double comp_size = parse_double(action[3]);
468 MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
470 double clock = smpi_process()->simulated_elapsed();
471 int my_proc_id = Actor::self()->getPid();
472 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::CollTIData("allReduce", -1, comp_size, comm_size, -1,
473 encode_datatype(MPI_CURRENT_TYPE), ""));
475 void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
476 void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
477 Colls::allreduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
478 smpi_execute_flops(comp_size);
480 TRACE_smpi_comm_out(my_proc_id);
481 log_timed_action (action, clock);
// Replay "allToAll": fixed-count all-to-all; each rank sends send_size and
// receives recv_size elements per peer. Both optional datatype ids must be
// present for either to be honored.
484 static void action_allToAll(const char *const *action) {
485 CHECK_ACTION_PARAMS(action, 2, 2) //two mandatory (send and recv volumes) and two optional (corresponding datatypes)
486 double clock = smpi_process()->simulated_elapsed();
487 int comm_size = MPI_COMM_WORLD->size();
488 int send_size = parse_double(action[2]);
489 int recv_size = parse_double(action[3]);
490 MPI_CURRENT_TYPE = (action[4] && action[5]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
491 MPI_Datatype MPI_CURRENT_TYPE2{(action[4] && action[5]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE};
493 void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* MPI_CURRENT_TYPE->size());
494 void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
496 int my_proc_id = Actor::self()->getPid();
497 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
498 new simgrid::instr::CollTIData("allToAll", -1, -1.0, send_size, recv_size,
499 encode_datatype(MPI_CURRENT_TYPE),
500 encode_datatype(MPI_CURRENT_TYPE2)));
502 Colls::alltoall(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
504 TRACE_smpi_comm_out(my_proc_id);
505 log_timed_action (action, clock);
// Replay "gather": every rank sends send_size elements to the optional root
// (default 0); only the root allocates a receive buffer (the rank==root guard
// around the allocation is on a line not shown in this view).
508 static void action_gather(const char *const *action) {
509 /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
512 1) 68 is the sendcounts
513 2) 68 is the recvcounts
514 3) 0 is the root node
515 4) 0 is the send datatype id, see decode_datatype()
516 5) 0 is the recv datatype id, see decode_datatype()
518 CHECK_ACTION_PARAMS(action, 2, 3)
519 double clock = smpi_process()->simulated_elapsed();
520 int comm_size = MPI_COMM_WORLD->size();
521 int send_size = parse_double(action[2]);
522 int recv_size = parse_double(action[3]);
523 MPI_CURRENT_TYPE = (action[5] && action[6]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE;
524 MPI_Datatype MPI_CURRENT_TYPE2{(action[5] && action[6]) ? decode_datatype(action[6]) : MPI_DEFAULT_TYPE};
526 void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
527 void *recv = nullptr;
528 int root = (action[4]) ? atoi(action[4]) : 0;
529 int rank = MPI_COMM_WORLD->rank();
532 recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
534 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
535 encode_datatype(MPI_CURRENT_TYPE),
536 encode_datatype(MPI_CURRENT_TYPE2)));
538 Colls::gather(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
540 TRACE_smpi_comm_out(Actor::self()->getPid());
541 log_timed_action (action, clock);
// Replay "scatter": the optional root (default 0) scatters send_size elements
// to each of the comm_size ranks; only the root allocates a receive buffer
// (the rank==root guard around the allocation is on a line not shown here).
544 static void action_scatter(const char* const* action)
546 /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
549 1) 68 is the sendcounts
550 2) 68 is the recvcounts
551 3) 0 is the root node
552 4) 0 is the send datatype id, see decode_datatype()
553 5) 0 is the recv datatype id, see decode_datatype()
555 CHECK_ACTION_PARAMS(action, 2, 3)
556 double clock = smpi_process()->simulated_elapsed();
557 int comm_size = MPI_COMM_WORLD->size();
558 int send_size = parse_double(action[2]);
559 int recv_size = parse_double(action[3]);
560 MPI_CURRENT_TYPE = (action[5] && action[6]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE;
561 MPI_Datatype MPI_CURRENT_TYPE2{(action[5] && action[6]) ? decode_datatype(action[6]) : MPI_DEFAULT_TYPE};
563 void* send = smpi_get_tmp_sendbuffer(send_size * MPI_CURRENT_TYPE->size());
564 void* recv = nullptr;
565 int root = (action[4]) ? atoi(action[4]) : 0;
566 int rank = MPI_COMM_WORLD->rank();
569 recv = smpi_get_tmp_recvbuffer(recv_size * comm_size * MPI_CURRENT_TYPE2->size());
// BUGFIX: this handler replays a scatter but the trace event was labelled
// "gather" (copy-paste from action_gather), mislabelling the collective in
// the produced traces.
571 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("scatter", root, -1.0, send_size, recv_size,
572 encode_datatype(MPI_CURRENT_TYPE),
573 encode_datatype(MPI_CURRENT_TYPE2)));
575 Colls::scatter(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
577 TRACE_smpi_comm_out(Actor::self()->getPid());
578 log_timed_action(action, clock);
// Replay "gatherV": variable-count gather; the trace carries one sendcount and
// comm_size recvcounts, then the optional root and datatype ids.
// NOTE(review): disps is declared but never assigned in the visible code
// before being passed to Colls::gatherv — confirm it is initialized on one of
// the lines not shown in this view.
581 static void action_gatherv(const char *const *action) {
582 /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
583 0 gather 68 68 10 10 10 0 0 0
585 1) 68 is the sendcount
586 2) 68 10 10 10 is the recvcounts
587 3) 0 is the root node
588 4) 0 is the send datatype id, see decode_datatype()
589 5) 0 is the recv datatype id, see decode_datatype()
591 double clock = smpi_process()->simulated_elapsed();
592 int comm_size = MPI_COMM_WORLD->size();
593 CHECK_ACTION_PARAMS(action, comm_size+1, 2)
594 int send_size = parse_double(action[2]);
595 int disps[comm_size];
596 int recvcounts[comm_size];
600 (action[4 + comm_size] && action[5 + comm_size]) ? decode_datatype(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
601 MPI_Datatype MPI_CURRENT_TYPE2{
602 (action[4 + comm_size] && action[5 + comm_size]) ? decode_datatype(action[5 + comm_size]) : MPI_DEFAULT_TYPE};
604 void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
605 void *recv = nullptr;
// Accumulate the total receive volume to size the root's buffer.
606 for(int i=0;i<comm_size;i++) {
607 recvcounts[i] = atoi(action[i+3]);
608 recv_sum=recv_sum+recvcounts[i];
612 int root = (action[3 + comm_size]) ? atoi(action[3 + comm_size]) : 0;
613 int rank = MPI_COMM_WORLD->rank();
616 recv = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
618 std::vector<int>* trace_recvcounts = new std::vector<int>(recvcounts, recvcounts + comm_size);
620 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData(
621 "gatherV", root, send_size, nullptr, -1, trace_recvcounts,
622 encode_datatype(MPI_CURRENT_TYPE), encode_datatype(MPI_CURRENT_TYPE2)));
624 Colls::gatherv(send, send_size, MPI_CURRENT_TYPE, recv, recvcounts, disps, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
626 TRACE_smpi_comm_out(Actor::self()->getPid());
627 log_timed_action (action, clock);
// Replay "scatterV": variable-count scatter; the trace carries comm_size
// sendcounts, one recvcount, then the optional root and datatype ids.
// NOTE(review): disps is declared but never assigned in the visible code
// before being passed to Colls::scatterv — confirm it is initialized on one
// of the lines not shown in this view.
630 static void action_scatterv(const char* const* action)
632 /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
633 0 gather 68 10 10 10 68 0 0 0
635 1) 68 10 10 10 is the sendcounts
636 2) 68 is the recvcount
637 3) 0 is the root node
638 4) 0 is the send datatype id, see decode_datatype()
639 5) 0 is the recv datatype id, see decode_datatype()
641 double clock = smpi_process()->simulated_elapsed();
642 int comm_size = MPI_COMM_WORLD->size();
643 CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
644 int recv_size = parse_double(action[2 + comm_size]);
645 int disps[comm_size];
646 int sendcounts[comm_size];
650 (action[4 + comm_size] && action[5 + comm_size]) ? decode_datatype(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
651 MPI_Datatype MPI_CURRENT_TYPE2{
652 (action[4 + comm_size] && action[5 + comm_size]) ? decode_datatype(action[5 + comm_size]) : MPI_DEFAULT_TYPE};
654 void* send = nullptr;
655 void* recv = smpi_get_tmp_recvbuffer(recv_size * MPI_CURRENT_TYPE->size());
// Accumulate the total send volume to size the root's buffer.
656 for (int i = 0; i < comm_size; i++) {
657 sendcounts[i] = atoi(action[i + 2]);
658 send_sum += sendcounts[i];
662 int root = (action[3 + comm_size]) ? atoi(action[3 + comm_size]) : 0;
663 int rank = MPI_COMM_WORLD->rank();
666 send = smpi_get_tmp_sendbuffer(send_sum * MPI_CURRENT_TYPE2->size());
668 std::vector<int>* trace_sendcounts = new std::vector<int>(sendcounts, sendcounts + comm_size);
// BUGFIX: this handler replays a scatterv but the trace event was labelled
// "gatherV" (copy-paste from action_gatherv), mislabelling the collective in
// the produced traces.
670 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData(
671 "scatterV", root, -1, trace_sendcounts, recv_size, nullptr,
672 encode_datatype(MPI_CURRENT_TYPE), encode_datatype(MPI_CURRENT_TYPE2)));
674 Colls::scatterv(send, sendcounts, disps, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
676 TRACE_smpi_comm_out(Actor::self()->getPid());
677 log_timed_action(action, clock);
// Replay "reduceScatter": reduce-scatter over comm_size recvcounts, followed
// by comp_size flops of simulated local reduction work.
680 static void action_reducescatter(const char *const *action) {
681 /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
682 0 reduceScatter 275427 275427 275427 204020 11346849 0
684 1) The first four values after the name of the action declare the recvcounts array
685 2) The value 11346849 is the amount of instructions
686 3) The last value corresponds to the datatype, see decode_datatype().
688 double clock = smpi_process()->simulated_elapsed();
689 int comm_size = MPI_COMM_WORLD->size();
690 CHECK_ACTION_PARAMS(action, comm_size+1, 1)
691 int comp_size = parse_double(action[2+comm_size]);
692 int my_proc_id = Actor::self()->getPid();
693 std::vector<int>* trace_recvcounts = new std::vector<int>;
694 MPI_CURRENT_TYPE = (action[3 + comm_size]) ? decode_datatype(action[3 + comm_size]) : MPI_DEFAULT_TYPE;
696 for(int i=0;i<comm_size;i++) {
697 trace_recvcounts->push_back(atoi(action[i + 2]));
// Total element count across all ranks, used to size both scratch buffers.
699 int size{std::accumulate(trace_recvcounts->begin(), trace_recvcounts->end(), 0)};
701 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
702 new simgrid::instr::VarCollTIData("reduceScatter", -1, 0, nullptr, -1, trace_recvcounts,
703 std::to_string(comp_size), /* ugly hack to print comp_size */
704 encode_datatype(MPI_CURRENT_TYPE)));
706 void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
707 void *recvbuf = smpi_get_tmp_recvbuffer(size* MPI_CURRENT_TYPE->size());
709 Colls::reduce_scatter(sendbuf, recvbuf, trace_recvcounts->data(), MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
710 smpi_execute_flops(comp_size);
712 TRACE_smpi_comm_out(my_proc_id);
713 log_timed_action (action, clock);
// Replay "allGather": fixed-count allgather with optional send/recv datatype
// ids (both must be present for either to be honored).
716 static void action_allgather(const char *const *action) {
717 /* The structure of the allgather action for the rank 0 (total 4 processes) is the following:
718 0 allGather 275427 275427
720 1) 275427 is the sendcount
721 2) 275427 is the recvcount
722 3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().
724 double clock = smpi_process()->simulated_elapsed();
726 CHECK_ACTION_PARAMS(action, 2, 2)
727 int sendcount=atoi(action[2]);
728 int recvcount=atoi(action[3]);
730 MPI_CURRENT_TYPE = (action[4] && action[5]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
731 MPI_Datatype MPI_CURRENT_TYPE2{(action[4] && action[5]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE};
733 void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
734 void *recvbuf = smpi_get_tmp_recvbuffer(recvcount* MPI_CURRENT_TYPE2->size());
736 int my_proc_id = Actor::self()->getPid();
738 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
739 new simgrid::instr::CollTIData("allGather", -1, -1.0, sendcount, recvcount,
740 encode_datatype(MPI_CURRENT_TYPE),
741 encode_datatype(MPI_CURRENT_TYPE2)));
743 Colls::allgather(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcount, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
745 TRACE_smpi_comm_out(my_proc_id);
746 log_timed_action (action, clock);
// Replay "allGatherV": variable-count allgather; the trace carries one
// sendcount and comm_size recvcounts, then optional datatype ids.
// NOTE(review): disps is declared but never assigned in the visible code
// before being passed to Colls::allgatherv — confirm it is initialized on one
// of the lines not shown in this view.
749 static void action_allgatherv(const char *const *action) {
750 /* The structure of the allgatherv action for the rank 0 (total 4 processes) is the following:
751 0 allGatherV 275427 275427 275427 275427 204020
753 1) 275427 is the sendcount
754 2) The next four elements declare the recvcounts array
755 3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().
757 double clock = smpi_process()->simulated_elapsed();
759 int comm_size = MPI_COMM_WORLD->size();
760 CHECK_ACTION_PARAMS(action, comm_size+1, 2)
761 int sendcount=atoi(action[2]);
762 int recvcounts[comm_size];
763 int disps[comm_size];
767 (action[3 + comm_size] && action[4 + comm_size]) ? decode_datatype(action[3 + comm_size]) : MPI_DEFAULT_TYPE;
768 MPI_Datatype MPI_CURRENT_TYPE2{
769 (action[3 + comm_size] && action[4 + comm_size]) ? decode_datatype(action[4 + comm_size]) : MPI_DEFAULT_TYPE};
771 void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
// Accumulate the total receive volume to size the receive buffer.
773 for(int i=0;i<comm_size;i++) {
774 recvcounts[i] = atoi(action[i+3]);
775 recv_sum=recv_sum+recvcounts[i];
778 void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
780 int my_proc_id = Actor::self()->getPid();
782 std::vector<int>* trace_recvcounts = new std::vector<int>(recvcounts, recvcounts + comm_size);
784 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
785 new simgrid::instr::VarCollTIData("allGatherV", -1, sendcount, nullptr, -1, trace_recvcounts,
786 encode_datatype(MPI_CURRENT_TYPE),
787 encode_datatype(MPI_CURRENT_TYPE2)));
789 Colls::allgatherv(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts, disps, MPI_CURRENT_TYPE2,
792 TRACE_smpi_comm_out(my_proc_id);
793 log_timed_action (action, clock);
// Replay "allToAllV": variable-count all-to-all exchange. The trace carries
// the total send-buffer size, comm_size sendcounts, the total recv-buffer
// size, comm_size recvcounts, then optional send/recv datatype ids.
796 static void action_allToAllv(const char *const *action) {
797 /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
798 0 allToAllV 100 1 7 10 12 100 1 70 10 5
800 1) 100 is the size of the send buffer *sizeof(int),
801 2) 1 7 10 12 is the sendcounts array
802 3) 100*sizeof(int) is the size of the receiver buffer
803 4) 1 70 10 5 is the recvcounts array
805 double clock = smpi_process()->simulated_elapsed();
807 int comm_size = MPI_COMM_WORLD->size();
808 CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
811 int sendcounts[comm_size];
812 std::vector<int>* trace_sendcounts = new std::vector<int>;
813 int recvcounts[comm_size];
814 std::vector<int>* trace_recvcounts = new std::vector<int>;
815 int senddisps[comm_size];
816 int recvdisps[comm_size];
818 MPI_CURRENT_TYPE = (action[4 + 2 * comm_size] && action[5 + 2 * comm_size])
819 ? decode_datatype(action[4 + 2 * comm_size])
821 MPI_Datatype MPI_CURRENT_TYPE2{(action[4 + 2 * comm_size] && action[5 + 2 * comm_size])
822 ? decode_datatype(action[5 + 2 * comm_size])
825 int send_buf_size=parse_double(action[2]);
826 int recv_buf_size=parse_double(action[3+comm_size]);
827 int my_proc_id = Actor::self()->getPid();
828 void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* MPI_CURRENT_TYPE->size());
829 void *recvbuf = smpi_get_tmp_recvbuffer(recv_buf_size* MPI_CURRENT_TYPE2->size());
831 for(int i=0;i<comm_size;i++) {
832 sendcounts[i] = atoi(action[i+3]);
833 trace_sendcounts->push_back(sendcounts[i]);
834 send_size += sendcounts[i];
835 recvcounts[i] = atoi(action[i+4+comm_size]);
836 trace_recvcounts->push_back(recvcounts[i]);
837 recv_size += recvcounts[i];
842 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
843 new simgrid::instr::VarCollTIData("allToAllV", -1, send_size, trace_sendcounts, recv_size,
844 trace_recvcounts, encode_datatype(MPI_CURRENT_TYPE),
845 encode_datatype(MPI_CURRENT_TYPE2)));
// BUGFIX: the receive datatype must be MPI_CURRENT_TYPE2 (the decoded recv
// type, also used to size recvbuf above); MPI_CURRENT_TYPE was mistakenly
// passed for both sides of the exchange.
847 Colls::alltoallv(sendbuf, sendcounts, senddisps, MPI_CURRENT_TYPE,recvbuf, recvcounts, recvdisps,
848 MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
850 TRACE_smpi_comm_out(my_proc_id);
851 log_timed_action (action, clock);
854 }} // namespace simgrid::smpi
856 /** @brief Only initialize the replay, don't do it for real */
857 void smpi_replay_init(int* argc, char*** argv)
// Mark the process as initialized/replaying before any action is parsed.
859 simgrid::smpi::Process::init(argc, argv);
860 smpi_process()->mark_as_initialized();
861 smpi_process()->set_replaying(true);
863 int my_proc_id = Actor::self()->getPid();
864 TRACE_smpi_init(my_proc_id);
865 TRACE_smpi_computing_init(my_proc_id);
866 TRACE_smpi_comm_in(my_proc_id, "smpi_replay_run_init", new simgrid::instr::NoOpTIData("init"));
867 TRACE_smpi_comm_out(my_proc_id);
// Register one handler per trace action keyword.
868 xbt_replay_action_register("init", simgrid::smpi::action_init);
869 xbt_replay_action_register("finalize", simgrid::smpi::action_finalize);
870 xbt_replay_action_register("comm_size", simgrid::smpi::action_comm_size);
871 xbt_replay_action_register("comm_split", simgrid::smpi::action_comm_split);
872 xbt_replay_action_register("comm_dup", simgrid::smpi::action_comm_dup);
873 xbt_replay_action_register("send", simgrid::smpi::action_send);
874 xbt_replay_action_register("Isend", simgrid::smpi::action_Isend);
875 xbt_replay_action_register("recv", simgrid::smpi::action_recv);
876 xbt_replay_action_register("Irecv", simgrid::smpi::action_Irecv);
877 xbt_replay_action_register("test", simgrid::smpi::action_test);
878 xbt_replay_action_register("wait", simgrid::smpi::action_wait);
879 xbt_replay_action_register("waitAll", simgrid::smpi::action_waitall);
880 xbt_replay_action_register("barrier", simgrid::smpi::action_barrier);
881 xbt_replay_action_register("bcast", simgrid::smpi::action_bcast);
882 xbt_replay_action_register("reduce", simgrid::smpi::action_reduce);
883 xbt_replay_action_register("allReduce", simgrid::smpi::action_allReduce);
884 xbt_replay_action_register("allToAll", simgrid::smpi::action_allToAll);
885 xbt_replay_action_register("allToAllV", simgrid::smpi::action_allToAllv);
886 xbt_replay_action_register("gather", simgrid::smpi::action_gather);
887 xbt_replay_action_register("scatter", simgrid::smpi::action_scatter);
888 xbt_replay_action_register("gatherV", simgrid::smpi::action_gatherv);
889 xbt_replay_action_register("scatterV", simgrid::smpi::action_scatterv);
890 xbt_replay_action_register("allGather", simgrid::smpi::action_allgather);
891 xbt_replay_action_register("allGatherV", simgrid::smpi::action_allgatherv);
892 xbt_replay_action_register("reduceScatter", simgrid::smpi::action_reducescatter);
893 xbt_replay_action_register("compute", simgrid::smpi::action_compute);
895 //if we have a delayed start, sleep here.
897 double value = xbt_str_parse_double((*argv)[2], "%s is not a double");
898 XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
899 smpi_execute_flops(value);
901 //UGLY: force a context switch to be sure that all MSG_processes begin initialization
902 XBT_DEBUG("Force context switch by smpi_execute_flops - Sleeping for 0.0 flops ");
903 smpi_execute_flops(0.0);
907 /** @brief actually run the replay after initialization */
908 void smpi_replay_main(int* argc, char*** argv)
// Drive the registered action handlers over the trace file.
910 simgrid::xbt::replay_runner(*argc, *argv);
912 /* and now, finalize everything */
913 /* One active process will stop. Decrease the counter*/
914 XBT_DEBUG("There are %zu elements in reqq[*]", get_reqq_self()->size());
// Drain any asynchronous requests the trace left pending before finalizing.
// NOTE(review): requests[]/status[] are VLAs (a GCC/Clang extension).
915 if (not get_reqq_self()->empty()) {
916 unsigned int count_requests=get_reqq_self()->size();
917 MPI_Request requests[count_requests];
918 MPI_Status status[count_requests];
921 for (auto const& req : *get_reqq_self()) {
925 simgrid::smpi::Request::waitall(count_requests, requests, status);
927 delete get_reqq_self();
// Last replaying actor: stop the simulated timer and release the shared
// scratch buffers.
930 if(active_processes==0){
931 /* Last process alive speaking: end the simulated timer */
932 XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
933 xbt_free(sendbuffer);
934 xbt_free(recvbuffer);
937 TRACE_smpi_comm_in(Actor::self()->getPid(), "smpi_replay_run_finalize", new simgrid::instr::NoOpTIData("finalize"));
939 smpi_process()->finalize();
941 TRACE_smpi_comm_out(Actor::self()->getPid());
942 TRACE_smpi_finalize(Actor::self()->getPid());
945 /** @brief chain a replay initialization and a replay start */
946 void smpi_replay_run(int* argc, char*** argv)
948 smpi_replay_init(argc, argv);
949 smpi_replay_main(argc, argv);