Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
leaks --
[simgrid.git] / src / smpi / colls / allreduce-smp-rdb.c
1 /* Copyright (c) 2013-2014. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "colls_private.h"
8 /* IMPLEMENTED BY PITCH PATARASUK 
9    Non-topoloty-specific (however, number of cores/node need to be changed) 
10    all-reduce operation designed for smp clusters
11    It uses 2-layer communication: binomial for intra-communication 
12    and rdb for inter-communication*/
13
14
15 /* ** NOTE **
16    Use -DMPICH2 if this code does not compile.
17    MPICH1 code also work on MPICH2 on our cluster and the performance are similar.
18    This code assume commutative and associative reduce operator (MPI_SUM, MPI_MAX, etc).
19 */
20
21 //#include <star-reduction.c>
22
23 /*
24 This fucntion performs all-reduce operation as follow.
25 1) binomial_tree reduce inside each SMP node
26 2) Recursive doubling intra-communication between root of each SMP node
27 3) binomial_tree bcast inside each SMP node
28 */
29 int smpi_coll_tuned_allreduce_smp_rdb(void *send_buf, void *recv_buf, int count,
30                                       MPI_Datatype dtype, MPI_Op op,
31                                       MPI_Comm comm)
32 {
33   int comm_size, rank;
34   void *tmp_buf;
35   int tag = COLL_TAG_ALLREDUCE;
36   int mask, src, dst;
37   MPI_Status status;
38   if(smpi_comm_get_leaders_comm(comm)==MPI_COMM_NULL){
39     smpi_comm_init_smp(comm);
40   }
41   int num_core=1;
42   if (smpi_comm_is_uniform(comm)){
43     num_core = smpi_comm_size(smpi_comm_get_intra_comm(comm));
44   }
45   /*
46      #ifdef MPICH2_REDUCTION
47      MPI_User_function * uop = MPIR_Op_table[op % 16 - 1];
48      #else
49      MPI_User_function *uop;
50      struct MPIR_OP *op_ptr;
51      op_ptr = MPIR_ToPointer(op);
52      uop  = op_ptr->op;
53      #endif
54    */
55   comm_size = smpi_comm_size(comm);
56   rank = smpi_comm_rank(comm);
57   MPI_Aint extent;
58   extent = smpi_datatype_get_extent(dtype);
59   tmp_buf = (void *) smpi_get_tmp_sendbuffer(count * extent);
60
61   /* compute intra and inter ranking */
62   int intra_rank, inter_rank;
63   intra_rank = rank % num_core;
64   inter_rank = rank / num_core;
65
66   /* size of processes participate in intra communications =>
67      should be equal to number of machines */
68   int inter_comm_size = (comm_size + num_core - 1) / num_core;
69
70   /* copy input buffer to output buffer */
71   smpi_mpi_sendrecv(send_buf, count, dtype, rank, tag,
72                recv_buf, count, dtype, rank, tag, comm, &status);
73
74   /* start binomial reduce intra communication inside each SMP node */
75   mask = 1;
76   while (mask < num_core) {
77     if ((mask & intra_rank) == 0) {
78       src = (inter_rank * num_core) + (intra_rank | mask);
79       if (src < comm_size) {
80         smpi_mpi_recv(tmp_buf, count, dtype, src, tag, comm, &status);
81         smpi_op_apply(op, tmp_buf, recv_buf, &count, &dtype);
82       }
83     } else {
84       dst = (inter_rank * num_core) + (intra_rank & (~mask));
85       smpi_mpi_send(recv_buf, count, dtype, dst, tag, comm);
86       break;
87     }
88     mask <<= 1;
89   }                             /* end binomial reduce intra-communication */
90
91
92   /* start rdb (recursive doubling) all-reduce inter-communication 
93      between each SMP nodes : each node only have one process that can communicate
94      to other nodes */
95   if (intra_rank == 0) {
96
97     /* find nearest power-of-two less than or equal to inter_comm_size */
98     int pof2, rem, newrank, newdst;
99     pof2 = 1;
100     while (pof2 <= inter_comm_size)
101       pof2 <<= 1;
102     pof2 >>= 1;
103     rem = inter_comm_size - pof2;
104
105     /* In the non-power-of-two case, all even-numbered
106        processes of rank < 2*rem send their data to
107        (rank+1). These even-numbered processes no longer
108        participate in the algorithm until the very end.
109      */
110     if (inter_rank < 2 * rem) {
111       if (inter_rank % 2 == 0) {
112         dst = rank + num_core;
113         smpi_mpi_send(recv_buf, count, dtype, dst, tag, comm);
114         newrank = -1;
115       } else {
116         src = rank - num_core;
117         smpi_mpi_recv(tmp_buf, count, dtype, src, tag, comm, &status);
118         smpi_op_apply(op, tmp_buf, recv_buf, &count, &dtype);
119         newrank = inter_rank / 2;
120       }
121     } else {
122       newrank = inter_rank - rem;
123     }
124
125     /* example inter-communication RDB rank change algorithm 
126        0,4,8,12..36 <= true rank (assume 4 core per SMP)
127        0123 4567 89 <= inter_rank
128        1 3 4567 89 (1,3 got data from 0,2 : 0,2 will be idle until the end)
129        0 1 4567 89 
130        0 1 2345 67 => newrank
131      */
132
133     if (newrank != -1) {
134       mask = 1;
135       while (mask < pof2) {
136         newdst = newrank ^ mask;
137         /* find real rank of dest */
138         dst = (newdst < rem) ? newdst * 2 + 1 : newdst + rem;
139         dst *= num_core;
140
141         /* exchange data in rdb manner */
142         smpi_mpi_sendrecv(recv_buf, count, dtype, dst, tag, tmp_buf, count, dtype,
143                      dst, tag, comm, &status);
144         smpi_op_apply(op, tmp_buf, recv_buf, &count, &dtype);
145         mask <<= 1;
146       }
147     }
148
149     /* non pof2 case 
150        left-over processes (all even ranks: < 2 * rem) get the result    
151      */
152     if (inter_rank < 2 * rem) {
153       if (inter_rank % 2) {
154         smpi_mpi_send(recv_buf, count, dtype, rank - num_core, tag, comm);
155       } else {
156         smpi_mpi_recv(recv_buf, count, dtype, rank + num_core, tag, comm, &status);
157       }
158     }
159   }
160
161   /* start binomial broadcast intra-communication inside each SMP nodes */
162   int num_core_in_current_smp = num_core;
163   if (inter_rank == (inter_comm_size - 1)) {
164     num_core_in_current_smp = comm_size - (inter_rank * num_core);
165   }
166   mask = 1;
167   while (mask < num_core_in_current_smp) {
168     if (intra_rank & mask) {
169       src = (inter_rank * num_core) + (intra_rank - mask);
170       smpi_mpi_recv(recv_buf, count, dtype, src, tag, comm, &status);
171       break;
172     }
173     mask <<= 1;
174   }
175   mask >>= 1;
176
177   while (mask > 0) {
178     dst = (inter_rank * num_core) + (intra_rank + mask);
179     if (dst < comm_size) {
180       smpi_mpi_send(recv_buf, count, dtype, dst, tag, comm);
181     }
182     mask >>= 1;
183   }
184
185   smpi_free_tmp_buffer(tmp_buf);
186   return MPI_SUCCESS;
187 }