1 /* Copyright (c) 2009-2014. The SimGrid Team.
2 * All rights reserved. */
4 /* This program is free software; you can redistribute it and/or modify it
5 * under the terms of the license (GNU LGPL) which comes with this package. */
10 #include <xbt/replay.h>
12 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
14 int communicator_size = 0;
15 static int active_processes = 0;
16 xbt_dynar_t *reqq = NULL;
18 MPI_Datatype MPI_DEFAULT_TYPE;
19 MPI_Datatype MPI_CURRENT_TYPE;
21 static int sendbuffer_size=0;
22 char* sendbuffer=NULL;
23 static int recvbuffer_size=0;
24 char* recvbuffer=NULL;
26 static void log_timed_action (const char *const *action, double clock){
27 if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
28 char *name = xbt_str_join_array(action, " ");
29 XBT_VERB("%s %f", name, smpi_process_simulated_elapsed()-clock);
34 //allocate a single buffer for all sends, growing it if needed
35 void* smpi_get_tmp_sendbuffer(int size){
36 if (!smpi_process_get_replaying())
37 return xbt_malloc(size);
38 if (sendbuffer_size<size){
39 sendbuffer=xbt_realloc(sendbuffer,size);
44 //allocate a single buffer for all recv
45 void* smpi_get_tmp_recvbuffer(int size){
46 if (!smpi_process_get_replaying())
47 return xbt_malloc(size);
48 if (recvbuffer_size<size){
49 recvbuffer=xbt_realloc(recvbuffer,size);
55 void smpi_free_tmp_buffer(void* buf){
56 if (!smpi_process_get_replaying())
61 static double parse_double(const char *string)
65 value = strtod(string, &endptr);
67 THROWF(unknown_error, 0, "%s is not a double", string);
71 static MPI_Datatype decode_datatype(const char *const action)
73 // Declared datatypes,
78 MPI_CURRENT_TYPE=MPI_DOUBLE;
81 MPI_CURRENT_TYPE=MPI_INT;
84 MPI_CURRENT_TYPE=MPI_CHAR;
87 MPI_CURRENT_TYPE=MPI_SHORT;
90 MPI_CURRENT_TYPE=MPI_LONG;
93 MPI_CURRENT_TYPE=MPI_FLOAT;
96 MPI_CURRENT_TYPE=MPI_BYTE;
99 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
102 return MPI_CURRENT_TYPE;
106 const char* encode_datatype(MPI_Datatype datatype)
109 //default type for output is set to MPI_BYTE
110 // MPI_DEFAULT_TYPE is not set for output, use directly MPI_BYTE
111 if (datatype==MPI_BYTE){
114 if(datatype==MPI_DOUBLE)
116 if(datatype==MPI_INT)
118 if(datatype==MPI_CHAR)
120 if(datatype==MPI_SHORT)
122 if(datatype==MPI_LONG)
124 if(datatype==MPI_FLOAT)
127 // default - not implemented.
128 // do not warn here as we pass in this function even for other trace formats
132 static void action_init(const char *const *action)
135 XBT_DEBUG("Initialize the counters");
137 if(action[2]) MPI_DEFAULT_TYPE= MPI_DOUBLE; // default MPE dataype
138 else MPI_DEFAULT_TYPE= MPI_BYTE; // default TAU datatype
140 /* start a simulated timer */
141 smpi_process_simulated_start();
142 /*initialize the number of active processes */
143 active_processes = smpi_process_count();
146 reqq=xbt_new0(xbt_dynar_t,active_processes);
148 for(i=0;i<active_processes;i++){
149 reqq[i]=xbt_dynar_new(sizeof(MPI_Request),&xbt_free_ref);
154 static void action_finalize(const char *const *action)
158 static void action_comm_size(const char *const *action)
160 double clock = smpi_process_simulated_elapsed();
162 communicator_size = parse_double(action[2]);
163 log_timed_action (action, clock);
166 static void action_comm_split(const char *const *action)
168 double clock = smpi_process_simulated_elapsed();
170 log_timed_action (action, clock);
173 static void action_comm_dup(const char *const *action)
175 double clock = smpi_process_simulated_elapsed();
177 log_timed_action (action, clock);
180 static void action_compute(const char *const *action)
182 double clock = smpi_process_simulated_elapsed();
183 double flops= parse_double(action[2]);
185 int rank = smpi_process_index();
186 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
187 extra->type=TRACING_COMPUTING;
188 extra->comp_size=flops;
189 TRACE_smpi_computing_in(rank, extra);
191 smpi_execute_flops(flops);
193 TRACE_smpi_computing_out(rank);
196 log_timed_action (action, clock);
199 static void action_send(const char *const *action)
201 int to = atoi(action[2]);
202 double size=parse_double(action[3]);
203 double clock = smpi_process_simulated_elapsed();
206 MPI_CURRENT_TYPE=decode_datatype(action[4]);
208 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
212 int rank = smpi_process_index();
214 int dst_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), to);
215 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
216 extra->type = TRACING_SEND;
217 extra->send_size = size;
219 extra->dst = dst_traced;
220 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
221 TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
222 TRACE_smpi_send(rank, rank, dst_traced, size*smpi_datatype_size(MPI_CURRENT_TYPE));
225 smpi_mpi_send(NULL, size, MPI_CURRENT_TYPE, to , 0, MPI_COMM_WORLD);
227 log_timed_action (action, clock);
230 TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
235 static void action_Isend(const char *const *action)
237 int to = atoi(action[2]);
238 double size=parse_double(action[3]);
239 double clock = smpi_process_simulated_elapsed();
242 if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
243 else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
246 int rank = smpi_process_index();
247 int dst_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), to);
248 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
249 extra->type = TRACING_ISEND;
250 extra->send_size = size;
252 extra->dst = dst_traced;
253 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
254 TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
255 TRACE_smpi_send(rank, rank, dst_traced, size*smpi_datatype_size(MPI_CURRENT_TYPE));
258 request = smpi_mpi_isend(NULL, size, MPI_CURRENT_TYPE, to, 0,MPI_COMM_WORLD);
261 TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
265 xbt_dynar_push(reqq[smpi_process_index()],&request);
267 log_timed_action (action, clock);
270 static void action_recv(const char *const *action) {
271 int from = atoi(action[2]);
272 double size=parse_double(action[3]);
273 double clock = smpi_process_simulated_elapsed();
276 if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
277 else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
280 int rank = smpi_process_index();
281 int src_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), from);
283 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
284 extra->type = TRACING_RECV;
285 extra->send_size = size;
286 extra->src = src_traced;
288 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
289 TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
292 //unknow size from the receiver pov
294 smpi_mpi_probe(from, 0, MPI_COMM_WORLD, &status);
298 smpi_mpi_recv(NULL, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD, &status);
301 TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
302 TRACE_smpi_recv(rank, src_traced, rank);
305 log_timed_action (action, clock);
308 static void action_Irecv(const char *const *action)
310 int from = atoi(action[2]);
311 double size=parse_double(action[3]);
312 double clock = smpi_process_simulated_elapsed();
315 if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
316 else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
319 int rank = smpi_process_index();
320 int src_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), from);
321 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
322 extra->type = TRACING_IRECV;
323 extra->send_size = size;
324 extra->src = src_traced;
326 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
327 TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
330 //unknow size from the receiver pov
332 smpi_mpi_probe(from, 0, MPI_COMM_WORLD, &status);
336 request = smpi_mpi_irecv(NULL, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD);
339 TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
342 xbt_dynar_push(reqq[smpi_process_index()],&request);
344 log_timed_action (action, clock);
347 static void action_test(const char *const *action){
348 double clock = smpi_process_simulated_elapsed();
353 request = xbt_dynar_pop_as(reqq[smpi_process_index()],MPI_Request);
354 //if request is null here, this may mean that a previous test has succeeded
355 //Different times in traced application and replayed version may lead to this
356 //In this case, ignore the extra calls.
359 int rank = smpi_process_index();
360 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
361 extra->type=TRACING_TEST;
362 TRACE_smpi_testing_in(rank, extra);
365 flag = smpi_mpi_test(&request, &status);
367 XBT_DEBUG("MPI_Test result: %d", flag);
368 /* push back request in dynar to be caught by a subsequent wait. if the test
369 * did succeed, the request is now NULL.
371 xbt_dynar_push_as(reqq[smpi_process_index()],MPI_Request, request);
374 TRACE_smpi_testing_out(rank);
377 log_timed_action (action, clock);
380 static void action_wait(const char *const *action){
381 double clock = smpi_process_simulated_elapsed();
385 xbt_assert(xbt_dynar_length(reqq[smpi_process_index()]),
386 "action wait not preceded by any irecv or isend: %s",
387 xbt_str_join_array(action," "));
388 request = xbt_dynar_pop_as(reqq[smpi_process_index()],MPI_Request);
391 /* Assuming that the trace is well formed, this mean the comm might have
392 * been caught by a MPI_test. Then just return.
398 int rank = request->comm != MPI_COMM_NULL
399 ? smpi_comm_rank(request->comm)
402 MPI_Group group = smpi_comm_group(request->comm);
403 int src_traced = smpi_group_rank(group, request->src);
404 int dst_traced = smpi_group_rank(group, request->dst);
405 int is_wait_for_receive = request->recv;
406 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
407 extra->type = TRACING_WAIT;
408 TRACE_smpi_ptp_in(rank, src_traced, dst_traced, __FUNCTION__, extra);
410 smpi_mpi_wait(&request, &status);
412 TRACE_smpi_ptp_out(rank, src_traced, dst_traced, __FUNCTION__);
413 if (is_wait_for_receive) {
414 TRACE_smpi_recv(rank, src_traced, dst_traced);
418 log_timed_action (action, clock);
421 static void action_waitall(const char *const *action){
422 double clock = smpi_process_simulated_elapsed();
423 int count_requests=0;
426 count_requests=xbt_dynar_length(reqq[smpi_process_index()]);
428 if (count_requests>0) {
429 MPI_Request requests[count_requests];
430 MPI_Status status[count_requests];
432 /* The reqq is an array of dynars. Its index corresponds to the rank.
433 Thus each rank saves its own requests to the array request. */
434 xbt_dynar_foreach(reqq[smpi_process_index()],i,requests[i]);
437 //save information from requests
439 xbt_dynar_t srcs = xbt_dynar_new(sizeof(int), NULL);
440 xbt_dynar_t dsts = xbt_dynar_new(sizeof(int), NULL);
441 xbt_dynar_t recvs = xbt_dynar_new(sizeof(int), NULL);
442 for (i = 0; i < count_requests; i++) {
444 int *asrc = xbt_new(int, 1);
445 int *adst = xbt_new(int, 1);
446 int *arecv = xbt_new(int, 1);
447 *asrc = requests[i]->src;
448 *adst = requests[i]->dst;
449 *arecv = requests[i]->recv;
450 xbt_dynar_insert_at(srcs, i, asrc);
451 xbt_dynar_insert_at(dsts, i, adst);
452 xbt_dynar_insert_at(recvs, i, arecv);
457 int *t = xbt_new(int, 1);
458 xbt_dynar_insert_at(srcs, i, t);
459 xbt_dynar_insert_at(dsts, i, t);
460 xbt_dynar_insert_at(recvs, i, t);
464 int rank_traced = smpi_process_index();
465 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
466 extra->type = TRACING_WAITALL;
467 extra->send_size=count_requests;
468 TRACE_smpi_ptp_in(rank_traced, -1, -1, __FUNCTION__,extra);
471 smpi_mpi_waitall(count_requests, requests, status);
474 for (i = 0; i < count_requests; i++) {
475 int src_traced, dst_traced, is_wait_for_receive;
476 xbt_dynar_get_cpy(srcs, i, &src_traced);
477 xbt_dynar_get_cpy(dsts, i, &dst_traced);
478 xbt_dynar_get_cpy(recvs, i, &is_wait_for_receive);
479 if (is_wait_for_receive) {
480 TRACE_smpi_recv(rank_traced, src_traced, dst_traced);
483 TRACE_smpi_ptp_out(rank_traced, -1, -1, __FUNCTION__);
485 xbt_dynar_free(&srcs);
486 xbt_dynar_free(&dsts);
487 xbt_dynar_free(&recvs);
490 int freedrank=smpi_process_index();
491 xbt_dynar_free_container(&(reqq[freedrank]));
492 reqq[freedrank]=xbt_dynar_new(sizeof(MPI_Request),&xbt_free_ref);
494 log_timed_action (action, clock);
497 static void action_barrier(const char *const *action){
498 double clock = smpi_process_simulated_elapsed();
500 int rank = smpi_process_index();
501 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
502 extra->type = TRACING_BARRIER;
503 TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
505 mpi_coll_barrier_fun(MPI_COMM_WORLD);
507 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
510 log_timed_action (action, clock);
514 static void action_bcast(const char *const *action)
516 double size = parse_double(action[2]);
517 double clock = smpi_process_simulated_elapsed();
520 * Initialize MPI_CURRENT_TYPE in order to decrease
521 * the number of the checks
523 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
526 root= atoi(action[3]);
528 MPI_CURRENT_TYPE=decode_datatype(action[4]);
533 int rank = smpi_process_index();
534 int root_traced = smpi_group_index(smpi_comm_group(MPI_COMM_WORLD), root);
536 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
537 extra->type = TRACING_BCAST;
538 extra->send_size = size;
539 extra->root = root_traced;
540 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
541 TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__, extra);
544 void *sendbuf = smpi_get_tmp_sendbuffer(size* smpi_datatype_size(MPI_CURRENT_TYPE));
545 mpi_coll_bcast_fun(sendbuf, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
547 TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
549 log_timed_action (action, clock);
552 static void action_reduce(const char *const *action)
554 double comm_size = parse_double(action[2]);
555 double comp_size = parse_double(action[3]);
556 double clock = smpi_process_simulated_elapsed();
558 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
561 root= atoi(action[4]);
563 MPI_CURRENT_TYPE=decode_datatype(action[5]);
570 int rank = smpi_process_index();
571 int root_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), root);
572 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
573 extra->type = TRACING_REDUCE;
574 extra->send_size = comm_size;
575 extra->comp_size = comp_size;
576 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
577 extra->root = root_traced;
579 TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__,extra);
581 void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
582 void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
583 mpi_coll_reduce_fun(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
584 smpi_execute_flops(comp_size);
586 TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
588 log_timed_action (action, clock);
591 static void action_allReduce(const char *const *action) {
592 double comm_size = parse_double(action[2]);
593 double comp_size = parse_double(action[3]);
595 if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
596 else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
598 double clock = smpi_process_simulated_elapsed();
600 int rank = smpi_process_index();
601 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
602 extra->type = TRACING_ALLREDUCE;
603 extra->send_size = comm_size;
604 extra->comp_size = comp_size;
605 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
607 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
609 void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
610 void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
611 mpi_coll_allreduce_fun(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
612 smpi_execute_flops(comp_size);
614 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
616 log_timed_action (action, clock);
619 static void action_allToAll(const char *const *action) {
620 double clock = smpi_process_simulated_elapsed();
621 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
622 int send_size = parse_double(action[2]);
623 int recv_size = parse_double(action[3]);
624 MPI_Datatype MPI_CURRENT_TYPE2;
627 MPI_CURRENT_TYPE=decode_datatype(action[4]);
628 MPI_CURRENT_TYPE2=decode_datatype(action[5]);
631 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
632 MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
634 void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
635 void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE2));
638 int rank = smpi_process_index();
639 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
640 extra->type = TRACING_ALLTOALL;
641 extra->send_size = send_size;
642 extra->recv_size = recv_size;
643 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
644 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
646 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
649 mpi_coll_alltoall_fun(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
652 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
654 log_timed_action (action, clock);
659 static void action_gather(const char *const *action) {
661 The structure of the gather action for the rank 0 (total 4 processes)
666 1) 68 is the sendcounts
667 2) 68 is the recvcounts
668 3) 0 is the root node
669 4) 0 is the send datatype id, see decode_datatype()
670 5) 0 is the recv datatype id, see decode_datatype()
672 double clock = smpi_process_simulated_elapsed();
673 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
674 int send_size = parse_double(action[2]);
675 int recv_size = parse_double(action[3]);
676 MPI_Datatype MPI_CURRENT_TYPE2;
678 MPI_CURRENT_TYPE=decode_datatype(action[5]);
679 MPI_CURRENT_TYPE2=decode_datatype(action[6]);
681 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
682 MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
684 void *send = smpi_get_tmp_sendbuffer(send_size* smpi_datatype_size(MPI_CURRENT_TYPE));
687 int root=atoi(action[4]);
688 int rank = smpi_comm_rank(MPI_COMM_WORLD);
691 recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE2));
694 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
695 extra->type = TRACING_GATHER;
696 extra->send_size = send_size;
697 extra->recv_size = recv_size;
699 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
700 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
702 TRACE_smpi_collective_in(smpi_process_index(), root, __FUNCTION__, extra);
704 mpi_coll_gather_fun(send, send_size, MPI_CURRENT_TYPE,
705 recv, recv_size, MPI_CURRENT_TYPE2,
706 root, MPI_COMM_WORLD);
709 TRACE_smpi_collective_out(smpi_process_index(), -1, __FUNCTION__);
711 log_timed_action (action, clock);
717 static void action_gatherv(const char *const *action) {
719 The structure of the gatherv action for the rank 0 (total 4 processes)
721 0 gather 68 68 10 10 10 0 0 0
724 1) 68 is the sendcount
725 2) 68 10 10 10 is the recvcounts
726 3) 0 is the root node
727 4) 0 is the send datatype id, see decode_datatype()
728 5) 0 is the recv datatype id, see decode_datatype()
730 double clock = smpi_process_simulated_elapsed();
731 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
732 int send_size = parse_double(action[2]);
733 int *disps = xbt_new0(int, comm_size);
734 int *recvcounts = xbt_new0(int, comm_size);
737 MPI_Datatype MPI_CURRENT_TYPE2;
738 if(action[4+comm_size]) {
739 MPI_CURRENT_TYPE=decode_datatype(action[4+comm_size]);
740 MPI_CURRENT_TYPE2=decode_datatype(action[5+comm_size]);
742 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
743 MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
745 void *send = smpi_get_tmp_sendbuffer(send_size* smpi_datatype_size(MPI_CURRENT_TYPE));
747 for(i=0;i<comm_size;i++) {
748 recvcounts[i] = atoi(action[i+3]);
749 recv_sum=recv_sum+recvcounts[i];
753 int root=atoi(action[3+comm_size]);
754 int rank = smpi_comm_rank(MPI_COMM_WORLD);;
757 recv = smpi_get_tmp_recvbuffer(recv_sum* smpi_datatype_size(MPI_CURRENT_TYPE2));
760 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
761 extra->type = TRACING_GATHERV;
762 extra->send_size = send_size;
763 extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
764 for(i=0; i< comm_size; i++)//copy data to avoid bad free
765 extra->recvcounts[i] = recvcounts[i];
767 extra->num_processes = comm_size;
768 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
769 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
771 TRACE_smpi_collective_in(smpi_process_index(), root, __FUNCTION__, extra);
773 smpi_mpi_gatherv(send, send_size, MPI_CURRENT_TYPE,
774 recv, recvcounts, disps, MPI_CURRENT_TYPE2,
775 root, MPI_COMM_WORLD);
778 TRACE_smpi_collective_out(smpi_process_index(), -1, __FUNCTION__);
781 log_timed_action (action, clock);
782 xbt_free(recvcounts);
786 static void action_reducescatter(const char *const *action) {
789 The structure of the reducescatter action for the rank 0 (total 4 processes)
791 0 reduceScatter 275427 275427 275427 204020 11346849 0
794 1) The first four values after the name of the action declare the recvcounts array
795 2) The value 11346849 is the amount of instructions
796 3) The last value corresponds to the datatype, see decode_datatype().
798 We analyze a MPI_Reduce_scatter call to one MPI_Reduce and one MPI_Scatterv.
802 double clock = smpi_process_simulated_elapsed();
803 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
804 int comp_size = parse_double(action[2+comm_size]);
805 int *recvcounts = xbt_new0(int, comm_size);
806 int *disps = xbt_new0(int, comm_size);
808 int rank = smpi_process_index();
810 if(action[3+comm_size])
811 MPI_CURRENT_TYPE=decode_datatype(action[3+comm_size]);
813 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
815 for(i=0;i<comm_size;i++) {
816 recvcounts[i] = atoi(action[i+2]);
822 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
823 extra->type = TRACING_REDUCE_SCATTER;
824 extra->send_size = 0;
825 extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
826 for(i=0; i< comm_size; i++)//copy data to avoid bad free
827 extra->recvcounts[i] = recvcounts[i];
828 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
829 extra->comp_size = comp_size;
830 extra->num_processes = comm_size;
832 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
834 void *sendbuf = smpi_get_tmp_sendbuffer(size* smpi_datatype_size(MPI_CURRENT_TYPE));
835 void *recvbuf = smpi_get_tmp_recvbuffer(size* smpi_datatype_size(MPI_CURRENT_TYPE));
837 mpi_coll_reduce_scatter_fun(sendbuf, recvbuf, recvcounts, MPI_CURRENT_TYPE, MPI_OP_NULL,
839 smpi_execute_flops(comp_size);
843 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
845 xbt_free(recvcounts);
847 log_timed_action (action, clock);
851 static void action_allgatherv(const char *const *action) {
854 The structure of the allgatherv action for the rank 0 (total 4 processes)
856 0 allGatherV 275427 275427 275427 275427 204020
859 1) 275427 is the sendcount
860 2) The next four elements declare the recvcounts array
861 3) No more values mean that the datatype for sent and receive buffer
862 is the default one, see decode_datatype().
866 double clock = smpi_process_simulated_elapsed();
868 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
870 int sendcount=atoi(action[2]);
871 int *recvcounts = xbt_new0(int, comm_size);
872 int *disps = xbt_new0(int, comm_size);
874 MPI_Datatype MPI_CURRENT_TYPE2;
876 if(action[3+comm_size]) {
877 MPI_CURRENT_TYPE = decode_datatype(action[3+comm_size]);
878 MPI_CURRENT_TYPE2 = decode_datatype(action[4+comm_size]);
880 MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
881 MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
883 void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* smpi_datatype_size(MPI_CURRENT_TYPE));
885 for(i=0;i<comm_size;i++) {
886 recvcounts[i] = atoi(action[i+3]);
887 recv_sum=recv_sum+recvcounts[i];
889 void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* smpi_datatype_size(MPI_CURRENT_TYPE2));
892 int rank = smpi_process_index();
893 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
894 extra->type = TRACING_ALLGATHERV;
895 extra->send_size = sendcount;
896 extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
897 for(i=0; i< comm_size; i++)//copy data to avoid bad free
898 extra->recvcounts[i] = recvcounts[i];
899 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
900 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
901 extra->num_processes = comm_size;
903 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
906 mpi_coll_allgatherv_fun(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts, disps, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
909 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
912 log_timed_action (action, clock);
913 xbt_free(recvcounts);
918 static void action_allToAllv(const char *const *action) {
920 The structure of the allToAllV action for the rank 0 (total 4 processes)
922 0 allToAllV 100 1 7 10 12 100 1 70 10 5
925 1) 100 is the size of the send buffer *sizeof(int),
926 2) 1 7 10 12 is the sendcounts array
927 3) 100*sizeof(int) is the size of the receiver buffer
928 4) 1 70 10 5 is the recvcounts array
933 double clock = smpi_process_simulated_elapsed();
935 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
936 int send_buf_size=0,recv_buf_size=0,i=0;
937 int *sendcounts = xbt_new0(int, comm_size);
938 int *recvcounts = xbt_new0(int, comm_size);
939 int *senddisps = xbt_new0(int, comm_size);
940 int *recvdisps = xbt_new0(int, comm_size);
942 MPI_Datatype MPI_CURRENT_TYPE2;
944 send_buf_size=parse_double(action[2]);
945 recv_buf_size=parse_double(action[3+comm_size]);
946 if(action[4+2*comm_size]) {
947 MPI_CURRENT_TYPE=decode_datatype(action[4+2*comm_size]);
948 MPI_CURRENT_TYPE2=decode_datatype(action[5+2*comm_size]);
951 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
952 MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
955 void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* smpi_datatype_size(MPI_CURRENT_TYPE));
956 void *recvbuf = smpi_get_tmp_recvbuffer(recv_buf_size* smpi_datatype_size(MPI_CURRENT_TYPE2));
958 for(i=0;i<comm_size;i++) {
959 sendcounts[i] = atoi(action[i+3]);
960 recvcounts[i] = atoi(action[i+4+comm_size]);
965 int rank = smpi_process_index();
966 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
967 extra->type = TRACING_ALLTOALLV;
968 extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
969 extra->sendcounts= xbt_malloc(comm_size*sizeof(int));
970 extra->num_processes = comm_size;
972 for(i=0; i< comm_size; i++){//copy data to avoid bad free
973 extra->send_size += sendcounts[i];
974 extra->sendcounts[i] = sendcounts[i];
975 extra->recv_size += recvcounts[i];
976 extra->recvcounts[i] = recvcounts[i];
978 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
979 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
981 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
983 mpi_coll_alltoallv_fun(sendbuf, sendcounts, senddisps, MPI_CURRENT_TYPE,
984 recvbuf, recvcounts, recvdisps, MPI_CURRENT_TYPE,
987 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
990 log_timed_action (action, clock);
991 xbt_free(sendcounts);
992 xbt_free(recvcounts);
997 void smpi_replay_init(int *argc, char***argv){
998 smpi_process_init(argc, argv);
999 smpi_process_mark_as_initialized();
1000 smpi_process_set_replaying(1);
1002 int rank = smpi_process_index();
1003 TRACE_smpi_init(rank);
1004 TRACE_smpi_computing_init(rank);
1005 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
1006 extra->type = TRACING_INIT;
1007 TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
1008 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
1011 if (!smpi_process_index()){
1012 _xbt_replay_action_init();
1013 xbt_replay_action_register("init", action_init);
1014 xbt_replay_action_register("finalize", action_finalize);
1015 xbt_replay_action_register("comm_size", action_comm_size);
1016 xbt_replay_action_register("comm_split", action_comm_split);
1017 xbt_replay_action_register("comm_dup", action_comm_dup);
1018 xbt_replay_action_register("send", action_send);
1019 xbt_replay_action_register("Isend", action_Isend);
1020 xbt_replay_action_register("recv", action_recv);
1021 xbt_replay_action_register("Irecv", action_Irecv);
1022 xbt_replay_action_register("test", action_test);
1023 xbt_replay_action_register("wait", action_wait);
1024 xbt_replay_action_register("waitAll", action_waitall);
1025 xbt_replay_action_register("barrier", action_barrier);
1026 xbt_replay_action_register("bcast", action_bcast);
1027 xbt_replay_action_register("reduce", action_reduce);
1028 xbt_replay_action_register("allReduce", action_allReduce);
1029 xbt_replay_action_register("allToAll", action_allToAll);
1030 xbt_replay_action_register("allToAllV", action_allToAllv);
1031 xbt_replay_action_register("gather", action_gather);
1032 xbt_replay_action_register("gatherV", action_gatherv);
1033 xbt_replay_action_register("allGatherV", action_allgatherv);
1034 xbt_replay_action_register("reduceScatter", action_reducescatter);
1035 xbt_replay_action_register("compute", action_compute);
1038 //if we have a delayed start, sleep here.
1041 double value = strtod((*argv)[2], &endptr);
1042 if (*endptr != '\0')
1043 THROWF(unknown_error, 0, "%s is not a double", (*argv)[2]);
1044 XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
1045 smpi_execute_flops(value);
1047 xbt_replay_action_runner(*argc, *argv);
1050 int smpi_replay_finalize(){
1051 double sim_time= 1.;
1052 /* One active process will stop. Decrease the counter*/
1053 XBT_DEBUG("There are %lu elements in reqq[*]",
1054 xbt_dynar_length(reqq[smpi_process_index()]));
1055 if (!xbt_dynar_is_empty(reqq[smpi_process_index()])){
1056 int count_requests=xbt_dynar_length(reqq[smpi_process_index()]);
1057 MPI_Request requests[count_requests];
1058 MPI_Status status[count_requests];
1061 xbt_dynar_foreach(reqq[smpi_process_index()],i,requests[i]);
1062 smpi_mpi_waitall(count_requests, requests, status);
1068 if(!active_processes){
1069 /* Last process alive speaking */
1070 /* end the simulated timer */
1071 sim_time = smpi_process_simulated_elapsed();
1075 xbt_dynar_free_container(&(reqq[smpi_process_index()]));
1077 if(!active_processes){
1078 XBT_INFO("Simulation time %f", sim_time);
1079 _xbt_replay_action_exit();
1080 xbt_free(sendbuffer);
1081 xbt_free(recvbuffer);
1088 int rank = smpi_process_index();
1089 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
1090 extra->type = TRACING_FINALIZE;
1091 TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
1093 smpi_process_finalize();
1095 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
1096 TRACE_smpi_finalize(smpi_process_index());
1098 smpi_process_destroy();