/* Copyright (c) 2007-2021. The SimGrid Team. All rights reserved. */

/* This program is free software; you can redistribute it and/or modify it
 * under the terms of the license (GNU LGPL) which comes with this package. */
#include "smpi_win.hpp"

#include "smpi_coll.hpp"
#include "smpi_comm.hpp"
#include "smpi_datatype.hpp"
#include "smpi_info.hpp"
#include "smpi_keyvals.hpp"
#include "smpi_request.hpp"
#include "src/smpi/include/smpi_actor.hpp"

#include <algorithm> // std::stable_partition, used by Win::finish_comms(int)
XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");
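/* The two macros below guard the RMA entry points. They expand to an early
 * 'return' from the calling function: CHECK_RMA_REMOTE_WIN rejects transfers
 * that would overflow the remote window, and CHECK_WIN_LOCKED verifies that an
 * access epoch is open (via fence or post/start) or that we hold a lock on the
 * target window. */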
#define CHECK_RMA_REMOTE_WIN(fun, win)\
  if(target_count*target_datatype->get_extent()>win->size_){\
    XBT_WARN("%s: Trying to move %zd, which exceeds the window size on target process %d : %zd - Bailing out.",\
             fun, target_count*target_datatype->get_extent(), target_rank, win->size_);\
    simgrid::smpi::utils::set_current_buffer(1,"win_base",win->base_);\
    return MPI_ERR_RMA_RANGE;\
  }
#define CHECK_WIN_LOCKED(win)\
  if(opened_==0){ /*check that post/start has been done*/\
    int locked=0;\
    for (auto const& it : win->lockers_)\
      if (it == comm_->rank())\
        locked = 1;\
    if(locked != 1)\
      return MPI_ERR_WIN;\
  }
namespace simgrid{
namespace smpi{
std::unordered_map<int, smpi_key_elem> Win::keyvals_;
int Win::keyval_id_=0;
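/* Window creation is collective over the communicator: each process exposes
 * its own Win* through an allgather so that every peer can later reach the
 * window's metadata and request queue directly, and a single barrier (created
 * on rank 0) is broadcast to and shared by all processes of the window. */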
Win::Win(void* base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm, int allocated, int dynamic)
    : base_(base)
    , size_(size)
    , disp_unit_(disp_unit)
    , info_(info)
    , comm_(comm)
    , connected_wins_(comm->size())
    , rank_(comm->rank())
    , allocated_(allocated)
    , dynamic_(dynamic)
{
  XBT_DEBUG("Creating window");
  if(info!=MPI_INFO_NULL)
    info->ref();
  connected_wins_[rank_] = this;
  if(rank_==0){
    bar_ = new s4u::Barrier(comm->size());
  }
  comm->add_rma_win(this);
  comm->ref();

  colls::allgather(&connected_wins_[rank_], sizeof(MPI_Win), MPI_BYTE, connected_wins_.data(), sizeof(MPI_Win),
                   MPI_BYTE, comm);

  colls::bcast(&bar_, sizeof(s4u::Barrier*), MPI_BYTE, 0, comm);

  colls::barrier(comm);
  this->add_f();
}
Win::~Win(){
  //As per the standard, perform a barrier to ensure every async comm is finished
  bar_->wait();
  int finished = finish_comms();
  XBT_DEBUG("Win destructor - Finished %d RMA calls", finished);

  if (info_ != MPI_INFO_NULL)
    simgrid::smpi::Info::unref(info_);
  if (errhandler_ != MPI_ERRHANDLER_NULL)
    simgrid::smpi::Errhandler::unref(errhandler_);

  comm_->remove_rma_win(this);
  colls::barrier(comm_);
  Comm::unref(comm_);
  if (rank_ == 0)
    delete bar_;
  if (allocated_)
    xbt_free(base_);
  F2C::free_f(this->f2c_id());
  cleanup_attr<Win>();
}
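/* attach/detach only make sense for dynamic windows (MPI_Win_create_dynamic):
 * the window starts empty and attach simply grows the usable size, since the
 * actual address is resolved through the displacement given in each RMA call. */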
int Win::attach(void* /*base*/, MPI_Aint size)
{
  if (not(base_ == MPI_BOTTOM || base_ == nullptr))
    return MPI_ERR_ARG;
  base_ = nullptr; // actually the address will be given in the RMA calls, as being the disp.
  size_ += size;
  return MPI_SUCCESS;
}

int Win::detach(const void* /*base*/)
{
  base_ = MPI_BOTTOM;
  size_ = -1;
  return MPI_SUCCESS;
}
void Win::get_name(char* name, int* length) const
{
  *length = static_cast<int>(name_.length());
  if (not name_.empty()) {
    name_.copy(name, *length);
    name[*length] = '\0';
  }
}
void Win::get_group(MPI_Group* group){
  if(comm_ != MPI_COMM_NULL){
    *group = comm_->group();
  } else {
    *group = MPI_GROUP_NULL;
  }
}
MPI_Info Win::info()
{
  if (info_ == MPI_INFO_NULL)
    info_ = new Info();
  info_->ref();
  return info_;
}
int Win::rank() const
{
  return rank_;
}

MPI_Comm Win::comm() const
{
  return comm_;
}

MPI_Aint Win::size() const
{
  return size_;
}

void* Win::base() const
{
  return base_;
}

int Win::disp_unit() const
{
  return disp_unit_;
}

int Win::dynamic() const
{
  return dynamic_;
}
void Win::set_info(MPI_Info info)
{
  if (info_ != MPI_INFO_NULL)
    simgrid::smpi::Info::unref(info_);
  info_ = info;
  if (info_ != MPI_INFO_NULL)
    info_->ref();
}
void Win::set_name(const char* name){
  name_ = std::string(name);
}
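/* Fence acts as a collective synchronization point for active-target RMA:
 * each call (except the first, MPI_MODE_NOPRECEDE one) closes the previous
 * access epoch by waiting for every request queued on this window, and the
 * shared barrier orders the epochs across processes. */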
int Win::fence(int assert)
{
  XBT_DEBUG("Entering fence");
  if (opened_ == 0)
    opened_ = 1;
  if (assert != MPI_MODE_NOPRECEDE) {
    // This is not the first fence => finalize what came before
    bar_->wait();
    mut_->lock();
    // This (simulated) mutex ensures that no process pushes to the vector of requests during the waitall.
    // Without this, the vector could get redimensioned when another process pushes.
    // This would result in the array used by Request::waitall() to be invalidated.
    // Another solution would be to copy the data and cleanup the vector *before* Request::waitall

    // start all requests that have been prepared by another process
    if (not requests_.empty()) {
      int size           = static_cast<int>(requests_.size());
      MPI_Request* treqs = requests_.data();
      Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
      requests_.clear();
    }
    count_ = 0;
    mut_->unlock();
  }

  if(assert==MPI_MODE_NOSUCCEED)//there should be no ops after this one, tell we are closed.
    opened_ = 0;
  assert_ = assert;

  bar_->wait();
  XBT_DEBUG("Leaving fence");
  return MPI_SUCCESS;
}
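/* One-sided communications are simulated as matched point-to-point messages:
 * the origin starts a send carrying the data, while a mirror receive request
 * is pushed, under the target's mutex, into the target window's request queue
 * to be completed at the next synchronization. */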
int Win::put(const void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
              MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
{
  //get receiver pointer
  Win* recv_win = connected_wins_[target_rank];

  CHECK_WIN_LOCKED(recv_win)
  CHECK_RMA_REMOTE_WIN("MPI_Put", recv_win)

  void* recv_addr = static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_;

  if (target_rank != comm_->rank()) { // This is not for myself, so we need to send messages
    XBT_DEBUG("Entering MPI_Put to remote rank %d", target_rank);
    // prepare send_request
    // TODO cheinrich Check for rank / pid conversion
    MPI_Request sreq =
        Request::rma_send_init(origin_addr, origin_count, origin_datatype, comm_->rank(), target_rank, SMPI_RMA_TAG + 1,
                               comm_, MPI_OP_NULL);

    //prepare receiver request
    // TODO cheinrich Check for rank / pid conversion
    MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, recv_win->comm_->rank(),
                                              target_rank, SMPI_RMA_TAG + 1, recv_win->comm_, MPI_OP_NULL);

    //start send
    sreq->start();

    if(request!=nullptr){
      *request=sreq;
    }else{
      mut_->lock();
      requests_.push_back(sreq);
      mut_->unlock();
    }

    //push request to receiver's win
    recv_win->mut_->lock();
    recv_win->requests_.push_back(rreq);
    rreq->start();
    recv_win->mut_->unlock();
  } else {
    XBT_DEBUG("Entering MPI_Put from myself to myself, rank %d", target_rank);
    Datatype::copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
    if(request!=nullptr)
      *request = MPI_REQUEST_NULL;
  }

  return MPI_SUCCESS;
}
int Win::get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
              MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
{
  //get sender pointer
  Win* send_win = connected_wins_[target_rank];

  CHECK_WIN_LOCKED(send_win)
  CHECK_RMA_REMOTE_WIN("MPI_Get", send_win)

  const void* send_addr = static_cast<void*>(static_cast<char*>(send_win->base_) + target_disp * send_win->disp_unit_);
  XBT_DEBUG("Entering MPI_Get from %d", target_rank);

  if(target_rank != comm_->rank()){
    //prepare send_request
    MPI_Request sreq = Request::rma_send_init(send_addr, target_count, target_datatype, target_rank,
                                              send_win->comm_->rank(), SMPI_RMA_TAG + 2, send_win->comm_, MPI_OP_NULL);

    //prepare receiver request
    MPI_Request rreq = Request::rma_recv_init(
        origin_addr, origin_count, origin_datatype, target_rank,
        comm_->rank(), // TODO cheinrich Check here if comm_->rank() and above send_win->comm_->rank() are correct
        SMPI_RMA_TAG + 2, comm_, MPI_OP_NULL);

    //start the send, with another process than us as sender.
    sreq->start();
    //push request to the window we read from
    send_win->mut_->lock();
    send_win->requests_.push_back(sreq);
    send_win->mut_->unlock();

    //start recv
    rreq->start();

    if(request!=nullptr){
      *request=rreq;
    }else{
      mut_->lock();
      requests_.push_back(rreq);
      mut_->unlock();
    }
  } else {
    Datatype::copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
    if(request!=nullptr)
      *request=MPI_REQUEST_NULL;
  }
  return MPI_SUCCESS;
}
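/* Accumulate requests must be applied on the target in the order they were
 * issued, so each operation gets its own decreasing tag (SMPI_RMA_TAG - 3 -
 * count_), keeping the per-operation messages distinct and below every other
 * tag SMPI uses. */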
int Win::accumulate(const void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
                    MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request* request)
{
  XBT_DEBUG("Entering MPI_Win_Accumulate");
  //get receiver pointer
  Win* recv_win = connected_wins_[target_rank];

  //FIXME: local version
  CHECK_WIN_LOCKED(recv_win)
  CHECK_RMA_REMOTE_WIN("MPI_Accumulate", recv_win)

  void* recv_addr = static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_;
  XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
  // As the tag will be used for ordering of the operations, subtract count from it (to avoid collisions with other
  // SMPI tags, SMPI_RMA_TAG is set below all the other ones we use)
  // prepare send_request
  MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype, comm_->rank(), target_rank,
                                            SMPI_RMA_TAG - 3 - count_, comm_, op);

  // prepare receiver request
  MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, recv_win->comm_->rank(),
                                            recv_win->comm_->group()->rank(comm_->group()->actor(target_rank)),
                                            SMPI_RMA_TAG - 3 - count_, recv_win->comm_, op);

  count_++;

  // start send
  sreq->start();
  // push request to receiver's win
  recv_win->mut_->lock();
  recv_win->requests_.push_back(rreq);
  rreq->start();
  recv_win->mut_->unlock();

  if (request != nullptr) {
    *request = sreq;
  } else {
    mut_->lock();
    requests_.push_back(sreq);
    mut_->unlock();
  }

  XBT_DEBUG("Leaving MPI_Win_Accumulate");
  return MPI_SUCCESS;
}
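/* Get_accumulate must be atomic with respect to the other accumulate
 * operations on the same window; this implementation simply performs a
 * blocking get followed by an accumulate while holding the target's
 * atomic_mut_, trading speed for correct ordering. */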
int Win::get_accumulate(const void* origin_addr, int origin_count, MPI_Datatype origin_datatype, void* result_addr,
                        int result_count, MPI_Datatype result_datatype, int target_rank, MPI_Aint target_disp,
                        int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request*)
{
  //get sender pointer
  const Win* send_win = connected_wins_[target_rank];

  CHECK_WIN_LOCKED(send_win)
  CHECK_RMA_REMOTE_WIN("MPI_Get_Accumulate", send_win)

  XBT_DEBUG("Entering MPI_Get_accumulate from %d", target_rank);
  //need to be sure ops are correctly ordered, so finish request here ? slow.
  MPI_Request req = MPI_REQUEST_NULL;
  send_win->atomic_mut_->lock();
  get(result_addr, result_count, result_datatype, target_rank,
      target_disp, target_count, target_datatype, &req);
  if (req != MPI_REQUEST_NULL)
    Request::wait(&req, MPI_STATUS_IGNORE);
  if (op != MPI_NO_OP)
    accumulate(origin_addr, origin_count, origin_datatype, target_rank,
               target_disp, target_count, target_datatype, op, &req);
  if (req != MPI_REQUEST_NULL)
    Request::wait(&req, MPI_STATUS_IGNORE);
  send_win->atomic_mut_->unlock();
  return MPI_SUCCESS;
}
int Win::compare_and_swap(const void* origin_addr, const void* compare_addr, void* result_addr, MPI_Datatype datatype,
                          int target_rank, MPI_Aint target_disp)
{
  //get sender pointer
  const Win* send_win = connected_wins_[target_rank];

  CHECK_WIN_LOCKED(send_win)

  XBT_DEBUG("Entering MPI_Compare_and_swap with %d", target_rank);
  MPI_Request req = MPI_REQUEST_NULL;
  send_win->atomic_mut_->lock();
  get(result_addr, 1, datatype, target_rank,
      target_disp, 1, datatype, &req);
  if (req != MPI_REQUEST_NULL)
    Request::wait(&req, MPI_STATUS_IGNORE);
  if (not memcmp(result_addr, compare_addr, datatype->get_extent())) {
    put(origin_addr, 1, datatype, target_rank,
        target_disp, 1, datatype);
  }
  send_win->atomic_mut_->unlock();
  return MPI_SUCCESS;
}
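/* Post/Start/Complete/Wait (generalized active-target synchronization) is
 * emulated with zero-byte token messages: post and complete send tokens with
 * tags SMPI_RMA_TAG+4 and SMPI_RMA_TAG+5 respectively, which start and wait
 * match with the corresponding receives. */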
int Win::start(MPI_Group group, int /*assert*/)
{
  /* From the MPI forum's advice to implementors:
  The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
  will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
  the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
  matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
  MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
  implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
  to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
  called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
  origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
  must complete, without further dependencies. */

  //naive, blocking implementation.
  int i             = 0;
  int j             = 0;
  int size          = group->size();
  std::vector<MPI_Request> reqs(size);

  XBT_DEBUG("Entering MPI_Win_Start");
  while (j != size) {
    int src = comm_->group()->rank(group->actor(j));
    if (src != rank_ && src != MPI_UNDEFINED) { // TODO cheinrich: The check of MPI_UNDEFINED should be useless here
      reqs[i] = Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 4, comm_);
      i++;
    }
    j++;
  }
  size = i;
  Request::startall(size, reqs.data());
  Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
  for (i = 0; i < size; i++) {
    Request::unref(&reqs[i]);
  }
  opened_++; //we're open for business !
  group_ = group;
  group->ref();
  XBT_DEBUG("Leaving MPI_Win_Start");
  return MPI_SUCCESS;
}
int Win::post(MPI_Group group, int /*assert*/)
{
  //let's make a synchronous send here
  int i             = 0;
  int j             = 0;
  int size          = group->size();
  std::vector<MPI_Request> reqs(size);

  XBT_DEBUG("Entering MPI_Win_Post");
  while(j != size){
    int dst = comm_->group()->rank(group->actor(j));
    if (dst != rank_ && dst != MPI_UNDEFINED) {
      reqs[i] = Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 4, comm_);
      i++;
    }
    j++;
  }
  size = i;

  Request::startall(size, reqs.data());
  Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
  for(i=0;i<size;i++){
    Request::unref(&reqs[i]);
  }
  opened_++; //we're open for business !
  group_ = group;
  group->ref();
  XBT_DEBUG("Leaving MPI_Win_Post");
  return MPI_SUCCESS;
}
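/* Complete closes the access epoch opened by start: it notifies every target
 * of the group with a token message, then waits for all RMA calls issued
 * during the epoch before releasing the group. */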
int Win::complete(){
  xbt_assert(opened_ != 0, "Complete called on a non-opened MPI_Win");

  XBT_DEBUG("Entering MPI_Win_Complete");
  int i             = 0;
  int j             = 0;
  int size          = group_->size();
  std::vector<MPI_Request> reqs(size);

  while(j != size){
    int dst = comm_->group()->rank(group_->actor(j));
    if (dst != rank_ && dst != MPI_UNDEFINED) {
      reqs[i] = Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 5, comm_);
      i++;
    }
    j++;
  }
  size = i;
  XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
  Request::startall(size, reqs.data());
  Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);

  for(i=0;i<size;i++){
    Request::unref(&reqs[i]);
  }

  int finished = finish_comms();
  XBT_DEBUG("Win_complete - Finished %d RMA calls", finished);

  Group::unref(group_);
  opened_--; //we're closed for business !
  return MPI_SUCCESS;
}
int Win::wait(){
  //naive, blocking implementation.
  XBT_DEBUG("Entering MPI_Win_Wait");
  int i             = 0;
  int j             = 0;
  int size          = group_->size();
  std::vector<MPI_Request> reqs(size);

  while(j != size){
    int src = comm_->group()->rank(group_->actor(j));
    if (src != rank_ && src != MPI_UNDEFINED) {
      reqs[i] = Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 5, comm_);
      i++;
    }
    j++;
  }
  size = i;
  XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
  Request::startall(size, reqs.data());
  Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
  for(i=0;i<size;i++){
    Request::unref(&reqs[i]);
  }
  int finished = finish_comms();
  XBT_DEBUG("Win_wait - Finished %d RMA calls", finished);

  Group::unref(group_);
  opened_--; //we're closed for business !
  return MPI_SUCCESS;
}
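/* Passive-target locks are emulated with a per-window mutex: an exclusive
 * lock holds lock_mut_ for the whole epoch, while shared locks only record
 * themselves in lockers_; mode_ accumulates the lock types so that unlock can
 * tell which case it is releasing. */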
int Win::lock(int lock_type, int rank, int /*assert*/)
{
  MPI_Win target_win = connected_wins_[rank];

  if ((lock_type == MPI_LOCK_EXCLUSIVE && target_win->mode_ != MPI_LOCK_SHARED)|| target_win->mode_ == MPI_LOCK_EXCLUSIVE){
    target_win->lock_mut_->lock();
    target_win->mode_+= lock_type;//add the lock_type to differentiate case when we are switching from EXCLUSIVE to SHARED (no release needed in the unlock)
    if(lock_type == MPI_LOCK_SHARED){//the window used to be exclusive, it's now shared.
      target_win->lock_mut_->unlock();
    }
  } else if (not(target_win->mode_ == MPI_LOCK_SHARED && lock_type == MPI_LOCK_EXCLUSIVE))
    target_win->mode_ += lock_type; // don't set to exclusive if it's already shared

  target_win->lockers_.push_back(comm_->rank());

  int finished = finish_comms(rank);
  XBT_DEBUG("Win_lock %d - Finished %d RMA calls", rank, finished);
  finished = target_win->finish_comms(rank_);
  XBT_DEBUG("Win_lock target %d - Finished %d RMA calls", rank, finished);
  return MPI_SUCCESS;
}
int Win::lock_all(int assert){
  int retval = MPI_SUCCESS;
  for (int i = 0; i < comm_->size(); i++) {
    int ret = this->lock(MPI_LOCK_SHARED, i, assert);
    if (ret != MPI_SUCCESS)
      retval = ret;
  }
  return retval;
}
int Win::unlock(int rank){
  MPI_Win target_win = connected_wins_[rank];
  int target_mode = target_win->mode_;
  target_win->mode_ = 0;
  target_win->lockers_.remove(comm_->rank());
  if (target_mode==MPI_LOCK_EXCLUSIVE){
    target_win->lock_mut_->unlock();
  }

  int finished = finish_comms(rank);
  XBT_DEBUG("Win_unlock %d - Finished %d RMA calls", rank, finished);
  finished = target_win->finish_comms(rank_);
  XBT_DEBUG("Win_unlock target %d - Finished %d RMA calls", rank, finished);
  return MPI_SUCCESS;
}
int Win::unlock_all(){
  int retval = MPI_SUCCESS;
  for (int i = 0; i < comm_->size(); i++) {
    int ret = this->unlock(i);
    if (ret != MPI_SUCCESS)
      retval = ret;
  }
  return retval;
}
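/* The flush family does not need to move data: finishing the pending requests
 * on the local and target windows is enough, since the transfers themselves
 * were already started when the RMA calls were issued. */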
int Win::flush(int rank){
  MPI_Win target_win = connected_wins_[rank];
  int finished = finish_comms(rank_);
  XBT_DEBUG("Win_flush on local %d - Finished %d RMA calls", rank_, finished);
  finished = target_win->finish_comms(rank);
  XBT_DEBUG("Win_flush on remote %d - Finished %d RMA calls", rank, finished);
  return MPI_SUCCESS;
}
int Win::flush_local(int rank){
  int finished = finish_comms(rank);
  XBT_DEBUG("Win_flush_local for rank %d - Finished %d RMA calls", rank, finished);
  return MPI_SUCCESS;
}
int Win::flush_all(){
  int finished = finish_comms();
  XBT_DEBUG("Win_flush_all on local - Finished %d RMA calls", finished);
  for (int i = 0; i < comm_->size(); i++) {
    finished = connected_wins_[i]->finish_comms(rank_);
    XBT_DEBUG("Win_flush_all on %d - Finished %d RMA calls", i, finished);
  }
  return MPI_SUCCESS;
}
int Win::flush_local_all(){
  int finished = finish_comms();
  XBT_DEBUG("Win_flush_local_all - Finished %d RMA calls", finished);
  return MPI_SUCCESS;
}
Win* Win::f2c(int id){
  return static_cast<Win*>(F2C::f2c(id));
}
int Win::finish_comms(){
  mut_->lock();
  //Finish own requests
  int size = static_cast<int>(requests_.size());
  if (size > 0) {
    MPI_Request* treqs = requests_.data();
    Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
    requests_.clear();
  }
  mut_->unlock();
  return size;
}
int Win::finish_comms(int rank){
  mut_->lock();
  // Finish own requests
  // Let's see if we're either the destination or the sender of this request
  // because we only wait for requests that we are responsible for.
  // Also use the process id here since the request itself returns from src()
  // and dst() the process id, NOT the rank (which only exists in the context of a communicator).
  int proc_id = comm_->group()->actor(rank)->get_pid();
  auto it = std::stable_partition(begin(requests_), end(requests_), [proc_id](const MPI_Request& req) {
    return (req == MPI_REQUEST_NULL || (req->src() != proc_id && req->dst() != proc_id));
  });
  std::vector<MPI_Request> myreqqs(it, end(requests_));
  requests_.erase(it, end(requests_));
  int size = static_cast<int>(myreqqs.size());
  if (size > 0) {
    MPI_Request* treqs = myreqqs.data();
    Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
  }
  mut_->unlock();
  return size;
}
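/* shared_query: when called with MPI_PROC_NULL, fall back to the first window
 * with a non-zero size, which matches the behavior the standard prescribes
 * for windows allocated with MPI_Win_allocate_shared. */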
int Win::shared_query(int rank, MPI_Aint* size, int* disp_unit, void* baseptr) const
{
  const Win* target_win = rank != MPI_PROC_NULL ? connected_wins_[rank] : nullptr;
  for (int i = 0; not target_win && i < comm_->size(); i++) {
    if (connected_wins_[i]->size_ > 0)
      target_win = connected_wins_[i];
  }
  if (target_win) {
    *size                         = target_win->size_;
    *disp_unit                    = target_win->disp_unit_;
    *static_cast<void**>(baseptr) = target_win->base_;
  } else {
    *size                         = 0;
    *static_cast<void**>(baseptr) = nullptr;
  }
  return MPI_SUCCESS;
}
MPI_Errhandler Win::errhandler()
{
  if (errhandler_ != MPI_ERRHANDLER_NULL)
    errhandler_->ref();
  return errhandler_;
}
void Win::set_errhandler(MPI_Errhandler errhandler)
{
  if (errhandler_ != MPI_ERRHANDLER_NULL)
    simgrid::smpi::Errhandler::unref(errhandler_);
  errhandler_ = errhandler;
  if (errhandler_ != MPI_ERRHANDLER_NULL)
    errhandler_->ref();
}

} // namespace smpi
} // namespace simgrid