Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Merge branch 'smpi_cpp'
[simgrid.git] / src / smpi / smpi_rma.cpp
1 /* Copyright (c) 2007-2015. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "private.h"
8 #include <vector>
9
10 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");
11
12 typedef struct s_smpi_mpi_win{
13   void* base;
14   MPI_Aint size;
15   int disp_unit;
16   MPI_Comm comm;
17   MPI_Info info;
18   int assert;
19   std::vector<MPI_Request> *requests;
20   xbt_mutex_t mut;
21   msg_bar_t bar;
22   MPI_Win* connected_wins;
23   char* name;
24   int opened;
25   MPI_Group group;
26   int count; //for ordering the accs
27 } s_smpi_mpi_win_t;
28
29
30 MPI_Win smpi_mpi_win_create( void *base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm){
31   int comm_size = comm->size();
32   int rank      = comm->rank();
33   XBT_DEBUG("Creating window");
34
35   MPI_Win win    = xbt_new(s_smpi_mpi_win_t, 1);
36   win->base = base;
37   win->size = size;
38   win->disp_unit = disp_unit;
39   win->assert = 0;
40   win->info = info;
41   if(info!=MPI_INFO_NULL)
42     info->refcount++;
43   win->comm = comm;
44   win->name = nullptr;
45   win->opened = 0;
46   win->group = MPI_GROUP_NULL;
47   win->requests = new std::vector<MPI_Request>();
48   win->mut=xbt_mutex_init();
49   win->connected_wins = xbt_new0(MPI_Win, comm_size);
50   win->connected_wins[rank] = win;
51   win->count = 0;
52   if(rank==0){
53     win->bar = MSG_barrier_init(comm_size);
54   }
55   mpi_coll_allgather_fun(&(win->connected_wins[rank]), sizeof(MPI_Win), MPI_BYTE, win->connected_wins, sizeof(MPI_Win),
56                          MPI_BYTE, comm);
57
58   mpi_coll_bcast_fun(&(win->bar), sizeof(msg_bar_t), MPI_BYTE, 0, comm);
59
60   mpi_coll_barrier_fun(comm);
61
62   return win;
63 }
64
65 int smpi_mpi_win_free( MPI_Win* win){
66   //As per the standard, perform a barrier to ensure every async comm is finished
67   MSG_barrier_wait((*win)->bar);
68   xbt_mutex_acquire((*win)->mut);
69   delete (*win)->requests;
70   xbt_mutex_release((*win)->mut);
71   xbt_free((*win)->connected_wins);
72   if ((*win)->name != nullptr){
73     xbt_free((*win)->name);
74   }
75   if((*win)->info!=MPI_INFO_NULL){
76     MPI_Info_free(&(*win)->info);
77   }
78
79   mpi_coll_barrier_fun((*win)->comm);
80   int rank=(*win)->comm->rank();
81   if(rank == 0)
82     MSG_barrier_destroy((*win)->bar);
83   xbt_mutex_destroy((*win)->mut);
84   xbt_free(*win);
85   *win = MPI_WIN_NULL;
86   return MPI_SUCCESS;
87 }
88
89 void smpi_mpi_win_get_name(MPI_Win win, char* name, int* length){
90   if(win->name==nullptr){
91     *length=0;
92     name=nullptr;
93     return;
94   }
95   *length = strlen(win->name);
96   strncpy(name, win->name, *length+1);
97 }
98
99 void smpi_mpi_win_get_group(MPI_Win win, MPI_Group* group){
100   if(win->comm != MPI_COMM_NULL){
101     *group = win->comm->group();
102   } else {
103     *group = MPI_GROUP_NULL;
104   }
105 }
106
107 void smpi_mpi_win_set_name(MPI_Win win, char* name){
108   win->name = xbt_strdup(name);
109 }
110
111 int smpi_mpi_win_fence(int assert, MPI_Win win)
112 {
113   XBT_DEBUG("Entering fence");
114   if (win->opened == 0)
115     win->opened=1;
116   if (assert != MPI_MODE_NOPRECEDE) {
117     // This is not the first fence => finalize what came before
118     MSG_barrier_wait(win->bar);
119     xbt_mutex_acquire(win->mut);
120     // This (simulated) mutex ensures that no process pushes to the vector of requests during the waitall.
121     // Without this, the vector could get redimensionned when another process pushes.
122     // This would result in the array used by smpi_mpi_waitall() to be invalidated.
123     // Another solution would be to copy the data and cleanup the vector *before* smpi_mpi_waitall
124     std::vector<MPI_Request> *reqs = win->requests;
125     int size = static_cast<int>(reqs->size());
126     // start all requests that have been prepared by another process
127     if (size > 0) {
128       for (const auto& req : *reqs) {
129         if (req && (req->flags & PREPARED))
130           smpi_mpi_start(req);
131       }
132
133       MPI_Request* treqs = &(*reqs)[0];
134
135       smpi_mpi_waitall(size, treqs, MPI_STATUSES_IGNORE);
136     }
137     win->count=0;
138     xbt_mutex_release(win->mut);
139   }
140   win->assert = assert;
141
142   MSG_barrier_wait(win->bar);
143   XBT_DEBUG("Leaving fence");
144
145   return MPI_SUCCESS;
146 }
147
148 int smpi_mpi_put( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
149               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Win win)
150 {
151   if(win->opened==0)//check that post/start has been done
152     return MPI_ERR_WIN;
153   //get receiver pointer
154   MPI_Win recv_win = win->connected_wins[target_rank];
155
156   void* recv_addr = static_cast<void*> ( static_cast<char*>(recv_win->base) + target_disp * recv_win->disp_unit);
157   XBT_DEBUG("Entering MPI_Put to %d", target_rank);
158
159   if(target_rank != win->comm->rank()){
160     //prepare send_request
161     MPI_Request sreq = smpi_rma_send_init(origin_addr, origin_count, origin_datatype, smpi_process_index(),
162         win->comm->group()->index(target_rank), SMPI_RMA_TAG+1, win->comm, MPI_OP_NULL);
163
164     //prepare receiver request
165     MPI_Request rreq = smpi_rma_recv_init(recv_addr, target_count, target_datatype, smpi_process_index(),
166         win->comm->group()->index(target_rank), SMPI_RMA_TAG+1, recv_win->comm, MPI_OP_NULL);
167
168     //push request to receiver's win
169     xbt_mutex_acquire(recv_win->mut);
170     recv_win->requests->push_back(rreq);
171     xbt_mutex_release(recv_win->mut);
172     //start send
173     smpi_mpi_start(sreq);
174
175     //push request to sender's win
176     xbt_mutex_acquire(win->mut);
177     win->requests->push_back(sreq);
178     xbt_mutex_release(win->mut);
179   }else{
180     smpi_datatype_copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
181   }
182
183   return MPI_SUCCESS;
184 }
185
186 int smpi_mpi_get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
187               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Win win)
188 {
189   if(win->opened==0)//check that post/start has been done
190     return MPI_ERR_WIN;
191   //get sender pointer
192   MPI_Win send_win = win->connected_wins[target_rank];
193
194   void* send_addr = static_cast<void*>(static_cast<char*>(send_win->base) + target_disp * send_win->disp_unit);
195   XBT_DEBUG("Entering MPI_Get from %d", target_rank);
196
197   if(target_rank != win->comm->rank()){
198     //prepare send_request
199     MPI_Request sreq = smpi_rma_send_init(send_addr, target_count, target_datatype,
200         win->comm->group()->index(target_rank), smpi_process_index(), SMPI_RMA_TAG+2, send_win->comm,
201         MPI_OP_NULL);
202
203     //prepare receiver request
204     MPI_Request rreq = smpi_rma_recv_init(origin_addr, origin_count, origin_datatype,
205         win->comm->group()->index(target_rank), smpi_process_index(), SMPI_RMA_TAG+2, win->comm,
206         MPI_OP_NULL);
207
208     //start the send, with another process than us as sender. 
209     smpi_mpi_start(sreq);
210     //push request to receiver's win
211     xbt_mutex_acquire(send_win->mut);
212     send_win->requests->push_back(sreq);
213     xbt_mutex_release(send_win->mut);
214
215     //start recv
216     smpi_mpi_start(rreq);
217     //push request to sender's win
218     xbt_mutex_acquire(win->mut);
219     win->requests->push_back(rreq);
220     xbt_mutex_release(win->mut);
221   }else{
222     smpi_datatype_copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
223   }
224
225   return MPI_SUCCESS;
226 }
227
228
229 int smpi_mpi_accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
230               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Win win)
231 {
232   if(win->opened==0)//check that post/start has been done
233     return MPI_ERR_WIN;
234   //FIXME: local version 
235   //get receiver pointer
236   MPI_Win recv_win = win->connected_wins[target_rank];
237
238   void* recv_addr = static_cast<void*>(static_cast<char*>(recv_win->base) + target_disp * recv_win->disp_unit);
239   XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
240     //As the tag will be used for ordering of the operations, add count to it
241     //prepare send_request
242     MPI_Request sreq = smpi_rma_send_init(origin_addr, origin_count, origin_datatype,
243         smpi_process_index(), win->comm->group()->index(target_rank), SMPI_RMA_TAG+3+win->count, win->comm, op);
244
245     //prepare receiver request
246     MPI_Request rreq = smpi_rma_recv_init(recv_addr, target_count, target_datatype,
247         smpi_process_index(), win->comm->group()->index(target_rank), SMPI_RMA_TAG+3+win->count, recv_win->comm, op);
248
249     win->count++;
250     //push request to receiver's win
251     xbt_mutex_acquire(recv_win->mut);
252     recv_win->requests->push_back(rreq);
253     xbt_mutex_release(recv_win->mut);
254     //start send
255     smpi_mpi_start(sreq);
256
257     //push request to sender's win
258     xbt_mutex_acquire(win->mut);
259     win->requests->push_back(sreq);
260     xbt_mutex_release(win->mut);
261
262   return MPI_SUCCESS;
263 }
264
265 int smpi_mpi_win_start(MPI_Group group, int assert, MPI_Win win){
266     /* From MPI forum advices
267     The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
268     will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
269     the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
270     matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
271     MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
272     implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
273     to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
274     called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
275     origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
276     must complete, without further dependencies.  */
277
278   //naive, blocking implementation.
279     int i             = 0;
280     int j             = 0;
281     int size          = group->size();
282     MPI_Request* reqs = xbt_new0(MPI_Request, size);
283
284     while (j != size) {
285       int src = group->index(j);
286       if (src != smpi_process_index() && src != MPI_UNDEFINED) {
287         reqs[i] = smpi_irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 4, MPI_COMM_WORLD);
288         i++;
289       }
290       j++;
291   }
292   size=i;
293   smpi_mpi_startall(size, reqs);
294   smpi_mpi_waitall(size, reqs, MPI_STATUSES_IGNORE);
295   for(i=0;i<size;i++){
296     smpi_mpi_request_free(&reqs[i]);
297   }
298   xbt_free(reqs);
299   win->opened++; //we're open for business !
300   win->group=group;
301   group->use();
302   return MPI_SUCCESS;
303 }
304
305 int smpi_mpi_win_post(MPI_Group group, int assert, MPI_Win win){
306   //let's make a synchronous send here
307   int i             = 0;
308   int j             = 0;
309   int size = group->size();
310   MPI_Request* reqs = xbt_new0(MPI_Request, size);
311
312   while(j!=size){
313     int dst=group->index(j);
314     if(dst!=smpi_process_index() && dst!=MPI_UNDEFINED){
315       reqs[i]=smpi_mpi_send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG+4, MPI_COMM_WORLD);
316       i++;
317     }
318     j++;
319   }
320   size=i;
321
322   smpi_mpi_startall(size, reqs);
323   smpi_mpi_waitall(size, reqs, MPI_STATUSES_IGNORE);
324   for(i=0;i<size;i++){
325     smpi_mpi_request_free(&reqs[i]);
326   }
327   xbt_free(reqs);
328   win->opened++; //we're open for business !
329   win->group=group;
330   group->use();
331   return MPI_SUCCESS;
332 }
333
334 int smpi_mpi_win_complete(MPI_Win win){
335   if(win->opened==0)
336     xbt_die("Complete called on already opened MPI_Win");
337
338   XBT_DEBUG("Entering MPI_Win_Complete");
339   int i             = 0;
340   int j             = 0;
341   int size = win->group->size();
342   MPI_Request* reqs = xbt_new0(MPI_Request, size);
343
344   while(j!=size){
345     int dst=win->group->index(j);
346     if(dst!=smpi_process_index() && dst!=MPI_UNDEFINED){
347       reqs[i]=smpi_mpi_send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG+5, MPI_COMM_WORLD);
348       i++;
349     }
350     j++;
351   }
352   size=i;
353   XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
354   smpi_mpi_startall(size, reqs);
355   smpi_mpi_waitall(size, reqs, MPI_STATUSES_IGNORE);
356
357   for(i=0;i<size;i++){
358     smpi_mpi_request_free(&reqs[i]);
359   }
360   xbt_free(reqs);
361
362   //now we can finish RMA calls
363   xbt_mutex_acquire(win->mut);
364   std::vector<MPI_Request> *reqqs = win->requests;
365   size = static_cast<int>(reqqs->size());
366
367   XBT_DEBUG("Win_complete - Finishing %d RMA calls", size);
368   if (size > 0) {
369     // start all requests that have been prepared by another process
370     for (const auto& req : *reqqs) {
371       if (req && (req->flags & PREPARED))
372         smpi_mpi_start(req);
373     }
374
375     MPI_Request* treqs = &(*reqqs)[0];
376     smpi_mpi_waitall(size, treqs, MPI_STATUSES_IGNORE);
377     reqqs->clear();
378   }
379   xbt_mutex_release(win->mut);
380
381   win->group->unuse();
382   win->opened--; //we're closed for business !
383   return MPI_SUCCESS;
384 }
385
386 int smpi_mpi_win_wait(MPI_Win win){
387   //naive, blocking implementation.
388   XBT_DEBUG("Entering MPI_Win_Wait");
389   int i=0,j=0;
390   int size = win->group->size();
391   MPI_Request* reqs = xbt_new0(MPI_Request, size);
392
393   while(j!=size){
394     int src=win->group->index(j);
395     if(src!=smpi_process_index() && src!=MPI_UNDEFINED){
396       reqs[i]=smpi_irecv_init(nullptr, 0, MPI_CHAR, src,SMPI_RMA_TAG+5, MPI_COMM_WORLD);
397       i++;
398     }
399     j++;
400   }
401   size=i;
402   XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
403   smpi_mpi_startall(size, reqs);
404   smpi_mpi_waitall(size, reqs, MPI_STATUSES_IGNORE);
405   for(i=0;i<size;i++){
406     smpi_mpi_request_free(&reqs[i]);
407   }
408   xbt_free(reqs);
409   xbt_mutex_acquire(win->mut);
410   std::vector<MPI_Request> *reqqs = win->requests;
411   size = static_cast<int>(reqqs->size());
412
413   XBT_DEBUG("Win_wait - Finishing %d RMA calls", size);
414   if (size > 0) {
415     // start all requests that have been prepared by another process
416     for (const auto& req : *reqqs) {
417       if (req && (req->flags & PREPARED))
418         smpi_mpi_start(req);
419     }
420
421     MPI_Request* treqs = &(*reqqs)[0];
422     smpi_mpi_waitall(size, treqs, MPI_STATUSES_IGNORE);
423     reqqs->clear();
424   }
425   xbt_mutex_release(win->mut);
426
427   win->group->unuse();
428   win->opened--; //we're opened for business !
429   return MPI_SUCCESS;
430 }