1 /* Copyright (c) 2009-2017. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
7 #include "xbt/replay.hpp"
8 #include <unordered_map>
11 #define KEY_SIZE (sizeof(int) * 2 + 1)
13 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
15 int communicator_size = 0;
16 static int active_processes = 0;
17 std::unordered_map<int,std::vector<MPI_Request>*> reqq;
19 MPI_Datatype MPI_DEFAULT_TYPE;
20 MPI_Datatype MPI_CURRENT_TYPE;
22 static int sendbuffer_size=0;
23 char* sendbuffer=nullptr;
24 static int recvbuffer_size=0;
25 char* recvbuffer=nullptr;
27 static void log_timed_action (const char *const *action, double clock){
28 if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
29 char *name = xbt_str_join_array(action, " ");
30 XBT_VERB("%s %f", name, smpi_process()->simulated_elapsed()-clock);
35 static std::vector<MPI_Request>* get_reqq_self()
37 return reqq.at(smpi_process()->index());
40 static void set_reqq_self(std::vector<MPI_Request> *mpi_request)
42 reqq.insert({smpi_process()->index(), mpi_request});
45 //allocate a single buffer for all sends, growing it if needed
46 void* smpi_get_tmp_sendbuffer(int size)
48 if (!smpi_process()->replaying())
49 return xbt_malloc(size);
50 if (sendbuffer_size<size){
51 sendbuffer=static_cast<char*>(xbt_realloc(sendbuffer,size));
57 //allocate a single buffer for all recv
58 void* smpi_get_tmp_recvbuffer(int size){
59 if (!smpi_process()->replaying())
60 return xbt_malloc(size);
61 if (recvbuffer_size<size){
62 recvbuffer=static_cast<char*>(xbt_realloc(recvbuffer,size));
68 void smpi_free_tmp_buffer(void* buf){
69 if (!smpi_process()->replaying())
74 static double parse_double(const char *string)
77 double value = strtod(string, &endptr);
79 THROWF(unknown_error, 0, "%s is not a double", string);
83 static MPI_Datatype decode_datatype(const char *const action)
85 switch(atoi(action)) {
87 MPI_CURRENT_TYPE=MPI_DOUBLE;
90 MPI_CURRENT_TYPE=MPI_INT;
93 MPI_CURRENT_TYPE=MPI_CHAR;
96 MPI_CURRENT_TYPE=MPI_SHORT;
99 MPI_CURRENT_TYPE=MPI_LONG;
102 MPI_CURRENT_TYPE=MPI_FLOAT;
105 MPI_CURRENT_TYPE=MPI_BYTE;
108 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
110 return MPI_CURRENT_TYPE;
113 const char* encode_datatype(MPI_Datatype datatype, int* known)
115 //default type for output is set to MPI_BYTE
116 // MPI_DEFAULT_TYPE is not set for output, use directly MPI_BYTE
119 if (datatype==MPI_BYTE)
121 if(datatype==MPI_DOUBLE)
123 if(datatype==MPI_INT)
125 if(datatype==MPI_CHAR)
127 if(datatype==MPI_SHORT)
129 if(datatype==MPI_LONG)
131 if(datatype==MPI_FLOAT)
133 //tell that the datatype is not handled by replay, and that its size should be measured and replayed as size*MPI_BYTE
136 // default - not implemented.
137 // do not warn here as we pass in this function even for other trace formats
141 #define CHECK_ACTION_PARAMS(action, mandatory, optional) {\
143 while(action[i]!=nullptr)\
146 THROWF(arg_error, 0, "%s replay failed.\n" \
147 "%d items were given on the line. First two should be process_id and action. " \
148 "This action needs after them %d mandatory arguments, and accepts %d optional ones. \n" \
149 "Please contact the Simgrid team if support is needed", __FUNCTION__, i, mandatory, optional);\
155 static void action_init(const char *const *action)
157 XBT_DEBUG("Initialize the counters");
158 CHECK_ACTION_PARAMS(action, 0, 1)
160 MPI_DEFAULT_TYPE=MPI_DOUBLE; // default MPE dataype
161 else MPI_DEFAULT_TYPE= MPI_BYTE; // default TAU datatype
163 /* start a simulated timer */
164 smpi_process()->simulated_start();
165 /*initialize the number of active processes */
166 active_processes = smpi_process_count();
168 set_reqq_self(new std::vector<MPI_Request>);
/* "finalize": nothing is replayed here; process cleanup is done at the end of
 * smpi_replay_run(). */
