Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
avoid warning by checking index in the right order
[simgrid.git] / src / smpi / colls / reduce_scatter-ompi.c
1 /* Copyright (c) 2013-2014. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 /*
8  * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
9  *                         University Research and Technology
10  *                         Corporation.  All rights reserved.
11  * Copyright (c) 2004-2012 The University of Tennessee and The University
12  *                         of Tennessee Research Foundation.  All rights
13  *                         reserved.
14  * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
15  *                         University of Stuttgart.  All rights reserved.
16  * Copyright (c) 2004-2005 The Regents of the University of California.
17  *                         All rights reserved.
18  * Copyright (c) 2008      Sun Microsystems, Inc.  All rights reserved.
19  * Copyright (c) 2009      University of Houston. All rights reserved.
20  *
21  * Additional copyrights may follow
22  */
23
24 #include "colls_private.h"
25 #include "coll_tuned_topo.h"
26
27 /*
28  * Recursive-halving function is (*mostly*) copied from the BASIC coll module.
29  * I have removed the part which handles "large" message sizes 
30  * (non-overlapping version of reduce_Scatter).
31  */
32
33 /* copied function (with appropriate renaming) starts here */
34
35 /*
36  *  reduce_scatter_ompi_basic_recursivehalving
37  *
38  *  Function:   - reduce scatter implementation using recursive-halving 
39  *                algorithm
40  *  Accepts:    - same as MPI_Reduce_scatter()
41  *  Returns:    - MPI_SUCCESS or error code
42  *  Limitation: - Works only for commutative operations.
43  */
44 int
45 smpi_coll_tuned_reduce_scatter_ompi_basic_recursivehalving(void *sbuf, 
46                                                             void *rbuf, 
47                                                             int *rcounts,
48                                                             MPI_Datatype dtype,
49                                                             MPI_Op op,
50                                                             MPI_Comm comm
51                                                             )
52 {
53     int i, rank, size, count, err = MPI_SUCCESS;
54     int tmp_size=1, remain = 0, tmp_rank, *disps = NULL;
55     ptrdiff_t true_lb, true_extent, lb, extent, buf_size;
56     char *recv_buf = NULL, *recv_buf_free = NULL;
57     char *result_buf = NULL, *result_buf_free = NULL;
58    
59     /* Initialize */
60     rank = smpi_comm_rank(comm);
61     size = smpi_comm_size(comm);
62    
63     XBT_DEBUG("coll:tuned:reduce_scatter_ompi_basic_recursivehalving, rank %d", rank);
64     if(!smpi_op_is_commute(op))
65       THROWF(arg_error,0, " reduce_scatter ompi_basic_recursivehalving can only be used for commutative operations! ");
66
67     /* Find displacements and the like */
68     disps = (int*) xbt_malloc(sizeof(int) * size);
69     if (NULL == disps) return MPI_ERR_OTHER;
70
71     disps[0] = 0;
72     for (i = 0; i < (size - 1); ++i) {
73         disps[i + 1] = disps[i] + rcounts[i];
74     }
75     count = disps[size - 1] + rcounts[size - 1];
76
77     /* short cut the trivial case */
78     if (0 == count) {
79         xbt_free(disps);
80         return MPI_SUCCESS;
81     }
82
83     /* get datatype information */
84     smpi_datatype_extent(dtype, &lb, &extent);
85     smpi_datatype_extent(dtype, &true_lb, &true_extent);
86     buf_size = true_extent + (ptrdiff_t)(count - 1) * extent;
87
88     /* Handle MPI_IN_PLACE */
89     if (MPI_IN_PLACE == sbuf) {
90         sbuf = rbuf;
91     }
92
93     /* Allocate temporary receive buffer. */
94     recv_buf_free = (char*) xbt_malloc(buf_size);
95     recv_buf = recv_buf_free - lb;
96     if (NULL == recv_buf_free) {
97         err = MPI_ERR_OTHER;
98         goto cleanup;
99     }
100    
101     /* allocate temporary buffer for results */
102     result_buf_free = (char*) xbt_malloc(buf_size);
103     result_buf = result_buf_free - lb;
104    
105     /* copy local buffer into the temporary results */
106     err =smpi_datatype_copy(sbuf, count, dtype, result_buf, count, dtype);
107     if (MPI_SUCCESS != err) goto cleanup;
108    
109     /* figure out power of two mapping: grow until larger than
110        comm size, then go back one, to get the largest power of
111        two less than comm size */
112     while (tmp_size <= size) tmp_size <<= 1;
113     tmp_size >>= 1;
114     remain = size - tmp_size;
115    
116     /* If comm size is not a power of two, have the first "remain"
117        procs with an even rank send to rank + 1, leaving a power of
118        two procs to do the rest of the algorithm */
119     if (rank < 2 * remain) {
120         if ((rank & 1) == 0) {
121             smpi_mpi_send(result_buf, count, dtype, rank + 1, 
122                                     COLL_TAG_REDUCE_SCATTER,
123                                     comm);
124             /* we don't participate from here on out */
125             tmp_rank = -1;
126         } else {
127             smpi_mpi_recv(recv_buf, count, dtype, rank - 1,
128                                     COLL_TAG_REDUCE_SCATTER,
129                                     comm, MPI_STATUS_IGNORE);
130          
131             /* integrate their results into our temp results */
132             smpi_op_apply(op, recv_buf, result_buf, &count, &dtype);
133          
134             /* adjust rank to be the bottom "remain" ranks */
135             tmp_rank = rank / 2;
136         }
137     } else {
138         /* just need to adjust rank to show that the bottom "even
139            remain" ranks dropped out */
140         tmp_rank = rank - remain;
141     }
142    
143     /* For ranks not kicked out by the above code, perform the
144        recursive halving */
145     if (tmp_rank >= 0) {
146         int *tmp_disps = NULL, *tmp_rcounts = NULL;
147         int mask, send_index, recv_index, last_index;
148       
149         /* recalculate disps and rcounts to account for the
150            special "remainder" processes that are no longer doing
151            anything */
152         tmp_rcounts = (int*) xbt_malloc(tmp_size * sizeof(int));
153         if (NULL == tmp_rcounts) {
154             err = MPI_ERR_OTHER;
155             goto cleanup;
156         }
157         tmp_disps = (int*) xbt_malloc(tmp_size * sizeof(int));
158         if (NULL == tmp_disps) {
159             xbt_free(tmp_rcounts);
160             err = MPI_ERR_OTHER;
161             goto cleanup;
162         }
163
164         for (i = 0 ; i < tmp_size ; ++i) {
165             if (i < remain) {
166                 /* need to include old neighbor as well */
167                 tmp_rcounts[i] = rcounts[i * 2 + 1] + rcounts[i * 2];
168             } else {
169                 tmp_rcounts[i] = rcounts[i + remain];
170             }
171         }
172
173         tmp_disps[0] = 0;
174         for (i = 0; i < tmp_size - 1; ++i) {
175             tmp_disps[i + 1] = tmp_disps[i] + tmp_rcounts[i];
176         }
177
178         /* do the recursive halving communication.  Don't use the
179            dimension information on the communicator because I
180            think the information is invalidated by our "shrinking"
181            of the communicator */
182         mask = tmp_size >> 1;
183         send_index = recv_index = 0;
184         last_index = tmp_size;
185         while (mask > 0) {
186             int tmp_peer, peer, send_count, recv_count;
187             MPI_Request request;
188
189             tmp_peer = tmp_rank ^ mask;
190             peer = (tmp_peer < remain) ? tmp_peer * 2 + 1 : tmp_peer + remain;
191
192             /* figure out if we're sending, receiving, or both */
193             send_count = recv_count = 0;
194             if (tmp_rank < tmp_peer) {
195                 send_index = recv_index + mask;
196                 for (i = send_index ; i < last_index ; ++i) {
197                     send_count += tmp_rcounts[i];
198                 }
199                 for (i = recv_index ; i < send_index ; ++i) {
200                     recv_count += tmp_rcounts[i];
201                 }
202             } else {
203                 recv_index = send_index + mask;
204                 for (i = send_index ; i < recv_index ; ++i) {
205                     send_count += tmp_rcounts[i];
206                 }
207                 for (i = recv_index ; i < last_index ; ++i) {
208                     recv_count += tmp_rcounts[i];
209                 }
210             }
211
212             /* actual data transfer.  Send from result_buf,
213                receive into recv_buf */
214             if (send_count > 0 && recv_count != 0) {
215                 request=smpi_mpi_irecv(recv_buf + (ptrdiff_t)tmp_disps[recv_index] * extent,
216                                          recv_count, dtype, peer,
217                                          COLL_TAG_REDUCE_SCATTER,
218                                          comm);
219                 if (MPI_SUCCESS != err) {
220                     xbt_free(tmp_rcounts);
221                     xbt_free(tmp_disps);
222                     goto cleanup;
223                 }                                             
224             }
225             if (recv_count > 0 && send_count != 0) {
226                 smpi_mpi_send(result_buf + (ptrdiff_t)tmp_disps[send_index] * extent,
227                                         send_count, dtype, peer, 
228                                         COLL_TAG_REDUCE_SCATTER,
229                                         comm);
230                 if (MPI_SUCCESS != err) {
231                     xbt_free(tmp_rcounts);
232                     xbt_free(tmp_disps);
233                     goto cleanup;
234                 }                                             
235             }
236             if (send_count > 0 && recv_count != 0) {
237                 smpi_mpi_wait(&request, MPI_STATUS_IGNORE);
238             }
239
240             /* if we received something on this step, push it into
241                the results buffer */
242             if (recv_count > 0) {
243                 smpi_op_apply(op, 
244                                recv_buf + (ptrdiff_t)tmp_disps[recv_index] * extent, 
245                                result_buf + (ptrdiff_t)tmp_disps[recv_index] * extent,
246                                &recv_count, &dtype);
247             }
248
249             /* update for next iteration */
250             send_index = recv_index;
251             last_index = recv_index + mask;
252             mask >>= 1;
253         }
254
255         /* copy local results from results buffer into real receive buffer */
256         if (0 != rcounts[rank]) {
257             err = smpi_datatype_copy(result_buf + disps[rank] * extent,
258                                        rcounts[rank], dtype, 
259                                        rbuf, rcounts[rank], dtype);
260             if (MPI_SUCCESS != err) {
261                 xbt_free(tmp_rcounts);
262                 xbt_free(tmp_disps);
263                 goto cleanup;
264             }                                             
265         }
266
267         xbt_free(tmp_rcounts);
268         xbt_free(tmp_disps);
269     }
270
271     /* Now fix up the non-power of two case, by having the odd
272        procs send the even procs the proper results */
273     if (rank < (2 * remain)) {
274         if ((rank & 1) == 0) {
275             if (rcounts[rank]) {
276                 smpi_mpi_recv(rbuf, rcounts[rank], dtype, rank + 1,
277                                         COLL_TAG_REDUCE_SCATTER,
278                                         comm, MPI_STATUS_IGNORE);
279             }
280         } else {
281             if (rcounts[rank - 1]) {
282                 smpi_mpi_send(result_buf + disps[rank - 1] * extent,
283                                         rcounts[rank - 1], dtype, rank - 1,
284                                         COLL_TAG_REDUCE_SCATTER,
285                                         comm);
286             }
287         }            
288     }
289
290  cleanup:
291     if (NULL != disps) xbt_free(disps);
292     if (NULL != recv_buf_free) xbt_free(recv_buf_free);
293     if (NULL != result_buf_free) xbt_free(result_buf_free);
294
295     return err;
296 }
297
298 /* copied function (with appropriate renaming) ends here */
299
300
301 /*
302  *   smpi_coll_tuned_reduce_scatter_ompi_ring
303  *
304  *   Function:       Ring algorithm for reduce_scatter operation
305  *   Accepts:        Same as MPI_Reduce_scatter()
306  *   Returns:        MPI_SUCCESS or error code
307  *
308  *   Description:    Implements ring algorithm for reduce_scatter: 
309  *                   the block sizes defined in rcounts are exchanged and 
310  8                    updated until they reach proper destination.
311  *                   Algorithm requires 2 * max(rcounts) extra buffering
312  *
313  *   Limitations:    The algorithm DOES NOT preserve order of operations so it 
314  *                   can be used only for commutative operations.
315  *         Example on 5 nodes:
316  *         Initial state
317  *   #      0              1             2              3             4
318  *        [00]           [10]   ->     [20]           [30]           [40]
319  *        [01]           [11]          [21]  ->       [31]           [41]
320  *        [02]           [12]          [22]           [32]  ->       [42]
321  *    ->  [03]           [13]          [23]           [33]           [43] --> ..
322  *        [04]  ->       [14]          [24]           [34]           [44]
323  *
324  *        COMPUTATION PHASE
325  *         Step 0: rank r sends block (r-1) to rank (r+1) and 
326  *                 receives block (r+1) from rank (r-1) [with wraparound].
327  *   #      0              1             2              3             4
328  *        [00]           [10]        [10+20]   ->     [30]           [40]
329  *        [01]           [11]          [21]          [21+31]  ->     [41]
330  *    ->  [02]           [12]          [22]           [32]         [32+42] -->..
331  *      [43+03] ->       [13]          [23]           [33]           [43]
332  *        [04]         [04+14]  ->     [24]           [34]           [44]
333  *         
334  *         Step 1:
335  *   #      0              1             2              3             4
336  *        [00]           [10]        [10+20]       [10+20+30] ->     [40]
337  *    ->  [01]           [11]          [21]          [21+31]      [21+31+41] ->
338  *     [32+42+02] ->     [12]          [22]           [32]         [32+42] 
339  *        [03]        [43+03+13] ->    [23]           [33]           [43]
340  *        [04]         [04+14]      [04+14+24]  ->    [34]           [44]
341  *
342  *         Step 2:
343  *   #      0              1             2              3             4
344  *     -> [00]           [10]        [10+20]       [10+20+30]   [10+20+30+40] ->
345  *   [21+31+41+01]->     [11]          [21]          [21+31]      [21+31+41]
346  *     [32+42+02]   [32+42+02+12]->    [22]           [32]         [32+42] 
347  *        [03]        [43+03+13]   [43+03+13+23]->    [33]           [43]
348  *        [04]         [04+14]      [04+14+24]    [04+14+24+34] ->   [44]
349  *
350  *         Step 3:
351  *   #      0             1              2              3             4
352  * [10+20+30+40+00]     [10]         [10+20]       [10+20+30]   [10+20+30+40]
353  *  [21+31+41+01] [21+31+41+01+11]     [21]          [21+31]      [21+31+41]
354  *    [32+42+02]   [32+42+02+12] [32+42+02+12+22]     [32]         [32+42] 
355  *       [03]        [43+03+13]    [43+03+13+23] [43+03+13+23+33]    [43]
356  *       [04]         [04+14]       [04+14+24]    [04+14+24+34] [04+14+24+34+44]
357  *    DONE :)
358  *
359  */
360 int 
361 smpi_coll_tuned_reduce_scatter_ompi_ring(void *sbuf, void *rbuf, int *rcounts,
362                                           MPI_Datatype dtype,
363                                           MPI_Op op,
364                                           MPI_Comm comm
365                                           )
366 {
367     int ret, line, rank, size, i, k, recv_from, send_to, total_count, max_block_count;
368     int inbi, *displs = NULL;
369     char *tmpsend = NULL, *tmprecv = NULL, *accumbuf = NULL, *accumbuf_free = NULL;
370     char *inbuf_free[2] = {NULL, NULL}, *inbuf[2] = {NULL, NULL};
371     ptrdiff_t true_lb, true_extent, lb, extent, max_real_segsize;
372     MPI_Request reqs[2] = {NULL, NULL};
373
374     size = smpi_comm_size(comm);
375     rank = smpi_comm_rank(comm);
376
377     XBT_DEBUG(  "coll:tuned:reduce_scatter_ompi_ring rank %d, size %d", 
378                  rank, size);
379
380     /* Determine the maximum number of elements per node, 
381        corresponding block size, and displacements array.
382     */
383     displs = (int*) xbt_malloc(size * sizeof(int));
384     if (NULL == displs) { ret = -1; line = __LINE__; goto error_hndl; }
385     displs[0] = 0;
386     total_count = rcounts[0];
387     max_block_count = rcounts[0];
388     for (i = 1; i < size; i++) { 
389         displs[i] = total_count;
390         total_count += rcounts[i];
391         if (max_block_count < rcounts[i]) max_block_count = rcounts[i];
392     }
393       
394     /* Special case for size == 1 */
395     if (1 == size) {
396         if (MPI_IN_PLACE != sbuf) {
397             ret = smpi_datatype_copy((char*)sbuf, total_count, dtype, (char*)rbuf, total_count, dtype);
398             if (ret < 0) { line = __LINE__; goto error_hndl; }
399         }
400         xbt_free(displs);
401         return MPI_SUCCESS;
402     }
403
404     /* Allocate and initialize temporary buffers, we need:
405        - a temporary buffer to perform reduction (size total_count) since
406        rbuf can be of rcounts[rank] size.
407        - up to two temporary buffers used for communication/computation overlap.
408     */
409     smpi_datatype_extent(dtype, &lb, &extent);
410     smpi_datatype_extent(dtype, &true_lb, &true_extent);
411
412     max_real_segsize = true_extent + (ptrdiff_t)(max_block_count - 1) * extent;
413
414     accumbuf_free = (char*)xbt_malloc(true_extent + (ptrdiff_t)(total_count - 1) * extent);
415     if (NULL == accumbuf_free) { ret = -1; line = __LINE__; goto error_hndl; }
416     accumbuf = accumbuf_free - lb;
417
418     inbuf_free[0] = (char*)xbt_malloc(max_real_segsize);
419     if (NULL == inbuf_free[0]) { ret = -1; line = __LINE__; goto error_hndl; }
420     inbuf[0] = inbuf_free[0] - lb;
421     if (size > 2) {
422         inbuf_free[1] = (char*)xbt_malloc(max_real_segsize);
423         if (NULL == inbuf_free[1]) { ret = -1; line = __LINE__; goto error_hndl; }
424         inbuf[1] = inbuf_free[1] - lb;
425     }
426
427     /* Handle MPI_IN_PLACE for size > 1 */
428     if (MPI_IN_PLACE == sbuf) {
429         sbuf = rbuf;
430     }
431
432     ret = smpi_datatype_copy((char*)sbuf, total_count, dtype, accumbuf, total_count, dtype);
433     if (ret < 0) { line = __LINE__; goto error_hndl; }
434
435     /* Computation loop */
436
437     /* 
438        For each of the remote nodes:
439        - post irecv for block (r-2) from (r-1) with wrap around
440        - send block (r-1) to (r+1)
441        - in loop for every step k = 2 .. n
442        - post irecv for block (r - 1 + n - k) % n
443        - wait on block (r + n - k) % n to arrive
444        - compute on block (r + n - k ) % n
445        - send block (r + n - k) % n
446        - wait on block (r)
447        - compute on block (r)
448        - copy block (r) to rbuf
449        Note that we must be careful when computing the begining of buffers and
450        for send operations and computation we must compute the exact block size.
451     */
452     send_to = (rank + 1) % size;
453     recv_from = (rank + size - 1) % size;
454
455     inbi = 0;
456     /* Initialize first receive from the neighbor on the left */
457     reqs[inbi]=smpi_mpi_irecv(inbuf[inbi], max_block_count, dtype, recv_from,
458                              COLL_TAG_REDUCE_SCATTER, comm
459                              );
460     tmpsend = accumbuf + (ptrdiff_t)displs[recv_from] * extent;
461     smpi_mpi_send(tmpsend, rcounts[recv_from], dtype, send_to,
462                             COLL_TAG_REDUCE_SCATTER,
463                              comm);
464
465     for (k = 2; k < size; k++) {
466         const int prevblock = (rank + size - k) % size;
467       
468         inbi = inbi ^ 0x1;
469
470         /* Post irecv for the current block */
471         reqs[inbi]=smpi_mpi_irecv(inbuf[inbi], max_block_count, dtype, recv_from,
472                                  COLL_TAG_REDUCE_SCATTER, comm
473                                  );
474       
475         /* Wait on previous block to arrive */
476         smpi_mpi_wait(&reqs[inbi ^ 0x1], MPI_STATUS_IGNORE);
477       
478         /* Apply operation on previous block: result goes to rbuf
479            rbuf[prevblock] = inbuf[inbi ^ 0x1] (op) rbuf[prevblock]
480         */
481         tmprecv = accumbuf + (ptrdiff_t)displs[prevblock] * extent;
482         smpi_op_apply(op, inbuf[inbi ^ 0x1], tmprecv, &(rcounts[prevblock]), &dtype);
483       
484         /* send previous block to send_to */
485         smpi_mpi_send(tmprecv, rcounts[prevblock], dtype, send_to,
486                                 COLL_TAG_REDUCE_SCATTER,
487                                  comm);
488     }
489
490     /* Wait on the last block to arrive */
491     smpi_mpi_wait(&reqs[inbi], MPI_STATUS_IGNORE);
492
493     /* Apply operation on the last block (my block)
494        rbuf[rank] = inbuf[inbi] (op) rbuf[rank] */
495     tmprecv = accumbuf + (ptrdiff_t)displs[rank] * extent;
496     smpi_op_apply(op, inbuf[inbi], tmprecv, &(rcounts[rank]), &dtype);
497    
498     /* Copy result from tmprecv to rbuf */
499     ret = smpi_datatype_copy(tmprecv, rcounts[rank], dtype, (char*)rbuf, rcounts[rank], dtype);
500     if (ret < 0) { line = __LINE__; goto error_hndl; }
501
502     if (NULL != displs) xbt_free(displs);
503     if (NULL != accumbuf_free) xbt_free(accumbuf_free);
504     if (NULL != inbuf_free[0]) xbt_free(inbuf_free[0]);
505     if (NULL != inbuf_free[1]) xbt_free(inbuf_free[1]);
506
507     return MPI_SUCCESS;
508
509  error_hndl:
510     XBT_DEBUG( "%s:%4d\tRank %d Error occurred %d\n",
511                  __FILE__, line, rank, ret);
512     if (NULL != displs) xbt_free(displs);
513     if (NULL != accumbuf_free) xbt_free(accumbuf_free);
514     if (NULL != inbuf_free[0]) xbt_free(inbuf_free[0]);
515     if (NULL != inbuf_free[1]) xbt_free(inbuf_free[1]);
516     return ret;
517 }
518