Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Avoid doing things when there is nothing to do.
[simgrid.git] / src / smpi / smpi_rma.cpp
1 /* Copyright (c) 2007-2015. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "private.h"
8 #include <vector>
9
10 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");
11
12 msg_bar_t creation_bar = nullptr;
13
14 typedef struct s_smpi_mpi_win{
15   void* base;
16   MPI_Aint size;
17   int disp_unit;
18   MPI_Comm comm;
19   MPI_Info info;
20   int assert;
21   std::vector<MPI_Request> *requests;
22   msg_bar_t bar;
23   MPI_Win* connected_wins;
24   char* name;
25   int opened;
26   MPI_Group group;
27   int count; //for ordering the accs
28 } s_smpi_mpi_win_t;
29
30
31 MPI_Win smpi_mpi_win_create( void *base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm){
32   MPI_Win win;
33
34   int comm_size = smpi_comm_size(comm);
35   int rank=smpi_comm_rank(comm);
36   XBT_DEBUG("Creating window");
37
38   win = xbt_new(s_smpi_mpi_win_t, 1);
39   win->base = base;
40   win->size = size;
41   win->disp_unit = disp_unit;
42   win->assert = 0;
43   win->info = info;
44   if(info!=MPI_INFO_NULL)
45     info->refcount++;
46   win->comm = comm;
47   win->name = nullptr;
48   win->opened = 0;
49   win->group = MPI_GROUP_NULL;
50   win->requests = new std::vector<MPI_Request>();
51   win->connected_wins = xbt_new0(MPI_Win, comm_size);
52   win->connected_wins[rank] = win;
53   win->count = 0;
54   if(rank==0){
55     win->bar = MSG_barrier_init(comm_size);
56   }
57   mpi_coll_allgather_fun(&(win->connected_wins[rank]), sizeof(MPI_Win), MPI_BYTE, win->connected_wins, sizeof(MPI_Win),
58                          MPI_BYTE, comm);
59
60   mpi_coll_bcast_fun(&(win->bar), sizeof(msg_bar_t), MPI_BYTE, 0, comm);
61
62   mpi_coll_barrier_fun(comm);
63
64   return win;
65 }
66
67 int smpi_mpi_win_free( MPI_Win* win){
68   //As per the standard, perform a barrier to ensure every async comm is finished
69   MSG_barrier_wait((*win)->bar);
70   delete (*win)->requests;
71   xbt_free((*win)->connected_wins);
72   if ((*win)->name != nullptr){
73     xbt_free((*win)->name);
74   }
75   if((*win)->info!=MPI_INFO_NULL){
76     MPI_Info_free(&(*win)->info);
77   }
78
79   mpi_coll_barrier_fun((*win)->comm);
80   int rank=smpi_comm_rank((*win)->comm);
81   if(rank == 0)
82     MSG_barrier_destroy((*win)->bar);
83   xbt_free(*win);
84   *win = MPI_WIN_NULL;
85   return MPI_SUCCESS;
86 }
87
88 void smpi_mpi_win_get_name(MPI_Win win, char* name, int* length){
89   if(win->name==nullptr){
90     *length=0;
91     name=nullptr;
92     return;
93   }
94   *length = strlen(win->name);
95   strncpy(name, win->name, *length+1);
96 }
97
98 void smpi_mpi_win_get_group(MPI_Win win, MPI_Group* group){
99   if(win->comm != MPI_COMM_NULL){
100     *group = smpi_comm_group(win->comm);
101   } else {
102     *group = MPI_GROUP_NULL;
103   }
104 }
105
106 void smpi_mpi_win_set_name(MPI_Win win, char* name){
107   win->name = xbt_strdup(name);
108 }
109
110 int smpi_mpi_win_fence( int assert,  MPI_Win win){
111   XBT_DEBUG("Entering fence");
112   if(win->opened==0)
113     win->opened=1;
114   if(assert != MPI_MODE_NOPRECEDE){
115     MSG_barrier_wait(win->bar);
116
117     std::vector<MPI_Request> *reqs = win->requests;
118     int size = static_cast<int>(reqs->size());
119     // start all requests that have been prepared by another process
120     if(size>0){
121         for(auto req: *reqs){
122           if (req && (req->flags & PREPARED))
123             smpi_mpi_start(req);
124         }
125
126         MPI_Request* treqs = &(*reqs)[0];
127
128         smpi_mpi_waitall(size,treqs,MPI_STATUSES_IGNORE);
129     }
130     win->count=0;
131   }
132   win->assert = assert;
133
134   MSG_barrier_wait(win->bar);
135   XBT_DEBUG("Leaving fence ");
136
137   return MPI_SUCCESS;
138 }
139
140 int smpi_mpi_put( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
141               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Win win)
142 {
143   if(win->opened==0)//check that post/start has been done
144     return MPI_ERR_WIN;
145   //get receiver pointer
146   MPI_Win recv_win = win->connected_wins[target_rank];
147
148   void* recv_addr = static_cast<void*> ( static_cast<char*>(recv_win->base) + target_disp * recv_win->disp_unit);
149   XBT_DEBUG("Entering MPI_Put to %d", target_rank);
150
151   if(target_rank != smpi_comm_rank(win->comm)){
152     //prepare send_request
153     MPI_Request sreq = smpi_rma_send_init(origin_addr, origin_count, origin_datatype, smpi_process_index(),
154         smpi_group_index(smpi_comm_group(win->comm),target_rank), SMPI_RMA_TAG+1, win->comm, MPI_OP_NULL);
155
156     //prepare receiver request
157     MPI_Request rreq = smpi_rma_recv_init(recv_addr, target_count, target_datatype, smpi_process_index(),
158         smpi_group_index(smpi_comm_group(win->comm),target_rank), SMPI_RMA_TAG+1, recv_win->comm, MPI_OP_NULL);
159
160     //push request to receiver's win
161     recv_win->requests->push_back(rreq);
162
163     //start send
164     smpi_mpi_start(sreq);
165
166     //push request to sender's win
167     win->requests->push_back(sreq);
168   }else{
169     smpi_datatype_copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
170   }
171
172   return MPI_SUCCESS;
173 }
174
175 int smpi_mpi_get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
176               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Win win)
177 {
178   if(win->opened==0)//check that post/start has been done
179     return MPI_ERR_WIN;
180   //get sender pointer
181   MPI_Win send_win = win->connected_wins[target_rank];
182
183   void* send_addr = static_cast<void*>(static_cast<char*>(send_win->base) + target_disp * send_win->disp_unit);
184   XBT_DEBUG("Entering MPI_Get from %d", target_rank);
185
186   if(target_rank != smpi_comm_rank(win->comm)){
187     //prepare send_request
188     MPI_Request sreq = smpi_rma_send_init(send_addr, target_count, target_datatype,
189         smpi_group_index(smpi_comm_group(win->comm),target_rank), smpi_process_index(), SMPI_RMA_TAG+2, send_win->comm,
190         MPI_OP_NULL);
191
192     //prepare receiver request
193     MPI_Request rreq = smpi_rma_recv_init(origin_addr, origin_count, origin_datatype,
194         smpi_group_index(smpi_comm_group(win->comm),target_rank), smpi_process_index(), SMPI_RMA_TAG+2, win->comm,
195         MPI_OP_NULL);
196
197     //start the send, with another process than us as sender. 
198     smpi_mpi_start(sreq);
199
200     //push request to receiver's win
201     send_win->requests->push_back(sreq);
202
203     //start recv
204     smpi_mpi_start(rreq);
205
206     //push request to sender's win
207     win->requests->push_back(rreq);
208   }else{
209     smpi_datatype_copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
210   }
211
212   return MPI_SUCCESS;
213 }
214
215
216 int smpi_mpi_accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
217               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Win win)
218 {
219   if(win->opened==0)//check that post/start has been done
220     return MPI_ERR_WIN;
221   //FIXME: local version 
222   //get receiver pointer
223   MPI_Win recv_win = win->connected_wins[target_rank];
224
225   void* recv_addr = static_cast<void*>(static_cast<char*>(recv_win->base) + target_disp * recv_win->disp_unit);
226   XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
227     //As the tag will be used for ordering of the operations, add count to it
228     //prepare send_request
229     MPI_Request sreq = smpi_rma_send_init(origin_addr, origin_count, origin_datatype,
230         smpi_process_index(), smpi_group_index(smpi_comm_group(win->comm),target_rank), SMPI_RMA_TAG+3+win->count, win->comm, op);
231
232     //prepare receiver request
233     MPI_Request rreq = smpi_rma_recv_init(recv_addr, target_count, target_datatype,
234         smpi_process_index(), smpi_group_index(smpi_comm_group(win->comm),target_rank), SMPI_RMA_TAG+3+win->count, recv_win->comm, op);
235
236     win->count++;
237     //push request to receiver's win
238     recv_win->requests->push_back(rreq);
239     //start send
240     smpi_mpi_start(sreq);
241
242     //push request to sender's win
243     win->requests->push_back(sreq);
244
245   return MPI_SUCCESS;
246 }
247
248 int smpi_mpi_win_start(MPI_Group group, int assert, MPI_Win win){
249     /* From MPI forum advices
250     The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
251     will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
252     the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
253     matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
254     MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
255     implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
256     to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
257     called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
258     origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
259     must complete, without further dependencies.  */
260
261   //naive, blocking implementation.
262   int i=0,j=0;
263   int size = smpi_group_size(group);
264   MPI_Request* reqs = xbt_new0(MPI_Request, size);
265
266   while(j!=size){
267     int src=smpi_group_index(group,j);
268     if(src!=smpi_process_index()){
269       reqs[i]=smpi_irecv_init(nullptr, 0, MPI_CHAR, src,SMPI_RMA_TAG+4, MPI_COMM_WORLD);
270       i++;
271     }
272     j++;
273   }
274   size=i;
275   smpi_mpi_startall(size, reqs);
276   smpi_mpi_waitall(size, reqs, MPI_STATUSES_IGNORE);
277   for(i=0;i<size;i++){
278     smpi_mpi_request_free(&reqs[i]);
279   }
280   xbt_free(reqs);
281   win->opened++; //we're open for business !
282   win->group=group;
283   smpi_group_use(group);
284   return MPI_SUCCESS;
285 }
286
287 int smpi_mpi_win_post(MPI_Group group, int assert, MPI_Win win){
288   //let's make a synchronous send here
289   int i=0,j=0;
290   int size = smpi_group_size(group);
291   MPI_Request* reqs = xbt_new0(MPI_Request, size);
292
293   while(j!=size){
294     int dst=smpi_group_index(group,j);
295     if(dst!=smpi_process_index()){
296       reqs[i]=smpi_mpi_send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG+4, MPI_COMM_WORLD);
297       i++;
298     }
299     j++;
300   }
301   size=i;
302
303   smpi_mpi_startall(size, reqs);
304   smpi_mpi_waitall(size, reqs, MPI_STATUSES_IGNORE);
305   for(i=0;i<size;i++){
306     smpi_mpi_request_free(&reqs[i]);
307   }
308   xbt_free(reqs);
309   win->opened++; //we're open for business !
310   win->group=group;
311   smpi_group_use(group);
312   return MPI_SUCCESS;
313 }
314
315 int smpi_mpi_win_complete(MPI_Win win){
316   if(win->opened==0)
317     xbt_die("Complete called on already opened MPI_Win");
318
319   XBT_DEBUG("Entering MPI_Win_Complete");
320   int i=0,j=0;
321   int size = smpi_group_size(win->group);
322   MPI_Request* reqs = xbt_new0(MPI_Request, size);
323
324   while(j!=size){
325     int dst=smpi_group_index(win->group,j);
326     if(dst!=smpi_process_index()){
327       reqs[i]=smpi_mpi_send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG+5, MPI_COMM_WORLD);
328       i++;
329     }
330     j++;
331   }
332   size=i;
333   XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
334   smpi_mpi_startall(size, reqs);
335   smpi_mpi_waitall(size, reqs, MPI_STATUSES_IGNORE);
336
337   for(i=0;i<size;i++){
338     smpi_mpi_request_free(&reqs[i]);
339   }
340   xbt_free(reqs);
341
342   //now we can finish RMA calls
343
344   std::vector<MPI_Request> *reqqs = win->requests;
345   size = static_cast<int>(reqqs->size());
346
347   XBT_DEBUG("Win_complete - Finishing %d RMA calls", size);
348   // start all requests that have been prepared by another process
349   for (auto req: *reqqs){
350     if (req && (req->flags & PREPARED))
351       smpi_mpi_start(req);
352   }
353
354   MPI_Request* treqs = &(*reqqs)[0];
355   smpi_mpi_waitall(size,treqs,MPI_STATUSES_IGNORE);
356   reqqs->clear();
357   smpi_group_unuse(win->group);
358   win->opened--; //we're closed for business !
359   return MPI_SUCCESS;
360 }
361
362 int smpi_mpi_win_wait(MPI_Win win){
363   //naive, blocking implementation.
364   XBT_DEBUG("Entering MPI_Win_Wait");
365   int i=0,j=0;
366   int size = smpi_group_size(win->group);
367   MPI_Request* reqs = xbt_new0(MPI_Request, size);
368
369   while(j!=size){
370     int src=smpi_group_index(win->group,j);
371     if(src!=smpi_process_index()){
372       reqs[i]=smpi_irecv_init(nullptr, 0, MPI_CHAR, src,SMPI_RMA_TAG+5, MPI_COMM_WORLD);
373       i++;
374     }
375     j++;
376   }
377   size=i;
378   XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
379   smpi_mpi_startall(size, reqs);
380   smpi_mpi_waitall(size, reqs, MPI_STATUSES_IGNORE);
381   for(i=0;i<size;i++){
382     smpi_mpi_request_free(&reqs[i]);
383   }
384   xbt_free(reqs);
385
386   std::vector<MPI_Request> *reqqs = win->requests;
387   size = static_cast<int>(reqqs->size());
388
389   XBT_DEBUG("Win_complete - Finishing %d RMA calls", size);
390
391   // start all requests that have been prepared by another process
392   for(auto req: *reqqs){
393     if (req && (req->flags & PREPARED))
394       smpi_mpi_start(req);
395   }
396
397   MPI_Request* treqs = &(*reqqs)[0];
398   smpi_mpi_waitall(size,treqs,MPI_STATUSES_IGNORE);
399   reqqs->clear();
400   smpi_group_unuse(win->group);
401   win->opened--; //we're opened for business !
402   return MPI_SUCCESS;
403 }