Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
... untested ...
authorgenaud <genaud@48e7efb5-ca39-0410-a469-dd3cf9ba447f>
Mon, 20 Jul 2009 19:23:06 +0000 (19:23 +0000)
committergenaud <genaud@48e7efb5-ca39-0410-a469-dd3cf9ba447f>
Mon, 20 Jul 2009 19:23:06 +0000 (19:23 +0000)
* second part in tuned coll alltoall: basic_linear
* enriched a bit more datatypes (ub,lb).
  MPI_Type_get_extent() . -Does not allow datatype creation.)

git-svn-id: svn+ssh://scm.gforge.inria.fr/svn/simgrid/simgrid/trunk@6524 48e7efb5-ca39-0410-a469-dd3cf9ba447f

include/smpi/smpi.h
src/smpi/README
src/smpi/private.h
src/smpi/smpi_base.c
src/smpi/smpi_coll.c
src/smpi/smpi_coll_private.h
src/smpi/smpi_global.c
src/smpi/smpi_mpi.c

index d4e4fee..f4bc183 100644 (file)
@@ -21,6 +21,7 @@ SG_BEGIN_DECL()
 #define MPI_ERR_COUNT   6
 #define MPI_ERR_RANK    7
 #define MPI_ERR_TAG     8
+#define MPI_ERR_TRUNCATE 9
 // MPI_Comm
      typedef struct smpi_mpi_communicator_t *smpi_mpi_communicator_t;
      typedef smpi_mpi_communicator_t MPI_Comm;
@@ -71,6 +72,7 @@ SG_BEGIN_DECL()
 #define MPI_COMM_NULL     NULL
 
 #define MPI_STATUS_IGNORE NULL
+#define MPI_Aint ptrdiff_t
 
 #define MPI_BYTE          (smpi_mpi_global->mpi_byte)
 #define MPI_CHAR          (smpi_mpi_global->mpi_char)
@@ -89,8 +91,11 @@ SG_BEGIN_DECL()
 #define MPI_Finalize() SMPI_MPI_Finalize()
 #define MPI_Abort(a, b) SMPI_MPI_Abort(a, b)
 #define MPI_Comm_size(a, b) SMPI_MPI_Comm_size(a, b)
+#define MPI_Comm_split(a, b, c, d) SMPI_MPI_Comm_split(a, b, c, d)
 #define MPI_Comm_rank(a, b) SMPI_MPI_Comm_rank(a, b)
 #define MPI_Type_size(a, b) SMPI_MPI_Type_size(a, b)
+#define MPI_Type_get_extent(a, b, c) SMPI_MPI_Type_get_extent(a, b, c)
+
 #define MPI_Barrier(a) SMPI_MPI_Barrier(a)
 #define MPI_Irecv(a, b, c, d, e, f, g) SMPI_MPI_Irecv(a, b, c, d, e, f, g)
 #define MPI_Recv(a, b, c, d, e, f, g) SMPI_MPI_Recv(a, b, c, d, e, f, g)
@@ -101,7 +106,6 @@ SG_BEGIN_DECL()
 #define MPI_Wait(a, b) SMPI_MPI_Wait(a, b)
 #define MPI_Waitall(a, b, c) SMPI_MPI_Waitall(a, b, c)
 #define MPI_Waitany(a, b, c, d) SMPI_MPI_Waitany(a, b, c, d)
-#define MPI_Comm_split(a, b, c, d) SMPI_MPI_Comm_split(a, b, c, d)
 #define MPI_Wtime() SMPI_MPI_Wtime()
 #define MPI_Reduce( a, b, c, d, e, f, g) SMPI_MPI_Reduce( a, b, c, d, e, f, g)
 #define MPI_Allreduce( a, b, c, d, e, f) SMPI_MPI_Allreduce( a, b, c, d, e, f)
