Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Add MPI_Win_lock_all MPI_Win_unlock_all MPI_Win_flush MPI_Win_flush_local MPI_Win_flu...
authordegomme <augustin.degomme@unibas.ch>
Tue, 4 Apr 2017 08:05:58 +0000 (10:05 +0200)
committerdegomme <augustin.degomme@unibas.ch>
Tue, 4 Apr 2017 08:27:12 +0000 (10:27 +0200)
include/smpi/smpi.h
src/smpi/smpi_mpi.cpp
src/smpi/smpi_pmpi.cpp
src/smpi/smpi_win.cpp
src/smpi/smpi_win.hpp

index 079f565..ad6c4d5 100644 (file)
@@ -761,13 +761,21 @@ MPI_CALL(XBT_PUBLIC(int), MPI_Comm_spawn_multiple,(int count, char **array_of_co
                                                    MPI_Comm comm, MPI_Comm *intercomm, int* array_of_errcodes));
 MPI_CALL(XBT_PUBLIC(int), MPI_Comm_get_parent,( MPI_Comm *parent));
 MPI_CALL(XBT_PUBLIC(int),  MPI_Win_complete,(MPI_Win win));
-MPI_CALL(XBT_PUBLIC(int),  MPI_Win_lock,(int lock_type, int rank, int assert, MPI_Win win));
+
 MPI_CALL(XBT_PUBLIC(int),  MPI_Win_post,(MPI_Group group, int assert, MPI_Win win));
 MPI_CALL(XBT_PUBLIC(int),  MPI_Win_start,(MPI_Group group, int assert, MPI_Win win));
 MPI_CALL(XBT_PUBLIC(int),  MPI_Win_test,(MPI_Win win, int *flag));
-MPI_CALL(XBT_PUBLIC(int),  MPI_Win_unlock,(int rank, MPI_Win win));
 MPI_CALL(XBT_PUBLIC(int),  MPI_Win_wait,(MPI_Win win));
 
+MPI_CALL(XBT_PUBLIC(int),  MPI_Win_lock,(int lock_type, int rank, int assert, MPI_Win win));
+MPI_CALL(XBT_PUBLIC(int),  MPI_Win_lock_all,(int assert, MPI_Win win));
+MPI_CALL(XBT_PUBLIC(int),  MPI_Win_unlock,(int rank, MPI_Win win));
+MPI_CALL(XBT_PUBLIC(int),  MPI_Win_unlock_all,(MPI_Win win));
+
+MPI_CALL(XBT_PUBLIC(int),  MPI_Win_flush,(int rank, MPI_Win win));
+MPI_CALL(XBT_PUBLIC(int),  MPI_Win_flush_local,(int rank, MPI_Win win));
+MPI_CALL(XBT_PUBLIC(int),  MPI_Win_flush_all,(MPI_Win win));
+MPI_CALL(XBT_PUBLIC(int),  MPI_Win_flush_local_all,(MPI_Win win));
 
 MPI_CALL(XBT_PUBLIC(int),  MPI_File_get_errhandler , (MPI_File file, MPI_Errhandler *errhandler));
 MPI_CALL(XBT_PUBLIC(int),  MPI_File_set_errhandler, (MPI_File file, MPI_Errhandler errhandler));
