1 /* Copyright (c) 2009-2014. The SimGrid Team.
2 * All rights reserved. */
4 /* This program is free software; you can redistribute it and/or modify it
5 * under the terms of the license (GNU LGPL) which comes with this package. */
10 #include <xbt/replay.h>
/* File-scope state shared by the replay action handlers.
 * NOTE(review): parts of this file are not visible in this view; the notes below only
 * describe what the visible code does with each symbol. */
// No use of KEY_SIZE is visible in this view -- presumably the size of a textual key; TODO confirm.
12 #define KEY_SIZE (sizeof(int) * 2 + 1)
14 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
// Overwritten by action_comm_size() with the value found on the trace line.
16 int communicator_size = 0;
// Set to smpi_process_count() by action_init(); smpi_replay_finalize() tests it for zero.
17 static int active_processes = 0;
// Pending MPI requests, accessed per process through get_reqq_self()/set_reqq_self().
18 xbt_dict_t reqq = NULL;
// Datatype used when a trace line carries no datatype id; chosen in action_init().
20 MPI_Datatype MPI_DEFAULT_TYPE;
// Datatype of the action currently being replayed; rewritten by every handler.
21 MPI_Datatype MPI_CURRENT_TYPE;
// Scratch buffers (and their current capacities) grown on demand by
// smpi_get_tmp_sendbuffer()/smpi_get_tmp_recvbuffer() and released in smpi_replay_finalize().
23 static int sendbuffer_size=0;
24 char* sendbuffer=NULL;
25 static int recvbuffer_size=0;
26 char* recvbuffer=NULL;
/* Log a replayed action together with the simulated time it took.
 *   action: words of the trace line, joined with single spaces for the message
 *   clock:  value of smpi_process_simulated_elapsed() sampled when the action started
 * Nothing is formatted unless verbose logging is enabled for the smpi_replay category. */
