Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Fix stripping of first argument.
[simgrid.git] / src / smpi / smpi_base.c
1 #include "private.h"
2 #include "xbt/time.h"
3
4 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_base, smpi,
5                                 "Logging specific to SMPI (base)");
6 XBT_LOG_EXTERNAL_CATEGORY(smpi_base);
7 XBT_LOG_EXTERNAL_CATEGORY(smpi_bench);
8 XBT_LOG_EXTERNAL_CATEGORY(smpi_kernel);
9 XBT_LOG_EXTERNAL_CATEGORY(smpi_mpi);
10 XBT_LOG_EXTERNAL_CATEGORY(smpi_mpi_dt);
11 XBT_LOG_EXTERNAL_CATEGORY(smpi_coll);
12 XBT_LOG_EXTERNAL_CATEGORY(smpi_receiver);
13 XBT_LOG_EXTERNAL_CATEGORY(smpi_sender);
14 XBT_LOG_EXTERNAL_CATEGORY(smpi_util);
15
16 smpi_mpi_global_t smpi_mpi_global = NULL;
17
18
19 /**
20  * Operations of MPI_OP : implemented=land,sum,min,max
21  **/
22 void smpi_mpi_land_func(void *a, void *b, int *length,
23                         MPI_Datatype * datatype);
24
25 void smpi_mpi_land_func(void *a, void *b, int *length,
26                         MPI_Datatype * datatype)
27 {
28   int i;
29   if (*datatype == smpi_mpi_global->mpi_int) {
30     int *x = a, *y = b;
31     for (i = 0; i < *length; i++) {
32       y[i] = x[i] && y[i];
33     }
34   }
35 }
36
37 /**
38  * sum two vectors element-wise
39  *
40  * @param a the first vectors
41  * @param b the second vectors
42  * @return the second vector is modified and contains the element-wise sums
43  **/
44 void smpi_mpi_sum_func(void *a, void *b, int *length,
45                        MPI_Datatype * datatype);
46
47 void smpi_mpi_sum_func(void *a, void *b, int *length, MPI_Datatype * datatype)
48 {
49   int i;
50   if ((*datatype == smpi_mpi_global->mpi_byte) || (*datatype == smpi_mpi_global->mpi_char)) {
51     char *x = a, *y = b;
52     for (i = 0; i < *length; i++) {
53       y[i] = x[i] + y[i];
54     }
55   } else if (*datatype == smpi_mpi_global->mpi_int) {
56     int *x = a, *y = b;
57     for (i = 0; i < *length; i++) {
58       y[i] = x[i] + y[i];
59     }
60   } else if (*datatype == smpi_mpi_global->mpi_float) {
61     float *x = a, *y = b;
62     for (i = 0; i < *length; i++) {
63       y[i] = x[i] + y[i];
64     }
65   } else if (*datatype == smpi_mpi_global->mpi_double) {
66     double *x = a, *y = b;
67     for (i = 0; i < *length; i++) {
68       y[i] = x[i] + y[i];
69     }
70   }
71 }
72 /**
73  *i multiply two vectors element-wise
74  *
75  * @param a the first vectors
76  * @param b the second vectors
77  * @return the second vector is modified and contains the element-wise products
78  **/
79 void smpi_mpi_prod_func(void *a, void *b, int *length,
80                        MPI_Datatype * datatype);
81
82 void smpi_mpi_prod_func(void *a, void *b, int *length, MPI_Datatype * datatype)
83 {
84   int i;
85   if ((*datatype == smpi_mpi_global->mpi_byte) || (*datatype == smpi_mpi_global->mpi_char)) {
86     char *x = a, *y = b;
87     for (i = 0; i < *length; i++) {
88       y[i] = x[i] * y[i];
89     }
90   } else if (*datatype == smpi_mpi_global->mpi_int) {
91     int *x = a, *y = b;
92     for (i = 0; i < *length; i++) {
93       y[i] = x[i] * y[i];
94     }
95   } else if (*datatype == smpi_mpi_global->mpi_float) {
96     float *x = a, *y = b;
97     for (i = 0; i < *length; i++) {
98       y[i] = x[i] * y[i];
99     }
100   } else if (*datatype == smpi_mpi_global->mpi_double) {
101     double *x = a, *y = b;
102     for (i = 0; i < *length; i++) {
103       y[i] = x[i] * y[i];
104     }
105   }
106 }
107 /**
108  * compute the min of two vectors element-wise
109  **/
110 void smpi_mpi_min_func(void *a, void *b, int *length,
111                        MPI_Datatype * datatype);
112
113 void smpi_mpi_min_func(void *a, void *b, int *length, MPI_Datatype * datatype)
114 {
115   int i;
116   if ((*datatype == smpi_mpi_global->mpi_byte) || (*datatype == smpi_mpi_global->mpi_char)) {
117     char *x = a, *y = b;
118     for (i = 0; i < *length; i++) {
119       y[i] = x[i] < y[i] ? x[i] : y[i];
120     }
121   } else {
122     if (*datatype == smpi_mpi_global->mpi_int) {
123       int *x = a, *y = b;
124       for (i = 0; i < *length; i++) {
125         y[i] = x[i] < y[i] ? x[i] : y[i];
126       }
127     } else {
128       if (*datatype == smpi_mpi_global->mpi_float) {
129         float *x = a, *y = b;
130         for (i = 0; i < *length; i++) {
131           y[i] = x[i] < y[i] ? x[i] : y[i];
132         }
133       } else {
134         if (*datatype == smpi_mpi_global->mpi_double) {
135           double *x = a, *y = b;
136           for (i = 0; i < *length; i++) {
137             y[i] = x[i] < y[i] ? x[i] : y[i];
138           }
139
140         }
141       }
142     }
143   }
144 }
145
146 /**
147  * compute the max of two vectors element-wise
148  **/
149 void smpi_mpi_max_func(void *a, void *b, int *length,
150                        MPI_Datatype * datatype);
151
152 void smpi_mpi_max_func(void *a, void *b, int *length, MPI_Datatype * datatype)
153 {
154   int i;
155   if ((*datatype == smpi_mpi_global->mpi_byte) || (*datatype == smpi_mpi_global->mpi_char)) {
156     char *x = a, *y = b;
157     for (i = 0; i < *length; i++) {
158       y[i] = x[i] > y[i] ? x[i] : y[i];
159     }
160   } else if (*datatype == smpi_mpi_global->mpi_int) {
161     int *x = a, *y = b;
162     for (i = 0; i < *length; i++) {
163       y[i] = x[i] > y[i] ? x[i] : y[i];
164     }
165   } else if (*datatype == smpi_mpi_global->mpi_float) {
166     float *x = a, *y = b;
167     for (i = 0; i < *length; i++) {
168       y[i] = x[i] > y[i] ? x[i] : y[i];
169     }
170   } else if (*datatype == smpi_mpi_global->mpi_double) {
171     double *x = a, *y = b;
172     for (i = 0; i < *length; i++) {
173       y[i] = x[i] > y[i] ? x[i] : y[i];
174     }
175
176   }
177 }
178
179
180
181
182 /**
183  * tell the MPI rank of the calling process (from its SIMIX process id)
184  **/
185 int smpi_mpi_comm_rank(smpi_mpi_communicator_t comm)
186 {
187   return comm->index_to_rank_map[smpi_process_index()];
188 }
189
190 void smpi_process_init(int *argc, char ***argv)
191 {
192   smpi_process_data_t pdata;
193
194   // initialize some local variables
195
196   pdata = xbt_new(s_smpi_process_data_t, 1);
197   SIMIX_process_set_data(SIMIX_process_self(), pdata);
198
199   /* get rank from command line, and remove it from argv */
200   pdata->index = atoi((*argv)[1]);
201   DEBUG1("I'm rank <%d>", pdata->index);
202   if (*argc > 2) {
203     memmove(&(*argv)[1], &(*argv)[2], sizeof(char *) * (*argc - 2));
204     (*argv)[(*argc) - 1] = NULL;
205   }
206   (*argc)--;
207
208   pdata->mutex = SIMIX_mutex_init();
209   pdata->cond = SIMIX_cond_init();
210   pdata->finalize = 0;
211
212   pdata->pending_recv_request_queue = xbt_fifo_new();
213   pdata->pending_send_request_queue = xbt_fifo_new();
214   pdata->received_message_queue = xbt_fifo_new();
215
216   pdata->main = SIMIX_process_self();
217   pdata->sender = SIMIX_process_create("smpi_sender",
218                                        smpi_sender, pdata,
219                                        SIMIX_host_get_name(SIMIX_host_self()),
220                                        0, NULL,
221                                        /*props */ NULL);
222   pdata->receiver = SIMIX_process_create("smpi_receiver",
223                                          smpi_receiver, pdata,
224                                          SIMIX_host_get_name(SIMIX_host_self
225                                                              ()), 0, NULL,
226                                          /*props */ NULL);
227
228   smpi_global->main_processes[pdata->index] = SIMIX_process_self();
229   return;
230 }
231
232 void smpi_process_finalize()
233 {
234   smpi_process_data_t pdata = SIMIX_process_get_data(SIMIX_process_self());
235
236   pdata->finalize = 2;          /* Tell sender and receiver to quit */
237   SIMIX_process_resume(pdata->sender);
238   SIMIX_process_resume(pdata->receiver);
239   while (pdata->finalize > 0) { /* wait until it's done */
240     SIMIX_cond_wait(pdata->cond, pdata->mutex);
241   }
242
243   SIMIX_mutex_destroy(pdata->mutex);
244   SIMIX_cond_destroy(pdata->cond);
245   xbt_fifo_free(pdata->pending_recv_request_queue);
246   xbt_fifo_free(pdata->pending_send_request_queue);
247   xbt_fifo_free(pdata->received_message_queue);
248   xbt_free(pdata);
249 }
250
251
252 /*int smpi_mpi_barrier(smpi_mpi_communicator_t comm)
253 {
254
255   SIMIX_mutex_lock(comm->barrier_mutex);
256   ++comm->barrier_count;
257   if (comm->barrier_count > comm->size) {       // only happens on second barrier...
258     comm->barrier_count = 0;
259   } else if (comm->barrier_count == comm->size) {
260     SIMIX_cond_broadcast(comm->barrier_cond);
261   }
262   while (comm->barrier_count < comm->size) {
263     SIMIX_cond_wait(comm->barrier_cond, comm->barrier_mutex);
264   }
265   SIMIX_mutex_unlock(comm->barrier_mutex);
266
267   return MPI_SUCCESS;
268 }
269 */
270
271 int smpi_mpi_isend(smpi_mpi_request_t request)
272 {
273   smpi_process_data_t pdata = SIMIX_process_get_data(SIMIX_process_self());
274   int retval = MPI_SUCCESS;
275
276   if (NULL == request) {
277     retval = MPI_ERR_INTERN;
278   } else {
279     xbt_fifo_push(pdata->pending_send_request_queue, request);
280     SIMIX_process_resume(pdata->sender);
281   }
282
283   return retval;
284 }
285
286 int smpi_mpi_irecv(smpi_mpi_request_t request)
287 {
288   int retval = MPI_SUCCESS;
289   smpi_process_data_t pdata = SIMIX_process_get_data(SIMIX_process_self());
290
291   if (NULL == request) {
292     retval = MPI_ERR_INTERN;
293   } else {
294     xbt_fifo_push(pdata->pending_recv_request_queue, request);
295
296     if (SIMIX_process_is_suspended(pdata->receiver)) {
297       SIMIX_process_resume(pdata->receiver);
298     }
299   }
300
301   return retval;
302 }
303
304 void  print_req( smpi_mpi_request_t r ); 
305 void  print_req( smpi_mpi_request_t r ) {
306         fprintf(stderr,"***req %p-> src=%d dst=%d tag=%d completed=0x%x consumed=0x%x\n",r,r->src,r->dst,r->tag,r->completed,r->consumed);
307 }
308
309
310 /**
311  * wait and friends ...
312  **/
313 int smpi_mpi_wait(smpi_mpi_request_t request, smpi_mpi_status_t * status)
314 {
315   int retval = MPI_SUCCESS;
316
317   if (NULL == request) {
318     retval = MPI_ERR_INTERN;
319   } else {
320
321   DEBUG3("entered smpi_mpi_wait() for req_src=%d,req_dst=%d,req_tag=%d",
322                   request->src,request->dst,request->tag);
323     SIMIX_mutex_lock(request->mutex);
324 //#define DEBUG_STEPH
325 #ifdef DEBUG_STEPH
326     print_req( request );  //@@
327 #endif
328     while (!request->completed) {
329       SIMIX_cond_wait(request->cond, request->mutex);
330     }
331     if (NULL != status) {
332       status->MPI_SOURCE = request->src;
333       status->MPI_TAG = request->tag;
334       status->MPI_ERROR = MPI_SUCCESS;
335     }
336     SIMIX_mutex_unlock(request->mutex);
337   }
338
339   return retval;
340 }
341
342 /**
343  * waitall
344  **/
345 int smpi_mpi_waitall(int count, smpi_mpi_request_t requests[],
346                 smpi_mpi_status_t status[])
347 {
348         int cpt;
349         int index;
350         int retval;
351         smpi_mpi_status_t stat;
352
353         for (cpt = 0; cpt < count; cpt++) {
354                 retval = smpi_mpi_waitany(count, requests, &index, &stat);
355                 if (retval != MPI_SUCCESS)
356                         return retval;
357                 if (MPI_STATUS_IGNORE != status)
358                         memcpy(&(status[index]), &stat, sizeof(stat));
359         }
360         return MPI_SUCCESS;
361 }
362
363 /**
364  * waitany
365  **/
366 int smpi_mpi_waitany(int count, smpi_mpi_request_t * requests, int *index,
367                      smpi_mpi_status_t * status)
368 {
369   int cpt;
370
371   DEBUG0("entering smpi_wait_any() ...");
372   *index = MPI_UNDEFINED;
373   if (NULL == requests) {
374     return MPI_ERR_INTERN;
375   }
376   /* First check if one of them is already done */
377   for (cpt = 0; cpt < count; cpt++) {
378           DEBUG2(" exam req[%d] of msg from <%d>",cpt,requests[cpt]->src);
379     if (requests[cpt]->completed && !requests[cpt]->consumed) { /* got ya */
380           DEBUG2("smpi_wait_any() found match req[%d] of msg from <%d>",cpt,requests[cpt]->src);
381       *index = cpt;
382       goto found_request;
383     }
384   }
385   /* If none found, block */
386   /* FIXME: should use a SIMIX_cond_waitany, when implemented. For now, block on the first one */
387   while (1) {
388     for (cpt = 0; cpt < count; cpt++) {
389
390 #ifdef DEBUG_STEPH
391       print_req( requests[cpt] );
392 #endif
393       if (!requests[cpt]->completed) {  /* this one is not done, wait on it */
394               DEBUG3("smpi_waitany() blocked waiting a msg <%d> -> <%d>, tag=%d",requests[cpt]->src,requests[cpt]->dst,requests[cpt]->tag);
395         while (!requests[cpt]->completed)
396           SIMIX_cond_wait(requests[cpt]->cond, requests[cpt]->mutex);
397
398         *index = cpt;
399         goto found_request;
400       }
401     }
402     if (cpt == count)           /* they are all done. Damn user */
403       return MPI_ERR_REQUEST;
404   }
405
406 found_request:
407 #ifdef DEBUG_STEPH
408   print_req( requests[cpt] );
409 #endif
410   requests[*index]->consumed = 1;
411 #ifdef DEBUG_STEPH
412   print_req( requests[cpt] );
413 #endif
414   DEBUG2("smpi_waitany() request %p unblocked ... mark *req[%d]->consumed",requests[*index],cpt);
415   if (NULL != status) {
416           status->MPI_SOURCE = requests[*index]->src;
417           status->MPI_TAG = requests[*index]->tag;
418           status->MPI_ERROR = MPI_SUCCESS;
419   }
420   return MPI_SUCCESS;
421
422 }