-/* Copyright (c) 2013-2014. The SimGrid Team.
- * All rights reserved. */
-
-/* This program is free software; you can redistribute it and/or modify it
- * under the terms of the license (GNU LGPL) which comes with this package. */
-
#include "colls_private.h"
+#include "src/smpi/smpi_mpi_dt_private.h"
-/*****************************************************************************
-
-Copyright (c) 2006, Ahmad Faraj & Xin Yuan,
-All rights reserved.
-
-Redistribution and use in source and binary forms, with or without
-modification, are permitted provided that the following conditions are met:
-
- * Redistributions of source code must retain the above copyright notice,
- this list of conditions and the following disclaimer.
-
- * Redistributions in binary form must reproduce the above copyright notice,
- this list of conditions and the following disclaimer in the documentation
- and/or other materials provided with the distribution.
-
- * Neither the name of the Florida State University nor the names of its
- contributors may be used to endorse or promote products derived from this
- software without specific prior written permission.
-
-THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
-ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
-WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
-DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
-ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
-(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
-LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
-ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
-SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
- *************************************************************************
- * Any results obtained from executing this software require the *
- * acknowledgment and citation of the software and its owners. *
- * The full citation is given below: *
- * *
- * A. Faraj and X. Yuan. "Automatic Generation and Tuning of MPI *
- * Collective Communication Routines." The 19th ACM International *
- * Conference on Supercomputing (ICS), Cambridge, Massachusetts, *
- * June 20-22, 2005. *
- *************************************************************************
+static int scatter_for_bcast(
+ int root,
+ MPI_Comm comm,
+ int nbytes,
+ void *tmp_buf)
+{
+ MPI_Status status;
+ int rank, comm_size, src, dst;
+ int relative_rank, mask;
+ int mpi_errno = MPI_SUCCESS;
+ int scatter_size, curr_size, recv_size = 0, send_size;
+
+ comm_size = smpi_comm_size(comm);
+ rank = smpi_comm_rank(comm);
+ relative_rank = (rank >= root) ? rank - root : rank - root + comm_size;
+
+ /* use long message algorithm: binomial tree scatter followed by an allgather */
+
+ /* The scatter algorithm divides the buffer into nprocs pieces and
+ scatters them among the processes. Root gets the first piece,
+ root+1 gets the second piece, and so forth. Uses the same binomial
+ tree algorithm as above. Ceiling division
+ is used to compute the size of each piece. This means some
+ processes may not get any data. For example if bufsize = 97 and
+       nprocs = 16, ranks 14 and 15 will get 0 data. On each process, the
+ scattered data is stored at the same offset in the buffer as it is
+ on the root process. */
+
+ scatter_size = (nbytes + comm_size - 1)/comm_size; /* ceiling division */
+ curr_size = (rank == root) ? nbytes : 0; /* root starts with all the
+ data */
+
+ mask = 0x1;
+ while (mask < comm_size)
+ {
+ if (relative_rank & mask)
+ {
+ src = rank - mask;
+ if (src < 0) src += comm_size;
+ recv_size = nbytes - relative_rank*scatter_size;
+ /* recv_size is larger than what might actually be sent by the
+               sender. We don't need to compute the exact value because MPI
+ allows you to post a larger recv.*/
+ if (recv_size <= 0)
+ {
+ curr_size = 0; /* this process doesn't receive any data
+ because of uneven division */
+ }
+ else
+ {
+ smpi_mpi_recv(((char *)tmp_buf +
+ relative_rank*scatter_size),
+ recv_size, MPI_BYTE, src,
+ COLL_TAG_BCAST, comm, &status);
+ /* query actual size of data received */
+ curr_size=smpi_mpi_get_count(&status, MPI_BYTE);
+ }
+ break;
+ }
+ mask <<= 1;
+ }
-*****************************************************************************/
+ /* This process is responsible for all processes that have bits
+       set from the LSB up to (but not including) mask. Because of
+ the "not including", we start by shifting mask back down
+ one. */
-/*****************************************************************************
+ mask >>= 1;
+ while (mask > 0)
+ {
+ if (relative_rank + mask < comm_size)
+ {
+ send_size = curr_size - scatter_size * mask;
+ /* mask is also the size of this process's subtree */
+
+ if (send_size > 0)
+ {
+ dst = rank + mask;
+ if (dst >= comm_size) dst -= comm_size;
+ smpi_mpi_send(((char *)tmp_buf +
+ scatter_size*(relative_rank+mask)),
+ send_size, MPI_BYTE, dst,
+ COLL_TAG_BCAST, comm);
+ curr_size -= send_size;
+ }
+ }
+ mask >>= 1;
+ }
- * Function: bcast_scatter_rdb_allgather
+ return mpi_errno;
+}
- * Return: int
+int
+smpi_coll_tuned_bcast_scatter_rdb_allgather (
+ void *buffer,
+ int count,
+ MPI_Datatype datatype,
+ int root,
+ MPI_Comm comm)
+{
+ MPI_Status status;
+ int rank, comm_size, dst;
+ int relative_rank, mask;
+ int mpi_errno = MPI_SUCCESS;
+ int scatter_size, curr_size, recv_size = 0;
+ int j, k, i, tmp_mask, is_contig, is_homogeneous;
+ MPI_Aint type_size, nbytes = 0;
+ int relative_dst, dst_tree_root, my_tree_root, send_offset;
+ int recv_offset, tree_root, nprocs_completed, offset;
+ int position;
+ MPI_Aint true_extent, true_lb;
+ void *tmp_buf;
+
+ comm_size = smpi_comm_size(comm);
+ rank = smpi_comm_rank(comm);
+ relative_rank = (rank >= root) ? rank - root : rank - root + comm_size;
+
+ /* If there is only one process, return */
+ if (comm_size == 1) goto fn_exit;
+
+ //if (HANDLE_GET_KIND(datatype) == HANDLE_KIND_BUILTIN)
+ if(datatype->flags & DT_FLAG_CONTIGUOUS)
+ is_contig = 1;
+ else {
+ is_contig = 0;
+ }
- * Inputs:
- buff: send input buffer
- count: number of elements to send
- data_type: data type of elements being sent
- root: source of data
- comm: communicator
+ is_homogeneous = 1;
- * Descrp: broadcasts using a scatter followed by rdb allgather.
+ /* MPI_Type_size() might not give the accurate size of the packed
+ * datatype for heterogeneous systems (because of padding, encoding,
+ * etc). On the other hand, MPI_Pack_size() can become very
+ * expensive, depending on the implementation, especially for
+ * heterogeneous systems. We want to use MPI_Type_size() wherever
+ * possible, and MPI_Pack_size() in other places.
+ */
+ if (is_homogeneous)
+ type_size=smpi_datatype_size(datatype);
- * Auther: MPICH / modified by Ahmad Faraj
+ nbytes = type_size * count;
+ if (nbytes == 0)
+ goto fn_exit; /* nothing to do */
- ****************************************************************************/
+ if (is_contig && is_homogeneous)
+ {
+ /* contiguous and homogeneous. no need to pack. */
+ smpi_datatype_extent(datatype, &true_lb, &true_extent);
-int
-smpi_coll_tuned_bcast_scatter_rdb_allgather(void *buff, int count, MPI_Datatype
- data_type, int root, MPI_Comm comm)
-{
- MPI_Aint extent;
- MPI_Status status;
-
- int i, j, k, src, dst, rank, num_procs, send_offset, recv_offset;
- int mask, relative_rank, curr_size, recv_size = 0, send_size, nbytes;
- int scatter_size, tree_root, relative_dst, dst_tree_root;
- int my_tree_root, offset, tmp_mask, num_procs_completed;
- int tag = COLL_TAG_BCAST;
-
- rank = smpi_comm_rank(comm);
- num_procs = smpi_comm_size(comm);
- extent = smpi_datatype_get_extent(data_type);
-
- nbytes = extent * count;
- scatter_size = (nbytes + num_procs - 1) / num_procs; // ceiling division
- curr_size = (rank == root) ? nbytes : 0; // root starts with all the data
- relative_rank = (rank >= root) ? rank - root : rank - root + num_procs;
-
- mask = 0x1;
- while (mask < num_procs) {
- if (relative_rank & mask) {
- src = rank - mask;
- if (src < 0)
- src += num_procs;
- recv_size = nbytes - relative_rank * scatter_size;
- // recv_size is larger than what might actually be sent by the
- // sender. We don't need compute the exact value because MPI
- // allows you to post a larger recv.
- if (recv_size <= 0)
- curr_size = 0; // this process doesn't receive any data
- // because of uneven division
- else {
- smpi_mpi_recv((char *)buff + relative_rank * scatter_size, recv_size,
- MPI_BYTE, src, tag, comm, &status);
- curr_size = smpi_mpi_get_count(&status, MPI_BYTE);
- }
- break;
+ tmp_buf = (char *) buffer + true_lb;
}
- mask <<= 1;
- }
-
- // This process is responsible for all processes that have bits
- // set from the LSB upto (but not including) mask. Because of
- // the "not including", we start by shifting mask back down
- // one.
-
- mask >>= 1;
- while (mask > 0) {
- if (relative_rank + mask < num_procs) {
- send_size = curr_size - scatter_size * mask;
- // mask is also the size of this process's subtree
-
- if (send_size > 0) {
- dst = rank + mask;
- if (dst >= num_procs)
- dst -= num_procs;
- smpi_mpi_send((char *)buff + scatter_size * (relative_rank + mask),
- send_size, MPI_BYTE, dst, tag, comm);
-
- curr_size -= send_size;
- }
+ else
+ {
+ tmp_buf=(void*)xbt_malloc(nbytes);
+
+ /* TODO: Pipeline the packing and communication */
+ position = 0;
+ if (rank == root) {
+ mpi_errno = smpi_mpi_pack(buffer, count, datatype, tmp_buf, nbytes,
+ &position, comm);
+ if (mpi_errno) xbt_die("crash while packing %d", mpi_errno);
+ }
}
- mask >>= 1;
- }
- // done scatter now do allgather
+ scatter_size = (nbytes + comm_size - 1)/comm_size; /* ceiling division */
- mask = 0x1;
- i = 0;
- while (mask < num_procs) {
- relative_dst = relative_rank ^ mask;
-
- dst = (relative_dst + root) % num_procs;
-
- /* find offset into send and recv buffers.
- zero out the least significant "i" bits of relative_rank and
- relative_dst to find root of src and dst
- subtrees. Use ranks of roots as index to send from
- and recv into buffer */
+ mpi_errno = scatter_for_bcast(root, comm,
+ nbytes, tmp_buf);
+ if (mpi_errno) {
+ xbt_die("crash while scattering %d", mpi_errno);
+ }
- dst_tree_root = relative_dst >> i;
- dst_tree_root <<= i;
+ /* curr_size is the amount of data that this process now has stored in
+ * buffer at byte offset (relative_rank*scatter_size) */
+ curr_size = scatter_size < (nbytes - (relative_rank * scatter_size)) ? scatter_size : (nbytes - (relative_rank * scatter_size)) ;
+ if (curr_size < 0)
+ curr_size = 0;
+
+    /* medium size allgather and pof2 comm_size. use recursive doubling. */
+
+ mask = 0x1;
+ i = 0;
+ while (mask < comm_size)
+ {
+ relative_dst = relative_rank ^ mask;
+
+ dst = (relative_dst + root) % comm_size;
+
+ /* find offset into send and recv buffers.
+ zero out the least significant "i" bits of relative_rank and
+ relative_dst to find root of src and dst
+ subtrees. Use ranks of roots as index to send from
+ and recv into buffer */
+
+ dst_tree_root = relative_dst >> i;
+ dst_tree_root <<= i;
+
+ my_tree_root = relative_rank >> i;
+ my_tree_root <<= i;
+
+ send_offset = my_tree_root * scatter_size;
+ recv_offset = dst_tree_root * scatter_size;
+
+ if (relative_dst < comm_size)
+ {
+ smpi_mpi_sendrecv(((char *)tmp_buf + send_offset),
+ curr_size, MPI_BYTE, dst, COLL_TAG_BCAST,
+ ((char *)tmp_buf + recv_offset),
+ (nbytes-recv_offset < 0 ? 0 : nbytes-recv_offset),
+ MPI_BYTE, dst, COLL_TAG_BCAST, comm, &status);
+ recv_size=smpi_mpi_get_count(&status, MPI_BYTE);
+ curr_size += recv_size;
+ }
- my_tree_root = relative_rank >> i;
- my_tree_root <<= i;
+ /* if some processes in this process's subtree in this step
+ did not have any destination process to communicate with
+ because of non-power-of-two, we need to send them the
+ data that they would normally have received from those
+ processes. That is, the haves in this subtree must send to
+           the havenots. We use a logarithmic recursive-halving algorithm
+ for this. */
+
+ /* This part of the code will not currently be
+ executed because we are not using recursive
+ doubling for non power of two. Mark it as experimental
+ so that it doesn't show up as red in the coverage tests. */
+
+ /* --BEGIN EXPERIMENTAL-- */
+ if (dst_tree_root + mask > comm_size)
+ {
+ nprocs_completed = comm_size - my_tree_root - mask;
+ /* nprocs_completed is the number of processes in this
+ subtree that have all the data. Send data to others
+ in a tree fashion. First find root of current tree
+ that is being divided into two. k is the number of
+ least-significant bits in this process's rank that
+ must be zeroed out to find the rank of the root */
+ j = mask;
+ k = 0;
+ while (j)
+ {
+ j >>= 1;
+ k++;
+ }
+ k--;
+
+ offset = scatter_size * (my_tree_root + mask);
+ tmp_mask = mask >> 1;
+
+ while (tmp_mask)
+ {
+ relative_dst = relative_rank ^ tmp_mask;
+ dst = (relative_dst + root) % comm_size;
+
+ tree_root = relative_rank >> k;
+ tree_root <<= k;
+
+ /* send only if this proc has data and destination
+ doesn't have data. */
+
+ /* if (rank == 3) {
+ printf("rank %d, dst %d, root %d, nprocs_completed %d\n", relative_rank, relative_dst, tree_root, nprocs_completed);
+ fflush(stdout);
+ }*/
+
+ if ((relative_dst > relative_rank) &&
+ (relative_rank < tree_root + nprocs_completed)
+ && (relative_dst >= tree_root + nprocs_completed))
+ {
+
+ /* printf("Rank %d, send to %d, offset %d, size %d\n", rank, dst, offset, recv_size);
+ fflush(stdout); */
+ smpi_mpi_send(((char *)tmp_buf + offset),
+ recv_size, MPI_BYTE, dst,
+ COLL_TAG_BCAST, comm);
+ /* recv_size was set in the previous
+ receive. that's the amount of data to be
+ sent now. */
+ }
+ /* recv only if this proc. doesn't have data and sender
+ has data */
+ else if ((relative_dst < relative_rank) &&
+ (relative_dst < tree_root + nprocs_completed) &&
+ (relative_rank >= tree_root + nprocs_completed))
+ {
+ /* printf("Rank %d waiting to recv from rank %d\n",
+ relative_rank, dst); */
+ smpi_mpi_recv(((char *)tmp_buf + offset),
+ nbytes - offset,
+ MPI_BYTE, dst, COLL_TAG_BCAST,
+ comm, &status);
+ /* nprocs_completed is also equal to the no. of processes
+ whose data we don't have */
+ recv_size=smpi_mpi_get_count(&status, MPI_BYTE);
+ curr_size += recv_size;
+ /* printf("Rank %d, recv from %d, offset %d, size %d\n", rank, dst, offset, recv_size);
+ fflush(stdout);*/
+ }
+ tmp_mask >>= 1;
+ k--;
+ }
+ }
+ /* --END EXPERIMENTAL-- */
- send_offset = my_tree_root * scatter_size;
- recv_offset = dst_tree_root * scatter_size;
+ mask <<= 1;
+ i++;
+ }
- if (relative_dst < num_procs) {
- smpi_mpi_sendrecv((char *)buff + send_offset, curr_size, MPI_BYTE, dst, tag,
- (char *)buff + recv_offset, scatter_size * mask, MPI_BYTE, dst,
- tag, comm, &status);
- recv_size = smpi_mpi_get_count(&status, MPI_BYTE);
- curr_size += recv_size;
+ /* check that we received as much as we expected */
+    /* recv_size may not be accurate for packed heterogeneous data */
+ if (is_homogeneous && curr_size != nbytes) {
+ xbt_die("we didn't receive enough !");
}
- /* if some processes in this process's subtree in this step
- did not have any destination process to communicate with
- because of non-power-of-two, we need to send them the
- data that they would normally have received from those
- processes. That is, the haves in this subtree must send to
- the havenots. We use a logarithmic recursive-halfing algorithm
- for this. */
-
- if (dst_tree_root + mask > num_procs) {
- num_procs_completed = num_procs - my_tree_root - mask;
- /* num_procs_completed is the number of processes in this
- subtree that have all the data. Send data to others
- in a tree fashion. First find root of current tree
- that is being divided into two. k is the number of
- least-significant bits in this process's rank that
- must be zeroed out to find the rank of the root */
- j = mask;
- k = 0;
- while (j) {
- j >>= 1;
- k++;
- }
- k--;
-
- offset = scatter_size * (my_tree_root + mask);
- tmp_mask = mask >> 1;
-
- while (tmp_mask) {
- relative_dst = relative_rank ^ tmp_mask;
- dst = (relative_dst + root) % num_procs;
-
- tree_root = relative_rank >> k;
- tree_root <<= k;
-
- /* send only if this proc has data and destination
- doesn't have data. */
-
- if ((relative_dst > relative_rank)
- && (relative_rank < tree_root + num_procs_completed)
- && (relative_dst >= tree_root + num_procs_completed)) {
- smpi_mpi_send((char *)buff + offset, recv_size, MPI_BYTE, dst, tag, comm);
-
- /* recv_size was set in the previous
- receive. that's the amount of data to be
- sent now. */
- }
- /* recv only if this proc. doesn't have data and sender
- has data */
- else if ((relative_dst < relative_rank)
- && (relative_dst < tree_root + num_procs_completed)
- && (relative_rank >= tree_root + num_procs_completed)) {
-
- smpi_mpi_recv((char *)buff + offset, scatter_size * num_procs_completed,
- MPI_BYTE, dst, tag, comm, &status);
-
- /* num_procs_completed is also equal to the no. of processes
- whose data we don't have */
- recv_size = smpi_mpi_get_count(&status, MPI_BYTE);
- curr_size += recv_size;
+ if (!is_contig || !is_homogeneous)
+ {
+ if (rank != root)
+ {
+ position = 0;
+ mpi_errno = MPI_Unpack(tmp_buf, nbytes, &position, buffer,
+ count, datatype, comm);
+ if (mpi_errno) xbt_die("error when unpacking %d", mpi_errno);
}
- tmp_mask >>= 1;
- k--;
- }
}
- mask <<= 1;
- i++;
- }
- return MPI_SUCCESS;
+fn_exit:
+/* xbt_free(tmp_buf);*/
+ return mpi_errno;
+fn_fail:
+ goto fn_exit;
}