/* Copyright (c) 2007-2021. The SimGrid Team. All rights reserved.          */

/* This program is free software; you can redistribute it and/or modify it
 * under the terms of the license (GNU LGPL) which comes with this package. */
#include "smpi_win.hpp"

#include "smpi_coll.hpp"
#include "smpi_comm.hpp"
#include "smpi_datatype.hpp"
#include "smpi_info.hpp"
#include "smpi_keyvals.hpp"
#include "smpi_request.hpp"
#include "src/smpi/include/smpi_actor.hpp"

#include <algorithm> // std::stable_partition, used in finish_comms(int)

XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");

namespace simgrid{
namespace smpi{

std::unordered_map<int, smpi_key_elem> Win::keyvals_;
int Win::keyval_id_ = 0;
Win::Win(void* base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm, int allocated, int dynamic)
    : base_(base)
    , size_(size)
    , disp_unit_(disp_unit)
    , info_(info)
    , comm_(comm)
    , connected_wins_(comm->size())
    , rank_(comm->rank())
    , allocated_(allocated)
    , dynamic_(dynamic)
{
  XBT_DEBUG("Creating window");
  if (info != MPI_INFO_NULL)
    info->ref();
  connected_wins_[rank_] = this;
  if (rank_ == 0)
    bar_ = new s4u::Barrier(comm->size());
  comm->add_rma_win(this);
  colls::allgather(&connected_wins_[rank_], sizeof(MPI_Win), MPI_BYTE, connected_wins_.data(), sizeof(MPI_Win),
                   MPI_BYTE, comm);
  colls::bcast(&bar_, sizeof(s4u::Barrier*), MPI_BYTE, 0, comm);
}
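
/* Illustrative usage sketch (not part of the SimGrid sources): this
 * constructor backs MPI_Win_create. The buffer and sizes are invented
 * for the example.
 *
 *   int buf[100];
 *   MPI_Win win;
 *   MPI_Win_create(buf, 100 * sizeof(int), sizeof(int), MPI_INFO_NULL,
 *                  MPI_COMM_WORLD, &win);
 *   // ... one or more RMA epochs ...
 *   MPI_Win_free(&win); // runs the destructor below
 */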
Win::~Win(){
  // As per the standard, perform a barrier to ensure every async comm is finished
  bar_->wait();

  int finished = finish_comms();
  XBT_DEBUG("Win destructor - Finished %d RMA calls", finished);

  if (info_ != MPI_INFO_NULL)
    simgrid::smpi::Info::unref(info_);
  if (errhandler_ != MPI_ERRHANDLER_NULL)
    simgrid::smpi::Errhandler::unref(errhandler_);

  comm_->remove_rma_win(this);

  colls::barrier(comm_);
  F2C::free_f(this->f2c_id());
}
int Win::attach(void* /*base*/, MPI_Aint size)
{
  if (not(base_ == MPI_BOTTOM || base_ == nullptr))
    return MPI_ERR_ARG;
  base_ = nullptr; // the actual address will be given in the RMA calls, as the displacement
  size_ += size;
  return MPI_SUCCESS;
}

int Win::detach(const void* /*base*/)
{
  base_ = MPI_BOTTOM;
  size_ = -1;
  return MPI_SUCCESS;
}
void Win::get_name(char* name, int* length) const
{
  *length = static_cast<int>(name_.length());
  if (not name_.empty()) {
    name_.copy(name, *length);
    name[*length] = '\0';
  }
}
void Win::get_group(MPI_Group* group){
  if (comm_ != MPI_COMM_NULL){
    *group = comm_->group();
  } else {
    *group = MPI_GROUP_NULL;
  }
}
MPI_Info Win::info()
{
  if (info_ == MPI_INFO_NULL)
    info_ = new Info();
  info_->ref();
  return info_;
}

int Win::rank() const { return rank_; }

MPI_Comm Win::comm() const { return comm_; }

MPI_Aint Win::size() const { return size_; }

void* Win::base() const { return base_; }

int Win::disp_unit() const { return disp_unit_; }

int Win::dynamic() const { return dynamic_; }
void Win::set_info(MPI_Info info)
{
  if (info_ != MPI_INFO_NULL)
    simgrid::smpi::Info::unref(info_);
  info_ = info;
  if (info_ != MPI_INFO_NULL)
    info_->ref();
}
void Win::set_name(const char* name){
  name_ = name;
}
int Win::fence(int assert)
{
  XBT_DEBUG("Entering fence");
  if (opened_ == 0)
    opened_ = 1;
  if (assert != MPI_MODE_NOPRECEDE) {
    // This is not the first fence => finalize what came before
    bar_->wait();
    mut_->lock();
    // This (simulated) mutex ensures that no process pushes to the vector of requests during the waitall.
    // Without it, the vector could get reallocated when another process pushes,
    // which would invalidate the array used by Request::waitall().
    // Another solution would be to copy the data and clean up the vector *before* Request::waitall.

    // start all requests that have been prepared by another process
    if (not requests_.empty()) {
      int size           = static_cast<int>(requests_.size());
      MPI_Request* treqs = requests_.data();
      Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
    }
    count_ = 0;
    mut_->unlock();
  }

  if (assert == MPI_MODE_NOSUCCEED) // there should be no ops after this one; mark the window as closed
    opened_ = 0;

  bar_->wait();
  XBT_DEBUG("Leaving fence");
  return MPI_SUCCESS;
}
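
/* Illustrative usage sketch (not part of the SimGrid sources): the classic
 * active-target epoch that this fence implementation supports; `val` and
 * `target` are invented for the example.
 *
 *   MPI_Win_fence(MPI_MODE_NOPRECEDE, win);                  // open epoch
 *   MPI_Put(&val, 1, MPI_INT, target, 0, 1, MPI_INT, win);
 *   MPI_Win_fence(MPI_MODE_NOSUCCEED, win);                  // complete all RMA calls
 */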
int Win::put(const void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
             MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
{
  // get receiver pointer
  Win* recv_win = connected_wins_[target_rank];

  if (opened_ == 0){ // check that post/start has been done
    // no fence or start was issued; check whether this process holds a lock on the target window
    int locked = 0;
    for (auto const& it : recv_win->lockers_)
      if (it == comm_->rank())
        locked = 1;
    if (locked != 1)
      return MPI_ERR_WIN;
  }

  if (target_count * target_datatype->get_extent() > recv_win->size_){
    XBT_WARN("MPI_Put: Trying to put %ld, which is more than the window size on target process %d: %ld - Bailing out.",
             target_count * target_datatype->get_extent(), target_rank, recv_win->size_);
    return MPI_ERR_RMA_RANGE;
  }

  void* recv_addr = static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_;

  if (target_rank != comm_->rank()) { // This is not for myself, so we need to send messages
    XBT_DEBUG("Entering MPI_Put to remote rank %d", target_rank);
    // prepare send_request
    // TODO cheinrich Check for rank / pid conversion
    MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype, comm_->rank(), target_rank,
                                              SMPI_RMA_TAG + 1, comm_, MPI_OP_NULL);

    // prepare receiver request
    // TODO cheinrich Check for rank / pid conversion
    MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, recv_win->comm_->rank(),
                                              target_rank, SMPI_RMA_TAG + 1, recv_win->comm_, MPI_OP_NULL);

    // start send
    sreq->start();

    if (request != nullptr){
      *request = sreq;
    } else {
      mut_->lock();
      requests_.push_back(sreq);
      mut_->unlock();
    }

    // push request to receiver's win
    recv_win->mut_->lock();
    recv_win->requests_.push_back(rreq);
    rreq->start();
    recv_win->mut_->unlock();
  } else {
    XBT_DEBUG("Entering MPI_Put from myself to myself, rank %d", target_rank);
    Datatype::copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
    if (request != nullptr)
      *request = MPI_REQUEST_NULL;
  }

  return MPI_SUCCESS;
}
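
/* Illustrative usage sketch (not part of the SimGrid sources): the `request`
 * parameter above serves the request-based variant MPI_Rput, where the origin
 * waits explicitly for local completion. Names are invented for the example.
 *
 *   MPI_Request req;
 *   MPI_Rput(&val, 1, MPI_INT, target, 0, 1, MPI_INT, win, &req);
 *   MPI_Wait(&req, MPI_STATUS_IGNORE); // local buffer reusable afterwards
 */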
int Win::get(void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
             MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
{
  // get sender pointer
  Win* send_win = connected_wins_[target_rank];

  if (opened_ == 0){ // check that post/start has been done
    // no fence or start was issued; check whether this process holds a lock on the target window
    int locked = 0;
    for (auto const& it : send_win->lockers_)
      if (it == comm_->rank())
        locked = 1;
    if (locked != 1)
      return MPI_ERR_WIN;
  }

  if (target_count * target_datatype->get_extent() > send_win->size_){
    XBT_WARN("MPI_Get: Trying to get %ld, which is more than the window size on target process %d: %ld - Bailing out.",
             target_count * target_datatype->get_extent(), target_rank, send_win->size_);
    return MPI_ERR_RMA_RANGE;
  }

  const void* send_addr = static_cast<void*>(static_cast<char*>(send_win->base_) + target_disp * send_win->disp_unit_);
  XBT_DEBUG("Entering MPI_Get from %d", target_rank);

  if (target_rank != comm_->rank()){
    // prepare send_request
    MPI_Request sreq = Request::rma_send_init(send_addr, target_count, target_datatype, target_rank,
                                              send_win->comm_->rank(), SMPI_RMA_TAG + 2, send_win->comm_, MPI_OP_NULL);

    // prepare receiver request
    MPI_Request rreq = Request::rma_recv_init(
        origin_addr, origin_count, origin_datatype, target_rank,
        comm_->rank(), // TODO cheinrich Check here if comm_->rank() and above send_win->comm_->rank() are correct
        SMPI_RMA_TAG + 2, comm_, MPI_OP_NULL);

    // start the send, with another process than us as sender
    sreq->start();
    // push request to the sender's win
    send_win->mut_->lock();
    send_win->requests_.push_back(sreq);
    send_win->mut_->unlock();

    // start recv
    rreq->start();

    if (request != nullptr){
      *request = rreq;
    } else {
      mut_->lock();
      requests_.push_back(rreq);
      mut_->unlock();
    }
  } else {
    Datatype::copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
    if (request != nullptr)
      *request = MPI_REQUEST_NULL;
  }

  return MPI_SUCCESS;
}
int Win::accumulate(const void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
                    MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request* request)
{
  XBT_DEBUG("Entering MPI_Win_Accumulate");
  // get receiver pointer
  Win* recv_win = connected_wins_[target_rank];

  if (opened_ == 0){ // check that post/start has been done
    // no fence or start was issued; check whether this process holds a lock on the target window
    int locked = 0;
    for (auto const& it : recv_win->lockers_)
      if (it == comm_->rank())
        locked = 1;
    if (locked != 1)
      return MPI_ERR_WIN;
  }
  //FIXME: local version

  if (target_count * target_datatype->get_extent() > recv_win->size_){
    XBT_WARN("MPI_Accumulate: Trying to accumulate %ld, which is more than the window size on target process %d: %ld - Bailing out.",
             target_count * target_datatype->get_extent(), target_rank, recv_win->size_);
    return MPI_ERR_RMA_RANGE;
  }

  void* recv_addr = static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_;
  XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
  // As the tag will be used for ordering of the operations, subtract count_ from it (this avoids collisions with
  // other SMPI tags, since SMPI_RMA_TAG lies below all the other tags we use)
  // prepare send_request
  MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype, comm_->rank(), target_rank,
                                            SMPI_RMA_TAG - 3 - count_, comm_, op);

  // prepare receiver request
  MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, recv_win->comm_->rank(),
                                            recv_win->comm_->group()->rank(comm_->group()->actor(target_rank)),
                                            SMPI_RMA_TAG - 3 - count_, recv_win->comm_, op);

  count_++;

  // start send
  sreq->start();
  // push request to receiver's win
  recv_win->mut_->lock();
  recv_win->requests_.push_back(rreq);
  rreq->start();
  recv_win->mut_->unlock();

  if (request != nullptr) {
    *request = sreq;
  } else {
    mut_->lock();
    requests_.push_back(sreq);
    mut_->unlock();
  }

  XBT_DEBUG("Leaving MPI_Win_Accumulate");
  return MPI_SUCCESS;
}
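
/* Illustrative usage sketch (not part of the SimGrid sources): an element-wise
 * reduction into the target window; the per-window count_ above keeps such
 * operations ordered. Names are invented for the example.
 *
 *   int one = 1;
 *   MPI_Accumulate(&one, 1, MPI_INT, target, 0, 1, MPI_INT, MPI_SUM, win);
 */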
int Win::get_accumulate(const void* origin_addr, int origin_count, MPI_Datatype origin_datatype, void* result_addr,
                        int result_count, MPI_Datatype result_datatype, int target_rank, MPI_Aint target_disp,
                        int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request*)
{
  // get sender pointer
  const Win* send_win = connected_wins_[target_rank];

  if (opened_ == 0){ // check that post/start has been done
    // no fence or start was issued; check whether this process holds a lock on the target window
    int locked = 0;
    for (auto const& it : send_win->lockers_)
      if (it == comm_->rank())
        locked = 1;
    if (locked != 1)
      return MPI_ERR_WIN;
  }

  if (target_count * target_datatype->get_extent() > send_win->size_){
    XBT_WARN("MPI_Get_accumulate: Trying to get_accumulate %ld, which is more than the window size on target process %d: %ld - Bailing out.",
             target_count * target_datatype->get_extent(), target_rank, send_win->size_);
    return MPI_ERR_RMA_RANGE;
  }

  XBT_DEBUG("Entering MPI_Get_accumulate from %d", target_rank);
  // ops need to be correctly ordered, so finish the requests here? Slow.
  MPI_Request req = MPI_REQUEST_NULL;
  send_win->atomic_mut_->lock();
  get(result_addr, result_count, result_datatype, target_rank,
      target_disp, target_count, target_datatype, &req);
  if (req != MPI_REQUEST_NULL)
    Request::wait(&req, MPI_STATUS_IGNORE);
  if (op != MPI_NO_OP)
    accumulate(origin_addr, origin_count, origin_datatype, target_rank,
               target_disp, target_count, target_datatype, op, &req);
  if (req != MPI_REQUEST_NULL)
    Request::wait(&req, MPI_STATUS_IGNORE);
  send_win->atomic_mut_->unlock();
  return MPI_SUCCESS;
}
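
/* Illustrative usage sketch (not part of the SimGrid sources): an atomic
 * fetch-and-add, which the atomic_mut_ serialization above makes possible.
 * Names are invented for the example.
 *
 *   int one = 1, old;
 *   MPI_Get_accumulate(&one, 1, MPI_INT, &old, 1, MPI_INT,
 *                      target, 0, 1, MPI_INT, MPI_SUM, win);
 */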
int Win::compare_and_swap(const void* origin_addr, const void* compare_addr, void* result_addr, MPI_Datatype datatype,
                          int target_rank, MPI_Aint target_disp)
{
  // get sender pointer
  const Win* send_win = connected_wins_[target_rank];

  if (opened_ == 0){ // check that post/start has been done
    // no fence or start was issued; check whether this process holds a lock on the target window
    int locked = 0;
    for (auto const& it : send_win->lockers_)
      if (it == comm_->rank())
        locked = 1;
    if (locked != 1)
      return MPI_ERR_WIN;
  }

  XBT_DEBUG("Entering MPI_Compare_and_swap with %d", target_rank);
  MPI_Request req = MPI_REQUEST_NULL;
  send_win->atomic_mut_->lock();
  get(result_addr, 1, datatype, target_rank,
      target_disp, 1, datatype, &req);
  if (req != MPI_REQUEST_NULL)
    Request::wait(&req, MPI_STATUS_IGNORE);
  if (not memcmp(result_addr, compare_addr, datatype->get_extent())) {
    put(origin_addr, 1, datatype, target_rank,
        target_disp, 1, datatype);
  }
  send_win->atomic_mut_->unlock();
  return MPI_SUCCESS;
}
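
/* Illustrative usage sketch (not part of the SimGrid sources): acquiring a
 * simple distributed spin-lock with MPI_Compare_and_swap inside a passive
 * epoch. Names are invented for the example.
 *
 *   int unlocked = 0, locked = 1, prev;
 *   do {
 *     MPI_Compare_and_swap(&locked, &unlocked, &prev, MPI_INT, target, 0, win);
 *     MPI_Win_flush(target, win);
 *   } while (prev != unlocked);
 */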
int Win::start(MPI_Group group, int /*assert*/)
{
  /* From the MPI forum's advice to implementors:
  The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
  will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
  the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
  matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
  MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
  implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
  to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
  called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
  origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
  must complete, without further dependencies. */

  // naive, blocking implementation
  int i = 0, j = 0;
  int size = group->size();
  std::vector<MPI_Request> reqs(size);

  XBT_DEBUG("Entering MPI_Win_Start");
  while (j != size) {
    int src = comm_->group()->rank(group->actor(j));
    if (src != rank_ && src != MPI_UNDEFINED) { // TODO cheinrich: The check of MPI_UNDEFINED should be useless here
      reqs[i] = Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 4, comm_);
      i++;
    }
    j++;
  }
  size = i;
  Request::startall(size, reqs.data());
  Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
  for (i = 0; i < size; i++) {
    Request::unref(&reqs[i]);
  }
  group->ref();
  group_ = group;
  opened_++; // we're open for business!
  XBT_DEBUG("Leaving MPI_Win_Start");
  return MPI_SUCCESS;
}
int Win::post(MPI_Group group, int /*assert*/)
{
  // let's make a synchronous send here
  int i = 0, j = 0;
  int size = group->size();
  std::vector<MPI_Request> reqs(size);

  XBT_DEBUG("Entering MPI_Win_Post");
  while (j != size) {
    int dst = comm_->group()->rank(group->actor(j));
    if (dst != rank_ && dst != MPI_UNDEFINED) {
      reqs[i] = Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 4, comm_);
      i++;
    }
    j++;
  }
  size = i;

  Request::startall(size, reqs.data());
  Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
  for (i = 0; i < size; i++) {
    Request::unref(&reqs[i]);
  }
  group->ref();
  group_ = group;
  opened_++; // we're open for business!
  XBT_DEBUG("Leaving MPI_Win_Post");
  return MPI_SUCCESS;
}
int Win::complete()
{
  xbt_assert(opened_ != 0, "Complete called on a window that was not opened");
  XBT_DEBUG("Entering MPI_Win_Complete");
  int i = 0, j = 0;
  int size = group_->size();
  std::vector<MPI_Request> reqs(size);

  while (j != size) {
    int dst = comm_->group()->rank(group_->actor(j));
    if (dst != rank_ && dst != MPI_UNDEFINED) {
      reqs[i] = Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 5, comm_);
      i++;
    }
    j++;
  }
  size = i;
  XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
  Request::startall(size, reqs.data());
  Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
  for (i = 0; i < size; i++) {
    Request::unref(&reqs[i]);
  }

  int finished = finish_comms();
  XBT_DEBUG("Win_complete - Finished %d RMA calls", finished);

  Group::unref(group_);
  opened_--; // we're closed for business!
  return MPI_SUCCESS;
}
int Win::wait()
{
  // naive, blocking implementation
  XBT_DEBUG("Entering MPI_Win_Wait");
  int i = 0, j = 0;
  int size = group_->size();
  std::vector<MPI_Request> reqs(size);

  while (j != size) {
    int src = comm_->group()->rank(group_->actor(j));
    if (src != rank_ && src != MPI_UNDEFINED) {
      reqs[i] = Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 5, comm_);
      i++;
    }
    j++;
  }
  size = i;
  XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
  Request::startall(size, reqs.data());
  Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
  for (i = 0; i < size; i++) {
    Request::unref(&reqs[i]);
  }
  int finished = finish_comms();
  XBT_DEBUG("Win_wait - Finished %d RMA calls", finished);

  Group::unref(group_);
  opened_--; // we're closed for business!
  return MPI_SUCCESS;
}
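
/* Illustrative usage sketch (not part of the SimGrid sources): how the four
 * generalized active-target calls implemented above fit together; `grp` names
 * the peer group and is invented for the example.
 *
 *   // origin process                       // target process
 *   MPI_Win_start(grp, 0, win);             MPI_Win_post(grp, 0, win);
 *   MPI_Put(..., win);                      // exposure epoch in progress
 *   MPI_Win_complete(win);                  MPI_Win_wait(win);
 */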
int Win::lock(int lock_type, int rank, int /*assert*/)
{
  MPI_Win target_win = connected_wins_[rank];

  if ((lock_type == MPI_LOCK_EXCLUSIVE && target_win->mode_ != MPI_LOCK_SHARED) || target_win->mode_ == MPI_LOCK_EXCLUSIVE){
    target_win->lock_mut_->lock();
    target_win->mode_ += lock_type; // add the lock_type to differentiate the case where we switch from EXCLUSIVE to SHARED (no release needed in the unlock)
    if (lock_type == MPI_LOCK_SHARED){ // the window used to be exclusive; it's now shared
      target_win->lock_mut_->unlock();
    }
  } else if (not(target_win->mode_ == MPI_LOCK_SHARED && lock_type == MPI_LOCK_EXCLUSIVE))
    target_win->mode_ += lock_type; // don't set to exclusive if it's already shared

  target_win->lockers_.push_back(comm_->rank());

  int finished = finish_comms(rank);
  XBT_DEBUG("Win_lock %d - Finished %d RMA calls", rank, finished);
  finished = target_win->finish_comms(rank_);
  XBT_DEBUG("Win_lock target %d - Finished %d RMA calls", rank, finished);
  return MPI_SUCCESS;
}
int Win::lock_all(int assert){
  int retval = MPI_SUCCESS;
  for (int i = 0; i < comm_->size(); i++) {
    int ret = this->lock(MPI_LOCK_SHARED, i, assert);
    if (ret != MPI_SUCCESS)
      retval = ret;
  }
  return retval;
}
int Win::unlock(int rank){
  MPI_Win target_win = connected_wins_[rank];
  int target_mode = target_win->mode_;
  target_win->mode_ = 0;
  target_win->lockers_.remove(comm_->rank());
  if (target_mode == MPI_LOCK_EXCLUSIVE){
    target_win->lock_mut_->unlock();
  }

  int finished = finish_comms(rank);
  XBT_DEBUG("Win_unlock %d - Finished %d RMA calls", rank, finished);
  finished = target_win->finish_comms(rank_);
  XBT_DEBUG("Win_unlock target %d - Finished %d RMA calls", rank, finished);
  return MPI_SUCCESS;
}
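
/* Illustrative usage sketch (not part of the SimGrid sources): a passive-
 * target epoch built on the lock/unlock pair above; the target side makes no
 * MPI call. Names are invented for the example.
 *
 *   MPI_Win_lock(MPI_LOCK_EXCLUSIVE, target, 0, win);
 *   MPI_Put(&val, 1, MPI_INT, target, 0, 1, MPI_INT, win);
 *   MPI_Win_unlock(target, win); // completes the put at origin and target
 */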
int Win::unlock_all(){
  int retval = MPI_SUCCESS;
  for (int i = 0; i < comm_->size(); i++) {
    int ret = this->unlock(i);
    if (ret != MPI_SUCCESS)
      retval = ret;
  }
  return retval;
}
int Win::flush(int rank){
  MPI_Win target_win = connected_wins_[rank];
  int finished = finish_comms(rank_);
  XBT_DEBUG("Win_flush on local %d - Finished %d RMA calls", rank_, finished);
  finished = target_win->finish_comms(rank);
  XBT_DEBUG("Win_flush on remote %d - Finished %d RMA calls", rank, finished);
  return MPI_SUCCESS;
}
int Win::flush_local(int rank){
  int finished = finish_comms(rank);
  XBT_DEBUG("Win_flush_local for rank %d - Finished %d RMA calls", rank, finished);
  return MPI_SUCCESS;
}
int Win::flush_all(){
  int finished = finish_comms();
  XBT_DEBUG("Win_flush_all on local - Finished %d RMA calls", finished);
  for (int i = 0; i < comm_->size(); i++) {
    finished = connected_wins_[i]->finish_comms(rank_);
    XBT_DEBUG("Win_flush_all on %d - Finished %d RMA calls", i, finished);
  }
  return MPI_SUCCESS;
}
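
/* Illustrative usage sketch (not part of the SimGrid sources): flushing inside
 * a lock_all epoch, the usual way the flush family is driven. Names are
 * invented for the example.
 *
 *   MPI_Win_lock_all(0, win);
 *   MPI_Put(&val, 1, MPI_INT, target, 0, 1, MPI_INT, win);
 *   MPI_Win_flush(target, win); // put is complete; the epoch stays open
 *   MPI_Win_unlock_all(win);
 */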
int Win::flush_local_all(){
  int finished = finish_comms();
  XBT_DEBUG("Win_flush_local_all - Finished %d RMA calls", finished);
  return MPI_SUCCESS;
}
Win* Win::f2c(int id){
  return static_cast<Win*>(F2C::f2c(id));
}
int Win::finish_comms(){
  mut_->lock();
  // Finish own requests
  int size = static_cast<int>(requests_.size());
  if (size > 0) {
    MPI_Request* treqs = requests_.data();
    Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
    requests_.clear();
  }
  mut_->unlock();
  return size;
}
int Win::finish_comms(int rank){
  mut_->lock();
  // Finish own requests
  // Let's see if we're either the destination or the sender of this request
  // because we only wait for requests that we are responsible for.
  // Also use the process id here, since the request itself returns from src()
  // and dst() the process id, NOT the rank (which only exists in the context of a communicator).
  int proc_id = comm_->group()->actor(rank)->get_pid();
  auto it = std::stable_partition(begin(requests_), end(requests_), [proc_id](const MPI_Request& req) {
    return (req == MPI_REQUEST_NULL || (req->src() != proc_id && req->dst() != proc_id));
  });
  std::vector<MPI_Request> myreqqs(it, end(requests_));
  requests_.erase(it, end(requests_));
  int size = static_cast<int>(myreqqs.size());
  if (size > 0) {
    MPI_Request* treqs = myreqqs.data();
    Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
    myreqqs.clear();
  }
  mut_->unlock();
  return size;
}
int Win::shared_query(int rank, MPI_Aint* size, int* disp_unit, void* baseptr) const
{
  const Win* target_win = rank != MPI_PROC_NULL ? connected_wins_[rank] : nullptr;
  for (int i = 0; not target_win && i < comm_->size(); i++) {
    if (connected_wins_[i]->size_ > 0)
      target_win = connected_wins_[i];
  }
  if (target_win) {
    *size                         = target_win->size_;
    *disp_unit                    = target_win->disp_unit_;
    *static_cast<void**>(baseptr) = target_win->base_;
  } else {
    *size                         = 0;
    *static_cast<void**>(baseptr) = nullptr;
  }
  return MPI_SUCCESS;
}
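
/* Illustrative usage sketch (not part of the SimGrid sources): querying a
 * neighbor's segment of a shared window, which this method answers. Names are
 * invented for the example.
 *
 *   MPI_Aint sz;
 *   int disp;
 *   int* ptr;
 *   MPI_Win_shared_query(win, rank - 1, &sz, &disp, &ptr);
 */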
MPI_Errhandler Win::errhandler()
{
  if (errhandler_ != MPI_ERRHANDLER_NULL)
    errhandler_->ref();
  return errhandler_;
}
void Win::set_errhandler(MPI_Errhandler errhandler)
{
  if (errhandler_ != MPI_ERRHANDLER_NULL)
    simgrid::smpi::Errhandler::unref(errhandler_);
  errhandler_ = errhandler;
  if (errhandler_ != MPI_ERRHANDLER_NULL)
    errhandler_->ref();
}
} // namespace smpi
} // namespace simgrid