index 167e1cd..be3cece 100644 (file)
@@ -201,6 +201,12 @@ WRAPPED_PMPI_CALL(int,MPI_Win_start,(MPI_Group group, int assert, MPI_Win win),(
 WRAPPED_PMPI_CALL(int,MPI_Win_wait,(MPI_Win win),(win))
 WRAPPED_PMPI_CALL(int,MPI_Win_lock,(int lock_type, int rank, int assert, MPI_Win win) ,(lock_type, rank, assert, win))
 WRAPPED_PMPI_CALL(int,MPI_Win_unlock,(int rank, MPI_Win win),(rank, win))
+WRAPPED_PMPI_CALL(int,MPI_Win_lock_all,(int assert, MPI_Win win) ,(assert, win))
+WRAPPED_PMPI_CALL(int,MPI_Win_unlock_all,(MPI_Win win),(win))
+WRAPPED_PMPI_CALL(int,MPI_Win_flush,(int rank, MPI_Win win),(rank, win))
+WRAPPED_PMPI_CALL(int,MPI_Win_flush_local,(int rank, MPI_Win win),(rank, win))
+WRAPPED_PMPI_CALL(int,MPI_Win_flush_all,(MPI_Win win),(win))
+WRAPPED_PMPI_CALL(int,MPI_Win_flush_local_all,(MPI_Win win),(win))
 WRAPPED_PMPI_CALL(int,MPI_Win_get_attr, (MPI_Win type, int type_keyval, void *attribute_val, int* flag), (type, type_keyval, attribute_val, flag))
 WRAPPED_PMPI_CALL(int,MPI_Win_set_attr, (MPI_Win type, int type_keyval, void *att), (type, type_keyval, att))
 WRAPPED_PMPI_CALL(int,MPI_Win_delete_attr, (MPI_Win type, int comm_keyval), (type, comm_keyval))
index d09c579..5c59152 100644 (file)
@@ -2861,6 +2861,100 @@ int PMPI_Win_unlock(int rank, MPI_Win win){
   return retval;
 }
 
+int PMPI_Win_lock_all(int assert, MPI_Win win){
+  int retval = 0;
+  smpi_bench_end();
+  if (win == MPI_WIN_NULL) {
+    retval = MPI_ERR_WIN;
+  } else {
+    int myrank = smpi_process()->index();
+    TRACE_smpi_collective_in(myrank, -1, __FUNCTION__, nullptr);
+    retval = win->lock_all(assert);
+    TRACE_smpi_collective_out(myrank, -1, __FUNCTION__);
+  }
+  smpi_bench_begin();
+  return retval;
+}
+
+int PMPI_Win_unlock_all(MPI_Win win){
+  int retval = 0;
+  smpi_bench_end();
+  if (win == MPI_WIN_NULL) {
+    retval = MPI_ERR_WIN;
+  } else {
+    int myrank = smpi_process()->index();
+    TRACE_smpi_collective_in(myrank, -1, __FUNCTION__, nullptr);
+    retval = win->unlock_all();
+    TRACE_smpi_collective_out(myrank, -1, __FUNCTION__);
+  }
+  smpi_bench_begin();
+  return retval;
+}
+
+int PMPI_Win_flush(int rank, MPI_Win win){
+  int retval = 0;
+  smpi_bench_end();
+  if (win == MPI_WIN_NULL) {
+    retval = MPI_ERR_WIN;
+  } else if (rank == MPI_PROC_NULL){ 
+    retval = MPI_SUCCESS;
+  } else {
+    int myrank = smpi_process()->index();
+    TRACE_smpi_collective_in(myrank, -1, __FUNCTION__, nullptr);
+    retval = win->flush(rank);
+    TRACE_smpi_collective_out(myrank, -1, __FUNCTION__);
+  }
+  smpi_bench_begin();
+  return retval;
+}
+
+int PMPI_Win_flush_local(int rank, MPI_Win win){
+  int retval = 0;
+  smpi_bench_end();
+  if (win == MPI_WIN_NULL) {
+    retval = MPI_ERR_WIN;
+  } else if (rank == MPI_PROC_NULL){ 
+    retval = MPI_SUCCESS;
+  } else {
+    int myrank = smpi_process()->index();
+    TRACE_smpi_collective_in(myrank, -1, __FUNCTION__, nullptr);
+    retval = win->flush_local(rank);
+    TRACE_smpi_collective_out(myrank, -1, __FUNCTION__);
+  }
+  smpi_bench_begin();
+  return retval;
+}
+
+int PMPI_Win_flush_all(MPI_Win win){
+  int retval = 0;
+  smpi_bench_end();
+  if (win == MPI_WIN_NULL) {
+    retval = MPI_ERR_WIN;
+  } else {
+    int myrank = smpi_process()->index();
+    TRACE_smpi_collective_in(myrank, -1, __FUNCTION__, nullptr);
+    retval = win->flush_all();
+    TRACE_smpi_collective_out(myrank, -1, __FUNCTION__);
+  }
+  smpi_bench_begin();
+  return retval;
+}
+
+int PMPI_Win_flush_local_all(MPI_Win win){
+  int retval = 0;
+  smpi_bench_end();
+  if (win == MPI_WIN_NULL) {
+    retval = MPI_ERR_WIN;
+  } else {
+    int myrank = smpi_process()->index();
+    TRACE_smpi_collective_in(myrank, -1, __FUNCTION__, nullptr);
+    retval = win->flush_local_all();
+    TRACE_smpi_collective_out(myrank, -1, __FUNCTION__);
+  }
+  smpi_bench_begin();
+  return retval;
+}
+
 int PMPI_Alloc_mem(MPI_Aint size, MPI_Info info, void *baseptr){
   void *ptr = xbt_malloc(size);
   if(ptr==nullptr)
index bd90414..fb16d16 100644 (file)
@@ -523,6 +523,17 @@ int Win::lock(int lock_type, int rank, int assert){
   return MPI_SUCCESS;
 }
 
+int Win::lock_all(int assert){
+  int i=0;
+  int retval = MPI_SUCCESS;
+  for (i=0; i<comm_->size();i++){
+      int ret = this->lock(MPI_LOCK_SHARED, i, assert);
+      if(ret != MPI_SUCCESS)
+        retval = ret;
+  }
+  return retval;
+}
+
 int Win::unlock(int rank){
   if(opened_!=0)
     return MPI_ERR_WIN;
@@ -541,6 +552,50 @@ int Win::unlock(int rank){
   return MPI_SUCCESS;
 }
 
+int Win::unlock_all(){
+  int i=0;
+  int retval = MPI_SUCCESS;
+  for (i=0; i<comm_->size();i++){
+      int ret = this->unlock(i);
+      if(ret != MPI_SUCCESS)
+        retval = ret;
+  }
+  return retval;
+}
+
+int Win::flush(int rank){
+  MPI_Win target_win = connected_wins_[rank];
+  int finished = finish_comms(rank);
+  XBT_DEBUG("Win_flush on local %d - Finished %d RMA calls", rank_, finished);
+  finished = target_win->finish_comms(rank_);
+  XBT_DEBUG("Win_flush on remote %d - Finished %d RMA calls", rank, finished);
+  return MPI_SUCCESS;
+}
+
+int Win::flush_local(int rank){
+  int finished = finish_comms(rank);
+  XBT_DEBUG("Win_flush_local for rank %d - Finished %d RMA calls", rank, finished);
+  return MPI_SUCCESS;
+}
+
+int Win::flush_all(){
+  int i=0;
+  int finished = 0;
+  finished = finish_comms();
+  XBT_DEBUG("Win_flush_all on local - Finished %d RMA calls", finished);
+  for (i=0; i<comm_->size();i++){
+    finished = connected_wins_[i]->finish_comms(rank_);
+    XBT_DEBUG("Win_flush_all on %d - Finished %d RMA calls", i, finished);
+  }
+  return MPI_SUCCESS;
+}
+
+int Win::flush_local_all(){
+  int finished = finish_comms();
+  XBT_DEBUG("Win_flush_local_all - Finished %d RMA calls", finished);
+  return MPI_SUCCESS;
+}
+
 Win* Win::f2c(int id){
   return static_cast<Win*>(F2C::f2c(id));
 }
@@ -560,6 +615,35 @@ int Win::finish_comms(){
   return size;
 }
 
+int Win::finish_comms(int rank){
+  xbt_mutex_acquire(mut_);
+  //Finish own requests
+  std::vector<MPI_Request> *reqqs = requests_;
+  int size = static_cast<int>(reqqs->size());
+  if (size > 0) {
+    size = 0;
+    std::vector<MPI_Request>* myreqqs = new std::vector<MPI_Request>();
+    std::vector<MPI_Request>::iterator iter = reqqs->begin();
+    while (iter != reqqs->end()){
+      if(((*iter)->src() == rank) || ((*iter)->dst() == rank)){
+          myreqqs->push_back(*iter);
+          iter = reqqs->erase(iter);
+          size++;
+      } else {
+        ++iter;
+      }
+    }
+    if(size >0){
+      MPI_Request* treqs = &(*myreqqs)[0];
+      Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
+      myreqqs->clear();
+      delete myreqqs;
+    }
+  }
+  xbt_mutex_release(mut_);
+  return size;
+}
+
 
 }
 }
index 5beb90c..1d36f36 100644 (file)
@@ -73,7 +73,14 @@ public:
   static Win* f2c(int id);
   int lock(int lock_type, int rank, int assert);
   int unlock(int rank);
+  int lock_all(int assert);
+  int unlock_all();
+  int flush(int rank);
+  int flush_local(int rank);
+  int flush_all();
+  int flush_local_all();
   int finish_comms();
+  int finish_comms(int rank);
 };