/* Copyright (c) 2007-2021. The SimGrid Team. All rights reserved. */

/* This program is free software; you can redistribute it and/or modify it
 * under the terms of the license (GNU LGPL) which comes with this package. */
#include "smpi_win.hpp"

#include "smpi_coll.hpp"
#include "smpi_comm.hpp"
#include "smpi_datatype.hpp"
#include "smpi_info.hpp"
#include "smpi_keyvals.hpp"
#include "smpi_request.hpp"
#include "src/smpi/include/smpi_actor.hpp"

#include <algorithm> // std::stable_partition, used in finish_comms(int)
XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");

namespace simgrid {
namespace smpi {

std::unordered_map<int, smpi_key_elem> Win::keyvals_;
int Win::keyval_id_ = 0;
Win::Win(void* base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm, int allocated, int dynamic)
    : base_(base)
    , size_(size)
    , disp_unit_(disp_unit)
    , info_(info)
    , comm_(comm)
    , connected_wins_(comm->size())
    , rank_(comm->rank())
    , allocated_(allocated)
    , dynamic_(dynamic)
{
  XBT_DEBUG("Creating window");
  if (info != MPI_INFO_NULL)
    info->ref();
  connected_wins_[rank_] = this;
  if (rank_ == 0) // rank 0 allocates the barrier shared by every process of the window
    bar_ = new s4u::Barrier(comm->size());
  comm->add_rma_win(this);
  colls::allgather(&connected_wins_[rank_], sizeof(MPI_Win), MPI_BYTE, connected_wins_.data(), sizeof(MPI_Win),
                   MPI_BYTE, comm);
  colls::bcast(&bar_, sizeof(s4u::Barrier*), MPI_BYTE, 0, comm);
}
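/* Illustration (not part of SMPI itself): this constructor backs the user-level window creation call.
 * Assuming a user buffer `buf` of `len` ints (hypothetical names), the corresponding MPI code would be:
 *
 *   MPI_Win win;
 *   MPI_Win_create(buf, len * sizeof(int), sizeof(int), MPI_INFO_NULL, MPI_COMM_WORLD, &win);
 *
 * After construction, each process holds a Win object whose connected_wins_ vector points to the Win of
 * every peer, thanks to the allgather above.
 */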
Win::~Win(){
  //As per the standard, perform a barrier to ensure every async comm is finished
  bar_->wait();

  int finished = finish_comms();
  XBT_DEBUG("Win destructor - Finished %d RMA calls", finished);

  if (info_ != MPI_INFO_NULL)
    simgrid::smpi::Info::unref(info_);
  if (errhandler_ != MPI_ERRHANDLER_NULL)
    simgrid::smpi::Errhandler::unref(errhandler_);

  comm_->remove_rma_win(this);

  colls::barrier(comm_);
  if (rank_ == 0)
    delete bar_;
  F2C::free_f(this->c2f());
}
int Win::attach(void* /*base*/, MPI_Aint size)
{
  if (not(base_ == MPI_BOTTOM || base_ == nullptr))
    return MPI_ERR_ARG;
  base_ = nullptr; // actually the address will be given in the RMA calls, as being the disp.
  size_ += size;
  return MPI_SUCCESS;
}

int Win::detach(const void* /*base*/)
{
  base_ = MPI_BOTTOM;
  size_ = -1;
  return MPI_SUCCESS;
}
void Win::get_name(char* name, int* length) const
{
  *length = static_cast<int>(name_.length());
  if (not name_.empty()) {
    name_.copy(name, *length);
    name[*length] = '\0';
  }
}
void Win::get_group(MPI_Group* group){
  if(comm_ != MPI_COMM_NULL){
    *group = comm_->group();
  } else {
    *group = MPI_GROUP_NULL;
  }
}
MPI_Info Win::info()
{
  if (info_ == MPI_INFO_NULL)
    info_ = new Info();
  info_->ref();
  return info_;
}
int Win::rank() const { return rank_; }

MPI_Comm Win::comm() const { return comm_; }

MPI_Aint Win::size() const { return size_; }

void* Win::base() const { return base_; }

int Win::disp_unit() const { return disp_unit_; }

int Win::dynamic() const { return dynamic_; }
void Win::set_info(MPI_Info info)
{
  if (info_ != MPI_INFO_NULL)
    simgrid::smpi::Info::unref(info_);
  info_ = info;
  if (info_ != MPI_INFO_NULL)
    info_->ref();
}

void Win::set_name(const char* name){
  name_ = name;
}
int Win::fence(int assert)
{
  XBT_DEBUG("Entering fence");
  if (opened_ == 0)
    opened_ = 1;
  if (assert != MPI_MODE_NOPRECEDE) {
    // This is not the first fence => finalize what came before
    bar_->wait();
    mut_->lock();
    // This (simulated) mutex ensures that no process pushes to the vector of requests during the waitall.
    // Without this, the vector could get redimensioned when another process pushes.
    // This would result in the array used by Request::waitall() to be invalidated.
    // Another solution would be to copy the data and cleanup the vector *before* Request::waitall

    // start all requests that have been prepared by another process
    if (not requests_.empty()) {
      int size           = static_cast<int>(requests_.size());
      MPI_Request* treqs = requests_.data();
      Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
    }
    count_ = 0;
    mut_->unlock();
  }

  if (assert == MPI_MODE_NOSUCCEED) // there should be no ops after this one; flag that we are closed.
    opened_ = 0;

  bar_->wait();
  XBT_DEBUG("Leaving fence");
  return MPI_SUCCESS;
}
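/* Illustration (not part of SMPI itself): the active-target, fence-based pattern handled above.
 * `win`, `src`, `n` and `peer` are hypothetical user-side names.
 *
 *   MPI_Win_fence(MPI_MODE_NOPRECEDE, win);               // first fence: nothing to finalize
 *   MPI_Put(src, n, MPI_INT, peer, 0, n, MPI_INT, win);   // merely queues an RMA request on both windows
 *   MPI_Win_fence(MPI_MODE_NOSUCCEED, win);               // waits for every queued request (the waitall above)
 */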
int Win::put(const void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
              MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
{
  //get receiver pointer
  Win* recv_win = connected_wins_[target_rank];

  if(opened_==0){//check that post/start has been done
    // no fence or start .. lock ok ?
    int locked = 0;
    for (auto const& it : recv_win->lockers_)
      if (it == comm_->rank())
        locked = 1;
    if (locked != 1)
      return MPI_ERR_WIN;
  }

  if(target_count*target_datatype->get_extent()>recv_win->size_){
    XBT_WARN("Trying to put more than the window size - Bailing out.");
    return MPI_ERR_RMA_RANGE;
  }

  void* recv_addr = static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_;

  if (target_rank != comm_->rank()) { // This is not for myself, so we need to send messages
    XBT_DEBUG("Entering MPI_Put to remote rank %d", target_rank);
    // prepare send_request
    // TODO cheinrich Check for rank / pid conversion
    MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype, comm_->rank(), target_rank,
                                              SMPI_RMA_TAG + 1, comm_, MPI_OP_NULL);

    //prepare receiver request
    // TODO cheinrich Check for rank / pid conversion
    MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, recv_win->comm_->rank(),
                                              target_rank, SMPI_RMA_TAG + 1, recv_win->comm_, MPI_OP_NULL);

    // start the send
    sreq->start();
    if(request!=nullptr){
      *request = sreq;
    }else{
      mut_->lock();
      requests_.push_back(sreq);
      mut_->unlock();
    }

    //push request to receiver's win
    recv_win->mut_->lock();
    recv_win->requests_.push_back(rreq);
    rreq->start();
    recv_win->mut_->unlock();
  } else {
    XBT_DEBUG("Entering MPI_Put from myself to myself, rank %d", target_rank);
    Datatype::copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
    if (request != nullptr)
      *request = MPI_REQUEST_NULL;
  }

  return MPI_SUCCESS;
}
int Win::get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
              MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
{
  //get sender pointer
  Win* send_win = connected_wins_[target_rank];

  if(opened_==0){//check that post/start has been done
    // no fence or start .. lock ok ?
    int locked = 0;
    for (auto const& it : send_win->lockers_)
      if (it == comm_->rank())
        locked = 1;
    if (locked != 1)
      return MPI_ERR_WIN;
  }

  if(target_count*target_datatype->get_extent()>send_win->size_){
    XBT_WARN("Trying to get more than the window size - Bailing out.");
    return MPI_ERR_RMA_RANGE;
  }

  const void* send_addr = static_cast<void*>(static_cast<char*>(send_win->base_) + target_disp * send_win->disp_unit_);
  XBT_DEBUG("Entering MPI_Get from %d", target_rank);

  if(target_rank != comm_->rank()){
    //prepare send_request
    MPI_Request sreq = Request::rma_send_init(send_addr, target_count, target_datatype, target_rank,
                                              send_win->comm_->rank(), SMPI_RMA_TAG + 2, send_win->comm_, MPI_OP_NULL);

    //prepare receiver request
    MPI_Request rreq = Request::rma_recv_init(
        origin_addr, origin_count, origin_datatype, target_rank,
        comm_->rank(), // TODO cheinrich Check here if comm_->rank() and above send_win->comm_->rank() are correct
        SMPI_RMA_TAG + 2, comm_, MPI_OP_NULL);

    //start the send, with another process than us as sender.
    sreq->start();
    //push request to receiver's win
    send_win->mut_->lock();
    send_win->requests_.push_back(sreq);
    send_win->mut_->unlock();

    //start the receive
    rreq->start();
    if(request!=nullptr){
      *request = rreq;
    }else{
      mut_->lock();
      requests_.push_back(rreq);
      mut_->unlock();
    }
  } else {
    Datatype::copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
    if (request != nullptr)
      *request = MPI_REQUEST_NULL;
  }

  return MPI_SUCCESS;
}
int Win::accumulate(const void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
              MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request* request)
{
  XBT_DEBUG("Entering MPI_Win_Accumulate");
  //get receiver pointer
  Win* recv_win = connected_wins_[target_rank];

  if(opened_==0){//check that post/start has been done
    // no fence or start .. lock ok ?
    int locked = 0;
    for (auto const& it : recv_win->lockers_)
      if (it == comm_->rank())
        locked = 1;
    if (locked != 1)
      return MPI_ERR_WIN;
  }
  //FIXME: local version

  if(target_count*target_datatype->get_extent()>recv_win->size_){
    XBT_WARN("Trying to accumulate more than the window size - Bailing out.");
    return MPI_ERR_RMA_RANGE;
  }

  void* recv_addr = static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_;
  XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
  // As the tag will be used for ordering of the operations, subtract count from it (to avoid collisions with other
  // SMPI tags, SMPI_RMA_TAG is set below all the other ones we use)
  // prepare send_request
  MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype, comm_->rank(), target_rank,
                                            SMPI_RMA_TAG - 3 - count_, comm_, op);

  // prepare receiver request
  MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, recv_win->comm_->rank(),
                                            recv_win->comm_->group()->rank(comm_->group()->actor(target_rank)),
                                            SMPI_RMA_TAG - 3 - count_, recv_win->comm_, op);

  count_++;

  // start the send
  sreq->start();
  // push request to receiver's win
  recv_win->mut_->lock();
  recv_win->requests_.push_back(rreq);
  rreq->start();
  recv_win->mut_->unlock();

  if (request != nullptr) {
    *request = sreq;
  } else {
    mut_->lock();
    requests_.push_back(sreq);
    mut_->unlock();
  }

  XBT_DEBUG("Leaving MPI_Win_Accumulate");
  return MPI_SUCCESS;
}
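/* Note (illustration only): since count_ grows at each call and is reset by fence(), the first accumulate
 * after a fence uses tag SMPI_RMA_TAG - 3, the next one SMPI_RMA_TAG - 4, and so on. Each operation thus
 * gets its own matching send/recv pair, which is how the ordering described in the comment above is kept. */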
int Win::get_accumulate(const void* origin_addr, int origin_count, MPI_Datatype origin_datatype, void* result_addr,
                        int result_count, MPI_Datatype result_datatype, int target_rank, MPI_Aint target_disp,
                        int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request*)
{
  //get sender pointer
  const Win* send_win = connected_wins_[target_rank];

  if(opened_==0){//check that post/start has been done
    // no fence or start .. lock ok ?
    int locked = 0;
    for (auto const& it : send_win->lockers_)
      if (it == comm_->rank())
        locked = 1;
    if (locked != 1)
      return MPI_ERR_WIN;
  }

  if(target_count*target_datatype->get_extent()>send_win->size_){
    XBT_WARN("Trying to get_accumulate more than the window size - Bailing out.");
    return MPI_ERR_RMA_RANGE;
  }

  XBT_DEBUG("Entering MPI_Get_accumulate from %d", target_rank);
  //need to be sure ops are correctly ordered, so finish request here ? slow.
  MPI_Request req = MPI_REQUEST_NULL;
  send_win->atomic_mut_->lock();
  get(result_addr, result_count, result_datatype, target_rank,
      target_disp, target_count, target_datatype, &req);
  if (req != MPI_REQUEST_NULL)
    Request::wait(&req, MPI_STATUS_IGNORE);
  accumulate(origin_addr, origin_count, origin_datatype, target_rank,
             target_disp, target_count, target_datatype, op, &req);
  if (req != MPI_REQUEST_NULL)
    Request::wait(&req, MPI_STATUS_IGNORE);
  send_win->atomic_mut_->unlock();
  return MPI_SUCCESS;
}
int Win::compare_and_swap(const void* origin_addr, const void* compare_addr, void* result_addr, MPI_Datatype datatype,
                          int target_rank, MPI_Aint target_disp)
{
  //get sender pointer
  const Win* send_win = connected_wins_[target_rank];

  if(opened_==0){//check that post/start has been done
    // no fence or start .. lock ok ?
    int locked = 0;
    for (auto const& it : send_win->lockers_)
      if (it == comm_->rank())
        locked = 1;
    if (locked != 1)
      return MPI_ERR_WIN;
  }

  XBT_DEBUG("Entering MPI_Compare_and_swap with %d", target_rank);
  MPI_Request req = MPI_REQUEST_NULL;
  send_win->atomic_mut_->lock();
  get(result_addr, 1, datatype, target_rank,
      target_disp, 1, datatype, &req);
  if (req != MPI_REQUEST_NULL)
    Request::wait(&req, MPI_STATUS_IGNORE);
  if (not memcmp(result_addr, compare_addr, datatype->get_extent())) {
    put(origin_addr, 1, datatype, target_rank,
        target_disp, 1, datatype);
  }
  send_win->atomic_mut_->unlock();
  return MPI_SUCCESS;
}
int Win::start(MPI_Group group, int /*assert*/)
{
  /* From MPI forum advice
  The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
  will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
  the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
  matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
  MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
  implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
  to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
  called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
  origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
  must complete, without further dependencies. (An illustrative origin/target call sequence is sketched in the
  comment after this function.) */

  //naive, blocking implementation.
  int i = 0;
  int j = 0;
  int size = group->size();
  std::vector<MPI_Request> reqs(size);

  XBT_DEBUG("Entering MPI_Win_Start");
  while (j != size) {
    int src = comm_->group()->rank(group->actor(j));
    if (src != rank_ && src != MPI_UNDEFINED) { // TODO cheinrich: The check of MPI_UNDEFINED should be useless here
      reqs[i] = Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 4, comm_);
      i++;
    }
    j++;
  }
  size = i;
  Request::startall(size, reqs.data());
  Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
  for (i = 0; i < size; i++) {
    Request::unref(&reqs[i]);
  }
  opened_++; //we're open for business !
  group_ = group;
  group->ref();
  XBT_DEBUG("Leaving MPI_Win_Start");
  return MPI_SUCCESS;
}
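/* Illustration (not part of SMPI itself): the general-active-target sequence implemented by start(), post(),
 * complete() and wait(). `win`, `buf`, `n`, `target_grp` and `origin_grp` are hypothetical user-side names.
 *
 *   // origin process                                    // target process
 *   MPI_Win_start(target_grp, 0, win);                   MPI_Win_post(origin_grp, 0, win);
 *   MPI_Put(buf, n, MPI_INT, 0, 0, n, MPI_INT, win);     // ... target does local work ...
 *   MPI_Win_complete(win);                               MPI_Win_wait(win);
 */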
int Win::post(MPI_Group group, int /*assert*/)
{
  //let's make a synchronous send here
  int i = 0;
  int j = 0;
  int size = group->size();
  std::vector<MPI_Request> reqs(size);

  XBT_DEBUG("Entering MPI_Win_Post");
  while (j != size) {
    int dst = comm_->group()->rank(group->actor(j));
    if (dst != rank_ && dst != MPI_UNDEFINED) {
      reqs[i] = Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 4, comm_);
      i++;
    }
    j++;
  }
  size = i;
  Request::startall(size, reqs.data());
  Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
  for (i = 0; i < size; i++) {
    Request::unref(&reqs[i]);
  }
  opened_++; //we're open for business !
  group_ = group;
  group->ref();
  XBT_DEBUG("Leaving MPI_Win_Post");
  return MPI_SUCCESS;
}
int Win::complete(){
  xbt_assert(opened_ != 0, "Complete called on a MPI_Win that was not opened with MPI_Win_start");

  XBT_DEBUG("Entering MPI_Win_Complete");
  int i = 0;
  int j = 0;
  int size = group_->size();
  std::vector<MPI_Request> reqs(size);

  while (j != size) {
    int dst = comm_->group()->rank(group_->actor(j));
    if (dst != rank_ && dst != MPI_UNDEFINED) {
      reqs[i] = Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 5, comm_);
      i++;
    }
    j++;
  }
  size = i;
  XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
  Request::startall(size, reqs.data());
  Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
  for (i = 0; i < size; i++) {
    Request::unref(&reqs[i]);
  }

  int finished = finish_comms();
  XBT_DEBUG("Win_complete - Finished %d RMA calls", finished);

  Group::unref(group_);
  opened_--; //we're closed for business !
  return MPI_SUCCESS;
}
int Win::wait(){
  //naive, blocking implementation.
  XBT_DEBUG("Entering MPI_Win_Wait");
  int i = 0;
  int j = 0;
  int size = group_->size();
  std::vector<MPI_Request> reqs(size);

  while (j != size) {
    int src = comm_->group()->rank(group_->actor(j));
    if (src != rank_ && src != MPI_UNDEFINED) {
      reqs[i] = Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 5, comm_);
      i++;
    }
    j++;
  }
  size = i;
  XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
  Request::startall(size, reqs.data());
  Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
  for (i = 0; i < size; i++) {
    Request::unref(&reqs[i]);
  }
  int finished = finish_comms();
  XBT_DEBUG("Win_wait - Finished %d RMA calls", finished);

  Group::unref(group_);
  opened_--; //we're closed for business !
  return MPI_SUCCESS;
}
int Win::lock(int lock_type, int rank, int /*assert*/)
{
  MPI_Win target_win = connected_wins_[rank];

  if ((lock_type == MPI_LOCK_EXCLUSIVE && target_win->mode_ != MPI_LOCK_SHARED)|| target_win->mode_ == MPI_LOCK_EXCLUSIVE){
    target_win->lock_mut_->lock();
    target_win->mode_ += lock_type;//add the lock_type to differentiate case when we are switching from EXCLUSIVE to SHARED (no release needed in the unlock)
    if(lock_type == MPI_LOCK_SHARED){//the window used to be exclusive, it's now shared.
      target_win->lock_mut_->unlock();
    }
  } else if (not(target_win->mode_ == MPI_LOCK_SHARED && lock_type == MPI_LOCK_EXCLUSIVE))
    target_win->mode_ += lock_type; // don't set to exclusive if it's already shared

  target_win->lockers_.push_back(comm_->rank());

  int finished = finish_comms(rank);
  XBT_DEBUG("Win_lock %d - Finished %d RMA calls", rank, finished);
  finished = target_win->finish_comms(rank_);
  XBT_DEBUG("Win_lock target %d - Finished %d RMA calls", rank, finished);
  return MPI_SUCCESS;
}
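/* Illustration (not part of SMPI itself): the passive-target pattern served by lock()/unlock().
 * `win`, `buf`, `n` and `peer` are hypothetical user-side names.
 *
 *   MPI_Win_lock(MPI_LOCK_SHARED, peer, 0, win);          // handled by Win::lock() above
 *   MPI_Get(buf, n, MPI_INT, peer, 0, n, MPI_INT, win);
 *   MPI_Win_unlock(peer, win);                            // Win::unlock() below flushes the pending RMA calls
 */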
int Win::lock_all(int assert){
  int retval = MPI_SUCCESS;
  for (int i = 0; i < comm_->size(); i++) {
    int ret = this->lock(MPI_LOCK_SHARED, i, assert);
    if (ret != MPI_SUCCESS)
      retval = ret;
  }
  return retval;
}
int Win::unlock(int rank){
  MPI_Win target_win = connected_wins_[rank];
  int target_mode = target_win->mode_;
  target_win->mode_ = 0;
  target_win->lockers_.remove(comm_->rank());
  if (target_mode == MPI_LOCK_EXCLUSIVE){
    target_win->lock_mut_->unlock();
  }

  int finished = finish_comms(rank);
  XBT_DEBUG("Win_unlock %d - Finished %d RMA calls", rank, finished);
  finished = target_win->finish_comms(rank_);
  XBT_DEBUG("Win_unlock target %d - Finished %d RMA calls", rank, finished);
  return MPI_SUCCESS;
}
int Win::unlock_all(){
  int retval = MPI_SUCCESS;
  for (int i = 0; i < comm_->size(); i++) {
    int ret = this->unlock(i);
    if (ret != MPI_SUCCESS)
      retval = ret;
  }
  return retval;
}
int Win::flush(int rank){
  MPI_Win target_win = connected_wins_[rank];
  int finished = finish_comms(rank_);
  XBT_DEBUG("Win_flush on local %d - Finished %d RMA calls", rank_, finished);
  finished = target_win->finish_comms(rank);
  XBT_DEBUG("Win_flush on remote %d - Finished %d RMA calls", rank, finished);
  return MPI_SUCCESS;
}
int Win::flush_local(int rank){
  int finished = finish_comms(rank);
  XBT_DEBUG("Win_flush_local for rank %d - Finished %d RMA calls", rank, finished);
  return MPI_SUCCESS;
}
int Win::flush_all(){
  int finished = finish_comms();
  XBT_DEBUG("Win_flush_all on local - Finished %d RMA calls", finished);
  for (int i = 0; i < comm_->size(); i++) {
    finished = connected_wins_[i]->finish_comms(rank_);
    XBT_DEBUG("Win_flush_all on %d - Finished %d RMA calls", i, finished);
  }
  return MPI_SUCCESS;
}
int Win::flush_local_all(){
  int finished = finish_comms();
  XBT_DEBUG("Win_flush_local_all - Finished %d RMA calls", finished);
  return MPI_SUCCESS;
}
Win* Win::f2c(int id){
  return static_cast<Win*>(F2C::f2c(id));
}
int Win::finish_comms(){
  mut_->lock();
  //Finish own requests
  int size = static_cast<int>(requests_.size());
  if (size > 0) {
    MPI_Request* treqs = requests_.data();
    Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
    requests_.clear();
  }
  mut_->unlock();
  return size;
}
int Win::finish_comms(int rank){
  mut_->lock();
  // Finish own requests
  // Let's see if we're either the destination or the sender of this request
  // because we only wait for requests that we are responsible for.
  // Also use the process id here since the request itself returns from src()
  // and dst() the process id, NOT the rank (which only exists in the context of a communicator).
  int proc_id = comm_->group()->actor(rank)->get_pid();
  auto it = std::stable_partition(begin(requests_), end(requests_), [proc_id](const MPI_Request& req) {
    return (req == MPI_REQUEST_NULL || (req->src() != proc_id && req->dst() != proc_id));
  });
  std::vector<MPI_Request> myreqqs(it, end(requests_));
  requests_.erase(it, end(requests_));
  int size = static_cast<int>(myreqqs.size());
  if (size > 0) {
    MPI_Request* treqs = myreqqs.data();
    Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
    myreqqs.clear();
  }
  mut_->unlock();
  return size;
}
int Win::shared_query(int rank, MPI_Aint* size, int* disp_unit, void* baseptr) const
{
  const Win* target_win = rank != MPI_PROC_NULL ? connected_wins_[rank] : nullptr;
  for (int i = 0; not target_win && i < comm_->size(); i++) {
    if (connected_wins_[i]->size_ > 0)
      target_win = connected_wins_[i];
  }
  if (target_win) {
    *size                         = target_win->size_;
    *disp_unit                    = target_win->disp_unit_;
    *static_cast<void**>(baseptr) = target_win->base_;
  } else {
    *size                         = 0;
    *static_cast<void**>(baseptr) = nullptr;
  }
  return MPI_SUCCESS;
}
MPI_Errhandler Win::errhandler()
{
  if (errhandler_ != MPI_ERRHANDLER_NULL)
    errhandler_->ref();
  return errhandler_;
}
void Win::set_errhandler(MPI_Errhandler errhandler)
{
  if (errhandler_ != MPI_ERRHANDLER_NULL)
    simgrid::smpi::Errhandler::unref(errhandler_);
  errhandler_ = errhandler;
  if (errhandler_ != MPI_ERRHANDLER_NULL)
    errhandler_->ref();
}

} // namespace smpi
} // namespace simgrid