1 /* Copyright (c) 2009-2017. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
7 #include "smpi_coll.hpp"
8 #include "smpi_comm.hpp"
9 #include "smpi_datatype.hpp"
10 #include "smpi_group.hpp"
11 #include "smpi_process.hpp"
12 #include "smpi_request.hpp"
13 #include "xbt/replay.hpp"
15 #include <unordered_map>
// Size constant equal to sizeof(int) * 2 + 1.
// NOTE(review): no use of KEY_SIZE is visible in this file — confirm it is still needed.
18 #define KEY_SIZE (sizeof(int) * 2 + 1)
// Declares the "smpi_replay" log sub-category (under "smpi") used by the XBT_* logging macros in this file.
20 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
// Value read by the "comm_size" action (see action_comm_size).
22 int communicator_size = 0;
// Set to smpi_process_count() by action_init; compared against 0 in smpi_replay_main.
23 static int active_processes = 0;
// Pending requests of each replayed process, keyed by smpi_process()->index().
24 std::unordered_map<int,std::vector<MPI_Request>*> reqq;
// Datatype used when an action line does not name one (chosen in action_init).
26 MPI_Datatype MPI_DEFAULT_TYPE;
// Datatype of the action being replayed; also written by every call to decode_datatype().
27 MPI_Datatype MPI_CURRENT_TYPE;
// Scratch buffers (and their tracked sizes) handed out by smpi_get_tmp_sendbuffer() and
// smpi_get_tmp_recvbuffer() while replaying; released in smpi_replay_main.
29 static int sendbuffer_size=0;
30 char* sendbuffer=nullptr;
31 static int recvbuffer_size=0;
32 char* recvbuffer=nullptr;
// Logs, only when verbose logging is enabled for smpi_replay, the words of `action` joined by
// spaces followed by the simulated time elapsed since `clock` (simulated_elapsed() - clock).
// NOTE(review): the release of `name` is not visible in this view — confirm the joined string is freed.
34 static void log_timed_action (const char *const *action, double clock){
35 if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
36 char *name = xbt_str_join_array(action, " ");
37 XBT_VERB("%s %f", name, smpi_process()->simulated_elapsed()-clock);
// Returns the request vector registered for the calling process.
// unordered_map::at() throws std::out_of_range if set_reqq_self() was never called for this process.
42 static std::vector<MPI_Request>* get_reqq_self()
44 return reqq.at(smpi_process()->index());
// Registers `mpi_request` as the request vector of the calling process.
// insert() keeps an already existing entry, so a second call for the same process does not replace the vector.
47 static void set_reqq_self(std::vector<MPI_Request> *mpi_request)
49 reqq.insert({smpi_process()->index(), mpi_request});
52 //allocate a single buffer for all sends, growing it if needed
// Outside replay mode a fresh xbt_malloc'ed block of `size` bytes is returned instead.
53 void* smpi_get_tmp_sendbuffer(int size)
55 if (not smpi_process()->replaying())
56 return xbt_malloc(size);
// Replay mode: the shared buffer is reallocated only when the request exceeds its tracked size.
57 if (sendbuffer_size<size){
58 sendbuffer=static_cast<char*>(xbt_realloc(sendbuffer,size));
64 //allocate a single buffer for all recv
// Outside replay mode a fresh xbt_malloc'ed block of `size` bytes is returned instead.
65 void* smpi_get_tmp_recvbuffer(int size){
66 if (not smpi_process()->replaying())
67 return xbt_malloc(size);
// Replay mode: the shared buffer is reallocated only when the request exceeds its tracked size.
68 if (recvbuffer_size<size){
69 recvbuffer=static_cast<char*>(xbt_realloc(recvbuffer,size));
// Counterpart of smpi_get_tmp_sendbuffer()/smpi_get_tmp_recvbuffer() for a buffer `buf` they returned.
// Only the non-replay test is visible in this view; presumably `buf` is freed in that case
// (it was xbt_malloc'ed by the getters then) — confirm.
75 void smpi_free_tmp_buffer(void* buf){
76 if (not smpi_process()->replaying())
// Converts `string` to a double with strtod().
// Throws unknown_error with the message "<string> is not a double"; the condition guarding the
// throw is not visible in this view — presumably a check on endptr; confirm.
81 static double parse_double(const char *string)
84 double value = strtod(string, &endptr);
86 THROWF(unknown_error, 0, "%s is not a double", string);
// Maps the integer id found in `action` (converted with atoi) to an MPI datatype and returns it.
// The case labels are not visible in this view; the assignments appear in the order
// MPI_DOUBLE, MPI_INT, MPI_CHAR, MPI_SHORT, MPI_LONG, MPI_FLOAT, MPI_BYTE, then MPI_DEFAULT_TYPE
// (presumably ids 0..6 and the default case — confirm).
// NOTE(review): the result is stored in the global MPI_CURRENT_TYPE before being returned, so a
// caller doing `MPI_CURRENT_TYPE = decode_datatype(a); other = decode_datatype(b);` ends up with
// MPI_CURRENT_TYPE holding the type decoded from `b`.
90 static MPI_Datatype decode_datatype(const char *const action)
92 switch(atoi(action)) {
94 MPI_CURRENT_TYPE=MPI_DOUBLE;
97 MPI_CURRENT_TYPE=MPI_INT;
100 MPI_CURRENT_TYPE=MPI_CHAR;
103 MPI_CURRENT_TYPE=MPI_SHORT;
106 MPI_CURRENT_TYPE=MPI_LONG;
109 MPI_CURRENT_TYPE=MPI_FLOAT;
112 MPI_CURRENT_TYPE=MPI_BYTE;
115 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
118 return MPI_CURRENT_TYPE;
// Returns the trace-file string for `datatype` (the inverse direction of decode_datatype()).
// `datatype` is compared against MPI_BYTE, MPI_DOUBLE, MPI_INT, MPI_CHAR, MPI_SHORT, MPI_LONG and
// MPI_FLOAT in that order; the strings returned for each case are not visible in this view.
// `known` is an optional out-parameter (callers in this file pass nullptr); per the comments below
// it reports whether the datatype is handled by the replay — the writes to it are not visible here.
121 const char* encode_datatype(MPI_Datatype datatype, int* known)
123 //default type for output is set to MPI_BYTE
124 // MPI_DEFAULT_TYPE is not set for output, use directly MPI_BYTE
127 if (datatype==MPI_BYTE)
129 if(datatype==MPI_DOUBLE)
131 if(datatype==MPI_INT)
133 if(datatype==MPI_CHAR)
135 if(datatype==MPI_SHORT)
137 if(datatype==MPI_LONG)
139 if(datatype==MPI_FLOAT)
141 //tell that the datatype is not handled by replay, and that its size should be measured and replayed as size*MPI_BYTE
144 // default - not implemented.
145 // do not warn here as we pass in this function even for other trace formats
// Validates the number of words of a nullptr-terminated action line.
// The macro counts the entries of `action` and throws arg_error, naming the calling function
// (__FUNCTION__), when the check fails. `mandatory` and `optional` are the argument counts expected
// after the two leading words (process id and action name). The comparison itself is not visible
// in this view — presumably it only enforces the mandatory count; confirm.
149 #define CHECK_ACTION_PARAMS(action, mandatory, optional) {\
151 while(action[i]!=nullptr)\
154 THROWF(arg_error, 0, "%s replay failed.\n" \
155 "%d items were given on the line. First two should be process_id and action. " \
156 "This action needs after them %d mandatory arguments, and accepts %d optional ones. \n" \
157 "Please contact the Simgrid team if support is needed", __FUNCTION__, i, mandatory, optional);\
// Handler for the "init" action (at most one optional argument after the action name).
// Chooses the trace-wide default datatype — MPI_DOUBLE for MPE traces, MPI_BYTE for TAU traces;
// the test selecting between the two is not visible in this view — then starts the simulated
// timer, records the number of processes and registers an empty request vector for this process.
163 static void action_init(const char *const *action)
165 XBT_DEBUG("Initialize the counters");
166 CHECK_ACTION_PARAMS(action, 0, 1)
168 MPI_DEFAULT_TYPE=MPI_DOUBLE; // default MPE datatype
169 else MPI_DEFAULT_TYPE= MPI_BYTE; // default TAU datatype
171 /* start a simulated timer */
172 smpi_process()->simulated_start();
173 /*initialize the number of active processes */
174 active_processes = smpi_process_count();
// The vector allocated here is deleted at the end of smpi_replay_main.
176 set_reqq_self(new std::vector<MPI_Request>);
// Handler registered for the "finalize" action in smpi_replay_init.
// NOTE(review): the body is not visible in this view; the actual finalization work visible in this
// file happens in smpi_replay_main.
179 static void action_finalize(const char *const *action)
// Handler for the "comm_size" action: stores action[2] (parsed as a double, converted to int) in
// communicator_size.
// NOTE(review): no CHECK_ACTION_PARAMS call is visible here, so a line without a size would hand
// nullptr to parse_double() — confirm.
184 static void action_comm_size(const char *const *action)
186 communicator_size = parse_double(action[2]);
// The start clock passed is "now", so the logged duration is the time spent between the two reads.
187 log_timed_action (action, smpi_process()->simulated_elapsed());
// Handler for the "comm_split" action. The only statement visible in this view logs the action;
// no communicator operation is visible here.
190 static void action_comm_split(const char *const *action)
192 log_timed_action (action, smpi_process()->simulated_elapsed());
// Handler for the "comm_dup" action. The only statement visible in this view logs the action;
// no communicator operation is visible here.
195 static void action_comm_dup(const char *const *action)
197 log_timed_action (action, smpi_process()->simulated_elapsed());
// Handler for the "compute" action: <rank> compute <flops>.
// Simulates the execution of action[2] flops between a computing-in and a computing-out trace
// event, then logs the simulated duration.
200 static void action_compute(const char *const *action)
202 CHECK_ACTION_PARAMS(action, 1, 0)
203 double clock = smpi_process()->simulated_elapsed();
204 double flops= parse_double(action[2]);
205 int rank = smpi_process()->index();
// Tracing payload: zero-initialized, then tagged as a computation of `flops` flops.
206 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
207 extra->type=TRACING_COMPUTING;
208 extra->comp_size=flops;
209 TRACE_smpi_computing_in(rank, extra);
211 smpi_execute_flops(flops);
213 TRACE_smpi_computing_out(rank);
214 log_timed_action (action, clock);
// Handler for the "send" action: <rank> send <dst> <size> [<datatype id>].
// Performs a blocking send of `size` elements to `to` on MPI_COMM_WORLD with tag 0 and a null
// payload, traced as a point-to-point operation, then logs the simulated duration.
217 static void action_send(const char *const *action)
219 CHECK_ACTION_PARAMS(action, 2, 1)
220 int to = atoi(action[2]);
221 double size=parse_double(action[3]);
222 double clock = smpi_process()->simulated_elapsed();
// Datatype: decoded from action[4], otherwise the trace default (the selecting test is not visible here).
225 MPI_CURRENT_TYPE=decode_datatype(action[4]);
227 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
229 int rank = smpi_process()->index();
231 int dst_traced = MPI_COMM_WORLD->group()->rank(to);
232 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
233 extra->type = TRACING_SEND;
234 extra->send_size = size;
236 extra->dst = dst_traced;
237 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
238 TRACE_smpi_ptp_in(rank, __FUNCTION__, extra);
// The explicit send event (size in bytes) is emitted only when internals are not being traced.
239 if (not TRACE_smpi_view_internals())
240 TRACE_smpi_send(rank, rank, dst_traced, 0, size*MPI_CURRENT_TYPE->size());
242 Request::send(nullptr, size, MPI_CURRENT_TYPE, to , 0, MPI_COMM_WORLD);
244 log_timed_action (action, clock);
246 TRACE_smpi_ptp_out(rank);
// Handler for the "Isend" action: <rank> Isend <dst> <size> [<datatype id>].
// Starts a non-blocking send of `size` elements to `to` on MPI_COMM_WORLD with tag 0 and stores
// the resulting request in this process's request vector for a later test/wait/waitAll action.
249 static void action_Isend(const char *const *action)
251 CHECK_ACTION_PARAMS(action, 2, 1)
252 int to = atoi(action[2]);
253 double size=parse_double(action[3]);
254 double clock = smpi_process()->simulated_elapsed();
// Datatype: decoded from action[4], otherwise the trace default (the selecting test is not visible here).
257 MPI_CURRENT_TYPE=decode_datatype(action[4]);
259 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
261 int rank = smpi_process()->index();
262 int dst_traced = MPI_COMM_WORLD->group()->rank(to);
263 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
264 extra->type = TRACING_ISEND;
265 extra->send_size = size;
267 extra->dst = dst_traced;
268 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
269 TRACE_smpi_ptp_in(rank, __FUNCTION__, extra);
// The explicit send event (size in bytes) is emitted only when internals are not being traced.
270 if (not TRACE_smpi_view_internals())
271 TRACE_smpi_send(rank, rank, dst_traced, 0, size*MPI_CURRENT_TYPE->size());
273 MPI_Request request = Request::isend(nullptr, size, MPI_CURRENT_TYPE, to, 0,MPI_COMM_WORLD);
275 TRACE_smpi_ptp_out(rank);
277 get_reqq_self()->push_back(request);
279 log_timed_action (action, clock);
// Handler for the "recv" action: <rank> recv <src> <size> [<datatype id>].
// Performs a blocking receive of `size` elements from `from` on MPI_COMM_WORLD with tag 0,
// traced as a point-to-point operation, then logs the simulated duration.
282 static void action_recv(const char *const *action) {
283 CHECK_ACTION_PARAMS(action, 2, 1)
284 int from = atoi(action[2]);
285 double size=parse_double(action[3]);
286 double clock = smpi_process()->simulated_elapsed();
// Datatype: decoded from action[4], otherwise the trace default (the selecting test is not visible here).
290 MPI_CURRENT_TYPE=decode_datatype(action[4]);
292 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
294 int rank = smpi_process()->index();
295 int src_traced = MPI_COMM_WORLD->group()->rank(from);
297 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
298 extra->type = TRACING_RECV;
// The received size is recorded in the send_size field of the tracing payload.
299 extra->send_size = size;
300 extra->src = src_traced;
302 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
303 TRACE_smpi_ptp_in(rank, __FUNCTION__, extra);
305 //unknown size from the receiver point of view
// The condition guarding this probe and the use of `status` afterwards are not visible in this view.
307 Request::probe(from, 0, MPI_COMM_WORLD, &status);
311 Request::recv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD, &status);
313 TRACE_smpi_ptp_out(rank);
314 if (not TRACE_smpi_view_internals()) {
315 TRACE_smpi_recv(src_traced, rank, 0);
318 log_timed_action (action, clock);
// Handler for the "Irecv" action: <rank> Irecv <src> <size> [<datatype id>].
// Starts a non-blocking receive of `size` elements from `from` on MPI_COMM_WORLD with tag 0 and
// stores the resulting request in this process's request vector for a later test/wait/waitAll action.
321 static void action_Irecv(const char *const *action)
323 CHECK_ACTION_PARAMS(action, 2, 1)
324 int from = atoi(action[2]);
325 double size=parse_double(action[3]);
326 double clock = smpi_process()->simulated_elapsed();
// Datatype: decoded from action[4], otherwise the trace default (the selecting test is not visible here).
329 MPI_CURRENT_TYPE=decode_datatype(action[4]);
331 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
333 int rank = smpi_process()->index();
334 int src_traced = MPI_COMM_WORLD->group()->rank(from);
335 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
336 extra->type = TRACING_IRECV;
// The received size is recorded in the send_size field of the tracing payload.
337 extra->send_size = size;
338 extra->src = src_traced;
340 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
341 TRACE_smpi_ptp_in(rank, __FUNCTION__, extra);
343 //unknown size from the receiver point of view
// The condition guarding this probe and the use of `status` afterwards are not visible in this view.
345 Request::probe(from, 0, MPI_COMM_WORLD, &status);
349 MPI_Request request = Request::irecv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD);
351 TRACE_smpi_ptp_out(rank);
352 get_reqq_self()->push_back(request);
354 log_timed_action (action, clock);
// Handler for the "test" action (no argument): tests the most recently stored request of this
// process and puts it back in the request vector so that a later wait can pick it up.
// NOTE(review): no emptiness check on the request vector is visible before back()/pop_back()
// (action_wait asserts on it) — confirm a "test" line can never come first.
357 static void action_test(const char *const *action){
358 CHECK_ACTION_PARAMS(action, 0, 0)
359 double clock = smpi_process()->simulated_elapsed();
362 MPI_Request request = get_reqq_self()->back();
363 get_reqq_self()->pop_back();
364 //if request is null here, this may mean that a previous test has succeeded
365 //Different times in traced application and replayed version may lead to this
366 //In this case, ignore the extra calls.
367 if(request!=nullptr){
368 int rank = smpi_process()->index();
369 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
370 extra->type=TRACING_TEST;
371 TRACE_smpi_testing_in(rank, extra);
373 int flag = Request::test(&request, &status);
375 XBT_DEBUG("MPI_Test result: %d", flag);
376 /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now nullptr.*/
377 get_reqq_self()->push_back(request);
379 TRACE_smpi_testing_out(rank);
381 log_timed_action (action, clock);
// Handler for the "wait" action (no argument): waits for the most recently stored request of this
// process. Asserts that at least one request is pending.
384 static void action_wait(const char *const *action){
385 CHECK_ACTION_PARAMS(action, 0, 0)
386 double clock = smpi_process()->simulated_elapsed();
389 xbt_assert(get_reqq_self()->size(), "action wait not preceded by any irecv or isend: %s",
390 xbt_str_join_array(action," "));
391 MPI_Request request = get_reqq_self()->back();
392 get_reqq_self()->pop_back();
394 if (request==nullptr){
395 /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just return.*/
// NOTE(review): the rank computation tolerates a null communicator, but the next statement calls
// request->comm()->group() unconditionally — confirm the communicator can never be MPI_COMM_NULL here.
399 int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
401 MPI_Group group = request->comm()->group();
// Source, destination and direction are read before the wait, while `request` is still valid.
402 int src_traced = group->rank(request->src());
403 int dst_traced = group->rank(request->dst());
404 int is_wait_for_receive = (request->flags() & RECV);
405 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
406 extra->type = TRACING_WAIT;
407 TRACE_smpi_ptp_in(rank, __FUNCTION__, extra);
409 Request::wait(&request, &status);
411 TRACE_smpi_ptp_out(rank);
412 if (is_wait_for_receive)
413 TRACE_smpi_recv(src_traced, dst_traced, 0);
414 log_timed_action (action, clock);
// Handler for the "waitAll" action (no argument): waits for every request stored for this process
// and emits a receive trace event for each of them that was a receive.
417 static void action_waitall(const char *const *action){
418 CHECK_ACTION_PARAMS(action, 0, 0)
419 double clock = smpi_process()->simulated_elapsed();
420 const unsigned int count_requests = get_reqq_self()->size();
422 if (count_requests>0) {
423 MPI_Status status[count_requests];
425 int rank_traced = smpi_process()->index();
426 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
427 extra->type = TRACING_WAITALL;
428 extra->send_size=count_requests;
429 TRACE_smpi_ptp_in(rank_traced, __FUNCTION__,extra);
// Source/destination of each pending receive, captured before waitall() completes the requests.
430 int recvs_snd[count_requests];
431 int recvs_rcv[count_requests];
432 for (unsigned int i = 0; i < count_requests; i++) {
433 const auto& req = (*get_reqq_self())[i];
434 if (req && (req->flags () & RECV)){
435 recvs_snd[i]=req->src();
436 recvs_rcv[i]=req->dst();
440 Request::waitall(count_requests, &(*get_reqq_self())[0], status);
// -100 is the sentinel marking entries that were not receives; the assignment of that sentinel is
// not visible in this view — presumably in the else branch of the test above; confirm.
442 for (unsigned i = 0; i < count_requests; i++) {
443 if (recvs_snd[i]!=-100)
444 TRACE_smpi_recv(recvs_snd[i], recvs_rcv[i],0);
446 TRACE_smpi_ptp_out(rank_traced);
448 log_timed_action (action, clock);
// Handler for the "barrier" action: runs a barrier on MPI_COMM_WORLD, traced as a collective,
// then logs the simulated duration.
451 static void action_barrier(const char *const *action){
452 double clock = smpi_process()->simulated_elapsed();
453 int rank = smpi_process()->index();
454 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
455 extra->type = TRACING_BARRIER;
456 TRACE_smpi_collective_in(rank, __FUNCTION__, extra);
458 Colls::barrier(MPI_COMM_WORLD);
460 TRACE_smpi_collective_out(rank);
461 log_timed_action (action, clock);
// Handler for the "bcast" action: <rank> bcast <size> [<root> [<datatype id>]].
// Broadcasts `size` elements from `root` on MPI_COMM_WORLD, traced as a collective, then logs the
// simulated duration. The declaration of `root` and the tests on action[3]/action[4] are not
// visible in this view.
464 static void action_bcast(const char *const *action)
466 CHECK_ACTION_PARAMS(action, 1, 2)
467 double size = parse_double(action[2]);
468 double clock = smpi_process()->simulated_elapsed();
470 /* Initialize MPI_CURRENT_TYPE in order to decrease the number of the checks */
471 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
474 root= atoi(action[3]);
476 MPI_CURRENT_TYPE=decode_datatype(action[4]);
479 int rank = smpi_process()->index();
// NOTE(review): the traced root is obtained with group()->index() here while action_reduce uses
// group()->rank() for the same purpose — confirm which one is intended.
480 int root_traced = MPI_COMM_WORLD->group()->index(root);
482 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
483 extra->type = TRACING_BCAST;
484 extra->send_size = size;
485 extra->root = root_traced;
486 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
487 TRACE_smpi_collective_in(rank, __FUNCTION__, extra);
// Buffer size in bytes: element count times the datatype size.
488 void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
490 Colls::bcast(sendbuf, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
492 TRACE_smpi_collective_out(rank);
493 log_timed_action (action, clock);
496 static void action_reduce(const char *const *action)
498 CHECK_ACTION_PARAMS(action, 2, 2)
499 double comm_size = parse_double(action[2]);
500 double comp_size = parse_double(action[3]);
501 double clock = smpi_process()->simulated_elapsed();
503 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
506 root= atoi(action[4]);
508 MPI_CURRENT_TYPE=decode_datatype(action[5]);
511 int rank = smpi_process()->index();
512 int root_traced = MPI_COMM_WORLD->group()->rank(root);
513 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
514 extra->type = TRACING_REDUCE;
515 extra->send_size = comm_size;
516 extra->comp_size = comp_size;
517 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
518 extra->root = root_traced;
520 TRACE_smpi_collective_in(rank, __FUNCTION__,extra);
522 void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
523 void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
524 Colls::reduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
525 smpi_execute_flops(comp_size);
527 TRACE_smpi_collective_out(rank);
528 log_timed_action (action, clock);
531 static void action_allReduce(const char *const *action) {
532 CHECK_ACTION_PARAMS(action, 2, 1)
533 double comm_size = parse_double(action[2]);
534 double comp_size = parse_double(action[3]);
537 MPI_CURRENT_TYPE=decode_datatype(action[4]);
539 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
541 double clock = smpi_process()->simulated_elapsed();
542 int rank = smpi_process()->index();
543 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
544 extra->type = TRACING_ALLREDUCE;
545 extra->send_size = comm_size;
546 extra->comp_size = comp_size;
547 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
548 TRACE_smpi_collective_in(rank, __FUNCTION__,extra);
550 void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
551 void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
552 Colls::allreduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
553 smpi_execute_flops(comp_size);
555 TRACE_smpi_collective_out(rank);
556 log_timed_action (action, clock);
559 static void action_allToAll(const char *const *action) {
560 CHECK_ACTION_PARAMS(action, 2, 2) //two mandatory (send and recv volumes) and two optional (corresponding datatypes)
561 double clock = smpi_process()->simulated_elapsed();
562 int comm_size = MPI_COMM_WORLD->size();
563 int send_size = parse_double(action[2]);
564 int recv_size = parse_double(action[3]);
565 MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
567 if(action[4] && action[5]) {
568 MPI_CURRENT_TYPE=decode_datatype(action[4]);
569 MPI_CURRENT_TYPE2=decode_datatype(action[5]);
572 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
574 void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* MPI_CURRENT_TYPE->size());
575 void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
577 int rank = smpi_process()->index();
578 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
579 extra->type = TRACING_ALLTOALL;
580 extra->send_size = send_size;
581 extra->recv_size = recv_size;
582 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
583 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
585 TRACE_smpi_collective_in(rank, __FUNCTION__,extra);
587 Colls::alltoall(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
589 TRACE_smpi_collective_out(rank);
590 log_timed_action (action, clock);
593 static void action_gather(const char *const *action) {
594 /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
597 1) 68 is the sendcounts
598 2) 68 is the recvcounts
599 3) 0 is the root node
600 4) 0 is the send datatype id, see decode_datatype()
601 5) 0 is the recv datatype id, see decode_datatype()
603 CHECK_ACTION_PARAMS(action, 2, 3)
604 double clock = smpi_process()->simulated_elapsed();
605 int comm_size = MPI_COMM_WORLD->size();
606 int send_size = parse_double(action[2]);
607 int recv_size = parse_double(action[3]);
608 MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
609 if(action[4] && action[5]) {
610 MPI_CURRENT_TYPE=decode_datatype(action[5]);
611 MPI_CURRENT_TYPE2=decode_datatype(action[6]);
613 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
615 void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
616 void *recv = nullptr;
619 root=atoi(action[4]);
620 int rank = MPI_COMM_WORLD->rank();
623 recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
625 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
626 extra->type = TRACING_GATHER;
627 extra->send_size = send_size;
628 extra->recv_size = recv_size;
630 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
631 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
633 TRACE_smpi_collective_in(smpi_process()->index(), __FUNCTION__, extra);
635 Colls::gather(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
637 TRACE_smpi_collective_out(smpi_process()->index());
638 log_timed_action (action, clock);
641 static void action_gatherv(const char *const *action) {
642 /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
643 0 gather 68 68 10 10 10 0 0 0
645 1) 68 is the sendcount
646 2) 68 10 10 10 is the recvcounts
647 3) 0 is the root node
648 4) 0 is the send datatype id, see decode_datatype()
649 5) 0 is the recv datatype id, see decode_datatype()
651 double clock = smpi_process()->simulated_elapsed();
652 int comm_size = MPI_COMM_WORLD->size();
653 CHECK_ACTION_PARAMS(action, comm_size+1, 2)
654 int send_size = parse_double(action[2]);
655 int disps[comm_size];
656 int recvcounts[comm_size];
659 MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
660 if(action[4+comm_size] && action[5+comm_size]) {
661 MPI_CURRENT_TYPE=decode_datatype(action[4+comm_size]);
662 MPI_CURRENT_TYPE2=decode_datatype(action[5+comm_size]);
664 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
666 void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
667 void *recv = nullptr;
668 for(int i=0;i<comm_size;i++) {
669 recvcounts[i] = atoi(action[i+3]);
670 recv_sum=recv_sum+recvcounts[i];
674 int root=atoi(action[3+comm_size]);
675 int rank = MPI_COMM_WORLD->rank();
678 recv = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
680 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
681 extra->type = TRACING_GATHERV;
682 extra->send_size = send_size;
683 extra->recvcounts= xbt_new(int,comm_size);
684 for(int i=0; i< comm_size; i++)//copy data to avoid bad free
685 extra->recvcounts[i] = recvcounts[i];
687 extra->num_processes = comm_size;
688 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
689 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
691 TRACE_smpi_collective_in(smpi_process()->index(), __FUNCTION__, extra);
693 Colls::gatherv(send, send_size, MPI_CURRENT_TYPE, recv, recvcounts, disps, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
695 TRACE_smpi_collective_out(smpi_process()->index());
696 log_timed_action (action, clock);
// Handler for the "reduceScatter" action: runs a reduce_scatter on MPI_COMM_WORLD with the given
// per-process receive counts (no actual operator: MPI_OP_NULL), then simulates comp_size flops.
// Argument layout, as read below: action[2 .. 1+comm_size] are the recvcounts,
// action[2+comm_size] is the amount of computation, action[3+comm_size] an optional datatype id.
// The declaration and computation of `size` (used for the two buffers) are not visible in this
// view — presumably the sum of the recvcounts; confirm.
699 static void action_reducescatter(const char *const *action) {
700 /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
701 0 reduceScatter 275427 275427 275427 204020 11346849 0
703 1) The first four values after the name of the action declare the recvcounts array
704 2) The value 11346849 is the amount of instructions
705 3) The last value corresponds to the datatype, see decode_datatype().
707 double clock = smpi_process()->simulated_elapsed();
708 int comm_size = MPI_COMM_WORLD->size();
709 CHECK_ACTION_PARAMS(action, comm_size+1, 1)
710 int comp_size = parse_double(action[2+comm_size]);
711 int recvcounts[comm_size];
712 int rank = smpi_process()->index();
714 if(action[3+comm_size])
715 MPI_CURRENT_TYPE=decode_datatype(action[3+comm_size]);
717 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
719 for(int i=0;i<comm_size;i++) {
720 recvcounts[i] = atoi(action[i+2]);
724 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
725 extra->type = TRACING_REDUCE_SCATTER;
726 extra->send_size = 0;
727 extra->recvcounts= xbt_new(int, comm_size);
728 for(int i=0; i< comm_size; i++)//copy data to avoid bad free
729 extra->recvcounts[i] = recvcounts[i];
730 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
731 extra->comp_size = comp_size;
732 extra->num_processes = comm_size;
734 TRACE_smpi_collective_in(rank, __FUNCTION__,extra);
736 void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
737 void *recvbuf = smpi_get_tmp_recvbuffer(size* MPI_CURRENT_TYPE->size());
739 Colls::reduce_scatter(sendbuf, recvbuf, recvcounts, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
740 smpi_execute_flops(comp_size);
742 TRACE_smpi_collective_out(rank);
743 log_timed_action (action, clock);
746 static void action_allgather(const char *const *action) {
747 /* The structure of the allgather action for the rank 0 (total 4 processes) is the following:
748 0 allGather 275427 275427
750 1) 275427 is the sendcount
751 2) 275427 is the recvcount
752 3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().
754 double clock = smpi_process()->simulated_elapsed();
756 CHECK_ACTION_PARAMS(action, 2, 2)
757 int sendcount=atoi(action[2]);
758 int recvcount=atoi(action[3]);
760 MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
762 if(action[4] && action[5]) {
763 MPI_CURRENT_TYPE = decode_datatype(action[4]);
764 MPI_CURRENT_TYPE2 = decode_datatype(action[5]);
766 MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
768 void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
769 void *recvbuf = smpi_get_tmp_recvbuffer(recvcount* MPI_CURRENT_TYPE2->size());
771 int rank = smpi_process()->index();
772 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
773 extra->type = TRACING_ALLGATHER;
774 extra->send_size = sendcount;
775 extra->recv_size= recvcount;
776 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
777 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
778 extra->num_processes = MPI_COMM_WORLD->size();
780 TRACE_smpi_collective_in(rank, __FUNCTION__,extra);
782 Colls::allgather(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcount, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
784 TRACE_smpi_collective_out(rank);
785 log_timed_action (action, clock);
788 static void action_allgatherv(const char *const *action) {
789 /* The structure of the allgatherv action for the rank 0 (total 4 processes) is the following:
790 0 allGatherV 275427 275427 275427 275427 204020
792 1) 275427 is the sendcount
793 2) The next four elements declare the recvcounts array
794 3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().
796 double clock = smpi_process()->simulated_elapsed();
798 int comm_size = MPI_COMM_WORLD->size();
799 CHECK_ACTION_PARAMS(action, comm_size+1, 2)
800 int sendcount=atoi(action[2]);
801 int recvcounts[comm_size];
802 int disps[comm_size];
804 MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
806 if(action[3+comm_size] && action[4+comm_size]) {
807 MPI_CURRENT_TYPE = decode_datatype(action[3+comm_size]);
808 MPI_CURRENT_TYPE2 = decode_datatype(action[4+comm_size]);
810 MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
812 void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
814 for(int i=0;i<comm_size;i++) {
815 recvcounts[i] = atoi(action[i+3]);
816 recv_sum=recv_sum+recvcounts[i];
819 void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
821 int rank = smpi_process()->index();
822 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
823 extra->type = TRACING_ALLGATHERV;
824 extra->send_size = sendcount;
825 extra->recvcounts= xbt_new(int, comm_size);
826 for(int i=0; i< comm_size; i++)//copy data to avoid bad free
827 extra->recvcounts[i] = recvcounts[i];
828 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
829 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
830 extra->num_processes = comm_size;
832 TRACE_smpi_collective_in(rank, __FUNCTION__,extra);
834 Colls::allgatherv(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts, disps, MPI_CURRENT_TYPE2,
837 TRACE_smpi_collective_out(rank);
838 log_timed_action (action, clock);
841 static void action_allToAllv(const char *const *action) {
842 /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
843 0 allToAllV 100 1 7 10 12 100 1 70 10 5
845 1) 100 is the size of the send buffer *sizeof(int),
846 2) 1 7 10 12 is the sendcounts array
847 3) 100*sizeof(int) is the size of the receiver buffer
848 4) 1 70 10 5 is the recvcounts array
850 double clock = smpi_process()->simulated_elapsed();
852 int comm_size = MPI_COMM_WORLD->size();
853 CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
854 int sendcounts[comm_size];
855 int recvcounts[comm_size];
856 int senddisps[comm_size];
857 int recvdisps[comm_size];
859 MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
861 int send_buf_size=parse_double(action[2]);
862 int recv_buf_size=parse_double(action[3+comm_size]);
863 if(action[4+2*comm_size] && action[5+2*comm_size]) {
864 MPI_CURRENT_TYPE=decode_datatype(action[4+2*comm_size]);
865 MPI_CURRENT_TYPE2=decode_datatype(action[5+2*comm_size]);
868 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
870 void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* MPI_CURRENT_TYPE->size());
871 void *recvbuf = smpi_get_tmp_recvbuffer(recv_buf_size* MPI_CURRENT_TYPE2->size());
873 for(int i=0;i<comm_size;i++) {
874 sendcounts[i] = atoi(action[i+3]);
875 recvcounts[i] = atoi(action[i+4+comm_size]);
880 int rank = smpi_process()->index();
881 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
882 extra->type = TRACING_ALLTOALLV;
883 extra->recvcounts= xbt_new(int, comm_size);
884 extra->sendcounts= xbt_new(int, comm_size);
885 extra->num_processes = comm_size;
887 for(int i=0; i< comm_size; i++){//copy data to avoid bad free
888 extra->send_size += sendcounts[i];
889 extra->sendcounts[i] = sendcounts[i];
890 extra->recv_size += recvcounts[i];
891 extra->recvcounts[i] = recvcounts[i];
893 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
894 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
896 TRACE_smpi_collective_in(rank, __FUNCTION__,extra);
898 Colls::alltoallv(sendbuf, sendcounts, senddisps, MPI_CURRENT_TYPE,recvbuf, recvcounts, recvdisps,
899 MPI_CURRENT_TYPE, MPI_COMM_WORLD);
901 TRACE_smpi_collective_out(rank);
902 log_timed_action (action, clock);
905 }} // namespace simgrid::smpi
907 /** @brief Only initialize the replay, don't do it for real */
// Initializes the SMPI process in replay mode, emits the initial trace events, registers one
// handler per supported trace action name, and optionally delays the start of this instance.
908 void smpi_replay_init(int* argc, char*** argv)
910 simgrid::smpi::Process::init(argc, argv);
911 smpi_process()->mark_as_initialized();
912 smpi_process()->set_replaying(true);
914 int rank = smpi_process()->index();
915 TRACE_smpi_init(rank);
916 TRACE_smpi_computing_init(rank);
917 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
918 extra->type = TRACING_INIT;
919 TRACE_smpi_collective_in(rank, "smpi_replay_run_init", extra);
920 TRACE_smpi_collective_out(rank);
// Action names are matched as written in the trace file (note the mixed case of some of them).
921 xbt_replay_action_register("init", simgrid::smpi::action_init);
922 xbt_replay_action_register("finalize", simgrid::smpi::action_finalize);
923 xbt_replay_action_register("comm_size", simgrid::smpi::action_comm_size);
924 xbt_replay_action_register("comm_split", simgrid::smpi::action_comm_split);
925 xbt_replay_action_register("comm_dup", simgrid::smpi::action_comm_dup);
926 xbt_replay_action_register("send", simgrid::smpi::action_send);
927 xbt_replay_action_register("Isend", simgrid::smpi::action_Isend);
928 xbt_replay_action_register("recv", simgrid::smpi::action_recv);
929 xbt_replay_action_register("Irecv", simgrid::smpi::action_Irecv);
930 xbt_replay_action_register("test", simgrid::smpi::action_test);
931 xbt_replay_action_register("wait", simgrid::smpi::action_wait);
932 xbt_replay_action_register("waitAll", simgrid::smpi::action_waitall);
933 xbt_replay_action_register("barrier", simgrid::smpi::action_barrier);
934 xbt_replay_action_register("bcast", simgrid::smpi::action_bcast);
935 xbt_replay_action_register("reduce", simgrid::smpi::action_reduce);
936 xbt_replay_action_register("allReduce", simgrid::smpi::action_allReduce);
937 xbt_replay_action_register("allToAll", simgrid::smpi::action_allToAll);
938 xbt_replay_action_register("allToAllV", simgrid::smpi::action_allToAllv);
939 xbt_replay_action_register("gather", simgrid::smpi::action_gather);
940 xbt_replay_action_register("gatherV", simgrid::smpi::action_gatherv);
941 xbt_replay_action_register("allGather", simgrid::smpi::action_allgather);
942 xbt_replay_action_register("allGatherV", simgrid::smpi::action_allgatherv);
943 xbt_replay_action_register("reduceScatter", simgrid::smpi::action_reducescatter);
944 xbt_replay_action_register("compute", simgrid::smpi::action_compute);
946 //if we have a delayed start, sleep here.
// The delay is read from (*argv)[2] and simulated as that many flops; the test deciding whether a
// delay was given is not visible in this view.
948 double value = xbt_str_parse_double((*argv)[2], "%s is not a double");
949 XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
950 smpi_execute_flops(value);
952 //UGLY: force a context switch to be sure that all MSG_processes begin initialization
953 XBT_DEBUG("Force context switch by smpi_execute_flops - Sleeping for 0.0 flops ");
954 smpi_execute_flops(0.0);
958 /** @brief actually run the replay after initialization */
// Runs the trace through simgrid::xbt::replay_runner, then waits for any request still pending for
// this process, deletes its request vector and finalizes the SMPI process. When active_processes
// is 0 the total simulated time is reported and the shared scratch buffers are released.
959 void smpi_replay_main(int* argc, char*** argv)
961 simgrid::xbt::replay_runner(*argc, *argv);
963 /* and now, finalize everything */
964 /* One active process will stop. Decrease the counter*/
965 XBT_DEBUG("There are %zu elements in reqq[*]", get_reqq_self()->size());
966 if (not get_reqq_self()->empty()) {
967 unsigned int count_requests=get_reqq_self()->size();
968 MPI_Request requests[count_requests];
969 MPI_Status status[count_requests];
// The loop body is not visible in this view — presumably it copies each pending request into
// `requests`; confirm.
972 for (auto const& req : *get_reqq_self()) {
976 simgrid::smpi::Request::waitall(count_requests, requests, status);
// NOTE(review): the vector is deleted but no removal of its entry from `reqq` is visible here.
978 delete get_reqq_self();
// NOTE(review): the decrement of active_processes announced by the comment above is not visible
// in this view.
981 if(active_processes==0){
982 /* Last process alive speaking: end the simulated timer */
983 XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
984 xbt_free(sendbuffer);
985 xbt_free(recvbuffer);
988 instr_extra_data extra_fin = xbt_new0(s_instr_extra_data_t,1);
989 extra_fin->type = TRACING_FINALIZE;
990 TRACE_smpi_collective_in(smpi_process()->index(), "smpi_replay_run_finalize", extra_fin);
992 smpi_process()->finalize();
994 TRACE_smpi_collective_out(smpi_process()->index());
995 TRACE_smpi_finalize(smpi_process()->index());
998 /** @brief chain a replay initialization and a replay start */
// Convenience entry point: smpi_replay_init() followed by smpi_replay_main() with the same arguments.
999 void smpi_replay_run(int* argc, char*** argv)
1001 smpi_replay_init(argc, argv);
1002 smpi_replay_main(argc, argv);