Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Protect access to remote process list of rma requests, to avoid race conditions..
[simgrid.git] / src / smpi / smpi_rma.cpp
1 /* Copyright (c) 2007-2015. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "private.h"
8 #include <vector>
9
10 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");
11
12 msg_bar_t creation_bar = nullptr;
13
14 typedef struct s_smpi_mpi_win{
15   void* base;
16   MPI_Aint size;
17   int disp_unit;
18   MPI_Comm comm;
19   MPI_Info info;
20   int assert;
21   std::vector<MPI_Request> *requests;
22   xbt_mutex_t mut;
23   msg_bar_t bar;
24   MPI_Win* connected_wins;
25   char* name;
26   int opened;
27   MPI_Group group;
28   int count; //for ordering the accs
29 } s_smpi_mpi_win_t;
30
31
32 MPI_Win smpi_mpi_win_create( void *base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm){
33   MPI_Win win;
34
35   int comm_size = smpi_comm_size(comm);
36   int rank=smpi_comm_rank(comm);
37   XBT_DEBUG("Creating window");
38
39   win = xbt_new(s_smpi_mpi_win_t, 1);
40   win->base = base;
41   win->size = size;
42   win->disp_unit = disp_unit;
43   win->assert = 0;
44   win->info = info;
45   if(info!=MPI_INFO_NULL)
46     info->refcount++;
47   win->comm = comm;
48   win->name = nullptr;
49   win->opened = 0;
50   win->group = MPI_GROUP_NULL;
51   win->requests = new std::vector<MPI_Request>();
52   win->mut=xbt_mutex_init();
53   win->connected_wins = xbt_new0(MPI_Win, comm_size);
54   win->connected_wins[rank] = win;
55   win->count = 0;
56   if(rank==0){
57     win->bar = MSG_barrier_init(comm_size);
58   }
59   mpi_coll_allgather_fun(&(win->connected_wins[rank]), sizeof(MPI_Win), MPI_BYTE, win->connected_wins, sizeof(MPI_Win),
60                          MPI_BYTE, comm);
61
62   mpi_coll_bcast_fun(&(win->bar), sizeof(msg_bar_t), MPI_BYTE, 0, comm);
63
64   mpi_coll_barrier_fun(comm);
65
66   return win;
67 }
68
69 int smpi_mpi_win_free( MPI_Win* win){
70   //As per the standard, perform a barrier to ensure every async comm is finished
71   MSG_barrier_wait((*win)->bar);
72   xbt_mutex_acquire((*win)->mut);
73   delete (*win)->requests;
74   xbt_mutex_release((*win)->mut);
75   xbt_free((*win)->connected_wins);
76   if ((*win)->name != nullptr){
77     xbt_free((*win)->name);
78   }
79   if((*win)->info!=MPI_INFO_NULL){
80     MPI_Info_free(&(*win)->info);
81   }
82
83   mpi_coll_barrier_fun((*win)->comm);
84   int rank=smpi_comm_rank((*win)->comm);
85   if(rank == 0)
86     MSG_barrier_destroy((*win)->bar);
87   xbt_mutex_destroy((*win)->mut);
88   xbt_free(*win);
89   *win = MPI_WIN_NULL;
90   return MPI_SUCCESS;
91 }
92
93 void smpi_mpi_win_get_name(MPI_Win win, char* name, int* length){
94   if(win->name==nullptr){
95     *length=0;
96     name=nullptr;
97     return;
98   }
99   *length = strlen(win->name);
100   strncpy(name, win->name, *length+1);
101 }
102
103 void smpi_mpi_win_get_group(MPI_Win win, MPI_Group* group){
104   if(win->comm != MPI_COMM_NULL){
105     *group = smpi_comm_group(win->comm);
106   } else {
107     *group = MPI_GROUP_NULL;
108   }
109 }
110
111 void smpi_mpi_win_set_name(MPI_Win win, char* name){
112   win->name = xbt_strdup(name);
113 }
114
115 int smpi_mpi_win_fence( int assert,  MPI_Win win){
116   XBT_DEBUG("Entering fence");
117   if(win->opened==0)
118     win->opened=1;
119   if(assert != MPI_MODE_NOPRECEDE){
120     MSG_barrier_wait(win->bar);
121     xbt_mutex_acquire(win->mut);
122     std::vector<MPI_Request> *reqs = win->requests;
123     int size = static_cast<int>(reqs->size());
124     // start all requests that have been prepared by another process
125     if(size>0){
126         for(auto req: *reqs){
127           if (req && (req->flags & PREPARED))
128             smpi_mpi_start(req);
129         }
130
131         MPI_Request* treqs = &(*reqs)[0];
132
133         smpi_mpi_waitall(size,treqs,MPI_STATUSES_IGNORE);
134     }
135     win->count=0;
136     xbt_mutex_release(win->mut);
137   }
138   win->assert = assert;
139
140   MSG_barrier_wait(win->bar);
141   XBT_DEBUG("Leaving fence ");
142
143   return MPI_SUCCESS;
144 }
145
146 int smpi_mpi_put( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
147               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Win win)
148 {
149   if(win->opened==0)//check that post/start has been done
150     return MPI_ERR_WIN;
151   //get receiver pointer
152   MPI_Win recv_win = win->connected_wins[target_rank];
153
154   void* recv_addr = static_cast<void*> ( static_cast<char*>(recv_win->base) + target_disp * recv_win->disp_unit);
155   XBT_DEBUG("Entering MPI_Put to %d", target_rank);
156
157   if(target_rank != smpi_comm_rank(win->comm)){
158     //prepare send_request
159     MPI_Request sreq = smpi_rma_send_init(origin_addr, origin_count, origin_datatype, smpi_process_index(),
160         smpi_group_index(smpi_comm_group(win->comm),target_rank), SMPI_RMA_TAG+1, win->comm, MPI_OP_NULL);
161
162     //prepare receiver request
163     MPI_Request rreq = smpi_rma_recv_init(recv_addr, target_count, target_datatype, smpi_process_index(),
164         smpi_group_index(smpi_comm_group(win->comm),target_rank), SMPI_RMA_TAG+1, recv_win->comm, MPI_OP_NULL);
165
166     //push request to receiver's win
167     xbt_mutex_acquire(recv_win->mut);
168     recv_win->requests->push_back(rreq);
169     xbt_mutex_release(recv_win->mut);
170     //start send
171     smpi_mpi_start(sreq);
172
173     //push request to sender's win
174     xbt_mutex_acquire(win->mut);
175     win->requests->push_back(sreq);
176     xbt_mutex_release(win->mut);
177   }else{
178     smpi_datatype_copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
179   }
180
181   return MPI_SUCCESS;
182 }
183
184 int smpi_mpi_get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
185               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Win win)
186 {
187   if(win->opened==0)//check that post/start has been done
188     return MPI_ERR_WIN;
189   //get sender pointer
190   MPI_Win send_win = win->connected_wins[target_rank];
191
192   void* send_addr = static_cast<void*>(static_cast<char*>(send_win->base) + target_disp * send_win->disp_unit);
193   XBT_DEBUG("Entering MPI_Get from %d", target_rank);
194
195   if(target_rank != smpi_comm_rank(win->comm)){
196     //prepare send_request
197     MPI_Request sreq = smpi_rma_send_init(send_addr, target_count, target_datatype,
198         smpi_group_index(smpi_comm_group(win->comm),target_rank), smpi_process_index(), SMPI_RMA_TAG+2, send_win->comm,
199         MPI_OP_NULL);
200
201     //prepare receiver request
202     MPI_Request rreq = smpi_rma_recv_init(origin_addr, origin_count, origin_datatype,
203         smpi_group_index(smpi_comm_group(win->comm),target_rank), smpi_process_index(), SMPI_RMA_TAG+2, win->comm,
204         MPI_OP_NULL);
205
206     //start the send, with another process than us as sender. 
207     smpi_mpi_start(sreq);
208     //push request to receiver's win
209     xbt_mutex_acquire(send_win->mut);
210     send_win->requests->push_back(sreq);
211     xbt_mutex_release(send_win->mut);
212
213     //start recv
214     smpi_mpi_start(rreq);
215     //push request to sender's win
216     xbt_mutex_acquire(win->mut);
217     win->requests->push_back(rreq);
218     xbt_mutex_release(win->mut);
219   }else{
220     smpi_datatype_copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
221   }
222
223   return MPI_SUCCESS;
224 }
225
226
227 int smpi_mpi_accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
228               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Win win)
229 {
230   if(win->opened==0)//check that post/start has been done
231     return MPI_ERR_WIN;
232   //FIXME: local version 
233   //get receiver pointer
234   MPI_Win recv_win = win->connected_wins[target_rank];
235
236   void* recv_addr = static_cast<void*>(static_cast<char*>(recv_win->base) + target_disp * recv_win->disp_unit);
237   XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
238     //As the tag will be used for ordering of the operations, add count to it
239     //prepare send_request
240     MPI_Request sreq = smpi_rma_send_init(origin_addr, origin_count, origin_datatype,
241         smpi_process_index(), smpi_group_index(smpi_comm_group(win->comm),target_rank), SMPI_RMA_TAG+3+win->count, win->comm, op);
242
243     //prepare receiver request
244     MPI_Request rreq = smpi_rma_recv_init(recv_addr, target_count, target_datatype,
245         smpi_process_index(), smpi_group_index(smpi_comm_group(win->comm),target_rank), SMPI_RMA_TAG+3+win->count, recv_win->comm, op);
246
247     win->count++;
248     //push request to receiver's win
249     xbt_mutex_acquire(recv_win->mut);
250     recv_win->requests->push_back(rreq);
251     xbt_mutex_release(recv_win->mut);
252     //start send
253     smpi_mpi_start(sreq);
254
255     //push request to sender's win
256     xbt_mutex_acquire(win->mut);
257     win->requests->push_back(sreq);
258     xbt_mutex_release(win->mut);
259
260   return MPI_SUCCESS;
261 }
262
263 int smpi_mpi_win_start(MPI_Group group, int assert, MPI_Win win){
264     /* From MPI forum advices
265     The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
266     will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
267     the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
268     matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
269     MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
270     implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
271     to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
272     called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
273     origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
274     must complete, without further dependencies.  */
275
276   //naive, blocking implementation.
277   int i=0,j=0;
278   int size = smpi_group_size(group);
279   MPI_Request* reqs = xbt_new0(MPI_Request, size);
280
281   while(j!=size){
282     int src=smpi_group_index(group,j);
283     if(src!=smpi_process_index()&& src!=MPI_UNDEFINED){
284       reqs[i]=smpi_irecv_init(nullptr, 0, MPI_CHAR, src,SMPI_RMA_TAG+4, MPI_COMM_WORLD);
285       i++;
286     }
287     j++;
288   }
289   size=i;
290   smpi_mpi_startall(size, reqs);
291   smpi_mpi_waitall(size, reqs, MPI_STATUSES_IGNORE);
292   for(i=0;i<size;i++){
293     smpi_mpi_request_free(&reqs[i]);
294   }
295   xbt_free(reqs);
296   win->opened++; //we're open for business !
297   win->group=group;
298   smpi_group_use(group);
299   return MPI_SUCCESS;
300 }
301
302 int smpi_mpi_win_post(MPI_Group group, int assert, MPI_Win win){
303   //let's make a synchronous send here
304   int i=0,j=0;
305   int size = smpi_group_size(group);
306   MPI_Request* reqs = xbt_new0(MPI_Request, size);
307
308   while(j!=size){
309     int dst=smpi_group_index(group,j);
310     if(dst!=smpi_process_index() && dst!=MPI_UNDEFINED){
311       reqs[i]=smpi_mpi_send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG+4, MPI_COMM_WORLD);
312       i++;
313     }
314     j++;
315   }
316   size=i;
317
318   smpi_mpi_startall(size, reqs);
319   smpi_mpi_waitall(size, reqs, MPI_STATUSES_IGNORE);
320   for(i=0;i<size;i++){
321     smpi_mpi_request_free(&reqs[i]);
322   }
323   xbt_free(reqs);
324   win->opened++; //we're open for business !
325   win->group=group;
326   smpi_group_use(group);
327   return MPI_SUCCESS;
328 }
329
330 int smpi_mpi_win_complete(MPI_Win win){
331   if(win->opened==0)
332     xbt_die("Complete called on already opened MPI_Win");
333
334   XBT_DEBUG("Entering MPI_Win_Complete");
335   int i=0,j=0;
336   int size = smpi_group_size(win->group);
337   MPI_Request* reqs = xbt_new0(MPI_Request, size);
338
339   while(j!=size){
340     int dst=smpi_group_index(win->group,j);
341     if(dst!=smpi_process_index() && dst!=MPI_UNDEFINED){
342       reqs[i]=smpi_mpi_send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG+5, MPI_COMM_WORLD);
343       i++;
344     }
345     j++;
346   }
347   size=i;
348   XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
349   smpi_mpi_startall(size, reqs);
350   smpi_mpi_waitall(size, reqs, MPI_STATUSES_IGNORE);
351
352   for(i=0;i<size;i++){
353     smpi_mpi_request_free(&reqs[i]);
354   }
355   xbt_free(reqs);
356
357   //now we can finish RMA calls
358   xbt_mutex_acquire(win->mut);
359   std::vector<MPI_Request> *reqqs = win->requests;
360   size = static_cast<int>(reqqs->size());
361
362   XBT_DEBUG("Win_complete - Finishing %d RMA calls", size);
363   // start all requests that have been prepared by another process
364   for (auto req: *reqqs){
365     if (req && (req->flags & PREPARED))
366       smpi_mpi_start(req);
367   }
368
369   MPI_Request* treqs = &(*reqqs)[0];
370   smpi_mpi_waitall(size,treqs,MPI_STATUSES_IGNORE);
371   reqqs->clear();
372   xbt_mutex_release(win->mut);
373
374   smpi_group_unuse(win->group);
375   win->opened--; //we're closed for business !
376   return MPI_SUCCESS;
377 }
378
379 int smpi_mpi_win_wait(MPI_Win win){
380   //naive, blocking implementation.
381   XBT_DEBUG("Entering MPI_Win_Wait");
382   int i=0,j=0;
383   int size = smpi_group_size(win->group);
384   MPI_Request* reqs = xbt_new0(MPI_Request, size);
385
386   while(j!=size){
387     int src=smpi_group_index(win->group,j);
388     if(src!=smpi_process_index() && src!=MPI_UNDEFINED){
389       reqs[i]=smpi_irecv_init(nullptr, 0, MPI_CHAR, src,SMPI_RMA_TAG+5, MPI_COMM_WORLD);
390       i++;
391     }
392     j++;
393   }
394   size=i;
395   XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
396   smpi_mpi_startall(size, reqs);
397   smpi_mpi_waitall(size, reqs, MPI_STATUSES_IGNORE);
398   for(i=0;i<size;i++){
399     smpi_mpi_request_free(&reqs[i]);
400   }
401   xbt_free(reqs);
402   xbt_mutex_acquire(win->mut);
403   std::vector<MPI_Request> *reqqs = win->requests;
404   size = static_cast<int>(reqqs->size());
405
406   XBT_DEBUG("Win_wait - Finishing %d RMA calls", size);
407
408   // start all requests that have been prepared by another process
409   for(auto req: *reqqs){
410     if (req && (req->flags & PREPARED))
411       smpi_mpi_start(req);
412   }
413
414   MPI_Request* treqs = &(*reqqs)[0];
415   smpi_mpi_waitall(size,treqs,MPI_STATUSES_IGNORE);
416   reqqs->clear();
417   xbt_mutex_release(win->mut);
418
419   smpi_group_unuse(win->group);
420   win->opened--; //we're opened for business !
421   return MPI_SUCCESS;
422 }