1 /* Copyright (c) 2009-2017. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
7 #include "smpi_coll.hpp"
8 #include "smpi_comm.hpp"
9 #include "smpi_datatype.hpp"
10 #include "smpi_group.hpp"
11 #include "smpi_process.hpp"
12 #include "smpi_request.hpp"
13 #include "xbt/replay.hpp"
15 #include <unordered_map>
#define KEY_SIZE (sizeof(int) * 2 + 1)

using simgrid::s4u::Actor;

XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");

// Communicator size as declared by the "comm_size" trace action.
int communicator_size = 0;
// Number of replaying processes still running; the last one to finish prints the simulated time.
static int active_processes = 0;
// Per-actor (keyed by PID) list of outstanding asynchronous requests (filled by Isend/Irecv).
std::unordered_map<int,std::vector<MPI_Request>*> reqq;

// Datatype used when a trace line carries no explicit datatype id (MPE vs TAU default, see action_init).
MPI_Datatype MPI_DEFAULT_TYPE;
// Datatype decoded from the current trace line (global scratch, set by decode_datatype).
MPI_Datatype MPI_CURRENT_TYPE;

// Single shared scratch buffers reused by all replayed sends/recvs, grown on demand
// (replay never transfers real payloads, so one buffer per direction suffices).
static int sendbuffer_size=0;
char* sendbuffer=nullptr;
static int recvbuffer_size=0;
char* recvbuffer=nullptr;
// Log, at verbose level, the replayed action (its fields joined with spaces)
// together with the simulated time it took (elapsed-since-`clock`).
static void log_timed_action (const char *const *action, double clock){
  if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
    char *name = xbt_str_join_array(action, " ");
    // NOTE(review): `name` is presumably xbt_free'd right after — not visible here, confirm.
    XBT_VERB("%s %f", name, smpi_process()->simulated_elapsed()-clock);
// Return the pending-request list of the calling actor.
// Throws std::out_of_range if action_init was never replayed for this actor.
static std::vector<MPI_Request>* get_reqq_self()
  return reqq.at(Actor::self()->getPid());
// Register `mpi_request` as the pending-request list of the calling actor.
// NOTE(review): unordered_map::insert does NOT overwrite an existing entry for this
// PID — assumes each actor calls this only once (from action_init); confirm.
static void set_reqq_self(std::vector<MPI_Request> *mpi_request)
  reqq.insert({Actor::self()->getPid(), mpi_request});
//allocate a single buffer for all sends, growing it if needed
// Outside of replay mode, fall back to a plain allocation owned by the caller
// (smpi_free_tmp_buffer mirrors this distinction).
void* smpi_get_tmp_sendbuffer(int size)
  if (not smpi_process()->replaying())
    return xbt_malloc(size);
  if (sendbuffer_size<size){
    sendbuffer=static_cast<char*>(xbt_realloc(sendbuffer,size));
//allocate a single buffer for all recv
// Same growth strategy as smpi_get_tmp_sendbuffer, for the receive side.
void* smpi_get_tmp_recvbuffer(int size){
  if (not smpi_process()->replaying())
    return xbt_malloc(size);
  if (recvbuffer_size<size){
    recvbuffer=static_cast<char*>(xbt_realloc(recvbuffer,size));
// Free `buf` only when it was individually malloc'ed (non-replay mode);
// in replay mode the shared scratch buffers are kept for reuse.
void smpi_free_tmp_buffer(void* buf){
  if (not smpi_process()->replaying())
// Parse `string` as a double; THROWF(unknown_error) when the string is not a
// well-formed number (checked through strtod's end pointer).
static double parse_double(const char *string)
  // NOTE(review): `endptr` is declared on a line not visible here — confirm.
  double value = strtod(string, &endptr);
  THROWF(unknown_error, 0, "%s is not a double", string);
//TODO: this logic should be moved inside the datatype class, to support all predefined types and get rid of is_replayable.
// Map the numeric datatype id at the start of `action` (parsed with atoi) to a
// predefined MPI datatype, caching it in the global MPI_CURRENT_TYPE.
// Presumably ids 0..6 select DOUBLE, INT, CHAR, SHORT, LONG, FLOAT, BYTE in this
// order, anything else falling back to MPI_DEFAULT_TYPE — TODO confirm case values.
static MPI_Datatype decode_datatype(const char *const action)
  switch(atoi(action)) {
      MPI_CURRENT_TYPE=MPI_DOUBLE;
      MPI_CURRENT_TYPE=MPI_INT;
      MPI_CURRENT_TYPE=MPI_CHAR;
      MPI_CURRENT_TYPE=MPI_SHORT;
      MPI_CURRENT_TYPE=MPI_LONG;
      MPI_CURRENT_TYPE=MPI_FLOAT;
      MPI_CURRENT_TYPE=MPI_BYTE;
      MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
  return MPI_CURRENT_TYPE;
// Inverse of decode_datatype: return the string id used in traces for a
// predefined datatype (returned literals live on lines not visible here).
const char* encode_datatype(MPI_Datatype datatype)
  if (datatype==MPI_BYTE)
  if(datatype==MPI_DOUBLE)
  if(datatype==MPI_INT)
  if(datatype==MPI_CHAR)
  if(datatype==MPI_SHORT)
  if(datatype==MPI_LONG)
  if(datatype==MPI_FLOAT)
  // default - not implemented.
  // do not warn here as we pass in this function even for other trace formats
// Count the non-null entries of `action` and THROWF(arg_error) when fewer than
// 2 (process id + action name) + `mandatory` fields were given on the trace line.
// (Kept as a macro so __FUNCTION__ expands to the calling action's name.)
#define CHECK_ACTION_PARAMS(action, mandatory, optional) {\
    while(action[i]!=nullptr)\
      THROWF(arg_error, 0, "%s replay failed.\n" \
             "%d items were given on the line. First two should be process_id and action. " \
             "This action needs after them %d mandatory arguments, and accepts %d optional ones. \n" \
             "Please contact the Simgrid team if support is needed", __FUNCTION__, i, mandatory, optional);\
// Replay "init": pick the trace's default datatype (MPE traces carry an extra
// argument and default to MPI_DOUBLE, TAU traces default to MPI_BYTE), start the
// simulated timer, and create this actor's pending-request list.
static void action_init(const char *const *action)
  XBT_DEBUG("Initialize the counters");
  CHECK_ACTION_PARAMS(action, 0, 1)
    MPI_DEFAULT_TYPE = MPI_DOUBLE; // default MPE datatype
    MPI_DEFAULT_TYPE = MPI_BYTE; // default TAU datatype

  /* start a simulated timer */
  smpi_process()->simulated_start();
  /*initialize the number of active processes */
  active_processes = smpi_process_count();

  set_reqq_self(new std::vector<MPI_Request>);
