1 /* Copyright (c) 2009-2017. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
7 #include "smpi_coll.hpp"
8 #include "smpi_comm.hpp"
9 #include "smpi_datatype.hpp"
10 #include "smpi_group.hpp"
11 #include "smpi_process.hpp"
12 #include "smpi_request.hpp"
13 #include "xbt/replay.hpp"
15 #include <unordered_map>
18 using simgrid::s4u::Actor;
20 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
// File-scope replay state shared by all action handlers.
22 static int communicator_size = 0;
23 static int active_processes = 0;
24 static std::unordered_map<int, std::vector<MPI_Request>*> reqq; // per-actor (keyed by pid) pending async requests
26 static MPI_Datatype MPI_DEFAULT_TYPE; // fallback datatype, chosen in action_init (MPE vs TAU trace format)
27 static MPI_Datatype MPI_CURRENT_TYPE; // datatype decoded from the action line currently being replayed
29 static int sendbuffer_size = 0;
30 static char* sendbuffer = nullptr; // single grow-only scratch buffer reused by all sends during replay
31 static int recvbuffer_size = 0;
32 static char* recvbuffer = nullptr; // single grow-only scratch buffer reused by all recvs during replay
// Log the replayed action and its simulated duration (elapsed - clock), verbose log level only.
34 static void log_timed_action (const char *const *action, double clock){
35 if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
36 char *name = xbt_str_join_array(action, " ");
37 XBT_VERB("%s %f", name, smpi_process()->simulated_elapsed()-clock);
// Return the calling actor's pending-request vector; throws if set_reqq_self was never called for it.
42 static std::vector<MPI_Request>* get_reqq_self()
44 return reqq.at(Actor::self()->getPid());
// Register the calling actor's pending-request vector.
// NOTE(review): unordered_map::insert() does not overwrite an existing entry -- confirm each pid registers at most once.
47 static void set_reqq_self(std::vector<MPI_Request> *mpi_request)
49 reqq.insert({Actor::self()->getPid(), mpi_request});
52 //allocate a single buffer for all sends, growing it if needed
53 void* smpi_get_tmp_sendbuffer(int size)
55 if (not smpi_process()->replaying())
56 return xbt_malloc(size); // outside replay: plain allocation, released by smpi_free_tmp_buffer
57 if (sendbuffer_size<size){
58 sendbuffer=static_cast<char*>(xbt_realloc(sendbuffer,size)); // grow-only: never shrunk during replay
64 //allocate a single buffer for all recv
65 void* smpi_get_tmp_recvbuffer(int size){
66 if (not smpi_process()->replaying())
67 return xbt_malloc(size); // outside replay: plain allocation, released by smpi_free_tmp_buffer
68 if (recvbuffer_size<size){
69 recvbuffer=static_cast<char*>(xbt_realloc(recvbuffer,size)); // grow-only: never shrunk during replay
// Release a buffer obtained from smpi_get_tmp_{send,recv}buffer; during replay the
// shared scratch buffers are deliberately kept alive for reuse.
75 void smpi_free_tmp_buffer(void* buf){
76 if (not smpi_process()->replaying())
// Parse an action field as a double; throws if the string is not a valid double.
81 static double parse_double(const char *string)
84 double value = strtod(string, &endptr);
86 THROWF(unknown_error, 0, "%s is not a double", string);
91 //TODO: this logic should be moved inside the datatype class, to support all predefined types and get rid of is_replayable.
// Map a numeric datatype id from the trace to an MPI datatype. Also caches the
// result in the global MPI_CURRENT_TYPE before returning it.
92 static MPI_Datatype decode_datatype(const char *const action)
94 switch(atoi(action)) {
96 MPI_CURRENT_TYPE=MPI_DOUBLE;
99 MPI_CURRENT_TYPE=MPI_INT;
102 MPI_CURRENT_TYPE=MPI_CHAR;
105 MPI_CURRENT_TYPE=MPI_SHORT;
108 MPI_CURRENT_TYPE=MPI_LONG;
111 MPI_CURRENT_TYPE=MPI_FLOAT;
114 MPI_CURRENT_TYPE=MPI_BYTE;
117 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE; // unknown id: fall back to the trace's default type
120 return MPI_CURRENT_TYPE;
// Inverse of decode_datatype: turn an MPI datatype back into its trace id string,
// for use in the tracing/instrumentation layer.
123 const char* encode_datatype(MPI_Datatype datatype)
125 if (datatype==MPI_BYTE)
127 if(datatype==MPI_DOUBLE)
129 if(datatype==MPI_INT)
131 if(datatype==MPI_CHAR)
133 if(datatype==MPI_SHORT)
135 if(datatype==MPI_LONG)
137 if(datatype==MPI_FLOAT)
139 // default - not implemented.
140 // do not warn here as we pass in this function even for other trace formats
// Validate the field count of an action line: after the two leading fields
// (process id, action name) there must be at least `mandatory` and at most
// `mandatory + optional` arguments; otherwise throw with a diagnostic.
// (Commented outside the macro: a '//' inside would break the '\' continuations.)
144 #define CHECK_ACTION_PARAMS(action, mandatory, optional) {\
146 while(action[i]!=nullptr)\
149 THROWF(arg_error, 0, "%s replay failed.\n" \
150 "%d items were given on the line. First two should be process_id and action. " \
151 "This action needs after them %d mandatory arguments, and accepts %d optional ones. \n" \
152 "Please contact the Simgrid team if support is needed", __FUNCTION__, i, mandatory, optional);\
// "init" action: choose the default datatype (MPE vs TAU trace flavor), start the
// simulated timer, count active processes, and create this actor's request queue.
158 static void action_init(const char *const *action)
160 XBT_DEBUG("Initialize the counters");
161 CHECK_ACTION_PARAMS(action, 0, 1)
163 MPI_DEFAULT_TYPE = MPI_DOUBLE; // default MPE datatype
165 MPI_DEFAULT_TYPE = MPI_BYTE; // default TAU datatype
167 /* start a simulated timer */
168 smpi_process()->simulated_start();
169 /*initialize the number of active processes */
170 active_processes = smpi_process_count();
172 set_reqq_self(new std::vector<MPI_Request>);
// Handler for the "finalize" trace action; registered in smpi_replay_init.
175 static void action_finalize(const char *const *action)
// "comm_size" action: record the communicator size announced by the trace.
180 static void action_comm_size(const char *const *action)
182 communicator_size = parse_double(action[2]);
183 log_timed_action (action, smpi_process()->simulated_elapsed());
// "comm_split" action: only logged; the split itself is not simulated here.
186 static void action_comm_split(const char *const *action)
188 log_timed_action (action, smpi_process()->simulated_elapsed());
// "comm_dup" action: only logged; the duplication itself is not simulated here.
191 static void action_comm_dup(const char *const *action)
193 log_timed_action (action, smpi_process()->simulated_elapsed());
// "compute" action: burn the given number of flops in simulated time.
196 static void action_compute(const char *const *action)
198 CHECK_ACTION_PARAMS(action, 1, 0)
199 double clock = smpi_process()->simulated_elapsed();
200 double flops= parse_double(action[2]);
201 int my_proc_id = Actor::self()->getPid();
203 TRACE_smpi_computing_in(my_proc_id, flops);
204 smpi_execute_flops(flops);
205 TRACE_smpi_computing_out(my_proc_id);
207 log_timed_action (action, clock);
// "send" action: blocking send of <size> elements to rank <to> (tag 0, MPI_COMM_WORLD).
// Fields: [2]=destination rank, [3]=count, [4]=optional datatype id.
210 static void action_send(const char *const *action)
212 CHECK_ACTION_PARAMS(action, 2, 1)
213 int to = atoi(action[2]);
214 double size=parse_double(action[3]);
215 double clock = smpi_process()->simulated_elapsed();
217 MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
219 int my_proc_id = Actor::self()->getPid();
220 int dst_traced = MPI_COMM_WORLD->group()->actor(to)->getPid();
222 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
223 new simgrid::instr::Pt2PtTIData("send", to, size, encode_datatype(MPI_CURRENT_TYPE)));
224 if (not TRACE_smpi_view_internals())
225 TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, size * MPI_CURRENT_TYPE->size());
227 Request::send(nullptr, size, MPI_CURRENT_TYPE, to , 0, MPI_COMM_WORLD); // nullptr payload: only timing is simulated
229 TRACE_smpi_comm_out(my_proc_id);
231 log_timed_action(action, clock);
// "Isend" action: non-blocking send; the resulting request is queued on this
// actor's request vector for a later wait/test/waitAll.
234 static void action_Isend(const char *const *action)
236 CHECK_ACTION_PARAMS(action, 2, 1)
237 int to = atoi(action[2]);
238 double size=parse_double(action[3]);
239 double clock = smpi_process()->simulated_elapsed();
241 MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
243 int my_proc_id = Actor::self()->getPid();
244 int dst_traced = MPI_COMM_WORLD->group()->actor(to)->getPid();
245 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
246 new simgrid::instr::Pt2PtTIData("Isend", to, size, encode_datatype(MPI_CURRENT_TYPE)));
247 if (not TRACE_smpi_view_internals())
248 TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, size * MPI_CURRENT_TYPE->size());
250 MPI_Request request = Request::isend(nullptr, size, MPI_CURRENT_TYPE, to, 0, MPI_COMM_WORLD);
252 TRACE_smpi_comm_out(my_proc_id);
254 get_reqq_self()->push_back(request);
256 log_timed_action (action, clock);
// "recv" action: blocking receive from rank <from>; a probe is issued first since
// the receiver side of the trace does not know the real message size.
259 static void action_recv(const char *const *action) {
260 CHECK_ACTION_PARAMS(action, 2, 1)
261 int from = atoi(action[2]);
262 double size=parse_double(action[3]);
263 double clock = smpi_process()->simulated_elapsed();
266 MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
268 int my_proc_id = Actor::self()->getPid();
269 int src_traced = MPI_COMM_WORLD->group()->actor(from)->getPid();
271 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
272 new simgrid::instr::Pt2PtTIData("recv", from, size, encode_datatype(MPI_CURRENT_TYPE)));
274 //unknown size from the receiver point of view
276 Request::probe(from, 0, MPI_COMM_WORLD, &status);
280 Request::recv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD, &status);
282 TRACE_smpi_comm_out(my_proc_id);
283 if (not TRACE_smpi_view_internals()) {
284 TRACE_smpi_recv(src_traced, my_proc_id, 0);
287 log_timed_action (action, clock);
// "Irecv" action: non-blocking receive; probes first (size unknown on this side),
// then queues the request for a later wait/test/waitAll.
290 static void action_Irecv(const char *const *action)
292 CHECK_ACTION_PARAMS(action, 2, 1)
293 int from = atoi(action[2]);
294 double size=parse_double(action[3]);
295 double clock = smpi_process()->simulated_elapsed();
297 MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
299 int my_proc_id = Actor::self()->getPid();
300 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
301 new simgrid::instr::Pt2PtTIData("Irecv", from, size, encode_datatype(MPI_CURRENT_TYPE)));
303 //unknow size from the receiver pov
305 Request::probe(from, 0, MPI_COMM_WORLD, &status);
309 MPI_Request request = Request::irecv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD);
311 TRACE_smpi_comm_out(my_proc_id);
312 get_reqq_self()->push_back(request);
314 log_timed_action (action, clock);
// "test" action: MPI_Test on the most recently queued request; the (possibly
// nullified) request is pushed back so a subsequent wait can still find it.
317 static void action_test(const char* const* action)
319 CHECK_ACTION_PARAMS(action, 0, 0)
320 double clock = smpi_process()->simulated_elapsed();
323 MPI_Request request = get_reqq_self()->back();
324 get_reqq_self()->pop_back();
325 //if request is null here, this may mean that a previous test has succeeded
326 //Different times in traced application and replayed version may lead to this
327 //In this case, ignore the extra calls.
328 if(request!=nullptr){
329 int my_proc_id = Actor::self()->getPid();
330 TRACE_smpi_testing_in(my_proc_id);
332 int flag = Request::test(&request, &status);
334 XBT_DEBUG("MPI_Test result: %d", flag);
335 /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now nullptr.*/
336 get_reqq_self()->push_back(request);
338 TRACE_smpi_testing_out(my_proc_id);
340 log_timed_action (action, clock);
// "wait" action: wait for completion of the most recently queued request.
// A nullptr request (already completed by an earlier "test") is silently skipped.
343 static void action_wait(const char *const *action){
344 CHECK_ACTION_PARAMS(action, 0, 0)
345 double clock = smpi_process()->simulated_elapsed();
348 xbt_assert(get_reqq_self()->size(), "action wait not preceded by any irecv or isend: %s",
349 xbt_str_join_array(action," "));
350 MPI_Request request = get_reqq_self()->back();
351 get_reqq_self()->pop_back();
353 if (request==nullptr){
354 /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just return.*/
358 int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
360 MPI_Group group = request->comm()->group();
361 int src_traced = group->rank(request->src());
362 int dst_traced = group->rank(request->dst());
363 int is_wait_for_receive = (request->flags() & RECV); // remember before wait() may free the request
364 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::NoOpTIData("wait"));
366 Request::wait(&request, &status);
368 TRACE_smpi_comm_out(rank);
369 if (is_wait_for_receive)
370 TRACE_smpi_recv(src_traced, dst_traced, 0);
371 log_timed_action (action, clock);
// "waitAll" action: wait on every queued request; the src/dst of receive
// requests are remembered beforehand so the matched recvs can be traced after.
374 static void action_waitall(const char *const *action){
375 CHECK_ACTION_PARAMS(action, 0, 0)
376 double clock = smpi_process()->simulated_elapsed();
377 const unsigned int count_requests = get_reqq_self()->size();
379 if (count_requests>0) {
380 MPI_Status status[count_requests];
382 int my_proc_id_traced = Actor::self()->getPid();
383 TRACE_smpi_comm_in(my_proc_id_traced, __FUNCTION__,
384 new simgrid::instr::Pt2PtTIData("waitAll", -1, count_requests, ""))
385 int recvs_snd[count_requests];
386 int recvs_rcv[count_requests];
387 for (unsigned int i = 0; i < count_requests; i++) {
388 const auto& req = (*get_reqq_self())[i];
389 if (req && (req->flags() & RECV)) {
390 recvs_snd[i] = req->src();
391 recvs_rcv[i] = req->dst();
395 Request::waitall(count_requests, &(*get_reqq_self())[0], status);
397 for (unsigned i = 0; i < count_requests; i++) {
398 if (recvs_snd[i]!=-100) // NOTE(review): -100 sentinel for non-recv slots -- their initialization is not shown here; verify
399 TRACE_smpi_recv(recvs_snd[i], recvs_rcv[i],0);
401 TRACE_smpi_comm_out(my_proc_id_traced);
403 log_timed_action (action, clock);
// "barrier" action: collective barrier over MPI_COMM_WORLD.
406 static void action_barrier(const char *const *action){
407 double clock = smpi_process()->simulated_elapsed();
408 int my_proc_id = Actor::self()->getPid();
409 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::NoOpTIData("barrier"));
411 Colls::barrier(MPI_COMM_WORLD);
413 TRACE_smpi_comm_out(my_proc_id);
414 log_timed_action (action, clock);
// "bcast" action: broadcast of <size> elements from optional root (default 0).
// Fields: [2]=count, [3]=optional root, [4]=optional datatype id.
417 static void action_bcast(const char *const *action)
419 CHECK_ACTION_PARAMS(action, 1, 2)
420 double size = parse_double(action[2]);
421 double clock = smpi_process()->simulated_elapsed();
422 int root = (action[3]) ? atoi(action[3]) : 0;
423 /* Initialize MPI_CURRENT_TYPE in order to decrease the number of the checks */
424 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
426 MPI_CURRENT_TYPE = (action[3] && action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
428 int my_proc_id = Actor::self()->getPid();
429 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
430 new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(root)->getPid(), -1.0, size,
431 -1, encode_datatype(MPI_CURRENT_TYPE), ""));
433 void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size()); // shared scratch buffer; contents irrelevant
435 Colls::bcast(sendbuf, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
437 TRACE_smpi_comm_out(my_proc_id);
438 log_timed_action (action, clock);
// "reduce" action: reduce of <comm_size> elements to optional root, followed by
// <comp_size> flops of local computation. MPI_OP_NULL is used since only the
// communication/compute timing matters, not the reduced values.
441 static void action_reduce(const char *const *action)
443 CHECK_ACTION_PARAMS(action, 2, 2)
444 double comm_size = parse_double(action[2]);
445 double comp_size = parse_double(action[3]);
446 double clock = smpi_process()->simulated_elapsed();
447 int root = (action[4]) ? atoi(action[4]) : 0;
449 MPI_CURRENT_TYPE = (action[4] && action[5]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE;
451 int my_proc_id = Actor::self()->getPid();
452 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
453 new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(root)->getPid(), comp_size,
454 comm_size, -1, encode_datatype(MPI_CURRENT_TYPE), ""));
456 void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
457 void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
458 Colls::reduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
459 smpi_execute_flops(comp_size);
461 TRACE_smpi_comm_out(my_proc_id);
462 log_timed_action (action, clock);
// "allReduce" action: allreduce of <comm_size> elements plus <comp_size> flops of
// local computation; MPI_OP_NULL since only timing is simulated.
465 static void action_allReduce(const char *const *action) {
466 CHECK_ACTION_PARAMS(action, 2, 1)
467 double comm_size = parse_double(action[2]);
468 double comp_size = parse_double(action[3]);
470 MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
472 double clock = smpi_process()->simulated_elapsed();
473 int my_proc_id = Actor::self()->getPid();
474 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::CollTIData("allReduce", -1, comp_size, comm_size, -1,
475 encode_datatype(MPI_CURRENT_TYPE), ""));
477 void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
478 void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
479 Colls::allreduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
480 smpi_execute_flops(comp_size);
482 TRACE_smpi_comm_out(my_proc_id);
483 log_timed_action (action, clock);
// "allToAll" action: alltoall with per-peer send/recv counts taken from the trace.
486 static void action_allToAll(const char *const *action) {
487 CHECK_ACTION_PARAMS(action, 2, 2) //two mandatory (send and recv volumes) and two optional (corresponding datatypes)
488 double clock = smpi_process()->simulated_elapsed();
489 int comm_size = MPI_COMM_WORLD->size();
490 int send_size = parse_double(action[2]);
491 int recv_size = parse_double(action[3]);
492 MPI_CURRENT_TYPE = (action[4] && action[5]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
493 MPI_Datatype MPI_CURRENT_TYPE2{(action[4] && action[5]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE};
495 void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* MPI_CURRENT_TYPE->size());
496 void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
498 int my_proc_id = Actor::self()->getPid();
499 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
500 new simgrid::instr::CollTIData("allToAll", -1, -1.0, send_size, recv_size,
501 encode_datatype(MPI_CURRENT_TYPE),
502 encode_datatype(MPI_CURRENT_TYPE2)));
504 Colls::alltoall(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
506 TRACE_smpi_comm_out(my_proc_id);
507 log_timed_action (action, clock);
// "gather" action: gather <send_size> elements per rank to the root.
510 static void action_gather(const char *const *action) {
511 /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
514 1) 68 is the sendcounts
515 2) 68 is the recvcounts
516 3) 0 is the root node
517 4) 0 is the send datatype id, see decode_datatype()
518 5) 0 is the recv datatype id, see decode_datatype()
520 CHECK_ACTION_PARAMS(action, 2, 3)
521 double clock = smpi_process()->simulated_elapsed();
522 int comm_size = MPI_COMM_WORLD->size();
523 int send_size = parse_double(action[2]);
524 int recv_size = parse_double(action[3]);
525 MPI_CURRENT_TYPE = (action[5] && action[6]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE;
526 MPI_Datatype MPI_CURRENT_TYPE2{(action[5] && action[6]) ? decode_datatype(action[6]) : MPI_DEFAULT_TYPE};
528 void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
529 void *recv = nullptr; // recv buffer is only allocated where needed (root side)
530 int root = (action[4]) ? atoi(action[4]) : 0;
531 int rank = MPI_COMM_WORLD->rank();
534 recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
536 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
537 encode_datatype(MPI_CURRENT_TYPE),
538 encode_datatype(MPI_CURRENT_TYPE2)));
540 Colls::gather(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
542 TRACE_smpi_comm_out(Actor::self()->getPid());
543 log_timed_action (action, clock);
// "scatter" action: scatter from the root, <recv_size> elements per rank.
546 static void action_scatter(const char* const* action)
548 /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
551 1) 68 is the sendcounts
552 2) 68 is the recvcounts
553 3) 0 is the root node
554 4) 0 is the send datatype id, see decode_datatype()
555 5) 0 is the recv datatype id, see decode_datatype()
557 CHECK_ACTION_PARAMS(action, 2, 3)
558 double clock = smpi_process()->simulated_elapsed();
559 int comm_size = MPI_COMM_WORLD->size();
560 int send_size = parse_double(action[2]);
561 int recv_size = parse_double(action[3]);
562 MPI_CURRENT_TYPE = (action[5] && action[6]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE;
563 MPI_Datatype MPI_CURRENT_TYPE2{(action[5] && action[6]) ? decode_datatype(action[6]) : MPI_DEFAULT_TYPE};
565 void* send = smpi_get_tmp_sendbuffer(send_size * MPI_CURRENT_TYPE->size());
566 void* recv = nullptr;
567 int root = (action[4]) ? atoi(action[4]) : 0;
568 int rank = MPI_COMM_WORLD->rank();
571 recv = smpi_get_tmp_recvbuffer(recv_size * comm_size * MPI_CURRENT_TYPE2->size());
// NOTE(review): trace label below says "gather" although this replays a scatter -- looks like copy/paste; confirm intended.
573 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
574 encode_datatype(MPI_CURRENT_TYPE),
575 encode_datatype(MPI_CURRENT_TYPE2)));
577 Colls::scatter(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
579 TRACE_smpi_comm_out(Actor::self()->getPid());
580 log_timed_action(action, clock);
// "gatherV" action: gather with per-rank receive counts read from the trace line.
583 static void action_gatherv(const char *const *action) {
584 /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
585 0 gather 68 68 10 10 10 0 0 0
587 1) 68 is the sendcount
588 2) 68 10 10 10 is the recvcounts
589 3) 0 is the root node
590 4) 0 is the send datatype id, see decode_datatype()
591 5) 0 is the recv datatype id, see decode_datatype()
593 double clock = smpi_process()->simulated_elapsed();
594 int comm_size = MPI_COMM_WORLD->size();
595 CHECK_ACTION_PARAMS(action, comm_size+1, 2)
596 int send_size = parse_double(action[2]);
597 int disps[comm_size];
598 int recvcounts[comm_size];
602 (action[4 + comm_size] && action[5 + comm_size]) ? decode_datatype(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
603 MPI_Datatype MPI_CURRENT_TYPE2{
604 (action[4 + comm_size] && action[5 + comm_size]) ? decode_datatype(action[5 + comm_size]) : MPI_DEFAULT_TYPE};
606 void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
607 void *recv = nullptr;
608 for(int i=0;i<comm_size;i++) {
609 recvcounts[i] = atoi(action[i+3]);
610 recv_sum=recv_sum+recvcounts[i]; // total element count, used to size the root's recv buffer
614 int root = (action[3 + comm_size]) ? atoi(action[3 + comm_size]) : 0;
615 int rank = MPI_COMM_WORLD->rank();
618 recv = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
620 std::vector<int>* trace_recvcounts = new std::vector<int>(recvcounts, recvcounts + comm_size);
622 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData(
623 "gatherV", root, send_size, nullptr, -1, trace_recvcounts,
624 encode_datatype(MPI_CURRENT_TYPE), encode_datatype(MPI_CURRENT_TYPE2)));
626 Colls::gatherv(send, send_size, MPI_CURRENT_TYPE, recv, recvcounts, disps, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
628 TRACE_smpi_comm_out(Actor::self()->getPid());
629 log_timed_action (action, clock);
// "scatterV" action: scatter with per-rank send counts read from the trace line.
632 static void action_scatterv(const char* const* action)
634 /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
635 0 gather 68 10 10 10 68 0 0 0
637 1) 68 10 10 10 is the sendcounts
638 2) 68 is the recvcount
639 3) 0 is the root node
640 4) 0 is the send datatype id, see decode_datatype()
641 5) 0 is the recv datatype id, see decode_datatype()
643 double clock = smpi_process()->simulated_elapsed();
644 int comm_size = MPI_COMM_WORLD->size();
645 CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
646 int recv_size = parse_double(action[2 + comm_size]);
647 int disps[comm_size];
648 int sendcounts[comm_size];
652 (action[4 + comm_size] && action[5 + comm_size]) ? decode_datatype(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
653 MPI_Datatype MPI_CURRENT_TYPE2{
654 (action[4 + comm_size] && action[5 + comm_size]) ? decode_datatype(action[5 + comm_size]) : MPI_DEFAULT_TYPE};
656 void* send = nullptr;
657 void* recv = smpi_get_tmp_recvbuffer(recv_size * MPI_CURRENT_TYPE->size());
658 for (int i = 0; i < comm_size; i++) {
659 sendcounts[i] = atoi(action[i + 2]);
660 send_sum += sendcounts[i]; // total element count, used to size the root's send buffer
664 int root = (action[3 + comm_size]) ? atoi(action[3 + comm_size]) : 0;
665 int rank = MPI_COMM_WORLD->rank();
668 send = smpi_get_tmp_sendbuffer(send_sum * MPI_CURRENT_TYPE2->size());
670 std::vector<int>* trace_sendcounts = new std::vector<int>(sendcounts, sendcounts + comm_size);
// NOTE(review): trace label below says "gatherV" although this replays a scatterv -- looks like copy/paste; confirm intended.
672 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData(
673 "gatherV", root, -1, trace_sendcounts, recv_size, nullptr,
674 encode_datatype(MPI_CURRENT_TYPE), encode_datatype(MPI_CURRENT_TYPE2)));
676 Colls::scatterv(send, sendcounts, disps, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
678 TRACE_smpi_comm_out(Actor::self()->getPid());
679 log_timed_action(action, clock);
// "reduceScatter" action: reduce_scatter with per-rank recvcounts plus local flops.
682 static void action_reducescatter(const char *const *action) {
683 /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
684 0 reduceScatter 275427 275427 275427 204020 11346849 0
686 1) The first four values after the name of the action declare the recvcounts array
687 2) The value 11346849 is the amount of instructions
688 3) The last value corresponds to the datatype, see decode_datatype().
690 double clock = smpi_process()->simulated_elapsed();
691 int comm_size = MPI_COMM_WORLD->size();
692 CHECK_ACTION_PARAMS(action, comm_size+1, 1)
693 int comp_size = parse_double(action[2+comm_size]);
694 int recvcounts[comm_size];
695 int my_proc_id = Actor::self()->getPid();
697 std::vector<int>* trace_recvcounts = new std::vector<int>;
698 MPI_CURRENT_TYPE = (action[3 + comm_size]) ? decode_datatype(action[3 + comm_size]) : MPI_DEFAULT_TYPE;
700 for(int i=0;i<comm_size;i++) {
701 recvcounts[i] = atoi(action[i+2]);
702 trace_recvcounts->push_back(recvcounts[i]);
706 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
707 new simgrid::instr::VarCollTIData("reduceScatter", -1, 0, nullptr, -1, trace_recvcounts,
708 std::to_string(comp_size), /* ugly hack to print comp_size */
709 encode_datatype(MPI_CURRENT_TYPE)));
// NOTE(review): 'size' below has no visible declaration in this function -- presumably the summed recvcounts; verify.
711 void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
712 void *recvbuf = smpi_get_tmp_recvbuffer(size* MPI_CURRENT_TYPE->size());
714 Colls::reduce_scatter(sendbuf, recvbuf, recvcounts, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
715 smpi_execute_flops(comp_size);
717 TRACE_smpi_comm_out(my_proc_id);
718 log_timed_action (action, clock);
// "allGather" action: allgather with fixed send/recv counts and optional datatypes.
721 static void action_allgather(const char *const *action) {
722 /* The structure of the allgather action for the rank 0 (total 4 processes) is the following:
723 0 allGather 275427 275427
725 1) 275427 is the sendcount
726 2) 275427 is the recvcount
727 3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().
729 double clock = smpi_process()->simulated_elapsed();
731 CHECK_ACTION_PARAMS(action, 2, 2)
732 int sendcount=atoi(action[2]);
733 int recvcount=atoi(action[3]);
735 MPI_CURRENT_TYPE = (action[4] && action[5]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
736 MPI_Datatype MPI_CURRENT_TYPE2{(action[4] && action[5]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE};
738 void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
739 void *recvbuf = smpi_get_tmp_recvbuffer(recvcount* MPI_CURRENT_TYPE2->size());
741 int my_proc_id = Actor::self()->getPid();
743 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
744 new simgrid::instr::CollTIData("allGather", -1, -1.0, sendcount, recvcount,
745 encode_datatype(MPI_CURRENT_TYPE),
746 encode_datatype(MPI_CURRENT_TYPE2)));
748 Colls::allgather(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcount, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
750 TRACE_smpi_comm_out(my_proc_id);
751 log_timed_action (action, clock);
// "allGatherV" action: allgatherv with per-rank receive counts from the trace line.
754 static void action_allgatherv(const char *const *action) {
755 /* The structure of the allgatherv action for the rank 0 (total 4 processes) is the following:
756 0 allGatherV 275427 275427 275427 275427 204020
758 1) 275427 is the sendcount
759 2) The next four elements declare the recvcounts array
760 3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().
762 double clock = smpi_process()->simulated_elapsed();
764 int comm_size = MPI_COMM_WORLD->size();
765 CHECK_ACTION_PARAMS(action, comm_size+1, 2)
766 int sendcount=atoi(action[2]);
767 int recvcounts[comm_size];
768 int disps[comm_size];
772 (action[3 + comm_size] && action[4 + comm_size]) ? decode_datatype(action[3 + comm_size]) : MPI_DEFAULT_TYPE;
773 MPI_Datatype MPI_CURRENT_TYPE2{
774 (action[3 + comm_size] && action[4 + comm_size]) ? decode_datatype(action[4 + comm_size]) : MPI_DEFAULT_TYPE};
776 void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
778 for(int i=0;i<comm_size;i++) {
779 recvcounts[i] = atoi(action[i+3]);
780 recv_sum=recv_sum+recvcounts[i]; // total element count, used to size the recv buffer
783 void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
785 int my_proc_id = Actor::self()->getPid();
787 std::vector<int>* trace_recvcounts = new std::vector<int>(recvcounts, recvcounts + comm_size);
789 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
790 new simgrid::instr::VarCollTIData("allGatherV", -1, sendcount, nullptr, -1, trace_recvcounts,
791 encode_datatype(MPI_CURRENT_TYPE),
792 encode_datatype(MPI_CURRENT_TYPE2)));
794 Colls::allgatherv(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts, disps, MPI_CURRENT_TYPE2,
797 TRACE_smpi_comm_out(my_proc_id);
798 log_timed_action (action, clock);
// "allToAllV" action: alltoallv with per-rank send and recv count arrays from the trace.
801 static void action_allToAllv(const char *const *action) {
802 /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
803 0 allToAllV 100 1 7 10 12 100 1 70 10 5
805 1) 100 is the size of the send buffer *sizeof(int),
806 2) 1 7 10 12 is the sendcounts array
807 3) 100*sizeof(int) is the size of the receiver buffer
808 4) 1 70 10 5 is the recvcounts array
810 double clock = smpi_process()->simulated_elapsed();
812 int comm_size = MPI_COMM_WORLD->size();
813 CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
816 int sendcounts[comm_size];
817 std::vector<int>* trace_sendcounts = new std::vector<int>;
818 int recvcounts[comm_size];
819 std::vector<int>* trace_recvcounts = new std::vector<int>;
820 int senddisps[comm_size];
821 int recvdisps[comm_size];
823 MPI_CURRENT_TYPE = (action[4 + 2 * comm_size] && action[5 + 2 * comm_size])
824 ? decode_datatype(action[4 + 2 * comm_size])
826 MPI_Datatype MPI_CURRENT_TYPE2{(action[4 + 2 * comm_size] && action[5 + 2 * comm_size])
827 ? decode_datatype(action[5 + 2 * comm_size])
830 int send_buf_size=parse_double(action[2]);
831 int recv_buf_size=parse_double(action[3+comm_size]);
832 int my_proc_id = Actor::self()->getPid();
833 void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* MPI_CURRENT_TYPE->size());
834 void *recvbuf = smpi_get_tmp_recvbuffer(recv_buf_size* MPI_CURRENT_TYPE2->size());
836 for(int i=0;i<comm_size;i++) {
837 sendcounts[i] = atoi(action[i+3]);
838 trace_sendcounts->push_back(sendcounts[i]);
839 send_size += sendcounts[i];
840 recvcounts[i] = atoi(action[i+4+comm_size]);
841 trace_recvcounts->push_back(recvcounts[i]);
842 recv_size += recvcounts[i];
847 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
848 new simgrid::instr::VarCollTIData("allToAllV", -1, send_size, trace_sendcounts, recv_size,
849 trace_recvcounts, encode_datatype(MPI_CURRENT_TYPE),
850 encode_datatype(MPI_CURRENT_TYPE2)));
// NOTE(review): the recv datatype passed below is MPI_CURRENT_TYPE, while recvbuf was sized with
// MPI_CURRENT_TYPE2 -- likely should be MPI_CURRENT_TYPE2; confirm against the trace semantics.
852 Colls::alltoallv(sendbuf, sendcounts, senddisps, MPI_CURRENT_TYPE,recvbuf, recvcounts, recvdisps,
853 MPI_CURRENT_TYPE, MPI_COMM_WORLD);
855 TRACE_smpi_comm_out(my_proc_id);
856 log_timed_action (action, clock);
859 }} // namespace simgrid::smpi
861 /** @brief Only initialize the replay, don't do it for real */
// Marks the process as initialized and replaying, sets up tracing, registers every
// action handler with the xbt replay engine, then honors an optional delayed start.
862 void smpi_replay_init(int* argc, char*** argv)
864 simgrid::smpi::Process::init(argc, argv);
865 smpi_process()->mark_as_initialized();
866 smpi_process()->set_replaying(true);
868 int my_proc_id = Actor::self()->getPid();
869 TRACE_smpi_init(my_proc_id);
870 TRACE_smpi_computing_init(my_proc_id);
871 TRACE_smpi_comm_in(my_proc_id, "smpi_replay_run_init", new simgrid::instr::NoOpTIData("init"));
872 TRACE_smpi_comm_out(my_proc_id);
873 xbt_replay_action_register("init", simgrid::smpi::action_init);
874 xbt_replay_action_register("finalize", simgrid::smpi::action_finalize);
875 xbt_replay_action_register("comm_size", simgrid::smpi::action_comm_size);
876 xbt_replay_action_register("comm_split", simgrid::smpi::action_comm_split);
877 xbt_replay_action_register("comm_dup", simgrid::smpi::action_comm_dup);
878 xbt_replay_action_register("send", simgrid::smpi::action_send);
879 xbt_replay_action_register("Isend", simgrid::smpi::action_Isend);
880 xbt_replay_action_register("recv", simgrid::smpi::action_recv);
881 xbt_replay_action_register("Irecv", simgrid::smpi::action_Irecv);
882 xbt_replay_action_register("test", simgrid::smpi::action_test);
883 xbt_replay_action_register("wait", simgrid::smpi::action_wait);
884 xbt_replay_action_register("waitAll", simgrid::smpi::action_waitall);
885 xbt_replay_action_register("barrier", simgrid::smpi::action_barrier);
886 xbt_replay_action_register("bcast", simgrid::smpi::action_bcast);
887 xbt_replay_action_register("reduce", simgrid::smpi::action_reduce);
888 xbt_replay_action_register("allReduce", simgrid::smpi::action_allReduce);
889 xbt_replay_action_register("allToAll", simgrid::smpi::action_allToAll);
890 xbt_replay_action_register("allToAllV", simgrid::smpi::action_allToAllv);
891 xbt_replay_action_register("gather", simgrid::smpi::action_gather);
892 xbt_replay_action_register("scatter", simgrid::smpi::action_scatter);
893 xbt_replay_action_register("gatherV", simgrid::smpi::action_gatherv);
894 xbt_replay_action_register("scatterV", simgrid::smpi::action_scatterv);
895 xbt_replay_action_register("allGather", simgrid::smpi::action_allgather);
896 xbt_replay_action_register("allGatherV", simgrid::smpi::action_allgatherv);
897 xbt_replay_action_register("reduceScatter", simgrid::smpi::action_reducescatter);
898 xbt_replay_action_register("compute", simgrid::smpi::action_compute);
900 //if we have a delayed start, sleep here.
902 double value = xbt_str_parse_double((*argv)[2], "%s is not a double");
903 XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
904 smpi_execute_flops(value);
906 //UGLY: force a context switch to be sure that all MSG_processes begin initialization
907 XBT_DEBUG("Force context switch by smpi_execute_flops - Sleeping for 0.0 flops ");
908 smpi_execute_flops(0.0);
912 /** @brief actually run the replay after initialization */
// Replays this actor's trace, waits out any requests still pending at the end,
// and finalizes; the last process alive reports the total simulated time and
// frees the shared scratch buffers.
913 void smpi_replay_main(int* argc, char*** argv)
915 simgrid::xbt::replay_runner(*argc, *argv);
917 /* and now, finalize everything */
918 /* One active process will stop. Decrease the counter*/
919 XBT_DEBUG("There are %zu elements in reqq[*]", get_reqq_self()->size());
920 if (not get_reqq_self()->empty()) {
921 unsigned int count_requests=get_reqq_self()->size();
922 MPI_Request requests[count_requests];
923 MPI_Status status[count_requests];
926 for (auto const& req : *get_reqq_self()) {
930 simgrid::smpi::Request::waitall(count_requests, requests, status);
932 delete get_reqq_self();
935 if(active_processes==0){
936 /* Last process alive speaking: end the simulated timer */
937 XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
938 xbt_free(sendbuffer);
939 xbt_free(recvbuffer);
942 TRACE_smpi_comm_in(Actor::self()->getPid(), "smpi_replay_run_finalize", new simgrid::instr::NoOpTIData("finalize"));
944 smpi_process()->finalize();
946 TRACE_smpi_comm_out(Actor::self()->getPid());
947 TRACE_smpi_finalize(Actor::self()->getPid());
950 /** @brief chain a replay initialization and a replay start */
951 void smpi_replay_run(int* argc, char*** argv)
953 smpi_replay_init(argc, argv);
954 smpi_replay_main(argc, argv);