1 /* Copyright (c) 2009-2017. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
7 #include "xbt/replay.h"
8 #include <unordered_map>
11 #define KEY_SIZE (sizeof(int) * 2 + 1)
13 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
15 int communicator_size = 0;
16 static int active_processes = 0;
17 std::unordered_map<int,std::vector<MPI_Request>*> reqq;
19 MPI_Datatype MPI_DEFAULT_TYPE;
20 MPI_Datatype MPI_CURRENT_TYPE;
22 static int sendbuffer_size=0;
23 char* sendbuffer=nullptr;
24 static int recvbuffer_size=0;
25 char* recvbuffer=nullptr;
27 static void log_timed_action (const char *const *action, double clock){
28 if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
29 char *name = xbt_str_join_array(action, " ");
30 XBT_VERB("%s %f", name, smpi_process_simulated_elapsed()-clock);
35 static std::vector<MPI_Request>* get_reqq_self()
37 return reqq.at(smpi_process_index());
40 static void set_reqq_self(std::vector<MPI_Request> *mpi_request)
42 reqq.insert({smpi_process_index(), mpi_request});
45 //allocate a single buffer for all sends, growing it if needed
46 void* smpi_get_tmp_sendbuffer(int size)
48 if (!smpi_process_get_replaying())
49 return xbt_malloc(size);
50 if (sendbuffer_size<size){
51 sendbuffer=static_cast<char*>(xbt_realloc(sendbuffer,size));
57 //allocate a single buffer for all recv
58 void* smpi_get_tmp_recvbuffer(int size){
59 if (!smpi_process_get_replaying())
60 return xbt_malloc(size);
61 if (recvbuffer_size<size){
62 recvbuffer=static_cast<char*>(xbt_realloc(recvbuffer,size));
68 void smpi_free_tmp_buffer(void* buf){
69 if (!smpi_process_get_replaying())
74 static double parse_double(const char *string)
77 double value = strtod(string, &endptr);
79 THROWF(unknown_error, 0, "%s is not a double", string);
83 static MPI_Datatype decode_datatype(const char *const action)
85 switch(atoi(action)) {
87 MPI_CURRENT_TYPE=MPI_DOUBLE;
90 MPI_CURRENT_TYPE=MPI_INT;
93 MPI_CURRENT_TYPE=MPI_CHAR;
96 MPI_CURRENT_TYPE=MPI_SHORT;
99 MPI_CURRENT_TYPE=MPI_LONG;
102 MPI_CURRENT_TYPE=MPI_FLOAT;
105 MPI_CURRENT_TYPE=MPI_BYTE;
108 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
110 return MPI_CURRENT_TYPE;
113 const char* encode_datatype(MPI_Datatype datatype, int* known)
115 //default type for output is set to MPI_BYTE
116 // MPI_DEFAULT_TYPE is not set for output, use directly MPI_BYTE
119 if (datatype==MPI_BYTE)
121 if(datatype==MPI_DOUBLE)
123 if(datatype==MPI_INT)
125 if(datatype==MPI_CHAR)
127 if(datatype==MPI_SHORT)
129 if(datatype==MPI_LONG)
131 if(datatype==MPI_FLOAT)
133 //tell that the datatype is not handled by replay, and that its size should be measured and replayed as size*MPI_BYTE
136 // default - not implemented.
137 // do not warn here as we pass in this function even for other trace formats
/* Ensure the action line holds its two leading fields (process id and action name) plus `mandatory` arguments.
 * Otherwise it raises THROWF(arg_error); `optional` only appears in the error message.
 * The expansion is a braced block, so call sites use it without a trailing semicolon.
 * Arguments are parenthesized, and the counter has a name that cannot capture identifiers used in them. */
