1 #include "colls_private.h"
2 /* IMPLEMENTED BY PITCH PATARASUK
3 Non-topology-specific (however, the number of cores per node needs to be changed)
4 all-reduce operation designed for smp clusters
5 It uses 2-layer communication: binomial for both intra-communication
7 The communications are done in a pipeline fashion */
9 /* change number of core per smp-node
10 we assume that the number of cores per process will be the same for all implementations */
15 /* this is a default segment size for pipelining,
16 but it is typically passed as a command line argument */
17 int allreduce_smp_binomial_pipeline_segment_size = 4096;
20 This code is modified from allreduce-smp-binomial.c by wrapping the code with a pipeline effect as follows:
21 for (loop over pipelength) {
22 smp-binomial main code;
27 Use -DMPICH2 if this code does not compile.
28 MPICH1 code also works on MPICH2 on our cluster and the performance is similar.
29 This code assumes a commutative and associative reduce operator (MPI_SUM, MPI_MAX, etc.).
33 This function performs the all-reduce operation as follows, ** in a pipeline fashion **:
34 1) binomial_tree reduce inside each SMP node
35 2) binomial_tree reduce inter-communication between the roots of the SMP nodes
36 3) binomial_tree bcast inter-communication between the roots of the SMP nodes
37 4) binomial_tree bcast inside each SMP node
39 int smpi_coll_tuned_allreduce_smp_binomial_pipeline(void *send_buf,
40 void *recv_buf, int count,
42 MPI_Op op, MPI_Comm comm)
46 int tag = COLL_TAG_ALLREDUCE;
49 int num_core = simcall_host_get_core(SIMIX_host_self());
50 // do we use the default one or the number of cores in the platform ?
51 // if the number of cores is one, the platform may be simulated with 1 node = 1 core
52 if (num_core == 1) num_core = NUM_CORE;
54 comm_size = smpi_comm_size(comm);
55 rank = smpi_comm_rank(comm);
57 extent = smpi_datatype_get_extent(dtype);
58 tmp_buf = (void *) xbt_malloc(count * extent);
60 int intra_rank, inter_rank;
61 intra_rank = rank % num_core;
62 inter_rank = rank / num_core;
67 int pcount = allreduce_smp_binomial_pipeline_segment_size;
72 /* size of processes participate in intra communications =>
73 should be equal to number of machines */
74 int inter_comm_size = (comm_size + num_core - 1) / num_core;
76 /* copy input buffer to output buffer */
77 smpi_mpi_sendrecv(send_buf, count, dtype, rank, tag,
78 recv_buf, count, dtype, rank, tag, comm, &status);
80 /* compute pipe length */
82 pipelength = count / pcount;
84 /* pipelining over pipelength (+3 is because we have 4 stages:
85 reduce-intra, reduce-inter, bcast-inter, bcast-intra */
86 for (phase = 0; phase < pipelength + 3; phase++) {
88 /* start binomial reduce intra communication inside each SMP node */
89 if (phase < pipelength) {
91 while (mask < num_core) {
92 if ((mask & intra_rank) == 0) {
93 src = (inter_rank * num_core) + (intra_rank | mask);
94 if (src < comm_size) {
95 recv_offset = phase * pcount * extent;
96 smpi_mpi_recv(tmp_buf, pcount, dtype, src, tag, comm, &status);
97 smpi_op_apply(op, tmp_buf, (char *)recv_buf + recv_offset, &pcount, &dtype);
100 send_offset = phase * pcount * extent;
101 dst = (inter_rank * num_core) + (intra_rank & (~mask));
102 smpi_mpi_send((char *)recv_buf + send_offset, pcount, dtype, dst, tag, comm);
109 /* start binomial reduce inter-communication between each SMP nodes:
110 each node only have one process that can communicate to other nodes */
111 if ((phase > 0) && (phase < (pipelength + 1))) {
112 if (intra_rank == 0) {
115 while (mask < inter_comm_size) {
116 if ((mask & inter_rank) == 0) {
117 src = (inter_rank | mask) * num_core;
118 if (src < comm_size) {
119 recv_offset = (phase - 1) * pcount * extent;
120 smpi_mpi_recv(tmp_buf, pcount, dtype, src, tag, comm, &status);
121 smpi_op_apply(op, tmp_buf, (char *)recv_buf + recv_offset, &pcount, &dtype);
124 dst = (inter_rank & (~mask)) * num_core;
125 send_offset = (phase - 1) * pcount * extent;
126 smpi_mpi_send((char *)recv_buf + send_offset, pcount, dtype, dst, tag, comm);
134 /* start binomial broadcast inter-communication between each SMP nodes:
135 each node only have one process that can communicate to other nodes */
136 if ((phase > 1) && (phase < (pipelength + 2))) {
137 if (intra_rank == 0) {
139 while (mask < inter_comm_size) {
140 if (inter_rank & mask) {
141 src = (inter_rank - mask) * num_core;
142 recv_offset = (phase - 2) * pcount * extent;
143 smpi_mpi_recv((char *)recv_buf + recv_offset, pcount, dtype, src, tag, comm,
152 if (inter_rank < inter_comm_size) {
153 dst = (inter_rank + mask) * num_core;
154 if (dst < comm_size) {
155 //printf("Node %d send to node %d when mask is %d\n", rank, dst, mask);
156 send_offset = (phase - 2) * pcount * extent;
157 smpi_mpi_send((char *)recv_buf + send_offset, pcount, dtype, dst, tag, comm);
165 /* start binomial broadcast intra-communication inside each SMP nodes */
167 int num_core_in_current_smp = num_core;
168 if (inter_rank == (inter_comm_size - 1)) {
169 num_core_in_current_smp = comm_size - (inter_rank * num_core);
172 while (mask < num_core_in_current_smp) {
173 if (intra_rank & mask) {
174 src = (inter_rank * num_core) + (intra_rank - mask);
175 recv_offset = (phase - 3) * pcount * extent;
176 smpi_mpi_recv((char *)recv_buf + recv_offset, pcount, dtype, src, tag, comm,
185 dst = (inter_rank * num_core) + (intra_rank + mask);
186 if (dst < comm_size) {
187 send_offset = (phase - 3) * pcount * extent;
188 smpi_mpi_send((char *)recv_buf + send_offset, pcount, dtype, dst, tag, comm);