Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Add MPI_Win_allocate, MPI_Win_create_dynamic, MPI_Win_attach, MPI_Win_detach, MPI_Win...
[simgrid.git] / src / smpi / smpi_win.cpp
1 /* Copyright (c) 2007-2017. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "private.h"
8 #include <vector>
9
10 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");
11
12 namespace simgrid{
13 namespace smpi{
14 std::unordered_map<int, smpi_key_elem> Win::keyvals_;
15 int Win::keyval_id_=0;
16
17 Win::Win(void *base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm, int allocated): base_(base), size_(size), disp_unit_(disp_unit), assert_(0), info_(info), comm_(comm), allocated_(allocated){
18   int comm_size = comm->size();
19   rank_      = comm->rank();
20   XBT_DEBUG("Creating window");
21   if(info!=MPI_INFO_NULL)
22     info->ref();
23   name_ = nullptr;
24   opened_ = 0;
25   group_ = MPI_GROUP_NULL;
26   requests_ = new std::vector<MPI_Request>();
27   mut_=xbt_mutex_init();
28   lock_mut_=xbt_mutex_init();
29   connected_wins_ = new MPI_Win[comm_size];
30   connected_wins_[rank_] = this;
31   count_ = 0;
32   if(rank_==0){
33     bar_ = MSG_barrier_init(comm_size);
34   }
35   mode_=0;
36
37   comm->add_rma_win(this);
38
39   Colls::allgather(&(connected_wins_[rank_]), sizeof(MPI_Win), MPI_BYTE, connected_wins_, sizeof(MPI_Win),
40                          MPI_BYTE, comm);
41
42   Colls::bcast(&(bar_), sizeof(msg_bar_t), MPI_BYTE, 0, comm);
43
44   Colls::barrier(comm);
45 }
46
47 Win::~Win(){
48   //As per the standard, perform a barrier to ensure every async comm is finished
49   MSG_barrier_wait(bar_);
50
51   int finished = finish_comms();
52   XBT_DEBUG("Win destructor - Finished %d RMA calls", finished);
53
54   delete requests_;
55   delete[] connected_wins_;
56   if (name_ != nullptr){
57     xbt_free(name_);
58   }
59   if(info_!=MPI_INFO_NULL){
60     MPI_Info_free(&info_);
61   }
62
63   comm_->remove_rma_win(this);
64
65   Colls::barrier(comm_);
66   int rank=comm_->rank();
67   if(rank == 0)
68     MSG_barrier_destroy(bar_);
69   xbt_mutex_destroy(mut_);
70   xbt_mutex_destroy(lock_mut_);
71
72   if(allocated_ !=0)
73     xbt_free(base_);
74
75   cleanup_attr<Win>();
76 }
77
78 int Win::attach (void *base, MPI_Aint size){
79   if (!(base_ == MPI_BOTTOM || base_ == 0))
80     return MPI_ERR_ARG;
81   base_=0;//actually the address will be given in the RMA calls, as being the disp.
82   size_+=size;
83   return MPI_SUCCESS;
84 }
85
86 int Win::detach (void *base){
87   base_=MPI_BOTTOM;
88   size_=-1;
89   return MPI_SUCCESS;
90 }
91
92 void Win::get_name(char* name, int* length){
93   if(name_==nullptr){
94     *length=0;
95     name=nullptr;
96     return;
97   }
98   *length = strlen(name_);
99   strncpy(name, name_, *length+1);
100 }
101
102 void Win::get_group(MPI_Group* group){
103   if(comm_ != MPI_COMM_NULL){
104     *group = comm_->group();
105   } else {
106     *group = MPI_GROUP_NULL;
107   }
108 }
109
110 MPI_Info Win::info(){
111   if(info_== MPI_INFO_NULL)
112     info_ = new Info();
113   info_->ref();
114   return info_;
115 }
116
117 int Win::rank(){
118   return rank_;
119 }
120
121 MPI_Aint Win::size(){
122   return size_;
123 }
124
125 void* Win::base(){
126   return base_;
127 }
128
129 int Win::disp_unit(){
130   return disp_unit_;
131 }
132
133 void Win::set_info(MPI_Info info){
134   if(info_!= MPI_INFO_NULL)
135     info->ref();
136   info_=info;
137 }
138
139 void Win::set_name(char* name){
140   name_ = xbt_strdup(name);
141 }
142
143 int Win::fence(int assert)
144 {
145   XBT_DEBUG("Entering fence");
146   if (opened_ == 0)
147     opened_=1;
148   if (assert != MPI_MODE_NOPRECEDE) {
149     // This is not the first fence => finalize what came before
150     MSG_barrier_wait(bar_);
151     xbt_mutex_acquire(mut_);
152     // This (simulated) mutex ensures that no process pushes to the vector of requests during the waitall.
153     // Without this, the vector could get redimensionned when another process pushes.
154     // This would result in the array used by Request::waitall() to be invalidated.
155     // Another solution would be to copy the data and cleanup the vector *before* Request::waitall
156     std::vector<MPI_Request> *reqs = requests_;
157     int size = static_cast<int>(reqs->size());
158     // start all requests that have been prepared by another process
159     if (size > 0) {
160       for (const auto& req : *reqs) {
161         if (req && (req->flags() & PREPARED))
162           req->start();
163       }
164
165       MPI_Request* treqs = &(*reqs)[0];
166
167       Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
168     }
169     count_=0;
170     xbt_mutex_release(mut_);
171   }
172   assert_ = assert;
173
174   MSG_barrier_wait(bar_);
175   XBT_DEBUG("Leaving fence");
176
177   return MPI_SUCCESS;
178 }
179
180 int Win::put( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
181               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype)
182 {
183   //get receiver pointer
184   MPI_Win recv_win = connected_wins_[target_rank];
185
186   if(opened_==0){//check that post/start has been done
187     // no fence or start .. lock ok ?
188     int locked=0;
189     for(auto it : recv_win->lockers_)
190       if (it == comm_->rank())
191         locked = 1;
192     if(locked != 1)
193       return MPI_ERR_WIN;
194   }
195
196   if(target_count*target_datatype->get_extent()>recv_win->size_)
197     return MPI_ERR_ARG;
198
199   void* recv_addr = static_cast<void*> ( static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_);
200   XBT_DEBUG("Entering MPI_Put to %d", target_rank);
201
202   if(target_rank != comm_->rank()){
203     //prepare send_request
204     MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype, smpi_process()->index(),
205         comm_->group()->index(target_rank), SMPI_RMA_TAG+1, comm_, MPI_OP_NULL);
206
207     //prepare receiver request
208     MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, smpi_process()->index(),
209         comm_->group()->index(target_rank), SMPI_RMA_TAG+1, recv_win->comm_, MPI_OP_NULL);
210
211     //push request to receiver's win
212     xbt_mutex_acquire(recv_win->mut_);
213     recv_win->requests_->push_back(rreq);
214     xbt_mutex_release(recv_win->mut_);
215     //start send
216     sreq->start();
217
218     //push request to sender's win
219     xbt_mutex_acquire(mut_);
220     requests_->push_back(sreq);
221     xbt_mutex_release(mut_);
222   }else{
223     Datatype::copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
224   }
225
226   return MPI_SUCCESS;
227 }
228
229 int Win::get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
230               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype)
231 {
232   //get sender pointer
233   MPI_Win send_win = connected_wins_[target_rank];
234
235   if(opened_==0){//check that post/start has been done
236     // no fence or start .. lock ok ?
237     int locked=0;
238     for(auto it : send_win->lockers_)
239       if (it == comm_->rank())
240         locked = 1;
241     if(locked != 1)
242       return MPI_ERR_WIN;
243   }
244
245   if(target_count*target_datatype->get_extent()>send_win->size_)
246     return MPI_ERR_ARG;
247
248   void* send_addr = static_cast<void*>(static_cast<char*>(send_win->base_) + target_disp * send_win->disp_unit_);
249   XBT_DEBUG("Entering MPI_Get from %d", target_rank);
250
251   if(target_rank != comm_->rank()){
252     //prepare send_request
253     MPI_Request sreq = Request::rma_send_init(send_addr, target_count, target_datatype,
254         comm_->group()->index(target_rank), smpi_process()->index(), SMPI_RMA_TAG+2, send_win->comm_,
255         MPI_OP_NULL);
256
257     //prepare receiver request
258     MPI_Request rreq = Request::rma_recv_init(origin_addr, origin_count, origin_datatype,
259         comm_->group()->index(target_rank), smpi_process()->index(), SMPI_RMA_TAG+2, comm_,
260         MPI_OP_NULL);
261
262     //start the send, with another process than us as sender. 
263     sreq->start();
264     //push request to receiver's win
265     xbt_mutex_acquire(send_win->mut_);
266     send_win->requests_->push_back(sreq);
267     xbt_mutex_release(send_win->mut_);
268
269     //start recv
270     rreq->start();
271     //push request to sender's win
272     xbt_mutex_acquire(mut_);
273     requests_->push_back(rreq);
274     xbt_mutex_release(mut_);
275   }else{
276     Datatype::copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
277   }
278
279   return MPI_SUCCESS;
280 }
281
282
283 int Win::accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
284               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op)
285 {
286
287   //get receiver pointer
288   MPI_Win recv_win = connected_wins_[target_rank];
289
290   if(opened_==0){//check that post/start has been done
291     // no fence or start .. lock ok ?
292     int locked=0;
293     for(auto it : recv_win->lockers_)
294       if (it == comm_->rank())
295         locked = 1;
296     if(locked != 1)
297       return MPI_ERR_WIN;
298   }
299   //FIXME: local version 
300
301   if(target_count*target_datatype->get_extent()>recv_win->size_)
302     return MPI_ERR_ARG;
303
304   void* recv_addr = static_cast<void*>(static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_);
305   XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
306     //As the tag will be used for ordering of the operations, substract count from it (to avoid collisions with other SMPI tags, SMPI_RMA_TAG is set below all the other ones we use )
307     //prepare send_request
308
309     MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype,
310         smpi_process()->index(), comm_->group()->index(target_rank), SMPI_RMA_TAG-3-count_, comm_, op);
311
312     //prepare receiver request
313     MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype,
314         smpi_process()->index(), comm_->group()->index(target_rank), SMPI_RMA_TAG-3-count_, recv_win->comm_, op);
315
316     count_++;
317     //push request to receiver's win
318     xbt_mutex_acquire(recv_win->mut_);
319     recv_win->requests_->push_back(rreq);
320     xbt_mutex_release(recv_win->mut_);
321     //start send
322     sreq->start();
323
324     //push request to sender's win
325     xbt_mutex_acquire(mut_);
326     requests_->push_back(sreq);
327     xbt_mutex_release(mut_);
328
329   return MPI_SUCCESS;
330 }
331
332 int Win::get_accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, void *result_addr, 
333               int result_count, MPI_Datatype result_datatype, int target_rank, MPI_Aint target_disp, int target_count, 
334               MPI_Datatype target_datatype, MPI_Op op){
335
336   //get sender pointer
337   MPI_Win send_win = connected_wins_[target_rank];
338
339   if(opened_==0){//check that post/start has been done
340     // no fence or start .. lock ok ?
341     int locked=0;
342     for(auto it : send_win->lockers_)
343       if (it == comm_->rank())
344         locked = 1;
345     if(locked != 1)
346       return MPI_ERR_WIN;
347   }
348
349   if(target_count*target_datatype->get_extent()>send_win->size_)
350     return MPI_ERR_ARG;
351
352   XBT_DEBUG("Entering MPI_Get_accumulate from %d", target_rank);
353
354   get(result_addr, result_count, result_datatype, target_rank,
355               target_disp, target_count, target_datatype);
356   accumulate(origin_addr, origin_count, origin_datatype, target_rank,
357               target_disp, target_count, target_datatype, op);
358
359   return MPI_SUCCESS;
360
361 }
362
363 int Win::start(MPI_Group group, int assert){
364     /* From MPI forum advices
365     The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
366     will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
367     the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
368     matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
369     MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
370     implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
371     to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
372     called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
373     origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
374     must complete, without further dependencies.  */
375
376   //naive, blocking implementation.
377     int i             = 0;
378     int j             = 0;
379     int size          = group->size();
380     MPI_Request* reqs = xbt_new0(MPI_Request, size);
381
382     while (j != size) {
383       int src = group->index(j);
384       if (src != smpi_process()->index() && src != MPI_UNDEFINED) {
385         reqs[i] = Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 4, MPI_COMM_WORLD);
386         i++;
387       }
388       j++;
389   }
390   size=i;
391   Request::startall(size, reqs);
392   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
393   for(i=0;i<size;i++){
394     Request::unref(&reqs[i]);
395   }
396   xbt_free(reqs);
397   opened_++; //we're open for business !
398   group_=group;
399   group->ref();
400   return MPI_SUCCESS;
401 }
402
403 int Win::post(MPI_Group group, int assert){
404   //let's make a synchronous send here
405   int i             = 0;
406   int j             = 0;
407   int size = group->size();
408   MPI_Request* reqs = xbt_new0(MPI_Request, size);
409
410   while(j!=size){
411     int dst=group->index(j);
412     if(dst!=smpi_process()->index() && dst!=MPI_UNDEFINED){
413       reqs[i]=Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG+4, MPI_COMM_WORLD);
414       i++;
415     }
416     j++;
417   }
418   size=i;
419
420   Request::startall(size, reqs);
421   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
422   for(i=0;i<size;i++){
423     Request::unref(&reqs[i]);
424   }
425   xbt_free(reqs);
426   opened_++; //we're open for business !
427   group_=group;
428   group->ref();
429   return MPI_SUCCESS;
430 }
431
432 int Win::complete(){
433   if(opened_==0)
434     xbt_die("Complete called on already opened MPI_Win");
435
436   XBT_DEBUG("Entering MPI_Win_Complete");
437   int i             = 0;
438   int j             = 0;
439   int size = group_->size();
440   MPI_Request* reqs = xbt_new0(MPI_Request, size);
441
442   while(j!=size){
443     int dst=group_->index(j);
444     if(dst!=smpi_process()->index() && dst!=MPI_UNDEFINED){
445       reqs[i]=Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG+5, MPI_COMM_WORLD);
446       i++;
447     }
448     j++;
449   }
450   size=i;
451   XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
452   Request::startall(size, reqs);
453   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
454
455   for(i=0;i<size;i++){
456     Request::unref(&reqs[i]);
457   }
458   xbt_free(reqs);
459
460   int finished = finish_comms();
461   XBT_DEBUG("Win_complete - Finished %d RMA calls", finished);
462
463   Group::unref(group_);
464   opened_--; //we're closed for business !
465   return MPI_SUCCESS;
466 }
467
468 int Win::wait(){
469   //naive, blocking implementation.
470   XBT_DEBUG("Entering MPI_Win_Wait");
471   int i             = 0;
472   int j             = 0;
473   int size          = group_->size();
474   MPI_Request* reqs = xbt_new0(MPI_Request, size);
475
476   while(j!=size){
477     int src=group_->index(j);
478     if(src!=smpi_process()->index() && src!=MPI_UNDEFINED){
479       reqs[i]=Request::irecv_init(nullptr, 0, MPI_CHAR, src,SMPI_RMA_TAG+5, MPI_COMM_WORLD);
480       i++;
481     }
482     j++;
483   }
484   size=i;
485   XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
486   Request::startall(size, reqs);
487   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
488   for(i=0;i<size;i++){
489     Request::unref(&reqs[i]);
490   }
491   xbt_free(reqs);
492   int finished = finish_comms();
493   XBT_DEBUG("Win_wait - Finished %d RMA calls", finished);
494
495   Group::unref(group_);
496   opened_--; //we're opened for business !
497   return MPI_SUCCESS;
498 }
499
500 int Win::lock(int lock_type, int rank, int assert){
501   if(opened_!=0)
502     return MPI_ERR_WIN;
503
504   MPI_Win target_win = connected_wins_[rank];
505
506   if ((lock_type == MPI_LOCK_EXCLUSIVE && target_win->mode_ != MPI_LOCK_SHARED)|| target_win->mode_ == MPI_LOCK_EXCLUSIVE){
507     xbt_mutex_acquire(target_win->lock_mut_);
508     target_win->mode_+= lock_type;//add the lock_type to differentiate case when we are switching from EXCLUSIVE to SHARED (no release needed in the unlock)
509     if(lock_type == MPI_LOCK_SHARED){//the window used to be exclusive, it's now shared.
510       xbt_mutex_release(target_win->lock_mut_);
511    }
512   } else if(!(target_win->mode_==MPI_LOCK_SHARED && lock_type == MPI_LOCK_EXCLUSIVE))
513         target_win->mode_+= lock_type; // don't set to exclusive if it's already shared
514
515   target_win->lockers_.push_back(comm_->rank());
516
517   int finished = finish_comms();
518   XBT_DEBUG("Win_lock %d - Finished %d RMA calls", rank, finished);
519
520   return MPI_SUCCESS;
521 }
522
523 int Win::unlock(int rank){
524   if(opened_!=0)
525     return MPI_ERR_WIN;
526
527   MPI_Win target_win = connected_wins_[rank];
528   int target_mode = target_win->mode_;
529   target_win->mode_= 0;
530   target_win->lockers_.remove(comm_->rank());
531   if (target_mode==MPI_LOCK_EXCLUSIVE){
532     xbt_mutex_release(target_win->lock_mut_);
533   }
534
535   int finished = finish_comms();
536   XBT_DEBUG("Win_unlock %d - Finished %d RMA calls", rank, finished);
537
538   return MPI_SUCCESS;
539 }
540
541 Win* Win::f2c(int id){
542   return static_cast<Win*>(F2C::f2c(id));
543 }
544
545
546 int Win::finish_comms(){
547   xbt_mutex_acquire(mut_);
548   //Finish own requests
549   std::vector<MPI_Request> *reqqs = requests_;
550   int size = static_cast<int>(reqqs->size());
551   if (size > 0) {
552     // start all requests that have been prepared by another process
553     for (const auto& req : *reqqs) {
554       if (req && (req->flags() & PREPARED))
555         req->start();
556     }
557
558     MPI_Request* treqs = &(*reqqs)[0];
559     Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
560     reqqs->clear();
561   }
562   xbt_mutex_release(mut_);
563   return size;
564 }
565
566
567 }
568 }