@@ -115,6 +119,8 @@ XBT_PUBLIC(int) SMPI_MPI_Abort(MPI_Comm comm, int errorcode);
 XBT_PUBLIC(int) SMPI_MPI_Comm_size(MPI_Comm comm, int *size);
 XBT_PUBLIC(int) SMPI_MPI_Comm_rank(MPI_Comm comm, int *rank);
 XBT_PUBLIC(int) SMPI_MPI_Type_size(MPI_Datatype datatype, size_t * size);
+XBT_PUBLIC(int) SMPI_MPI_Type_get_extent(MPI_Datatype datatype, MPI_Aint* lb, MPI_Aint *extent);
+
 XBT_PUBLIC(int) SMPI_MPI_Barrier(MPI_Comm comm);
 XBT_PUBLIC(int) SMPI_MPI_Irecv(void *buf, int count, MPI_Datatype datatype,
                                int src, int tag, MPI_Comm comm,
index c3cfb9b..cac137c 100644 (file)
@@ -51,3 +51,52 @@ a LD_LIBRARY_PATH variable pointing to src/.libs
 
 
 
+What's implemented
+==================
+
+As a proof of concept, and due to lack of time, the implementation is far from complete
+with respect to a MPI-1.2 specification. Here is what is implemented so far. Please update
+if you can.
+
+
+* MPI_Datatypes:
+MPI_BYTE  
+MPI_CHAR  
+MPI_INT   
+MPI_FLOAT
+MPI_DOUBLE
+
+
+* MPI_Op:
+MPI_LAND  
+MPI_SUM   
+MPI_PROD  
+MPI_MIN   
+MPI_MAX   
+
+*primitives:
+MPI_Init
+MPI_Finalize
+MPI_Abort
+MPI_Comm_size
+MPI_Comm_split
+MPI_Comm_rank
+MPI_Type_size
+MPI_Barrier
+MPI_Irecv
+MPI_Recv
+MPI_Isend
+MPI_Send
+MPI_Sendrecv
+MPI_Bcast
+MPI_Wait
+MPI_Waitall
+MPI_Waitany
+MPI_Wtime
+MPI_Reduce
+MPI_Allreduce
+MPI_Scatter
+MPI_Alltoall
+
+
+
index e01c624..e1f5b9d 100644 (file)
@@ -27,6 +27,10 @@ typedef struct smpi_mpi_communicator_t {
 // smpi mpi datatype
 typedef struct smpi_mpi_datatype_t {
   size_t size;
+  ptrdiff_t lb;
+  ptrdiff_t ub;
+  uint16_t flags; /* flags: has it been committed, etc ...*/
+  uint16_t id;    /* data id, normally the index in the data array. */
 } s_smpi_mpi_datatype_t;
 
 // smpi mpi request
@@ -123,6 +127,7 @@ typedef struct smpi_host_data_t *smpi_process_data_t;
 void smpi_process_init(int *argc, char ***argv);
 void smpi_process_finalize(void);
 int smpi_mpi_comm_rank(smpi_mpi_communicator_t comm);
+int smpi_mpi_type_get_extent(MPI_Datatype datatype, MPI_Aint *lb, MPI_Aint *extent);
 
 int smpi_mpi_bcast(void *buf, int count, MPI_Datatype datatype, int root, MPI_Comm comm);
 int smpi_mpi_barrier(smpi_mpi_communicator_t comm);
index 498fed2..7021f11 100644 (file)
@@ -3,6 +3,7 @@
 
 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_base, smpi,
                                 "Logging specific to SMPI (base)");
+XBT_LOG_EXTERNAL_CATEGORY(smpi_coll);
 XBT_LOG_EXTERNAL_CATEGORY(smpi_base);
 XBT_LOG_EXTERNAL_CATEGORY(smpi_bench);
 XBT_LOG_EXTERNAL_CATEGORY(smpi_kernel);
@@ -13,6 +14,22 @@ XBT_LOG_EXTERNAL_CATEGORY(smpi_util);
 
 smpi_mpi_global_t smpi_mpi_global = NULL;
 
+
+/**
+ * Get the lower bound and extent for a Datatype 
+ *
+ * FIXME: this an incomplete implementation as we do not support yet MPI_Type_commit.
+ * Hence, this can be called only for primitive type MPI_INT, MPI_DOUBLE, ...
+ *
+ * remark: MPI-1 has also the deprecated 
+ * int MPI_Type_extent(MPI_Datatype datatype, *MPI_Aint *extent);
+ *
+ **/
+int smpi_mpi_type_get_extent(MPI_Datatype datatype, MPI_Aint *lb, MPI_Aint *extent) {
+        *extent =  datatype->ub - datatype->lb;
+        return( MPI_SUCCESS );
+}
+
 /**
  * Operations of MPI_OP : implemented=land,sum,min,max
  **/
index 9c8dd5c..e5a1ebd 100644 (file)
@@ -16,6 +16,8 @@
 #include "private.h"
 #include "smpi_coll_private.h"
 
+XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_coll, smpi,
+                                "Logging specific to SMPI (coll)");
 
 /* proc_tree taken and translated from P2P-MPI */
 
