Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
7021f117f9fac2220e7f057c5c96cf161ddd0186
[simgrid.git] / src / smpi / smpi_base.c
1 #include "private.h"
2 #include "xbt/time.h"
3
4 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_base, smpi,
5                                 "Logging specific to SMPI (base)");
6 XBT_LOG_EXTERNAL_CATEGORY(smpi_coll);
7 XBT_LOG_EXTERNAL_CATEGORY(smpi_base);
8 XBT_LOG_EXTERNAL_CATEGORY(smpi_bench);
9 XBT_LOG_EXTERNAL_CATEGORY(smpi_kernel);
10 XBT_LOG_EXTERNAL_CATEGORY(smpi_mpi);
11 XBT_LOG_EXTERNAL_CATEGORY(smpi_receiver);
12 XBT_LOG_EXTERNAL_CATEGORY(smpi_sender);
13 XBT_LOG_EXTERNAL_CATEGORY(smpi_util);
14
15 smpi_mpi_global_t smpi_mpi_global = NULL;
16
17
18 /**
19  * Get the lower bound and extent for a Datatype 
20  *
21  * FIXME: this an incomplete implementation as we do not support yet MPI_Type_commit.
22  * Hence, this can be called only for primitive type MPI_INT, MPI_DOUBLE, ...
23  *
24  * remark: MPI-1 has also the deprecated 
25  * int MPI_Type_extent(MPI_Datatype datatype, *MPI_Aint *extent);
26  *
27  **/
28 int smpi_mpi_type_get_extent(MPI_Datatype datatype, MPI_Aint *lb, MPI_Aint *extent) {
29         *extent =  datatype->ub - datatype->lb;
30         return( MPI_SUCCESS );
31 }
32
33 /**
34  * Operations of MPI_OP : implemented=land,sum,min,max
35  **/
36 void smpi_mpi_land_func(void *a, void *b, int *length,
37                         MPI_Datatype * datatype);
38
39 void smpi_mpi_land_func(void *a, void *b, int *length,
40                         MPI_Datatype * datatype)
41 {
42   int i;
43   if (*datatype == smpi_mpi_global->mpi_int) {
44     int *x = a, *y = b;
45     for (i = 0; i < *length; i++) {
46       y[i] = x[i] && y[i];
47     }
48   }
49 }
50
51 /**
52  * sum two vectors element-wise
53  *
54  * @param a the first vectors
55  * @param b the second vectors
56  * @return the second vector is modified and contains the element-wise sums
57  **/
58 void smpi_mpi_sum_func(void *a, void *b, int *length,
59                        MPI_Datatype * datatype);
60
61 void smpi_mpi_sum_func(void *a, void *b, int *length, MPI_Datatype * datatype)
62 {
63   int i;
64   if ((*datatype == smpi_mpi_global->mpi_byte) || (*datatype == smpi_mpi_global->mpi_char)) {
65     char *x = a, *y = b;
66     for (i = 0; i < *length; i++) {
67       y[i] = x[i] + y[i];
68     }
69   } else if (*datatype == smpi_mpi_global->mpi_int) {
70     int *x = a, *y = b;
71     for (i = 0; i < *length; i++) {
72       y[i] = x[i] + y[i];
73     }
74   } else if (*datatype == smpi_mpi_global->mpi_float) {
75     float *x = a, *y = b;
76     for (i = 0; i < *length; i++) {
77       y[i] = x[i] + y[i];
78     }
79   } else if (*datatype == smpi_mpi_global->mpi_double) {
80     double *x = a, *y = b;
81     for (i = 0; i < *length; i++) {
82       y[i] = x[i] + y[i];
83     }
84   }
85 }
86 /**
87  *i multiply two vectors element-wise
88  *
89  * @param a the first vectors
90  * @param b the second vectors
91  * @return the second vector is modified and contains the element-wise products
92  **/
93 void smpi_mpi_prod_func(void *a, void *b, int *length,
94                        MPI_Datatype * datatype);
95
96 void smpi_mpi_prod_func(void *a, void *b, int *length, MPI_Datatype * datatype)
97 {
98   int i;
99   if ((*datatype == smpi_mpi_global->mpi_byte) || (*datatype == smpi_mpi_global->mpi_char)) {
100     char *x = a, *y = b;
101     for (i = 0; i < *length; i++) {
102       y[i] = x[i] * y[i];
103     }
104   } else if (*datatype == smpi_mpi_global->mpi_int) {
105     int *x = a, *y = b;
106     for (i = 0; i < *length; i++) {
107       y[i] = x[i] * y[i];
108     }
109   } else if (*datatype == smpi_mpi_global->mpi_float) {
110     float *x = a, *y = b;
111     for (i = 0; i < *length; i++) {
112       y[i] = x[i] * y[i];
113     }
114   } else if (*datatype == smpi_mpi_global->mpi_double) {
115     double *x = a, *y = b;
116     for (i = 0; i < *length; i++) {
117       y[i] = x[i] * y[i];
118     }
119   }
120 }
121 /**
122  * compute the min of two vectors element-wise
123  **/
124 void smpi_mpi_min_func(void *a, void *b, int *length,
125                        MPI_Datatype * datatype);
126
127 void smpi_mpi_min_func(void *a, void *b, int *length, MPI_Datatype * datatype)
128 {
129   int i;
130   if ((*datatype == smpi_mpi_global->mpi_byte) || (*datatype == smpi_mpi_global->mpi_char)) {
131     char *x = a, *y = b;
132     for (i = 0; i < *length; i++) {
133       y[i] = x[i] < y[i] ? x[i] : y[i];
134     }
135   } else {
136     if (*datatype == smpi_mpi_global->mpi_int) {
137       int *x = a, *y = b;
138       for (i = 0; i < *length; i++) {
139         y[i] = x[i] < y[i] ? x[i] : y[i];
140       }
141     } else {
142       if (*datatype == smpi_mpi_global->mpi_float) {
143         float *x = a, *y = b;
144         for (i = 0; i < *length; i++) {
145           y[i] = x[i] < y[i] ? x[i] : y[i];
146         }
147       } else {
148         if (*datatype == smpi_mpi_global->mpi_double) {
149           double *x = a, *y = b;
150           for (i = 0; i < *length; i++) {
151             y[i] = x[i] < y[i] ? x[i] : y[i];
152           }
153
154         }
155       }
156     }
157   }
158 }
159
160 /**
161  * compute the max of two vectors element-wise
162  **/
163 void smpi_mpi_max_func(void *a, void *b, int *length,
164                        MPI_Datatype * datatype);
165
166 void smpi_mpi_max_func(void *a, void *b, int *length, MPI_Datatype * datatype)
167 {
168   int i;
169   if ((*datatype == smpi_mpi_global->mpi_byte) || (*datatype == smpi_mpi_global->mpi_char)) {
170     char *x = a, *y = b;
171     for (i = 0; i < *length; i++) {
172       y[i] = x[i] > y[i] ? x[i] : y[i];
173     }
174   } else if (*datatype == smpi_mpi_global->mpi_int) {
175     int *x = a, *y = b;
176     for (i = 0; i < *length; i++) {
177       y[i] = x[i] > y[i] ? x[i] : y[i];
178     }
179   } else if (*datatype == smpi_mpi_global->mpi_float) {
180     float *x = a, *y = b;
181     for (i = 0; i < *length; i++) {
182       y[i] = x[i] > y[i] ? x[i] : y[i];
183     }
184   } else if (*datatype == smpi_mpi_global->mpi_double) {
185     double *x = a, *y = b;
186     for (i = 0; i < *length; i++) {
187       y[i] = x[i] > y[i] ? x[i] : y[i];
188     }
189
190   }
191 }
192
193
194
195
196 /**
197  * tell the MPI rank of the calling process (from its SIMIX process id)
198  **/
199 int smpi_mpi_comm_rank(smpi_mpi_communicator_t comm)
200 {
201   return comm->index_to_rank_map[smpi_process_index()];
202 }
203
204 void smpi_process_init(int *argc, char ***argv)
205 {
206   smpi_process_data_t pdata;
207
208   // initialize some local variables
209
210   pdata = xbt_new(s_smpi_process_data_t, 1);
211   SIMIX_process_set_data(SIMIX_process_self(), pdata);
212
213   /* get rank from command line, and remove it from argv */
214   pdata->index = atoi((*argv)[1]);
215   DEBUG1("I'm rank %d", pdata->index);
216   if (*argc > 2) {
217     memmove((*argv)[1], (*argv)[2], sizeof(char *) * (*argc - 2));
218     (*argv)[(*argc) - 1] = NULL;
219   }
220   (*argc)--;
221
222   pdata->mutex = SIMIX_mutex_init();
223   pdata->cond = SIMIX_cond_init();
224   pdata->finalize = 0;
225
226   pdata->pending_recv_request_queue = xbt_fifo_new();
227   pdata->pending_send_request_queue = xbt_fifo_new();
228   pdata->received_message_queue = xbt_fifo_new();
229
230   pdata->main = SIMIX_process_self();
231   pdata->sender = SIMIX_process_create("smpi_sender",
232                                        smpi_sender, pdata,
233                                        SIMIX_host_get_name(SIMIX_host_self()),
234                                        0, NULL,
235                                        /*props */ NULL);
236   pdata->receiver = SIMIX_process_create("smpi_receiver",
237                                          smpi_receiver, pdata,
238                                          SIMIX_host_get_name(SIMIX_host_self
239                                                              ()), 0, NULL,
240                                          /*props */ NULL);
241
242   smpi_global->main_processes[pdata->index] = SIMIX_process_self();
243   return;
244 }
245
246 void smpi_process_finalize()
247 {
248   smpi_process_data_t pdata = SIMIX_process_get_data(SIMIX_process_self());
249
250   pdata->finalize = 2;          /* Tell sender and receiver to quit */
251   SIMIX_process_resume(pdata->sender);
252   SIMIX_process_resume(pdata->receiver);
253   while (pdata->finalize > 0) { /* wait until it's done */
254     SIMIX_cond_wait(pdata->cond, pdata->mutex);
255   }
256
257   SIMIX_mutex_destroy(pdata->mutex);
258   SIMIX_cond_destroy(pdata->cond);
259   xbt_fifo_free(pdata->pending_recv_request_queue);
260   xbt_fifo_free(pdata->pending_send_request_queue);
261   xbt_fifo_free(pdata->received_message_queue);
262   xbt_free(pdata);
263 }
264
265
266 /*int smpi_mpi_barrier(smpi_mpi_communicator_t comm)
267 {
268
269   SIMIX_mutex_lock(comm->barrier_mutex);
270   ++comm->barrier_count;
271   if (comm->barrier_count > comm->size) {       // only happens on second barrier...
272     comm->barrier_count = 0;
273   } else if (comm->barrier_count == comm->size) {
274     SIMIX_cond_broadcast(comm->barrier_cond);
275   }
276   while (comm->barrier_count < comm->size) {
277     SIMIX_cond_wait(comm->barrier_cond, comm->barrier_mutex);
278   }
279   SIMIX_mutex_unlock(comm->barrier_mutex);
280
281   return MPI_SUCCESS;
282 }
283 */
284
285 int smpi_mpi_isend(smpi_mpi_request_t request)
286 {
287   smpi_process_data_t pdata = SIMIX_process_get_data(SIMIX_process_self());
288   int retval = MPI_SUCCESS;
289
290   if (NULL == request) {
291     retval = MPI_ERR_INTERN;
292   } else {
293     xbt_fifo_push(pdata->pending_send_request_queue, request);
294     SIMIX_process_resume(pdata->sender);
295   }
296
297   return retval;
298 }
299
300 int smpi_mpi_irecv(smpi_mpi_request_t request)
301 {
302   int retval = MPI_SUCCESS;
303   smpi_process_data_t pdata = SIMIX_process_get_data(SIMIX_process_self());
304
305   if (NULL == request) {
306     retval = MPI_ERR_INTERN;
307   } else {
308     xbt_fifo_push(pdata->pending_recv_request_queue, request);
309
310     if (SIMIX_process_is_suspended(pdata->receiver)) {
311       SIMIX_process_resume(pdata->receiver);
312     }
313   }
314
315   return retval;
316 }
317
318 void  print_req( smpi_mpi_request_t r ); 
319 void  print_req( smpi_mpi_request_t r ) {
320         printf("***req %p-> src=%d dst=%d tag=%d completed=0x%x consumed=0x%x\n",r,r->src,r->dst,r->tag,r->completed,r->consumed);
321 }
322
323
324 /**
325  * wait and friends ...
326  **/
327 int smpi_mpi_wait(smpi_mpi_request_t request, smpi_mpi_status_t * status)
328 {
329   int retval = MPI_SUCCESS;
330
331   if (NULL == request) {
332     retval = MPI_ERR_INTERN;
333   } else {
334     SIMIX_mutex_lock(request->mutex);
335 //#define DEBUG_STEPH
336 #ifdef DEBUG_STEPH
337     print_req( request );  //@@
338 #endif
339     while (!request->completed) {
340       SIMIX_cond_wait(request->cond, request->mutex);
341     }
342     if (NULL != status) {
343       status->MPI_SOURCE = request->src;
344       status->MPI_TAG = request->tag;
345       status->MPI_ERROR = MPI_SUCCESS;
346     }
347     SIMIX_mutex_unlock(request->mutex);
348   }
349
350   return retval;
351 }
352
353 /**
354  * waitall
355  **/
356 int smpi_mpi_waitall(int count, smpi_mpi_request_t requests[],
357                 smpi_mpi_status_t status[])
358 {
359         int cpt;
360         int index;
361         int retval;
362         smpi_mpi_status_t stat;
363
364         for (cpt = 0; cpt < count; cpt++) {
365                 retval = smpi_mpi_waitany(count, requests, &index, &stat);
366                 if (retval != MPI_SUCCESS)
367                         return retval;
368                 if (MPI_STATUS_IGNORE != status)
369                         memcpy(&(status[index]), &stat, sizeof(stat));
370         }
371         return MPI_SUCCESS;
372 }
373
374 /**
375  * waitany
376  **/
377 int smpi_mpi_waitany(int count, smpi_mpi_request_t * requests, int *index,
378                      smpi_mpi_status_t * status)
379 {
380   int cpt;
381
382   *index = MPI_UNDEFINED;
383   if (NULL == requests) {
384     return MPI_ERR_INTERN;
385   }
386   /* First check if one of them is already done */
387   for (cpt = 0; cpt < count; cpt++) {
388 #ifdef DEBUG_STEPH
389           printf("...exam req[%d] of msg from [%d]\n",cpt,requests[cpt]->src);
390 #endif
391     if (requests[cpt]->completed && !requests[cpt]->consumed) { /* got ya */
392 #ifdef DEBUG_STEPH
393           printf("...found match req[%d] of msg from [%d]\n",cpt,requests[cpt]->src);
394 #endif
395       *index = cpt;
396       goto found_request;
397     }
398   }
399   /* If none found, block */
400   /* FIXME: should use a SIMIX_cond_waitany, when implemented. For now, block on the first one */
401   while (1) {
402     for (cpt = 0; cpt < count; cpt++) {
403
404 #ifdef DEBUG_STEPH
405       print_req( requests[cpt] );
406 #endif
407       if (!requests[cpt]->completed) {  /* this one is not done, wait on it */
408 #ifdef DEBUG_STEPH
409               printf("... blocked waiting a msg %d->%d, tag=%d\n",requests[cpt]->src,requests[cpt]->dst,requests[cpt]->tag);
410 #endif
411         while (!requests[cpt]->completed)
412           SIMIX_cond_wait(requests[cpt]->cond, requests[cpt]->mutex);
413
414         *index = cpt;
415         goto found_request;
416       }
417     }
418     if (cpt == count)           /* they are all done. Damn user */
419       return MPI_ERR_REQUEST;
420   }
421
422 found_request:
423 #ifdef DEBUG_STEPH
424       print_req( requests[cpt] );
425 #endif
426   requests[*index]->consumed = 1;
427 #ifdef DEBUG_STEPH
428       print_req( requests[cpt] );
429           printf("...accessing *req[%d]->consumed\n",cpt);
430 #endif
431   if (NULL != status) {
432     status->MPI_SOURCE = requests[*index]->src;
433     status->MPI_TAG = requests[*index]->tag;
434     status->MPI_ERROR = MPI_SUCCESS;
435   }
436   return MPI_SUCCESS;
437
438 }