Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
WIP stop using const char* in C++ layers
[simgrid.git] / src / smpi / smpi_rma.cpp
1 /* Copyright (c) 2007-2015. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "private.h"
8 #include <vector>
9
10 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");
11
12 typedef struct s_smpi_mpi_win{
13   void* base;
14   MPI_Aint size;
15   int disp_unit;
16   MPI_Comm comm;
17   MPI_Info info;
18   int assert;
19   std::vector<MPI_Request> *requests;
20   xbt_mutex_t mut;
21   msg_bar_t bar;
22   MPI_Win* connected_wins;
23   char* name;
24   int opened;
25   MPI_Group group;
26   int count; //for ordering the accs
27 } s_smpi_mpi_win_t;
28
29
30 MPI_Win smpi_mpi_win_create( void *base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm){
31   MPI_Win win;
32
33   int comm_size = smpi_comm_size(comm);
34   int rank=smpi_comm_rank(comm);
35   XBT_DEBUG("Creating window");
36
37   win = xbt_new(s_smpi_mpi_win_t, 1);
38   win->base = base;
39   win->size = size;
40   win->disp_unit = disp_unit;
41   win->assert = 0;
42   win->info = info;
43   if(info!=MPI_INFO_NULL)
44     info->refcount++;
45   win->comm = comm;
46   win->name = nullptr;
47   win->opened = 0;
48   win->group = MPI_GROUP_NULL;
49   win->requests = new std::vector<MPI_Request>();
50   win->mut=xbt_mutex_init();
51   win->connected_wins = xbt_new0(MPI_Win, comm_size);
52   win->connected_wins[rank] = win;
53   win->count = 0;
54   if(rank==0){
55     win->bar = MSG_barrier_init(comm_size);
56   }
57   mpi_coll_allgather_fun(&(win->connected_wins[rank]), sizeof(MPI_Win), MPI_BYTE, win->connected_wins, sizeof(MPI_Win),
58                          MPI_BYTE, comm);
59
60   mpi_coll_bcast_fun(&(win->bar), sizeof(msg_bar_t), MPI_BYTE, 0, comm);
61
62   mpi_coll_barrier_fun(comm);
63
64   return win;
65 }
66
67 int smpi_mpi_win_free( MPI_Win* win){
68   //As per the standard, perform a barrier to ensure every async comm is finished
69   MSG_barrier_wait((*win)->bar);
70   xbt_mutex_acquire((*win)->mut);
71   delete (*win)->requests;
72   xbt_mutex_release((*win)->mut);
73   xbt_free((*win)->connected_wins);
74   if ((*win)->name != nullptr){
75     xbt_free((*win)->name);
76   }
77   if((*win)->info!=MPI_INFO_NULL){
78     MPI_Info_free(&(*win)->info);
79   }
80
81   mpi_coll_barrier_fun((*win)->comm);
82   int rank=smpi_comm_rank((*win)->comm);
83   if(rank == 0)
84     MSG_barrier_destroy((*win)->bar);
85   xbt_mutex_destroy((*win)->mut);
86   xbt_free(*win);
87   *win = MPI_WIN_NULL;
88   return MPI_SUCCESS;
89 }
90
91 void smpi_mpi_win_get_name(MPI_Win win, char* name, int* length){
92   if(win->name==nullptr){
93     *length=0;
94     name=nullptr;
95     return;
96   }
97   *length = strlen(win->name);
98   strncpy(name, win->name, *length+1);
99 }
100
101 void smpi_mpi_win_get_group(MPI_Win win, MPI_Group* group){
102   if(win->comm != MPI_COMM_NULL){
103     *group = smpi_comm_group(win->comm);
104   } else {
105     *group = MPI_GROUP_NULL;
106   }
107 }
108
109 void smpi_mpi_win_set_name(MPI_Win win, char* name){
110   win->name = xbt_strdup(name);
111 }
112
113 int smpi_mpi_win_fence(int assert, MPI_Win win)
114 {
115   XBT_DEBUG("Entering fence");
116   if (win->opened == 0)
117     win->opened=1;
118   if (assert != MPI_MODE_NOPRECEDE) {
119     // This is not the first fence => finalize what came before
120     MSG_barrier_wait(win->bar);
121     xbt_mutex_acquire(win->mut);
122     // This (simulated) mutex ensures that no process pushes to the vector of requests during the waitall.
123     // Without this, the vector could get redimensionned when another process pushes.
124     // This would result in the array used by smpi_mpi_waitall() to be invalidated.
125     // Another solution would be to copy the data and cleanup the vector *before* smpi_mpi_waitall
126     std::vector<MPI_Request> *reqs = win->requests;
127     int size = static_cast<int>(reqs->size());
128     // start all requests that have been prepared by another process
129     if (size > 0) {
130       for (const auto& req : *reqs) {
131         if (req && (req->flags & PREPARED))
132           smpi_mpi_start(req);
133       }
134
135       MPI_Request* treqs = &(*reqs)[0];
136
137       smpi_mpi_waitall(size, treqs, MPI_STATUSES_IGNORE);
138     }
139     win->count=0;
140     xbt_mutex_release(win->mut);
141   }
142   win->assert = assert;
143
144   MSG_barrier_wait(win->bar);
145   XBT_DEBUG("Leaving fence");
146
147   return MPI_SUCCESS;
148 }
149
150 int smpi_mpi_put( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
151               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Win win)
152 {
153   if(win->opened==0)//check that post/start has been done
154     return MPI_ERR_WIN;
155   //get receiver pointer
156   MPI_Win recv_win = win->connected_wins[target_rank];
157
158   void* recv_addr = static_cast<void*> ( static_cast<char*>(recv_win->base) + target_disp * recv_win->disp_unit);
159   XBT_DEBUG("Entering MPI_Put to %d", target_rank);
160
161   if(target_rank != smpi_comm_rank(win->comm)){
162     //prepare send_request
163     MPI_Request sreq = smpi_rma_send_init(origin_addr, origin_count, origin_datatype, smpi_process_index(),
164         smpi_group_index(smpi_comm_group(win->comm),target_rank), SMPI_RMA_TAG+1, win->comm, MPI_OP_NULL);
165
166     //prepare receiver request
167     MPI_Request rreq = smpi_rma_recv_init(recv_addr, target_count, target_datatype, smpi_process_index(),
168         smpi_group_index(smpi_comm_group(win->comm),target_rank), SMPI_RMA_TAG+1, recv_win->comm, MPI_OP_NULL);
169
170     //push request to receiver's win
171     xbt_mutex_acquire(recv_win->mut);
172     recv_win->requests->push_back(rreq);
173     xbt_mutex_release(recv_win->mut);
174     //start send
175     smpi_mpi_start(sreq);
176
177     //push request to sender's win
178     xbt_mutex_acquire(win->mut);
179     win->requests->push_back(sreq);
180     xbt_mutex_release(win->mut);
181   }else{
182     smpi_datatype_copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
183   }
184
185   return MPI_SUCCESS;
186 }
187
188 int smpi_mpi_get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
189               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Win win)
190 {
191   if(win->opened==0)//check that post/start has been done
192     return MPI_ERR_WIN;
193   //get sender pointer
194   MPI_Win send_win = win->connected_wins[target_rank];
195
196   void* send_addr = static_cast<void*>(static_cast<char*>(send_win->base) + target_disp * send_win->disp_unit);
197   XBT_DEBUG("Entering MPI_Get from %d", target_rank);
198
199   if(target_rank != smpi_comm_rank(win->comm)){
200     //prepare send_request
201     MPI_Request sreq = smpi_rma_send_init(send_addr, target_count, target_datatype,
202         smpi_group_index(smpi_comm_group(win->comm),target_rank), smpi_process_index(), SMPI_RMA_TAG+2, send_win->comm,
203         MPI_OP_NULL);
204
205     //prepare receiver request
206     MPI_Request rreq = smpi_rma_recv_init(origin_addr, origin_count, origin_datatype,
207         smpi_group_index(smpi_comm_group(win->comm),target_rank), smpi_process_index(), SMPI_RMA_TAG+2, win->comm,
208         MPI_OP_NULL);
209
210     //start the send, with another process than us as sender. 
211     smpi_mpi_start(sreq);
212     //push request to receiver's win
213     xbt_mutex_acquire(send_win->mut);
214     send_win->requests->push_back(sreq);
215     xbt_mutex_release(send_win->mut);
216
217     //start recv
218     smpi_mpi_start(rreq);
219     //push request to sender's win
220     xbt_mutex_acquire(win->mut);
221     win->requests->push_back(rreq);
222     xbt_mutex_release(win->mut);
223   }else{
224     smpi_datatype_copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
225   }
226
227   return MPI_SUCCESS;
228 }
229
230
231 int smpi_mpi_accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
232               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Win win)
233 {
234   if(win->opened==0)//check that post/start has been done
235     return MPI_ERR_WIN;
236   //FIXME: local version 
237   //get receiver pointer
238   MPI_Win recv_win = win->connected_wins[target_rank];
239
240   void* recv_addr = static_cast<void*>(static_cast<char*>(recv_win->base) + target_disp * recv_win->disp_unit);
241   XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
242     //As the tag will be used for ordering of the operations, add count to it
243     //prepare send_request
244     MPI_Request sreq = smpi_rma_send_init(origin_addr, origin_count, origin_datatype,
245         smpi_process_index(), smpi_group_index(smpi_comm_group(win->comm),target_rank), SMPI_RMA_TAG+3+win->count, win->comm, op);
246
247     //prepare receiver request
248     MPI_Request rreq = smpi_rma_recv_init(recv_addr, target_count, target_datatype,
249         smpi_process_index(), smpi_group_index(smpi_comm_group(win->comm),target_rank), SMPI_RMA_TAG+3+win->count, recv_win->comm, op);
250
251     win->count++;
252     //push request to receiver's win
253     xbt_mutex_acquire(recv_win->mut);
254     recv_win->requests->push_back(rreq);
255     xbt_mutex_release(recv_win->mut);
256     //start send
257     smpi_mpi_start(sreq);
258
259     //push request to sender's win
260     xbt_mutex_acquire(win->mut);
261     win->requests->push_back(sreq);
262     xbt_mutex_release(win->mut);
263
264   return MPI_SUCCESS;
265 }
266
267 int smpi_mpi_win_start(MPI_Group group, int assert, MPI_Win win){
268     /* From MPI forum advices
269     The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
270     will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
271     the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
272     matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
273     MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
274     implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
275     to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
276     called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
277     origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
278     must complete, without further dependencies.  */
279
280   //naive, blocking implementation.
281   int i=0,j=0;
282   int size = smpi_group_size(group);
283   MPI_Request* reqs = xbt_new0(MPI_Request, size);
284
285   while(j!=size){
286     int src=smpi_group_index(group,j);
287     if(src!=smpi_process_index()&& src!=MPI_UNDEFINED){
288       reqs[i]=smpi_irecv_init(nullptr, 0, MPI_CHAR, src,SMPI_RMA_TAG+4, MPI_COMM_WORLD);
289       i++;
290     }
291     j++;
292   }
293   size=i;
294   smpi_mpi_startall(size, reqs);
295   smpi_mpi_waitall(size, reqs, MPI_STATUSES_IGNORE);
296   for(i=0;i<size;i++){
297     smpi_mpi_request_free(&reqs[i]);
298   }
299   xbt_free(reqs);
300   win->opened++; //we're open for business !
301   win->group=group;
302   smpi_group_use(group);
303   return MPI_SUCCESS;
304 }
305
306 int smpi_mpi_win_post(MPI_Group group, int assert, MPI_Win win){
307   //let's make a synchronous send here
308   int i=0,j=0;
309   int size = smpi_group_size(group);
310   MPI_Request* reqs = xbt_new0(MPI_Request, size);
311
312   while(j!=size){
313     int dst=smpi_group_index(group,j);
314     if(dst!=smpi_process_index() && dst!=MPI_UNDEFINED){
315       reqs[i]=smpi_mpi_send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG+4, MPI_COMM_WORLD);
316       i++;
317     }
318     j++;
319   }
320   size=i;
321
322   smpi_mpi_startall(size, reqs);
323   smpi_mpi_waitall(size, reqs, MPI_STATUSES_IGNORE);
324   for(i=0;i<size;i++){
325     smpi_mpi_request_free(&reqs[i]);
326   }
327   xbt_free(reqs);
328   win->opened++; //we're open for business !
329   win->group=group;
330   smpi_group_use(group);
331   return MPI_SUCCESS;
332 }
333
334 int smpi_mpi_win_complete(MPI_Win win){
335   if(win->opened==0)
336     xbt_die("Complete called on already opened MPI_Win");
337
338   XBT_DEBUG("Entering MPI_Win_Complete");
339   int i=0,j=0;
340   int size = smpi_group_size(win->group);
341   MPI_Request* reqs = xbt_new0(MPI_Request, size);
342
343   while(j!=size){
344     int dst=smpi_group_index(win->group,j);
345     if(dst!=smpi_process_index() && dst!=MPI_UNDEFINED){
346       reqs[i]=smpi_mpi_send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG+5, MPI_COMM_WORLD);
347       i++;
348     }
349     j++;
350   }
351   size=i;
352   XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
353   smpi_mpi_startall(size, reqs);
354   smpi_mpi_waitall(size, reqs, MPI_STATUSES_IGNORE);
355
356   for(i=0;i<size;i++){
357     smpi_mpi_request_free(&reqs[i]);
358   }
359   xbt_free(reqs);
360
361   //now we can finish RMA calls
362   xbt_mutex_acquire(win->mut);
363   std::vector<MPI_Request> *reqqs = win->requests;
364   size = static_cast<int>(reqqs->size());
365
366   XBT_DEBUG("Win_complete - Finishing %d RMA calls", size);
367   if (size > 0) {
368     // start all requests that have been prepared by another process
369     for (const auto& req : *reqqs) {
370       if (req && (req->flags & PREPARED))
371         smpi_mpi_start(req);
372     }
373
374     MPI_Request* treqs = &(*reqqs)[0];
375     smpi_mpi_waitall(size, treqs, MPI_STATUSES_IGNORE);
376     reqqs->clear();
377   }
378   xbt_mutex_release(win->mut);
379
380   smpi_group_unuse(win->group);
381   win->opened--; //we're closed for business !
382   return MPI_SUCCESS;
383 }
384
385 int smpi_mpi_win_wait(MPI_Win win){
386   //naive, blocking implementation.
387   XBT_DEBUG("Entering MPI_Win_Wait");
388   int i=0,j=0;
389   int size = smpi_group_size(win->group);
390   MPI_Request* reqs = xbt_new0(MPI_Request, size);
391
392   while(j!=size){
393     int src=smpi_group_index(win->group,j);
394     if(src!=smpi_process_index() && src!=MPI_UNDEFINED){
395       reqs[i]=smpi_irecv_init(nullptr, 0, MPI_CHAR, src,SMPI_RMA_TAG+5, MPI_COMM_WORLD);
396       i++;
397     }
398     j++;
399   }
400   size=i;
401   XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
402   smpi_mpi_startall(size, reqs);
403   smpi_mpi_waitall(size, reqs, MPI_STATUSES_IGNORE);
404   for(i=0;i<size;i++){
405     smpi_mpi_request_free(&reqs[i]);
406   }
407   xbt_free(reqs);
408   xbt_mutex_acquire(win->mut);
409   std::vector<MPI_Request> *reqqs = win->requests;
410   size = static_cast<int>(reqqs->size());
411
412   XBT_DEBUG("Win_wait - Finishing %d RMA calls", size);
413   if (size > 0) {
414     // start all requests that have been prepared by another process
415     for (const auto& req : *reqqs) {
416       if (req && (req->flags & PREPARED))
417         smpi_mpi_start(req);
418     }
419
420     MPI_Request* treqs = &(*reqqs)[0];
421     smpi_mpi_waitall(size, treqs, MPI_STATUSES_IGNORE);
422     reqqs->clear();
423   }
424   xbt_mutex_release(win->mut);
425
426   smpi_group_unuse(win->group);
427   win->opened--; //we're opened for business !
428   return MPI_SUCCESS;
429 }