@@ -293,7 +295,6 @@ int retval;
 /**
  * Barrier
  **/
-
 int nary_tree_barrier( MPI_Comm comm , int arity)
 {
         int rank;
@@ -321,6 +322,16 @@ int nary_tree_barrier( MPI_Comm comm , int arity)
 }
 
 
+/**
+ * Alltoall pairwise
+ *
+ * this algorithm performs size steps (1<=s<=size) and
+ * at each step s, a process p sends iand receive to.from a unique distinct remote process
+ * size=5 : s=1:  4->0->1, 0->1->2, 1->2->3, ... 
+ *          s=2:  3->0->2, 4->1->3, 0->2->4, 1->3->0 , 2->4->1
+ *          .... 
+ * Openmpi calls this routine when the message size sent to each rank is greater than 3000 bytes
+ **/
 
 int smpi_coll_tuned_alltoall_pairwise (void *sendbuf, int sendcount, MPI_Datatype datatype,
                    void* recvbuf, int recvcount, MPI_Datatype recvdatatype, MPI_Comm comm)
@@ -356,15 +367,147 @@ int smpi_coll_tuned_alltoall_pairwise (void *sendbuf, int sendcount, MPI_Datatyp
                                                  comm, MPI_STATUS_IGNORE);
          }
          return(retval);
