Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
use tuned barrier here if provided
[simgrid.git] / src / smpi / colls / allreduce-ompi-ring-segmented.c
1 /*
2  * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
3  *                         University Research and Technology
4  *                         Corporation.  All rights reserved.
5  * Copyright (c) 2004-2009 The University of Tennessee and The University
6  *                         of Tennessee Research Foundation.  All rights
7  *                         reserved.
8  * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
9  *                         University of Stuttgart.  All rights reserved.
10  * Copyright (c) 2004-2005 The Regents of the University of California.
11  *                         All rights reserved.
12  * Copyright (c) 2009      University of Houston. All rights reserved.
13  * $COPYRIGHT$
14  *
15  * Additional copyrights may follow
16  *
17  * $HEADER$
18  *  Redistribution and use in source and binary forms, with or without
19  * modification, are permitted provided that the following conditions are
20  * met:
21
22  * - Redistributions of source code must retain the above copyright
23  *   notice, this list of conditions and the following disclaimer.
24
25  * - Redistributions in binary form must reproduce the above copyright
26  *   notice, this list of conditions and the following disclaimer listed
27  *   in this license in the documentation and/or other materials
28  *   provided with the distribution.
29
30  * - Neither the name of the copyright holders nor the names of its
31  *   contributors may be used to endorse or promote products derived from
32  *   this software without specific prior written permission.
33
34  * The copyright holders provide no reassurances that the source code
35  * provided does not infringe any patent, copyright, or any other
36  * intellectual property rights of third parties.  The copyright holders
37  * disclaim any liability to any recipient for claims brought against
38  * recipient by any third party for infringement of that parties
39  * intellectual property rights.
40
41  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
42  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
43  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
44  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
45  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
46  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
47  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
48  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
49  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
50  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
51  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
52
53  */
54
55
56
57 /*
58  *  ompi_coll_tuned_allreduce_intra_ring_segmented
59  *
60  *   Function:       Pipelined ring algorithm for allreduce operation
61  *   Accepts:        Same as MPI_Allreduce(), segment size
62  *   Returns:        MPI_SUCCESS or error code
63  *
64  *   Description:    Implements pipelined ring algorithm for allreduce: 
65  *                   user supplies suggested segment size for the pipelining of
66  *                   reduce operation.
67  *                   The segment size determines the number of phases, np, for 
68  *                   the algorithm execution.  
69  *                   The message is automatically divided into blocks of 
70  *                   approximately  (count / (np * segcount)) elements.
71  *                   At the end of reduction phase, allgather like step is 
72  *                   executed.
73  *                   Algorithm requires (np + 1)*(N - 1) steps.
74  *
75  *   Limitations:    The algorithm DOES NOT preserve order of operations so it 
76  *                   can be used only for commutative operations.
77  *                   In addition, algorithm cannot work if the total size is 
78  *                   less than size * segment size.
79  *         Example on 3 nodes with 2 phases
80  *         Initial state
81  *   #      0              1             2 
82  *        [00a]          [10a]         [20a]
83  *        [00b]          [10b]         [20b]
84  *        [01a]          [11a]         [21a]
85  *        [01b]          [11b]         [21b]
86  *        [02a]          [12a]         [22a]
87  *        [02b]          [12b]         [22b]
88  *
89  *        COMPUTATION PHASE 0 (a)
90  *         Step 0: rank r sends block ra to rank (r+1) and receives bloc (r-1)a 
91  *                 from rank (r-1) [with wraparound].
92  *    #     0              1             2  
93  *        [00a]        [00a+10a]       [20a]
94  *        [00b]          [10b]         [20b]
95  *        [01a]          [11a]       [11a+21a]
96  *        [01b]          [11b]         [21b]
97  *      [22a+02a]        [12a]         [22a]
98  *        [02b]          [12b]         [22b]
99  *
100  *         Step 1: rank r sends block (r-1)a to rank (r+1) and receives bloc 
101  *                 (r-2)a from rank (r-1) [with wraparound].
102  *    #     0              1             2  
103  *        [00a]        [00a+10a]   [00a+10a+20a]
104  *        [00b]          [10b]         [20b]
105  *    [11a+21a+01a]      [11a]       [11a+21a]
106  *        [01b]          [11b]         [21b]
107  *      [22a+02a]    [22a+02a+12a]     [22a]
108  *        [02b]          [12b]         [22b] 
109  *
110  *        COMPUTATION PHASE 1 (b)
111  *         Step 0: rank r sends block rb to rank (r+1) and receives bloc (r-1)b 
112  *                 from rank (r-1) [with wraparound].
113  *    #     0              1             2  
114  *        [00a]        [00a+10a]       [20a]
115  *        [00b]        [00b+10b]       [20b]
116  *        [01a]          [11a]       [11a+21a]
117  *        [01b]          [11b]       [11b+21b]
118  *      [22a+02a]        [12a]         [22a]
119  *      [22b+02b]        [12b]         [22b]
120  *
121  *         Step 1: rank r sends block (r-1)b to rank (r+1) and receives bloc 
122  *                 (r-2)b from rank (r-1) [with wraparound].
123  *    #     0              1             2  
124  *        [00a]        [00a+10a]   [00a+10a+20a]
125  *        [00b]          [10b]     [0bb+10b+20b]
126  *    [11a+21a+01a]      [11a]       [11a+21a]
127  *    [11b+21b+01b]      [11b]         [21b]
128  *      [22a+02a]    [22a+02a+12a]     [22a]
129  *        [02b]      [22b+01b+12b]     [22b] 
130  *
131  *         
132  *        DISTRIBUTION PHASE: ring ALLGATHER with ranks shifted by 1 (same as
133  *         in regular ring algorithm.
134  *
135  */
136  
137  #define COLL_TUNED_COMPUTED_SEGCOUNT(SEGSIZE, TYPELNG, SEGCOUNT)        \
138     if( ((SEGSIZE) >= (TYPELNG)) &&                                     \
139         ((SEGSIZE) < ((TYPELNG) * (SEGCOUNT))) ) {                      \
140         size_t residual;                                                \
141         (SEGCOUNT) = (int)((SEGSIZE) / (TYPELNG));                      \
142         residual = (SEGSIZE) - (SEGCOUNT) * (TYPELNG);                  \
143         if( residual > ((TYPELNG) >> 1) )                               \
144             (SEGCOUNT)++;                                               \
145     }                                                                   \
146     
147 #define COLL_TUNED_COMPUTE_BLOCKCOUNT( COUNT, NUM_BLOCKS, SPLIT_INDEX,       \
148                                        EARLY_BLOCK_COUNT, LATE_BLOCK_COUNT ) \
149     EARLY_BLOCK_COUNT = LATE_BLOCK_COUNT = COUNT / NUM_BLOCKS;               \
150     SPLIT_INDEX = COUNT % NUM_BLOCKS;                                        \
151     if (0 != SPLIT_INDEX) {                                                  \
152         EARLY_BLOCK_COUNT = EARLY_BLOCK_COUNT + 1;                           \
153     }                                                                        \
154  
155  #include "colls_private.h"
156 int 
157 smpi_coll_tuned_allreduce_ompi_ring_segmented(void *sbuf, void *rbuf, int count,
158                                                MPI_Datatype dtype,
159                                                MPI_Op op,
160                                                MPI_Comm comm) 
161 {
162    int ret = MPI_SUCCESS;
163    int line;
164    int rank, size, k, recv_from, send_to;
165    int early_blockcount, late_blockcount, split_rank; 
166    int segcount, max_segcount;
167    int num_phases, phase;
168    int block_count, inbi;
169    size_t typelng;
170    char *tmpsend = NULL, *tmprecv = NULL;
171    char *inbuf[2] = {NULL, NULL};
172    ptrdiff_t true_extent, extent;
173    ptrdiff_t block_offset, max_real_segsize;
174    MPI_Request reqs[2] = {NULL, NULL};
175    const size_t segsize = 1 << 20; /* 1 MB */
176    size = smpi_comm_size(comm);
177    rank = smpi_comm_rank(comm);
178
179    XBT_DEBUG("coll:tuned:allreduce_intra_ring_segmented rank %d, count %d", rank, count);
180
181    /* Special case for size == 1 */
182    if (1 == size) {
183       if (MPI_IN_PLACE != sbuf) {
184       ret= smpi_datatype_copy(sbuf, count, dtype,rbuf, count, dtype);
185          if (ret < 0) { line = __LINE__; goto error_hndl; }
186       }
187       return MPI_SUCCESS;
188    }
189    
190    /* Determine segment count based on the suggested segment size */
191    extent = smpi_datatype_get_extent(dtype);
192    if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }
193    true_extent = smpi_datatype_get_extent(dtype);
194    if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }
195    typelng = smpi_datatype_size(dtype);
196    if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }
197    segcount = count;
198    COLL_TUNED_COMPUTED_SEGCOUNT(segsize, typelng, segcount)
199
200    /* Special case for count less than size * segcount - use regular ring */
201    if (count < size * segcount) {
202       XBT_DEBUG( "coll:tuned:allreduce_ring_segmented rank %d/%d, count %d, switching to regular ring", rank, size, count);
203       return (smpi_coll_tuned_allreduce_lr(sbuf, rbuf, count, dtype, op, 
204                                                    comm));
205    }
206
207    /* Determine the number of phases of the algorithm */
208    num_phases = count / (size * segcount);
209    if ((count % (size * segcount) >= size) && 
210        (count % (size * segcount) > ((size * segcount) / 2))) {
211       num_phases++;
212    }
213
214    /* Determine the number of elements per block and corresponding 
215       block sizes.
216       The blocks are divided into "early" and "late" ones:
217       blocks 0 .. (split_rank - 1) are "early" and 
218       blocks (split_rank) .. (size - 1) are "late".
219       Early blocks are at most 1 element larger than the late ones.
220       Note, these blocks will be split into num_phases segments,
221       out of the largest one will have max_segcount elements.
222     */
223    COLL_TUNED_COMPUTE_BLOCKCOUNT( count, size, split_rank, 
224                                   early_blockcount, late_blockcount )
225    COLL_TUNED_COMPUTE_BLOCKCOUNT( early_blockcount, num_phases, inbi,
226                                   max_segcount, k)
227    max_real_segsize = true_extent + (max_segcount - 1) * extent;
228
229    /* Allocate and initialize temporary buffers */
230    inbuf[0] = (char*)malloc(max_real_segsize);
231    if (NULL == inbuf[0]) { ret = -1; line = __LINE__; goto error_hndl; }
232    if (size > 2) {
233       inbuf[1] = (char*)malloc(max_real_segsize);
234       if (NULL == inbuf[1]) { ret = -1; line = __LINE__; goto error_hndl; }
235    }
236
237    /* Handle MPI_IN_PLACE */
238    if (MPI_IN_PLACE != sbuf) {
239       ret= smpi_datatype_copy(sbuf, count, dtype,rbuf, count, dtype);
240       if (ret < 0) { line = __LINE__; goto error_hndl; }
241    }
242
243    /* Computation loop: for each phase, repeat ring allreduce computation loop */
244    for (phase = 0; phase < num_phases; phase ++) {
245       ptrdiff_t phase_offset;
246       int early_phase_segcount, late_phase_segcount, split_phase, phase_count;
247
248       /* 
249          For each of the remote nodes:
250          - post irecv for block (r-1)
251          - send block (r)
252            To do this, first compute block offset and count, and use block offset
253            to compute phase offset.
254          - in loop for every step k = 2 .. n
255            - post irecv for block (r + n - k) % n
256            - wait on block (r + n - k + 1) % n to arrive
257            - compute on block (r + n - k + 1) % n
258            - send block (r + n - k + 1) % n
259          - wait on block (r + 1)
260          - compute on block (r + 1)
261          - send block (r + 1) to rank (r + 1)
262          Note that we must be careful when computing the begining of buffers and
263          for send operations and computation we must compute the exact block size.
264       */
265       send_to = (rank + 1) % size;
266       recv_from = (rank + size - 1) % size;
267       
268       inbi = 0;
269       /* Initialize first receive from the neighbor on the left */
270       reqs[inbi] = smpi_mpi_irecv(inbuf[inbi], max_segcount, dtype, recv_from,
271                                666, comm);
272       /* Send first block (my block) to the neighbor on the right:
273          - compute my block and phase offset
274          - send data */
275       block_offset = ((rank < split_rank)? 
276                       (rank * early_blockcount) : 
277                       (rank * late_blockcount + split_rank));
278       block_count = ((rank < split_rank)? early_blockcount : late_blockcount);
279       COLL_TUNED_COMPUTE_BLOCKCOUNT(block_count, num_phases, split_phase,
280                                     early_phase_segcount, late_phase_segcount)
281       phase_count = ((phase < split_phase)?
282                      (early_phase_segcount) : (late_phase_segcount));
283       phase_offset = ((phase < split_phase)?
284                       (phase * early_phase_segcount) : 
285                       (phase * late_phase_segcount + split_phase));
286       tmpsend = ((char*)rbuf) + (block_offset + phase_offset) * extent;
287       smpi_mpi_send(tmpsend, phase_count, dtype, send_to,
288                               666, comm);
289       
290       for (k = 2; k < size; k++) {
291          const int prevblock = (rank + size - k + 1) % size;
292          
293          inbi = inbi ^ 0x1;
294          
295          /* Post irecv for the current block */
296          reqs[inbi] = smpi_mpi_irecv(inbuf[inbi], max_segcount, dtype, recv_from,
297                                666, comm);
298          if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }
299          
300          /* Wait on previous block to arrive */
301          smpi_mpi_wait(&reqs[inbi ^ 0x1], MPI_STATUS_IGNORE);
302          
303          /* Apply operation on previous block: result goes to rbuf
304             rbuf[prevblock] = inbuf[inbi ^ 0x1] (op) rbuf[prevblock]
305          */
306          block_offset = ((prevblock < split_rank)?
307                          (prevblock * early_blockcount) :
308                          (prevblock * late_blockcount + split_rank));
309          block_count = ((prevblock < split_rank)? 
310                         early_blockcount : late_blockcount);
311          COLL_TUNED_COMPUTE_BLOCKCOUNT(block_count, num_phases, split_phase,
312                                        early_phase_segcount, late_phase_segcount)
313          phase_count = ((phase < split_phase)?
314                         (early_phase_segcount) : (late_phase_segcount));
315          phase_offset = ((phase < split_phase)?
316                          (phase * early_phase_segcount) : 
317                          (phase * late_phase_segcount + split_phase));
318          tmprecv = ((char*)rbuf) + (block_offset + phase_offset) * extent;
319          smpi_op_apply(op, inbuf[inbi ^ 0x1], tmprecv, &phase_count, &dtype);
320          /* send previous block to send_to */
321          smpi_mpi_send(tmprecv, phase_count, dtype, send_to,
322                               666, comm);
323       }
324       
325       /* Wait on the last block to arrive */
326       smpi_mpi_wait(&reqs[inbi], MPI_STATUS_IGNORE);
327
328       
329       /* Apply operation on the last block (from neighbor (rank + 1) 
330          rbuf[rank+1] = inbuf[inbi] (op) rbuf[rank + 1] */
331       recv_from = (rank + 1) % size;
332       block_offset = ((recv_from < split_rank)?
333                       (recv_from * early_blockcount) :
334                       (recv_from * late_blockcount + split_rank));
335       block_count = ((recv_from < split_rank)? 
336                      early_blockcount : late_blockcount);
337       COLL_TUNED_COMPUTE_BLOCKCOUNT(block_count, num_phases, split_phase,
338                                     early_phase_segcount, late_phase_segcount)
339       phase_count = ((phase < split_phase)?
340                      (early_phase_segcount) : (late_phase_segcount));
341       phase_offset = ((phase < split_phase)?
342                       (phase * early_phase_segcount) : 
343                       (phase * late_phase_segcount + split_phase));
344       tmprecv = ((char*)rbuf) + (block_offset + phase_offset) * extent;
345       smpi_op_apply(op, inbuf[inbi], tmprecv, &phase_count, &dtype);
346    }
347
348    /* Distribution loop - variation of ring allgather */
349    send_to = (rank + 1) % size;
350    recv_from = (rank + size - 1) % size;
351    for (k = 0; k < size - 1; k++) {
352       const int recv_data_from = (rank + size - k) % size;
353       const int send_data_from = (rank + 1 + size - k) % size;
354       const int send_block_offset = 
355          ((send_data_from < split_rank)?
356           (send_data_from * early_blockcount) :
357           (send_data_from * late_blockcount + split_rank));
358       const int recv_block_offset = 
359          ((recv_data_from < split_rank)?
360           (recv_data_from * early_blockcount) :
361           (recv_data_from * late_blockcount + split_rank));
362       block_count = ((send_data_from < split_rank)? 
363                      early_blockcount : late_blockcount);
364
365       tmprecv = (char*)rbuf + recv_block_offset * extent;
366       tmpsend = (char*)rbuf + send_block_offset * extent;
367
368       smpi_mpi_sendrecv(tmpsend, block_count, dtype, send_to,
369                                      666,
370                                      tmprecv, early_blockcount, dtype, recv_from,
371                                      666,
372                                      comm, MPI_STATUS_IGNORE);
373
374    }
375
376    if (NULL != inbuf[0]) free(inbuf[0]);
377    if (NULL != inbuf[1]) free(inbuf[1]);
378
379    return MPI_SUCCESS;
380
381  error_hndl:
382    XBT_DEBUG("%s:%4d\tRank %d Error occurred %d\n",
383                 __FILE__, line, rank, ret);
384    if (NULL != inbuf[0]) free(inbuf[0]);
385    if (NULL != inbuf[1]) free(inbuf[1]);
386    return ret;
387 }