1 /* Copyright (c) 2009-2014. The SimGrid Team.
2 * All rights reserved. */
4 /* This program is free software; you can redistribute it and/or modify it
5 * under the terms of the license (GNU LGPL) which comes with this package. */
10 #include <xbt/replay.h>
12 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
14 int communicator_size = 0;
15 static int active_processes = 0;
16 xbt_dynar_t *reqq = NULL;
18 MPI_Datatype MPI_DEFAULT_TYPE;
19 MPI_Datatype MPI_CURRENT_TYPE;
21 static int sendbuffer_size=0;
22 char* sendbuffer=NULL;
23 static int recvbuffer_size=0;
24 char* recvbuffer=NULL;
26 static void log_timed_action (const char *const *action, double clock){
27 if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
28 char *name = xbt_str_join_array(action, " ");
29 XBT_VERB("%s %f", name, smpi_process_simulated_elapsed()-clock);
34 //allocate a single buffer for all sends, growing it if needed
35 void* smpi_get_tmp_sendbuffer(int size){
36 if (!smpi_process_get_replaying())
37 return xbt_malloc(size);
38 if (sendbuffer_size<size){
39 sendbuffer=xbt_realloc(sendbuffer,size);
//allocate a single buffer for all recvs, growing it if needed
45 void* smpi_get_tmp_recvbuffer(int size){
46 if (!smpi_process_get_replaying())
47 return xbt_malloc(size);
48 if (recvbuffer_size<size){
49 recvbuffer=xbt_realloc(recvbuffer,size);
55 void smpi_free_tmp_buffer(void* buf){
56 if (!smpi_process_get_replaying())
61 static double parse_double(const char *string)
65 value = strtod(string, &endptr);
67 THROWF(unknown_error, 0, "%s is not a double", string);
71 static MPI_Datatype decode_datatype(const char *const action)
73 // Declared datatypes,
78 MPI_CURRENT_TYPE=MPI_DOUBLE;
81 MPI_CURRENT_TYPE=MPI_INT;
84 MPI_CURRENT_TYPE=MPI_CHAR;
87 MPI_CURRENT_TYPE=MPI_SHORT;
90 MPI_CURRENT_TYPE=MPI_LONG;
93 MPI_CURRENT_TYPE=MPI_FLOAT;
96 MPI_CURRENT_TYPE=MPI_BYTE;
99 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
102 return MPI_CURRENT_TYPE;
106 const char* encode_datatype(MPI_Datatype datatype)
109 //default type for output is set to MPI_BYTE
110 // MPI_DEFAULT_TYPE is not set for output, use directly MPI_BYTE
111 if (datatype==MPI_BYTE){
114 if(datatype==MPI_DOUBLE)
116 if(datatype==MPI_INT)
118 if(datatype==MPI_CHAR)
120 if(datatype==MPI_SHORT)
122 if(datatype==MPI_LONG)
124 if(datatype==MPI_FLOAT)
127 // default - not implemented.
128 // do not warn here as we pass in this function even for other trace formats
132 static void action_init(const char *const *action)
135 XBT_DEBUG("Initialize the counters");
137 if(action[2]) MPI_DEFAULT_TYPE= MPI_DOUBLE; // default MPE dataype
138 else MPI_DEFAULT_TYPE= MPI_BYTE; // default TAU datatype
140 /* start a simulated timer */
141 smpi_process_simulated_start();
142 /*initialize the number of active processes */
143 active_processes = smpi_process_count();
146 reqq=xbt_new0(xbt_dynar_t,active_processes);
148 for(i=0;i<active_processes;i++){
149 reqq[i]=xbt_dynar_new(sizeof(MPI_Request),&xbt_free_ref);
154 static void action_finalize(const char *const *action)
158 static void action_comm_size(const char *const *action)
160 double clock = smpi_process_simulated_elapsed();
162 communicator_size = parse_double(action[2]);
163 log_timed_action (action, clock);
166 static void action_comm_split(const char *const *action)
168 double clock = smpi_process_simulated_elapsed();
170 log_timed_action (action, clock);
173 static void action_comm_dup(const char *const *action)
175 double clock = smpi_process_simulated_elapsed();
177 log_timed_action (action, clock);
180 static void action_compute(const char *const *action)
182 double clock = smpi_process_simulated_elapsed();
183 double flops= parse_double(action[2]);
185 int rank = smpi_process_index();
186 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
187 extra->type=TRACING_COMPUTING;
188 extra->comp_size=flops;
189 TRACE_smpi_computing_in(rank, extra);
191 smpi_execute_flops(flops);
193 TRACE_smpi_computing_out(rank);
196 log_timed_action (action, clock);
199 static void action_send(const char *const *action)
201 int to = atoi(action[2]);
202 double size=parse_double(action[3]);
203 double clock = smpi_process_simulated_elapsed();
206 MPI_CURRENT_TYPE=decode_datatype(action[4]);
208 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
212 int rank = smpi_process_index();
214 int dst_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), to);
215 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
216 extra->type = TRACING_SEND;
217 extra->send_size = size;
219 extra->dst = dst_traced;
220 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
221 TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
222 TRACE_smpi_send(rank, rank, dst_traced, size*smpi_datatype_size(MPI_CURRENT_TYPE));
225 smpi_mpi_send(NULL, size, MPI_CURRENT_TYPE, to , 0, MPI_COMM_WORLD);
227 log_timed_action (action, clock);
230 TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
235 static void action_Isend(const char *const *action)
237 int to = atoi(action[2]);
238 double size=parse_double(action[3]);
239 double clock = smpi_process_simulated_elapsed();
242 if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
243 else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
246 int rank = smpi_process_index();
247 int dst_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), to);
248 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
249 extra->type = TRACING_ISEND;
250 extra->send_size = size;
252 extra->dst = dst_traced;
253 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
254 TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
255 TRACE_smpi_send(rank, rank, dst_traced, size*smpi_datatype_size(MPI_CURRENT_TYPE));
258 request = smpi_mpi_isend(NULL, size, MPI_CURRENT_TYPE, to, 0,MPI_COMM_WORLD);
261 TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
265 xbt_dynar_push(reqq[smpi_process_index()],&request);
267 log_timed_action (action, clock);
270 static void action_recv(const char *const *action) {
271 int from = atoi(action[2]);
272 double size=parse_double(action[3]);
273 double clock = smpi_process_simulated_elapsed();
276 if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
277 else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
280 int rank = smpi_process_index();
281 int src_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), from);
283 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
284 extra->type = TRACING_RECV;
285 extra->send_size = size;
286 extra->src = src_traced;
288 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
289 TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
  //unknown size from the receiver's point of view
294 smpi_mpi_probe(from, 0, MPI_COMM_WORLD, &status);
298 smpi_mpi_recv(NULL, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD, &status);
301 TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
302 TRACE_smpi_recv(rank, src_traced, rank);
305 log_timed_action (action, clock);
308 static void action_Irecv(const char *const *action)
310 int from = atoi(action[2]);
311 double size=parse_double(action[3]);
312 double clock = smpi_process_simulated_elapsed();
315 if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
316 else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
319 int rank = smpi_process_index();
320 int src_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), from);
321 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
322 extra->type = TRACING_IRECV;
323 extra->send_size = size;
324 extra->src = src_traced;
326 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
327 TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
  //unknown size from the receiver's point of view
332 smpi_mpi_probe(from, 0, MPI_COMM_WORLD, &status);
336 request = smpi_mpi_irecv(NULL, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD);
339 TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
342 xbt_dynar_push(reqq[smpi_process_index()],&request);
344 log_timed_action (action, clock);
347 static void action_test(const char *const *action){
348 double clock = smpi_process_simulated_elapsed();
353 request = xbt_dynar_pop_as(reqq[smpi_process_index()],MPI_Request);
354 //if request is null here, this may mean that a previous test has succeeded
355 //Different times in traced application and replayed version may lead to this
356 //In this case, ignore the extra calls.
359 int rank = smpi_process_index();
360 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
361 extra->type=TRACING_TEST;
362 TRACE_smpi_testing_in(rank, extra);
365 flag = smpi_mpi_test(&request, &status);
367 XBT_DEBUG("MPI_Test result: %d", flag);
368 /* push back request in dynar to be caught by a subsequent wait. if the test
369 * did succeed, the request is now NULL.
371 xbt_dynar_push_as(reqq[smpi_process_index()],MPI_Request, request);
374 TRACE_smpi_testing_out(rank);
377 log_timed_action (action, clock);
380 static void action_wait(const char *const *action){
381 double clock = smpi_process_simulated_elapsed();
385 xbt_assert(xbt_dynar_length(reqq[smpi_process_index()]),
386 "action wait not preceded by any irecv or isend: %s",
387 xbt_str_join_array(action," "));
388 request = xbt_dynar_pop_as(reqq[smpi_process_index()],MPI_Request);
    /* Assuming that the trace is well formed, this means the comm might have
392 * been caught by a MPI_test. Then just return.
398 int rank = request->comm != MPI_COMM_NULL
399 ? smpi_comm_rank(request->comm)
402 MPI_Group group = smpi_comm_group(request->comm);
403 int src_traced = smpi_group_rank(group, request->src);
404 int dst_traced = smpi_group_rank(group, request->dst);
405 int is_wait_for_receive = request->recv;
406 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
407 extra->type = TRACING_WAIT;
408 TRACE_smpi_ptp_in(rank, src_traced, dst_traced, __FUNCTION__, extra);
410 smpi_mpi_wait(&request, &status);
412 TRACE_smpi_ptp_out(rank, src_traced, dst_traced, __FUNCTION__);
413 if (is_wait_for_receive) {
414 TRACE_smpi_recv(rank, src_traced, dst_traced);
418 log_timed_action (action, clock);
421 static void action_waitall(const char *const *action){
422 double clock = smpi_process_simulated_elapsed();
423 int count_requests=0;
426 count_requests=xbt_dynar_length(reqq[smpi_process_index()]);
428 if (count_requests>0) {
429 MPI_Request requests[count_requests];
430 MPI_Status status[count_requests];
432 /* The reqq is an array of dynars. Its index corresponds to the rank.
433 Thus each rank saves its own requests to the array request. */
434 xbt_dynar_foreach(reqq[smpi_process_index()],i,requests[i]);
437 //save information from requests
439 xbt_dynar_t srcs = xbt_dynar_new(sizeof(int), NULL);
440 xbt_dynar_t dsts = xbt_dynar_new(sizeof(int), NULL);
441 xbt_dynar_t recvs = xbt_dynar_new(sizeof(int), NULL);
442 for (i = 0; i < count_requests; i++) {
444 int *asrc = xbt_new(int, 1);
445 int *adst = xbt_new(int, 1);
446 int *arecv = xbt_new(int, 1);
447 *asrc = requests[i]->src;
448 *adst = requests[i]->dst;
449 *arecv = requests[i]->recv;
450 xbt_dynar_insert_at(srcs, i, asrc);
451 xbt_dynar_insert_at(dsts, i, adst);
452 xbt_dynar_insert_at(recvs, i, arecv);
457 int *t = xbt_new(int, 1);
458 xbt_dynar_insert_at(srcs, i, t);
459 xbt_dynar_insert_at(dsts, i, t);
460 xbt_dynar_insert_at(recvs, i, t);
464 int rank_traced = smpi_process_index();
465 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
466 extra->type = TRACING_WAITALL;
467 extra->send_size=count_requests;
468 TRACE_smpi_ptp_in(rank_traced, -1, -1, __FUNCTION__,extra);
471 smpi_mpi_waitall(count_requests, requests, status);
474 for (i = 0; i < count_requests; i++) {
475 int src_traced, dst_traced, is_wait_for_receive;
476 xbt_dynar_get_cpy(srcs, i, &src_traced);
477 xbt_dynar_get_cpy(dsts, i, &dst_traced);
478 xbt_dynar_get_cpy(recvs, i, &is_wait_for_receive);
479 if (is_wait_for_receive) {
480 TRACE_smpi_recv(rank_traced, src_traced, dst_traced);
483 TRACE_smpi_ptp_out(rank_traced, -1, -1, __FUNCTION__);
485 xbt_dynar_free(&srcs);
486 xbt_dynar_free(&dsts);
487 xbt_dynar_free(&recvs);
490 int freedrank=smpi_process_index();
491 xbt_dynar_free_container(&(reqq[freedrank]));
492 reqq[freedrank]=xbt_dynar_new(sizeof(MPI_Request),&xbt_free_ref);
494 log_timed_action (action, clock);
497 static void action_barrier(const char *const *action){
498 double clock = smpi_process_simulated_elapsed();
500 int rank = smpi_process_index();
501 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
502 extra->type = TRACING_BARRIER;
503 TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
505 mpi_coll_barrier_fun(MPI_COMM_WORLD);
507 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
510 log_timed_action (action, clock);
514 static void action_bcast(const char *const *action)
516 double size = parse_double(action[2]);
517 double clock = smpi_process_simulated_elapsed();
520 * Initialize MPI_CURRENT_TYPE in order to decrease
521 * the number of the checks
523 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
526 root= atoi(action[3]);
528 MPI_CURRENT_TYPE=decode_datatype(action[4]);
533 int rank = smpi_process_index();
534 int root_traced = smpi_group_index(smpi_comm_group(MPI_COMM_WORLD), root);
536 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
537 extra->type = TRACING_BCAST;
538 extra->send_size = size;
539 extra->root = root_traced;
540 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
541 TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__, extra);
544 void *sendbuf = smpi_get_tmp_sendbuffer(size* smpi_datatype_size(MPI_CURRENT_TYPE));
545 mpi_coll_bcast_fun(sendbuf, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
547 TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
549 log_timed_action (action, clock);
552 static void action_reduce(const char *const *action)
554 double comm_size = parse_double(action[2]);
555 double comp_size = parse_double(action[3]);
556 double clock = smpi_process_simulated_elapsed();
558 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
561 root= atoi(action[4]);
563 MPI_CURRENT_TYPE=decode_datatype(action[5]);
570 int rank = smpi_process_index();
571 int root_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), root);
572 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
573 extra->type = TRACING_REDUCE;
574 extra->send_size = comm_size;
575 extra->comp_size = comp_size;
576 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
577 extra->root = root_traced;
579 TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__,extra);
581 void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
582 void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
583 mpi_coll_reduce_fun(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
584 smpi_execute_flops(comp_size);
586 TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
588 log_timed_action (action, clock);
591 static void action_allReduce(const char *const *action) {
592 double comm_size = parse_double(action[2]);
593 double comp_size = parse_double(action[3]);
595 if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
596 else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
598 double clock = smpi_process_simulated_elapsed();
600 int rank = smpi_process_index();
601 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
602 extra->type = TRACING_ALLREDUCE;
603 extra->send_size = comm_size;
604 extra->comp_size = comp_size;
605 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
607 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
609 void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
610 void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
611 mpi_coll_allreduce_fun(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
612 smpi_execute_flops(comp_size);
614 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
616 log_timed_action (action, clock);
619 static void action_allToAll(const char *const *action) {
620 double clock = smpi_process_simulated_elapsed();
621 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
622 int send_size = parse_double(action[2]);
623 int recv_size = parse_double(action[3]);
624 MPI_Datatype MPI_CURRENT_TYPE2;
627 MPI_CURRENT_TYPE=decode_datatype(action[4]);
628 MPI_CURRENT_TYPE2=decode_datatype(action[5]);
631 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
632 MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
634 void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
635 void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE2));
638 int rank = smpi_process_index();
639 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
640 extra->type = TRACING_ALLTOALL;
641 extra->send_size = send_size;
642 extra->recv_size = recv_size;
643 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
644 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
646 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
649 mpi_coll_alltoall_fun(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
652 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
654 log_timed_action (action, clock);
659 static void action_gather(const char *const *action) {
661 The structure of the gather action for the rank 0 (total 4 processes)
666 1) 68 is the sendcounts
667 2) 68 is the recvcounts
668 3) 0 is the root node
669 4) 0 is the send datatype id, see decode_datatype()
670 5) 0 is the recv datatype id, see decode_datatype()
672 double clock = smpi_process_simulated_elapsed();
673 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
674 int send_size = parse_double(action[2]);
675 int recv_size = parse_double(action[3]);
676 MPI_Datatype MPI_CURRENT_TYPE2;
678 MPI_CURRENT_TYPE=decode_datatype(action[5]);
679 MPI_CURRENT_TYPE2=decode_datatype(action[6]);
681 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
682 MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
684 void *send = smpi_get_tmp_sendbuffer(send_size* smpi_datatype_size(MPI_CURRENT_TYPE));
688 root=atoi(action[4]);
689 int rank = smpi_comm_rank(MPI_COMM_WORLD);
692 recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE2));
695 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
696 extra->type = TRACING_GATHER;
697 extra->send_size = send_size;
698 extra->recv_size = recv_size;
700 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
701 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
703 TRACE_smpi_collective_in(smpi_process_index(), root, __FUNCTION__, extra);
705 mpi_coll_gather_fun(send, send_size, MPI_CURRENT_TYPE,
706 recv, recv_size, MPI_CURRENT_TYPE2,
707 root, MPI_COMM_WORLD);
710 TRACE_smpi_collective_out(smpi_process_index(), -1, __FUNCTION__);
712 log_timed_action (action, clock);
718 static void action_gatherv(const char *const *action) {
720 The structure of the gatherv action for the rank 0 (total 4 processes)
  0 gatherV 68 68 10 10 10 0 0 0
725 1) 68 is the sendcount
726 2) 68 10 10 10 is the recvcounts
727 3) 0 is the root node
728 4) 0 is the send datatype id, see decode_datatype()
729 5) 0 is the recv datatype id, see decode_datatype()
731 double clock = smpi_process_simulated_elapsed();
732 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
733 int send_size = parse_double(action[2]);
734 int *disps = xbt_new0(int, comm_size);
735 int *recvcounts = xbt_new0(int, comm_size);
738 MPI_Datatype MPI_CURRENT_TYPE2;
739 if(action[4+comm_size]) {
740 MPI_CURRENT_TYPE=decode_datatype(action[4+comm_size]);
741 MPI_CURRENT_TYPE2=decode_datatype(action[5+comm_size]);
743 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
744 MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
746 void *send = smpi_get_tmp_sendbuffer(send_size* smpi_datatype_size(MPI_CURRENT_TYPE));
748 for(i=0;i<comm_size;i++) {
749 recvcounts[i] = atoi(action[i+3]);
750 recv_sum=recv_sum+recvcounts[i];
754 int root=atoi(action[3+comm_size]);
755 int rank = smpi_comm_rank(MPI_COMM_WORLD);;
758 recv = smpi_get_tmp_recvbuffer(recv_sum* smpi_datatype_size(MPI_CURRENT_TYPE2));
761 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
762 extra->type = TRACING_GATHERV;
763 extra->send_size = send_size;
764 extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
765 for(i=0; i< comm_size; i++)//copy data to avoid bad free
766 extra->recvcounts[i] = recvcounts[i];
768 extra->num_processes = comm_size;
769 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
770 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
772 TRACE_smpi_collective_in(smpi_process_index(), root, __FUNCTION__, extra);
774 smpi_mpi_gatherv(send, send_size, MPI_CURRENT_TYPE,
775 recv, recvcounts, disps, MPI_CURRENT_TYPE2,
776 root, MPI_COMM_WORLD);
779 TRACE_smpi_collective_out(smpi_process_index(), -1, __FUNCTION__);
782 log_timed_action (action, clock);
783 xbt_free(recvcounts);
787 static void action_reducescatter(const char *const *action) {
790 The structure of the reducescatter action for the rank 0 (total 4 processes)
792 0 reduceScatter 275427 275427 275427 204020 11346849 0
795 1) The first four values after the name of the action declare the recvcounts array
796 2) The value 11346849 is the amount of instructions
797 3) The last value corresponds to the datatype, see decode_datatype().
799 We analyze a MPI_Reduce_scatter call to one MPI_Reduce and one MPI_Scatterv.
803 double clock = smpi_process_simulated_elapsed();
804 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
805 int comp_size = parse_double(action[2+comm_size]);
806 int *recvcounts = xbt_new0(int, comm_size);
807 int *disps = xbt_new0(int, comm_size);
809 int rank = smpi_process_index();
811 if(action[3+comm_size])
812 MPI_CURRENT_TYPE=decode_datatype(action[3+comm_size]);
814 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
816 for(i=0;i<comm_size;i++) {
817 recvcounts[i] = atoi(action[i+2]);
823 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
824 extra->type = TRACING_REDUCE_SCATTER;
825 extra->send_size = 0;
826 extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
827 for(i=0; i< comm_size; i++)//copy data to avoid bad free
828 extra->recvcounts[i] = recvcounts[i];
829 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
830 extra->comp_size = comp_size;
831 extra->num_processes = comm_size;
833 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
835 void *sendbuf = smpi_get_tmp_sendbuffer(size* smpi_datatype_size(MPI_CURRENT_TYPE));
836 void *recvbuf = smpi_get_tmp_recvbuffer(size* smpi_datatype_size(MPI_CURRENT_TYPE));
838 mpi_coll_reduce_scatter_fun(sendbuf, recvbuf, recvcounts, MPI_CURRENT_TYPE, MPI_OP_NULL,
840 smpi_execute_flops(comp_size);
844 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
846 xbt_free(recvcounts);
848 log_timed_action (action, clock);
852 static void action_allgatherv(const char *const *action) {
855 The structure of the allgatherv action for the rank 0 (total 4 processes)
857 0 allGatherV 275427 275427 275427 275427 204020
860 1) 275427 is the sendcount
861 2) The next four elements declare the recvcounts array
862 3) No more values mean that the datatype for sent and receive buffer
863 is the default one, see decode_datatype().
867 double clock = smpi_process_simulated_elapsed();
869 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
871 int sendcount=atoi(action[2]);
872 int *recvcounts = xbt_new0(int, comm_size);
873 int *disps = xbt_new0(int, comm_size);
875 MPI_Datatype MPI_CURRENT_TYPE2;
877 if(action[3+comm_size]) {
878 MPI_CURRENT_TYPE = decode_datatype(action[3+comm_size]);
879 MPI_CURRENT_TYPE2 = decode_datatype(action[4+comm_size]);
881 MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
882 MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
884 void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* smpi_datatype_size(MPI_CURRENT_TYPE));
886 for(i=0;i<comm_size;i++) {
887 recvcounts[i] = atoi(action[i+3]);
888 recv_sum=recv_sum+recvcounts[i];
890 void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* smpi_datatype_size(MPI_CURRENT_TYPE2));
893 int rank = smpi_process_index();
894 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
895 extra->type = TRACING_ALLGATHERV;
896 extra->send_size = sendcount;
897 extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
898 for(i=0; i< comm_size; i++)//copy data to avoid bad free
899 extra->recvcounts[i] = recvcounts[i];
900 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
901 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
902 extra->num_processes = comm_size;
904 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
907 mpi_coll_allgatherv_fun(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts, disps, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
910 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
913 log_timed_action (action, clock);
914 xbt_free(recvcounts);
919 static void action_allToAllv(const char *const *action) {
921 The structure of the allToAllV action for the rank 0 (total 4 processes)
923 0 allToAllV 100 1 7 10 12 100 1 70 10 5
926 1) 100 is the size of the send buffer *sizeof(int),
927 2) 1 7 10 12 is the sendcounts array
928 3) 100*sizeof(int) is the size of the receiver buffer
929 4) 1 70 10 5 is the recvcounts array
934 double clock = smpi_process_simulated_elapsed();
936 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
937 int send_buf_size=0,recv_buf_size=0,i=0;
938 int *sendcounts = xbt_new0(int, comm_size);
939 int *recvcounts = xbt_new0(int, comm_size);
940 int *senddisps = xbt_new0(int, comm_size);
941 int *recvdisps = xbt_new0(int, comm_size);
943 MPI_Datatype MPI_CURRENT_TYPE2;
945 send_buf_size=parse_double(action[2]);
946 recv_buf_size=parse_double(action[3+comm_size]);
947 if(action[4+2*comm_size]) {
948 MPI_CURRENT_TYPE=decode_datatype(action[4+2*comm_size]);
949 MPI_CURRENT_TYPE2=decode_datatype(action[5+2*comm_size]);
952 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
953 MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
956 void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* smpi_datatype_size(MPI_CURRENT_TYPE));
957 void *recvbuf = smpi_get_tmp_recvbuffer(recv_buf_size* smpi_datatype_size(MPI_CURRENT_TYPE2));
959 for(i=0;i<comm_size;i++) {
960 sendcounts[i] = atoi(action[i+3]);
961 recvcounts[i] = atoi(action[i+4+comm_size]);
966 int rank = smpi_process_index();
967 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
968 extra->type = TRACING_ALLTOALLV;
969 extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
970 extra->sendcounts= xbt_malloc(comm_size*sizeof(int));
971 extra->num_processes = comm_size;
973 for(i=0; i< comm_size; i++){//copy data to avoid bad free
974 extra->send_size += sendcounts[i];
975 extra->sendcounts[i] = sendcounts[i];
976 extra->recv_size += recvcounts[i];
977 extra->recvcounts[i] = recvcounts[i];
979 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
980 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
982 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
984 mpi_coll_alltoallv_fun(sendbuf, sendcounts, senddisps, MPI_CURRENT_TYPE,
985 recvbuf, recvcounts, recvdisps, MPI_CURRENT_TYPE,
988 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
991 log_timed_action (action, clock);
992 xbt_free(sendcounts);
993 xbt_free(recvcounts);
998 void smpi_replay_init(int *argc, char***argv){
999 smpi_process_init(argc, argv);
1000 smpi_process_mark_as_initialized();
1001 smpi_process_set_replaying(1);
1003 int rank = smpi_process_index();
1004 TRACE_smpi_init(rank);
1005 TRACE_smpi_computing_init(rank);
1006 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
1007 extra->type = TRACING_INIT;
1008 TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
1009 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
1012 if (!smpi_process_index()){
1013 _xbt_replay_action_init();
1014 xbt_replay_action_register("init", action_init);
1015 xbt_replay_action_register("finalize", action_finalize);
1016 xbt_replay_action_register("comm_size", action_comm_size);
1017 xbt_replay_action_register("comm_split", action_comm_split);
1018 xbt_replay_action_register("comm_dup", action_comm_dup);
1019 xbt_replay_action_register("send", action_send);
1020 xbt_replay_action_register("Isend", action_Isend);
1021 xbt_replay_action_register("recv", action_recv);
1022 xbt_replay_action_register("Irecv", action_Irecv);
1023 xbt_replay_action_register("test", action_test);
1024 xbt_replay_action_register("wait", action_wait);
1025 xbt_replay_action_register("waitAll", action_waitall);
1026 xbt_replay_action_register("barrier", action_barrier);
1027 xbt_replay_action_register("bcast", action_bcast);
1028 xbt_replay_action_register("reduce", action_reduce);
1029 xbt_replay_action_register("allReduce", action_allReduce);
1030 xbt_replay_action_register("allToAll", action_allToAll);
1031 xbt_replay_action_register("allToAllV", action_allToAllv);
1032 xbt_replay_action_register("gather", action_gather);
1033 xbt_replay_action_register("gatherV", action_gatherv);
1034 xbt_replay_action_register("allGatherV", action_allgatherv);
1035 xbt_replay_action_register("reduceScatter", action_reducescatter);
1036 xbt_replay_action_register("compute", action_compute);
1039 //if we have a delayed start, sleep here.
1042 double value = strtod((*argv)[2], &endptr);
1043 if (*endptr != '\0')
1044 THROWF(unknown_error, 0, "%s is not a double", (*argv)[2]);
1045 XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
1046 smpi_execute_flops(value);
1048 xbt_replay_action_runner(*argc, *argv);
1051 int smpi_replay_finalize(){
1052 double sim_time= 1.;
1053 /* One active process will stop. Decrease the counter*/
1054 XBT_DEBUG("There are %lu elements in reqq[*]",
1055 xbt_dynar_length(reqq[smpi_process_index()]));
1056 if (!xbt_dynar_is_empty(reqq[smpi_process_index()])){
1057 int count_requests=xbt_dynar_length(reqq[smpi_process_index()]);
1058 MPI_Request requests[count_requests];
1059 MPI_Status status[count_requests];
1062 xbt_dynar_foreach(reqq[smpi_process_index()],i,requests[i]);
1063 smpi_mpi_waitall(count_requests, requests, status);
1069 if(!active_processes){
1070 /* Last process alive speaking */
1071 /* end the simulated timer */
1072 sim_time = smpi_process_simulated_elapsed();
1076 xbt_dynar_free_container(&(reqq[smpi_process_index()]));
1078 if(!active_processes){
1079 XBT_INFO("Simulation time %f", sim_time);
1080 _xbt_replay_action_exit();
1081 xbt_free(sendbuffer);
1082 xbt_free(recvbuffer);
1089 int rank = smpi_process_index();
1090 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
1091 extra->type = TRACING_FINALIZE;
1092 TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
1094 smpi_process_finalize();
1096 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
1097 TRACE_smpi_finalize(smpi_process_index());
1099 smpi_process_destroy();