#define CHECK_ACTION_PARAMS(action, mandatory, optional) {\
    int check_nb_items_ = 0;\
    while ((action)[check_nb_items_] != nullptr)\
      check_nb_items_++;\
    if (check_nb_items_ < (mandatory) + 2)\
      THROWF(arg_error, 0, "%s replay failed.\n" \
             "%d items were given on the line. First two should be process_id and action. " \
             "This action needs after them %d mandatory arguments, and accepts %d optional ones. \n" \
             "Please contact the Simgrid team if support is needed", __FUNCTION__, check_nb_items_, (mandatory), \
             (optional));\
  }
152 static void action_init(const char *const *action)
154 XBT_DEBUG("Initialize the counters");
155 CHECK_ACTION_PARAMS(action, 0, 1)
157 MPI_DEFAULT_TYPE=MPI_DOUBLE; // default MPE dataype
158 else MPI_DEFAULT_TYPE= MPI_BYTE; // default TAU datatype
160 /* start a simulated timer */
161 smpi_process_simulated_start();
162 /*initialize the number of active processes */
163 active_processes = smpi_process_count();
165 set_reqq_self(new std::vector<MPI_Request>);
/* "finalize" action: nothing to replay here. The process is finalized by smpi_replay_run() once the trace is done. */
static void action_finalize(const char *const *action)
{
  (void) action;  // unused
}
173 static void action_comm_size(const char *const *action)
175 communicator_size = parse_double(action[2]);
176 log_timed_action (action, smpi_process_simulated_elapsed());
179 static void action_comm_split(const char *const *action)
181 log_timed_action (action, smpi_process_simulated_elapsed());
184 static void action_comm_dup(const char *const *action)
186 log_timed_action (action, smpi_process_simulated_elapsed());
189 static void action_compute(const char *const *action)
191 CHECK_ACTION_PARAMS(action, 1, 0)
192 double clock = smpi_process_simulated_elapsed();
193 double flops= parse_double(action[2]);
194 int rank = smpi_process_index();
195 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
196 extra->type=TRACING_COMPUTING;
197 extra->comp_size=flops;
198 TRACE_smpi_computing_in(rank, extra);
200 smpi_execute_flops(flops);
202 TRACE_smpi_computing_out(rank);
203 log_timed_action (action, clock);
206 static void action_send(const char *const *action)
208 CHECK_ACTION_PARAMS(action, 2, 1)
209 int to = atoi(action[2]);
210 double size=parse_double(action[3]);
211 double clock = smpi_process_simulated_elapsed();
214 MPI_CURRENT_TYPE=decode_datatype(action[4]);
216 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
218 int rank = smpi_process_index();
220 int dst_traced = MPI_COMM_WORLD->group()->rank(to);
221 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
222 extra->type = TRACING_SEND;
223 extra->send_size = size;
225 extra->dst = dst_traced;
226 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
227 TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
228 if (!TRACE_smpi_view_internals())
229 TRACE_smpi_send(rank, rank, dst_traced, 0, size*smpi_datatype_size(MPI_CURRENT_TYPE));
231 smpi_mpi_send(nullptr, size, MPI_CURRENT_TYPE, to , 0, MPI_COMM_WORLD);
233 log_timed_action (action, clock);
235 TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
238 static void action_Isend(const char *const *action)
240 CHECK_ACTION_PARAMS(action, 2, 1)
241 int to = atoi(action[2]);
242 double size=parse_double(action[3]);
243 double clock = smpi_process_simulated_elapsed();
246 MPI_CURRENT_TYPE=decode_datatype(action[4]);
248 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
250 int rank = smpi_process_index();
251 int dst_traced = MPI_COMM_WORLD->group()->rank(to);
252 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
253 extra->type = TRACING_ISEND;
254 extra->send_size = size;
256 extra->dst = dst_traced;
257 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
258 TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
259 if (!TRACE_smpi_view_internals())
260 TRACE_smpi_send(rank, rank, dst_traced, 0, size*smpi_datatype_size(MPI_CURRENT_TYPE));
262 MPI_Request request = smpi_mpi_isend(nullptr, size, MPI_CURRENT_TYPE, to, 0,MPI_COMM_WORLD);
264 TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
267 get_reqq_self()->push_back(request);
269 log_timed_action (action, clock);
272 static void action_recv(const char *const *action) {
273 CHECK_ACTION_PARAMS(action, 2, 1)
274 int from = atoi(action[2]);
275 double size=parse_double(action[3]);
276 double clock = smpi_process_simulated_elapsed();
280 MPI_CURRENT_TYPE=decode_datatype(action[4]);
282 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
284 int rank = smpi_process_index();
285 int src_traced = MPI_COMM_WORLD->group()->rank(from);
287 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
288 extra->type = TRACING_RECV;
289 extra->send_size = size;
290 extra->src = src_traced;
292 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
293 TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
295 //unknown size from the receiver point of view
297 smpi_mpi_probe(from, 0, MPI_COMM_WORLD, &status);
301 smpi_mpi_recv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD, &status);
303 TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
304 if (!TRACE_smpi_view_internals()) {
305 TRACE_smpi_recv(rank, src_traced, rank, 0);
308 log_timed_action (action, clock);
311 static void action_Irecv(const char *const *action)
313 CHECK_ACTION_PARAMS(action, 2, 1)
314 int from = atoi(action[2]);
315 double size=parse_double(action[3]);
316 double clock = smpi_process_simulated_elapsed();
319 MPI_CURRENT_TYPE=decode_datatype(action[4]);
321 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
323 int rank = smpi_process_index();
324 int src_traced = MPI_COMM_WORLD->group()->rank(from);
325 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
326 extra->type = TRACING_IRECV;
327 extra->send_size = size;
328 extra->src = src_traced;
330 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
331 TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
333 //unknow size from the receiver pov
335 smpi_mpi_probe(from, 0, MPI_COMM_WORLD, &status);
339 MPI_Request request = smpi_mpi_irecv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD);
341 TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
343 get_reqq_self()->push_back(request);
345 log_timed_action (action, clock);
348 static void action_test(const char *const *action){
349 CHECK_ACTION_PARAMS(action, 0, 0)
350 double clock = smpi_process_simulated_elapsed();
353 MPI_Request request = get_reqq_self()->back();
354 get_reqq_self()->pop_back();
355 //if request is null here, this may mean that a previous test has succeeded
356 //Different times in traced application and replayed version may lead to this
357 //In this case, ignore the extra calls.
358 if(request!=nullptr){
359 int rank = smpi_process_index();
360 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
361 extra->type=TRACING_TEST;
362 TRACE_smpi_testing_in(rank, extra);
364 int flag = smpi_mpi_test(&request, &status);
366 XBT_DEBUG("MPI_Test result: %d", flag);
367 /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now nullptr.*/
368 get_reqq_self()->push_back(request);
370 TRACE_smpi_testing_out(rank);
372 log_timed_action (action, clock);
375 static void action_wait(const char *const *action){
376 CHECK_ACTION_PARAMS(action, 0, 0)
377 double clock = smpi_process_simulated_elapsed();
380 xbt_assert(get_reqq_self()->size(), "action wait not preceded by any irecv or isend: %s",
381 xbt_str_join_array(action," "));
382 MPI_Request request = get_reqq_self()->back();
383 get_reqq_self()->pop_back();
385 if (request==nullptr){
386 /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just return.*/
390 int rank = request->comm != MPI_COMM_NULL ? request->comm->rank() : -1;
392 MPI_Group group = request->comm->group();
393 int src_traced = group->rank(request->src);
394 int dst_traced = group->rank(request->dst);
395 int is_wait_for_receive = request->recv;
396 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
397 extra->type = TRACING_WAIT;
398 TRACE_smpi_ptp_in(rank, src_traced, dst_traced, __FUNCTION__, extra);
400 smpi_mpi_wait(&request, &status);
402 TRACE_smpi_ptp_out(rank, src_traced, dst_traced, __FUNCTION__);
403 if (is_wait_for_receive)
404 TRACE_smpi_recv(rank, src_traced, dst_traced, 0);
405 log_timed_action (action, clock);
408 static void action_waitall(const char *const *action){
409 CHECK_ACTION_PARAMS(action, 0, 0)
410 double clock = smpi_process_simulated_elapsed();
411 unsigned int count_requests=get_reqq_self()->size();
413 if (count_requests>0) {
414 MPI_Status status[count_requests];
416 int rank_traced = smpi_process_index();
417 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
418 extra->type = TRACING_WAITALL;
419 extra->send_size=count_requests;
420 TRACE_smpi_ptp_in(rank_traced, -1, -1, __FUNCTION__,extra);
421 int recvs_snd[count_requests];
422 int recvs_rcv[count_requests];
424 for (auto req : *(get_reqq_self())){
425 if (req && req->recv){
426 recvs_snd[i]=req->src;
427 recvs_rcv[i]=req->dst;
432 smpi_mpi_waitall(count_requests, &(*get_reqq_self())[0], status);
434 for (i=0; i<count_requests;i++){
435 if (recvs_snd[i]!=-100)
436 TRACE_smpi_recv(rank_traced, recvs_snd[i], recvs_rcv[i],0);
438 TRACE_smpi_ptp_out(rank_traced, -1, -1, __FUNCTION__);
440 log_timed_action (action, clock);
443 static void action_barrier(const char *const *action){
444 double clock = smpi_process_simulated_elapsed();
445 int rank = smpi_process_index();
446 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
447 extra->type = TRACING_BARRIER;
448 TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
450 mpi_coll_barrier_fun(MPI_COMM_WORLD);
452 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
453 log_timed_action (action, clock);
456 static void action_bcast(const char *const *action)
458 CHECK_ACTION_PARAMS(action, 1, 2)
459 double size = parse_double(action[2]);
460 double clock = smpi_process_simulated_elapsed();
462 /* Initialize MPI_CURRENT_TYPE in order to decrease the number of the checks */
463 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
466 root= atoi(action[3]);
468 MPI_CURRENT_TYPE=decode_datatype(action[4]);
471 int rank = smpi_process_index();
472 int root_traced = MPI_COMM_WORLD->group()->index(root);
474 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
475 extra->type = TRACING_BCAST;
476 extra->send_size = size;
477 extra->root = root_traced;
478 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
479 TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__, extra);
480 void *sendbuf = smpi_get_tmp_sendbuffer(size* smpi_datatype_size(MPI_CURRENT_TYPE));
482 mpi_coll_bcast_fun(sendbuf, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
484 TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
485 log_timed_action (action, clock);
488 static void action_reduce(const char *const *action)
490 CHECK_ACTION_PARAMS(action, 2, 2)
491 double comm_size = parse_double(action[2]);
492 double comp_size = parse_double(action[3]);
493 double clock = smpi_process_simulated_elapsed();
495 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
498 root= atoi(action[4]);
500 MPI_CURRENT_TYPE=decode_datatype(action[5]);
503 int rank = smpi_process_index();
504 int root_traced = MPI_COMM_WORLD->group()->rank(root);
505 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
506 extra->type = TRACING_REDUCE;
507 extra->send_size = comm_size;
508 extra->comp_size = comp_size;
509 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
510 extra->root = root_traced;
512 TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__,extra);
514 void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
515 void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
516 mpi_coll_reduce_fun(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
517 smpi_execute_flops(comp_size);
519 TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
520 log_timed_action (action, clock);
523 static void action_allReduce(const char *const *action) {
524 CHECK_ACTION_PARAMS(action, 2, 1)
525 double comm_size = parse_double(action[2]);
526 double comp_size = parse_double(action[3]);
529 MPI_CURRENT_TYPE=decode_datatype(action[4]);
531 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
533 double clock = smpi_process_simulated_elapsed();
534 int rank = smpi_process_index();
535 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
536 extra->type = TRACING_ALLREDUCE;
537 extra->send_size = comm_size;
538 extra->comp_size = comp_size;
539 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
540 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
542 void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
543 void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
544 mpi_coll_allreduce_fun(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
545 smpi_execute_flops(comp_size);
547 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
548 log_timed_action (action, clock);
551 static void action_allToAll(const char *const *action) {
552 CHECK_ACTION_PARAMS(action, 2, 2) //two mandatory (send and recv volumes) and two optional (corresponding datatypes)
553 double clock = smpi_process_simulated_elapsed();
554 int comm_size = MPI_COMM_WORLD->size();
555 int send_size = parse_double(action[2]);
556 int recv_size = parse_double(action[3]);
557 MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
559 if(action[4] && action[5]) {
560 MPI_CURRENT_TYPE=decode_datatype(action[4]);
561 MPI_CURRENT_TYPE2=decode_datatype(action[5]);
564 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
566 void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
567 void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE2));
569 int rank = smpi_process_index();
570 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
571 extra->type = TRACING_ALLTOALL;
572 extra->send_size = send_size;
573 extra->recv_size = recv_size;
574 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
575 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
577 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
579 mpi_coll_alltoall_fun(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
581 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
582 log_timed_action (action, clock);
585 static void action_gather(const char *const *action) {
586 /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
589 1) 68 is the sendcounts
590 2) 68 is the recvcounts
591 3) 0 is the root node
592 4) 0 is the send datatype id, see decode_datatype()
593 5) 0 is the recv datatype id, see decode_datatype()
595 CHECK_ACTION_PARAMS(action, 2, 3)
596 double clock = smpi_process_simulated_elapsed();
597 int comm_size = MPI_COMM_WORLD->size();
598 int send_size = parse_double(action[2]);
599 int recv_size = parse_double(action[3]);
600 MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
601 if(action[4] && action[5]) {
602 MPI_CURRENT_TYPE=decode_datatype(action[5]);
603 MPI_CURRENT_TYPE2=decode_datatype(action[6]);
605 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
607 void *send = smpi_get_tmp_sendbuffer(send_size* smpi_datatype_size(MPI_CURRENT_TYPE));
608 void *recv = nullptr;
611 root=atoi(action[4]);
612 int rank = MPI_COMM_WORLD->rank();
615 recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE2));
617 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
618 extra->type = TRACING_GATHER;
619 extra->send_size = send_size;
620 extra->recv_size = recv_size;
622 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
623 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
625 TRACE_smpi_collective_in(smpi_process_index(), root, __FUNCTION__, extra);
627 mpi_coll_gather_fun(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
629 TRACE_smpi_collective_out(smpi_process_index(), -1, __FUNCTION__);
630 log_timed_action (action, clock);
633 static void action_gatherv(const char *const *action) {
634 /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
635 0 gather 68 68 10 10 10 0 0 0
637 1) 68 is the sendcount
638 2) 68 10 10 10 is the recvcounts
639 3) 0 is the root node
640 4) 0 is the send datatype id, see decode_datatype()
641 5) 0 is the recv datatype id, see decode_datatype()
643 double clock = smpi_process_simulated_elapsed();
644 int comm_size = MPI_COMM_WORLD->size();
645 CHECK_ACTION_PARAMS(action, comm_size+1, 2)
646 int send_size = parse_double(action[2]);
647 int disps[comm_size];
648 int recvcounts[comm_size];
651 MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
652 if(action[4+comm_size] && action[5+comm_size]) {
653 MPI_CURRENT_TYPE=decode_datatype(action[4+comm_size]);
654 MPI_CURRENT_TYPE2=decode_datatype(action[5+comm_size]);
656 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
658 void *send = smpi_get_tmp_sendbuffer(send_size* smpi_datatype_size(MPI_CURRENT_TYPE));
659 void *recv = nullptr;
660 for(int i=0;i<comm_size;i++) {
661 recvcounts[i] = atoi(action[i+3]);
662 recv_sum=recv_sum+recvcounts[i];
666 int root=atoi(action[3+comm_size]);
667 int rank = MPI_COMM_WORLD->rank();
670 recv = smpi_get_tmp_recvbuffer(recv_sum* smpi_datatype_size(MPI_CURRENT_TYPE2));
672 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
673 extra->type = TRACING_GATHERV;
674 extra->send_size = send_size;
675 extra->recvcounts= xbt_new(int,comm_size);
676 for(int i=0; i< comm_size; i++)//copy data to avoid bad free
677 extra->recvcounts[i] = recvcounts[i];
679 extra->num_processes = comm_size;
680 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
681 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
683 TRACE_smpi_collective_in(smpi_process_index(), root, __FUNCTION__, extra);
685 smpi_mpi_gatherv(send, send_size, MPI_CURRENT_TYPE, recv, recvcounts, disps, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
687 TRACE_smpi_collective_out(smpi_process_index(), -1, __FUNCTION__);
688 log_timed_action (action, clock);
691 static void action_reducescatter(const char *const *action) {
692 /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
693 0 reduceScatter 275427 275427 275427 204020 11346849 0
695 1) The first four values after the name of the action declare the recvcounts array
696 2) The value 11346849 is the amount of instructions
697 3) The last value corresponds to the datatype, see decode_datatype().
699 double clock = smpi_process_simulated_elapsed();
700 int comm_size = MPI_COMM_WORLD->size();
701 CHECK_ACTION_PARAMS(action, comm_size+1, 1)
702 int comp_size = parse_double(action[2+comm_size]);
703 int recvcounts[comm_size];
704 int rank = smpi_process_index();
706 if(action[3+comm_size])
707 MPI_CURRENT_TYPE=decode_datatype(action[3+comm_size]);
709 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
711 for(int i=0;i<comm_size;i++) {
712 recvcounts[i] = atoi(action[i+2]);
716 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
717 extra->type = TRACING_REDUCE_SCATTER;
718 extra->send_size = 0;
719 extra->recvcounts= xbt_new(int, comm_size);
720 for(int i=0; i< comm_size; i++)//copy data to avoid bad free
721 extra->recvcounts[i] = recvcounts[i];
722 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
723 extra->comp_size = comp_size;
724 extra->num_processes = comm_size;
726 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
728 void *sendbuf = smpi_get_tmp_sendbuffer(size* smpi_datatype_size(MPI_CURRENT_TYPE));
729 void *recvbuf = smpi_get_tmp_recvbuffer(size* smpi_datatype_size(MPI_CURRENT_TYPE));
731 mpi_coll_reduce_scatter_fun(sendbuf, recvbuf, recvcounts, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
732 smpi_execute_flops(comp_size);
734 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
735 log_timed_action (action, clock);
738 static void action_allgather(const char *const *action) {
739 /* The structure of the allgather action for the rank 0 (total 4 processes) is the following:
740 0 allGather 275427 275427
742 1) 275427 is the sendcount
743 2) 275427 is the recvcount
744 3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().
746 double clock = smpi_process_simulated_elapsed();
748 CHECK_ACTION_PARAMS(action, 2, 2)
749 int sendcount=atoi(action[2]);
750 int recvcount=atoi(action[3]);
752 MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
754 if(action[4] && action[5]) {
755 MPI_CURRENT_TYPE = decode_datatype(action[4]);
756 MPI_CURRENT_TYPE2 = decode_datatype(action[5]);
758 MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
760 void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* smpi_datatype_size(MPI_CURRENT_TYPE));
761 void *recvbuf = smpi_get_tmp_recvbuffer(recvcount* smpi_datatype_size(MPI_CURRENT_TYPE2));
763 int rank = smpi_process_index();
764 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
765 extra->type = TRACING_ALLGATHER;
766 extra->send_size = sendcount;
767 extra->recv_size= recvcount;
768 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
769 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
770 extra->num_processes = MPI_COMM_WORLD->size();
772 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
774 mpi_coll_allgather_fun(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcount, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
776 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
777 log_timed_action (action, clock);
780 static void action_allgatherv(const char *const *action) {
781 /* The structure of the allgatherv action for the rank 0 (total 4 processes) is the following:
782 0 allGatherV 275427 275427 275427 275427 204020
784 1) 275427 is the sendcount
785 2) The next four elements declare the recvcounts array
786 3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().
788 double clock = smpi_process_simulated_elapsed();
790 int comm_size = MPI_COMM_WORLD->size();
791 CHECK_ACTION_PARAMS(action, comm_size+1, 2)
792 int sendcount=atoi(action[2]);
793 int recvcounts[comm_size];
794 int disps[comm_size];
796 MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
798 if(action[3+comm_size] && action[4+comm_size]) {
799 MPI_CURRENT_TYPE = decode_datatype(action[3+comm_size]);
800 MPI_CURRENT_TYPE2 = decode_datatype(action[4+comm_size]);
802 MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
804 void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* smpi_datatype_size(MPI_CURRENT_TYPE));
806 for(int i=0;i<comm_size;i++) {
807 recvcounts[i] = atoi(action[i+3]);
808 recv_sum=recv_sum+recvcounts[i];
811 void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* smpi_datatype_size(MPI_CURRENT_TYPE2));
813 int rank = smpi_process_index();
814 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
815 extra->type = TRACING_ALLGATHERV;
816 extra->send_size = sendcount;
817 extra->recvcounts= xbt_new(int, comm_size);
818 for(int i=0; i< comm_size; i++)//copy data to avoid bad free
819 extra->recvcounts[i] = recvcounts[i];
820 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
821 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
822 extra->num_processes = comm_size;
824 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
826 mpi_coll_allgatherv_fun(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts, disps, MPI_CURRENT_TYPE2,
829 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
830 log_timed_action (action, clock);
833 static void action_allToAllv(const char *const *action) {
834 /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
835 0 allToAllV 100 1 7 10 12 100 1 70 10 5
837 1) 100 is the size of the send buffer *sizeof(int),
838 2) 1 7 10 12 is the sendcounts array
839 3) 100*sizeof(int) is the size of the receiver buffer
840 4) 1 70 10 5 is the recvcounts array
842 double clock = smpi_process_simulated_elapsed();
844 int comm_size = MPI_COMM_WORLD->size();
845 CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
846 int sendcounts[comm_size];
847 int recvcounts[comm_size];
848 int senddisps[comm_size];
849 int recvdisps[comm_size];
851 MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
853 int send_buf_size=parse_double(action[2]);
854 int recv_buf_size=parse_double(action[3+comm_size]);
855 if(action[4+2*comm_size] && action[5+2*comm_size]) {
856 MPI_CURRENT_TYPE=decode_datatype(action[4+2*comm_size]);
857 MPI_CURRENT_TYPE2=decode_datatype(action[5+2*comm_size]);
860 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
862 void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* smpi_datatype_size(MPI_CURRENT_TYPE));
863 void *recvbuf = smpi_get_tmp_recvbuffer(recv_buf_size* smpi_datatype_size(MPI_CURRENT_TYPE2));
865 for(int i=0;i<comm_size;i++) {
866 sendcounts[i] = atoi(action[i+3]);
867 recvcounts[i] = atoi(action[i+4+comm_size]);
872 int rank = smpi_process_index();
873 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
874 extra->type = TRACING_ALLTOALLV;
875 extra->recvcounts= xbt_new(int, comm_size);
876 extra->sendcounts= xbt_new(int, comm_size);
877 extra->num_processes = comm_size;
879 for(int i=0; i< comm_size; i++){//copy data to avoid bad free
880 extra->send_size += sendcounts[i];
881 extra->sendcounts[i] = sendcounts[i];
882 extra->recv_size += recvcounts[i];
883 extra->recvcounts[i] = recvcounts[i];
885 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
886 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
888 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
890 mpi_coll_alltoallv_fun(sendbuf, sendcounts, senddisps, MPI_CURRENT_TYPE,recvbuf, recvcounts, recvdisps,
891 MPI_CURRENT_TYPE, MPI_COMM_WORLD);
893 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
894 log_timed_action (action, clock);
897 void smpi_replay_run(int *argc, char***argv){
898 /* First initializes everything */
899 smpi_process_init(argc, argv);
900 smpi_process_mark_as_initialized();
901 smpi_process_set_replaying(true);
903 int rank = smpi_process_index();
904 TRACE_smpi_init(rank);
905 TRACE_smpi_computing_init(rank);
906 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
907 extra->type = TRACING_INIT;
908 char *operation =bprintf("%s_init",__FUNCTION__);
909 TRACE_smpi_collective_in(rank, -1, operation, extra);
910 TRACE_smpi_collective_out(rank, -1, operation);
913 if (_xbt_replay_action_init()==0) {
914 xbt_replay_action_register("init", action_init);
915 xbt_replay_action_register("finalize", action_finalize);
916 xbt_replay_action_register("comm_size", action_comm_size);
917 xbt_replay_action_register("comm_split", action_comm_split);
918 xbt_replay_action_register("comm_dup", action_comm_dup);
919 xbt_replay_action_register("send", action_send);
920 xbt_replay_action_register("Isend", action_Isend);
921 xbt_replay_action_register("recv", action_recv);
922 xbt_replay_action_register("Irecv", action_Irecv);
923 xbt_replay_action_register("test", action_test);
924 xbt_replay_action_register("wait", action_wait);
925 xbt_replay_action_register("waitAll", action_waitall);
926 xbt_replay_action_register("barrier", action_barrier);
927 xbt_replay_action_register("bcast", action_bcast);
928 xbt_replay_action_register("reduce", action_reduce);
929 xbt_replay_action_register("allReduce", action_allReduce);
930 xbt_replay_action_register("allToAll", action_allToAll);
931 xbt_replay_action_register("allToAllV", action_allToAllv);
932 xbt_replay_action_register("gather", action_gather);
933 xbt_replay_action_register("gatherV", action_gatherv);
934 xbt_replay_action_register("allGather", action_allgather);
935 xbt_replay_action_register("allGatherV", action_allgatherv);
936 xbt_replay_action_register("reduceScatter", action_reducescatter);
937 xbt_replay_action_register("compute", action_compute);
940 //if we have a delayed start, sleep here.
943 double value = strtod((*argv)[2], &endptr);
945 THROWF(unknown_error, 0, "%s is not a double", (*argv)[2]);
946 XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
947 smpi_execute_flops(value);
949 //UGLY: force a context switch to be sure that all MSG_processes begin initialization
950 XBT_DEBUG("Force context switch by smpi_execute_flops - Sleeping for 0.0 flops ");
951 smpi_execute_flops(0.0);
954 /* Actually run the replay */
955 xbt_replay_action_runner(*argc, *argv);
957 /* and now, finalize everything */
958 /* One active process will stop. Decrease the counter*/
959 XBT_DEBUG("There are %zu elements in reqq[*]", get_reqq_self()->size());
960 if (!get_reqq_self()->empty()){
961 unsigned int count_requests=get_reqq_self()->size();
962 MPI_Request requests[count_requests];
963 MPI_Status status[count_requests];
966 for (auto req: *get_reqq_self()){
970 smpi_mpi_waitall(count_requests, requests, status);
972 delete get_reqq_self();
975 if(active_processes==0){
976 /* Last process alive speaking: end the simulated timer */
977 XBT_INFO("Simulation time %f", smpi_process_simulated_elapsed());
978 _xbt_replay_action_exit();
979 xbt_free(sendbuffer);
980 xbt_free(recvbuffer);
983 instr_extra_data extra_fin = xbt_new0(s_instr_extra_data_t,1);
984 extra_fin->type = TRACING_FINALIZE;
985 operation =bprintf("%s_finalize",__FUNCTION__);
986 TRACE_smpi_collective_in(rank, -1, operation, extra_fin);
988 smpi_process_finalize();
990 TRACE_smpi_collective_out(rank, -1, operation);
991 TRACE_smpi_finalize(smpi_process_index());
992 smpi_process_destroy();