1 /* Copyright (c) 2009-2014. The SimGrid Team.
2 * All rights reserved. */
4 /* This program is free software; you can redistribute it and/or modify it
5 * under the terms of the license (GNU LGPL) which comes with this package. */
10 #include <xbt/replay.h>
// File-scope state of the SMPI trace replayer. The notes below only describe
// the uses that are visible in this listing of the file.
12 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
// Written by action_comm_size() from action[2] of a "comm_size" action.
14 int communicator_size = 0;
// Set to smpi_process_count() in action_init(); smpi_replay_finalize() runs its
// last-process cleanup when this reads zero.
15 static int active_processes = 0;
// Array of MPI_Request dynars, indexed by smpi_process_index().
16 xbt_dynar_t *reqq = NULL;
// Datatype used when a trace line does not name one (chosen in action_init()).
18 MPI_Datatype MPI_DEFAULT_TYPE;
// Datatype of the action being replayed; reassigned by the handlers.
19 MPI_Datatype MPI_CURRENT_TYPE;
// Shared scratch buffers and the sizes that get_sendbuffer()/get_recvbuffer()
// compare against; both buffers are released in smpi_replay_finalize().
21 static int sendbuffer_size=0;
22 char* sendbuffer=NULL;
23 static int recvbuffer_size=0;
24 char* recvbuffer=NULL;
// Writes a verbose-level log line made of the action's words joined by single
// spaces, followed by the simulated time elapsed since `clock`.
// The joined string is only built when verbose logging is enabled for the
// smpi_replay category.
// NOTE(review): the closing lines of this function are not in this listing;
// confirm that `name` is released there.
26 static void log_timed_action (const char *const *action, double clock){
27 if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
28 char *name = xbt_str_join_array(action, " ");
29 XBT_VERB("%s %f", name, smpi_process_simulated_elapsed()-clock);
34 //allocate a single buffer for all sends, growing it if needed
// `size` is compared with the recorded sendbuffer_size; when the shared buffer
// is too small it is reallocated to `size` with xbt_realloc().
// NOTE(review): the lines that update sendbuffer_size and return the buffer are
// not part of this listing.
35 static void* get_sendbuffer(int size){
36 if (sendbuffer_size<size){
37 sendbuffer=xbt_realloc(sendbuffer,size);
42 //allocate a single buffer for all recv
// Receive-side counterpart of get_sendbuffer(): when recvbuffer_size is smaller
// than `size`, the shared recvbuffer is reallocated to `size` with xbt_realloc().
// NOTE(review): the lines that update recvbuffer_size and return the buffer are
// not part of this listing.
43 static void* get_recvbuffer(int size){
44 if (recvbuffer_size<size){
45 recvbuffer=xbt_realloc(recvbuffer,size);
// Converts `string` to a double with strtod().
// A rejected input raises unknown_error with the text "<string> is not a double".
// NOTE(review): the condition guarding the THROWF and the return statement are
// not in this listing; presumably the check is on *endptr, as in
// smpi_replay_init() - confirm.
52 static double parse_double(const char *string)
56 value = strtod(string, &endptr);
58 THROWF(unknown_error, 0, "%s is not a double", string);
// Translates the datatype field of a trace line into an MPI_Datatype, stores it
// in the global MPI_CURRENT_TYPE and returns that value.
// NOTE(review): the selector (switch/case or comparisons) is not in this
// listing, so the mapping from ids to types cannot be read here. The visible
// outcomes are DOUBLE, INT, CHAR, SHORT, LONG, FLOAT, BYTE and MPI_DEFAULT_TYPE
// (presumably the fallback - confirm).
62 static MPI_Datatype decode_datatype(const char *const action)
64 // Declared datatypes,
69 MPI_CURRENT_TYPE=MPI_DOUBLE;
72 MPI_CURRENT_TYPE=MPI_INT;
75 MPI_CURRENT_TYPE=MPI_CHAR;
78 MPI_CURRENT_TYPE=MPI_SHORT;
81 MPI_CURRENT_TYPE=MPI_LONG;
84 MPI_CURRENT_TYPE=MPI_FLOAT;
87 MPI_CURRENT_TYPE=MPI_BYTE;
90 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
93 return MPI_CURRENT_TYPE;
// Inverse direction of decode_datatype(): produces the string that the handlers
// store in extra->datatype1 / extra->datatype2 for tracing.
// `datatype` is compared in turn with BYTE, DOUBLE, INT, CHAR, SHORT, LONG and
// FLOAT.
// NOTE(review): the returned strings and the value returned for an unmatched
// type are not in this listing.
97 const char* encode_datatype(MPI_Datatype datatype)
100 //default type for output is set to MPI_BYTE
101 // MPI_DEFAULT_TYPE is not set for output, use directly MPI_BYTE
102 if (datatype==MPI_BYTE){
105 if(datatype==MPI_DOUBLE)
107 if(datatype==MPI_INT)
109 if(datatype==MPI_CHAR)
111 if(datatype==MPI_SHORT)
113 if(datatype==MPI_LONG)
115 if(datatype==MPI_FLOAT)
118 // default - not implemented.
119 // do not warn here as we pass in this function even for other trace formats
// Handler for the "init" trace action.
// Picks the default datatype from the presence of action[2], starts the
// simulated timer, records the number of processes, and creates one
// MPI_Request dynar per process in reqq.
// NOTE(review): any guard around the reqq allocation is not in this listing;
// confirm that the array is only built once.
123 static void action_init(const char *const *action)
126 XBT_DEBUG("Initialize the counters");
128 if(action[2]) MPI_DEFAULT_TYPE= MPI_DOUBLE; // default MPE datatype
129 else MPI_DEFAULT_TYPE= MPI_BYTE; // default TAU datatype
131 /* start a simulated timer */
132 smpi_process_simulated_start();
133 /*initialize the number of active processes */
134 active_processes = smpi_process_count();
// reqq gets one zero-initialised slot per process, each filled with a dynar of
// MPI_Request whose element destructor is xbt_free_ref.
137 reqq=xbt_new0(xbt_dynar_t,active_processes);
139 for(i=0;i<active_processes;i++){
140 reqq[i]=xbt_dynar_new(sizeof(MPI_Request),&xbt_free_ref);
// Handler registered for the "finalize" trace action in smpi_replay_init().
// NOTE(review): the body is not part of this listing.
145 static void action_finalize(const char *const *action)
// Handler for the "comm_size" trace action: stores action[2] in the global
// communicator_size and logs the action with its simulated duration.
// parse_double() yields a double, which is converted to int by the assignment.
149 static void action_comm_size(const char *const *action)
151 double clock = smpi_process_simulated_elapsed();
153 communicator_size = parse_double(action[2]);
154 log_timed_action (action, clock);
// Handler for the "comm_split" trace action. In the visible lines it only
// samples the simulated clock and logs the action; no communicator is split.
157 static void action_comm_split(const char *const *action)
159 double clock = smpi_process_simulated_elapsed();
161 log_timed_action (action, clock);
// Handler for the "comm_dup" trace action. In the visible lines it only
// samples the simulated clock and logs the action; no communicator is duplicated.
164 static void action_comm_dup(const char *const *action)
166 double clock = smpi_process_simulated_elapsed();
168 log_timed_action (action, clock);
// Handler for the "compute" trace action: action[2] is the amount of flops to
// simulate with smpi_execute_flops().
// The computation is bracketed by TRACE_smpi_computing_in/out; the `extra`
// record handed to the tracer carries the flop count in comp_size.
// NOTE(review): preprocessor guards around the tracing lines, if any, are not
// in this listing.
171 static void action_compute(const char *const *action)
173 double clock = smpi_process_simulated_elapsed();
174 double flops= parse_double(action[2]);
176 int rank = smpi_process_index();
177 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
178 extra->type=TRACING_COMPUTING;
179 extra->comp_size=flops;
180 TRACE_smpi_computing_in(rank, extra);
182 smpi_execute_flops(flops);
184 TRACE_smpi_computing_out(rank);
187 log_timed_action (action, clock);
// Handler for the "send" trace action (blocking send on MPI_COMM_WORLD, tag 0).
//   action[2]  destination, read with atoi()
//   action[3]  element count, read with parse_double()
//   action[4]  datatype id passed to decode_datatype()
// No payload is transferred: the buffer argument of smpi_mpi_send() is NULL.
// NOTE(review): the guard choosing between decode_datatype(action[4]) and
// MPI_DEFAULT_TYPE is not in this listing; action_Isend() tests action[4].
190 static void action_send(const char *const *action)
192 int to = atoi(action[2]);
193 double size=parse_double(action[3]);
194 double clock = smpi_process_simulated_elapsed();
197 MPI_CURRENT_TYPE=decode_datatype(action[4]);
199 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
203 int rank = smpi_process_index();
205 int dst_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), to);
206 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
207 extra->type = TRACING_SEND;
208 extra->send_size = size;
210 extra->dst = dst_traced;
211 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
212 TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
// The traced message size is the element count times the datatype size.
213 TRACE_smpi_send(rank, rank, dst_traced, size*smpi_datatype_size(MPI_CURRENT_TYPE));
216 smpi_mpi_send(NULL, size, MPI_CURRENT_TYPE, to , 0, MPI_COMM_WORLD);
218 log_timed_action (action, clock);
221 TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
// Handler for the "Isend" trace action (non-blocking send on MPI_COMM_WORLD,
// tag 0).
//   action[2]  destination, read with atoi()
//   action[3]  element count, read with parse_double()
//   action[4]  optional datatype id; MPI_DEFAULT_TYPE is used when absent
// The request returned by smpi_mpi_isend() is pushed on this process's reqq
// dynar so that a later test/wait/waitAll action can pick it up.
226 static void action_Isend(const char *const *action)
228 int to = atoi(action[2]);
229 double size=parse_double(action[3]);
230 double clock = smpi_process_simulated_elapsed();
233 if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
234 else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
237 int rank = smpi_process_index();
238 int dst_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), to);
239 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
240 extra->type = TRACING_ISEND;
241 extra->send_size = size;
243 extra->dst = dst_traced;
244 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
245 TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
246 TRACE_smpi_send(rank, rank, dst_traced, size*smpi_datatype_size(MPI_CURRENT_TYPE));
// As in action_send(), the buffer argument is NULL.
249 request = smpi_mpi_isend(NULL, size, MPI_CURRENT_TYPE, to, 0,MPI_COMM_WORLD);
252 TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
256 xbt_dynar_push(reqq[smpi_process_index()],&request);
258 log_timed_action (action, clock);
// Handler for the "recv" trace action (blocking receive on MPI_COMM_WORLD,
// tag 0).
//   action[2]  source, read with atoi()
//   action[3]  element count, read with parse_double()
//   action[4]  optional datatype id; MPI_DEFAULT_TYPE is used when absent
// The receive buffer argument is NULL.
// NOTE(review): the condition that triggers the probe, and what is done with
// `status` afterwards, are not in this listing.
261 static void action_recv(const char *const *action) {
262 int from = atoi(action[2]);
263 double size=parse_double(action[3]);
264 double clock = smpi_process_simulated_elapsed();
267 if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
268 else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
271 int rank = smpi_process_index();
272 int src_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), from);
274 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
275 extra->type = TRACING_RECV;
276 extra->send_size = size;
277 extra->src = src_traced;
279 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
280 TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
283 //unknown size from the receiver's point of view
285 smpi_mpi_probe(from, 0, MPI_COMM_WORLD, &status);
289 smpi_mpi_recv(NULL, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD, &status);
292 TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
293 TRACE_smpi_recv(rank, src_traced, rank);
296 log_timed_action (action, clock);
// Handler for the "Irecv" trace action (non-blocking receive on MPI_COMM_WORLD,
// tag 0).
//   action[2]  source, read with atoi()
//   action[3]  element count, read with parse_double()
//   action[4]  optional datatype id; MPI_DEFAULT_TYPE is used when absent
// The request returned by smpi_mpi_irecv() is pushed on this process's reqq
// dynar so that a later test/wait/waitAll action can pick it up.
// NOTE(review): the condition that triggers the probe, and what is done with
// `status` afterwards, are not in this listing.
299 static void action_Irecv(const char *const *action)
301 int from = atoi(action[2]);
302 double size=parse_double(action[3]);
303 double clock = smpi_process_simulated_elapsed();
306 if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
307 else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
310 int rank = smpi_process_index();
311 int src_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), from);
312 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
313 extra->type = TRACING_IRECV;
314 extra->send_size = size;
315 extra->src = src_traced;
317 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
318 TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
321 //unknown size from the receiver's point of view
323 smpi_mpi_probe(from, 0, MPI_COMM_WORLD, &status);
327 request = smpi_mpi_irecv(NULL, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD);
330 TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
333 xbt_dynar_push(reqq[smpi_process_index()],&request);
335 log_timed_action (action, clock);
// Handler for the "test" trace action.
// Pops the most recently pushed request of this process from reqq, asserts
// that it is not NULL, runs smpi_mpi_test() on it and pushes the (possibly
// updated) request back so that a following wait action finds it.
// NOTE(review): the closing delimiter of the block comment below is not in
// this listing.
338 static void action_test(const char *const *action){
339 double clock = smpi_process_simulated_elapsed();
344 request = xbt_dynar_pop_as(reqq[smpi_process_index()],MPI_Request);
345 xbt_assert(request != NULL, "found null request in reqq");
348 int rank = smpi_process_index();
349 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
350 extra->type=TRACING_TEST;
351 TRACE_smpi_testing_in(rank, extra);
353 flag = smpi_mpi_test(&request, &status);
354 XBT_DEBUG("MPI_Test result: %d", flag);
355 /* push back request in dynar to be caught by a subsequent wait. if the test
356 * did succeed, the request is now NULL.
358 xbt_dynar_push_as(reqq[smpi_process_index()],MPI_Request, request);
361 TRACE_smpi_testing_out(rank);
364 log_timed_action (action, clock);
// Handler for the "wait" trace action.
// Asserts that this process has at least one pending request in reqq, pops the
// most recent one and waits for it with smpi_mpi_wait(). Source, destination
// and direction are read from the request before the wait so that they can be
// traced afterwards; a receive is additionally reported via TRACE_smpi_recv().
// NOTE(review): the early return for a NULL request (described by the block
// comment below) and the alternative of the `rank` conditional are not in this
// listing.
367 static void action_wait(const char *const *action){
368 double clock = smpi_process_simulated_elapsed();
// The joined action string is only used as the assertion message.
372 xbt_assert(xbt_dynar_length(reqq[smpi_process_index()]),
373 "action wait not preceded by any irecv or isend: %s",
374 xbt_str_join_array(action," "));
375 request = xbt_dynar_pop_as(reqq[smpi_process_index()],MPI_Request);
378 /* Assuming that the trace is well formed, this mean the comm might have
379 * been caught by a MPI_test. Then just return.
385 int rank = request->comm != MPI_COMM_NULL
386 ? smpi_comm_rank(request->comm)
389 MPI_Group group = smpi_comm_group(request->comm);
390 int src_traced = smpi_group_rank(group, request->src);
391 int dst_traced = smpi_group_rank(group, request->dst);
392 int is_wait_for_receive = request->recv;
393 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
394 extra->type = TRACING_WAIT;
395 TRACE_smpi_ptp_in(rank, src_traced, dst_traced, __FUNCTION__, extra);
397 smpi_mpi_wait(&request, &status);
399 TRACE_smpi_ptp_out(rank, src_traced, dst_traced, __FUNCTION__);
400 if (is_wait_for_receive) {
401 TRACE_smpi_recv(rank, src_traced, dst_traced);
405 log_timed_action (action, clock);
// Handler for the "waitAll" trace action.
// Copies every pending request of this process from reqq into a local
// variable-length array, waits for all of them with smpi_mpi_waitall(), then
// reports each completed receive to the tracer.
// src/dst/recv of every request are saved in three int dynars before the wait
// and read back after it.
// NOTE(review): the condition separating the two insertion groups inside the
// first loop is not in this listing (presumably a NULL check on requests[i] -
// confirm).
// NOTE(review): xbt_dynar_insert_at() is given pointers to ints obtained from
// xbt_new(); no release of those ints is visible here, and `t` is inserted
// without being assigned a value. Confirm whether stack ints would do.
// NOTE(review): the dynar container of this process is released at the end of
// the visible lines; confirm that later Isend/Irecv actions still find a
// valid dynar.
408 static void action_waitall(const char *const *action){
409 double clock = smpi_process_simulated_elapsed();
410 int count_requests=0;
413 count_requests=xbt_dynar_length(reqq[smpi_process_index()]);
415 if (count_requests>0) {
416 MPI_Request requests[count_requests];
417 MPI_Status status[count_requests];
419 /* The reqq is an array of dynars. Its index corresponds to the rank.
420 Thus each rank saves its own requests to the array request. */
421 xbt_dynar_foreach(reqq[smpi_process_index()],i,requests[i]);
424 //save information from requests
426 xbt_dynar_t srcs = xbt_dynar_new(sizeof(int), NULL);
427 xbt_dynar_t dsts = xbt_dynar_new(sizeof(int), NULL);
428 xbt_dynar_t recvs = xbt_dynar_new(sizeof(int), NULL);
429 for (i = 0; i < count_requests; i++) {
431 int *asrc = xbt_new(int, 1);
432 int *adst = xbt_new(int, 1);
433 int *arecv = xbt_new(int, 1);
434 *asrc = requests[i]->src;
435 *adst = requests[i]->dst;
436 *arecv = requests[i]->recv;
437 xbt_dynar_insert_at(srcs, i, asrc);
438 xbt_dynar_insert_at(dsts, i, adst);
439 xbt_dynar_insert_at(recvs, i, arecv);
444 int *t = xbt_new(int, 1);
445 xbt_dynar_insert_at(srcs, i, t);
446 xbt_dynar_insert_at(dsts, i, t);
447 xbt_dynar_insert_at(recvs, i, t);
451 int rank_traced = smpi_process_index();
452 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
453 extra->type = TRACING_WAITALL;
// send_size is reused here to carry the number of requests waited for.
454 extra->send_size=count_requests;
455 TRACE_smpi_ptp_in(rank_traced, -1, -1, __FUNCTION__,extra);
458 smpi_mpi_waitall(count_requests, requests, status);
461 for (i = 0; i < count_requests; i++) {
462 int src_traced, dst_traced, is_wait_for_receive;
463 xbt_dynar_get_cpy(srcs, i, &src_traced);
464 xbt_dynar_get_cpy(dsts, i, &dst_traced);
465 xbt_dynar_get_cpy(recvs, i, &is_wait_for_receive);
466 if (is_wait_for_receive) {
467 TRACE_smpi_recv(rank_traced, src_traced, dst_traced);
470 TRACE_smpi_ptp_out(rank_traced, -1, -1, __FUNCTION__);
472 xbt_dynar_free(&srcs);
473 xbt_dynar_free(&dsts);
474 xbt_dynar_free(&recvs);
477 xbt_dynar_free_container(&(reqq[smpi_process_index()]));
479 log_timed_action (action, clock);
// Handler for the "barrier" trace action: runs the selected barrier
// implementation (mpi_coll_barrier_fun) on MPI_COMM_WORLD, bracketed by
// collective tracing calls with no root (-1), then logs the action.
482 static void action_barrier(const char *const *action){
483 double clock = smpi_process_simulated_elapsed();
485 int rank = smpi_process_index();
486 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
487 extra->type = TRACING_BARRIER;
488 TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
490 mpi_coll_barrier_fun(MPI_COMM_WORLD);
492 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
495 log_timed_action (action, clock);
// Handler for the "bcast" trace action (broadcast on MPI_COMM_WORLD).
//   action[2]  element count, read with parse_double()
//   action[3]  root, read with atoi()
//   action[4]  datatype id passed to decode_datatype()
// The datatype starts as MPI_DEFAULT_TYPE and is overridden from the trace.
// The buffer argument of the collective is NULL.
// NOTE(review): the guards around the reads of action[3] and action[4], and
// the initial value of `root`, are not in this listing.
// NOTE(review): root_traced is computed with smpi_group_index() here, whereas
// action_reduce() uses smpi_group_rank() - confirm which one is intended.
499 static void action_bcast(const char *const *action)
501 double size = parse_double(action[2]);
502 double clock = smpi_process_simulated_elapsed();
505 * Initialize MPI_CURRENT_TYPE in order to decrease
506 * the number of the checks
508 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
511 root= atoi(action[3]);
513 MPI_CURRENT_TYPE=decode_datatype(action[4]);
518 int rank = smpi_process_index();
519 int root_traced = smpi_group_index(smpi_comm_group(MPI_COMM_WORLD), root);
521 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
522 extra->type = TRACING_BCAST;
523 extra->send_size = size;
524 extra->root = root_traced;
525 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
526 TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__, extra);
530 mpi_coll_bcast_fun(NULL, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
532 TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
535 log_timed_action (action, clock);
// Handler for the "reduce" trace action (reduction on MPI_COMM_WORLD).
//   action[2]  communication size (element count)
//   action[3]  computation size, in flops
//   action[4]  root, read with atoi()
//   action[5]  datatype id passed to decode_datatype()
// The collective is issued with NULL buffers and MPI_OP_NULL; the cost of the
// reduction operator is simulated separately with smpi_execute_flops().
// NOTE(review): the guards around the reads of action[4] and action[5], and
// the initial value of `root`, are not in this listing.
538 static void action_reduce(const char *const *action)
540 double comm_size = parse_double(action[2]);
541 double comp_size = parse_double(action[3]);
542 double clock = smpi_process_simulated_elapsed();
544 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
547 root= atoi(action[4]);
549 MPI_CURRENT_TYPE=decode_datatype(action[5]);
554 int rank = smpi_process_index();
555 int root_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), root);
556 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
557 extra->type = TRACING_REDUCE;
558 extra->send_size = comm_size;
559 extra->comp_size = comp_size;
560 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
561 extra->root = root_traced;
563 TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__,extra);
565 mpi_coll_reduce_fun(NULL, NULL, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
566 smpi_execute_flops(comp_size);
568 TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
571 log_timed_action (action, clock);
// Handler for the "allReduce" trace action (all-reduce on MPI_COMM_WORLD).
//   action[2]  communication size (element count)
//   action[3]  computation size, in flops
//   action[4]  optional datatype id; MPI_DEFAULT_TYPE is used when absent
// The collective is issued with NULL buffers and MPI_OP_NULL; the cost of the
// reduction operator is simulated separately with smpi_execute_flops().
// The clock is sampled after the arguments have been parsed.
574 static void action_allReduce(const char *const *action) {
575 double comm_size = parse_double(action[2]);
576 double comp_size = parse_double(action[3]);
578 if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
579 else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
581 double clock = smpi_process_simulated_elapsed();
583 int rank = smpi_process_index();
584 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
585 extra->type = TRACING_ALLREDUCE;
586 extra->send_size = comm_size;
587 extra->comp_size = comp_size;
588 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
590 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
592 mpi_coll_allreduce_fun(NULL, NULL, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
593 smpi_execute_flops(comp_size);
595 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
598 log_timed_action (action, clock);
// Handler for the "allToAll" trace action (all-to-all on MPI_COMM_WORLD).
//   action[2]  per-peer send count
//   action[3]  per-peer receive count
//   action[4]  send datatype id, action[5] receive datatype id
// Both counts come from parse_double() and are converted to int.
// The shared scratch buffers are sized for count * comm_size elements of the
// respective datatype.
// NOTE(review): the guard choosing between the decoded datatypes and
// MPI_DEFAULT_TYPE is not in this listing.
601 static void action_allToAll(const char *const *action) {
602 double clock = smpi_process_simulated_elapsed();
603 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
604 int send_size = parse_double(action[2]);
605 int recv_size = parse_double(action[3]);
606 MPI_Datatype MPI_CURRENT_TYPE2;
609 MPI_CURRENT_TYPE=decode_datatype(action[4]);
610 MPI_CURRENT_TYPE2=decode_datatype(action[5]);
613 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
614 MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
616 void *send = get_sendbuffer(send_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
617 void *recv = get_recvbuffer(recv_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE2));
620 int rank = smpi_process_index();
621 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
622 extra->type = TRACING_ALLTOALL;
623 extra->send_size = send_size;
624 extra->recv_size = recv_size;
625 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
626 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
628 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
631 mpi_coll_alltoall_fun(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
634 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
637 log_timed_action (action, clock);
// Handler for the "gather" trace action (gather on MPI_COMM_WORLD).
//   action[2]  send count
//   action[3]  receive count (per process)
//   action[4]  root, read with atoi()
//   action[5]  send datatype id, action[6] receive datatype id
// The receive scratch buffer is sized for recv_size * comm_size elements.
// NOTE(review): the delimiters of the descriptive comment below, the guard
// around the datatype decoding, and the declaration of `recv` together with
// the condition under which it is allocated are not in this listing.
// NOTE(review): TRACE_smpi_collective_out() is called with -1 where the
// matching TRACE_smpi_collective_in() call passed `root` - confirm.
642 static void action_gather(const char *const *action) {
644 The structure of the gather action for the rank 0 (total 4 processes)
649 1) 68 is the sendcounts
650 2) 68 is the recvcounts
651 3) 0 is the root node
652 4) 0 is the send datatype id, see decode_datatype()
653 5) 0 is the recv datatype id, see decode_datatype()
655 double clock = smpi_process_simulated_elapsed();
656 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
657 int send_size = parse_double(action[2]);
658 int recv_size = parse_double(action[3]);
659 MPI_Datatype MPI_CURRENT_TYPE2;
661 MPI_CURRENT_TYPE=decode_datatype(action[5]);
662 MPI_CURRENT_TYPE2=decode_datatype(action[6]);
664 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
665 MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
667 void *send = get_sendbuffer(send_size* smpi_datatype_size(MPI_CURRENT_TYPE));
670 int root=atoi(action[4]);
671 int rank = smpi_process_index();
674 recv = get_recvbuffer(recv_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE2));
677 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
678 extra->type = TRACING_GATHER;
679 extra->send_size = send_size;
680 extra->recv_size = recv_size;
682 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
683 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
685 TRACE_smpi_collective_in(rank, root, __FUNCTION__, extra);
687 mpi_coll_gather_fun(send, send_size, MPI_CURRENT_TYPE,
688 recv, recv_size, MPI_CURRENT_TYPE2,
689 root, MPI_COMM_WORLD);
692 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
695 log_timed_action (action, clock);
// Handler for the "gatherV" trace action (gatherv on MPI_COMM_WORLD).
// With comm_size = smpi_comm_size(MPI_COMM_WORLD):
//   action[2]                  send count
//   action[3 .. 2+comm_size]   recvcounts, one per process, read with atoi()
//   action[3+comm_size]        root, read with atoi()
//   action[4+comm_size]        optional send datatype id
//   action[5+comm_size]        receive datatype id, read when the send one is set
// The receive scratch buffer is sized for the sum of all recvcounts.
// `disps` stays zero-filled in the visible lines.
// NOTE(review): the example line below spells the action "gather" although
// the handler is registered as "gatherV" in smpi_replay_init().
// NOTE(review): the declarations of `recv_sum`, `i` and `recv`, the condition
// under which `recv` is allocated, and the release of `disps` are not in this
// listing.
// NOTE(review): TRACE_smpi_collective_out() is called with -1 where the
// matching TRACE_smpi_collective_in() call passed `root` - confirm.
701 static void action_gatherv(const char *const *action) {
703 The structure of the gatherv action for the rank 0 (total 4 processes)
705 0 gather 68 68 10 10 10 0 0 0
708 1) 68 is the sendcount
709 2) 68 10 10 10 is the recvcounts
710 3) 0 is the root node
711 4) 0 is the send datatype id, see decode_datatype()
712 5) 0 is the recv datatype id, see decode_datatype()
714 double clock = smpi_process_simulated_elapsed();
715 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
716 int send_size = parse_double(action[2]);
717 int *disps = xbt_new0(int, comm_size);
718 int *recvcounts = xbt_new0(int, comm_size);
721 MPI_Datatype MPI_CURRENT_TYPE2;
722 if(action[4+comm_size]) {
723 MPI_CURRENT_TYPE=decode_datatype(action[4+comm_size]);
724 MPI_CURRENT_TYPE2=decode_datatype(action[5+comm_size]);
726 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
727 MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
729 void *send = get_sendbuffer(send_size* smpi_datatype_size(MPI_CURRENT_TYPE));
731 for(i=0;i<comm_size;i++) {
732 recvcounts[i] = atoi(action[i+3]);
733 recv_sum=recv_sum+recvcounts[i];
737 int root=atoi(action[3+comm_size]);
738 int rank = smpi_process_index();
741 recv = get_recvbuffer(recv_sum* smpi_datatype_size(MPI_CURRENT_TYPE2));
744 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
745 extra->type = TRACING_GATHERV;
746 extra->send_size = send_size;
// The tracer gets its own copy of recvcounts; the local array is freed below.
747 extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
748 for(i=0; i< comm_size; i++)//copy data to avoid bad free
749 extra->recvcounts[i] = recvcounts[i];
751 extra->num_processes = comm_size;
752 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
753 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
755 TRACE_smpi_collective_in(rank, root, __FUNCTION__, extra);
757 smpi_mpi_gatherv(send, send_size, MPI_CURRENT_TYPE,
758 recv, recvcounts, disps, MPI_CURRENT_TYPE2,
759 root, MPI_COMM_WORLD);
762 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
765 log_timed_action (action, clock);
766 xbt_free(recvcounts);
// Handler for the "reduceScatter" trace action (reduce-scatter on
// MPI_COMM_WORLD).
// With comm_size = smpi_comm_size(MPI_COMM_WORLD):
//   action[2 .. 1+comm_size]   recvcounts, one per process, read with atoi()
//   action[2+comm_size]        computation size, converted to int
//   action[3+comm_size]        optional datatype id
// The collective is issued with NULL buffers and MPI_OP_NULL; the reduction
// cost is simulated separately with smpi_execute_flops().
// NOTE(review): the delimiters of the descriptive comment below, the
// declaration of `i`, the continuation line of the collective call and the
// release of `disps` are not in this listing. `disps` is allocated but not
// used in any visible line.
771 static void action_reducescatter(const char *const *action) {
774 The structure of the reducescatter action for the rank 0 (total 4 processes)
776 0 reduceScatter 275427 275427 275427 204020 11346849 0
779 1) The first four values after the name of the action declare the recvcounts array
780 2) The value 11346849 is the amount of instructions
781 3) The last value corresponds to the datatype, see decode_datatype().
783 We analyze a MPI_Reduce_scatter call to one MPI_Reduce and one MPI_Scatterv.
787 double clock = smpi_process_simulated_elapsed();
788 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
789 int comp_size = parse_double(action[2+comm_size]);
790 int *recvcounts = xbt_new0(int, comm_size);
791 int *disps = xbt_new0(int, comm_size);
793 int rank = smpi_process_index();
795 if(action[3+comm_size])
796 MPI_CURRENT_TYPE=decode_datatype(action[3+comm_size]);
798 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
800 for(i=0;i<comm_size;i++) {
801 recvcounts[i] = atoi(action[i+2]);
806 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
807 extra->type = TRACING_REDUCE_SCATTER;
808 extra->send_size = 0;
// The tracer gets its own copy of recvcounts; the local array is freed below.
809 extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
810 for(i=0; i< comm_size; i++)//copy data to avoid bad free
811 extra->recvcounts[i] = recvcounts[i];
812 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
813 extra->comp_size = comp_size;
814 extra->num_processes = comm_size;
817 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
819 mpi_coll_reduce_scatter_fun(NULL, NULL, recvcounts, MPI_CURRENT_TYPE, MPI_OP_NULL,
821 smpi_execute_flops(comp_size);
825 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
827 xbt_free(recvcounts);
829 log_timed_action (action, clock);
// Handler for the "allGatherV" trace action (allgatherv on MPI_COMM_WORLD).
// With comm_size = smpi_comm_size(MPI_COMM_WORLD):
//   action[2]                  send count, read with atoi()
//   action[3 .. 2+comm_size]   recvcounts, one per process, read with atoi()
//   action[3+comm_size]        optional send datatype id
//   action[4+comm_size]        receive datatype id, read when the send one is set
// The receive scratch buffer is sized for the sum of all recvcounts.
// `disps` stays zero-filled in the visible lines.
// NOTE(review): the delimiters of the descriptive comment below, the
// declarations of `recv_sum` and `i`, and the release of `disps` are not in
// this listing.
833 static void action_allgatherv(const char *const *action) {
836 The structure of the allgatherv action for the rank 0 (total 4 processes)
838 0 allGatherV 275427 275427 275427 275427 204020
841 1) 275427 is the sendcount
842 2) The next four elements declare the recvcounts array
843 3) No more values mean that the datatype for sent and receive buffer
844 is the default one, see decode_datatype().
848 double clock = smpi_process_simulated_elapsed();
850 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
852 int sendcount=atoi(action[2]);
853 int *recvcounts = xbt_new0(int, comm_size);
854 int *disps = xbt_new0(int, comm_size);
856 MPI_Datatype MPI_CURRENT_TYPE2;
858 if(action[3+comm_size]) {
859 MPI_CURRENT_TYPE = decode_datatype(action[3+comm_size]);
860 MPI_CURRENT_TYPE2 = decode_datatype(action[4+comm_size]);
862 MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
863 MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
865 void *sendbuf = get_sendbuffer(sendcount* smpi_datatype_size(MPI_CURRENT_TYPE));
867 for(i=0;i<comm_size;i++) {
868 recvcounts[i] = atoi(action[i+3]);
869 recv_sum=recv_sum+recvcounts[i];
871 void *recvbuf = get_recvbuffer(recv_sum* smpi_datatype_size(MPI_CURRENT_TYPE2));
874 int rank = smpi_process_index();
875 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
876 extra->type = TRACING_ALLGATHERV;
877 extra->send_size = sendcount;
// The tracer gets its own copy of recvcounts; the local array is freed below.
878 extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
879 for(i=0; i< comm_size; i++)//copy data to avoid bad free
880 extra->recvcounts[i] = recvcounts[i];
881 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
882 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
883 extra->num_processes = comm_size;
885 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
888 mpi_coll_allgatherv_fun(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts, disps, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
891 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
894 log_timed_action (action, clock);
895 xbt_free(recvcounts);
// Handler for the "allToAllV" trace action (alltoallv on MPI_COMM_WORLD).
// With comm_size = smpi_comm_size(MPI_COMM_WORLD):
//   action[2]                              send buffer size, in elements
//   action[3 .. 2+comm_size]               sendcounts, read with atoi()
//   action[3+comm_size]                    receive buffer size, in elements
//   action[4+comm_size .. 3+2*comm_size]   recvcounts, read with atoi()
//   action[4+2*comm_size]                  optional send datatype id
//   action[5+2*comm_size]                  receive datatype id, read when the
//                                          send one is set
// Both displacement arrays stay zero-filled in the visible lines.
// The receive side of the collective uses MPI_CURRENT_TYPE2, the same datatype
// that sizes the receive buffer and is reported to the tracer as datatype2.
// It previously passed MPI_CURRENT_TYPE, which mismatched whenever the trace
// named different send and receive datatypes.
// NOTE(review): the delimiters of the descriptive comment below, the last
// argument line of the collective call and the release of the displacement
// arrays are not in this listing.
900 static void action_allToAllv(const char *const *action) {
902 The structure of the allToAllV action for the rank 0 (total 4 processes)
904 0 allToAllV 100 1 7 10 12 100 1 70 10 5
907 1) 100 is the size of the send buffer *sizeof(int),
908 2) 1 7 10 12 is the sendcounts array
909 3) 100*sizeof(int) is the size of the receiver buffer
910 4) 1 70 10 5 is the recvcounts array
915 double clock = smpi_process_simulated_elapsed();
917 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
918 int send_buf_size=0,recv_buf_size=0,i=0;
919 int *sendcounts = xbt_new0(int, comm_size);
920 int *recvcounts = xbt_new0(int, comm_size);
921 int *senddisps = xbt_new0(int, comm_size);
922 int *recvdisps = xbt_new0(int, comm_size);
924 MPI_Datatype MPI_CURRENT_TYPE2;
926 send_buf_size=parse_double(action[2]);
927 recv_buf_size=parse_double(action[3+comm_size]);
928 if(action[4+2*comm_size]) {
929 MPI_CURRENT_TYPE=decode_datatype(action[4+2*comm_size]);
930 MPI_CURRENT_TYPE2=decode_datatype(action[5+2*comm_size]);
933 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
934 MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
937 void *sendbuf = get_sendbuffer(send_buf_size* smpi_datatype_size(MPI_CURRENT_TYPE));
938 void *recvbuf = get_recvbuffer(recv_buf_size* smpi_datatype_size(MPI_CURRENT_TYPE2));
940 for(i=0;i<comm_size;i++) {
941 sendcounts[i] = atoi(action[i+3]);
942 recvcounts[i] = atoi(action[i+4+comm_size]);
947 int rank = smpi_process_index();
948 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
949 extra->type = TRACING_ALLTOALLV;
// The tracer gets its own copies of both count arrays; the local arrays are
// freed below. send_size and recv_size accumulate the totals (extra is
// zero-initialised by xbt_new0).
950 extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
951 extra->sendcounts= xbt_malloc(comm_size*sizeof(int));
952 extra->num_processes = comm_size;
954 for(i=0; i< comm_size; i++){//copy data to avoid bad free
955 extra->send_size += sendcounts[i];
956 extra->sendcounts[i] = sendcounts[i];
957 extra->recv_size += recvcounts[i];
958 extra->recvcounts[i] = recvcounts[i];
960 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
961 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
963 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
965 mpi_coll_alltoallv_fun(sendbuf, sendcounts, senddisps, MPI_CURRENT_TYPE,
966 recvbuf, recvcounts, recvdisps, MPI_CURRENT_TYPE2,
969 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
972 log_timed_action (action, clock);
973 xbt_free(sendcounts);
974 xbt_free(recvcounts);
// Entry point of the trace replay for one simulated process.
// Initialises the SMPI process, emits the tracing events for an init
// collective, registers the action handlers (done by the process whose index
// is 0), optionally simulates a start delay, and then runs the replay with
// xbt_replay_action_runner().
// The registered action names are the first keywords expected in the trace;
// each maps to one of the static handlers of this file.
// NOTE(review): the condition enabling the delayed start and the declaration
// of `endptr` are not in this listing. The delay is read from (*argv)[2] and
// spent as flops via smpi_execute_flops().
979 void smpi_replay_init(int *argc, char***argv){
980 smpi_process_init(argc, argv);
981 smpi_process_mark_as_initialized();
983 int rank = smpi_process_index();
984 TRACE_smpi_init(rank);
985 TRACE_smpi_computing_init(rank);
986 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
987 extra->type = TRACING_INIT;
988 TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
989 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
992 if (!smpi_process_index()){
993 _xbt_replay_action_init();
994 xbt_replay_action_register("init", action_init);
995 xbt_replay_action_register("finalize", action_finalize);
996 xbt_replay_action_register("comm_size", action_comm_size);
997 xbt_replay_action_register("comm_split", action_comm_split);
998 xbt_replay_action_register("comm_dup", action_comm_dup);
999 xbt_replay_action_register("send", action_send);
1000 xbt_replay_action_register("Isend", action_Isend);
1001 xbt_replay_action_register("recv", action_recv);
1002 xbt_replay_action_register("Irecv", action_Irecv);
1003 xbt_replay_action_register("test", action_test);
1004 xbt_replay_action_register("wait", action_wait);
1005 xbt_replay_action_register("waitAll", action_waitall);
1006 xbt_replay_action_register("barrier", action_barrier);
1007 xbt_replay_action_register("bcast", action_bcast);
1008 xbt_replay_action_register("reduce", action_reduce);
1009 xbt_replay_action_register("allReduce", action_allReduce);
1010 xbt_replay_action_register("allToAll", action_allToAll);
1011 xbt_replay_action_register("allToAllV", action_allToAllv);
1012 xbt_replay_action_register("gather", action_gather);
1013 xbt_replay_action_register("gatherV", action_gatherv);
1014 xbt_replay_action_register("allGatherV", action_allgatherv);
1015 xbt_replay_action_register("reduceScatter", action_reducescatter);
1016 xbt_replay_action_register("compute", action_compute);
1019 //if we have a delayed start, sleep here.
1022 double value = strtod((*argv)[2], &endptr);
1023 if (*endptr != '\0')
1024 THROWF(unknown_error, 0, "%s is not a double", (*argv)[2]);
1025 XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
1026 smpi_execute_flops(value);
1028 xbt_replay_action_runner(*argc, *argv);
// Tear-down counterpart of smpi_replay_init() for one simulated process.
// Waits for any requests still pending in this process's reqq dynar, releases
// the dynar container, lets the last process report the simulated time and
// free the shared replay resources, then synchronises on a barrier and
// finalises/destroys the SMPI process.
// NOTE(review): the decrement of active_processes announced by the first
// comment, the declaration of `i`, and the return statement are not in this
// listing.
1031 int smpi_replay_finalize(){
1032 double sim_time= 1.;
1033 /* One active process will stop. Decrease the counter*/
1034 XBT_DEBUG("There are %lu elements in reqq[*]",
1035 xbt_dynar_length(reqq[smpi_process_index()]));
1036 if (!xbt_dynar_is_empty(reqq[smpi_process_index()])){
1037 int count_requests=xbt_dynar_length(reqq[smpi_process_index()]);
1038 MPI_Request requests[count_requests];
1039 MPI_Status status[count_requests];
// Copy the pending requests into the local array and wait for all of them.
1042 xbt_dynar_foreach(reqq[smpi_process_index()],i,requests[i]);
1043 smpi_mpi_waitall(count_requests, requests, status);
1049 xbt_dynar_free_container(&(reqq[smpi_process_index()]));
1051 if(!active_processes){
1052 /* Last process alive speaking */
1053 /* end the simulated timer */
1054 sim_time = smpi_process_simulated_elapsed();
1055 XBT_INFO("Simulation time %f", sim_time);
1056 _xbt_replay_action_exit();
1057 xbt_free(sendbuffer);
1058 xbt_free(recvbuffer);
1062 mpi_coll_barrier_fun(MPI_COMM_WORLD);
1064 int rank = smpi_process_index();
1065 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
1066 extra->type = TRACING_FINALIZE;
1067 TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
1069 smpi_process_finalize();
1071 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
1072 TRACE_smpi_finalize(smpi_process_index());
1074 smpi_process_destroy();