1 /* Copyright (c) 2009-2014. The SimGrid Team.
2 * All rights reserved. */
4 /* This program is free software; you can redistribute it and/or modify it
5 * under the terms of the license (GNU LGPL) which comes with this package. */
10 #include <xbt/replay.h>
// Logging category used by the XBT_* log macros in this file, declared under "smpi".
12 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
// Written by action_comm_size from the action[2] field.
14 int communicator_size = 0;
// Set from smpi_process_count() in action_init; tested in smpi_replay_finalize to detect the last process.
15 static int active_processes = 0;
// Array of dynars of MPI_Request, allocated in action_init and indexed by smpi_process_index().
16 xbt_dynar_t *reqq = NULL;
// Datatype used when an action line carries no datatype field; chosen in action_init.
18 MPI_Datatype MPI_DEFAULT_TYPE;
// Datatype of the action being replayed; overwritten by the action handlers and decode_datatype.
19 MPI_Datatype MPI_CURRENT_TYPE;
// Shared scratch buffer returned by smpi_get_tmp_sendbuffer in replay mode, and the size it is compared against.
21 static int sendbuffer_size=0;
22 char* sendbuffer=NULL;
// Shared scratch buffer returned by smpi_get_tmp_recvbuffer in replay mode, and the size it is compared against.
23 static int recvbuffer_size=0;
24 char* recvbuffer=NULL;
// Log, at verbose level, the action line followed by the simulated time elapsed since `clock`.
// action: array of action fields, joined with single spaces for display.
// clock:  value of smpi_process_simulated_elapsed() taken when the action started.
26 static void log_timed_action (const char *const *action, double clock){
// The joined string is only built when verbose logging is enabled for smpi_replay.
27 if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
28 char *name = xbt_str_join_array(action, " ");
29 XBT_VERB("%s %f", name, smpi_process_simulated_elapsed()-clock);
34 //allocate a single buffer for all sends, growing it if needed
// size: number of bytes requested.
35 void* smpi_get_tmp_sendbuffer(int size){
// Outside replay mode every call returns a fresh xbt_malloc'ed block.
36 if (!smpi_process_get_replaying())
37 return xbt_malloc(size);
// In replay mode the shared `sendbuffer` is reallocated when it is smaller than the request.
38 if (sendbuffer_size<size){
39 sendbuffer=xbt_realloc(sendbuffer,size);
44 //allocate a single buffer for all recv
// size: number of bytes requested.
45 void* smpi_get_tmp_recvbuffer(int size){
// Outside replay mode every call returns a fresh xbt_malloc'ed block.
46 if (!smpi_process_get_replaying())
47 return xbt_malloc(size);
// In replay mode the shared `recvbuffer` is reallocated when it is smaller than the request.
48 if (recvbuffer_size<size){
49 recvbuffer=xbt_realloc(recvbuffer,size);
// Counterpart of smpi_get_tmp_sendbuffer / smpi_get_tmp_recvbuffer for a buffer `buf`.
// NOTE(review): only the replay-mode test is visible here; presumably `buf` is released
// only when the process is not replaying -- confirm against the full source.
55 void smpi_free_tmp_buffer(void* buf){
56 if (!smpi_process_get_replaying())
// Convert `string` to a double using strtod.
// Throws unknown_error ("<string> is not a double") through THROWF.
// NOTE(review): the condition guarding the throw is not visible in this view.
61 static double parse_double(const char *string)
65 value = strtod(string, &endptr);
67 THROWF(unknown_error, 0, "%s is not a double", string);
// Set the global MPI_CURRENT_TYPE according to the datatype field `action` and return it.
// NOTE(review): the tests on `action` selecting each assignment are not visible in this view.
71 static MPI_Datatype decode_datatype(const char *const action)
73 // Declared datatypes,
78 MPI_CURRENT_TYPE=MPI_DOUBLE;
81 MPI_CURRENT_TYPE=MPI_INT;
84 MPI_CURRENT_TYPE=MPI_CHAR;
87 MPI_CURRENT_TYPE=MPI_SHORT;
90 MPI_CURRENT_TYPE=MPI_LONG;
93 MPI_CURRENT_TYPE=MPI_FLOAT;
96 MPI_CURRENT_TYPE=MPI_BYTE;
// Last alternative: the default type chosen in action_init.
99 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
// The selected type is both stored globally and returned.
102 return MPI_CURRENT_TYPE;
// Map an MPI datatype to the string stored in the tracing records (datatype1 / datatype2).
// NOTE(review): the strings returned for each type are not visible in this view.
106 const char* encode_datatype(MPI_Datatype datatype)
109 //default type for output is set to MPI_BYTE
110 // MPI_DEFAULT_TYPE is not set for output, use directly MPI_BYTE
111 if (datatype==MPI_BYTE){
// The remaining known types are tested one after the other.
114 if(datatype==MPI_DOUBLE)
116 if(datatype==MPI_INT)
118 if(datatype==MPI_CHAR)
120 if(datatype==MPI_SHORT)
122 if(datatype==MPI_LONG)
124 if(datatype==MPI_FLOAT)
127 // default - not implemented.
128 // do not warn here as we pass in this function even for other trace formats
// Replay the "init" action: choose the default datatype, start the simulated timer,
// record the number of processes and create one request dynar per process in reqq.
132 static void action_init(const char *const *action)
135 XBT_DEBUG("Initialize the counters");
// A third field on the init line selects MPI_DOUBLE as default, otherwise MPI_BYTE is used.
137 if(action[2]) MPI_DEFAULT_TYPE= MPI_DOUBLE; // default MPE datatype
138 else MPI_DEFAULT_TYPE= MPI_BYTE; // default TAU datatype
140 /* start a simulated timer */
141 smpi_process_simulated_start();
142 /*initialize the number of active processes */
143 active_processes = smpi_process_count();
// reqq gets one zero-initialised slot per process.
146 reqq=xbt_new0(xbt_dynar_t,active_processes);
// Each slot holds MPI_Request elements and uses xbt_free_ref as element destructor.
148 for(i=0;i<active_processes;i++){
149 reqq[i]=xbt_dynar_new(sizeof(MPI_Request),&xbt_free_ref);
// Handler registered for the "finalize" action in smpi_replay_init (body not visible in this view).
154 static void action_finalize(const char *const *action)
// Replay the "comm_size" action: store action[2] in communicator_size and log the elapsed time.
158 static void action_comm_size(const char *const *action)
160 double clock = smpi_process_simulated_elapsed();
// parse_double returns a double; the assignment converts it to int.
162 communicator_size = parse_double(action[2]);
163 log_timed_action (action, clock);
// Replay the "comm_split" action: the visible code only samples the simulated clock and logs the action.
166 static void action_comm_split(const char *const *action)
168 double clock = smpi_process_simulated_elapsed();
170 log_timed_action (action, clock);
// Replay the "comm_dup" action: the visible code only samples the simulated clock and logs the action.
173 static void action_comm_dup(const char *const *action)
175 double clock = smpi_process_simulated_elapsed();
177 log_timed_action (action, clock);
// Replay a "compute" action: execute the amount of flops given in action[2],
// bracketed by computing-in/out trace events, then log the elapsed time.
180 static void action_compute(const char *const *action)
182 double clock = smpi_process_simulated_elapsed();
183 double flops= parse_double(action[2]);
// Tracing record handed to TRACE_smpi_computing_in.
185 int rank = smpi_process_index();
186 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
187 extra->type=TRACING_COMPUTING;
188 extra->comp_size=flops;
189 TRACE_smpi_computing_in(rank, extra);
191 smpi_execute_flops(flops);
193 TRACE_smpi_computing_out(rank);
196 log_timed_action (action, clock);
// Replay a blocking "send" action.
// action[2] is the destination, action[3] the element count, action[4] the datatype field.
199 static void action_send(const char *const *action)
201 int to = atoi(action[2]);
202 double size=parse_double(action[3]);
203 double clock = smpi_process_simulated_elapsed();
// Either decode the datatype field or use the default type
// (the test choosing between the two lines is not visible in this view).
206 MPI_CURRENT_TYPE=decode_datatype(action[4]);
208 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
212 int rank = smpi_process_index();
// Tracing record for the point-to-point event.
214 int dst_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), to);
215 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
216 extra->type = TRACING_SEND;
217 extra->send_size = size;
219 extra->dst = dst_traced;
220 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
221 TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
// The traced message size is the element count times the datatype size.
222 TRACE_smpi_send(rank, rank, dst_traced, size*smpi_datatype_size(MPI_CURRENT_TYPE));
// No payload buffer is passed (NULL); tag 0 on MPI_COMM_WORLD.
225 smpi_mpi_send(NULL, size, MPI_CURRENT_TYPE, to , 0, MPI_COMM_WORLD);
227 log_timed_action (action, clock);
230 TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
// Replay a non-blocking "Isend" action and queue the resulting request for a later test/wait.
// action[2] is the destination, action[3] the element count, action[4] the optional datatype field.
235 static void action_Isend(const char *const *action)
237 int to = atoi(action[2]);
238 double size=parse_double(action[3]);
239 double clock = smpi_process_simulated_elapsed();
// Use the datatype field when present, the default type otherwise.
242 if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
243 else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
246 int rank = smpi_process_index();
// Tracing record for the point-to-point event.
247 int dst_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), to);
248 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
249 extra->type = TRACING_ISEND;
250 extra->send_size = size;
252 extra->dst = dst_traced;
253 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
254 TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
255 TRACE_smpi_send(rank, rank, dst_traced, size*smpi_datatype_size(MPI_CURRENT_TYPE));
// No payload buffer is passed (NULL); tag 0 on MPI_COMM_WORLD.
258 request = smpi_mpi_isend(NULL, size, MPI_CURRENT_TYPE, to, 0,MPI_COMM_WORLD);
261 TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
// Remember the pending request in this process' queue.
265 xbt_dynar_push(reqq[smpi_process_index()],&request);
267 log_timed_action (action, clock);
// Replay a blocking "recv" action.
// action[2] is the source, action[3] the element count, action[4] the optional datatype field.
270 static void action_recv(const char *const *action) {
271 int from = atoi(action[2]);
272 double size=parse_double(action[3]);
273 double clock = smpi_process_simulated_elapsed();
// Use the datatype field when present, the default type otherwise.
276 if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
277 else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
280 int rank = smpi_process_index();
281 int src_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), from);
// Tracing record; note the count is stored in send_size even for a receive.
283 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
284 extra->type = TRACING_RECV;
285 extra->send_size = size;
286 extra->src = src_traced;
288 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
289 TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
292 //unknown size from the receiver pov
// NOTE(review): the condition guarding this probe is not visible in this view.
294 smpi_mpi_probe(from, 0, MPI_COMM_WORLD, &status);
// No payload buffer is passed (NULL); tag 0 on MPI_COMM_WORLD.
298 smpi_mpi_recv(NULL, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD, &status);
301 TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
302 TRACE_smpi_recv(rank, src_traced, rank);
305 log_timed_action (action, clock);
// Replay a non-blocking "Irecv" action and queue the resulting request for a later test/wait.
// action[2] is the source, action[3] the element count, action[4] the optional datatype field.
308 static void action_Irecv(const char *const *action)
310 int from = atoi(action[2]);
311 double size=parse_double(action[3]);
312 double clock = smpi_process_simulated_elapsed();
// Use the datatype field when present, the default type otherwise.
315 if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
316 else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
319 int rank = smpi_process_index();
320 int src_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), from);
// Tracing record; note the count is stored in send_size even for a receive.
321 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
322 extra->type = TRACING_IRECV;
323 extra->send_size = size;
324 extra->src = src_traced;
326 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
327 TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
330 //unknown size from the receiver pov
// NOTE(review): the condition guarding this probe is not visible in this view.
332 smpi_mpi_probe(from, 0, MPI_COMM_WORLD, &status);
// No payload buffer is passed (NULL); tag 0 on MPI_COMM_WORLD.
336 request = smpi_mpi_irecv(NULL, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD);
339 TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
// Remember the pending request in this process' queue.
342 xbt_dynar_push(reqq[smpi_process_index()],&request);
344 log_timed_action (action, clock);
// Replay a "test" action: pop the most recently queued request of this process,
// run smpi_mpi_test on it and push the (possibly updated) request back.
347 static void action_test(const char *const *action){
348 double clock = smpi_process_simulated_elapsed();
// Take the last request pushed by this process; a NULL entry aborts via xbt_assert.
353 request = xbt_dynar_pop_as(reqq[smpi_process_index()],MPI_Request);
354 xbt_assert(request != NULL, "found null request in reqq");
357 int rank = smpi_process_index();
358 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
359 extra->type=TRACING_TEST;
360 TRACE_smpi_testing_in(rank, extra);
// smpi_mpi_test receives the address of the request, so it may modify it.
362 flag = smpi_mpi_test(&request, &status);
363 XBT_DEBUG("MPI_Test result: %d", flag);
364 /* push back request in dynar to be caught by a subsequent wait. if the test
365 * did succeed, the request is now NULL.
367 xbt_dynar_push_as(reqq[smpi_process_index()],MPI_Request, request);
370 TRACE_smpi_testing_out(rank);
373 log_timed_action (action, clock);
// Replay a "wait" action: pop the most recently queued request of this process and wait for it,
// emitting point-to-point trace events around the wait.
376 static void action_wait(const char *const *action){
377 double clock = smpi_process_simulated_elapsed();
// The queue must not be empty: a wait needs a preceding Isend/Irecv.
381 xbt_assert(xbt_dynar_length(reqq[smpi_process_index()]),
382 "action wait not preceded by any irecv or isend: %s",
383 xbt_str_join_array(action," "));
384 request = xbt_dynar_pop_as(reqq[smpi_process_index()],MPI_Request);
387 /* Assuming that the trace is well formed, this mean the comm might have
388 * been caught by a MPI_test. Then just return.
394 int rank = request->comm != MPI_COMM_NULL
395 ? smpi_comm_rank(request->comm)
398 MPI_Group group = smpi_comm_group(request->comm);
399 int src_traced = smpi_group_rank(group, request->src);
400 int dst_traced = smpi_group_rank(group, request->dst);
401 int is_wait_for_receive = request->recv;
402 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
403 extra->type = TRACING_WAIT;
404 TRACE_smpi_ptp_in(rank, src_traced, dst_traced, __FUNCTION__, extra);
406 smpi_mpi_wait(&request, &status);
408 TRACE_smpi_ptp_out(rank, src_traced, dst_traced, __FUNCTION__);
409 if (is_wait_for_receive) {
410 TRACE_smpi_recv(rank, src_traced, dst_traced);
414 log_timed_action (action, clock);
// Replay a "waitAll" action: wait for every request queued by this process,
// trace the receives among them, then replace the process' queue by a fresh empty one.
417 static void action_waitall(const char *const *action){
418 double clock = smpi_process_simulated_elapsed();
419 int count_requests=0;
422 count_requests=xbt_dynar_length(reqq[smpi_process_index()]);
424 if (count_requests>0) {
// Variable-length arrays sized by the number of pending requests.
425 MPI_Request requests[count_requests];
426 MPI_Status status[count_requests];
428 /* The reqq is an array of dynars. Its index corresponds to the rank.
429 Thus each rank saves its own requests to the array request. */
430 xbt_dynar_foreach(reqq[smpi_process_index()],i,requests[i]);
433 //save information from requests
// src/dst/recv are copied out before waiting because they are read again after smpi_mpi_waitall.
435 xbt_dynar_t srcs = xbt_dynar_new(sizeof(int), NULL);
436 xbt_dynar_t dsts = xbt_dynar_new(sizeof(int), NULL);
437 xbt_dynar_t recvs = xbt_dynar_new(sizeof(int), NULL);
438 for (i = 0; i < count_requests; i++) {
440 int *asrc = xbt_new(int, 1);
441 int *adst = xbt_new(int, 1);
442 int *arecv = xbt_new(int, 1);
443 *asrc = requests[i]->src;
444 *adst = requests[i]->dst;
445 *arecv = requests[i]->recv;
446 xbt_dynar_insert_at(srcs, i, asrc);
447 xbt_dynar_insert_at(dsts, i, adst);
448 xbt_dynar_insert_at(recvs, i, arecv);
// Placeholder entry (presumably for a NULL request -- the branch head is not visible here).
// It must be zero-initialised: its value is copied into srcs/dsts/recvs and later read back
// as is_wait_for_receive, so an uninitialised int could trigger a bogus TRACE_smpi_recv.
453 int *t = xbt_new0(int, 1);
454 xbt_dynar_insert_at(srcs, i, t);
455 xbt_dynar_insert_at(dsts, i, t);
456 xbt_dynar_insert_at(recvs, i, t);
460 int rank_traced = smpi_process_index();
461 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
462 extra->type = TRACING_WAITALL;
// The number of awaited requests is stored in send_size.
463 extra->send_size=count_requests;
464 TRACE_smpi_ptp_in(rank_traced, -1, -1, __FUNCTION__,extra);
467 smpi_mpi_waitall(count_requests, requests, status);
// Emit a receive event for every request that was a receive.
470 for (i = 0; i < count_requests; i++) {
471 int src_traced, dst_traced, is_wait_for_receive;
472 xbt_dynar_get_cpy(srcs, i, &src_traced);
473 xbt_dynar_get_cpy(dsts, i, &dst_traced);
474 xbt_dynar_get_cpy(recvs, i, &is_wait_for_receive);
475 if (is_wait_for_receive) {
476 TRACE_smpi_recv(rank_traced, src_traced, dst_traced);
479 TRACE_smpi_ptp_out(rank_traced, -1, -1, __FUNCTION__);
481 xbt_dynar_free(&srcs);
482 xbt_dynar_free(&dsts);
483 xbt_dynar_free(&recvs);
// Drop the container (not its elements) and start over with an empty queue.
486 int freedrank=smpi_process_index();
487 xbt_dynar_free_container(&(reqq[freedrank]));
488 reqq[freedrank]=xbt_dynar_new(sizeof(MPI_Request),&xbt_free_ref);
490 log_timed_action (action, clock);
// Replay a "barrier" action on MPI_COMM_WORLD, bracketed by collective trace events.
493 static void action_barrier(const char *const *action){
494 double clock = smpi_process_simulated_elapsed();
496 int rank = smpi_process_index();
497 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
498 extra->type = TRACING_BARRIER;
// -1 is passed as root: the barrier has none.
499 TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
501 mpi_coll_barrier_fun(MPI_COMM_WORLD);
503 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
506 log_timed_action (action, clock);
// Replay a "bcast" action on MPI_COMM_WORLD.
// action[2] is the element count, action[3] the root and action[4] the datatype field
// (the tests guarding the reads of action[3] and action[4] are not visible in this view).
510 static void action_bcast(const char *const *action)
512 double size = parse_double(action[2]);
513 double clock = smpi_process_simulated_elapsed();
516 * Initialize MPI_CURRENT_TYPE in order to decrease
517 * the number of the checks
519 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
522 root= atoi(action[3]);
524 MPI_CURRENT_TYPE=decode_datatype(action[4]);
529 int rank = smpi_process_index();
// NOTE(review): this uses smpi_group_index whereas the other handlers in this file
// translate peers with smpi_group_rank -- confirm which one is intended.
530 int root_traced = smpi_group_index(smpi_comm_group(MPI_COMM_WORLD), root);
532 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
533 extra->type = TRACING_BCAST;
534 extra->send_size = size;
535 extra->root = root_traced;
536 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
537 TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__, extra);
// Scratch buffer of count * datatype-size bytes.
540 void *sendbuf = smpi_get_tmp_sendbuffer(size* smpi_datatype_size(MPI_CURRENT_TYPE));
541 mpi_coll_bcast_fun(sendbuf, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
543 TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
545 log_timed_action (action, clock);
// Replay a "reduce" action on MPI_COMM_WORLD followed by the associated computation.
// action[2] is the element count, action[3] the amount of flops, action[4] the root and
// action[5] the datatype field (the tests guarding action[4]/action[5] are not visible in this view).
548 static void action_reduce(const char *const *action)
550 double comm_size = parse_double(action[2]);
551 double comp_size = parse_double(action[3]);
552 double clock = smpi_process_simulated_elapsed();
554 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
557 root= atoi(action[4]);
559 MPI_CURRENT_TYPE=decode_datatype(action[5]);
566 int rank = smpi_process_index();
567 int root_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), root);
568 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
569 extra->type = TRACING_REDUCE;
570 extra->send_size = comm_size;
571 extra->comp_size = comp_size;
572 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
573 extra->root = root_traced;
575 TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__,extra);
// The receive side uses the receive scratch buffer: taking both buffers from
// smpi_get_tmp_sendbuffer made sendbuf and recvbuf alias the same memory in replay mode.
577 void *recvbuf = smpi_get_tmp_recvbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
578 void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
// MPI_OP_NULL: no reduction operator is applied; the computation cost is replayed separately below.
579 mpi_coll_reduce_fun(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
580 smpi_execute_flops(comp_size);
582 TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
584 log_timed_action (action, clock);
// Replay an "allReduce" action on MPI_COMM_WORLD followed by the associated computation.
// action[2] is the element count, action[3] the amount of flops, action[4] the optional datatype field.
587 static void action_allReduce(const char *const *action) {
588 double comm_size = parse_double(action[2]);
589 double comp_size = parse_double(action[3]);
// Use the datatype field when present, the default type otherwise.
591 if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
592 else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
594 double clock = smpi_process_simulated_elapsed();
596 int rank = smpi_process_index();
597 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
598 extra->type = TRACING_ALLREDUCE;
599 extra->send_size = comm_size;
600 extra->comp_size = comp_size;
601 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
603 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
// The receive side uses the receive scratch buffer: taking both buffers from
// smpi_get_tmp_sendbuffer made sendbuf and recvbuf alias the same memory in replay mode.
605 void *recvbuf = smpi_get_tmp_recvbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
606 void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
// MPI_OP_NULL: no reduction operator is applied; the computation cost is replayed separately below.
607 mpi_coll_allreduce_fun(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
608 smpi_execute_flops(comp_size);
610 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
612 log_timed_action (action, clock);
// Replay an "allToAll" action on MPI_COMM_WORLD.
// action[2] is the per-peer send count, action[3] the per-peer receive count,
// action[4]/action[5] the send/receive datatype fields.
615 static void action_allToAll(const char *const *action) {
616 double clock = smpi_process_simulated_elapsed();
617 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
618 int send_size = parse_double(action[2]);
619 int recv_size = parse_double(action[3]);
620 MPI_Datatype MPI_CURRENT_TYPE2;
// Either decode both datatype fields or use the default type for both
// (the selecting test is not visible in this view).
623 MPI_CURRENT_TYPE=decode_datatype(action[4]);
624 MPI_CURRENT_TYPE2=decode_datatype(action[5]);
627 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
628 MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
// Buffers hold one chunk per process of the communicator.
630 void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
631 void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE2));
634 int rank = smpi_process_index();
635 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
636 extra->type = TRACING_ALLTOALL;
637 extra->send_size = send_size;
638 extra->recv_size = recv_size;
639 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
640 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
642 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
645 mpi_coll_alltoall_fun(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
648 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
650 log_timed_action (action, clock);
// Replay a "gather" action on MPI_COMM_WORLD.
// Fields read by the code: action[2] send count, action[3] receive count, action[4] root,
// action[5]/action[6] send/receive datatype fields.
655 static void action_gather(const char *const *action) {
657 The structure of the gather action for the rank 0 (total 4 processes)
662 1) 68 is the sendcounts
663 2) 68 is the recvcounts
664 3) 0 is the root node
665 4) 0 is the send datatype id, see decode_datatype()
666 5) 0 is the recv datatype id, see decode_datatype()
668 double clock = smpi_process_simulated_elapsed();
669 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
670 int send_size = parse_double(action[2]);
671 int recv_size = parse_double(action[3]);
672 MPI_Datatype MPI_CURRENT_TYPE2;
// Either decode both datatype fields or use the default type for both
// (the selecting test is not visible in this view).
674 MPI_CURRENT_TYPE=decode_datatype(action[5]);
675 MPI_CURRENT_TYPE2=decode_datatype(action[6]);
677 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
678 MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
680 void *send = smpi_get_tmp_sendbuffer(send_size* smpi_datatype_size(MPI_CURRENT_TYPE));
683 int root=atoi(action[4]);
684 int rank = smpi_comm_rank(MPI_COMM_WORLD);
// The receive buffer holds one chunk per process.
// NOTE(review): the condition under which it is allocated is not visible in this view.
687 recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE2));
690 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
691 extra->type = TRACING_GATHER;
692 extra->send_size = send_size;
693 extra->recv_size = recv_size;
695 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
696 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
698 TRACE_smpi_collective_in(smpi_process_index(), root, __FUNCTION__, extra);
700 mpi_coll_gather_fun(send, send_size, MPI_CURRENT_TYPE,
701 recv, recv_size, MPI_CURRENT_TYPE2,
702 root, MPI_COMM_WORLD);
// NOTE(review): the "in" event is emitted with `root` but the "out" event with -1 -- confirm this is intended.
705 TRACE_smpi_collective_out(smpi_process_index(), -1, __FUNCTION__);
707 log_timed_action (action, clock);
// Replay a "gatherV" action on MPI_COMM_WORLD.
// Fields read by the code: action[2] send count, action[3 .. 3+comm_size-1] receive counts,
// action[3+comm_size] root, action[4+comm_size]/action[5+comm_size] optional datatype fields.
713 static void action_gatherv(const char *const *action) {
715 The structure of the gatherv action for the rank 0 (total 4 processes)
717 0 gather 68 68 10 10 10 0 0 0
// NOTE(review): the example line above names the action "gather"; this handler is registered as "gatherV".
720 1) 68 is the sendcount
721 2) 68 10 10 10 is the recvcounts
722 3) 0 is the root node
723 4) 0 is the send datatype id, see decode_datatype()
724 5) 0 is the recv datatype id, see decode_datatype()
726 double clock = smpi_process_simulated_elapsed();
727 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
728 int send_size = parse_double(action[2]);
// Displacements are zero-initialised by xbt_new0; no visible line changes them.
729 int *disps = xbt_new0(int, comm_size);
730 int *recvcounts = xbt_new0(int, comm_size);
733 MPI_Datatype MPI_CURRENT_TYPE2;
// Datatype fields are optional: when absent both sides use the default type.
734 if(action[4+comm_size]) {
735 MPI_CURRENT_TYPE=decode_datatype(action[4+comm_size]);
736 MPI_CURRENT_TYPE2=decode_datatype(action[5+comm_size]);
738 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
739 MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
741 void *send = smpi_get_tmp_sendbuffer(send_size* smpi_datatype_size(MPI_CURRENT_TYPE));
// Read the per-process receive counts and accumulate their sum.
743 for(i=0;i<comm_size;i++) {
744 recvcounts[i] = atoi(action[i+3]);
745 recv_sum=recv_sum+recvcounts[i];
749 int root=atoi(action[3+comm_size]);
750 int rank = smpi_comm_rank(MPI_COMM_WORLD);;
// The receive buffer is sized for the sum of all receive counts.
// NOTE(review): the condition under which it is allocated is not visible in this view.
753 recv = smpi_get_tmp_recvbuffer(recv_sum* smpi_datatype_size(MPI_CURRENT_TYPE2));
756 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
757 extra->type = TRACING_GATHERV;
758 extra->send_size = send_size;
// The tracing record gets its own copy of recvcounts, since the local array is freed below.
759 extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
760 for(i=0; i< comm_size; i++)//copy data to avoid bad free
761 extra->recvcounts[i] = recvcounts[i];
763 extra->num_processes = comm_size;
764 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
765 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
767 TRACE_smpi_collective_in(smpi_process_index(), root, __FUNCTION__, extra);
769 smpi_mpi_gatherv(send, send_size, MPI_CURRENT_TYPE,
770 recv, recvcounts, disps, MPI_CURRENT_TYPE2,
771 root, MPI_COMM_WORLD);
// NOTE(review): the "in" event is emitted with `root` but the "out" event with -1 -- confirm this is intended.
774 TRACE_smpi_collective_out(smpi_process_index(), -1, __FUNCTION__);
777 log_timed_action (action, clock);
778 xbt_free(recvcounts);
// Replay a "reduceScatter" action on MPI_COMM_WORLD followed by the associated computation.
// Fields read by the code: action[2 .. 2+comm_size-1] receive counts, action[2+comm_size] amount of flops,
// action[3+comm_size] optional datatype field.
782 static void action_reducescatter(const char *const *action) {
785 The structure of the reducescatter action for the rank 0 (total 4 processes)
787 0 reduceScatter 275427 275427 275427 204020 11346849 0
790 1) The first four values after the name of the action declare the recvcounts array
791 2) The value 11346849 is the amount of instructions
792 3) The last value corresponds to the datatype, see decode_datatype().
794 We analyze a MPI_Reduce_scatter call to one MPI_Reduce and one MPI_Scatterv.
798 double clock = smpi_process_simulated_elapsed();
799 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
800 int comp_size = parse_double(action[2+comm_size]);
801 int *recvcounts = xbt_new0(int, comm_size);
802 int *disps = xbt_new0(int, comm_size);
804 int rank = smpi_process_index();
// Use the datatype field when present, the default type otherwise.
806 if(action[3+comm_size])
807 MPI_CURRENT_TYPE=decode_datatype(action[3+comm_size]);
809 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
811 for(i=0;i<comm_size;i++) {
812 recvcounts[i] = atoi(action[i+2]);
818 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
819 extra->type = TRACING_REDUCE_SCATTER;
820 extra->send_size = 0;
// The tracing record gets its own copy of recvcounts, since the local array is freed below.
821 extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
822 for(i=0; i< comm_size; i++)//copy data to avoid bad free
823 extra->recvcounts[i] = recvcounts[i];
824 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
825 extra->comp_size = comp_size;
826 extra->num_processes = comm_size;
828 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
// NOTE(review): `size` is computed on a line not visible in this view.
830 void *sendbuf = smpi_get_tmp_sendbuffer(size* smpi_datatype_size(MPI_CURRENT_TYPE));
831 void *recvbuf = smpi_get_tmp_recvbuffer(size* smpi_datatype_size(MPI_CURRENT_TYPE));
// MPI_OP_NULL: no reduction operator is applied; the computation cost is replayed separately below.
833 mpi_coll_reduce_scatter_fun(sendbuf, recvbuf, recvcounts, MPI_CURRENT_TYPE, MPI_OP_NULL,
835 smpi_execute_flops(comp_size);
839 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
841 xbt_free(recvcounts);
843 log_timed_action (action, clock);
// Replay an "allGatherV" action on MPI_COMM_WORLD.
// Fields read by the code: action[2] send count, action[3 .. 3+comm_size-1] receive counts,
// action[3+comm_size]/action[4+comm_size] optional send/receive datatype fields.
847 static void action_allgatherv(const char *const *action) {
850 The structure of the allgatherv action for the rank 0 (total 4 processes)
852 0 allGatherV 275427 275427 275427 275427 204020
855 1) 275427 is the sendcount
856 2) The next four elements declare the recvcounts array
857 3) No more values mean that the datatype for sent and receive buffer
858 is the default one, see decode_datatype().
862 double clock = smpi_process_simulated_elapsed();
864 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
866 int sendcount=atoi(action[2]);
867 int *recvcounts = xbt_new0(int, comm_size);
// Displacements are zero-initialised by xbt_new0; no visible line changes them.
868 int *disps = xbt_new0(int, comm_size);
870 MPI_Datatype MPI_CURRENT_TYPE2;
// Datatype fields are optional: when absent both sides use the default type.
872 if(action[3+comm_size]) {
873 MPI_CURRENT_TYPE = decode_datatype(action[3+comm_size]);
874 MPI_CURRENT_TYPE2 = decode_datatype(action[4+comm_size]);
876 MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
877 MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
879 void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* smpi_datatype_size(MPI_CURRENT_TYPE));
// Read the per-process receive counts and accumulate their sum.
881 for(i=0;i<comm_size;i++) {
882 recvcounts[i] = atoi(action[i+3]);
883 recv_sum=recv_sum+recvcounts[i];
// The receive buffer is sized for the sum of all receive counts.
885 void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* smpi_datatype_size(MPI_CURRENT_TYPE2));
888 int rank = smpi_process_index();
889 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
890 extra->type = TRACING_ALLGATHERV;
891 extra->send_size = sendcount;
// The tracing record gets its own copy of recvcounts, since the local array is freed below.
892 extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
893 for(i=0; i< comm_size; i++)//copy data to avoid bad free
894 extra->recvcounts[i] = recvcounts[i];
895 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
896 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
897 extra->num_processes = comm_size;
899 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
902 mpi_coll_allgatherv_fun(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts, disps, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
905 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
908 log_timed_action (action, clock);
909 xbt_free(recvcounts);
// Replay an "allToAllV" action on MPI_COMM_WORLD.
// Fields read by the code: action[2] send buffer size, action[3 .. 3+comm_size-1] send counts,
// action[3+comm_size] receive buffer size, action[4+comm_size .. 4+2*comm_size-1] receive counts,
// action[4+2*comm_size]/action[5+2*comm_size] optional send/receive datatype fields.
914 static void action_allToAllv(const char *const *action) {
916 The structure of the allToAllV action for the rank 0 (total 4 processes)
918 0 allToAllV 100 1 7 10 12 100 1 70 10 5
921 1) 100 is the size of the send buffer *sizeof(int),
922 2) 1 7 10 12 is the sendcounts array
923 3) 100*sizeof(int) is the size of the receiver buffer
924 4) 1 70 10 5 is the recvcounts array
929 double clock = smpi_process_simulated_elapsed();
931 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
932 int send_buf_size=0,recv_buf_size=0,i=0;
933 int *sendcounts = xbt_new0(int, comm_size);
934 int *recvcounts = xbt_new0(int, comm_size);
// Displacements are zero-initialised by xbt_new0; no visible line changes them.
935 int *senddisps = xbt_new0(int, comm_size);
936 int *recvdisps = xbt_new0(int, comm_size);
938 MPI_Datatype MPI_CURRENT_TYPE2;
940 send_buf_size=parse_double(action[2]);
941 recv_buf_size=parse_double(action[3+comm_size]);
// Datatype fields are optional: when absent both sides use the default type.
942 if(action[4+2*comm_size]) {
943 MPI_CURRENT_TYPE=decode_datatype(action[4+2*comm_size]);
944 MPI_CURRENT_TYPE2=decode_datatype(action[5+2*comm_size]);
947 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
948 MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
// The receive buffer is sized with the receive datatype (MPI_CURRENT_TYPE2).
951 void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* smpi_datatype_size(MPI_CURRENT_TYPE));
952 void *recvbuf = smpi_get_tmp_recvbuffer(recv_buf_size* smpi_datatype_size(MPI_CURRENT_TYPE2));
954 for(i=0;i<comm_size;i++) {
955 sendcounts[i] = atoi(action[i+3]);
956 recvcounts[i] = atoi(action[i+4+comm_size]);
961 int rank = smpi_process_index();
962 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
963 extra->type = TRACING_ALLTOALLV;
// The tracing record gets its own copies of the count arrays, since the local arrays are freed below.
964 extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
965 extra->sendcounts= xbt_malloc(comm_size*sizeof(int));
966 extra->num_processes = comm_size;
// send_size / recv_size start at 0 (xbt_new0) and accumulate the totals.
968 for(i=0; i< comm_size; i++){//copy data to avoid bad free
969 extra->send_size += sendcounts[i];
970 extra->sendcounts[i] = sendcounts[i];
971 extra->recv_size += recvcounts[i];
972 extra->recvcounts[i] = recvcounts[i];
974 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
975 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
977 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
// The receive side must use MPI_CURRENT_TYPE2, the type recvbuf was sized and traced with;
// passing MPI_CURRENT_TYPE here ignored the receive datatype field of the trace.
979 mpi_coll_alltoallv_fun(sendbuf, sendcounts, senddisps, MPI_CURRENT_TYPE,
980 recvbuf, recvcounts, recvdisps, MPI_CURRENT_TYPE2,
983 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
986 log_timed_action (action, clock);
987 xbt_free(sendcounts);
988 xbt_free(recvcounts);
// Initialise the calling SMPI process for trace replay, register the action handlers,
// optionally delay the start, then run the replay.
// argc/argv: pointers to the process' command-line arguments; forwarded to smpi_process_init
//            and to xbt_replay_action_runner.
993 void smpi_replay_init(int *argc, char***argv){
994 smpi_process_init(argc, argv);
995 smpi_process_mark_as_initialized();
// From here on smpi_get_tmp_sendbuffer / smpi_get_tmp_recvbuffer hand out the shared buffers.
996 smpi_process_set_replaying(1);
998 int rank = smpi_process_index();
999 TRACE_smpi_init(rank);
1000 TRACE_smpi_computing_init(rank);
// Emit an immediate in/out pair of collective trace events of type TRACING_INIT.
1001 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
1002 extra->type = TRACING_INIT;
1003 TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
1004 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
// Only the process with index 0 registers the handlers, keyed by the action name found in the trace.
1007 if (!smpi_process_index()){
1008 _xbt_replay_action_init();
1009 xbt_replay_action_register("init", action_init);
1010 xbt_replay_action_register("finalize", action_finalize);
1011 xbt_replay_action_register("comm_size", action_comm_size);
1012 xbt_replay_action_register("comm_split", action_comm_split);
1013 xbt_replay_action_register("comm_dup", action_comm_dup);
1014 xbt_replay_action_register("send", action_send);
1015 xbt_replay_action_register("Isend", action_Isend);
1016 xbt_replay_action_register("recv", action_recv);
1017 xbt_replay_action_register("Irecv", action_Irecv);
1018 xbt_replay_action_register("test", action_test);
1019 xbt_replay_action_register("wait", action_wait);
1020 xbt_replay_action_register("waitAll", action_waitall);
1021 xbt_replay_action_register("barrier", action_barrier);
1022 xbt_replay_action_register("bcast", action_bcast);
1023 xbt_replay_action_register("reduce", action_reduce);
1024 xbt_replay_action_register("allReduce", action_allReduce);
1025 xbt_replay_action_register("allToAll", action_allToAll);
1026 xbt_replay_action_register("allToAllV", action_allToAllv);
1027 xbt_replay_action_register("gather", action_gather);
1028 xbt_replay_action_register("gatherV", action_gatherv);
1029 xbt_replay_action_register("allGatherV", action_allgatherv);
1030 xbt_replay_action_register("reduceScatter", action_reducescatter);
1031 xbt_replay_action_register("compute", action_compute);
1034 //if we have a delayed start, sleep here.
// The delay is read from (*argv)[2] and executed as an amount of flops.
// NOTE(review): the test that (*argv)[2] exists is not visible in this view.
1037 double value = strtod((*argv)[2], &endptr);
1038 if (*endptr != '\0')
1039 THROWF(unknown_error, 0, "%s is not a double", (*argv)[2]);
1040 XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
1041 smpi_execute_flops(value);
1043 xbt_replay_action_runner(*argc, *argv);
// Finish the replay for the calling process: wait for its still-pending requests,
// release its request queue, and let the last process report the simulation time
// and free the shared resources.
// NOTE(review): the return statement is not visible in this view.
1046 int smpi_replay_finalize(){
1047 double sim_time= 1.;
1048 /* One active process will stop. Decrease the counter*/
1049 XBT_DEBUG("There are %lu elements in reqq[*]",
1050 xbt_dynar_length(reqq[smpi_process_index()]));
// Requests that were never waited for are completed here with a single waitall.
1051 if (!xbt_dynar_is_empty(reqq[smpi_process_index()])){
1052 int count_requests=xbt_dynar_length(reqq[smpi_process_index()]);
1053 MPI_Request requests[count_requests];
1054 MPI_Status status[count_requests];
1057 xbt_dynar_foreach(reqq[smpi_process_index()],i,requests[i]);
1058 smpi_mpi_waitall(count_requests, requests, status);
// active_processes == 0 identifies the last process to finish.
// NOTE(review): the statement decrementing active_processes is not visible in this view.
1064 if(!active_processes){
1065 /* Last process alive speaking */
1066 /* end the simulated timer */
1067 sim_time = smpi_process_simulated_elapsed();
// Free only the dynar container of this process.
1071 xbt_dynar_free_container(&(reqq[smpi_process_index()]));
// The last process prints the simulation time and releases the replay registry and the shared scratch buffers.
1073 if(!active_processes){
1074 XBT_INFO("Simulation time %f", sim_time);
1075 _xbt_replay_action_exit();
1076 xbt_free(sendbuffer);
1077 xbt_free(recvbuffer);
// Emit TRACING_FINALIZE collective events around smpi_process_finalize.
1084 int rank = smpi_process_index();
1085 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
1086 extra->type = TRACING_FINALIZE;
1087 TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
1089 smpi_process_finalize();
1091 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
1092 TRACE_smpi_finalize(smpi_process_index());
1094 smpi_process_destroy();