Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Sonar improvements
[simgrid.git] / src / smpi / colls / reduce_scatter-ompi.c
1 /* Copyright (c) 2013-2014. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 /*
8  * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
9  *                         University Research and Technology
10  *                         Corporation.  All rights reserved.
11  * Copyright (c) 2004-2012 The University of Tennessee and The University
12  *                         of Tennessee Research Foundation.  All rights
13  *                         reserved.
14  * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
15  *                         University of Stuttgart.  All rights reserved.
16  * Copyright (c) 2004-2005 The Regents of the University of California.
17  *                         All rights reserved.
18  * Copyright (c) 2008      Sun Microsystems, Inc.  All rights reserved.
19  * Copyright (c) 2009      University of Houston. All rights reserved.
20  *
21  * Additional copyrights may follow
22  */
23
24 #include "colls_private.h"
25 #include "coll_tuned_topo.h"
26 #include "xbt/replay.h"
27
28 /*
29  * Recursive-halving function is (*mostly*) copied from the BASIC coll module.
30  * I have removed the part which handles "large" message sizes 
31  * (non-overlapping version of reduce_Scatter).
32  */
33
34 /* copied function (with appropriate renaming) starts here */
35
36 /*
37  *  reduce_scatter_ompi_basic_recursivehalving
38  *
39  *  Function:   - reduce scatter implementation using recursive-halving 
40  *                algorithm
41  *  Accepts:    - same as MPI_Reduce_scatter()
42  *  Returns:    - MPI_SUCCESS or error code
43  *  Limitation: - Works only for commutative operations.
44  */
45 int
46 smpi_coll_tuned_reduce_scatter_ompi_basic_recursivehalving(void *sbuf, 
47                                                             void *rbuf, 
48                                                             int *rcounts,
49                                                             MPI_Datatype dtype,
50                                                             MPI_Op op,
51                                                             MPI_Comm comm
52                                                             )
53 {
54     int i, rank, size, count, err = MPI_SUCCESS;
55     int tmp_size=1, remain = 0, tmp_rank, *disps = NULL;
56     ptrdiff_t true_lb, true_extent, lb, extent, buf_size;
57     char *recv_buf = NULL, *recv_buf_free = NULL;
58     char *result_buf = NULL, *result_buf_free = NULL;
59    
60     /* Initialize */
61     rank = smpi_comm_rank(comm);
62     size = smpi_comm_size(comm);
63    
64     XBT_DEBUG("coll:tuned:reduce_scatter_ompi_basic_recursivehalving, rank %d", rank);
65     if(!smpi_op_is_commute(op))
66       THROWF(arg_error,0, " reduce_scatter ompi_basic_recursivehalving can only be used for commutative operations! ");
67
68     /* Find displacements and the like */
69     disps = (int*) xbt_malloc(sizeof(int) * size);
70     if (NULL == disps) return MPI_ERR_OTHER;
71
72     disps[0] = 0;
73     for (i = 0; i < (size - 1); ++i) {
74         disps[i + 1] = disps[i] + rcounts[i];
75     }
76     count = disps[size - 1] + rcounts[size - 1];
77
78     /* short cut the trivial case */
79     if (0 == count) {
80         xbt_free(disps);
81         return MPI_SUCCESS;
82     }
83
84     /* get datatype information */
85     smpi_datatype_extent(dtype, &lb, &extent);
86     smpi_datatype_extent(dtype, &true_lb, &true_extent);
87     buf_size = true_extent + (ptrdiff_t)(count - 1) * extent;
88
89     /* Handle MPI_IN_PLACE */
90     if (MPI_IN_PLACE == sbuf) {
91         sbuf = rbuf;
92     }
93
94     /* Allocate temporary receive buffer. */
95     recv_buf_free = (char*) smpi_get_tmp_recvbuffer(buf_size);
96
97     recv_buf = recv_buf_free - lb;
98     if (NULL == recv_buf_free) {
99         err = MPI_ERR_OTHER;
100         goto cleanup;
101     }
102    
103     /* allocate temporary buffer for results */
104     result_buf_free = (char*) smpi_get_tmp_sendbuffer(buf_size);
105
106     result_buf = result_buf_free - lb;
107    
108     /* copy local buffer into the temporary results */
109     err =smpi_datatype_copy(sbuf, count, dtype, result_buf, count, dtype);
110     if (MPI_SUCCESS != err) goto cleanup;
111    
112     /* figure out power of two mapping: grow until larger than
113        comm size, then go back one, to get the largest power of
114        two less than comm size */
115     while (tmp_size <= size) tmp_size <<= 1;
116     tmp_size >>= 1;
117     remain = size - tmp_size;
118    
119     /* If comm size is not a power of two, have the first "remain"
120        procs with an even rank send to rank + 1, leaving a power of
121        two procs to do the rest of the algorithm */
122     if (rank < 2 * remain) {
123         if ((rank & 1) == 0) {
124             smpi_mpi_send(result_buf, count, dtype, rank + 1, 
125                                     COLL_TAG_REDUCE_SCATTER,
126                                     comm);
127             /* we don't participate from here on out */
128             tmp_rank = -1;
129         } else {
130             smpi_mpi_recv(recv_buf, count, dtype, rank - 1,
131                                     COLL_TAG_REDUCE_SCATTER,
132                                     comm, MPI_STATUS_IGNORE);
133          
134             /* integrate their results into our temp results */
135             smpi_op_apply(op, recv_buf, result_buf, &count, &dtype);
136          
137             /* adjust rank to be the bottom "remain" ranks */
138             tmp_rank = rank / 2;
139         }
140     } else {
141         /* just need to adjust rank to show that the bottom "even
142            remain" ranks dropped out */
143         tmp_rank = rank - remain;
144     }
145    
146     /* For ranks not kicked out by the above code, perform the
147        recursive halving */
148     if (tmp_rank >= 0) {
149         int *tmp_disps = NULL, *tmp_rcounts = NULL;
150         int mask, send_index, recv_index, last_index;
151       
152         /* recalculate disps and rcounts to account for the
153            special "remainder" processes that are no longer doing
154            anything */
155         tmp_rcounts = (int*) xbt_malloc(tmp_size * sizeof(int));
156         if (NULL == tmp_rcounts) {
157             err = MPI_ERR_OTHER;
158             goto cleanup;
159         }
160         tmp_disps = (int*) xbt_malloc(tmp_size * sizeof(int));
161         if (NULL == tmp_disps) {
162             xbt_free(tmp_rcounts);
163             err = MPI_ERR_OTHER;
164             goto cleanup;
165         }
166
167         for (i = 0 ; i < tmp_size ; ++i) {
168             if (i < remain) {
169                 /* need to include old neighbor as well */
170                 tmp_rcounts[i] = rcounts[i * 2 + 1] + rcounts[i * 2];
171             } else {
172                 tmp_rcounts[i] = rcounts[i + remain];
173             }
174         }
175
176         tmp_disps[0] = 0;
177         for (i = 0; i < tmp_size - 1; ++i) {
178             tmp_disps[i + 1] = tmp_disps[i] + tmp_rcounts[i];
179         }
180
181         /* do the recursive halving communication.  Don't use the
182            dimension information on the communicator because I
183            think the information is invalidated by our "shrinking"
184            of the communicator */
185         mask = tmp_size >> 1;
186         send_index = recv_index = 0;
187         last_index = tmp_size;
188         while (mask > 0) {
189             int tmp_peer, peer, send_count, recv_count;
190             MPI_Request request;
191
192             tmp_peer = tmp_rank ^ mask;
193             peer = (tmp_peer < remain) ? tmp_peer * 2 + 1 : tmp_peer + remain;
194
195             /* figure out if we're sending, receiving, or both */
196             send_count = recv_count = 0;
197             if (tmp_rank < tmp_peer) {
198                 send_index = recv_index + mask;
199                 for (i = send_index ; i < last_index ; ++i) {
200                     send_count += tmp_rcounts[i];
201                 }
202                 for (i = recv_index ; i < send_index ; ++i) {
203                     recv_count += tmp_rcounts[i];
204                 }
205             } else {
206                 recv_index = send_index + mask;
207                 for (i = send_index ; i < recv_index ; ++i) {
208                     send_count += tmp_rcounts[i];
209                 }
210                 for (i = recv_index ; i < last_index ; ++i) {
211                     recv_count += tmp_rcounts[i];
212                 }
213             }
214
215             /* actual data transfer.  Send from result_buf,
216                receive into recv_buf */
217             if (send_count > 0 && recv_count != 0) {
218                 request=smpi_mpi_irecv(recv_buf + (ptrdiff_t)tmp_disps[recv_index] * extent,
219                                          recv_count, dtype, peer,
220                                          COLL_TAG_REDUCE_SCATTER,
221                                          comm);
222                 if (MPI_SUCCESS != err) {
223                     xbt_free(tmp_rcounts);
224                     xbt_free(tmp_disps);
225                     goto cleanup;
226                 }                                             
227             }
228             if (recv_count > 0 && send_count != 0) {
229                 smpi_mpi_send(result_buf + (ptrdiff_t)tmp_disps[send_index] * extent,
230                                         send_count, dtype, peer, 
231                                         COLL_TAG_REDUCE_SCATTER,
232                                         comm);
233                 if (MPI_SUCCESS != err) {
234                     xbt_free(tmp_rcounts);
235                     xbt_free(tmp_disps);
236                     goto cleanup;
237                 }                                             
238             }
239             if (send_count > 0 && recv_count != 0) {
240                 smpi_mpi_wait(&request, MPI_STATUS_IGNORE);
241             }
242
243             /* if we received something on this step, push it into
244                the results buffer */
245             if (recv_count > 0) {
246                 smpi_op_apply(op, 
247                                recv_buf + (ptrdiff_t)tmp_disps[recv_index] * extent, 
248                                result_buf + (ptrdiff_t)tmp_disps[recv_index] * extent,
249                                &recv_count, &dtype);
250             }
251
252             /* update for next iteration */
253             send_index = recv_index;
254             last_index = recv_index + mask;
255             mask >>= 1;
256         }
257
258         /* copy local results from results buffer into real receive buffer */
259         if (0 != rcounts[rank]) {
260             err = smpi_datatype_copy(result_buf + disps[rank] * extent,
261                                        rcounts[rank], dtype, 
262                                        rbuf, rcounts[rank], dtype);
263             if (MPI_SUCCESS != err) {
264                 xbt_free(tmp_rcounts);
265                 xbt_free(tmp_disps);
266                 goto cleanup;
267             }                                             
268         }
269
270         xbt_free(tmp_rcounts);
271         xbt_free(tmp_disps);
272     }
273
274     /* Now fix up the non-power of two case, by having the odd
275        procs send the even procs the proper results */
276     if (rank < (2 * remain)) {
277         if ((rank & 1) == 0) {
278             if (rcounts[rank]) {
279                 smpi_mpi_recv(rbuf, rcounts[rank], dtype, rank + 1,
280                                         COLL_TAG_REDUCE_SCATTER,
281                                         comm, MPI_STATUS_IGNORE);
282             }
283         } else {
284             if (rcounts[rank - 1]) {
285                 smpi_mpi_send(result_buf + disps[rank - 1] * extent,
286                                         rcounts[rank - 1], dtype, rank - 1,
287                                         COLL_TAG_REDUCE_SCATTER,
288                                         comm);
289             }
290         }            
291     }
292
293  cleanup:
294     if (NULL != disps) xbt_free(disps);
295     if (NULL != recv_buf_free) smpi_free_tmp_buffer(recv_buf_free);
296     if (NULL != result_buf_free) smpi_free_tmp_buffer(result_buf_free);
297
298     return err;
299 }
300
301 /* copied function (with appropriate renaming) ends here */
302
303
304 /*
305  *   smpi_coll_tuned_reduce_scatter_ompi_ring
306  *
307  *   Function:       Ring algorithm for reduce_scatter operation
308  *   Accepts:        Same as MPI_Reduce_scatter()
309  *   Returns:        MPI_SUCCESS or error code
310  *
311  *   Description:    Implements ring algorithm for reduce_scatter: 
312  *                   the block sizes defined in rcounts are exchanged and 
313  8                    updated until they reach proper destination.
314  *                   Algorithm requires 2 * max(rcounts) extra buffering
315  *
316  *   Limitations:    The algorithm DOES NOT preserve order of operations so it 
317  *                   can be used only for commutative operations.
318  *         Example on 5 nodes:
319  *         Initial state
320  *   #      0              1             2              3             4
321  *        [00]           [10]   ->     [20]           [30]           [40]
322  *        [01]           [11]          [21]  ->       [31]           [41]
323  *        [02]           [12]          [22]           [32]  ->       [42]
324  *    ->  [03]           [13]          [23]           [33]           [43] --> ..
325  *        [04]  ->       [14]          [24]           [34]           [44]
326  *
327  *        COMPUTATION PHASE
328  *         Step 0: rank r sends block (r-1) to rank (r+1) and 
329  *                 receives block (r+1) from rank (r-1) [with wraparound].
330  *   #      0              1             2              3             4
331  *        [00]           [10]        [10+20]   ->     [30]           [40]
332  *        [01]           [11]          [21]          [21+31]  ->     [41]
333  *    ->  [02]           [12]          [22]           [32]         [32+42] -->..
334  *      [43+03] ->       [13]          [23]           [33]           [43]
335  *        [04]         [04+14]  ->     [24]           [34]           [44]
336  *         
337  *         Step 1:
338  *   #      0              1             2              3             4
339  *        [00]           [10]        [10+20]       [10+20+30] ->     [40]
340  *    ->  [01]           [11]          [21]          [21+31]      [21+31+41] ->
341  *     [32+42+02] ->     [12]          [22]           [32]         [32+42] 
342  *        [03]        [43+03+13] ->    [23]           [33]           [43]
343  *        [04]         [04+14]      [04+14+24]  ->    [34]           [44]
344  *
345  *         Step 2:
346  *   #      0              1             2              3             4
347  *     -> [00]           [10]        [10+20]       [10+20+30]   [10+20+30+40] ->
348  *   [21+31+41+01]->     [11]          [21]          [21+31]      [21+31+41]
349  *     [32+42+02]   [32+42+02+12]->    [22]           [32]         [32+42] 
350  *        [03]        [43+03+13]   [43+03+13+23]->    [33]           [43]
351  *        [04]         [04+14]      [04+14+24]    [04+14+24+34] ->   [44]
352  *
353  *         Step 3:
354  *   #      0             1              2              3             4
355  * [10+20+30+40+00]     [10]         [10+20]       [10+20+30]   [10+20+30+40]
356  *  [21+31+41+01] [21+31+41+01+11]     [21]          [21+31]      [21+31+41]
357  *    [32+42+02]   [32+42+02+12] [32+42+02+12+22]     [32]         [32+42] 
358  *       [03]        [43+03+13]    [43+03+13+23] [43+03+13+23+33]    [43]
359  *       [04]         [04+14]       [04+14+24]    [04+14+24+34] [04+14+24+34+44]
360  *    DONE :)
361  *
362  */
363 int 
364 smpi_coll_tuned_reduce_scatter_ompi_ring(void *sbuf, void *rbuf, int *rcounts,
365                                           MPI_Datatype dtype,
366                                           MPI_Op op,
367                                           MPI_Comm comm
368                                           )
369 {
370     int ret, line, rank, size, i, k, recv_from, send_to, total_count, max_block_count;
371     int inbi, *displs = NULL;
372     char *tmpsend = NULL, *tmprecv = NULL, *accumbuf = NULL, *accumbuf_free = NULL;
373     char *inbuf_free[2] = {NULL, NULL}, *inbuf[2] = {NULL, NULL};
374     ptrdiff_t true_lb, true_extent, lb, extent, max_real_segsize;
375     MPI_Request reqs[2] = {NULL, NULL};
376
377     size = smpi_comm_size(comm);
378     rank = smpi_comm_rank(comm);
379
380     XBT_DEBUG(  "coll:tuned:reduce_scatter_ompi_ring rank %d, size %d", 
381                  rank, size);
382
383     /* Determine the maximum number of elements per node, 
384        corresponding block size, and displacements array.
385     */
386     displs = (int*) xbt_malloc(size * sizeof(int));
387     if (NULL == displs) { ret = -1; line = __LINE__; goto error_hndl; }
388     displs[0] = 0;
389     total_count = rcounts[0];
390     max_block_count = rcounts[0];
391     for (i = 1; i < size; i++) { 
392         displs[i] = total_count;
393         total_count += rcounts[i];
394         if (max_block_count < rcounts[i]) max_block_count = rcounts[i];
395     }
396       
397     /* Special case for size == 1 */
398     if (1 == size) {
399         if (MPI_IN_PLACE != sbuf) {
400             ret = smpi_datatype_copy((char*)sbuf, total_count, dtype, (char*)rbuf, total_count, dtype);
401             if (ret < 0) { line = __LINE__; goto error_hndl; }
402         }
403         xbt_free(displs);
404         return MPI_SUCCESS;
405     }
406
407     /* Allocate and initialize temporary buffers, we need:
408        - a temporary buffer to perform reduction (size total_count) since
409        rbuf can be of rcounts[rank] size.
410        - up to two temporary buffers used for communication/computation overlap.
411     */
412     smpi_datatype_extent(dtype, &lb, &extent);
413     smpi_datatype_extent(dtype, &true_lb, &true_extent);
414
415     max_real_segsize = true_extent + (ptrdiff_t)(max_block_count - 1) * extent;
416
417     accumbuf_free = (char*)smpi_get_tmp_recvbuffer(true_extent + (ptrdiff_t)(total_count - 1) * extent);
418     if (NULL == accumbuf_free) { ret = -1; line = __LINE__; goto error_hndl; }
419     accumbuf = accumbuf_free - lb;
420
421     inbuf_free[0] = (char*)smpi_get_tmp_sendbuffer(max_real_segsize);
422     if (NULL == inbuf_free[0]) { ret = -1; line = __LINE__; goto error_hndl; }
423     inbuf[0] = inbuf_free[0] - lb;
424     if (size > 2) {
425         inbuf_free[1] = (char*)smpi_get_tmp_sendbuffer(max_real_segsize);
426         if (NULL == inbuf_free[1]) { ret = -1; line = __LINE__; goto error_hndl; }
427         inbuf[1] = inbuf_free[1] - lb;
428     }
429
430     /* Handle MPI_IN_PLACE for size > 1 */
431     if (MPI_IN_PLACE == sbuf) {
432         sbuf = rbuf;
433     }
434
435     ret = smpi_datatype_copy((char*)sbuf, total_count, dtype, accumbuf, total_count, dtype);
436     if (ret < 0) { line = __LINE__; goto error_hndl; }
437
438     /* Computation loop */
439
440     /* 
441        For each of the remote nodes:
442        - post irecv for block (r-2) from (r-1) with wrap around
443        - send block (r-1) to (r+1)
444        - in loop for every step k = 2 .. n
445        - post irecv for block (r - 1 + n - k) % n
446        - wait on block (r + n - k) % n to arrive
447        - compute on block (r + n - k ) % n
448        - send block (r + n - k) % n
449        - wait on block (r)
450        - compute on block (r)
451        - copy block (r) to rbuf
452        Note that we must be careful when computing the begining of buffers and
453        for send operations and computation we must compute the exact block size.
454     */
455     send_to = (rank + 1) % size;
456     recv_from = (rank + size - 1) % size;
457
458     inbi = 0;
459     /* Initialize first receive from the neighbor on the left */
460     reqs[inbi]=smpi_mpi_irecv(inbuf[inbi], max_block_count, dtype, recv_from,
461                              COLL_TAG_REDUCE_SCATTER, comm
462                              );
463     tmpsend = accumbuf + (ptrdiff_t)displs[recv_from] * extent;
464     smpi_mpi_send(tmpsend, rcounts[recv_from], dtype, send_to,
465                             COLL_TAG_REDUCE_SCATTER,
466                              comm);
467
468     for (k = 2; k < size; k++) {
469         const int prevblock = (rank + size - k) % size;
470       
471         inbi = inbi ^ 0x1;
472
473         /* Post irecv for the current block */
474         reqs[inbi]=smpi_mpi_irecv(inbuf[inbi], max_block_count, dtype, recv_from,
475                                  COLL_TAG_REDUCE_SCATTER, comm
476                                  );
477       
478         /* Wait on previous block to arrive */
479         smpi_mpi_wait(&reqs[inbi ^ 0x1], MPI_STATUS_IGNORE);
480       
481         /* Apply operation on previous block: result goes to rbuf
482            rbuf[prevblock] = inbuf[inbi ^ 0x1] (op) rbuf[prevblock]
483         */
484         tmprecv = accumbuf + (ptrdiff_t)displs[prevblock] * extent;
485         smpi_op_apply(op, inbuf[inbi ^ 0x1], tmprecv, &(rcounts[prevblock]), &dtype);
486       
487         /* send previous block to send_to */
488         smpi_mpi_send(tmprecv, rcounts[prevblock], dtype, send_to,
489                                 COLL_TAG_REDUCE_SCATTER,
490                                  comm);
491     }
492
493     /* Wait on the last block to arrive */
494     smpi_mpi_wait(&reqs[inbi], MPI_STATUS_IGNORE);
495
496     /* Apply operation on the last block (my block)
497        rbuf[rank] = inbuf[inbi] (op) rbuf[rank] */
498     tmprecv = accumbuf + (ptrdiff_t)displs[rank] * extent;
499     smpi_op_apply(op, inbuf[inbi], tmprecv, &(rcounts[rank]), &dtype);
500    
501     /* Copy result from tmprecv to rbuf */
502     ret = smpi_datatype_copy(tmprecv, rcounts[rank], dtype, (char*)rbuf, rcounts[rank], dtype);
503     if (ret < 0) { line = __LINE__; goto error_hndl; }
504
505     if (NULL != displs) xbt_free(displs);
506     if (NULL != accumbuf_free) smpi_free_tmp_buffer(accumbuf_free);
507     if (NULL != inbuf_free[0]) smpi_free_tmp_buffer(inbuf_free[0]);
508     if (NULL != inbuf_free[1]) smpi_free_tmp_buffer(inbuf_free[1]);
509
510     return MPI_SUCCESS;
511
512  error_hndl:
513     XBT_DEBUG( "%s:%4d\tRank %d Error occurred %d\n",
514                  __FILE__, line, rank, ret);
515     if (NULL != displs) xbt_free(displs);
516     if (NULL != accumbuf_free) smpi_free_tmp_buffer(accumbuf_free);
517     if (NULL != inbuf_free[0]) smpi_free_tmp_buffer(inbuf_free[0]);
518     if (NULL != inbuf_free[1]) smpi_free_tmp_buffer(inbuf_free[1]);
519     return ret;
520 }
521