1 /* Copyright (c) 2009-2015. The SimGrid Team.
2 * All rights reserved. */
4 /* This program is free software; you can redistribute it and/or modify it
5 * under the terms of the license (GNU LGPL) which comes with this package. */
10 #include <xbt/replay.h>
11 #include <unordered_map>
/* Size of a buffer holding two ints plus one extra byte (presumably a terminator).
 * NOTE(review): KEY_SIZE is not referenced by any code in this file listing; confirm it is still needed. */
14 #define KEY_SIZE (sizeof(int) * 2 + 1)
16 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
/* Communicator size recorded by the last "comm_size" action. */
18 int communicator_size = 0;
/* Number of replaying processes; set by action_init and tested against 0 at the end of smpi_replay_run. */
19 static int active_processes = 0;
/* Pending non-blocking requests of each process, keyed by smpi_process_index() (see get_reqq_self). */
20 std::unordered_map<int,std::vector<MPI_Request>*> reqq;
/* Datatype used when an action gives no datatype id; chosen in action_init. */
22 MPI_Datatype MPI_DEFAULT_TYPE;
/* Datatype of the action being replayed; assigned by the action handlers and by decode_datatype. */
23 MPI_Datatype MPI_CURRENT_TYPE;
/* Scratch buffers shared by all replayed actions: they only grow (see smpi_get_tmp_sendbuffer /
 * smpi_get_tmp_recvbuffer) and are released by the last active process in smpi_replay_run. */
