Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Test of an MPI_Ibarrier implementation.
authordegomme <adegomme@users.noreply.github.com>
Thu, 28 Mar 2019 11:12:01 +0000 (12:12 +0100)
committerdegomme <adegomme@users.noreply.github.com>
Thu, 28 Mar 2019 17:35:56 +0000 (18:35 +0100)
Instead of a helper process, or of a scheduler as used by MPI implementations, let's play dumb for now and just use a bunch of Isend/Irecv, and just test them all when needed.
pros:
- just .. works ?
- we have the default naive algorithms already implemented for most of the collectives, so in the end we just have to spilt the startall and waitall parts.
cons:
- simple basic algos only.
More advanced ones would need a scheduler with progress management, let's do this later if needed (and MPI implementations have the odd idea that it's fine to advance the scheduler's round only when MPI_Test is called, so we might actually be faster in the end).

include/smpi/smpi.h
src/smpi/bindings/smpi_mpi.cpp
src/smpi/bindings/smpi_pmpi_coll.cpp
src/smpi/colls/smpi_nbc_impl.cpp [new file with mode: 0644]
src/smpi/include/smpi_coll.hpp
src/smpi/include/smpi_request.hpp
src/smpi/mpi/smpi_request.cpp
teshsuite/smpi/mpich3-test/coll/CMakeLists.txt
teshsuite/smpi/mpich3-test/coll/testlist
tools/cmake/DefinePackages.cmake

index a0835da..0568399 100644 (file)
@@ -569,6 +569,7 @@ MPI_CALL(XBT_PUBLIC MPI_Fint, MPI_Request_c2f, (MPI_Request request));
 
 MPI_CALL(XBT_PUBLIC int, MPI_Bcast, (void* buf, int count, MPI_Datatype datatype, int root, MPI_Comm comm));
 MPI_CALL(XBT_PUBLIC int, MPI_Barrier, (MPI_Comm comm));
