Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
fix dist
[simgrid.git] / src / smpi / smpi_rma.cpp
index 147140c..b91fd0f 100644 (file)
@@ -6,11 +6,10 @@
 
 #include "private.h"
 #include <vector>
+#include "src/smpi/smpi_group.hpp"
 
 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");
 
-msg_bar_t creation_bar = nullptr;
-
 typedef struct s_smpi_mpi_win{
   void* base;
   MPI_Aint size;
@@ -30,13 +29,11 @@ typedef struct s_smpi_mpi_win{
 
 
 MPI_Win smpi_mpi_win_create( void *base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm){
-  MPI_Win win;
-
   int comm_size = smpi_comm_size(comm);
-  int rank=smpi_comm_rank(comm);
+  int rank      = smpi_comm_rank(comm);
   XBT_DEBUG("Creating window");
 
-  win = xbt_new(s_smpi_mpi_win_t, 1);
+  MPI_Win win    = xbt_new(s_smpi_mpi_win_t, 1);
   win->base = base;
   win->size = size;
   win->disp_unit = disp_unit;
@@ -112,25 +109,31 @@ void smpi_mpi_win_set_name(MPI_Win win, char* name){
   win->name = xbt_strdup(name);
 }
 
-int smpi_mpi_win_fence( int assert,  MPI_Win win){
+int smpi_mpi_win_fence(int assert, MPI_Win win)
+{
   XBT_DEBUG("Entering fence");
-  if(win->opened==0)
+  if (win->opened == 0)
     win->opened=1;
-  if(assert != MPI_MODE_NOPRECEDE){
+  if (assert != MPI_MODE_NOPRECEDE) {
+    // This is not the first fence => finalize what came before
     MSG_barrier_wait(win->bar);
     xbt_mutex_acquire(win->mut);
+    // This (simulated) mutex ensures that no process pushes to the vector of requests during the waitall.
+    // Without this, the vector could get redimensionned when another process pushes.
+    // This would result in the array used by smpi_mpi_waitall() to be invalidated.
+    // Another solution would be to copy the data and cleanup the vector *before* smpi_mpi_waitall
     std::vector<MPI_Request> *reqs = win->requests;
     int size = static_cast<int>(reqs->size());
     // start all requests that have been prepared by another process
-    if(size>0){
-        for(auto req: *reqs){
-          if (req && (req->flags & PREPARED))
-            smpi_mpi_start(req);
-        }
+    if (size > 0) {
+      for (const auto& req : *reqs) {
+        if (req && (req->flags & PREPARED))
+          smpi_mpi_start(req);
+      }
 
-        MPI_Request* treqs = &(*reqs)[0];
+      MPI_Request* treqs = &(*reqs)[0];
 
-        smpi_mpi_waitall(size,treqs,MPI_STATUSES_IGNORE);
+      smpi_mpi_waitall(size, treqs, MPI_STATUSES_IGNORE);
     }
     win->count=0;
     xbt_mutex_release(win->mut);
@@ -138,7 +141,7 @@ int smpi_mpi_win_fence( int assert,  MPI_Win win){
   win->assert = assert;
 
   MSG_barrier_wait(win->bar);
-  XBT_DEBUG("Leaving fence ");
+  XBT_DEBUG("Leaving fence");
 
   return MPI_SUCCESS;
 }
@@ -157,11 +160,11 @@ int smpi_mpi_put( void *origin_addr, int origin_count, MPI_Datatype origin_datat
   if(target_rank != smpi_comm_rank(win->comm)){
     //prepare send_request
     MPI_Request sreq = smpi_rma_send_init(origin_addr, origin_count, origin_datatype, smpi_process_index(),
-        smpi_group_index(smpi_comm_group(win->comm),target_rank), SMPI_RMA_TAG+1, win->comm, MPI_OP_NULL);
+        smpi_comm_group(win->comm)->index(target_rank), SMPI_RMA_TAG+1, win->comm, MPI_OP_NULL);
 
     //prepare receiver request
     MPI_Request rreq = smpi_rma_recv_init(recv_addr, target_count, target_datatype, smpi_process_index(),
-        smpi_group_index(smpi_comm_group(win->comm),target_rank), SMPI_RMA_TAG+1, recv_win->comm, MPI_OP_NULL);
+        smpi_comm_group(win->comm)->index(target_rank), SMPI_RMA_TAG+1, recv_win->comm, MPI_OP_NULL);
 
     //push request to receiver's win
     xbt_mutex_acquire(recv_win->mut);
@@ -195,12 +198,12 @@ int smpi_mpi_get( void *origin_addr, int origin_count, MPI_Datatype origin_datat
   if(target_rank != smpi_comm_rank(win->comm)){
     //prepare send_request
     MPI_Request sreq = smpi_rma_send_init(send_addr, target_count, target_datatype,
-        smpi_group_index(smpi_comm_group(win->comm),target_rank), smpi_process_index(), SMPI_RMA_TAG+2, send_win->comm,
+        smpi_comm_group(win->comm)->index(target_rank), smpi_process_index(), SMPI_RMA_TAG+2, send_win->comm,
         MPI_OP_NULL);
 
     //prepare receiver request
     MPI_Request rreq = smpi_rma_recv_init(origin_addr, origin_count, origin_datatype,
-        smpi_group_index(smpi_comm_group(win->comm),target_rank), smpi_process_index(), SMPI_RMA_TAG+2, win->comm,
+        smpi_comm_group(win->comm)->index(target_rank), smpi_process_index(), SMPI_RMA_TAG+2, win->comm,
         MPI_OP_NULL);
 
     //start the send, with another process than us as sender. 
@@ -238,11 +241,11 @@ int smpi_mpi_accumulate( void *origin_addr, int origin_count, MPI_Datatype origi
     //As the tag will be used for ordering of the operations, add count to it
     //prepare send_request
     MPI_Request sreq = smpi_rma_send_init(origin_addr, origin_count, origin_datatype,
-        smpi_process_index(), smpi_group_index(smpi_comm_group(win->comm),target_rank), SMPI_RMA_TAG+3+win->count, win->comm, op);
+        smpi_process_index(), smpi_comm_group(win->comm)->index(target_rank), SMPI_RMA_TAG+3+win->count, win->comm, op);
 
     //prepare receiver request
     MPI_Request rreq = smpi_rma_recv_init(recv_addr, target_count, target_datatype,
-        smpi_process_index(), smpi_group_index(smpi_comm_group(win->comm),target_rank), SMPI_RMA_TAG+3+win->count, recv_win->comm, op);
+        smpi_process_index(), smpi_comm_group(win->comm)->index(target_rank), SMPI_RMA_TAG+3+win->count, recv_win->comm, op);
 
     win->count++;
     //push request to receiver's win
@@ -274,17 +277,18 @@ int smpi_mpi_win_start(MPI_Group group, int assert, MPI_Win win){
     must complete, without further dependencies.  */
 
   //naive, blocking implementation.
-  int i=0,j=0;
-  int size = smpi_group_size(group);
-  MPI_Request* reqs = xbt_new0(MPI_Request, size);
-
-  while(j!=size){
-    int src=smpi_group_index(group,j);
-    if(src!=smpi_process_index()&& src!=MPI_UNDEFINED){
-      reqs[i]=smpi_irecv_init(nullptr, 0, MPI_CHAR, src,SMPI_RMA_TAG+4, MPI_COMM_WORLD);
-      i++;
-    }
-    j++;
+    int i             = 0;
+    int j             = 0;
+    int size          = group->getsize();
+    MPI_Request* reqs = xbt_new0(MPI_Request, size);
+
+    while (j != size) {
+      int src = group->index(j);
+      if (src != smpi_process_index() && src != MPI_UNDEFINED) {
+        reqs[i] = smpi_irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 4, MPI_COMM_WORLD);
+        i++;
+      }
+      j++;
   }
   size=i;
   smpi_mpi_startall(size, reqs);
@@ -295,18 +299,19 @@ int smpi_mpi_win_start(MPI_Group group, int assert, MPI_Win win){
   xbt_free(reqs);
   win->opened++; //we're open for business !
   win->group=group;
-  smpi_group_use(group);
+  group->use();
   return MPI_SUCCESS;
 }
 
 int smpi_mpi_win_post(MPI_Group group, int assert, MPI_Win win){
   //let's make a synchronous send here
-  int i=0,j=0;
-  int size = smpi_group_size(group);
+  int i             = 0;
+  int j             = 0;
+  int size = group->getsize();
   MPI_Request* reqs = xbt_new0(MPI_Request, size);
 
   while(j!=size){
-    int dst=smpi_group_index(group,j);
+    int dst=group->index(j);
     if(dst!=smpi_process_index() && dst!=MPI_UNDEFINED){
       reqs[i]=smpi_mpi_send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG+4, MPI_COMM_WORLD);
       i++;
@@ -323,7 +328,7 @@ int smpi_mpi_win_post(MPI_Group group, int assert, MPI_Win win){
   xbt_free(reqs);
   win->opened++; //we're open for business !
   win->group=group;
-  smpi_group_use(group);
+  group->use();
   return MPI_SUCCESS;
 }
 
@@ -332,12 +337,13 @@ int smpi_mpi_win_complete(MPI_Win win){
     xbt_die("Complete called on already opened MPI_Win");
 
   XBT_DEBUG("Entering MPI_Win_Complete");
-  int i=0,j=0;
-  int size = smpi_group_size(win->group);
+  int i             = 0;
+  int j             = 0;
+  int size = win->group->getsize();
   MPI_Request* reqs = xbt_new0(MPI_Request, size);
 
   while(j!=size){
-    int dst=smpi_group_index(win->group,j);
+    int dst=win->group->index(j);
     if(dst!=smpi_process_index() && dst!=MPI_UNDEFINED){
       reqs[i]=smpi_mpi_send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG+5, MPI_COMM_WORLD);
       i++;
@@ -360,18 +366,20 @@ int smpi_mpi_win_complete(MPI_Win win){
   size = static_cast<int>(reqqs->size());
 
   XBT_DEBUG("Win_complete - Finishing %d RMA calls", size);
-  // start all requests that have been prepared by another process
-  for (auto req: *reqqs){
-    if (req && (req->flags & PREPARED))
-      smpi_mpi_start(req);
-  }
+  if (size > 0) {
+    // start all requests that have been prepared by another process
+    for (const auto& req : *reqqs) {
+      if (req && (req->flags & PREPARED))
+        smpi_mpi_start(req);
+    }
 
-  MPI_Request* treqs = &(*reqqs)[0];
-  smpi_mpi_waitall(size,treqs,MPI_STATUSES_IGNORE);
-  reqqs->clear();
+    MPI_Request* treqs = &(*reqqs)[0];
+    smpi_mpi_waitall(size, treqs, MPI_STATUSES_IGNORE);
+    reqqs->clear();
+  }
   xbt_mutex_release(win->mut);
 
-  smpi_group_unuse(win->group);
+  win->group->unuse();
   win->opened--; //we're closed for business !
   return MPI_SUCCESS;
 }
@@ -380,11 +388,11 @@ int smpi_mpi_win_wait(MPI_Win win){
   //naive, blocking implementation.
   XBT_DEBUG("Entering MPI_Win_Wait");
   int i=0,j=0;
-  int size = smpi_group_size(win->group);
+  int size = win->group->getsize();
   MPI_Request* reqs = xbt_new0(MPI_Request, size);
 
   while(j!=size){
-    int src=smpi_group_index(win->group,j);
+    int src=win->group->index(j);
     if(src!=smpi_process_index() && src!=MPI_UNDEFINED){
       reqs[i]=smpi_irecv_init(nullptr, 0, MPI_CHAR, src,SMPI_RMA_TAG+5, MPI_COMM_WORLD);
       i++;
@@ -404,19 +412,20 @@ int smpi_mpi_win_wait(MPI_Win win){
   size = static_cast<int>(reqqs->size());
 
   XBT_DEBUG("Win_wait - Finishing %d RMA calls", size);
+  if (size > 0) {
+    // start all requests that have been prepared by another process
+    for (const auto& req : *reqqs) {
+      if (req && (req->flags & PREPARED))
+        smpi_mpi_start(req);
+    }
 
-  // start all requests that have been prepared by another process
-  for(auto req: *reqqs){
-    if (req && (req->flags & PREPARED))
-      smpi_mpi_start(req);
+    MPI_Request* treqs = &(*reqqs)[0];
+    smpi_mpi_waitall(size, treqs, MPI_STATUSES_IGNORE);
+    reqqs->clear();
   }
-
-  MPI_Request* treqs = &(*reqqs)[0];
-  smpi_mpi_waitall(size,treqs,MPI_STATUSES_IGNORE);
-  reqqs->clear();
   xbt_mutex_release(win->mut);
 
-  smpi_group_unuse(win->group);
+  win->group->unuse();
   win->opened--; //we're opened for business !
   return MPI_SUCCESS;
 }