Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
[SMPI] Fix wrong smpi::Request instantiation for RMA's.
[simgrid.git] / src / smpi / mpi / smpi_win.cpp
1 /* Copyright (c) 2007-2017. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "smpi_win.hpp"
7 #include "private.hpp"
8 #include "smpi_coll.hpp"
9 #include "smpi_comm.hpp"
10 #include "smpi_datatype.hpp"
11 #include "smpi_info.hpp"
12 #include "smpi_keyvals.hpp"
13 #include "smpi_process.hpp"
14 #include "smpi_request.hpp"
15
16 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");
17
18 namespace simgrid{
19 namespace smpi{
20 std::unordered_map<int, smpi_key_elem> Win::keyvals_;
21 int Win::keyval_id_=0;
22
23 Win::Win(void *base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm, int allocated, int dynamic): base_(base), size_(size), disp_unit_(disp_unit), assert_(0), info_(info), comm_(comm), allocated_(allocated), dynamic_(dynamic){
24   int comm_size = comm->size();
25   rank_      = comm->rank();
26   XBT_DEBUG("Creating window");
27   if(info!=MPI_INFO_NULL)
28     info->ref();
29   name_ = nullptr;
30   opened_ = 0;
31   group_ = MPI_GROUP_NULL;
32   requests_ = new std::vector<MPI_Request>();
33   mut_=xbt_mutex_init();
34   lock_mut_=xbt_mutex_init();
35   atomic_mut_=xbt_mutex_init();
36   connected_wins_ = new MPI_Win[comm_size];
37   connected_wins_[rank_] = this;
38   count_ = 0;
39   if(rank_==0){
40     bar_ = MSG_barrier_init(comm_size);
41   }
42   mode_=0;
43
44   comm->add_rma_win(this);
45
46   Colls::allgather(&(connected_wins_[rank_]), sizeof(MPI_Win), MPI_BYTE, connected_wins_, sizeof(MPI_Win),
47                          MPI_BYTE, comm);
48
49   Colls::bcast(&(bar_), sizeof(msg_bar_t), MPI_BYTE, 0, comm);
50
51   Colls::barrier(comm);
52 }
53
54 Win::~Win(){
55   //As per the standard, perform a barrier to ensure every async comm is finished
56   MSG_barrier_wait(bar_);
57
58   int finished = finish_comms();
59   XBT_DEBUG("Win destructor - Finished %d RMA calls", finished);
60
61   delete requests_;
62   delete[] connected_wins_;
63   if (name_ != nullptr){
64     xbt_free(name_);
65   }
66   if(info_!=MPI_INFO_NULL){
67     MPI_Info_free(&info_);
68   }
69
70   comm_->remove_rma_win(this);
71
72   Colls::barrier(comm_);
73   if (rank_ == 0)
74     MSG_barrier_destroy(bar_);
75   xbt_mutex_destroy(mut_);
76   xbt_mutex_destroy(lock_mut_);
77   xbt_mutex_destroy(atomic_mut_);
78
79   if(allocated_ !=0)
80     xbt_free(base_);
81
82   cleanup_attr<Win>();
83 }
84
85 int Win::attach (void *base, MPI_Aint size){
86   if (not(base_ == MPI_BOTTOM || base_ == 0))
87     return MPI_ERR_ARG;
88   base_=0;//actually the address will be given in the RMA calls, as being the disp.
89   size_+=size;
90   return MPI_SUCCESS;
91 }
92
93 int Win::detach (void *base){
94   base_=MPI_BOTTOM;
95   size_=-1;
96   return MPI_SUCCESS;
97 }
98
99 void Win::get_name(char* name, int* length){
100   if(name_==nullptr){
101     *length=0;
102     name=nullptr;
103     return;
104   }
105   *length = strlen(name_);
106   strncpy(name, name_, *length+1);
107 }
108
109 void Win::get_group(MPI_Group* group){
110   if(comm_ != MPI_COMM_NULL){
111     *group = comm_->group();
112   } else {
113     *group = MPI_GROUP_NULL;
114   }
115 }
116
117 MPI_Info Win::info(){
118   if(info_== MPI_INFO_NULL)
119     info_ = new Info();
120   info_->ref();
121   return info_;
122 }
123
124 int Win::rank(){
125   return rank_;
126 }
127
128 MPI_Aint Win::size(){
129   return size_;
130 }
131
132 void* Win::base(){
133   return base_;
134 }
135
136 int Win::disp_unit(){
137   return disp_unit_;
138 }
139
140 int Win::dynamic(){
141   return dynamic_;
142 }
143
144 void Win::set_info(MPI_Info info){
145   if(info_!= MPI_INFO_NULL)
146     info->ref();
147   info_=info;
148 }
149
150 void Win::set_name(char* name){
151   name_ = xbt_strdup(name);
152 }
153
154 int Win::fence(int assert)
155 {
156   XBT_DEBUG("Entering fence");
157   if (opened_ == 0)
158     opened_=1;
159   if (assert != MPI_MODE_NOPRECEDE) {
160     // This is not the first fence => finalize what came before
161     MSG_barrier_wait(bar_);
162     xbt_mutex_acquire(mut_);
163     // This (simulated) mutex ensures that no process pushes to the vector of requests during the waitall.
164     // Without this, the vector could get redimensionned when another process pushes.
165     // This would result in the array used by Request::waitall() to be invalidated.
166     // Another solution would be to copy the data and cleanup the vector *before* Request::waitall
167     std::vector<MPI_Request> *reqs = requests_;
168     int size = static_cast<int>(reqs->size());
169     // start all requests that have been prepared by another process
170     if (size > 0) {
171       MPI_Request* treqs = &(*reqs)[0];
172       Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
173     }
174     count_=0;
175     xbt_mutex_release(mut_);
176   }
177
178   if(assert==MPI_MODE_NOSUCCEED)//there should be no ops after this one, tell we are closed.
179     opened_=0;
180   assert_ = assert;
181
182   MSG_barrier_wait(bar_);
183   XBT_DEBUG("Leaving fence");
184
185   return MPI_SUCCESS;
186 }
187
188 int Win::put( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
189               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
190 {
191   //get receiver pointer
192   MPI_Win recv_win = connected_wins_[target_rank];
193
194   if(opened_==0){//check that post/start has been done
195     // no fence or start .. lock ok ?
196     int locked=0;
197     for (auto const& it : recv_win->lockers_)
198       if (it == comm_->rank())
199         locked = 1;
200     if(locked != 1)
201       return MPI_ERR_WIN;
202   }
203
204   if(target_count*target_datatype->get_extent()>recv_win->size_)
205     return MPI_ERR_ARG;
206
207   void* recv_addr = static_cast<void*> ( static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_);
208   XBT_DEBUG("Entering MPI_Put to %d", target_rank);
209
210   if (target_rank != comm_->rank()) {
211     //prepare send_request
212     MPI_Request sreq =
213         Request::rma_send_init(origin_addr, origin_count, origin_datatype, simgrid::s4u::Actor::self()->getPid(),
214                                comm_->group()->actor(target_rank)->getPid(), SMPI_RMA_TAG + 1, comm_, MPI_OP_NULL);
215
216     //prepare receiver request
217     MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, simgrid::s4u::Actor::self()->getPid(),
218                                               comm_->group()->actor(target_rank)->getPid(), SMPI_RMA_TAG + 1,
219                                               recv_win->comm_, MPI_OP_NULL);
220
221     //start send
222     sreq->start();
223
224     if(request!=nullptr){
225       *request=sreq;
226     }else{
227       xbt_mutex_acquire(mut_);
228       requests_->push_back(sreq);
229       xbt_mutex_release(mut_);
230     }
231
232     //push request to receiver's win
233     xbt_mutex_acquire(recv_win->mut_);
234     recv_win->requests_->push_back(rreq);
235     rreq->start();
236     xbt_mutex_release(recv_win->mut_);
237
238   }else{
239     Datatype::copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
240     if(request!=nullptr)
241       *request = MPI_REQUEST_NULL;
242   }
243
244   return MPI_SUCCESS;
245 }
246
247 int Win::get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
248               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
249 {
250   //get sender pointer
251   MPI_Win send_win = connected_wins_[target_rank];
252
253   if(opened_==0){//check that post/start has been done
254     // no fence or start .. lock ok ?
255     int locked=0;
256     for (auto const& it : send_win->lockers_)
257       if (it == comm_->rank())
258         locked = 1;
259     if(locked != 1)
260       return MPI_ERR_WIN;
261   }
262
263   if(target_count*target_datatype->get_extent()>send_win->size_)
264     return MPI_ERR_ARG;
265
266   void* send_addr = static_cast<void*>(static_cast<char*>(send_win->base_) + target_disp * send_win->disp_unit_);
267   XBT_DEBUG("Entering MPI_Get from %d", target_rank);
268
269   if(target_rank != comm_->rank()){
270     //prepare send_request
271     MPI_Request sreq = Request::rma_send_init(send_addr, target_count, target_datatype,
272                                               comm_->group()->actor(target_rank)->getPid(), simgrid::s4u::Actor::self()->getPid(),
273                                               SMPI_RMA_TAG + 2, send_win->comm_, MPI_OP_NULL);
274
275     //prepare receiver request
276     MPI_Request rreq = Request::rma_recv_init(origin_addr, origin_count, origin_datatype,
277                                               comm_->group()->actor(target_rank)->getPid(), simgrid::s4u::Actor::self()->getPid(),
278                                               SMPI_RMA_TAG + 2, comm_, MPI_OP_NULL);
279
280     //start the send, with another process than us as sender.
281     sreq->start();
282     //push request to receiver's win
283     xbt_mutex_acquire(send_win->mut_);
284     send_win->requests_->push_back(sreq);
285     xbt_mutex_release(send_win->mut_);
286
287     //start recv
288     rreq->start();
289
290     if(request!=nullptr){
291       *request=rreq;
292     }else{
293       xbt_mutex_acquire(mut_);
294       requests_->push_back(rreq);
295       xbt_mutex_release(mut_);
296     }
297
298   }else{
299     Datatype::copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
300     if(request!=nullptr)
301       *request=MPI_REQUEST_NULL;
302   }
303
304   return MPI_SUCCESS;
305 }
306
307
308 int Win::accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
309               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request* request)
310 {
311   XBT_DEBUG("Entering MPI_Win_Accumulate");
312   //get receiver pointer
313   MPI_Win recv_win = connected_wins_[target_rank];
314
315   if(opened_==0){//check that post/start has been done
316     // no fence or start .. lock ok ?
317     int locked=0;
318     for (auto const& it : recv_win->lockers_)
319       if (it == comm_->rank())
320         locked = 1;
321     if(locked != 1)
322       return MPI_ERR_WIN;
323   }
324   //FIXME: local version
325
326   if(target_count*target_datatype->get_extent()>recv_win->size_)
327     return MPI_ERR_ARG;
328
329   void* recv_addr = static_cast<void*>(static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_);
330   XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
331     //As the tag will be used for ordering of the operations, substract count from it (to avoid collisions with other SMPI tags, SMPI_RMA_TAG is set below all the other ones we use )
332     //prepare send_request
333
334   MPI_Request sreq =
335       Request::rma_send_init(origin_addr, origin_count, origin_datatype, simgrid::s4u::Actor::self()->getPid(),
336                              comm_->group()->actor(target_rank)->getPid(), SMPI_RMA_TAG - 3 - count_, comm_, op);
337
338   // prepare receiver request
339   MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, simgrid::s4u::Actor::self()->getPid(),
340                                             comm_->group()->actor(target_rank)->getPid(), SMPI_RMA_TAG - 3 - count_,
341                                             recv_win->comm_, op);
342
343   count_++;
344
345   // start send
346   sreq->start();
347   // push request to receiver's win
348   xbt_mutex_acquire(recv_win->mut_);
349   recv_win->requests_->push_back(rreq);
350   rreq->start();
351   xbt_mutex_release(recv_win->mut_);
352
353   if (request != nullptr) {
354     *request = sreq;
355     }else{
356       xbt_mutex_acquire(mut_);
357       requests_->push_back(sreq);
358       xbt_mutex_release(mut_);
359     }
360
361   XBT_DEBUG("Leaving MPI_Win_Accumulate");
362   return MPI_SUCCESS;
363 }
364
365 int Win::get_accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, void *result_addr,
366               int result_count, MPI_Datatype result_datatype, int target_rank, MPI_Aint target_disp, int target_count,
367               MPI_Datatype target_datatype, MPI_Op op, MPI_Request* request){
368
369   //get sender pointer
370   MPI_Win send_win = connected_wins_[target_rank];
371
372   if(opened_==0){//check that post/start has been done
373     // no fence or start .. lock ok ?
374     int locked=0;
375     for (auto const& it : send_win->lockers_)
376       if (it == comm_->rank())
377         locked = 1;
378     if(locked != 1)
379       return MPI_ERR_WIN;
380   }
381
382   if(target_count*target_datatype->get_extent()>send_win->size_)
383     return MPI_ERR_ARG;
384
385   XBT_DEBUG("Entering MPI_Get_accumulate from %d", target_rank);
386   //need to be sure ops are correctly ordered, so finish request here ? slow.
387   MPI_Request req;
388   xbt_mutex_acquire(send_win->atomic_mut_);
389   get(result_addr, result_count, result_datatype, target_rank,
390               target_disp, target_count, target_datatype, &req);
391   if (req != MPI_REQUEST_NULL)
392     Request::wait(&req, MPI_STATUS_IGNORE);
393   if(op!=MPI_NO_OP)
394     accumulate(origin_addr, origin_count, origin_datatype, target_rank,
395               target_disp, target_count, target_datatype, op, &req);
396   if (req != MPI_REQUEST_NULL)
397     Request::wait(&req, MPI_STATUS_IGNORE);
398   xbt_mutex_release(send_win->atomic_mut_);
399   return MPI_SUCCESS;
400
401 }
402
403 int Win::compare_and_swap(void *origin_addr, void *compare_addr,
404         void *result_addr, MPI_Datatype datatype, int target_rank,
405         MPI_Aint target_disp){
406   //get sender pointer
407   MPI_Win send_win = connected_wins_[target_rank];
408
409   if(opened_==0){//check that post/start has been done
410     // no fence or start .. lock ok ?
411     int locked=0;
412     for (auto const& it : send_win->lockers_)
413       if (it == comm_->rank())
414         locked = 1;
415     if(locked != 1)
416       return MPI_ERR_WIN;
417   }
418
419   XBT_DEBUG("Entering MPI_Compare_and_swap with %d", target_rank);
420   MPI_Request req = MPI_REQUEST_NULL;
421   xbt_mutex_acquire(send_win->atomic_mut_);
422   get(result_addr, 1, datatype, target_rank,
423               target_disp, 1, datatype, &req);
424   if (req != MPI_REQUEST_NULL)
425     Request::wait(&req, MPI_STATUS_IGNORE);
426   if (not memcmp(result_addr, compare_addr, datatype->get_extent())) {
427     put(origin_addr, 1, datatype, target_rank,
428               target_disp, 1, datatype);
429   }
430   xbt_mutex_release(send_win->atomic_mut_);
431   return MPI_SUCCESS;
432 }
433
434 int Win::start(MPI_Group group, int assert){
435     /* From MPI forum advices
436     The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
437     will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
438     the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
439     matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
440     MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
441     implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
442     to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
443     called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
444     origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
445     must complete, without further dependencies.  */
446
447   //naive, blocking implementation.
448     int i             = 0;
449     int j             = 0;
450     int size          = group->size();
451     MPI_Request* reqs = xbt_new0(MPI_Request, size);
452
453   XBT_DEBUG("Entering MPI_Win_Start");
454     while (j != size) {
455       int src = group->actor(j)->getPid();
456       if ((unsigned)src != simgrid::s4u::Actor::self()->getPid() && src != MPI_UNDEFINED) {
457         reqs[i] = Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 4, MPI_COMM_WORLD);
458         i++;
459       }
460       j++;
461   }
462   size=i;
463   Request::startall(size, reqs);
464   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
465   for(i=0;i<size;i++){
466     Request::unref(&reqs[i]);
467   }
468   xbt_free(reqs);
469   opened_++; //we're open for business !
470   group_=group;
471   group->ref();
472   XBT_DEBUG("Leaving MPI_Win_Start");
473   return MPI_SUCCESS;
474 }
475
476 int Win::post(MPI_Group group, int assert){
477   //let's make a synchronous send here
478   int i             = 0;
479   int j             = 0;
480   int size = group->size();
481   MPI_Request* reqs = xbt_new0(MPI_Request, size);
482
483   XBT_DEBUG("Entering MPI_Win_Post");
484   while(j!=size){
485     int dst=group->actor(j)->getPid();
486     if ((unsigned)dst != simgrid::s4u::Actor::self()->getPid() && dst != MPI_UNDEFINED) {
487       reqs[i]=Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG+4, MPI_COMM_WORLD);
488       i++;
489     }
490     j++;
491   }
492   size=i;
493
494   Request::startall(size, reqs);
495   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
496   for(i=0;i<size;i++){
497     Request::unref(&reqs[i]);
498   }
499   xbt_free(reqs);
500   opened_++; //we're open for business !
501   group_=group;
502   group->ref();
503   XBT_DEBUG("Leaving MPI_Win_Post");
504   return MPI_SUCCESS;
505 }
506
507 int Win::complete(){
508   if(opened_==0)
509     xbt_die("Complete called on already opened MPI_Win");
510
511   XBT_DEBUG("Entering MPI_Win_Complete");
512   int i             = 0;
513   int j             = 0;
514   int size = group_->size();
515   MPI_Request* reqs = xbt_new0(MPI_Request, size);
516
517   while(j!=size){
518     int dst=group_->actor(j)->getPid();
519     if ((unsigned)dst != simgrid::s4u::Actor::self()->getPid() && dst != MPI_UNDEFINED) {
520       reqs[i]=Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG+5, MPI_COMM_WORLD);
521       i++;
522     }
523     j++;
524   }
525   size=i;
526   XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
527   Request::startall(size, reqs);
528   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
529
530   for(i=0;i<size;i++){
531     Request::unref(&reqs[i]);
532   }
533   xbt_free(reqs);
534
535   int finished = finish_comms();
536   XBT_DEBUG("Win_complete - Finished %d RMA calls", finished);
537
538   Group::unref(group_);
539   opened_--; //we're closed for business !
540   return MPI_SUCCESS;
541 }
542
543 int Win::wait(){
544   //naive, blocking implementation.
545   XBT_DEBUG("Entering MPI_Win_Wait");
546   int i             = 0;
547   int j             = 0;
548   int size          = group_->size();
549   MPI_Request* reqs = xbt_new0(MPI_Request, size);
550
551   while(j!=size){
552     int src=group_->actor(j)->getPid();
553     if ((unsigned)src != simgrid::s4u::Actor::self()->getPid() && src != MPI_UNDEFINED) {
554       reqs[i]=Request::irecv_init(nullptr, 0, MPI_CHAR, src,SMPI_RMA_TAG+5, MPI_COMM_WORLD);
555       i++;
556     }
557     j++;
558   }
559   size=i;
560   XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
561   Request::startall(size, reqs);
562   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
563   for(i=0;i<size;i++){
564     Request::unref(&reqs[i]);
565   }
566   xbt_free(reqs);
567   int finished = finish_comms();
568   XBT_DEBUG("Win_wait - Finished %d RMA calls", finished);
569
570   Group::unref(group_);
571   opened_--; //we're opened for business !
572   return MPI_SUCCESS;
573 }
574
575 int Win::lock(int lock_type, int rank, int assert){
576   MPI_Win target_win = connected_wins_[rank];
577
578   if ((lock_type == MPI_LOCK_EXCLUSIVE && target_win->mode_ != MPI_LOCK_SHARED)|| target_win->mode_ == MPI_LOCK_EXCLUSIVE){
579     xbt_mutex_acquire(target_win->lock_mut_);
580     target_win->mode_+= lock_type;//add the lock_type to differentiate case when we are switching from EXCLUSIVE to SHARED (no release needed in the unlock)
581     if(lock_type == MPI_LOCK_SHARED){//the window used to be exclusive, it's now shared.
582       xbt_mutex_release(target_win->lock_mut_);
583    }
584   } else if (not(target_win->mode_ == MPI_LOCK_SHARED && lock_type == MPI_LOCK_EXCLUSIVE))
585     target_win->mode_ += lock_type; // don't set to exclusive if it's already shared
586
587   target_win->lockers_.push_back(comm_->rank());
588
589   int finished = finish_comms(rank);
590   XBT_DEBUG("Win_lock %d - Finished %d RMA calls", rank, finished);
591   finished = target_win->finish_comms(rank_);
592   XBT_DEBUG("Win_lock target %d - Finished %d RMA calls", rank, finished);
593   return MPI_SUCCESS;
594 }
595
596 int Win::lock_all(int assert){
597   int i=0;
598   int retval = MPI_SUCCESS;
599   for (i=0; i<comm_->size();i++){
600       int ret = this->lock(MPI_LOCK_SHARED, i, assert);
601       if(ret != MPI_SUCCESS)
602         retval = ret;
603   }
604   return retval;
605 }
606
607 int Win::unlock(int rank){
608   MPI_Win target_win = connected_wins_[rank];
609   int target_mode = target_win->mode_;
610   target_win->mode_= 0;
611   target_win->lockers_.remove(comm_->rank());
612   if (target_mode==MPI_LOCK_EXCLUSIVE){
613     xbt_mutex_release(target_win->lock_mut_);
614   }
615
616   int finished = finish_comms(rank);
617   XBT_DEBUG("Win_unlock %d - Finished %d RMA calls", rank, finished);
618   finished = target_win->finish_comms(rank_);
619   XBT_DEBUG("Win_unlock target %d - Finished %d RMA calls", rank, finished);
620   return MPI_SUCCESS;
621 }
622
623 int Win::unlock_all(){
624   int i=0;
625   int retval = MPI_SUCCESS;
626   for (i=0; i<comm_->size();i++){
627       int ret = this->unlock(i);
628       if(ret != MPI_SUCCESS)
629         retval = ret;
630   }
631   return retval;
632 }
633
634 int Win::flush(int rank){
635   MPI_Win target_win = connected_wins_[rank];
636   int finished = finish_comms(rank);
637   XBT_DEBUG("Win_flush on local %d - Finished %d RMA calls", rank_, finished);
638   finished = target_win->finish_comms(rank_);
639   XBT_DEBUG("Win_flush on remote %d - Finished %d RMA calls", rank, finished);
640   return MPI_SUCCESS;
641 }
642
643 int Win::flush_local(int rank){
644   int finished = finish_comms(rank);
645   XBT_DEBUG("Win_flush_local for rank %d - Finished %d RMA calls", rank, finished);
646   return MPI_SUCCESS;
647 }
648
649 int Win::flush_all(){
650   int i=0;
651   int finished = 0;
652   finished = finish_comms();
653   XBT_DEBUG("Win_flush_all on local - Finished %d RMA calls", finished);
654   for (i=0; i<comm_->size();i++){
655     finished = connected_wins_[i]->finish_comms(rank_);
656     XBT_DEBUG("Win_flush_all on %d - Finished %d RMA calls", i, finished);
657   }
658   return MPI_SUCCESS;
659 }
660
661 int Win::flush_local_all(){
662   int finished = finish_comms();
663   XBT_DEBUG("Win_flush_local_all - Finished %d RMA calls", finished);
664   return MPI_SUCCESS;
665 }
666
667 Win* Win::f2c(int id){
668   return static_cast<Win*>(F2C::f2c(id));
669 }
670
671
672 int Win::finish_comms(){
673   xbt_mutex_acquire(mut_);
674   //Finish own requests
675   std::vector<MPI_Request> *reqqs = requests_;
676   int size = static_cast<int>(reqqs->size());
677   if (size > 0) {
678     MPI_Request* treqs = &(*reqqs)[0];
679     Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
680     reqqs->clear();
681   }
682   xbt_mutex_release(mut_);
683   return size;
684 }
685
686 int Win::finish_comms(int rank){
687   xbt_mutex_acquire(mut_);
688   //Finish own requests
689   std::vector<MPI_Request> *reqqs = requests_;
690   int size = static_cast<int>(reqqs->size());
691   if (size > 0) {
692     size = 0;
693     std::vector<MPI_Request> myreqqs;
694     std::vector<MPI_Request>::iterator iter = reqqs->begin();
695     while (iter != reqqs->end()){
696       if(((*iter)!=MPI_REQUEST_NULL) && (((*iter)->src() == rank) || ((*iter)->dst() == rank))){
697         myreqqs.push_back(*iter);
698         iter = reqqs->erase(iter);
699         size++;
700       } else {
701         ++iter;
702       }
703     }
704     if(size >0){
705       MPI_Request* treqs = &myreqqs[0];
706       Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
707       myreqqs.clear();
708     }
709   }
710   xbt_mutex_release(mut_);
711   return size;
712 }
713
714
715 }
716 }