Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Protect access to remote process list of rma requests, to avoid race conditions..
authordegomme <augustin.degomme@unibas.ch>
Tue, 21 Feb 2017 22:18:13 +0000 (23:18 +0100)
committerdegomme <augustin.degomme@unibas.ch>
Tue, 21 Feb 2017 22:18:13 +0000 (23:18 +0100)
src/smpi/smpi_rma.cpp

index 6edbee3..147140c 100644 (file)
@@ -19,6 +19,7 @@ typedef struct s_smpi_mpi_win{
   MPI_Info info;
   int assert;
   std::vector<MPI_Request> *requests;
+  xbt_mutex_t mut;
   msg_bar_t bar;
   MPI_Win* connected_wins;
   char* name;
@@ -48,6 +49,7 @@ MPI_Win smpi_mpi_win_create( void *base, MPI_Aint size, int disp_unit, MPI_Info
   win->opened = 0;
   win->group = MPI_GROUP_NULL;
   win->requests = new std::vector<MPI_Request>();
+  win->mut=xbt_mutex_init();
   win->connected_wins = xbt_new0(MPI_Win, comm_size);
   win->connected_wins[rank] = win;
   win->count = 0;
@@ -67,7 +69,9 @@ MPI_Win smpi_mpi_win_create( void *base, MPI_Aint size, int disp_unit, MPI_Info
 int smpi_mpi_win_free( MPI_Win* win){
   //As per the standard, perform a barrier to ensure every async comm is finished
   MSG_barrier_wait((*win)->bar);
+  xbt_mutex_acquire((*win)->mut);
   delete (*win)->requests;
+  xbt_mutex_release((*win)->mut);
   xbt_free((*win)->connected_wins);
   if ((*win)->name != nullptr){
     xbt_free((*win)->name);
@@ -80,6 +84,7 @@ int smpi_mpi_win_free( MPI_Win* win){
   int rank=smpi_comm_rank((*win)->comm);
   if(rank == 0)
     MSG_barrier_destroy((*win)->bar);
+  xbt_mutex_destroy((*win)->mut);
   xbt_free(*win);
   *win = MPI_WIN_NULL;
   return MPI_SUCCESS;
@@ -113,7 +118,7 @@ int smpi_mpi_win_fence( int assert,  MPI_Win win){
     win->opened=1;
   if(assert != MPI_MODE_NOPRECEDE){
     MSG_barrier_wait(win->bar);
-
+    xbt_mutex_acquire(win->mut);
     std::vector<MPI_Request> *reqs = win->requests;
     int size = static_cast<int>(reqs->size());
     // start all requests that have been prepared by another process
@@ -128,6 +133,7 @@ int smpi_mpi_win_fence( int assert,  MPI_Win win){
         smpi_mpi_waitall(size,treqs,MPI_STATUSES_IGNORE);
     }
     win->count=0;
+    xbt_mutex_release(win->mut);
   }
   win->assert = assert;
 
@@ -158,13 +164,16 @@ int smpi_mpi_put( void *origin_addr, int origin_count, MPI_Datatype origin_datat
         smpi_group_index(smpi_comm_group(win->comm),target_rank), SMPI_RMA_TAG+1, recv_win->comm, MPI_OP_NULL);
 
     //push request to receiver's win
+    xbt_mutex_acquire(recv_win->mut);
     recv_win->requests->push_back(rreq);
-
+    xbt_mutex_release(recv_win->mut);
     //start send
     smpi_mpi_start(sreq);
 
     //push request to sender's win
+    xbt_mutex_acquire(win->mut);
     win->requests->push_back(sreq);
+    xbt_mutex_release(win->mut);
   }else{
     smpi_datatype_copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
   }
@@ -196,15 +205,17 @@ int smpi_mpi_get( void *origin_addr, int origin_count, MPI_Datatype origin_datat
 
     //start the send, with another process than us as sender. 
     smpi_mpi_start(sreq);
-
     //push request to receiver's win
+    xbt_mutex_acquire(send_win->mut);
     send_win->requests->push_back(sreq);
+    xbt_mutex_release(send_win->mut);
 
     //start recv
     smpi_mpi_start(rreq);
-
     //push request to sender's win
+    xbt_mutex_acquire(win->mut);
     win->requests->push_back(rreq);
+    xbt_mutex_release(win->mut);
   }else{
     smpi_datatype_copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
   }
@@ -235,12 +246,16 @@ int smpi_mpi_accumulate( void *origin_addr, int origin_count, MPI_Datatype origi
 
     win->count++;
     //push request to receiver's win
+    xbt_mutex_acquire(recv_win->mut);
     recv_win->requests->push_back(rreq);
+    xbt_mutex_release(recv_win->mut);
     //start send
     smpi_mpi_start(sreq);
 
     //push request to sender's win
+    xbt_mutex_acquire(win->mut);
     win->requests->push_back(sreq);
+    xbt_mutex_release(win->mut);
 
   return MPI_SUCCESS;
 }
@@ -265,7 +280,7 @@ int smpi_mpi_win_start(MPI_Group group, int assert, MPI_Win win){
 
   while(j!=size){
     int src=smpi_group_index(group,j);
-    if(src!=smpi_process_index()){
+    if(src!=smpi_process_index()&& src!=MPI_UNDEFINED){
       reqs[i]=smpi_irecv_init(nullptr, 0, MPI_CHAR, src,SMPI_RMA_TAG+4, MPI_COMM_WORLD);
       i++;
     }
@@ -292,7 +307,7 @@ int smpi_mpi_win_post(MPI_Group group, int assert, MPI_Win win){
 
   while(j!=size){
     int dst=smpi_group_index(group,j);
-    if(dst!=smpi_process_index()){
+    if(dst!=smpi_process_index() && dst!=MPI_UNDEFINED){
       reqs[i]=smpi_mpi_send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG+4, MPI_COMM_WORLD);
       i++;
     }
@@ -323,7 +338,7 @@ int smpi_mpi_win_complete(MPI_Win win){
 
   while(j!=size){
     int dst=smpi_group_index(win->group,j);
-    if(dst!=smpi_process_index()){
+    if(dst!=smpi_process_index() && dst!=MPI_UNDEFINED){
       reqs[i]=smpi_mpi_send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG+5, MPI_COMM_WORLD);
       i++;
     }
@@ -340,7 +355,7 @@ int smpi_mpi_win_complete(MPI_Win win){
   xbt_free(reqs);
 
   //now we can finish RMA calls
-
+  xbt_mutex_acquire(win->mut);
   std::vector<MPI_Request> *reqqs = win->requests;
   size = static_cast<int>(reqqs->size());
 
@@ -354,6 +369,8 @@ int smpi_mpi_win_complete(MPI_Win win){
   MPI_Request* treqs = &(*reqqs)[0];
   smpi_mpi_waitall(size,treqs,MPI_STATUSES_IGNORE);
   reqqs->clear();
+  xbt_mutex_release(win->mut);
+
   smpi_group_unuse(win->group);
   win->opened--; //we're closed for business !
   return MPI_SUCCESS;
@@ -368,7 +385,7 @@ int smpi_mpi_win_wait(MPI_Win win){
 
   while(j!=size){
     int src=smpi_group_index(win->group,j);
-    if(src!=smpi_process_index()){
+    if(src!=smpi_process_index() && src!=MPI_UNDEFINED){
       reqs[i]=smpi_irecv_init(nullptr, 0, MPI_CHAR, src,SMPI_RMA_TAG+5, MPI_COMM_WORLD);
       i++;
     }
@@ -382,11 +399,11 @@ int smpi_mpi_win_wait(MPI_Win win){
     smpi_mpi_request_free(&reqs[i]);
   }
   xbt_free(reqs);
-
+  xbt_mutex_acquire(win->mut);
   std::vector<MPI_Request> *reqqs = win->requests;
   size = static_cast<int>(reqqs->size());
 
-  XBT_DEBUG("Win_complete - Finishing %d RMA calls", size);
+  XBT_DEBUG("Win_wait - Finishing %d RMA calls", size);
 
   // start all requests that have been prepared by another process
   for(auto req: *reqqs){
@@ -397,6 +414,8 @@ int smpi_mpi_win_wait(MPI_Win win){
   MPI_Request* treqs = &(*reqqs)[0];
   smpi_mpi_waitall(size,treqs,MPI_STATUSES_IGNORE);
   reqqs->clear();
+  xbt_mutex_release(win->mut);
+
   smpi_group_unuse(win->group);
   win->opened--; //we're opened for business !
   return MPI_SUCCESS;