Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Win build does not support mmap, or shared_malloc
[simgrid.git] / src / smpi / colls / reduce_scatter-ompi.c
1 /* Copyright (c) 2013-2014. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 /*
8  * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
9  *                         University Research and Technology
10  *                         Corporation.  All rights reserved.
11  * Copyright (c) 2004-2012 The University of Tennessee and The University
12  *                         of Tennessee Research Foundation.  All rights
13  *                         reserved.
14  * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
15  *                         University of Stuttgart.  All rights reserved.
16  * Copyright (c) 2004-2005 The Regents of the University of California.
17  *                         All rights reserved.
18  * Copyright (c) 2008      Sun Microsystems, Inc.  All rights reserved.
19  * Copyright (c) 2009      University of Houston. All rights reserved.
20  *
21  * Additional copyrights may follow
22  */
23
24 #include "colls_private.h"
25 #include "coll_tuned_topo.h"
26 #include "xbt/replay.h"
27
28 /*
29  * Recursive-halving function is (*mostly*) copied from the BASIC coll module.
30  * I have removed the part which handles "large" message sizes 
31  * (non-overlapping version of reduce_Scatter).
32  */
33
34 /* copied function (with appropriate renaming) starts here */
35
36 /*
37  *  reduce_scatter_ompi_basic_recursivehalving
38  *
39  *  Function:   - reduce scatter implementation using recursive-halving 
40  *                algorithm
41  *  Accepts:    - same as MPI_Reduce_scatter()
42  *  Returns:    - MPI_SUCCESS or error code
43  *  Limitation: - Works only for commutative operations.
44  */
45 int
46 smpi_coll_tuned_reduce_scatter_ompi_basic_recursivehalving(void *sbuf, 
47                                                             void *rbuf, 
48                                                             int *rcounts,
49                                                             MPI_Datatype dtype,
50                                                             MPI_Op op,
51                                                             MPI_Comm comm
52                                                             )
53 {
54     int i, rank, size, count, err = MPI_SUCCESS;
55     int tmp_size=1, remain = 0, tmp_rank, *disps = NULL;
56     ptrdiff_t true_lb, true_extent, lb, extent, buf_size;
57     char *recv_buf = NULL, *recv_buf_free = NULL;
58     char *result_buf = NULL, *result_buf_free = NULL;
59    
60     /* Initialize */
61     rank = smpi_comm_rank(comm);
62     size = smpi_comm_size(comm);
63    
64     XBT_DEBUG("coll:tuned:reduce_scatter_ompi_basic_recursivehalving, rank %d", rank);
65     if(!smpi_op_is_commute(op))
66       THROWF(arg_error,0, " reduce_scatter ompi_basic_recursivehalving can only be used for commutative operations! ");
67
68     /* Find displacements and the like */
69     disps = (int*) xbt_malloc(sizeof(int) * size);
70     if (NULL == disps) return MPI_ERR_OTHER;
71
72     disps[0] = 0;
73     for (i = 0; i < (size - 1); ++i) {
74         disps[i + 1] = disps[i] + rcounts[i];
75     }
76     count = disps[size - 1] + rcounts[size - 1];
77
78     /* short cut the trivial case */
79     if (0 == count) {
80         xbt_free(disps);
81         return MPI_SUCCESS;
82     }
83
84     /* get datatype information */
85     smpi_datatype_extent(dtype, &lb, &extent);
86     smpi_datatype_extent(dtype, &true_lb, &true_extent);
87     buf_size = true_extent + (ptrdiff_t)(count - 1) * extent;
88
89     /* Handle MPI_IN_PLACE */
90     if (MPI_IN_PLACE == sbuf) {
91         sbuf = rbuf;
92     }
93
94     /* Allocate temporary receive buffer. */
95 #ifndef WIN32
96     if(_xbt_replay_is_active()){
97       recv_buf_free = (char*) SMPI_SHARED_MALLOC(buf_size);
98     }else
99 #else
100       recv_buf_free = (char*) xbt_malloc(buf_size);
101 #endif
102     recv_buf = recv_buf_free - lb;
103     if (NULL == recv_buf_free) {
104         err = MPI_ERR_OTHER;
105         goto cleanup;
106     }
107    
108     /* allocate temporary buffer for results */
109 #ifndef WIN32
110     if(_xbt_replay_is_active()){
111       result_buf_free = (char*) SMPI_SHARED_MALLOC(buf_size);
112     }else
113 #else
114       result_buf_free = (char*) xbt_malloc(buf_size);
115 #endif
116     result_buf = result_buf_free - lb;
117    
118     /* copy local buffer into the temporary results */
119     err =smpi_datatype_copy(sbuf, count, dtype, result_buf, count, dtype);
120     if (MPI_SUCCESS != err) goto cleanup;
121    
122     /* figure out power of two mapping: grow until larger than
123        comm size, then go back one, to get the largest power of
124        two less than comm size */
125     while (tmp_size <= size) tmp_size <<= 1;
126     tmp_size >>= 1;
127     remain = size - tmp_size;
128    
129     /* If comm size is not a power of two, have the first "remain"
130        procs with an even rank send to rank + 1, leaving a power of
131        two procs to do the rest of the algorithm */
132     if (rank < 2 * remain) {
133         if ((rank & 1) == 0) {
134             smpi_mpi_send(result_buf, count, dtype, rank + 1, 
135                                     COLL_TAG_REDUCE_SCATTER,
136                                     comm);
137             /* we don't participate from here on out */
138             tmp_rank = -1;
139         } else {
140             smpi_mpi_recv(recv_buf, count, dtype, rank - 1,
141                                     COLL_TAG_REDUCE_SCATTER,
142                                     comm, MPI_STATUS_IGNORE);
143          
144             /* integrate their results into our temp results */
145             smpi_op_apply(op, recv_buf, result_buf, &count, &dtype);
146          
147             /* adjust rank to be the bottom "remain" ranks */
148             tmp_rank = rank / 2;
149         }
150     } else {
151         /* just need to adjust rank to show that the bottom "even
152            remain" ranks dropped out */
153         tmp_rank = rank - remain;
154     }
155    
156     /* For ranks not kicked out by the above code, perform the
157        recursive halving */
158     if (tmp_rank >= 0) {
159         int *tmp_disps = NULL, *tmp_rcounts = NULL;
160         int mask, send_index, recv_index, last_index;
161       
162         /* recalculate disps and rcounts to account for the
163            special "remainder" processes that are no longer doing
164            anything */
165         tmp_rcounts = (int*) xbt_malloc(tmp_size * sizeof(int));
166         if (NULL == tmp_rcounts) {
167             err = MPI_ERR_OTHER;
168             goto cleanup;
169         }
170         tmp_disps = (int*) xbt_malloc(tmp_size * sizeof(int));
171         if (NULL == tmp_disps) {
172             xbt_free(tmp_rcounts);
173             err = MPI_ERR_OTHER;
174             goto cleanup;
175         }
176
177         for (i = 0 ; i < tmp_size ; ++i) {
178             if (i < remain) {
179                 /* need to include old neighbor as well */
180                 tmp_rcounts[i] = rcounts[i * 2 + 1] + rcounts[i * 2];
181             } else {
182                 tmp_rcounts[i] = rcounts[i + remain];
183             }
184         }
185
186         tmp_disps[0] = 0;
187         for (i = 0; i < tmp_size - 1; ++i) {
188             tmp_disps[i + 1] = tmp_disps[i] + tmp_rcounts[i];
189         }
190
191         /* do the recursive halving communication.  Don't use the
192            dimension information on the communicator because I
193            think the information is invalidated by our "shrinking"
194            of the communicator */
195         mask = tmp_size >> 1;
196         send_index = recv_index = 0;
197         last_index = tmp_size;
198         while (mask > 0) {
199             int tmp_peer, peer, send_count, recv_count;
200             MPI_Request request;
201
202             tmp_peer = tmp_rank ^ mask;
203             peer = (tmp_peer < remain) ? tmp_peer * 2 + 1 : tmp_peer + remain;
204
205             /* figure out if we're sending, receiving, or both */
206             send_count = recv_count = 0;
207             if (tmp_rank < tmp_peer) {
208                 send_index = recv_index + mask;
209                 for (i = send_index ; i < last_index ; ++i) {
210                     send_count += tmp_rcounts[i];
211                 }
212                 for (i = recv_index ; i < send_index ; ++i) {
213                     recv_count += tmp_rcounts[i];
214                 }
215             } else {
216                 recv_index = send_index + mask;
217                 for (i = send_index ; i < recv_index ; ++i) {
218                     send_count += tmp_rcounts[i];
219                 }
220                 for (i = recv_index ; i < last_index ; ++i) {
221                     recv_count += tmp_rcounts[i];
222                 }
223             }
224
225             /* actual data transfer.  Send from result_buf,
226                receive into recv_buf */
227             if (send_count > 0 && recv_count != 0) {
228                 request=smpi_mpi_irecv(recv_buf + (ptrdiff_t)tmp_disps[recv_index] * extent,
229                                          recv_count, dtype, peer,
230                                          COLL_TAG_REDUCE_SCATTER,
231                                          comm);
232                 if (MPI_SUCCESS != err) {
233                     xbt_free(tmp_rcounts);
234                     xbt_free(tmp_disps);
235                     goto cleanup;
236                 }                                             
237             }
238             if (recv_count > 0 && send_count != 0) {
239                 smpi_mpi_send(result_buf + (ptrdiff_t)tmp_disps[send_index] * extent,
240                                         send_count, dtype, peer, 
241                                         COLL_TAG_REDUCE_SCATTER,
242                                         comm);
243                 if (MPI_SUCCESS != err) {
244                     xbt_free(tmp_rcounts);
245                     xbt_free(tmp_disps);
246                     goto cleanup;
247                 }                                             
248             }
249             if (send_count > 0 && recv_count != 0) {
250                 smpi_mpi_wait(&request, MPI_STATUS_IGNORE);
251             }
252
253             /* if we received something on this step, push it into
254                the results buffer */
255             if (recv_count > 0) {
256                 smpi_op_apply(op, 
257                                recv_buf + (ptrdiff_t)tmp_disps[recv_index] * extent, 
258                                result_buf + (ptrdiff_t)tmp_disps[recv_index] * extent,
259                                &recv_count, &dtype);
260             }
261
262             /* update for next iteration */
263             send_index = recv_index;
264             last_index = recv_index + mask;
265             mask >>= 1;
266         }
267
268         /* copy local results from results buffer into real receive buffer */
269         if (0 != rcounts[rank]) {
270             err = smpi_datatype_copy(result_buf + disps[rank] * extent,
271                                        rcounts[rank], dtype, 
272                                        rbuf, rcounts[rank], dtype);
273             if (MPI_SUCCESS != err) {
274                 xbt_free(tmp_rcounts);
275                 xbt_free(tmp_disps);
276                 goto cleanup;
277             }                                             
278         }
279
280         xbt_free(tmp_rcounts);
281         xbt_free(tmp_disps);
282     }
283
284     /* Now fix up the non-power of two case, by having the odd
285        procs send the even procs the proper results */
286     if (rank < (2 * remain)) {
287         if ((rank & 1) == 0) {
288             if (rcounts[rank]) {
289                 smpi_mpi_recv(rbuf, rcounts[rank], dtype, rank + 1,
290                                         COLL_TAG_REDUCE_SCATTER,
291                                         comm, MPI_STATUS_IGNORE);
292             }
293         } else {
294             if (rcounts[rank - 1]) {
295                 smpi_mpi_send(result_buf + disps[rank - 1] * extent,
296                                         rcounts[rank - 1], dtype, rank - 1,
297                                         COLL_TAG_REDUCE_SCATTER,
298                                         comm);
299             }
300         }            
301     }
302
303  cleanup:
304     if (NULL != disps) xbt_free(disps);
305 #ifdef WIN32
306     if (!_xbt_replay_is_active()){
307       if (NULL != recv_buf_free) xbt_free(recv_buf_free);
308       if (NULL != result_buf_free) xbt_free(result_buf_free);
309     }else{
310 #else
311       if (NULL != recv_buf_free) SMPI_SHARED_FREE(recv_buf_free);
312       if (NULL != result_buf_free) SMPI_SHARED_FREE(result_buf_free);
313 #endif
314 #ifdef WIN32
315     }
316 #endif
317     return err;
318 }
319
320 /* copied function (with appropriate renaming) ends here */
321
322
323 /*
324  *   smpi_coll_tuned_reduce_scatter_ompi_ring
325  *
326  *   Function:       Ring algorithm for reduce_scatter operation
327  *   Accepts:        Same as MPI_Reduce_scatter()
328  *   Returns:        MPI_SUCCESS or error code
329  *
330  *   Description:    Implements ring algorithm for reduce_scatter: 
331  *                   the block sizes defined in rcounts are exchanged and 
332  8                    updated until they reach proper destination.
333  *                   Algorithm requires 2 * max(rcounts) extra buffering
334  *
335  *   Limitations:    The algorithm DOES NOT preserve order of operations so it 
336  *                   can be used only for commutative operations.
337  *         Example on 5 nodes:
338  *         Initial state
339  *   #      0              1             2              3             4
340  *        [00]           [10]   ->     [20]           [30]           [40]
341  *        [01]           [11]          [21]  ->       [31]           [41]
342  *        [02]           [12]          [22]           [32]  ->       [42]
343  *    ->  [03]           [13]          [23]           [33]           [43] --> ..
344  *        [04]  ->       [14]          [24]           [34]           [44]
345  *
346  *        COMPUTATION PHASE
347  *         Step 0: rank r sends block (r-1) to rank (r+1) and 
348  *                 receives block (r+1) from rank (r-1) [with wraparound].
349  *   #      0              1             2              3             4
350  *        [00]           [10]        [10+20]   ->     [30]           [40]
351  *        [01]           [11]          [21]          [21+31]  ->     [41]
352  *    ->  [02]           [12]          [22]           [32]         [32+42] -->..
353  *      [43+03] ->       [13]          [23]           [33]           [43]
354  *        [04]         [04+14]  ->     [24]           [34]           [44]
355  *         
356  *         Step 1:
357  *   #      0              1             2              3             4
358  *        [00]           [10]        [10+20]       [10+20+30] ->     [40]
359  *    ->  [01]           [11]          [21]          [21+31]      [21+31+41] ->
360  *     [32+42+02] ->     [12]          [22]           [32]         [32+42] 
361  *        [03]        [43+03+13] ->    [23]           [33]           [43]
362  *        [04]         [04+14]      [04+14+24]  ->    [34]           [44]
363  *
364  *         Step 2:
365  *   #      0              1             2              3             4
366  *     -> [00]           [10]        [10+20]       [10+20+30]   [10+20+30+40] ->
367  *   [21+31+41+01]->     [11]          [21]          [21+31]      [21+31+41]
368  *     [32+42+02]   [32+42+02+12]->    [22]           [32]         [32+42] 
369  *        [03]        [43+03+13]   [43+03+13+23]->    [33]           [43]
370  *        [04]         [04+14]      [04+14+24]    [04+14+24+34] ->   [44]
371  *
372  *         Step 3:
373  *   #      0             1              2              3             4
374  * [10+20+30+40+00]     [10]         [10+20]       [10+20+30]   [10+20+30+40]
375  *  [21+31+41+01] [21+31+41+01+11]     [21]          [21+31]      [21+31+41]
376  *    [32+42+02]   [32+42+02+12] [32+42+02+12+22]     [32]         [32+42] 
377  *       [03]        [43+03+13]    [43+03+13+23] [43+03+13+23+33]    [43]
378  *       [04]         [04+14]       [04+14+24]    [04+14+24+34] [04+14+24+34+44]
379  *    DONE :)
380  *
381  */
382 int 
383 smpi_coll_tuned_reduce_scatter_ompi_ring(void *sbuf, void *rbuf, int *rcounts,
384                                           MPI_Datatype dtype,
385                                           MPI_Op op,
386                                           MPI_Comm comm
387                                           )
388 {
389     int ret, line, rank, size, i, k, recv_from, send_to, total_count, max_block_count;
390     int inbi, *displs = NULL;
391     char *tmpsend = NULL, *tmprecv = NULL, *accumbuf = NULL, *accumbuf_free = NULL;
392     char *inbuf_free[2] = {NULL, NULL}, *inbuf[2] = {NULL, NULL};
393     ptrdiff_t true_lb, true_extent, lb, extent, max_real_segsize;
394     MPI_Request reqs[2] = {NULL, NULL};
395
396     size = smpi_comm_size(comm);
397     rank = smpi_comm_rank(comm);
398
399     XBT_DEBUG(  "coll:tuned:reduce_scatter_ompi_ring rank %d, size %d", 
400                  rank, size);
401
402     /* Determine the maximum number of elements per node, 
403        corresponding block size, and displacements array.
404     */
405     displs = (int*) xbt_malloc(size * sizeof(int));
406     if (NULL == displs) { ret = -1; line = __LINE__; goto error_hndl; }
407     displs[0] = 0;
408     total_count = rcounts[0];
409     max_block_count = rcounts[0];
410     for (i = 1; i < size; i++) { 
411         displs[i] = total_count;
412         total_count += rcounts[i];
413         if (max_block_count < rcounts[i]) max_block_count = rcounts[i];
414     }
415       
416     /* Special case for size == 1 */
417     if (1 == size) {
418         if (MPI_IN_PLACE != sbuf) {
419             ret = smpi_datatype_copy((char*)sbuf, total_count, dtype, (char*)rbuf, total_count, dtype);
420             if (ret < 0) { line = __LINE__; goto error_hndl; }
421         }
422         xbt_free(displs);
423         return MPI_SUCCESS;
424     }
425
426     /* Allocate and initialize temporary buffers, we need:
427        - a temporary buffer to perform reduction (size total_count) since
428        rbuf can be of rcounts[rank] size.
429        - up to two temporary buffers used for communication/computation overlap.
430     */
431     smpi_datatype_extent(dtype, &lb, &extent);
432     smpi_datatype_extent(dtype, &true_lb, &true_extent);
433
434     max_real_segsize = true_extent + (ptrdiff_t)(max_block_count - 1) * extent;
435
436     accumbuf_free = (char*)xbt_malloc(true_extent + (ptrdiff_t)(total_count - 1) * extent);
437     if (NULL == accumbuf_free) { ret = -1; line = __LINE__; goto error_hndl; }
438     accumbuf = accumbuf_free - lb;
439
440     inbuf_free[0] = (char*)xbt_malloc(max_real_segsize);
441     if (NULL == inbuf_free[0]) { ret = -1; line = __LINE__; goto error_hndl; }
442     inbuf[0] = inbuf_free[0] - lb;
443     if (size > 2) {
444         inbuf_free[1] = (char*)xbt_malloc(max_real_segsize);
445         if (NULL == inbuf_free[1]) { ret = -1; line = __LINE__; goto error_hndl; }
446         inbuf[1] = inbuf_free[1] - lb;
447     }
448
449     /* Handle MPI_IN_PLACE for size > 1 */
450     if (MPI_IN_PLACE == sbuf) {
451         sbuf = rbuf;
452     }
453
454     ret = smpi_datatype_copy((char*)sbuf, total_count, dtype, accumbuf, total_count, dtype);
455     if (ret < 0) { line = __LINE__; goto error_hndl; }
456
457     /* Computation loop */
458
459     /* 
460        For each of the remote nodes:
461        - post irecv for block (r-2) from (r-1) with wrap around
462        - send block (r-1) to (r+1)
463        - in loop for every step k = 2 .. n
464        - post irecv for block (r - 1 + n - k) % n
465        - wait on block (r + n - k) % n to arrive
466        - compute on block (r + n - k ) % n
467        - send block (r + n - k) % n
468        - wait on block (r)
469        - compute on block (r)
470        - copy block (r) to rbuf
471        Note that we must be careful when computing the begining of buffers and
472        for send operations and computation we must compute the exact block size.
473     */
474     send_to = (rank + 1) % size;
475     recv_from = (rank + size - 1) % size;
476
477     inbi = 0;
478     /* Initialize first receive from the neighbor on the left */
479     reqs[inbi]=smpi_mpi_irecv(inbuf[inbi], max_block_count, dtype, recv_from,
480                              COLL_TAG_REDUCE_SCATTER, comm
481                              );
482     tmpsend = accumbuf + (ptrdiff_t)displs[recv_from] * extent;
483     smpi_mpi_send(tmpsend, rcounts[recv_from], dtype, send_to,
484                             COLL_TAG_REDUCE_SCATTER,
485                              comm);
486
487     for (k = 2; k < size; k++) {
488         const int prevblock = (rank + size - k) % size;
489       
490         inbi = inbi ^ 0x1;
491
492         /* Post irecv for the current block */
493         reqs[inbi]=smpi_mpi_irecv(inbuf[inbi], max_block_count, dtype, recv_from,
494                                  COLL_TAG_REDUCE_SCATTER, comm
495                                  );
496       
497         /* Wait on previous block to arrive */
498         smpi_mpi_wait(&reqs[inbi ^ 0x1], MPI_STATUS_IGNORE);
499       
500         /* Apply operation on previous block: result goes to rbuf
501            rbuf[prevblock] = inbuf[inbi ^ 0x1] (op) rbuf[prevblock]
502         */
503         tmprecv = accumbuf + (ptrdiff_t)displs[prevblock] * extent;
504         smpi_op_apply(op, inbuf[inbi ^ 0x1], tmprecv, &(rcounts[prevblock]), &dtype);
505       
506         /* send previous block to send_to */
507         smpi_mpi_send(tmprecv, rcounts[prevblock], dtype, send_to,
508                                 COLL_TAG_REDUCE_SCATTER,
509                                  comm);
510     }
511
512     /* Wait on the last block to arrive */
513     smpi_mpi_wait(&reqs[inbi], MPI_STATUS_IGNORE);
514
515     /* Apply operation on the last block (my block)
516        rbuf[rank] = inbuf[inbi] (op) rbuf[rank] */
517     tmprecv = accumbuf + (ptrdiff_t)displs[rank] * extent;
518     smpi_op_apply(op, inbuf[inbi], tmprecv, &(rcounts[rank]), &dtype);
519    
520     /* Copy result from tmprecv to rbuf */
521     ret = smpi_datatype_copy(tmprecv, rcounts[rank], dtype, (char*)rbuf, rcounts[rank], dtype);
522     if (ret < 0) { line = __LINE__; goto error_hndl; }
523
524     if (NULL != displs) xbt_free(displs);
525     if (NULL != accumbuf_free) xbt_free(accumbuf_free);
526     if (NULL != inbuf_free[0]) xbt_free(inbuf_free[0]);
527     if (NULL != inbuf_free[1]) xbt_free(inbuf_free[1]);
528
529     return MPI_SUCCESS;
530
531  error_hndl:
532     XBT_DEBUG( "%s:%4d\tRank %d Error occurred %d\n",
533                  __FILE__, line, rank, ret);
534     if (NULL != displs) xbt_free(displs);
535     if (NULL != accumbuf_free) xbt_free(accumbuf_free);
536     if (NULL != inbuf_free[0]) xbt_free(inbuf_free[0]);
537     if (NULL != inbuf_free[1]) xbt_free(inbuf_free[1]);
538     return ret;
539 }
540