1 /* Copyright (c) 2009-2014. The SimGrid Team.
2 * All rights reserved. */
4 /* This program is free software; you can redistribute it and/or modify it
5 * under the terms of the license (GNU LGPL) which comes with this package. */
10 #include <xbt/replay.h>
12 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
14 int communicator_size = 0;
15 static int active_processes = 0;
16 xbt_dynar_t *reqq = NULL;
18 MPI_Datatype MPI_DEFAULT_TYPE;
19 MPI_Datatype MPI_CURRENT_TYPE;
21 static int sendbuffer_size=0;
22 char* sendbuffer=NULL;
23 static int recvbuffer_size=0;
24 char* recvbuffer=NULL;
26 static void log_timed_action (const char *const *action, double clock){
27 if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
28 char *name = xbt_str_join_array(action, " ");
29 XBT_VERB("%s %f", name, smpi_process_simulated_elapsed()-clock);
34 //allocate a single buffer for all sends, growing it if needed
35 void* smpi_get_tmp_sendbuffer(int size){
36 if (!smpi_process_get_replaying())
37 return xbt_malloc(size);
38 if (sendbuffer_size<size){
39 sendbuffer=xbt_realloc(sendbuffer,size);
44 //allocate a single buffer for all recv
45 void* smpi_get_tmp_recvbuffer(int size){
46 if (!smpi_process_get_replaying())
47 return xbt_malloc(size);
48 if (recvbuffer_size<size){
49 recvbuffer=xbt_realloc(recvbuffer,size);
55 void smpi_free_tmp_buffer(void* buf){
56 if (!smpi_process_get_replaying())
61 static double parse_double(const char *string)
65 value = strtod(string, &endptr);
67 THROWF(unknown_error, 0, "%s is not a double", string);
71 static MPI_Datatype decode_datatype(const char *const action)
73 // Declared datatypes,
78 MPI_CURRENT_TYPE=MPI_DOUBLE;
81 MPI_CURRENT_TYPE=MPI_INT;
84 MPI_CURRENT_TYPE=MPI_CHAR;
87 MPI_CURRENT_TYPE=MPI_SHORT;
90 MPI_CURRENT_TYPE=MPI_LONG;
93 MPI_CURRENT_TYPE=MPI_FLOAT;
96 MPI_CURRENT_TYPE=MPI_BYTE;
99 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
102 return MPI_CURRENT_TYPE;
106 const char* encode_datatype(MPI_Datatype datatype)
109 //default type for output is set to MPI_BYTE
110 // MPI_DEFAULT_TYPE is not set for output, use directly MPI_BYTE
111 if (datatype==MPI_BYTE){
114 if(datatype==MPI_DOUBLE)
116 if(datatype==MPI_INT)
118 if(datatype==MPI_CHAR)
120 if(datatype==MPI_SHORT)
122 if(datatype==MPI_LONG)
124 if(datatype==MPI_FLOAT)
127 // default - not implemented.
128 // do not warn here as we pass in this function even for other trace formats
132 static void action_init(const char *const *action)
135 XBT_DEBUG("Initialize the counters");
137 if(action[2]) MPI_DEFAULT_TYPE= MPI_DOUBLE; // default MPE dataype
138 else MPI_DEFAULT_TYPE= MPI_BYTE; // default TAU datatype
140 /* start a simulated timer */
141 smpi_process_simulated_start();
142 /*initialize the number of active processes */
143 active_processes = smpi_process_count();
146 reqq=xbt_new0(xbt_dynar_t,active_processes);
148 for(i=0;i<active_processes;i++){
149 reqq[i]=xbt_dynar_new(sizeof(MPI_Request),&xbt_free_ref);
154 static void action_finalize(const char *const *action)
158 static void action_comm_size(const char *const *action)
160 double clock = smpi_process_simulated_elapsed();
162 communicator_size = parse_double(action[2]);
163 log_timed_action (action, clock);
166 static void action_comm_split(const char *const *action)
168 double clock = smpi_process_simulated_elapsed();
170 log_timed_action (action, clock);
173 static void action_comm_dup(const char *const *action)
175 double clock = smpi_process_simulated_elapsed();
177 log_timed_action (action, clock);
180 static void action_compute(const char *const *action)
182 double clock = smpi_process_simulated_elapsed();
183 double flops= parse_double(action[2]);
185 int rank = smpi_process_index();
186 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
187 extra->type=TRACING_COMPUTING;
188 extra->comp_size=flops;
189 TRACE_smpi_computing_in(rank, extra);
191 smpi_execute_flops(flops);
193 TRACE_smpi_computing_out(rank);
196 log_timed_action (action, clock);
199 static void action_send(const char *const *action)
201 int to = atoi(action[2]);
202 double size=parse_double(action[3]);
203 double clock = smpi_process_simulated_elapsed();
206 MPI_CURRENT_TYPE=decode_datatype(action[4]);
208 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
212 int rank = smpi_process_index();
214 int dst_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), to);
215 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
216 extra->type = TRACING_SEND;
217 extra->send_size = size;
219 extra->dst = dst_traced;
220 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
221 TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
222 TRACE_smpi_send(rank, rank, dst_traced, size*smpi_datatype_size(MPI_CURRENT_TYPE));
225 smpi_mpi_send(NULL, size, MPI_CURRENT_TYPE, to , 0, MPI_COMM_WORLD);
227 log_timed_action (action, clock);
230 TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
235 static void action_Isend(const char *const *action)
237 int to = atoi(action[2]);
238 double size=parse_double(action[3]);
239 double clock = smpi_process_simulated_elapsed();
242 if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
243 else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
246 int rank = smpi_process_index();
247 int dst_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), to);
248 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
249 extra->type = TRACING_ISEND;
250 extra->send_size = size;
252 extra->dst = dst_traced;
253 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
254 TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
255 TRACE_smpi_send(rank, rank, dst_traced, size*smpi_datatype_size(MPI_CURRENT_TYPE));
258 request = smpi_mpi_isend(NULL, size, MPI_CURRENT_TYPE, to, 0,MPI_COMM_WORLD);
261 TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
265 xbt_dynar_push(reqq[smpi_process_index()],&request);
267 log_timed_action (action, clock);
270 static void action_recv(const char *const *action) {
271 int from = atoi(action[2]);
272 double size=parse_double(action[3]);
273 double clock = smpi_process_simulated_elapsed();
276 if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
277 else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
280 int rank = smpi_process_index();
281 int src_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), from);
283 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
284 extra->type = TRACING_RECV;
285 extra->send_size = size;
286 extra->src = src_traced;
288 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
289 TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
292 //unknow size from the receiver pov
294 smpi_mpi_probe(from, 0, MPI_COMM_WORLD, &status);
298 smpi_mpi_recv(NULL, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD, &status);
301 TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
302 TRACE_smpi_recv(rank, src_traced, rank);
305 log_timed_action (action, clock);
308 static void action_Irecv(const char *const *action)
310 int from = atoi(action[2]);
311 double size=parse_double(action[3]);
312 double clock = smpi_process_simulated_elapsed();
315 if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
316 else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
319 int rank = smpi_process_index();
320 int src_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), from);
321 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
322 extra->type = TRACING_IRECV;
323 extra->send_size = size;
324 extra->src = src_traced;
326 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
327 TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
330 //unknow size from the receiver pov
332 smpi_mpi_probe(from, 0, MPI_COMM_WORLD, &status);
336 request = smpi_mpi_irecv(NULL, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD);
339 TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
342 xbt_dynar_push(reqq[smpi_process_index()],&request);
344 log_timed_action (action, clock);
347 static void action_test(const char *const *action){
348 double clock = smpi_process_simulated_elapsed();
353 request = xbt_dynar_pop_as(reqq[smpi_process_index()],MPI_Request);
354 xbt_assert(request != NULL, "found null request in reqq");
357 int rank = smpi_process_index();
358 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
359 extra->type=TRACING_TEST;
360 TRACE_smpi_testing_in(rank, extra);
362 flag = smpi_mpi_test(&request, &status);
363 XBT_DEBUG("MPI_Test result: %d", flag);
364 /* push back request in dynar to be caught by a subsequent wait. if the test
365 * did succeed, the request is now NULL.
368 xbt_dynar_push_as(reqq[smpi_process_index()],MPI_Request, request);
371 TRACE_smpi_testing_out(rank);
374 log_timed_action (action, clock);
377 static void action_wait(const char *const *action){
378 double clock = smpi_process_simulated_elapsed();
382 xbt_assert(xbt_dynar_length(reqq[smpi_process_index()]),
383 "action wait not preceded by any irecv or isend: %s",
384 xbt_str_join_array(action," "));
385 request = xbt_dynar_pop_as(reqq[smpi_process_index()],MPI_Request);
388 /* Assuming that the trace is well formed, this mean the comm might have
389 * been caught by a MPI_test. Then just return.
395 int rank = request->comm != MPI_COMM_NULL
396 ? smpi_comm_rank(request->comm)
399 MPI_Group group = smpi_comm_group(request->comm);
400 int src_traced = smpi_group_rank(group, request->src);
401 int dst_traced = smpi_group_rank(group, request->dst);
402 int is_wait_for_receive = request->recv;
403 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
404 extra->type = TRACING_WAIT;
405 TRACE_smpi_ptp_in(rank, src_traced, dst_traced, __FUNCTION__, extra);
407 smpi_mpi_wait(&request, &status);
409 TRACE_smpi_ptp_out(rank, src_traced, dst_traced, __FUNCTION__);
410 if (is_wait_for_receive) {
411 TRACE_smpi_recv(rank, src_traced, dst_traced);
415 log_timed_action (action, clock);
418 static void action_waitall(const char *const *action){
419 double clock = smpi_process_simulated_elapsed();
420 int count_requests=0;
423 count_requests=xbt_dynar_length(reqq[smpi_process_index()]);
425 if (count_requests>0) {
426 MPI_Request requests[count_requests];
427 MPI_Status status[count_requests];
429 /* The reqq is an array of dynars. Its index corresponds to the rank.
430 Thus each rank saves its own requests to the array request. */
431 xbt_dynar_foreach(reqq[smpi_process_index()],i,requests[i]);
434 //save information from requests
436 xbt_dynar_t srcs = xbt_dynar_new(sizeof(int), NULL);
437 xbt_dynar_t dsts = xbt_dynar_new(sizeof(int), NULL);
438 xbt_dynar_t recvs = xbt_dynar_new(sizeof(int), NULL);
439 for (i = 0; i < count_requests; i++) {
441 int *asrc = xbt_new(int, 1);
442 int *adst = xbt_new(int, 1);
443 int *arecv = xbt_new(int, 1);
444 *asrc = requests[i]->src;
445 *adst = requests[i]->dst;
446 *arecv = requests[i]->recv;
447 xbt_dynar_insert_at(srcs, i, asrc);
448 xbt_dynar_insert_at(dsts, i, adst);
449 xbt_dynar_insert_at(recvs, i, arecv);
454 int *t = xbt_new(int, 1);
455 xbt_dynar_insert_at(srcs, i, t);
456 xbt_dynar_insert_at(dsts, i, t);
457 xbt_dynar_insert_at(recvs, i, t);
461 int rank_traced = smpi_process_index();
462 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
463 extra->type = TRACING_WAITALL;
464 extra->send_size=count_requests;
465 TRACE_smpi_ptp_in(rank_traced, -1, -1, __FUNCTION__,extra);
468 smpi_mpi_waitall(count_requests, requests, status);
471 for (i = 0; i < count_requests; i++) {
472 int src_traced, dst_traced, is_wait_for_receive;
473 xbt_dynar_get_cpy(srcs, i, &src_traced);
474 xbt_dynar_get_cpy(dsts, i, &dst_traced);
475 xbt_dynar_get_cpy(recvs, i, &is_wait_for_receive);
476 if (is_wait_for_receive) {
477 TRACE_smpi_recv(rank_traced, src_traced, dst_traced);
480 TRACE_smpi_ptp_out(rank_traced, -1, -1, __FUNCTION__);
482 xbt_dynar_free(&srcs);
483 xbt_dynar_free(&dsts);
484 xbt_dynar_free(&recvs);
487 int freedrank=smpi_process_index();
488 xbt_dynar_free_container(&(reqq[freedrank]));
489 reqq[freedrank]=xbt_dynar_new(sizeof(MPI_Request),&xbt_free_ref);
491 log_timed_action (action, clock);
494 static void action_barrier(const char *const *action){
495 double clock = smpi_process_simulated_elapsed();
497 int rank = smpi_process_index();
498 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
499 extra->type = TRACING_BARRIER;
500 TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
502 mpi_coll_barrier_fun(MPI_COMM_WORLD);
504 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
507 log_timed_action (action, clock);
511 static void action_bcast(const char *const *action)
513 double size = parse_double(action[2]);
514 double clock = smpi_process_simulated_elapsed();
517 * Initialize MPI_CURRENT_TYPE in order to decrease
518 * the number of the checks
520 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
523 root= atoi(action[3]);
525 MPI_CURRENT_TYPE=decode_datatype(action[4]);
530 int rank = smpi_process_index();
531 int root_traced = smpi_group_index(smpi_comm_group(MPI_COMM_WORLD), root);
533 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
534 extra->type = TRACING_BCAST;
535 extra->send_size = size;
536 extra->root = root_traced;
537 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
538 TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__, extra);
541 void *sendbuf = smpi_get_tmp_sendbuffer(size* smpi_datatype_size(MPI_CURRENT_TYPE));
542 mpi_coll_bcast_fun(sendbuf, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
544 TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
546 log_timed_action (action, clock);
549 static void action_reduce(const char *const *action)
551 double comm_size = parse_double(action[2]);
552 double comp_size = parse_double(action[3]);
553 double clock = smpi_process_simulated_elapsed();
555 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
558 root= atoi(action[4]);
560 MPI_CURRENT_TYPE=decode_datatype(action[5]);
567 int rank = smpi_process_index();
568 int root_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), root);
569 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
570 extra->type = TRACING_REDUCE;
571 extra->send_size = comm_size;
572 extra->comp_size = comp_size;
573 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
574 extra->root = root_traced;
576 TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__,extra);
578 void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
579 void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
580 mpi_coll_reduce_fun(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
581 smpi_execute_flops(comp_size);
583 TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
585 log_timed_action (action, clock);
588 static void action_allReduce(const char *const *action) {
589 double comm_size = parse_double(action[2]);
590 double comp_size = parse_double(action[3]);
592 if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
593 else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
595 double clock = smpi_process_simulated_elapsed();
597 int rank = smpi_process_index();
598 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
599 extra->type = TRACING_ALLREDUCE;
600 extra->send_size = comm_size;
601 extra->comp_size = comp_size;
602 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
604 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
606 void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
607 void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
608 mpi_coll_allreduce_fun(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
609 smpi_execute_flops(comp_size);
611 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
613 log_timed_action (action, clock);
616 static void action_allToAll(const char *const *action) {
617 double clock = smpi_process_simulated_elapsed();
618 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
619 int send_size = parse_double(action[2]);
620 int recv_size = parse_double(action[3]);
621 MPI_Datatype MPI_CURRENT_TYPE2;
624 MPI_CURRENT_TYPE=decode_datatype(action[4]);
625 MPI_CURRENT_TYPE2=decode_datatype(action[5]);
628 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
629 MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
631 void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
632 void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE2));
635 int rank = smpi_process_index();
636 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
637 extra->type = TRACING_ALLTOALL;
638 extra->send_size = send_size;
639 extra->recv_size = recv_size;
640 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
641 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
643 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
646 mpi_coll_alltoall_fun(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
649 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
651 log_timed_action (action, clock);
656 static void action_gather(const char *const *action) {
658 The structure of the gather action for the rank 0 (total 4 processes)
663 1) 68 is the sendcounts
664 2) 68 is the recvcounts
665 3) 0 is the root node
666 4) 0 is the send datatype id, see decode_datatype()
667 5) 0 is the recv datatype id, see decode_datatype()
669 double clock = smpi_process_simulated_elapsed();
670 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
671 int send_size = parse_double(action[2]);
672 int recv_size = parse_double(action[3]);
673 MPI_Datatype MPI_CURRENT_TYPE2;
675 MPI_CURRENT_TYPE=decode_datatype(action[5]);
676 MPI_CURRENT_TYPE2=decode_datatype(action[6]);
678 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
679 MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
681 void *send = smpi_get_tmp_sendbuffer(send_size* smpi_datatype_size(MPI_CURRENT_TYPE));
684 int root=atoi(action[4]);
685 int rank = smpi_comm_rank(MPI_COMM_WORLD);
688 recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE2));
691 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
692 extra->type = TRACING_GATHER;
693 extra->send_size = send_size;
694 extra->recv_size = recv_size;
696 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
697 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
699 TRACE_smpi_collective_in(smpi_process_index(), root, __FUNCTION__, extra);
701 mpi_coll_gather_fun(send, send_size, MPI_CURRENT_TYPE,
702 recv, recv_size, MPI_CURRENT_TYPE2,
703 root, MPI_COMM_WORLD);
706 TRACE_smpi_collective_out(smpi_process_index(), -1, __FUNCTION__);
708 log_timed_action (action, clock);
714 static void action_gatherv(const char *const *action) {
716 The structure of the gatherv action for the rank 0 (total 4 processes)
718 0 gather 68 68 10 10 10 0 0 0
721 1) 68 is the sendcount
722 2) 68 10 10 10 is the recvcounts
723 3) 0 is the root node
724 4) 0 is the send datatype id, see decode_datatype()
725 5) 0 is the recv datatype id, see decode_datatype()
727 double clock = smpi_process_simulated_elapsed();
728 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
729 int send_size = parse_double(action[2]);
730 int *disps = xbt_new0(int, comm_size);
731 int *recvcounts = xbt_new0(int, comm_size);
734 MPI_Datatype MPI_CURRENT_TYPE2;
735 if(action[4+comm_size]) {
736 MPI_CURRENT_TYPE=decode_datatype(action[4+comm_size]);
737 MPI_CURRENT_TYPE2=decode_datatype(action[5+comm_size]);
739 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
740 MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
742 void *send = smpi_get_tmp_sendbuffer(send_size* smpi_datatype_size(MPI_CURRENT_TYPE));
744 for(i=0;i<comm_size;i++) {
745 recvcounts[i] = atoi(action[i+3]);
746 recv_sum=recv_sum+recvcounts[i];
750 int root=atoi(action[3+comm_size]);
751 int rank = smpi_comm_rank(MPI_COMM_WORLD);;
754 recv = smpi_get_tmp_recvbuffer(recv_sum* smpi_datatype_size(MPI_CURRENT_TYPE2));
757 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
758 extra->type = TRACING_GATHERV;
759 extra->send_size = send_size;
760 extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
761 for(i=0; i< comm_size; i++)//copy data to avoid bad free
762 extra->recvcounts[i] = recvcounts[i];
764 extra->num_processes = comm_size;
765 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
766 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
768 TRACE_smpi_collective_in(smpi_process_index(), root, __FUNCTION__, extra);
770 smpi_mpi_gatherv(send, send_size, MPI_CURRENT_TYPE,
771 recv, recvcounts, disps, MPI_CURRENT_TYPE2,
772 root, MPI_COMM_WORLD);
775 TRACE_smpi_collective_out(smpi_process_index(), -1, __FUNCTION__);
778 log_timed_action (action, clock);
779 xbt_free(recvcounts);
783 static void action_reducescatter(const char *const *action) {
786 The structure of the reducescatter action for the rank 0 (total 4 processes)
788 0 reduceScatter 275427 275427 275427 204020 11346849 0
791 1) The first four values after the name of the action declare the recvcounts array
792 2) The value 11346849 is the amount of instructions
793 3) The last value corresponds to the datatype, see decode_datatype().
795 We analyze a MPI_Reduce_scatter call to one MPI_Reduce and one MPI_Scatterv.
799 double clock = smpi_process_simulated_elapsed();
800 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
801 int comp_size = parse_double(action[2+comm_size]);
802 int *recvcounts = xbt_new0(int, comm_size);
803 int *disps = xbt_new0(int, comm_size);
805 int rank = smpi_process_index();
807 if(action[3+comm_size])
808 MPI_CURRENT_TYPE=decode_datatype(action[3+comm_size]);
810 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
812 for(i=0;i<comm_size;i++) {
813 recvcounts[i] = atoi(action[i+2]);
819 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
820 extra->type = TRACING_REDUCE_SCATTER;
821 extra->send_size = 0;
822 extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
823 for(i=0; i< comm_size; i++)//copy data to avoid bad free
824 extra->recvcounts[i] = recvcounts[i];
825 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
826 extra->comp_size = comp_size;
827 extra->num_processes = comm_size;
829 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
831 void *sendbuf = smpi_get_tmp_sendbuffer(size* smpi_datatype_size(MPI_CURRENT_TYPE));
832 void *recvbuf = smpi_get_tmp_recvbuffer(size* smpi_datatype_size(MPI_CURRENT_TYPE));
834 mpi_coll_reduce_scatter_fun(sendbuf, recvbuf, recvcounts, MPI_CURRENT_TYPE, MPI_OP_NULL,
836 smpi_execute_flops(comp_size);
840 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
842 xbt_free(recvcounts);
844 log_timed_action (action, clock);
848 static void action_allgatherv(const char *const *action) {
851 The structure of the allgatherv action for the rank 0 (total 4 processes)
853 0 allGatherV 275427 275427 275427 275427 204020
856 1) 275427 is the sendcount
857 2) The next four elements declare the recvcounts array
858 3) No more values mean that the datatype for sent and receive buffer
859 is the default one, see decode_datatype().
863 double clock = smpi_process_simulated_elapsed();
865 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
867 int sendcount=atoi(action[2]);
868 int *recvcounts = xbt_new0(int, comm_size);
869 int *disps = xbt_new0(int, comm_size);
871 MPI_Datatype MPI_CURRENT_TYPE2;
873 if(action[3+comm_size]) {
874 MPI_CURRENT_TYPE = decode_datatype(action[3+comm_size]);
875 MPI_CURRENT_TYPE2 = decode_datatype(action[4+comm_size]);
877 MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
878 MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
880 void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* smpi_datatype_size(MPI_CURRENT_TYPE));
882 for(i=0;i<comm_size;i++) {
883 recvcounts[i] = atoi(action[i+3]);
884 recv_sum=recv_sum+recvcounts[i];
886 void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* smpi_datatype_size(MPI_CURRENT_TYPE2));
889 int rank = smpi_process_index();
890 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
891 extra->type = TRACING_ALLGATHERV;
892 extra->send_size = sendcount;
893 extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
894 for(i=0; i< comm_size; i++)//copy data to avoid bad free
895 extra->recvcounts[i] = recvcounts[i];
896 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
897 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
898 extra->num_processes = comm_size;
900 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
903 mpi_coll_allgatherv_fun(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts, disps, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
906 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
909 log_timed_action (action, clock);
910 xbt_free(recvcounts);
915 static void action_allToAllv(const char *const *action) {
917 The structure of the allToAllV action for the rank 0 (total 4 processes)
919 0 allToAllV 100 1 7 10 12 100 1 70 10 5
922 1) 100 is the size of the send buffer *sizeof(int),
923 2) 1 7 10 12 is the sendcounts array
924 3) 100*sizeof(int) is the size of the receiver buffer
925 4) 1 70 10 5 is the recvcounts array
930 double clock = smpi_process_simulated_elapsed();
932 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
933 int send_buf_size=0,recv_buf_size=0,i=0;
934 int *sendcounts = xbt_new0(int, comm_size);
935 int *recvcounts = xbt_new0(int, comm_size);
936 int *senddisps = xbt_new0(int, comm_size);
937 int *recvdisps = xbt_new0(int, comm_size);
939 MPI_Datatype MPI_CURRENT_TYPE2;
941 send_buf_size=parse_double(action[2]);
942 recv_buf_size=parse_double(action[3+comm_size]);
943 if(action[4+2*comm_size]) {
944 MPI_CURRENT_TYPE=decode_datatype(action[4+2*comm_size]);
945 MPI_CURRENT_TYPE2=decode_datatype(action[5+2*comm_size]);
948 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
949 MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
952 void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* smpi_datatype_size(MPI_CURRENT_TYPE));
953 void *recvbuf = smpi_get_tmp_recvbuffer(recv_buf_size* smpi_datatype_size(MPI_CURRENT_TYPE2));
955 for(i=0;i<comm_size;i++) {
956 sendcounts[i] = atoi(action[i+3]);
957 recvcounts[i] = atoi(action[i+4+comm_size]);
962 int rank = smpi_process_index();
963 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
964 extra->type = TRACING_ALLTOALLV;
965 extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
966 extra->sendcounts= xbt_malloc(comm_size*sizeof(int));
967 extra->num_processes = comm_size;
969 for(i=0; i< comm_size; i++){//copy data to avoid bad free
970 extra->send_size += sendcounts[i];
971 extra->sendcounts[i] = sendcounts[i];
972 extra->recv_size += recvcounts[i];
973 extra->recvcounts[i] = recvcounts[i];
975 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
976 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
978 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
980 mpi_coll_alltoallv_fun(sendbuf, sendcounts, senddisps, MPI_CURRENT_TYPE,
981 recvbuf, recvcounts, recvdisps, MPI_CURRENT_TYPE,
984 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
987 log_timed_action (action, clock);
988 xbt_free(sendcounts);
989 xbt_free(recvcounts);
994 void smpi_replay_init(int *argc, char***argv){
995 smpi_process_init(argc, argv);
996 smpi_process_mark_as_initialized();
997 smpi_process_set_replaying(1);
999 int rank = smpi_process_index();
1000 TRACE_smpi_init(rank);
1001 TRACE_smpi_computing_init(rank);
1002 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
1003 extra->type = TRACING_INIT;
1004 TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
1005 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
1008 if (!smpi_process_index()){
1009 _xbt_replay_action_init();
1010 xbt_replay_action_register("init", action_init);
1011 xbt_replay_action_register("finalize", action_finalize);
1012 xbt_replay_action_register("comm_size", action_comm_size);
1013 xbt_replay_action_register("comm_split", action_comm_split);
1014 xbt_replay_action_register("comm_dup", action_comm_dup);
1015 xbt_replay_action_register("send", action_send);
1016 xbt_replay_action_register("Isend", action_Isend);
1017 xbt_replay_action_register("recv", action_recv);
1018 xbt_replay_action_register("Irecv", action_Irecv);
1019 xbt_replay_action_register("test", action_test);
1020 xbt_replay_action_register("wait", action_wait);
1021 xbt_replay_action_register("waitAll", action_waitall);
1022 xbt_replay_action_register("barrier", action_barrier);
1023 xbt_replay_action_register("bcast", action_bcast);
1024 xbt_replay_action_register("reduce", action_reduce);
1025 xbt_replay_action_register("allReduce", action_allReduce);
1026 xbt_replay_action_register("allToAll", action_allToAll);
1027 xbt_replay_action_register("allToAllV", action_allToAllv);
1028 xbt_replay_action_register("gather", action_gather);
1029 xbt_replay_action_register("gatherV", action_gatherv);
1030 xbt_replay_action_register("allGatherV", action_allgatherv);
1031 xbt_replay_action_register("reduceScatter", action_reducescatter);
1032 xbt_replay_action_register("compute", action_compute);
1035 //if we have a delayed start, sleep here.
1038 double value = strtod((*argv)[2], &endptr);
1039 if (*endptr != '\0')
1040 THROWF(unknown_error, 0, "%s is not a double", (*argv)[2]);
1041 XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
1042 smpi_execute_flops(value);
1044 xbt_replay_action_runner(*argc, *argv);
1047 int smpi_replay_finalize(){
1048 double sim_time= 1.;
1049 /* One active process will stop. Decrease the counter*/
1050 XBT_DEBUG("There are %lu elements in reqq[*]",
1051 xbt_dynar_length(reqq[smpi_process_index()]));
1052 if (!xbt_dynar_is_empty(reqq[smpi_process_index()])){
1053 int count_requests=xbt_dynar_length(reqq[smpi_process_index()]);
1054 MPI_Request requests[count_requests];
1055 MPI_Status status[count_requests];
1058 xbt_dynar_foreach(reqq[smpi_process_index()],i,requests[i]);
1059 smpi_mpi_waitall(count_requests, requests, status);
1065 if(!active_processes){
1066 /* Last process alive speaking */
1067 /* end the simulated timer */
1068 sim_time = smpi_process_simulated_elapsed();
1072 xbt_dynar_free_container(&(reqq[smpi_process_index()]));
1074 if(!active_processes){
1075 XBT_INFO("Simulation time %f", sim_time);
1076 _xbt_replay_action_exit();
1077 xbt_free(sendbuffer);
1078 xbt_free(recvbuffer);
1085 int rank = smpi_process_index();
1086 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
1087 extra->type = TRACING_FINALIZE;
1088 TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
1090 smpi_process_finalize();
1092 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
1093 TRACE_smpi_finalize(smpi_process_index());
1095 smpi_process_destroy();