Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
b91fd0f8b298d7bbef529a2f5ccc79fa77309478
[simgrid.git] / src / smpi / smpi_rma.cpp
1 /* Copyright (c) 2007-2015. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "private.h"
8 #include <vector>
9 #include "src/smpi/smpi_group.hpp"
10
11 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");
12
13 typedef struct s_smpi_mpi_win{
14   void* base;
15   MPI_Aint size;
16   int disp_unit;
17   MPI_Comm comm;
18   MPI_Info info;
19   int assert;
20   std::vector<MPI_Request> *requests;
21   xbt_mutex_t mut;
22   msg_bar_t bar;
23   MPI_Win* connected_wins;
24   char* name;
25   int opened;
26   MPI_Group group;
27   int count; //for ordering the accs
28 } s_smpi_mpi_win_t;
29
30
31 MPI_Win smpi_mpi_win_create( void *base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm){
32   int comm_size = smpi_comm_size(comm);
33   int rank      = smpi_comm_rank(comm);
34   XBT_DEBUG("Creating window");
35
36   MPI_Win win    = xbt_new(s_smpi_mpi_win_t, 1);
37   win->base = base;
38   win->size = size;
39   win->disp_unit = disp_unit;
40   win->assert = 0;
41   win->info = info;
42   if(info!=MPI_INFO_NULL)
43     info->refcount++;
44   win->comm = comm;
45   win->name = nullptr;
46   win->opened = 0;
47   win->group = MPI_GROUP_NULL;
48   win->requests = new std::vector<MPI_Request>();
49   win->mut=xbt_mutex_init();
50   win->connected_wins = xbt_new0(MPI_Win, comm_size);
51   win->connected_wins[rank] = win;
52   win->count = 0;
53   if(rank==0){
54     win->bar = MSG_barrier_init(comm_size);
55   }
56   mpi_coll_allgather_fun(&(win->connected_wins[rank]), sizeof(MPI_Win), MPI_BYTE, win->connected_wins, sizeof(MPI_Win),
57                          MPI_BYTE, comm);
58
59   mpi_coll_bcast_fun(&(win->bar), sizeof(msg_bar_t), MPI_BYTE, 0, comm);
60
61   mpi_coll_barrier_fun(comm);
62
63   return win;
64 }
65
66 int smpi_mpi_win_free( MPI_Win* win){
67   //As per the standard, perform a barrier to ensure every async comm is finished
68   MSG_barrier_wait((*win)->bar);
69   xbt_mutex_acquire((*win)->mut);
70   delete (*win)->requests;
71   xbt_mutex_release((*win)->mut);
72   xbt_free((*win)->connected_wins);
73   if ((*win)->name != nullptr){
74     xbt_free((*win)->name);
75   }
76   if((*win)->info!=MPI_INFO_NULL){
77     MPI_Info_free(&(*win)->info);
78   }
79
80   mpi_coll_barrier_fun((*win)->comm);
81   int rank=smpi_comm_rank((*win)->comm);
82   if(rank == 0)
83     MSG_barrier_destroy((*win)->bar);
84   xbt_mutex_destroy((*win)->mut);
85   xbt_free(*win);
86   *win = MPI_WIN_NULL;
87   return MPI_SUCCESS;
88 }
89
90 void smpi_mpi_win_get_name(MPI_Win win, char* name, int* length){
91   if(win->name==nullptr){
92     *length=0;
93     name=nullptr;
94     return;
95   }
96   *length = strlen(win->name);
97   strncpy(name, win->name, *length+1);
98 }
99
100 void smpi_mpi_win_get_group(MPI_Win win, MPI_Group* group){
101   if(win->comm != MPI_COMM_NULL){
102     *group = smpi_comm_group(win->comm);
103   } else {
104     *group = MPI_GROUP_NULL;
105   }
106 }
107
108 void smpi_mpi_win_set_name(MPI_Win win, char* name){
109   win->name = xbt_strdup(name);
110 }
111
112 int smpi_mpi_win_fence(int assert, MPI_Win win)
113 {
114   XBT_DEBUG("Entering fence");
115   if (win->opened == 0)
116     win->opened=1;
117   if (assert != MPI_MODE_NOPRECEDE) {
118     // This is not the first fence => finalize what came before
119     MSG_barrier_wait(win->bar);
120     xbt_mutex_acquire(win->mut);
121     // This (simulated) mutex ensures that no process pushes to the vector of requests during the waitall.
122     // Without this, the vector could get redimensionned when another process pushes.
123     // This would result in the array used by smpi_mpi_waitall() to be invalidated.
124     // Another solution would be to copy the data and cleanup the vector *before* smpi_mpi_waitall
125     std::vector<MPI_Request> *reqs = win->requests;
126     int size = static_cast<int>(reqs->size());
127     // start all requests that have been prepared by another process
128     if (size > 0) {
129       for (const auto& req : *reqs) {
130         if (req && (req->flags & PREPARED))
131           smpi_mpi_start(req);
132       }
133
134       MPI_Request* treqs = &(*reqs)[0];
135
136       smpi_mpi_waitall(size, treqs, MPI_STATUSES_IGNORE);
137     }
138     win->count=0;
139     xbt_mutex_release(win->mut);
140   }
141   win->assert = assert;
142
143   MSG_barrier_wait(win->bar);
144   XBT_DEBUG("Leaving fence");
145
146   return MPI_SUCCESS;
147 }
148
149 int smpi_mpi_put( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
150               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Win win)
151 {
152   if(win->opened==0)//check that post/start has been done
153     return MPI_ERR_WIN;
154   //get receiver pointer
155   MPI_Win recv_win = win->connected_wins[target_rank];
156
157   void* recv_addr = static_cast<void*> ( static_cast<char*>(recv_win->base) + target_disp * recv_win->disp_unit);
158   XBT_DEBUG("Entering MPI_Put to %d", target_rank);
159
160   if(target_rank != smpi_comm_rank(win->comm)){
161     //prepare send_request
162     MPI_Request sreq = smpi_rma_send_init(origin_addr, origin_count, origin_datatype, smpi_process_index(),
163         smpi_comm_group(win->comm)->index(target_rank), SMPI_RMA_TAG+1, win->comm, MPI_OP_NULL);
164
165     //prepare receiver request
166     MPI_Request rreq = smpi_rma_recv_init(recv_addr, target_count, target_datatype, smpi_process_index(),
167         smpi_comm_group(win->comm)->index(target_rank), SMPI_RMA_TAG+1, recv_win->comm, MPI_OP_NULL);
168
169     //push request to receiver's win
170     xbt_mutex_acquire(recv_win->mut);
171     recv_win->requests->push_back(rreq);
172     xbt_mutex_release(recv_win->mut);
173     //start send
174     smpi_mpi_start(sreq);
175
176     //push request to sender's win
177     xbt_mutex_acquire(win->mut);
178     win->requests->push_back(sreq);
179     xbt_mutex_release(win->mut);
180   }else{
181     smpi_datatype_copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
182   }
183
184   return MPI_SUCCESS;
185 }
186
187 int smpi_mpi_get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
188               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Win win)
189 {
190   if(win->opened==0)//check that post/start has been done
191     return MPI_ERR_WIN;
192   //get sender pointer
193   MPI_Win send_win = win->connected_wins[target_rank];
194
195   void* send_addr = static_cast<void*>(static_cast<char*>(send_win->base) + target_disp * send_win->disp_unit);
196   XBT_DEBUG("Entering MPI_Get from %d", target_rank);
197
198   if(target_rank != smpi_comm_rank(win->comm)){
199     //prepare send_request
200     MPI_Request sreq = smpi_rma_send_init(send_addr, target_count, target_datatype,
201         smpi_comm_group(win->comm)->index(target_rank), smpi_process_index(), SMPI_RMA_TAG+2, send_win->comm,
202         MPI_OP_NULL);
203
204     //prepare receiver request
205     MPI_Request rreq = smpi_rma_recv_init(origin_addr, origin_count, origin_datatype,
206         smpi_comm_group(win->comm)->index(target_rank), smpi_process_index(), SMPI_RMA_TAG+2, win->comm,
207         MPI_OP_NULL);
208
209     //start the send, with another process than us as sender. 
210     smpi_mpi_start(sreq);
211     //push request to receiver's win
212     xbt_mutex_acquire(send_win->mut);
213     send_win->requests->push_back(sreq);
214     xbt_mutex_release(send_win->mut);
215
216     //start recv
217     smpi_mpi_start(rreq);
218     //push request to sender's win
219     xbt_mutex_acquire(win->mut);
220     win->requests->push_back(rreq);
221     xbt_mutex_release(win->mut);
222   }else{
223     smpi_datatype_copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
224   }
225
226   return MPI_SUCCESS;
227 }
228
229
230 int smpi_mpi_accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
231               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Win win)
232 {
233   if(win->opened==0)//check that post/start has been done
234     return MPI_ERR_WIN;
235   //FIXME: local version 
236   //get receiver pointer
237   MPI_Win recv_win = win->connected_wins[target_rank];
238
239   void* recv_addr = static_cast<void*>(static_cast<char*>(recv_win->base) + target_disp * recv_win->disp_unit);
240   XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
241     //As the tag will be used for ordering of the operations, add count to it
242     //prepare send_request
243     MPI_Request sreq = smpi_rma_send_init(origin_addr, origin_count, origin_datatype,
244         smpi_process_index(), smpi_comm_group(win->comm)->index(target_rank), SMPI_RMA_TAG+3+win->count, win->comm, op);
245
246     //prepare receiver request
247     MPI_Request rreq = smpi_rma_recv_init(recv_addr, target_count, target_datatype,
248         smpi_process_index(), smpi_comm_group(win->comm)->index(target_rank), SMPI_RMA_TAG+3+win->count, recv_win->comm, op);
249
250     win->count++;
251     //push request to receiver's win
252     xbt_mutex_acquire(recv_win->mut);
253     recv_win->requests->push_back(rreq);
254     xbt_mutex_release(recv_win->mut);
255     //start send
256     smpi_mpi_start(sreq);
257
258     //push request to sender's win
259     xbt_mutex_acquire(win->mut);
260     win->requests->push_back(sreq);
261     xbt_mutex_release(win->mut);
262
263   return MPI_SUCCESS;
264 }
265
266 int smpi_mpi_win_start(MPI_Group group, int assert, MPI_Win win){
267     /* From MPI forum advices
268     The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
269     will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
270     the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
271     matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
272     MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
273     implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
274     to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
275     called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
276     origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
277     must complete, without further dependencies.  */
278
279   //naive, blocking implementation.
280     int i             = 0;
281     int j             = 0;
282     int size          = group->getsize();
283     MPI_Request* reqs = xbt_new0(MPI_Request, size);
284
285     while (j != size) {
286       int src = group->index(j);
287       if (src != smpi_process_index() && src != MPI_UNDEFINED) {
288         reqs[i] = smpi_irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 4, MPI_COMM_WORLD);
289         i++;
290       }
291       j++;
292   }
293   size=i;
294   smpi_mpi_startall(size, reqs);
295   smpi_mpi_waitall(size, reqs, MPI_STATUSES_IGNORE);
296   for(i=0;i<size;i++){
297     smpi_mpi_request_free(&reqs[i]);
298   }
299   xbt_free(reqs);
300   win->opened++; //we're open for business !
301   win->group=group;
302   group->use();
303   return MPI_SUCCESS;
304 }
305
306 int smpi_mpi_win_post(MPI_Group group, int assert, MPI_Win win){
307   //let's make a synchronous send here
308   int i             = 0;
309   int j             = 0;
310   int size = group->getsize();
311   MPI_Request* reqs = xbt_new0(MPI_Request, size);
312
313   while(j!=size){
314     int dst=group->index(j);
315     if(dst!=smpi_process_index() && dst!=MPI_UNDEFINED){
316       reqs[i]=smpi_mpi_send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG+4, MPI_COMM_WORLD);
317       i++;
318     }
319     j++;
320   }
321   size=i;
322
323   smpi_mpi_startall(size, reqs);
324   smpi_mpi_waitall(size, reqs, MPI_STATUSES_IGNORE);
325   for(i=0;i<size;i++){
326     smpi_mpi_request_free(&reqs[i]);
327   }
328   xbt_free(reqs);
329   win->opened++; //we're open for business !
330   win->group=group;
331   group->use();
332   return MPI_SUCCESS;
333 }
334
335 int smpi_mpi_win_complete(MPI_Win win){
336   if(win->opened==0)
337     xbt_die("Complete called on already opened MPI_Win");
338
339   XBT_DEBUG("Entering MPI_Win_Complete");
340   int i             = 0;
341   int j             = 0;
342   int size = win->group->getsize();
343   MPI_Request* reqs = xbt_new0(MPI_Request, size);
344
345   while(j!=size){
346     int dst=win->group->index(j);
347     if(dst!=smpi_process_index() && dst!=MPI_UNDEFINED){
348       reqs[i]=smpi_mpi_send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG+5, MPI_COMM_WORLD);
349       i++;
350     }
351     j++;
352   }
353   size=i;
354   XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
355   smpi_mpi_startall(size, reqs);
356   smpi_mpi_waitall(size, reqs, MPI_STATUSES_IGNORE);
357
358   for(i=0;i<size;i++){
359     smpi_mpi_request_free(&reqs[i]);
360   }
361   xbt_free(reqs);
362
363   //now we can finish RMA calls
364   xbt_mutex_acquire(win->mut);
365   std::vector<MPI_Request> *reqqs = win->requests;
366   size = static_cast<int>(reqqs->size());
367
368   XBT_DEBUG("Win_complete - Finishing %d RMA calls", size);
369   if (size > 0) {
370     // start all requests that have been prepared by another process
371     for (const auto& req : *reqqs) {
372       if (req && (req->flags & PREPARED))
373         smpi_mpi_start(req);
374     }
375
376     MPI_Request* treqs = &(*reqqs)[0];
377     smpi_mpi_waitall(size, treqs, MPI_STATUSES_IGNORE);
378     reqqs->clear();
379   }
380   xbt_mutex_release(win->mut);
381
382   win->group->unuse();
383   win->opened--; //we're closed for business !
384   return MPI_SUCCESS;
385 }
386
387 int smpi_mpi_win_wait(MPI_Win win){
388   //naive, blocking implementation.
389   XBT_DEBUG("Entering MPI_Win_Wait");
390   int i=0,j=0;
391   int size = win->group->getsize();
392   MPI_Request* reqs = xbt_new0(MPI_Request, size);
393
394   while(j!=size){
395     int src=win->group->index(j);
396     if(src!=smpi_process_index() && src!=MPI_UNDEFINED){
397       reqs[i]=smpi_irecv_init(nullptr, 0, MPI_CHAR, src,SMPI_RMA_TAG+5, MPI_COMM_WORLD);
398       i++;
399     }
400     j++;
401   }
402   size=i;
403   XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
404   smpi_mpi_startall(size, reqs);
405   smpi_mpi_waitall(size, reqs, MPI_STATUSES_IGNORE);
406   for(i=0;i<size;i++){
407     smpi_mpi_request_free(&reqs[i]);
408   }
409   xbt_free(reqs);
410   xbt_mutex_acquire(win->mut);
411   std::vector<MPI_Request> *reqqs = win->requests;
412   size = static_cast<int>(reqqs->size());
413
414   XBT_DEBUG("Win_wait - Finishing %d RMA calls", size);
415   if (size > 0) {
416     // start all requests that have been prepared by another process
417     for (const auto& req : *reqqs) {
418       if (req && (req->flags & PREPARED))
419         smpi_mpi_start(req);
420     }
421
422     MPI_Request* treqs = &(*reqqs)[0];
423     smpi_mpi_waitall(size, treqs, MPI_STATUSES_IGNORE);
424     reqqs->clear();
425   }
426   xbt_mutex_release(win->mut);
427
428   win->group->unuse();
429   win->opened--; //we're opened for business !
430   return MPI_SUCCESS;
431 }