28 static void log_timed_action (const char *const *action, double clock){
29 if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
30 char *name = xbt_str_join_array(action, " ");
31 XBT_VERB("%s %f", name, smpi_process_simulated_elapsed()-clock);
/* Return the request dynar stored in reqq for the calling process.
 * The dictionary key is the decimal text of smpi_process_index(), built with asprintf(). */
37 static xbt_dynar_t get_reqq_self(){
40 asprintf(&key, "%d", smpi_process_index());
41 xbt_dynar_t dynar_mpi_request = (xbt_dynar_t) xbt_dict_get(reqq, key);
44 return dynar_mpi_request;
/* Store mpi_request in reqq as the request dynar of the calling process.
 * The dictionary key is the decimal text of smpi_process_index(), built with asprintf().
 * NOTE(review): plain free() is registered as the value destructor although the value is an
 * xbt_dynar_t -- confirm that this is the intended way to release it. */
47 static void set_reqq_self(xbt_dynar_t mpi_request){
50 asprintf(&key, "%d", smpi_process_index());
51 xbt_dict_set(reqq, key, mpi_request, free);
56 //allocate a single buffer for all sends, growing it if needed
// Outside replay mode a fresh xbt_malloc(size) block is returned instead.
// In replay mode the shared sendbuffer is grown with xbt_realloc when it is smaller than size.
57 void* smpi_get_tmp_sendbuffer(int size){
58 if (!smpi_process_get_replaying())
59 return xbt_malloc(size);
60 if (sendbuffer_size<size){
61 sendbuffer=xbt_realloc(sendbuffer,size);
66 //allocate a single buffer for all recv, growing it if needed
// Outside replay mode a fresh xbt_malloc(size) block is returned instead.
// In replay mode the shared recvbuffer is grown with xbt_realloc when it is smaller than size.
67 void* smpi_get_tmp_recvbuffer(int size){
68 if (!smpi_process_get_replaying())
69 return xbt_malloc(size);
70 if (recvbuffer_size<size){
71 recvbuffer=xbt_realloc(recvbuffer,size);
// Counterpart of smpi_get_tmp_sendbuffer()/smpi_get_tmp_recvbuffer() for a buffer buf.
// Only the "not replaying" test is visible in this view -- presumably buf is released in
// that case and left alone while replaying; TODO confirm.
77 void smpi_free_tmp_buffer(void* buf){
78 if (!smpi_process_get_replaying())
/* Convert the text `string` to a double with strtod().
 * Throws unknown_error ("%s is not a double"); the condition guarding the throw is not
 * visible in this view. */
83 static double parse_double(const char *string)
87 value = strtod(string, &endptr);
89 THROWF(unknown_error, 0, "%s is not a double", string);
/* Translate the datatype id found on a trace line (`action`) into an MPI_Datatype.
 * The result is stored in the global MPI_CURRENT_TYPE and also returned.
 * The tests selecting each assignment are not visible in this view; the candidates are
 * MPI_DOUBLE, MPI_INT, MPI_CHAR, MPI_SHORT, MPI_LONG, MPI_FLOAT, MPI_BYTE and MPI_DEFAULT_TYPE. */
93 static MPI_Datatype decode_datatype(const char *const action)
95 // Declared datatypes,
100 MPI_CURRENT_TYPE=MPI_DOUBLE;
103 MPI_CURRENT_TYPE=MPI_INT;
106 MPI_CURRENT_TYPE=MPI_CHAR;
109 MPI_CURRENT_TYPE=MPI_SHORT;
112 MPI_CURRENT_TYPE=MPI_LONG;
115 MPI_CURRENT_TYPE=MPI_FLOAT;
118 MPI_CURRENT_TYPE=MPI_BYTE;
121 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
124 return MPI_CURRENT_TYPE;
/* Map an MPI_Datatype to the textual id written to traces (reverse of decode_datatype()).
 * The handlers in this file call it with known == NULL.
 * The returned strings and the handling of `known` are not visible in this view; per the
 * comments below, `known` presumably reports whether replay handles the datatype -- TODO confirm. */
128 const char* encode_datatype(MPI_Datatype datatype, int* known)
131 //default type for output is set to MPI_BYTE
132 // MPI_DEFAULT_TYPE is not set for output, use directly MPI_BYTE
134 if (datatype==MPI_BYTE){
137 if(datatype==MPI_DOUBLE)
139 if(datatype==MPI_INT)
141 if(datatype==MPI_CHAR)
143 if(datatype==MPI_SHORT)
145 if(datatype==MPI_LONG)
147 if(datatype==MPI_FLOAT)
149 //tell that the datatype is not handled by replay, and that its size should be measured and replayed as size*MPI_BYTE
151 // default - not implemented.
152 // do not warn here as we pass in this function even for other trace formats
/* Argument-count check used at the top of the action handlers.
 *   action:    words of the trace line; scanned until the first NULL entry
 *   mandatory: number of required arguments after the process id and the action name
 *   optional:  number of additional arguments that are accepted
 * Throws arg_error naming the calling function (__FUNCTION__); the comparison that triggers
 * the throw is not visible in this view. */
156 #define CHECK_ACTION_PARAMS(action, mandatory, optional) {\
158 while(action[i]!=NULL)\
161 THROWF(arg_error, 0, "%s replay failed.\n" \
162 "%d items were given on the line. First two should be process_id and action. " \
163 "This action needs after them %d mandatory arguments, and accepts %d optional ones. \n" \
164 "Please contact the Simgrid team if support is needed", __FUNCTION__, i, mandatory, optional);\
/* Replay handler registered for the "init" action.
 * - picks MPI_DEFAULT_TYPE: MPI_DOUBLE when the optional action[2] is present, MPI_BYTE otherwise
 * - starts the simulated timer and records the process count in active_processes
 * - sets up the pending-request storage (reqq)
 * NOTE(review): both a dictionary-based and an array-based reqq setup are visible below;
 * presumably they sit in alternative branches that are not visible in this view. */
168 static void action_init(const char *const *action)
170 XBT_DEBUG("Initialize the counters");
171 CHECK_ACTION_PARAMS(action, 0, 1);
172 if(action[2]) MPI_DEFAULT_TYPE= MPI_DOUBLE; // default MPE datatype
173 else MPI_DEFAULT_TYPE= MPI_BYTE; // default TAU datatype
175 /* start a simulated timer */
176 smpi_process_simulated_start();
177 /*initialize the number of active processes */
178 active_processes = smpi_process_count();
181 reqq = xbt_dict_new();
184 set_reqq_self(xbt_dynar_new(sizeof(MPI_Request),&xbt_free_ref));
187 reqq=xbt_new0(xbt_dynar_t,active_processes);
189 for(i=0;i<active_processes;i++){
190 reqq[i]=xbt_dynar_new(sizeof(MPI_Request),&xbt_free_ref);
// Replay handler registered for the "finalize" action; its body is not visible in this view.
196 static void action_finalize(const char *const *action)
/* Replay handler registered for the "comm_size" action.
 * Stores action[2], parsed as a double, in the global communicator_size and logs the action. */
200 static void action_comm_size(const char *const *action)
202 double clock = smpi_process_simulated_elapsed();
204 communicator_size = parse_double(action[2]);
205 log_timed_action (action, clock);
/* Replay handler registered for the "comm_split" action.
 * The visible code only logs the action with its elapsed simulated time. */
208 static void action_comm_split(const char *const *action)
210 double clock = smpi_process_simulated_elapsed();
212 log_timed_action (action, clock);
/* Replay handler registered for the "comm_dup" action.
 * The visible code only logs the action with its elapsed simulated time. */
215 static void action_comm_dup(const char *const *action)
217 double clock = smpi_process_simulated_elapsed();
219 log_timed_action (action, clock);
/* Replay handler registered for the "compute" action.
 * action[2] (mandatory) is the amount of flops passed to smpi_execute_flops(); the execution
 * is bracketed by TRACE_smpi_computing_in()/TRACE_smpi_computing_out() events. */
222 static void action_compute(const char *const *action)
224 CHECK_ACTION_PARAMS(action, 1, 0);
225 double clock = smpi_process_simulated_elapsed();
226 double flops= parse_double(action[2]);
227 int rank = smpi_process_index();
228 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
229 extra->type=TRACING_COMPUTING;
230 extra->comp_size=flops;
231 TRACE_smpi_computing_in(rank, extra);
233 smpi_execute_flops(flops);
235 TRACE_smpi_computing_out(rank);
236 log_timed_action (action, clock);
/* Replay handler registered for the "send" action.
 *   action[2]: destination, action[3]: element count, action[4] (optional): datatype id
 * Performs smpi_mpi_send() with a NULL buffer and tag 0 on MPI_COMM_WORLD, surrounded by
 * point-to-point trace events. */
239 static void action_send(const char *const *action)
241 CHECK_ACTION_PARAMS(action, 2, 1);
242 int to = atoi(action[2]);
243 double size=parse_double(action[3]);
244 double clock = smpi_process_simulated_elapsed();
247 MPI_CURRENT_TYPE=decode_datatype(action[4]);
249 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
252 int rank = smpi_process_index();
254 int dst_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), to);
255 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
256 extra->type = TRACING_SEND;
257 extra->send_size = size;
259 extra->dst = dst_traced;
260 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
261 TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
262 TRACE_smpi_send(rank, rank, dst_traced, size*smpi_datatype_size(MPI_CURRENT_TYPE));
264 smpi_mpi_send(NULL, size, MPI_CURRENT_TYPE, to , 0, MPI_COMM_WORLD);
266 log_timed_action (action, clock);
268 TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
/* Replay handler registered for the "Isend" action.
 *   action[2]: destination, action[3]: element count, action[4] (optional): datatype id
 * Starts smpi_mpi_isend() with a NULL buffer and tag 0 on MPI_COMM_WORLD and pushes the
 * resulting request onto the calling process's pending-request dynar, so that a later
 * test/wait/waitAll action can pick it up. */
271 static void action_Isend(const char *const *action)
273 CHECK_ACTION_PARAMS(action, 2, 1);
274 int to = atoi(action[2]);
275 double size=parse_double(action[3]);
276 double clock = smpi_process_simulated_elapsed();
279 if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
280 else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
282 int rank = smpi_process_index();
283 int dst_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), to);
284 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
285 extra->type = TRACING_ISEND;
286 extra->send_size = size;
288 extra->dst = dst_traced;
289 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
290 TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
291 TRACE_smpi_send(rank, rank, dst_traced, size*smpi_datatype_size(MPI_CURRENT_TYPE));
293 request = smpi_mpi_isend(NULL, size, MPI_CURRENT_TYPE, to, 0,MPI_COMM_WORLD);
295 TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
298 xbt_dynar_push(get_reqq_self(),&request);
300 log_timed_action (action, clock);
/* Replay handler registered for the "recv" action.
 *   action[2]: source, action[3]: element count, action[4] (optional): datatype id
 * Performs smpi_mpi_recv() with a NULL buffer and tag 0 on MPI_COMM_WORLD, surrounded by
 * point-to-point trace events. The smpi_mpi_probe() call presumably runs only when the size
 * is unknown to the receiver (its guard is not visible in this view). */
303 static void action_recv(const char *const *action) {
304 CHECK_ACTION_PARAMS(action, 2, 1);
305 int from = atoi(action[2]);
306 double size=parse_double(action[3]);
307 double clock = smpi_process_simulated_elapsed();
310 if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
311 else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
313 int rank = smpi_process_index();
314 int src_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), from);
316 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
317 extra->type = TRACING_RECV;
318 extra->send_size = size;
319 extra->src = src_traced;
321 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
322 TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
324 //unknown size from the receiver's point of view
326 smpi_mpi_probe(from, 0, MPI_COMM_WORLD, &status);
330 smpi_mpi_recv(NULL, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD, &status);
332 TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
333 TRACE_smpi_recv(rank, src_traced, rank);
335 log_timed_action (action, clock);
/* Replay handler registered for the "Irecv" action.
 *   action[2]: source, action[3]: element count, action[4] (optional): datatype id
 * Starts smpi_mpi_irecv() with a NULL buffer and tag 0 on MPI_COMM_WORLD and pushes the
 * resulting request onto the calling process's pending-request dynar. The smpi_mpi_probe()
 * call presumably runs only when the size is unknown to the receiver (its guard is not
 * visible in this view). */
338 static void action_Irecv(const char *const *action)
340 CHECK_ACTION_PARAMS(action, 2, 1);
341 int from = atoi(action[2]);
342 double size=parse_double(action[3]);
343 double clock = smpi_process_simulated_elapsed();
346 if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
347 else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
349 int rank = smpi_process_index();
350 int src_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), from);
351 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
352 extra->type = TRACING_IRECV;
353 extra->send_size = size;
354 extra->src = src_traced;
356 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
357 TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
359 //unknown size from the receiver's point of view
361 smpi_mpi_probe(from, 0, MPI_COMM_WORLD, &status);
365 request = smpi_mpi_irecv(NULL, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD);
367 TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
369 xbt_dynar_push(get_reqq_self(),&request);
371 log_timed_action (action, clock);
// Replay handler registered for the "test" action (no arguments).
// Pops the most recently pushed request of the calling process, runs smpi_mpi_test() on it
// and pushes the (possibly completed, hence NULL) request back for a subsequent wait.
374 static void action_test(const char *const *action){
375 CHECK_ACTION_PARAMS(action, 0, 0);
376 double clock = smpi_process_simulated_elapsed();
381 request = xbt_dynar_pop_as(get_reqq_self(),MPI_Request);
382 //if request is null here, this may mean that a previous test has succeeded
383 //Different times in traced application and replayed version may lead to this
384 //In this case, ignore the extra calls.
386 int rank = smpi_process_index();
387 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
388 extra->type=TRACING_TEST;
389 TRACE_smpi_testing_in(rank, extra);
391 flag = smpi_mpi_test(&request, &status);
393 XBT_DEBUG("MPI_Test result: %d", flag);
394 /* push back request in dynar to be caught by a subsequent wait. if the test
395 * did succeed, the request is now NULL.
397 xbt_dynar_push_as(get_reqq_self(),MPI_Request, request);
399 TRACE_smpi_testing_out(rank);
401 log_timed_action (action, clock);
// Replay handler registered for the "wait" action (no arguments).
// Asserts that the calling process has at least one pending request, pops the most recent
// one and waits for it with smpi_mpi_wait(). Source, destination and the receive flag are
// read from the request before the wait; a receive trace event is emitted afterwards when
// the request was a receive.
404 static void action_wait(const char *const *action){
405 CHECK_ACTION_PARAMS(action, 0, 0);
406 double clock = smpi_process_simulated_elapsed();
410 xbt_assert(xbt_dynar_length(get_reqq_self()),
411 "action wait not preceded by any irecv or isend: %s",
412 xbt_str_join_array(action," "));
413 request = xbt_dynar_pop_as(get_reqq_self(),MPI_Request);
416 /* Assuming that the trace is well formed, this mean the comm might have
417 * been caught by a MPI_test. Then just return.
422 int rank = request->comm != MPI_COMM_NULL
423 ? smpi_comm_rank(request->comm)
426 MPI_Group group = smpi_comm_group(request->comm);
427 int src_traced = smpi_group_rank(group, request->src);
428 int dst_traced = smpi_group_rank(group, request->dst);
429 int is_wait_for_receive = request->recv;
430 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
431 extra->type = TRACING_WAIT;
432 TRACE_smpi_ptp_in(rank, src_traced, dst_traced, __FUNCTION__, extra);
434 smpi_mpi_wait(&request, &status);
436 TRACE_smpi_ptp_out(rank, src_traced, dst_traced, __FUNCTION__);
437 if (is_wait_for_receive)
438 TRACE_smpi_recv(rank, src_traced, dst_traced);
439 log_timed_action (action, clock);
// Replay handler registered for the "waitAll" action (no arguments).
// When the calling process has pending requests, they are copied into a local array, their
// src/dst/recv fields are saved in temporary dynars, smpi_mpi_waitall() is run on the array
// and a receive trace event is emitted for every saved entry flagged as a receive.
// Afterwards a fresh, empty request dynar is installed with set_reqq_self().
// NOTE(review): xbt_dynar_insert_at() is given pointers to freshly allocated ints; whether
// those allocations are released is not visible in this view.
442 static void action_waitall(const char *const *action){
443 CHECK_ACTION_PARAMS(action, 0, 0);
444 double clock = smpi_process_simulated_elapsed();
445 int count_requests=0;
448 count_requests=xbt_dynar_length(get_reqq_self());
450 if (count_requests>0) {
451 MPI_Request requests[count_requests];
452 MPI_Status status[count_requests];
454 /* The reqq is an array of dynars. Its index corresponds to the rank.
455 Thus each rank saves its own requests to the array request. */
456 xbt_dynar_foreach(get_reqq_self(),i,requests[i]);
458 //save information from requests
460 xbt_dynar_t srcs = xbt_dynar_new(sizeof(int), NULL);
461 xbt_dynar_t dsts = xbt_dynar_new(sizeof(int), NULL);
462 xbt_dynar_t recvs = xbt_dynar_new(sizeof(int), NULL);
463 for (i = 0; i < count_requests; i++) {
465 int *asrc = xbt_new(int, 1);
466 int *adst = xbt_new(int, 1);
467 int *arecv = xbt_new(int, 1);
468 *asrc = requests[i]->src;
469 *adst = requests[i]->dst;
470 *arecv = requests[i]->recv;
471 xbt_dynar_insert_at(srcs, i, asrc);
472 xbt_dynar_insert_at(dsts, i, adst);
473 xbt_dynar_insert_at(recvs, i, arecv);
478 int *t = xbt_new(int, 1);
479 xbt_dynar_insert_at(srcs, i, t);
480 xbt_dynar_insert_at(dsts, i, t);
481 xbt_dynar_insert_at(recvs, i, t);
485 int rank_traced = smpi_process_index();
486 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
487 extra->type = TRACING_WAITALL;
488 extra->send_size=count_requests;
489 TRACE_smpi_ptp_in(rank_traced, -1, -1, __FUNCTION__,extra);
491 smpi_mpi_waitall(count_requests, requests, status);
493 for (i = 0; i < count_requests; i++) {
494 int src_traced, dst_traced, is_wait_for_receive;
495 xbt_dynar_get_cpy(srcs, i, &src_traced);
496 xbt_dynar_get_cpy(dsts, i, &dst_traced);
497 xbt_dynar_get_cpy(recvs, i, &is_wait_for_receive);
498 if (is_wait_for_receive) {
499 TRACE_smpi_recv(rank_traced, src_traced, dst_traced);
502 TRACE_smpi_ptp_out(rank_traced, -1, -1, __FUNCTION__);
504 xbt_dynar_free(&srcs);
505 xbt_dynar_free(&dsts);
506 xbt_dynar_free(&recvs);
508 //TODO xbt_dynar_free_container(get_reqq_self());
509 set_reqq_self(xbt_dynar_new(sizeof(MPI_Request),&xbt_free_ref));
511 log_timed_action (action, clock);
/* Replay handler registered for the "barrier" action.
 * Runs mpi_coll_barrier_fun() on MPI_COMM_WORLD between collective trace events; no argument
 * of the trace line is read. */
514 static void action_barrier(const char *const *action){
515 double clock = smpi_process_simulated_elapsed();
516 int rank = smpi_process_index();
517 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
518 extra->type = TRACING_BARRIER;
519 TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
521 mpi_coll_barrier_fun(MPI_COMM_WORLD);
523 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
524 log_timed_action (action, clock);
/* Replay handler registered for the "bcast" action.
 *   action[2]: element count, action[3] (optional): root, action[4] (optional): datatype id
 * The datatype defaults to MPI_DEFAULT_TYPE. The broadcast is executed with
 * mpi_coll_bcast_fun() on a scratch buffer obtained from smpi_get_tmp_sendbuffer().
 * NOTE(review): the traced root is computed with smpi_group_index() here, whereas
 * action_reduce() uses smpi_group_rank() -- confirm which one is intended. */
528 static void action_bcast(const char *const *action)
530 CHECK_ACTION_PARAMS(action, 1, 2);
531 double size = parse_double(action[2]);
532 double clock = smpi_process_simulated_elapsed();
535 * Initialize MPI_CURRENT_TYPE in order to decrease
536 * the number of the checks
538 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
541 root= atoi(action[3]);
543 MPI_CURRENT_TYPE=decode_datatype(action[4]);
547 int rank = smpi_process_index();
548 int root_traced = smpi_group_index(smpi_comm_group(MPI_COMM_WORLD), root);
550 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
551 extra->type = TRACING_BCAST;
552 extra->send_size = size;
553 extra->root = root_traced;
554 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
555 TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__, extra);
556 void *sendbuf = smpi_get_tmp_sendbuffer(size* smpi_datatype_size(MPI_CURRENT_TYPE));
558 mpi_coll_bcast_fun(sendbuf, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
560 TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
561 log_timed_action (action, clock);
/* Replay handler registered for the "reduce" action.
 *   action[2]: communication size (element count), action[3]: computation size (flops),
 *   action[4] (optional): root, action[5] (optional): datatype id
 * The datatype defaults to MPI_DEFAULT_TYPE. The reduction is executed with
 * mpi_coll_reduce_fun() using MPI_OP_NULL, followed by smpi_execute_flops(comp_size).
 *
 * The receive buffer comes from smpi_get_tmp_recvbuffer(): requesting both buffers from
 * smpi_get_tmp_sendbuffer() with the same size makes them share the single send scratch
 * buffer in replay mode, so the collective would be called with aliased send and receive
 * buffers. The other collectives of this file already use one buffer of each kind. */
564 static void action_reduce(const char *const *action)
566 CHECK_ACTION_PARAMS(action, 2, 2);
567 double comm_size = parse_double(action[2]);
568 double comp_size = parse_double(action[3]);
569 double clock = smpi_process_simulated_elapsed();
571 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
574 root= atoi(action[4]);
576 MPI_CURRENT_TYPE=decode_datatype(action[5]);
582 int rank = smpi_process_index();
583 int root_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), root);
584 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
585 extra->type = TRACING_REDUCE;
586 extra->send_size = comm_size;
587 extra->comp_size = comp_size;
588 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
589 extra->root = root_traced;
591 TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__,extra);
593 void *recvbuf = smpi_get_tmp_recvbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
594 void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
595 mpi_coll_reduce_fun(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
596 smpi_execute_flops(comp_size);
598 TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
599 log_timed_action (action, clock);
/* Replay handler registered for the "allReduce" action.
 *   action[2]: communication size (element count), action[3]: computation size (flops),
 *   action[4] (optional): datatype id, defaulting to MPI_DEFAULT_TYPE
 * The reduction is executed with mpi_coll_allreduce_fun() using MPI_OP_NULL, followed by
 * smpi_execute_flops(comp_size).
 *
 * The receive buffer comes from smpi_get_tmp_recvbuffer(): requesting both buffers from
 * smpi_get_tmp_sendbuffer() with the same size makes them share the single send scratch
 * buffer in replay mode, so the collective would be called with aliased send and receive
 * buffers. The other collectives of this file already use one buffer of each kind. */
602 static void action_allReduce(const char *const *action) {
603 CHECK_ACTION_PARAMS(action, 2, 1);
604 double comm_size = parse_double(action[2]);
605 double comp_size = parse_double(action[3]);
607 if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
608 else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
610 double clock = smpi_process_simulated_elapsed();
611 int rank = smpi_process_index();
612 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
613 extra->type = TRACING_ALLREDUCE;
614 extra->send_size = comm_size;
615 extra->comp_size = comp_size;
616 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
617 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
619 void *recvbuf = smpi_get_tmp_recvbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
620 void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
621 mpi_coll_allreduce_fun(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
622 smpi_execute_flops(comp_size);
624 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
625 log_timed_action (action, clock);
/* Replay handler registered for the "allToAll" action.
 *   action[2]: send count, action[3]: receive count,
 *   action[4] / action[5]: send / receive datatype ids (MPI_DEFAULT_TYPE for both otherwise)
 * Scratch buffers are sized count * communicator size * datatype size and the exchange is
 * executed with mpi_coll_alltoall_fun() on MPI_COMM_WORLD.
 * NOTE(review): no CHECK_ACTION_PARAMS call is visible in this handler, unlike most other
 * handlers -- confirm that the argument count is validated. */
628 static void action_allToAll(const char *const *action) {
629 double clock = smpi_process_simulated_elapsed();
630 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
631 int send_size = parse_double(action[2]);
632 int recv_size = parse_double(action[3]);
633 MPI_Datatype MPI_CURRENT_TYPE2;
636 MPI_CURRENT_TYPE=decode_datatype(action[4]);
637 MPI_CURRENT_TYPE2=decode_datatype(action[5]);
640 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
641 MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
643 void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
644 void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE2));
646 int rank = smpi_process_index();
647 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
648 extra->type = TRACING_ALLTOALL;
649 extra->send_size = send_size;
650 extra->recv_size = recv_size;
651 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
652 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, NULL);
654 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
656 mpi_coll_alltoall_fun(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
658 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
659 log_timed_action (action, clock);
/* Replay handler registered for the "gather" action.
 * Reads the send count from action[2], the receive count from action[3], the root from
 * action[4] and the send / receive datatype ids from action[5] / action[6]
 * (MPI_DEFAULT_TYPE for both otherwise). The receive buffer is sized
 * recv_size * communicator size * datatype size; the guard around that allocation is not
 * visible in this view (presumably root only -- TODO confirm). The gather itself is executed
 * with mpi_coll_gather_fun() on MPI_COMM_WORLD. */
663 static void action_gather(const char *const *action) {
665 The structure of the gather action for the rank 0 (total 4 processes)
670 1) 68 is the sendcounts
671 2) 68 is the recvcounts
672 3) 0 is the root node
673 4) 0 is the send datatype id, see decode_datatype()
674 5) 0 is the recv datatype id, see decode_datatype()
676 CHECK_ACTION_PARAMS(action, 2, 3);
677 double clock = smpi_process_simulated_elapsed();
678 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
679 int send_size = parse_double(action[2]);
680 int recv_size = parse_double(action[3]);
681 MPI_Datatype MPI_CURRENT_TYPE2;
683 MPI_CURRENT_TYPE=decode_datatype(action[5]);
684 MPI_CURRENT_TYPE2=decode_datatype(action[6]);
686 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
687 MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
689 void *send = smpi_get_tmp_sendbuffer(send_size* smpi_datatype_size(MPI_CURRENT_TYPE));
693 root=atoi(action[4]);
694 int rank = smpi_comm_rank(MPI_COMM_WORLD);
697 recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE2));
699 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
700 extra->type = TRACING_GATHER;
701 extra->send_size = send_size;
702 extra->recv_size = recv_size;
704 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
705 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, NULL);
707 TRACE_smpi_collective_in(smpi_process_index(), root, __FUNCTION__, extra);
709 mpi_coll_gather_fun(send, send_size, MPI_CURRENT_TYPE,
710 recv, recv_size, MPI_CURRENT_TYPE2,
711 root, MPI_COMM_WORLD);
713 TRACE_smpi_collective_out(smpi_process_index(), -1, __FUNCTION__);
714 log_timed_action (action, clock);
/* Replay handler registered for the "gatherV" action.
 * With comm_size = size of MPI_COMM_WORLD, the code reads:
 *   action[2]: send count, action[3 .. 2+comm_size]: receive counts,
 *   action[3+comm_size]: root, action[4+comm_size] / action[5+comm_size]: datatype ids
 * The receive buffer is sized by the sum of the receive counts; all displacements stay zero
 * (disps comes from xbt_new0 and is not written in the visible code). The gather is executed
 * with smpi_mpi_gatherv() on MPI_COMM_WORLD.
 * NOTE(review): the root is read unconditionally, but CHECK_ACTION_PARAMS only requires
 * comm_size+1 arguments and accepts 2 optional ones -- confirm the intended counts. */
