1 /* Copyright (c) 2009-2014. The SimGrid Team.
2 * All rights reserved. */
4 /* This program is free software; you can redistribute it and/or modify it
5 * under the terms of the license (GNU LGPL) which comes with this package. */
10 #include <xbt/replay.h>
// NOTE(review): KEY_SIZE is not referenced by any of the lines visible here -- confirm it is still needed.
12 #define KEY_SIZE (sizeof(int) * 2 + 1)
// Logging sub-category "smpi_replay" (child of "smpi") used by the XBT_DEBUG/XBT_VERB/XBT_INFO calls below.
14 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
// Assigned by the "comm_size" action from the trace line's first argument.
16 int communicator_size = 0;
// Set to smpi_process_count() by the "init" action and tested in smpi_replay_finalize().
17 static int active_processes = 0;
// Per-process queues of pending MPI requests; accessed through get_reqq_self()/set_reqq_self().
18 xbt_dict_t reqq = NULL;
// Datatype used when a trace line does not name one; chosen in action_init().
20 MPI_Datatype MPI_DEFAULT_TYPE;
// Datatype of the action being replayed; every handler overwrites it.
21 MPI_Datatype MPI_CURRENT_TYPE;
// Shared temporary send buffer and the size compared against in smpi_get_tmp_sendbuffer().
23 static int sendbuffer_size=0;
24 char* sendbuffer=NULL;
// Shared temporary receive buffer and the size compared against in smpi_get_tmp_recvbuffer().
25 static int recvbuffer_size=0;
26 char* recvbuffer=NULL;
// Log one replayed action together with the simulated time it took.
//   action: NULL-terminated words of the trace line.
//   clock:  value of smpi_process_simulated_elapsed() sampled when the action started.
// The message is only built when verbose logging is enabled for smpi_replay.
28 static void log_timed_action (const char *const *action, double clock){
29 if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
// Re-assemble the trace line as a single space-separated string for the log.
30 char *name = xbt_str_join_array(action, " ");
// Duration = simulated time now minus the time sampled at the start of the action.
31 XBT_VERB("%s %f", name, smpi_process_simulated_elapsed()-clock);
// Return the dynar of pending requests that belongs to the calling process.
// The dynar is looked up in the `reqq` dictionary.
37 static xbt_dynar_t get_reqq_self(){
// The dictionary key is the process index printed as decimal text.
40 asprintf(&key, "%d", smpi_process_index());
41 xbt_dynar_t dynar_mpi_request = (xbt_dynar_t) xbt_dict_get(reqq, key);
44 return dynar_mpi_request;
// Store `mpi_request` as the pending-request dynar of the calling process.
// The entry is keyed by the process index printed as decimal text, the same key get_reqq_self() uses.
47 static void set_reqq_self(xbt_dynar_t mpi_request){
50 asprintf(&key, "%d", smpi_process_index());
// `free` is registered as the destructor for the stored value.
// NOTE(review): the stored value is a dynar; confirm that plain free() is the intended way to release it.
51 xbt_dict_set(reqq, key, mpi_request, free);
56 //allocate a single buffer for all sends, growing it if needed
// size: number of bytes the caller needs.
// Outside replay mode a fresh xbt_malloc'ed block is returned instead of the shared buffer.
57 void* smpi_get_tmp_sendbuffer(int size){
58 if (!smpi_process_get_replaying())
59 return xbt_malloc(size);
// Replay mode: enlarge the shared `sendbuffer` only when it is smaller than the request.
60 if (sendbuffer_size<size){
61 sendbuffer=xbt_realloc(sendbuffer,size);
66 //allocate a single buffer for all recv
// size: number of bytes the caller needs.
// Outside replay mode a fresh xbt_malloc'ed block is returned instead of the shared buffer.
67 void* smpi_get_tmp_recvbuffer(int size){
68 if (!smpi_process_get_replaying())
69 return xbt_malloc(size);
// Replay mode: enlarge the shared `recvbuffer` only when it is smaller than the request.
70 if (recvbuffer_size<size){
71 recvbuffer=xbt_realloc(recvbuffer,size);
// Counterpart of smpi_get_tmp_sendbuffer()/smpi_get_tmp_recvbuffer() for a buffer `buf`.
// The action taken depends on whether the process is replaying.
// NOTE(review): presumably `buf` is released only outside replay mode, where it was individually malloc'ed -- confirm.
77 void smpi_free_tmp_buffer(void* buf){
78 if (!smpi_process_get_replaying())
// Convert the text `string` to a double with strtod().
// Raises unknown_error ("<string> is not a double") through THROWF for rejected input.
// NOTE(review): the rejection test itself is presumably an endptr check, as in smpi_replay_init() -- confirm.
83 static double parse_double(const char *string)
87 value = strtod(string, &endptr);
89 THROWF(unknown_error, 0, "%s is not a double", string);
// Translate the datatype field of a trace line (`action`, a single word) into an MPI_Datatype.
// The result is stored in the global MPI_CURRENT_TYPE and also returned.
// Candidate results, in order of appearance: MPI_DOUBLE, MPI_INT, MPI_CHAR, MPI_SHORT,
// MPI_LONG, MPI_FLOAT, MPI_BYTE, and finally MPI_DEFAULT_TYPE.
// NOTE(review): which textual id selects which type is not documented here -- confirm against the trace format.
93 static MPI_Datatype decode_datatype(const char *const action)
95 // Declared datatypes,
100 MPI_CURRENT_TYPE=MPI_DOUBLE;
103 MPI_CURRENT_TYPE=MPI_INT;
106 MPI_CURRENT_TYPE=MPI_CHAR;
109 MPI_CURRENT_TYPE=MPI_SHORT;
112 MPI_CURRENT_TYPE=MPI_LONG;
115 MPI_CURRENT_TYPE=MPI_FLOAT;
118 MPI_CURRENT_TYPE=MPI_BYTE;
121 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
124 return MPI_CURRENT_TYPE;
// Produce the textual id written to output traces for `datatype`.
// `datatype` is compared against MPI_BYTE, MPI_DOUBLE, MPI_INT, MPI_CHAR, MPI_SHORT, MPI_LONG and MPI_FLOAT.
// `known` is an optional out-parameter; every caller in this file passes NULL.
// NOTE(review): presumably *known is cleared for datatypes outside the list above -- confirm.
128 const char* encode_datatype(MPI_Datatype datatype, int* known)
131 //default type for output is set to MPI_BYTE
132 // MPI_DEFAULT_TYPE is not set for output, use directly MPI_BYTE
134 if (datatype==MPI_BYTE){
137 if(datatype==MPI_DOUBLE)
139 if(datatype==MPI_INT)
141 if(datatype==MPI_CHAR)
143 if(datatype==MPI_SHORT)
145 if(datatype==MPI_LONG)
147 if(datatype==MPI_FLOAT)
149 //tell that the datatype is not handled by replay, and that its size should be measured and replayed as size*MPI_BYTE
151 // default - not implemented.
152 // do not warn here as we pass in this function even for other trace formats
// Validate the number of words on a trace line.
//   action:    NULL-terminated array of words; the first two are process id and action name.
//   mandatory: number of arguments required after those two.
//   optional:  number of additional arguments accepted.
// Raises arg_error through THROWF; the message names the calling handler via __FUNCTION__.
// NOTE(review): the macro uses a local counter `i`, so it must not be fed an expression involving a caller's `i` -- confirm.
156 #define CHECK_ACTION_PARAMS(action, mandatory, optional) {\
158 while(action[i]!=NULL)\
161 THROWF(arg_error, 0, "%s replay failed.\n" \
162 "%d items were given on the line. First two should be process_id and action. " \
163 "This action needs after them %d mandatory arguments, and accepts %d optional ones. \n" \
164 "Please contact the Simgrid team if support is needed", __FUNCTION__, i, mandatory, optional);\
// Handler for the "init" action: sets up the per-process replay state.
//   action[2] (optional): when present the default datatype is MPI_DOUBLE, otherwise MPI_BYTE.
// Also starts the simulated timer, records the number of processes and creates this process's request queue.
168 static void action_init(const char *const *action)
171 XBT_DEBUG("Initialize the counters");
172 CHECK_ACTION_PARAMS(action, 0, 1);
173 if(action[2]) MPI_DEFAULT_TYPE= MPI_DOUBLE; // default MPE datatype
174 else MPI_DEFAULT_TYPE= MPI_BYTE; // default TAU datatype
176 /* start a simulated timer */
177 smpi_process_simulated_start();
178 /*initialize the number of active processes */
179 active_processes = smpi_process_count();
// NOTE(review): two different initializations of reqq follow (dictionary-based, then array-based);
// the rest of the file uses the dictionary API, so presumably only the first is active -- confirm.
182 reqq = xbt_dict_new();
185 set_reqq_self(xbt_dynar_new(sizeof(MPI_Request),&xbt_free_ref));
188 reqq=xbt_new0(xbt_dynar_t,active_processes);
190 for(i=0;i<active_processes;i++){
191 reqq[i]=xbt_dynar_new(sizeof(MPI_Request),&xbt_free_ref);
// Handler registered for the "finalize" action in smpi_replay_init().
197 static void action_finalize(const char *const *action)
// Handler for the "comm_size" action.
//   action[2]: communicator size; parsed as a double and stored in the int communicator_size.
// NOTE(review): no CHECK_ACTION_PARAMS call is present, so action[2] is read unchecked -- confirm.
201 static void action_comm_size(const char *const *action)
203 double clock = smpi_process_simulated_elapsed();
205 communicator_size = parse_double(action[2]);
206 log_timed_action (action, clock);
// Handler for the "comm_split" action.
// Only samples the simulated clock and logs the action; no communicator operation is performed here.
209 static void action_comm_split(const char *const *action)
211 double clock = smpi_process_simulated_elapsed();
213 log_timed_action (action, clock);
// Handler for the "comm_dup" action.
// Only samples the simulated clock and logs the action; no communicator operation is performed here.
216 static void action_comm_dup(const char *const *action)
218 double clock = smpi_process_simulated_elapsed();
220 log_timed_action (action, clock);
// Handler for the "compute" action.
//   action[2] (mandatory): amount of flops to execute.
// The execution is bracketed by computing-in / computing-out trace events.
223 static void action_compute(const char *const *action)
225 CHECK_ACTION_PARAMS(action, 1, 0);
226 double clock = smpi_process_simulated_elapsed();
227 double flops= parse_double(action[2]);
228 int rank = smpi_process_index();
// Extra data attached to the trace event: event type and computation size.
229 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
230 extra->type=TRACING_COMPUTING;
231 extra->comp_size=flops;
232 TRACE_smpi_computing_in(rank, extra);
// Simulate the computation itself.
234 smpi_execute_flops(flops);
236 TRACE_smpi_computing_out(rank);
237 log_timed_action (action, clock);
// Handler for the "send" action (blocking send on MPI_COMM_WORLD, tag 0).
//   action[2] (mandatory): destination.
//   action[3] (mandatory): number of elements.
//   action[4] (optional):  datatype id, see decode_datatype(); MPI_DEFAULT_TYPE otherwise.
240 static void action_send(const char *const *action)
242 CHECK_ACTION_PARAMS(action, 2, 1);
243 int to = atoi(action[2]);
244 double size=parse_double(action[3]);
245 double clock = smpi_process_simulated_elapsed();
248 MPI_CURRENT_TYPE=decode_datatype(action[4]);
250 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
253 int rank = smpi_process_index();
255 int dst_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), to);
256 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
257 extra->type = TRACING_SEND;
258 extra->send_size = size;
260 extra->dst = dst_traced;
261 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
262 TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
// The traced message size is in bytes: element count times element size.
263 TRACE_smpi_send(rank, rank, dst_traced, size*smpi_datatype_size(MPI_CURRENT_TYPE));
// No payload buffer is supplied (NULL).
265 smpi_mpi_send(NULL, size, MPI_CURRENT_TYPE, to , 0, MPI_COMM_WORLD);
267 log_timed_action (action, clock);
269 TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
// Handler for the "Isend" action (non-blocking send on MPI_COMM_WORLD, tag 0).
//   action[2] (mandatory): destination.
//   action[3] (mandatory): number of elements.
//   action[4] (optional):  datatype id, see decode_datatype(); MPI_DEFAULT_TYPE otherwise.
// The resulting request is queued for a later "test", "wait" or "waitAll".
272 static void action_Isend(const char *const *action)
274 CHECK_ACTION_PARAMS(action, 2, 1);
275 int to = atoi(action[2]);
276 double size=parse_double(action[3]);
277 double clock = smpi_process_simulated_elapsed();
280 if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
281 else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
283 int rank = smpi_process_index();
284 int dst_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), to);
285 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
286 extra->type = TRACING_ISEND;
287 extra->send_size = size;
289 extra->dst = dst_traced;
290 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
291 TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
// The traced message size is in bytes: element count times element size.
292 TRACE_smpi_send(rank, rank, dst_traced, size*smpi_datatype_size(MPI_CURRENT_TYPE));
// No payload buffer is supplied (NULL).
294 request = smpi_mpi_isend(NULL, size, MPI_CURRENT_TYPE, to, 0,MPI_COMM_WORLD);
296 TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
// Remember the request in this process's queue.
299 xbt_dynar_push(get_reqq_self(),&request);
301 log_timed_action (action, clock);
// Handler for the "recv" action (blocking receive on MPI_COMM_WORLD, tag 0).
//   action[2] (mandatory): source.
//   action[3] (mandatory): number of elements.
//   action[4] (optional):  datatype id, see decode_datatype(); MPI_DEFAULT_TYPE otherwise.
304 static void action_recv(const char *const *action) {
305 CHECK_ACTION_PARAMS(action, 2, 1);
306 int from = atoi(action[2]);
307 double size=parse_double(action[3]);
308 double clock = smpi_process_simulated_elapsed();
311 if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
312 else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
314 int rank = smpi_process_index();
315 int src_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), from);
317 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
318 extra->type = TRACING_RECV;
319 extra->send_size = size;
320 extra->src = src_traced;
322 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
323 TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
325 // unknown size from the receiver's point of view
// NOTE(review): the probe presumably runs only when the trace gives no usable size, to learn it from the sender -- confirm.
327 smpi_mpi_probe(from, 0, MPI_COMM_WORLD, &status);
// No payload buffer is supplied (NULL).
331 smpi_mpi_recv(NULL, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD, &status);
333 TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
334 TRACE_smpi_recv(rank, src_traced, rank);
336 log_timed_action (action, clock);
// Handler for the "Irecv" action (non-blocking receive on MPI_COMM_WORLD, tag 0).
//   action[2] (mandatory): source.
//   action[3] (mandatory): number of elements.
//   action[4] (optional):  datatype id, see decode_datatype(); MPI_DEFAULT_TYPE otherwise.
// The resulting request is queued for a later "test", "wait" or "waitAll".
339 static void action_Irecv(const char *const *action)
341 CHECK_ACTION_PARAMS(action, 2, 1);
342 int from = atoi(action[2]);
343 double size=parse_double(action[3]);
344 double clock = smpi_process_simulated_elapsed();
347 if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
348 else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
350 int rank = smpi_process_index();
351 int src_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), from);
352 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
353 extra->type = TRACING_IRECV;
354 extra->send_size = size;
355 extra->src = src_traced;
357 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
358 TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
360 // unknown size from the receiver's point of view
// NOTE(review): the probe presumably runs only when the trace gives no usable size, to learn it from the sender -- confirm.
362 smpi_mpi_probe(from, 0, MPI_COMM_WORLD, &status);
// No payload buffer is supplied (NULL).
366 request = smpi_mpi_irecv(NULL, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD);
368 TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
// Remember the request in this process's queue.
370 xbt_dynar_push(get_reqq_self(),&request);
372 log_timed_action (action, clock);
// Handler for the "test" action; takes no arguments after the action name.
// Pops the most recently queued request of this process, runs smpi_mpi_test() on it
// and pushes it back so that a later wait can still find it.
375 static void action_test(const char *const *action){
376 CHECK_ACTION_PARAMS(action, 0, 0);
377 double clock = smpi_process_simulated_elapsed();
382 request = xbt_dynar_pop_as(get_reqq_self(),MPI_Request);
383 //if request is null here, this may mean that a previous test has succeeded
384 //Different times in traced application and replayed version may lead to this
385 //In this case, ignore the extra calls.
387 int rank = smpi_process_index();
388 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
389 extra->type=TRACING_TEST;
390 TRACE_smpi_testing_in(rank, extra);
392 flag = smpi_mpi_test(&request, &status);
394 XBT_DEBUG("MPI_Test result: %d", flag);
395 /* push back request in dynar to be caught by a subsequent wait. if the test
396 * did succeed, the request is now NULL.
398 xbt_dynar_push_as(get_reqq_self(),MPI_Request, request);
400 TRACE_smpi_testing_out(rank);
402 log_timed_action (action, clock);
// Handler for the "wait" action; takes no arguments after the action name.
// Pops the most recently queued request of this process and waits for it.
// Asserts that the queue is not empty, i.e. that some Isend/Irecv came first.
// A receive-trace event is emitted afterwards when the request was a receive.
405 static void action_wait(const char *const *action){
406 CHECK_ACTION_PARAMS(action, 0, 0);
407 double clock = smpi_process_simulated_elapsed();
411 xbt_assert(xbt_dynar_length(get_reqq_self()),
412 "action wait not preceded by any irecv or isend: %s",
413 xbt_str_join_array(action," "));
414 request = xbt_dynar_pop_as(get_reqq_self(),MPI_Request);
417 /* Assuming that the trace is well formed, this mean the comm might have
418 * been caught by a MPI_test. Then just return.
423 int rank = request->comm != MPI_COMM_NULL
424 ? smpi_comm_rank(request->comm)
427 MPI_Group group = smpi_comm_group(request->comm);
428 int src_traced = smpi_group_rank(group, request->src);
429 int dst_traced = smpi_group_rank(group, request->dst);
// Read the receive flag before waiting, since smpi_mpi_wait() is handed &request and may modify it.
430 int is_wait_for_receive = request->recv;
431 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
432 extra->type = TRACING_WAIT;
433 TRACE_smpi_ptp_in(rank, src_traced, dst_traced, __FUNCTION__, extra);
435 smpi_mpi_wait(&request, &status);
437 TRACE_smpi_ptp_out(rank, src_traced, dst_traced, __FUNCTION__);
438 if (is_wait_for_receive)
439 TRACE_smpi_recv(rank, src_traced, dst_traced);
440 log_timed_action (action, clock);
// Handler for the "waitAll" action; takes no arguments after the action name.
// Waits for every request currently queued for this process and emits a receive-trace
// event for each request that was flagged as a receive.
// Finally installs a fresh, empty request queue for this process.
443 static void action_waitall(const char *const *action){
444 CHECK_ACTION_PARAMS(action, 0, 0);
445 double clock = smpi_process_simulated_elapsed();
446 int count_requests=0;
449 count_requests=xbt_dynar_length(get_reqq_self());
451 if (count_requests>0) {
// Variable-length arrays sized by the number of queued requests.
452 MPI_Request requests[count_requests];
453 MPI_Status status[count_requests];
455 /* The reqq is an array of dynars. Its index corresponds to the rank.
456 Thus each rank saves its own requests to the array request. */
457 xbt_dynar_foreach(get_reqq_self(),i,requests[i]);
459 //save information from requests
// NOTE(review): src/dst/recv are presumably copied out because the requests may no longer be readable after smpi_mpi_waitall() -- confirm.
461 xbt_dynar_t srcs = xbt_dynar_new(sizeof(int), NULL);
462 xbt_dynar_t dsts = xbt_dynar_new(sizeof(int), NULL);
463 xbt_dynar_t recvs = xbt_dynar_new(sizeof(int), NULL);
464 for (i = 0; i < count_requests; i++) {
466 int *asrc = xbt_new(int, 1);
467 int *adst = xbt_new(int, 1);
468 int *arecv = xbt_new(int, 1);
469 *asrc = requests[i]->src;
470 *adst = requests[i]->dst;
471 *arecv = requests[i]->recv;
472 xbt_dynar_insert_at(srcs, i, asrc);
473 xbt_dynar_insert_at(dsts, i, adst);
474 xbt_dynar_insert_at(recvs, i, arecv);
// NOTE(review): `t` is inserted without being assigned a value first; presumably this is the branch for an empty request slot -- confirm.
479 int *t = xbt_new(int, 1);
480 xbt_dynar_insert_at(srcs, i, t);
481 xbt_dynar_insert_at(dsts, i, t);
482 xbt_dynar_insert_at(recvs, i, t);
486 int rank_traced = smpi_process_index();
487 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
488 extra->type = TRACING_WAITALL;
// For this event send_size carries the number of requests waited for.
489 extra->send_size=count_requests;
490 TRACE_smpi_ptp_in(rank_traced, -1, -1, __FUNCTION__,extra);
492 smpi_mpi_waitall(count_requests, requests, status);
// Replay the saved per-request information to emit the receive events.
494 for (i = 0; i < count_requests; i++) {
495 int src_traced, dst_traced, is_wait_for_receive;
496 xbt_dynar_get_cpy(srcs, i, &src_traced);
497 xbt_dynar_get_cpy(dsts, i, &dst_traced);
498 xbt_dynar_get_cpy(recvs, i, &is_wait_for_receive);
499 if (is_wait_for_receive) {
500 TRACE_smpi_recv(rank_traced, src_traced, dst_traced);
503 TRACE_smpi_ptp_out(rank_traced, -1, -1, __FUNCTION__);
505 xbt_dynar_free(&srcs);
506 xbt_dynar_free(&dsts);
507 xbt_dynar_free(&recvs);
509 //TODO xbt_dynar_free_container(get_reqq_self());
// Replace this process's queue with a new empty one.
510 set_reqq_self(xbt_dynar_new(sizeof(MPI_Request),&xbt_free_ref));
512 log_timed_action (action, clock);
// Handler for the "barrier" action: runs the selected barrier algorithm on MPI_COMM_WORLD,
// bracketed by collective-in / collective-out trace events. No trace arguments are read.
515 static void action_barrier(const char *const *action){
516 double clock = smpi_process_simulated_elapsed();
517 int rank = smpi_process_index();
518 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
519 extra->type = TRACING_BARRIER;
520 TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
522 mpi_coll_barrier_fun(MPI_COMM_WORLD);
524 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
525 log_timed_action (action, clock);
// Handler for the "bcast" action on MPI_COMM_WORLD.
//   action[2] (mandatory): number of elements.
//   action[3] (optional):  root.
//   action[4] (optional):  datatype id, see decode_datatype(); MPI_DEFAULT_TYPE otherwise.
529 static void action_bcast(const char *const *action)
531 CHECK_ACTION_PARAMS(action, 1, 2);
532 double size = parse_double(action[2]);
533 double clock = smpi_process_simulated_elapsed();
536 * Initialize MPI_CURRENT_TYPE in order to decrease
537 * the number of the checks
539 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
542 root= atoi(action[3]);
544 MPI_CURRENT_TYPE=decode_datatype(action[4]);
548 int rank = smpi_process_index();
// NOTE(review): this handler uses smpi_group_index() where action_reduce uses smpi_group_rank() for the traced root -- confirm which mapping is intended.
549 int root_traced = smpi_group_index(smpi_comm_group(MPI_COMM_WORLD), root);
551 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
552 extra->type = TRACING_BCAST;
553 extra->send_size = size;
554 extra->root = root_traced;
555 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
556 TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__, extra);
// Buffer sized in bytes: element count times element size.
557 void *sendbuf = smpi_get_tmp_sendbuffer(size* smpi_datatype_size(MPI_CURRENT_TYPE));
559 mpi_coll_bcast_fun(sendbuf, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
561 TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
562 log_timed_action (action, clock);
// Handler for the "reduce" action on MPI_COMM_WORLD.
//   action[2] (mandatory): number of elements communicated.
//   action[3] (mandatory): amount of flops spent on the reduction.
//   action[4] (optional):  root.
//   action[5] (optional):  datatype id, see decode_datatype(); MPI_DEFAULT_TYPE otherwise.
// The reduction is replayed with MPI_OP_NULL followed by smpi_execute_flops(comp_size).
565 static void action_reduce(const char *const *action)
567 CHECK_ACTION_PARAMS(action, 2, 2);
568 double comm_size = parse_double(action[2]);
569 double comp_size = parse_double(action[3]);
570 double clock = smpi_process_simulated_elapsed();
572 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
575 root= atoi(action[4]);
577 MPI_CURRENT_TYPE=decode_datatype(action[5]);
583 int rank = smpi_process_index();
584 int root_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), root);
585 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
586 extra->type = TRACING_REDUCE;
587 extra->send_size = comm_size;
588 extra->comp_size = comp_size;
589 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
590 extra->root = root_traced;
592 TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__,extra);
// NOTE(review): recvbuf is taken from the *send* buffer helper, so in replay mode it aliases sendbuf;
// confirm whether this is deliberate or whether smpi_get_tmp_recvbuffer() was intended.
594 void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
595 void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
596 mpi_coll_reduce_fun(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
597 smpi_execute_flops(comp_size);
599 TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
600 log_timed_action (action, clock);
// Handler for the "allReduce" action on MPI_COMM_WORLD.
//   action[2] (mandatory): number of elements communicated.
//   action[3] (mandatory): amount of flops spent on the reduction.
//   action[4] (optional):  datatype id, see decode_datatype(); MPI_DEFAULT_TYPE otherwise.
// The reduction is replayed with MPI_OP_NULL followed by smpi_execute_flops(comp_size).
603 static void action_allReduce(const char *const *action) {
604 CHECK_ACTION_PARAMS(action, 2, 1);
605 double comm_size = parse_double(action[2]);
606 double comp_size = parse_double(action[3]);
608 if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
609 else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
611 double clock = smpi_process_simulated_elapsed();
612 int rank = smpi_process_index();
613 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
614 extra->type = TRACING_ALLREDUCE;
615 extra->send_size = comm_size;
616 extra->comp_size = comp_size;
617 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
618 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
// NOTE(review): recvbuf is taken from the *send* buffer helper, so in replay mode it aliases sendbuf;
// confirm whether this is deliberate or whether smpi_get_tmp_recvbuffer() was intended.
620 void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
621 void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
622 mpi_coll_allreduce_fun(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
623 smpi_execute_flops(comp_size);
625 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
626 log_timed_action (action, clock);
// Handler for the "allToAll" action on MPI_COMM_WORLD.
//   action[2]: number of elements sent to each process.
//   action[3]: number of elements received from each process.
//   action[4], action[5]: send and receive datatype ids, see decode_datatype(); MPI_DEFAULT_TYPE otherwise.
// NOTE(review): unlike most sibling handlers, there is no CHECK_ACTION_PARAMS call here, so action[2]/action[3] are read unchecked -- confirm.
629 static void action_allToAll(const char *const *action) {
630 double clock = smpi_process_simulated_elapsed();
631 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
632 int send_size = parse_double(action[2]);
633 int recv_size = parse_double(action[3]);
634 MPI_Datatype MPI_CURRENT_TYPE2;
637 MPI_CURRENT_TYPE=decode_datatype(action[4]);
638 MPI_CURRENT_TYPE2=decode_datatype(action[5]);
641 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
642 MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
// Each buffer holds one chunk per process, hence the multiplication by comm_size.
644 void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
645 void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE2));
647 int rank = smpi_process_index();
648 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
649 extra->type = TRACING_ALLTOALL;
650 extra->send_size = send_size;
651 extra->recv_size = recv_size;
652 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
653 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, NULL);
655 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
657 mpi_coll_alltoall_fun(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
659 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
660 log_timed_action (action, clock);
// Handler for the "gather" action on MPI_COMM_WORLD.
//   action[2] (mandatory): send count.
//   action[3] (mandatory): receive count per process.
//   action[4] (optional):  root.
//   action[5], action[6] (optional): send and receive datatype ids; MPI_DEFAULT_TYPE otherwise.
664 static void action_gather(const char *const *action) {
666 The structure of the gather action for the rank 0 (total 4 processes)
671 1) 68 is the sendcounts
672 2) 68 is the recvcounts
673 3) 0 is the root node
674 4) 0 is the send datatype id, see decode_datatype()
675 5) 0 is the recv datatype id, see decode_datatype()
677 CHECK_ACTION_PARAMS(action, 2, 3);
678 double clock = smpi_process_simulated_elapsed();
679 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
680 int send_size = parse_double(action[2]);
681 int recv_size = parse_double(action[3]);
682 MPI_Datatype MPI_CURRENT_TYPE2;
684 MPI_CURRENT_TYPE=decode_datatype(action[5]);
685 MPI_CURRENT_TYPE2=decode_datatype(action[6]);
687 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
688 MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
690 void *send = smpi_get_tmp_sendbuffer(send_size* smpi_datatype_size(MPI_CURRENT_TYPE));
694 root=atoi(action[4]);
695 int rank = smpi_comm_rank(MPI_COMM_WORLD);
// The receive buffer holds one chunk per process.
// NOTE(review): presumably it is only obtained on the root rank -- confirm.
698 recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE2));
700 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
701 extra->type = TRACING_GATHER;
702 extra->send_size = send_size;
703 extra->recv_size = recv_size;
705 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
706 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, NULL);
708 TRACE_smpi_collective_in(smpi_process_index(), root, __FUNCTION__, extra);
710 mpi_coll_gather_fun(send, send_size, MPI_CURRENT_TYPE,
711 recv, recv_size, MPI_CURRENT_TYPE2,
712 root, MPI_COMM_WORLD);
// NOTE(review): the "in" event is tagged with root while the "out" event uses -1 -- confirm this asymmetry is intended.
714 TRACE_smpi_collective_out(smpi_process_index(), -1, __FUNCTION__);
715 log_timed_action (action, clock);
// Handler for the "gatherV" action on MPI_COMM_WORLD (comm_size = number of processes).
//   action[2]:                          send count.
//   action[3] .. action[2+comm_size]:   receive count for each process.
//   action[3+comm_size]:                root.
//   action[4+comm_size], action[5+comm_size] (optional): send and receive datatype ids.
// Displacements are all zero (disps is zero-initialised and not modified in the lines shown).
720 static void action_gatherv(const char *const *action) {
722 The structure of the gatherv action for the rank 0 (total 4 processes)
724 0 gather 68 68 10 10 10 0 0 0
727 1) 68 is the sendcount
728 2) 68 10 10 10 is the recvcounts
729 3) 0 is the root node
730 4) 0 is the send datatype id, see decode_datatype()
731 5) 0 is the recv datatype id, see decode_datatype()
734 double clock = smpi_process_simulated_elapsed();
735 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
736 CHECK_ACTION_PARAMS(action, comm_size+1, 2);
737 int send_size = parse_double(action[2]);
738 int *disps = xbt_new0(int, comm_size);
739 int *recvcounts = xbt_new0(int, comm_size);
742 MPI_Datatype MPI_CURRENT_TYPE2;
// NOTE(review): only the first datatype id is tested; if exactly one id is given, action[5+comm_size] is the NULL terminator -- confirm decode_datatype() tolerates that.
743 if(action[4+comm_size]) {
744 MPI_CURRENT_TYPE=decode_datatype(action[4+comm_size]);
745 MPI_CURRENT_TYPE2=decode_datatype(action[5+comm_size]);
747 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
748 MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
750 void *send = smpi_get_tmp_sendbuffer(send_size* smpi_datatype_size(MPI_CURRENT_TYPE));
// Read the per-process receive counts and accumulate their total.
752 for(i=0;i<comm_size;i++) {
753 recvcounts[i] = atoi(action[i+3]);
754 recv_sum=recv_sum+recvcounts[i];
758 int root=atoi(action[3+comm_size]);
759 int rank = smpi_comm_rank(MPI_COMM_WORLD);;
// The receive buffer is sized for the sum of all receive counts.
762 recv = smpi_get_tmp_recvbuffer(recv_sum* smpi_datatype_size(MPI_CURRENT_TYPE2));
764 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
765 extra->type = TRACING_GATHERV;
766 extra->send_size = send_size;
767 extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
768 for(i=0; i< comm_size; i++)//copy data to avoid bad free
769 extra->recvcounts[i] = recvcounts[i];
771 extra->num_processes = comm_size;
772 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
773 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, NULL);
775 TRACE_smpi_collective_in(smpi_process_index(), root, __FUNCTION__, extra);
777 smpi_mpi_gatherv(send, send_size, MPI_CURRENT_TYPE,
778 recv, recvcounts, disps, MPI_CURRENT_TYPE2,
779 root, MPI_COMM_WORLD);
781 TRACE_smpi_collective_out(smpi_process_index(), -1, __FUNCTION__);
782 log_timed_action (action, clock);
783 xbt_free(recvcounts);
// Handler for the "reduceScatter" action on MPI_COMM_WORLD (comm_size = number of processes).
//   action[2] .. action[1+comm_size]: receive count for each process.
//   action[2+comm_size]:              amount of flops spent on the reduction.
//   action[3+comm_size] (optional):   datatype id, see decode_datatype(); MPI_DEFAULT_TYPE otherwise.
// The reduction is replayed with MPI_OP_NULL followed by smpi_execute_flops(comp_size).
787 static void action_reducescatter(const char *const *action) {
790 The structure of the reducescatter action for the rank 0 (total 4 processes)
792 0 reduceScatter 275427 275427 275427 204020 11346849 0
795 1) The first four values after the name of the action declare the recvcounts array
796 2) The value 11346849 is the amount of instructions
797 3) The last value corresponds to the datatype, see decode_datatype().
799 We analyze a MPI_Reduce_scatter call to one MPI_Reduce and one MPI_Scatterv.
803 double clock = smpi_process_simulated_elapsed();
804 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
805 CHECK_ACTION_PARAMS(action, comm_size+1, 1);
806 int comp_size = parse_double(action[2+comm_size]);
807 int *recvcounts = xbt_new0(int, comm_size);
808 int *disps = xbt_new0(int, comm_size);
810 int rank = smpi_process_index();
812 if(action[3+comm_size])
813 MPI_CURRENT_TYPE=decode_datatype(action[3+comm_size]);
815 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
817 for(i=0;i<comm_size;i++) {
818 recvcounts[i] = atoi(action[i+2]);
823 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
824 extra->type = TRACING_REDUCE_SCATTER;
825 extra->send_size = 0;
826 extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
827 for(i=0; i< comm_size; i++)//copy data to avoid bad free
828 extra->recvcounts[i] = recvcounts[i];
829 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
830 extra->comp_size = comp_size;
831 extra->num_processes = comm_size;
833 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
// NOTE(review): `size` is presumably the sum of the receive counts -- confirm.
835 void *sendbuf = smpi_get_tmp_sendbuffer(size* smpi_datatype_size(MPI_CURRENT_TYPE));
836 void *recvbuf = smpi_get_tmp_recvbuffer(size* smpi_datatype_size(MPI_CURRENT_TYPE));
838 mpi_coll_reduce_scatter_fun(sendbuf, recvbuf, recvcounts, MPI_CURRENT_TYPE, MPI_OP_NULL,
840 smpi_execute_flops(comp_size);
843 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
844 xbt_free(recvcounts);
846 log_timed_action (action, clock);
849 static void action_allgather(const char *const *action) {
851 The structure of the allgather action for the rank 0 (total 4 processes)
853 0 allGather 275427 275427
856 1) 275427 is the sendcount
857 2) 275427 is the recvcount
858 3) No more values mean that the datatype for sent and receive buffer
859 is the default one, see decode_datatype().
863 double clock = smpi_process_simulated_elapsed();
865 CHECK_ACTION_PARAMS(action, 2, 2);
866 int sendcount=atoi(action[2]);
867 int recvcount=atoi(action[3]);
869 MPI_Datatype MPI_CURRENT_TYPE2;
872 MPI_CURRENT_TYPE = decode_datatype(action[3]);
873 MPI_CURRENT_TYPE2 = decode_datatype(action[4]);
875 MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
876 MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
878 void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* smpi_datatype_size(MPI_CURRENT_TYPE));
879 void *recvbuf = smpi_get_tmp_recvbuffer(recvcount* smpi_datatype_size(MPI_CURRENT_TYPE2));
881 int rank = smpi_process_index();
882 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
883 extra->type = TRACING_ALLGATHER;
884 extra->send_size = sendcount;
885 extra->recv_size= recvcount;
886 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
887 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, NULL);
888 extra->num_processes = smpi_comm_size(MPI_COMM_WORLD);
890 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
892 mpi_coll_allgather_fun(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcount, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
894 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
895 log_timed_action (action, clock);
// Handler for the "allGatherV" action on MPI_COMM_WORLD (comm_size = number of processes).
//   action[2]:                          send count.
//   action[3] .. action[2+comm_size]:   receive count for each process.
//   action[3+comm_size], action[4+comm_size] (optional): send and receive datatype ids.
// Displacements are all zero (disps is zero-initialised and not modified in the lines shown).
898 static void action_allgatherv(const char *const *action) {
901 The structure of the allgatherv action for the rank 0 (total 4 processes)
903 0 allGatherV 275427 275427 275427 275427 204020
906 1) 275427 is the sendcount
907 2) The next four elements declare the recvcounts array
908 3) No more values mean that the datatype for sent and receive buffer
909 is the default one, see decode_datatype().
913 double clock = smpi_process_simulated_elapsed();
915 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
916 CHECK_ACTION_PARAMS(action, comm_size+1, 2);
918 int sendcount=atoi(action[2]);
919 int *recvcounts = xbt_new0(int, comm_size);
920 int *disps = xbt_new0(int, comm_size);
922 MPI_Datatype MPI_CURRENT_TYPE2;
// NOTE(review): only the first datatype id is tested; if exactly one id is given, action[4+comm_size] is the NULL terminator -- confirm decode_datatype() tolerates that.
924 if(action[3+comm_size]) {
925 MPI_CURRENT_TYPE = decode_datatype(action[3+comm_size]);
926 MPI_CURRENT_TYPE2 = decode_datatype(action[4+comm_size]);
928 MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
929 MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
931 void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* smpi_datatype_size(MPI_CURRENT_TYPE));
// Read the per-process receive counts and accumulate their total.
933 for(i=0;i<comm_size;i++) {
934 recvcounts[i] = atoi(action[i+3]);
935 recv_sum=recv_sum+recvcounts[i];
// The receive buffer is sized for the sum of all receive counts.
937 void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* smpi_datatype_size(MPI_CURRENT_TYPE2));
939 int rank = smpi_process_index();
940 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
941 extra->type = TRACING_ALLGATHERV;
942 extra->send_size = sendcount;
943 extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
944 for(i=0; i< comm_size; i++)//copy data to avoid bad free
945 extra->recvcounts[i] = recvcounts[i];
946 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
947 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, NULL);
948 extra->num_processes = comm_size;
950 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
952 mpi_coll_allgatherv_fun(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts, disps, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
954 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
955 log_timed_action (action, clock);
956 xbt_free(recvcounts);
960 static void action_allToAllv(const char *const *action) {
962 The structure of the allToAllV action for the rank 0 (total 4 processes)
964 0 allToAllV 100 1 7 10 12 100 1 70 10 5
967 1) 100 is the size of the send buffer *sizeof(int),
968 2) 1 7 10 12 is the sendcounts array
969 3) 100*sizeof(int) is the size of the receiver buffer
970 4) 1 70 10 5 is the recvcounts array
975 double clock = smpi_process_simulated_elapsed();
977 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
978 CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2);
979 int send_buf_size=0,recv_buf_size=0,i=0;
980 int *sendcounts = xbt_new0(int, comm_size);
981 int *recvcounts = xbt_new0(int, comm_size);
982 int *senddisps = xbt_new0(int, comm_size);
983 int *recvdisps = xbt_new0(int, comm_size);
985 MPI_Datatype MPI_CURRENT_TYPE2;
987 send_buf_size=parse_double(action[2]);
988 recv_buf_size=parse_double(action[3+comm_size]);
989 if(action[4+2*comm_size]) {
990 MPI_CURRENT_TYPE=decode_datatype(action[4+2*comm_size]);
991 MPI_CURRENT_TYPE2=decode_datatype(action[5+2*comm_size]);
994 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
995 MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
998 void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* smpi_datatype_size(MPI_CURRENT_TYPE));
999 void *recvbuf = smpi_get_tmp_recvbuffer(recv_buf_size* smpi_datatype_size(MPI_CURRENT_TYPE2));
1001 for(i=0;i<comm_size;i++) {
1002 sendcounts[i] = atoi(action[i+3]);
1003 recvcounts[i] = atoi(action[i+4+comm_size]);
1007 int rank = smpi_process_index();
1008 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
1009 extra->type = TRACING_ALLTOALLV;
1010 extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
1011 extra->sendcounts= xbt_malloc(comm_size*sizeof(int));
1012 extra->num_processes = comm_size;
1014 for(i=0; i< comm_size; i++){//copy data to avoid bad free
1015 extra->send_size += sendcounts[i];
1016 extra->sendcounts[i] = sendcounts[i];
1017 extra->recv_size += recvcounts[i];
1018 extra->recvcounts[i] = recvcounts[i];
1020 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
1021 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, NULL);
1023 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
1025 mpi_coll_alltoallv_fun(sendbuf, sendcounts, senddisps, MPI_CURRENT_TYPE,
1026 recvbuf, recvcounts, recvdisps, MPI_CURRENT_TYPE,
1029 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
1030 log_timed_action (action, clock);
1031 xbt_free(sendcounts);
1032 xbt_free(recvcounts);
1033 xbt_free(senddisps);
1034 xbt_free(recvdisps);
// Entry point of a replayed process.
//   argc, argv: pointers to the process arguments; they are forwarded to smpi_process_init()
//               and, dereferenced, to xbt_replay_action_runner().
// Steps: initialise the SMPI process and mark it as replaying, emit the init trace events,
// register one handler per trace action name, optionally delay the start, then run the trace.
1037 void smpi_replay_init(int *argc, char***argv){
1038 smpi_process_init(argc, argv);
1039 smpi_process_mark_as_initialized();
1040 smpi_process_set_replaying(1);
1042 int rank = smpi_process_index();
1043 TRACE_smpi_init(rank);
1044 TRACE_smpi_computing_init(rank);
1045 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
1046 extra->type = TRACING_INIT;
1047 TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
1048 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
// Map each action name found in trace files to its handler.
1051 _xbt_replay_action_init();
1052 xbt_replay_action_register("init", action_init);
1053 xbt_replay_action_register("finalize", action_finalize);
1054 xbt_replay_action_register("comm_size", action_comm_size);
1055 xbt_replay_action_register("comm_split", action_comm_split);
1056 xbt_replay_action_register("comm_dup", action_comm_dup);
1057 xbt_replay_action_register("send", action_send);
1058 xbt_replay_action_register("Isend", action_Isend);
1059 xbt_replay_action_register("recv", action_recv);
1060 xbt_replay_action_register("Irecv", action_Irecv);
1061 xbt_replay_action_register("test", action_test);
1062 xbt_replay_action_register("wait", action_wait);
1063 xbt_replay_action_register("waitAll", action_waitall);
1064 xbt_replay_action_register("barrier", action_barrier);
1065 xbt_replay_action_register("bcast", action_bcast);
1066 xbt_replay_action_register("reduce", action_reduce);
1067 xbt_replay_action_register("allReduce", action_allReduce);
1068 xbt_replay_action_register("allToAll", action_allToAll);
1069 xbt_replay_action_register("allToAllV", action_allToAllv);
1070 xbt_replay_action_register("gather", action_gather);
1071 xbt_replay_action_register("gatherV", action_gatherv);
1072 xbt_replay_action_register("allGather", action_allgather);
1073 xbt_replay_action_register("allGatherV", action_allgatherv);
1074 xbt_replay_action_register("reduceScatter", action_reducescatter);
1075 xbt_replay_action_register("compute", action_compute);
1078 //if we have a delayed start, sleep here.
// The delay is read from (*argv)[2] as a number of flops and rejected if it has trailing characters.
1081 double value = strtod((*argv)[2], &endptr);
1082 if (*endptr != '\0')
1083 THROWF(unknown_error, 0, "%s is not a double", (*argv)[2]);
1084 XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
1085 smpi_execute_flops(value);
1087 //UGLY done to force context switch to be sure that all MSG_processes begin initialization
1088 XBT_VERB("Force context switch by smpi_execute_flops - Sleeping for 0.0 flops ");
1089 smpi_execute_flops(0.0);
// Hand control to the replay engine, which dispatches trace lines to the handlers registered above.
1092 xbt_replay_action_runner(*argc, *argv);
// Shut down a replayed process.
// Waits for any requests still queued for this process, then emits the finalize trace events
// and finalizes/destroys the SMPI process.
// When active_processes is zero, this call also prints the simulated time and releases the
// shared buffers and the request dictionary.
1095 int smpi_replay_finalize(){
1096 double sim_time= 1.;
1097 /* One active process will stop. Decrease the counter*/
1098 XBT_DEBUG("There are %lu elements in reqq[*]",
1099 xbt_dynar_length(get_reqq_self()));
// Drain requests that the trace never waited for.
1100 if (!xbt_dynar_is_empty(get_reqq_self())){
1101 int count_requests=xbt_dynar_length(get_reqq_self());
1102 MPI_Request requests[count_requests];
1103 MPI_Status status[count_requests];
1106 xbt_dynar_foreach(get_reqq_self(),i,requests[i]);
1107 smpi_mpi_waitall(count_requests, requests, status);
1113 if(!active_processes){
1114 /* Last process alive speaking */
1115 /* end the simulated timer */
1116 sim_time = smpi_process_simulated_elapsed();
1120 //TODO xbt_dynar_free_container(get_reqq_self()));
// Global teardown, performed only when no process is left active.
1122 if(!active_processes){
1123 XBT_INFO("Simulation time %f", sim_time);
1124 _xbt_replay_action_exit();
1125 xbt_free(sendbuffer);
1126 xbt_free(recvbuffer);
1128 xbt_dict_free(&reqq); //not need, data have been freed ???
1133 int rank = smpi_process_index();
1134 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
1135 extra->type = TRACING_FINALIZE;
1136 TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
1138 smpi_process_finalize();
1140 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
1141 TRACE_smpi_finalize(smpi_process_index());
1142 smpi_process_destroy();