static void action_finalize(const char *const * /*action*/)
{
}
176 static void action_comm_size(const char *const *action)
178 communicator_size = parse_double(action[2]);
179 log_timed_action (action, smpi_process()->simulated_elapsed());
182 static void action_comm_split(const char *const *action)
184 log_timed_action (action, smpi_process()->simulated_elapsed());
187 static void action_comm_dup(const char *const *action)
189 log_timed_action (action, smpi_process()->simulated_elapsed());
192 static void action_compute(const char *const *action)
194 CHECK_ACTION_PARAMS(action, 1, 0)
195 double clock = smpi_process()->simulated_elapsed();
196 double flops= parse_double(action[2]);
197 int rank = smpi_process()->index();
198 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
199 extra->type=TRACING_COMPUTING;
200 extra->comp_size=flops;
201 TRACE_smpi_computing_in(rank, extra);
203 smpi_execute_flops(flops);
205 TRACE_smpi_computing_out(rank);
206 log_timed_action (action, clock);
209 static void action_send(const char *const *action)
211 CHECK_ACTION_PARAMS(action, 2, 1)
212 int to = atoi(action[2]);
213 double size=parse_double(action[3]);
214 double clock = smpi_process()->simulated_elapsed();
217 MPI_CURRENT_TYPE=decode_datatype(action[4]);
219 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
221 int rank = smpi_process()->index();
223 int dst_traced = MPI_COMM_WORLD->group()->rank(to);
224 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
225 extra->type = TRACING_SEND;
226 extra->send_size = size;
228 extra->dst = dst_traced;
229 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
230 TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
231 if (!TRACE_smpi_view_internals())
232 TRACE_smpi_send(rank, rank, dst_traced, 0, size*MPI_CURRENT_TYPE->size());
234 Request::send(nullptr, size, MPI_CURRENT_TYPE, to , 0, MPI_COMM_WORLD);
236 log_timed_action (action, clock);
238 TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
241 static void action_Isend(const char *const *action)
243 CHECK_ACTION_PARAMS(action, 2, 1)
244 int to = atoi(action[2]);
245 double size=parse_double(action[3]);
246 double clock = smpi_process()->simulated_elapsed();
249 MPI_CURRENT_TYPE=decode_datatype(action[4]);
251 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
253 int rank = smpi_process()->index();
254 int dst_traced = MPI_COMM_WORLD->group()->rank(to);
255 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
256 extra->type = TRACING_ISEND;
257 extra->send_size = size;
259 extra->dst = dst_traced;
260 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
261 TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
262 if (!TRACE_smpi_view_internals())
263 TRACE_smpi_send(rank, rank, dst_traced, 0, size*MPI_CURRENT_TYPE->size());
265 MPI_Request request = Request::isend(nullptr, size, MPI_CURRENT_TYPE, to, 0,MPI_COMM_WORLD);
267 TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
269 get_reqq_self()->push_back(request);
271 log_timed_action (action, clock);
274 static void action_recv(const char *const *action) {
275 CHECK_ACTION_PARAMS(action, 2, 1)
276 int from = atoi(action[2]);
277 double size=parse_double(action[3]);
278 double clock = smpi_process()->simulated_elapsed();
282 MPI_CURRENT_TYPE=decode_datatype(action[4]);
284 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
286 int rank = smpi_process()->index();
287 int src_traced = MPI_COMM_WORLD->group()->rank(from);
289 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
290 extra->type = TRACING_RECV;
291 extra->send_size = size;
292 extra->src = src_traced;
294 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
295 TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
297 //unknown size from the receiver point of view
299 Request::probe(from, 0, MPI_COMM_WORLD, &status);
303 Request::recv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD, &status);
305 TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
306 if (!TRACE_smpi_view_internals()) {
307 TRACE_smpi_recv(rank, src_traced, rank, 0);
310 log_timed_action (action, clock);
313 static void action_Irecv(const char *const *action)
315 CHECK_ACTION_PARAMS(action, 2, 1)
316 int from = atoi(action[2]);
317 double size=parse_double(action[3]);
318 double clock = smpi_process()->simulated_elapsed();
321 MPI_CURRENT_TYPE=decode_datatype(action[4]);
323 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
325 int rank = smpi_process()->index();
326 int src_traced = MPI_COMM_WORLD->group()->rank(from);
327 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
328 extra->type = TRACING_IRECV;
329 extra->send_size = size;
330 extra->src = src_traced;
332 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
333 TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
335 //unknow size from the receiver pov
337 Request::probe(from, 0, MPI_COMM_WORLD, &status);
341 MPI_Request request = Request::irecv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD);
343 TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
344 get_reqq_self()->push_back(request);
346 log_timed_action (action, clock);
349 static void action_test(const char *const *action){
350 CHECK_ACTION_PARAMS(action, 0, 0)
351 double clock = smpi_process()->simulated_elapsed();
354 MPI_Request request = get_reqq_self()->back();
355 get_reqq_self()->pop_back();
356 //if request is null here, this may mean that a previous test has succeeded
357 //Different times in traced application and replayed version may lead to this
358 //In this case, ignore the extra calls.
359 if(request!=nullptr){
360 int rank = smpi_process()->index();
361 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
362 extra->type=TRACING_TEST;
363 TRACE_smpi_testing_in(rank, extra);
365 int flag = Request::test(&request, &status);
367 XBT_DEBUG("MPI_Test result: %d", flag);
368 /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now nullptr.*/
369 get_reqq_self()->push_back(request);
371 TRACE_smpi_testing_out(rank);
373 log_timed_action (action, clock);
376 static void action_wait(const char *const *action){
377 CHECK_ACTION_PARAMS(action, 0, 0)
378 double clock = smpi_process()->simulated_elapsed();
381 xbt_assert(get_reqq_self()->size(), "action wait not preceded by any irecv or isend: %s",
382 xbt_str_join_array(action," "));
383 MPI_Request request = get_reqq_self()->back();
384 get_reqq_self()->pop_back();
386 if (request==nullptr){
387 /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just return.*/
391 int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
393 MPI_Group group = request->comm()->group();
394 int src_traced = group->rank(request->src());
395 int dst_traced = group->rank(request->dst());
396 int is_wait_for_receive = (request->flags() & RECV);
397 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
398 extra->type = TRACING_WAIT;
399 TRACE_smpi_ptp_in(rank, src_traced, dst_traced, __FUNCTION__, extra);
401 Request::wait(&request, &status);
403 TRACE_smpi_ptp_out(rank, src_traced, dst_traced, __FUNCTION__);
404 if (is_wait_for_receive)
405 TRACE_smpi_recv(rank, src_traced, dst_traced, 0);
406 log_timed_action (action, clock);
409 static void action_waitall(const char *const *action){
410 CHECK_ACTION_PARAMS(action, 0, 0)
411 double clock = smpi_process()->simulated_elapsed();
412 unsigned int count_requests=get_reqq_self()->size();
414 if (count_requests>0) {
415 MPI_Status status[count_requests];
417 int rank_traced = smpi_process()->index();
418 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
419 extra->type = TRACING_WAITALL;
420 extra->send_size=count_requests;
421 TRACE_smpi_ptp_in(rank_traced, -1, -1, __FUNCTION__,extra);
422 int recvs_snd[count_requests];
423 int recvs_rcv[count_requests];
425 for (auto req : *(get_reqq_self())){
426 if (req && (req->flags () & RECV)){
427 recvs_snd[i]=req->src();
428 recvs_rcv[i]=req->dst();
433 Request::waitall(count_requests, &(*get_reqq_self())[0], status);
435 for (i=0; i<count_requests;i++){
436 if (recvs_snd[i]!=-100)
437 TRACE_smpi_recv(rank_traced, recvs_snd[i], recvs_rcv[i],0);
439 TRACE_smpi_ptp_out(rank_traced, -1, -1, __FUNCTION__);
441 log_timed_action (action, clock);
444 static void action_barrier(const char *const *action){
445 double clock = smpi_process()->simulated_elapsed();
446 int rank = smpi_process()->index();
447 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
448 extra->type = TRACING_BARRIER;
449 TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
451 Colls::barrier(MPI_COMM_WORLD);
453 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
454 log_timed_action (action, clock);
457 static void action_bcast(const char *const *action)
459 CHECK_ACTION_PARAMS(action, 1, 2)
460 double size = parse_double(action[2]);
461 double clock = smpi_process()->simulated_elapsed();
463 /* Initialize MPI_CURRENT_TYPE in order to decrease the number of the checks */
464 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
467 root= atoi(action[3]);
469 MPI_CURRENT_TYPE=decode_datatype(action[4]);
472 int rank = smpi_process()->index();
473 int root_traced = MPI_COMM_WORLD->group()->index(root);
475 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
476 extra->type = TRACING_BCAST;
477 extra->send_size = size;
478 extra->root = root_traced;
479 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
480 TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__, extra);
481 void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
483 Colls::bcast(sendbuf, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
485 TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
486 log_timed_action (action, clock);
489 static void action_reduce(const char *const *action)
491 CHECK_ACTION_PARAMS(action, 2, 2)
492 double comm_size = parse_double(action[2]);
493 double comp_size = parse_double(action[3]);
494 double clock = smpi_process()->simulated_elapsed();
496 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
499 root= atoi(action[4]);
501 MPI_CURRENT_TYPE=decode_datatype(action[5]);
504 int rank = smpi_process()->index();
505 int root_traced = MPI_COMM_WORLD->group()->rank(root);
506 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
507 extra->type = TRACING_REDUCE;
508 extra->send_size = comm_size;
509 extra->comp_size = comp_size;
510 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
511 extra->root = root_traced;
513 TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__,extra);
515 void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
516 void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
517 Colls::reduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
518 smpi_execute_flops(comp_size);
520 TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
521 log_timed_action (action, clock);
524 static void action_allReduce(const char *const *action) {
525 CHECK_ACTION_PARAMS(action, 2, 1)
526 double comm_size = parse_double(action[2]);
527 double comp_size = parse_double(action[3]);
530 MPI_CURRENT_TYPE=decode_datatype(action[4]);
532 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
534 double clock = smpi_process()->simulated_elapsed();
535 int rank = smpi_process()->index();
536 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
537 extra->type = TRACING_ALLREDUCE;
538 extra->send_size = comm_size;
539 extra->comp_size = comp_size;
540 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
541 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
543 void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
544 void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
545 Colls::allreduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
546 smpi_execute_flops(comp_size);
548 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
549 log_timed_action (action, clock);
552 static void action_allToAll(const char *const *action) {
553 CHECK_ACTION_PARAMS(action, 2, 2) //two mandatory (send and recv volumes) and two optional (corresponding datatypes)
554 double clock = smpi_process()->simulated_elapsed();
555 int comm_size = MPI_COMM_WORLD->size();
556 int send_size = parse_double(action[2]);
557 int recv_size = parse_double(action[3]);
558 MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
560 if(action[4] && action[5]) {
561 MPI_CURRENT_TYPE=decode_datatype(action[4]);
562 MPI_CURRENT_TYPE2=decode_datatype(action[5]);
565 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
567 void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* MPI_CURRENT_TYPE->size());
568 void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
570 int rank = smpi_process()->index();
571 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
572 extra->type = TRACING_ALLTOALL;
573 extra->send_size = send_size;
574 extra->recv_size = recv_size;
575 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
576 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
578 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
580 Colls::alltoall(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
582 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
583 log_timed_action (action, clock);
586 static void action_gather(const char *const *action) {
587 /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
590 1) 68 is the sendcounts
591 2) 68 is the recvcounts
592 3) 0 is the root node
593 4) 0 is the send datatype id, see decode_datatype()
594 5) 0 is the recv datatype id, see decode_datatype()
596 CHECK_ACTION_PARAMS(action, 2, 3)
597 double clock = smpi_process()->simulated_elapsed();
598 int comm_size = MPI_COMM_WORLD->size();
599 int send_size = parse_double(action[2]);
600 int recv_size = parse_double(action[3]);
601 MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
602 if(action[4] && action[5]) {
603 MPI_CURRENT_TYPE=decode_datatype(action[5]);
604 MPI_CURRENT_TYPE2=decode_datatype(action[6]);
606 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
608 void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
609 void *recv = nullptr;
612 root=atoi(action[4]);
613 int rank = MPI_COMM_WORLD->rank();
616 recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
618 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
619 extra->type = TRACING_GATHER;
620 extra->send_size = send_size;
621 extra->recv_size = recv_size;
623 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
624 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
626 TRACE_smpi_collective_in(smpi_process()->index(), root, __FUNCTION__, extra);
628 Colls::gather(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
630 TRACE_smpi_collective_out(smpi_process()->index(), -1, __FUNCTION__);
631 log_timed_action (action, clock);
634 static void action_gatherv(const char *const *action) {
635 /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
636 0 gather 68 68 10 10 10 0 0 0
638 1) 68 is the sendcount
639 2) 68 10 10 10 is the recvcounts
640 3) 0 is the root node
641 4) 0 is the send datatype id, see decode_datatype()
642 5) 0 is the recv datatype id, see decode_datatype()
644 double clock = smpi_process()->simulated_elapsed();
645 int comm_size = MPI_COMM_WORLD->size();
646 CHECK_ACTION_PARAMS(action, comm_size+1, 2)
647 int send_size = parse_double(action[2]);
648 int disps[comm_size];
649 int recvcounts[comm_size];
652 MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
653 if(action[4+comm_size] && action[5+comm_size]) {
654 MPI_CURRENT_TYPE=decode_datatype(action[4+comm_size]);
655 MPI_CURRENT_TYPE2=decode_datatype(action[5+comm_size]);
657 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
659 void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
660 void *recv = nullptr;
661 for(int i=0;i<comm_size;i++) {
662 recvcounts[i] = atoi(action[i+3]);
663 recv_sum=recv_sum+recvcounts[i];
667 int root=atoi(action[3+comm_size]);
668 int rank = MPI_COMM_WORLD->rank();
671 recv = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
673 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
674 extra->type = TRACING_GATHERV;
675 extra->send_size = send_size;
676 extra->recvcounts= xbt_new(int,comm_size);
677 for(int i=0; i< comm_size; i++)//copy data to avoid bad free
678 extra->recvcounts[i] = recvcounts[i];
680 extra->num_processes = comm_size;
681 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
682 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
684 TRACE_smpi_collective_in(smpi_process()->index(), root, __FUNCTION__, extra);
686 Colls::gatherv(send, send_size, MPI_CURRENT_TYPE, recv, recvcounts, disps, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
688 TRACE_smpi_collective_out(smpi_process()->index(), -1, __FUNCTION__);
689 log_timed_action (action, clock);
692 static void action_reducescatter(const char *const *action) {
693 /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
694 0 reduceScatter 275427 275427 275427 204020 11346849 0
696 1) The first four values after the name of the action declare the recvcounts array
697 2) The value 11346849 is the amount of instructions
698 3) The last value corresponds to the datatype, see decode_datatype().
700 double clock = smpi_process()->simulated_elapsed();
701 int comm_size = MPI_COMM_WORLD->size();
702 CHECK_ACTION_PARAMS(action, comm_size+1, 1)
703 int comp_size = parse_double(action[2+comm_size]);
704 int recvcounts[comm_size];
705 int rank = smpi_process()->index();
707 if(action[3+comm_size])
708 MPI_CURRENT_TYPE=decode_datatype(action[3+comm_size]);
710 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
712 for(int i=0;i<comm_size;i++) {
713 recvcounts[i] = atoi(action[i+2]);
717 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
718 extra->type = TRACING_REDUCE_SCATTER;
719 extra->send_size = 0;
720 extra->recvcounts= xbt_new(int, comm_size);
721 for(int i=0; i< comm_size; i++)//copy data to avoid bad free
722 extra->recvcounts[i] = recvcounts[i];
723 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
724 extra->comp_size = comp_size;
725 extra->num_processes = comm_size;
727 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
729 void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
730 void *recvbuf = smpi_get_tmp_recvbuffer(size* MPI_CURRENT_TYPE->size());
732 Colls::reduce_scatter(sendbuf, recvbuf, recvcounts, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
733 smpi_execute_flops(comp_size);
735 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
736 log_timed_action (action, clock);
739 static void action_allgather(const char *const *action) {
740 /* The structure of the allgather action for the rank 0 (total 4 processes) is the following:
741 0 allGather 275427 275427
743 1) 275427 is the sendcount
744 2) 275427 is the recvcount
745 3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().
747 double clock = smpi_process()->simulated_elapsed();
749 CHECK_ACTION_PARAMS(action, 2, 2)
750 int sendcount=atoi(action[2]);
751 int recvcount=atoi(action[3]);
753 MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
755 if(action[4] && action[5]) {
756 MPI_CURRENT_TYPE = decode_datatype(action[4]);
757 MPI_CURRENT_TYPE2 = decode_datatype(action[5]);
759 MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
761 void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
762 void *recvbuf = smpi_get_tmp_recvbuffer(recvcount* MPI_CURRENT_TYPE2->size());
764 int rank = smpi_process()->index();
765 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
766 extra->type = TRACING_ALLGATHER;
767 extra->send_size = sendcount;
768 extra->recv_size= recvcount;
769 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
770 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
771 extra->num_processes = MPI_COMM_WORLD->size();
773 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
775 Colls::allgather(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcount, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
777 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
778 log_timed_action (action, clock);
781 static void action_allgatherv(const char *const *action) {
782 /* The structure of the allgatherv action for the rank 0 (total 4 processes) is the following:
783 0 allGatherV 275427 275427 275427 275427 204020
785 1) 275427 is the sendcount
786 2) The next four elements declare the recvcounts array
787 3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().
789 double clock = smpi_process()->simulated_elapsed();
791 int comm_size = MPI_COMM_WORLD->size();
792 CHECK_ACTION_PARAMS(action, comm_size+1, 2)
793 int sendcount=atoi(action[2]);
794 int recvcounts[comm_size];
795 int disps[comm_size];
797 MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
799 if(action[3+comm_size] && action[4+comm_size]) {
800 MPI_CURRENT_TYPE = decode_datatype(action[3+comm_size]);
801 MPI_CURRENT_TYPE2 = decode_datatype(action[4+comm_size]);
803 MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
805 void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
807 for(int i=0;i<comm_size;i++) {
808 recvcounts[i] = atoi(action[i+3]);
809 recv_sum=recv_sum+recvcounts[i];
812 void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
814 int rank = smpi_process()->index();
815 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
816 extra->type = TRACING_ALLGATHERV;
817 extra->send_size = sendcount;
818 extra->recvcounts= xbt_new(int, comm_size);
819 for(int i=0; i< comm_size; i++)//copy data to avoid bad free
820 extra->recvcounts[i] = recvcounts[i];
821 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
822 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
823 extra->num_processes = comm_size;
825 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
827 Colls::allgatherv(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts, disps, MPI_CURRENT_TYPE2,
830 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
831 log_timed_action (action, clock);
834 static void action_allToAllv(const char *const *action) {
835 /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
836 0 allToAllV 100 1 7 10 12 100 1 70 10 5
838 1) 100 is the size of the send buffer *sizeof(int),
839 2) 1 7 10 12 is the sendcounts array
840 3) 100*sizeof(int) is the size of the receiver buffer
841 4) 1 70 10 5 is the recvcounts array
843 double clock = smpi_process()->simulated_elapsed();
845 int comm_size = MPI_COMM_WORLD->size();
846 CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
847 int sendcounts[comm_size];
848 int recvcounts[comm_size];
849 int senddisps[comm_size];
850 int recvdisps[comm_size];
852 MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
854 int send_buf_size=parse_double(action[2]);
855 int recv_buf_size=parse_double(action[3+comm_size]);
856 if(action[4+2*comm_size] && action[5+2*comm_size]) {
857 MPI_CURRENT_TYPE=decode_datatype(action[4+2*comm_size]);
858 MPI_CURRENT_TYPE2=decode_datatype(action[5+2*comm_size]);
861 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
863 void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* MPI_CURRENT_TYPE->size());
864 void *recvbuf = smpi_get_tmp_recvbuffer(recv_buf_size* MPI_CURRENT_TYPE2->size());
866 for(int i=0;i<comm_size;i++) {
867 sendcounts[i] = atoi(action[i+3]);
868 recvcounts[i] = atoi(action[i+4+comm_size]);
873 int rank = smpi_process()->index();
874 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
875 extra->type = TRACING_ALLTOALLV;
876 extra->recvcounts= xbt_new(int, comm_size);
877 extra->sendcounts= xbt_new(int, comm_size);
878 extra->num_processes = comm_size;
880 for(int i=0; i< comm_size; i++){//copy data to avoid bad free
881 extra->send_size += sendcounts[i];
882 extra->sendcounts[i] = sendcounts[i];
883 extra->recv_size += recvcounts[i];
884 extra->recvcounts[i] = recvcounts[i];
886 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
887 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
889 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
891 Colls::alltoallv(sendbuf, sendcounts, senddisps, MPI_CURRENT_TYPE,recvbuf, recvcounts, recvdisps,
892 MPI_CURRENT_TYPE, MPI_COMM_WORLD);
894 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
895 log_timed_action (action, clock);
898 }} // namespace simgrid::smpi
900 void smpi_replay_run(int *argc, char***argv){
901 /* First initializes everything */
902 simgrid::smpi::Process::init(argc, argv);
903 smpi_process()->mark_as_initialized();
904 smpi_process()->set_replaying(true);
906 int rank = smpi_process()->index();
907 TRACE_smpi_init(rank);
908 TRACE_smpi_computing_init(rank);
909 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
910 extra->type = TRACING_INIT;
911 char *operation =bprintf("%s_init",__FUNCTION__);
912 TRACE_smpi_collective_in(rank, -1, operation, extra);
913 TRACE_smpi_collective_out(rank, -1, operation);
915 xbt_replay_action_register("init", simgrid::smpi::action_init);
916 xbt_replay_action_register("finalize", simgrid::smpi::action_finalize);
917 xbt_replay_action_register("comm_size", simgrid::smpi::action_comm_size);
918 xbt_replay_action_register("comm_split", simgrid::smpi::action_comm_split);
919 xbt_replay_action_register("comm_dup", simgrid::smpi::action_comm_dup);
920 xbt_replay_action_register("send", simgrid::smpi::action_send);
921 xbt_replay_action_register("Isend", simgrid::smpi::action_Isend);
922 xbt_replay_action_register("recv", simgrid::smpi::action_recv);
923 xbt_replay_action_register("Irecv", simgrid::smpi::action_Irecv);
924 xbt_replay_action_register("test", simgrid::smpi::action_test);
925 xbt_replay_action_register("wait", simgrid::smpi::action_wait);
926 xbt_replay_action_register("waitAll", simgrid::smpi::action_waitall);
927 xbt_replay_action_register("barrier", simgrid::smpi::action_barrier);
928 xbt_replay_action_register("bcast", simgrid::smpi::action_bcast);
929 xbt_replay_action_register("reduce", simgrid::smpi::action_reduce);
930 xbt_replay_action_register("allReduce", simgrid::smpi::action_allReduce);
931 xbt_replay_action_register("allToAll", simgrid::smpi::action_allToAll);
932 xbt_replay_action_register("allToAllV", simgrid::smpi::action_allToAllv);
933 xbt_replay_action_register("gather", simgrid::smpi::action_gather);
934 xbt_replay_action_register("gatherV", simgrid::smpi::action_gatherv);
935 xbt_replay_action_register("allGather", simgrid::smpi::action_allgather);
936 xbt_replay_action_register("allGatherV", simgrid::smpi::action_allgatherv);
937 xbt_replay_action_register("reduceScatter", simgrid::smpi::action_reducescatter);
938 xbt_replay_action_register("compute", simgrid::smpi::action_compute);
940 //if we have a delayed start, sleep here.
943 double value = strtod((*argv)[2], &endptr);
945 THROWF(unknown_error, 0, "%s is not a double", (*argv)[2]);
946 XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
947 smpi_execute_flops(value);
949 //UGLY: force a context switch to be sure that all MSG_processes begin initialization
950 XBT_DEBUG("Force context switch by smpi_execute_flops - Sleeping for 0.0 flops ");
951 smpi_execute_flops(0.0);
954 /* Actually run the replay */
955 simgrid::xbt::replay_runner(*argc, *argv);
957 /* and now, finalize everything */
958 /* One active process will stop. Decrease the counter*/
959 XBT_DEBUG("There are %zu elements in reqq[*]", get_reqq_self()->size());
960 if (!get_reqq_self()->empty()){
961 unsigned int count_requests=get_reqq_self()->size();
962 MPI_Request requests[count_requests];
963 MPI_Status status[count_requests];
966 for (auto req: *get_reqq_self()){
970 simgrid::smpi::Request::waitall(count_requests, requests, status);
972 delete get_reqq_self();
975 if(active_processes==0){
976 /* Last process alive speaking: end the simulated timer */
977 XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
978 xbt_free(sendbuffer);
979 xbt_free(recvbuffer);
982 instr_extra_data extra_fin = xbt_new0(s_instr_extra_data_t,1);
983 extra_fin->type = TRACING_FINALIZE;
984 operation =bprintf("%s_finalize",__FUNCTION__);
985 TRACE_smpi_collective_in(rank, -1, operation, extra_fin);
987 smpi_process()->finalize();
989 TRACE_smpi_collective_out(rank, -1, operation);
990 TRACE_smpi_finalize(smpi_process()->index());
991 smpi_process()->destroy();