Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Add MPI_Win_post, MPI_Win_start, MPI_Win_complete, and MPI_Win_wait support.
authorAugustin Degomme <augustin.degomme@imag.fr>
Fri, 5 Dec 2014 13:51:37 +0000 (14:51 +0100)
committerAugustin Degomme <augustin.degomme@imag.fr>
Fri, 5 Dec 2014 13:59:52 +0000 (14:59 +0100)
This is the second (out of 3) of the classic MPI RMA synchronization methods.
This version is naive and may not be what real MPI lib do, as the standard lets the implementer chose the behavior of theses calls.

src/smpi/instr_smpi.c
src/smpi/private.h
src/smpi/smpi_f77.c
src/smpi/smpi_pmpi.c
src/smpi/smpi_rma.c

index bda2b95..1560ae5 100644 (file)
@@ -53,7 +53,11 @@ static const char *smpi_colors[] ={
     "put",       "0.3 1 0",
     "get",       "0 1 0.3",
     "accumulate",       "1 0.3 0",
     "put",       "0.3 1 0",
     "get",       "0 1 0.3",
     "accumulate",       "1 0.3 0",
-    "fence",       "1 0 0.3",
+    "win_fence",       "1 0 0.3",
+    "win_post",       "1 0 0.8",
+    "win_wait",       "1 0.8 0",
+    "win_start",       "0.8 0 1",
+    "win_complete",       "0.8 1 0",
     NULL, NULL,
 };
 
     NULL, NULL,
 };
 
index 13f2e76..bc36a1c 100644 (file)
@@ -398,6 +398,11 @@ void smpi_mpi_win_set_name(MPI_Win win, char* name);
 
 int smpi_mpi_win_fence( int assert,  MPI_Win win);
 
 
 int smpi_mpi_win_fence( int assert,  MPI_Win win);
 
+int smpi_mpi_win_post(MPI_Group group, int assert, MPI_Win win);
+int smpi_mpi_win_start(MPI_Group group, int assert, MPI_Win win);
+int smpi_mpi_win_complete(MPI_Win win);
+int smpi_mpi_win_wait(MPI_Win win);
+
 int smpi_mpi_get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Win win);
 int smpi_mpi_put( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
 int smpi_mpi_get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Win win);
 int smpi_mpi_put( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
@@ -558,6 +563,10 @@ void mpi_win_free_( int* win, int* ierr);
 void mpi_win_create_( int *base, MPI_Aint* size, int* disp_unit, int* info, int* comm, int *win, int* ierr);
 void mpi_win_set_name_ (int*  win, char * name, int* ierr, int size);
 void mpi_win_get_name_ (int*  win, char * name, int* len, int* ierr);
 void mpi_win_create_( int *base, MPI_Aint* size, int* disp_unit, int* info, int* comm, int *win, int* ierr);
 void mpi_win_set_name_ (int*  win, char * name, int* ierr, int size);
 void mpi_win_get_name_ (int*  win, char * name, int* len, int* ierr);
+void mpi_win_post_(int* group, int assert, int* win, int* ierr);
+void mpi_win_start_(int* group, int assert, int* win, int* ierr);
+void mpi_win_complete_(int* win, int* ierr);
+void mpi_win_wait_(int* win, int* ierr);
 void mpi_info_create_( int *info, int* ierr);
 void mpi_info_set_( int *info, char *key, char *value, int* ierr, unsigned int keylen, unsigned int valuelen);
 void mpi_info_free_(int* info, int* ierr);
 void mpi_info_create_( int *info, int* ierr);
 void mpi_info_set_( int *info, char *key, char *value, int* ierr, unsigned int keylen, unsigned int valuelen);
 void mpi_info_free_(int* info, int* ierr);
index a832f33..b044802 100644 (file)
@@ -748,6 +748,22 @@ void mpi_win_create_( int *base, MPI_Aint* size, int* disp_unit, int* info, int*
  }
 }
 
  }
 }
 
