/* SimGrid SMPI collective algorithm: pipelined broadcast, ported from the
 * Open MPI "coll_tuned" component.
 * File: src/smpi/colls/bcast-ompi-pipeline.c */
1  #include "colls_private.h"
2  #include "coll_tuned_topo.h"
3
4 #define MAXTREEFANOUT 32
5
6
7 int smpi_coll_tuned_bcast_ompi_pipeline( void* buffer,
8                                       int original_count, 
9                                       MPI_Datatype datatype, 
10                                       int root,
11                                       MPI_Comm comm)
12 {
13     int count_by_segment = original_count;
14     size_t type_size;
15     int segsize =1024  << 7;
16     //mca_coll_tuned_module_t *tuned_module = (mca_coll_tuned_module_t*) module;
17     //mca_coll_tuned_comm_t *data = tuned_module->tuned_data;
18     
19 //    return ompi_coll_tuned_bcast_intra_generic( buffer, count, datatype, root, comm, module,
20 //                                                count_by_segment, data->cached_pipeline );
21     ompi_coll_tree_t * tree = ompi_coll_tuned_topo_build_chain( 1, comm, root );
22     int i;
23     int rank, size;
24     int segindex;
25     int num_segments; /* Number of segments */
26     int sendcount;    /* number of elements sent in this segment */ 
27     size_t realsegsize;
28     char *tmpbuf;
29     ptrdiff_t extent;
30     MPI_Request recv_reqs[2] = {MPI_REQUEST_NULL, MPI_REQUEST_NULL};
31     MPI_Request *send_reqs = NULL;
32     int req_index;
33     
34     /**
35      * Determine number of elements sent per operation.
36      */
37     type_size = smpi_datatype_size(datatype);
38
39     size = smpi_comm_size(comm);
40     rank = smpi_comm_rank(comm);
41     xbt_assert( size > 1 );
42
43
44     const double a_p16  = 3.2118e-6; /* [1 / byte] */
45     const double b_p16  = 8.7936;   
46     const double a_p64  = 2.3679e-6; /* [1 / byte] */
47     const double b_p64  = 1.1787;     
48     const double a_p128 = 1.6134e-6; /* [1 / byte] */
49     const double b_p128 = 2.1102;
50     size_t message_size;
51
52     /* else we need data size for decision function */
53     message_size = type_size * (unsigned long)original_count;   /* needed for decision */
54
55     if (size < (a_p128 * message_size + b_p128)) {
56             //Pipeline with 128KB segments 
57             segsize = 1024  << 7;
58     }else if (size < (a_p64 * message_size + b_p64)) {
59             // Pipeline with 64KB segments 
60             segsize = 1024 << 6;
61     }else if (size < (a_p16 * message_size + b_p16)) {
62             //Pipeline with 16KB segments 
63             segsize = 1024 << 4;
64     }
65
66     COLL_TUNED_COMPUTED_SEGCOUNT( segsize, type_size, count_by_segment );
67
68     XBT_DEBUG("coll:tuned:bcast_intra_pipeline rank %d ss %5d type_size %lu count_by_segment %d",
69                  smpi_comm_rank(comm), segsize, (unsigned long)type_size, count_by_segment);
70
71
72
73     extent = smpi_datatype_get_extent (datatype);
74     num_segments = (original_count + count_by_segment - 1) / count_by_segment;
75     realsegsize = count_by_segment * extent;
76     
77     /* Set the buffer pointers */
78     tmpbuf = (char *) buffer;
79
80     if( tree->tree_nextsize != 0 ) {
81         send_reqs = xbt_new(MPI_Request, tree->tree_nextsize  );
82     }
83
84     /* Root code */
85     if( rank == root ) {
86         /* 
87            For each segment:
88            - send segment to all children.
89              The last segment may have less elements than other segments.
90         */
91         sendcount = count_by_segment;
92         for( segindex = 0; segindex < num_segments; segindex++ ) {
93             if( segindex == (num_segments - 1) ) {
94                 sendcount = original_count - segindex * count_by_segment;
95             }
96             for( i = 0; i < tree->tree_nextsize; i++ ) { 
97                 send_reqs[i] = smpi_mpi_isend(tmpbuf, sendcount, datatype,
98                                          tree->tree_next[i], 
99                                          777, comm);
100            } 
101
102             /* complete the sends before starting the next sends */
103             smpi_mpi_waitall( tree->tree_nextsize, send_reqs, 
104                                          MPI_STATUSES_IGNORE );
105
106             /* update tmp buffer */
107             tmpbuf += realsegsize;
108
109         }
110     } 
111     
112     /* Intermediate nodes code */
113     else if( tree->tree_nextsize > 0 ) { 
114         /* 
115            Create the pipeline. 
116            1) Post the first receive
117            2) For segments 1 .. num_segments
118               - post new receive
119               - wait on the previous receive to complete
120               - send this data to children
121            3) Wait on the last segment
122            4) Compute number of elements in last segment.
123            5) Send the last segment to children
124          */
125         req_index = 0;
126         recv_reqs[req_index]=smpi_mpi_irecv(tmpbuf, count_by_segment, datatype,
127                            tree->tree_prev, 777,
128                            comm);
129         
130         for( segindex = 1; segindex < num_segments; segindex++ ) {
131             
132             req_index = req_index ^ 0x1;
133             
134             /* post new irecv */
135             recv_reqs[req_index]= smpi_mpi_irecv( tmpbuf + realsegsize, count_by_segment,
136                                 datatype, tree->tree_prev, 
137                                 777, 
138                                 comm);
139             
140             /* wait for and forward the previous segment to children */
141             smpi_mpi_wait( &recv_reqs[req_index ^ 0x1], 
142                                      MPI_STATUSES_IGNORE );
143             
144             for( i = 0; i < tree->tree_nextsize; i++ ) { 
145                 send_reqs[i]=smpi_mpi_isend(tmpbuf, count_by_segment, datatype,
146                                          tree->tree_next[i], 
147                                          777, comm );
148             } 
149             
150             /* complete the sends before starting the next iteration */
151             smpi_mpi_waitall( tree->tree_nextsize, send_reqs, 
152                                          MPI_STATUSES_IGNORE );
153             
154             /* Update the receive buffer */
155             tmpbuf += realsegsize;
156         }
157
158         /* Process the last segment */
159         smpi_mpi_wait( &recv_reqs[req_index], MPI_STATUSES_IGNORE );
160         sendcount = original_count - (num_segments - 1) * count_by_segment;
161         for( i = 0; i < tree->tree_nextsize; i++ ) {
162             send_reqs[i] = smpi_mpi_isend(tmpbuf, sendcount, datatype,
163                                      tree->tree_next[i], 
164                                      777, comm);
165         }
166         
167         smpi_mpi_waitall( tree->tree_nextsize, send_reqs, 
168                                      MPI_STATUSES_IGNORE );
169     }
170   
171     /* Leaf nodes */
172     else {
173         /* 
174            Receive all segments from parent in a loop:
175            1) post irecv for the first segment
176            2) for segments 1 .. num_segments
177               - post irecv for the next segment
178               - wait on the previous segment to arrive
179            3) wait for the last segment
180         */
181         req_index = 0;
182         recv_reqs[req_index] = smpi_mpi_irecv(tmpbuf, count_by_segment, datatype,
183                                  tree->tree_prev, 777,
184                                  comm);
185
186         for( segindex = 1; segindex < num_segments; segindex++ ) {
187             req_index = req_index ^ 0x1;
188             tmpbuf += realsegsize;
189             /* post receive for the next segment */
190             recv_reqs[req_index] = smpi_mpi_irecv(tmpbuf, count_by_segment, datatype, 
191                                      tree->tree_prev, 777, 
192                                      comm);
193             /* wait on the previous segment */
194             smpi_mpi_wait( &recv_reqs[req_index ^ 0x1], 
195                                      MPI_STATUS_IGNORE );
196         }
197
198         smpi_mpi_wait( &recv_reqs[req_index], MPI_STATUS_IGNORE );
199     }
200
201     if( NULL != send_reqs ) free(send_reqs);
202
203     return (MPI_SUCCESS);
204 }