1 /* Copyright (c) 2009-2017. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
7 #include "xbt/replay.hpp"
8 #include <unordered_map>
/* NOTE(review): KEY_SIZE is not referenced in the visible part of this file -- confirm it is still needed. */
#define KEY_SIZE (sizeof(int) * 2 + 1)

using namespace simgrid::smpi;

XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");

/* Value of the first argument of the last "comm_size" action (see action_comm_size()) */
int communicator_size = 0;
/* Set to smpi_process_count() by action_init(); compared to 0 in smpi_replay_run() to detect the last process */
static int active_processes = 0;
/* Pending (not yet waited for) requests of each process, keyed by smpi_process_index() */
std::unordered_map<int,std::vector<MPI_Request>*> reqq;

/* Datatype used by the actions when the trace line gives none; chosen in action_init() */
MPI_Datatype MPI_DEFAULT_TYPE;
/* Datatype of the action being replayed; also written by decode_datatype() */
MPI_Datatype MPI_CURRENT_TYPE;

/* Shared scratch buffers handed out by smpi_get_tmp_sendbuffer() / smpi_get_tmp_recvbuffer() while replaying,
 * together with their current capacity in bytes */
static int sendbuffer_size=0;
char* sendbuffer=nullptr;
static int recvbuffer_size=0;
char* recvbuffer=nullptr;
29 static void log_timed_action (const char *const *action, double clock){
30 if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
31 char *name = xbt_str_join_array(action, " ");
32 XBT_VERB("%s %f", name, smpi_process_simulated_elapsed()-clock);
37 static std::vector<MPI_Request>* get_reqq_self()
39 return reqq.at(smpi_process_index());
42 static void set_reqq_self(std::vector<MPI_Request> *mpi_request)
44 reqq.insert({smpi_process_index(), mpi_request});
47 //allocate a single buffer for all sends, growing it if needed
48 void* smpi_get_tmp_sendbuffer(int size)
50 if (!smpi_process_get_replaying())
51 return xbt_malloc(size);
52 if (sendbuffer_size<size){
53 sendbuffer=static_cast<char*>(xbt_realloc(sendbuffer,size));
59 //allocate a single buffer for all recv
60 void* smpi_get_tmp_recvbuffer(int size){
61 if (!smpi_process_get_replaying())
62 return xbt_malloc(size);
63 if (recvbuffer_size<size){
64 recvbuffer=static_cast<char*>(xbt_realloc(recvbuffer,size));
70 void smpi_free_tmp_buffer(void* buf){
71 if (!smpi_process_get_replaying())
76 static double parse_double(const char *string)
79 double value = strtod(string, &endptr);
81 THROWF(unknown_error, 0, "%s is not a double", string);
85 static MPI_Datatype decode_datatype(const char *const action)
87 switch(atoi(action)) {
89 MPI_CURRENT_TYPE=MPI_DOUBLE;
92 MPI_CURRENT_TYPE=MPI_INT;
95 MPI_CURRENT_TYPE=MPI_CHAR;
98 MPI_CURRENT_TYPE=MPI_SHORT;
101 MPI_CURRENT_TYPE=MPI_LONG;
104 MPI_CURRENT_TYPE=MPI_FLOAT;
107 MPI_CURRENT_TYPE=MPI_BYTE;
110 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
112 return MPI_CURRENT_TYPE;
115 const char* encode_datatype(MPI_Datatype datatype, int* known)
117 //default type for output is set to MPI_BYTE
118 // MPI_DEFAULT_TYPE is not set for output, use directly MPI_BYTE
121 if (datatype==MPI_BYTE)
123 if(datatype==MPI_DOUBLE)
125 if(datatype==MPI_INT)
127 if(datatype==MPI_CHAR)
129 if(datatype==MPI_SHORT)
131 if(datatype==MPI_LONG)
133 if(datatype==MPI_FLOAT)
135 //tell that the datatype is not handled by replay, and that its size should be measured and replayed as size*MPI_BYTE
138 // default - not implemented.
139 // do not warn here as we pass in this function even for other trace formats
/* Throw arg_error unless the null-terminated `action` array holds at least `mandatory` arguments after the two
 * leading items (process id and action name). `optional` is only used in the error message.
 * The macro expands to a braced block and is used without a trailing semicolon.
 * Arguments are parenthesized and the counter has a reserved-looking name so that any expression (including one
 * mentioning a variable called `i`) can safely be passed. */
#define CHECK_ACTION_PARAMS(action, mandatory, optional) {\
    int check_action_params_count_=0;\
    while((action)[check_action_params_count_]!=nullptr)\
      check_action_params_count_++;\
    if(check_action_params_count_<(mandatory)+2)\
      THROWF(arg_error, 0, "%s replay failed.\n" \
          "%d items were given on the line. First two should be process_id and action. " \
          "This action needs after them %d mandatory arguments, and accepts %d optional ones. \n" \
          "Please contact the Simgrid team if support is needed", __FUNCTION__, check_action_params_count_, \
          (mandatory), (optional));\
  }
154 static void action_init(const char *const *action)
156 XBT_DEBUG("Initialize the counters");
157 CHECK_ACTION_PARAMS(action, 0, 1)
159 MPI_DEFAULT_TYPE=MPI_DOUBLE; // default MPE dataype
160 else MPI_DEFAULT_TYPE= MPI_BYTE; // default TAU datatype
162 /* start a simulated timer */
163 smpi_process_simulated_start();
164 /*initialize the number of active processes */
165 active_processes = smpi_process_count();
167 set_reqq_self(new std::vector<MPI_Request>);
/* Replay handler for "finalize" (registered in smpi_replay_run()).
 * NOTE(review): the body is not visible in this listing; it is presumably empty, the actual finalization being
 * done at the end of smpi_replay_run() -- confirm. */
static void action_finalize(const char *const *action)
{
  /* Nothing to do */
}
175 static void action_comm_size(const char *const *action)
177 communicator_size = parse_double(action[2]);
178 log_timed_action (action, smpi_process_simulated_elapsed());
181 static void action_comm_split(const char *const *action)
183 log_timed_action (action, smpi_process_simulated_elapsed());
186 static void action_comm_dup(const char *const *action)
188 log_timed_action (action, smpi_process_simulated_elapsed());
191 static void action_compute(const char *const *action)
193 CHECK_ACTION_PARAMS(action, 1, 0)
194 double clock = smpi_process_simulated_elapsed();
195 double flops= parse_double(action[2]);
196 int rank = smpi_process_index();
197 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
198 extra->type=TRACING_COMPUTING;
199 extra->comp_size=flops;
200 TRACE_smpi_computing_in(rank, extra);
202 smpi_execute_flops(flops);
204 TRACE_smpi_computing_out(rank);
205 log_timed_action (action, clock);
208 static void action_send(const char *const *action)
210 CHECK_ACTION_PARAMS(action, 2, 1)
211 int to = atoi(action[2]);
212 double size=parse_double(action[3]);
213 double clock = smpi_process_simulated_elapsed();
216 MPI_CURRENT_TYPE=decode_datatype(action[4]);
218 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
220 int rank = smpi_process_index();
222 int dst_traced = MPI_COMM_WORLD->group()->rank(to);
223 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
224 extra->type = TRACING_SEND;
225 extra->send_size = size;
227 extra->dst = dst_traced;
228 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
229 TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
230 if (!TRACE_smpi_view_internals())
231 TRACE_smpi_send(rank, rank, dst_traced, 0, size*MPI_CURRENT_TYPE->size());
233 Request::send(nullptr, size, MPI_CURRENT_TYPE, to , 0, MPI_COMM_WORLD);
235 log_timed_action (action, clock);
237 TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
240 static void action_Isend(const char *const *action)
242 CHECK_ACTION_PARAMS(action, 2, 1)
243 int to = atoi(action[2]);
244 double size=parse_double(action[3]);
245 double clock = smpi_process_simulated_elapsed();
248 MPI_CURRENT_TYPE=decode_datatype(action[4]);
250 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
252 int rank = smpi_process_index();
253 int dst_traced = MPI_COMM_WORLD->group()->rank(to);
254 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
255 extra->type = TRACING_ISEND;
256 extra->send_size = size;
258 extra->dst = dst_traced;
259 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
260 TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
261 if (!TRACE_smpi_view_internals())
262 TRACE_smpi_send(rank, rank, dst_traced, 0, size*MPI_CURRENT_TYPE->size());
264 MPI_Request request = Request::isend(nullptr, size, MPI_CURRENT_TYPE, to, 0,MPI_COMM_WORLD);
266 TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
268 get_reqq_self()->push_back(request);
270 log_timed_action (action, clock);
273 static void action_recv(const char *const *action) {
274 CHECK_ACTION_PARAMS(action, 2, 1)
275 int from = atoi(action[2]);
276 double size=parse_double(action[3]);
277 double clock = smpi_process_simulated_elapsed();
281 MPI_CURRENT_TYPE=decode_datatype(action[4]);
283 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
285 int rank = smpi_process_index();
286 int src_traced = MPI_COMM_WORLD->group()->rank(from);
288 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
289 extra->type = TRACING_RECV;
290 extra->send_size = size;
291 extra->src = src_traced;
293 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
294 TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
296 //unknown size from the receiver point of view
298 Request::probe(from, 0, MPI_COMM_WORLD, &status);
302 Request::recv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD, &status);
304 TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
305 if (!TRACE_smpi_view_internals()) {
306 TRACE_smpi_recv(rank, src_traced, rank, 0);
309 log_timed_action (action, clock);
312 static void action_Irecv(const char *const *action)
314 CHECK_ACTION_PARAMS(action, 2, 1)
315 int from = atoi(action[2]);
316 double size=parse_double(action[3]);
317 double clock = smpi_process_simulated_elapsed();
320 MPI_CURRENT_TYPE=decode_datatype(action[4]);
322 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
324 int rank = smpi_process_index();
325 int src_traced = MPI_COMM_WORLD->group()->rank(from);
326 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
327 extra->type = TRACING_IRECV;
328 extra->send_size = size;
329 extra->src = src_traced;
331 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
332 TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
334 //unknow size from the receiver pov
336 Request::probe(from, 0, MPI_COMM_WORLD, &status);
340 MPI_Request request = Request::irecv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD);
342 TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
343 get_reqq_self()->push_back(request);
345 log_timed_action (action, clock);
348 static void action_test(const char *const *action){
349 CHECK_ACTION_PARAMS(action, 0, 0)
350 double clock = smpi_process_simulated_elapsed();
353 MPI_Request request = get_reqq_self()->back();
354 get_reqq_self()->pop_back();
355 //if request is null here, this may mean that a previous test has succeeded
356 //Different times in traced application and replayed version may lead to this
357 //In this case, ignore the extra calls.
358 if(request!=nullptr){
359 int rank = smpi_process_index();
360 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
361 extra->type=TRACING_TEST;
362 TRACE_smpi_testing_in(rank, extra);
364 int flag = Request::test(&request, &status);
366 XBT_DEBUG("MPI_Test result: %d", flag);
367 /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now nullptr.*/
368 get_reqq_self()->push_back(request);
370 TRACE_smpi_testing_out(rank);
372 log_timed_action (action, clock);
375 static void action_wait(const char *const *action){
376 CHECK_ACTION_PARAMS(action, 0, 0)
377 double clock = smpi_process_simulated_elapsed();
380 xbt_assert(get_reqq_self()->size(), "action wait not preceded by any irecv or isend: %s",
381 xbt_str_join_array(action," "));
382 MPI_Request request = get_reqq_self()->back();
383 get_reqq_self()->pop_back();
385 if (request==nullptr){
386 /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just return.*/
390 int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
392 MPI_Group group = request->comm()->group();
393 int src_traced = group->rank(request->src());
394 int dst_traced = group->rank(request->dst());
395 int is_wait_for_receive = (request->flags() & RECV);
396 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
397 extra->type = TRACING_WAIT;
398 TRACE_smpi_ptp_in(rank, src_traced, dst_traced, __FUNCTION__, extra);
400 Request::wait(&request, &status);
402 TRACE_smpi_ptp_out(rank, src_traced, dst_traced, __FUNCTION__);
403 if (is_wait_for_receive)
404 TRACE_smpi_recv(rank, src_traced, dst_traced, 0);
405 log_timed_action (action, clock);
408 static void action_waitall(const char *const *action){
409 CHECK_ACTION_PARAMS(action, 0, 0)
410 double clock = smpi_process_simulated_elapsed();
411 unsigned int count_requests=get_reqq_self()->size();
413 if (count_requests>0) {
414 MPI_Status status[count_requests];
416 int rank_traced = smpi_process_index();
417 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
418 extra->type = TRACING_WAITALL;
419 extra->send_size=count_requests;
420 TRACE_smpi_ptp_in(rank_traced, -1, -1, __FUNCTION__,extra);
421 int recvs_snd[count_requests];
422 int recvs_rcv[count_requests];
424 for (auto req : *(get_reqq_self())){
425 if (req && (req->flags () & RECV)){
426 recvs_snd[i]=req->src();
427 recvs_rcv[i]=req->dst();
432 Request::waitall(count_requests, &(*get_reqq_self())[0], status);
434 for (i=0; i<count_requests;i++){
435 if (recvs_snd[i]!=-100)
436 TRACE_smpi_recv(rank_traced, recvs_snd[i], recvs_rcv[i],0);
438 TRACE_smpi_ptp_out(rank_traced, -1, -1, __FUNCTION__);
440 log_timed_action (action, clock);
443 static void action_barrier(const char *const *action){
444 double clock = smpi_process_simulated_elapsed();
445 int rank = smpi_process_index();
446 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
447 extra->type = TRACING_BARRIER;
448 TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
450 Colls::barrier(MPI_COMM_WORLD);
452 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
453 log_timed_action (action, clock);
456 static void action_bcast(const char *const *action)
458 CHECK_ACTION_PARAMS(action, 1, 2)
459 double size = parse_double(action[2]);
460 double clock = smpi_process_simulated_elapsed();
462 /* Initialize MPI_CURRENT_TYPE in order to decrease the number of the checks */
463 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
466 root= atoi(action[3]);
468 MPI_CURRENT_TYPE=decode_datatype(action[4]);
471 int rank = smpi_process_index();
472 int root_traced = MPI_COMM_WORLD->group()->index(root);
474 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
475 extra->type = TRACING_BCAST;
476 extra->send_size = size;
477 extra->root = root_traced;
478 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
479 TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__, extra);
480 void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
482 Colls::bcast(sendbuf, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
484 TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
485 log_timed_action (action, clock);
488 static void action_reduce(const char *const *action)
490 CHECK_ACTION_PARAMS(action, 2, 2)
491 double comm_size = parse_double(action[2]);
492 double comp_size = parse_double(action[3]);
493 double clock = smpi_process_simulated_elapsed();
495 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
498 root= atoi(action[4]);
500 MPI_CURRENT_TYPE=decode_datatype(action[5]);
503 int rank = smpi_process_index();
504 int root_traced = MPI_COMM_WORLD->group()->rank(root);
505 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
506 extra->type = TRACING_REDUCE;
507 extra->send_size = comm_size;
508 extra->comp_size = comp_size;
509 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
510 extra->root = root_traced;
512 TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__,extra);
514 void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
515 void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
516 Colls::reduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
517 smpi_execute_flops(comp_size);
519 TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
520 log_timed_action (action, clock);
523 static void action_allReduce(const char *const *action) {
524 CHECK_ACTION_PARAMS(action, 2, 1)
525 double comm_size = parse_double(action[2]);
526 double comp_size = parse_double(action[3]);
529 MPI_CURRENT_TYPE=decode_datatype(action[4]);
531 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
533 double clock = smpi_process_simulated_elapsed();
534 int rank = smpi_process_index();
535 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
536 extra->type = TRACING_ALLREDUCE;
537 extra->send_size = comm_size;
538 extra->comp_size = comp_size;
539 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
540 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
542 void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
543 void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
544 Colls::allreduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
545 smpi_execute_flops(comp_size);
547 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
548 log_timed_action (action, clock);
551 static void action_allToAll(const char *const *action) {
552 CHECK_ACTION_PARAMS(action, 2, 2) //two mandatory (send and recv volumes) and two optional (corresponding datatypes)
553 double clock = smpi_process_simulated_elapsed();
554 int comm_size = MPI_COMM_WORLD->size();
555 int send_size = parse_double(action[2]);
556 int recv_size = parse_double(action[3]);
557 MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
559 if(action[4] && action[5]) {
560 MPI_CURRENT_TYPE=decode_datatype(action[4]);
561 MPI_CURRENT_TYPE2=decode_datatype(action[5]);
564 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
566 void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* MPI_CURRENT_TYPE->size());
567 void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
569 int rank = smpi_process_index();
570 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
571 extra->type = TRACING_ALLTOALL;
572 extra->send_size = send_size;
573 extra->recv_size = recv_size;
574 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
575 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
577 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
579 Colls::alltoall(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
581 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
582 log_timed_action (action, clock);
585 static void action_gather(const char *const *action) {
586 /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
589 1) 68 is the sendcounts
590 2) 68 is the recvcounts
591 3) 0 is the root node
592 4) 0 is the send datatype id, see decode_datatype()
593 5) 0 is the recv datatype id, see decode_datatype()
595 CHECK_ACTION_PARAMS(action, 2, 3)
596 double clock = smpi_process_simulated_elapsed();
597 int comm_size = MPI_COMM_WORLD->size();
598 int send_size = parse_double(action[2]);
599 int recv_size = parse_double(action[3]);
600 MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
601 if(action[4] && action[5]) {
602 MPI_CURRENT_TYPE=decode_datatype(action[5]);
603 MPI_CURRENT_TYPE2=decode_datatype(action[6]);
605 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
607 void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
608 void *recv = nullptr;
611 root=atoi(action[4]);
612 int rank = MPI_COMM_WORLD->rank();
615 recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
617 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
618 extra->type = TRACING_GATHER;
619 extra->send_size = send_size;
620 extra->recv_size = recv_size;
622 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
623 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
625 TRACE_smpi_collective_in(smpi_process_index(), root, __FUNCTION__, extra);
627 Colls::gather(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
629 TRACE_smpi_collective_out(smpi_process_index(), -1, __FUNCTION__);
630 log_timed_action (action, clock);
633 static void action_gatherv(const char *const *action) {
634 /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
635 0 gather 68 68 10 10 10 0 0 0
637 1) 68 is the sendcount
638 2) 68 10 10 10 is the recvcounts
639 3) 0 is the root node
640 4) 0 is the send datatype id, see decode_datatype()
641 5) 0 is the recv datatype id, see decode_datatype()
643 double clock = smpi_process_simulated_elapsed();
644 int comm_size = MPI_COMM_WORLD->size();
645 CHECK_ACTION_PARAMS(action, comm_size+1, 2)
646 int send_size = parse_double(action[2]);
647 int disps[comm_size];
648 int recvcounts[comm_size];
651 MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
652 if(action[4+comm_size] && action[5+comm_size]) {
653 MPI_CURRENT_TYPE=decode_datatype(action[4+comm_size]);
654 MPI_CURRENT_TYPE2=decode_datatype(action[5+comm_size]);
656 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
658 void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
659 void *recv = nullptr;
660 for(int i=0;i<comm_size;i++) {
661 recvcounts[i] = atoi(action[i+3]);
662 recv_sum=recv_sum+recvcounts[i];
666 int root=atoi(action[3+comm_size]);
667 int rank = MPI_COMM_WORLD->rank();
670 recv = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
672 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
673 extra->type = TRACING_GATHERV;
674 extra->send_size = send_size;
675 extra->recvcounts= xbt_new(int,comm_size);
676 for(int i=0; i< comm_size; i++)//copy data to avoid bad free
677 extra->recvcounts[i] = recvcounts[i];
679 extra->num_processes = comm_size;
680 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
681 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
683 TRACE_smpi_collective_in(smpi_process_index(), root, __FUNCTION__, extra);
685 Colls::gatherv(send, send_size, MPI_CURRENT_TYPE, recv, recvcounts, disps, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
687 TRACE_smpi_collective_out(smpi_process_index(), -1, __FUNCTION__);
688 log_timed_action (action, clock);
691 static void action_reducescatter(const char *const *action) {
692 /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
693 0 reduceScatter 275427 275427 275427 204020 11346849 0
695 1) The first four values after the name of the action declare the recvcounts array
696 2) The value 11346849 is the amount of instructions
697 3) The last value corresponds to the datatype, see decode_datatype().
699 double clock = smpi_process_simulated_elapsed();
700 int comm_size = MPI_COMM_WORLD->size();
701 CHECK_ACTION_PARAMS(action, comm_size+1, 1)
702 int comp_size = parse_double(action[2+comm_size]);
703 int recvcounts[comm_size];
704 int rank = smpi_process_index();
706 if(action[3+comm_size])
707 MPI_CURRENT_TYPE=decode_datatype(action[3+comm_size]);
709 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
711 for(int i=0;i<comm_size;i++) {
712 recvcounts[i] = atoi(action[i+2]);
716 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
717 extra->type = TRACING_REDUCE_SCATTER;
718 extra->send_size = 0;
719 extra->recvcounts= xbt_new(int, comm_size);
720 for(int i=0; i< comm_size; i++)//copy data to avoid bad free
721 extra->recvcounts[i] = recvcounts[i];
722 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
723 extra->comp_size = comp_size;
724 extra->num_processes = comm_size;
726 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
728 void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
729 void *recvbuf = smpi_get_tmp_recvbuffer(size* MPI_CURRENT_TYPE->size());
731 Colls::reduce_scatter(sendbuf, recvbuf, recvcounts, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
732 smpi_execute_flops(comp_size);
734 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
735 log_timed_action (action, clock);
738 static void action_allgather(const char *const *action) {
739 /* The structure of the allgather action for the rank 0 (total 4 processes) is the following:
740 0 allGather 275427 275427
742 1) 275427 is the sendcount
743 2) 275427 is the recvcount
744 3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().
746 double clock = smpi_process_simulated_elapsed();
748 CHECK_ACTION_PARAMS(action, 2, 2)
749 int sendcount=atoi(action[2]);
750 int recvcount=atoi(action[3]);
752 MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
754 if(action[4] && action[5]) {
755 MPI_CURRENT_TYPE = decode_datatype(action[4]);
756 MPI_CURRENT_TYPE2 = decode_datatype(action[5]);
758 MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
760 void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
761 void *recvbuf = smpi_get_tmp_recvbuffer(recvcount* MPI_CURRENT_TYPE2->size());
763 int rank = smpi_process_index();
764 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
765 extra->type = TRACING_ALLGATHER;
766 extra->send_size = sendcount;
767 extra->recv_size= recvcount;
768 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
769 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
770 extra->num_processes = MPI_COMM_WORLD->size();
772 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
774 Colls::allgather(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcount, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
776 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
777 log_timed_action (action, clock);
780 static void action_allgatherv(const char *const *action) {
781 /* The structure of the allgatherv action for the rank 0 (total 4 processes) is the following:
782 0 allGatherV 275427 275427 275427 275427 204020
784 1) 275427 is the sendcount
785 2) The next four elements declare the recvcounts array
786 3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().
788 double clock = smpi_process_simulated_elapsed();
790 int comm_size = MPI_COMM_WORLD->size();
791 CHECK_ACTION_PARAMS(action, comm_size+1, 2)
792 int sendcount=atoi(action[2]);
793 int recvcounts[comm_size];
794 int disps[comm_size];
796 MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
798 if(action[3+comm_size] && action[4+comm_size]) {
799 MPI_CURRENT_TYPE = decode_datatype(action[3+comm_size]);
800 MPI_CURRENT_TYPE2 = decode_datatype(action[4+comm_size]);
802 MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
804 void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
806 for(int i=0;i<comm_size;i++) {
807 recvcounts[i] = atoi(action[i+3]);
808 recv_sum=recv_sum+recvcounts[i];
811 void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
813 int rank = smpi_process_index();
814 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
815 extra->type = TRACING_ALLGATHERV;
816 extra->send_size = sendcount;
817 extra->recvcounts= xbt_new(int, comm_size);
818 for(int i=0; i< comm_size; i++)//copy data to avoid bad free
819 extra->recvcounts[i] = recvcounts[i];
820 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
821 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
822 extra->num_processes = comm_size;
824 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
826 Colls::allgatherv(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts, disps, MPI_CURRENT_TYPE2,
829 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
830 log_timed_action (action, clock);
833 static void action_allToAllv(const char *const *action) {
834 /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
835 0 allToAllV 100 1 7 10 12 100 1 70 10 5
837 1) 100 is the size of the send buffer *sizeof(int),
838 2) 1 7 10 12 is the sendcounts array
839 3) 100*sizeof(int) is the size of the receiver buffer
840 4) 1 70 10 5 is the recvcounts array
842 double clock = smpi_process_simulated_elapsed();
844 int comm_size = MPI_COMM_WORLD->size();
845 CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
846 int sendcounts[comm_size];
847 int recvcounts[comm_size];
848 int senddisps[comm_size];
849 int recvdisps[comm_size];
851 MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
853 int send_buf_size=parse_double(action[2]);
854 int recv_buf_size=parse_double(action[3+comm_size]);
855 if(action[4+2*comm_size] && action[5+2*comm_size]) {
856 MPI_CURRENT_TYPE=decode_datatype(action[4+2*comm_size]);
857 MPI_CURRENT_TYPE2=decode_datatype(action[5+2*comm_size]);
860 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
862 void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* MPI_CURRENT_TYPE->size());
863 void *recvbuf = smpi_get_tmp_recvbuffer(recv_buf_size* MPI_CURRENT_TYPE2->size());
865 for(int i=0;i<comm_size;i++) {
866 sendcounts[i] = atoi(action[i+3]);
867 recvcounts[i] = atoi(action[i+4+comm_size]);
872 int rank = smpi_process_index();
873 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
874 extra->type = TRACING_ALLTOALLV;
875 extra->recvcounts= xbt_new(int, comm_size);
876 extra->sendcounts= xbt_new(int, comm_size);
877 extra->num_processes = comm_size;
879 for(int i=0; i< comm_size; i++){//copy data to avoid bad free
880 extra->send_size += sendcounts[i];
881 extra->sendcounts[i] = sendcounts[i];
882 extra->recv_size += recvcounts[i];
883 extra->recvcounts[i] = recvcounts[i];
885 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
886 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
888 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
890 Colls::alltoallv(sendbuf, sendcounts, senddisps, MPI_CURRENT_TYPE,recvbuf, recvcounts, recvdisps,
891 MPI_CURRENT_TYPE, MPI_COMM_WORLD);
893 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
894 log_timed_action (action, clock);
897 void smpi_replay_run(int *argc, char***argv){
898 /* First initializes everything */
899 smpi_process_init(argc, argv);
900 smpi_process_mark_as_initialized();
901 smpi_process_set_replaying(true);
903 int rank = smpi_process_index();
904 TRACE_smpi_init(rank);
905 TRACE_smpi_computing_init(rank);
906 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
907 extra->type = TRACING_INIT;
908 char *operation =bprintf("%s_init",__FUNCTION__);
909 TRACE_smpi_collective_in(rank, -1, operation, extra);
910 TRACE_smpi_collective_out(rank, -1, operation);
912 _xbt_replay_action_init();
913 xbt_replay_action_register("init", action_init);
914 xbt_replay_action_register("finalize", action_finalize);
915 xbt_replay_action_register("comm_size", action_comm_size);
916 xbt_replay_action_register("comm_split", action_comm_split);
917 xbt_replay_action_register("comm_dup", action_comm_dup);
918 xbt_replay_action_register("send", action_send);
919 xbt_replay_action_register("Isend", action_Isend);
920 xbt_replay_action_register("recv", action_recv);
921 xbt_replay_action_register("Irecv", action_Irecv);
922 xbt_replay_action_register("test", action_test);
923 xbt_replay_action_register("wait", action_wait);
924 xbt_replay_action_register("waitAll", action_waitall);
925 xbt_replay_action_register("barrier", action_barrier);
926 xbt_replay_action_register("bcast", action_bcast);
927 xbt_replay_action_register("reduce", action_reduce);
928 xbt_replay_action_register("allReduce", action_allReduce);
929 xbt_replay_action_register("allToAll", action_allToAll);
930 xbt_replay_action_register("allToAllV", action_allToAllv);
931 xbt_replay_action_register("gather", action_gather);
932 xbt_replay_action_register("gatherV", action_gatherv);
933 xbt_replay_action_register("allGather", action_allgather);
934 xbt_replay_action_register("allGatherV", action_allgatherv);
935 xbt_replay_action_register("reduceScatter", action_reducescatter);
936 xbt_replay_action_register("compute", action_compute);
938 //if we have a delayed start, sleep here.
941 double value = strtod((*argv)[2], &endptr);
943 THROWF(unknown_error, 0, "%s is not a double", (*argv)[2]);
944 XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
945 smpi_execute_flops(value);
947 //UGLY: force a context switch to be sure that all MSG_processes begin initialization
948 XBT_DEBUG("Force context switch by smpi_execute_flops - Sleeping for 0.0 flops ");
949 smpi_execute_flops(0.0);
952 /* Actually run the replay */
953 xbt_replay_action_runner(*argc, *argv);
955 /* and now, finalize everything */
956 /* One active process will stop. Decrease the counter*/
957 XBT_DEBUG("There are %zu elements in reqq[*]", get_reqq_self()->size());
958 if (!get_reqq_self()->empty()){
959 unsigned int count_requests=get_reqq_self()->size();
960 MPI_Request requests[count_requests];
961 MPI_Status status[count_requests];
964 for (auto req: *get_reqq_self()){
968 Request::waitall(count_requests, requests, status);
970 delete get_reqq_self();
973 if(active_processes==0){
974 /* Last process alive speaking: end the simulated timer */
975 XBT_INFO("Simulation time %f", smpi_process_simulated_elapsed());
976 _xbt_replay_action_exit();
977 xbt_free(sendbuffer);
978 xbt_free(recvbuffer);
981 instr_extra_data extra_fin = xbt_new0(s_instr_extra_data_t,1);
982 extra_fin->type = TRACING_FINALIZE;
983 operation =bprintf("%s_finalize",__FUNCTION__);
984 TRACE_smpi_collective_in(rank, -1, operation, extra_fin);
986 smpi_process_finalize();
988 TRACE_smpi_collective_out(rank, -1, operation);
989 TRACE_smpi_finalize(smpi_process_index());
990 smpi_process_destroy();