+}
+
+/**
+ * helper: copy a datatype into another (in the simple case dt1=dt2) 
+**/
+int copy_dt( void *sbuf, int scount, const MPI_Datatype sdtype, void *rbuf, int rcount, const MPI_Datatype rdtype);
+int copy_dt( void *sbuf, int scount, const MPI_Datatype sdtype,
+             void *rbuf, int rcount, const MPI_Datatype rdtype)
+{
+    /* First check if we really have something to do */
+    if (0 == rcount) {
+        return ((0 == scount) ? MPI_SUCCESS : MPI_ERR_TRUNCATE);
+    }
+   /* If same datatypes used, just copy. */
+   if (sdtype == rdtype) {
+      int count = ( scount < rcount ? scount : rcount );
+      memcpy( rbuf, sbuf, sdtype->size*count);
+      return ((scount > rcount) ? MPI_ERR_TRUNCATE : MPI_SUCCESS);
+   }
+   /* FIXME:  cases 
+    * - If receive packed. 
+    * - If send packed
+    * to be treated once we have the MPI_Pack things ...
+    **/
+     return( MPI_SUCCESS );
+}
+
+/**
+ *
+ **/
+int smpi_coll_tuned_alltoall_basic_linear(void *sbuf, int scount, MPI_Datatype sdtype,
+                   void* rbuf, int rcount, MPI_Datatype rdtype, MPI_Comm comm)
+{
+         int i;
+        int system_tag = 999;
+         int rank;
+         int size = comm->size;
+         int err;
+         char *psnd;
+         char *prcv;
+        int nreq = 0;
+         MPI_Aint lb;
+         MPI_Aint sndinc;
+         MPI_Aint rcvinc;
+         MPI_Request *reqs;
+
+         /* Initialize. */
+         rank = smpi_mpi_comm_rank(comm);
+
+        err = smpi_mpi_type_get_extent(sdtype, &lb, &sndinc);
+        err = smpi_mpi_type_get_extent(rdtype, &lb, &rcvinc);
+         /* simple optimization */
+         psnd = ((char *) sbuf) + (rank * sndinc);
+         prcv = ((char *) rbuf) + (rank * rcvinc);
+
+         err = copy_dt( psnd, scount, sdtype, prcv, rcount, rdtype );
+         if (MPI_SUCCESS != err) {
+                   return err;
+         }
+
+         /* If only one process, we're done. */
+         if (1 == size) {
+                   return MPI_SUCCESS;
+         }
+
+         /* Initiate all send/recv to/from others. */
+         reqs =  xbt_malloc(2*(size-1) * sizeof(smpi_mpi_request_t));
+
+         prcv = (char *) rbuf;
+         psnd = (char *) sbuf;
+
+         /* Post all receives first -- a simple optimization */
+         for (i = (rank + 1) % size; i != rank; i = (i + 1) % size) {
+                   err = smpi_create_request( prcv + (i * rcvinc), rcount, rdtype,
+                                        i, rank,
+                                        system_tag + i,
+                                        comm, &(reqs[nreq]));
+                if (MPI_SUCCESS != err) {
+                        DEBUG2("[%d] failed to create request for rank %d\n",rank,i);
+                        for (i=0;i< nreq;i++) 
+                                xbt_mallocator_release(smpi_global->request_mallocator, reqs[i]);
+                        return err;
+                }
+                nreq++;
+         }
+         /* Now post all sends in reverse order 
+          *        - We would like to minimize the search time through message queue
+          *                 when messages actually arrive in the order in which they were posted.
+          *                      */
+         for (i = (rank + size - 1) % size; i != rank; i = (i + size - 1) % size ) {
+                   err = smpi_create_request (psnd + (i * sndinc), scount, sdtype, 
+                                         rank, i,
+                                                    system_tag + i, 
+                                         comm, &(reqs[nreq]));
+                if (MPI_SUCCESS != err) {
+                        DEBUG2("[%d] failed to create request for rank %d\n",rank,i);
+                        for (i=0;i< nreq;i++) 
+                                xbt_mallocator_release(smpi_global->request_mallocator, reqs[i]);
+                        return err;
+                }
+                nreq++;
+        }
+
+        /* Start your engines.  This will never return an error. */
+        for ( i=0; i< nreq/2; i++ ) {
+            smpi_mpi_irecv( reqs[i] );
+        }
+        for ( i= nreq/2; i<nreq; i++ ) {
+            smpi_mpi_isend( reqs[i] );
+        }
+
 
+        /* Wait for them all.  If there's an error, note that we don't
+         * care what the error was -- just that there *was* an error.  The
+         * PML will finish all requests, even if one or more of them fail.
+         * i.e., by the end of this call, all the requests are free-able.
+         * So free them anyway -- even if there was an error, and return
+         * the error after we free everything. */
+
+         err = smpi_mpi_waitall(nreq, reqs, MPI_STATUS_IGNORE);
+
+         /* Free the reqs */
+        for (i=0;i< 2*(size-1);i++) {
+            xbt_mallocator_release(smpi_global->request_mallocator, reqs[i]);
+        }
+         return err;
 }
 
