1 /* Copyright (c) 2009-2015. The SimGrid Team.
2 * All rights reserved. */
4 /* This program is free software; you can redistribute it and/or modify it
5 * under the terms of the license (GNU LGPL) which comes with this package. */
10 #include <xbt/replay.h>
/* NOTE(review): KEY_SIZE is not referenced by any line visible in this extract. */
12 #define KEY_SIZE (sizeof(int) * 2 + 1)
/* Log category used by the XBT_VERB/XBT_DEBUG/XBT_INFO calls of this file. */
14 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
/* Written by action_comm_size() from the value found in the trace. */
16 int communicator_size = 0;
/* Set to smpi_process_count() by action_init(); tested against zero in smpi_replay_run(). */
17 static int active_processes = 0;
/* Store of pending requests; accessed through get_reqq_self()/set_reqq_self(),
 * which key it by the decimal string of smpi_process_index(). */
18 xbt_dict_t reqq = NULL;
/* Datatype used when an action line carries no datatype id; chosen in action_init(). */
20 MPI_Datatype MPI_DEFAULT_TYPE;
/* Datatype of the action being replayed; also overwritten by every decode_datatype() call. */
21 MPI_Datatype MPI_CURRENT_TYPE;
/* Scratch buffers handed out by smpi_get_tmp_sendbuffer()/smpi_get_tmp_recvbuffer()
 * while replaying, with their current capacities in bytes. */
23 static int sendbuffer_size=0;
24 char* sendbuffer=NULL;
25 static int recvbuffer_size=0;
26 char* recvbuffer=NULL;
/* Log, at verbose level, the replayed action line and the simulated time it took.
 * action: token array of the trace line, joined with single spaces for display.
 * clock:  smpi_process_simulated_elapsed() value sampled when the action started;
 *         the logged duration is the current elapsed time minus this value.
 * The joined string is only built when verbose logging is enabled for smpi_replay. */
