1 /* Copyright (c) 2009-2014. The SimGrid Team.
2 * All rights reserved. */
4 /* This program is free software; you can redistribute it and/or modify it
5 * under the terms of the license (GNU LGPL) which comes with this package. */
10 #include <xbt/replay.h>
/* Logging category for this file: sub-category "smpi_replay" of "smpi". */
12 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
/* Value read from the last "comm_size" trace action (see action_comm_size). */
14 int communicator_size = 0;
/* Set to smpi_process_count() by action_init; smpi_replay_finalize treats 0
 * as "this is the last process alive". */
15 static int active_processes = 0;
/* One dynar of pending MPI_Request per process, indexed by smpi_process_index(). */
16 xbt_dynar_t *reqq = NULL;
/* Datatype used when a trace line does not name one; chosen in action_init. */
18 MPI_Datatype MPI_DEFAULT_TYPE;
/* Datatype of the action currently being replayed. */
19 MPI_Datatype MPI_CURRENT_TYPE;
/* Shared scratch buffers and their current sizes, handed out by
 * smpi_get_tmp_sendbuffer() / smpi_get_tmp_recvbuffer() and released in
 * smpi_replay_finalize(). */
21 static int sendbuffer_size=0;
22 char* sendbuffer=NULL;
23 static int recvbuffer_size=0;
24 char* recvbuffer=NULL;
/* Log the fields of a replayed action followed by the simulated time elapsed
 * since `clock`. The joined string is only built when verbose logging is
 * enabled for the smpi_replay category. */