// Replay "finalize": intentionally a no-op (teardown happens in smpi_replay_main).
static void action_finalize(const char *const *action)
// Replay "comm_size": record the communicator size announced by the trace.
static void action_comm_size(const char *const *action)
  communicator_size = parse_double(action[2]);
  log_timed_action (action, smpi_process()->simulated_elapsed());
// Replay "comm_split": only logged, the split itself is not simulated.
static void action_comm_split(const char *const *action)
  log_timed_action (action, smpi_process()->simulated_elapsed());
// Replay "comm_dup": only logged, the duplication itself is not simulated.
static void action_comm_dup(const char *const *action)
  log_timed_action (action, smpi_process()->simulated_elapsed());
// Replay "compute <flops>": burn the given amount of simulated flops,
// bracketed by computing-state trace events.
static void action_compute(const char *const *action)
  CHECK_ACTION_PARAMS(action, 1, 0)
  double clock = smpi_process()->simulated_elapsed();
  double flops= parse_double(action[2]);
  int my_proc_id = Actor::self()->getPid();

  TRACE_smpi_computing_in(my_proc_id, flops);
  smpi_execute_flops(flops);
  TRACE_smpi_computing_out(my_proc_id);

  log_timed_action (action, clock);
// Replay a blocking send: "<pid> send <dst> <size> [<datatype_id>]".
static void action_send(const char *const *action)
  CHECK_ACTION_PARAMS(action, 2, 1)
  int to = atoi(action[2]);
  double size=parse_double(action[3]);
  double clock = smpi_process()->simulated_elapsed();

  // Optional trailing field selects the datatype; otherwise use the trace default.
  MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;

  int my_proc_id = Actor::self()->getPid();
  int dst_traced = MPI_COMM_WORLD->group()->actor(to)->getPid();

  TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
                     new simgrid::instr::Pt2PtTIData("send", to, size, encode_datatype(MPI_CURRENT_TYPE)));
  if (not TRACE_smpi_view_internals())
    TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, size * MPI_CURRENT_TYPE->size());

  // nullptr payload: replay simulates timing only, no real data is transferred.
  Request::send(nullptr, size, MPI_CURRENT_TYPE, to , 0, MPI_COMM_WORLD);

  TRACE_smpi_comm_out(my_proc_id);

  log_timed_action(action, clock);
// Replay a non-blocking send: "<pid> Isend <dst> <size> [<datatype_id>]".
// The resulting request is queued for a later wait/waitAll/test action.
static void action_Isend(const char *const *action)
  CHECK_ACTION_PARAMS(action, 2, 1)
  int to = atoi(action[2]);
  double size=parse_double(action[3]);
  double clock = smpi_process()->simulated_elapsed();

  MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;

  int my_proc_id = Actor::self()->getPid();
  int dst_traced = MPI_COMM_WORLD->group()->actor(to)->getPid();
  TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
                     new simgrid::instr::Pt2PtTIData("Isend", to, size, encode_datatype(MPI_CURRENT_TYPE)));
  if (not TRACE_smpi_view_internals())
    TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, size * MPI_CURRENT_TYPE->size());

  MPI_Request request = Request::isend(nullptr, size, MPI_CURRENT_TYPE, to, 0, MPI_COMM_WORLD);

  TRACE_smpi_comm_out(my_proc_id);

  // Remember the request so a subsequent wait/test action can complete it.
  get_reqq_self()->push_back(request);

  log_timed_action (action, clock);
