/* Copyright (c) 2007-2021. The SimGrid Team. All rights reserved. */

/* This program is free software; you can redistribute it and/or modify it
 * under the terms of the license (GNU LGPL) which comes with this package. */
#include "smpi_win.hpp"

#include "smpi_coll.hpp"
#include "smpi_comm.hpp"
#include "smpi_datatype.hpp"
#include "smpi_info.hpp"
#include "smpi_keyvals.hpp"
#include "smpi_request.hpp"
#include "src/smpi/include/smpi_actor.hpp"
XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");

std::unordered_map<int, smpi_key_elem> Win::keyvals_;
int Win::keyval_id_ = 0;
Win::Win(void* base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm, int allocated, int dynamic)
    , disp_unit_(disp_unit)
    , connected_wins_(comm->size())
    , allocated_(allocated)
  XBT_DEBUG("Creating window");
  if (info != MPI_INFO_NULL)
    info->ref();
  connected_wins_[rank_] = this;
    bar_ = new s4u::Barrier(comm->size());
  comm->add_rma_win(this);
  colls::allgather(&connected_wins_[rank_], sizeof(MPI_Win), MPI_BYTE, connected_wins_.data(), sizeof(MPI_Win),
                   MPI_BYTE, comm);
  colls::bcast(&bar_, sizeof(s4u::Barrier*), MPI_BYTE, 0, comm);
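
// Illustrative sketch (not SMPI code): the user-level MPI calls that end up in this
// constructor. All names below are standard MPI; the buffer size and types are made up.
//
//   int* buf = new int[100];
//   MPI_Win win;
//   MPI_Win_create(buf, 100 * sizeof(int), sizeof(int), MPI_INFO_NULL, MPI_COMM_WORLD, &win);
//   /* ... RMA epochs ... */
//   MPI_Win_free(&win);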
  // As per the standard, perform a barrier to ensure that every asynchronous communication has finished
  int finished = finish_comms();
  XBT_DEBUG("Win destructor - Finished %d RMA calls", finished);
  if (info_ != MPI_INFO_NULL)
    simgrid::smpi::Info::unref(info_);
  if (errhandler_ != MPI_ERRHANDLER_NULL)
    simgrid::smpi::Errhandler::unref(errhandler_);
  comm_->remove_rma_win(this);
  colls::barrier(comm_);
int Win::attach(void* /*base*/, MPI_Aint size)
  if (not(base_ == MPI_BOTTOM || base_ == nullptr))
  base_ = nullptr; // the actual address will be given in the RMA calls, as the displacement

int Win::detach(const void* /*base*/)
void Win::get_name(char* name, int* length) const
{
  *length = static_cast<int>(name_.length());
  if (not name_.empty()) {
    name_.copy(name, *length);
    name[*length] = '\0';
  }
}

void Win::get_group(MPI_Group* group){
  if (comm_ != MPI_COMM_NULL) {
    *group = comm_->group();
  } else {
    *group = MPI_GROUP_NULL;
  }
}
  if (info_ == MPI_INFO_NULL)

int Win::rank() const { return rank_; }

MPI_Aint Win::size() const { return size_; }

void* Win::base() const { return base_; }

int Win::disp_unit() const { return disp_unit_; }

int Win::dynamic() const { return dynamic_; }

void Win::set_info(MPI_Info info)
{
  if (info_ != MPI_INFO_NULL)
    simgrid::smpi::Info::unref(info_);
  info_ = info;
  if (info_ != MPI_INFO_NULL)
    info_->ref();
}

void Win::set_name(const char* name){
int Win::fence(int assert)
  XBT_DEBUG("Entering fence");
  if (assert != MPI_MODE_NOPRECEDE) {
    // This is not the first fence => finalize what came before.
    // This (simulated) mutex ensures that no process pushes to the vector of requests during the waitall.
    // Without it, the vector could be resized when another process pushes to it,
    // which would invalidate the array that Request::waitall() operates on.
    // Another solution would be to copy the data and clean up the vector *before* Request::waitall.
    // start all requests that have been prepared by another process
    if (not requests_.empty()) {
      int size = static_cast<int>(requests_.size());
      MPI_Request* treqs = requests_.data();
      Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
  if (assert == MPI_MODE_NOSUCCEED) // no operation may follow this one: flag the window as closed
  XBT_DEBUG("Leaving fence");
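
// Illustrative sketch (not SMPI code): a typical fence-delimited epoch at the MPI
// level, using the two assertions handled above. Buffers and counts are made up.
//
//   MPI_Win_fence(MPI_MODE_NOPRECEDE, win);               // first fence: nothing to finalize
//   MPI_Put(src, 1, MPI_INT, /*target=*/1, 0, 1, MPI_INT, win);
//   MPI_Win_fence(MPI_MODE_NOSUCCEED, win);               // last fence: no RMA call may follow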
int Win::put(const void* origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
             MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
  // get a pointer to the receiver's window
  Win* recv_win = connected_wins_[target_rank];
  if (opened_ == 0) { // check that a post/start has been done
    // no fence or start; is a lock active?
    for (auto const& it : recv_win->lockers_)
      if (it == comm_->rank())
  if (target_count * target_datatype->get_extent() > recv_win->size_) {
    XBT_WARN("Trying to put more than the window size - Bailing out.");
    return MPI_ERR_RMA_RANGE;
  void* recv_addr = static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_;
  if (target_rank != comm_->rank()) { // this is not for myself, so we need to send messages
    XBT_DEBUG("Entering MPI_Put to remote rank %d", target_rank);
    // prepare the send request
    // TODO cheinrich Check for rank / pid conversion
    MPI_Request sreq =
        Request::rma_send_init(origin_addr, origin_count, origin_datatype, comm_->rank(), target_rank,
                               SMPI_RMA_TAG + 1, recv_win->comm_, MPI_OP_NULL);
    // prepare the receive request
    // TODO cheinrich Check for rank / pid conversion
    MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, recv_win->comm_->rank(),
                                              target_rank, SMPI_RMA_TAG + 1, recv_win->comm_, MPI_OP_NULL);
    if (request != nullptr) {
      requests_.push_back(sreq);
    // push the request to the receiver's window
    recv_win->mut_->lock();
    recv_win->requests_.push_back(rreq);
    recv_win->mut_->unlock();
    XBT_DEBUG("Entering MPI_Put from myself to myself, rank %d", target_rank);
    Datatype::copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
    *request = MPI_REQUEST_NULL;
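
// Illustrative sketch (not SMPI code): the standard MPI entry points backed by this
// method; arguments are made up.
//
//   MPI_Put(origin, 10, MPI_DOUBLE, target_rank, 0, 10, MPI_DOUBLE, win);
//   MPI_Request req;                                      // request-based variant
//   MPI_Rput(origin, 10, MPI_DOUBLE, target_rank, 0, 10, MPI_DOUBLE, win, &req);
//   MPI_Wait(&req, MPI_STATUS_IGNORE);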
int Win::get(void* origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
             MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
  Win* send_win = connected_wins_[target_rank];
  if (opened_ == 0) { // check that a post/start has been done
    // no fence or start; is a lock active?
    for (auto const& it : send_win->lockers_)
      if (it == comm_->rank())
  if (target_count * target_datatype->get_extent() > send_win->size_) {
    XBT_WARN("Trying to get more than the window size - Bailing out.");
    return MPI_ERR_RMA_RANGE;
  const void* send_addr = static_cast<void*>(static_cast<char*>(send_win->base_) + target_disp * send_win->disp_unit_);
  XBT_DEBUG("Entering MPI_Get from %d", target_rank);
  if (target_rank != comm_->rank()) {
    // prepare the send request
    MPI_Request sreq = Request::rma_send_init(send_addr, target_count, target_datatype, target_rank,
                                              send_win->comm_->rank(), SMPI_RMA_TAG + 2, send_win->comm_, MPI_OP_NULL);
    // prepare the receive request
    MPI_Request rreq = Request::rma_recv_init(
        origin_addr, origin_count, origin_datatype, target_rank,
        comm_->rank(), // TODO cheinrich Check here if comm_->rank() and above send_win->comm_->rank() are correct
        SMPI_RMA_TAG + 2, comm_, MPI_OP_NULL);
    // start the send, with a process other than ourselves as the sender
    // push the request to the sender's window
    send_win->mut_->lock();
    send_win->requests_.push_back(sreq);
    send_win->mut_->unlock();
    if (request != nullptr) {
      requests_.push_back(rreq);
    Datatype::copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
    *request = MPI_REQUEST_NULL;
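
// Illustrative sketch (not SMPI code): the matching user-level call; arguments are
// made up.
//
//   MPI_Get(result, 10, MPI_DOUBLE, target_rank, /*disp=*/0, 10, MPI_DOUBLE, win);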
int Win::accumulate(const void* origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
                    MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op,
                    MPI_Request* request)
  XBT_DEBUG("Entering MPI_Win_Accumulate");
  // get a pointer to the receiver's window
  Win* recv_win = connected_wins_[target_rank];
  if (opened_ == 0) { // check that a post/start has been done
    // no fence or start; is a lock active?
    for (auto const& it : recv_win->lockers_)
      if (it == comm_->rank())
  // FIXME: local version
  if (target_count * target_datatype->get_extent() > recv_win->size_) {
    XBT_WARN("Trying to accumulate more than the window size - Bailing out.");
    return MPI_ERR_RMA_RANGE;
  void* recv_addr = static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_;
  XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
  // As the tag will be used for ordering of the operations, subtract count_ from it (this avoids collisions
  // with the other SMPI tags: SMPI_RMA_TAG is set below all the other tags we use).
  // prepare the send request
  MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype, comm_->rank(), target_rank,
                                            SMPI_RMA_TAG - 3 - count_, comm_, op);
  // prepare the receive request
  MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, recv_win->comm_->rank(),
                                            recv_win->comm_->group()->rank(comm_->group()->actor(target_rank)),
                                            SMPI_RMA_TAG - 3 - count_, recv_win->comm_, op);
  // push the request to the receiver's window
  recv_win->mut_->lock();
  recv_win->requests_.push_back(rreq);
  recv_win->mut_->unlock();
  if (request != nullptr) {
    requests_.push_back(sreq);
  XBT_DEBUG("Leaving MPI_Win_Accumulate");
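
// Illustrative sketch (not SMPI code): a user-level accumulate; the reduction
// operation must be a predefined one, such as MPI_SUM.
//
//   MPI_Accumulate(contrib, 10, MPI_INT, target_rank, 0, 10, MPI_INT, MPI_SUM, win);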
int Win::get_accumulate(const void* origin_addr, int origin_count, MPI_Datatype origin_datatype, void* result_addr,
                        int result_count, MPI_Datatype result_datatype, int target_rank, MPI_Aint target_disp,
                        int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request*)
  const Win* send_win = connected_wins_[target_rank];
  if (opened_ == 0) { // check that a post/start has been done
    // no fence or start; is a lock active?
    for (auto const& it : send_win->lockers_)
      if (it == comm_->rank())
  if (target_count * target_datatype->get_extent() > send_win->size_) {
    XBT_WARN("Trying to get_accumulate more than the window size - Bailing out.");
    return MPI_ERR_RMA_RANGE;
  XBT_DEBUG("Entering MPI_Get_accumulate from %d", target_rank);
  // We need to make sure that operations are correctly ordered, so wait for each request right here. This is slow.
  MPI_Request req = MPI_REQUEST_NULL;
  send_win->atomic_mut_->lock();
  get(result_addr, result_count, result_datatype, target_rank, target_disp, target_count, target_datatype, &req);
  if (req != MPI_REQUEST_NULL)
    Request::wait(&req, MPI_STATUS_IGNORE);
  accumulate(origin_addr, origin_count, origin_datatype, target_rank, target_disp, target_count, target_datatype, op,
             &req);
  if (req != MPI_REQUEST_NULL)
    Request::wait(&req, MPI_STATUS_IGNORE);
  send_win->atomic_mut_->unlock();
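
// Illustrative sketch (not SMPI code): fetch-and-accumulate at the user level.
// MPI_Fetch_and_op is the single-element special case of MPI_Get_accumulate.
//
//   MPI_Get_accumulate(contrib, 1, MPI_INT, old, 1, MPI_INT, target_rank, 0, 1, MPI_INT, MPI_SUM, win);
//   MPI_Fetch_and_op(contrib, old, MPI_INT, target_rank, 0, MPI_SUM, win);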
int Win::compare_and_swap(const void* origin_addr, const void* compare_addr, void* result_addr, MPI_Datatype datatype,
                          int target_rank, MPI_Aint target_disp)
  const Win* send_win = connected_wins_[target_rank];
  if (opened_ == 0) { // check that a post/start has been done
    // no fence or start; is a lock active?
    for (auto const& it : send_win->lockers_)
      if (it == comm_->rank())
  XBT_DEBUG("Entering MPI_Compare_and_swap with %d", target_rank);
  MPI_Request req = MPI_REQUEST_NULL;
  send_win->atomic_mut_->lock();
  get(result_addr, 1, datatype, target_rank, target_disp, 1, datatype, &req);
  if (req != MPI_REQUEST_NULL)
    Request::wait(&req, MPI_STATUS_IGNORE);
  if (not memcmp(result_addr, compare_addr, datatype->get_extent())) {
    put(origin_addr, 1, datatype, target_rank, target_disp, 1, datatype);
  send_win->atomic_mut_->unlock();
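
// Illustrative sketch (not SMPI code): the user-level call. 'newval' is written to
// the target only if the target value equals 'compare'; the old target value always
// comes back in 'old'. Variable names are made up.
//
//   MPI_Compare_and_swap(&newval, &compare, &old, MPI_INT, target_rank, 0, win);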
int Win::start(MPI_Group group, int /*assert*/)
  /* Advice from the MPI Forum:
     The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
     will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
     the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
     matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
     MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
     implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
     to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
     called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
     origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
     must complete, without further dependencies. */

  // naive, blocking implementation
  int size = group->size();
  std::vector<MPI_Request> reqs(size);
  XBT_DEBUG("Entering MPI_Win_Start");
    int src = comm_->group()->rank(group->actor(j));
    if (src != rank_ && src != MPI_UNDEFINED) { // TODO cheinrich: The check of MPI_UNDEFINED should be useless here
      reqs[i] = Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 4, comm_);
  Request::startall(size, reqs.data());
  Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
  for (i = 0; i < size; i++) {
    Request::unref(&reqs[i]);
  opened_++; // we're open for business!
  XBT_DEBUG("Leaving MPI_Win_Start");
int Win::post(MPI_Group group, int /*assert*/)
  // let's make a synchronous send here
  int size = group->size();
  std::vector<MPI_Request> reqs(size);
  XBT_DEBUG("Entering MPI_Win_Post");
    int dst = comm_->group()->rank(group->actor(j));
    if (dst != rank_ && dst != MPI_UNDEFINED) {
      reqs[i] = Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 4, comm_);
  Request::startall(size, reqs.data());
  Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
    Request::unref(&reqs[i]);
  opened_++; // we're open for business!
  XBT_DEBUG("Leaving MPI_Win_Post");
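
// Illustrative sketch (not SMPI code): the general active-target (PSCW) pattern
// implemented by start/post/complete/wait; 'grp' is the access/exposure group.
//
//   origin:                                 target:
//     MPI_Win_start(grp, 0, win);             MPI_Win_post(grp, 0, win);
//     MPI_Put(..., win);
//     MPI_Win_complete(win);                  MPI_Win_wait(win);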
int Win::complete(){
  if (opened_ == 0)
    xbt_die("Complete called on a non-opened MPI_Win");
  XBT_DEBUG("Entering MPI_Win_Complete");
  int size = group_->size();
  std::vector<MPI_Request> reqs(size);
    int dst = comm_->group()->rank(group_->actor(j));
    if (dst != rank_ && dst != MPI_UNDEFINED) {
      reqs[i] = Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 5, comm_);
  XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
  Request::startall(size, reqs.data());
  Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
    Request::unref(&reqs[i]);
  int finished = finish_comms();
  XBT_DEBUG("Win_complete - Finished %d RMA calls", finished);
  Group::unref(group_);
  opened_--; // we're closed for business!
int Win::wait(){
  // naive, blocking implementation
  XBT_DEBUG("Entering MPI_Win_Wait");
  int size = group_->size();
  std::vector<MPI_Request> reqs(size);
    int src = comm_->group()->rank(group_->actor(j));
    if (src != rank_ && src != MPI_UNDEFINED) {
      reqs[i] = Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 5, comm_);
  XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
  Request::startall(size, reqs.data());
  Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
    Request::unref(&reqs[i]);
  int finished = finish_comms();
  XBT_DEBUG("Win_wait - Finished %d RMA calls", finished);
  Group::unref(group_);
  opened_--; // we're closed for business!
int Win::lock(int lock_type, int rank, int /*assert*/)
  MPI_Win target_win = connected_wins_[rank];
  if ((lock_type == MPI_LOCK_EXCLUSIVE && target_win->mode_ != MPI_LOCK_SHARED) ||
      target_win->mode_ == MPI_LOCK_EXCLUSIVE) {
    target_win->lock_mut_->lock();
    // add the lock_type so we can tell when we are switching from EXCLUSIVE to SHARED (no release needed in the unlock)
    target_win->mode_ += lock_type;
    if (lock_type == MPI_LOCK_SHARED) { // the window used to be exclusive; it's now shared
      target_win->lock_mut_->unlock();
  } else if (not(target_win->mode_ == MPI_LOCK_SHARED && lock_type == MPI_LOCK_EXCLUSIVE))
    target_win->mode_ += lock_type; // don't set to exclusive if it's already shared

  target_win->lockers_.push_back(comm_->rank());
  int finished = finish_comms(rank);
  XBT_DEBUG("Win_lock %d - Finished %d RMA calls", rank, finished);
  finished = target_win->finish_comms(rank_);
  XBT_DEBUG("Win_lock target %d - Finished %d RMA calls", rank, finished);
int Win::lock_all(int assert){
  int retval = MPI_SUCCESS;
  for (int i = 0; i < comm_->size(); i++) {
    int ret = this->lock(MPI_LOCK_SHARED, i, assert);
    if (ret != MPI_SUCCESS)

int Win::unlock(int rank){
  MPI_Win target_win = connected_wins_[rank];
  int target_mode = target_win->mode_;
  target_win->mode_ = 0;
  target_win->lockers_.remove(comm_->rank());
  if (target_mode == MPI_LOCK_EXCLUSIVE) {
    target_win->lock_mut_->unlock();
  int finished = finish_comms(rank);
  XBT_DEBUG("Win_unlock %d - Finished %d RMA calls", rank, finished);
  finished = target_win->finish_comms(rank_);
  XBT_DEBUG("Win_unlock target %d - Finished %d RMA calls", rank, finished);
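
// Illustrative sketch (not SMPI code): a passive-target epoch at the user level.
//
//   MPI_Win_lock(MPI_LOCK_EXCLUSIVE, target_rank, 0, win);
//   MPI_Put(origin, 10, MPI_DOUBLE, target_rank, 0, 10, MPI_DOUBLE, win);
//   MPI_Win_unlock(target_rank, win);                     // completes the put at both ends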
int Win::unlock_all(){
  int retval = MPI_SUCCESS;
  for (int i = 0; i < comm_->size(); i++) {
    int ret = this->unlock(i);
    if (ret != MPI_SUCCESS)
int Win::flush(int rank){
  MPI_Win target_win = connected_wins_[rank];
  int finished = finish_comms(rank_);
  XBT_DEBUG("Win_flush on local %d - Finished %d RMA calls", rank_, finished);
  finished = target_win->finish_comms(rank);
  XBT_DEBUG("Win_flush on remote %d - Finished %d RMA calls", rank, finished);

int Win::flush_local(int rank){
  int finished = finish_comms(rank);
  XBT_DEBUG("Win_flush_local for rank %d - Finished %d RMA calls", rank, finished);
int Win::flush_all(){
  int finished = finish_comms();
  XBT_DEBUG("Win_flush_all on local - Finished %d RMA calls", finished);
  for (int i = 0; i < comm_->size(); i++) {
    finished = connected_wins_[i]->finish_comms(rank_);
    XBT_DEBUG("Win_flush_all on %d - Finished %d RMA calls", i, finished);

int Win::flush_local_all(){
  int finished = finish_comms();
  XBT_DEBUG("Win_flush_local_all - Finished %d RMA calls", finished);
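
// Illustrative sketch (not SMPI code): flushing inside a lock_all epoch; the flush
// completes all pending operations targeting 'target_rank' without closing the epoch.
//
//   MPI_Win_lock_all(0, win);
//   MPI_Put(origin, 10, MPI_DOUBLE, target_rank, 0, 10, MPI_DOUBLE, win);
//   MPI_Win_flush(target_rank, win);
//   MPI_Win_unlock_all(win);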
Win* Win::f2c(int id){
  return static_cast<Win*>(F2C::f2c(id));

int Win::finish_comms(){
  // finish our own requests
  int size = static_cast<int>(requests_.size());
  MPI_Request* treqs = requests_.data();
  Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
int Win::finish_comms(int rank){
  // Finish our own requests.
  // Only wait for the requests we are responsible for, i.e. those where we are either the
  // sender or the destination. Use the process id here, since the request's src() and dst()
  // return the process id, NOT the rank (which only exists in the context of a communicator).
  int proc_id = comm_->group()->actor(rank)->get_pid();
  auto it = std::stable_partition(begin(requests_), end(requests_), [proc_id](const MPI_Request& req) {
    return (req == MPI_REQUEST_NULL || (req->src() != proc_id && req->dst() != proc_id));
  });
  std::vector<MPI_Request> myreqs(it, end(requests_));
  requests_.erase(it, end(requests_));
  int size = static_cast<int>(myreqs.size());
  MPI_Request* treqs = myreqs.data();
  Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
int Win::shared_query(int rank, MPI_Aint* size, int* disp_unit, void* baseptr) const
  const Win* target_win = rank != MPI_PROC_NULL ? connected_wins_[rank] : nullptr;
  for (int i = 0; not target_win && i < comm_->size(); i++) {
    if (connected_wins_[i]->size_ > 0)
      target_win = connected_wins_[i];
  if (target_win != nullptr) {
    *size = target_win->size_;
    *disp_unit = target_win->disp_unit_;
    *static_cast<void**>(baseptr) = target_win->base_;
  } else {
    *static_cast<void**>(baseptr) = nullptr;
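
// Illustrative sketch (not SMPI code): querying a neighbor's segment of a window
// created with MPI_Win_allocate_shared; 'nbr' is a made-up rank.
//
//   MPI_Aint sz; int du; char* base;
//   MPI_Win_shared_query(win, nbr, &sz, &du, &base);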
MPI_Errhandler Win::errhandler()
{
  if (errhandler_ != MPI_ERRHANDLER_NULL)
    errhandler_->ref();
  return errhandler_;
}

void Win::set_errhandler(MPI_Errhandler errhandler)
{
  if (errhandler_ != MPI_ERRHANDLER_NULL)
    simgrid::smpi::Errhandler::unref(errhandler_);
  errhandler_ = errhandler;
  if (errhandler_ != MPI_ERRHANDLER_NULL)
    errhandler_->ref();
}
} // namespace simgrid