Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Fixed buggy implementation of MPI_Waitsome
[simgrid.git] / src / smpi / smpi_base.c
1 #include "private.h"
2 #include "xbt/time.h"
3
4 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_base, smpi,
5                                 "Logging specific to SMPI (base)");
6 XBT_LOG_EXTERNAL_CATEGORY(smpi_base);
7 XBT_LOG_EXTERNAL_CATEGORY(smpi_bench);
8 XBT_LOG_EXTERNAL_CATEGORY(smpi_kernel);
9 XBT_LOG_EXTERNAL_CATEGORY(smpi_mpi);
10 XBT_LOG_EXTERNAL_CATEGORY(smpi_mpi_dt);
11 XBT_LOG_EXTERNAL_CATEGORY(smpi_coll);
12 XBT_LOG_EXTERNAL_CATEGORY(smpi_receiver);
13 XBT_LOG_EXTERNAL_CATEGORY(smpi_sender);
14 XBT_LOG_EXTERNAL_CATEGORY(smpi_util);
15
16 void smpi_process_init(int* argc, char*** argv) {
17   int index;
18   smpi_process_data_t data;
19   smx_process_t proc;
20
21   proc = SIMIX_process_self();
22   index = atoi((*argv)[1]);
23   data = smpi_process_remote_data(index);
24   SIMIX_process_set_data(proc, data);
25   if (*argc > 2) {
26     free((*argv)[1]);
27     memmove(&(*argv)[1], &(*argv)[2], sizeof(char *) * (*argc - 2));
28     (*argv)[(*argc) - 1] = NULL;
29   }
30   (*argc)--;
31   DEBUG2("<%d> New process in the game: %p", index, proc);
32 }
33
34 void smpi_process_destroy(void) {
35   int index = smpi_process_index();
36
37   DEBUG1("<%d> Process left the game", index);
38 }
39
40 /* MPI Low level calls */
41 MPI_Request smpi_mpi_isend(void* buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm) {
42   MPI_Request request;
43
44   request = xbt_new(s_smpi_mpi_request_t, 1);
45   request->comm = comm;
46   request->src = smpi_comm_rank(comm);
47   request->dst = dst;
48   request->tag = tag;
49   request->size = smpi_datatype_size(datatype) * count;
50   request->complete = 0;
51   smpi_process_post_send(comm, request);
52   request->pair = SIMIX_network_isend(request->rdv, request->size, -1.0, buf, request->size, request);
53   return request;
54 }
55
56 MPI_Request smpi_mpi_irecv(void* buf, int count, MPI_Datatype datatype, int src, int tag, MPI_Comm comm) {
57   MPI_Request request;
58
59   request = xbt_new(s_smpi_mpi_request_t, 1);
60   request->comm = comm;
61   request->src = src;
62   request->dst = smpi_comm_rank(comm);
63   request->tag = tag;
64   request->size = smpi_datatype_size(datatype) * count;
65   request->complete = 0;
66   smpi_process_post_recv(request);
67   request->pair = SIMIX_network_irecv(request->rdv, buf, &request->size);
68   return request;
69 }
70
71 void smpi_mpi_recv(void* buf, int count, MPI_Datatype datatype, int src, int tag, MPI_Comm comm, MPI_Status* status) {
72   MPI_Request request;
73
74   request = smpi_mpi_irecv(buf, count, datatype, src, tag, comm);
75   smpi_mpi_wait(&request, status);
76 }
77
78 void smpi_mpi_send(void* buf, int count, MPI_Datatype datatype, int src, int tag, MPI_Comm comm) {
79   MPI_Request request;
80
81   request = smpi_mpi_isend(buf, count, datatype, src, tag, comm);
82   smpi_mpi_wait(&request, MPI_STATUS_IGNORE);
83 }
84
85 void smpi_mpi_sendrecv(void* sendbuf, int sendcount, MPI_Datatype sendtype, int dst, int sendtag, void* recvbuf, int recvcount, MPI_Datatype recvtype, int src, int recvtag, MPI_Comm comm, MPI_Status* status) {
86   MPI_Request requests[2];
87   MPI_Status stats[2];
88
89   requests[0] = smpi_mpi_isend(sendbuf, sendcount, sendtype, dst, sendtag, comm);
90   requests[1] = smpi_mpi_irecv(recvbuf, recvcount, recvtype, src, recvtag, comm);
91   smpi_mpi_waitall(2, requests, stats);
92   if(status != MPI_STATUS_IGNORE) {
93     // Copy receive status
94     memcpy(status, &stats[1], sizeof(MPI_Status));
95   }
96 }
97
98 static void finish_wait(MPI_Request* request, MPI_Status* status) {
99   MPI_Request data = (MPI_Request)SIMIX_communication_get_data((*request)->pair);
100
101   xbt_assert0(data != MPI_REQUEST_NULL, "Erroneous situation");
102   if(status != MPI_STATUS_IGNORE) {
103     status->MPI_SOURCE = (*request)->src;
104     status->MPI_TAG = (*request)->tag;
105     status->MPI_ERROR = MPI_SUCCESS;
106   }
107   DEBUG3("finishing wait for %p [data = %p, complete = %d]", *request, data, data->complete);
108   // data == *request if sender is first to finish its wait
109   // data != *request if receiver is first to finish its wait
110   if(data->complete == 0) {
111     // first arrives here
112     data->complete = 1;
113     if(data != *request) {
114       // receveiver cleans its part
115       xbt_free(*request);
116     }
117   } else {
118     // second arrives here
119     if(data != *request) {
120       // receiver cleans everything
121       xbt_free(data);
122     }
123     xbt_free(*request);
124   }
125   *request = MPI_REQUEST_NULL;
126 }
127
128 int smpi_mpi_test(MPI_Request* request, MPI_Status* status) {
129   MPI_Request data = (MPI_Request)SIMIX_communication_get_data((*request)->pair);
130   int flag = data && data->complete == 1;
131
132   if(flag) {
133     SIMIX_communication_destroy((*request)->pair);
134     finish_wait(request, status);
135   }
136   return flag;
137 }
138
139 int smpi_mpi_testany(int count, MPI_Request requests[], int* index, MPI_Status* status) {
140   MPI_Request data;
141   int i, flag;
142
143   *index = MPI_UNDEFINED;
144   flag = 0;
145   for(i = 0; i < count; i++) {
146     if(requests[i] != MPI_REQUEST_NULL) {
147       data = (MPI_Request)SIMIX_communication_get_data(requests[i]->pair);
148       if(data != MPI_REQUEST_NULL && data->complete == 1) {
149         SIMIX_communication_destroy(requests[i]->pair);
150         finish_wait(&requests[i], status);
151         *index = i;
152         flag = 1;
153         break;
154       }
155     }
156   }
157   return flag;
158 }
159
160 void smpi_mpi_wait(MPI_Request* request, MPI_Status* status) {
161   MPI_Request data = (MPI_Request)SIMIX_communication_get_data((*request)->pair);
162
163   DEBUG6("wait for request %p (%p: %p) [src = %d, dst = %d, tag = %d]",
164          *request, (*request)->pair, data, (*request)->src, (*request)->dst, (*request)->tag);
165   // data is null if receiver waits before sender enters the rdv
166   if(data == MPI_REQUEST_NULL || data->complete == 0) {
167     SIMIX_network_wait((*request)->pair, -1.0);
168   } else {
169     SIMIX_communication_destroy((*request)->pair);
170   }
171   finish_wait(request, status);
172 }
173
174 int smpi_mpi_waitany(int count, MPI_Request requests[], MPI_Status* status) {
175   xbt_dynar_t comms;
176   MPI_Request data;
177   int i, size, index;
178   int* map;
179
180   index = MPI_UNDEFINED;
181   if(count > 0) {
182     // First check for already completed requests
183     for(i = 0; i < count; i++) {
184       if(requests[i] != MPI_REQUEST_NULL) {
185         data = (MPI_Request)SIMIX_communication_get_data(requests[i]->pair);
186         if(data != MPI_REQUEST_NULL && data->complete == 1) {
187           index = i;
188           SIMIX_communication_destroy(requests[index]->pair); // always succeeds (but cleans the simix layer)
189           break;
190         }
191       }
192     }
193     if(index == MPI_UNDEFINED) {
194       // Otherwise, wait for a request to complete
195       comms = xbt_dynar_new(sizeof(smx_comm_t), NULL);
196       map = xbt_new(int, count);
197       size = 0;
198       DEBUG0("Wait for one of");
199       for(i = 0; i < count; i++) {
200         if(requests[i] != MPI_REQUEST_NULL && requests[i]->complete == 0) {
201          DEBUG4("   request %p [src = %d, dst = %d, tag = %d]",
202                 requests[i], requests[i]->src, requests[i]->dst, requests[i]->tag);
203           xbt_dynar_push(comms, &requests[i]->pair);
204           map[size] = i;
205           size++;
206         }
207       }
208       if(size > 0) {
209         index = SIMIX_network_waitany(comms);
210         index = map[index];
211       }
212       xbt_free(map);
213       xbt_dynar_free_container(&comms);
214     }
215     if(index != MPI_UNDEFINED) {
216       finish_wait(&requests[index], status);
217     }
218   }
219   return index;
220 }
221
222 void smpi_mpi_waitall(int count, MPI_Request requests[],  MPI_Status status[]) {
223   int index;
224   MPI_Status stat;
225
226   while(count > 0) {
227     index = smpi_mpi_waitany(count, requests, &stat);
228     if(index == MPI_UNDEFINED) {
229       break;
230     }
231     if(status != MPI_STATUS_IGNORE) {
232       memcpy(&status[index], &stat, sizeof(stat));
233     }
234     // Move the last request to the found position
235     requests[index] = requests[count - 1];
236     requests[count - 1] = MPI_REQUEST_NULL;
237     count--;
238   }
239 }
240
241 int smpi_mpi_waitsome(int incount, MPI_Request requests[], int* indices, MPI_Status status[]) {
242   MPI_Request data;
243   int i, count;
244
245   count = 0;
246   for(i = 0; i < incount; i++) {
247     if(requests[i] != MPI_REQUEST_NULL) {
248       data = (MPI_Request)SIMIX_communication_get_data(requests[i]->pair);
249       if(data != MPI_REQUEST_NULL && data->complete == 1) {
250         SIMIX_communication_destroy(requests[i]->pair);
251         finish_wait(&requests[i], status != MPI_STATUS_IGNORE ? &status[i] : MPI_STATUS_IGNORE);
252         indices[count] = i;
253         count++;
254       }
255     }
256   }
257   return count;
258 }
259
260 void smpi_mpi_bcast(void* buf, int count, MPI_Datatype datatype, int root, MPI_Comm comm) {
261   // arity=2: a binary tree, arity=4 seem to be a good setting (see P2P-MPI))
262   nary_tree_bcast(buf, count, datatype, root, comm, 4);
263 }
264
265 void smpi_mpi_barrier(MPI_Comm comm) {
266   // arity=2: a binary tree, arity=4 seem to be a good setting (see P2P-MPI))
267   nary_tree_barrier(comm, 4);
268 }
269
270 void smpi_mpi_gather(void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm) {
271   int system_tag = 666;
272   int rank, size, src, index, sendsize, recvsize;
273   MPI_Request* requests;
274
275   rank = smpi_comm_rank(comm);
276   size = smpi_comm_size(comm);
277   if(rank != root) {
278     // Send buffer to root
279     smpi_mpi_send(sendbuf, sendcount, sendtype, root, system_tag, comm);
280   } else {
281     sendsize = smpi_datatype_size(sendtype);
282     recvsize = smpi_datatype_size(recvtype);
283     // Local copy from root
284     memcpy(&((char*)recvbuf)[root * recvcount * recvsize], sendbuf, sendcount * sendsize * sizeof(char));
285     // Receive buffers from senders
286     requests = xbt_new(MPI_Request, size - 1);
287     index = 0;
288     for(src = 0; src < size; src++) {
289       if(src != root) {
290         requests[index] = smpi_mpi_irecv(&((char*)recvbuf)[src * recvcount * recvsize], recvcount, recvtype, src, system_tag, comm);
291         index++;
292       }
293     }
294     // Wait for completion of irecv's.
295     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
296     xbt_free(requests);
297   }
298 }
299
300 void smpi_mpi_gatherv(void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int* recvcounts, int* displs, MPI_Datatype recvtype, int root, MPI_Comm comm) {
301   int system_tag = 666;
302   int rank, size, src, index, sendsize;
303   MPI_Request* requests;
304
305   rank = smpi_comm_rank(comm);
306   size = smpi_comm_size(comm);
307   if(rank != root) {
308     // Send buffer to root
309     smpi_mpi_send(sendbuf, sendcount, sendtype, root, system_tag, comm);
310   } else {
311     sendsize = smpi_datatype_size(sendtype);
312     // Local copy from root
313     memcpy(&((char*)recvbuf)[displs[root]], sendbuf, sendcount * sendsize * sizeof(char));
314     // Receive buffers from senders
315     requests = xbt_new(MPI_Request, size - 1);
316     index = 0;
317     for(src = 0; src < size; src++) {
318       if(src != root) {
319         requests[index] = smpi_mpi_irecv(&((char*)recvbuf)[displs[src]], recvcounts[src], recvtype, src, system_tag, comm);
320         index++;
321       }
322     }
323     // Wait for completion of irecv's.
324     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
325     xbt_free(requests);
326   }
327 }
328
329 void smpi_mpi_allgather(void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int recvcount, MPI_Datatype recvtype, MPI_Comm comm) {
330   int system_tag = 666;
331   int rank, size, other, index, sendsize, recvsize;
332   MPI_Request* requests;
333
334   rank = smpi_comm_rank(comm);
335   size = smpi_comm_size(comm);
336   sendsize = smpi_datatype_size(sendtype);
337   recvsize = smpi_datatype_size(recvtype);
338   // Local copy from self
339   memcpy(&((char*)recvbuf)[rank * recvcount * recvsize], sendbuf, sendcount * sendsize * sizeof(char));
340   // Send/Recv buffers to/from others;
341   requests = xbt_new(MPI_Request, 2 * (size - 1));
342   index = 0;
343   for(other = 0; other < size; other++) {
344     if(other != rank) {
345       requests[index] = smpi_mpi_isend(sendbuf, sendcount, sendtype, other, system_tag, comm);
346       index++;
347       requests[index] = smpi_mpi_irecv(&((char*)recvbuf)[other * recvcount * recvsize], recvcount, recvtype, other, system_tag, comm);
348       index++;
349     }
350   }
351   // Wait for completion of all comms.
352   smpi_mpi_waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
353   xbt_free(requests);
354 }
355
356 void smpi_mpi_allgatherv(void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int* recvcounts, int* displs, MPI_Datatype recvtype, MPI_Comm comm) {
357   int system_tag = 666;
358   int rank, size, other, index, sendsize, recvsize;
359   MPI_Request* requests;
360
361   rank = smpi_comm_rank(comm);
362   size = smpi_comm_size(comm);
363   sendsize = smpi_datatype_size(sendtype);
364   recvsize = smpi_datatype_size(recvtype);
365   // Local copy from self
366   memcpy(&((char*)recvbuf)[displs[rank]], sendbuf, sendcount * sendsize * sizeof(char));
367   // Send buffers to others;
368   requests = xbt_new(MPI_Request, 2 * (size - 1));
369   index = 0;
370   for(other = 0; other < size; other++) {
371     if(other != rank) {
372       requests[index] = smpi_mpi_isend(sendbuf, sendcount, sendtype, other, system_tag, comm);
373       index++;
374       requests[index] = smpi_mpi_irecv(&((char*)recvbuf)[displs[other]], recvcounts[other], recvtype, other, system_tag, comm);
375       index++;
376     }
377   }
378   // Wait for completion of all comms.
379   smpi_mpi_waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
380   xbt_free(requests);
381 }
382
383 void smpi_mpi_scatter(void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm) {
384   int system_tag = 666;
385   int rank, size, dst, index, sendsize, recvsize;
386   MPI_Request* requests;
387
388   rank = smpi_comm_rank(comm);
389   size = smpi_comm_size(comm);
390   if(rank != root) {
391     // Recv buffer from root
392     smpi_mpi_recv(recvbuf, recvcount, recvtype, root, system_tag, comm, MPI_STATUS_IGNORE);
393   } else {
394     sendsize = smpi_datatype_size(sendtype);
395     recvsize = smpi_datatype_size(recvtype);
396     // Local copy from root
397     memcpy(recvbuf, &((char*)sendbuf)[root * sendcount * sendsize], recvcount * recvsize * sizeof(char));
398     // Send buffers to receivers
399     requests = xbt_new(MPI_Request, size - 1);
400     index = 0;
401     for(dst = 0; dst < size; dst++) {
402       if(dst != root) {
403         requests[index] = smpi_mpi_isend(&((char*)sendbuf)[dst * sendcount * sendsize], sendcount, sendtype, dst, system_tag, comm);
404         index++;
405       }
406     }
407     // Wait for completion of isend's.
408     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
409     xbt_free(requests);
410   }
411 }
412
413 void smpi_mpi_scatterv(void* sendbuf, int* sendcounts, int* displs, MPI_Datatype sendtype, void* recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm) {
414   int system_tag = 666;
415   int rank, size, dst, index, sendsize, recvsize;
416   MPI_Request* requests;
417
418   rank = smpi_comm_rank(comm);
419   size = smpi_comm_size(comm);
420   if(rank != root) {
421     // Recv buffer from root
422     smpi_mpi_recv(recvbuf, recvcount, recvtype, root, system_tag, comm, MPI_STATUS_IGNORE);
423   } else {
424     sendsize = smpi_datatype_size(sendtype);
425     recvsize = smpi_datatype_size(recvtype);
426     // Local copy from root
427     memcpy(recvbuf, &((char*)sendbuf)[displs[root]], recvcount * recvsize * sizeof(char));
428     // Send buffers to receivers
429     requests = xbt_new(MPI_Request, size - 1);
430     index = 0;
431     for(dst = 0; dst < size; dst++) {
432       if(dst != root) {
433         requests[index] = smpi_mpi_isend(&((char*)sendbuf)[displs[dst]], sendcounts[dst], sendtype, dst, system_tag, comm);
434         index++;
435       }
436     }
437     // Wait for completion of isend's.
438     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
439     xbt_free(requests);
440   }
441 }
442
443 void smpi_mpi_reduce(void* sendbuf, void* recvbuf, int count, MPI_Datatype datatype, MPI_Op op, int root, MPI_Comm comm) {
444   int system_tag = 666;
445   int rank, size, src, index, datasize;
446   MPI_Request* requests;
447   void** tmpbufs;
448  
449   rank = smpi_comm_rank(comm);
450   size = smpi_comm_size(comm);
451   if(rank != root) {
452     // Send buffer to root
453     smpi_mpi_send(sendbuf, count, datatype, root, system_tag, comm);
454   } else {
455     datasize = smpi_datatype_size(datatype);
456     // Local copy from root
457     memcpy(recvbuf, sendbuf, count * datasize * sizeof(char)); 
458     // Receive buffers from senders
459     //TODO: make a MPI_barrier here ?
460     requests = xbt_new(MPI_Request, size - 1);
461     tmpbufs = xbt_new(void*, size - 1);
462     index = 0;
463     for(src = 0; src < size; src++) {
464       if(src != root) {
465         tmpbufs[index] = xbt_malloc(count * datasize);
466         requests[index] = smpi_mpi_irecv(tmpbufs[index], count, datatype, src, system_tag, comm);
467         index++;
468       }
469     }
470     // Wait for completion of irecv's.
471     for(src = 0; src < size - 1; src++) {
472       index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
473       if(index == MPI_UNDEFINED) {
474         break;
475       }
476       smpi_op_apply(op, tmpbufs[index], recvbuf, &count, &datatype);
477     }
478     for(index = 0; index < size - 1; index++) {
479       xbt_free(tmpbufs[index]);
480     }
481     xbt_free(tmpbufs);
482     xbt_free(requests);
483   }
484 }
485
486 void smpi_mpi_allreduce(void* sendbuf, void* recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm) {
487   smpi_mpi_reduce(sendbuf, recvbuf, count, datatype, op, 0, comm);
488   smpi_mpi_bcast(recvbuf, count, datatype, 0, comm);
489
490 /*
491 FIXME: buggy implementation
492
493   int system_tag = 666;
494   int rank, size, other, index, datasize;
495   MPI_Request* requests;
496   void** tmpbufs;
497  
498   rank = smpi_comm_rank(comm);
499   size = smpi_comm_size(comm);
500   datasize = smpi_datatype_size(datatype);
501   // Local copy from self
502   memcpy(recvbuf, sendbuf, count * datasize * sizeof(char)); 
503   // Send/Recv buffers to/from others;
504   //TODO: make a MPI_barrier here ?
505   requests = xbt_new(MPI_Request, 2 * (size - 1));
506   tmpbufs = xbt_new(void*, size - 1);
507   index = 0;
508   for(other = 0; other < size; other++) {
509     if(other != rank) {
510       tmpbufs[index / 2] = xbt_malloc(count * datasize);
511       requests[index] = smpi_mpi_isend(sendbuf, count, datatype, other, system_tag, comm);
512       requests[index + 1] = smpi_mpi_irecv(tmpbufs[index / 2], count, datatype, other, system_tag, comm);
513       index += 2;
514     }
515   }
516   // Wait for completion of all comms.
517   for(other = 0; other < 2 * (size - 1); other++) {
518     index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
519     if(index == MPI_UNDEFINED) {
520       break;
521     }
522     if((index & 1) == 1) {
523       // Request is odd: it's a irecv
524       smpi_op_apply(op, tmpbufs[index / 2], recvbuf, &count, &datatype);
525     }
526   }
527   for(index = 0; index < size - 1; index++) {
528     xbt_free(tmpbufs[index]);
529   }
530   xbt_free(tmpbufs);
531   xbt_free(requests);
532 */
533 }