1 /* Copyright (c) 2009-2017. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
7 #include "xbt/replay.hpp"
8 #include <unordered_map>
// Length constant computed as two characters per byte of an int, plus one.
// NOTE(review): no user of KEY_SIZE appears among the replay handlers below — confirm it is still needed.
11 #define KEY_SIZE (sizeof(int) * 2 + 1)
// Logging sub-category "smpi_replay" (child of "smpi") used by the XBT_* log calls in this file.
13 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
// Value parsed from the argument of the "comm_size" action (see action_comm_size()).
15 int communicator_size = 0;
// Set to smpi_process_count() by action_init(); compared with 0 at the end of smpi_replay_run().
16 static int active_processes = 0;
// One list of pending requests per process, keyed by smpi_process()->index().
// The lists are created in action_init() and deleted in smpi_replay_run().
17 std::unordered_map<int,std::vector<MPI_Request>*> reqq;
// Datatype used when a trace line carries no datatype id; chosen by action_init().
19 MPI_Datatype MPI_DEFAULT_TYPE;
// Datatype of the action being replayed; also written by decode_datatype().
20 MPI_Datatype MPI_CURRENT_TYPE;
// Scratch buffers (and their recorded sizes) handed out by smpi_get_tmp_sendbuffer() and
// smpi_get_tmp_recvbuffer(); both are released with xbt_free() in smpi_replay_run().
22 static int sendbuffer_size=0;
23 char* sendbuffer=nullptr;
24 static int recvbuffer_size=0;
25 char* recvbuffer=nullptr;
// Logs, at verbose level, the action line (its words joined with single spaces) followed by the
// simulated time elapsed since `clock`, i.e. simulated_elapsed() - clock.
//   action: nullptr-terminated array of the words of the trace line
//   clock:  value of simulated_elapsed() taken when the action started
27 static void log_timed_action (const char *const *action, double clock){
// The joined string is only built when verbose logging is enabled for this category.
28 if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
29 char *name = xbt_str_join_array(action, " ");
30 XBT_VERB("%s %f", name, smpi_process()->simulated_elapsed()-clock);
35 static std::vector<MPI_Request>* get_reqq_self()
37 return reqq.at(smpi_process()->index());
40 static void set_reqq_self(std::vector<MPI_Request> *mpi_request)
42 reqq.insert({smpi_process()->index(), mpi_request});
45 //allocate a single buffer for all sends, growing it if needed
// Provides `size` bytes of scratch memory for outgoing data.
// Outside replay mode every call returns a fresh xbt_malloc() block.
// In replay mode the file-level `sendbuffer` is reallocated when it is smaller than `size`;
// presumably that shared buffer is then returned — confirm against the end of the function.
46 void* smpi_get_tmp_sendbuffer(int size)
48 if (!smpi_process()->replaying())
49 return xbt_malloc(size);
// Grow only when the request exceeds the recorded size of the shared buffer.
50 if (sendbuffer_size<size){
51 sendbuffer=static_cast<char*>(xbt_realloc(sendbuffer,size));
57 //allocate a single buffer for all recv
// Receive-side counterpart of smpi_get_tmp_sendbuffer(): outside replay mode every call returns a
// fresh xbt_malloc() block; in replay mode the file-level `recvbuffer` is reallocated when it is
// smaller than `size` (presumably that shared buffer is then returned — confirm).
58 void* smpi_get_tmp_recvbuffer(int size){
59 if (!smpi_process()->replaying())
60 return xbt_malloc(size);
// Grow only when the request exceeds the recorded size of the shared buffer.
61 if (recvbuffer_size<size){
62 recvbuffer=static_cast<char*>(xbt_realloc(recvbuffer,size));
// Counterpart of the smpi_get_tmp_*buffer() functions. It only acts when the process is not
// replaying, which is the case where those functions return individually malloc'ed blocks;
// presumably `buf` is freed in that case — confirm.
68 void smpi_free_tmp_buffer(void* buf){
69 if (!smpi_process()->replaying())
// Converts `string` to a double with strtod().
// Raises unknown_error ("<string> is not a double") through THROWF when the conversion is rejected;
// the rejection test relies on `endptr` (presumably "trailing characters remain" — confirm).
74 static double parse_double(const char *string)
77 double value = strtod(string, &endptr);
79 THROWF(unknown_error, 0, "%s is not a double", string);
// Maps the numeric datatype id found in a trace line (parsed with atoi()) to an MPI datatype.
// Side effect: the result is stored in the global MPI_CURRENT_TYPE before being returned, so
// decoding a second id replaces the value produced by an earlier call.
// NOTE(review): the id-to-type numbering lives in the case labels — confirm it matches encode_datatype().
83 static MPI_Datatype decode_datatype(const char *const action)
85 switch(atoi(action)) {
87 MPI_CURRENT_TYPE=MPI_DOUBLE;
90 MPI_CURRENT_TYPE=MPI_INT;
93 MPI_CURRENT_TYPE=MPI_CHAR;
96 MPI_CURRENT_TYPE=MPI_SHORT;
99 MPI_CURRENT_TYPE=MPI_LONG;
102 MPI_CURRENT_TYPE=MPI_FLOAT;
105 MPI_CURRENT_TYPE=MPI_BYTE;
// Fallback: the default datatype selected by action_init().
108 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
111 return MPI_CURRENT_TYPE;
// Returns the textual id written in traces for `datatype`; the inverse direction of decode_datatype().
// Recognised types: MPI_BYTE, MPI_DOUBLE, MPI_INT, MPI_CHAR, MPI_SHORT, MPI_LONG and MPI_FLOAT.
//   known: callers in this file pass nullptr; presumably an optional out-flag telling whether the
//          datatype is handled by replay — confirm how it is written.
114 const char* encode_datatype(MPI_Datatype datatype, int* known)
116 //default type for output is set to MPI_BYTE
117 // MPI_DEFAULT_TYPE is not set for output, use directly MPI_BYTE
120 if (datatype==MPI_BYTE)
122 if(datatype==MPI_DOUBLE)
124 if(datatype==MPI_INT)
126 if(datatype==MPI_CHAR)
128 if(datatype==MPI_SHORT)
130 if(datatype==MPI_LONG)
132 if(datatype==MPI_FLOAT)
134 //tell that the datatype is not handled by replay, and that its size should be measured and replayed as size*MPI_BYTE
137 // default - not implemented.
138 // do not warn here as we pass in this function even for other trace formats
// Argument-count check used at the top of the action handlers.
// It walks the nullptr-terminated `action` array to count its items and raises arg_error through
// THROWF, naming the calling function and the expected `mandatory` / `optional` counts.
// The first two items of a line (process id and action name) are not counted as arguments.
// NOTE(review): presumably the error is raised when fewer than mandatory+2 items are present — confirm.
142 #define CHECK_ACTION_PARAMS(action, mandatory, optional) {\
144 while(action[i]!=nullptr)\
147 THROWF(arg_error, 0, "%s replay failed.\n" \
148 "%d items were given on the line. First two should be process_id and action. " \
149 "This action needs after them %d mandatory arguments, and accepts %d optional ones. \n" \
150 "Please contact the Simgrid team if support is needed", __FUNCTION__, i, mandatory, optional);\
// Handler of the "init" action (accepts one optional argument).
// Selects MPI_DEFAULT_TYPE (MPI_DOUBLE or MPI_BYTE), starts the simulated timer, records the number
// of SMPI processes and registers an empty request list for the calling process.
156 static void action_init(const char *const *action)
158 XBT_DEBUG("Initialize the counters");
159 CHECK_ACTION_PARAMS(action, 0, 1)
// NOTE(review): presumably the optional argument selects the MPE default — confirm the condition.
161 MPI_DEFAULT_TYPE=MPI_DOUBLE; // default MPE datatype
162 else MPI_DEFAULT_TYPE= MPI_BYTE; // default TAU datatype
164 /* start a simulated timer */
165 smpi_process()->simulated_start();
166 /*initialize the number of active processes */
167 active_processes = smpi_process_count();
// The list allocated here is deleted at the end of smpi_replay_run().
169 set_reqq_self(new std::vector<MPI_Request>);
// Handler registered for the "finalize" action in smpi_replay_run().
172 static void action_finalize(const char *const *action)
177 static void action_comm_size(const char *const *action)
179 communicator_size = parse_double(action[2]);
180 log_timed_action (action, smpi_process()->simulated_elapsed());
183 static void action_comm_split(const char *const *action)
185 log_timed_action (action, smpi_process()->simulated_elapsed());
188 static void action_comm_dup(const char *const *action)
190 log_timed_action (action, smpi_process()->simulated_elapsed());
193 static void action_compute(const char *const *action)
195 CHECK_ACTION_PARAMS(action, 1, 0)
196 double clock = smpi_process()->simulated_elapsed();
197 double flops= parse_double(action[2]);
198 int rank = smpi_process()->index();
199 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
200 extra->type=TRACING_COMPUTING;
201 extra->comp_size=flops;
202 TRACE_smpi_computing_in(rank, extra);
204 smpi_execute_flops(flops);
206 TRACE_smpi_computing_out(rank);
207 log_timed_action (action, clock);
// Handler of the "send" action: blocking send, without payload (nullptr buffer), of action[3]
// elements to process action[2] on MPI_COMM_WORLD with tag 0.
// Two mandatory arguments (destination, size) and one optional datatype id (action[4]).
210 static void action_send(const char *const *action)
212 CHECK_ACTION_PARAMS(action, 2, 1)
213 int to = atoi(action[2]);
214 double size=parse_double(action[3]);
215 double clock = smpi_process()->simulated_elapsed();
// Datatype: decoded from the optional argument, otherwise the default one.
218 MPI_CURRENT_TYPE=decode_datatype(action[4]);
220 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
222 int rank = smpi_process()->index();
224 int dst_traced = MPI_COMM_WORLD->group()->rank(to);
225 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
226 extra->type = TRACING_SEND;
227 extra->send_size = size;
229 extra->dst = dst_traced;
230 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
231 TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
// The send event is emitted here only when the tracing of SMPI internals is disabled.
232 if (!TRACE_smpi_view_internals())
233 TRACE_smpi_send(rank, rank, dst_traced, 0, size*MPI_CURRENT_TYPE->size());
235 Request::send(nullptr, size, MPI_CURRENT_TYPE, to , 0, MPI_COMM_WORLD);
237 log_timed_action (action, clock);
239 TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
// Handler of the "Isend" action: non-blocking counterpart of action_send(). The request returned by
// Request::isend() is appended to the request list of the process, where a later "test", "wait" or
// "waitAll" action picks it up.
// Two mandatory arguments (destination, size) and one optional datatype id (action[4]).
242 static void action_Isend(const char *const *action)
244 CHECK_ACTION_PARAMS(action, 2, 1)
245 int to = atoi(action[2]);
246 double size=parse_double(action[3]);
247 double clock = smpi_process()->simulated_elapsed();
// Datatype: decoded from the optional argument, otherwise the default one.
250 MPI_CURRENT_TYPE=decode_datatype(action[4]);
252 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
254 int rank = smpi_process()->index();
255 int dst_traced = MPI_COMM_WORLD->group()->rank(to);
256 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
257 extra->type = TRACING_ISEND;
258 extra->send_size = size;
260 extra->dst = dst_traced;
261 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
262 TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
// The send event is emitted here only when the tracing of SMPI internals is disabled.
263 if (!TRACE_smpi_view_internals())
264 TRACE_smpi_send(rank, rank, dst_traced, 0, size*MPI_CURRENT_TYPE->size());
266 MPI_Request request = Request::isend(nullptr, size, MPI_CURRENT_TYPE, to, 0,MPI_COMM_WORLD);
268 TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
// Keep the request so that it can be completed by a later action.
270 get_reqq_self()->push_back(request);
272 log_timed_action (action, clock);
// Handler of the "recv" action: blocking receive, without payload (nullptr buffer), of action[3]
// elements from process action[2] on MPI_COMM_WORLD with tag 0.
// Two mandatory arguments (source, size) and one optional datatype id (action[4]).
275 static void action_recv(const char *const *action) {
276 CHECK_ACTION_PARAMS(action, 2, 1)
277 int from = atoi(action[2]);
278 double size=parse_double(action[3]);
279 double clock = smpi_process()->simulated_elapsed();
// Datatype: decoded from the optional argument, otherwise the default one.
283 MPI_CURRENT_TYPE=decode_datatype(action[4]);
285 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
287 int rank = smpi_process()->index();
288 int src_traced = MPI_COMM_WORLD->group()->rank(from);
290 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
291 extra->type = TRACING_RECV;
292 extra->send_size = size;
293 extra->src = src_traced;
295 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
296 TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
// When the trace does not give the size, the pending message is probed first.
// NOTE(review): presumably `size` is then taken from the probe status — confirm.
298 //unknown size from the receiver point of view
300 Request::probe(from, 0, MPI_COMM_WORLD, &status);
304 Request::recv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD, &status);
306 TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
// The receive event is emitted here only when the tracing of SMPI internals is disabled.
307 if (!TRACE_smpi_view_internals()) {
308 TRACE_smpi_recv(rank, src_traced, rank, 0);
311 log_timed_action (action, clock);
// Handler of the "Irecv" action: non-blocking counterpart of action_recv(). The request returned by
// Request::irecv() is appended to the request list of the process, where a later "test", "wait" or
// "waitAll" action picks it up.
// Two mandatory arguments (source, size) and one optional datatype id (action[4]).
314 static void action_Irecv(const char *const *action)
316 CHECK_ACTION_PARAMS(action, 2, 1)
317 int from = atoi(action[2]);
318 double size=parse_double(action[3]);
319 double clock = smpi_process()->simulated_elapsed();
// Datatype: decoded from the optional argument, otherwise the default one.
322 MPI_CURRENT_TYPE=decode_datatype(action[4]);
324 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
326 int rank = smpi_process()->index();
327 int src_traced = MPI_COMM_WORLD->group()->rank(from);
328 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
329 extra->type = TRACING_IRECV;
330 extra->send_size = size;
331 extra->src = src_traced;
333 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
334 TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
// When the trace does not give the size, the pending message is probed first.
// NOTE(review): the probe is a blocking call inside a non-blocking receive — confirm this is intended.
336 //unknown size from the receiver point of view
338 Request::probe(from, 0, MPI_COMM_WORLD, &status);
342 MPI_Request request = Request::irecv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD);
344 TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
// Keep the request so that it can be completed by a later action.
345 get_reqq_self()->push_back(request);
347 log_timed_action (action, clock);
// Handler of the "test" action (no argument): tests the most recently queued request of the process.
// The request is popped from the list, tested, and pushed back so that a later "wait" can find it.
350 static void action_test(const char *const *action){
351 CHECK_ACTION_PARAMS(action, 0, 0)
352 double clock = smpi_process()->simulated_elapsed();
// NOTE(review): back() is called without an emptiness check, unlike action_wait() — confirm the list
// cannot be empty here.
355 MPI_Request request = get_reqq_self()->back();
356 get_reqq_self()->pop_back();
357 //if request is null here, this may mean that a previous test has succeeded
358 //Different times in traced application and replayed version may lead to this
359 //In this case, ignore the extra calls.
360 if(request!=nullptr){
361 int rank = smpi_process()->index();
362 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
363 extra->type=TRACING_TEST;
364 TRACE_smpi_testing_in(rank, extra);
366 int flag = Request::test(&request, &status);
368 XBT_DEBUG("MPI_Test result: %d", flag);
369 /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now nullptr.*/
370 get_reqq_self()->push_back(request);
372 TRACE_smpi_testing_out(rank);
374 log_timed_action (action, clock);
// Handler of the "wait" action (no argument): waits for the most recently queued request of the
// process. Aborts through xbt_assert when the request list is empty.
377 static void action_wait(const char *const *action){
378 CHECK_ACTION_PARAMS(action, 0, 0)
379 double clock = smpi_process()->simulated_elapsed();
382 xbt_assert(get_reqq_self()->size(), "action wait not preceded by any irecv or isend: %s",
383 xbt_str_join_array(action," "));
384 MPI_Request request = get_reqq_self()->back();
385 get_reqq_self()->pop_back();
// A null entry is what action_test() leaves behind after a successful test.
387 if (request==nullptr){
388 /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just return.*/
// NOTE(review): the next line guards against MPI_COMM_NULL, but the line after it dereferences
// request->comm() unconditionally — confirm a null communicator cannot reach this point.
392 int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
394 MPI_Group group = request->comm()->group();
395 int src_traced = group->rank(request->src());
396 int dst_traced = group->rank(request->dst());
// Read the flags before waiting: `request` is handed to Request::wait() by address.
397 int is_wait_for_receive = (request->flags() & RECV);
398 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
399 extra->type = TRACING_WAIT;
400 TRACE_smpi_ptp_in(rank, src_traced, dst_traced, __FUNCTION__, extra);
402 Request::wait(&request, &status);
404 TRACE_smpi_ptp_out(rank, src_traced, dst_traced, __FUNCTION__);
// A completed receive additionally emits the receive event.
405 if (is_wait_for_receive)
406 TRACE_smpi_recv(rank, src_traced, dst_traced, 0);
407 log_timed_action (action, clock);
// Handler of the "waitAll" action (no argument): waits for every request queued by the process.
// Nothing but the logging happens when the request list is empty.
410 static void action_waitall(const char *const *action){
411 CHECK_ACTION_PARAMS(action, 0, 0)
412 double clock = smpi_process()->simulated_elapsed();
413 unsigned int count_requests=get_reqq_self()->size();
415 if (count_requests>0) {
416 MPI_Status status[count_requests];
418 int rank_traced = smpi_process()->index();
419 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
420 extra->type = TRACING_WAITALL;
421 extra->send_size=count_requests;
422 TRACE_smpi_ptp_in(rank_traced, -1, -1, __FUNCTION__,extra);
// Source and destination of each pending receive, saved before the wait so that the receive events
// can be emitted afterwards.
423 int recvs_snd[count_requests];
424 int recvs_rcv[count_requests];
426 for (auto req : *(get_reqq_self())){
427 if (req && (req->flags () & RECV)){
428 recvs_snd[i]=req->src();
429 recvs_rcv[i]=req->dst();
434 Request::waitall(count_requests, &(*get_reqq_self())[0], status);
// -100 marks the slots that do not correspond to a receive (presumably written by the loop above
// for the other requests — confirm).
436 for (i=0; i<count_requests;i++){
437 if (recvs_snd[i]!=-100)
438 TRACE_smpi_recv(rank_traced, recvs_snd[i], recvs_rcv[i],0);
440 TRACE_smpi_ptp_out(rank_traced, -1, -1, __FUNCTION__);
442 log_timed_action (action, clock);
445 static void action_barrier(const char *const *action){
446 double clock = smpi_process()->simulated_elapsed();
447 int rank = smpi_process()->index();
448 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
449 extra->type = TRACING_BARRIER;
450 TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
452 Colls::barrier(MPI_COMM_WORLD);
454 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
455 log_timed_action (action, clock);
// Handler of the "bcast" action: broadcast of action[2] elements on MPI_COMM_WORLD.
// One mandatory argument (size) and two optional ones (root in action[3], datatype id in action[4]).
458 static void action_bcast(const char *const *action)
460 CHECK_ACTION_PARAMS(action, 1, 2)
461 double size = parse_double(action[2]);
462 double clock = smpi_process()->simulated_elapsed();
464 /* Initialize MPI_CURRENT_TYPE in order to decrease the number of the checks */
465 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
// Optional arguments: each one is only meaningful when the previous one is present.
468 root= atoi(action[3]);
470 MPI_CURRENT_TYPE=decode_datatype(action[4]);
473 int rank = smpi_process()->index();
// NOTE(review): this uses group()->index(root) while action_reduce() uses group()->rank(root) for
// the same purpose — confirm which conversion is intended.
474 int root_traced = MPI_COMM_WORLD->group()->index(root);
476 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
477 extra->type = TRACING_BCAST;
478 extra->send_size = size;
479 extra->root = root_traced;
480 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
481 TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__, extra);
482 void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
484 Colls::bcast(sendbuf, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
486 TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
487 log_timed_action (action, clock);
// Handler of the "reduce" action: reduction of action[2] elements on MPI_COMM_WORLD with
// MPI_OP_NULL, followed by the execution of action[3] flops.
// Two mandatory arguments (communication size, computation size) and two optional ones
// (root in action[4], datatype id in action[5]).
490 static void action_reduce(const char *const *action)
492 CHECK_ACTION_PARAMS(action, 2, 2)
493 double comm_size = parse_double(action[2]);
494 double comp_size = parse_double(action[3]);
495 double clock = smpi_process()->simulated_elapsed();
497 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
// Optional arguments: each one is only meaningful when the previous one is present.
500 root= atoi(action[4]);
502 MPI_CURRENT_TYPE=decode_datatype(action[5]);
505 int rank = smpi_process()->index();
506 int root_traced = MPI_COMM_WORLD->group()->rank(root);
507 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
508 extra->type = TRACING_REDUCE;
509 extra->send_size = comm_size;
510 extra->comp_size = comp_size;
511 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
512 extra->root = root_traced;
514 TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__,extra);
// NOTE(review): both buffers come from smpi_get_tmp_sendbuffer(), so in replay mode they are the
// same memory — confirm that the receive side should not use smpi_get_tmp_recvbuffer().
516 void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
517 void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
518 Colls::reduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
// The cost of the reduction operator is simulated separately, as a flop amount.
519 smpi_execute_flops(comp_size);
521 TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
522 log_timed_action (action, clock);
// Handler of the "allReduce" action: all-reduce of action[2] elements on MPI_COMM_WORLD with
// MPI_OP_NULL, followed by the execution of action[3] flops.
// Two mandatory arguments (communication size, computation size) and one optional datatype id (action[4]).
525 static void action_allReduce(const char *const *action) {
526 CHECK_ACTION_PARAMS(action, 2, 1)
527 double comm_size = parse_double(action[2]);
528 double comp_size = parse_double(action[3]);
// Datatype: decoded from the optional argument, otherwise the default one.
531 MPI_CURRENT_TYPE=decode_datatype(action[4]);
533 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
535 double clock = smpi_process()->simulated_elapsed();
536 int rank = smpi_process()->index();
537 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
538 extra->type = TRACING_ALLREDUCE;
539 extra->send_size = comm_size;
540 extra->comp_size = comp_size;
541 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
542 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
// NOTE(review): both buffers come from smpi_get_tmp_sendbuffer(), so in replay mode they are the
// same memory — confirm that the receive side should not use smpi_get_tmp_recvbuffer().
544 void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
545 void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
546 Colls::allreduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
// The cost of the reduction operator is simulated separately, as a flop amount.
547 smpi_execute_flops(comp_size);
549 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
550 log_timed_action (action, clock);
553 static void action_allToAll(const char *const *action) {
554 CHECK_ACTION_PARAMS(action, 2, 2) //two mandatory (send and recv volumes) and two optional (corresponding datatypes)
555 double clock = smpi_process()->simulated_elapsed();
556 int comm_size = MPI_COMM_WORLD->size();
557 int send_size = parse_double(action[2]);
558 int recv_size = parse_double(action[3]);
559 MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
561 if(action[4] && action[5]) {
562 MPI_CURRENT_TYPE=decode_datatype(action[4]);
563 MPI_CURRENT_TYPE2=decode_datatype(action[5]);
566 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
568 void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* MPI_CURRENT_TYPE->size());
569 void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
571 int rank = smpi_process()->index();
572 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
573 extra->type = TRACING_ALLTOALL;
574 extra->send_size = send_size;
575 extra->recv_size = recv_size;
576 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
577 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
579 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
581 Colls::alltoall(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
583 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
584 log_timed_action (action, clock);
587 static void action_gather(const char *const *action) {
588 /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
591 1) 68 is the sendcounts
592 2) 68 is the recvcounts
593 3) 0 is the root node
594 4) 0 is the send datatype id, see decode_datatype()
595 5) 0 is the recv datatype id, see decode_datatype()
597 CHECK_ACTION_PARAMS(action, 2, 3)
598 double clock = smpi_process()->simulated_elapsed();
599 int comm_size = MPI_COMM_WORLD->size();
600 int send_size = parse_double(action[2]);
601 int recv_size = parse_double(action[3]);
602 MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
603 if(action[4] && action[5]) {
604 MPI_CURRENT_TYPE=decode_datatype(action[5]);
605 MPI_CURRENT_TYPE2=decode_datatype(action[6]);
607 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
609 void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
610 void *recv = nullptr;
613 root=atoi(action[4]);
614 int rank = MPI_COMM_WORLD->rank();
617 recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
619 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
620 extra->type = TRACING_GATHER;
621 extra->send_size = send_size;
622 extra->recv_size = recv_size;
624 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
625 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
627 TRACE_smpi_collective_in(smpi_process()->index(), root, __FUNCTION__, extra);
629 Colls::gather(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
631 TRACE_smpi_collective_out(smpi_process()->index(), -1, __FUNCTION__);
632 log_timed_action (action, clock);
635 static void action_gatherv(const char *const *action) {
636 /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
637 0 gather 68 68 10 10 10 0 0 0
639 1) 68 is the sendcount
640 2) 68 10 10 10 is the recvcounts
641 3) 0 is the root node
642 4) 0 is the send datatype id, see decode_datatype()
643 5) 0 is the recv datatype id, see decode_datatype()
645 double clock = smpi_process()->simulated_elapsed();
646 int comm_size = MPI_COMM_WORLD->size();
647 CHECK_ACTION_PARAMS(action, comm_size+1, 2)
648 int send_size = parse_double(action[2]);
649 int disps[comm_size];
650 int recvcounts[comm_size];
653 MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
654 if(action[4+comm_size] && action[5+comm_size]) {
655 MPI_CURRENT_TYPE=decode_datatype(action[4+comm_size]);
656 MPI_CURRENT_TYPE2=decode_datatype(action[5+comm_size]);
658 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
660 void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
661 void *recv = nullptr;
662 for(int i=0;i<comm_size;i++) {
663 recvcounts[i] = atoi(action[i+3]);
664 recv_sum=recv_sum+recvcounts[i];
668 int root=atoi(action[3+comm_size]);
669 int rank = MPI_COMM_WORLD->rank();
672 recv = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
674 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
675 extra->type = TRACING_GATHERV;
676 extra->send_size = send_size;
677 extra->recvcounts= xbt_new(int,comm_size);
678 for(int i=0; i< comm_size; i++)//copy data to avoid bad free
679 extra->recvcounts[i] = recvcounts[i];
681 extra->num_processes = comm_size;
682 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
683 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
685 TRACE_smpi_collective_in(smpi_process()->index(), root, __FUNCTION__, extra);
687 Colls::gatherv(send, send_size, MPI_CURRENT_TYPE, recv, recvcounts, disps, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
689 TRACE_smpi_collective_out(smpi_process()->index(), -1, __FUNCTION__);
690 log_timed_action (action, clock);
// Handler of the "reduceScatter" action: reduce-scatter on MPI_COMM_WORLD with MPI_OP_NULL, followed
// by the execution of the flop amount given by the trace.
// comm_size+1 mandatory arguments (the receive counts, then the computation size) and one optional
// datatype id.
693 static void action_reducescatter(const char *const *action) {
694 /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
695 0 reduceScatter 275427 275427 275427 204020 11346849 0
697 1) The first four values after the name of the action declare the recvcounts array
698 2) The value 11346849 is the amount of instructions
699 3) The last value corresponds to the datatype, see decode_datatype().
701 double clock = smpi_process()->simulated_elapsed();
702 int comm_size = MPI_COMM_WORLD->size();
703 CHECK_ACTION_PARAMS(action, comm_size+1, 1)
704 int comp_size = parse_double(action[2+comm_size]);
705 int recvcounts[comm_size];
706 int rank = smpi_process()->index();
708 if(action[3+comm_size])
709 MPI_CURRENT_TYPE=decode_datatype(action[3+comm_size]);
711 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
713 for(int i=0;i<comm_size;i++) {
714 recvcounts[i] = atoi(action[i+2]);
718 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
719 extra->type = TRACING_REDUCE_SCATTER;
720 extra->send_size = 0;
721 extra->recvcounts= xbt_new(int, comm_size);
722 for(int i=0; i< comm_size; i++)//copy data to avoid bad free
723 extra->recvcounts[i] = recvcounts[i];
724 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
725 extra->comp_size = comp_size;
726 extra->num_processes = comm_size;
728 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
// NOTE(review): `size` is presumably the sum of the receive counts accumulated in the parsing loop
// above — confirm.
730 void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
731 void *recvbuf = smpi_get_tmp_recvbuffer(size* MPI_CURRENT_TYPE->size());
733 Colls::reduce_scatter(sendbuf, recvbuf, recvcounts, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
// The cost of the reduction operator is simulated separately, as a flop amount.
734 smpi_execute_flops(comp_size);
736 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
737 log_timed_action (action, clock);
740 static void action_allgather(const char *const *action) {
741 /* The structure of the allgather action for the rank 0 (total 4 processes) is the following:
742 0 allGather 275427 275427
744 1) 275427 is the sendcount
745 2) 275427 is the recvcount
746 3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().
748 double clock = smpi_process()->simulated_elapsed();
750 CHECK_ACTION_PARAMS(action, 2, 2)
751 int sendcount=atoi(action[2]);
752 int recvcount=atoi(action[3]);
754 MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
756 if(action[4] && action[5]) {
757 MPI_CURRENT_TYPE = decode_datatype(action[4]);
758 MPI_CURRENT_TYPE2 = decode_datatype(action[5]);
760 MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
762 void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
763 void *recvbuf = smpi_get_tmp_recvbuffer(recvcount* MPI_CURRENT_TYPE2->size());
765 int rank = smpi_process()->index();
766 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
767 extra->type = TRACING_ALLGATHER;
768 extra->send_size = sendcount;
769 extra->recv_size= recvcount;
770 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
771 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
772 extra->num_processes = MPI_COMM_WORLD->size();
774 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
776 Colls::allgather(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcount, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
778 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
779 log_timed_action (action, clock);
782 static void action_allgatherv(const char *const *action) {
783 /* The structure of the allgatherv action for the rank 0 (total 4 processes) is the following:
784 0 allGatherV 275427 275427 275427 275427 204020
786 1) 275427 is the sendcount
787 2) The next four elements declare the recvcounts array
788 3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().
790 double clock = smpi_process()->simulated_elapsed();
792 int comm_size = MPI_COMM_WORLD->size();
793 CHECK_ACTION_PARAMS(action, comm_size+1, 2)
794 int sendcount=atoi(action[2]);
795 int recvcounts[comm_size];
796 int disps[comm_size];
798 MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
800 if(action[3+comm_size] && action[4+comm_size]) {
801 MPI_CURRENT_TYPE = decode_datatype(action[3+comm_size]);
802 MPI_CURRENT_TYPE2 = decode_datatype(action[4+comm_size]);
804 MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
806 void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
808 for(int i=0;i<comm_size;i++) {
809 recvcounts[i] = atoi(action[i+3]);
810 recv_sum=recv_sum+recvcounts[i];
813 void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
815 int rank = smpi_process()->index();
816 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
817 extra->type = TRACING_ALLGATHERV;
818 extra->send_size = sendcount;
819 extra->recvcounts= xbt_new(int, comm_size);
820 for(int i=0; i< comm_size; i++)//copy data to avoid bad free
821 extra->recvcounts[i] = recvcounts[i];
822 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
823 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
824 extra->num_processes = comm_size;
826 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
828 Colls::allgatherv(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts, disps, MPI_CURRENT_TYPE2,
831 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
832 log_timed_action (action, clock);
835 static void action_allToAllv(const char *const *action) {
836 /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
837 0 allToAllV 100 1 7 10 12 100 1 70 10 5
839 1) 100 is the size of the send buffer *sizeof(int),
840 2) 1 7 10 12 is the sendcounts array
841 3) 100*sizeof(int) is the size of the receiver buffer
842 4) 1 70 10 5 is the recvcounts array
844 double clock = smpi_process()->simulated_elapsed();
846 int comm_size = MPI_COMM_WORLD->size();
847 CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
848 int sendcounts[comm_size];
849 int recvcounts[comm_size];
850 int senddisps[comm_size];
851 int recvdisps[comm_size];
853 MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
855 int send_buf_size=parse_double(action[2]);
856 int recv_buf_size=parse_double(action[3+comm_size]);
857 if(action[4+2*comm_size] && action[5+2*comm_size]) {
858 MPI_CURRENT_TYPE=decode_datatype(action[4+2*comm_size]);
859 MPI_CURRENT_TYPE2=decode_datatype(action[5+2*comm_size]);
862 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
864 void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* MPI_CURRENT_TYPE->size());
865 void *recvbuf = smpi_get_tmp_recvbuffer(recv_buf_size* MPI_CURRENT_TYPE2->size());
867 for(int i=0;i<comm_size;i++) {
868 sendcounts[i] = atoi(action[i+3]);
869 recvcounts[i] = atoi(action[i+4+comm_size]);
874 int rank = smpi_process()->index();
875 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
876 extra->type = TRACING_ALLTOALLV;
877 extra->recvcounts= xbt_new(int, comm_size);
878 extra->sendcounts= xbt_new(int, comm_size);
879 extra->num_processes = comm_size;
881 for(int i=0; i< comm_size; i++){//copy data to avoid bad free
882 extra->send_size += sendcounts[i];
883 extra->sendcounts[i] = sendcounts[i];
884 extra->recv_size += recvcounts[i];
885 extra->recvcounts[i] = recvcounts[i];
887 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
888 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
890 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
892 Colls::alltoallv(sendbuf, sendcounts, senddisps, MPI_CURRENT_TYPE,recvbuf, recvcounts, recvdisps,
893 MPI_CURRENT_TYPE, MPI_COMM_WORLD);
895 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
896 log_timed_action (action, clock);
899 }} // namespace simgrid::smpi
// Entry point of the trace replay for one SMPI process.
// It initializes the process in replay mode, registers one handler per trace action, optionally
// executes a start delay, runs the replay, completes the requests that are still pending, and
// finalizes the process.
//   argc, argv: command line of the process; they are forwarded to Process::init() and to the
//               replay runner. (*argv)[2] is parsed as the start delay when a delayed start is used.
901 void smpi_replay_run(int *argc, char***argv){
902 /* First initializes everything */
903 simgrid::smpi::Process::init(argc, argv);
904 smpi_process()->mark_as_initialized();
905 smpi_process()->set_replaying(true);
907 int rank = smpi_process()->index();
908 TRACE_smpi_init(rank);
909 TRACE_smpi_computing_init(rank);
// Trace the initialization as a collective named "<this function>_init".
910 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
911 extra->type = TRACING_INIT;
912 char *operation =bprintf("%s_init",__FUNCTION__);
913 TRACE_smpi_collective_in(rank, -1, operation, extra);
914 TRACE_smpi_collective_out(rank, -1, operation);
// Bind each action keyword that may appear in a trace line to its handler.
916 xbt_replay_action_register("init", simgrid::smpi::action_init);
917 xbt_replay_action_register("finalize", simgrid::smpi::action_finalize);
918 xbt_replay_action_register("comm_size", simgrid::smpi::action_comm_size);
919 xbt_replay_action_register("comm_split", simgrid::smpi::action_comm_split);
920 xbt_replay_action_register("comm_dup", simgrid::smpi::action_comm_dup);
921 xbt_replay_action_register("send", simgrid::smpi::action_send);
922 xbt_replay_action_register("Isend", simgrid::smpi::action_Isend);
923 xbt_replay_action_register("recv", simgrid::smpi::action_recv);
924 xbt_replay_action_register("Irecv", simgrid::smpi::action_Irecv);
925 xbt_replay_action_register("test", simgrid::smpi::action_test);
926 xbt_replay_action_register("wait", simgrid::smpi::action_wait);
927 xbt_replay_action_register("waitAll", simgrid::smpi::action_waitall);
928 xbt_replay_action_register("barrier", simgrid::smpi::action_barrier);
929 xbt_replay_action_register("bcast", simgrid::smpi::action_bcast);
930 xbt_replay_action_register("reduce", simgrid::smpi::action_reduce);
931 xbt_replay_action_register("allReduce", simgrid::smpi::action_allReduce);
932 xbt_replay_action_register("allToAll", simgrid::smpi::action_allToAll);
933 xbt_replay_action_register("allToAllV", simgrid::smpi::action_allToAllv);
934 xbt_replay_action_register("gather", simgrid::smpi::action_gather);
935 xbt_replay_action_register("gatherV", simgrid::smpi::action_gatherv);
936 xbt_replay_action_register("allGather", simgrid::smpi::action_allgather);
937 xbt_replay_action_register("allGatherV", simgrid::smpi::action_allgatherv);
938 xbt_replay_action_register("reduceScatter", simgrid::smpi::action_reducescatter);
939 xbt_replay_action_register("compute", simgrid::smpi::action_compute);
// The delay is expressed in flops and is executed through smpi_execute_flops().
941 //if we have a delayed start, sleep here.
944 double value = strtod((*argv)[2], &endptr);
946 THROWF(unknown_error, 0, "%s is not a double", (*argv)[2]);
947 XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
948 smpi_execute_flops(value);
950 //UGLY: force a context switch to be sure that all MSG_processes begin initialization
951 XBT_DEBUG("Force context switch by smpi_execute_flops - Sleeping for 0.0 flops ");
952 smpi_execute_flops(0.0);
955 /* Actually run the replay */
956 simgrid::xbt::replay_runner(*argc, *argv);
958 /* and now, finalize everything */
959 /* One active process will stop. Decrease the counter*/
960 XBT_DEBUG("There are %zu elements in reqq[*]", get_reqq_self()->size());
// Requests that the trace never waited for are completed here, before the list is deleted.
961 if (!get_reqq_self()->empty()){
962 unsigned int count_requests=get_reqq_self()->size();
963 MPI_Request requests[count_requests];
964 MPI_Status status[count_requests];
967 for (auto req: *get_reqq_self()){
971 simgrid::smpi::Request::waitall(count_requests, requests, status);
// Releases the list allocated in action_init().
973 delete get_reqq_self();
// The shared scratch buffers are released by the last process to finish.
976 if(active_processes==0){
977 /* Last process alive speaking: end the simulated timer */
978 XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
979 xbt_free(sendbuffer);
980 xbt_free(recvbuffer);
// Trace the finalization as a collective named "<this function>_finalize".
983 instr_extra_data extra_fin = xbt_new0(s_instr_extra_data_t,1);
984 extra_fin->type = TRACING_FINALIZE;
// NOTE(review): `operation` is reassigned here — confirm that the string built for the "_init"
// event was freed beforehand, and that this one is freed at the end of the function.
985 operation =bprintf("%s_finalize",__FUNCTION__);
986 TRACE_smpi_collective_in(rank, -1, operation, extra_fin);
988 smpi_process()->finalize();
990 TRACE_smpi_collective_out(rank, -1, operation);
991 TRACE_smpi_finalize(smpi_process()->index());