// Replay a blocking receive: "<pid> recv <src> <size> [<datatype_id>]".
static void action_recv(const char *const *action) {
  CHECK_ACTION_PARAMS(action, 2, 1)
  int from = atoi(action[2]);
  double size=parse_double(action[3]);
  double clock = smpi_process()->simulated_elapsed();

  MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;

  int my_proc_id = Actor::self()->getPid();
  int src_traced = MPI_COMM_WORLD->group()->actor(from)->getPid();

  TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
                     new simgrid::instr::Pt2PtTIData("recv", from, size, encode_datatype(MPI_CURRENT_TYPE)));

  //unknown size from the receiver point of view
  // Probe first so a size of 0 in the trace can be replaced by the real incoming size
  // (the size<=0 handling lives on lines not visible here).
  Request::probe(from, 0, MPI_COMM_WORLD, &status);

  Request::recv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD, &status);

  TRACE_smpi_comm_out(my_proc_id);
  if (not TRACE_smpi_view_internals()) {
    TRACE_smpi_recv(src_traced, my_proc_id, 0);

  log_timed_action (action, clock);
// Replay a non-blocking receive: "<pid> Irecv <src> <size> [<datatype_id>]".
// The resulting request is queued for a later wait/waitAll/test action.
static void action_Irecv(const char *const *action)
  CHECK_ACTION_PARAMS(action, 2, 1)
  int from = atoi(action[2]);
  double size=parse_double(action[3]);
  double clock = smpi_process()->simulated_elapsed();

  MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;

  int my_proc_id = Actor::self()->getPid();
  TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
                     new simgrid::instr::Pt2PtTIData("Irecv", from, size, encode_datatype(MPI_CURRENT_TYPE)));

  //unknow size from the receiver pov
  // Probe to recover the real message size when the trace recorded 0.
  Request::probe(from, 0, MPI_COMM_WORLD, &status);

  MPI_Request request = Request::irecv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD);

  TRACE_smpi_comm_out(my_proc_id);
  get_reqq_self()->push_back(request);

  log_timed_action (action, clock);
