1 /* Copyright (c) 2009-2017. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
7 #include "smpi_coll.hpp"
8 #include "smpi_comm.hpp"
9 #include "smpi_datatype.hpp"
10 #include "smpi_group.hpp"
11 #include "smpi_process.hpp"
12 #include "smpi_request.hpp"
13 #include "xbt/replay.hpp"
17 #include <unordered_map>
20 using simgrid::s4u::Actor;
// Log channel for the SMPI trace-replay engine.
22 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
// Communicator size read from a "comm_size" trace action (see action_comm_size).
24 static int communicator_size = 0;
// Number of replaying processes still running; checked at finalization time.
25 static int active_processes = 0;
// Per-actor queue of outstanding asynchronous requests, keyed by actor PID.
26 static std::unordered_map<int, std::vector<MPI_Request>*> reqq;
// Datatype used when a trace line gives none (MPI_DOUBLE for MPE traces,
// MPI_BYTE for TAU traces — set in action_init).
28 static MPI_Datatype MPI_DEFAULT_TYPE;
// Datatype decoded from the action line currently being replayed.
29 static MPI_Datatype MPI_CURRENT_TYPE;
// Single shared temporary send/recv buffers, grown on demand
// (see smpi_get_tmp_sendbuffer / smpi_get_tmp_recvbuffer).
31 static int sendbuffer_size = 0;
32 static char* sendbuffer = nullptr;
33 static int recvbuffer_size = 0;
34 static char* recvbuffer = nullptr;
// Log the whole action line plus the simulated time it consumed, at verbose
// level only. (The tail of this function is elided in this excerpt.)
36 static void log_timed_action (const char *const *action, double clock){
37 if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
// xbt_str_join_array allocates; presumably freed in the elided lines — TODO confirm.
38 char *name = xbt_str_join_array(action, " ");
39 XBT_VERB("%s %f", name, smpi_process()->simulated_elapsed()-clock);
// Return the pending-request queue of the calling actor. Throws
// std::out_of_range if action_init has not registered one for this PID yet.
44 static std::vector<MPI_Request>* get_reqq_self()
46 return reqq.at(Actor::self()->getPid());
// Register `mpi_request` as the request queue of the calling actor.
// NOTE(review): unordered_map::insert does not overwrite an existing entry —
// confirm that a PID is never registered twice.
49 static void set_reqq_self(std::vector<MPI_Request> *mpi_request)
51 reqq.insert({Actor::self()->getPid(), mpi_request});
54 //allocate a single buffer for all sends, growing it if needed
55 void* smpi_get_tmp_sendbuffer(int size)
// Outside replay mode, hand back an ordinary allocation owned by the caller.
57 if (not smpi_process()->replaying())
58 return xbt_malloc(size);
// Grow the shared buffer when too small (sendbuffer_size update elided here).
59 if (sendbuffer_size<size){
60 sendbuffer=static_cast<char*>(xbt_realloc(sendbuffer,size));
66 //allocate a single buffer for all recv
// Mirror of smpi_get_tmp_sendbuffer for the receive side.
67 void* smpi_get_tmp_recvbuffer(int size){
68 if (not smpi_process()->replaying())
69 return xbt_malloc(size);
// Grow the shared buffer when too small (recvbuffer_size update elided here).
70 if (recvbuffer_size<size){
71 recvbuffer=static_cast<char*>(xbt_realloc(recvbuffer,size));
// Free `buf` only when not replaying: in replay mode the shared temporary
// buffers are reused across actions and released at simulation end.
77 void smpi_free_tmp_buffer(void* buf){
78 if (not smpi_process()->replaying())
// Parse `string` as a double; throws unknown_error on an invalid number.
// (The endptr declaration and conversion-success check are elided here.)
83 static double parse_double(const char *string)
86 double value = strtod(string, &endptr);
88 THROWF(unknown_error, 0, "%s is not a double", string);
93 //TODO: this logic should be moved inside the datatype class, to support all predefined types and get rid of is_replayable.
// Map a numeric datatype id found on a trace line to an MPI datatype.
// The switch cases are elided in this excerpt; ids that match no case fall
// back to MPI_DEFAULT_TYPE.
94 static MPI_Datatype decode_datatype(const char *const action)
96 switch(atoi(action)) {
119 return MPI_DEFAULT_TYPE;
// Inverse of decode_datatype: map an MPI datatype back to its trace-format id
// string. (The returned literals for each case are elided in this excerpt.)
124 const char* encode_datatype(MPI_Datatype datatype)
126 if (datatype==MPI_BYTE)
128 if(datatype==MPI_DOUBLE)
130 if(datatype==MPI_INT)
132 if(datatype==MPI_CHAR)
134 if(datatype==MPI_SHORT)
136 if(datatype==MPI_LONG)
138 if(datatype==MPI_FLOAT)
140 // default - not implemented.
141 // do not warn here as we pass in this function even for other trace formats
// Validate the argument count of a trace line: after the two leading fields
// (process id and action name) the action must supply at least `mandatory`
// arguments and at most `mandatory`+`optional`; otherwise abort the replay
// with a diagnostic. Comments cannot go inside the macro (backslash
// continuations); parts of its body are elided in this excerpt.
145 #define CHECK_ACTION_PARAMS(action, mandatory, optional) {\
147 while(action[i]!=nullptr)\
150 THROWF(arg_error, 0, "%s replay failed.\n" \
151 "%d items were given on the line. First two should be process_id and action. " \
152 "This action needs after them %d mandatory arguments, and accepts %d optional ones. \n" \
153 "Please contact the Simgrid team if support is needed", __FUNCTION__, i, mandatory, optional);\
// "init" action: pick the default datatype for the trace format (the branch
// condition between the MPE and TAU assignments is elided here), start the
// simulated timer, count the active processes, and create this actor's
// request queue.
159 static void action_init(const char *const *action)
161 XBT_DEBUG("Initialize the counters");
162 CHECK_ACTION_PARAMS(action, 0, 1)
164 MPI_DEFAULT_TYPE = MPI_DOUBLE; // default MPE datatype
166 MPI_DEFAULT_TYPE = MPI_BYTE; // default TAU datatype
168 /* start a simulated timer */
169 smpi_process()->simulated_start();
170 /*initialize the number of active processes */
171 active_processes = smpi_process_count();
173 set_reqq_self(new std::vector<MPI_Request>);
// Handler for the "finalize" trace action (its body is elided in this excerpt).
176 static void action_finalize(const char *const *action)
// "comm_size" action: record the communicator size announced by the trace.
181 static void action_comm_size(const char *const *action)
183 communicator_size = parse_double(action[2]);
184 log_timed_action (action, smpi_process()->simulated_elapsed());
// "comm_split" action: only logged — the split itself is not simulated here.
187 static void action_comm_split(const char *const *action)
189 log_timed_action (action, smpi_process()->simulated_elapsed());
// "comm_dup" action: only logged — the duplication itself is not simulated here.
192 static void action_comm_dup(const char *const *action)
194 log_timed_action (action, smpi_process()->simulated_elapsed());
// "compute" action: burn the given number of flops on the simulated host,
// bracketed by tracing calls.
197 static void action_compute(const char *const *action)
199 CHECK_ACTION_PARAMS(action, 1, 0)
200 double clock = smpi_process()->simulated_elapsed();
201 double flops= parse_double(action[2]);
202 int my_proc_id = Actor::self()->getPid();
204 TRACE_smpi_computing_in(my_proc_id, flops);
205 smpi_execute_flops(flops);
206 TRACE_smpi_computing_out(my_proc_id);
208 log_timed_action (action, clock);
// "send" action: blocking send of `size` elements (of the optional datatype in
// action[4], else the default) to rank `to`, with tracing around it.
211 static void action_send(const char *const *action)
213 CHECK_ACTION_PARAMS(action, 2, 1)
214 int to = atoi(action[2]);
215 double size=parse_double(action[3]);
216 double clock = smpi_process()->simulated_elapsed();
218 MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
220 int my_proc_id = Actor::self()->getPid();
221 int dst_traced = MPI_COMM_WORLD->group()->actor(to)->getPid();
223 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
224 new simgrid::instr::Pt2PtTIData("send", to, size, encode_datatype(MPI_CURRENT_TYPE)));
225 if (not TRACE_smpi_view_internals())
226 TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, size * MPI_CURRENT_TYPE->size());
// nullptr payload: the replay only simulates timing, no real data moves.
228 Request::send(nullptr, size, MPI_CURRENT_TYPE, to , 0, MPI_COMM_WORLD);
230 TRACE_smpi_comm_out(my_proc_id);
232 log_timed_action(action, clock);
// "Isend" action: non-blocking send; the resulting request is queued so a
// later "wait"/"waitAll"/"test" action can complete it.
235 static void action_Isend(const char *const *action)
237 CHECK_ACTION_PARAMS(action, 2, 1)
238 int to = atoi(action[2]);
239 double size=parse_double(action[3]);
240 double clock = smpi_process()->simulated_elapsed();
242 MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
244 int my_proc_id = Actor::self()->getPid();
245 int dst_traced = MPI_COMM_WORLD->group()->actor(to)->getPid();
246 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
247 new simgrid::instr::Pt2PtTIData("Isend", to, size, encode_datatype(MPI_CURRENT_TYPE)));
248 if (not TRACE_smpi_view_internals())
249 TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, size * MPI_CURRENT_TYPE->size());
251 MPI_Request request = Request::isend(nullptr, size, MPI_CURRENT_TYPE, to, 0, MPI_COMM_WORLD);
253 TRACE_smpi_comm_out(my_proc_id);
// Remember the request for the matching wait/test action.
255 get_reqq_self()->push_back(request);
257 log_timed_action (action, clock);
// "recv" action: blocking receive from rank `from`; probes first because the
// size in the trace is from the sender's viewpoint (probe use partly elided).
260 static void action_recv(const char *const *action) {
261 CHECK_ACTION_PARAMS(action, 2, 1)
262 int from = atoi(action[2]);
263 double size=parse_double(action[3]);
264 double clock = smpi_process()->simulated_elapsed();
267 MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
269 int my_proc_id = Actor::self()->getPid();
270 int src_traced = MPI_COMM_WORLD->group()->actor(from)->getPid();
272 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
273 new simgrid::instr::Pt2PtTIData("recv", from, size, encode_datatype(MPI_CURRENT_TYPE)));
275 //unknown size from the receiver point of view
277 Request::probe(from, 0, MPI_COMM_WORLD, &status);
281 Request::recv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD, &status);
283 TRACE_smpi_comm_out(my_proc_id);
284 if (not TRACE_smpi_view_internals()) {
285 TRACE_smpi_recv(src_traced, my_proc_id, 0);
288 log_timed_action (action, clock);
// "Irecv" action: non-blocking receive; the request is queued for a later
// wait/test action. Probes first since the traced size is sender-side.
291 static void action_Irecv(const char *const *action)
293 CHECK_ACTION_PARAMS(action, 2, 1)
294 int from = atoi(action[2]);
295 double size=parse_double(action[3]);
296 double clock = smpi_process()->simulated_elapsed();
298 MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
300 int my_proc_id = Actor::self()->getPid();
301 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
302 new simgrid::instr::Pt2PtTIData("Irecv", from, size, encode_datatype(MPI_CURRENT_TYPE)));
304 //unknow size from the receiver pov
306 Request::probe(from, 0, MPI_COMM_WORLD, &status);
310 MPI_Request request = Request::irecv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD);
312 TRACE_smpi_comm_out(my_proc_id);
313 get_reqq_self()->push_back(request);
315 log_timed_action (action, clock);
// "test" action: pop the most recent request, test it for completion, and
// push it back (possibly nulled) so a later wait can still find it.
318 static void action_test(const char* const* action)
320 CHECK_ACTION_PARAMS(action, 0, 0)
321 double clock = smpi_process()->simulated_elapsed();
324 MPI_Request request = get_reqq_self()->back();
325 get_reqq_self()->pop_back();
326 //if request is null here, this may mean that a previous test has succeeded
327 //Different times in traced application and replayed version may lead to this
328 //In this case, ignore the extra calls.
329 if(request!=nullptr){
330 int my_proc_id = Actor::self()->getPid();
331 TRACE_smpi_testing_in(my_proc_id);
333 int flag = Request::test(&request, &status);
335 XBT_DEBUG("MPI_Test result: %d", flag);
336 /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now nullptr.*/
337 get_reqq_self()->push_back(request);
339 TRACE_smpi_testing_out(my_proc_id);
341 log_timed_action (action, clock);
// "wait" action: complete the most recent pending request. A nullptr request
// means a previous "test" already completed it, so the wait is a no-op.
344 static void action_wait(const char *const *action){
345 CHECK_ACTION_PARAMS(action, 0, 0)
346 double clock = smpi_process()->simulated_elapsed();
349 xbt_assert(get_reqq_self()->size(), "action wait not preceded by any irecv or isend: %s",
350 xbt_str_join_array(action," "));
351 MPI_Request request = get_reqq_self()->back();
352 get_reqq_self()->pop_back();
354 if (request==nullptr){
355 /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just return.*/
// Capture src/dst before the wait, which may invalidate the request.
359 int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
361 MPI_Group group = request->comm()->group();
362 int src_traced = group->rank(request->src());
363 int dst_traced = group->rank(request->dst());
364 int is_wait_for_receive = (request->flags() & RECV);
365 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::NoOpTIData("wait"));
367 Request::wait(&request, &status);
369 TRACE_smpi_comm_out(rank);
370 if (is_wait_for_receive)
371 TRACE_smpi_recv(src_traced, dst_traced, 0);
372 log_timed_action (action, clock);
// "waitAll" action: complete every queued request at once, recording the
// src/dst of receive requests beforehand so they can be traced afterwards.
375 static void action_waitall(const char *const *action){
376 CHECK_ACTION_PARAMS(action, 0, 0)
377 double clock = smpi_process()->simulated_elapsed();
378 const unsigned int count_requests = get_reqq_self()->size();
380 if (count_requests>0) {
381 MPI_Status status[count_requests];
383 int my_proc_id_traced = Actor::self()->getPid();
384 TRACE_smpi_comm_in(my_proc_id_traced, __FUNCTION__,
385 new simgrid::instr::Pt2PtTIData("waitAll", -1, count_requests, ""));
// Record receive endpoints before waitall invalidates the requests.
386 int recvs_snd[count_requests];
387 int recvs_rcv[count_requests];
388 for (unsigned int i = 0; i < count_requests; i++) {
389 const auto& req = (*get_reqq_self())[i];
390 if (req && (req->flags() & RECV)) {
391 recvs_snd[i] = req->src();
392 recvs_rcv[i] = req->dst();
396 Request::waitall(count_requests, &(*get_reqq_self())[0], status);
// -100 is presumably the sentinel set for non-receive slots in elided lines — TODO confirm.
398 for (unsigned i = 0; i < count_requests; i++) {
399 if (recvs_snd[i]!=-100)
400 TRACE_smpi_recv(recvs_snd[i], recvs_rcv[i],0);
402 TRACE_smpi_comm_out(my_proc_id_traced);
404 log_timed_action (action, clock);
// "barrier" action: synchronize all ranks of MPI_COMM_WORLD.
407 static void action_barrier(const char *const *action){
408 double clock = smpi_process()->simulated_elapsed();
409 int my_proc_id = Actor::self()->getPid();
410 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::NoOpTIData("barrier"));
412 Colls::barrier(MPI_COMM_WORLD);
414 TRACE_smpi_comm_out(my_proc_id);
415 log_timed_action (action, clock);
// "bcast" action: broadcast `size` elements from the optional root (default 0),
// using the optional datatype in action[4].
418 static void action_bcast(const char *const *action)
420 CHECK_ACTION_PARAMS(action, 1, 2)
421 double size = parse_double(action[2]);
422 double clock = smpi_process()->simulated_elapsed();
423 int root = (action[3]) ? atoi(action[3]) : 0;
424 /* Initialize MPI_CURRENT_TYPE in order to decrease the number of the checks */
425 MPI_CURRENT_TYPE = (action[3] && action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
427 int my_proc_id = Actor::self()->getPid();
428 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
429 new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(root)->getPid(), -1.0, size,
430 -1, encode_datatype(MPI_CURRENT_TYPE), ""));
432 void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
434 Colls::bcast(sendbuf, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
436 TRACE_smpi_comm_out(my_proc_id);
437 log_timed_action (action, clock);
// "reduce" action: reduce `comm_size` elements toward the optional root and
// then burn `comp_size` flops to account for the reduction computation.
440 static void action_reduce(const char *const *action)
442 CHECK_ACTION_PARAMS(action, 2, 2)
443 double comm_size = parse_double(action[2]);
444 double comp_size = parse_double(action[3]);
445 double clock = smpi_process()->simulated_elapsed();
446 int root = (action[4]) ? atoi(action[4]) : 0;
448 MPI_CURRENT_TYPE = (action[4] && action[5]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE;
450 int my_proc_id = Actor::self()->getPid();
451 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
452 new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(root)->getPid(), comp_size,
453 comm_size, -1, encode_datatype(MPI_CURRENT_TYPE), ""));
// Both temporaries come from the shared send buffer; contents are irrelevant.
455 void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
456 void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
457 Colls::reduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
458 smpi_execute_flops(comp_size);
460 TRACE_smpi_comm_out(my_proc_id);
461 log_timed_action (action, clock);
// "allReduce" action: allreduce `comm_size` elements, then burn `comp_size`
// flops to account for the local reduction computation.
464 static void action_allReduce(const char *const *action) {
465 CHECK_ACTION_PARAMS(action, 2, 1)
466 double comm_size = parse_double(action[2]);
467 double comp_size = parse_double(action[3]);
469 MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
471 double clock = smpi_process()->simulated_elapsed();
472 int my_proc_id = Actor::self()->getPid();
473 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::CollTIData("allReduce", -1, comp_size, comm_size, -1,
474 encode_datatype(MPI_CURRENT_TYPE), ""));
476 void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
477 void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
478 Colls::allreduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
479 smpi_execute_flops(comp_size);
481 TRACE_smpi_comm_out(my_proc_id);
482 log_timed_action (action, clock);
// "allToAll" action: all-to-all exchange of send_size/recv_size elements per
// peer, with independent send and receive datatypes.
485 static void action_allToAll(const char *const *action) {
486 CHECK_ACTION_PARAMS(action, 2, 2) //two mandatory (send and recv volumes) and two optional (corresponding datatypes)
487 double clock = smpi_process()->simulated_elapsed();
488 int comm_size = MPI_COMM_WORLD->size();
489 int send_size = parse_double(action[2]);
490 int recv_size = parse_double(action[3]);
491 MPI_CURRENT_TYPE = (action[4] && action[5]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
492 MPI_Datatype MPI_CURRENT_TYPE2{(action[4] && action[5]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE};
494 void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* MPI_CURRENT_TYPE->size());
495 void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
497 int my_proc_id = Actor::self()->getPid();
498 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
499 new simgrid::instr::CollTIData("allToAll", -1, -1.0, send_size, recv_size,
500 encode_datatype(MPI_CURRENT_TYPE),
501 encode_datatype(MPI_CURRENT_TYPE2)));
503 Colls::alltoall(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
505 TRACE_smpi_comm_out(my_proc_id);
506 log_timed_action (action, clock);
// "gather" action: gather send_size elements from every rank at the root;
// only the root allocates a receive buffer (the rank==root check is elided).
509 static void action_gather(const char *const *action) {
510 /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
513 1) 68 is the sendcounts
514 2) 68 is the recvcounts
515 3) 0 is the root node
516 4) 0 is the send datatype id, see decode_datatype()
517 5) 0 is the recv datatype id, see decode_datatype()
519 CHECK_ACTION_PARAMS(action, 2, 3)
520 double clock = smpi_process()->simulated_elapsed();
521 int comm_size = MPI_COMM_WORLD->size();
522 int send_size = parse_double(action[2]);
523 int recv_size = parse_double(action[3]);
524 MPI_CURRENT_TYPE = (action[5] && action[6]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE;
525 MPI_Datatype MPI_CURRENT_TYPE2{(action[5] && action[6]) ? decode_datatype(action[6]) : MPI_DEFAULT_TYPE};
527 void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
528 void *recv = nullptr;
529 int root = (action[4]) ? atoi(action[4]) : 0;
530 int rank = MPI_COMM_WORLD->rank();
533 recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
535 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
536 encode_datatype(MPI_CURRENT_TYPE),
537 encode_datatype(MPI_CURRENT_TYPE2)));
539 Colls::gather(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
541 TRACE_smpi_comm_out(Actor::self()->getPid());
542 log_timed_action (action, clock);
// "scatter" action: scatter send_size elements from the root to every rank;
// only the root allocates a send-side buffer (the rank==root check is elided).
545 static void action_scatter(const char* const* action)
547 /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
550 1) 68 is the sendcounts
551 2) 68 is the recvcounts
552 3) 0 is the root node
553 4) 0 is the send datatype id, see decode_datatype()
554 5) 0 is the recv datatype id, see decode_datatype()
556 CHECK_ACTION_PARAMS(action, 2, 3)
557 double clock = smpi_process()->simulated_elapsed();
558 int comm_size = MPI_COMM_WORLD->size();
559 int send_size = parse_double(action[2]);
560 int recv_size = parse_double(action[3]);
561 MPI_CURRENT_TYPE = (action[5] && action[6]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE;
562 MPI_Datatype MPI_CURRENT_TYPE2{(action[5] && action[6]) ? decode_datatype(action[6]) : MPI_DEFAULT_TYPE};
564 void* send = smpi_get_tmp_sendbuffer(send_size * MPI_CURRENT_TYPE->size());
565 void* recv = nullptr;
566 int root = (action[4]) ? atoi(action[4]) : 0;
567 int rank = MPI_COMM_WORLD->rank();
570 recv = smpi_get_tmp_recvbuffer(recv_size * comm_size * MPI_CURRENT_TYPE2->size());
// FIX: this handler replays a scatter, not a gather — label the trace event
// accordingly (was "gather", a copy-paste remnant from action_gather).
572 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("scatter", root, -1.0, send_size, recv_size,
573 encode_datatype(MPI_CURRENT_TYPE),
574 encode_datatype(MPI_CURRENT_TYPE2)));
576 Colls::scatter(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
578 TRACE_smpi_comm_out(Actor::self()->getPid());
579 log_timed_action(action, clock);
// "gatherV" action: gather with per-rank receive counts read from the trace
// line; only the root allocates a receive buffer (rank==root check elided).
582 static void action_gatherv(const char *const *action) {
583 /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
584 0 gather 68 68 10 10 10 0 0 0
586 1) 68 is the sendcount
587 2) 68 10 10 10 is the recvcounts
588 3) 0 is the root node
589 4) 0 is the send datatype id, see decode_datatype()
590 5) 0 is the recv datatype id, see decode_datatype()
592 double clock = smpi_process()->simulated_elapsed();
593 int comm_size = MPI_COMM_WORLD->size();
594 CHECK_ACTION_PARAMS(action, comm_size+1, 2)
595 int send_size = parse_double(action[2]);
596 std::vector<int> disps(comm_size, 0);
// shared_ptr: ownership is shared with the VarCollTIData trace object below.
597 std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
600 (action[4 + comm_size] && action[5 + comm_size]) ? decode_datatype(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
601 MPI_Datatype MPI_CURRENT_TYPE2{
602 (action[4 + comm_size] && action[5 + comm_size]) ? decode_datatype(action[5 + comm_size]) : MPI_DEFAULT_TYPE};
604 void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
605 void *recv = nullptr;
606 for(int i=0;i<comm_size;i++) {
607 (*recvcounts)[i] = atoi(action[i + 3]);
609 int recv_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
611 int root = (action[3 + comm_size]) ? atoi(action[3 + comm_size]) : 0;
612 int rank = MPI_COMM_WORLD->rank();
615 recv = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
617 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData(
618 "gatherV", root, send_size, nullptr, -1, recvcounts,
619 encode_datatype(MPI_CURRENT_TYPE), encode_datatype(MPI_CURRENT_TYPE2)));
621 Colls::gatherv(send, send_size, MPI_CURRENT_TYPE, recv, recvcounts->data(), disps.data(), MPI_CURRENT_TYPE2, root,
624 TRACE_smpi_comm_out(Actor::self()->getPid());
625 log_timed_action (action, clock);
// "scatterV" action: scatter with per-rank send counts read from the trace
// line; only the root allocates a send buffer (rank==root check elided).
628 static void action_scatterv(const char* const* action)
630 /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
631 0 gather 68 10 10 10 68 0 0 0
633 1) 68 10 10 10 is the sendcounts
634 2) 68 is the recvcount
635 3) 0 is the root node
636 4) 0 is the send datatype id, see decode_datatype()
637 5) 0 is the recv datatype id, see decode_datatype()
639 double clock = smpi_process()->simulated_elapsed();
640 int comm_size = MPI_COMM_WORLD->size();
641 CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
642 int recv_size = parse_double(action[2 + comm_size]);
643 std::vector<int> disps(comm_size, 0);
// shared_ptr: ownership is shared with the VarCollTIData trace object below.
644 std::shared_ptr<std::vector<int>> sendcounts(new std::vector<int>(comm_size));
647 (action[4 + comm_size] && action[5 + comm_size]) ? decode_datatype(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
648 MPI_Datatype MPI_CURRENT_TYPE2{
649 (action[4 + comm_size] && action[5 + comm_size]) ? decode_datatype(action[5 + comm_size]) : MPI_DEFAULT_TYPE};
651 void* send = nullptr;
652 void* recv = smpi_get_tmp_recvbuffer(recv_size * MPI_CURRENT_TYPE->size());
653 for (int i = 0; i < comm_size; i++) {
654 (*sendcounts)[i] = atoi(action[i + 2]);
656 int send_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
658 int root = (action[3 + comm_size]) ? atoi(action[3 + comm_size]) : 0;
659 int rank = MPI_COMM_WORLD->rank();
662 send = smpi_get_tmp_sendbuffer(send_sum * MPI_CURRENT_TYPE2->size());
// FIX: this handler replays a scatterv, not a gatherv — label the trace event
// accordingly (was "gatherV", a copy-paste remnant from action_gatherv).
664 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData("scatterV", root, -1, sendcounts, recv_size,
665 nullptr, encode_datatype(MPI_CURRENT_TYPE),
666 encode_datatype(MPI_CURRENT_TYPE2)));
668 Colls::scatterv(send, sendcounts->data(), disps.data(), MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root,
671 TRACE_smpi_comm_out(Actor::self()->getPid());
672 log_timed_action(action, clock);
// "reduceScatter" action: reduce-scatter with per-rank receive counts from the
// trace line, then burn `comp_size` flops for the reduction computation.
675 static void action_reducescatter(const char *const *action) {
676 /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
677 0 reduceScatter 275427 275427 275427 204020 11346849 0
679 1) The first four values after the name of the action declare the recvcounts array
680 2) The value 11346849 is the amount of instructions
681 3) The last value corresponds to the datatype, see decode_datatype().
683 double clock = smpi_process()->simulated_elapsed();
684 int comm_size = MPI_COMM_WORLD->size();
685 CHECK_ACTION_PARAMS(action, comm_size+1, 1)
686 int comp_size = parse_double(action[2+comm_size]);
687 int my_proc_id = Actor::self()->getPid();
// shared_ptr: ownership is shared with the VarCollTIData trace object below.
688 std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>);
689 MPI_CURRENT_TYPE = (action[3 + comm_size]) ? decode_datatype(action[3 + comm_size]) : MPI_DEFAULT_TYPE;
691 for(int i=0;i<comm_size;i++) {
692 recvcounts->push_back(atoi(action[i + 2]));
694 int size{std::accumulate(recvcounts->begin(), recvcounts->end(), 0)};
696 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
697 new simgrid::instr::VarCollTIData("reduceScatter", -1, 0, nullptr, -1, recvcounts,
698 std::to_string(comp_size), /* ugly hack to print comp_size */
699 encode_datatype(MPI_CURRENT_TYPE)));
701 void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
702 void *recvbuf = smpi_get_tmp_recvbuffer(size* MPI_CURRENT_TYPE->size());
704 Colls::reduce_scatter(sendbuf, recvbuf, recvcounts->data(), MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
705 smpi_execute_flops(comp_size);
707 TRACE_smpi_comm_out(my_proc_id);
708 log_timed_action (action, clock);
// "allGather" action: allgather sendcount elements per rank, with independent
// send and receive datatypes.
711 static void action_allgather(const char *const *action) {
712 /* The structure of the allgather action for the rank 0 (total 4 processes) is the following:
713 0 allGather 275427 275427
715 1) 275427 is the sendcount
716 2) 275427 is the recvcount
717 3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().
719 double clock = smpi_process()->simulated_elapsed();
721 CHECK_ACTION_PARAMS(action, 2, 2)
722 int sendcount=atoi(action[2]);
723 int recvcount=atoi(action[3]);
725 MPI_CURRENT_TYPE = (action[4] && action[5]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
726 MPI_Datatype MPI_CURRENT_TYPE2{(action[4] && action[5]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE};
728 void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
729 void *recvbuf = smpi_get_tmp_recvbuffer(recvcount* MPI_CURRENT_TYPE2->size());
731 int my_proc_id = Actor::self()->getPid();
733 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
734 new simgrid::instr::CollTIData("allGather", -1, -1.0, sendcount, recvcount,
735 encode_datatype(MPI_CURRENT_TYPE),
736 encode_datatype(MPI_CURRENT_TYPE2)));
738 Colls::allgather(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcount, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
740 TRACE_smpi_comm_out(my_proc_id);
741 log_timed_action (action, clock);
// "allGatherV" action: allgatherv with per-rank receive counts read from the
// trace line (recv_sum's declaration/initialization is elided here).
744 static void action_allgatherv(const char *const *action) {
745 /* The structure of the allgatherv action for the rank 0 (total 4 processes) is the following:
746 0 allGatherV 275427 275427 275427 275427 204020
748 1) 275427 is the sendcount
749 2) The next four elements declare the recvcounts array
750 3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().
752 double clock = smpi_process()->simulated_elapsed();
754 int comm_size = MPI_COMM_WORLD->size();
755 CHECK_ACTION_PARAMS(action, comm_size+1, 2)
756 int sendcount=atoi(action[2]);
757 int recvcounts[comm_size];
758 std::vector<int> disps(comm_size, 0);
762 (action[3 + comm_size] && action[4 + comm_size]) ? decode_datatype(action[3 + comm_size]) : MPI_DEFAULT_TYPE;
763 MPI_Datatype MPI_CURRENT_TYPE2{
764 (action[3 + comm_size] && action[4 + comm_size]) ? decode_datatype(action[4 + comm_size]) : MPI_DEFAULT_TYPE};
766 void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
768 for(int i=0;i<comm_size;i++) {
769 recvcounts[i] = atoi(action[i+3]);
770 recv_sum=recv_sum+recvcounts[i];
772 void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
774 int my_proc_id = Actor::self()->getPid();
// Copy of recvcounts handed to the trace object, which takes ownership.
776 std::vector<int>* trace_recvcounts = new std::vector<int>(recvcounts, recvcounts + comm_size);
778 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
779 new simgrid::instr::VarCollTIData("allGatherV", -1, sendcount, nullptr, -1, trace_recvcounts,
780 encode_datatype(MPI_CURRENT_TYPE),
781 encode_datatype(MPI_CURRENT_TYPE2)));
783 Colls::allgatherv(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts, disps.data(), MPI_CURRENT_TYPE2,
786 TRACE_smpi_comm_out(my_proc_id);
787 log_timed_action (action, clock);
// "allToAllV" action: all-to-all with per-rank send and receive counts read
// from the trace line, plus independent send/recv datatypes.
790 static void action_allToAllv(const char *const *action) {
791 /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
792 0 allToAllV 100 1 7 10 12 100 1 70 10 5
794 1) 100 is the size of the send buffer *sizeof(int),
795 2) 1 7 10 12 is the sendcounts array
796 3) 100*sizeof(int) is the size of the receiver buffer
797 4) 1 70 10 5 is the recvcounts array
799 double clock = smpi_process()->simulated_elapsed();
801 int comm_size = MPI_COMM_WORLD->size();
802 CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
// shared_ptrs: ownership is shared with the VarCollTIData trace object below.
803 std::shared_ptr<std::vector<int>> sendcounts(new std::vector<int>(comm_size));
804 std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
805 std::vector<int> senddisps(comm_size, 0);
806 std::vector<int> recvdisps(comm_size, 0);
808 MPI_CURRENT_TYPE = (action[4 + 2 * comm_size] && action[5 + 2 * comm_size])
809 ? decode_datatype(action[4 + 2 * comm_size])
811 MPI_Datatype MPI_CURRENT_TYPE2{(action[4 + 2 * comm_size] && action[5 + 2 * comm_size])
812 ? decode_datatype(action[5 + 2 * comm_size])
815 int send_buf_size=parse_double(action[2]);
816 int recv_buf_size=parse_double(action[3+comm_size]);
817 int my_proc_id = Actor::self()->getPid();
818 void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* MPI_CURRENT_TYPE->size());
819 void *recvbuf = smpi_get_tmp_recvbuffer(recv_buf_size* MPI_CURRENT_TYPE2->size());
821 for(int i=0;i<comm_size;i++) {
822 (*sendcounts)[i] = atoi(action[3 + i]);
823 (*recvcounts)[i] = atoi(action[4 + comm_size + i]);
825 int send_size = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
826 int recv_size = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
828 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
829 new simgrid::instr::VarCollTIData("allToAllV", -1, send_size, sendcounts, recv_size, recvcounts,
830 encode_datatype(MPI_CURRENT_TYPE),
831 encode_datatype(MPI_CURRENT_TYPE2)));
// FIX: the receive datatype must be MPI_CURRENT_TYPE2 (the type the receive
// buffer was sized with above), not MPI_CURRENT_TYPE.
833 Colls::alltoallv(sendbuf, sendcounts->data(), senddisps.data(), MPI_CURRENT_TYPE, recvbuf, recvcounts->data(),
834 recvdisps.data(), MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
836 TRACE_smpi_comm_out(my_proc_id);
837 log_timed_action (action, clock);
840 }} // namespace simgrid::smpi
842 /** @brief Only initialize the replay, don't do it for real */
// Marks the process initialized/replaying, sets up tracing, registers every
// trace-action handler, and handles an optional delayed start given as a
// flops value in argv[2] (the argc check around it is elided here).
843 void smpi_replay_init(int* argc, char*** argv)
845 simgrid::smpi::Process::init(argc, argv);
846 smpi_process()->mark_as_initialized();
847 smpi_process()->set_replaying(true);
849 int my_proc_id = Actor::self()->getPid();
850 TRACE_smpi_init(my_proc_id);
851 TRACE_smpi_computing_init(my_proc_id);
852 TRACE_smpi_comm_in(my_proc_id, "smpi_replay_run_init", new simgrid::instr::NoOpTIData("init"));
853 TRACE_smpi_comm_out(my_proc_id);
// One handler per action keyword that may appear in the trace file.
854 xbt_replay_action_register("init", simgrid::smpi::action_init);
855 xbt_replay_action_register("finalize", simgrid::smpi::action_finalize);
856 xbt_replay_action_register("comm_size", simgrid::smpi::action_comm_size);
857 xbt_replay_action_register("comm_split", simgrid::smpi::action_comm_split);
858 xbt_replay_action_register("comm_dup", simgrid::smpi::action_comm_dup);
859 xbt_replay_action_register("send", simgrid::smpi::action_send);
860 xbt_replay_action_register("Isend", simgrid::smpi::action_Isend);
861 xbt_replay_action_register("recv", simgrid::smpi::action_recv);
862 xbt_replay_action_register("Irecv", simgrid::smpi::action_Irecv);
863 xbt_replay_action_register("test", simgrid::smpi::action_test);
864 xbt_replay_action_register("wait", simgrid::smpi::action_wait);
865 xbt_replay_action_register("waitAll", simgrid::smpi::action_waitall);
866 xbt_replay_action_register("barrier", simgrid::smpi::action_barrier);
867 xbt_replay_action_register("bcast", simgrid::smpi::action_bcast);
868 xbt_replay_action_register("reduce", simgrid::smpi::action_reduce);
869 xbt_replay_action_register("allReduce", simgrid::smpi::action_allReduce);
870 xbt_replay_action_register("allToAll", simgrid::smpi::action_allToAll);
871 xbt_replay_action_register("allToAllV", simgrid::smpi::action_allToAllv);
872 xbt_replay_action_register("gather", simgrid::smpi::action_gather);
873 xbt_replay_action_register("scatter", simgrid::smpi::action_scatter);
874 xbt_replay_action_register("gatherV", simgrid::smpi::action_gatherv);
875 xbt_replay_action_register("scatterV", simgrid::smpi::action_scatterv);
876 xbt_replay_action_register("allGather", simgrid::smpi::action_allgather);
877 xbt_replay_action_register("allGatherV", simgrid::smpi::action_allgatherv);
878 xbt_replay_action_register("reduceScatter", simgrid::smpi::action_reducescatter);
879 xbt_replay_action_register("compute", simgrid::smpi::action_compute);
881 //if we have a delayed start, sleep here.
883 double value = xbt_str_parse_double((*argv)[2], "%s is not a double");
884 XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
885 smpi_execute_flops(value);
887 //UGLY: force a context switch to be sure that all MSG_processes begin initialization
888 XBT_DEBUG("Force context switch by smpi_execute_flops - Sleeping for 0.0 flops ");
889 smpi_execute_flops(0.0);
893 /** @brief actually run the replay after initialization */
// Runs the replay loop, drains any requests still pending when the trace ends,
// frees the per-actor queue and the shared buffers (last process only), then
// finalizes the process and the tracing subsystem. The active_processes
// decrement itself is in lines elided from this excerpt.
894 void smpi_replay_main(int* argc, char*** argv)
896 simgrid::xbt::replay_runner(*argc, *argv);
898 /* and now, finalize everything */
899 /* One active process will stop. Decrease the counter*/
900 XBT_DEBUG("There are %zu elements in reqq[*]", get_reqq_self()->size());
901 if (not get_reqq_self()->empty()) {
902 unsigned int count_requests=get_reqq_self()->size();
903 MPI_Request requests[count_requests];
904 MPI_Status status[count_requests];
907 for (auto const& req : *get_reqq_self()) {
911 simgrid::smpi::Request::waitall(count_requests, requests, status);
913 delete get_reqq_self();
916 if(active_processes==0){
917 /* Last process alive speaking: end the simulated timer */
918 XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
919 xbt_free(sendbuffer);
920 xbt_free(recvbuffer);
923 TRACE_smpi_comm_in(Actor::self()->getPid(), "smpi_replay_run_finalize", new simgrid::instr::NoOpTIData("finalize"));
925 smpi_process()->finalize();
927 TRACE_smpi_comm_out(Actor::self()->getPid());
928 TRACE_smpi_finalize(Actor::self()->getPid());
931 /** @brief chain a replay initialization and a replay start */
// Convenience entry point: initialize then run the replay with the same args.
932 void smpi_replay_run(int* argc, char*** argv)
934 smpi_replay_init(argc, argv);
935 smpi_replay_main(argc, argv);