/* Copyright (c) 2007-2021. The SimGrid Team. All rights reserved. */

/* This program is free software; you can redistribute it and/or modify it
 * under the terms of the license (GNU LGPL) which comes with this package. */

#include "smpi_win.hpp"

#include "smpi_coll.hpp"
#include "smpi_comm.hpp"
#include "smpi_datatype.hpp"
#include "smpi_info.hpp"
#include "smpi_keyvals.hpp"
#include "smpi_request.hpp"
#include "src/smpi/include/smpi_actor.hpp"

#include <algorithm> // std::any_of (CHECK_WIN_LOCKED), std::stable_partition (finish_comms)

XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");
#define CHECK_RMA_REMOTE_WIN(fun, win)\
  if(target_count*target_datatype->get_extent()>win->size_){\
    XBT_WARN("%s: Trying to move %zd, which exceeds the window size on target process %d : %zd - Bailing out.",\
             fun, target_count*target_datatype->get_extent(), target_rank, win->size_);\
    simgrid::smpi::utils::set_current_buffer(1,"win_base",win->base_);\
    return MPI_ERR_RMA_RANGE;\
  }
#define CHECK_WIN_LOCKED(win)                                                                                          \
  if (opened_ == 0) { /* check that post/start has been done */                                                        \
    bool locked = std::any_of(begin(win->lockers_), end(win->lockers_), [this](int it) { return it == this->rank_; }); \
    if (not locked)                                                                                                    \
      return MPI_ERR_WIN;                                                                                              \
  }
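
// Illustrative expansion sketch (comment only, not compiled): at a call site such as
// Win::put below, CHECK_WIN_LOCKED(recv_win) refuses an access made outside any epoch:
//
//   if (opened_ == 0) {                       // no fence/post-start epoch is open...
//     bool locked = std::any_of(begin(recv_win->lockers_), end(recv_win->lockers_),
//                               [this](int it) { return it == this->rank_; });
//     if (not locked)                         // ...and we hold no passive-target lock
//       return MPI_ERR_WIN;
//   }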
namespace simgrid{
namespace smpi{

std::unordered_map<int, smpi_key_elem> Win::keyvals_;
int Win::keyval_id_=0;
Win::Win(void* base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm, bool allocated, bool dynamic)
    : base_(base)
    , size_(size)
    , disp_unit_(disp_unit)
    , info_(info)
    , comm_(comm)
    , connected_wins_(comm->size())
    , rank_(comm->rank())
    , allocated_(allocated)
    , dynamic_(dynamic)
{
  XBT_DEBUG("Creating window");
  if(info!=MPI_INFO_NULL)
    info->ref();
  connected_wins_[rank_] = this;
  if (rank_ == 0) // only rank 0 creates the barrier; the pointer is broadcast below
    bar_ = new s4u::Barrier(comm->size());
  comm->add_rma_win(this);
  comm->ref();

  colls::allgather(&connected_wins_[rank_], sizeof(MPI_Win), MPI_BYTE, connected_wins_.data(), sizeof(MPI_Win),
                   MPI_BYTE, comm);

  colls::bcast(&bar_, sizeof(s4u::Barrier*), MPI_BYTE, 0, comm);

  colls::barrier(comm);
  this->add_f();
}
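
// Minimal user-level sketch of what reaches this constructor (illustrative buffer and
// sizes; MPI_Win_create is the usual entry point):
//
//   int buf[100];
//   MPI_Win win;
//   MPI_Win_create(buf, 100 * sizeof(int), sizeof(int), MPI_INFO_NULL, MPI_COMM_WORLD, &win);
//   // after the allgather above, every rank holds the full connected_wins_ table and
//   // shares the single barrier created by rank 0
//   MPI_Win_free(&win);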
Win::~Win(){
  //As per the standard, perform a barrier to ensure every async comm is finished
  bar_->wait();

  flush_local_all();

  if (info_ != MPI_INFO_NULL)
    simgrid::smpi::Info::unref(info_);
  if (errhandler_ != MPI_ERRHANDLER_NULL)
    simgrid::smpi::Errhandler::unref(errhandler_);

  comm_->remove_rma_win(this);

  colls::barrier(comm_);
  Comm::unref(comm_);

  if (rank_ == 0)
    delete bar_;

  if (allocated_)
    xbt_free(base_);

  F2C::free_f(this->f2c_id());
  cleanup_attr<Win>();
}
int Win::attach(void* /*base*/, MPI_Aint size)
{
  if (not(base_ == MPI_BOTTOM || base_ == nullptr))
    return MPI_ERR_ARG;
  base_ = nullptr; // actually the address will be given in the RMA calls, as being the disp.
  size_ += size;
  return MPI_SUCCESS;
}
int Win::detach(const void* /*base*/)
{
  base_ = MPI_BOTTOM;
  size_ = -1;
  return MPI_SUCCESS;
}
void Win::get_name(char* name, int* length) const
{
  *length = static_cast<int>(name_.length());
  if (not name_.empty()) {
    name_.copy(name, *length);
    name[*length] = '\0';
  }
}
void Win::get_group(MPI_Group* group){
  if(comm_ != MPI_COMM_NULL){
    *group = comm_->group();
  } else {
    *group = MPI_GROUP_NULL;
  }
}
MPI_Info Win::info()
{
  if (info_ == MPI_INFO_NULL)
    info_ = new Info();
  info_->ref();
  return info_;
}
int Win::rank() const
{
  return rank_;
}

MPI_Comm Win::comm() const
{
  return comm_;
}

MPI_Aint Win::size() const
{
  return size_;
}

void* Win::base() const
{
  return base_;
}

int Win::disp_unit() const
{
  return disp_unit_;
}

bool Win::dynamic() const
{
  return dynamic_;
}
void Win::set_info(MPI_Info info)
{
  if (info_ != MPI_INFO_NULL)
    simgrid::smpi::Info::unref(info_);
  info_ = info;
  if (info_ != MPI_INFO_NULL)
    info_->ref();
}
void Win::set_name(const char* name){
  name_ = name;
}
int Win::fence(int assert)
{
  XBT_DEBUG("Entering fence");
  if (opened_ == 0) // fence opens an access epoch if none is active
    opened_ = 1;
  if (not (assert & MPI_MODE_NOPRECEDE)) {
    // This is not the first fence => finalize what came before
    bar_->wait();
    mut_->lock();
    // Finish the requests that were queued on this window during the closing epoch
    int size = static_cast<int>(requests_.size());
    if (size > 0) {
      Request::waitall(size, requests_.data(), MPI_STATUSES_IGNORE);
      requests_.clear();
    }
    count_ = 0;
    mut_->unlock();
  }

  if (assert & MPI_MODE_NOSUCCEED) // there should be no ops after this one, tell we are closed.
    opened_ = 0;
  assert_ = assert;

  bar_->wait();
  XBT_DEBUG("Leaving fence");

  return MPI_SUCCESS;
}
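
// Typical fence-synchronized epoch as seen from user code (sketch; the MPI_MODE_* hints
// map onto the assert tests above):
//
//   MPI_Win_fence(MPI_MODE_NOPRECEDE, win);  // first fence: nothing to finalize
//   MPI_Put(&val, 1, MPI_INT, target, 0, 1, MPI_INT, win);
//   MPI_Win_fence(MPI_MODE_NOSUCCEED, win);  // waitall on pending requests + barrier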
int Win::put(const void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
             MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
{
  //get receiver pointer
  Win* recv_win = connected_wins_[target_rank];

  CHECK_WIN_LOCKED(recv_win)
  CHECK_RMA_REMOTE_WIN("MPI_Put", recv_win)

  void* recv_addr = static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_;

  if (target_rank != rank_) { // This is not for myself, so we need to send messages
    XBT_DEBUG("Entering MPI_Put to remote rank %d", target_rank);
    // prepare send_request
    MPI_Request sreq =
        Request::rma_send_init(origin_addr, origin_count, origin_datatype, rank_, target_rank, SMPI_RMA_TAG + 1, comm_,
                               MPI_OP_NULL);

    //prepare receiver request
    MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, rank_, target_rank,
                                              SMPI_RMA_TAG + 1, recv_win->comm_, MPI_OP_NULL);

    //start send
    sreq->start();

    if(request!=nullptr){
      *request=sreq;
    }else{
      mut_->lock();
      requests_.push_back(sreq);
      mut_->unlock();
    }

    //push request to receiver's win
    recv_win->mut_->lock();
    recv_win->requests_.push_back(rreq);
    rreq->start();
    recv_win->mut_->unlock();
  } else {
    XBT_DEBUG("Entering MPI_Put from myself to myself, rank %d", target_rank);
    Datatype::copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
    if(request!=nullptr)
      *request = MPI_REQUEST_NULL;
  }

  return MPI_SUCCESS;
}
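
// Sketch of the request pairing above (illustrative, for a put from rank 0 to rank 2):
// both halves carry the same tag so the simulated send and receive match up.
//
//   sreq = rma_send_init(origin_addr, ..., src=0, dst=2, SMPI_RMA_TAG + 1, comm_)
//   rreq = rma_recv_init(recv_addr,   ..., src=0, dst=2, SMPI_RMA_TAG + 1, recv_win->comm_)
//   // sreq stays on the origin window (or is handed back via *request); rreq is pushed
//   // onto the *target* window, so the target's flush/finish_comms can complete it.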
int Win::get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
              MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
{
  //get sender pointer
  Win* send_win = connected_wins_[target_rank];

  CHECK_WIN_LOCKED(send_win)
  CHECK_RMA_REMOTE_WIN("MPI_Get", send_win)

  const void* send_addr = static_cast<void*>(static_cast<char*>(send_win->base_) + target_disp * send_win->disp_unit_);
  XBT_DEBUG("Entering MPI_Get from %d", target_rank);

  if (target_rank != rank_) {
    //prepare send_request
    MPI_Request sreq = Request::rma_send_init(send_addr, target_count, target_datatype, target_rank, rank_,
                                              SMPI_RMA_TAG + 2, send_win->comm_, MPI_OP_NULL);

    //prepare receiver request
    MPI_Request rreq = Request::rma_recv_init(origin_addr, origin_count, origin_datatype, target_rank, rank_,
                                              SMPI_RMA_TAG + 2, comm_, MPI_OP_NULL);

    //start the send, with another process than us as sender.
    sreq->start();
    // push request to sender's win
    send_win->mut_->lock();
    send_win->requests_.push_back(sreq);
    send_win->mut_->unlock();

    //start recv
    rreq->start();

    if(request!=nullptr){
      *request=rreq;
    }else{
      mut_->lock();
      requests_.push_back(rreq);
      mut_->unlock();
    }
  } else {
    Datatype::copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
    if(request!=nullptr)
      *request=MPI_REQUEST_NULL;
  }
  return MPI_SUCCESS;
}
int Win::accumulate(const void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
                    MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request* request)
{
  XBT_DEBUG("Entering MPI_Win_Accumulate");
  //get receiver pointer
  Win* recv_win = connected_wins_[target_rank];

  //FIXME: local version
  CHECK_WIN_LOCKED(recv_win)
  CHECK_RMA_REMOTE_WIN("MPI_Accumulate", recv_win)

  void* recv_addr = static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_;
  XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
  // As the tag will be used for ordering of the operations, subtract count from it (to avoid collisions with other
  // SMPI tags, SMPI_RMA_TAG is set below all the other ones we use)
  // prepare send_request
  MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype, rank_, target_rank,
                                            SMPI_RMA_TAG - 3 - count_, comm_, op);

  // prepare receiver request
  MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, rank_, target_rank,
                                            SMPI_RMA_TAG - 3 - count_, recv_win->comm_, op);

  count_++;

  // start send
  sreq->start();
  // push request to receiver's win
  recv_win->mut_->lock();
  recv_win->requests_.push_back(rreq);
  rreq->start();
  recv_win->mut_->unlock();

  if (request != nullptr) {
    *request = sreq;
  } else {
    mut_->lock();
    requests_.push_back(sreq);
    mut_->unlock();
  }

  // FIXME: The current implementation fails to ensure the correct ordering of the accumulate requests. The following
  // 'flush' is a workaround to fix that.
  flush(target_rank);

  XBT_DEBUG("Leaving MPI_Win_Accumulate");
  return MPI_SUCCESS;
}
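
// Why the tag arithmetic above gives ordering (illustrative trace, count_ starting at 0):
//
//   1st accumulate: tag = SMPI_RMA_TAG - 3 - 0
//   2nd accumulate: tag = SMPI_RMA_TAG - 3 - 1
//
// Each operation thus matches on its own tag, and staying below SMPI_RMA_TAG avoids the
// put/get/sync tags (SMPI_RMA_TAG + 1 ... SMPI_RMA_TAG + 5) used elsewhere in this file.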
int Win::get_accumulate(const void* origin_addr, int origin_count, MPI_Datatype origin_datatype, void* result_addr,
                        int result_count, MPI_Datatype result_datatype, int target_rank, MPI_Aint target_disp,
                        int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request*)
{
  //get sender pointer
  const Win* send_win = connected_wins_[target_rank];

  CHECK_WIN_LOCKED(send_win)
  CHECK_RMA_REMOTE_WIN("MPI_Get_Accumulate", send_win)

  XBT_DEBUG("Entering MPI_Get_accumulate from %d", target_rank);
  // Ops must be correctly ordered, so wait for each request before issuing the next one. Slow, but correct.
  MPI_Request req = MPI_REQUEST_NULL;
  send_win->atomic_mut_->lock();
  get(result_addr, result_count, result_datatype, target_rank,
      target_disp, target_count, target_datatype, &req);
  if (req != MPI_REQUEST_NULL)
    Request::wait(&req, MPI_STATUS_IGNORE);
  if (op != MPI_NO_OP)
    accumulate(origin_addr, origin_count, origin_datatype, target_rank,
               target_disp, target_count, target_datatype, op, &req);
  if (req != MPI_REQUEST_NULL)
    Request::wait(&req, MPI_STATUS_IGNORE);
  send_win->atomic_mut_->unlock();
  return MPI_SUCCESS;
}
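
// User-level sketch of what this implements, here a fetch-and-add (illustrative values):
//
//   int one = 1, old;
//   MPI_Get_accumulate(&one, 1, MPI_INT, &old, 1, MPI_INT,
//                      target, 0 /*disp*/, 1, MPI_INT, MPI_SUM, win);
//
// atomic_mut_ serializes the get+accumulate pair against the other atomic operations
// (get_accumulate, compare_and_swap) issued on the same target window.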
int Win::compare_and_swap(const void* origin_addr, const void* compare_addr, void* result_addr, MPI_Datatype datatype,
                          int target_rank, MPI_Aint target_disp)
{
  //get sender pointer
  const Win* send_win = connected_wins_[target_rank];

  CHECK_WIN_LOCKED(send_win)

  XBT_DEBUG("Entering MPI_Compare_and_swap with %d", target_rank);
  MPI_Request req = MPI_REQUEST_NULL;
  send_win->atomic_mut_->lock();
  get(result_addr, 1, datatype, target_rank,
      target_disp, 1, datatype, &req);
  if (req != MPI_REQUEST_NULL)
    Request::wait(&req, MPI_STATUS_IGNORE);
  if (not memcmp(result_addr, compare_addr, datatype->get_extent())) {
    put(origin_addr, 1, datatype, target_rank,
        target_disp, 1, datatype);
  }
  send_win->atomic_mut_->unlock();
  return MPI_SUCCESS;
}
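
// User-level sketch (illustrative values): replace the target word only if it still
// holds the expected value, fetching the previous value either way.
//
//   int desired = 42, expected = 0, old;
//   MPI_Compare_and_swap(&desired, &expected, &old, MPI_INT, target, 0 /*disp*/, win);
//
// The put above is only issued when the memcmp of the fetched bytes against
// compare_addr says they are equal.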
int Win::start(MPI_Group group, int /*assert*/)
{
  /* From the MPI Forum's advice to implementors:
  The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
  will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
  the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
  matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
  MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
  implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
  to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
  called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
  origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
  must complete, without further dependencies. */

  //naive, blocking implementation.
  XBT_DEBUG("Entering MPI_Win_Start");
  std::vector<MPI_Request> reqs;
  for (int i = 0; i < group->size(); i++) {
    int src = comm_->group()->rank(group->actor(i));
    xbt_assert(src != MPI_UNDEFINED);
    reqs.emplace_back(Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 4, comm_));
  }
  int size = static_cast<int>(reqs.size());

  Request::startall(size, reqs.data());
  Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
  for (auto& req : reqs)
    Request::unref(&req);

  group->ref();
  dst_group_ = group;
  opened_++; // we're open for business !
  XBT_DEBUG("Leaving MPI_Win_Start");
  return MPI_SUCCESS;
}
int Win::post(MPI_Group group, int /*assert*/)
{
  //let's make a synchronous send here
  XBT_DEBUG("Entering MPI_Win_Post");
  std::vector<MPI_Request> reqs;
  for (int i = 0; i < group->size(); i++) {
    int dst = comm_->group()->rank(group->actor(i));
    xbt_assert(dst != MPI_UNDEFINED);
    reqs.emplace_back(Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 4, comm_));
  }
  int size = static_cast<int>(reqs.size());

  Request::startall(size, reqs.data());
  Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
  for (auto& req : reqs)
    Request::unref(&req);

  group->ref();
  src_group_ = group;
  opened_++; // we're open for business !
  XBT_DEBUG("Leaving MPI_Win_Post");
  return MPI_SUCCESS;
}
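
// The general active-target pattern that start/post/complete/wait implement, seen from
// user code (sketch; origin_grp and target_grp are illustrative group handles):
//
//   // origin process:                        // target process:
//   MPI_Win_start(target_grp, 0, win);        MPI_Win_post(origin_grp, 0, win);
//   MPI_Put(..., win);                        /* window exposed to origin */
//   MPI_Win_complete(win);                    MPI_Win_wait(win);
//
// start() blocks on the SMPI_RMA_TAG + 4 messages sent by post() above; complete() and
// wait() below pair up the same way on SMPI_RMA_TAG + 5.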
int Win::complete()
{
  xbt_assert(opened_ != 0, "Complete called on a MPI_Win that was not opened with MPI_Win_start");

  XBT_DEBUG("Entering MPI_Win_Complete");
  std::vector<MPI_Request> reqs;
  for (int i = 0; i < dst_group_->size(); i++) {
    int dst = comm_->group()->rank(dst_group_->actor(i));
    xbt_assert(dst != MPI_UNDEFINED);
    reqs.emplace_back(Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 5, comm_));
  }
  int size = static_cast<int>(reqs.size());

  XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
  Request::startall(size, reqs.data());
  Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
  for (auto& req : reqs)
    Request::unref(&req);

  int finished = finish_comms();
  XBT_DEBUG("Win_complete - Finished %d RMA calls", finished);

  opened_--; //we're closed for business !
  Group::unref(dst_group_);
  dst_group_ = MPI_GROUP_NULL;
  return MPI_SUCCESS;
}
int Win::wait()
{
  xbt_assert(opened_ != 0, "Wait called on a MPI_Win that was not opened with MPI_Win_post");

  //naive, blocking implementation.
  XBT_DEBUG("Entering MPI_Win_Wait");
  std::vector<MPI_Request> reqs;
  for (int i = 0; i < src_group_->size(); i++) {
    int src = comm_->group()->rank(src_group_->actor(i));
    xbt_assert(src != MPI_UNDEFINED);
    reqs.emplace_back(Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 5, comm_));
  }
  int size = static_cast<int>(reqs.size());

  XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
  Request::startall(size, reqs.data());
  Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
  for (auto& req : reqs)
    Request::unref(&req);

  int finished = finish_comms();
  XBT_DEBUG("Win_wait - Finished %d RMA calls", finished);

  opened_--; //we're closed for business !
  Group::unref(src_group_);
  src_group_ = MPI_GROUP_NULL;
  return MPI_SUCCESS;
}
int Win::lock(int lock_type, int rank, int /*assert*/)
{
  MPI_Win target_win = connected_wins_[rank];

  if ((lock_type == MPI_LOCK_EXCLUSIVE && target_win->mode_ != MPI_LOCK_SHARED)|| target_win->mode_ == MPI_LOCK_EXCLUSIVE){
    target_win->lock_mut_->lock();
    target_win->mode_ += lock_type; // add the lock_type to differentiate the case where we switch from EXCLUSIVE to SHARED (no release needed in the unlock)
    if (lock_type == MPI_LOCK_SHARED) { // the window used to be exclusive, it's now shared.
      target_win->lock_mut_->unlock();
    }
  } else if (not(target_win->mode_ == MPI_LOCK_SHARED && lock_type == MPI_LOCK_EXCLUSIVE))
    target_win->mode_ += lock_type; // don't set to exclusive if it's already shared

  target_win->lockers_.push_back(rank_);

  int finished = finish_comms(rank);
  XBT_DEBUG("Win_lock %d - Finished %d RMA calls", rank, finished);
  finished = connected_wins_[rank]->finish_comms(rank_);
  XBT_DEBUG("Win_lock target %d - Finished %d RMA calls", rank, finished);
  return MPI_SUCCESS;
}
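
// Illustrative trace of the mode_ bookkeeping above (only the transitions matter, not
// the numeric values of MPI_LOCK_SHARED/MPI_LOCK_EXCLUSIVE):
//
//   lock(EXCLUSIVE) on a free window : lock_mut_ taken and kept; mode_ += EXCLUSIVE
//   lock(SHARED) on that window      : lock_mut_ taken then released (downgrade);
//                                      mode_ += SHARED, no release needed at unlock
//   lock(EXCLUSIVE) on a SHARED one  : mode_ left shared; the request piggybacks on
//                                      the shared epoch instead of blocking
//
// unlock() below reads the accumulated mode_ to decide whether lock_mut_ must be released.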
int Win::lock_all(int assert){
  int retval = MPI_SUCCESS;
  for (int i = 0; i < comm_->size(); i++) {
    int ret = this->lock(MPI_LOCK_SHARED, i, assert);
    if (ret != MPI_SUCCESS)
      retval = ret;
  }
  return retval;
}
int Win::unlock(int rank){
  MPI_Win target_win = connected_wins_[rank];
  int target_mode = target_win->mode_;
  target_win->mode_ = 0;
  target_win->lockers_.remove(rank_);
  if (target_mode == MPI_LOCK_EXCLUSIVE){
    target_win->lock_mut_->unlock();
  }

  int finished = finish_comms(rank);
  XBT_DEBUG("Win_unlock %d - Finished %d RMA calls", rank, finished);
  finished = connected_wins_[rank]->finish_comms(rank_);
  XBT_DEBUG("Win_unlock target %d - Finished %d RMA calls", rank, finished);
  return MPI_SUCCESS;
}
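
// Passive-target sketch from user code (illustrative): both finish_comms() calls in
// unlock() run when the epoch closes.
//
//   MPI_Win_lock(MPI_LOCK_SHARED, target, 0 /*assert*/, win);
//   MPI_Get(&val, 1, MPI_INT, target, 0, 1, MPI_INT, win);
//   MPI_Win_unlock(target, win);  // completes pending requests on both windows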
int Win::unlock_all(){
  int retval = MPI_SUCCESS;
  for (int i = 0; i < comm_->size(); i++) {
    int ret = this->unlock(i);
    if (ret != MPI_SUCCESS)
      retval = ret;
  }
  return retval;
}
int Win::flush(int rank){
  int finished = finish_comms(rank);
  XBT_DEBUG("Win_flush on local %d for remote %d - Finished %d RMA calls", rank_, rank, finished);
  finished = connected_wins_[rank]->finish_comms(rank_);
  XBT_DEBUG("Win_flush on remote %d for local %d - Finished %d RMA calls", rank, rank_, finished);
  return MPI_SUCCESS;
}
int Win::flush_local(int rank){
  int finished = finish_comms(rank);
  XBT_DEBUG("Win_flush_local on local %d for remote %d - Finished %d RMA calls", rank_, rank, finished);
  return MPI_SUCCESS;
}
int Win::flush_all(){
  int finished = finish_comms();
  XBT_DEBUG("Win_flush_all on local %d - Finished %d RMA calls", rank_, finished);
  for (int i = 0; i < comm_->size(); i++) {
    if (i != rank_)
      finished = connected_wins_[i]->finish_comms(rank_);
    XBT_DEBUG("Win_flush_all on remote %d for local %d - Finished %d RMA calls", i, rank_, finished);
  }
  return MPI_SUCCESS;
}
int Win::flush_local_all(){
  int finished = finish_comms();
  XBT_DEBUG("Win_flush_local_all on local %d - Finished %d RMA calls", rank_, finished);
  return MPI_SUCCESS;
}
Win* Win::f2c(int id){
  return static_cast<Win*>(F2C::f2c(id));
}
int Win::finish_comms(){
  // This (simulated) mutex ensures that no process pushes to the vector of requests during the waitall.
  // Without it, the vector could get reallocated when another process pushes, which would invalidate
  // the array used by Request::waitall().
  // Another solution would be to copy the data and clean up the vector *before* Request::waitall.
  mut_->lock();
  //Finish own requests
  int size = static_cast<int>(requests_.size());
  if (size > 0) {
    MPI_Request* treqs = requests_.data();
    Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
    requests_.clear();
  }
  mut_->unlock();
  return size;
}
int Win::finish_comms(int rank){
  // See comment about the mutex in finish_comms() above
  mut_->lock();
  // Finish own requests
  // Let's see if we're either the destination or the sender of this request
  // because we only wait for requests that we are responsible for.
  // Also use the process id here since the request itself returns from src()
  // and dst() the process id, NOT the rank (which only exists in the context of a communicator).
  aid_t proc_id = comm_->group()->actor(rank);
  auto it = std::stable_partition(begin(requests_), end(requests_), [proc_id](const MPI_Request& req) {
    return (req == MPI_REQUEST_NULL || (req->src() != proc_id && req->dst() != proc_id));
  });
  std::vector<MPI_Request> myreqqs(it, end(requests_));
  requests_.erase(it, end(requests_));
  int size = static_cast<int>(myreqqs.size());
  if (size > 0) {
    MPI_Request* treqs = myreqqs.data();
    Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
    myreqqs.clear();
  }
  mut_->unlock();
  return size;
}
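
// Illustrative picture of the partition above, with proc_id = P and a requests_ vector
// holding [to-Q, from-P, NULL, to-P, to-R]: std::stable_partition reorders it to
//
//   [to-Q, NULL, to-R | from-P, to-P]
//                     ^-- 'it'
//
// myreqqs copies the right-hand side (everything touching P), which is waited on and
// erased, while requests involving other peers stay queued in requests_.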
int Win::shared_query(int rank, MPI_Aint* size, int* disp_unit, void* baseptr) const
{
  const Win* target_win = rank != MPI_PROC_NULL ? connected_wins_[rank] : nullptr;
  for (int i = 0; not target_win && i < comm_->size(); i++) {
    if (connected_wins_[i]->size_ > 0)
      target_win = connected_wins_[i];
  }
  if (target_win) {
    *size                         = target_win->size_;
    *disp_unit                    = target_win->disp_unit_;
    *static_cast<void**>(baseptr) = target_win->base_;
  } else {
    *size                         = 0;
    *static_cast<void**>(baseptr) = nullptr;
  }
  return MPI_SUCCESS;
}
MPI_Errhandler Win::errhandler()
{
  if (errhandler_ != MPI_ERRHANDLER_NULL)
    errhandler_->ref();
  return errhandler_;
}
void Win::set_errhandler(MPI_Errhandler errhandler)
{
  if (errhandler_ != MPI_ERRHANDLER_NULL)
    simgrid::smpi::Errhandler::unref(errhandler_);
  errhandler_ = errhandler;
  if (errhandler_ != MPI_ERRHANDLER_NULL)
    errhandler_->ref();
}

} // namespace smpi
} // namespace simgrid