1 /* Copyright (c) 2009-2015. The SimGrid Team.
2 * All rights reserved. */
4 /* This program is free software; you can redistribute it and/or modify it
5 * under the terms of the license (GNU LGPL) which comes with this package. */
10 #include <xbt/replay.h>
12 #define KEY_SIZE (sizeof(int) * 2 + 1)
14 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
16 int communicator_size = 0;
17 static int active_processes = 0;
18 xbt_dict_t reqq = NULL;
20 MPI_Datatype MPI_DEFAULT_TYPE;
21 MPI_Datatype MPI_CURRENT_TYPE;
23 static int sendbuffer_size=0;
24 char* sendbuffer=NULL;
25 static int recvbuffer_size=0;
26 char* recvbuffer=NULL;
28 static void log_timed_action (const char *const *action, double clock){
29 if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
30 char *name = xbt_str_join_array(action, " ");
31 XBT_VERB("%s %f", name, smpi_process_simulated_elapsed()-clock);
37 static xbt_dynar_t get_reqq_self()
39 char * key = bprintf("%d", smpi_process_index());
40 xbt_dynar_t dynar_mpi_request = (xbt_dynar_t) xbt_dict_get(reqq, key);
43 return dynar_mpi_request;
46 static void set_reqq_self(xbt_dynar_t mpi_request)
48 char * key = bprintf("%d", smpi_process_index());
49 xbt_dict_set(reqq, key, mpi_request, free);
54 //allocate a single buffer for all sends, growing it if needed
55 void* smpi_get_tmp_sendbuffer(int size)
57 if (!smpi_process_get_replaying())
58 return xbt_malloc(size);
59 if (sendbuffer_size<size){
60 sendbuffer=static_cast<char*>(xbt_realloc(sendbuffer,size));
65 //allocate a single buffer for all recv
66 void* smpi_get_tmp_recvbuffer(int size){
67 if (!smpi_process_get_replaying())
68 return xbt_malloc(size);
69 if (recvbuffer_size<size){
70 recvbuffer=static_cast<char*>(xbt_realloc(recvbuffer,size));
76 void smpi_free_tmp_buffer(void* buf){
77 if (!smpi_process_get_replaying())
82 static double parse_double(const char *string)
86 value = strtod(string, &endptr);
88 THROWF(unknown_error, 0, "%s is not a double", string);
92 static MPI_Datatype decode_datatype(const char *const action)
94 // Declared datatypes,
99 MPI_CURRENT_TYPE=MPI_DOUBLE;
102 MPI_CURRENT_TYPE=MPI_INT;
105 MPI_CURRENT_TYPE=MPI_CHAR;
108 MPI_CURRENT_TYPE=MPI_SHORT;
111 MPI_CURRENT_TYPE=MPI_LONG;
114 MPI_CURRENT_TYPE=MPI_FLOAT;
117 MPI_CURRENT_TYPE=MPI_BYTE;
120 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
123 return MPI_CURRENT_TYPE;
127 const char* encode_datatype(MPI_Datatype datatype, int* known)
130 //default type for output is set to MPI_BYTE
131 // MPI_DEFAULT_TYPE is not set for output, use directly MPI_BYTE
133 if (datatype==MPI_BYTE){
136 if(datatype==MPI_DOUBLE)
138 if(datatype==MPI_INT)
140 if(datatype==MPI_CHAR)
142 if(datatype==MPI_SHORT)
144 if(datatype==MPI_LONG)
146 if(datatype==MPI_FLOAT)
148 //tell that the datatype is not handled by replay, and that its size should be measured and replayed as size*MPI_BYTE
150 // default - not implemented.
151 // do not warn here as we pass in this function even for other trace formats
155 #define CHECK_ACTION_PARAMS(action, mandatory, optional) {\
157 while(action[i]!=NULL)\
160 THROWF(arg_error, 0, "%s replay failed.\n" \
161 "%d items were given on the line. First two should be process_id and action. " \
162 "This action needs after them %d mandatory arguments, and accepts %d optional ones. \n" \
163 "Please contact the Simgrid team if support is needed", __FUNCTION__, i, mandatory, optional);\
167 static void action_init(const char *const *action)
169 XBT_DEBUG("Initialize the counters");
170 CHECK_ACTION_PARAMS(action, 0, 1);
171 if(action[2]) MPI_DEFAULT_TYPE= MPI_DOUBLE; // default MPE dataype
172 else MPI_DEFAULT_TYPE= MPI_BYTE; // default TAU datatype
174 /* start a simulated timer */
175 smpi_process_simulated_start();
176 /*initialize the number of active processes */
177 active_processes = smpi_process_count();
180 reqq = xbt_dict_new();
183 set_reqq_self(xbt_dynar_new(sizeof(MPI_Request),&xbt_free_ref));
186 reqq=xbt_new0(xbt_dynar_t,active_processes);
188 for(i=0;i<active_processes;i++){
189 reqq[i]=xbt_dynar_new(sizeof(MPI_Request),&xbt_free_ref);
195 static void action_finalize(const char *const *action)
199 static void action_comm_size(const char *const *action)
201 double clock = smpi_process_simulated_elapsed();
203 communicator_size = parse_double(action[2]);
204 log_timed_action (action, clock);
207 static void action_comm_split(const char *const *action)
209 double clock = smpi_process_simulated_elapsed();
211 log_timed_action (action, clock);
214 static void action_comm_dup(const char *const *action)
216 double clock = smpi_process_simulated_elapsed();
218 log_timed_action (action, clock);
221 static void action_compute(const char *const *action)
223 CHECK_ACTION_PARAMS(action, 1, 0);
224 double clock = smpi_process_simulated_elapsed();
225 double flops= parse_double(action[2]);
226 int rank = smpi_process_index();
227 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
228 extra->type=TRACING_COMPUTING;
229 extra->comp_size=flops;
230 TRACE_smpi_computing_in(rank, extra);
232 smpi_execute_flops(flops);
234 TRACE_smpi_computing_out(rank);
235 log_timed_action (action, clock);
238 static void action_send(const char *const *action)
240 CHECK_ACTION_PARAMS(action, 2, 1);
241 int to = atoi(action[2]);
242 double size=parse_double(action[3]);
243 double clock = smpi_process_simulated_elapsed();
246 MPI_CURRENT_TYPE=decode_datatype(action[4]);
248 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
251 int rank = smpi_process_index();
253 int dst_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), to);
254 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
255 extra->type = TRACING_SEND;
256 extra->send_size = size;
258 extra->dst = dst_traced;
259 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
260 TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
261 if (!TRACE_smpi_view_internals()) {
262 TRACE_smpi_send(rank, rank, dst_traced, size*smpi_datatype_size(MPI_CURRENT_TYPE));
265 smpi_mpi_send(NULL, size, MPI_CURRENT_TYPE, to , 0, MPI_COMM_WORLD);
267 log_timed_action (action, clock);
269 TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
272 static void action_Isend(const char *const *action)
274 CHECK_ACTION_PARAMS(action, 2, 1);
275 int to = atoi(action[2]);
276 double size=parse_double(action[3]);
277 double clock = smpi_process_simulated_elapsed();
280 if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
281 else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
283 int rank = smpi_process_index();
284 int dst_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), to);
285 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
286 extra->type = TRACING_ISEND;
287 extra->send_size = size;
289 extra->dst = dst_traced;
290 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
291 TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
292 if (!TRACE_smpi_view_internals()) {
293 TRACE_smpi_send(rank, rank, dst_traced, size*smpi_datatype_size(MPI_CURRENT_TYPE));
296 request = smpi_mpi_isend(NULL, size, MPI_CURRENT_TYPE, to, 0,MPI_COMM_WORLD);
298 TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
301 xbt_dynar_push(get_reqq_self(),&request);
303 log_timed_action (action, clock);
306 static void action_recv(const char *const *action) {
307 CHECK_ACTION_PARAMS(action, 2, 1);
308 int from = atoi(action[2]);
309 double size=parse_double(action[3]);
310 double clock = smpi_process_simulated_elapsed();
313 if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
314 else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
316 int rank = smpi_process_index();
317 int src_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), from);
319 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
320 extra->type = TRACING_RECV;
321 extra->send_size = size;
322 extra->src = src_traced;
324 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
325 TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
327 //unknown size from the receiver's point of view
329 smpi_mpi_probe(from, 0, MPI_COMM_WORLD, &status);
333 smpi_mpi_recv(NULL, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD, &status);
335 TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
336 if (!TRACE_smpi_view_internals()) {
337 TRACE_smpi_recv(rank, src_traced, rank);
340 log_timed_action (action, clock);
343 static void action_Irecv(const char *const *action)
345 CHECK_ACTION_PARAMS(action, 2, 1);
346 int from = atoi(action[2]);
347 double size=parse_double(action[3]);
348 double clock = smpi_process_simulated_elapsed();
351 if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
352 else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
354 int rank = smpi_process_index();
355 int src_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), from);
356 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
357 extra->type = TRACING_IRECV;
358 extra->send_size = size;
359 extra->src = src_traced;
361 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
362 TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
364 //unknown size from the receiver's point of view
366 smpi_mpi_probe(from, 0, MPI_COMM_WORLD, &status);
370 request = smpi_mpi_irecv(NULL, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD);
372 TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
374 xbt_dynar_push(get_reqq_self(),&request);
376 log_timed_action (action, clock);
379 static void action_test(const char *const *action){
380 CHECK_ACTION_PARAMS(action, 0, 0);
381 double clock = smpi_process_simulated_elapsed();
386 request = xbt_dynar_pop_as(get_reqq_self(),MPI_Request);
387 //if request is null here, this may mean that a previous test has succeeded
388 //Different times in traced application and replayed version may lead to this
389 //In this case, ignore the extra calls.
391 int rank = smpi_process_index();
392 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
393 extra->type=TRACING_TEST;
394 TRACE_smpi_testing_in(rank, extra);
396 flag = smpi_mpi_test(&request, &status);
398 XBT_DEBUG("MPI_Test result: %d", flag);
399 /* push back request in dynar to be caught by a subsequent wait. if the test
400 * did succeed, the request is now NULL.
402 xbt_dynar_push_as(get_reqq_self(),MPI_Request, request);
404 TRACE_smpi_testing_out(rank);
406 log_timed_action (action, clock);
409 static void action_wait(const char *const *action){
410 CHECK_ACTION_PARAMS(action, 0, 0);
411 double clock = smpi_process_simulated_elapsed();
415 xbt_assert(xbt_dynar_length(get_reqq_self()),
416 "action wait not preceded by any irecv or isend: %s",
417 xbt_str_join_array(action," "));
418 request = xbt_dynar_pop_as(get_reqq_self(),MPI_Request);
421 /* Assuming that the trace is well formed, this means the comm might have
422 * been caught by a MPI_test. Then just return.
427 int rank = request->comm != MPI_COMM_NULL
428 ? smpi_comm_rank(request->comm)
431 MPI_Group group = smpi_comm_group(request->comm);
432 int src_traced = smpi_group_rank(group, request->src);
433 int dst_traced = smpi_group_rank(group, request->dst);
434 int is_wait_for_receive = request->recv;
435 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
436 extra->type = TRACING_WAIT;
437 TRACE_smpi_ptp_in(rank, src_traced, dst_traced, __FUNCTION__, extra);
439 smpi_mpi_wait(&request, &status);
441 TRACE_smpi_ptp_out(rank, src_traced, dst_traced, __FUNCTION__);
442 if (is_wait_for_receive)
443 TRACE_smpi_recv(rank, src_traced, dst_traced);
444 log_timed_action (action, clock);
447 static void action_waitall(const char *const *action){
448 CHECK_ACTION_PARAMS(action, 0, 0);
449 double clock = smpi_process_simulated_elapsed();
450 int count_requests=0;
453 count_requests=xbt_dynar_length(get_reqq_self());
455 if (count_requests>0) {
456 MPI_Request requests[count_requests];
457 MPI_Status status[count_requests];
459 /* The reqq is an array of dynars. Its index corresponds to the rank.
460 Thus each rank saves its own requests to the array request. */
461 xbt_dynar_foreach(get_reqq_self(),i,requests[i]);
463 //save information from requests
465 xbt_dynar_t srcs = xbt_dynar_new(sizeof(int), NULL);
466 xbt_dynar_t dsts = xbt_dynar_new(sizeof(int), NULL);
467 xbt_dynar_t recvs = xbt_dynar_new(sizeof(int), NULL);
468 for (i = 0; (int)i < count_requests; i++) {
470 int *asrc = xbt_new(int, 1);
471 int *adst = xbt_new(int, 1);
472 int *arecv = xbt_new(int, 1);
473 *asrc = requests[i]->src;
474 *adst = requests[i]->dst;
475 *arecv = requests[i]->recv;
476 xbt_dynar_insert_at(srcs, i, asrc);
477 xbt_dynar_insert_at(dsts, i, adst);
478 xbt_dynar_insert_at(recvs, i, arecv);
483 int *t = xbt_new(int, 1);
484 xbt_dynar_insert_at(srcs, i, t);
485 xbt_dynar_insert_at(dsts, i, t);
486 xbt_dynar_insert_at(recvs, i, t);
490 int rank_traced = smpi_process_index();
491 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
492 extra->type = TRACING_WAITALL;
493 extra->send_size=count_requests;
494 TRACE_smpi_ptp_in(rank_traced, -1, -1, __FUNCTION__,extra);
496 smpi_mpi_waitall(count_requests, requests, status);
498 for (i = 0; (int)i < count_requests; i++) {
499 int src_traced, dst_traced, is_wait_for_receive;
500 xbt_dynar_get_cpy(srcs, i, &src_traced);
501 xbt_dynar_get_cpy(dsts, i, &dst_traced);
502 xbt_dynar_get_cpy(recvs, i, &is_wait_for_receive);
503 if (is_wait_for_receive) {
504 TRACE_smpi_recv(rank_traced, src_traced, dst_traced);
507 TRACE_smpi_ptp_out(rank_traced, -1, -1, __FUNCTION__);
509 xbt_dynar_free(&srcs);
510 xbt_dynar_free(&dsts);
511 xbt_dynar_free(&recvs);
513 //TODO xbt_dynar_free_container(get_reqq_self());
514 set_reqq_self(xbt_dynar_new(sizeof(MPI_Request),&xbt_free_ref));
516 log_timed_action (action, clock);
519 static void action_barrier(const char *const *action){
520 double clock = smpi_process_simulated_elapsed();
521 int rank = smpi_process_index();
522 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
523 extra->type = TRACING_BARRIER;
524 TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
526 mpi_coll_barrier_fun(MPI_COMM_WORLD);
528 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
529 log_timed_action (action, clock);
533 static void action_bcast(const char *const *action)
535 CHECK_ACTION_PARAMS(action, 1, 2);
536 double size = parse_double(action[2]);
537 double clock = smpi_process_simulated_elapsed();
540 * Initialize MPI_CURRENT_TYPE in order to decrease
541 * the number of checks
543 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
546 root= atoi(action[3]);
548 MPI_CURRENT_TYPE=decode_datatype(action[4]);
552 int rank = smpi_process_index();
553 int root_traced = smpi_group_index(smpi_comm_group(MPI_COMM_WORLD), root);
555 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
556 extra->type = TRACING_BCAST;
557 extra->send_size = size;
558 extra->root = root_traced;
559 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
560 TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__, extra);
561 void *sendbuf = smpi_get_tmp_sendbuffer(size* smpi_datatype_size(MPI_CURRENT_TYPE));
563 mpi_coll_bcast_fun(sendbuf, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
565 TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
566 log_timed_action (action, clock);
569 static void action_reduce(const char *const *action)
571 CHECK_ACTION_PARAMS(action, 2, 2);
572 double comm_size = parse_double(action[2]);
573 double comp_size = parse_double(action[3]);
574 double clock = smpi_process_simulated_elapsed();
576 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
579 root= atoi(action[4]);
581 MPI_CURRENT_TYPE=decode_datatype(action[5]);
587 int rank = smpi_process_index();
588 int root_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), root);
589 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
590 extra->type = TRACING_REDUCE;
591 extra->send_size = comm_size;
592 extra->comp_size = comp_size;
593 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
594 extra->root = root_traced;
596 TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__,extra);
598 void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
599 void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
600 mpi_coll_reduce_fun(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
601 smpi_execute_flops(comp_size);
603 TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
604 log_timed_action (action, clock);
607 static void action_allReduce(const char *const *action) {
608 CHECK_ACTION_PARAMS(action, 2, 1);
609 double comm_size = parse_double(action[2]);
610 double comp_size = parse_double(action[3]);
612 if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
613 else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
615 double clock = smpi_process_simulated_elapsed();
616 int rank = smpi_process_index();
617 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
618 extra->type = TRACING_ALLREDUCE;
619 extra->send_size = comm_size;
620 extra->comp_size = comp_size;
621 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
622 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
624 void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
625 void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
626 mpi_coll_allreduce_fun(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
627 smpi_execute_flops(comp_size);
629 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
630 log_timed_action (action, clock);
633 static void action_allToAll(const char *const *action) {
634 CHECK_ACTION_PARAMS(action, 2, 2); //two mandatory (send and recv volumes)
635 //two optional (corresponding datatypes)
636 double clock = smpi_process_simulated_elapsed();
637 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
638 int send_size = parse_double(action[2]);
639 int recv_size = parse_double(action[3]);
640 MPI_Datatype MPI_CURRENT_TYPE2;
642 if(action[4] && action[5]) {
643 MPI_CURRENT_TYPE=decode_datatype(action[4]);
644 MPI_CURRENT_TYPE2=decode_datatype(action[5]);
647 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
648 MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
651 void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
652 void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE2));
654 int rank = smpi_process_index();
655 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
656 extra->type = TRACING_ALLTOALL;
657 extra->send_size = send_size;
658 extra->recv_size = recv_size;
659 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
660 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, NULL);
662 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
664 mpi_coll_alltoall_fun(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
666 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
667 log_timed_action (action, clock);
671 static void action_gather(const char *const *action) {
673 The structure of the gather action for the rank 0 (total 4 processes)
678 1) 68 is the sendcount
679 2) 68 is the recvcount
680 3) 0 is the root node
681 4) 0 is the send datatype id, see decode_datatype()
682 5) 0 is the recv datatype id, see decode_datatype()
684 CHECK_ACTION_PARAMS(action, 2, 3);
685 double clock = smpi_process_simulated_elapsed();
686 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
687 int send_size = parse_double(action[2]);
688 int recv_size = parse_double(action[3]);
689 MPI_Datatype MPI_CURRENT_TYPE2;
690 if(action[4] && action[5]) {
691 MPI_CURRENT_TYPE=decode_datatype(action[5]);
692 MPI_CURRENT_TYPE2=decode_datatype(action[6]);
694 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
695 MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
697 void *send = smpi_get_tmp_sendbuffer(send_size* smpi_datatype_size(MPI_CURRENT_TYPE));
701 root=atoi(action[4]);
702 int rank = smpi_comm_rank(MPI_COMM_WORLD);
705 recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE2));
707 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
708 extra->type = TRACING_GATHER;
709 extra->send_size = send_size;
710 extra->recv_size = recv_size;
712 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
713 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, NULL);
715 TRACE_smpi_collective_in(smpi_process_index(), root, __FUNCTION__, extra);
717 mpi_coll_gather_fun(send, send_size, MPI_CURRENT_TYPE,
718 recv, recv_size, MPI_CURRENT_TYPE2,
719 root, MPI_COMM_WORLD);
721 TRACE_smpi_collective_out(smpi_process_index(), -1, __FUNCTION__);
722 log_timed_action (action, clock);
727 static void action_gatherv(const char *const *action) {
729 The structure of the gatherv action for the rank 0 (total 4 processes)
731 0 gatherV 68 68 10 10 10 0 0 0
734 1) 68 is the sendcount
735 2) 68 10 10 10 is the recvcounts
736 3) 0 is the root node
737 4) 0 is the send datatype id, see decode_datatype()
738 5) 0 is the recv datatype id, see decode_datatype()
741 double clock = smpi_process_simulated_elapsed();
742 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
743 CHECK_ACTION_PARAMS(action, comm_size+1, 2);
744 int send_size = parse_double(action[2]);
745 int *disps = xbt_new0(int, comm_size);
746 int *recvcounts = xbt_new0(int, comm_size);
749 MPI_Datatype MPI_CURRENT_TYPE2;
750 if(action[4+comm_size] && action[5+comm_size]) {
751 MPI_CURRENT_TYPE=decode_datatype(action[4+comm_size]);
752 MPI_CURRENT_TYPE2=decode_datatype(action[5+comm_size]);
754 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
755 MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
757 void *send = smpi_get_tmp_sendbuffer(send_size* smpi_datatype_size(MPI_CURRENT_TYPE));
759 for(i=0;i<comm_size;i++) {
760 recvcounts[i] = atoi(action[i+3]);
761 recv_sum=recv_sum+recvcounts[i];
765 int root=atoi(action[3+comm_size]);
766 int rank = smpi_comm_rank(MPI_COMM_WORLD);;
769 recv = smpi_get_tmp_recvbuffer(recv_sum* smpi_datatype_size(MPI_CURRENT_TYPE2));
771 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
772 extra->type = TRACING_GATHERV;
773 extra->send_size = send_size;
774 extra->recvcounts= xbt_new(int,comm_size);
775 for(i=0; i< comm_size; i++)//copy data to avoid bad free
776 extra->recvcounts[i] = recvcounts[i];
778 extra->num_processes = comm_size;
779 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
780 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, NULL);
782 TRACE_smpi_collective_in(smpi_process_index(), root, __FUNCTION__, extra);
784 smpi_mpi_gatherv(send, send_size, MPI_CURRENT_TYPE,
785 recv, recvcounts, disps, MPI_CURRENT_TYPE2,
786 root, MPI_COMM_WORLD);
788 TRACE_smpi_collective_out(smpi_process_index(), -1, __FUNCTION__);
789 log_timed_action (action, clock);
790 xbt_free(recvcounts);
794 static void action_reducescatter(const char *const *action) {
797 The structure of the reducescatter action for the rank 0 (total 4 processes)
799 0 reduceScatter 275427 275427 275427 204020 11346849 0
802 1) The first four values after the name of the action declare the recvcounts array
803 2) The value 11346849 is the amount of instructions
804 3) The last value corresponds to the datatype, see decode_datatype().
806 We decompose an MPI_Reduce_scatter call into one MPI_Reduce and one MPI_Scatterv.
810 double clock = smpi_process_simulated_elapsed();
811 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
812 CHECK_ACTION_PARAMS(action, comm_size+1, 1);
813 int comp_size = parse_double(action[2+comm_size]);
814 int *recvcounts = xbt_new0(int, comm_size);
815 int *disps = xbt_new0(int, comm_size);
817 int rank = smpi_process_index();
819 if(action[3+comm_size])
820 MPI_CURRENT_TYPE=decode_datatype(action[3+comm_size]);
822 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
824 for(i=0;i<comm_size;i++) {
825 recvcounts[i] = atoi(action[i+2]);
830 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
831 extra->type = TRACING_REDUCE_SCATTER;
832 extra->send_size = 0;
833 extra->recvcounts= xbt_new(int, comm_size);
834 for(i=0; i< comm_size; i++)//copy data to avoid bad free
835 extra->recvcounts[i] = recvcounts[i];
836 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
837 extra->comp_size = comp_size;
838 extra->num_processes = comm_size;
840 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
842 void *sendbuf = smpi_get_tmp_sendbuffer(size* smpi_datatype_size(MPI_CURRENT_TYPE));
843 void *recvbuf = smpi_get_tmp_recvbuffer(size* smpi_datatype_size(MPI_CURRENT_TYPE));
845 mpi_coll_reduce_scatter_fun(sendbuf, recvbuf, recvcounts, MPI_CURRENT_TYPE, MPI_OP_NULL,
847 smpi_execute_flops(comp_size);
850 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
851 xbt_free(recvcounts);
853 log_timed_action (action, clock);
856 static void action_allgather(const char *const *action) {
858 The structure of the allgather action for the rank 0 (total 4 processes)
860 0 allGather 275427 275427
863 1) 275427 is the sendcount
864 2) 275427 is the recvcount
865 3) No further values means that the datatypes of the send and receive buffers
866 are the default ones, see decode_datatype().
870 double clock = smpi_process_simulated_elapsed();
872 CHECK_ACTION_PARAMS(action, 2, 2);
873 int sendcount=atoi(action[2]);
874 int recvcount=atoi(action[3]);
876 MPI_Datatype MPI_CURRENT_TYPE2;
878 if(action[4] && action[5]) {
879 MPI_CURRENT_TYPE = decode_datatype(action[4]);
880 MPI_CURRENT_TYPE2 = decode_datatype(action[5]);
882 MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
883 MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
885 void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* smpi_datatype_size(MPI_CURRENT_TYPE));
886 void *recvbuf = smpi_get_tmp_recvbuffer(recvcount* smpi_datatype_size(MPI_CURRENT_TYPE2));
888 int rank = smpi_process_index();
889 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
890 extra->type = TRACING_ALLGATHER;
891 extra->send_size = sendcount;
892 extra->recv_size= recvcount;
893 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
894 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, NULL);
895 extra->num_processes = smpi_comm_size(MPI_COMM_WORLD);
897 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
899 mpi_coll_allgather_fun(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcount, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
901 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
902 log_timed_action (action, clock);
905 static void action_allgatherv(const char *const *action) {
908 The structure of the allgatherv action for the rank 0 (total 4 processes)
910 0 allGatherV 275427 275427 275427 275427 204020
913 1) 275427 is the sendcount
914 2) The next four elements declare the recvcounts array
915 3) No further values means that the datatypes of the send and receive buffers
916 are the default ones, see decode_datatype().
920 double clock = smpi_process_simulated_elapsed();
922 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
923 CHECK_ACTION_PARAMS(action, comm_size+1, 2);
925 int sendcount=atoi(action[2]);
926 int *recvcounts = xbt_new0(int, comm_size);
927 int *disps = xbt_new0(int, comm_size);
929 MPI_Datatype MPI_CURRENT_TYPE2;
931 if(action[3+comm_size] && action[4+comm_size]) {
932 MPI_CURRENT_TYPE = decode_datatype(action[3+comm_size]);
933 MPI_CURRENT_TYPE2 = decode_datatype(action[4+comm_size]);
935 MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
936 MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
938 void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* smpi_datatype_size(MPI_CURRENT_TYPE));
940 for(i=0;i<comm_size;i++) {
941 recvcounts[i] = atoi(action[i+3]);
942 recv_sum=recv_sum+recvcounts[i];
944 void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* smpi_datatype_size(MPI_CURRENT_TYPE2));
946 int rank = smpi_process_index();
947 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
948 extra->type = TRACING_ALLGATHERV;
949 extra->send_size = sendcount;
950 extra->recvcounts= xbt_new(int, comm_size);
951 for(i=0; i< comm_size; i++)//copy data to avoid bad free
952 extra->recvcounts[i] = recvcounts[i];
953 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
954 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, NULL);
955 extra->num_processes = comm_size;
957 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
959 mpi_coll_allgatherv_fun(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts, disps, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
961 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
962 log_timed_action (action, clock);
963 xbt_free(recvcounts);
967 static void action_allToAllv(const char *const *action) {
969 The structure of the allToAllV action for the rank 0 (total 4 processes)
971 0 allToAllV 100 1 7 10 12 100 1 70 10 5
974 1) 100 is the size of the send buffer *sizeof(int),
975 2) 1 7 10 12 is the sendcounts array
976 3) 100*sizeof(int) is the size of the receiver buffer
977 4) 1 70 10 5 is the recvcounts array
982 double clock = smpi_process_simulated_elapsed();
984 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
985 CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2);
986 int send_buf_size=0,recv_buf_size=0,i=0;
987 int *sendcounts = xbt_new0(int, comm_size);
988 int *recvcounts = xbt_new0(int, comm_size);
989 int *senddisps = xbt_new0(int, comm_size);
990 int *recvdisps = xbt_new0(int, comm_size);
992 MPI_Datatype MPI_CURRENT_TYPE2;
994 send_buf_size=parse_double(action[2]);
995 recv_buf_size=parse_double(action[3+comm_size]);
996 if(action[4+2*comm_size] && action[5+2*comm_size]) {
997 MPI_CURRENT_TYPE=decode_datatype(action[4+2*comm_size]);
998 MPI_CURRENT_TYPE2=decode_datatype(action[5+2*comm_size]);
1001 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
1002 MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
1005 void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* smpi_datatype_size(MPI_CURRENT_TYPE));
1006 void *recvbuf = smpi_get_tmp_recvbuffer(recv_buf_size* smpi_datatype_size(MPI_CURRENT_TYPE2));
1008 for(i=0;i<comm_size;i++) {
1009 sendcounts[i] = atoi(action[i+3]);
1010 recvcounts[i] = atoi(action[i+4+comm_size]);
1014 int rank = smpi_process_index();
1015 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
1016 extra->type = TRACING_ALLTOALLV;
1017 extra->recvcounts= xbt_new(int, comm_size);
1018 extra->sendcounts= xbt_new(int, comm_size);
1019 extra->num_processes = comm_size;
1021 for(i=0; i< comm_size; i++){//copy data to avoid bad free
1022 extra->send_size += sendcounts[i];
1023 extra->sendcounts[i] = sendcounts[i];
1024 extra->recv_size += recvcounts[i];
1025 extra->recvcounts[i] = recvcounts[i];
1027 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
1028 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, NULL);
1030 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
1032 mpi_coll_alltoallv_fun(sendbuf, sendcounts, senddisps, MPI_CURRENT_TYPE,
1033 recvbuf, recvcounts, recvdisps, MPI_CURRENT_TYPE,
1036 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
1037 log_timed_action (action, clock);
1038 xbt_free(sendcounts);
1039 xbt_free(recvcounts);
1040 xbt_free(senddisps);
1041 xbt_free(recvdisps);
1044 void smpi_replay_run(int *argc, char***argv){
1045 /* First initializes everything */
1046 smpi_process_init(argc, argv);
1047 smpi_process_mark_as_initialized();
1048 smpi_process_set_replaying(1);
1050 int rank = smpi_process_index();
1051 TRACE_smpi_init(rank);
1052 TRACE_smpi_computing_init(rank);
1053 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
1054 extra->type = TRACING_INIT;
1055 char *operation =bprintf("%s_init",__FUNCTION__);
1056 TRACE_smpi_collective_in(rank, -1, operation, extra);
1057 TRACE_smpi_collective_out(rank, -1, operation);
1060 if (!_xbt_replay_action_init()) {
1061 xbt_replay_action_register("init", action_init);
1062 xbt_replay_action_register("finalize", action_finalize);
1063 xbt_replay_action_register("comm_size", action_comm_size);
1064 xbt_replay_action_register("comm_split", action_comm_split);
1065 xbt_replay_action_register("comm_dup", action_comm_dup);
1066 xbt_replay_action_register("send", action_send);
1067 xbt_replay_action_register("Isend", action_Isend);
1068 xbt_replay_action_register("recv", action_recv);
1069 xbt_replay_action_register("Irecv", action_Irecv);
1070 xbt_replay_action_register("test", action_test);
1071 xbt_replay_action_register("wait", action_wait);
1072 xbt_replay_action_register("waitAll", action_waitall);
1073 xbt_replay_action_register("barrier", action_barrier);
1074 xbt_replay_action_register("bcast", action_bcast);
1075 xbt_replay_action_register("reduce", action_reduce);
1076 xbt_replay_action_register("allReduce", action_allReduce);
1077 xbt_replay_action_register("allToAll", action_allToAll);
1078 xbt_replay_action_register("allToAllV", action_allToAllv);
1079 xbt_replay_action_register("gather", action_gather);
1080 xbt_replay_action_register("gatherV", action_gatherv);
1081 xbt_replay_action_register("allGather", action_allgather);
1082 xbt_replay_action_register("allGatherV", action_allgatherv);
1083 xbt_replay_action_register("reduceScatter", action_reducescatter);
1084 xbt_replay_action_register("compute", action_compute);
1087 //if we have a delayed start, sleep here.
1090 double value = strtod((*argv)[2], &endptr);
1091 if (*endptr != '\0')
1092 THROWF(unknown_error, 0, "%s is not a double", (*argv)[2]);
1093 XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
1094 smpi_execute_flops(value);
1096 //UGLY: force a context switch to be sure that all MSG_processes begin initialization
1097 XBT_DEBUG("Force context switch by smpi_execute_flops - Sleeping for 0.0 flops ");
1098 smpi_execute_flops(0.0);
1101 /* Actually run the replay */
1102 xbt_replay_action_runner(*argc, *argv);
1104 /* and now, finalize everything */
1105 double sim_time= 1.;
1106 /* One active process will stop. Decrease the counter*/
1107 XBT_DEBUG("There are %lu elements in reqq[*]",
1108 xbt_dynar_length(get_reqq_self()));
1109 if (!xbt_dynar_is_empty(get_reqq_self())){
1110 int count_requests=xbt_dynar_length(get_reqq_self());
1111 MPI_Request requests[count_requests];
1112 MPI_Status status[count_requests];
1115 xbt_dynar_foreach(get_reqq_self(),i,requests[i]);
1116 smpi_mpi_waitall(count_requests, requests, status);
1122 if(!active_processes){
1123 /* Last process alive speaking */
1124 /* end the simulated timer */
1125 sim_time = smpi_process_simulated_elapsed();
1129 //TODO xbt_dynar_free_container(get_reqq_self()));
1131 if(!active_processes){
1132 XBT_INFO("Simulation time %f", sim_time);
1133 _xbt_replay_action_exit();
1134 xbt_free(sendbuffer);
1135 xbt_free(recvbuffer);
1137 xbt_dict_free(&reqq); //not need, data have been freed ???
1141 instr_extra_data extra_fin = xbt_new0(s_instr_extra_data_t,1);
1142 extra_fin->type = TRACING_FINALIZE;
1143 operation =bprintf("%s_finalize",__FUNCTION__);
1144 TRACE_smpi_collective_in(rank, -1, operation, extra_fin);
1146 smpi_process_finalize();
1148 TRACE_smpi_collective_out(rank, -1, operation);
1149 TRACE_smpi_finalize(smpi_process_index());
1150 smpi_process_destroy();