Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Fix for [#136] on github.
[simgrid.git] / src / smpi / smpi_rma.cpp
1 /* Copyright (c) 2007-2015. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "private.h"
8 #include <vector>
9
10 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");
11
12 msg_bar_t creation_bar = nullptr;
13
14 typedef struct s_smpi_mpi_win{
15   void* base;
16   MPI_Aint size;
17   int disp_unit;
18   MPI_Comm comm;
19   MPI_Info info;
20   int assert;
21   std::vector<MPI_Request> *requests;
22   msg_bar_t bar;
23   MPI_Win* connected_wins;
24   char* name;
25   int opened;
26   MPI_Group group;
27   int count; //for ordering the accs
28 } s_smpi_mpi_win_t;
29
30
31 MPI_Win smpi_mpi_win_create( void *base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm){
32   MPI_Win win;
33
34   int comm_size = smpi_comm_size(comm);
35   int rank=smpi_comm_rank(comm);
36   XBT_DEBUG("Creating window");
37
38   win = xbt_new(s_smpi_mpi_win_t, 1);
39   win->base = base;
40   win->size = size;
41   win->disp_unit = disp_unit;
42   win->assert = 0;
43   win->info = info;
44   if(info!=MPI_INFO_NULL)
45     info->refcount++;
46   win->comm = comm;
47   win->name = nullptr;
48   win->opened = 0;
49   win->group = MPI_GROUP_NULL;
50   win->requests = new std::vector<MPI_Request>();
51   win->connected_wins = xbt_new0(MPI_Win, comm_size);
52   win->connected_wins[rank] = win;
53   win->count = 0;
54   if(rank==0){
55     win->bar = MSG_barrier_init(comm_size);
56   }
57   mpi_coll_allgather_fun(&(win->connected_wins[rank]), sizeof(MPI_Win), MPI_BYTE, win->connected_wins, sizeof(MPI_Win),
58                          MPI_BYTE, comm);
59
60   mpi_coll_bcast_fun(&(win->bar), sizeof(msg_bar_t), MPI_BYTE, 0, comm);
61
62   mpi_coll_barrier_fun(comm);
63
64   return win;
65 }
66
67 int smpi_mpi_win_free( MPI_Win* win){
68   //As per the standard, perform a barrier to ensure every async comm is finished
69   MSG_barrier_wait((*win)->bar);
70   delete (*win)->requests;
71   xbt_free((*win)->connected_wins);
72   if ((*win)->name != nullptr){
73     xbt_free((*win)->name);
74   }
75   if((*win)->info!=MPI_INFO_NULL){
76     MPI_Info_free(&(*win)->info);
77   }
78
79   mpi_coll_barrier_fun((*win)->comm);
80   int rank=smpi_comm_rank((*win)->comm);
81   if(rank == 0)
82     MSG_barrier_destroy((*win)->bar);
83   xbt_free(*win);
84   *win = MPI_WIN_NULL;
85   return MPI_SUCCESS;
86 }
87
88 void smpi_mpi_win_get_name(MPI_Win win, char* name, int* length){
89   if(win->name==nullptr){
90     *length=0;
91     name=nullptr;
92     return;
93   }
94   *length = strlen(win->name);
95   strncpy(name, win->name, *length+1);
96 }
97
98 void smpi_mpi_win_get_group(MPI_Win win, MPI_Group* group){
99   if(win->comm != MPI_COMM_NULL){
100     *group = smpi_comm_group(win->comm);
101   } else {
102     *group = MPI_GROUP_NULL;
103   }
104 }
105
106 void smpi_mpi_win_set_name(MPI_Win win, char* name){
107   win->name = xbt_strdup(name);
108 }
109
110 int smpi_mpi_win_fence( int assert,  MPI_Win win){
111   XBT_DEBUG("Entering fence");
112   if(win->opened==0)
113     win->opened=1;
114   if(assert != MPI_MODE_NOPRECEDE){
115     MSG_barrier_wait(win->bar);
116
117     std::vector<MPI_Request> *reqs = win->requests;
118     int size = static_cast<int>(reqs->size());
119     // start all requests that have been prepared by another process
120     for(auto req: *reqs){
121       if (req && (req->flags & PREPARED))
122         smpi_mpi_start(req);
123     }
124
125     MPI_Request* treqs = &(*reqs)[0];
126     smpi_mpi_waitall(size,treqs,MPI_STATUSES_IGNORE);
127     win->count=0;
128   }
129   win->assert = assert;
130
131   MSG_barrier_wait(win->bar);
132   XBT_DEBUG("Leaving fence ");
133
134   return MPI_SUCCESS;
135 }
136
137 int smpi_mpi_put( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
138               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Win win)
139 {
140   if(win->opened==0)//check that post/start has been done
141     return MPI_ERR_WIN;
142   //get receiver pointer
143   MPI_Win recv_win = win->connected_wins[target_rank];
144
145   void* recv_addr = static_cast<void*> ( static_cast<char*>(recv_win->base) + target_disp * recv_win->disp_unit);
146   XBT_DEBUG("Entering MPI_Put to %d", target_rank);
147
148   if(target_rank != smpi_comm_rank(win->comm)){
149     //prepare send_request
150     MPI_Request sreq = smpi_rma_send_init(origin_addr, origin_count, origin_datatype, smpi_process_index(),
151         smpi_group_index(smpi_comm_group(win->comm),target_rank), SMPI_RMA_TAG+1, win->comm, MPI_OP_NULL);
152
153     //prepare receiver request
154     MPI_Request rreq = smpi_rma_recv_init(recv_addr, target_count, target_datatype, smpi_process_index(),
155         smpi_group_index(smpi_comm_group(win->comm),target_rank), SMPI_RMA_TAG+1, recv_win->comm, MPI_OP_NULL);
156
157     //push request to receiver's win
158     recv_win->requests->push_back(rreq);
159
160     //start send
161     smpi_mpi_start(sreq);
162
163     //push request to sender's win
164     win->requests->push_back(sreq);
165   }else{
166     smpi_datatype_copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
167   }
168
169   return MPI_SUCCESS;
170 }
171
172 int smpi_mpi_get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
173               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Win win)
174 {
175   if(win->opened==0)//check that post/start has been done
176     return MPI_ERR_WIN;
177   //get sender pointer
178   MPI_Win send_win = win->connected_wins[target_rank];
179
180   void* send_addr = static_cast<void*>(static_cast<char*>(send_win->base) + target_disp * send_win->disp_unit);
181   XBT_DEBUG("Entering MPI_Get from %d", target_rank);
182
183   if(target_rank != smpi_comm_rank(win->comm)){
184     //prepare send_request
185     MPI_Request sreq = smpi_rma_send_init(send_addr, target_count, target_datatype,
186         smpi_group_index(smpi_comm_group(win->comm),target_rank), smpi_process_index(), SMPI_RMA_TAG+2, send_win->comm,
187         MPI_OP_NULL);
188
189     //prepare receiver request
190     MPI_Request rreq = smpi_rma_recv_init(origin_addr, origin_count, origin_datatype,
191         smpi_group_index(smpi_comm_group(win->comm),target_rank), smpi_process_index(), SMPI_RMA_TAG+2, win->comm,
192         MPI_OP_NULL);
193
194     //start the send, with another process than us as sender. 
195     smpi_mpi_start(sreq);
196
197     //push request to receiver's win
198     send_win->requests->push_back(sreq);
199
200     //start recv
201     smpi_mpi_start(rreq);
202
203     //push request to sender's win
204     win->requests->push_back(rreq);
205   }else{
206     smpi_datatype_copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
207   }
208
209   return MPI_SUCCESS;
210 }
211
212
213 int smpi_mpi_accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
214               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Win win)
215 {
216   if(win->opened==0)//check that post/start has been done
217     return MPI_ERR_WIN;
218   //FIXME: local version 
219   //get receiver pointer
220   MPI_Win recv_win = win->connected_wins[target_rank];
221
222   void* recv_addr = static_cast<void*>(static_cast<char*>(recv_win->base) + target_disp * recv_win->disp_unit);
223   XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
224     //As the tag will be used for ordering of the operations, add count to it
225     //prepare send_request
226     MPI_Request sreq = smpi_rma_send_init(origin_addr, origin_count, origin_datatype,
227         smpi_process_index(), smpi_group_index(smpi_comm_group(win->comm),target_rank), SMPI_RMA_TAG+3+win->count, win->comm, op);
228
229     //prepare receiver request
230     MPI_Request rreq = smpi_rma_recv_init(recv_addr, target_count, target_datatype,
231         smpi_process_index(), smpi_group_index(smpi_comm_group(win->comm),target_rank), SMPI_RMA_TAG+3+win->count, recv_win->comm, op);
232
233     win->count++;
234     //push request to receiver's win
235     recv_win->requests->push_back(rreq);
236     //start send
237     smpi_mpi_start(sreq);
238
239     //push request to sender's win
240     win->requests->push_back(sreq);
241
242   return MPI_SUCCESS;
243 }
244
245 int smpi_mpi_win_start(MPI_Group group, int assert, MPI_Win win){
246     /* From MPI forum advices
247     The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
248     will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
249     the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
250     matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
251     MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
252     implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
253     to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
254     called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
255     origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
256     must complete, without further dependencies.  */
257
258   //naive, blocking implementation.
259   int i=0,j=0;
260   int size = smpi_group_size(group);
261   MPI_Request* reqs = xbt_new0(MPI_Request, size);
262
263   while(j!=size){
264     int src=smpi_group_index(group,j);
265     if(src!=smpi_process_index()){
266       reqs[i]=smpi_irecv_init(nullptr, 0, MPI_CHAR, src,SMPI_RMA_TAG+4, MPI_COMM_WORLD);
267       i++;
268     }
269     j++;
270   }
271   size=i;
272   smpi_mpi_startall(size, reqs);
273   smpi_mpi_waitall(size, reqs, MPI_STATUSES_IGNORE);
274   for(i=0;i<size;i++){
275     smpi_mpi_request_free(&reqs[i]);
276   }
277   xbt_free(reqs);
278   win->opened++; //we're open for business !
279   win->group=group;
280   smpi_group_use(group);
281   return MPI_SUCCESS;
282 }
283
284 int smpi_mpi_win_post(MPI_Group group, int assert, MPI_Win win){
285   //let's make a synchronous send here
286   int i=0,j=0;
287   int size = smpi_group_size(group);
288   MPI_Request* reqs = xbt_new0(MPI_Request, size);
289
290   while(j!=size){
291     int dst=smpi_group_index(group,j);
292     if(dst!=smpi_process_index()){
293       reqs[i]=smpi_mpi_send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG+4, MPI_COMM_WORLD);
294       i++;
295     }
296     j++;
297   }
298   size=i;
299
300   smpi_mpi_startall(size, reqs);
301   smpi_mpi_waitall(size, reqs, MPI_STATUSES_IGNORE);
302   for(i=0;i<size;i++){
303     smpi_mpi_request_free(&reqs[i]);
304   }
305   xbt_free(reqs);
306   win->opened++; //we're open for business !
307   win->group=group;
308   smpi_group_use(group);
309   return MPI_SUCCESS;
310 }
311
312 int smpi_mpi_win_complete(MPI_Win win){
313   if(win->opened==0)
314     xbt_die("Complete called on already opened MPI_Win");
315
316   XBT_DEBUG("Entering MPI_Win_Complete");
317   int i=0,j=0;
318   int size = smpi_group_size(win->group);
319   MPI_Request* reqs = xbt_new0(MPI_Request, size);
320
321   while(j!=size){
322     int dst=smpi_group_index(win->group,j);
323     if(dst!=smpi_process_index()){
324       reqs[i]=smpi_mpi_send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG+5, MPI_COMM_WORLD);
325       i++;
326     }
327     j++;
328   }
329   size=i;
330   XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
331   smpi_mpi_startall(size, reqs);
332   smpi_mpi_waitall(size, reqs, MPI_STATUSES_IGNORE);
333
334   for(i=0;i<size;i++){
335     smpi_mpi_request_free(&reqs[i]);
336   }
337   xbt_free(reqs);
338
339   //now we can finish RMA calls
340
341   std::vector<MPI_Request> *reqqs = win->requests;
342   size = static_cast<int>(reqqs->size());
343
344   XBT_DEBUG("Win_complete - Finishing %d RMA calls", size);
345   // start all requests that have been prepared by another process
346   for (auto req: *reqqs){
347     if (req && (req->flags & PREPARED))
348       smpi_mpi_start(req);
349   }
350
351   MPI_Request* treqs = &(*reqqs)[0];
352   smpi_mpi_waitall(size,treqs,MPI_STATUSES_IGNORE);
353   reqqs->clear();
354   smpi_group_unuse(win->group);
355   win->opened--; //we're closed for business !
356   return MPI_SUCCESS;
357 }
358
359 int smpi_mpi_win_wait(MPI_Win win){
360   //naive, blocking implementation.
361   XBT_DEBUG("Entering MPI_Win_Wait");
362   int i=0,j=0;
363   int size = smpi_group_size(win->group);
364   MPI_Request* reqs = xbt_new0(MPI_Request, size);
365
366   while(j!=size){
367     int src=smpi_group_index(win->group,j);
368     if(src!=smpi_process_index()){
369       reqs[i]=smpi_irecv_init(nullptr, 0, MPI_CHAR, src,SMPI_RMA_TAG+5, MPI_COMM_WORLD);
370       i++;
371     }
372     j++;
373   }
374   size=i;
375   XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
376   smpi_mpi_startall(size, reqs);
377   smpi_mpi_waitall(size, reqs, MPI_STATUSES_IGNORE);
378   for(i=0;i<size;i++){
379     smpi_mpi_request_free(&reqs[i]);
380   }
381   xbt_free(reqs);
382
383   std::vector<MPI_Request> *reqqs = win->requests;
384   size = static_cast<int>(reqqs->size());
385
386   XBT_DEBUG("Win_complete - Finishing %d RMA calls", size);
387
388   // start all requests that have been prepared by another process
389   for(auto req: *reqqs){
390     if (req && (req->flags & PREPARED))
391       smpi_mpi_start(req);
392   }
393
394   MPI_Request* treqs = &(*reqqs)[0];
395   smpi_mpi_waitall(size,treqs,MPI_STATUSES_IGNORE);
396   reqqs->clear();
397   smpi_group_unuse(win->group);
398   win->opened--; //we're opened for business !
399   return MPI_SUCCESS;
400 }