From 33a2dd5a405effecfbc7a26d7a84664902a2432a Mon Sep 17 00:00:00 2001 From: Augustin Degomme Date: Sun, 20 Mar 2022 14:01:45 +0100 Subject: [PATCH] add finalizing state for smpi actor: if finalization barrier is used, tag is now different, in order not to match against other MPI_Barrier that may be in the MPI code --- src/smpi/bindings/smpi_pmpi.cpp | 1 + .../colls/barrier/barrier-mvapich2-pair.cpp | 10 ++-- src/smpi/colls/barrier/barrier-ompi.cpp | 52 +++++++++++-------- src/smpi/include/private.hpp | 2 +- src/smpi/include/smpi_actor.hpp | 2 + src/smpi/internals/smpi_actor.cpp | 13 +++++ 6 files changed, 52 insertions(+), 28 deletions(-) diff --git a/src/smpi/bindings/smpi_pmpi.cpp b/src/smpi/bindings/smpi_pmpi.cpp index 8764f14f5a..7de62bd039 100644 --- a/src/smpi/bindings/smpi_pmpi.cpp +++ b/src/smpi/bindings/smpi_pmpi.cpp @@ -65,6 +65,7 @@ int PMPI_Finalize() { smpi_bench_end(); aid_t rank_traced = simgrid::s4u::this_actor::get_pid(); + smpi_process()->mark_as_finalizing(); TRACE_smpi_comm_in(rank_traced, __func__, new simgrid::instr::NoOpTIData("finalize")); if(simgrid::config::get_value("smpi/finalization-barrier")) diff --git a/src/smpi/colls/barrier/barrier-mvapich2-pair.cpp b/src/smpi/colls/barrier/barrier-mvapich2-pair.cpp index 57b87e2f09..8def355ebe 100644 --- a/src/smpi/colls/barrier/barrier-mvapich2-pair.cpp +++ b/src/smpi/colls/barrier/barrier-mvapich2-pair.cpp @@ -41,6 +41,7 @@ #include "../coll_tuned_topo.hpp" #include "../colls_private.hpp" +#include "smpi_actor.hpp" namespace simgrid{ namespace smpi{ int barrier__mvapich2_pair(MPI_Comm comm) @@ -49,6 +50,7 @@ int barrier__mvapich2_pair(MPI_Comm comm) int size, rank; int d, dst, src; int mpi_errno = MPI_SUCCESS; + int tag = smpi_process()->finalizing() ? COLL_TAG_BARRIER-1: COLL_TAG_BARRIER; size = comm->size(); /* Trivial barriers return immediately */ @@ -68,25 +70,25 @@ int barrier__mvapich2_pair(MPI_Comm comm) if (rank < surfeit) { /* get the fanin letter from the upper "half" process: */ dst = N2_prev + rank; - Request::recv(nullptr, 0, MPI_BYTE, dst, COLL_TAG_BARRIER, comm, MPI_STATUS_IGNORE); + Request::recv(nullptr, 0, MPI_BYTE, dst, tag, comm, MPI_STATUS_IGNORE); } /* combine on embedded N2_prev power-of-two processes */ for (d = 1; d < N2_prev; d <<= 1) { dst = (rank ^ d); - Request::sendrecv(nullptr, 0, MPI_BYTE, dst, COLL_TAG_BARRIER, nullptr, 0, MPI_BYTE, dst, COLL_TAG_BARRIER, + Request::sendrecv(nullptr, 0, MPI_BYTE, dst, tag, nullptr, 0, MPI_BYTE, dst, tag, comm, MPI_STATUS_IGNORE); } /* fanout data to nodes above N2_prev... */ if (rank < surfeit) { dst = N2_prev + rank; - Request::send(nullptr, 0, MPI_BYTE, dst, COLL_TAG_BARRIER, comm); + Request::send(nullptr, 0, MPI_BYTE, dst, tag, comm); } } else { /* fanin data to power of 2 subset */ src = rank - N2_prev; - Request::sendrecv(nullptr, 0, MPI_BYTE, src, COLL_TAG_BARRIER, nullptr, 0, MPI_BYTE, src, COLL_TAG_BARRIER, + Request::sendrecv(nullptr, 0, MPI_BYTE, src, tag, nullptr, 0, MPI_BYTE, src, tag, comm, MPI_STATUS_IGNORE); } diff --git a/src/smpi/colls/barrier/barrier-ompi.cpp b/src/smpi/colls/barrier/barrier-ompi.cpp index 3d540de5ad..5c921c8370 100644 --- a/src/smpi/colls/barrier/barrier-ompi.cpp +++ b/src/smpi/colls/barrier/barrier-ompi.cpp @@ -22,6 +22,7 @@ #include "../coll_tuned_topo.hpp" #include "../colls_private.hpp" +#include "smpi_actor.hpp" /* * Barrier is meant to be a synchronous operation, as some BTLs can mark @@ -52,35 +53,35 @@ int barrier__ompi_doublering(MPI_Comm comm) rank = comm->rank(); size = comm->size(); - + int tag = smpi_process()->finalizing() ? COLL_TAG_BARRIER-1: COLL_TAG_BARRIER; XBT_DEBUG("ompi_coll_tuned_barrier_ompi_doublering rank %d", rank); left = ((rank-1+size)%size); right = ((rank+1)%size); if (rank > 0) { /* receive message from the left */ - Request::recv(nullptr, 0, MPI_BYTE, left, COLL_TAG_BARRIER, comm, MPI_STATUS_IGNORE); + Request::recv(nullptr, 0, MPI_BYTE, left, tag, comm, MPI_STATUS_IGNORE); } /* Send message to the right */ - Request::send(nullptr, 0, MPI_BYTE, right, COLL_TAG_BARRIER, comm); + Request::send(nullptr, 0, MPI_BYTE, right, tag, comm); /* root needs to receive from the last node */ if (rank == 0) { - Request::recv(nullptr, 0, MPI_BYTE, left, COLL_TAG_BARRIER, comm, MPI_STATUS_IGNORE); + Request::recv(nullptr, 0, MPI_BYTE, left, tag, comm, MPI_STATUS_IGNORE); } /* Allow nodes to exit */ if (rank > 0) { /* post Receive from left */ - Request::recv(nullptr, 0, MPI_BYTE, left, COLL_TAG_BARRIER, comm, MPI_STATUS_IGNORE); + Request::recv(nullptr, 0, MPI_BYTE, left, tag, comm, MPI_STATUS_IGNORE); } /* send message to the right one */ - Request::send(nullptr, 0, MPI_BYTE, right, COLL_TAG_BARRIER, comm); + Request::send(nullptr, 0, MPI_BYTE, right, tag, comm); /* rank 0 post receive from the last node */ if (rank == 0) { - Request::recv(nullptr, 0, MPI_BYTE, left, COLL_TAG_BARRIER, comm, MPI_STATUS_IGNORE); + Request::recv(nullptr, 0, MPI_BYTE, left, tag, comm, MPI_STATUS_IGNORE); } return MPI_SUCCESS; @@ -99,6 +100,7 @@ int barrier__ompi_recursivedoubling(MPI_Comm comm) rank = comm->rank(); size = comm->size(); + int tag = smpi_process()->finalizing() ? COLL_TAG_BARRIER-1: COLL_TAG_BARRIER; XBT_DEBUG( "ompi_coll_tuned_barrier_ompi_recursivedoubling rank %d", rank); @@ -112,13 +114,13 @@ int barrier__ompi_recursivedoubling(MPI_Comm comm) if (rank >= adjsize) { /* send message to lower ranked node */ remote = rank - adjsize; - Request::sendrecv(nullptr, 0, MPI_BYTE, remote, COLL_TAG_BARRIER, nullptr, 0, MPI_BYTE, remote, - COLL_TAG_BARRIER, comm, MPI_STATUS_IGNORE); + Request::sendrecv(nullptr, 0, MPI_BYTE, remote, tag, nullptr, 0, MPI_BYTE, remote, + tag, comm, MPI_STATUS_IGNORE); } else if (rank < (size - adjsize)) { /* receive message from high level rank */ - Request::recv(nullptr, 0, MPI_BYTE, rank + adjsize, COLL_TAG_BARRIER, comm, MPI_STATUS_IGNORE); + Request::recv(nullptr, 0, MPI_BYTE, rank + adjsize, tag, comm, MPI_STATUS_IGNORE); } } @@ -131,8 +133,8 @@ int barrier__ompi_recursivedoubling(MPI_Comm comm) if (remote >= adjsize) continue; /* post receive from the remote node */ - Request::sendrecv(nullptr, 0, MPI_BYTE, remote, COLL_TAG_BARRIER, nullptr, 0, MPI_BYTE, remote, - COLL_TAG_BARRIER, comm, MPI_STATUS_IGNORE); + Request::sendrecv(nullptr, 0, MPI_BYTE, remote, tag, nullptr, 0, MPI_BYTE, remote, + tag, comm, MPI_STATUS_IGNORE); } } @@ -141,7 +143,7 @@ int barrier__ompi_recursivedoubling(MPI_Comm comm) if (rank < (size - adjsize)) { /* send enter message to higher ranked node */ remote = rank + adjsize; - Request::send(nullptr, 0, MPI_BYTE, remote, COLL_TAG_BARRIER, comm); + Request::send(nullptr, 0, MPI_BYTE, remote, tag, comm); } } @@ -161,6 +163,7 @@ int barrier__ompi_bruck(MPI_Comm comm) rank = comm->rank(); size = comm->size(); + int tag = smpi_process()->finalizing() ? COLL_TAG_BARRIER-1: COLL_TAG_BARRIER; XBT_DEBUG( "ompi_coll_tuned_barrier_ompi_bruck rank %d", rank); @@ -170,7 +173,7 @@ int barrier__ompi_bruck(MPI_Comm comm) to = (rank + distance) % size; /* send message to lower ranked node */ - Request::sendrecv(nullptr, 0, MPI_BYTE, to, COLL_TAG_BARRIER, nullptr, 0, MPI_BYTE, from, COLL_TAG_BARRIER, + Request::sendrecv(nullptr, 0, MPI_BYTE, to, tag, nullptr, 0, MPI_BYTE, from, tag, comm, MPI_STATUS_IGNORE); } @@ -188,11 +191,12 @@ int barrier__ompi_two_procs(MPI_Comm comm) int remote; remote = comm->rank(); + int tag = smpi_process()->finalizing() ? COLL_TAG_BARRIER-1: COLL_TAG_BARRIER; XBT_DEBUG( "ompi_coll_tuned_barrier_ompi_two_procs rank %d", remote); remote = (remote + 1) & 0x1; - Request::sendrecv(nullptr, 0, MPI_BYTE, remote, COLL_TAG_BARRIER, nullptr, 0, MPI_BYTE, remote, COLL_TAG_BARRIER, + Request::sendrecv(nullptr, 0, MPI_BYTE, remote, tag, nullptr, 0, MPI_BYTE, remote, tag, comm, MPI_STATUS_IGNORE); return (MPI_SUCCESS); } @@ -218,12 +222,13 @@ int barrier__ompi_basic_linear(MPI_Comm comm) int size = comm->size(); int rank = comm->rank(); + int tag = smpi_process()->finalizing() ? COLL_TAG_BARRIER-1: COLL_TAG_BARRIER; /* All non-root send & receive zero-length message. */ if (rank > 0) { - Request::send(nullptr, 0, MPI_BYTE, 0, COLL_TAG_BARRIER, comm); + Request::send(nullptr, 0, MPI_BYTE, 0, tag, comm); - Request::recv(nullptr, 0, MPI_BYTE, 0, COLL_TAG_BARRIER, comm, MPI_STATUS_IGNORE); + Request::recv(nullptr, 0, MPI_BYTE, 0, tag, comm, MPI_STATUS_IGNORE); } /* The root collects and broadcasts the messages. */ @@ -233,12 +238,12 @@ int barrier__ompi_basic_linear(MPI_Comm comm) requests = new MPI_Request[size]; for (i = 1; i < size; ++i) { - requests[i] = Request::irecv(nullptr, 0, MPI_BYTE, i, COLL_TAG_BARRIER, comm); + requests[i] = Request::irecv(nullptr, 0, MPI_BYTE, i, tag, comm); } Request::waitall( size-1, requests+1, MPI_STATUSES_IGNORE ); for (i = 1; i < size; ++i) { - requests[i] = Request::isend(nullptr, 0, MPI_BYTE, i, COLL_TAG_BARRIER, comm); + requests[i] = Request::isend(nullptr, 0, MPI_BYTE, i, tag, comm); } Request::waitall( size-1, requests+1, MPI_STATUSES_IGNORE ); delete[] requests; @@ -262,6 +267,7 @@ int barrier__ompi_tree(MPI_Comm comm) rank = comm->rank(); size = comm->size(); + int tag = smpi_process()->finalizing() ? COLL_TAG_BARRIER-1: COLL_TAG_BARRIER; XBT_DEBUG( "ompi_coll_tuned_barrier_ompi_tree %d", rank); @@ -273,9 +279,9 @@ int barrier__ompi_tree(MPI_Comm comm) partner = rank ^ jump; if (!(partner & (jump-1)) && partner < size) { if (partner > rank) { - Request::recv(nullptr, 0, MPI_BYTE, partner, COLL_TAG_BARRIER, comm, MPI_STATUS_IGNORE); + Request::recv(nullptr, 0, MPI_BYTE, partner, tag, comm, MPI_STATUS_IGNORE); } else if (partner < rank) { - Request::send(nullptr, 0, MPI_BYTE, partner, COLL_TAG_BARRIER, comm); + Request::send(nullptr, 0, MPI_BYTE, partner, tag, comm); } } } @@ -285,9 +291,9 @@ int barrier__ompi_tree(MPI_Comm comm) partner = rank ^ jump; if (!(partner & (jump-1)) && partner < size) { if (partner > rank) { - Request::send(nullptr, 0, MPI_BYTE, partner, COLL_TAG_BARRIER, comm); + Request::send(nullptr, 0, MPI_BYTE, partner, tag, comm); } else if (partner < rank) { - Request::recv(nullptr, 0, MPI_BYTE, partner, COLL_TAG_BARRIER, comm, MPI_STATUS_IGNORE); + Request::recv(nullptr, 0, MPI_BYTE, partner, tag, comm, MPI_STATUS_IGNORE); } } } diff --git a/src/smpi/include/private.hpp b/src/smpi/include/private.hpp index 3351208aad..7d1fa6c62d 100644 --- a/src/smpi/include/private.hpp +++ b/src/smpi/include/private.hpp @@ -31,7 +31,7 @@ constexpr unsigned MPI_REQ_MATCHED = 0x4000; constexpr unsigned MPI_REQ_CANCELLED = 0x8000; constexpr unsigned MPI_REQ_NBC = 0x10000; -enum class SmpiProcessState { UNINITIALIZED, INITIALIZING, INITIALIZED /*(=MPI_Init called)*/, FINALIZED }; +enum class SmpiProcessState { UNINITIALIZED, INITIALIZING, INITIALIZED /*(=MPI_Init called)*/, FINALIZING, FINALIZED }; constexpr int COLL_TAG_REDUCE = -112; constexpr int COLL_TAG_SCATTER = -223; diff --git a/src/smpi/include/smpi_actor.hpp b/src/smpi/include/smpi_actor.hpp index 3b1e6f83c6..a44f60f629 100644 --- a/src/smpi/include/smpi_actor.hpp +++ b/src/smpi/include/smpi_actor.hpp @@ -56,7 +56,9 @@ public: int finalized() const; int initializing() const; int initialized() const; + int finalizing() const; void mark_as_initialized(); + void mark_as_finalizing(); void set_replaying(bool value); bool replaying() const; std::string get_instance_id() const { return instance_id_;} diff --git a/src/smpi/internals/smpi_actor.cpp b/src/smpi/internals/smpi_actor.cpp index 4e2fa04ffd..ad8af7dc62 100644 --- a/src/smpi/internals/smpi_actor.cpp +++ b/src/smpi/internals/smpi_actor.cpp @@ -99,6 +99,19 @@ void ActorExt::mark_as_initialized() state_ = SmpiProcessState::INITIALIZED; } +/** @brief Mark a process as finalizing (=MPI_Finalize called) */ +void ActorExt::mark_as_finalizing() +{ + if (state_ != SmpiProcessState::FINALIZED) + state_ = SmpiProcessState::FINALIZING; +} + +/** @brief Check if a process is finalizing */ +int ActorExt::finalizing() const +{ + return (state_ == SmpiProcessState::FINALIZING); +} + void ActorExt::set_replaying(bool value) { if (state_ != SmpiProcessState::FINALIZED) -- 2.20.1