// Replay "test": pop the most recent pending request, MPI_Test it, then push it
// back (Request::test nulls it out on success, so a later wait sees nullptr).
static void action_test(const char* const* action)
  CHECK_ACTION_PARAMS(action, 0, 0)
  double clock = smpi_process()->simulated_elapsed();

  MPI_Request request = get_reqq_self()->back();
  get_reqq_self()->pop_back();
  //if request is null here, this may mean that a previous test has succeeded
  //Different times in traced application and replayed version may lead to this
  //In this case, ignore the extra calls.
  if(request!=nullptr){
    int my_proc_id = Actor::self()->getPid();
    TRACE_smpi_testing_in(my_proc_id);

    int flag = Request::test(&request, &status);

    XBT_DEBUG("MPI_Test result: %d", flag);
    /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now nullptr.*/
    get_reqq_self()->push_back(request);

    TRACE_smpi_testing_out(my_proc_id);

  log_timed_action (action, clock);
// Replay "wait": pop the most recent pending request and block until it completes.
// A nullptr request (already completed by a prior "test") is silently skipped.
static void action_wait(const char *const *action){
  CHECK_ACTION_PARAMS(action, 0, 0)
  double clock = smpi_process()->simulated_elapsed();

  xbt_assert(get_reqq_self()->size(), "action wait not preceded by any irecv or isend: %s",
             xbt_str_join_array(action," "));
  MPI_Request request = get_reqq_self()->back();
  get_reqq_self()->pop_back();

  if (request==nullptr){
    /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just return.*/

  int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;

  // Capture src/dst/flags now: Request::wait frees the request object.
  MPI_Group group = request->comm()->group();
  int src_traced = group->rank(request->src());
  int dst_traced = group->rank(request->dst());
  int is_wait_for_receive = (request->flags() & RECV);
  TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::NoOpTIData("wait"));

  Request::wait(&request, &status);

  TRACE_smpi_comm_out(rank);
  if (is_wait_for_receive)
    TRACE_smpi_recv(src_traced, dst_traced, 0);
  log_timed_action (action, clock);
// Replay "waitAll": wait on every pending request of this actor at once,
// emitting a recv trace event for each completed receive.
static void action_waitall(const char *const *action){
  CHECK_ACTION_PARAMS(action, 0, 0)
  double clock = smpi_process()->simulated_elapsed();
  const unsigned int count_requests = get_reqq_self()->size();

  if (count_requests>0) {
    MPI_Status status[count_requests];

    int my_proc_id_traced = Actor::self()->getPid();
    TRACE_smpi_comm_in(my_proc_id_traced, __FUNCTION__,
                       new simgrid::instr::Pt2PtTIData("waitAll", -1, count_requests, ""));
    // Record src/dst of receive requests before waitall frees them; slots for
    // non-receives presumably hold the -100 sentinel (set on lines not visible here).
    int recvs_snd[count_requests];
    int recvs_rcv[count_requests];
    for (unsigned int i = 0; i < count_requests; i++) {
      const auto& req = (*get_reqq_self())[i];
      if (req && (req->flags() & RECV)) {
        recvs_snd[i] = req->src();
        recvs_rcv[i] = req->dst();

    Request::waitall(count_requests, &(*get_reqq_self())[0], status);

    for (unsigned i = 0; i < count_requests; i++) {
      if (recvs_snd[i]!=-100)
        TRACE_smpi_recv(recvs_snd[i], recvs_rcv[i],0);

    TRACE_smpi_comm_out(my_proc_id_traced);

  log_timed_action (action, clock);
// Replay "barrier": a plain MPI_Barrier on MPI_COMM_WORLD.
static void action_barrier(const char *const *action){
  double clock = smpi_process()->simulated_elapsed();
  int my_proc_id = Actor::self()->getPid();
  TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::NoOpTIData("barrier"));

  Colls::barrier(MPI_COMM_WORLD);

  TRACE_smpi_comm_out(my_proc_id);
  log_timed_action (action, clock);
// Replay "bcast <size> [<root>] [<datatype_id>]": broadcast from `root`
// (defaults to 0) over MPI_COMM_WORLD.
static void action_bcast(const char *const *action)
  CHECK_ACTION_PARAMS(action, 1, 2)
  double size = parse_double(action[2]);
  double clock = smpi_process()->simulated_elapsed();
  int root = (action[3]) ? atoi(action[3]) : 0;
  /* Initialize MPI_CURRENT_TYPE in order to decrease the number of the checks */
  MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;

  // The datatype id is only meaningful when a root was also given.
  MPI_CURRENT_TYPE = (action[3] && action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;

  int my_proc_id = Actor::self()->getPid();
  TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
                     new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(root)->getPid(), -1.0, size,
                                                    -1, encode_datatype(MPI_CURRENT_TYPE), ""));

  void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());

  Colls::bcast(sendbuf, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);

  TRACE_smpi_comm_out(my_proc_id);
  log_timed_action (action, clock);
// Replay "reduce <comm_size> <comp_size> [<root>] [<datatype_id>]": a reduce to
// `root` followed by `comp_size` flops of local computation.
static void action_reduce(const char *const *action)
  CHECK_ACTION_PARAMS(action, 2, 2)
  double comm_size = parse_double(action[2]);
  double comp_size = parse_double(action[3]);
  double clock = smpi_process()->simulated_elapsed();
  int root = (action[4]) ? atoi(action[4]) : 0;

  MPI_CURRENT_TYPE = (action[4] && action[5]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE;

  int my_proc_id = Actor::self()->getPid();
  TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
                     new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(root)->getPid(), comp_size,
                                                    comm_size, -1, encode_datatype(MPI_CURRENT_TYPE), ""));

  // NOTE(review): both scratch buffers come from the *send* pool here — looks
  // intentional (MPI_OP_NULL means no data is actually combined) but confirm.
  void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
  void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
  Colls::reduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
  smpi_execute_flops(comp_size);

  TRACE_smpi_comm_out(my_proc_id);
  log_timed_action (action, clock);
// Replay "allReduce <comm_size> <comp_size> [<datatype_id>]": an allreduce
// followed by `comp_size` flops of local computation.
static void action_allReduce(const char *const *action) {
  CHECK_ACTION_PARAMS(action, 2, 1)
  double comm_size = parse_double(action[2]);
  double comp_size = parse_double(action[3]);

  MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;

  double clock = smpi_process()->simulated_elapsed();
  int my_proc_id = Actor::self()->getPid();
  TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::CollTIData("allReduce", -1, comp_size, comm_size, -1,
                                                                              encode_datatype(MPI_CURRENT_TYPE), ""));

  void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
  void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
  Colls::allreduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
  smpi_execute_flops(comp_size);

  TRACE_smpi_comm_out(my_proc_id);
  log_timed_action (action, clock);
// Replay "allToAll <send_size> <recv_size> [<sendtype_id> <recvtype_id>]".
static void action_allToAll(const char *const *action) {
  CHECK_ACTION_PARAMS(action, 2, 2) //two mandatory (send and recv volumes) and two optional (corresponding datatypes)
  double clock = smpi_process()->simulated_elapsed();
  int comm_size = MPI_COMM_WORLD->size();
  int send_size = parse_double(action[2]);
  int recv_size = parse_double(action[3]);
  // Both datatype ids must be present for either to be used.
  MPI_CURRENT_TYPE = (action[4] && action[5]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
  MPI_Datatype MPI_CURRENT_TYPE2{(action[4] && action[5]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE};

  void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* MPI_CURRENT_TYPE->size());
  void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());

  int my_proc_id = Actor::self()->getPid();
  TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
                     new simgrid::instr::CollTIData("allToAll", -1, -1.0, send_size, recv_size,
                                                    encode_datatype(MPI_CURRENT_TYPE),
                                                    encode_datatype(MPI_CURRENT_TYPE2)));

  Colls::alltoall(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);

  TRACE_smpi_comm_out(my_proc_id);
  log_timed_action (action, clock);
// Replay "gather": gather `send_size` elements from everyone to `root`;
// only the root allocates a receive buffer.
static void action_gather(const char *const *action) {
  /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
     1) 68 is the sendcounts
     2) 68 is the recvcounts
     3) 0 is the root node
     4) 0 is the send datatype id, see decode_datatype()
     5) 0 is the recv datatype id, see decode_datatype()
  CHECK_ACTION_PARAMS(action, 2, 3)
  double clock = smpi_process()->simulated_elapsed();
  int comm_size = MPI_COMM_WORLD->size();
  int send_size = parse_double(action[2]);
  int recv_size = parse_double(action[3]);
  MPI_CURRENT_TYPE = (action[5] && action[6]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE;
  MPI_Datatype MPI_CURRENT_TYPE2{(action[5] && action[6]) ? decode_datatype(action[6]) : MPI_DEFAULT_TYPE};

  void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
  void *recv = nullptr;
  int root = (action[4]) ? atoi(action[4]) : 0;
  int rank = MPI_COMM_WORLD->rank();

  // Receive buffer only on the root (guard on a line not visible here).
  recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());

  TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
                                                                        encode_datatype(MPI_CURRENT_TYPE),
                                                                        encode_datatype(MPI_CURRENT_TYPE2)));

  Colls::gather(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);

  TRACE_smpi_comm_out(Actor::self()->getPid());
  log_timed_action (action, clock);
