1 /* Copyright (c) 2009-2014. The SimGrid Team.
2 * All rights reserved. */
4 /* This program is free software; you can redistribute it and/or modify it
5 * under the terms of the license (GNU LGPL) which comes with this package. */
10 #include <xbt/replay.h>
12 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
14 int communicator_size = 0;
15 static int active_processes = 0;
16 xbt_dynar_t *reqq = NULL;
18 MPI_Datatype MPI_DEFAULT_TYPE;
19 MPI_Datatype MPI_CURRENT_TYPE;
21 static void log_timed_action (const char *const *action, double clock){
22 if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
23 char *name = xbt_str_join_array(action, " ");
24 XBT_VERB("%s %f", name, smpi_process_simulated_elapsed()-clock);
30 static double parse_double(const char *string)
34 value = strtod(string, &endptr);
36 THROWF(unknown_error, 0, "%s is not a double", string);
40 static MPI_Datatype decode_datatype(const char *const action)
42 // Declared datatypes,
47 MPI_CURRENT_TYPE=MPI_DOUBLE;
50 MPI_CURRENT_TYPE=MPI_INT;
53 MPI_CURRENT_TYPE=MPI_CHAR;
56 MPI_CURRENT_TYPE=MPI_SHORT;
59 MPI_CURRENT_TYPE=MPI_LONG;
62 MPI_CURRENT_TYPE=MPI_FLOAT;
65 MPI_CURRENT_TYPE=MPI_BYTE;
68 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
71 return MPI_CURRENT_TYPE;
75 const char* encode_datatype(MPI_Datatype datatype)
78 //default type for output is set to MPI_BYTE
79 // MPI_DEFAULT_TYPE is not set for output, use directly MPI_BYTE
80 if (datatype==MPI_BYTE){
83 if(datatype==MPI_DOUBLE)
87 if(datatype==MPI_CHAR)
89 if(datatype==MPI_SHORT)
91 if(datatype==MPI_LONG)
93 if(datatype==MPI_FLOAT)
96 // default - not implemented.
97 // do not warn here as we pass in this function even for other trace formats
101 static void action_init(const char *const *action)
104 XBT_DEBUG("Initialize the counters");
106 if(action[2]) MPI_DEFAULT_TYPE= MPI_DOUBLE; // default MPE dataype
107 else MPI_DEFAULT_TYPE= MPI_BYTE; // default TAU datatype
109 /* start a simulated timer */
110 smpi_process_simulated_start();
111 /*initialize the number of active processes */
112 active_processes = smpi_process_count();
115 reqq=xbt_new0(xbt_dynar_t,active_processes);
117 for(i=0;i<active_processes;i++){
118 reqq[i]=xbt_dynar_new(sizeof(MPI_Request),&xbt_free_ref);
123 static void action_finalize(const char *const *action)
127 static void action_comm_size(const char *const *action)
129 double clock = smpi_process_simulated_elapsed();
131 communicator_size = parse_double(action[2]);
132 log_timed_action (action, clock);
135 static void action_comm_split(const char *const *action)
137 double clock = smpi_process_simulated_elapsed();
139 log_timed_action (action, clock);
142 static void action_comm_dup(const char *const *action)
144 double clock = smpi_process_simulated_elapsed();
146 log_timed_action (action, clock);
149 static void action_compute(const char *const *action)
151 double clock = smpi_process_simulated_elapsed();
152 double flops= parse_double(action[2]);
154 int rank = smpi_comm_rank(MPI_COMM_WORLD);
155 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
156 extra->type=TRACING_COMPUTING;
157 extra->comp_size=flops;
158 TRACE_smpi_computing_in(rank, extra);
160 smpi_execute_flops(flops);
162 TRACE_smpi_computing_out(rank);
165 log_timed_action (action, clock);
168 static void action_send(const char *const *action)
170 int to = atoi(action[2]);
171 double size=parse_double(action[3]);
172 double clock = smpi_process_simulated_elapsed();
175 MPI_CURRENT_TYPE=decode_datatype(action[4]);
177 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
181 int rank = smpi_comm_rank(MPI_COMM_WORLD);
183 int dst_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), to);
184 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
185 extra->type = TRACING_SEND;
186 extra->send_size = size;
188 extra->dst = dst_traced;
189 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
190 TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
191 TRACE_smpi_send(rank, rank, dst_traced, size*smpi_datatype_size(MPI_CURRENT_TYPE));
194 smpi_mpi_send(NULL, size, MPI_CURRENT_TYPE, to , 0, MPI_COMM_WORLD);
196 log_timed_action (action, clock);
199 TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
204 static void action_Isend(const char *const *action)
206 int to = atoi(action[2]);
207 double size=parse_double(action[3]);
208 double clock = smpi_process_simulated_elapsed();
211 if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
212 else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
215 int rank = smpi_comm_rank(MPI_COMM_WORLD);
216 int dst_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), to);
217 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
218 extra->type = TRACING_ISEND;
219 extra->send_size = size;
221 extra->dst = dst_traced;
222 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
223 TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
224 TRACE_smpi_send(rank, rank, dst_traced, size*smpi_datatype_size(MPI_CURRENT_TYPE));
227 request = smpi_mpi_isend(NULL, size, MPI_CURRENT_TYPE, to, 0,MPI_COMM_WORLD);
230 TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
234 xbt_dynar_push(reqq[smpi_comm_rank(MPI_COMM_WORLD)],&request);
236 log_timed_action (action, clock);
239 static void action_recv(const char *const *action) {
240 int from = atoi(action[2]);
241 double size=parse_double(action[3]);
242 double clock = smpi_process_simulated_elapsed();
245 if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
246 else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
249 int rank = smpi_comm_rank(MPI_COMM_WORLD);
250 int src_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), from);
252 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
253 extra->type = TRACING_RECV;
254 extra->send_size = size;
255 extra->src = src_traced;
257 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
258 TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
261 //unknow size from the receiver pov
263 smpi_mpi_probe(from, 0, MPI_COMM_WORLD, &status);
267 smpi_mpi_recv(NULL, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD, &status);
270 TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
271 TRACE_smpi_recv(rank, src_traced, rank);
274 log_timed_action (action, clock);
277 static void action_Irecv(const char *const *action)
279 int from = atoi(action[2]);
280 double size=parse_double(action[3]);
281 double clock = smpi_process_simulated_elapsed();
284 if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
285 else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
288 int rank = smpi_comm_rank(MPI_COMM_WORLD);
289 int src_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), from);
290 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
291 extra->type = TRACING_IRECV;
292 extra->send_size = size;
293 extra->src = src_traced;
295 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
296 TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
299 //unknow size from the receiver pov
301 smpi_mpi_probe(from, 0, MPI_COMM_WORLD, &status);
305 request = smpi_mpi_irecv(NULL, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD);
308 TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
311 xbt_dynar_push(reqq[smpi_comm_rank(MPI_COMM_WORLD)],&request);
313 log_timed_action (action, clock);
316 static void action_test(const char *const *action){
317 double clock = smpi_process_simulated_elapsed();
322 request = xbt_dynar_pop_as(reqq[smpi_comm_rank(MPI_COMM_WORLD)],MPI_Request);
323 xbt_assert(request != NULL, "found null request in reqq");
326 int rank = smpi_comm_rank(MPI_COMM_WORLD);
327 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
328 extra->type=TRACING_TEST;
329 TRACE_smpi_testing_in(rank, extra);
331 flag = smpi_mpi_test(&request, &status);
332 XBT_DEBUG("MPI_Test result: %d", flag);
333 /* push back request in dynar to be caught by a subsequent wait. if the test
334 * did succeed, the request is now NULL.
336 xbt_dynar_push_as(reqq[smpi_comm_rank(MPI_COMM_WORLD)],MPI_Request, request);
339 TRACE_smpi_testing_out(rank);
342 log_timed_action (action, clock);
345 static void action_wait(const char *const *action){
346 double clock = smpi_process_simulated_elapsed();
350 xbt_assert(xbt_dynar_length(reqq[smpi_comm_rank(MPI_COMM_WORLD)]),
351 "action wait not preceded by any irecv or isend: %s",
352 xbt_str_join_array(action," "));
353 request = xbt_dynar_pop_as(reqq[smpi_comm_rank(MPI_COMM_WORLD)],MPI_Request);
356 /* Assuming that the trace is well formed, this mean the comm might have
357 * been caught by a MPI_test. Then just return.
363 int rank = request->comm != MPI_COMM_NULL
364 ? smpi_comm_rank(request->comm)
367 MPI_Group group = smpi_comm_group(request->comm);
368 int src_traced = smpi_group_rank(group, request->src);
369 int dst_traced = smpi_group_rank(group, request->dst);
370 int is_wait_for_receive = request->recv;
371 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
372 extra->type = TRACING_WAIT;
373 TRACE_smpi_ptp_in(rank, src_traced, dst_traced, __FUNCTION__, extra);
375 smpi_mpi_wait(&request, &status);
377 TRACE_smpi_ptp_out(rank, src_traced, dst_traced, __FUNCTION__);
378 if (is_wait_for_receive) {
379 TRACE_smpi_recv(rank, src_traced, dst_traced);
383 log_timed_action (action, clock);
386 static void action_waitall(const char *const *action){
387 double clock = smpi_process_simulated_elapsed();
388 int count_requests=0;
391 count_requests=xbt_dynar_length(reqq[smpi_comm_rank(MPI_COMM_WORLD)]);
393 if (count_requests>0) {
394 MPI_Request requests[count_requests];
395 MPI_Status status[count_requests];
397 /* The reqq is an array of dynars. Its index corresponds to the rank.
398 Thus each rank saves its own requests to the array request. */
399 xbt_dynar_foreach(reqq[smpi_comm_rank(MPI_COMM_WORLD)],i,requests[i]);
402 //save information from requests
404 xbt_dynar_t srcs = xbt_dynar_new(sizeof(int), NULL);
405 xbt_dynar_t dsts = xbt_dynar_new(sizeof(int), NULL);
406 xbt_dynar_t recvs = xbt_dynar_new(sizeof(int), NULL);
407 for (i = 0; i < count_requests; i++) {
409 int *asrc = xbt_new(int, 1);
410 int *adst = xbt_new(int, 1);
411 int *arecv = xbt_new(int, 1);
412 *asrc = requests[i]->src;
413 *adst = requests[i]->dst;
414 *arecv = requests[i]->recv;
415 xbt_dynar_insert_at(srcs, i, asrc);
416 xbt_dynar_insert_at(dsts, i, adst);
417 xbt_dynar_insert_at(recvs, i, arecv);
422 int *t = xbt_new(int, 1);
423 xbt_dynar_insert_at(srcs, i, t);
424 xbt_dynar_insert_at(dsts, i, t);
425 xbt_dynar_insert_at(recvs, i, t);
429 int rank_traced = smpi_process_index();
430 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
431 extra->type = TRACING_WAITALL;
432 extra->send_size=count_requests;
433 TRACE_smpi_ptp_in(rank_traced, -1, -1, __FUNCTION__,extra);
436 smpi_mpi_waitall(count_requests, requests, status);
439 for (i = 0; i < count_requests; i++) {
440 int src_traced, dst_traced, is_wait_for_receive;
441 xbt_dynar_get_cpy(srcs, i, &src_traced);
442 xbt_dynar_get_cpy(dsts, i, &dst_traced);
443 xbt_dynar_get_cpy(recvs, i, &is_wait_for_receive);
444 if (is_wait_for_receive) {
445 TRACE_smpi_recv(rank_traced, src_traced, dst_traced);
448 TRACE_smpi_ptp_out(rank_traced, -1, -1, __FUNCTION__);
450 xbt_dynar_free(&srcs);
451 xbt_dynar_free(&dsts);
452 xbt_dynar_free(&recvs);
455 xbt_dynar_free_container(&(reqq[smpi_comm_rank(MPI_COMM_WORLD)]));
457 log_timed_action (action, clock);
460 static void action_barrier(const char *const *action){
461 double clock = smpi_process_simulated_elapsed();
463 int rank = smpi_comm_rank(MPI_COMM_WORLD);
464 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
465 extra->type = TRACING_BARRIER;
466 TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
468 mpi_coll_barrier_fun(MPI_COMM_WORLD);
470 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
473 log_timed_action (action, clock);
477 static void action_bcast(const char *const *action)
479 double size = parse_double(action[2]);
480 double clock = smpi_process_simulated_elapsed();
483 * Initialize MPI_CURRENT_TYPE in order to decrease
484 * the number of the checks
486 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
489 root= atoi(action[3]);
491 MPI_CURRENT_TYPE=decode_datatype(action[4]);
496 int rank = smpi_comm_rank(MPI_COMM_WORLD);
497 int root_traced = smpi_group_index(smpi_comm_group(MPI_COMM_WORLD), root);
499 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
500 extra->type = TRACING_BCAST;
501 extra->send_size = size;
502 extra->root = root_traced;
503 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
504 TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__, extra);
508 mpi_coll_bcast_fun(NULL, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
510 TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
513 log_timed_action (action, clock);
516 static void action_reduce(const char *const *action)
518 double comm_size = parse_double(action[2]);
519 double comp_size = parse_double(action[3]);
520 double clock = smpi_process_simulated_elapsed();
522 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
525 root= atoi(action[4]);
527 MPI_CURRENT_TYPE=decode_datatype(action[5]);
532 int rank = smpi_comm_rank(MPI_COMM_WORLD);
533 int root_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), root);
534 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
535 extra->type = TRACING_REDUCE;
536 extra->send_size = comm_size;
537 extra->comp_size = comp_size;
538 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
539 extra->root = root_traced;
541 TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__,extra);
543 mpi_coll_reduce_fun(NULL, NULL, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
544 smpi_execute_flops(comp_size);
546 TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
549 log_timed_action (action, clock);
552 static void action_allReduce(const char *const *action) {
553 double comm_size = parse_double(action[2]);
554 double comp_size = parse_double(action[3]);
556 if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
557 else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
559 double clock = smpi_process_simulated_elapsed();
561 int rank = smpi_comm_rank(MPI_COMM_WORLD);
562 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
563 extra->type = TRACING_ALLREDUCE;
564 extra->send_size = comm_size;
565 extra->comp_size = comp_size;
566 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
568 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
570 mpi_coll_allreduce_fun(NULL, NULL, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
571 smpi_execute_flops(comp_size);
573 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
576 log_timed_action (action, clock);
579 static void action_allToAll(const char *const *action) {
580 double clock = smpi_process_simulated_elapsed();
581 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
582 int send_size = parse_double(action[2]);
583 int recv_size = parse_double(action[3]);
584 MPI_Datatype MPI_CURRENT_TYPE2;
587 MPI_CURRENT_TYPE=decode_datatype(action[4]);
588 MPI_CURRENT_TYPE2=decode_datatype(action[5]);
591 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
592 MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
594 void *send = calloc(send_size*comm_size, smpi_datatype_size(MPI_CURRENT_TYPE));
595 void *recv = calloc(recv_size*comm_size, smpi_datatype_size(MPI_CURRENT_TYPE2));
598 int rank = smpi_process_index();
599 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
600 extra->type = TRACING_ALLTOALL;
601 extra->send_size = send_size;
602 extra->recv_size = recv_size;
603 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
604 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
606 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
609 mpi_coll_alltoall_fun(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
612 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
615 log_timed_action (action, clock);
621 static void action_gather(const char *const *action) {
623 The structure of the gather action for the rank 0 (total 4 processes)
628 1) 68 is the sendcounts
629 2) 68 is the recvcounts
630 3) 0 is the root node
631 4) 0 is the send datatype id, see decode_datatype()
632 5) 0 is the recv datatype id, see decode_datatype()
634 double clock = smpi_process_simulated_elapsed();
635 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
636 int send_size = parse_double(action[2]);
637 int recv_size = parse_double(action[3]);
638 MPI_Datatype MPI_CURRENT_TYPE2;
640 MPI_CURRENT_TYPE=decode_datatype(action[5]);
641 MPI_CURRENT_TYPE2=decode_datatype(action[6]);
643 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
644 MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
646 void *send = calloc(send_size, smpi_datatype_size(MPI_CURRENT_TYPE));
649 int root=atoi(action[4]);
650 int rank = smpi_process_index();
653 recv = calloc(recv_size*comm_size, smpi_datatype_size(MPI_CURRENT_TYPE2));
656 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
657 extra->type = TRACING_GATHER;
658 extra->send_size = send_size;
659 extra->recv_size = recv_size;
661 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
662 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
664 TRACE_smpi_collective_in(rank, root, __FUNCTION__, extra);
666 mpi_coll_gather_fun(send, send_size, MPI_CURRENT_TYPE,
667 recv, recv_size, MPI_CURRENT_TYPE2,
668 root, MPI_COMM_WORLD);
671 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
674 log_timed_action (action, clock);
681 static void action_gatherv(const char *const *action) {
683 The structure of the gatherv action for the rank 0 (total 4 processes)
685 0 gather 68 68 10 10 10 0 0 0
688 1) 68 is the sendcount
689 2) 68 10 10 10 is the recvcounts
690 3) 0 is the root node
691 4) 0 is the send datatype id, see decode_datatype()
692 5) 0 is the recv datatype id, see decode_datatype()
694 double clock = smpi_process_simulated_elapsed();
695 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
696 int send_size = parse_double(action[2]);
697 int *disps = xbt_new0(int, comm_size);
698 int *recvcounts = xbt_new0(int, comm_size);
701 MPI_Datatype MPI_CURRENT_TYPE2;
702 if(action[4+comm_size]) {
703 MPI_CURRENT_TYPE=decode_datatype(action[4+comm_size]);
704 MPI_CURRENT_TYPE2=decode_datatype(action[5+comm_size]);
706 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
707 MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
709 void *send = calloc(send_size, smpi_datatype_size(MPI_CURRENT_TYPE));
711 for(i=0;i<comm_size;i++) {
712 recvcounts[i] = atoi(action[i+3]);
713 recv_sum=recv_sum+recvcounts[i];
717 int root=atoi(action[3+comm_size]);
718 int rank = smpi_process_index();
721 recv = calloc(recv_sum, smpi_datatype_size(MPI_CURRENT_TYPE2));
724 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
725 extra->type = TRACING_GATHERV;
726 extra->send_size = send_size;
727 extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
728 for(i=0; i< comm_size; i++)//copy data to avoid bad free
729 extra->recvcounts[i] = recvcounts[i];
731 extra->num_processes = comm_size;
732 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
733 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
735 TRACE_smpi_collective_in(rank, root, __FUNCTION__, extra);
737 smpi_mpi_gatherv(send, send_size, MPI_CURRENT_TYPE,
738 recv, recvcounts, disps, MPI_CURRENT_TYPE2,
739 root, MPI_COMM_WORLD);
742 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
745 log_timed_action (action, clock);
746 xbt_free(recvcounts);
753 static void action_reducescatter(const char *const *action) {
756 The structure of the reducescatter action for the rank 0 (total 4 processes)
758 0 reduceScatter 275427 275427 275427 204020 11346849 0
761 1) The first four values after the name of the action declare the recvcounts array
762 2) The value 11346849 is the amount of instructions
763 3) The last value corresponds to the datatype, see decode_datatype().
765 We analyze a MPI_Reduce_scatter call to one MPI_Reduce and one MPI_Scatterv.
769 double clock = smpi_process_simulated_elapsed();
770 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
771 int comp_size = parse_double(action[2+comm_size]);
772 int *recvcounts = xbt_new0(int, comm_size);
773 int *disps = xbt_new0(int, comm_size);
776 int rank = smpi_process_index();
778 if(action[3+comm_size])
779 MPI_CURRENT_TYPE=decode_datatype(action[3+comm_size]);
781 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
783 for(i=0;i<comm_size;i++) {
784 recvcounts[i] = atoi(action[i+2]);
785 recv_sum=recv_sum+recvcounts[i];
790 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
791 extra->type = TRACING_REDUCE_SCATTER;
792 extra->send_size = 0;
793 extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
794 for(i=0; i< comm_size; i++)//copy data to avoid bad free
795 extra->recvcounts[i] = recvcounts[i];
796 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
797 extra->comp_size = comp_size;
798 extra->num_processes = comm_size;
801 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
803 mpi_coll_reduce_scatter_fun(NULL, NULL, recv_sum, MPI_CURRENT_TYPE, MPI_OP_NULL,
805 smpi_execute_flops(comp_size);
809 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
811 xbt_free(recvcounts);
813 log_timed_action (action, clock);
817 static void action_allgatherv(const char *const *action) {
820 The structure of the allgatherv action for the rank 0 (total 4 processes)
822 0 allGatherV 275427 275427 275427 275427 204020
825 1) 275427 is the sendcount
826 2) The next four elements declare the recvcounts array
827 3) No more values mean that the datatype for sent and receive buffer
828 is the default one, see decode_datatype().
832 double clock = smpi_process_simulated_elapsed();
834 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
836 int sendcount=atoi(action[2]);
837 int *recvcounts = xbt_new0(int, comm_size);
838 int *disps = xbt_new0(int, comm_size);
840 MPI_Datatype MPI_CURRENT_TYPE2;
842 if(action[3+comm_size]) {
843 MPI_CURRENT_TYPE = decode_datatype(action[3+comm_size]);
844 MPI_CURRENT_TYPE2 = decode_datatype(action[4+comm_size]);
846 MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
847 MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
849 void *sendbuf = calloc(sendcount, smpi_datatype_size(MPI_CURRENT_TYPE));
851 for(i=0;i<comm_size;i++) {
852 recvcounts[i] = atoi(action[i+3]);
853 recv_sum=recv_sum+recvcounts[i];
855 void *recvbuf = calloc(recv_sum, smpi_datatype_size(MPI_CURRENT_TYPE2));
858 int rank = smpi_process_index();
859 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
860 extra->type = TRACING_ALLGATHERV;
861 extra->send_size = sendcount;
862 extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
863 for(i=0; i< comm_size; i++)//copy data to avoid bad free
864 extra->recvcounts[i] = recvcounts[i];
865 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
866 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
867 extra->num_processes = comm_size;
869 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
872 mpi_coll_allgatherv_fun(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts, disps, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
875 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
878 log_timed_action (action, clock);
881 xbt_free(recvcounts);
886 static void action_allToAllv(const char *const *action) {
888 The structure of the allToAllV action for the rank 0 (total 4 processes)
890 0 allToAllV 100 1 7 10 12 100 1 70 10 5
893 1) 100 is the size of the send buffer *sizeof(int),
894 2) 1 7 10 12 is the sendcounts array
895 3) 100*sizeof(int) is the size of the receiver buffer
896 4) 1 70 10 5 is the recvcounts array
901 double clock = smpi_process_simulated_elapsed();
903 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
904 int send_buf_size=0,recv_buf_size=0,i=0;
905 int *sendcounts = xbt_new0(int, comm_size);
906 int *recvcounts = xbt_new0(int, comm_size);
907 int *senddisps = xbt_new0(int, comm_size);
908 int *recvdisps = xbt_new0(int, comm_size);
910 MPI_Datatype MPI_CURRENT_TYPE2;
912 send_buf_size=parse_double(action[2]);
913 recv_buf_size=parse_double(action[3+comm_size]);
914 if(action[4+2*comm_size]) {
915 MPI_CURRENT_TYPE=decode_datatype(action[4+2*comm_size]);
916 MPI_CURRENT_TYPE2=decode_datatype(action[5+2*comm_size]);
919 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
920 MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
923 void *sendbuf = calloc(send_buf_size, smpi_datatype_size(MPI_CURRENT_TYPE));
924 void *recvbuf = calloc(recv_buf_size, smpi_datatype_size(MPI_CURRENT_TYPE2));
926 for(i=0;i<comm_size;i++) {
927 sendcounts[i] = atoi(action[i+3]);
928 recvcounts[i] = atoi(action[i+4+comm_size]);
933 int rank = smpi_process_index();
934 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
935 extra->type = TRACING_ALLTOALLV;
936 extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
937 extra->sendcounts= xbt_malloc(comm_size*sizeof(int));
938 extra->num_processes = comm_size;
940 for(i=0; i< comm_size; i++){//copy data to avoid bad free
941 extra->send_size += sendcounts[i];
942 extra->sendcounts[i] = sendcounts[i];
943 extra->recv_size += recvcounts[i];
944 extra->recvcounts[i] = recvcounts[i];
946 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
947 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
949 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
951 mpi_coll_alltoallv_fun(sendbuf, sendcounts, senddisps, MPI_CURRENT_TYPE,
952 recvbuf, recvcounts, recvdisps, MPI_CURRENT_TYPE,
955 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
958 log_timed_action (action, clock);
961 xbt_free(sendcounts);
962 xbt_free(recvcounts);
967 void smpi_replay_init(int *argc, char***argv){
968 smpi_process_init(argc, argv);
969 smpi_process_mark_as_initialized();
971 int rank = smpi_process_index();
972 TRACE_smpi_init(rank);
973 TRACE_smpi_computing_init(rank);
974 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
975 extra->type = TRACING_INIT;
976 TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
977 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
980 if (!smpi_process_index()){
981 _xbt_replay_action_init();
982 xbt_replay_action_register("init", action_init);
983 xbt_replay_action_register("finalize", action_finalize);
984 xbt_replay_action_register("comm_size", action_comm_size);
985 xbt_replay_action_register("comm_split", action_comm_split);
986 xbt_replay_action_register("comm_dup", action_comm_dup);
987 xbt_replay_action_register("send", action_send);
988 xbt_replay_action_register("Isend", action_Isend);
989 xbt_replay_action_register("recv", action_recv);
990 xbt_replay_action_register("Irecv", action_Irecv);
991 xbt_replay_action_register("test", action_test);
992 xbt_replay_action_register("wait", action_wait);
993 xbt_replay_action_register("waitAll", action_waitall);
994 xbt_replay_action_register("barrier", action_barrier);
995 xbt_replay_action_register("bcast", action_bcast);
996 xbt_replay_action_register("reduce", action_reduce);
997 xbt_replay_action_register("allReduce", action_allReduce);
998 xbt_replay_action_register("allToAll", action_allToAll);
999 xbt_replay_action_register("allToAllV", action_allToAllv);
1000 xbt_replay_action_register("gather", action_gather);
1001 xbt_replay_action_register("gatherV", action_gatherv);
1002 xbt_replay_action_register("allGatherV", action_allgatherv);
1003 xbt_replay_action_register("reduceScatter", action_reducescatter);
1004 xbt_replay_action_register("compute", action_compute);
1007 xbt_replay_action_runner(*argc, *argv);
1010 int smpi_replay_finalize(){
1011 double sim_time= 1.;
1012 /* One active process will stop. Decrease the counter*/
1013 XBT_DEBUG("There are %lu elements in reqq[*]",
1014 xbt_dynar_length(reqq[smpi_comm_rank(MPI_COMM_WORLD)]));
1015 if (!xbt_dynar_is_empty(reqq[smpi_comm_rank(MPI_COMM_WORLD)])){
1016 int count_requests=xbt_dynar_length(reqq[smpi_comm_rank(MPI_COMM_WORLD)]);
1017 MPI_Request requests[count_requests];
1018 MPI_Status status[count_requests];
1021 xbt_dynar_foreach(reqq[smpi_comm_rank(MPI_COMM_WORLD)],i,requests[i]);
1022 smpi_mpi_waitall(count_requests, requests, status);
1028 xbt_dynar_free_container(&(reqq[smpi_comm_rank(MPI_COMM_WORLD)]));
1030 if(!active_processes){
1031 /* Last process alive speaking */
1032 /* end the simulated timer */
1033 sim_time = smpi_process_simulated_elapsed();
1034 XBT_INFO("Simulation time %f", sim_time);
1035 _xbt_replay_action_exit();
1039 smpi_mpi_barrier(MPI_COMM_WORLD);
1041 int rank = smpi_process_index();
1042 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
1043 extra->type = TRACING_FINALIZE;
1044 TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
1046 smpi_process_finalize();
1048 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
1049 TRACE_smpi_finalize(smpi_process_index());
1051 smpi_process_destroy();