Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Plug the memleak of smx_rdv_t
[simgrid.git] / src / smpi / smpi_base.c
1 #include "private.h"
2 #include "xbt/time.h"
3
4 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_base, smpi,
5                                 "Logging specific to SMPI (base)");
6 XBT_LOG_EXTERNAL_CATEGORY(smpi_base);
7 XBT_LOG_EXTERNAL_CATEGORY(smpi_bench);
8 XBT_LOG_EXTERNAL_CATEGORY(smpi_kernel);
9 XBT_LOG_EXTERNAL_CATEGORY(smpi_mpi);
10 XBT_LOG_EXTERNAL_CATEGORY(smpi_mpi_dt);
11 XBT_LOG_EXTERNAL_CATEGORY(smpi_coll);
12 XBT_LOG_EXTERNAL_CATEGORY(smpi_receiver);
13 XBT_LOG_EXTERNAL_CATEGORY(smpi_sender);
14 XBT_LOG_EXTERNAL_CATEGORY(smpi_util);
15
16 void smpi_process_init(int* argc, char*** argv) {
17   int index;
18   smpi_process_data_t data;
19   smx_process_t proc;
20
21   proc = SIMIX_process_self();
22   index = atoi((*argv)[1]);
23   data = smpi_process_remote_data(index);
24   SIMIX_process_set_data(proc, data);
25   if (*argc > 2) {
26     free((*argv)[1]);
27     memmove(&(*argv)[1], &(*argv)[2], sizeof(char *) * (*argc - 2));
28     (*argv)[(*argc) - 1] = NULL;
29   }
30   (*argc)--;
31   DEBUG2("<%d> New process in the game: %p", index, proc);
32 }
33
34 void smpi_process_destroy(void) {
35   int index = smpi_process_index();
36
37   DEBUG1("<%d> Process left the game", index);
38 }
39
40 /* MPI Low level calls */
41 MPI_Request smpi_mpi_isend(void* buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm) {
42   MPI_Request request;
43
44   request = xbt_new(s_smpi_mpi_request_t, 1);
45   request->comm = comm;
46   request->src = smpi_comm_rank(comm);
47   request->dst = dst;
48   request->tag = tag;
49   request->size = smpi_datatype_size(datatype) * count;
50   request->complete = 0;
51   request->data = request;
52   smpi_process_post_send(comm, request);
53   request->pair = SIMIX_network_isend(request->rdv, request->size, -1.0, buf, request->size, NULL);
54   return request;
55 }
56
57 MPI_Request smpi_mpi_irecv(void* buf, int count, MPI_Datatype datatype, int src, int tag, MPI_Comm comm) {
58   MPI_Request request;
59
60   request = xbt_new(s_smpi_mpi_request_t, 1);
61   request->comm = comm;
62   request->src = src;
63   request->dst = smpi_comm_rank(comm);
64   request->tag = tag;
65   request->size = smpi_datatype_size(datatype) * count;
66   request->complete = 0;
67   request->data = MPI_REQUEST_NULL;
68   smpi_process_post_recv(request);
69   request->pair = SIMIX_network_irecv(request->rdv, buf, &request->size);
70   return request;
71 }
72
73 void smpi_mpi_recv(void* buf, int count, MPI_Datatype datatype, int src, int tag, MPI_Comm comm, MPI_Status* status) {
74   MPI_Request request;
75
76   request = smpi_mpi_irecv(buf, count, datatype, src, tag, comm);
77   smpi_mpi_wait(&request, status);
78 }
79
80 void smpi_mpi_send(void* buf, int count, MPI_Datatype datatype, int src, int tag, MPI_Comm comm) {
81   MPI_Request request;
82
83   request = smpi_mpi_isend(buf, count, datatype, src, tag, comm);
84   smpi_mpi_wait(&request, MPI_STATUS_IGNORE);
85 }
86
87 void smpi_mpi_sendrecv(void* sendbuf, int sendcount, MPI_Datatype sendtype, int dst, int sendtag, void* recvbuf, int recvcount, MPI_Datatype recvtype, int src, int recvtag, MPI_Comm comm, MPI_Status* status) {
88   MPI_Request requests[2];
89   MPI_Status stats[2];
90
91   requests[0] = smpi_mpi_isend(sendbuf, sendcount, sendtype, dst, sendtag, comm);
92   requests[1] = smpi_mpi_irecv(recvbuf, recvcount, recvtype, src, recvtag, comm);
93   smpi_mpi_waitall(2, requests, stats);
94   if(status != MPI_STATUS_IGNORE) {
95     // Copy receive status
96     memcpy(status, &stats[1], sizeof(MPI_Status));
97   }
98 }
99
100 static void finish_wait(MPI_Request* request, MPI_Status* status) {
101   MPI_Request data = (*request)->data;
102
103   xbt_assert0(data != MPI_REQUEST_NULL, "Erroneous situation");
104   if(status != MPI_STATUS_IGNORE) {
105     status->MPI_SOURCE = (*request)->src;
106     status->MPI_TAG = (*request)->tag;
107     status->MPI_ERROR = MPI_SUCCESS;
108   }
109   DEBUG3("finishing wait for %p [data = %p, complete = %d]", *request, data, data->complete);
110   // data == *request if sender is first to finish its wait
111   // data != *request if receiver is first to finish its wait
112   if(data->complete == 0) {
113     // first arrives here
114     data->complete = 1;
115     if(data != *request) {
116       // receveiver cleans its part
117       xbt_free(*request);
118     }
119   } else {
120     // second arrives here
121     if(data != *request) {
122       // receiver cleans everything
123       xbt_free(data);
124     }
125     SIMIX_rdv_destroy((*request)->rdv);
126     xbt_free(*request);
127   }
128   *request = MPI_REQUEST_NULL;
129 }
130
131 int smpi_mpi_test(MPI_Request* request, MPI_Status* status) {
132   MPI_Request data = (*request)->data;
133   int flag = data && data->complete == 1;
134
135   if(flag) {
136     SIMIX_communication_destroy((*request)->pair);
137     finish_wait(request, status);
138   }
139   return flag;
140 }
141
142 int smpi_mpi_testany(int count, MPI_Request requests[], int* index, MPI_Status* status) {
143   MPI_Request data;
144   int i, flag;
145
146   *index = MPI_UNDEFINED;
147   flag = 0;
148   for(i = 0; i < count; i++) {
149     if(requests[i] != MPI_REQUEST_NULL) {
150       data = requests[i]->data;
151       if(data != MPI_REQUEST_NULL && data->complete == 1) {
152         SIMIX_communication_destroy(requests[i]->pair);
153         finish_wait(&requests[i], status);
154         *index = i;
155         flag = 1;
156         break;
157       }
158     }
159   }
160   return flag;
161 }
162
163 void smpi_mpi_wait(MPI_Request* request, MPI_Status* status) {
164   MPI_Request data = (*request)->data;
165
166   DEBUG6("wait for request %p (%p: %p) [src = %d, dst = %d, tag = %d]",
167          *request, (*request)->pair, data, (*request)->src, (*request)->dst, (*request)->tag);
168   // data is null if receiver waits before sender enters the rdv
169   if(data == MPI_REQUEST_NULL || data->complete == 0) {
170     SIMIX_network_wait((*request)->pair, -1.0);
171   } else {
172     SIMIX_communication_destroy((*request)->pair);
173   }
174   finish_wait(request, status);
175 }
176
177 int smpi_mpi_waitany(int count, MPI_Request requests[], MPI_Status* status) {
178   xbt_dynar_t comms;
179   MPI_Request data;
180   int i, size, index;
181   int* map;
182
183   index = MPI_UNDEFINED;
184   if(count > 0) {
185     // First check for already completed requests
186     for(i = 0; i < count; i++) {
187       if(requests[i] != MPI_REQUEST_NULL) {
188         data = requests[i]->data;
189         if(data != MPI_REQUEST_NULL && data->complete == 1) {
190           index = i;
191           SIMIX_communication_destroy(requests[index]->pair); // always succeeds (but cleans the simix layer)
192           break;
193         }
194       }
195     }
196     if(index == MPI_UNDEFINED) {
197       // Otherwise, wait for a request to complete
198       comms = xbt_dynar_new(sizeof(smx_comm_t), NULL);
199       map = xbt_new(int, count);
200       size = 0;
201       DEBUG0("Wait for one of");
202       for(i = 0; i < count; i++) {
203         if(requests[i] != MPI_REQUEST_NULL && requests[i]->complete == 0) {
204          DEBUG4("   request %p [src = %d, dst = %d, tag = %d]",
205                 requests[i], requests[i]->src, requests[i]->dst, requests[i]->tag);
206           xbt_dynar_push(comms, &requests[i]->pair);
207           map[size] = i;
208           size++;
209         }
210       }
211       if(size > 0) {
212         index = SIMIX_network_waitany(comms);
213         index = map[index];
214       }
215       xbt_free(map);
216       xbt_dynar_free_container(&comms);
217     }
218     if(index != MPI_UNDEFINED) {
219       finish_wait(&requests[index], status);
220     }
221   }
222   return index;
223 }
224
225 void smpi_mpi_waitall(int count, MPI_Request requests[],  MPI_Status status[]) {
226   int index;
227   MPI_Status stat;
228
229   while(count > 0) {
230     index = smpi_mpi_waitany(count, requests, &stat);
231     if(index == MPI_UNDEFINED) {
232       break;
233     }
234     if(status != MPI_STATUS_IGNORE) {
235       memcpy(&status[index], &stat, sizeof(stat));
236     }
237     // Move the last request to the found position
238     requests[index] = requests[count - 1];
239     requests[count - 1] = MPI_REQUEST_NULL;
240     count--;
241   }
242 }
243
244 int smpi_mpi_waitsome(int incount, MPI_Request requests[], int* indices, MPI_Status status[]) {
245   MPI_Request data;
246   int i, count;
247
248   count = 0;
249   for(i = 0; i < incount; i++) {
250     if(requests[i] != MPI_REQUEST_NULL) {
251       data = requests[i]->data;
252       if(data != MPI_REQUEST_NULL && data->complete == 1) {
253         SIMIX_communication_destroy(requests[i]->pair);
254         finish_wait(&requests[i], status != MPI_STATUS_IGNORE ? &status[i] : MPI_STATUS_IGNORE);
255         indices[count] = i;
256         count++;
257       }
258     }
259   }
260   return count;
261 }
262
263 void smpi_mpi_bcast(void* buf, int count, MPI_Datatype datatype, int root, MPI_Comm comm) {
264   // arity=2: a binary tree, arity=4 seem to be a good setting (see P2P-MPI))
265   nary_tree_bcast(buf, count, datatype, root, comm, 4);
266 }
267
268 void smpi_mpi_barrier(MPI_Comm comm) {
269   // arity=2: a binary tree, arity=4 seem to be a good setting (see P2P-MPI))
270   nary_tree_barrier(comm, 4);
271 }
272
273 void smpi_mpi_gather(void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm) {
274   int system_tag = 666;
275   int rank, size, src, index, sendsize, recvsize;
276   MPI_Request* requests;
277
278   rank = smpi_comm_rank(comm);
279   size = smpi_comm_size(comm);
280   if(rank != root) {
281     // Send buffer to root
282     smpi_mpi_send(sendbuf, sendcount, sendtype, root, system_tag, comm);
283   } else {
284     sendsize = smpi_datatype_size(sendtype);
285     recvsize = smpi_datatype_size(recvtype);
286     // Local copy from root
287     memcpy(&((char*)recvbuf)[root * recvcount * recvsize], sendbuf, sendcount * sendsize * sizeof(char));
288     // Receive buffers from senders
289     requests = xbt_new(MPI_Request, size - 1);
290     index = 0;
291     for(src = 0; src < size; src++) {
292       if(src != root) {
293         requests[index] = smpi_mpi_irecv(&((char*)recvbuf)[src * recvcount * recvsize], recvcount, recvtype, src, system_tag, comm);
294         index++;
295       }
296     }
297     // Wait for completion of irecv's.
298     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
299     xbt_free(requests);
300   }
301 }
302
303 void smpi_mpi_gatherv(void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int* recvcounts, int* displs, MPI_Datatype recvtype, int root, MPI_Comm comm) {
304   int system_tag = 666;
305   int rank, size, src, index, sendsize;
306   MPI_Request* requests;
307
308   rank = smpi_comm_rank(comm);
309   size = smpi_comm_size(comm);
310   if(rank != root) {
311     // Send buffer to root
312     smpi_mpi_send(sendbuf, sendcount, sendtype, root, system_tag, comm);
313   } else {
314     sendsize = smpi_datatype_size(sendtype);
315     // Local copy from root
316     memcpy(&((char*)recvbuf)[displs[root]], sendbuf, sendcount * sendsize * sizeof(char));
317     // Receive buffers from senders
318     requests = xbt_new(MPI_Request, size - 1);
319     index = 0;
320     for(src = 0; src < size; src++) {
321       if(src != root) {
322         requests[index] = smpi_mpi_irecv(&((char*)recvbuf)[displs[src]], recvcounts[src], recvtype, src, system_tag, comm);
323         index++;
324       }
325     }
326     // Wait for completion of irecv's.
327     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
328     xbt_free(requests);
329   }
330 }
331
332 void smpi_mpi_allgather(void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int recvcount, MPI_Datatype recvtype, MPI_Comm comm) {
333   int system_tag = 666;
334   int rank, size, other, index, sendsize, recvsize;
335   MPI_Request* requests;
336
337   rank = smpi_comm_rank(comm);
338   size = smpi_comm_size(comm);
339   sendsize = smpi_datatype_size(sendtype);
340   recvsize = smpi_datatype_size(recvtype);
341   // Local copy from self
342   memcpy(&((char*)recvbuf)[rank * recvcount * recvsize], sendbuf, sendcount * sendsize * sizeof(char));
343   // Send/Recv buffers to/from others;
344   requests = xbt_new(MPI_Request, 2 * (size - 1));
345   index = 0;
346   for(other = 0; other < size; other++) {
347     if(other != rank) {
348       requests[index] = smpi_mpi_isend(sendbuf, sendcount, sendtype, other, system_tag, comm);
349       index++;
350       requests[index] = smpi_mpi_irecv(&((char*)recvbuf)[other * recvcount * recvsize], recvcount, recvtype, other, system_tag, comm);
351       index++;
352     }
353   }
354   // Wait for completion of all comms.
355   smpi_mpi_waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
356   xbt_free(requests);
357 }
358
359 void smpi_mpi_allgatherv(void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int* recvcounts, int* displs, MPI_Datatype recvtype, MPI_Comm comm) {
360   int system_tag = 666;
361   int rank, size, other, index, sendsize, recvsize;
362   MPI_Request* requests;
363
364   rank = smpi_comm_rank(comm);
365   size = smpi_comm_size(comm);
366   sendsize = smpi_datatype_size(sendtype);
367   recvsize = smpi_datatype_size(recvtype);
368   // Local copy from self
369   memcpy(&((char*)recvbuf)[displs[rank]], sendbuf, sendcount * sendsize * sizeof(char));
370   // Send buffers to others;
371   requests = xbt_new(MPI_Request, 2 * (size - 1));
372   index = 0;
373   for(other = 0; other < size; other++) {
374     if(other != rank) {
375       requests[index] = smpi_mpi_isend(sendbuf, sendcount, sendtype, other, system_tag, comm);
376       index++;
377       requests[index] = smpi_mpi_irecv(&((char*)recvbuf)[displs[other]], recvcounts[other], recvtype, other, system_tag, comm);
378       index++;
379     }
380   }
381   // Wait for completion of all comms.
382   smpi_mpi_waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
383   xbt_free(requests);
384 }
385
386 void smpi_mpi_scatter(void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm) {
387   int system_tag = 666;
388   int rank, size, dst, index, sendsize, recvsize;
389   MPI_Request* requests;
390
391   rank = smpi_comm_rank(comm);
392   size = smpi_comm_size(comm);
393   if(rank != root) {
394     // Recv buffer from root
395     smpi_mpi_recv(recvbuf, recvcount, recvtype, root, system_tag, comm, MPI_STATUS_IGNORE);
396   } else {
397     sendsize = smpi_datatype_size(sendtype);
398     recvsize = smpi_datatype_size(recvtype);
399     // Local copy from root
400     memcpy(recvbuf, &((char*)sendbuf)[root * sendcount * sendsize], recvcount * recvsize * sizeof(char));
401     // Send buffers to receivers
402     requests = xbt_new(MPI_Request, size - 1);
403     index = 0;
404     for(dst = 0; dst < size; dst++) {
405       if(dst != root) {
406         requests[index] = smpi_mpi_isend(&((char*)sendbuf)[dst * sendcount * sendsize], sendcount, sendtype, dst, system_tag, comm);
407         index++;
408       }
409     }
410     // Wait for completion of isend's.
411     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
412     xbt_free(requests);
413   }
414 }
415
416 void smpi_mpi_scatterv(void* sendbuf, int* sendcounts, int* displs, MPI_Datatype sendtype, void* recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm) {
417   int system_tag = 666;
418   int rank, size, dst, index, sendsize, recvsize;
419   MPI_Request* requests;
420
421   rank = smpi_comm_rank(comm);
422   size = smpi_comm_size(comm);
423   if(rank != root) {
424     // Recv buffer from root
425     smpi_mpi_recv(recvbuf, recvcount, recvtype, root, system_tag, comm, MPI_STATUS_IGNORE);
426   } else {
427     sendsize = smpi_datatype_size(sendtype);
428     recvsize = smpi_datatype_size(recvtype);
429     // Local copy from root
430     memcpy(recvbuf, &((char*)sendbuf)[displs[root]], recvcount * recvsize * sizeof(char));
431     // Send buffers to receivers
432     requests = xbt_new(MPI_Request, size - 1);
433     index = 0;
434     for(dst = 0; dst < size; dst++) {
435       if(dst != root) {
436         requests[index] = smpi_mpi_isend(&((char*)sendbuf)[displs[dst]], sendcounts[dst], sendtype, dst, system_tag, comm);
437         index++;
438       }
439     }
440     // Wait for completion of isend's.
441     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
442     xbt_free(requests);
443   }
444 }
445
446 void smpi_mpi_reduce(void* sendbuf, void* recvbuf, int count, MPI_Datatype datatype, MPI_Op op, int root, MPI_Comm comm) {
447   int system_tag = 666;
448   int rank, size, src, index, datasize;
449   MPI_Request* requests;
450   void** tmpbufs;
451  
452   rank = smpi_comm_rank(comm);
453   size = smpi_comm_size(comm);
454   if(rank != root) {
455     // Send buffer to root
456     smpi_mpi_send(sendbuf, count, datatype, root, system_tag, comm);
457   } else {
458     datasize = smpi_datatype_size(datatype);
459     // Local copy from root
460     memcpy(recvbuf, sendbuf, count * datasize * sizeof(char)); 
461     // Receive buffers from senders
462     //TODO: make a MPI_barrier here ?
463     requests = xbt_new(MPI_Request, size - 1);
464     tmpbufs = xbt_new(void*, size - 1);
465     index = 0;
466     for(src = 0; src < size; src++) {
467       if(src != root) {
468         tmpbufs[index] = xbt_malloc(count * datasize);
469         requests[index] = smpi_mpi_irecv(tmpbufs[index], count, datatype, src, system_tag, comm);
470         index++;
471       }
472     }
473     // Wait for completion of irecv's.
474     for(src = 0; src < size - 1; src++) {
475       index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
476       if(index == MPI_UNDEFINED) {
477         break;
478       }
479       smpi_op_apply(op, tmpbufs[index], recvbuf, &count, &datatype);
480     }
481     for(index = 0; index < size - 1; index++) {
482       xbt_free(tmpbufs[index]);
483     }
484     xbt_free(tmpbufs);
485     xbt_free(requests);
486   }
487 }
488
489 void smpi_mpi_allreduce(void* sendbuf, void* recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm) {
490   smpi_mpi_reduce(sendbuf, recvbuf, count, datatype, op, 0, comm);
491   smpi_mpi_bcast(recvbuf, count, datatype, 0, comm);
492
493 /*
494 FIXME: buggy implementation
495
496   int system_tag = 666;
497   int rank, size, other, index, datasize;
498   MPI_Request* requests;
499   void** tmpbufs;
500  
501   rank = smpi_comm_rank(comm);
502   size = smpi_comm_size(comm);
503   datasize = smpi_datatype_size(datatype);
504   // Local copy from self
505   memcpy(recvbuf, sendbuf, count * datasize * sizeof(char)); 
506   // Send/Recv buffers to/from others;
507   //TODO: make a MPI_barrier here ?
508   requests = xbt_new(MPI_Request, 2 * (size - 1));
509   tmpbufs = xbt_new(void*, size - 1);
510   index = 0;
511   for(other = 0; other < size; other++) {
512     if(other != rank) {
513       tmpbufs[index / 2] = xbt_malloc(count * datasize);
514       requests[index] = smpi_mpi_isend(sendbuf, count, datatype, other, system_tag, comm);
515       requests[index + 1] = smpi_mpi_irecv(tmpbufs[index / 2], count, datatype, other, system_tag, comm);
516       index += 2;
517     }
518   }
519   // Wait for completion of all comms.
520   for(other = 0; other < 2 * (size - 1); other++) {
521     index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
522     if(index == MPI_UNDEFINED) {
523       break;
524     }
525     if((index & 1) == 1) {
526       // Request is odd: it's a irecv
527       smpi_op_apply(op, tmpbufs[index / 2], recvbuf, &count, &datatype);
528     }
529   }
530   for(index = 0; index < size - 1; index++) {
531     xbt_free(tmpbufs[index]);
532   }
533   xbt_free(tmpbufs);
534   xbt_free(requests);
535 */
536 }