Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Merge branch 'master' of scm.gforge.inria.fr:/gitroot/simgrid/simgrid
[simgrid.git] / src / smpi / smpi_win.cpp
1 /* Copyright (c) 2007-2015. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "private.h"
8 #include <vector>
9
10 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");
11
12 namespace simgrid{
13 namespace smpi{
14 std::unordered_map<int, smpi_key_elem> Win::keyvals_;
15 int Win::keyval_id_=0;
16
17 Win::Win(void *base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm): base_(base), size_(size), disp_unit_(disp_unit), assert_(0), info_(info), comm_(comm){
18   int comm_size = comm->size();
19   rank_      = comm->rank();
20   XBT_DEBUG("Creating window");
21   if(info!=MPI_INFO_NULL)
22     info->ref();
23   name_ = nullptr;
24   opened_ = 0;
25   group_ = MPI_GROUP_NULL;
26   requests_ = new std::vector<MPI_Request>();
27   mut_=xbt_mutex_init();
28   lock_mut_=xbt_mutex_init();
29   connected_wins_ = new MPI_Win[comm_size];
30   connected_wins_[rank_] = this;
31   count_ = 0;
32   if(rank_==0){
33     bar_ = MSG_barrier_init(comm_size);
34   }
35
36   comm->add_rma_win(this);
37
38   Colls::allgather(&(connected_wins_[rank_]), sizeof(MPI_Win), MPI_BYTE, connected_wins_, sizeof(MPI_Win),
39                          MPI_BYTE, comm);
40
41   Colls::bcast(&(bar_), sizeof(msg_bar_t), MPI_BYTE, 0, comm);
42
43   Colls::barrier(comm);
44 }
45
46 Win::~Win(){
47   //As per the standard, perform a barrier to ensure every async comm is finished
48   MSG_barrier_wait(bar_);
49
50   int finished = finish_comms();
51   XBT_DEBUG("Win destructor - Finished %d RMA calls", finished);
52
53   delete requests_;
54   delete[] connected_wins_;
55   if (name_ != nullptr){
56     xbt_free(name_);
57   }
58   if(info_!=MPI_INFO_NULL){
59     MPI_Info_free(&info_);
60   }
61
62   comm_->remove_rma_win(this);
63
64   Colls::barrier(comm_);
65   int rank=comm_->rank();
66   if(rank == 0)
67     MSG_barrier_destroy(bar_);
68   xbt_mutex_destroy(mut_);
69   xbt_mutex_destroy(lock_mut_);
70
71   cleanup_attr<Win>();
72 }
73
74 void Win::get_name(char* name, int* length){
75   if(name_==nullptr){
76     *length=0;
77     name=nullptr;
78     return;
79   }
80   *length = strlen(name_);
81   strncpy(name, name_, *length+1);
82 }
83
84 void Win::get_group(MPI_Group* group){
85   if(comm_ != MPI_COMM_NULL){
86     *group = comm_->group();
87   } else {
88     *group = MPI_GROUP_NULL;
89   }
90 }
91
92 int Win::rank(){
93   return rank_;
94 }
95
96 MPI_Aint Win::size(){
97   return size_;
98 }
99
100 void* Win::base(){
101   return base_;
102 }
103
104 int Win::disp_unit(){
105   return disp_unit_;
106 }
107
108
109 void Win::set_name(char* name){
110   name_ = xbt_strdup(name);
111 }
112
113 int Win::fence(int assert)
114 {
115   XBT_DEBUG("Entering fence");
116   if (opened_ == 0)
117     opened_=1;
118   if (assert != MPI_MODE_NOPRECEDE) {
119     // This is not the first fence => finalize what came before
120     MSG_barrier_wait(bar_);
121     xbt_mutex_acquire(mut_);
122     // This (simulated) mutex ensures that no process pushes to the vector of requests during the waitall.
123     // Without this, the vector could get redimensionned when another process pushes.
124     // This would result in the array used by Request::waitall() to be invalidated.
125     // Another solution would be to copy the data and cleanup the vector *before* Request::waitall
126     std::vector<MPI_Request> *reqs = requests_;
127     int size = static_cast<int>(reqs->size());
128     // start all requests that have been prepared by another process
129     if (size > 0) {
130       for (const auto& req : *reqs) {
131         if (req && (req->flags() & PREPARED))
132           req->start();
133       }
134
135       MPI_Request* treqs = &(*reqs)[0];
136
137       Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
138     }
139     count_=0;
140     xbt_mutex_release(mut_);
141   }
142   assert_ = assert;
143
144   MSG_barrier_wait(bar_);
145   XBT_DEBUG("Leaving fence");
146
147   return MPI_SUCCESS;
148 }
149
150 int Win::put( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
151               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype)
152 {
153   //get receiver pointer
154   MPI_Win recv_win = connected_wins_[target_rank];
155
156   if(opened_==0){//check that post/start has been done
157     // no fence or start .. lock ok ?
158     int locked=0;
159     for(auto it : recv_win->lockers_)
160       if (it == comm_->rank())
161         locked = 1;
162     if(locked != 1)
163       return MPI_ERR_WIN;
164   }
165
166   if(target_count*target_datatype->get_extent()>recv_win->size_)
167     return MPI_ERR_ARG;
168
169   void* recv_addr = static_cast<void*> ( static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_);
170   XBT_DEBUG("Entering MPI_Put to %d", target_rank);
171
172   if(target_rank != comm_->rank()){
173     //prepare send_request
174     MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype, smpi_process()->index(),
175         comm_->group()->index(target_rank), SMPI_RMA_TAG+1, comm_, MPI_OP_NULL);
176
177     //prepare receiver request
178     MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, smpi_process()->index(),
179         comm_->group()->index(target_rank), SMPI_RMA_TAG+1, recv_win->comm_, MPI_OP_NULL);
180
181     //push request to receiver's win
182     xbt_mutex_acquire(recv_win->mut_);
183     recv_win->requests_->push_back(rreq);
184     xbt_mutex_release(recv_win->mut_);
185     //start send
186     sreq->start();
187
188     //push request to sender's win
189     xbt_mutex_acquire(mut_);
190     requests_->push_back(sreq);
191     xbt_mutex_release(mut_);
192   }else{
193     Datatype::copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
194   }
195
196   return MPI_SUCCESS;
197 }
198
199 int Win::get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
200               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype)
201 {
202   //get sender pointer
203   MPI_Win send_win = connected_wins_[target_rank];
204
205   if(opened_==0){//check that post/start has been done
206     // no fence or start .. lock ok ?
207     int locked=0;
208     for(auto it : send_win->lockers_)
209       if (it == comm_->rank())
210         locked = 1;
211     if(locked != 1)
212       return MPI_ERR_WIN;
213   }
214
215   if(target_count*target_datatype->get_extent()>send_win->size_)
216     return MPI_ERR_ARG;
217
218   void* send_addr = static_cast<void*>(static_cast<char*>(send_win->base_) + target_disp * send_win->disp_unit_);
219   XBT_DEBUG("Entering MPI_Get from %d", target_rank);
220
221   if(target_rank != comm_->rank()){
222     //prepare send_request
223     MPI_Request sreq = Request::rma_send_init(send_addr, target_count, target_datatype,
224         comm_->group()->index(target_rank), smpi_process()->index(), SMPI_RMA_TAG+2, send_win->comm_,
225         MPI_OP_NULL);
226
227     //prepare receiver request
228     MPI_Request rreq = Request::rma_recv_init(origin_addr, origin_count, origin_datatype,
229         comm_->group()->index(target_rank), smpi_process()->index(), SMPI_RMA_TAG+2, comm_,
230         MPI_OP_NULL);
231
232     //start the send, with another process than us as sender. 
233     sreq->start();
234     //push request to receiver's win
235     xbt_mutex_acquire(send_win->mut_);
236     send_win->requests_->push_back(sreq);
237     xbt_mutex_release(send_win->mut_);
238
239     //start recv
240     rreq->start();
241     //push request to sender's win
242     xbt_mutex_acquire(mut_);
243     requests_->push_back(rreq);
244     xbt_mutex_release(mut_);
245   }else{
246     Datatype::copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
247   }
248
249   return MPI_SUCCESS;
250 }
251
252
253 int Win::accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
254               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op)
255 {
256
257   //get receiver pointer
258   MPI_Win recv_win = connected_wins_[target_rank];
259
260   if(opened_==0){//check that post/start has been done
261     // no fence or start .. lock ok ?
262     int locked=0;
263     for(auto it : recv_win->lockers_)
264       if (it == comm_->rank())
265         locked = 1;
266     if(locked != 1)
267       return MPI_ERR_WIN;
268   }
269   //FIXME: local version 
270
271   if(target_count*target_datatype->get_extent()>recv_win->size_)
272     return MPI_ERR_ARG;
273
274   void* recv_addr = static_cast<void*>(static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_);
275   XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
276     //As the tag will be used for ordering of the operations, substract count from it (to avoid collisions with other SMPI tags, SMPI_RMA_TAG is set below all the other ones we use )
277     //prepare send_request
278
279     MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype,
280         smpi_process()->index(), comm_->group()->index(target_rank), SMPI_RMA_TAG-3-count_, comm_, op);
281
282     //prepare receiver request
283     MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype,
284         smpi_process()->index(), comm_->group()->index(target_rank), SMPI_RMA_TAG-3-count_, recv_win->comm_, op);
285
286     count_++;
287     //push request to receiver's win
288     xbt_mutex_acquire(recv_win->mut_);
289     recv_win->requests_->push_back(rreq);
290     xbt_mutex_release(recv_win->mut_);
291     //start send
292     sreq->start();
293
294     //push request to sender's win
295     xbt_mutex_acquire(mut_);
296     requests_->push_back(sreq);
297     xbt_mutex_release(mut_);
298
299   return MPI_SUCCESS;
300 }
301
302 int Win::start(MPI_Group group, int assert){
303     /* From MPI forum advices
304     The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
305     will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
306     the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
307     matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
308     MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
309     implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
310     to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
311     called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
312     origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
313     must complete, without further dependencies.  */
314
315   //naive, blocking implementation.
316     int i             = 0;
317     int j             = 0;
318     int size          = group->size();
319     MPI_Request* reqs = xbt_new0(MPI_Request, size);
320
321     while (j != size) {
322       int src = group->index(j);
323       if (src != smpi_process()->index() && src != MPI_UNDEFINED) {
324         reqs[i] = Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 4, MPI_COMM_WORLD);
325         i++;
326       }
327       j++;
328   }
329   size=i;
330   Request::startall(size, reqs);
331   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
332   for(i=0;i<size;i++){
333     Request::unref(&reqs[i]);
334   }
335   xbt_free(reqs);
336   opened_++; //we're open for business !
337   group_=group;
338   group->ref();
339   return MPI_SUCCESS;
340 }
341
342 int Win::post(MPI_Group group, int assert){
343   //let's make a synchronous send here
344   int i             = 0;
345   int j             = 0;
346   int size = group->size();
347   MPI_Request* reqs = xbt_new0(MPI_Request, size);
348
349   while(j!=size){
350     int dst=group->index(j);
351     if(dst!=smpi_process()->index() && dst!=MPI_UNDEFINED){
352       reqs[i]=Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG+4, MPI_COMM_WORLD);
353       i++;
354     }
355     j++;
356   }
357   size=i;
358
359   Request::startall(size, reqs);
360   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
361   for(i=0;i<size;i++){
362     Request::unref(&reqs[i]);
363   }
364   xbt_free(reqs);
365   opened_++; //we're open for business !
366   group_=group;
367   group->ref();
368   return MPI_SUCCESS;
369 }
370
371 int Win::complete(){
372   if(opened_==0)
373     xbt_die("Complete called on already opened MPI_Win");
374
375   XBT_DEBUG("Entering MPI_Win_Complete");
376   int i             = 0;
377   int j             = 0;
378   int size = group_->size();
379   MPI_Request* reqs = xbt_new0(MPI_Request, size);
380
381   while(j!=size){
382     int dst=group_->index(j);
383     if(dst!=smpi_process()->index() && dst!=MPI_UNDEFINED){
384       reqs[i]=Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG+5, MPI_COMM_WORLD);
385       i++;
386     }
387     j++;
388   }
389   size=i;
390   XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
391   Request::startall(size, reqs);
392   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
393
394   for(i=0;i<size;i++){
395     Request::unref(&reqs[i]);
396   }
397   xbt_free(reqs);
398
399   int finished = finish_comms();
400   XBT_DEBUG("Win_complete - Finished %d RMA calls", finished);
401
402   Group::unref(group_);
403   opened_--; //we're closed for business !
404   return MPI_SUCCESS;
405 }
406
407 int Win::wait(){
408   //naive, blocking implementation.
409   XBT_DEBUG("Entering MPI_Win_Wait");
410   int i=0,j=0;
411   int size = group_->size();
412   MPI_Request* reqs = xbt_new0(MPI_Request, size);
413
414   while(j!=size){
415     int src=group_->index(j);
416     if(src!=smpi_process()->index() && src!=MPI_UNDEFINED){
417       reqs[i]=Request::irecv_init(nullptr, 0, MPI_CHAR, src,SMPI_RMA_TAG+5, MPI_COMM_WORLD);
418       i++;
419     }
420     j++;
421   }
422   size=i;
423   XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
424   Request::startall(size, reqs);
425   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
426   for(i=0;i<size;i++){
427     Request::unref(&reqs[i]);
428   }
429   xbt_free(reqs);
430   int finished = finish_comms();
431   XBT_DEBUG("Win_wait - Finished %d RMA calls", finished);
432
433   Group::unref(group_);
434   opened_--; //we're opened for business !
435   return MPI_SUCCESS;
436 }
437
438 int Win::lock(int lock_type, int rank, int assert){
439   MPI_Win target_win = connected_wins_[rank];
440
441   //window already locked, we have to wait
442   if (lock_type == MPI_LOCK_EXCLUSIVE){
443   XBT_DEBUG("Win_lock - Entering lock %d", rank);
444     xbt_mutex_acquire(target_win->lock_mut_);
445   XBT_DEBUG("Win_lock - Released from lock %d", rank);
446 }
447
448   xbt_mutex_acquire(target_win->mut_);
449   target_win->lockers_.push_back(comm_->rank());
450   xbt_mutex_release(target_win->mut_);  
451
452   int finished = finish_comms();
453   XBT_DEBUG("Win_lock - Finished %d RMA calls", finished);
454
455   return MPI_SUCCESS;
456 }
457
458 int Win::unlock(int rank){
459   MPI_Win target_win = connected_wins_[rank];
460
461   xbt_mutex_acquire(target_win->mut_);
462   int size=target_win->lockers_.size();
463   target_win->lockers_.remove(comm_->rank());
464
465
466   if (size<=1){//0 or 1 lockers -> exclusive assumed
467     xbt_mutex_try_acquire(target_win->lock_mut_);
468     xbt_mutex_release(target_win->lock_mut_);
469   }
470   xbt_mutex_release(target_win->mut_);
471   int finished = finish_comms();
472   XBT_DEBUG("Win_unlock - Finished %d RMA calls", finished);
473
474   return MPI_SUCCESS;
475 }
476
477 Win* Win::f2c(int id){
478   return static_cast<Win*>(F2C::f2c(id));
479 }
480
481
482 int Win::finish_comms(){
483   xbt_mutex_acquire(mut_);
484   //Finish own requests
485   std::vector<MPI_Request> *reqqs = requests_;
486   int size = static_cast<int>(reqqs->size());
487   if (size > 0) {
488     // start all requests that have been prepared by another process
489     for (const auto& req : *reqqs) {
490       if (req && (req->flags() & PREPARED))
491         req->start();
492     }
493
494     MPI_Request* treqs = &(*reqqs)[0];
495     Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
496     reqqs->clear();
497   }
498   xbt_mutex_release(mut_);
499   return size;
500 }
501
502
503 }
504 }