Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
- implemented MPI_MAXLOC & MPI_MINLOC in operations
[simgrid.git] / src / smpi / smpi_base.c
1 #include "private.h"
2 #include "xbt/time.h"
3
4 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_base, smpi,
5                                 "Logging specific to SMPI (base)");
6 XBT_LOG_EXTERNAL_CATEGORY(smpi_base);
7 XBT_LOG_EXTERNAL_CATEGORY(smpi_bench);
8 XBT_LOG_EXTERNAL_CATEGORY(smpi_kernel);
9 XBT_LOG_EXTERNAL_CATEGORY(smpi_mpi);
10 XBT_LOG_EXTERNAL_CATEGORY(smpi_mpi_dt);
11 XBT_LOG_EXTERNAL_CATEGORY(smpi_coll);
12 XBT_LOG_EXTERNAL_CATEGORY(smpi_receiver);
13 XBT_LOG_EXTERNAL_CATEGORY(smpi_sender);
14 XBT_LOG_EXTERNAL_CATEGORY(smpi_util);
15
16 void smpi_process_init(int* argc, char*** argv) {
17   int index;
18   smpi_process_data_t data;
19   smx_process_t proc;
20
21   proc = SIMIX_process_self();
22   index = atoi((*argv)[1]);
23   data = smpi_process_remote_data(index);
24   SIMIX_process_set_data(proc, data);
25   if (*argc > 2) {
26     free((*argv)[1]);
27     memmove(&(*argv)[1], &(*argv)[2], sizeof(char *) * (*argc - 2));
28     (*argv)[(*argc) - 1] = NULL;
29   }
30   (*argc)--;
31   DEBUG2("<%d> New process in the game: %p", index, proc);
32 }
33
34 void smpi_process_destroy(void) {
35   int index = smpi_process_index();
36
37   DEBUG1("<%d> Process left the game", index);
38 }
39
40 /* MPI Low level calls */
41 MPI_Request smpi_mpi_isend(void* buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm) {
42   MPI_Request request;
43
44   request = xbt_new(s_smpi_mpi_request_t, 1);
45   request->comm = comm;
46   request->src = smpi_comm_rank(comm);
47   request->dst = dst;
48   request->tag = tag;
49   request->size = smpi_datatype_size(datatype) * count;
50   request->complete = 0;
51   request->data = request;
52   smpi_process_post_send(comm, request);
53   request->pair = SIMIX_network_isend(request->rdv, request->size, -1.0, buf, request->size, NULL);
54   return request;
55 }
56
57 MPI_Request smpi_mpi_irecv(void* buf, int count, MPI_Datatype datatype, int src, int tag, MPI_Comm comm) {
58   MPI_Request request;
59
60   request = xbt_new(s_smpi_mpi_request_t, 1);
61   request->comm = comm;
62   request->src = src;
63   request->dst = smpi_comm_rank(comm);
64   request->tag = tag;
65   request->size = smpi_datatype_size(datatype) * count;
66   request->complete = 0;
67   request->data = MPI_REQUEST_NULL;
68   smpi_process_post_recv(request);
69   request->pair = SIMIX_network_irecv(request->rdv, buf, &request->size);
70   return request;
71 }
72
73 void smpi_mpi_recv(void* buf, int count, MPI_Datatype datatype, int src, int tag, MPI_Comm comm, MPI_Status* status) {
74   MPI_Request request;
75
76   request = smpi_mpi_irecv(buf, count, datatype, src, tag, comm);
77   smpi_mpi_wait(&request, status);
78 }
79
80 void smpi_mpi_send(void* buf, int count, MPI_Datatype datatype, int src, int tag, MPI_Comm comm) {
81   MPI_Request request;
82
83   request = smpi_mpi_isend(buf, count, datatype, src, tag, comm);
84   smpi_mpi_wait(&request, MPI_STATUS_IGNORE);
85 }
86
87 void smpi_mpi_sendrecv(void* sendbuf, int sendcount, MPI_Datatype sendtype, int dst, int sendtag, void* recvbuf, int recvcount, MPI_Datatype recvtype, int src, int recvtag, MPI_Comm comm, MPI_Status* status) {
88   MPI_Request requests[2];
89   MPI_Status stats[2];
90
91   requests[0] = smpi_mpi_isend(sendbuf, sendcount, sendtype, dst, sendtag, comm);
92   requests[1] = smpi_mpi_irecv(recvbuf, recvcount, recvtype, src, recvtag, comm);
93   smpi_mpi_waitall(2, requests, stats);
94   if(status != MPI_STATUS_IGNORE) {
95     // Copy receive status
96     memcpy(status, &stats[1], sizeof(MPI_Status));
97   }
98 }
99
100 static void finish_wait(MPI_Request* request, MPI_Status* status) {
101   MPI_Request data = (*request)->data;
102
103   xbt_assert0(data != MPI_REQUEST_NULL, "Erroneous situation");
104   if(status != MPI_STATUS_IGNORE) {
105     status->MPI_SOURCE = (*request)->src;
106     status->MPI_TAG = (*request)->tag;
107     status->MPI_ERROR = MPI_SUCCESS;
108     status->_count = (*request)->size; // size in bytes
109     status->_cancelled = 0;            // FIXME: cancellation of requests not handled yet 
110   }
111   DEBUG3("finishing wait for %p [data = %p, complete = %d]", *request, data, data->complete);
112   // data == *request if sender is first to finish its wait
113   // data != *request if receiver is first to finish its wait
114   if(data->complete == 0) {
115     // first arrives here
116     data->complete = 1;
117     if(data != *request) {
118       // receveiver cleans its part
119       xbt_free(*request);
120     }
121   } else {
122     // second arrives here
123     if(data != *request) {
124       // receiver cleans everything
125       xbt_free(data);
126     }
127     SIMIX_rdv_destroy((*request)->rdv);
128     xbt_free(*request);
129   }
130   *request = MPI_REQUEST_NULL;
131 }
132
133 int smpi_mpi_test(MPI_Request* request, MPI_Status* status) {
134   MPI_Request data = (*request)->data;
135   int flag = data && data->complete == 1;
136
137   if(flag) {
138     SIMIX_communication_destroy((*request)->pair);
139     finish_wait(request, status);
140   }
141   return flag;
142 }
143
144 int smpi_mpi_testany(int count, MPI_Request requests[], int* index, MPI_Status* status) {
145   MPI_Request data;
146   int i, flag;
147
148   *index = MPI_UNDEFINED;
149   flag = 0;
150   for(i = 0; i < count; i++) {
151     if(requests[i] != MPI_REQUEST_NULL) {
152       data = requests[i]->data;
153       if(data != MPI_REQUEST_NULL && data->complete == 1) {
154         SIMIX_communication_destroy(requests[i]->pair);
155         finish_wait(&requests[i], status);
156         *index = i;
157         flag = 1;
158         break;
159       }
160     }
161   }
162   return flag;
163 }
164
165
166 void smpi_mpi_get_count(MPI_Status *status, MPI_Datatype datatype, int *count) {
167  int size = smpi_datatype_size(datatype);
168           *count = (int)(status->_count / size);
169           if ( (int)((*count) * size) != status->_count )
170                     *count = MPI_UNDEFINED;
171 }
172
173
174 void smpi_mpi_wait(MPI_Request* request, MPI_Status* status) {
175   MPI_Request data = (*request)->data;
176
177   DEBUG6("wait for request %p (%p: %p) [src = %d, dst = %d, tag = %d]",
178          *request, (*request)->pair, data, (*request)->src, (*request)->dst, (*request)->tag);
179   // data is null if receiver waits before sender enters the rdv
180   if(data == MPI_REQUEST_NULL || data->complete == 0) {
181     SIMIX_network_wait((*request)->pair, -1.0);
182   } else {
183     SIMIX_communication_destroy((*request)->pair);
184   }
185   finish_wait(request, status);
186 }
187
188 int smpi_mpi_waitany(int count, MPI_Request requests[], MPI_Status* status) {
189   xbt_dynar_t comms;
190   MPI_Request data;
191   int i, size, index;
192   int* map;
193
194   index = MPI_UNDEFINED;
195   if(count > 0) {
196     // First check for already completed requests
197     for(i = 0; i < count; i++) {
198       if(requests[i] != MPI_REQUEST_NULL) {
199         data = requests[i]->data;
200         if(data != MPI_REQUEST_NULL && data->complete == 1) {
201           index = i;
202           SIMIX_communication_destroy(requests[index]->pair); // always succeeds (but cleans the simix layer)
203           break;
204         }
205       }
206     }
207     if(index == MPI_UNDEFINED) {
208       // Otherwise, wait for a request to complete
209       comms = xbt_dynar_new(sizeof(smx_comm_t), NULL);
210       map = xbt_new(int, count);
211       size = 0;
212       DEBUG0("Wait for one of");
213       for(i = 0; i < count; i++) {
214         if(requests[i] != MPI_REQUEST_NULL && requests[i]->complete == 0) {
215          DEBUG4("   request %p [src = %d, dst = %d, tag = %d]",
216                 requests[i], requests[i]->src, requests[i]->dst, requests[i]->tag);
217           xbt_dynar_push(comms, &requests[i]->pair);
218           map[size] = i;
219           size++;
220         }
221       }
222       if(size > 0) {
223         index = SIMIX_network_waitany(comms);
224         index = map[index];
225       }
226       xbt_free(map);
227       xbt_dynar_free_container(&comms);
228     }
229     if(index != MPI_UNDEFINED) {
230       finish_wait(&requests[index], status);
231     }
232   }
233   return index;
234 }
235
236 void smpi_mpi_waitall(int count, MPI_Request requests[],  MPI_Status status[]) {
237   int index;
238   MPI_Status stat;
239
240   while(count > 0) {
241     index = smpi_mpi_waitany(count, requests, &stat);
242     if(index == MPI_UNDEFINED) {
243       break;
244     }
245     if(status != MPI_STATUS_IGNORE) {
246       memcpy(&status[index], &stat, sizeof(stat));
247     }
248     // Move the last request to the found position
249     requests[index] = requests[count - 1];
250     requests[count - 1] = MPI_REQUEST_NULL;
251     count--;
252   }
253 }
254
255 int smpi_mpi_waitsome(int incount, MPI_Request requests[], int* indices, MPI_Status status[]) {
256   MPI_Request data;
257   int i, count;
258
259   count = 0;
260   for(i = 0; i < incount; i++) {
261     if(requests[i] != MPI_REQUEST_NULL) {
262       data = requests[i]->data;
263       if(data != MPI_REQUEST_NULL && data->complete == 1) {
264         SIMIX_communication_destroy(requests[i]->pair);
265         finish_wait(&requests[i], status != MPI_STATUS_IGNORE ? &status[i] : MPI_STATUS_IGNORE);
266         indices[count] = i;
267         count++;
268       }
269     }
270   }
271   return count;
272 }
273
274 void smpi_mpi_bcast(void* buf, int count, MPI_Datatype datatype, int root, MPI_Comm comm) {
275   // arity=2: a binary tree, arity=4 seem to be a good setting (see P2P-MPI))
276   nary_tree_bcast(buf, count, datatype, root, comm, 4);
277 }
278
279 void smpi_mpi_barrier(MPI_Comm comm) {
280   // arity=2: a binary tree, arity=4 seem to be a good setting (see P2P-MPI))
281   nary_tree_barrier(comm, 4);
282 }
283
284 void smpi_mpi_gather(void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm) {
285   int system_tag = 666;
286   int rank, size, src, index, sendsize, recvsize;
287   MPI_Request* requests;
288
289   rank = smpi_comm_rank(comm);
290   size = smpi_comm_size(comm);
291   if(rank != root) {
292     // Send buffer to root
293     smpi_mpi_send(sendbuf, sendcount, sendtype, root, system_tag, comm);
294   } else {
295     sendsize = smpi_datatype_size(sendtype);
296     recvsize = smpi_datatype_size(recvtype);
297     // Local copy from root
298     memcpy(&((char*)recvbuf)[root * recvcount * recvsize], sendbuf, sendcount * sendsize * sizeof(char));
299     // Receive buffers from senders
300     requests = xbt_new(MPI_Request, size - 1);
301     index = 0;
302     for(src = 0; src < size; src++) {
303       if(src != root) {
304         requests[index] = smpi_mpi_irecv(&((char*)recvbuf)[src * recvcount * recvsize], recvcount, recvtype, src, system_tag, comm);
305         index++;
306       }
307     }
308     // Wait for completion of irecv's.
309     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
310     xbt_free(requests);
311   }
312 }
313
314 void smpi_mpi_gatherv(void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int* recvcounts, int* displs, MPI_Datatype recvtype, int root, MPI_Comm comm) {
315   int system_tag = 666;
316   int rank, size, src, index, sendsize;
317   MPI_Request* requests;
318
319   rank = smpi_comm_rank(comm);
320   size = smpi_comm_size(comm);
321   if(rank != root) {
322     // Send buffer to root
323     smpi_mpi_send(sendbuf, sendcount, sendtype, root, system_tag, comm);
324   } else {
325     sendsize = smpi_datatype_size(sendtype);
326     // Local copy from root
327     memcpy(&((char*)recvbuf)[displs[root]], sendbuf, sendcount * sendsize * sizeof(char));
328     // Receive buffers from senders
329     requests = xbt_new(MPI_Request, size - 1);
330     index = 0;
331     for(src = 0; src < size; src++) {
332       if(src != root) {
333         requests[index] = smpi_mpi_irecv(&((char*)recvbuf)[displs[src]], recvcounts[src], recvtype, src, system_tag, comm);
334         index++;
335       }
336     }
337     // Wait for completion of irecv's.
338     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
339     xbt_free(requests);
340   }
341 }
342
343 void smpi_mpi_allgather(void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int recvcount, MPI_Datatype recvtype, MPI_Comm comm) {
344   int system_tag = 666;
345   int rank, size, other, index, sendsize, recvsize;
346   MPI_Request* requests;
347
348   rank = smpi_comm_rank(comm);
349   size = smpi_comm_size(comm);
350   sendsize = smpi_datatype_size(sendtype);
351   recvsize = smpi_datatype_size(recvtype);
352   // Local copy from self
353   memcpy(&((char*)recvbuf)[rank * recvcount * recvsize], sendbuf, sendcount * sendsize * sizeof(char));
354   // Send/Recv buffers to/from others;
355   requests = xbt_new(MPI_Request, 2 * (size - 1));
356   index = 0;
357   for(other = 0; other < size; other++) {
358     if(other != rank) {
359       requests[index] = smpi_mpi_isend(sendbuf, sendcount, sendtype, other, system_tag, comm);
360       index++;
361       requests[index] = smpi_mpi_irecv(&((char*)recvbuf)[other * recvcount * recvsize], recvcount, recvtype, other, system_tag, comm);
362       index++;
363     }
364   }
365   // Wait for completion of all comms.
366   smpi_mpi_waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
367   xbt_free(requests);
368 }
369
370 void smpi_mpi_allgatherv(void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int* recvcounts, int* displs, MPI_Datatype recvtype, MPI_Comm comm) {
371   int system_tag = 666;
372   int rank, size, other, index, sendsize, recvsize;
373   MPI_Request* requests;
374
375   rank = smpi_comm_rank(comm);
376   size = smpi_comm_size(comm);
377   sendsize = smpi_datatype_size(sendtype);
378   recvsize = smpi_datatype_size(recvtype);
379   // Local copy from self
380   memcpy(&((char*)recvbuf)[displs[rank]], sendbuf, sendcount * sendsize * sizeof(char));
381   // Send buffers to others;
382   requests = xbt_new(MPI_Request, 2 * (size - 1));
383   index = 0;
384   for(other = 0; other < size; other++) {
385     if(other != rank) {
386       requests[index] = smpi_mpi_isend(sendbuf, sendcount, sendtype, other, system_tag, comm);
387       index++;
388       requests[index] = smpi_mpi_irecv(&((char*)recvbuf)[displs[other]], recvcounts[other], recvtype, other, system_tag, comm);
389       index++;
390     }
391   }
392   // Wait for completion of all comms.
393   smpi_mpi_waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
394   xbt_free(requests);
395 }
396
397 void smpi_mpi_scatter(void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm) {
398   int system_tag = 666;
399   int rank, size, dst, index, sendsize, recvsize;
400   MPI_Request* requests;
401
402   rank = smpi_comm_rank(comm);
403   size = smpi_comm_size(comm);
404   if(rank != root) {
405     // Recv buffer from root
406     smpi_mpi_recv(recvbuf, recvcount, recvtype, root, system_tag, comm, MPI_STATUS_IGNORE);
407   } else {
408     sendsize = smpi_datatype_size(sendtype);
409     recvsize = smpi_datatype_size(recvtype);
410     // Local copy from root
411     memcpy(recvbuf, &((char*)sendbuf)[root * sendcount * sendsize], recvcount * recvsize * sizeof(char));
412     // Send buffers to receivers
413     requests = xbt_new(MPI_Request, size - 1);
414     index = 0;
415     for(dst = 0; dst < size; dst++) {
416       if(dst != root) {
417         requests[index] = smpi_mpi_isend(&((char*)sendbuf)[dst * sendcount * sendsize], sendcount, sendtype, dst, system_tag, comm);
418         index++;
419       }
420     }
421     // Wait for completion of isend's.
422     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
423     xbt_free(requests);
424   }
425 }
426
427 void smpi_mpi_scatterv(void* sendbuf, int* sendcounts, int* displs, MPI_Datatype sendtype, void* recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm) {
428   int system_tag = 666;
429   int rank, size, dst, index, sendsize, recvsize;
430   MPI_Request* requests;
431
432   rank = smpi_comm_rank(comm);
433   size = smpi_comm_size(comm);
434   if(rank != root) {
435     // Recv buffer from root
436     smpi_mpi_recv(recvbuf, recvcount, recvtype, root, system_tag, comm, MPI_STATUS_IGNORE);
437   } else {
438     sendsize = smpi_datatype_size(sendtype);
439     recvsize = smpi_datatype_size(recvtype);
440     // Local copy from root
441     memcpy(recvbuf, &((char*)sendbuf)[displs[root]], recvcount * recvsize * sizeof(char));
442     // Send buffers to receivers
443     requests = xbt_new(MPI_Request, size - 1);
444     index = 0;
445     for(dst = 0; dst < size; dst++) {
446       if(dst != root) {
447         requests[index] = smpi_mpi_isend(&((char*)sendbuf)[displs[dst]], sendcounts[dst], sendtype, dst, system_tag, comm);
448         index++;
449       }
450     }
451     // Wait for completion of isend's.
452     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
453     xbt_free(requests);
454   }
455 }
456
457 void smpi_mpi_reduce(void* sendbuf, void* recvbuf, int count, MPI_Datatype datatype, MPI_Op op, int root, MPI_Comm comm) {
458   int system_tag = 666;
459   int rank, size, src, index, datasize;
460   MPI_Request* requests;
461   void** tmpbufs;
462  
463   rank = smpi_comm_rank(comm);
464   size = smpi_comm_size(comm);
465   if(rank != root) {
466     // Send buffer to root
467     smpi_mpi_send(sendbuf, count, datatype, root, system_tag, comm);
468   } else {
469     datasize = smpi_datatype_size(datatype);
470     // Local copy from root
471     memcpy(recvbuf, sendbuf, count * datasize * sizeof(char)); 
472     // Receive buffers from senders
473     //TODO: make a MPI_barrier here ?
474     requests = xbt_new(MPI_Request, size - 1);
475     tmpbufs = xbt_new(void*, size - 1);
476     index = 0;
477     for(src = 0; src < size; src++) {
478       if(src != root) {
479         tmpbufs[index] = xbt_malloc(count * datasize);
480         requests[index] = smpi_mpi_irecv(tmpbufs[index], count, datatype, src, system_tag, comm);
481         index++;
482       }
483     }
484     // Wait for completion of irecv's.
485     for(src = 0; src < size - 1; src++) {
486       index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
487       if(index == MPI_UNDEFINED) {
488         break;
489       }
490       smpi_op_apply(op, tmpbufs[index], recvbuf, &count, &datatype);
491     }
492     for(index = 0; index < size - 1; index++) {
493       xbt_free(tmpbufs[index]);
494     }
495     xbt_free(tmpbufs);
496     xbt_free(requests);
497   }
498 }
499
500 void smpi_mpi_allreduce(void* sendbuf, void* recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm) {
501   smpi_mpi_reduce(sendbuf, recvbuf, count, datatype, op, 0, comm);
502   smpi_mpi_bcast(recvbuf, count, datatype, 0, comm);
503
504 /*
505 FIXME: buggy implementation
506
507   int system_tag = 666;
508   int rank, size, other, index, datasize;
509   MPI_Request* requests;
510   void** tmpbufs;
511  
512   rank = smpi_comm_rank(comm);
513   size = smpi_comm_size(comm);
514   datasize = smpi_datatype_size(datatype);
515   // Local copy from self
516   memcpy(recvbuf, sendbuf, count * datasize * sizeof(char)); 
517   // Send/Recv buffers to/from others;
518   //TODO: make a MPI_barrier here ?
519   requests = xbt_new(MPI_Request, 2 * (size - 1));
520   tmpbufs = xbt_new(void*, size - 1);
521   index = 0;
522   for(other = 0; other < size; other++) {
523     if(other != rank) {
524       tmpbufs[index / 2] = xbt_malloc(count * datasize);
525       requests[index] = smpi_mpi_isend(sendbuf, count, datatype, other, system_tag, comm);
526       requests[index + 1] = smpi_mpi_irecv(tmpbufs[index / 2], count, datatype, other, system_tag, comm);
527       index += 2;
528     }
529   }
530   // Wait for completion of all comms.
531   for(other = 0; other < 2 * (size - 1); other++) {
532     index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
533     if(index == MPI_UNDEFINED) {
534       break;
535     }
536     if((index & 1) == 1) {
537       // Request is odd: it's a irecv
538       smpi_op_apply(op, tmpbufs[index / 2], recvbuf, &count, &datatype);
539     }
540   }
541   for(index = 0; index < size - 1; index++) {
542     xbt_free(tmpbufs[index]);
543   }
544   xbt_free(tmpbufs);
545   xbt_free(requests);
546 */
547 }