Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Avoid doing things when there is nothing to do.
[simgrid.git] / src / smpi / smpi_rma.cpp
index 21f3e02..6edbee3 100644 (file)
@@ -9,9 +9,7 @@
 
 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");
 
 
 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");
 
-#define RMA_TAG -1234
-
-xbt_bar_t creation_bar = nullptr;
+msg_bar_t creation_bar = nullptr;
 
 typedef struct s_smpi_mpi_win{
   void* base;
 
 typedef struct s_smpi_mpi_win{
   void* base;
@@ -21,11 +19,12 @@ typedef struct s_smpi_mpi_win{
   MPI_Info info;
   int assert;
   std::vector<MPI_Request> *requests;
   MPI_Info info;
   int assert;
   std::vector<MPI_Request> *requests;
-  xbt_bar_t bar;
+  msg_bar_t bar;
   MPI_Win* connected_wins;
   char* name;
   int opened;
   MPI_Group group;
   MPI_Win* connected_wins;
   char* name;
   int opened;
   MPI_Group group;
+  int count; //for ordering the accs
 } s_smpi_mpi_win_t;
 
 
 } s_smpi_mpi_win_t;
 
 
@@ -51,14 +50,14 @@ MPI_Win smpi_mpi_win_create( void *base, MPI_Aint size, int disp_unit, MPI_Info
   win->requests = new std::vector<MPI_Request>();
   win->connected_wins = xbt_new0(MPI_Win, comm_size);
   win->connected_wins[rank] = win;
   win->requests = new std::vector<MPI_Request>();
   win->connected_wins = xbt_new0(MPI_Win, comm_size);
   win->connected_wins[rank] = win;
-
+  win->count = 0;
   if(rank==0){
   if(rank==0){
-    win->bar=xbt_barrier_init(comm_size);
+    win->bar = MSG_barrier_init(comm_size);
   }
   mpi_coll_allgather_fun(&(win->connected_wins[rank]), sizeof(MPI_Win), MPI_BYTE, win->connected_wins, sizeof(MPI_Win),
                          MPI_BYTE, comm);
 
   }
   mpi_coll_allgather_fun(&(win->connected_wins[rank]), sizeof(MPI_Win), MPI_BYTE, win->connected_wins, sizeof(MPI_Win),
                          MPI_BYTE, comm);
 
-  mpi_coll_bcast_fun( &(win->bar), sizeof(xbt_bar_t), MPI_BYTE, 0, comm);
+  mpi_coll_bcast_fun(&(win->bar), sizeof(msg_bar_t), MPI_BYTE, 0, comm);
 
   mpi_coll_barrier_fun(comm);
 
 
   mpi_coll_barrier_fun(comm);
 
@@ -67,7 +66,7 @@ MPI_Win smpi_mpi_win_create( void *base, MPI_Aint size, int disp_unit, MPI_Info
 
 int smpi_mpi_win_free( MPI_Win* win){
   //As per the standard, perform a barrier to ensure every async comm is finished
 
 int smpi_mpi_win_free( MPI_Win* win){
   //As per the standard, perform a barrier to ensure every async comm is finished
-  xbt_barrier_wait((*win)->bar);
+  MSG_barrier_wait((*win)->bar);
   delete (*win)->requests;
   xbt_free((*win)->connected_wins);
   if ((*win)->name != nullptr){
   delete (*win)->requests;
   xbt_free((*win)->connected_wins);
   if ((*win)->name != nullptr){
@@ -80,7 +79,7 @@ int smpi_mpi_win_free( MPI_Win* win){
   mpi_coll_barrier_fun((*win)->comm);
   int rank=smpi_comm_rank((*win)->comm);
   if(rank == 0)
   mpi_coll_barrier_fun((*win)->comm);
   int rank=smpi_comm_rank((*win)->comm);
   if(rank == 0)
-    xbt_barrier_destroy((*win)->bar);
+    MSG_barrier_destroy((*win)->bar);
   xbt_free(*win);
   *win = MPI_WIN_NULL;
   return MPI_SUCCESS;
   xbt_free(*win);
   *win = MPI_WIN_NULL;
   return MPI_SUCCESS;
@@ -99,6 +98,8 @@ void smpi_mpi_win_get_name(MPI_Win win, char* name, int* length){
 void smpi_mpi_win_get_group(MPI_Win win, MPI_Group* group){
   if(win->comm != MPI_COMM_NULL){
     *group = smpi_comm_group(win->comm);
 void smpi_mpi_win_get_group(MPI_Win win, MPI_Group* group){
   if(win->comm != MPI_COMM_NULL){
     *group = smpi_comm_group(win->comm);
+  } else {
+    *group = MPI_GROUP_NULL;
   }
 }
 
   }
 }
 
@@ -111,22 +112,26 @@ int smpi_mpi_win_fence( int assert,  MPI_Win win){
   if(win->opened==0)
     win->opened=1;
   if(assert != MPI_MODE_NOPRECEDE){
   if(win->opened==0)
     win->opened=1;
   if(assert != MPI_MODE_NOPRECEDE){
-    xbt_barrier_wait(win->bar);
+    MSG_barrier_wait(win->bar);
 
     std::vector<MPI_Request> *reqs = win->requests;
     int size = static_cast<int>(reqs->size());
     // start all requests that have been prepared by another process
 
     std::vector<MPI_Request> *reqs = win->requests;
     int size = static_cast<int>(reqs->size());
     // start all requests that have been prepared by another process
-    for(auto req: *reqs){
-      if (req && (req->flags & PREPARED))
-        smpi_mpi_start(req);
-    }
+    if(size>0){
+        for(auto req: *reqs){
+          if (req && (req->flags & PREPARED))
+            smpi_mpi_start(req);
+        }
+
+        MPI_Request* treqs = &(*reqs)[0];
 
 
-    MPI_Request* treqs = &(*reqs)[0];
-    smpi_mpi_waitall(size,treqs,MPI_STATUSES_IGNORE);
+        smpi_mpi_waitall(size,treqs,MPI_STATUSES_IGNORE);
+    }
+    win->count=0;
   }
   win->assert = assert;
 
   }
   win->assert = assert;
 
-  xbt_barrier_wait(win->bar);
+  MSG_barrier_wait(win->bar);
   XBT_DEBUG("Leaving fence ");
 
   return MPI_SUCCESS;
   XBT_DEBUG("Leaving fence ");
 
   return MPI_SUCCESS;
@@ -146,11 +151,11 @@ int smpi_mpi_put( void *origin_addr, int origin_count, MPI_Datatype origin_datat
   if(target_rank != smpi_comm_rank(win->comm)){
     //prepare send_request
     MPI_Request sreq = smpi_rma_send_init(origin_addr, origin_count, origin_datatype, smpi_process_index(),
   if(target_rank != smpi_comm_rank(win->comm)){
     //prepare send_request
     MPI_Request sreq = smpi_rma_send_init(origin_addr, origin_count, origin_datatype, smpi_process_index(),
-        smpi_group_index(smpi_comm_group(win->comm),target_rank), RMA_TAG+1, win->comm, MPI_OP_NULL);
+        smpi_group_index(smpi_comm_group(win->comm),target_rank), SMPI_RMA_TAG+1, win->comm, MPI_OP_NULL);
 
     //prepare receiver request
     MPI_Request rreq = smpi_rma_recv_init(recv_addr, target_count, target_datatype, smpi_process_index(),
 
     //prepare receiver request
     MPI_Request rreq = smpi_rma_recv_init(recv_addr, target_count, target_datatype, smpi_process_index(),
-        smpi_group_index(smpi_comm_group(win->comm),target_rank), RMA_TAG+1, recv_win->comm, MPI_OP_NULL);
+        smpi_group_index(smpi_comm_group(win->comm),target_rank), SMPI_RMA_TAG+1, recv_win->comm, MPI_OP_NULL);
 
     //push request to receiver's win
     recv_win->requests->push_back(rreq);
 
     //push request to receiver's win
     recv_win->requests->push_back(rreq);
@@ -181,12 +186,12 @@ int smpi_mpi_get( void *origin_addr, int origin_count, MPI_Datatype origin_datat
   if(target_rank != smpi_comm_rank(win->comm)){
     //prepare send_request
     MPI_Request sreq = smpi_rma_send_init(send_addr, target_count, target_datatype,
   if(target_rank != smpi_comm_rank(win->comm)){
     //prepare send_request
     MPI_Request sreq = smpi_rma_send_init(send_addr, target_count, target_datatype,
-        smpi_group_index(smpi_comm_group(win->comm),target_rank), smpi_process_index(), RMA_TAG+2, send_win->comm,
+        smpi_group_index(smpi_comm_group(win->comm),target_rank), smpi_process_index(), SMPI_RMA_TAG+2, send_win->comm,
         MPI_OP_NULL);
 
     //prepare receiver request
     MPI_Request rreq = smpi_rma_recv_init(origin_addr, origin_count, origin_datatype,
         MPI_OP_NULL);
 
     //prepare receiver request
     MPI_Request rreq = smpi_rma_recv_init(origin_addr, origin_count, origin_datatype,
-        smpi_group_index(smpi_comm_group(win->comm),target_rank), smpi_process_index(), RMA_TAG+2, win->comm,
+        smpi_group_index(smpi_comm_group(win->comm),target_rank), smpi_process_index(), SMPI_RMA_TAG+2, win->comm,
         MPI_OP_NULL);
 
     //start the send, with another process than us as sender. 
         MPI_OP_NULL);
 
     //start the send, with another process than us as sender. 
@@ -219,14 +224,16 @@ int smpi_mpi_accumulate( void *origin_addr, int origin_count, MPI_Datatype origi
 
   void* recv_addr = static_cast<void*>(static_cast<char*>(recv_win->base) + target_disp * recv_win->disp_unit);
   XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
 
   void* recv_addr = static_cast<void*>(static_cast<char*>(recv_win->base) + target_disp * recv_win->disp_unit);
   XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
-
+    //As the tag will be used for ordering of the operations, add count to it
     //prepare send_request
     MPI_Request sreq = smpi_rma_send_init(origin_addr, origin_count, origin_datatype,
     //prepare send_request
     MPI_Request sreq = smpi_rma_send_init(origin_addr, origin_count, origin_datatype,
-        smpi_process_index(), smpi_group_index(smpi_comm_group(win->comm),target_rank), RMA_TAG+3, win->comm, op);
+        smpi_process_index(), smpi_group_index(smpi_comm_group(win->comm),target_rank), SMPI_RMA_TAG+3+win->count, win->comm, op);
 
     //prepare receiver request
     MPI_Request rreq = smpi_rma_recv_init(recv_addr, target_count, target_datatype,
 
     //prepare receiver request
     MPI_Request rreq = smpi_rma_recv_init(recv_addr, target_count, target_datatype,
-        smpi_process_index(), smpi_group_index(smpi_comm_group(win->comm),target_rank), RMA_TAG+3, recv_win->comm, op);
+        smpi_process_index(), smpi_group_index(smpi_comm_group(win->comm),target_rank), SMPI_RMA_TAG+3+win->count, recv_win->comm, op);
+
+    win->count++;
     //push request to receiver's win
     recv_win->requests->push_back(rreq);
     //start send
     //push request to receiver's win
     recv_win->requests->push_back(rreq);
     //start send
@@ -259,7 +266,7 @@ int smpi_mpi_win_start(MPI_Group group, int assert, MPI_Win win){
   while(j!=size){
     int src=smpi_group_index(group,j);
     if(src!=smpi_process_index()){
   while(j!=size){
     int src=smpi_group_index(group,j);
     if(src!=smpi_process_index()){
-      reqs[i]=smpi_irecv_init(nullptr, 0, MPI_CHAR, src,RMA_TAG+4, MPI_COMM_WORLD);
+      reqs[i]=smpi_irecv_init(nullptr, 0, MPI_CHAR, src,SMPI_RMA_TAG+4, MPI_COMM_WORLD);
       i++;
     }
     j++;
       i++;
     }
     j++;
@@ -286,7 +293,7 @@ int smpi_mpi_win_post(MPI_Group group, int assert, MPI_Win win){
   while(j!=size){
     int dst=smpi_group_index(group,j);
     if(dst!=smpi_process_index()){
   while(j!=size){
     int dst=smpi_group_index(group,j);
     if(dst!=smpi_process_index()){
-      reqs[i]=smpi_mpi_send_init(nullptr, 0, MPI_CHAR, dst, RMA_TAG+4, MPI_COMM_WORLD);
+      reqs[i]=smpi_mpi_send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG+4, MPI_COMM_WORLD);
       i++;
     }
     j++;
       i++;
     }
     j++;
@@ -317,7 +324,7 @@ int smpi_mpi_win_complete(MPI_Win win){
   while(j!=size){
     int dst=smpi_group_index(win->group,j);
     if(dst!=smpi_process_index()){
   while(j!=size){
     int dst=smpi_group_index(win->group,j);
     if(dst!=smpi_process_index()){
-      reqs[i]=smpi_mpi_send_init(nullptr, 0, MPI_CHAR, dst, RMA_TAG+5, MPI_COMM_WORLD);
+      reqs[i]=smpi_mpi_send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG+5, MPI_COMM_WORLD);
       i++;
     }
     j++;
       i++;
     }
     j++;
@@ -362,7 +369,7 @@ int smpi_mpi_win_wait(MPI_Win win){
   while(j!=size){
     int src=smpi_group_index(win->group,j);
     if(src!=smpi_process_index()){
   while(j!=size){
     int src=smpi_group_index(win->group,j);
     if(src!=smpi_process_index()){
-      reqs[i]=smpi_irecv_init(nullptr, 0, MPI_CHAR, src,RMA_TAG+5, MPI_COMM_WORLD);
+      reqs[i]=smpi_irecv_init(nullptr, 0, MPI_CHAR, src,SMPI_RMA_TAG+5, MPI_COMM_WORLD);
       i++;
     }
     j++;
       i++;
     }
     j++;