Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
* added support for optimized collectives:
[simgrid.git] / src / smpi / smpi_base.c
1 #include "private.h"
2 #include "xbt/time.h"
3
4 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_base, smpi,
5                                 "Logging specific to SMPI (base)");
6 XBT_LOG_EXTERNAL_CATEGORY(smpi_base);
7 XBT_LOG_EXTERNAL_CATEGORY(smpi_bench);
8 XBT_LOG_EXTERNAL_CATEGORY(smpi_kernel);
9 XBT_LOG_EXTERNAL_CATEGORY(smpi_mpi);
10 XBT_LOG_EXTERNAL_CATEGORY(smpi_receiver);
11 XBT_LOG_EXTERNAL_CATEGORY(smpi_sender);
12 XBT_LOG_EXTERNAL_CATEGORY(smpi_util);
13
14 smpi_mpi_global_t smpi_mpi_global = NULL;
15
16 /**
17  * Operations of MPI_OP : implemented=land,sum,min,max
18  **/
19 void smpi_mpi_land_func(void *a, void *b, int *length,
20                         MPI_Datatype * datatype);
21
22 void smpi_mpi_land_func(void *a, void *b, int *length,
23                         MPI_Datatype * datatype)
24 {
25   int i;
26   if (*datatype == smpi_mpi_global->mpi_int) {
27     int *x = a, *y = b;
28     for (i = 0; i < *length; i++) {
29       y[i] = x[i] && y[i];
30     }
31   }
32 }
33
34 /**
35  * sum two vectors element-wise
36  *
37  * @param a the first vectors
38  * @param b the second vectors
39  * @return the second vector is modified and contains the element-wise sums
40  **/
41 void smpi_mpi_sum_func(void *a, void *b, int *length,
42                        MPI_Datatype * datatype);
43
44 void smpi_mpi_sum_func(void *a, void *b, int *length, MPI_Datatype * datatype)
45 {
46   int i;
47   if (*datatype == smpi_mpi_global->mpi_byte) {
48     char *x = a, *y = b;
49     for (i = 0; i < *length; i++) {
50       y[i] = x[i] + y[i];
51     }
52   } else if (*datatype == smpi_mpi_global->mpi_int) {
53     int *x = a, *y = b;
54     for (i = 0; i < *length; i++) {
55       y[i] = x[i] + y[i];
56     }
57   } else if (*datatype == smpi_mpi_global->mpi_float) {
58     float *x = a, *y = b;
59     for (i = 0; i < *length; i++) {
60       y[i] = x[i] + y[i];
61     }
62   } else if (*datatype == smpi_mpi_global->mpi_double) {
63     double *x = a, *y = b;
64     for (i = 0; i < *length; i++) {
65       y[i] = x[i] + y[i];
66     }
67   }
68 }
69
70 /**
71  * compute the min of two vectors element-wise
72  **/
73 void smpi_mpi_min_func(void *a, void *b, int *length,
74                        MPI_Datatype * datatype);
75
76 void smpi_mpi_min_func(void *a, void *b, int *length, MPI_Datatype * datatype)
77 {
78   int i;
79   if (*datatype == smpi_mpi_global->mpi_byte) {
80     char *x = a, *y = b;
81     for (i = 0; i < *length; i++) {
82       y[i] = x[i] < y[i] ? x[i] : y[i];
83     }
84   } else {
85     if (*datatype == smpi_mpi_global->mpi_int) {
86       int *x = a, *y = b;
87       for (i = 0; i < *length; i++) {
88         y[i] = x[i] < y[i] ? x[i] : y[i];
89       }
90     } else {
91       if (*datatype == smpi_mpi_global->mpi_float) {
92         float *x = a, *y = b;
93         for (i = 0; i < *length; i++) {
94           y[i] = x[i] < y[i] ? x[i] : y[i];
95         }
96       } else {
97         if (*datatype == smpi_mpi_global->mpi_double) {
98           double *x = a, *y = b;
99           for (i = 0; i < *length; i++) {
100             y[i] = x[i] < y[i] ? x[i] : y[i];
101           }
102
103         }
104       }
105     }
106   }
107 }
108
109 /**
110  * compute the max of two vectors element-wise
111  **/
112 void smpi_mpi_max_func(void *a, void *b, int *length,
113                        MPI_Datatype * datatype);
114
115 void smpi_mpi_max_func(void *a, void *b, int *length, MPI_Datatype * datatype)
116 {
117   int i;
118   if (*datatype == smpi_mpi_global->mpi_byte) {
119     char *x = a, *y = b;
120     for (i = 0; i < *length; i++) {
121       y[i] = x[i] > y[i] ? x[i] : y[i];
122     }
123   } else if (*datatype == smpi_mpi_global->mpi_int) {
124     int *x = a, *y = b;
125     for (i = 0; i < *length; i++) {
126       y[i] = x[i] > y[i] ? x[i] : y[i];
127     }
128   } else if (*datatype == smpi_mpi_global->mpi_float) {
129     float *x = a, *y = b;
130     for (i = 0; i < *length; i++) {
131       y[i] = x[i] > y[i] ? x[i] : y[i];
132     }
133   } else if (*datatype == smpi_mpi_global->mpi_double) {
134     double *x = a, *y = b;
135     for (i = 0; i < *length; i++) {
136       y[i] = x[i] > y[i] ? x[i] : y[i];
137     }
138
139   }
140 }
141
142
143
144
145 /**
146  * tell the MPI rank of the calling process (from its SIMIX process id)
147  **/
148 int smpi_mpi_comm_rank(smpi_mpi_communicator_t comm)
149 {
150   return comm->index_to_rank_map[smpi_process_index()];
151 }
152
153 void smpi_process_init(int *argc, char ***argv)
154 {
155   smpi_process_data_t pdata;
156
157   // initialize some local variables
158
159   pdata = xbt_new(s_smpi_process_data_t, 1);
160   SIMIX_process_set_data(SIMIX_process_self(), pdata);
161
162   /* get rank from command line, and remove it from argv */
163   pdata->index = atoi((*argv)[1]);
164   DEBUG1("I'm rank %d", pdata->index);
165   if (*argc > 2) {
166     memmove((*argv)[1], (*argv)[2], sizeof(char *) * (*argc - 2));
167     (*argv)[(*argc) - 1] = NULL;
168   }
169   (*argc)--;
170
171   pdata->mutex = SIMIX_mutex_init();
172   pdata->cond = SIMIX_cond_init();
173   pdata->finalize = 0;
174
175   pdata->pending_recv_request_queue = xbt_fifo_new();
176   pdata->pending_send_request_queue = xbt_fifo_new();
177   pdata->received_message_queue = xbt_fifo_new();
178
179   pdata->main = SIMIX_process_self();
180   pdata->sender = SIMIX_process_create("smpi_sender",
181                                        smpi_sender, pdata,
182                                        SIMIX_host_get_name(SIMIX_host_self()),
183                                        0, NULL,
184                                        /*props */ NULL);
185   pdata->receiver = SIMIX_process_create("smpi_receiver",
186                                          smpi_receiver, pdata,
187                                          SIMIX_host_get_name(SIMIX_host_self
188                                                              ()), 0, NULL,
189                                          /*props */ NULL);
190
191   smpi_global->main_processes[pdata->index] = SIMIX_process_self();
192   return;
193 }
194
195 void smpi_process_finalize()
196 {
197   smpi_process_data_t pdata = SIMIX_process_get_data(SIMIX_process_self());
198
199   pdata->finalize = 2;          /* Tell sender and receiver to quit */
200   SIMIX_process_resume(pdata->sender);
201   SIMIX_process_resume(pdata->receiver);
202   while (pdata->finalize > 0) { /* wait until it's done */
203     SIMIX_cond_wait(pdata->cond, pdata->mutex);
204   }
205
206   SIMIX_mutex_destroy(pdata->mutex);
207   SIMIX_cond_destroy(pdata->cond);
208   xbt_fifo_free(pdata->pending_recv_request_queue);
209   xbt_fifo_free(pdata->pending_send_request_queue);
210   xbt_fifo_free(pdata->received_message_queue);
211   xbt_free(pdata);
212 }
213
214 int smpi_mpi_barrier(smpi_mpi_communicator_t comm)
215 {
216
217   SIMIX_mutex_lock(comm->barrier_mutex);
218   ++comm->barrier_count;
219   if (comm->barrier_count > comm->size) {       // only happens on second barrier...
220     comm->barrier_count = 0;
221   } else if (comm->barrier_count == comm->size) {
222     SIMIX_cond_broadcast(comm->barrier_cond);
223   }
224   while (comm->barrier_count < comm->size) {
225     SIMIX_cond_wait(comm->barrier_cond, comm->barrier_mutex);
226   }
227   SIMIX_mutex_unlock(comm->barrier_mutex);
228
229   return MPI_SUCCESS;
230 }
231
232 int smpi_mpi_isend(smpi_mpi_request_t request)
233 {
234   smpi_process_data_t pdata = SIMIX_process_get_data(SIMIX_process_self());
235   int retval = MPI_SUCCESS;
236
237   if (NULL == request) {
238     retval = MPI_ERR_INTERN;
239   } else {
240     xbt_fifo_push(pdata->pending_send_request_queue, request);
241     SIMIX_process_resume(pdata->sender);
242   }
243
244   return retval;
245 }
246
247 int smpi_mpi_irecv(smpi_mpi_request_t request)
248 {
249   int retval = MPI_SUCCESS;
250   smpi_process_data_t pdata = SIMIX_process_get_data(SIMIX_process_self());
251
252   if (NULL == request) {
253     retval = MPI_ERR_INTERN;
254   } else {
255     xbt_fifo_push(pdata->pending_recv_request_queue, request);
256
257     if (SIMIX_process_is_suspended(pdata->receiver)) {
258       SIMIX_process_resume(pdata->receiver);
259     }
260   }
261
262   return retval;
263 }
264
265 void  print_req( smpi_mpi_request_t r ); 
266 void  print_req( smpi_mpi_request_t r ) {
267         printf("***req %p-> src=%d dst=%d tag=%d completed=0x%x consumed=0x%x\n",r,r->src,r->dst,r->tag,r->completed,r->consumed);
268 }
269
270
271 /**
272  * wait and friends ...
273  **/
274 int smpi_mpi_wait(smpi_mpi_request_t request, smpi_mpi_status_t * status)
275 {
276   int retval = MPI_SUCCESS;
277
278   if (NULL == request) {
279     retval = MPI_ERR_INTERN;
280   } else {
281     SIMIX_mutex_lock(request->mutex);
282
283 #ifdef DEBUG_STEPH
284     print_req( request );  //@@
285 #endif
286     while (!request->completed) {
287       SIMIX_cond_wait(request->cond, request->mutex);
288     }
289     if (NULL != status) {
290       status->MPI_SOURCE = request->src;
291       status->MPI_TAG = request->tag;
292       status->MPI_ERROR = MPI_SUCCESS;
293     }
294     SIMIX_mutex_unlock(request->mutex);
295   }
296
297   return retval;
298 }
299
300 /**
301  * waitall
302  **/
303 int smpi_mpi_waitall(int count, smpi_mpi_request_t requests[],
304                 smpi_mpi_status_t status[])
305 {
306         int cpt;
307         int index;
308         int retval;
309         smpi_mpi_status_t stat;
310
311         for (cpt = 0; cpt < count; cpt++) {
312                 retval = smpi_mpi_waitany(count, requests, &index, &stat);
313                 if (retval != MPI_SUCCESS)
314                         return retval;
315                 if (MPI_STATUS_IGNORE != status)
316                         memcpy(&(status[index]), &stat, sizeof(stat));
317         }
318         return MPI_SUCCESS;
319 }
320
321 /**
322  * waitany
323  **/
324 int smpi_mpi_waitany(int count, smpi_mpi_request_t * requests, int *index,
325                      smpi_mpi_status_t * status)
326 {
327   int cpt;
328
329   *index = MPI_UNDEFINED;
330   if (NULL == requests) {
331     return MPI_ERR_INTERN;
332   }
333   /* First check if one of them is already done */
334   for (cpt = 0; cpt < count; cpt++) {
335           printf("...exam req[%d] of msg from [%d]\n",cpt,requests[cpt]->src);
336     if (requests[cpt]->completed && !requests[cpt]->consumed) { /* got ya */
337           printf("...found match req[%d] of msg from [%d]\n",cpt,requests[cpt]->src);
338       *index = cpt;
339       goto found_request;
340     }
341   }
342   /* If none found, block */
343   /* FIXME: should use a SIMIX_cond_waitany, when implemented. For now, block on the first one */
344   while (1) {
345     for (cpt = 0; cpt < count; cpt++) {
346
347 #ifdef DEBUG_STEPH
348       print_req( requests[cpt] );
349 #endif
350       if (!requests[cpt]->completed) {  /* this one is not done, wait on it */
351               printf("... blocked waiting a msg %d->%d, tag=%d\n",requests[cpt]->src,requests[cpt]->dst,requests[cpt]->tag);
352         while (!requests[cpt]->completed)
353           SIMIX_cond_wait(requests[cpt]->cond, requests[cpt]->mutex);
354
355         *index = cpt;
356         goto found_request;
357       }
358     }
359     if (cpt == count)           /* they are all done. Damn user */
360       return MPI_ERR_REQUEST;
361   }
362
363 found_request:
364 #ifdef DEBUG_STEPH
365       print_req( requests[cpt] );
366 #endif
367   requests[*index]->consumed = 1;
368 #ifdef DEBUG_STEPH
369       print_req( requests[cpt] );
370 #endif
371           printf("...accessing *req[%d]->consumed\n",cpt);
372   if (NULL != status) {
373     status->MPI_SOURCE = requests[*index]->src;
374     status->MPI_TAG = requests[*index]->tag;
375     status->MPI_ERROR = MPI_SUCCESS;
376   }
377   return MPI_SUCCESS;
378
379 }