+
+/**
+ * Alltoall Bruck 
+ *
+ * Openmpi calls this routine when the message size sent to each rank < 2000 bytes and size < 12
+ **/
+
+
 int smpi_coll_tuned_alltoall_bruck(void *sendbuf, int sendcount, MPI_Datatype sdtype,
                    void* recvbuf, int recvcount, MPI_Datatype rdtype, MPI_Comm comm)
 {
-         /*
+/*       int size = comm->size;
          int i, k, line = -1;
-         int rank, size;
          int sendto, recvfrom, distance, *displs=NULL, *blen=NULL;
          int maxpacksize, packsize, position;
          char * tmpbuf=NULL, *packbuf=NULL;
@@ -373,35 +516,27 @@ int smpi_coll_tuned_alltoall_bruck(void *sendbuf, int sendcount, MPI_Datatype sd
          int weallocated = 0;
          MPI_Datatype iddt;
 
-         size = ompi_comm_size(comm);
-         rank = ompi_comm_rank(comm);
-
-         OPAL_OUTPUT((ompi_coll_tuned_stream,"coll:tuned:alltoall_intra_bruck rank %d", rank));
-
-         err = ompi_ddt_get_extent (sdtype, &lb, &sext);
-         if (err != MPI_SUCCESS) { line = __LINE__; goto err_hndl; }
-
-         err = ompi_ddt_get_extent (rdtype, &lb, &rext);
-         if (err != MPI_SUCCESS) { line = __LINE__; goto err_hndl; }
-
+         rank = smpi_mpi_comm_rank(comm);
+*/
+         INFO0("coll:tuned:alltoall_intra_bruck ** NOT IMPLEMENTED YET**");
+/*
+         displs = xbt_malloc(size*sizeof(int));
+         blen =   xbt_malloc(size*sizeof(int));
 
-         displs = (int *) malloc(size*sizeof(int));
-         if (displs == NULL) { line = __LINE__; err = -1; goto err_hndl; }
-         blen = (int *) malloc(size*sizeof(int));
-         if (blen == NULL) { line = __LINE__; err = -1; goto err_hndl; }
          weallocated = 1;
 */
          /* Prepare for packing data */
-         /*err = MPI_Pack_size( scount*size, sdtype, comm, &maxpacksize );
-         if (err != MPI_SUCCESS) { line = __LINE__; goto err_hndl;  }
+/*
+         err = MPI_Pack_size( scount*size, sdtype, comm, &maxpacksize );
+         if (err != MPI_SUCCESS) {  }
 */
          /* pack buffer allocation */
 /*       packbuf = (char*) malloc((unsigned) maxpacksize);
-         if (packbuf == NULL) { line = __LINE__; err = -1; goto err_hndl; }
+         if (packbuf == NULL) { }
 */
          /* tmp buffer allocation for message data */
-/*       tmpbuf = (char *) malloc(scount*size*sext);
-         if (tmpbuf == NULL) { line = __LINE__; err = -1; goto err_hndl; }
+/*       tmpbuf = xbt_malloc(scount*size*sext);
+         if (tmpbuf == NULL) {  }
 */
 
          /* Step 1 - local rotation - shift up by rank */
@@ -496,8 +631,7 @@ err_hndl:
          }
          return err;
          */
-         int NOTYET=1;
-         return NOTYET;
+         return -1; /* FIXME: to be changed*/
 }
 
 
index 7647b0e..b4b1e9d 100644 (file)
@@ -19,3 +19,6 @@ int smpi_coll_tuned_alltoall_pairwise (void *sendbuf, int sendcount, MPI_Datatyp
                                           void* recvbuf, int recvcount, MPI_Datatype recvdatatype,
                                                   MPI_Comm comm);
 
+int smpi_coll_tuned_alltoall_basic_linear(void *sbuf, int scount, MPI_Datatype sdtype,
+                                void* rbuf, int rcount, MPI_Datatype rdtype, MPI_Comm comm);
+
index f604a9b..44a75b3 100644 (file)
@@ -193,16 +193,30 @@ void smpi_global_init()
   }
 
   // mpi datatypes
-  smpi_mpi_global->mpi_byte = xbt_new(s_smpi_mpi_datatype_t, 1);
+  smpi_mpi_global->mpi_byte = xbt_new(s_smpi_mpi_datatype_t, 1); /* we can think of it as a placeholder for value*/
   smpi_mpi_global->mpi_byte->size = (size_t) 1;
