Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
kill crude debugging info for ruby: that part of the shit works now
[simgrid.git] / src / smpi / smpi_base.c
1 #include "private.h"
2 #include "xbt/time.h"
3
4 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_base, smpi,
5                                 "Logging specific to SMPI (base)");
6 XBT_LOG_EXTERNAL_CATEGORY(smpi_base);
7 XBT_LOG_EXTERNAL_CATEGORY(smpi_bench);
8 XBT_LOG_EXTERNAL_CATEGORY(smpi_kernel);
9 XBT_LOG_EXTERNAL_CATEGORY(smpi_mpi);
10 XBT_LOG_EXTERNAL_CATEGORY(smpi_mpi_dt);
11 XBT_LOG_EXTERNAL_CATEGORY(smpi_coll);
12 XBT_LOG_EXTERNAL_CATEGORY(smpi_receiver);
13 XBT_LOG_EXTERNAL_CATEGORY(smpi_sender);
14 XBT_LOG_EXTERNAL_CATEGORY(smpi_util);
15
16 void smpi_process_init(int* argc, char*** argv) {
17   int index;
18   smpi_process_data_t data;
19   smx_process_t proc;
20
21   proc = SIMIX_process_self();
22   index = atoi((*argv)[1]);
23   data = smpi_process_remote_data(index);
24   SIMIX_process_set_data(proc, data);
25   if (*argc > 2) {
26     memmove(&(*argv)[1], &(*argv)[2], sizeof(char *) * (*argc - 2));
27     (*argv)[(*argc) - 1] = NULL;
28   }
29   (*argc)--;
30   DEBUG2("<%d> New process in the game: %p", index, proc);
31 }
32
33 void smpi_process_destroy(void) {
34   int index = smpi_process_index();
35
36   DEBUG1("<%d> Process left the game", index);
37 }
38
39 /* MPI Low level calls */
40 MPI_Request smpi_mpi_isend(void* buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm) {
41   MPI_Request request;
42
43   request = xbt_new(s_smpi_mpi_request_t, 1);
44   request->comm = comm;
45   request->src = smpi_comm_rank(comm);
46   request->dst = dst;
47   request->tag = tag;
48   request->size = smpi_datatype_size(datatype) * count;
49   request->complete = 0;
50   smpi_process_post_send(comm, request);
51   request->pair = SIMIX_network_isend(request->rdv, request->size, -1.0, buf, request->size, request);
52   return request;
53 }
54
55 MPI_Request smpi_mpi_irecv(void* buf, int count, MPI_Datatype datatype, int src, int tag, MPI_Comm comm) {
56   MPI_Request request;
57
58   request = xbt_new(s_smpi_mpi_request_t, 1);
59   request->comm = comm;
60   request->src = src;
61   request->dst = smpi_comm_rank(comm);
62   request->tag = tag;
63   request->size = smpi_datatype_size(datatype) * count;
64   request->complete = 0;
65   smpi_process_post_recv(request);
66   request->pair = SIMIX_network_irecv(request->rdv, buf, &request->size);
67   return request;
68 }
69
70 void smpi_mpi_recv(void* buf, int count, MPI_Datatype datatype, int src, int tag, MPI_Comm comm, MPI_Status* status) {
71   MPI_Request request;
72
73   request = smpi_mpi_irecv(buf, count, datatype, src, tag, comm);
74   smpi_mpi_wait(&request, status);
75 }
76
77 void smpi_mpi_send(void* buf, int count, MPI_Datatype datatype, int src, int tag, MPI_Comm comm) {
78   MPI_Request request;
79
80   request = smpi_mpi_isend(buf, count, datatype, src, tag, comm);
81   smpi_mpi_wait(&request, MPI_STATUS_IGNORE);
82 }
83
84 void smpi_mpi_sendrecv(void* sendbuf, int sendcount, MPI_Datatype sendtype, int dst, int sendtag, void* recvbuf, int recvcount, MPI_Datatype recvtype, int src, int recvtag, MPI_Comm comm, MPI_Status* status) {
85   MPI_Request requests[2];
86   MPI_Status stats[2];
87
88   requests[0] = smpi_mpi_isend(sendbuf, sendcount, sendtype, dst, sendtag, comm);
89   requests[1] = smpi_mpi_irecv(recvbuf, recvcount, recvtype, src, recvtag, comm);
90   smpi_mpi_waitall(2, requests, stats);
91   if(status != MPI_STATUS_IGNORE) {
92     // Copy receive status
93     memcpy(status, &stats[1], sizeof(MPI_Status));
94   }
95 }
96
97 static void finish_wait(MPI_Request* request, MPI_Status* status) {
98   MPI_Request data = (MPI_Request)SIMIX_communication_get_data((*request)->pair);
99
100   xbt_assert0(data != MPI_REQUEST_NULL, "Erroneous situation");
101   if(status != MPI_STATUS_IGNORE) {
102     status->MPI_SOURCE = (*request)->src;
103     status->MPI_TAG = (*request)->tag;
104     status->MPI_ERROR = MPI_SUCCESS;
105   }
106   DEBUG3("finishing wait for %p [data = %p, complete = %d]", *request, data, data->complete);
107   // data == *request if sender is first to finish its wait
108   // data != *request if receiver is first to finish its wait
109   if(data->complete == 0) {
110     // first arrives here
111     data->complete = 1;
112     if(data != *request) {
113       // receveiver cleans its part
114       xbt_free(*request);
115     }
116   } else {
117     // second arrives here
118     if(data != *request) {
119       // receiver cleans everything
120       xbt_free(data);
121     }
122     xbt_free(*request);
123   }
124   *request = MPI_REQUEST_NULL;
125 }
126
127 int smpi_mpi_test(MPI_Request* request, MPI_Status* status) {
128   MPI_Request data = (MPI_Request)SIMIX_communication_get_data((*request)->pair);
129   int flag = data && data->complete == 1;
130
131   if(flag) {
132     finish_wait(request, status);
133   }
134   return flag;
135 }
136
137 int smpi_mpi_testany(int count, MPI_Request requests[], int* index, MPI_Status* status) {
138   MPI_Request data;
139   int i, flag;
140
141   *index = MPI_UNDEFINED;
142   flag = 0;
143   for(i = 0; i < count; i++) {
144     if(requests[i] != MPI_REQUEST_NULL) {
145       data = (MPI_Request)SIMIX_communication_get_data(requests[i]->pair);
146       if(data != MPI_REQUEST_NULL && data->complete == 1) {
147         finish_wait(&requests[i], status);
148         *index = i;
149         flag = 1;
150         break;
151       }
152     }
153   }
154   return flag;
155 }
156
157 void smpi_mpi_wait(MPI_Request* request, MPI_Status* status) {
158   MPI_Request data = (MPI_Request)SIMIX_communication_get_data((*request)->pair);
159
160   DEBUG6("wait for request %p (%p: %p) [src = %d, dst = %d, tag = %d]",
161          *request, (*request)->pair, data, (*request)->src, (*request)->dst, (*request)->tag);
162   // data is null if receiver waits before sender enters the rdv
163   if(data == MPI_REQUEST_NULL || data->complete == 0) {
164     SIMIX_network_wait((*request)->pair, -1.0);
165   }
166   finish_wait(request, status);
167 }
168
169 int smpi_mpi_waitany(int count, MPI_Request requests[], MPI_Status* status) {
170   xbt_dynar_t comms;
171   MPI_Request data;
172   int i, size, index;
173   int* map;
174
175   index = MPI_UNDEFINED;
176   if(count > 0) {
177     // First check for already completed requests
178     for(i = 0; i < count; i++) {
179       if(requests[i] != MPI_REQUEST_NULL) {
180         data = (MPI_Request)SIMIX_communication_get_data(requests[i]->pair);
181         if(data != MPI_REQUEST_NULL && data->complete == 1) {
182           index = i;
183           break;
184         }
185       }
186     }
187     if(index == MPI_UNDEFINED) {
188       // Otherwise, wait for a request to complete
189       comms = xbt_dynar_new(sizeof(smx_comm_t), NULL);
190       map = xbt_new(int, count);
191       size = 0;
192       DEBUG0("Wait for one of");
193       for(i = 0; i < count; i++) {
194         if(requests[i] != MPI_REQUEST_NULL && requests[i]->complete == 0) {
195          DEBUG4("   request %p [src = %d, dst = %d, tag = %d]",
196                 requests[i], requests[i]->src, requests[i]->dst, requests[i]->tag);
197           xbt_dynar_push(comms, &requests[i]->pair);
198           map[size] = i;
199           size++;
200         }
201       }
202       if(size > 0) {
203         index = SIMIX_network_waitany(comms);
204         index = map[index];
205       }
206       xbt_free(map);
207       xbt_dynar_free_container(&comms);
208     }
209     if(index != MPI_UNDEFINED) {
210       finish_wait(&requests[index], status);
211     }
212   }
213   return index;
214 }
215
216 void smpi_mpi_waitall(int count, MPI_Request requests[],  MPI_Status status[]) {
217   int index;
218   MPI_Status stat;
219
220   while(count > 0) {
221     index = smpi_mpi_waitany(count, requests, &stat);
222     if(index == MPI_UNDEFINED) {
223       break;
224     }
225     if(status != MPI_STATUS_IGNORE) {
226       memcpy(&status[index], &stat, sizeof(stat));
227     }
228     // Move the last request to the found position
229     requests[index] = requests[count - 1];
230     requests[count - 1] = MPI_REQUEST_NULL;
231     count--;
232   }
233 }
234
235 int smpi_mpi_waitsome(int incount, MPI_Request requests[], int* indices, MPI_Status status[]) {
236   MPI_Request data;
237   int i, count;
238
239   count = 0;
240   for(i = 0; i < count; i++) {
241     if(requests[i] != MPI_REQUEST_NULL) {
242       data = (MPI_Request)SIMIX_communication_get_data(requests[i]->pair);
243       if(data != MPI_REQUEST_NULL && data->complete == 1) {
244         finish_wait(&requests[i], status != MPI_STATUS_IGNORE ? &status[i] : MPI_STATUS_IGNORE);
245         indices[count] = i;
246         count++;
247       }
248     }
249   }
250   return count;
251 }
252
253 void smpi_mpi_bcast(void* buf, int count, MPI_Datatype datatype, int root, MPI_Comm comm) {
254   // arity=2: a binary tree, arity=4 seem to be a good setting (see P2P-MPI))
255   nary_tree_bcast(buf, count, datatype, root, comm, 4);
256 }
257
258 void smpi_mpi_barrier(MPI_Comm comm) {
259   // arity=2: a binary tree, arity=4 seem to be a good setting (see P2P-MPI))
260   nary_tree_barrier(comm, 4);
261 }
262
263 void smpi_mpi_gather(void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm) {
264   int system_tag = 666;
265   int rank, size, src, index, sendsize, recvsize;
266   MPI_Request* requests;
267
268   rank = smpi_comm_rank(comm);
269   size = smpi_comm_size(comm);
270   if(rank != root) {
271     // Send buffer to root
272     smpi_mpi_send(sendbuf, sendcount, sendtype, root, system_tag, comm);
273   } else {
274     sendsize = smpi_datatype_size(sendtype);
275     recvsize = smpi_datatype_size(recvtype);
276     // Local copy from root
277     memcpy(&((char*)recvbuf)[root * recvcount * recvsize], sendbuf, sendcount * sendsize * sizeof(char));
278     // Receive buffers from senders
279     requests = xbt_new(MPI_Request, size - 1);
280     index = 0;
281     for(src = 0; src < size; src++) {
282       if(src != root) {
283         requests[index] = smpi_mpi_irecv(&((char*)recvbuf)[src * recvcount * recvsize], recvcount, recvtype, src, system_tag, comm);
284         index++;
285       }
286     }
287     // Wait for completion of irecv's.
288     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
289     xbt_free(requests);
290   }
291 }
292
293 void smpi_mpi_gatherv(void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int* recvcounts, int* displs, MPI_Datatype recvtype, int root, MPI_Comm comm) {
294   int system_tag = 666;
295   int rank, size, src, index, sendsize;
296   MPI_Request* requests;
297
298   rank = smpi_comm_rank(comm);
299   size = smpi_comm_size(comm);
300   if(rank != root) {
301     // Send buffer to root
302     smpi_mpi_send(sendbuf, sendcount, sendtype, root, system_tag, comm);
303   } else {
304     sendsize = smpi_datatype_size(sendtype);
305     // Local copy from root
306     memcpy(&((char*)recvbuf)[displs[root]], sendbuf, sendcount * sendsize * sizeof(char));
307     // Receive buffers from senders
308     requests = xbt_new(MPI_Request, size - 1);
309     index = 0;
310     for(src = 0; src < size; src++) {
311       if(src != root) {
312         requests[index] = smpi_mpi_irecv(&((char*)recvbuf)[displs[src]], recvcounts[src], recvtype, src, system_tag, comm);
313         index++;
314       }
315     }
316     // Wait for completion of irecv's.
317     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
318     xbt_free(requests);
319   }
320 }
321
322 void smpi_mpi_allgather(void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int recvcount, MPI_Datatype recvtype, MPI_Comm comm) {
323   int system_tag = 666;
324   int rank, size, other, index, sendsize, recvsize;
325   MPI_Request* requests;
326
327   rank = smpi_comm_rank(comm);
328   size = smpi_comm_size(comm);
329   sendsize = smpi_datatype_size(sendtype);
330   recvsize = smpi_datatype_size(recvtype);
331   // Local copy from self
332   memcpy(&((char*)recvbuf)[rank * recvcount * recvsize], sendbuf, sendcount * sendsize * sizeof(char));
333   // Send/Recv buffers to/from others;
334   requests = xbt_new(MPI_Request, 2 * (size - 1));
335   index = 0;
336   for(other = 0; other < size; other++) {
337     if(other != rank) {
338       requests[index] = smpi_mpi_isend(sendbuf, sendcount, sendtype, other, system_tag, comm);
339       index++;
340       requests[index] = smpi_mpi_irecv(&((char*)recvbuf)[other * recvcount * recvsize], recvcount, recvtype, other, system_tag, comm);
341       index++;
342     }
343   }
344   // Wait for completion of all comms.
345   smpi_mpi_waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
346   xbt_free(requests);
347 }
348
349 void smpi_mpi_allgatherv(void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int* recvcounts, int* displs, MPI_Datatype recvtype, MPI_Comm comm) {
350   int system_tag = 666;
351   int rank, size, other, index, sendsize, recvsize;
352   MPI_Request* requests;
353
354   rank = smpi_comm_rank(comm);
355   size = smpi_comm_size(comm);
356   sendsize = smpi_datatype_size(sendtype);
357   recvsize = smpi_datatype_size(recvtype);
358   // Local copy from self
359   memcpy(&((char*)recvbuf)[displs[rank]], sendbuf, sendcount * sendsize * sizeof(char));
360   // Send buffers to others;
361   requests = xbt_new(MPI_Request, 2 * (size - 1));
362   index = 0;
363   for(other = 0; other < size; other++) {
364     if(other != rank) {
365       requests[index] = smpi_mpi_isend(sendbuf, sendcount, sendtype, other, system_tag, comm);
366       index++;
367       requests[index] = smpi_mpi_irecv(&((char*)recvbuf)[displs[other]], recvcounts[other], recvtype, other, system_tag, comm);
368       index++;
369     }
370   }
371   // Wait for completion of all comms.
372   smpi_mpi_waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
373   xbt_free(requests);
374 }
375
376 void smpi_mpi_scatter(void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm) {
377   int system_tag = 666;
378   int rank, size, dst, index, sendsize, recvsize;
379   MPI_Request* requests;
380
381   rank = smpi_comm_rank(comm);
382   size = smpi_comm_size(comm);
383   if(rank != root) {
384     // Recv buffer from root
385     smpi_mpi_recv(recvbuf, recvcount, recvtype, root, system_tag, comm, MPI_STATUS_IGNORE);
386   } else {
387     sendsize = smpi_datatype_size(sendtype);
388     recvsize = smpi_datatype_size(recvtype);
389     // Local copy from root
390     memcpy(recvbuf, &((char*)sendbuf)[root * sendcount * sendsize], recvcount * recvsize * sizeof(char));
391     // Send buffers to receivers
392     requests = xbt_new(MPI_Request, size - 1);
393     index = 0;
394     for(dst = 0; dst < size; dst++) {
395       if(dst != root) {
396         requests[index] = smpi_mpi_isend(&((char*)sendbuf)[dst * sendcount * sendsize], sendcount, sendtype, dst, system_tag, comm);
397         index++;
398       }
399     }
400     // Wait for completion of isend's.
401     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
402     xbt_free(requests);
403   }
404 }
405
406 void smpi_mpi_scatterv(void* sendbuf, int* sendcounts, int* displs, MPI_Datatype sendtype, void* recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm) {
407   int system_tag = 666;
408   int rank, size, dst, index, sendsize, recvsize;
409   MPI_Request* requests;
410
411   rank = smpi_comm_rank(comm);
412   size = smpi_comm_size(comm);
413   if(rank != root) {
414     // Recv buffer from root
415     smpi_mpi_recv(recvbuf, recvcount, recvtype, root, system_tag, comm, MPI_STATUS_IGNORE);
416   } else {
417     sendsize = smpi_datatype_size(sendtype);
418     recvsize = smpi_datatype_size(recvtype);
419     // Local copy from root
420     memcpy(recvbuf, &((char*)sendbuf)[displs[root]], recvcount * recvsize * sizeof(char));
421     // Send buffers to receivers
422     requests = xbt_new(MPI_Request, size - 1);
423     index = 0;
424     for(dst = 0; dst < size; dst++) {
425       if(dst != root) {
426         requests[index] = smpi_mpi_isend(&((char*)sendbuf)[displs[dst]], sendcounts[dst], sendtype, dst, system_tag, comm);
427         index++;
428       }
429     }
430     // Wait for completion of isend's.
431     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
432     xbt_free(requests);
433   }
434 }
435
436 void smpi_mpi_reduce(void* sendbuf, void* recvbuf, int count, MPI_Datatype datatype, MPI_Op op, int root, MPI_Comm comm) {
437   int system_tag = 666;
438   int rank, size, src, index, datasize;
439   MPI_Request* requests;
440   void** tmpbufs;
441  
442   rank = smpi_comm_rank(comm);
443   size = smpi_comm_size(comm);
444   if(rank != root) {
445     // Send buffer to root
446     smpi_mpi_send(sendbuf, count, datatype, root, system_tag, comm);
447   } else {
448     datasize = smpi_datatype_size(datatype);
449     // Local copy from root
450     memcpy(recvbuf, sendbuf, count * datasize * sizeof(char)); 
451     // Receive buffers from senders
452     //TODO: make a MPI_barrier here ?
453     requests = xbt_new(MPI_Request, size - 1);
454     tmpbufs = xbt_new(void*, size - 1);
455     index = 0;
456     for(src = 0; src < size; src++) {
457       if(src != root) {
458         tmpbufs[index] = xbt_malloc(count * datasize);
459         requests[index] = smpi_mpi_irecv(tmpbufs[index], count, datatype, src, system_tag, comm);
460         index++;
461       }
462     }
463     // Wait for completion of irecv's.
464     for(src = 0; src < size - 1; src++) {
465       index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
466       if(index == MPI_UNDEFINED) {
467         break;
468       }
469       smpi_op_apply(op, tmpbufs[index], recvbuf, &count, &datatype);
470     }
471     for(index = 0; index < size - 1; index++) {
472       xbt_free(tmpbufs[index]);
473     }
474     xbt_free(tmpbufs);
475     xbt_free(requests);
476   }
477 }
478
479 void smpi_mpi_allreduce(void* sendbuf, void* recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm) {
480   smpi_mpi_reduce(sendbuf, recvbuf, count, datatype, op, 0, comm);
481   smpi_mpi_bcast(recvbuf, count, datatype, 0, comm);
482
483 /*
484 FIXME: buggy implementation
485
486   int system_tag = 666;
487   int rank, size, other, index, datasize;
488   MPI_Request* requests;
489   void** tmpbufs;
490  
491   rank = smpi_comm_rank(comm);
492   size = smpi_comm_size(comm);
493   datasize = smpi_datatype_size(datatype);
494   // Local copy from self
495   memcpy(recvbuf, sendbuf, count * datasize * sizeof(char)); 
496   // Send/Recv buffers to/from others;
497   //TODO: make a MPI_barrier here ?
498   requests = xbt_new(MPI_Request, 2 * (size - 1));
499   tmpbufs = xbt_new(void*, size - 1);
500   index = 0;
501   for(other = 0; other < size; other++) {
502     if(other != rank) {
503       tmpbufs[index / 2] = xbt_malloc(count * datasize);
504       requests[index] = smpi_mpi_isend(sendbuf, count, datatype, other, system_tag, comm);
505       requests[index + 1] = smpi_mpi_irecv(tmpbufs[index / 2], count, datatype, other, system_tag, comm);
506       index += 2;
507     }
508   }
509   // Wait for completion of all comms.
510   for(other = 0; other < 2 * (size - 1); other++) {
511     index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
512     if(index == MPI_UNDEFINED) {
513       break;
514     }
515     if((index & 1) == 1) {
516       // Request is odd: it's a irecv
517       smpi_op_apply(op, tmpbufs[index / 2], recvbuf, &count, &datatype);
518     }
519   }
520   for(index = 0; index < size - 1; index++) {
521     xbt_free(tmpbufs[index]);
522   }
523   xbt_free(tmpbufs);
524   xbt_free(requests);
525 */
526 }