+MPI_CALL(XBT_PUBLIC int, MPI_Ibarrier, (MPI_Comm comm, MPI_Request *request));
 MPI_CALL(XBT_PUBLIC int, MPI_Gather, (void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int recvcount,
                                       MPI_Datatype recvtype, int root, MPI_Comm comm));
 MPI_CALL(XBT_PUBLIC int, MPI_Gatherv, (void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf,
index 5c905b7..fad1c55 100644 (file)
@@ -140,6 +140,7 @@ WRAPPED_PMPI_CALL(int,MPI_Group_rank,(MPI_Group group, int *rank),(group, rank))
 WRAPPED_PMPI_CALL(int,MPI_Group_size,(MPI_Group group, int *size),(group, size))
 WRAPPED_PMPI_CALL(int,MPI_Group_translate_ranks,(MPI_Group group1, int n, int *ranks1, MPI_Group group2, int *ranks2),(group1, n, ranks1, group2, ranks2))
 WRAPPED_PMPI_CALL(int,MPI_Group_union,(MPI_Group group1, MPI_Group group2, MPI_Group * newgroup),(group1, group2, newgroup))
+WRAPPED_PMPI_CALL(int,MPI_Ibarrier,(MPI_Comm comm, MPI_Request *request),(comm,request))
 WRAPPED_PMPI_CALL(int,MPI_Info_create,( MPI_Info *info),( info))
 WRAPPED_PMPI_CALL(int,MPI_Info_delete,(MPI_Info info, char *key),(info, key))
 WRAPPED_PMPI_CALL(int,MPI_Info_dup,(MPI_Info info, MPI_Info *newinfo),(info, newinfo))
index 93202eb..755c0db 100644 (file)
@@ -6,6 +6,7 @@
 #include "private.hpp"
 #include "smpi_coll.hpp"
 #include "smpi_comm.hpp"
+#include "smpi_request.hpp"
 #include "smpi_datatype_derived.hpp"
 #include "smpi_op.hpp"
 #include "src/smpi/include/smpi_actor.hpp"
@@ -66,6 +67,24 @@ int PMPI_Barrier(MPI_Comm comm)
   return retval;
 }
 
+int PMPI_Ibarrier(MPI_Comm comm, MPI_Request *request)
+{
+  int retval = 0;
+  smpi_bench_end();
+  if (comm == MPI_COMM_NULL) {
+    retval = MPI_ERR_COMM;
+  } else if(request == nullptr){
+    retval = MPI_ERR_ARG;
+  }else{
+    int rank = simgrid::s4u::this_actor::get_pid();
+    TRACE_smpi_comm_in(rank, __func__, new simgrid::instr::NoOpTIData("ibarrier"));
+    simgrid::smpi::Colls::Ibarrier(comm, request);
+    TRACE_smpi_comm_out(rank);
+  }    
+  smpi_bench_begin();
+  return retval;
+}
+
 int PMPI_Gather(void *sendbuf, int sendcount, MPI_Datatype sendtype,void *recvbuf, int recvcount, MPI_Datatype recvtype,
                 int root, MPI_Comm comm)
 {
diff --git a/src/smpi/colls/smpi_nbc_impl.cpp b/src/smpi/colls/smpi_nbc_impl.cpp
new file mode 100644 (file)
index 0000000..3cccebb
--- /dev/null
@@ -0,0 +1,51 @@
+/* Asynchronous parts of the basic collective algorithms, meant to be used both for the naive default implementation, but also for non blocking collectives */
+
+/* Copyright (c) 2009-2019. The SimGrid Team. All rights reserved.          */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
+#include "colls_private.hpp"
+#include "src/smpi/include/smpi_actor.hpp"
+
+namespace simgrid{
+namespace smpi{
+
+
+int Colls::Ibarrier(MPI_Comm comm, MPI_Request* request)
+{
+  int i;
+  int size = comm->size();
+  int rank = comm->rank();
+  MPI_Request* requests;
+  (*request) = new Request( nullptr, 0, MPI_BYTE,
+                         rank,rank, COLL_TAG_BARRIER, comm, MPI_REQ_NON_PERSISTENT);
+  (*request)->ref();
+  if (rank > 0) {
+    requests = new MPI_Request[2];
+    requests[0] = Request::isend (nullptr, 0, MPI_BYTE, 0,
+                             COLL_TAG_BARRIER,
+                             comm);
+    requests[1] = Request::irecv (nullptr, 0, MPI_BYTE, 0,
+                             COLL_TAG_BARRIER,
+                             comm);
+    (*request)->set_nbc_requests(requests, 2);
+  }
+  else {
+    requests = new MPI_Request[(size-1)*2];
+    for (i = 1; i < 2*size-1; i+=2) {
+        requests[i-1] = Request::irecv(nullptr, 0, MPI_BYTE, MPI_ANY_SOURCE,
+                                 COLL_TAG_BARRIER, comm
+                                 );
+        requests[i] = Request::isend(nullptr, 0, MPI_BYTE, (i+1)/2,
+                                 COLL_TAG_BARRIER,
+                                 comm
+                                 );
+    }
+    (*request)->set_nbc_requests(requests, 2*(size-1));
+  }
+  return MPI_SUCCESS;
+}
+
+}
+}
index 0299f6c..245a854 100644 (file)
@@ -115,6 +115,9 @@ public:
                       MPI_Datatype recvtype, int root, MPI_Comm comm);
   static int scan(void* sendbuf, void* recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm);
   static int exscan(void* sendbuf, void* recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm);
+  
+  //async collectives
+  static int Ibarrier(MPI_Comm comm, MPI_Request* request);
 
   static void (*smpi_coll_cleanup_callback)();
 };
