Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
replace failing algo with a new version from mpich
authordegomme <augustin.degomme@unibas.ch>
Sat, 11 Feb 2017 01:42:02 +0000 (02:42 +0100)
committerdegomme <augustin.degomme@unibas.ch>
Sun, 12 Feb 2017 01:08:28 +0000 (02:08 +0100)
src/smpi/colls/bcast-scatter-rdb-allgather.c

index 7bda0f7..582b8f8 100644 (file)
-/* Copyright (c) 2013-2014. The SimGrid Team.
- * All rights reserved.                                                     */
-
-/* This program is free software; you can redistribute it and/or modify it
- * under the terms of the license (GNU LGPL) which comes with this package. */
-
 #include "colls_private.h"
+#include "src/smpi/smpi_mpi_dt_private.h"
 
-/*****************************************************************************
-
-Copyright (c) 2006, Ahmad Faraj & Xin Yuan,
-All rights reserved.
-
-Redistribution and use in source and binary forms, with or without
-modification, are permitted provided that the following conditions are met:
-
-  * Redistributions of source code must retain the above copyright notice,
-    this list of conditions and the following disclaimer.
-
-  * Redistributions in binary form must reproduce the above copyright notice,
-    this list of conditions and the following disclaimer in the documentation
-    and/or other materials provided with the distribution.
-
-  * Neither the name of the Florida State University nor the names of its
-    contributors may be used to endorse or promote products derived from this
-    software without specific prior written permission.
-
-THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
-ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
-WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
-DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
-ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
-(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
-LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
-ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
-SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-  *************************************************************************
-  *     Any results obtained from executing this software require the     *
-  *     acknowledgment and citation of the software and its owners.       *
-  *     The full citation is given below:                                 *
-  *                                                                       *
-  *     A. Faraj and X. Yuan. "Automatic Generation and Tuning of MPI     *
-  *     Collective Communication Routines." The 19th ACM International    *
-  *     Conference on Supercomputing (ICS), Cambridge, Massachusetts,     *
-  *     June 20-22, 2005.                                                 *
-  *************************************************************************
+static int scatter_for_bcast(
+    int root,
+    MPI_Comm comm,
+    int nbytes,
+    void *tmp_buf)
+{
+    MPI_Status status;
+    int        rank, comm_size, src, dst;
+    int        relative_rank, mask;
+    int mpi_errno = MPI_SUCCESS;
+    int scatter_size, curr_size, recv_size = 0, send_size;
+
+    comm_size = smpi_comm_size(comm);
+    rank = smpi_comm_rank(comm);
+    relative_rank = (rank >= root) ? rank - root : rank - root + comm_size;
+
+    /* use long message algorithm: binomial tree scatter followed by an allgather */
+
+    /* The scatter algorithm divides the buffer into nprocs pieces and
+       scatters them among the processes. Root gets the first piece,
+       root+1 gets the second piece, and so forth. Uses the same binomial
+       tree algorithm as above. Ceiling division
+       is used to compute the size of each piece. This means some
+       processes may not get any data. For example if bufsize = 97 and
+       nprocs = 16, ranks 15 and 16 will get 0 data. On each process, the
+       scattered data is stored at the same offset in the buffer as it is
+       on the root process. */ 
+
+    scatter_size = (nbytes + comm_size - 1)/comm_size; /* ceiling division */
+    curr_size = (rank == root) ? nbytes : 0; /* root starts with all the
+                                                data */
+
+    mask = 0x1;
+    while (mask < comm_size)
+    {
+        if (relative_rank & mask)
+        {
+            src = rank - mask; 
+            if (src < 0) src += comm_size;
+            recv_size = nbytes - relative_rank*scatter_size;
+            /* recv_size is larger than what might actually be sent by the
+               sender. We don't need compute the exact value because MPI
+               allows you to post a larger recv.*/ 
+            if (recv_size <= 0)
+            {
+                curr_size = 0; /* this process doesn't receive any data
+                                  because of uneven division */
+            }
+            else
+            {
+                smpi_mpi_recv(((char *)tmp_buf +
+                                          relative_rank*scatter_size),
+                                         recv_size, MPI_BYTE, src,
+                                         COLL_TAG_BCAST, comm, &status);
+                /* query actual size of data received */
+                curr_size=smpi_mpi_get_count(&status, MPI_BYTE);
+            }
+            break;
+        }
+        mask <<= 1;
+    }
 
