Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Merge branch 'master' into mc-process
[simgrid.git] / src / smpi / smpi_rma.c
1
2 /* Copyright (c) 2007-2014. The SimGrid Team.
3  * All rights reserved.                                                     */
4
5 /* This program is free software; you can redistribute it and/or modify it
6  * under the terms of the license (GNU LGPL) which comes with this package. */
7
8 #include "private.h"
9
10 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");
11
12 #define RMA_TAG -1234
13
14 xbt_bar_t creation_bar = NULL;
15
16 typedef struct s_smpi_mpi_win{
17   void* base;
18   MPI_Aint size;
19   int disp_unit;
20   MPI_Comm comm;
21   MPI_Info info;
22   int assert;
23   xbt_dynar_t requests;
24   xbt_bar_t bar;
25   MPI_Win* connected_wins;
26   char* name;
27   int opened;
28   MPI_Group group;
29 } s_smpi_mpi_win_t;
30
31
32 MPI_Win smpi_mpi_win_create( void *base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm){
33
34   MPI_Win win;
35   
36   int comm_size = smpi_comm_size(comm);
37   int rank=smpi_comm_rank(comm);
38   XBT_DEBUG("Creating window");
39
40   win = xbt_new(s_smpi_mpi_win_t, 1);
41   win->base = base;
42   win->size = size;
43   win->disp_unit = disp_unit;
44   win->assert = 0;
45   win->info = info;
46   if(info!=MPI_INFO_NULL)
47     info->refcount++;
48   win->comm = comm;
49   win->name = NULL;
50   win->opened = 0;
51   win->group = MPI_GROUP_NULL;
52   win->requests = xbt_dynar_new(sizeof(MPI_Request), NULL);
53   win->connected_wins = xbt_malloc0(comm_size*sizeof(MPI_Win));
54   win->connected_wins[rank] = win;
55   
56   if(rank==0){
57     win->bar=xbt_barrier_init(comm_size);
58   }
59   
60   mpi_coll_allgather_fun(&(win->connected_wins[rank]),
61                      sizeof(MPI_Win),
62                      MPI_BYTE,
63                      win->connected_wins,
64                      sizeof(MPI_Win),
65                      MPI_BYTE,
66                      comm);
67                      
68   mpi_coll_bcast_fun( &(win->bar),
69                      sizeof(xbt_bar_t),
70                      MPI_BYTE,
71                      0,
72                      comm);
73                      
74   mpi_coll_barrier_fun(comm);
75   
76   return win;
77 }
78
79 int smpi_mpi_win_free( MPI_Win* win){
80
81   //As per the standard, perform a barrier to ensure every async comm is finished
82   xbt_barrier_wait((*win)->bar);
83   xbt_dynar_free(&(*win)->requests);
84   xbt_free((*win)->connected_wins);
85   if ((*win)->name != NULL){
86     xbt_free((*win)->name);
87   }
88   if((*win)->info!=MPI_INFO_NULL){
89     MPI_Info_free(&(*win)->info);
90   }
91   xbt_free(*win);
92   win = MPI_WIN_NULL;
93   return MPI_SUCCESS;
94 }
95
96 void smpi_mpi_win_get_name(MPI_Win win, char* name, int* length){
97   if(win->name==NULL){
98     *length=0;
99     name=NULL;
100     return;
101   }
102   *length = strlen(win->name);
103   strcpy(name, win->name);
104 }
105
106 void smpi_mpi_win_get_group(MPI_Win win, MPI_Group* group){
107   if(win->comm != MPI_COMM_NULL){
108     *group = smpi_comm_group(win->comm);
109     smpi_group_use(*group);
110   }
111 }
112
113 void smpi_mpi_win_set_name(MPI_Win win, char* name){
114   win->name = strdup(name);;
115 }
116
117
118 int smpi_mpi_win_fence( int assert,  MPI_Win win){
119
120   XBT_DEBUG("Entering fence");
121   if(!win->opened)
122     win->opened=1;
123   if(assert != MPI_MODE_NOPRECEDE){
124     xbt_barrier_wait(win->bar);
125
126     xbt_dynar_t reqs = win->requests;
127     int size = xbt_dynar_length(reqs);
128     unsigned int cpt=0;
129     MPI_Request req;
130     // start all requests that have been prepared by another process
131     xbt_dynar_foreach(reqs, cpt, req){
132       if (req->flags & PREPARED) smpi_mpi_start(req);
133     }
134
135     MPI_Request* treqs = xbt_dynar_to_array(reqs);
136     smpi_mpi_waitall(size,treqs,MPI_STATUSES_IGNORE);
137     xbt_free(treqs);
138     win->requests=xbt_dynar_new(sizeof(MPI_Request), NULL);
139
140   }
141   win->assert = assert;
142   
143   xbt_barrier_wait(win->bar);
144   XBT_DEBUG("Leaving fence ");
145
146   return MPI_SUCCESS;
147 }
148
149 int smpi_mpi_put( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
150               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Win win)
151 {
152
153   if(!win->opened)//check that post/start has been done
154     return MPI_ERR_WIN;
155   //get receiver pointer
156   MPI_Win recv_win = win->connected_wins[target_rank];
157
158   void* recv_addr = (void*) ( ((char*)recv_win->base) + target_disp * recv_win->disp_unit);
159   smpi_datatype_use(origin_datatype);
160   smpi_datatype_use(target_datatype);
161   XBT_DEBUG("Entering MPI_Put to %d", target_rank);
162
163   if(target_rank != smpi_comm_rank(win->comm)){
164     //prepare send_request
165     MPI_Request sreq = smpi_rma_send_init(origin_addr, origin_count, origin_datatype,
166         smpi_process_index(), smpi_group_index(smpi_comm_group(win->comm),target_rank), RMA_TAG+1, win->comm, MPI_OP_NULL);
167
168     //prepare receiver request
169     MPI_Request rreq = smpi_rma_recv_init(recv_addr, target_count, target_datatype,
170         smpi_process_index(), smpi_group_index(smpi_comm_group(win->comm),target_rank), RMA_TAG+1, recv_win->comm, MPI_OP_NULL);
171
172     //push request to receiver's win
173     xbt_dynar_push_as(recv_win->requests, MPI_Request, rreq);
174
175     //start send
176     smpi_mpi_start(sreq);
177
178     //push request to sender's win
179     xbt_dynar_push_as(win->requests, MPI_Request, sreq);
180   }else{
181     smpi_datatype_copy(origin_addr, origin_count, origin_datatype,
182                        recv_addr, target_count, target_datatype);
183   }
184
185   return MPI_SUCCESS;
186 }
187
188 int smpi_mpi_get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
189               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Win win)
190 {
191   if(!win->opened)//check that post/start has been done
192     return MPI_ERR_WIN;
193   //get sender pointer
194   MPI_Win send_win = win->connected_wins[target_rank];
195
196   void* send_addr = (void*)( ((char*)send_win->base) + target_disp * send_win->disp_unit);
197   smpi_datatype_use(origin_datatype);
198   smpi_datatype_use(target_datatype);
199   XBT_DEBUG("Entering MPI_Get from %d", target_rank);
200
201   if(target_rank != smpi_comm_rank(win->comm)){
202     //prepare send_request
203     MPI_Request sreq = smpi_rma_send_init(send_addr, target_count, target_datatype,
204         smpi_group_index(smpi_comm_group(win->comm),target_rank), smpi_process_index(), RMA_TAG+2, send_win->comm, MPI_OP_NULL);
205
206     //prepare receiver request
207     MPI_Request rreq = smpi_rma_recv_init(origin_addr, origin_count, origin_datatype,
208         smpi_group_index(smpi_comm_group(win->comm),target_rank), smpi_process_index(), RMA_TAG+2, win->comm, MPI_OP_NULL);
209         
210     //start the send, with another process than us as sender. 
211     smpi_mpi_start(sreq);
212     
213     //push request to receiver's win
214     xbt_dynar_push_as(send_win->requests, MPI_Request, sreq);
215
216     //start recv
217     smpi_mpi_start(rreq);
218
219     //push request to sender's win
220     xbt_dynar_push_as(win->requests, MPI_Request, rreq);
221   }else{
222     smpi_datatype_copy(send_addr, target_count, target_datatype,
223                        origin_addr, origin_count, origin_datatype);
224   }
225
226   return MPI_SUCCESS;
227 }
228
229
230 int smpi_mpi_accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
231               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Win win)
232 {
233   if(!win->opened)//check that post/start has been done
234     return MPI_ERR_WIN;
235   //FIXME: local version 
236   //get receiver pointer
237   MPI_Win recv_win = win->connected_wins[target_rank];
238
239   void* recv_addr = (void*)( ((char*)recv_win->base) + target_disp * recv_win->disp_unit);
240   XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
241
242   smpi_datatype_use(origin_datatype);
243   smpi_datatype_use(target_datatype);
244
245
246     //prepare send_request
247     MPI_Request sreq = smpi_rma_send_init(origin_addr, origin_count, origin_datatype,
248         smpi_process_index(), smpi_group_index(smpi_comm_group(win->comm),target_rank), RMA_TAG+3, win->comm, op);
249
250     //prepare receiver request
251     MPI_Request rreq = smpi_rma_recv_init(recv_addr, target_count, target_datatype,
252         smpi_process_index(), smpi_group_index(smpi_comm_group(win->comm),target_rank), RMA_TAG+3, recv_win->comm, op);
253     //push request to receiver's win
254     xbt_dynar_push_as(recv_win->requests, MPI_Request, rreq);
255     //start send
256     smpi_mpi_start(sreq);
257     
258     //push request to sender's win
259     xbt_dynar_push_as(win->requests, MPI_Request, sreq);
260   
261
262
263   return MPI_SUCCESS;
264 }
265
266 int smpi_mpi_win_start(MPI_Group group, int assert, MPI_Win win){
267     /* From MPI forum advices
268     The call to MPI_WIN_COMPLETE does not return until the put call has completed at 
269     the origin; and the target window will be accessed by the put operation only 
270     after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by the target
271      process. This still leaves much choice to implementors. The call to 
272      MPI_WIN_START can block until the matching call to MPI_WIN_POST occurs at all 
273     target processes. One can also have implementations where the call to 
274     MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching 
275     call to MPI_WIN_POST occurred; or implementations where the first two calls are 
276     nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call to 
277     MPI_WIN_POST occurred; or even implementations where all three calls can 
278     complete before any target process called MPI_WIN_POST --- the data put must be 
279     buffered, in this last case, so as to allow the put to complete at the origin 
280     ahead of its completion at the target. However, once the call to MPI_WIN_POST is
281      issued, the sequence above must complete, without further dependencies.
282     */
283   
284   //naive, blocking implementation.
285   int i=0,j=0;
286   int size = smpi_group_size(group);
287   MPI_Request* reqs = xbt_new0(MPI_Request, size);
288   
289 //  for(i=0;i<size;i++){
290   while(j!=size){
291     int src=smpi_group_index(group,j);
292     if(src!=smpi_process_index()){
293       reqs[i]=smpi_irecv_init(NULL, 0, MPI_CHAR, src,RMA_TAG+4, MPI_COMM_WORLD);
294       i++;
295     }
296     j++;
297   }
298   size=i;
299   smpi_mpi_startall(size, reqs);
300   smpi_mpi_waitall(size, reqs, MPI_STATUSES_IGNORE);
301   for(i=0;i<size;i++){
302     smpi_mpi_request_free(&reqs[i]);
303   }
304   xbt_free(reqs);
305   win->opened++; //we're open for business !
306   win->group=group;
307   smpi_group_use(group);
308   return MPI_SUCCESS;
309 }
310
311 int smpi_mpi_win_post(MPI_Group group, int assert, MPI_Win win){
312   //let's make a synchronous send here
313   int i=0,j=0;
314   int size = smpi_group_size(group);
315   MPI_Request* reqs = xbt_new0(MPI_Request, size);
316   
317   while(j!=size){
318     int dst=smpi_group_index(group,j);
319     if(dst!=smpi_process_index()){
320       reqs[i]=smpi_mpi_send_init(NULL, 0, MPI_CHAR, dst,
321         RMA_TAG+4, MPI_COMM_WORLD);
322       i++;
323     }
324     j++;
325   }
326   size=i;
327   
328   smpi_mpi_startall(size, reqs);
329   smpi_mpi_waitall(size, reqs, MPI_STATUSES_IGNORE);
330   for(i=0;i<size;i++){
331     smpi_mpi_request_free(&reqs[i]);
332   }
333   xbt_free(reqs);
334   win->opened++; //we're open for business !
335   win->group=group;
336   smpi_group_use(group);
337   return MPI_SUCCESS;
338 }
339
340 int smpi_mpi_win_complete(MPI_Win win){
341   if(win->opened==0)
342     xbt_die("Complete called on already opened MPI_Win");
343 //  xbt_barrier_wait(win->bar);
344   //MPI_Comm comm = smpi_comm_new(win->group, NULL);
345   //mpi_coll_barrier_fun(comm);
346   //smpi_comm_destroy(comm);
347   
348   XBT_DEBUG("Entering MPI_Win_Complete");
349   int i=0,j=0;
350   int size = smpi_group_size(win->group);
351   MPI_Request* reqs = xbt_new0(MPI_Request, size);
352   
353   while(j!=size){
354     int dst=smpi_group_index(win->group,j);
355     if(dst!=smpi_process_index()){
356       reqs[i]=smpi_mpi_send_init(NULL, 0, MPI_CHAR, dst,
357         RMA_TAG+5, MPI_COMM_WORLD);
358       i++;
359     }
360     j++;
361   }
362   size=i;
363   XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
364   smpi_mpi_startall(size, reqs);
365   smpi_mpi_waitall(size, reqs, MPI_STATUSES_IGNORE);
366   
367   for(i=0;i<size;i++){
368     smpi_mpi_request_free(&reqs[i]);
369   }
370   xbt_free(reqs);
371   
372   //now we can finish RMA calls
373   
374   xbt_dynar_t reqqs = win->requests;
375   size = xbt_dynar_length(reqqs);
376   
377   XBT_DEBUG("Win_complete - Finishing %d RMA calls", size);
378   unsigned int cpt=0;
379   MPI_Request req;
380   // start all requests that have been prepared by another process
381   xbt_dynar_foreach(reqqs, cpt, req){
382     if (req->flags & PREPARED) smpi_mpi_start(req);
383   }
384
385   MPI_Request* treqs = xbt_dynar_to_array(reqqs);
386   smpi_mpi_waitall(size,treqs,MPI_STATUSES_IGNORE);
387   xbt_free(treqs);
388   win->requests=xbt_dynar_new(sizeof(MPI_Request), NULL);
389   win->opened--; //we're closed for business !
390   return MPI_SUCCESS;
391 }
392
393
394
395 int smpi_mpi_win_wait(MPI_Win win){
396 //  xbt_barrier_wait(win->bar);
397   //MPI_Comm comm = smpi_comm_new(win->group, NULL);
398   //mpi_coll_barrier_fun(comm);
399   //smpi_comm_destroy(comm);
400   //naive, blocking implementation.
401   XBT_DEBUG("Entering MPI_Win_Wait");
402   int i=0,j=0;
403   int size = smpi_group_size(win->group);
404   MPI_Request* reqs = xbt_new0(MPI_Request, size);
405   
406 //  for(i=0;i<size;i++){
407   while(j!=size){
408     int src=smpi_group_index(win->group,j);
409     if(src!=smpi_process_index()){
410       reqs[i]=smpi_irecv_init(NULL, 0, MPI_CHAR, src,RMA_TAG+5, MPI_COMM_WORLD);
411       i++;
412     }
413     j++;
414   }
415   size=i;
416   XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
417   smpi_mpi_startall(size, reqs);
418   smpi_mpi_waitall(size, reqs, MPI_STATUSES_IGNORE);
419   for(i=0;i<size;i++){
420     smpi_mpi_request_free(&reqs[i]);
421   }
422   xbt_free(reqs);
423
424   xbt_dynar_t reqqs = win->requests;
425   size = xbt_dynar_length(reqqs);
426   
427   XBT_DEBUG("Win_complete - Finishing %d RMA calls", size);
428
429   unsigned int cpt=0;
430   MPI_Request req;
431   // start all requests that have been prepared by another process
432   xbt_dynar_foreach(reqqs, cpt, req){
433     if (req->flags & PREPARED) smpi_mpi_start(req);
434   }
435
436   MPI_Request* treqs = xbt_dynar_to_array(reqqs);
437   smpi_mpi_waitall(size,treqs,MPI_STATUSES_IGNORE);
438   xbt_free(treqs);
439   win->requests=xbt_dynar_new(sizeof(MPI_Request), NULL);
440   win->opened--; //we're opened for business !
441   return MPI_SUCCESS;
442 }
443
444
445