548 static void action_scatter(const char* const* action)
550 /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
553 1) 68 is the sendcounts
554 2) 68 is the recvcounts
555 3) 0 is the root node
556 4) 0 is the send datatype id, see decode_datatype()
557 5) 0 is the recv datatype id, see decode_datatype()
559 CHECK_ACTION_PARAMS(action, 2, 3)
560 double clock = smpi_process()->simulated_elapsed();
561 int comm_size = MPI_COMM_WORLD->size();
562 int send_size = parse_double(action[2]);
563 int recv_size = parse_double(action[3]);
564 MPI_CURRENT_TYPE = (action[5] && action[6]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE;
565 MPI_Datatype MPI_CURRENT_TYPE2{(action[5] && action[6]) ? decode_datatype(action[6]) : MPI_DEFAULT_TYPE};
567 void* send = smpi_get_tmp_sendbuffer(send_size * MPI_CURRENT_TYPE->size());
568 void* recv = nullptr;
569 int root = (action[4]) ? atoi(action[4]) : 0;
570 int rank = MPI_COMM_WORLD->rank();
573 recv = smpi_get_tmp_recvbuffer(recv_size * comm_size * MPI_CURRENT_TYPE2->size());
575 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
576 encode_datatype(MPI_CURRENT_TYPE),
577 encode_datatype(MPI_CURRENT_TYPE2)));
579 Colls::scatter(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
581 TRACE_smpi_comm_out(Actor::self()->getPid());
582 log_timed_action(action, clock);
// Replay "gatherV": gather with per-rank receive counts; only the root
// allocates a receive buffer (all displacements are zero).
static void action_gatherv(const char *const *action) {
  /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
     0 gather 68 68 10 10 10 0 0 0
     1) 68 is the sendcount
     2) 68 10 10 10 is the recvcounts
     3) 0 is the root node
     4) 0 is the send datatype id, see decode_datatype()
     5) 0 is the recv datatype id, see decode_datatype()
  double clock = smpi_process()->simulated_elapsed();
  int comm_size = MPI_COMM_WORLD->size();
  CHECK_ACTION_PARAMS(action, comm_size+1, 2)
  int send_size = parse_double(action[2]);
  int disps[comm_size];
  int recvcounts[comm_size];

      (action[4 + comm_size] && action[5 + comm_size]) ? decode_datatype(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
  MPI_Datatype MPI_CURRENT_TYPE2{
      (action[4 + comm_size] && action[5 + comm_size]) ? decode_datatype(action[5 + comm_size]) : MPI_DEFAULT_TYPE};

  void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
  void *recv = nullptr;
  for(int i=0;i<comm_size;i++) {
    recvcounts[i] = atoi(action[i+3]);
    recv_sum=recv_sum+recvcounts[i];

  int root = (action[3 + comm_size]) ? atoi(action[3 + comm_size]) : 0;
  int rank = MPI_COMM_WORLD->rank();

  // Receive buffer only on the root (guard on a line not visible here).
  recv = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());

  std::vector<int>* trace_recvcounts = new std::vector<int>;
  for (int i = 0; i < comm_size; i++) // copy data to avoid bad free
    trace_recvcounts->push_back(recvcounts[i]);

  TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData(
                                             "gatherV", root, send_size, nullptr, -1, trace_recvcounts,
                                             encode_datatype(MPI_CURRENT_TYPE), encode_datatype(MPI_CURRENT_TYPE2)));

  Colls::gatherv(send, send_size, MPI_CURRENT_TYPE, recv, recvcounts, disps, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);

  TRACE_smpi_comm_out(Actor::self()->getPid());
  log_timed_action (action, clock);
