/* Copyright (c) 2009-2017. The SimGrid Team. All rights reserved. */

/* This program is free software; you can redistribute it and/or modify it
 * under the terms of the license (GNU LGPL) which comes with this package. */
7 #include "smpi_coll.hpp"
8 #include "smpi_comm.hpp"
9 #include "smpi_datatype.hpp"
10 #include "smpi_group.hpp"
11 #include "smpi_process.hpp"
12 #include "smpi_request.hpp"
13 #include "xbt/replay.hpp"
17 #include <unordered_map>
20 using simgrid::s4u::Actor;
22 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
24 static int communicator_size = 0;
25 static int active_processes = 0;
26 static std::unordered_map<int, std::vector<MPI_Request>*> reqq;
28 static MPI_Datatype MPI_DEFAULT_TYPE;
30 static int sendbuffer_size = 0;
31 static char* sendbuffer = nullptr;
32 static int recvbuffer_size = 0;
33 static char* recvbuffer = nullptr;
35 class ReplayActionArg {
39 static void log_timed_action (const char *const *action, double clock){
40 if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
41 char *name = xbt_str_join_array(action, " ");
42 XBT_VERB("%s %f", name, smpi_process()->simulated_elapsed()-clock);
47 static std::vector<MPI_Request>* get_reqq_self()
49 return reqq.at(Actor::self()->getPid());
52 static void set_reqq_self(std::vector<MPI_Request> *mpi_request)
54 reqq.insert({Actor::self()->getPid(), mpi_request});
57 //allocate a single buffer for all sends, growing it if needed
58 void* smpi_get_tmp_sendbuffer(int size)
60 if (not smpi_process()->replaying())
61 return xbt_malloc(size);
62 if (sendbuffer_size<size){
63 sendbuffer=static_cast<char*>(xbt_realloc(sendbuffer,size));
69 //allocate a single buffer for all recv
70 void* smpi_get_tmp_recvbuffer(int size){
71 if (not smpi_process()->replaying())
72 return xbt_malloc(size);
73 if (recvbuffer_size<size){
74 recvbuffer=static_cast<char*>(xbt_realloc(recvbuffer,size));
80 void smpi_free_tmp_buffer(void* buf){
81 if (not smpi_process()->replaying())
86 static double parse_double(const char *string)
89 double value = strtod(string, &endptr);
91 THROWF(unknown_error, 0, "%s is not a double", string);
96 //TODO: this logic should be moved inside the datatype class, to support all predefined types and get rid of is_replayable.
97 static MPI_Datatype decode_datatype(const char *const action)
99 return simgrid::smpi::Datatype::decode(action);
102 const char* encode_datatype(MPI_Datatype datatype)
104 if (datatype == nullptr) /* this actually does seem to be possible, had this in the scatter2 test */
107 return datatype->encode();
// Validate the argument count of a trace line: after the two leading fields
// (process_id, action name) there must be at least `mandatory` arguments;
// up to `optional` extra ones are accepted. Throws arg_error otherwise.
// (Plain /* */-free comments must stay OUTSIDE the macro: a // inside a
// continuation line would swallow the trailing backslash.)
#define CHECK_ACTION_PARAMS(action, mandatory, optional) {\
    int i=0;\
    while(action[i]!=nullptr)\
      i++;\
    if(i<mandatory+2)\
    THROWF(arg_error, 0, "%s replay failed.\n" \
          "%d items were given on the line. First two should be process_id and action. " \
          "This action needs after them %d mandatory arguments, and accepts %d optional ones. \n" \
          "Please contact the Simgrid team if support is needed", __FUNCTION__, i, mandatory, optional);\
  }
124 static void action_init(const char *const *action)
126 XBT_DEBUG("Initialize the counters");
127 CHECK_ACTION_PARAMS(action, 0, 1)
129 MPI_DEFAULT_TYPE = MPI_DOUBLE; // default MPE datatype
131 MPI_DEFAULT_TYPE = MPI_BYTE; // default TAU datatype
133 /* start a simulated timer */
134 smpi_process()->simulated_start();
135 /*initialize the number of active processes */
136 active_processes = smpi_process_count();
138 set_reqq_self(new std::vector<MPI_Request>);
141 static void action_finalize(const char *const *action)
146 static void action_comm_size(const char *const *action)
148 communicator_size = parse_double(action[2]);
149 log_timed_action (action, smpi_process()->simulated_elapsed());
152 static void action_comm_split(const char *const *action)
154 log_timed_action (action, smpi_process()->simulated_elapsed());
157 static void action_comm_dup(const char *const *action)
159 log_timed_action (action, smpi_process()->simulated_elapsed());
162 static void action_compute(const char *const *action)
164 CHECK_ACTION_PARAMS(action, 1, 0)
165 double clock = smpi_process()->simulated_elapsed();
166 double flops= parse_double(action[2]);
167 int my_proc_id = Actor::self()->getPid();
169 TRACE_smpi_computing_in(my_proc_id, flops);
170 smpi_execute_flops(flops);
171 TRACE_smpi_computing_out(my_proc_id);
173 log_timed_action (action, clock);
176 static void action_send(const char *const *action)
178 CHECK_ACTION_PARAMS(action, 2, 1)
179 int to = atoi(action[2]);
180 double size=parse_double(action[3]);
181 double clock = smpi_process()->simulated_elapsed();
183 MPI_Datatype MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
185 int my_proc_id = Actor::self()->getPid();
186 int dst_traced = MPI_COMM_WORLD->group()->actor(to)->getPid();
188 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
189 new simgrid::instr::Pt2PtTIData("send", to, size, encode_datatype(MPI_CURRENT_TYPE)));
190 if (not TRACE_smpi_view_internals())
191 TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, size * MPI_CURRENT_TYPE->size());
193 Request::send(nullptr, size, MPI_CURRENT_TYPE, to , 0, MPI_COMM_WORLD);
195 TRACE_smpi_comm_out(my_proc_id);
197 log_timed_action(action, clock);
200 static void action_Isend(const char *const *action)
202 CHECK_ACTION_PARAMS(action, 2, 1)
203 int to = atoi(action[2]);
204 double size=parse_double(action[3]);
205 double clock = smpi_process()->simulated_elapsed();
207 MPI_Datatype MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
209 int my_proc_id = Actor::self()->getPid();
210 int dst_traced = MPI_COMM_WORLD->group()->actor(to)->getPid();
211 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
212 new simgrid::instr::Pt2PtTIData("Isend", to, size, encode_datatype(MPI_CURRENT_TYPE)));
213 if (not TRACE_smpi_view_internals())
214 TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, size * MPI_CURRENT_TYPE->size());
216 MPI_Request request = Request::isend(nullptr, size, MPI_CURRENT_TYPE, to, 0, MPI_COMM_WORLD);
218 TRACE_smpi_comm_out(my_proc_id);
220 get_reqq_self()->push_back(request);
222 log_timed_action (action, clock);
225 static void action_recv(const char *const *action) {
226 CHECK_ACTION_PARAMS(action, 2, 1)
227 int from = atoi(action[2]);
228 double size=parse_double(action[3]);
229 double clock = smpi_process()->simulated_elapsed();
232 MPI_Datatype MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
234 int my_proc_id = Actor::self()->getPid();
235 int src_traced = MPI_COMM_WORLD->group()->actor(from)->getPid();
237 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
238 new simgrid::instr::Pt2PtTIData("recv", from, size, encode_datatype(MPI_CURRENT_TYPE)));
240 //unknown size from the receiver point of view
242 Request::probe(from, 0, MPI_COMM_WORLD, &status);
246 Request::recv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD, &status);
248 TRACE_smpi_comm_out(my_proc_id);
249 if (not TRACE_smpi_view_internals()) {
250 TRACE_smpi_recv(src_traced, my_proc_id, 0);
253 log_timed_action (action, clock);
256 static void action_Irecv(const char *const *action)
258 CHECK_ACTION_PARAMS(action, 2, 1)
259 int from = atoi(action[2]);
260 double size=parse_double(action[3]);
261 double clock = smpi_process()->simulated_elapsed();
263 MPI_Datatype MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
265 int my_proc_id = Actor::self()->getPid();
266 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
267 new simgrid::instr::Pt2PtTIData("Irecv", from, size, encode_datatype(MPI_CURRENT_TYPE)));
269 //unknow size from the receiver pov
271 Request::probe(from, 0, MPI_COMM_WORLD, &status);
275 MPI_Request request = Request::irecv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD);
277 TRACE_smpi_comm_out(my_proc_id);
278 get_reqq_self()->push_back(request);
280 log_timed_action (action, clock);
283 static void action_test(const char* const* action)
285 CHECK_ACTION_PARAMS(action, 0, 0)
286 double clock = smpi_process()->simulated_elapsed();
289 MPI_Request request = get_reqq_self()->back();
290 get_reqq_self()->pop_back();
291 //if request is null here, this may mean that a previous test has succeeded
292 //Different times in traced application and replayed version may lead to this
293 //In this case, ignore the extra calls.
294 if(request!=nullptr){
295 int my_proc_id = Actor::self()->getPid();
296 TRACE_smpi_testing_in(my_proc_id);
298 int flag = Request::test(&request, &status);
300 XBT_DEBUG("MPI_Test result: %d", flag);
301 /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now nullptr.*/
302 get_reqq_self()->push_back(request);
304 TRACE_smpi_testing_out(my_proc_id);
306 log_timed_action (action, clock);
309 static void action_wait(const char *const *action){
310 CHECK_ACTION_PARAMS(action, 0, 0)
311 double clock = smpi_process()->simulated_elapsed();
314 xbt_assert(get_reqq_self()->size(), "action wait not preceded by any irecv or isend: %s",
315 xbt_str_join_array(action," "));
316 MPI_Request request = get_reqq_self()->back();
317 get_reqq_self()->pop_back();
319 if (request==nullptr){
320 /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just return.*/
324 int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
326 MPI_Group group = request->comm()->group();
327 int src_traced = group->rank(request->src());
328 int dst_traced = group->rank(request->dst());
329 int is_wait_for_receive = (request->flags() & RECV);
330 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::NoOpTIData("wait"));
332 Request::wait(&request, &status);
334 TRACE_smpi_comm_out(rank);
335 if (is_wait_for_receive)
336 TRACE_smpi_recv(src_traced, dst_traced, 0);
337 log_timed_action (action, clock);
340 static void action_waitall(const char *const *action){
341 CHECK_ACTION_PARAMS(action, 0, 0)
342 double clock = smpi_process()->simulated_elapsed();
343 const unsigned int count_requests = get_reqq_self()->size();
345 if (count_requests>0) {
346 MPI_Status status[count_requests];
348 int my_proc_id_traced = Actor::self()->getPid();
349 TRACE_smpi_comm_in(my_proc_id_traced, __FUNCTION__,
350 new simgrid::instr::Pt2PtTIData("waitAll", -1, count_requests, ""));
351 int recvs_snd[count_requests];
352 int recvs_rcv[count_requests];
353 for (unsigned int i = 0; i < count_requests; i++) {
354 const auto& req = (*get_reqq_self())[i];
355 if (req && (req->flags() & RECV)) {
356 recvs_snd[i] = req->src();
357 recvs_rcv[i] = req->dst();
361 Request::waitall(count_requests, &(*get_reqq_self())[0], status);
363 for (unsigned i = 0; i < count_requests; i++) {
364 if (recvs_snd[i]!=-100)
365 TRACE_smpi_recv(recvs_snd[i], recvs_rcv[i],0);
367 TRACE_smpi_comm_out(my_proc_id_traced);
369 log_timed_action (action, clock);
372 static void action_barrier(const char *const *action){
373 double clock = smpi_process()->simulated_elapsed();
374 int my_proc_id = Actor::self()->getPid();
375 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::NoOpTIData("barrier"));
377 Colls::barrier(MPI_COMM_WORLD);
379 TRACE_smpi_comm_out(my_proc_id);
380 log_timed_action (action, clock);
383 static void action_bcast(const char *const *action)
385 CHECK_ACTION_PARAMS(action, 1, 2)
386 double size = parse_double(action[2]);
387 double clock = smpi_process()->simulated_elapsed();
388 int root = (action[3]) ? atoi(action[3]) : 0;
389 /* Initialize MPI_CURRENT_TYPE in order to decrease the number of the checks */
390 MPI_Datatype MPI_CURRENT_TYPE = (action[3] && action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
392 int my_proc_id = Actor::self()->getPid();
393 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
394 new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(root)->getPid(), -1.0, size,
395 -1, encode_datatype(MPI_CURRENT_TYPE), ""));
397 void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
399 Colls::bcast(sendbuf, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
401 TRACE_smpi_comm_out(my_proc_id);
402 log_timed_action (action, clock);
405 static void action_reduce(const char *const *action)
407 CHECK_ACTION_PARAMS(action, 2, 2)
408 double comm_size = parse_double(action[2]);
409 double comp_size = parse_double(action[3]);
410 double clock = smpi_process()->simulated_elapsed();
411 int root = (action[4]) ? atoi(action[4]) : 0;
413 MPI_Datatype MPI_CURRENT_TYPE = (action[4] && action[5]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE;
415 int my_proc_id = Actor::self()->getPid();
416 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
417 new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(root)->getPid(), comp_size,
418 comm_size, -1, encode_datatype(MPI_CURRENT_TYPE), ""));
420 void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
421 void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
422 Colls::reduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
423 smpi_execute_flops(comp_size);
425 TRACE_smpi_comm_out(my_proc_id);
426 log_timed_action (action, clock);
429 static void action_allReduce(const char *const *action) {
430 CHECK_ACTION_PARAMS(action, 2, 1)
431 double comm_size = parse_double(action[2]);
432 double comp_size = parse_double(action[3]);
434 MPI_Datatype MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
436 double clock = smpi_process()->simulated_elapsed();
437 int my_proc_id = Actor::self()->getPid();
438 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::CollTIData("allReduce", -1, comp_size, comm_size, -1,
439 encode_datatype(MPI_CURRENT_TYPE), ""));
441 void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
442 void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
443 Colls::allreduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
444 smpi_execute_flops(comp_size);
446 TRACE_smpi_comm_out(my_proc_id);
447 log_timed_action (action, clock);
450 static void action_allToAll(const char *const *action) {
451 CHECK_ACTION_PARAMS(action, 2, 2) //two mandatory (send and recv volumes) and two optional (corresponding datatypes)
452 double clock = smpi_process()->simulated_elapsed();
453 int comm_size = MPI_COMM_WORLD->size();
454 int send_size = parse_double(action[2]);
455 int recv_size = parse_double(action[3]);
456 MPI_Datatype MPI_CURRENT_TYPE = (action[4] && action[5]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
457 MPI_Datatype MPI_CURRENT_TYPE2{(action[4] && action[5]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE};
459 void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* MPI_CURRENT_TYPE->size());
460 void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
462 int my_proc_id = Actor::self()->getPid();
463 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
464 new simgrid::instr::CollTIData("allToAll", -1, -1.0, send_size, recv_size,
465 encode_datatype(MPI_CURRENT_TYPE),
466 encode_datatype(MPI_CURRENT_TYPE2)));
468 Colls::alltoall(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
470 TRACE_smpi_comm_out(my_proc_id);
471 log_timed_action (action, clock);
474 static void action_gather(const char *const *action) {
475 /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
478 1) 68 is the sendcounts
479 2) 68 is the recvcounts
480 3) 0 is the root node
481 4) 0 is the send datatype id, see decode_datatype()
482 5) 0 is the recv datatype id, see decode_datatype()
484 CHECK_ACTION_PARAMS(action, 2, 3)
485 double clock = smpi_process()->simulated_elapsed();
486 int comm_size = MPI_COMM_WORLD->size();
487 int send_size = parse_double(action[2]);
488 int recv_size = parse_double(action[3]);
489 MPI_Datatype MPI_CURRENT_TYPE = (action[5] && action[6]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE;
490 MPI_Datatype MPI_CURRENT_TYPE2{(action[5] && action[6]) ? decode_datatype(action[6]) : MPI_DEFAULT_TYPE};
492 void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
493 void *recv = nullptr;
494 int root = (action[4]) ? atoi(action[4]) : 0;
495 int rank = MPI_COMM_WORLD->rank();
498 recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
500 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
501 encode_datatype(MPI_CURRENT_TYPE),
502 encode_datatype(MPI_CURRENT_TYPE2)));
504 Colls::gather(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
506 TRACE_smpi_comm_out(Actor::self()->getPid());
507 log_timed_action (action, clock);
510 static void action_scatter(const char* const* action)
512 /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
515 1) 68 is the sendcounts
516 2) 68 is the recvcounts
517 3) 0 is the root node
518 4) 0 is the send datatype id, see decode_datatype()
519 5) 0 is the recv datatype id, see decode_datatype()
521 CHECK_ACTION_PARAMS(action, 2, 3)
522 double clock = smpi_process()->simulated_elapsed();
523 int comm_size = MPI_COMM_WORLD->size();
524 int send_size = parse_double(action[2]);
525 int recv_size = parse_double(action[3]);
526 MPI_Datatype MPI_CURRENT_TYPE = (action[5] && action[6]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE;
527 MPI_Datatype MPI_CURRENT_TYPE2{(action[5] && action[6]) ? decode_datatype(action[6]) : MPI_DEFAULT_TYPE};
529 void* send = smpi_get_tmp_sendbuffer(send_size * MPI_CURRENT_TYPE->size());
530 void* recv = nullptr;
531 int root = (action[4]) ? atoi(action[4]) : 0;
532 int rank = MPI_COMM_WORLD->rank();
535 recv = smpi_get_tmp_recvbuffer(recv_size * comm_size * MPI_CURRENT_TYPE2->size());
537 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
538 encode_datatype(MPI_CURRENT_TYPE),
539 encode_datatype(MPI_CURRENT_TYPE2)));
541 Colls::scatter(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
543 TRACE_smpi_comm_out(Actor::self()->getPid());
544 log_timed_action(action, clock);
547 static void action_gatherv(const char *const *action) {
548 /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
549 0 gather 68 68 10 10 10 0 0 0
551 1) 68 is the sendcount
552 2) 68 10 10 10 is the recvcounts
553 3) 0 is the root node
554 4) 0 is the send datatype id, see decode_datatype()
555 5) 0 is the recv datatype id, see decode_datatype()
557 double clock = smpi_process()->simulated_elapsed();
558 int comm_size = MPI_COMM_WORLD->size();
559 CHECK_ACTION_PARAMS(action, comm_size+1, 2)
560 int send_size = parse_double(action[2]);
561 std::vector<int> disps(comm_size, 0);
562 std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
564 MPI_Datatype MPI_CURRENT_TYPE =
565 (action[4 + comm_size] && action[5 + comm_size]) ? decode_datatype(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
566 MPI_Datatype MPI_CURRENT_TYPE2{
567 (action[4 + comm_size] && action[5 + comm_size]) ? decode_datatype(action[5 + comm_size]) : MPI_DEFAULT_TYPE};
569 void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
570 void *recv = nullptr;
571 for(int i=0;i<comm_size;i++) {
572 (*recvcounts)[i] = atoi(action[i + 3]);
574 int recv_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
576 int root = (action[3 + comm_size]) ? atoi(action[3 + comm_size]) : 0;
577 int rank = MPI_COMM_WORLD->rank();
580 recv = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
582 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData(
583 "gatherV", root, send_size, nullptr, -1, recvcounts,
584 encode_datatype(MPI_CURRENT_TYPE), encode_datatype(MPI_CURRENT_TYPE2)));
586 Colls::gatherv(send, send_size, MPI_CURRENT_TYPE, recv, recvcounts->data(), disps.data(), MPI_CURRENT_TYPE2, root,
589 TRACE_smpi_comm_out(Actor::self()->getPid());
590 log_timed_action (action, clock);
593 static void action_scatterv(const char* const* action)
595 /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
596 0 gather 68 10 10 10 68 0 0 0
598 1) 68 10 10 10 is the sendcounts
599 2) 68 is the recvcount
600 3) 0 is the root node
601 4) 0 is the send datatype id, see decode_datatype()
602 5) 0 is the recv datatype id, see decode_datatype()
604 double clock = smpi_process()->simulated_elapsed();
605 int comm_size = MPI_COMM_WORLD->size();
606 CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
607 int recv_size = parse_double(action[2 + comm_size]);
608 std::vector<int> disps(comm_size, 0);
609 std::shared_ptr<std::vector<int>> sendcounts(new std::vector<int>(comm_size));
611 MPI_Datatype MPI_CURRENT_TYPE =
612 (action[4 + comm_size] && action[5 + comm_size]) ? decode_datatype(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
613 MPI_Datatype MPI_CURRENT_TYPE2{
614 (action[4 + comm_size] && action[5 + comm_size]) ? decode_datatype(action[5 + comm_size]) : MPI_DEFAULT_TYPE};
616 void* send = nullptr;
617 void* recv = smpi_get_tmp_recvbuffer(recv_size * MPI_CURRENT_TYPE->size());
618 for (int i = 0; i < comm_size; i++) {
619 (*sendcounts)[i] = atoi(action[i + 2]);
621 int send_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
623 int root = (action[3 + comm_size]) ? atoi(action[3 + comm_size]) : 0;
624 int rank = MPI_COMM_WORLD->rank();
627 send = smpi_get_tmp_sendbuffer(send_sum * MPI_CURRENT_TYPE2->size());
629 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData("gatherV", root, -1, sendcounts, recv_size,
630 nullptr, encode_datatype(MPI_CURRENT_TYPE),
631 encode_datatype(MPI_CURRENT_TYPE2)));
633 Colls::scatterv(send, sendcounts->data(), disps.data(), MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root,
636 TRACE_smpi_comm_out(Actor::self()->getPid());
637 log_timed_action(action, clock);
640 static void action_reducescatter(const char *const *action) {
641 /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
642 0 reduceScatter 275427 275427 275427 204020 11346849 0
644 1) The first four values after the name of the action declare the recvcounts array
645 2) The value 11346849 is the amount of instructions
646 3) The last value corresponds to the datatype, see decode_datatype().
648 double clock = smpi_process()->simulated_elapsed();
649 int comm_size = MPI_COMM_WORLD->size();
650 CHECK_ACTION_PARAMS(action, comm_size+1, 1)
651 int comp_size = parse_double(action[2+comm_size]);
652 int my_proc_id = Actor::self()->getPid();
653 std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>);
654 MPI_Datatype MPI_CURRENT_TYPE = (action[3 + comm_size]) ? decode_datatype(action[3 + comm_size]) : MPI_DEFAULT_TYPE;
656 for(int i=0;i<comm_size;i++) {
657 recvcounts->push_back(atoi(action[i + 2]));
659 int size{std::accumulate(recvcounts->begin(), recvcounts->end(), 0)};
661 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
662 new simgrid::instr::VarCollTIData("reduceScatter", -1, 0, nullptr, -1, recvcounts,
663 std::to_string(comp_size), /* ugly hack to print comp_size */
664 encode_datatype(MPI_CURRENT_TYPE)));
666 void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
667 void *recvbuf = smpi_get_tmp_recvbuffer(size* MPI_CURRENT_TYPE->size());
669 Colls::reduce_scatter(sendbuf, recvbuf, recvcounts->data(), MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
670 smpi_execute_flops(comp_size);
672 TRACE_smpi_comm_out(my_proc_id);
673 log_timed_action (action, clock);
676 static void action_allgather(const char *const *action) {
677 /* The structure of the allgather action for the rank 0 (total 4 processes) is the following:
678 0 allGather 275427 275427
680 1) 275427 is the sendcount
681 2) 275427 is the recvcount
682 3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().
684 double clock = smpi_process()->simulated_elapsed();
686 CHECK_ACTION_PARAMS(action, 2, 2)
687 int sendcount=atoi(action[2]);
688 int recvcount=atoi(action[3]);
690 MPI_Datatype MPI_CURRENT_TYPE = (action[4] && action[5]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
691 MPI_Datatype MPI_CURRENT_TYPE2{(action[4] && action[5]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE};
693 void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
694 void *recvbuf = smpi_get_tmp_recvbuffer(recvcount* MPI_CURRENT_TYPE2->size());
696 int my_proc_id = Actor::self()->getPid();
698 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
699 new simgrid::instr::CollTIData("allGather", -1, -1.0, sendcount, recvcount,
700 encode_datatype(MPI_CURRENT_TYPE),
701 encode_datatype(MPI_CURRENT_TYPE2)));
703 Colls::allgather(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcount, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
705 TRACE_smpi_comm_out(my_proc_id);
706 log_timed_action (action, clock);
709 static void action_allgatherv(const char *const *action) {
710 /* The structure of the allgatherv action for the rank 0 (total 4 processes) is the following:
711 0 allGatherV 275427 275427 275427 275427 204020
713 1) 275427 is the sendcount
714 2) The next four elements declare the recvcounts array
715 3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().
717 double clock = smpi_process()->simulated_elapsed();
719 int comm_size = MPI_COMM_WORLD->size();
720 CHECK_ACTION_PARAMS(action, comm_size+1, 2)
721 int sendcount=atoi(action[2]);
722 std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
723 std::vector<int> disps(comm_size, 0);
725 MPI_Datatype MPI_CURRENT_TYPE =
726 (action[3 + comm_size] && action[4 + comm_size]) ? decode_datatype(action[3 + comm_size]) : MPI_DEFAULT_TYPE;
727 MPI_Datatype MPI_CURRENT_TYPE2{
728 (action[3 + comm_size] && action[4 + comm_size]) ? decode_datatype(action[4 + comm_size]) : MPI_DEFAULT_TYPE};
730 void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
732 for(int i=0;i<comm_size;i++) {
733 (*recvcounts)[i] = atoi(action[i + 3]);
735 int recv_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
736 void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
738 int my_proc_id = Actor::self()->getPid();
740 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
741 new simgrid::instr::VarCollTIData("allGatherV", -1, sendcount, nullptr, -1, recvcounts,
742 encode_datatype(MPI_CURRENT_TYPE),
743 encode_datatype(MPI_CURRENT_TYPE2)));
745 Colls::allgatherv(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts->data(), disps.data(), MPI_CURRENT_TYPE2,
748 TRACE_smpi_comm_out(my_proc_id);
749 log_timed_action (action, clock);
752 static void action_allToAllv(const char *const *action) {
753 /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
754 0 allToAllV 100 1 7 10 12 100 1 70 10 5
756 1) 100 is the size of the send buffer *sizeof(int),
757 2) 1 7 10 12 is the sendcounts array
758 3) 100*sizeof(int) is the size of the receiver buffer
759 4) 1 70 10 5 is the recvcounts array
761 double clock = smpi_process()->simulated_elapsed();
763 int comm_size = MPI_COMM_WORLD->size();
764 CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
765 std::shared_ptr<std::vector<int>> sendcounts(new std::vector<int>(comm_size));
766 std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
767 std::vector<int> senddisps(comm_size, 0);
768 std::vector<int> recvdisps(comm_size, 0);
770 MPI_Datatype MPI_CURRENT_TYPE = (action[4 + 2 * comm_size] && action[5 + 2 * comm_size])
771 ? decode_datatype(action[4 + 2 * comm_size])
773 MPI_Datatype MPI_CURRENT_TYPE2{(action[4 + 2 * comm_size] && action[5 + 2 * comm_size])
774 ? decode_datatype(action[5 + 2 * comm_size])
777 int send_buf_size=parse_double(action[2]);
778 int recv_buf_size=parse_double(action[3+comm_size]);
779 int my_proc_id = Actor::self()->getPid();
780 void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* MPI_CURRENT_TYPE->size());
781 void *recvbuf = smpi_get_tmp_recvbuffer(recv_buf_size* MPI_CURRENT_TYPE2->size());
783 for(int i=0;i<comm_size;i++) {
784 (*sendcounts)[i] = atoi(action[3 + i]);
785 (*recvcounts)[i] = atoi(action[4 + comm_size + i]);
787 int send_size = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
788 int recv_size = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
790 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
791 new simgrid::instr::VarCollTIData("allToAllV", -1, send_size, sendcounts, recv_size, recvcounts,
792 encode_datatype(MPI_CURRENT_TYPE),
793 encode_datatype(MPI_CURRENT_TYPE2)));
795 Colls::alltoallv(sendbuf, sendcounts->data(), senddisps.data(), MPI_CURRENT_TYPE, recvbuf, recvcounts->data(),
796 recvdisps.data(), MPI_CURRENT_TYPE, MPI_COMM_WORLD);
798 TRACE_smpi_comm_out(my_proc_id);
799 log_timed_action (action, clock);
802 }} // namespace simgrid::smpi
804 /** @brief Only initialize the replay, don't do it for real */
805 void smpi_replay_init(int* argc, char*** argv)
807 simgrid::smpi::Process::init(argc, argv);
808 smpi_process()->mark_as_initialized();
809 smpi_process()->set_replaying(true);
811 int my_proc_id = Actor::self()->getPid();
812 TRACE_smpi_init(my_proc_id);
813 TRACE_smpi_computing_init(my_proc_id);
814 TRACE_smpi_comm_in(my_proc_id, "smpi_replay_run_init", new simgrid::instr::NoOpTIData("init"));
815 TRACE_smpi_comm_out(my_proc_id);
816 xbt_replay_action_register("init", simgrid::smpi::action_init);
817 xbt_replay_action_register("finalize", simgrid::smpi::action_finalize);
818 xbt_replay_action_register("comm_size", simgrid::smpi::action_comm_size);
819 xbt_replay_action_register("comm_split", simgrid::smpi::action_comm_split);
820 xbt_replay_action_register("comm_dup", simgrid::smpi::action_comm_dup);
821 xbt_replay_action_register("send", simgrid::smpi::action_send);
822 xbt_replay_action_register("Isend", simgrid::smpi::action_Isend);
823 xbt_replay_action_register("recv", simgrid::smpi::action_recv);
824 xbt_replay_action_register("Irecv", simgrid::smpi::action_Irecv);
825 xbt_replay_action_register("test", simgrid::smpi::action_test);
826 xbt_replay_action_register("wait", simgrid::smpi::action_wait);
827 xbt_replay_action_register("waitAll", simgrid::smpi::action_waitall);
828 xbt_replay_action_register("barrier", simgrid::smpi::action_barrier);
829 xbt_replay_action_register("bcast", simgrid::smpi::action_bcast);
830 xbt_replay_action_register("reduce", simgrid::smpi::action_reduce);
831 xbt_replay_action_register("allReduce", simgrid::smpi::action_allReduce);
832 xbt_replay_action_register("allToAll", simgrid::smpi::action_allToAll);
833 xbt_replay_action_register("allToAllV", simgrid::smpi::action_allToAllv);
834 xbt_replay_action_register("gather", simgrid::smpi::action_gather);
835 xbt_replay_action_register("scatter", simgrid::smpi::action_scatter);
836 xbt_replay_action_register("gatherV", simgrid::smpi::action_gatherv);
837 xbt_replay_action_register("scatterV", simgrid::smpi::action_scatterv);
838 xbt_replay_action_register("allGather", simgrid::smpi::action_allgather);
839 xbt_replay_action_register("allGatherV", simgrid::smpi::action_allgatherv);
840 xbt_replay_action_register("reduceScatter", simgrid::smpi::action_reducescatter);
841 xbt_replay_action_register("compute", simgrid::smpi::action_compute);
843 //if we have a delayed start, sleep here.
845 double value = xbt_str_parse_double((*argv)[2], "%s is not a double");
846 XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
847 smpi_execute_flops(value);
849 //UGLY: force a context switch to be sure that all MSG_processes begin initialization
850 XBT_DEBUG("Force context switch by smpi_execute_flops - Sleeping for 0.0 flops ");
851 smpi_execute_flops(0.0);
855 /** @brief actually run the replay after initialization */
856 void smpi_replay_main(int* argc, char*** argv)
858 simgrid::xbt::replay_runner(*argc, *argv);
860 /* and now, finalize everything */
861 /* One active process will stop. Decrease the counter*/
862 XBT_DEBUG("There are %zu elements in reqq[*]", get_reqq_self()->size());
863 if (not get_reqq_self()->empty()) {
864 unsigned int count_requests=get_reqq_self()->size();
865 MPI_Request requests[count_requests];
866 MPI_Status status[count_requests];
869 for (auto const& req : *get_reqq_self()) {
873 simgrid::smpi::Request::waitall(count_requests, requests, status);
875 delete get_reqq_self();
878 if(active_processes==0){
879 /* Last process alive speaking: end the simulated timer */
880 XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
881 xbt_free(sendbuffer);
882 xbt_free(recvbuffer);
885 TRACE_smpi_comm_in(Actor::self()->getPid(), "smpi_replay_run_finalize", new simgrid::instr::NoOpTIData("finalize"));
887 smpi_process()->finalize();
889 TRACE_smpi_comm_out(Actor::self()->getPid());
890 TRACE_smpi_finalize(Actor::self()->getPid());
893 /** @brief chain a replay initialization and a replay start */
894 void smpi_replay_run(int* argc, char*** argv)
896 smpi_replay_init(argc, argv);
897 smpi_replay_main(argc, argv);