Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
09fbbcd5712a80126ebf2aef115cf85cf2992ad6
[simgrid.git] / src / smpi / colls / bcast-arrival-pattern-aware.cpp
1 /* Copyright (c) 2013-2014. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "colls_private.h"
8
9 static int bcast_NTSL_segment_size_in_byte = 8192;
10
11 #define HEADER_SIZE 1024
12 #define MAX_NODE 1024
13
14 /* Non-topology-specific pipelined linear-bcast function */
15 int smpi_coll_tuned_bcast_arrival_pattern_aware(void *buf, int count,
16                                                 MPI_Datatype datatype, int root,
17                                                 MPI_Comm comm)
18 {
19   int tag = -COLL_TAG_BCAST;
20   MPI_Status status;
21   MPI_Request request;
22   MPI_Request *send_request_array;
23   MPI_Request *recv_request_array;
24   MPI_Status *send_status_array;
25   MPI_Status *recv_status_array;
26
27   MPI_Status temp_status_array[MAX_NODE];
28
29   int rank, size;
30   int i, j;
31
32   int sent_count;
33   int header_index;
34   int flag_array[MAX_NODE];
35   int already_sent[MAX_NODE];
36   int to_clean[MAX_NODE];
37   int header_buf[HEADER_SIZE];
38   char temp_buf[MAX_NODE];
39
40   MPI_Aint extent;
41   extent = smpi_datatype_get_extent(datatype);
42
43   /* destination */
44   int to;
45
46
47
48   rank = smpi_comm_rank(comm);
49   size = smpi_comm_size(comm);
50
51
52   /* segment is segment size in number of elements (not bytes) */
53   int segment = bcast_NTSL_segment_size_in_byte / extent;
54   segment =  segment == 0 ? 1 :segment; 
55   /* pipeline length */
56   int pipe_length = count / segment;
57
58   /* use for buffer offset for sending and receiving data = segment size in byte */
59   int increment = segment * extent;
60
61   /* if the input size is not divisible by segment size => 
62      the small remainder will be done with native implementation */
63   int remainder = count % segment;
64
65   /* if root is not zero send to rank zero first
66      this can be modified to make it faster by using logical src, dst.
67    */
68   if (root != 0) {
69     if (rank == root) {
70       smpi_mpi_send(buf, count, datatype, 0, tag, comm);
71     } else if (rank == 0) {
72       smpi_mpi_recv(buf, count, datatype, root, tag, comm, &status);
73     }
74   }
75
76   /* value == 0 means root has not send data (or header) to the node yet */
77   for (i = 0; i < MAX_NODE; i++) {
78     already_sent[i] = 0;
79     to_clean[i] = 0;
80   }
81
82   /* when a message is smaller than a block size => no pipeline */
83   if (count <= segment) {
84     if (rank == 0) {
85       sent_count = 0;
86
87       while (sent_count < (size - 1)) {
88         for (i = 1; i < size; i++) {
89           smpi_mpi_iprobe(i, MPI_ANY_TAG, comm, &flag_array[i],
90                      MPI_STATUSES_IGNORE);
91         }
92
93         header_index = 0;
94         /* recv 1-byte message */
95         for (i = 1; i < size; i++) {
96
97           /* message arrive */
98           if ((flag_array[i] == 1) && (already_sent[i] == 0)) {
99             smpi_mpi_recv(temp_buf, 1, MPI_CHAR, i, tag, comm, &status);
100             header_buf[header_index] = i;
101             header_index++;
102             sent_count++;
103
104             /* will send in the next step */
105             already_sent[i] = 1;
106           }
107         }
108
109         /* send header followed by data */
110         if (header_index != 0) {
111           header_buf[header_index] = -1;
112           to = header_buf[0];
113           smpi_mpi_send(header_buf, HEADER_SIZE, MPI_INT, to, tag, comm);
114           smpi_mpi_send(buf, count, datatype, to, tag, comm);
115         }
116
117         /* randomly MPI_Send to one */
118         else {
119           /* search for the first node that never received data before */
120           for (i = 1; i < size; i++) {
121             if (already_sent[i] == 0) {
122               header_buf[0] = i;
123               header_buf[1] = -1;
124               smpi_mpi_send(header_buf, HEADER_SIZE, MPI_INT, i, tag, comm);
125               smpi_mpi_send(buf, count, datatype, i, tag, comm);
126               already_sent[i] = 1;
127               sent_count++;
128               break;
129             }
130           }
131         }
132
133
134       }                         /* while loop */
135     }
136
137     /* non-root */
138     else {
139
140       /* send 1-byte message to root */
141       smpi_mpi_send(temp_buf, 1, MPI_CHAR, 0, tag, comm);
142
143       /* wait for header and data, forward when required */
144       smpi_mpi_recv(header_buf, HEADER_SIZE, MPI_INT, MPI_ANY_SOURCE, tag, comm,
145                &status);
146       smpi_mpi_recv(buf, count, datatype, MPI_ANY_SOURCE, tag, comm, &status);
147
148       /* search for where it is */
149       int myordering = 0;
150       while (rank != header_buf[myordering]) {
151         myordering++;
152       }
153
154       /* send header followed by data */
155       if (header_buf[myordering + 1] != -1) {
156         smpi_mpi_send(header_buf, HEADER_SIZE, MPI_INT, header_buf[myordering + 1],
157                  tag, comm);
158         smpi_mpi_send(buf, count, datatype, header_buf[myordering + 1], tag, comm);
159       }
160     }
161   }
162   /* pipeline bcast */
163   else {
164     send_request_array =
165         (MPI_Request *) xbt_malloc((size + pipe_length) * sizeof(MPI_Request));
166     recv_request_array =
167         (MPI_Request *) xbt_malloc((size + pipe_length) * sizeof(MPI_Request));
168     send_status_array =
169         (MPI_Status *) xbt_malloc((size + pipe_length) * sizeof(MPI_Status));
170     recv_status_array =
171         (MPI_Status *) xbt_malloc((size + pipe_length) * sizeof(MPI_Status));
172
173     if (rank == 0) {
174       //double start2 = MPI_Wtime();
175       sent_count = 0;
176       //int iteration = 0;
177       while (sent_count < (size - 1)) {
178         //iteration++;
179         //start = MPI_Wtime();
180         for (i = 1; i < size; i++) {
181           smpi_mpi_iprobe(i, MPI_ANY_TAG, comm, &flag_array[i],
182                      &temp_status_array[i]);
183         }
184         //total = MPI_Wtime() - start;
185         //total *= 1000;
186         //printf("Iprobe time = %.2f\n",total);
187         header_index = 0;
188
189         MPI_Wtime();
190         /* recv 1-byte message */
191         for (i = 1; i < size; i++) {
192           /* message arrive */
193           if ((flag_array[i] == 1) && (already_sent[i] == 0)) {
194             smpi_mpi_recv(&temp_buf[i], 1, MPI_CHAR, i, tag, comm,
195                      &status);
196             header_buf[header_index] = i;
197             header_index++;
198             sent_count++;
199
200             /* will send in the next step */
201             already_sent[i] = 1;
202           }
203         }
204         //total = MPI_Wtime() - start;
205         //total *= 1000;
206         //printf("Recv 1-byte time = %.2f\n",total);
207
208         /*
209            if (header_index != 0) {
210            printf("header index = %d node = ",header_index);
211            for (i=0;i<header_index;i++) {
212            printf("%d ",header_buf[i]);
213            }
214            printf("\n");
215            }
216          */
217
218         /* send header followed by data */
219         if (header_index != 0) {
220           header_buf[header_index] = -1;
221           to = header_buf[0];
222
223           //start = MPI_Wtime();
224
225           /* send header */
226           smpi_mpi_send(header_buf, HEADER_SIZE, MPI_INT, to, tag, comm);
227
228           //total = MPI_Wtime() - start;
229           //total *= 1000;
230           //printf("\tSend header to %d time = %.2f\n",to,total);
231
232           //start = MPI_Wtime();
233
234           /* send data - non-pipeline case */
235
236           if (0 == 1) {
237             //if (header_index == 1) {
238             smpi_mpi_send(buf, count, datatype, to, tag, comm);
239           }
240
241
242           /* send data - pipeline */
243           else {
244             for (i = 0; i < pipe_length; i++) {
245               smpi_mpi_send((char *)buf + (i * increment), segment, datatype, to, tag, comm);
246             }
247             //smpi_mpi_waitall((pipe_length), send_request_array, send_status_array);
248           }
249           //total = MPI_Wtime() - start;
250           //total *= 1000;
251           //printf("\tSend data to %d time = %.2f\n",to,total);
252
253         }
254
255
256
257         /* randomly MPI_Send to one node */
258         else {
259           /* search for the first node that never received data before */
260           for (i = 1; i < size; i++) {
261             if (already_sent[i] == 0) {
262               header_buf[0] = i;
263               header_buf[1] = -1;
264               to = i;
265
266               //start = MPI_Wtime();
267               smpi_mpi_send(header_buf, HEADER_SIZE, MPI_INT, to, tag, comm);
268
269               /* still need to chop data so that we can use the same non-root code */
270               for (j = 0; j < pipe_length; j++) {
271                 smpi_mpi_send((char *)buf + (j * increment), segment, datatype, to, tag,
272                          comm);
273               }
274
275               //smpi_mpi_send(buf,count,datatype,to,tag,comm);
276               //smpi_mpi_wait(&request,MPI_STATUS_IGNORE);
277
278               //total = MPI_Wtime() - start;
279               //total *= 1000;
280               //printf("SEND TO SINGLE node %d time = %.2f\n",i,total);
281
282
283               already_sent[i] = 1;
284               to_clean[i]=1;
285               sent_count++;
286               break;
287             }
288           }
289         }
290
291       }                         /* while loop */
292
293       for(i=0; i<size; i++)
294         if(to_clean[i]!=0)smpi_mpi_recv(&temp_buf[i], 1, MPI_CHAR, i, tag, comm,
295                      &status);
296       //total = MPI_Wtime() - start2;
297       //total *= 1000;
298       //printf("Node zero iter = %d time = %.2f\n",iteration,total);
299     }
300
301     /* rank 0 */
302     /* none root */
303     else {
304       /* send 1-byte message to root */
305       smpi_mpi_send(temp_buf, 1, MPI_CHAR, 0, tag, comm);
306
307       /* wait for header forward when required */
308       request = smpi_mpi_irecv(header_buf, HEADER_SIZE, MPI_INT, MPI_ANY_SOURCE, tag, comm);
309       smpi_mpi_wait(&request, MPI_STATUS_IGNORE);
310
311       /* search for where it is */
312       int myordering = 0;
313       while (rank != header_buf[myordering]) {
314         myordering++;
315       }
316
317       /* send header when required */
318       if (header_buf[myordering + 1] != -1) {
319         smpi_mpi_send(header_buf, HEADER_SIZE, MPI_INT, header_buf[myordering + 1],
320                  tag, comm);
321       }
322
323       /* receive data */
324
325       if (0 == -1) {
326         //if (header_buf[1] == -1) {
327         request = smpi_mpi_irecv(buf, count, datatype, 0, tag, comm);
328         smpi_mpi_wait(&request, MPI_STATUS_IGNORE);
329         //printf("\t\tnode %d ordering = %d receive data from root\n",rank,myordering);
330       } else {
331         for (i = 0; i < pipe_length; i++) {
332           recv_request_array[i] = smpi_mpi_irecv((char *)buf + (i * increment), segment, datatype, MPI_ANY_SOURCE,
333                                                  tag, comm);
334         }
335       }
336
337       /* send data */
338       if (header_buf[myordering + 1] != -1) {
339         for (i = 0; i < pipe_length; i++) {
340           smpi_mpi_wait(&recv_request_array[i], MPI_STATUS_IGNORE);
341           send_request_array[i] = smpi_mpi_isend((char *)buf + (i * increment), segment, datatype,
342                     header_buf[myordering + 1], tag, comm);
343         }
344         smpi_mpi_waitall((pipe_length), send_request_array, send_status_array);
345       }else{
346           smpi_mpi_waitall(pipe_length, recv_request_array, recv_status_array);
347           }
348     
349     }
350
351     free(send_request_array);
352     free(recv_request_array);
353     free(send_status_array);
354     free(recv_status_array);
355   }                             /* end pipeline */
356
357   /* when count is not divisible by block size, use default BCAST for the remainder */
358   if ((remainder != 0) && (count > segment)) {
359     XBT_WARN("MPI_bcast_arrival_pattern_aware use default MPI_bcast.");   
360     smpi_mpi_bcast((char *)buf + (pipe_length * increment), remainder, datatype, root, comm);
361   }
362
363   return MPI_SUCCESS;
364 }