1 /* Copyright (c) 2009-2017. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
7 #include "smpi_coll.hpp"
8 #include "smpi_comm.hpp"
9 #include "smpi_datatype.hpp"
10 #include "smpi_group.hpp"
11 #include "smpi_process.hpp"
12 #include "smpi_request.hpp"
13 #include "xbt/replay.hpp"
16 #include <unordered_map>
19 using simgrid::s4u::Actor;
21 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
23 static int communicator_size = 0;
24 static int active_processes = 0;
25 static std::unordered_map<int, std::vector<MPI_Request>*> reqq;
27 static MPI_Datatype MPI_DEFAULT_TYPE;
28 static MPI_Datatype MPI_CURRENT_TYPE;
30 static int sendbuffer_size = 0;
31 static char* sendbuffer = nullptr;
32 static int recvbuffer_size = 0;
33 static char* recvbuffer = nullptr;
/* Log one replayed action plus the simulated time it consumed, at verbose log level only. */
35 static void log_timed_action (const char *const *action, double clock){
36 if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
/* xbt_str_join_array() heap-allocates; NOTE(review): the matching xbt_free is not visible in this excerpt — confirm it exists. */
37 char *name = xbt_str_join_array(action, " ");
/* `clock` is the simulated start time; the difference is the duration of this action. */
38 XBT_VERB("%s %f", name, smpi_process()->simulated_elapsed()-clock);
/* Accessors for the per-actor queue of pending MPI_Requests, keyed by actor PID in `reqq`.
 * get_reqq_self() throws (std::out_of_range) if no queue was registered for this actor. */
43 static std::vector<MPI_Request>* get_reqq_self()
45 return reqq.at(Actor::self()->getPid());
48 static void set_reqq_self(std::vector<MPI_Request> *mpi_request)
50 reqq.insert({Actor::self()->getPid(), mpi_request});
53 //allocate a single buffer for all sends, growing it if needed
/* Outside of replay mode, fall back to a plain per-call allocation. */
54 void* smpi_get_tmp_sendbuffer(int size)
56 if (not smpi_process()->replaying())
57 return xbt_malloc(size);
/* Grow-only: the shared buffer is reused across all send actions of the replay. */
58 if (sendbuffer_size<size){
59 sendbuffer=static_cast<char*>(xbt_realloc(sendbuffer,size));
65 //allocate a single buffer for all recv
66 void* smpi_get_tmp_recvbuffer(int size){
67 if (not smpi_process()->replaying())
68 return xbt_malloc(size);
69 if (recvbuffer_size<size){
70 recvbuffer=static_cast<char*>(xbt_realloc(recvbuffer,size));
/* Free only buffers obtained via the non-replay path; replay buffers are shared and freed at the end. */
76 void smpi_free_tmp_buffer(void* buf){
77 if (not smpi_process()->replaying())
/* Strictly parse `string` as a double; throws if the text is not a valid double. */
82 static double parse_double(const char *string)
85 double value = strtod(string, &endptr);
87 THROWF(unknown_error, 0, "%s is not a double", string);
92 //TODO: this logic should be moved inside the datatype class, to support all predefined types and get rid of is_replayable.
/* Map the integer datatype id found in a trace to an MPI_Datatype.
 * The switch cases are elided in this excerpt; unknown ids fall through to MPI_DEFAULT_TYPE. */
93 static MPI_Datatype decode_datatype(const char *const action)
95 switch(atoi(action)) {
118 return MPI_DEFAULT_TYPE;
/* Inverse of decode_datatype(): map an MPI_Datatype back to its trace id string.
 * The return statements for each case are elided in this excerpt. */
123 const char* encode_datatype(MPI_Datatype datatype)
125 if (datatype==MPI_BYTE)
127 if(datatype==MPI_DOUBLE)
129 if(datatype==MPI_INT)
131 if(datatype==MPI_CHAR)
133 if(datatype==MPI_SHORT)
135 if(datatype==MPI_LONG)
137 if(datatype==MPI_FLOAT)
139 // default - not implemented.
140 // do not warn here as we pass in this function even for other trace formats
/* Validate the argument count of a trace line: counts the non-null entries of `action`
 * and throws arg_error unless it matches 2 (pid + action name) + `mandatory`, allowing
 * up to `optional` extra arguments. Kept as a macro so __FUNCTION__ names the caller. */
144 #define CHECK_ACTION_PARAMS(action, mandatory, optional) {\
146 while(action[i]!=nullptr)\
149 THROWF(arg_error, 0, "%s replay failed.\n" \
150 "%d items were given on the line. First two should be process_id and action. " \
151 "This action needs after them %d mandatory arguments, and accepts %d optional ones. \n" \
152 "Please contact the Simgrid team if support is needed", __FUNCTION__, i, mandatory, optional);\
/* "init" action: choose the default datatype (MPE vs TAU trace format), start the
 * simulated timer, count this process as active and register its request queue. */
158 static void action_init(const char *const *action)
160 XBT_DEBUG("Initialize the counters");
161 CHECK_ACTION_PARAMS(action, 0, 1)
163 MPI_DEFAULT_TYPE = MPI_DOUBLE; // default MPE datatype
165 MPI_DEFAULT_TYPE = MPI_BYTE; // default TAU datatype
167 /* start a simulated timer */
168 smpi_process()->simulated_start();
169 /*initialize the number of active processes */
170 active_processes = smpi_process_count();
/* NOTE(review): ownership of this vector is transferred to `reqq`; it is deleted in smpi_replay_main(). */
172 set_reqq_self(new std::vector<MPI_Request>);
/* "finalize" action: no-op here; the real teardown happens in smpi_replay_main(). */
175 static void action_finalize(const char *const *action)
/* "comm_size" action: record the communicator size announced by the trace. */
180 static void action_comm_size(const char *const *action)
182 communicator_size = parse_double(action[2]);
183 log_timed_action (action, smpi_process()->simulated_elapsed());
/* "comm_split" / "comm_dup" actions: only logged, communicators are not actually split/duplicated. */
186 static void action_comm_split(const char *const *action)
188 log_timed_action (action, smpi_process()->simulated_elapsed());
191 static void action_comm_dup(const char *const *action)
193 log_timed_action (action, smpi_process()->simulated_elapsed());
/* "compute" action: burn the given number of flops on the simulated host,
 * bracketed by computing markers for the tracing subsystem. */
196 static void action_compute(const char *const *action)
198 CHECK_ACTION_PARAMS(action, 1, 0)
199 double clock = smpi_process()->simulated_elapsed();
200 double flops= parse_double(action[2]);
201 int my_proc_id = Actor::self()->getPid();
203 TRACE_smpi_computing_in(my_proc_id, flops);
204 smpi_execute_flops(flops);
205 TRACE_smpi_computing_out(my_proc_id);
207 log_timed_action (action, clock);
/* "send" action: blocking MPI_Send of `size` elements to rank `to`.
 * The payload is nullptr: only the simulated transfer matters, not the data. */
210 static void action_send(const char *const *action)
212 CHECK_ACTION_PARAMS(action, 2, 1)
213 int to = atoi(action[2]);
214 double size=parse_double(action[3]);
215 double clock = smpi_process()->simulated_elapsed();
/* Optional 5th field selects the datatype; otherwise use the trace-format default. */
217 MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
219 int my_proc_id = Actor::self()->getPid();
220 int dst_traced = MPI_COMM_WORLD->group()->actor(to)->getPid();
222 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
223 new simgrid::instr::Pt2PtTIData("send", to, size, encode_datatype(MPI_CURRENT_TYPE)));
224 if (not TRACE_smpi_view_internals())
225 TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, size * MPI_CURRENT_TYPE->size());
227 Request::send(nullptr, size, MPI_CURRENT_TYPE, to , 0, MPI_COMM_WORLD);
229 TRACE_smpi_comm_out(my_proc_id);
231 log_timed_action(action, clock);
/* "Isend" action: non-blocking send; the resulting request is queued so that a
 * later "wait"/"waitAll"/"test" action can complete it. */
234 static void action_Isend(const char *const *action)
236 CHECK_ACTION_PARAMS(action, 2, 1)
237 int to = atoi(action[2]);
238 double size=parse_double(action[3]);
239 double clock = smpi_process()->simulated_elapsed();
241 MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
243 int my_proc_id = Actor::self()->getPid();
244 int dst_traced = MPI_COMM_WORLD->group()->actor(to)->getPid();
245 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
246 new simgrid::instr::Pt2PtTIData("Isend", to, size, encode_datatype(MPI_CURRENT_TYPE)));
247 if (not TRACE_smpi_view_internals())
248 TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, size * MPI_CURRENT_TYPE->size());
250 MPI_Request request = Request::isend(nullptr, size, MPI_CURRENT_TYPE, to, 0, MPI_COMM_WORLD);
252 TRACE_smpi_comm_out(my_proc_id);
/* Remember the request for a subsequent wait/test action. */
254 get_reqq_self()->push_back(request);
256 log_timed_action (action, clock);
/* "recv" action: blocking MPI_Recv from rank `from`. The probe determines the real
 * message size since the receiver side of the trace does not know it. */
259 static void action_recv(const char *const *action) {
260 CHECK_ACTION_PARAMS(action, 2, 1)
261 int from = atoi(action[2]);
262 double size=parse_double(action[3]);
263 double clock = smpi_process()->simulated_elapsed();
266 MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
268 int my_proc_id = Actor::self()->getPid();
269 int src_traced = MPI_COMM_WORLD->group()->actor(from)->getPid();
271 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
272 new simgrid::instr::Pt2PtTIData("recv", from, size, encode_datatype(MPI_CURRENT_TYPE)));
274 //unknown size from the receiver point of view
276 Request::probe(from, 0, MPI_COMM_WORLD, &status);
280 Request::recv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD, &status);
282 TRACE_smpi_comm_out(my_proc_id);
283 if (not TRACE_smpi_view_internals()) {
284 TRACE_smpi_recv(src_traced, my_proc_id, 0);
287 log_timed_action (action, clock);
/* "Irecv" action: non-blocking receive; request queued for a later wait/test. */
290 static void action_Irecv(const char *const *action)
292 CHECK_ACTION_PARAMS(action, 2, 1)
293 int from = atoi(action[2]);
294 double size=parse_double(action[3]);
295 double clock = smpi_process()->simulated_elapsed();
297 MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
299 int my_proc_id = Actor::self()->getPid();
300 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
301 new simgrid::instr::Pt2PtTIData("Irecv", from, size, encode_datatype(MPI_CURRENT_TYPE)));
303 //unknow size from the receiver pov
305 Request::probe(from, 0, MPI_COMM_WORLD, &status);
309 MPI_Request request = Request::irecv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD);
311 TRACE_smpi_comm_out(my_proc_id);
312 get_reqq_self()->push_back(request);
314 log_timed_action (action, clock);
/* "test" action: MPI_Test the most recently queued request. On success the request
 * becomes nullptr but is pushed back anyway so a later "wait" finds a placeholder. */
