1 /* Copyright (c) 2009-2014. The SimGrid Team.
2 * All rights reserved. */
4 /* This program is free software; you can redistribute it and/or modify it
5 * under the terms of the license (GNU LGPL) which comes with this package. */
10 #include <xbt/replay.h>
// Default log category of this file, declared as a subcategory of smpi.
12 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
// Value of the last comm_size trace action (written by action_comm_size).
14 int communicator_size = 0;
// Process count recorded by action_init; smpi_replay_finalize tests it against zero.
15 static int active_processes = 0;
// Array of request queues, one dynar per process, indexed by smpi_process_index().
16 xbt_dynar_t *reqq = NULL;
// Datatype applied when a trace line carries no datatype word (chosen in action_init).
18 MPI_Datatype MPI_DEFAULT_TYPE;
// Datatype of the action currently being replayed; each handler overwrites it.
19 MPI_Datatype MPI_CURRENT_TYPE;
// Shared scratch buffers handed out in replay mode by smpi_get_tmp_sendbuffer and
// smpi_get_tmp_recvbuffer, together with the sizes those getters compare requests against.
21 static int sendbuffer_size=0;
22 char* sendbuffer=NULL;
23 static int recvbuffer_size=0;
24 char* recvbuffer=NULL;
// Emits, at verbose level, the words of the replayed action followed by the
// simulated time elapsed since clock.
// action: NULL-terminated array of words of the trace line.
// clock:  value of smpi_process_simulated_elapsed() taken when the action started.
// The joined string is only built when verbose logging is enabled for smpi_replay.
26 static void log_timed_action (const char *const *action, double clock){
27 if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
28 char *name = xbt_str_join_array(action, " ");
29 XBT_VERB("%s %f", name, smpi_process_simulated_elapsed()-clock);
34 //allocate a single buffer for all sends, growing it if needed
// size: number of bytes requested.
// Outside replay mode each call returns a fresh xbt_malloc block of size bytes.
// In replay mode the shared sendbuffer is reallocated to size bytes whenever
// sendbuffer_size is smaller than the request.
35 void* smpi_get_tmp_sendbuffer(int size){
36 if (!smpi_process_get_replaying())
37 return xbt_malloc(size);
38 if (sendbuffer_size<size){
39 sendbuffer=xbt_realloc(sendbuffer,size);
44 //allocate a single buffer for all recv
// size: number of bytes requested.
// Mirrors smpi_get_tmp_sendbuffer for the receive side: a fresh xbt_malloc block
// outside replay mode, otherwise the shared recvbuffer, reallocated to size bytes
// whenever recvbuffer_size is smaller than the request.
45 void* smpi_get_tmp_recvbuffer(int size){
46 if (!smpi_process_get_replaying())
47 return xbt_malloc(size);
48 if (recvbuffer_size<size){
49 recvbuffer=xbt_realloc(recvbuffer,size);
// Release counterpart of smpi_get_tmp_sendbuffer and smpi_get_tmp_recvbuffer.
// The check below selects the non-replay case, which is the case where those
// getters hand out a private xbt_malloc block instead of a shared buffer.
// NOTE(review): presumably buf is released only in that case - confirm.
55 void smpi_free_tmp_buffer(void* buf){
56 if (!smpi_process_get_replaying())
// Converts one word of a trace line to a double using strtod.
// string: text to convert.
// Raises unknown_error with the offending text when it is reported as not
// being a double.
61 static double parse_double(const char *string)
65 value = strtod(string, &endptr);
67 THROWF(unknown_error, 0, "%s is not a double", string);
// Translates the datatype word of a trace line into an MPI_Datatype.
// action: the datatype word taken from the trace line.
// The selected type is stored in the global MPI_CURRENT_TYPE and also returned.
// Possible results: MPI_DOUBLE, MPI_INT, MPI_CHAR, MPI_SHORT, MPI_LONG,
// MPI_FLOAT, MPI_BYTE, and MPI_DEFAULT_TYPE as the last alternative.
// NOTE(review): which word value selects which type is decided by the
// selection logic around these assignments - check it before relying on an id.
71 static MPI_Datatype decode_datatype(const char *const action)
73 // Declared datatypes,
78 MPI_CURRENT_TYPE=MPI_DOUBLE;
81 MPI_CURRENT_TYPE=MPI_INT;
84 MPI_CURRENT_TYPE=MPI_CHAR;
87 MPI_CURRENT_TYPE=MPI_SHORT;
90 MPI_CURRENT_TYPE=MPI_LONG;
93 MPI_CURRENT_TYPE=MPI_FLOAT;
96 MPI_CURRENT_TYPE=MPI_BYTE;
99 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
102 return MPI_CURRENT_TYPE;
// Inverse direction of decode_datatype: produces the textual form of a
// datatype for the tracing records (stored in extra->datatype1 and datatype2).
// datatype: handle compared by identity against MPI_BYTE, MPI_DOUBLE, MPI_INT,
//           MPI_CHAR, MPI_SHORT, MPI_LONG and MPI_FLOAT, in that order.
// Types outside that list fall through to the default case described at the end.
106 const char* encode_datatype(MPI_Datatype datatype)
109 //default type for output is set to MPI_BYTE
110 // MPI_DEFAULT_TYPE is not set for output, use directly MPI_BYTE
111 if (datatype==MPI_BYTE){
114 if(datatype==MPI_DOUBLE)
116 if(datatype==MPI_INT)
118 if(datatype==MPI_CHAR)
120 if(datatype==MPI_SHORT)
122 if(datatype==MPI_LONG)
124 if(datatype==MPI_FLOAT)
127 // default - not implemented.
128 // do not warn here as we pass in this function even for other trace formats
// Argument-count guard used at the top of the action handlers.
// action:    NULL-terminated word array of the trace line; its entries are counted.
// mandatory: number of required arguments after process_id and action name.
// optional:  number of additional arguments the handler accepts.
// On failure it raises arg_error; the message names the calling handler through
// __FUNCTION__ and reports the counted items together with both limits.
132 #define CHECK_ACTION_PARAMS(action, mandatory, optional) {\
134 while(action[i]!=NULL)\
137 THROWF(arg_error, 0, "%s replay failed.\n" \
138 "%d items were given on the line. First two should be process_id and action. " \
139 "This action needs after them %d mandatory arguments, and accepts %d optional ones. \n" \
140 "Please contact the Simgrid team if support is needed", __FUNCTION__, i, mandatory, optional);\
// Handler of the init action.
// Trace line: process_id init [flag]. Any third word switches the default
// datatype to MPI_DOUBLE; without it the default is MPI_BYTE.
// It also starts the simulated timer of the process, records the number of
// processes, and creates reqq with one request dynar per process.
144 static void action_init(const char *const *action)
147 XBT_DEBUG("Initialize the counters");
148 CHECK_ACTION_PARAMS(action, 0, 1);
149 if(action[2]) MPI_DEFAULT_TYPE= MPI_DOUBLE; // default MPE datatype
150 else MPI_DEFAULT_TYPE= MPI_BYTE; // default TAU datatype
152 /* start a simulated timer */
153 smpi_process_simulated_start();
154 /*initialize the number of active processes */
155 active_processes = smpi_process_count();
// One queue slot per process; each slot is filled by the loop below.
158 reqq=xbt_new0(xbt_dynar_t,active_processes);
160 for(i=0;i<active_processes;i++){
161 reqq[i]=xbt_dynar_new(sizeof(MPI_Request),&xbt_free_ref);
// Handler registered for the finalize action by smpi_replay_init.
166 static void action_finalize(const char *const *action)
// Handler of the comm_size action.
// Trace line: process_id comm_size value. The value is parsed as a double and
// stored, converted to int, in the global communicator_size.
170 static void action_comm_size(const char *const *action)
172 double clock = smpi_process_simulated_elapsed();
174 communicator_size = parse_double(action[2]);
175 log_timed_action (action, clock);
// Handler of the comm_split action. The statements here only take a timestamp
// and log the action, so no communicator operation is replayed by them.
178 static void action_comm_split(const char *const *action)
180 double clock = smpi_process_simulated_elapsed();
182 log_timed_action (action, clock);
// Handler of the comm_dup action. The statements here only take a timestamp
// and log the action, so no communicator operation is replayed by them.
185 static void action_comm_dup(const char *const *action)
187 double clock = smpi_process_simulated_elapsed();
189 log_timed_action (action, clock);
// Handler of the compute action.
// Trace line: process_id compute flops. Simulates the given amount of
// computation with smpi_execute_flops, bracketed by computing-in and
// computing-out trace events carrying the flop amount.
192 static void action_compute(const char *const *action)
194 CHECK_ACTION_PARAMS(action, 1, 0);
195 double clock = smpi_process_simulated_elapsed();
196 double flops= parse_double(action[2]);
198 int rank = smpi_process_index();
199 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
200 extra->type=TRACING_COMPUTING;
201 extra->comp_size=flops;
202 TRACE_smpi_computing_in(rank, extra);
204 smpi_execute_flops(flops);
206 TRACE_smpi_computing_out(rank);
209 log_timed_action (action, clock);
// Handler of the send action (blocking send).
// Trace line: process_id send dst size [datatype].
//   action[2] destination, action[3] element count, action[4] optional datatype
//   word; MPI_DEFAULT_TYPE is the alternative assignment.
// The message is sent with a NULL payload and tag 0 on MPI_COMM_WORLD, so only
// the size and datatype drive the simulation.
212 static void action_send(const char *const *action)
214 CHECK_ACTION_PARAMS(action, 2, 1);
215 int to = atoi(action[2]);
216 double size=parse_double(action[3]);
217 double clock = smpi_process_simulated_elapsed();
220 MPI_CURRENT_TYPE=decode_datatype(action[4]);
222 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
226 int rank = smpi_process_index();
// Destination translated through the group of MPI_COMM_WORLD for the trace.
228 int dst_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), to);
229 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
230 extra->type = TRACING_SEND;
231 extra->send_size = size;
233 extra->dst = dst_traced;
234 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
235 TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
// Traced volume is in bytes: element count times datatype size.
236 TRACE_smpi_send(rank, rank, dst_traced, size*smpi_datatype_size(MPI_CURRENT_TYPE));
239 smpi_mpi_send(NULL, size, MPI_CURRENT_TYPE, to , 0, MPI_COMM_WORLD);
241 log_timed_action (action, clock);
244 TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
// Handler of the Isend action (non-blocking send).
// Trace line: process_id Isend dst size [datatype].
//   action[2] destination, action[3] element count, action[4] optional datatype
//   word (MPI_DEFAULT_TYPE when absent).
// The request returned by smpi_mpi_isend is queued in reqq for this process so
// that a later test, wait or waitAll action can pick it up.
249 static void action_Isend(const char *const *action)
251 CHECK_ACTION_PARAMS(action, 2, 1);
252 int to = atoi(action[2]);
253 double size=parse_double(action[3]);
254 double clock = smpi_process_simulated_elapsed();
257 if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
258 else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
261 int rank = smpi_process_index();
262 int dst_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), to);
263 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
264 extra->type = TRACING_ISEND;
265 extra->send_size = size;
267 extra->dst = dst_traced;
268 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
269 TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
// Traced volume is in bytes: element count times datatype size.
270 TRACE_smpi_send(rank, rank, dst_traced, size*smpi_datatype_size(MPI_CURRENT_TYPE));
// NULL payload and tag 0: only size and datatype matter for the replay.
273 request = smpi_mpi_isend(NULL, size, MPI_CURRENT_TYPE, to, 0,MPI_COMM_WORLD);
276 TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
// Remember the pending request for the matching completion action.
280 xbt_dynar_push(reqq[smpi_process_index()],&request);
282 log_timed_action (action, clock);
// Handler of the recv action (blocking receive).
// Trace line: process_id recv src size [datatype].
//   action[2] source, action[3] element count, action[4] optional datatype word
//   (MPI_DEFAULT_TYPE when absent).
// The receive uses a NULL buffer and tag 0 on MPI_COMM_WORLD.
285 static void action_recv(const char *const *action) {
286 CHECK_ACTION_PARAMS(action, 2, 1);
287 int from = atoi(action[2]);
288 double size=parse_double(action[3]);
289 double clock = smpi_process_simulated_elapsed();
292 if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
293 else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
296 int rank = smpi_process_index();
297 int src_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), from);
299 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
300 extra->type = TRACING_RECV;
// The received element count is recorded in the send_size field of the record.
301 extra->send_size = size;
302 extra->src = src_traced;
304 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
305 TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
308 //unknown size from the receiver point of view
// NOTE(review): the probe presumably supplies the size when the trace does
// not give one - confirm the guarding condition and how status is consumed.
310 smpi_mpi_probe(from, 0, MPI_COMM_WORLD, &status);
314 smpi_mpi_recv(NULL, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD, &status);
317 TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
318 TRACE_smpi_recv(rank, src_traced, rank);
321 log_timed_action (action, clock);
// Handler of the Irecv action (non-blocking receive).
// Trace line: process_id Irecv src size [datatype].
//   action[2] source, action[3] element count, action[4] optional datatype word
//   (MPI_DEFAULT_TYPE when absent).
// The request returned by smpi_mpi_irecv is queued in reqq for this process so
// that a later test, wait or waitAll action can complete it.
324 static void action_Irecv(const char *const *action)
326 CHECK_ACTION_PARAMS(action, 2, 1);
327 int from = atoi(action[2]);
328 double size=parse_double(action[3]);
329 double clock = smpi_process_simulated_elapsed();
332 if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
333 else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
336 int rank = smpi_process_index();
337 int src_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), from);
338 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
339 extra->type = TRACING_IRECV;
// The received element count is recorded in the send_size field of the record.
340 extra->send_size = size;
341 extra->src = src_traced;
343 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
344 TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
347 //unknown size from the receiver point of view
// NOTE(review): the probe presumably supplies the size when the trace does
// not give one - confirm the guarding condition and how status is consumed.
349 smpi_mpi_probe(from, 0, MPI_COMM_WORLD, &status);
353 request = smpi_mpi_irecv(NULL, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD);
356 TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
// Remember the pending request for the matching completion action.
359 xbt_dynar_push(reqq[smpi_process_index()],&request);
361 log_timed_action (action, clock);
// Handler of the test action (takes no argument).
// Pops the most recently queued request of this process, runs smpi_mpi_test on
// it between testing-in and testing-out trace events, and pushes the request
// back so that a later wait action still finds an entry.
364 static void action_test(const char *const *action){
365 CHECK_ACTION_PARAMS(action, 0, 0);
366 double clock = smpi_process_simulated_elapsed();
371 request = xbt_dynar_pop_as(reqq[smpi_process_index()],MPI_Request);
372 //if request is null here, this may mean that a previous test has succeeded
373 //Different times in traced application and replayed version may lead to this
374 //In this case, ignore the extra calls.
377 int rank = smpi_process_index();
378 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
379 extra->type=TRACING_TEST;
380 TRACE_smpi_testing_in(rank, extra);
383 flag = smpi_mpi_test(&request, &status);
385 XBT_DEBUG("MPI_Test result: %d", flag);
386 /* push back request in dynar to be caught by a subsequent wait. if the test
387 * did succeed, the request is now NULL.
389 xbt_dynar_push_as(reqq[smpi_process_index()],MPI_Request, request);
392 TRACE_smpi_testing_out(rank);
395 log_timed_action (action, clock);
// Handler of the wait action (takes no argument).
// Completes the most recently queued request of this process with
// smpi_mpi_wait. The queue must not be empty; otherwise the assertion below
// fails with the offending trace line in its message.
398 static void action_wait(const char *const *action){
399 CHECK_ACTION_PARAMS(action, 0, 0);
400 double clock = smpi_process_simulated_elapsed();
404 xbt_assert(xbt_dynar_length(reqq[smpi_process_index()]),
405 "action wait not preceded by any irecv or isend: %s",
406 xbt_str_join_array(action," "));
407 request = xbt_dynar_pop_as(reqq[smpi_process_index()],MPI_Request);
410 /* Assuming that the trace is well formed, this mean the comm might have
411 * been caught by a MPI_test. Then just return.
417 int rank = request->comm != MPI_COMM_NULL
418 ? smpi_comm_rank(request->comm)
// Source, destination and direction are read from the request before waiting.
// NOTE(review): presumably because the request is not usable after
// smpi_mpi_wait returns - confirm.
421 MPI_Group group = smpi_comm_group(request->comm);
422 int src_traced = smpi_group_rank(group, request->src);
423 int dst_traced = smpi_group_rank(group, request->dst);
424 int is_wait_for_receive = request->recv;
425 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
426 extra->type = TRACING_WAIT;
427 TRACE_smpi_ptp_in(rank, src_traced, dst_traced, __FUNCTION__, extra);
429 smpi_mpi_wait(&request, &status);
431 TRACE_smpi_ptp_out(rank, src_traced, dst_traced, __FUNCTION__);
// A completed receive additionally emits the receive event.
432 if (is_wait_for_receive) {
433 TRACE_smpi_recv(rank, src_traced, dst_traced);
437 log_timed_action (action, clock);
// Handler of the waitAll action (takes no argument).
// Completes every request queued for this process with one smpi_mpi_waitall
// call, emits a receive trace event for each request that was a receive, and
// then replaces the queue of this process with a fresh empty dynar.
440 static void action_waitall(const char *const *action){
441 CHECK_ACTION_PARAMS(action, 0, 0);
442 double clock = smpi_process_simulated_elapsed();
443 int count_requests=0;
446 count_requests=xbt_dynar_length(reqq[smpi_process_index()]);
448 if (count_requests>0) {
// Variable-length arrays sized by the number of pending requests.
449 MPI_Request requests[count_requests];
450 MPI_Status status[count_requests];
452 /* The reqq is an array of dynars. Its index corresponds to the rank.
453 Thus each rank saves its own requests to the array request. */
454 xbt_dynar_foreach(reqq[smpi_process_index()],i,requests[i]);
457 //save information from requests
// src, dst and recv flag of each request are copied aside before the waitall
// so that they can be read back for tracing afterwards.
459 xbt_dynar_t srcs = xbt_dynar_new(sizeof(int), NULL);
460 xbt_dynar_t dsts = xbt_dynar_new(sizeof(int), NULL);
461 xbt_dynar_t recvs = xbt_dynar_new(sizeof(int), NULL);
462 for (i = 0; i < count_requests; i++) {
// NOTE(review): the dynars hold int elements and are read back by value with
// xbt_dynar_get_cpy, so these three heap ints look like they are copied and
// then never released - possible leak, confirm against the dynar API.
464 int *asrc = xbt_new(int, 1);
465 int *adst = xbt_new(int, 1);
466 int *arecv = xbt_new(int, 1);
467 *asrc = requests[i]->src;
468 *adst = requests[i]->dst;
469 *arecv = requests[i]->recv;
470 xbt_dynar_insert_at(srcs, i, asrc);
471 xbt_dynar_insert_at(dsts, i, adst);
472 xbt_dynar_insert_at(recvs, i, arecv);
// NOTE(review): t comes from xbt_new and no store to it is visible before it
// is inserted three times; its value may be indeterminate and the block is
// not released - confirm.
477 int *t = xbt_new(int, 1);
478 xbt_dynar_insert_at(srcs, i, t);
479 xbt_dynar_insert_at(dsts, i, t);
480 xbt_dynar_insert_at(recvs, i, t);
484 int rank_traced = smpi_process_index();
485 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
486 extra->type = TRACING_WAITALL;
// The number of completed requests is recorded in the send_size field.
487 extra->send_size=count_requests;
488 TRACE_smpi_ptp_in(rank_traced, -1, -1, __FUNCTION__,extra);
491 smpi_mpi_waitall(count_requests, requests, status);
494 for (i = 0; i < count_requests; i++) {
495 int src_traced, dst_traced, is_wait_for_receive;
496 xbt_dynar_get_cpy(srcs, i, &src_traced);
497 xbt_dynar_get_cpy(dsts, i, &dst_traced);
498 xbt_dynar_get_cpy(recvs, i, &is_wait_for_receive);
499 if (is_wait_for_receive) {
500 TRACE_smpi_recv(rank_traced, src_traced, dst_traced);
503 TRACE_smpi_ptp_out(rank_traced, -1, -1, __FUNCTION__);
505 xbt_dynar_free(&srcs);
506 xbt_dynar_free(&dsts);
507 xbt_dynar_free(&recvs);
// Drop the old queue container and start over with an empty one.
510 int freedrank=smpi_process_index();
511 xbt_dynar_free_container(&(reqq[freedrank]));
512 reqq[freedrank]=xbt_dynar_new(sizeof(MPI_Request),&xbt_free_ref);
514 log_timed_action (action, clock);
// Handler of the barrier action.
// Runs the selected barrier implementation on MPI_COMM_WORLD between
// collective-in and collective-out trace events with no root (-1).
517 static void action_barrier(const char *const *action){
518 double clock = smpi_process_simulated_elapsed();
520 int rank = smpi_process_index();
521 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
522 extra->type = TRACING_BARRIER;
523 TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
525 mpi_coll_barrier_fun(MPI_COMM_WORLD);
527 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
530 log_timed_action (action, clock);
// Handler of the bcast action.
// Trace line: process_id bcast size [root [datatype]].
//   action[2] element count, action[3] optional root, action[4] optional
//   datatype word. MPI_CURRENT_TYPE starts as MPI_DEFAULT_TYPE and is replaced
//   when a datatype word is decoded.
534 static void action_bcast(const char *const *action)
536 CHECK_ACTION_PARAMS(action, 1, 2);
537 double size = parse_double(action[2]);
538 double clock = smpi_process_simulated_elapsed();
541 * Initialize MPI_CURRENT_TYPE in order to decrease
542 * the number of the checks
544 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
547 root= atoi(action[3]);
549 MPI_CURRENT_TYPE=decode_datatype(action[4]);
554 int rank = smpi_process_index();
// NOTE(review): this uses smpi_group_index, whereas action_reduce and the
// point-to-point handlers translate with smpi_group_rank - confirm which
// direction of translation is intended for the traced root.
555 int root_traced = smpi_group_index(smpi_comm_group(MPI_COMM_WORLD), root);
557 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
558 extra->type = TRACING_BCAST;
559 extra->send_size = size;
560 extra->root = root_traced;
561 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
562 TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__, extra);
// Scratch buffer sized in bytes: element count times datatype size.
565 void *sendbuf = smpi_get_tmp_sendbuffer(size* smpi_datatype_size(MPI_CURRENT_TYPE));
566 mpi_coll_bcast_fun(sendbuf, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
568 TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
570 log_timed_action (action, clock);
// Handler of the reduce action.
// Trace line: process_id reduce comm_size comp_size [root [datatype]].
//   action[2] element count to communicate, action[3] flops to execute after
//   the communication, action[4] optional root, action[5] optional datatype.
// The reduction runs with MPI_OP_NULL; the computation cost is simulated
// separately through smpi_execute_flops.
573 static void action_reduce(const char *const *action)
575 CHECK_ACTION_PARAMS(action, 2, 2);
576 double comm_size = parse_double(action[2]);
577 double comp_size = parse_double(action[3]);
578 double clock = smpi_process_simulated_elapsed();
580 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
583 root= atoi(action[4]);
585 MPI_CURRENT_TYPE=decode_datatype(action[5]);
592 int rank = smpi_process_index();
593 int root_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), root);
594 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
595 extra->type = TRACING_REDUCE;
596 extra->send_size = comm_size;
597 extra->comp_size = comp_size;
598 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
599 extra->root = root_traced;
601 TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__,extra);
// NOTE(review): recvbuf is taken from the send scratch buffer as well. With
// equal sizes, replay mode may hand out the same shared buffer for both.
// smpi_get_tmp_recvbuffer exists for the receive side - confirm whether the
// aliasing is intended.
603 void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
604 void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
605 mpi_coll_reduce_fun(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
606 smpi_execute_flops(comp_size);
608 TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
610 log_timed_action (action, clock);
// Handler of the allReduce action.
// Trace line: process_id allReduce comm_size comp_size [datatype].
//   action[2] element count to communicate, action[3] flops to execute after
//   the communication, action[4] optional datatype word (MPI_DEFAULT_TYPE when
//   absent).
// The reduction runs with MPI_OP_NULL; the computation cost is simulated
// separately through smpi_execute_flops.
613 static void action_allReduce(const char *const *action) {
614 CHECK_ACTION_PARAMS(action, 2, 1);
615 double comm_size = parse_double(action[2]);
616 double comp_size = parse_double(action[3]);
618 if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
619 else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
621 double clock = smpi_process_simulated_elapsed();
623 int rank = smpi_process_index();
624 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
625 extra->type = TRACING_ALLREDUCE;
626 extra->send_size = comm_size;
627 extra->comp_size = comp_size;
628 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
630 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
// NOTE(review): recvbuf is taken from the send scratch buffer as well. With
// equal sizes, replay mode may hand out the same shared buffer for both.
// smpi_get_tmp_recvbuffer exists for the receive side - confirm whether the
// aliasing is intended.
632 void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
633 void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
634 mpi_coll_allreduce_fun(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
635 smpi_execute_flops(comp_size);
637 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
639 log_timed_action (action, clock);
// Handler of the allToAll action.
// Trace line: process_id allToAll send_size recv_size [send_type recv_type].
//   action[2] elements sent to each process, action[3] elements received from
//   each process, action[4] and action[5] datatype words;
//   MPI_DEFAULT_TYPE is the alternative for both.
// NOTE(review): no CHECK_ACTION_PARAMS call is visible before action[2] and
// action[3] are read, unlike the sibling handlers - confirm the argument count
// is validated.
642 static void action_allToAll(const char *const *action) {
643 double clock = smpi_process_simulated_elapsed();
644 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
645 int send_size = parse_double(action[2]);
646 int recv_size = parse_double(action[3]);
647 MPI_Datatype MPI_CURRENT_TYPE2;
650 MPI_CURRENT_TYPE=decode_datatype(action[4]);
651 MPI_CURRENT_TYPE2=decode_datatype(action[5]);
654 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
655 MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
// Both buffers hold one contribution per process of the communicator.
657 void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
658 void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE2));
661 int rank = smpi_process_index();
662 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
663 extra->type = TRACING_ALLTOALL;
664 extra->send_size = send_size;
665 extra->recv_size = recv_size;
666 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
667 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
669 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
672 mpi_coll_alltoall_fun(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
675 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
677 log_timed_action (action, clock);
// Handler of the gather action.
// Arguments as read by the code: action[2] send count, action[3] receive
// count, action[4] root, action[5] and action[6] send and receive datatype
// words; MPI_DEFAULT_TYPE is the alternative for both types.
682 static void action_gather(const char *const *action) {
684 The structure of the gather action for the rank 0 (total 4 processes)
689 1) 68 is the sendcounts
690 2) 68 is the recvcounts
691 3) 0 is the root node
692 4) 0 is the send datatype id, see decode_datatype()
693 5) 0 is the recv datatype id, see decode_datatype()
695 CHECK_ACTION_PARAMS(action, 2, 3);
696 double clock = smpi_process_simulated_elapsed();
697 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
698 int send_size = parse_double(action[2]);
699 int recv_size = parse_double(action[3]);
700 MPI_Datatype MPI_CURRENT_TYPE2;
702 MPI_CURRENT_TYPE=decode_datatype(action[5]);
703 MPI_CURRENT_TYPE2=decode_datatype(action[6]);
705 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
706 MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
708 void *send = smpi_get_tmp_sendbuffer(send_size* smpi_datatype_size(MPI_CURRENT_TYPE));
712 root=atoi(action[4]);
713 int rank = smpi_comm_rank(MPI_COMM_WORLD);
// Receive buffer sized for one contribution from every process.
716 recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE2));
719 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
720 extra->type = TRACING_GATHER;
721 extra->send_size = send_size;
722 extra->recv_size = recv_size;
724 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
725 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
727 TRACE_smpi_collective_in(smpi_process_index(), root, __FUNCTION__, extra);
729 mpi_coll_gather_fun(send, send_size, MPI_CURRENT_TYPE,
730 recv, recv_size, MPI_CURRENT_TYPE2,
731 root, MPI_COMM_WORLD);
// NOTE(review): the collective-in event above passes root, this one passes -1;
// confirm the asymmetry is intended.
734 TRACE_smpi_collective_out(smpi_process_index(), -1, __FUNCTION__);
736 log_timed_action (action, clock);
// Handler of the gatherV action.
// Arguments as read by the code, with n the size of MPI_COMM_WORLD:
//   action[2] send count, action[3] .. action[2+n] receive counts,
//   action[3+n] root, action[4+n] and action[5+n] send and receive datatype
//   words (MPI_DEFAULT_TYPE for both when action[4+n] is absent).
742 static void action_gatherv(const char *const *action) {
744 The structure of the gatherv action for the rank 0 (total 4 processes)
746 0 gather 68 68 10 10 10 0 0 0
749 1) 68 is the sendcount
750 2) 68 10 10 10 is the recvcounts
751 3) 0 is the root node
752 4) 0 is the send datatype id, see decode_datatype()
753 5) 0 is the recv datatype id, see decode_datatype()
756 double clock = smpi_process_simulated_elapsed();
757 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
// NOTE(review): n+1 mandatory arguments cover the send count and the n receive
// counts only, yet the root is read from action[3+n] below without a visible
// guard. On a minimum-length line that slot is the NULL terminator and atoi
// would receive NULL - confirm whether the mandatory count should be n+2.
758 CHECK_ACTION_PARAMS(action, comm_size+1, 2);
759 int send_size = parse_double(action[2]);
// disps is zero-filled by xbt_new0 and passed unchanged to the collective, so
// every displacement handed to smpi_mpi_gatherv is 0.
760 int *disps = xbt_new0(int, comm_size);
761 int *recvcounts = xbt_new0(int, comm_size);
764 MPI_Datatype MPI_CURRENT_TYPE2;
765 if(action[4+comm_size]) {
766 MPI_CURRENT_TYPE=decode_datatype(action[4+comm_size]);
767 MPI_CURRENT_TYPE2=decode_datatype(action[5+comm_size]);
769 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
770 MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
772 void *send = smpi_get_tmp_sendbuffer(send_size* smpi_datatype_size(MPI_CURRENT_TYPE));
// Read the per-process receive counts and accumulate their total.
774 for(i=0;i<comm_size;i++) {
775 recvcounts[i] = atoi(action[i+3]);
776 recv_sum=recv_sum+recvcounts[i];
780 int root=atoi(action[3+comm_size]);
// NOTE(review): stray second semicolon on the next line (harmless empty statement).
781 int rank = smpi_comm_rank(MPI_COMM_WORLD);;
784 recv = smpi_get_tmp_recvbuffer(recv_sum* smpi_datatype_size(MPI_CURRENT_TYPE2));
787 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
788 extra->type = TRACING_GATHERV;
789 extra->send_size = send_size;
790 extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
791 for(i=0; i< comm_size; i++)//copy data to avoid bad free
792 extra->recvcounts[i] = recvcounts[i];
794 extra->num_processes = comm_size;
795 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
796 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
798 TRACE_smpi_collective_in(smpi_process_index(), root, __FUNCTION__, extra);
800 smpi_mpi_gatherv(send, send_size, MPI_CURRENT_TYPE,
801 recv, recvcounts, disps, MPI_CURRENT_TYPE2,
802 root, MPI_COMM_WORLD);
805 TRACE_smpi_collective_out(smpi_process_index(), -1, __FUNCTION__);
808 log_timed_action (action, clock);
// NOTE(review): recvcounts is released here; confirm disps is released too.
809 xbt_free(recvcounts);
// Handler of the reduceScatter action.
// Arguments as read by the code, with n the size of MPI_COMM_WORLD:
//   action[2] .. action[1+n] receive counts, action[2+n] flops to execute,
//   action[3+n] optional datatype word (MPI_DEFAULT_TYPE is the alternative).
// The collective runs with MPI_OP_NULL; the computation cost is simulated
// separately through smpi_execute_flops.
813 static void action_reducescatter(const char *const *action) {
816 The structure of the reducescatter action for the rank 0 (total 4 processes)
818 0 reduceScatter 275427 275427 275427 204020 11346849 0
821 1) The first four values after the name of the action declare the recvcounts array
822 2) The value 11346849 is the amount of instructions
823 3) The last value corresponds to the datatype, see decode_datatype().
825 We analyze a MPI_Reduce_scatter call to one MPI_Reduce and one MPI_Scatterv.
829 double clock = smpi_process_simulated_elapsed();
830 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
831 CHECK_ACTION_PARAMS(action, comm_size+1, 1);
832 int comp_size = parse_double(action[2+comm_size]);
833 int *recvcounts = xbt_new0(int, comm_size);
834 int *disps = xbt_new0(int, comm_size);
836 int rank = smpi_process_index();
838 if(action[3+comm_size])
839 MPI_CURRENT_TYPE=decode_datatype(action[3+comm_size]);
841 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
843 for(i=0;i<comm_size;i++) {
844 recvcounts[i] = atoi(action[i+2]);
850 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
851 extra->type = TRACING_REDUCE_SCATTER;
852 extra->send_size = 0;
853 extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
854 for(i=0; i< comm_size; i++)//copy data to avoid bad free
855 extra->recvcounts[i] = recvcounts[i];
856 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
857 extra->comp_size = comp_size;
858 extra->num_processes = comm_size;
860 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
// Both scratch buffers are sized from the same element total, named size.
862 void *sendbuf = smpi_get_tmp_sendbuffer(size* smpi_datatype_size(MPI_CURRENT_TYPE));
863 void *recvbuf = smpi_get_tmp_recvbuffer(size* smpi_datatype_size(MPI_CURRENT_TYPE));
865 mpi_coll_reduce_scatter_fun(sendbuf, recvbuf, recvcounts, MPI_CURRENT_TYPE, MPI_OP_NULL,
867 smpi_execute_flops(comp_size);
871 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
// NOTE(review): recvcounts is released here; confirm disps is released too.
873 xbt_free(recvcounts);
875 log_timed_action (action, clock);
// Handler of the allGather action.
// Arguments as read by the code: action[2] send count, action[3] receive
// count, then up to two optional datatype words (CHECK_ACTION_PARAMS allows
// two mandatory and two optional arguments).
878 static void action_allgather(const char *const *action) {
880 The structure of the allgather action for the rank 0 (total 4 processes)
882 0 allGather 275427 275427
885 1) 275427 is the sendcount
886 2) 275427 is the recvcount
887 3) No more values mean that the datatype for sent and receive buffer
888 is the default one, see decode_datatype().
892 double clock = smpi_process_simulated_elapsed();
894 CHECK_ACTION_PARAMS(action, 2, 2);
895 int sendcount=atoi(action[2]);
896 int recvcount=atoi(action[3]);
898 MPI_Datatype MPI_CURRENT_TYPE2;
// NOTE(review): action[3] was just parsed as the receive count, so decoding it
// as the send datatype looks like an off-by-one. With two mandatory counts the
// optional datatype words would sit at action[4] and action[5] - confirm.
901 MPI_CURRENT_TYPE = decode_datatype(action[3]);
902 MPI_CURRENT_TYPE2 = decode_datatype(action[4]);
904 MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
905 MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
907 void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* smpi_datatype_size(MPI_CURRENT_TYPE));
// NOTE(review): the receive buffer is sized for one contribution only, whereas
// action_allToAll and action_gather multiply by the communicator size - confirm
// whether recvcount should be scaled by the number of processes here.
908 void *recvbuf = smpi_get_tmp_recvbuffer(recvcount* smpi_datatype_size(MPI_CURRENT_TYPE2));
911 int rank = smpi_process_index();
912 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
913 extra->type = TRACING_ALLGATHER;
914 extra->send_size = sendcount;
915 extra->recv_size= recvcount;
916 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
917 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
918 extra->num_processes = smpi_comm_size(MPI_COMM_WORLD);
920 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
923 mpi_coll_allgather_fun(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcount, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
926 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
929 log_timed_action (action, clock);
// Handler of the allGatherV action.
// Arguments as read by the code, with n the size of MPI_COMM_WORLD:
//   action[2] send count, action[3] .. action[2+n] receive counts,
//   action[3+n] and action[4+n] send and receive datatype words
//   (MPI_DEFAULT_TYPE for both when action[3+n] is absent).
932 static void action_allgatherv(const char *const *action) {
935 The structure of the allgatherv action for the rank 0 (total 4 processes)
937 0 allGatherV 275427 275427 275427 275427 204020
940 1) 275427 is the sendcount
941 2) The next four elements declare the recvcounts array
942 3) No more values mean that the datatype for sent and receive buffer
943 is the default one, see decode_datatype().
947 double clock = smpi_process_simulated_elapsed();
949 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
950 CHECK_ACTION_PARAMS(action, comm_size+1, 2);
952 int sendcount=atoi(action[2]);
953 int *recvcounts = xbt_new0(int, comm_size);
// disps is zero-filled by xbt_new0 and passed unchanged to the collective, so
// every displacement handed to it is 0.
954 int *disps = xbt_new0(int, comm_size);
956 MPI_Datatype MPI_CURRENT_TYPE2;
958 if(action[3+comm_size]) {
959 MPI_CURRENT_TYPE = decode_datatype(action[3+comm_size]);
960 MPI_CURRENT_TYPE2 = decode_datatype(action[4+comm_size]);
962 MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
963 MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
965 void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* smpi_datatype_size(MPI_CURRENT_TYPE));
// Read the per-process receive counts and accumulate their total, which sizes
// the receive buffer below.
967 for(i=0;i<comm_size;i++) {
968 recvcounts[i] = atoi(action[i+3]);
969 recv_sum=recv_sum+recvcounts[i];
971 void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* smpi_datatype_size(MPI_CURRENT_TYPE2));
974 int rank = smpi_process_index();
975 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
976 extra->type = TRACING_ALLGATHERV;
977 extra->send_size = sendcount;
978 extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
979 for(i=0; i< comm_size; i++)//copy data to avoid bad free
980 extra->recvcounts[i] = recvcounts[i];
981 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
982 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
983 extra->num_processes = comm_size;
985 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
988 mpi_coll_allgatherv_fun(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts, disps, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
991 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
994 log_timed_action (action, clock);
// NOTE(review): recvcounts is released here; confirm disps is released too.
995 xbt_free(recvcounts);
// Handler of the allToAllV action.
// Arguments as read by the code, with n the size of MPI_COMM_WORLD:
//   action[2] send buffer size, action[3] .. action[2+n] send counts,
//   action[3+n] receive buffer size, action[4+n] .. action[3+2n] receive
//   counts, action[4+2n] and action[5+2n] send and receive datatype words
//   (MPI_DEFAULT_TYPE for both when action[4+2n] is absent).
999 static void action_allToAllv(const char *const *action) {
1001 The structure of the allToAllV action for the rank 0 (total 4 processes)
1003 0 allToAllV 100 1 7 10 12 100 1 70 10 5
1006 1) 100 is the size of the send buffer *sizeof(int),
1007 2) 1 7 10 12 is the sendcounts array
1008 3) 100*sizeof(int) is the size of the receiver buffer
1009 4) 1 70 10 5 is the recvcounts array
1014 double clock = smpi_process_simulated_elapsed();
1016 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
1017 CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2);
1018 int send_buf_size=0,recv_buf_size=0,i=0;
1019 int *sendcounts = xbt_new0(int, comm_size);
1020 int *recvcounts = xbt_new0(int, comm_size);
// Both displacement arrays are zero-filled by xbt_new0 and passed unchanged to
// the collective, so every displacement handed to it is 0.
1021 int *senddisps = xbt_new0(int, comm_size);
1022 int *recvdisps = xbt_new0(int, comm_size);
1024 MPI_Datatype MPI_CURRENT_TYPE2;
1026 send_buf_size=parse_double(action[2]);
1027 recv_buf_size=parse_double(action[3+comm_size]);
1028 if(action[4+2*comm_size]) {
1029 MPI_CURRENT_TYPE=decode_datatype(action[4+2*comm_size]);
1030 MPI_CURRENT_TYPE2=decode_datatype(action[5+2*comm_size]);
1033 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
1034 MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
1037 void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* smpi_datatype_size(MPI_CURRENT_TYPE));
1038 void *recvbuf = smpi_get_tmp_recvbuffer(recv_buf_size* smpi_datatype_size(MPI_CURRENT_TYPE2));
1040 for(i=0;i<comm_size;i++) {
1041 sendcounts[i] = atoi(action[i+3]);
1042 recvcounts[i] = atoi(action[i+4+comm_size]);
1047 int rank = smpi_process_index();
1048 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
1049 extra->type = TRACING_ALLTOALLV;
1050 extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
1051 extra->sendcounts= xbt_malloc(comm_size*sizeof(int));
1052 extra->num_processes = comm_size;
// send_size and recv_size start at zero (record allocated with xbt_new0) and
// accumulate the totals of the two count arrays.
1054 for(i=0; i< comm_size; i++){//copy data to avoid bad free
1055 extra->send_size += sendcounts[i];
1056 extra->sendcounts[i] = sendcounts[i];
1057 extra->recv_size += recvcounts[i];
1058 extra->recvcounts[i] = recvcounts[i];
1060 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
1061 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
1063 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
// NOTE(review): the receive side is given MPI_CURRENT_TYPE although the
// receive datatype was decoded into MPI_CURRENT_TYPE2 and used to size
// recvbuf and fill datatype2 - this looks like it should be
// MPI_CURRENT_TYPE2; confirm.
1065 mpi_coll_alltoallv_fun(sendbuf, sendcounts, senddisps, MPI_CURRENT_TYPE,
1066 recvbuf, recvcounts, recvdisps, MPI_CURRENT_TYPE,
1069 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
1072 log_timed_action (action, clock);
1073 xbt_free(sendcounts);
1074 xbt_free(recvcounts);
1075 xbt_free(senddisps);
1076 xbt_free(recvdisps);
// Entry point of a replayed process.
// argc, argv: pointers to the argument vector of the process; they are passed
//             to smpi_process_init and later to the action runner.
// Steps: initialise the SMPI process and flag it as replaying, emit the init
// trace events, register the action handlers from the process whose index is
// 0, optionally simulate a delayed start, and finally run the trace through
// xbt_replay_action_runner.
1079 void smpi_replay_init(int *argc, char***argv){
1080 smpi_process_init(argc, argv);
1081 smpi_process_mark_as_initialized();
1082 smpi_process_set_replaying(1);
1084 int rank = smpi_process_index();
1085 TRACE_smpi_init(rank);
1086 TRACE_smpi_computing_init(rank);
1087 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
1088 extra->type = TRACING_INIT;
1089 TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
1090 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
// Registration happens once, in the process with index 0. The quoted names are
// the action words expected in the trace file.
1093 if (!smpi_process_index()){
1094 _xbt_replay_action_init();
1095 xbt_replay_action_register("init", action_init);
1096 xbt_replay_action_register("finalize", action_finalize);
1097 xbt_replay_action_register("comm_size", action_comm_size);
1098 xbt_replay_action_register("comm_split", action_comm_split);
1099 xbt_replay_action_register("comm_dup", action_comm_dup);
1100 xbt_replay_action_register("send", action_send);
1101 xbt_replay_action_register("Isend", action_Isend);
1102 xbt_replay_action_register("recv", action_recv);
1103 xbt_replay_action_register("Irecv", action_Irecv);
1104 xbt_replay_action_register("test", action_test);
1105 xbt_replay_action_register("wait", action_wait);
1106 xbt_replay_action_register("waitAll", action_waitall);
1107 xbt_replay_action_register("barrier", action_barrier);
1108 xbt_replay_action_register("bcast", action_bcast);
1109 xbt_replay_action_register("reduce", action_reduce);
1110 xbt_replay_action_register("allReduce", action_allReduce);
1111 xbt_replay_action_register("allToAll", action_allToAll);
1112 xbt_replay_action_register("allToAllV", action_allToAllv);
1113 xbt_replay_action_register("gather", action_gather);
1114 xbt_replay_action_register("gatherV", action_gatherv);
1115 xbt_replay_action_register("allGather", action_allgather);
1116 xbt_replay_action_register("allGatherV", action_allgatherv);
1117 xbt_replay_action_register("reduceScatter", action_reducescatter);
1118 xbt_replay_action_register("compute", action_compute);
1121 //if we have a delayed start, sleep here.
// The delay is taken from the third argument, must parse completely as a
// double, and is simulated as that many flops of computation.
1124 double value = strtod((*argv)[2], &endptr);
1125 if (*endptr != '\0')
1126 THROWF(unknown_error, 0, "%s is not a double", (*argv)[2]);
1127 XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
1128 smpi_execute_flops(value);
1130 xbt_replay_action_runner(*argc, *argv);
// Tear-down counterpart of smpi_replay_init for one replayed process.
// Completes any requests still queued for this process, releases its queue
// container, and, when active_processes is zero, reports the simulated time
// and frees the shared replay state. It then emits the finalize trace events
// and finalizes and destroys the SMPI process.
1133 int smpi_replay_finalize(){
// Placeholder value; replaced by the simulated elapsed time further down when
// active_processes is zero.
1134 double sim_time= 1.;
1135 /* One active process will stop. Decrease the counter*/
1136 XBT_DEBUG("There are %lu elements in reqq[*]",
1137 xbt_dynar_length(reqq[smpi_process_index()]));
// Requests never matched by a wait, test or waitAll action are completed here.
1138 if (!xbt_dynar_is_empty(reqq[smpi_process_index()])){
1139 int count_requests=xbt_dynar_length(reqq[smpi_process_index()]);
1140 MPI_Request requests[count_requests];
1141 MPI_Status status[count_requests];
1144 xbt_dynar_foreach(reqq[smpi_process_index()],i,requests[i]);
1145 smpi_mpi_waitall(count_requests, requests, status);
1151 if(!active_processes){
1152 /* Last process alive speaking */
1153 /* end the simulated timer */
1154 sim_time = smpi_process_simulated_elapsed();
1158 xbt_dynar_free_container(&(reqq[smpi_process_index()]));
// Global clean-up: action registry and both shared scratch buffers.
1160 if(!active_processes){
1161 XBT_INFO("Simulation time %f", sim_time);
1162 _xbt_replay_action_exit();
1163 xbt_free(sendbuffer);
1164 xbt_free(recvbuffer);
1171 int rank = smpi_process_index();
1172 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
1173 extra->type = TRACING_FINALIZE;
1174 TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
1176 smpi_process_finalize();
1178 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
1179 TRACE_smpi_finalize(smpi_process_index());
1181 smpi_process_destroy();