Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Add Allgather SMP collective from MVAPICH2
authorAugustin Degomme <augustin.degomme@imag.fr>
Fri, 1 Aug 2014 08:49:19 +0000 (10:49 +0200)
committerAugustin Degomme <augustin.degomme@imag.fr>
Fri, 1 Aug 2014 08:51:10 +0000 (10:51 +0200)
buildtools/Cmake/AddTests.cmake
buildtools/Cmake/DefinePackages.cmake
src/smpi/colls/allgather-mvapich-smp.c [new file with mode: 0644]
src/smpi/colls/colls.h
src/smpi/colls/smpi_mvapich2_selector.c
src/smpi/colls/smpi_mvapich2_selector_stampede.h

index de9cf6c..ccb0fd1 100644 (file)
@@ -376,7 +376,7 @@ IF(NOT enable_memcheck)
     ENDFOREACH()
     FOREACH (ALLGATHER_COLL default  2dmesh 3dmesh bruck GB loosely_lr
                             NTSLR NTSLR_NB pair rdb  rhv ring SMP_NTS
-                            smp_simple spreading_simple ompi mpich ompi_neighborexchange mvapich2 impi)
+                            smp_simple spreading_simple ompi mpich ompi_neighborexchange mvapich2 mvapich2_smp impi)
       ADD_TESH(tesh-smpi-allgather-coll-${ALLGATHER_COLL} --cfg smpi/allgather:${ALLGATHER_COLL} --setenv bindir=${CMAKE_BINARY_DIR}/teshsuite/smpi/allgather --cd ${CMAKE_HOME_DIRECTORY}/teshsuite/smpi/allgather allgather_coll.tesh)
     ENDFOREACH()
     FOREACH (ALLGATHERV_COLL default GB pair ring ompi mpich ompi_neighborexchange ompi_bruck mpich_rdb mpich_ring mvapich2 impi)
