Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Now it works (if you don't look at the leaks)
[simgrid.git] / src / smpi / smpi_win.cpp
1 /* Copyright (c) 2007-2015. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "private.h"
8 #include <vector>
9
10 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");
11
12 namespace simgrid{
13 namespace smpi{
14 std::unordered_map<int, smpi_key_elem> Win::keyvals_;
15 int Win::keyval_id_=0;
16
17 Win::Win(void *base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm): base_(base), size_(size), disp_unit_(disp_unit), assert_(0), info_(info), comm_(comm){
18   int comm_size = comm->size();
19   rank_      = comm->rank();
20   XBT_DEBUG("Creating window");
21   if(info!=MPI_INFO_NULL)
22     info->ref();
23   name_ = nullptr;
24   opened_ = 0;
25   group_ = MPI_GROUP_NULL;
26   requests_ = new std::vector<MPI_Request>();
27   mut_=xbt_mutex_init();
28   lock_mut_=xbt_mutex_init();
29   connected_wins_ = new MPI_Win[comm_size];
30   connected_wins_[rank_] = this;
31   count_ = 0;
32   if(rank_==0){
33     bar_ = MSG_barrier_init(comm_size);
34   }
35
36   comm->add_rma_win(this);
37
38   Colls::allgather(&(connected_wins_[rank_]), sizeof(MPI_Win), MPI_BYTE, connected_wins_, sizeof(MPI_Win),
39                          MPI_BYTE, comm);
40
41   Colls::bcast(&(bar_), sizeof(msg_bar_t), MPI_BYTE, 0, comm);
42
43   Colls::barrier(comm);
44 }
45
46 Win::~Win(){
47   //As per the standard, perform a barrier to ensure every async comm is finished
48   MSG_barrier_wait(bar_);
49
50   int finished = finish_comms();
51   XBT_DEBUG("Win destructor - Finished %d RMA calls", finished);
52
53   delete requests_;
54   delete[] connected_wins_;
55   if (name_ != nullptr){
56     xbt_free(name_);
57   }
58   if(info_!=MPI_INFO_NULL){
59     MPI_Info_free(&info_);
60   }
61
62   comm_->remove_rma_win(this);
63
64   Colls::barrier(comm_);
65   int rank=comm_->rank();
66   if(rank == 0)
67     MSG_barrier_destroy(bar_);
68   xbt_mutex_destroy(mut_);
69   xbt_mutex_destroy(lock_mut_);
70
71   cleanup_attr<Win>();
72 }
73
74 void Win::get_name(char* name, int* length){
75   if(name_==nullptr){
76     *length=0;
77     name=nullptr;
78     return;
79   }
80   *length = strlen(name_);
81   strncpy(name, name_, *length+1);
82 }
83
84 void Win::get_group(MPI_Group* group){
85   if(comm_ != MPI_COMM_NULL){
86     *group = comm_->group();
87   } else {
88     *group = MPI_GROUP_NULL;
89   }
90 }
91
92 int Win::rank(){
93   return rank_;
94 }
95
96 MPI_Aint Win::size(){
97   return size_;
98 }
99
100 void* Win::base(){
101   return base_;
102 }
103
104 int Win::disp_unit(){
105   return disp_unit_;
106 }
107
108
109 void Win::set_name(char* name){
110   name_ = xbt_strdup(name);
111 }
112
113 int Win::fence(int assert)
114 {
115   XBT_DEBUG("Entering fence");
116   if (opened_ == 0)
117     opened_=1;
118   if (assert != MPI_MODE_NOPRECEDE) {
119     // This is not the first fence => finalize what came before
120     MSG_barrier_wait(bar_);
121     xbt_mutex_acquire(mut_);
122     // This (simulated) mutex ensures that no process pushes to the vector of requests during the waitall.
123     // Without this, the vector could get redimensionned when another process pushes.
124     // This would result in the array used by Request::waitall() to be invalidated.
125     // Another solution would be to copy the data and cleanup the vector *before* Request::waitall
126     std::vector<MPI_Request> *reqs = requests_;
127     int size = static_cast<int>(reqs->size());
128     // start all requests that have been prepared by another process
129     if (size > 0) {
130       for (const auto& req : *reqs) {
131         if (req && (req->flags() & PREPARED))
132           req->start();
133       }
134
135       MPI_Request* treqs = &(*reqs)[0];
136
137       Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
138     }
139     count_=0;
140     xbt_mutex_release(mut_);
141   }
142   assert_ = assert;
143
144   MSG_barrier_wait(bar_);
145   XBT_DEBUG("Leaving fence");
146
147   return MPI_SUCCESS;
148 }
149
150 int Win::put( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
151               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype)
152 {
153   //get receiver pointer
154   MPI_Win recv_win = connected_wins_[target_rank];
155
156   if(opened_==0){//check that post/start has been done
157     // no fence or start .. lock ok ?
158     int locked=0;
159     for(auto it : recv_win->lockers_)
160       if (it == comm_->rank())
161         locked = 1;
162     if(locked != 1)
163       return MPI_ERR_WIN;
164   }
165
166   if(target_count*target_datatype->get_extent()>recv_win->size_)
167     return MPI_ERR_ARG;
168
169   void* recv_addr = static_cast<void*> ( static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_);
170   XBT_DEBUG("Entering MPI_Put to %d", target_rank);
171
172   if(target_rank != comm_->rank()){
173     //prepare send_request
174     MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype, smpi_process()->index(),
175         comm_->group()->index(target_rank), SMPI_RMA_TAG+1, comm_, MPI_OP_NULL);
176
177     //prepare receiver request
178     MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, smpi_process()->index(),
179         comm_->group()->index(target_rank), SMPI_RMA_TAG+1, recv_win->comm_, MPI_OP_NULL);
180
181     //push request to receiver's win
182     xbt_mutex_acquire(recv_win->mut_);
183     recv_win->requests_->push_back(rreq);
184     xbt_mutex_release(recv_win->mut_);
185     //start send
186     sreq->start();
187
188     //push request to sender's win
189     xbt_mutex_acquire(mut_);
190     requests_->push_back(sreq);
191     xbt_mutex_release(mut_);
192   }else{
193     Datatype::copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
194   }
195
196   return MPI_SUCCESS;
197 }
198
199 int Win::get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
200               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype)
201 {
202   //get sender pointer
203   MPI_Win send_win = connected_wins_[target_rank];
204
205   if(opened_==0){//check that post/start has been done
206     // no fence or start .. lock ok ?
207     int locked=0;
208     for(auto it : send_win->lockers_)
209       if (it == comm_->rank())
210         locked = 1;
211     if(locked != 1)
212       return MPI_ERR_WIN;
213   }
214
215   if(target_count*target_datatype->get_extent()>send_win->size_)
216     return MPI_ERR_ARG;
217
218   void* send_addr = static_cast<void*>(static_cast<char*>(send_win->base_) + target_disp * send_win->disp_unit_);
219   XBT_DEBUG("Entering MPI_Get from %d", target_rank);
220
221   if(target_rank != comm_->rank()){
222     //prepare send_request
223     MPI_Request sreq = Request::rma_send_init(send_addr, target_count, target_datatype,
224         comm_->group()->index(target_rank), smpi_process()->index(), SMPI_RMA_TAG+2, send_win->comm_,
225         MPI_OP_NULL);
226
227     //prepare receiver request
228     MPI_Request rreq = Request::rma_recv_init(origin_addr, origin_count, origin_datatype,
229         comm_->group()->index(target_rank), smpi_process()->index(), SMPI_RMA_TAG+2, comm_,
230         MPI_OP_NULL);
231
232     //start the send, with another process than us as sender. 
233     sreq->start();
234     //push request to receiver's win
235     xbt_mutex_acquire(send_win->mut_);
236     send_win->requests_->push_back(sreq);
237     xbt_mutex_release(send_win->mut_);
238
239     //start recv
240     rreq->start();
241     //push request to sender's win
242     xbt_mutex_acquire(mut_);
243     requests_->push_back(rreq);
244     xbt_mutex_release(mut_);
245   }else{
246     Datatype::copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
247   }
248
249   return MPI_SUCCESS;
250 }
251
252
253 int Win::accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
254               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op)
255 {
256
257   //get receiver pointer
258   MPI_Win recv_win = connected_wins_[target_rank];
259
260   if(opened_==0){//check that post/start has been done
261     // no fence or start .. lock ok ?
262     int locked=0;
263     for(auto it : recv_win->lockers_)
264       if (it == comm_->rank())
265         locked = 1;
266     if(locked != 1)
267       return MPI_ERR_WIN;
268   }
269   //FIXME: local version 
270
271   if(target_count*target_datatype->get_extent()>recv_win->size_)
272     return MPI_ERR_ARG;
273
274   void* recv_addr = static_cast<void*>(static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_);
275   XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
276     //As the tag will be used for ordering of the operations, add count to it
277     //prepare send_request
278     MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype,
279         smpi_process()->index(), comm_->group()->index(target_rank), SMPI_RMA_TAG+3+count_, comm_, op);
280
281     //prepare receiver request
282     MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype,
283         smpi_process()->index(), comm_->group()->index(target_rank), SMPI_RMA_TAG+3+count_, recv_win->comm_, op);
284
285     count_++;
286     //push request to receiver's win
287     xbt_mutex_acquire(recv_win->mut_);
288     recv_win->requests_->push_back(rreq);
289     xbt_mutex_release(recv_win->mut_);
290     //start send
291     sreq->start();
292
293     //push request to sender's win
294     xbt_mutex_acquire(mut_);
295     requests_->push_back(sreq);
296     xbt_mutex_release(mut_);
297
298   return MPI_SUCCESS;
299 }
300
301 int Win::start(MPI_Group group, int assert){
302     /* From MPI forum advices
303     The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
304     will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
305     the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
306     matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
307     MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
308     implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
309     to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
310     called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
311     origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
312     must complete, without further dependencies.  */
313
314   //naive, blocking implementation.
315     int i             = 0;
316     int j             = 0;
317     int size          = group->size();
318     MPI_Request* reqs = xbt_new0(MPI_Request, size);
319
320     while (j != size) {
321       int src = group->index(j);
322       if (src != smpi_process()->index() && src != MPI_UNDEFINED) {
323         reqs[i] = Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 4, MPI_COMM_WORLD);
324         i++;
325       }
326       j++;
327   }
328   size=i;
329   Request::startall(size, reqs);
330   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
331   for(i=0;i<size;i++){
332     Request::unref(&reqs[i]);
333   }
334   xbt_free(reqs);
335   opened_++; //we're open for business !
336   group_=group;
337   group->ref();
338   return MPI_SUCCESS;
339 }
340
341 int Win::post(MPI_Group group, int assert){
342   //let's make a synchronous send here
343   int i             = 0;
344   int j             = 0;
345   int size = group->size();
346   MPI_Request* reqs = xbt_new0(MPI_Request, size);
347
348   while(j!=size){
349     int dst=group->index(j);
350     if(dst!=smpi_process()->index() && dst!=MPI_UNDEFINED){
351       reqs[i]=Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG+4, MPI_COMM_WORLD);
352       i++;
353     }
354     j++;
355   }
356   size=i;
357
358   Request::startall(size, reqs);
359   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
360   for(i=0;i<size;i++){
361     Request::unref(&reqs[i]);
362   }
363   xbt_free(reqs);
364   opened_++; //we're open for business !
365   group_=group;
366   group->ref();
367   return MPI_SUCCESS;
368 }
369
370 int Win::complete(){
371   if(opened_==0)
372     xbt_die("Complete called on already opened MPI_Win");
373
374   XBT_DEBUG("Entering MPI_Win_Complete");
375   int i             = 0;
376   int j             = 0;
377   int size = group_->size();
378   MPI_Request* reqs = xbt_new0(MPI_Request, size);
379
380   while(j!=size){
381     int dst=group_->index(j);
382     if(dst!=smpi_process()->index() && dst!=MPI_UNDEFINED){
383       reqs[i]=Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG+5, MPI_COMM_WORLD);
384       i++;
385     }
386     j++;
387   }
388   size=i;
389   XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
390   Request::startall(size, reqs);
391   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
392
393   for(i=0;i<size;i++){
394     Request::unref(&reqs[i]);
395   }
396   xbt_free(reqs);
397
398   int finished = finish_comms();
399   XBT_DEBUG("Win_complete - Finished %d RMA calls", finished);
400
401   Group::unref(group_);
402   opened_--; //we're closed for business !
403   return MPI_SUCCESS;
404 }
405
406 int Win::wait(){
407   //naive, blocking implementation.
408   XBT_DEBUG("Entering MPI_Win_Wait");
409   int i=0,j=0;
410   int size = group_->size();
411   MPI_Request* reqs = xbt_new0(MPI_Request, size);
412
413   while(j!=size){
414     int src=group_->index(j);
415     if(src!=smpi_process()->index() && src!=MPI_UNDEFINED){
416       reqs[i]=Request::irecv_init(nullptr, 0, MPI_CHAR, src,SMPI_RMA_TAG+5, MPI_COMM_WORLD);
417       i++;
418     }
419     j++;
420   }
421   size=i;
422   XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
423   Request::startall(size, reqs);
424   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
425   for(i=0;i<size;i++){
426     Request::unref(&reqs[i]);
427   }
428   xbt_free(reqs);
429   int finished = finish_comms();
430   XBT_DEBUG("Win_wait - Finished %d RMA calls", finished);
431
432   Group::unref(group_);
433   opened_--; //we're opened for business !
434   return MPI_SUCCESS;
435 }
436
437 int Win::lock(int lock_type, int rank, int assert){
438   MPI_Win target_win = connected_wins_[rank];
439
440   int finished = finish_comms();
441   XBT_DEBUG("Win_lock - Finished %d RMA calls", finished);
442
443   //window already locked, we have to wait
444   if (lock_type == MPI_LOCK_EXCLUSIVE)
445     xbt_mutex_acquire(target_win->lock_mut_);
446
447   xbt_mutex_acquire(target_win->mut_);
448   target_win->lockers_.push_back(comm_->rank());
449   xbt_mutex_release(target_win->mut_);  
450
451   return MPI_SUCCESS;
452 }
453
454 int Win::unlock(int rank){
455   MPI_Win target_win = connected_wins_[rank];
456
457   int finished = finish_comms();
458   XBT_DEBUG("Win_unlock - Finished %d RMA calls", finished);
459
460   xbt_mutex_acquire(target_win->mut_);
461   target_win->lockers_.remove(comm_->rank());
462   xbt_mutex_release(target_win->mut_);
463
464   xbt_mutex_try_acquire(target_win->lock_mut_);
465   xbt_mutex_release(target_win->lock_mut_);
466   return MPI_SUCCESS;
467 }
468
469 Win* Win::f2c(int id){
470   return static_cast<Win*>(F2C::f2c(id));
471 }
472
473
474 int Win::finish_comms(){
475   //Finish own requests
476   std::vector<MPI_Request> *reqqs = requests_;
477   int size = static_cast<int>(reqqs->size());
478   if (size > 0) {
479     xbt_mutex_acquire(mut_);
480     // start all requests that have been prepared by another process
481     for (const auto& req : *reqqs) {
482       if (req && (req->flags() & PREPARED))
483         req->start();
484     }
485
486     MPI_Request* treqs = &(*reqqs)[0];
487     Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
488     reqqs->clear();
489     xbt_mutex_release(mut_);
490   }
491
492   return size;
493 }
494
495
496 }
497 }