25 static int sendbuffer_size=0;
26 char* sendbuffer=nullptr;
27 static int recvbuffer_size=0;
28 char* recvbuffer=nullptr;
30 static void log_timed_action (const char *const *action, double clock){
31 if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
32 char *name = xbt_str_join_array(action, " ");
33 XBT_VERB("%s %f", name, smpi_process_simulated_elapsed()-clock);
38 static std::vector<MPI_Request>* get_reqq_self()
40 return reqq.at(smpi_process_index());
43 static void set_reqq_self(std::vector<MPI_Request> *mpi_request)
45 reqq.insert({smpi_process_index(), mpi_request});
48 //allocate a single buffer for all sends, growing it if needed
49 void* smpi_get_tmp_sendbuffer(int size)
51 if (!smpi_process_get_replaying())
52 return xbt_malloc(size);
53 if (sendbuffer_size<size){
54 sendbuffer=static_cast<char*>(xbt_realloc(sendbuffer,size));
60 //allocate a single buffer for all recv
61 void* smpi_get_tmp_recvbuffer(int size){
62 if (!smpi_process_get_replaying())
63 return xbt_malloc(size);
64 if (recvbuffer_size<size){
65 recvbuffer=static_cast<char*>(xbt_realloc(recvbuffer,size));
71 void smpi_free_tmp_buffer(void* buf){
72 if (!smpi_process_get_replaying())
77 static double parse_double(const char *string)
81 value = strtod(string, &endptr);
83 THROWF(unknown_error, 0, "%s is not a double", string);
87 static MPI_Datatype decode_datatype(const char *const action)
89 // Declared datatypes,
90 switch(atoi(action)) {
92 MPI_CURRENT_TYPE=MPI_DOUBLE;
95 MPI_CURRENT_TYPE=MPI_INT;
98 MPI_CURRENT_TYPE=MPI_CHAR;
101 MPI_CURRENT_TYPE=MPI_SHORT;
104 MPI_CURRENT_TYPE=MPI_LONG;
107 MPI_CURRENT_TYPE=MPI_FLOAT;
110 MPI_CURRENT_TYPE=MPI_BYTE;
113 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
115 return MPI_CURRENT_TYPE;
119 const char* encode_datatype(MPI_Datatype datatype, int* known)
121 //default type for output is set to MPI_BYTE
122 // MPI_DEFAULT_TYPE is not set for output, use directly MPI_BYTE
125 if (datatype==MPI_BYTE){
128 if(datatype==MPI_DOUBLE)
130 if(datatype==MPI_INT)
132 if(datatype==MPI_CHAR)
134 if(datatype==MPI_SHORT)
136 if(datatype==MPI_LONG)
138 if(datatype==MPI_FLOAT)
140 //tell that the datatype is not handled by replay, and that its size should be measured and replayed as size*MPI_BYTE
143 // default - not implemented.
144 // do not warn here as we pass in this function even for other trace formats
/* Validate the token array of a replayed action and throw arg_error when it is too short.
 * `action` is nullptr-terminated: action[0] is the process id and action[1] the action name.
 * At least `mandatory` arguments must follow them; `optional` only appears in the error message.
 * The arguments are parenthesized, and the counter has a name no caller uses, so argument
 * expressions such as `comm_size+1`, or ones mentioning a caller variable named `i`, expand correctly.
 * Expands to a braced block: invocations are written without a trailing ';'. */
#define CHECK_ACTION_PARAMS(action, mandatory, optional) {\
    int check_nb_items_=0;\
    while((action)[check_nb_items_]!=nullptr)\
      check_nb_items_++;\
    if(check_nb_items_<(mandatory)+2)\
      THROWF(arg_error, 0, "%s replay failed.\n" \
            "%d items were given on the line. First two should be process_id and action. " \
            "This action needs after them %d mandatory arguments, and accepts %d optional ones. \n" \
            "Please contact the Simgrid team if support is needed", __FUNCTION__, check_nb_items_, (mandatory), (optional));\
  }
159 static void action_init(const char *const *action)
161 XBT_DEBUG("Initialize the counters");
162 CHECK_ACTION_PARAMS(action, 0, 1)
164 MPI_DEFAULT_TYPE=MPI_DOUBLE; // default MPE dataype
165 else MPI_DEFAULT_TYPE= MPI_BYTE; // default TAU datatype
167 /* start a simulated timer */
168 smpi_process_simulated_start();
169 /*initialize the number of active processes */
170 active_processes = smpi_process_count();
172 set_reqq_self(new std::vector<MPI_Request>);
/* Handler of "finalize": nothing to replay. The per-process teardown (waiting for leftover
 * requests, smpi_process_finalize, tracing) is done at the end of smpi_replay_run. */
static void action_finalize(const char *const *action)
{
  (void) action;  // the action line carries no information
}
180 static void action_comm_size(const char *const *action)
182 double clock = smpi_process_simulated_elapsed();
184 communicator_size = parse_double(action[2]);
185 log_timed_action (action, clock);
188 static void action_comm_split(const char *const *action)
190 double clock = smpi_process_simulated_elapsed();
192 log_timed_action (action, clock);
195 static void action_comm_dup(const char *const *action)
197 double clock = smpi_process_simulated_elapsed();
199 log_timed_action (action, clock);
202 static void action_compute(const char *const *action)
204 CHECK_ACTION_PARAMS(action, 1, 0)
205 double clock = smpi_process_simulated_elapsed();
206 double flops= parse_double(action[2]);
207 int rank = smpi_process_index();
208 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
209 extra->type=TRACING_COMPUTING;
210 extra->comp_size=flops;
211 TRACE_smpi_computing_in(rank, extra);
213 smpi_execute_flops(flops);
215 TRACE_smpi_computing_out(rank);
216 log_timed_action (action, clock);
219 static void action_send(const char *const *action)
221 CHECK_ACTION_PARAMS(action, 2, 1)
222 int to = atoi(action[2]);
223 double size=parse_double(action[3]);
224 double clock = smpi_process_simulated_elapsed();
227 MPI_CURRENT_TYPE=decode_datatype(action[4]);
229 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
231 int rank = smpi_process_index();
233 int dst_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), to);
234 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
235 extra->type = TRACING_SEND;
236 extra->send_size = size;
238 extra->dst = dst_traced;
239 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
240 TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
241 if (!TRACE_smpi_view_internals()) {
242 TRACE_smpi_send(rank, rank, dst_traced, size*smpi_datatype_size(MPI_CURRENT_TYPE));
245 smpi_mpi_send(nullptr, size, MPI_CURRENT_TYPE, to , 0, MPI_COMM_WORLD);
247 log_timed_action (action, clock);
249 TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
252 static void action_Isend(const char *const *action)
254 CHECK_ACTION_PARAMS(action, 2, 1)
255 int to = atoi(action[2]);
256 double size=parse_double(action[3]);
257 double clock = smpi_process_simulated_elapsed();
261 MPI_CURRENT_TYPE=decode_datatype(action[4]);
263 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
265 int rank = smpi_process_index();
266 int dst_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), to);
267 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
268 extra->type = TRACING_ISEND;
269 extra->send_size = size;
271 extra->dst = dst_traced;
272 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
273 TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
274 if (!TRACE_smpi_view_internals()) {
275 TRACE_smpi_send(rank, rank, dst_traced, size*smpi_datatype_size(MPI_CURRENT_TYPE));
278 request = smpi_mpi_isend(nullptr, size, MPI_CURRENT_TYPE, to, 0,MPI_COMM_WORLD);
280 TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
283 get_reqq_self()->push_back(request);
285 log_timed_action (action, clock);
/* Handler of "recv <src> <size> [datatype]": blocking receive of `size` elements from rank `src`
 * of MPI_COMM_WORLD, wrapped in point-to-point trace events; TRACE_smpi_recv is emitted unless
 * TRACE_smpi_view_internals() is set.
 * NOTE(review): the two MPI_CURRENT_TYPE assignments below presumably form an if(action[4])/else
 * pair (datatype id if present, MPI_DEFAULT_TYPE otherwise) — confirm. */
288 static void action_recv(const char *const *action) {
289 CHECK_ACTION_PARAMS(action, 2, 1)
290 int from = atoi(action[2]);
291 double size=parse_double(action[3]);
292 double clock = smpi_process_simulated_elapsed();
296 MPI_CURRENT_TYPE=decode_datatype(action[4]);
298 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
300 int rank = smpi_process_index();
301 int src_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), from);
303 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
304 extra->type = TRACING_RECV;
305 extra->send_size = size;
306 extra->src = src_traced;
308 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
309 TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
311 //unknown size from the receiver point of view
// NOTE(review): presumably this probe only runs when the traced size is unknown, and size is then
// taken from status — confirm the guard and the size update.
313 smpi_mpi_probe(from, 0, MPI_COMM_WORLD, &status);
317 smpi_mpi_recv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD, &status);
319 TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
320 if (!TRACE_smpi_view_internals()) {
321 TRACE_smpi_recv(rank, src_traced, rank);
324 log_timed_action (action, clock);
/* Handler of "Irecv <src> <size> [datatype]": start a non-blocking receive of `size` elements from
 * rank `src` of MPI_COMM_WORLD and queue the request; test, wait and waitAll pop it from this queue.
 * NOTE(review): the two MPI_CURRENT_TYPE assignments below presumably form an if(action[4])/else
 * pair — confirm. */
327 static void action_Irecv(const char *const *action)
329 CHECK_ACTION_PARAMS(action, 2, 1)
330 int from = atoi(action[2]);
331 double size=parse_double(action[3]);
332 double clock = smpi_process_simulated_elapsed();
336 MPI_CURRENT_TYPE=decode_datatype(action[4]);
338 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
340 int rank = smpi_process_index();
341 int src_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), from);
342 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
343 extra->type = TRACING_IRECV;
344 extra->send_size = size;
345 extra->src = src_traced;
347 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
348 TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
350 //unknown size from the receiver point of view
// NOTE(review): presumably this probe only runs when the traced size is unknown, and size is then
// taken from status — confirm the guard and the size update.
352 smpi_mpi_probe(from, 0, MPI_COMM_WORLD, &status);
356 request = smpi_mpi_irecv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD);
358 TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
// NOTE(review): waitAll and wait only trace a receive when request->recv is set; confirm it is set before queuing.
360 get_reqq_self()->push_back(request);
362 log_timed_action (action, clock);
365 static void action_test(const char *const *action){
366 CHECK_ACTION_PARAMS(action, 0, 0)
367 double clock = smpi_process_simulated_elapsed();
371 MPI_Request request = get_reqq_self()->back();
372 get_reqq_self()->pop_back();
373 //if request is null here, this may mean that a previous test has succeeded
374 //Different times in traced application and replayed version may lead to this
375 //In this case, ignore the extra calls.
376 if(request!=nullptr){
377 int rank = smpi_process_index();
378 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
379 extra->type=TRACING_TEST;
380 TRACE_smpi_testing_in(rank, extra);
382 flag = smpi_mpi_test(&request, &status);
384 XBT_DEBUG("MPI_Test result: %d", flag);
385 /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now nullptr.*/
386 get_reqq_self()->push_back(request);
388 TRACE_smpi_testing_out(rank);
390 log_timed_action (action, clock);
393 static void action_wait(const char *const *action){
394 CHECK_ACTION_PARAMS(action, 0, 0)
395 double clock = smpi_process_simulated_elapsed();
398 xbt_assert(get_reqq_self()->size(), "action wait not preceded by any irecv or isend: %s",
399 xbt_str_join_array(action," "));
400 MPI_Request request = get_reqq_self()->back();
401 get_reqq_self()->pop_back();
403 if (request==nullptr){
404 /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just return.*/
408 int rank = request->comm != MPI_COMM_NULL ? smpi_comm_rank(request->comm) : -1;
410 MPI_Group group = smpi_comm_group(request->comm);
411 int src_traced = smpi_group_rank(group, request->src);
412 int dst_traced = smpi_group_rank(group, request->dst);
413 int is_wait_for_receive = request->recv;
414 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
415 extra->type = TRACING_WAIT;
416 TRACE_smpi_ptp_in(rank, src_traced, dst_traced, __FUNCTION__, extra);
418 smpi_mpi_wait(&request, &status);
420 TRACE_smpi_ptp_out(rank, src_traced, dst_traced, __FUNCTION__);
421 if (is_wait_for_receive)
422 TRACE_smpi_recv(rank, src_traced, dst_traced);
423 log_timed_action (action, clock);
426 static void action_waitall(const char *const *action){
427 CHECK_ACTION_PARAMS(action, 0, 0)
428 double clock = smpi_process_simulated_elapsed();
429 unsigned int count_requests=get_reqq_self()->size();
431 if (count_requests>0) {
432 MPI_Status status[count_requests];
434 int rank_traced = smpi_process_index();
435 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
436 extra->type = TRACING_WAITALL;
437 extra->send_size=count_requests;
438 TRACE_smpi_ptp_in(rank_traced, -1, -1, __FUNCTION__,extra);
440 smpi_mpi_waitall(count_requests, &(*get_reqq_self())[0], status);
442 for (auto req : *(get_reqq_self())){
443 if (req && req->recv)
444 TRACE_smpi_recv(rank_traced, req->src, req->dst);
446 TRACE_smpi_ptp_out(rank_traced, -1, -1, __FUNCTION__);
447 set_reqq_self(new std::vector<MPI_Request>);
449 log_timed_action (action, clock);
452 static void action_barrier(const char *const *action){
453 double clock = smpi_process_simulated_elapsed();
454 int rank = smpi_process_index();
455 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
456 extra->type = TRACING_BARRIER;
457 TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
459 mpi_coll_barrier_fun(MPI_COMM_WORLD);
461 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
462 log_timed_action (action, clock);
465 static void action_bcast(const char *const *action)
467 CHECK_ACTION_PARAMS(action, 1, 2)
468 double size = parse_double(action[2]);
469 double clock = smpi_process_simulated_elapsed();
471 /* Initialize MPI_CURRENT_TYPE in order to decrease the number of the checks */
472 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
475 root= atoi(action[3]);
477 MPI_CURRENT_TYPE=decode_datatype(action[4]);
481 int rank = smpi_process_index();
482 int root_traced = smpi_group_index(smpi_comm_group(MPI_COMM_WORLD), root);
484 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
485 extra->type = TRACING_BCAST;
486 extra->send_size = size;
487 extra->root = root_traced;
488 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
489 TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__, extra);
490 void *sendbuf = smpi_get_tmp_sendbuffer(size* smpi_datatype_size(MPI_CURRENT_TYPE));
492 mpi_coll_bcast_fun(sendbuf, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
494 TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
495 log_timed_action (action, clock);
498 static void action_reduce(const char *const *action)
500 CHECK_ACTION_PARAMS(action, 2, 2)
501 double comm_size = parse_double(action[2]);
502 double comp_size = parse_double(action[3]);
503 double clock = smpi_process_simulated_elapsed();
505 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
508 root= atoi(action[4]);
510 MPI_CURRENT_TYPE=decode_datatype(action[5]);
514 int rank = smpi_process_index();
515 int root_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), root);
516 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
517 extra->type = TRACING_REDUCE;
518 extra->send_size = comm_size;
519 extra->comp_size = comp_size;
520 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
521 extra->root = root_traced;
523 TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__,extra);
525 void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
526 void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
527 mpi_coll_reduce_fun(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
528 smpi_execute_flops(comp_size);
530 TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
531 log_timed_action (action, clock);
534 static void action_allReduce(const char *const *action) {
535 CHECK_ACTION_PARAMS(action, 2, 1)
536 double comm_size = parse_double(action[2]);
537 double comp_size = parse_double(action[3]);
540 MPI_CURRENT_TYPE=decode_datatype(action[4]);
542 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
544 double clock = smpi_process_simulated_elapsed();
545 int rank = smpi_process_index();
546 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
547 extra->type = TRACING_ALLREDUCE;
548 extra->send_size = comm_size;
549 extra->comp_size = comp_size;
550 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
551 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
553 void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
554 void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
555 mpi_coll_allreduce_fun(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
556 smpi_execute_flops(comp_size);
558 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
559 log_timed_action (action, clock);
562 static void action_allToAll(const char *const *action) {
563 CHECK_ACTION_PARAMS(action, 2, 2) //two mandatory (send and recv volumes)
564 //two optional (corresponding datatypes)
565 double clock = smpi_process_simulated_elapsed();
566 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
567 int send_size = parse_double(action[2]);
568 int recv_size = parse_double(action[3]);
569 MPI_Datatype MPI_CURRENT_TYPE2;
571 if(action[4] && action[5]) {
572 MPI_CURRENT_TYPE=decode_datatype(action[4]);
573 MPI_CURRENT_TYPE2=decode_datatype(action[5]);
576 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
577 MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
580 void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
581 void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE2));
583 int rank = smpi_process_index();
584 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
585 extra->type = TRACING_ALLTOALL;
586 extra->send_size = send_size;
587 extra->recv_size = recv_size;
588 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
589 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
591 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
593 mpi_coll_alltoall_fun(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
595 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
596 log_timed_action (action, clock);
599 static void action_gather(const char *const *action) {
600 /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
604 1) 68 is the sendcounts
605 2) 68 is the recvcounts
606 3) 0 is the root node
607 4) 0 is the send datatype id, see decode_datatype()
608 5) 0 is the recv datatype id, see decode_datatype()
610 CHECK_ACTION_PARAMS(action, 2, 3)
611 double clock = smpi_process_simulated_elapsed();
612 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
613 int send_size = parse_double(action[2]);
614 int recv_size = parse_double(action[3]);
615 MPI_Datatype MPI_CURRENT_TYPE2;
616 if(action[4] && action[5]) {
617 MPI_CURRENT_TYPE=decode_datatype(action[5]);
618 MPI_CURRENT_TYPE2=decode_datatype(action[6]);
620 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
621 MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
623 void *send = smpi_get_tmp_sendbuffer(send_size* smpi_datatype_size(MPI_CURRENT_TYPE));
624 void *recv = nullptr;
627 root=atoi(action[4]);
628 int rank = smpi_comm_rank(MPI_COMM_WORLD);
631 recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE2));
633 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
634 extra->type = TRACING_GATHER;
635 extra->send_size = send_size;
636 extra->recv_size = recv_size;
638 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
639 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
641 TRACE_smpi_collective_in(smpi_process_index(), root, __FUNCTION__, extra);
643 mpi_coll_gather_fun(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
645 TRACE_smpi_collective_out(smpi_process_index(), -1, __FUNCTION__);
646 log_timed_action (action, clock);
649 static void action_gatherv(const char *const *action) {
650 /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
651 0 gather 68 68 10 10 10 0 0 0
654 1) 68 is the sendcount
655 2) 68 10 10 10 is the recvcounts
656 3) 0 is the root node
657 4) 0 is the send datatype id, see decode_datatype()
658 5) 0 is the recv datatype id, see decode_datatype()
661 double clock = smpi_process_simulated_elapsed();
662 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
663 CHECK_ACTION_PARAMS(action, comm_size+1, 2)
664 int send_size = parse_double(action[2]);
665 int *disps = xbt_new0(int, comm_size);
666 int *recvcounts = xbt_new0(int, comm_size);
669 MPI_Datatype MPI_CURRENT_TYPE2;
670 if(action[4+comm_size] && action[5+comm_size]) {
671 MPI_CURRENT_TYPE=decode_datatype(action[4+comm_size]);
672 MPI_CURRENT_TYPE2=decode_datatype(action[5+comm_size]);
674 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
675 MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
677 void *send = smpi_get_tmp_sendbuffer(send_size* smpi_datatype_size(MPI_CURRENT_TYPE));
678 void *recv = nullptr;
679 for(i=0;i<comm_size;i++) {
680 recvcounts[i] = atoi(action[i+3]);
681 recv_sum=recv_sum+recvcounts[i];
685 int root=atoi(action[3+comm_size]);
686 int rank = smpi_comm_rank(MPI_COMM_WORLD);
689 recv = smpi_get_tmp_recvbuffer(recv_sum* smpi_datatype_size(MPI_CURRENT_TYPE2));
691 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
692 extra->type = TRACING_GATHERV;
693 extra->send_size = send_size;
694 extra->recvcounts= xbt_new(int,comm_size);
695 for(i=0; i< comm_size; i++)//copy data to avoid bad free
696 extra->recvcounts[i] = recvcounts[i];
698 extra->num_processes = comm_size;
699 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
700 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
702 TRACE_smpi_collective_in(smpi_process_index(), root, __FUNCTION__, extra);
704 smpi_mpi_gatherv(send, send_size, MPI_CURRENT_TYPE, recv, recvcounts, disps, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
706 TRACE_smpi_collective_out(smpi_process_index(), -1, __FUNCTION__);
707 log_timed_action (action, clock);
708 xbt_free(recvcounts);
/* Handler of "reduceScatter <recvcount_0> ... <recvcount_{n-1}> <comp_size> [datatype]": reduce-scatter
 * over MPI_COMM_WORLD using the per-process recvcounts, then execute `comp_size` flops.
 * NOTE(review): `i` and `size` are used below, but their declarations and the update of `size` are
 * not among these lines. Presumably `size` accumulates the recvcounts, since it sizes both scratch
 * buffers — confirm. Also confirm that `disps`, which is never passed to the collective, is released. */
712 static void action_reducescatter(const char *const *action) {
713 /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
714 0 reduceScatter 275427 275427 275427 204020 11346849 0
717 1) The first four values after the name of the action declare the recvcounts array
718 2) The value 11346849 is the amount of instructions
719 3) The last value corresponds to the datatype, see decode_datatype().
721 We analyze a MPI_Reduce_scatter call to one MPI_Reduce and one MPI_Scatterv. */
722 double clock = smpi_process_simulated_elapsed();
723 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
724 CHECK_ACTION_PARAMS(action, comm_size+1, 1)
// comp_size follows the comm_size recvcounts
725 int comp_size = parse_double(action[2+comm_size]);
726 int *recvcounts = xbt_new0(int, comm_size);
727 int *disps = xbt_new0(int, comm_size);
729 int rank = smpi_process_index();
// optional datatype id after comp_size; otherwise MPI_DEFAULT_TYPE
731 if(action[3+comm_size])
732 MPI_CURRENT_TYPE=decode_datatype(action[3+comm_size]);
734 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
736 for(i=0;i<comm_size;i++) {
737 recvcounts[i] = atoi(action[i+2]);
742 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
743 extra->type = TRACING_REDUCE_SCATTER;
744 extra->send_size = 0;
745 extra->recvcounts= xbt_new(int, comm_size);
746 for(i=0; i< comm_size; i++)//copy data to avoid bad free
747 extra->recvcounts[i] = recvcounts[i];
748 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
749 extra->comp_size = comp_size;
750 extra->num_processes = comm_size;
752 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
754 void *sendbuf = smpi_get_tmp_sendbuffer(size* smpi_datatype_size(MPI_CURRENT_TYPE));
755 void *recvbuf = smpi_get_tmp_recvbuffer(size* smpi_datatype_size(MPI_CURRENT_TYPE));
757 mpi_coll_reduce_scatter_fun(sendbuf, recvbuf, recvcounts, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
758 smpi_execute_flops(comp_size);
760 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
761 xbt_free(recvcounts);
763 log_timed_action (action, clock);
766 static void action_allgather(const char *const *action) {
767 /* The structure of the allgather action for the rank 0 (total 4 processes) is the following:
768 0 allGather 275427 275427
771 1) 275427 is the sendcount
772 2) 275427 is the recvcount
773 3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype(). */
774 double clock = smpi_process_simulated_elapsed();
776 CHECK_ACTION_PARAMS(action, 2, 2)
777 int sendcount=atoi(action[2]);
778 int recvcount=atoi(action[3]);
780 MPI_Datatype MPI_CURRENT_TYPE2;
782 if(action[4] && action[5]) {
783 MPI_CURRENT_TYPE = decode_datatype(action[4]);
784 MPI_CURRENT_TYPE2 = decode_datatype(action[5]);
786 MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
787 MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
789 void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* smpi_datatype_size(MPI_CURRENT_TYPE));
790 void *recvbuf = smpi_get_tmp_recvbuffer(recvcount* smpi_datatype_size(MPI_CURRENT_TYPE2));
792 int rank = smpi_process_index();
793 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
794 extra->type = TRACING_ALLGATHER;
795 extra->send_size = sendcount;
796 extra->recv_size= recvcount;
797 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
798 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
799 extra->num_processes = smpi_comm_size(MPI_COMM_WORLD);
801 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
803 mpi_coll_allgather_fun(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcount, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
805 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
806 log_timed_action (action, clock);
809 static void action_allgatherv(const char *const *action) {
810 /* The structure of the allgatherv action for the rank 0 (total 4 processes) is the following:
811 0 allGatherV 275427 275427 275427 275427 204020
814 1) 275427 is the sendcount
815 2) The next four elements declare the recvcounts array
816 3) No more values mean that the datatype for sent and receive buffer
817 is the default one, see decode_datatype(). */
818 double clock = smpi_process_simulated_elapsed();
820 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
821 CHECK_ACTION_PARAMS(action, comm_size+1, 2)
823 int sendcount=atoi(action[2]);
824 int *recvcounts = xbt_new0(int, comm_size);
825 int *disps = xbt_new0(int, comm_size);
827 MPI_Datatype MPI_CURRENT_TYPE2;
829 if(action[3+comm_size] && action[4+comm_size]) {
830 MPI_CURRENT_TYPE = decode_datatype(action[3+comm_size]);
831 MPI_CURRENT_TYPE2 = decode_datatype(action[4+comm_size]);
833 MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
834 MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
836 void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* smpi_datatype_size(MPI_CURRENT_TYPE));
838 for(i=0;i<comm_size;i++) {
839 recvcounts[i] = atoi(action[i+3]);
840 recv_sum=recv_sum+recvcounts[i];
842 void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* smpi_datatype_size(MPI_CURRENT_TYPE2));
844 int rank = smpi_process_index();
845 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
846 extra->type = TRACING_ALLGATHERV;
847 extra->send_size = sendcount;
848 extra->recvcounts= xbt_new(int, comm_size);
849 for(i=0; i< comm_size; i++)//copy data to avoid bad free
850 extra->recvcounts[i] = recvcounts[i];
851 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
852 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
853 extra->num_processes = comm_size;
855 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
857 mpi_coll_allgatherv_fun(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts, disps, MPI_CURRENT_TYPE2,
860 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
861 log_timed_action (action, clock);
862 xbt_free(recvcounts);
866 static void action_allToAllv(const char *const *action) {
867 /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
868 0 allToAllV 100 1 7 10 12 100 1 70 10 5
871 1) 100 is the size of the send buffer *sizeof(int),
872 2) 1 7 10 12 is the sendcounts array
873 3) 100*sizeof(int) is the size of the receiver buffer
874 4) 1 70 10 5 is the recvcounts array */
875 double clock = smpi_process_simulated_elapsed();
877 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
878 CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
879 int send_buf_size=0,recv_buf_size=0,i=0;
880 int *sendcounts = xbt_new0(int, comm_size);
881 int *recvcounts = xbt_new0(int, comm_size);
882 int *senddisps = xbt_new0(int, comm_size);
883 int *recvdisps = xbt_new0(int, comm_size);
885 MPI_Datatype MPI_CURRENT_TYPE2;
887 send_buf_size=parse_double(action[2]);
888 recv_buf_size=parse_double(action[3+comm_size]);
889 if(action[4+2*comm_size] && action[5+2*comm_size]) {
890 MPI_CURRENT_TYPE=decode_datatype(action[4+2*comm_size]);
891 MPI_CURRENT_TYPE2=decode_datatype(action[5+2*comm_size]);
894 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
895 MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
898 void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* smpi_datatype_size(MPI_CURRENT_TYPE));
899 void *recvbuf = smpi_get_tmp_recvbuffer(recv_buf_size* smpi_datatype_size(MPI_CURRENT_TYPE2));
901 for(i=0;i<comm_size;i++) {
902 sendcounts[i] = atoi(action[i+3]);
903 recvcounts[i] = atoi(action[i+4+comm_size]);
906 int rank = smpi_process_index();
907 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
908 extra->type = TRACING_ALLTOALLV;
909 extra->recvcounts= xbt_new(int, comm_size);
910 extra->sendcounts= xbt_new(int, comm_size);
911 extra->num_processes = comm_size;
913 for(i=0; i< comm_size; i++){//copy data to avoid bad free
914 extra->send_size += sendcounts[i];
915 extra->sendcounts[i] = sendcounts[i];
916 extra->recv_size += recvcounts[i];
917 extra->recvcounts[i] = recvcounts[i];
919 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
920 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
922 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
924 mpi_coll_alltoallv_fun(sendbuf, sendcounts, senddisps, MPI_CURRENT_TYPE,recvbuf, recvcounts, recvdisps,
925 MPI_CURRENT_TYPE, MPI_COMM_WORLD);
927 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
928 log_timed_action (action, clock);
929 xbt_free(sendcounts);
930 xbt_free(recvcounts);
/* Body of each replaying process: initialize SMPI and tracing, register the action handlers,
 * run xbt_replay_action_runner(*argc, *argv), complete the requests still pending in this
 * process' queue, and finalize. The last active process also reports the simulated time and
 * releases the shared scratch buffers. */
935 void smpi_replay_run(int *argc, char***argv){
936 /* First initializes everything */
937 smpi_process_init(argc, argv);
938 smpi_process_mark_as_initialized();
939 smpi_process_set_replaying(true);
941 int rank = smpi_process_index();
942 TRACE_smpi_init(rank);
943 TRACE_smpi_computing_init(rank);
944 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
945 extra->type = TRACING_INIT;
946 char *operation =bprintf("%s_init",__FUNCTION__);
947 TRACE_smpi_collective_in(rank, -1, operation, extra);
948 TRACE_smpi_collective_out(rank, -1, operation);
// NOTE(review): presumably _xbt_replay_action_init() returns 0 only for its first caller, so the
// handlers below are registered once for all processes — confirm.
951 if (_xbt_replay_action_init()==0) {
952 xbt_replay_action_register("init", action_init);
953 xbt_replay_action_register("finalize", action_finalize);
954 xbt_replay_action_register("comm_size", action_comm_size);
955 xbt_replay_action_register("comm_split", action_comm_split);
956 xbt_replay_action_register("comm_dup", action_comm_dup);
957 xbt_replay_action_register("send", action_send);
958 xbt_replay_action_register("Isend", action_Isend);
959 xbt_replay_action_register("recv", action_recv);
960 xbt_replay_action_register("Irecv", action_Irecv);
961 xbt_replay_action_register("test", action_test);
962 xbt_replay_action_register("wait", action_wait);
963 xbt_replay_action_register("waitAll", action_waitall);
964 xbt_replay_action_register("barrier", action_barrier);
965 xbt_replay_action_register("bcast", action_bcast);
966 xbt_replay_action_register("reduce", action_reduce);
967 xbt_replay_action_register("allReduce", action_allReduce);
968 xbt_replay_action_register("allToAll", action_allToAll);
969 xbt_replay_action_register("allToAllV", action_allToAllv);
970 xbt_replay_action_register("gather", action_gather);
971 xbt_replay_action_register("gatherV", action_gatherv);
972 xbt_replay_action_register("allGather", action_allgather);
973 xbt_replay_action_register("allGatherV", action_allgatherv);
974 xbt_replay_action_register("reduceScatter", action_reducescatter);
975 xbt_replay_action_register("compute", action_compute);
978 //if we have a delayed start, sleep here.
// (*argv)[2] is parsed as an amount of flops to execute before the replay starts.
// NOTE(review): presumably guarded by a check that (*argv)[2] exists — confirm.
981 double value = strtod((*argv)[2], &endptr);
983 THROWF(unknown_error, 0, "%s is not a double", (*argv)[2]);
984 XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
985 smpi_execute_flops(value);
987 //UGLY: force a context switch to be sure that all MSG_processes begin initialization
988 XBT_DEBUG("Force context switch by smpi_execute_flops - Sleeping for 0.0 flops ");
989 smpi_execute_flops(0.0);
992 /* Actually run the replay */
993 xbt_replay_action_runner(*argc, *argv);
995 /* and now, finalize everything */
997 /* One active process will stop. Decrease the counter*/
998 XBT_DEBUG("There are %zu elements in reqq[*]", get_reqq_self()->size());
// Complete every request the trace left pending in this process' queue before finalizing.
999 if (!get_reqq_self()->empty()){
1000 unsigned int count_requests=get_reqq_self()->size();
1001 MPI_Request requests[count_requests];
1002 MPI_Status status[count_requests];
1005 for (auto req: *get_reqq_self()){
1009 smpi_mpi_waitall(count_requests, requests, status);
// When no process is active any more: report the simulated time and release the replay state
// and the shared scratch buffers.
1015 if(active_processes==0){
1016 /* Last process alive speaking */
1017 /* end the simulated timer */
1018 sim_time = smpi_process_simulated_elapsed();
1019 XBT_INFO("Simulation time %f", sim_time);
1020 _xbt_replay_action_exit();
1021 xbt_free(sendbuffer);
1022 xbt_free(recvbuffer);
1025 instr_extra_data extra_fin = xbt_new0(s_instr_extra_data_t,1);
1026 extra_fin->type = TRACING_FINALIZE;
// NOTE(review): `operation` still points to the string built for the init event; unless it is
// freed before this reassignment, that string leaks.
1027 operation =bprintf("%s_finalize",__FUNCTION__);
1028 TRACE_smpi_collective_in(rank, -1, operation, extra_fin);
1030 smpi_process_finalize();
1032 TRACE_smpi_collective_out(rank, -1, operation);
1033 TRACE_smpi_finalize(smpi_process_index());
1034 smpi_process_destroy();
1035 xbt_free(operation);