Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Move all smpi colls to cpp.
[simgrid.git] / src / smpi / colls / bcast-ompi-pipeline.cpp
1 /* Copyright (c) 2013-2014. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7  #include "colls_private.h"
8  #include "coll_tuned_topo.h"
9
10 #define MAXTREEFANOUT 32
11
12
13 int smpi_coll_tuned_bcast_ompi_pipeline( void* buffer,
14                                       int original_count, 
15                                       MPI_Datatype datatype, 
16                                       int root,
17                                       MPI_Comm comm)
18 {
19     int count_by_segment = original_count;
20     size_t type_size;
21     size_t segsize =1024  << 7;
22     //mca_coll_tuned_module_t *tuned_module = (mca_coll_tuned_module_t*) module;
23     //mca_coll_tuned_comm_t *data = tuned_module->tuned_data;
24     
25 //    return ompi_coll_tuned_bcast_intra_generic( buffer, count, datatype, root, comm, module,
26 //                                                count_by_segment, data->cached_pipeline );
27     ompi_coll_tree_t * tree = ompi_coll_tuned_topo_build_chain( 1, comm, root );
28     int i;
29     int rank, size;
30     int segindex;
31     int num_segments; /* Number of segments */
32     int sendcount;    /* number of elements sent in this segment */ 
33     size_t realsegsize;
34     char *tmpbuf;
35     ptrdiff_t extent;
36     MPI_Request recv_reqs[2] = {MPI_REQUEST_NULL, MPI_REQUEST_NULL};
37     MPI_Request *send_reqs = NULL;
38     int req_index;
39     
40     /**
41      * Determine number of elements sent per operation.
42      */
43     type_size = smpi_datatype_size(datatype);
44
45     size = smpi_comm_size(comm);
46     rank = smpi_comm_rank(comm);
47     if(size==1)return MPI_SUCCESS;
48
49
50     const double a_p16  = 3.2118e-6; /* [1 / byte] */
51     const double b_p16  = 8.7936;   
52     const double a_p64  = 2.3679e-6; /* [1 / byte] */
53     const double b_p64  = 1.1787;     
54     const double a_p128 = 1.6134e-6; /* [1 / byte] */
55     const double b_p128 = 2.1102;
56     size_t message_size;
57
58     /* else we need data size for decision function */
59     message_size = type_size * (unsigned long)original_count;   /* needed for decision */
60
61     if (size < (a_p128 * message_size + b_p128)) {
62             //Pipeline with 128KB segments 
63             segsize = 1024  << 7;
64     }else if (size < (a_p64 * message_size + b_p64)) {
65             // Pipeline with 64KB segments 
66             segsize = 1024 << 6;
67     }else if (size < (a_p16 * message_size + b_p16)) {
68             //Pipeline with 16KB segments 
69             segsize = 1024 << 4;
70     }
71
72     COLL_TUNED_COMPUTED_SEGCOUNT( segsize, type_size, count_by_segment );
73
74     XBT_DEBUG("coll:tuned:bcast_intra_pipeline rank %d ss %5zu type_size %lu count_by_segment %d",
75                  smpi_comm_rank(comm), segsize, (unsigned long)type_size, count_by_segment);
76
77
78
79     extent = smpi_datatype_get_extent (datatype);
80     num_segments = (original_count + count_by_segment - 1) / count_by_segment;
81     realsegsize = count_by_segment * extent;
82     
83     /* Set the buffer pointers */
84     tmpbuf = (char *) buffer;
85
86     if( tree->tree_nextsize != 0 ) {
87         send_reqs = xbt_new(MPI_Request, tree->tree_nextsize  );
88     }
89
90     /* Root code */
91     if( rank == root ) {
92         /* 
93            For each segment:
94            - send segment to all children.
95              The last segment may have less elements than other segments.
96         */
97         sendcount = count_by_segment;
98         for( segindex = 0; segindex < num_segments; segindex++ ) {
99             if( segindex == (num_segments - 1) ) {
100                 sendcount = original_count - segindex * count_by_segment;
101             }
102             for( i = 0; i < tree->tree_nextsize; i++ ) { 
103                 send_reqs[i] = smpi_mpi_isend(tmpbuf, sendcount, datatype,
104                                          tree->tree_next[i], 
105                                          COLL_TAG_BCAST, comm);
106            } 
107
108             /* complete the sends before starting the next sends */
109             smpi_mpi_waitall( tree->tree_nextsize, send_reqs, 
110                                          MPI_STATUSES_IGNORE );
111
112             /* update tmp buffer */
113             tmpbuf += realsegsize;
114
115         }
116     } 
117     
118     /* Intermediate nodes code */
119     else if( tree->tree_nextsize > 0 ) { 
120         /* 
121            Create the pipeline. 
122            1) Post the first receive
123            2) For segments 1 .. num_segments
124               - post new receive
125               - wait on the previous receive to complete
126               - send this data to children
127            3) Wait on the last segment
128            4) Compute number of elements in last segment.
129            5) Send the last segment to children
130          */
131         req_index = 0;
132         recv_reqs[req_index]=smpi_mpi_irecv(tmpbuf, count_by_segment, datatype,
133                            tree->tree_prev, COLL_TAG_BCAST,
134                            comm);
135         
136         for( segindex = 1; segindex < num_segments; segindex++ ) {
137             
138             req_index = req_index ^ 0x1;
139             
140             /* post new irecv */
141             recv_reqs[req_index]= smpi_mpi_irecv( tmpbuf + realsegsize, count_by_segment,
142                                 datatype, tree->tree_prev, 
143                                 COLL_TAG_BCAST,
144                                 comm);
145             
146             /* wait for and forward the previous segment to children */
147             smpi_mpi_wait( &recv_reqs[req_index ^ 0x1], 
148                                      MPI_STATUSES_IGNORE );
149             
150             for( i = 0; i < tree->tree_nextsize; i++ ) { 
151                 send_reqs[i]=smpi_mpi_isend(tmpbuf, count_by_segment, datatype,
152                                          tree->tree_next[i], 
153                                          COLL_TAG_BCAST, comm );
154             } 
155             
156             /* complete the sends before starting the next iteration */
157             smpi_mpi_waitall( tree->tree_nextsize, send_reqs, 
158                                          MPI_STATUSES_IGNORE );
159             
160             /* Update the receive buffer */
161             tmpbuf += realsegsize;
162         }
163
164         /* Process the last segment */
165         smpi_mpi_wait( &recv_reqs[req_index], MPI_STATUSES_IGNORE );
166         sendcount = original_count - (num_segments - 1) * count_by_segment;
167         for( i = 0; i < tree->tree_nextsize; i++ ) {
168             send_reqs[i] = smpi_mpi_isend(tmpbuf, sendcount, datatype,
169                                      tree->tree_next[i], 
170                                      COLL_TAG_BCAST, comm);
171         }
172         
173         smpi_mpi_waitall( tree->tree_nextsize, send_reqs, 
174                                      MPI_STATUSES_IGNORE );
175     }
176   
177     /* Leaf nodes */
178     else {
179         /* 
180            Receive all segments from parent in a loop:
181            1) post irecv for the first segment
182            2) for segments 1 .. num_segments
183               - post irecv for the next segment
184               - wait on the previous segment to arrive
185            3) wait for the last segment
186         */
187         req_index = 0;
188         recv_reqs[req_index] = smpi_mpi_irecv(tmpbuf, count_by_segment, datatype,
189                                  tree->tree_prev, COLL_TAG_BCAST,
190                                  comm);
191
192         for( segindex = 1; segindex < num_segments; segindex++ ) {
193             req_index = req_index ^ 0x1;
194             tmpbuf += realsegsize;
195             /* post receive for the next segment */
196             recv_reqs[req_index] = smpi_mpi_irecv(tmpbuf, count_by_segment, datatype, 
197                                      tree->tree_prev, COLL_TAG_BCAST,
198                                      comm);
199             /* wait on the previous segment */
200             smpi_mpi_wait( &recv_reqs[req_index ^ 0x1], 
201                                      MPI_STATUS_IGNORE );
202         }
203
204         smpi_mpi_wait( &recv_reqs[req_index], MPI_STATUS_IGNORE );
205     }
206
207     if( NULL != send_reqs ) free(send_reqs);
208     xbt_free(tree);
209
210     return (MPI_SUCCESS);
211 }