719 static void action_gatherv(const char *const *action) {
721 The structure of the gatherv action for the rank 0 (total 4 processes)
723 0 gather 68 68 10 10 10 0 0 0
726 1) 68 is the sendcount
727 2) 68 10 10 10 is the recvcounts
728 3) 0 is the root node
729 4) 0 is the send datatype id, see decode_datatype()
730 5) 0 is the recv datatype id, see decode_datatype()
733 double clock = smpi_process_simulated_elapsed();
734 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
735 CHECK_ACTION_PARAMS(action, comm_size+1, 2);
736 int send_size = parse_double(action[2]);
737 int *disps = xbt_new0(int, comm_size);
738 int *recvcounts = xbt_new0(int, comm_size);
741 MPI_Datatype MPI_CURRENT_TYPE2;
742 if(action[4+comm_size]) {
743 MPI_CURRENT_TYPE=decode_datatype(action[4+comm_size]);
744 MPI_CURRENT_TYPE2=decode_datatype(action[5+comm_size]);
746 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
747 MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
749 void *send = smpi_get_tmp_sendbuffer(send_size* smpi_datatype_size(MPI_CURRENT_TYPE));
751 for(i=0;i<comm_size;i++) {
752 recvcounts[i] = atoi(action[i+3]);
753 recv_sum=recv_sum+recvcounts[i];
757 int root=atoi(action[3+comm_size]);
758 int rank = smpi_comm_rank(MPI_COMM_WORLD);;
761 recv = smpi_get_tmp_recvbuffer(recv_sum* smpi_datatype_size(MPI_CURRENT_TYPE2));
763 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
764 extra->type = TRACING_GATHERV;
765 extra->send_size = send_size;
766 extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
767 for(i=0; i< comm_size; i++)//copy data to avoid bad free
768 extra->recvcounts[i] = recvcounts[i];
770 extra->num_processes = comm_size;
771 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
772 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, NULL);
774 TRACE_smpi_collective_in(smpi_process_index(), root, __FUNCTION__, extra);
776 smpi_mpi_gatherv(send, send_size, MPI_CURRENT_TYPE,
777 recv, recvcounts, disps, MPI_CURRENT_TYPE2,
778 root, MPI_COMM_WORLD);
780 TRACE_smpi_collective_out(smpi_process_index(), -1, __FUNCTION__);
781 log_timed_action (action, clock);
782 xbt_free(recvcounts);
/* Replay handler registered for the "reduceScatter" action.
 * With comm_size = size of MPI_COMM_WORLD, the code reads:
 *   action[2 .. 1+comm_size]: receive counts, action[2+comm_size]: computation size (flops),
 *   action[3+comm_size] (optional): datatype id, defaulting to MPI_DEFAULT_TYPE
 * Both scratch buffers are sized from `size`, whose computation is not visible in this view
 * (presumably the sum of the receive counts -- TODO confirm). The operation is executed with
 * mpi_coll_reduce_scatter_fun() using MPI_OP_NULL, followed by smpi_execute_flops(comp_size). */
786 static void action_reducescatter(const char *const *action) {
789 The structure of the reducescatter action for the rank 0 (total 4 processes)
791 0 reduceScatter 275427 275427 275427 204020 11346849 0
794 1) The first four values after the name of the action declare the recvcounts array
795 2) The value 11346849 is the amount of instructions
796 3) The last value corresponds to the datatype, see decode_datatype().
798 We analyze a MPI_Reduce_scatter call to one MPI_Reduce and one MPI_Scatterv.
802 double clock = smpi_process_simulated_elapsed();
803 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
804 CHECK_ACTION_PARAMS(action, comm_size+1, 1);
805 int comp_size = parse_double(action[2+comm_size]);
806 int *recvcounts = xbt_new0(int, comm_size);
807 int *disps = xbt_new0(int, comm_size);
809 int rank = smpi_process_index();
811 if(action[3+comm_size])
812 MPI_CURRENT_TYPE=decode_datatype(action[3+comm_size]);
814 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
816 for(i=0;i<comm_size;i++) {
817 recvcounts[i] = atoi(action[i+2]);
822 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
823 extra->type = TRACING_REDUCE_SCATTER;
824 extra->send_size = 0;
825 extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
826 for(i=0; i< comm_size; i++)//copy data to avoid bad free
827 extra->recvcounts[i] = recvcounts[i];
828 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
829 extra->comp_size = comp_size;
830 extra->num_processes = comm_size;
832 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
834 void *sendbuf = smpi_get_tmp_sendbuffer(size* smpi_datatype_size(MPI_CURRENT_TYPE));
835 void *recvbuf = smpi_get_tmp_recvbuffer(size* smpi_datatype_size(MPI_CURRENT_TYPE));
837 mpi_coll_reduce_scatter_fun(sendbuf, recvbuf, recvcounts, MPI_CURRENT_TYPE, MPI_OP_NULL,
839 smpi_execute_flops(comp_size);
842 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
843 xbt_free(recvcounts);
845 log_timed_action (action, clock);
/* Replay handler registered for the "allGather" action.
 *   action[2]: send count, action[3]: receive count,
 *   action[4] / action[5] (optional): send / receive datatype ids
 * Without datatype ids both datatypes are MPI_DEFAULT_TYPE. The operation is executed with
 * mpi_coll_allgather_fun() on MPI_COMM_WORLD.
 *
 * The datatype ids are decoded from action[4] and action[5], the two optional slots declared
 * by CHECK_ACTION_PARAMS(action, 2, 2). They used to be decoded from action[3] and action[4],
 * which interpreted the receive count as the send datatype id.
 * NOTE(review): the guard selecting the datatype branch is not visible in this view; it has
 * to make sure that both action[4] and action[5] are present. */
848 static void action_allgather(const char *const *action) {
850 The structure of the allgather action for the rank 0 (total 4 processes)
852 0 allGather 275427 275427
855 1) 275427 is the sendcount
856 2) 275427 is the recvcount
857 3) No more values mean that the datatype for sent and receive buffer
858 is the default one, see decode_datatype().
862 double clock = smpi_process_simulated_elapsed();
864 CHECK_ACTION_PARAMS(action, 2, 2);
865 int sendcount=atoi(action[2]);
866 int recvcount=atoi(action[3]);
868 MPI_Datatype MPI_CURRENT_TYPE2;
871 MPI_CURRENT_TYPE = decode_datatype(action[4]);
872 MPI_CURRENT_TYPE2 = decode_datatype(action[5]);
874 MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
875 MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
877 void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* smpi_datatype_size(MPI_CURRENT_TYPE));
878 void *recvbuf = smpi_get_tmp_recvbuffer(recvcount* smpi_datatype_size(MPI_CURRENT_TYPE2));
880 int rank = smpi_process_index();
881 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
882 extra->type = TRACING_ALLGATHER;
883 extra->send_size = sendcount;
884 extra->recv_size= recvcount;
885 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
886 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, NULL);
887 extra->num_processes = smpi_comm_size(MPI_COMM_WORLD);
889 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
891 mpi_coll_allgather_fun(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcount, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
893 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
894 log_timed_action (action, clock);
/* Replay handler registered for the "allGatherV" action.
 * With comm_size = size of MPI_COMM_WORLD, the code reads:
 *   action[2]: send count, action[3 .. 2+comm_size]: receive counts,
 *   action[3+comm_size] / action[4+comm_size] (optional): send / receive datatype ids
 * The receive buffer is sized by the sum of the receive counts; all displacements stay zero
 * (disps comes from xbt_new0 and is not written in the visible code). The operation is
 * executed with mpi_coll_allgatherv_fun() on MPI_COMM_WORLD. */
897 static void action_allgatherv(const char *const *action) {
900 The structure of the allgatherv action for the rank 0 (total 4 processes)
902 0 allGatherV 275427 275427 275427 275427 204020
905 1) 275427 is the sendcount
906 2) The next four elements declare the recvcounts array
907 3) No more values mean that the datatype for sent and receive buffer
908 is the default one, see decode_datatype().
912 double clock = smpi_process_simulated_elapsed();
914 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
915 CHECK_ACTION_PARAMS(action, comm_size+1, 2);
917 int sendcount=atoi(action[2]);
918 int *recvcounts = xbt_new0(int, comm_size);
919 int *disps = xbt_new0(int, comm_size);
921 MPI_Datatype MPI_CURRENT_TYPE2;
923 if(action[3+comm_size]) {
924 MPI_CURRENT_TYPE = decode_datatype(action[3+comm_size]);
925 MPI_CURRENT_TYPE2 = decode_datatype(action[4+comm_size]);
927 MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
928 MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
930 void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* smpi_datatype_size(MPI_CURRENT_TYPE));
932 for(i=0;i<comm_size;i++) {
933 recvcounts[i] = atoi(action[i+3]);
934 recv_sum=recv_sum+recvcounts[i];
936 void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* smpi_datatype_size(MPI_CURRENT_TYPE2));
938 int rank = smpi_process_index();
939 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
940 extra->type = TRACING_ALLGATHERV;
941 extra->send_size = sendcount;
942 extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
943 for(i=0; i< comm_size; i++)//copy data to avoid bad free
944 extra->recvcounts[i] = recvcounts[i];
945 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
946 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, NULL);
947 extra->num_processes = comm_size;
949 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
951 mpi_coll_allgatherv_fun(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts, disps, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
953 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
954 log_timed_action (action, clock);
955 xbt_free(recvcounts);
/* Replay handler registered for the "allToAllV" action.
 * With comm_size = size of MPI_COMM_WORLD, the code reads:
 *   action[2]: send buffer size, action[3 .. 2+comm_size]: send counts,
 *   action[3+comm_size]: receive buffer size,
 *   action[4+comm_size .. 3+2*comm_size]: receive counts,
 *   action[4+2*comm_size] / action[5+2*comm_size] (optional): send / receive datatype ids
 * All displacements stay zero (both arrays come from xbt_new0 and are not written in the
 * visible code). The exchange is executed with mpi_coll_alltoallv_fun().
 *
 * The receive side of the collective is given MPI_CURRENT_TYPE2, the datatype that sizes
 * recvbuf and is reported as datatype2 in the trace. It used to be given MPI_CURRENT_TYPE,
 * so a receive datatype different from the send datatype was ignored by the exchange. */
959 static void action_allToAllv(const char *const *action) {
961 The structure of the allToAllV action for the rank 0 (total 4 processes)
963 0 allToAllV 100 1 7 10 12 100 1 70 10 5
966 1) 100 is the size of the send buffer *sizeof(int),
967 2) 1 7 10 12 is the sendcounts array
968 3) 100*sizeof(int) is the size of the receiver buffer
969 4) 1 70 10 5 is the recvcounts array
974 double clock = smpi_process_simulated_elapsed();
976 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
977 CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2);
978 int send_buf_size=0,recv_buf_size=0,i=0;
979 int *sendcounts = xbt_new0(int, comm_size);
980 int *recvcounts = xbt_new0(int, comm_size);
981 int *senddisps = xbt_new0(int, comm_size);
982 int *recvdisps = xbt_new0(int, comm_size);
984 MPI_Datatype MPI_CURRENT_TYPE2;
986 send_buf_size=parse_double(action[2]);
987 recv_buf_size=parse_double(action[3+comm_size]);
988 if(action[4+2*comm_size]) {
989 MPI_CURRENT_TYPE=decode_datatype(action[4+2*comm_size]);
990 MPI_CURRENT_TYPE2=decode_datatype(action[5+2*comm_size]);
993 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
994 MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
997 void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* smpi_datatype_size(MPI_CURRENT_TYPE));
998 void *recvbuf = smpi_get_tmp_recvbuffer(recv_buf_size* smpi_datatype_size(MPI_CURRENT_TYPE2));
1000 for(i=0;i<comm_size;i++) {
1001 sendcounts[i] = atoi(action[i+3]);
1002 recvcounts[i] = atoi(action[i+4+comm_size]);
1006 int rank = smpi_process_index();
1007 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
1008 extra->type = TRACING_ALLTOALLV;
1009 extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
1010 extra->sendcounts= xbt_malloc(comm_size*sizeof(int));
1011 extra->num_processes = comm_size;
1013 for(i=0; i< comm_size; i++){//copy data to avoid bad free
1014 extra->send_size += sendcounts[i];
1015 extra->sendcounts[i] = sendcounts[i];
1016 extra->recv_size += recvcounts[i];
1017 extra->recvcounts[i] = recvcounts[i];
1019 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
1020 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, NULL);
1022 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
1024 mpi_coll_alltoallv_fun(sendbuf, sendcounts, senddisps, MPI_CURRENT_TYPE,
1025 recvbuf, recvcounts, recvdisps, MPI_CURRENT_TYPE2,
1028 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
1029 log_timed_action (action, clock);
1030 xbt_free(sendcounts);
1031 xbt_free(recvcounts);
1032 xbt_free(senddisps);
1033 xbt_free(recvdisps);
/* Entry point of a replayed SMPI process.
 *   argc, argv: pointers to the process arguments, forwarded to smpi_process_init() and to
 *               xbt_replay_action_runner()
 * Steps visible here: initialise the SMPI process and switch it to replay mode, emit the
 * init trace events, register one handler per trace action name, optionally execute a
 * delayed start whose flop amount is read from (*argv)[2] (the guard deciding whether a
 * delay was requested is not visible in this view), force a context switch with a
 * zero-flop execution, then run the action runner. */
1036 void smpi_replay_init(int *argc, char***argv){
1037 smpi_process_init(argc, argv);
1038 smpi_process_mark_as_initialized();
1039 smpi_process_set_replaying(1);
1041 int rank = smpi_process_index();
1042 TRACE_smpi_init(rank);
1043 TRACE_smpi_computing_init(rank);
1044 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
1045 extra->type = TRACING_INIT;
1046 TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
1047 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
1049 if (!_xbt_replay_action_init()) {
1050 xbt_replay_action_register("init", action_init);
1051 xbt_replay_action_register("finalize", action_finalize);
1052 xbt_replay_action_register("comm_size", action_comm_size);
1053 xbt_replay_action_register("comm_split", action_comm_split);
1054 xbt_replay_action_register("comm_dup", action_comm_dup);
1055 xbt_replay_action_register("send", action_send);
1056 xbt_replay_action_register("Isend", action_Isend);
1057 xbt_replay_action_register("recv", action_recv);
1058 xbt_replay_action_register("Irecv", action_Irecv);
1059 xbt_replay_action_register("test", action_test);
1060 xbt_replay_action_register("wait", action_wait);
1061 xbt_replay_action_register("waitAll", action_waitall);
1062 xbt_replay_action_register("barrier", action_barrier);
1063 xbt_replay_action_register("bcast", action_bcast);
1064 xbt_replay_action_register("reduce", action_reduce);
1065 xbt_replay_action_register("allReduce", action_allReduce);
1066 xbt_replay_action_register("allToAll", action_allToAll);
1067 xbt_replay_action_register("allToAllV", action_allToAllv);
1068 xbt_replay_action_register("gather", action_gather);
1069 xbt_replay_action_register("gatherV", action_gatherv);
1070 xbt_replay_action_register("allGather", action_allgather);
1071 xbt_replay_action_register("allGatherV", action_allgatherv);
1072 xbt_replay_action_register("reduceScatter", action_reducescatter);
1073 xbt_replay_action_register("compute", action_compute);
1076 //if we have a delayed start, sleep here.
1079 double value = strtod((*argv)[2], &endptr);
1080 if (*endptr != '\0')
1081 THROWF(unknown_error, 0, "%s is not a double", (*argv)[2]);
1082 XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
1083 smpi_execute_flops(value);
1085 //UGLY done to force context switch to be sure that all MSG_processes begin initialization
1086 XBT_DEBUG("Force context switch by smpi_execute_flops - Sleeping for 0.0 flops ");
1087 smpi_execute_flops(0.0);
1090 xbt_replay_action_runner(*argc, *argv);
/* Tear down a replayed SMPI process.
 * Steps visible here: wait (smpi_mpi_waitall) for every request still pending for the
 * calling process; when active_processes is zero, i.e. for the last process alive, record
 * and report the simulated time, shut the replay machinery down and release the shared
 * scratch buffers and reqq; finally emit the finalize trace events and finalize/destroy the
 * SMPI process. The decrement of active_processes announced by the first comment and the
 * return value are not visible in this view. */
1093 int smpi_replay_finalize(){
1094 double sim_time= 1.;
1095 /* One active process will stop. Decrease the counter*/
1096 XBT_DEBUG("There are %lu elements in reqq[*]",
1097 xbt_dynar_length(get_reqq_self()));
1098 if (!xbt_dynar_is_empty(get_reqq_self())){
1099 int count_requests=xbt_dynar_length(get_reqq_self());
1100 MPI_Request requests[count_requests];
1101 MPI_Status status[count_requests];
1104 xbt_dynar_foreach(get_reqq_self(),i,requests[i]);
1105 smpi_mpi_waitall(count_requests, requests, status);
1111 if(!active_processes){
1112 /* Last process alive speaking */
1113 /* end the simulated timer */
1114 sim_time = smpi_process_simulated_elapsed();
1118 //TODO xbt_dynar_free_container(get_reqq_self()));
1120 if(!active_processes){
1121 XBT_INFO("Simulation time %f", sim_time);
1122 _xbt_replay_action_exit();
1123 xbt_free(sendbuffer);
1124 xbt_free(recvbuffer);
1126 xbt_dict_free(&reqq); //not need, data have been freed ???
1131 int rank = smpi_process_index();
1132 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
1133 extra->type = TRACING_FINALIZE;
1134 TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
1136 smpi_process_finalize();
1138 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
1139 TRACE_smpi_finalize(smpi_process_index());
1140 smpi_process_destroy();