Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
21f3e02ea0e603071aa28146bf3ebc2354f90e0d
[simgrid.git] / src / smpi / smpi_rma.cpp
1 /* Copyright (c) 2007-2015. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "private.h"
8 #include <vector>
9
10 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");
11
12 #define RMA_TAG -1234
13
14 xbt_bar_t creation_bar = nullptr;
15
16 typedef struct s_smpi_mpi_win{
17   void* base;
18   MPI_Aint size;
19   int disp_unit;
20   MPI_Comm comm;
21   MPI_Info info;
22   int assert;
23   std::vector<MPI_Request> *requests;
24   xbt_bar_t bar;
25   MPI_Win* connected_wins;
26   char* name;
27   int opened;
28   MPI_Group group;
29 } s_smpi_mpi_win_t;
30
31
32 MPI_Win smpi_mpi_win_create( void *base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm){
33   MPI_Win win;
34
35   int comm_size = smpi_comm_size(comm);
36   int rank=smpi_comm_rank(comm);
37   XBT_DEBUG("Creating window");
38
39   win = xbt_new(s_smpi_mpi_win_t, 1);
40   win->base = base;
41   win->size = size;
42   win->disp_unit = disp_unit;
43   win->assert = 0;
44   win->info = info;
45   if(info!=MPI_INFO_NULL)
46     info->refcount++;
47   win->comm = comm;
48   win->name = nullptr;
49   win->opened = 0;
50   win->group = MPI_GROUP_NULL;
51   win->requests = new std::vector<MPI_Request>();
52   win->connected_wins = xbt_new0(MPI_Win, comm_size);
53   win->connected_wins[rank] = win;
54
55   if(rank==0){
56     win->bar=xbt_barrier_init(comm_size);
57   }
58   mpi_coll_allgather_fun(&(win->connected_wins[rank]), sizeof(MPI_Win), MPI_BYTE, win->connected_wins, sizeof(MPI_Win),
59                          MPI_BYTE, comm);
60
61   mpi_coll_bcast_fun( &(win->bar), sizeof(xbt_bar_t), MPI_BYTE, 0, comm);
62
63   mpi_coll_barrier_fun(comm);
64
65   return win;
66 }
67
68 int smpi_mpi_win_free( MPI_Win* win){
69   //As per the standard, perform a barrier to ensure every async comm is finished
70   xbt_barrier_wait((*win)->bar);
71   delete (*win)->requests;
72   xbt_free((*win)->connected_wins);
73   if ((*win)->name != nullptr){
74     xbt_free((*win)->name);
75   }
76   if((*win)->info!=MPI_INFO_NULL){
77     MPI_Info_free(&(*win)->info);
78   }
79
80   mpi_coll_barrier_fun((*win)->comm);
81   int rank=smpi_comm_rank((*win)->comm);
82   if(rank == 0)
83     xbt_barrier_destroy((*win)->bar);
84   xbt_free(*win);
85   *win = MPI_WIN_NULL;
86   return MPI_SUCCESS;
87 }
88
89 void smpi_mpi_win_get_name(MPI_Win win, char* name, int* length){
90   if(win->name==nullptr){
91     *length=0;
92     name=nullptr;
93     return;
94   }
95   *length = strlen(win->name);
96   strncpy(name, win->name, *length+1);
97 }
98
99 void smpi_mpi_win_get_group(MPI_Win win, MPI_Group* group){
100   if(win->comm != MPI_COMM_NULL){
101     *group = smpi_comm_group(win->comm);
102   }
103 }
104
105 void smpi_mpi_win_set_name(MPI_Win win, char* name){
106   win->name = xbt_strdup(name);
107 }
108
109 int smpi_mpi_win_fence( int assert,  MPI_Win win){
110   XBT_DEBUG("Entering fence");
111   if(win->opened==0)
112     win->opened=1;
113   if(assert != MPI_MODE_NOPRECEDE){
114     xbt_barrier_wait(win->bar);
115
116     std::vector<MPI_Request> *reqs = win->requests;
117     int size = static_cast<int>(reqs->size());
118     // start all requests that have been prepared by another process
119     for(auto req: *reqs){
120       if (req && (req->flags & PREPARED))
121         smpi_mpi_start(req);
122     }
123
124     MPI_Request* treqs = &(*reqs)[0];
125     smpi_mpi_waitall(size,treqs,MPI_STATUSES_IGNORE);
126   }
127   win->assert = assert;
128
129   xbt_barrier_wait(win->bar);
130   XBT_DEBUG("Leaving fence ");
131
132   return MPI_SUCCESS;
133 }
134
135 int smpi_mpi_put( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
136               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Win win)
137 {
138   if(win->opened==0)//check that post/start has been done
139     return MPI_ERR_WIN;
140   //get receiver pointer
141   MPI_Win recv_win = win->connected_wins[target_rank];
142
143   void* recv_addr = static_cast<void*> ( static_cast<char*>(recv_win->base) + target_disp * recv_win->disp_unit);
144   XBT_DEBUG("Entering MPI_Put to %d", target_rank);
145
146   if(target_rank != smpi_comm_rank(win->comm)){
147     //prepare send_request
148     MPI_Request sreq = smpi_rma_send_init(origin_addr, origin_count, origin_datatype, smpi_process_index(),
149         smpi_group_index(smpi_comm_group(win->comm),target_rank), RMA_TAG+1, win->comm, MPI_OP_NULL);
150
151     //prepare receiver request
152     MPI_Request rreq = smpi_rma_recv_init(recv_addr, target_count, target_datatype, smpi_process_index(),
153         smpi_group_index(smpi_comm_group(win->comm),target_rank), RMA_TAG+1, recv_win->comm, MPI_OP_NULL);
154
155     //push request to receiver's win
156     recv_win->requests->push_back(rreq);
157
158     //start send
159     smpi_mpi_start(sreq);
160
161     //push request to sender's win
162     win->requests->push_back(sreq);
163   }else{
164     smpi_datatype_copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
165   }
166
167   return MPI_SUCCESS;
168 }
169
170 int smpi_mpi_get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
171               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Win win)
172 {
173   if(win->opened==0)//check that post/start has been done
174     return MPI_ERR_WIN;
175   //get sender pointer
176   MPI_Win send_win = win->connected_wins[target_rank];
177
178   void* send_addr = static_cast<void*>(static_cast<char*>(send_win->base) + target_disp * send_win->disp_unit);
179   XBT_DEBUG("Entering MPI_Get from %d", target_rank);
180
181   if(target_rank != smpi_comm_rank(win->comm)){
182     //prepare send_request
183     MPI_Request sreq = smpi_rma_send_init(send_addr, target_count, target_datatype,
184         smpi_group_index(smpi_comm_group(win->comm),target_rank), smpi_process_index(), RMA_TAG+2, send_win->comm,
185         MPI_OP_NULL);
186
187     //prepare receiver request
188     MPI_Request rreq = smpi_rma_recv_init(origin_addr, origin_count, origin_datatype,
189         smpi_group_index(smpi_comm_group(win->comm),target_rank), smpi_process_index(), RMA_TAG+2, win->comm,
190         MPI_OP_NULL);
191
192     //start the send, with another process than us as sender. 
193     smpi_mpi_start(sreq);
194
195     //push request to receiver's win
196     send_win->requests->push_back(sreq);
197
198     //start recv
199     smpi_mpi_start(rreq);
200
201     //push request to sender's win
202     win->requests->push_back(rreq);
203   }else{
204     smpi_datatype_copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
205   }
206
207   return MPI_SUCCESS;
208 }
209
210
211 int smpi_mpi_accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
212               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Win win)
213 {
214   if(win->opened==0)//check that post/start has been done
215     return MPI_ERR_WIN;
216   //FIXME: local version 
217   //get receiver pointer
218   MPI_Win recv_win = win->connected_wins[target_rank];
219
220   void* recv_addr = static_cast<void*>(static_cast<char*>(recv_win->base) + target_disp * recv_win->disp_unit);
221   XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
222
223     //prepare send_request
224     MPI_Request sreq = smpi_rma_send_init(origin_addr, origin_count, origin_datatype,
225         smpi_process_index(), smpi_group_index(smpi_comm_group(win->comm),target_rank), RMA_TAG+3, win->comm, op);
226
227     //prepare receiver request
228     MPI_Request rreq = smpi_rma_recv_init(recv_addr, target_count, target_datatype,
229         smpi_process_index(), smpi_group_index(smpi_comm_group(win->comm),target_rank), RMA_TAG+3, recv_win->comm, op);
230     //push request to receiver's win
231     recv_win->requests->push_back(rreq);
232     //start send
233     smpi_mpi_start(sreq);
234
235     //push request to sender's win
236     win->requests->push_back(sreq);
237
238   return MPI_SUCCESS;
239 }
240
241 int smpi_mpi_win_start(MPI_Group group, int assert, MPI_Win win){
242     /* From MPI forum advices
243     The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
244     will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
245     the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
246     matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
247     MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
248     implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
249     to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
250     called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
251     origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
252     must complete, without further dependencies.  */
253
254   //naive, blocking implementation.
255   int i=0,j=0;
256   int size = smpi_group_size(group);
257   MPI_Request* reqs = xbt_new0(MPI_Request, size);
258
259   while(j!=size){
260     int src=smpi_group_index(group,j);
261     if(src!=smpi_process_index()){
262       reqs[i]=smpi_irecv_init(nullptr, 0, MPI_CHAR, src,RMA_TAG+4, MPI_COMM_WORLD);
263       i++;
264     }
265     j++;
266   }
267   size=i;
268   smpi_mpi_startall(size, reqs);
269   smpi_mpi_waitall(size, reqs, MPI_STATUSES_IGNORE);
270   for(i=0;i<size;i++){
271     smpi_mpi_request_free(&reqs[i]);
272   }
273   xbt_free(reqs);
274   win->opened++; //we're open for business !
275   win->group=group;
276   smpi_group_use(group);
277   return MPI_SUCCESS;
278 }
279
280 int smpi_mpi_win_post(MPI_Group group, int assert, MPI_Win win){
281   //let's make a synchronous send here
282   int i=0,j=0;
283   int size = smpi_group_size(group);
284   MPI_Request* reqs = xbt_new0(MPI_Request, size);
285
286   while(j!=size){
287     int dst=smpi_group_index(group,j);
288     if(dst!=smpi_process_index()){
289       reqs[i]=smpi_mpi_send_init(nullptr, 0, MPI_CHAR, dst, RMA_TAG+4, MPI_COMM_WORLD);
290       i++;
291     }
292     j++;
293   }
294   size=i;
295
296   smpi_mpi_startall(size, reqs);
297   smpi_mpi_waitall(size, reqs, MPI_STATUSES_IGNORE);
298   for(i=0;i<size;i++){
299     smpi_mpi_request_free(&reqs[i]);
300   }
301   xbt_free(reqs);
302   win->opened++; //we're open for business !
303   win->group=group;
304   smpi_group_use(group);
305   return MPI_SUCCESS;
306 }
307
308 int smpi_mpi_win_complete(MPI_Win win){
309   if(win->opened==0)
310     xbt_die("Complete called on already opened MPI_Win");
311
312   XBT_DEBUG("Entering MPI_Win_Complete");
313   int i=0,j=0;
314   int size = smpi_group_size(win->group);
315   MPI_Request* reqs = xbt_new0(MPI_Request, size);
316
317   while(j!=size){
318     int dst=smpi_group_index(win->group,j);
319     if(dst!=smpi_process_index()){
320       reqs[i]=smpi_mpi_send_init(nullptr, 0, MPI_CHAR, dst, RMA_TAG+5, MPI_COMM_WORLD);
321       i++;
322     }
323     j++;
324   }
325   size=i;
326   XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
327   smpi_mpi_startall(size, reqs);
328   smpi_mpi_waitall(size, reqs, MPI_STATUSES_IGNORE);
329
330   for(i=0;i<size;i++){
331     smpi_mpi_request_free(&reqs[i]);
332   }
333   xbt_free(reqs);
334
335   //now we can finish RMA calls
336
337   std::vector<MPI_Request> *reqqs = win->requests;
338   size = static_cast<int>(reqqs->size());
339
340   XBT_DEBUG("Win_complete - Finishing %d RMA calls", size);
341   // start all requests that have been prepared by another process
342   for (auto req: *reqqs){
343     if (req && (req->flags & PREPARED))
344       smpi_mpi_start(req);
345   }
346
347   MPI_Request* treqs = &(*reqqs)[0];
348   smpi_mpi_waitall(size,treqs,MPI_STATUSES_IGNORE);
349   reqqs->clear();
350   smpi_group_unuse(win->group);
351   win->opened--; //we're closed for business !
352   return MPI_SUCCESS;
353 }
354
355 int smpi_mpi_win_wait(MPI_Win win){
356   //naive, blocking implementation.
357   XBT_DEBUG("Entering MPI_Win_Wait");
358   int i=0,j=0;
359   int size = smpi_group_size(win->group);
360   MPI_Request* reqs = xbt_new0(MPI_Request, size);
361
362   while(j!=size){
363     int src=smpi_group_index(win->group,j);
364     if(src!=smpi_process_index()){
365       reqs[i]=smpi_irecv_init(nullptr, 0, MPI_CHAR, src,RMA_TAG+5, MPI_COMM_WORLD);
366       i++;
367     }
368     j++;
369   }
370   size=i;
371   XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
372   smpi_mpi_startall(size, reqs);
373   smpi_mpi_waitall(size, reqs, MPI_STATUSES_IGNORE);
374   for(i=0;i<size;i++){
375     smpi_mpi_request_free(&reqs[i]);
376   }
377   xbt_free(reqs);
378
379   std::vector<MPI_Request> *reqqs = win->requests;
380   size = static_cast<int>(reqqs->size());
381
382   XBT_DEBUG("Win_complete - Finishing %d RMA calls", size);
383
384   // start all requests that have been prepared by another process
385   for(auto req: *reqqs){
386     if (req && (req->flags & PREPARED))
387       smpi_mpi_start(req);
388   }
389
390   MPI_Request* treqs = &(*reqqs)[0];
391   smpi_mpi_waitall(size,treqs,MPI_STATUSES_IGNORE);
392   reqqs->clear();
393   smpi_group_unuse(win->group);
394   win->opened--; //we're opened for business !
395   return MPI_SUCCESS;
396 }