1 /* Copyright (c) 2009-2017. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
7 #include "smpi_coll.hpp"
8 #include "smpi_comm.hpp"
9 #include "smpi_datatype.hpp"
10 #include "smpi_group.hpp"
11 #include "smpi_process.hpp"
12 #include "smpi_request.hpp"
13 #include "xbt/replay.hpp"
17 #include <unordered_map>
// The replayer identifies the current simulated process through its s4u actor.
20 using simgrid::s4u::Actor;
22 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
// Global replay state: communicator size read from the trace, and the number
// of processes still replaying.
24 static int communicator_size = 0;
25 static int active_processes = 0;
// Per-actor vector of outstanding asynchronous requests, keyed by actor pid
// (see get_reqq_self / set_reqq_self below).
26 static std::unordered_map<int, std::vector<MPI_Request>*> reqq;
// Datatype used when a trace line carries no explicit datatype id; set in
// action_init (MPI_DOUBLE for MPE traces, MPI_BYTE for TAU traces).
28 static MPI_Datatype MPI_DEFAULT_TYPE;
// Shared scratch buffers reused by all replayed communications; grown on
// demand, freed when the last process finishes (see smpi_replay_main).
30 static int sendbuffer_size = 0;
31 static char* sendbuffer = nullptr;
32 static int recvbuffer_size = 0;
33 static char* recvbuffer = nullptr;
// NOTE(review): class body not visible in this excerpt.
35 class ReplayActionArg {
// Log a replayed action (tokens joined with spaces) and how long it took in
// simulated time. Only does work when verbose logging is enabled.
39 static void log_timed_action (const char *const *action, double clock){
40 if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
41 char *name = xbt_str_join_array(action, " ");
// `clock` is the simulated timestamp captured before the action started.
42 XBT_VERB("%s %f", name, smpi_process()->simulated_elapsed()-clock);
47 static std::vector<MPI_Request>* get_reqq_self()
49 return reqq.at(Actor::self()->getPid());
52 static void set_reqq_self(std::vector<MPI_Request> *mpi_request)
54 reqq.insert({Actor::self()->getPid(), mpi_request});
57 //allocate a single buffer for all sends, growing it if needed
// Outside replay mode, fall back to a plain allocation owned by the caller
// (matched by smpi_free_tmp_buffer below).
58 void* smpi_get_tmp_sendbuffer(int size)
60 if (not smpi_process()->replaying())
61 return xbt_malloc(size);
62 if (sendbuffer_size<size){
// Grow the shared buffer in place; it is never shrunk during the run.
63 sendbuffer=static_cast<char*>(xbt_realloc(sendbuffer,size));
69 //allocate a single buffer for all recv
// Same scheme as smpi_get_tmp_sendbuffer, but for the receive side.
70 void* smpi_get_tmp_recvbuffer(int size){
71 if (not smpi_process()->replaying())
72 return xbt_malloc(size);
73 if (recvbuffer_size<size){
// Grow the shared buffer in place; it is never shrunk during the run.
74 recvbuffer=static_cast<char*>(xbt_realloc(recvbuffer,size));
// Release buf only when it came from the non-replay malloc path above; in
// replay mode buffers are shared globals freed at the end of the run.
80 void smpi_free_tmp_buffer(void* buf){
81 if (not smpi_process()->replaying())
// Parse a trace token as a double, raising an exception (aborting the replay)
// when the token is not a valid double.
86 static double parse_double(const char *string)
89 double value = strtod(string, &endptr);
91 THROWF(unknown_error, 0, "%s is not a double", string);
96 //TODO: this logic should be moved inside the datatype class, to support all predefined types and get rid of is_replayable.
// Map a numeric datatype id from the trace to an MPI datatype; unknown ids
// fall back to MPI_DEFAULT_TYPE. (Switch cases elided in this excerpt.)
97 static MPI_Datatype decode_datatype(const char *const action)
99 switch(atoi(action)) {
122 return MPI_DEFAULT_TYPE;
// Inverse of decode_datatype: map an MPI datatype back to its trace id string.
// (Return statements elided in this excerpt.)
127 const char* encode_datatype(MPI_Datatype datatype)
129 if (datatype==MPI_BYTE)
131 if(datatype==MPI_DOUBLE)
133 if(datatype==MPI_INT)
135 if(datatype==MPI_CHAR)
137 if(datatype==MPI_SHORT)
139 if(datatype==MPI_LONG)
141 if(datatype==MPI_FLOAT)
143 // default - not implemented.
144 // do not warn here as we pass in this function even for other trace formats
// Validate the token count of a trace line: the first two tokens are
// process_id and action name; the action then needs `mandatory` arguments and
// accepts up to `optional` extra ones. Throws arg_error on mismatch.
// (Comments cannot go inside the macro: they would break the `\` continuations.)
148 #define CHECK_ACTION_PARAMS(action, mandatory, optional) {\
150 while(action[i]!=nullptr)\
153 THROWF(arg_error, 0, "%s replay failed.\n" \
154 "%d items were given on the line. First two should be process_id and action. " \
155 "This action needs after them %d mandatory arguments, and accepts %d optional ones. \n" \
156 "Please contact the Simgrid team if support is needed", __FUNCTION__, i, mandatory, optional);\
// Handler for the "init" trace action: pick the default datatype, start the
// simulated timer, and register this actor's pending-request vector.
162 static void action_init(const char *const *action)
164 XBT_DEBUG("Initialize the counters");
165 CHECK_ACTION_PARAMS(action, 0, 1)
// The optional argument selects the trace flavor: MPE uses doubles,
// TAU uses bytes. (Branching elided in this excerpt.)
167 MPI_DEFAULT_TYPE = MPI_DOUBLE; // default MPE datatype
169 MPI_DEFAULT_TYPE = MPI_BYTE; // default TAU datatype
171 /* start a simulated timer */
172 smpi_process()->simulated_start();
173 /*initialize the number of active processes */
174 active_processes = smpi_process_count();
176 set_reqq_self(new std::vector<MPI_Request>);
// Handler for the "finalize" trace action. (Body elided in this excerpt.)
179 static void action_finalize(const char *const *action)
// Handler for "comm_size": record the communicator size stated in the trace.
184 static void action_comm_size(const char *const *action)
186 communicator_size = parse_double(action[2]);
187 log_timed_action (action, smpi_process()->simulated_elapsed());
// Handler for "comm_split": only logged, no simulated work in this excerpt.
190 static void action_comm_split(const char *const *action)
192 log_timed_action (action, smpi_process()->simulated_elapsed());
// Handler for "comm_dup": only logged, no simulated work in this excerpt.
195 static void action_comm_dup(const char *const *action)
197 log_timed_action (action, smpi_process()->simulated_elapsed());
// Handler for "compute": burn the traced number of flops in simulation,
// bracketed by tracing calls.
200 static void action_compute(const char *const *action)
202 CHECK_ACTION_PARAMS(action, 1, 0)
203 double clock = smpi_process()->simulated_elapsed();
204 double flops= parse_double(action[2]);
205 int my_proc_id = Actor::self()->getPid();
207 TRACE_smpi_computing_in(my_proc_id, flops);
208 smpi_execute_flops(flops);
209 TRACE_smpi_computing_out(my_proc_id);
211 log_timed_action (action, clock);
// Handler for "send": blocking point-to-point send of `size` elements to rank
// `to`, with a dummy payload (nullptr) since replay only simulates timing.
214 static void action_send(const char *const *action)
216 CHECK_ACTION_PARAMS(action, 2, 1)
217 int to = atoi(action[2]);
218 double size=parse_double(action[3]);
219 double clock = smpi_process()->simulated_elapsed();
// Optional 4th argument overrides the default datatype.
221 MPI_Datatype MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
223 int my_proc_id = Actor::self()->getPid();
224 int dst_traced = MPI_COMM_WORLD->group()->actor(to)->getPid();
226 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
227 new simgrid::instr::Pt2PtTIData("send", to, size, encode_datatype(MPI_CURRENT_TYPE)));
228 if (not TRACE_smpi_view_internals())
229 TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, size * MPI_CURRENT_TYPE->size());
231 Request::send(nullptr, size, MPI_CURRENT_TYPE, to , 0, MPI_COMM_WORLD);
233 TRACE_smpi_comm_out(my_proc_id);
235 log_timed_action(action, clock);
// Handler for "Isend": non-blocking send; the resulting request is queued for
// a later wait/test/waitAll action.
238 static void action_Isend(const char *const *action)
240 CHECK_ACTION_PARAMS(action, 2, 1)
241 int to = atoi(action[2]);
242 double size=parse_double(action[3]);
243 double clock = smpi_process()->simulated_elapsed();
245 MPI_Datatype MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
247 int my_proc_id = Actor::self()->getPid();
248 int dst_traced = MPI_COMM_WORLD->group()->actor(to)->getPid();
249 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
250 new simgrid::instr::Pt2PtTIData("Isend", to, size, encode_datatype(MPI_CURRENT_TYPE)));
251 if (not TRACE_smpi_view_internals())
252 TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, size * MPI_CURRENT_TYPE->size());
254 MPI_Request request = Request::isend(nullptr, size, MPI_CURRENT_TYPE, to, 0, MPI_COMM_WORLD);
256 TRACE_smpi_comm_out(my_proc_id);
// Keep the request so a subsequent wait/test can complete it.
258 get_reqq_self()->push_back(request);
260 log_timed_action (action, clock);
// Handler for "recv": blocking receive from rank `from` into a dummy buffer.
263 static void action_recv(const char *const *action) {
264 CHECK_ACTION_PARAMS(action, 2, 1)
265 int from = atoi(action[2]);
266 double size=parse_double(action[3]);
267 double clock = smpi_process()->simulated_elapsed();
270 MPI_Datatype MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
272 int my_proc_id = Actor::self()->getPid();
273 int src_traced = MPI_COMM_WORLD->group()->actor(from)->getPid();
275 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
276 new simgrid::instr::Pt2PtTIData("recv", from, size, encode_datatype(MPI_CURRENT_TYPE)));
278 //unknown size from the receiver point of view
// NOTE(review): probe is guarded by a size check in the full source — confirm.
280 Request::probe(from, 0, MPI_COMM_WORLD, &status);
284 Request::recv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD, &status);
286 TRACE_smpi_comm_out(my_proc_id);
287 if (not TRACE_smpi_view_internals()) {
288 TRACE_smpi_recv(src_traced, my_proc_id, 0);
291 log_timed_action (action, clock);
// Handler for "Irecv": non-blocking receive; the request is queued for a
// later wait/test/waitAll action.
294 static void action_Irecv(const char *const *action)
296 CHECK_ACTION_PARAMS(action, 2, 1)
297 int from = atoi(action[2]);
298 double size=parse_double(action[3]);
299 double clock = smpi_process()->simulated_elapsed();
301 MPI_Datatype MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
303 int my_proc_id = Actor::self()->getPid();
304 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
305 new simgrid::instr::Pt2PtTIData("Irecv", from, size, encode_datatype(MPI_CURRENT_TYPE)));
307 //unknow size from the receiver pov
309 Request::probe(from, 0, MPI_COMM_WORLD, &status);
313 MPI_Request request = Request::irecv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD);
315 TRACE_smpi_comm_out(my_proc_id);
316 get_reqq_self()->push_back(request);
318 log_timed_action (action, clock);
// Handler for "test": test the most recent queued request. The request is
// popped, tested, and pushed back (possibly as nullptr once completed) so a
// subsequent wait can still find it.
321 static void action_test(const char* const* action)
323 CHECK_ACTION_PARAMS(action, 0, 0)
324 double clock = smpi_process()->simulated_elapsed();
327 MPI_Request request = get_reqq_self()->back();
328 get_reqq_self()->pop_back();
329 //if request is null here, this may mean that a previous test has succeeded
330 //Different times in traced application and replayed version may lead to this
331 //In this case, ignore the extra calls.
332 if(request!=nullptr){
333 int my_proc_id = Actor::self()->getPid();
334 TRACE_smpi_testing_in(my_proc_id);
336 int flag = Request::test(&request, &status);
338 XBT_DEBUG("MPI_Test result: %d", flag);
339 /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now nullptr.*/
340 get_reqq_self()->push_back(request);
342 TRACE_smpi_testing_out(my_proc_id);
344 log_timed_action (action, clock);
// Handler for "wait": wait on the most recent queued request. A nullptr
// request means a prior test already completed it, so the wait is a no-op.
347 static void action_wait(const char *const *action){
348 CHECK_ACTION_PARAMS(action, 0, 0)
349 double clock = smpi_process()->simulated_elapsed();
352 xbt_assert(get_reqq_self()->size(), "action wait not preceded by any irecv or isend: %s",
353 xbt_str_join_array(action," "));
354 MPI_Request request = get_reqq_self()->back();
355 get_reqq_self()->pop_back();
357 if (request==nullptr){
358 /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just return.*/
// Capture tracing metadata before the wait, since waiting may free the request.
362 int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
364 MPI_Group group = request->comm()->group();
365 int src_traced = group->rank(request->src());
366 int dst_traced = group->rank(request->dst());
367 int is_wait_for_receive = (request->flags() & RECV);
368 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::NoOpTIData("wait"));
370 Request::wait(&request, &status);
372 TRACE_smpi_comm_out(rank);
373 if (is_wait_for_receive)
374 TRACE_smpi_recv(src_traced, dst_traced, 0);
375 log_timed_action (action, clock);
// Handler for "waitAll": wait for every queued request at once, recording
// per-request src/dst so the receives can be traced after completion.
378 static void action_waitall(const char *const *action){
379 CHECK_ACTION_PARAMS(action, 0, 0)
380 double clock = smpi_process()->simulated_elapsed();
381 const unsigned int count_requests = get_reqq_self()->size();
383 if (count_requests>0) {
384 MPI_Status status[count_requests];
386 int my_proc_id_traced = Actor::self()->getPid();
387 TRACE_smpi_comm_in(my_proc_id_traced, __FUNCTION__,
388 new simgrid::instr::Pt2PtTIData("waitAll", -1, count_requests, ""));
// NOTE(review): recvs_snd[i] is read below for non-RECV slots but no
// initialization (e.g. to the -100 sentinel) is visible in this excerpt —
// confirm the else branch exists in the full source.
389 int recvs_snd[count_requests];
390 int recvs_rcv[count_requests];
391 for (unsigned int i = 0; i < count_requests; i++) {
392 const auto& req = (*get_reqq_self())[i];
393 if (req && (req->flags() & RECV)) {
394 recvs_snd[i] = req->src();
395 recvs_rcv[i] = req->dst();
399 Request::waitall(count_requests, &(*get_reqq_self())[0], status);
401 for (unsigned i = 0; i < count_requests; i++) {
// -100 is the sentinel for "not a receive" slots.
402 if (recvs_snd[i]!=-100)
403 TRACE_smpi_recv(recvs_snd[i], recvs_rcv[i],0);
405 TRACE_smpi_comm_out(my_proc_id_traced);
407 log_timed_action (action, clock);
// Handler for "barrier": simulated MPI_Barrier over MPI_COMM_WORLD.
410 static void action_barrier(const char *const *action){
411 double clock = smpi_process()->simulated_elapsed();
412 int my_proc_id = Actor::self()->getPid();
413 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::NoOpTIData("barrier"));
415 Colls::barrier(MPI_COMM_WORLD);
417 TRACE_smpi_comm_out(my_proc_id);
418 log_timed_action (action, clock);
// Handler for "bcast": broadcast `size` elements from optional root
// (default 0), with optional datatype as 4th argument.
421 static void action_bcast(const char *const *action)
423 CHECK_ACTION_PARAMS(action, 1, 2)
424 double size = parse_double(action[2]);
425 double clock = smpi_process()->simulated_elapsed();
426 int root = (action[3]) ? atoi(action[3]) : 0;
427 /* Initialize MPI_CURRENT_TYPE in order to decrease the number of the checks */
428 MPI_Datatype MPI_CURRENT_TYPE = (action[3] && action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
430 int my_proc_id = Actor::self()->getPid();
431 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
432 new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(root)->getPid(), -1.0, size,
433 -1, encode_datatype(MPI_CURRENT_TYPE), ""));
435 void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
437 Colls::bcast(sendbuf, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
439 TRACE_smpi_comm_out(my_proc_id);
440 log_timed_action (action, clock);
// Handler for "reduce": simulated reduction of `comm_size` elements to root,
// plus `comp_size` flops of local computation. The trace does not record the
// reduction operator, hence MPI_OP_NULL.
443 static void action_reduce(const char *const *action)
445 CHECK_ACTION_PARAMS(action, 2, 2)
446 double comm_size = parse_double(action[2]);
447 double comp_size = parse_double(action[3]);
448 double clock = smpi_process()->simulated_elapsed();
449 int root = (action[4]) ? atoi(action[4]) : 0;
451 MPI_Datatype MPI_CURRENT_TYPE = (action[4] && action[5]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE;
453 int my_proc_id = Actor::self()->getPid();
454 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
455 new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(root)->getPid(), comp_size,
456 comm_size, -1, encode_datatype(MPI_CURRENT_TYPE), ""));
458 void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
459 void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
460 Colls::reduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
461 smpi_execute_flops(comp_size);
463 TRACE_smpi_comm_out(my_proc_id);
464 log_timed_action (action, clock);
// Handler for "allReduce": like action_reduce but with an allreduce, so every
// rank gets the result; also burns `comp_size` flops locally.
467 static void action_allReduce(const char *const *action) {
468 CHECK_ACTION_PARAMS(action, 2, 1)
469 double comm_size = parse_double(action[2]);
470 double comp_size = parse_double(action[3]);
472 MPI_Datatype MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
474 double clock = smpi_process()->simulated_elapsed();
475 int my_proc_id = Actor::self()->getPid();
476 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::CollTIData("allReduce", -1, comp_size, comm_size, -1,
477 encode_datatype(MPI_CURRENT_TYPE), ""));
479 void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
480 void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
481 Colls::allreduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
482 smpi_execute_flops(comp_size);
484 TRACE_smpi_comm_out(my_proc_id);
485 log_timed_action (action, clock);
// Handler for "allToAll": all-to-all exchange of send_size/recv_size elements
// per peer. The two optional datatypes must be given together.
488 static void action_allToAll(const char *const *action) {
489 CHECK_ACTION_PARAMS(action, 2, 2) //two mandatory (send and recv volumes) and two optional (corresponding datatypes)
490 double clock = smpi_process()->simulated_elapsed();
491 int comm_size = MPI_COMM_WORLD->size();
492 int send_size = parse_double(action[2]);
493 int recv_size = parse_double(action[3]);
494 MPI_Datatype MPI_CURRENT_TYPE = (action[4] && action[5]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
495 MPI_Datatype MPI_CURRENT_TYPE2{(action[4] && action[5]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE};
497 void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* MPI_CURRENT_TYPE->size());
498 void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
500 int my_proc_id = Actor::self()->getPid();
501 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
502 new simgrid::instr::CollTIData("allToAll", -1, -1.0, send_size, recv_size,
503 encode_datatype(MPI_CURRENT_TYPE),
504 encode_datatype(MPI_CURRENT_TYPE2)));
506 Colls::alltoall(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
508 TRACE_smpi_comm_out(my_proc_id);
509 log_timed_action (action, clock);
// Handler for "gather": gather send_size elements from each rank at root.
// Only root allocates the receive buffer.
512 static void action_gather(const char *const *action) {
513 /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
516 1) 68 is the sendcounts
517 2) 68 is the recvcounts
518 3) 0 is the root node
519 4) 0 is the send datatype id, see decode_datatype()
520 5) 0 is the recv datatype id, see decode_datatype()
522 CHECK_ACTION_PARAMS(action, 2, 3)
523 double clock = smpi_process()->simulated_elapsed();
524 int comm_size = MPI_COMM_WORLD->size();
525 int send_size = parse_double(action[2]);
526 int recv_size = parse_double(action[3]);
527 MPI_Datatype MPI_CURRENT_TYPE = (action[5] && action[6]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE;
528 MPI_Datatype MPI_CURRENT_TYPE2{(action[5] && action[6]) ? decode_datatype(action[6]) : MPI_DEFAULT_TYPE};
530 void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
531 void *recv = nullptr;
532 int root = (action[4]) ? atoi(action[4]) : 0;
533 int rank = MPI_COMM_WORLD->rank();
// Only the root needs a receive buffer (guard elided in this excerpt).
536 recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
538 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
539 encode_datatype(MPI_CURRENT_TYPE),
540 encode_datatype(MPI_CURRENT_TYPE2)));
542 Colls::gather(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
544 TRACE_smpi_comm_out(Actor::self()->getPid());
545 log_timed_action (action, clock);
548 static void action_scatter(const char* const* action)
550 /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
553 1) 68 is the sendcounts
554 2) 68 is the recvcounts
555 3) 0 is the root node
556 4) 0 is the send datatype id, see decode_datatype()
557 5) 0 is the recv datatype id, see decode_datatype()
559 CHECK_ACTION_PARAMS(action, 2, 3)
560 double clock = smpi_process()->simulated_elapsed();
561 int comm_size = MPI_COMM_WORLD->size();
562 int send_size = parse_double(action[2]);
563 int recv_size = parse_double(action[3]);
564 MPI_Datatype MPI_CURRENT_TYPE = (action[5] && action[6]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE;
565 MPI_Datatype MPI_CURRENT_TYPE2{(action[5] && action[6]) ? decode_datatype(action[6]) : MPI_DEFAULT_TYPE};
567 void* send = smpi_get_tmp_sendbuffer(send_size * MPI_CURRENT_TYPE->size());
568 void* recv = nullptr;
569 int root = (action[4]) ? atoi(action[4]) : 0;
570 int rank = MPI_COMM_WORLD->rank();
573 recv = smpi_get_tmp_recvbuffer(recv_size * comm_size * MPI_CURRENT_TYPE2->size());
575 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
576 encode_datatype(MPI_CURRENT_TYPE),
577 encode_datatype(MPI_CURRENT_TYPE2)));
579 Colls::scatter(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
581 TRACE_smpi_comm_out(Actor::self()->getPid());
582 log_timed_action(action, clock);
// Handler for "gatherV": gather with per-rank receive counts read from the
// trace; only root allocates the receive buffer (sized by the counts' sum).
585 static void action_gatherv(const char *const *action) {
586 /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
587 0 gather 68 68 10 10 10 0 0 0
589 1) 68 is the sendcount
590 2) 68 10 10 10 is the recvcounts
591 3) 0 is the root node
592 4) 0 is the send datatype id, see decode_datatype()
593 5) 0 is the recv datatype id, see decode_datatype()
595 double clock = smpi_process()->simulated_elapsed();
596 int comm_size = MPI_COMM_WORLD->size();
597 CHECK_ACTION_PARAMS(action, comm_size+1, 2)
598 int send_size = parse_double(action[2]);
599 std::vector<int> disps(comm_size, 0);
// shared_ptr because the tracing TIData keeps a reference to the counts.
600 std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
602 MPI_Datatype MPI_CURRENT_TYPE =
603 (action[4 + comm_size] && action[5 + comm_size]) ? decode_datatype(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
604 MPI_Datatype MPI_CURRENT_TYPE2{
605 (action[4 + comm_size] && action[5 + comm_size]) ? decode_datatype(action[5 + comm_size]) : MPI_DEFAULT_TYPE};
607 void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
608 void *recv = nullptr;
609 for(int i=0;i<comm_size;i++) {
610 (*recvcounts)[i] = atoi(action[i + 3]);
612 int recv_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
614 int root = (action[3 + comm_size]) ? atoi(action[3 + comm_size]) : 0;
615 int rank = MPI_COMM_WORLD->rank();
// Only the root needs a receive buffer (guard elided in this excerpt).
618 recv = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
620 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData(
621 "gatherV", root, send_size, nullptr, -1, recvcounts,
622 encode_datatype(MPI_CURRENT_TYPE), encode_datatype(MPI_CURRENT_TYPE2)));
624 Colls::gatherv(send, send_size, MPI_CURRENT_TYPE, recv, recvcounts->data(), disps.data(), MPI_CURRENT_TYPE2, root,
627 TRACE_smpi_comm_out(Actor::self()->getPid());
628 log_timed_action (action, clock);
631 static void action_scatterv(const char* const* action)
633 /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
634 0 gather 68 10 10 10 68 0 0 0
636 1) 68 10 10 10 is the sendcounts
637 2) 68 is the recvcount
638 3) 0 is the root node
639 4) 0 is the send datatype id, see decode_datatype()
640 5) 0 is the recv datatype id, see decode_datatype()
642 double clock = smpi_process()->simulated_elapsed();
643 int comm_size = MPI_COMM_WORLD->size();
644 CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
645 int recv_size = parse_double(action[2 + comm_size]);
646 std::vector<int> disps(comm_size, 0);
647 std::shared_ptr<std::vector<int>> sendcounts(new std::vector<int>(comm_size));
649 MPI_Datatype MPI_CURRENT_TYPE =
650 (action[4 + comm_size] && action[5 + comm_size]) ? decode_datatype(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
651 MPI_Datatype MPI_CURRENT_TYPE2{
652 (action[4 + comm_size] && action[5 + comm_size]) ? decode_datatype(action[5 + comm_size]) : MPI_DEFAULT_TYPE};
654 void* send = nullptr;
655 void* recv = smpi_get_tmp_recvbuffer(recv_size * MPI_CURRENT_TYPE->size());
656 for (int i = 0; i < comm_size; i++) {
657 (*sendcounts)[i] = atoi(action[i + 2]);
659 int send_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
661 int root = (action[3 + comm_size]) ? atoi(action[3 + comm_size]) : 0;
662 int rank = MPI_COMM_WORLD->rank();
665 send = smpi_get_tmp_sendbuffer(send_sum * MPI_CURRENT_TYPE2->size());
667 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData("gatherV", root, -1, sendcounts, recv_size,
668 nullptr, encode_datatype(MPI_CURRENT_TYPE),
669 encode_datatype(MPI_CURRENT_TYPE2)));
671 Colls::scatterv(send, sendcounts->data(), disps.data(), MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root,
674 TRACE_smpi_comm_out(Actor::self()->getPid());
675 log_timed_action(action, clock);
// Handler for "reduceScatter": reduce-scatter with per-rank counts from the
// trace, followed by `comp_size` flops of local computation.
678 static void action_reducescatter(const char *const *action) {
679 /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
680 0 reduceScatter 275427 275427 275427 204020 11346849 0
682 1) The first four values after the name of the action declare the recvcounts array
683 2) The value 11346849 is the amount of instructions
684 3) The last value corresponds to the datatype, see decode_datatype().
686 double clock = smpi_process()->simulated_elapsed();
687 int comm_size = MPI_COMM_WORLD->size();
688 CHECK_ACTION_PARAMS(action, comm_size+1, 1)
689 int comp_size = parse_double(action[2+comm_size]);
690 int my_proc_id = Actor::self()->getPid();
691 std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>);
692 MPI_Datatype MPI_CURRENT_TYPE = (action[3 + comm_size]) ? decode_datatype(action[3 + comm_size]) : MPI_DEFAULT_TYPE;
694 for(int i=0;i<comm_size;i++) {
695 recvcounts->push_back(atoi(action[i + 2]));
// Total element count across all ranks sizes both scratch buffers.
697 int size{std::accumulate(recvcounts->begin(), recvcounts->end(), 0)};
699 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
700 new simgrid::instr::VarCollTIData("reduceScatter", -1, 0, nullptr, -1, recvcounts,
701 std::to_string(comp_size), /* ugly hack to print comp_size */
702 encode_datatype(MPI_CURRENT_TYPE)));
704 void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
705 void *recvbuf = smpi_get_tmp_recvbuffer(size* MPI_CURRENT_TYPE->size());
707 Colls::reduce_scatter(sendbuf, recvbuf, recvcounts->data(), MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
708 smpi_execute_flops(comp_size);
710 TRACE_smpi_comm_out(my_proc_id);
711 log_timed_action (action, clock);
// Handler for "allGather": every rank contributes sendcount elements and
// receives recvcount elements from each peer.
714 static void action_allgather(const char *const *action) {
715 /* The structure of the allgather action for the rank 0 (total 4 processes) is the following:
716 0 allGather 275427 275427
718 1) 275427 is the sendcount
719 2) 275427 is the recvcount
720 3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().
722 double clock = smpi_process()->simulated_elapsed();
724 CHECK_ACTION_PARAMS(action, 2, 2)
725 int sendcount=atoi(action[2]);
726 int recvcount=atoi(action[3]);
728 MPI_Datatype MPI_CURRENT_TYPE = (action[4] && action[5]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
729 MPI_Datatype MPI_CURRENT_TYPE2{(action[4] && action[5]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE};
731 void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
732 void *recvbuf = smpi_get_tmp_recvbuffer(recvcount* MPI_CURRENT_TYPE2->size());
734 int my_proc_id = Actor::self()->getPid();
736 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
737 new simgrid::instr::CollTIData("allGather", -1, -1.0, sendcount, recvcount,
738 encode_datatype(MPI_CURRENT_TYPE),
739 encode_datatype(MPI_CURRENT_TYPE2)));
741 Colls::allgather(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcount, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
743 TRACE_smpi_comm_out(my_proc_id);
744 log_timed_action (action, clock);
// Handler for "allGatherV": allgather with per-rank receive counts read from
// the trace; the receive buffer is sized by the counts' sum.
747 static void action_allgatherv(const char *const *action) {
748 /* The structure of the allgatherv action for the rank 0 (total 4 processes) is the following:
749 0 allGatherV 275427 275427 275427 275427 204020
751 1) 275427 is the sendcount
752 2) The next four elements declare the recvcounts array
753 3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().
755 double clock = smpi_process()->simulated_elapsed();
757 int comm_size = MPI_COMM_WORLD->size();
758 CHECK_ACTION_PARAMS(action, comm_size+1, 2)
759 int sendcount=atoi(action[2]);
// shared_ptr because the tracing TIData keeps a reference to the counts.
760 std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
761 std::vector<int> disps(comm_size, 0);
763 MPI_Datatype MPI_CURRENT_TYPE =
764 (action[3 + comm_size] && action[4 + comm_size]) ? decode_datatype(action[3 + comm_size]) : MPI_DEFAULT_TYPE;
765 MPI_Datatype MPI_CURRENT_TYPE2{
766 (action[3 + comm_size] && action[4 + comm_size]) ? decode_datatype(action[4 + comm_size]) : MPI_DEFAULT_TYPE};
768 void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
770 for(int i=0;i<comm_size;i++) {
771 (*recvcounts)[i] = atoi(action[i + 3]);
773 int recv_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
774 void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
776 int my_proc_id = Actor::self()->getPid();
778 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
779 new simgrid::instr::VarCollTIData("allGatherV", -1, sendcount, nullptr, -1, recvcounts,
780 encode_datatype(MPI_CURRENT_TYPE),
781 encode_datatype(MPI_CURRENT_TYPE2)));
783 Colls::allgatherv(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts->data(), disps.data(), MPI_CURRENT_TYPE2,
786 TRACE_smpi_comm_out(my_proc_id);
787 log_timed_action (action, clock);
790 static void action_allToAllv(const char *const *action) {
791 /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
792 0 allToAllV 100 1 7 10 12 100 1 70 10 5
794 1) 100 is the size of the send buffer *sizeof(int),
795 2) 1 7 10 12 is the sendcounts array
796 3) 100*sizeof(int) is the size of the receiver buffer
797 4) 1 70 10 5 is the recvcounts array
799 double clock = smpi_process()->simulated_elapsed();
801 int comm_size = MPI_COMM_WORLD->size();
802 CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
803 std::shared_ptr<std::vector<int>> sendcounts(new std::vector<int>(comm_size));
804 std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
805 std::vector<int> senddisps(comm_size, 0);
806 std::vector<int> recvdisps(comm_size, 0);
808 MPI_Datatype MPI_CURRENT_TYPE = (action[4 + 2 * comm_size] && action[5 + 2 * comm_size])
809 ? decode_datatype(action[4 + 2 * comm_size])
811 MPI_Datatype MPI_CURRENT_TYPE2{(action[4 + 2 * comm_size] && action[5 + 2 * comm_size])
812 ? decode_datatype(action[5 + 2 * comm_size])
815 int send_buf_size=parse_double(action[2]);
816 int recv_buf_size=parse_double(action[3+comm_size]);
817 int my_proc_id = Actor::self()->getPid();
818 void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* MPI_CURRENT_TYPE->size());
819 void *recvbuf = smpi_get_tmp_recvbuffer(recv_buf_size* MPI_CURRENT_TYPE2->size());
821 for(int i=0;i<comm_size;i++) {
822 (*sendcounts)[i] = atoi(action[3 + i]);
823 (*recvcounts)[i] = atoi(action[4 + comm_size + i]);
825 int send_size = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
826 int recv_size = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
828 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
829 new simgrid::instr::VarCollTIData("allToAllV", -1, send_size, sendcounts, recv_size, recvcounts,
830 encode_datatype(MPI_CURRENT_TYPE),
831 encode_datatype(MPI_CURRENT_TYPE2)));
833 Colls::alltoallv(sendbuf, sendcounts->data(), senddisps.data(), MPI_CURRENT_TYPE, recvbuf, recvcounts->data(),
834 recvdisps.data(), MPI_CURRENT_TYPE, MPI_COMM_WORLD);
836 TRACE_smpi_comm_out(my_proc_id);
837 log_timed_action (action, clock);
840 }} // namespace simgrid::smpi
842 /** @brief Only initialize the replay, don't do it for real */
// Marks the process as replaying, sets up tracing, registers one handler per
// trace action name, and optionally sleeps for a delayed start.
843 void smpi_replay_init(int* argc, char*** argv)
845 simgrid::smpi::Process::init(argc, argv);
846 smpi_process()->mark_as_initialized();
847 smpi_process()->set_replaying(true);
849 int my_proc_id = Actor::self()->getPid();
850 TRACE_smpi_init(my_proc_id);
851 TRACE_smpi_computing_init(my_proc_id);
852 TRACE_smpi_comm_in(my_proc_id, "smpi_replay_run_init", new simgrid::instr::NoOpTIData("init"));
853 TRACE_smpi_comm_out(my_proc_id);
// Map each trace action keyword to its handler above.
854 xbt_replay_action_register("init", simgrid::smpi::action_init);
855 xbt_replay_action_register("finalize", simgrid::smpi::action_finalize);
856 xbt_replay_action_register("comm_size", simgrid::smpi::action_comm_size);
857 xbt_replay_action_register("comm_split", simgrid::smpi::action_comm_split);
858 xbt_replay_action_register("comm_dup", simgrid::smpi::action_comm_dup);
859 xbt_replay_action_register("send", simgrid::smpi::action_send);
860 xbt_replay_action_register("Isend", simgrid::smpi::action_Isend);
861 xbt_replay_action_register("recv", simgrid::smpi::action_recv);
862 xbt_replay_action_register("Irecv", simgrid::smpi::action_Irecv);
863 xbt_replay_action_register("test", simgrid::smpi::action_test);
864 xbt_replay_action_register("wait", simgrid::smpi::action_wait);
865 xbt_replay_action_register("waitAll", simgrid::smpi::action_waitall);
866 xbt_replay_action_register("barrier", simgrid::smpi::action_barrier);
867 xbt_replay_action_register("bcast", simgrid::smpi::action_bcast);
868 xbt_replay_action_register("reduce", simgrid::smpi::action_reduce);
869 xbt_replay_action_register("allReduce", simgrid::smpi::action_allReduce);
870 xbt_replay_action_register("allToAll", simgrid::smpi::action_allToAll);
871 xbt_replay_action_register("allToAllV", simgrid::smpi::action_allToAllv);
872 xbt_replay_action_register("gather", simgrid::smpi::action_gather);
873 xbt_replay_action_register("scatter", simgrid::smpi::action_scatter);
874 xbt_replay_action_register("gatherV", simgrid::smpi::action_gatherv);
875 xbt_replay_action_register("scatterV", simgrid::smpi::action_scatterv);
876 xbt_replay_action_register("allGather", simgrid::smpi::action_allgather);
877 xbt_replay_action_register("allGatherV", simgrid::smpi::action_allgatherv);
878 xbt_replay_action_register("reduceScatter", simgrid::smpi::action_reducescatter);
879 xbt_replay_action_register("compute", simgrid::smpi::action_compute);
881 //if we have a delayed start, sleep here.
// (Guard checking the extra argv entry elided in this excerpt.)
883 double value = xbt_str_parse_double((*argv)[2], "%s is not a double");
884 XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
885 smpi_execute_flops(value);
887 //UGLY: force a context switch to be sure that all MSG_processes begin initialization
888 XBT_DEBUG("Force context switch by smpi_execute_flops - Sleeping for 0.0 flops ");
889 smpi_execute_flops(0.0);
893 /** @brief actually run the replay after initialization */
// Runs the replay loop, then drains any leftover requests, finalizes the
// process, and — when this is the last active process — reports the simulated
// time and frees the shared scratch buffers.
894 void smpi_replay_main(int* argc, char*** argv)
896 simgrid::xbt::replay_runner(*argc, *argv);
898 /* and now, finalize everything */
899 /* One active process will stop. Decrease the counter*/
900 XBT_DEBUG("There are %zu elements in reqq[*]", get_reqq_self()->size());
901 if (not get_reqq_self()->empty()) {
902 unsigned int count_requests=get_reqq_self()->size();
903 MPI_Request requests[count_requests];
904 MPI_Status status[count_requests];
907 for (auto const& req : *get_reqq_self()) {
// Wait for every request the trace left pending before finalizing.
911 simgrid::smpi::Request::waitall(count_requests, requests, status);
913 delete get_reqq_self();
916 if(active_processes==0){
917 /* Last process alive speaking: end the simulated timer */
918 XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
919 xbt_free(sendbuffer);
920 xbt_free(recvbuffer);
923 TRACE_smpi_comm_in(Actor::self()->getPid(), "smpi_replay_run_finalize", new simgrid::instr::NoOpTIData("finalize"));
925 smpi_process()->finalize();
927 TRACE_smpi_comm_out(Actor::self()->getPid());
928 TRACE_smpi_finalize(Actor::self()->getPid());
931 /** @brief chain a replay initialization and a replay start */
932 void smpi_replay_run(int* argc, char*** argv)
934 smpi_replay_init(argc, argv);
935 smpi_replay_main(argc, argv);