Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
e44173929e7414f76c93081a44e9b9246a97e11a
[simgrid.git] / src / smpi / smpi_base.c
1 #include "private.h"
2 #include "xbt/time.h"
3
4 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_base, smpi,
5                                 "Logging specific to SMPI (base)");
6 XBT_LOG_EXTERNAL_CATEGORY(smpi_base);
7 XBT_LOG_EXTERNAL_CATEGORY(smpi_bench);
8 XBT_LOG_EXTERNAL_CATEGORY(smpi_kernel);
9 XBT_LOG_EXTERNAL_CATEGORY(smpi_mpi);
10 XBT_LOG_EXTERNAL_CATEGORY(smpi_receiver);
11 XBT_LOG_EXTERNAL_CATEGORY(smpi_sender);
12 XBT_LOG_EXTERNAL_CATEGORY(smpi_util);
13
14 smpi_mpi_global_t smpi_mpi_global = NULL;
15
16 /**
17  * Operations of MPI_OP : implemented=land,sum,min,max
18  **/
19 void smpi_mpi_land_func(void *a, void *b, int *length,
20                         MPI_Datatype * datatype);
21
22 void smpi_mpi_land_func(void *a, void *b, int *length,
23                         MPI_Datatype * datatype)
24 {
25   int i;
26   if (*datatype == smpi_mpi_global->mpi_int) {
27     int *x = a, *y = b;
28     for (i = 0; i < *length; i++) {
29       y[i] = x[i] && y[i];
30     }
31   }
32 }
33
34 /**
35  * sum two vectors element-wise
36  *
37  * @param a the first vectors
38  * @param b the second vectors
39  * @return the second vector is modified and contains the element-wise sums
40  **/
41 void smpi_mpi_sum_func(void *a, void *b, int *length,
42                        MPI_Datatype * datatype);
43
44 void smpi_mpi_sum_func(void *a, void *b, int *length, MPI_Datatype * datatype)
45 {
46   int i;
47   if (*datatype == smpi_mpi_global->mpi_byte) {
48     char *x = a, *y = b;
49     for (i = 0; i < *length; i++) {
50       y[i] = x[i] + y[i];
51     }
52   } else if (*datatype == smpi_mpi_global->mpi_int) {
53     int *x = a, *y = b;
54     for (i = 0; i < *length; i++) {
55       y[i] = x[i] + y[i];
56     }
57   } else if (*datatype == smpi_mpi_global->mpi_float) {
58     float *x = a, *y = b;
59     for (i = 0; i < *length; i++) {
60       y[i] = x[i] + y[i];
61     }
62   } else if (*datatype == smpi_mpi_global->mpi_double) {
63     double *x = a, *y = b;
64     for (i = 0; i < *length; i++) {
65       y[i] = x[i] + y[i];
66     }
67   }
68 }
69
70 /**
71  * compute the min of two vectors element-wise
72  **/
73 void smpi_mpi_min_func(void *a, void *b, int *length,
74                        MPI_Datatype * datatype);
75
76 void smpi_mpi_min_func(void *a, void *b, int *length, MPI_Datatype * datatype)
77 {
78   int i;
79   if (*datatype == smpi_mpi_global->mpi_byte) {
80     char *x = a, *y = b;
81     for (i = 0; i < *length; i++) {
82       y[i] = x[i] < y[i] ? x[i] : y[i];
83     }
84   } else {
85     if (*datatype == smpi_mpi_global->mpi_int) {
86       int *x = a, *y = b;
87       for (i = 0; i < *length; i++) {
88         y[i] = x[i] < y[i] ? x[i] : y[i];
89       }
90     } else {
91       if (*datatype == smpi_mpi_global->mpi_float) {
92         float *x = a, *y = b;
93         for (i = 0; i < *length; i++) {
94           y[i] = x[i] < y[i] ? x[i] : y[i];
95         }
96       } else {
97         if (*datatype == smpi_mpi_global->mpi_double) {
98           double *x = a, *y = b;
99           for (i = 0; i < *length; i++) {
100             y[i] = x[i] < y[i] ? x[i] : y[i];
101           }
102
103         }
104       }
105     }
106   }
107 }
108
109 /**
110  * compute the max of two vectors element-wise
111  **/
112 void smpi_mpi_max_func(void *a, void *b, int *length,
113                        MPI_Datatype * datatype);
114
115 void smpi_mpi_max_func(void *a, void *b, int *length, MPI_Datatype * datatype)
116 {
117   int i;
118   if (*datatype == smpi_mpi_global->mpi_byte) {
119     char *x = a, *y = b;
120     for (i = 0; i < *length; i++) {
121       y[i] = x[i] > y[i] ? x[i] : y[i];
122     }
123   } else if (*datatype == smpi_mpi_global->mpi_int) {
124     int *x = a, *y = b;
125     for (i = 0; i < *length; i++) {
126       y[i] = x[i] > y[i] ? x[i] : y[i];
127     }
128   } else if (*datatype == smpi_mpi_global->mpi_float) {
129     float *x = a, *y = b;
130     for (i = 0; i < *length; i++) {
131       y[i] = x[i] > y[i] ? x[i] : y[i];
132     }
133   } else if (*datatype == smpi_mpi_global->mpi_double) {
134     double *x = a, *y = b;
135     for (i = 0; i < *length; i++) {
136       y[i] = x[i] > y[i] ? x[i] : y[i];
137     }
138
139   }
140 }
141
142
143
144
145 /**
146  * tell the MPI rank of the calling process (from its SIMIX process id)
147  **/
148 int smpi_mpi_comm_rank(smpi_mpi_communicator_t comm)
149 {
150   return comm->index_to_rank_map[smpi_process_index()];
151 }
152
153 void smpi_process_init(int *argc, char ***argv)
154 {
155   smpi_process_data_t pdata;
156
157   // initialize some local variables
158
159   pdata = xbt_new(s_smpi_process_data_t, 1);
160   SIMIX_process_set_data(SIMIX_process_self(), pdata);
161
162   /* get rank from command line, and remove it from argv */
163   pdata->index = atoi((*argv)[1]);
164   DEBUG1("I'm rank %d", pdata->index);
165   if (*argc > 2) {
166     memmove((*argv)[1], (*argv)[2], sizeof(char *) * (*argc - 2));
167     (*argv)[(*argc) - 1] = NULL;
168   }
169   (*argc)--;
170
171   pdata->mutex = SIMIX_mutex_init();
172   pdata->cond = SIMIX_cond_init();
173   pdata->finalize = 0;
174
175   pdata->pending_recv_request_queue = xbt_fifo_new();
176   pdata->pending_send_request_queue = xbt_fifo_new();
177   pdata->received_message_queue = xbt_fifo_new();
178
179   pdata->main = SIMIX_process_self();
180   pdata->sender = SIMIX_process_create("smpi_sender",
181                                        smpi_sender, pdata,
182                                        SIMIX_host_get_name(SIMIX_host_self()),
183                                        0, NULL,
184                                        /*props */ NULL);
185   pdata->receiver = SIMIX_process_create("smpi_receiver",
186                                          smpi_receiver, pdata,
187                                          SIMIX_host_get_name(SIMIX_host_self
188                                                              ()), 0, NULL,
189                                          /*props */ NULL);
190
191   smpi_global->main_processes[pdata->index] = SIMIX_process_self();
192   return;
193 }
194
195 void smpi_process_finalize()
196 {
197   smpi_process_data_t pdata = SIMIX_process_get_data(SIMIX_process_self());
198
199   pdata->finalize = 2;          /* Tell sender and receiver to quit */
200   SIMIX_process_resume(pdata->sender);
201   SIMIX_process_resume(pdata->receiver);
202   while (pdata->finalize > 0) { /* wait until it's done */
203     SIMIX_cond_wait(pdata->cond, pdata->mutex);
204   }
205
206   SIMIX_mutex_destroy(pdata->mutex);
207   SIMIX_cond_destroy(pdata->cond);
208   xbt_fifo_free(pdata->pending_recv_request_queue);
209   xbt_fifo_free(pdata->pending_send_request_queue);
210   xbt_fifo_free(pdata->received_message_queue);
211   xbt_free(pdata);
212 }
213
214
215 /*int smpi_mpi_barrier(smpi_mpi_communicator_t comm)
216 {
217
218   SIMIX_mutex_lock(comm->barrier_mutex);
219   ++comm->barrier_count;
220   if (comm->barrier_count > comm->size) {       // only happens on second barrier...
221     comm->barrier_count = 0;
222   } else if (comm->barrier_count == comm->size) {
223     SIMIX_cond_broadcast(comm->barrier_cond);
224   }
225   while (comm->barrier_count < comm->size) {
226     SIMIX_cond_wait(comm->barrier_cond, comm->barrier_mutex);
227   }
228   SIMIX_mutex_unlock(comm->barrier_mutex);
229
230   return MPI_SUCCESS;
231 }
232 */
233
234 int smpi_mpi_isend(smpi_mpi_request_t request)
235 {
236   smpi_process_data_t pdata = SIMIX_process_get_data(SIMIX_process_self());
237   int retval = MPI_SUCCESS;
238
239   if (NULL == request) {
240     retval = MPI_ERR_INTERN;
241   } else {
242     xbt_fifo_push(pdata->pending_send_request_queue, request);
243     SIMIX_process_resume(pdata->sender);
244   }
245
246   return retval;
247 }
248
249 int smpi_mpi_irecv(smpi_mpi_request_t request)
250 {
251   int retval = MPI_SUCCESS;
252   smpi_process_data_t pdata = SIMIX_process_get_data(SIMIX_process_self());
253
254   if (NULL == request) {
255     retval = MPI_ERR_INTERN;
256   } else {
257     xbt_fifo_push(pdata->pending_recv_request_queue, request);
258
259     if (SIMIX_process_is_suspended(pdata->receiver)) {
260       SIMIX_process_resume(pdata->receiver);
261     }
262   }
263
264   return retval;
265 }
266
267 void  print_req( smpi_mpi_request_t r ); 
268 void  print_req( smpi_mpi_request_t r ) {
269         printf("***req %p-> src=%d dst=%d tag=%d completed=0x%x consumed=0x%x\n",r,r->src,r->dst,r->tag,r->completed,r->consumed);
270 }
271
272
273 /**
274  * wait and friends ...
275  **/
276 int smpi_mpi_wait(smpi_mpi_request_t request, smpi_mpi_status_t * status)
277 {
278   int retval = MPI_SUCCESS;
279
280   if (NULL == request) {
281     retval = MPI_ERR_INTERN;
282   } else {
283     SIMIX_mutex_lock(request->mutex);
284
285 #ifdef DEBUG_STEPH
286     print_req( request );  //@@
287 #endif
288     while (!request->completed) {
289       SIMIX_cond_wait(request->cond, request->mutex);
290     }
291     if (NULL != status) {
292       status->MPI_SOURCE = request->src;
293       status->MPI_TAG = request->tag;
294       status->MPI_ERROR = MPI_SUCCESS;
295     }
296     SIMIX_mutex_unlock(request->mutex);
297   }
298
299   return retval;
300 }
301
302 /**
303  * waitall
304  **/
305 int smpi_mpi_waitall(int count, smpi_mpi_request_t requests[],
306                 smpi_mpi_status_t status[])
307 {
308         int cpt;
309         int index;
310         int retval;
311         smpi_mpi_status_t stat;
312
313         for (cpt = 0; cpt < count; cpt++) {
314                 retval = smpi_mpi_waitany(count, requests, &index, &stat);
315                 if (retval != MPI_SUCCESS)
316                         return retval;
317                 if (MPI_STATUS_IGNORE != status)
318                         memcpy(&(status[index]), &stat, sizeof(stat));
319         }
320         return MPI_SUCCESS;
321 }
322
323 /**
324  * waitany
325  **/
326 int smpi_mpi_waitany(int count, smpi_mpi_request_t * requests, int *index,
327                      smpi_mpi_status_t * status)
328 {
329   int cpt;
330
331   *index = MPI_UNDEFINED;
332   if (NULL == requests) {
333     return MPI_ERR_INTERN;
334   }
335   /* First check if one of them is already done */
336   for (cpt = 0; cpt < count; cpt++) {
337 #ifdef DEBUG_STEPH
338           printf("...exam req[%d] of msg from [%d]\n",cpt,requests[cpt]->src);
339 #endif
340     if (requests[cpt]->completed && !requests[cpt]->consumed) { /* got ya */
341 #ifdef DEBUG_STEPH
342           printf("...found match req[%d] of msg from [%d]\n",cpt,requests[cpt]->src);
343 #endif
344       *index = cpt;
345       goto found_request;
346     }
347   }
348   /* If none found, block */
349   /* FIXME: should use a SIMIX_cond_waitany, when implemented. For now, block on the first one */
350   while (1) {
351     for (cpt = 0; cpt < count; cpt++) {
352
353 #ifdef DEBUG_STEPH
354       print_req( requests[cpt] );
355 #endif
356       if (!requests[cpt]->completed) {  /* this one is not done, wait on it */
357 #ifdef DEBUG_STEPH
358               printf("... blocked waiting a msg %d->%d, tag=%d\n",requests[cpt]->src,requests[cpt]->dst,requests[cpt]->tag);
359 #endif
360         while (!requests[cpt]->completed)
361           SIMIX_cond_wait(requests[cpt]->cond, requests[cpt]->mutex);
362
363         *index = cpt;
364         goto found_request;
365       }
366     }
367     if (cpt == count)           /* they are all done. Damn user */
368       return MPI_ERR_REQUEST;
369   }
370
371 found_request:
372 #ifdef DEBUG_STEPH
373       print_req( requests[cpt] );
374 #endif
375   requests[*index]->consumed = 1;
376 #ifdef DEBUG_STEPH
377       print_req( requests[cpt] );
378           printf("...accessing *req[%d]->consumed\n",cpt);
379 #endif
380   if (NULL != status) {
381     status->MPI_SOURCE = requests[*index]->src;
382     status->MPI_TAG = requests[*index]->tag;
383     status->MPI_ERROR = MPI_SUCCESS;
384   }
385   return MPI_SUCCESS;
386
387 }