index e16e918..21a4ec0 100644 (file)
@@ -143,6 +143,7 @@ set(SMPI_SRC
   src/smpi/colls/allgather-loosely-lr.c
   src/smpi/colls/allgather-ompi-neighborexchange.c
   src/smpi/colls/allgather-pair.c
+  src/smpi/colls/allgather-mvapich-smp.c
   src/smpi/colls/allgather-rdb.c
   src/smpi/colls/allgather-rhv.c
   src/smpi/colls/allgather-ring.c
diff --git a/src/smpi/colls/allgather-mvapich-smp.c b/src/smpi/colls/allgather-mvapich-smp.c
new file mode 100644 (file)
index 0000000..b988967
--- /dev/null
@@ -0,0 +1,122 @@
+#include "colls_private.h"
+
+
+
+int smpi_coll_tuned_allgather_mvapich2_smp(void *sendbuf,int sendcnt, MPI_Datatype sendtype,
+                            void *recvbuf, int recvcnt,MPI_Datatype recvtype,
+                            MPI_Comm  comm)
+{
+    int rank, size;
+    int local_rank, local_size;
+    int leader_comm_size = 0; 
+    int mpi_errno = MPI_SUCCESS;
+    MPI_Aint recvtype_extent = 0;  /* Datatype extent */
+    MPI_Comm shmem_comm, leader_comm;
+
+  if(smpi_comm_get_leaders_comm(comm)==MPI_COMM_NULL){
+    smpi_comm_init_smp(comm);
+  }
+  
+    if(!smpi_comm_is_uniform(comm) || !smpi_comm_is_blocked(comm))
+    THROWF(arg_error,0, "allgather MVAPICH2 smp algorithm can't be used with irregular deployment. Please insure that processes deployed on the same node are contiguous and that each node has the same number of processes");
+  
+    if (recvcnt == 0) {
+        return MPI_SUCCESS;
+    }
+
+    rank = smpi_comm_rank(comm);
+    size = smpi_comm_size(comm);
+
+    /* extract the rank,size information for the intra-node
+     * communicator */
+    recvtype_extent=smpi_datatype_get_extent(recvtype);
+    
+    shmem_comm = smpi_comm_get_intra_comm(comm);
+    local_rank = smpi_comm_rank(shmem_comm);
+    local_size = smpi_comm_size(shmem_comm);
+
+    if (local_rank == 0) {
+        /* Node leader. Extract the rank, size information for the leader
+         * communicator */
+        leader_comm = smpi_comm_get_leaders_comm(comm);
+        if(leader_comm==MPI_COMM_NULL){
+          leader_comm = MPI_COMM_WORLD;
+        }
+        leader_comm_size = smpi_comm_size(leader_comm);
+    }
+
+    /*If there is just one node, after gather itself,
+     * root has all the data and it can do bcast*/
+    if(local_rank == 0) {
+        mpi_errno = mpi_coll_gather_fun(sendbuf, sendcnt,sendtype, 
+                                    (void*)((char*)recvbuf + (rank * recvcnt * recvtype_extent)), 
+                                     recvcnt, recvtype,
+                                     0, shmem_comm);
+    } else {
+        /*Since in allgather all the processes could have 
+         * its own data in place*/
+        if(sendbuf == MPI_IN_PLACE) {
+            mpi_errno = mpi_coll_gather_fun((void*)((char*)recvbuf + (rank * recvcnt * recvtype_extent)), 
+                                         recvcnt , recvtype, 
+                                         recvbuf, recvcnt, recvtype,
+                                         0, shmem_comm);
+        } else {
+            mpi_errno = mpi_coll_gather_fun(sendbuf, sendcnt,sendtype, 
+                                         recvbuf, recvcnt, recvtype,
+                                         0, shmem_comm);
+        }
+    }
+    /* Exchange the data between the node leaders*/
+    if (local_rank == 0 && (leader_comm_size > 1)) {
+        /*When data in each socket is different*/
+        if (smpi_comm_is_uniform(comm) != 1) {
+
+            int *displs = NULL;
+            int *recvcnts = NULL;
+            int *node_sizes = NULL;
+            int i = 0;
+
+            node_sizes = smpi_comm_get_non_uniform_map(comm);
+
+            displs = xbt_malloc(sizeof (int) * leader_comm_size);
+            recvcnts = xbt_malloc(sizeof (int) * leader_comm_size);
+            if (!displs || !recvcnts) {
+                return MPI_ERR_OTHER;
+            }
+            recvcnts[0] = node_sizes[0] * recvcnt;
+            displs[0] = 0;
+
+            for (i = 1; i < leader_comm_size; i++) {
+                displs[i] = displs[i - 1] + node_sizes[i - 1] * recvcnt;
+                recvcnts[i] = node_sizes[i] * recvcnt;
+            }
+
+
+            void* sendbuf=((char*)recvbuf)+smpi_datatype_get_extent(recvtype)*displs[smpi_comm_rank(leader_comm)];
+
+            mpi_errno = mpi_coll_allgatherv_fun(sendbuf,
+                                       (recvcnt*local_size),
+                                       recvtype, 
+                                       recvbuf, recvcnts,
+                                       displs, recvtype,
+                                       leader_comm);
+            xbt_free(displs);
+            xbt_free(recvcnts);
+        } else {
+        void* sendtmpbuf=((char*)recvbuf)+smpi_datatype_get_extent(recvtype)*(recvcnt*local_size)*smpi_comm_rank(leader_comm);
+        
+          
+
+            mpi_errno = smpi_coll_tuned_allgather_mpich(sendtmpbuf, 
+                                               (recvcnt*local_size),
+                                               recvtype,
+                                               recvbuf, (recvcnt*local_size), recvtype,
+                                             leader_comm);
+
+        }
+    }
+
+    /*Bcast the entire data from node leaders to all other cores*/
+    mpi_errno = mpi_coll_bcast_fun (recvbuf, recvcnt * size, recvtype, 0, shmem_comm);
+    return mpi_errno;
+}
index dff58e5..78d04b8 100644 (file)
@@ -76,6 +76,7 @@ COLL_APPLY(action, COLL_ALLGATHER_SIG, spreading_simple) COLL_sep \
 COLL_APPLY(action, COLL_ALLGATHER_SIG, ompi) COLL_sep \
 COLL_APPLY(action, COLL_ALLGATHER_SIG, ompi_neighborexchange) COLL_sep \
 COLL_APPLY(action, COLL_ALLGATHER_SIG, mvapich2) COLL_sep \
+COLL_APPLY(action, COLL_ALLGATHER_SIG, mvapich2_smp) COLL_sep \
 COLL_APPLY(action, COLL_ALLGATHER_SIG, mpich) COLL_sep \
 COLL_APPLY(action, COLL_ALLGATHER_SIG, impi) COLL_sep \
 COLL_APPLY(action, COLL_ALLGATHER_SIG, automatic)
index d9e3565..14335a5 100644 (file)
@@ -92,12 +92,12 @@ int smpi_coll_tuned_allgather_mvapich2(void *sendbuf, int sendcount, MPI_Datatyp
   int mpi_errno = MPI_SUCCESS;
   int nbytes = 0, comm_size, recvtype_size;
   int range = 0;
-  //int partial_sub_ok = 0;
+  int partial_sub_ok = 0;
   int conf_index = 0;
   int range_threshold = 0;
   int is_two_level = 0;
-  //int local_size = -1;
-  //MPI_Comm shmem_comm;
+  int local_size = -1;
+  MPI_Comm shmem_comm;
   //MPI_Comm *shmem_commptr=NULL;
   /* Get the size of the communicator */
   comm_size = smpi_comm_size(comm);
@@ -106,34 +106,35 @@ int smpi_coll_tuned_allgather_mvapich2(void *sendbuf, int sendcount, MPI_Datatyp
 
   if(mv2_allgather_table_ppn_conf==NULL)
     init_mv2_allgather_tables_stampede();
+    
+  if(smpi_comm_get_leaders_comm(comm)==MPI_COMM_NULL){
+    smpi_comm_init_smp(comm);
+  }
 
-  //int i;
-  /* check if safe to use partial subscription mode */
-  /*  if (comm->ch.shmem_coll_ok == 1 && comm->ch.is_uniform) {
-
-        shmem_comm = comm->ch.shmem_comm;
-        MPID_Comm_get_ptr(shmem_comm, shmem_commptr);
-        local_size = shmem_commptr->local_size;
-        i = 0;
-        if (mv2_allgather_table_ppn_conf[0] == -1) {
-            // Indicating user defined tuning
-            conf_index = 0;
-            goto conf_check_end;
-        }
-        do {
-            if (local_size == mv2_allgather_table_ppn_conf[i]) {
-                conf_index = i;
-                partial_sub_ok = 1;
-                break;
-            }
-            i++;
-        } while(i < mv2_allgather_num_ppn_conf);
+  int i;
+  if (smpi_comm_is_uniform(comm)){
+    shmem_comm = smpi_comm_get_intra_comm(comm);
+    local_size = smpi_comm_size(shmem_comm);
+    i = 0;
+    if (mv2_allgather_table_ppn_conf[0] == -1) {
+      // Indicating user defined tuning
+      conf_index = 0;
+      goto conf_check_end;
     }
-
+    do {
+      if (local_size == mv2_allgather_table_ppn_conf[i]) {
+        conf_index = i;
+        partial_sub_ok = 1;
+        break;
+      }
+      i++;
+    } while(i < mv2_allgather_num_ppn_conf);
+  }
   conf_check_end:
-    if (partial_sub_ok != 1) {
-        conf_index = 0;
-    }*/
+  if (partial_sub_ok != 1) {
+    conf_index = 0;
+  }
+  
   /* Search for the corresponding system size inside the tuning table */
   while ((range < (mv2_size_allgather_tuning_table[conf_index] - 1)) &&
       (comm_size >
@@ -158,24 +159,21 @@ int smpi_coll_tuned_allgather_mvapich2(void *sendbuf, int sendcount, MPI_Datatyp
 
   /* intracommunicator */
   if(is_two_level ==1){
-
-      /*       if(comm->ch.shmem_coll_ok == 1){
-            MPIR_T_PVAR_COUNTER_INC(MV2, mv2_num_shmem_coll_calls, 1);
-          if (1 == comm->ch.is_blocked) {
-                mpi_errno = MPIR_2lvl_Allgather_MV2(sendbuf, sendcount, sendtype,
-                                                   recvbuf, recvcount, recvtype,
-                                                   comm, errflag);
-          }
-          else {
-              mpi_errno = MPIR_Allgather_intra(sendbuf, sendcount, sendtype,
-                                               recvbuf, recvcount, recvtype,
-                                               comm, errflag);
-          }
-        } else {*/
+    if(partial_sub_ok ==1){
+      if (smpi_comm_is_blocked(comm)){
+      mpi_errno = MPIR_2lvl_Allgather_MV2(sendbuf, sendcount, sendtype,
+                            recvbuf, recvcount, recvtype,
+                            comm);
+      }else{
+      mpi_errno = smpi_coll_tuned_allgather_mpich(sendbuf, sendcount, sendtype,
+                            recvbuf, recvcount, recvtype,
+                            comm);
+      }
+    } else {
       mpi_errno = MPIR_Allgather_RD_MV2(sendbuf, sendcount, sendtype,
           recvbuf, recvcount, recvtype,
           comm);
-      //     }
+    }
   } else if(MV2_Allgather_function == &MPIR_Allgather_Bruck_MV2
       || MV2_Allgather_function == &MPIR_Allgather_RD_MV2
       || MV2_Allgather_function == &MPIR_Allgather_Ring_MV2) {
index b86dbb6..8503dbe 100644 (file)
@@ -331,7 +331,7 @@ static int MPIR_Allgather_RD_Allgather_Comm_MV2( void *sendbuf,
 #define MPIR_Allgather_Bruck_MV2 smpi_coll_tuned_allgather_bruck
 #define MPIR_Allgather_RD_MV2 smpi_coll_tuned_allgather_rdb
 #define MPIR_Allgather_Ring_MV2 smpi_coll_tuned_allgather_ring
-
+#define MPIR_2lvl_Allgather_MV2 smpi_coll_tuned_allgather_mvapich2_smp
 
 static void init_mv2_allgather_tables_stampede(){
   int i;