Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
b442f0dbfd01e348a5aadda5adf7bfef386e2156
[simgrid.git] / src / smpi / smpi_base.c
1 #include "private.h"
2 #include "xbt/time.h"
3
4 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_base, smpi,
5                                 "Logging specific to SMPI (base)");
6 XBT_LOG_EXTERNAL_CATEGORY(smpi_base);
7 XBT_LOG_EXTERNAL_CATEGORY(smpi_bench);
8 XBT_LOG_EXTERNAL_CATEGORY(smpi_kernel);
9 XBT_LOG_EXTERNAL_CATEGORY(smpi_mpi);
10 XBT_LOG_EXTERNAL_CATEGORY(smpi_mpi_dt);
11 XBT_LOG_EXTERNAL_CATEGORY(smpi_coll);
12 XBT_LOG_EXTERNAL_CATEGORY(smpi_receiver);
13 XBT_LOG_EXTERNAL_CATEGORY(smpi_sender);
14 XBT_LOG_EXTERNAL_CATEGORY(smpi_util);
15
16 void smpi_process_init(int* argc, char*** argv) {
17   int index;
18   smpi_process_data_t data;
19   smx_process_t proc;
20
21   proc = SIMIX_process_self();
22   index = atoi((*argv)[1]);
23   data = smpi_process_remote_data(index);
24   SIMIX_process_set_data(proc, data);
25   if (*argc > 2) {
26     free((*argv)[1]);
27     memmove(&(*argv)[1], &(*argv)[2], sizeof(char *) * (*argc - 2));
28     (*argv)[(*argc) - 1] = NULL;
29   }
30   (*argc)--;
31   DEBUG2("<%d> New process in the game: %p", index, proc);
32 }
33
34 void smpi_process_destroy(void) {
35   int index = smpi_process_index();
36
37   DEBUG1("<%d> Process left the game", index);
38 }
39
40 /* MPI Low level calls */
41 MPI_Request smpi_mpi_isend(void* buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm) {
42   MPI_Request request;
43
44   request = xbt_new(s_smpi_mpi_request_t, 1);
45   request->comm = comm;
46   request->src = smpi_comm_rank(comm);
47   request->dst = dst;
48   request->tag = tag;
49   request->size = smpi_datatype_size(datatype) * count;
50   request->complete = 0;
51   request->data = request;
52   smpi_process_post_send(comm, request);
53   request->pair = SIMIX_network_isend(request->rdv, request->size, -1.0, buf, request->size, NULL);
54   return request;
55 }
56
57 MPI_Request smpi_mpi_irecv(void* buf, int count, MPI_Datatype datatype, int src, int tag, MPI_Comm comm) {
58   MPI_Request request;
59
60   request = xbt_new(s_smpi_mpi_request_t, 1);
61   request->comm = comm;
62   request->src = src;
63   request->dst = smpi_comm_rank(comm);
64   request->tag = tag;
65   request->size = smpi_datatype_size(datatype) * count;
66   request->complete = 0;
67   request->data = MPI_REQUEST_NULL;
68   smpi_process_post_recv(request);
69   request->pair = SIMIX_network_irecv(request->rdv, buf, &request->size);
70   return request;
71 }
72
73 void smpi_mpi_recv(void* buf, int count, MPI_Datatype datatype, int src, int tag, MPI_Comm comm, MPI_Status* status) {
74   MPI_Request request;
75
76   request = smpi_mpi_irecv(buf, count, datatype, src, tag, comm);
77   smpi_mpi_wait(&request, status);
78 }
79
80 void smpi_mpi_send(void* buf, int count, MPI_Datatype datatype, int src, int tag, MPI_Comm comm) {
81   MPI_Request request;
82
83   request = smpi_mpi_isend(buf, count, datatype, src, tag, comm);
84   smpi_mpi_wait(&request, MPI_STATUS_IGNORE);
85 }
86
87 void smpi_mpi_sendrecv(void* sendbuf, int sendcount, MPI_Datatype sendtype, int dst, int sendtag, void* recvbuf, int recvcount, MPI_Datatype recvtype, int src, int recvtag, MPI_Comm comm, MPI_Status* status) {
88   MPI_Request requests[2];
89   MPI_Status stats[2];
90
91   requests[0] = smpi_mpi_isend(sendbuf, sendcount, sendtype, dst, sendtag, comm);
92   requests[1] = smpi_mpi_irecv(recvbuf, recvcount, recvtype, src, recvtag, comm);
93   smpi_mpi_waitall(2, requests, stats);
94   if(status != MPI_STATUS_IGNORE) {
95     // Copy receive status
96     memcpy(status, &stats[1], sizeof(MPI_Status));
97   }
98 }
99
100 static void finish_wait(MPI_Request* request, MPI_Status* status) {
101   MPI_Request data = (*request)->data;
102
103   xbt_assert0(data != MPI_REQUEST_NULL, "Erroneous situation");
104   if(status != MPI_STATUS_IGNORE) {
105     status->MPI_SOURCE = (*request)->src;
106     status->MPI_TAG = (*request)->tag;
107     status->MPI_ERROR = MPI_SUCCESS;
108   }
109   DEBUG3("finishing wait for %p [data = %p, complete = %d]", *request, data, data->complete);
110   // data == *request if sender is first to finish its wait
111   // data != *request if receiver is first to finish its wait
112   if(data->complete == 0) {
113     // first arrives here
114     data->complete = 1;
115     if(data != *request) {
116       // receveiver cleans its part
117       xbt_free(*request);
118     }
119   } else {
120     // second arrives here
121     if(data != *request) {
122       // receiver cleans everything
123       xbt_free(data);
124     }
125     xbt_free(*request);
126   }
127   *request = MPI_REQUEST_NULL;
128 }
129
130 int smpi_mpi_test(MPI_Request* request, MPI_Status* status) {
131   MPI_Request data = (*request)->data;
132   int flag = data && data->complete == 1;
133
134   if(flag) {
135     SIMIX_communication_destroy((*request)->pair);
136     finish_wait(request, status);
137   }
138   return flag;
139 }
140
141 int smpi_mpi_testany(int count, MPI_Request requests[], int* index, MPI_Status* status) {
142   MPI_Request data;
143   int i, flag;
144
145   *index = MPI_UNDEFINED;
146   flag = 0;
147   for(i = 0; i < count; i++) {
148     if(requests[i] != MPI_REQUEST_NULL) {
149       data = requests[i]->data;
150       if(data != MPI_REQUEST_NULL && data->complete == 1) {
151         SIMIX_communication_destroy(requests[i]->pair);
152         finish_wait(&requests[i], status);
153         *index = i;
154         flag = 1;
155         break;
156       }
157     }
158   }
159   return flag;
160 }
161
162 void smpi_mpi_wait(MPI_Request* request, MPI_Status* status) {
163   MPI_Request data = (*request)->data;
164
165   DEBUG6("wait for request %p (%p: %p) [src = %d, dst = %d, tag = %d]",
166          *request, (*request)->pair, data, (*request)->src, (*request)->dst, (*request)->tag);
167   // data is null if receiver waits before sender enters the rdv
168   if(data == MPI_REQUEST_NULL || data->complete == 0) {
169     SIMIX_network_wait((*request)->pair, -1.0);
170   } else {
171     SIMIX_communication_destroy((*request)->pair);
172   }
173   finish_wait(request, status);
174 }
175
176 int smpi_mpi_waitany(int count, MPI_Request requests[], MPI_Status* status) {
177   xbt_dynar_t comms;
178   MPI_Request data;
179   int i, size, index;
180   int* map;
181
182   index = MPI_UNDEFINED;
183   if(count > 0) {
184     // First check for already completed requests
185     for(i = 0; i < count; i++) {
186       if(requests[i] != MPI_REQUEST_NULL) {
187         data = requests[i]->data;
188         if(data != MPI_REQUEST_NULL && data->complete == 1) {
189           index = i;
190           SIMIX_communication_destroy(requests[index]->pair); // always succeeds (but cleans the simix layer)
191           break;
192         }
193       }
194     }
195     if(index == MPI_UNDEFINED) {
196       // Otherwise, wait for a request to complete
197       comms = xbt_dynar_new(sizeof(smx_comm_t), NULL);
198       map = xbt_new(int, count);
199       size = 0;
200       DEBUG0("Wait for one of");
201       for(i = 0; i < count; i++) {
202         if(requests[i] != MPI_REQUEST_NULL && requests[i]->complete == 0) {
203          DEBUG4("   request %p [src = %d, dst = %d, tag = %d]",
204                 requests[i], requests[i]->src, requests[i]->dst, requests[i]->tag);
205           xbt_dynar_push(comms, &requests[i]->pair);
206           map[size] = i;
207           size++;
208         }
209       }
210       if(size > 0) {
211         index = SIMIX_network_waitany(comms);
212         index = map[index];
213       }
214       xbt_free(map);
215       xbt_dynar_free_container(&comms);
216     }
217     if(index != MPI_UNDEFINED) {
218       finish_wait(&requests[index], status);
219     }
220   }
221   return index;
222 }
223
224 void smpi_mpi_waitall(int count, MPI_Request requests[],  MPI_Status status[]) {
225   int index;
226   MPI_Status stat;
227
228   while(count > 0) {
229     index = smpi_mpi_waitany(count, requests, &stat);
230     if(index == MPI_UNDEFINED) {
231       break;
232     }
233     if(status != MPI_STATUS_IGNORE) {
234       memcpy(&status[index], &stat, sizeof(stat));
235     }
236     // Move the last request to the found position
237     requests[index] = requests[count - 1];
238     requests[count - 1] = MPI_REQUEST_NULL;
239     count--;
240   }
241 }
242
243 int smpi_mpi_waitsome(int incount, MPI_Request requests[], int* indices, MPI_Status status[]) {
244   MPI_Request data;
245   int i, count;
246
247   count = 0;
248   for(i = 0; i < incount; i++) {
249     if(requests[i] != MPI_REQUEST_NULL) {
250       data = requests[i]->data;
251       if(data != MPI_REQUEST_NULL && data->complete == 1) {
252         SIMIX_communication_destroy(requests[i]->pair);
253         finish_wait(&requests[i], status != MPI_STATUS_IGNORE ? &status[i] : MPI_STATUS_IGNORE);
254         indices[count] = i;
255         count++;
256       }
257     }
258   }
259   return count;
260 }
261
262 void smpi_mpi_bcast(void* buf, int count, MPI_Datatype datatype, int root, MPI_Comm comm) {
263   // arity=2: a binary tree, arity=4 seem to be a good setting (see P2P-MPI))
264   nary_tree_bcast(buf, count, datatype, root, comm, 4);
265 }
266
267 void smpi_mpi_barrier(MPI_Comm comm) {
268   // arity=2: a binary tree, arity=4 seem to be a good setting (see P2P-MPI))
269   nary_tree_barrier(comm, 4);
270 }
271
272 void smpi_mpi_gather(void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm) {
273   int system_tag = 666;
274   int rank, size, src, index, sendsize, recvsize;
275   MPI_Request* requests;
276
277   rank = smpi_comm_rank(comm);
278   size = smpi_comm_size(comm);
279   if(rank != root) {
280     // Send buffer to root
281     smpi_mpi_send(sendbuf, sendcount, sendtype, root, system_tag, comm);
282   } else {
283     sendsize = smpi_datatype_size(sendtype);
284     recvsize = smpi_datatype_size(recvtype);
285     // Local copy from root
286     memcpy(&((char*)recvbuf)[root * recvcount * recvsize], sendbuf, sendcount * sendsize * sizeof(char));
287     // Receive buffers from senders
288     requests = xbt_new(MPI_Request, size - 1);
289     index = 0;
290     for(src = 0; src < size; src++) {
291       if(src != root) {
292         requests[index] = smpi_mpi_irecv(&((char*)recvbuf)[src * recvcount * recvsize], recvcount, recvtype, src, system_tag, comm);
293         index++;
294       }
295     }
296     // Wait for completion of irecv's.
297     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
298     xbt_free(requests);
299   }
300 }
301
302 void smpi_mpi_gatherv(void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int* recvcounts, int* displs, MPI_Datatype recvtype, int root, MPI_Comm comm) {
303   int system_tag = 666;
304   int rank, size, src, index, sendsize;
305   MPI_Request* requests;
306
307   rank = smpi_comm_rank(comm);
308   size = smpi_comm_size(comm);
309   if(rank != root) {
310     // Send buffer to root
311     smpi_mpi_send(sendbuf, sendcount, sendtype, root, system_tag, comm);
312   } else {
313     sendsize = smpi_datatype_size(sendtype);
314     // Local copy from root
315     memcpy(&((char*)recvbuf)[displs[root]], sendbuf, sendcount * sendsize * sizeof(char));
316     // Receive buffers from senders
317     requests = xbt_new(MPI_Request, size - 1);
318     index = 0;
319     for(src = 0; src < size; src++) {
320       if(src != root) {
321         requests[index] = smpi_mpi_irecv(&((char*)recvbuf)[displs[src]], recvcounts[src], recvtype, src, system_tag, comm);
322         index++;
323       }
324     }
325     // Wait for completion of irecv's.
326     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
327     xbt_free(requests);
328   }
329 }
330
331 void smpi_mpi_allgather(void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int recvcount, MPI_Datatype recvtype, MPI_Comm comm) {
332   int system_tag = 666;
333   int rank, size, other, index, sendsize, recvsize;
334   MPI_Request* requests;
335
336   rank = smpi_comm_rank(comm);
337   size = smpi_comm_size(comm);
338   sendsize = smpi_datatype_size(sendtype);
339   recvsize = smpi_datatype_size(recvtype);
340   // Local copy from self
341   memcpy(&((char*)recvbuf)[rank * recvcount * recvsize], sendbuf, sendcount * sendsize * sizeof(char));
342   // Send/Recv buffers to/from others;
343   requests = xbt_new(MPI_Request, 2 * (size - 1));
344   index = 0;
345   for(other = 0; other < size; other++) {
346     if(other != rank) {
347       requests[index] = smpi_mpi_isend(sendbuf, sendcount, sendtype, other, system_tag, comm);
348       index++;
349       requests[index] = smpi_mpi_irecv(&((char*)recvbuf)[other * recvcount * recvsize], recvcount, recvtype, other, system_tag, comm);
350       index++;
351     }
352   }
353   // Wait for completion of all comms.
354   smpi_mpi_waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
355   xbt_free(requests);
356 }
357
358 void smpi_mpi_allgatherv(void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int* recvcounts, int* displs, MPI_Datatype recvtype, MPI_Comm comm) {
359   int system_tag = 666;
360   int rank, size, other, index, sendsize, recvsize;
361   MPI_Request* requests;
362
363   rank = smpi_comm_rank(comm);
364   size = smpi_comm_size(comm);
365   sendsize = smpi_datatype_size(sendtype);
366   recvsize = smpi_datatype_size(recvtype);
367   // Local copy from self
368   memcpy(&((char*)recvbuf)[displs[rank]], sendbuf, sendcount * sendsize * sizeof(char));
369   // Send buffers to others;
370   requests = xbt_new(MPI_Request, 2 * (size - 1));
371   index = 0;
372   for(other = 0; other < size; other++) {
373     if(other != rank) {
374       requests[index] = smpi_mpi_isend(sendbuf, sendcount, sendtype, other, system_tag, comm);
375       index++;
376       requests[index] = smpi_mpi_irecv(&((char*)recvbuf)[displs[other]], recvcounts[other], recvtype, other, system_tag, comm);
377       index++;
378     }
379   }
380   // Wait for completion of all comms.
381   smpi_mpi_waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
382   xbt_free(requests);
383 }
384
385 void smpi_mpi_scatter(void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm) {
386   int system_tag = 666;
387   int rank, size, dst, index, sendsize, recvsize;
388   MPI_Request* requests;
389
390   rank = smpi_comm_rank(comm);
391   size = smpi_comm_size(comm);
392   if(rank != root) {
393     // Recv buffer from root
394     smpi_mpi_recv(recvbuf, recvcount, recvtype, root, system_tag, comm, MPI_STATUS_IGNORE);
395   } else {
396     sendsize = smpi_datatype_size(sendtype);
397     recvsize = smpi_datatype_size(recvtype);
398     // Local copy from root
399     memcpy(recvbuf, &((char*)sendbuf)[root * sendcount * sendsize], recvcount * recvsize * sizeof(char));
400     // Send buffers to receivers
401     requests = xbt_new(MPI_Request, size - 1);
402     index = 0;
403     for(dst = 0; dst < size; dst++) {
404       if(dst != root) {
405         requests[index] = smpi_mpi_isend(&((char*)sendbuf)[dst * sendcount * sendsize], sendcount, sendtype, dst, system_tag, comm);
406         index++;
407       }
408     }
409     // Wait for completion of isend's.
410     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
411     xbt_free(requests);
412   }
413 }
414
415 void smpi_mpi_scatterv(void* sendbuf, int* sendcounts, int* displs, MPI_Datatype sendtype, void* recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm) {
416   int system_tag = 666;
417   int rank, size, dst, index, sendsize, recvsize;
418   MPI_Request* requests;
419
420   rank = smpi_comm_rank(comm);
421   size = smpi_comm_size(comm);
422   if(rank != root) {
423     // Recv buffer from root
424     smpi_mpi_recv(recvbuf, recvcount, recvtype, root, system_tag, comm, MPI_STATUS_IGNORE);
425   } else {
426     sendsize = smpi_datatype_size(sendtype);
427     recvsize = smpi_datatype_size(recvtype);
428     // Local copy from root
429     memcpy(recvbuf, &((char*)sendbuf)[displs[root]], recvcount * recvsize * sizeof(char));
430     // Send buffers to receivers
431     requests = xbt_new(MPI_Request, size - 1);
432     index = 0;
433     for(dst = 0; dst < size; dst++) {
434       if(dst != root) {
435         requests[index] = smpi_mpi_isend(&((char*)sendbuf)[displs[dst]], sendcounts[dst], sendtype, dst, system_tag, comm);
436         index++;
437       }
438     }
439     // Wait for completion of isend's.
440     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
441     xbt_free(requests);
442   }
443 }
444
445 void smpi_mpi_reduce(void* sendbuf, void* recvbuf, int count, MPI_Datatype datatype, MPI_Op op, int root, MPI_Comm comm) {
446   int system_tag = 666;
447   int rank, size, src, index, datasize;
448   MPI_Request* requests;
449   void** tmpbufs;
450  
451   rank = smpi_comm_rank(comm);
452   size = smpi_comm_size(comm);
453   if(rank != root) {
454     // Send buffer to root
455     smpi_mpi_send(sendbuf, count, datatype, root, system_tag, comm);
456   } else {
457     datasize = smpi_datatype_size(datatype);
458     // Local copy from root
459     memcpy(recvbuf, sendbuf, count * datasize * sizeof(char)); 
460     // Receive buffers from senders
461     //TODO: make a MPI_barrier here ?
462     requests = xbt_new(MPI_Request, size - 1);
463     tmpbufs = xbt_new(void*, size - 1);
464     index = 0;
465     for(src = 0; src < size; src++) {
466       if(src != root) {
467         tmpbufs[index] = xbt_malloc(count * datasize);
468         requests[index] = smpi_mpi_irecv(tmpbufs[index], count, datatype, src, system_tag, comm);
469         index++;
470       }
471     }
472     // Wait for completion of irecv's.
473     for(src = 0; src < size - 1; src++) {
474       index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
475       if(index == MPI_UNDEFINED) {
476         break;
477       }
478       smpi_op_apply(op, tmpbufs[index], recvbuf, &count, &datatype);
479     }
480     for(index = 0; index < size - 1; index++) {
481       xbt_free(tmpbufs[index]);
482     }
483     xbt_free(tmpbufs);
484     xbt_free(requests);
485   }
486 }
487
488 void smpi_mpi_allreduce(void* sendbuf, void* recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm) {
489   smpi_mpi_reduce(sendbuf, recvbuf, count, datatype, op, 0, comm);
490   smpi_mpi_bcast(recvbuf, count, datatype, 0, comm);
491
492 /*
493 FIXME: buggy implementation
494
495   int system_tag = 666;
496   int rank, size, other, index, datasize;
497   MPI_Request* requests;
498   void** tmpbufs;
499  
500   rank = smpi_comm_rank(comm);
501   size = smpi_comm_size(comm);
502   datasize = smpi_datatype_size(datatype);
503   // Local copy from self
504   memcpy(recvbuf, sendbuf, count * datasize * sizeof(char)); 
505   // Send/Recv buffers to/from others;
506   //TODO: make a MPI_barrier here ?
507   requests = xbt_new(MPI_Request, 2 * (size - 1));
508   tmpbufs = xbt_new(void*, size - 1);
509   index = 0;
510   for(other = 0; other < size; other++) {
511     if(other != rank) {
512       tmpbufs[index / 2] = xbt_malloc(count * datasize);
513       requests[index] = smpi_mpi_isend(sendbuf, count, datatype, other, system_tag, comm);
514       requests[index + 1] = smpi_mpi_irecv(tmpbufs[index / 2], count, datatype, other, system_tag, comm);
515       index += 2;
516     }
517   }
518   // Wait for completion of all comms.
519   for(other = 0; other < 2 * (size - 1); other++) {
520     index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
521     if(index == MPI_UNDEFINED) {
522       break;
523     }
524     if((index & 1) == 1) {
525       // Request is odd: it's a irecv
526       smpi_op_apply(op, tmpbufs[index / 2], recvbuf, &count, &datatype);
527     }
528   }
529   for(index = 0; index < size - 1; index++) {
530     xbt_free(tmpbufs[index]);
531   }
532   xbt_free(tmpbufs);
533   xbt_free(requests);
534 */
535 }