1 /* Copyright (c) 2009-2017. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
7 #include "smpi_coll.hpp"
8 #include "smpi_comm.hpp"
9 #include "smpi_datatype.hpp"
10 #include "smpi_group.hpp"
11 #include "smpi_process.hpp"
12 #include "smpi_request.hpp"
13 #include "xbt/replay.hpp"
15 #include <unordered_map>
18 #define KEY_SIZE (sizeof(int) * 2 + 1)
20 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
// Communicator size announced by the "comm_size" trace action.
22 int communicator_size = 0;
// Number of processes still replaying their trace.
23 static int active_processes = 0;
// Per-process queue of outstanding asynchronous requests, keyed by process index.
24 std::unordered_map<int,std::vector<MPI_Request>*> reqq;
// Datatype used when a trace line gives none (MPI_DOUBLE for MPE traces, MPI_BYTE for TAU ones; see action_init).
26 MPI_Datatype MPI_DEFAULT_TYPE;
// Datatype decoded from the action line currently being replayed.
27 MPI_Datatype MPI_CURRENT_TYPE;
// Grow-only scratch buffers shared by all replayed sends and receives (see smpi_get_tmp_*buffer).
29 static int sendbuffer_size=0;
30 char* sendbuffer=nullptr;
31 static int recvbuffer_size=0;
32 char* recvbuffer=nullptr;
// Log the replayed action (its tokens joined with spaces) together with the
// simulated time it consumed, at verbose log level only.
// `clock` is the simulated timestamp captured before the action ran.
34 static void log_timed_action (const char *const *action, double clock){
35 if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
36 char *name = xbt_str_join_array(action, " ");
// NOTE(review): xbt_str_join_array allocates `name`; the matching free is not visible in this view — confirm it exists.
37 XBT_VERB("%s %f", name, smpi_process()->simulated_elapsed()-clock);
// Return the request queue of the calling process.
// Uses at(), so it throws if set_reqq_self() was never called for this process.
42 static std::vector<MPI_Request>* get_reqq_self()
44 return reqq.at(smpi_process()->index());
// Register `mpi_request` as the calling process' request queue.
// NOTE(review): unordered_map::insert does NOT overwrite an existing entry for this index — confirm that is intended.
47 static void set_reqq_self(std::vector<MPI_Request> *mpi_request)
49 reqq.insert({smpi_process()->index(), mpi_request});
52 //allocate a single buffer for all sends, growing it if needed
53 void* smpi_get_tmp_sendbuffer(int size)
// Outside of replay mode, hand back a plain allocation owned by the caller.
55 if (not smpi_process()->replaying())
56 return xbt_malloc(size);
// Grow the shared scratch buffer; contents are never read back, only reused.
57 if (sendbuffer_size<size){
58 sendbuffer=static_cast<char*>(xbt_realloc(sendbuffer,size));
64 //allocate a single buffer for all recv
// Same scheme as smpi_get_tmp_sendbuffer, for the receive side.
65 void* smpi_get_tmp_recvbuffer(int size){
66 if (not smpi_process()->replaying())
67 return xbt_malloc(size);
68 if (recvbuffer_size<size){
69 recvbuffer=static_cast<char*>(xbt_realloc(recvbuffer,size));
// Release a buffer obtained from smpi_get_tmp_{send,recv}buffer.
// During replay the shared scratch buffers are kept for reuse and not freed here.
75 void smpi_free_tmp_buffer(void* buf){
76 if (not smpi_process()->replaying())
// Parse `string` as a double with strtod; raises unknown_error when the text is not a valid double.
81 static double parse_double(const char *string)
84 double value = strtod(string, &endptr);
86 THROWF(unknown_error, 0, "%s is not a double", string);
91 //TODO: this logic should be moved inside the datatype class, to support all predefined types and get rid of is_replayable.
// Decode the integer datatype id found in a trace token into an MPI datatype.
// Also stores the result in the global MPI_CURRENT_TYPE; unknown ids fall back to MPI_DEFAULT_TYPE.
92 static MPI_Datatype decode_datatype(const char *const action)
94 switch(atoi(action)) {
96 MPI_CURRENT_TYPE=MPI_DOUBLE;
99 MPI_CURRENT_TYPE=MPI_INT;
102 MPI_CURRENT_TYPE=MPI_CHAR;
105 MPI_CURRENT_TYPE=MPI_SHORT;
108 MPI_CURRENT_TYPE=MPI_LONG;
111 MPI_CURRENT_TYPE=MPI_FLOAT;
114 MPI_CURRENT_TYPE=MPI_BYTE;
117 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
120 return MPI_CURRENT_TYPE;
// Inverse of decode_datatype(): map an MPI datatype back to its trace id string.
// Unhandled datatypes take the default branch below (intentionally silent).
123 const char* encode_datatype(MPI_Datatype datatype)
125 if (datatype==MPI_BYTE)
127 if(datatype==MPI_DOUBLE)
129 if(datatype==MPI_INT)
131 if(datatype==MPI_CHAR)
133 if(datatype==MPI_SHORT)
135 if(datatype==MPI_LONG)
137 if(datatype==MPI_FLOAT)
139 // default - not implemented.
140 // do not warn here as we pass in this function even for other trace formats
// Validate the token count of an action line: tokens 0 and 1 are the process id
// and the action name; `mandatory` further tokens are required and up to
// `optional` more are accepted. Throws arg_error otherwise.
// (Comment kept outside the macro body: its lines end in '\' continuations.)
144 #define CHECK_ACTION_PARAMS(action, mandatory, optional) {\
146 while(action[i]!=nullptr)\
149 THROWF(arg_error, 0, "%s replay failed.\n" \
150 "%d items were given on the line. First two should be process_id and action. " \
151 "This action needs after them %d mandatory arguments, and accepts %d optional ones. \n" \
152 "Please contact the Simgrid team if support is needed", __FUNCTION__, i, mandatory, optional);\
// Replay handler for the "init" action: choose the default datatype from the
// trace flavor, start the simulated timer, count this process as active and
// create its (initially empty) request queue.
158 static void action_init(const char *const *action)
160 XBT_DEBUG("Initialize the counters");
161 CHECK_ACTION_PARAMS(action, 0, 1)
163 MPI_DEFAULT_TYPE = MPI_DOUBLE; // default MPE datatype
165 MPI_DEFAULT_TYPE = MPI_BYTE; // default TAU datatype
167 /* start a simulated timer */
168 smpi_process()->simulated_start();
169 /*initialize the number of active processes */
170 active_processes = smpi_process_count();
172 set_reqq_self(new std::vector<MPI_Request>);
// Replay handler for the "finalize" action (the real teardown happens in smpi_replay_main).
175 static void action_finalize(const char *const *action)
// Replay handler for "comm_size": record the communicator size given in the trace.
180 static void action_comm_size(const char *const *action)
182 communicator_size = parse_double(action[2]);
183 log_timed_action (action, smpi_process()->simulated_elapsed());
// Replay handler for "comm_split": only logged, no communicator is actually split.
186 static void action_comm_split(const char *const *action)
188 log_timed_action (action, smpi_process()->simulated_elapsed());
// Replay handler for "comm_dup": only logged, no communicator is actually duplicated.
191 static void action_comm_dup(const char *const *action)
193 log_timed_action (action, smpi_process()->simulated_elapsed());
// Replay handler for "compute": burn action[2] flops in simulation, bracketed
// by computing-state trace events.
196 static void action_compute(const char *const *action)
198 CHECK_ACTION_PARAMS(action, 1, 0)
199 double clock = smpi_process()->simulated_elapsed();
200 double flops= parse_double(action[2]);
201 int rank = smpi_process()->index();
203 TRACE_smpi_computing_in(rank, flops);
204 smpi_execute_flops(flops);
205 TRACE_smpi_computing_out(rank);
207 log_timed_action (action, clock);
// Replay handler for "send": blocking send of action[3] elements to rank
// action[2], with an optional datatype id in action[4] (default type otherwise).
// The payload is nullptr: only the volume matters for the simulation.
210 static void action_send(const char *const *action)
212 CHECK_ACTION_PARAMS(action, 2, 1)
213 int to = atoi(action[2]);
214 double size=parse_double(action[3]);
215 double clock = smpi_process()->simulated_elapsed();
218 MPI_CURRENT_TYPE=decode_datatype(action[4]);
220 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
222 int rank = smpi_process()->index();
223 int dst_traced = MPI_COMM_WORLD->group()->actor(to)->getPid();
225 TRACE_smpi_comm_in(rank, __FUNCTION__,
226 new simgrid::instr::Pt2PtTIData("send", to, size, encode_datatype(MPI_CURRENT_TYPE)));
227 if (not TRACE_smpi_view_internals())
228 TRACE_smpi_send(rank, rank, dst_traced, 0, size*MPI_CURRENT_TYPE->size());
230 Request::send(nullptr, size, MPI_CURRENT_TYPE, to , 0, MPI_COMM_WORLD);
232 TRACE_smpi_comm_out(rank);
234 log_timed_action(action, clock);
// Replay handler for "Isend": like action_send but non-blocking; the resulting
// request is queued so a later wait/waitAll/test action can complete it.
237 static void action_Isend(const char *const *action)
239 CHECK_ACTION_PARAMS(action, 2, 1)
240 int to = atoi(action[2]);
241 double size=parse_double(action[3]);
242 double clock = smpi_process()->simulated_elapsed();
245 MPI_CURRENT_TYPE=decode_datatype(action[4]);
247 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
249 int rank = smpi_process()->index();
250 int dst_traced = MPI_COMM_WORLD->group()->actor(to)->getPid();
251 TRACE_smpi_comm_in(rank, __FUNCTION__,
252 new simgrid::instr::Pt2PtTIData("Isend", to, size, encode_datatype(MPI_CURRENT_TYPE)));
253 if (not TRACE_smpi_view_internals())
254 TRACE_smpi_send(rank, rank, dst_traced, 0, size*MPI_CURRENT_TYPE->size());
256 MPI_Request request = Request::isend(nullptr, size, MPI_CURRENT_TYPE, to, 0, MPI_COMM_WORLD);
258 TRACE_smpi_comm_out(rank);
// Remember the request for a subsequent wait/test action.
260 get_reqq_self()->push_back(request);
262 log_timed_action (action, clock);
// Replay handler for "recv": blocking receive of action[3] elements from rank
// action[2]; a probe first discovers the real message size on the receiver side.
265 static void action_recv(const char *const *action) {
266 CHECK_ACTION_PARAMS(action, 2, 1)
267 int from = atoi(action[2]);
268 double size=parse_double(action[3]);
269 double clock = smpi_process()->simulated_elapsed();
273 MPI_CURRENT_TYPE=decode_datatype(action[4]);
275 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
277 int rank = smpi_process()->index();
278 int src_traced = MPI_COMM_WORLD->group()->actor(from)->getPid();
280 TRACE_smpi_comm_in(rank, __FUNCTION__,
281 new simgrid::instr::Pt2PtTIData("recv", from, size, encode_datatype(MPI_CURRENT_TYPE)));
283 //unknown size from the receiver point of view
285 Request::probe(from, 0, MPI_COMM_WORLD, &status);
289 Request::recv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD, &status);
291 TRACE_smpi_comm_out(rank);
292 if (not TRACE_smpi_view_internals()) {
293 TRACE_smpi_recv(src_traced, rank, 0);
296 log_timed_action (action, clock);
// Replay handler for "Irecv": non-blocking receive; the request is queued for
// a later wait/waitAll/test action. A probe first discovers the message size.
299 static void action_Irecv(const char *const *action)
301 CHECK_ACTION_PARAMS(action, 2, 1)
302 int from = atoi(action[2]);
303 double size=parse_double(action[3]);
304 double clock = smpi_process()->simulated_elapsed();
307 MPI_CURRENT_TYPE=decode_datatype(action[4]);
309 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
311 int rank = smpi_process()->index();
312 TRACE_smpi_comm_in(rank, __FUNCTION__,
313 new simgrid::instr::Pt2PtTIData("Irecv", from, size, encode_datatype(MPI_CURRENT_TYPE)));
315 //unknow size from the receiver pov
317 Request::probe(from, 0, MPI_COMM_WORLD, &status);
321 MPI_Request request = Request::irecv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD);
323 TRACE_smpi_comm_out(rank);
324 get_reqq_self()->push_back(request);
326 log_timed_action (action, clock);
// Replay handler for "test": pop the most recent queued request, MPI_Test it,
// and push it back (it becomes nullptr if the test succeeded). A nullptr
// request is tolerated: timing differences between the traced and replayed
// runs can produce extra test calls, which are simply ignored.
329 static void action_test(const char* const* action)
331 CHECK_ACTION_PARAMS(action, 0, 0)
332 double clock = smpi_process()->simulated_elapsed();
335 MPI_Request request = get_reqq_self()->back();
336 get_reqq_self()->pop_back();
337 //if request is null here, this may mean that a previous test has succeeded
338 //Different times in traced application and replayed version may lead to this
339 //In this case, ignore the extra calls.
340 if(request!=nullptr){
341 int rank = smpi_process()->index();
342 TRACE_smpi_testing_in(rank);
344 int flag = Request::test(&request, &status);
346 XBT_DEBUG("MPI_Test result: %d", flag);
347 /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now nullptr.*/
348 get_reqq_self()->push_back(request);
350 TRACE_smpi_testing_out(rank);
352 log_timed_action (action, clock);
// Replay handler for "wait": pop the most recent queued request and wait on it.
// A nullptr request means a prior test already completed the communication, so
// the wait is a no-op. For receives, a recv trace event is also emitted.
355 static void action_wait(const char *const *action){
356 CHECK_ACTION_PARAMS(action, 0, 0)
357 double clock = smpi_process()->simulated_elapsed();
360 xbt_assert(get_reqq_self()->size(), "action wait not preceded by any irecv or isend: %s",
361 xbt_str_join_array(action," "));
362 MPI_Request request = get_reqq_self()->back();
363 get_reqq_self()->pop_back();
365 if (request==nullptr){
366 /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just return.*/
// Record src/dst before the wait: the request may be freed by Request::wait.
370 int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
372 MPI_Group group = request->comm()->group();
373 int src_traced = group->rank(request->src());
374 int dst_traced = group->rank(request->dst());
375 int is_wait_for_receive = (request->flags() & RECV);
376 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::NoOpTIData("wait"));
378 Request::wait(&request, &status);
380 TRACE_smpi_comm_out(rank);
381 if (is_wait_for_receive)
382 TRACE_smpi_recv(src_traced, dst_traced, 0);
383 log_timed_action (action, clock);
// Replay handler for "waitAll": wait on every queued request at once.
// src/dst of receive requests are captured before the wait so the matching
// recv trace events can be emitted afterwards.
386 static void action_waitall(const char *const *action){
387 CHECK_ACTION_PARAMS(action, 0, 0)
388 double clock = smpi_process()->simulated_elapsed();
389 const unsigned int count_requests = get_reqq_self()->size();
391 if (count_requests>0) {
392 MPI_Status status[count_requests];
394 int rank_traced = smpi_process()->index();
395 TRACE_smpi_comm_in(rank_traced, __FUNCTION__, new simgrid::instr::Pt2PtTIData("waitAll", -1, count_requests, ""));
396 int recvs_snd[count_requests];
397 int recvs_rcv[count_requests];
398 for (unsigned int i = 0; i < count_requests; i++) {
399 const auto& req = (*get_reqq_self())[i];
// Only receive requests get a recv event; others keep the sentinel value.
400 if (req && (req->flags () & RECV)){
401 recvs_snd[i]=req->src();
402 recvs_rcv[i]=req->dst();
406 Request::waitall(count_requests, &(*get_reqq_self())[0], status);
408 for (unsigned i = 0; i < count_requests; i++) {
409 if (recvs_snd[i]!=-100)
410 TRACE_smpi_recv(recvs_snd[i], recvs_rcv[i],0);
412 TRACE_smpi_comm_out(rank_traced);
414 log_timed_action (action, clock);
// Replay handler for "barrier": a plain barrier over MPI_COMM_WORLD.
417 static void action_barrier(const char *const *action){
418 double clock = smpi_process()->simulated_elapsed();
419 int rank = smpi_process()->index();
420 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::NoOpTIData("barrier"));
422 Colls::barrier(MPI_COMM_WORLD);
424 TRACE_smpi_comm_out(rank);
425 log_timed_action (action, clock);
// Replay handler for "bcast": broadcast action[2] elements; optional root in
// action[3] and datatype id in action[4].
428 static void action_bcast(const char *const *action)
430 CHECK_ACTION_PARAMS(action, 1, 2)
431 double size = parse_double(action[2]);
432 double clock = smpi_process()->simulated_elapsed();
434 /* Initialize MPI_CURRENT_TYPE in order to decrease the number of the checks */
435 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
438 root= atoi(action[3]);
440 MPI_CURRENT_TYPE=decode_datatype(action[4]);
443 int rank = smpi_process()->index();
444 TRACE_smpi_comm_in(rank, __FUNCTION__,
445 new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(root)->getPid(), -1.0, size, -1,
446 encode_datatype(MPI_CURRENT_TYPE), ""));
448 void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
450 Colls::bcast(sendbuf, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
452 TRACE_smpi_comm_out(rank);
453 log_timed_action (action, clock);
// Replay handler for "reduce": reduce action[2] elements then burn action[3]
// flops for the local reduction work; optional root in action[4] and datatype
// id in action[5]. MPI_OP_NULL is used: only volumes matter for the simulation.
456 static void action_reduce(const char *const *action)
458 CHECK_ACTION_PARAMS(action, 2, 2)
459 double comm_size = parse_double(action[2]);
460 double comp_size = parse_double(action[3]);
461 double clock = smpi_process()->simulated_elapsed();
463 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
466 root= atoi(action[4]);
468 MPI_CURRENT_TYPE=decode_datatype(action[5]);
471 int rank = smpi_process()->index();
472 TRACE_smpi_comm_in(rank, __FUNCTION__,
473 new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(root)->getPid(), comp_size,
474 comm_size, -1, encode_datatype(MPI_CURRENT_TYPE), ""));
476 void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
477 void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
478 Colls::reduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
479 smpi_execute_flops(comp_size);
481 TRACE_smpi_comm_out(rank);
482 log_timed_action (action, clock);
// Replay handler for "allReduce": allreduce action[2] elements then burn
// action[3] flops; optional datatype id in action[4].
485 static void action_allReduce(const char *const *action) {
486 CHECK_ACTION_PARAMS(action, 2, 1)
487 double comm_size = parse_double(action[2])
488 double comp_size = parse_double(action[3]);
491 MPI_CURRENT_TYPE=decode_datatype(action[4]);
493 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
495 double clock = smpi_process()->simulated_elapsed();
496 int rank = smpi_process()->index();
497 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("allReduce", -1, comp_size, comm_size, -1,
498 encode_datatype(MPI_CURRENT_TYPE), ""));
500 void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
501 void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
502 Colls::allreduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
503 smpi_execute_flops(comp_size);
505 TRACE_smpi_comm_out(rank);
506 log_timed_action (action, clock);
// Replay handler for "allToAll": exchange action[2] sent / action[3] received
// elements per peer; optional send/recv datatype ids in action[4]/action[5].
509 static void action_allToAll(const char *const *action) {
510 CHECK_ACTION_PARAMS(action, 2, 2) //two mandatory (send and recv volumes) and two optional (corresponding datatypes)
511 double clock = smpi_process()->simulated_elapsed();
512 int comm_size = MPI_COMM_WORLD->size();
513 int send_size = parse_double(action[2]);
514 int recv_size = parse_double(action[3]);
515 MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
517 if(action[4] && action[5]) {
518 MPI_CURRENT_TYPE=decode_datatype(action[4]);
519 MPI_CURRENT_TYPE2=decode_datatype(action[5]);
522 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
524 void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* MPI_CURRENT_TYPE->size());
525 void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
527 int rank = smpi_process()->index();
528 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("allToAll", -1, -1.0, send_size, recv_size,
529 encode_datatype(MPI_CURRENT_TYPE),
530 encode_datatype(MPI_CURRENT_TYPE2)));
532 Colls::alltoall(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
534 TRACE_smpi_comm_out(rank);
535 log_timed_action (action, clock);
538 static void action_gather(const char *const *action) {
539 /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
542 1) 68 is the sendcounts
543 2) 68 is the recvcounts
544 3) 0 is the root node
545 4) 0 is the send datatype id, see decode_datatype()
546 5) 0 is the recv datatype id, see decode_datatype()
548 CHECK_ACTION_PARAMS(action, 2, 3)
549 double clock = smpi_process()->simulated_elapsed();
550 int comm_size = MPI_COMM_WORLD->size();
551 int send_size = parse_double(action[2]);
552 int recv_size = parse_double(action[3]);
553 MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
// NOTE(review): the guard tests action[4] and action[5] but the datatypes are
// decoded from action[5] and action[6] -- verify the indices are consistent
// (action[6] could be absent when only action[4]/action[5] are present).
554 if(action[4] && action[5]) {
555 MPI_CURRENT_TYPE=decode_datatype(action[5]);
556 MPI_CURRENT_TYPE2=decode_datatype(action[6]);
558 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
560 void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
// Only the root allocates a receive buffer.
561 void *recv = nullptr;
564 root=atoi(action[4]);
565 int rank = MPI_COMM_WORLD->rank();
568 recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
570 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
571 encode_datatype(MPI_CURRENT_TYPE),
572 encode_datatype(MPI_CURRENT_TYPE2)));
574 Colls::gather(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
576 TRACE_smpi_comm_out(smpi_process()->index());
577 log_timed_action (action, clock);
// Replay handler for "scatter": the root spreads action[2] elements to each
// rank (each receiving action[3] elements); root in action[4], optional
// send/recv datatype ids afterwards.
// FIX: the trace event was mislabeled "gather" (copy-paste from action_gather)
// although the collective performed below is Colls::scatter; label it "scatter".
580 static void action_scatter(const char* const* action)
582 /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
585 1) 68 is the sendcounts
586 2) 68 is the recvcounts
587 3) 0 is the root node
588 4) 0 is the send datatype id, see decode_datatype()
589 5) 0 is the recv datatype id, see decode_datatype()
591 CHECK_ACTION_PARAMS(action, 2, 3)
592 double clock = smpi_process()->simulated_elapsed();
593 int comm_size = MPI_COMM_WORLD->size();
594 int send_size = parse_double(action[2]);
595 int recv_size = parse_double(action[3]);
596 MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
597 if (action[4] && action[5]) {
598 MPI_CURRENT_TYPE = decode_datatype(action[5]);
599 MPI_CURRENT_TYPE2 = decode_datatype(action[6]);
601 MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
603 void* send = smpi_get_tmp_sendbuffer(send_size * MPI_CURRENT_TYPE->size());
// Only the root needs a receive buffer sized for the whole communicator.
604 void* recv = nullptr;
607 root = atoi(action[4]);
608 int rank = MPI_COMM_WORLD->rank();
611 recv = smpi_get_tmp_recvbuffer(recv_size * comm_size * MPI_CURRENT_TYPE2->size());
613 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("scatter", root, -1.0, send_size, recv_size,
614 encode_datatype(MPI_CURRENT_TYPE),
615 encode_datatype(MPI_CURRENT_TYPE2)));
617 Colls::scatter(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
619 TRACE_smpi_comm_out(smpi_process()->index());
620 log_timed_action(action, clock);
// Replay handler for "gatherV": per-rank receive counts follow the single send
// count; root and optional datatype ids come after the counts array.
623 static void action_gatherv(const char *const *action) {
624 /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
625 0 gather 68 68 10 10 10 0 0 0
627 1) 68 is the sendcount
628 2) 68 10 10 10 is the recvcounts
629 3) 0 is the root node
630 4) 0 is the send datatype id, see decode_datatype()
631 5) 0 is the recv datatype id, see decode_datatype()
633 double clock = smpi_process()->simulated_elapsed();
634 int comm_size = MPI_COMM_WORLD->size();
635 CHECK_ACTION_PARAMS(action, comm_size+1, 2)
636 int send_size = parse_double(action[2]);
637 int disps[comm_size];
638 int recvcounts[comm_size];
641 MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
642 if(action[4+comm_size] && action[5+comm_size]) {
643 MPI_CURRENT_TYPE=decode_datatype(action[4+comm_size]);
644 MPI_CURRENT_TYPE2=decode_datatype(action[5+comm_size]);
646 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
648 void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
649 void *recv = nullptr;
// Accumulate the total receive volume to size the root's buffer.
650 for(int i=0;i<comm_size;i++) {
651 recvcounts[i] = atoi(action[i+3]);
652 recv_sum=recv_sum+recvcounts[i];
656 int root=atoi(action[3+comm_size]);
657 int rank = MPI_COMM_WORLD->rank();
660 recv = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
// Copy the counts into a heap vector owned by the tracing subsystem.
662 std::vector<int>* trace_recvcounts = new std::vector<int>;
663 for (int i = 0; i < comm_size; i++) // copy data to avoid bad free
664 trace_recvcounts->push_back(recvcounts[i]);
666 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData(
667 "gatherV", root, send_size, nullptr, -1, trace_recvcounts,
668 encode_datatype(MPI_CURRENT_TYPE), encode_datatype(MPI_CURRENT_TYPE2)));
670 Colls::gatherv(send, send_size, MPI_CURRENT_TYPE, recv, recvcounts, disps, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
672 TRACE_smpi_comm_out(smpi_process()->index());
673 log_timed_action (action, clock);
// Replay handler for "scatterV": per-rank send counts precede the single
// receive count; root and optional datatype ids come after the counts array.
// FIX: the trace event was mislabeled "gatherV" (copy-paste from
// action_gatherv) although the collective performed below is Colls::scatterv;
// label it "scatterV".
676 static void action_scatterv(const char* const* action)
678 /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
679 0 gather 68 10 10 10 68 0 0 0
681 1) 68 10 10 10 is the sendcounts
682 2) 68 is the recvcount
683 3) 0 is the root node
684 4) 0 is the send datatype id, see decode_datatype()
685 5) 0 is the recv datatype id, see decode_datatype()
687 double clock = smpi_process()->simulated_elapsed();
688 int comm_size = MPI_COMM_WORLD->size();
689 CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
690 int recv_size = parse_double(action[2 + comm_size]);
691 int disps[comm_size];
692 int sendcounts[comm_size];
695 MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
696 if (action[4 + comm_size] && action[5 + comm_size]) {
697 MPI_CURRENT_TYPE = decode_datatype(action[4 + comm_size]);
698 MPI_CURRENT_TYPE2 = decode_datatype(action[5 + comm_size]);
700 MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
// Only the root needs a send buffer, sized from the summed send counts.
702 void* send = nullptr;
703 void* recv = smpi_get_tmp_recvbuffer(recv_size * MPI_CURRENT_TYPE->size());
704 for (int i = 0; i < comm_size; i++) {
705 sendcounts[i] = atoi(action[i + 2]);
706 send_sum += sendcounts[i];
710 int root = atoi(action[3 + comm_size]);
711 int rank = MPI_COMM_WORLD->rank();
714 send = smpi_get_tmp_sendbuffer(send_sum * MPI_CURRENT_TYPE2->size());
// Copy the counts into a heap vector owned by the tracing subsystem.
716 std::vector<int>* trace_sendcounts = new std::vector<int>;
717 for (int i = 0; i < comm_size; i++) // copy data to avoid bad free
718 trace_sendcounts->push_back(sendcounts[i]);
720 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData(
721 "scatterV", root, -1, trace_sendcounts, recv_size, nullptr,
722 encode_datatype(MPI_CURRENT_TYPE), encode_datatype(MPI_CURRENT_TYPE2)));
724 Colls::scatterv(send, sendcounts, disps, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
726 TRACE_smpi_comm_out(smpi_process()->index());
727 log_timed_action(action, clock);
730 static void action_reducescatter(const char *const *action) {
731 /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
732 0 reduceScatter 275427 275427 275427 204020 11346849 0
734 1) The first four values after the name of the action declare the recvcounts array
735 2) The value 11346849 is the amount of instructions
736 3) The last value corresponds to the datatype, see decode_datatype().
738 double clock = smpi_process()->simulated_elapsed();
739 int comm_size = MPI_COMM_WORLD->size();
740 CHECK_ACTION_PARAMS(action, comm_size+1, 1)
741 int comp_size = parse_double(action[2+comm_size]);
742 int recvcounts[comm_size];
743 int rank = smpi_process()->index();
745 std::vector<int>* trace_recvcounts = new std::vector<int>;
746 if(action[3+comm_size])
747 MPI_CURRENT_TYPE=decode_datatype(action[3+comm_size]);
749 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
751 for(int i=0;i<comm_size;i++) {
752 recvcounts[i] = atoi(action[i+2]);
753 trace_recvcounts->push_back(recvcounts[i]);
757 TRACE_smpi_comm_in(rank, __FUNCTION__,
758 new simgrid::instr::VarCollTIData("reduceScatter", -1, 0, nullptr, -1, trace_recvcounts,
759 std::to_string(comp_size), /* ugly hack to print comp_size */
760 encode_datatype(MPI_CURRENT_TYPE)));
// NOTE(review): `size` used below is not declared in the visible code — presumably
// the summed recvcounts; confirm against the full source.
762 void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
763 void *recvbuf = smpi_get_tmp_recvbuffer(size* MPI_CURRENT_TYPE->size());
765 Colls::reduce_scatter(sendbuf, recvbuf, recvcounts, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
766 smpi_execute_flops(comp_size);
768 TRACE_smpi_comm_out(rank);
769 log_timed_action (action, clock);
772 static void action_allgather(const char *const *action) {
773 /* The structure of the allgather action for the rank 0 (total 4 processes) is the following:
774 0 allGather 275427 275427
776 1) 275427 is the sendcount
777 2) 275427 is the recvcount
778 3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().
780 double clock = smpi_process()->simulated_elapsed();
782 CHECK_ACTION_PARAMS(action, 2, 2)
783 int sendcount=atoi(action[2]);
784 int recvcount=atoi(action[3]);
786 MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
788 if(action[4] && action[5]) {
789 MPI_CURRENT_TYPE = decode_datatype(action[4]);
790 MPI_CURRENT_TYPE2 = decode_datatype(action[5]);
792 MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
794 void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
795 void *recvbuf = smpi_get_tmp_recvbuffer(recvcount* MPI_CURRENT_TYPE2->size());
797 int rank = smpi_process()->index();
799 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("allGather", -1, -1.0, sendcount, recvcount,
800 encode_datatype(MPI_CURRENT_TYPE),
801 encode_datatype(MPI_CURRENT_TYPE2)));
803 Colls::allgather(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcount, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
805 TRACE_smpi_comm_out(rank);
806 log_timed_action (action, clock);
809 static void action_allgatherv(const char *const *action) {
810 /* The structure of the allgatherv action for the rank 0 (total 4 processes) is the following:
811 0 allGatherV 275427 275427 275427 275427 204020
813 1) 275427 is the sendcount
814 2) The next four elements declare the recvcounts array
815 3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().
817 double clock = smpi_process()->simulated_elapsed();
819 int comm_size = MPI_COMM_WORLD->size();
820 CHECK_ACTION_PARAMS(action, comm_size+1, 2)
821 int sendcount=atoi(action[2]);
822 int recvcounts[comm_size];
823 int disps[comm_size];
825 MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
827 if(action[3+comm_size] && action[4+comm_size]) {
828 MPI_CURRENT_TYPE = decode_datatype(action[3+comm_size]);
829 MPI_CURRENT_TYPE2 = decode_datatype(action[4+comm_size]);
831 MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
833 void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
// Sum the receive counts to size the (everyone's) receive buffer.
835 for(int i=0;i<comm_size;i++) {
836 recvcounts[i] = atoi(action[i+3]);
837 recv_sum=recv_sum+recvcounts[i];
840 void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
842 int rank = smpi_process()->index();
844 std::vector<int>* trace_recvcounts = new std::vector<int>;
845 for (int i = 0; i < comm_size; i++) // copy data to avoid bad free
846 trace_recvcounts->push_back(recvcounts[i]);
848 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData(
849 "allGatherV", -1, sendcount, nullptr, -1, trace_recvcounts,
850 encode_datatype(MPI_CURRENT_TYPE), encode_datatype(MPI_CURRENT_TYPE2)));
852 Colls::allgatherv(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts, disps, MPI_CURRENT_TYPE2,
855 TRACE_smpi_comm_out(rank);
856 log_timed_action (action, clock);
// Replay handler for "allToAllV": per-rank send and receive counts with
// optional send/recv datatype ids at the end of the line.
// FIX: the recv datatype passed to Colls::alltoallv was MPI_CURRENT_TYPE
// although recvbuf is sized with MPI_CURRENT_TYPE2 (the decoded recv type, as
// in action_allToAll); pass MPI_CURRENT_TYPE2 so buffer sizing and the call agree.
859 static void action_allToAllv(const char *const *action) {
860 /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
861 0 allToAllV 100 1 7 10 12 100 1 70 10 5
863 1) 100 is the size of the send buffer *sizeof(int),
864 2) 1 7 10 12 is the sendcounts array
865 3) 100*sizeof(int) is the size of the receiver buffer
866 4) 1 70 10 5 is the recvcounts array
868 double clock = smpi_process()->simulated_elapsed();
870 int comm_size = MPI_COMM_WORLD->size();
871 CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
874 int sendcounts[comm_size];
875 std::vector<int>* trace_sendcounts = new std::vector<int>;
876 int recvcounts[comm_size];
877 std::vector<int>* trace_recvcounts = new std::vector<int>;
878 int senddisps[comm_size];
879 int recvdisps[comm_size];
881 MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
883 int send_buf_size=parse_double(action[2]);
884 int recv_buf_size=parse_double(action[3+comm_size]);
885 if(action[4+2*comm_size] && action[5+2*comm_size]) {
886 MPI_CURRENT_TYPE=decode_datatype(action[4+2*comm_size]);
887 MPI_CURRENT_TYPE2=decode_datatype(action[5+2*comm_size]);
890 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
892 int rank = smpi_process()->index();
893 void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* MPI_CURRENT_TYPE->size());
894 void *recvbuf = smpi_get_tmp_recvbuffer(recv_buf_size* MPI_CURRENT_TYPE2->size());
// Gather per-peer counts and their totals for the trace events.
896 for(int i=0;i<comm_size;i++) {
897 sendcounts[i] = atoi(action[i+3]);
898 trace_sendcounts->push_back(sendcounts[i]);
899 send_size += sendcounts[i];
900 recvcounts[i] = atoi(action[i+4+comm_size]);
901 trace_recvcounts->push_back(recvcounts[i]);
902 recv_size += recvcounts[i];
907 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData(
908 "allToAllV", -1, send_size, trace_sendcounts, recv_size, trace_recvcounts,
909 encode_datatype(MPI_CURRENT_TYPE), encode_datatype(MPI_CURRENT_TYPE2)));
911 Colls::alltoallv(sendbuf, sendcounts, senddisps, MPI_CURRENT_TYPE,recvbuf, recvcounts, recvdisps,
912 MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
914 TRACE_smpi_comm_out(rank);
915 log_timed_action (action, clock);
918 }} // namespace simgrid::smpi
920 /** @brief Only initialize the replay, don't do it for real */
921 void smpi_replay_init(int* argc, char*** argv)
923 simgrid::smpi::Process::init(argc, argv);
924 smpi_process()->mark_as_initialized();
925 smpi_process()->set_replaying(true);
927 int rank = smpi_process()->index();
928 TRACE_smpi_init(rank);
929 TRACE_smpi_computing_init(rank);
930 TRACE_smpi_comm_in(rank, "smpi_replay_run_init", new simgrid::instr::NoOpTIData("init"));
931 TRACE_smpi_comm_out(rank);
// Register one handler per trace action name.
932 xbt_replay_action_register("init", simgrid::smpi::action_init);
933 xbt_replay_action_register("finalize", simgrid::smpi::action_finalize);
934 xbt_replay_action_register("comm_size", simgrid::smpi::action_comm_size);
935 xbt_replay_action_register("comm_split", simgrid::smpi::action_comm_split);
936 xbt_replay_action_register("comm_dup", simgrid::smpi::action_comm_dup);
937 xbt_replay_action_register("send", simgrid::smpi::action_send);
938 xbt_replay_action_register("Isend", simgrid::smpi::action_Isend);
939 xbt_replay_action_register("recv", simgrid::smpi::action_recv);
940 xbt_replay_action_register("Irecv", simgrid::smpi::action_Irecv);
941 xbt_replay_action_register("test", simgrid::smpi::action_test);
942 xbt_replay_action_register("wait", simgrid::smpi::action_wait);
943 xbt_replay_action_register("waitAll", simgrid::smpi::action_waitall);
944 xbt_replay_action_register("barrier", simgrid::smpi::action_barrier);
945 xbt_replay_action_register("bcast", simgrid::smpi::action_bcast);
946 xbt_replay_action_register("reduce", simgrid::smpi::action_reduce);
947 xbt_replay_action_register("allReduce", simgrid::smpi::action_allReduce);
948 xbt_replay_action_register("allToAll", simgrid::smpi::action_allToAll);
949 xbt_replay_action_register("allToAllV", simgrid::smpi::action_allToAllv);
950 xbt_replay_action_register("gather", simgrid::smpi::action_gather);
951 xbt_replay_action_register("scatter", simgrid::smpi::action_scatter);
952 xbt_replay_action_register("gatherV", simgrid::smpi::action_gatherv);
953 xbt_replay_action_register("scatterV", simgrid::smpi::action_scatterv);
954 xbt_replay_action_register("allGather", simgrid::smpi::action_allgather);
955 xbt_replay_action_register("allGatherV", simgrid::smpi::action_allgatherv);
956 xbt_replay_action_register("reduceScatter", simgrid::smpi::action_reducescatter);
957 xbt_replay_action_register("compute", simgrid::smpi::action_compute);
959 //if we have a delayed start, sleep here.
961 double value = xbt_str_parse_double((*argv)[2], "%s is not a double");
962 XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
963 smpi_execute_flops(value);
965 //UGLY: force a context switch to be sure that all MSG_processes begin initialization
966 XBT_DEBUG("Force context switch by smpi_execute_flops - Sleeping for 0.0 flops ");
967 smpi_execute_flops(0.0);
971 /** @brief actually run the replay after initialization */
972 void smpi_replay_main(int* argc, char*** argv)
974 simgrid::xbt::replay_runner(*argc, *argv);
976 /* and now, finalize everything */
977 /* One active process will stop. Decrease the counter*/
978 XBT_DEBUG("There are %zu elements in reqq[*]", get_reqq_self()->size());
// Drain any still-pending requests before finalizing this process.
979 if (not get_reqq_self()->empty()) {
980 unsigned int count_requests=get_reqq_self()->size();
981 MPI_Request requests[count_requests];
982 MPI_Status status[count_requests];
985 for (auto const& req : *get_reqq_self()) {
989 simgrid::smpi::Request::waitall(count_requests, requests, status);
991 delete get_reqq_self();
// The last process alive reports the simulated time and frees the shared scratch buffers.
994 if(active_processes==0){
995 /* Last process alive speaking: end the simulated timer */
996 XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
997 xbt_free(sendbuffer);
998 xbt_free(recvbuffer);
1001 TRACE_smpi_comm_in(smpi_process()->index(), "smpi_replay_run_finalize", new simgrid::instr::NoOpTIData("finalize"));
1003 smpi_process()->finalize();
1005 TRACE_smpi_comm_out(smpi_process()->index());
1006 TRACE_smpi_finalize(smpi_process()->index());
1009 /** @brief chain a replay initialization and a replay start */
1010 void smpi_replay_run(int* argc, char*** argv)
1012 smpi_replay_init(argc, argv);
1013 smpi_replay_main(argc, argv);