Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
d4c51f4e91b07d8c7a294467b3d553c6d3ac01b5
[simgrid.git] / src / smpi / smpi_rma.cpp
1
2 /* Copyright (c) 2007-2015. The SimGrid Team.
3  * All rights reserved.                                                     */
4
5 /* This program is free software; you can redistribute it and/or modify it
6  * under the terms of the license (GNU LGPL) which comes with this package. */
7
8 #include "private.h"
9
10 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");
11
12 #define RMA_TAG -1234
13
14 xbt_bar_t creation_bar = NULL;
15
16 typedef struct s_smpi_mpi_win{
17   void* base;
18   MPI_Aint size;
19   int disp_unit;
20   MPI_Comm comm;
21   MPI_Info info;
22   int assert;
23   xbt_dynar_t requests;
24   xbt_bar_t bar;
25   MPI_Win* connected_wins;
26   char* name;
27   int opened;
28   MPI_Group group;
29 } s_smpi_mpi_win_t;
30
31
32 MPI_Win smpi_mpi_win_create( void *base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm){
33   MPI_Win win;
34
35   int comm_size = smpi_comm_size(comm);
36   int rank=smpi_comm_rank(comm);
37   XBT_DEBUG("Creating window");
38
39   win = xbt_new(s_smpi_mpi_win_t, 1);
40   win->base = base;
41   win->size = size;
42   win->disp_unit = disp_unit;
43   win->assert = 0;
44   win->info = info;
45   if(info!=MPI_INFO_NULL)
46     info->refcount++;
47   win->comm = comm;
48   win->name = NULL;
49   win->opened = 0;
50   win->group = MPI_GROUP_NULL;
51   win->requests = xbt_dynar_new(sizeof(MPI_Request), NULL);
52   win->connected_wins = xbt_new0(MPI_Win, comm_size);
53   win->connected_wins[rank] = win;
54
55   if(rank==0){
56     win->bar=xbt_barrier_init(comm_size);
57   }
58   mpi_coll_allgather_fun(&(win->connected_wins[rank]), sizeof(MPI_Win), MPI_BYTE, win->connected_wins, sizeof(MPI_Win),
59                          MPI_BYTE, comm);
60
61   mpi_coll_bcast_fun( &(win->bar), sizeof(xbt_bar_t), MPI_BYTE, 0, comm);
62
63   mpi_coll_barrier_fun(comm);
64
65   return win;
66 }
67
68 int smpi_mpi_win_free( MPI_Win* win){
69   //As per the standard, perform a barrier to ensure every async comm is finished
70   xbt_barrier_wait((*win)->bar);
71   xbt_dynar_free(&(*win)->requests);
72   xbt_free((*win)->connected_wins);
73   if ((*win)->name != NULL){
74     xbt_free((*win)->name);
75   }
76   if((*win)->info!=MPI_INFO_NULL){
77     MPI_Info_free(&(*win)->info);
78   }
79
80   mpi_coll_barrier_fun((*win)->comm);
81   int rank=smpi_comm_rank((*win)->comm);
82   if(rank == 0)
83     xbt_barrier_destroy((*win)->bar);
84   xbt_free(*win);
85   *win = MPI_WIN_NULL;
86   return MPI_SUCCESS;
87 }
88
89 void smpi_mpi_win_get_name(MPI_Win win, char* name, int* length){
90   if(win->name==NULL){
91     *length=0;
92     name=NULL;
93     return;
94   }
95   *length = strlen(win->name);
96   strncpy(name, win->name, *length+1);
97 }
98
99 void smpi_mpi_win_get_group(MPI_Win win, MPI_Group* group){
100   if(win->comm != MPI_COMM_NULL){
101     *group = smpi_comm_group(win->comm);
102   }
103 }
104
105 void smpi_mpi_win_set_name(MPI_Win win, char* name){
106   win->name = xbt_strdup(name);
107 }
108
109 int smpi_mpi_win_fence( int assert,  MPI_Win win){
110   XBT_DEBUG("Entering fence");
111   if(win->opened==0)
112     win->opened=1;
113   if(assert != MPI_MODE_NOPRECEDE){
114     xbt_barrier_wait(win->bar);
115
116     xbt_dynar_t reqs = win->requests;
117     int size = xbt_dynar_length(reqs);
118     unsigned int cpt=0;
119     MPI_Request req;
120     // start all requests that have been prepared by another process
121     xbt_dynar_foreach(reqs, cpt, req){
122       if (req->flags & PREPARED) 
123         smpi_mpi_start(req);
124     }
125
126     MPI_Request* treqs = static_cast<MPI_Request*>(xbt_dynar_to_array(reqs));
127     win->requests=xbt_dynar_new(sizeof(MPI_Request), NULL);
128     smpi_mpi_waitall(size,treqs,MPI_STATUSES_IGNORE);
129     xbt_free(treqs);
130
131   }
132   win->assert = assert;
133
134   xbt_barrier_wait(win->bar);
135   XBT_DEBUG("Leaving fence ");
136
137   return MPI_SUCCESS;
138 }
139
140 int smpi_mpi_put( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
141               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Win win)
142 {
143   if(win->opened==0)//check that post/start has been done
144     return MPI_ERR_WIN;
145   //get receiver pointer
146   MPI_Win recv_win = win->connected_wins[target_rank];
147
148   void* recv_addr = static_cast<void*> ( static_cast<char*>(recv_win->base) + target_disp * recv_win->disp_unit);
149   XBT_DEBUG("Entering MPI_Put to %d", target_rank);
150
151   if(target_rank != smpi_comm_rank(win->comm)){
152     //prepare send_request
153     MPI_Request sreq = smpi_rma_send_init(origin_addr, origin_count, origin_datatype, smpi_process_index(),
154         smpi_group_index(smpi_comm_group(win->comm),target_rank), RMA_TAG+1, win->comm, MPI_OP_NULL);
155
156     //prepare receiver request
157     MPI_Request rreq = smpi_rma_recv_init(recv_addr, target_count, target_datatype, smpi_process_index(),
158         smpi_group_index(smpi_comm_group(win->comm),target_rank), RMA_TAG+1, recv_win->comm, MPI_OP_NULL);
159
160     //push request to receiver's win
161     xbt_dynar_push_as(recv_win->requests, MPI_Request, rreq);
162
163     //start send
164     smpi_mpi_start(sreq);
165
166     //push request to sender's win
167     xbt_dynar_push_as(win->requests, MPI_Request, sreq);
168   }else{
169     smpi_datatype_copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
170   }
171
172   return MPI_SUCCESS;
173 }
174
175 int smpi_mpi_get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
176               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Win win)
177 {
178   if(win->opened==0)//check that post/start has been done
179     return MPI_ERR_WIN;
180   //get sender pointer
181   MPI_Win send_win = win->connected_wins[target_rank];
182
183   void* send_addr = static_cast<void*>(static_cast<char*>(send_win->base) + target_disp * send_win->disp_unit);
184   XBT_DEBUG("Entering MPI_Get from %d", target_rank);
185
186   if(target_rank != smpi_comm_rank(win->comm)){
187     //prepare send_request
188     MPI_Request sreq = smpi_rma_send_init(send_addr, target_count, target_datatype,
189         smpi_group_index(smpi_comm_group(win->comm),target_rank), smpi_process_index(), RMA_TAG+2, send_win->comm,
190         MPI_OP_NULL);
191
192     //prepare receiver request
193     MPI_Request rreq = smpi_rma_recv_init(origin_addr, origin_count, origin_datatype,
194         smpi_group_index(smpi_comm_group(win->comm),target_rank), smpi_process_index(), RMA_TAG+2, win->comm,
195         MPI_OP_NULL);
196
197     //start the send, with another process than us as sender. 
198     smpi_mpi_start(sreq);
199
200     //push request to receiver's win
201     xbt_dynar_push_as(send_win->requests, MPI_Request, sreq);
202
203     //start recv
204     smpi_mpi_start(rreq);
205
206     //push request to sender's win
207     xbt_dynar_push_as(win->requests, MPI_Request, rreq);
208   }else{
209     smpi_datatype_copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
210   }
211
212   return MPI_SUCCESS;
213 }
214
215
216 int smpi_mpi_accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
217               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Win win)
218 {
219   if(win->opened==0)//check that post/start has been done
220     return MPI_ERR_WIN;
221   //FIXME: local version 
222   //get receiver pointer
223   MPI_Win recv_win = win->connected_wins[target_rank];
224
225   void* recv_addr = static_cast<void*>(static_cast<char*>(recv_win->base) + target_disp * recv_win->disp_unit);
226   XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
227
228     //prepare send_request
229     MPI_Request sreq = smpi_rma_send_init(origin_addr, origin_count, origin_datatype,
230         smpi_process_index(), smpi_group_index(smpi_comm_group(win->comm),target_rank), RMA_TAG+3, win->comm, op);
231
232     //prepare receiver request
233     MPI_Request rreq = smpi_rma_recv_init(recv_addr, target_count, target_datatype,
234         smpi_process_index(), smpi_group_index(smpi_comm_group(win->comm),target_rank), RMA_TAG+3, recv_win->comm, op);
235     //push request to receiver's win
236     xbt_dynar_push_as(recv_win->requests, MPI_Request, rreq);
237     //start send
238     smpi_mpi_start(sreq);
239
240     //push request to sender's win
241     xbt_dynar_push_as(win->requests, MPI_Request, sreq);
242
243   return MPI_SUCCESS;
244 }
245
246 int smpi_mpi_win_start(MPI_Group group, int assert, MPI_Win win){
247     /* From MPI forum advices
248     The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
249     will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
250     the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
251     matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
252     MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
253     implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
254     to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
255     called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
256     origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
257     must complete, without further dependencies.  */
258
259   //naive, blocking implementation.
260   int i=0,j=0;
261   int size = smpi_group_size(group);
262   MPI_Request* reqs = xbt_new0(MPI_Request, size);
263
264   while(j!=size){
265     int src=smpi_group_index(group,j);
266     if(src!=smpi_process_index()){
267       reqs[i]=smpi_irecv_init(NULL, 0, MPI_CHAR, src,RMA_TAG+4, MPI_COMM_WORLD);
268       i++;
269     }
270     j++;
271   }
272   size=i;
273   smpi_mpi_startall(size, reqs);
274   smpi_mpi_waitall(size, reqs, MPI_STATUSES_IGNORE);
275   for(i=0;i<size;i++){
276     smpi_mpi_request_free(&reqs[i]);
277   }
278   xbt_free(reqs);
279   win->opened++; //we're open for business !
280   win->group=group;
281   smpi_group_use(group);
282   return MPI_SUCCESS;
283 }
284
285 int smpi_mpi_win_post(MPI_Group group, int assert, MPI_Win win){
286   //let's make a synchronous send here
287   int i=0,j=0;
288   int size = smpi_group_size(group);
289   MPI_Request* reqs = xbt_new0(MPI_Request, size);
290
291   while(j!=size){
292     int dst=smpi_group_index(group,j);
293     if(dst!=smpi_process_index()){
294       reqs[i]=smpi_mpi_send_init(NULL, 0, MPI_CHAR, dst, RMA_TAG+4, MPI_COMM_WORLD);
295       i++;
296     }
297     j++;
298   }
299   size=i;
300
301   smpi_mpi_startall(size, reqs);
302   smpi_mpi_waitall(size, reqs, MPI_STATUSES_IGNORE);
303   for(i=0;i<size;i++){
304     smpi_mpi_request_free(&reqs[i]);
305   }
306   xbt_free(reqs);
307   win->opened++; //we're open for business !
308   win->group=group;
309   smpi_group_use(group);
310   return MPI_SUCCESS;
311 }
312
313 int smpi_mpi_win_complete(MPI_Win win){
314   if(win->opened==0)
315     xbt_die("Complete called on already opened MPI_Win");
316
317   XBT_DEBUG("Entering MPI_Win_Complete");
318   int i=0,j=0;
319   int size = smpi_group_size(win->group);
320   MPI_Request* reqs = xbt_new0(MPI_Request, size);
321
322   while(j!=size){
323     int dst=smpi_group_index(win->group,j);
324     if(dst!=smpi_process_index()){
325       reqs[i]=smpi_mpi_send_init(NULL, 0, MPI_CHAR, dst, RMA_TAG+5, MPI_COMM_WORLD);
326       i++;
327     }
328     j++;
329   }
330   size=i;
331   XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
332   smpi_mpi_startall(size, reqs);
333   smpi_mpi_waitall(size, reqs, MPI_STATUSES_IGNORE);
334
335   for(i=0;i<size;i++){
336     smpi_mpi_request_free(&reqs[i]);
337   }
338   xbt_free(reqs);
339
340   //now we can finish RMA calls
341
342   xbt_dynar_t reqqs = win->requests;
343   size = xbt_dynar_length(reqqs);
344
345   XBT_DEBUG("Win_complete - Finishing %d RMA calls", size);
346   unsigned int cpt=0;
347   MPI_Request req;
348   // start all requests that have been prepared by another process
349   xbt_dynar_foreach(reqqs, cpt, req){
350     if (req->flags & PREPARED) 
351       smpi_mpi_start(req);
352   }
353
354   MPI_Request* treqs = static_cast<MPI_Request*>(xbt_dynar_to_array(reqqs));
355   win->requests=xbt_dynar_new(sizeof(MPI_Request), NULL);
356   smpi_mpi_waitall(size,treqs,MPI_STATUSES_IGNORE);
357   xbt_free(treqs);
358   smpi_group_unuse(win->group);
359   win->opened--; //we're closed for business !
360   return MPI_SUCCESS;
361 }
362
363 int smpi_mpi_win_wait(MPI_Win win){
364   //naive, blocking implementation.
365   XBT_DEBUG("Entering MPI_Win_Wait");
366   int i=0,j=0;
367   int size = smpi_group_size(win->group);
368   MPI_Request* reqs = xbt_new0(MPI_Request, size);
369
370   while(j!=size){
371     int src=smpi_group_index(win->group,j);
372     if(src!=smpi_process_index()){
373       reqs[i]=smpi_irecv_init(NULL, 0, MPI_CHAR, src,RMA_TAG+5, MPI_COMM_WORLD);
374       i++;
375     }
376     j++;
377   }
378   size=i;
379   XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
380   smpi_mpi_startall(size, reqs);
381   smpi_mpi_waitall(size, reqs, MPI_STATUSES_IGNORE);
382   for(i=0;i<size;i++){
383     smpi_mpi_request_free(&reqs[i]);
384   }
385   xbt_free(reqs);
386
387   xbt_dynar_t reqqs = win->requests;
388   size = xbt_dynar_length(reqqs);
389
390   XBT_DEBUG("Win_complete - Finishing %d RMA calls", size);
391
392   unsigned int cpt=0;
393   MPI_Request req;
394   // start all requests that have been prepared by another process
395   xbt_dynar_foreach(reqqs, cpt, req){
396     if (req->flags & PREPARED) 
397       smpi_mpi_start(req);
398   }
399
400   MPI_Request* treqs = static_cast<MPI_Request*>(xbt_dynar_to_array(reqqs));
401   win->requests=xbt_dynar_new(sizeof(MPI_Request), NULL);
402   smpi_mpi_waitall(size,treqs,MPI_STATUSES_IGNORE);
403   xbt_free(treqs);
404   smpi_group_unuse(win->group);
405   win->opened--; //we're opened for business !
406   return MPI_SUCCESS;
407 }