28 static void log_timed_action (const char *const *action, double clock){
29 if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
30 char *name = xbt_str_join_array(action, " ");
31 XBT_VERB("%s %f", name, smpi_process_simulated_elapsed()-clock);
/* Return the dynar of pending MPI requests of the calling process.
 * The dictionary key is smpi_process_index() formatted with "%d" by asprintf().
 * Dies with xbt_die() when the key cannot be allocated.
 * NOTE(review): the test on `size` guarding xbt_die() and the release of `key`
 * are not visible in this extract — confirm against the full file. */
37 static xbt_dynar_t get_reqq_self(){
40 int size = asprintf(&key, "%d", smpi_process_index());
42 xbt_die("could not allocate memory for asprintf");
43 xbt_dynar_t dynar_mpi_request = (xbt_dynar_t) xbt_dict_get(reqq, key);
46 return dynar_mpi_request;
/* Store `mpi_request` as the pending-request dynar of the calling process.
 * The key is built exactly as in get_reqq_self().
 * NOTE(review): the value destructor registered with the dictionary is plain free(),
 * although the stored value is an xbt_dynar_t; replacing an entry therefore presumably
 * releases only the dynar header (see the TODO comments near the callers) — confirm. */
49 static void set_reqq_self(xbt_dynar_t mpi_request){
52 int size = asprintf(&key, "%d", smpi_process_index());
54 xbt_die("could not allocate memory for asprintf");
55 xbt_dict_set(reqq, key, mpi_request, free);
60 //allocate a single buffer for all sends, growing it if needed
/* size: requested capacity in bytes.
 * When the process is not replaying, each call returns a fresh xbt_malloc(size) block.
 * When replaying, the global `sendbuffer` is grown with xbt_realloc() if `size`
 * exceeds sendbuffer_size.
 * NOTE(review): the update of sendbuffer_size and the final return are not visible
 * in this extract — presumably the shared buffer is returned; confirm. */
61 void* smpi_get_tmp_sendbuffer(int size){
62 if (!smpi_process_get_replaying())
63 return xbt_malloc(size);
64 if (sendbuffer_size<size){
65 sendbuffer=xbt_realloc(sendbuffer,size);
70 //allocate a single buffer for all recv
/* Receive-side counterpart of smpi_get_tmp_sendbuffer().
 * size: requested capacity in bytes.
 * When the process is not replaying, each call returns a fresh xbt_malloc(size) block.
 * When replaying, the global `recvbuffer` is grown with xbt_realloc() if `size`
 * exceeds recvbuffer_size.
 * NOTE(review): the update of recvbuffer_size and the final return are not visible
 * in this extract — presumably the shared buffer is returned; confirm. */
71 void* smpi_get_tmp_recvbuffer(int size){
72 if (!smpi_process_get_replaying())
73 return xbt_malloc(size);
74 if (recvbuffer_size<size){
75 recvbuffer=xbt_realloc(recvbuffer,size);
/* Release a buffer obtained from smpi_get_tmp_sendbuffer()/smpi_get_tmp_recvbuffer().
 * NOTE(review): the statement controlled by the test below is not visible in this
 * extract; presumably `buf` is freed only when the process is not replaying, since
 * replay mode hands out the shared global buffers — confirm. */
81 void smpi_free_tmp_buffer(void* buf){
82 if (!smpi_process_get_replaying())
/* Convert a trace token to a double with strtod().
 * string: the token to convert.
 * Raises unknown_error ("%s is not a double") through THROWF.
 * NOTE(review): the condition guarding the THROWF and the return statement are not
 * visible in this extract; presumably the same `*endptr != '\0'` test as in
 * smpi_replay_run() — confirm. */
87 static double parse_double(const char *string)
91 value = strtod(string, &endptr);
93 THROWF(unknown_error, 0, "%s is not a double", string);
/* Translate the datatype token of a trace line into an MPI_Datatype.
 * action: the datatype token (a single string, not the whole action array).
 * Returns one of MPI_DOUBLE, MPI_INT, MPI_CHAR, MPI_SHORT, MPI_LONG, MPI_FLOAT,
 * MPI_BYTE or MPI_DEFAULT_TYPE.
 * Side effect: the result is first stored in the global MPI_CURRENT_TYPE and then
 * returned, so two consecutive calls leave the second result in that global.
 * NOTE(review): the selector and the ids attached to each branch are not visible in
 * this extract; presumably they mirror the ids produced by encode_datatype() — confirm. */
97 static MPI_Datatype decode_datatype(const char *const action)
99 // Declared datatypes,
104 MPI_CURRENT_TYPE=MPI_DOUBLE;
107 MPI_CURRENT_TYPE=MPI_INT;
110 MPI_CURRENT_TYPE=MPI_CHAR;
113 MPI_CURRENT_TYPE=MPI_SHORT;
116 MPI_CURRENT_TYPE=MPI_LONG;
119 MPI_CURRENT_TYPE=MPI_FLOAT;
122 MPI_CURRENT_TYPE=MPI_BYTE;
125 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
128 return MPI_CURRENT_TYPE;
/* Return the textual id of `datatype` for trace output.
 * datatype: compared in turn against MPI_BYTE, MPI_DOUBLE, MPI_INT, MPI_CHAR,
 *           MPI_SHORT, MPI_LONG and MPI_FLOAT.
 * known:    optional output flag; every caller visible in this file passes NULL.
 * NOTE(review): the strings returned by each branch and the handling of `known`
 * are not visible in this extract; per the comments below, an unhandled datatype is
 * reported so that its size is measured and replayed as size*MPI_BYTE — confirm. */
132 const char* encode_datatype(MPI_Datatype datatype, int* known)
135 //default type for output is set to MPI_BYTE
136 // MPI_DEFAULT_TYPE is not set for output, use directly MPI_BYTE
138 if (datatype==MPI_BYTE){
141 if(datatype==MPI_DOUBLE)
143 if(datatype==MPI_INT)
145 if(datatype==MPI_CHAR)
147 if(datatype==MPI_SHORT)
149 if(datatype==MPI_LONG)
151 if(datatype==MPI_FLOAT)
153 //tell that the datatype is not handled by replay, and that its size should be measured and replayed as size*MPI_BYTE
155 // default - not implemented.
156 // do not warn here as we pass in this function even for other trace formats
/* Validate the number of tokens of a trace line.
 * action:    NULL-terminated token array; the tokens are counted up to the terminator.
 * mandatory: number of arguments required after the process id and the action name.
 * optional:  number of additional arguments accepted.
 * Raises arg_error through THROWF, naming the enclosing function (__FUNCTION__) and
 * the number of tokens found.
 * NOTE(review): the exact comparison that triggers the THROWF is not visible in this
 * extract — confirm whether too many arguments are rejected as well as too few. */
160 #define CHECK_ACTION_PARAMS(action, mandatory, optional) {\
162 while(action[i]!=NULL)\
165 THROWF(arg_error, 0, "%s replay failed.\n" \
166 "%d items were given on the line. First two should be process_id and action. " \
167 "This action needs after them %d mandatory arguments, and accepts %d optional ones. \n" \
168 "Please contact the Simgrid team if support is needed", __FUNCTION__, i, mandatory, optional);\
/* Handler of the "init" action: set up the replay state of the calling process.
 * action[2] (optional): when present the default datatype is MPI_DOUBLE, otherwise
 * it is MPI_BYTE. Also starts the simulated timer, records the process count in
 * active_processes and installs an empty request dynar for this process.
 * NOTE(review): `reqq` is an xbt_dict_t, so the array-style initialisation near the
 * end (xbt_new0 + indexed loop) cannot be live together with xbt_dict_new(); it looks
 * like legacy code whose comment delimiters are not visible in this extract — confirm. */
172 static void action_init(const char *const *action)
174 XBT_DEBUG("Initialize the counters");
175 CHECK_ACTION_PARAMS(action, 0, 1);
176 if(action[2]) MPI_DEFAULT_TYPE= MPI_DOUBLE; // default MPE datatype
177 else MPI_DEFAULT_TYPE= MPI_BYTE; // default TAU datatype
179 /* start a simulated timer */
180 smpi_process_simulated_start();
181 /*initialize the number of active processes */
182 active_processes = smpi_process_count();
185 reqq = xbt_dict_new();
188 set_reqq_self(xbt_dynar_new(sizeof(MPI_Request),&xbt_free_ref));
191 reqq=xbt_new0(xbt_dynar_t,active_processes);
193 for(i=0;i<active_processes;i++){
194 reqq[i]=xbt_dynar_new(sizeof(MPI_Request),&xbt_free_ref);
/* Handler registered for the "finalize" action in smpi_replay_run().
 * NOTE(review): the body is not visible in this extract. */
200 static void action_finalize(const char *const *action)
/* Handler of the "comm_size" action: store action[2], parsed with parse_double(),
 * into the global communicator_size and log the elapsed simulated time.
 * NOTE(review): no CHECK_ACTION_PARAMS call is visible here, so a line without
 * action[2] reaches parse_double() with a NULL token — confirm. */
204 static void action_comm_size(const char *const *action)
206 double clock = smpi_process_simulated_elapsed();
208 communicator_size = parse_double(action[2]);
209 log_timed_action (action, clock);
/* Handler of the "comm_split" action: in the lines visible here it only logs the
 * action with the elapsed simulated time; no communicator is created. */
212 static void action_comm_split(const char *const *action)
214 double clock = smpi_process_simulated_elapsed();
216 log_timed_action (action, clock);
/* Handler of the "comm_dup" action: in the lines visible here it only logs the
 * action with the elapsed simulated time; no communicator is duplicated. */
219 static void action_comm_dup(const char *const *action)
221 double clock = smpi_process_simulated_elapsed();
223 log_timed_action (action, clock);
/* Handler of the "compute" action.
 * action[2] (mandatory): amount of work, parsed as a double and passed to
 * smpi_execute_flops(). The execution is bracketed by TRACE_smpi_computing_in/out
 * for the calling process, with the amount recorded in the tracing record. */
226 static void action_compute(const char *const *action)
228 CHECK_ACTION_PARAMS(action, 1, 0);
229 double clock = smpi_process_simulated_elapsed();
230 double flops= parse_double(action[2]);
231 int rank = smpi_process_index();
232 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
233 extra->type=TRACING_COMPUTING;
234 extra->comp_size=flops;
235 TRACE_smpi_computing_in(rank, extra);
237 smpi_execute_flops(flops);
239 TRACE_smpi_computing_out(rank);
240 log_timed_action (action, clock);
/* Handler of the "send" action: blocking send on MPI_COMM_WORLD with tag 0 and a
 * NULL payload pointer.
 * action[2] (mandatory): destination, converted with atoi().
 * action[3] (mandatory): element count, parsed as a double.
 * action[4] (optional):  datatype id, see decode_datatype(); MPI_DEFAULT_TYPE otherwise.
 * NOTE(review): the `if`/`else` selecting between the two datatype assignments is
 * not visible in this extract — presumably `if(action[4])` as in action_Isend(). */
243 static void action_send(const char *const *action)
245 CHECK_ACTION_PARAMS(action, 2, 1);
246 int to = atoi(action[2]);
247 double size=parse_double(action[3]);
248 double clock = smpi_process_simulated_elapsed();
251 MPI_CURRENT_TYPE=decode_datatype(action[4]);
253 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
256 int rank = smpi_process_index();
258 int dst_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), to);
259 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
260 extra->type = TRACING_SEND;
261 extra->send_size = size;
263 extra->dst = dst_traced;
264 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
265 TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
266 if (!TRACE_smpi_view_internals()) {
// traced volume is expressed in bytes: element count times datatype size
267 TRACE_smpi_send(rank, rank, dst_traced, size*smpi_datatype_size(MPI_CURRENT_TYPE));
270 smpi_mpi_send(NULL, size, MPI_CURRENT_TYPE, to , 0, MPI_COMM_WORLD);
272 log_timed_action (action, clock);
274 TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
/* Handler of the "Isend" action: non-blocking send on MPI_COMM_WORLD with tag 0 and
 * a NULL payload pointer.
 * action[2] (mandatory): destination, converted with atoi().
 * action[3] (mandatory): element count, parsed as a double.
 * action[4] (optional):  datatype id, see decode_datatype(); MPI_DEFAULT_TYPE otherwise.
 * The returned request is pushed on this process's pending-request dynar so that a
 * later "test", "wait" or "waitAll" action can pick it up. */
277 static void action_Isend(const char *const *action)
279 CHECK_ACTION_PARAMS(action, 2, 1);
280 int to = atoi(action[2]);
281 double size=parse_double(action[3]);
282 double clock = smpi_process_simulated_elapsed();
285 if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
286 else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
288 int rank = smpi_process_index();
289 int dst_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), to);
290 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
291 extra->type = TRACING_ISEND;
292 extra->send_size = size;
294 extra->dst = dst_traced;
295 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
296 TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
297 if (!TRACE_smpi_view_internals()) {
// traced volume is expressed in bytes: element count times datatype size
298 TRACE_smpi_send(rank, rank, dst_traced, size*smpi_datatype_size(MPI_CURRENT_TYPE));
301 request = smpi_mpi_isend(NULL, size, MPI_CURRENT_TYPE, to, 0,MPI_COMM_WORLD);
303 TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
306 xbt_dynar_push(get_reqq_self(),&request);
308 log_timed_action (action, clock);
/* Handler of the "recv" action: blocking receive on MPI_COMM_WORLD with tag 0 and a
 * NULL payload pointer.
 * action[2] (mandatory): source, converted with atoi().
 * action[3] (mandatory): element count, parsed as a double.
 * action[4] (optional):  datatype id, see decode_datatype(); MPI_DEFAULT_TYPE otherwise.
 * NOTE(review): the condition under which smpi_mpi_probe() is called and how its
 * status is used are not visible in this extract; per the comment below it handles a
 * size that the receiver does not know — confirm. */
311 static void action_recv(const char *const *action) {
312 CHECK_ACTION_PARAMS(action, 2, 1);
313 int from = atoi(action[2]);
314 double size=parse_double(action[3]);
315 double clock = smpi_process_simulated_elapsed();
318 if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
319 else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
321 int rank = smpi_process_index();
322 int src_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), from);
324 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
325 extra->type = TRACING_RECV;
326 extra->send_size = size;
327 extra->src = src_traced;
329 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
330 TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
332 //unknown size from the receiver's point of view
334 smpi_mpi_probe(from, 0, MPI_COMM_WORLD, &status);
338 smpi_mpi_recv(NULL, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD, &status);
340 TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
341 if (!TRACE_smpi_view_internals()) {
342 TRACE_smpi_recv(rank, src_traced, rank);
345 log_timed_action (action, clock);
/* Handler of the "Irecv" action: non-blocking receive on MPI_COMM_WORLD with tag 0
 * and a NULL payload pointer.
 * action[2] (mandatory): source, converted with atoi().
 * action[3] (mandatory): element count, parsed as a double.
 * action[4] (optional):  datatype id, see decode_datatype(); MPI_DEFAULT_TYPE otherwise.
 * The returned request is pushed on this process's pending-request dynar.
 * NOTE(review): the condition under which smpi_mpi_probe() is called and how its
 * status is used are not visible in this extract — confirm. */
348 static void action_Irecv(const char *const *action)
350 CHECK_ACTION_PARAMS(action, 2, 1);
351 int from = atoi(action[2]);
352 double size=parse_double(action[3]);
353 double clock = smpi_process_simulated_elapsed();
356 if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
357 else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
359 int rank = smpi_process_index();
360 int src_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), from);
361 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
362 extra->type = TRACING_IRECV;
363 extra->send_size = size;
364 extra->src = src_traced;
366 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
367 TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
369 //unknown size from the receiver's point of view
371 smpi_mpi_probe(from, 0, MPI_COMM_WORLD, &status);
375 request = smpi_mpi_irecv(NULL, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD);
377 TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
379 xbt_dynar_push(get_reqq_self(),&request);
381 log_timed_action (action, clock);
/* Handler of the "test" action (no arguments).
 * Pops the most recently pushed request of this process, runs smpi_mpi_test() on it
 * inside a TRACE_smpi_testing_in/out bracket, and pushes the (possibly nulled)
 * request back so that a later "wait" can consume it.
 * NOTE(review): per the comments below a NULL request is tolerated and the extra
 * call ignored; the guarding `if` is not visible in this extract — confirm. */
384 static void action_test(const char *const *action){
385 CHECK_ACTION_PARAMS(action, 0, 0);
386 double clock = smpi_process_simulated_elapsed();
391 request = xbt_dynar_pop_as(get_reqq_self(),MPI_Request);
392 //if request is null here, this may mean that a previous test has succeeded
393 //Different times in traced application and replayed version may lead to this
394 //In this case, ignore the extra calls.
396 int rank = smpi_process_index();
397 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
398 extra->type=TRACING_TEST;
399 TRACE_smpi_testing_in(rank, extra);
401 flag = smpi_mpi_test(&request, &status);
403 XBT_DEBUG("MPI_Test result: %d", flag);
404 /* push back request in dynar to be caught by a subsequent wait. if the test
405 * did succeed, the request is now NULL.
407 xbt_dynar_push_as(get_reqq_self(),MPI_Request, request);
409 TRACE_smpi_testing_out(rank);
411 log_timed_action (action, clock);
/* Handler of the "wait" action (no arguments).
 * Asserts that this process has at least one pending request, pops the most recent
 * one and waits for it with smpi_mpi_wait() inside a TRACE_smpi_ptp_in/out bracket.
 * Source, destination and direction are read from the request before the wait;
 * a receive additionally emits TRACE_smpi_recv() afterwards.
 * NOTE(review): the string built by xbt_str_join_array() for the assertion message
 * is never released in the lines visible here.
 * NOTE(review): the early return for a NULL request (already consumed by a "test")
 * and the alternative rank used when request->comm is MPI_COMM_NULL are not visible
 * in this extract — confirm. */
414 static void action_wait(const char *const *action){
415 CHECK_ACTION_PARAMS(action, 0, 0);
416 double clock = smpi_process_simulated_elapsed();
420 xbt_assert(xbt_dynar_length(get_reqq_self()),
421 "action wait not preceded by any irecv or isend: %s",
422 xbt_str_join_array(action," "));
423 request = xbt_dynar_pop_as(get_reqq_self(),MPI_Request);
426 /* Assuming that the trace is well formed, this mean the comm might have
427 * been caught by a MPI_test. Then just return.
432 int rank = request->comm != MPI_COMM_NULL
433 ? smpi_comm_rank(request->comm)
436 MPI_Group group = smpi_comm_group(request->comm);
437 int src_traced = smpi_group_rank(group, request->src);
438 int dst_traced = smpi_group_rank(group, request->dst);
439 int is_wait_for_receive = request->recv;
440 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
441 extra->type = TRACING_WAIT;
442 TRACE_smpi_ptp_in(rank, src_traced, dst_traced, __FUNCTION__, extra);
444 smpi_mpi_wait(&request, &status);
446 TRACE_smpi_ptp_out(rank, src_traced, dst_traced, __FUNCTION__);
447 if (is_wait_for_receive)
448 TRACE_smpi_recv(rank, src_traced, dst_traced);
449 log_timed_action (action, clock);
/* Handler of the "waitAll" action (no arguments).
 * Copies every pending request of this process into a local array, remembers the
 * source, destination and direction of each one (they are read before the wait),
 * calls smpi_mpi_waitall(), emits TRACE_smpi_recv() for every completed receive and
 * finally installs a fresh empty request dynar for the process.
 * The local arrays are variable-length arrays sized by the number of requests. */
452 static void action_waitall(const char *const *action){
453 CHECK_ACTION_PARAMS(action, 0, 0);
454 double clock = smpi_process_simulated_elapsed();
455 int count_requests=0;
458 count_requests=xbt_dynar_length(get_reqq_self());
460 if (count_requests>0) {
461 MPI_Request requests[count_requests];
462 MPI_Status status[count_requests];
464 /* The reqq is an array of dynars. Its index corresponds to the rank.
465 Thus each rank saves its own requests to the array request. */
466 xbt_dynar_foreach(get_reqq_self(),i,requests[i]);
468 //save information from requests
470 xbt_dynar_t srcs = xbt_dynar_new(sizeof(int), NULL);
471 xbt_dynar_t dsts = xbt_dynar_new(sizeof(int), NULL);
472 xbt_dynar_t recvs = xbt_dynar_new(sizeof(int), NULL);
473 for (i = 0; i < count_requests; i++) {
475 int *asrc = xbt_new(int, 1);
476 int *adst = xbt_new(int, 1);
477 int *arecv = xbt_new(int, 1);
478 *asrc = requests[i]->src;
479 *adst = requests[i]->dst;
480 *arecv = requests[i]->recv;
481 xbt_dynar_insert_at(srcs, i, asrc);
482 xbt_dynar_insert_at(dsts, i, adst);
483 xbt_dynar_insert_at(recvs, i, arecv);
// NOTE(review): *t is never assigned before being inserted below, so the values
// later read back for this slot (including is_wait_for_receive) are indeterminate.
488 int *t = xbt_new(int, 1);
489 xbt_dynar_insert_at(srcs, i, t);
490 xbt_dynar_insert_at(dsts, i, t);
491 xbt_dynar_insert_at(recvs, i, t);
495 int rank_traced = smpi_process_index();
496 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
497 extra->type = TRACING_WAITALL;
498 extra->send_size=count_requests;
499 TRACE_smpi_ptp_in(rank_traced, -1, -1, __FUNCTION__,extra);
501 smpi_mpi_waitall(count_requests, requests, status);
503 for (i = 0; i < count_requests; i++) {
504 int src_traced, dst_traced, is_wait_for_receive;
505 xbt_dynar_get_cpy(srcs, i, &src_traced);
506 xbt_dynar_get_cpy(dsts, i, &dst_traced);
507 xbt_dynar_get_cpy(recvs, i, &is_wait_for_receive);
508 if (is_wait_for_receive) {
509 TRACE_smpi_recv(rank_traced, src_traced, dst_traced);
512 TRACE_smpi_ptp_out(rank_traced, -1, -1, __FUNCTION__);
514 xbt_dynar_free(&srcs);
515 xbt_dynar_free(&dsts);
516 xbt_dynar_free(&recvs);
518 //TODO xbt_dynar_free_container(get_reqq_self());
519 set_reqq_self(xbt_dynar_new(sizeof(MPI_Request),&xbt_free_ref));
521 log_timed_action (action, clock);
/* Handler of the "barrier" action: run the configured barrier collective on
 * MPI_COMM_WORLD inside a TRACE_smpi_collective_in/out bracket and log the elapsed
 * simulated time. No argument of the action line is read. */
524 static void action_barrier(const char *const *action){
525 double clock = smpi_process_simulated_elapsed();
526 int rank = smpi_process_index();
527 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
528 extra->type = TRACING_BARRIER;
529 TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
531 mpi_coll_barrier_fun(MPI_COMM_WORLD);
533 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
534 log_timed_action (action, clock);
/* Handler of the "bcast" action: broadcast on MPI_COMM_WORLD.
 * action[2] (mandatory): element count, parsed as a double.
 * action[3] (optional):  root, converted with atoi().
 * action[4] (optional):  datatype id, see decode_datatype(); MPI_DEFAULT_TYPE otherwise.
 * The buffer comes from smpi_get_tmp_sendbuffer() and is sized count * datatype size.
 * NOTE(review): the declaration of `root` and the `if` tests guarding the two
 * optional reads are not visible in this extract — confirm.
 * NOTE(review): the traced root is computed with smpi_group_index() here, whereas
 * action_reduce() uses smpi_group_rank() for the same purpose — confirm which one
 * is intended. */
538 static void action_bcast(const char *const *action)
540 CHECK_ACTION_PARAMS(action, 1, 2);
541 double size = parse_double(action[2]);
542 double clock = smpi_process_simulated_elapsed();
545 * Initialize MPI_CURRENT_TYPE in order to decrease
546 * the number of the checks
548 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
551 root= atoi(action[3]);
553 MPI_CURRENT_TYPE=decode_datatype(action[4]);
557 int rank = smpi_process_index();
558 int root_traced = smpi_group_index(smpi_comm_group(MPI_COMM_WORLD), root);
560 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
561 extra->type = TRACING_BCAST;
562 extra->send_size = size;
563 extra->root = root_traced;
564 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
565 TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__, extra);
566 void *sendbuf = smpi_get_tmp_sendbuffer(size* smpi_datatype_size(MPI_CURRENT_TYPE));
568 mpi_coll_bcast_fun(sendbuf, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
570 TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
571 log_timed_action (action, clock);
/* Handler of the "reduce" action: reduction on MPI_COMM_WORLD with MPI_OP_NULL,
 * followed by smpi_execute_flops() for the computation part.
 * action[2] (mandatory): communication size (element count), parsed as a double.
 * action[3] (mandatory): computation amount, parsed as a double.
 * action[4] (optional):  root, converted with atoi().
 * action[5] (optional):  datatype id, see decode_datatype(); MPI_DEFAULT_TYPE otherwise.
 * NOTE(review): the declaration of `root` and the `if` tests guarding the two
 * optional reads are not visible in this extract — confirm.
 * NOTE(review): both recvbuf and sendbuf are obtained from smpi_get_tmp_sendbuffer();
 * while replaying that function hands out the single shared send buffer, so the two
 * pointers presumably alias — confirm this is intended. */
574 static void action_reduce(const char *const *action)
576 CHECK_ACTION_PARAMS(action, 2, 2);
577 double comm_size = parse_double(action[2]);
578 double comp_size = parse_double(action[3]);
579 double clock = smpi_process_simulated_elapsed();
581 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
584 root= atoi(action[4]);
586 MPI_CURRENT_TYPE=decode_datatype(action[5]);
592 int rank = smpi_process_index();
593 int root_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), root);
594 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
595 extra->type = TRACING_REDUCE;
596 extra->send_size = comm_size;
597 extra->comp_size = comp_size;
598 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
599 extra->root = root_traced;
601 TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__,extra);
603 void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
604 void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
605 mpi_coll_reduce_fun(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
606 smpi_execute_flops(comp_size);
608 TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
609 log_timed_action (action, clock);
/* Handler of the "allReduce" action: all-reduce on MPI_COMM_WORLD with MPI_OP_NULL,
 * followed by smpi_execute_flops() for the computation part.
 * action[2] (mandatory): communication size (element count), parsed as a double.
 * action[3] (mandatory): computation amount, parsed as a double.
 * action[4] (optional):  datatype id, see decode_datatype(); MPI_DEFAULT_TYPE otherwise.
 * NOTE(review): both recvbuf and sendbuf are obtained from smpi_get_tmp_sendbuffer();
 * while replaying that function hands out the single shared send buffer, so the two
 * pointers presumably alias — confirm this is intended. */
612 static void action_allReduce(const char *const *action) {
613 CHECK_ACTION_PARAMS(action, 2, 1);
614 double comm_size = parse_double(action[2]);
615 double comp_size = parse_double(action[3]);
617 if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
618 else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
620 double clock = smpi_process_simulated_elapsed();
621 int rank = smpi_process_index();
622 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
623 extra->type = TRACING_ALLREDUCE;
624 extra->send_size = comm_size;
625 extra->comp_size = comp_size;
626 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
627 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
629 void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
630 void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
631 mpi_coll_allreduce_fun(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
632 smpi_execute_flops(comp_size);
634 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
635 log_timed_action (action, clock);
638 static void action_allToAll(const char *const *action) {
639 double clock = smpi_process_simulated_elapsed();
640 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
641 int send_size = parse_double(action[2]);
642 int recv_size = parse_double(action[3]);
643 MPI_Datatype MPI_CURRENT_TYPE2;
646 MPI_CURRENT_TYPE=decode_datatype(action[4]);
647 MPI_CURRENT_TYPE2=decode_datatype(action[5]);
650 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
651 MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
653 void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
654 void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE2));
656 int rank = smpi_process_index();
657 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
658 extra->type = TRACING_ALLTOALL;
659 extra->send_size = send_size;
660 extra->recv_size = recv_size;
661 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
662 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, NULL);
664 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
666 mpi_coll_alltoall_fun(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
668 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
669 log_timed_action (action, clock);
/* Handler of the "gather" action: gather on MPI_COMM_WORLD.
 * action[2] (mandatory): send count; action[3] (mandatory): receive count;
 * action[4] (optional): root; action[5], action[6] (optional): send and receive
 * datatype ids, see decode_datatype(). The receive buffer holds recv_size elements
 * for each of the comm_size processes.
 * NOTE(review): decode_datatype() also writes the global MPI_CURRENT_TYPE, so the
 * second decode below overwrites the send type with the receive type.
 * NOTE(review): the guards on the optional tokens, the declarations of `root` and
 * `recv`, and the condition restricting the receive buffer to the root are not
 * visible in this extract — confirm.
 * NOTE(review): TRACE_smpi_collective_in() is given `root` while the matching
 * TRACE_smpi_collective_out() is given -1 — confirm this asymmetry is intended. */
673 static void action_gather(const char *const *action) {
675 The structure of the gather action for the rank 0 (total 4 processes)
680 1) 68 is the sendcounts
681 2) 68 is the recvcounts
682 3) 0 is the root node
683 4) 0 is the send datatype id, see decode_datatype()
684 5) 0 is the recv datatype id, see decode_datatype()
686 CHECK_ACTION_PARAMS(action, 2, 3);
687 double clock = smpi_process_simulated_elapsed();
688 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
689 int send_size = parse_double(action[2]);
690 int recv_size = parse_double(action[3]);
691 MPI_Datatype MPI_CURRENT_TYPE2;
693 MPI_CURRENT_TYPE=decode_datatype(action[5]);
694 MPI_CURRENT_TYPE2=decode_datatype(action[6]);
696 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
697 MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
699 void *send = smpi_get_tmp_sendbuffer(send_size* smpi_datatype_size(MPI_CURRENT_TYPE));
703 root=atoi(action[4]);
704 int rank = smpi_comm_rank(MPI_COMM_WORLD);
707 recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE2));
709 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
710 extra->type = TRACING_GATHER;
711 extra->send_size = send_size;
712 extra->recv_size = recv_size;
714 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
715 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, NULL);
717 TRACE_smpi_collective_in(smpi_process_index(), root, __FUNCTION__, extra);
719 mpi_coll_gather_fun(send, send_size, MPI_CURRENT_TYPE,
720 recv, recv_size, MPI_CURRENT_TYPE2,
721 root, MPI_COMM_WORLD);
723 TRACE_smpi_collective_out(smpi_process_index(), -1, __FUNCTION__);
724 log_timed_action (action, clock);
/* Handler of the "gatherV" action: smpi_mpi_gatherv() on MPI_COMM_WORLD.
 * action[2]: send count; action[3 .. 2+comm_size]: one receive count per process;
 * action[3+comm_size]: root; action[4+comm_size], action[5+comm_size] (optional):
 * send and receive datatype ids, see decode_datatype().
 * The displacement array is zero-initialised and not modified in the lines visible
 * here; the receive buffer is sized by the sum of the receive counts.
 * NOTE(review): CHECK_ACTION_PARAMS declares comm_size+1 mandatory arguments, yet
 * the root token at action[3+comm_size] is read unconditionally — confirm.
 * NOTE(review): decode_datatype() also writes the global MPI_CURRENT_TYPE, so the
 * second decode below overwrites the send type with the receive type.
 * NOTE(review): the declarations of `i`, `recv_sum` and `recv`, the condition
 * restricting the receive buffer, and the release of `disps` are not visible in
 * this extract — confirm. */
729 static void action_gatherv(const char *const *action) {
731 The structure of the gatherv action for the rank 0 (total 4 processes)
733 0 gather 68 68 10 10 10 0 0 0
736 1) 68 is the sendcount
737 2) 68 10 10 10 is the recvcounts
738 3) 0 is the root node
739 4) 0 is the send datatype id, see decode_datatype()
740 5) 0 is the recv datatype id, see decode_datatype()
743 double clock = smpi_process_simulated_elapsed();
744 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
745 CHECK_ACTION_PARAMS(action, comm_size+1, 2);
746 int send_size = parse_double(action[2]);
747 int *disps = xbt_new0(int, comm_size);
748 int *recvcounts = xbt_new0(int, comm_size);
751 MPI_Datatype MPI_CURRENT_TYPE2;
752 if(action[4+comm_size]) {
753 MPI_CURRENT_TYPE=decode_datatype(action[4+comm_size]);
754 MPI_CURRENT_TYPE2=decode_datatype(action[5+comm_size]);
756 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
757 MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
759 void *send = smpi_get_tmp_sendbuffer(send_size* smpi_datatype_size(MPI_CURRENT_TYPE));
761 for(i=0;i<comm_size;i++) {
762 recvcounts[i] = atoi(action[i+3]);
763 recv_sum=recv_sum+recvcounts[i];
767 int root=atoi(action[3+comm_size]);
768 int rank = smpi_comm_rank(MPI_COMM_WORLD);;
771 recv = smpi_get_tmp_recvbuffer(recv_sum* smpi_datatype_size(MPI_CURRENT_TYPE2));
773 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
774 extra->type = TRACING_GATHERV;
775 extra->send_size = send_size;
776 extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
777 for(i=0; i< comm_size; i++)//copy data to avoid bad free
778 extra->recvcounts[i] = recvcounts[i];
780 extra->num_processes = comm_size;
781 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
782 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, NULL);
784 TRACE_smpi_collective_in(smpi_process_index(), root, __FUNCTION__, extra);
786 smpi_mpi_gatherv(send, send_size, MPI_CURRENT_TYPE,
787 recv, recvcounts, disps, MPI_CURRENT_TYPE2,
788 root, MPI_COMM_WORLD);
790 TRACE_smpi_collective_out(smpi_process_index(), -1, __FUNCTION__);
791 log_timed_action (action, clock);
792 xbt_free(recvcounts);
/* Handler of the "reduceScatter" action: reduce-scatter on MPI_COMM_WORLD with
 * MPI_OP_NULL, followed by smpi_execute_flops() for the computation part.
 * action[2 .. 1+comm_size]: one receive count per process, converted with atoi();
 * action[2+comm_size]: computation amount; action[3+comm_size] (optional): datatype
 * id, see decode_datatype(), MPI_DEFAULT_TYPE otherwise.
 * The receive counts are copied into the tracing record before the local array is
 * released.
 * NOTE(review): the declaration and computation of `size` (used to dimension both
 * buffers), the last argument of the collective call and the release of `disps`
 * are not visible in this extract — confirm. */
796 static void action_reducescatter(const char *const *action) {
799 The structure of the reducescatter action for the rank 0 (total 4 processes)
801 0 reduceScatter 275427 275427 275427 204020 11346849 0
804 1) The first four values after the name of the action declare the recvcounts array
805 2) The value 11346849 is the amount of instructions
806 3) The last value corresponds to the datatype, see decode_datatype().
808 We analyze a MPI_Reduce_scatter call to one MPI_Reduce and one MPI_Scatterv.
812 double clock = smpi_process_simulated_elapsed();
813 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
814 CHECK_ACTION_PARAMS(action, comm_size+1, 1);
815 int comp_size = parse_double(action[2+comm_size]);
816 int *recvcounts = xbt_new0(int, comm_size);
817 int *disps = xbt_new0(int, comm_size);
819 int rank = smpi_process_index();
821 if(action[3+comm_size])
822 MPI_CURRENT_TYPE=decode_datatype(action[3+comm_size]);
824 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
826 for(i=0;i<comm_size;i++) {
827 recvcounts[i] = atoi(action[i+2]);
832 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
833 extra->type = TRACING_REDUCE_SCATTER;
834 extra->send_size = 0;
835 extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
836 for(i=0; i< comm_size; i++)//copy data to avoid bad free
837 extra->recvcounts[i] = recvcounts[i];
838 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
839 extra->comp_size = comp_size;
840 extra->num_processes = comm_size;
842 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
844 void *sendbuf = smpi_get_tmp_sendbuffer(size* smpi_datatype_size(MPI_CURRENT_TYPE));
845 void *recvbuf = smpi_get_tmp_recvbuffer(size* smpi_datatype_size(MPI_CURRENT_TYPE));
847 mpi_coll_reduce_scatter_fun(sendbuf, recvbuf, recvcounts, MPI_CURRENT_TYPE, MPI_OP_NULL,
849 smpi_execute_flops(comp_size);
852 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
853 xbt_free(recvcounts);
855 log_timed_action (action, clock);
858 static void action_allgather(const char *const *action) {
860 The structure of the allgather action for the rank 0 (total 4 processes)
862 0 allGather 275427 275427
865 1) 275427 is the sendcount
866 2) 275427 is the recvcount
867 3) No more values mean that the datatype for sent and receive buffer
868 is the default one, see decode_datatype().
872 double clock = smpi_process_simulated_elapsed();
874 CHECK_ACTION_PARAMS(action, 2, 2);
875 int sendcount=atoi(action[2]);
876 int recvcount=atoi(action[3]);
878 MPI_Datatype MPI_CURRENT_TYPE2;
881 MPI_CURRENT_TYPE = decode_datatype(action[3]);
882 MPI_CURRENT_TYPE2 = decode_datatype(action[4]);
884 MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
885 MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
887 void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* smpi_datatype_size(MPI_CURRENT_TYPE));
888 void *recvbuf = smpi_get_tmp_recvbuffer(recvcount* smpi_datatype_size(MPI_CURRENT_TYPE2));
890 int rank = smpi_process_index();
891 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
892 extra->type = TRACING_ALLGATHER;
893 extra->send_size = sendcount;
894 extra->recv_size= recvcount;
895 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
896 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, NULL);
897 extra->num_processes = smpi_comm_size(MPI_COMM_WORLD);
899 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
901 mpi_coll_allgather_fun(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcount, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
903 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
904 log_timed_action (action, clock);
907 static void action_allgatherv(const char *const *action) {
910 The structure of the allgatherv action for the rank 0 (total 4 processes)
912 0 allGatherV 275427 275427 275427 275427 204020
915 1) 275427 is the sendcount
916 2) The next four elements declare the recvcounts array
917 3) No more values mean that the datatype for sent and receive buffer
918 is the default one, see decode_datatype().
922 double clock = smpi_process_simulated_elapsed();
924 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
925 CHECK_ACTION_PARAMS(action, comm_size+1, 2);
927 int sendcount=atoi(action[2]);
928 int *recvcounts = xbt_new0(int, comm_size);
929 int *disps = xbt_new0(int, comm_size);
931 MPI_Datatype MPI_CURRENT_TYPE2;
933 if(action[3+comm_size]) {
934 MPI_CURRENT_TYPE = decode_datatype(action[3+comm_size]);
935 MPI_CURRENT_TYPE2 = decode_datatype(action[4+comm_size]);
937 MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
938 MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
940 void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* smpi_datatype_size(MPI_CURRENT_TYPE));
942 for(i=0;i<comm_size;i++) {
943 recvcounts[i] = atoi(action[i+3]);
944 recv_sum=recv_sum+recvcounts[i];
946 void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* smpi_datatype_size(MPI_CURRENT_TYPE2));
948 int rank = smpi_process_index();
949 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
950 extra->type = TRACING_ALLGATHERV;
951 extra->send_size = sendcount;
952 extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
953 for(i=0; i< comm_size; i++)//copy data to avoid bad free
954 extra->recvcounts[i] = recvcounts[i];
955 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
956 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, NULL);
957 extra->num_processes = comm_size;
959 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
961 mpi_coll_allgatherv_fun(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts, disps, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
963 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
964 log_timed_action (action, clock);
965 xbt_free(recvcounts);
969 static void action_allToAllv(const char *const *action) {
971 The structure of the allToAllV action for the rank 0 (total 4 processes)
973 0 allToAllV 100 1 7 10 12 100 1 70 10 5
976 1) 100 is the size of the send buffer *sizeof(int),
977 2) 1 7 10 12 is the sendcounts array
978 3) 100*sizeof(int) is the size of the receiver buffer
979 4) 1 70 10 5 is the recvcounts array
984 double clock = smpi_process_simulated_elapsed();
986 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
987 CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2);
988 int send_buf_size=0,recv_buf_size=0,i=0;
989 int *sendcounts = xbt_new0(int, comm_size);
990 int *recvcounts = xbt_new0(int, comm_size);
991 int *senddisps = xbt_new0(int, comm_size);
992 int *recvdisps = xbt_new0(int, comm_size);
994 MPI_Datatype MPI_CURRENT_TYPE2;
996 send_buf_size=parse_double(action[2]);
997 recv_buf_size=parse_double(action[3+comm_size]);
998 if(action[4+2*comm_size]) {
999 MPI_CURRENT_TYPE=decode_datatype(action[4+2*comm_size]);
1000 MPI_CURRENT_TYPE2=decode_datatype(action[5+2*comm_size]);
1003 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
1004 MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
1007 void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* smpi_datatype_size(MPI_CURRENT_TYPE));
1008 void *recvbuf = smpi_get_tmp_recvbuffer(recv_buf_size* smpi_datatype_size(MPI_CURRENT_TYPE2));
1010 for(i=0;i<comm_size;i++) {
1011 sendcounts[i] = atoi(action[i+3]);
1012 recvcounts[i] = atoi(action[i+4+comm_size]);
1016 int rank = smpi_process_index();
1017 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
1018 extra->type = TRACING_ALLTOALLV;
1019 extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
1020 extra->sendcounts= xbt_malloc(comm_size*sizeof(int));
1021 extra->num_processes = comm_size;
1023 for(i=0; i< comm_size; i++){//copy data to avoid bad free
1024 extra->send_size += sendcounts[i];
1025 extra->sendcounts[i] = sendcounts[i];
1026 extra->recv_size += recvcounts[i];
1027 extra->recvcounts[i] = recvcounts[i];
1029 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
1030 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, NULL);
1032 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
1034 mpi_coll_alltoallv_fun(sendbuf, sendcounts, senddisps, MPI_CURRENT_TYPE,
1035 recvbuf, recvcounts, recvdisps, MPI_CURRENT_TYPE,
1038 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
1039 log_timed_action (action, clock);
1040 xbt_free(sendcounts);
1041 xbt_free(recvcounts);
1042 xbt_free(senddisps);
1043 xbt_free(recvdisps);
/* Entry point of the trace replay for one simulated MPI process.
 * argc/argv: command line of the process; forwarded to smpi_process_init() and to
 *            xbt_replay_action_runner(). (*argv)[2], when used, is parsed as the
 *            amount of flops to execute before the replay starts (delayed start).
 * Steps, in order: initialise the process and mark it as replaying; emit the
 * "<function>_init" tracing events; register every action handler; optionally
 * delay the start; run the replay; wait for the requests still pending; let the
 * last process alive print the simulated time and release the shared state; emit
 * the "<function>_finalize" tracing events and destroy the process.
 * NOTE(review): the condition selecting the delayed start, the decrement of
 * active_processes and the release of the two bprintf() strings are not visible
 * in this extract — confirm against the full file. */
1046 void smpi_replay_run(int *argc, char***argv){
1047 /* First initializes everything */
1048 smpi_process_init(argc, argv);
1049 smpi_process_mark_as_initialized();
1050 smpi_process_set_replaying(1);
1052 int rank = smpi_process_index();
1053 TRACE_smpi_init(rank);
1054 TRACE_smpi_computing_init(rank);
1055 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
1056 extra->type = TRACING_INIT;
1057 char *operation =bprintf("%s_init",__FUNCTION__);
1058 TRACE_smpi_collective_in(rank, -1, operation, extra);
1059 TRACE_smpi_collective_out(rank, -1, operation);
// the handlers are registered only when _xbt_replay_action_init() returns zero
1062 if (!_xbt_replay_action_init()) {
1063 xbt_replay_action_register("init", action_init);
1064 xbt_replay_action_register("finalize", action_finalize);
1065 xbt_replay_action_register("comm_size", action_comm_size);
1066 xbt_replay_action_register("comm_split", action_comm_split);
1067 xbt_replay_action_register("comm_dup", action_comm_dup);
1068 xbt_replay_action_register("send", action_send);
1069 xbt_replay_action_register("Isend", action_Isend);
1070 xbt_replay_action_register("recv", action_recv);
1071 xbt_replay_action_register("Irecv", action_Irecv);
1072 xbt_replay_action_register("test", action_test);
1073 xbt_replay_action_register("wait", action_wait);
1074 xbt_replay_action_register("waitAll", action_waitall);
1075 xbt_replay_action_register("barrier", action_barrier);
1076 xbt_replay_action_register("bcast", action_bcast);
1077 xbt_replay_action_register("reduce", action_reduce);
1078 xbt_replay_action_register("allReduce", action_allReduce);
1079 xbt_replay_action_register("allToAll", action_allToAll);
1080 xbt_replay_action_register("allToAllV", action_allToAllv);
1081 xbt_replay_action_register("gather", action_gather);
1082 xbt_replay_action_register("gatherV", action_gatherv);
1083 xbt_replay_action_register("allGather", action_allgather);
1084 xbt_replay_action_register("allGatherV", action_allgatherv);
1085 xbt_replay_action_register("reduceScatter", action_reducescatter);
1086 xbt_replay_action_register("compute", action_compute);
1089 //if we have a delayed start, sleep here.
1092 double value = strtod((*argv)[2], &endptr);
1093 if (*endptr != '\0')
1094 THROWF(unknown_error, 0, "%s is not a double", (*argv)[2]);
1095 XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
1096 smpi_execute_flops(value);
1098 //UGLY: force a context switch to be sure that all MSG_processes begin initialization
1099 XBT_DEBUG("Force context switch by smpi_execute_flops - Sleeping for 0.0 flops ");
1100 smpi_execute_flops(0.0);
1103 /* Actually run the replay */
1104 xbt_replay_action_runner(*argc, *argv);
1106 /* and now, finalize everything */
1107 double sim_time= 1.;
1108 /* One active process will stop. Decrease the counter*/
1109 XBT_DEBUG("There are %lu elements in reqq[*]",
1110 xbt_dynar_length(get_reqq_self()));
// requests never waited for by the trace are completed here before shutting down
1111 if (!xbt_dynar_is_empty(get_reqq_self())){
1112 int count_requests=xbt_dynar_length(get_reqq_self());
1113 MPI_Request requests[count_requests];
1114 MPI_Status status[count_requests];
1117 xbt_dynar_foreach(get_reqq_self(),i,requests[i]);
1118 smpi_mpi_waitall(count_requests, requests, status);
1124 if(!active_processes){
1125 /* Last process alive speaking */
1126 /* end the simulated timer */
1127 sim_time = smpi_process_simulated_elapsed();
1131 //TODO xbt_dynar_free_container(get_reqq_self()));
1133 if(!active_processes){
1134 XBT_INFO("Simulation time %f", sim_time);
1135 _xbt_replay_action_exit();
1136 xbt_free(sendbuffer);
1137 xbt_free(recvbuffer);
1139 xbt_dict_free(&reqq); //not need, data have been freed ???
1143 instr_extra_data extra_fin = xbt_new0(s_instr_extra_data_t,1);
1144 extra_fin->type = TRACING_FINALIZE;
1145 operation =bprintf("%s_finalize",__FUNCTION__);
1146 TRACE_smpi_collective_in(rank, -1, operation, extra_fin);
1148 smpi_process_finalize();
1150 TRACE_smpi_collective_out(rank, -1, operation);
1151 TRACE_smpi_finalize(smpi_process_index());
1152 smpi_process_destroy();