1 /* Copyright (c) 2009-2017. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
7 #include "smpi_coll.hpp"
8 #include "smpi_comm.hpp"
9 #include "smpi_datatype.hpp"
10 #include "smpi_group.hpp"
11 #include "smpi_process.hpp"
12 #include "smpi_request.hpp"
13 #include "xbt/replay.hpp"
15 #include <boost/algorithm/string/join.hpp>
18 #include <unordered_map>
// Bring the s4u Actor class into scope; this file calls Actor::self()->getPid() to identify the calling process.
21 using simgrid::s4u::Actor;
// Logging sub-category "smpi_replay" (child of "smpi") used by the XBT_* log macros of this file.
23 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
// Value parsed from the most recently replayed "comm_size" action.
25 static int communicator_size = 0;
// Initialised from smpi_process_count() by action_init and compared against 0 in smpi_replay_main.
26 static int active_processes = 0;
// Queue of outstanding requests of each replaying process, keyed by Actor::self()->getPid().
// Entries are created through set_reqq_self() and the vector is deleted in smpi_replay_main().
27 static std::unordered_map<int, std::vector<MPI_Request>*> reqq;
// Datatype used when an action line does not name one; selected in action_init.
29 static MPI_Datatype MPI_DEFAULT_TYPE;
// Argument-count guard used at the top of the action handlers.
// Throws arg_error (via THROWF) when the action line has fewer than `mandatory + 2` items; the first two
// items of every line are the process id and the action name. `optional` is only used in the error message.
// The handlers invoke it without a trailing semicolon.
// NOTE(review): `mandatory` is pasted unparenthesised into `mandatory + 2`; callers pass expressions such as
// `comm_size+1`, which is fine for additions but fragile for other operators.
// NOTE(review): the lines holding the macro's enclosing braces are not visible in this view of the file.
31 #define CHECK_ACTION_PARAMS(action, mandatory, optional) \
33 if (action.size() < static_cast<unsigned long>(mandatory + 2)) \
34 THROWF(arg_error, 0, "%s replay failed.\n" \
35 "%lu items were given on the line. First two should be process_id and action. " \
36 "This action needs after them %lu mandatory arguments, and accepts %lu optional ones. \n" \
37 "Please contact the Simgrid team if support is needed", \
38 __FUNCTION__, action.size(), static_cast<unsigned long>(mandatory), \
39 static_cast<unsigned long>(optional)); \
42 static void log_timed_action (simgrid::xbt::ReplayAction& action, double clock){
43 if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
44 std::string s = boost::algorithm::join(action, " ");
45 XBT_VERB("%s %f", s.c_str(), smpi_process()->simulated_elapsed()-clock);
49 static std::vector<MPI_Request>* get_reqq_self()
51 return reqq.at(Actor::self()->getPid());
54 static void set_reqq_self(std::vector<MPI_Request> *mpi_request)
56 reqq.insert({Actor::self()->getPid(), mpi_request});
60 static double parse_double(std::string string)
62 return xbt_str_parse_double(string.c_str(), "%s is not a double");
68 static void action_init(simgrid::xbt::ReplayAction& action)
70 XBT_DEBUG("Initialize the counters");
71 CHECK_ACTION_PARAMS(action, 0, 1)
72 if (action.size() > 2)
73 MPI_DEFAULT_TYPE = MPI_DOUBLE; // default MPE datatype
75 MPI_DEFAULT_TYPE = MPI_BYTE; // default TAU datatype
77 /* start a simulated timer */
78 smpi_process()->simulated_start();
79 /*initialize the number of active processes */
80 active_processes = smpi_process_count();
82 set_reqq_self(new std::vector<MPI_Request>);
// Replay handler registered for the "finalize" action (see smpi_replay_init).
// NOTE(review): the body of this function is not visible in this view of the file.
85 static void action_finalize(simgrid::xbt::ReplayAction& action)
90 static void action_comm_size(simgrid::xbt::ReplayAction& action)
92 communicator_size = parse_double(action[2]);
93 log_timed_action (action, smpi_process()->simulated_elapsed());
96 static void action_comm_split(simgrid::xbt::ReplayAction& action)
98 log_timed_action (action, smpi_process()->simulated_elapsed());
101 static void action_comm_dup(simgrid::xbt::ReplayAction& action)
103 log_timed_action (action, smpi_process()->simulated_elapsed());
106 static void action_compute(simgrid::xbt::ReplayAction& action)
108 CHECK_ACTION_PARAMS(action, 1, 0)
109 double clock = smpi_process()->simulated_elapsed();
110 double flops= parse_double(action[2]);
111 int my_proc_id = Actor::self()->getPid();
113 TRACE_smpi_computing_in(my_proc_id, flops);
114 smpi_execute_flops(flops);
115 TRACE_smpi_computing_out(my_proc_id);
117 log_timed_action (action, clock);
120 static void action_send(simgrid::xbt::ReplayAction& action)
122 CHECK_ACTION_PARAMS(action, 2, 1)
123 int to = std::stoi(action[2]);
124 double size=parse_double(action[3]);
125 double clock = smpi_process()->simulated_elapsed();
127 MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 4) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE;
129 int my_proc_id = Actor::self()->getPid();
130 int dst_traced = MPI_COMM_WORLD->group()->actor(to)->getPid();
132 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
133 new simgrid::instr::Pt2PtTIData("send", to, size, Datatype::encode(MPI_CURRENT_TYPE)));
134 if (not TRACE_smpi_view_internals())
135 TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, size * MPI_CURRENT_TYPE->size());
137 Request::send(nullptr, size, MPI_CURRENT_TYPE, to , 0, MPI_COMM_WORLD);
139 TRACE_smpi_comm_out(my_proc_id);
141 log_timed_action(action, clock);
144 static void action_Isend(simgrid::xbt::ReplayAction& action)
146 CHECK_ACTION_PARAMS(action, 2, 1)
147 int to = std::stoi(action[2]);
148 double size=parse_double(action[3]);
149 double clock = smpi_process()->simulated_elapsed();
151 MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 4) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE;
153 int my_proc_id = Actor::self()->getPid();
154 int dst_traced = MPI_COMM_WORLD->group()->actor(to)->getPid();
155 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
156 new simgrid::instr::Pt2PtTIData("Isend", to, size, Datatype::encode(MPI_CURRENT_TYPE)));
157 if (not TRACE_smpi_view_internals())
158 TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, size * MPI_CURRENT_TYPE->size());
160 MPI_Request request = Request::isend(nullptr, size, MPI_CURRENT_TYPE, to, 0, MPI_COMM_WORLD);
162 TRACE_smpi_comm_out(my_proc_id);
164 get_reqq_self()->push_back(request);
166 log_timed_action (action, clock);
// Replay handler for "recv": blocking receive on MPI_COMM_WORLD with tag 0 and no payload buffer.
// action[2] is the source rank, action[3] the element count and the optional action[4] the datatype id
// (MPI_DEFAULT_TYPE when absent).
// NOTE(review): several short lines are not visible in this view of the file, among them the declaration
// of `status` and whatever guards the probe below — confirm against the complete source.
169 static void action_recv(simgrid::xbt::ReplayAction& action)
171 CHECK_ACTION_PARAMS(action, 2, 1)
172 int from = std::stoi(action[2]);
173 double size=parse_double(action[3]);
174 double clock = smpi_process()->simulated_elapsed();
177 MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 4) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE;
179 int my_proc_id = Actor::self()->getPid();
// pid of the sender, used only for the trace record emitted after the receive
180 int src_traced = MPI_COMM_WORLD->group()->actor(from)->getPid();
182 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
183 new simgrid::instr::Pt2PtTIData("recv", from, size, Datatype::encode(MPI_CURRENT_TYPE)));
185 //unknown size from the receiver point of view
187 Request::probe(from, 0, MPI_COMM_WORLD, &status);
191 Request::recv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD, &status);
193 TRACE_smpi_comm_out(my_proc_id);
194 if (not TRACE_smpi_view_internals()) {
195 TRACE_smpi_recv(src_traced, my_proc_id, 0);
198 log_timed_action (action, clock);
// Replay handler for "Irecv": non-blocking receive on MPI_COMM_WORLD with tag 0; the resulting request is
// appended to the request queue of the calling actor.
// action[2] is the source rank, action[3] the element count and the optional action[4] the datatype id
// (MPI_DEFAULT_TYPE when absent).
// NOTE(review): several short lines are not visible in this view of the file, among them the declaration
// of `status` and whatever guards the probe below — confirm against the complete source.
201 static void action_Irecv(simgrid::xbt::ReplayAction& action)
203 CHECK_ACTION_PARAMS(action, 2, 1)
204 int from = std::stoi(action[2]);
205 double size=parse_double(action[3]);
206 double clock = smpi_process()->simulated_elapsed();
208 MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 4) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE;
210 int my_proc_id = Actor::self()->getPid();
211 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
212 new simgrid::instr::Pt2PtTIData("Irecv", from, size, Datatype::encode(MPI_CURRENT_TYPE)));
214 //unknown size from the receiver point of view
216 Request::probe(from, 0, MPI_COMM_WORLD, &status);
220 MPI_Request request = Request::irecv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD);
222 TRACE_smpi_comm_out(my_proc_id);
// keep the request so that a later test/wait/waitAll action can complete it
223 get_reqq_self()->push_back(request);
225 log_timed_action (action, clock);
228 static void action_test(simgrid::xbt::ReplayAction& action)
230 CHECK_ACTION_PARAMS(action, 0, 0)
231 double clock = smpi_process()->simulated_elapsed();
234 MPI_Request request = get_reqq_self()->back();
235 get_reqq_self()->pop_back();
236 //if request is null here, this may mean that a previous test has succeeded
237 //Different times in traced application and replayed version may lead to this
238 //In this case, ignore the extra calls.
239 if(request!=nullptr){
240 int my_proc_id = Actor::self()->getPid();
241 TRACE_smpi_testing_in(my_proc_id);
243 int flag = Request::test(&request, &status);
245 XBT_DEBUG("MPI_Test result: %d", flag);
246 /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now nullptr.*/
247 get_reqq_self()->push_back(request);
249 TRACE_smpi_testing_out(my_proc_id);
251 log_timed_action (action, clock);
// Replay handler for "wait": pops the most recently queued request of the calling actor and waits for it.
// The action takes no argument besides process id and action name. The queue must not be empty
// (enforced by xbt_assert below).
// NOTE(review): several short lines are not visible in this view of the file, among them the declaration
// of `status` and the body of the `request==nullptr` branch — confirm against the complete source.
254 static void action_wait(simgrid::xbt::ReplayAction& action)
256 CHECK_ACTION_PARAMS(action, 0, 0)
257 double clock = smpi_process()->simulated_elapsed();
// the joined action line is only used in the assertion message
260 std::string s = boost::algorithm::join(action, " ");
261 xbt_assert(get_reqq_self()->size(), "action wait not preceded by any irecv or isend: %s", s.c_str());
262 MPI_Request request = get_reqq_self()->back();
263 get_reqq_self()->pop_back();
265 if (request==nullptr){
266 /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just return.*/
// NOTE(review): the next line tolerates request->comm() == MPI_COMM_NULL, but the line after it calls
// request->comm()->group() unconditionally — the two are inconsistent.
270 int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
272 MPI_Group group = request->comm()->group();
// values needed for the trace record are read before Request::wait is called on the request
273 int src_traced = group->rank(request->src());
274 int dst_traced = group->rank(request->dst());
275 int is_wait_for_receive = (request->flags() & RECV);
276 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::NoOpTIData("wait"));
278 Request::wait(&request, &status);
280 TRACE_smpi_comm_out(rank);
281 if (is_wait_for_receive)
282 TRACE_smpi_recv(src_traced, dst_traced, 0);
283 log_timed_action (action, clock);
// Replay handler for "waitAll": waits for every request currently queued for the calling actor.
// The action takes no argument besides process id and action name; nothing but logging happens when the
// queue is empty.
// NOTE(review): `status`, `recvs_snd` and `recvs_rcv` are arrays whose size is a run-time value, which is
// a compiler extension in C++.
// NOTE(review): several short lines are not visible in this view of the file; in particular the place
// where the -100 sentinel tested below is stored — confirm against the complete source.
286 static void action_waitall(simgrid::xbt::ReplayAction& action)
288 CHECK_ACTION_PARAMS(action, 0, 0)
289 double clock = smpi_process()->simulated_elapsed();
290 const unsigned int count_requests = get_reqq_self()->size();
292 if (count_requests>0) {
293 MPI_Status status[count_requests];
295 int my_proc_id_traced = Actor::self()->getPid();
296 TRACE_smpi_comm_in(my_proc_id_traced, __FUNCTION__,
297 new simgrid::instr::Pt2PtTIData("waitAll", -1, count_requests, ""));
// source/destination of each receive request, remembered before waitall for the trace records below
298 int recvs_snd[count_requests];
299 int recvs_rcv[count_requests];
300 for (unsigned int i = 0; i < count_requests; i++) {
301 const auto& req = (*get_reqq_self())[i];
302 if (req && (req->flags() & RECV)) {
303 recvs_snd[i] = req->src();
304 recvs_rcv[i] = req->dst();
308 Request::waitall(count_requests, &(*get_reqq_self())[0], status);
// emit a receive trace record for every entry that is not marked with the -100 sentinel
310 for (unsigned i = 0; i < count_requests; i++) {
311 if (recvs_snd[i]!=-100)
312 TRACE_smpi_recv(recvs_snd[i], recvs_rcv[i],0);
314 TRACE_smpi_comm_out(my_proc_id_traced);
316 log_timed_action (action, clock);
319 static void action_barrier(simgrid::xbt::ReplayAction& action)
321 double clock = smpi_process()->simulated_elapsed();
322 int my_proc_id = Actor::self()->getPid();
323 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::NoOpTIData("barrier"));
325 Colls::barrier(MPI_COMM_WORLD);
327 TRACE_smpi_comm_out(my_proc_id);
328 log_timed_action (action, clock);
331 static void action_bcast(simgrid::xbt::ReplayAction& action)
333 CHECK_ACTION_PARAMS(action, 1, 2)
334 double size = parse_double(action[2]);
335 double clock = smpi_process()->simulated_elapsed();
336 int root = (action.size() > 3) ? std::stoi(action[3]) : 0;
337 /* Initialize MPI_CURRENT_TYPE in order to decrease the number of the checks */
338 MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 4) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE;
340 int my_proc_id = Actor::self()->getPid();
341 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
342 new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(root)->getPid(), -1.0, size,
343 -1, Datatype::encode(MPI_CURRENT_TYPE), ""));
345 void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
347 Colls::bcast(sendbuf, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
349 TRACE_smpi_comm_out(my_proc_id);
350 log_timed_action (action, clock);
353 static void action_reduce(simgrid::xbt::ReplayAction& action)
355 CHECK_ACTION_PARAMS(action, 2, 2)
356 double comm_size = parse_double(action[2]);
357 double comp_size = parse_double(action[3]);
358 double clock = smpi_process()->simulated_elapsed();
359 int root = (action.size() > 4) ? std::stoi(action[4]) : 0;
361 MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 5) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE;
363 int my_proc_id = Actor::self()->getPid();
364 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
365 new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(root)->getPid(), comp_size,
366 comm_size, -1, Datatype::encode(MPI_CURRENT_TYPE), ""));
368 void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
369 void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
370 Colls::reduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
371 smpi_execute_flops(comp_size);
373 TRACE_smpi_comm_out(my_proc_id);
374 log_timed_action (action, clock);
377 static void action_allReduce(simgrid::xbt::ReplayAction& action)
379 CHECK_ACTION_PARAMS(action, 2, 1)
380 double comm_size = parse_double(action[2]);
381 double comp_size = parse_double(action[3]);
383 MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 4) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE;
385 double clock = smpi_process()->simulated_elapsed();
386 int my_proc_id = Actor::self()->getPid();
387 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::CollTIData("allReduce", -1, comp_size, comm_size, -1,
388 Datatype::encode(MPI_CURRENT_TYPE), ""));
390 void *recvbuf = smpi_get_tmp_recvbuffer(comm_size* MPI_CURRENT_TYPE->size());
391 void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
392 Colls::allreduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
393 smpi_execute_flops(comp_size);
395 TRACE_smpi_comm_out(my_proc_id);
396 log_timed_action (action, clock);
399 static void action_allToAll(simgrid::xbt::ReplayAction& action)
401 CHECK_ACTION_PARAMS(action, 2, 2) //two mandatory (send and recv volumes) and two optional (corresponding datatypes)
402 double clock = smpi_process()->simulated_elapsed();
403 unsigned long comm_size = MPI_COMM_WORLD->size();
404 int send_size = parse_double(action[2]);
405 int recv_size = parse_double(action[3]);
406 MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 5) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE};
407 MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE};
409 void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* MPI_CURRENT_TYPE->size());
410 void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
412 int my_proc_id = Actor::self()->getPid();
413 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
414 new simgrid::instr::CollTIData("allToAll", -1, -1.0, send_size, recv_size,
415 Datatype::encode(MPI_CURRENT_TYPE),
416 Datatype::encode(MPI_CURRENT_TYPE2)));
418 Colls::alltoall(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
420 TRACE_smpi_comm_out(my_proc_id);
421 log_timed_action (action, clock);
// Replay handler for "gather": gather on MPI_COMM_WORLD.
// action[2] is the send count, action[3] the receive count, the optional action[4] the root rank (0 when
// absent); action[5] and action[6] are the send/receive datatype ids and are only honoured when both are
// present, otherwise MPI_DEFAULT_TYPE is used for both.
// NOTE(review): the trace is opened with `rank` (MPI_COMM_WORLD->rank()) but closed with
// Actor::self()->getPid() — confirm both identify the same trace container.
// NOTE(review): several short lines are not visible in this view of the file (the example line and the
// end of the comment below, and whatever guards the allocation of `recv`) — confirm against the complete
// source.
424 static void action_gather(simgrid::xbt::ReplayAction& action)
426 /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
429 1) 68 is the sendcounts
430 2) 68 is the recvcounts
431 3) 0 is the root node
432 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
433 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
435 CHECK_ACTION_PARAMS(action, 2, 3)
436 double clock = smpi_process()->simulated_elapsed();
437 unsigned long comm_size = MPI_COMM_WORLD->size();
438 int send_size = parse_double(action[2]);
439 int recv_size = parse_double(action[3]);
440 MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE};
441 MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[6]) : MPI_DEFAULT_TYPE};
443 void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
444 void *recv = nullptr;
445 int root = (action.size() > 4) ? std::stoi(action[4]) : 0;
446 int rank = MPI_COMM_WORLD->rank();
449 recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
451 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
452 Datatype::encode(MPI_CURRENT_TYPE),
453 Datatype::encode(MPI_CURRENT_TYPE2)));
455 Colls::gather(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
457 TRACE_smpi_comm_out(Actor::self()->getPid());
458 log_timed_action (action, clock);
461 static void action_scatter(simgrid::xbt::ReplayAction& action)
463 /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
466 1) 68 is the sendcounts
467 2) 68 is the recvcounts
468 3) 0 is the root node
469 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
470 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
472 CHECK_ACTION_PARAMS(action, 2, 3)
473 double clock = smpi_process()->simulated_elapsed();
474 unsigned long comm_size = MPI_COMM_WORLD->size();
475 int send_size = parse_double(action[2]);
476 int recv_size = parse_double(action[3]);
477 MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE};
478 MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[6]) : MPI_DEFAULT_TYPE};
480 void* send = smpi_get_tmp_sendbuffer(send_size * MPI_CURRENT_TYPE->size());
481 void* recv = nullptr;
482 int root = (action.size() > 4) ? std::stoi(action[4]) : 0;
483 int rank = MPI_COMM_WORLD->rank();
486 recv = smpi_get_tmp_recvbuffer(recv_size * comm_size * MPI_CURRENT_TYPE2->size());
488 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
489 Datatype::encode(MPI_CURRENT_TYPE),
490 Datatype::encode(MPI_CURRENT_TYPE2)));
492 Colls::scatter(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
494 TRACE_smpi_comm_out(Actor::self()->getPid());
495 log_timed_action(action, clock);
// Replay handler for "gatherV": gatherv on MPI_COMM_WORLD with all displacements set to 0.
// With n = MPI_COMM_WORLD->size(): action[2] is the send count, action[3 .. 2+n] the receive counts, the
// optional action[3+n] the root rank (0 when absent); action[4+n] and action[5+n] are the send/receive
// datatype ids and are only honoured when both are present.
// NOTE(review): the example line in the comment below spells the action "gather" although this handler is
// registered as "gatherV".
// NOTE(review): several short lines are not visible in this view of the file (end of the comment below,
// whatever guards the allocation of `recv`, and the end of the Colls::gatherv call) — confirm against the
// complete source.
498 static void action_gatherv(simgrid::xbt::ReplayAction& action)
500 /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
501 0 gather 68 68 10 10 10 0 0 0
503 1) 68 is the sendcount
504 2) 68 10 10 10 is the recvcounts
505 3) 0 is the root node
506 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
507 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
509 double clock = smpi_process()->simulated_elapsed();
510 unsigned long comm_size = MPI_COMM_WORLD->size();
511 CHECK_ACTION_PARAMS(action, comm_size+1, 2)
512 int send_size = parse_double(action[2]);
513 std::vector<int> disps(comm_size, 0);
514 std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
516 MPI_Datatype MPI_CURRENT_TYPE =
517 (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
518 MPI_Datatype MPI_CURRENT_TYPE2{
519 (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[5 + comm_size]) : MPI_DEFAULT_TYPE};
521 void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
522 void *recv = nullptr;
523 for (unsigned int i = 0; i < comm_size; i++) {
524 (*recvcounts)[i] = std::stoi(action[i + 3]);
526 int recv_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
528 int root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
529 int rank = MPI_COMM_WORLD->rank();
532 recv = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
534 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData(
535 "gatherV", root, send_size, nullptr, -1, recvcounts,
536 Datatype::encode(MPI_CURRENT_TYPE), Datatype::encode(MPI_CURRENT_TYPE2)));
538 Colls::gatherv(send, send_size, MPI_CURRENT_TYPE, recv, recvcounts->data(), disps.data(), MPI_CURRENT_TYPE2, root,
541 TRACE_smpi_comm_out(Actor::self()->getPid());
542 log_timed_action (action, clock);
545 static void action_scatterv(simgrid::xbt::ReplayAction& action)
547 /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
548 0 gather 68 10 10 10 68 0 0 0
550 1) 68 10 10 10 is the sendcounts
551 2) 68 is the recvcount
552 3) 0 is the root node
553 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
554 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
556 double clock = smpi_process()->simulated_elapsed();
557 unsigned long comm_size = MPI_COMM_WORLD->size();
558 CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
559 int recv_size = parse_double(action[2 + comm_size]);
560 std::vector<int> disps(comm_size, 0);
561 std::shared_ptr<std::vector<int>> sendcounts(new std::vector<int>(comm_size));
563 MPI_Datatype MPI_CURRENT_TYPE =
564 (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
565 MPI_Datatype MPI_CURRENT_TYPE2{
566 (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[5 + comm_size]) : MPI_DEFAULT_TYPE};
568 void* send = nullptr;
569 void* recv = smpi_get_tmp_recvbuffer(recv_size * MPI_CURRENT_TYPE->size());
570 for (unsigned int i = 0; i < comm_size; i++) {
571 (*sendcounts)[i] = std::stoi(action[i + 2]);
573 int send_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
575 int root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
576 int rank = MPI_COMM_WORLD->rank();
579 send = smpi_get_tmp_sendbuffer(send_sum * MPI_CURRENT_TYPE2->size());
581 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData("gatherV", root, -1, sendcounts, recv_size,
582 nullptr, Datatype::encode(MPI_CURRENT_TYPE),
583 Datatype::encode(MPI_CURRENT_TYPE2)));
585 Colls::scatterv(send, sendcounts->data(), disps.data(), MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root,
588 TRACE_smpi_comm_out(Actor::self()->getPid());
589 log_timed_action(action, clock);
592 static void action_reducescatter(simgrid::xbt::ReplayAction& action)
594 /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
595 0 reduceScatter 275427 275427 275427 204020 11346849 0
597 1) The first four values after the name of the action declare the recvcounts array
598 2) The value 11346849 is the amount of instructions
599 3) The last value corresponds to the datatype, see simgrid::smpi::Datatype::decode().
601 double clock = smpi_process()->simulated_elapsed();
602 unsigned long comm_size = MPI_COMM_WORLD->size();
603 CHECK_ACTION_PARAMS(action, comm_size+1, 1)
604 int comp_size = parse_double(action[2+comm_size]);
605 int my_proc_id = Actor::self()->getPid();
606 std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>);
607 MPI_Datatype MPI_CURRENT_TYPE =
608 (action.size() > 3 + comm_size) ? simgrid::smpi::Datatype::decode(action[3 + comm_size]) : MPI_DEFAULT_TYPE;
610 for (unsigned int i = 0; i < comm_size; i++) {
611 recvcounts->push_back(std::stoi(action[i + 2]));
613 int size{std::accumulate(recvcounts->begin(), recvcounts->end(), 0)};
615 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
616 new simgrid::instr::VarCollTIData("reduceScatter", -1, 0, nullptr, -1, recvcounts,
617 std::to_string(comp_size), /* ugly hack to print comp_size */
618 Datatype::encode(MPI_CURRENT_TYPE)));
620 void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
621 void *recvbuf = smpi_get_tmp_recvbuffer(size* MPI_CURRENT_TYPE->size());
623 Colls::reduce_scatter(sendbuf, recvbuf, recvcounts->data(), MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
624 smpi_execute_flops(comp_size);
626 TRACE_smpi_comm_out(my_proc_id);
627 log_timed_action (action, clock);
630 static void action_allgather(simgrid::xbt::ReplayAction& action)
632 /* The structure of the allgather action for the rank 0 (total 4 processes) is the following:
633 0 allGather 275427 275427
635 1) 275427 is the sendcount
636 2) 275427 is the recvcount
637 3) No more values mean that the datatype for sent and receive buffer is the default one, see
638 simgrid::smpi::Datatype::decode().
640 double clock = smpi_process()->simulated_elapsed();
642 CHECK_ACTION_PARAMS(action, 2, 2)
643 int sendcount = std::stoi(action[2]);
644 int recvcount = std::stoi(action[3]);
646 MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 5) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE};
647 MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE};
649 void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
650 void *recvbuf = smpi_get_tmp_recvbuffer(recvcount* MPI_CURRENT_TYPE2->size());
652 int my_proc_id = Actor::self()->getPid();
654 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
655 new simgrid::instr::CollTIData("allGather", -1, -1.0, sendcount, recvcount,
656 Datatype::encode(MPI_CURRENT_TYPE),
657 Datatype::encode(MPI_CURRENT_TYPE2)));
659 Colls::allgather(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcount, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
661 TRACE_smpi_comm_out(my_proc_id);
662 log_timed_action (action, clock);
// Replay handler for "allGatherV": allgatherv on MPI_COMM_WORLD.
// With n = MPI_COMM_WORLD->size(): action[2] is the send count and action[3 .. 2+n] the receive counts;
// what follows is interpreted as an optional datatype id and/or n displacements depending on the number
// of tokens (see the if/else-if chain). Displacements default to 0.
// NOTE(review): the first two branches of the if/else-if chain test the same condition
// (action.size() > 3 + 2 * comm_size), so the "disps specified; datatype is not specified" branch can
// never be taken.
// NOTE(review): both MPI_CURRENT_TYPE and MPI_CURRENT_TYPE2 are decoded from action[datatype_index].
// NOTE(review): several short lines are not visible in this view of the file (end of the comment below,
// the tails of both datatype initialisations and of the Colls::allgatherv call) — confirm against the
// complete source.
665 static void action_allgatherv(simgrid::xbt::ReplayAction& action)
667 /* The structure of the allgatherv action for the rank 0 (total 4 processes) is the following:
668 0 allGatherV 275427 275427 275427 275427 204020
670 1) 275427 is the sendcount
671 2) The next four elements declare the recvcounts array
672 3) No more values mean that the datatype for sent and receive buffer is the default one, see
673 simgrid::smpi::Datatype::decode().
675 double clock = smpi_process()->simulated_elapsed();
677 unsigned long comm_size = MPI_COMM_WORLD->size();
678 CHECK_ACTION_PARAMS(action, comm_size+1, 2)
679 int sendcount = std::stoi(action[2]);
680 std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
681 std::vector<int> disps(comm_size, 0);
683 int datatype_index = 0, disp_index = 0;
684 if (action.size() > 3 + 2 * comm_size) { /* datatype + disp are specified */
685 datatype_index = 3 + comm_size;
686 disp_index = datatype_index + 1;
687 } else if (action.size() > 3 + 2 * comm_size) { /* disps specified; datatype is not specified; use the default one */
689 disp_index = 3 + comm_size;
690 } else if (action.size() > 3 + comm_size) { /* only datatype, no disp specified */
691 datatype_index = 3 + comm_size;
694 if (disp_index != 0) {
695 for (unsigned int i = 0; i < comm_size; i++)
696 disps[i] = std::stoi(action[disp_index + i]);
699 MPI_Datatype MPI_CURRENT_TYPE{(datatype_index > 0) ? simgrid::smpi::Datatype::decode(action[datatype_index])
701 MPI_Datatype MPI_CURRENT_TYPE2{(datatype_index > 0) ? simgrid::smpi::Datatype::decode(action[datatype_index])
704 void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
706 for (unsigned int i = 0; i < comm_size; i++) {
707 (*recvcounts)[i] = std::stoi(action[i + 3]);
709 int recv_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
710 void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
712 int my_proc_id = Actor::self()->getPid();
714 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
715 new simgrid::instr::VarCollTIData("allGatherV", -1, sendcount, nullptr, -1, recvcounts,
716 Datatype::encode(MPI_CURRENT_TYPE),
717 Datatype::encode(MPI_CURRENT_TYPE2)));
719 Colls::allgatherv(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts->data(), disps.data(), MPI_CURRENT_TYPE2,
722 TRACE_smpi_comm_out(my_proc_id);
723 log_timed_action (action, clock);
726 static void action_allToAllv(simgrid::xbt::ReplayAction& action)
728 /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
729 0 allToAllV 100 1 7 10 12 100 1 70 10 5
731 1) 100 is the size of the send buffer *sizeof(int),
732 2) 1 7 10 12 is the sendcounts array
733 3) 100*sizeof(int) is the size of the receiver buffer
734 4) 1 70 10 5 is the recvcounts array
736 double clock = smpi_process()->simulated_elapsed();
738 unsigned long comm_size = MPI_COMM_WORLD->size();
739 CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
740 std::shared_ptr<std::vector<int>> sendcounts(new std::vector<int>(comm_size));
741 std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
742 std::vector<int> senddisps(comm_size, 0);
743 std::vector<int> recvdisps(comm_size, 0);
745 MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 5 + 2 * comm_size)
746 ? simgrid::smpi::Datatype::decode(action[4 + 2 * comm_size])
748 MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5 + 2 * comm_size)
749 ? simgrid::smpi::Datatype::decode(action[5 + 2 * comm_size])
752 int send_buf_size=parse_double(action[2]);
753 int recv_buf_size=parse_double(action[3+comm_size]);
754 int my_proc_id = Actor::self()->getPid();
755 void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* MPI_CURRENT_TYPE->size());
756 void *recvbuf = smpi_get_tmp_recvbuffer(recv_buf_size* MPI_CURRENT_TYPE2->size());
758 for (unsigned int i = 0; i < comm_size; i++) {
759 (*sendcounts)[i] = std::stoi(action[3 + i]);
760 (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
762 int send_size = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
763 int recv_size = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
765 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
766 new simgrid::instr::VarCollTIData("allToAllV", -1, send_size, sendcounts, recv_size, recvcounts,
767 Datatype::encode(MPI_CURRENT_TYPE),
768 Datatype::encode(MPI_CURRENT_TYPE2)));
770 Colls::alltoallv(sendbuf, sendcounts->data(), senddisps.data(), MPI_CURRENT_TYPE, recvbuf, recvcounts->data(),
771 recvdisps.data(), MPI_CURRENT_TYPE, MPI_COMM_WORLD);
773 TRACE_smpi_comm_out(my_proc_id);
774 log_timed_action (action, clock);
777 }} // namespace simgrid::smpi
// Initialises the SMPI process for replay, emits the "init" trace record and registers one handler per
// action name understood by the replay engine. It then optionally executes a delayed start and finally
// forces a context switch with a 0-flop execution.
// NOTE(review): the delayed start reads (*argv)[2]; the condition guarding that access is on a line that
// is not visible in this view of the file — confirm against the complete source.
779 /** @brief Only initialize the replay, don't do it for real */
780 void smpi_replay_init(int* argc, char*** argv)
782 simgrid::smpi::Process::init(argc, argv);
783 smpi_process()->mark_as_initialized();
784 smpi_process()->set_replaying(true);
786 int my_proc_id = Actor::self()->getPid();
787 TRACE_smpi_init(my_proc_id);
788 TRACE_smpi_computing_init(my_proc_id);
789 TRACE_smpi_comm_in(my_proc_id, "smpi_replay_run_init", new simgrid::instr::NoOpTIData("init"));
790 TRACE_smpi_comm_out(my_proc_id);
// action name (second token of a trace line) -> handler
791 xbt_replay_action_register("init", simgrid::smpi::action_init);
792 xbt_replay_action_register("finalize", simgrid::smpi::action_finalize);
793 xbt_replay_action_register("comm_size", simgrid::smpi::action_comm_size);
794 xbt_replay_action_register("comm_split", simgrid::smpi::action_comm_split);
795 xbt_replay_action_register("comm_dup", simgrid::smpi::action_comm_dup);
796 xbt_replay_action_register("send", simgrid::smpi::action_send);
797 xbt_replay_action_register("Isend", simgrid::smpi::action_Isend);
798 xbt_replay_action_register("recv", simgrid::smpi::action_recv);
799 xbt_replay_action_register("Irecv", simgrid::smpi::action_Irecv);
800 xbt_replay_action_register("test", simgrid::smpi::action_test);
801 xbt_replay_action_register("wait", simgrid::smpi::action_wait);
802 xbt_replay_action_register("waitAll", simgrid::smpi::action_waitall);
803 xbt_replay_action_register("barrier", simgrid::smpi::action_barrier);
804 xbt_replay_action_register("bcast", simgrid::smpi::action_bcast);
805 xbt_replay_action_register("reduce", simgrid::smpi::action_reduce);
806 xbt_replay_action_register("allReduce", simgrid::smpi::action_allReduce);
807 xbt_replay_action_register("allToAll", simgrid::smpi::action_allToAll);
808 xbt_replay_action_register("allToAllV", simgrid::smpi::action_allToAllv);
809 xbt_replay_action_register("gather", simgrid::smpi::action_gather);
810 xbt_replay_action_register("scatter", simgrid::smpi::action_scatter);
811 xbt_replay_action_register("gatherV", simgrid::smpi::action_gatherv);
812 xbt_replay_action_register("scatterV", simgrid::smpi::action_scatterv);
813 xbt_replay_action_register("allGather", simgrid::smpi::action_allgather);
814 xbt_replay_action_register("allGatherV", simgrid::smpi::action_allgatherv);
815 xbt_replay_action_register("reduceScatter", simgrid::smpi::action_reducescatter);
816 xbt_replay_action_register("compute", simgrid::smpi::action_compute);
818 //if we have a delayed start, sleep here.
// the delay is expressed as an amount of flops to execute, parsed from the third command-line argument
820 double value = xbt_str_parse_double((*argv)[2], "%s is not a double");
821 XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
822 smpi_execute_flops(value);
824 //UGLY: force a context switch to be sure that all MSG_processes begin initialization
825 XBT_DEBUG("Force context switch by smpi_execute_flops - Sleeping for 0.0 flops ");
826 smpi_execute_flops(0.0);
// Runs the replay engine on the command-line arguments, then finalises the calling process: waits for the
// requests still queued for it, deletes its request queue, and finalises the SMPI process between the
// "finalize" tracing hooks. When active_processes is 0 the total simulated time is reported and the
// temporary replay buffers are freed.
// NOTE(review): `requests` and `status` are arrays whose size is a run-time value, which is a compiler
// extension in C++.
// NOTE(review): the queue is deleted but no visible line removes its entry from `reqq`.
// NOTE(review): several short lines are not visible in this view of the file (the body of the loop filling
// `requests`, and whatever updates active_processes before it is tested) — confirm against the complete
// source.
830 /** @brief actually run the replay after initialization */
831 void smpi_replay_main(int* argc, char*** argv)
833 simgrid::xbt::replay_runner(*argc, *argv);
835 /* and now, finalize everything */
836 /* One active process will stop. Decrease the counter*/
837 XBT_DEBUG("There are %zu elements in reqq[*]", get_reqq_self()->size());
838 if (not get_reqq_self()->empty()) {
839 unsigned int count_requests=get_reqq_self()->size();
840 MPI_Request requests[count_requests];
841 MPI_Status status[count_requests];
844 for (auto const& req : *get_reqq_self()) {
848 simgrid::smpi::Request::waitall(count_requests, requests, status);
850 delete get_reqq_self();
853 if(active_processes==0){
854 /* Last process alive speaking: end the simulated timer */
855 XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
856 smpi_free_replay_tmp_buffers();
859 TRACE_smpi_comm_in(Actor::self()->getPid(), "smpi_replay_run_finalize", new simgrid::instr::NoOpTIData("finalize"));
861 smpi_process()->finalize();
863 TRACE_smpi_comm_out(Actor::self()->getPid());
864 TRACE_smpi_finalize(Actor::self()->getPid());
867 /** @brief chain a replay initialization and a replay start */
868 void smpi_replay_run(int* argc, char*** argv)
870 smpi_replay_init(argc, argv);
871 smpi_replay_main(argc, argv);