1 /* Copyright (c) 2009-2015. The SimGrid Team.
2 * All rights reserved. */
4 /* This program is free software; you can redistribute it and/or modify it
5 * under the terms of the license (GNU LGPL) which comes with this package. */
9 #include <xbt/replay.h>
10 #include <unordered_map>
// Buffer size for a key string: two characters per byte of an int, plus one
// (presumably for the terminating NUL; no use of the macro is visible here).
13 #define KEY_SIZE (sizeof(int) * 2 + 1)
// Log subcategory "smpi_replay", child of "smpi", used by the logging calls below.
15 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
// Communicator size read from the trace by action_comm_size.
17 int communicator_size = 0;
// Set from smpi_process_count() in action_init; smpi_replay_run compares it
// with 0 to detect the last process alive.
18 static int active_processes = 0;
// Per-process vectors of pending requests, keyed by smpi_process_index().
19 std::unordered_map<int,std::vector<MPI_Request>*> reqq;
// Datatype used when a trace line names none; chosen in action_init.
21 MPI_Datatype MPI_DEFAULT_TYPE;
// Datatype of the action currently being replayed; rewritten by each handler.
22 MPI_Datatype MPI_CURRENT_TYPE;
// Recorded size of sendbuffer; compared with the requested size before growing.
24 static int sendbuffer_size=0;
// Shared scratch buffer grown by smpi_get_tmp_sendbuffer in replay mode.
25 char* sendbuffer=nullptr;
// Recorded size of recvbuffer; compared with the requested size before growing.
26 static int recvbuffer_size=0;
// Shared scratch buffer grown by smpi_get_tmp_recvbuffer in replay mode.
27 char* recvbuffer=nullptr;
// Logs one replayed action at verbose level.
// action: nullptr-terminated array of the words of the trace line.
// clock:  simulated time at which the action started.
// The message is the words joined by single spaces followed by the simulated
// time spent, i.e. smpi_process_simulated_elapsed() - clock.
29 static void log_timed_action (const char *const *action, double clock){
// Build the joined string only when verbose logging is actually enabled.
30 if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
31 char *name = xbt_str_join_array(action, " ");
32 XBT_VERB("%s %f", name, smpi_process_simulated_elapsed()-clock);
// Returns the pending-request vector stored in reqq for the calling process.
// unordered_map::at throws std::out_of_range when nothing was stored for
// smpi_process_index(); entries are added by set_reqq_self.
37 static std::vector<MPI_Request>* get_reqq_self()
39 return reqq.at(smpi_process_index());
// Stores mpi_request as the pending-request vector of the calling process.
// unordered_map::insert leaves an existing entry untouched, so a second call
// for the same process index does not replace the pointer stored first.
42 static void set_reqq_self(std::vector<MPI_Request> *mpi_request)
44 reqq.insert({smpi_process_index(), mpi_request});
// Provides scratch memory of `size` bytes for the send side of an operation.
// Outside replay mode a fresh xbt_malloc(size) block is returned.
// In replay mode the shared `sendbuffer` is reallocated to `size` bytes
// whenever the recorded sendbuffer_size is smaller than `size`.
47 //allocate a single buffer for all sends, growing it if needed
48 void* smpi_get_tmp_sendbuffer(int size)
50 if (!smpi_process_get_replaying())
51 return xbt_malloc(size);
52 if (sendbuffer_size<size){
53 sendbuffer=static_cast<char*>(xbt_realloc(sendbuffer,size));
// Provides scratch memory of `size` bytes for the receive side of an operation.
// Outside replay mode a fresh xbt_malloc(size) block is returned.
// In replay mode the shared `recvbuffer` is reallocated to `size` bytes
// whenever the recorded recvbuffer_size is smaller than `size`.
59 //allocate a single buffer for all recv
60 void* smpi_get_tmp_recvbuffer(int size){
61 if (!smpi_process_get_replaying())
62 return xbt_malloc(size);
63 if (recvbuffer_size<size){
64 recvbuffer=static_cast<char*>(xbt_realloc(recvbuffer,size));
// Counterpart of smpi_get_tmp_sendbuffer / smpi_get_tmp_recvbuffer.
// Only the replay-mode test is visible here; presumably `buf` is released
// only outside replay mode, where the getters return xbt_malloc'ed blocks
// (TODO confirm against the statement guarded by this test).
70 void smpi_free_tmp_buffer(void* buf){
71 if (!smpi_process_get_replaying())
// Converts a trace field to a double with strtod.
// Raises unknown_error through THROWF with the message "<string> is not a
// double"; the condition guarding the throw is not visible here (presumably
// a check that endptr reached the end of the string -- TODO confirm).
76 static double parse_double(const char *string)
79 double value = strtod(string, &endptr);
81 THROWF(unknown_error, 0, "%s is not a double", string);
// Translates the numeric datatype id of a trace field into an MPI datatype.
// The id is atoi(action); the result is stored in the global MPI_CURRENT_TYPE
// and also returned.
// The case labels are not visible here; the assignments appear in the order
// DOUBLE, INT, CHAR, SHORT, LONG, FLOAT, BYTE and finally MPI_DEFAULT_TYPE
// (presumably ids 0 to 6 and the default branch -- TODO confirm).
85 static MPI_Datatype decode_datatype(const char *const action)
87 switch(atoi(action)) {
89 MPI_CURRENT_TYPE=MPI_DOUBLE;
92 MPI_CURRENT_TYPE=MPI_INT;
95 MPI_CURRENT_TYPE=MPI_CHAR;
98 MPI_CURRENT_TYPE=MPI_SHORT;
101 MPI_CURRENT_TYPE=MPI_LONG;
104 MPI_CURRENT_TYPE=MPI_FLOAT;
107 MPI_CURRENT_TYPE=MPI_BYTE;
110 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
112 return MPI_CURRENT_TYPE;
// Inverse of decode_datatype: produces the string written to output traces
// for `datatype`.
// datatype: compared in turn with MPI_BYTE, MPI_DOUBLE, MPI_INT, MPI_CHAR,
//           MPI_SHORT, MPI_LONG and MPI_FLOAT.
// known:    out-parameter; every caller in this file passes nullptr. Per the
//           comment below it reports whether replay handles the datatype; the
//           statements that write it are not visible here.
// The strings returned for each case are not visible here.
115 const char* encode_datatype(MPI_Datatype datatype, int* known)
117 //default type for output is set to MPI_BYTE
118 // MPI_DEFAULT_TYPE is not set for output, use directly MPI_BYTE
121 if (datatype==MPI_BYTE)
123 if(datatype==MPI_DOUBLE)
125 if(datatype==MPI_INT)
127 if(datatype==MPI_CHAR)
129 if(datatype==MPI_SHORT)
131 if(datatype==MPI_LONG)
133 if(datatype==MPI_FLOAT)
135 //tell that the datatype is not handled by replay, and that its size should be measured and replayed as size*MPI_BYTE
138 // default - not implemented.
139 // do not warn here as we pass in this function even for other trace formats
// Argument-count check placed at the top of the action handlers.
// action:    nullptr-terminated array of words of the trace line; the first
//            two are the process id and the action name.
// mandatory: number of required arguments after those two words.
// optional:  number of additional arguments that may follow.
// The macro counts the words into i and raises arg_error via THROWF, naming
// the calling function through __FUNCTION__. The comparison that triggers
// the throw is not visible here (presumably too few words -- TODO confirm).
143 #define CHECK_ACTION_PARAMS(action, mandatory, optional) {\
145 while(action[i]!=nullptr)\
148 THROWF(arg_error, 0, "%s replay failed.\n" \
149 "%d items were given on the line. First two should be process_id and action. " \
150 "This action needs after them %d mandatory arguments, and accepts %d optional ones. \n" \
151 "Please contact the Simgrid team if support is needed", __FUNCTION__, i, mandatory, optional);\
// Handler for the "init" trace action (no mandatory, one optional argument).
// Picks MPI_DEFAULT_TYPE (MPI_DOUBLE for MPE traces, MPI_BYTE for TAU traces;
// the test choosing between them is not visible here, presumably the presence
// of the optional argument), starts the simulated timer, records the number
// of processes in active_processes and registers an empty request vector for
// this process.
154 static void action_init(const char *const *action)
156 XBT_DEBUG("Initialize the counters");
157 CHECK_ACTION_PARAMS(action, 0, 1)
159 MPI_DEFAULT_TYPE=MPI_DOUBLE; // default MPE datatype
160 else MPI_DEFAULT_TYPE= MPI_BYTE; // default TAU datatype
162 /* start a simulated timer */
163 smpi_process_simulated_start();
164 /*initialize the number of active processes */
165 active_processes = smpi_process_count();
// The vector is heap-allocated and owned through the reqq map; no matching
// delete is visible in this listing.
167 set_reqq_self(new std::vector<MPI_Request>);
// Handler registered for the "finalize" trace action in smpi_replay_run.
// Its body is not visible in this listing.
170 static void action_finalize(const char *const *action)
// Handler for the "comm_size" trace action: stores action[2], parsed as a
// double and converted to int, in the global communicator_size.
// The start time given to log_timed_action is the current elapsed time, so
// the logged duration is essentially zero.
175 static void action_comm_size(const char *const *action)
177 communicator_size = parse_double(action[2]);
178 log_timed_action (action, smpi_process_simulated_elapsed());
// Handler for the "comm_split" trace action. The only visible effect is
// logging the action with a start time equal to the current elapsed time.
181 static void action_comm_split(const char *const *action)
183 log_timed_action (action, smpi_process_simulated_elapsed());
// Handler for the "comm_dup" trace action. The only visible effect is
// logging the action with a start time equal to the current elapsed time.
186 static void action_comm_dup(const char *const *action)
188 log_timed_action (action, smpi_process_simulated_elapsed());
// Handler for the "compute" trace action (one mandatory argument).
// action[2] is the amount of flops to execute; the execution is bracketed by
// TRACE_smpi_computing_in/out and its simulated duration is logged.
191 static void action_compute(const char *const *action)
193 CHECK_ACTION_PARAMS(action, 1, 0)
194 double clock = smpi_process_simulated_elapsed();
195 double flops= parse_double(action[2]);
196 int rank = smpi_process_index();
// `extra` is handed to the tracing call below; no release is visible here.
197 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
198 extra->type=TRACING_COMPUTING;
199 extra->comp_size=flops;
200 TRACE_smpi_computing_in(rank, extra);
202 smpi_execute_flops(flops);
204 TRACE_smpi_computing_out(rank);
205 log_timed_action (action, clock);
// Handler for the "send" trace action (two mandatory, one optional argument).
// action[2]: destination; action[3]: element count (parsed as a double);
// action[4]: optional datatype id for decode_datatype(), MPI_DEFAULT_TYPE
// otherwise (the guard choosing between the two is not visible here).
// Performs a blocking send of a null buffer with tag 0 on MPI_COMM_WORLD,
// bracketed by TRACE_smpi_ptp_in/out.
208 static void action_send(const char *const *action)
210 CHECK_ACTION_PARAMS(action, 2, 1)
211 int to = atoi(action[2]);
212 double size=parse_double(action[3]);
213 double clock = smpi_process_simulated_elapsed();
216 MPI_CURRENT_TYPE=decode_datatype(action[4]);
218 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
220 int rank = smpi_process_index();
222 int dst_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), to);
223 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
224 extra->type = TRACING_SEND;
225 extra->send_size = size;
227 extra->dst = dst_traced;
228 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
229 TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
// The send event itself is traced here only when internals are not shown.
230 if (!TRACE_smpi_view_internals())
231 TRACE_smpi_send(rank, rank, dst_traced, 0, size*smpi_datatype_size(MPI_CURRENT_TYPE));
233 smpi_mpi_send(nullptr, size, MPI_CURRENT_TYPE, to , 0, MPI_COMM_WORLD);
235 log_timed_action (action, clock);
237 TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
// Handler for the "Isend" trace action (two mandatory, one optional argument).
// Same arguments as action_send: action[2] destination, action[3] element
// count, optional action[4] datatype id (guard not visible here).
// Starts a non-blocking send of a null buffer with tag 0 on MPI_COMM_WORLD and
// appends the resulting request to this process's request vector so that a
// later test/wait/waitAll action can complete it.
240 static void action_Isend(const char *const *action)
242 CHECK_ACTION_PARAMS(action, 2, 1)
243 int to = atoi(action[2]);
244 double size=parse_double(action[3]);
245 double clock = smpi_process_simulated_elapsed();
248 MPI_CURRENT_TYPE=decode_datatype(action[4]);
250 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
252 int rank = smpi_process_index();
253 int dst_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), to);
254 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
255 extra->type = TRACING_ISEND;
256 extra->send_size = size;
258 extra->dst = dst_traced;
259 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
260 TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
// The send event itself is traced here only when internals are not shown.
261 if (!TRACE_smpi_view_internals())
262 TRACE_smpi_send(rank, rank, dst_traced, 0, size*smpi_datatype_size(MPI_CURRENT_TYPE));
264 MPI_Request request = smpi_mpi_isend(nullptr, size, MPI_CURRENT_TYPE, to, 0,MPI_COMM_WORLD);
266 TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
269 get_reqq_self()->push_back(request);
271 log_timed_action (action, clock);
// Handler for the "recv" trace action (two mandatory, one optional argument).
// action[2]: source; action[3]: element count (parsed as a double);
// action[4]: optional datatype id (guard not visible here).
// Performs a blocking receive into a null buffer with tag 0 on MPI_COMM_WORLD,
// bracketed by TRACE_smpi_ptp_in/out.
274 static void action_recv(const char *const *action) {
275 CHECK_ACTION_PARAMS(action, 2, 1)
276 int from = atoi(action[2]);
277 double size=parse_double(action[3]);
278 double clock = smpi_process_simulated_elapsed();
282 MPI_CURRENT_TYPE=decode_datatype(action[4]);
284 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
286 int rank = smpi_process_index();
287 int src_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), from);
289 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
290 extra->type = TRACING_RECV;
291 extra->send_size = size;
292 extra->src = src_traced;
294 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
295 TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
// The probe below belongs to the "unknown size" case; the condition selecting
// that case and the use made of `status` are not visible here.
297 //unknown size from the receiver point of view
299 smpi_mpi_probe(from, 0, MPI_COMM_WORLD, &status);
303 smpi_mpi_recv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD, &status);
305 TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
306 if (!TRACE_smpi_view_internals()) {
307 TRACE_smpi_recv(rank, src_traced, rank, 0);
310 log_timed_action (action, clock);
// Handler for the "Irecv" trace action (two mandatory, one optional argument).
// Same arguments as action_recv: action[2] source, action[3] element count,
// optional action[4] datatype id (guard not visible here).
// Starts a non-blocking receive into a null buffer with tag 0 on
// MPI_COMM_WORLD and appends the request to this process's request vector so
// that a later test/wait/waitAll action can complete it.
313 static void action_Irecv(const char *const *action)
315 CHECK_ACTION_PARAMS(action, 2, 1)
316 int from = atoi(action[2]);
317 double size=parse_double(action[3]);
318 double clock = smpi_process_simulated_elapsed();
321 MPI_CURRENT_TYPE=decode_datatype(action[4]);
323 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
325 int rank = smpi_process_index();
326 int src_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), from);
327 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
328 extra->type = TRACING_IRECV;
329 extra->send_size = size;
330 extra->src = src_traced;
332 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
333 TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
// The probe below belongs to the "unknown size" case; the condition selecting
// that case and the use made of `status` are not visible here.
335 //unknown size from the receiver's point of view
337 smpi_mpi_probe(from, 0, MPI_COMM_WORLD, &status);
341 MPI_Request request = smpi_mpi_irecv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD);
343 TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
345 get_reqq_self()->push_back(request);
347 log_timed_action (action, clock);
// Handler for the "test" trace action (no arguments).
// Takes the most recently queued request of this process, runs smpi_mpi_test
// on it when it is non-null, and queues the (possibly nulled) request again so
// that a following wait still finds an entry.
// NOTE(review): unlike action_wait, no emptiness check on the request vector is
// visible before back() is called; back() on an empty vector is undefined
// behaviour -- confirm that a guard exists.
350 static void action_test(const char *const *action){
351 CHECK_ACTION_PARAMS(action, 0, 0)
352 double clock = smpi_process_simulated_elapsed();
355 MPI_Request request = get_reqq_self()->back();
356 get_reqq_self()->pop_back();
357 //if request is null here, this may mean that a previous test has succeeded
358 //Different times in traced application and replayed version may lead to this
359 //In this case, ignore the extra calls.
360 if(request!=nullptr){
361 int rank = smpi_process_index();
362 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
363 extra->type=TRACING_TEST;
364 TRACE_smpi_testing_in(rank, extra);
366 int flag = smpi_mpi_test(&request, &status);
368 XBT_DEBUG("MPI_Test result: %d", flag);
369 /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now nullptr.*/
370 get_reqq_self()->push_back(request);
372 TRACE_smpi_testing_out(rank);
374 log_timed_action (action, clock);
// Handler for the "wait" trace action (no arguments).
// Asserts that this process has at least one queued request, removes the most
// recent one and, when it is non-null, waits for it between
// TRACE_smpi_ptp_in/out. A completed receive is additionally reported through
// TRACE_smpi_recv.
// NOTE(review): rank falls back to -1 when request->comm is MPI_COMM_NULL, yet
// smpi_comm_group(request->comm) is called on the next visible line without
// that guard -- confirm this is safe.
377 static void action_wait(const char *const *action){
378 CHECK_ACTION_PARAMS(action, 0, 0)
379 double clock = smpi_process_simulated_elapsed();
382 xbt_assert(get_reqq_self()->size(), "action wait not preceded by any irecv or isend: %s",
383 xbt_str_join_array(action," "));
384 MPI_Request request = get_reqq_self()->back();
385 get_reqq_self()->pop_back();
387 if (request==nullptr){
388 /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just return.*/
392 int rank = request->comm != MPI_COMM_NULL ? smpi_comm_rank(request->comm) : -1;
394 MPI_Group group = smpi_comm_group(request->comm);
395 int src_traced = smpi_group_rank(group, request->src);
396 int dst_traced = smpi_group_rank(group, request->dst);
// Read the direction before waiting, while `request` is still known non-null.
397 int is_wait_for_receive = request->recv;
398 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
399 extra->type = TRACING_WAIT;
400 TRACE_smpi_ptp_in(rank, src_traced, dst_traced, __FUNCTION__, extra);
402 smpi_mpi_wait(&request, &status);
404 TRACE_smpi_ptp_out(rank, src_traced, dst_traced, __FUNCTION__);
405 if (is_wait_for_receive)
406 TRACE_smpi_recv(rank, src_traced, dst_traced, 0);
407 log_timed_action (action, clock);
// Handler for the "waitAll" trace action (no arguments).
// When this process has queued requests, records source and destination of
// every receive request, waits on the whole vector with smpi_mpi_waitall and
// then emits TRACE_smpi_recv for each entry whose recorded source differs from
// the sentinel -100.
// status, recvs_snd and recvs_rcv are variable-length arrays, which is a
// compiler extension in C++.
// NOTE(review): the declaration and increment of i and the code storing the
// -100 sentinel are not visible here.
410 static void action_waitall(const char *const *action){
411 CHECK_ACTION_PARAMS(action, 0, 0)
412 double clock = smpi_process_simulated_elapsed();
413 unsigned int count_requests=get_reqq_self()->size();
415 if (count_requests>0) {
416 MPI_Status status[count_requests];
418 int rank_traced = smpi_process_index();
419 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
420 extra->type = TRACING_WAITALL;
421 extra->send_size=count_requests;
422 TRACE_smpi_ptp_in(rank_traced, -1, -1, __FUNCTION__,extra);
423 int recvs_snd[count_requests];
424 int recvs_rcv[count_requests];
426 for (auto req : *(get_reqq_self())){
427 if (req && req->recv){
428 recvs_snd[i]=req->src;
429 recvs_rcv[i]=req->dst;
// The vector's contiguous storage is passed directly as the request array.
434 smpi_mpi_waitall(count_requests, &(*get_reqq_self())[0], status);
436 for (i=0; i<count_requests;i++){
437 if (recvs_snd[i]!=-100)
438 TRACE_smpi_recv(rank_traced, recvs_snd[i], recvs_rcv[i],0);
440 TRACE_smpi_ptp_out(rank_traced, -1, -1, __FUNCTION__);
442 log_timed_action (action, clock);
// Handler for the "barrier" trace action: runs the selected barrier algorithm
// on MPI_COMM_WORLD between TRACE_smpi_collective_in/out and logs its
// simulated duration. No argument check is performed on the trace line.
445 static void action_barrier(const char *const *action){
446 double clock = smpi_process_simulated_elapsed();
447 int rank = smpi_process_index();
448 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
449 extra->type = TRACING_BARRIER;
450 TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
452 mpi_coll_barrier_fun(MPI_COMM_WORLD);
454 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
455 log_timed_action (action, clock);
// Handler for the "bcast" trace action (one mandatory, two optional arguments).
// action[2]: element count; action[3]: optional root; action[4]: optional
// datatype id. The declaration of `root` and the guards around the optional
// reads are not visible here.
// NOTE(review): root_traced is computed with smpi_group_index here while
// action_reduce uses smpi_group_rank for the same purpose -- confirm which is
// intended.
458 static void action_bcast(const char *const *action)
460 CHECK_ACTION_PARAMS(action, 1, 2)
461 double size = parse_double(action[2]);
462 double clock = smpi_process_simulated_elapsed();
464 /* Initialize MPI_CURRENT_TYPE in order to decrease the number of the checks */
465 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
468 root= atoi(action[3]);
470 MPI_CURRENT_TYPE=decode_datatype(action[4]);
473 int rank = smpi_process_index();
474 int root_traced = smpi_group_index(smpi_comm_group(MPI_COMM_WORLD), root);
476 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
477 extra->type = TRACING_BCAST;
478 extra->send_size = size;
479 extra->root = root_traced;
480 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
481 TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__, extra);
// Scratch buffer sized as element count times datatype size.
482 void *sendbuf = smpi_get_tmp_sendbuffer(size* smpi_datatype_size(MPI_CURRENT_TYPE));
484 mpi_coll_bcast_fun(sendbuf, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
486 TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
487 log_timed_action (action, clock);
// Handler for the "reduce" trace action (two mandatory, two optional arguments).
// action[2]: communication size in elements; action[3]: computation size in
// flops; action[4]: optional root; action[5]: optional datatype id. The
// declaration of `root` and the guards around the optional reads are not
// visible here.
// Runs the reduce collective with MPI_OP_NULL and then simulates comp_size
// flops of computation.
490 static void action_reduce(const char *const *action)
492 CHECK_ACTION_PARAMS(action, 2, 2)
493 double comm_size = parse_double(action[2]);
494 double comp_size = parse_double(action[3]);
495 double clock = smpi_process_simulated_elapsed();
497 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
500 root= atoi(action[4]);
502 MPI_CURRENT_TYPE=decode_datatype(action[5]);
505 int rank = smpi_process_index();
506 int root_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), root);
507 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
508 extra->type = TRACING_REDUCE;
509 extra->send_size = comm_size;
510 extra->comp_size = comp_size;
511 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
512 extra->root = root_traced;
514 TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__,extra);
// NOTE(review): both buffers come from smpi_get_tmp_sendbuffer, whereas other
// collectives in this file take recvbuf from smpi_get_tmp_recvbuffer -- confirm
// whether sharing one buffer is intended here.
516 void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
517 void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
518 mpi_coll_reduce_fun(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
519 smpi_execute_flops(comp_size);
521 TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
522 log_timed_action (action, clock);
// Handler for the "allReduce" trace action (two mandatory, one optional
// argument).
// action[2]: communication size in elements; action[3]: computation size in
// flops; action[4]: optional datatype id (guard not visible here).
// Runs the allreduce collective with MPI_OP_NULL on MPI_COMM_WORLD and then
// simulates comp_size flops of computation.
525 static void action_allReduce(const char *const *action) {
526 CHECK_ACTION_PARAMS(action, 2, 1)
527 double comm_size = parse_double(action[2]);
528 double comp_size = parse_double(action[3]);
531 MPI_CURRENT_TYPE=decode_datatype(action[4]);
533 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
535 double clock = smpi_process_simulated_elapsed();
536 int rank = smpi_process_index();
537 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
538 extra->type = TRACING_ALLREDUCE;
539 extra->send_size = comm_size;
540 extra->comp_size = comp_size;
541 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
542 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
// NOTE(review): both buffers come from smpi_get_tmp_sendbuffer, whereas other
// collectives in this file take recvbuf from smpi_get_tmp_recvbuffer -- confirm
// whether sharing one buffer is intended here.
544 void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
545 void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
546 mpi_coll_allreduce_fun(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
547 smpi_execute_flops(comp_size);
549 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
550 log_timed_action (action, clock);
// Handler for the "allToAll" trace action.
// action[2]: per-peer send count; action[3]: per-peer receive count;
// action[4], action[5]: optional send and receive datatype ids, honoured only
// when both are present. Otherwise both types are MPI_DEFAULT_TYPE.
// Each scratch buffer is sized as count * communicator size * datatype size.
553 static void action_allToAll(const char *const *action) {
554 CHECK_ACTION_PARAMS(action, 2, 2) //two mandatory (send and recv volumes) and two optional (corresponding datatypes)
555 double clock = smpi_process_simulated_elapsed();
556 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
557 int send_size = parse_double(action[2]);
558 int recv_size = parse_double(action[3]);
559 MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
561 if(action[4] && action[5]) {
562 MPI_CURRENT_TYPE=decode_datatype(action[4]);
563 MPI_CURRENT_TYPE2=decode_datatype(action[5]);
566 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
568 void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
569 void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE2));
571 int rank = smpi_process_index();
572 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
573 extra->type = TRACING_ALLTOALL;
574 extra->send_size = send_size;
575 extra->recv_size = recv_size;
576 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
577 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
579 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
581 mpi_coll_alltoall_fun(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
583 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
584 log_timed_action (action, clock);
// Handler for the "gather" trace action (two mandatory, three optional
// arguments).
// action[2]: send count; action[3]: receive count; action[4]: optional root;
// action[5], action[6]: optional send and receive datatype ids.
// The datatype ids are honoured only when root and both ids are present;
// otherwise both types are MPI_DEFAULT_TYPE. The guard therefore has to cover
// action[6] as well: the array is nullptr-terminated, and handing a missing
// action[6] to decode_datatype would pass nullptr to atoi.
// The declaration of `root` and the guards around the root read and the
// receive-buffer allocation are not visible here.
587 static void action_gather(const char *const *action) {
588 /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
591 1) 68 is the sendcounts
592 2) 68 is the recvcounts
593 3) 0 is the root node
594 4) 0 is the send datatype id, see decode_datatype()
595 5) 0 is the recv datatype id, see decode_datatype()
597 CHECK_ACTION_PARAMS(action, 2, 3)
598 double clock = smpi_process_simulated_elapsed();
599 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
600 int send_size = parse_double(action[2]);
601 int recv_size = parse_double(action[3]);
602 MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
// Short-circuit order matters: each entry is tested before the next is read.
603 if(action[4] && action[5] && action[6]) {
604 MPI_CURRENT_TYPE=decode_datatype(action[5]);
605 MPI_CURRENT_TYPE2=decode_datatype(action[6]);
607 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
609 void *send = smpi_get_tmp_sendbuffer(send_size* smpi_datatype_size(MPI_CURRENT_TYPE));
610 void *recv = nullptr;
613 root=atoi(action[4]);
614 int rank = smpi_comm_rank(MPI_COMM_WORLD);
// Receive buffer holds recv_size elements from each of the comm_size ranks.
617 recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE2));
619 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
620 extra->type = TRACING_GATHER;
621 extra->send_size = send_size;
622 extra->recv_size = recv_size;
624 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
625 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
627 TRACE_smpi_collective_in(smpi_process_index(), root, __FUNCTION__, extra);
629 mpi_coll_gather_fun(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
631 TRACE_smpi_collective_out(smpi_process_index(), -1, __FUNCTION__);
632 log_timed_action (action, clock);
// Handler for the "gatherV" trace action.
// action[2]: send count; action[3 .. 2+comm_size]: receive counts, one per
// rank; action[3+comm_size]: root; action[4+comm_size], action[5+comm_size]:
// optional send and receive datatype ids, honoured only when both are present.
// The receive buffer is sized from the sum of the receive counts; the
// declaration of recv_sum, the filling of disps and the guard around the
// receive-buffer allocation are not visible here.
// NOTE(review): the root is read from action[3+comm_size] unconditionally
// although CHECK_ACTION_PARAMS declares only comm_size+1 mandatory arguments
// -- confirm that traces always carry the root.
635 static void action_gatherv(const char *const *action) {
636 /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
637 0 gatherV 68 68 10 10 10 0 0 0
639 1) 68 is the sendcount
640 2) 68 10 10 10 is the recvcounts
641 3) 0 is the root node
642 4) 0 is the send datatype id, see decode_datatype()
643 5) 0 is the recv datatype id, see decode_datatype()
645 double clock = smpi_process_simulated_elapsed();
646 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
647 CHECK_ACTION_PARAMS(action, comm_size+1, 2)
648 int send_size = parse_double(action[2]);
// disps and recvcounts are variable-length arrays (compiler extension in C++).
649 int disps[comm_size];
650 int recvcounts[comm_size];
653 MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
654 if(action[4+comm_size] && action[5+comm_size]) {
655 MPI_CURRENT_TYPE=decode_datatype(action[4+comm_size]);
656 MPI_CURRENT_TYPE2=decode_datatype(action[5+comm_size]);
658 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
660 void *send = smpi_get_tmp_sendbuffer(send_size* smpi_datatype_size(MPI_CURRENT_TYPE));
661 void *recv = nullptr;
662 for(int i=0;i<comm_size;i++) {
663 recvcounts[i] = atoi(action[i+3]);
664 recv_sum=recv_sum+recvcounts[i];
668 int root=atoi(action[3+comm_size]);
669 int rank = smpi_comm_rank(MPI_COMM_WORLD);
672 recv = smpi_get_tmp_recvbuffer(recv_sum* smpi_datatype_size(MPI_CURRENT_TYPE2));
674 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
675 extra->type = TRACING_GATHERV;
676 extra->send_size = send_size;
677 extra->recvcounts= xbt_new(int,comm_size);
678 for(int i=0; i< comm_size; i++)//copy data to avoid bad free
679 extra->recvcounts[i] = recvcounts[i];
681 extra->num_processes = comm_size;
682 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
683 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
685 TRACE_smpi_collective_in(smpi_process_index(), root, __FUNCTION__, extra);
687 smpi_mpi_gatherv(send, send_size, MPI_CURRENT_TYPE, recv, recvcounts, disps, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
689 TRACE_smpi_collective_out(smpi_process_index(), -1, __FUNCTION__);
690 log_timed_action (action, clock);
// Handler for the "reduceScatter" trace action.
// action[2 .. 1+comm_size]: receive counts, one per rank;
// action[2+comm_size]: computation size in flops; action[3+comm_size]:
// optional datatype id.
// Runs the reduce_scatter collective with MPI_OP_NULL and then simulates
// comp_size flops of computation. `size`, used for both scratch buffers, is
// declared and accumulated on lines not visible here (presumably the sum of
// the receive counts -- TODO confirm).
693 static void action_reducescatter(const char *const *action) {
694 /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
695 0 reduceScatter 275427 275427 275427 204020 11346849 0
697 1) The first four values after the name of the action declare the recvcounts array
698 2) The value 11346849 is the amount of instructions
699 3) The last value corresponds to the datatype, see decode_datatype().
701 double clock = smpi_process_simulated_elapsed();
702 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
703 CHECK_ACTION_PARAMS(action, comm_size+1, 1)
704 int comp_size = parse_double(action[2+comm_size]);
// recvcounts is a variable-length array (compiler extension in C++).
705 int recvcounts[comm_size];
706 int rank = smpi_process_index();
708 if(action[3+comm_size])
709 MPI_CURRENT_TYPE=decode_datatype(action[3+comm_size]);
711 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
713 for(int i=0;i<comm_size;i++) {
714 recvcounts[i] = atoi(action[i+2]);
718 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
719 extra->type = TRACING_REDUCE_SCATTER;
720 extra->send_size = 0;
721 extra->recvcounts= xbt_new(int, comm_size);
722 for(int i=0; i< comm_size; i++)//copy data to avoid bad free
723 extra->recvcounts[i] = recvcounts[i];
724 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
725 extra->comp_size = comp_size;
726 extra->num_processes = comm_size;
728 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
730 void *sendbuf = smpi_get_tmp_sendbuffer(size* smpi_datatype_size(MPI_CURRENT_TYPE));
731 void *recvbuf = smpi_get_tmp_recvbuffer(size* smpi_datatype_size(MPI_CURRENT_TYPE));
733 mpi_coll_reduce_scatter_fun(sendbuf, recvbuf, recvcounts, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
734 smpi_execute_flops(comp_size);
736 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
737 log_timed_action (action, clock);
// Handler for the "allGather" trace action (two mandatory, two optional
// arguments).
// action[2]: send count; action[3]: receive count; action[4], action[5]:
// optional send and receive datatype ids, honoured only when both are present.
// NOTE(review): the receive buffer is sized as recvcount * datatype size,
// whereas action_gather and action_allToAll multiply by the communicator size
// as well -- confirm whether that factor is missing here.
740 static void action_allgather(const char *const *action) {
741 /* The structure of the allgather action for the rank 0 (total 4 processes) is the following:
742 0 allGather 275427 275427
744 1) 275427 is the sendcount
745 2) 275427 is the recvcount
746 3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().
748 double clock = smpi_process_simulated_elapsed();
750 CHECK_ACTION_PARAMS(action, 2, 2)
751 int sendcount=atoi(action[2]);
752 int recvcount=atoi(action[3]);
754 MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
756 if(action[4] && action[5]) {
757 MPI_CURRENT_TYPE = decode_datatype(action[4]);
758 MPI_CURRENT_TYPE2 = decode_datatype(action[5]);
760 MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
762 void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* smpi_datatype_size(MPI_CURRENT_TYPE));
763 void *recvbuf = smpi_get_tmp_recvbuffer(recvcount* smpi_datatype_size(MPI_CURRENT_TYPE2));
765 int rank = smpi_process_index();
766 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
767 extra->type = TRACING_ALLGATHER;
768 extra->send_size = sendcount;
769 extra->recv_size= recvcount;
770 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
771 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
772 extra->num_processes = smpi_comm_size(MPI_COMM_WORLD);
774 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
776 mpi_coll_allgather_fun(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcount, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
778 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
779 log_timed_action (action, clock);
// Handler for the "allGatherV" trace action.
// action[2]: send count; action[3 .. 2+comm_size]: receive counts, one per
// rank; action[3+comm_size], action[4+comm_size]: optional send and receive
// datatype ids, honoured only when both are present.
// The receive buffer is sized from the sum of the receive counts; the
// declaration of recv_sum, the filling of disps and the last argument line of
// the collective call are not visible here.
782 static void action_allgatherv(const char *const *action) {
783 /* The structure of the allgatherv action for the rank 0 (total 4 processes) is the following:
784 0 allGatherV 275427 275427 275427 275427 204020
786 1) 275427 is the sendcount
787 2) The next four elements declare the recvcounts array
788 3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().
790 double clock = smpi_process_simulated_elapsed();
792 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
793 CHECK_ACTION_PARAMS(action, comm_size+1, 2)
794 int sendcount=atoi(action[2]);
// recvcounts and disps are variable-length arrays (compiler extension in C++).
795 int recvcounts[comm_size];
796 int disps[comm_size];
798 MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
800 if(action[3+comm_size] && action[4+comm_size]) {
801 MPI_CURRENT_TYPE = decode_datatype(action[3+comm_size]);
802 MPI_CURRENT_TYPE2 = decode_datatype(action[4+comm_size]);
804 MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
806 void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* smpi_datatype_size(MPI_CURRENT_TYPE));
808 for(int i=0;i<comm_size;i++) {
809 recvcounts[i] = atoi(action[i+3]);
810 recv_sum=recv_sum+recvcounts[i];
813 void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* smpi_datatype_size(MPI_CURRENT_TYPE2));
815 int rank = smpi_process_index();
816 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
817 extra->type = TRACING_ALLGATHERV;
818 extra->send_size = sendcount;
819 extra->recvcounts= xbt_new(int, comm_size);
820 for(int i=0; i< comm_size; i++)//copy data to avoid bad free
821 extra->recvcounts[i] = recvcounts[i];
822 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
823 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
824 extra->num_processes = comm_size;
826 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
828 mpi_coll_allgatherv_fun(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts, disps, MPI_CURRENT_TYPE2,
831 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
832 log_timed_action (action, clock);
// Handler for the "allToAllV" trace action.
// action[2]: send buffer size in elements; action[3 .. 2+comm_size]: send
// counts; action[3+comm_size]: receive buffer size in elements;
// action[4+comm_size .. 3+2*comm_size]: receive counts;
// action[4+2*comm_size], action[5+2*comm_size]: optional send and receive
// datatype ids, honoured only when both are present.
// The receive side of the collective uses MPI_CURRENT_TYPE2, the type the
// receive buffer is sized with and the one reported as datatype2 in the trace;
// the send side uses MPI_CURRENT_TYPE.
// The filling of senddisps and recvdisps is not visible here.
835 static void action_allToAllv(const char *const *action) {
836 /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
837 0 allToAllV 100 1 7 10 12 100 1 70 10 5
839 1) 100 is the size of the send buffer *sizeof(int),
840 2) 1 7 10 12 is the sendcounts array
841 3) 100*sizeof(int) is the size of the receiver buffer
842 4) 1 70 10 5 is the recvcounts array
844 double clock = smpi_process_simulated_elapsed();
846 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
847 CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
// The four arrays below are variable-length arrays (compiler extension in C++).
848 int sendcounts[comm_size];
849 int recvcounts[comm_size];
850 int senddisps[comm_size];
851 int recvdisps[comm_size];
853 MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
855 int send_buf_size=parse_double(action[2]);
856 int recv_buf_size=parse_double(action[3+comm_size]);
857 if(action[4+2*comm_size] && action[5+2*comm_size]) {
858 MPI_CURRENT_TYPE=decode_datatype(action[4+2*comm_size]);
859 MPI_CURRENT_TYPE2=decode_datatype(action[5+2*comm_size]);
862 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
864 void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* smpi_datatype_size(MPI_CURRENT_TYPE));
865 void *recvbuf = smpi_get_tmp_recvbuffer(recv_buf_size* smpi_datatype_size(MPI_CURRENT_TYPE2));
867 for(int i=0;i<comm_size;i++) {
868 sendcounts[i] = atoi(action[i+3]);
869 recvcounts[i] = atoi(action[i+4+comm_size]);
874 int rank = smpi_process_index();
875 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
876 extra->type = TRACING_ALLTOALLV;
877 extra->recvcounts= xbt_new(int, comm_size);
878 extra->sendcounts= xbt_new(int, comm_size);
879 extra->num_processes = comm_size;
// send_size and recv_size start at zero (xbt_new0) and accumulate the totals.
881 for(int i=0; i< comm_size; i++){//copy data to avoid bad free
882 extra->send_size += sendcounts[i];
883 extra->sendcounts[i] = sendcounts[i];
884 extra->recv_size += recvcounts[i];
885 extra->recvcounts[i] = recvcounts[i];
887 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
888 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
890 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
892 mpi_coll_alltoallv_fun(sendbuf, sendcounts, senddisps, MPI_CURRENT_TYPE,recvbuf, recvcounts, recvdisps,
893 MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
895 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
896 log_timed_action (action, clock);
// Entry point of trace replay for one simulated process.
// argc, argv: forwarded to smpi_process_init and to the replay runner;
//             (*argv)[2] is parsed as an optional start delay expressed in
//             flops (the test for its presence is not visible here).
// Steps: initialise the process and tracing, register the action handlers,
// apply the optional delay, run the replay, wait for requests still pending,
// and finalise the process. The last process to finish also logs the total
// simulated time and frees the shared scratch buffers.
899 void smpi_replay_run(int *argc, char***argv){
900 /* First initializes everything */
901 smpi_process_init(argc, argv);
902 smpi_process_mark_as_initialized();
903 smpi_process_set_replaying(true);
905 int rank = smpi_process_index();
906 TRACE_smpi_init(rank);
907 TRACE_smpi_computing_init(rank);
908 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
909 extra->type = TRACING_INIT;
// NOTE(review): `operation` is allocated by bprintf here and again before
// finalisation; the matching frees are not visible in this listing -- confirm.
910 char *operation =bprintf("%s_init",__FUNCTION__);
911 TRACE_smpi_collective_in(rank, -1, operation, extra);
912 TRACE_smpi_collective_out(rank, -1, operation);
// Handlers are registered only when _xbt_replay_action_init() returns 0.
915 if (_xbt_replay_action_init()==0) {
916 xbt_replay_action_register("init", action_init);
917 xbt_replay_action_register("finalize", action_finalize);
918 xbt_replay_action_register("comm_size", action_comm_size);
919 xbt_replay_action_register("comm_split", action_comm_split);
920 xbt_replay_action_register("comm_dup", action_comm_dup);
921 xbt_replay_action_register("send", action_send);
922 xbt_replay_action_register("Isend", action_Isend);
923 xbt_replay_action_register("recv", action_recv);
924 xbt_replay_action_register("Irecv", action_Irecv);
925 xbt_replay_action_register("test", action_test);
926 xbt_replay_action_register("wait", action_wait);
927 xbt_replay_action_register("waitAll", action_waitall);
928 xbt_replay_action_register("barrier", action_barrier);
929 xbt_replay_action_register("bcast", action_bcast);
930 xbt_replay_action_register("reduce", action_reduce);
931 xbt_replay_action_register("allReduce", action_allReduce);
932 xbt_replay_action_register("allToAll", action_allToAll);
933 xbt_replay_action_register("allToAllV", action_allToAllv);
934 xbt_replay_action_register("gather", action_gather);
935 xbt_replay_action_register("gatherV", action_gatherv);
936 xbt_replay_action_register("allGather", action_allgather);
937 xbt_replay_action_register("allGatherV", action_allgatherv);
938 xbt_replay_action_register("reduceScatter", action_reducescatter);
939 xbt_replay_action_register("compute", action_compute);
942 //if we have a delayed start, sleep here.
945 double value = strtod((*argv)[2], &endptr);
947 THROWF(unknown_error, 0, "%s is not a double", (*argv)[2]);
948 XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
949 smpi_execute_flops(value);
951 //UGLY: force a context switch to be sure that all MSG_processes begin initialization
952 XBT_DEBUG("Force context switch by smpi_execute_flops - Sleeping for 0.0 flops ");
953 smpi_execute_flops(0.0);
956 /* Actually run the replay */
957 xbt_replay_action_runner(*argc, *argv);
959 /* and now, finalize everything */
960 /* One active process will stop. Decrease the counter*/
961 XBT_DEBUG("There are %zu elements in reqq[*]", get_reqq_self()->size());
// Requests still queued at the end of the trace are completed with a waitall.
// requests and status are variable-length arrays (compiler extension in C++);
// the loop body copying the requests is not visible here.
962 if (!get_reqq_self()->empty()){
963 unsigned int count_requests=get_reqq_self()->size();
964 MPI_Request requests[count_requests];
965 MPI_Status status[count_requests];
968 for (auto req: *get_reqq_self()){
972 smpi_mpi_waitall(count_requests, requests, status);
// The decrement of active_processes announced above is not visible here.
976 if(active_processes==0){
977 /* Last process alive speaking: end the simulated timer */
978 XBT_INFO("Simulation time %f", smpi_process_simulated_elapsed());
979 _xbt_replay_action_exit();
980 xbt_free(sendbuffer);
981 xbt_free(recvbuffer);
984 instr_extra_data extra_fin = xbt_new0(s_instr_extra_data_t,1);
985 extra_fin->type = TRACING_FINALIZE;
986 operation =bprintf("%s_finalize",__FUNCTION__);
987 TRACE_smpi_collective_in(rank, -1, operation, extra_fin);
989 smpi_process_finalize();
991 TRACE_smpi_collective_out(rank, -1, operation);
992 TRACE_smpi_finalize(smpi_process_index());
993 smpi_process_destroy();