1 /* Copyright (c) 2009-2017. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
6 #include "src/smpi/smpi_coll.hpp"
7 #include "src/smpi/smpi_comm.hpp"
8 #include "src/smpi/smpi_datatype.hpp"
9 #include "src/smpi/smpi_group.hpp"
10 #include "src/smpi/smpi_process.hpp"
11 #include "src/smpi/smpi_request.hpp"
12 #include "xbt/replay.hpp"
14 #include <unordered_map>
// Evaluates to sizeof(int)*2 + 1. NOTE(review): looks like room for an int printed in hex plus a terminator;
// no use of it is visible in this part of the file - confirm it is still needed.
17 #define KEY_SIZE (sizeof(int) * 2 + 1)
// Declares the "smpi_replay" logging sub-category (child of "smpi") used by the XBT_* log calls in this file.
19 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
// Value read from the trace by action_comm_size().
21 int communicator_size = 0;
// Set to smpi_process_count() in action_init(); smpi_replay_run() compares it to 0 before freeing the shared buffers.
22 static int active_processes = 0;
// Pending-request vectors, keyed by smpi_process()->index() (see get_reqq_self()/set_reqq_self()).
23 std::unordered_map<int,std::vector<MPI_Request>*> reqq;
// Datatype used when a trace line carries no datatype id; chosen in action_init().
25 MPI_Datatype MPI_DEFAULT_TYPE;
// Datatype of the action currently being replayed; written by decode_datatype() and by the action handlers.
26 MPI_Datatype MPI_CURRENT_TYPE;
// Capacity (in bytes) and storage of the send buffer managed by smpi_get_tmp_sendbuffer().
28 static int sendbuffer_size=0;
29 char* sendbuffer=nullptr;
// Capacity (in bytes) and storage of the receive buffer managed by smpi_get_tmp_recvbuffer().
30 static int recvbuffer_size=0;
31 char* recvbuffer=nullptr;
// Logs, at verbose level, the tokens of `action` joined by spaces followed by the simulated time
// elapsed since `clock`. The joined string is only built when verbose logging is enabled for smpi_replay.
// @param action nullptr-terminated token array of the trace line
// @param clock  value of smpi_process()->simulated_elapsed() taken when the action started
33 static void log_timed_action (const char *const *action, double clock){
34 if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
// NOTE(review): `name` is allocated by xbt_str_join_array; confirm it is released after the log call.
35 char *name = xbt_str_join_array(action, " ");
36 XBT_VERB("%s %f", name, smpi_process()->simulated_elapsed()-clock);
// Returns the request vector registered for the calling process.
// unordered_map::at() throws std::out_of_range when no vector was registered for this index.
41 static std::vector<MPI_Request>* get_reqq_self()
43 return reqq.at(smpi_process()->index());
// Registers `mpi_request` as the request vector of the calling process.
// insert() does not overwrite: if an entry already exists for this index it is left unchanged.
46 static void set_reqq_self(std::vector<MPI_Request> *mpi_request)
48 reqq.insert({smpi_process()->index(), mpi_request});
51 //allocate a single buffer for all sends, growing it if needed
// When the process is not replaying, a fresh xbt_malloc'ed block of `size` bytes is returned instead.
// When replaying, the global `sendbuffer` is reallocated if `size` exceeds `sendbuffer_size`.
// NOTE(review): presumably the shared `sendbuffer` is what gets returned in replay mode - confirm.
52 void* smpi_get_tmp_sendbuffer(int size)
54 if (!smpi_process()->replaying())
55 return xbt_malloc(size);
56 if (sendbuffer_size<size){
57 sendbuffer=static_cast<char*>(xbt_realloc(sendbuffer,size));
63 //allocate a single buffer for all recv
// Receive-side twin of smpi_get_tmp_sendbuffer(): a fresh xbt_malloc'ed block outside replay mode,
// otherwise the global `recvbuffer`, reallocated when `size` exceeds `recvbuffer_size`.
// NOTE(review): presumably the shared `recvbuffer` is what gets returned in replay mode - confirm.
64 void* smpi_get_tmp_recvbuffer(int size){
65 if (!smpi_process()->replaying())
66 return xbt_malloc(size);
67 if (recvbuffer_size<size){
68 recvbuffer=static_cast<char*>(xbt_realloc(recvbuffer,size));
// Counterpart of smpi_get_tmp_sendbuffer()/smpi_get_tmp_recvbuffer(); acts only when the process is not replaying.
// NOTE(review): presumably frees `buf` in that case, since non-replay buffers are individually malloc'ed - confirm.
74 void smpi_free_tmp_buffer(void* buf){
75 if (!smpi_process()->replaying())
// Converts `string` to a double with strtod.
// Throws unknown_error with the message "<string> is not a double" when the input is rejected.
// NOTE(review): the rejection test relies on `endptr`; presumably any unparsed trailing text is an error - confirm.
80 static double parse_double(const char *string)
83 double value = strtod(string, &endptr);
85 THROWF(unknown_error, 0, "%s is not a double", string);
// Translates the numeric datatype id found in a trace token into an MPI datatype.
// The id is read with atoi (so non-numeric text is treated as 0). The selected type is stored in the
// global MPI_CURRENT_TYPE as a side effect and also returned.
// One branch falls back to MPI_DEFAULT_TYPE. NOTE(review): presumably that is the branch for unrecognized ids - confirm.
89 static MPI_Datatype decode_datatype(const char *const action)
91 switch(atoi(action)) {
93 MPI_CURRENT_TYPE=MPI_DOUBLE;
96 MPI_CURRENT_TYPE=MPI_INT;
99 MPI_CURRENT_TYPE=MPI_CHAR;
102 MPI_CURRENT_TYPE=MPI_SHORT;
105 MPI_CURRENT_TYPE=MPI_LONG;
108 MPI_CURRENT_TYPE=MPI_FLOAT;
111 MPI_CURRENT_TYPE=MPI_BYTE;
114 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
116 return MPI_CURRENT_TYPE;
// Inverse of decode_datatype(): maps an MPI datatype to the string id written in traces.
// The datatype is compared, in order, with MPI_BYTE, MPI_DOUBLE, MPI_INT, MPI_CHAR, MPI_SHORT, MPI_LONG and MPI_FLOAT.
// NOTE(review): `known` looks like an optional out-flag telling whether the datatype is handled by replay
// (every caller in this file passes nullptr) - confirm it is null-checked before being written.
119 const char* encode_datatype(MPI_Datatype datatype, int* known)
121 //default type for output is set to MPI_BYTE
122 // MPI_DEFAULT_TYPE is not set for output, use directly MPI_BYTE
125 if (datatype==MPI_BYTE)
127 if(datatype==MPI_DOUBLE)
129 if(datatype==MPI_INT)
131 if(datatype==MPI_CHAR)
133 if(datatype==MPI_SHORT)
135 if(datatype==MPI_LONG)
137 if(datatype==MPI_FLOAT)
139 //tell that the datatype is not handled by replay, and that its size should be measured and replayed as size*MPI_BYTE
142 // default - not implemented.
143 // do not warn here as we pass in this function even for other trace formats
// Argument-count guard used at the top of the action handlers.
// Counts the tokens of the nullptr-terminated `action` array and throws arg_error, with a message naming the
// calling function (__FUNCTION__), the number of items found and the expected `mandatory`/`optional` counts.
// The first two tokens (process id and action name) are not part of `mandatory`/`optional`.
// Expands to a braced block, so call sites do not need a trailing semicolon.
147 #define CHECK_ACTION_PARAMS(action, mandatory, optional) {\
149 while(action[i]!=nullptr)\
152 THROWF(arg_error, 0, "%s replay failed.\n" \
153 "%d items were given on the line. First two should be process_id and action. " \
154 "This action needs after them %d mandatory arguments, and accepts %d optional ones. \n" \
155 "Please contact the Simgrid team if support is needed", __FUNCTION__, i, mandatory, optional);\
// Handler for the "init" trace action (no mandatory argument, one optional).
// Selects the default datatype (MPI_DOUBLE for MPE-style traces, MPI_BYTE for TAU-style ones), starts the
// simulated timer, records the number of processes and creates this process' empty request vector.
// NOTE(review): the choice between the two defaults presumably depends on the optional argument - confirm.
161 static void action_init(const char *const *action)
163 XBT_DEBUG("Initialize the counters");
164 CHECK_ACTION_PARAMS(action, 0, 1)
166 MPI_DEFAULT_TYPE=MPI_DOUBLE; // default MPE datatype
167 else MPI_DEFAULT_TYPE= MPI_BYTE; // default TAU datatype
169 /* start a simulated timer */
170 smpi_process()->simulated_start();
171 /*initialize the number of active processes */
172 active_processes = smpi_process_count();
// The vector allocated here is deleted at the end of smpi_replay_run().
174 set_reqq_self(new std::vector<MPI_Request>);
// Handler registered for the "finalize" trace action.
// NOTE(review): appears to be an intentional no-op (finalization is done in smpi_replay_run()) - confirm.
177 static void action_finalize(const char *const *action)
// Handler for the "comm_size" trace action: stores action[2] in the global communicator_size.
// The token is parsed as a double and truncated by the assignment to int.
// The clock passed to log_timed_action is read at the call itself, so the logged duration is ~0.
182 static void action_comm_size(const char *const *action)
184 communicator_size = parse_double(action[2]);
185 log_timed_action (action, smpi_process()->simulated_elapsed());
// Handler for the "comm_split" trace action: nothing is simulated, the action is only logged.
188 static void action_comm_split(const char *const *action)
190 log_timed_action (action, smpi_process()->simulated_elapsed());
// Handler for the "comm_dup" trace action: nothing is simulated, the action is only logged.
193 static void action_comm_dup(const char *const *action)
195 log_timed_action (action, smpi_process()->simulated_elapsed());
// Handler for the "compute" trace action (one mandatory argument).
// action[2] is the amount of flops to execute; the computation is bracketed by
// TRACE_smpi_computing_in/out and its simulated duration is logged.
198 static void action_compute(const char *const *action)
200 CHECK_ACTION_PARAMS(action, 1, 0)
201 double clock = smpi_process()->simulated_elapsed();
202 double flops= parse_double(action[2]);
203 int rank = smpi_process()->index();
// `extra` is handed over to the tracing call. NOTE(review): ownership presumably passes to the tracing layer - confirm.
204 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
205 extra->type=TRACING_COMPUTING;
206 extra->comp_size=flops;
207 TRACE_smpi_computing_in(rank, extra);
209 smpi_execute_flops(flops);
211 TRACE_smpi_computing_out(rank);
212 log_timed_action (action, clock);
// Handler for the "send" trace action (two mandatory arguments, one optional).
//   action[2]: destination, action[3]: element count, action[4] (optional): datatype id.
// Performs a blocking send on MPI_COMM_WORLD with tag 0 and a null buffer (only the size matters).
215 static void action_send(const char *const *action)
217 CHECK_ACTION_PARAMS(action, 2, 1)
218 int to = atoi(action[2]);
219 double size=parse_double(action[3]);
220 double clock = smpi_process()->simulated_elapsed();
// Datatype: the decoded action[4], or MPI_DEFAULT_TYPE as the alternative branch.
223 MPI_CURRENT_TYPE=decode_datatype(action[4]);
225 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
227 int rank = smpi_process()->index();
229 int dst_traced = MPI_COMM_WORLD->group()->rank(to);
230 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
231 extra->type = TRACING_SEND;
232 extra->send_size = size;
234 extra->dst = dst_traced;
235 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
236 TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
// The send event is traced here (in bytes) only when internals are not being traced.
237 if (!TRACE_smpi_view_internals())
238 TRACE_smpi_send(rank, rank, dst_traced, 0, size*MPI_CURRENT_TYPE->size());
240 Request::send(nullptr, size, MPI_CURRENT_TYPE, to , 0, MPI_COMM_WORLD);
// Here the action is logged before the trace state is closed; action_Isend does it in the opposite order.
242 log_timed_action (action, clock);
244 TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
// Handler for the "Isend" trace action (two mandatory arguments, one optional).
//   action[2]: destination, action[3]: element count, action[4] (optional): datatype id.
// Starts a non-blocking send on MPI_COMM_WORLD with tag 0 and appends the request to this process'
// request vector so that a later "test", "wait" or "waitAll" action can complete it.
247 static void action_Isend(const char *const *action)
249 CHECK_ACTION_PARAMS(action, 2, 1)
250 int to = atoi(action[2]);
251 double size=parse_double(action[3]);
252 double clock = smpi_process()->simulated_elapsed();
// Datatype: the decoded action[4], or MPI_DEFAULT_TYPE as the alternative branch.
255 MPI_CURRENT_TYPE=decode_datatype(action[4]);
257 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
259 int rank = smpi_process()->index();
260 int dst_traced = MPI_COMM_WORLD->group()->rank(to);
261 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
262 extra->type = TRACING_ISEND;
263 extra->send_size = size;
265 extra->dst = dst_traced;
266 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
267 TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
// The send event is traced here (in bytes) only when internals are not being traced.
268 if (!TRACE_smpi_view_internals())
269 TRACE_smpi_send(rank, rank, dst_traced, 0, size*MPI_CURRENT_TYPE->size());
271 MPI_Request request = Request::isend(nullptr, size, MPI_CURRENT_TYPE, to, 0,MPI_COMM_WORLD);
273 TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
275 get_reqq_self()->push_back(request);
277 log_timed_action (action, clock);
// Handler for the "recv" trace action (two mandatory arguments, one optional).
//   action[2]: source, action[3]: element count, action[4] (optional): datatype id.
// Performs a blocking receive on MPI_COMM_WORLD with tag 0 and a null buffer.
280 static void action_recv(const char *const *action) {
281 CHECK_ACTION_PARAMS(action, 2, 1)
282 int from = atoi(action[2]);
283 double size=parse_double(action[3]);
284 double clock = smpi_process()->simulated_elapsed();
// Datatype: the decoded action[4], or MPI_DEFAULT_TYPE as the alternative branch.
288 MPI_CURRENT_TYPE=decode_datatype(action[4]);
290 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
292 int rank = smpi_process()->index();
293 int src_traced = MPI_COMM_WORLD->group()->rank(from);
295 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
296 extra->type = TRACING_RECV;
297 extra->send_size = size;
298 extra->src = src_traced;
300 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
301 TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
303 //unknown size from the receiver point of view
// NOTE(review): the probe is presumably only issued when the trace gives no usable size, with `size`
// then taken from `status` - confirm the condition and the update.
305 Request::probe(from, 0, MPI_COMM_WORLD, &status);
309 Request::recv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD, &status);
311 TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
// The receive event is traced here only when internals are not being traced.
312 if (!TRACE_smpi_view_internals()) {
313 TRACE_smpi_recv(rank, src_traced, rank, 0);
316 log_timed_action (action, clock);
// Handler for the "Irecv" trace action (two mandatory arguments, one optional).
//   action[2]: source, action[3]: element count, action[4] (optional): datatype id.
// Posts a non-blocking receive on MPI_COMM_WORLD with tag 0 and appends the request to this process'
// request vector so that a later "test", "wait" or "waitAll" action can complete it.
319 static void action_Irecv(const char *const *action)
321 CHECK_ACTION_PARAMS(action, 2, 1)
322 int from = atoi(action[2]);
323 double size=parse_double(action[3]);
324 double clock = smpi_process()->simulated_elapsed();
// Datatype: the decoded action[4], or MPI_DEFAULT_TYPE as the alternative branch.
327 MPI_CURRENT_TYPE=decode_datatype(action[4]);
329 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
331 int rank = smpi_process()->index();
332 int src_traced = MPI_COMM_WORLD->group()->rank(from);
333 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
334 extra->type = TRACING_IRECV;
335 extra->send_size = size;
336 extra->src = src_traced;
338 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
339 TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
341 //unknown size from the receiver's point of view
// NOTE(review): Request::probe is a blocking call inside a non-blocking receive handler; presumably it is
// only issued when the trace gives no usable size - confirm the condition.
343 Request::probe(from, 0, MPI_COMM_WORLD, &status);
347 MPI_Request request = Request::irecv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD);
349 TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
350 get_reqq_self()->push_back(request);
352 log_timed_action (action, clock);
// Handler for the "test" trace action (no argument).
// Pops the most recently queued request, tests it, and pushes it back so that a following "wait" can find it.
// NOTE(review): unlike action_wait, no check that the request vector is non-empty is visible before back();
// calling back() on an empty vector is undefined behaviour - confirm a guard exists.
355 static void action_test(const char *const *action){
356 CHECK_ACTION_PARAMS(action, 0, 0)
357 double clock = smpi_process()->simulated_elapsed();
360 MPI_Request request = get_reqq_self()->back();
361 get_reqq_self()->pop_back();
362 //if request is null here, this may mean that a previous test has succeeded
363 //Different times in traced application and replayed version may lead to this
364 //In this case, ignore the extra calls.
365 if(request!=nullptr){
366 int rank = smpi_process()->index();
367 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
368 extra->type=TRACING_TEST;
369 TRACE_smpi_testing_in(rank, extra);
371 int flag = Request::test(&request, &status);
373 XBT_DEBUG("MPI_Test result: %d", flag);
374 /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now nullptr.*/
375 get_reqq_self()->push_back(request);
377 TRACE_smpi_testing_out(rank);
379 log_timed_action (action, clock);
// Handler for the "wait" trace action (no argument).
// Pops the most recently queued request and waits for it; a receive event is traced afterwards when the
// request was a receive. Aborts via xbt_assert when no request was queued by a previous Isend/Irecv.
382 static void action_wait(const char *const *action){
383 CHECK_ACTION_PARAMS(action, 0, 0)
384 double clock = smpi_process()->simulated_elapsed();
387 xbt_assert(get_reqq_self()->size(), "action wait not preceded by any irecv or isend: %s",
388 xbt_str_join_array(action," "));
389 MPI_Request request = get_reqq_self()->back();
390 get_reqq_self()->pop_back();
// A null request means an earlier "test" action already completed it.
392 if (request==nullptr){
393 /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just return.*/
// NOTE(review): `rank` is guarded against MPI_COMM_NULL, but the next statement calls
// request->comm()->group() unconditionally - confirm the communicator can never be null here.
397 int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
399 MPI_Group group = request->comm()->group();
400 int src_traced = group->rank(request->src());
401 int dst_traced = group->rank(request->dst());
// Read the flag before waiting: Request::wait receives &request and may reset it.
402 int is_wait_for_receive = (request->flags() & RECV);
403 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
404 extra->type = TRACING_WAIT;
405 TRACE_smpi_ptp_in(rank, src_traced, dst_traced, __FUNCTION__, extra);
407 Request::wait(&request, &status);
409 TRACE_smpi_ptp_out(rank, src_traced, dst_traced, __FUNCTION__);
410 if (is_wait_for_receive)
411 TRACE_smpi_recv(rank, src_traced, dst_traced, 0);
412 log_timed_action (action, clock);
// Handler for the "waitAll" trace action (no argument).
// Waits for every request currently queued for this process; when the queue is empty only the log line is emitted.
// For each queued receive, source and destination are remembered before waiting so that the matching
// receive events can be traced once the requests have completed.
415 static void action_waitall(const char *const *action){
416 CHECK_ACTION_PARAMS(action, 0, 0)
417 double clock = smpi_process()->simulated_elapsed();
418 unsigned int count_requests=get_reqq_self()->size();
420 if (count_requests>0) {
// NOTE(review): `status`, `recvs_snd` and `recvs_rcv` are variable-length arrays, a compiler extension in
// C++ whose size is bounded only by the stack - consider std::vector.
421 MPI_Status status[count_requests];
423 int rank_traced = smpi_process()->index();
424 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
425 extra->type = TRACING_WAITALL;
426 extra->send_size=count_requests;
427 TRACE_smpi_ptp_in(rank_traced, -1, -1, __FUNCTION__,extra);
428 int recvs_snd[count_requests];
429 int recvs_rcv[count_requests];
431 for (auto req : *(get_reqq_self())){
432 if (req && (req->flags () & RECV)){
433 recvs_snd[i]=req->src();
434 recvs_rcv[i]=req->dst();
// The vector's own storage is passed to waitall, so completed entries are updated in place.
439 Request::waitall(count_requests, &(*get_reqq_self())[0], status);
// -100 is the marker for entries that were not receives. NOTE(review): confirm every non-receive slot is set to it.
441 for (i=0; i<count_requests;i++){
442 if (recvs_snd[i]!=-100)
443 TRACE_smpi_recv(rank_traced, recvs_snd[i], recvs_rcv[i],0);
445 TRACE_smpi_ptp_out(rank_traced, -1, -1, __FUNCTION__);
447 log_timed_action (action, clock);
// Handler for the "barrier" trace action: runs a barrier on MPI_COMM_WORLD, traced as a collective.
// No CHECK_ACTION_PARAMS call is made here, so extra tokens on the line are ignored.
450 static void action_barrier(const char *const *action){
451 double clock = smpi_process()->simulated_elapsed();
452 int rank = smpi_process()->index();
453 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
454 extra->type = TRACING_BARRIER;
455 TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
457 Colls::barrier(MPI_COMM_WORLD);
459 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
460 log_timed_action (action, clock);
// Handler for the "bcast" trace action (one mandatory argument, two optional).
//   action[2]: element count, action[3] (optional): root, action[4] (optional): datatype id.
463 static void action_bcast(const char *const *action)
465 CHECK_ACTION_PARAMS(action, 1, 2)
466 double size = parse_double(action[2]);
467 double clock = smpi_process()->simulated_elapsed();
469 /* Initialize MPI_CURRENT_TYPE in order to decrease the number of the checks */
470 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
473 root= atoi(action[3]);
475 MPI_CURRENT_TYPE=decode_datatype(action[4]);
478 int rank = smpi_process()->index();
// NOTE(review): this uses group()->index(root) whereas action_reduce uses group()->rank(root) for the
// same purpose - confirm which conversion is intended for the traced root.
479 int root_traced = MPI_COMM_WORLD->group()->index(root);
481 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
482 extra->type = TRACING_BCAST;
483 extra->send_size = size;
484 extra->root = root_traced;
485 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
486 TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__, extra);
// Buffer size in bytes: element count times datatype size.
487 void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
489 Colls::bcast(sendbuf, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
491 TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
492 log_timed_action (action, clock);
// Handler for the "reduce" trace action (two mandatory arguments, two optional).
//   action[2]: communicated element count, action[3]: flops of the reduction,
//   action[4] (optional): root, action[5] (optional): datatype id.
// The reduction is called with MPI_OP_NULL; its computational cost is simulated by smpi_execute_flops(comp_size).
495 static void action_reduce(const char *const *action)
497 CHECK_ACTION_PARAMS(action, 2, 2)
498 double comm_size = parse_double(action[2]);
499 double comp_size = parse_double(action[3]);
500 double clock = smpi_process()->simulated_elapsed();
502 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
505 root= atoi(action[4]);
507 MPI_CURRENT_TYPE=decode_datatype(action[5]);
510 int rank = smpi_process()->index();
511 int root_traced = MPI_COMM_WORLD->group()->rank(root);
512 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
513 extra->type = TRACING_REDUCE;
514 extra->send_size = comm_size;
515 extra->comp_size = comp_size;
516 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
517 extra->root = root_traced;
519 TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__,extra);
// NOTE(review): recvbuf is also obtained from smpi_get_tmp_sendbuffer, so in replay mode both pointers
// presumably refer to the same shared buffer - confirm this is intended (smpi_get_tmp_recvbuffer exists).
521 void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
522 void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
523 Colls::reduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
524 smpi_execute_flops(comp_size);
526 TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
527 log_timed_action (action, clock);
// Handler for the "allReduce" trace action (two mandatory arguments, one optional).
//   action[2]: communicated element count, action[3]: flops of the reduction, action[4] (optional): datatype id.
// The reduction is called with MPI_OP_NULL; its computational cost is simulated by smpi_execute_flops(comp_size).
530 static void action_allReduce(const char *const *action) {
531 CHECK_ACTION_PARAMS(action, 2, 1)
532 double comm_size = parse_double(action[2]);
533 double comp_size = parse_double(action[3]);
// Datatype: the decoded action[4], or MPI_DEFAULT_TYPE as the alternative branch.
536 MPI_CURRENT_TYPE=decode_datatype(action[4]);
538 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
540 double clock = smpi_process()->simulated_elapsed();
541 int rank = smpi_process()->index();
542 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
543 extra->type = TRACING_ALLREDUCE;
544 extra->send_size = comm_size;
545 extra->comp_size = comp_size;
546 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
547 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
// NOTE(review): recvbuf is also obtained from smpi_get_tmp_sendbuffer, so in replay mode both pointers
// presumably refer to the same shared buffer - confirm this is intended (smpi_get_tmp_recvbuffer exists).
549 void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
550 void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
551 Colls::allreduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
552 smpi_execute_flops(comp_size);
554 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
555 log_timed_action (action, clock);
// Handler for the "allToAll" trace action.
//   action[2]: per-peer send count, action[3]: per-peer receive count,
//   action[4]/action[5] (optional, both or neither): send and receive datatype ids.
// Buffers are sized for all peers: count * communicator size * datatype size.
558 static void action_allToAll(const char *const *action) {
559 CHECK_ACTION_PARAMS(action, 2, 2) //two mandatory (send and recv volumes) and two optional (corresponding datatypes)
560 double clock = smpi_process()->simulated_elapsed();
561 int comm_size = MPI_COMM_WORLD->size();
562 int send_size = parse_double(action[2]);
563 int recv_size = parse_double(action[3]);
564 MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
// Both datatype ids must be present to be taken into account; otherwise the default type is used.
566 if(action[4] && action[5]) {
567 MPI_CURRENT_TYPE=decode_datatype(action[4]);
568 MPI_CURRENT_TYPE2=decode_datatype(action[5]);
571 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
573 void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* MPI_CURRENT_TYPE->size());
574 void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
576 int rank = smpi_process()->index();
577 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
578 extra->type = TRACING_ALLTOALL;
579 extra->send_size = send_size;
580 extra->recv_size = recv_size;
581 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
582 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
584 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
586 Colls::alltoall(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
588 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
589 log_timed_action (action, clock);
// Handler for the "gather" trace action (two mandatory arguments, three optional).
// Token positions used by the code: action[2] send count, action[3] receive count, action[4] root,
// action[5] send datatype id, action[6] receive datatype id.
// NOTE(review): the datatype guard tests action[4] && action[5] but then decodes action[5] and action[6];
// with exactly four arguments action[6] is the terminating nullptr and decode_datatype() would pass it
// to atoi. The guard most likely should test action[5] && action[6] - confirm and fix.
592 static void action_gather(const char *const *action) {
593 /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
596 1) 68 is the sendcounts
597 2) 68 is the recvcounts
598 3) 0 is the root node
599 4) 0 is the send datatype id, see decode_datatype()
600 5) 0 is the recv datatype id, see decode_datatype()
602 CHECK_ACTION_PARAMS(action, 2, 3)
603 double clock = smpi_process()->simulated_elapsed();
604 int comm_size = MPI_COMM_WORLD->size();
605 int send_size = parse_double(action[2]);
606 int recv_size = parse_double(action[3]);
607 MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
608 if(action[4] && action[5]) {
609 MPI_CURRENT_TYPE=decode_datatype(action[5]);
610 MPI_CURRENT_TYPE2=decode_datatype(action[6]);
612 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
614 void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
615 void *recv = nullptr;
618 root=atoi(action[4]);
619 int rank = MPI_COMM_WORLD->rank();
// NOTE(review): the receive buffer (sized for all processes) is presumably only obtained on the root - confirm.
622 recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
624 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
625 extra->type = TRACING_GATHER;
626 extra->send_size = send_size;
627 extra->recv_size = recv_size;
629 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
630 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
632 TRACE_smpi_collective_in(smpi_process()->index(), root, __FUNCTION__, extra);
634 Colls::gather(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
// The trace state is opened with `root` but closed with -1 as the root argument.
636 TRACE_smpi_collective_out(smpi_process()->index(), -1, __FUNCTION__);
637 log_timed_action (action, clock);
// Handler for the "gatherV" trace action.
// Token positions used by the code (N = size of MPI_COMM_WORLD): action[2] send count,
// action[3 .. 2+N] receive counts, action[3+N] root, action[4+N]/action[5+N] optional datatype ids.
// NOTE(review): CHECK_ACTION_PARAMS declares N+1 mandatory arguments, which does not include the root,
// yet action[3+N] is read unconditionally with atoi - confirm the root is really mandatory in traces
// or guard the read.
640 static void action_gatherv(const char *const *action) {
641 /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
642 0 gatherV 68 68 10 10 10 0 0 0
644 1) 68 is the sendcount
645 2) 68 10 10 10 is the recvcounts
646 3) 0 is the root node
647 4) 0 is the send datatype id, see decode_datatype()
648 5) 0 is the recv datatype id, see decode_datatype()
650 double clock = smpi_process()->simulated_elapsed();
651 int comm_size = MPI_COMM_WORLD->size();
652 CHECK_ACTION_PARAMS(action, comm_size+1, 2)
653 int send_size = parse_double(action[2]);
// Variable-length arrays (compiler extension in C++), one slot per process.
654 int disps[comm_size];
655 int recvcounts[comm_size];
658 MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
659 if(action[4+comm_size] && action[5+comm_size]) {
660 MPI_CURRENT_TYPE=decode_datatype(action[4+comm_size]);
661 MPI_CURRENT_TYPE2=decode_datatype(action[5+comm_size]);
663 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
665 void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
666 void *recv = nullptr;
// Read the per-process receive counts and accumulate their sum to size the receive buffer.
667 for(int i=0;i<comm_size;i++) {
668 recvcounts[i] = atoi(action[i+3]);
669 recv_sum=recv_sum+recvcounts[i];
673 int root=atoi(action[3+comm_size]);
674 int rank = MPI_COMM_WORLD->rank();
677 recv = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
679 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
680 extra->type = TRACING_GATHERV;
681 extra->send_size = send_size;
682 extra->recvcounts= xbt_new(int,comm_size);
683 for(int i=0; i< comm_size; i++)//copy data to avoid bad free
684 extra->recvcounts[i] = recvcounts[i];
686 extra->num_processes = comm_size;
687 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
688 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
690 TRACE_smpi_collective_in(smpi_process()->index(), root, __FUNCTION__, extra);
692 Colls::gatherv(send, send_size, MPI_CURRENT_TYPE, recv, recvcounts, disps, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
694 TRACE_smpi_collective_out(smpi_process()->index(), -1, __FUNCTION__);
695 log_timed_action (action, clock);
// Handler for the "reduceScatter" trace action.
// Token positions used by the code (N = size of MPI_COMM_WORLD): action[2 .. 1+N] receive counts,
// action[2+N] flops of the reduction, action[3+N] optional datatype id.
// The reduction is called with MPI_OP_NULL; its computational cost is simulated by smpi_execute_flops(comp_size).
698 static void action_reducescatter(const char *const *action) {
699 /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
700 0 reduceScatter 275427 275427 275427 204020 11346849 0
702 1) The first four values after the name of the action declare the recvcounts array
703 2) The value 11346849 is the amount of instructions
704 3) The last value corresponds to the datatype, see decode_datatype().
706 double clock = smpi_process()->simulated_elapsed();
707 int comm_size = MPI_COMM_WORLD->size();
708 CHECK_ACTION_PARAMS(action, comm_size+1, 1)
// The flop amount is parsed as a double and truncated to int by this initialization.
709 int comp_size = parse_double(action[2+comm_size]);
// Variable-length array (compiler extension in C++), one slot per process.
710 int recvcounts[comm_size];
711 int rank = smpi_process()->index();
713 if(action[3+comm_size])
714 MPI_CURRENT_TYPE=decode_datatype(action[3+comm_size]);
716 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
718 for(int i=0;i<comm_size;i++) {
719 recvcounts[i] = atoi(action[i+2]);
723 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
724 extra->type = TRACING_REDUCE_SCATTER;
725 extra->send_size = 0;
726 extra->recvcounts= xbt_new(int, comm_size);
727 for(int i=0; i< comm_size; i++)//copy data to avoid bad free
728 extra->recvcounts[i] = recvcounts[i];
729 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
730 extra->comp_size = comp_size;
731 extra->num_processes = comm_size;
733 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
// NOTE(review): `size` is presumably the sum of the receive counts - confirm.
735 void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
736 void *recvbuf = smpi_get_tmp_recvbuffer(size* MPI_CURRENT_TYPE->size());
738 Colls::reduce_scatter(sendbuf, recvbuf, recvcounts, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
739 smpi_execute_flops(comp_size);
741 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
742 log_timed_action (action, clock);
// Handler for the "allGather" trace action (two mandatory arguments, two optional).
//   action[2]: send count, action[3]: receive count,
//   action[4]/action[5] (optional, both or neither): send and receive datatype ids.
// NOTE(review): the receive buffer is sized recvcount * datatype size, without the factor "number of
// processes" that action_allToAll and action_gather apply - confirm this is intentional.
745 static void action_allgather(const char *const *action) {
746 /* The structure of the allgather action for the rank 0 (total 4 processes) is the following:
747 0 allGather 275427 275427
749 1) 275427 is the sendcount
750 2) 275427 is the recvcount
751 3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().
753 double clock = smpi_process()->simulated_elapsed();
755 CHECK_ACTION_PARAMS(action, 2, 2)
756 int sendcount=atoi(action[2]);
757 int recvcount=atoi(action[3]);
759 MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
761 if(action[4] && action[5]) {
762 MPI_CURRENT_TYPE = decode_datatype(action[4]);
763 MPI_CURRENT_TYPE2 = decode_datatype(action[5]);
765 MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
767 void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
768 void *recvbuf = smpi_get_tmp_recvbuffer(recvcount* MPI_CURRENT_TYPE2->size());
770 int rank = smpi_process()->index();
771 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
772 extra->type = TRACING_ALLGATHER;
773 extra->send_size = sendcount;
774 extra->recv_size= recvcount;
775 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
776 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
777 extra->num_processes = MPI_COMM_WORLD->size();
779 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
781 Colls::allgather(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcount, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
783 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
784 log_timed_action (action, clock);
// Handler for the "allGatherV" trace action.
// Token positions used by the code (N = size of MPI_COMM_WORLD): action[2] send count,
// action[3 .. 2+N] receive counts, action[3+N]/action[4+N] optional send and receive datatype ids.
// The receive buffer is sized from the sum of the receive counts.
787 static void action_allgatherv(const char *const *action) {
788 /* The structure of the allgatherv action for the rank 0 (total 4 processes) is the following:
789 0 allGatherV 275427 275427 275427 275427 204020
791 1) 275427 is the sendcount
792 2) The next four elements declare the recvcounts array
793 3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().
795 double clock = smpi_process()->simulated_elapsed();
797 int comm_size = MPI_COMM_WORLD->size();
798 CHECK_ACTION_PARAMS(action, comm_size+1, 2)
799 int sendcount=atoi(action[2]);
// Variable-length arrays (compiler extension in C++), one slot per process.
800 int recvcounts[comm_size];
801 int disps[comm_size];
803 MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
805 if(action[3+comm_size] && action[4+comm_size]) {
806 MPI_CURRENT_TYPE = decode_datatype(action[3+comm_size]);
807 MPI_CURRENT_TYPE2 = decode_datatype(action[4+comm_size]);
809 MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
811 void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
813 for(int i=0;i<comm_size;i++) {
814 recvcounts[i] = atoi(action[i+3]);
815 recv_sum=recv_sum+recvcounts[i];
818 void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
820 int rank = smpi_process()->index();
821 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
822 extra->type = TRACING_ALLGATHERV;
823 extra->send_size = sendcount;
824 extra->recvcounts= xbt_new(int, comm_size);
825 for(int i=0; i< comm_size; i++)//copy data to avoid bad free
826 extra->recvcounts[i] = recvcounts[i];
827 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
828 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
829 extra->num_processes = comm_size;
831 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
833 Colls::allgatherv(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts, disps, MPI_CURRENT_TYPE2,
836 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
837 log_timed_action (action, clock);
// Handler for the "allToAllV" trace action.
// Token positions used by the code (N = size of MPI_COMM_WORLD): action[2] send buffer size,
// action[3 .. 2+N] send counts, action[3+N] receive buffer size, action[4+N .. 3+2N] receive counts,
// action[4+2N]/action[5+2N] optional send and receive datatype ids.
// NOTE(review): the receive datatype MPI_CURRENT_TYPE2 is decoded, used to size recvbuf and traced, but
// Colls::alltoallv is given MPI_CURRENT_TYPE for both the send and the receive side - this looks like
// a typo for MPI_CURRENT_TYPE2; confirm and fix.
840 static void action_allToAllv(const char *const *action) {
841 /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
842 0 allToAllV 100 1 7 10 12 100 1 70 10 5
844 1) 100 is the size of the send buffer *sizeof(int),
845 2) 1 7 10 12 is the sendcounts array
846 3) 100*sizeof(int) is the size of the receiver buffer
847 4) 1 70 10 5 is the recvcounts array
849 double clock = smpi_process()->simulated_elapsed();
851 int comm_size = MPI_COMM_WORLD->size();
852 CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
// Variable-length arrays (compiler extension in C++), one slot per process.
853 int sendcounts[comm_size];
854 int recvcounts[comm_size];
855 int senddisps[comm_size];
856 int recvdisps[comm_size];
858 MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
860 int send_buf_size=parse_double(action[2]);
861 int recv_buf_size=parse_double(action[3+comm_size]);
862 if(action[4+2*comm_size] && action[5+2*comm_size]) {
863 MPI_CURRENT_TYPE=decode_datatype(action[4+2*comm_size]);
864 MPI_CURRENT_TYPE2=decode_datatype(action[5+2*comm_size]);
867 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
869 void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* MPI_CURRENT_TYPE->size());
870 void *recvbuf = smpi_get_tmp_recvbuffer(recv_buf_size* MPI_CURRENT_TYPE2->size());
872 for(int i=0;i<comm_size;i++) {
873 sendcounts[i] = atoi(action[i+3]);
874 recvcounts[i] = atoi(action[i+4+comm_size]);
879 int rank = smpi_process()->index();
880 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
881 extra->type = TRACING_ALLTOALLV;
882 extra->recvcounts= xbt_new(int, comm_size);
883 extra->sendcounts= xbt_new(int, comm_size);
884 extra->num_processes = comm_size;
// send_size/recv_size are accumulated with +=. NOTE(review): this relies on xbt_new0 zero-filling `extra` - confirm.
886 for(int i=0; i< comm_size; i++){//copy data to avoid bad free
887 extra->send_size += sendcounts[i];
888 extra->sendcounts[i] = sendcounts[i];
889 extra->recv_size += recvcounts[i];
890 extra->recvcounts[i] = recvcounts[i];
892 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
893 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
895 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
897 Colls::alltoallv(sendbuf, sendcounts, senddisps, MPI_CURRENT_TYPE,recvbuf, recvcounts, recvdisps,
898 MPI_CURRENT_TYPE, MPI_COMM_WORLD);
900 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
901 log_timed_action (action, clock);
904 }} // namespace simgrid::smpi
// Entry point of the trace replay for one simulated process.
// Steps, in order: initialise the SMPI process and mark it as replaying; trace an "<function>_init"
// collective; register one handler per trace action name; optionally execute a start delay given in
// (*argv)[2]; run the replay; wait for requests still queued; release this process' request vector and,
// for the last process, the shared buffers; trace "<function>_finalize" and finalize the process.
// @param argc, argv  command-line arguments, forwarded to Process::init and to the replay runner
906 void smpi_replay_run(int *argc, char***argv){
907 /* First initializes everything */
908 simgrid::smpi::Process::init(argc, argv);
909 smpi_process()->mark_as_initialized();
910 smpi_process()->set_replaying(true);
912 int rank = smpi_process()->index();
913 TRACE_smpi_init(rank);
914 TRACE_smpi_computing_init(rank);
915 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
916 extra->type = TRACING_INIT;
// NOTE(review): bprintf allocates the string; `operation` is reassigned further down for the finalize
// phase - confirm each allocation is released.
917 char *operation =bprintf("%s_init",__FUNCTION__);
918 TRACE_smpi_collective_in(rank, -1, operation, extra);
919 TRACE_smpi_collective_out(rank, -1, operation);
// Action names are case-sensitive and must match the second token of each trace line.
921 xbt_replay_action_register("init", simgrid::smpi::action_init);
922 xbt_replay_action_register("finalize", simgrid::smpi::action_finalize);
923 xbt_replay_action_register("comm_size", simgrid::smpi::action_comm_size);
924 xbt_replay_action_register("comm_split", simgrid::smpi::action_comm_split);
925 xbt_replay_action_register("comm_dup", simgrid::smpi::action_comm_dup);
926 xbt_replay_action_register("send", simgrid::smpi::action_send);
927 xbt_replay_action_register("Isend", simgrid::smpi::action_Isend);
928 xbt_replay_action_register("recv", simgrid::smpi::action_recv);
929 xbt_replay_action_register("Irecv", simgrid::smpi::action_Irecv);
930 xbt_replay_action_register("test", simgrid::smpi::action_test);
931 xbt_replay_action_register("wait", simgrid::smpi::action_wait);
932 xbt_replay_action_register("waitAll", simgrid::smpi::action_waitall);
933 xbt_replay_action_register("barrier", simgrid::smpi::action_barrier);
934 xbt_replay_action_register("bcast", simgrid::smpi::action_bcast);
935 xbt_replay_action_register("reduce", simgrid::smpi::action_reduce);
936 xbt_replay_action_register("allReduce", simgrid::smpi::action_allReduce);
937 xbt_replay_action_register("allToAll", simgrid::smpi::action_allToAll);
938 xbt_replay_action_register("allToAllV", simgrid::smpi::action_allToAllv);
939 xbt_replay_action_register("gather", simgrid::smpi::action_gather);
940 xbt_replay_action_register("gatherV", simgrid::smpi::action_gatherv);
941 xbt_replay_action_register("allGather", simgrid::smpi::action_allgather);
942 xbt_replay_action_register("allGatherV", simgrid::smpi::action_allgatherv);
943 xbt_replay_action_register("reduceScatter", simgrid::smpi::action_reducescatter);
944 xbt_replay_action_register("compute", simgrid::smpi::action_compute);
946 //if we have a delayed start, sleep here.
// The delay is read from (*argv)[2] and expressed in flops. NOTE(review): presumably guarded by a check
// on *argc - confirm.
949 double value = strtod((*argv)[2], &endptr);
951 THROWF(unknown_error, 0, "%s is not a double", (*argv)[2]);
952 XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
953 smpi_execute_flops(value);
955 //UGLY: force a context switch to be sure that all MSG_processes begin initialization
956 XBT_DEBUG("Force context switch by smpi_execute_flops - Sleeping for 0.0 flops ");
957 smpi_execute_flops(0.0);
960 /* Actually run the replay */
961 simgrid::xbt::replay_runner(*argc, *argv);
963 /* and now, finalize everything */
964 /* One active process will stop. Decrease the counter*/
965 XBT_DEBUG("There are %zu elements in reqq[*]", get_reqq_self()->size());
// Complete any request the trace left pending before tearing the process down.
966 if (!get_reqq_self()->empty()){
967 unsigned int count_requests=get_reqq_self()->size();
// NOTE(review): variable-length arrays are a compiler extension in C++ - consider std::vector.
968 MPI_Request requests[count_requests];
969 MPI_Status status[count_requests];
972 for (auto req: *get_reqq_self()){
976 simgrid::smpi::Request::waitall(count_requests, requests, status);
// Releases the vector allocated in action_init(); the map entry in `reqq` itself is not erased here.
978 delete get_reqq_self();
981 if(active_processes==0){
982 /* Last process alive speaking: end the simulated timer */
983 XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
984 xbt_free(sendbuffer);
985 xbt_free(recvbuffer);
988 instr_extra_data extra_fin = xbt_new0(s_instr_extra_data_t,1);
989 extra_fin->type = TRACING_FINALIZE;
990 operation =bprintf("%s_finalize",__FUNCTION__);
991 TRACE_smpi_collective_in(rank, -1, operation, extra_fin);
993 smpi_process()->finalize();
995 TRACE_smpi_collective_out(rank, -1, operation);
996 TRACE_smpi_finalize(smpi_process()->index());