XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");
-msg_bar_t creation_bar = nullptr;
-
typedef struct s_smpi_mpi_win{
void* base;
MPI_Aint size;
MPI_Win smpi_mpi_win_create( void *base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm){
- MPI_Win win;
-
int comm_size = smpi_comm_size(comm);
- int rank=smpi_comm_rank(comm);
+ int rank = smpi_comm_rank(comm);
XBT_DEBUG("Creating window");
- win = xbt_new(s_smpi_mpi_win_t, 1);
+ MPI_Win win = xbt_new(s_smpi_mpi_win_t, 1);
win->base = base;
win->size = size;
win->disp_unit = disp_unit;
win->name = xbt_strdup(name);
}
-int smpi_mpi_win_fence( int assert, MPI_Win win){
+int smpi_mpi_win_fence(int assert, MPI_Win win)
+{
XBT_DEBUG("Entering fence");
- if(win->opened==0)
+ if (win->opened == 0)
win->opened=1;
- if(assert != MPI_MODE_NOPRECEDE){
+ if (assert != MPI_MODE_NOPRECEDE) {
+ // This is not the first fence => finalize what came before
MSG_barrier_wait(win->bar);
xbt_mutex_acquire(win->mut);
+ // This (simulated) mutex ensures that no process pushes to the vector of requests during the waitall.
+ // Without this, the vector could get redimensionned when another process pushes.
+ // This would result in the array used by smpi_mpi_waitall() to be invalidated.
+ // Another solution would be to copy the data and cleanup the vector *before* smpi_mpi_waitall
std::vector<MPI_Request> *reqs = win->requests;
int size = static_cast<int>(reqs->size());
// start all requests that have been prepared by another process
- if(size>0){
- for(auto req: *reqs){
- if (req && (req->flags & PREPARED))
- smpi_mpi_start(req);
- }
+ if (size > 0) {
+ for (const auto& req : *reqs) {
+ if (req && (req->flags & PREPARED))
+ smpi_mpi_start(req);
+ }
- MPI_Request* treqs = &(*reqs)[0];
+ MPI_Request* treqs = &(*reqs)[0];
- smpi_mpi_waitall(size,treqs,MPI_STATUSES_IGNORE);
+ smpi_mpi_waitall(size, treqs, MPI_STATUSES_IGNORE);
}
win->count=0;
xbt_mutex_release(win->mut);
win->assert = assert;
MSG_barrier_wait(win->bar);
- XBT_DEBUG("Leaving fence ");
+ XBT_DEBUG("Leaving fence");
return MPI_SUCCESS;
}
must complete, without further dependencies. */
//naive, blocking implementation.
- int i=0,j=0;
- int size = smpi_group_size(group);
- MPI_Request* reqs = xbt_new0(MPI_Request, size);
-
- while(j!=size){
- int src=smpi_group_index(group,j);
- if(src!=smpi_process_index()&& src!=MPI_UNDEFINED){
- reqs[i]=smpi_irecv_init(nullptr, 0, MPI_CHAR, src,SMPI_RMA_TAG+4, MPI_COMM_WORLD);
- i++;
- }
- j++;
+ int i = 0;
+ int j = 0;
+ int size = smpi_group_size(group);
+ MPI_Request* reqs = xbt_new0(MPI_Request, size);
+
+ while (j != size) {
+ int src = smpi_group_index(group, j);
+ if (src != smpi_process_index() && src != MPI_UNDEFINED) {
+ reqs[i] = smpi_irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 4, MPI_COMM_WORLD);
+ i++;
+ }
+ j++;
}
size=i;
smpi_mpi_startall(size, reqs);
int smpi_mpi_win_post(MPI_Group group, int assert, MPI_Win win){
//let's make a synchronous send here
- int i=0,j=0;
+ int i = 0;
+ int j = 0;
int size = smpi_group_size(group);
MPI_Request* reqs = xbt_new0(MPI_Request, size);
xbt_die("Complete called on already opened MPI_Win");
XBT_DEBUG("Entering MPI_Win_Complete");
- int i=0,j=0;
+ int i = 0;
+ int j = 0;
int size = smpi_group_size(win->group);
MPI_Request* reqs = xbt_new0(MPI_Request, size);
size = static_cast<int>(reqqs->size());
XBT_DEBUG("Win_complete - Finishing %d RMA calls", size);
- // start all requests that have been prepared by another process
- for (auto req: *reqqs){
- if (req && (req->flags & PREPARED))
- smpi_mpi_start(req);
- }
+ if (size > 0) {
+ // start all requests that have been prepared by another process
+ for (const auto& req : *reqqs) {
+ if (req && (req->flags & PREPARED))
+ smpi_mpi_start(req);
+ }
- MPI_Request* treqs = &(*reqqs)[0];
- smpi_mpi_waitall(size,treqs,MPI_STATUSES_IGNORE);
- reqqs->clear();
+ MPI_Request* treqs = &(*reqqs)[0];
+ smpi_mpi_waitall(size, treqs, MPI_STATUSES_IGNORE);
+ reqqs->clear();
+ }
xbt_mutex_release(win->mut);
smpi_group_unuse(win->group);
size = static_cast<int>(reqqs->size());
XBT_DEBUG("Win_wait - Finishing %d RMA calls", size);
+ if (size > 0) {
+ // start all requests that have been prepared by another process
+ for (const auto& req : *reqqs) {
+ if (req && (req->flags & PREPARED))
+ smpi_mpi_start(req);
+ }
- // start all requests that have been prepared by another process
- for(auto req: *reqqs){
- if (req && (req->flags & PREPARED))
- smpi_mpi_start(req);
+ MPI_Request* treqs = &(*reqqs)[0];
+ smpi_mpi_waitall(size, treqs, MPI_STATUSES_IGNORE);
+ reqqs->clear();
}
-
- MPI_Request* treqs = &(*reqqs)[0];
- smpi_mpi_waitall(size,treqs,MPI_STATUSES_IGNORE);
- reqqs->clear();
xbt_mutex_release(win->mut);
smpi_group_unuse(win->group);