1 /* Copyright (c) 2009-2017. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
7 #include "smpi_coll.hpp"
8 #include "smpi_comm.hpp"
9 #include "smpi_datatype.hpp"
10 #include "smpi_group.hpp"
11 #include "smpi_process.hpp"
12 #include "smpi_request.hpp"
13 #include "xbt/replay.hpp"
15 #include <unordered_map>
18 #define KEY_SIZE (sizeof(int) * 2 + 1)
20 using simgrid::s4u::Actor;
22 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
// Global replay state shared by all action handlers.
24 int communicator_size = 0;
25 static int active_processes = 0;
// Per-actor queue of outstanding asynchronous requests (Isend/Irecv), keyed by actor PID.
26 std::unordered_map<int,std::vector<MPI_Request>*> reqq;
// Datatype used when a trace line carries no explicit datatype id (set in action_init).
28 MPI_Datatype MPI_DEFAULT_TYPE;
// Datatype decoded from the current trace line (see decode_datatype()).
29 MPI_Datatype MPI_CURRENT_TYPE;
// Grow-only scratch buffers reused for all sends/recvs during replay (payload is fake).
31 static int sendbuffer_size=0;
32 char* sendbuffer=nullptr;
33 static int recvbuffer_size=0;
34 char* recvbuffer=nullptr;
// Log the replayed action (joined into one string) plus its simulated duration, at verbose level.
// 'clock' is the simulated time at which the action started.
36 static void log_timed_action (const char *const *action, double clock){
37 if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
38 char *name = xbt_str_join_array(action, " ");
39 XBT_VERB("%s %f", name, smpi_process()->simulated_elapsed()-clock);
// Return the calling actor's pending-request queue.
// Throws std::out_of_range if action_init was never replayed for this actor.
44 static std::vector<MPI_Request>* get_reqq_self()
46 return reqq.at(Actor::self()->getPid());
// Register the calling actor's pending-request queue.
// NOTE(review): unordered_map::insert is a no-op when the key already exists, so a
// second call for the same actor would leak 'mpi_request' — confirm callers never repeat.
49 static void set_reqq_self(std::vector<MPI_Request> *mpi_request)
51 reqq.insert({Actor::self()->getPid(), mpi_request});
54 //allocate a single buffer for all sends, growing it if needed
55 void* smpi_get_tmp_sendbuffer(int size)
// Outside of replay mode, fall back to a plain per-call allocation.
57 if (not smpi_process()->replaying())
58 return xbt_malloc(size);
59 if (sendbuffer_size<size){
// Grow the shared buffer; its content is irrelevant since replayed payloads are fake.
60 sendbuffer=static_cast<char*>(xbt_realloc(sendbuffer,size));
66 //allocate a single buffer for all recv
// Mirror of smpi_get_tmp_sendbuffer() for the receive side.
67 void* smpi_get_tmp_recvbuffer(int size){
68 if (not smpi_process()->replaying())
69 return xbt_malloc(size);
70 if (recvbuffer_size<size){
71 recvbuffer=static_cast<char*>(xbt_realloc(recvbuffer,size));
// Free a buffer obtained from smpi_get_tmp_{send,recv}buffer. In replay mode the
// shared buffers are kept for reuse, so only non-replay (xbt_malloc'ed) buffers are freed.
77 void smpi_free_tmp_buffer(void* buf){
78 if (not smpi_process()->replaying())
// Parse a trace field as a double; raises unknown_error if the field is not fully numeric.
83 static double parse_double(const char *string)
86 double value = strtod(string, &endptr);
88 THROWF(unknown_error, 0, "%s is not a double", string);
93 //TODO: this logic should be moved inside the datatype class, to support all predefined types and get rid of is_replayable.
// Map a numeric datatype id found in the trace to an MPI datatype.
// Side effect: also stores the result in the global MPI_CURRENT_TYPE.
94 static MPI_Datatype decode_datatype(const char *const action)
96 switch(atoi(action)) {
98 MPI_CURRENT_TYPE=MPI_DOUBLE;
101 MPI_CURRENT_TYPE=MPI_INT;
104 MPI_CURRENT_TYPE=MPI_CHAR;
107 MPI_CURRENT_TYPE=MPI_SHORT;
110 MPI_CURRENT_TYPE=MPI_LONG;
113 MPI_CURRENT_TYPE=MPI_FLOAT;
116 MPI_CURRENT_TYPE=MPI_BYTE;
// Unknown ids fall back to the trace-format default (MPE: double, TAU: byte).
119 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
122 return MPI_CURRENT_TYPE;
// Inverse of decode_datatype(): map an MPI datatype back to its trace id string.
125 const char* encode_datatype(MPI_Datatype datatype)
127 if (datatype==MPI_BYTE)
129 if(datatype==MPI_DOUBLE)
131 if(datatype==MPI_INT)
133 if(datatype==MPI_CHAR)
135 if(datatype==MPI_SHORT)
137 if(datatype==MPI_LONG)
139 if(datatype==MPI_FLOAT)
141 // default - not implemented.
142 // do not warn here as we pass in this function even for other trace formats
// Validate the argument count of a trace line: beyond "<rank> <action>" it must carry
// at least 'mandatory' fields and at most mandatory+optional ones, else raise arg_error.
// (Comments are kept outside the macro body: '//' would swallow the '\' continuations.)
146 #define CHECK_ACTION_PARAMS(action, mandatory, optional) {\
148 while(action[i]!=nullptr)\
151 THROWF(arg_error, 0, "%s replay failed.\n" \
152 "%d items were given on the line. First two should be process_id and action. " \
153 "This action needs after them %d mandatory arguments, and accepts %d optional ones. \n" \
154 "Please contact the Simgrid team if support is needed", __FUNCTION__, i, mandatory, optional);\
// "init" action: choose the default datatype for the trace format, start the
// simulated clock, and register this actor's pending-request queue.
160 static void action_init(const char *const *action)
162 XBT_DEBUG("Initialize the counters");
163 CHECK_ACTION_PARAMS(action, 0, 1)
165 MPI_DEFAULT_TYPE = MPI_DOUBLE; // default MPE datatype
167 MPI_DEFAULT_TYPE = MPI_BYTE; // default TAU datatype
169 /* start a simulated timer */
170 smpi_process()->simulated_start();
171 /*initialize the number of active processes */
172 active_processes = smpi_process_count();
174 set_reqq_self(new std::vector<MPI_Request>);
// "finalize" action: nothing to do per-line; real cleanup happens in smpi_replay_main().
177 static void action_finalize(const char *const *action)
// "comm_size" action: record the communicator size announced by the trace.
182 static void action_comm_size(const char *const *action)
184 communicator_size = parse_double(action[2]);
185 log_timed_action (action, smpi_process()->simulated_elapsed());
// "comm_split" action: no-op during replay, only logged.
188 static void action_comm_split(const char *const *action)
190 log_timed_action (action, smpi_process()->simulated_elapsed());
// "comm_dup" action: no-op during replay, only logged.
193 static void action_comm_dup(const char *const *action)
195 log_timed_action (action, smpi_process()->simulated_elapsed());
// "compute" action: burn the given amount of flops in simulated time.
198 static void action_compute(const char *const *action)
200 CHECK_ACTION_PARAMS(action, 1, 0)
201 double clock = smpi_process()->simulated_elapsed();
202 double flops= parse_double(action[2]);
203 int my_proc_id = Actor::self()->getPid();
205 TRACE_smpi_computing_in(my_proc_id, flops);
206 smpi_execute_flops(flops);
207 TRACE_smpi_computing_out(my_proc_id);
209 log_timed_action (action, clock);
// "send" action: blocking send of <size> elements to rank <to>.
// Trace fields: [2]=destination rank, [3]=count, optional [4]=datatype id.
212 static void action_send(const char *const *action)
214 CHECK_ACTION_PARAMS(action, 2, 1)
215 int to = atoi(action[2]);
216 double size=parse_double(action[3]);
217 double clock = smpi_process()->simulated_elapsed();
220 MPI_CURRENT_TYPE=decode_datatype(action[4]);
222 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
224 int my_proc_id = Actor::self()->getPid();
225 int dst_traced = MPI_COMM_WORLD->group()->actor(to)->getPid();
227 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
228 new simgrid::instr::Pt2PtTIData("send", to, size, encode_datatype(MPI_CURRENT_TYPE)));
229 if (not TRACE_smpi_view_internals())
230 TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, size*MPI_CURRENT_TYPE->size());
// Payload is fake (nullptr): only the simulated communication time matters.
232 Request::send(nullptr, size, MPI_CURRENT_TYPE, to , 0, MPI_COMM_WORLD);
234 TRACE_smpi_comm_out(my_proc_id);
236 log_timed_action(action, clock);
// "Isend" action: non-blocking send; the request is queued so that a later
// "wait"/"waitAll"/"test" action can complete it.
239 static void action_Isend(const char *const *action)
241 CHECK_ACTION_PARAMS(action, 2, 1)
242 int to = atoi(action[2]);
243 double size=parse_double(action[3]);
244 double clock = smpi_process()->simulated_elapsed();
247 MPI_CURRENT_TYPE=decode_datatype(action[4]);
249 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
251 int my_proc_id = Actor::self()->getPid();
252 int dst_traced = MPI_COMM_WORLD->group()->actor(to)->getPid();
253 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
254 new simgrid::instr::Pt2PtTIData("Isend", to, size, encode_datatype(MPI_CURRENT_TYPE)));
255 if (not TRACE_smpi_view_internals())
256 TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, size*MPI_CURRENT_TYPE->size());
258 MPI_Request request = Request::isend(nullptr, size, MPI_CURRENT_TYPE, to, 0, MPI_COMM_WORLD);
260 TRACE_smpi_comm_out(my_proc_id);
// Remember the request for a subsequent wait/test action.
262 get_reqq_self()->push_back(request);
264 log_timed_action (action, clock);
// "recv" action: blocking receive of <size> elements from rank <from>.
// Trace fields: [2]=source rank, [3]=count, optional [4]=datatype id.
267 static void action_recv(const char *const *action) {
268 CHECK_ACTION_PARAMS(action, 2, 1)
269 int from = atoi(action[2]);
270 double size=parse_double(action[3]);
271 double clock = smpi_process()->simulated_elapsed();
275 MPI_CURRENT_TYPE=decode_datatype(action[4]);
277 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
279 int my_proc_id = Actor::self()->getPid();
280 int src_traced = MPI_COMM_WORLD->group()->actor(from)->getPid();
282 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
283 new simgrid::instr::Pt2PtTIData("recv", from, size, encode_datatype(MPI_CURRENT_TYPE)));
285 //unknown size from the receiver point of view
// Probe so the real message size can be taken from the sender side —
// presumably only when the trace gives no usable size; confirm the elided guard.
287 Request::probe(from, 0, MPI_COMM_WORLD, &status);
291 Request::recv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD, &status);
293 TRACE_smpi_comm_out(my_proc_id);
294 if (not TRACE_smpi_view_internals()) {
295 TRACE_smpi_recv(src_traced, my_proc_id, 0);
298 log_timed_action (action, clock);
// "Irecv" action: non-blocking receive; the request is queued for a later wait/test.
301 static void action_Irecv(const char *const *action)
303 CHECK_ACTION_PARAMS(action, 2, 1)
304 int from = atoi(action[2]);
305 double size=parse_double(action[3]);
306 double clock = smpi_process()->simulated_elapsed();
309 MPI_CURRENT_TYPE=decode_datatype(action[4]);
311 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
313 int my_proc_id = Actor::self()->getPid();
314 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
315 new simgrid::instr::Pt2PtTIData("Irecv", from, size, encode_datatype(MPI_CURRENT_TYPE)));
317 //unknow size from the receiver pov
319 Request::probe(from, 0, MPI_COMM_WORLD, &status);
323 MPI_Request request = Request::irecv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD);
325 TRACE_smpi_comm_out(my_proc_id);
// Remember the request for a subsequent wait/test action.
326 get_reqq_self()->push_back(request);
328 log_timed_action (action, clock);
// "test" action: pop the most recent pending request and run MPI_Test on it once;
// the (possibly nulled) request is pushed back for a later wait.
331 static void action_test(const char* const* action)
333 CHECK_ACTION_PARAMS(action, 0, 0)
334 double clock = smpi_process()->simulated_elapsed();
337 MPI_Request request = get_reqq_self()->back();
338 get_reqq_self()->pop_back();
339 //if request is null here, this may mean that a previous test has succeeded
340 //Different times in traced application and replayed version may lead to this
341 //In this case, ignore the extra calls.
342 if(request!=nullptr){
343 int my_proc_id = Actor::self()->getPid();
344 TRACE_smpi_testing_in(my_proc_id);
346 int flag = Request::test(&request, &status);
348 XBT_DEBUG("MPI_Test result: %d", flag);
349 /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now nullptr.*/
350 get_reqq_self()->push_back(request);
352 TRACE_smpi_testing_out(my_proc_id);
354 log_timed_action (action, clock);
// "wait" action: pop the most recent pending request and block until it completes.
// A nullptr request means a prior "test" already completed it; then just return.
357 static void action_wait(const char *const *action){
358 CHECK_ACTION_PARAMS(action, 0, 0)
359 double clock = smpi_process()->simulated_elapsed();
362 xbt_assert(get_reqq_self()->size(), "action wait not preceded by any irecv or isend: %s",
363 xbt_str_join_array(action," "));
364 MPI_Request request = get_reqq_self()->back();
365 get_reqq_self()->pop_back();
367 if (request==nullptr){
368 /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just return.*/
// Capture src/dst/kind before Request::wait, which may invalidate the request.
372 int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
374 MPI_Group group = request->comm()->group();
375 int src_traced = group->rank(request->src());
376 int dst_traced = group->rank(request->dst());
377 int is_wait_for_receive = (request->flags() & RECV);
378 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::NoOpTIData("wait"));
380 Request::wait(&request, &status);
382 TRACE_smpi_comm_out(rank);
383 if (is_wait_for_receive)
384 TRACE_smpi_recv(src_traced, dst_traced, 0);
385 log_timed_action (action, clock);
// "waitAll" action: wait for every request currently queued for this actor.
// Receive endpoints are recorded first so TRACE_smpi_recv can be emitted afterwards.
388 static void action_waitall(const char *const *action){
389 CHECK_ACTION_PARAMS(action, 0, 0)
390 double clock = smpi_process()->simulated_elapsed();
391 const unsigned int count_requests = get_reqq_self()->size();
393 if (count_requests>0) {
394 MPI_Status status[count_requests];
396 int my_proc_id_traced = Actor::self()->getPid();
397 TRACE_smpi_comm_in(my_proc_id_traced, __FUNCTION__, new simgrid::instr::Pt2PtTIData("waitAll", -1, count_requests, ""));
398 int recvs_snd[count_requests];
399 int recvs_rcv[count_requests];
400 for (unsigned int i = 0; i < count_requests; i++) {
401 const auto& req = (*get_reqq_self())[i];
402 if (req && (req->flags () & RECV)){
403 recvs_snd[i]=req->src();
404 recvs_rcv[i]=req->dst();
408 Request::waitall(count_requests, &(*get_reqq_self())[0], status);
410 for (unsigned i = 0; i < count_requests; i++) {
// -100 is the "not a receive" sentinel — presumably assigned in the elided else branch; confirm.
411 if (recvs_snd[i]!=-100)
412 TRACE_smpi_recv(recvs_snd[i], recvs_rcv[i],0);
414 TRACE_smpi_comm_out(my_proc_id_traced);
416 log_timed_action (action, clock);
// "barrier" action: synchronize all ranks of MPI_COMM_WORLD.
419 static void action_barrier(const char *const *action){
420 double clock = smpi_process()->simulated_elapsed();
421 int my_proc_id = Actor::self()->getPid();
422 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::NoOpTIData("barrier"));
424 Colls::barrier(MPI_COMM_WORLD);
426 TRACE_smpi_comm_out(my_proc_id);
427 log_timed_action (action, clock);
// "bcast" action: broadcast <size> elements from <root> (default root 0).
// Trace fields: [2]=count, optional [3]=root, optional [4]=datatype id.
430 static void action_bcast(const char *const *action)
432 CHECK_ACTION_PARAMS(action, 1, 2)
433 double size = parse_double(action[2]);
434 double clock = smpi_process()->simulated_elapsed();
436 /* Initialize MPI_CURRENT_TYPE in order to decrease the number of the checks */
437 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
440 root= atoi(action[3]);
442 MPI_CURRENT_TYPE=decode_datatype(action[4]);
445 int my_proc_id = Actor::self()->getPid();
446 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
447 new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(root)->getPid(), -1.0, size, -1,
448 encode_datatype(MPI_CURRENT_TYPE), ""));
450 void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
452 Colls::bcast(sendbuf, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
454 TRACE_smpi_comm_out(my_proc_id);
455 log_timed_action (action, clock);
// "reduce" action: reduce <comm_size> elements to <root>, then burn <comp_size> flops
// to account for the local reduction work. MPI_OP_NULL: the operation itself is not simulated.
458 static void action_reduce(const char *const *action)
460 CHECK_ACTION_PARAMS(action, 2, 2)
461 double comm_size = parse_double(action[2]);
462 double comp_size = parse_double(action[3]);
463 double clock = smpi_process()->simulated_elapsed();
465 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
468 root= atoi(action[4]);
470 MPI_CURRENT_TYPE=decode_datatype(action[5]);
473 int my_proc_id = Actor::self()->getPid();
474 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
475 new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(root)->getPid(), comp_size,
476 comm_size, -1, encode_datatype(MPI_CURRENT_TYPE), ""));
478 void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
479 void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
480 Colls::reduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
481 smpi_execute_flops(comp_size);
483 TRACE_smpi_comm_out(my_proc_id);
484 log_timed_action (action, clock);
// "allReduce" action: allreduce <comm_size> elements, then burn <comp_size> flops.
487 static void action_allReduce(const char *const *action) {
488 CHECK_ACTION_PARAMS(action, 2, 1)
489 double comm_size = parse_double(action[2]);
490 double comp_size = parse_double(action[3]);
493 MPI_CURRENT_TYPE=decode_datatype(action[4]);
495 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
497 double clock = smpi_process()->simulated_elapsed();
498 int my_proc_id = Actor::self()->getPid();
499 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::CollTIData("allReduce", -1, comp_size, comm_size, -1,
500 encode_datatype(MPI_CURRENT_TYPE), ""));
502 void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
503 void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
504 Colls::allreduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
505 smpi_execute_flops(comp_size);
507 TRACE_smpi_comm_out(my_proc_id);
508 log_timed_action (action, clock);
// "allToAll" action: every rank exchanges <send_size>/<recv_size> elements with every other.
511 static void action_allToAll(const char *const *action) {
512 CHECK_ACTION_PARAMS(action, 2, 2) //two mandatory (send and recv volumes) and two optional (corresponding datatypes)
513 double clock = smpi_process()->simulated_elapsed();
514 int comm_size = MPI_COMM_WORLD->size();
515 int send_size = parse_double(action[2]);
516 int recv_size = parse_double(action[3]);
517 MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
519 if(action[4] && action[5]) {
520 MPI_CURRENT_TYPE=decode_datatype(action[4]);
521 MPI_CURRENT_TYPE2=decode_datatype(action[5]);
524 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
// Buffers are sized for the whole communicator (count * comm_size * type size).
526 void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* MPI_CURRENT_TYPE->size());
527 void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
529 int my_proc_id = Actor::self()->getPid();
530 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::CollTIData("allToAll", -1, -1.0, send_size, recv_size,
531 encode_datatype(MPI_CURRENT_TYPE),
532 encode_datatype(MPI_CURRENT_TYPE2)));
534 Colls::alltoall(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
536 TRACE_smpi_comm_out(my_proc_id);
537 log_timed_action (action, clock);
// "gather" action: all ranks send <send_size> elements to <root>, which receives
// <recv_size> elements per rank. Only the root allocates a receive buffer.
540 static void action_gather(const char *const *action) {
541 /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
544 1) 68 is the sendcounts
545 2) 68 is the recvcounts
546 3) 0 is the root node
547 4) 0 is the send datatype id, see decode_datatype()
548 5) 0 is the recv datatype id, see decode_datatype()
550 CHECK_ACTION_PARAMS(action, 2, 3)
551 double clock = smpi_process()->simulated_elapsed();
552 int comm_size = MPI_COMM_WORLD->size();
553 int send_size = parse_double(action[2]);
554 int recv_size = parse_double(action[3]);
555 MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
// NOTE(review): the guard tests fields [4]/[5] but decodes [5]/[6]; if field [6] is
// absent this dereferences past the argument list — confirm against the trace format.
556 if(action[4] && action[5]) {
557 MPI_CURRENT_TYPE=decode_datatype(action[5]);
558 MPI_CURRENT_TYPE2=decode_datatype(action[6]);
560 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
562 void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
563 void *recv = nullptr;
566 root=atoi(action[4]);
567 int rank = MPI_COMM_WORLD->rank();
570 recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
572 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
573 encode_datatype(MPI_CURRENT_TYPE),
574 encode_datatype(MPI_CURRENT_TYPE2)));
576 Colls::gather(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
578 TRACE_smpi_comm_out(Actor::self()->getPid());
579 log_timed_action (action, clock);
// "scatter" action: <root> sends <send_size> elements to every rank, each of which
// receives <recv_size> elements. Only the root allocates a receive buffer.
// Trace fields: [2]=sendcount, [3]=recvcount, [4]=root, optional [5]/[6]=datatype ids.
582 static void action_scatter(const char* const* action)
584 /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
587 1) 68 is the sendcounts
588 2) 68 is the recvcounts
589 3) 0 is the root node
590 4) 0 is the send datatype id, see decode_datatype()
591 5) 0 is the recv datatype id, see decode_datatype()
593 CHECK_ACTION_PARAMS(action, 2, 3)
594 double clock = smpi_process()->simulated_elapsed();
595 int comm_size = MPI_COMM_WORLD->size();
596 int send_size = parse_double(action[2]);
597 int recv_size = parse_double(action[3]);
598 MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
599 if (action[4] && action[5]) {
600 MPI_CURRENT_TYPE = decode_datatype(action[5]);
601 MPI_CURRENT_TYPE2 = decode_datatype(action[6]);
603 MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
605 void* send = smpi_get_tmp_sendbuffer(send_size * MPI_CURRENT_TYPE->size());
606 void* recv = nullptr;
609 root = atoi(action[4]);
610 int rank = MPI_COMM_WORLD->rank();
613 recv = smpi_get_tmp_recvbuffer(recv_size * comm_size * MPI_CURRENT_TYPE2->size());
// Fix: this handler replays a scatter, not a gather — record the correct
// operation name so trace visualization does not mislabel the collective.
615 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("scatter", root, -1.0, send_size, recv_size,
616 encode_datatype(MPI_CURRENT_TYPE),
617 encode_datatype(MPI_CURRENT_TYPE2)));
619 Colls::scatter(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
621 TRACE_smpi_comm_out(Actor::self()->getPid());
622 log_timed_action(action, clock);
// "gatherV" action: variable-count gather — each rank sends <send_size> elements;
// the root receives recvcounts[i] elements from rank i.
625 static void action_gatherv(const char *const *action) {
626 /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
627 0 gather 68 68 10 10 10 0 0 0
629 1) 68 is the sendcount
630 2) 68 10 10 10 is the recvcounts
631 3) 0 is the root node
632 4) 0 is the send datatype id, see decode_datatype()
633 5) 0 is the recv datatype id, see decode_datatype()
635 double clock = smpi_process()->simulated_elapsed();
636 int comm_size = MPI_COMM_WORLD->size();
637 CHECK_ACTION_PARAMS(action, comm_size+1, 2)
638 int send_size = parse_double(action[2]);
639 int disps[comm_size];
640 int recvcounts[comm_size];
643 MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
644 if(action[4+comm_size] && action[5+comm_size]) {
645 MPI_CURRENT_TYPE=decode_datatype(action[4+comm_size]);
646 MPI_CURRENT_TYPE2=decode_datatype(action[5+comm_size]);
648 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
650 void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
651 void *recv = nullptr;
// Parse the per-rank receive counts and accumulate the total receive volume.
652 for(int i=0;i<comm_size;i++) {
653 recvcounts[i] = atoi(action[i+3]);
654 recv_sum=recv_sum+recvcounts[i];
658 int root=atoi(action[3+comm_size]);
659 int rank = MPI_COMM_WORLD->rank();
662 recv = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
664 std::vector<int>* trace_recvcounts = new std::vector<int>;
665 for (int i = 0; i < comm_size; i++) // copy data to avoid bad free
666 trace_recvcounts->push_back(recvcounts[i]);
668 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData(
669 "gatherV", root, send_size, nullptr, -1, trace_recvcounts,
670 encode_datatype(MPI_CURRENT_TYPE), encode_datatype(MPI_CURRENT_TYPE2)));
672 Colls::gatherv(send, send_size, MPI_CURRENT_TYPE, recv, recvcounts, disps, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
674 TRACE_smpi_comm_out(Actor::self()->getPid());
675 log_timed_action (action, clock);
// "scatterV" action: variable-count scatter — the root sends sendcounts[i]
// elements to rank i; every rank receives <recv_size> elements.
// Trace fields: [2..comm_size+1]=sendcounts, [2+comm_size]=recvcount,
// [3+comm_size]=root, optional [4+comm_size]/[5+comm_size]=datatype ids.
678 static void action_scatterv(const char* const* action)
680 /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
681 0 gather 68 10 10 10 68 0 0 0
683 1) 68 10 10 10 is the sendcounts
684 2) 68 is the recvcount
685 3) 0 is the root node
686 4) 0 is the send datatype id, see decode_datatype()
687 5) 0 is the recv datatype id, see decode_datatype()
689 double clock = smpi_process()->simulated_elapsed();
690 int comm_size = MPI_COMM_WORLD->size();
691 CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
692 int recv_size = parse_double(action[2 + comm_size]);
693 int disps[comm_size];
694 int sendcounts[comm_size];
697 MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
698 if (action[4 + comm_size] && action[5 + comm_size]) {
699 MPI_CURRENT_TYPE = decode_datatype(action[4 + comm_size]);
700 MPI_CURRENT_TYPE2 = decode_datatype(action[5 + comm_size]);
702 MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
704 void* send = nullptr;
705 void* recv = smpi_get_tmp_recvbuffer(recv_size * MPI_CURRENT_TYPE->size());
// Parse the per-rank send counts and accumulate the total send volume.
706 for (int i = 0; i < comm_size; i++) {
707 sendcounts[i] = atoi(action[i + 2]);
708 send_sum += sendcounts[i];
712 int root = atoi(action[3 + comm_size]);
713 int rank = MPI_COMM_WORLD->rank();
716 send = smpi_get_tmp_sendbuffer(send_sum * MPI_CURRENT_TYPE2->size());
718 std::vector<int>* trace_sendcounts = new std::vector<int>;
719 for (int i = 0; i < comm_size; i++) // copy data to avoid bad free
720 trace_sendcounts->push_back(sendcounts[i]);
// Fix: this handler replays a scatterV, not a gatherV — record the correct
// operation name so trace visualization does not mislabel the collective.
722 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData(
723 "scatterV", root, -1, trace_sendcounts, recv_size, nullptr,
724 encode_datatype(MPI_CURRENT_TYPE), encode_datatype(MPI_CURRENT_TYPE2)));
726 Colls::scatterv(send, sendcounts, disps, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
728 TRACE_smpi_comm_out(Actor::self()->getPid());
729 log_timed_action(action, clock);
// "reduceScatter" action: reduce-scatter with per-rank recvcounts, followed by
// burning <comp_size> flops for the local reduction work.
732 static void action_reducescatter(const char *const *action) {
733 /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
734 0 reduceScatter 275427 275427 275427 204020 11346849 0
736 1) The first four values after the name of the action declare the recvcounts array
737 2) The value 11346849 is the amount of instructions
738 3) The last value corresponds to the datatype, see decode_datatype().
740 double clock = smpi_process()->simulated_elapsed();
741 int comm_size = MPI_COMM_WORLD->size();
742 CHECK_ACTION_PARAMS(action, comm_size+1, 1)
743 int comp_size = parse_double(action[2+comm_size]);
744 int recvcounts[comm_size];
745 int my_proc_id = Actor::self()->getPid();
747 std::vector<int>* trace_recvcounts = new std::vector<int>;
748 if(action[3+comm_size])
749 MPI_CURRENT_TYPE=decode_datatype(action[3+comm_size]);
751 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
753 for(int i=0;i<comm_size;i++) {
754 recvcounts[i] = atoi(action[i+2]);
755 trace_recvcounts->push_back(recvcounts[i]);
759 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
760 new simgrid::instr::VarCollTIData("reduceScatter", -1, 0, nullptr, -1, trace_recvcounts,
761 std::to_string(comp_size), /* ugly hack to print comp_size */
762 encode_datatype(MPI_CURRENT_TYPE)));
// 'size' is presumably the sum of recvcounts accumulated above (its declaration
// is not visible in this view) — buffers must hold the full reduced volume.
764 void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
765 void *recvbuf = smpi_get_tmp_recvbuffer(size* MPI_CURRENT_TYPE->size());
767 Colls::reduce_scatter(sendbuf, recvbuf, recvcounts, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
768 smpi_execute_flops(comp_size);
770 TRACE_smpi_comm_out(my_proc_id);
771 log_timed_action (action, clock);
// "allGather" action: every rank contributes <sendcount> elements and receives
// <recvcount> elements from each rank.
774 static void action_allgather(const char *const *action) {
775 /* The structure of the allgather action for the rank 0 (total 4 processes) is the following:
776 0 allGather 275427 275427
778 1) 275427 is the sendcount
779 2) 275427 is the recvcount
780 3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().
782 double clock = smpi_process()->simulated_elapsed();
784 CHECK_ACTION_PARAMS(action, 2, 2)
785 int sendcount=atoi(action[2]);
786 int recvcount=atoi(action[3]);
788 MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
790 if(action[4] && action[5]) {
791 MPI_CURRENT_TYPE = decode_datatype(action[4]);
792 MPI_CURRENT_TYPE2 = decode_datatype(action[5]);
794 MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
796 void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
797 void *recvbuf = smpi_get_tmp_recvbuffer(recvcount* MPI_CURRENT_TYPE2->size());
799 int my_proc_id = Actor::self()->getPid();
801 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::CollTIData("allGather", -1, -1.0, sendcount, recvcount,
802 encode_datatype(MPI_CURRENT_TYPE),
803 encode_datatype(MPI_CURRENT_TYPE2)));
805 Colls::allgather(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcount, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
807 TRACE_smpi_comm_out(my_proc_id);
808 log_timed_action (action, clock);
// "allGatherV" action: variable-count allgather — every rank sends <sendcount>
// elements and receives recvcounts[i] elements from rank i.
811 static void action_allgatherv(const char *const *action) {
812 /* The structure of the allgatherv action for the rank 0 (total 4 processes) is the following:
813 0 allGatherV 275427 275427 275427 275427 204020
815 1) 275427 is the sendcount
816 2) The next four elements declare the recvcounts array
817 3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().
819 double clock = smpi_process()->simulated_elapsed();
821 int comm_size = MPI_COMM_WORLD->size();
822 CHECK_ACTION_PARAMS(action, comm_size+1, 2)
823 int sendcount=atoi(action[2]);
824 int recvcounts[comm_size];
825 int disps[comm_size];
827 MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
829 if(action[3+comm_size] && action[4+comm_size]) {
830 MPI_CURRENT_TYPE = decode_datatype(action[3+comm_size]);
831 MPI_CURRENT_TYPE2 = decode_datatype(action[4+comm_size]);
833 MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
835 void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
// Parse the per-rank receive counts and accumulate the total receive volume.
837 for(int i=0;i<comm_size;i++) {
838 recvcounts[i] = atoi(action[i+3]);
839 recv_sum=recv_sum+recvcounts[i];
842 void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
844 int my_proc_id = Actor::self()->getPid();
846 std::vector<int>* trace_recvcounts = new std::vector<int>;
847 for (int i = 0; i < comm_size; i++) // copy data to avoid bad free
848 trace_recvcounts->push_back(recvcounts[i]);
850 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::VarCollTIData(
851 "allGatherV", -1, sendcount, nullptr, -1, trace_recvcounts,
852 encode_datatype(MPI_CURRENT_TYPE), encode_datatype(MPI_CURRENT_TYPE2)));
854 Colls::allgatherv(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts, disps, MPI_CURRENT_TYPE2,
857 TRACE_smpi_comm_out(my_proc_id);
858 log_timed_action (action, clock);
// "allToAllV" action: variable-count all-to-all — rank i sends sendcounts[j]
// elements to rank j and receives recvcounts[j] elements from it.
// Trace fields: [2]=send buffer size, [3..2+comm_size]=sendcounts,
// [3+comm_size]=recv buffer size, [4+comm_size..]=recvcounts, optional datatype ids.
861 static void action_allToAllv(const char *const *action) {
862 /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
863 0 allToAllV 100 1 7 10 12 100 1 70 10 5
865 1) 100 is the size of the send buffer *sizeof(int),
866 2) 1 7 10 12 is the sendcounts array
867 3) 100*sizeof(int) is the size of the receiver buffer
868 4) 1 70 10 5 is the recvcounts array
870 double clock = smpi_process()->simulated_elapsed();
872 int comm_size = MPI_COMM_WORLD->size();
873 CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
876 int sendcounts[comm_size];
877 std::vector<int>* trace_sendcounts = new std::vector<int>;
878 int recvcounts[comm_size];
879 std::vector<int>* trace_recvcounts = new std::vector<int>;
880 int senddisps[comm_size];
881 int recvdisps[comm_size];
883 MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
885 int send_buf_size=parse_double(action[2]);
886 int recv_buf_size=parse_double(action[3+comm_size]);
887 if(action[4+2*comm_size] && action[5+2*comm_size]) {
888 MPI_CURRENT_TYPE=decode_datatype(action[4+2*comm_size]);
889 MPI_CURRENT_TYPE2=decode_datatype(action[5+2*comm_size]);
892 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
894 int my_proc_id = Actor::self()->getPid();
895 void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* MPI_CURRENT_TYPE->size());
896 void *recvbuf = smpi_get_tmp_recvbuffer(recv_buf_size* MPI_CURRENT_TYPE2->size());
// Parse per-rank send/receive counts and the total exchanged volumes.
898 for(int i=0;i<comm_size;i++) {
899 sendcounts[i] = atoi(action[i+3]);
900 trace_sendcounts->push_back(sendcounts[i]);
901 send_size += sendcounts[i];
902 recvcounts[i] = atoi(action[i+4+comm_size]);
903 trace_recvcounts->push_back(recvcounts[i]);
904 recv_size += recvcounts[i];
909 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::VarCollTIData(
910 "allToAllV", -1, send_size, trace_sendcounts, recv_size, trace_recvcounts,
911 encode_datatype(MPI_CURRENT_TYPE), encode_datatype(MPI_CURRENT_TYPE2)));
// Fix: the receive side must use the receive datatype (MPI_CURRENT_TYPE2) that was
// decoded above and used to size recvbuf — not the send datatype.
913 Colls::alltoallv(sendbuf, sendcounts, senddisps, MPI_CURRENT_TYPE,recvbuf, recvcounts, recvdisps,
914 MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
916 TRACE_smpi_comm_out(my_proc_id);
917 log_timed_action (action, clock);
920 }} // namespace simgrid::smpi
922 /** @brief Only initialize the replay, don't do it for real */
923 void smpi_replay_init(int* argc, char*** argv)
// Mark this process initialized/replaying before any action handler runs.
925 simgrid::smpi::Process::init(argc, argv);
926 smpi_process()->mark_as_initialized();
927 smpi_process()->set_replaying(true);
929 int my_proc_id = Actor::self()->getPid();
930 TRACE_smpi_init(my_proc_id);
931 TRACE_smpi_computing_init(my_proc_id);
932 TRACE_smpi_comm_in(my_proc_id, "smpi_replay_run_init", new simgrid::instr::NoOpTIData("init"));
933 TRACE_smpi_comm_out(my_proc_id);
// Register one handler per trace action keyword.
934 xbt_replay_action_register("init", simgrid::smpi::action_init);
935 xbt_replay_action_register("finalize", simgrid::smpi::action_finalize);
936 xbt_replay_action_register("comm_size", simgrid::smpi::action_comm_size);
937 xbt_replay_action_register("comm_split", simgrid::smpi::action_comm_split);
938 xbt_replay_action_register("comm_dup", simgrid::smpi::action_comm_dup);
939 xbt_replay_action_register("send", simgrid::smpi::action_send);
940 xbt_replay_action_register("Isend", simgrid::smpi::action_Isend);
941 xbt_replay_action_register("recv", simgrid::smpi::action_recv);
942 xbt_replay_action_register("Irecv", simgrid::smpi::action_Irecv);
943 xbt_replay_action_register("test", simgrid::smpi::action_test);
944 xbt_replay_action_register("wait", simgrid::smpi::action_wait);
945 xbt_replay_action_register("waitAll", simgrid::smpi::action_waitall);
946 xbt_replay_action_register("barrier", simgrid::smpi::action_barrier);
947 xbt_replay_action_register("bcast", simgrid::smpi::action_bcast);
948 xbt_replay_action_register("reduce", simgrid::smpi::action_reduce);
949 xbt_replay_action_register("allReduce", simgrid::smpi::action_allReduce);
950 xbt_replay_action_register("allToAll", simgrid::smpi::action_allToAll);
951 xbt_replay_action_register("allToAllV", simgrid::smpi::action_allToAllv);
952 xbt_replay_action_register("gather", simgrid::smpi::action_gather);
953 xbt_replay_action_register("scatter", simgrid::smpi::action_scatter);
954 xbt_replay_action_register("gatherV", simgrid::smpi::action_gatherv);
955 xbt_replay_action_register("scatterV", simgrid::smpi::action_scatterv);
956 xbt_replay_action_register("allGather", simgrid::smpi::action_allgather);
957 xbt_replay_action_register("allGatherV", simgrid::smpi::action_allgatherv);
958 xbt_replay_action_register("reduceScatter", simgrid::smpi::action_reducescatter);
959 xbt_replay_action_register("compute", simgrid::smpi::action_compute);
961 //if we have a delayed start, sleep here.
963 double value = xbt_str_parse_double((*argv)[2], "%s is not a double");
964 XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
965 smpi_execute_flops(value);
967 //UGLY: force a context switch to be sure that all MSG_processes begin initialization
968 XBT_DEBUG("Force context switch by smpi_execute_flops - Sleeping for 0.0 flops ");
969 smpi_execute_flops(0.0);
973 /** @brief actually run the replay after initialization */
974 void smpi_replay_main(int* argc, char*** argv)
976 simgrid::xbt::replay_runner(*argc, *argv);
978 /* and now, finalize everything */
979 /* One active process will stop. Decrease the counter*/
980 XBT_DEBUG("There are %zu elements in reqq[*]", get_reqq_self()->size());
// Drain any requests the trace left pending before finalizing.
981 if (not get_reqq_self()->empty()) {
982 unsigned int count_requests=get_reqq_self()->size();
983 MPI_Request requests[count_requests];
984 MPI_Status status[count_requests];
987 for (auto const& req : *get_reqq_self()) {
991 simgrid::smpi::Request::waitall(count_requests, requests, status);
993 delete get_reqq_self();
996 if(active_processes==0){
997 /* Last process alive speaking: end the simulated timer */
998 XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
// Release the shared scratch buffers; only the last process does this.
999 xbt_free(sendbuffer);
1000 xbt_free(recvbuffer);
1003 TRACE_smpi_comm_in(Actor::self()->getPid(), "smpi_replay_run_finalize", new simgrid::instr::NoOpTIData("finalize"));
1005 smpi_process()->finalize();
1007 TRACE_smpi_comm_out(Actor::self()->getPid());
1008 TRACE_smpi_finalize(Actor::self()->getPid());
1011 /** @brief chain a replay initialization and a replay start */
1012 void smpi_replay_run(int* argc, char*** argv)
1014 smpi_replay_init(argc, argv);
1015 smpi_replay_main(argc, argv);