1 /* Copyright (c) 2009-2015. The SimGrid Team.
2 * All rights reserved. */
4 /* This program is free software; you can redistribute it and/or modify it
5 * under the terms of the license (GNU LGPL) which comes with this package. */
9 #include <xbt/replay.h>
10 #include <unordered_map>
13 #define KEY_SIZE (sizeof(int) * 2 + 1)
15 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
17 int communicator_size = 0;
18 static int active_processes = 0;
19 std::unordered_map<int,std::vector<MPI_Request>*> reqq;
21 MPI_Datatype MPI_DEFAULT_TYPE;
22 MPI_Datatype MPI_CURRENT_TYPE;
24 static int sendbuffer_size=0;
25 char* sendbuffer=nullptr;
26 static int recvbuffer_size=0;
27 char* recvbuffer=nullptr;
29 static void log_timed_action (const char *const *action, double clock){
30 if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
31 char *name = xbt_str_join_array(action, " ");
32 XBT_VERB("%s %f", name, smpi_process_simulated_elapsed()-clock);
37 static std::vector<MPI_Request>* get_reqq_self()
39 return reqq.at(smpi_process_index());
42 static void set_reqq_self(std::vector<MPI_Request> *mpi_request)
44 reqq.insert({smpi_process_index(), mpi_request});
47 //allocate a single buffer for all sends, growing it if needed
48 void* smpi_get_tmp_sendbuffer(int size)
50 if (!smpi_process_get_replaying())
51 return xbt_malloc(size);
52 if (sendbuffer_size<size){
53 sendbuffer=static_cast<char*>(xbt_realloc(sendbuffer,size));
59 //allocate a single buffer for all recv
60 void* smpi_get_tmp_recvbuffer(int size){
61 if (!smpi_process_get_replaying())
62 return xbt_malloc(size);
63 if (recvbuffer_size<size){
64 recvbuffer=static_cast<char*>(xbt_realloc(recvbuffer,size));
70 void smpi_free_tmp_buffer(void* buf){
71 if (!smpi_process_get_replaying())
76 static double parse_double(const char *string)
79 double value = strtod(string, &endptr);
81 THROWF(unknown_error, 0, "%s is not a double", string);
// Convert the numeric datatype id read from a trace line into an MPI datatype.
// The id is parsed with atoi(), so a non-numeric string is read as 0.
// Side effect: the chosen datatype is stored in the global MPI_CURRENT_TYPE, which is also the return value.
85 static MPI_Datatype decode_datatype(const char *const action)
// NOTE(review): the cases appear to map, in order, to MPI_DOUBLE, MPI_INT, MPI_CHAR, MPI_SHORT,
// MPI_LONG, MPI_FLOAT and MPI_BYTE, with MPI_DEFAULT_TYPE as the fallback for any other id —
// confirm the exact case labels against encode_datatype().
87 switch(atoi(action)) {
89 MPI_CURRENT_TYPE=MPI_DOUBLE;
92 MPI_CURRENT_TYPE=MPI_INT;
95 MPI_CURRENT_TYPE=MPI_CHAR;
98 MPI_CURRENT_TYPE=MPI_SHORT;
101 MPI_CURRENT_TYPE=MPI_LONG;
104 MPI_CURRENT_TYPE=MPI_FLOAT;
107 MPI_CURRENT_TYPE=MPI_BYTE;
110 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
112 return MPI_CURRENT_TYPE;
// Convert an MPI datatype into the datatype id written in time-independent traces
// (the opposite direction of decode_datatype()).
// `known` may be nullptr; every caller in this file passes nullptr. Otherwise it presumably
// receives whether the datatype is handled by the replay — confirm.
// NOTE(review): the ids produced here and the ids accepted by decode_datatype() are hard-coded
// separately; check that they stay in sync.
115 const char* encode_datatype(MPI_Datatype datatype, int* known)
117 //default type for output is set to MPI_BYTE
118 // MPI_DEFAULT_TYPE is not set for output, use directly MPI_BYTE
121 if (datatype==MPI_BYTE)
123 if(datatype==MPI_DOUBLE)
125 if(datatype==MPI_INT)
127 if(datatype==MPI_CHAR)
129 if(datatype==MPI_SHORT)
131 if(datatype==MPI_LONG)
133 if(datatype==MPI_FLOAT)
135 //tell that the datatype is not handled by replay, and that its size should be measured and replayed as size*MPI_BYTE
138 // default - not implemented.
139 // do not warn here as we pass in this function even for other trace formats
// Validate the number of items on a replayed action line.
// Counts the entries of the nullptr-terminated `action` array (process id, action name, then the
// arguments) and throws arg_error, naming the calling function, when the count does not suit the action.
// NOTE(review): the test is presumably "fewer than mandatory+2 items"; `optional` may only be used
// in the error message — confirm before relying on any upper bound.
// The expansion is a braced block, so call sites use it without a trailing semicolon.
143 #define CHECK_ACTION_PARAMS(action, mandatory, optional) {\
145 while(action[i]!=nullptr)\
148 THROWF(arg_error, 0, "%s replay failed.\n" \
149 "%d items were given on the line. First two should be process_id and action. " \
150 "This action needs after them %d mandatory arguments, and accepts %d optional ones. \n" \
151 "Please contact the Simgrid team if support is needed", __FUNCTION__, i, mandatory, optional);\
// Replay the "init" action: choose the default datatype, start the simulated timer, record the
// number of active processes and give the calling process an empty vector of pending requests.
// One optional argument selects the default datatype: MPI_DOUBLE (MPE traces) or MPI_BYTE (TAU
// traces). Presumably its presence is what selects MPI_DOUBLE — confirm.
154 static void action_init(const char *const *action)
156 XBT_DEBUG("Initialize the counters");
157 CHECK_ACTION_PARAMS(action, 0, 1)
159 MPI_DEFAULT_TYPE=MPI_DOUBLE; // default MPE datatype
160 else MPI_DEFAULT_TYPE= MPI_BYTE; // default TAU datatype
162 /* start a simulated timer */
163 smpi_process_simulated_start();
164 /*initialize the number of active processes */
165 active_processes = smpi_process_count();
// The vector is heap-allocated here and registered in reqq; it is not released by this function.
167 set_reqq_self(new std::vector<MPI_Request>);
// Handler registered for the "finalize" trace action (see smpi_replay_run).
170 static void action_finalize(const char *const *action)
175 static void action_comm_size(const char *const *action)
177 communicator_size = parse_double(action[2]);
178 log_timed_action (action, smpi_process_simulated_elapsed());
181 static void action_comm_split(const char *const *action)
183 log_timed_action (action, smpi_process_simulated_elapsed());
186 static void action_comm_dup(const char *const *action)
188 log_timed_action (action, smpi_process_simulated_elapsed());
191 static void action_compute(const char *const *action)
193 CHECK_ACTION_PARAMS(action, 1, 0)
194 double clock = smpi_process_simulated_elapsed();
195 double flops= parse_double(action[2]);
196 int rank = smpi_process_index();
197 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
198 extra->type=TRACING_COMPUTING;
199 extra->comp_size=flops;
200 TRACE_smpi_computing_in(rank, extra);
202 smpi_execute_flops(flops);
204 TRACE_smpi_computing_out(rank);
205 log_timed_action (action, clock);
// Replay a blocking "send" action: <dst> <size> [<datatype>].
// Sends `size` elements of the selected datatype, without payload (nullptr buffer), to rank `to`
// of MPI_COMM_WORLD, between point-to-point trace events.
208 static void action_send(const char *const *action)
210 CHECK_ACTION_PARAMS(action, 2, 1)
211 int to = atoi(action[2]);
212 double size=parse_double(action[3]);
213 double clock = smpi_process_simulated_elapsed();
// Datatype: presumably decoded from action[4] when present, MPI_DEFAULT_TYPE otherwise.
216 MPI_CURRENT_TYPE=decode_datatype(action[4]);
218 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
220 int rank = smpi_process_index();
222 int dst_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), to);
223 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
224 extra->type = TRACING_SEND;
225 extra->send_size = size;
227 extra->dst = dst_traced;
228 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
229 TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
// The send event carries the volume in bytes (element count times datatype size).
230 if (!TRACE_smpi_view_internals())
231 TRACE_smpi_send(rank, rank, dst_traced, 0, size*smpi_datatype_size(MPI_CURRENT_TYPE));
233 smpi_mpi_send(nullptr, size, MPI_CURRENT_TYPE, to , 0, MPI_COMM_WORLD);
// Note: the action is logged before the point-to-point trace event is closed.
235 log_timed_action (action, clock);
237 TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
// Replay a non-blocking "Isend" action: <dst> <size> [<datatype>].
// Starts sending `size` elements (no payload, nullptr buffer) to rank `to` of MPI_COMM_WORLD and
// queues the request in this process's pending-request vector for a later test/wait/waitAll.
240 static void action_Isend(const char *const *action)
242 CHECK_ACTION_PARAMS(action, 2, 1)
243 int to = atoi(action[2]);
244 double size=parse_double(action[3]);
245 double clock = smpi_process_simulated_elapsed();
// Datatype: presumably decoded from action[4] when present, MPI_DEFAULT_TYPE otherwise.
248 MPI_CURRENT_TYPE=decode_datatype(action[4]);
250 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
252 int rank = smpi_process_index();
253 int dst_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), to);
254 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
255 extra->type = TRACING_ISEND;
256 extra->send_size = size;
258 extra->dst = dst_traced;
259 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
260 TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
// The send event carries the volume in bytes (element count times datatype size).
261 if (!TRACE_smpi_view_internals())
262 TRACE_smpi_send(rank, rank, dst_traced, 0, size*smpi_datatype_size(MPI_CURRENT_TYPE));
264 MPI_Request request = smpi_mpi_isend(nullptr, size, MPI_CURRENT_TYPE, to, 0,MPI_COMM_WORLD);
266 TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
269 get_reqq_self()->push_back(request);
271 log_timed_action (action, clock);
// Replay a blocking "recv" action: <src> <size> [<datatype>].
// Receives `size` elements (into a nullptr buffer) from rank `from` of MPI_COMM_WORLD, between
// point-to-point trace events.
274 static void action_recv(const char *const *action) {
275 CHECK_ACTION_PARAMS(action, 2, 1)
276 int from = atoi(action[2]);
277 double size=parse_double(action[3]);
278 double clock = smpi_process_simulated_elapsed();
// Datatype: presumably decoded from action[4] when present, MPI_DEFAULT_TYPE otherwise.
282 MPI_CURRENT_TYPE=decode_datatype(action[4]);
284 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
286 int rank = smpi_process_index();
287 int src_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), from);
289 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
290 extra->type = TRACING_RECV;
291 extra->send_size = size;
292 extra->src = src_traced;
294 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
295 TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
// NOTE(review): the probe below presumably runs only when the traced size is unknown, and the size
// is then taken from `status` — confirm.
297 //unknown size from the receiver point of view
299 smpi_mpi_probe(from, 0, MPI_COMM_WORLD, &status);
303 smpi_mpi_recv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD, &status);
305 TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
306 if (!TRACE_smpi_view_internals()) {
307 TRACE_smpi_recv(rank, src_traced, rank, 0);
310 log_timed_action (action, clock);
// Replay a non-blocking "Irecv" action: <src> <size> [<datatype>].
// Posts a receive of `size` elements (nullptr buffer) from rank `from` of MPI_COMM_WORLD and queues
// the request in this process's pending-request vector for a later test/wait/waitAll.
313 static void action_Irecv(const char *const *action)
315 CHECK_ACTION_PARAMS(action, 2, 1)
316 int from = atoi(action[2]);
317 double size=parse_double(action[3]);
318 double clock = smpi_process_simulated_elapsed();
// Datatype: presumably decoded from action[4] when present, MPI_DEFAULT_TYPE otherwise.
321 MPI_CURRENT_TYPE=decode_datatype(action[4]);
323 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
325 int rank = smpi_process_index();
326 int src_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), from);
327 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
328 extra->type = TRACING_IRECV;
329 extra->send_size = size;
330 extra->src = src_traced;
332 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
333 TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
// NOTE(review): the probe below presumably runs only when the traced size is unknown, and the size
// is then taken from `status` — confirm.
335 //unknown size from the receiver point of view
337 smpi_mpi_probe(from, 0, MPI_COMM_WORLD, &status);
341 MPI_Request request = smpi_mpi_irecv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD);
343 TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
345 get_reqq_self()->push_back(request);
347 log_timed_action (action, clock);
350 static void action_test(const char *const *action){
351 CHECK_ACTION_PARAMS(action, 0, 0)
352 double clock = smpi_process_simulated_elapsed();
355 MPI_Request request = get_reqq_self()->back();
356 get_reqq_self()->pop_back();
357 //if request is null here, this may mean that a previous test has succeeded
358 //Different times in traced application and replayed version may lead to this
359 //In this case, ignore the extra calls.
360 if(request!=nullptr){
361 int rank = smpi_process_index();
362 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
363 extra->type=TRACING_TEST;
364 TRACE_smpi_testing_in(rank, extra);
366 int flag = smpi_mpi_test(&request, &status);
368 XBT_DEBUG("MPI_Test result: %d", flag);
369 /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now nullptr.*/
370 get_reqq_self()->push_back(request);
372 TRACE_smpi_testing_out(rank);
374 log_timed_action (action, clock);
377 static void action_wait(const char *const *action){
378 CHECK_ACTION_PARAMS(action, 0, 0)
379 double clock = smpi_process_simulated_elapsed();
382 xbt_assert(get_reqq_self()->size(), "action wait not preceded by any irecv or isend: %s",
383 xbt_str_join_array(action," "));
384 MPI_Request request = get_reqq_self()->back();
385 get_reqq_self()->pop_back();
387 if (request==nullptr){
388 /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just return.*/
392 int rank = request->comm != MPI_COMM_NULL ? smpi_comm_rank(request->comm) : -1;
394 MPI_Group group = smpi_comm_group(request->comm);
395 int src_traced = smpi_group_rank(group, request->src);
396 int dst_traced = smpi_group_rank(group, request->dst);
397 int is_wait_for_receive = request->recv;
398 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
399 extra->type = TRACING_WAIT;
400 TRACE_smpi_ptp_in(rank, src_traced, dst_traced, __FUNCTION__, extra);
402 smpi_mpi_wait(&request, &status);
404 TRACE_smpi_ptp_out(rank, src_traced, dst_traced, __FUNCTION__);
405 if (is_wait_for_receive)
406 TRACE_smpi_recv(rank, src_traced, dst_traced, 0);
407 log_timed_action (action, clock);
410 static void action_waitall(const char *const *action){
411 CHECK_ACTION_PARAMS(action, 0, 0)
412 double clock = smpi_process_simulated_elapsed();
413 unsigned int count_requests=get_reqq_self()->size();
415 if (count_requests>0) {
416 MPI_Status status[count_requests];
418 int rank_traced = smpi_process_index();
419 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
420 extra->type = TRACING_WAITALL;
421 extra->send_size=count_requests;
422 TRACE_smpi_ptp_in(rank_traced, -1, -1, __FUNCTION__,extra);
423 int recvs_snd[count_requests];
424 int recvs_rcv[count_requests];
426 for (auto req : *(get_reqq_self())){
427 if (req && req->recv){
428 recvs_snd[i]=req->src;
429 recvs_rcv[i]=req->dst;
434 smpi_mpi_waitall(count_requests, &(*get_reqq_self())[0], status);
436 for (i=0; i<count_requests;i++){
437 if (recvs_snd[i]!=-100)
438 TRACE_smpi_recv(rank_traced, recvs_snd[i], recvs_rcv[i],0);
440 TRACE_smpi_ptp_out(rank_traced, -1, -1, __FUNCTION__);
442 log_timed_action (action, clock);
445 static void action_barrier(const char *const *action){
446 double clock = smpi_process_simulated_elapsed();
447 int rank = smpi_process_index();
448 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
449 extra->type = TRACING_BARRIER;
450 TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
452 mpi_coll_barrier_fun(MPI_COMM_WORLD);
454 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
455 log_timed_action (action, clock);
458 static void action_bcast(const char *const *action)
460 CHECK_ACTION_PARAMS(action, 1, 2)
461 double size = parse_double(action[2]);
462 double clock = smpi_process_simulated_elapsed();
464 /* Initialize MPI_CURRENT_TYPE in order to decrease the number of the checks */
465 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
468 root= atoi(action[3]);
470 MPI_CURRENT_TYPE=decode_datatype(action[4]);
474 int rank = smpi_process_index();
475 int root_traced = smpi_group_index(smpi_comm_group(MPI_COMM_WORLD), root);
477 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
478 extra->type = TRACING_BCAST;
479 extra->send_size = size;
480 extra->root = root_traced;
481 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
482 TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__, extra);
483 void *sendbuf = smpi_get_tmp_sendbuffer(size* smpi_datatype_size(MPI_CURRENT_TYPE));
485 mpi_coll_bcast_fun(sendbuf, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
487 TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
488 log_timed_action (action, clock);
491 static void action_reduce(const char *const *action)
493 CHECK_ACTION_PARAMS(action, 2, 2)
494 double comm_size = parse_double(action[2]);
495 double comp_size = parse_double(action[3]);
496 double clock = smpi_process_simulated_elapsed();
498 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
501 root= atoi(action[4]);
503 MPI_CURRENT_TYPE=decode_datatype(action[5]);
507 int rank = smpi_process_index();
508 int root_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), root);
509 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
510 extra->type = TRACING_REDUCE;
511 extra->send_size = comm_size;
512 extra->comp_size = comp_size;
513 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
514 extra->root = root_traced;
516 TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__,extra);
518 void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
519 void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
520 mpi_coll_reduce_fun(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
521 smpi_execute_flops(comp_size);
523 TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
524 log_timed_action (action, clock);
527 static void action_allReduce(const char *const *action) {
528 CHECK_ACTION_PARAMS(action, 2, 1)
529 double comm_size = parse_double(action[2]);
530 double comp_size = parse_double(action[3]);
533 MPI_CURRENT_TYPE=decode_datatype(action[4]);
535 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
537 double clock = smpi_process_simulated_elapsed();
538 int rank = smpi_process_index();
539 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
540 extra->type = TRACING_ALLREDUCE;
541 extra->send_size = comm_size;
542 extra->comp_size = comp_size;
543 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
544 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
546 void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
547 void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
548 mpi_coll_allreduce_fun(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
549 smpi_execute_flops(comp_size);
551 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
552 log_timed_action (action, clock);
555 static void action_allToAll(const char *const *action) {
556 CHECK_ACTION_PARAMS(action, 2, 2) //two mandatory (send and recv volumes)
557 //two optional (corresponding datatypes)
558 double clock = smpi_process_simulated_elapsed();
559 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
560 int send_size = parse_double(action[2]);
561 int recv_size = parse_double(action[3]);
562 MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
564 if(action[4] && action[5]) {
565 MPI_CURRENT_TYPE=decode_datatype(action[4]);
566 MPI_CURRENT_TYPE2=decode_datatype(action[5]);
569 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
572 void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
573 void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE2));
575 int rank = smpi_process_index();
576 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
577 extra->type = TRACING_ALLTOALL;
578 extra->send_size = send_size;
579 extra->recv_size = recv_size;
580 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
581 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
583 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
585 mpi_coll_alltoall_fun(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
587 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
588 log_timed_action (action, clock);
591 static void action_gather(const char *const *action) {
592 /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
595 1) 68 is the sendcounts
596 2) 68 is the recvcounts
597 3) 0 is the root node
598 4) 0 is the send datatype id, see decode_datatype()
599 5) 0 is the recv datatype id, see decode_datatype()
601 CHECK_ACTION_PARAMS(action, 2, 3)
602 double clock = smpi_process_simulated_elapsed();
603 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
604 int send_size = parse_double(action[2]);
605 int recv_size = parse_double(action[3]);
606 MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
607 if(action[4] && action[5]) {
608 MPI_CURRENT_TYPE=decode_datatype(action[5]);
609 MPI_CURRENT_TYPE2=decode_datatype(action[6]);
611 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
613 void *send = smpi_get_tmp_sendbuffer(send_size* smpi_datatype_size(MPI_CURRENT_TYPE));
614 void *recv = nullptr;
617 root=atoi(action[4]);
618 int rank = smpi_comm_rank(MPI_COMM_WORLD);
621 recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE2));
623 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
624 extra->type = TRACING_GATHER;
625 extra->send_size = send_size;
626 extra->recv_size = recv_size;
628 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
629 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
631 TRACE_smpi_collective_in(smpi_process_index(), root, __FUNCTION__, extra);
633 mpi_coll_gather_fun(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
635 TRACE_smpi_collective_out(smpi_process_index(), -1, __FUNCTION__);
636 log_timed_action (action, clock);
639 static void action_gatherv(const char *const *action) {
640 /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
641 0 gather 68 68 10 10 10 0 0 0
643 1) 68 is the sendcount
644 2) 68 10 10 10 is the recvcounts
645 3) 0 is the root node
646 4) 0 is the send datatype id, see decode_datatype()
647 5) 0 is the recv datatype id, see decode_datatype()
650 double clock = smpi_process_simulated_elapsed();
651 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
652 CHECK_ACTION_PARAMS(action, comm_size+1, 2)
653 int send_size = parse_double(action[2]);
654 int disps[comm_size] = { 0 };
655 int recvcounts[comm_size];
658 MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
659 if(action[4+comm_size] && action[5+comm_size]) {
660 MPI_CURRENT_TYPE=decode_datatype(action[4+comm_size]);
661 MPI_CURRENT_TYPE2=decode_datatype(action[5+comm_size]);
663 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
665 void *send = smpi_get_tmp_sendbuffer(send_size* smpi_datatype_size(MPI_CURRENT_TYPE));
666 void *recv = nullptr;
667 for(i=0;i<comm_size;i++) {
668 recvcounts[i] = atoi(action[i+3]);
669 recv_sum=recv_sum+recvcounts[i];
672 int root=atoi(action[3+comm_size]);
673 int rank = smpi_comm_rank(MPI_COMM_WORLD);
676 recv = smpi_get_tmp_recvbuffer(recv_sum* smpi_datatype_size(MPI_CURRENT_TYPE2));
678 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
679 extra->type = TRACING_GATHERV;
680 extra->send_size = send_size;
681 extra->recvcounts= xbt_new(int,comm_size);
682 for(i=0; i< comm_size; i++)//copy data to avoid bad free
683 extra->recvcounts[i] = recvcounts[i];
685 extra->num_processes = comm_size;
686 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
687 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
689 TRACE_smpi_collective_in(smpi_process_index(), root, __FUNCTION__, extra);
691 smpi_mpi_gatherv(send, send_size, MPI_CURRENT_TYPE, recv, recvcounts, disps, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
693 TRACE_smpi_collective_out(smpi_process_index(), -1, __FUNCTION__);
694 log_timed_action (action, clock);
697 static void action_reducescatter(const char *const *action) {
698 /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
699 0 reduceScatter 275427 275427 275427 204020 11346849 0
701 1) The first four values after the name of the action declare the recvcounts array
702 2) The value 11346849 is the amount of instructions
703 3) The last value corresponds to the datatype, see decode_datatype().
705 double clock = smpi_process_simulated_elapsed();
706 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
707 CHECK_ACTION_PARAMS(action, comm_size+1, 1)
708 int comp_size = parse_double(action[2+comm_size]);
709 int recvcounts[comm_size];
710 int rank = smpi_process_index();
712 if(action[3+comm_size])
713 MPI_CURRENT_TYPE=decode_datatype(action[3+comm_size]);
715 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
717 for(int i=0;i<comm_size;i++) {
718 recvcounts[i] = atoi(action[i+2]);
722 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
723 extra->type = TRACING_REDUCE_SCATTER;
724 extra->send_size = 0;
725 extra->recvcounts= xbt_new(int, comm_size);
726 for(int i=0; i< comm_size; i++)//copy data to avoid bad free
727 extra->recvcounts[i] = recvcounts[i];
728 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
729 extra->comp_size = comp_size;
730 extra->num_processes = comm_size;
732 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
734 void *sendbuf = smpi_get_tmp_sendbuffer(size* smpi_datatype_size(MPI_CURRENT_TYPE));
735 void *recvbuf = smpi_get_tmp_recvbuffer(size* smpi_datatype_size(MPI_CURRENT_TYPE));
737 mpi_coll_reduce_scatter_fun(sendbuf, recvbuf, recvcounts, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
738 smpi_execute_flops(comp_size);
740 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
741 log_timed_action (action, clock);
744 static void action_allgather(const char *const *action) {
745 /* The structure of the allgather action for the rank 0 (total 4 processes) is the following:
746 0 allGather 275427 275427
748 1) 275427 is the sendcount
749 2) 275427 is the recvcount
750 3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().
752 double clock = smpi_process_simulated_elapsed();
754 CHECK_ACTION_PARAMS(action, 2, 2)
755 int sendcount=atoi(action[2]);
756 int recvcount=atoi(action[3]);
758 MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
760 if(action[4] && action[5]) {
761 MPI_CURRENT_TYPE = decode_datatype(action[4]);
762 MPI_CURRENT_TYPE2 = decode_datatype(action[5]);
764 MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
766 void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* smpi_datatype_size(MPI_CURRENT_TYPE));
767 void *recvbuf = smpi_get_tmp_recvbuffer(recvcount* smpi_datatype_size(MPI_CURRENT_TYPE2));
769 int rank = smpi_process_index();
770 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
771 extra->type = TRACING_ALLGATHER;
772 extra->send_size = sendcount;
773 extra->recv_size= recvcount;
774 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
775 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
776 extra->num_processes = smpi_comm_size(MPI_COMM_WORLD);
778 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
780 mpi_coll_allgather_fun(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcount, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
782 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
783 log_timed_action (action, clock);
786 static void action_allgatherv(const char *const *action) {
787 /* The structure of the allgatherv action for the rank 0 (total 4 processes) is the following:
788 0 allGatherV 275427 275427 275427 275427 204020
790 1) 275427 is the sendcount
791 2) The next four elements declare the recvcounts array
792 3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().
794 double clock = smpi_process_simulated_elapsed();
796 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
797 CHECK_ACTION_PARAMS(action, comm_size+1, 2)
798 int sendcount=atoi(action[2]);
799 int recvcounts[comm_size];
800 int disps[comm_size] = { 0 };
802 MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
804 if(action[3+comm_size] && action[4+comm_size]) {
805 MPI_CURRENT_TYPE = decode_datatype(action[3+comm_size]);
806 MPI_CURRENT_TYPE2 = decode_datatype(action[4+comm_size]);
808 MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
810 void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* smpi_datatype_size(MPI_CURRENT_TYPE));
812 for(int i=0;i<comm_size;i++) {
813 recvcounts[i] = atoi(action[i+3]);
814 recv_sum=recv_sum+recvcounts[i];
816 void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* smpi_datatype_size(MPI_CURRENT_TYPE2));
818 int rank = smpi_process_index();
819 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
820 extra->type = TRACING_ALLGATHERV;
821 extra->send_size = sendcount;
822 extra->recvcounts= xbt_new(int, comm_size);
823 for(int i=0; i< comm_size; i++)//copy data to avoid bad free
824 extra->recvcounts[i] = recvcounts[i];
825 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
826 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
827 extra->num_processes = comm_size;
829 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
831 mpi_coll_allgatherv_fun(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts, disps, MPI_CURRENT_TYPE2,
834 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
835 log_timed_action (action, clock);
838 static void action_allToAllv(const char *const *action) {
839 /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
840 0 allToAllV 100 1 7 10 12 100 1 70 10 5
842 1) 100 is the size of the send buffer *sizeof(int),
843 2) 1 7 10 12 is the sendcounts array
844 3) 100*sizeof(int) is the size of the receiver buffer
845 4) 1 70 10 5 is the recvcounts array
847 double clock = smpi_process_simulated_elapsed();
849 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
850 CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
851 int sendcounts[comm_size];
852 int recvcounts[comm_size];
853 int senddisps[comm_size] = { 0 };
854 int recvdisps[comm_size] = { 0 };
856 MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
858 int send_buf_size=parse_double(action[2]);
859 int recv_buf_size=parse_double(action[3+comm_size]);
860 if(action[4+2*comm_size] && action[5+2*comm_size]) {
861 MPI_CURRENT_TYPE=decode_datatype(action[4+2*comm_size]);
862 MPI_CURRENT_TYPE2=decode_datatype(action[5+2*comm_size]);
865 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
868 void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* smpi_datatype_size(MPI_CURRENT_TYPE));
869 void *recvbuf = smpi_get_tmp_recvbuffer(recv_buf_size* smpi_datatype_size(MPI_CURRENT_TYPE2));
871 for(int i=0;i<comm_size;i++) {
872 sendcounts[i] = atoi(action[i+3]);
873 recvcounts[i] = atoi(action[i+4+comm_size]);
876 int rank = smpi_process_index();
877 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
878 extra->type = TRACING_ALLTOALLV;
879 extra->recvcounts= xbt_new(int, comm_size);
880 extra->sendcounts= xbt_new(int, comm_size);
881 extra->num_processes = comm_size;
883 for(int i=0; i< comm_size; i++){//copy data to avoid bad free
884 extra->send_size += sendcounts[i];
885 extra->sendcounts[i] = sendcounts[i];
886 extra->recv_size += recvcounts[i];
887 extra->recvcounts[i] = recvcounts[i];
889 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
890 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
892 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
894 mpi_coll_alltoallv_fun(sendbuf, sendcounts, senddisps, MPI_CURRENT_TYPE,recvbuf, recvcounts, recvdisps,
895 MPI_CURRENT_TYPE, MPI_COMM_WORLD);
897 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
898 log_timed_action (action, clock);
// Entry point of an SMPI trace-replay process.
// It initializes the SMPI process and its tracing, and registers the replay action handlers when
// _xbt_replay_action_init() returns 0. It then optionally sleeps for a delayed start, runs the
// action replayer and waits for the requests still pending. Finally it finalizes and destroys the
// process. When active_processes is 0, it also prints the simulated time and releases the shared
// scratch buffers.
901 void smpi_replay_run(int *argc, char***argv){
902 /* First initializes everything */
903 smpi_process_init(argc, argv);
904 smpi_process_mark_as_initialized();
905 smpi_process_set_replaying(true);
907 int rank = smpi_process_index();
908 TRACE_smpi_init(rank);
909 TRACE_smpi_computing_init(rank);
910 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
911 extra->type = TRACING_INIT;
// NOTE(review): `operation` is a bprintf-allocated string that is reassigned for the finalize
// event below; confirm the first string is released before being overwritten.
912 char *operation =bprintf("%s_init",__FUNCTION__);
913 TRACE_smpi_collective_in(rank, -1, operation, extra);
914 TRACE_smpi_collective_out(rank, -1, operation);
// Map every trace action name to its replay handler.
917 if (_xbt_replay_action_init()==0) {
918 xbt_replay_action_register("init", action_init);
919 xbt_replay_action_register("finalize", action_finalize);
920 xbt_replay_action_register("comm_size", action_comm_size);
921 xbt_replay_action_register("comm_split", action_comm_split);
922 xbt_replay_action_register("comm_dup", action_comm_dup);
923 xbt_replay_action_register("send", action_send);
924 xbt_replay_action_register("Isend", action_Isend);
925 xbt_replay_action_register("recv", action_recv);
926 xbt_replay_action_register("Irecv", action_Irecv);
927 xbt_replay_action_register("test", action_test);
928 xbt_replay_action_register("wait", action_wait);
929 xbt_replay_action_register("waitAll", action_waitall);
930 xbt_replay_action_register("barrier", action_barrier);
931 xbt_replay_action_register("bcast", action_bcast);
932 xbt_replay_action_register("reduce", action_reduce);
933 xbt_replay_action_register("allReduce", action_allReduce);
934 xbt_replay_action_register("allToAll", action_allToAll);
935 xbt_replay_action_register("allToAllV", action_allToAllv);
936 xbt_replay_action_register("gather", action_gather);
937 xbt_replay_action_register("gatherV", action_gatherv);
938 xbt_replay_action_register("allGather", action_allgather);
939 xbt_replay_action_register("allGatherV", action_allgatherv);
940 xbt_replay_action_register("reduceScatter", action_reducescatter);
941 xbt_replay_action_register("compute", action_compute);
// The delay, when given, is read from (*argv)[2] as a flop count and must parse as a double.
944 //if we have a delayed start, sleep here.
947 double value = strtod((*argv)[2], &endptr);
949 THROWF(unknown_error, 0, "%s is not a double", (*argv)[2]);
950 XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
951 smpi_execute_flops(value);
953 //UGLY: force a context switch to be sure that all MSG_processes begin initialization
954 XBT_DEBUG("Force context switch by smpi_execute_flops - Sleeping for 0.0 flops ");
955 smpi_execute_flops(0.0);
958 /* Actually run the replay */
959 xbt_replay_action_runner(*argc, *argv);
961 /* and now, finalize everything */
962 /* One active process will stop. Decrease the counter*/
963 XBT_DEBUG("There are %zu elements in reqq[*]", get_reqq_self()->size());
// Complete the requests the trace left pending before tearing the process down.
964 if (!get_reqq_self()->empty()){
965 unsigned int count_requests=get_reqq_self()->size();
// NOTE(review): variable-length arrays are a compiler extension in C++; std::vector would be portable.
966 MPI_Request requests[count_requests];
967 MPI_Status status[count_requests];
970 for (auto req: *get_reqq_self()){
974 smpi_mpi_waitall(count_requests, requests, status);
978 if(active_processes==0){
979 /* Last process alive speaking: end the simulated timer */
980 XBT_INFO("Simulation time %f", smpi_process_simulated_elapsed());
981 _xbt_replay_action_exit();
// The scratch buffers are shared by every replaying process, so only the last one frees them.
982 xbt_free(sendbuffer);
983 xbt_free(recvbuffer);
986 instr_extra_data extra_fin = xbt_new0(s_instr_extra_data_t,1);
987 extra_fin->type = TRACING_FINALIZE;
988 operation =bprintf("%s_finalize",__FUNCTION__);
989 TRACE_smpi_collective_in(rank, -1, operation, extra_fin);
991 smpi_process_finalize();
993 TRACE_smpi_collective_out(rank, -1, operation);
994 TRACE_smpi_finalize(smpi_process_index());
995 smpi_process_destroy();