Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Bugfix to dirty cleanup of SIMIX from SMPI.
[simgrid.git] / src / smpi / smpi_base.c
1 #include "private.h"
2 #include "xbt/time.h"
3
4 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_base, smpi,
5                                 "Logging specific to SMPI (base)");
6 XBT_LOG_EXTERNAL_CATEGORY(smpi_base);
7 XBT_LOG_EXTERNAL_CATEGORY(smpi_bench);
8 XBT_LOG_EXTERNAL_CATEGORY(smpi_kernel);
9 XBT_LOG_EXTERNAL_CATEGORY(smpi_mpi);
10 XBT_LOG_EXTERNAL_CATEGORY(smpi_mpi_dt);
11 XBT_LOG_EXTERNAL_CATEGORY(smpi_coll);
12 XBT_LOG_EXTERNAL_CATEGORY(smpi_receiver);
13 XBT_LOG_EXTERNAL_CATEGORY(smpi_sender);
14 XBT_LOG_EXTERNAL_CATEGORY(smpi_util);
15
16 void smpi_process_init(int* argc, char*** argv) {
17   int index;
18   smpi_process_data_t data;
19   smx_process_t proc;
20
21   proc = SIMIX_process_self();
22   index = atoi((*argv)[1]);
23   data = smpi_process_remote_data(index);
24   SIMIX_process_set_data(proc, data);
25   if (*argc > 2) {
26     free((*argv)[1]);
27     memmove(&(*argv)[1], &(*argv)[2], sizeof(char *) * (*argc - 2));
28     (*argv)[(*argc) - 1] = NULL;
29   }
30   (*argc)--;
31   DEBUG2("<%d> New process in the game: %p", index, proc);
32 }
33
34 void smpi_process_destroy(void) {
35   int index = smpi_process_index();
36
37   DEBUG1("<%d> Process left the game", index);
38 }
39
40 /* MPI Low level calls */
41 MPI_Request smpi_mpi_isend(void* buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm) {
42   MPI_Request request;
43
44   request = xbt_new(s_smpi_mpi_request_t, 1);
45   request->comm = comm;
46   request->src = smpi_comm_rank(comm);
47   request->dst = dst;
48   request->tag = tag;
49   request->size = smpi_datatype_size(datatype) * count;
50   request->complete = 0;
51   smpi_process_post_send(comm, request);
52   request->pair = SIMIX_network_isend(request->rdv, request->size, -1.0, buf, request->size, request);
53   return request;
54 }
55
56 MPI_Request smpi_mpi_irecv(void* buf, int count, MPI_Datatype datatype, int src, int tag, MPI_Comm comm) {
57   MPI_Request request;
58
59   request = xbt_new(s_smpi_mpi_request_t, 1);
60   request->comm = comm;
61   request->src = src;
62   request->dst = smpi_comm_rank(comm);
63   request->tag = tag;
64   request->size = smpi_datatype_size(datatype) * count;
65   request->complete = 0;
66   smpi_process_post_recv(request);
67   request->pair = SIMIX_network_irecv(request->rdv, buf, &request->size);
68   return request;
69 }
70
71 void smpi_mpi_recv(void* buf, int count, MPI_Datatype datatype, int src, int tag, MPI_Comm comm, MPI_Status* status) {
72   MPI_Request request;
73
74   request = smpi_mpi_irecv(buf, count, datatype, src, tag, comm);
75   smpi_mpi_wait(&request, status);
76 }
77
78 void smpi_mpi_send(void* buf, int count, MPI_Datatype datatype, int src, int tag, MPI_Comm comm) {
79   MPI_Request request;
80
81   request = smpi_mpi_isend(buf, count, datatype, src, tag, comm);
82   smpi_mpi_wait(&request, MPI_STATUS_IGNORE);
83 }
84
85 void smpi_mpi_sendrecv(void* sendbuf, int sendcount, MPI_Datatype sendtype, int dst, int sendtag, void* recvbuf, int recvcount, MPI_Datatype recvtype, int src, int recvtag, MPI_Comm comm, MPI_Status* status) {
86   MPI_Request requests[2];
87   MPI_Status stats[2];
88
89   requests[0] = smpi_mpi_isend(sendbuf, sendcount, sendtype, dst, sendtag, comm);
90   requests[1] = smpi_mpi_irecv(recvbuf, recvcount, recvtype, src, recvtag, comm);
91   smpi_mpi_waitall(2, requests, stats);
92   if(status != MPI_STATUS_IGNORE) {
93     // Copy receive status
94     memcpy(status, &stats[1], sizeof(MPI_Status));
95   }
96 }
97
98 static void finish_wait(MPI_Request* request, MPI_Status* status) {
99   MPI_Request data = (MPI_Request)SIMIX_communication_get_data((*request)->pair);
100
101   xbt_assert0(data != MPI_REQUEST_NULL, "Erroneous situation");
102   if(status != MPI_STATUS_IGNORE) {
103     status->MPI_SOURCE = (*request)->src;
104     status->MPI_TAG = (*request)->tag;
105     status->MPI_ERROR = MPI_SUCCESS;
106   }
107   DEBUG3("finishing wait for %p [data = %p, complete = %d]", *request, data, data->complete);
108   // data == *request if sender is first to finish its wait
109   // data != *request if receiver is first to finish its wait
110   if(data->complete == 0) {
111     // first arrives here
112     data->complete = 1;
113     if(data != *request) {
114       // receveiver cleans its part
115       xbt_free(*request);
116     }
117   } else {
118     // second arrives here
119     if(data != *request) {
120       // receiver cleans everything
121       xbt_free(data);
122     }
123     xbt_free(*request);
124   }
125   *request = MPI_REQUEST_NULL;
126 }
127
128 int smpi_mpi_test(MPI_Request* request, MPI_Status* status) {
129   MPI_Request data = (MPI_Request)SIMIX_communication_get_data((*request)->pair);
130   int flag = data && data->complete == 1;
131
132   if(flag) {
133     finish_wait(request, status);
134   }
135   return flag;
136 }
137
138 int smpi_mpi_testany(int count, MPI_Request requests[], int* index, MPI_Status* status) {
139   MPI_Request data;
140   int i, flag;
141
142   *index = MPI_UNDEFINED;
143   flag = 0;
144   for(i = 0; i < count; i++) {
145     if(requests[i] != MPI_REQUEST_NULL) {
146       data = (MPI_Request)SIMIX_communication_get_data(requests[i]->pair);
147       if(data != MPI_REQUEST_NULL && data->complete == 1) {
148         finish_wait(&requests[i], status);
149         *index = i;
150         flag = 1;
151         break;
152       }
153     }
154   }
155   return flag;
156 }
157
158 void smpi_mpi_wait(MPI_Request* request, MPI_Status* status) {
159   MPI_Request data = (MPI_Request)SIMIX_communication_get_data((*request)->pair);
160
161   DEBUG6("wait for request %p (%p: %p) [src = %d, dst = %d, tag = %d]",
162          *request, (*request)->pair, data, (*request)->src, (*request)->dst, (*request)->tag);
163   // data is null if receiver waits before sender enters the rdv
164   if(data == MPI_REQUEST_NULL || data->complete == 0) {
165     SIMIX_network_wait((*request)->pair, -1.0);
166   } else {
167     SIMIX_communication_destroy((*request)->pair);
168   }
169   finish_wait(request, status);
170 }
171
172 int smpi_mpi_waitany(int count, MPI_Request requests[], MPI_Status* status) {
173   xbt_dynar_t comms;
174   MPI_Request data;
175   int i, size, index;
176   int* map;
177
178   index = MPI_UNDEFINED;
179   if(count > 0) {
180     // First check for already completed requests
181     for(i = 0; i < count; i++) {
182       if(requests[i] != MPI_REQUEST_NULL) {
183         data = (MPI_Request)SIMIX_communication_get_data(requests[i]->pair);
184         if(data != MPI_REQUEST_NULL && data->complete == 1) {
185           index = i;
186           SIMIX_communication_destroy(requests[index]->pair); // always succeeds (but cleans the simix layer)
187           break;
188         }
189       }
190     }
191     if(index == MPI_UNDEFINED) {
192       // Otherwise, wait for a request to complete
193       comms = xbt_dynar_new(sizeof(smx_comm_t), NULL);
194       map = xbt_new(int, count);
195       size = 0;
196       DEBUG0("Wait for one of");
197       for(i = 0; i < count; i++) {
198         if(requests[i] != MPI_REQUEST_NULL && requests[i]->complete == 0) {
199          DEBUG4("   request %p [src = %d, dst = %d, tag = %d]",
200                 requests[i], requests[i]->src, requests[i]->dst, requests[i]->tag);
201           xbt_dynar_push(comms, &requests[i]->pair);
202           map[size] = i;
203           size++;
204         }
205       }
206       if(size > 0) {
207         index = SIMIX_network_waitany(comms);
208         index = map[index];
209       }
210       xbt_free(map);
211       xbt_dynar_free_container(&comms);
212     }
213     if(index != MPI_UNDEFINED) {
214       finish_wait(&requests[index], status);
215     }
216   }
217   return index;
218 }
219
220 void smpi_mpi_waitall(int count, MPI_Request requests[],  MPI_Status status[]) {
221   int index;
222   MPI_Status stat;
223
224   while(count > 0) {
225     index = smpi_mpi_waitany(count, requests, &stat);
226     if(index == MPI_UNDEFINED) {
227       break;
228     }
229     if(status != MPI_STATUS_IGNORE) {
230       memcpy(&status[index], &stat, sizeof(stat));
231     }
232     // Move the last request to the found position
233     requests[index] = requests[count - 1];
234     requests[count - 1] = MPI_REQUEST_NULL;
235     count--;
236   }
237 }
238
239 int smpi_mpi_waitsome(int incount, MPI_Request requests[], int* indices, MPI_Status status[]) {
240   MPI_Request data;
241   int i, count;
242
243   count = 0;
244   for(i = 0; i < count; i++) {
245     if(requests[i] != MPI_REQUEST_NULL) {
246       data = (MPI_Request)SIMIX_communication_get_data(requests[i]->pair);
247       if(data != MPI_REQUEST_NULL && data->complete == 1) {
248         finish_wait(&requests[i], status != MPI_STATUS_IGNORE ? &status[i] : MPI_STATUS_IGNORE);
249         indices[count] = i;
250         count++;
251       }
252     }
253   }
254   return count;
255 }
256
257 void smpi_mpi_bcast(void* buf, int count, MPI_Datatype datatype, int root, MPI_Comm comm) {
258   // arity=2: a binary tree, arity=4 seem to be a good setting (see P2P-MPI))
259   nary_tree_bcast(buf, count, datatype, root, comm, 4);
260 }
261
262 void smpi_mpi_barrier(MPI_Comm comm) {
263   // arity=2: a binary tree, arity=4 seem to be a good setting (see P2P-MPI))
264   nary_tree_barrier(comm, 4);
265 }
266
267 void smpi_mpi_gather(void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm) {
268   int system_tag = 666;
269   int rank, size, src, index, sendsize, recvsize;
270   MPI_Request* requests;
271
272   rank = smpi_comm_rank(comm);
273   size = smpi_comm_size(comm);
274   if(rank != root) {
275     // Send buffer to root
276     smpi_mpi_send(sendbuf, sendcount, sendtype, root, system_tag, comm);
277   } else {
278     sendsize = smpi_datatype_size(sendtype);
279     recvsize = smpi_datatype_size(recvtype);
280     // Local copy from root
281     memcpy(&((char*)recvbuf)[root * recvcount * recvsize], sendbuf, sendcount * sendsize * sizeof(char));
282     // Receive buffers from senders
283     requests = xbt_new(MPI_Request, size - 1);
284     index = 0;
285     for(src = 0; src < size; src++) {
286       if(src != root) {
287         requests[index] = smpi_mpi_irecv(&((char*)recvbuf)[src * recvcount * recvsize], recvcount, recvtype, src, system_tag, comm);
288         index++;
289       }
290     }
291     // Wait for completion of irecv's.
292     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
293     xbt_free(requests);
294   }
295 }
296
297 void smpi_mpi_gatherv(void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int* recvcounts, int* displs, MPI_Datatype recvtype, int root, MPI_Comm comm) {
298   int system_tag = 666;
299   int rank, size, src, index, sendsize;
300   MPI_Request* requests;
301
302   rank = smpi_comm_rank(comm);
303   size = smpi_comm_size(comm);
304   if(rank != root) {
305     // Send buffer to root
306     smpi_mpi_send(sendbuf, sendcount, sendtype, root, system_tag, comm);
307   } else {
308     sendsize = smpi_datatype_size(sendtype);
309     // Local copy from root
310     memcpy(&((char*)recvbuf)[displs[root]], sendbuf, sendcount * sendsize * sizeof(char));
311     // Receive buffers from senders
312     requests = xbt_new(MPI_Request, size - 1);
313     index = 0;
314     for(src = 0; src < size; src++) {
315       if(src != root) {
316         requests[index] = smpi_mpi_irecv(&((char*)recvbuf)[displs[src]], recvcounts[src], recvtype, src, system_tag, comm);
317         index++;
318       }
319     }
320     // Wait for completion of irecv's.
321     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
322     xbt_free(requests);
323   }
324 }
325
326 void smpi_mpi_allgather(void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int recvcount, MPI_Datatype recvtype, MPI_Comm comm) {
327   int system_tag = 666;
328   int rank, size, other, index, sendsize, recvsize;
329   MPI_Request* requests;
330
331   rank = smpi_comm_rank(comm);
332   size = smpi_comm_size(comm);
333   sendsize = smpi_datatype_size(sendtype);
334   recvsize = smpi_datatype_size(recvtype);
335   // Local copy from self
336   memcpy(&((char*)recvbuf)[rank * recvcount * recvsize], sendbuf, sendcount * sendsize * sizeof(char));
337   // Send/Recv buffers to/from others;
338   requests = xbt_new(MPI_Request, 2 * (size - 1));
339   index = 0;
340   for(other = 0; other < size; other++) {
341     if(other != rank) {
342       requests[index] = smpi_mpi_isend(sendbuf, sendcount, sendtype, other, system_tag, comm);
343       index++;
344       requests[index] = smpi_mpi_irecv(&((char*)recvbuf)[other * recvcount * recvsize], recvcount, recvtype, other, system_tag, comm);
345       index++;
346     }
347   }
348   // Wait for completion of all comms.
349   smpi_mpi_waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
350   xbt_free(requests);
351 }
352
353 void smpi_mpi_allgatherv(void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int* recvcounts, int* displs, MPI_Datatype recvtype, MPI_Comm comm) {
354   int system_tag = 666;
355   int rank, size, other, index, sendsize, recvsize;
356   MPI_Request* requests;
357
358   rank = smpi_comm_rank(comm);
359   size = smpi_comm_size(comm);
360   sendsize = smpi_datatype_size(sendtype);
361   recvsize = smpi_datatype_size(recvtype);
362   // Local copy from self
363   memcpy(&((char*)recvbuf)[displs[rank]], sendbuf, sendcount * sendsize * sizeof(char));
364   // Send buffers to others;
365   requests = xbt_new(MPI_Request, 2 * (size - 1));
366   index = 0;
367   for(other = 0; other < size; other++) {
368     if(other != rank) {
369       requests[index] = smpi_mpi_isend(sendbuf, sendcount, sendtype, other, system_tag, comm);
370       index++;
371       requests[index] = smpi_mpi_irecv(&((char*)recvbuf)[displs[other]], recvcounts[other], recvtype, other, system_tag, comm);
372       index++;
373     }
374   }
375   // Wait for completion of all comms.
376   smpi_mpi_waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
377   xbt_free(requests);
378 }
379
380 void smpi_mpi_scatter(void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm) {
381   int system_tag = 666;
382   int rank, size, dst, index, sendsize, recvsize;
383   MPI_Request* requests;
384
385   rank = smpi_comm_rank(comm);
386   size = smpi_comm_size(comm);
387   if(rank != root) {
388     // Recv buffer from root
389     smpi_mpi_recv(recvbuf, recvcount, recvtype, root, system_tag, comm, MPI_STATUS_IGNORE);
390   } else {
391     sendsize = smpi_datatype_size(sendtype);
392     recvsize = smpi_datatype_size(recvtype);
393     // Local copy from root
394     memcpy(recvbuf, &((char*)sendbuf)[root * sendcount * sendsize], recvcount * recvsize * sizeof(char));
395     // Send buffers to receivers
396     requests = xbt_new(MPI_Request, size - 1);
397     index = 0;
398     for(dst = 0; dst < size; dst++) {
399       if(dst != root) {
400         requests[index] = smpi_mpi_isend(&((char*)sendbuf)[dst * sendcount * sendsize], sendcount, sendtype, dst, system_tag, comm);
401         index++;
402       }
403     }
404     // Wait for completion of isend's.
405     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
406     xbt_free(requests);
407   }
408 }
409
410 void smpi_mpi_scatterv(void* sendbuf, int* sendcounts, int* displs, MPI_Datatype sendtype, void* recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm) {
411   int system_tag = 666;
412   int rank, size, dst, index, sendsize, recvsize;
413   MPI_Request* requests;
414
415   rank = smpi_comm_rank(comm);
416   size = smpi_comm_size(comm);
417   if(rank != root) {
418     // Recv buffer from root
419     smpi_mpi_recv(recvbuf, recvcount, recvtype, root, system_tag, comm, MPI_STATUS_IGNORE);
420   } else {
421     sendsize = smpi_datatype_size(sendtype);
422     recvsize = smpi_datatype_size(recvtype);
423     // Local copy from root
424     memcpy(recvbuf, &((char*)sendbuf)[displs[root]], recvcount * recvsize * sizeof(char));
425     // Send buffers to receivers
426     requests = xbt_new(MPI_Request, size - 1);
427     index = 0;
428     for(dst = 0; dst < size; dst++) {
429       if(dst != root) {
430         requests[index] = smpi_mpi_isend(&((char*)sendbuf)[displs[dst]], sendcounts[dst], sendtype, dst, system_tag, comm);
431         index++;
432       }
433     }
434     // Wait for completion of isend's.
435     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
436     xbt_free(requests);
437   }
438 }
439
440 void smpi_mpi_reduce(void* sendbuf, void* recvbuf, int count, MPI_Datatype datatype, MPI_Op op, int root, MPI_Comm comm) {
441   int system_tag = 666;
442   int rank, size, src, index, datasize;
443   MPI_Request* requests;
444   void** tmpbufs;
445  
446   rank = smpi_comm_rank(comm);
447   size = smpi_comm_size(comm);
448   if(rank != root) {
449     // Send buffer to root
450     smpi_mpi_send(sendbuf, count, datatype, root, system_tag, comm);
451   } else {
452     datasize = smpi_datatype_size(datatype);
453     // Local copy from root
454     memcpy(recvbuf, sendbuf, count * datasize * sizeof(char)); 
455     // Receive buffers from senders
456     //TODO: make a MPI_barrier here ?
457     requests = xbt_new(MPI_Request, size - 1);
458     tmpbufs = xbt_new(void*, size - 1);
459     index = 0;
460     for(src = 0; src < size; src++) {
461       if(src != root) {
462         tmpbufs[index] = xbt_malloc(count * datasize);
463         requests[index] = smpi_mpi_irecv(tmpbufs[index], count, datatype, src, system_tag, comm);
464         index++;
465       }
466     }
467     // Wait for completion of irecv's.
468     for(src = 0; src < size - 1; src++) {
469       index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
470       if(index == MPI_UNDEFINED) {
471         break;
472       }
473       smpi_op_apply(op, tmpbufs[index], recvbuf, &count, &datatype);
474     }
475     for(index = 0; index < size - 1; index++) {
476       xbt_free(tmpbufs[index]);
477     }
478     xbt_free(tmpbufs);
479     xbt_free(requests);
480   }
481 }
482
483 void smpi_mpi_allreduce(void* sendbuf, void* recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm) {
484   smpi_mpi_reduce(sendbuf, recvbuf, count, datatype, op, 0, comm);
485   smpi_mpi_bcast(recvbuf, count, datatype, 0, comm);
486
487 /*
488 FIXME: buggy implementation
489
490   int system_tag = 666;
491   int rank, size, other, index, datasize;
492   MPI_Request* requests;
493   void** tmpbufs;
494  
495   rank = smpi_comm_rank(comm);
496   size = smpi_comm_size(comm);
497   datasize = smpi_datatype_size(datatype);
498   // Local copy from self
499   memcpy(recvbuf, sendbuf, count * datasize * sizeof(char)); 
500   // Send/Recv buffers to/from others;
501   //TODO: make a MPI_barrier here ?
502   requests = xbt_new(MPI_Request, 2 * (size - 1));
503   tmpbufs = xbt_new(void*, size - 1);
504   index = 0;
505   for(other = 0; other < size; other++) {
506     if(other != rank) {
507       tmpbufs[index / 2] = xbt_malloc(count * datasize);
508       requests[index] = smpi_mpi_isend(sendbuf, count, datatype, other, system_tag, comm);
509       requests[index + 1] = smpi_mpi_irecv(tmpbufs[index / 2], count, datatype, other, system_tag, comm);
510       index += 2;
511     }
512   }
513   // Wait for completion of all comms.
514   for(other = 0; other < 2 * (size - 1); other++) {
515     index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
516     if(index == MPI_UNDEFINED) {
517       break;
518     }
519     if((index & 1) == 1) {
520       // Request is odd: it's a irecv
521       smpi_op_apply(op, tmpbufs[index / 2], recvbuf, &count, &datatype);
522     }
523   }
524   for(index = 0; index < size - 1; index++) {
525     xbt_free(tmpbufs[index]);
526   }
527   xbt_free(tmpbufs);
528   xbt_free(requests);
529 */
530 }