index e157353..3224828 100644 (file)
@@ -49,6 +49,8 @@ class Request : public F2C {
   MPI_Op op_;
   int cancelled_;
   smpi_mpi_generalized_request_funcs generalized_funcs;
+  MPI_Request* nbc_requests_;
+  int nbc_requests_size_;
 
 public:
   Request() = default;
@@ -66,6 +68,7 @@ public:
   void start();
   void cancel();
   void ref();
+  void set_nbc_requests(MPI_Request* reqs, int size);
   static void finish_wait(MPI_Request* request, MPI_Status* status);
   static void unref(MPI_Request* request);
   static int wait(MPI_Request* req, MPI_Status* status);
index 694a2b1..437b892 100644 (file)
@@ -71,6 +71,9 @@ Request::Request(void* buf, int count, MPI_Datatype datatype, int src, int dst,
     refcount_ = 0;
   op_   = MPI_REPLACE;
   cancelled_ = 0;
+  generalized_funcs=nullptr;
+  nbc_requests_=nullptr;
+  nbc_requests_size_=0;
 }
 
 void Request::ref(){
@@ -520,6 +523,19 @@ int Request::test(MPI_Request * request, MPI_Status * status, int* flag) {
   // multiplier to the sleeptime, to increase speed of execution, each failed test will increase it
   static int nsleeps = 1;
   int ret = MPI_SUCCESS;
+  
+  // are we testing a request meant for non blocking comms ?
+  // If so, test all the subrequests.
+  if ((*request)->nbc_requests_size_>0){
+    ret = testall((*request)->nbc_requests_size_, (*request)->nbc_requests_, flag, MPI_STATUSES_IGNORE);
+    if(*flag){
+      delete[] (*request)->nbc_requests_;
+      (*request)->nbc_requests_size_=0;
+      unref(request);
+    }
+    return ret;
+  }
+  
   if(smpi_test_sleep > 0)
     simcall_process_sleep(nsleeps*smpi_test_sleep);
 
@@ -789,6 +805,8 @@ void Request::finish_wait(MPI_Request* request, MPI_Status * status)
       status->MPI_ERROR = req->truncated_ != 0 ? MPI_ERR_TRUNCATE : MPI_SUCCESS;
       // this handles the case were size in receive differs from size in send
       status->count = req->real_size_;
+//      int flag;
+//      Request::get_status(req,&flag,status);
     }
 
     req->print_request("Finishing");
@@ -1110,5 +1128,10 @@ int Request::grequest_complete( MPI_Request request){
   return MPI_SUCCESS;
 }
 
+void Request::set_nbc_requests(MPI_Request* reqs, int size){
+  nbc_requests_=reqs;
+  nbc_requests_size_=size;
+}
+
 }
 }
index 1c28008..cb7d90f 100644 (file)
@@ -18,7 +18,8 @@ if(enable_smpi AND enable_smpi_MPICH3_testsuite)
               coll2 coll3 coll4 coll5 coll6 coll7 coll8 coll9 coll10 coll11 coll12 coll13
               exscan exscan2 
               gather gather2 gather_big
-            # iallred ibarrier icallgather icallgatherv icallreduce
+               ibarrier
+            # iallred icallgather icallgatherv icallreduce
              # icalltoall icalltoallv icalltoallw icbarrier icbcast
             # icgather icgatherv icreduce icscatter icscatterv
               longuser
index 6158b7a..3281f60 100644 (file)
@@ -144,7 +144,7 @@ nonblocking3 10 timeLimit=600 mpiversion=3.0
 iallred 2 mpiversion=3.0
 # ibarrier will hang forever if it fails, but will complete quickly if it
 # succeeds
-ibarrier 2 mpiversion=3.0 timeLimit=30
+ibarrier 2 timeLimit=30
 
 # run some of the tests, relinked with the nbc_pmpi_adaptor.o file
 nballtoall1 8 mpiversion=3.0
index e504118..e5a6a85 100644 (file)
@@ -203,6 +203,7 @@ set(SMPI_SRC
   src/smpi/colls/reduce/reduce-rab.cpp
   src/smpi/colls/scatter/scatter-ompi.cpp
   src/smpi/colls/scatter/scatter-mvapich-two-level.cpp
+  src/smpi/colls/smpi_nbc_impl.cpp
   src/smpi/colls/smpi_automatic_selector.cpp
   src/smpi/colls/smpi_default_selector.cpp
   src/smpi/colls/smpi_mpich_selector.cpp