Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
comment to explain what's going on
[simgrid.git] / src / smpi / smpi_rma.cpp
1 /* Copyright (c) 2007-2015. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "private.h"
8 #include <vector>
9
10 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");
11
12 typedef struct s_smpi_mpi_win{
13   void* base;
14   MPI_Aint size;
15   int disp_unit;
16   MPI_Comm comm;
17   MPI_Info info;
18   int assert;
19   std::vector<MPI_Request> *requests;
20   xbt_mutex_t mut;
21   msg_bar_t bar;
22   MPI_Win* connected_wins;
23   char* name;
24   int opened;
25   MPI_Group group;
26   int count; //for ordering the accs
27 } s_smpi_mpi_win_t;
28
29
30 MPI_Win smpi_mpi_win_create( void *base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm){
31   MPI_Win win;
32
33   int comm_size = smpi_comm_size(comm);
34   int rank=smpi_comm_rank(comm);
35   XBT_DEBUG("Creating window");
36
37   win = xbt_new(s_smpi_mpi_win_t, 1);
38   win->base = base;
39   win->size = size;
40   win->disp_unit = disp_unit;
41   win->assert = 0;
42   win->info = info;
43   if(info!=MPI_INFO_NULL)
44     info->refcount++;
45   win->comm = comm;
46   win->name = nullptr;
47   win->opened = 0;
48   win->group = MPI_GROUP_NULL;
49   win->requests = new std::vector<MPI_Request>();
50   win->mut=xbt_mutex_init();
51   win->connected_wins = xbt_new0(MPI_Win, comm_size);
52   win->connected_wins[rank] = win;
53   win->count = 0;
54   if(rank==0){
55     win->bar = MSG_barrier_init(comm_size);
56   }
57   mpi_coll_allgather_fun(&(win->connected_wins[rank]), sizeof(MPI_Win), MPI_BYTE, win->connected_wins, sizeof(MPI_Win),
58                          MPI_BYTE, comm);
59
60   mpi_coll_bcast_fun(&(win->bar), sizeof(msg_bar_t), MPI_BYTE, 0, comm);
61
62   mpi_coll_barrier_fun(comm);
63
64   return win;
65 }
66
67 int smpi_mpi_win_free( MPI_Win* win){
68   //As per the standard, perform a barrier to ensure every async comm is finished
69   MSG_barrier_wait((*win)->bar);
70   xbt_mutex_acquire((*win)->mut);
71   delete (*win)->requests;
72   xbt_mutex_release((*win)->mut);
73   xbt_free((*win)->connected_wins);
74   if ((*win)->name != nullptr){
75     xbt_free((*win)->name);
76   }
77   if((*win)->info!=MPI_INFO_NULL){
78     MPI_Info_free(&(*win)->info);
79   }
80
81   mpi_coll_barrier_fun((*win)->comm);
82   int rank=smpi_comm_rank((*win)->comm);
83   if(rank == 0)
84     MSG_barrier_destroy((*win)->bar);
85   xbt_mutex_destroy((*win)->mut);
86   xbt_free(*win);
87   *win = MPI_WIN_NULL;
88   return MPI_SUCCESS;
89 }
90
91 void smpi_mpi_win_get_name(MPI_Win win, char* name, int* length){
92   if(win->name==nullptr){
93     *length=0;
94     name=nullptr;
95     return;
96   }
97   *length = strlen(win->name);
98   strncpy(name, win->name, *length+1);
99 }
100
101 void smpi_mpi_win_get_group(MPI_Win win, MPI_Group* group){
102   if(win->comm != MPI_COMM_NULL){
103     *group = smpi_comm_group(win->comm);
104   } else {
105     *group = MPI_GROUP_NULL;
106   }
107 }
108
109 void smpi_mpi_win_set_name(MPI_Win win, char* name){
110   win->name = xbt_strdup(name);
111 }
112
113 int smpi_mpi_win_fence(int assert, MPI_Win win)
114 {
115   XBT_DEBUG("Entering fence");
116   if (win->opened == 0)
117     win->opened=1;
118   if (assert != MPI_MODE_NOPRECEDE) {
119     // This is not the first fence => finalize what came before
120     MSG_barrier_wait(win->bar);
121     xbt_mutex_acquire(win->mut);
122     // This (simulated) mutex ensures that no process pushes to the vector of requests during the waitall.
123     // Without this, the vector could get redimensionned when another process pushes.
124     // This would result in the array used by smpi_mpi_waitall() to be invalidated.
125     // Another solution would be to copy the data and cleanup the vector *before* smpi_mpi_waitall
126     std::vector<MPI_Request> *reqs = win->requests;
127     int size = static_cast<int>(reqs->size());
128     // start all requests that have been prepared by another process
129     if (size > 0) {
130       for (const auto& req : *reqs) {
131         if (req && (req->flags & PREPARED))
132           smpi_mpi_start(req);
133       }
134
135       MPI_Request* treqs = &(*reqs)[0];
136
137       smpi_mpi_waitall(size, treqs, MPI_STATUSES_IGNORE);
138     }
139     win->count=0;
140     xbt_mutex_release(win->mut);
141   }
142   win->assert = assert;
143
144   MSG_barrier_wait(win->bar);
145   XBT_DEBUG("Leaving fence");
146
147   return MPI_SUCCESS;
148 }
149
150 int smpi_mpi_put( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
151               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Win win)
152 {
153   if(win->opened==0)//check that post/start has been done
154     return MPI_ERR_WIN;
155   //get receiver pointer
156   MPI_Win recv_win = win->connected_wins[target_rank];
157
158   void* recv_addr = static_cast<void*> ( static_cast<char*>(recv_win->base) + target_disp * recv_win->disp_unit);
159   XBT_DEBUG("Entering MPI_Put to %d", target_rank);
160
161   if(target_rank != smpi_comm_rank(win->comm)){
162     //prepare send_request
163     MPI_Request sreq = smpi_rma_send_init(origin_addr, origin_count, origin_datatype, smpi_process_index(),
164         smpi_group_index(smpi_comm_group(win->comm),target_rank), SMPI_RMA_TAG+1, win->comm, MPI_OP_NULL);
165
166     //prepare receiver request
167     MPI_Request rreq = smpi_rma_recv_init(recv_addr, target_count, target_datatype, smpi_process_index(),
168         smpi_group_index(smpi_comm_group(win->comm),target_rank), SMPI_RMA_TAG+1, recv_win->comm, MPI_OP_NULL);
169
170     //push request to receiver's win
171     xbt_mutex_acquire(recv_win->mut);
172     recv_win->requests->push_back(rreq);
173     xbt_mutex_release(recv_win->mut);
174     //start send
175     smpi_mpi_start(sreq);
176
177     //push request to sender's win
178     xbt_mutex_acquire(win->mut);
179     win->requests->push_back(sreq);
180     xbt_mutex_release(win->mut);
181   }else{
182     smpi_datatype_copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
183   }
184
185   return MPI_SUCCESS;
186 }
187
188 int smpi_mpi_get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
189               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Win win)
190 {
191   if(win->opened==0)//check that post/start has been done
192     return MPI_ERR_WIN;
193   //get sender pointer
194   MPI_Win send_win = win->connected_wins[target_rank];
195
196   void* send_addr = static_cast<void*>(static_cast<char*>(send_win->base) + target_disp * send_win->disp_unit);
197   XBT_DEBUG("Entering MPI_Get from %d", target_rank);
198
199   if(target_rank != smpi_comm_rank(win->comm)){
200     //prepare send_request
201     MPI_Request sreq = smpi_rma_send_init(send_addr, target_count, target_datatype,
202         smpi_group_index(smpi_comm_group(win->comm),target_rank), smpi_process_index(), SMPI_RMA_TAG+2, send_win->comm,
203         MPI_OP_NULL);
204
205     //prepare receiver request
206     MPI_Request rreq = smpi_rma_recv_init(origin_addr, origin_count, origin_datatype,
207         smpi_group_index(smpi_comm_group(win->comm),target_rank), smpi_process_index(), SMPI_RMA_TAG+2, win->comm,
208         MPI_OP_NULL);
209
210     //start the send, with another process than us as sender. 
211     smpi_mpi_start(sreq);
212     //push request to receiver's win
213     xbt_mutex_acquire(send_win->mut);
214     send_win->requests->push_back(sreq);
215     xbt_mutex_release(send_win->mut);
216
217     //start recv
218     smpi_mpi_start(rreq);
219     //push request to sender's win
220     xbt_mutex_acquire(win->mut);
221     win->requests->push_back(rreq);
222     xbt_mutex_release(win->mut);
223   }else{
224     smpi_datatype_copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
225   }
226
227   return MPI_SUCCESS;
228 }
229
230
231 int smpi_mpi_accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
232               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Win win)
233 {
234   if(win->opened==0)//check that post/start has been done
235     return MPI_ERR_WIN;
236   //FIXME: local version 
237   //get receiver pointer
238   MPI_Win recv_win = win->connected_wins[target_rank];
239
240   void* recv_addr = static_cast<void*>(static_cast<char*>(recv_win->base) + target_disp * recv_win->disp_unit);
241   XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
242     //As the tag will be used for ordering of the operations, add count to it
243     //prepare send_request
244     MPI_Request sreq = smpi_rma_send_init(origin_addr, origin_count, origin_datatype,
245         smpi_process_index(), smpi_group_index(smpi_comm_group(win->comm),target_rank), SMPI_RMA_TAG+3+win->count, win->comm, op);
246
247     //prepare receiver request
248     MPI_Request rreq = smpi_rma_recv_init(recv_addr, target_count, target_datatype,
249         smpi_process_index(), smpi_group_index(smpi_comm_group(win->comm),target_rank), SMPI_RMA_TAG+3+win->count, recv_win->comm, op);
250
251     win->count++;
252     //push request to receiver's win
253     xbt_mutex_acquire(recv_win->mut);
254     recv_win->requests->push_back(rreq);
255     xbt_mutex_release(recv_win->mut);
256     //start send
257     smpi_mpi_start(sreq);
258
259     //push request to sender's win
260     xbt_mutex_acquire(win->mut);
261     win->requests->push_back(sreq);
262     xbt_mutex_release(win->mut);
263
264   return MPI_SUCCESS;
265 }
266
267 int smpi_mpi_win_start(MPI_Group group, int assert, MPI_Win win){
268     /* From MPI forum advices
269     The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
270     will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
271     the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
272     matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
273     MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
274     implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
275     to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
276     called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
277     origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
278     must complete, without further dependencies.  */
279
280   //naive, blocking implementation.
281   int i=0,j=0;
282   int size = smpi_group_size(group);
283   MPI_Request* reqs = xbt_new0(MPI_Request, size);
284
285   while(j!=size){
286     int src=smpi_group_index(group,j);
287     if(src!=smpi_process_index()&& src!=MPI_UNDEFINED){
288       reqs[i]=smpi_irecv_init(nullptr, 0, MPI_CHAR, src,SMPI_RMA_TAG+4, MPI_COMM_WORLD);
289       i++;
290     }
291     j++;
292   }
293   size=i;
294   smpi_mpi_startall(size, reqs);
295   smpi_mpi_waitall(size, reqs, MPI_STATUSES_IGNORE);
296   for(i=0;i<size;i++){
297     smpi_mpi_request_free(&reqs[i]);
298   }
299   xbt_free(reqs);
300   win->opened++; //we're open for business !
301   win->group=group;
302   smpi_group_use(group);
303   return MPI_SUCCESS;
304 }
305
306 int smpi_mpi_win_post(MPI_Group group, int assert, MPI_Win win){
307   //let's make a synchronous send here
308   int i=0,j=0;
309   int size = smpi_group_size(group);
310   MPI_Request* reqs = xbt_new0(MPI_Request, size);
311
312   while(j!=size){
313     int dst=smpi_group_index(group,j);
314     if(dst!=smpi_process_index() && dst!=MPI_UNDEFINED){
315       reqs[i]=smpi_mpi_send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG+4, MPI_COMM_WORLD);
316       i++;
317     }
318     j++;
319   }
320   size=i;
321
322   smpi_mpi_startall(size, reqs);
323   smpi_mpi_waitall(size, reqs, MPI_STATUSES_IGNORE);
324   for(i=0;i<size;i++){
325     smpi_mpi_request_free(&reqs[i]);
326   }
327   xbt_free(reqs);
328   win->opened++; //we're open for business !
329   win->group=group;
330   smpi_group_use(group);
331   return MPI_SUCCESS;
332 }
333
334 int smpi_mpi_win_complete(MPI_Win win){
335   if(win->opened==0)
336     xbt_die("Complete called on already opened MPI_Win");
337
338   XBT_DEBUG("Entering MPI_Win_Complete");
339   int i=0,j=0;
340   int size = smpi_group_size(win->group);
341   MPI_Request* reqs = xbt_new0(MPI_Request, size);
342
343   while(j!=size){
344     int dst=smpi_group_index(win->group,j);
345     if(dst!=smpi_process_index() && dst!=MPI_UNDEFINED){
346       reqs[i]=smpi_mpi_send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG+5, MPI_COMM_WORLD);
347       i++;
348     }
349     j++;
350   }
351   size=i;
352   XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
353   smpi_mpi_startall(size, reqs);
354   smpi_mpi_waitall(size, reqs, MPI_STATUSES_IGNORE);
355
356   for(i=0;i<size;i++){
357     smpi_mpi_request_free(&reqs[i]);
358   }
359   xbt_free(reqs);
360
361   //now we can finish RMA calls
362   xbt_mutex_acquire(win->mut);
363   std::vector<MPI_Request> *reqqs = win->requests;
364   size = static_cast<int>(reqqs->size());
365
366   XBT_DEBUG("Win_complete - Finishing %d RMA calls", size);
367   // start all requests that have been prepared by another process
368   for (auto req: *reqqs){
369     if (req && (req->flags & PREPARED))
370       smpi_mpi_start(req);
371   }
372
373   MPI_Request* treqs = &(*reqqs)[0];
374   smpi_mpi_waitall(size,treqs,MPI_STATUSES_IGNORE);
375   reqqs->clear();
376   xbt_mutex_release(win->mut);
377
378   smpi_group_unuse(win->group);
379   win->opened--; //we're closed for business !
380   return MPI_SUCCESS;
381 }
382
383 int smpi_mpi_win_wait(MPI_Win win){
384   //naive, blocking implementation.
385   XBT_DEBUG("Entering MPI_Win_Wait");
386   int i=0,j=0;
387   int size = smpi_group_size(win->group);
388   MPI_Request* reqs = xbt_new0(MPI_Request, size);
389
390   while(j!=size){
391     int src=smpi_group_index(win->group,j);
392     if(src!=smpi_process_index() && src!=MPI_UNDEFINED){
393       reqs[i]=smpi_irecv_init(nullptr, 0, MPI_CHAR, src,SMPI_RMA_TAG+5, MPI_COMM_WORLD);
394       i++;
395     }
396     j++;
397   }
398   size=i;
399   XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
400   smpi_mpi_startall(size, reqs);
401   smpi_mpi_waitall(size, reqs, MPI_STATUSES_IGNORE);
402   for(i=0;i<size;i++){
403     smpi_mpi_request_free(&reqs[i]);
404   }
405   xbt_free(reqs);
406   xbt_mutex_acquire(win->mut);
407   std::vector<MPI_Request> *reqqs = win->requests;
408   size = static_cast<int>(reqqs->size());
409
410   XBT_DEBUG("Win_wait - Finishing %d RMA calls", size);
411
412   // start all requests that have been prepared by another process
413   for(auto req: *reqqs){
414     if (req && (req->flags & PREPARED))
415       smpi_mpi_start(req);
416   }
417
418   MPI_Request* treqs = &(*reqqs)[0];
419   smpi_mpi_waitall(size,treqs,MPI_STATUSES_IGNORE);
420   reqqs->clear();
421   xbt_mutex_release(win->mut);
422
423   smpi_group_unuse(win->group);
424   win->opened--; //we're opened for business !
425   return MPI_SUCCESS;
426 }