-*****************************************************************************/
+    /* This process is responsible for all processes that have bits
+       set from the LSB upto (but not including) mask.  Because of
+       the "not including", we start by shifting mask back down
+       one. */
 
-/*****************************************************************************
+    mask >>= 1;
+    while (mask > 0)
+    {
+        if (relative_rank + mask < comm_size)
+        {
+            send_size = curr_size - scatter_size * mask; 
+            /* mask is also the size of this process's subtree */
+
+            if (send_size > 0)
+            {
+                dst = rank + mask;
+                if (dst >= comm_size) dst -= comm_size;
+                smpi_mpi_send(((char *)tmp_buf +
+                                          scatter_size*(relative_rank+mask)),
+                                         send_size, MPI_BYTE, dst,
+                                         COLL_TAG_BCAST, comm);
+                curr_size -= send_size;
+            }
+        }
+        mask >>= 1;
+    }
 
- * Function: bcast_scatter_rdb_allgather
+    return mpi_errno;
+}
 
- * Return: int
+int
+smpi_coll_tuned_bcast_scatter_rdb_allgather (
+    void *buffer, 
+    int count, 
+    MPI_Datatype datatype, 
+    int root, 
+    MPI_Comm comm)
+{
+    MPI_Status status;
+    int rank, comm_size, dst;
+    int relative_rank, mask;
+    int mpi_errno = MPI_SUCCESS;
+    int scatter_size, curr_size, recv_size = 0;
+    int j, k, i, tmp_mask, is_contig, is_homogeneous;
+    MPI_Aint type_size, nbytes = 0;
+    int relative_dst, dst_tree_root, my_tree_root, send_offset;
+    int recv_offset, tree_root, nprocs_completed, offset;
+    int position;
+    MPI_Aint true_extent, true_lb;
+    void *tmp_buf;
+
+    comm_size = smpi_comm_size(comm);
+    rank = smpi_comm_rank(comm);
+    relative_rank = (rank >= root) ? rank - root : rank - root + comm_size;
+
+    /* If there is only one process, return */
+    if (comm_size == 1) goto fn_exit;
+
+    //if (HANDLE_GET_KIND(datatype) == HANDLE_KIND_BUILTIN)
+    if(datatype->flags & DT_FLAG_CONTIGUOUS)
+        is_contig = 1;
+    else {
+        is_contig = 0;
+    }
 
- * Inputs:
-    buff: send input buffer
-    count: number of elements to send
-    data_type: data type of elements being sent
-    root: source of data
-    comm: communicator
+    is_homogeneous = 1;
 
- * Descrp: broadcasts using a scatter followed by rdb allgather.
+    /* MPI_Type_size() might not give the accurate size of the packed
+     * datatype for heterogeneous systems (because of padding, encoding,
+     * etc). On the other hand, MPI_Pack_size() can become very
+     * expensive, depending on the implementation, especially for
+     * heterogeneous systems. We want to use MPI_Type_size() wherever
+     * possible, and MPI_Pack_size() in other places.
+     */
+    if (is_homogeneous)
+        type_size=smpi_datatype_size(datatype);
 
- * Auther: MPICH / modified by Ahmad Faraj
+    nbytes = type_size * count;
+    if (nbytes == 0)
+        goto fn_exit; /* nothing to do */
 
- ****************************************************************************/
+    if (is_contig && is_homogeneous)
+    {
+        /* contiguous and homogeneous. no need to pack. */
+        smpi_datatype_extent(datatype, &true_lb, &true_extent);
 
-int
-smpi_coll_tuned_bcast_scatter_rdb_allgather(void *buff, int count, MPI_Datatype
-                                            data_type, int root, MPI_Comm comm)
-{
-  MPI_Aint extent;
-  MPI_Status status;
-
-  int i, j, k, src, dst, rank, num_procs, send_offset, recv_offset;
-  int mask, relative_rank, curr_size, recv_size = 0, send_size, nbytes;
-  int scatter_size, tree_root, relative_dst, dst_tree_root;
-  int my_tree_root, offset, tmp_mask, num_procs_completed;
-  int tag = COLL_TAG_BCAST;
-
-  rank = smpi_comm_rank(comm);
-  num_procs = smpi_comm_size(comm);
-  extent = smpi_datatype_get_extent(data_type);
-
-  nbytes = extent * count;
-  scatter_size = (nbytes + num_procs - 1) / num_procs;  // ceiling division 
-  curr_size = (rank == root) ? nbytes : 0;      // root starts with all the data
-  relative_rank = (rank >= root) ? rank - root : rank - root + num_procs;
-
-  mask = 0x1;
-  while (mask < num_procs) {
-    if (relative_rank & mask) {
-      src = rank - mask;
-      if (src < 0)
-        src += num_procs;
-      recv_size = nbytes - relative_rank * scatter_size;
-      //  recv_size is larger than what might actually be sent by the
-      //  sender. We don't need compute the exact value because MPI
-      //  allows you to post a larger recv.
-      if (recv_size <= 0)
-        curr_size = 0;          // this process doesn't receive any data
-      // because of uneven division 
-      else {
-        smpi_mpi_recv((char *)buff + relative_rank * scatter_size, recv_size,
-                 MPI_BYTE, src, tag, comm, &status);
-        curr_size = smpi_mpi_get_count(&status, MPI_BYTE);
-      }
-      break;
+        tmp_buf = (char *) buffer + true_lb;
     }
-    mask <<= 1;
-  }
-
-  // This process is responsible for all processes that have bits
-  // set from the LSB upto (but not including) mask.  Because of
-  // the "not including", we start by shifting mask back down
-  // one.
-
-  mask >>= 1;
-  while (mask > 0) {
-    if (relative_rank + mask < num_procs) {
-      send_size = curr_size - scatter_size * mask;
-      // mask is also the size of this process's subtree 
-
-      if (send_size > 0) {
-        dst = rank + mask;
-        if (dst >= num_procs)
-          dst -= num_procs;
-        smpi_mpi_send((char *)buff + scatter_size * (relative_rank + mask),
-                 send_size, MPI_BYTE, dst, tag, comm);
-
-        curr_size -= send_size;
-      }
+    else
+    {
+        tmp_buf=(void*)xbt_malloc(nbytes);
+
+        /* TODO: Pipeline the packing and communication */
+        position = 0;
+        if (rank == root) {
+            mpi_errno = smpi_mpi_pack(buffer, count, datatype, tmp_buf, nbytes,
+                                       &position, comm);
+            if (mpi_errno) xbt_die("crash while packing %d", mpi_errno);
+        }
     }
-    mask >>= 1;
-  }
 
-  // done scatter now do allgather
 
+    scatter_size = (nbytes + comm_size - 1)/comm_size; /* ceiling division */
 
-  mask = 0x1;
-  i = 0;
-  while (mask < num_procs) {
-    relative_dst = relative_rank ^ mask;
-
-    dst = (relative_dst + root) % num_procs;
-
-    /* find offset into send and recv buffers.
-       zero out the least significant "i" bits of relative_rank and
-       relative_dst to find root of src and dst
-       subtrees. Use ranks of roots as index to send from
-       and recv into  buffer */
+    mpi_errno = scatter_for_bcast(root, comm,
+                                  nbytes, tmp_buf);
+    if (mpi_errno) {
+      xbt_die("crash while scattering %d", mpi_errno);
+    }
 
-    dst_tree_root = relative_dst >> i;
-    dst_tree_root <<= i;
+    /* curr_size is the amount of data that this process now has stored in
+     * buffer at byte offset (relative_rank*scatter_size) */
+    curr_size = scatter_size < (nbytes - (relative_rank * scatter_size)) ? scatter_size :  (nbytes - (relative_rank * scatter_size)) ;
+    if (curr_size < 0)
+        curr_size = 0;
+
+    /* medium size allgather and pof2 comm_size. use recurive doubling. */
+
+    mask = 0x1;
+    i = 0;
+    while (mask < comm_size)
+    {
+        relative_dst = relative_rank ^ mask;
+
+        dst = (relative_dst + root) % comm_size; 
+
+        /* find offset into send and recv buffers.
+           zero out the least significant "i" bits of relative_rank and
+           relative_dst to find root of src and dst
+           subtrees. Use ranks of roots as index to send from
+           and recv into  buffer */ 
+
+        dst_tree_root = relative_dst >> i;
+        dst_tree_root <<= i;
+
+        my_tree_root = relative_rank >> i;
+        my_tree_root <<= i;
+
+        send_offset = my_tree_root * scatter_size;
+        recv_offset = dst_tree_root * scatter_size;
+
+        if (relative_dst < comm_size)
+        {
+            smpi_mpi_sendrecv(((char *)tmp_buf + send_offset),
+                                         curr_size, MPI_BYTE, dst, COLL_TAG_BCAST, 
+                                         ((char *)tmp_buf + recv_offset),
+                                         (nbytes-recv_offset < 0 ? 0 : nbytes-recv_offset), 
+                                         MPI_BYTE, dst, COLL_TAG_BCAST, comm, &status);
+            recv_size=smpi_mpi_get_count(&status, MPI_BYTE);
+            curr_size += recv_size;
+        }
 
-    my_tree_root = relative_rank >> i;
-    my_tree_root <<= i;
+        /* if some processes in this process's subtree in this step
+           did not have any destination process to communicate with
+           because of non-power-of-two, we need to send them the
+           data that they would normally have received from those
+           processes. That is, the haves in this subtree must send to
+           the havenots. We use a logarithmic recursive-halfing algorithm
+           for this. */
+
+        /* This part of the code will not currently be
+           executed because we are not using recursive
+           doubling for non power of two. Mark it as experimental
+           so that it doesn't show up as red in the coverage tests. */  
+
+        /* --BEGIN EXPERIMENTAL-- */
+        if (dst_tree_root + mask > comm_size)
+        {
+            nprocs_completed = comm_size - my_tree_root - mask;
+            /* nprocs_completed is the number of processes in this
+               subtree that have all the data. Send data to others
+               in a tree fashion. First find root of current tree
+               that is being divided into two. k is the number of
+               least-significant bits in this process's rank that
+               must be zeroed out to find the rank of the root */ 
+            j = mask;
+            k = 0;
+            while (j)
+            {
+                j >>= 1;
+                k++;
+            }
+            k--;
+
+            offset = scatter_size * (my_tree_root + mask);
+            tmp_mask = mask >> 1;
+
+            while (tmp_mask)
+            {
+                relative_dst = relative_rank ^ tmp_mask;
+                dst = (relative_dst + root) % comm_size; 
+
+                tree_root = relative_rank >> k;
+                tree_root <<= k;
+
+                /* send only if this proc has data and destination
+                   doesn't have data. */
+
+                /* if (rank == 3) { 
+                   printf("rank %d, dst %d, root %d, nprocs_completed %d\n", relative_rank, relative_dst, tree_root, nprocs_completed);
+                   fflush(stdout);
+                   }*/
+
+                if ((relative_dst > relative_rank) && 
+                    (relative_rank < tree_root + nprocs_completed)
+                    && (relative_dst >= tree_root + nprocs_completed))
+                {
+
+                    /* printf("Rank %d, send to %d, offset %d, size %d\n", rank, dst, offset, recv_size);
+                       fflush(stdout); */
+                    smpi_mpi_send(((char *)tmp_buf + offset),
+                                             recv_size, MPI_BYTE, dst,
+                                             COLL_TAG_BCAST, comm);
+                    /* recv_size was set in the previous
+                       receive. that's the amount of data to be
+                       sent now. */
+                }
+                /* recv only if this proc. doesn't have data and sender
+                   has data */
+                else if ((relative_dst < relative_rank) && 
+                         (relative_dst < tree_root + nprocs_completed) &&
+                         (relative_rank >= tree_root + nprocs_completed))
+                {
+                    /* printf("Rank %d waiting to recv from rank %d\n",
+                       relative_rank, dst); */
+                    smpi_mpi_recv(((char *)tmp_buf + offset),
+                                             nbytes - offset, 
+                                             MPI_BYTE, dst, COLL_TAG_BCAST,
+                                             comm, &status);
+                    /* nprocs_completed is also equal to the no. of processes
+                       whose data we don't have */
+                    recv_size=smpi_mpi_get_count(&status, MPI_BYTE);
+                    curr_size += recv_size;
+                    /* printf("Rank %d, recv from %d, offset %d, size %d\n", rank, dst, offset, recv_size);
+                       fflush(stdout);*/
+                }
+                tmp_mask >>= 1;
+                k--;
+            }
+        }
+        /* --END EXPERIMENTAL-- */
 
-    send_offset = my_tree_root * scatter_size;
-    recv_offset = dst_tree_root * scatter_size;
+        mask <<= 1;
+        i++;
+    }
 
-    if (relative_dst < num_procs) {
-      smpi_mpi_sendrecv((char *)buff + send_offset, curr_size, MPI_BYTE, dst, tag,
-                   (char *)buff + recv_offset, scatter_size * mask, MPI_BYTE, dst,
-                   tag, comm, &status);
-      recv_size = smpi_mpi_get_count(&status, MPI_BYTE);
-      curr_size += recv_size;
+    /* check that we received as much as we expected */
+    /* recvd_size may not be accurate for packed heterogeneous data */
+    if (is_homogeneous && curr_size != nbytes) {
+      xbt_die("we didn't receive enough !");
     }
 
-    /* if some processes in this process's subtree in this step
-       did not have any destination process to communicate with
-       because of non-power-of-two, we need to send them the
-       data that they would normally have received from those
-       processes. That is, the haves in this subtree must send to
-       the havenots. We use a logarithmic recursive-halfing algorithm
-       for this. */
-
-    if (dst_tree_root + mask > num_procs) {
-      num_procs_completed = num_procs - my_tree_root - mask;
-      /* num_procs_completed is the number of processes in this
-         subtree that have all the data. Send data to others
-         in a tree fashion. First find root of current tree
-         that is being divided into two. k is the number of
-         least-significant bits in this process's rank that
-         must be zeroed out to find the rank of the root */
-      j = mask;
-      k = 0;
-      while (j) {
-        j >>= 1;
-        k++;
-      }
-      k--;
-
-      offset = scatter_size * (my_tree_root + mask);
-      tmp_mask = mask >> 1;
-
-      while (tmp_mask) {
-        relative_dst = relative_rank ^ tmp_mask;
-        dst = (relative_dst + root) % num_procs;
-
-        tree_root = relative_rank >> k;
-        tree_root <<= k;
-
-        /* send only if this proc has data and destination
-           doesn't have data. */
-
-        if ((relative_dst > relative_rank)
-            && (relative_rank < tree_root + num_procs_completed)
-            && (relative_dst >= tree_root + num_procs_completed)) {
-          smpi_mpi_send((char *)buff + offset, recv_size, MPI_BYTE, dst, tag, comm);
-
-          /* recv_size was set in the previous
-             receive. that's the amount of data to be
-             sent now. */
-        }
-        /* recv only if this proc. doesn't have data and sender
-           has data */
-        else if ((relative_dst < relative_rank)
-                 && (relative_dst < tree_root + num_procs_completed)
-                 && (relative_rank >= tree_root + num_procs_completed)) {
-
-          smpi_mpi_recv((char *)buff + offset, scatter_size * num_procs_completed,
-                   MPI_BYTE, dst, tag, comm, &status);
-
-          /* num_procs_completed is also equal to the no. of processes
-             whose data we don't have */
-          recv_size = smpi_mpi_get_count(&status, MPI_BYTE);
-          curr_size += recv_size;
+    if (!is_contig || !is_homogeneous)
+    {
+        if (rank != root)
+        {
+            position = 0;
+            mpi_errno = MPI_Unpack(tmp_buf, nbytes, &position, buffer,
+                                         count, datatype, comm);
+            if (mpi_errno) xbt_die("error when unpacking %d", mpi_errno);
         }
-        tmp_mask >>= 1;
-        k--;
-      }
     }
-    mask <<= 1;
-    i++;
-  }
 
-  return MPI_SUCCESS;
+fn_exit:
+/*    xbt_free(tmp_buf);*/
+    return mpi_errno;
+fn_fail:
+    goto fn_exit;
 }