26 static void log_timed_action (const char *const *action, double clock){
27 if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
28 char *name = xbt_str_join_array(action, " ");
29 XBT_VERB("%s %f", name, smpi_process_simulated_elapsed()-clock);
34 //allocate a single buffer for all sends, growing it if needed
/* Outside replay mode a freshly xbt_malloc'ed block of `size` is returned
 * instead of the shared buffer. */
35 void* smpi_get_tmp_sendbuffer(int size){
36 if (!smpi_process_get_replaying())
37 return xbt_malloc(size);
38 if (sendbuffer_size<size){
39 sendbuffer=xbt_realloc(sendbuffer,size);
44 //allocate a single buffer for all recvs, growing it if needed
/* Outside replay mode a freshly xbt_malloc'ed block of `size` is returned
 * instead of the shared buffer. */
45 void* smpi_get_tmp_recvbuffer(int size){
46 if (!smpi_process_get_replaying())
47 return xbt_malloc(size);
48 if (recvbuffer_size<size){
49 recvbuffer=xbt_realloc(recvbuffer,size);
/* Counterpart of smpi_get_tmp_sendbuffer()/smpi_get_tmp_recvbuffer(): acts
 * only when the process is not replaying (the action taken is outside the
 * visible lines — presumably it frees `buf`; TODO confirm). */
55 void smpi_free_tmp_buffer(void* buf){
56 if (!smpi_process_get_replaying())
/* Convert `string` to a double with strtod().
 * Throws unknown_error ("<string> is not a double"); the condition guarding
 * the throw is outside the visible lines — presumably a trailing-garbage
 * check on endptr; TODO confirm. */
61 static double parse_double(const char *string)
65 value = strtod(string, &endptr);
67 THROWF(unknown_error, 0, "%s is not a double", string);
/* Translate the datatype field of a trace line into an MPI_Datatype.
 * The result is stored in the global MPI_CURRENT_TYPE and also returned.
 * The comparisons selecting each branch are outside the visible lines; the
 * last assignment falls back to MPI_DEFAULT_TYPE. */
71 static MPI_Datatype decode_datatype(const char *const action)
73 // Declared datatypes,
78 MPI_CURRENT_TYPE=MPI_DOUBLE;
81 MPI_CURRENT_TYPE=MPI_INT;
84 MPI_CURRENT_TYPE=MPI_CHAR;
87 MPI_CURRENT_TYPE=MPI_SHORT;
90 MPI_CURRENT_TYPE=MPI_LONG;
93 MPI_CURRENT_TYPE=MPI_FLOAT;
96 MPI_CURRENT_TYPE=MPI_BYTE;
99 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
102 return MPI_CURRENT_TYPE;
/* Inverse of decode_datatype(): return the trace-file identifier string for
 * `datatype`. MPI_BYTE, MPI_DOUBLE, MPI_INT, MPI_CHAR, MPI_SHORT, MPI_LONG
 * and MPI_FLOAT are recognised. `known` is an optional out-parameter
 * (callers in this file pass NULL); how it is written is outside the visible
 * lines. */
106 const char* encode_datatype(MPI_Datatype datatype, int* known)
109 //default type for output is set to MPI_BYTE
110 // MPI_DEFAULT_TYPE is not set for output, use directly MPI_BYTE
112 if (datatype==MPI_BYTE){
115 if(datatype==MPI_DOUBLE)
117 if(datatype==MPI_INT)
119 if(datatype==MPI_CHAR)
121 if(datatype==MPI_SHORT)
123 if(datatype==MPI_LONG)
125 if(datatype==MPI_FLOAT)
127 //tell that the datatype is not handled by replay, and that its size should be measured and replayed as size*MPI_BYTE
129 // default - not implemented.
130 // do not warn here as we pass in this function even for other trace formats
/* Validate the number of fields of a trace line.
 * Counts the entries of the NULL-terminated `action` array and throws
 * arg_error when the count does not fit: the first two entries are the
 * process id and the action name, followed by `mandatory` required and up
 * to `optional` extra arguments. The exact comparison is outside the
 * visible lines. */
134 #define CHECK_ACTION_PARAMS(action, mandatory, optional) {\
136 while(action[i]!=NULL)\
139 THROWF(arg_error, 0, "%s replay failed.\n" \
140 "%d items were given on the line. First two should be process_id and action. " \
141 "This action needs after them %d mandatory arguments, and accepts %d optional ones. \n" \
142 "Please contact the Simgrid team if support is needed", __FUNCTION__, i, mandatory, optional);\
/* Replay handler for "init".
 * Chooses the default datatype (MPI_DOUBLE when an optional third field is
 * present, MPI_BYTE otherwise), starts the simulated timer, records the
 * number of processes and creates one request dynar per process in reqq. */
146 static void action_init(const char *const *action)
149 XBT_DEBUG("Initialize the counters");
150 CHECK_ACTION_PARAMS(action, 0, 1);
151 if(action[2]) MPI_DEFAULT_TYPE= MPI_DOUBLE; // default MPE datatype
152 else MPI_DEFAULT_TYPE= MPI_BYTE; // default TAU datatype
154 /* start a simulated timer */
155 smpi_process_simulated_start();
156 /*initialize the number of active processes */
157 active_processes = smpi_process_count();
160 reqq=xbt_new0(xbt_dynar_t,active_processes);
162 for(i=0;i<active_processes;i++){
163 reqq[i]=xbt_dynar_new(sizeof(MPI_Request),&xbt_free_ref);
/* Replay handler registered for the "finalize" trace action. */
168 static void action_finalize(const char *const *action)
/* Replay handler for "comm_size": stores the value of action[2] in the
 * global communicator_size and logs the action. */
172 static void action_comm_size(const char *const *action)
174 double clock = smpi_process_simulated_elapsed();
176 communicator_size = parse_double(action[2]);
177 log_timed_action (action, clock);
/* Replay handler for "comm_split": in the visible lines it only logs the
 * action with its elapsed simulated time. */
180 static void action_comm_split(const char *const *action)
182 double clock = smpi_process_simulated_elapsed();
184 log_timed_action (action, clock);
/* Replay handler for "comm_dup": in the visible lines it only logs the
 * action with its elapsed simulated time. */
187 static void action_comm_dup(const char *const *action)
189 double clock = smpi_process_simulated_elapsed();
191 log_timed_action (action, clock);
/* Replay handler for "compute".
 * action[2] is the amount of flops to execute; the computation is bracketed
 * by TRACE_smpi_computing_in/out events for the current process. */
194 static void action_compute(const char *const *action)
196 CHECK_ACTION_PARAMS(action, 1, 0);
197 double clock = smpi_process_simulated_elapsed();
198 double flops= parse_double(action[2]);
200 int rank = smpi_process_index();
201 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
202 extra->type=TRACING_COMPUTING;
203 extra->comp_size=flops;
204 TRACE_smpi_computing_in(rank, extra);
206 smpi_execute_flops(flops);
208 TRACE_smpi_computing_out(rank);
211 log_timed_action (action, clock);
/* Replay handler for "send" (blocking send on MPI_COMM_WORLD, tag 0).
 * action[2] = destination, action[3] = element count,
 * optional action[4] = datatype (MPI_DEFAULT_TYPE otherwise).
 * No payload is transferred: the send buffer is NULL. */
214 static void action_send(const char *const *action)
216 CHECK_ACTION_PARAMS(action, 2, 1);
217 int to = atoi(action[2]);
218 double size=parse_double(action[3]);
219 double clock = smpi_process_simulated_elapsed();
222 MPI_CURRENT_TYPE=decode_datatype(action[4]);
224 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
228 int rank = smpi_process_index();
230 int dst_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), to);
231 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
232 extra->type = TRACING_SEND;
233 extra->send_size = size;
235 extra->dst = dst_traced;
236 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
237 TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
238 TRACE_smpi_send(rank, rank, dst_traced, size*smpi_datatype_size(MPI_CURRENT_TYPE));
241 smpi_mpi_send(NULL, size, MPI_CURRENT_TYPE, to , 0, MPI_COMM_WORLD);
243 log_timed_action (action, clock);
246 TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
/* Replay handler for "Isend" (non-blocking send on MPI_COMM_WORLD, tag 0).
 * action[2] = destination, action[3] = element count,
 * optional action[4] = datatype (MPI_DEFAULT_TYPE otherwise).
 * The resulting request is pushed on this process' entry of reqq so that a
 * later test/wait/waitAll action can pick it up. */
251 static void action_Isend(const char *const *action)
253 CHECK_ACTION_PARAMS(action, 2, 1);
254 int to = atoi(action[2]);
255 double size=parse_double(action[3]);
256 double clock = smpi_process_simulated_elapsed();
259 if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
260 else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
263 int rank = smpi_process_index();
264 int dst_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), to);
265 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
266 extra->type = TRACING_ISEND;
267 extra->send_size = size;
269 extra->dst = dst_traced;
270 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
271 TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
272 TRACE_smpi_send(rank, rank, dst_traced, size*smpi_datatype_size(MPI_CURRENT_TYPE));
275 request = smpi_mpi_isend(NULL, size, MPI_CURRENT_TYPE, to, 0,MPI_COMM_WORLD);
278 TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
282 xbt_dynar_push(reqq[smpi_process_index()],&request);
284 log_timed_action (action, clock);
/* Replay handler for "recv" (blocking receive on MPI_COMM_WORLD, tag 0).
 * action[2] = source, action[3] = element count,
 * optional action[4] = datatype (MPI_DEFAULT_TYPE otherwise).
 * A probe is issued for the case where the size is unknown to the receiver;
 * the condition selecting that path is outside the visible lines. */
287 static void action_recv(const char *const *action) {
288 CHECK_ACTION_PARAMS(action, 2, 1);
289 int from = atoi(action[2]);
290 double size=parse_double(action[3]);
291 double clock = smpi_process_simulated_elapsed();
294 if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
295 else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
298 int rank = smpi_process_index();
299 int src_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), from);
301 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
302 extra->type = TRACING_RECV;
303 extra->send_size = size;
304 extra->src = src_traced;
306 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
307 TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
310 //unknown size from the receiver pov
312 smpi_mpi_probe(from, 0, MPI_COMM_WORLD, &status);
316 smpi_mpi_recv(NULL, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD, &status);
319 TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
320 TRACE_smpi_recv(rank, src_traced, rank);
323 log_timed_action (action, clock);
/* Replay handler for "Irecv" (non-blocking receive on MPI_COMM_WORLD, tag 0).
 * action[2] = source, action[3] = element count,
 * optional action[4] = datatype (MPI_DEFAULT_TYPE otherwise).
 * The resulting request is pushed on this process' entry of reqq so that a
 * later test/wait/waitAll action can pick it up. */
326 static void action_Irecv(const char *const *action)
328 CHECK_ACTION_PARAMS(action, 2, 1);
329 int from = atoi(action[2]);
330 double size=parse_double(action[3]);
331 double clock = smpi_process_simulated_elapsed();
334 if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
335 else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
338 int rank = smpi_process_index();
339 int src_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), from);
340 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
341 extra->type = TRACING_IRECV;
342 extra->send_size = size;
343 extra->src = src_traced;
345 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
346 TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
349 //unknown size from the receiver pov
351 smpi_mpi_probe(from, 0, MPI_COMM_WORLD, &status);
355 request = smpi_mpi_irecv(NULL, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD);
358 TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
361 xbt_dynar_push(reqq[smpi_process_index()],&request);
363 log_timed_action (action, clock);
/* Replay handler for "test" (takes no argument).
 * Pops the most recent request of this process from reqq, runs
 * smpi_mpi_test() on it and pushes the request back so that a subsequent
 * wait can find it. */
366 static void action_test(const char *const *action){
367 CHECK_ACTION_PARAMS(action, 0, 0);
368 double clock = smpi_process_simulated_elapsed();
373 request = xbt_dynar_pop_as(reqq[smpi_process_index()],MPI_Request);
374 //if request is null here, this may mean that a previous test has succeeded
375 //Different times in traced application and replayed version may lead to this
376 //In this case, ignore the extra calls.
379 int rank = smpi_process_index();
380 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
381 extra->type=TRACING_TEST;
382 TRACE_smpi_testing_in(rank, extra);
385 flag = smpi_mpi_test(&request, &status);
387 XBT_DEBUG("MPI_Test result: %d", flag);
388 /* push back request in dynar to be caught by a subsequent wait. if the test
389 * did succeed, the request is now NULL.
391 xbt_dynar_push_as(reqq[smpi_process_index()],MPI_Request, request);
394 TRACE_smpi_testing_out(rank);
397 log_timed_action (action, clock);
/* Replay handler for "wait" (takes no argument).
 * Asserts that this process has at least one pending request, pops the most
 * recent one from reqq and waits for it. The source/destination/receive
 * flag are read from the request before smpi_mpi_wait() is called, and a
 * receive event is traced afterwards when the request was a receive. */
400 static void action_wait(const char *const *action){
401 CHECK_ACTION_PARAMS(action, 0, 0);
402 double clock = smpi_process_simulated_elapsed();
406 xbt_assert(xbt_dynar_length(reqq[smpi_process_index()]),
407 "action wait not preceded by any irecv or isend: %s",
408 xbt_str_join_array(action," "));
409 request = xbt_dynar_pop_as(reqq[smpi_process_index()],MPI_Request);
412 /* Assuming that the trace is well formed, this mean the comm might have
413 * been caught by a MPI_test. Then just return.
419 int rank = request->comm != MPI_COMM_NULL
420 ? smpi_comm_rank(request->comm)
423 MPI_Group group = smpi_comm_group(request->comm);
424 int src_traced = smpi_group_rank(group, request->src);
425 int dst_traced = smpi_group_rank(group, request->dst);
426 int is_wait_for_receive = request->recv;
427 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
428 extra->type = TRACING_WAIT;
429 TRACE_smpi_ptp_in(rank, src_traced, dst_traced, __FUNCTION__, extra);
431 smpi_mpi_wait(&request, &status);
433 TRACE_smpi_ptp_out(rank, src_traced, dst_traced, __FUNCTION__);
434 if (is_wait_for_receive) {
435 TRACE_smpi_recv(rank, src_traced, dst_traced);
439 log_timed_action (action, clock);
/* Replay handler for "waitAll" (takes no argument).
 * Copies every pending request of this process into a local array, records
 * each request's src/dst/recv fields in three int dynars (needed for tracing
 * after the wait), calls smpi_mpi_waitall(), traces a receive event for each
 * request that was a receive, and finally replaces this process' request
 * dynar with a fresh empty one.
 * NOTE(review): the xbt_new'ed ints are only used as copy sources for
 * xbt_dynar_insert_at(); their release is not among the visible lines —
 * confirm they are freed. */
442 static void action_waitall(const char *const *action){
443 CHECK_ACTION_PARAMS(action, 0, 0);
444 double clock = smpi_process_simulated_elapsed();
445 int count_requests=0;
448 count_requests=xbt_dynar_length(reqq[smpi_process_index()]);
450 if (count_requests>0) {
451 MPI_Request requests[count_requests];
452 MPI_Status status[count_requests];
454 /* The reqq is an array of dynars. Its index corresponds to the rank.
455 Thus each rank saves its own requests to the array request. */
456 xbt_dynar_foreach(reqq[smpi_process_index()],i,requests[i]);
459 //save information from requests
461 xbt_dynar_t srcs = xbt_dynar_new(sizeof(int), NULL);
462 xbt_dynar_t dsts = xbt_dynar_new(sizeof(int), NULL);
463 xbt_dynar_t recvs = xbt_dynar_new(sizeof(int), NULL);
464 for (i = 0; i < count_requests; i++) {
466 int *asrc = xbt_new(int, 1);
467 int *adst = xbt_new(int, 1);
468 int *arecv = xbt_new(int, 1);
469 *asrc = requests[i]->src;
470 *adst = requests[i]->dst;
471 *arecv = requests[i]->recv;
472 xbt_dynar_insert_at(srcs, i, asrc);
473 xbt_dynar_insert_at(dsts, i, adst);
474 xbt_dynar_insert_at(recvs, i, arecv);
479 int *t = xbt_new(int, 1);
480 xbt_dynar_insert_at(srcs, i, t);
481 xbt_dynar_insert_at(dsts, i, t);
482 xbt_dynar_insert_at(recvs, i, t);
486 int rank_traced = smpi_process_index();
487 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
488 extra->type = TRACING_WAITALL;
489 extra->send_size=count_requests;
490 TRACE_smpi_ptp_in(rank_traced, -1, -1, __FUNCTION__,extra);
493 smpi_mpi_waitall(count_requests, requests, status);
496 for (i = 0; i < count_requests; i++) {
497 int src_traced, dst_traced, is_wait_for_receive;
498 xbt_dynar_get_cpy(srcs, i, &src_traced);
499 xbt_dynar_get_cpy(dsts, i, &dst_traced);
500 xbt_dynar_get_cpy(recvs, i, &is_wait_for_receive);
501 if (is_wait_for_receive) {
502 TRACE_smpi_recv(rank_traced, src_traced, dst_traced);
505 TRACE_smpi_ptp_out(rank_traced, -1, -1, __FUNCTION__);
507 xbt_dynar_free(&srcs);
508 xbt_dynar_free(&dsts);
509 xbt_dynar_free(&recvs);
512 int freedrank=smpi_process_index();
513 xbt_dynar_free_container(&(reqq[freedrank]));
514 reqq[freedrank]=xbt_dynar_new(sizeof(MPI_Request),&xbt_free_ref);
516 log_timed_action (action, clock);
/* Replay handler for "barrier": runs the selected barrier algorithm on
 * MPI_COMM_WORLD between collective in/out trace events. */
519 static void action_barrier(const char *const *action){
520 double clock = smpi_process_simulated_elapsed();
522 int rank = smpi_process_index();
523 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
524 extra->type = TRACING_BARRIER;
525 TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
527 mpi_coll_barrier_fun(MPI_COMM_WORLD);
529 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
532 log_timed_action (action, clock);
/* Replay handler for "bcast".
 * action[2] = element count, optional action[3] = root,
 * optional action[4] = datatype (MPI_DEFAULT_TYPE otherwise).
 * NOTE(review): root_traced is computed with smpi_group_index() here while
 * action_reduce and the point-to-point handlers use smpi_group_rank() —
 * confirm which one is intended. */
536 static void action_bcast(const char *const *action)
538 CHECK_ACTION_PARAMS(action, 1, 2);
539 double size = parse_double(action[2]);
540 double clock = smpi_process_simulated_elapsed();
543 * Initialize MPI_CURRENT_TYPE in order to decrease
544 * the number of the checks
546 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
549 root= atoi(action[3]);
551 MPI_CURRENT_TYPE=decode_datatype(action[4]);
556 int rank = smpi_process_index();
557 int root_traced = smpi_group_index(smpi_comm_group(MPI_COMM_WORLD), root);
559 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
560 extra->type = TRACING_BCAST;
561 extra->send_size = size;
562 extra->root = root_traced;
563 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
564 TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__, extra);
567 void *sendbuf = smpi_get_tmp_sendbuffer(size* smpi_datatype_size(MPI_CURRENT_TYPE));
568 mpi_coll_bcast_fun(sendbuf, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
570 TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
572 log_timed_action (action, clock);
/* Replay handler for "reduce".
 * action[2] = communication size (element count), action[3] = computation
 * size in flops, optional action[4] = root, optional action[5] = datatype
 * (MPI_DEFAULT_TYPE otherwise).
 * The reduction itself is replayed with MPI_OP_NULL; its computation cost is
 * injected afterwards with smpi_execute_flops(). */
575 static void action_reduce(const char *const *action)
577 CHECK_ACTION_PARAMS(action, 2, 2);
578 double comm_size = parse_double(action[2]);
579 double comp_size = parse_double(action[3]);
580 double clock = smpi_process_simulated_elapsed();
582 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
585 root= atoi(action[4]);
587 MPI_CURRENT_TYPE=decode_datatype(action[5]);
594 int rank = smpi_process_index();
595 int root_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), root);
596 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
597 extra->type = TRACING_REDUCE;
598 extra->send_size = comm_size;
599 extra->comp_size = comp_size;
600 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
601 extra->root = root_traced;
603 TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__,extra);
/* Take the receive side from the receive scratch buffer, as the other
 * collectives do: the send scratch buffer is a single shared allocation, so
 * requesting it twice would make recvbuf alias sendbuf while replaying. */
605 void *recvbuf = smpi_get_tmp_recvbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
606 void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
607 mpi_coll_reduce_fun(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
608 smpi_execute_flops(comp_size);
610 TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
612 log_timed_action (action, clock);
/* Replay handler for "allReduce".
 * action[2] = communication size (element count), action[3] = computation
 * size in flops, optional action[4] = datatype (MPI_DEFAULT_TYPE otherwise).
 * The reduction itself is replayed with MPI_OP_NULL; its computation cost is
 * injected afterwards with smpi_execute_flops(). */
615 static void action_allReduce(const char *const *action) {
616 CHECK_ACTION_PARAMS(action, 2, 1);
617 double comm_size = parse_double(action[2]);
618 double comp_size = parse_double(action[3]);
620 if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
621 else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
623 double clock = smpi_process_simulated_elapsed();
625 int rank = smpi_process_index();
626 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
627 extra->type = TRACING_ALLREDUCE;
628 extra->send_size = comm_size;
629 extra->comp_size = comp_size;
630 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
632 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
/* Take the receive side from the receive scratch buffer, as the other
 * collectives do: the send scratch buffer is a single shared allocation, so
 * requesting it twice would make recvbuf alias sendbuf while replaying. */
634 void *recvbuf = smpi_get_tmp_recvbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
635 void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
636 mpi_coll_allreduce_fun(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
637 smpi_execute_flops(comp_size);
639 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
641 log_timed_action (action, clock);
/* Replay handler for "allToAll".
 * action[2] = send count, action[3] = receive count (both per peer),
 * optional action[4] / action[5] = send / receive datatypes
 * (MPI_DEFAULT_TYPE otherwise). */
644 static void action_allToAll(const char *const *action) {
/* Reject short or over-long trace lines before action[2]/action[3] are
 * dereferenced, as every other handler that reads arguments does. */
CHECK_ACTION_PARAMS(action, 2, 2);
645 double clock = smpi_process_simulated_elapsed();
646 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
647 int send_size = parse_double(action[2]);
648 int recv_size = parse_double(action[3]);
649 MPI_Datatype MPI_CURRENT_TYPE2;
652 MPI_CURRENT_TYPE=decode_datatype(action[4]);
653 MPI_CURRENT_TYPE2=decode_datatype(action[5]);
656 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
657 MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
659 void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
660 void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE2));
663 int rank = smpi_process_index();
664 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
665 extra->type = TRACING_ALLTOALL;
666 extra->send_size = send_size;
667 extra->recv_size = recv_size;
668 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
669 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, NULL);
671 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
674 mpi_coll_alltoall_fun(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
677 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
679 log_timed_action (action, clock);
/* Replay handler for "gather".
 * action[2] = send count, action[3] = receive count, optional action[4] =
 * root, optional action[5] / action[6] = send / receive datatypes
 * (MPI_DEFAULT_TYPE otherwise). The receive buffer is sized for
 * recv_size * communicator size elements. */
684 static void action_gather(const char *const *action) {
686 The structure of the gather action for the rank 0 (total 4 processes)
691 1) 68 is the sendcounts
692 2) 68 is the recvcounts
693 3) 0 is the root node
694 4) 0 is the send datatype id, see decode_datatype()
695 5) 0 is the recv datatype id, see decode_datatype()
697 CHECK_ACTION_PARAMS(action, 2, 3);
698 double clock = smpi_process_simulated_elapsed();
699 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
700 int send_size = parse_double(action[2]);
701 int recv_size = parse_double(action[3]);
702 MPI_Datatype MPI_CURRENT_TYPE2;
704 MPI_CURRENT_TYPE=decode_datatype(action[5]);
705 MPI_CURRENT_TYPE2=decode_datatype(action[6]);
707 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
708 MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
710 void *send = smpi_get_tmp_sendbuffer(send_size* smpi_datatype_size(MPI_CURRENT_TYPE));
714 root=atoi(action[4]);
715 int rank = smpi_comm_rank(MPI_COMM_WORLD);
718 recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE2));
721 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
722 extra->type = TRACING_GATHER;
723 extra->send_size = send_size;
724 extra->recv_size = recv_size;
726 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
727 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, NULL);
729 TRACE_smpi_collective_in(smpi_process_index(), root, __FUNCTION__, extra);
731 mpi_coll_gather_fun(send, send_size, MPI_CURRENT_TYPE,
732 recv, recv_size, MPI_CURRENT_TYPE2,
733 root, MPI_COMM_WORLD);
736 TRACE_smpi_collective_out(smpi_process_index(), -1, __FUNCTION__);
738 log_timed_action (action, clock);
/* Replay handler for "gatherV" (that is the keyword it is registered under;
 * the example line below spells it "gather").
 * action[2] = send count, action[3 .. 2+comm_size] = per-rank receive
 * counts, action[3+comm_size] = root, optional action[4+comm_size] /
 * action[5+comm_size] = send / receive datatypes (MPI_DEFAULT_TYPE
 * otherwise). The displacement array is allocated zero-filled. */
744 static void action_gatherv(const char *const *action) {
746 The structure of the gatherv action for the rank 0 (total 4 processes)
748 0 gather 68 68 10 10 10 0 0 0
751 1) 68 is the sendcount
752 2) 68 10 10 10 is the recvcounts
753 3) 0 is the root node
754 4) 0 is the send datatype id, see decode_datatype()
755 5) 0 is the recv datatype id, see decode_datatype()
758 double clock = smpi_process_simulated_elapsed();
759 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
760 CHECK_ACTION_PARAMS(action, comm_size+1, 2);
761 int send_size = parse_double(action[2]);
762 int *disps = xbt_new0(int, comm_size);
763 int *recvcounts = xbt_new0(int, comm_size);
766 MPI_Datatype MPI_CURRENT_TYPE2;
767 if(action[4+comm_size]) {
768 MPI_CURRENT_TYPE=decode_datatype(action[4+comm_size]);
769 MPI_CURRENT_TYPE2=decode_datatype(action[5+comm_size]);
771 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
772 MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
774 void *send = smpi_get_tmp_sendbuffer(send_size* smpi_datatype_size(MPI_CURRENT_TYPE));
776 for(i=0;i<comm_size;i++) {
777 recvcounts[i] = atoi(action[i+3]);
778 recv_sum=recv_sum+recvcounts[i];
782 int root=atoi(action[3+comm_size]);
783 int rank = smpi_comm_rank(MPI_COMM_WORLD);;
786 recv = smpi_get_tmp_recvbuffer(recv_sum* smpi_datatype_size(MPI_CURRENT_TYPE2));
789 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
790 extra->type = TRACING_GATHERV;
791 extra->send_size = send_size;
792 extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
793 for(i=0; i< comm_size; i++)//copy data to avoid bad free
794 extra->recvcounts[i] = recvcounts[i];
796 extra->num_processes = comm_size;
797 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
798 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, NULL);
800 TRACE_smpi_collective_in(smpi_process_index(), root, __FUNCTION__, extra);
802 smpi_mpi_gatherv(send, send_size, MPI_CURRENT_TYPE,
803 recv, recvcounts, disps, MPI_CURRENT_TYPE2,
804 root, MPI_COMM_WORLD);
807 TRACE_smpi_collective_out(smpi_process_index(), -1, __FUNCTION__);
810 log_timed_action (action, clock);
811 xbt_free(recvcounts);
/* Replay handler for "reduceScatter".
 * action[2 .. 1+comm_size] = per-rank receive counts,
 * action[2+comm_size] = computation size in flops, optional
 * action[3+comm_size] = datatype (MPI_DEFAULT_TYPE otherwise).
 * The reduction is replayed with MPI_OP_NULL, then the computation cost is
 * injected with smpi_execute_flops(). `size`, used for both scratch
 * buffers, is computed outside the visible lines. */
815 static void action_reducescatter(const char *const *action) {
818 The structure of the reducescatter action for the rank 0 (total 4 processes)
820 0 reduceScatter 275427 275427 275427 204020 11346849 0
823 1) The first four values after the name of the action declare the recvcounts array
824 2) The value 11346849 is the amount of instructions
825 3) The last value corresponds to the datatype, see decode_datatype().
827 We analyze a MPI_Reduce_scatter call to one MPI_Reduce and one MPI_Scatterv.
831 double clock = smpi_process_simulated_elapsed();
832 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
833 CHECK_ACTION_PARAMS(action, comm_size+1, 1);
834 int comp_size = parse_double(action[2+comm_size]);
835 int *recvcounts = xbt_new0(int, comm_size);
836 int *disps = xbt_new0(int, comm_size);
838 int rank = smpi_process_index();
840 if(action[3+comm_size])
841 MPI_CURRENT_TYPE=decode_datatype(action[3+comm_size]);
843 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
845 for(i=0;i<comm_size;i++) {
846 recvcounts[i] = atoi(action[i+2]);
852 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
853 extra->type = TRACING_REDUCE_SCATTER;
854 extra->send_size = 0;
855 extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
856 for(i=0; i< comm_size; i++)//copy data to avoid bad free
857 extra->recvcounts[i] = recvcounts[i];
858 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
859 extra->comp_size = comp_size;
860 extra->num_processes = comm_size;
862 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
864 void *sendbuf = smpi_get_tmp_sendbuffer(size* smpi_datatype_size(MPI_CURRENT_TYPE));
865 void *recvbuf = smpi_get_tmp_recvbuffer(size* smpi_datatype_size(MPI_CURRENT_TYPE));
867 mpi_coll_reduce_scatter_fun(sendbuf, recvbuf, recvcounts, MPI_CURRENT_TYPE, MPI_OP_NULL,
869 smpi_execute_flops(comp_size);
873 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
875 xbt_free(recvcounts);
877 log_timed_action (action, clock);
/* Replay handler for "allGather".
 * action[2] = send count, action[3] = receive count, optional action[4] /
 * action[5] = send / receive datatypes (MPI_DEFAULT_TYPE otherwise). */
880 static void action_allgather(const char *const *action) {
882 The structure of the allgather action for the rank 0 (total 4 processes)
884 0 allGather 275427 275427
887 1) 275427 is the sendcount
888 2) 275427 is the recvcount
889 3) No more values mean that the datatype for sent and receive buffer
890 is the default one, see decode_datatype().
894 double clock = smpi_process_simulated_elapsed();
896 CHECK_ACTION_PARAMS(action, 2, 2);
897 int sendcount=atoi(action[2]);
898 int recvcount=atoi(action[3]);
900 MPI_Datatype MPI_CURRENT_TYPE2;
/* action[3] is the mandatory recvcount, so the two optional datatype fields
 * are action[4] and action[5].
 * NOTE(review): the guard selecting this branch is outside the visible
 * lines — confirm it tests action[4]. */