317 static void action_test(const char* const* action)
319 CHECK_ACTION_PARAMS(action, 0, 0)
320 double clock = smpi_process()->simulated_elapsed();
323 MPI_Request request = get_reqq_self()->back();
324 get_reqq_self()->pop_back();
325 //if request is null here, this may mean that a previous test has succeeded
326 //Different times in traced application and replayed version may lead to this
327 //In this case, ignore the extra calls.
328 if(request!=nullptr){
329 int my_proc_id = Actor::self()->getPid();
330 TRACE_smpi_testing_in(my_proc_id);
332 int flag = Request::test(&request, &status);
334 XBT_DEBUG("MPI_Test result: %d", flag);
335 /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now nullptr.*/
336 get_reqq_self()->push_back(request);
338 TRACE_smpi_testing_out(my_proc_id);
340 log_timed_action (action, clock);
/* "wait" action: MPI_Wait on the most recently queued request. A nullptr entry means a
 * prior "test" already completed it, in which case we just return. The src/dst/flags
 * must be captured BEFORE Request::wait, which may destroy the request object. */
343 static void action_wait(const char *const *action){
344 CHECK_ACTION_PARAMS(action, 0, 0)
345 double clock = smpi_process()->simulated_elapsed();
/* NOTE(review): xbt_str_join_array allocates; the assert message arg is leaked on the failure path. */
348 xbt_assert(get_reqq_self()->size(), "action wait not preceded by any irecv or isend: %s",
349 xbt_str_join_array(action," "));
350 MPI_Request request = get_reqq_self()->back();
351 get_reqq_self()->pop_back();
353 if (request==nullptr){
354 /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just return.*/
358 int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
360 MPI_Group group = request->comm()->group();
361 int src_traced = group->rank(request->src());
362 int dst_traced = group->rank(request->dst());
363 int is_wait_for_receive = (request->flags() & RECV);
364 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::NoOpTIData("wait"));
366 Request::wait(&request, &status);
368 TRACE_smpi_comm_out(rank);
369 if (is_wait_for_receive)
370 TRACE_smpi_recv(src_traced, dst_traced, 0);
371 log_timed_action (action, clock);
/* "waitAll" action: MPI_Waitall on every queued request. The src/dst of receive
 * requests are snapshotted first (waitall may destroy them), then replayed into
 * the tracing subsystem. -100 marks non-receive slots in the snapshot arrays. */
374 static void action_waitall(const char *const *action){
375 CHECK_ACTION_PARAMS(action, 0, 0)
376 double clock = smpi_process()->simulated_elapsed();
377 const unsigned int count_requests = get_reqq_self()->size();
379 if (count_requests>0) {
/* NOTE(review): VLAs (status/recvs_snd/recvs_rcv) are a GCC/Clang extension, not standard C++. */
380 MPI_Status status[count_requests];
382 int my_proc_id_traced = Actor::self()->getPid();
383 TRACE_smpi_comm_in(my_proc_id_traced, __FUNCTION__,
384 new simgrid::instr::Pt2PtTIData("waitAll", -1, count_requests, ""));
385 int recvs_snd[count_requests];
386 int recvs_rcv[count_requests];
387 for (unsigned int i = 0; i < count_requests; i++) {
388 const auto& req = (*get_reqq_self())[i];
389 if (req && (req->flags() & RECV)) {
390 recvs_snd[i] = req->src();
391 recvs_rcv[i] = req->dst();
395 Request::waitall(count_requests, &(*get_reqq_self())[0], status);
397 for (unsigned i = 0; i < count_requests; i++) {
398 if (recvs_snd[i]!=-100)
399 TRACE_smpi_recv(recvs_snd[i], recvs_rcv[i],0);
401 TRACE_smpi_comm_out(my_proc_id_traced);
403 log_timed_action (action, clock);
/* "barrier" action: collective barrier over MPI_COMM_WORLD. */
406 static void action_barrier(const char *const *action){
407 double clock = smpi_process()->simulated_elapsed();
408 int my_proc_id = Actor::self()->getPid();
409 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::NoOpTIData("barrier"));
411 Colls::barrier(MPI_COMM_WORLD);
413 TRACE_smpi_comm_out(my_proc_id);
414 log_timed_action (action, clock);
/* "bcast" action: broadcast `size` elements from `root` (default 0). The datatype
 * field (action[4]) is only honored when the root field is present too. */
417 static void action_bcast(const char *const *action)
419 CHECK_ACTION_PARAMS(action, 1, 2)
420 double size = parse_double(action[2]);
421 double clock = smpi_process()->simulated_elapsed();
422 int root = (action[3]) ? atoi(action[3]) : 0;
423 /* Initialize MPI_CURRENT_TYPE in order to decrease the number of the checks */
424 MPI_CURRENT_TYPE = (action[3] && action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
426 int my_proc_id = Actor::self()->getPid();
427 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
428 new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(root)->getPid(), -1.0, size,
429 -1, encode_datatype(MPI_CURRENT_TYPE), ""));
431 void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
433 Colls::bcast(sendbuf, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
435 TRACE_smpi_comm_out(my_proc_id);
436 log_timed_action (action, clock);
/* "reduce" action: reduce `comm_size` elements to `root`, then burn `comp_size`
 * flops for the local reduction work. MPI_OP_NULL: no real arithmetic is done. */
439 static void action_reduce(const char *const *action)
441 CHECK_ACTION_PARAMS(action, 2, 2)
442 double comm_size = parse_double(action[2]);
443 double comp_size = parse_double(action[3]);
444 double clock = smpi_process()->simulated_elapsed();
445 int root = (action[4]) ? atoi(action[4]) : 0;
447 MPI_CURRENT_TYPE = (action[4] && action[5]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE;
449 int my_proc_id = Actor::self()->getPid();
450 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
451 new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(root)->getPid(), comp_size,
452 comm_size, -1, encode_datatype(MPI_CURRENT_TYPE), ""));
/* Both scratch buffers come from the shared send buffer; contents are irrelevant. */
454 void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
455 void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
456 Colls::reduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
457 smpi_execute_flops(comp_size);
459 TRACE_smpi_comm_out(my_proc_id);
460 log_timed_action (action, clock);
/* "allReduce" action: allreduce of `comm_size` elements followed by `comp_size` flops. */
463 static void action_allReduce(const char *const *action) {
464 CHECK_ACTION_PARAMS(action, 2, 1)
465 double comm_size = parse_double(action[2]);
466 double comp_size = parse_double(action[3]);
468 MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
470 double clock = smpi_process()->simulated_elapsed();
471 int my_proc_id = Actor::self()->getPid();
472 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::CollTIData("allReduce", -1, comp_size, comm_size, -1,
473 encode_datatype(MPI_CURRENT_TYPE), ""));
475 void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size())
476 void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
477 Colls::allreduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
478 smpi_execute_flops(comp_size);
480 TRACE_smpi_comm_out(my_proc_id);
481 log_timed_action (action, clock);
/* "allToAll" action: all-to-all exchange of send_size/recv_size elements per peer.
 * Both optional datatype fields must be present for either to take effect. */
484 static void action_allToAll(const char *const *action) {
485 CHECK_ACTION_PARAMS(action, 2, 2) //two mandatory (send and recv volumes) and two optional (corresponding datatypes)
486 double clock = smpi_process()->simulated_elapsed();
487 int comm_size = MPI_COMM_WORLD->size();
488 int send_size = parse_double(action[2]);
489 int recv_size = parse_double(action[3]);
490 MPI_CURRENT_TYPE = (action[4] && action[5]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
491 MPI_Datatype MPI_CURRENT_TYPE2{(action[4] && action[5]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE};
493 void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* MPI_CURRENT_TYPE->size());
494 void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
496 int my_proc_id = Actor::self()->getPid();
497 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
498 new simgrid::instr::CollTIData("allToAll", -1, -1.0, send_size, recv_size,
499 encode_datatype(MPI_CURRENT_TYPE),
500 encode_datatype(MPI_CURRENT_TYPE2)));
502 Colls::alltoall(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
504 TRACE_smpi_comm_out(my_proc_id);
505 log_timed_action (action, clock);
/* "gather" action: gather send_size elements from every rank to `root`; only the
 * root allocates the (comm_size times larger) receive buffer. */
508 static void action_gather(const char *const *action) {
509 /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
512 1) 68 is the sendcounts
513 2) 68 is the recvcounts
514 3) 0 is the root node
515 4) 0 is the send datatype id, see decode_datatype()
516 5) 0 is the recv datatype id, see decode_datatype()
518 CHECK_ACTION_PARAMS(action, 2, 3)
519 double clock = smpi_process()->simulated_elapsed();
520 int comm_size = MPI_COMM_WORLD->size();
521 int send_size = parse_double(action[2]);
522 int recv_size = parse_double(action[3]);
523 MPI_CURRENT_TYPE = (action[5] && action[6]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE;
524 MPI_Datatype MPI_CURRENT_TYPE2{(action[5] && action[6]) ? decode_datatype(action[6]) : MPI_DEFAULT_TYPE};
526 void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
527 void *recv = nullptr;
528 int root = (action[4]) ? atoi(action[4]) : 0;
529 int rank = MPI_COMM_WORLD->rank();
532 recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
534 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
535 encode_datatype(MPI_CURRENT_TYPE),
536 encode_datatype(MPI_CURRENT_TYPE2)));
538 Colls::gather(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
540 TRACE_smpi_comm_out(Actor::self()->getPid());
541 log_timed_action (action, clock);
544 static void action_scatter(const char* const* action)
546 /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
549 1) 68 is the sendcounts
550 2) 68 is the recvcounts
551 3) 0 is the root node
552 4) 0 is the send datatype id, see decode_datatype()
553 5) 0 is the recv datatype id, see decode_datatype()
555 CHECK_ACTION_PARAMS(action, 2, 3)
556 double clock = smpi_process()->simulated_elapsed();
557 int comm_size = MPI_COMM_WORLD->size();
558 int send_size = parse_double(action[2]);
559 int recv_size = parse_double(action[3]);
560 MPI_CURRENT_TYPE = (action[5] && action[6]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE;
561 MPI_Datatype MPI_CURRENT_TYPE2{(action[5] && action[6]) ? decode_datatype(action[6]) : MPI_DEFAULT_TYPE};
563 void* send = smpi_get_tmp_sendbuffer(send_size * MPI_CURRENT_TYPE->size());
564 void* recv = nullptr;
565 int root = (action[4]) ? atoi(action[4]) : 0;
566 int rank = MPI_COMM_WORLD->rank();
569 recv = smpi_get_tmp_recvbuffer(recv_size * comm_size * MPI_CURRENT_TYPE2->size());
571 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
572 encode_datatype(MPI_CURRENT_TYPE),
573 encode_datatype(MPI_CURRENT_TYPE2)));
575 Colls::scatter(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
577 TRACE_smpi_comm_out(Actor::self()->getPid());
578 log_timed_action(action, clock);
/* "gatherV" action: gatherv with per-rank receive counts read from the trace line;
 * only the root allocates a receive buffer sized by the sum of recvcounts. */
581 static void action_gatherv(const char *const *action) {
582 /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
583 0 gather 68 68 10 10 10 0 0 0
585 1) 68 is the sendcount
586 2) 68 10 10 10 is the recvcounts
587 3) 0 is the root node
588 4) 0 is the send datatype id, see decode_datatype()
589 5) 0 is the recv datatype id, see decode_datatype()
591 double clock = smpi_process()->simulated_elapsed();
592 int comm_size = MPI_COMM_WORLD->size();
593 CHECK_ACTION_PARAMS(action, comm_size+1, 2)
594 int send_size = parse_double(action[2]);
595 std::vector<int> disps(comm_size, 0);
/* NOTE(review): VLA is a compiler extension, not standard C++. */
596 int recvcounts[comm_size];
600 (action[4 + comm_size] && action[5 + comm_size]) ? decode_datatype(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
601 MPI_Datatype MPI_CURRENT_TYPE2{
602 (action[4 + comm_size] && action[5 + comm_size]) ? decode_datatype(action[5 + comm_size]) : MPI_DEFAULT_TYPE};
604 void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
605 void *recv = nullptr;
606 for(int i=0;i<comm_size;i++) {
607 recvcounts[i] = atoi(action[i+3]);
608 recv_sum=recv_sum+recvcounts[i];
611 int root = (action[3 + comm_size]) ? atoi(action[3 + comm_size]) : 0;
612 int rank = MPI_COMM_WORLD->rank();
615 recv = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
/* Ownership of trace_recvcounts is transferred to the VarCollTIData below. */
617 std::vector<int>* trace_recvcounts = new std::vector<int>(recvcounts, recvcounts + comm_size);
619 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData(
620 "gatherV", root, send_size, nullptr, -1, trace_recvcounts,
621 encode_datatype(MPI_CURRENT_TYPE), encode_datatype(MPI_CURRENT_TYPE2)));
623 Colls::gatherv(send, send_size, MPI_CURRENT_TYPE, recv, recvcounts, disps.data(), MPI_CURRENT_TYPE2, root,
626 TRACE_smpi_comm_out(Actor::self()->getPid());
627 log_timed_action (action, clock);
630 static void action_scatterv(const char* const* action)
632 /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
633 0 gather 68 10 10 10 68 0 0 0
635 1) 68 10 10 10 is the sendcounts
636 2) 68 is the recvcount
637 3) 0 is the root node
638 4) 0 is the send datatype id, see decode_datatype()
639 5) 0 is the recv datatype id, see decode_datatype()
641 double clock = smpi_process()->simulated_elapsed();
642 int comm_size = MPI_COMM_WORLD->size();
643 CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
644 int recv_size = parse_double(action[2 + comm_size]);
645 std::vector<int> disps(comm_size, 0);
646 int sendcounts[comm_size];
650 (action[4 + comm_size] && action[5 + comm_size]) ? decode_datatype(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
651 MPI_Datatype MPI_CURRENT_TYPE2{
652 (action[4 + comm_size] && action[5 + comm_size]) ? decode_datatype(action[5 + comm_size]) : MPI_DEFAULT_TYPE};
654 void* send = nullptr;
655 void* recv = smpi_get_tmp_recvbuffer(recv_size * MPI_CURRENT_TYPE->size());
656 for (int i = 0; i < comm_size; i++) {
657 sendcounts[i] = atoi(action[i + 2]);
658 send_sum += sendcounts[i];
661 int root = (action[3 + comm_size]) ? atoi(action[3 + comm_size]) : 0;
662 int rank = MPI_COMM_WORLD->rank();
665 send = smpi_get_tmp_sendbuffer(send_sum * MPI_CURRENT_TYPE2->size());
667 std::vector<int>* trace_sendcounts = new std::vector<int>(sendcounts, sendcounts + comm_size);
669 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData(
670 "gatherV", root, -1, trace_sendcounts, recv_size, nullptr,
671 encode_datatype(MPI_CURRENT_TYPE), encode_datatype(MPI_CURRENT_TYPE2)));
673 Colls::scatterv(send, sendcounts, disps.data(), MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
675 TRACE_smpi_comm_out(Actor::self()->getPid());
676 log_timed_action(action, clock);
/* "reduceScatter" action: reduce_scatter with per-rank counts from the trace,
 * followed by `comp_size` flops of local computation. */
679 static void action_reducescatter(const char *const *action) {
680 /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
681 0 reduceScatter 275427 275427 275427 204020 11346849 0
683 1) The first four values after the name of the action declare the recvcounts array
684 2) The value 11346849 is the amount of instructions
685 3) The last value corresponds to the datatype, see decode_datatype().
687 double clock = smpi_process()->simulated_elapsed();
688 int comm_size = MPI_COMM_WORLD->size();
689 CHECK_ACTION_PARAMS(action, comm_size+1, 1)
690 int comp_size = parse_double(action[2+comm_size]);
691 int my_proc_id = Actor::self()->getPid();
/* Ownership of trace_recvcounts is transferred to the VarCollTIData below. */
692 std::vector<int>* trace_recvcounts = new std::vector<int>;
693 MPI_CURRENT_TYPE = (action[3 + comm_size]) ? decode_datatype(action[3 + comm_size]) : MPI_DEFAULT_TYPE;
695 for(int i=0;i<comm_size;i++) {
696 trace_recvcounts->push_back(atoi(action[i + 2]));
/* Total element count = sum of all per-rank receive counts. */
698 int size{std::accumulate(trace_recvcounts->begin(), trace_recvcounts->end(), 0)};
700 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
701 new simgrid::instr::VarCollTIData("reduceScatter", -1, 0, nullptr, -1, trace_recvcounts,
702 std::to_string(comp_size), /* ugly hack to print comp_size */
703 encode_datatype(MPI_CURRENT_TYPE)));
705 void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
706 void *recvbuf = smpi_get_tmp_recvbuffer(size* MPI_CURRENT_TYPE->size());
708 Colls::reduce_scatter(sendbuf, recvbuf, trace_recvcounts->data(), MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
709 smpi_execute_flops(comp_size);
711 TRACE_smpi_comm_out(my_proc_id);
712 log_timed_action (action, clock);
/* "allGather" action: allgather of sendcount/recvcount elements per rank. */
715 static void action_allgather(const char *const *action) {
716 /* The structure of the allgather action for the rank 0 (total 4 processes) is the following:
717 0 allGather 275427 275427
719 1) 275427 is the sendcount
720 2) 275427 is the recvcount
721 3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().
723 double clock = smpi_process()->simulated_elapsed();
725 CHECK_ACTION_PARAMS(action, 2, 2)
726 int sendcount=atoi(action[2]);
727 int recvcount=atoi(action[3]);
729 MPI_CURRENT_TYPE = (action[4] && action[5]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
730 MPI_Datatype MPI_CURRENT_TYPE2{(action[4] && action[5]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE};
732 void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
733 void *recvbuf = smpi_get_tmp_recvbuffer(recvcount* MPI_CURRENT_TYPE2->size());
735 int my_proc_id = Actor::self()->getPid();
737 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
738 new simgrid::instr::CollTIData("allGather", -1, -1.0, sendcount, recvcount,
739 encode_datatype(MPI_CURRENT_TYPE),
740 encode_datatype(MPI_CURRENT_TYPE2)));
742 Colls::allgather(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcount, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
744 TRACE_smpi_comm_out(my_proc_id);
745 log_timed_action (action, clock);
/* "allGatherV" action: allgatherv with per-rank receive counts from the trace;
 * the shared receive buffer is sized by the sum of all recvcounts. */
748 static void action_allgatherv(const char *const *action) {
749 /* The structure of the allgatherv action for the rank 0 (total 4 processes) is the following:
750 0 allGatherV 275427 275427 275427 275427 204020
752 1) 275427 is the sendcount
753 2) The next four elements declare the recvcounts array
754 3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().
756 double clock = smpi_process()->simulated_elapsed();
758 int comm_size = MPI_COMM_WORLD->size();
759 CHECK_ACTION_PARAMS(action, comm_size+1, 2)
760 int sendcount=atoi(action[2]);
/* NOTE(review): VLA is a compiler extension, not standard C++. */
761 int recvcounts[comm_size];
762 std::vector<int> disps(comm_size, 0);
766 (action[3 + comm_size] && action[4 + comm_size]) ? decode_datatype(action[3 + comm_size]) : MPI_DEFAULT_TYPE;
767 MPI_Datatype MPI_CURRENT_TYPE2{
768 (action[3 + comm_size] && action[4 + comm_size]) ? decode_datatype(action[4 + comm_size]) : MPI_DEFAULT_TYPE};
770 void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
772 for(int i=0;i<comm_size;i++) {
773 recvcounts[i] = atoi(action[i+3]);
774 recv_sum=recv_sum+recvcounts[i];
776 void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
778 int my_proc_id = Actor::self()->getPid();
/* Ownership of trace_recvcounts is transferred to the VarCollTIData below. */
780 std::vector<int>* trace_recvcounts = new std::vector<int>(recvcounts, recvcounts + comm_size);
782 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
783 new simgrid::instr::VarCollTIData("allGatherV", -1, sendcount, nullptr, -1, trace_recvcounts,
784 encode_datatype(MPI_CURRENT_TYPE),
785 encode_datatype(MPI_CURRENT_TYPE2)));
787 Colls::allgatherv(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts, disps.data(), MPI_CURRENT_TYPE2,
790 TRACE_smpi_comm_out(my_proc_id);
791 log_timed_action (action, clock);
794 static void action_allToAllv(const char *const *action) {
795 /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
796 0 allToAllV 100 1 7 10 12 100 1 70 10 5
798 1) 100 is the size of the send buffer *sizeof(int),
799 2) 1 7 10 12 is the sendcounts array
800 3) 100*sizeof(int) is the size of the receiver buffer
801 4) 1 70 10 5 is the recvcounts array
803 double clock = smpi_process()->simulated_elapsed();
805 int comm_size = MPI_COMM_WORLD->size();
806 CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
809 int sendcounts[comm_size];
810 std::vector<int>* trace_sendcounts = new std::vector<int>;
811 int recvcounts[comm_size];
812 std::vector<int>* trace_recvcounts = new std::vector<int>;
813 std::vector<int> senddisps(comm_size, 0);
814 std::vector<int> recvdisps(comm_size, 0);
816 MPI_CURRENT_TYPE = (action[4 + 2 * comm_size] && action[5 + 2 * comm_size])
817 ? decode_datatype(action[4 + 2 * comm_size])
819 MPI_Datatype MPI_CURRENT_TYPE2{(action[4 + 2 * comm_size] && action[5 + 2 * comm_size])
820 ? decode_datatype(action[5 + 2 * comm_size])
823 int send_buf_size=parse_double(action[2]);
824 int recv_buf_size=parse_double(action[3+comm_size]);
825 int my_proc_id = Actor::self()->getPid();
826 void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* MPI_CURRENT_TYPE->size());
827 void *recvbuf = smpi_get_tmp_recvbuffer(recv_buf_size* MPI_CURRENT_TYPE2->size());
829 for(int i=0;i<comm_size;i++) {
830 sendcounts[i] = atoi(action[i+3]);
831 trace_sendcounts->push_back(sendcounts[i]);
832 send_size += sendcounts[i];
833 recvcounts[i] = atoi(action[i+4+comm_size]);
834 trace_recvcounts->push_back(recvcounts[i]);
835 recv_size += recvcounts[i];
838 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
839 new simgrid::instr::VarCollTIData("allToAllV", -1, send_size, trace_sendcounts, recv_size,
840 trace_recvcounts, encode_datatype(MPI_CURRENT_TYPE),
841 encode_datatype(MPI_CURRENT_TYPE2)));
843 Colls::alltoallv(sendbuf, sendcounts, senddisps.data(), MPI_CURRENT_TYPE,recvbuf, recvcounts, recvdisps.data(),
844 MPI_CURRENT_TYPE, MPI_COMM_WORLD);
846 TRACE_smpi_comm_out(my_proc_id);
847 log_timed_action (action, clock);
850 }} // namespace simgrid::smpi
852 /** @brief Only initialize the replay, don't do it for real */
853 void smpi_replay_init(int* argc, char*** argv)
855 simgrid::smpi::Process::init(argc, argv);
856 smpi_process()->mark_as_initialized();
857 smpi_process()->set_replaying(true);
859 int my_proc_id = Actor::self()->getPid();
860 TRACE_smpi_init(my_proc_id);
861 TRACE_smpi_computing_init(my_proc_id);
862 TRACE_smpi_comm_in(my_proc_id, "smpi_replay_run_init", new simgrid::instr::NoOpTIData("init"));
863 TRACE_smpi_comm_out(my_proc_id);
/* Map every trace action name to its handler. */
864 xbt_replay_action_register("init", simgrid::smpi::action_init);
865 xbt_replay_action_register("finalize", simgrid::smpi::action_finalize);
866 xbt_replay_action_register("comm_size", simgrid::smpi::action_comm_size);
867 xbt_replay_action_register("comm_split", simgrid::smpi::action_comm_split);
868 xbt_replay_action_register("comm_dup", simgrid::smpi::action_comm_dup);
869 xbt_replay_action_register("send", simgrid::smpi::action_send);
870 xbt_replay_action_register("Isend", simgrid::smpi::action_Isend);
871 xbt_replay_action_register("recv", simgrid::smpi::action_recv);
872 xbt_replay_action_register("Irecv", simgrid::smpi::action_Irecv);
873 xbt_replay_action_register("test", simgrid::smpi::action_test);
874 xbt_replay_action_register("wait", simgrid::smpi::action_wait);
875 xbt_replay_action_register("waitAll", simgrid::smpi::action_waitall);
876 xbt_replay_action_register("barrier", simgrid::smpi::action_barrier);
877 xbt_replay_action_register("bcast", simgrid::smpi::action_bcast);
878 xbt_replay_action_register("reduce", simgrid::smpi::action_reduce);
879 xbt_replay_action_register("allReduce", simgrid::smpi::action_allReduce);
880 xbt_replay_action_register("allToAll", simgrid::smpi::action_allToAll);
881 xbt_replay_action_register("allToAllV", simgrid::smpi::action_allToAllv);
882 xbt_replay_action_register("gather", simgrid::smpi::action_gather);
883 xbt_replay_action_register("scatter", simgrid::smpi::action_scatter);
884 xbt_replay_action_register("gatherV", simgrid::smpi::action_gatherv);
885 xbt_replay_action_register("scatterV", simgrid::smpi::action_scatterv);
886 xbt_replay_action_register("allGather", simgrid::smpi::action_allgather);
887 xbt_replay_action_register("allGatherV", simgrid::smpi::action_allgatherv);
888 xbt_replay_action_register("reduceScatter", simgrid::smpi::action_reducescatter);
889 xbt_replay_action_register("compute", simgrid::smpi::action_compute);
891 //if we have a delayed start, sleep here.
893 double value = xbt_str_parse_double((*argv)[2], "%s is not a double");
894 XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
895 smpi_execute_flops(value);
897 //UGLY: force a context switch to be sure that all MSG_processes begin initialization
898 XBT_DEBUG("Force context switch by smpi_execute_flops - Sleeping for 0.0 flops ");
899 smpi_execute_flops(0.0);
903 /** @brief actually run the replay after initialization */
904 void smpi_replay_main(int* argc, char*** argv)
906 simgrid::xbt::replay_runner(*argc, *argv);
908 /* and now, finalize everything */
909 /* One active process will stop. Decrease the counter*/
910 XBT_DEBUG("There are %zu elements in reqq[*]", get_reqq_self()->size());
/* Drain any requests left pending by the trace before finalizing. */
911 if (not get_reqq_self()->empty()) {
912 unsigned int count_requests=get_reqq_self()->size();
913 MPI_Request requests[count_requests];
914 MPI_Status status[count_requests];
917 for (auto const& req : *get_reqq_self()) {
921 simgrid::smpi::Request::waitall(count_requests, requests, status);
/* Frees the vector allocated in action_init() and registered via set_reqq_self(). */
923 delete get_reqq_self();
926 if(active_processes==0){
927 /* Last process alive speaking: end the simulated timer */
928 XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
/* Shared replay buffers are released only once, by the last process. */
929 xbt_free(sendbuffer);
930 xbt_free(recvbuffer);
933 TRACE_smpi_comm_in(Actor::self()->getPid(), "smpi_replay_run_finalize", new simgrid::instr::NoOpTIData("finalize"));
935 smpi_process()->finalize();
937 TRACE_smpi_comm_out(Actor::self()->getPid());
938 TRACE_smpi_finalize(Actor::self()->getPid());
941 /** @brief chain a replay initialization and a replay start */
942 void smpi_replay_run(int* argc, char*** argv)
944 smpi_replay_init(argc, argv);
945 smpi_replay_main(argc, argv);