1 /* Copyright (c) 2009-2014. The SimGrid Team.
2 * All rights reserved. */
4 /* This program is free software; you can redistribute it and/or modify it
5 * under the terms of the license (GNU LGPL) which comes with this package. */
10 #include <xbt/replay.h>
12 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
14 int communicator_size = 0;
15 static int active_processes = 0;
16 xbt_dynar_t *reqq = NULL;
18 MPI_Datatype MPI_DEFAULT_TYPE;
19 MPI_Datatype MPI_CURRENT_TYPE;
21 static void log_timed_action (const char *const *action, double clock){
22 if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
23 char *name = xbt_str_join_array(action, " ");
24 XBT_VERB("%s %f", name, smpi_process_simulated_elapsed()-clock);
30 static double parse_double(const char *string)
34 value = strtod(string, &endptr);
36 THROWF(unknown_error, 0, "%s is not a double", string);
40 static MPI_Datatype decode_datatype(const char *const action)
42 // Declared datatypes,
47 MPI_CURRENT_TYPE=MPI_DOUBLE;
50 MPI_CURRENT_TYPE=MPI_INT;
53 MPI_CURRENT_TYPE=MPI_CHAR;
56 MPI_CURRENT_TYPE=MPI_SHORT;
59 MPI_CURRENT_TYPE=MPI_LONG;
62 MPI_CURRENT_TYPE=MPI_FLOAT;
65 MPI_CURRENT_TYPE=MPI_BYTE;
68 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
71 return MPI_CURRENT_TYPE;
75 const char* encode_datatype(MPI_Datatype datatype)
78 //default type for output is set to MPI_BYTE
79 // MPI_DEFAULT_TYPE is not set for output, use directly MPI_BYTE
80 if (datatype==MPI_BYTE){
83 if(datatype==MPI_DOUBLE)
87 if(datatype==MPI_CHAR)
89 if(datatype==MPI_SHORT)
91 if(datatype==MPI_LONG)
93 if(datatype==MPI_FLOAT)
96 // default - not implemented.
97 // do not warn here as we pass in this function even for other trace formats
101 static void action_init(const char *const *action)
104 XBT_DEBUG("Initialize the counters");
106 if(action[2]) MPI_DEFAULT_TYPE= MPI_DOUBLE; // default MPE dataype
107 else MPI_DEFAULT_TYPE= MPI_BYTE; // default TAU datatype
109 /* start a simulated timer */
110 smpi_process_simulated_start();
111 /*initialize the number of active processes */
112 active_processes = smpi_process_count();
115 reqq=xbt_new0(xbt_dynar_t,active_processes);
117 for(i=0;i<active_processes;i++){
118 reqq[i]=xbt_dynar_new(sizeof(MPI_Request),&xbt_free_ref);
123 static void action_finalize(const char *const *action)
127 static void action_comm_size(const char *const *action)
129 double clock = smpi_process_simulated_elapsed();
131 communicator_size = parse_double(action[2]);
132 log_timed_action (action, clock);
135 static void action_comm_split(const char *const *action)
137 double clock = smpi_process_simulated_elapsed();
139 log_timed_action (action, clock);
142 static void action_comm_dup(const char *const *action)
144 double clock = smpi_process_simulated_elapsed();
146 log_timed_action (action, clock);
149 static void action_compute(const char *const *action)
151 double clock = smpi_process_simulated_elapsed();
152 double flops= parse_double(action[2]);
154 int rank = smpi_comm_rank(MPI_COMM_WORLD);
155 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
156 extra->type=TRACING_COMPUTING;
157 extra->comp_size=flops;
158 TRACE_smpi_computing_in(rank, extra);
160 smpi_execute_flops(flops);
162 TRACE_smpi_computing_out(rank);
165 log_timed_action (action, clock);
168 static void action_send(const char *const *action)
170 int to = atoi(action[2]);
171 double size=parse_double(action[3]);
172 double clock = smpi_process_simulated_elapsed();
175 MPI_CURRENT_TYPE=decode_datatype(action[4]);
177 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
181 int rank = smpi_comm_rank(MPI_COMM_WORLD);
183 int dst_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), to);
184 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
185 extra->type = TRACING_SEND;
186 extra->send_size = size;
188 extra->dst = dst_traced;
189 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
190 TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
191 TRACE_smpi_send(rank, rank, dst_traced, size*smpi_datatype_size(MPI_CURRENT_TYPE));
194 smpi_mpi_send(NULL, size, MPI_CURRENT_TYPE, to , 0, MPI_COMM_WORLD);
196 log_timed_action (action, clock);
199 TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
204 static void action_Isend(const char *const *action)
206 int to = atoi(action[2]);
207 double size=parse_double(action[3]);
208 double clock = smpi_process_simulated_elapsed();
211 if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
212 else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
215 int rank = smpi_comm_rank(MPI_COMM_WORLD);
216 int dst_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), to);
217 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
218 extra->type = TRACING_ISEND;
219 extra->send_size = size;
221 extra->dst = dst_traced;
222 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
223 TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
224 TRACE_smpi_send(rank, rank, dst_traced, size*smpi_datatype_size(MPI_CURRENT_TYPE));
227 request = smpi_mpi_isend(NULL, size, MPI_CURRENT_TYPE, to, 0,MPI_COMM_WORLD);
230 TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
234 xbt_dynar_push(reqq[smpi_comm_rank(MPI_COMM_WORLD)],&request);
236 log_timed_action (action, clock);
239 static void action_recv(const char *const *action) {
240 int from = atoi(action[2]);
241 double size=parse_double(action[3]);
242 double clock = smpi_process_simulated_elapsed();
245 if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
246 else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
249 int rank = smpi_comm_rank(MPI_COMM_WORLD);
250 int src_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), from);
252 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
253 extra->type = TRACING_RECV;
254 extra->send_size = size;
255 extra->src = src_traced;
257 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
258 TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
261 //unknow size from the receiver pov
263 smpi_mpi_probe(from, 0, MPI_COMM_WORLD, &status);
267 smpi_mpi_recv(NULL, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD, &status);
270 TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
271 TRACE_smpi_recv(rank, src_traced, rank);
274 log_timed_action (action, clock);
277 static void action_Irecv(const char *const *action)
279 int from = atoi(action[2]);
280 double size=parse_double(action[3]);
281 double clock = smpi_process_simulated_elapsed();
284 if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
285 else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
288 int rank = smpi_comm_rank(MPI_COMM_WORLD);
289 int src_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), from);
290 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
291 extra->type = TRACING_IRECV;
292 extra->send_size = size;
293 extra->src = src_traced;
295 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
296 TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
299 //unknow size from the receiver pov
301 smpi_mpi_probe(from, 0, MPI_COMM_WORLD, &status);
305 request = smpi_mpi_irecv(NULL, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD);
308 TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
311 xbt_dynar_push(reqq[smpi_comm_rank(MPI_COMM_WORLD)],&request);
313 log_timed_action (action, clock);
316 static void action_test(const char *const *action){
317 double clock = smpi_process_simulated_elapsed();
322 request = xbt_dynar_pop_as(reqq[smpi_comm_rank(MPI_COMM_WORLD)],MPI_Request);
323 xbt_assert(request != NULL, "found null request in reqq");
326 int rank = smpi_comm_rank(MPI_COMM_WORLD);
327 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
328 extra->type=TRACING_TEST;
329 TRACE_smpi_testing_in(rank, extra);
331 flag = smpi_mpi_test(&request, &status);
332 XBT_DEBUG("MPI_Test result: %d", flag);
333 /* push back request in dynar to be caught by a subsequent wait. if the test
334 * did succeed, the request is now NULL.
336 xbt_dynar_push_as(reqq[smpi_comm_rank(MPI_COMM_WORLD)],MPI_Request, request);
339 TRACE_smpi_testing_out(rank);
342 log_timed_action (action, clock);
345 static void action_wait(const char *const *action){
346 double clock = smpi_process_simulated_elapsed();
350 xbt_assert(xbt_dynar_length(reqq[smpi_comm_rank(MPI_COMM_WORLD)]),
351 "action wait not preceded by any irecv or isend: %s",
352 xbt_str_join_array(action," "));
353 request = xbt_dynar_pop_as(reqq[smpi_comm_rank(MPI_COMM_WORLD)],MPI_Request);
356 /* Assuming that the trace is well formed, this mean the comm might have
357 * been caught by a MPI_test. Then just return.
363 int rank = request->comm != MPI_COMM_NULL
364 ? smpi_comm_rank(request->comm)
367 MPI_Group group = smpi_comm_group(request->comm);
368 int src_traced = smpi_group_rank(group, request->src);
369 int dst_traced = smpi_group_rank(group, request->dst);
370 int is_wait_for_receive = request->recv;
371 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
372 extra->type = TRACING_WAIT;
373 TRACE_smpi_ptp_in(rank, src_traced, dst_traced, __FUNCTION__, extra);
375 smpi_mpi_wait(&request, &status);
377 TRACE_smpi_ptp_out(rank, src_traced, dst_traced, __FUNCTION__);
378 if (is_wait_for_receive) {
379 TRACE_smpi_recv(rank, src_traced, dst_traced);
383 log_timed_action (action, clock);
386 static void action_waitall(const char *const *action){
387 double clock = smpi_process_simulated_elapsed();
388 int count_requests=0;
391 count_requests=xbt_dynar_length(reqq[smpi_comm_rank(MPI_COMM_WORLD)]);
393 if (count_requests>0) {
394 MPI_Request requests[count_requests];
395 MPI_Status status[count_requests];
397 /* The reqq is an array of dynars. Its index corresponds to the rank.
398 Thus each rank saves its own requests to the array request. */
399 xbt_dynar_foreach(reqq[smpi_comm_rank(MPI_COMM_WORLD)],i,requests[i]);
402 //save information from requests
404 xbt_dynar_t srcs = xbt_dynar_new(sizeof(int), NULL);
405 xbt_dynar_t dsts = xbt_dynar_new(sizeof(int), NULL);
406 xbt_dynar_t recvs = xbt_dynar_new(sizeof(int), NULL);
407 for (i = 0; i < count_requests; i++) {
409 int *asrc = xbt_new(int, 1);
410 int *adst = xbt_new(int, 1);
411 int *arecv = xbt_new(int, 1);
412 *asrc = requests[i]->src;
413 *adst = requests[i]->dst;
414 *arecv = requests[i]->recv;
415 xbt_dynar_insert_at(srcs, i, asrc);
416 xbt_dynar_insert_at(dsts, i, adst);
417 xbt_dynar_insert_at(recvs, i, arecv);
422 int *t = xbt_new(int, 1);
423 xbt_dynar_insert_at(srcs, i, t);
424 xbt_dynar_insert_at(dsts, i, t);
425 xbt_dynar_insert_at(recvs, i, t);
429 int rank_traced = smpi_process_index();
430 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
431 extra->type = TRACING_WAITALL;
432 extra->send_size=count_requests;
433 TRACE_smpi_ptp_in(rank_traced, -1, -1, __FUNCTION__,extra);
436 smpi_mpi_waitall(count_requests, requests, status);
439 for (i = 0; i < count_requests; i++) {
440 int src_traced, dst_traced, is_wait_for_receive;
441 xbt_dynar_get_cpy(srcs, i, &src_traced);
442 xbt_dynar_get_cpy(dsts, i, &dst_traced);
443 xbt_dynar_get_cpy(recvs, i, &is_wait_for_receive);
444 if (is_wait_for_receive) {
445 TRACE_smpi_recv(rank_traced, src_traced, dst_traced);
448 TRACE_smpi_ptp_out(rank_traced, -1, -1, __FUNCTION__);
450 xbt_dynar_free(&srcs);
451 xbt_dynar_free(&dsts);
452 xbt_dynar_free(&recvs);
455 xbt_dynar_free_container(&(reqq[smpi_comm_rank(MPI_COMM_WORLD)]));
457 log_timed_action (action, clock);
460 static void action_barrier(const char *const *action){
461 double clock = smpi_process_simulated_elapsed();
463 int rank = smpi_comm_rank(MPI_COMM_WORLD);
464 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
465 extra->type = TRACING_BARRIER;
466 TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
468 smpi_mpi_barrier(MPI_COMM_WORLD);
470 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
473 log_timed_action (action, clock);
477 static void action_bcast(const char *const *action)
479 double size = parse_double(action[2]);
480 double clock = smpi_process_simulated_elapsed();
483 * Initialize MPI_CURRENT_TYPE in order to decrease
484 * the number of the checks
486 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
489 root= atoi(action[3]);
491 MPI_CURRENT_TYPE=decode_datatype(action[4]);
496 int rank = smpi_comm_rank(MPI_COMM_WORLD);
497 int root_traced = smpi_group_index(smpi_comm_group(MPI_COMM_WORLD), root);
499 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
500 extra->type = TRACING_BCAST;
501 extra->send_size = size;
502 extra->root = root_traced;
503 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
504 TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__, extra);
508 mpi_coll_bcast_fun(NULL, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
510 TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
513 log_timed_action (action, clock);
516 static void action_reduce(const char *const *action)
518 double comm_size = parse_double(action[2]);
519 double comp_size = parse_double(action[3]);
520 double clock = smpi_process_simulated_elapsed();
522 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
525 root= atoi(action[4]);
527 MPI_CURRENT_TYPE=decode_datatype(action[5]);
532 int rank = smpi_comm_rank(MPI_COMM_WORLD);
533 int root_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), root);
534 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
535 extra->type = TRACING_REDUCE;
536 extra->send_size = comm_size;
537 extra->comp_size = comp_size;
538 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
539 extra->root = root_traced;
541 TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__,extra);
543 mpi_coll_reduce_fun(NULL, NULL, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
544 smpi_execute_flops(comp_size);
546 TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
549 log_timed_action (action, clock);
552 static void action_allReduce(const char *const *action) {
553 double comm_size = parse_double(action[2]);
554 double comp_size = parse_double(action[3]);
556 if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
557 else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
559 double clock = smpi_process_simulated_elapsed();
561 int rank = smpi_comm_rank(MPI_COMM_WORLD);
562 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
563 extra->type = TRACING_ALLREDUCE;
564 extra->send_size = comm_size;
565 extra->comp_size = comp_size;
566 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
568 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
570 mpi_coll_reduce_fun(NULL, NULL, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, 0, MPI_COMM_WORLD);
571 smpi_execute_flops(comp_size);
572 mpi_coll_bcast_fun(NULL, comm_size, MPI_CURRENT_TYPE, 0, MPI_COMM_WORLD);
574 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
577 log_timed_action (action, clock);
580 static void action_allToAll(const char *const *action) {
581 double clock = smpi_process_simulated_elapsed();
582 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
583 int send_size = parse_double(action[2]);
584 int recv_size = parse_double(action[3]);
585 MPI_Datatype MPI_CURRENT_TYPE2;
588 MPI_CURRENT_TYPE=decode_datatype(action[4]);
589 MPI_CURRENT_TYPE2=decode_datatype(action[5]);
592 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
593 MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
595 void *send = calloc(send_size*comm_size, smpi_datatype_size(MPI_CURRENT_TYPE));
596 void *recv = calloc(recv_size*comm_size, smpi_datatype_size(MPI_CURRENT_TYPE2));
599 int rank = smpi_process_index();
600 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
601 extra->type = TRACING_ALLTOALL;
602 extra->send_size = send_size;
603 extra->recv_size = recv_size;
604 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
605 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
607 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
610 mpi_coll_alltoall_fun(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
613 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
616 log_timed_action (action, clock);
622 static void action_gather(const char *const *action) {
624 The structure of the gather action for the rank 0 (total 4 processes)
629 1) 68 is the sendcounts
630 2) 68 is the recvcounts
631 3) 0 is the root node
632 4) 0 is the send datatype id, see decode_datatype()
633 5) 0 is the recv datatype id, see decode_datatype()
635 double clock = smpi_process_simulated_elapsed();
636 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
637 int send_size = parse_double(action[2]);
638 int recv_size = parse_double(action[3]);
639 MPI_Datatype MPI_CURRENT_TYPE2;
641 MPI_CURRENT_TYPE=decode_datatype(action[5]);
642 MPI_CURRENT_TYPE2=decode_datatype(action[6]);
644 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
645 MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
647 void *send = calloc(send_size, smpi_datatype_size(MPI_CURRENT_TYPE));
650 int root=atoi(action[4]);
651 int rank = smpi_process_index();
654 recv = calloc(recv_size*comm_size, smpi_datatype_size(MPI_CURRENT_TYPE2));
657 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
658 extra->type = TRACING_GATHER;
659 extra->send_size = send_size;
660 extra->recv_size = recv_size;
662 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
663 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
665 TRACE_smpi_collective_in(rank, root, __FUNCTION__, extra);
667 smpi_mpi_gather(send, send_size, MPI_CURRENT_TYPE,
668 recv, recv_size, MPI_CURRENT_TYPE2,
669 root, MPI_COMM_WORLD);
672 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
675 log_timed_action (action, clock);
682 static void action_gatherv(const char *const *action) {
684 The structure of the gatherv action for the rank 0 (total 4 processes)
686 0 gather 68 68 10 10 10 0 0 0
689 1) 68 is the sendcount
690 2) 68 10 10 10 is the recvcounts
691 3) 0 is the root node
692 4) 0 is the send datatype id, see decode_datatype()
693 5) 0 is the recv datatype id, see decode_datatype()
695 double clock = smpi_process_simulated_elapsed();
696 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
697 int send_size = parse_double(action[2]);
698 int *disps = xbt_new0(int, comm_size);
699 int *recvcounts = xbt_new0(int, comm_size);
702 MPI_Datatype MPI_CURRENT_TYPE2;
703 if(action[4+comm_size]) {
704 MPI_CURRENT_TYPE=decode_datatype(action[4+comm_size]);
705 MPI_CURRENT_TYPE2=decode_datatype(action[5+comm_size]);
707 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
708 MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
710 void *send = calloc(send_size, smpi_datatype_size(MPI_CURRENT_TYPE));
712 for(i=0;i<comm_size;i++) {
713 recvcounts[i] = atoi(action[i+3]);
714 recv_sum=recv_sum+recvcounts[i];
718 int root=atoi(action[3+comm_size]);
719 int rank = smpi_process_index();
722 recv = calloc(recv_sum, smpi_datatype_size(MPI_CURRENT_TYPE2));
725 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
726 extra->type = TRACING_GATHERV;
727 extra->send_size = send_size;
728 extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
729 for(i=0; i< comm_size; i++)//copy data to avoid bad free
730 extra->recvcounts[i] = recvcounts[i];
732 extra->num_processes = comm_size;
733 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
734 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
736 TRACE_smpi_collective_in(rank, root, __FUNCTION__, extra);
738 smpi_mpi_gatherv(send, send_size, MPI_CURRENT_TYPE,
739 recv, recvcounts, disps, MPI_CURRENT_TYPE2,
740 root, MPI_COMM_WORLD);
743 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
746 log_timed_action (action, clock);
747 xbt_free(recvcounts);
754 static void action_reducescatter(const char *const *action) {
757 The structure of the reducescatter action for the rank 0 (total 4 processes)
759 0 reduceScatter 275427 275427 275427 204020 11346849 0
762 1) The first four values after the name of the action declare the recvcounts array
763 2) The value 11346849 is the amount of instructions
764 3) The last value corresponds to the datatype, see decode_datatype().
766 We analyze a MPI_Reduce_scatter call to one MPI_Reduce and one MPI_Scatterv.
770 double clock = smpi_process_simulated_elapsed();
771 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
772 int comp_size = parse_double(action[2+comm_size]);
773 int *recvcounts = xbt_new0(int, comm_size);
774 int *disps = xbt_new0(int, comm_size);
777 int rank = smpi_process_index();
779 if(action[3+comm_size])
780 MPI_CURRENT_TYPE=decode_datatype(action[3+comm_size]);
782 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
784 for(i=0;i<comm_size;i++) {
785 recvcounts[i] = atoi(action[i+2]);
786 recv_sum=recv_sum+recvcounts[i];
791 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
792 extra->type = TRACING_REDUCE_SCATTER;
793 extra->send_size = 0;
794 extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
795 for(i=0; i< comm_size; i++)//copy data to avoid bad free
796 extra->recvcounts[i] = recvcounts[i];
797 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
798 extra->comp_size = comp_size;
799 extra->num_processes = comm_size;
802 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
804 mpi_coll_reduce_fun(NULL, NULL, recv_sum, MPI_CURRENT_TYPE, MPI_OP_NULL,
805 root, MPI_COMM_WORLD);
806 smpi_mpi_scatterv(NULL, recvcounts, disps, MPI_CURRENT_TYPE, NULL,
807 recvcounts[rank], MPI_CURRENT_TYPE, 0, MPI_COMM_WORLD);
808 smpi_execute_flops(comp_size);
812 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
814 xbt_free(recvcounts);
816 log_timed_action (action, clock);
820 static void action_allgatherv(const char *const *action) {
823 The structure of the allgatherv action for the rank 0 (total 4 processes)
825 0 allGatherV 275427 275427 275427 275427 204020
828 1) 275427 is the sendcount
829 2) The next four elements declare the recvcounts array
830 3) No more values mean that the datatype for sent and receive buffer
831 is the default one, see decode_datatype().
835 double clock = smpi_process_simulated_elapsed();
837 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
839 int sendcount=atoi(action[2]);
840 int *recvcounts = xbt_new0(int, comm_size);
841 int *disps = xbt_new0(int, comm_size);
843 MPI_Datatype MPI_CURRENT_TYPE2;
845 if(action[3+comm_size]) {
846 MPI_CURRENT_TYPE = decode_datatype(action[3+comm_size]);
847 MPI_CURRENT_TYPE2 = decode_datatype(action[4+comm_size]);
849 MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
850 MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
852 void *sendbuf = calloc(sendcount, smpi_datatype_size(MPI_CURRENT_TYPE));
854 for(i=0;i<comm_size;i++) {
855 recvcounts[i] = atoi(action[i+3]);
856 recv_sum=recv_sum+recvcounts[i];
858 void *recvbuf = calloc(recv_sum, smpi_datatype_size(MPI_CURRENT_TYPE2));
861 int rank = smpi_process_index();
862 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
863 extra->type = TRACING_ALLGATHERV;
864 extra->send_size = sendcount;
865 extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
866 for(i=0; i< comm_size; i++)//copy data to avoid bad free
867 extra->recvcounts[i] = recvcounts[i];
868 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
869 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
870 extra->num_processes = comm_size;
872 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
875 mpi_coll_allgatherv_fun(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts, disps, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
878 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
881 log_timed_action (action, clock);
884 xbt_free(recvcounts);
889 static void action_allToAllv(const char *const *action) {
891 The structure of the allToAllV action for the rank 0 (total 4 processes)
893 0 allToAllV 100 1 7 10 12 100 1 70 10 5
896 1) 100 is the size of the send buffer *sizeof(int),
897 2) 1 7 10 12 is the sendcounts array
898 3) 100*sizeof(int) is the size of the receiver buffer
899 4) 1 70 10 5 is the recvcounts array
904 double clock = smpi_process_simulated_elapsed();
906 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
907 int send_buf_size=0,recv_buf_size=0,i=0;
908 int *sendcounts = xbt_new0(int, comm_size);
909 int *recvcounts = xbt_new0(int, comm_size);
910 int *senddisps = xbt_new0(int, comm_size);
911 int *recvdisps = xbt_new0(int, comm_size);
913 MPI_Datatype MPI_CURRENT_TYPE2;
915 send_buf_size=parse_double(action[2]);
916 recv_buf_size=parse_double(action[3+comm_size]);
917 if(action[4+2*comm_size]) {
918 MPI_CURRENT_TYPE=decode_datatype(action[4+2*comm_size]);
919 MPI_CURRENT_TYPE2=decode_datatype(action[5+2*comm_size]);
922 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
923 MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
926 void *sendbuf = calloc(send_buf_size, smpi_datatype_size(MPI_CURRENT_TYPE));
927 void *recvbuf = calloc(recv_buf_size, smpi_datatype_size(MPI_CURRENT_TYPE2));
929 for(i=0;i<comm_size;i++) {
930 sendcounts[i] = atoi(action[i+3]);
931 recvcounts[i] = atoi(action[i+4+comm_size]);
936 int rank = smpi_process_index();
937 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
938 extra->type = TRACING_ALLTOALLV;
939 extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
940 extra->sendcounts= xbt_malloc(comm_size*sizeof(int));
941 extra->num_processes = comm_size;
943 for(i=0; i< comm_size; i++){//copy data to avoid bad free
944 extra->send_size += sendcounts[i];
945 extra->sendcounts[i] = sendcounts[i];
946 extra->recv_size += recvcounts[i];
947 extra->recvcounts[i] = recvcounts[i];
949 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
950 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
952 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
954 mpi_coll_alltoallv_fun(sendbuf, sendcounts, senddisps, MPI_CURRENT_TYPE,
955 recvbuf, recvcounts, recvdisps, MPI_CURRENT_TYPE,
958 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
961 log_timed_action (action, clock);
964 xbt_free(sendcounts);
965 xbt_free(recvcounts);
970 void smpi_replay_init(int *argc, char***argv){
971 smpi_process_init(argc, argv);
972 smpi_process_mark_as_initialized();
974 int rank = smpi_process_index();
975 TRACE_smpi_init(rank);
976 TRACE_smpi_computing_init(rank);
977 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
978 extra->type = TRACING_INIT;
979 TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
980 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
983 if (!smpi_process_index()){
984 _xbt_replay_action_init();
985 xbt_replay_action_register("init", action_init);
986 xbt_replay_action_register("finalize", action_finalize);
987 xbt_replay_action_register("comm_size", action_comm_size);
988 xbt_replay_action_register("comm_split", action_comm_split);
989 xbt_replay_action_register("comm_dup", action_comm_dup);
990 xbt_replay_action_register("send", action_send);
991 xbt_replay_action_register("Isend", action_Isend);
992 xbt_replay_action_register("recv", action_recv);
993 xbt_replay_action_register("Irecv", action_Irecv);
994 xbt_replay_action_register("test", action_test);
995 xbt_replay_action_register("wait", action_wait);
996 xbt_replay_action_register("waitAll", action_waitall);
997 xbt_replay_action_register("barrier", action_barrier);
998 xbt_replay_action_register("bcast", action_bcast);
999 xbt_replay_action_register("reduce", action_reduce);
1000 xbt_replay_action_register("allReduce", action_allReduce);
1001 xbt_replay_action_register("allToAll", action_allToAll);
1002 xbt_replay_action_register("allToAllV", action_allToAllv);
1003 xbt_replay_action_register("gather", action_gather);
1004 xbt_replay_action_register("gatherV", action_gatherv);
1005 xbt_replay_action_register("allGatherV", action_allgatherv);
1006 xbt_replay_action_register("reduceScatter", action_reducescatter);
1007 xbt_replay_action_register("compute", action_compute);
1010 xbt_replay_action_runner(*argc, *argv);
1013 int smpi_replay_finalize(){
1014 double sim_time= 1.;
1015 /* One active process will stop. Decrease the counter*/
1016 XBT_DEBUG("There are %lu elements in reqq[*]",
1017 xbt_dynar_length(reqq[smpi_comm_rank(MPI_COMM_WORLD)]));
1018 if (!xbt_dynar_is_empty(reqq[smpi_comm_rank(MPI_COMM_WORLD)])){
1019 int count_requests=xbt_dynar_length(reqq[smpi_comm_rank(MPI_COMM_WORLD)]);
1020 MPI_Request requests[count_requests];
1021 MPI_Status status[count_requests];
1024 xbt_dynar_foreach(reqq[smpi_comm_rank(MPI_COMM_WORLD)],i,requests[i]);
1025 smpi_mpi_waitall(count_requests, requests, status);
1031 xbt_dynar_free_container(&(reqq[smpi_comm_rank(MPI_COMM_WORLD)]));
1033 if(!active_processes){
1034 /* Last process alive speaking */
1035 /* end the simulated timer */
1036 sim_time = smpi_process_simulated_elapsed();
1037 XBT_INFO("Simulation time %f", sim_time);
1038 _xbt_replay_action_exit();
1042 smpi_mpi_barrier(MPI_COMM_WORLD);
1044 int rank = smpi_process_index();
1045 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
1046 extra->type = TRACING_FINALIZE;
1047 TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
1049 smpi_process_finalize();
1051 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
1052 TRACE_smpi_finalize(smpi_process_index());
1054 smpi_process_destroy();