XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
-static std::unordered_map<int, std::vector<MPI_Request>*> reqq;
typedef std::tuple</*sender*/ int, /* reciever */ int, /* tag */int> req_key_t;
typedef std::unordered_map<req_key_t, MPI_Request, hash_tuple::hash<std::tuple<int,int,int>>> req_storage_t;
}
}
-static std::vector<MPI_Request>* get_reqq_self()
-{
- return reqq.at(simgrid::s4u::this_actor::get_pid());
-}
-
-static void set_reqq_self(std::vector<MPI_Request> *mpi_request)
-{
- reqq.insert({simgrid::s4u::this_actor::get_pid(), mpi_request});
-}
-
/* Helper function */
static double parse_double(std::string string)
{
Request::send(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
} else if (name == "Isend") {
MPI_Request request = Request::isend(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
- get_reqq_self()->push_back(request);
+ req_storage->add(request);
} else {
xbt_die("Don't know this action, %s", name.c_str());
}
Request::recv(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD, &status);
} else if (name == "Irecv") {
MPI_Request request = Request::irecv(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
- get_reqq_self()->push_back(request);
+ req_storage->add(request);
}
TRACE_smpi_comm_out(my_proc_id);
TestAction(RequestStorage& storage) : ReplayAction("Test", storage) {}
void kernel(simgrid::xbt::ReplayAction& action) override
{
- MPI_Request request = get_reqq_self()->back();
- get_reqq_self()->pop_back();
+ MPI_Request request = req_storage->find(args.src, args.dst, args.tag);
+ req_storage->remove(request);
// if request is null here, this may mean that a previous test has succeeded
// Different times in traced application and replayed version may lead to this
// In this case, ignore the extra calls.
XBT_DEBUG("MPI_Test result: %d", flag);
/* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now
* nullptr.*/
- get_reqq_self()->push_back(request);
+ if (request == MPI_REQUEST_NULL)
+ req_storage->addNullRequest(args.src, args.dst, args.tag);
+ else
+ req_storage->add(request);
TRACE_smpi_testing_out(my_proc_id);
}
/* start a simulated timer */
smpi_process()->simulated_start();
- set_reqq_self(new std::vector<MPI_Request>);
}
};
WaitAllAction(RequestStorage& storage) : ReplayAction("waitAll", storage) {}
void kernel(simgrid::xbt::ReplayAction& action) override
{
- const unsigned int count_requests = get_reqq_self()->size();
+ const unsigned int count_requests = req_storage->size();
if (count_requests > 0) {
TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData("waitAll", -1, count_requests, ""));
std::vector<std::pair</*sender*/int,/*recv*/int>> sender_receiver;
- for (const auto& req : (*get_reqq_self())) {
+ std::vector<MPI_Request> reqs;
+ req_storage->get_requests(reqs);
+ for (const auto& req : reqs) {
if (req && (req->flags() & RECV)) {
sender_receiver.push_back({req->src(), req->dst()});
}
}
MPI_Status status[count_requests];
- Request::waitall(count_requests, &(*get_reqq_self())[0], status);
+ Request::waitall(count_requests, &(reqs.data())[0], status);
for (auto& pair : sender_receiver) {
TRACE_smpi_recv(pair.first, pair.second, 0);
/* and now, finalize everything */
/* One active process will stop. Decrease the counter*/
- XBT_DEBUG("There are %zu elements in reqq[*]", get_reqq_self()->size());
- if (not get_reqq_self()->empty()) {
- unsigned int count_requests=get_reqq_self()->size();
+ unsigned int count_requests = storage[simgrid::s4u::this_actor::get_pid() - 1].size();
+ XBT_DEBUG("There are %ud elements in reqq[*]", count_requests);
+ if (count_requests > 0) {
MPI_Request requests[count_requests];
MPI_Status status[count_requests];
unsigned int i=0;
- for (auto const& req : *get_reqq_self()) {
- requests[i] = req;
+ for (auto const& pair : storage[simgrid::s4u::this_actor::get_pid() - 1].get_store()) {
+ requests[i] = pair.second;
i++;
}
simgrid::smpi::Request::waitall(count_requests, requests, status);
}
- delete get_reqq_self();
active_processes--;
if(active_processes==0){