1 /* Copyright (c) 2009-2017. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
7 #include "smpi_coll.hpp"
8 #include "smpi_comm.hpp"
9 #include "smpi_datatype.hpp"
10 #include "smpi_group.hpp"
11 #include "smpi_process.hpp"
12 #include "smpi_request.hpp"
13 #include "xbt/replay.hpp"
17 #include <unordered_map>
20 using simgrid::s4u::Actor;
22 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
24 static int communicator_size = 0;
25 static int active_processes = 0;
26 static std::unordered_map<int, std::vector<MPI_Request>*> reqq;
28 static MPI_Datatype MPI_DEFAULT_TYPE;
29 static MPI_Datatype MPI_CURRENT_TYPE;
31 static int sendbuffer_size = 0;
32 static char* sendbuffer = nullptr;
33 static int recvbuffer_size = 0;
34 static char* recvbuffer = nullptr;
// Log (at verbose level) the replayed action line together with the simulated
// time it consumed (current simulated clock minus the clock captured at entry).
36 static void log_timed_action (const char *const *action, double clock){
37 if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
// Join the action tokens back into one printable string.
38 char *name = xbt_str_join_array(action, " ");
39 XBT_VERB("%s %f", name, smpi_process()->simulated_elapsed()-clock);
// Return the pending-request vector of the calling actor.
// Uses at(), so it throws std::out_of_range if action_init was never replayed
// for this actor (i.e. no vector was registered for this pid).
44 static std::vector<MPI_Request>* get_reqq_self()
46 return reqq.at(Actor::self()->getPid());
// Register the pending-request vector for the calling actor.
// NOTE(review): unordered_map::insert does NOT overwrite an existing entry —
// if this pid already has a vector, the new one is silently dropped (and
// leaked). Confirm a second registration can never happen for one pid.
49 static void set_reqq_self(std::vector<MPI_Request> *mpi_request)
51 reqq.insert({Actor::self()->getPid(), mpi_request});
54 //allocate a single buffer for all sends, growing it if needed
// Outside of replay mode, fall back to a plain per-call allocation; during
// replay all sends share the one static 'sendbuffer' (payload is ignored).
55 void* smpi_get_tmp_sendbuffer(int size)
57 if (not smpi_process()->replaying())
58 return xbt_malloc(size);
// Grow the shared buffer lazily; it is never shrunk.
59 if (sendbuffer_size<size){
60 sendbuffer=static_cast<char*>(xbt_realloc(sendbuffer,size));
66 //allocate a single buffer for all recv
// Mirror of smpi_get_tmp_sendbuffer for the receive side: real allocation
// outside replay, one shared growing 'recvbuffer' during replay.
67 void* smpi_get_tmp_recvbuffer(int size){
68 if (not smpi_process()->replaying())
69 return xbt_malloc(size);
70 if (recvbuffer_size<size){
71 recvbuffer=static_cast<char*>(xbt_realloc(recvbuffer,size));
// Free a buffer returned by the smpi_get_tmp_* helpers. During replay the
// shared static buffers must NOT be freed here; only non-replay (malloc'ed)
// buffers are released.
77 void smpi_free_tmp_buffer(void* buf){
78 if (not smpi_process()->replaying())
// Parse a trace token as a double, raising an xbt exception (rather than
// silently returning 0) when the token is not a valid number.
83 static double parse_double(const char *string)
86 double value = strtod(string, &endptr);
88 THROWF(unknown_error, 0, "%s is not a double", string);
93 //TODO: this logic should be moved inside the datatype class, to support all predefined types and get rid of is_replayable.
// Map the numeric datatype id found in a trace line back to an MPI_Datatype.
// Unknown ids fall through to MPI_DEFAULT_TYPE.
94 static MPI_Datatype decode_datatype(const char *const action)
96 switch(atoi(action)) {
119 return MPI_DEFAULT_TYPE;
// Inverse of decode_datatype: turn a (supported, "replayable") MPI_Datatype
// back into the string id used by the trace format / TI tracing.
124 const char* encode_datatype(MPI_Datatype datatype)
126 if (datatype==MPI_BYTE)
128 if(datatype==MPI_DOUBLE)
130 if(datatype==MPI_INT)
132 if(datatype==MPI_CHAR)
134 if(datatype==MPI_SHORT)
136 if(datatype==MPI_LONG)
138 if(datatype==MPI_FLOAT)
140 // default - not implemented.
141 // do not warn here as we pass in this function even for other trace formats
// Validate the arity of a replayed action line: counts the tokens of 'action'
// and throws arg_error unless the line carries process_id + action name +
// 'mandatory' required arguments (plus up to 'optional' extra ones).
// (Comments are kept outside the macro body so the '\' continuations stay intact.)
145 #define CHECK_ACTION_PARAMS(action, mandatory, optional) {\
147 while(action[i]!=nullptr)\
150 THROWF(arg_error, 0, "%s replay failed.\n" \
151 "%d items were given on the line. First two should be process_id and action. " \
152 "This action needs after them %d mandatory arguments, and accepts %d optional ones. \n" \
153 "Please contact the Simgrid team if support is needed", __FUNCTION__, i, mandatory, optional);\
// Replay handler for the "init" action: pick the trace's default datatype,
// start the per-process simulated timer, record the number of active
// processes, and create this actor's pending-request vector.
159 static void action_init(const char *const *action)
161 XBT_DEBUG("Initialize the counters");
162 CHECK_ACTION_PARAMS(action, 0, 1)
// Default datatype depends on the trace flavor (MPE vs TAU).
164 MPI_DEFAULT_TYPE = MPI_DOUBLE; // default MPE datatype
166 MPI_DEFAULT_TYPE = MPI_BYTE; // default TAU datatype
168 /* start a simulated timer */
169 smpi_process()->simulated_start();
170 /*initialize the number of active processes */
171 active_processes = smpi_process_count();
// Per-actor container for outstanding Isend/Irecv requests.
173 set_reqq_self(new std::vector<MPI_Request>);
// Replay handler for the "finalize" action (body not shown in this view).
176 static void action_finalize(const char *const *action)
// Replay handler for "comm_size": record the communicator size announced by
// the trace into the file-level 'communicator_size' counter.
181 static void action_comm_size(const char *const *action)
183 communicator_size = parse_double(action[2]);
184 log_timed_action (action, smpi_process()->simulated_elapsed());
// Replay handler for "comm_split": no simulated work, only timed logging.
187 static void action_comm_split(const char *const *action)
189 log_timed_action (action, smpi_process()->simulated_elapsed());
// Replay handler for "comm_dup": no simulated work, only timed logging.
192 static void action_comm_dup(const char *const *action)
194 log_timed_action (action, smpi_process()->simulated_elapsed());
// Replay handler for "compute": burn the traced amount of flops on the
// simulated host, wrapped in computing trace markers.
197 static void action_compute(const char *const *action)
199 CHECK_ACTION_PARAMS(action, 1, 0)
200 double clock = smpi_process()->simulated_elapsed();
201 double flops= parse_double(action[2]);
202 int my_proc_id = Actor::self()->getPid();
204 TRACE_smpi_computing_in(my_proc_id, flops);
205 smpi_execute_flops(flops);
206 TRACE_smpi_computing_out(my_proc_id);
208 log_timed_action (action, clock);
// Replay handler for a blocking "send": send 'size' elements of the decoded
// datatype to rank 'to' with tag 0 on MPI_COMM_WORLD. The payload is nullptr
// since only the timing/volume matters during replay.
211 static void action_send(const char *const *action)
213 CHECK_ACTION_PARAMS(action, 2, 1)
214 int to = atoi(action[2]);
215 double size=parse_double(action[3]);
216 double clock = smpi_process()->simulated_elapsed();
// Optional 4th argument selects the datatype; otherwise trace default.
218 MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
220 int my_proc_id = Actor::self()->getPid();
221 int dst_traced = MPI_COMM_WORLD->group()->actor(to)->getPid();
223 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
224 new simgrid::instr::Pt2PtTIData("send", to, size, encode_datatype(MPI_CURRENT_TYPE)));
225 if (not TRACE_smpi_view_internals())
226 TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, size * MPI_CURRENT_TYPE->size());
228 Request::send(nullptr, size, MPI_CURRENT_TYPE, to , 0, MPI_COMM_WORLD);
230 TRACE_smpi_comm_out(my_proc_id);
232 log_timed_action(action, clock);
// Replay handler for a non-blocking "Isend": same decoding as action_send,
// but the resulting MPI_Request is stored in this actor's pending-request
// vector to be consumed later by a "test"/"wait"/"waitAll" action.
235 static void action_Isend(const char *const *action)
237 CHECK_ACTION_PARAMS(action, 2, 1)
238 int to = atoi(action[2]);
239 double size=parse_double(action[3]);
240 double clock = smpi_process()->simulated_elapsed();
242 MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
244 int my_proc_id = Actor::self()->getPid();
245 int dst_traced = MPI_COMM_WORLD->group()->actor(to)->getPid();
246 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
247 new simgrid::instr::Pt2PtTIData("Isend", to, size, encode_datatype(MPI_CURRENT_TYPE)));
248 if (not TRACE_smpi_view_internals())
249 TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, size * MPI_CURRENT_TYPE->size());
251 MPI_Request request = Request::isend(nullptr, size, MPI_CURRENT_TYPE, to, 0, MPI_COMM_WORLD);
253 TRACE_smpi_comm_out(my_proc_id);
// Keep the request so a later wait/test action can complete it.
255 get_reqq_self()->push_back(request);
257 log_timed_action (action, clock);
// Replay handler for a blocking "recv" from rank 'from' (tag 0). A probe is
// issued first because the receiver side of the trace does not know the
// actual message size.
260 static void action_recv(const char *const *action) {
261 CHECK_ACTION_PARAMS(action, 2, 1)
262 int from = atoi(action[2]);
263 double size=parse_double(action[3]);
264 double clock = smpi_process()->simulated_elapsed();
267 MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
269 int my_proc_id = Actor::self()->getPid();
270 int src_traced = MPI_COMM_WORLD->group()->actor(from)->getPid();
272 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
273 new simgrid::instr::Pt2PtTIData("recv", from, size, encode_datatype(MPI_CURRENT_TYPE)));
275 //unknown size from the receiver point of view
277 Request::probe(from, 0, MPI_COMM_WORLD, &status);
281 Request::recv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD, &status);
283 TRACE_smpi_comm_out(my_proc_id);
284 if (not TRACE_smpi_view_internals()) {
285 TRACE_smpi_recv(src_traced, my_proc_id, 0);
288 log_timed_action (action, clock);
// Replay handler for a non-blocking "Irecv": probe for the real size, post
// the irecv, and stash the request for a later wait/test action.
291 static void action_Irecv(const char *const *action)
293 CHECK_ACTION_PARAMS(action, 2, 1)
294 int from = atoi(action[2]);
295 double size=parse_double(action[3]);
296 double clock = smpi_process()->simulated_elapsed();
298 MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
300 int my_proc_id = Actor::self()->getPid();
301 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
302 new simgrid::instr::Pt2PtTIData("Irecv", from, size, encode_datatype(MPI_CURRENT_TYPE)));
304 //unknow size from the receiver pov
306 Request::probe(from, 0, MPI_COMM_WORLD, &status);
310 MPI_Request request = Request::irecv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD);
312 TRACE_smpi_comm_out(my_proc_id);
313 get_reqq_self()->push_back(request);
315 log_timed_action (action, clock);
// Replay handler for "test": pop the most recent pending request, test it,
// and push it back (a successful test leaves a nullptr in the vector, which
// a subsequent wait will recognize and skip).
318 static void action_test(const char* const* action)
320 CHECK_ACTION_PARAMS(action, 0, 0)
321 double clock = smpi_process()->simulated_elapsed();
324 MPI_Request request = get_reqq_self()->back();
325 get_reqq_self()->pop_back();
326 //if request is null here, this may mean that a previous test has succeeded
327 //Different times in traced application and replayed version may lead to this
328 //In this case, ignore the extra calls.
329 if(request!=nullptr){
330 int my_proc_id = Actor::self()->getPid();
331 TRACE_smpi_testing_in(my_proc_id);
333 int flag = Request::test(&request, &status);
335 XBT_DEBUG("MPI_Test result: %d", flag);
336 /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now nullptr.*/
337 get_reqq_self()->push_back(request);
339 TRACE_smpi_testing_out(my_proc_id);
341 log_timed_action (action, clock);
// Replay handler for "wait": complete the most recent pending request.
// A nullptr request means a prior "test" already completed it, so the wait
// is a no-op (the trace is assumed well-formed).
344 static void action_wait(const char *const *action){
345 CHECK_ACTION_PARAMS(action, 0, 0)
346 double clock = smpi_process()->simulated_elapsed();
349 xbt_assert(get_reqq_self()->size(), "action wait not preceded by any irecv or isend: %s",
350 xbt_str_join_array(action," "));
351 MPI_Request request = get_reqq_self()->back();
352 get_reqq_self()->pop_back();
354 if (request==nullptr){
355 /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just return.*/
// Capture src/dst/flags BEFORE the wait, since the request may be freed by it.
359 int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
361 MPI_Group group = request->comm()->group();
362 int src_traced = group->rank(request->src());
363 int dst_traced = group->rank(request->dst());
364 int is_wait_for_receive = (request->flags() & RECV);
365 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::NoOpTIData("wait"));
367 Request::wait(&request, &status);
369 TRACE_smpi_comm_out(rank);
370 if (is_wait_for_receive)
371 TRACE_smpi_recv(src_traced, dst_traced, 0);
372 log_timed_action (action, clock);
// Replay handler for "waitAll": complete every pending request of this actor
// at once. src/dst of receive requests are recorded before the waitall so the
// matching TRACE_smpi_recv events can be emitted afterwards.
375 static void action_waitall(const char *const *action){
376 CHECK_ACTION_PARAMS(action, 0, 0)
377 double clock = smpi_process()->simulated_elapsed();
378 const unsigned int count_requests = get_reqq_self()->size();
380 if (count_requests>0) {
// NOTE(review): VLAs (status/recvs_snd/recvs_rcv) are a GNU extension in C++,
// not standard — confirm this is acceptable for the supported compilers.
381 MPI_Status status[count_requests];
383 int my_proc_id_traced = Actor::self()->getPid();
384 TRACE_smpi_comm_in(my_proc_id_traced, __FUNCTION__,
385 new simgrid::instr::Pt2PtTIData("waitAll", -1, count_requests, ""));
386 int recvs_snd[count_requests];
387 int recvs_rcv[count_requests];
388 for (unsigned int i = 0; i < count_requests; i++) {
389 const auto& req = (*get_reqq_self())[i];
390 if (req && (req->flags() & RECV)) {
391 recvs_snd[i] = req->src();
392 recvs_rcv[i] = req->dst();
396 Request::waitall(count_requests, &(*get_reqq_self())[0], status);
398 for (unsigned i = 0; i < count_requests; i++) {
// -100 is the sentinel for "not a receive" (set in the lines elided here).
399 if (recvs_snd[i]!=-100)
400 TRACE_smpi_recv(recvs_snd[i], recvs_rcv[i],0);
402 TRACE_smpi_comm_out(my_proc_id_traced);
404 log_timed_action (action, clock);
// Replay handler for "barrier": a plain MPI_COMM_WORLD barrier with tracing.
407 static void action_barrier(const char *const *action){
408 double clock = smpi_process()->simulated_elapsed();
409 int my_proc_id = Actor::self()->getPid();
410 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::NoOpTIData("barrier"));
412 Colls::barrier(MPI_COMM_WORLD);
414 TRACE_smpi_comm_out(my_proc_id);
415 log_timed_action (action, clock);
// Replay handler for "bcast": broadcast 'size' elements from 'root'
// (default 0). The datatype argument (action[4]) is only honored when the
// root argument (action[3]) is present too.
418 static void action_bcast(const char *const *action)
420 CHECK_ACTION_PARAMS(action, 1, 2)
421 double size = parse_double(action[2]);
422 double clock = smpi_process()->simulated_elapsed();
423 int root = (action[3]) ? atoi(action[3]) : 0;
424 /* Initialize MPI_CURRENT_TYPE in order to decrease the number of the checks */
425 MPI_CURRENT_TYPE = (action[3] && action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
427 int my_proc_id = Actor::self()->getPid();
428 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
429 new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(root)->getPid(), -1.0, size,
430 -1, encode_datatype(MPI_CURRENT_TYPE), ""));
432 void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
434 Colls::bcast(sendbuf, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
436 TRACE_smpi_comm_out(my_proc_id);
437 log_timed_action (action, clock);
// Replay handler for "reduce": perform the communication part of a reduce
// (with MPI_OP_NULL as a placeholder op) and then burn 'comp_size' flops for
// the computation part.
440 static void action_reduce(const char *const *action)
442 CHECK_ACTION_PARAMS(action, 2, 2)
443 double comm_size = parse_double(action[2]);
444 double comp_size = parse_double(action[3]);
445 double clock = smpi_process()->simulated_elapsed();
446 int root = (action[4]) ? atoi(action[4]) : 0;
448 MPI_CURRENT_TYPE = (action[4] && action[5]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE;
450 int my_proc_id = Actor::self()->getPid();
451 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
452 new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(root)->getPid(), comp_size,
453 comm_size, -1, encode_datatype(MPI_CURRENT_TYPE), ""));
// NOTE(review): both buffers come from smpi_get_tmp_sendbuffer, so during
// replay recvbuf and sendbuf alias the same shared buffer — presumably fine
// since the data content is ignored, but confirm.
455 void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
456 void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
457 Colls::reduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
458 smpi_execute_flops(comp_size);
460 TRACE_smpi_comm_out(my_proc_id);
461 log_timed_action (action, clock);
// Replay handler for "allReduce": communication via Colls::allreduce (op is
// the MPI_OP_NULL placeholder), then 'comp_size' flops of simulated compute.
464 static void action_allReduce(const char *const *action) {
465 CHECK_ACTION_PARAMS(action, 2, 1)
466 double comm_size = parse_double(action[2]);
467 double comp_size = parse_double(action[3]);
469 MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
471 double clock = smpi_process()->simulated_elapsed();
472 int my_proc_id = Actor::self()->getPid();
473 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::CollTIData("allReduce", -1, comp_size, comm_size, -1,
474 encode_datatype(MPI_CURRENT_TYPE), ""));
// NOTE(review): as in action_reduce, recvbuf and sendbuf both come from the
// shared send buffer and therefore alias during replay — confirm intended.
476 void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
477 void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
478 Colls::allreduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
479 smpi_execute_flops(comp_size);
481 TRACE_smpi_comm_out(my_proc_id);
482 log_timed_action (action, clock);
// Replay handler for "allToAll": exchange send_size/recv_size elements with
// every rank; optional 4th/5th args give the send and recv datatypes (both
// must be present for either to be used).
485 static void action_allToAll(const char *const *action) {
486 CHECK_ACTION_PARAMS(action, 2, 2) //two mandatory (send and recv volumes) and two optional (corresponding datatypes)
487 double clock = smpi_process()->simulated_elapsed();
488 int comm_size = MPI_COMM_WORLD->size();
489 int send_size = parse_double(action[2]);
490 int recv_size = parse_double(action[3]);
491 MPI_CURRENT_TYPE = (action[4] && action[5]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
492 MPI_Datatype MPI_CURRENT_TYPE2{(action[4] && action[5]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE};
494 void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* MPI_CURRENT_TYPE->size());
495 void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
497 int my_proc_id = Actor::self()->getPid();
498 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
499 new simgrid::instr::CollTIData("allToAll", -1, -1.0, send_size, recv_size,
500 encode_datatype(MPI_CURRENT_TYPE),
501 encode_datatype(MPI_CURRENT_TYPE2)));
503 Colls::alltoall(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
505 TRACE_smpi_comm_out(my_proc_id);
506 log_timed_action (action, clock);
// Replay handler for "gather": non-root ranks pass a null recv buffer; the
// root allocates recv_size*comm_size elements of the recv datatype.
509 static void action_gather(const char *const *action) {
510 /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
513 1) 68 is the sendcounts
514 2) 68 is the recvcounts
515 3) 0 is the root node
516 4) 0 is the send datatype id, see decode_datatype()
517 5) 0 is the recv datatype id, see decode_datatype()
519 CHECK_ACTION_PARAMS(action, 2, 3)
520 double clock = smpi_process()->simulated_elapsed();
521 int comm_size = MPI_COMM_WORLD->size();
522 int send_size = parse_double(action[2]);
523 int recv_size = parse_double(action[3]);
524 MPI_CURRENT_TYPE = (action[5] && action[6]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE;
525 MPI_Datatype MPI_CURRENT_TYPE2{(action[5] && action[6]) ? decode_datatype(action[6]) : MPI_DEFAULT_TYPE};
527 void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
528 void *recv = nullptr;
529 int root = (action[4]) ? atoi(action[4]) : 0;
530 int rank = MPI_COMM_WORLD->rank();
// Only the root needs an actual receive buffer (guard elided in this view).
533 recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
535 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
536 encode_datatype(MPI_CURRENT_TYPE),
537 encode_datatype(MPI_CURRENT_TYPE2)));
539 Colls::gather(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
541 TRACE_smpi_comm_out(Actor::self()->getPid());
542 log_timed_action (action, clock);
// Replay handler for "scatter": the mirror of action_gather, scattering
// send_size elements from the root to every rank.
545 static void action_scatter(const char* const* action)
547 /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
550 1) 68 is the sendcounts
551 2) 68 is the recvcounts
552 3) 0 is the root node
553 4) 0 is the send datatype id, see decode_datatype()
554 5) 0 is the recv datatype id, see decode_datatype()
556 CHECK_ACTION_PARAMS(action, 2, 3)
557 double clock = smpi_process()->simulated_elapsed();
558 int comm_size = MPI_COMM_WORLD->size();
559 int send_size = parse_double(action[2]);
560 int recv_size = parse_double(action[3]);
561 MPI_CURRENT_TYPE = (action[5] && action[6]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE;
562 MPI_Datatype MPI_CURRENT_TYPE2{(action[5] && action[6]) ? decode_datatype(action[6]) : MPI_DEFAULT_TYPE};
564 void* send = smpi_get_tmp_sendbuffer(send_size * MPI_CURRENT_TYPE->size());
565 void* recv = nullptr;
566 int root = (action[4]) ? atoi(action[4]) : 0;
567 int rank = MPI_COMM_WORLD->rank();
570 recv = smpi_get_tmp_recvbuffer(recv_size * comm_size * MPI_CURRENT_TYPE2->size());
// NOTE(review): the trace event is labeled "gather" although this is the
// scatter action — looks like a copy/paste slip; confirm and fix the label.
572 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
573 encode_datatype(MPI_CURRENT_TYPE),
574 encode_datatype(MPI_CURRENT_TYPE2)));
576 Colls::scatter(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
578 TRACE_smpi_comm_out(Actor::self()->getPid());
579 log_timed_action(action, clock);
// Replay handler for "gatherV": per-rank receive counts come from the trace
// (comm_size values after the send count); displacements are all zero.
582 static void action_gatherv(const char *const *action) {
583 /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
584 0 gather 68 68 10 10 10 0 0 0
586 1) 68 is the sendcount
587 2) 68 10 10 10 is the recvcounts
588 3) 0 is the root node
589 4) 0 is the send datatype id, see decode_datatype()
590 5) 0 is the recv datatype id, see decode_datatype()
592 double clock = smpi_process()->simulated_elapsed();
593 int comm_size = MPI_COMM_WORLD->size();
594 CHECK_ACTION_PARAMS(action, comm_size+1, 2)
595 int send_size = parse_double(action[2]);
596 std::vector<int> disps(comm_size, 0);
597 int recvcounts[comm_size];
601 (action[4 + comm_size] && action[5 + comm_size]) ? decode_datatype(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
602 MPI_Datatype MPI_CURRENT_TYPE2{
603 (action[4 + comm_size] && action[5 + comm_size]) ? decode_datatype(action[5 + comm_size]) : MPI_DEFAULT_TYPE};
605 void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
606 void *recv = nullptr;
// Accumulate the total receive volume to size the root's buffer.
607 for(int i=0;i<comm_size;i++) {
608 recvcounts[i] = atoi(action[i+3]);
609 recv_sum=recv_sum+recvcounts[i];
612 int root = (action[3 + comm_size]) ? atoi(action[3 + comm_size]) : 0;
613 int rank = MPI_COMM_WORLD->rank();
616 recv = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
// Ownership of trace_recvcounts is taken by the VarCollTIData event.
618 std::vector<int>* trace_recvcounts = new std::vector<int>(recvcounts, recvcounts + comm_size);
620 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData(
621 "gatherV", root, send_size, nullptr, -1, trace_recvcounts,
622 encode_datatype(MPI_CURRENT_TYPE), encode_datatype(MPI_CURRENT_TYPE2)));
624 Colls::gatherv(send, send_size, MPI_CURRENT_TYPE, recv, recvcounts, disps.data(), MPI_CURRENT_TYPE2, root,
627 TRACE_smpi_comm_out(Actor::self()->getPid());
628 log_timed_action (action, clock);
// Replay handler for "scatterV": per-rank send counts come from the trace;
// only the root allocates a send buffer (sized by the summed counts).
631 static void action_scatterv(const char* const* action)
633 /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
634 0 gather 68 10 10 10 68 0 0 0
636 1) 68 10 10 10 is the sendcounts
637 2) 68 is the recvcount
638 3) 0 is the root node
639 4) 0 is the send datatype id, see decode_datatype()
640 5) 0 is the recv datatype id, see decode_datatype()
642 double clock = smpi_process()->simulated_elapsed();
643 int comm_size = MPI_COMM_WORLD->size();
644 CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
645 int recv_size = parse_double(action[2 + comm_size]);
646 std::vector<int> disps(comm_size, 0);
647 std::shared_ptr<std::vector<int>> sendcounts(new std::vector<int>(comm_size));
650 (action[4 + comm_size] && action[5 + comm_size]) ? decode_datatype(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
651 MPI_Datatype MPI_CURRENT_TYPE2{
652 (action[4 + comm_size] && action[5 + comm_size]) ? decode_datatype(action[5 + comm_size]) : MPI_DEFAULT_TYPE};
654 void* send = nullptr;
655 void* recv = smpi_get_tmp_recvbuffer(recv_size * MPI_CURRENT_TYPE->size());
656 for (int i = 0; i < comm_size; i++) {
657 (*sendcounts)[i] = atoi(action[i + 2]);
659 int send_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
661 int root = (action[3 + comm_size]) ? atoi(action[3 + comm_size]) : 0;
662 int rank = MPI_COMM_WORLD->rank();
665 send = smpi_get_tmp_sendbuffer(send_sum * MPI_CURRENT_TYPE2->size());
// NOTE(review): the trace event is labeled "gatherV" although this is the
// scatterv action — confirm and fix the label.
667 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData("gatherV", root, -1, sendcounts, recv_size,
668 nullptr, encode_datatype(MPI_CURRENT_TYPE),
669 encode_datatype(MPI_CURRENT_TYPE2)));
671 Colls::scatterv(send, sendcounts->data(), disps.data(), MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root,
674 TRACE_smpi_comm_out(Actor::self()->getPid());
675 log_timed_action(action, clock);
// Replay handler for "reduceScatter": the recvcounts array is read from the
// trace, the communication uses MPI_OP_NULL as placeholder op, and comp_size
// flops are burned afterwards for the computation part.
678 static void action_reducescatter(const char *const *action) {
679 /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
680 0 reduceScatter 275427 275427 275427 204020 11346849 0
682 1) The first four values after the name of the action declare the recvcounts array
683 2) The value 11346849 is the amount of instructions
684 3) The last value corresponds to the datatype, see decode_datatype().
686 double clock = smpi_process()->simulated_elapsed();
687 int comm_size = MPI_COMM_WORLD->size();
688 CHECK_ACTION_PARAMS(action, comm_size+1, 1)
689 int comp_size = parse_double(action[2+comm_size]);
690 int my_proc_id = Actor::self()->getPid();
691 std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>);
692 MPI_CURRENT_TYPE = (action[3 + comm_size]) ? decode_datatype(action[3 + comm_size]) : MPI_DEFAULT_TYPE;
694 for(int i=0;i<comm_size;i++) {
695 recvcounts->push_back(atoi(action[i + 2]));
697 int size{std::accumulate(recvcounts->begin(), recvcounts->end(), 0)};
699 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
700 new simgrid::instr::VarCollTIData("reduceScatter", -1, 0, nullptr, -1, recvcounts,
701 std::to_string(comp_size), /* ugly hack to print comp_size */
702 encode_datatype(MPI_CURRENT_TYPE)));
704 void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
705 void *recvbuf = smpi_get_tmp_recvbuffer(size* MPI_CURRENT_TYPE->size());
707 Colls::reduce_scatter(sendbuf, recvbuf, recvcounts->data(), MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
708 smpi_execute_flops(comp_size);
710 TRACE_smpi_comm_out(my_proc_id);
711 log_timed_action (action, clock);
// Replay handler for "allGather": fixed send/recv counts, optional pair of
// datatype ids (both must be present to be honored).
// NOTE(review): counts are parsed with atoi here while most other handlers
// use parse_double — confirm the inconsistency is harmless for these traces.
714 static void action_allgather(const char *const *action) {
715 /* The structure of the allgather action for the rank 0 (total 4 processes) is the following:
716 0 allGather 275427 275427
718 1) 275427 is the sendcount
719 2) 275427 is the recvcount
720 3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().
722 double clock = smpi_process()->simulated_elapsed();
724 CHECK_ACTION_PARAMS(action, 2, 2)
725 int sendcount=atoi(action[2]);
726 int recvcount=atoi(action[3]);
728 MPI_CURRENT_TYPE = (action[4] && action[5]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
729 MPI_Datatype MPI_CURRENT_TYPE2{(action[4] && action[5]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE};
731 void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
732 void *recvbuf = smpi_get_tmp_recvbuffer(recvcount* MPI_CURRENT_TYPE2->size());
734 int my_proc_id = Actor::self()->getPid();
736 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
737 new simgrid::instr::CollTIData("allGather", -1, -1.0, sendcount, recvcount,
738 encode_datatype(MPI_CURRENT_TYPE),
739 encode_datatype(MPI_CURRENT_TYPE2)));
741 Colls::allgather(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcount, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
743 TRACE_smpi_comm_out(my_proc_id);
744 log_timed_action (action, clock);
// Replay handler for "allGatherV": per-rank receive counts come from the
// trace; displacements are all zero; the recv buffer is sized by their sum.
747 static void action_allgatherv(const char *const *action) {
748 /* The structure of the allgatherv action for the rank 0 (total 4 processes) is the following:
749 0 allGatherV 275427 275427 275427 275427 204020
751 1) 275427 is the sendcount
752 2) The next four elements declare the recvcounts array
753 3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().
755 double clock = smpi_process()->simulated_elapsed();
757 int comm_size = MPI_COMM_WORLD->size();
758 CHECK_ACTION_PARAMS(action, comm_size+1, 2)
759 int sendcount=atoi(action[2]);
760 int recvcounts[comm_size];
761 std::vector<int> disps(comm_size, 0);
765 (action[3 + comm_size] && action[4 + comm_size]) ? decode_datatype(action[3 + comm_size]) : MPI_DEFAULT_TYPE;
766 MPI_Datatype MPI_CURRENT_TYPE2{
767 (action[3 + comm_size] && action[4 + comm_size]) ? decode_datatype(action[4 + comm_size]) : MPI_DEFAULT_TYPE};
769 void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
771 for(int i=0;i<comm_size;i++) {
772 recvcounts[i] = atoi(action[i+3]);
773 recv_sum=recv_sum+recvcounts[i];
775 void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
777 int my_proc_id = Actor::self()->getPid();
// Ownership of trace_recvcounts is taken by the VarCollTIData event.
779 std::vector<int>* trace_recvcounts = new std::vector<int>(recvcounts, recvcounts + comm_size);
781 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
782 new simgrid::instr::VarCollTIData("allGatherV", -1, sendcount, nullptr, -1, trace_recvcounts,
783 encode_datatype(MPI_CURRENT_TYPE),
784 encode_datatype(MPI_CURRENT_TYPE2)));
786 Colls::allgatherv(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts, disps.data(), MPI_CURRENT_TYPE2,
789 TRACE_smpi_comm_out(my_proc_id);
790 log_timed_action (action, clock);
// Replay handler for "allToAllV": per-rank send and recv count arrays come
// from the trace; all displacements are zero.
793 static void action_allToAllv(const char *const *action) {
794 /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
795 0 allToAllV 100 1 7 10 12 100 1 70 10 5
797 1) 100 is the size of the send buffer *sizeof(int),
798 2) 1 7 10 12 is the sendcounts array
799 3) 100*sizeof(int) is the size of the receiver buffer
800 4) 1 70 10 5 is the recvcounts array
802 double clock = smpi_process()->simulated_elapsed();
804 int comm_size = MPI_COMM_WORLD->size();
805 CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
806 std::shared_ptr<std::vector<int>> sendcounts(new std::vector<int>(comm_size));
807 std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
808 std::vector<int> senddisps(comm_size, 0);
809 std::vector<int> recvdisps(comm_size, 0);
811 MPI_CURRENT_TYPE = (action[4 + 2 * comm_size] && action[5 + 2 * comm_size])
812 ? decode_datatype(action[4 + 2 * comm_size])
814 MPI_Datatype MPI_CURRENT_TYPE2{(action[4 + 2 * comm_size] && action[5 + 2 * comm_size])
815 ? decode_datatype(action[5 + 2 * comm_size])
818 int send_buf_size=parse_double(action[2]);
819 int recv_buf_size=parse_double(action[3+comm_size]);
820 int my_proc_id = Actor::self()->getPid();
821 void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* MPI_CURRENT_TYPE->size());
822 void *recvbuf = smpi_get_tmp_recvbuffer(recv_buf_size* MPI_CURRENT_TYPE2->size());
824 for(int i=0;i<comm_size;i++) {
825 (*sendcounts)[i] = atoi(action[3 + i]);
826 (*recvcounts)[i] = atoi(action[4 + comm_size + i]);
828 int send_size = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
829 int recv_size = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
831 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
832 new simgrid::instr::VarCollTIData("allToAllV", -1, send_size, sendcounts, recv_size, recvcounts,
833 encode_datatype(MPI_CURRENT_TYPE),
834 encode_datatype(MPI_CURRENT_TYPE2)));
// NOTE(review): the receive datatype passed to Colls::alltoallv below is
// MPI_CURRENT_TYPE (the send type), while recvbuf was sized with
// MPI_CURRENT_TYPE2 — this looks like it should be MPI_CURRENT_TYPE2; confirm.
836 Colls::alltoallv(sendbuf, sendcounts->data(), senddisps.data(), MPI_CURRENT_TYPE, recvbuf, recvcounts->data(),
837 recvdisps.data(), MPI_CURRENT_TYPE, MPI_COMM_WORLD);
839 TRACE_smpi_comm_out(my_proc_id);
840 log_timed_action (action, clock);
843 }} // namespace simgrid::smpi
845 /** @brief Only initialize the replay, don't do it for real */
// Set up the process for trace replay: mark it initialized/replaying, emit
// the init tracing events, register every replay action handler, honor an
// optional delayed start (argv[2], in flops), and force one context switch so
// all processes finish their own initialization before replay begins.
846 void smpi_replay_init(int* argc, char*** argv)
848 simgrid::smpi::Process::init(argc, argv);
849 smpi_process()->mark_as_initialized();
850 smpi_process()->set_replaying(true);
852 int my_proc_id = Actor::self()->getPid();
853 TRACE_smpi_init(my_proc_id);
854 TRACE_smpi_computing_init(my_proc_id);
855 TRACE_smpi_comm_in(my_proc_id, "smpi_replay_run_init", new simgrid::instr::NoOpTIData("init"));
856 TRACE_smpi_comm_out(my_proc_id);
// Map every trace action keyword to its handler.
857 xbt_replay_action_register("init", simgrid::smpi::action_init);
858 xbt_replay_action_register("finalize", simgrid::smpi::action_finalize);
859 xbt_replay_action_register("comm_size", simgrid::smpi::action_comm_size);
860 xbt_replay_action_register("comm_split", simgrid::smpi::action_comm_split);
861 xbt_replay_action_register("comm_dup", simgrid::smpi::action_comm_dup);
862 xbt_replay_action_register("send", simgrid::smpi::action_send);
863 xbt_replay_action_register("Isend", simgrid::smpi::action_Isend);
864 xbt_replay_action_register("recv", simgrid::smpi::action_recv);
865 xbt_replay_action_register("Irecv", simgrid::smpi::action_Irecv);
866 xbt_replay_action_register("test", simgrid::smpi::action_test);
867 xbt_replay_action_register("wait", simgrid::smpi::action_wait);
868 xbt_replay_action_register("waitAll", simgrid::smpi::action_waitall);
869 xbt_replay_action_register("barrier", simgrid::smpi::action_barrier);
870 xbt_replay_action_register("bcast", simgrid::smpi::action_bcast);
871 xbt_replay_action_register("reduce", simgrid::smpi::action_reduce);
872 xbt_replay_action_register("allReduce", simgrid::smpi::action_allReduce);
873 xbt_replay_action_register("allToAll", simgrid::smpi::action_allToAll);
874 xbt_replay_action_register("allToAllV", simgrid::smpi::action_allToAllv);
875 xbt_replay_action_register("gather", simgrid::smpi::action_gather);
876 xbt_replay_action_register("scatter", simgrid::smpi::action_scatter);
877 xbt_replay_action_register("gatherV", simgrid::smpi::action_gatherv);
878 xbt_replay_action_register("scatterV", simgrid::smpi::action_scatterv);
879 xbt_replay_action_register("allGather", simgrid::smpi::action_allgather);
880 xbt_replay_action_register("allGatherV", simgrid::smpi::action_allgatherv);
881 xbt_replay_action_register("reduceScatter", simgrid::smpi::action_reducescatter);
882 xbt_replay_action_register("compute", simgrid::smpi::action_compute);
884 //if we have a delayed start, sleep here.
886 double value = xbt_str_parse_double((*argv)[2], "%s is not a double");
887 XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
888 smpi_execute_flops(value);
890 //UGLY: force a context switch to be sure that all MSG_processes begin initialization
891 XBT_DEBUG("Force context switch by smpi_execute_flops - Sleeping for 0.0 flops ");
892 smpi_execute_flops(0.0);
896 /** @brief actually run the replay after initialization */
// Run the replay loop, then drain any requests left pending by the trace,
// free the shared temp buffers once the last process finishes, and emit the
// finalize tracing events.
897 void smpi_replay_main(int* argc, char*** argv)
899 simgrid::xbt::replay_runner(*argc, *argv);
901 /* and now, finalize everything */
902 /* One active process will stop. Decrease the counter*/
903 XBT_DEBUG("There are %zu elements in reqq[*]", get_reqq_self()->size());
// Traces may end with outstanding Isend/Irecv; wait for them all before exit.
904 if (not get_reqq_self()->empty()) {
905 unsigned int count_requests=get_reqq_self()->size();
906 MPI_Request requests[count_requests];
907 MPI_Status status[count_requests];
910 for (auto const& req : *get_reqq_self()) {
914 simgrid::smpi::Request::waitall(count_requests, requests, status);
916 delete get_reqq_self();
// Last replaying process reports the total simulated time and releases the
// shared send/recv scratch buffers.
919 if(active_processes==0){
920 /* Last process alive speaking: end the simulated timer */
921 XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
922 xbt_free(sendbuffer);
923 xbt_free(recvbuffer);
926 TRACE_smpi_comm_in(Actor::self()->getPid(), "smpi_replay_run_finalize", new simgrid::instr::NoOpTIData("finalize"));
928 smpi_process()->finalize();
930 TRACE_smpi_comm_out(Actor::self()->getPid());
931 TRACE_smpi_finalize(Actor::self()->getPid());
934 /** @brief chain a replay initialization and a replay start */
// Convenience entry point: initialize then run the replay with the same args.
935 void smpi_replay_run(int* argc, char*** argv)
937 smpi_replay_init(argc, argv);
938 smpi_replay_main(argc, argv);