Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
separate user-level and internal-level for SMPI_MPI_{Reduce,Bcast}
[simgrid.git] / src / smpi / smpi_base.c
1 #include "private.h"
2 #include "xbt/time.h"
3
4 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_base, smpi,
5                                 "Logging specific to SMPI (base)");
6 XBT_LOG_EXTERNAL_CATEGORY(smpi_base);
7 XBT_LOG_EXTERNAL_CATEGORY(smpi_bench);
8 XBT_LOG_EXTERNAL_CATEGORY(smpi_kernel);
9 XBT_LOG_EXTERNAL_CATEGORY(smpi_mpi);
10 XBT_LOG_EXTERNAL_CATEGORY(smpi_receiver);
11 XBT_LOG_EXTERNAL_CATEGORY(smpi_sender);
12 XBT_LOG_EXTERNAL_CATEGORY(smpi_util);
13
14 smpi_mpi_global_t smpi_mpi_global = NULL;
15
16 /**
17  * Operations of MPI_OP : implemented=land,sum,min,max
18  **/
19 void smpi_mpi_land_func(void *a, void *b, int *length,
20                         MPI_Datatype * datatype);
21
22 void smpi_mpi_land_func(void *a, void *b, int *length,
23                         MPI_Datatype * datatype)
24 {
25   int i;
26   if (*datatype == smpi_mpi_global->mpi_int) {
27     int *x = a, *y = b;
28     for (i = 0; i < *length; i++) {
29       y[i] = x[i] && y[i];
30     }
31   }
32 }
33
34 /**
35  * sum two vectors element-wise
36  *
37  * @param a the first vectors
38  * @param b the second vectors
39  * @return the second vector is modified and contains the element-wise sums
40  **/
41 void smpi_mpi_sum_func(void *a, void *b, int *length,
42                        MPI_Datatype * datatype);
43
44 void smpi_mpi_sum_func(void *a, void *b, int *length, MPI_Datatype * datatype)
45 {
46   int i;
47   if ((*datatype == smpi_mpi_global->mpi_byte) || (*datatype == smpi_mpi_global->mpi_char)) {
48     char *x = a, *y = b;
49     for (i = 0; i < *length; i++) {
50       y[i] = x[i] + y[i];
51     }
52   } else if (*datatype == smpi_mpi_global->mpi_int) {
53     int *x = a, *y = b;
54     for (i = 0; i < *length; i++) {
55       y[i] = x[i] + y[i];
56     }
57   } else if (*datatype == smpi_mpi_global->mpi_float) {
58     float *x = a, *y = b;
59     for (i = 0; i < *length; i++) {
60       y[i] = x[i] + y[i];
61     }
62   } else if (*datatype == smpi_mpi_global->mpi_double) {
63     double *x = a, *y = b;
64     for (i = 0; i < *length; i++) {
65       y[i] = x[i] + y[i];
66     }
67   }
68 }
69 /**
70  *i multiply two vectors element-wise
71  *
72  * @param a the first vectors
73  * @param b the second vectors
74  * @return the second vector is modified and contains the element-wise products
75  **/
76 void smpi_mpi_prod_func(void *a, void *b, int *length,
77                        MPI_Datatype * datatype);
78
79 void smpi_mpi_prod_func(void *a, void *b, int *length, MPI_Datatype * datatype)
80 {
81   int i;
82   if ((*datatype == smpi_mpi_global->mpi_byte) || (*datatype == smpi_mpi_global->mpi_char)) {
83     char *x = a, *y = b;
84     for (i = 0; i < *length; i++) {
85       y[i] = x[i] * y[i];
86     }
87   } else if (*datatype == smpi_mpi_global->mpi_int) {
88     int *x = a, *y = b;
89     for (i = 0; i < *length; i++) {
90       y[i] = x[i] * y[i];
91     }
92   } else if (*datatype == smpi_mpi_global->mpi_float) {
93     float *x = a, *y = b;
94     for (i = 0; i < *length; i++) {
95       y[i] = x[i] * y[i];
96     }
97   } else if (*datatype == smpi_mpi_global->mpi_double) {
98     double *x = a, *y = b;
99     for (i = 0; i < *length; i++) {
100       y[i] = x[i] * y[i];
101     }
102   }
103 }
104 /**
105  * compute the min of two vectors element-wise
106  **/
107 void smpi_mpi_min_func(void *a, void *b, int *length,
108                        MPI_Datatype * datatype);
109
110 void smpi_mpi_min_func(void *a, void *b, int *length, MPI_Datatype * datatype)
111 {
112   int i;
113   if ((*datatype == smpi_mpi_global->mpi_byte) || (*datatype == smpi_mpi_global->mpi_char)) {
114     char *x = a, *y = b;
115     for (i = 0; i < *length; i++) {
116       y[i] = x[i] < y[i] ? x[i] : y[i];
117     }
118   } else {
119     if (*datatype == smpi_mpi_global->mpi_int) {
120       int *x = a, *y = b;
121       for (i = 0; i < *length; i++) {
122         y[i] = x[i] < y[i] ? x[i] : y[i];
123       }
124     } else {
125       if (*datatype == smpi_mpi_global->mpi_float) {
126         float *x = a, *y = b;
127         for (i = 0; i < *length; i++) {
128           y[i] = x[i] < y[i] ? x[i] : y[i];
129         }
130       } else {
131         if (*datatype == smpi_mpi_global->mpi_double) {
132           double *x = a, *y = b;
133           for (i = 0; i < *length; i++) {
134             y[i] = x[i] < y[i] ? x[i] : y[i];
135           }
136
137         }
138       }
139     }
140   }
141 }
142
143 /**
144  * compute the max of two vectors element-wise
145  **/
146 void smpi_mpi_max_func(void *a, void *b, int *length,
147                        MPI_Datatype * datatype);
148
149 void smpi_mpi_max_func(void *a, void *b, int *length, MPI_Datatype * datatype)
150 {
151   int i;
152   if ((*datatype == smpi_mpi_global->mpi_byte) || (*datatype == smpi_mpi_global->mpi_char)) {
153     char *x = a, *y = b;
154     for (i = 0; i < *length; i++) {
155       y[i] = x[i] > y[i] ? x[i] : y[i];
156     }
157   } else if (*datatype == smpi_mpi_global->mpi_int) {
158     int *x = a, *y = b;
159     for (i = 0; i < *length; i++) {
160       y[i] = x[i] > y[i] ? x[i] : y[i];
161     }
162   } else if (*datatype == smpi_mpi_global->mpi_float) {
163     float *x = a, *y = b;
164     for (i = 0; i < *length; i++) {
165       y[i] = x[i] > y[i] ? x[i] : y[i];
166     }
167   } else if (*datatype == smpi_mpi_global->mpi_double) {
168     double *x = a, *y = b;
169     for (i = 0; i < *length; i++) {
170       y[i] = x[i] > y[i] ? x[i] : y[i];
171     }
172
173   }
174 }
175
176
177
178
179 /**
180  * tell the MPI rank of the calling process (from its SIMIX process id)
181  **/
182 int smpi_mpi_comm_rank(smpi_mpi_communicator_t comm)
183 {
184   return comm->index_to_rank_map[smpi_process_index()];
185 }
186
187 void smpi_process_init(int *argc, char ***argv)
188 {
189   smpi_process_data_t pdata;
190
191   // initialize some local variables
192
193   pdata = xbt_new(s_smpi_process_data_t, 1);
194   SIMIX_process_set_data(SIMIX_process_self(), pdata);
195
196   /* get rank from command line, and remove it from argv */
197   pdata->index = atoi((*argv)[1]);
198   DEBUG1("I'm rank %d", pdata->index);
199   if (*argc > 2) {
200     memmove((*argv)[1], (*argv)[2], sizeof(char *) * (*argc - 2));
201     (*argv)[(*argc) - 1] = NULL;
202   }
203   (*argc)--;
204
205   pdata->mutex = SIMIX_mutex_init();
206   pdata->cond = SIMIX_cond_init();
207   pdata->finalize = 0;
208
209   pdata->pending_recv_request_queue = xbt_fifo_new();
210   pdata->pending_send_request_queue = xbt_fifo_new();
211   pdata->received_message_queue = xbt_fifo_new();
212
213   pdata->main = SIMIX_process_self();
214   pdata->sender = SIMIX_process_create("smpi_sender",
215                                        smpi_sender, pdata,
216                                        SIMIX_host_get_name(SIMIX_host_self()),
217                                        0, NULL,
218                                        /*props */ NULL);
219   pdata->receiver = SIMIX_process_create("smpi_receiver",
220                                          smpi_receiver, pdata,
221                                          SIMIX_host_get_name(SIMIX_host_self
222                                                              ()), 0, NULL,
223                                          /*props */ NULL);
224
225   smpi_global->main_processes[pdata->index] = SIMIX_process_self();
226   return;
227 }
228
229 void smpi_process_finalize()
230 {
231   smpi_process_data_t pdata = SIMIX_process_get_data(SIMIX_process_self());
232
233   pdata->finalize = 2;          /* Tell sender and receiver to quit */
234   SIMIX_process_resume(pdata->sender);
235   SIMIX_process_resume(pdata->receiver);
236   while (pdata->finalize > 0) { /* wait until it's done */
237     SIMIX_cond_wait(pdata->cond, pdata->mutex);
238   }
239
240   SIMIX_mutex_destroy(pdata->mutex);
241   SIMIX_cond_destroy(pdata->cond);
242   xbt_fifo_free(pdata->pending_recv_request_queue);
243   xbt_fifo_free(pdata->pending_send_request_queue);
244   xbt_fifo_free(pdata->received_message_queue);
245   xbt_free(pdata);
246 }
247
248
249 /*int smpi_mpi_barrier(smpi_mpi_communicator_t comm)
250 {
251
252   SIMIX_mutex_lock(comm->barrier_mutex);
253   ++comm->barrier_count;
254   if (comm->barrier_count > comm->size) {       // only happens on second barrier...
255     comm->barrier_count = 0;
256   } else if (comm->barrier_count == comm->size) {
257     SIMIX_cond_broadcast(comm->barrier_cond);
258   }
259   while (comm->barrier_count < comm->size) {
260     SIMIX_cond_wait(comm->barrier_cond, comm->barrier_mutex);
261   }
262   SIMIX_mutex_unlock(comm->barrier_mutex);
263
264   return MPI_SUCCESS;
265 }
266 */
267
268 int smpi_mpi_isend(smpi_mpi_request_t request)
269 {
270   smpi_process_data_t pdata = SIMIX_process_get_data(SIMIX_process_self());
271   int retval = MPI_SUCCESS;
272
273   if (NULL == request) {
274     retval = MPI_ERR_INTERN;
275   } else {
276     xbt_fifo_push(pdata->pending_send_request_queue, request);
277     SIMIX_process_resume(pdata->sender);
278   }
279
280   return retval;
281 }
282
283 int smpi_mpi_irecv(smpi_mpi_request_t request)
284 {
285   int retval = MPI_SUCCESS;
286   smpi_process_data_t pdata = SIMIX_process_get_data(SIMIX_process_self());
287
288   if (NULL == request) {
289     retval = MPI_ERR_INTERN;
290   } else {
291     xbt_fifo_push(pdata->pending_recv_request_queue, request);
292
293     if (SIMIX_process_is_suspended(pdata->receiver)) {
294       SIMIX_process_resume(pdata->receiver);
295     }
296   }
297
298   return retval;
299 }
300
301 void  print_req( smpi_mpi_request_t r ); 
302 void  print_req( smpi_mpi_request_t r ) {
303         printf("***req %p-> src=%d dst=%d tag=%d completed=0x%x consumed=0x%x\n",r,r->src,r->dst,r->tag,r->completed,r->consumed);
304 }
305
306
307 /**
308  * wait and friends ...
309  **/
310 int smpi_mpi_wait(smpi_mpi_request_t request, smpi_mpi_status_t * status)
311 {
312   int retval = MPI_SUCCESS;
313
314   if (NULL == request) {
315     retval = MPI_ERR_INTERN;
316   } else {
317     SIMIX_mutex_lock(request->mutex);
318 //#define DEBUG_STEPH
319 #ifdef DEBUG_STEPH
320     print_req( request );  //@@
321 #endif
322     while (!request->completed) {
323       SIMIX_cond_wait(request->cond, request->mutex);
324     }
325     if (NULL != status) {
326       status->MPI_SOURCE = request->src;
327       status->MPI_TAG = request->tag;
328       status->MPI_ERROR = MPI_SUCCESS;
329     }
330     SIMIX_mutex_unlock(request->mutex);
331   }
332
333   return retval;
334 }
335
336 /**
337  * waitall
338  **/
339 int smpi_mpi_waitall(int count, smpi_mpi_request_t requests[],
340                 smpi_mpi_status_t status[])
341 {
342         int cpt;
343         int index;
344         int retval;
345         smpi_mpi_status_t stat;
346
347         for (cpt = 0; cpt < count; cpt++) {
348                 retval = smpi_mpi_waitany(count, requests, &index, &stat);
349                 if (retval != MPI_SUCCESS)
350                         return retval;
351                 if (MPI_STATUS_IGNORE != status)
352                         memcpy(&(status[index]), &stat, sizeof(stat));
353         }
354         return MPI_SUCCESS;
355 }
356
357 /**
358  * waitany
359  **/
360 int smpi_mpi_waitany(int count, smpi_mpi_request_t * requests, int *index,
361                      smpi_mpi_status_t * status)
362 {
363   int cpt;
364
365   *index = MPI_UNDEFINED;
366   if (NULL == requests) {
367     return MPI_ERR_INTERN;
368   }
369   /* First check if one of them is already done */
370   for (cpt = 0; cpt < count; cpt++) {
371 #ifdef DEBUG_STEPH
372           printf("...exam req[%d] of msg from [%d]\n",cpt,requests[cpt]->src);
373 #endif
374     if (requests[cpt]->completed && !requests[cpt]->consumed) { /* got ya */
375 #ifdef DEBUG_STEPH
376           printf("...found match req[%d] of msg from [%d]\n",cpt,requests[cpt]->src);
377 #endif
378       *index = cpt;
379       goto found_request;
380     }
381   }
382   /* If none found, block */
383   /* FIXME: should use a SIMIX_cond_waitany, when implemented. For now, block on the first one */
384   while (1) {
385     for (cpt = 0; cpt < count; cpt++) {
386
387 #ifdef DEBUG_STEPH
388       print_req( requests[cpt] );
389 #endif
390       if (!requests[cpt]->completed) {  /* this one is not done, wait on it */
391 #ifdef DEBUG_STEPH
392               printf("... blocked waiting a msg %d->%d, tag=%d\n",requests[cpt]->src,requests[cpt]->dst,requests[cpt]->tag);
393 #endif
394         while (!requests[cpt]->completed)
395           SIMIX_cond_wait(requests[cpt]->cond, requests[cpt]->mutex);
396
397         *index = cpt;
398         goto found_request;
399       }
400     }
401     if (cpt == count)           /* they are all done. Damn user */
402       return MPI_ERR_REQUEST;
403   }
404
405 found_request:
406 #ifdef DEBUG_STEPH
407       print_req( requests[cpt] );
408 #endif
409   requests[*index]->consumed = 1;
410 #ifdef DEBUG_STEPH
411       print_req( requests[cpt] );
412           printf("...accessing *req[%d]->consumed\n",cpt);
413 #endif
414   if (NULL != status) {
415     status->MPI_SOURCE = requests[*index]->src;
416     status->MPI_TAG = requests[*index]->tag;
417     status->MPI_ERROR = MPI_SUCCESS;
418   }
419   return MPI_SUCCESS;
420
421 }