1 /* Copyright (c) 2007-2017. The SimGrid Team.
2 * All rights reserved. */
4 /* This program is free software; you can redistribute it and/or modify it
5 * under the terms of the license (GNU LGPL) which comes with this package. */
/* Declare the logging sub-category used by all XBT_DEBUG calls in this file (SMPI RMA operations). */
10 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");
// Class-wide storage for window attribute keyvals (MPI_Win_create_keyval and friends),
// plus the counter handing out fresh keyval ids. Shared by every Win instance.
14 std::unordered_map<int, smpi_key_elem> Win::keyvals_;
15 int Win::keyval_id_=0;
/* Construct an MPI window exposing [base, base+size) with the given displacement unit.
 * Collective over 'comm': each rank publishes the address of its local Win object so
 * peers can reach it directly through connected_wins_ in later one-sided calls.
 * NOTE(review): this extract is missing several original lines (e.g. rank_/count_
 * initialization, the allgather recv arguments); comments describe only visible code. */
17 Win::Win(void *base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm, int allocated, int dynamic): base_(base), size_(size), disp_unit_(disp_unit), assert_(0), info_(info), comm_(comm), allocated_(allocated), dynamic_(dynamic){
18 int comm_size = comm->size();
20 XBT_DEBUG("Creating window");
21 if(info!=MPI_INFO_NULL)
// Exposure epoch state: no group attached yet, empty pending-request list.
25 group_ = MPI_GROUP_NULL;
26 requests_ = new std::vector<MPI_Request>();
// Simulated mutexes: general window mutex, passive-target lock mutex, atomic-op mutex.
27 mut_=xbt_mutex_init();
28 lock_mut_=xbt_mutex_init();
29 atomic_mut_=xbt_mutex_init();
// One slot per rank of the communicator; our own slot is this object itself.
30 connected_wins_ = new MPI_Win[comm_size];
31 connected_wins_[rank_] = this;
// Barrier used by fence/destruction to synchronize all ranks of the window.
34 bar_ = MSG_barrier_init(comm_size);
38 comm->add_rma_win(this);
// Exchange the Win pointers of all ranks, then broadcast the shared barrier handle.
40 Colls::allgather(&(connected_wins_[rank_]), sizeof(MPI_Win), MPI_BYTE, connected_wins_, sizeof(MPI_Win),
43 Colls::bcast(&(bar_), sizeof(msg_bar_t), MPI_BYTE, 0, comm);
/* Destructor body (the 'Win::~Win()' signature line is missing from this extract).
 * Waits for all outstanding RMA communications, then releases every resource the
 * constructor acquired: connected_wins_ array, name, info, barrier and mutexes. */
49 //As per the standard, perform a barrier to ensure every async comm is finished
50 MSG_barrier_wait(bar_);
// Drain every request still pending on this window before tearing it down.
52 int finished = finish_comms();
53 XBT_DEBUG("Win destructor - Finished %d RMA calls", finished);
56 delete[] connected_wins_;
57 if (name_ != nullptr){
60 if(info_!=MPI_INFO_NULL){
61 MPI_Info_free(&info_);
64 comm_->remove_rma_win(this);
// Second synchronization so no rank destroys the shared barrier while others still use it.
66 Colls::barrier(comm_);
67 int rank=comm_->rank();
69 MSG_barrier_destroy(bar_);
70 xbt_mutex_destroy(mut_);
71 xbt_mutex_destroy(lock_mut_);
72 xbt_mutex_destroy(atomic_mut_);
/* Attach a memory region to a dynamic window (MPI_Win_attach).
 * Only legal when the window base is unset (MPI_BOTTOM or 0).
 * NOTE(review): error-return and size-update lines are missing from this extract. */
80 int Win::attach (void *base, MPI_Aint size){
81 if (!(base_ == MPI_BOTTOM || base_ == 0))
83 base_=0;//actually the address will be given in the RMA calls, as being the disp.
/* Detach a previously attached region from a dynamic window (MPI_Win_detach).
 * NOTE(review): the body of this function is missing from this extract. */
88 int Win::detach (void *base){
/* Copy the window's name into 'name' and store its length in 'length'.
 * NOTE(review): the no-name early-exit path is missing from this extract;
 * as shown, name_ is assumed non-null here. */
94 void Win::get_name(char* name, int* length){
100 *length = strlen(name_);
// +1 so the terminating NUL byte is copied along with the characters.
101 strncpy(name, name_, *length+1);
/* Return the group of the communicator backing this window (MPI_Win_get_group),
 * or MPI_GROUP_NULL if the window has no communicator. */
104 void Win::get_group(MPI_Group* group){
105 if(comm_ != MPI_COMM_NULL){
106 *group = comm_->group();
108 *group = MPI_GROUP_NULL;
/* Return the info object attached to the window (MPI_Win_get_info).
 * NOTE(review): the lazy-creation and return lines are missing from this extract;
 * presumably a fresh info object is created when info_ is MPI_INFO_NULL. */
112 MPI_Info Win::info(){
113 if(info_== MPI_INFO_NULL)
/* Accessor for the window size in bytes (body missing from this extract). */
123 MPI_Aint Win::size(){
/* Accessor for the window displacement unit (body missing from this extract). */
131 int Win::disp_unit(){
/* Replace the window's info object (MPI_Win_set_info).
 * NOTE(review): the unref-old/assign-new lines are missing from this extract. */
139 void Win::set_info(MPI_Info info){
140 if(info_!= MPI_INFO_NULL)
/* Give the window a name (MPI_Win_set_name); the string is duplicated, so the
 * caller keeps ownership of its own buffer. */
145 void Win::set_name(char* name){
146 name_ = xbt_strdup(name);
/* MPI_Win_fence: collective synchronization that closes the previous access/exposure
 * epoch (waiting on all requests queued on this window) and opens a new one.
 * 'assert' carries the usual MPI_MODE_* hints (NOPRECEDE skips the cleanup phase).
 * NOTE(review): some lines (opened_ bookkeeping, request cleanup, return) are
 * missing from this extract. */
149 int Win::fence(int assert)
151 XBT_DEBUG("Entering fence");
154 if (assert != MPI_MODE_NOPRECEDE) {
155 // This is not the first fence => finalize what came before
156 MSG_barrier_wait(bar_);
157 xbt_mutex_acquire(mut_);
158 // This (simulated) mutex ensures that no process pushes to the vector of requests during the waitall.
159 // Without this, the vector could get resized when another process pushes.
160 // This would result in the array used by Request::waitall() to be invalidated.
161 // Another solution would be to copy the data and cleanup the vector *before* Request::waitall
162 std::vector<MPI_Request> *reqs = requests_;
163 int size = static_cast<int>(reqs->size());
164 // start all requests that have been prepared by another process
166 MPI_Request* treqs = &(*reqs)[0];
167 Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
170 xbt_mutex_release(mut_);
173 if(assert==MPI_MODE_NOSUCCEED)//there should be no ops after this one, tell we are closed.
// Second barrier: no rank starts the new epoch before every rank finished the old one.
177 MSG_barrier_wait(bar_);
178 XBT_DEBUG("Leaving fence");
/* MPI_Put: write 'origin_count' elements from origin_addr into the target rank's
 * window at byte offset target_disp * disp_unit. Remote transfers are modeled as a
 * matched rma_send/rma_recv pair queued on both windows; the local case degrades to
 * a plain datatype copy. If 'request' is non-null the send request is handed back
 * to the caller instead of being queued on this window.
 * NOTE(review): several lines (error returns, request start, local-copy branch
 * structure) are missing from this extract. */
183 int Win::put( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
184 MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
186 //get receiver pointer
187 MPI_Win recv_win = connected_wins_[target_rank];
189 if(opened_==0){//check that post/start has been done
190 // no fence or start .. lock ok ?
// Accept the access only if this rank currently holds a lock on the target window.
192 for(auto it : recv_win->lockers_)
193 if (it == comm_->rank())
// Bounds check: the access must fit inside the target window.
199 if(target_count*target_datatype->get_extent()>recv_win->size_)
202 void* recv_addr = static_cast<void*> ( static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_);
203 XBT_DEBUG("Entering MPI_Put to %d", target_rank);
205 if(target_rank != comm_->rank()){
206 //prepare send_request
207 MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype, smpi_process()->index(),
208 comm_->group()->index(target_rank), SMPI_RMA_TAG+1, comm_, MPI_OP_NULL);
210 //prepare receiver request
211 MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, smpi_process()->index(),
212 comm_->group()->index(target_rank), SMPI_RMA_TAG+1, recv_win->comm_, MPI_OP_NULL);
217 if(request!=nullptr){
// No user request wanted: remember the send on this window so fence/flush can complete it.
220 xbt_mutex_acquire(mut_);
221 requests_->push_back(sreq);
222 xbt_mutex_release(mut_);
225 //push request to receiver's win
226 xbt_mutex_acquire(recv_win->mut_);
227 recv_win->requests_->push_back(rreq);
229 xbt_mutex_release(recv_win->mut_);
// Local put: just copy between the two buffers, nothing to communicate.
232 Datatype::copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
234 *request = MPI_REQUEST_NULL;
/* MPI_Get: read 'target_count' elements from the target rank's window into
 * origin_addr. Mirror image of put(): the target window gets the send request
 * (data flows from it), this window gets the receive request. Local gets are a
 * plain datatype copy. If 'request' is non-null the receive request is returned
 * to the caller instead of being queued.
 * NOTE(review): several lines (error returns, request start, branch closings)
 * are missing from this extract. */
240 int Win::get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
241 MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
244 MPI_Win send_win = connected_wins_[target_rank];
246 if(opened_==0){//check that post/start has been done
247 // no fence or start .. lock ok ?
// Accept the access only if this rank currently holds a lock on the target window.
249 for(auto it : send_win->lockers_)
250 if (it == comm_->rank())
// Bounds check: the access must fit inside the target window.
256 if(target_count*target_datatype->get_extent()>send_win->size_)
259 void* send_addr = static_cast<void*>(static_cast<char*>(send_win->base_) + target_disp * send_win->disp_unit_);
260 XBT_DEBUG("Entering MPI_Get from %d", target_rank);
262 if(target_rank != comm_->rank()){
263 //prepare send_request
264 MPI_Request sreq = Request::rma_send_init(send_addr, target_count, target_datatype,
265 comm_->group()->index(target_rank), smpi_process()->index(), SMPI_RMA_TAG+2, send_win->comm_,
268 //prepare receiver request
269 MPI_Request rreq = Request::rma_recv_init(origin_addr, origin_count, origin_datatype,
270 comm_->group()->index(target_rank), smpi_process()->index(), SMPI_RMA_TAG+2, comm_,
273 //start the send, with another process than us as sender.
275 //push request to receiver's win
276 xbt_mutex_acquire(send_win->mut_);
277 send_win->requests_->push_back(sreq);
278 xbt_mutex_release(send_win->mut_);
283 if(request!=nullptr){
// No user request wanted: queue the receive on this window for later completion.
286 xbt_mutex_acquire(mut_);
287 requests_->push_back(rreq);
288 xbt_mutex_release(mut_);
// Local get: copy straight out of the target buffer.
292 Datatype::copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
294 *request=MPI_REQUEST_NULL;
/* MPI_Accumulate: combine 'origin_count' elements into the target window using
 * reduction operator 'op'. Unlike put(), ordering between successive accumulates
 * matters, so each operation gets a distinct tag derived from a per-window counter.
 * If 'request' is non-null the send request is handed back to the caller.
 * NOTE(review): several lines (error returns, count_ increment, request start)
 * are missing from this extract. */
301 int Win::accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
302 MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request* request)
305 //get receiver pointer
306 MPI_Win recv_win = connected_wins_[target_rank];
308 if(opened_==0){//check that post/start has been done
309 // no fence or start .. lock ok ?
// Accept the access only if this rank currently holds a lock on the target window.
311 for(auto it : recv_win->lockers_)
312 if (it == comm_->rank())
317 //FIXME: local version
// Bounds check: the access must fit inside the target window.
319 if(target_count*target_datatype->get_extent()>recv_win->size_)
322 void* recv_addr = static_cast<void*>(static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_);
323 XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
324 //As the tag will be used for ordering of the operations, subtract count from it (to avoid collisions with other SMPI tags, SMPI_RMA_TAG is set below all the other ones we use )
325 //prepare send_request
327 MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype,
328 smpi_process()->index(), comm_->group()->index(target_rank), SMPI_RMA_TAG-3-count_, comm_, op);
330 //prepare receiver request
331 MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype,
332 smpi_process()->index(), comm_->group()->index(target_rank), SMPI_RMA_TAG-3-count_, recv_win->comm_, op);
338 //push request to receiver's win
339 xbt_mutex_acquire(recv_win->mut_);
340 recv_win->requests_->push_back(rreq);
342 xbt_mutex_release(recv_win->mut_);
344 if(request!=nullptr){
// No user request wanted: queue the send on this window for later completion.
347 xbt_mutex_acquire(mut_);
348 requests_->push_back(sreq);
349 xbt_mutex_release(mut_);
/* MPI_Get_accumulate: atomically fetch the current target data into result_addr,
 * then accumulate origin data into the target. Atomicity is simulated by holding
 * the target window's atomic_mut_ around a blocking get() followed by a blocking
 * accumulate(); this is noted as slow but guarantees ordering.
 * NOTE(review): some lines (error returns, req declaration, final return) are
 * missing from this extract. */
355 int Win::get_accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, void *result_addr,
356 int result_count, MPI_Datatype result_datatype, int target_rank, MPI_Aint target_disp, int target_count,
357 MPI_Datatype target_datatype, MPI_Op op, MPI_Request* request){
360 MPI_Win send_win = connected_wins_[target_rank];
362 if(opened_==0){//check that post/start has been done
363 // no fence or start .. lock ok ?
// Accept the access only if this rank currently holds a lock on the target window.
365 for(auto it : send_win->lockers_)
366 if (it == comm_->rank())
// Bounds check: the access must fit inside the target window.
372 if(target_count*target_datatype->get_extent()>send_win->size_)
375 XBT_DEBUG("Entering MPI_Get_accumulate from %d", target_rank);
376 //need to be sure ops are correctly ordered, so finish request here ? slow.
378 xbt_mutex_acquire(send_win->atomic_mut_);
// Fetch phase: read the old target value, waiting for completion before accumulating.
379 get(result_addr, result_count, result_datatype, target_rank,
380 target_disp, target_count, target_datatype, &req);
381 if (req != MPI_REQUEST_NULL)
382 Request::wait(&req, MPI_STATUS_IGNORE);
// Update phase: apply op to the target data, again completed synchronously.
384 accumulate(origin_addr, origin_count, origin_datatype, target_rank,
385 target_disp, target_count, target_datatype, op, &req);
386 if (req != MPI_REQUEST_NULL)
387 Request::wait(&req, MPI_STATUS_IGNORE);
388 xbt_mutex_release(send_win->atomic_mut_);
/* MPI_Compare_and_swap: fetch one element of 'datatype' from the target window
 * into result_addr; if it byte-compares equal to compare_addr, write origin_addr
 * back to the target. Atomicity is simulated with the target's atomic_mut_.
 * NOTE(review): some lines (error returns, req declaration, final return) are
 * missing from this extract. */
393 int Win::compare_and_swap(void *origin_addr, void *compare_addr,
394 void *result_addr, MPI_Datatype datatype, int target_rank,
395 MPI_Aint target_disp){
397 MPI_Win send_win = connected_wins_[target_rank];
399 if(opened_==0){//check that post/start has been done
400 // no fence or start .. lock ok ?
// Accept the access only if this rank currently holds a lock on the target window.
402 for(auto it : send_win->lockers_)
403 if (it == comm_->rank())
409 XBT_DEBUG("Entering MPI_Compare_and_swap with %d", target_rank);
411 xbt_mutex_acquire(send_win->atomic_mut_);
// Read the current target value synchronously so the comparison below is valid.
412 get(result_addr, 1, datatype, target_rank,
413 target_disp, 1, datatype, &req);
414 if (req != MPI_REQUEST_NULL)
415 Request::wait(&req, MPI_STATUS_IGNORE);
// memcmp == 0 means the fetched value matched compare_addr: perform the swap.
416 if(! memcmp (result_addr, compare_addr, datatype->get_extent() )){
417 put(origin_addr, 1, datatype, target_rank,
418 target_disp, 1, datatype);
420 xbt_mutex_release(send_win->atomic_mut_);
/* MPI_Win_start: open an access epoch toward the ranks of 'group'. This naive,
 * blocking implementation waits for a zero-byte sync message (tag SMPI_RMA_TAG+4)
 * from every member's matching MPI_Win_post before returning.
 * NOTE(review): loop headers, group bookkeeping and the return are missing from
 * this extract. */
424 int Win::start(MPI_Group group, int assert){
425 /* From MPI forum advices
426 The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
427 will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
428 the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
429 matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
430 MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
431 implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
432 to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
433 called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
434 origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
435 must complete, without further dependencies. */
437 //naive, blocking implementation.
440 int size = group->size();
441 MPI_Request* reqs = xbt_new0(MPI_Request, size);
// One receive per remote group member; our own rank needs no sync message.
444 int src = group->index(j);
445 if (src != smpi_process()->index() && src != MPI_UNDEFINED) {
446 reqs[i] = Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 4, MPI_COMM_WORLD);
// Block until every target has posted its exposure epoch.
452 Request::startall(size, reqs);
453 Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
455 Request::unref(&reqs[i]);
458 opened_++; //we're open for business !
/* MPI_Win_post: open an exposure epoch for the ranks of 'group' by sending each
 * member a zero-byte sync message (tag SMPI_RMA_TAG+4) that its MPI_Win_start is
 * waiting for.
 * NOTE(review): loop headers, group bookkeeping and the return are missing from
 * this extract. */
464 int Win::post(MPI_Group group, int assert){
465 //let's make a synchronous send here
468 int size = group->size();
469 MPI_Request* reqs = xbt_new0(MPI_Request, size);
// One send per remote group member; our own rank needs no sync message.
472 int dst=group->index(j);
473 if(dst!=smpi_process()->index() && dst!=MPI_UNDEFINED){
474 reqs[i]=Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG+4, MPI_COMM_WORLD);
481 Request::startall(size, reqs);
482 Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
484 Request::unref(&reqs[i]);
487 opened_++; //we're open for business !
/* MPI_Win_complete body (the 'int Win::complete()' signature line is missing from
 * this extract): close the access epoch opened by start(). Notifies every member
 * of group_ with a sync message (tag SMPI_RMA_TAG+5) that their wait() consumes,
 * then finishes all RMA requests still queued on this window. */
495 xbt_die("Complete called on already opened MPI_Win");
497 XBT_DEBUG("Entering MPI_Win_Complete");
500 int size = group_->size();
501 MPI_Request* reqs = xbt_new0(MPI_Request, size);
// One completion message per remote group member; skip ourselves.
504 int dst=group_->index(j);
505 if(dst!=smpi_process()->index() && dst!=MPI_UNDEFINED){
506 reqs[i]=Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG+5, MPI_COMM_WORLD);
512 XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
513 Request::startall(size, reqs);
514 Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
517 Request::unref(&reqs[i]);
// Per the standard, complete() must not return before local RMA calls are done.
521 int finished = finish_comms();
522 XBT_DEBUG("Win_complete - Finished %d RMA calls", finished);
524 Group::unref(group_);
525 opened_--; //we're closed for business !
/* MPI_Win_wait body (the 'int Win::wait()' signature line is missing from this
 * extract): close the exposure epoch opened by post(). Blocks until every member
 * of group_ has sent its completion message (tag SMPI_RMA_TAG+5) from complete(),
 * then finishes the RMA requests queued on this window. */
530 //naive, blocking implementation.
531 XBT_DEBUG("Entering MPI_Win_Wait");
534 int size = group_->size();
535 MPI_Request* reqs = xbt_new0(MPI_Request, size);
// One receive per remote group member; skip ourselves.
538 int src=group_->index(j);
539 if(src!=smpi_process()->index() && src!=MPI_UNDEFINED){
540 reqs[i]=Request::irecv_init(nullptr, 0, MPI_CHAR, src,SMPI_RMA_TAG+5, MPI_COMM_WORLD);
546 XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
547 Request::startall(size, reqs);
548 Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
550 Request::unref(&reqs[i]);
553 int finished = finish_comms();
554 XBT_DEBUG("Win_wait - Finished %d RMA calls", finished);
556 Group::unref(group_);
557 opened_--; //we're closed for business !
/* MPI_Win_lock: acquire a passive-target lock on 'rank''s window. Exclusive locks
 * (and any lock while the target is exclusively held) take the target's lock_mut_;
 * shared locks release it immediately so other sharers can proceed. mode_ is used
 * as an additive tally so unlock() can tell whether a mutex release is owed.
 * NOTE(review): the assert/MPI_MODE_NOCHECK handling and the return statement are
 * missing from this extract. */
561 int Win::lock(int lock_type, int rank, int assert){
562 MPI_Win target_win = connected_wins_[rank];
564 if ((lock_type == MPI_LOCK_EXCLUSIVE && target_win->mode_ != MPI_LOCK_SHARED)|| target_win->mode_ == MPI_LOCK_EXCLUSIVE){
565 xbt_mutex_acquire(target_win->lock_mut_);
566 target_win->mode_+= lock_type;//add the lock_type to differentiate case when we are switching from EXCLUSIVE to SHARED (no release needed in the unlock)
567 if(lock_type == MPI_LOCK_SHARED){//the window used to be exclusive, it's now shared.
568 xbt_mutex_release(target_win->lock_mut_);
570 } else if(!(target_win->mode_==MPI_LOCK_SHARED && lock_type == MPI_LOCK_EXCLUSIVE))
571 target_win->mode_+= lock_type; // don't set to exclusive if it's already shared
// Record ourselves as a locker so put/get/accumulate on the target pass their access check.
573 target_win->lockers_.push_back(comm_->rank());
// Flush both directions so pending operations do not leak across the lock boundary.
575 int finished = finish_comms(rank);
576 XBT_DEBUG("Win_lock %d - Finished %d RMA calls", rank, finished);
577 finished = target_win->finish_comms(rank_);
578 XBT_DEBUG("Win_lock target %d - Finished %d RMA calls", rank, finished);
/* MPI_Win_lock_all: take a shared lock on every rank's window in turn; the first
 * failing lock determines the result.
 * NOTE(review): the loop variable declaration, error capture and return are
 * missing from this extract. */
582 int Win::lock_all(int assert){
584 int retval = MPI_SUCCESS;
585 for (i=0; i<comm_->size();i++){
586 int ret = this->lock(MPI_LOCK_SHARED, i, assert);
587 if(ret != MPI_SUCCESS)
/* MPI_Win_unlock: release our passive-target lock on 'rank''s window. Resets the
 * target's mode_, removes us from its lockers list, releases lock_mut_ only if the
 * lock was exclusively held, then flushes pending operations in both directions.
 * NOTE(review): the return statement is missing from this extract. */
593 int Win::unlock(int rank){
594 MPI_Win target_win = connected_wins_[rank];
595 int target_mode = target_win->mode_;
596 target_win->mode_= 0;
597 target_win->lockers_.remove(comm_->rank());
598 if (target_mode==MPI_LOCK_EXCLUSIVE){
599 xbt_mutex_release(target_win->lock_mut_);
// Complete outstanding RMA calls involving the target before the lock is dropped.
602 int finished = finish_comms(rank);
603 XBT_DEBUG("Win_unlock %d - Finished %d RMA calls", rank, finished);
604 finished = target_win->finish_comms(rank_);
605 XBT_DEBUG("Win_unlock target %d - Finished %d RMA calls", rank, finished);
/* MPI_Win_unlock_all: release the per-rank locks taken by lock_all(); the first
 * failing unlock determines the result.
 * NOTE(review): the loop variable declaration, error capture and return are
 * missing from this extract. */
609 int Win::unlock_all(){
611 int retval = MPI_SUCCESS;
612 for (i=0; i<comm_->size();i++){
613 int ret = this->unlock(i);
614 if(ret != MPI_SUCCESS)
/* MPI_Win_flush: complete all outstanding RMA operations between this rank and
 * 'rank' — both the requests we queued locally toward that rank and the requests
 * queued on the target window that involve us.
 * NOTE(review): the return statement is missing from this extract. */
620 int Win::flush(int rank){
621 MPI_Win target_win = connected_wins_[rank];
622 int finished = finish_comms(rank);
623 XBT_DEBUG("Win_flush on local %d - Finished %d RMA calls", rank_, finished);
624 finished = target_win->finish_comms(rank_);
625 XBT_DEBUG("Win_flush on remote %d - Finished %d RMA calls", rank, finished);
/* MPI_Win_flush_local: complete only the locally queued RMA operations that
 * involve 'rank'; the target window's queue is left alone.
 * NOTE(review): the return statement is missing from this extract. */
629 int Win::flush_local(int rank){
630 int finished = finish_comms(rank);
631 XBT_DEBUG("Win_flush_local for rank %d - Finished %d RMA calls", rank, finished);
/* MPI_Win_flush_all: complete every locally queued RMA operation, then ask each
 * connected window to complete its operations that involve this rank.
 * NOTE(review): the loop/variable declarations and the return are missing from
 * this extract. */
635 int Win::flush_all(){
638 finished = finish_comms();
639 XBT_DEBUG("Win_flush_all on local - Finished %d RMA calls", finished);
640 for (i=0; i<comm_->size();i++){
641 finished = connected_wins_[i]->finish_comms(rank_);
642 XBT_DEBUG("Win_flush_all on %d - Finished %d RMA calls", i, finished);
/* MPI_Win_flush_local_all: complete all RMA operations queued on this window,
 * regardless of target rank.
 * NOTE(review): the return statement is missing from this extract. */
647 int Win::flush_local_all(){
648 int finished = finish_comms();
649 XBT_DEBUG("Win_flush_local_all - Finished %d RMA calls", finished);
/* Translate a Fortran window handle to its C++ Win object via the shared F2C map. */
653 Win* Win::f2c(int id){
654 return static_cast<Win*>(F2C::f2c(id));
/* Wait for every request queued on this window to complete, under mut_ so no peer
 * can push into (and thereby reallocate) the vector during the waitall.
 * NOTE(review): the request-cleanup and return-count lines are missing from this
 * extract; presumably the vector is cleared and 'size' returned. */
658 int Win::finish_comms(){
659 xbt_mutex_acquire(mut_);
660 //Finish own requests
661 std::vector<MPI_Request> *reqqs = requests_;
662 int size = static_cast<int>(reqqs->size());
664 MPI_Request* treqs = &(*reqqs)[0];
665 Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
668 xbt_mutex_release(mut_);
/* Rank-filtered variant of finish_comms(): move the queued requests whose source
 * or destination is 'rank' into a temporary vector, wait only on those, and leave
 * unrelated requests queued on the window. Runs under mut_ for the same
 * reallocation-safety reason as the unfiltered version.
 * NOTE(review): the else-branch of the filter loop, size recomputation, cleanup
 * and return lines are missing from this extract. */
672 int Win::finish_comms(int rank){
673 xbt_mutex_acquire(mut_);
674 //Finish own requests
675 std::vector<MPI_Request> *reqqs = requests_;
676 int size = static_cast<int>(reqqs->size());
// Partition: requests touching 'rank' are extracted; erase() keeps iteration valid.
679 std::vector<MPI_Request>* myreqqs = new std::vector<MPI_Request>();
680 std::vector<MPI_Request>::iterator iter = reqqs->begin();
681 while (iter != reqqs->end()){
682 if(((*iter)!=MPI_REQUEST_NULL) && (((*iter)->src() == rank) || ((*iter)->dst() == rank))){
683 myreqqs->push_back(*iter);
684 iter = reqqs->erase(iter);
// Wait only on the extracted, rank-related requests.
691 MPI_Request* treqs = &(*myreqqs)[0];
692 Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
697 xbt_mutex_release(mut_);