Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
leak--
[simgrid.git] / src / smpi / smpi_rma.cpp
1
2 /* Copyright (c) 2007-2015. The SimGrid Team.
3  * All rights reserved.                                                     */
4
5 /* This program is free software; you can redistribute it and/or modify it
6  * under the terms of the license (GNU LGPL) which comes with this package. */
7
8 #include "private.h"
9
10 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");
11
12 #define RMA_TAG -1234
13
14 xbt_bar_t creation_bar = NULL;
15
16 typedef struct s_smpi_mpi_win{
17   void* base;
18   MPI_Aint size;
19   int disp_unit;
20   MPI_Comm comm;
21   MPI_Info info;
22   int assert;
23   xbt_dynar_t requests;
24   xbt_bar_t bar;
25   MPI_Win* connected_wins;
26   char* name;
27   int opened;
28   MPI_Group group;
29 } s_smpi_mpi_win_t;
30
31
32 MPI_Win smpi_mpi_win_create( void *base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm){
33   MPI_Win win;
34
35   int comm_size = smpi_comm_size(comm);
36   int rank=smpi_comm_rank(comm);
37   XBT_DEBUG("Creating window");
38
39   win = xbt_new(s_smpi_mpi_win_t, 1);
40   win->base = base;
41   win->size = size;
42   win->disp_unit = disp_unit;
43   win->assert = 0;
44   win->info = info;
45   if(info!=MPI_INFO_NULL)
46     info->refcount++;
47   win->comm = comm;
48   win->name = NULL;
49   win->opened = 0;
50   win->group = MPI_GROUP_NULL;
51   win->requests = xbt_dynar_new(sizeof(MPI_Request), NULL);
52   win->connected_wins = xbt_new0(MPI_Win, comm_size);
53   win->connected_wins[rank] = win;
54
55   if(rank==0){
56     win->bar=xbt_barrier_init(comm_size);
57   }
58
59   mpi_coll_allgather_fun(&(win->connected_wins[rank]), sizeof(MPI_Win), MPI_BYTE, win->connected_wins, sizeof(MPI_Win),
60                          MPI_BYTE, comm);
61
62   mpi_coll_bcast_fun( &(win->bar), sizeof(xbt_bar_t), MPI_BYTE, 0, comm);
63
64   mpi_coll_barrier_fun(comm);
65
66   return win;
67 }
68
69 int smpi_mpi_win_free( MPI_Win* win){
70   //As per the standard, perform a barrier to ensure every async comm is finished
71   xbt_barrier_wait((*win)->bar);
72   xbt_dynar_free(&(*win)->requests);
73   xbt_free((*win)->connected_wins);
74   if ((*win)->name != NULL){
75     xbt_free((*win)->name);
76   }
77   if((*win)->info!=MPI_INFO_NULL){
78     MPI_Info_free(&(*win)->info);
79   }
80
81   mpi_coll_barrier_fun((*win)->comm);
82   int rank=smpi_comm_rank((*win)->comm);
83   if(rank == 0)
84     xbt_barrier_destroy((*win)->bar);
85   xbt_free(*win);
86   *win = MPI_WIN_NULL;
87   return MPI_SUCCESS;
88 }
89
90 void smpi_mpi_win_get_name(MPI_Win win, char* name, int* length){
91   if(win->name==NULL){
92     *length=0;
93     name=NULL;
94     return;
95   }
96   *length = strlen(win->name);
97   strcpy(name, win->name);
98 }
99
100 void smpi_mpi_win_get_group(MPI_Win win, MPI_Group* group){
101   if(win->comm != MPI_COMM_NULL){
102     *group = smpi_comm_group(win->comm);
103     smpi_group_use(*group);
104   }
105 }
106
107 void smpi_mpi_win_set_name(MPI_Win win, char* name){
108   win->name = xbt_strdup(name);;
109 }
110
111 int smpi_mpi_win_fence( int assert,  MPI_Win win){
112   XBT_DEBUG("Entering fence");
113   if(!win->opened)
114     win->opened=1;
115   if(assert != MPI_MODE_NOPRECEDE){
116     xbt_barrier_wait(win->bar);
117
118     xbt_dynar_t reqs = win->requests;
119     int size = xbt_dynar_length(reqs);
120     unsigned int cpt=0;
121     MPI_Request req;
122     // start all requests that have been prepared by another process
123     xbt_dynar_foreach(reqs, cpt, req){
124       if (req->flags & PREPARED) smpi_mpi_start(req);
125     }
126
127     MPI_Request* treqs = static_cast<MPI_Request*>(xbt_dynar_to_array(reqs));
128     win->requests=xbt_dynar_new(sizeof(MPI_Request), NULL);
129     smpi_mpi_waitall(size,treqs,MPI_STATUSES_IGNORE);
130     xbt_free(treqs);
131
132   }
133   win->assert = assert;
134
135   xbt_barrier_wait(win->bar);
136   XBT_DEBUG("Leaving fence ");
137
138   return MPI_SUCCESS;
139 }
140
141 int smpi_mpi_put( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
142               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Win win)
143 {
144   if(!win->opened)//check that post/start has been done
145     return MPI_ERR_WIN;
146   //get receiver pointer
147   MPI_Win recv_win = win->connected_wins[target_rank];
148
149   void* recv_addr = (void*) ( ((char*)recv_win->base) + target_disp * recv_win->disp_unit);
150   XBT_DEBUG("Entering MPI_Put to %d", target_rank);
151
152   if(target_rank != smpi_comm_rank(win->comm)){
153     //prepare send_request
154     MPI_Request sreq = smpi_rma_send_init(origin_addr, origin_count, origin_datatype, smpi_process_index(),
155         smpi_group_index(smpi_comm_group(win->comm),target_rank), RMA_TAG+1, win->comm, MPI_OP_NULL);
156
157     //prepare receiver request
158     MPI_Request rreq = smpi_rma_recv_init(recv_addr, target_count, target_datatype, smpi_process_index(),
159         smpi_group_index(smpi_comm_group(win->comm),target_rank), RMA_TAG+1, recv_win->comm, MPI_OP_NULL);
160
161     //push request to receiver's win
162     xbt_dynar_push_as(recv_win->requests, MPI_Request, rreq);
163
164     //start send
165     smpi_mpi_start(sreq);
166
167     //push request to sender's win
168     xbt_dynar_push_as(win->requests, MPI_Request, sreq);
169   }else{
170     smpi_datatype_copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
171   }
172
173   return MPI_SUCCESS;
174 }
175
176 int smpi_mpi_get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
177               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Win win)
178 {
179   if(!win->opened)//check that post/start has been done
180     return MPI_ERR_WIN;
181   //get sender pointer
182   MPI_Win send_win = win->connected_wins[target_rank];
183
184   void* send_addr = (void*)( ((char*)send_win->base) + target_disp * send_win->disp_unit);
185   XBT_DEBUG("Entering MPI_Get from %d", target_rank);
186
187   if(target_rank != smpi_comm_rank(win->comm)){
188     //prepare send_request
189     MPI_Request sreq = smpi_rma_send_init(send_addr, target_count, target_datatype,
190         smpi_group_index(smpi_comm_group(win->comm),target_rank), smpi_process_index(), RMA_TAG+2, send_win->comm,
191         MPI_OP_NULL);
192
193     //prepare receiver request
194     MPI_Request rreq = smpi_rma_recv_init(origin_addr, origin_count, origin_datatype,
195         smpi_group_index(smpi_comm_group(win->comm),target_rank), smpi_process_index(), RMA_TAG+2, win->comm,
196         MPI_OP_NULL);
197
198     //start the send, with another process than us as sender. 
199     smpi_mpi_start(sreq);
200
201     //push request to receiver's win
202     xbt_dynar_push_as(send_win->requests, MPI_Request, sreq);
203
204     //start recv
205     smpi_mpi_start(rreq);
206
207     //push request to sender's win
208     xbt_dynar_push_as(win->requests, MPI_Request, rreq);
209   }else{
210     smpi_datatype_copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
211   }
212
213   return MPI_SUCCESS;
214 }
215
216
217 int smpi_mpi_accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
218               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Win win)
219 {
220   if(!win->opened)//check that post/start has been done
221     return MPI_ERR_WIN;
222   //FIXME: local version 
223   //get receiver pointer
224   MPI_Win recv_win = win->connected_wins[target_rank];
225
226   void* recv_addr = (void*)( ((char*)recv_win->base) + target_disp * recv_win->disp_unit);
227   XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
228
229     //prepare send_request
230     MPI_Request sreq = smpi_rma_send_init(origin_addr, origin_count, origin_datatype,
231         smpi_process_index(), smpi_group_index(smpi_comm_group(win->comm),target_rank), RMA_TAG+3, win->comm, op);
232
233     //prepare receiver request
234     MPI_Request rreq = smpi_rma_recv_init(recv_addr, target_count, target_datatype,
235         smpi_process_index(), smpi_group_index(smpi_comm_group(win->comm),target_rank), RMA_TAG+3, recv_win->comm, op);
236     //push request to receiver's win
237     xbt_dynar_push_as(recv_win->requests, MPI_Request, rreq);
238     //start send
239     smpi_mpi_start(sreq);
240
241     //push request to sender's win
242     xbt_dynar_push_as(win->requests, MPI_Request, sreq);
243
244   return MPI_SUCCESS;
245 }
246
247 int smpi_mpi_win_start(MPI_Group group, int assert, MPI_Win win){
248     /* From MPI forum advices
249     The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
250     will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
251     the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
252     matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
253     MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
254     implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
255     to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
256     called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
257     origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
258     must complete, without further dependencies.  */
259
260   //naive, blocking implementation.
261   int i=0,j=0;
262   int size = smpi_group_size(group);
263   MPI_Request* reqs = xbt_new0(MPI_Request, size);
264
265 //  for(i=0;i<size;i++){
266   while(j!=size){
267     int src=smpi_group_index(group,j);
268     if(src!=smpi_process_index()){
269       reqs[i]=smpi_irecv_init(NULL, 0, MPI_CHAR, src,RMA_TAG+4, MPI_COMM_WORLD);
270       i++;
271     }
272     j++;
273   }
274   size=i;
275   smpi_mpi_startall(size, reqs);
276   smpi_mpi_waitall(size, reqs, MPI_STATUSES_IGNORE);
277   for(i=0;i<size;i++){
278     smpi_mpi_request_free(&reqs[i]);
279   }
280   xbt_free(reqs);
281   win->opened++; //we're open for business !
282   win->group=group;
283   smpi_group_use(group);
284   return MPI_SUCCESS;
285 }
286
287 int smpi_mpi_win_post(MPI_Group group, int assert, MPI_Win win){
288   //let's make a synchronous send here
289   int i=0,j=0;
290   int size = smpi_group_size(group);
291   MPI_Request* reqs = xbt_new0(MPI_Request, size);
292
293   while(j!=size){
294     int dst=smpi_group_index(group,j);
295     if(dst!=smpi_process_index()){
296       reqs[i]=smpi_mpi_send_init(NULL, 0, MPI_CHAR, dst, RMA_TAG+4, MPI_COMM_WORLD);
297       i++;
298     }
299     j++;
300   }
301   size=i;
302
303   smpi_mpi_startall(size, reqs);
304   smpi_mpi_waitall(size, reqs, MPI_STATUSES_IGNORE);
305   for(i=0;i<size;i++){
306     smpi_mpi_request_free(&reqs[i]);
307   }
308   xbt_free(reqs);
309   win->opened++; //we're open for business !
310   win->group=group;
311   smpi_group_use(group);
312   return MPI_SUCCESS;
313 }
314
315 int smpi_mpi_win_complete(MPI_Win win){
316   if(win->opened==0)
317     xbt_die("Complete called on already opened MPI_Win");
318 //  xbt_barrier_wait(win->bar);
319   //MPI_Comm comm = smpi_comm_new(win->group, NULL);
320   //mpi_coll_barrier_fun(comm);
321   //smpi_comm_destroy(comm);
322
323   XBT_DEBUG("Entering MPI_Win_Complete");
324   int i=0,j=0;
325   int size = smpi_group_size(win->group);
326   MPI_Request* reqs = xbt_new0(MPI_Request, size);
327
328   while(j!=size){
329     int dst=smpi_group_index(win->group,j);
330     if(dst!=smpi_process_index()){
331       reqs[i]=smpi_mpi_send_init(NULL, 0, MPI_CHAR, dst, RMA_TAG+5, MPI_COMM_WORLD);
332       i++;
333     }
334     j++;
335   }
336   size=i;
337   XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
338   smpi_mpi_startall(size, reqs);
339   smpi_mpi_waitall(size, reqs, MPI_STATUSES_IGNORE);
340
341   for(i=0;i<size;i++){
342     smpi_mpi_request_free(&reqs[i]);
343   }
344   xbt_free(reqs);
345
346   //now we can finish RMA calls
347
348   xbt_dynar_t reqqs = win->requests;
349   size = xbt_dynar_length(reqqs);
350
351   XBT_DEBUG("Win_complete - Finishing %d RMA calls", size);
352   unsigned int cpt=0;
353   MPI_Request req;
354   // start all requests that have been prepared by another process
355   xbt_dynar_foreach(reqqs, cpt, req){
356     if (req->flags & PREPARED) smpi_mpi_start(req);
357   }
358
359   MPI_Request* treqs = static_cast<MPI_Request*>(xbt_dynar_to_array(reqqs));
360   win->requests=xbt_dynar_new(sizeof(MPI_Request), NULL);
361   smpi_mpi_waitall(size,treqs,MPI_STATUSES_IGNORE);
362   xbt_free(treqs);
363   smpi_group_unuse(win->group);
364   win->opened--; //we're closed for business !
365   return MPI_SUCCESS;
366 }
367
368 int smpi_mpi_win_wait(MPI_Win win){
369 //  xbt_barrier_wait(win->bar);
370   //MPI_Comm comm = smpi_comm_new(win->group, NULL);
371   //mpi_coll_barrier_fun(comm);
372   //smpi_comm_destroy(comm);
373   //naive, blocking implementation.
374   XBT_DEBUG("Entering MPI_Win_Wait");
375   int i=0,j=0;
376   int size = smpi_group_size(win->group);
377   MPI_Request* reqs = xbt_new0(MPI_Request, size);
378
379 //  for(i=0;i<size;i++){
380   while(j!=size){
381     int src=smpi_group_index(win->group,j);
382     if(src!=smpi_process_index()){
383       reqs[i]=smpi_irecv_init(NULL, 0, MPI_CHAR, src,RMA_TAG+5, MPI_COMM_WORLD);
384       i++;
385     }
386     j++;
387   }
388   size=i;
389   XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
390   smpi_mpi_startall(size, reqs);
391   smpi_mpi_waitall(size, reqs, MPI_STATUSES_IGNORE);
392   for(i=0;i<size;i++){
393     smpi_mpi_request_free(&reqs[i]);
394   }
395   xbt_free(reqs);
396
397   xbt_dynar_t reqqs = win->requests;
398   size = xbt_dynar_length(reqqs);
399
400   XBT_DEBUG("Win_complete - Finishing %d RMA calls", size);
401
402   unsigned int cpt=0;
403   MPI_Request req;
404   // start all requests that have been prepared by another process
405   xbt_dynar_foreach(reqqs, cpt, req){
406     if (req->flags & PREPARED) smpi_mpi_start(req);
407   }
408
409   MPI_Request* treqs = static_cast<MPI_Request*>(xbt_dynar_to_array(reqqs));
410   win->requests=xbt_dynar_new(sizeof(MPI_Request), NULL);
411   smpi_mpi_waitall(size,treqs,MPI_STATUSES_IGNORE);
412   xbt_free(treqs);
413   smpi_group_unuse(win->group);
414   win->opened--; //we're opened for business !
415   return MPI_SUCCESS;
416 }