Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
2b5609228e0c8b6f815ca4bc2c0053d58aba3816
[simgrid.git] / src / smpi / colls / bcast-arrival-scatter.c
1 #include "colls_private.h"
2
3 #ifndef BCAST_ARRIVAL_PATTERN_AWARE_HEADER_SIZE
4 #define BCAST_ARRIVAL_PATTERN_AWARE_HEADER_SIZE 128
5 #endif
6
7 #ifndef BCAST_ARRIVAL_PATTERN_AWARE_MAX_NODE
8 #define BCAST_ARRIVAL_PATTERN_AWARE_MAX_NODE 128
9 #endif
10
11 /* Non-topology-specific pipelined linear-bcast function */
12 int smpi_coll_tuned_bcast_arrival_scatter(void *buf, int count,
13                                           MPI_Datatype datatype, int root,
14                                           MPI_Comm comm)
15 {
16   int tag = -COLL_TAG_BCAST;//in order to use ANY_TAG, make this one positive
17   int header_tag = 10;
18   MPI_Status status;
19
20   int curr_remainder;
21   int curr_size;
22   int curr_increment;
23   int send_offset;
24   int recv_offset;
25   int send_count;
26   int recv_count;
27
28   MPI_Status temp_status_array[BCAST_ARRIVAL_PATTERN_AWARE_MAX_NODE];
29
30   int rank, size;
31   int i, k;
32
33   int sent_count;
34   int header_index;
35   int flag_array[BCAST_ARRIVAL_PATTERN_AWARE_MAX_NODE];
36   int already_sent[BCAST_ARRIVAL_PATTERN_AWARE_MAX_NODE];
37   int header_buf[BCAST_ARRIVAL_PATTERN_AWARE_HEADER_SIZE];
38   char temp_buf[BCAST_ARRIVAL_PATTERN_AWARE_MAX_NODE];
39   int will_send[BCAST_ARRIVAL_PATTERN_AWARE_MAX_NODE];
40   int max_node = BCAST_ARRIVAL_PATTERN_AWARE_MAX_NODE;
41   int header_size = BCAST_ARRIVAL_PATTERN_AWARE_HEADER_SIZE;
42
43   MPI_Aint extent;
44   extent = smpi_datatype_get_extent(datatype);
45
46
47   /* source and destination */
48   int to, from;
49
50   rank = smpi_comm_rank(MPI_COMM_WORLD);
51   size = smpi_comm_size(MPI_COMM_WORLD);
52
53   /* message too small */
54   if (count < size) {
55     XBT_WARN("MPI_bcast_arrival_scatter use default MPI_bcast.");
56     smpi_mpi_bcast(buf, count, datatype, root, comm);
57     return MPI_SUCCESS;        
58   }
59
60
61
62   /* if root is not zero send to rank zero first
63      this can be modified to make it faster by using logical src, dst.
64    */
65   if (root != 0) {
66     if (rank == root) {
67       smpi_mpi_send(buf, count, datatype, 0, tag - 1, comm);
68     } else if (rank == 0) {
69       smpi_mpi_recv(buf, count, datatype, root, tag - 1, comm, &status);
70     }
71   }
72
73
74   /* value == 0 means root has not send data (or header) to the node yet */
75   for (i = 0; i < max_node; i++) {
76     already_sent[i] = 0;
77   }
78
79   /* start bcast */
80
81   /* root */
82   if (rank == 0) {
83
84     for (i = 0; i < max_node; i++)
85       will_send[i] = 0;
86
87     sent_count = 0;
88     while (sent_count < (size - 1)) {
89
90       for (k = 0; k < 3; k++) {
91         for (i = 1; i < size; i++) {
92           if ((already_sent[i] == 0) && (will_send[i] == 0)) {
93             smpi_mpi_iprobe(i, MPI_ANY_TAG, MPI_COMM_WORLD, &flag_array[i],
94                        &temp_status_array[i]);
95             if (flag_array[i] == 1) {
96               will_send[i] = 1;
97               smpi_mpi_recv(&temp_buf[i], 1, MPI_CHAR, i, tag, MPI_COMM_WORLD,
98                        &status);
99               i = 0;
100             }
101           }
102         }
103       }
104       header_index = 0;
105
106       /* recv 1-byte message in this round */
107       for (i = 1; i < size; i++) {
108         /* message arrive */
109         if ((will_send[i] == 1) && (already_sent[i] == 0)) {
110           header_buf[header_index] = i;
111           header_index++;
112           sent_count++;
113
114           /* will send in the next step */
115           already_sent[i] = 1;
116         }
117       }
118
119       /*
120          if (header_index != 0) {
121          printf("header index = %d node = ",header_index);
122          for (i=0;i<header_index;i++) {
123          printf("%d ",header_buf[i]);
124          }
125          printf("\n");
126          }
127        */
128
129       /* send header followed by data */
130       if (header_index != 0) {
131         header_buf[header_index] = -1;
132
133         /* send header */
134         for (i = 0; i < header_index; i++) {
135           to = header_buf[i];
136           smpi_mpi_send(header_buf, header_size, MPI_INT, to, header_tag, comm);
137         }
138
139         curr_remainder = count % header_index;
140         curr_size = (count / header_index);
141         curr_increment = curr_size * extent;
142
143         /* send data */
144
145         for (i = 0; i < header_index; i++) {
146           to = header_buf[i];
147           if ((i == (header_index - 1)) || (curr_size == 0))
148             curr_size += curr_remainder;
149           //printf("Root send to %d index %d\n",to,(i*curr_increment));
150           smpi_mpi_send((char *) buf + (i * curr_increment), curr_size, datatype, to,
151                    tag, comm);
152         }
153       }
154     }                           /* while (sent_count < size-1) */
155   }
156
157   /* rank 0 */
158   /* none root */
159   else {
160     /* send 1-byte message to root */
161     smpi_mpi_send(temp_buf, 1, MPI_CHAR, 0, tag, comm);
162
163     /* wait for header forward when required */
164     smpi_mpi_recv(header_buf, header_size, MPI_INT, 0, header_tag, comm, &status);
165
166     /* search for where it is */
167     int myordering = 0;
168     while (rank != header_buf[myordering]) {
169       myordering++;
170     }
171
172     int total_nodes = 0;
173     while (header_buf[total_nodes] != -1) {
174       total_nodes++;
175     }
176
177     curr_remainder = count % total_nodes;
178     curr_size = (count / total_nodes);
179     curr_increment = curr_size * extent;
180     int recv_size = curr_size;
181
182     /* receive data */
183     if (myordering == (total_nodes - 1))
184       recv_size += curr_remainder;
185     smpi_mpi_recv((char *) buf + (myordering * curr_increment), recv_size, datatype,
186              0, tag, comm, &status);
187
188     /* at this point all nodes in this set perform all-gather operation */
189     to = header_buf[myordering + 1];
190     from = header_buf[myordering - 1];
191     if (myordering == 0)
192       from = header_buf[total_nodes - 1];
193     if (myordering == (total_nodes - 1))
194       to = header_buf[0];
195
196
197     /* last segment may have a larger size since it also include the remainder */
198     int last_segment_ptr = (total_nodes - 1) * (count / total_nodes) * extent;
199
200
201     /* allgather */
202     for (i = 0; i < total_nodes - 1; i++) {
203       send_offset =
204           ((myordering - i + total_nodes) % total_nodes) * curr_increment;
205       recv_offset =
206           ((myordering - i - 1 + total_nodes) % total_nodes) * curr_increment;
207
208       /* adjust size */
209       if (send_offset != last_segment_ptr)
210         send_count = curr_size;
211       else
212         send_count = curr_size + curr_remainder;
213
214       if (recv_offset != last_segment_ptr)
215         recv_count = curr_size;
216       else
217         recv_count = curr_size + curr_remainder;
218
219       //printf("\t\tnode %d sent_to %d recv_from %d send_size %d recv_size %d\n",rank,to,from,send_count,recv_count);
220       //printf("\tnode %d sent_offset %d send_count %d\n",rank,send_offset,send_count);
221
222
223       smpi_mpi_sendrecv((char *) buf + send_offset, send_count, datatype, to,
224                    tag + i, (char *) buf + recv_offset, recv_count, datatype,
225                    from, tag + i, comm, &status);
226     }
227   }                             /* non-root */
228
229   return MPI_SUCCESS;
230 }