X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/blobdiff_plain/1576bb1d7806d7c5c0bc14271cb1539618bf8e19..a40d504b595655bff5c4e433bf929091a155aef7:/src/smpi/internals/smpi_replay.cpp diff --git a/src/smpi/internals/smpi_replay.cpp b/src/smpi/internals/smpi_replay.cpp index 2f6522d86e..beb0191d9b 100644 --- a/src/smpi/internals/smpi_replay.cpp +++ b/src/smpi/internals/smpi_replay.cpp @@ -39,10 +39,11 @@ static MPI_Datatype MPI_DEFAULT_TYPE; static_cast(optional)); \ } -static void log_timed_action (simgrid::xbt::ReplayAction& action, double clock){ +static void log_timed_action(simgrid::xbt::ReplayAction& action, double clock) +{ if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){ std::string s = boost::algorithm::join(action, " "); - XBT_VERB("%s %f", s.c_str(), smpi_process()->simulated_elapsed()-clock); + XBT_VERB("%s %f", s.c_str(), smpi_process()->simulated_elapsed() - clock); } } @@ -62,23 +63,175 @@ static double parse_double(std::string string) return xbt_str_parse_double(string.c_str(), "%s is not a double"); } +namespace simgrid { +namespace smpi { -//TODO: this logic should be moved inside the datatype class, to support all predefined types and get rid of is_replayable. -static MPI_Datatype decode_datatype(std::string action) -{ - return simgrid::smpi::Datatype::decode(const_cast(action.c_str())); -} +namespace Replay { +class ActionArgParser { +public: + virtual void parse(simgrid::xbt::ReplayAction& action){}; +}; + +class SendRecvParser : public ActionArgParser { +public: + /* communication partner; if we send, this is the receiver and vice versa */ + int partner; + double size; + MPI_Datatype datatype1 = MPI_DEFAULT_TYPE; + + void parse(simgrid::xbt::ReplayAction& action) override + { + CHECK_ACTION_PARAMS(action, 2, 1) + partner = std::stoi(action[2]); + size = parse_double(action[3]); + if (action.size() > 4) + datatype1 = simgrid::smpi::Datatype::decode(action[4]); + } +}; -const char* encode_datatype(MPI_Datatype datatype) -{ - if (datatype == nullptr) /* this actually does seem to be possible, had this in the scatter2 test */ - return "-1"; +class ComputeParser : public ActionArgParser { +public: + /* communication partner; if we send, this is the receiver and vice versa */ + double flops; - return datatype->encode(); -} + void parse(simgrid::xbt::ReplayAction& action) override + { + CHECK_ACTION_PARAMS(action, 1, 0) + flops = parse_double(action[2]); + } +}; -namespace simgrid { -namespace smpi { +template class ReplayAction { +protected: + const std::string name; + T args; + + int my_proc_id; + +public: + explicit ReplayAction(std::string name) : name(name), my_proc_id(simgrid::s4u::Actor::self()->getPid()) + { + } + + virtual void execute(simgrid::xbt::ReplayAction& action) + { + // Needs to be re-initialized for every action, hence here + double start_time = smpi_process()->simulated_elapsed(); + args.parse(action); + kernel(action); + log_timed_action(action, start_time); + } + + virtual void kernel(simgrid::xbt::ReplayAction& action) = 0; +}; + +class WaitAction : public ReplayAction { +public: + WaitAction() : ReplayAction("Wait") {} + void kernel(simgrid::xbt::ReplayAction& action) override + { + CHECK_ACTION_PARAMS(action, 0, 0) + MPI_Status status; + + std::string s = boost::algorithm::join(action, " "); + xbt_assert(get_reqq_self()->size(), "action wait not preceded by any irecv or isend: %s", s.c_str()); + MPI_Request request = get_reqq_self()->back(); + get_reqq_self()->pop_back(); + + if (request == nullptr) { + /* Assume that 
the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just + * return.*/ + return; + } + + int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1; + + MPI_Group group = request->comm()->group(); + int src_traced = group->rank(request->src()); + int dst_traced = group->rank(request->dst()); + bool is_wait_for_receive = (request->flags() & RECV); + // TODO: Here we take the rank while we normally take the process id (look for my_proc_id) + TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::NoOpTIData("wait")); + + Request::wait(&request, &status); + + TRACE_smpi_comm_out(rank); + if (is_wait_for_receive) + TRACE_smpi_recv(src_traced, dst_traced, 0); + } +}; + +class SendAction : public ReplayAction { +public: + SendAction() = delete; + SendAction(std::string name) : ReplayAction(name) {} + void kernel(simgrid::xbt::ReplayAction& action) override + { + int dst_traced = MPI_COMM_WORLD->group()->actor(args.partner)->getPid(); + + TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size, + Datatype::encode(args.datatype1))); + if (not TRACE_smpi_view_internals()) + TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, args.size * args.datatype1->size()); + + if (name == "send") { + Request::send(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD); + } else if (name == "Isend") { + MPI_Request request = Request::isend(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD); + get_reqq_self()->push_back(request); + } else { + xbt_die("Don't know this action, %s", name.c_str()); + } + + TRACE_smpi_comm_out(my_proc_id); + } +}; + +class RecvAction : public ReplayAction { +public: + RecvAction() = delete; + explicit RecvAction(std::string name) : ReplayAction(name) {} + void kernel(simgrid::xbt::ReplayAction& action) override + { + int src_traced = MPI_COMM_WORLD->group()->actor(args.partner)->getPid(); + + TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size, + Datatype::encode(args.datatype1))); + + MPI_Status status; + // unknown size from the receiver point of view + if (args.size <= 0.0) { + Request::probe(args.partner, 0, MPI_COMM_WORLD, &status); + args.size = status.count; + } + + if (name == "recv") { + Request::recv(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD, &status); + } else if (name == "Irecv") { + MPI_Request request = Request::irecv(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD); + get_reqq_self()->push_back(request); + } + + TRACE_smpi_comm_out(my_proc_id); + // TODO: Check why this was only activated in the "recv" case and not in the "Irecv" case + if (name == "recv" && not TRACE_smpi_view_internals()) { + TRACE_smpi_recv(src_traced, my_proc_id, 0); + } + } +}; + +class ComputeAction : public ReplayAction { +public: + ComputeAction() : ReplayAction("compute") {} + void kernel(simgrid::xbt::ReplayAction& action) override + { + TRACE_smpi_computing_in(my_proc_id, args.flops); + smpi_execute_flops(args.flops); + TRACE_smpi_computing_out(my_proc_id); + } +}; + +} // Replay Namespace static void action_init(simgrid::xbt::ReplayAction& action) { @@ -120,124 +273,7 @@ static void action_comm_dup(simgrid::xbt::ReplayAction& action) static void action_compute(simgrid::xbt::ReplayAction& action) { - CHECK_ACTION_PARAMS(action, 1, 0) - double clock = smpi_process()->simulated_elapsed(); - double flops= parse_double(action[2]); - int my_proc_id = 
Actor::self()->getPid(); - - TRACE_smpi_computing_in(my_proc_id, flops); - smpi_execute_flops(flops); - TRACE_smpi_computing_out(my_proc_id); - - log_timed_action (action, clock); -} - -static void action_send(simgrid::xbt::ReplayAction& action) -{ - CHECK_ACTION_PARAMS(action, 2, 1) - int to = std::stoi(action[2]); - double size=parse_double(action[3]); - double clock = smpi_process()->simulated_elapsed(); - - MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 4) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE; - - int my_proc_id = Actor::self()->getPid(); - int dst_traced = MPI_COMM_WORLD->group()->actor(to)->getPid(); - - TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, - new simgrid::instr::Pt2PtTIData("send", to, size, MPI_CURRENT_TYPE->encode())); - if (not TRACE_smpi_view_internals()) - TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, size * MPI_CURRENT_TYPE->size()); - - Request::send(nullptr, size, MPI_CURRENT_TYPE, to , 0, MPI_COMM_WORLD); - - TRACE_smpi_comm_out(my_proc_id); - - log_timed_action(action, clock); -} - -static void action_Isend(simgrid::xbt::ReplayAction& action) -{ - CHECK_ACTION_PARAMS(action, 2, 1) - int to = std::stoi(action[2]); - double size=parse_double(action[3]); - double clock = smpi_process()->simulated_elapsed(); - - MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 4) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE; - - int my_proc_id = Actor::self()->getPid(); - int dst_traced = MPI_COMM_WORLD->group()->actor(to)->getPid(); - TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, - new simgrid::instr::Pt2PtTIData("Isend", to, size, MPI_CURRENT_TYPE->encode())); - if (not TRACE_smpi_view_internals()) - TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, size * MPI_CURRENT_TYPE->size()); - - MPI_Request request = Request::isend(nullptr, size, MPI_CURRENT_TYPE, to, 0, MPI_COMM_WORLD); - - TRACE_smpi_comm_out(my_proc_id); - - get_reqq_self()->push_back(request); - - log_timed_action (action, clock); -} - -static void action_recv(simgrid::xbt::ReplayAction& action) -{ - CHECK_ACTION_PARAMS(action, 2, 1) - int from = std::stoi(action[2]); - double size=parse_double(action[3]); - double clock = smpi_process()->simulated_elapsed(); - MPI_Status status; - - MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 4) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE; - - int my_proc_id = Actor::self()->getPid(); - int src_traced = MPI_COMM_WORLD->group()->actor(from)->getPid(); - - TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, - new simgrid::instr::Pt2PtTIData("recv", from, size, MPI_CURRENT_TYPE->encode())); - - //unknown size from the receiver point of view - if (size <= 0.0) { - Request::probe(from, 0, MPI_COMM_WORLD, &status); - size=status.count; - } - - Request::recv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD, &status); - - TRACE_smpi_comm_out(my_proc_id); - if (not TRACE_smpi_view_internals()) { - TRACE_smpi_recv(src_traced, my_proc_id, 0); - } - - log_timed_action (action, clock); -} - -static void action_Irecv(simgrid::xbt::ReplayAction& action) -{ - CHECK_ACTION_PARAMS(action, 2, 1) - int from = std::stoi(action[2]); - double size=parse_double(action[3]); - double clock = smpi_process()->simulated_elapsed(); - - MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 4) ? 
decode_datatype(action[4]) : MPI_DEFAULT_TYPE; - - int my_proc_id = Actor::self()->getPid(); - TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, - new simgrid::instr::Pt2PtTIData("Irecv", from, size, MPI_CURRENT_TYPE->encode())); - MPI_Status status; - //unknow size from the receiver pov - if (size <= 0.0) { - Request::probe(from, 0, MPI_COMM_WORLD, &status); - size = status.count; - } - - MPI_Request request = Request::irecv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD); - - TRACE_smpi_comm_out(my_proc_id); - get_reqq_self()->push_back(request); - - log_timed_action (action, clock); + Replay::ComputeAction().execute(action); } static void action_test(simgrid::xbt::ReplayAction& action) @@ -266,38 +302,6 @@ static void action_test(simgrid::xbt::ReplayAction& action) log_timed_action (action, clock); } -static void action_wait(simgrid::xbt::ReplayAction& action) -{ - CHECK_ACTION_PARAMS(action, 0, 0) - double clock = smpi_process()->simulated_elapsed(); - MPI_Status status; - - std::string s = boost::algorithm::join(action, " "); - xbt_assert(get_reqq_self()->size(), "action wait not preceded by any irecv or isend: %s", s.c_str()); - MPI_Request request = get_reqq_self()->back(); - get_reqq_self()->pop_back(); - - if (request==nullptr){ - /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just return.*/ - return; - } - - int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1; - - MPI_Group group = request->comm()->group(); - int src_traced = group->rank(request->src()); - int dst_traced = group->rank(request->dst()); - int is_wait_for_receive = (request->flags() & RECV); - TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::NoOpTIData("wait")); - - Request::wait(&request, &status); - - TRACE_smpi_comm_out(rank); - if (is_wait_for_receive) - TRACE_smpi_recv(src_traced, dst_traced, 0); - log_timed_action (action, clock); -} - static void action_waitall(simgrid::xbt::ReplayAction& action) { CHECK_ACTION_PARAMS(action, 0, 0) @@ -350,12 +354,12 @@ static void action_bcast(simgrid::xbt::ReplayAction& action) double clock = smpi_process()->simulated_elapsed(); int root = (action.size() > 3) ? std::stoi(action[3]) : 0; /* Initialize MPI_CURRENT_TYPE in order to decrease the number of the checks */ - MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 4) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE; + MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 4) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE; int my_proc_id = Actor::self()->getPid(); TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(root)->getPid(), -1.0, size, - -1, MPI_CURRENT_TYPE->encode(), "")); + -1, Datatype::encode(MPI_CURRENT_TYPE), "")); void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size()); @@ -373,12 +377,12 @@ static void action_reduce(simgrid::xbt::ReplayAction& action) double clock = smpi_process()->simulated_elapsed(); int root = (action.size() > 4) ? std::stoi(action[4]) : 0; - MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 5) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE; + MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 5) ? 
simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE; int my_proc_id = Actor::self()->getPid(); TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(root)->getPid(), comp_size, - comm_size, -1, MPI_CURRENT_TYPE->encode(), "")); + comm_size, -1, Datatype::encode(MPI_CURRENT_TYPE), "")); void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size()); void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size()); @@ -395,12 +399,12 @@ static void action_allReduce(simgrid::xbt::ReplayAction& action) double comm_size = parse_double(action[2]); double comp_size = parse_double(action[3]); - MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 4) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE; + MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 4) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE; double clock = smpi_process()->simulated_elapsed(); int my_proc_id = Actor::self()->getPid(); TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::CollTIData("allReduce", -1, comp_size, comm_size, -1, - MPI_CURRENT_TYPE->encode(), "")); + Datatype::encode(MPI_CURRENT_TYPE), "")); void *recvbuf = smpi_get_tmp_recvbuffer(comm_size* MPI_CURRENT_TYPE->size()); void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size()); @@ -418,8 +422,8 @@ static void action_allToAll(simgrid::xbt::ReplayAction& action) unsigned long comm_size = MPI_COMM_WORLD->size(); int send_size = parse_double(action[2]); int recv_size = parse_double(action[3]); - MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 5) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE}; - MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE}; + MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 5) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE}; + MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE}; void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* MPI_CURRENT_TYPE->size()); void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size()); @@ -427,7 +431,8 @@ static void action_allToAll(simgrid::xbt::ReplayAction& action) int my_proc_id = Actor::self()->getPid(); TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::CollTIData("allToAll", -1, -1.0, send_size, recv_size, - MPI_CURRENT_TYPE->encode(), MPI_CURRENT_TYPE2->encode())); + Datatype::encode(MPI_CURRENT_TYPE), + Datatype::encode(MPI_CURRENT_TYPE2))); Colls::alltoall(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD); @@ -443,16 +448,16 @@ static void action_gather(simgrid::xbt::ReplayAction& action) 1) 68 is the sendcounts 2) 68 is the recvcounts 3) 0 is the root node - 4) 0 is the send datatype id, see decode_datatype() - 5) 0 is the recv datatype id, see decode_datatype() + 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode() + 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode() */ CHECK_ACTION_PARAMS(action, 2, 3) double clock = smpi_process()->simulated_elapsed(); unsigned long comm_size = MPI_COMM_WORLD->size(); int send_size = parse_double(action[2]); int recv_size = parse_double(action[3]); - MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 6) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE}; - MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 6) ? 
decode_datatype(action[6]) : MPI_DEFAULT_TYPE}; + MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE}; + MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[6]) : MPI_DEFAULT_TYPE}; void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size()); void *recv = nullptr; @@ -462,9 +467,9 @@ static void action_gather(simgrid::xbt::ReplayAction& action) if(rank==root) recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size()); - TRACE_smpi_comm_in(rank, __FUNCTION__, - new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size, - MPI_CURRENT_TYPE->encode(), MPI_CURRENT_TYPE2->encode())); + TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size, + Datatype::encode(MPI_CURRENT_TYPE), + Datatype::encode(MPI_CURRENT_TYPE2))); Colls::gather(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD); @@ -480,16 +485,16 @@ static void action_scatter(simgrid::xbt::ReplayAction& action) 1) 68 is the sendcounts 2) 68 is the recvcounts 3) 0 is the root node - 4) 0 is the send datatype id, see decode_datatype() - 5) 0 is the recv datatype id, see decode_datatype() + 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode() + 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode() */ CHECK_ACTION_PARAMS(action, 2, 3) double clock = smpi_process()->simulated_elapsed(); unsigned long comm_size = MPI_COMM_WORLD->size(); int send_size = parse_double(action[2]); int recv_size = parse_double(action[3]); - MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 6) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE}; - MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 6) ? decode_datatype(action[6]) : MPI_DEFAULT_TYPE}; + MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE}; + MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 6) ? 
simgrid::smpi::Datatype::decode(action[6]) : MPI_DEFAULT_TYPE}; void* send = smpi_get_tmp_sendbuffer(send_size * MPI_CURRENT_TYPE->size()); void* recv = nullptr; @@ -499,9 +504,9 @@ static void action_scatter(simgrid::xbt::ReplayAction& action) if (rank == root) recv = smpi_get_tmp_recvbuffer(recv_size * comm_size * MPI_CURRENT_TYPE2->size()); - TRACE_smpi_comm_in(rank, __FUNCTION__, - new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size, - MPI_CURRENT_TYPE->encode(), MPI_CURRENT_TYPE2->encode())); + TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size, + Datatype::encode(MPI_CURRENT_TYPE), + Datatype::encode(MPI_CURRENT_TYPE2))); Colls::scatter(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD); @@ -517,8 +522,8 @@ static void action_gatherv(simgrid::xbt::ReplayAction& action) 1) 68 is the sendcount 2) 68 10 10 10 is the recvcounts 3) 0 is the root node - 4) 0 is the send datatype id, see decode_datatype() - 5) 0 is the recv datatype id, see decode_datatype() + 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode() + 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode() */ double clock = smpi_process()->simulated_elapsed(); unsigned long comm_size = MPI_COMM_WORLD->size(); @@ -528,9 +533,9 @@ static void action_gatherv(simgrid::xbt::ReplayAction& action) std::shared_ptr> recvcounts(new std::vector(comm_size)); MPI_Datatype MPI_CURRENT_TYPE = - (action.size() > 5 + comm_size) ? decode_datatype(action[4 + comm_size]) : MPI_DEFAULT_TYPE; - MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5 + comm_size) ? decode_datatype(action[5 + comm_size]) - : MPI_DEFAULT_TYPE}; + (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[4 + comm_size]) : MPI_DEFAULT_TYPE; + MPI_Datatype MPI_CURRENT_TYPE2{ + (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[5 + comm_size]) : MPI_DEFAULT_TYPE}; void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size()); void *recv = nullptr; @@ -545,9 +550,9 @@ static void action_gatherv(simgrid::xbt::ReplayAction& action) if(rank==root) recv = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size()); - TRACE_smpi_comm_in(rank, __FUNCTION__, - new simgrid::instr::VarCollTIData("gatherV", root, send_size, nullptr, -1, recvcounts, - MPI_CURRENT_TYPE->encode(), MPI_CURRENT_TYPE2->encode())); + TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData( + "gatherV", root, send_size, nullptr, -1, recvcounts, + Datatype::encode(MPI_CURRENT_TYPE), Datatype::encode(MPI_CURRENT_TYPE2))); Colls::gatherv(send, send_size, MPI_CURRENT_TYPE, recv, recvcounts->data(), disps.data(), MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD); @@ -564,8 +569,8 @@ static void action_scatterv(simgrid::xbt::ReplayAction& action) 1) 68 10 10 10 is the sendcounts 2) 68 is the recvcount 3) 0 is the root node - 4) 0 is the send datatype id, see decode_datatype() - 5) 0 is the recv datatype id, see decode_datatype() + 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode() + 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode() */ double clock = smpi_process()->simulated_elapsed(); unsigned long comm_size = MPI_COMM_WORLD->size(); @@ -575,9 +580,9 @@ static void action_scatterv(simgrid::xbt::ReplayAction& action) std::shared_ptr> sendcounts(new std::vector(comm_size)); MPI_Datatype MPI_CURRENT_TYPE = - (action.size() > 5 + comm_size) ? 
decode_datatype(action[4 + comm_size]) : MPI_DEFAULT_TYPE; - MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5 + comm_size) ? decode_datatype(action[5 + comm_size]) - : MPI_DEFAULT_TYPE}; + (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[4 + comm_size]) : MPI_DEFAULT_TYPE; + MPI_Datatype MPI_CURRENT_TYPE2{ + (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[5 + comm_size]) : MPI_DEFAULT_TYPE}; void* send = nullptr; void* recv = smpi_get_tmp_recvbuffer(recv_size * MPI_CURRENT_TYPE->size()); @@ -592,9 +597,9 @@ static void action_scatterv(simgrid::xbt::ReplayAction& action) if (rank == root) send = smpi_get_tmp_sendbuffer(send_sum * MPI_CURRENT_TYPE2->size()); - TRACE_smpi_comm_in(rank, __FUNCTION__, - new simgrid::instr::VarCollTIData("gatherV", root, -1, sendcounts, recv_size, nullptr, - MPI_CURRENT_TYPE->encode(), MPI_CURRENT_TYPE2->encode())); + TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData("gatherV", root, -1, sendcounts, recv_size, + nullptr, Datatype::encode(MPI_CURRENT_TYPE), + Datatype::encode(MPI_CURRENT_TYPE2))); Colls::scatterv(send, sendcounts->data(), disps.data(), MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD); @@ -610,7 +615,7 @@ static void action_reducescatter(simgrid::xbt::ReplayAction& action) where: 1) The first four values after the name of the action declare the recvcounts array 2) The value 11346849 is the amount of instructions - 3) The last value corresponds to the datatype, see decode_datatype(). + 3) The last value corresponds to the datatype, see simgrid::smpi::Datatype::decode(). */ double clock = smpi_process()->simulated_elapsed(); unsigned long comm_size = MPI_COMM_WORLD->size(); @@ -619,7 +624,7 @@ static void action_reducescatter(simgrid::xbt::ReplayAction& action) int my_proc_id = Actor::self()->getPid(); std::shared_ptr> recvcounts(new std::vector); MPI_Datatype MPI_CURRENT_TYPE = - (action.size() > 3 + comm_size) ? decode_datatype(action[3 + comm_size]) : MPI_DEFAULT_TYPE; + (action.size() > 3 + comm_size) ? simgrid::smpi::Datatype::decode(action[3 + comm_size]) : MPI_DEFAULT_TYPE; for (unsigned int i = 0; i < comm_size; i++) { recvcounts->push_back(std::stoi(action[i + 2])); @@ -629,7 +634,7 @@ static void action_reducescatter(simgrid::xbt::ReplayAction& action) TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::VarCollTIData("reduceScatter", -1, 0, nullptr, -1, recvcounts, std::to_string(comp_size), /* ugly hack to print comp_size */ - MPI_CURRENT_TYPE->encode())); + Datatype::encode(MPI_CURRENT_TYPE))); void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size()); void *recvbuf = smpi_get_tmp_recvbuffer(size* MPI_CURRENT_TYPE->size()); @@ -648,7 +653,8 @@ static void action_allgather(simgrid::xbt::ReplayAction& action) where: 1) 275427 is the sendcount 2) 275427 is the recvcount - 3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype(). + 3) No more values mean that the datatype for sent and receive buffer is the default one, see + simgrid::smpi::Datatype::decode(). */ double clock = smpi_process()->simulated_elapsed(); @@ -656,8 +662,8 @@ static void action_allgather(simgrid::xbt::ReplayAction& action) int sendcount = std::stoi(action[2]); int recvcount = std::stoi(action[3]); - MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 5) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE}; - MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5) ? 
decode_datatype(action[5]) : MPI_DEFAULT_TYPE}; + MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 5) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE}; + MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE}; void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size()); void *recvbuf = smpi_get_tmp_recvbuffer(recvcount* MPI_CURRENT_TYPE2->size()); @@ -666,7 +672,8 @@ static void action_allgather(simgrid::xbt::ReplayAction& action) TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::CollTIData("allGather", -1, -1.0, sendcount, recvcount, - MPI_CURRENT_TYPE->encode(), MPI_CURRENT_TYPE2->encode())); + Datatype::encode(MPI_CURRENT_TYPE), + Datatype::encode(MPI_CURRENT_TYPE2))); Colls::allgather(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcount, MPI_CURRENT_TYPE2, MPI_COMM_WORLD); @@ -681,7 +688,8 @@ static void action_allgatherv(simgrid::xbt::ReplayAction& action) where: 1) 275427 is the sendcount 2) The next four elements declare the recvcounts array - 3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype(). + 3) No more values mean that the datatype for sent and receive buffer is the default one, see + simgrid::smpi::Datatype::decode(). */ double clock = smpi_process()->simulated_elapsed(); @@ -707,8 +715,10 @@ static void action_allgatherv(simgrid::xbt::ReplayAction& action) disps[i] = std::stoi(action[disp_index + i]); } - MPI_Datatype MPI_CURRENT_TYPE{(datatype_index > 0) ? decode_datatype(action[datatype_index]) : MPI_DEFAULT_TYPE}; - MPI_Datatype MPI_CURRENT_TYPE2{(datatype_index > 0) ? decode_datatype(action[datatype_index]) : MPI_DEFAULT_TYPE}; + MPI_Datatype MPI_CURRENT_TYPE{(datatype_index > 0) ? simgrid::smpi::Datatype::decode(action[datatype_index]) + : MPI_DEFAULT_TYPE}; + MPI_Datatype MPI_CURRENT_TYPE2{(datatype_index > 0) ? simgrid::smpi::Datatype::decode(action[datatype_index]) + : MPI_DEFAULT_TYPE}; void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size()); @@ -722,7 +732,8 @@ static void action_allgatherv(simgrid::xbt::ReplayAction& action) TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::VarCollTIData("allGatherV", -1, sendcount, nullptr, -1, recvcounts, - MPI_CURRENT_TYPE->encode(), MPI_CURRENT_TYPE2->encode())); + Datatype::encode(MPI_CURRENT_TYPE), + Datatype::encode(MPI_CURRENT_TYPE2))); Colls::allgatherv(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts->data(), disps.data(), MPI_CURRENT_TYPE2, MPI_COMM_WORLD); @@ -750,10 +761,12 @@ static void action_allToAllv(simgrid::xbt::ReplayAction& action) std::vector senddisps(comm_size, 0); std::vector recvdisps(comm_size, 0); - MPI_Datatype MPI_CURRENT_TYPE = - (action.size() > 5 + 2 * comm_size) ? decode_datatype(action[4 + 2 * comm_size]) : MPI_DEFAULT_TYPE; - MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5 + 2 * comm_size) ? decode_datatype(action[5 + 2 * comm_size]) - : MPI_DEFAULT_TYPE}; + MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 5 + 2 * comm_size) + ? simgrid::smpi::Datatype::decode(action[4 + 2 * comm_size]) + : MPI_DEFAULT_TYPE; + MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5 + 2 * comm_size) + ? 
simgrid::smpi::Datatype::decode(action[5 + 2 * comm_size]) + : MPI_DEFAULT_TYPE}; int send_buf_size=parse_double(action[2]); int recv_buf_size=parse_double(action[3+comm_size]); @@ -770,7 +783,8 @@ static void action_allToAllv(simgrid::xbt::ReplayAction& action) TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::VarCollTIData("allToAllV", -1, send_size, sendcounts, recv_size, recvcounts, - MPI_CURRENT_TYPE->encode(), MPI_CURRENT_TYPE2->encode())); + Datatype::encode(MPI_CURRENT_TYPE), + Datatype::encode(MPI_CURRENT_TYPE2))); Colls::alltoallv(sendbuf, sendcounts->data(), senddisps.data(), MPI_CURRENT_TYPE, recvbuf, recvcounts->data(), recvdisps.data(), MPI_CURRENT_TYPE, MPI_COMM_WORLD); @@ -798,12 +812,24 @@ void smpi_replay_init(int* argc, char*** argv) xbt_replay_action_register("comm_size", simgrid::smpi::action_comm_size); xbt_replay_action_register("comm_split", simgrid::smpi::action_comm_split); xbt_replay_action_register("comm_dup", simgrid::smpi::action_comm_dup); - xbt_replay_action_register("send", simgrid::smpi::action_send); - xbt_replay_action_register("Isend", simgrid::smpi::action_Isend); - xbt_replay_action_register("recv", simgrid::smpi::action_recv); - xbt_replay_action_register("Irecv", simgrid::smpi::action_Irecv); - xbt_replay_action_register("test", simgrid::smpi::action_test); - xbt_replay_action_register("wait", simgrid::smpi::action_wait); + + std::shared_ptr isend(new simgrid::smpi::Replay::SendAction("Isend")); + std::shared_ptr send(new simgrid::smpi::Replay::SendAction("send")); + std::shared_ptr irecv(new simgrid::smpi::Replay::RecvAction("Irecv")); + std::shared_ptr recv(new simgrid::smpi::Replay::RecvAction("recv")); + std::shared_ptr wait(new simgrid::smpi::Replay::WaitAction()); + + xbt_replay_action_register("send", + std::bind(&simgrid::smpi::Replay::SendAction::execute, send, std::placeholders::_1)); + xbt_replay_action_register("Isend", + std::bind(&simgrid::smpi::Replay::SendAction::execute, isend, std::placeholders::_1)); + xbt_replay_action_register("recv", + std::bind(&simgrid::smpi::Replay::RecvAction::execute, recv, std::placeholders::_1)); + xbt_replay_action_register("Irecv", + std::bind(&simgrid::smpi::Replay::RecvAction::execute, irecv, std::placeholders::_1)); + xbt_replay_action_register("test", simgrid::smpi::action_test); + xbt_replay_action_register("wait", + std::bind(&simgrid::smpi::Replay::WaitAction::execute, wait, std::placeholders::_1)); xbt_replay_action_register("waitAll", simgrid::smpi::action_waitall); xbt_replay_action_register("barrier", simgrid::smpi::action_barrier); xbt_replay_action_register("bcast", simgrid::smpi::action_bcast); @@ -818,7 +844,7 @@ void smpi_replay_init(int* argc, char*** argv) xbt_replay_action_register("allGather", simgrid::smpi::action_allgather); xbt_replay_action_register("allGatherV", simgrid::smpi::action_allgatherv); xbt_replay_action_register("reduceScatter", simgrid::smpi::action_reducescatter); - xbt_replay_action_register("compute", simgrid::smpi::action_compute); + xbt_replay_action_register("compute", simgrid::smpi::action_compute); //if we have a delayed start, sleep here. if(*argc>2){
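
What the patch does, in brief: the per-action free functions (action_send, action_Isend, action_recv, action_Irecv, action_wait, action_compute) are folded into a small class hierarchy in namespace simgrid::smpi::Replay. An ActionArgParser subclass decodes the trace tokens for each action family, ReplayAction::execute() factors out the parsing, timing and logging that every action used to repeat, kernel() carries the action-specific MPI calls, and smpi_replay_init() now registers shared instances of these classes through std::bind instead of plain function pointers. The following is a minimal, self-contained sketch of that pattern only; Tokens, ArgParser and registry are placeholder names for this illustration, not SimGrid's real API (the real code uses simgrid::xbt::ReplayAction, smpi_process()->simulated_elapsed(), TRACE_smpi_* and xbt_replay_action_register()).

// Standalone sketch of the parser/kernel split introduced by this patch.
// Placeholder names (assumptions): Tokens, ArgParser, registry.
#include <functional>
#include <iostream>
#include <map>
#include <memory>
#include <string>
#include <utility>
#include <vector>

using Tokens = std::vector<std::string>; // stand-in for simgrid::xbt::ReplayAction

class ArgParser { // stand-in for ActionArgParser
public:
  virtual ~ArgParser() = default;
  virtual void parse(Tokens& /*action*/) {}
};

class ComputeParser : public ArgParser {
public:
  double flops = 0.0;
  // action[0] is the rank, action[1] the action name, action[2] the payload
  void parse(Tokens& action) override { flops = std::stod(action.at(2)); }
};

template <class Parser> class ReplayAction {
protected:
  std::string name;
  Parser args;

public:
  explicit ReplayAction(std::string n) : name(std::move(n)) {}
  virtual ~ReplayAction() = default;

  void execute(Tokens& action)
  {
    args.parse(action);             // per-action argument decoding
    kernel(action);                 // action-specific work
    std::cout << name << " done\n"; // stand-in for log_timed_action()
  }
  virtual void kernel(Tokens& action) = 0;
};

class ComputeAction : public ReplayAction<ComputeParser> {
public:
  ComputeAction() : ReplayAction("compute") {}
  void kernel(Tokens& /*action*/) override { std::cout << "executing " << args.flops << " flops\n"; }
};

int main()
{
  // Registration mirrors the new smpi_replay_init(): one shared instance per action name,
  // bound to its execute() method.
  std::map<std::string, std::function<void(Tokens&)>> registry;
  auto compute        = std::make_shared<ComputeAction>();
  registry["compute"] = std::bind(&ComputeAction::execute, compute, std::placeholders::_1);

  Tokens trace_line{"0", "compute", "1e6"};
  registry["compute"](trace_line); // prints "executing 1e+06 flops" then "compute done"
}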