1 /* Copyright (c) 2009-2015. The SimGrid Team.
2 * All rights reserved. */
4 /* This program is free software; you can redistribute it and/or modify it
5 * under the terms of the license (GNU LGPL) which comes with this package. */
10 #include <xbt/replay.h>
// Byte count equal to two characters per byte of an int, plus one.
// NOTE(review): no use of KEY_SIZE is visible in this part of the file - confirm it is still needed.
12 #define KEY_SIZE (sizeof(int) * 2 + 1)
14 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
// Assigned by action_comm_size from the first argument of the comm_size action.
16 int communicator_size = 0;
// Assigned smpi_process_count() by action_init; smpi_replay_run tests it against zero at shutdown.
17 static int active_processes = 0;
// Container of pending-request dynars, looked up through get_reqq_self and set_reqq_self.
18 xbt_dict_t reqq = NULL;
// Datatype applied when an action line carries no datatype argument; chosen in action_init.
20 MPI_Datatype MPI_DEFAULT_TYPE;
// Datatype selected for the action currently being replayed.
21 MPI_Datatype MPI_CURRENT_TYPE;
// Size bookkeeping and storage for the shared buffer handed out by smpi_get_tmp_sendbuffer.
23 static int sendbuffer_size=0;
24 char* sendbuffer=NULL;
// Size bookkeeping and storage for the shared buffer handed out by smpi_get_tmp_recvbuffer.
25 static int recvbuffer_size=0;
26 char* recvbuffer=NULL;
// Emits, at verbose priority only, the words of the action joined by single
// spaces, followed by the simulated time elapsed since clock.
//   action: array of the words of the replayed trace line
//   clock:  value of smpi_process_simulated_elapsed() sampled when the action began
28 static void log_timed_action (const char *const *action, double clock){
// The string is only built when verbose logging is enabled for this category.
29 if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
30 char *name = xbt_str_join_array(action, " ");
31 XBT_VERB("%s %f", name, smpi_process_simulated_elapsed()-clock);
// Returns the dynar of pending requests stored in reqq for the calling process.
37 static xbt_dynar_t get_reqq_self(){
// The dictionary key is the process index printed in decimal.
40 int size = asprintf(&key, "%d", smpi_process_index());
// Fatal path for a failed asprintf (presumably guarded by a test on size - confirm).
42 xbt_die("could not allocate memory for asprintf");
43 xbt_dynar_t dynar_mpi_request = (xbt_dynar_t) xbt_dict_get(reqq, key);
46 return dynar_mpi_request;
// Stores mpi_request in reqq as the pending-request dynar of the calling process.
49 static void set_reqq_self(xbt_dynar_t mpi_request){
// The dictionary key is the process index printed in decimal.
52 int size = asprintf(&key, "%d", smpi_process_index());
// Fatal path for a failed asprintf (presumably guarded by a test on size - confirm).
54 xbt_die("could not allocate memory for asprintf");
// NOTE(review): plain free is registered as the destructor although the stored
// value is an xbt_dynar_t - confirm this is the intended way to release it.
55 xbt_dict_set(reqq, key, mpi_request, free);
60 //allocate a single buffer for all sends, growing it if needed
// Outside replay mode each call returns its own xbt_malloc(size) block.
// In replay mode the shared sendbuffer is enlarged with xbt_realloc whenever
// sendbuffer_size is smaller than size (presumably the shared buffer is then
// returned - confirm).
61 void* smpi_get_tmp_sendbuffer(int size){
62 if (!smpi_process_get_replaying())
63 return xbt_malloc(size);
64 if (sendbuffer_size<size){
// xbt_realloc may move the buffer, so pointers obtained from earlier calls can become stale.
65 sendbuffer=xbt_realloc(sendbuffer,size);
70 //allocate a single buffer for all recv
// Outside replay mode each call returns its own xbt_malloc(size) block.
// In replay mode the shared recvbuffer is enlarged with xbt_realloc whenever
// recvbuffer_size is smaller than size (presumably the shared buffer is then
// returned - confirm).
71 void* smpi_get_tmp_recvbuffer(int size){
72 if (!smpi_process_get_replaying())
73 return xbt_malloc(size);
74 if (recvbuffer_size<size){
// xbt_realloc may move the buffer, so pointers obtained from earlier calls can become stale.
75 recvbuffer=xbt_realloc(recvbuffer,size);
// Counterpart of smpi_get_tmp_sendbuffer and smpi_get_tmp_recvbuffer.
// It only acts when the process is not replaying (presumably by releasing buf,
// which in that mode came from xbt_malloc - confirm).
81 void smpi_free_tmp_buffer(void* buf){
82 if (!smpi_process_get_replaying())
// Converts string to a double with strtod.
// Throws unknown_error with the offending text when the input is rejected
// (presumably when endptr shows unparsed characters - confirm).
87 static double parse_double(const char *string)
91 value = strtod(string, &endptr);
93 THROWF(unknown_error, 0, "%s is not a double", string);
// Selects the MPI datatype designated by the trace token action, stores it in
// the global MPI_CURRENT_TYPE and returns it.
// NOTE(review): the token-to-type mapping presumably follows the order of the
// assignments below, with MPI_DEFAULT_TYPE as the fallback - confirm against
// the selecting statement.
97 static MPI_Datatype decode_datatype(const char *const action)
99 // Declared datatypes,
104 MPI_CURRENT_TYPE=MPI_DOUBLE;
107 MPI_CURRENT_TYPE=MPI_INT;
110 MPI_CURRENT_TYPE=MPI_CHAR;
113 MPI_CURRENT_TYPE=MPI_SHORT;
116 MPI_CURRENT_TYPE=MPI_LONG;
119 MPI_CURRENT_TYPE=MPI_FLOAT;
122 MPI_CURRENT_TYPE=MPI_BYTE;
125 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
// The chosen type is also left in the global for callers that read it afterwards.
128 return MPI_CURRENT_TYPE;
// Reverse of decode_datatype: maps an MPI datatype to the string written in
// traces. The callers in this file pass NULL for known.
// NOTE(review): known presumably reports whether the datatype is handled by
// replay - confirm how it is written.
132 const char* encode_datatype(MPI_Datatype datatype, int* known)
135 //default type for output is set to MPI_BYTE
136 // MPI_DEFAULT_TYPE is not set for output, use directly MPI_BYTE
138 if (datatype==MPI_BYTE){
141 if(datatype==MPI_DOUBLE)
143 if(datatype==MPI_INT)
145 if(datatype==MPI_CHAR)
147 if(datatype==MPI_SHORT)
149 if(datatype==MPI_LONG)
151 if(datatype==MPI_FLOAT)
153 //tell that the datatype is not handled by replay, and that its size should be measured and replayed as size*MPI_BYTE
155 // default - not implemented.
156 // do not warn here as we pass in this function even for other trace formats
// Argument-count guard used at the top of the action handlers.
// It walks action up to its NULL terminator and throws arg_error, naming the
// calling function, when the number of items does not fit the declared counts.
//   mandatory: number of required arguments after process_id and action name
//   optional:  number of additional arguments that may follow
// NOTE(review): the exact comparison is not shown on the lines below - confirm
// whether an excess of arguments is rejected as well as a shortage.
160 #define CHECK_ACTION_PARAMS(action, mandatory, optional) {\
162 while(action[i]!=NULL)\
165 THROWF(arg_error, 0, "%s replay failed.\n" \
166 "%d items were given on the line. First two should be process_id and action. " \
167 "This action needs after them %d mandatory arguments, and accepts %d optional ones. \n" \
168 "Please contact the Simgrid team if support is needed", __FUNCTION__, i, mandatory, optional);\
// Handler of the init action: picks the default datatype, starts the simulated
// timer, records the number of processes and creates the request storage.
172 static void action_init(const char *const *action)
174 XBT_DEBUG("Initialize the counters");
175 CHECK_ACTION_PARAMS(action, 0, 1);
// Any extra argument on the init line switches the default type to MPI_DOUBLE.
176 if(action[2]) MPI_DEFAULT_TYPE= MPI_DOUBLE; // default MPE datatype
177 else MPI_DEFAULT_TYPE= MPI_BYTE; // default TAU datatype
179 /* start a simulated timer */
180 smpi_process_simulated_start();
181 /*initialize the number of active processes */
182 active_processes = smpi_process_count();
// NOTE(review): reqq is created as a dictionary here but indexed as an array of
// dynars further down; these look like alternative code paths - confirm which
// one is actually compiled.
185 reqq = xbt_dict_new();
188 set_reqq_self(xbt_dynar_new(sizeof(MPI_Request),&xbt_free_ref));
191 reqq=xbt_new0(xbt_dynar_t,active_processes);
193 for(i=0;i<active_processes;i++){
194 reqq[i]=xbt_dynar_new(sizeof(MPI_Request),&xbt_free_ref);
// Handler registered for the finalize action in smpi_replay_run.
200 static void action_finalize(const char *const *action)
// Handler of the comm_size action: stores the value of action[2] in the global
// communicator_size and logs the elapsed simulated time.
204 static void action_comm_size(const char *const *action)
206 double clock = smpi_process_simulated_elapsed();
// The parsed double is converted to int by the assignment.
208 communicator_size = parse_double(action[2]);
209 log_timed_action (action, clock);
// Handler of the comm_split action: the visible code only samples the
// simulated clock and logs the action.
212 static void action_comm_split(const char *const *action)
214 double clock = smpi_process_simulated_elapsed();
216 log_timed_action (action, clock);
// Handler of the comm_dup action: the visible code only samples the simulated
// clock and logs the action.
219 static void action_comm_dup(const char *const *action)
221 double clock = smpi_process_simulated_elapsed();
223 log_timed_action (action, clock);
// Handler of the compute action: executes the amount of flops given in
// action[2], bracketed by computing trace events.
226 static void action_compute(const char *const *action)
228 CHECK_ACTION_PARAMS(action, 1, 0);
229 double clock = smpi_process_simulated_elapsed();
230 double flops= parse_double(action[2]);
231 int rank = smpi_process_index();
// extra is handed over to the tracing call below.
232 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
233 extra->type=TRACING_COMPUTING;
234 extra->comp_size=flops;
235 TRACE_smpi_computing_in(rank, extra);
237 smpi_execute_flops(flops);
239 TRACE_smpi_computing_out(rank);
240 log_timed_action (action, clock);
// Handler of the send action (blocking send on MPI_COMM_WORLD with tag 0).
//   action[2]: destination
//   action[3]: element count
//   action[4]: optional datatype token, MPI_DEFAULT_TYPE otherwise
243 static void action_send(const char *const *action)
245 CHECK_ACTION_PARAMS(action, 2, 1);
246 int to = atoi(action[2]);
247 double size=parse_double(action[3]);
248 double clock = smpi_process_simulated_elapsed();
251 MPI_CURRENT_TYPE=decode_datatype(action[4]);
253 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
256 int rank = smpi_process_index();
258 int dst_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), to);
259 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
260 extra->type = TRACING_SEND;
261 extra->send_size = size;
263 extra->dst = dst_traced;
264 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
265 TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
// The traced volume is the element count multiplied by the datatype size.
266 if (!TRACE_smpi_view_internals()) {
267 TRACE_smpi_send(rank, rank, dst_traced, size*smpi_datatype_size(MPI_CURRENT_TYPE));
// No payload buffer is supplied: the send is issued with a NULL buffer.
270 smpi_mpi_send(NULL, size, MPI_CURRENT_TYPE, to , 0, MPI_COMM_WORLD);
272 log_timed_action (action, clock);
274 TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
// Handler of the Isend action (non-blocking send on MPI_COMM_WORLD with tag 0).
// The resulting request is queued for a later test, wait or waitAll action.
//   action[2]: destination
//   action[3]: element count
//   action[4]: optional datatype token, MPI_DEFAULT_TYPE otherwise
277 static void action_Isend(const char *const *action)
279 CHECK_ACTION_PARAMS(action, 2, 1);
280 int to = atoi(action[2]);
281 double size=parse_double(action[3]);
282 double clock = smpi_process_simulated_elapsed();
285 if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
286 else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
288 int rank = smpi_process_index();
289 int dst_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), to);
290 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
291 extra->type = TRACING_ISEND;
292 extra->send_size = size;
294 extra->dst = dst_traced;
295 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
296 TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
// The traced volume is the element count multiplied by the datatype size.
297 if (!TRACE_smpi_view_internals()) {
298 TRACE_smpi_send(rank, rank, dst_traced, size*smpi_datatype_size(MPI_CURRENT_TYPE));
301 request = smpi_mpi_isend(NULL, size, MPI_CURRENT_TYPE, to, 0,MPI_COMM_WORLD);
303 TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
// Remember the request in the queue of the calling process.
306 xbt_dynar_push(get_reqq_self(),&request);
308 log_timed_action (action, clock);
// Handler of the recv action (blocking receive on MPI_COMM_WORLD with tag 0).
//   action[2]: source
//   action[3]: element count
//   action[4]: optional datatype token, MPI_DEFAULT_TYPE otherwise
311 static void action_recv(const char *const *action) {
312 CHECK_ACTION_PARAMS(action, 2, 1);
313 int from = atoi(action[2]);
314 double size=parse_double(action[3]);
315 double clock = smpi_process_simulated_elapsed();
318 if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
319 else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
321 int rank = smpi_process_index();
322 int src_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), from);
324 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
325 extra->type = TRACING_RECV;
// The received count is recorded in the send_size field of the trace record.
326 extra->send_size = size;
327 extra->src = src_traced;
329 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
330 TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
332 //unknown size from the receiver point of view
// NOTE(review): the probe presumably runs only when the trace gives no usable
// size, and size is then taken from status - confirm.
334 smpi_mpi_probe(from, 0, MPI_COMM_WORLD, &status);
338 smpi_mpi_recv(NULL, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD, &status);
340 TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
341 if (!TRACE_smpi_view_internals()) {
342 TRACE_smpi_recv(rank, src_traced, rank);
345 log_timed_action (action, clock);
// Handler of the Irecv action (non-blocking receive on MPI_COMM_WORLD, tag 0).
// The resulting request is queued for a later test, wait or waitAll action.
//   action[2]: source
//   action[3]: element count
//   action[4]: optional datatype token, MPI_DEFAULT_TYPE otherwise
348 static void action_Irecv(const char *const *action)
350 CHECK_ACTION_PARAMS(action, 2, 1);
351 int from = atoi(action[2]);
352 double size=parse_double(action[3]);
353 double clock = smpi_process_simulated_elapsed();
356 if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
357 else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
359 int rank = smpi_process_index();
360 int src_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), from);
361 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
362 extra->type = TRACING_IRECV;
// The received count is recorded in the send_size field of the trace record.
363 extra->send_size = size;
364 extra->src = src_traced;
366 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
367 TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
369 //unknown size from the receiver point of view
// NOTE(review): the probe presumably runs only when the trace gives no usable
// size, and size is then taken from status - confirm.
371 smpi_mpi_probe(from, 0, MPI_COMM_WORLD, &status);
375 request = smpi_mpi_irecv(NULL, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD);
377 TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
// Remember the request in the queue of the calling process.
379 xbt_dynar_push(get_reqq_self(),&request);
381 log_timed_action (action, clock);
// Handler of the test action: pops the most recently queued request of the
// calling process, runs smpi_mpi_test on it and pushes the request back.
384 static void action_test(const char *const *action){
385 CHECK_ACTION_PARAMS(action, 0, 0);
386 double clock = smpi_process_simulated_elapsed();
391 request = xbt_dynar_pop_as(get_reqq_self(),MPI_Request);
392 //if request is null here, this may mean that a previous test has succeeded
393 //Different times in traced application and replayed version may lead to this
394 //In this case, ignore the extra calls.
396 int rank = smpi_process_index();
397 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
398 extra->type=TRACING_TEST;
399 TRACE_smpi_testing_in(rank, extra);
401 flag = smpi_mpi_test(&request, &status);
403 XBT_DEBUG("MPI_Test result: %d", flag);
404 /* push back request in dynar to be caught by a subsequent wait. if the test
405 * did succeed, the request is now NULL.
407 xbt_dynar_push_as(get_reqq_self(),MPI_Request, request);
409 TRACE_smpi_testing_out(rank);
411 log_timed_action (action, clock);
// Handler of the wait action: pops the most recently queued request of the
// calling process and waits for it, emitting point-to-point trace events.
414 static void action_wait(const char *const *action){
415 CHECK_ACTION_PARAMS(action, 0, 0);
416 double clock = smpi_process_simulated_elapsed();
// A wait with an empty queue is treated as a malformed trace.
// NOTE(review): xbt_str_join_array builds a new string for the message
// argument - confirm whether it is evaluated, and leaked, when the assertion holds.
420 xbt_assert(xbt_dynar_length(get_reqq_self()),
421 "action wait not preceded by any irecv or isend: %s",
422 xbt_str_join_array(action," "));
423 request = xbt_dynar_pop_as(get_reqq_self(),MPI_Request);
426 /* Assuming that the trace is well formed, this mean the comm might have
427 * been caught by a MPI_test. Then just return.
432 int rank = request->comm != MPI_COMM_NULL
433 ? smpi_comm_rank(request->comm)
436 MPI_Group group = smpi_comm_group(request->comm);
437 int src_traced = smpi_group_rank(group, request->src);
438 int dst_traced = smpi_group_rank(group, request->dst);
439 int is_wait_for_receive = request->recv;
440 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
441 extra->type = TRACING_WAIT;
442 TRACE_smpi_ptp_in(rank, src_traced, dst_traced, __FUNCTION__, extra);
444 smpi_mpi_wait(&request, &status);
446 TRACE_smpi_ptp_out(rank, src_traced, dst_traced, __FUNCTION__);
447 if (is_wait_for_receive)
448 TRACE_smpi_recv(rank, src_traced, dst_traced);
449 log_timed_action (action, clock);
// Handler of the waitAll action: waits for every request queued by the calling
// process, emits a receive trace event for each completed receive, then
// installs a fresh empty queue.
452 static void action_waitall(const char *const *action){
453 CHECK_ACTION_PARAMS(action, 0, 0);
454 double clock = smpi_process_simulated_elapsed();
455 int count_requests=0;
458 count_requests=xbt_dynar_length(get_reqq_self());
460 if (count_requests>0) {
// Variable-length arrays sized by the number of pending requests.
461 MPI_Request requests[count_requests];
462 MPI_Status status[count_requests];
464 /* The reqq is an array of dynars. Its index corresponds to the rank.
465 Thus each rank saves its own requests to the array request. */
// NOTE(review): the comment above describes reqq as an array, while this
// function reaches the queue through get_reqq_self - confirm and update.
466 xbt_dynar_foreach(get_reqq_self(),i,requests[i]);
468 //save information from requests
// src, dst and recv are copied out before waiting because they are read again
// after smpi_mpi_waitall.
470 xbt_dynar_t srcs = xbt_dynar_new(sizeof(int), NULL);
471 xbt_dynar_t dsts = xbt_dynar_new(sizeof(int), NULL);
472 xbt_dynar_t recvs = xbt_dynar_new(sizeof(int), NULL);
473 for (i = 0; i < count_requests; i++) {
// NOTE(review): the dynars hold int elements, so insert_at presumably copies
// the pointed value; no release of these heap ints is visible - confirm.
475 int *asrc = xbt_new(int, 1);
476 int *adst = xbt_new(int, 1);
477 int *arecv = xbt_new(int, 1);
478 *asrc = requests[i]->src;
479 *adst = requests[i]->dst;
480 *arecv = requests[i]->recv;
481 xbt_dynar_insert_at(srcs, i, asrc);
482 xbt_dynar_insert_at(dsts, i, adst);
483 xbt_dynar_insert_at(recvs, i, arecv);
// Alternative path: one placeholder int is inserted into all three dynars.
488 int *t = xbt_new(int, 1);
489 xbt_dynar_insert_at(srcs, i, t);
490 xbt_dynar_insert_at(dsts, i, t);
491 xbt_dynar_insert_at(recvs, i, t);
495 int rank_traced = smpi_process_index();
496 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
497 extra->type = TRACING_WAITALL;
// The number of awaited requests is recorded in the send_size field.
498 extra->send_size=count_requests;
499 TRACE_smpi_ptp_in(rank_traced, -1, -1, __FUNCTION__,extra);
501 smpi_mpi_waitall(count_requests, requests, status);
503 for (i = 0; i < count_requests; i++) {
504 int src_traced, dst_traced, is_wait_for_receive;
505 xbt_dynar_get_cpy(srcs, i, &src_traced);
506 xbt_dynar_get_cpy(dsts, i, &dst_traced);
507 xbt_dynar_get_cpy(recvs, i, &is_wait_for_receive);
508 if (is_wait_for_receive) {
509 TRACE_smpi_recv(rank_traced, src_traced, dst_traced);
512 TRACE_smpi_ptp_out(rank_traced, -1, -1, __FUNCTION__);
514 xbt_dynar_free(&srcs);
515 xbt_dynar_free(&dsts);
516 xbt_dynar_free(&recvs);
518 //TODO xbt_dynar_free_container(get_reqq_self());
// Replace the queue of this process with a new empty dynar.
519 set_reqq_self(xbt_dynar_new(sizeof(MPI_Request),&xbt_free_ref));
521 log_timed_action (action, clock);
// Handler of the barrier action: runs the selected barrier algorithm on
// MPI_COMM_WORLD, bracketed by collective trace events.
524 static void action_barrier(const char *const *action){
525 double clock = smpi_process_simulated_elapsed();
526 int rank = smpi_process_index();
527 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
528 extra->type = TRACING_BARRIER;
529 TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
531 mpi_coll_barrier_fun(MPI_COMM_WORLD);
533 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
534 log_timed_action (action, clock);
// Handler of the bcast action on MPI_COMM_WORLD.
//   action[2]: element count
//   action[3]: optional root
//   action[4]: optional datatype token, MPI_DEFAULT_TYPE otherwise
538 static void action_bcast(const char *const *action)
540 CHECK_ACTION_PARAMS(action, 1, 2);
541 double size = parse_double(action[2]);
542 double clock = smpi_process_simulated_elapsed();
545 * Initialize MPI_CURRENT_TYPE in order to decrease
546 * the number of the checks
548 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
551 root= atoi(action[3]);
553 MPI_CURRENT_TYPE=decode_datatype(action[4]);
557 int rank = smpi_process_index();
// NOTE(review): this handler translates the root with smpi_group_index while
// action_reduce uses smpi_group_rank for the same purpose - confirm which is intended.
558 int root_traced = smpi_group_index(smpi_comm_group(MPI_COMM_WORLD), root);
560 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
561 extra->type = TRACING_BCAST;
562 extra->send_size = size;
563 extra->root = root_traced;
564 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
565 TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__, extra);
// Scratch buffer sized as element count times datatype size.
566 void *sendbuf = smpi_get_tmp_sendbuffer(size* smpi_datatype_size(MPI_CURRENT_TYPE));
568 mpi_coll_bcast_fun(sendbuf, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
570 TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
571 log_timed_action (action, clock);
// Handler of the reduce action on MPI_COMM_WORLD: performs the reduction
// communication, then executes the associated computation.
//   action[2]: element count of the communication
//   action[3]: amount of flops of the computation
//   action[4]: optional root
//   action[5]: optional datatype token, MPI_DEFAULT_TYPE otherwise
574 static void action_reduce(const char *const *action)
576 CHECK_ACTION_PARAMS(action, 2, 2);
577 double comm_size = parse_double(action[2]);
578 double comp_size = parse_double(action[3]);
579 double clock = smpi_process_simulated_elapsed();
581 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
584 root= atoi(action[4]);
586 MPI_CURRENT_TYPE=decode_datatype(action[5]);
592 int rank = smpi_process_index();
593 int root_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), root);
594 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
595 extra->type = TRACING_REDUCE;
596 extra->send_size = comm_size;
597 extra->comp_size = comp_size;
598 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
599 extra->root = root_traced;
601 TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__,extra);
// NOTE(review): recvbuf is obtained from smpi_get_tmp_sendbuffer, so in replay
// mode it aliases sendbuf - confirm whether smpi_get_tmp_recvbuffer was intended.
603 void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
604 void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
// The reduction is issued with MPI_OP_NULL; its cost is simulated by the flops below.
605 mpi_coll_reduce_fun(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
606 smpi_execute_flops(comp_size);
608 TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
609 log_timed_action (action, clock);
// Handler of the allReduce action on MPI_COMM_WORLD: performs the reduction
// communication, then executes the associated computation.
//   action[2]: element count of the communication
//   action[3]: amount of flops of the computation
//   action[4]: optional datatype token, MPI_DEFAULT_TYPE otherwise
612 static void action_allReduce(const char *const *action) {
613 CHECK_ACTION_PARAMS(action, 2, 1);
614 double comm_size = parse_double(action[2]);
615 double comp_size = parse_double(action[3]);
617 if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
618 else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
620 double clock = smpi_process_simulated_elapsed();
621 int rank = smpi_process_index();
622 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
623 extra->type = TRACING_ALLREDUCE;
624 extra->send_size = comm_size;
625 extra->comp_size = comp_size;
626 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
627 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
// NOTE(review): recvbuf is obtained from smpi_get_tmp_sendbuffer, so in replay
// mode it aliases sendbuf - confirm whether smpi_get_tmp_recvbuffer was intended.
629 void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
630 void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
// The reduction is issued with MPI_OP_NULL; its cost is simulated by the flops below.
631 mpi_coll_allreduce_fun(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
632 smpi_execute_flops(comp_size);
634 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
635 log_timed_action (action, clock);
// Handler of the allToAll action on MPI_COMM_WORLD.
//   action[2]: send count per process
//   action[3]: receive count per process
//   action[4], action[5]: optional send and receive datatype tokens; both must
//                         be present, otherwise MPI_DEFAULT_TYPE is used for both
638 static void action_allToAll(const char *const *action) {
639 CHECK_ACTION_PARAMS(action, 2, 2); //two mandatory (send and recv volumes)
640 //two optional (corresponding datatypes)
641 double clock = smpi_process_simulated_elapsed();
642 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
// The parsed doubles are converted to int by the initialisations.
643 int send_size = parse_double(action[2]);
644 int recv_size = parse_double(action[3]);
645 MPI_Datatype MPI_CURRENT_TYPE2;
647 if(action[4] && action[5]) {
648 MPI_CURRENT_TYPE=decode_datatype(action[4]);
649 MPI_CURRENT_TYPE2=decode_datatype(action[5]);
652 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
653 MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
// Each scratch buffer holds one block per process of the communicator.
656 void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
657 void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE2));
659 int rank = smpi_process_index();
660 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
661 extra->type = TRACING_ALLTOALL;
662 extra->send_size = send_size;
663 extra->recv_size = recv_size;
664 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
665 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, NULL);
667 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
669 mpi_coll_alltoall_fun(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
671 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
672 log_timed_action (action, clock);
// Handler of the gather action on MPI_COMM_WORLD.
// Argument layout as read by the code: action[2] send count, action[3] receive
// count, action[4] root, action[5] and action[6] datatype tokens.
676 static void action_gather(const char *const *action) {
678 The structure of the gather action for the rank 0 (total 4 processes)
683 1) 68 is the sendcounts
684 2) 68 is the recvcounts
685 3) 0 is the root node
686 4) 0 is the send datatype id, see decode_datatype()
687 5) 0 is the recv datatype id, see decode_datatype()
689 CHECK_ACTION_PARAMS(action, 2, 3);
690 double clock = smpi_process_simulated_elapsed();
691 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
692 int send_size = parse_double(action[2]);
693 int recv_size = parse_double(action[3]);
694 MPI_Datatype MPI_CURRENT_TYPE2;
// NOTE(review): the guard tests action[4] and action[5] but the body decodes
// action[5] and action[6]; with only a root and one datatype on the line,
// action[6] would be NULL - the guard probably should test action[5] and action[6].
695 if(action[4] && action[5]) {
696 MPI_CURRENT_TYPE=decode_datatype(action[5]);
697 MPI_CURRENT_TYPE2=decode_datatype(action[6]);
699 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
700 MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
702 void *send = smpi_get_tmp_sendbuffer(send_size* smpi_datatype_size(MPI_CURRENT_TYPE));
706 root=atoi(action[4]);
707 int rank = smpi_comm_rank(MPI_COMM_WORLD);
// The receive buffer holds one block per process (presumably only requested on
// the root - confirm).
710 recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE2));
712 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
713 extra->type = TRACING_GATHER;
714 extra->send_size = send_size;
715 extra->recv_size = recv_size;
717 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
718 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, NULL);
720 TRACE_smpi_collective_in(smpi_process_index(), root, __FUNCTION__, extra);
722 mpi_coll_gather_fun(send, send_size, MPI_CURRENT_TYPE,
723 recv, recv_size, MPI_CURRENT_TYPE2,
724 root, MPI_COMM_WORLD);
// NOTE(review): the in event is tagged with root, the out event with -1 - confirm.
726 TRACE_smpi_collective_out(smpi_process_index(), -1, __FUNCTION__);
727 log_timed_action (action, clock);
// Handler of the gatherV action on MPI_COMM_WORLD.
// Argument layout as read by the code, with n the communicator size:
// action[2] send count, action[3] to action[2+n] receive counts, action[3+n]
// root, action[4+n] and action[5+n] datatype tokens.
732 static void action_gatherv(const char *const *action) {
734 The structure of the gatherv action for the rank 0 (total 4 processes)
736 0 gather 68 68 10 10 10 0 0 0
739 1) 68 is the sendcount
740 2) 68 10 10 10 is the recvcounts
741 3) 0 is the root node
742 4) 0 is the send datatype id, see decode_datatype()
743 5) 0 is the recv datatype id, see decode_datatype()
746 double clock = smpi_process_simulated_elapsed();
747 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
// NOTE(review): two optional arguments are declared, but the code reads three
// positions past the mandatory ones (root plus two datatypes) - confirm.
748 CHECK_ACTION_PARAMS(action, comm_size+1, 2);
749 int send_size = parse_double(action[2]);
// disps stays zero-filled on the visible lines.
750 int *disps = xbt_new0(int, comm_size);
751 int *recvcounts = xbt_new0(int, comm_size);
754 MPI_Datatype MPI_CURRENT_TYPE2;
755 if(action[4+comm_size] && action[5+comm_size]) {
756 MPI_CURRENT_TYPE=decode_datatype(action[4+comm_size]);
757 MPI_CURRENT_TYPE2=decode_datatype(action[5+comm_size]);
759 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
760 MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
762 void *send = smpi_get_tmp_sendbuffer(send_size* smpi_datatype_size(MPI_CURRENT_TYPE));
// Read the per-process receive counts and accumulate their total.
764 for(i=0;i<comm_size;i++) {
765 recvcounts[i] = atoi(action[i+3]);
766 recv_sum=recv_sum+recvcounts[i];
// NOTE(review): the root is read without a visible NULL check although the
// parameter check treats it as optional - confirm.
770 int root=atoi(action[3+comm_size]);
771 int rank = smpi_comm_rank(MPI_COMM_WORLD);;
774 recv = smpi_get_tmp_recvbuffer(recv_sum* smpi_datatype_size(MPI_CURRENT_TYPE2));
776 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
777 extra->type = TRACING_GATHERV;
778 extra->send_size = send_size;
779 extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
780 for(i=0; i< comm_size; i++)//copy data to avoid bad free
781 extra->recvcounts[i] = recvcounts[i];
783 extra->num_processes = comm_size;
784 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
785 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, NULL);
787 TRACE_smpi_collective_in(smpi_process_index(), root, __FUNCTION__, extra);
789 smpi_mpi_gatherv(send, send_size, MPI_CURRENT_TYPE,
790 recv, recvcounts, disps, MPI_CURRENT_TYPE2,
791 root, MPI_COMM_WORLD);
793 TRACE_smpi_collective_out(smpi_process_index(), -1, __FUNCTION__);
794 log_timed_action (action, clock);
795 xbt_free(recvcounts);
// Handler of the reduceScatter action on MPI_COMM_WORLD.
// Argument layout as read by the code, with n the communicator size:
// action[2] to action[1+n] receive counts, action[2+n] amount of computation,
// action[3+n] optional datatype token.
799 static void action_reducescatter(const char *const *action) {
802 The structure of the reducescatter action for the rank 0 (total 4 processes)
804 0 reduceScatter 275427 275427 275427 204020 11346849 0
807 1) The first four values after the name of the action declare the recvcounts array
808 2) The value 11346849 is the amount of instructions
809 3) The last value corresponds to the datatype, see decode_datatype().
811 We analyze a MPI_Reduce_scatter call to one MPI_Reduce and one MPI_Scatterv.
815 double clock = smpi_process_simulated_elapsed();
816 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
817 CHECK_ACTION_PARAMS(action, comm_size+1, 1);
// The parsed double is converted to int by the initialisation.
818 int comp_size = parse_double(action[2+comm_size]);
819 int *recvcounts = xbt_new0(int, comm_size);
// disps stays zero-filled on the visible lines.
820 int *disps = xbt_new0(int, comm_size);
822 int rank = smpi_process_index();
824 if(action[3+comm_size])
825 MPI_CURRENT_TYPE=decode_datatype(action[3+comm_size]);
827 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
829 for(i=0;i<comm_size;i++) {
830 recvcounts[i] = atoi(action[i+2]);
835 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
836 extra->type = TRACING_REDUCE_SCATTER;
837 extra->send_size = 0;
838 extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
839 for(i=0; i< comm_size; i++)//copy data to avoid bad free
840 extra->recvcounts[i] = recvcounts[i];
841 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
842 extra->comp_size = comp_size;
843 extra->num_processes = comm_size;
845 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
// NOTE(review): size is presumably the sum of the receive counts - confirm.
847 void *sendbuf = smpi_get_tmp_sendbuffer(size* smpi_datatype_size(MPI_CURRENT_TYPE));
848 void *recvbuf = smpi_get_tmp_recvbuffer(size* smpi_datatype_size(MPI_CURRENT_TYPE));
850 mpi_coll_reduce_scatter_fun(sendbuf, recvbuf, recvcounts, MPI_CURRENT_TYPE, MPI_OP_NULL,
852 smpi_execute_flops(comp_size);
855 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
856 xbt_free(recvcounts);
858 log_timed_action (action, clock);
// Handler of the allGather action on MPI_COMM_WORLD.
//   action[2]: send count
//   action[3]: receive count
//   action[4], action[5]: optional send and receive datatype tokens; both must
//                         be present, otherwise MPI_DEFAULT_TYPE is used for both
861 static void action_allgather(const char *const *action) {
863 The structure of the allgather action for the rank 0 (total 4 processes)
865 0 allGather 275427 275427
868 1) 275427 is the sendcount
869 2) 275427 is the recvcount
870 3) No more values mean that the datatype for sent and receive buffer
871 is the default one, see decode_datatype().
875 double clock = smpi_process_simulated_elapsed();
877 CHECK_ACTION_PARAMS(action, 2, 2);
// The counts are read with atoi here, unlike the parse_double used by action_allToAll.
878 int sendcount=atoi(action[2]);
879 int recvcount=atoi(action[3]);
881 MPI_Datatype MPI_CURRENT_TYPE2;
883 if(action[4] && action[5]) {
884 MPI_CURRENT_TYPE = decode_datatype(action[4]);
885 MPI_CURRENT_TYPE2 = decode_datatype(action[5]);
887 MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
888 MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
890 void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* smpi_datatype_size(MPI_CURRENT_TYPE));
// NOTE(review): the receive buffer is sized for a single recvcount block, while
// action_allToAll and action_gather multiply by the communicator size - confirm
// whether one block per process is needed here too.
891 void *recvbuf = smpi_get_tmp_recvbuffer(recvcount* smpi_datatype_size(MPI_CURRENT_TYPE2));
893 int rank = smpi_process_index();
894 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
895 extra->type = TRACING_ALLGATHER;
896 extra->send_size = sendcount;
897 extra->recv_size= recvcount;
898 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
899 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, NULL);
900 extra->num_processes = smpi_comm_size(MPI_COMM_WORLD);
902 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
904 mpi_coll_allgather_fun(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcount, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
906 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
907 log_timed_action (action, clock);
// Handler of the allGatherV action on MPI_COMM_WORLD.
// Argument layout as read by the code, with n the communicator size:
// action[2] send count, action[3] to action[2+n] receive counts, action[3+n]
// and action[4+n] optional datatype tokens (both must be present).
910 static void action_allgatherv(const char *const *action) {
913 The structure of the allgatherv action for the rank 0 (total 4 processes)
915 0 allGatherV 275427 275427 275427 275427 204020
918 1) 275427 is the sendcount
919 2) The next four elements declare the recvcounts array
920 3) No more values mean that the datatype for sent and receive buffer
921 is the default one, see decode_datatype().
925 double clock = smpi_process_simulated_elapsed();
927 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
928 CHECK_ACTION_PARAMS(action, comm_size+1, 2);
930 int sendcount=atoi(action[2]);
931 int *recvcounts = xbt_new0(int, comm_size);
// disps stays zero-filled on the visible lines.
932 int *disps = xbt_new0(int, comm_size);
934 MPI_Datatype MPI_CURRENT_TYPE2;
936 if(action[3+comm_size] && action[4+comm_size]) {
937 MPI_CURRENT_TYPE = decode_datatype(action[3+comm_size]);
938 MPI_CURRENT_TYPE2 = decode_datatype(action[4+comm_size]);
940 MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
941 MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
943 void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* smpi_datatype_size(MPI_CURRENT_TYPE));
// Read the per-process receive counts and accumulate their total.
945 for(i=0;i<comm_size;i++) {
946 recvcounts[i] = atoi(action[i+3]);
947 recv_sum=recv_sum+recvcounts[i];
// The receive buffer is sized from the total of the receive counts.
949 void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* smpi_datatype_size(MPI_CURRENT_TYPE2));
951 int rank = smpi_process_index();
952 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
953 extra->type = TRACING_ALLGATHERV;
954 extra->send_size = sendcount;
955 extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
956 for(i=0; i< comm_size; i++)//copy data to avoid bad free
957 extra->recvcounts[i] = recvcounts[i];
958 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
959 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, NULL);
960 extra->num_processes = comm_size;
962 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
964 mpi_coll_allgatherv_fun(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts, disps, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
966 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
967 log_timed_action (action, clock);
968 xbt_free(recvcounts);
// Handler of the allToAllV action on MPI_COMM_WORLD.
// Argument layout as read by the code, with n the communicator size:
// action[2] send buffer size, action[3] to action[2+n] send counts, action[3+n]
// receive buffer size, action[4+n] to action[3+2n] receive counts, action[4+2n]
// and action[5+2n] optional datatype tokens (both must be present).
972 static void action_allToAllv(const char *const *action) {
974 The structure of the allToAllV action for the rank 0 (total 4 processes)
976 0 allToAllV 100 1 7 10 12 100 1 70 10 5
979 1) 100 is the size of the send buffer *sizeof(int),
980 2) 1 7 10 12 is the sendcounts array
981 3) 100*sizeof(int) is the size of the receiver buffer
982 4) 1 70 10 5 is the recvcounts array
987 double clock = smpi_process_simulated_elapsed();
989 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
990 CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2);
991 int send_buf_size=0,recv_buf_size=0,i=0;
992 int *sendcounts = xbt_new0(int, comm_size);
993 int *recvcounts = xbt_new0(int, comm_size);
// Both displacement arrays stay zero-filled on the visible lines.
994 int *senddisps = xbt_new0(int, comm_size);
995 int *recvdisps = xbt_new0(int, comm_size);
997 MPI_Datatype MPI_CURRENT_TYPE2;
999 send_buf_size=parse_double(action[2]);
1000 recv_buf_size=parse_double(action[3+comm_size]);
1001 if(action[4+2*comm_size] && action[5+2*comm_size]) {
1002 MPI_CURRENT_TYPE=decode_datatype(action[4+2*comm_size]);
1003 MPI_CURRENT_TYPE2=decode_datatype(action[5+2*comm_size]);
1006 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
1007 MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
1010 void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* smpi_datatype_size(MPI_CURRENT_TYPE));
1011 void *recvbuf = smpi_get_tmp_recvbuffer(recv_buf_size* smpi_datatype_size(MPI_CURRENT_TYPE2));
1013 for(i=0;i<comm_size;i++) {
1014 sendcounts[i] = atoi(action[i+3]);
1015 recvcounts[i] = atoi(action[i+4+comm_size]);
1019 int rank = smpi_process_index();
1020 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
1021 extra->type = TRACING_ALLTOALLV;
1022 extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
1023 extra->sendcounts= xbt_malloc(comm_size*sizeof(int));
1024 extra->num_processes = comm_size;
// send_size and recv_size start at zero (xbt_new0) and accumulate the counts.
1026 for(i=0; i< comm_size; i++){//copy data to avoid bad free
1027 extra->send_size += sendcounts[i];
1028 extra->sendcounts[i] = sendcounts[i];
1029 extra->recv_size += recvcounts[i];
1030 extra->recvcounts[i] = recvcounts[i];
1032 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
1033 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, NULL);
1035 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
// NOTE(review): the receive side is given MPI_CURRENT_TYPE although the receive
// buffer was sized with MPI_CURRENT_TYPE2 - confirm whether MPI_CURRENT_TYPE2
// was intended.
1037 mpi_coll_alltoallv_fun(sendbuf, sendcounts, senddisps, MPI_CURRENT_TYPE,
1038 recvbuf, recvcounts, recvdisps, MPI_CURRENT_TYPE,
1041 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
1042 log_timed_action (action, clock);
1043 xbt_free(sendcounts);
1044 xbt_free(recvcounts);
1045 xbt_free(senddisps);
1046 xbt_free(recvdisps);
// Entry point of a replayed SMPI process: initialises the process in replay
// mode, registers the action handlers, runs the trace, drains the requests
// still pending and finalises the process.
//   argc, argv: command line of the process, forwarded to smpi_process_init and
//               to the replay runner; (*argv)[2] is parsed as a start delay
1049 void smpi_replay_run(int *argc, char***argv){
1050 /* First initializes everything */
1051 smpi_process_init(argc, argv);
1052 smpi_process_mark_as_initialized();
1053 smpi_process_set_replaying(1);
1055 int rank = smpi_process_index();
1056 TRACE_smpi_init(rank);
1057 TRACE_smpi_computing_init(rank);
1058 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
1059 extra->type = TRACING_INIT;
// The init phase is traced as a collective named after this function.
1060 char *operation =bprintf("%s_init",__FUNCTION__);
1061 TRACE_smpi_collective_in(rank, -1, operation, extra);
1062 TRACE_smpi_collective_out(rank, -1, operation);
// The handler table is filled according to the result of _xbt_replay_action_init()
// (presumably only by the first process to get here - confirm).
1065 if (!_xbt_replay_action_init()) {
1066 xbt_replay_action_register("init", action_init);
1067 xbt_replay_action_register("finalize", action_finalize);
1068 xbt_replay_action_register("comm_size", action_comm_size);
1069 xbt_replay_action_register("comm_split", action_comm_split);
1070 xbt_replay_action_register("comm_dup", action_comm_dup);
1071 xbt_replay_action_register("send", action_send);
1072 xbt_replay_action_register("Isend", action_Isend);
1073 xbt_replay_action_register("recv", action_recv);
1074 xbt_replay_action_register("Irecv", action_Irecv);
1075 xbt_replay_action_register("test", action_test);
1076 xbt_replay_action_register("wait", action_wait);
1077 xbt_replay_action_register("waitAll", action_waitall);
1078 xbt_replay_action_register("barrier", action_barrier);
1079 xbt_replay_action_register("bcast", action_bcast);
1080 xbt_replay_action_register("reduce", action_reduce);
1081 xbt_replay_action_register("allReduce", action_allReduce);
1082 xbt_replay_action_register("allToAll", action_allToAll);
1083 xbt_replay_action_register("allToAllV", action_allToAllv);
1084 xbt_replay_action_register("gather", action_gather);
1085 xbt_replay_action_register("gatherV", action_gatherv);
1086 xbt_replay_action_register("allGather", action_allgather);
1087 xbt_replay_action_register("allGatherV", action_allgatherv);
1088 xbt_replay_action_register("reduceScatter", action_reducescatter);
1089 xbt_replay_action_register("compute", action_compute);
1092 //if we have a delayed start, sleep here.
// The delay is expressed in flops and must parse completely as a double.
1095 double value = strtod((*argv)[2], &endptr);
1096 if (*endptr != '\0')
1097 THROWF(unknown_error, 0, "%s is not a double", (*argv)[2]);
1098 XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
1099 smpi_execute_flops(value);
1101 //UGLY: force a context switch to be sure that all MSG_processes begin initialization
1102 XBT_DEBUG("Force context switch by smpi_execute_flops - Sleeping for 0.0 flops ");
1103 smpi_execute_flops(0.0);
1106 /* Actually run the replay */
1107 xbt_replay_action_runner(*argc, *argv);
1109 /* and now, finalize everything */
1110 double sim_time= 1.;
1111 /* One active process will stop. Decrease the counter*/
1112 XBT_DEBUG("There are %lu elements in reqq[*]",
1113 xbt_dynar_length(get_reqq_self()));
// Requests never waited for by the trace are completed here before shutdown.
1114 if (!xbt_dynar_is_empty(get_reqq_self())){
1115 int count_requests=xbt_dynar_length(get_reqq_self());
1116 MPI_Request requests[count_requests];
1117 MPI_Status status[count_requests];
1120 xbt_dynar_foreach(get_reqq_self(),i,requests[i]);
1121 smpi_mpi_waitall(count_requests, requests, status);
1127 if(!active_processes){
1128 /* Last process alive speaking */
1129 /* end the simulated timer */
1130 sim_time = smpi_process_simulated_elapsed();
1134 //TODO xbt_dynar_free_container(get_reqq_self()));
// Global resources are released once active_processes has reached zero.
1136 if(!active_processes){
1137 XBT_INFO("Simulation time %f", sim_time);
1138 _xbt_replay_action_exit();
1139 xbt_free(sendbuffer);
1140 xbt_free(recvbuffer);
1142 xbt_dict_free(&reqq); //not needed, data have been freed ???
1146 instr_extra_data extra_fin = xbt_new0(s_instr_extra_data_t,1);
1147 extra_fin->type = TRACING_FINALIZE;
// NOTE(review): operation is overwritten with a second bprintf result; no
// release of either string is visible - confirm they are freed.
1148 operation =bprintf("%s_finalize",__FUNCTION__);
1149 TRACE_smpi_collective_in(rank, -1, operation, extra_fin);
1151 smpi_process_finalize();
1153 TRACE_smpi_collective_out(rank, -1, operation);
1154 TRACE_smpi_finalize(smpi_process_index());
1155 smpi_process_destroy();