903 MPI_CURRENT_TYPE = decode_datatype(action[4]);
904 MPI_CURRENT_TYPE2 = decode_datatype(action[5]);
906 MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
907 MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
909 void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* smpi_datatype_size(MPI_CURRENT_TYPE));
910 void *recvbuf = smpi_get_tmp_recvbuffer(recvcount* smpi_datatype_size(MPI_CURRENT_TYPE2));
913 int rank = smpi_process_index();
914 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
915 extra->type = TRACING_ALLGATHER;
916 extra->send_size = sendcount;
917 extra->recv_size= recvcount;
918 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
919 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, NULL);
920 extra->num_processes = smpi_comm_size(MPI_COMM_WORLD);
922 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
925 mpi_coll_allgather_fun(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcount, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
928 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
931 log_timed_action (action, clock);
/* Replay handler for "allGatherV".
 * action[2] = send count, action[3 .. 2+comm_size] = per-rank receive
 * counts, optional action[3+comm_size] / action[4+comm_size] = send /
 * receive datatypes (MPI_DEFAULT_TYPE otherwise). The receive buffer is
 * sized for the sum of the receive counts; the displacement array is
 * allocated zero-filled. */
934 static void action_allgatherv(const char *const *action) {
937 The structure of the allgatherv action for the rank 0 (total 4 processes)
939 0 allGatherV 275427 275427 275427 275427 204020
942 1) 275427 is the sendcount
943 2) The next four elements declare the recvcounts array
944 3) No more values mean that the datatype for sent and receive buffer
945 is the default one, see decode_datatype().
949 double clock = smpi_process_simulated_elapsed();
951 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
952 CHECK_ACTION_PARAMS(action, comm_size+1, 2);
954 int sendcount=atoi(action[2]);
955 int *recvcounts = xbt_new0(int, comm_size);
956 int *disps = xbt_new0(int, comm_size);
958 MPI_Datatype MPI_CURRENT_TYPE2;
960 if(action[3+comm_size]) {
961 MPI_CURRENT_TYPE = decode_datatype(action[3+comm_size]);
962 MPI_CURRENT_TYPE2 = decode_datatype(action[4+comm_size]);
964 MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
965 MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
967 void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* smpi_datatype_size(MPI_CURRENT_TYPE));
969 for(i=0;i<comm_size;i++) {
970 recvcounts[i] = atoi(action[i+3]);
971 recv_sum=recv_sum+recvcounts[i];
973 void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* smpi_datatype_size(MPI_CURRENT_TYPE2));
976 int rank = smpi_process_index();
977 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
978 extra->type = TRACING_ALLGATHERV;
979 extra->send_size = sendcount;
980 extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
981 for(i=0; i< comm_size; i++)//copy data to avoid bad free
982 extra->recvcounts[i] = recvcounts[i];
983 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
984 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, NULL);
985 extra->num_processes = comm_size;
987 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
990 mpi_coll_allgatherv_fun(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts, disps, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
993 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
996 log_timed_action (action, clock);
997 xbt_free(recvcounts);
/* Replay handler for "allToAllV".
 * action[2] = send buffer size, action[3 .. 2+comm_size] = send counts,
 * action[3+comm_size] = receive buffer size,
 * action[4+comm_size .. 3+2*comm_size] = receive counts, optional
 * action[4+2*comm_size] / action[5+2*comm_size] = send / receive datatypes
 * (MPI_DEFAULT_TYPE otherwise). Both displacement arrays are allocated
 * zero-filled. */
1001 static void action_allToAllv(const char *const *action) {
1003 The structure of the allToAllV action for the rank 0 (total 4 processes)
1005 0 allToAllV 100 1 7 10 12 100 1 70 10 5
1008 1) 100 is the size of the send buffer *sizeof(int),
1009 2) 1 7 10 12 is the sendcounts array
1010 3) 100*sizeof(int) is the size of the receiver buffer
1011 4) 1 70 10 5 is the recvcounts array
1016 double clock = smpi_process_simulated_elapsed();
1018 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
1019 CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2);
1020 int send_buf_size=0,recv_buf_size=0,i=0;
1021 int *sendcounts = xbt_new0(int, comm_size);
1022 int *recvcounts = xbt_new0(int, comm_size);
1023 int *senddisps = xbt_new0(int, comm_size);
1024 int *recvdisps = xbt_new0(int, comm_size);
1026 MPI_Datatype MPI_CURRENT_TYPE2;
1028 send_buf_size=parse_double(action[2]);
1029 recv_buf_size=parse_double(action[3+comm_size]);
1030 if(action[4+2*comm_size]) {
1031 MPI_CURRENT_TYPE=decode_datatype(action[4+2*comm_size]);
1032 MPI_CURRENT_TYPE2=decode_datatype(action[5+2*comm_size]);
1035 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
1036 MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
1039 void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* smpi_datatype_size(MPI_CURRENT_TYPE));
1040 void *recvbuf = smpi_get_tmp_recvbuffer(recv_buf_size* smpi_datatype_size(MPI_CURRENT_TYPE2));
1042 for(i=0;i<comm_size;i++) {
1043 sendcounts[i] = atoi(action[i+3]);
1044 recvcounts[i] = atoi(action[i+4+comm_size]);
1049 int rank = smpi_process_index();
1050 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
1051 extra->type = TRACING_ALLTOALLV;
1052 extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
1053 extra->sendcounts= xbt_malloc(comm_size*sizeof(int));
1054 extra->num_processes = comm_size;
1056 for(i=0; i< comm_size; i++){//copy data to avoid bad free
1057 extra->send_size += sendcounts[i];
1058 extra->sendcounts[i] = sendcounts[i];
1059 extra->recv_size += recvcounts[i];
1060 extra->recvcounts[i] = recvcounts[i];
1062 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
1063 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, NULL);
1065 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
/* The receive side uses MPI_CURRENT_TYPE2: that is the datatype recvbuf was
 * sized with and the one traced as datatype2. */
1067 mpi_coll_alltoallv_fun(sendbuf, sendcounts, senddisps, MPI_CURRENT_TYPE,
1068 recvbuf, recvcounts, recvdisps, MPI_CURRENT_TYPE2,
1071 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
1074 log_timed_action (action, clock);
1075 xbt_free(sendcounts);
1076 xbt_free(recvcounts);
1077 xbt_free(senddisps);
1078 xbt_free(recvdisps);
/* Entry point of a replayed SMPI process.
 * Initializes the SMPI process and flags it as replaying, emits the init
 * trace events, registers the handler of every supported trace action (done
 * by the process whose index is 0), optionally executes a delayed-start
 * amount of flops parsed from (*argv)[2], then hands control to
 * xbt_replay_action_runner().
 * Throws unknown_error when (*argv)[2] is not entirely a number. */
1081 void smpi_replay_init(int *argc, char***argv){
1082 smpi_process_init(argc, argv);
1083 smpi_process_mark_as_initialized();
1084 smpi_process_set_replaying(1);
1086 int rank = smpi_process_index();
1087 TRACE_smpi_init(rank);
1088 TRACE_smpi_computing_init(rank);
1089 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
1090 extra->type = TRACING_INIT;
1091 TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
1092 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
1095 if (!smpi_process_index()){
1096 _xbt_replay_action_init();
1097 xbt_replay_action_register("init", action_init);
1098 xbt_replay_action_register("finalize", action_finalize);
1099 xbt_replay_action_register("comm_size", action_comm_size);
1100 xbt_replay_action_register("comm_split", action_comm_split);
1101 xbt_replay_action_register("comm_dup", action_comm_dup);
1102 xbt_replay_action_register("send", action_send);
1103 xbt_replay_action_register("Isend", action_Isend);
1104 xbt_replay_action_register("recv", action_recv);
1105 xbt_replay_action_register("Irecv", action_Irecv);
1106 xbt_replay_action_register("test", action_test);
1107 xbt_replay_action_register("wait", action_wait);
1108 xbt_replay_action_register("waitAll", action_waitall);
1109 xbt_replay_action_register("barrier", action_barrier);
1110 xbt_replay_action_register("bcast", action_bcast);
1111 xbt_replay_action_register("reduce", action_reduce);
1112 xbt_replay_action_register("allReduce", action_allReduce);
1113 xbt_replay_action_register("allToAll", action_allToAll);
1114 xbt_replay_action_register("allToAllV", action_allToAllv);
1115 xbt_replay_action_register("gather", action_gather);
1116 xbt_replay_action_register("gatherV", action_gatherv);
1117 xbt_replay_action_register("allGather", action_allgather);
1118 xbt_replay_action_register("allGatherV", action_allgatherv);
1119 xbt_replay_action_register("reduceScatter", action_reducescatter);
1120 xbt_replay_action_register("compute", action_compute);
1123 //if we have a delayed start, sleep here.
1126 double value = strtod((*argv)[2], &endptr);
1127 if (*endptr != '\0')
1128 THROWF(unknown_error, 0, "%s is not a double", (*argv)[2]);
1129 XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
1130 smpi_execute_flops(value);
1132 xbt_replay_action_runner(*argc, *argv);
/* Tear-down of a replayed SMPI process.
 * Waits for every request still pending for this process, releases its
 * request dynar, and — when active_processes has reached 0, i.e. this is the
 * last process — reports the simulated time, shuts the replay machinery
 * down and frees the shared scratch buffers. It then emits the finalize
 * trace events and finalizes/destroys the SMPI process.
 * The update of active_processes and the return statement are outside the
 * visible lines. */
1135 int smpi_replay_finalize(){
1136 double sim_time= 1.;
1137 /* One active process will stop. Decrease the counter*/
1138 XBT_DEBUG("There are %lu elements in reqq[*]",
1139 xbt_dynar_length(reqq[smpi_process_index()]));
1140 if (!xbt_dynar_is_empty(reqq[smpi_process_index()])){
1141 int count_requests=xbt_dynar_length(reqq[smpi_process_index()]);
1142 MPI_Request requests[count_requests];
1143 MPI_Status status[count_requests];
1146 xbt_dynar_foreach(reqq[smpi_process_index()],i,requests[i]);
1147 smpi_mpi_waitall(count_requests, requests, status);
1153 if(!active_processes){
1154 /* Last process alive speaking */
1155 /* end the simulated timer */
1156 sim_time = smpi_process_simulated_elapsed();
1160 xbt_dynar_free_container(&(reqq[smpi_process_index()]));
1162 if(!active_processes){
1163 XBT_INFO("Simulation time %f", sim_time);
1164 _xbt_replay_action_exit();
1165 xbt_free(sendbuffer);
1166 xbt_free(recvbuffer);
1173 int rank = smpi_process_index();
1174 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
1175 extra->type = TRACING_FINALIZE;
1176 TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
1178 smpi_process_finalize();
1180 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
1181 TRACE_smpi_finalize(smpi_process_index());
1183 smpi_process_destroy();