Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Move misplaced memleak fix.
[simgrid.git] / src / smpi / smpi_base.c
1 #include "private.h"
2 #include "xbt/time.h"
3
4 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_base, smpi,
5                                 "Logging specific to SMPI (base)");
6 XBT_LOG_EXTERNAL_CATEGORY(smpi_base);
7 XBT_LOG_EXTERNAL_CATEGORY(smpi_bench);
8 XBT_LOG_EXTERNAL_CATEGORY(smpi_kernel);
9 XBT_LOG_EXTERNAL_CATEGORY(smpi_mpi);
10 XBT_LOG_EXTERNAL_CATEGORY(smpi_mpi_dt);
11 XBT_LOG_EXTERNAL_CATEGORY(smpi_coll);
12 XBT_LOG_EXTERNAL_CATEGORY(smpi_receiver);
13 XBT_LOG_EXTERNAL_CATEGORY(smpi_sender);
14 XBT_LOG_EXTERNAL_CATEGORY(smpi_util);
15
16 void smpi_process_init(int* argc, char*** argv) {
17   int index;
18   smpi_process_data_t data;
19   smx_process_t proc;
20
21   proc = SIMIX_process_self();
22   index = atoi((*argv)[1]);
23   data = smpi_process_remote_data(index);
24   SIMIX_process_set_data(proc, data);
25   if (*argc > 2) {
26     free((*argv)[1]);
27     memmove(&(*argv)[1], &(*argv)[2], sizeof(char *) * (*argc - 2));
28     (*argv)[(*argc) - 1] = NULL;
29   }
30   (*argc)--;
31   DEBUG2("<%d> New process in the game: %p", index, proc);
32 }
33
34 void smpi_process_destroy(void) {
35   int index = smpi_process_index();
36
37   DEBUG1("<%d> Process left the game", index);
38 }
39
40 /* MPI Low level calls */
41 MPI_Request smpi_mpi_isend(void* buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm) {
42   MPI_Request request;
43
44   request = xbt_new(s_smpi_mpi_request_t, 1);
45   request->comm = comm;
46   request->src = smpi_comm_rank(comm);
47   request->dst = dst;
48   request->tag = tag;
49   request->size = smpi_datatype_size(datatype) * count;
50   request->complete = 0;
51   smpi_process_post_send(comm, request);
52   request->pair = SIMIX_network_isend(request->rdv, request->size, -1.0, buf, request->size, request);
53   return request;
54 }
55
56 MPI_Request smpi_mpi_irecv(void* buf, int count, MPI_Datatype datatype, int src, int tag, MPI_Comm comm) {
57   MPI_Request request;
58
59   request = xbt_new(s_smpi_mpi_request_t, 1);
60   request->comm = comm;
61   request->src = src;
62   request->dst = smpi_comm_rank(comm);
63   request->tag = tag;
64   request->size = smpi_datatype_size(datatype) * count;
65   request->complete = 0;
66   smpi_process_post_recv(request);
67   request->pair = SIMIX_network_irecv(request->rdv, buf, &request->size);
68   return request;
69 }
70
71 void smpi_mpi_recv(void* buf, int count, MPI_Datatype datatype, int src, int tag, MPI_Comm comm, MPI_Status* status) {
72   MPI_Request request;
73
74   request = smpi_mpi_irecv(buf, count, datatype, src, tag, comm);
75   smpi_mpi_wait(&request, status);
76 }
77
78 void smpi_mpi_send(void* buf, int count, MPI_Datatype datatype, int src, int tag, MPI_Comm comm) {
79   MPI_Request request;
80
81   request = smpi_mpi_isend(buf, count, datatype, src, tag, comm);
82   smpi_mpi_wait(&request, MPI_STATUS_IGNORE);
83 }
84
85 void smpi_mpi_sendrecv(void* sendbuf, int sendcount, MPI_Datatype sendtype, int dst, int sendtag, void* recvbuf, int recvcount, MPI_Datatype recvtype, int src, int recvtag, MPI_Comm comm, MPI_Status* status) {
86   MPI_Request requests[2];
87   MPI_Status stats[2];
88
89   requests[0] = smpi_mpi_isend(sendbuf, sendcount, sendtype, dst, sendtag, comm);
90   requests[1] = smpi_mpi_irecv(recvbuf, recvcount, recvtype, src, recvtag, comm);
91   smpi_mpi_waitall(2, requests, stats);
92   if(status != MPI_STATUS_IGNORE) {
93     // Copy receive status
94     memcpy(status, &stats[1], sizeof(MPI_Status));
95   }
96 }
97
98 static void finish_wait(MPI_Request* request, MPI_Status* status) {
99   MPI_Request data = (MPI_Request)SIMIX_communication_get_data((*request)->pair);
100
101   xbt_assert0(data != MPI_REQUEST_NULL, "Erroneous situation");
102   if(status != MPI_STATUS_IGNORE) {
103     status->MPI_SOURCE = (*request)->src;
104     status->MPI_TAG = (*request)->tag;
105     status->MPI_ERROR = MPI_SUCCESS;
106   }
107   DEBUG3("finishing wait for %p [data = %p, complete = %d]", *request, data, data->complete);
108   // data == *request if sender is first to finish its wait
109   // data != *request if receiver is first to finish its wait
110   if(data->complete == 0) {
111     // first arrives here
112     data->complete = 1;
113     if(data != *request) {
114       // receveiver cleans its part
115       xbt_free(*request);
116     }
117   } else {
118     // second arrives here
119     if(data != *request) {
120       // receiver cleans everything
121       xbt_free(data);
122     }
123     xbt_free(*request);
124   }
125   *request = MPI_REQUEST_NULL;
126 }
127
128 int smpi_mpi_test(MPI_Request* request, MPI_Status* status) {
129   MPI_Request data = (MPI_Request)SIMIX_communication_get_data((*request)->pair);
130   int flag = data && data->complete == 1;
131
132   if(flag) {
133     finish_wait(request, status);
134   }
135   return flag;
136 }
137
138 int smpi_mpi_testany(int count, MPI_Request requests[], int* index, MPI_Status* status) {
139   MPI_Request data;
140   int i, flag;
141
142   *index = MPI_UNDEFINED;
143   flag = 0;
144   for(i = 0; i < count; i++) {
145     if(requests[i] != MPI_REQUEST_NULL) {
146       data = (MPI_Request)SIMIX_communication_get_data(requests[i]->pair);
147       if(data != MPI_REQUEST_NULL && data->complete == 1) {
148         finish_wait(&requests[i], status);
149         *index = i;
150         flag = 1;
151         break;
152       }
153     }
154   }
155   return flag;
156 }
157
158 void smpi_mpi_wait(MPI_Request* request, MPI_Status* status) {
159   MPI_Request data = (MPI_Request)SIMIX_communication_get_data((*request)->pair);
160
161   DEBUG6("wait for request %p (%p: %p) [src = %d, dst = %d, tag = %d]",
162          *request, (*request)->pair, data, (*request)->src, (*request)->dst, (*request)->tag);
163   // data is null if receiver waits before sender enters the rdv
164   if(data == MPI_REQUEST_NULL || data->complete == 0) {
165     SIMIX_network_wait((*request)->pair, -1.0);
166   }
167   finish_wait(request, status);
168 }
169
170 int smpi_mpi_waitany(int count, MPI_Request requests[], MPI_Status* status) {
171   xbt_dynar_t comms;
172   MPI_Request data;
173   int i, size, index;
174   int* map;
175
176   index = MPI_UNDEFINED;
177   if(count > 0) {
178     // First check for already completed requests
179     for(i = 0; i < count; i++) {
180       if(requests[i] != MPI_REQUEST_NULL) {
181         data = (MPI_Request)SIMIX_communication_get_data(requests[i]->pair);
182         if(data != MPI_REQUEST_NULL && data->complete == 1) {
183           index = i;
184           break;
185         }
186       }
187     }
188     if(index == MPI_UNDEFINED) {
189       // Otherwise, wait for a request to complete
190       comms = xbt_dynar_new(sizeof(smx_comm_t), NULL);
191       map = xbt_new(int, count);
192       size = 0;
193       DEBUG0("Wait for one of");
194       for(i = 0; i < count; i++) {
195         if(requests[i] != MPI_REQUEST_NULL && requests[i]->complete == 0) {
196          DEBUG4("   request %p [src = %d, dst = %d, tag = %d]",
197                 requests[i], requests[i]->src, requests[i]->dst, requests[i]->tag);
198           xbt_dynar_push(comms, &requests[i]->pair);
199           map[size] = i;
200           size++;
201         }
202       }
203       if(size > 0) {
204         index = SIMIX_network_waitany(comms);
205         index = map[index];
206       }
207       xbt_free(map);
208       xbt_dynar_free_container(&comms);
209     }
210     if(index != MPI_UNDEFINED) {
211       finish_wait(&requests[index], status);
212     }
213   }
214   return index;
215 }
216
217 void smpi_mpi_waitall(int count, MPI_Request requests[],  MPI_Status status[]) {
218   int index;
219   MPI_Status stat;
220
221   while(count > 0) {
222     index = smpi_mpi_waitany(count, requests, &stat);
223     if(index == MPI_UNDEFINED) {
224       break;
225     }
226     if(status != MPI_STATUS_IGNORE) {
227       memcpy(&status[index], &stat, sizeof(stat));
228     }
229     // Move the last request to the found position
230     requests[index] = requests[count - 1];
231     requests[count - 1] = MPI_REQUEST_NULL;
232     count--;
233   }
234 }
235
236 int smpi_mpi_waitsome(int incount, MPI_Request requests[], int* indices, MPI_Status status[]) {
237   MPI_Request data;
238   int i, count;
239
240   count = 0;
241   for(i = 0; i < count; i++) {
242     if(requests[i] != MPI_REQUEST_NULL) {
243       data = (MPI_Request)SIMIX_communication_get_data(requests[i]->pair);
244       if(data != MPI_REQUEST_NULL && data->complete == 1) {
245         finish_wait(&requests[i], status != MPI_STATUS_IGNORE ? &status[i] : MPI_STATUS_IGNORE);
246         indices[count] = i;
247         count++;
248       }
249     }
250   }
251   return count;
252 }
253
254 void smpi_mpi_bcast(void* buf, int count, MPI_Datatype datatype, int root, MPI_Comm comm) {
255   // arity=2: a binary tree, arity=4 seem to be a good setting (see P2P-MPI))
256   nary_tree_bcast(buf, count, datatype, root, comm, 4);
257 }
258
259 void smpi_mpi_barrier(MPI_Comm comm) {
260   // arity=2: a binary tree, arity=4 seem to be a good setting (see P2P-MPI))
261   nary_tree_barrier(comm, 4);
262 }
263
264 void smpi_mpi_gather(void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm) {
265   int system_tag = 666;
266   int rank, size, src, index, sendsize, recvsize;
267   MPI_Request* requests;
268
269   rank = smpi_comm_rank(comm);
270   size = smpi_comm_size(comm);
271   if(rank != root) {
272     // Send buffer to root
273     smpi_mpi_send(sendbuf, sendcount, sendtype, root, system_tag, comm);
274   } else {
275     sendsize = smpi_datatype_size(sendtype);
276     recvsize = smpi_datatype_size(recvtype);
277     // Local copy from root
278     memcpy(&((char*)recvbuf)[root * recvcount * recvsize], sendbuf, sendcount * sendsize * sizeof(char));
279     // Receive buffers from senders
280     requests = xbt_new(MPI_Request, size - 1);
281     index = 0;
282     for(src = 0; src < size; src++) {
283       if(src != root) {
284         requests[index] = smpi_mpi_irecv(&((char*)recvbuf)[src * recvcount * recvsize], recvcount, recvtype, src, system_tag, comm);
285         index++;
286       }
287     }
288     // Wait for completion of irecv's.
289     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
290     xbt_free(requests);
291   }
292 }
293
294 void smpi_mpi_gatherv(void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int* recvcounts, int* displs, MPI_Datatype recvtype, int root, MPI_Comm comm) {
295   int system_tag = 666;
296   int rank, size, src, index, sendsize;
297   MPI_Request* requests;
298
299   rank = smpi_comm_rank(comm);
300   size = smpi_comm_size(comm);
301   if(rank != root) {
302     // Send buffer to root
303     smpi_mpi_send(sendbuf, sendcount, sendtype, root, system_tag, comm);
304   } else {
305     sendsize = smpi_datatype_size(sendtype);
306     // Local copy from root
307     memcpy(&((char*)recvbuf)[displs[root]], sendbuf, sendcount * sendsize * sizeof(char));
308     // Receive buffers from senders
309     requests = xbt_new(MPI_Request, size - 1);
310     index = 0;
311     for(src = 0; src < size; src++) {
312       if(src != root) {
313         requests[index] = smpi_mpi_irecv(&((char*)recvbuf)[displs[src]], recvcounts[src], recvtype, src, system_tag, comm);
314         index++;
315       }
316     }
317     // Wait for completion of irecv's.
318     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
319     xbt_free(requests);
320   }
321 }
322
323 void smpi_mpi_allgather(void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int recvcount, MPI_Datatype recvtype, MPI_Comm comm) {
324   int system_tag = 666;
325   int rank, size, other, index, sendsize, recvsize;
326   MPI_Request* requests;
327
328   rank = smpi_comm_rank(comm);
329   size = smpi_comm_size(comm);
330   sendsize = smpi_datatype_size(sendtype);
331   recvsize = smpi_datatype_size(recvtype);
332   // Local copy from self
333   memcpy(&((char*)recvbuf)[rank * recvcount * recvsize], sendbuf, sendcount * sendsize * sizeof(char));
334   // Send/Recv buffers to/from others;
335   requests = xbt_new(MPI_Request, 2 * (size - 1));
336   index = 0;
337   for(other = 0; other < size; other++) {
338     if(other != rank) {
339       requests[index] = smpi_mpi_isend(sendbuf, sendcount, sendtype, other, system_tag, comm);
340       index++;
341       requests[index] = smpi_mpi_irecv(&((char*)recvbuf)[other * recvcount * recvsize], recvcount, recvtype, other, system_tag, comm);
342       index++;
343     }
344   }
345   // Wait for completion of all comms.
346   smpi_mpi_waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
347   xbt_free(requests);
348 }
349
350 void smpi_mpi_allgatherv(void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int* recvcounts, int* displs, MPI_Datatype recvtype, MPI_Comm comm) {
351   int system_tag = 666;
352   int rank, size, other, index, sendsize, recvsize;
353   MPI_Request* requests;
354
355   rank = smpi_comm_rank(comm);
356   size = smpi_comm_size(comm);
357   sendsize = smpi_datatype_size(sendtype);
358   recvsize = smpi_datatype_size(recvtype);
359   // Local copy from self
360   memcpy(&((char*)recvbuf)[displs[rank]], sendbuf, sendcount * sendsize * sizeof(char));
361   // Send buffers to others;
362   requests = xbt_new(MPI_Request, 2 * (size - 1));
363   index = 0;
364   for(other = 0; other < size; other++) {
365     if(other != rank) {
366       requests[index] = smpi_mpi_isend(sendbuf, sendcount, sendtype, other, system_tag, comm);
367       index++;
368       requests[index] = smpi_mpi_irecv(&((char*)recvbuf)[displs[other]], recvcounts[other], recvtype, other, system_tag, comm);
369       index++;
370     }
371   }
372   // Wait for completion of all comms.
373   smpi_mpi_waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
374   xbt_free(requests);
375 }
376
377 void smpi_mpi_scatter(void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm) {
378   int system_tag = 666;
379   int rank, size, dst, index, sendsize, recvsize;
380   MPI_Request* requests;
381
382   rank = smpi_comm_rank(comm);
383   size = smpi_comm_size(comm);
384   if(rank != root) {
385     // Recv buffer from root
386     smpi_mpi_recv(recvbuf, recvcount, recvtype, root, system_tag, comm, MPI_STATUS_IGNORE);
387   } else {
388     sendsize = smpi_datatype_size(sendtype);
389     recvsize = smpi_datatype_size(recvtype);
390     // Local copy from root
391     memcpy(recvbuf, &((char*)sendbuf)[root * sendcount * sendsize], recvcount * recvsize * sizeof(char));
392     // Send buffers to receivers
393     requests = xbt_new(MPI_Request, size - 1);
394     index = 0;
395     for(dst = 0; dst < size; dst++) {
396       if(dst != root) {
397         requests[index] = smpi_mpi_isend(&((char*)sendbuf)[dst * sendcount * sendsize], sendcount, sendtype, dst, system_tag, comm);
398         index++;
399       }
400     }
401     // Wait for completion of isend's.
402     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
403     xbt_free(requests);
404   }
405 }
406
407 void smpi_mpi_scatterv(void* sendbuf, int* sendcounts, int* displs, MPI_Datatype sendtype, void* recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm) {
408   int system_tag = 666;
409   int rank, size, dst, index, sendsize, recvsize;
410   MPI_Request* requests;
411
412   rank = smpi_comm_rank(comm);
413   size = smpi_comm_size(comm);
414   if(rank != root) {
415     // Recv buffer from root
416     smpi_mpi_recv(recvbuf, recvcount, recvtype, root, system_tag, comm, MPI_STATUS_IGNORE);
417   } else {
418     sendsize = smpi_datatype_size(sendtype);
419     recvsize = smpi_datatype_size(recvtype);
420     // Local copy from root
421     memcpy(recvbuf, &((char*)sendbuf)[displs[root]], recvcount * recvsize * sizeof(char));
422     // Send buffers to receivers
423     requests = xbt_new(MPI_Request, size - 1);
424     index = 0;
425     for(dst = 0; dst < size; dst++) {
426       if(dst != root) {
427         requests[index] = smpi_mpi_isend(&((char*)sendbuf)[displs[dst]], sendcounts[dst], sendtype, dst, system_tag, comm);
428         index++;
429       }
430     }
431     // Wait for completion of isend's.
432     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
433     xbt_free(requests);
434   }
435 }
436
437 void smpi_mpi_reduce(void* sendbuf, void* recvbuf, int count, MPI_Datatype datatype, MPI_Op op, int root, MPI_Comm comm) {
438   int system_tag = 666;
439   int rank, size, src, index, datasize;
440   MPI_Request* requests;
441   void** tmpbufs;
442  
443   rank = smpi_comm_rank(comm);
444   size = smpi_comm_size(comm);
445   if(rank != root) {
446     // Send buffer to root
447     smpi_mpi_send(sendbuf, count, datatype, root, system_tag, comm);
448   } else {
449     datasize = smpi_datatype_size(datatype);
450     // Local copy from root
451     memcpy(recvbuf, sendbuf, count * datasize * sizeof(char)); 
452     // Receive buffers from senders
453     //TODO: make a MPI_barrier here ?
454     requests = xbt_new(MPI_Request, size - 1);
455     tmpbufs = xbt_new(void*, size - 1);
456     index = 0;
457     for(src = 0; src < size; src++) {
458       if(src != root) {
459         tmpbufs[index] = xbt_malloc(count * datasize);
460         requests[index] = smpi_mpi_irecv(tmpbufs[index], count, datatype, src, system_tag, comm);
461         index++;
462       }
463     }
464     // Wait for completion of irecv's.
465     for(src = 0; src < size - 1; src++) {
466       index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
467       if(index == MPI_UNDEFINED) {
468         break;
469       }
470       smpi_op_apply(op, tmpbufs[index], recvbuf, &count, &datatype);
471     }
472     for(index = 0; index < size - 1; index++) {
473       xbt_free(tmpbufs[index]);
474     }
475     xbt_free(tmpbufs);
476     xbt_free(requests);
477   }
478 }
479
480 void smpi_mpi_allreduce(void* sendbuf, void* recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm) {
481   smpi_mpi_reduce(sendbuf, recvbuf, count, datatype, op, 0, comm);
482   smpi_mpi_bcast(recvbuf, count, datatype, 0, comm);
483
484 /*
485 FIXME: buggy implementation
486
487   int system_tag = 666;
488   int rank, size, other, index, datasize;
489   MPI_Request* requests;
490   void** tmpbufs;
491  
492   rank = smpi_comm_rank(comm);
493   size = smpi_comm_size(comm);
494   datasize = smpi_datatype_size(datatype);
495   // Local copy from self
496   memcpy(recvbuf, sendbuf, count * datasize * sizeof(char)); 
497   // Send/Recv buffers to/from others;
498   //TODO: make a MPI_barrier here ?
499   requests = xbt_new(MPI_Request, 2 * (size - 1));
500   tmpbufs = xbt_new(void*, size - 1);
501   index = 0;
502   for(other = 0; other < size; other++) {
503     if(other != rank) {
504       tmpbufs[index / 2] = xbt_malloc(count * datasize);
505       requests[index] = smpi_mpi_isend(sendbuf, count, datatype, other, system_tag, comm);
506       requests[index + 1] = smpi_mpi_irecv(tmpbufs[index / 2], count, datatype, other, system_tag, comm);
507       index += 2;
508     }
509   }
510   // Wait for completion of all comms.
511   for(other = 0; other < 2 * (size - 1); other++) {
512     index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
513     if(index == MPI_UNDEFINED) {
514       break;
515     }
516     if((index & 1) == 1) {
517       // Request is odd: it's a irecv
518       smpi_op_apply(op, tmpbufs[index / 2], recvbuf, &count, &datatype);
519     }
520   }
521   for(index = 0; index < size - 1; index++) {
522     xbt_free(tmpbufs[index]);
523   }
524   xbt_free(tmpbufs);
525   xbt_free(requests);
526 */
527 }