1 /* Copyright (c) 2009-2017. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
7 #include "xbt/replay.h"
8 #include <unordered_map>
// Not referenced anywhere in the visible part of this file.
11 #define KEY_SIZE (sizeof(int) * 2 + 1)
// Log category used by the XBT_VERB / XBT_DEBUG / XBT_INFO calls in this file.
13 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
// Communicator size read from the trace by action_comm_size().
15 int communicator_size = 0;
// Set to smpi_process_count() by action_init(); smpi_replay_run() compares it
// with 0 to detect the last process.
16 static int active_processes = 0;
// Per-process vectors of pending requests, keyed by smpi_process_index()
// (see get_reqq_self() and set_reqq_self()).
17 std::unordered_map<int,std::vector<MPI_Request>*> reqq;
// Datatype used when an action does not name one; chosen in action_init().
19 MPI_Datatype MPI_DEFAULT_TYPE;
// Datatype of the action currently being replayed; overwritten by the handlers.
20 MPI_Datatype MPI_CURRENT_TYPE;
// Shared scratch buffers handed out in replay mode by smpi_get_tmp_sendbuffer()
// and smpi_get_tmp_recvbuffer(). The *_size variables are compared with the
// requested size before the buffer is grown. Both buffers are freed at the end
// of smpi_replay_run().
22 static int sendbuffer_size=0;
23 char* sendbuffer=nullptr;
24 static int recvbuffer_size=0;
25 char* recvbuffer=nullptr;
// Logs, at verbose level, the space-joined action line followed by the
// simulated time elapsed since `clock`.
//   action: null-terminated array of trace tokens.
//   clock:  value of smpi_process_simulated_elapsed() taken when the action started.
27 static void log_timed_action (const char *const *action, double clock){
// Only build the joined string when verbose logging is enabled.
28 if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
29 char *name = xbt_str_join_array(action, " ");
30 XBT_VERB("%s %f", name, smpi_process_simulated_elapsed()-clock);
// NOTE(review): the release of `name` is not visible in this excerpt - confirm it is freed.
35 static std::vector<MPI_Request>* get_reqq_self()
37 return reqq.at(smpi_process_index());
40 static void set_reqq_self(std::vector<MPI_Request> *mpi_request)
42 reqq.insert({smpi_process_index(), mpi_request});
45 //allocate a single buffer for all sends, growing it if needed
// Provides a buffer of `size` bytes for outgoing data.
// Outside of replay mode a fresh xbt_malloc() block is returned. In replay
// mode the shared `sendbuffer` is grown with xbt_realloc() when it is smaller
// than `size`.
46 void* smpi_get_tmp_sendbuffer(int size)
48 if (!smpi_process_get_replaying())
49 return xbt_malloc(size);
// Replay mode: reuse the single global buffer, growing it only when needed.
50 if (sendbuffer_size<size){
51 sendbuffer=static_cast<char*>(xbt_realloc(sendbuffer,size));
// NOTE(review): the update of sendbuffer_size and the final return are not
// visible in this excerpt - presumably the shared buffer is returned; confirm.
57 //allocate a single buffer for all recv
// Provides a buffer of `size` bytes for incoming data.
// Outside of replay mode a fresh xbt_malloc() block is returned. In replay
// mode the shared `recvbuffer` is grown with xbt_realloc() when it is smaller
// than `size`.
58 void* smpi_get_tmp_recvbuffer(int size){
59 if (!smpi_process_get_replaying())
60 return xbt_malloc(size);
// Replay mode: reuse the single global buffer, growing it only when needed.
61 if (recvbuffer_size<size){
62 recvbuffer=static_cast<char*>(xbt_realloc(recvbuffer,size));
// NOTE(review): the update of recvbuffer_size and the final return are not
// visible in this excerpt - presumably the shared buffer is returned; confirm.
// Counterpart of smpi_get_tmp_sendbuffer() / smpi_get_tmp_recvbuffer() for `buf`.
// NOTE(review): only the "not replaying" test is visible in this excerpt.
// Presumably that branch frees buf, since the shared replay buffers are
// released in smpi_replay_run() - confirm.
68 void smpi_free_tmp_buffer(void* buf){
69 if (!smpi_process_get_replaying())
// Converts `string` to a double with strtod().
// Raises unknown_error through THROWF, with the message "<string> is not a
// double", on the failure path.
74 static double parse_double(const char *string)
77 double value = strtod(string, &endptr);
// NOTE(review): the condition guarding this throw is not visible in this
// excerpt; presumably it checks that endptr reached the end of the string.
79 THROWF(unknown_error, 0, "%s is not a double", string);
// Maps the numeric datatype id found in a trace token to an MPI datatype.
// The result is stored in the global MPI_CURRENT_TYPE and also returned.
//   action: textual datatype id, converted with atoi().
// NOTE(review): the case labels are not visible in this excerpt, so the id
// assigned to each datatype cannot be read from here. The last visible branch
// falls back to MPI_DEFAULT_TYPE.
83 static MPI_Datatype decode_datatype(const char *const action)
85 switch(atoi(action)) {
87 MPI_CURRENT_TYPE=MPI_DOUBLE;
90 MPI_CURRENT_TYPE=MPI_INT;
93 MPI_CURRENT_TYPE=MPI_CHAR;
96 MPI_CURRENT_TYPE=MPI_SHORT;
99 MPI_CURRENT_TYPE=MPI_LONG;
102 MPI_CURRENT_TYPE=MPI_FLOAT;
105 MPI_CURRENT_TYPE=MPI_BYTE;
108 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
110 return MPI_CURRENT_TYPE;
// Returns a string describing `datatype` for trace output. It tests the same
// set of datatypes that decode_datatype() can produce.
// NOTE(review): the returned strings and every use of `known` are not visible
// in this excerpt. All callers in this file pass nullptr for `known`.
113 const char* encode_datatype(MPI_Datatype datatype, int* known)
115 //default type for output is set to MPI_BYTE
116 // MPI_DEFAULT_TYPE is not set for output, use directly MPI_BYTE
119 if (datatype==MPI_BYTE)
121 if(datatype==MPI_DOUBLE)
123 if(datatype==MPI_INT)
125 if(datatype==MPI_CHAR)
127 if(datatype==MPI_SHORT)
129 if(datatype==MPI_LONG)
131 if(datatype==MPI_FLOAT)
133 //tell that the datatype is not handled by replay, and that its size should be measured and replayed as size*MPI_BYTE
136 // default - not implemented.
137 // do not warn here as we pass in this function even for other trace formats
// Validates the number of tokens of a replay action line.
// Walks the null-terminated `action` array to count its entries and raises
// arg_error (THROWF) with a message stating how many `mandatory` and `optional`
// arguments are expected after the process id and the action name.
// NOTE(review): the comparison that triggers the throw is not visible in this excerpt.
141 #define CHECK_ACTION_PARAMS(action, mandatory, optional) {\
143 while(action[i]!=nullptr)\
146 THROWF(arg_error, 0, "%s replay failed.\n" \
147 "%d items were given on the line. First two should be process_id and action. " \
148 "This action needs after them %d mandatory arguments, and accepts %d optional ones. \n" \
149 "Please contact the Simgrid team if support is needed", __FUNCTION__, i, mandatory, optional);\
// Replay handler for "init": picks the default datatype, starts the simulated
// timer, records the number of active processes and creates the
// pending-request vector of the calling process.
152 static void action_init(const char *const *action)
154 XBT_DEBUG("Initialize the counters");
155 CHECK_ACTION_PARAMS(action, 0, 1)
// NOTE(review): the condition choosing between the two defaults is not visible
// in this excerpt; presumably it tests the optional argument - confirm.
157 MPI_DEFAULT_TYPE=MPI_DOUBLE; // default MPE datatype
158 else MPI_DEFAULT_TYPE= MPI_BYTE; // default TAU datatype
160 /* start a simulated timer */
161 smpi_process_simulated_start();
162 /*initialize the number of active processes */
163 active_processes = smpi_process_count();
// This vector is deleted at the end of smpi_replay_run().
165 set_reqq_self(new std::vector<MPI_Request>);
// Replay handler registered for "finalize".
// NOTE(review): the body is not visible in this excerpt.
168 static void action_finalize(const char *const *action)
173 static void action_comm_size(const char *const *action)
175 communicator_size = parse_double(action[2]);
176 log_timed_action (action, smpi_process_simulated_elapsed());
179 static void action_comm_split(const char *const *action)
181 log_timed_action (action, smpi_process_simulated_elapsed());
184 static void action_comm_dup(const char *const *action)
186 log_timed_action (action, smpi_process_simulated_elapsed());
189 static void action_compute(const char *const *action)
191 CHECK_ACTION_PARAMS(action, 1, 0)
192 double clock = smpi_process_simulated_elapsed();
193 double flops= parse_double(action[2]);
194 int rank = smpi_process_index();
195 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
196 extra->type=TRACING_COMPUTING;
197 extra->comp_size=flops;
198 TRACE_smpi_computing_in(rank, extra);
200 smpi_execute_flops(flops);
202 TRACE_smpi_computing_out(rank);
203 log_timed_action (action, clock);
// Replay handler for "send": blocking send of action[3] elements to rank action[2].
//   action[2]: destination rank (atoi)
//   action[3]: element count (parse_double)
//   action[4]: optional datatype id, see decode_datatype()
206 static void action_send(const char *const *action)
208 CHECK_ACTION_PARAMS(action, 2, 1)
209 int to = atoi(action[2]);
210 double size=parse_double(action[3]);
211 double clock = smpi_process_simulated_elapsed();
// Datatype selection: the id in action[4], or the default type.
// NOTE(review): the if/else around these two assignments is not visible in this excerpt.
214 MPI_CURRENT_TYPE=decode_datatype(action[4]);
216 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
218 int rank = smpi_process_index();
220 int dst_traced = MPI_COMM_WORLD->group()->rank(to);
221 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
222 extra->type = TRACING_SEND;
223 extra->send_size = size;
225 extra->dst = dst_traced;
226 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
227 TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
// The send event is traced here only when internals are not being traced.
228 if (!TRACE_smpi_view_internals())
229 TRACE_smpi_send(rank, rank, dst_traced, 0, size*MPI_CURRENT_TYPE->size());
// No payload is supplied: the buffer argument is nullptr, tag is 0.
231 Request::send(nullptr, size, MPI_CURRENT_TYPE, to , 0, MPI_COMM_WORLD);
233 log_timed_action (action, clock);
235 TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
// Replay handler for "Isend": non-blocking send of action[3] elements to rank
// action[2]. The resulting request is queued for a later test/wait/waitAll.
//   action[2]: destination rank (atoi)
//   action[3]: element count (parse_double)
//   action[4]: optional datatype id, see decode_datatype()
238 static void action_Isend(const char *const *action)
240 CHECK_ACTION_PARAMS(action, 2, 1)
241 int to = atoi(action[2]);
242 double size=parse_double(action[3]);
243 double clock = smpi_process_simulated_elapsed();
// Datatype selection: the id in action[4], or the default type.
// NOTE(review): the if/else around these two assignments is not visible in this excerpt.
246 MPI_CURRENT_TYPE=decode_datatype(action[4]);
248 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
250 int rank = smpi_process_index();
251 int dst_traced = MPI_COMM_WORLD->group()->rank(to);
252 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
253 extra->type = TRACING_ISEND;
254 extra->send_size = size;
256 extra->dst = dst_traced;
257 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
258 TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
// The send event is traced here only when internals are not being traced.
259 if (!TRACE_smpi_view_internals())
260 TRACE_smpi_send(rank, rank, dst_traced, 0, size*MPI_CURRENT_TYPE->size());
// No payload is supplied: the buffer argument is nullptr, tag is 0.
262 MPI_Request request = Request::isend(nullptr, size, MPI_CURRENT_TYPE, to, 0,MPI_COMM_WORLD);
264 TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
// Remember the request so that a later test/wait/waitAll action can complete it.
266 get_reqq_self()->push_back(request);
268 log_timed_action (action, clock);
// Replay handler for "recv": blocking receive from rank action[2].
//   action[2]: source rank (atoi)
//   action[3]: element count (parse_double)
//   action[4]: optional datatype id, see decode_datatype()
271 static void action_recv(const char *const *action) {
272 CHECK_ACTION_PARAMS(action, 2, 1)
273 int from = atoi(action[2]);
274 double size=parse_double(action[3]);
275 double clock = smpi_process_simulated_elapsed();
// Datatype selection: the id in action[4], or the default type.
// NOTE(review): the if/else around these two assignments is not visible in this excerpt.
279 MPI_CURRENT_TYPE=decode_datatype(action[4]);
281 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
283 int rank = smpi_process_index();
284 int src_traced = MPI_COMM_WORLD->group()->rank(from);
286 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
287 extra->type = TRACING_RECV;
288 extra->send_size = size;
289 extra->src = src_traced;
291 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
292 TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
// NOTE(review): the condition guarding the probe and the use made of `status`
// afterwards are not visible in this excerpt.
294 //unknown size from the receiver point of view
296 Request::probe(from, 0, MPI_COMM_WORLD, &status);
// No destination buffer is supplied: the buffer argument is nullptr, tag is 0.
300 Request::recv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD, &status);
302 TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
// The receive event is traced here only when internals are not being traced.
303 if (!TRACE_smpi_view_internals()) {
304 TRACE_smpi_recv(rank, src_traced, rank, 0);
307 log_timed_action (action, clock);
// Replay handler for "Irecv": non-blocking receive from rank action[2].
// The resulting request is queued for a later test/wait/waitAll.
//   action[2]: source rank (atoi)
//   action[3]: element count (parse_double)
//   action[4]: optional datatype id, see decode_datatype()
310 static void action_Irecv(const char *const *action)
312 CHECK_ACTION_PARAMS(action, 2, 1)
313 int from = atoi(action[2]);
314 double size=parse_double(action[3]);
315 double clock = smpi_process_simulated_elapsed();
// Datatype selection: the id in action[4], or the default type.
// NOTE(review): the if/else around these two assignments is not visible in this excerpt.
318 MPI_CURRENT_TYPE=decode_datatype(action[4]);
320 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
322 int rank = smpi_process_index();
323 int src_traced = MPI_COMM_WORLD->group()->rank(from);
324 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
325 extra->type = TRACING_IRECV;
326 extra->send_size = size;
327 extra->src = src_traced;
329 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
330 TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
// NOTE(review): the condition guarding the probe and the use made of `status`
// afterwards are not visible in this excerpt.
332 //unknown size from the receiver's point of view
334 Request::probe(from, 0, MPI_COMM_WORLD, &status);
// No destination buffer is supplied: the buffer argument is nullptr, tag is 0.
338 MPI_Request request = Request::irecv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD);
340 TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
// Remember the request so that a later test/wait/waitAll action can complete it.
341 get_reqq_self()->push_back(request);
343 log_timed_action (action, clock);
346 static void action_test(const char *const *action){
347 CHECK_ACTION_PARAMS(action, 0, 0)
348 double clock = smpi_process_simulated_elapsed();
351 MPI_Request request = get_reqq_self()->back();
352 get_reqq_self()->pop_back();
353 //if request is null here, this may mean that a previous test has succeeded
354 //Different times in traced application and replayed version may lead to this
355 //In this case, ignore the extra calls.
356 if(request!=nullptr){
357 int rank = smpi_process_index();
358 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
359 extra->type=TRACING_TEST;
360 TRACE_smpi_testing_in(rank, extra);
362 int flag = Request::test(&request, &status);
364 XBT_DEBUG("MPI_Test result: %d", flag);
365 /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now nullptr.*/
366 get_reqq_self()->push_back(request);
368 TRACE_smpi_testing_out(rank);
370 log_timed_action (action, clock);
// Replay handler for "wait": waits for the most recently queued request of the
// calling process. Asserts that at least one request is queued.
373 static void action_wait(const char *const *action){
374 CHECK_ACTION_PARAMS(action, 0, 0)
375 double clock = smpi_process_simulated_elapsed();
378 xbt_assert(get_reqq_self()->size(), "action wait not preceded by any irecv or isend: %s",
379 xbt_str_join_array(action," "));
380 MPI_Request request = get_reqq_self()->back();
381 get_reqq_self()->pop_back();
// A nullptr entry is what action_test() pushes back after a successful test.
383 if (request==nullptr){
384 /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just return.*/
// NOTE(review): rank tolerates a MPI_COMM_NULL communicator, but the next
// statement dereferences request->comm() unconditionally - confirm the
// communicator can never be MPI_COMM_NULL here.
388 int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
390 MPI_Group group = request->comm()->group();
391 int src_traced = group->rank(request->src());
392 int dst_traced = group->rank(request->dst());
// Read the flag before waiting: `request` is passed by address to Request::wait().
393 int is_wait_for_receive = (request->flags() & RECV);
394 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
395 extra->type = TRACING_WAIT;
396 TRACE_smpi_ptp_in(rank, src_traced, dst_traced, __FUNCTION__, extra);
398 Request::wait(&request, &status);
400 TRACE_smpi_ptp_out(rank, src_traced, dst_traced, __FUNCTION__);
// Completed receives additionally emit a receive trace event.
401 if (is_wait_for_receive)
402 TRACE_smpi_recv(rank, src_traced, dst_traced, 0);
403 log_timed_action (action, clock);
// Replay handler for "waitAll": waits for every request queued by the calling
// process, then emits a receive trace event for each request that was a receive.
406 static void action_waitall(const char *const *action){
407 CHECK_ACTION_PARAMS(action, 0, 0)
408 double clock = smpi_process_simulated_elapsed();
409 unsigned int count_requests=get_reqq_self()->size();
// Nothing is waited for or traced when the queue is empty.
411 if (count_requests>0) {
// NOTE(review): arrays sized by a runtime value are a compiler extension in C++.
412 MPI_Status status[count_requests];
414 int rank_traced = smpi_process_index();
415 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
416 extra->type = TRACING_WAITALL;
417 extra->send_size=count_requests;
418 TRACE_smpi_ptp_in(rank_traced, -1, -1, __FUNCTION__,extra);
// Source and destination of each receive are recorded before the wait.
419 int recvs_snd[count_requests];
420 int recvs_rcv[count_requests];
422 for (auto req : *(get_reqq_self())){
423 if (req && (req->flags () & RECV)){
424 recvs_snd[i]=req->src();
425 recvs_rcv[i]=req->dst();
// NOTE(review): the branch that stores the -100 marker tested below, and the
// increment of i, are not visible in this excerpt.
430 Request::waitall(count_requests, &(*get_reqq_self())[0], status);
// -100 marks entries that were not receives; they produce no receive event.
432 for (i=0; i<count_requests;i++){
433 if (recvs_snd[i]!=-100)
434 TRACE_smpi_recv(rank_traced, recvs_snd[i], recvs_rcv[i],0);
436 TRACE_smpi_ptp_out(rank_traced, -1, -1, __FUNCTION__);
438 log_timed_action (action, clock);
441 static void action_barrier(const char *const *action){
442 double clock = smpi_process_simulated_elapsed();
443 int rank = smpi_process_index();
444 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
445 extra->type = TRACING_BARRIER;
446 TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
448 mpi_coll_barrier_fun(MPI_COMM_WORLD);
450 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
451 log_timed_action (action, clock);
// Replay handler for "bcast": broadcast of action[2] elements on MPI_COMM_WORLD.
//   action[2]: element count (parse_double)
//   action[3]: optional root rank (atoi)
//   action[4]: optional datatype id, see decode_datatype()
454 static void action_bcast(const char *const *action)
456 CHECK_ACTION_PARAMS(action, 1, 2)
457 double size = parse_double(action[2]);
458 double clock = smpi_process_simulated_elapsed();
460 /* Initialize MPI_CURRENT_TYPE in order to decrease the number of the checks */
461 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
// NOTE(review): the declaration of root and the tests guarding the two optional
// arguments are not visible in this excerpt.
464 root= atoi(action[3]);
466 MPI_CURRENT_TYPE=decode_datatype(action[4]);
469 int rank = smpi_process_index();
// NOTE(review): this handler translates the root with group()->index() while
// action_reduce() uses group()->rank() - confirm which one is intended.
470 int root_traced = MPI_COMM_WORLD->group()->index(root);
472 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
473 extra->type = TRACING_BCAST;
474 extra->send_size = size;
475 extra->root = root_traced;
476 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
477 TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__, extra);
// Buffer size in bytes: element count times the datatype size.
478 void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
480 mpi_coll_bcast_fun(sendbuf, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
482 TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
483 log_timed_action (action, clock);
// Replay handler for "reduce": reduction of action[2] elements on
// MPI_COMM_WORLD, followed by the simulated execution of action[3] flops.
//   action[2]: communication size in elements (parse_double)
//   action[3]: computation amount in flops (parse_double)
//   action[4]: optional root rank (atoi)
//   action[5]: optional datatype id, see decode_datatype()
486 static void action_reduce(const char *const *action)
488 CHECK_ACTION_PARAMS(action, 2, 2)
489 double comm_size = parse_double(action[2]);
490 double comp_size = parse_double(action[3]);
491 double clock = smpi_process_simulated_elapsed();
493 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
// NOTE(review): the declaration of root and the tests guarding the two optional
// arguments are not visible in this excerpt.
496 root= atoi(action[4]);
498 MPI_CURRENT_TYPE=decode_datatype(action[5]);
501 int rank = smpi_process_index();
502 int root_traced = MPI_COMM_WORLD->group()->rank(root);
503 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
504 extra->type = TRACING_REDUCE;
505 extra->send_size = comm_size;
506 extra->comp_size = comp_size;
507 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
508 extra->root = root_traced;
510 TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__,extra);
// NOTE(review): both buffers come from smpi_get_tmp_sendbuffer(). In replay
// mode that helper grows one shared buffer, so recvbuf and sendbuf presumably
// alias - confirm whether smpi_get_tmp_recvbuffer() was intended for recvbuf.
512 void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
513 void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
// The reduction is run with MPI_OP_NULL; its cost is simulated separately below.
514 mpi_coll_reduce_fun(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
515 smpi_execute_flops(comp_size);
517 TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
518 log_timed_action (action, clock);
// Replay handler for "allReduce": all-reduce of action[2] elements on
// MPI_COMM_WORLD, followed by the simulated execution of action[3] flops.
//   action[2]: communication size in elements (parse_double)
//   action[3]: computation amount in flops (parse_double)
//   action[4]: optional datatype id, see decode_datatype()
521 static void action_allReduce(const char *const *action) {
522 CHECK_ACTION_PARAMS(action, 2, 1)
523 double comm_size = parse_double(action[2]);
524 double comp_size = parse_double(action[3]);
// Datatype selection: the id in action[4], or the default type.
// NOTE(review): the if/else around these two assignments is not visible in this excerpt.
527 MPI_CURRENT_TYPE=decode_datatype(action[4]);
529 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
531 double clock = smpi_process_simulated_elapsed();
532 int rank = smpi_process_index();
533 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
534 extra->type = TRACING_ALLREDUCE;
535 extra->send_size = comm_size;
536 extra->comp_size = comp_size;
537 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
538 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
// NOTE(review): both buffers come from smpi_get_tmp_sendbuffer(). In replay
// mode that helper grows one shared buffer, so recvbuf and sendbuf presumably
// alias - confirm whether smpi_get_tmp_recvbuffer() was intended for recvbuf.
540 void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
541 void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
// The reduction is run with MPI_OP_NULL; its cost is simulated separately below.
542 mpi_coll_allreduce_fun(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
543 smpi_execute_flops(comp_size);
545 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
546 log_timed_action (action, clock);
// Replay handler for "allToAll": all-to-all exchange on MPI_COMM_WORLD.
//   action[2]: per-process send count (parse_double, truncated to int)
//   action[3]: per-process receive count (parse_double, truncated to int)
//   action[4], action[5]: optional send and receive datatype ids, used only
//                         when both are present
549 static void action_allToAll(const char *const *action) {
550 CHECK_ACTION_PARAMS(action, 2, 2) //two mandatory (send and recv volumes) and two optional (corresponding datatypes)
551 double clock = smpi_process_simulated_elapsed();
552 int comm_size = MPI_COMM_WORLD->size();
553 int send_size = parse_double(action[2]);
554 int recv_size = parse_double(action[3]);
555 MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
557 if(action[4] && action[5]) {
558 MPI_CURRENT_TYPE=decode_datatype(action[4]);
559 MPI_CURRENT_TYPE2=decode_datatype(action[5]);
// NOTE(review): the else keyword introducing this fallback is not visible in this excerpt.
562 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
// Buffers hold one block per process: count * communicator size * datatype size bytes.
564 void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* MPI_CURRENT_TYPE->size());
565 void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
567 int rank = smpi_process_index();
568 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
569 extra->type = TRACING_ALLTOALL;
570 extra->send_size = send_size;
571 extra->recv_size = recv_size;
572 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
573 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
575 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
577 mpi_coll_alltoall_fun(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
579 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
580 log_timed_action (action, clock);
583 static void action_gather(const char *const *action) {
584 /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
587 1) 68 is the sendcounts
588 2) 68 is the recvcounts
589 3) 0 is the root node
590 4) 0 is the send datatype id, see decode_datatype()
591 5) 0 is the recv datatype id, see decode_datatype()
593 CHECK_ACTION_PARAMS(action, 2, 3)
594 double clock = smpi_process_simulated_elapsed();
595 int comm_size = MPI_COMM_WORLD->size();
596 int send_size = parse_double(action[2]);
597 int recv_size = parse_double(action[3]);
598 MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
599 if(action[4] && action[5]) {
600 MPI_CURRENT_TYPE=decode_datatype(action[5]);
601 MPI_CURRENT_TYPE2=decode_datatype(action[6]);
603 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
605 void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
606 void *recv = nullptr;
609 root=atoi(action[4]);
610 int rank = MPI_COMM_WORLD->rank();
613 recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
615 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
616 extra->type = TRACING_GATHER;
617 extra->send_size = send_size;
618 extra->recv_size = recv_size;
620 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
621 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
623 TRACE_smpi_collective_in(smpi_process_index(), root, __FUNCTION__, extra);
625 mpi_coll_gather_fun(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
627 TRACE_smpi_collective_out(smpi_process_index(), -1, __FUNCTION__);
628 log_timed_action (action, clock);
// Replay handler for "gatherV": gather with per-process receive counts on
// MPI_COMM_WORLD. The token layout is described in the comment inside the body.
// NOTE(review): root is read from action[3+comm_size] without a null test,
// while CHECK_ACTION_PARAMS only makes comm_size+1 arguments mandatory -
// confirm that the root token is always present in traces.
631 static void action_gatherv(const char *const *action) {
632 /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
633 0 gatherV 68 68 10 10 10 0 0 0
635 1) 68 is the sendcount
636 2) 68 10 10 10 is the recvcounts
637 3) 0 is the root node
638 4) 0 is the send datatype id, see decode_datatype()
639 5) 0 is the recv datatype id, see decode_datatype()
641 double clock = smpi_process_simulated_elapsed();
642 int comm_size = MPI_COMM_WORLD->size();
643 CHECK_ACTION_PARAMS(action, comm_size+1, 2)
644 int send_size = parse_double(action[2]);
645 int disps[comm_size];
646 int recvcounts[comm_size];
649 MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
// The datatype ids are used only when both are present.
650 if(action[4+comm_size] && action[5+comm_size]) {
651 MPI_CURRENT_TYPE=decode_datatype(action[4+comm_size]);
652 MPI_CURRENT_TYPE2=decode_datatype(action[5+comm_size]);
654 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
656 void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
657 void *recv = nullptr;
// Read the per-process receive counts and accumulate their sum.
// NOTE(review): the declaration of recv_sum and the initialisation of disps
// are not visible in this excerpt.
658 for(int i=0;i<comm_size;i++) {
659 recvcounts[i] = atoi(action[i+3]);
660 recv_sum=recv_sum+recvcounts[i];
664 int root=atoi(action[3+comm_size]);
665 int rank = MPI_COMM_WORLD->rank();
// NOTE(review): the condition guarding this allocation is not visible in this excerpt.
668 recv = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
670 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
671 extra->type = TRACING_GATHERV;
672 extra->send_size = send_size;
673 extra->recvcounts= xbt_new(int,comm_size);
674 for(int i=0; i< comm_size; i++)//copy data to avoid bad free
675 extra->recvcounts[i] = recvcounts[i];
677 extra->num_processes = comm_size;
678 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
679 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
681 TRACE_smpi_collective_in(smpi_process_index(), root, __FUNCTION__, extra);
683 smpi_mpi_gatherv(send, send_size, MPI_CURRENT_TYPE, recv, recvcounts, disps, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
685 TRACE_smpi_collective_out(smpi_process_index(), -1, __FUNCTION__);
686 log_timed_action (action, clock);
// Replay handler for "reduceScatter": reduce-scatter on MPI_COMM_WORLD followed
// by the simulated execution of the given number of flops. The token layout is
// described in the comment inside the body.
689 static void action_reducescatter(const char *const *action) {
690 /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
691 0 reduceScatter 275427 275427 275427 204020 11346849 0
693 1) The first four values after the name of the action declare the recvcounts array
694 2) The value 11346849 is the amount of instructions
695 3) The last value corresponds to the datatype, see decode_datatype().
697 double clock = smpi_process_simulated_elapsed();
698 int comm_size = MPI_COMM_WORLD->size();
699 CHECK_ACTION_PARAMS(action, comm_size+1, 1)
700 int comp_size = parse_double(action[2+comm_size]);
701 int recvcounts[comm_size];
702 int rank = smpi_process_index();
// Optional datatype id after the computation amount, otherwise the default type.
704 if(action[3+comm_size])
705 MPI_CURRENT_TYPE=decode_datatype(action[3+comm_size]);
707 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
// NOTE(review): `size`, used below to dimension the buffers, is declared and
// updated on lines that are not visible in this excerpt; presumably it is the
// sum of the receive counts - confirm.
709 for(int i=0;i<comm_size;i++) {
710 recvcounts[i] = atoi(action[i+2]);
714 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
715 extra->type = TRACING_REDUCE_SCATTER;
716 extra->send_size = 0;
717 extra->recvcounts= xbt_new(int, comm_size);
718 for(int i=0; i< comm_size; i++)//copy data to avoid bad free
719 extra->recvcounts[i] = recvcounts[i];
720 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
721 extra->comp_size = comp_size;
722 extra->num_processes = comm_size;
724 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
726 void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
727 void *recvbuf = smpi_get_tmp_recvbuffer(size* MPI_CURRENT_TYPE->size());
// The reduction is run with MPI_OP_NULL; its cost is simulated separately below.
729 mpi_coll_reduce_scatter_fun(sendbuf, recvbuf, recvcounts, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
730 smpi_execute_flops(comp_size);
732 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
733 log_timed_action (action, clock);
// Replay handler for "allGather": all-gather on MPI_COMM_WORLD. The token
// layout is described in the comment inside the body.
736 static void action_allgather(const char *const *action) {
737 /* The structure of the allgather action for the rank 0 (total 4 processes) is the following:
738 0 allGather 275427 275427
740 1) 275427 is the sendcount
741 2) 275427 is the recvcount
742 3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().
744 double clock = smpi_process_simulated_elapsed();
746 CHECK_ACTION_PARAMS(action, 2, 2)
747 int sendcount=atoi(action[2]);
748 int recvcount=atoi(action[3]);
750 MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
// The datatype ids are used only when both are present.
752 if(action[4] && action[5]) {
753 MPI_CURRENT_TYPE = decode_datatype(action[4]);
754 MPI_CURRENT_TYPE2 = decode_datatype(action[5]);
756 MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
758 void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
// NOTE(review): the receive buffer is sized for recvcount elements only,
// whereas action_allToAll() and action_gather() multiply by the communicator
// size - confirm whether recvcount here is already the total.
759 void *recvbuf = smpi_get_tmp_recvbuffer(recvcount* MPI_CURRENT_TYPE2->size());
761 int rank = smpi_process_index();
762 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
763 extra->type = TRACING_ALLGATHER;
764 extra->send_size = sendcount;
765 extra->recv_size= recvcount;
766 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
767 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
768 extra->num_processes = MPI_COMM_WORLD->size();
770 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
772 mpi_coll_allgather_fun(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcount, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
774 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
775 log_timed_action (action, clock);
// Replay handler for "allGatherV": all-gather with per-process receive counts
// on MPI_COMM_WORLD. The token layout is described in the comment inside the body.
778 static void action_allgatherv(const char *const *action) {
779 /* The structure of the allgatherv action for the rank 0 (total 4 processes) is the following:
780 0 allGatherV 275427 275427 275427 275427 204020
782 1) 275427 is the sendcount
783 2) The next four elements declare the recvcounts array
784 3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().
786 double clock = smpi_process_simulated_elapsed();
788 int comm_size = MPI_COMM_WORLD->size();
789 CHECK_ACTION_PARAMS(action, comm_size+1, 2)
790 int sendcount=atoi(action[2]);
791 int recvcounts[comm_size];
792 int disps[comm_size];
794 MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
// The datatype ids are used only when both are present.
796 if(action[3+comm_size] && action[4+comm_size]) {
797 MPI_CURRENT_TYPE = decode_datatype(action[3+comm_size]);
798 MPI_CURRENT_TYPE2 = decode_datatype(action[4+comm_size]);
800 MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
802 void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
// Read the per-process receive counts and accumulate their sum.
// NOTE(review): the declaration of recv_sum and the initialisation of disps
// are not visible in this excerpt.
804 for(int i=0;i<comm_size;i++) {
805 recvcounts[i] = atoi(action[i+3]);
806 recv_sum=recv_sum+recvcounts[i];
809 void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
811 int rank = smpi_process_index();
812 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
813 extra->type = TRACING_ALLGATHERV;
814 extra->send_size = sendcount;
815 extra->recvcounts= xbt_new(int, comm_size);
816 for(int i=0; i< comm_size; i++)//copy data to avoid bad free
817 extra->recvcounts[i] = recvcounts[i];
818 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
819 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
820 extra->num_processes = comm_size;
822 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
// NOTE(review): the last argument line of this call is not visible in this excerpt.
824 mpi_coll_allgatherv_fun(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts, disps, MPI_CURRENT_TYPE2,
827 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
828 log_timed_action (action, clock);
831 static void action_allToAllv(const char *const *action) {
832 /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
833 0 allToAllV 100 1 7 10 12 100 1 70 10 5
835 1) 100 is the size of the send buffer *sizeof(int),
836 2) 1 7 10 12 is the sendcounts array
837 3) 100*sizeof(int) is the size of the receiver buffer
838 4) 1 70 10 5 is the recvcounts array
840 double clock = smpi_process_simulated_elapsed();
842 int comm_size = MPI_COMM_WORLD->size();
843 CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
844 int sendcounts[comm_size];
845 int recvcounts[comm_size];
846 int senddisps[comm_size];
847 int recvdisps[comm_size];
849 MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
851 int send_buf_size=parse_double(action[2]);
852 int recv_buf_size=parse_double(action[3+comm_size]);
853 if(action[4+2*comm_size] && action[5+2*comm_size]) {
854 MPI_CURRENT_TYPE=decode_datatype(action[4+2*comm_size]);
855 MPI_CURRENT_TYPE2=decode_datatype(action[5+2*comm_size]);
858 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
860 void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* MPI_CURRENT_TYPE->size());
861 void *recvbuf = smpi_get_tmp_recvbuffer(recv_buf_size* MPI_CURRENT_TYPE2->size());
863 for(int i=0;i<comm_size;i++) {
864 sendcounts[i] = atoi(action[i+3]);
865 recvcounts[i] = atoi(action[i+4+comm_size]);
870 int rank = smpi_process_index();
871 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
872 extra->type = TRACING_ALLTOALLV;
873 extra->recvcounts= xbt_new(int, comm_size);
874 extra->sendcounts= xbt_new(int, comm_size);
875 extra->num_processes = comm_size;
877 for(int i=0; i< comm_size; i++){//copy data to avoid bad free
878 extra->send_size += sendcounts[i];
879 extra->sendcounts[i] = sendcounts[i];
880 extra->recv_size += recvcounts[i];
881 extra->recvcounts[i] = recvcounts[i];
883 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
884 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
886 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
888 mpi_coll_alltoallv_fun(sendbuf, sendcounts, senddisps, MPI_CURRENT_TYPE,recvbuf, recvcounts, recvdisps,
889 MPI_CURRENT_TYPE, MPI_COMM_WORLD);
891 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
892 log_timed_action (action, clock);
// Entry point of the trace replay for one simulated process: initialises the
// process, registers the action handlers, runs the replay, completes any
// request still pending, then finalises and destroys the process.
//   argc, argv: pointers to the process arguments, forwarded to
//               smpi_process_init() and xbt_replay_action_runner().
895 void smpi_replay_run(int *argc, char***argv){
896 /* First initializes everything */
897 smpi_process_init(argc, argv);
898 smpi_process_mark_as_initialized();
899 smpi_process_set_replaying(true);
901 int rank = smpi_process_index();
902 TRACE_smpi_init(rank);
903 TRACE_smpi_computing_init(rank);
// Emit an "<function>_init" collective in/out pair in the trace.
904 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
905 extra->type = TRACING_INIT;
906 char *operation =bprintf("%s_init",__FUNCTION__);
907 TRACE_smpi_collective_in(rank, -1, operation, extra);
908 TRACE_smpi_collective_out(rank, -1, operation);
// NOTE(review): the release of `operation` before it is reassigned further
// down is not visible in this excerpt - confirm it is freed.
// Handlers are registered only when _xbt_replay_action_init() returns 0.
911 if (_xbt_replay_action_init()==0) {
912 xbt_replay_action_register("init", action_init);
913 xbt_replay_action_register("finalize", action_finalize);
914 xbt_replay_action_register("comm_size", action_comm_size);
915 xbt_replay_action_register("comm_split", action_comm_split);
916 xbt_replay_action_register("comm_dup", action_comm_dup);
917 xbt_replay_action_register("send", action_send);
918 xbt_replay_action_register("Isend", action_Isend);
919 xbt_replay_action_register("recv", action_recv);
920 xbt_replay_action_register("Irecv", action_Irecv);
921 xbt_replay_action_register("test", action_test);
922 xbt_replay_action_register("wait", action_wait);
923 xbt_replay_action_register("waitAll", action_waitall);
924 xbt_replay_action_register("barrier", action_barrier);
925 xbt_replay_action_register("bcast", action_bcast);
926 xbt_replay_action_register("reduce", action_reduce);
927 xbt_replay_action_register("allReduce", action_allReduce);
928 xbt_replay_action_register("allToAll", action_allToAll);
929 xbt_replay_action_register("allToAllV", action_allToAllv);
930 xbt_replay_action_register("gather", action_gather);
931 xbt_replay_action_register("gatherV", action_gatherv);
932 xbt_replay_action_register("allGather", action_allgather);
933 xbt_replay_action_register("allGatherV", action_allgatherv);
934 xbt_replay_action_register("reduceScatter", action_reducescatter);
935 xbt_replay_action_register("compute", action_compute);
// The delay is read from (*argv)[2] and executed as that many flops.
// NOTE(review): the condition selecting the delayed-start branch, the
// declaration of endptr and the test guarding the throw are not visible in
// this excerpt.
938 //if we have a delayed start, sleep here.
941 double value = strtod((*argv)[2], &endptr);
943 THROWF(unknown_error, 0, "%s is not a double", (*argv)[2]);
944 XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
945 smpi_execute_flops(value);
947 //UGLY: force a context switch to be sure that all MSG_processes begin initialization
948 XBT_DEBUG("Force context switch by smpi_execute_flops - Sleeping for 0.0 flops ");
949 smpi_execute_flops(0.0);
952 /* Actually run the replay */
953 xbt_replay_action_runner(*argc, *argv);
955 /* and now, finalize everything */
956 /* One active process will stop. Decrease the counter*/
957 XBT_DEBUG("There are %zu elements in reqq[*]", get_reqq_self()->size());
// Complete every request that the trace left pending.
958 if (!get_reqq_self()->empty()){
959 unsigned int count_requests=get_reqq_self()->size();
960 MPI_Request requests[count_requests];
961 MPI_Status status[count_requests];
// NOTE(review): the loop body is not visible in this excerpt; presumably it
// copies each queued request into `requests` - confirm.
964 for (auto req: *get_reqq_self()){
968 Request::waitall(count_requests, requests, status);
// Releases the vector allocated in action_init().
970 delete get_reqq_self();
// NOTE(review): the decrement of active_processes announced by the comment
// above is not visible in this excerpt.
973 if(active_processes==0){
974 /* Last process alive speaking: end the simulated timer */
975 XBT_INFO("Simulation time %f", smpi_process_simulated_elapsed());
976 _xbt_replay_action_exit();
977 xbt_free(sendbuffer);
978 xbt_free(recvbuffer);
// Emit an "<function>_finalize" collective in/out pair around the finalisation.
981 instr_extra_data extra_fin = xbt_new0(s_instr_extra_data_t,1);
982 extra_fin->type = TRACING_FINALIZE;
983 operation =bprintf("%s_finalize",__FUNCTION__);
984 TRACE_smpi_collective_in(rank, -1, operation, extra_fin);
986 smpi_process_finalize();
988 TRACE_smpi_collective_out(rank, -1, operation);
989 TRACE_smpi_finalize(smpi_process_index());
990 smpi_process_destroy();