Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
bd2d57891219527738ac886d8a14b81d2c18e56a
[simgrid.git] / src / smpi / colls / reduce_scatter-ompi.c
1 /* Copyright (c) 2013-2014. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 /*
8  * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
9  *                         University Research and Technology
10  *                         Corporation.  All rights reserved.
11  * Copyright (c) 2004-2012 The University of Tennessee and The University
12  *                         of Tennessee Research Foundation.  All rights
13  *                         reserved.
14  * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
15  *                         University of Stuttgart.  All rights reserved.
16  * Copyright (c) 2004-2005 The Regents of the University of California.
17  *                         All rights reserved.
18  * Copyright (c) 2008      Sun Microsystems, Inc.  All rights reserved.
19  * Copyright (c) 2009      University of Houston. All rights reserved.
20  *
21  * Additional copyrights may follow
22  */
23
24 #include "colls_private.h"
25 #include "coll_tuned_topo.h"
26 #include "xbt/replay.h"
27
28 /*
29  * Recursive-halving function is (*mostly*) copied from the BASIC coll module.
30  * I have removed the part which handles "large" message sizes 
31  * (non-overlapping version of reduce_Scatter).
32  */
33
34 /* copied function (with appropriate renaming) starts here */
35
36 /*
37  *  reduce_scatter_ompi_basic_recursivehalving
38  *
39  *  Function:   - reduce scatter implementation using recursive-halving 
40  *                algorithm
41  *  Accepts:    - same as MPI_Reduce_scatter()
42  *  Returns:    - MPI_SUCCESS or error code
43  *  Limitation: - Works only for commutative operations.
44  */
45 int
46 smpi_coll_tuned_reduce_scatter_ompi_basic_recursivehalving(void *sbuf, 
47                                                             void *rbuf, 
48                                                             int *rcounts,
49                                                             MPI_Datatype dtype,
50                                                             MPI_Op op,
51                                                             MPI_Comm comm
52                                                             )
53 {
54     int i, rank, size, count, err = MPI_SUCCESS;
55     int tmp_size=1, remain = 0, tmp_rank, *disps = NULL;
56     ptrdiff_t true_lb, true_extent, lb, extent, buf_size;
57     char *recv_buf = NULL, *recv_buf_free = NULL;
58     char *result_buf = NULL, *result_buf_free = NULL;
59    
60     /* Initialize */
61     rank = smpi_comm_rank(comm);
62     size = smpi_comm_size(comm);
63    
64     XBT_DEBUG("coll:tuned:reduce_scatter_ompi_basic_recursivehalving, rank %d", rank);
65     if(!smpi_op_is_commute(op))
66       THROWF(arg_error,0, " reduce_scatter ompi_basic_recursivehalving can only be used for commutative operations! ");
67
68     /* Find displacements and the like */
69     disps = (int*) xbt_malloc(sizeof(int) * size);
70     if (NULL == disps) return MPI_ERR_OTHER;
71
72     disps[0] = 0;
73     for (i = 0; i < (size - 1); ++i) {
74         disps[i + 1] = disps[i] + rcounts[i];
75     }
76     count = disps[size - 1] + rcounts[size - 1];
77
78     /* short cut the trivial case */
79     if (0 == count) {
80         xbt_free(disps);
81         return MPI_SUCCESS;
82     }
83
84     /* get datatype information */
85     smpi_datatype_extent(dtype, &lb, &extent);
86     smpi_datatype_extent(dtype, &true_lb, &true_extent);
87     buf_size = true_extent + (ptrdiff_t)(count - 1) * extent;
88
89     /* Handle MPI_IN_PLACE */
90     if (MPI_IN_PLACE == sbuf) {
91         sbuf = rbuf;
92     }
93
94     /* Allocate temporary receive buffer. */
95     if(_xbt_replay_is_active()){
96       recv_buf_free = (char*) SMPI_SHARED_MALLOC(buf_size);
97     }else{
98       recv_buf_free = (char*) xbt_malloc(buf_size);
99     }
100     recv_buf = recv_buf_free - lb;
101     if (NULL == recv_buf_free) {
102         err = MPI_ERR_OTHER;
103         goto cleanup;
104     }
105    
106     /* allocate temporary buffer for results */
107     if(_xbt_replay_is_active()){
108       result_buf_free = (char*) SMPI_SHARED_MALLOC(buf_size);
109     }else{
110       result_buf_free = (char*) xbt_malloc(buf_size);
111     }
112     result_buf = result_buf_free - lb;
113    
114     /* copy local buffer into the temporary results */
115     err =smpi_datatype_copy(sbuf, count, dtype, result_buf, count, dtype);
116     if (MPI_SUCCESS != err) goto cleanup;
117    
118     /* figure out power of two mapping: grow until larger than
119        comm size, then go back one, to get the largest power of
120        two less than comm size */
121     while (tmp_size <= size) tmp_size <<= 1;
122     tmp_size >>= 1;
123     remain = size - tmp_size;
124    
125     /* If comm size is not a power of two, have the first "remain"
126        procs with an even rank send to rank + 1, leaving a power of
127        two procs to do the rest of the algorithm */
128     if (rank < 2 * remain) {
129         if ((rank & 1) == 0) {
130             smpi_mpi_send(result_buf, count, dtype, rank + 1, 
131                                     COLL_TAG_REDUCE_SCATTER,
132                                     comm);
133             /* we don't participate from here on out */
134             tmp_rank = -1;
135         } else {
136             smpi_mpi_recv(recv_buf, count, dtype, rank - 1,
137                                     COLL_TAG_REDUCE_SCATTER,
138                                     comm, MPI_STATUS_IGNORE);
139          
140             /* integrate their results into our temp results */
141             smpi_op_apply(op, recv_buf, result_buf, &count, &dtype);
142          
143             /* adjust rank to be the bottom "remain" ranks */
144             tmp_rank = rank / 2;
145         }
146     } else {
147         /* just need to adjust rank to show that the bottom "even
148            remain" ranks dropped out */
149         tmp_rank = rank - remain;
150     }
151    
152     /* For ranks not kicked out by the above code, perform the
153        recursive halving */
154     if (tmp_rank >= 0) {
155         int *tmp_disps = NULL, *tmp_rcounts = NULL;
156         int mask, send_index, recv_index, last_index;
157       
158         /* recalculate disps and rcounts to account for the
159            special "remainder" processes that are no longer doing
160            anything */
161         tmp_rcounts = (int*) xbt_malloc(tmp_size * sizeof(int));
162         if (NULL == tmp_rcounts) {
163             err = MPI_ERR_OTHER;
164             goto cleanup;
165         }
166         tmp_disps = (int*) xbt_malloc(tmp_size * sizeof(int));
167         if (NULL == tmp_disps) {
168             xbt_free(tmp_rcounts);
169             err = MPI_ERR_OTHER;
170             goto cleanup;
171         }
172
173         for (i = 0 ; i < tmp_size ; ++i) {
174             if (i < remain) {
175                 /* need to include old neighbor as well */
176                 tmp_rcounts[i] = rcounts[i * 2 + 1] + rcounts[i * 2];
177             } else {
178                 tmp_rcounts[i] = rcounts[i + remain];
179             }
180         }
181
182         tmp_disps[0] = 0;
183         for (i = 0; i < tmp_size - 1; ++i) {
184             tmp_disps[i + 1] = tmp_disps[i] + tmp_rcounts[i];
185         }
186
187         /* do the recursive halving communication.  Don't use the
188            dimension information on the communicator because I
189            think the information is invalidated by our "shrinking"
190            of the communicator */
191         mask = tmp_size >> 1;
192         send_index = recv_index = 0;
193         last_index = tmp_size;
194         while (mask > 0) {
195             int tmp_peer, peer, send_count, recv_count;
196             MPI_Request request;
197
198             tmp_peer = tmp_rank ^ mask;
199             peer = (tmp_peer < remain) ? tmp_peer * 2 + 1 : tmp_peer + remain;
200
201             /* figure out if we're sending, receiving, or both */
202             send_count = recv_count = 0;
203             if (tmp_rank < tmp_peer) {
204                 send_index = recv_index + mask;
205                 for (i = send_index ; i < last_index ; ++i) {
206                     send_count += tmp_rcounts[i];
207                 }
208                 for (i = recv_index ; i < send_index ; ++i) {
209                     recv_count += tmp_rcounts[i];
210                 }
211             } else {
212                 recv_index = send_index + mask;
213                 for (i = send_index ; i < recv_index ; ++i) {
214                     send_count += tmp_rcounts[i];
215                 }
216                 for (i = recv_index ; i < last_index ; ++i) {
217                     recv_count += tmp_rcounts[i];
218                 }
219             }
220
221             /* actual data transfer.  Send from result_buf,
222                receive into recv_buf */
223             if (send_count > 0 && recv_count != 0) {
224                 request=smpi_mpi_irecv(recv_buf + (ptrdiff_t)tmp_disps[recv_index] * extent,
225                                          recv_count, dtype, peer,
226                                          COLL_TAG_REDUCE_SCATTER,
227                                          comm);
228                 if (MPI_SUCCESS != err) {
229                     xbt_free(tmp_rcounts);
230                     xbt_free(tmp_disps);
231                     goto cleanup;
232                 }                                             
233             }
234             if (recv_count > 0 && send_count != 0) {
235                 smpi_mpi_send(result_buf + (ptrdiff_t)tmp_disps[send_index] * extent,
236                                         send_count, dtype, peer, 
237                                         COLL_TAG_REDUCE_SCATTER,
238                                         comm);
239                 if (MPI_SUCCESS != err) {
240                     xbt_free(tmp_rcounts);
241                     xbt_free(tmp_disps);
242                     goto cleanup;
243                 }                                             
244             }
245             if (send_count > 0 && recv_count != 0) {
246                 smpi_mpi_wait(&request, MPI_STATUS_IGNORE);
247             }
248
249             /* if we received something on this step, push it into
250                the results buffer */
251             if (recv_count > 0) {
252                 smpi_op_apply(op, 
253                                recv_buf + (ptrdiff_t)tmp_disps[recv_index] * extent, 
254                                result_buf + (ptrdiff_t)tmp_disps[recv_index] * extent,
255                                &recv_count, &dtype);
256             }
257
258             /* update for next iteration */
259             send_index = recv_index;
260             last_index = recv_index + mask;
261             mask >>= 1;
262         }
263
264         /* copy local results from results buffer into real receive buffer */
265         if (0 != rcounts[rank]) {
266             err = smpi_datatype_copy(result_buf + disps[rank] * extent,
267                                        rcounts[rank], dtype, 
268                                        rbuf, rcounts[rank], dtype);
269             if (MPI_SUCCESS != err) {
270                 xbt_free(tmp_rcounts);
271                 xbt_free(tmp_disps);
272                 goto cleanup;
273             }                                             
274         }
275
276         xbt_free(tmp_rcounts);
277         xbt_free(tmp_disps);
278     }
279
280     /* Now fix up the non-power of two case, by having the odd
281        procs send the even procs the proper results */
282     if (rank < (2 * remain)) {
283         if ((rank & 1) == 0) {
284             if (rcounts[rank]) {
285                 smpi_mpi_recv(rbuf, rcounts[rank], dtype, rank + 1,
286                                         COLL_TAG_REDUCE_SCATTER,
287                                         comm, MPI_STATUS_IGNORE);
288             }
289         } else {
290             if (rcounts[rank - 1]) {
291                 smpi_mpi_send(result_buf + disps[rank - 1] * extent,
292                                         rcounts[rank - 1], dtype, rank - 1,
293                                         COLL_TAG_REDUCE_SCATTER,
294                                         comm);
295             }
296         }            
297     }
298
299  cleanup:
300     if (NULL != disps) xbt_free(disps);
301     
302     if (!_xbt_replay_is_active()){
303       if (NULL != recv_buf_free) xbt_free(recv_buf_free);
304       if (NULL != result_buf_free) xbt_free(result_buf_free);
305     }else{
306       if (NULL != recv_buf_free) SMPI_SHARED_FREE(recv_buf_free);
307       if (NULL != result_buf_free) SMPI_SHARED_FREE(result_buf_free);
308     }
309     return err;
310 }
311
312 /* copied function (with appropriate renaming) ends here */
313
314
315 /*
316  *   smpi_coll_tuned_reduce_scatter_ompi_ring
317  *
318  *   Function:       Ring algorithm for reduce_scatter operation
319  *   Accepts:        Same as MPI_Reduce_scatter()
320  *   Returns:        MPI_SUCCESS or error code
321  *
322  *   Description:    Implements ring algorithm for reduce_scatter: 
323  *                   the block sizes defined in rcounts are exchanged and 
324  8                    updated until they reach proper destination.
325  *                   Algorithm requires 2 * max(rcounts) extra buffering
326  *
327  *   Limitations:    The algorithm DOES NOT preserve order of operations so it 
328  *                   can be used only for commutative operations.
329  *         Example on 5 nodes:
330  *         Initial state
331  *   #      0              1             2              3             4
332  *        [00]           [10]   ->     [20]           [30]           [40]
333  *        [01]           [11]          [21]  ->       [31]           [41]
334  *        [02]           [12]          [22]           [32]  ->       [42]
335  *    ->  [03]           [13]          [23]           [33]           [43] --> ..
336  *        [04]  ->       [14]          [24]           [34]           [44]
337  *
338  *        COMPUTATION PHASE
339  *         Step 0: rank r sends block (r-1) to rank (r+1) and 
340  *                 receives block (r+1) from rank (r-1) [with wraparound].
341  *   #      0              1             2              3             4
342  *        [00]           [10]        [10+20]   ->     [30]           [40]
343  *        [01]           [11]          [21]          [21+31]  ->     [41]
344  *    ->  [02]           [12]          [22]           [32]         [32+42] -->..
345  *      [43+03] ->       [13]          [23]           [33]           [43]
346  *        [04]         [04+14]  ->     [24]           [34]           [44]
347  *         
348  *         Step 1:
349  *   #      0              1             2              3             4
350  *        [00]           [10]        [10+20]       [10+20+30] ->     [40]
351  *    ->  [01]           [11]          [21]          [21+31]      [21+31+41] ->
352  *     [32+42+02] ->     [12]          [22]           [32]         [32+42] 
353  *        [03]        [43+03+13] ->    [23]           [33]           [43]
354  *        [04]         [04+14]      [04+14+24]  ->    [34]           [44]
355  *
356  *         Step 2:
357  *   #      0              1             2              3             4
358  *     -> [00]           [10]        [10+20]       [10+20+30]   [10+20+30+40] ->
359  *   [21+31+41+01]->     [11]          [21]          [21+31]      [21+31+41]
360  *     [32+42+02]   [32+42+02+12]->    [22]           [32]         [32+42] 
361  *        [03]        [43+03+13]   [43+03+13+23]->    [33]           [43]
362  *        [04]         [04+14]      [04+14+24]    [04+14+24+34] ->   [44]
363  *
364  *         Step 3:
365  *   #      0             1              2              3             4
366  * [10+20+30+40+00]     [10]         [10+20]       [10+20+30]   [10+20+30+40]
367  *  [21+31+41+01] [21+31+41+01+11]     [21]          [21+31]      [21+31+41]
368  *    [32+42+02]   [32+42+02+12] [32+42+02+12+22]     [32]         [32+42] 
369  *       [03]        [43+03+13]    [43+03+13+23] [43+03+13+23+33]    [43]
370  *       [04]         [04+14]       [04+14+24]    [04+14+24+34] [04+14+24+34+44]
371  *    DONE :)
372  *
373  */
374 int 
375 smpi_coll_tuned_reduce_scatter_ompi_ring(void *sbuf, void *rbuf, int *rcounts,
376                                           MPI_Datatype dtype,
377                                           MPI_Op op,
378                                           MPI_Comm comm
379                                           )
380 {
381     int ret, line, rank, size, i, k, recv_from, send_to, total_count, max_block_count;
382     int inbi, *displs = NULL;
383     char *tmpsend = NULL, *tmprecv = NULL, *accumbuf = NULL, *accumbuf_free = NULL;
384     char *inbuf_free[2] = {NULL, NULL}, *inbuf[2] = {NULL, NULL};
385     ptrdiff_t true_lb, true_extent, lb, extent, max_real_segsize;
386     MPI_Request reqs[2] = {NULL, NULL};
387
388     size = smpi_comm_size(comm);
389     rank = smpi_comm_rank(comm);
390
391     XBT_DEBUG(  "coll:tuned:reduce_scatter_ompi_ring rank %d, size %d", 
392                  rank, size);
393
394     /* Determine the maximum number of elements per node, 
395        corresponding block size, and displacements array.
396     */
397     displs = (int*) xbt_malloc(size * sizeof(int));
398     if (NULL == displs) { ret = -1; line = __LINE__; goto error_hndl; }
399     displs[0] = 0;
400     total_count = rcounts[0];
401     max_block_count = rcounts[0];
402     for (i = 1; i < size; i++) { 
403         displs[i] = total_count;
404         total_count += rcounts[i];
405         if (max_block_count < rcounts[i]) max_block_count = rcounts[i];
406     }
407       
408     /* Special case for size == 1 */
409     if (1 == size) {
410         if (MPI_IN_PLACE != sbuf) {
411             ret = smpi_datatype_copy((char*)sbuf, total_count, dtype, (char*)rbuf, total_count, dtype);
412             if (ret < 0) { line = __LINE__; goto error_hndl; }
413         }
414         xbt_free(displs);
415         return MPI_SUCCESS;
416     }
417
418     /* Allocate and initialize temporary buffers, we need:
419        - a temporary buffer to perform reduction (size total_count) since
420        rbuf can be of rcounts[rank] size.
421        - up to two temporary buffers used for communication/computation overlap.
422     */
423     smpi_datatype_extent(dtype, &lb, &extent);
424     smpi_datatype_extent(dtype, &true_lb, &true_extent);
425
426     max_real_segsize = true_extent + (ptrdiff_t)(max_block_count - 1) * extent;
427
428     accumbuf_free = (char*)xbt_malloc(true_extent + (ptrdiff_t)(total_count - 1) * extent);
429     if (NULL == accumbuf_free) { ret = -1; line = __LINE__; goto error_hndl; }
430     accumbuf = accumbuf_free - lb;
431
432     inbuf_free[0] = (char*)xbt_malloc(max_real_segsize);
433     if (NULL == inbuf_free[0]) { ret = -1; line = __LINE__; goto error_hndl; }
434     inbuf[0] = inbuf_free[0] - lb;
435     if (size > 2) {
436         inbuf_free[1] = (char*)xbt_malloc(max_real_segsize);
437         if (NULL == inbuf_free[1]) { ret = -1; line = __LINE__; goto error_hndl; }
438         inbuf[1] = inbuf_free[1] - lb;
439     }
440
441     /* Handle MPI_IN_PLACE for size > 1 */
442     if (MPI_IN_PLACE == sbuf) {
443         sbuf = rbuf;
444     }
445
446     ret = smpi_datatype_copy((char*)sbuf, total_count, dtype, accumbuf, total_count, dtype);
447     if (ret < 0) { line = __LINE__; goto error_hndl; }
448
449     /* Computation loop */
450
451     /* 
452        For each of the remote nodes:
453        - post irecv for block (r-2) from (r-1) with wrap around
454        - send block (r-1) to (r+1)
455        - in loop for every step k = 2 .. n
456        - post irecv for block (r - 1 + n - k) % n
457        - wait on block (r + n - k) % n to arrive
458        - compute on block (r + n - k ) % n
459        - send block (r + n - k) % n
460        - wait on block (r)
461        - compute on block (r)
462        - copy block (r) to rbuf
463        Note that we must be careful when computing the begining of buffers and
464        for send operations and computation we must compute the exact block size.
465     */
466     send_to = (rank + 1) % size;
467     recv_from = (rank + size - 1) % size;
468
469     inbi = 0;
470     /* Initialize first receive from the neighbor on the left */
471     reqs[inbi]=smpi_mpi_irecv(inbuf[inbi], max_block_count, dtype, recv_from,
472                              COLL_TAG_REDUCE_SCATTER, comm
473                              );
474     tmpsend = accumbuf + (ptrdiff_t)displs[recv_from] * extent;
475     smpi_mpi_send(tmpsend, rcounts[recv_from], dtype, send_to,
476                             COLL_TAG_REDUCE_SCATTER,
477                              comm);
478
479     for (k = 2; k < size; k++) {
480         const int prevblock = (rank + size - k) % size;
481       
482         inbi = inbi ^ 0x1;
483
484         /* Post irecv for the current block */
485         reqs[inbi]=smpi_mpi_irecv(inbuf[inbi], max_block_count, dtype, recv_from,
486                                  COLL_TAG_REDUCE_SCATTER, comm
487                                  );
488       
489         /* Wait on previous block to arrive */
490         smpi_mpi_wait(&reqs[inbi ^ 0x1], MPI_STATUS_IGNORE);
491       
492         /* Apply operation on previous block: result goes to rbuf
493            rbuf[prevblock] = inbuf[inbi ^ 0x1] (op) rbuf[prevblock]
494         */
495         tmprecv = accumbuf + (ptrdiff_t)displs[prevblock] * extent;
496         smpi_op_apply(op, inbuf[inbi ^ 0x1], tmprecv, &(rcounts[prevblock]), &dtype);
497       
498         /* send previous block to send_to */
499         smpi_mpi_send(tmprecv, rcounts[prevblock], dtype, send_to,
500                                 COLL_TAG_REDUCE_SCATTER,
501                                  comm);
502     }
503
504     /* Wait on the last block to arrive */
505     smpi_mpi_wait(&reqs[inbi], MPI_STATUS_IGNORE);
506
507     /* Apply operation on the last block (my block)
508        rbuf[rank] = inbuf[inbi] (op) rbuf[rank] */
509     tmprecv = accumbuf + (ptrdiff_t)displs[rank] * extent;
510     smpi_op_apply(op, inbuf[inbi], tmprecv, &(rcounts[rank]), &dtype);
511    
512     /* Copy result from tmprecv to rbuf */
513     ret = smpi_datatype_copy(tmprecv, rcounts[rank], dtype, (char*)rbuf, rcounts[rank], dtype);
514     if (ret < 0) { line = __LINE__; goto error_hndl; }
515
516     if (NULL != displs) xbt_free(displs);
517     if (NULL != accumbuf_free) xbt_free(accumbuf_free);
518     if (NULL != inbuf_free[0]) xbt_free(inbuf_free[0]);
519     if (NULL != inbuf_free[1]) xbt_free(inbuf_free[1]);
520
521     return MPI_SUCCESS;
522
523  error_hndl:
524     XBT_DEBUG( "%s:%4d\tRank %d Error occurred %d\n",
525                  __FILE__, line, rank, ret);
526     if (NULL != displs) xbt_free(displs);
527     if (NULL != accumbuf_free) xbt_free(accumbuf_free);
528     if (NULL != inbuf_free[0]) xbt_free(inbuf_free[0]);
529     if (NULL != inbuf_free[1]) xbt_free(inbuf_free[1]);
530     return ret;
531 }
532