1 /* Copyright (c) 2009-2014. The SimGrid Team.
2 * All rights reserved. */
4 /* This program is free software; you can redistribute it and/or modify it
5 * under the terms of the license (GNU LGPL) which comes with this package. */
10 #include <xbt/replay.h>
// KEY_SIZE expands to sizeof(int) * 2 + 1; no use of it is visible in this part of the file.
12 #define KEY_SIZE (sizeof(int) * 2 + 1)
// Declares the "smpi_replay" log category (child of "smpi") used by the XBT_* logging calls in this file.
14 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
// Written by the "comm_size" action from the value found in the trace.
16 int communicator_size = 0;
// Set from smpi_process_count() by the "init" action and tested in smpi_replay_run().
17 static int active_processes = 0;
// Dictionary of pending-request dynars, keyed by the process index printed as a decimal string
// (see get_reqq_self() / set_reqq_self()).
18 xbt_dict_t reqq = NULL;
// Datatype used when a trace line carries no datatype argument; chosen by the "init" action.
20 MPI_Datatype MPI_DEFAULT_TYPE;
// Datatype of the action being replayed; decode_datatype() also writes it.
21 MPI_Datatype MPI_CURRENT_TYPE;
// Size bookkeeping and storage for the shared send buffer handed out by smpi_get_tmp_sendbuffer().
23 static int sendbuffer_size=0;
24 char* sendbuffer=NULL;
// Size bookkeeping and storage for the shared receive buffer handed out by smpi_get_tmp_recvbuffer().
25 static int recvbuffer_size=0;
26 char* recvbuffer=NULL;
// Logs, at verbose level, the words of `action` joined by single spaces followed by the
// simulated time elapsed since `clock` (a value the caller read earlier from
// smpi_process_simulated_elapsed()).
// The joined string is only built when verbose logging is enabled for smpi_replay.
28 static void log_timed_action (const char *const *action, double clock){
29 if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
30 char *name = xbt_str_join_array(action, " ");
31 XBT_VERB("%s %f", name, smpi_process_simulated_elapsed()-clock);
// Returns the dynar of pending requests stored in `reqq` for the calling process.
// The dictionary key is smpi_process_index() printed as a decimal string by asprintf().
// NOTE(review): the test guarding xbt_die() and the release of `key` are not visible in
// this listing - confirm against the complete file.
37 static xbt_dynar_t get_reqq_self(){
40 int size = asprintf(&key, "%d", smpi_process_index());
42 xbt_die("could not allocate memory for asprintf");
43 xbt_dynar_t dynar_mpi_request = (xbt_dynar_t) xbt_dict_get(reqq, key);
46 return dynar_mpi_request;
// Stores `mpi_request` in `reqq` as the pending-request dynar of the calling process,
// under the same decimal-string key that get_reqq_self() builds.
// NOTE(review): the destructor registered with the dictionary is plain free(), while the
// value stored by the callers in this file is an xbt_dynar_t - confirm this is intended.
49 static void set_reqq_self(xbt_dynar_t mpi_request){
52 int size = asprintf(&key, "%d", smpi_process_index());
54 xbt_die("could not allocate memory for asprintf");
55 xbt_dict_set(reqq, key, mpi_request, free);
// Hands out a send buffer of at least `size` bytes.
// Outside replay mode each call returns a fresh xbt_malloc() block.
// In replay mode one shared buffer (`sendbuffer`) is reused by every send and is grown
// with xbt_realloc() when the recorded size is smaller than the request.
// NOTE(review): the update of sendbuffer_size and the final return are not visible here.
60 //allocate a single buffer for all sends, growing it if needed
61 void* smpi_get_tmp_sendbuffer(int size){
62 if (!smpi_process_get_replaying())
63 return xbt_malloc(size);
64 if (sendbuffer_size<size){
65 sendbuffer=xbt_realloc(sendbuffer,size);
// Receive-side counterpart of smpi_get_tmp_sendbuffer(): a fresh xbt_malloc() block outside
// replay mode, otherwise the shared `recvbuffer`, grown with xbt_realloc() when the
// recorded size is smaller than `size`.
// NOTE(review): the update of recvbuffer_size and the final return are not visible here.
70 //allocate a single buffer for all recv
71 void* smpi_get_tmp_recvbuffer(int size){
72 if (!smpi_process_get_replaying())
73 return xbt_malloc(size);
74 if (recvbuffer_size<size){
75 recvbuffer=xbt_realloc(recvbuffer,size);
// Counterpart of the smpi_get_tmp_*buffer() helpers for a buffer obtained from them.
// Only the "not replaying" test is visible; presumably `buf` is released in that case,
// since the replay-mode buffers are shared - TODO confirm against the complete file.
81 void smpi_free_tmp_buffer(void* buf){
82 if (!smpi_process_get_replaying())
// Converts `string` to a double with strtod().
// Throws unknown_error with the message "<string> is not a double"; the condition that
// triggers the throw is not visible here (smpi_replay_run() performs the same conversion
// and throws when characters remain after the number).
87 static double parse_double(const char *string)
91 value = strtod(string, &endptr);
93 THROWF(unknown_error, 0, "%s is not a double", string);
// Translates the datatype id found in a trace line into an MPI_Datatype.
// The chosen type is stored in the global MPI_CURRENT_TYPE and then returned; the last
// visible branch falls back to MPI_DEFAULT_TYPE.
// NOTE(review): because the result goes through the global, a caller that decodes a send
// type and then a receive type is left with MPI_CURRENT_TYPE holding the receive type.
// NOTE(review): the selector and the id attached to each branch are not visible here.
97 static MPI_Datatype decode_datatype(const char *const action)
99 // Declared datatypes,
104 MPI_CURRENT_TYPE=MPI_DOUBLE;
107 MPI_CURRENT_TYPE=MPI_INT;
110 MPI_CURRENT_TYPE=MPI_CHAR;
113 MPI_CURRENT_TYPE=MPI_SHORT;
116 MPI_CURRENT_TYPE=MPI_LONG;
119 MPI_CURRENT_TYPE=MPI_FLOAT;
122 MPI_CURRENT_TYPE=MPI_BYTE;
125 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
128 return MPI_CURRENT_TYPE;
// Maps an MPI_Datatype to the string recorded for it in trace data.
// The visible comparisons cover MPI_BYTE, MPI_DOUBLE, MPI_INT, MPI_CHAR, MPI_SHORT,
// MPI_LONG and MPI_FLOAT; other types are "not handled by replay" per the comments below.
// `known` is presumably an optional output flag reporting whether the type was recognised;
// every caller in this file passes NULL - TODO confirm.
// NOTE(review): the returned strings are not visible in this listing.
132 const char* encode_datatype(MPI_Datatype datatype, int* known)
135 //default type for output is set to MPI_BYTE
136 // MPI_DEFAULT_TYPE is not set for output, use directly MPI_BYTE
138 if (datatype==MPI_BYTE){
141 if(datatype==MPI_DOUBLE)
143 if(datatype==MPI_INT)
145 if(datatype==MPI_CHAR)
147 if(datatype==MPI_SHORT)
149 if(datatype==MPI_LONG)
151 if(datatype==MPI_FLOAT)
153 //tell that the datatype is not handled by replay, and that its size should be measured and replayed as size*MPI_BYTE
155 // default - not implemented.
156 // do not warn here as we pass in this function even for other trace formats
// Argument-count check used at the top of the action handlers.
// Walks `action` until its NULL terminator and throws arg_error naming the calling function
// (__FUNCTION__), the number of items found, and the expected `mandatory` / `optional`
// argument counts. Per the message, the first two items are the process id and the action name.
// NOTE(review): the comparison that triggers the throw is not visible in this listing.
160 #define CHECK_ACTION_PARAMS(action, mandatory, optional) {\
162 while(action[i]!=NULL)\
165 THROWF(arg_error, 0, "%s replay failed.\n" \
166 "%d items were given on the line. First two should be process_id and action. " \
167 "This action needs after them %d mandatory arguments, and accepts %d optional ones. \n" \
168 "Please contact the Simgrid team if support is needed", __FUNCTION__, i, mandatory, optional);\
// Replay handler for "init" (no mandatory argument, one optional).
// Picks the default datatype (MPI_DOUBLE when an extra argument is present, MPI_BYTE
// otherwise), starts the simulated timer, records the process count in active_processes
// and registers an empty request dynar for the calling process.
// NOTE(review): both a dictionary-based and an array-based initialisation of `reqq` appear
// below; the lines that select or disable one of them are not visible here - confirm
// against the complete file.
172 static void action_init(const char *const *action)
174 XBT_DEBUG("Initialize the counters");
175 CHECK_ACTION_PARAMS(action, 0, 1);
176 if(action[2]) MPI_DEFAULT_TYPE= MPI_DOUBLE; // default MPE datatype
177 else MPI_DEFAULT_TYPE= MPI_BYTE; // default TAU datatype
179 /* start a simulated timer */
180 smpi_process_simulated_start();
181 /*initialize the number of active processes */
182 active_processes = smpi_process_count();
185 reqq = xbt_dict_new();
188 set_reqq_self(xbt_dynar_new(sizeof(MPI_Request),&xbt_free_ref));
191 reqq=xbt_new0(xbt_dynar_t,active_processes);
193 for(i=0;i<active_processes;i++){
194 reqq[i]=xbt_dynar_new(sizeof(MPI_Request),&xbt_free_ref);
// Replay handler registered for "finalize" in smpi_replay_run().
// No statement of its body is visible in this listing.
200 static void action_finalize(const char *const *action)
204 static void action_comm_size(const char *const *action)
206 double clock = smpi_process_simulated_elapsed();
208 communicator_size = parse_double(action[2]);
209 log_timed_action (action, clock);
// Replay handler for "comm_split".
// The visible statements only read the simulated clock and log the action with its
// elapsed time; no communicator operation is visible.
212 static void action_comm_split(const char *const *action)
214 double clock = smpi_process_simulated_elapsed();
216 log_timed_action (action, clock);
// Replay handler for "comm_dup".
// The visible statements only read the simulated clock and log the action with its
// elapsed time; no communicator operation is visible.
219 static void action_comm_dup(const char *const *action)
221 double clock = smpi_process_simulated_elapsed();
223 log_timed_action (action, clock);
// Replay handler for "compute" (one mandatory argument).
// action[2] is the amount of flops passed to smpi_execute_flops(); the execution is
// bracketed by TRACE_smpi_computing_in/out events carrying the same amount.
226 static void action_compute(const char *const *action)
228 CHECK_ACTION_PARAMS(action, 1, 0);
229 double clock = smpi_process_simulated_elapsed();
230 double flops= parse_double(action[2]);
231 int rank = smpi_process_index();
232 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
233 extra->type=TRACING_COMPUTING;
234 extra->comp_size=flops;
235 TRACE_smpi_computing_in(rank, extra);
237 smpi_execute_flops(flops);
239 TRACE_smpi_computing_out(rank);
240 log_timed_action (action, clock);
// Replay handler for "send" (two mandatory arguments, one optional).
//   action[2]  destination process
//   action[3]  element count
//   action[4]  optional datatype id, see decode_datatype(); MPI_DEFAULT_TYPE is the
//              alternative assigned below
// Performs a blocking smpi_mpi_send() on MPI_COMM_WORLD with tag 0 and a NULL buffer,
// surrounded by point-to-point trace events.
243 static void action_send(const char *const *action)
245 CHECK_ACTION_PARAMS(action, 2, 1);
246 int to = atoi(action[2]);
247 double size=parse_double(action[3]);
248 double clock = smpi_process_simulated_elapsed();
251 MPI_CURRENT_TYPE=decode_datatype(action[4]);
253 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
256 int rank = smpi_process_index();
258 int dst_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), to);
259 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
260 extra->type = TRACING_SEND;
261 extra->send_size = size;
263 extra->dst = dst_traced;
264 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
265 TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
// The traced message size is in bytes: element count times datatype size.
266 TRACE_smpi_send(rank, rank, dst_traced, size*smpi_datatype_size(MPI_CURRENT_TYPE));
268 smpi_mpi_send(NULL, size, MPI_CURRENT_TYPE, to , 0, MPI_COMM_WORLD);
270 log_timed_action (action, clock);
272 TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
// Replay handler for "Isend" (two mandatory arguments, one optional).
//   action[2]  destination process
//   action[3]  element count
//   action[4]  optional datatype id, see decode_datatype(); MPI_DEFAULT_TYPE otherwise
// Starts a non-blocking smpi_mpi_isend() on MPI_COMM_WORLD with tag 0 and a NULL buffer,
// then appends the request to the calling process's pending-request dynar so that a
// later test/wait/waitAll action can pick it up.
275 static void action_Isend(const char *const *action)
277 CHECK_ACTION_PARAMS(action, 2, 1);
278 int to = atoi(action[2]);
279 double size=parse_double(action[3]);
280 double clock = smpi_process_simulated_elapsed();
283 if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
284 else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
286 int rank = smpi_process_index();
287 int dst_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), to);
288 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
289 extra->type = TRACING_ISEND;
290 extra->send_size = size;
292 extra->dst = dst_traced;
293 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
294 TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
295 TRACE_smpi_send(rank, rank, dst_traced, size*smpi_datatype_size(MPI_CURRENT_TYPE));
297 request = smpi_mpi_isend(NULL, size, MPI_CURRENT_TYPE, to, 0,MPI_COMM_WORLD);
299 TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
302 xbt_dynar_push(get_reqq_self(),&request);
304 log_timed_action (action, clock);
// Replay handler for "recv" (two mandatory arguments, one optional).
//   action[2]  source process
//   action[3]  element count
//   action[4]  optional datatype id, see decode_datatype(); MPI_DEFAULT_TYPE otherwise
// Performs a blocking smpi_mpi_recv() on MPI_COMM_WORLD with tag 0 and a NULL buffer,
// surrounded by point-to-point trace events.
// NOTE(review): the probe below serves the "unknown size" case; the condition that
// enables it and the use of its status are not visible in this listing.
307 static void action_recv(const char *const *action) {
308 CHECK_ACTION_PARAMS(action, 2, 1);
309 int from = atoi(action[2]);
310 double size=parse_double(action[3]);
311 double clock = smpi_process_simulated_elapsed();
314 if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
315 else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
317 int rank = smpi_process_index();
318 int src_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), from);
320 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
321 extra->type = TRACING_RECV;
322 extra->send_size = size;
323 extra->src = src_traced;
325 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
326 TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
328 //unknown size from the receiver's point of view
330 smpi_mpi_probe(from, 0, MPI_COMM_WORLD, &status);
334 smpi_mpi_recv(NULL, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD, &status);
336 TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
337 TRACE_smpi_recv(rank, src_traced, rank);
339 log_timed_action (action, clock);
// Replay handler for "Irecv" (two mandatory arguments, one optional).
//   action[2]  source process
//   action[3]  element count
//   action[4]  optional datatype id, see decode_datatype(); MPI_DEFAULT_TYPE otherwise
// Posts a non-blocking smpi_mpi_irecv() on MPI_COMM_WORLD with tag 0 and a NULL buffer,
// then appends the request to the calling process's pending-request dynar.
// NOTE(review): the probe below serves the "unknown size" case; the condition that
// enables it and the use of its status are not visible in this listing.
342 static void action_Irecv(const char *const *action)
344 CHECK_ACTION_PARAMS(action, 2, 1);
345 int from = atoi(action[2]);
346 double size=parse_double(action[3]);
347 double clock = smpi_process_simulated_elapsed();
350 if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
351 else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
353 int rank = smpi_process_index();
354 int src_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), from);
355 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
356 extra->type = TRACING_IRECV;
357 extra->send_size = size;
358 extra->src = src_traced;
360 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
361 TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
363 //unknown size from the receiver's point of view
365 smpi_mpi_probe(from, 0, MPI_COMM_WORLD, &status);
369 request = smpi_mpi_irecv(NULL, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD);
371 TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
373 xbt_dynar_push(get_reqq_self(),&request);
375 log_timed_action (action, clock);
// Replay handler for "test" (no argument).
// Pops the most recently pushed request of the calling process, runs smpi_mpi_test() on
// it between testing trace events, and pushes the request back so that a later wait can
// still find it. Per the comments below, a NULL request means an earlier test already
// completed it and the extra call is ignored.
378 static void action_test(const char *const *action){
379 CHECK_ACTION_PARAMS(action, 0, 0);
380 double clock = smpi_process_simulated_elapsed();
385 request = xbt_dynar_pop_as(get_reqq_self(),MPI_Request);
386 //if request is null here, this may mean that a previous test has succeeded
387 //Different times in traced application and replayed version may lead to this
388 //In this case, ignore the extra calls.
390 int rank = smpi_process_index();
391 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
392 extra->type=TRACING_TEST;
393 TRACE_smpi_testing_in(rank, extra);
395 flag = smpi_mpi_test(&request, &status);
397 XBT_DEBUG("MPI_Test result: %d", flag);
398 /* push back request in dynar to be caught by a subsequent wait. if the test
399 * did succeed, the request is now NULL.
401 xbt_dynar_push_as(get_reqq_self(),MPI_Request, request);
403 TRACE_smpi_testing_out(rank);
405 log_timed_action (action, clock);
// Replay handler for "wait" (no argument).
// Asserts that the calling process has at least one pending request, pops the most
// recent one and waits for it with smpi_mpi_wait(). Source, destination and direction
// are read from the request before the wait so they can be used for the trace events
// emitted afterwards; a receive additionally emits TRACE_smpi_recv().
408 static void action_wait(const char *const *action){
409 CHECK_ACTION_PARAMS(action, 0, 0);
410 double clock = smpi_process_simulated_elapsed();
414 xbt_assert(xbt_dynar_length(get_reqq_self()),
415 "action wait not preceded by any irecv or isend: %s",
416 xbt_str_join_array(action," "));
417 request = xbt_dynar_pop_as(get_reqq_self(),MPI_Request);
420 /* Assuming that the trace is well formed, this mean the comm might have
421 * been caught by a MPI_test. Then just return.
426 int rank = request->comm != MPI_COMM_NULL
427 ? smpi_comm_rank(request->comm)
430 MPI_Group group = smpi_comm_group(request->comm);
431 int src_traced = smpi_group_rank(group, request->src);
432 int dst_traced = smpi_group_rank(group, request->dst);
433 int is_wait_for_receive = request->recv;
434 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
435 extra->type = TRACING_WAIT;
436 TRACE_smpi_ptp_in(rank, src_traced, dst_traced, __FUNCTION__, extra);
438 smpi_mpi_wait(&request, &status);
440 TRACE_smpi_ptp_out(rank, src_traced, dst_traced, __FUNCTION__);
441 if (is_wait_for_receive)
442 TRACE_smpi_recv(rank, src_traced, dst_traced);
443 log_timed_action (action, clock);
// Replay handler for "waitAll" (no argument).
// Copies every pending request of the calling process into a local array, remembers
// each request's source, destination and direction in three int dynars, calls
// smpi_mpi_waitall(), emits TRACE_smpi_recv() for each entry flagged as a receive, and
// finally installs a fresh empty request dynar for the process.
// NOTE(review): the branch structure around the two groups of xbt_dynar_insert_at()
// calls (and any release of the temporary ints) is not visible in this listing.
446 static void action_waitall(const char *const *action){
447 CHECK_ACTION_PARAMS(action, 0, 0);
448 double clock = smpi_process_simulated_elapsed();
449 int count_requests=0;
452 count_requests=xbt_dynar_length(get_reqq_self());
454 if (count_requests>0) {
455 MPI_Request requests[count_requests];
456 MPI_Status status[count_requests];
458 /* The reqq is an array of dynars. Its index corresponds to the rank.
459 Thus each rank saves its own requests to the array request. */
460 xbt_dynar_foreach(get_reqq_self(),i,requests[i]);
462 //save information from requests
464 xbt_dynar_t srcs = xbt_dynar_new(sizeof(int), NULL);
465 xbt_dynar_t dsts = xbt_dynar_new(sizeof(int), NULL);
466 xbt_dynar_t recvs = xbt_dynar_new(sizeof(int), NULL);
467 for (i = 0; i < count_requests; i++) {
469 int *asrc = xbt_new(int, 1);
470 int *adst = xbt_new(int, 1);
471 int *arecv = xbt_new(int, 1);
472 *asrc = requests[i]->src;
473 *adst = requests[i]->dst;
474 *arecv = requests[i]->recv;
475 xbt_dynar_insert_at(srcs, i, asrc);
476 xbt_dynar_insert_at(dsts, i, adst);
477 xbt_dynar_insert_at(recvs, i, arecv);
482 int *t = xbt_new(int, 1);
483 xbt_dynar_insert_at(srcs, i, t);
484 xbt_dynar_insert_at(dsts, i, t);
485 xbt_dynar_insert_at(recvs, i, t);
489 int rank_traced = smpi_process_index();
490 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
491 extra->type = TRACING_WAITALL;
492 extra->send_size=count_requests;
493 TRACE_smpi_ptp_in(rank_traced, -1, -1, __FUNCTION__,extra);
495 smpi_mpi_waitall(count_requests, requests, status);
497 for (i = 0; i < count_requests; i++) {
498 int src_traced, dst_traced, is_wait_for_receive;
499 xbt_dynar_get_cpy(srcs, i, &src_traced);
500 xbt_dynar_get_cpy(dsts, i, &dst_traced);
501 xbt_dynar_get_cpy(recvs, i, &is_wait_for_receive);
502 if (is_wait_for_receive) {
503 TRACE_smpi_recv(rank_traced, src_traced, dst_traced);
506 TRACE_smpi_ptp_out(rank_traced, -1, -1, __FUNCTION__);
508 xbt_dynar_free(&srcs);
509 xbt_dynar_free(&dsts);
510 xbt_dynar_free(&recvs);
512 //TODO xbt_dynar_free_container(get_reqq_self());
513 set_reqq_self(xbt_dynar_new(sizeof(MPI_Request),&xbt_free_ref));
515 log_timed_action (action, clock);
// Replay handler for "barrier": runs mpi_coll_barrier_fun() on MPI_COMM_WORLD between
// collective trace events and logs the elapsed simulated time. No argument is read.
518 static void action_barrier(const char *const *action){
519 double clock = smpi_process_simulated_elapsed();
520 int rank = smpi_process_index();
521 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
522 extra->type = TRACING_BARRIER;
523 TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
525 mpi_coll_barrier_fun(MPI_COMM_WORLD);
527 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
528 log_timed_action (action, clock);
// Replay handler for "bcast" (one mandatory argument, two optional).
//   action[2]  element count
//   action[3]  root process (optional)
//   action[4]  datatype id, see decode_datatype() (optional; MPI_DEFAULT_TYPE is set first)
// Broadcasts from the shared send buffer with mpi_coll_bcast_fun() on MPI_COMM_WORLD.
// NOTE(review): root_traced is computed with smpi_group_index() here, whereas
// action_reduce and the point-to-point handlers use smpi_group_rank() - confirm which
// one is intended.
532 static void action_bcast(const char *const *action)
534 CHECK_ACTION_PARAMS(action, 1, 2);
535 double size = parse_double(action[2]);
536 double clock = smpi_process_simulated_elapsed();
539 * Initialize MPI_CURRENT_TYPE in order to decrease
540 * the number of the checks
542 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
545 root= atoi(action[3]);
547 MPI_CURRENT_TYPE=decode_datatype(action[4]);
551 int rank = smpi_process_index();
552 int root_traced = smpi_group_index(smpi_comm_group(MPI_COMM_WORLD), root);
554 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
555 extra->type = TRACING_BCAST;
556 extra->send_size = size;
557 extra->root = root_traced;
558 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
559 TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__, extra);
560 void *sendbuf = smpi_get_tmp_sendbuffer(size* smpi_datatype_size(MPI_CURRENT_TYPE));
562 mpi_coll_bcast_fun(sendbuf, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
564 TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
565 log_timed_action (action, clock);
// Replay handler for "reduce" (two mandatory arguments, two optional).
//   action[2]  element count communicated
//   action[3]  amount of flops executed after the reduction
//   action[4]  root process (optional)
//   action[5]  datatype id, see decode_datatype() (optional; MPI_DEFAULT_TYPE is set first)
// Runs mpi_coll_reduce_fun() with MPI_OP_NULL on MPI_COMM_WORLD, then smpi_execute_flops().
// NOTE(review): recvbuf and sendbuf are both obtained from smpi_get_tmp_sendbuffer(), so
// in replay mode they refer to the same shared buffer - confirm that
// smpi_get_tmp_recvbuffer() was not intended for recvbuf.
568 static void action_reduce(const char *const *action)
570 CHECK_ACTION_PARAMS(action, 2, 2);
571 double comm_size = parse_double(action[2]);
572 double comp_size = parse_double(action[3]);
573 double clock = smpi_process_simulated_elapsed();
575 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
578 root= atoi(action[4]);
580 MPI_CURRENT_TYPE=decode_datatype(action[5]);
586 int rank = smpi_process_index();
587 int root_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), root);
588 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
589 extra->type = TRACING_REDUCE;
590 extra->send_size = comm_size;
591 extra->comp_size = comp_size;
592 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
593 extra->root = root_traced;
595 TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__,extra);
597 void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
598 void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
599 mpi_coll_reduce_fun(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
600 smpi_execute_flops(comp_size);
602 TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
603 log_timed_action (action, clock);
// Replay handler for "allReduce" (two mandatory arguments, one optional).
//   action[2]  element count communicated
//   action[3]  amount of flops executed after the reduction
//   action[4]  optional datatype id, see decode_datatype(); MPI_DEFAULT_TYPE otherwise
// Runs mpi_coll_allreduce_fun() with MPI_OP_NULL on MPI_COMM_WORLD, then smpi_execute_flops().
// NOTE(review): recvbuf and sendbuf are both obtained from smpi_get_tmp_sendbuffer(), so
// in replay mode they refer to the same shared buffer - confirm that
// smpi_get_tmp_recvbuffer() was not intended for recvbuf.
606 static void action_allReduce(const char *const *action) {
607 CHECK_ACTION_PARAMS(action, 2, 1);
608 double comm_size = parse_double(action[2]);
609 double comp_size = parse_double(action[3]);
611 if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
612 else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
614 double clock = smpi_process_simulated_elapsed();
615 int rank = smpi_process_index();
616 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
617 extra->type = TRACING_ALLREDUCE;
618 extra->send_size = comm_size;
619 extra->comp_size = comp_size;
620 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
621 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
623 void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
624 void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
625 mpi_coll_allreduce_fun(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
626 smpi_execute_flops(comp_size);
628 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
629 log_timed_action (action, clock);
632 static void action_allToAll(const char *const *action) {
633 double clock = smpi_process_simulated_elapsed();
634 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
635 int send_size = parse_double(action[2]);
636 int recv_size = parse_double(action[3]);
637 MPI_Datatype MPI_CURRENT_TYPE2;
640 MPI_CURRENT_TYPE=decode_datatype(action[4]);
641 MPI_CURRENT_TYPE2=decode_datatype(action[5]);
644 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
645 MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
647 void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
648 void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE2));
650 int rank = smpi_process_index();
651 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
652 extra->type = TRACING_ALLTOALL;
653 extra->send_size = send_size;
654 extra->recv_size = recv_size;
655 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
656 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, NULL);
658 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
660 mpi_coll_alltoall_fun(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
662 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
663 log_timed_action (action, clock);
// Replay handler for "gather" (two mandatory arguments, three optional).
// Reads the send count from action[2], the receive count from action[3], the root from
// action[4] and the two datatype ids from action[5] and action[6], then calls
// mpi_coll_gather_fun() on MPI_COMM_WORLD between collective trace events.
// NOTE(review): decode_datatype() writes the global MPI_CURRENT_TYPE, so decoding
// action[6] right after action[5] leaves MPI_CURRENT_TYPE equal to the receive type.
// NOTE(review): the conditions around the datatype decoding, the root parsing and the
// receive-buffer allocation are not visible in this listing.
667 static void action_gather(const char *const *action) {
669 The structure of the gather action for the rank 0 (total 4 processes)
674 1) 68 is the sendcounts
675 2) 68 is the recvcounts
676 3) 0 is the root node
677 4) 0 is the send datatype id, see decode_datatype()
678 5) 0 is the recv datatype id, see decode_datatype()
680 CHECK_ACTION_PARAMS(action, 2, 3);
681 double clock = smpi_process_simulated_elapsed();
682 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
683 int send_size = parse_double(action[2]);
684 int recv_size = parse_double(action[3]);
685 MPI_Datatype MPI_CURRENT_TYPE2;
687 MPI_CURRENT_TYPE=decode_datatype(action[5]);
688 MPI_CURRENT_TYPE2=decode_datatype(action[6]);
690 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
691 MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
693 void *send = smpi_get_tmp_sendbuffer(send_size* smpi_datatype_size(MPI_CURRENT_TYPE));
697 root=atoi(action[4]);
698 int rank = smpi_comm_rank(MPI_COMM_WORLD);
701 recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE2));
703 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
704 extra->type = TRACING_GATHER;
705 extra->send_size = send_size;
706 extra->recv_size = recv_size;
708 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
709 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, NULL);
711 TRACE_smpi_collective_in(smpi_process_index(), root, __FUNCTION__, extra);
713 mpi_coll_gather_fun(send, send_size, MPI_CURRENT_TYPE,
714 recv, recv_size, MPI_CURRENT_TYPE2,
715 root, MPI_COMM_WORLD);
717 TRACE_smpi_collective_out(smpi_process_index(), -1, __FUNCTION__);
718 log_timed_action (action, clock);
// Replay handler for "gatherV" (comm_size+1 mandatory arguments, two optional).
// Reads the send count from action[2], one receive count per process from action[3]
// onwards, the root from action[3+comm_size] and the two datatype ids from
// action[4+comm_size] and action[5+comm_size], then calls smpi_mpi_gatherv() on
// MPI_COMM_WORLD. The displacement array is zero-initialised and not modified by any
// visible line; the receive buffer is sized from the sum of the receive counts.
// NOTE(review): decode_datatype() writes the global MPI_CURRENT_TYPE, so decoding the
// receive type right after the send type leaves MPI_CURRENT_TYPE equal to the receive type.
// NOTE(review): the example line below names the action "gather" although this handler is
// registered as "gatherV".
723 static void action_gatherv(const char *const *action) {
725 The structure of the gatherv action for the rank 0 (total 4 processes)
727 0 gather 68 68 10 10 10 0 0 0
730 1) 68 is the sendcount
731 2) 68 10 10 10 is the recvcounts
732 3) 0 is the root node
733 4) 0 is the send datatype id, see decode_datatype()
734 5) 0 is the recv datatype id, see decode_datatype()
737 double clock = smpi_process_simulated_elapsed();
738 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
739 CHECK_ACTION_PARAMS(action, comm_size+1, 2);
740 int send_size = parse_double(action[2]);
741 int *disps = xbt_new0(int, comm_size);
742 int *recvcounts = xbt_new0(int, comm_size);
745 MPI_Datatype MPI_CURRENT_TYPE2;
746 if(action[4+comm_size]) {
747 MPI_CURRENT_TYPE=decode_datatype(action[4+comm_size]);
748 MPI_CURRENT_TYPE2=decode_datatype(action[5+comm_size]);
750 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
751 MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
753 void *send = smpi_get_tmp_sendbuffer(send_size* smpi_datatype_size(MPI_CURRENT_TYPE));
755 for(i=0;i<comm_size;i++) {
756 recvcounts[i] = atoi(action[i+3]);
757 recv_sum=recv_sum+recvcounts[i];
761 int root=atoi(action[3+comm_size]);
762 int rank = smpi_comm_rank(MPI_COMM_WORLD);;
765 recv = smpi_get_tmp_recvbuffer(recv_sum* smpi_datatype_size(MPI_CURRENT_TYPE2));
767 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
768 extra->type = TRACING_GATHERV;
769 extra->send_size = send_size;
770 extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
771 for(i=0; i< comm_size; i++)//copy data to avoid bad free
772 extra->recvcounts[i] = recvcounts[i];
774 extra->num_processes = comm_size;
775 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
776 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, NULL);
778 TRACE_smpi_collective_in(smpi_process_index(), root, __FUNCTION__, extra);
780 smpi_mpi_gatherv(send, send_size, MPI_CURRENT_TYPE,
781 recv, recvcounts, disps, MPI_CURRENT_TYPE2,
782 root, MPI_COMM_WORLD);
784 TRACE_smpi_collective_out(smpi_process_index(), -1, __FUNCTION__);
785 log_timed_action (action, clock);
786 xbt_free(recvcounts);
// Replay handler for "reduceScatter" (comm_size+1 mandatory arguments, one optional).
// Reads one receive count per process from action[2] onwards, the amount of flops from
// action[2+comm_size] and an optional datatype id from action[3+comm_size]
// (MPI_DEFAULT_TYPE otherwise). Calls mpi_coll_reduce_scatter_fun() with MPI_OP_NULL and
// then smpi_execute_flops() with the parsed amount.
// NOTE(review): the computation of `size`, used to dimension both buffers, is not
// visible in this listing.
790 static void action_reducescatter(const char *const *action) {
793 The structure of the reducescatter action for the rank 0 (total 4 processes)
795 0 reduceScatter 275427 275427 275427 204020 11346849 0
798 1) The first four values after the name of the action declare the recvcounts array
799 2) The value 11346849 is the amount of instructions
800 3) The last value corresponds to the datatype, see decode_datatype().
802 We analyze a MPI_Reduce_scatter call to one MPI_Reduce and one MPI_Scatterv.
806 double clock = smpi_process_simulated_elapsed();
807 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
808 CHECK_ACTION_PARAMS(action, comm_size+1, 1);
809 int comp_size = parse_double(action[2+comm_size]);
810 int *recvcounts = xbt_new0(int, comm_size);
811 int *disps = xbt_new0(int, comm_size);
813 int rank = smpi_process_index();
815 if(action[3+comm_size])
816 MPI_CURRENT_TYPE=decode_datatype(action[3+comm_size]);
818 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
820 for(i=0;i<comm_size;i++) {
821 recvcounts[i] = atoi(action[i+2]);
826 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
827 extra->type = TRACING_REDUCE_SCATTER;
828 extra->send_size = 0;
829 extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
830 for(i=0; i< comm_size; i++)//copy data to avoid bad free
831 extra->recvcounts[i] = recvcounts[i];
832 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
833 extra->comp_size = comp_size;
834 extra->num_processes = comm_size;
836 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
838 void *sendbuf = smpi_get_tmp_sendbuffer(size* smpi_datatype_size(MPI_CURRENT_TYPE));
839 void *recvbuf = smpi_get_tmp_recvbuffer(size* smpi_datatype_size(MPI_CURRENT_TYPE));
841 mpi_coll_reduce_scatter_fun(sendbuf, recvbuf, recvcounts, MPI_CURRENT_TYPE, MPI_OP_NULL,
843 smpi_execute_flops(comp_size);
846 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
847 xbt_free(recvcounts);
849 log_timed_action (action, clock);
852 static void action_allgather(const char *const *action) {
854 The structure of the allgather action for the rank 0 (total 4 processes)
856 0 allGather 275427 275427
859 1) 275427 is the sendcount
860 2) 275427 is the recvcount
861 3) No more values mean that the datatype for sent and receive buffer
862 is the default one, see decode_datatype().
866 double clock = smpi_process_simulated_elapsed();
868 CHECK_ACTION_PARAMS(action, 2, 2);
869 int sendcount=atoi(action[2]);
870 int recvcount=atoi(action[3]);
872 MPI_Datatype MPI_CURRENT_TYPE2;
875 MPI_CURRENT_TYPE = decode_datatype(action[3]);
876 MPI_CURRENT_TYPE2 = decode_datatype(action[4]);
878 MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
879 MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
881 void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* smpi_datatype_size(MPI_CURRENT_TYPE));
882 void *recvbuf = smpi_get_tmp_recvbuffer(recvcount* smpi_datatype_size(MPI_CURRENT_TYPE2));
884 int rank = smpi_process_index();
885 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
886 extra->type = TRACING_ALLGATHER;
887 extra->send_size = sendcount;
888 extra->recv_size= recvcount;
889 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
890 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, NULL);
891 extra->num_processes = smpi_comm_size(MPI_COMM_WORLD);
893 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
895 mpi_coll_allgather_fun(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcount, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
897 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
898 log_timed_action (action, clock);
901 static void action_allgatherv(const char *const *action) {
904 The structure of the allgatherv action for the rank 0 (total 4 processes)
906 0 allGatherV 275427 275427 275427 275427 204020
909 1) 275427 is the sendcount
910 2) The next four elements declare the recvcounts array
911 3) No more values mean that the datatype for sent and receive buffer
912 is the default one, see decode_datatype().
916 double clock = smpi_process_simulated_elapsed();
918 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
919 CHECK_ACTION_PARAMS(action, comm_size+1, 2);
921 int sendcount=atoi(action[2]);
922 int *recvcounts = xbt_new0(int, comm_size);
923 int *disps = xbt_new0(int, comm_size);
925 MPI_Datatype MPI_CURRENT_TYPE2;
927 if(action[3+comm_size]) {
928 MPI_CURRENT_TYPE = decode_datatype(action[3+comm_size]);
929 MPI_CURRENT_TYPE2 = decode_datatype(action[4+comm_size]);
931 MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
932 MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
934 void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* smpi_datatype_size(MPI_CURRENT_TYPE));
936 for(i=0;i<comm_size;i++) {
937 recvcounts[i] = atoi(action[i+3]);
938 recv_sum=recv_sum+recvcounts[i];
940 void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* smpi_datatype_size(MPI_CURRENT_TYPE2));
942 int rank = smpi_process_index();
943 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
944 extra->type = TRACING_ALLGATHERV;
945 extra->send_size = sendcount;
946 extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
947 for(i=0; i< comm_size; i++)//copy data to avoid bad free
948 extra->recvcounts[i] = recvcounts[i];
949 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
950 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, NULL);
951 extra->num_processes = comm_size;
953 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
955 mpi_coll_allgatherv_fun(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts, disps, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
957 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
958 log_timed_action (action, clock);
959 xbt_free(recvcounts);
963 static void action_allToAllv(const char *const *action) {
965 The structure of the allToAllV action for the rank 0 (total 4 processes)
967 0 allToAllV 100 1 7 10 12 100 1 70 10 5
970 1) 100 is the size of the send buffer *sizeof(int),
971 2) 1 7 10 12 is the sendcounts array
972 3) 100*sizeof(int) is the size of the receiver buffer
973 4) 1 70 10 5 is the recvcounts array
978 double clock = smpi_process_simulated_elapsed();
980 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
981 CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2);
982 int send_buf_size=0,recv_buf_size=0,i=0;
983 int *sendcounts = xbt_new0(int, comm_size);
984 int *recvcounts = xbt_new0(int, comm_size);
985 int *senddisps = xbt_new0(int, comm_size);
986 int *recvdisps = xbt_new0(int, comm_size);
988 MPI_Datatype MPI_CURRENT_TYPE2;
990 send_buf_size=parse_double(action[2]);
991 recv_buf_size=parse_double(action[3+comm_size]);
992 if(action[4+2*comm_size]) {
993 MPI_CURRENT_TYPE=decode_datatype(action[4+2*comm_size]);
994 MPI_CURRENT_TYPE2=decode_datatype(action[5+2*comm_size]);
997 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
998 MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
1001 void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* smpi_datatype_size(MPI_CURRENT_TYPE));
1002 void *recvbuf = smpi_get_tmp_recvbuffer(recv_buf_size* smpi_datatype_size(MPI_CURRENT_TYPE2));
1004 for(i=0;i<comm_size;i++) {
1005 sendcounts[i] = atoi(action[i+3]);
1006 recvcounts[i] = atoi(action[i+4+comm_size]);
1010 int rank = smpi_process_index();
1011 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
1012 extra->type = TRACING_ALLTOALLV;
1013 extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
1014 extra->sendcounts= xbt_malloc(comm_size*sizeof(int));
1015 extra->num_processes = comm_size;
1017 for(i=0; i< comm_size; i++){//copy data to avoid bad free
1018 extra->send_size += sendcounts[i];
1019 extra->sendcounts[i] = sendcounts[i];
1020 extra->recv_size += recvcounts[i];
1021 extra->recvcounts[i] = recvcounts[i];
1023 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
1024 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, NULL);
1026 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
1028 mpi_coll_alltoallv_fun(sendbuf, sendcounts, senddisps, MPI_CURRENT_TYPE,
1029 recvbuf, recvcounts, recvdisps, MPI_CURRENT_TYPE,
1032 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
1033 log_timed_action (action, clock);
1034 xbt_free(sendcounts);
1035 xbt_free(recvcounts);
1036 xbt_free(senddisps);
1037 xbt_free(recvdisps);
// Entry point of a replayed SMPI process.
// Steps visible below:
//   1. initialise the SMPI process, mark it initialised and switch it to replay mode;
//   2. emit the "<function>_init" collective trace events;
//   3. register one handler per trace action name if _xbt_replay_action_init() returns 0;
//   4. optionally execute a start delay parsed from (*argv)[2], then force a context
//      switch with a zero-flop execution;
//   5. run the trace with xbt_replay_action_runner();
//   6. wait for the requests still pending for this process;
//   7. when active_processes is 0, report the simulated time and release the shared
//      buffers and the request dictionary;
//   8. emit the "<function>_finalize" trace events, finalize and destroy the process.
// argc/argv are forwarded to smpi_process_init() and to the action runner.
// NOTE(review): the condition enabling the delayed start, the decrement of
// active_processes and the release of `operation` are not visible in this listing.
1040 void smpi_replay_run(int *argc, char***argv){
1041 /* First initializes everything */
1042 smpi_process_init(argc, argv);
1043 smpi_process_mark_as_initialized();
1044 smpi_process_set_replaying(1);
1046 int rank = smpi_process_index();
1047 TRACE_smpi_init(rank);
1048 TRACE_smpi_computing_init(rank);
1049 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
1050 extra->type = TRACING_INIT;
1051 char *operation =bprintf("%s_init",__FUNCTION__);
1052 TRACE_smpi_collective_in(rank, -1, operation, extra);
1053 TRACE_smpi_collective_out(rank, -1, operation);
// The names registered here are the action words expected in the trace files.
1056 if (!_xbt_replay_action_init()) {
1057 xbt_replay_action_register("init", action_init);
1058 xbt_replay_action_register("finalize", action_finalize);
1059 xbt_replay_action_register("comm_size", action_comm_size);
1060 xbt_replay_action_register("comm_split", action_comm_split);
1061 xbt_replay_action_register("comm_dup", action_comm_dup);
1062 xbt_replay_action_register("send", action_send);
1063 xbt_replay_action_register("Isend", action_Isend);
1064 xbt_replay_action_register("recv", action_recv);
1065 xbt_replay_action_register("Irecv", action_Irecv);
1066 xbt_replay_action_register("test", action_test);
1067 xbt_replay_action_register("wait", action_wait);
1068 xbt_replay_action_register("waitAll", action_waitall);
1069 xbt_replay_action_register("barrier", action_barrier);
1070 xbt_replay_action_register("bcast", action_bcast);
1071 xbt_replay_action_register("reduce", action_reduce);
1072 xbt_replay_action_register("allReduce", action_allReduce);
1073 xbt_replay_action_register("allToAll", action_allToAll);
1074 xbt_replay_action_register("allToAllV", action_allToAllv);
1075 xbt_replay_action_register("gather", action_gather);
1076 xbt_replay_action_register("gatherV", action_gatherv);
1077 xbt_replay_action_register("allGather", action_allgather);
1078 xbt_replay_action_register("allGatherV", action_allgatherv);
1079 xbt_replay_action_register("reduceScatter", action_reducescatter);
1080 xbt_replay_action_register("compute", action_compute);
1083 //if we have a delayed start, sleep here.
// The delay is expressed in flops and must be a complete number, otherwise unknown_error is thrown.
1086 double value = strtod((*argv)[2], &endptr);
1087 if (*endptr != '\0')
1088 THROWF(unknown_error, 0, "%s is not a double", (*argv)[2]);
1089 XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
1090 smpi_execute_flops(value);
1092 //UGLY: force a context switch to be sure that all MSG_processes begin initialization
1093 XBT_DEBUG("Force context switch by smpi_execute_flops - Sleeping for 0.0 flops ");
1094 smpi_execute_flops(0.0);
1097 /* Actually run the replay */
1098 xbt_replay_action_runner(*argc, *argv);
1100 /* and now, finalize everything */
1101 double sim_time= 1.;
1102 /* One active process will stop. Decrease the counter*/
1103 XBT_DEBUG("There are %lu elements in reqq[*]",
1104 xbt_dynar_length(get_reqq_self()));
// Requests left over by the trace are completed before shutting down.
1105 if (!xbt_dynar_is_empty(get_reqq_self())){
1106 int count_requests=xbt_dynar_length(get_reqq_self());
1107 MPI_Request requests[count_requests];
1108 MPI_Status status[count_requests];
1111 xbt_dynar_foreach(get_reqq_self(),i,requests[i]);
1112 smpi_mpi_waitall(count_requests, requests, status);
1118 if(!active_processes){
1119 /* Last process alive speaking */
1120 /* end the simulated timer */
1121 sim_time = smpi_process_simulated_elapsed();
1125 //TODO xbt_dynar_free_container(get_reqq_self()));
1127 if(!active_processes){
1128 XBT_INFO("Simulation time %f", sim_time);
1129 _xbt_replay_action_exit();
1130 xbt_free(sendbuffer);
1131 xbt_free(recvbuffer);
1133 xbt_dict_free(&reqq); //not need, data have been freed ???
1137 instr_extra_data extra_fin = xbt_new0(s_instr_extra_data_t,1);
1138 extra_fin->type = TRACING_FINALIZE;
1139 operation =bprintf("%s_finalize",__FUNCTION__);
1140 TRACE_smpi_collective_in(rank, -1, operation, extra_fin);
1142 smpi_process_finalize();
1144 TRACE_smpi_collective_out(rank, -1, operation);
1145 TRACE_smpi_finalize(smpi_process_index());
1146 smpi_process_destroy();