MPI_Info info;
int assert;
std::vector<MPI_Request> *requests;
+ xbt_mutex_t mut;
msg_bar_t bar;
MPI_Win* connected_wins;
char* name;
win->opened = 0;
win->group = MPI_GROUP_NULL;
win->requests = new std::vector<MPI_Request>();
+ win->mut=xbt_mutex_init();
win->connected_wins = xbt_new0(MPI_Win, comm_size);
win->connected_wins[rank] = win;
win->count = 0;
int smpi_mpi_win_free( MPI_Win* win){
//As per the standard, perform a barrier to ensure every async comm is finished
MSG_barrier_wait((*win)->bar);
+ xbt_mutex_acquire((*win)->mut);
delete (*win)->requests;
+ xbt_mutex_release((*win)->mut);
xbt_free((*win)->connected_wins);
if ((*win)->name != nullptr){
xbt_free((*win)->name);
int rank=smpi_comm_rank((*win)->comm);
if(rank == 0)
MSG_barrier_destroy((*win)->bar);
+ xbt_mutex_destroy((*win)->mut);
xbt_free(*win);
*win = MPI_WIN_NULL;
return MPI_SUCCESS;
win->opened=1;
if(assert != MPI_MODE_NOPRECEDE){
MSG_barrier_wait(win->bar);
-
+ xbt_mutex_acquire(win->mut);
std::vector<MPI_Request> *reqs = win->requests;
int size = static_cast<int>(reqs->size());
// start all requests that have been prepared by another process
smpi_mpi_waitall(size,treqs,MPI_STATUSES_IGNORE);
}
win->count=0;
+ xbt_mutex_release(win->mut);
}
win->assert = assert;
smpi_group_index(smpi_comm_group(win->comm),target_rank), SMPI_RMA_TAG+1, recv_win->comm, MPI_OP_NULL);
//push request to receiver's win
+ xbt_mutex_acquire(recv_win->mut);
recv_win->requests->push_back(rreq);
-
+ xbt_mutex_release(recv_win->mut);
//start send
smpi_mpi_start(sreq);
//push request to sender's win
+ xbt_mutex_acquire(win->mut);
win->requests->push_back(sreq);
+ xbt_mutex_release(win->mut);
}else{
smpi_datatype_copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
}
//start the send, with another process than us as sender.
smpi_mpi_start(sreq);
-
//push request to receiver's win
+ xbt_mutex_acquire(send_win->mut);
send_win->requests->push_back(sreq);
+ xbt_mutex_release(send_win->mut);
//start recv
smpi_mpi_start(rreq);
-
//push request to sender's win
+ xbt_mutex_acquire(win->mut);
win->requests->push_back(rreq);
+ xbt_mutex_release(win->mut);
}else{
smpi_datatype_copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
}
win->count++;
//push request to receiver's win
+ xbt_mutex_acquire(recv_win->mut);
recv_win->requests->push_back(rreq);
+ xbt_mutex_release(recv_win->mut);
//start send
smpi_mpi_start(sreq);
//push request to sender's win
+ xbt_mutex_acquire(win->mut);
win->requests->push_back(sreq);
+ xbt_mutex_release(win->mut);
return MPI_SUCCESS;
}
while(j!=size){
int src=smpi_group_index(group,j);
- if(src!=smpi_process_index()){
+ if(src!=smpi_process_index()&& src!=MPI_UNDEFINED){
reqs[i]=smpi_irecv_init(nullptr, 0, MPI_CHAR, src,SMPI_RMA_TAG+4, MPI_COMM_WORLD);
i++;
}
while(j!=size){
int dst=smpi_group_index(group,j);
- if(dst!=smpi_process_index()){
+ if(dst!=smpi_process_index() && dst!=MPI_UNDEFINED){
reqs[i]=smpi_mpi_send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG+4, MPI_COMM_WORLD);
i++;
}
while(j!=size){
int dst=smpi_group_index(win->group,j);
- if(dst!=smpi_process_index()){
+ if(dst!=smpi_process_index() && dst!=MPI_UNDEFINED){
reqs[i]=smpi_mpi_send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG+5, MPI_COMM_WORLD);
i++;
}
xbt_free(reqs);
//now we can finish RMA calls
-
+ xbt_mutex_acquire(win->mut);
std::vector<MPI_Request> *reqqs = win->requests;
size = static_cast<int>(reqqs->size());
MPI_Request* treqs = &(*reqqs)[0];
smpi_mpi_waitall(size,treqs,MPI_STATUSES_IGNORE);
reqqs->clear();
+ xbt_mutex_release(win->mut);
+
smpi_group_unuse(win->group);
win->opened--; //we're closed for business !
return MPI_SUCCESS;
while(j!=size){
int src=smpi_group_index(win->group,j);
- if(src!=smpi_process_index()){
+ if(src!=smpi_process_index() && src!=MPI_UNDEFINED){
reqs[i]=smpi_irecv_init(nullptr, 0, MPI_CHAR, src,SMPI_RMA_TAG+5, MPI_COMM_WORLD);
i++;
}
smpi_mpi_request_free(&reqs[i]);
}
xbt_free(reqs);
-
+ xbt_mutex_acquire(win->mut);
std::vector<MPI_Request> *reqqs = win->requests;
size = static_cast<int>(reqqs->size());
- XBT_DEBUG("Win_complete - Finishing %d RMA calls", size);
+ XBT_DEBUG("Win_wait - Finishing %d RMA calls", size);
// start all requests that have been prepared by another process
for(auto req: *reqqs){
MPI_Request* treqs = &(*reqqs)[0];
smpi_mpi_waitall(size,treqs,MPI_STATUSES_IGNORE);
reqqs->clear();
+ xbt_mutex_release(win->mut);
+
smpi_group_unuse(win->group);
win->opened--; //we're opened for business !
return MPI_SUCCESS;