+void mpi_win_post_(int* group, int assert, int* win, int* ierr){
+  *ierr =  MPI_Win_post(smpi_group_f2c(*group), assert, smpi_win_f2c(*win));
+}
+
+void mpi_win_start_(int* group, int assert, int* win, int* ierr){
+  *ierr =  MPI_Win_start(smpi_group_f2c(*group), assert, smpi_win_f2c(*win));
+}
+
+void mpi_win_complete_(int* win, int* ierr){
+  *ierr =  MPI_Win_complete(smpi_win_f2c(*win));
+}
+
+void mpi_win_wait_(int* win, int* ierr){
+  *ierr =  MPI_Win_wait(smpi_win_f2c(*win));
+}
+
 void mpi_win_set_name_ (int*  win, char * name, int* ierr, int size){
  //handle trailing blanks
  while(name[size-1]==' ')size--;
 void mpi_win_set_name_ (int*  win, char * name, int* ierr, int size){
  //handle trailing blanks
  while(name[size-1]==' ')size--;
@@ -1729,3 +1745,4 @@ void mpi_comm_get_parent_ ( int* parent, int* ierr){
     *parent = smpi_comm_c2f(tmp);
   }
 }
     *parent = smpi_comm_c2f(tmp);
   }
 }
+
index bbd04ba..7c06028 100644 (file)
@@ -3013,6 +3013,85 @@ int PMPI_Accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_da
 }
 
 
 }
 
 
+int PMPI_Win_post(MPI_Group group, int assert, MPI_Win win){
+  int retval = 0;
+  smpi_bench_end();
+  if (win == MPI_WIN_NULL) {
+    retval = MPI_ERR_WIN;
+  } else if (group==MPI_GROUP_NULL){
+    retval = MPI_ERR_GROUP;
+  }
+#ifdef HAVE_TRACING
+  int rank = smpi_process_index();
+  TRACE_smpi_collective_in(rank, -1, __FUNCTION__, NULL);
+#endif
+  retval = smpi_mpi_win_post(group,assert,win);
+#ifdef HAVE_TRACING
+  TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
+#endif
+  smpi_bench_begin();
+  return retval;
+}
+
+int PMPI_Win_start(MPI_Group group, int assert, MPI_Win win){
+  int retval = 0;
+  smpi_bench_end();
+  if (win == MPI_WIN_NULL) {
+    retval = MPI_ERR_WIN;
+  } else if (group==MPI_GROUP_NULL){
+    retval = MPI_ERR_GROUP;
+  }
+  
+#ifdef HAVE_TRACING
+  int rank = smpi_process_index();
+  TRACE_smpi_collective_in(rank, -1, __FUNCTION__, NULL);
+#endif
+  retval = smpi_mpi_win_start(group,assert,win);
+#ifdef HAVE_TRACING
+  TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
+#endif
+  smpi_bench_begin();
+  return retval;
+}
+
+
+int PMPI_Win_complete(MPI_Win win){
+  int retval = 0;
+  smpi_bench_end();
+  if (win == MPI_WIN_NULL) {
+    retval = MPI_ERR_WIN;
+  }
+  
+#ifdef HAVE_TRACING
+  int rank = smpi_process_index();
+  TRACE_smpi_collective_in(rank, -1, __FUNCTION__, NULL);
+#endif
+  retval = smpi_mpi_win_complete(win);
+#ifdef HAVE_TRACING
+  TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
+#endif
+  smpi_bench_begin();
+  return retval;
+}
+
+int PMPI_Win_wait(MPI_Win win){
+  int retval = 0;
+  smpi_bench_end();
+  if (win == MPI_WIN_NULL) {
+    retval = MPI_ERR_WIN;
+  }
+#ifdef HAVE_TRACING
+  int rank = smpi_process_index();
+  TRACE_smpi_collective_in(rank, -1, __FUNCTION__, NULL);
+#endif
+  retval = smpi_mpi_win_wait(win);
+#ifdef HAVE_TRACING
+  TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
+#endif
+  smpi_bench_begin();
+  return retval;
+}
+
 int PMPI_Alloc_mem(MPI_Aint size, MPI_Info info, void *baseptr){
   void *ptr = xbt_malloc(size);
   if(!ptr)
 int PMPI_Alloc_mem(MPI_Aint size, MPI_Info info, void *baseptr){
   void *ptr = xbt_malloc(size);
   if(!ptr)
@@ -3664,22 +3743,10 @@ int PMPI_Comm_get_parent( MPI_Comm *parent){
   NOT_YET_IMPLEMENTED
 }
 
   NOT_YET_IMPLEMENTED
 }
 
-int PMPI_Win_complete(MPI_Win win){
-  NOT_YET_IMPLEMENTED
-}
-
 int PMPI_Win_lock(int lock_type, int rank, int assert, MPI_Win win) {
   NOT_YET_IMPLEMENTED
 }
 
 int PMPI_Win_lock(int lock_type, int rank, int assert, MPI_Win win) {
   NOT_YET_IMPLEMENTED
 }
 
-int PMPI_Win_post(MPI_Group group, int assert, MPI_Win win){
-  NOT_YET_IMPLEMENTED
-}
-
-int PMPI_Win_start(MPI_Group group, int assert, MPI_Win win){
-  NOT_YET_IMPLEMENTED
-}
-
 int PMPI_Win_test(MPI_Win win, int *flag){
   NOT_YET_IMPLEMENTED
 }
 int PMPI_Win_test(MPI_Win win, int *flag){
   NOT_YET_IMPLEMENTED
 }
@@ -3688,6 +3755,3 @@ int PMPI_Win_unlock(int rank, MPI_Win win){
   NOT_YET_IMPLEMENTED
 }
 
   NOT_YET_IMPLEMENTED
 }
 
