1 /* Copyright (c) 2009-2014. The SimGrid Team.
2 * All rights reserved. */
4 /* This program is free software; you can redistribute it and/or modify it
5 * under the terms of the license (GNU LGPL) which comes with this package. */
10 #include <xbt/replay.h>
12 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
14 int communicator_size = 0;
15 static int active_processes = 0;
16 xbt_dynar_t *reqq = NULL;
18 MPI_Datatype MPI_DEFAULT_TYPE;
19 MPI_Datatype MPI_CURRENT_TYPE;
21 static int sendbuffer_size=0;
22 char* sendbuffer=NULL;
23 static int recvbuffer_size=0;
24 char* recvbuffer=NULL;
26 static void log_timed_action (const char *const *action, double clock){
27 if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
28 char *name = xbt_str_join_array(action, " ");
29 XBT_VERB("%s %f", name, smpi_process_simulated_elapsed()-clock);
34 //allocate a single buffer for all sends, growing it if needed
35 void* smpi_get_tmp_sendbuffer(int size){
36 if (!smpi_process_get_replaying())
37 return xbt_malloc(size);
38 if (sendbuffer_size<size){
39 sendbuffer=xbt_realloc(sendbuffer,size);
44 //allocate a single buffer for all recv
45 void* smpi_get_tmp_recvbuffer(int size){
46 if (!smpi_process_get_replaying())
47 return xbt_malloc(size);
48 if (recvbuffer_size<size){
49 recvbuffer=xbt_realloc(recvbuffer,size);
55 void smpi_free_tmp_buffer(void* buf){
56 if (!smpi_process_get_replaying())
61 static double parse_double(const char *string)
65 value = strtod(string, &endptr);
67 THROWF(unknown_error, 0, "%s is not a double", string);
71 static MPI_Datatype decode_datatype(const char *const action)
73 // Declared datatypes,
78 MPI_CURRENT_TYPE=MPI_DOUBLE;
81 MPI_CURRENT_TYPE=MPI_INT;
84 MPI_CURRENT_TYPE=MPI_CHAR;
87 MPI_CURRENT_TYPE=MPI_SHORT;
90 MPI_CURRENT_TYPE=MPI_LONG;
93 MPI_CURRENT_TYPE=MPI_FLOAT;
96 MPI_CURRENT_TYPE=MPI_BYTE;
99 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
102 return MPI_CURRENT_TYPE;
106 const char* encode_datatype(MPI_Datatype datatype)
109 //default type for output is set to MPI_BYTE
110 // MPI_DEFAULT_TYPE is not set for output, use directly MPI_BYTE
111 if (datatype==MPI_BYTE){
114 if(datatype==MPI_DOUBLE)
116 if(datatype==MPI_INT)
118 if(datatype==MPI_CHAR)
120 if(datatype==MPI_SHORT)
122 if(datatype==MPI_LONG)
124 if(datatype==MPI_FLOAT)
127 // default - not implemented.
128 // do not warn here as we pass in this function even for other trace formats
132 static void action_init(const char *const *action)
135 XBT_DEBUG("Initialize the counters");
137 if(action[2]) MPI_DEFAULT_TYPE= MPI_DOUBLE; // default MPE dataype
138 else MPI_DEFAULT_TYPE= MPI_BYTE; // default TAU datatype
140 /* start a simulated timer */
141 smpi_process_simulated_start();
142 /*initialize the number of active processes */
143 active_processes = smpi_process_count();
146 reqq=xbt_new0(xbt_dynar_t,active_processes);
148 for(i=0;i<active_processes;i++){
149 reqq[i]=xbt_dynar_new(sizeof(MPI_Request),&xbt_free_ref);
154 static void action_finalize(const char *const *action)
158 static void action_comm_size(const char *const *action)
160 double clock = smpi_process_simulated_elapsed();
162 communicator_size = parse_double(action[2]);
163 log_timed_action (action, clock);
166 static void action_comm_split(const char *const *action)
168 double clock = smpi_process_simulated_elapsed();
170 log_timed_action (action, clock);
173 static void action_comm_dup(const char *const *action)
175 double clock = smpi_process_simulated_elapsed();
177 log_timed_action (action, clock);
180 static void action_compute(const char *const *action)
182 double clock = smpi_process_simulated_elapsed();
183 double flops= parse_double(action[2]);
185 int rank = smpi_process_index();
186 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
187 extra->type=TRACING_COMPUTING;
188 extra->comp_size=flops;
189 TRACE_smpi_computing_in(rank, extra);
191 smpi_execute_flops(flops);
193 TRACE_smpi_computing_out(rank);
196 log_timed_action (action, clock);
199 static void action_send(const char *const *action)
201 int to = atoi(action[2]);
202 double size=parse_double(action[3]);
203 double clock = smpi_process_simulated_elapsed();
206 MPI_CURRENT_TYPE=decode_datatype(action[4]);
208 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
212 int rank = smpi_process_index();
214 int dst_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), to);
215 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
216 extra->type = TRACING_SEND;
217 extra->send_size = size;
219 extra->dst = dst_traced;
220 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
221 TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
222 TRACE_smpi_send(rank, rank, dst_traced, size*smpi_datatype_size(MPI_CURRENT_TYPE));
225 smpi_mpi_send(NULL, size, MPI_CURRENT_TYPE, to , 0, MPI_COMM_WORLD);
227 log_timed_action (action, clock);
230 TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
235 static void action_Isend(const char *const *action)
237 int to = atoi(action[2]);
238 double size=parse_double(action[3]);
239 double clock = smpi_process_simulated_elapsed();
242 if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
243 else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
246 int rank = smpi_process_index();
247 int dst_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), to);
248 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
249 extra->type = TRACING_ISEND;
250 extra->send_size = size;
252 extra->dst = dst_traced;
253 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
254 TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
255 TRACE_smpi_send(rank, rank, dst_traced, size*smpi_datatype_size(MPI_CURRENT_TYPE));
258 request = smpi_mpi_isend(NULL, size, MPI_CURRENT_TYPE, to, 0,MPI_COMM_WORLD);
261 TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
265 xbt_dynar_push(reqq[smpi_process_index()],&request);
267 log_timed_action (action, clock);
270 static void action_recv(const char *const *action) {
271 int from = atoi(action[2]);
272 double size=parse_double(action[3]);
273 double clock = smpi_process_simulated_elapsed();
276 if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
277 else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
280 int rank = smpi_process_index();
281 int src_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), from);
283 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
284 extra->type = TRACING_RECV;
285 extra->send_size = size;
286 extra->src = src_traced;
288 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
289 TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
292 //unknow size from the receiver pov
294 smpi_mpi_probe(from, 0, MPI_COMM_WORLD, &status);
298 smpi_mpi_recv(NULL, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD, &status);
301 TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
302 TRACE_smpi_recv(rank, src_traced, rank);
305 log_timed_action (action, clock);
308 static void action_Irecv(const char *const *action)
310 int from = atoi(action[2]);
311 double size=parse_double(action[3]);
312 double clock = smpi_process_simulated_elapsed();
315 if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
316 else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
319 int rank = smpi_process_index();
320 int src_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), from);
321 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
322 extra->type = TRACING_IRECV;
323 extra->send_size = size;
324 extra->src = src_traced;
326 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
327 TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
330 //unknow size from the receiver pov
332 smpi_mpi_probe(from, 0, MPI_COMM_WORLD, &status);
336 request = smpi_mpi_irecv(NULL, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD);
339 TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
342 xbt_dynar_push(reqq[smpi_process_index()],&request);
344 log_timed_action (action, clock);
347 static void action_test(const char *const *action){
348 double clock = smpi_process_simulated_elapsed();
353 request = xbt_dynar_pop_as(reqq[smpi_process_index()],MPI_Request);
354 xbt_assert(request != NULL, "found null request in reqq");
357 int rank = smpi_process_index();
358 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
359 extra->type=TRACING_TEST;
360 TRACE_smpi_testing_in(rank, extra);
362 flag = smpi_mpi_test(&request, &status);
363 XBT_DEBUG("MPI_Test result: %d", flag);
364 /* push back request in dynar to be caught by a subsequent wait. if the test
365 * did succeed, the request is now NULL.
367 xbt_dynar_push_as(reqq[smpi_process_index()],MPI_Request, request);
370 TRACE_smpi_testing_out(rank);
373 log_timed_action (action, clock);
376 static void action_wait(const char *const *action){
377 double clock = smpi_process_simulated_elapsed();
381 xbt_assert(xbt_dynar_length(reqq[smpi_process_index()]),
382 "action wait not preceded by any irecv or isend: %s",
383 xbt_str_join_array(action," "));
384 request = xbt_dynar_pop_as(reqq[smpi_process_index()],MPI_Request);
387 /* Assuming that the trace is well formed, this mean the comm might have
388 * been caught by a MPI_test. Then just return.
394 int rank = request->comm != MPI_COMM_NULL
395 ? smpi_comm_rank(request->comm)
398 MPI_Group group = smpi_comm_group(request->comm);
399 int src_traced = smpi_group_rank(group, request->src);
400 int dst_traced = smpi_group_rank(group, request->dst);
401 int is_wait_for_receive = request->recv;
402 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
403 extra->type = TRACING_WAIT;
404 TRACE_smpi_ptp_in(rank, src_traced, dst_traced, __FUNCTION__, extra);
406 smpi_mpi_wait(&request, &status);
408 TRACE_smpi_ptp_out(rank, src_traced, dst_traced, __FUNCTION__);
409 if (is_wait_for_receive) {
410 TRACE_smpi_recv(rank, src_traced, dst_traced);
414 log_timed_action (action, clock);
417 static void action_waitall(const char *const *action){
418 double clock = smpi_process_simulated_elapsed();
419 int count_requests=0;
422 count_requests=xbt_dynar_length(reqq[smpi_process_index()]);
424 if (count_requests>0) {
425 MPI_Request requests[count_requests];
426 MPI_Status status[count_requests];
428 /* The reqq is an array of dynars. Its index corresponds to the rank.
429 Thus each rank saves its own requests to the array request. */
430 xbt_dynar_foreach(reqq[smpi_process_index()],i,requests[i]);
433 //save information from requests
435 xbt_dynar_t srcs = xbt_dynar_new(sizeof(int), NULL);
436 xbt_dynar_t dsts = xbt_dynar_new(sizeof(int), NULL);
437 xbt_dynar_t recvs = xbt_dynar_new(sizeof(int), NULL);
438 for (i = 0; i < count_requests; i++) {
440 int *asrc = xbt_new(int, 1);
441 int *adst = xbt_new(int, 1);
442 int *arecv = xbt_new(int, 1);
443 *asrc = requests[i]->src;
444 *adst = requests[i]->dst;
445 *arecv = requests[i]->recv;
446 xbt_dynar_insert_at(srcs, i, asrc);
447 xbt_dynar_insert_at(dsts, i, adst);
448 xbt_dynar_insert_at(recvs, i, arecv);
453 int *t = xbt_new(int, 1);
454 xbt_dynar_insert_at(srcs, i, t);
455 xbt_dynar_insert_at(dsts, i, t);
456 xbt_dynar_insert_at(recvs, i, t);
460 int rank_traced = smpi_process_index();
461 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
462 extra->type = TRACING_WAITALL;
463 extra->send_size=count_requests;
464 TRACE_smpi_ptp_in(rank_traced, -1, -1, __FUNCTION__,extra);
467 smpi_mpi_waitall(count_requests, requests, status);
470 for (i = 0; i < count_requests; i++) {
471 int src_traced, dst_traced, is_wait_for_receive;
472 xbt_dynar_get_cpy(srcs, i, &src_traced);
473 xbt_dynar_get_cpy(dsts, i, &dst_traced);
474 xbt_dynar_get_cpy(recvs, i, &is_wait_for_receive);
475 if (is_wait_for_receive) {
476 TRACE_smpi_recv(rank_traced, src_traced, dst_traced);
479 TRACE_smpi_ptp_out(rank_traced, -1, -1, __FUNCTION__);
481 xbt_dynar_free(&srcs);
482 xbt_dynar_free(&dsts);
483 xbt_dynar_free(&recvs);
486 xbt_dynar_free_container(&(reqq[smpi_process_index()]));
488 log_timed_action (action, clock);
491 static void action_barrier(const char *const *action){
492 double clock = smpi_process_simulated_elapsed();
494 int rank = smpi_process_index();
495 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
496 extra->type = TRACING_BARRIER;
497 TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
499 mpi_coll_barrier_fun(MPI_COMM_WORLD);
501 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
504 log_timed_action (action, clock);
508 static void action_bcast(const char *const *action)
510 double size = parse_double(action[2]);
511 double clock = smpi_process_simulated_elapsed();
514 * Initialize MPI_CURRENT_TYPE in order to decrease
515 * the number of the checks
517 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
520 root= atoi(action[3]);
522 MPI_CURRENT_TYPE=decode_datatype(action[4]);
527 int rank = smpi_process_index();
528 int root_traced = smpi_group_index(smpi_comm_group(MPI_COMM_WORLD), root);
530 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
531 extra->type = TRACING_BCAST;
532 extra->send_size = size;
533 extra->root = root_traced;
534 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
535 TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__, extra);
538 void *sendbuf = smpi_get_tmp_sendbuffer(size* smpi_datatype_size(MPI_CURRENT_TYPE));
539 mpi_coll_bcast_fun(sendbuf, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
541 TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
543 log_timed_action (action, clock);
546 static void action_reduce(const char *const *action)
548 double comm_size = parse_double(action[2]);
549 double comp_size = parse_double(action[3]);
550 double clock = smpi_process_simulated_elapsed();
552 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
555 root= atoi(action[4]);
557 MPI_CURRENT_TYPE=decode_datatype(action[5]);
564 int rank = smpi_process_index();
565 int root_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), root);
566 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
567 extra->type = TRACING_REDUCE;
568 extra->send_size = comm_size;
569 extra->comp_size = comp_size;
570 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
571 extra->root = root_traced;
573 TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__,extra);
575 void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
576 void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
577 mpi_coll_reduce_fun(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
578 smpi_execute_flops(comp_size);
580 TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
582 log_timed_action (action, clock);
585 static void action_allReduce(const char *const *action) {
586 double comm_size = parse_double(action[2]);
587 double comp_size = parse_double(action[3]);
589 if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
590 else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
592 double clock = smpi_process_simulated_elapsed();
594 int rank = smpi_process_index();
595 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
596 extra->type = TRACING_ALLREDUCE;
597 extra->send_size = comm_size;
598 extra->comp_size = comp_size;
599 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
601 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
603 void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
604 void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
605 mpi_coll_allreduce_fun(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
606 smpi_execute_flops(comp_size);
608 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
610 log_timed_action (action, clock);
613 static void action_allToAll(const char *const *action) {
614 double clock = smpi_process_simulated_elapsed();
615 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
616 int send_size = parse_double(action[2]);
617 int recv_size = parse_double(action[3]);
618 MPI_Datatype MPI_CURRENT_TYPE2;
621 MPI_CURRENT_TYPE=decode_datatype(action[4]);
622 MPI_CURRENT_TYPE2=decode_datatype(action[5]);
625 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
626 MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
628 void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
629 void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE2));
632 int rank = smpi_process_index();
633 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
634 extra->type = TRACING_ALLTOALL;
635 extra->send_size = send_size;
636 extra->recv_size = recv_size;
637 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
638 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
640 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
643 mpi_coll_alltoall_fun(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
646 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
648 log_timed_action (action, clock);
653 static void action_gather(const char *const *action) {
655 The structure of the gather action for the rank 0 (total 4 processes)
660 1) 68 is the sendcounts
661 2) 68 is the recvcounts
662 3) 0 is the root node
663 4) 0 is the send datatype id, see decode_datatype()
664 5) 0 is the recv datatype id, see decode_datatype()
666 double clock = smpi_process_simulated_elapsed();
667 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
668 int send_size = parse_double(action[2]);
669 int recv_size = parse_double(action[3]);
670 MPI_Datatype MPI_CURRENT_TYPE2;
672 MPI_CURRENT_TYPE=decode_datatype(action[5]);
673 MPI_CURRENT_TYPE2=decode_datatype(action[6]);
675 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
676 MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
678 void *send = smpi_get_tmp_sendbuffer(send_size* smpi_datatype_size(MPI_CURRENT_TYPE));
681 int root=atoi(action[4]);
682 int rank = smpi_comm_rank(MPI_COMM_WORLD);
685 recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE2));
688 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
689 extra->type = TRACING_GATHER;
690 extra->send_size = send_size;
691 extra->recv_size = recv_size;
693 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
694 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
696 TRACE_smpi_collective_in(smpi_process_index(), root, __FUNCTION__, extra);
698 mpi_coll_gather_fun(send, send_size, MPI_CURRENT_TYPE,
699 recv, recv_size, MPI_CURRENT_TYPE2,
700 root, MPI_COMM_WORLD);
703 TRACE_smpi_collective_out(smpi_process_index(), -1, __FUNCTION__);
705 log_timed_action (action, clock);
711 static void action_gatherv(const char *const *action) {
713 The structure of the gatherv action for the rank 0 (total 4 processes)
715 0 gather 68 68 10 10 10 0 0 0
718 1) 68 is the sendcount
719 2) 68 10 10 10 is the recvcounts
720 3) 0 is the root node
721 4) 0 is the send datatype id, see decode_datatype()
722 5) 0 is the recv datatype id, see decode_datatype()
724 double clock = smpi_process_simulated_elapsed();
725 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
726 int send_size = parse_double(action[2]);
727 int *disps = xbt_new0(int, comm_size);
728 int *recvcounts = xbt_new0(int, comm_size);
731 MPI_Datatype MPI_CURRENT_TYPE2;
732 if(action[4+comm_size]) {
733 MPI_CURRENT_TYPE=decode_datatype(action[4+comm_size]);
734 MPI_CURRENT_TYPE2=decode_datatype(action[5+comm_size]);
736 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
737 MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
739 void *send = smpi_get_tmp_sendbuffer(send_size* smpi_datatype_size(MPI_CURRENT_TYPE));
741 for(i=0;i<comm_size;i++) {
742 recvcounts[i] = atoi(action[i+3]);
743 recv_sum=recv_sum+recvcounts[i];
747 int root=atoi(action[3+comm_size]);
748 int rank = smpi_comm_rank(MPI_COMM_WORLD);;
751 recv = smpi_get_tmp_recvbuffer(recv_sum* smpi_datatype_size(MPI_CURRENT_TYPE2));
754 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
755 extra->type = TRACING_GATHERV;
756 extra->send_size = send_size;
757 extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
758 for(i=0; i< comm_size; i++)//copy data to avoid bad free
759 extra->recvcounts[i] = recvcounts[i];
761 extra->num_processes = comm_size;
762 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
763 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
765 TRACE_smpi_collective_in(smpi_process_index(), root, __FUNCTION__, extra);
767 smpi_mpi_gatherv(send, send_size, MPI_CURRENT_TYPE,
768 recv, recvcounts, disps, MPI_CURRENT_TYPE2,
769 root, MPI_COMM_WORLD);
772 TRACE_smpi_collective_out(smpi_process_index(), -1, __FUNCTION__);
775 log_timed_action (action, clock);
776 xbt_free(recvcounts);
780 static void action_reducescatter(const char *const *action) {
783 The structure of the reducescatter action for the rank 0 (total 4 processes)
785 0 reduceScatter 275427 275427 275427 204020 11346849 0
788 1) The first four values after the name of the action declare the recvcounts array
789 2) The value 11346849 is the amount of instructions
790 3) The last value corresponds to the datatype, see decode_datatype().
792 We analyze a MPI_Reduce_scatter call to one MPI_Reduce and one MPI_Scatterv.
796 double clock = smpi_process_simulated_elapsed();
797 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
798 int comp_size = parse_double(action[2+comm_size]);
799 int *recvcounts = xbt_new0(int, comm_size);
800 int *disps = xbt_new0(int, comm_size);
802 int rank = smpi_process_index();
804 if(action[3+comm_size])
805 MPI_CURRENT_TYPE=decode_datatype(action[3+comm_size]);
807 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
809 for(i=0;i<comm_size;i++) {
810 recvcounts[i] = atoi(action[i+2]);
816 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
817 extra->type = TRACING_REDUCE_SCATTER;
818 extra->send_size = 0;
819 extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
820 for(i=0; i< comm_size; i++)//copy data to avoid bad free
821 extra->recvcounts[i] = recvcounts[i];
822 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
823 extra->comp_size = comp_size;
824 extra->num_processes = comm_size;
826 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
828 void *sendbuf = smpi_get_tmp_sendbuffer(size* smpi_datatype_size(MPI_CURRENT_TYPE));
829 void *recvbuf = smpi_get_tmp_recvbuffer(size* smpi_datatype_size(MPI_CURRENT_TYPE));
831 mpi_coll_reduce_scatter_fun(sendbuf, recvbuf, recvcounts, MPI_CURRENT_TYPE, MPI_OP_NULL,
833 smpi_execute_flops(comp_size);
837 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
839 xbt_free(recvcounts);
841 log_timed_action (action, clock);
845 static void action_allgatherv(const char *const *action) {
848 The structure of the allgatherv action for the rank 0 (total 4 processes)
850 0 allGatherV 275427 275427 275427 275427 204020
853 1) 275427 is the sendcount
854 2) The next four elements declare the recvcounts array
855 3) No more values mean that the datatype for sent and receive buffer
856 is the default one, see decode_datatype().
860 double clock = smpi_process_simulated_elapsed();
862 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
864 int sendcount=atoi(action[2]);
865 int *recvcounts = xbt_new0(int, comm_size);
866 int *disps = xbt_new0(int, comm_size);
868 MPI_Datatype MPI_CURRENT_TYPE2;
870 if(action[3+comm_size]) {
871 MPI_CURRENT_TYPE = decode_datatype(action[3+comm_size]);
872 MPI_CURRENT_TYPE2 = decode_datatype(action[4+comm_size]);
874 MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
875 MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
877 void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* smpi_datatype_size(MPI_CURRENT_TYPE));
879 for(i=0;i<comm_size;i++) {
880 recvcounts[i] = atoi(action[i+3]);
881 recv_sum=recv_sum+recvcounts[i];
883 void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* smpi_datatype_size(MPI_CURRENT_TYPE2));
886 int rank = smpi_process_index();
887 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
888 extra->type = TRACING_ALLGATHERV;
889 extra->send_size = sendcount;
890 extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
891 for(i=0; i< comm_size; i++)//copy data to avoid bad free
892 extra->recvcounts[i] = recvcounts[i];
893 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
894 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
895 extra->num_processes = comm_size;
897 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
900 mpi_coll_allgatherv_fun(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts, disps, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
903 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
906 log_timed_action (action, clock);
907 xbt_free(recvcounts);
912 static void action_allToAllv(const char *const *action) {
914 The structure of the allToAllV action for the rank 0 (total 4 processes)
916 0 allToAllV 100 1 7 10 12 100 1 70 10 5
919 1) 100 is the size of the send buffer *sizeof(int),
920 2) 1 7 10 12 is the sendcounts array
921 3) 100*sizeof(int) is the size of the receiver buffer
922 4) 1 70 10 5 is the recvcounts array
927 double clock = smpi_process_simulated_elapsed();
929 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
930 int send_buf_size=0,recv_buf_size=0,i=0;
931 int *sendcounts = xbt_new0(int, comm_size);
932 int *recvcounts = xbt_new0(int, comm_size);
933 int *senddisps = xbt_new0(int, comm_size);
934 int *recvdisps = xbt_new0(int, comm_size);
936 MPI_Datatype MPI_CURRENT_TYPE2;
938 send_buf_size=parse_double(action[2]);
939 recv_buf_size=parse_double(action[3+comm_size]);
940 if(action[4+2*comm_size]) {
941 MPI_CURRENT_TYPE=decode_datatype(action[4+2*comm_size]);
942 MPI_CURRENT_TYPE2=decode_datatype(action[5+2*comm_size]);
945 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
946 MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
949 void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* smpi_datatype_size(MPI_CURRENT_TYPE));
950 void *recvbuf = smpi_get_tmp_recvbuffer(recv_buf_size* smpi_datatype_size(MPI_CURRENT_TYPE2));
952 for(i=0;i<comm_size;i++) {
953 sendcounts[i] = atoi(action[i+3]);
954 recvcounts[i] = atoi(action[i+4+comm_size]);
959 int rank = smpi_process_index();
960 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
961 extra->type = TRACING_ALLTOALLV;
962 extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
963 extra->sendcounts= xbt_malloc(comm_size*sizeof(int));
964 extra->num_processes = comm_size;
966 for(i=0; i< comm_size; i++){//copy data to avoid bad free
967 extra->send_size += sendcounts[i];
968 extra->sendcounts[i] = sendcounts[i];
969 extra->recv_size += recvcounts[i];
970 extra->recvcounts[i] = recvcounts[i];
972 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
973 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
975 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
977 mpi_coll_alltoallv_fun(sendbuf, sendcounts, senddisps, MPI_CURRENT_TYPE,
978 recvbuf, recvcounts, recvdisps, MPI_CURRENT_TYPE,
981 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
984 log_timed_action (action, clock);
985 xbt_free(sendcounts);
986 xbt_free(recvcounts);
991 void smpi_replay_init(int *argc, char***argv){
992 smpi_process_init(argc, argv);
993 smpi_process_mark_as_initialized();
994 smpi_process_set_replaying(1);
996 int rank = smpi_process_index();
997 TRACE_smpi_init(rank);
998 TRACE_smpi_computing_init(rank);
999 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
1000 extra->type = TRACING_INIT;
1001 TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
1002 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
1005 if (!smpi_process_index()){
1006 _xbt_replay_action_init();
1007 xbt_replay_action_register("init", action_init);
1008 xbt_replay_action_register("finalize", action_finalize);
1009 xbt_replay_action_register("comm_size", action_comm_size);
1010 xbt_replay_action_register("comm_split", action_comm_split);
1011 xbt_replay_action_register("comm_dup", action_comm_dup);
1012 xbt_replay_action_register("send", action_send);
1013 xbt_replay_action_register("Isend", action_Isend);
1014 xbt_replay_action_register("recv", action_recv);
1015 xbt_replay_action_register("Irecv", action_Irecv);
1016 xbt_replay_action_register("test", action_test);
1017 xbt_replay_action_register("wait", action_wait);
1018 xbt_replay_action_register("waitAll", action_waitall);
1019 xbt_replay_action_register("barrier", action_barrier);
1020 xbt_replay_action_register("bcast", action_bcast);
1021 xbt_replay_action_register("reduce", action_reduce);
1022 xbt_replay_action_register("allReduce", action_allReduce);
1023 xbt_replay_action_register("allToAll", action_allToAll);
1024 xbt_replay_action_register("allToAllV", action_allToAllv);
1025 xbt_replay_action_register("gather", action_gather);
1026 xbt_replay_action_register("gatherV", action_gatherv);
1027 xbt_replay_action_register("allGatherV", action_allgatherv);
1028 xbt_replay_action_register("reduceScatter", action_reducescatter);
1029 xbt_replay_action_register("compute", action_compute);
1032 //if we have a delayed start, sleep here.
1035 double value = strtod((*argv)[2], &endptr);
1036 if (*endptr != '\0')
1037 THROWF(unknown_error, 0, "%s is not a double", (*argv)[2]);
1038 XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
1039 smpi_execute_flops(value);
1041 xbt_replay_action_runner(*argc, *argv);
1044 int smpi_replay_finalize(){
1045 double sim_time= 1.;
1046 /* One active process will stop. Decrease the counter*/
1047 XBT_DEBUG("There are %lu elements in reqq[*]",
1048 xbt_dynar_length(reqq[smpi_process_index()]));
1049 if (!xbt_dynar_is_empty(reqq[smpi_process_index()])){
1050 int count_requests=xbt_dynar_length(reqq[smpi_process_index()]);
1051 MPI_Request requests[count_requests];
1052 MPI_Status status[count_requests];
1055 xbt_dynar_foreach(reqq[smpi_process_index()],i,requests[i]);
1056 smpi_mpi_waitall(count_requests, requests, status);
1062 xbt_dynar_free_container(&(reqq[smpi_process_index()]));
1064 if(!active_processes){
1065 /* Last process alive speaking */
1066 /* end the simulated timer */
1067 sim_time = smpi_process_simulated_elapsed();
1068 XBT_INFO("Simulation time %f", sim_time);
1069 _xbt_replay_action_exit();
1070 xbt_free(sendbuffer);
1071 xbt_free(recvbuffer);
1075 mpi_coll_barrier_fun(MPI_COMM_WORLD);
1077 int rank = smpi_process_index();
1078 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
1079 extra->type = TRACING_FINALIZE;
1080 TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
1082 smpi_process_finalize();
1084 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
1085 TRACE_smpi_finalize(smpi_process_index());
1087 smpi_process_destroy();