1 /* Copyright (c) 2009-2014. The SimGrid Team.
2 * All rights reserved. */
4 /* This program is free software; you can redistribute it and/or modify it
5 * under the terms of the license (GNU LGPL) which comes with this package. */
10 #include <xbt/replay.h>
// Logging subcategory "smpi_replay" (child of "smpi") used by the XBT_* log calls in this file.
12 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
// Written by the "comm_size" trace action.
14 int communicator_size = 0;
// Set from smpi_process_count() by the "init" action; smpi_replay_finalize() tests it against zero.
15 static int active_processes = 0;
// Array of request queues: reqq[smpi_process_index()] holds that process' pending MPI_Request handles.
16 xbt_dynar_t *reqq = NULL;
// Datatype used when a trace line carries no explicit datatype (chosen in action_init).
18 MPI_Datatype MPI_DEFAULT_TYPE;
// Datatype of the action currently being replayed (also written by decode_datatype).
19 MPI_Datatype MPI_CURRENT_TYPE;
// Scratch buffers handed out by smpi_get_tmp_sendbuffer()/smpi_get_tmp_recvbuffer() in replay mode;
// the *_size counters are compared with the requested size to decide when to realloc.
21 static int sendbuffer_size=0;
22 char* sendbuffer=NULL;
23 static int recvbuffer_size=0;
24 char* recvbuffer=NULL;
// Log, at verbose level, the space-joined tokens of `action` followed by the
// simulated time elapsed since `clock`.
// The joined string is only built when verbose logging is enabled for smpi_replay.
26 static void log_timed_action (const char *const *action, double clock){
27 if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
28 char *name = xbt_str_join_array(action, " ");
29 XBT_VERB("%s %f", name, smpi_process_simulated_elapsed()-clock);
34 //allocate a single buffer for all sends, growing it if needed
// `size` is the requested size in bytes.
// Outside replay mode a fresh xbt_malloc'ed block is returned instead of the shared buffer.
35 void* smpi_get_tmp_sendbuffer(int size){
36 if (!smpi_process_get_replaying())
37 return xbt_malloc(size);
// Replay mode: grow the shared sendbuffer only when it is smaller than the request.
38 if (sendbuffer_size<size){
39 sendbuffer=xbt_realloc(sendbuffer,size);
44 //allocate a single buffer for all recv
// `size` is the requested size in bytes.
// Outside replay mode a fresh xbt_malloc'ed block is returned instead of the shared buffer.
45 void* smpi_get_tmp_recvbuffer(int size){
46 if (!smpi_process_get_replaying())
47 return xbt_malloc(size);
// Replay mode: grow the shared recvbuffer only when it is smaller than the request.
48 if (recvbuffer_size<size){
49 recvbuffer=xbt_realloc(recvbuffer,size);
// Release counterpart for buffers obtained from the smpi_get_tmp_*buffer() helpers.
// Its action is conditional on NOT being in replay mode; the guarded statement
// itself is not visible in this excerpt.
55 void smpi_free_tmp_buffer(void* buf){
56 if (!smpi_process_get_replaying())
// Convert `string` to a double with strtod().
// A THROWF(unknown_error) carrying "<string> is not a double" exists on the
// failure path; the condition guarding it is not visible in this excerpt.
61 static double parse_double(const char *string)
65 value = strtod(string, &endptr);
67 THROWF(unknown_error, 0, "%s is not a double", string);
// Translate the datatype token `action` of a trace line into an MPI_Datatype.
// The result is stored in the global MPI_CURRENT_TYPE and also returned.
// Possible results visible here: MPI_DOUBLE, MPI_INT, MPI_CHAR, MPI_SHORT,
// MPI_LONG, MPI_FLOAT, MPI_BYTE and MPI_DEFAULT_TYPE; the conditions selecting
// each of them are not visible in this excerpt.
71 static MPI_Datatype decode_datatype(const char *const action)
73 // Declared datatypes,
78 MPI_CURRENT_TYPE=MPI_DOUBLE;
81 MPI_CURRENT_TYPE=MPI_INT;
84 MPI_CURRENT_TYPE=MPI_CHAR;
87 MPI_CURRENT_TYPE=MPI_SHORT;
90 MPI_CURRENT_TYPE=MPI_LONG;
93 MPI_CURRENT_TYPE=MPI_FLOAT;
96 MPI_CURRENT_TYPE=MPI_BYTE;
99 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
102 return MPI_CURRENT_TYPE;
// Return a string encoding of `datatype` for trace output.
// The datatype is compared against MPI_BYTE, MPI_DOUBLE, MPI_INT, MPI_CHAR,
// MPI_SHORT, MPI_LONG and MPI_FLOAT; the strings returned for each case are
// not visible in this excerpt.
106 const char* encode_datatype(MPI_Datatype datatype)
109 //default type for output is set to MPI_BYTE
110 // MPI_DEFAULT_TYPE is not set for output, use directly MPI_BYTE
111 if (datatype==MPI_BYTE){
114 if(datatype==MPI_DOUBLE)
116 if(datatype==MPI_INT)
118 if(datatype==MPI_CHAR)
120 if(datatype==MPI_SHORT)
122 if(datatype==MPI_LONG)
124 if(datatype==MPI_FLOAT)
127 // default - not implemented.
128 // do not warn here as we pass in this function even for other trace formats
// Handler for the "init" trace action.
// Chooses the default datatype from the presence of action[2], starts the
// simulated timer, records the number of processes in active_processes and
// creates one MPI_Request dynar per process in reqq.
132 static void action_init(const char *const *action)
135 XBT_DEBUG("Initialize the counters");
137 if(action[2]) MPI_DEFAULT_TYPE= MPI_DOUBLE; // default MPE datatype
138 else MPI_DEFAULT_TYPE= MPI_BYTE; // default TAU datatype
140 /* start a simulated timer */
141 smpi_process_simulated_start();
142 /*initialize the number of active processes */
143 active_processes = smpi_process_count();
146 reqq=xbt_new0(xbt_dynar_t,active_processes);
148 for(i=0;i<active_processes;i++){
149 reqq[i]=xbt_dynar_new(sizeof(MPI_Request),&xbt_free_ref);
// Handler registered for the "finalize" trace action; its body is not visible in this excerpt.
154 static void action_finalize(const char *const *action)
// Handler for the "comm_size" trace action.
// Stores the value parsed from action[2] in the global communicator_size
// (the double is converted to int by the assignment) and logs the action.
158 static void action_comm_size(const char *const *action)
160 double clock = smpi_process_simulated_elapsed();
162 communicator_size = parse_double(action[2]);
163 log_timed_action (action, clock);
// Handler for the "comm_split" trace action.
// In the visible lines it only timestamps and logs the action.
166 static void action_comm_split(const char *const *action)
168 double clock = smpi_process_simulated_elapsed();
170 log_timed_action (action, clock);
// Handler for the "comm_dup" trace action.
// In the visible lines it only timestamps and logs the action.
173 static void action_comm_dup(const char *const *action)
175 double clock = smpi_process_simulated_elapsed();
177 log_timed_action (action, clock);
// Handler for the "compute" trace action.
// action[2] is the amount of flops to execute; the computation is bracketed
// by TRACE_smpi_computing_in/out and the action is logged afterwards.
180 static void action_compute(const char *const *action)
182 double clock = smpi_process_simulated_elapsed();
183 double flops= parse_double(action[2]);
185 int rank = smpi_process_index();
186 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
187 extra->type=TRACING_COMPUTING;
188 extra->comp_size=flops;
189 TRACE_smpi_computing_in(rank, extra);
191 smpi_execute_flops(flops);
193 TRACE_smpi_computing_out(rank);
196 log_timed_action (action, clock);
// Handler for the "send" trace action (blocking send).
// action[2] is the destination, action[3] the element count; the datatype is
// either decoded from action[4] or MPI_DEFAULT_TYPE (the selecting condition
// is not visible in this excerpt).
// The send uses a NULL buffer, tag 0 and MPI_COMM_WORLD.
199 static void action_send(const char *const *action)
201 int to = atoi(action[2]);
202 double size=parse_double(action[3]);
203 double clock = smpi_process_simulated_elapsed();
206 MPI_CURRENT_TYPE=decode_datatype(action[4]);
208 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
212 int rank = smpi_process_index();
214 int dst_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), to);
215 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
216 extra->type = TRACING_SEND;
217 extra->send_size = size;
219 extra->dst = dst_traced;
220 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
221 TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
// The traced message size is in bytes: element count times datatype size.
222 TRACE_smpi_send(rank, rank, dst_traced, size*smpi_datatype_size(MPI_CURRENT_TYPE));
225 smpi_mpi_send(NULL, size, MPI_CURRENT_TYPE, to , 0, MPI_COMM_WORLD);
227 log_timed_action (action, clock);
230 TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
// Handler for the "Isend" trace action (non-blocking send).
// action[2] is the destination, action[3] the element count, action[4] an
// optional datatype (MPI_DEFAULT_TYPE when absent).
// The resulting request is pushed on this process' queue in reqq so that a
// later test/wait/waitAll action can retrieve it.
235 static void action_Isend(const char *const *action)
237 int to = atoi(action[2]);
238 double size=parse_double(action[3]);
239 double clock = smpi_process_simulated_elapsed();
242 if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
243 else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
246 int rank = smpi_process_index();
247 int dst_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), to);
248 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
249 extra->type = TRACING_ISEND;
250 extra->send_size = size;
252 extra->dst = dst_traced;
253 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
254 TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
255 TRACE_smpi_send(rank, rank, dst_traced, size*smpi_datatype_size(MPI_CURRENT_TYPE));
258 request = smpi_mpi_isend(NULL, size, MPI_CURRENT_TYPE, to, 0,MPI_COMM_WORLD);
261 TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
265 xbt_dynar_push(reqq[smpi_process_index()],&request);
267 log_timed_action (action, clock);
// Handler for the "recv" trace action (blocking receive).
// action[2] is the source, action[3] the element count, action[4] an optional
// datatype (MPI_DEFAULT_TYPE when absent).
// The receive uses a NULL buffer, tag 0 and MPI_COMM_WORLD.
270 static void action_recv(const char *const *action) {
271 int from = atoi(action[2]);
272 double size=parse_double(action[3]);
273 double clock = smpi_process_simulated_elapsed();
276 if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
277 else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
280 int rank = smpi_process_index();
281 int src_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), from);
283 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
284 extra->type = TRACING_RECV;
285 extra->send_size = size;
286 extra->src = src_traced;
288 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
289 TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
292 //unknown size from the receiver's point of view
// NOTE(review): the condition under which the probe runs is not visible in this excerpt.
294 smpi_mpi_probe(from, 0, MPI_COMM_WORLD, &status);
298 smpi_mpi_recv(NULL, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD, &status);
301 TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
302 TRACE_smpi_recv(rank, src_traced, rank);
305 log_timed_action (action, clock);
// Handler for the "Irecv" trace action (non-blocking receive).
// action[2] is the source, action[3] the element count, action[4] an optional
// datatype (MPI_DEFAULT_TYPE when absent).
// The resulting request is pushed on this process' queue in reqq so that a
// later test/wait/waitAll action can retrieve it.
308 static void action_Irecv(const char *const *action)
310 int from = atoi(action[2]);
311 double size=parse_double(action[3]);
312 double clock = smpi_process_simulated_elapsed();
315 if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
316 else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
319 int rank = smpi_process_index();
320 int src_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), from);
321 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
322 extra->type = TRACING_IRECV;
323 extra->send_size = size;
324 extra->src = src_traced;
326 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
327 TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
330 //unknown size from the receiver's point of view
// NOTE(review): the condition under which the probe runs is not visible in this excerpt.
332 smpi_mpi_probe(from, 0, MPI_COMM_WORLD, &status);
336 request = smpi_mpi_irecv(NULL, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD);
339 TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
342 xbt_dynar_push(reqq[smpi_process_index()],&request);
344 log_timed_action (action, clock);
// Handler for the "test" trace action.
// Pops the most recently pushed request of this process from reqq, runs
// smpi_mpi_test() on it and pushes the (possibly updated) request back.
347 static void action_test(const char *const *action){
348 double clock = smpi_process_simulated_elapsed();
353 request = xbt_dynar_pop_as(reqq[smpi_process_index()],MPI_Request);
354 //if request is null here, this may mean that a previous test has succeeded
355 //Different times in traced application and replayed version may lead to this
356 //In this case, ignore the extra calls.
359 int rank = smpi_process_index();
360 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
361 extra->type=TRACING_TEST;
362 TRACE_smpi_testing_in(rank, extra);
365 flag = smpi_mpi_test(&request, &status);
367 XBT_DEBUG("MPI_Test result: %d", flag);
368 /* push back request in dynar to be caught by a subsequent wait. if the test
369 * did succeed, the request is now NULL.
371 xbt_dynar_push_as(reqq[smpi_process_index()],MPI_Request, request);
374 TRACE_smpi_testing_out(rank);
377 log_timed_action (action, clock);
// Handler for the "wait" trace action.
// Asserts that this process has at least one queued request, pops the most
// recently pushed one from reqq and waits for it with smpi_mpi_wait().
// src/dst/recv are read from the request before the wait and used for the
// tracing calls issued after it.
380 static void action_wait(const char *const *action){
381 double clock = smpi_process_simulated_elapsed();
385 xbt_assert(xbt_dynar_length(reqq[smpi_process_index()]),
386 "action wait not preceded by any irecv or isend: %s",
387 xbt_str_join_array(action," "));
388 request = xbt_dynar_pop_as(reqq[smpi_process_index()],MPI_Request);
391 /* Assuming that the trace is well formed, this mean the comm might have
392 * been caught by a MPI_test. Then just return.
398 int rank = request->comm != MPI_COMM_NULL
399 ? smpi_comm_rank(request->comm)
402 MPI_Group group = smpi_comm_group(request->comm);
403 int src_traced = smpi_group_rank(group, request->src);
404 int dst_traced = smpi_group_rank(group, request->dst);
405 int is_wait_for_receive = request->recv;
406 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
407 extra->type = TRACING_WAIT;
408 TRACE_smpi_ptp_in(rank, src_traced, dst_traced, __FUNCTION__, extra);
410 smpi_mpi_wait(&request, &status);
412 TRACE_smpi_ptp_out(rank, src_traced, dst_traced, __FUNCTION__);
413 if (is_wait_for_receive) {
414 TRACE_smpi_recv(rank, src_traced, dst_traced);
418 log_timed_action (action, clock);
// Handler for the "waitAll" trace action.
// When this process has queued requests, they are copied into a local array,
// their src/dst/recv fields are saved in temporary dynars, smpi_mpi_waitall()
// is called, a receive is traced for every saved entry flagged as a receive,
// and finally the process' request queue is replaced by a fresh empty dynar.
421 static void action_waitall(const char *const *action){
422 double clock = smpi_process_simulated_elapsed();
423 int count_requests=0;
426 count_requests=xbt_dynar_length(reqq[smpi_process_index()]);
428 if (count_requests>0) {
// Variable-length arrays sized by the number of queued requests.
429 MPI_Request requests[count_requests];
430 MPI_Status status[count_requests];
432 /* The reqq is an array of dynars. Its index corresponds to the rank.
433 Thus each rank saves its own requests to the array request. */
434 xbt_dynar_foreach(reqq[smpi_process_index()],i,requests[i]);
437 //save information from requests
439 xbt_dynar_t srcs = xbt_dynar_new(sizeof(int), NULL);
440 xbt_dynar_t dsts = xbt_dynar_new(sizeof(int), NULL);
441 xbt_dynar_t recvs = xbt_dynar_new(sizeof(int), NULL);
442 for (i = 0; i < count_requests; i++) {
444 int *asrc = xbt_new(int, 1);
445 int *adst = xbt_new(int, 1);
446 int *arecv = xbt_new(int, 1);
447 *asrc = requests[i]->src;
448 *adst = requests[i]->dst;
449 *arecv = requests[i]->recv;
450 xbt_dynar_insert_at(srcs, i, asrc);
451 xbt_dynar_insert_at(dsts, i, adst);
452 xbt_dynar_insert_at(recvs, i, arecv);
457 int *t = xbt_new(int, 1);
458 xbt_dynar_insert_at(srcs, i, t);
459 xbt_dynar_insert_at(dsts, i, t);
460 xbt_dynar_insert_at(recvs, i, t);
464 int rank_traced = smpi_process_index();
465 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
466 extra->type = TRACING_WAITALL;
467 extra->send_size=count_requests;
468 TRACE_smpi_ptp_in(rank_traced, -1, -1, __FUNCTION__,extra);
471 smpi_mpi_waitall(count_requests, requests, status);
474 for (i = 0; i < count_requests; i++) {
475 int src_traced, dst_traced, is_wait_for_receive;
476 xbt_dynar_get_cpy(srcs, i, &src_traced);
477 xbt_dynar_get_cpy(dsts, i, &dst_traced);
478 xbt_dynar_get_cpy(recvs, i, &is_wait_for_receive);
479 if (is_wait_for_receive) {
480 TRACE_smpi_recv(rank_traced, src_traced, dst_traced);
483 TRACE_smpi_ptp_out(rank_traced, -1, -1, __FUNCTION__);
485 xbt_dynar_free(&srcs);
486 xbt_dynar_free(&dsts);
487 xbt_dynar_free(&recvs);
// Drop the old queue container and start over with an empty one.
490 int freedrank=smpi_process_index();
491 xbt_dynar_free_container(&(reqq[freedrank]));
492 reqq[freedrank]=xbt_dynar_new(sizeof(MPI_Request),&xbt_free_ref);
494 log_timed_action (action, clock);
// Handler for the "barrier" trace action.
// Runs the configured barrier algorithm on MPI_COMM_WORLD between the
// collective tracing calls, then logs the action.
497 static void action_barrier(const char *const *action){
498 double clock = smpi_process_simulated_elapsed();
500 int rank = smpi_process_index();
501 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
502 extra->type = TRACING_BARRIER;
503 TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
505 mpi_coll_barrier_fun(MPI_COMM_WORLD);
507 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
510 log_timed_action (action, clock);
// Handler for the "bcast" trace action.
// action[2] is the element count, action[3] the root and action[4] the
// datatype; MPI_CURRENT_TYPE is preset to MPI_DEFAULT_TYPE before the optional
// fields are read (their guards are not visible in this excerpt).
// NOTE(review): root_traced is computed with smpi_group_index() here while
// the other handlers in this file use smpi_group_rank() — confirm which
// mapping is intended.
514 static void action_bcast(const char *const *action)
516 double size = parse_double(action[2]);
517 double clock = smpi_process_simulated_elapsed();
520 * Initialize MPI_CURRENT_TYPE in order to decrease
521 * the number of the checks
523 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
526 root= atoi(action[3]);
528 MPI_CURRENT_TYPE=decode_datatype(action[4]);
533 int rank = smpi_process_index();
534 int root_traced = smpi_group_index(smpi_comm_group(MPI_COMM_WORLD), root);
536 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
537 extra->type = TRACING_BCAST;
538 extra->send_size = size;
539 extra->root = root_traced;
540 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
541 TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__, extra);
544 void *sendbuf = smpi_get_tmp_sendbuffer(size* smpi_datatype_size(MPI_CURRENT_TYPE));
545 mpi_coll_bcast_fun(sendbuf, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
547 TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
549 log_timed_action (action, clock);
// Handler for the "reduce" trace action.
// action[2] is the communicated element count, action[3] the amount of flops
// executed after the reduction, action[4] the root and action[5] the datatype;
// MPI_CURRENT_TYPE is preset to MPI_DEFAULT_TYPE before the optional fields
// are read (their guards are not visible in this excerpt).
552 static void action_reduce(const char *const *action)
554 double comm_size = parse_double(action[2]);
555 double comp_size = parse_double(action[3]);
556 double clock = smpi_process_simulated_elapsed();
558 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
561 root= atoi(action[4]);
563 MPI_CURRENT_TYPE=decode_datatype(action[5]);
570 int rank = smpi_process_index();
571 int root_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), root);
572 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
573 extra->type = TRACING_REDUCE;
574 extra->send_size = comm_size;
575 extra->comp_size = comp_size;
576 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
577 extra->root = root_traced;
579 TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__,extra);
// The receive side comes from the dedicated receive scratch buffer: in replay
// mode both calls to smpi_get_tmp_sendbuffer() return the same shared buffer,
// which made sendbuf and recvbuf alias each other.
581 void *recvbuf = smpi_get_tmp_recvbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
582 void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
583 mpi_coll_reduce_fun(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
584 smpi_execute_flops(comp_size);
586 TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
588 log_timed_action (action, clock);
// Handler for the "allReduce" trace action.
// action[2] is the communicated element count, action[3] the amount of flops
// executed after the reduction and action[4] an optional datatype
// (MPI_DEFAULT_TYPE when absent).
591 static void action_allReduce(const char *const *action) {
592 double comm_size = parse_double(action[2]);
593 double comp_size = parse_double(action[3]);
595 if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
596 else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
598 double clock = smpi_process_simulated_elapsed();
600 int rank = smpi_process_index();
601 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
602 extra->type = TRACING_ALLREDUCE;
603 extra->send_size = comm_size;
604 extra->comp_size = comp_size;
605 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
607 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
// The receive side comes from the dedicated receive scratch buffer: in replay
// mode both calls to smpi_get_tmp_sendbuffer() return the same shared buffer,
// which made sendbuf and recvbuf alias each other.
609 void *recvbuf = smpi_get_tmp_recvbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
610 void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
611 mpi_coll_allreduce_fun(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
612 smpi_execute_flops(comp_size);
614 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
616 log_timed_action (action, clock);
// Handler for the "allToAll" trace action.
// action[2] and action[3] are the per-peer send and receive element counts;
// the send/receive datatypes are either decoded from action[4]/action[5] or
// MPI_DEFAULT_TYPE (the selecting condition is not visible in this excerpt).
// Both scratch buffers are sized for comm_size peers.
619 static void action_allToAll(const char *const *action) {
620 double clock = smpi_process_simulated_elapsed();
621 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
622 int send_size = parse_double(action[2]);
623 int recv_size = parse_double(action[3]);
624 MPI_Datatype MPI_CURRENT_TYPE2;
627 MPI_CURRENT_TYPE=decode_datatype(action[4]);
628 MPI_CURRENT_TYPE2=decode_datatype(action[5]);
631 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
632 MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
634 void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
635 void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE2));
638 int rank = smpi_process_index();
639 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
640 extra->type = TRACING_ALLTOALL;
641 extra->send_size = send_size;
642 extra->recv_size = recv_size;
643 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
644 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
646 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
649 mpi_coll_alltoall_fun(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
652 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
654 log_timed_action (action, clock);
// Handler for the "gather" trace action.
// action[2] is the send count, action[3] the receive count, action[4] the
// root, and action[5]/action[6] the send/receive datatypes when present.
// The receive scratch buffer is sized for comm_size contributions.
659 static void action_gather(const char *const *action) {
661 The structure of the gather action for the rank 0 (total 4 processes)
666 1) 68 is the sendcounts
667 2) 68 is the recvcounts
668 3) 0 is the root node
669 4) 0 is the send datatype id, see decode_datatype()
670 5) 0 is the recv datatype id, see decode_datatype()
672 double clock = smpi_process_simulated_elapsed();
673 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
674 int send_size = parse_double(action[2]);
675 int recv_size = parse_double(action[3]);
676 MPI_Datatype MPI_CURRENT_TYPE2;
678 MPI_CURRENT_TYPE=decode_datatype(action[5]);
679 MPI_CURRENT_TYPE2=decode_datatype(action[6]);
681 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
682 MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
684 void *send = smpi_get_tmp_sendbuffer(send_size* smpi_datatype_size(MPI_CURRENT_TYPE));
688 root=atoi(action[4]);
689 int rank = smpi_comm_rank(MPI_COMM_WORLD);
692 recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE2));
695 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
696 extra->type = TRACING_GATHER;
697 extra->send_size = send_size;
698 extra->recv_size = recv_size;
700 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
701 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
703 TRACE_smpi_collective_in(smpi_process_index(), root, __FUNCTION__, extra);
705 mpi_coll_gather_fun(send, send_size, MPI_CURRENT_TYPE,
706 recv, recv_size, MPI_CURRENT_TYPE2,
707 root, MPI_COMM_WORLD);
710 TRACE_smpi_collective_out(smpi_process_index(), -1, __FUNCTION__);
712 log_timed_action (action, clock);
// Handler for the "gatherV" trace action.
// action[2] is the send count, action[3 .. 3+comm_size-1] the per-rank receive
// counts, action[3+comm_size] the root, and action[4+comm_size] /
// action[5+comm_size] the optional send/receive datatypes.
// All displacements stay at zero (disps is zero-initialised by xbt_new0).
// NOTE(review): the sample trace line in the description below reads "gather"
// although this handler is registered under "gatherV" — confirm and fix the sample.
718 static void action_gatherv(const char *const *action) {
720 The structure of the gatherv action for the rank 0 (total 4 processes)
722 0 gather 68 68 10 10 10 0 0 0
725 1) 68 is the sendcount
726 2) 68 10 10 10 is the recvcounts
727 3) 0 is the root node
728 4) 0 is the send datatype id, see decode_datatype()
729 5) 0 is the recv datatype id, see decode_datatype()
731 double clock = smpi_process_simulated_elapsed();
732 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
733 int send_size = parse_double(action[2]);
734 int *disps = xbt_new0(int, comm_size);
735 int *recvcounts = xbt_new0(int, comm_size);
738 MPI_Datatype MPI_CURRENT_TYPE2;
739 if(action[4+comm_size]) {
740 MPI_CURRENT_TYPE=decode_datatype(action[4+comm_size]);
741 MPI_CURRENT_TYPE2=decode_datatype(action[5+comm_size]);
743 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
744 MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
746 void *send = smpi_get_tmp_sendbuffer(send_size* smpi_datatype_size(MPI_CURRENT_TYPE));
748 for(i=0;i<comm_size;i++) {
749 recvcounts[i] = atoi(action[i+3]);
750 recv_sum=recv_sum+recvcounts[i];
754 int root=atoi(action[3+comm_size]);
755 int rank = smpi_comm_rank(MPI_COMM_WORLD);;
758 recv = smpi_get_tmp_recvbuffer(recv_sum* smpi_datatype_size(MPI_CURRENT_TYPE2));
761 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
762 extra->type = TRACING_GATHERV;
763 extra->send_size = send_size;
764 extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
765 for(i=0; i< comm_size; i++)//copy data to avoid bad free
766 extra->recvcounts[i] = recvcounts[i];
768 extra->num_processes = comm_size;
769 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
770 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
772 TRACE_smpi_collective_in(smpi_process_index(), root, __FUNCTION__, extra);
774 smpi_mpi_gatherv(send, send_size, MPI_CURRENT_TYPE,
775 recv, recvcounts, disps, MPI_CURRENT_TYPE2,
776 root, MPI_COMM_WORLD);
779 TRACE_smpi_collective_out(smpi_process_index(), -1, __FUNCTION__);
782 log_timed_action (action, clock);
783 xbt_free(recvcounts);
// Handler for the "reduceScatter" trace action.
// action[2 .. 2+comm_size-1] are the per-rank receive counts,
// action[2+comm_size] the amount of flops executed after the collective and
// action[3+comm_size] an optional datatype (MPI_DEFAULT_TYPE when absent).
// All displacements stay at zero (disps is zero-initialised by xbt_new0).
787 static void action_reducescatter(const char *const *action) {
790 The structure of the reducescatter action for the rank 0 (total 4 processes)
792 0 reduceScatter 275427 275427 275427 204020 11346849 0
795 1) The first four values after the name of the action declare the recvcounts array
796 2) The value 11346849 is the amount of instructions
797 3) The last value corresponds to the datatype, see decode_datatype().
799 We analyze a MPI_Reduce_scatter call to one MPI_Reduce and one MPI_Scatterv.
803 double clock = smpi_process_simulated_elapsed();
804 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
805 int comp_size = parse_double(action[2+comm_size]);
806 int *recvcounts = xbt_new0(int, comm_size);
807 int *disps = xbt_new0(int, comm_size);
809 int rank = smpi_process_index();
811 if(action[3+comm_size])
812 MPI_CURRENT_TYPE=decode_datatype(action[3+comm_size]);
814 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
816 for(i=0;i<comm_size;i++) {
817 recvcounts[i] = atoi(action[i+2]);
823 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
824 extra->type = TRACING_REDUCE_SCATTER;
825 extra->send_size = 0;
826 extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
827 for(i=0; i< comm_size; i++)//copy data to avoid bad free
828 extra->recvcounts[i] = recvcounts[i];
829 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
830 extra->comp_size = comp_size;
831 extra->num_processes = comm_size;
833 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
835 void *sendbuf = smpi_get_tmp_sendbuffer(size* smpi_datatype_size(MPI_CURRENT_TYPE));
836 void *recvbuf = smpi_get_tmp_recvbuffer(size* smpi_datatype_size(MPI_CURRENT_TYPE));
838 mpi_coll_reduce_scatter_fun(sendbuf, recvbuf, recvcounts, MPI_CURRENT_TYPE, MPI_OP_NULL,
840 smpi_execute_flops(comp_size);
844 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
846 xbt_free(recvcounts);
848 log_timed_action (action, clock);
// Handler for the "allGather" trace action.
// action[2] is the send count and action[3] the receive count.
// NOTE(review): the datatypes are decoded from action[3+comm_size] and
// action[4+comm_size], yet the documented line format has no per-rank count
// array — these indices look copied from the allGatherV handler; confirm.
// NOTE(review): recvbuf is sized for recvcount elements only, whereas
// action_gather sizes its receive buffer by recv_size*comm_size — confirm.
851 static void action_allgather(const char *const *action) {
853 The structure of the allgather action for the rank 0 (total 4 processes)
855 0 allGather 275427 275427
858 1) 275427 is the sendcount
859 2) 275427 is the recvcount
860 3) No more values mean that the datatype for sent and receive buffer
861 is the default one, see decode_datatype().
865 double clock = smpi_process_simulated_elapsed();
867 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
868 int sendcount=atoi(action[2]);
869 int recvcount=atoi(action[3]);
871 MPI_Datatype MPI_CURRENT_TYPE2;
874 MPI_CURRENT_TYPE = decode_datatype(action[3+comm_size]);
875 MPI_CURRENT_TYPE2 = decode_datatype(action[4+comm_size]);
877 MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
878 MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
880 void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* smpi_datatype_size(MPI_CURRENT_TYPE));
881 void *recvbuf = smpi_get_tmp_recvbuffer(recvcount* smpi_datatype_size(MPI_CURRENT_TYPE2));
884 int rank = smpi_process_index();
885 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
886 extra->type = TRACING_ALLGATHER;
887 extra->send_size = sendcount;
888 extra->recv_size= recvcount;
889 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
890 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
891 extra->num_processes = comm_size;
893 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
896 mpi_coll_allgather_fun(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcount, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
899 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
902 log_timed_action (action, clock);
// Handler for the "allGatherV" trace action.
// action[2] is the send count, action[3 .. 3+comm_size-1] the per-rank receive
// counts, and action[3+comm_size] / action[4+comm_size] the optional
// send/receive datatypes (MPI_DEFAULT_TYPE when absent).
// The receive scratch buffer is sized from the sum of the receive counts.
// All displacements stay at zero (disps is zero-initialised by xbt_new0).
905 static void action_allgatherv(const char *const *action) {
908 The structure of the allgatherv action for the rank 0 (total 4 processes)
910 0 allGatherV 275427 275427 275427 275427 204020
913 1) 275427 is the sendcount
914 2) The next four elements declare the recvcounts array
915 3) No more values mean that the datatype for sent and receive buffer
916 is the default one, see decode_datatype().
920 double clock = smpi_process_simulated_elapsed();
922 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
924 int sendcount=atoi(action[2]);
925 int *recvcounts = xbt_new0(int, comm_size);
926 int *disps = xbt_new0(int, comm_size);
928 MPI_Datatype MPI_CURRENT_TYPE2;
930 if(action[3+comm_size]) {
931 MPI_CURRENT_TYPE = decode_datatype(action[3+comm_size]);
932 MPI_CURRENT_TYPE2 = decode_datatype(action[4+comm_size]);
934 MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
935 MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
937 void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* smpi_datatype_size(MPI_CURRENT_TYPE));
939 for(i=0;i<comm_size;i++) {
940 recvcounts[i] = atoi(action[i+3]);
941 recv_sum=recv_sum+recvcounts[i];
943 void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* smpi_datatype_size(MPI_CURRENT_TYPE2));
946 int rank = smpi_process_index();
947 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
948 extra->type = TRACING_ALLGATHERV;
949 extra->send_size = sendcount;
950 extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
951 for(i=0; i< comm_size; i++)//copy data to avoid bad free
952 extra->recvcounts[i] = recvcounts[i];
953 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
954 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
955 extra->num_processes = comm_size;
957 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
960 mpi_coll_allgatherv_fun(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts, disps, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
963 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
966 log_timed_action (action, clock);
967 xbt_free(recvcounts);
// Handler for the "allToAllV" trace action.
// action[2] is the send buffer size, action[3 .. 3+comm_size-1] the send
// counts, action[3+comm_size] the receive buffer size,
// action[4+comm_size .. 4+2*comm_size-1] the receive counts, and
// action[4+2*comm_size] / action[5+2*comm_size] the optional send/receive
// datatypes (MPI_DEFAULT_TYPE when absent).
// All displacements stay at zero (the disps arrays are zero-initialised by xbt_new0).
971 static void action_allToAllv(const char *const *action) {
973 The structure of the allToAllV action for the rank 0 (total 4 processes)
975 0 allToAllV 100 1 7 10 12 100 1 70 10 5
978 1) 100 is the size of the send buffer *sizeof(int),
979 2) 1 7 10 12 is the sendcounts array
980 3) 100*sizeof(int) is the size of the receiver buffer
981 4) 1 70 10 5 is the recvcounts array
986 double clock = smpi_process_simulated_elapsed();
988 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
989 int send_buf_size=0,recv_buf_size=0,i=0;
990 int *sendcounts = xbt_new0(int, comm_size);
991 int *recvcounts = xbt_new0(int, comm_size);
992 int *senddisps = xbt_new0(int, comm_size);
993 int *recvdisps = xbt_new0(int, comm_size);
995 MPI_Datatype MPI_CURRENT_TYPE2;
997 send_buf_size=parse_double(action[2]);
998 recv_buf_size=parse_double(action[3+comm_size]);
999 if(action[4+2*comm_size]) {
1000 MPI_CURRENT_TYPE=decode_datatype(action[4+2*comm_size]);
1001 MPI_CURRENT_TYPE2=decode_datatype(action[5+2*comm_size]);
1004 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
1005 MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
1008 void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* smpi_datatype_size(MPI_CURRENT_TYPE));
1009 void *recvbuf = smpi_get_tmp_recvbuffer(recv_buf_size* smpi_datatype_size(MPI_CURRENT_TYPE2));
1011 for(i=0;i<comm_size;i++) {
1012 sendcounts[i] = atoi(action[i+3]);
1013 recvcounts[i] = atoi(action[i+4+comm_size]);
1018 int rank = smpi_process_index();
1019 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
1020 extra->type = TRACING_ALLTOALLV;
1021 extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
1022 extra->sendcounts= xbt_malloc(comm_size*sizeof(int));
1023 extra->num_processes = comm_size;
1025 for(i=0; i< comm_size; i++){//copy data to avoid bad free
1026 extra->send_size += sendcounts[i];
1027 extra->sendcounts[i] = sendcounts[i];
1028 extra->recv_size += recvcounts[i];
1029 extra->recvcounts[i] = recvcounts[i];
1031 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
1032 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
1034 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
// The receive side uses MPI_CURRENT_TYPE2: recvbuf was sized with it and it is
// what is recorded as datatype2 in the trace.
1036 mpi_coll_alltoallv_fun(sendbuf, sendcounts, senddisps, MPI_CURRENT_TYPE,
1037 recvbuf, recvcounts, recvdisps, MPI_CURRENT_TYPE2,
1040 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
1043 log_timed_action (action, clock);
1044 xbt_free(sendcounts);
1045 xbt_free(recvcounts);
1046 xbt_free(senddisps);
1047 xbt_free(recvdisps);
// Entry point of a replayed SMPI process.
// Initialises the process, marks it as initialized and as replaying, emits the
// init tracing events, lets the process with index 0 register every trace
// action handler, optionally burns a delayed-start amount of flops parsed from
// (*argv)[2], and finally hands control to xbt_replay_action_runner().
1050 void smpi_replay_init(int *argc, char***argv){
1051 smpi_process_init(argc, argv);
1052 smpi_process_mark_as_initialized();
1053 smpi_process_set_replaying(1);
1055 int rank = smpi_process_index();
1056 TRACE_smpi_init(rank);
1057 TRACE_smpi_computing_init(rank);
1058 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
1059 extra->type = TRACING_INIT;
1060 TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
1061 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
// Only the process with index 0 registers the handlers.
1064 if (!smpi_process_index()){
1065 _xbt_replay_action_init();
1066 xbt_replay_action_register("init", action_init);
1067 xbt_replay_action_register("finalize", action_finalize);
1068 xbt_replay_action_register("comm_size", action_comm_size);
1069 xbt_replay_action_register("comm_split", action_comm_split);
1070 xbt_replay_action_register("comm_dup", action_comm_dup);
1071 xbt_replay_action_register("send", action_send);
1072 xbt_replay_action_register("Isend", action_Isend);
1073 xbt_replay_action_register("recv", action_recv);
1074 xbt_replay_action_register("Irecv", action_Irecv);
1075 xbt_replay_action_register("test", action_test);
1076 xbt_replay_action_register("wait", action_wait);
1077 xbt_replay_action_register("waitAll", action_waitall);
1078 xbt_replay_action_register("barrier", action_barrier);
1079 xbt_replay_action_register("bcast", action_bcast);
1080 xbt_replay_action_register("reduce", action_reduce);
1081 xbt_replay_action_register("allReduce", action_allReduce);
1082 xbt_replay_action_register("allToAll", action_allToAll);
1083 xbt_replay_action_register("allToAllV", action_allToAllv);
1084 xbt_replay_action_register("gather", action_gather);
1085 xbt_replay_action_register("gatherV", action_gatherv);
1086 xbt_replay_action_register("allGather", action_allgather);
1087 xbt_replay_action_register("allGatherV", action_allgatherv);
1088 xbt_replay_action_register("reduceScatter", action_reducescatter);
1089 xbt_replay_action_register("compute", action_compute);
1092 //if we have a delayed start, sleep here.
// NOTE(review): the condition deciding whether a delayed start applies is not visible in this excerpt.
1095 double value = strtod((*argv)[2], &endptr);
1096 if (*endptr != '\0')
1097 THROWF(unknown_error, 0, "%s is not a double", (*argv)[2]);
1098 XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
1099 smpi_execute_flops(value);
1101 xbt_replay_action_runner(*argc, *argv);
// Tear-down of a replayed SMPI process.
// Waits for any requests still queued for this process, frees its request
// queue container, and — when active_processes is zero — reports the
// simulated time, shuts the replay machinery down and frees the shared
// scratch buffers. It then emits the finalize tracing events and finalizes
// and destroys the process.
// The lines that update active_processes and the return statement are not
// visible in this excerpt.
1104 int smpi_replay_finalize(){
1105 double sim_time= 1.;
1106 /* One active process will stop. Decrease the counter*/
1107 XBT_DEBUG("There are %lu elements in reqq[*]",
1108 xbt_dynar_length(reqq[smpi_process_index()]));
1109 if (!xbt_dynar_is_empty(reqq[smpi_process_index()])){
1110 int count_requests=xbt_dynar_length(reqq[smpi_process_index()]);
1111 MPI_Request requests[count_requests];
1112 MPI_Status status[count_requests];
1115 xbt_dynar_foreach(reqq[smpi_process_index()],i,requests[i]);
1116 smpi_mpi_waitall(count_requests, requests, status);
1122 if(!active_processes){
1123 /* Last process alive speaking */
1124 /* end the simulated timer */
1125 sim_time = smpi_process_simulated_elapsed();
1129 xbt_dynar_free_container(&(reqq[smpi_process_index()]));
1131 if(!active_processes){
1132 XBT_INFO("Simulation time %f", sim_time);
1133 _xbt_replay_action_exit();
1134 xbt_free(sendbuffer);
1135 xbt_free(recvbuffer);
1142 int rank = smpi_process_index();
1143 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
1144 extra->type = TRACING_FINALIZE;
1145 TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
1147 smpi_process_finalize();
1149 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
1150 TRACE_smpi_finalize(smpi_process_index());
1152 smpi_process_destroy();