Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
080f51916aab512564897fd6babbcc1df401dfd2
[simgrid.git] / src / smpi / smpi_win.cpp
1 /* Copyright (c) 2007-2015. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "private.h"
8 #include <vector>
9
10 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");
11
12 namespace simgrid{
13 namespace smpi{
14 std::unordered_map<int, smpi_key_elem> Win::keyvals_;
15 int Win::keyval_id_=0;
16
17 Win::Win(void *base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm): base_(base), size_(size), disp_unit_(disp_unit), assert_(0), info_(info), comm_(comm){
18   int comm_size = comm->size();
19   rank_      = comm->rank();
20   XBT_DEBUG("Creating window");
21   if(info!=MPI_INFO_NULL)
22     info->ref();
23   name_ = nullptr;
24   opened_ = 0;
25   group_ = MPI_GROUP_NULL;
26   requests_ = new std::vector<MPI_Request>();
27   mut_=xbt_mutex_init();
28   lock_mut_=xbt_mutex_init();
29   connected_wins_ = new MPI_Win[comm_size];
30   connected_wins_[rank_] = this;
31   count_ = 0;
32   if(rank_==0){
33     bar_ = MSG_barrier_init(comm_size);
34   }
35   mode_=0;
36
37   comm->add_rma_win(this);
38
39   Colls::allgather(&(connected_wins_[rank_]), sizeof(MPI_Win), MPI_BYTE, connected_wins_, sizeof(MPI_Win),
40                          MPI_BYTE, comm);
41
42   Colls::bcast(&(bar_), sizeof(msg_bar_t), MPI_BYTE, 0, comm);
43
44   Colls::barrier(comm);
45 }
46
47 Win::~Win(){
48   //As per the standard, perform a barrier to ensure every async comm is finished
49   MSG_barrier_wait(bar_);
50
51   int finished = finish_comms();
52   XBT_DEBUG("Win destructor - Finished %d RMA calls", finished);
53
54   delete requests_;
55   delete[] connected_wins_;
56   if (name_ != nullptr){
57     xbt_free(name_);
58   }
59   if(info_!=MPI_INFO_NULL){
60     MPI_Info_free(&info_);
61   }
62
63   comm_->remove_rma_win(this);
64
65   Colls::barrier(comm_);
66   int rank=comm_->rank();
67   if(rank == 0)
68     MSG_barrier_destroy(bar_);
69   xbt_mutex_destroy(mut_);
70   xbt_mutex_destroy(lock_mut_);
71
72   cleanup_attr<Win>();
73 }
74
75 void Win::get_name(char* name, int* length){
76   if(name_==nullptr){
77     *length=0;
78     name=nullptr;
79     return;
80   }
81   *length = strlen(name_);
82   strncpy(name, name_, *length+1);
83 }
84
85 void Win::get_group(MPI_Group* group){
86   if(comm_ != MPI_COMM_NULL){
87     *group = comm_->group();
88   } else {
89     *group = MPI_GROUP_NULL;
90   }
91 }
92
93 int Win::rank(){
94   return rank_;
95 }
96
97 MPI_Aint Win::size(){
98   return size_;
99 }
100
101 void* Win::base(){
102   return base_;
103 }
104
105 int Win::disp_unit(){
106   return disp_unit_;
107 }
108
109
110 void Win::set_name(char* name){
111   name_ = xbt_strdup(name);
112 }
113
114 int Win::fence(int assert)
115 {
116   XBT_DEBUG("Entering fence");
117   if (opened_ == 0)
118     opened_=1;
119   if (assert != MPI_MODE_NOPRECEDE) {
120     // This is not the first fence => finalize what came before
121     MSG_barrier_wait(bar_);
122     xbt_mutex_acquire(mut_);
123     // This (simulated) mutex ensures that no process pushes to the vector of requests during the waitall.
124     // Without this, the vector could get redimensionned when another process pushes.
125     // This would result in the array used by Request::waitall() to be invalidated.
126     // Another solution would be to copy the data and cleanup the vector *before* Request::waitall
127     std::vector<MPI_Request> *reqs = requests_;
128     int size = static_cast<int>(reqs->size());
129     // start all requests that have been prepared by another process
130     if (size > 0) {
131       for (const auto& req : *reqs) {
132         if (req && (req->flags() & PREPARED))
133           req->start();
134       }
135
136       MPI_Request* treqs = &(*reqs)[0];
137
138       Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
139     }
140     count_=0;
141     xbt_mutex_release(mut_);
142   }
143   assert_ = assert;
144
145   MSG_barrier_wait(bar_);
146   XBT_DEBUG("Leaving fence");
147
148   return MPI_SUCCESS;
149 }
150
151 int Win::put( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
152               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype)
153 {
154   //get receiver pointer
155   MPI_Win recv_win = connected_wins_[target_rank];
156
157   if(opened_==0){//check that post/start has been done
158     // no fence or start .. lock ok ?
159     int locked=0;
160     for(auto it : recv_win->lockers_)
161       if (it == comm_->rank())
162         locked = 1;
163     if(locked != 1)
164       return MPI_ERR_WIN;
165   }
166
167   if(target_count*target_datatype->get_extent()>recv_win->size_)
168     return MPI_ERR_ARG;
169
170   void* recv_addr = static_cast<void*> ( static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_);
171   XBT_DEBUG("Entering MPI_Put to %d", target_rank);
172
173   if(target_rank != comm_->rank()){
174     //prepare send_request
175     MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype, smpi_process()->index(),
176         comm_->group()->index(target_rank), SMPI_RMA_TAG+1, comm_, MPI_OP_NULL);
177
178     //prepare receiver request
179     MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, smpi_process()->index(),
180         comm_->group()->index(target_rank), SMPI_RMA_TAG+1, recv_win->comm_, MPI_OP_NULL);
181
182     //push request to receiver's win
183     xbt_mutex_acquire(recv_win->mut_);
184     recv_win->requests_->push_back(rreq);
185     xbt_mutex_release(recv_win->mut_);
186     //start send
187     sreq->start();
188
189     //push request to sender's win
190     xbt_mutex_acquire(mut_);
191     requests_->push_back(sreq);
192     xbt_mutex_release(mut_);
193   }else{
194     Datatype::copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
195   }
196
197   return MPI_SUCCESS;
198 }
199
200 int Win::get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
201               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype)
202 {
203   //get sender pointer
204   MPI_Win send_win = connected_wins_[target_rank];
205
206   if(opened_==0){//check that post/start has been done
207     // no fence or start .. lock ok ?
208     int locked=0;
209     for(auto it : send_win->lockers_)
210       if (it == comm_->rank())
211         locked = 1;
212     if(locked != 1)
213       return MPI_ERR_WIN;
214   }
215
216   if(target_count*target_datatype->get_extent()>send_win->size_)
217     return MPI_ERR_ARG;
218
219   void* send_addr = static_cast<void*>(static_cast<char*>(send_win->base_) + target_disp * send_win->disp_unit_);
220   XBT_DEBUG("Entering MPI_Get from %d", target_rank);
221
222   if(target_rank != comm_->rank()){
223     //prepare send_request
224     MPI_Request sreq = Request::rma_send_init(send_addr, target_count, target_datatype,
225         comm_->group()->index(target_rank), smpi_process()->index(), SMPI_RMA_TAG+2, send_win->comm_,
226         MPI_OP_NULL);
227
228     //prepare receiver request
229     MPI_Request rreq = Request::rma_recv_init(origin_addr, origin_count, origin_datatype,
230         comm_->group()->index(target_rank), smpi_process()->index(), SMPI_RMA_TAG+2, comm_,
231         MPI_OP_NULL);
232
233     //start the send, with another process than us as sender. 
234     sreq->start();
235     //push request to receiver's win
236     xbt_mutex_acquire(send_win->mut_);
237     send_win->requests_->push_back(sreq);
238     xbt_mutex_release(send_win->mut_);
239
240     //start recv
241     rreq->start();
242     //push request to sender's win
243     xbt_mutex_acquire(mut_);
244     requests_->push_back(rreq);
245     xbt_mutex_release(mut_);
246   }else{
247     Datatype::copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
248   }
249
250   return MPI_SUCCESS;
251 }
252
253
254 int Win::accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
255               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op)
256 {
257
258   //get receiver pointer
259   MPI_Win recv_win = connected_wins_[target_rank];
260
261   if(opened_==0){//check that post/start has been done
262     // no fence or start .. lock ok ?
263     int locked=0;
264     for(auto it : recv_win->lockers_)
265       if (it == comm_->rank())
266         locked = 1;
267     if(locked != 1)
268       return MPI_ERR_WIN;
269   }
270   //FIXME: local version 
271
272   if(target_count*target_datatype->get_extent()>recv_win->size_)
273     return MPI_ERR_ARG;
274
275   void* recv_addr = static_cast<void*>(static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_);
276   XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
277     //As the tag will be used for ordering of the operations, substract count from it (to avoid collisions with other SMPI tags, SMPI_RMA_TAG is set below all the other ones we use )
278     //prepare send_request
279
280     MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype,
281         smpi_process()->index(), comm_->group()->index(target_rank), SMPI_RMA_TAG-3-count_, comm_, op);
282
283     //prepare receiver request
284     MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype,
285         smpi_process()->index(), comm_->group()->index(target_rank), SMPI_RMA_TAG-3-count_, recv_win->comm_, op);
286
287     count_++;
288     //push request to receiver's win
289     xbt_mutex_acquire(recv_win->mut_);
290     recv_win->requests_->push_back(rreq);
291     xbt_mutex_release(recv_win->mut_);
292     //start send
293     sreq->start();
294
295     //push request to sender's win
296     xbt_mutex_acquire(mut_);
297     requests_->push_back(sreq);
298     xbt_mutex_release(mut_);
299
300   return MPI_SUCCESS;
301 }
302
303 int Win::start(MPI_Group group, int assert){
304     /* From MPI forum advices
305     The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
306     will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
307     the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
308     matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
309     MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
310     implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
311     to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
312     called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
313     origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
314     must complete, without further dependencies.  */
315
316   //naive, blocking implementation.
317     int i             = 0;
318     int j             = 0;
319     int size          = group->size();
320     MPI_Request* reqs = xbt_new0(MPI_Request, size);
321
322     while (j != size) {
323       int src = group->index(j);
324       if (src != smpi_process()->index() && src != MPI_UNDEFINED) {
325         reqs[i] = Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 4, MPI_COMM_WORLD);
326         i++;
327       }
328       j++;
329   }
330   size=i;
331   Request::startall(size, reqs);
332   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
333   for(i=0;i<size;i++){
334     Request::unref(&reqs[i]);
335   }
336   xbt_free(reqs);
337   opened_++; //we're open for business !
338   group_=group;
339   group->ref();
340   return MPI_SUCCESS;
341 }
342
343 int Win::post(MPI_Group group, int assert){
344   //let's make a synchronous send here
345   int i             = 0;
346   int j             = 0;
347   int size = group->size();
348   MPI_Request* reqs = xbt_new0(MPI_Request, size);
349
350   while(j!=size){
351     int dst=group->index(j);
352     if(dst!=smpi_process()->index() && dst!=MPI_UNDEFINED){
353       reqs[i]=Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG+4, MPI_COMM_WORLD);
354       i++;
355     }
356     j++;
357   }
358   size=i;
359
360   Request::startall(size, reqs);
361   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
362   for(i=0;i<size;i++){
363     Request::unref(&reqs[i]);
364   }
365   xbt_free(reqs);
366   opened_++; //we're open for business !
367   group_=group;
368   group->ref();
369   return MPI_SUCCESS;
370 }
371
372 int Win::complete(){
373   if(opened_==0)
374     xbt_die("Complete called on already opened MPI_Win");
375
376   XBT_DEBUG("Entering MPI_Win_Complete");
377   int i             = 0;
378   int j             = 0;
379   int size = group_->size();
380   MPI_Request* reqs = xbt_new0(MPI_Request, size);
381
382   while(j!=size){
383     int dst=group_->index(j);
384     if(dst!=smpi_process()->index() && dst!=MPI_UNDEFINED){
385       reqs[i]=Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG+5, MPI_COMM_WORLD);
386       i++;
387     }
388     j++;
389   }
390   size=i;
391   XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
392   Request::startall(size, reqs);
393   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
394
395   for(i=0;i<size;i++){
396     Request::unref(&reqs[i]);
397   }
398   xbt_free(reqs);
399
400   int finished = finish_comms();
401   XBT_DEBUG("Win_complete - Finished %d RMA calls", finished);
402
403   Group::unref(group_);
404   opened_--; //we're closed for business !
405   return MPI_SUCCESS;
406 }
407
408 int Win::wait(){
409   //naive, blocking implementation.
410   XBT_DEBUG("Entering MPI_Win_Wait");
411   int i             = 0;
412   int j             = 0;
413   int size          = group_->size();
414   MPI_Request* reqs = xbt_new0(MPI_Request, size);
415
416   while(j!=size){
417     int src=group_->index(j);
418     if(src!=smpi_process()->index() && src!=MPI_UNDEFINED){
419       reqs[i]=Request::irecv_init(nullptr, 0, MPI_CHAR, src,SMPI_RMA_TAG+5, MPI_COMM_WORLD);
420       i++;
421     }
422     j++;
423   }
424   size=i;
425   XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
426   Request::startall(size, reqs);
427   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
428   for(i=0;i<size;i++){
429     Request::unref(&reqs[i]);
430   }
431   xbt_free(reqs);
432   int finished = finish_comms();
433   XBT_DEBUG("Win_wait - Finished %d RMA calls", finished);
434
435   Group::unref(group_);
436   opened_--; //we're opened for business !
437   return MPI_SUCCESS;
438 }
439
440 int Win::lock(int lock_type, int rank, int assert){
441   if(opened_!=0)
442     return MPI_ERR_WIN;
443
444   MPI_Win target_win = connected_wins_[rank];
445
446   if ((lock_type == MPI_LOCK_EXCLUSIVE && target_win->mode_ != MPI_LOCK_SHARED)|| target_win->mode_ == MPI_LOCK_EXCLUSIVE){
447     xbt_mutex_acquire(target_win->lock_mut_);
448     target_win->mode_+= lock_type;//add the lock_type to differentiate case when we are switching from EXCLUSIVE to SHARED (no release needed in the unlock)
449     if(lock_type == MPI_LOCK_SHARED){//the window used to be exclusive, it's now shared.
450       xbt_mutex_release(target_win->lock_mut_);
451    }
452   } else if(!(target_win->mode_==MPI_LOCK_SHARED && lock_type == MPI_LOCK_EXCLUSIVE))
453         target_win->mode_+= lock_type; // don't set to exclusive if it's already shared
454
455   target_win->lockers_.push_back(comm_->rank());
456
457   int finished = finish_comms();
458   XBT_DEBUG("Win_lock %d - Finished %d RMA calls", rank, finished);
459
460   return MPI_SUCCESS;
461 }
462
463 int Win::unlock(int rank){
464   if(opened_!=0)
465     return MPI_ERR_WIN;
466
467   MPI_Win target_win = connected_wins_[rank];
468   int target_mode = target_win->mode_;
469   target_win->mode_= 0;
470   target_win->lockers_.remove(comm_->rank());
471   if (target_mode==MPI_LOCK_EXCLUSIVE){
472     xbt_mutex_release(target_win->lock_mut_);
473   }
474
475   int finished = finish_comms();
476   XBT_DEBUG("Win_unlock %d - Finished %d RMA calls", rank, finished);
477
478   return MPI_SUCCESS;
479 }
480
481 Win* Win::f2c(int id){
482   return static_cast<Win*>(F2C::f2c(id));
483 }
484
485
486 int Win::finish_comms(){
487   xbt_mutex_acquire(mut_);
488   //Finish own requests
489   std::vector<MPI_Request> *reqqs = requests_;
490   int size = static_cast<int>(reqqs->size());
491   if (size > 0) {
492     // start all requests that have been prepared by another process
493     for (const auto& req : *reqqs) {
494       if (req && (req->flags() & PREPARED))
495         req->start();
496     }
497
498     MPI_Request* treqs = &(*reqqs)[0];
499     Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
500     reqqs->clear();
501   }
502   xbt_mutex_release(mut_);
503   return size;
504 }
505
506
507 }
508 }