Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
e188c0037019cf5d4779d2af827339197e353854
[simgrid.git] / src / smpi / colls / reduce_scatter-ompi.c
1 /* Copyright (c) 2013-2014. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 /*
8  * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
9  *                         University Research and Technology
10  *                         Corporation.  All rights reserved.
11  * Copyright (c) 2004-2012 The University of Tennessee and The University
12  *                         of Tennessee Research Foundation.  All rights
13  *                         reserved.
14  * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
15  *                         University of Stuttgart.  All rights reserved.
16  * Copyright (c) 2004-2005 The Regents of the University of California.
17  *                         All rights reserved.
18  * Copyright (c) 2008      Sun Microsystems, Inc.  All rights reserved.
19  * Copyright (c) 2009      University of Houston. All rights reserved.
20  *
21  * Additional copyrights may follow
22  */
23
24 #include "colls_private.h"
25 #include "coll_tuned_topo.h"
26 #include "xbt/replay.h"
27
28 /*
29  * Recursive-halving function is (*mostly*) copied from the BASIC coll module.
30  * I have removed the part which handles "large" message sizes 
31  * (non-overlapping version of reduce_Scatter).
32  */
33
34 /* copied function (with appropriate renaming) starts here */
35
36 /*
37  *  reduce_scatter_ompi_basic_recursivehalving
38  *
39  *  Function:   - reduce scatter implementation using recursive-halving 
40  *                algorithm
41  *  Accepts:    - same as MPI_Reduce_scatter()
42  *  Returns:    - MPI_SUCCESS or error code
43  *  Limitation: - Works only for commutative operations.
44  */
45 int
46 smpi_coll_tuned_reduce_scatter_ompi_basic_recursivehalving(void *sbuf, 
47                                                             void *rbuf, 
48                                                             int *rcounts,
49                                                             MPI_Datatype dtype,
50                                                             MPI_Op op,
51                                                             MPI_Comm comm
52                                                             )
53 {
54     int i, rank, size, count, err = MPI_SUCCESS;
55     int tmp_size=1, remain = 0, tmp_rank, *disps = NULL;
56     ptrdiff_t true_lb, true_extent, lb, extent, buf_size;
57     char *recv_buf = NULL, *recv_buf_free = NULL;
58     char *result_buf = NULL, *result_buf_free = NULL;
59    
60     /* Initialize */
61     rank = smpi_comm_rank(comm);
62     size = smpi_comm_size(comm);
63    
64     XBT_DEBUG("coll:tuned:reduce_scatter_ompi_basic_recursivehalving, rank %d", rank);
65     if(!smpi_op_is_commute(op))
66       THROWF(arg_error,0, " reduce_scatter ompi_basic_recursivehalving can only be used for commutative operations! ");
67
68     /* Find displacements and the like */
69     disps = (int*) xbt_malloc(sizeof(int) * size);
70     if (NULL == disps) return MPI_ERR_OTHER;
71
72     disps[0] = 0;
73     for (i = 0; i < (size - 1); ++i) {
74         disps[i + 1] = disps[i] + rcounts[i];
75     }
76     count = disps[size - 1] + rcounts[size - 1];
77
78     /* short cut the trivial case */
79     if (0 == count) {
80         xbt_free(disps);
81         return MPI_SUCCESS;
82     }
83
84     /* get datatype information */
85     smpi_datatype_extent(dtype, &lb, &extent);
86     smpi_datatype_extent(dtype, &true_lb, &true_extent);
87     buf_size = true_extent + (ptrdiff_t)(count - 1) * extent;
88
89     /* Handle MPI_IN_PLACE */
90     if (MPI_IN_PLACE == sbuf) {
91         sbuf = rbuf;
92     }
93
94     /* Allocate temporary receive buffer. */
95 #ifndef WIN32
96     if(_xbt_replay_is_active()){
97       recv_buf_free = (char*) SMPI_SHARED_MALLOC(buf_size);
98     }else
99 #endif
100       recv_buf_free = (char*) xbt_malloc(buf_size);
101
102     recv_buf = recv_buf_free - lb;
103     if (NULL == recv_buf_free) {
104         err = MPI_ERR_OTHER;
105         goto cleanup;
106     }
107    
108     /* allocate temporary buffer for results */
109 #ifndef WIN32
110     if(_xbt_replay_is_active()){
111       result_buf_free = (char*) SMPI_SHARED_MALLOC(buf_size);
112     }else
113 #endif
114       result_buf_free = (char*) xbt_malloc(buf_size);
115
116     result_buf = result_buf_free - lb;
117    
118     /* copy local buffer into the temporary results */
119     err =smpi_datatype_copy(sbuf, count, dtype, result_buf, count, dtype);
120     if (MPI_SUCCESS != err) goto cleanup;
121    
122     /* figure out power of two mapping: grow until larger than
123        comm size, then go back one, to get the largest power of
124        two less than comm size */
125     while (tmp_size <= size) tmp_size <<= 1;
126     tmp_size >>= 1;
127     remain = size - tmp_size;
128    
129     /* If comm size is not a power of two, have the first "remain"
130        procs with an even rank send to rank + 1, leaving a power of
131        two procs to do the rest of the algorithm */
132     if (rank < 2 * remain) {
133         if ((rank & 1) == 0) {
134             smpi_mpi_send(result_buf, count, dtype, rank + 1, 
135                                     COLL_TAG_REDUCE_SCATTER,
136                                     comm);
137             /* we don't participate from here on out */
138             tmp_rank = -1;
139         } else {
140             smpi_mpi_recv(recv_buf, count, dtype, rank - 1,
141                                     COLL_TAG_REDUCE_SCATTER,
142                                     comm, MPI_STATUS_IGNORE);
143          
144             /* integrate their results into our temp results */
145             smpi_op_apply(op, recv_buf, result_buf, &count, &dtype);
146          
147             /* adjust rank to be the bottom "remain" ranks */
148             tmp_rank = rank / 2;
149         }
150     } else {
151         /* just need to adjust rank to show that the bottom "even
152            remain" ranks dropped out */
153         tmp_rank = rank - remain;
154     }
155    
156     /* For ranks not kicked out by the above code, perform the
157        recursive halving */
158     if (tmp_rank >= 0) {
159         int *tmp_disps = NULL, *tmp_rcounts = NULL;
160         int mask, send_index, recv_index, last_index;
161       
162         /* recalculate disps and rcounts to account for the
163            special "remainder" processes that are no longer doing
164            anything */
165         tmp_rcounts = (int*) xbt_malloc(tmp_size * sizeof(int));
166         if (NULL == tmp_rcounts) {
167             err = MPI_ERR_OTHER;
168             goto cleanup;
169         }
170         tmp_disps = (int*) xbt_malloc(tmp_size * sizeof(int));
171         if (NULL == tmp_disps) {
172             xbt_free(tmp_rcounts);
173             err = MPI_ERR_OTHER;
174             goto cleanup;
175         }
176
177         for (i = 0 ; i < tmp_size ; ++i) {
178             if (i < remain) {
179                 /* need to include old neighbor as well */
180                 tmp_rcounts[i] = rcounts[i * 2 + 1] + rcounts[i * 2];
181             } else {
182                 tmp_rcounts[i] = rcounts[i + remain];
183             }
184         }
185
186         tmp_disps[0] = 0;
187         for (i = 0; i < tmp_size - 1; ++i) {
188             tmp_disps[i + 1] = tmp_disps[i] + tmp_rcounts[i];
189         }
190
191         /* do the recursive halving communication.  Don't use the
192            dimension information on the communicator because I
193            think the information is invalidated by our "shrinking"
194            of the communicator */
195         mask = tmp_size >> 1;
196         send_index = recv_index = 0;
197         last_index = tmp_size;
198         while (mask > 0) {
199             int tmp_peer, peer, send_count, recv_count;
200             MPI_Request request;
201
202             tmp_peer = tmp_rank ^ mask;
203             peer = (tmp_peer < remain) ? tmp_peer * 2 + 1 : tmp_peer + remain;
204
205             /* figure out if we're sending, receiving, or both */
206             send_count = recv_count = 0;
207             if (tmp_rank < tmp_peer) {
208                 send_index = recv_index + mask;
209                 for (i = send_index ; i < last_index ; ++i) {
210                     send_count += tmp_rcounts[i];
211                 }
212                 for (i = recv_index ; i < send_index ; ++i) {
213                     recv_count += tmp_rcounts[i];
214                 }
215             } else {
216                 recv_index = send_index + mask;
217                 for (i = send_index ; i < recv_index ; ++i) {
218                     send_count += tmp_rcounts[i];
219                 }
220                 for (i = recv_index ; i < last_index ; ++i) {
221                     recv_count += tmp_rcounts[i];
222                 }
223             }
224
225             /* actual data transfer.  Send from result_buf,
226                receive into recv_buf */
227             if (send_count > 0 && recv_count != 0) {
228                 request=smpi_mpi_irecv(recv_buf + (ptrdiff_t)tmp_disps[recv_index] * extent,
229                                          recv_count, dtype, peer,
230                                          COLL_TAG_REDUCE_SCATTER,
231                                          comm);
232                 if (MPI_SUCCESS != err) {
233                     xbt_free(tmp_rcounts);
234                     xbt_free(tmp_disps);
235                     goto cleanup;
236                 }                                             
237             }
238             if (recv_count > 0 && send_count != 0) {
239                 smpi_mpi_send(result_buf + (ptrdiff_t)tmp_disps[send_index] * extent,
240                                         send_count, dtype, peer, 
241                                         COLL_TAG_REDUCE_SCATTER,
242                                         comm);
243                 if (MPI_SUCCESS != err) {
244                     xbt_free(tmp_rcounts);
245                     xbt_free(tmp_disps);
246                     goto cleanup;
247                 }                                             
248             }
249             if (send_count > 0 && recv_count != 0) {
250                 smpi_mpi_wait(&request, MPI_STATUS_IGNORE);
251             }
252
253             /* if we received something on this step, push it into
254                the results buffer */
255             if (recv_count > 0) {
256                 smpi_op_apply(op, 
257                                recv_buf + (ptrdiff_t)tmp_disps[recv_index] * extent, 
258                                result_buf + (ptrdiff_t)tmp_disps[recv_index] * extent,
259                                &recv_count, &dtype);
260             }
261
262             /* update for next iteration */
263             send_index = recv_index;
264             last_index = recv_index + mask;
265             mask >>= 1;
266         }
267
268         /* copy local results from results buffer into real receive buffer */
269         if (0 != rcounts[rank]) {
270             err = smpi_datatype_copy(result_buf + disps[rank] * extent,
271                                        rcounts[rank], dtype, 
272                                        rbuf, rcounts[rank], dtype);
273             if (MPI_SUCCESS != err) {
274                 xbt_free(tmp_rcounts);
275                 xbt_free(tmp_disps);
276                 goto cleanup;
277             }                                             
278         }
279
280         xbt_free(tmp_rcounts);
281         xbt_free(tmp_disps);
282     }
283
284     /* Now fix up the non-power of two case, by having the odd
285        procs send the even procs the proper results */
286     if (rank < (2 * remain)) {
287         if ((rank & 1) == 0) {
288             if (rcounts[rank]) {
289                 smpi_mpi_recv(rbuf, rcounts[rank], dtype, rank + 1,
290                                         COLL_TAG_REDUCE_SCATTER,
291                                         comm, MPI_STATUS_IGNORE);
292             }
293         } else {
294             if (rcounts[rank - 1]) {
295                 smpi_mpi_send(result_buf + disps[rank - 1] * extent,
296                                         rcounts[rank - 1], dtype, rank - 1,
297                                         COLL_TAG_REDUCE_SCATTER,
298                                         comm);
299             }
300         }            
301     }
302
303  cleanup:
304     if (NULL != disps) xbt_free(disps);
305     if (!_xbt_replay_is_active()){
306       if (NULL != recv_buf_free) xbt_free(recv_buf_free);
307       if (NULL != result_buf_free) xbt_free(result_buf_free);
308     }
309 #ifndef WIN32
310     else{
311       if (NULL != recv_buf_free) SMPI_SHARED_FREE(recv_buf_free);
312       if (NULL != result_buf_free) SMPI_SHARED_FREE(result_buf_free);
313     }
314 #endif
315     return err;
316 }
317
318 /* copied function (with appropriate renaming) ends here */
319
320
321 /*
322  *   smpi_coll_tuned_reduce_scatter_ompi_ring
323  *
324  *   Function:       Ring algorithm for reduce_scatter operation
325  *   Accepts:        Same as MPI_Reduce_scatter()
326  *   Returns:        MPI_SUCCESS or error code
327  *
328  *   Description:    Implements ring algorithm for reduce_scatter: 
329  *                   the block sizes defined in rcounts are exchanged and 
330  8                    updated until they reach proper destination.
331  *                   Algorithm requires 2 * max(rcounts) extra buffering
332  *
333  *   Limitations:    The algorithm DOES NOT preserve order of operations so it 
334  *                   can be used only for commutative operations.
335  *         Example on 5 nodes:
336  *         Initial state
337  *   #      0              1             2              3             4
338  *        [00]           [10]   ->     [20]           [30]           [40]
339  *        [01]           [11]          [21]  ->       [31]           [41]
340  *        [02]           [12]          [22]           [32]  ->       [42]
341  *    ->  [03]           [13]          [23]           [33]           [43] --> ..
342  *        [04]  ->       [14]          [24]           [34]           [44]
343  *
344  *        COMPUTATION PHASE
345  *         Step 0: rank r sends block (r-1) to rank (r+1) and 
346  *                 receives block (r+1) from rank (r-1) [with wraparound].
347  *   #      0              1             2              3             4
348  *        [00]           [10]        [10+20]   ->     [30]           [40]
349  *        [01]           [11]          [21]          [21+31]  ->     [41]
350  *    ->  [02]           [12]          [22]           [32]         [32+42] -->..
351  *      [43+03] ->       [13]          [23]           [33]           [43]
352  *        [04]         [04+14]  ->     [24]           [34]           [44]
353  *         
354  *         Step 1:
355  *   #      0              1             2              3             4
356  *        [00]           [10]        [10+20]       [10+20+30] ->     [40]
357  *    ->  [01]           [11]          [21]          [21+31]      [21+31+41] ->
358  *     [32+42+02] ->     [12]          [22]           [32]         [32+42] 
359  *        [03]        [43+03+13] ->    [23]           [33]           [43]
360  *        [04]         [04+14]      [04+14+24]  ->    [34]           [44]
361  *
362  *         Step 2:
363  *   #      0              1             2              3             4
364  *     -> [00]           [10]        [10+20]       [10+20+30]   [10+20+30+40] ->
365  *   [21+31+41+01]->     [11]          [21]          [21+31]      [21+31+41]
366  *     [32+42+02]   [32+42+02+12]->    [22]           [32]         [32+42] 
367  *        [03]        [43+03+13]   [43+03+13+23]->    [33]           [43]
368  *        [04]         [04+14]      [04+14+24]    [04+14+24+34] ->   [44]
369  *
370  *         Step 3:
371  *   #      0             1              2              3             4
372  * [10+20+30+40+00]     [10]         [10+20]       [10+20+30]   [10+20+30+40]
373  *  [21+31+41+01] [21+31+41+01+11]     [21]          [21+31]      [21+31+41]
374  *    [32+42+02]   [32+42+02+12] [32+42+02+12+22]     [32]         [32+42] 
375  *       [03]        [43+03+13]    [43+03+13+23] [43+03+13+23+33]    [43]
376  *       [04]         [04+14]       [04+14+24]    [04+14+24+34] [04+14+24+34+44]
377  *    DONE :)
378  *
379  */
380 int 
381 smpi_coll_tuned_reduce_scatter_ompi_ring(void *sbuf, void *rbuf, int *rcounts,
382                                           MPI_Datatype dtype,
383                                           MPI_Op op,
384                                           MPI_Comm comm
385                                           )
386 {
387     int ret, line, rank, size, i, k, recv_from, send_to, total_count, max_block_count;
388     int inbi, *displs = NULL;
389     char *tmpsend = NULL, *tmprecv = NULL, *accumbuf = NULL, *accumbuf_free = NULL;
390     char *inbuf_free[2] = {NULL, NULL}, *inbuf[2] = {NULL, NULL};
391     ptrdiff_t true_lb, true_extent, lb, extent, max_real_segsize;
392     MPI_Request reqs[2] = {NULL, NULL};
393
394     size = smpi_comm_size(comm);
395     rank = smpi_comm_rank(comm);
396
397     XBT_DEBUG(  "coll:tuned:reduce_scatter_ompi_ring rank %d, size %d", 
398                  rank, size);
399
400     /* Determine the maximum number of elements per node, 
401        corresponding block size, and displacements array.
402     */
403     displs = (int*) xbt_malloc(size * sizeof(int));
404     if (NULL == displs) { ret = -1; line = __LINE__; goto error_hndl; }
405     displs[0] = 0;
406     total_count = rcounts[0];
407     max_block_count = rcounts[0];
408     for (i = 1; i < size; i++) { 
409         displs[i] = total_count;
410         total_count += rcounts[i];
411         if (max_block_count < rcounts[i]) max_block_count = rcounts[i];
412     }
413       
414     /* Special case for size == 1 */
415     if (1 == size) {
416         if (MPI_IN_PLACE != sbuf) {
417             ret = smpi_datatype_copy((char*)sbuf, total_count, dtype, (char*)rbuf, total_count, dtype);
418             if (ret < 0) { line = __LINE__; goto error_hndl; }
419         }
420         xbt_free(displs);
421         return MPI_SUCCESS;
422     }
423
424     /* Allocate and initialize temporary buffers, we need:
425        - a temporary buffer to perform reduction (size total_count) since
426        rbuf can be of rcounts[rank] size.
427        - up to two temporary buffers used for communication/computation overlap.
428     */
429     smpi_datatype_extent(dtype, &lb, &extent);
430     smpi_datatype_extent(dtype, &true_lb, &true_extent);
431
432     max_real_segsize = true_extent + (ptrdiff_t)(max_block_count - 1) * extent;
433
434     accumbuf_free = (char*)xbt_malloc(true_extent + (ptrdiff_t)(total_count - 1) * extent);
435     if (NULL == accumbuf_free) { ret = -1; line = __LINE__; goto error_hndl; }
436     accumbuf = accumbuf_free - lb;
437
438     inbuf_free[0] = (char*)xbt_malloc(max_real_segsize);
439     if (NULL == inbuf_free[0]) { ret = -1; line = __LINE__; goto error_hndl; }
440     inbuf[0] = inbuf_free[0] - lb;
441     if (size > 2) {
442         inbuf_free[1] = (char*)xbt_malloc(max_real_segsize);
443         if (NULL == inbuf_free[1]) { ret = -1; line = __LINE__; goto error_hndl; }
444         inbuf[1] = inbuf_free[1] - lb;
445     }
446
447     /* Handle MPI_IN_PLACE for size > 1 */
448     if (MPI_IN_PLACE == sbuf) {
449         sbuf = rbuf;
450     }
451
452     ret = smpi_datatype_copy((char*)sbuf, total_count, dtype, accumbuf, total_count, dtype);
453     if (ret < 0) { line = __LINE__; goto error_hndl; }
454
455     /* Computation loop */
456
457     /* 
458        For each of the remote nodes:
459        - post irecv for block (r-2) from (r-1) with wrap around
460        - send block (r-1) to (r+1)
461        - in loop for every step k = 2 .. n
462        - post irecv for block (r - 1 + n - k) % n
463        - wait on block (r + n - k) % n to arrive
464        - compute on block (r + n - k ) % n
465        - send block (r + n - k) % n
466        - wait on block (r)
467        - compute on block (r)
468        - copy block (r) to rbuf
469        Note that we must be careful when computing the begining of buffers and
470        for send operations and computation we must compute the exact block size.
471     */
472     send_to = (rank + 1) % size;
473     recv_from = (rank + size - 1) % size;
474
475     inbi = 0;
476     /* Initialize first receive from the neighbor on the left */
477     reqs[inbi]=smpi_mpi_irecv(inbuf[inbi], max_block_count, dtype, recv_from,
478                              COLL_TAG_REDUCE_SCATTER, comm
479                              );
480     tmpsend = accumbuf + (ptrdiff_t)displs[recv_from] * extent;
481     smpi_mpi_send(tmpsend, rcounts[recv_from], dtype, send_to,
482                             COLL_TAG_REDUCE_SCATTER,
483                              comm);
484
485     for (k = 2; k < size; k++) {
486         const int prevblock = (rank + size - k) % size;
487       
488         inbi = inbi ^ 0x1;
489
490         /* Post irecv for the current block */
491         reqs[inbi]=smpi_mpi_irecv(inbuf[inbi], max_block_count, dtype, recv_from,
492                                  COLL_TAG_REDUCE_SCATTER, comm
493                                  );
494       
495         /* Wait on previous block to arrive */
496         smpi_mpi_wait(&reqs[inbi ^ 0x1], MPI_STATUS_IGNORE);
497       
498         /* Apply operation on previous block: result goes to rbuf
499            rbuf[prevblock] = inbuf[inbi ^ 0x1] (op) rbuf[prevblock]
500         */
501         tmprecv = accumbuf + (ptrdiff_t)displs[prevblock] * extent;
502         smpi_op_apply(op, inbuf[inbi ^ 0x1], tmprecv, &(rcounts[prevblock]), &dtype);
503       
504         /* send previous block to send_to */
505         smpi_mpi_send(tmprecv, rcounts[prevblock], dtype, send_to,
506                                 COLL_TAG_REDUCE_SCATTER,
507                                  comm);
508     }
509
510     /* Wait on the last block to arrive */
511     smpi_mpi_wait(&reqs[inbi], MPI_STATUS_IGNORE);
512
513     /* Apply operation on the last block (my block)
514        rbuf[rank] = inbuf[inbi] (op) rbuf[rank] */
515     tmprecv = accumbuf + (ptrdiff_t)displs[rank] * extent;
516     smpi_op_apply(op, inbuf[inbi], tmprecv, &(rcounts[rank]), &dtype);
517    
518     /* Copy result from tmprecv to rbuf */
519     ret = smpi_datatype_copy(tmprecv, rcounts[rank], dtype, (char*)rbuf, rcounts[rank], dtype);
520     if (ret < 0) { line = __LINE__; goto error_hndl; }
521
522     if (NULL != displs) xbt_free(displs);
523     if (NULL != accumbuf_free) xbt_free(accumbuf_free);
524     if (NULL != inbuf_free[0]) xbt_free(inbuf_free[0]);
525     if (NULL != inbuf_free[1]) xbt_free(inbuf_free[1]);
526
527     return MPI_SUCCESS;
528
529  error_hndl:
530     XBT_DEBUG( "%s:%4d\tRank %d Error occurred %d\n",
531                  __FILE__, line, rank, ret);
532     if (NULL != displs) xbt_free(displs);
533     if (NULL != accumbuf_free) xbt_free(accumbuf_free);
534     if (NULL != inbuf_free[0]) xbt_free(inbuf_free[0]);
535     if (NULL != inbuf_free[1]) xbt_free(inbuf_free[1]);
536     return ret;
537 }
538