-int PMPI_Win_wait(MPI_Win win){
-  NOT_YET_IMPLEMENTED
-}
index 356ab58..b35bc1a 100644 (file)
@@ -24,6 +24,8 @@ typedef struct s_smpi_mpi_win{
   xbt_bar_t bar;
   MPI_Win* connected_wins;
   char* name;
   xbt_bar_t bar;
   MPI_Win* connected_wins;
   char* name;
+  int opened;
+  MPI_Group group;
 } s_smpi_mpi_win_t;
 
 
 } s_smpi_mpi_win_t;
 
 
@@ -45,6 +47,8 @@ MPI_Win smpi_mpi_win_create( void *base, MPI_Aint size, int disp_unit, MPI_Info
     info->refcount++;
   win->comm = comm;
   win->name = NULL;
     info->refcount++;
   win->comm = comm;
   win->name = NULL;
+  win->opened = 0;
+  win->group = MPI_GROUP_NULL;
   win->requests = xbt_dynar_new(sizeof(MPI_Request), NULL);
   win->connected_wins = xbt_malloc0(comm_size*sizeof(MPI_Win));
   win->connected_wins[rank] = win;
   win->requests = xbt_dynar_new(sizeof(MPI_Request), NULL);
   win->connected_wins = xbt_malloc0(comm_size*sizeof(MPI_Win));
   win->connected_wins[rank] = win;
@@ -100,8 +104,10 @@ void smpi_mpi_win_get_name(MPI_Win win, char* name, int* length){
 }
 
 void smpi_mpi_win_get_group(MPI_Win win, MPI_Group* group){
 }
 
 void smpi_mpi_win_get_group(MPI_Win win, MPI_Group* group){
-  if(win->comm != MPI_COMM_NULL)
+  if(win->comm != MPI_COMM_NULL){
     *group = smpi_comm_group(win->comm);
     *group = smpi_comm_group(win->comm);
+    smpi_group_use(*group);
+  }
 }
 
 void smpi_mpi_win_set_name(MPI_Win win, char* name){
 }
 
 void smpi_mpi_win_set_name(MPI_Win win, char* name){
@@ -112,7 +118,8 @@ void smpi_mpi_win_set_name(MPI_Win win, char* name){
 int smpi_mpi_win_fence( int assert,  MPI_Win win){
 
   XBT_DEBUG("Entering fence");
 int smpi_mpi_win_fence( int assert,  MPI_Win win){
 
   XBT_DEBUG("Entering fence");
-
+  if(!win->opened)
+    win->opened=1;
   if(assert != MPI_MODE_NOPRECEDE){
     xbt_barrier_wait(win->bar);
 
   if(assert != MPI_MODE_NOPRECEDE){
     xbt_barrier_wait(win->bar);
 
@@ -142,6 +149,9 @@ int smpi_mpi_win_fence( int assert,  MPI_Win win){
 int smpi_mpi_put( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Win win)
 {
 int smpi_mpi_put( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Win win)
 {
+
+  if(!win->opened)//check that post/start has been done
+    return MPI_ERR_WIN;
   //get receiver pointer
   MPI_Win recv_win = win->connected_wins[target_rank];
 
   //get receiver pointer
   MPI_Win recv_win = win->connected_wins[target_rank];
 
@@ -178,6 +188,8 @@ int smpi_mpi_put( void *origin_addr, int origin_count, MPI_Datatype origin_datat
 int smpi_mpi_get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Win win)
 {
 int smpi_mpi_get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Win win)
 {
+  if(!win->opened)//check that post/start has been done
+    return MPI_ERR_WIN;
   //get sender pointer
   MPI_Win send_win = win->connected_wins[target_rank];
 
   //get sender pointer
   MPI_Win send_win = win->connected_wins[target_rank];
 
@@ -218,6 +230,8 @@ int smpi_mpi_get( void *origin_addr, int origin_count, MPI_Datatype origin_datat
 int smpi_mpi_accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Win win)
 {
 int smpi_mpi_accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Win win)
 {
+  if(!win->opened)//check that post/start has been done
+    return MPI_ERR_WIN;
   //FIXME: local version 
   //get receiver pointer
   MPI_Win recv_win = win->connected_wins[target_rank];
   //FIXME: local version 
   //get receiver pointer
   MPI_Win recv_win = win->connected_wins[target_rank];
@@ -249,3 +263,183 @@ int smpi_mpi_accumulate( void *origin_addr, int origin_count, MPI_Datatype origi
   return MPI_SUCCESS;
 }
 
   return MPI_SUCCESS;
 }
 
+int smpi_mpi_win_start(MPI_Group group, int assert, MPI_Win win){
+    /* From MPI forum advices
+    The call to MPI_WIN_COMPLETE does not return until the put call has completed at 
+    the origin; and the target window will be accessed by the put operation only 
+    after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by the target
+     process. This still leaves much choice to implementors. The call to 
+     MPI_WIN_START can block until the matching call to MPI_WIN_POST occurs at all 
+    target processes. One can also have implementations where the call to 
+    MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching 
+    call to MPI_WIN_POST occurred; or implementations where the first two calls are 
+    nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call to 
+    MPI_WIN_POST occurred; or even implementations where all three calls can 
+    complete before any target process called MPI_WIN_POST --- the data put must be 
+    buffered, in this last case, so as to allow the put to complete at the origin 
+    ahead of its completion at the target. However, once the call to MPI_WIN_POST is
+     issued, the sequence above must complete, without further dependencies.
+    */
+  
+  //naive, blocking implementation.
+  int i=0,j=0;
+  int size = smpi_group_size(group);
+  MPI_Request* reqs = xbt_new0(MPI_Request, size);
+  
+//  for(i=0;i<size;i++){
+  while(j!=size){
+    int src=smpi_group_index(group,j);
+    if(src!=smpi_process_index()){
+      reqs[i]=smpi_irecv_init(NULL, 0, MPI_CHAR, src,RMA_TAG+4, MPI_COMM_WORLD);
+      i++;
+    }
+    j++;
+  }
+  size=i;
+  smpi_mpi_startall(size, reqs);
+  smpi_mpi_waitall(size, reqs, MPI_STATUSES_IGNORE);
+  for(i=0;i<size;i++){
+    smpi_mpi_request_free(&reqs[i]);
+  }
+  xbt_free(reqs);
+  win->opened++; //we're open for business !
+  win->group=group;
+  smpi_group_use(group);
+  return MPI_SUCCESS;
+}
+
+int smpi_mpi_win_post(MPI_Group group, int assert, MPI_Win win){
+  //let's make a synchronous send here
+  int i=0,j=0;
+  int size = smpi_group_size(group);
+  MPI_Request* reqs = xbt_new0(MPI_Request, size);
+  
+  while(j!=size){
+    int dst=smpi_group_index(group,j);
+    if(dst!=smpi_process_index()){
+      reqs[i]=smpi_mpi_send_init(NULL, 0, MPI_CHAR, dst,
+        RMA_TAG+4, MPI_COMM_WORLD);
+      i++;
+    }
+    j++;
+  }
+  size=i;
+  
+  smpi_mpi_startall(size, reqs);
+  smpi_mpi_waitall(size, reqs, MPI_STATUSES_IGNORE);
+  for(i=0;i<size;i++){
+    smpi_mpi_request_free(&reqs[i]);
+  }
+  xbt_free(reqs);
+  win->opened++; //we're open for business !
+  win->group=group;
+  smpi_group_use(group);
+  return MPI_SUCCESS;
+}
+
+int smpi_mpi_win_complete(MPI_Win win){
+  if(win->opened==0)
+    xbt_die("Complete called on already opened MPI_Win");
+//  xbt_barrier_wait(win->bar);
+  //MPI_Comm comm = smpi_comm_new(win->group, NULL);
+  //mpi_coll_barrier_fun(comm);
+  //smpi_comm_destroy(comm);
+  
+  XBT_DEBUG("Entering MPI_Win_Complete");
+  int i=0,j=0;
+  int size = smpi_group_size(win->group);
+  MPI_Request* reqs = xbt_new0(MPI_Request, size);
+  
+  while(j!=size){
+    int dst=smpi_group_index(win->group,j);
+    if(dst!=smpi_process_index()){
+      reqs[i]=smpi_mpi_send_init(NULL, 0, MPI_CHAR, dst,
+        RMA_TAG+5, MPI_COMM_WORLD);
+      i++;
+    }
+    j++;
+  }
+  size=i;
+  XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
+  smpi_mpi_startall(size, reqs);
+  smpi_mpi_waitall(size, reqs, MPI_STATUSES_IGNORE);
+  
+  for(i=0;i<size;i++){
+    smpi_mpi_request_free(&reqs[i]);
+  }
+  xbt_free(reqs);
+  
+  //now we can finish RMA calls
+  
+  xbt_dynar_t reqqs = win->requests;
+  size = xbt_dynar_length(reqqs);
+  
+  XBT_DEBUG("Win_complete - Finishing %d RMA calls", size);
+  unsigned int cpt=0;
+  MPI_Request req;
+  // start all requests that have been prepared by another process
+  xbt_dynar_foreach(reqqs, cpt, req){
+    if (req->flags & PREPARED) smpi_mpi_start(req);
+  }
+
+  MPI_Request* treqs = xbt_dynar_to_array(reqqs);
+  smpi_mpi_waitall(size,treqs,MPI_STATUSES_IGNORE);
+  xbt_free(treqs);
+  win->requests=xbt_dynar_new(sizeof(MPI_Request), NULL);
+  win->opened--; //we're closed for business !
+  return MPI_SUCCESS;
+}
+
+
+
+int smpi_mpi_win_wait(MPI_Win win){
+//  xbt_barrier_wait(win->bar);
+  //MPI_Comm comm = smpi_comm_new(win->group, NULL);
+  //mpi_coll_barrier_fun(comm);
+  //smpi_comm_destroy(comm);
+  //naive, blocking implementation.
+  XBT_DEBUG("Entering MPI_Win_Wait");
+  int i=0,j=0;
+  int size = smpi_group_size(win->group);
+  MPI_Request* reqs = xbt_new0(MPI_Request, size);
+  
+//  for(i=0;i<size;i++){
+  while(j!=size){
+    int src=smpi_group_index(win->group,j);
+    if(src!=smpi_process_index()){
+      reqs[i]=smpi_irecv_init(NULL, 0, MPI_CHAR, src,RMA_TAG+5, MPI_COMM_WORLD);
+      i++;
+    }
+    j++;
+  }
+  size=i;
+  XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
+  smpi_mpi_startall(size, reqs);
+  smpi_mpi_waitall(size, reqs, MPI_STATUSES_IGNORE);
+  for(i=0;i<size;i++){
+    smpi_mpi_request_free(&reqs[i]);
+  }
+  xbt_free(reqs);
+
+  xbt_dynar_t reqqs = win->requests;
+  size = xbt_dynar_length(reqqs);
+  
+  XBT_DEBUG("Win_complete - Finishing %d RMA calls", size);
+
+  unsigned int cpt=0;
+  MPI_Request req;
+  // start all requests that have been prepared by another process
+  xbt_dynar_foreach(reqqs, cpt, req){
+    if (req->flags & PREPARED) smpi_mpi_start(req);
+  }
+
+  MPI_Request* treqs = xbt_dynar_to_array(reqqs);
+  smpi_mpi_waitall(size,treqs,MPI_STATUSES_IGNORE);
+  xbt_free(treqs);
+  win->requests=xbt_dynar_new(sizeof(MPI_Request), NULL);
+  win->opened--; //we're opened for business !
+  return MPI_SUCCESS;
+}
+
+
+