Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
5e00ce514360f7b8da2d03fc959aae57d31da72f
[simgrid.git] / src / smpi / mpi / smpi_win.cpp
1 /* Copyright (c) 2007-2018. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "smpi_win.hpp"
7 #include "private.hpp"
8 #include "smpi_coll.hpp"
9 #include "smpi_comm.hpp"
10 #include "smpi_datatype.hpp"
11 #include "smpi_info.hpp"
12 #include "smpi_keyvals.hpp"
13 #include "smpi_process.hpp"
14 #include "smpi_request.hpp"
15
16 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");
17
18 using simgrid::s4u::Actor;
19
20 namespace simgrid{
21 namespace smpi{
22 std::unordered_map<int, smpi_key_elem> Win::keyvals_;
23 int Win::keyval_id_=0;
24
25 Win::Win(void *base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm, int allocated, int dynamic): base_(base), size_(size), disp_unit_(disp_unit), assert_(0), info_(info), comm_(comm), allocated_(allocated), dynamic_(dynamic){
26   int comm_size = comm->size();
27   rank_         = comm->rank();
28   XBT_DEBUG("Creating window");
29   if(info!=MPI_INFO_NULL)
30     info->ref();
31   name_                  = nullptr;
32   opened_                = 0;
33   group_                 = MPI_GROUP_NULL;
34   requests_              = new std::vector<MPI_Request>();
35   mut_                   = xbt_mutex_init();
36   lock_mut_              = xbt_mutex_init();
37   atomic_mut_            = xbt_mutex_init();
38   connected_wins_        = new MPI_Win[comm_size];
39   connected_wins_[rank_] = this;
40   count_                 = 0;
41   if(rank_==0){
42     bar_ = new simgrid::s4u::Barrier(comm_size);
43   }
44   mode_=0;
45
46   comm->add_rma_win(this);
47   comm->ref();
48
49   Colls::allgather(&(connected_wins_[rank_]), sizeof(MPI_Win), MPI_BYTE, connected_wins_, sizeof(MPI_Win),
50                          MPI_BYTE, comm);
51
52   Colls::bcast(&(bar_), sizeof(simgrid::s4u::Barrier*), MPI_BYTE, 0, comm);
53
54   Colls::barrier(comm);
55 }
56
57 Win::~Win(){
58   //As per the standard, perform a barrier to ensure every async comm is finished
59   bar_->wait();
60
61   int finished = finish_comms();
62   XBT_DEBUG("Win destructor - Finished %d RMA calls", finished);
63
64   delete requests_;
65   delete[] connected_wins_;
66   if (name_ != nullptr){
67     xbt_free(name_);
68   }
69   if(info_!=MPI_INFO_NULL){
70     MPI_Info_free(&info_);
71   }
72
73   comm_->remove_rma_win(this);
74
75   Colls::barrier(comm_);
76   Comm::unref(comm_);
77   
78   if (rank_ == 0)
79     delete bar_;
80   xbt_mutex_destroy(mut_);
81   xbt_mutex_destroy(lock_mut_);
82   xbt_mutex_destroy(atomic_mut_);
83
84   if(allocated_ !=0)
85     xbt_free(base_);
86
87   cleanup_attr<Win>();
88 }
89
90 int Win::attach (void *base, MPI_Aint size){
91   if (not(base_ == MPI_BOTTOM || base_ == 0))
92     return MPI_ERR_ARG;
93   base_=0;//actually the address will be given in the RMA calls, as being the disp.
94   size_+=size;
95   return MPI_SUCCESS;
96 }
97
98 int Win::detach (void *base){
99   base_=MPI_BOTTOM;
100   size_=-1;
101   return MPI_SUCCESS;
102 }
103
104 void Win::get_name(char* name, int* length){
105   if(name_==nullptr){
106     *length=0;
107     name=nullptr;
108     return;
109   }
110   *length = strlen(name_);
111   strncpy(name, name_, *length+1);
112 }
113
114 void Win::get_group(MPI_Group* group){
115   if(comm_ != MPI_COMM_NULL){
116     *group = comm_->group();
117   } else {
118     *group = MPI_GROUP_NULL;
119   }
120 }
121
122 MPI_Info Win::info(){
123   if(info_== MPI_INFO_NULL)
124     info_ = new Info();
125   info_->ref();
126   return info_;
127 }
128
129 int Win::rank(){
130   return rank_;
131 }
132
133 MPI_Aint Win::size(){
134   return size_;
135 }
136
137 void* Win::base(){
138   return base_;
139 }
140
141 int Win::disp_unit(){
142   return disp_unit_;
143 }
144
145 int Win::dynamic(){
146   return dynamic_;
147 }
148
149 void Win::set_info(MPI_Info info){
150   if(info_!= MPI_INFO_NULL)
151     info->ref();
152   info_=info;
153 }
154
155 void Win::set_name(char* name){
156   name_ = xbt_strdup(name);
157 }
158
159 int Win::fence(int assert)
160 {
161   XBT_DEBUG("Entering fence");
162   if (opened_ == 0)
163     opened_=1;
164   if (assert != MPI_MODE_NOPRECEDE) {
165     // This is not the first fence => finalize what came before
166     bar_->wait();
167     xbt_mutex_acquire(mut_);
168     // This (simulated) mutex ensures that no process pushes to the vector of requests during the waitall.
169     // Without this, the vector could get redimensionned when another process pushes.
170     // This would result in the array used by Request::waitall() to be invalidated.
171     // Another solution would be to copy the data and cleanup the vector *before* Request::waitall
172     std::vector<MPI_Request> *reqs = requests_;
173     int size = static_cast<int>(reqs->size());
174     // start all requests that have been prepared by another process
175     if (size > 0) {
176       MPI_Request* treqs = &(*reqs)[0];
177       Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
178     }
179     count_=0;
180     xbt_mutex_release(mut_);
181   }
182
183   if(assert==MPI_MODE_NOSUCCEED)//there should be no ops after this one, tell we are closed.
184     opened_=0;
185   assert_ = assert;
186
187   bar_->wait();
188   XBT_DEBUG("Leaving fence");
189
190   return MPI_SUCCESS;
191 }
192
193 int Win::put( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
194               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
195 {
196   //get receiver pointer
197   MPI_Win recv_win = connected_wins_[target_rank];
198
199   if(opened_==0){//check that post/start has been done
200     // no fence or start .. lock ok ?
201     int locked=0;
202     for (auto const& it : recv_win->lockers_)
203       if (it == comm_->rank())
204         locked = 1;
205     if(locked != 1)
206       return MPI_ERR_WIN;
207   }
208
209   if(target_count*target_datatype->get_extent()>recv_win->size_)
210     return MPI_ERR_ARG;
211
212   void* recv_addr = static_cast<void*> ( static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_);
213
214   if (target_rank != comm_->rank()) { // This is not for myself, so we need to send messages
215     XBT_DEBUG("Entering MPI_Put to remote rank %d", target_rank);
216     // prepare send_request
217     MPI_Request sreq =
218         // TODO cheinrich Check for rank / pid conversion
219         Request::rma_send_init(origin_addr, origin_count, origin_datatype, comm_->rank(), target_rank, SMPI_RMA_TAG + 1,
220                                comm_, MPI_OP_NULL);
221
222     //prepare receiver request
223     // TODO cheinrich Check for rank / pid conversion
224     MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, recv_win->comm_->rank(),
225                                               target_rank, SMPI_RMA_TAG + 1, recv_win->comm_, MPI_OP_NULL);
226
227     //start send
228     sreq->start();
229
230     if(request!=nullptr){
231       *request=sreq;
232     }else{
233       xbt_mutex_acquire(mut_);
234       requests_->push_back(sreq);
235       xbt_mutex_release(mut_);
236     }
237
238     //push request to receiver's win
239     xbt_mutex_acquire(recv_win->mut_);
240     recv_win->requests_->push_back(rreq);
241     rreq->start();
242     xbt_mutex_release(recv_win->mut_);
243
244   }else{
245     XBT_DEBUG("Entering MPI_Put from myself to myself, rank %d", target_rank);
246     Datatype::copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
247     if(request!=nullptr)
248       *request = MPI_REQUEST_NULL;
249   }
250
251   return MPI_SUCCESS;
252 }
253
254 int Win::get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
255               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
256 {
257   //get sender pointer
258   MPI_Win send_win = connected_wins_[target_rank];
259
260   if(opened_==0){//check that post/start has been done
261     // no fence or start .. lock ok ?
262     int locked=0;
263     for (auto const& it : send_win->lockers_)
264       if (it == comm_->rank())
265         locked = 1;
266     if(locked != 1)
267       return MPI_ERR_WIN;
268   }
269
270   if(target_count*target_datatype->get_extent()>send_win->size_)
271     return MPI_ERR_ARG;
272
273   void* send_addr = static_cast<void*>(static_cast<char*>(send_win->base_) + target_disp * send_win->disp_unit_);
274   XBT_DEBUG("Entering MPI_Get from %d", target_rank);
275
276   if(target_rank != comm_->rank()){
277     //prepare send_request
278     MPI_Request sreq = Request::rma_send_init(send_addr, target_count, target_datatype, target_rank,
279                                               send_win->comm_->rank(), SMPI_RMA_TAG + 2, send_win->comm_, MPI_OP_NULL);
280
281     //prepare receiver request
282     MPI_Request rreq = Request::rma_recv_init(
283         origin_addr, origin_count, origin_datatype, target_rank,
284         comm_->rank(), // TODO cheinrich Check here if comm_->rank() and above send_win->comm_->rank() are correct
285         SMPI_RMA_TAG + 2, comm_, MPI_OP_NULL);
286
287     //start the send, with another process than us as sender.
288     sreq->start();
289     //push request to receiver's win
290     xbt_mutex_acquire(send_win->mut_);
291     send_win->requests_->push_back(sreq);
292     xbt_mutex_release(send_win->mut_);
293
294     //start recv
295     rreq->start();
296
297     if(request!=nullptr){
298       *request=rreq;
299     }else{
300       xbt_mutex_acquire(mut_);
301       requests_->push_back(rreq);
302       xbt_mutex_release(mut_);
303     }
304
305   }else{
306     Datatype::copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
307     if(request!=nullptr)
308       *request=MPI_REQUEST_NULL;
309   }
310
311   return MPI_SUCCESS;
312 }
313
314
315 int Win::accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
316               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request* request)
317 {
318   XBT_DEBUG("Entering MPI_Win_Accumulate");
319   //get receiver pointer
320   MPI_Win recv_win = connected_wins_[target_rank];
321
322   if(opened_==0){//check that post/start has been done
323     // no fence or start .. lock ok ?
324     int locked=0;
325     for (auto const& it : recv_win->lockers_)
326       if (it == comm_->rank())
327         locked = 1;
328     if(locked != 1)
329       return MPI_ERR_WIN;
330   }
331   //FIXME: local version
332
333   if(target_count*target_datatype->get_extent()>recv_win->size_)
334     return MPI_ERR_ARG;
335
336   void* recv_addr = static_cast<void*>(static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_);
337   XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
338     //As the tag will be used for ordering of the operations, substract count from it (to avoid collisions with other SMPI tags, SMPI_RMA_TAG is set below all the other ones we use )
339     //prepare send_request
340
341   MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype, comm_->rank(), target_rank,
342                                             SMPI_RMA_TAG - 3 - count_, comm_, op);
343
344   // prepare receiver request
345   MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, recv_win->comm_->rank(),
346                                             recv_win->comm_->group()->rank(comm_->group()->actor(target_rank)), SMPI_RMA_TAG - 3 - count_, recv_win->comm_, op);
347
348   count_++;
349
350   // start send
351   sreq->start();
352   // push request to receiver's win
353   xbt_mutex_acquire(recv_win->mut_);
354   recv_win->requests_->push_back(rreq);
355   rreq->start();
356   xbt_mutex_release(recv_win->mut_);
357
358   if (request != nullptr) {
359     *request = sreq;
360   } else {
361     xbt_mutex_acquire(mut_);
362     requests_->push_back(sreq);
363     xbt_mutex_release(mut_);
364   }
365
366   XBT_DEBUG("Leaving MPI_Win_Accumulate");
367   return MPI_SUCCESS;
368 }
369
370 int Win::get_accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, void *result_addr,
371               int result_count, MPI_Datatype result_datatype, int target_rank, MPI_Aint target_disp, int target_count,
372               MPI_Datatype target_datatype, MPI_Op op, MPI_Request* request){
373
374   //get sender pointer
375   MPI_Win send_win = connected_wins_[target_rank];
376
377   if(opened_==0){//check that post/start has been done
378     // no fence or start .. lock ok ?
379     int locked=0;
380     for (auto const& it : send_win->lockers_)
381       if (it == comm_->rank())
382         locked = 1;
383     if(locked != 1)
384       return MPI_ERR_WIN;
385   }
386
387   if(target_count*target_datatype->get_extent()>send_win->size_)
388     return MPI_ERR_ARG;
389
390   XBT_DEBUG("Entering MPI_Get_accumulate from %d", target_rank);
391   //need to be sure ops are correctly ordered, so finish request here ? slow.
392   MPI_Request req;
393   xbt_mutex_acquire(send_win->atomic_mut_);
394   get(result_addr, result_count, result_datatype, target_rank,
395               target_disp, target_count, target_datatype, &req);
396   if (req != MPI_REQUEST_NULL)
397     Request::wait(&req, MPI_STATUS_IGNORE);
398   if(op!=MPI_NO_OP)
399     accumulate(origin_addr, origin_count, origin_datatype, target_rank,
400               target_disp, target_count, target_datatype, op, &req);
401   if (req != MPI_REQUEST_NULL)
402     Request::wait(&req, MPI_STATUS_IGNORE);
403   xbt_mutex_release(send_win->atomic_mut_);
404   return MPI_SUCCESS;
405
406 }
407
408 int Win::compare_and_swap(void *origin_addr, void *compare_addr,
409         void *result_addr, MPI_Datatype datatype, int target_rank,
410         MPI_Aint target_disp){
411   //get sender pointer
412   MPI_Win send_win = connected_wins_[target_rank];
413
414   if(opened_==0){//check that post/start has been done
415     // no fence or start .. lock ok ?
416     int locked=0;
417     for (auto const& it : send_win->lockers_)
418       if (it == comm_->rank())
419         locked = 1;
420     if(locked != 1)
421       return MPI_ERR_WIN;
422   }
423
424   XBT_DEBUG("Entering MPI_Compare_and_swap with %d", target_rank);
425   MPI_Request req = MPI_REQUEST_NULL;
426   xbt_mutex_acquire(send_win->atomic_mut_);
427   get(result_addr, 1, datatype, target_rank,
428               target_disp, 1, datatype, &req);
429   if (req != MPI_REQUEST_NULL)
430     Request::wait(&req, MPI_STATUS_IGNORE);
431   if (not memcmp(result_addr, compare_addr, datatype->get_extent())) {
432     put(origin_addr, 1, datatype, target_rank,
433               target_disp, 1, datatype);
434   }
435   xbt_mutex_release(send_win->atomic_mut_);
436   return MPI_SUCCESS;
437 }
438
439 int Win::start(MPI_Group group, int assert){
440   /* From MPI forum advices
441   The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
442   will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
443   the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
444   matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
445   MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
446   implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
447   to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
448   called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
449   origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
450   must complete, without further dependencies.  */
451
452   //naive, blocking implementation.
453   int i             = 0;
454   int j             = 0;
455   int size          = group->size();
456   MPI_Request* reqs = xbt_new0(MPI_Request, size);
457
458   XBT_DEBUG("Entering MPI_Win_Start");
459   while (j != size) {
460     int src = comm_->group()->rank(group->actor(j));
461     if (src != rank_ && src != MPI_UNDEFINED) { // TODO cheinrich: The check of MPI_UNDEFINED should be useless here
462       reqs[i] = Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 4, comm_);
463       i++;
464     }
465     j++;
466   }
467   size = i;
468   Request::startall(size, reqs);
469   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
470   for (i = 0; i < size; i++) {
471     Request::unref(&reqs[i]);
472   }
473   xbt_free(reqs);
474   opened_++; //we're open for business !
475   group_=group;
476   group->ref();
477   XBT_DEBUG("Leaving MPI_Win_Start");
478   return MPI_SUCCESS;
479 }
480
481 int Win::post(MPI_Group group, int assert){
482   //let's make a synchronous send here
483   int i             = 0;
484   int j             = 0;
485   int size = group->size();
486   MPI_Request* reqs = xbt_new0(MPI_Request, size);
487
488   XBT_DEBUG("Entering MPI_Win_Post");
489   while(j!=size){
490     int dst = comm_->group()->rank(group->actor(j));
491     if (dst != rank_ && dst != MPI_UNDEFINED) {
492       reqs[i] = Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 4, comm_);
493       i++;
494     }
495     j++;
496   }
497   size=i;
498
499   Request::startall(size, reqs);
500   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
501   for(i=0;i<size;i++){
502     Request::unref(&reqs[i]);
503   }
504   xbt_free(reqs);
505   opened_++; //we're open for business !
506   group_=group;
507   group->ref();
508   XBT_DEBUG("Leaving MPI_Win_Post");
509   return MPI_SUCCESS;
510 }
511
512 int Win::complete(){
513   if(opened_==0)
514     xbt_die("Complete called on already opened MPI_Win");
515
516   XBT_DEBUG("Entering MPI_Win_Complete");
517   int i             = 0;
518   int j             = 0;
519   int size          = group_->size();
520   MPI_Request* reqs = xbt_new0(MPI_Request, size);
521
522   while(j!=size){
523     int dst = comm_->group()->rank(group_->actor(j));
524     if (dst != rank_ && dst != MPI_UNDEFINED) {
525       reqs[i] = Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 5, comm_);
526       i++;
527     }
528     j++;
529   }
530   size=i;
531   XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
532   Request::startall(size, reqs);
533   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
534
535   for(i=0;i<size;i++){
536     Request::unref(&reqs[i]);
537   }
538   xbt_free(reqs);
539
540   int finished = finish_comms();
541   XBT_DEBUG("Win_complete - Finished %d RMA calls", finished);
542
543   Group::unref(group_);
544   opened_--; //we're closed for business !
545   return MPI_SUCCESS;
546 }
547
548 int Win::wait(){
549   //naive, blocking implementation.
550   XBT_DEBUG("Entering MPI_Win_Wait");
551   int i             = 0;
552   int j             = 0;
553   int size          = group_->size();
554   MPI_Request* reqs = xbt_new0(MPI_Request, size);
555
556   while(j!=size){
557     int src = comm_->group()->rank(group_->actor(j));
558     if (src != rank_ && src != MPI_UNDEFINED) {
559       reqs[i] = Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 5, comm_);
560       i++;
561     }
562     j++;
563   }
564   size=i;
565   XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
566   Request::startall(size, reqs);
567   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
568   for(i=0;i<size;i++){
569     Request::unref(&reqs[i]);
570   }
571   xbt_free(reqs);
572   int finished = finish_comms();
573   XBT_DEBUG("Win_wait - Finished %d RMA calls", finished);
574
575   Group::unref(group_);
576   opened_--; //we're opened for business !
577   return MPI_SUCCESS;
578 }
579
580 int Win::lock(int lock_type, int rank, int assert){
581   MPI_Win target_win = connected_wins_[rank];
582
583   if ((lock_type == MPI_LOCK_EXCLUSIVE && target_win->mode_ != MPI_LOCK_SHARED)|| target_win->mode_ == MPI_LOCK_EXCLUSIVE){
584     xbt_mutex_acquire(target_win->lock_mut_);
585     target_win->mode_+= lock_type;//add the lock_type to differentiate case when we are switching from EXCLUSIVE to SHARED (no release needed in the unlock)
586     if(lock_type == MPI_LOCK_SHARED){//the window used to be exclusive, it's now shared.
587       xbt_mutex_release(target_win->lock_mut_);
588    }
589   } else if (not(target_win->mode_ == MPI_LOCK_SHARED && lock_type == MPI_LOCK_EXCLUSIVE))
590     target_win->mode_ += lock_type; // don't set to exclusive if it's already shared
591
592   target_win->lockers_.push_back(comm_->rank());
593
594   int finished = finish_comms(rank);
595   XBT_DEBUG("Win_lock %d - Finished %d RMA calls", rank, finished);
596   finished = target_win->finish_comms(rank_);
597   XBT_DEBUG("Win_lock target %d - Finished %d RMA calls", rank, finished);
598   return MPI_SUCCESS;
599 }
600
601 int Win::lock_all(int assert){
602   int i=0;
603   int retval = MPI_SUCCESS;
604   for (i=0; i<comm_->size();i++){
605       int ret = this->lock(MPI_LOCK_SHARED, i, assert);
606       if(ret != MPI_SUCCESS)
607         retval = ret;
608   }
609   return retval;
610 }
611
612 int Win::unlock(int rank){
613   MPI_Win target_win = connected_wins_[rank];
614   int target_mode = target_win->mode_;
615   target_win->mode_= 0;
616   target_win->lockers_.remove(comm_->rank());
617   if (target_mode==MPI_LOCK_EXCLUSIVE){
618     xbt_mutex_release(target_win->lock_mut_);
619   }
620
621   int finished = finish_comms(rank);
622   XBT_DEBUG("Win_unlock %d - Finished %d RMA calls", rank, finished);
623   finished = target_win->finish_comms(rank_);
624   XBT_DEBUG("Win_unlock target %d - Finished %d RMA calls", rank, finished);
625   return MPI_SUCCESS;
626 }
627
628 int Win::unlock_all(){
629   int i=0;
630   int retval = MPI_SUCCESS;
631   for (i=0; i<comm_->size();i++){
632     int ret = this->unlock(i);
633     if (ret != MPI_SUCCESS)
634       retval = ret;
635   }
636   return retval;
637 }
638
639 int Win::flush(int rank){
640   MPI_Win target_win = connected_wins_[rank];
641   int finished       = finish_comms(rank_);
642   XBT_DEBUG("Win_flush on local %d - Finished %d RMA calls", rank_, finished);
643   finished = target_win->finish_comms(rank);
644   XBT_DEBUG("Win_flush on remote %d - Finished %d RMA calls", rank, finished);
645   return MPI_SUCCESS;
646 }
647
648 int Win::flush_local(int rank){
649   int finished = finish_comms(rank);
650   XBT_DEBUG("Win_flush_local for rank %d - Finished %d RMA calls", rank, finished);
651   return MPI_SUCCESS;
652 }
653
654 int Win::flush_all(){
655   int finished = finish_comms();
656   XBT_DEBUG("Win_flush_all on local - Finished %d RMA calls", finished);
657   for (int i = 0; i < comm_->size(); i++) {
658     finished = connected_wins_[i]->finish_comms(rank_);
659     XBT_DEBUG("Win_flush_all on %d - Finished %d RMA calls", i, finished);
660   }
661   return MPI_SUCCESS;
662 }
663
664 int Win::flush_local_all(){
665   int finished = finish_comms();
666   XBT_DEBUG("Win_flush_local_all - Finished %d RMA calls", finished);
667   return MPI_SUCCESS;
668 }
669
670 Win* Win::f2c(int id){
671   return static_cast<Win*>(F2C::f2c(id));
672 }
673
674 int Win::finish_comms(){
675   xbt_mutex_acquire(mut_);
676   //Finish own requests
677   std::vector<MPI_Request> *reqqs = requests_;
678   int size = static_cast<int>(reqqs->size());
679   if (size > 0) {
680     MPI_Request* treqs = &(*reqqs)[0];
681     Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
682     reqqs->clear();
683   }
684   xbt_mutex_release(mut_);
685   return size;
686 }
687
688 int Win::finish_comms(int rank){
689   xbt_mutex_acquire(mut_);
690   //Finish own requests
691   std::vector<MPI_Request> *reqqs = requests_;
692   int size = static_cast<int>(reqqs->size());
693   if (size > 0) {
694     size = 0;
695     std::vector<MPI_Request> myreqqs;
696     std::vector<MPI_Request>::iterator iter = reqqs->begin();
697     int proc_id                             = comm_->group()->actor(rank)->get_pid();
698     while (iter != reqqs->end()){
699       // Let's see if we're either the destination or the sender of this request
700       // because we only wait for requests that we are responsible for.
701       // Also use the process id here since the request itself returns from src()
702       // and dst() the process id, NOT the rank (which only exists in the context of a communicator).
703       if (((*iter) != MPI_REQUEST_NULL) && (((*iter)->src() == proc_id) || ((*iter)->dst() == proc_id))) {
704         myreqqs.push_back(*iter);
705         iter = reqqs->erase(iter);
706         size++;
707       } else {
708         ++iter;
709       }
710     }
711     if(size >0){
712       MPI_Request* treqs = &myreqqs[0];
713       Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
714       myreqqs.clear();
715     }
716   }
717   xbt_mutex_release(mut_);
718   return size;
719 }
720
721 int Win::shared_query(int rank, MPI_Aint* size, int* disp_unit, void* baseptr)
722 {
723   MPI_Win target_win = rank != MPI_PROC_NULL ? connected_wins_[rank] : nullptr;
724   for (int i = 0; not target_win && i < comm_->size(); i++) {
725     if (connected_wins_[i]->size_ > 0)
726       target_win = connected_wins_[i];
727   }
728   if (target_win) {
729     *size                         = target_win->size_;
730     *disp_unit                    = target_win->disp_unit_;
731     *static_cast<void**>(baseptr) = target_win->base_;
732   } else {
733     *size                         = 0;
734     *static_cast<void**>(baseptr) = xbt_malloc(0);
735   }
736   return MPI_SUCCESS;
737 }
738 }
739 }