1 /* Copyright (c) 2009-2015. The SimGrid Team.
2 * All rights reserved. */
4 /* This program is free software; you can redistribute it and/or modify it
5 * under the terms of the license (GNU LGPL) which comes with this package. */
10 #include <xbt/replay.h>
12 #define KEY_SIZE (sizeof(int) * 2 + 1)
14 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
16 int communicator_size = 0;
17 static int active_processes = 0;
18 xbt_dict_t reqq = nullptr;
20 MPI_Datatype MPI_DEFAULT_TYPE;
21 MPI_Datatype MPI_CURRENT_TYPE;
23 static int sendbuffer_size=0;
24 char* sendbuffer=nullptr;
25 static int recvbuffer_size=0;
26 char* recvbuffer=nullptr;
28 static void log_timed_action (const char *const *action, double clock){
29 if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
30 char *name = xbt_str_join_array(action, " ");
31 XBT_VERB("%s %f", name, smpi_process_simulated_elapsed()-clock);
36 static xbt_dynar_t get_reqq_self()
38 char * key = bprintf("%d", smpi_process_index());
39 xbt_dynar_t dynar_mpi_request = static_cast<xbt_dynar_t>(xbt_dict_get(reqq, key));
42 return dynar_mpi_request;
45 static void set_reqq_self(xbt_dynar_t mpi_request)
47 char * key = bprintf("%d", smpi_process_index());
48 xbt_dict_set(reqq, key, mpi_request, free);
52 //allocate a single buffer for all sends, growing it if needed
53 void* smpi_get_tmp_sendbuffer(int size)
55 if (!smpi_process_get_replaying())
56 return xbt_malloc(size);
57 if (sendbuffer_size<size){
58 sendbuffer=static_cast<char*>(xbt_realloc(sendbuffer,size));
64 //allocate a single buffer for all recv
65 void* smpi_get_tmp_recvbuffer(int size){
66 if (!smpi_process_get_replaying())
67 return xbt_malloc(size);
68 if (recvbuffer_size<size){
69 recvbuffer=static_cast<char*>(xbt_realloc(recvbuffer,size));
75 void smpi_free_tmp_buffer(void* buf){
76 if (!smpi_process_get_replaying())
81 static double parse_double(const char *string)
85 value = strtod(string, &endptr);
87 THROWF(unknown_error, 0, "%s is not a double", string);
91 static MPI_Datatype decode_datatype(const char *const action)
93 // Declared datatypes,
94 switch(atoi(action)) {
96 MPI_CURRENT_TYPE=MPI_DOUBLE;
99 MPI_CURRENT_TYPE=MPI_INT;
102 MPI_CURRENT_TYPE=MPI_CHAR;
105 MPI_CURRENT_TYPE=MPI_SHORT;
108 MPI_CURRENT_TYPE=MPI_LONG;
111 MPI_CURRENT_TYPE=MPI_FLOAT;
114 MPI_CURRENT_TYPE=MPI_BYTE;
117 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
119 return MPI_CURRENT_TYPE;
123 const char* encode_datatype(MPI_Datatype datatype, int* known)
125 //default type for output is set to MPI_BYTE
126 // MPI_DEFAULT_TYPE is not set for output, use directly MPI_BYTE
129 if (datatype==MPI_BYTE){
132 if(datatype==MPI_DOUBLE)
134 if(datatype==MPI_INT)
136 if(datatype==MPI_CHAR)
138 if(datatype==MPI_SHORT)
140 if(datatype==MPI_LONG)
142 if(datatype==MPI_FLOAT)
144 //tell that the datatype is not handled by replay, and that its size should be measured and replayed as size*MPI_BYTE
147 // default - not implemented.
148 // do not warn here as we pass in this function even for other trace formats
152 #define CHECK_ACTION_PARAMS(action, mandatory, optional) {\
154 while(action[i]!=nullptr)\
157 THROWF(arg_error, 0, "%s replay failed.\n" \
158 "%d items were given on the line. First two should be process_id and action. " \
159 "This action needs after them %d mandatory arguments, and accepts %d optional ones. \n" \
160 "Please contact the Simgrid team if support is needed", __FUNCTION__, i, mandatory, optional);\
163 static void action_init(const char *const *action)
165 XBT_DEBUG("Initialize the counters");
166 CHECK_ACTION_PARAMS(action, 0, 1)
168 MPI_DEFAULT_TYPE=MPI_DOUBLE; // default MPE dataype
169 else MPI_DEFAULT_TYPE= MPI_BYTE; // default TAU datatype
171 /* start a simulated timer */
172 smpi_process_simulated_start();
173 /*initialize the number of active processes */
174 active_processes = smpi_process_count();
177 reqq = xbt_dict_new();
180 set_reqq_self(xbt_dynar_new(sizeof(MPI_Request),&xbt_free_ref));
183 static void action_finalize(const char *const *action)
188 static void action_comm_size(const char *const *action)
190 double clock = smpi_process_simulated_elapsed();
192 communicator_size = parse_double(action[2]);
193 log_timed_action (action, clock);
196 static void action_comm_split(const char *const *action)
198 double clock = smpi_process_simulated_elapsed();
200 log_timed_action (action, clock);
203 static void action_comm_dup(const char *const *action)
205 double clock = smpi_process_simulated_elapsed();
207 log_timed_action (action, clock);
210 static void action_compute(const char *const *action)
212 CHECK_ACTION_PARAMS(action, 1, 0)
213 double clock = smpi_process_simulated_elapsed();
214 double flops= parse_double(action[2]);
215 int rank = smpi_process_index();
216 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
217 extra->type=TRACING_COMPUTING;
218 extra->comp_size=flops;
219 TRACE_smpi_computing_in(rank, extra);
221 smpi_execute_flops(flops);
223 TRACE_smpi_computing_out(rank);
224 log_timed_action (action, clock);
227 static void action_send(const char *const *action)
229 CHECK_ACTION_PARAMS(action, 2, 1)
230 int to = atoi(action[2]);
231 double size=parse_double(action[3]);
232 double clock = smpi_process_simulated_elapsed();
235 MPI_CURRENT_TYPE=decode_datatype(action[4]);
237 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
239 int rank = smpi_process_index();
241 int dst_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), to);
242 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
243 extra->type = TRACING_SEND;
244 extra->send_size = size;
246 extra->dst = dst_traced;
247 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
248 TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
249 if (!TRACE_smpi_view_internals()) {
250 TRACE_smpi_send(rank, rank, dst_traced, size*smpi_datatype_size(MPI_CURRENT_TYPE));
253 smpi_mpi_send(nullptr, size, MPI_CURRENT_TYPE, to , 0, MPI_COMM_WORLD);
255 log_timed_action (action, clock);
257 TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
260 static void action_Isend(const char *const *action)
262 CHECK_ACTION_PARAMS(action, 2, 1)
263 int to = atoi(action[2]);
264 double size=parse_double(action[3]);
265 double clock = smpi_process_simulated_elapsed();
269 MPI_CURRENT_TYPE=decode_datatype(action[4]);
271 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
273 int rank = smpi_process_index();
274 int dst_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), to);
275 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
276 extra->type = TRACING_ISEND;
277 extra->send_size = size;
279 extra->dst = dst_traced;
280 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
281 TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
282 if (!TRACE_smpi_view_internals()) {
283 TRACE_smpi_send(rank, rank, dst_traced, size*smpi_datatype_size(MPI_CURRENT_TYPE));
286 request = smpi_mpi_isend(nullptr, size, MPI_CURRENT_TYPE, to, 0,MPI_COMM_WORLD);
288 TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
291 xbt_dynar_push(get_reqq_self(),&request);
293 log_timed_action (action, clock);
296 static void action_recv(const char *const *action) {
297 CHECK_ACTION_PARAMS(action, 2, 1)
298 int from = atoi(action[2]);
299 double size=parse_double(action[3]);
300 double clock = smpi_process_simulated_elapsed();
304 MPI_CURRENT_TYPE=decode_datatype(action[4]);
306 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
308 int rank = smpi_process_index();
309 int src_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), from);
311 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
312 extra->type = TRACING_RECV;
313 extra->send_size = size;
314 extra->src = src_traced;
316 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
317 TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
319 //unknow size from the receiver pov
321 smpi_mpi_probe(from, 0, MPI_COMM_WORLD, &status);
325 smpi_mpi_recv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD, &status);
327 TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
328 if (!TRACE_smpi_view_internals()) {
329 TRACE_smpi_recv(rank, src_traced, rank);
332 log_timed_action (action, clock);
335 static void action_Irecv(const char *const *action)
337 CHECK_ACTION_PARAMS(action, 2, 1)
338 int from = atoi(action[2]);
339 double size=parse_double(action[3]);
340 double clock = smpi_process_simulated_elapsed();
344 MPI_CURRENT_TYPE=decode_datatype(action[4]);
346 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
348 int rank = smpi_process_index();
349 int src_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), from);
350 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
351 extra->type = TRACING_IRECV;
352 extra->send_size = size;
353 extra->src = src_traced;
355 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
356 TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
358 //unknow size from the receiver pov
360 smpi_mpi_probe(from, 0, MPI_COMM_WORLD, &status);
364 request = smpi_mpi_irecv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD);
366 TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
368 xbt_dynar_push(get_reqq_self(),&request);
370 log_timed_action (action, clock);
373 static void action_test(const char *const *action){
374 CHECK_ACTION_PARAMS(action, 0, 0)
375 double clock = smpi_process_simulated_elapsed();
380 request = xbt_dynar_pop_as(get_reqq_self(),MPI_Request);
381 //if request is null here, this may mean that a previous test has succeeded
382 //Different times in traced application and replayed version may lead to this
383 //In this case, ignore the extra calls.
384 if(request!=nullptr){
385 int rank = smpi_process_index();
386 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
387 extra->type=TRACING_TEST;
388 TRACE_smpi_testing_in(rank, extra);
390 flag = smpi_mpi_test(&request, &status);
392 XBT_DEBUG("MPI_Test result: %d", flag);
393 /* push back request in dynar to be caught by a subsequent wait. if the test did succeed, the request is now nullptr.*/
394 xbt_dynar_push_as(get_reqq_self(),MPI_Request, request);
396 TRACE_smpi_testing_out(rank);
398 log_timed_action (action, clock);
401 static void action_wait(const char *const *action){
402 CHECK_ACTION_PARAMS(action, 0, 0)
403 double clock = smpi_process_simulated_elapsed();
407 xbt_assert(xbt_dynar_length(get_reqq_self()),
408 "action wait not preceded by any irecv or isend: %s",
409 xbt_str_join_array(action," "));
410 request = xbt_dynar_pop_as(get_reqq_self(),MPI_Request);
412 if (request==nullptr){
413 /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just return.*/
417 int rank = request->comm != MPI_COMM_NULL ? smpi_comm_rank(request->comm) : -1;
419 MPI_Group group = smpi_comm_group(request->comm);
420 int src_traced = smpi_group_rank(group, request->src);
421 int dst_traced = smpi_group_rank(group, request->dst);
422 int is_wait_for_receive = request->recv;
423 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
424 extra->type = TRACING_WAIT;
425 TRACE_smpi_ptp_in(rank, src_traced, dst_traced, __FUNCTION__, extra);
427 smpi_mpi_wait(&request, &status);
429 TRACE_smpi_ptp_out(rank, src_traced, dst_traced, __FUNCTION__);
430 if (is_wait_for_receive)
431 TRACE_smpi_recv(rank, src_traced, dst_traced);
432 log_timed_action (action, clock);
435 static void action_waitall(const char *const *action){
436 CHECK_ACTION_PARAMS(action, 0, 0)
437 double clock = smpi_process_simulated_elapsed();
438 int count_requests=0;
441 count_requests=xbt_dynar_length(get_reqq_self());
443 if (count_requests>0) {
444 MPI_Request requests[count_requests];
445 MPI_Status status[count_requests];
447 /* The reqq is an array of dynars. Its index corresponds to the rank.
448 Thus each rank saves its own requests to the array request. */
449 xbt_dynar_foreach(get_reqq_self(),i,requests[i]);
451 //save information from requests
452 xbt_dynar_t srcs = xbt_dynar_new(sizeof(int), nullptr);
453 xbt_dynar_t dsts = xbt_dynar_new(sizeof(int), nullptr);
454 xbt_dynar_t recvs = xbt_dynar_new(sizeof(int), nullptr);
455 for (i = 0; static_cast<int>(i) < count_requests; i++) {
457 int *asrc = xbt_new(int, 1);
458 int *adst = xbt_new(int, 1);
459 int *arecv = xbt_new(int, 1);
460 *asrc = requests[i]->src;
461 *adst = requests[i]->dst;
462 *arecv = requests[i]->recv;
463 xbt_dynar_insert_at(srcs, i, asrc);
464 xbt_dynar_insert_at(dsts, i, adst);
465 xbt_dynar_insert_at(recvs, i, arecv);
470 int *t = xbt_new(int, 1);
471 xbt_dynar_insert_at(srcs, i, t);
472 xbt_dynar_insert_at(dsts, i, t);
473 xbt_dynar_insert_at(recvs, i, t);
477 int rank_traced = smpi_process_index();
478 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
479 extra->type = TRACING_WAITALL;
480 extra->send_size=count_requests;
481 TRACE_smpi_ptp_in(rank_traced, -1, -1, __FUNCTION__,extra);
483 smpi_mpi_waitall(count_requests, requests, status);
485 for (i = 0; static_cast<int>(i) < count_requests; i++) {
486 int src_traced, dst_traced, is_wait_for_receive;
487 xbt_dynar_get_cpy(srcs, i, &src_traced);
488 xbt_dynar_get_cpy(dsts, i, &dst_traced);
489 xbt_dynar_get_cpy(recvs, i, &is_wait_for_receive);
490 if (is_wait_for_receive) {
491 TRACE_smpi_recv(rank_traced, src_traced, dst_traced);
494 TRACE_smpi_ptp_out(rank_traced, -1, -1, __FUNCTION__);
496 xbt_dynar_free(&srcs);
497 xbt_dynar_free(&dsts);
498 xbt_dynar_free(&recvs);
499 set_reqq_self(xbt_dynar_new(sizeof(MPI_Request),&xbt_free_ref));
501 log_timed_action (action, clock);
504 static void action_barrier(const char *const *action){
505 double clock = smpi_process_simulated_elapsed();
506 int rank = smpi_process_index();
507 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
508 extra->type = TRACING_BARRIER;
509 TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
511 mpi_coll_barrier_fun(MPI_COMM_WORLD);
513 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
514 log_timed_action (action, clock);
517 static void action_bcast(const char *const *action)
519 CHECK_ACTION_PARAMS(action, 1, 2)
520 double size = parse_double(action[2]);
521 double clock = smpi_process_simulated_elapsed();
523 /* Initialize MPI_CURRENT_TYPE in order to decrease the number of the checks */
524 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
527 root= atoi(action[3]);
529 MPI_CURRENT_TYPE=decode_datatype(action[4]);
533 int rank = smpi_process_index();
534 int root_traced = smpi_group_index(smpi_comm_group(MPI_COMM_WORLD), root);
536 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
537 extra->type = TRACING_BCAST;
538 extra->send_size = size;
539 extra->root = root_traced;
540 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
541 TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__, extra);
542 void *sendbuf = smpi_get_tmp_sendbuffer(size* smpi_datatype_size(MPI_CURRENT_TYPE));
544 mpi_coll_bcast_fun(sendbuf, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
546 TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
547 log_timed_action (action, clock);
550 static void action_reduce(const char *const *action)
552 CHECK_ACTION_PARAMS(action, 2, 2)
553 double comm_size = parse_double(action[2]);
554 double comp_size = parse_double(action[3]);
555 double clock = smpi_process_simulated_elapsed();
557 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
560 root= atoi(action[4]);
562 MPI_CURRENT_TYPE=decode_datatype(action[5]);
566 int rank = smpi_process_index();
567 int root_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), root);
568 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
569 extra->type = TRACING_REDUCE;
570 extra->send_size = comm_size;
571 extra->comp_size = comp_size;
572 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
573 extra->root = root_traced;
575 TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__,extra);
577 void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
578 void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
579 mpi_coll_reduce_fun(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
580 smpi_execute_flops(comp_size);
582 TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
583 log_timed_action (action, clock);
586 static void action_allReduce(const char *const *action) {
587 CHECK_ACTION_PARAMS(action, 2, 1)
588 double comm_size = parse_double(action[2]);
589 double comp_size = parse_double(action[3]);
592 MPI_CURRENT_TYPE=decode_datatype(action[4]);
594 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
596 double clock = smpi_process_simulated_elapsed();
597 int rank = smpi_process_index();
598 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
599 extra->type = TRACING_ALLREDUCE;
600 extra->send_size = comm_size;
601 extra->comp_size = comp_size;
602 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
603 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
605 void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
606 void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
607 mpi_coll_allreduce_fun(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
608 smpi_execute_flops(comp_size);
610 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
611 log_timed_action (action, clock);
614 static void action_allToAll(const char *const *action) {
615 CHECK_ACTION_PARAMS(action, 2, 2) //two mandatory (send and recv volumes)
616 //two optional (corresponding datatypes)
617 double clock = smpi_process_simulated_elapsed();
618 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
619 int send_size = parse_double(action[2]);
620 int recv_size = parse_double(action[3]);
621 MPI_Datatype MPI_CURRENT_TYPE2;
623 if(action[4] && action[5]) {
624 MPI_CURRENT_TYPE=decode_datatype(action[4]);
625 MPI_CURRENT_TYPE2=decode_datatype(action[5]);
628 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
629 MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
632 void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
633 void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE2));
635 int rank = smpi_process_index();
636 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
637 extra->type = TRACING_ALLTOALL;
638 extra->send_size = send_size;
639 extra->recv_size = recv_size;
640 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
641 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
643 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
645 mpi_coll_alltoall_fun(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
647 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
648 log_timed_action (action, clock);
651 static void action_gather(const char *const *action) {
652 /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
656 1) 68 is the sendcounts
657 2) 68 is the recvcounts
658 3) 0 is the root node
659 4) 0 is the send datatype id, see decode_datatype()
660 5) 0 is the recv datatype id, see decode_datatype()
662 CHECK_ACTION_PARAMS(action, 2, 3)
663 double clock = smpi_process_simulated_elapsed();
664 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
665 int send_size = parse_double(action[2]);
666 int recv_size = parse_double(action[3]);
667 MPI_Datatype MPI_CURRENT_TYPE2;
668 if(action[4] && action[5]) {
669 MPI_CURRENT_TYPE=decode_datatype(action[5]);
670 MPI_CURRENT_TYPE2=decode_datatype(action[6]);
672 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
673 MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
675 void *send = smpi_get_tmp_sendbuffer(send_size* smpi_datatype_size(MPI_CURRENT_TYPE));
676 void *recv = nullptr;
679 root=atoi(action[4]);
680 int rank = smpi_comm_rank(MPI_COMM_WORLD);
683 recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE2));
685 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
686 extra->type = TRACING_GATHER;
687 extra->send_size = send_size;
688 extra->recv_size = recv_size;
690 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
691 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
693 TRACE_smpi_collective_in(smpi_process_index(), root, __FUNCTION__, extra);
695 mpi_coll_gather_fun(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
697 TRACE_smpi_collective_out(smpi_process_index(), -1, __FUNCTION__);
698 log_timed_action (action, clock);
701 static void action_gatherv(const char *const *action) {
702 /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
703 0 gather 68 68 10 10 10 0 0 0
706 1) 68 is the sendcount
707 2) 68 10 10 10 is the recvcounts
708 3) 0 is the root node
709 4) 0 is the send datatype id, see decode_datatype()
710 5) 0 is the recv datatype id, see decode_datatype()
713 double clock = smpi_process_simulated_elapsed();
714 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
715 CHECK_ACTION_PARAMS(action, comm_size+1, 2)
716 int send_size = parse_double(action[2]);
717 int *disps = xbt_new0(int, comm_size);
718 int *recvcounts = xbt_new0(int, comm_size);
721 MPI_Datatype MPI_CURRENT_TYPE2;
722 if(action[4+comm_size] && action[5+comm_size]) {
723 MPI_CURRENT_TYPE=decode_datatype(action[4+comm_size]);
724 MPI_CURRENT_TYPE2=decode_datatype(action[5+comm_size]);
726 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
727 MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
729 void *send = smpi_get_tmp_sendbuffer(send_size* smpi_datatype_size(MPI_CURRENT_TYPE));
730 void *recv = nullptr;
731 for(i=0;i<comm_size;i++) {
732 recvcounts[i] = atoi(action[i+3]);
733 recv_sum=recv_sum+recvcounts[i];
737 int root=atoi(action[3+comm_size]);
738 int rank = smpi_comm_rank(MPI_COMM_WORLD);
741 recv = smpi_get_tmp_recvbuffer(recv_sum* smpi_datatype_size(MPI_CURRENT_TYPE2));
743 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
744 extra->type = TRACING_GATHERV;
745 extra->send_size = send_size;
746 extra->recvcounts= xbt_new(int,comm_size);
747 for(i=0; i< comm_size; i++)//copy data to avoid bad free
748 extra->recvcounts[i] = recvcounts[i];
750 extra->num_processes = comm_size;
751 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
752 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
754 TRACE_smpi_collective_in(smpi_process_index(), root, __FUNCTION__, extra);
756 smpi_mpi_gatherv(send, send_size, MPI_CURRENT_TYPE, recv, recvcounts, disps, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
758 TRACE_smpi_collective_out(smpi_process_index(), -1, __FUNCTION__);
759 log_timed_action (action, clock);
760 xbt_free(recvcounts);
764 static void action_reducescatter(const char *const *action) {
765 /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
766 0 reduceScatter 275427 275427 275427 204020 11346849 0
769 1) The first four values after the name of the action declare the recvcounts array
770 2) The value 11346849 is the amount of instructions
771 3) The last value corresponds to the datatype, see decode_datatype().
773 We analyze a MPI_Reduce_scatter call to one MPI_Reduce and one MPI_Scatterv. */
774 double clock = smpi_process_simulated_elapsed();
775 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
776 CHECK_ACTION_PARAMS(action, comm_size+1, 1)
777 int comp_size = parse_double(action[2+comm_size]);
778 int *recvcounts = xbt_new0(int, comm_size);
779 int *disps = xbt_new0(int, comm_size);
781 int rank = smpi_process_index();
783 if(action[3+comm_size])
784 MPI_CURRENT_TYPE=decode_datatype(action[3+comm_size]);
786 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
788 for(i=0;i<comm_size;i++) {
789 recvcounts[i] = atoi(action[i+2]);
794 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
795 extra->type = TRACING_REDUCE_SCATTER;
796 extra->send_size = 0;
797 extra->recvcounts= xbt_new(int, comm_size);
798 for(i=0; i< comm_size; i++)//copy data to avoid bad free
799 extra->recvcounts[i] = recvcounts[i];
800 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
801 extra->comp_size = comp_size;
802 extra->num_processes = comm_size;
804 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
806 void *sendbuf = smpi_get_tmp_sendbuffer(size* smpi_datatype_size(MPI_CURRENT_TYPE));
807 void *recvbuf = smpi_get_tmp_recvbuffer(size* smpi_datatype_size(MPI_CURRENT_TYPE));
809 mpi_coll_reduce_scatter_fun(sendbuf, recvbuf, recvcounts, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
810 smpi_execute_flops(comp_size);
812 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
813 xbt_free(recvcounts);
815 log_timed_action (action, clock);
818 static void action_allgather(const char *const *action) {
819 /* The structure of the allgather action for the rank 0 (total 4 processes) is the following:
820 0 allGather 275427 275427
823 1) 275427 is the sendcount
824 2) 275427 is the recvcount
825 3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype(). */
826 double clock = smpi_process_simulated_elapsed();
828 CHECK_ACTION_PARAMS(action, 2, 2)
829 int sendcount=atoi(action[2]);
830 int recvcount=atoi(action[3]);
832 MPI_Datatype MPI_CURRENT_TYPE2;
834 if(action[4] && action[5]) {
835 MPI_CURRENT_TYPE = decode_datatype(action[4]);
836 MPI_CURRENT_TYPE2 = decode_datatype(action[5]);
838 MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
839 MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
841 void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* smpi_datatype_size(MPI_CURRENT_TYPE));
842 void *recvbuf = smpi_get_tmp_recvbuffer(recvcount* smpi_datatype_size(MPI_CURRENT_TYPE2));
844 int rank = smpi_process_index();
845 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
846 extra->type = TRACING_ALLGATHER;
847 extra->send_size = sendcount;
848 extra->recv_size= recvcount;
849 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
850 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
851 extra->num_processes = smpi_comm_size(MPI_COMM_WORLD);
853 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
855 mpi_coll_allgather_fun(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcount, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
857 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
858 log_timed_action (action, clock);
861 static void action_allgatherv(const char *const *action) {
862 /* The structure of the allgatherv action for the rank 0 (total 4 processes) is the following:
863 0 allGatherV 275427 275427 275427 275427 204020
866 1) 275427 is the sendcount
867 2) The next four elements declare the recvcounts array
868 3) No more values mean that the datatype for sent and receive buffer
869 is the default one, see decode_datatype(). */
870 double clock = smpi_process_simulated_elapsed();
872 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
873 CHECK_ACTION_PARAMS(action, comm_size+1, 2)
875 int sendcount=atoi(action[2]);
876 int *recvcounts = xbt_new0(int, comm_size);
877 int *disps = xbt_new0(int, comm_size);
879 MPI_Datatype MPI_CURRENT_TYPE2;
881 if(action[3+comm_size] && action[4+comm_size]) {
882 MPI_CURRENT_TYPE = decode_datatype(action[3+comm_size]);
883 MPI_CURRENT_TYPE2 = decode_datatype(action[4+comm_size]);
885 MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
886 MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
888 void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* smpi_datatype_size(MPI_CURRENT_TYPE));
890 for(i=0;i<comm_size;i++) {
891 recvcounts[i] = atoi(action[i+3]);
892 recv_sum=recv_sum+recvcounts[i];
894 void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* smpi_datatype_size(MPI_CURRENT_TYPE2));
896 int rank = smpi_process_index();
897 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
898 extra->type = TRACING_ALLGATHERV;
899 extra->send_size = sendcount;
900 extra->recvcounts= xbt_new(int, comm_size);
901 for(i=0; i< comm_size; i++)//copy data to avoid bad free
902 extra->recvcounts[i] = recvcounts[i];
903 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
904 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
905 extra->num_processes = comm_size;
907 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
909 mpi_coll_allgatherv_fun(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts, disps, MPI_CURRENT_TYPE2,
912 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
913 log_timed_action (action, clock);
914 xbt_free(recvcounts);
918 static void action_allToAllv(const char *const *action) {
919 /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
920 0 allToAllV 100 1 7 10 12 100 1 70 10 5
923 1) 100 is the size of the send buffer *sizeof(int),
924 2) 1 7 10 12 is the sendcounts array
925 3) 100*sizeof(int) is the size of the receiver buffer
926 4) 1 70 10 5 is the recvcounts array */
927 double clock = smpi_process_simulated_elapsed();
929 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
930 CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
931 int send_buf_size=0,recv_buf_size=0,i=0;
932 int *sendcounts = xbt_new0(int, comm_size);
933 int *recvcounts = xbt_new0(int, comm_size);
934 int *senddisps = xbt_new0(int, comm_size);
935 int *recvdisps = xbt_new0(int, comm_size);
937 MPI_Datatype MPI_CURRENT_TYPE2;
939 send_buf_size=parse_double(action[2]);
940 recv_buf_size=parse_double(action[3+comm_size]);
941 if(action[4+2*comm_size] && action[5+2*comm_size]) {
942 MPI_CURRENT_TYPE=decode_datatype(action[4+2*comm_size]);
943 MPI_CURRENT_TYPE2=decode_datatype(action[5+2*comm_size]);
946 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
947 MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
950 void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* smpi_datatype_size(MPI_CURRENT_TYPE));
951 void *recvbuf = smpi_get_tmp_recvbuffer(recv_buf_size* smpi_datatype_size(MPI_CURRENT_TYPE2));
953 for(i=0;i<comm_size;i++) {
954 sendcounts[i] = atoi(action[i+3]);
955 recvcounts[i] = atoi(action[i+4+comm_size]);
958 int rank = smpi_process_index();
959 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
960 extra->type = TRACING_ALLTOALLV;
961 extra->recvcounts= xbt_new(int, comm_size);
962 extra->sendcounts= xbt_new(int, comm_size);
963 extra->num_processes = comm_size;
965 for(i=0; i< comm_size; i++){//copy data to avoid bad free
966 extra->send_size += sendcounts[i];
967 extra->sendcounts[i] = sendcounts[i];
968 extra->recv_size += recvcounts[i];
969 extra->recvcounts[i] = recvcounts[i];
971 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
972 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
974 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
976 mpi_coll_alltoallv_fun(sendbuf, sendcounts, senddisps, MPI_CURRENT_TYPE,recvbuf, recvcounts, recvdisps,
977 MPI_CURRENT_TYPE, MPI_COMM_WORLD);
979 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
980 log_timed_action (action, clock);
981 xbt_free(sendcounts);
982 xbt_free(recvcounts);
987 void smpi_replay_run(int *argc, char***argv){
988 /* First initializes everything */
989 smpi_process_init(argc, argv);
990 smpi_process_mark_as_initialized();
991 smpi_process_set_replaying(true);
993 int rank = smpi_process_index();
994 TRACE_smpi_init(rank);
995 TRACE_smpi_computing_init(rank);
996 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
997 extra->type = TRACING_INIT;
998 char *operation =bprintf("%s_init",__FUNCTION__);
999 TRACE_smpi_collective_in(rank, -1, operation, extra);
1000 TRACE_smpi_collective_out(rank, -1, operation);
1001 xbt_free(operation);
1003 if (_xbt_replay_action_init()==0) {
1004 xbt_replay_action_register("init", action_init);
1005 xbt_replay_action_register("finalize", action_finalize);
1006 xbt_replay_action_register("comm_size", action_comm_size);
1007 xbt_replay_action_register("comm_split", action_comm_split);
1008 xbt_replay_action_register("comm_dup", action_comm_dup);
1009 xbt_replay_action_register("send", action_send);
1010 xbt_replay_action_register("Isend", action_Isend);
1011 xbt_replay_action_register("recv", action_recv);
1012 xbt_replay_action_register("Irecv", action_Irecv);
1013 xbt_replay_action_register("test", action_test);
1014 xbt_replay_action_register("wait", action_wait);
1015 xbt_replay_action_register("waitAll", action_waitall);
1016 xbt_replay_action_register("barrier", action_barrier);
1017 xbt_replay_action_register("bcast", action_bcast);
1018 xbt_replay_action_register("reduce", action_reduce);
1019 xbt_replay_action_register("allReduce", action_allReduce);
1020 xbt_replay_action_register("allToAll", action_allToAll);
1021 xbt_replay_action_register("allToAllV", action_allToAllv);
1022 xbt_replay_action_register("gather", action_gather);
1023 xbt_replay_action_register("gatherV", action_gatherv);
1024 xbt_replay_action_register("allGather", action_allgather);
1025 xbt_replay_action_register("allGatherV", action_allgatherv);
1026 xbt_replay_action_register("reduceScatter", action_reducescatter);
1027 xbt_replay_action_register("compute", action_compute);
1030 //if we have a delayed start, sleep here.
1033 double value = strtod((*argv)[2], &endptr);
1034 if (*endptr != '\0')
1035 THROWF(unknown_error, 0, "%s is not a double", (*argv)[2]);
1036 XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
1037 smpi_execute_flops(value);
1039 //UGLY: force a context switch to be sure that all MSG_processes begin initialization
1040 XBT_DEBUG("Force context switch by smpi_execute_flops - Sleeping for 0.0 flops ");
1041 smpi_execute_flops(0.0);
1044 /* Actually run the replay */
1045 xbt_replay_action_runner(*argc, *argv);
1047 /* and now, finalize everything */
1048 double sim_time= 1.;
1049 /* One active process will stop. Decrease the counter*/
1050 XBT_DEBUG("There are %lu elements in reqq[*]", xbt_dynar_length(get_reqq_self()));
1051 if (xbt_dynar_is_empty(get_reqq_self())==0){
1052 int count_requests=xbt_dynar_length(get_reqq_self());
1053 MPI_Request requests[count_requests];
1054 MPI_Status status[count_requests];
1057 xbt_dynar_foreach(get_reqq_self(),i,requests[i]);
1058 smpi_mpi_waitall(count_requests, requests, status);
1064 if(active_processes==0){
1065 /* Last process alive speaking */
1066 /* end the simulated timer */
1067 sim_time = smpi_process_simulated_elapsed();
1068 XBT_INFO("Simulation time %f", sim_time);
1069 _xbt_replay_action_exit();
1070 xbt_free(sendbuffer);
1071 xbt_free(recvbuffer);
1072 xbt_dict_free(&reqq); //not need, data have been freed ???
1076 instr_extra_data extra_fin = xbt_new0(s_instr_extra_data_t,1);
1077 extra_fin->type = TRACING_FINALIZE;
1078 operation =bprintf("%s_finalize",__FUNCTION__);
1079 TRACE_smpi_collective_in(rank, -1, operation, extra_fin);
1081 smpi_process_finalize();
1083 TRACE_smpi_collective_out(rank, -1, operation);
1084 TRACE_smpi_finalize(smpi_process_index());
1085 smpi_process_destroy();
1086 xbt_free(operation);