Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
87c8a8a2ce6934b39dcd4854c4e501c54aaae712
[simgrid.git] / src / smpi / smpi_rma.cpp
1
2 /* Copyright (c) 2007-2015. The SimGrid Team.
3  * All rights reserved.                                                     */
4
5 /* This program is free software; you can redistribute it and/or modify it
6  * under the terms of the license (GNU LGPL) which comes with this package. */
7
8 #include "private.h"
9
10 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");
11
12 #define RMA_TAG -1234
13
14 xbt_bar_t creation_bar = NULL;
15
16 typedef struct s_smpi_mpi_win{
17   void* base;
18   MPI_Aint size;
19   int disp_unit;
20   MPI_Comm comm;
21   MPI_Info info;
22   int assert;
23   xbt_dynar_t requests;
24   xbt_bar_t bar;
25   MPI_Win* connected_wins;
26   char* name;
27   int opened;
28   MPI_Group group;
29 } s_smpi_mpi_win_t;
30
31
32 MPI_Win smpi_mpi_win_create( void *base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm){
33   MPI_Win win;
34
35   int comm_size = smpi_comm_size(comm);
36   int rank=smpi_comm_rank(comm);
37   XBT_DEBUG("Creating window");
38
39   win = xbt_new(s_smpi_mpi_win_t, 1);
40   win->base = base;
41   win->size = size;
42   win->disp_unit = disp_unit;
43   win->assert = 0;
44   win->info = info;
45   if(info!=MPI_INFO_NULL)
46     info->refcount++;
47   win->comm = comm;
48   win->name = NULL;
49   win->opened = 0;
50   win->group = MPI_GROUP_NULL;
51   win->requests = xbt_dynar_new(sizeof(MPI_Request), NULL);
52   win->connected_wins = xbt_new0(MPI_Win, comm_size);
53   win->connected_wins[rank] = win;
54
55   if(rank==0){
56     win->bar=xbt_barrier_init(comm_size);
57   }
58   mpi_coll_allgather_fun(&(win->connected_wins[rank]), sizeof(MPI_Win), MPI_BYTE, win->connected_wins, sizeof(MPI_Win),
59                          MPI_BYTE, comm);
60
61   mpi_coll_bcast_fun( &(win->bar), sizeof(xbt_bar_t), MPI_BYTE, 0, comm);
62
63   mpi_coll_barrier_fun(comm);
64
65   return win;
66 }
67
68 int smpi_mpi_win_free( MPI_Win* win){
69   //As per the standard, perform a barrier to ensure every async comm is finished
70   xbt_barrier_wait((*win)->bar);
71   xbt_dynar_free(&(*win)->requests);
72   xbt_free((*win)->connected_wins);
73   if ((*win)->name != NULL){
74     xbt_free((*win)->name);
75   }
76   if((*win)->info!=MPI_INFO_NULL){
77     MPI_Info_free(&(*win)->info);
78   }
79
80   mpi_coll_barrier_fun((*win)->comm);
81   int rank=smpi_comm_rank((*win)->comm);
82   if(rank == 0)
83     xbt_barrier_destroy((*win)->bar);
84   xbt_free(*win);
85   *win = MPI_WIN_NULL;
86   return MPI_SUCCESS;
87 }
88
89 void smpi_mpi_win_get_name(MPI_Win win, char* name, int* length){
90   if(win->name==NULL){
91     *length=0;
92     name=NULL;
93     return;
94   }
95   *length = strlen(win->name);
96   strcpy(name, win->name);
97 }
98
99 void smpi_mpi_win_get_group(MPI_Win win, MPI_Group* group){
100   if(win->comm != MPI_COMM_NULL){
101     *group = smpi_comm_group(win->comm);
102   }
103 }
104
105 void smpi_mpi_win_set_name(MPI_Win win, char* name){
106   win->name = xbt_strdup(name);;
107 }
108
109 int smpi_mpi_win_fence( int assert,  MPI_Win win){
110   XBT_DEBUG("Entering fence");
111   if(!win->opened)
112     win->opened=1;
113   if(assert != MPI_MODE_NOPRECEDE){
114     xbt_barrier_wait(win->bar);
115
116     xbt_dynar_t reqs = win->requests;
117     int size = xbt_dynar_length(reqs);
118     unsigned int cpt=0;
119     MPI_Request req;
120     // start all requests that have been prepared by another process
121     xbt_dynar_foreach(reqs, cpt, req){
122       if (req->flags & PREPARED) smpi_mpi_start(req);
123     }
124
125     MPI_Request* treqs = static_cast<MPI_Request*>(xbt_dynar_to_array(reqs));
126     win->requests=xbt_dynar_new(sizeof(MPI_Request), NULL);
127     smpi_mpi_waitall(size,treqs,MPI_STATUSES_IGNORE);
128     xbt_free(treqs);
129
130   }
131   win->assert = assert;
132
133   xbt_barrier_wait(win->bar);
134   XBT_DEBUG("Leaving fence ");
135
136   return MPI_SUCCESS;
137 }
138
139 int smpi_mpi_put( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
140               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Win win)
141 {
142   if(!win->opened)//check that post/start has been done
143     return MPI_ERR_WIN;
144   //get receiver pointer
145   MPI_Win recv_win = win->connected_wins[target_rank];
146
147   void* recv_addr = (void*) ( ((char*)recv_win->base) + target_disp * recv_win->disp_unit);
148   XBT_DEBUG("Entering MPI_Put to %d", target_rank);
149
150   if(target_rank != smpi_comm_rank(win->comm)){
151     //prepare send_request
152     MPI_Request sreq = smpi_rma_send_init(origin_addr, origin_count, origin_datatype, smpi_process_index(),
153         smpi_group_index(smpi_comm_group(win->comm),target_rank), RMA_TAG+1, win->comm, MPI_OP_NULL);
154
155     //prepare receiver request
156     MPI_Request rreq = smpi_rma_recv_init(recv_addr, target_count, target_datatype, smpi_process_index(),
157         smpi_group_index(smpi_comm_group(win->comm),target_rank), RMA_TAG+1, recv_win->comm, MPI_OP_NULL);
158
159     //push request to receiver's win
160     xbt_dynar_push_as(recv_win->requests, MPI_Request, rreq);
161
162     //start send
163     smpi_mpi_start(sreq);
164
165     //push request to sender's win
166     xbt_dynar_push_as(win->requests, MPI_Request, sreq);
167   }else{
168     smpi_datatype_copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
169   }
170
171   return MPI_SUCCESS;
172 }
173
174 int smpi_mpi_get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
175               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Win win)
176 {
177   if(!win->opened)//check that post/start has been done
178     return MPI_ERR_WIN;
179   //get sender pointer
180   MPI_Win send_win = win->connected_wins[target_rank];
181
182   void* send_addr = (void*)( ((char*)send_win->base) + target_disp * send_win->disp_unit);
183   XBT_DEBUG("Entering MPI_Get from %d", target_rank);
184
185   if(target_rank != smpi_comm_rank(win->comm)){
186     //prepare send_request
187     MPI_Request sreq = smpi_rma_send_init(send_addr, target_count, target_datatype,
188         smpi_group_index(smpi_comm_group(win->comm),target_rank), smpi_process_index(), RMA_TAG+2, send_win->comm,
189         MPI_OP_NULL);
190
191     //prepare receiver request
192     MPI_Request rreq = smpi_rma_recv_init(origin_addr, origin_count, origin_datatype,
193         smpi_group_index(smpi_comm_group(win->comm),target_rank), smpi_process_index(), RMA_TAG+2, win->comm,
194         MPI_OP_NULL);
195
196     //start the send, with another process than us as sender. 
197     smpi_mpi_start(sreq);
198
199     //push request to receiver's win
200     xbt_dynar_push_as(send_win->requests, MPI_Request, sreq);
201
202     //start recv
203     smpi_mpi_start(rreq);
204
205     //push request to sender's win
206     xbt_dynar_push_as(win->requests, MPI_Request, rreq);
207   }else{
208     smpi_datatype_copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
209   }
210
211   return MPI_SUCCESS;
212 }
213
214
215 int smpi_mpi_accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
216               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Win win)
217 {
218   if(!win->opened)//check that post/start has been done
219     return MPI_ERR_WIN;
220   //FIXME: local version 
221   //get receiver pointer
222   MPI_Win recv_win = win->connected_wins[target_rank];
223
224   void* recv_addr = (void*)( ((char*)recv_win->base) + target_disp * recv_win->disp_unit);
225   XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
226
227     //prepare send_request
228     MPI_Request sreq = smpi_rma_send_init(origin_addr, origin_count, origin_datatype,
229         smpi_process_index(), smpi_group_index(smpi_comm_group(win->comm),target_rank), RMA_TAG+3, win->comm, op);
230
231     //prepare receiver request
232     MPI_Request rreq = smpi_rma_recv_init(recv_addr, target_count, target_datatype,
233         smpi_process_index(), smpi_group_index(smpi_comm_group(win->comm),target_rank), RMA_TAG+3, recv_win->comm, op);
234     //push request to receiver's win
235     xbt_dynar_push_as(recv_win->requests, MPI_Request, rreq);
236     //start send
237     smpi_mpi_start(sreq);
238
239     //push request to sender's win
240     xbt_dynar_push_as(win->requests, MPI_Request, sreq);
241
242   return MPI_SUCCESS;
243 }
244
245 int smpi_mpi_win_start(MPI_Group group, int assert, MPI_Win win){
246     /* From MPI forum advices
247     The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
248     will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
249     the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
250     matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
251     MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
252     implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
253     to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
254     called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
255     origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
256     must complete, without further dependencies.  */
257
258   //naive, blocking implementation.
259   int i=0,j=0;
260   int size = smpi_group_size(group);
261   MPI_Request* reqs = xbt_new0(MPI_Request, size);
262
263 //  for(i=0;i<size;i++){
264   while(j!=size){
265     int src=smpi_group_index(group,j);
266     if(src!=smpi_process_index()){
267       reqs[i]=smpi_irecv_init(NULL, 0, MPI_CHAR, src,RMA_TAG+4, MPI_COMM_WORLD);
268       i++;
269     }
270     j++;
271   }
272   size=i;
273   smpi_mpi_startall(size, reqs);
274   smpi_mpi_waitall(size, reqs, MPI_STATUSES_IGNORE);
275   for(i=0;i<size;i++){
276     smpi_mpi_request_free(&reqs[i]);
277   }
278   xbt_free(reqs);
279   win->opened++; //we're open for business !
280   win->group=group;
281   smpi_group_use(group);
282   return MPI_SUCCESS;
283 }
284
285 int smpi_mpi_win_post(MPI_Group group, int assert, MPI_Win win){
286   //let's make a synchronous send here
287   int i=0,j=0;
288   int size = smpi_group_size(group);
289   MPI_Request* reqs = xbt_new0(MPI_Request, size);
290
291   while(j!=size){
292     int dst=smpi_group_index(group,j);
293     if(dst!=smpi_process_index()){
294       reqs[i]=smpi_mpi_send_init(NULL, 0, MPI_CHAR, dst, RMA_TAG+4, MPI_COMM_WORLD);
295       i++;
296     }
297     j++;
298   }
299   size=i;
300
301   smpi_mpi_startall(size, reqs);
302   smpi_mpi_waitall(size, reqs, MPI_STATUSES_IGNORE);
303   for(i=0;i<size;i++){
304     smpi_mpi_request_free(&reqs[i]);
305   }
306   xbt_free(reqs);
307   win->opened++; //we're open for business !
308   win->group=group;
309   smpi_group_use(group);
310   return MPI_SUCCESS;
311 }
312
313 int smpi_mpi_win_complete(MPI_Win win){
314   if(win->opened==0)
315     xbt_die("Complete called on already opened MPI_Win");
316 //  xbt_barrier_wait(win->bar);
317   //MPI_Comm comm = smpi_comm_new(win->group, NULL);
318   //mpi_coll_barrier_fun(comm);
319   //smpi_comm_destroy(comm);
320
321   XBT_DEBUG("Entering MPI_Win_Complete");
322   int i=0,j=0;
323   int size = smpi_group_size(win->group);
324   MPI_Request* reqs = xbt_new0(MPI_Request, size);
325
326   while(j!=size){
327     int dst=smpi_group_index(win->group,j);
328     if(dst!=smpi_process_index()){
329       reqs[i]=smpi_mpi_send_init(NULL, 0, MPI_CHAR, dst, RMA_TAG+5, MPI_COMM_WORLD);
330       i++;
331     }
332     j++;
333   }
334   size=i;
335   XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
336   smpi_mpi_startall(size, reqs);
337   smpi_mpi_waitall(size, reqs, MPI_STATUSES_IGNORE);
338
339   for(i=0;i<size;i++){
340     smpi_mpi_request_free(&reqs[i]);
341   }
342   xbt_free(reqs);
343
344   //now we can finish RMA calls
345
346   xbt_dynar_t reqqs = win->requests;
347   size = xbt_dynar_length(reqqs);
348
349   XBT_DEBUG("Win_complete - Finishing %d RMA calls", size);
350   unsigned int cpt=0;
351   MPI_Request req;
352   // start all requests that have been prepared by another process
353   xbt_dynar_foreach(reqqs, cpt, req){
354     if (req->flags & PREPARED) smpi_mpi_start(req);
355   }
356
357   MPI_Request* treqs = static_cast<MPI_Request*>(xbt_dynar_to_array(reqqs));
358   win->requests=xbt_dynar_new(sizeof(MPI_Request), NULL);
359   smpi_mpi_waitall(size,treqs,MPI_STATUSES_IGNORE);
360   xbt_free(treqs);
361   smpi_group_unuse(win->group);
362   win->opened--; //we're closed for business !
363   return MPI_SUCCESS;
364 }
365
366 int smpi_mpi_win_wait(MPI_Win win){
367 //  xbt_barrier_wait(win->bar);
368   //MPI_Comm comm = smpi_comm_new(win->group, NULL);
369   //mpi_coll_barrier_fun(comm);
370   //smpi_comm_destroy(comm);
371   //naive, blocking implementation.
372   XBT_DEBUG("Entering MPI_Win_Wait");
373   int i=0,j=0;
374   int size = smpi_group_size(win->group);
375   MPI_Request* reqs = xbt_new0(MPI_Request, size);
376
377 //  for(i=0;i<size;i++){
378   while(j!=size){
379     int src=smpi_group_index(win->group,j);
380     if(src!=smpi_process_index()){
381       reqs[i]=smpi_irecv_init(NULL, 0, MPI_CHAR, src,RMA_TAG+5, MPI_COMM_WORLD);
382       i++;
383     }
384     j++;
385   }
386   size=i;
387   XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
388   smpi_mpi_startall(size, reqs);
389   smpi_mpi_waitall(size, reqs, MPI_STATUSES_IGNORE);
390   for(i=0;i<size;i++){
391     smpi_mpi_request_free(&reqs[i]);
392   }
393   xbt_free(reqs);
394
395   xbt_dynar_t reqqs = win->requests;
396   size = xbt_dynar_length(reqqs);
397
398   XBT_DEBUG("Win_complete - Finishing %d RMA calls", size);
399
400   unsigned int cpt=0;
401   MPI_Request req;
402   // start all requests that have been prepared by another process
403   xbt_dynar_foreach(reqqs, cpt, req){
404     if (req->flags & PREPARED) smpi_mpi_start(req);
405   }
406
407   MPI_Request* treqs = static_cast<MPI_Request*>(xbt_dynar_to_array(reqqs));
408   win->requests=xbt_dynar_new(sizeof(MPI_Request), NULL);
409   smpi_mpi_waitall(size,treqs,MPI_STATUSES_IGNORE);
410   xbt_free(treqs);
411   smpi_group_unuse(win->group);
412   win->opened--; //we're opened for business !
413   return MPI_SUCCESS;
414 }