Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
leak --
[simgrid.git] / src / smpi / smpi_rma.cpp
1
2 /* Copyright (c) 2007-2015. The SimGrid Team.
3  * All rights reserved.                                                     */
4
5 /* This program is free software; you can redistribute it and/or modify it
6  * under the terms of the license (GNU LGPL) which comes with this package. */
7
8 #include "private.h"
9
10 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");
11
12 #define RMA_TAG -1234
13
14 xbt_bar_t creation_bar = NULL;
15
16 typedef struct s_smpi_mpi_win{
17   void* base;
18   MPI_Aint size;
19   int disp_unit;
20   MPI_Comm comm;
21   MPI_Info info;
22   int assert;
23   xbt_dynar_t requests;
24   xbt_bar_t bar;
25   MPI_Win* connected_wins;
26   char* name;
27   int opened;
28   MPI_Group group;
29 } s_smpi_mpi_win_t;
30
31
32 MPI_Win smpi_mpi_win_create( void *base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm){
33   MPI_Win win;
34
35   int comm_size = smpi_comm_size(comm);
36   int rank=smpi_comm_rank(comm);
37   XBT_DEBUG("Creating window");
38
39   win = xbt_new(s_smpi_mpi_win_t, 1);
40   win->base = base;
41   win->size = size;
42   win->disp_unit = disp_unit;
43   win->assert = 0;
44   win->info = info;
45   if(info!=MPI_INFO_NULL)
46     info->refcount++;
47   win->comm = comm;
48   win->name = NULL;
49   win->opened = 0;
50   win->group = MPI_GROUP_NULL;
51   win->requests = xbt_dynar_new(sizeof(MPI_Request), NULL);
52   win->connected_wins = xbt_new0(MPI_Win, comm_size);
53   win->connected_wins[rank] = win;
54
55   if(rank==0){
56     win->bar=xbt_barrier_init(comm_size);
57   }
58
59   mpi_coll_allgather_fun(&(win->connected_wins[rank]), sizeof(MPI_Win), MPI_BYTE, win->connected_wins, sizeof(MPI_Win),
60                          MPI_BYTE, comm);
61
62   mpi_coll_bcast_fun( &(win->bar), sizeof(xbt_bar_t), MPI_BYTE, 0, comm);
63
64   mpi_coll_barrier_fun(comm);
65
66   return win;
67 }
68
69 int smpi_mpi_win_free( MPI_Win* win){
70   //As per the standard, perform a barrier to ensure every async comm is finished
71   xbt_barrier_wait((*win)->bar);
72   xbt_dynar_free(&(*win)->requests);
73   xbt_free((*win)->connected_wins);
74   if ((*win)->name != NULL){
75     xbt_free((*win)->name);
76   }
77   if((*win)->info!=MPI_INFO_NULL){
78     MPI_Info_free(&(*win)->info);
79   }
80   xbt_free(*win);
81   *win = MPI_WIN_NULL;
82   return MPI_SUCCESS;
83 }
84
85 void smpi_mpi_win_get_name(MPI_Win win, char* name, int* length){
86   if(win->name==NULL){
87     *length=0;
88     name=NULL;
89     return;
90   }
91   *length = strlen(win->name);
92   strcpy(name, win->name);
93 }
94
95 void smpi_mpi_win_get_group(MPI_Win win, MPI_Group* group){
96   if(win->comm != MPI_COMM_NULL){
97     *group = smpi_comm_group(win->comm);
98     smpi_group_use(*group);
99   }
100 }
101
102 void smpi_mpi_win_set_name(MPI_Win win, char* name){
103   win->name = xbt_strdup(name);;
104 }
105
106 int smpi_mpi_win_fence( int assert,  MPI_Win win){
107   XBT_DEBUG("Entering fence");
108   if(!win->opened)
109     win->opened=1;
110   if(assert != MPI_MODE_NOPRECEDE){
111     xbt_barrier_wait(win->bar);
112
113     xbt_dynar_t reqs = win->requests;
114     int size = xbt_dynar_length(reqs);
115     unsigned int cpt=0;
116     MPI_Request req;
117     // start all requests that have been prepared by another process
118     xbt_dynar_foreach(reqs, cpt, req){
119       if (req->flags & PREPARED) smpi_mpi_start(req);
120     }
121
122     MPI_Request* treqs = static_cast<MPI_Request*>(xbt_dynar_to_array(reqs));
123     win->requests=xbt_dynar_new(sizeof(MPI_Request), NULL);
124     smpi_mpi_waitall(size,treqs,MPI_STATUSES_IGNORE);
125     xbt_free(treqs);
126
127   }
128   win->assert = assert;
129
130   xbt_barrier_wait(win->bar);
131   XBT_DEBUG("Leaving fence ");
132
133   return MPI_SUCCESS;
134 }
135
136 int smpi_mpi_put( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
137               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Win win)
138 {
139   if(!win->opened)//check that post/start has been done
140     return MPI_ERR_WIN;
141   //get receiver pointer
142   MPI_Win recv_win = win->connected_wins[target_rank];
143
144   void* recv_addr = (void*) ( ((char*)recv_win->base) + target_disp * recv_win->disp_unit);
145   XBT_DEBUG("Entering MPI_Put to %d", target_rank);
146
147   if(target_rank != smpi_comm_rank(win->comm)){
148     //prepare send_request
149     MPI_Request sreq = smpi_rma_send_init(origin_addr, origin_count, origin_datatype, smpi_process_index(),
150         smpi_group_index(smpi_comm_group(win->comm),target_rank), RMA_TAG+1, win->comm, MPI_OP_NULL);
151
152     //prepare receiver request
153     MPI_Request rreq = smpi_rma_recv_init(recv_addr, target_count, target_datatype, smpi_process_index(),
154         smpi_group_index(smpi_comm_group(win->comm),target_rank), RMA_TAG+1, recv_win->comm, MPI_OP_NULL);
155
156     //push request to receiver's win
157     xbt_dynar_push_as(recv_win->requests, MPI_Request, rreq);
158
159     //start send
160     smpi_mpi_start(sreq);
161
162     //push request to sender's win
163     xbt_dynar_push_as(win->requests, MPI_Request, sreq);
164   }else{
165     smpi_datatype_copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
166   }
167
168   return MPI_SUCCESS;
169 }
170
171 int smpi_mpi_get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
172               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Win win)
173 {
174   if(!win->opened)//check that post/start has been done
175     return MPI_ERR_WIN;
176   //get sender pointer
177   MPI_Win send_win = win->connected_wins[target_rank];
178
179   void* send_addr = (void*)( ((char*)send_win->base) + target_disp * send_win->disp_unit);
180   XBT_DEBUG("Entering MPI_Get from %d", target_rank);
181
182   if(target_rank != smpi_comm_rank(win->comm)){
183     //prepare send_request
184     MPI_Request sreq = smpi_rma_send_init(send_addr, target_count, target_datatype,
185         smpi_group_index(smpi_comm_group(win->comm),target_rank), smpi_process_index(), RMA_TAG+2, send_win->comm,
186         MPI_OP_NULL);
187
188     //prepare receiver request
189     MPI_Request rreq = smpi_rma_recv_init(origin_addr, origin_count, origin_datatype,
190         smpi_group_index(smpi_comm_group(win->comm),target_rank), smpi_process_index(), RMA_TAG+2, win->comm,
191         MPI_OP_NULL);
192
193     //start the send, with another process than us as sender. 
194     smpi_mpi_start(sreq);
195
196     //push request to receiver's win
197     xbt_dynar_push_as(send_win->requests, MPI_Request, sreq);
198
199     //start recv
200     smpi_mpi_start(rreq);
201
202     //push request to sender's win
203     xbt_dynar_push_as(win->requests, MPI_Request, rreq);
204   }else{
205     smpi_datatype_copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
206   }
207
208   return MPI_SUCCESS;
209 }
210
211
212 int smpi_mpi_accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
213               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Win win)
214 {
215   if(!win->opened)//check that post/start has been done
216     return MPI_ERR_WIN;
217   //FIXME: local version 
218   //get receiver pointer
219   MPI_Win recv_win = win->connected_wins[target_rank];
220
221   void* recv_addr = (void*)( ((char*)recv_win->base) + target_disp * recv_win->disp_unit);
222   XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
223
224     //prepare send_request
225     MPI_Request sreq = smpi_rma_send_init(origin_addr, origin_count, origin_datatype,
226         smpi_process_index(), smpi_group_index(smpi_comm_group(win->comm),target_rank), RMA_TAG+3, win->comm, op);
227
228     //prepare receiver request
229     MPI_Request rreq = smpi_rma_recv_init(recv_addr, target_count, target_datatype,
230         smpi_process_index(), smpi_group_index(smpi_comm_group(win->comm),target_rank), RMA_TAG+3, recv_win->comm, op);
231     //push request to receiver's win
232     xbt_dynar_push_as(recv_win->requests, MPI_Request, rreq);
233     //start send
234     smpi_mpi_start(sreq);
235
236     //push request to sender's win
237     xbt_dynar_push_as(win->requests, MPI_Request, sreq);
238
239   return MPI_SUCCESS;
240 }
241
242 int smpi_mpi_win_start(MPI_Group group, int assert, MPI_Win win){
243     /* From MPI forum advices
244     The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
245     will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
246     the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
247     matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
248     MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
249     implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
250     to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
251     called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
252     origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
253     must complete, without further dependencies.  */
254
255   //naive, blocking implementation.
256   int i=0,j=0;
257   int size = smpi_group_size(group);
258   MPI_Request* reqs = xbt_new0(MPI_Request, size);
259
260 //  for(i=0;i<size;i++){
261   while(j!=size){
262     int src=smpi_group_index(group,j);
263     if(src!=smpi_process_index()){
264       reqs[i]=smpi_irecv_init(NULL, 0, MPI_CHAR, src,RMA_TAG+4, MPI_COMM_WORLD);
265       i++;
266     }
267     j++;
268   }
269   size=i;
270   smpi_mpi_startall(size, reqs);
271   smpi_mpi_waitall(size, reqs, MPI_STATUSES_IGNORE);
272   for(i=0;i<size;i++){
273     smpi_mpi_request_free(&reqs[i]);
274   }
275   xbt_free(reqs);
276   win->opened++; //we're open for business !
277   win->group=group;
278   smpi_group_use(group);
279   return MPI_SUCCESS;
280 }
281
282 int smpi_mpi_win_post(MPI_Group group, int assert, MPI_Win win){
283   //let's make a synchronous send here
284   int i=0,j=0;
285   int size = smpi_group_size(group);
286   MPI_Request* reqs = xbt_new0(MPI_Request, size);
287
288   while(j!=size){
289     int dst=smpi_group_index(group,j);
290     if(dst!=smpi_process_index()){
291       reqs[i]=smpi_mpi_send_init(NULL, 0, MPI_CHAR, dst, RMA_TAG+4, MPI_COMM_WORLD);
292       i++;
293     }
294     j++;
295   }
296   size=i;
297
298   smpi_mpi_startall(size, reqs);
299   smpi_mpi_waitall(size, reqs, MPI_STATUSES_IGNORE);
300   for(i=0;i<size;i++){
301     smpi_mpi_request_free(&reqs[i]);
302   }
303   xbt_free(reqs);
304   win->opened++; //we're open for business !
305   win->group=group;
306   smpi_group_use(group);
307   return MPI_SUCCESS;
308 }
309
310 int smpi_mpi_win_complete(MPI_Win win){
311   if(win->opened==0)
312     xbt_die("Complete called on already opened MPI_Win");
313 //  xbt_barrier_wait(win->bar);
314   //MPI_Comm comm = smpi_comm_new(win->group, NULL);
315   //mpi_coll_barrier_fun(comm);
316   //smpi_comm_destroy(comm);
317
318   XBT_DEBUG("Entering MPI_Win_Complete");
319   int i=0,j=0;
320   int size = smpi_group_size(win->group);
321   MPI_Request* reqs = xbt_new0(MPI_Request, size);
322
323   while(j!=size){
324     int dst=smpi_group_index(win->group,j);
325     if(dst!=smpi_process_index()){
326       reqs[i]=smpi_mpi_send_init(NULL, 0, MPI_CHAR, dst, RMA_TAG+5, MPI_COMM_WORLD);
327       i++;
328     }
329     j++;
330   }
331   size=i;
332   XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
333   smpi_mpi_startall(size, reqs);
334   smpi_mpi_waitall(size, reqs, MPI_STATUSES_IGNORE);
335
336   for(i=0;i<size;i++){
337     smpi_mpi_request_free(&reqs[i]);
338   }
339   xbt_free(reqs);
340
341   //now we can finish RMA calls
342
343   xbt_dynar_t reqqs = win->requests;
344   size = xbt_dynar_length(reqqs);
345
346   XBT_DEBUG("Win_complete - Finishing %d RMA calls", size);
347   unsigned int cpt=0;
348   MPI_Request req;
349   // start all requests that have been prepared by another process
350   xbt_dynar_foreach(reqqs, cpt, req){
351     if (req->flags & PREPARED) smpi_mpi_start(req);
352   }
353
354   MPI_Request* treqs = static_cast<MPI_Request*>(xbt_dynar_to_array(reqqs));
355   win->requests=xbt_dynar_new(sizeof(MPI_Request), NULL);
356   smpi_mpi_waitall(size,treqs,MPI_STATUSES_IGNORE);
357   xbt_free(treqs);
358   smpi_group_unuse(win->group);
359   win->opened--; //we're closed for business !
360   return MPI_SUCCESS;
361 }
362
363 int smpi_mpi_win_wait(MPI_Win win){
364 //  xbt_barrier_wait(win->bar);
365   //MPI_Comm comm = smpi_comm_new(win->group, NULL);
366   //mpi_coll_barrier_fun(comm);
367   //smpi_comm_destroy(comm);
368   //naive, blocking implementation.
369   XBT_DEBUG("Entering MPI_Win_Wait");
370   int i=0,j=0;
371   int size = smpi_group_size(win->group);
372   MPI_Request* reqs = xbt_new0(MPI_Request, size);
373
374 //  for(i=0;i<size;i++){
375   while(j!=size){
376     int src=smpi_group_index(win->group,j);
377     if(src!=smpi_process_index()){
378       reqs[i]=smpi_irecv_init(NULL, 0, MPI_CHAR, src,RMA_TAG+5, MPI_COMM_WORLD);
379       i++;
380     }
381     j++;
382   }
383   size=i;
384   XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
385   smpi_mpi_startall(size, reqs);
386   smpi_mpi_waitall(size, reqs, MPI_STATUSES_IGNORE);
387   for(i=0;i<size;i++){
388     smpi_mpi_request_free(&reqs[i]);
389   }
390   xbt_free(reqs);
391
392   xbt_dynar_t reqqs = win->requests;
393   size = xbt_dynar_length(reqqs);
394
395   XBT_DEBUG("Win_complete - Finishing %d RMA calls", size);
396
397   unsigned int cpt=0;
398   MPI_Request req;
399   // start all requests that have been prepared by another process
400   xbt_dynar_foreach(reqqs, cpt, req){
401     if (req->flags & PREPARED) smpi_mpi_start(req);
402   }
403
404   MPI_Request* treqs = static_cast<MPI_Request*>(xbt_dynar_to_array(reqqs));
405   win->requests=xbt_dynar_new(sizeof(MPI_Request), NULL);
406   smpi_mpi_waitall(size,treqs,MPI_STATUSES_IGNORE);
407   xbt_free(treqs);
408   smpi_group_unuse(win->group);
409   win->opened--; //we're opened for business !
410   return MPI_SUCCESS;
411 }