636 static void action_scatterv(const char* const* action)
638 /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
639 0 gather 68 10 10 10 68 0 0 0
641 1) 68 10 10 10 is the sendcounts
642 2) 68 is the recvcount
643 3) 0 is the root node
644 4) 0 is the send datatype id, see decode_datatype()
645 5) 0 is the recv datatype id, see decode_datatype()
647 double clock = smpi_process()->simulated_elapsed();
648 int comm_size = MPI_COMM_WORLD->size();
649 CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
650 int recv_size = parse_double(action[2 + comm_size]);
651 int disps[comm_size];
652 int sendcounts[comm_size];
656 (action[4 + comm_size] && action[5 + comm_size]) ? decode_datatype(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
657 MPI_Datatype MPI_CURRENT_TYPE2{
658 (action[4 + comm_size] && action[5 + comm_size]) ? decode_datatype(action[5 + comm_size]) : MPI_DEFAULT_TYPE};
660 void* send = nullptr;
661 void* recv = smpi_get_tmp_recvbuffer(recv_size * MPI_CURRENT_TYPE->size());
662 for (int i = 0; i < comm_size; i++) {
663 sendcounts[i] = atoi(action[i + 2]);
664 send_sum += sendcounts[i];
668 int root = (action[3 + comm_size]) ? atoi(action[3 + comm_size]) : 0;
669 int rank = MPI_COMM_WORLD->rank();
672 send = smpi_get_tmp_sendbuffer(send_sum * MPI_CURRENT_TYPE2->size());
674 std::vector<int>* trace_sendcounts = new std::vector<int>;
675 for (int i = 0; i < comm_size; i++) // copy data to avoid bad free
676 trace_sendcounts->push_back(sendcounts[i]);
678 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData(
679 "gatherV", root, -1, trace_sendcounts, recv_size, nullptr,
680 encode_datatype(MPI_CURRENT_TYPE), encode_datatype(MPI_CURRENT_TYPE2)));
682 Colls::scatterv(send, sendcounts, disps, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
684 TRACE_smpi_comm_out(Actor::self()->getPid());
685 log_timed_action(action, clock);
// Replay "reduceScatter": reduce-scatter with per-rank receive counts,
// followed by `comp_size` flops of local computation.
static void action_reducescatter(const char *const *action) {
  /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
     0 reduceScatter 275427 275427 275427 204020 11346849 0
     1) The first four values after the name of the action declare the recvcounts array
     2) The value 11346849 is the amount of instructions
     3) The last value corresponds to the datatype, see decode_datatype().
  double clock = smpi_process()->simulated_elapsed();
  int comm_size = MPI_COMM_WORLD->size();
  CHECK_ACTION_PARAMS(action, comm_size+1, 1)
  int comp_size = parse_double(action[2+comm_size]);
  int recvcounts[comm_size];
  int my_proc_id = Actor::self()->getPid();
  // NOTE(review): a `size` accumulator (summing recvcounts) is declared/updated on
  // lines not visible here; it sizes the buffers below — confirm.
  std::vector<int>* trace_recvcounts = new std::vector<int>;
  MPI_CURRENT_TYPE = (action[3 + comm_size]) ? decode_datatype(action[3 + comm_size]) : MPI_DEFAULT_TYPE;

  for(int i=0;i<comm_size;i++) {
    recvcounts[i] = atoi(action[i+2]);
    trace_recvcounts->push_back(recvcounts[i]);

  TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
                     new simgrid::instr::VarCollTIData("reduceScatter", -1, 0, nullptr, -1, trace_recvcounts,
                                                       std::to_string(comp_size), /* ugly hack to print comp_size */
                                                       encode_datatype(MPI_CURRENT_TYPE)));

  void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
  void *recvbuf = smpi_get_tmp_recvbuffer(size* MPI_CURRENT_TYPE->size());

  Colls::reduce_scatter(sendbuf, recvbuf, recvcounts, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
  smpi_execute_flops(comp_size);

  TRACE_smpi_comm_out(my_proc_id);
  log_timed_action (action, clock);
// Replay "allGather <sendcount> <recvcount> [<sendtype_id> <recvtype_id>]".
static void action_allgather(const char *const *action) {
  /* The structure of the allgather action for the rank 0 (total 4 processes) is the following:
     0 allGather 275427 275427
     1) 275427 is the sendcount
     2) 275427 is the recvcount
     3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().
  double clock = smpi_process()->simulated_elapsed();

  CHECK_ACTION_PARAMS(action, 2, 2)
  int sendcount=atoi(action[2]);
  int recvcount=atoi(action[3]);

  MPI_CURRENT_TYPE = (action[4] && action[5]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
  MPI_Datatype MPI_CURRENT_TYPE2{(action[4] && action[5]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE};

  void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
  void *recvbuf = smpi_get_tmp_recvbuffer(recvcount* MPI_CURRENT_TYPE2->size());

  int my_proc_id = Actor::self()->getPid();

  TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
                     new simgrid::instr::CollTIData("allGather", -1, -1.0, sendcount, recvcount,
                                                    encode_datatype(MPI_CURRENT_TYPE),
                                                    encode_datatype(MPI_CURRENT_TYPE2)));

  Colls::allgather(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcount, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);

  TRACE_smpi_comm_out(my_proc_id);
  log_timed_action (action, clock);
// Replay "allGatherV": allgather with per-rank receive counts
// (all displacements are zero).
static void action_allgatherv(const char *const *action) {
  /* The structure of the allgatherv action for the rank 0 (total 4 processes) is the following:
     0 allGatherV 275427 275427 275427 275427 204020
     1) 275427 is the sendcount
     2) The next four elements declare the recvcounts array
     3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().
  double clock = smpi_process()->simulated_elapsed();

  int comm_size = MPI_COMM_WORLD->size();
  CHECK_ACTION_PARAMS(action, comm_size+1, 2)
  int sendcount=atoi(action[2]);
  int recvcounts[comm_size];
  int disps[comm_size];

      (action[3 + comm_size] && action[4 + comm_size]) ? decode_datatype(action[3 + comm_size]) : MPI_DEFAULT_TYPE;
  MPI_Datatype MPI_CURRENT_TYPE2{
      (action[3 + comm_size] && action[4 + comm_size]) ? decode_datatype(action[4 + comm_size]) : MPI_DEFAULT_TYPE};

  void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());

  for(int i=0;i<comm_size;i++) {
    recvcounts[i] = atoi(action[i+3]);
    recv_sum=recv_sum+recvcounts[i];

  void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());

  int my_proc_id = Actor::self()->getPid();

  std::vector<int>* trace_recvcounts = new std::vector<int>;
  for (int i = 0; i < comm_size; i++) // copy data to avoid bad free
    trace_recvcounts->push_back(recvcounts[i]);

  TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
                     new simgrid::instr::VarCollTIData("allGatherV", -1, sendcount, nullptr, -1, trace_recvcounts,
                                                       encode_datatype(MPI_CURRENT_TYPE),
                                                       encode_datatype(MPI_CURRENT_TYPE2)));

  Colls::allgatherv(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts, disps, MPI_CURRENT_TYPE2,

  TRACE_smpi_comm_out(my_proc_id);
  log_timed_action (action, clock);
809 static void action_allToAllv(const char *const *action) {
810 /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
811 0 allToAllV 100 1 7 10 12 100 1 70 10 5
813 1) 100 is the size of the send buffer *sizeof(int),
814 2) 1 7 10 12 is the sendcounts array
815 3) 100*sizeof(int) is the size of the receiver buffer
816 4) 1 70 10 5 is the recvcounts array
818 double clock = smpi_process()->simulated_elapsed();
820 int comm_size = MPI_COMM_WORLD->size();
821 CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
824 int sendcounts[comm_size];
825 std::vector<int>* trace_sendcounts = new std::vector<int>;
826 int recvcounts[comm_size];
827 std::vector<int>* trace_recvcounts = new std::vector<int>;
828 int senddisps[comm_size];
829 int recvdisps[comm_size];
831 MPI_CURRENT_TYPE = (action[4 + 2 * comm_size] && action[5 + 2 * comm_size])
832 ? decode_datatype(action[4 + 2 * comm_size])
834 MPI_Datatype MPI_CURRENT_TYPE2{(action[4 + 2 * comm_size] && action[5 + 2 * comm_size])
835 ? decode_datatype(action[5 + 2 * comm_size])
838 int send_buf_size=parse_double(action[2]);
839 int recv_buf_size=parse_double(action[3+comm_size]);
840 int my_proc_id = Actor::self()->getPid();
841 void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* MPI_CURRENT_TYPE->size());
842 void *recvbuf = smpi_get_tmp_recvbuffer(recv_buf_size* MPI_CURRENT_TYPE2->size());
844 for(int i=0;i<comm_size;i++) {
845 sendcounts[i] = atoi(action[i+3]);
846 trace_sendcounts->push_back(sendcounts[i]);
847 send_size += sendcounts[i];
848 recvcounts[i] = atoi(action[i+4+comm_size]);
849 trace_recvcounts->push_back(recvcounts[i]);
850 recv_size += recvcounts[i];
855 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
856 new simgrid::instr::VarCollTIData("allToAllV", -1, send_size, trace_sendcounts, recv_size,
857 trace_recvcounts, encode_datatype(MPI_CURRENT_TYPE),
858 encode_datatype(MPI_CURRENT_TYPE2)));
860 Colls::alltoallv(sendbuf, sendcounts, senddisps, MPI_CURRENT_TYPE,recvbuf, recvcounts, recvdisps,
861 MPI_CURRENT_TYPE, MPI_COMM_WORLD);
863 TRACE_smpi_comm_out(my_proc_id);
864 log_timed_action (action, clock);
867 }} // namespace simgrid::smpi
/** @brief Only initialize the replay, don't do it for real */
// Sets up the SMPI process, tracing, and the action→handler table, then
// optionally consumes a delayed-start value from argv.
void smpi_replay_init(int* argc, char*** argv)
  simgrid::smpi::Process::init(argc, argv);
  smpi_process()->mark_as_initialized();
  smpi_process()->set_replaying(true);

  int my_proc_id = Actor::self()->getPid();
  TRACE_smpi_init(my_proc_id);
  TRACE_smpi_computing_init(my_proc_id);
  TRACE_smpi_comm_in(my_proc_id, "smpi_replay_run_init", new simgrid::instr::NoOpTIData("init"));
  TRACE_smpi_comm_out(my_proc_id);
  // Map every trace action name to its replay handler.
  xbt_replay_action_register("init", simgrid::smpi::action_init);
  xbt_replay_action_register("finalize", simgrid::smpi::action_finalize);
  xbt_replay_action_register("comm_size", simgrid::smpi::action_comm_size);
  xbt_replay_action_register("comm_split", simgrid::smpi::action_comm_split);
  xbt_replay_action_register("comm_dup", simgrid::smpi::action_comm_dup);
  xbt_replay_action_register("send", simgrid::smpi::action_send);
  xbt_replay_action_register("Isend", simgrid::smpi::action_Isend);
  xbt_replay_action_register("recv", simgrid::smpi::action_recv);
  xbt_replay_action_register("Irecv", simgrid::smpi::action_Irecv);
  xbt_replay_action_register("test", simgrid::smpi::action_test);
  xbt_replay_action_register("wait", simgrid::smpi::action_wait);
  xbt_replay_action_register("waitAll", simgrid::smpi::action_waitall);
  xbt_replay_action_register("barrier", simgrid::smpi::action_barrier);
  xbt_replay_action_register("bcast", simgrid::smpi::action_bcast);
  xbt_replay_action_register("reduce", simgrid::smpi::action_reduce);
  xbt_replay_action_register("allReduce", simgrid::smpi::action_allReduce);
  xbt_replay_action_register("allToAll", simgrid::smpi::action_allToAll);
  xbt_replay_action_register("allToAllV", simgrid::smpi::action_allToAllv);
  xbt_replay_action_register("gather", simgrid::smpi::action_gather);
  xbt_replay_action_register("scatter", simgrid::smpi::action_scatter);
  xbt_replay_action_register("gatherV", simgrid::smpi::action_gatherv);
  xbt_replay_action_register("scatterV", simgrid::smpi::action_scatterv);
  xbt_replay_action_register("allGather", simgrid::smpi::action_allgather);
  xbt_replay_action_register("allGatherV", simgrid::smpi::action_allgatherv);
  xbt_replay_action_register("reduceScatter", simgrid::smpi::action_reducescatter);
  xbt_replay_action_register("compute", simgrid::smpi::action_compute);

  //if we have a delayed start, sleep here.
    double value = xbt_str_parse_double((*argv)[2], "%s is not a double");
    XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
    smpi_execute_flops(value);
  //UGLY: force a context switch to be sure that all MSG_processes begin initialization
  XBT_DEBUG("Force context switch by smpi_execute_flops - Sleeping for 0.0 flops ");
  smpi_execute_flops(0.0);
/** @brief actually run the replay after initialization */
// Drives the replay loop, then drains any still-pending requests, and performs
// per-process (and, for the last process, global) teardown.
void smpi_replay_main(int* argc, char*** argv)
  simgrid::xbt::replay_runner(*argc, *argv);

  /* and now, finalize everything */
  /* One active process will stop. Decrease the counter*/
  XBT_DEBUG("There are %zu elements in reqq[*]", get_reqq_self()->size());
  if (not get_reqq_self()->empty()) {
    unsigned int count_requests=get_reqq_self()->size();
    MPI_Request requests[count_requests];
    MPI_Status status[count_requests];

    // NOTE(review): requests[] is presumably filled from this loop body
    // (not visible here) before the waitall — confirm.
    for (auto const& req : *get_reqq_self()) {

    simgrid::smpi::Request::waitall(count_requests, requests, status);

  delete get_reqq_self();

  if(active_processes==0){
    /* Last process alive speaking: end the simulated timer */
    XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
    xbt_free(sendbuffer);
    xbt_free(recvbuffer);

  TRACE_smpi_comm_in(Actor::self()->getPid(), "smpi_replay_run_finalize", new simgrid::instr::NoOpTIData("finalize"));

  smpi_process()->finalize();

  TRACE_smpi_comm_out(Actor::self()->getPid());
  TRACE_smpi_finalize(Actor::self()->getPid());
/** @brief chain a replay initialization and a replay start */
// Convenience entry point: initialize then run the replay with the same args.
void smpi_replay_run(int* argc, char*** argv)
  smpi_replay_init(argc, argv);
  smpi_replay_main(argc, argv);