1 /* Copyright (c) 2009-2014. The SimGrid Team.
2 * All rights reserved. */
4 /* This program is free software; you can redistribute it and/or modify it
5 * under the terms of the license (GNU LGPL) which comes with this package. */
10 #include <xbt/replay.h>
12 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
14 int communicator_size = 0;
15 static int active_processes = 0;
16 xbt_dynar_t *reqq = NULL;
18 MPI_Datatype MPI_DEFAULT_TYPE;
19 MPI_Datatype MPI_CURRENT_TYPE;
21 static int sendbuffer_size=0;
22 char* sendbuffer=NULL;
23 static int recvbuffer_size=0;
24 char* recvbuffer=NULL;
26 static void log_timed_action (const char *const *action, double clock){
27 if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
28 char *name = xbt_str_join_array(action, " ");
29 XBT_VERB("%s %f", name, smpi_process_simulated_elapsed()-clock);
34 //allocate a single buffer for all sends, growing it if needed
35 void* smpi_get_tmp_sendbuffer(int size){
36 if (!_xbt_replay_is_active())
37 return xbt_malloc(size);
38 if (sendbuffer_size<size){
39 sendbuffer=xbt_realloc(sendbuffer,size);
44 //allocate a single buffer for all recv
45 void* smpi_get_tmp_recvbuffer(int size){
46 if (!_xbt_replay_is_active())
47 return xbt_malloc(size);
48 if (recvbuffer_size<size){
49 recvbuffer=xbt_realloc(recvbuffer,size);
/* Release a buffer obtained from smpi_get_tmp_sendbuffer() or
 * smpi_get_tmp_recvbuffer().
 *
 * Only buffers handed out while no replay is active are individually
 * allocated; during a replay the shared buffers must stay alive, so the call
 * does nothing.
 */
void smpi_free_tmp_buffer(void* buf){
  if (!_xbt_replay_is_active())
    xbt_free(buf);
}
61 static double parse_double(const char *string)
65 value = strtod(string, &endptr);
67 THROWF(unknown_error, 0, "%s is not a double", string);
71 static MPI_Datatype decode_datatype(const char *const action)
73 // Declared datatypes,
78 MPI_CURRENT_TYPE=MPI_DOUBLE;
81 MPI_CURRENT_TYPE=MPI_INT;
84 MPI_CURRENT_TYPE=MPI_CHAR;
87 MPI_CURRENT_TYPE=MPI_SHORT;
90 MPI_CURRENT_TYPE=MPI_LONG;
93 MPI_CURRENT_TYPE=MPI_FLOAT;
96 MPI_CURRENT_TYPE=MPI_BYTE;
99 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
102 return MPI_CURRENT_TYPE;
106 const char* encode_datatype(MPI_Datatype datatype)
109 //default type for output is set to MPI_BYTE
110 // MPI_DEFAULT_TYPE is not set for output, use directly MPI_BYTE
111 if (datatype==MPI_BYTE){
114 if(datatype==MPI_DOUBLE)
116 if(datatype==MPI_INT)
118 if(datatype==MPI_CHAR)
120 if(datatype==MPI_SHORT)
122 if(datatype==MPI_LONG)
124 if(datatype==MPI_FLOAT)
127 // default - not implemented.
128 // do not warn here as we pass in this function even for other trace formats
132 static void action_init(const char *const *action)
135 XBT_DEBUG("Initialize the counters");
137 if(action[2]) MPI_DEFAULT_TYPE= MPI_DOUBLE; // default MPE dataype
138 else MPI_DEFAULT_TYPE= MPI_BYTE; // default TAU datatype
140 /* start a simulated timer */
141 smpi_process_simulated_start();
142 /*initialize the number of active processes */
143 active_processes = smpi_process_count();
146 reqq=xbt_new0(xbt_dynar_t,active_processes);
148 for(i=0;i<active_processes;i++){
149 reqq[i]=xbt_dynar_new(sizeof(MPI_Request),&xbt_free_ref);
154 static void action_finalize(const char *const *action)
158 static void action_comm_size(const char *const *action)
160 double clock = smpi_process_simulated_elapsed();
162 communicator_size = parse_double(action[2]);
163 log_timed_action (action, clock);
/* Replay a "comm_split" action: nothing is simulated, the action is only
 * logged with the simulated time elapsed since entry. */
static void action_comm_split(const char *const *action)
{
  const double started_at = smpi_process_simulated_elapsed();

  log_timed_action(action, started_at);
}
/* Replay a "comm_dup" action: nothing is simulated, the action is only
 * logged with the simulated time elapsed since entry. */
static void action_comm_dup(const char *const *action)
{
  const double started_at = smpi_process_simulated_elapsed();

  log_timed_action(action, started_at);
}
180 static void action_compute(const char *const *action)
182 double clock = smpi_process_simulated_elapsed();
183 double flops= parse_double(action[2]);
185 int rank = smpi_process_index();
186 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
187 extra->type=TRACING_COMPUTING;
188 extra->comp_size=flops;
189 TRACE_smpi_computing_in(rank, extra);
191 smpi_execute_flops(flops);
193 TRACE_smpi_computing_out(rank);
196 log_timed_action (action, clock);
199 static void action_send(const char *const *action)
201 int to = atoi(action[2]);
202 double size=parse_double(action[3]);
203 double clock = smpi_process_simulated_elapsed();
206 MPI_CURRENT_TYPE=decode_datatype(action[4]);
208 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
212 int rank = smpi_process_index();
214 int dst_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), to);
215 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
216 extra->type = TRACING_SEND;
217 extra->send_size = size;
219 extra->dst = dst_traced;
220 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
221 TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
222 TRACE_smpi_send(rank, rank, dst_traced, size*smpi_datatype_size(MPI_CURRENT_TYPE));
225 smpi_mpi_send(NULL, size, MPI_CURRENT_TYPE, to , 0, MPI_COMM_WORLD);
227 log_timed_action (action, clock);
230 TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
235 static void action_Isend(const char *const *action)
237 int to = atoi(action[2]);
238 double size=parse_double(action[3]);
239 double clock = smpi_process_simulated_elapsed();
242 if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
243 else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
246 int rank = smpi_process_index();
247 int dst_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), to);
248 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
249 extra->type = TRACING_ISEND;
250 extra->send_size = size;
252 extra->dst = dst_traced;
253 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
254 TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
255 TRACE_smpi_send(rank, rank, dst_traced, size*smpi_datatype_size(MPI_CURRENT_TYPE));
258 request = smpi_mpi_isend(NULL, size, MPI_CURRENT_TYPE, to, 0,MPI_COMM_WORLD);
261 TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
265 xbt_dynar_push(reqq[smpi_process_index()],&request);
267 log_timed_action (action, clock);
270 static void action_recv(const char *const *action) {
271 int from = atoi(action[2]);
272 double size=parse_double(action[3]);
273 double clock = smpi_process_simulated_elapsed();
276 if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
277 else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
280 int rank = smpi_process_index();
281 int src_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), from);
283 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
284 extra->type = TRACING_RECV;
285 extra->send_size = size;
286 extra->src = src_traced;
288 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
289 TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
292 //unknow size from the receiver pov
294 smpi_mpi_probe(from, 0, MPI_COMM_WORLD, &status);
298 smpi_mpi_recv(NULL, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD, &status);
301 TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
302 TRACE_smpi_recv(rank, src_traced, rank);
305 log_timed_action (action, clock);
308 static void action_Irecv(const char *const *action)
310 int from = atoi(action[2]);
311 double size=parse_double(action[3]);
312 double clock = smpi_process_simulated_elapsed();
315 if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
316 else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
319 int rank = smpi_process_index();
320 int src_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), from);
321 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
322 extra->type = TRACING_IRECV;
323 extra->send_size = size;
324 extra->src = src_traced;
326 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
327 TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
330 //unknow size from the receiver pov
332 smpi_mpi_probe(from, 0, MPI_COMM_WORLD, &status);
336 request = smpi_mpi_irecv(NULL, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD);
339 TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
342 xbt_dynar_push(reqq[smpi_process_index()],&request);
344 log_timed_action (action, clock);
347 static void action_test(const char *const *action){
348 double clock = smpi_process_simulated_elapsed();
353 request = xbt_dynar_pop_as(reqq[smpi_process_index()],MPI_Request);
354 xbt_assert(request != NULL, "found null request in reqq");
357 int rank = smpi_process_index();
358 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
359 extra->type=TRACING_TEST;
360 TRACE_smpi_testing_in(rank, extra);
362 flag = smpi_mpi_test(&request, &status);
363 XBT_DEBUG("MPI_Test result: %d", flag);
364 /* push back request in dynar to be caught by a subsequent wait. if the test
365 * did succeed, the request is now NULL.
367 xbt_dynar_push_as(reqq[smpi_process_index()],MPI_Request, request);
370 TRACE_smpi_testing_out(rank);
373 log_timed_action (action, clock);
376 static void action_wait(const char *const *action){
377 double clock = smpi_process_simulated_elapsed();
381 xbt_assert(xbt_dynar_length(reqq[smpi_process_index()]),
382 "action wait not preceded by any irecv or isend: %s",
383 xbt_str_join_array(action," "));
384 request = xbt_dynar_pop_as(reqq[smpi_process_index()],MPI_Request);
387 /* Assuming that the trace is well formed, this mean the comm might have
388 * been caught by a MPI_test. Then just return.
394 int rank = request->comm != MPI_COMM_NULL
395 ? smpi_comm_rank(request->comm)
398 MPI_Group group = smpi_comm_group(request->comm);
399 int src_traced = smpi_group_rank(group, request->src);
400 int dst_traced = smpi_group_rank(group, request->dst);
401 int is_wait_for_receive = request->recv;
402 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
403 extra->type = TRACING_WAIT;
404 TRACE_smpi_ptp_in(rank, src_traced, dst_traced, __FUNCTION__, extra);
406 smpi_mpi_wait(&request, &status);
408 TRACE_smpi_ptp_out(rank, src_traced, dst_traced, __FUNCTION__);
409 if (is_wait_for_receive) {
410 TRACE_smpi_recv(rank, src_traced, dst_traced);
414 log_timed_action (action, clock);
417 static void action_waitall(const char *const *action){
418 double clock = smpi_process_simulated_elapsed();
419 int count_requests=0;
422 count_requests=xbt_dynar_length(reqq[smpi_process_index()]);
424 if (count_requests>0) {
425 MPI_Request requests[count_requests];
426 MPI_Status status[count_requests];
428 /* The reqq is an array of dynars. Its index corresponds to the rank.
429 Thus each rank saves its own requests to the array request. */
430 xbt_dynar_foreach(reqq[smpi_process_index()],i,requests[i]);
433 //save information from requests
435 xbt_dynar_t srcs = xbt_dynar_new(sizeof(int), NULL);
436 xbt_dynar_t dsts = xbt_dynar_new(sizeof(int), NULL);
437 xbt_dynar_t recvs = xbt_dynar_new(sizeof(int), NULL);
438 for (i = 0; i < count_requests; i++) {
440 int *asrc = xbt_new(int, 1);
441 int *adst = xbt_new(int, 1);
442 int *arecv = xbt_new(int, 1);
443 *asrc = requests[i]->src;
444 *adst = requests[i]->dst;
445 *arecv = requests[i]->recv;
446 xbt_dynar_insert_at(srcs, i, asrc);
447 xbt_dynar_insert_at(dsts, i, adst);
448 xbt_dynar_insert_at(recvs, i, arecv);
453 int *t = xbt_new(int, 1);
454 xbt_dynar_insert_at(srcs, i, t);
455 xbt_dynar_insert_at(dsts, i, t);
456 xbt_dynar_insert_at(recvs, i, t);
460 int rank_traced = smpi_process_index();
461 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
462 extra->type = TRACING_WAITALL;
463 extra->send_size=count_requests;
464 TRACE_smpi_ptp_in(rank_traced, -1, -1, __FUNCTION__,extra);
467 smpi_mpi_waitall(count_requests, requests, status);
470 for (i = 0; i < count_requests; i++) {
471 int src_traced, dst_traced, is_wait_for_receive;
472 xbt_dynar_get_cpy(srcs, i, &src_traced);
473 xbt_dynar_get_cpy(dsts, i, &dst_traced);
474 xbt_dynar_get_cpy(recvs, i, &is_wait_for_receive);
475 if (is_wait_for_receive) {
476 TRACE_smpi_recv(rank_traced, src_traced, dst_traced);
479 TRACE_smpi_ptp_out(rank_traced, -1, -1, __FUNCTION__);
481 xbt_dynar_free(&srcs);
482 xbt_dynar_free(&dsts);
483 xbt_dynar_free(&recvs);
486 xbt_dynar_free_container(&(reqq[smpi_process_index()]));
488 log_timed_action (action, clock);
491 static void action_barrier(const char *const *action){
492 double clock = smpi_process_simulated_elapsed();
494 int rank = smpi_process_index();
495 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
496 extra->type = TRACING_BARRIER;
497 TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
499 mpi_coll_barrier_fun(MPI_COMM_WORLD);
501 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
504 log_timed_action (action, clock);
508 static void action_bcast(const char *const *action)
510 double size = parse_double(action[2]);
511 double clock = smpi_process_simulated_elapsed();
514 * Initialize MPI_CURRENT_TYPE in order to decrease
515 * the number of the checks
517 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
520 root= atoi(action[3]);
522 MPI_CURRENT_TYPE=decode_datatype(action[4]);
527 int rank = smpi_process_index();
528 int root_traced = smpi_group_index(smpi_comm_group(MPI_COMM_WORLD), root);
530 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
531 extra->type = TRACING_BCAST;
532 extra->send_size = size;
533 extra->root = root_traced;
534 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
535 TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__, extra);
539 mpi_coll_bcast_fun(NULL, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
541 TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
544 log_timed_action (action, clock);
547 static void action_reduce(const char *const *action)
549 double comm_size = parse_double(action[2]);
550 double comp_size = parse_double(action[3]);
551 double clock = smpi_process_simulated_elapsed();
553 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
556 root= atoi(action[4]);
558 MPI_CURRENT_TYPE=decode_datatype(action[5]);
563 int rank = smpi_process_index();
564 int root_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), root);
565 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
566 extra->type = TRACING_REDUCE;
567 extra->send_size = comm_size;
568 extra->comp_size = comp_size;
569 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
570 extra->root = root_traced;
572 TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__,extra);
574 mpi_coll_reduce_fun(NULL, NULL, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
575 smpi_execute_flops(comp_size);
577 TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
580 log_timed_action (action, clock);
583 static void action_allReduce(const char *const *action) {
584 double comm_size = parse_double(action[2]);
585 double comp_size = parse_double(action[3]);
587 if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
588 else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
590 double clock = smpi_process_simulated_elapsed();
592 int rank = smpi_process_index();
593 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
594 extra->type = TRACING_ALLREDUCE;
595 extra->send_size = comm_size;
596 extra->comp_size = comp_size;
597 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
599 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
601 mpi_coll_allreduce_fun(NULL, NULL, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
602 smpi_execute_flops(comp_size);
604 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
607 log_timed_action (action, clock);
610 static void action_allToAll(const char *const *action) {
611 double clock = smpi_process_simulated_elapsed();
612 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
613 int send_size = parse_double(action[2]);
614 int recv_size = parse_double(action[3]);
615 MPI_Datatype MPI_CURRENT_TYPE2;
618 MPI_CURRENT_TYPE=decode_datatype(action[4]);
619 MPI_CURRENT_TYPE2=decode_datatype(action[5]);
622 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
623 MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
625 void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
626 void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE2));
629 int rank = smpi_process_index();
630 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
631 extra->type = TRACING_ALLTOALL;
632 extra->send_size = send_size;
633 extra->recv_size = recv_size;
634 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
635 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
637 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
640 mpi_coll_alltoall_fun(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
643 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
646 log_timed_action (action, clock);
651 static void action_gather(const char *const *action) {
653 The structure of the gather action for the rank 0 (total 4 processes)
658 1) 68 is the sendcounts
659 2) 68 is the recvcounts
660 3) 0 is the root node
661 4) 0 is the send datatype id, see decode_datatype()
662 5) 0 is the recv datatype id, see decode_datatype()
664 double clock = smpi_process_simulated_elapsed();
665 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
666 int send_size = parse_double(action[2]);
667 int recv_size = parse_double(action[3]);
668 MPI_Datatype MPI_CURRENT_TYPE2;
670 MPI_CURRENT_TYPE=decode_datatype(action[5]);
671 MPI_CURRENT_TYPE2=decode_datatype(action[6]);
673 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
674 MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
676 void *send = smpi_get_tmp_sendbuffer(send_size* smpi_datatype_size(MPI_CURRENT_TYPE));
679 int root=atoi(action[4]);
680 int rank = smpi_process_index();
683 recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE2));
686 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
687 extra->type = TRACING_GATHER;
688 extra->send_size = send_size;
689 extra->recv_size = recv_size;
691 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
692 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
694 TRACE_smpi_collective_in(rank, root, __FUNCTION__, extra);
696 mpi_coll_gather_fun(send, send_size, MPI_CURRENT_TYPE,
697 recv, recv_size, MPI_CURRENT_TYPE2,
698 root, MPI_COMM_WORLD);
701 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
704 log_timed_action (action, clock);
710 static void action_gatherv(const char *const *action) {
712 The structure of the gatherv action for the rank 0 (total 4 processes)
714 0 gather 68 68 10 10 10 0 0 0
717 1) 68 is the sendcount
718 2) 68 10 10 10 is the recvcounts
719 3) 0 is the root node
720 4) 0 is the send datatype id, see decode_datatype()
721 5) 0 is the recv datatype id, see decode_datatype()
723 double clock = smpi_process_simulated_elapsed();
724 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
725 int send_size = parse_double(action[2]);
726 int *disps = xbt_new0(int, comm_size);
727 int *recvcounts = xbt_new0(int, comm_size);
730 MPI_Datatype MPI_CURRENT_TYPE2;
731 if(action[4+comm_size]) {
732 MPI_CURRENT_TYPE=decode_datatype(action[4+comm_size]);
733 MPI_CURRENT_TYPE2=decode_datatype(action[5+comm_size]);
735 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
736 MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
738 void *send = smpi_get_tmp_sendbuffer(send_size* smpi_datatype_size(MPI_CURRENT_TYPE));
740 for(i=0;i<comm_size;i++) {
741 recvcounts[i] = atoi(action[i+3]);
742 recv_sum=recv_sum+recvcounts[i];
746 int root=atoi(action[3+comm_size]);
747 int rank = smpi_process_index();
750 recv = smpi_get_tmp_recvbuffer(recv_sum* smpi_datatype_size(MPI_CURRENT_TYPE2));
753 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
754 extra->type = TRACING_GATHERV;
755 extra->send_size = send_size;
756 extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
757 for(i=0; i< comm_size; i++)//copy data to avoid bad free
758 extra->recvcounts[i] = recvcounts[i];
760 extra->num_processes = comm_size;
761 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
762 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
764 TRACE_smpi_collective_in(rank, root, __FUNCTION__, extra);
766 smpi_mpi_gatherv(send, send_size, MPI_CURRENT_TYPE,
767 recv, recvcounts, disps, MPI_CURRENT_TYPE2,
768 root, MPI_COMM_WORLD);
771 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
774 log_timed_action (action, clock);
775 xbt_free(recvcounts);
780 static void action_reducescatter(const char *const *action) {
783 The structure of the reducescatter action for the rank 0 (total 4 processes)
785 0 reduceScatter 275427 275427 275427 204020 11346849 0
788 1) The first four values after the name of the action declare the recvcounts array
789 2) The value 11346849 is the amount of instructions
790 3) The last value corresponds to the datatype, see decode_datatype().
792 We analyze a MPI_Reduce_scatter call to one MPI_Reduce and one MPI_Scatterv.
796 double clock = smpi_process_simulated_elapsed();
797 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
798 int comp_size = parse_double(action[2+comm_size]);
799 int *recvcounts = xbt_new0(int, comm_size);
800 int *disps = xbt_new0(int, comm_size);
802 int rank = smpi_process_index();
804 if(action[3+comm_size])
805 MPI_CURRENT_TYPE=decode_datatype(action[3+comm_size]);
807 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
809 for(i=0;i<comm_size;i++) {
810 recvcounts[i] = atoi(action[i+2]);
815 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
816 extra->type = TRACING_REDUCE_SCATTER;
817 extra->send_size = 0;
818 extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
819 for(i=0; i< comm_size; i++)//copy data to avoid bad free
820 extra->recvcounts[i] = recvcounts[i];
821 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
822 extra->comp_size = comp_size;
823 extra->num_processes = comm_size;
826 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
828 mpi_coll_reduce_scatter_fun(NULL, NULL, recvcounts, MPI_CURRENT_TYPE, MPI_OP_NULL,
830 smpi_execute_flops(comp_size);
834 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
836 xbt_free(recvcounts);
838 log_timed_action (action, clock);
842 static void action_allgatherv(const char *const *action) {
845 The structure of the allgatherv action for the rank 0 (total 4 processes)
847 0 allGatherV 275427 275427 275427 275427 204020
850 1) 275427 is the sendcount
851 2) The next four elements declare the recvcounts array
852 3) No more values mean that the datatype for sent and receive buffer
853 is the default one, see decode_datatype().
857 double clock = smpi_process_simulated_elapsed();
859 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
861 int sendcount=atoi(action[2]);
862 int *recvcounts = xbt_new0(int, comm_size);
863 int *disps = xbt_new0(int, comm_size);
865 MPI_Datatype MPI_CURRENT_TYPE2;
867 if(action[3+comm_size]) {
868 MPI_CURRENT_TYPE = decode_datatype(action[3+comm_size]);
869 MPI_CURRENT_TYPE2 = decode_datatype(action[4+comm_size]);
871 MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
872 MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
874 void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* smpi_datatype_size(MPI_CURRENT_TYPE));
876 for(i=0;i<comm_size;i++) {
877 recvcounts[i] = atoi(action[i+3]);
878 recv_sum=recv_sum+recvcounts[i];
880 void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* smpi_datatype_size(MPI_CURRENT_TYPE2));
883 int rank = smpi_process_index();
884 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
885 extra->type = TRACING_ALLGATHERV;
886 extra->send_size = sendcount;
887 extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
888 for(i=0; i< comm_size; i++)//copy data to avoid bad free
889 extra->recvcounts[i] = recvcounts[i];
890 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
891 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
892 extra->num_processes = comm_size;
894 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
897 mpi_coll_allgatherv_fun(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts, disps, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
900 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
903 log_timed_action (action, clock);
904 xbt_free(recvcounts);
909 static void action_allToAllv(const char *const *action) {
911 The structure of the allToAllV action for the rank 0 (total 4 processes)
913 0 allToAllV 100 1 7 10 12 100 1 70 10 5
916 1) 100 is the size of the send buffer *sizeof(int),
917 2) 1 7 10 12 is the sendcounts array
918 3) 100*sizeof(int) is the size of the receiver buffer
919 4) 1 70 10 5 is the recvcounts array
924 double clock = smpi_process_simulated_elapsed();
926 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
927 int send_buf_size=0,recv_buf_size=0,i=0;
928 int *sendcounts = xbt_new0(int, comm_size);
929 int *recvcounts = xbt_new0(int, comm_size);
930 int *senddisps = xbt_new0(int, comm_size);
931 int *recvdisps = xbt_new0(int, comm_size);
933 MPI_Datatype MPI_CURRENT_TYPE2;
935 send_buf_size=parse_double(action[2]);
936 recv_buf_size=parse_double(action[3+comm_size]);
937 if(action[4+2*comm_size]) {
938 MPI_CURRENT_TYPE=decode_datatype(action[4+2*comm_size]);
939 MPI_CURRENT_TYPE2=decode_datatype(action[5+2*comm_size]);
942 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
943 MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
946 void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* smpi_datatype_size(MPI_CURRENT_TYPE));
947 void *recvbuf = smpi_get_tmp_recvbuffer(recv_buf_size* smpi_datatype_size(MPI_CURRENT_TYPE2));
949 for(i=0;i<comm_size;i++) {
950 sendcounts[i] = atoi(action[i+3]);
951 recvcounts[i] = atoi(action[i+4+comm_size]);
956 int rank = smpi_process_index();
957 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
958 extra->type = TRACING_ALLTOALLV;
959 extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
960 extra->sendcounts= xbt_malloc(comm_size*sizeof(int));
961 extra->num_processes = comm_size;
963 for(i=0; i< comm_size; i++){//copy data to avoid bad free
964 extra->send_size += sendcounts[i];
965 extra->sendcounts[i] = sendcounts[i];
966 extra->recv_size += recvcounts[i];
967 extra->recvcounts[i] = recvcounts[i];
969 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
970 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
972 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
974 mpi_coll_alltoallv_fun(sendbuf, sendcounts, senddisps, MPI_CURRENT_TYPE,
975 recvbuf, recvcounts, recvdisps, MPI_CURRENT_TYPE,
978 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
981 log_timed_action (action, clock);
982 xbt_free(sendcounts);
983 xbt_free(recvcounts);
988 void smpi_replay_init(int *argc, char***argv){
989 smpi_process_init(argc, argv);
990 smpi_process_mark_as_initialized();
992 int rank = smpi_process_index();
993 TRACE_smpi_init(rank);
994 TRACE_smpi_computing_init(rank);
995 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
996 extra->type = TRACING_INIT;
997 TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
998 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
1001 if (!smpi_process_index()){
1002 _xbt_replay_action_init();
1003 xbt_replay_action_register("init", action_init);
1004 xbt_replay_action_register("finalize", action_finalize);
1005 xbt_replay_action_register("comm_size", action_comm_size);
1006 xbt_replay_action_register("comm_split", action_comm_split);
1007 xbt_replay_action_register("comm_dup", action_comm_dup);
1008 xbt_replay_action_register("send", action_send);
1009 xbt_replay_action_register("Isend", action_Isend);
1010 xbt_replay_action_register("recv", action_recv);
1011 xbt_replay_action_register("Irecv", action_Irecv);
1012 xbt_replay_action_register("test", action_test);
1013 xbt_replay_action_register("wait", action_wait);
1014 xbt_replay_action_register("waitAll", action_waitall);
1015 xbt_replay_action_register("barrier", action_barrier);
1016 xbt_replay_action_register("bcast", action_bcast);
1017 xbt_replay_action_register("reduce", action_reduce);
1018 xbt_replay_action_register("allReduce", action_allReduce);
1019 xbt_replay_action_register("allToAll", action_allToAll);
1020 xbt_replay_action_register("allToAllV", action_allToAllv);
1021 xbt_replay_action_register("gather", action_gather);
1022 xbt_replay_action_register("gatherV", action_gatherv);
1023 xbt_replay_action_register("allGatherV", action_allgatherv);
1024 xbt_replay_action_register("reduceScatter", action_reducescatter);
1025 xbt_replay_action_register("compute", action_compute);
1028 //if we have a delayed start, sleep here.
1031 double value = strtod((*argv)[2], &endptr);
1032 if (*endptr != '\0')
1033 THROWF(unknown_error, 0, "%s is not a double", (*argv)[2]);
1034 XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
1035 smpi_execute_flops(value);
1037 xbt_replay_action_runner(*argc, *argv);
1040 int smpi_replay_finalize(){
1041 double sim_time= 1.;
1042 /* One active process will stop. Decrease the counter*/
1043 XBT_DEBUG("There are %lu elements in reqq[*]",
1044 xbt_dynar_length(reqq[smpi_process_index()]));
1045 if (!xbt_dynar_is_empty(reqq[smpi_process_index()])){
1046 int count_requests=xbt_dynar_length(reqq[smpi_process_index()]);
1047 MPI_Request requests[count_requests];
1048 MPI_Status status[count_requests];
1051 xbt_dynar_foreach(reqq[smpi_process_index()],i,requests[i]);
1052 smpi_mpi_waitall(count_requests, requests, status);
1058 xbt_dynar_free_container(&(reqq[smpi_process_index()]));
1060 if(!active_processes){
1061 /* Last process alive speaking */
1062 /* end the simulated timer */
1063 sim_time = smpi_process_simulated_elapsed();
1064 XBT_INFO("Simulation time %f", sim_time);
1065 _xbt_replay_action_exit();
1066 xbt_free(sendbuffer);
1067 xbt_free(recvbuffer);
1071 mpi_coll_barrier_fun(MPI_COMM_WORLD);
1073 int rank = smpi_process_index();
1074 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
1075 extra->type = TRACING_FINALIZE;
1076 TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
1078 smpi_process_finalize();
1080 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
1081 TRACE_smpi_finalize(smpi_process_index());
1083 smpi_process_destroy();