Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
a2c2220a3e0887be446210d8d2f1cc227a00c7ec
[simgrid.git] / src / smpi / smpi_win.cpp
1 /* Copyright (c) 2007-2015. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "private.h"
8 #include <vector>
9
10 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");
11
12 namespace simgrid{
13 namespace smpi{
14 std::unordered_map<int, smpi_key_elem> Win::keyvals_;
15 int Win::keyval_id_=0;
16
17 Win::Win(void *base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm): base_(base), size_(size), disp_unit_(disp_unit), assert_(0), info_(info), comm_(comm){
18   int comm_size = comm->size();
19   int rank      = comm->rank();
20   XBT_DEBUG("Creating window");
21   if(info!=MPI_INFO_NULL)
22     info->ref();
23   name_ = nullptr;
24   opened_ = 0;
25   group_ = MPI_GROUP_NULL;
26   requests_ = new std::vector<MPI_Request>();
27   mut_=xbt_mutex_init();
28   connected_wins_ = new MPI_Win[comm_size];
29   connected_wins_[rank] = this;
30   count_ = 0;
31   if(rank==0){
32     bar_ = MSG_barrier_init(comm_size);
33   }
34   Colls::allgather(&(connected_wins_[rank]), sizeof(MPI_Win), MPI_BYTE, connected_wins_, sizeof(MPI_Win),
35                          MPI_BYTE, comm);
36
37   Colls::bcast(&(bar_), sizeof(msg_bar_t), MPI_BYTE, 0, comm);
38
39   Colls::barrier(comm);
40 }
41
42 Win::~Win(){
43   //As per the standard, perform a barrier to ensure every async comm is finished
44   MSG_barrier_wait(bar_);
45   xbt_mutex_acquire(mut_);
46   delete requests_;
47   xbt_mutex_release(mut_);
48   delete[] connected_wins_;
49   if (name_ != nullptr){
50     xbt_free(name_);
51   }
52   if(info_!=MPI_INFO_NULL){
53     MPI_Info_free(&info_);
54   }
55
56   Colls::barrier(comm_);
57   int rank=comm_->rank();
58   if(rank == 0)
59     MSG_barrier_destroy(bar_);
60   xbt_mutex_destroy(mut_);
61
62   if(!attributes_.empty()){
63     int flag;
64     for(auto it = attributes_.begin(); it != attributes_.end(); it++){
65       try{
66         smpi_key_elem elem = keyvals_.at((*it).first);
67         if (elem != nullptr && elem->delete_fn.win_delete_fn != nullptr)
68           elem->delete_fn.win_delete_fn(this, (*it).first, (*it).second, &flag);
69       }catch(const std::out_of_range& oor) {
70         //already deleted, not a problem;
71       }
72     }
73   }
74
75 }
76
77 void Win::get_name(char* name, int* length){
78   if(name_==nullptr){
79     *length=0;
80     name=nullptr;
81     return;
82   }
83   *length = strlen(name_);
84   strncpy(name, name_, *length+1);
85 }
86
87 void Win::get_group(MPI_Group* group){
88   if(comm_ != MPI_COMM_NULL){
89     *group = comm_->group();
90   } else {
91     *group = MPI_GROUP_NULL;
92   }
93 }
94
95 MPI_Aint Win::size(){
96   return size_;
97 }
98
99 void* Win::base(){
100   return base_;
101 }
102
103 int Win::disp_unit(){
104   return disp_unit_;
105 }
106
107
108 void Win::set_name(char* name){
109   name_ = xbt_strdup(name);
110 }
111
112 int Win::fence(int assert)
113 {
114   XBT_DEBUG("Entering fence");
115   if (opened_ == 0)
116     opened_=1;
117   if (assert != MPI_MODE_NOPRECEDE) {
118     // This is not the first fence => finalize what came before
119     MSG_barrier_wait(bar_);
120     xbt_mutex_acquire(mut_);
121     // This (simulated) mutex ensures that no process pushes to the vector of requests during the waitall.
122     // Without this, the vector could get redimensionned when another process pushes.
123     // This would result in the array used by Request::waitall() to be invalidated.
124     // Another solution would be to copy the data and cleanup the vector *before* Request::waitall
125     std::vector<MPI_Request> *reqs = requests_;
126     int size = static_cast<int>(reqs->size());
127     // start all requests that have been prepared by another process
128     if (size > 0) {
129       for (const auto& req : *reqs) {
130         if (req && (req->flags() & PREPARED))
131           req->start();
132       }
133
134       MPI_Request* treqs = &(*reqs)[0];
135
136       Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
137     }
138     count_=0;
139     xbt_mutex_release(mut_);
140   }
141   assert_ = assert;
142
143   MSG_barrier_wait(bar_);
144   XBT_DEBUG("Leaving fence");
145
146   return MPI_SUCCESS;
147 }
148
149 int Win::put( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
150               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype)
151 {
152   if(opened_==0)//check that post/start has been done
153     return MPI_ERR_WIN;
154   //get receiver pointer
155   MPI_Win recv_win = connected_wins_[target_rank];
156
157   if(target_count*target_datatype->get_extent()>recv_win->size_)
158     return MPI_ERR_ARG;
159
160   void* recv_addr = static_cast<void*> ( static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_);
161   XBT_DEBUG("Entering MPI_Put to %d", target_rank);
162
163   if(target_rank != comm_->rank()){
164     //prepare send_request
165     MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype, smpi_process_index(),
166         comm_->group()->index(target_rank), SMPI_RMA_TAG+1, comm_, MPI_OP_NULL);
167
168     //prepare receiver request
169     MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, smpi_process_index(),
170         comm_->group()->index(target_rank), SMPI_RMA_TAG+1, recv_win->comm_, MPI_OP_NULL);
171
172     //push request to receiver's win
173     xbt_mutex_acquire(recv_win->mut_);
174     recv_win->requests_->push_back(rreq);
175     xbt_mutex_release(recv_win->mut_);
176     //start send
177     sreq->start();
178
179     //push request to sender's win
180     xbt_mutex_acquire(mut_);
181     requests_->push_back(sreq);
182     xbt_mutex_release(mut_);
183   }else{
184     Datatype::copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
185   }
186
187   return MPI_SUCCESS;
188 }
189
190 int Win::get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
191               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype)
192 {
193   if(opened_==0)//check that post/start has been done
194     return MPI_ERR_WIN;
195   //get sender pointer
196   MPI_Win send_win = connected_wins_[target_rank];
197
198   if(target_count*target_datatype->get_extent()>send_win->size_)
199     return MPI_ERR_ARG;
200
201   void* send_addr = static_cast<void*>(static_cast<char*>(send_win->base_) + target_disp * send_win->disp_unit_);
202   XBT_DEBUG("Entering MPI_Get from %d", target_rank);
203
204   if(target_rank != comm_->rank()){
205     //prepare send_request
206     MPI_Request sreq = Request::rma_send_init(send_addr, target_count, target_datatype,
207         comm_->group()->index(target_rank), smpi_process_index(), SMPI_RMA_TAG+2, send_win->comm_,
208         MPI_OP_NULL);
209
210     //prepare receiver request
211     MPI_Request rreq = Request::rma_recv_init(origin_addr, origin_count, origin_datatype,
212         comm_->group()->index(target_rank), smpi_process_index(), SMPI_RMA_TAG+2, comm_,
213         MPI_OP_NULL);
214
215     //start the send, with another process than us as sender. 
216     sreq->start();
217     //push request to receiver's win
218     xbt_mutex_acquire(send_win->mut_);
219     send_win->requests_->push_back(sreq);
220     xbt_mutex_release(send_win->mut_);
221
222     //start recv
223     rreq->start();
224     //push request to sender's win
225     xbt_mutex_acquire(mut_);
226     requests_->push_back(rreq);
227     xbt_mutex_release(mut_);
228   }else{
229     Datatype::copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
230   }
231
232   return MPI_SUCCESS;
233 }
234
235
236 int Win::accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
237               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op)
238 {
239   if(opened_==0)//check that post/start has been done
240     return MPI_ERR_WIN;
241   //FIXME: local version 
242   //get receiver pointer
243   MPI_Win recv_win = connected_wins_[target_rank];
244
245   if(target_count*target_datatype->get_extent()>recv_win->size_)
246     return MPI_ERR_ARG;
247
248   void* recv_addr = static_cast<void*>(static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_);
249   XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
250     //As the tag will be used for ordering of the operations, add count to it
251     //prepare send_request
252     MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype,
253         smpi_process_index(), comm_->group()->index(target_rank), SMPI_RMA_TAG+3+count_, comm_, op);
254
255     //prepare receiver request
256     MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype,
257         smpi_process_index(), comm_->group()->index(target_rank), SMPI_RMA_TAG+3+count_, recv_win->comm_, op);
258
259     count_++;
260     //push request to receiver's win
261     xbt_mutex_acquire(recv_win->mut_);
262     recv_win->requests_->push_back(rreq);
263     xbt_mutex_release(recv_win->mut_);
264     //start send
265     sreq->start();
266
267     //push request to sender's win
268     xbt_mutex_acquire(mut_);
269     requests_->push_back(sreq);
270     xbt_mutex_release(mut_);
271
272   return MPI_SUCCESS;
273 }
274
275 int Win::start(MPI_Group group, int assert){
276     /* From MPI forum advices
277     The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
278     will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
279     the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
280     matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
281     MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
282     implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
283     to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
284     called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
285     origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
286     must complete, without further dependencies.  */
287
288   //naive, blocking implementation.
289     int i             = 0;
290     int j             = 0;
291     int size          = group->size();
292     MPI_Request* reqs = xbt_new0(MPI_Request, size);
293
294     while (j != size) {
295       int src = group->index(j);
296       if (src != smpi_process_index() && src != MPI_UNDEFINED) {
297         reqs[i] = Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 4, MPI_COMM_WORLD);
298         i++;
299       }
300       j++;
301   }
302   size=i;
303   Request::startall(size, reqs);
304   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
305   for(i=0;i<size;i++){
306     Request::unref(&reqs[i]);
307   }
308   xbt_free(reqs);
309   opened_++; //we're open for business !
310   group_=group;
311   group->ref();
312   return MPI_SUCCESS;
313 }
314
315 int Win::post(MPI_Group group, int assert){
316   //let's make a synchronous send here
317   int i             = 0;
318   int j             = 0;
319   int size = group->size();
320   MPI_Request* reqs = xbt_new0(MPI_Request, size);
321
322   while(j!=size){
323     int dst=group->index(j);
324     if(dst!=smpi_process_index() && dst!=MPI_UNDEFINED){
325       reqs[i]=Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG+4, MPI_COMM_WORLD);
326       i++;
327     }
328     j++;
329   }
330   size=i;
331
332   Request::startall(size, reqs);
333   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
334   for(i=0;i<size;i++){
335     Request::unref(&reqs[i]);
336   }
337   xbt_free(reqs);
338   opened_++; //we're open for business !
339   group_=group;
340   group->ref();
341   return MPI_SUCCESS;
342 }
343
344 int Win::complete(){
345   if(opened_==0)
346     xbt_die("Complete called on already opened MPI_Win");
347
348   XBT_DEBUG("Entering MPI_Win_Complete");
349   int i             = 0;
350   int j             = 0;
351   int size = group_->size();
352   MPI_Request* reqs = xbt_new0(MPI_Request, size);
353
354   while(j!=size){
355     int dst=group_->index(j);
356     if(dst!=smpi_process_index() && dst!=MPI_UNDEFINED){
357       reqs[i]=Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG+5, MPI_COMM_WORLD);
358       i++;
359     }
360     j++;
361   }
362   size=i;
363   XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
364   Request::startall(size, reqs);
365   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
366
367   for(i=0;i<size;i++){
368     Request::unref(&reqs[i]);
369   }
370   xbt_free(reqs);
371
372   //now we can finish RMA calls
373   xbt_mutex_acquire(mut_);
374   std::vector<MPI_Request> *reqqs = requests_;
375   size = static_cast<int>(reqqs->size());
376
377   XBT_DEBUG("Win_complete - Finishing %d RMA calls", size);
378   if (size > 0) {
379     // start all requests that have been prepared by another process
380     for (const auto& req : *reqqs) {
381       if (req && (req->flags() & PREPARED))
382         req->start();
383     }
384
385     MPI_Request* treqs = &(*reqqs)[0];
386     Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
387     reqqs->clear();
388   }
389   xbt_mutex_release(mut_);
390
391   Group::unref(group_);
392   opened_--; //we're closed for business !
393   return MPI_SUCCESS;
394 }
395
396 int Win::wait(){
397   //naive, blocking implementation.
398   XBT_DEBUG("Entering MPI_Win_Wait");
399   int i=0,j=0;
400   int size = group_->size();
401   MPI_Request* reqs = xbt_new0(MPI_Request, size);
402
403   while(j!=size){
404     int src=group_->index(j);
405     if(src!=smpi_process_index() && src!=MPI_UNDEFINED){
406       reqs[i]=Request::irecv_init(nullptr, 0, MPI_CHAR, src,SMPI_RMA_TAG+5, MPI_COMM_WORLD);
407       i++;
408     }
409     j++;
410   }
411   size=i;
412   XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
413   Request::startall(size, reqs);
414   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
415   for(i=0;i<size;i++){
416     Request::unref(&reqs[i]);
417   }
418   xbt_free(reqs);
419   xbt_mutex_acquire(mut_);
420   std::vector<MPI_Request> *reqqs = requests_;
421   size = static_cast<int>(reqqs->size());
422
423   XBT_DEBUG("Win_wait - Finishing %d RMA calls", size);
424   if (size > 0) {
425     // start all requests that have been prepared by another process
426     for (const auto& req : *reqqs) {
427       if (req && (req->flags() & PREPARED))
428         req->start();
429     }
430
431     MPI_Request* treqs = &(*reqqs)[0];
432     Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
433     reqqs->clear();
434   }
435   xbt_mutex_release(mut_);
436
437   Group::unref(group_);
438   opened_--; //we're opened for business !
439   return MPI_SUCCESS;
440 }
441
442 Win* Win::f2c(int id){
443   return static_cast<Win*>(F2C::f2c(id));
444 }
445
446 }
447 }