Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
bb3251b1fbdd286ec511e9f99e8b4fcc7eb5571d
[simgrid.git] / src / smpi / colls / allreduce / allreduce-ompi-ring-segmented.cpp
1 /* Copyright (c) 2013-2014. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 /*
8  * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
9  *                         University Research and Technology
10  *                         Corporation.  All rights reserved.
11  * Copyright (c) 2004-2009 The University of Tennessee and The University
12  *                         of Tennessee Research Foundation.  All rights
13  *                         reserved.
14  * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
15  *                         University of Stuttgart.  All rights reserved.
16  * Copyright (c) 2004-2005 The Regents of the University of California.
17  *                         All rights reserved.
18  * Copyright (c) 2009      University of Houston. All rights reserved.
19  *
20  * Additional copyrights may follow
21  *
22  *  Redistribution and use in source and binary forms, with or without
23  * modification, are permitted provided that the following conditions are
24  * met:
25
26  * - Redistributions of source code must retain the above copyright
27  *   notice, this list of conditions and the following disclaimer.
28
29  * - Redistributions in binary form must reproduce the above copyright
30  *   notice, this list of conditions and the following disclaimer listed
31  *   in this license in the documentation and/or other materials
32  *   provided with the distribution.
33
34  * - Neither the name of the copyright holders nor the names of its
35  *   contributors may be used to endorse or promote products derived from
36  *   this software without specific prior written permission.
37
38  * The copyright holders provide no reassurances that the source code
39  * provided does not infringe any patent, copyright, or any other
40  * intellectual property rights of third parties.  The copyright holders
41  * disclaim any liability to any recipient for claims brought against
42  * recipient by any third party for infringement of that parties
43  * intellectual property rights.
44
45  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
46  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
47  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
48  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
49  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
50  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
51  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
52  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
53  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
54  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
55  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
56  */
57
58 /*
59  *  ompi_coll_tuned_allreduce_intra_ring_segmented
60  *
61  *   Function:       Pipelined ring algorithm for allreduce operation
62  *   Accepts:        Same as MPI_Allreduce(), segment size
63  *   Returns:        MPI_SUCCESS or error code
64  *
65  *   Description:    Implements pipelined ring algorithm for allreduce: 
66  *                   user supplies suggested segment size for the pipelining of
67  *                   reduce operation.
68  *                   The segment size determines the number of phases, np, for 
69  *                   the algorithm execution.  
70  *                   The message is automatically divided into blocks of 
71  *                   approximately  (count / (np * segcount)) elements.
72  *                   At the end of reduction phase, allgather like step is 
73  *                   executed.
74  *                   Algorithm requires (np + 1)*(N - 1) steps.
75  *
76  *   Limitations:    The algorithm DOES NOT preserve order of operations so it 
77  *                   can be used only for commutative operations.
78  *                   In addition, algorithm cannot work if the total size is 
79  *                   less than size * segment size.
80  *         Example on 3 nodes with 2 phases
81  *         Initial state
82  *   #      0              1             2 
83  *        [00a]          [10a]         [20a]
84  *        [00b]          [10b]         [20b]
85  *        [01a]          [11a]         [21a]
86  *        [01b]          [11b]         [21b]
87  *        [02a]          [12a]         [22a]
88  *        [02b]          [12b]         [22b]
89  *
90  *        COMPUTATION PHASE 0 (a)
91  *         Step 0: rank r sends block ra to rank (r+1) and receives bloc (r-1)a 
92  *                 from rank (r-1) [with wraparound].
93  *    #     0              1             2  
94  *        [00a]        [00a+10a]       [20a]
95  *        [00b]          [10b]         [20b]
96  *        [01a]          [11a]       [11a+21a]
97  *        [01b]          [11b]         [21b]
98  *      [22a+02a]        [12a]         [22a]
99  *        [02b]          [12b]         [22b]
100  *
101  *         Step 1: rank r sends block (r-1)a to rank (r+1) and receives bloc 
102  *                 (r-2)a from rank (r-1) [with wraparound].
103  *    #     0              1             2  
104  *        [00a]        [00a+10a]   [00a+10a+20a]
105  *        [00b]          [10b]         [20b]
106  *    [11a+21a+01a]      [11a]       [11a+21a]
107  *        [01b]          [11b]         [21b]
108  *      [22a+02a]    [22a+02a+12a]     [22a]
109  *        [02b]          [12b]         [22b] 
110  *
111  *        COMPUTATION PHASE 1 (b)
112  *         Step 0: rank r sends block rb to rank (r+1) and receives bloc (r-1)b 
113  *                 from rank (r-1) [with wraparound].
114  *    #     0              1             2  
115  *        [00a]        [00a+10a]       [20a]
116  *        [00b]        [00b+10b]       [20b]
117  *        [01a]          [11a]       [11a+21a]
118  *        [01b]          [11b]       [11b+21b]
119  *      [22a+02a]        [12a]         [22a]
120  *      [22b+02b]        [12b]         [22b]
121  *
122  *         Step 1: rank r sends block (r-1)b to rank (r+1) and receives bloc 
123  *                 (r-2)b from rank (r-1) [with wraparound].
124  *    #     0              1             2  
125  *        [00a]        [00a+10a]   [00a+10a+20a]
126  *        [00b]          [10b]     [0bb+10b+20b]
127  *    [11a+21a+01a]      [11a]       [11a+21a]
128  *    [11b+21b+01b]      [11b]         [21b]
129  *      [22a+02a]    [22a+02a+12a]     [22a]
130  *        [02b]      [22b+01b+12b]     [22b] 
131  *
132  *         
133  *        DISTRIBUTION PHASE: ring ALLGATHER with ranks shifted by 1 (same as
134  *         in regular ring algorithm.
135  *
136  */
137  
138 #define COLL_TUNED_COMPUTED_SEGCOUNT(SEGSIZE, TYPELNG, SEGCOUNT)        \
139     if( ((SEGSIZE) >= (TYPELNG)) &&                                     \
140         ((SEGSIZE) < ((TYPELNG) * (SEGCOUNT))) ) {                      \
141         size_t residual;                                                \
142         (SEGCOUNT) = (int)((SEGSIZE) / (TYPELNG));                      \
143         residual = (SEGSIZE) - (SEGCOUNT) * (TYPELNG);                  \
144         if( residual > ((TYPELNG) >> 1) )                               \
145             (SEGCOUNT)++;                                               \
146     }                                                                   \
147     
148 #define COLL_TUNED_COMPUTE_BLOCKCOUNT( COUNT, NUM_BLOCKS, SPLIT_INDEX,       \
149                                        EARLY_BLOCK_COUNT, LATE_BLOCK_COUNT ) \
150     EARLY_BLOCK_COUNT = LATE_BLOCK_COUNT = COUNT / NUM_BLOCKS;               \
151     SPLIT_INDEX = COUNT % NUM_BLOCKS;                                        \
152     if (0 != SPLIT_INDEX) {                                                  \
153         EARLY_BLOCK_COUNT = EARLY_BLOCK_COUNT + 1;                           \
154     }                                                                        \
155
156 #include "../colls_private.h"
157 namespace simgrid{
158 namespace smpi{
159 int 
160 Coll_allreduce_ompi_ring_segmented::allreduce(void *sbuf, void *rbuf, int count,
161                                                MPI_Datatype dtype,
162                                                MPI_Op op,
163                                                MPI_Comm comm) 
164 {
165    int ret = MPI_SUCCESS;
166    int line;
167    int k, recv_from, send_to;
168    int early_blockcount, late_blockcount, split_rank; 
169    int segcount, max_segcount;
170    int num_phases, phase;
171    int block_count;
172    unsigned int inbi;
173    size_t typelng;
174    char *tmpsend = NULL, *tmprecv = NULL;
175    char *inbuf[2] = {NULL, NULL};
176    ptrdiff_t true_extent, extent;
177    ptrdiff_t block_offset, max_real_segsize;
178    MPI_Request reqs[2] = {NULL, NULL};
179    const size_t segsize = 1 << 20; /* 1 MB */
180    int size = comm->size();
181    int rank = comm->rank();
182
183    XBT_DEBUG("coll:tuned:allreduce_intra_ring_segmented rank %d, count %d", rank, count);
184
185    /* Special case for size == 1 */
186    if (1 == size) {
187       if (MPI_IN_PLACE != sbuf) {
188       ret= Datatype::copy(sbuf, count, dtype,rbuf, count, dtype);
189          if (ret < 0) { line = __LINE__; goto error_hndl; }
190       }
191       return MPI_SUCCESS;
192    }
193    
194    /* Determine segment count based on the suggested segment size */
195    extent = dtype->get_extent();
196    if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }
197    true_extent = dtype->get_extent();
198    if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }
199    typelng = dtype->size();
200    if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }
201    segcount = count;
202    COLL_TUNED_COMPUTED_SEGCOUNT(segsize, typelng, segcount)
203
204    /* Special case for count less than size * segcount - use regular ring */
205    if (count < size * segcount) {
206       XBT_DEBUG( "coll:tuned:allreduce_ring_segmented rank %d/%d, count %d, switching to regular ring", rank, size, count);
207       return (Coll_allreduce_lr::allreduce(sbuf, rbuf, count, dtype, op, 
208                                                    comm));
209    }
210
211    /* Determine the number of phases of the algorithm */
212    num_phases = count / (size * segcount);
213    if ((count % (size * segcount) >= size) && 
214        (count % (size * segcount) > ((size * segcount) / 2))) {
215       num_phases++;
216    }
217
218    /* Determine the number of elements per block and corresponding 
219       block sizes.
220       The blocks are divided into "early" and "late" ones:
221       blocks 0 .. (split_rank - 1) are "early" and 
222       blocks (split_rank) .. (size - 1) are "late".
223       Early blocks are at most 1 element larger than the late ones.
224       Note, these blocks will be split into num_phases segments,
225       out of the largest one will have max_segcount elements.
226     */
227    COLL_TUNED_COMPUTE_BLOCKCOUNT( count, size, split_rank, 
228                                   early_blockcount, late_blockcount )
229    COLL_TUNED_COMPUTE_BLOCKCOUNT( early_blockcount, num_phases, inbi,
230                                   max_segcount, k)
231    max_real_segsize = true_extent + (max_segcount - 1) * extent;
232
233    /* Allocate and initialize temporary buffers */
234    inbuf[0] = (char*)smpi_get_tmp_sendbuffer(max_real_segsize);
235    if (NULL == inbuf[0]) { ret = -1; line = __LINE__; goto error_hndl; }
236    if (size > 2) {
237       inbuf[1] = (char*)smpi_get_tmp_recvbuffer(max_real_segsize);
238       if (NULL == inbuf[1]) { ret = -1; line = __LINE__; goto error_hndl; }
239    }
240
241    /* Handle MPI_IN_PLACE */
242    if (MPI_IN_PLACE != sbuf) {
243       ret= Datatype::copy(sbuf, count, dtype,rbuf, count, dtype);
244       if (ret < 0) { line = __LINE__; goto error_hndl; }
245    }
246
247    /* Computation loop: for each phase, repeat ring allreduce computation loop */
248    for (phase = 0; phase < num_phases; phase ++) {
249       ptrdiff_t phase_offset;
250       int early_phase_segcount, late_phase_segcount, split_phase, phase_count;
251
252       /* 
253          For each of the remote nodes:
254          - post irecv for block (r-1)
255          - send block (r)
256            To do this, first compute block offset and count, and use block offset
257            to compute phase offset.
258          - in loop for every step k = 2 .. n
259            - post irecv for block (r + n - k) % n
260            - wait on block (r + n - k + 1) % n to arrive
261            - compute on block (r + n - k + 1) % n
262            - send block (r + n - k + 1) % n
263          - wait on block (r + 1)
264          - compute on block (r + 1)
265          - send block (r + 1) to rank (r + 1)
266          Note that we must be careful when computing the begining of buffers and
267          for send operations and computation we must compute the exact block size.
268       */
269       send_to = (rank + 1) % size;
270       recv_from = (rank + size - 1) % size;
271       
272       inbi = 0;
273       /* Initialize first receive from the neighbor on the left */
274       reqs[inbi] = Request::irecv(inbuf[inbi], max_segcount, dtype, recv_from,
275                                666, comm);
276       /* Send first block (my block) to the neighbor on the right:
277          - compute my block and phase offset
278          - send data */
279       block_offset = ((rank < split_rank)? 
280                       (rank * early_blockcount) : 
281                       (rank * late_blockcount + split_rank));
282       block_count = ((rank < split_rank)? early_blockcount : late_blockcount);
283       COLL_TUNED_COMPUTE_BLOCKCOUNT(block_count, num_phases, split_phase,
284                                     early_phase_segcount, late_phase_segcount)
285       phase_count = ((phase < split_phase)?
286                      (early_phase_segcount) : (late_phase_segcount));
287       phase_offset = ((phase < split_phase)?
288                       (phase * early_phase_segcount) : 
289                       (phase * late_phase_segcount + split_phase));
290       tmpsend = ((char*)rbuf) + (block_offset + phase_offset) * extent;
291       Request::send(tmpsend, phase_count, dtype, send_to,
292                               666, comm);
293       
294       for (k = 2; k < size; k++) {
295          const int prevblock = (rank + size - k + 1) % size;
296          
297          inbi = inbi ^ 0x1;
298          
299          /* Post irecv for the current block */
300          reqs[inbi] = Request::irecv(inbuf[inbi], max_segcount, dtype, recv_from,
301                                666, comm);
302          if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }
303          
304          /* Wait on previous block to arrive */
305          Request::wait(&reqs[inbi ^ 0x1], MPI_STATUS_IGNORE);
306          
307          /* Apply operation on previous block: result goes to rbuf
308             rbuf[prevblock] = inbuf[inbi ^ 0x1] (op) rbuf[prevblock]
309          */
310          block_offset = ((prevblock < split_rank)?
311                          (prevblock * early_blockcount) :
312                          (prevblock * late_blockcount + split_rank));
313          block_count = ((prevblock < split_rank)? 
314                         early_blockcount : late_blockcount);
315          COLL_TUNED_COMPUTE_BLOCKCOUNT(block_count, num_phases, split_phase,
316                                        early_phase_segcount, late_phase_segcount)
317          phase_count = ((phase < split_phase)?
318                         (early_phase_segcount) : (late_phase_segcount));
319          phase_offset = ((phase < split_phase)?
320                          (phase * early_phase_segcount) : 
321                          (phase * late_phase_segcount + split_phase));
322          tmprecv = ((char*)rbuf) + (block_offset + phase_offset) * extent;
323          if(op!=MPI_OP_NULL) op->apply( inbuf[inbi ^ 0x1], tmprecv, &phase_count, dtype);
324          /* send previous block to send_to */
325          Request::send(tmprecv, phase_count, dtype, send_to,
326                               666, comm);
327       }
328       
329       /* Wait on the last block to arrive */
330       Request::wait(&reqs[inbi], MPI_STATUS_IGNORE);
331
332       
333       /* Apply operation on the last block (from neighbor (rank + 1) 
334          rbuf[rank+1] = inbuf[inbi] (op) rbuf[rank + 1] */
335       recv_from = (rank + 1) % size;
336       block_offset = ((recv_from < split_rank)?
337                       (recv_from * early_blockcount) :
338                       (recv_from * late_blockcount + split_rank));
339       block_count = ((recv_from < split_rank)? 
340                      early_blockcount : late_blockcount);
341       COLL_TUNED_COMPUTE_BLOCKCOUNT(block_count, num_phases, split_phase,
342                                     early_phase_segcount, late_phase_segcount)
343       phase_count = ((phase < split_phase)?
344                      (early_phase_segcount) : (late_phase_segcount));
345       phase_offset = ((phase < split_phase)?
346                       (phase * early_phase_segcount) : 
347                       (phase * late_phase_segcount + split_phase));
348       tmprecv = ((char*)rbuf) + (block_offset + phase_offset) * extent;
349       if(op!=MPI_OP_NULL) op->apply( inbuf[inbi], tmprecv, &phase_count, dtype);
350    }
351
352    /* Distribution loop - variation of ring allgather */
353    send_to = (rank + 1) % size;
354    recv_from = (rank + size - 1) % size;
355    for (k = 0; k < size - 1; k++) {
356       const int recv_data_from = (rank + size - k) % size;
357       const int send_data_from = (rank + 1 + size - k) % size;
358       const int send_block_offset = 
359          ((send_data_from < split_rank)?
360           (send_data_from * early_blockcount) :
361           (send_data_from * late_blockcount + split_rank));
362       const int recv_block_offset = 
363          ((recv_data_from < split_rank)?
364           (recv_data_from * early_blockcount) :
365           (recv_data_from * late_blockcount + split_rank));
366       block_count = ((send_data_from < split_rank)? 
367                      early_blockcount : late_blockcount);
368
369       tmprecv = (char*)rbuf + recv_block_offset * extent;
370       tmpsend = (char*)rbuf + send_block_offset * extent;
371
372       Request::sendrecv(tmpsend, block_count, dtype, send_to,
373                                      666,
374                                      tmprecv, early_blockcount, dtype, recv_from,
375                                      666,
376                                      comm, MPI_STATUS_IGNORE);
377
378    }
379
380    if (NULL != inbuf[0]) smpi_free_tmp_buffer(inbuf[0]);
381    if (NULL != inbuf[1]) smpi_free_tmp_buffer(inbuf[1]);
382
383    return MPI_SUCCESS;
384
385  error_hndl:
386    XBT_DEBUG("%s:%4d\tRank %d Error occurred %d\n",
387                 __FILE__, line, rank, ret);
388    if (NULL != inbuf[0]) smpi_free_tmp_buffer(inbuf[0]);
389    if (NULL != inbuf[1]) smpi_free_tmp_buffer(inbuf[1]);
390    return ret;
391 }
392 }
393 }