+  smpi_mpi_global->mpi_byte->lb = (ptrdiff_t) &(smpi_mpi_global->mpi_byte);
+  smpi_mpi_global->mpi_byte->ub = smpi_mpi_global->mpi_byte->lb + smpi_mpi_global->mpi_byte->size;
+
   smpi_mpi_global->mpi_char = xbt_new(s_smpi_mpi_datatype_t, 1);
   smpi_mpi_global->mpi_char->size = (size_t) 1;
+  smpi_mpi_global->mpi_char->lb = (ptrdiff_t) &(smpi_mpi_global->mpi_char);
+  smpi_mpi_global->mpi_char->ub = smpi_mpi_global->mpi_char->lb + smpi_mpi_global->mpi_char->size; 
+
   smpi_mpi_global->mpi_int = xbt_new(s_smpi_mpi_datatype_t, 1);
   smpi_mpi_global->mpi_int->size = sizeof(int);
+  smpi_mpi_global->mpi_int->lb = (ptrdiff_t) &(smpi_mpi_global->mpi_int);
+  smpi_mpi_global->mpi_int->ub = smpi_mpi_global->mpi_int->lb + smpi_mpi_global->mpi_int->size;
+
   smpi_mpi_global->mpi_float = xbt_new(s_smpi_mpi_datatype_t, 1);
   smpi_mpi_global->mpi_float->size = sizeof(float);
+  smpi_mpi_global->mpi_float->lb = (ptrdiff_t) &(smpi_mpi_global->mpi_float);
+  smpi_mpi_global->mpi_float->ub = smpi_mpi_global->mpi_float->lb + smpi_mpi_global->mpi_float->size;
+
   smpi_mpi_global->mpi_double = xbt_new(s_smpi_mpi_datatype_t, 1);
   smpi_mpi_global->mpi_double->size = sizeof(double);
+  smpi_mpi_global->mpi_double->lb = (ptrdiff_t) &(smpi_mpi_global->mpi_float);
+  smpi_mpi_global->mpi_double->ub = smpi_mpi_global->mpi_double->lb + smpi_mpi_global->mpi_double->size;
 
   // mpi operations
   smpi_mpi_global->mpi_land = xbt_new(s_smpi_mpi_op_t, 1);
index c1f5253..fb2e86b 100644 (file)
@@ -66,6 +66,11 @@ int SMPI_MPI_Comm_rank(MPI_Comm comm, int *rank)
   return retval;
 }
 
+
+//------------------------------- Datatypes ---------------------------------------
+/**
+ * query the size of the type
+ **/
 int SMPI_MPI_Type_size(MPI_Datatype datatype, size_t * size)
 {
   int retval = MPI_SUCCESS;
@@ -85,6 +90,18 @@ int SMPI_MPI_Type_size(MPI_Datatype datatype, size_t * size)
   return retval;
 }
 
+
+/**
+ * query extent and lower bound of the type 
+ **/
+int SMPI_MPI_Type_get_extent( MPI_Datatype datatype, int *lb, int *extent) 
+{
+        return( smpi_mpi_type_get_extent( datatype, lb, extent));
+}
+
+
+
+
 int SMPI_MPI_Barrier(MPI_Comm comm)
 {
   int retval = MPI_SUCCESS;
@@ -575,10 +592,7 @@ int SMPI_MPI_Alltoall(void *sendbuf, int sendcount, MPI_Datatype datatype,
                                  recvbuf, recvcount, recvtype, comm);
 
   } else if (block_dsize < 3000) {
-/* use this one !!         retval = smpi_coll_tuned_alltoall_basic_linear(sendbuf, sendcount, datatype,
-                                 recvbuf, recvcount, recvtype, comm);
-                                 */
-  retval = smpi_coll_tuned_alltoall_pairwise(sendbuf, sendcount, datatype,
+           retval = smpi_coll_tuned_alltoall_basic_linear(sendbuf, sendcount, datatype,
                                  recvbuf, recvcount, recvtype, comm);
   } else {