1 /* Copyright (c) 2009-2014. The SimGrid Team.
2 * All rights reserved. */
4 /* This program is free software; you can redistribute it and/or modify it
5 * under the terms of the license (GNU LGPL) which comes with this package. */
10 #include <xbt/replay.h>
12 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
// Size announced by the trace; written by action_comm_size.
14 int communicator_size = 0;
// Process counter initialised from smpi_process_count() in action_init and tested in smpi_replay_finalize.
15 static int active_processes = 0;
// Array of dynars holding pending MPI_Request values, indexed by smpi_process_index().
16 xbt_dynar_t *reqq = NULL;
// Datatype used when a trace line carries no datatype id; chosen in action_init.
18 MPI_Datatype MPI_DEFAULT_TYPE;
// Datatype of the action currently being replayed; set by the handlers that take a datatype.
19 MPI_Datatype MPI_CURRENT_TYPE;
// Scratch buffers used by smpi_get_tmp_sendbuffer()/smpi_get_tmp_recvbuffer(),
// with the sizes each request is compared against before reallocating.
21 static int sendbuffer_size=0;
22 char* sendbuffer=NULL;
23 static int recvbuffer_size=0;
24 char* recvbuffer=NULL;
// Log an action line together with the simulated time elapsed since `clock`.
// `action` is the NULL-terminated token array of the trace line.
// The string join is only performed when verbose logging is enabled for smpi_replay.
// NOTE(review): the statements after XBT_VERB are not visible in this view; confirm `name` is released.
26 static void log_timed_action (const char *const *action, double clock){
27 if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
28 char *name = xbt_str_join_array(action, " ");
29 XBT_VERB("%s %f", name, smpi_process_simulated_elapsed()-clock);
34 //allocate a single buffer for all sends, growing it if needed
// `size` is the requested capacity in bytes.
// When the process is not replaying, a fresh xbt_malloc'ed block is returned instead.
35 void* smpi_get_tmp_sendbuffer(int size){
36 if (!smpi_process_get_replaying())
37 return xbt_malloc(size);
// Replay mode: reallocate the file-scope sendbuffer only when it is too small.
38 if (sendbuffer_size<size){
39 sendbuffer=xbt_realloc(sendbuffer,size);
44 //allocate a single buffer for all recv
// `size` is the requested capacity in bytes.
// When the process is not replaying, a fresh xbt_malloc'ed block is returned instead.
45 void* smpi_get_tmp_recvbuffer(int size){
46 if (!smpi_process_get_replaying())
47 return xbt_malloc(size);
// Replay mode: reallocate the file-scope recvbuffer only when it is too small.
48 if (recvbuffer_size<size){
49 recvbuffer=xbt_realloc(recvbuffer,size);
// Counterpart of the smpi_get_tmp_*buffer helpers for a buffer `buf` they returned.
// The visible check singles out the non-replay case, in which those helpers
// hand out a fresh xbt_malloc'ed block.
// NOTE(review): the guarded statement is not visible in this view -- presumably it frees `buf`; confirm.
55 void smpi_free_tmp_buffer(void* buf){
56 if (!smpi_process_get_replaying())
// Convert `string` to a double using strtod.
// Input that is rejected is reported through THROWF(unknown_error, ...).
// NOTE(review): the condition guarding THROWF and the return statement are not visible in this view.
61 static double parse_double(const char *string)
65 value = strtod(string, &endptr);
67 THROWF(unknown_error, 0, "%s is not a double", string);
// Translate the datatype id found in a trace line (`action`, a single token)
// into an MPI_Datatype. The result is stored in the global MPI_CURRENT_TYPE
// and also returned.
// NOTE(review): the selector lines between the assignments are not visible in
// this view; the visible targets are DOUBLE, INT, CHAR, SHORT, LONG, FLOAT,
// BYTE and finally MPI_DEFAULT_TYPE.
71 static MPI_Datatype decode_datatype(const char *const action)
73 // Declared datatypes,
78 MPI_CURRENT_TYPE=MPI_DOUBLE;
81 MPI_CURRENT_TYPE=MPI_INT;
84 MPI_CURRENT_TYPE=MPI_CHAR;
87 MPI_CURRENT_TYPE=MPI_SHORT;
90 MPI_CURRENT_TYPE=MPI_LONG;
93 MPI_CURRENT_TYPE=MPI_FLOAT;
96 MPI_CURRENT_TYPE=MPI_BYTE;
99 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
102 return MPI_CURRENT_TYPE;
// Produce the trace representation (a string) of `datatype`.
// The visible comparisons cover BYTE, DOUBLE, INT, CHAR, SHORT, LONG and FLOAT.
// `known` is an optional out-parameter; the handlers in this file pass NULL.
// NOTE(review): the returned strings and the writes to `known` are not visible in this view.
106 const char* encode_datatype(MPI_Datatype datatype, int* known)
109 //default type for output is set to MPI_BYTE
110 // MPI_DEFAULT_TYPE is not set for output, use directly MPI_BYTE
112 if (datatype==MPI_BYTE){
115 if(datatype==MPI_DOUBLE)
117 if(datatype==MPI_INT)
119 if(datatype==MPI_CHAR)
121 if(datatype==MPI_SHORT)
123 if(datatype==MPI_LONG)
125 if(datatype==MPI_FLOAT)
127 //tell that the datatype is not handled by replay, and that its size should be measured and replayed as size*MPI_BYTE
129 // default - not implemented.
130 // do not warn here as we pass in this function even for other trace formats
// Argument-count check used at the top of the action handlers.
// `action` is the NULL-terminated token array of one trace line; `mandatory`
// and `optional` are the numbers of arguments expected after the process id
// and the action name. The macro walks `action` up to its NULL terminator and
// raises arg_error with the message below.
// __FUNCTION__ is expanded where the macro is used, so the message names the
// calling handler.
// NOTE(review): the comparison that triggers THROWF is not visible in this view.
134 #define CHECK_ACTION_PARAMS(action, mandatory, optional) {\
136 while(action[i]!=NULL)\
139 THROWF(arg_error, 0, "%s replay failed.\n" \
140 "%d items were given on the line. First two should be process_id and action. " \
141 "This action needs after them %d mandatory arguments, and accepts %d optional ones. \n" \
142 "Please contact the Simgrid team if support is needed", __FUNCTION__, i, mandatory, optional);\
// Handler registered for the "init" action.
// Chooses the default datatype, starts the simulated timer, records the number
// of processes and allocates one request dynar per process in reqq.
// Accepts one optional argument (action[2]); only its presence is tested.
146 static void action_init(const char *const *action)
149 XBT_DEBUG("Initialize the counters");
150 CHECK_ACTION_PARAMS(action, 0, 1);
151 if(action[2]) MPI_DEFAULT_TYPE= MPI_DOUBLE; // default MPE datatype
152 else MPI_DEFAULT_TYPE= MPI_BYTE; // default TAU datatype
154 /* start a simulated timer */
155 smpi_process_simulated_start();
156 /*initialize the number of active processes */
157 active_processes = smpi_process_count();
// NOTE(review): any guard around this allocation is not visible in this view;
// confirm reqq is not re-allocated when several processes replay "init".
160 reqq=xbt_new0(xbt_dynar_t,active_processes);
162 for(i=0;i<active_processes;i++){
163 reqq[i]=xbt_dynar_new(sizeof(MPI_Request),&xbt_free_ref);
// Handler registered for the "finalize" action.
// NOTE(review): the body is not visible in this view.
168 static void action_finalize(const char *const *action)
// Handler registered for the "comm_size" action.
// Stores the value of action[2] in communicator_size (the double returned by
// parse_double is converted to int by the assignment) and logs the elapsed time.
172 static void action_comm_size(const char *const *action)
174 double clock = smpi_process_simulated_elapsed();
176 communicator_size = parse_double(action[2]);
177 log_timed_action (action, clock);
// Handler registered for the "comm_split" action.
// In the visible lines it only samples the simulated clock and logs the action.
180 static void action_comm_split(const char *const *action)
182 double clock = smpi_process_simulated_elapsed();
184 log_timed_action (action, clock);
// Handler registered for the "comm_dup" action.
// In the visible lines it only samples the simulated clock and logs the action.
187 static void action_comm_dup(const char *const *action)
189 double clock = smpi_process_simulated_elapsed();
191 log_timed_action (action, clock);
// Handler registered for the "compute" action.
// action[2] (mandatory) is the amount of flops to execute.
194 static void action_compute(const char *const *action)
196 CHECK_ACTION_PARAMS(action, 1, 0);
197 double clock = smpi_process_simulated_elapsed();
198 double flops= parse_double(action[2]);
199 int rank = smpi_process_index();
// Describe the computation for the tracing layer before running it.
200 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
201 extra->type=TRACING_COMPUTING;
202 extra->comp_size=flops;
203 TRACE_smpi_computing_in(rank, extra);
205 smpi_execute_flops(flops);
207 TRACE_smpi_computing_out(rank);
208 log_timed_action (action, clock);
// Handler registered for the "send" action.
// action[2] is the destination, action[3] the element count, action[4] an
// optional datatype id. The payload pointer passed to smpi_mpi_send is NULL
// and the tag is 0.
211 static void action_send(const char *const *action)
213 CHECK_ACTION_PARAMS(action, 2, 1);
214 int to = atoi(action[2]);
215 double size=parse_double(action[3]);
216 double clock = smpi_process_simulated_elapsed();
// The datatype is either decoded from action[4] or the default one
// (the selecting condition is not visible in this view).
219 MPI_CURRENT_TYPE=decode_datatype(action[4]);
221 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
224 int rank = smpi_process_index();
226 int dst_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), to);
227 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
228 extra->type = TRACING_SEND;
229 extra->send_size = size;
231 extra->dst = dst_traced;
232 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
233 TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
// The traced volume is the element count times the datatype size.
234 TRACE_smpi_send(rank, rank, dst_traced, size*smpi_datatype_size(MPI_CURRENT_TYPE));
236 smpi_mpi_send(NULL, size, MPI_CURRENT_TYPE, to , 0, MPI_COMM_WORLD);
238 log_timed_action (action, clock);
240 TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
// Handler registered for the "Isend" action.
// action[2] is the destination, action[3] the element count, action[4] an
// optional datatype id. The request returned by smpi_mpi_isend is queued in
// this process's reqq dynar for the test/wait/waitAll handlers.
243 static void action_Isend(const char *const *action)
245 CHECK_ACTION_PARAMS(action, 2, 1);
246 int to = atoi(action[2]);
247 double size=parse_double(action[3]);
248 double clock = smpi_process_simulated_elapsed();
251 if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
252 else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
254 int rank = smpi_process_index();
255 int dst_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), to);
256 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
257 extra->type = TRACING_ISEND;
258 extra->send_size = size;
260 extra->dst = dst_traced;
261 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
262 TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
263 TRACE_smpi_send(rank, rank, dst_traced, size*smpi_datatype_size(MPI_CURRENT_TYPE));
// NULL payload, tag 0.
265 request = smpi_mpi_isend(NULL, size, MPI_CURRENT_TYPE, to, 0,MPI_COMM_WORLD);
267 TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
// Remember the pending request so it can be completed later.
270 xbt_dynar_push(reqq[smpi_process_index()],&request);
272 log_timed_action (action, clock);
// Handler registered for the "recv" action.
// action[2] is the source, action[3] the element count, action[4] an optional
// datatype id. The payload pointer passed to smpi_mpi_recv is NULL and the tag is 0.
275 static void action_recv(const char *const *action) {
276 CHECK_ACTION_PARAMS(action, 2, 1);
277 int from = atoi(action[2]);
278 double size=parse_double(action[3]);
279 double clock = smpi_process_simulated_elapsed();
282 if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
283 else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
285 int rank = smpi_process_index();
286 int src_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), from);
288 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
289 extra->type = TRACING_RECV;
// The received element count is stored in the send_size field of the trace record.
290 extra->send_size = size;
291 extra->src = src_traced;
293 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
294 TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
296 //unknown size from the receiver's point of view
// NOTE(review): the condition guarding the probe and the use of `status`
// afterwards are not visible in this view.
298 smpi_mpi_probe(from, 0, MPI_COMM_WORLD, &status);
302 smpi_mpi_recv(NULL, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD, &status);
304 TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
305 TRACE_smpi_recv(rank, src_traced, rank);
307 log_timed_action (action, clock);
// Handler registered for the "Irecv" action.
// action[2] is the source, action[3] the element count, action[4] an optional
// datatype id. The request returned by smpi_mpi_irecv is queued in this
// process's reqq dynar for the test/wait/waitAll handlers.
310 static void action_Irecv(const char *const *action)
312 CHECK_ACTION_PARAMS(action, 2, 1);
313 int from = atoi(action[2]);
314 double size=parse_double(action[3]);
315 double clock = smpi_process_simulated_elapsed();
318 if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
319 else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
321 int rank = smpi_process_index();
322 int src_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), from);
323 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
324 extra->type = TRACING_IRECV;
// The expected element count is stored in the send_size field of the trace record.
325 extra->send_size = size;
326 extra->src = src_traced;
328 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
329 TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
331 //unknown size from the receiver's point of view
// NOTE(review): the condition guarding the probe and the use of `status`
// afterwards are not visible in this view.
333 smpi_mpi_probe(from, 0, MPI_COMM_WORLD, &status);
337 request = smpi_mpi_irecv(NULL, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD);
339 TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
// Remember the pending request so it can be completed later.
341 xbt_dynar_push(reqq[smpi_process_index()],&request);
343 log_timed_action (action, clock);
// Handler registered for the "test" action (takes no arguments).
// Pops a request from this process's reqq dynar, runs smpi_mpi_test on it and
// pushes the request back so that a later wait can still find it.
346 static void action_test(const char *const *action){
347 CHECK_ACTION_PARAMS(action, 0, 0);
348 double clock = smpi_process_simulated_elapsed();
353 request = xbt_dynar_pop_as(reqq[smpi_process_index()],MPI_Request);
354 //if request is null here, this may mean that a previous test has succeeded
355 //Different times in traced application and replayed version may lead to this
356 //In this case, ignore the extra calls.
358 int rank = smpi_process_index();
359 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
360 extra->type=TRACING_TEST;
361 TRACE_smpi_testing_in(rank, extra);
363 flag = smpi_mpi_test(&request, &status);
365 XBT_DEBUG("MPI_Test result: %d", flag);
366 /* push back request in dynar to be caught by a subsequent wait. if the test
367 * did succeed, the request is now NULL.
369 xbt_dynar_push_as(reqq[smpi_process_index()],MPI_Request, request);
371 TRACE_smpi_testing_out(rank);
373 log_timed_action (action, clock);
// Handler registered for the "wait" action (takes no arguments).
// Pops a request from this process's reqq dynar and waits for it with
// smpi_mpi_wait. Asserts that the dynar is not empty beforehand.
376 static void action_wait(const char *const *action){
377 CHECK_ACTION_PARAMS(action, 0, 0);
378 double clock = smpi_process_simulated_elapsed();
382 xbt_assert(xbt_dynar_length(reqq[smpi_process_index()]),
383 "action wait not preceded by any irecv or isend: %s",
384 xbt_str_join_array(action," "));
385 request = xbt_dynar_pop_as(reqq[smpi_process_index()],MPI_Request);
388 /* Assuming that the trace is well formed, this mean the comm might have
389 * been caught by a MPI_test. Then just return.
394 int rank = request->comm != MPI_COMM_NULL
395 ? smpi_comm_rank(request->comm)
398 MPI_Group group = smpi_comm_group(request->comm);
399 int src_traced = smpi_group_rank(group, request->src);
400 int dst_traced = smpi_group_rank(group, request->dst);
// Read the direction now: `request` is handed to smpi_mpi_wait below and is not dereferenced afterwards.
401 int is_wait_for_receive = request->recv;
402 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
403 extra->type = TRACING_WAIT;
404 TRACE_smpi_ptp_in(rank, src_traced, dst_traced, __FUNCTION__, extra);
406 smpi_mpi_wait(&request, &status);
408 TRACE_smpi_ptp_out(rank, src_traced, dst_traced, __FUNCTION__);
// Only completed receives generate a receive event in the trace.
409 if (is_wait_for_receive)
410 TRACE_smpi_recv(rank, src_traced, dst_traced);
411 log_timed_action (action, clock);
// Handler registered for the "waitAll" action (takes no arguments).
// Waits for every request queued by this process, emits a receive trace event
// for each request that was a receive, then replaces the process's request
// dynar with a fresh empty one.
414 static void action_waitall(const char *const *action){
415 CHECK_ACTION_PARAMS(action, 0, 0);
416 double clock = smpi_process_simulated_elapsed();
417 int count_requests=0;
420 count_requests=xbt_dynar_length(reqq[smpi_process_index()]);
422 if (count_requests>0) {
// Variable-length arrays sized by the number of pending requests.
423 MPI_Request requests[count_requests];
424 MPI_Status status[count_requests];
426 /* The reqq is an array of dynars. Its index corresponds to the rank.
427 Thus each rank saves its own requests to the array request. */
428 xbt_dynar_foreach(reqq[smpi_process_index()],i,requests[i]);
430 //save information from requests
// src/dst/recv are copied before the wait and read back after it for tracing.
432 xbt_dynar_t srcs = xbt_dynar_new(sizeof(int), NULL);
433 xbt_dynar_t dsts = xbt_dynar_new(sizeof(int), NULL);
434 xbt_dynar_t recvs = xbt_dynar_new(sizeof(int), NULL);
435 for (i = 0; i < count_requests; i++) {
// NOTE(review): these dynars are created with element size sizeof(int) and
// read back with xbt_dynar_get_cpy into plain ints, so the values appear to be
// stored by copy; no release of asrc/adst/arecv is visible in this view -- confirm.
437 int *asrc = xbt_new(int, 1);
438 int *adst = xbt_new(int, 1);
439 int *arecv = xbt_new(int, 1);
440 *asrc = requests[i]->src;
441 *adst = requests[i]->dst;
442 *arecv = requests[i]->recv;
443 xbt_dynar_insert_at(srcs, i, asrc);
444 xbt_dynar_insert_at(dsts, i, adst);
445 xbt_dynar_insert_at(recvs, i, arecv);
// NOTE(review): the branch condition for the lines below is not visible in this
// view. The same `t` is inserted into all three dynars and no initialisation of
// *t is visible -- confirm the value read back for `recvs` is well defined.
450 int *t = xbt_new(int, 1);
451 xbt_dynar_insert_at(srcs, i, t);
452 xbt_dynar_insert_at(dsts, i, t);
453 xbt_dynar_insert_at(recvs, i, t);
457 int rank_traced = smpi_process_index();
458 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
459 extra->type = TRACING_WAITALL;
// The number of requests is stored in the send_size field of the trace record.
460 extra->send_size=count_requests;
461 TRACE_smpi_ptp_in(rank_traced, -1, -1, __FUNCTION__,extra);
463 smpi_mpi_waitall(count_requests, requests, status);
465 for (i = 0; i < count_requests; i++) {
466 int src_traced, dst_traced, is_wait_for_receive;
467 xbt_dynar_get_cpy(srcs, i, &src_traced);
468 xbt_dynar_get_cpy(dsts, i, &dst_traced);
469 xbt_dynar_get_cpy(recvs, i, &is_wait_for_receive);
470 if (is_wait_for_receive) {
471 TRACE_smpi_recv(rank_traced, src_traced, dst_traced);
474 TRACE_smpi_ptp_out(rank_traced, -1, -1, __FUNCTION__);
476 xbt_dynar_free(&srcs);
477 xbt_dynar_free(&dsts);
478 xbt_dynar_free(&recvs);
// Drop the old request dynar of this process and start over with an empty one.
480 int freedrank=smpi_process_index();
481 xbt_dynar_free_container(&(reqq[freedrank]));
482 reqq[freedrank]=xbt_dynar_new(sizeof(MPI_Request),&xbt_free_ref);
484 log_timed_action (action, clock);
// Handler registered for the "barrier" action.
// Runs the selected barrier implementation on MPI_COMM_WORLD between the
// collective trace-in and trace-out events, then logs the elapsed time.
487 static void action_barrier(const char *const *action){
488 double clock = smpi_process_simulated_elapsed();
489 int rank = smpi_process_index();
490 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
491 extra->type = TRACING_BARRIER;
492 TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
494 mpi_coll_barrier_fun(MPI_COMM_WORLD);
496 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
497 log_timed_action (action, clock);
// Handler registered for the "bcast" action.
// action[2] is the element count (mandatory); action[3] (root) and action[4]
// (datatype id) are optional.
501 static void action_bcast(const char *const *action)
503 CHECK_ACTION_PARAMS(action, 1, 2);
504 double size = parse_double(action[2]);
505 double clock = smpi_process_simulated_elapsed();
508 * Initialize MPI_CURRENT_TYPE in order to decrease
509 * the number of the checks
511 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
// The conditions guarding the two optional arguments are not visible in this view.
514 root= atoi(action[3]);
516 MPI_CURRENT_TYPE=decode_datatype(action[4]);
520 int rank = smpi_process_index();
// NOTE(review): this uses smpi_group_index while action_reduce derives
// root_traced with smpi_group_rank -- confirm which translation is intended.
521 int root_traced = smpi_group_index(smpi_comm_group(MPI_COMM_WORLD), root);
523 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
524 extra->type = TRACING_BCAST;
525 extra->send_size = size;
526 extra->root = root_traced;
527 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
528 TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__, extra);
// Buffer size in bytes: element count times the datatype size.
529 void *sendbuf = smpi_get_tmp_sendbuffer(size* smpi_datatype_size(MPI_CURRENT_TYPE));
531 mpi_coll_bcast_fun(sendbuf, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
533 TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
534 log_timed_action (action, clock);
// Handler registered for the "reduce" action.
// action[2] is the communication size, action[3] the computation size (flops);
// action[4] (root) and action[5] (datatype id) are optional.
// The reduction is replayed as a reduce with MPI_OP_NULL followed by
// smpi_execute_flops(comp_size).
537 static void action_reduce(const char *const *action)
539 CHECK_ACTION_PARAMS(action, 2, 2);
540 double comm_size = parse_double(action[2]);
541 double comp_size = parse_double(action[3]);
542 double clock = smpi_process_simulated_elapsed();
544 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
// The conditions guarding the two optional arguments are not visible in this view.
547 root= atoi(action[4]);
549 MPI_CURRENT_TYPE=decode_datatype(action[5]);
555 int rank = smpi_process_index();
556 int root_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), root);
557 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
558 extra->type = TRACING_REDUCE;
559 extra->send_size = comm_size;
560 extra->comp_size = comp_size;
561 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
562 extra->root = root_traced;
564 TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__,extra);
// NOTE(review): both buffers come from smpi_get_tmp_sendbuffer. If that helper
// hands out the shared sendbuffer in replay mode, recvbuf and sendbuf alias --
// confirm this is intended or whether smpi_get_tmp_recvbuffer was meant.
566 void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
567 void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
568 mpi_coll_reduce_fun(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
569 smpi_execute_flops(comp_size);
571 TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
572 log_timed_action (action, clock);
// Handler registered for the "allReduce" action.
// action[2] is the communication size, action[3] the computation size (flops),
// action[4] an optional datatype id.
// Replayed as an allreduce with MPI_OP_NULL followed by smpi_execute_flops(comp_size).
575 static void action_allReduce(const char *const *action) {
576 CHECK_ACTION_PARAMS(action, 2, 1);
577 double comm_size = parse_double(action[2]);
578 double comp_size = parse_double(action[3]);
580 if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
581 else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
583 double clock = smpi_process_simulated_elapsed();
584 int rank = smpi_process_index();
585 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
586 extra->type = TRACING_ALLREDUCE;
587 extra->send_size = comm_size;
588 extra->comp_size = comp_size;
589 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
590 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
// NOTE(review): both buffers come from smpi_get_tmp_sendbuffer. If that helper
// hands out the shared sendbuffer in replay mode, recvbuf and sendbuf alias --
// confirm this is intended or whether smpi_get_tmp_recvbuffer was meant.
592 void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
593 void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
594 mpi_coll_allreduce_fun(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
595 smpi_execute_flops(comp_size);
597 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
598 log_timed_action (action, clock);
// Handler registered for the "allToAll" action.
// action[2] is the per-peer send count, action[3] the per-peer receive count;
// action[4] and action[5] are datatype ids for the send and receive sides.
// NOTE(review): no CHECK_ACTION_PARAMS call is visible for this handler, and
// the condition choosing between decoded and default datatypes is not visible either.
601 static void action_allToAll(const char *const *action) {
602 double clock = smpi_process_simulated_elapsed();
603 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
604 int send_size = parse_double(action[2]);
605 int recv_size = parse_double(action[3]);
606 MPI_Datatype MPI_CURRENT_TYPE2;
609 MPI_CURRENT_TYPE=decode_datatype(action[4]);
610 MPI_CURRENT_TYPE2=decode_datatype(action[5]);
613 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
614 MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
// Each buffer holds one chunk per process of the communicator.
616 void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
617 void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE2));
619 int rank = smpi_process_index();
620 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
621 extra->type = TRACING_ALLTOALL;
622 extra->send_size = send_size;
623 extra->recv_size = recv_size;
624 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
625 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, NULL);
627 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
629 mpi_coll_alltoall_fun(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
631 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
632 log_timed_action (action, clock);
// Handler registered for the "gather" action.
// Two mandatory arguments (send count, receive count) and up to three optional
// ones (root, send datatype id, receive datatype id).
636 static void action_gather(const char *const *action) {
638 The structure of the gather action for the rank 0 (total 4 processes)
643 1) 68 is the sendcounts
644 2) 68 is the recvcounts
645 3) 0 is the root node
646 4) 0 is the send datatype id, see decode_datatype()
647 5) 0 is the recv datatype id, see decode_datatype()
649 CHECK_ACTION_PARAMS(action, 2, 3);
650 double clock = smpi_process_simulated_elapsed();
651 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
652 int send_size = parse_double(action[2]);
653 int recv_size = parse_double(action[3]);
654 MPI_Datatype MPI_CURRENT_TYPE2;
// The condition choosing between decoded and default datatypes is not visible in this view.
656 MPI_CURRENT_TYPE=decode_datatype(action[5]);
657 MPI_CURRENT_TYPE2=decode_datatype(action[6]);
659 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
660 MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
662 void *send = smpi_get_tmp_sendbuffer(send_size* smpi_datatype_size(MPI_CURRENT_TYPE));
666 root=atoi(action[4]);
667 int rank = smpi_comm_rank(MPI_COMM_WORLD);
// The receive buffer is sized for one chunk per process of the communicator.
// NOTE(review): the condition under which it is allocated is not visible in this view.
670 recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE2));
672 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
673 extra->type = TRACING_GATHER;
674 extra->send_size = send_size;
675 extra->recv_size = recv_size;
677 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
678 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, NULL);
680 TRACE_smpi_collective_in(smpi_process_index(), root, __FUNCTION__, extra);
682 mpi_coll_gather_fun(send, send_size, MPI_CURRENT_TYPE,
683 recv, recv_size, MPI_CURRENT_TYPE2,
684 root, MPI_COMM_WORLD);
// NOTE(review): the trace entry above passes `root`, the exit below passes -1 -- confirm.
686 TRACE_smpi_collective_out(smpi_process_index(), -1, __FUNCTION__);
687 log_timed_action (action, clock);
// Handler registered for the "gatherV" action.
// action[2] is the send count, action[3 .. 2+comm_size] are the per-process
// receive counts, action[3+comm_size] is the root, and the two following
// tokens are optional send/receive datatype ids.
// NOTE(review): the example line in the description below is spelled "gather"
// although this handler is registered as "gatherV".
692 static void action_gatherv(const char *const *action) {
694 The structure of the gatherv action for the rank 0 (total 4 processes)
696 0 gather 68 68 10 10 10 0 0 0
699 1) 68 is the sendcount
700 2) 68 10 10 10 is the recvcounts
701 3) 0 is the root node
702 4) 0 is the send datatype id, see decode_datatype()
703 5) 0 is the recv datatype id, see decode_datatype()
706 double clock = smpi_process_simulated_elapsed();
707 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
708 CHECK_ACTION_PARAMS(action, comm_size+1, 2);
709 int send_size = parse_double(action[2]);
// Displacements are zero-initialised by xbt_new0 and no later write is visible in this view.
710 int *disps = xbt_new0(int, comm_size);
711 int *recvcounts = xbt_new0(int, comm_size);
714 MPI_Datatype MPI_CURRENT_TYPE2;
715 if(action[4+comm_size]) {
716 MPI_CURRENT_TYPE=decode_datatype(action[4+comm_size]);
717 MPI_CURRENT_TYPE2=decode_datatype(action[5+comm_size]);
719 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
720 MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
722 void *send = smpi_get_tmp_sendbuffer(send_size* smpi_datatype_size(MPI_CURRENT_TYPE));
// Read the receive counts and accumulate their sum to size the receive buffer.
724 for(i=0;i<comm_size;i++) {
725 recvcounts[i] = atoi(action[i+3]);
726 recv_sum=recv_sum+recvcounts[i];
730 int root=atoi(action[3+comm_size]);
731 int rank = smpi_comm_rank(MPI_COMM_WORLD);;
// NOTE(review): the condition under which the receive buffer is allocated is not visible in this view.
734 recv = smpi_get_tmp_recvbuffer(recv_sum* smpi_datatype_size(MPI_CURRENT_TYPE2));
736 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
737 extra->type = TRACING_GATHERV;
738 extra->send_size = send_size;
739 extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
740 for(i=0; i< comm_size; i++)//copy data to avoid bad free
741 extra->recvcounts[i] = recvcounts[i];
743 extra->num_processes = comm_size;
744 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
745 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, NULL);
747 TRACE_smpi_collective_in(smpi_process_index(), root, __FUNCTION__, extra);
749 smpi_mpi_gatherv(send, send_size, MPI_CURRENT_TYPE,
750 recv, recvcounts, disps, MPI_CURRENT_TYPE2,
751 root, MPI_COMM_WORLD);
// NOTE(review): the trace entry above passes `root`, the exit below passes -1 -- confirm.
753 TRACE_smpi_collective_out(smpi_process_index(), -1, __FUNCTION__);
754 log_timed_action (action, clock);
// NOTE(review): only recvcounts is visibly released; confirm `disps` is freed on the lines not shown.
755 xbt_free(recvcounts);
// Handler registered for the "reduceScatter" action.
// action[2 .. 1+comm_size] are the per-process receive counts,
// action[2+comm_size] is the computation size, and action[3+comm_size] is an
// optional datatype id.
759 static void action_reducescatter(const char *const *action) {
762 The structure of the reducescatter action for the rank 0 (total 4 processes)
764 0 reduceScatter 275427 275427 275427 204020 11346849 0
767 1) The first four values after the name of the action declare the recvcounts array
768 2) The value 11346849 is the amount of instructions
769 3) The last value corresponds to the datatype, see decode_datatype().
771 We analyze a MPI_Reduce_scatter call to one MPI_Reduce and one MPI_Scatterv.
775 double clock = smpi_process_simulated_elapsed();
776 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
777 CHECK_ACTION_PARAMS(action, comm_size+1, 1);
778 int comp_size = parse_double(action[2+comm_size]);
779 int *recvcounts = xbt_new0(int, comm_size);
780 int *disps = xbt_new0(int, comm_size);
782 int rank = smpi_process_index();
784 if(action[3+comm_size])
785 MPI_CURRENT_TYPE=decode_datatype(action[3+comm_size]);
787 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
789 for(i=0;i<comm_size;i++) {
790 recvcounts[i] = atoi(action[i+2]);
795 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
796 extra->type = TRACING_REDUCE_SCATTER;
797 extra->send_size = 0;
798 extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
799 for(i=0; i< comm_size; i++)//copy data to avoid bad free
800 extra->recvcounts[i] = recvcounts[i];
801 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
802 extra->comp_size = comp_size;
803 extra->num_processes = comm_size;
805 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
// NOTE(review): the declaration and computation of `size` are not visible in this view.
807 void *sendbuf = smpi_get_tmp_sendbuffer(size* smpi_datatype_size(MPI_CURRENT_TYPE));
808 void *recvbuf = smpi_get_tmp_recvbuffer(size* smpi_datatype_size(MPI_CURRENT_TYPE));
810 mpi_coll_reduce_scatter_fun(sendbuf, recvbuf, recvcounts, MPI_CURRENT_TYPE, MPI_OP_NULL,
812 smpi_execute_flops(comp_size);
815 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
// NOTE(review): only recvcounts is visibly released; confirm `disps` is freed on the lines not shown.
816 xbt_free(recvcounts);
818 log_timed_action (action, clock);
// Handler registered for the "allGather" action.
// action[2] is the send count and action[3] the receive count; two optional
// datatype ids may follow.
821 static void action_allgather(const char *const *action) {
823 The structure of the allgather action for the rank 0 (total 4 processes)
825 0 allGather 275427 275427
828 1) 275427 is the sendcount
829 2) 275427 is the recvcount
830 3) No more values mean that the datatype for sent and receive buffer
831 is the default one, see decode_datatype().
835 double clock = smpi_process_simulated_elapsed();
837 CHECK_ACTION_PARAMS(action, 2, 2);
838 int sendcount=atoi(action[2]);
839 int recvcount=atoi(action[3]);
841 MPI_Datatype MPI_CURRENT_TYPE2;
// NOTE(review): action[3] was just parsed as recvcount, yet it is decoded here
// as the send datatype (and action[4] as the receive datatype). With two
// mandatory counts the optional datatype ids would be expected at action[4]
// and action[5] -- confirm. The guarding condition is not visible in this view.
844 MPI_CURRENT_TYPE = decode_datatype(action[3]);
845 MPI_CURRENT_TYPE2 = decode_datatype(action[4]);
847 MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
848 MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
850 void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* smpi_datatype_size(MPI_CURRENT_TYPE));
// NOTE(review): the receive buffer is sized for recvcount elements only,
// whereas action_gather multiplies by the communicator size -- confirm.
851 void *recvbuf = smpi_get_tmp_recvbuffer(recvcount* smpi_datatype_size(MPI_CURRENT_TYPE2));
853 int rank = smpi_process_index();
854 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
855 extra->type = TRACING_ALLGATHER;
856 extra->send_size = sendcount;
857 extra->recv_size= recvcount;
858 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
859 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, NULL);
860 extra->num_processes = smpi_comm_size(MPI_COMM_WORLD);
862 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
864 mpi_coll_allgather_fun(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcount, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
866 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
867 log_timed_action (action, clock);
// Handler registered for the "allGatherV" action.
// action[2] is the send count, action[3 .. 2+comm_size] are the per-process
// receive counts, and the two following tokens are optional send/receive
// datatype ids.
870 static void action_allgatherv(const char *const *action) {
873 The structure of the allgatherv action for the rank 0 (total 4 processes)
875 0 allGatherV 275427 275427 275427 275427 204020
878 1) 275427 is the sendcount
879 2) The next four elements declare the recvcounts array
880 3) No more values mean that the datatype for sent and receive buffer
881 is the default one, see decode_datatype().
885 double clock = smpi_process_simulated_elapsed();
887 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
888 CHECK_ACTION_PARAMS(action, comm_size+1, 2);
890 int sendcount=atoi(action[2]);
891 int *recvcounts = xbt_new0(int, comm_size);
// Displacements are zero-initialised by xbt_new0 and no later write is visible in this view.
892 int *disps = xbt_new0(int, comm_size);
894 MPI_Datatype MPI_CURRENT_TYPE2;
896 if(action[3+comm_size]) {
897 MPI_CURRENT_TYPE = decode_datatype(action[3+comm_size]);
898 MPI_CURRENT_TYPE2 = decode_datatype(action[4+comm_size]);
900 MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
901 MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
903 void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* smpi_datatype_size(MPI_CURRENT_TYPE));
// Read the receive counts and accumulate their sum to size the receive buffer.
905 for(i=0;i<comm_size;i++) {
906 recvcounts[i] = atoi(action[i+3]);
907 recv_sum=recv_sum+recvcounts[i];
909 void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* smpi_datatype_size(MPI_CURRENT_TYPE2));
911 int rank = smpi_process_index();
912 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
913 extra->type = TRACING_ALLGATHERV;
914 extra->send_size = sendcount;
915 extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
916 for(i=0; i< comm_size; i++)//copy data to avoid bad free
917 extra->recvcounts[i] = recvcounts[i];
918 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
919 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, NULL);
920 extra->num_processes = comm_size;
922 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
924 mpi_coll_allgatherv_fun(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts, disps, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
926 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
927 log_timed_action (action, clock);
// NOTE(review): only recvcounts is visibly released; confirm `disps` is freed on the lines not shown.
928 xbt_free(recvcounts);
// Handler registered for the "allToAllV" action.
// action[2] is the send buffer size, action[3 .. 2+comm_size] the send counts,
// action[3+comm_size] the receive buffer size,
// action[4+comm_size .. 3+2*comm_size] the receive counts, and the two
// following tokens are optional send/receive datatype ids.
932 static void action_allToAllv(const char *const *action) {
934 The structure of the allToAllV action for the rank 0 (total 4 processes)
936 0 allToAllV 100 1 7 10 12 100 1 70 10 5
939 1) 100 is the size of the send buffer *sizeof(int),
940 2) 1 7 10 12 is the sendcounts array
941 3) 100*sizeof(int) is the size of the receiver buffer
942 4) 1 70 10 5 is the recvcounts array
947 double clock = smpi_process_simulated_elapsed();
949 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
950 CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2);
951 int send_buf_size=0,recv_buf_size=0,i=0;
952 int *sendcounts = xbt_new0(int, comm_size);
953 int *recvcounts = xbt_new0(int, comm_size);
// Displacement arrays are zero-initialised by xbt_new0 and no later write is visible in this view.
954 int *senddisps = xbt_new0(int, comm_size);
955 int *recvdisps = xbt_new0(int, comm_size);
957 MPI_Datatype MPI_CURRENT_TYPE2;
959 send_buf_size=parse_double(action[2]);
960 recv_buf_size=parse_double(action[3+comm_size]);
961 if(action[4+2*comm_size]) {
962 MPI_CURRENT_TYPE=decode_datatype(action[4+2*comm_size]);
963 MPI_CURRENT_TYPE2=decode_datatype(action[5+2*comm_size]);
966 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
967 MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
970 void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* smpi_datatype_size(MPI_CURRENT_TYPE));
971 void *recvbuf = smpi_get_tmp_recvbuffer(recv_buf_size* smpi_datatype_size(MPI_CURRENT_TYPE2));
973 for(i=0;i<comm_size;i++) {
974 sendcounts[i] = atoi(action[i+3]);
975 recvcounts[i] = atoi(action[i+4+comm_size]);
979 int rank = smpi_process_index();
980 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
981 extra->type = TRACING_ALLTOALLV;
982 extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
983 extra->sendcounts= xbt_malloc(comm_size*sizeof(int));
984 extra->num_processes = comm_size;
// send_size/recv_size start at zero (xbt_new0) and accumulate the totals.
986 for(i=0; i< comm_size; i++){//copy data to avoid bad free
987 extra->send_size += sendcounts[i];
988 extra->sendcounts[i] = sendcounts[i];
989 extra->recv_size += recvcounts[i];
990 extra->recvcounts[i] = recvcounts[i];
992 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
993 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, NULL);
995 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
// NOTE(review): the receive side is passed MPI_CURRENT_TYPE although recvbuf
// was sized and traced with MPI_CURRENT_TYPE2 -- confirm which is intended.
997 mpi_coll_alltoallv_fun(sendbuf, sendcounts, senddisps, MPI_CURRENT_TYPE,
998 recvbuf, recvcounts, recvdisps, MPI_CURRENT_TYPE,
1001 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
1002 log_timed_action (action, clock);
1003 xbt_free(sendcounts);
1004 xbt_free(recvcounts);
1005 xbt_free(senddisps);
1006 xbt_free(recvdisps);
// Entry point of a replayed process.
// Initialises the SMPI process, switches it to replay mode, emits the INIT
// trace events, registers the action handlers and finally runs the replay
// over the command-line arguments.
// `argc`/`argv` are the pointers to the process's argument count and vector.
1009 void smpi_replay_init(int *argc, char***argv){
1010 smpi_process_init(argc, argv);
1011 smpi_process_mark_as_initialized();
1012 smpi_process_set_replaying(1);
1014 int rank = smpi_process_index();
1015 TRACE_smpi_init(rank);
1016 TRACE_smpi_computing_init(rank);
1017 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
1018 extra->type = TRACING_INIT;
1019 TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
1020 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
// The test below is true only for process index 0.
// NOTE(review): the end of this branch is not visible in this view; the
// registrations below appear to belong to it.
1022 if (!smpi_process_index()){
1023 _xbt_replay_action_init();
// Map each action keyword found in the trace to its handler.
1024 xbt_replay_action_register("init", action_init);
1025 xbt_replay_action_register("finalize", action_finalize);
1026 xbt_replay_action_register("comm_size", action_comm_size);
1027 xbt_replay_action_register("comm_split", action_comm_split);
1028 xbt_replay_action_register("comm_dup", action_comm_dup);
1029 xbt_replay_action_register("send", action_send);
1030 xbt_replay_action_register("Isend", action_Isend);
1031 xbt_replay_action_register("recv", action_recv);
1032 xbt_replay_action_register("Irecv", action_Irecv);
1033 xbt_replay_action_register("test", action_test);
1034 xbt_replay_action_register("wait", action_wait);
1035 xbt_replay_action_register("waitAll", action_waitall);
1036 xbt_replay_action_register("barrier", action_barrier);
1037 xbt_replay_action_register("bcast", action_bcast);
1038 xbt_replay_action_register("reduce", action_reduce);
1039 xbt_replay_action_register("allReduce", action_allReduce);
1040 xbt_replay_action_register("allToAll", action_allToAll);
1041 xbt_replay_action_register("allToAllV", action_allToAllv);
1042 xbt_replay_action_register("gather", action_gather);
1043 xbt_replay_action_register("gatherV", action_gatherv);
1044 xbt_replay_action_register("allGather", action_allgather);
1045 xbt_replay_action_register("allGatherV", action_allgatherv);
1046 xbt_replay_action_register("reduceScatter", action_reducescatter);
1047 xbt_replay_action_register("compute", action_compute);
1050 //if we have a delayed start, sleep here.
// The delay is read from (*argv)[2] and executed as that many flops.
// NOTE(review): the condition detecting a delayed start is not visible in this view.
1053 double value = strtod((*argv)[2], &endptr);
1054 if (*endptr != '\0')
1055 THROWF(unknown_error, 0, "%s is not a double", (*argv)[2]);
1056 XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
1057 smpi_execute_flops(value);
1059 xbt_replay_action_runner(*argc, *argv);
// Tear-down counterpart of smpi_replay_init for one process.
// Completes any request still queued for this process, releases its request
// dynar, and -- when active_processes is zero -- reports the simulated time
// and frees the shared replay buffers before finalising the SMPI process.
// NOTE(review): the statement updating active_processes and the return value
// are not visible in this view.
1062 int smpi_replay_finalize(){
1063 double sim_time= 1.;
1064 /* One active process will stop. Decrease the counter*/
1065 XBT_DEBUG("There are %lu elements in reqq[*]",
1066 xbt_dynar_length(reqq[smpi_process_index()]));
// Drain requests that the trace never waited for.
1067 if (!xbt_dynar_is_empty(reqq[smpi_process_index()])){
1068 int count_requests=xbt_dynar_length(reqq[smpi_process_index()]);
1069 MPI_Request requests[count_requests];
1070 MPI_Status status[count_requests];
1073 xbt_dynar_foreach(reqq[smpi_process_index()],i,requests[i]);
1074 smpi_mpi_waitall(count_requests, requests, status);
1080 if(!active_processes){
1081 /* Last process alive speaking */
1082 /* end the simulated timer */
1083 sim_time = smpi_process_simulated_elapsed();
1087 xbt_dynar_free_container(&(reqq[smpi_process_index()]));
1089 if(!active_processes){
1090 XBT_INFO("Simulation time %f", sim_time);
1091 _xbt_replay_action_exit();
1092 xbt_free(sendbuffer);
1093 xbt_free(recvbuffer);
1099 int rank = smpi_process_index();
1100 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
1101 extra->type = TRACING_FINALIZE;
1102 TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
1104 smpi_process_finalize();
1106 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
1107 TRACE_smpi_finalize(smpi_process_index());
1108 smpi_process_destroy();