AND (Algorithmique Numérique Distribuée) public Git repository

End of the OMPI collectives addition: add the barrier algorithms
author     degomme <degomme@debian.localdomain>
Thu, 13 Jun 2013 09:36:27 +0000 (11:36 +0200)
committer  degomme <degomme@debian.localdomain>
Thu, 13 Jun 2013 09:36:27 +0000 (11:36 +0200)
buildtools/Cmake/DefinePackages.cmake
src/include/smpi/smpi_interface.h
src/simgrid/sg_config.c
src/smpi/colls/barrier-ompi.c [new file with mode: 0644]
src/smpi/colls/colls.h
src/smpi/colls/smpi_openmpi_selector.c
src/smpi/smpi_coll.c
src/smpi/smpi_global.c
src/smpi/smpi_pmpi.c

buildtools/Cmake/DefinePackages.cmake
index 4b5c74e..6d5586f 100644
@@ -198,6 +198,7 @@ set(SMPI_SRC
   src/smpi/colls/gather-ompi.c
   src/smpi/colls/reduce_scatter-ompi.c
   src/smpi/colls/scatter-ompi.c
+  src/smpi/colls/barrier-ompi.c
   )
 
 if(SMPI_F2C)
src/include/smpi/smpi_interface.h
index 7ba0fdc..04fc1da 100644
@@ -108,6 +108,14 @@ XBT_PUBLIC_DATA(int (*mpi_coll_scatter_fun)
                 void *recvbuf, int recvcount, MPI_Datatype recvtype,
                 int root, MPI_Comm comm));
 
+/** \ingroup MPI barrier
+ *  \brief The list of all available barrier collectives
+ */
+XBT_PUBLIC_DATA(s_mpi_coll_description_t) mpi_coll_barrier_description[];
+XBT_PUBLIC_DATA(int (*mpi_coll_barrier_fun)
+                (MPI_Comm comm));
+
+
 XBT_PUBLIC(void) coll_help(const char *category,
                            s_mpi_coll_description_t * table);
 XBT_PUBLIC(int) find_coll_description(s_mpi_coll_description_t * table,
src/simgrid/sg_config.c
index c29f5e8..3022b61 100644
@@ -282,6 +282,9 @@ static void _sg_cfg_cb__coll_reduce_scatter(const char *name, int pos){
 static void _sg_cfg_cb__coll_scatter(const char *name, int pos){
   _sg_cfg_cb__coll("scatter", mpi_coll_scatter_description, name, pos);
 }
+static void _sg_cfg_cb__coll_barrier(const char *name, int pos){
+  _sg_cfg_cb__coll("barrier", mpi_coll_barrier_description, name, pos);
+}
 #endif
 
 /* callback of the inclusion path */
@@ -778,6 +781,11 @@ void sg_config_init(int *argc, char **argv)
                     xbt_cfgelm_string, NULL, 1, 1, &_sg_cfg_cb__coll_allgather,
                     NULL);
 
+    xbt_cfg_register(&_sg_cfg_set, "smpi/barrier",
+                    "Which collective to use for barrier",
+                    xbt_cfgelm_string, NULL, 1, 1, &_sg_cfg_cb__coll_barrier,
+                    NULL);
+
     xbt_cfg_register(&_sg_cfg_set, "smpi/reduce_scatter",
                     "Which collective to use for reduce_scatter",
                     xbt_cfgelm_string, NULL, 1, 1, &_sg_cfg_cb__coll_reduce_scatter,
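
Note (not part of the diff): once this option is registered, the barrier algorithm can be selected at run time with the same --cfg mechanism already used for the other collectives; the value must be one of the names registered in mpi_coll_barrier_description. A hypothetical invocation, with placeholder platform/hostfile/application names, could look like:

    smpirun -np 16 -platform platform.xml -hostfile hostfile \
            --cfg=smpi/barrier:ompi_bruck ./my_mpi_app
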
diff --git a/src/smpi/colls/barrier-ompi.c b/src/smpi/colls/barrier-ompi.c
new file mode 100644
index 0000000..ad5b748
--- /dev/null
@@ -0,0 +1,344 @@
+/*
+ * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
+ *                         University Research and Technology
+ *                         Corporation.  All rights reserved.
+ * Copyright (c) 2004-2006 The University of Tennessee and The University
+ *                         of Tennessee Research Foundation.  All rights
+ *                         reserved.
+ * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
+ *                         University of Stuttgart.  All rights reserved.
+ * Copyright (c) 2004-2005 The Regents of the University of California.
+ *                         All rights reserved.
+ * Copyright (c) 2008      Sun Microsystems, Inc.  All rights reserved.
+ * $COPYRIGHT$
+ *
+ * Additional copyrights may follow
+ *
+ * $HEADER$
+ */
+
+#include "colls_private.h"
+#include "coll_tuned_topo.h"
+
+
+#define MCA_COLL_BASE_TAG_BARRIER 100
+/*
+ * Barrier is meant to be a synchronous operation: since some BTLs can mark
+ * a request done before it is passed to the NIC, and progress might not be
+ * made elsewhere, we cannot allow a process to exit the barrier until its
+ * last [round of] sends is completed.
+ *
+ * It is the last round of sends rather than the 'last' individual send,
+ * as each pair of peers can use different channels/devices/BTLs, and the
+ * receiver of one of these sends might be forced to wait while the sender
+ * leaves the collective and does not make progress until the next MPI call.
+ *
+ */
+
+/*
+ * Simple double-ring version of barrier.
+ *
+ * The synchronous guarantee is provided by the last ring of sends,
+ * which are synchronous.
+ */
+int smpi_coll_tuned_barrier_ompi_doublering(MPI_Comm comm
+                                            )
+{
+    int rank, size;
+    int left, right;
+
+
+    rank = smpi_comm_rank(comm);
+    size = smpi_comm_size(comm);
+
+    XBT_DEBUG("ompi_coll_tuned_barrier_ompi_doublering rank %d", rank);
+  
+    left = ((rank - 1 + size) % size); /* wrap around: rank 0's left neighbour is size-1 */
+    right = ((rank+1)%size);
+
+    if (rank > 0) { /* receive message from the left */
+        smpi_mpi_recv((void*)NULL, 0, MPI_BYTE, left, 
+                                MCA_COLL_BASE_TAG_BARRIER, comm, 
+                                MPI_STATUS_IGNORE);
+    }
+
+    /* Send message to the right */
+    smpi_mpi_send((void*)NULL, 0, MPI_BYTE, right, 
+                            MCA_COLL_BASE_TAG_BARRIER, 
+                             comm);
+
+    /* root needs to receive from the last node */
+    if (rank == 0) {
+        smpi_mpi_recv((void*)NULL, 0, MPI_BYTE, left, 
+                                MCA_COLL_BASE_TAG_BARRIER, comm, 
+                                MPI_STATUS_IGNORE);
+    }
+
+    /* Allow nodes to exit */
+    if (rank > 0) { /* post Receive from left */
+        smpi_mpi_recv((void*)NULL, 0, MPI_BYTE, left, 
+                                MCA_COLL_BASE_TAG_BARRIER, comm, 
+                                MPI_STATUS_IGNORE);
+    }
+
+    /* send message to the right one */
+    smpi_mpi_send((void*)NULL, 0, MPI_BYTE, right, 
+                            MCA_COLL_BASE_TAG_BARRIER, 
+                             comm);
+    /* rank 0 post receive from the last node */
+    if (rank == 0) {
+        smpi_mpi_recv((void*)NULL, 0, MPI_BYTE, left, 
+                                MCA_COLL_BASE_TAG_BARRIER, comm, 
+                                MPI_STATUS_IGNORE);
+    }
+
+    return MPI_SUCCESS;
+
+}
+
+
+/*
+ * To make synchronous, uses sync sends and sync sendrecvs
+ */
+
+int smpi_coll_tuned_barrier_ompi_recursivedoubling(MPI_Comm comm
+                                                   )
+{
+    int rank, size, adjsize;
+    int mask, remote;
+
+    rank = smpi_comm_rank(comm);
+    size = smpi_comm_size(comm);
+    XBT_DEBUG(
+                 "ompi_coll_tuned_barrier_ompi_recursivedoubling rank %d", 
+                 rank);
+
+    /* compute the largest power of 2 that is <= size */
+    for( adjsize = 1; adjsize <= size; adjsize <<= 1 );
+    adjsize >>= 1;
+
+    /* if size is not exact power of two, perform an extra step */
+    if (adjsize != size) {
+        if (rank >= adjsize) {
+            /* send message to lower ranked node */
+            remote = rank - adjsize;
+            smpi_mpi_sendrecv(NULL, 0, MPI_BYTE, remote,
+                                                  MCA_COLL_BASE_TAG_BARRIER,
+                                                  NULL, 0, MPI_BYTE, remote,
+                                                  MCA_COLL_BASE_TAG_BARRIER,
+                                                  comm, MPI_STATUS_IGNORE);
+
+        } else if (rank < (size - adjsize)) {
+
+            /* receive message from high level rank */
+            smpi_mpi_recv((void*)NULL, 0, MPI_BYTE, rank+adjsize,
+                                    MCA_COLL_BASE_TAG_BARRIER, comm, 
+                                    MPI_STATUS_IGNORE);
+
+        }
+    }
+
+    /* exchange messages */
+    if ( rank < adjsize ) {
+        mask = 0x1;
+        while ( mask < adjsize ) {
+            remote = rank ^ mask;
+            mask <<= 1;
+            if (remote >= adjsize) continue;
+
+            /* post receive from the remote node */
+            smpi_mpi_sendrecv(NULL, 0, MPI_BYTE, remote,
+                                                  MCA_COLL_BASE_TAG_BARRIER,
+                                                  NULL, 0, MPI_BYTE, remote,
+                                                  MCA_COLL_BASE_TAG_BARRIER,
+                                                  comm, MPI_STATUS_IGNORE);
+        }
+    }
+
+    /* non-power of 2 case */
+    if (adjsize != size) {
+        if (rank < (size - adjsize)) {
+            /* send enter message to higher ranked node */
+            remote = rank + adjsize;
+            smpi_mpi_send((void*)NULL, 0, MPI_BYTE, remote, 
+                                    MCA_COLL_BASE_TAG_BARRIER, 
+                                     comm);
+
+        }
+    }
+
+    return MPI_SUCCESS;
+
+}
+
+
+/*
+ * To make synchronous, uses sync sends and sync sendrecvs
+ */
+
+int smpi_coll_tuned_barrier_ompi_bruck(MPI_Comm comm
+                                       )
+{
+    int rank, size;
+    int distance, to, from;
+
+    rank = smpi_comm_rank(comm);
+    size = smpi_comm_size(comm);
+    XBT_DEBUG(
+                 "ompi_coll_tuned_barrier_ompi_bruck rank %d", rank);
+
+    /* exchange data with rank-2^k and rank+2^k */
+    for (distance = 1; distance < size; distance <<= 1) { 
+        from = (rank + size - distance) % size;
+        to   = (rank + distance) % size;
+
+        /* exchange zero-byte messages with rank+distance (to) and rank-distance (from) */
+        smpi_mpi_sendrecv(NULL, 0, MPI_BYTE, to, 
+                                              MCA_COLL_BASE_TAG_BARRIER,
+                                              NULL, 0, MPI_BYTE, from, 
+                                              MCA_COLL_BASE_TAG_BARRIER,
+                                              comm, MPI_STATUS_IGNORE);
+    }
+
+    return MPI_SUCCESS;
+
+}
+
+
+/*
+ * To make synchronous, uses sync sends and sync sendrecvs
+ */
+/* special case for two processes */
+int smpi_coll_tuned_barrier_ompi_two_procs(MPI_Comm comm
+                                           )
+{
+    int remote;
+
+    remote = smpi_comm_rank(comm);
+    XBT_DEBUG(
+                 "ompi_coll_tuned_barrier_ompi_two_procs rank %d", remote);
+    remote = (remote + 1) & 0x1;
+
+    smpi_mpi_sendrecv(NULL, 0, MPI_BYTE, remote, 
+                                          MCA_COLL_BASE_TAG_BARRIER, 
+                                          NULL, 0, MPI_BYTE, remote, 
+                                          MCA_COLL_BASE_TAG_BARRIER,
+                                          comm, MPI_STATUS_IGNORE);
+    return (MPI_SUCCESS);
+}
+
+
+/*
+ * Linear functions are copied from the BASIC coll module
+ * they do not segment the message and are simple implementations
+ * but for some small number of nodes and/or small data sizes they
+ * are just as fast as tuned/tree based segmenting operations
+ * and as such may be selected by the decision functions
+ * These are copied into this module due to the way we select modules
+ * in V1. i.e. in V2 we will handle this differently and so will not
+ * have to duplicate code.
+ * GEF Oct05 after asking Jeff.
+ */
+
+/* copied function (with appropriate renaming) starts here */
+
+int smpi_coll_tuned_barrier_ompi_basic_linear(MPI_Comm comm)
+{
+    int i;
+    int size = smpi_comm_size(comm);
+    int rank = smpi_comm_rank(comm);
+
+    /* All non-root send & receive zero-length message. */
+
+    if (rank > 0) {
+        smpi_mpi_send (NULL, 0, MPI_BYTE, 0, 
+                                 MCA_COLL_BASE_TAG_BARRIER,
+                                  comm);
+
+        smpi_mpi_recv (NULL, 0, MPI_BYTE, 0, 
+                                 MCA_COLL_BASE_TAG_BARRIER,
+                                 comm, MPI_STATUS_IGNORE);
+    }
+
+    /* The root collects and broadcasts the messages. */
+
+    else {
+        MPI_Request* requests;
+
+        requests = (MPI_Request*)malloc( size * sizeof(MPI_Request) );
+        for (i = 1; i < size; ++i) {
+            requests[i] = smpi_mpi_irecv(NULL, 0, MPI_BYTE, MPI_ANY_SOURCE,
+                                     MCA_COLL_BASE_TAG_BARRIER, comm
+                                     );
+        }
+        smpi_mpi_waitall( size-1, requests+1, MPI_STATUSES_IGNORE );
+
+        for (i = 1; i < size; ++i) {
+            requests[i] = smpi_mpi_isend(NULL, 0, MPI_BYTE, i,
+                                     MCA_COLL_BASE_TAG_BARRIER, 
+                                      comm
+                                     );
+        }
+        smpi_mpi_waitall( size-1, requests+1, MPI_STATUSES_IGNORE );
+        free( requests );
+    }
+
+    /* All done */
+
+    return MPI_SUCCESS;
+
+}
+/* copied function (with appropriate renaming) ends here */
+
+/*
+ * Another recursive doubling type algorithm, but in this case
+ * we go up the tree and back down the tree.  
+ */
+int smpi_coll_tuned_barrier_ompi_tree(MPI_Comm comm)
+{
+    int rank, size, depth;
+    int jump, partner;
+
+    rank = smpi_comm_rank(comm);
+    size = smpi_comm_size(comm);
+    XBT_DEBUG(
+                 "ompi_coll_tuned_barrier_ompi_tree %d", 
+                 rank);
+
+    /* Find the smallest power of 2 that is >= the communicator size. */
+    for(depth = 1; depth < size; depth <<= 1 );
+
+    for (jump=1; jump<depth; jump<<=1) {
+        partner = rank ^ jump;
+        if (!(partner & (jump-1)) && partner < size) {
+            if (partner > rank) {
+                smpi_mpi_recv (NULL, 0, MPI_BYTE, partner, 
+                                         MCA_COLL_BASE_TAG_BARRIER, comm,
+                                         MPI_STATUS_IGNORE);
+            } else if (partner < rank) {
+                smpi_mpi_send (NULL, 0, MPI_BYTE, partner,
+                                         MCA_COLL_BASE_TAG_BARRIER, 
+                                          comm);
+            }
+        }
+    }
+    
+    depth>>=1;
+    for (jump = depth; jump>0; jump>>=1) {
+        partner = rank ^ jump;
+        if (!(partner & (jump-1)) && partner < size) {
+            if (partner > rank) {
+                smpi_mpi_send (NULL, 0, MPI_BYTE, partner,
+                                         MCA_COLL_BASE_TAG_BARRIER,
+                                          comm);
+            } else if (partner < rank) {
+                smpi_mpi_recv (NULL, 0, MPI_BYTE, partner, 
+                                         MCA_COLL_BASE_TAG_BARRIER, comm,
+                                         MPI_STATUS_IGNORE);
+            }
+        }
+    }
+
+    return MPI_SUCCESS;
+}
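
Aside (not part of the commit): the non-power-of-two handling in the recursive doubling barrier above can be illustrated with a small standalone sketch. For size = 6, adjsize = 4: ranks 4 and 5 first report to ranks 0 and 1, ranks 0..3 then exchange in log2(4) = 2 rounds, and ranks 0 and 1 finally release ranks 4 and 5. The hypothetical program below only prints the exchange pattern of the middle phase:

    #include <stdio.h>

    int main(void)
    {
        int size = 6, adjsize, rank, mask, remote, round;

        /* same computation as in the function above: largest power of 2 <= size */
        for (adjsize = 1; adjsize <= size; adjsize <<= 1)
            ;
        adjsize >>= 1;

        /* middle phase: pairwise exchanges among ranks 0..adjsize-1 */
        for (rank = 0; rank < adjsize; rank++)
            for (mask = 0x1, round = 0; mask < adjsize; mask <<= 1, round++) {
                remote = rank ^ mask;
                if (remote < adjsize)
                    printf("round %d: rank %d <-> rank %d\n", round, rank, remote);
            }
        return 0;
    }
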
src/smpi/colls/colls.h
index 12627c0..09e47ce 100644
@@ -229,7 +229,7 @@ COLL_REDUCE_SCATTERS(COLL_PROTO, COLL_NOsep)
 
 
 /*************
- * REDUCE_SCATTER *
+ * SCATTER *
  *************/
 #define COLL_SCATTER_SIG scatter, int, \
                 (void *sendbuf, int sendcount, MPI_Datatype sendtype,\
@@ -243,4 +243,22 @@ COLL_APPLY(action, COLL_SCATTER_SIG, ompi_binomial)
 
 COLL_SCATTERS(COLL_PROTO, COLL_NOsep)
 
+/*************
+ * BARRIER *
+ *************/
+#define COLL_BARRIER_SIG barrier, int, \
+                (MPI_Comm comm)
+
+#define COLL_BARRIERS(action, COLL_sep) \
+COLL_APPLY(action, COLL_BARRIER_SIG, ompi) COLL_sep \
+COLL_APPLY(action, COLL_BARRIER_SIG, ompi_basic_linear) COLL_sep \
+COLL_APPLY(action, COLL_BARRIER_SIG, ompi_two_procs)  COLL_sep \
+COLL_APPLY(action, COLL_BARRIER_SIG, ompi_tree)  COLL_sep \
+COLL_APPLY(action, COLL_BARRIER_SIG, ompi_bruck)  COLL_sep \
+COLL_APPLY(action, COLL_BARRIER_SIG, ompi_recursivedoubling) COLL_sep \
+COLL_APPLY(action, COLL_BARRIER_SIG, ompi_doublering)  
+
+COLL_BARRIERS(COLL_PROTO, COLL_NOsep)
+
+
 #endif
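
For readers unfamiliar with the macro machinery: assuming COLL_PROTO follows the existing convention for the other collectives and turns the (name, return type, argument list) signature plus an algorithm suffix into a function prototype, COLL_BARRIERS(COLL_PROTO, COLL_NOsep) would expand to roughly the declarations below, matching the functions defined in barrier-ompi.c and smpi_openmpi_selector.c:

    int smpi_coll_tuned_barrier_ompi(MPI_Comm comm);
    int smpi_coll_tuned_barrier_ompi_basic_linear(MPI_Comm comm);
    int smpi_coll_tuned_barrier_ompi_two_procs(MPI_Comm comm);
    int smpi_coll_tuned_barrier_ompi_tree(MPI_Comm comm);
    int smpi_coll_tuned_barrier_ompi_bruck(MPI_Comm comm);
    int smpi_coll_tuned_barrier_ompi_recursivedoubling(MPI_Comm comm);
    int smpi_coll_tuned_barrier_ompi_doublering(MPI_Comm comm);
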
src/smpi/colls/smpi_openmpi_selector.c
index f5a23c4..819242f 100644
@@ -99,27 +99,27 @@ int smpi_coll_tuned_alltoallv_ompi(void *sbuf, int *scounts, int *sdisps,
                                                         comm);
 }
 
-/*
-void smpi_coll_tuned_barrier_ompi(MPI_Comm  comm)
+
+int smpi_coll_tuned_barrier_ompi(MPI_Comm  comm)
 {    int communicator_size = smpi_comm_size(comm);
 
     if( 2 == communicator_size )
-        return smpi_coll_tuned_barrier_intra_two_procs(comm, module);
-     * Basic optimisation. If we have a power of 2 number of nodes
-     * the use the recursive doubling algorithm, otherwise
-     * bruck is the one we want.
+        return smpi_coll_tuned_barrier_ompi_two_procs(comm);
+    /* Basic optimisation: if we have a power-of-2 number of nodes,
+     * use the recursive doubling algorithm; otherwise
+     * Bruck is the one we want. */
     {
-        bool has_one = false;
+        int has_one = 0;
         for( ; communicator_size > 0; communicator_size >>= 1 ) {
             if( communicator_size & 0x1 ) {
                 if( has_one )
-                    return smpi_coll_tuned_barrier_intra_bruck(comm, module);
-                has_one = true;
+                    return smpi_coll_tuned_barrier_ompi_bruck(comm);
+                has_one = 1;
             }
         }
     }
-    return smpi_coll_tuned_barrier_intra_recursivedoubling(comm, module);
-}*/
+    return smpi_coll_tuned_barrier_ompi_recursivedoubling(comm);
+}
 
 int smpi_coll_tuned_bcast_ompi(void *buff, int count,
                                           MPI_Datatype datatype, int root,
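
A standalone illustration (not part of the commit) of the selection rule in smpi_coll_tuned_barrier_ompi above: the loop scans the bits of communicator_size, and a second set bit means the size is not a power of two, so the Bruck variant is chosen; a single set bit falls through to recursive doubling, and the two-process case is caught first.

    #include <stdio.h>

    /* mirror of the bit-scanning rule in smpi_coll_tuned_barrier_ompi */
    static const char *pick_barrier(int communicator_size)
    {
        int has_one = 0;

        if (2 == communicator_size)
            return "two_procs";
        for (; communicator_size > 0; communicator_size >>= 1) {
            if (communicator_size & 0x1) {
                if (has_one)
                    return "bruck";            /* a second set bit: not a power of 2 */
                has_one = 1;
            }
        }
        return "recursivedoubling";            /* exactly one set bit: power of 2 */
    }

    int main(void)
    {
        int sizes[] = {2, 3, 4, 6, 8, 12, 16};
        unsigned i;

        for (i = 0; i < sizeof sizes / sizeof sizes[0]; i++)
            printf("%2d processes -> %s\n", sizes[i], pick_barrier(sizes[i]));
        return 0;
    }
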
src/smpi/smpi_coll.c
index 50ee98e..172ddb1 100644
@@ -63,6 +63,13 @@ COLL_SCATTERS(COLL_DESCRIPTION, COLL_COMMA),
   {NULL, NULL, NULL}      /* this array must be NULL terminated */
 };
 
+s_mpi_coll_description_t mpi_coll_barrier_description[] = {
+  {"default",
+   "barrier default collective",
+   smpi_mpi_barrier},
+COLL_BARRIERS(COLL_DESCRIPTION, COLL_COMMA),
+  {NULL, NULL, NULL}      /* this array must be NULL terminated */
+};
 s_mpi_coll_description_t mpi_coll_alltoall_description[] = {
   {"default",
    "Ompi alltoall default collective",
@@ -161,6 +168,7 @@ int (*mpi_coll_bcast_fun)(void *buf, int count, MPI_Datatype datatype, int root,
 int (*mpi_coll_reduce_fun)(void *buf, void *rbuf, int count, MPI_Datatype datatype, MPI_Op op, int root, MPI_Comm comm);
 int (*mpi_coll_reduce_scatter_fun)(void *sbuf, void *rbuf, int *rcounts,MPI_Datatype dtype,MPI_Op  op,MPI_Comm  comm);
 int (*mpi_coll_scatter_fun)(void *sendbuf, int sendcount, MPI_Datatype sendtype,void *recvbuf, int recvcount, MPI_Datatype recvtype,int root, MPI_Comm comm);
+int (*mpi_coll_barrier_fun)(MPI_Comm comm);
 struct s_proc_tree {
   int PROCTREE_A;
   int numChildren;
src/smpi/smpi_global.c
index d980dfc..c62c103 100644
@@ -420,6 +420,12 @@ int smpi_main(int (*realmain) (int argc, char *argv[]),int argc, char *argv[])
                 void *recvbuf, int recvcount, MPI_Datatype recvtype,\
                 int root, MPI_Comm comm))
                           mpi_coll_scatter_description[scatter_id].coll;
+
+  int barrier_id = find_coll_description(mpi_coll_barrier_description,
+                                           sg_cfg_get_string("smpi/barrier"));
+  mpi_coll_barrier_fun = (int (*)(MPI_Comm comm))
+                          mpi_coll_barrier_description[barrier_id].coll;
+
   smpi_global_init();
 
   /* Clean IO before the run */
src/smpi/smpi_pmpi.c
index 32fb308..8e890ec 100644
@@ -1579,7 +1579,7 @@ int PMPI_Barrier(MPI_Comm comm)
   if (comm == MPI_COMM_NULL) {
     retval = MPI_ERR_COMM;
   } else {
-    smpi_mpi_barrier(comm);
+    mpi_coll_barrier_fun(comm);
     retval = MPI_SUCCESS;
   }
 #ifdef HAVE_TRACING
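
To close the loop, a minimal user program (a sketch, not part of the commit): its MPI_Barrier call enters PMPI_Barrier and is now dispatched through mpi_coll_barrier_fun, i.e. through whichever implementation was selected via smpi/barrier (or smpi_mpi_barrier for the default).

    #include <mpi.h>
    #include <stdio.h>

    int main(int argc, char *argv[])
    {
        int rank;

        MPI_Init(&argc, &argv);
        MPI_Comm_rank(MPI_COMM_WORLD, &rank);
        MPI_Barrier(MPI_COMM_WORLD);   /* now dispatched through mpi_coll_barrier_fun */
        printf("rank %d passed the barrier\n", rank);
        MPI_Finalize();
        return 0;
    }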