1 /* Copyright (c) 2013-2014. The SimGrid Team.
2 * All rights reserved. */
4 /* This program is free software; you can redistribute it and/or modify it
5 * under the terms of the license (GNU LGPL) which comes with this package. */
7 #include "colls_private.h"
8 /* IMPLEMENTED BY PITCH PATARASUK
9 Non-topology-specific (however, the number of cores per node needs to be changed)
10 all-reduce operation designed for smp clusters
11 It uses 2-layer communication: binomial for both intra- and inter-node communication.
13 The communications are done in a pipeline fashion */
15 /* change number of core per smp-node
16 we assume that the number of cores per process will be the same for all implementations */
21 /* this is a default segment size for pipelining,
22 but it is typically passed as a command line argument */
23 int allreduce_smp_binomial_pipeline_segment_size = 4096; /* bytes per pipeline segment; default only — normally overridden from the command line */
26 This code is modified from allreduce-smp-binomial.c by wrapping the code with a pipeline effect as follows:
27 for (loop over pipelength) {
28 smp-binomial main code;
33 Use -DMPICH2 if this code does not compile.
34 MPICH1 code also works on MPICH2 on our cluster and the performance is similar.
35 This code assumes a commutative and associative reduce operator (MPI_SUM, MPI_MAX, etc).
39 This function performs the all-reduce operation as follows, ** in a pipeline fashion **
40 1) binomial_tree reduce inside each SMP node
41 2) binomial_tree reduce intra-communication between root of each SMP node
42 3) binomial_tree bcast intra-communication between root of each SMP node
43 4) binomial_tree bcast inside each SMP node
45 int smpi_coll_tuned_allreduce_smp_binomial_pipeline(void *send_buf,
46 void *recv_buf, int count,
48 MPI_Op op, MPI_Comm comm)
52 int tag = COLL_TAG_ALLREDUCE;
55 int num_core = simcall_host_get_core(SIMIX_host_self());
56 // do we use the default one or the number of cores in the platform ?
57 // if the number of cores is one, the platform may be simulated with 1 node = 1 core
58 if (num_core == 1) num_core = NUM_CORE;
60 comm_size = smpi_comm_size(comm);
61 rank = smpi_comm_rank(comm);
63 extent = smpi_datatype_get_extent(dtype);
64 tmp_buf = (void *) xbt_malloc(count * extent);
66 int intra_rank, inter_rank;
67 intra_rank = rank % num_core;
68 inter_rank = rank / num_core;
73 int pcount = allreduce_smp_binomial_pipeline_segment_size;
78 /* size of processes participate in intra communications =>
79 should be equal to number of machines */
80 int inter_comm_size = (comm_size + num_core - 1) / num_core;
82 /* copy input buffer to output buffer */
83 smpi_mpi_sendrecv(send_buf, count, dtype, rank, tag,
84 recv_buf, count, dtype, rank, tag, comm, &status);
86 /* compute pipe length */
88 pipelength = count / pcount;
90 /* pipelining over pipelength (+3 is because we have 4 stages:
91 reduce-intra, reduce-inter, bcast-inter, bcast-intra */
92 for (phase = 0; phase < pipelength + 3; phase++) {
94 /* start binomial reduce intra communication inside each SMP node */
95 if (phase < pipelength) {
97 while (mask < num_core) {
98 if ((mask & intra_rank) == 0) {
99 src = (inter_rank * num_core) + (intra_rank | mask);
100 if (src < comm_size) {
101 recv_offset = phase * pcount * extent;
102 smpi_mpi_recv(tmp_buf, pcount, dtype, src, tag, comm, &status);
103 smpi_op_apply(op, tmp_buf, (char *)recv_buf + recv_offset, &pcount, &dtype);
106 send_offset = phase * pcount * extent;
107 dst = (inter_rank * num_core) + (intra_rank & (~mask));
108 smpi_mpi_send((char *)recv_buf + send_offset, pcount, dtype, dst, tag, comm);
115 /* start binomial reduce inter-communication between each SMP nodes:
116 each node only have one process that can communicate to other nodes */
117 if ((phase > 0) && (phase < (pipelength + 1))) {
118 if (intra_rank == 0) {
121 while (mask < inter_comm_size) {
122 if ((mask & inter_rank) == 0) {
123 src = (inter_rank | mask) * num_core;
124 if (src < comm_size) {
125 recv_offset = (phase - 1) * pcount * extent;
126 smpi_mpi_recv(tmp_buf, pcount, dtype, src, tag, comm, &status);
127 smpi_op_apply(op, tmp_buf, (char *)recv_buf + recv_offset, &pcount, &dtype);
130 dst = (inter_rank & (~mask)) * num_core;
131 send_offset = (phase - 1) * pcount * extent;
132 smpi_mpi_send((char *)recv_buf + send_offset, pcount, dtype, dst, tag, comm);
140 /* start binomial broadcast inter-communication between each SMP nodes:
141 each node only have one process that can communicate to other nodes */
142 if ((phase > 1) && (phase < (pipelength + 2))) {
143 if (intra_rank == 0) {
145 while (mask < inter_comm_size) {
146 if (inter_rank & mask) {
147 src = (inter_rank - mask) * num_core;
148 recv_offset = (phase - 2) * pcount * extent;
149 smpi_mpi_recv((char *)recv_buf + recv_offset, pcount, dtype, src, tag, comm,
158 if (inter_rank < inter_comm_size) {
159 dst = (inter_rank + mask) * num_core;
160 if (dst < comm_size) {
161 //printf("Node %d send to node %d when mask is %d\n", rank, dst, mask);
162 send_offset = (phase - 2) * pcount * extent;
163 smpi_mpi_send((char *)recv_buf + send_offset, pcount, dtype, dst, tag, comm);
171 /* start binomial broadcast intra-communication inside each SMP nodes */
173 int num_core_in_current_smp = num_core;
174 if (inter_rank == (inter_comm_size - 1)) {
175 num_core_in_current_smp = comm_size - (inter_rank * num_core);
178 while (mask < num_core_in_current_smp) {
179 if (intra_rank & mask) {
180 src = (inter_rank * num_core) + (intra_rank - mask);
181 recv_offset = (phase - 3) * pcount * extent;
182 smpi_mpi_recv((char *)recv_buf + recv_offset, pcount, dtype, src, tag, comm,
191 dst = (inter_rank * num_core) + (intra_rank + mask);
192 if (dst < comm_size) {
193 send_offset = (phase - 3) * pcount * extent;
194 smpi_mpi_send((char *)recv_buf + send_offset, pcount, dtype, dst, tag, comm);