/* Copyright (c) 2007-2020. The SimGrid Team. All rights reserved. */

/* This program is free software; you can redistribute it and/or modify it
 * under the terms of the license (GNU LGPL) which comes with this package. */

#include "smpi_win.hpp"

#include "smpi_coll.hpp"
#include "smpi_comm.hpp"
#include "smpi_datatype.hpp"
#include "smpi_info.hpp"
#include "smpi_keyvals.hpp"
#include "smpi_request.hpp"
#include "src/smpi/include/smpi_actor.hpp"

XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");

std::unordered_map<int, smpi_key_elem> Win::keyvals_;
int Win::keyval_id_ = 0;

Win::Win(void* base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm, int allocated, int dynamic)
    , disp_unit_(disp_unit)
    , connected_wins_(comm->size())
    , allocated_(allocated)
  XBT_DEBUG("Creating window");
  if (info != MPI_INFO_NULL)
  connected_wins_[rank_] = this;
  bar_ = new s4u::Barrier(comm->size());
  comm->add_rma_win(this);
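  // Every rank contributes its local `this` pointer so that, after the exchange below,
  // connected_wins_[r] lets any process in the communicator reach rank r's window
  // object directly. This is what allows the simulator to turn one-sided operations
  // into direct accesses to the peer's (simulated) window state.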
  colls::allgather(&connected_wins_[rank_], sizeof(MPI_Win), MPI_BYTE, connected_wins_.data(), sizeof(MPI_Win),
  colls::bcast(&bar_, sizeof(s4u::Barrier*), MPI_BYTE, 0, comm);

  // As per the standard, perform a barrier to ensure every asynchronous communication is finished
  int finished = finish_comms();
  XBT_DEBUG("Win destructor - Finished %d RMA calls", finished);
  if (info_ != MPI_INFO_NULL)
    simgrid::smpi::Info::unref(info_);
  if (errhandler_ != MPI_ERRHANDLER_NULL)
    simgrid::smpi::Errhandler::unref(errhandler_);
  comm_->remove_rma_win(this);
  colls::barrier(comm_);

int Win::attach(void* /*base*/, MPI_Aint size)
  if (not(base_ == MPI_BOTTOM || base_ == nullptr))
  base_ = nullptr; // the actual address will be given in the RMA calls, as the displacement.

int Win::detach(const void* /*base*/)

void Win::get_name(char* name, int* length) const
  *length = static_cast<int>(name_.length());
  if (not name_.empty()) {
    name_.copy(name, *length);
    name[*length] = '\0';

void Win::get_group(MPI_Group* group){
  if (comm_ != MPI_COMM_NULL) {
    *group = comm_->group();
  } else {
    *group = MPI_GROUP_NULL;

  if (info_ == MPI_INFO_NULL)

int Win::rank() const

MPI_Aint Win::size() const

void* Win::base() const

int Win::disp_unit() const

int Win::dynamic() const

void Win::set_info(MPI_Info info)
  if (info_ != MPI_INFO_NULL)
    simgrid::smpi::Info::unref(info_);
  if (info_ != MPI_INFO_NULL)

void Win::set_name(const char* name){

int Win::fence(int assert)
  XBT_DEBUG("Entering fence");
  if (assert != MPI_MODE_NOPRECEDE) {
    // This is not the first fence => finalize what came before
    // This (simulated) mutex ensures that no process pushes to the vector of requests during the waitall.
    // Without it, the vector could be reallocated while another process pushes to it,
    // which would invalidate the array passed to Request::waitall().
    // Another solution would be to copy the data and clean the vector up *before* Request::waitall.
    // start all requests that have been prepared by another process
    if (not requests_.empty()) {
      int size = static_cast<int>(requests_.size());
      MPI_Request* treqs = requests_.data();
      Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
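      // A minimal sketch of the alternative mentioned above (not what is implemented
      // here): snapshot and clear the vector under the mutex *before* waiting, so that
      // concurrent push_backs can no longer invalidate the array handed to waitall:
      //   std::vector<MPI_Request> snapshot;
      //   mut_->lock();
      //   snapshot.swap(requests_);
      //   mut_->unlock();
      //   Request::waitall(static_cast<int>(snapshot.size()), snapshot.data(), MPI_STATUSES_IGNORE);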
  if (assert == MPI_MODE_NOSUCCEED) // no operation may follow this one; signal that we are closed.
  XBT_DEBUG("Leaving fence");

int Win::put(const void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
             MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
  // get receiver pointer
  Win* recv_win = connected_wins_[target_rank];

  if (opened_ == 0) { // check that post/start has been done
    // no fence or start: is the access protected by a lock?
    for (auto const& it : recv_win->lockers_)
      if (it == comm_->rank())

  if (target_count * target_datatype->get_extent() > recv_win->size_)

  void* recv_addr = static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_;

  if (target_rank != comm_->rank()) { // This is not for myself, so we need to send messages
    XBT_DEBUG("Entering MPI_Put to remote rank %d", target_rank);
    // prepare send_request
    // TODO cheinrich Check for rank / pid conversion
    MPI_Request sreq =
        Request::rma_send_init(origin_addr, origin_count, origin_datatype, comm_->rank(), target_rank, SMPI_RMA_TAG + 1,

    // prepare receiver request
    // TODO cheinrich Check for rank / pid conversion
    MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, recv_win->comm_->rank(),
                                              target_rank, SMPI_RMA_TAG + 1, recv_win->comm_, MPI_OP_NULL);

    if (request != nullptr) {
      requests_.push_back(sreq);

    // push request to receiver's win
    recv_win->mut_->lock();
    recv_win->requests_.push_back(rreq);
    recv_win->mut_->unlock();
  } else {
    XBT_DEBUG("Entering MPI_Put from myself to myself, rank %d", target_rank);
    Datatype::copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
    if (request != nullptr)
      *request = MPI_REQUEST_NULL;

int Win::get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
              MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
  Win* send_win = connected_wins_[target_rank];

  if (opened_ == 0) { // check that post/start has been done
    // no fence or start: is the access protected by a lock?
    for (auto const& it : send_win->lockers_)
      if (it == comm_->rank())

  if (target_count * target_datatype->get_extent() > send_win->size_)

  const void* send_addr = static_cast<void*>(static_cast<char*>(send_win->base_) + target_disp * send_win->disp_unit_);
  XBT_DEBUG("Entering MPI_Get from %d", target_rank);

  if (target_rank != comm_->rank()) {
    // prepare send_request
    MPI_Request sreq = Request::rma_send_init(send_addr, target_count, target_datatype, target_rank,
                                              send_win->comm_->rank(), SMPI_RMA_TAG + 2, send_win->comm_, MPI_OP_NULL);

    // prepare receiver request
    MPI_Request rreq = Request::rma_recv_init(
        origin_addr, origin_count, origin_datatype, target_rank,
        comm_->rank(), // TODO cheinrich Check here if comm_->rank() and above send_win->comm_->rank() are correct
        SMPI_RMA_TAG + 2, comm_, MPI_OP_NULL);

    // start the send, with a process other than us as the sender.

    // push request to receiver's win
    send_win->mut_->lock();
    send_win->requests_.push_back(sreq);
    send_win->mut_->unlock();

    if (request != nullptr) {
      requests_.push_back(rreq);

  } else {
    Datatype::copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
    if (request != nullptr)
      *request = MPI_REQUEST_NULL;

int Win::accumulate(const void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
                    MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request* request)
  XBT_DEBUG("Entering MPI_Win_Accumulate");
  // get receiver pointer
  Win* recv_win = connected_wins_[target_rank];

  if (opened_ == 0) { // check that post/start has been done
    // no fence or start: is the access protected by a lock?
    for (auto const& it : recv_win->lockers_)
      if (it == comm_->rank())

  // FIXME: local version

  if (target_count * target_datatype->get_extent() > recv_win->size_)

  void* recv_addr = static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_;
  XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
  // As the tag will be used for ordering of the operations, subtract count from it (to avoid collisions with the other
  // SMPI tags: SMPI_RMA_TAG is set below all the other tags we use)
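  // For instance, assuming count_ is incremented once per accumulate on this window
  // (that bookkeeping is not shown here), successive accumulates use the tags
  // SMPI_RMA_TAG - 3, SMPI_RMA_TAG - 4, ... so that two operations towards the same
  // target never share a tag and thus cannot be reordered against each other.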
  // prepare send_request
  MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype, comm_->rank(), target_rank,
                                            SMPI_RMA_TAG - 3 - count_, comm_, op);

  // prepare receiver request
  MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, recv_win->comm_->rank(),
                                            recv_win->comm_->group()->rank(comm_->group()->actor(target_rank)),
                                            SMPI_RMA_TAG - 3 - count_, recv_win->comm_, op);

  // push request to receiver's win
  recv_win->mut_->lock();
  recv_win->requests_.push_back(rreq);
  recv_win->mut_->unlock();

  if (request != nullptr) {
    requests_.push_back(sreq);

  XBT_DEBUG("Leaving MPI_Win_Accumulate");

int Win::get_accumulate(const void* origin_addr, int origin_count, MPI_Datatype origin_datatype, void* result_addr,
                        int result_count, MPI_Datatype result_datatype, int target_rank, MPI_Aint target_disp,
                        int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request*)
  const Win* send_win = connected_wins_[target_rank];

  if (opened_ == 0) { // check that post/start has been done
    // no fence or start: is the access protected by a lock?
    for (auto const& it : send_win->lockers_)
      if (it == comm_->rank())

  if (target_count * target_datatype->get_extent() > send_win->size_)

  XBT_DEBUG("Entering MPI_Get_accumulate from %d", target_rank);
  // we need to be sure the ops are correctly ordered, so finish the requests here? This is slow.
  send_win->atomic_mut_->lock();
  get(result_addr, result_count, result_datatype, target_rank,
      target_disp, target_count, target_datatype, &req);
  if (req != MPI_REQUEST_NULL)
    Request::wait(&req, MPI_STATUS_IGNORE);
  accumulate(origin_addr, origin_count, origin_datatype, target_rank,
             target_disp, target_count, target_datatype, op, &req);
  if (req != MPI_REQUEST_NULL)
    Request::wait(&req, MPI_STATUS_IGNORE);
  send_win->atomic_mut_->unlock();

int Win::compare_and_swap(const void* origin_addr, const void* compare_addr, void* result_addr, MPI_Datatype datatype,
                          int target_rank, MPI_Aint target_disp)
  const Win* send_win = connected_wins_[target_rank];

  if (opened_ == 0) { // check that post/start has been done
    // no fence or start: is the access protected by a lock?
    for (auto const& it : send_win->lockers_)
      if (it == comm_->rank())

  XBT_DEBUG("Entering MPI_Compare_and_swap with %d", target_rank);
  MPI_Request req = MPI_REQUEST_NULL;
  send_win->atomic_mut_->lock();
  get(result_addr, 1, datatype, target_rank,
      target_disp, 1, datatype, &req);
  if (req != MPI_REQUEST_NULL)
    Request::wait(&req, MPI_STATUS_IGNORE);
  if (not memcmp(result_addr, compare_addr, datatype->get_extent())) {
    put(origin_addr, 1, datatype, target_rank,
        target_disp, 1, datatype);
  send_win->atomic_mut_->unlock();
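  // Illustrative caller-side view of the primitive implemented above (standard MPI-3
  // calls; a sketch assuming an integer-typed window, not SMPI-specific code):
  //   int desired = 1, expected = 0, previous;
  //   MPI_Win_lock(MPI_LOCK_EXCLUSIVE, target, 0, win);
  //   MPI_Compare_and_swap(&desired, &expected, &previous, MPI_INT, target, 0, win);
  //   MPI_Win_unlock(target, win);
  //   // the swap took place iff previous == expected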
int Win::start(MPI_Group group, int /*assert*/)
  /* From the MPI Forum's advice:
     The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
     will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
     the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
     matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
     MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
     implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
     to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
     called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
     origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
     must complete, without further dependencies. */
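  /* Illustrative application-side shape of this synchronization (standard MPI calls,
     shown as a sketch rather than SMPI code):
       // target process: open an exposure epoch towards the origins
       MPI_Win_post(origin_group, 0, win);
       // origin process: open an access epoch towards the targets, then transfer
       MPI_Win_start(target_group, 0, win);
       MPI_Put(buf, count, MPI_INT, target_rank, 0, count, MPI_INT, win);
       MPI_Win_complete(win); // completes the put at the origin
       // target process: wait until every origin called MPI_Win_complete
       MPI_Win_wait(win);
   */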
  // naive, blocking implementation.
  int size = group->size();
  std::vector<MPI_Request> reqs(size);

  XBT_DEBUG("Entering MPI_Win_Start");
    int src = comm_->group()->rank(group->actor(j));
    if (src != rank_ && src != MPI_UNDEFINED) { // TODO cheinrich: the MPI_UNDEFINED check should be useless here
      reqs[i] = Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 4, comm_);

  Request::startall(size, reqs.data());
  Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
  for (i = 0; i < size; i++) {
    Request::unref(&reqs[i]);
  opened_++; // we're open for business!

  XBT_DEBUG("Leaving MPI_Win_Start");

int Win::post(MPI_Group group, int /*assert*/)
  // let's make a synchronous send here
  int size = group->size();
  std::vector<MPI_Request> reqs(size);

  XBT_DEBUG("Entering MPI_Win_Post");
    int dst = comm_->group()->rank(group->actor(j));
    if (dst != rank_ && dst != MPI_UNDEFINED) {
      reqs[i] = Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 4, comm_);

  Request::startall(size, reqs.data());
  Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
    Request::unref(&reqs[i]);
  opened_++; // we're open for business!

  XBT_DEBUG("Leaving MPI_Win_Post");

    xbt_die("Complete called on an MPI_Win that is not opened");

  XBT_DEBUG("Entering MPI_Win_Complete");
  int size = group_->size();
  std::vector<MPI_Request> reqs(size);

    int dst = comm_->group()->rank(group_->actor(j));
    if (dst != rank_ && dst != MPI_UNDEFINED) {
      reqs[i] = Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 5, comm_);

  XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
  Request::startall(size, reqs.data());
  Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
    Request::unref(&reqs[i]);

  int finished = finish_comms();
  XBT_DEBUG("Win_complete - Finished %d RMA calls", finished);
  Group::unref(group_);
  opened_--; // we're closed for business!

  // naive, blocking implementation.
  XBT_DEBUG("Entering MPI_Win_Wait");
  int size = group_->size();
  std::vector<MPI_Request> reqs(size);

    int src = comm_->group()->rank(group_->actor(j));
    if (src != rank_ && src != MPI_UNDEFINED) {
      reqs[i] = Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 5, comm_);

  XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
  Request::startall(size, reqs.data());
  Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
    Request::unref(&reqs[i]);
  int finished = finish_comms();
  XBT_DEBUG("Win_wait - Finished %d RMA calls", finished);
  Group::unref(group_);
  opened_--; // we're closed for business!

int Win::lock(int lock_type, int rank, int /*assert*/)
  MPI_Win target_win = connected_wins_[rank];

  if ((lock_type == MPI_LOCK_EXCLUSIVE && target_win->mode_ != MPI_LOCK_SHARED) || target_win->mode_ == MPI_LOCK_EXCLUSIVE) {
    target_win->lock_mut_->lock();
    // add the lock_type, to tell apart the case where we switch from EXCLUSIVE to SHARED (no release needed in the unlock then)
    target_win->mode_ += lock_type;
    if (lock_type == MPI_LOCK_SHARED) { // the window used to be exclusive; it is now shared.
      target_win->lock_mut_->unlock();
  } else if (not(target_win->mode_ == MPI_LOCK_SHARED && lock_type == MPI_LOCK_EXCLUSIVE))
    target_win->mode_ += lock_type; // don't set to exclusive if it's already shared

  target_win->lockers_.push_back(comm_->rank());

  int finished = finish_comms(rank);
  XBT_DEBUG("Win_lock %d - Finished %d RMA calls", rank, finished);
  finished = target_win->finish_comms(rank_);
  XBT_DEBUG("Win_lock target %d - Finished %d RMA calls", rank, finished);
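  // Illustrative passive-target usage of this lock from an application (standard MPI
  // calls; a sketch, not SMPI internals):
  //   MPI_Win_lock(MPI_LOCK_SHARED, target, 0, win);
  //   MPI_Get(local_buf, n, MPI_DOUBLE, target, 0, n, MPI_DOUBLE, win);
  //   MPI_Win_unlock(target, win); // returns once the transfer completed on both sides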
int Win::lock_all(int assert){
  int retval = MPI_SUCCESS;
  for (int i = 0; i < comm_->size(); i++) {
    int ret = this->lock(MPI_LOCK_SHARED, i, assert);
    if (ret != MPI_SUCCESS)

int Win::unlock(int rank){
  MPI_Win target_win = connected_wins_[rank];
  int target_mode = target_win->mode_;
  target_win->mode_ = 0;
  target_win->lockers_.remove(comm_->rank());
  if (target_mode == MPI_LOCK_EXCLUSIVE) {
    target_win->lock_mut_->unlock();

  int finished = finish_comms(rank);
  XBT_DEBUG("Win_unlock %d - Finished %d RMA calls", rank, finished);
  finished = target_win->finish_comms(rank_);
  XBT_DEBUG("Win_unlock target %d - Finished %d RMA calls", rank, finished);

int Win::unlock_all(){
  int retval = MPI_SUCCESS;
  for (int i = 0; i < comm_->size(); i++) {
    int ret = this->unlock(i);
    if (ret != MPI_SUCCESS)

int Win::flush(int rank){
  MPI_Win target_win = connected_wins_[rank];
  int finished = finish_comms(rank_);
  XBT_DEBUG("Win_flush on local %d - Finished %d RMA calls", rank_, finished);
  finished = target_win->finish_comms(rank);
  XBT_DEBUG("Win_flush on remote %d - Finished %d RMA calls", rank, finished);

int Win::flush_local(int rank){
  int finished = finish_comms(rank);
  XBT_DEBUG("Win_flush_local for rank %d - Finished %d RMA calls", rank, finished);

int Win::flush_all(){
  int finished = finish_comms();
  XBT_DEBUG("Win_flush_all on local - Finished %d RMA calls", finished);
  for (int i = 0; i < comm_->size(); i++) {
    finished = connected_wins_[i]->finish_comms(rank_);
    XBT_DEBUG("Win_flush_all on %d - Finished %d RMA calls", i, finished);

int Win::flush_local_all(){
  int finished = finish_comms();
  XBT_DEBUG("Win_flush_local_all - Finished %d RMA calls", finished);

Win* Win::f2c(int id){
  return static_cast<Win*>(F2C::f2c(id));

int Win::finish_comms(){
  // Finish own requests
  int size = static_cast<int>(requests_.size());
  MPI_Request* treqs = requests_.data();
  Request::waitall(size, treqs, MPI_STATUSES_IGNORE);

int Win::finish_comms(int rank){
  // Finish own requests
  // Let's see whether we're the destination or the sender of this request,
  // because we only wait for the requests that we are responsible for.
  // Also use the process id here, since the request's src() and dst() return the
  // process id, NOT the rank (which only exists in the context of a communicator).
  int proc_id = comm_->group()->actor(rank)->get_pid();
  auto it = std::stable_partition(begin(requests_), end(requests_), [proc_id](const MPI_Request& req) {
    return (req == MPI_REQUEST_NULL || (req->src() != proc_id && req->dst() != proc_id));
  std::vector<MPI_Request> myreqs(it, end(requests_));
  requests_.erase(it, end(requests_));
  int size = static_cast<int>(myreqs.size());
  MPI_Request* treqs = myreqs.data();
  Request::waitall(size, treqs, MPI_STATUSES_IGNORE);

int Win::shared_query(int rank, MPI_Aint* size, int* disp_unit, void* baseptr) const
  const Win* target_win = rank != MPI_PROC_NULL ? connected_wins_[rank] : nullptr;
  for (int i = 0; not target_win && i < comm_->size(); i++) {
    if (connected_wins_[i]->size_ > 0)
      target_win = connected_wins_[i];

  if (target_win) {
    *size = target_win->size_;
    *disp_unit = target_win->disp_unit_;
    *static_cast<void**>(baseptr) = target_win->base_;
  } else {
    *static_cast<void**>(baseptr) = nullptr;
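  // Illustrative caller-side sketch (standard MPI-3, not SMPI internals): query where a
  // peer's segment of a shared window lives, then address it directly:
  //   MPI_Aint peer_size; int peer_disp; void* peer_base;
  //   MPI_Win_shared_query(win, peer_rank, &peer_size, &peer_disp, &peer_base);
  //   // peer_base now points at peer_rank's segment (MPI_PROC_NULL: any valid segment)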
MPI_Errhandler Win::errhandler()
  if (errhandler_ != MPI_ERRHANDLER_NULL)

void Win::set_errhandler(MPI_Errhandler errhandler)
  if (errhandler_ != MPI_ERRHANDLER_NULL)
    simgrid::smpi::Errhandler::unref(errhandler_);
  errhandler_ = errhandler;
  if (errhandler_ != MPI_ERRHANDLER_NULL)

} // namespace simgrid