/* Copyright (c) 2007-2021. The SimGrid Team. All rights reserved. */

/* This program is free software; you can redistribute it and/or modify it
 * under the terms of the license (GNU LGPL) which comes with this package. */
#include "smpi_win.hpp"

#include "smpi_coll.hpp"
#include "smpi_comm.hpp"
#include "smpi_datatype.hpp"
#include "smpi_info.hpp"
#include "smpi_keyvals.hpp"
#include "smpi_request.hpp"
#include "src/smpi/include/smpi_actor.hpp"

XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");
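
/* Sanity-check helpers shared by the RMA entry points below: verify that a remote access fits inside the
 * target window, and that the window is inside an access epoch (post/start done, or locked by this rank). */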
#define CHECK_RMA_REMOTE_WIN(fun, win)\
  if(target_count*target_datatype->get_extent()>win->size_){\
    XBT_WARN("%s: Trying to move %zd, which exceeds the window size on target process %d : %zd - Bailing out.",\
             fun, target_count*target_datatype->get_extent(), target_rank, win->size_);\
    simgrid::smpi::utils::set_current_buffer(1,"win_base",win->base_);\
    return MPI_ERR_RMA_RANGE;\
  }

#define CHECK_WIN_LOCKED(win)\
  if (opened_ == 0) { /* check that post/start has been done */\
    bool locked = std::any_of(begin(win->lockers_), end(win->lockers_), [this](int it) { return it == this->rank_; });\
    if (not locked)\
      return MPI_ERR_WIN;\
  }

std::unordered_map<int, smpi_key_elem> Win::keyvals_;
int Win::keyval_id_ = 0;
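
/* A window knows the Win* of every peer in its communicator: each process publishes its own handle in
 * connected_wins_ through an allgather, and rank 0's barrier is broadcast so that all processes synchronize
 * on the same object. */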
Win::Win(void* base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm, bool allocated, bool dynamic)
    , disp_unit_(disp_unit)
    , connected_wins_(comm->size())
    , allocated_(allocated)
  XBT_DEBUG("Creating window");
  if(info!=MPI_INFO_NULL)
    info->ref();
  connected_wins_[rank_] = this;
  bar_ = new s4u::Barrier(comm->size());
  comm->add_rma_win(this);

  colls::allgather(&connected_wins_[rank_], sizeof(MPI_Win), MPI_BYTE, connected_wins_.data(), sizeof(MPI_Win),
                   MPI_BYTE, comm);
  colls::bcast(&bar_, sizeof(s4u::Barrier*), MPI_BYTE, 0, comm);
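
/* Win::~Win(): wait for the pending RMA calls, drop our references, and synchronize with the other processes
 * before the window disappears. */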
  //As per the standard, perform a barrier to ensure every async comm is finished
  int finished = finish_comms();
  XBT_DEBUG("Win destructor - Finished %d RMA calls", finished);

  if (info_ != MPI_INFO_NULL)
    simgrid::smpi::Info::unref(info_);
  if (errhandler_ != MPI_ERRHANDLER_NULL)
    simgrid::smpi::Errhandler::unref(errhandler_);

  comm_->remove_rma_win(this);

  colls::barrier(comm_);
  F2C::free_f(this->f2c_id());
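
/* Attach/detach memory to a dynamic window. No base address is kept: RMA calls on a dynamic window pass the
 * absolute address as the displacement. */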
int Win::attach(void* /*base*/, MPI_Aint size)
  if (not(base_ == MPI_BOTTOM || base_ == nullptr))
    return MPI_ERR_ARG;
  base_ = nullptr; // the actual address is passed in the RMA calls, as the displacement
  size_ += size;

int Win::detach(const void* /*base*/)

void Win::get_name(char* name, int* length) const
{
  *length = static_cast<int>(name_.length());
  if (not name_.empty()) {
    name_.copy(name, *length);
    name[*length] = '\0';
  }
}

void Win::get_group(MPI_Group* group){
  if(comm_ != MPI_COMM_NULL){
    *group = comm_->group();
  } else {
    *group = MPI_GROUP_NULL;
  }
}

  if (info_ == MPI_INFO_NULL)

int Win::rank() const

MPI_Comm Win::comm() const

MPI_Aint Win::size() const

void* Win::base() const

int Win::disp_unit() const

bool Win::dynamic() const

void Win::set_info(MPI_Info info)
{
  if (info_ != MPI_INFO_NULL)
    simgrid::smpi::Info::unref(info_);
  info_ = info;
  if (info_ != MPI_INFO_NULL)
    info_->ref();
}

void Win::set_name(const char* name){
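
/* Fence synchronization: wait for every RMA request posted on this window since the previous fence, unless
 * MPI_MODE_NOPRECEDE tells us there is nothing to finish. MPI_MODE_NOSUCCEED closes the access epoch. */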
int Win::fence(int assert)
  XBT_DEBUG("Entering fence");

  if (not (assert & MPI_MODE_NOPRECEDE)) {
    // This is not the first fence => finalize what came before

    // This (simulated) mutex ensures that no process pushes to the vector of requests during the waitall.
    // Without it, the vector could get reallocated while another process pushes, which would invalidate the
    // array handed to Request::waitall(). An alternative would be to copy the data and clean up the vector
    // *before* calling Request::waitall.

    // start all requests that have been prepared by another process
    if (not requests_.empty()) {
      int size = static_cast<int>(requests_.size());
      MPI_Request* treqs = requests_.data();
      Request::waitall(size, treqs, MPI_STATUSES_IGNORE);

  if (assert & MPI_MODE_NOSUCCEED) // no operation should follow this fence, so mark the window as closed

  XBT_DEBUG("Leaving fence");
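
/* Put: copy local data into the target's window. A remote target gets a send/receive request pair (the
 * receive is pushed into the target's request queue); a put to ourselves degenerates into a local datatype
 * copy. */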
int Win::put(const void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
             MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
  //get receiver pointer
  Win* recv_win = connected_wins_[target_rank];

  CHECK_WIN_LOCKED(recv_win)
  CHECK_RMA_REMOTE_WIN("MPI_Put", recv_win)

  void* recv_addr = static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_;

  if (target_rank != comm_->rank()) { // This is not for myself, so we need to send messages
    XBT_DEBUG("Entering MPI_Put to remote rank %d", target_rank);
    // prepare send_request
    // TODO cheinrich Check for rank / pid conversion
    MPI_Request sreq =
        Request::rma_send_init(origin_addr, origin_count, origin_datatype, comm_->rank(), target_rank, SMPI_RMA_TAG + 1,
                               comm_, MPI_OP_NULL);

    //prepare receiver request
    // TODO cheinrich Check for rank / pid conversion
    MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, recv_win->comm_->rank(),
                                              target_rank, SMPI_RMA_TAG + 1, recv_win->comm_, MPI_OP_NULL);

    if(request!=nullptr){

      requests_.push_back(sreq);

    //push request to receiver's win
    recv_win->mut_->lock();
    recv_win->requests_.push_back(rreq);
    recv_win->mut_->unlock();

    XBT_DEBUG("Entering MPI_Put from myself to myself, rank %d", target_rank);
    Datatype::copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
    *request = MPI_REQUEST_NULL;
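
/* Get: read from the target's window into a local buffer. Symmetric to put: the send request is pushed into
 * the target's queue while the receive stays local; a get from ourselves is a plain datatype copy. */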
int Win::get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
              MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
  Win* send_win = connected_wins_[target_rank];

  CHECK_WIN_LOCKED(send_win)
  CHECK_RMA_REMOTE_WIN("MPI_Get", send_win)

  const void* send_addr = static_cast<void*>(static_cast<char*>(send_win->base_) + target_disp * send_win->disp_unit_);
  XBT_DEBUG("Entering MPI_Get from %d", target_rank);

  if(target_rank != comm_->rank()){
    //prepare send_request
    MPI_Request sreq = Request::rma_send_init(send_addr, target_count, target_datatype, target_rank,
                                              send_win->comm_->rank(), SMPI_RMA_TAG + 2, send_win->comm_, MPI_OP_NULL);

    //prepare receiver request
    MPI_Request rreq = Request::rma_recv_init(
        origin_addr, origin_count, origin_datatype, target_rank,
        comm_->rank(), // TODO cheinrich Check here if comm_->rank() and above send_win->comm_->rank() are correct
        SMPI_RMA_TAG + 2, comm_, MPI_OP_NULL);

    //start the send, with a process other than us as the sender

    //push request to the target's win
    send_win->mut_->lock();
    send_win->requests_.push_back(sreq);
    send_win->mut_->unlock();

    if(request!=nullptr){

      requests_.push_back(rreq);

    Datatype::copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
    *request=MPI_REQUEST_NULL;
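
/* Accumulate: like put, but the receive request carries the reduction operation, which is applied on the
 * target side. Successive accumulates get distinct, decreasing tags so that they stay ordered. */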
int Win::accumulate(const void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
                    MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request* request)
  XBT_DEBUG("Entering MPI_Win_Accumulate");
  //get receiver pointer
  Win* recv_win = connected_wins_[target_rank];

  //FIXME: local version
  CHECK_WIN_LOCKED(recv_win)
  CHECK_RMA_REMOTE_WIN("MPI_Accumulate", recv_win)

  void* recv_addr = static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_;
  XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
  // The tag is used to order the operations, so subtract count_ from it to give each accumulate its own tag.
  // SMPI_RMA_TAG is set below all the other tags SMPI uses, so this cannot collide with them.
  // prepare send_request
  MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype, comm_->rank(), target_rank,
                                            SMPI_RMA_TAG - 3 - count_, comm_, op);

  // prepare receiver request
  MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, recv_win->comm_->rank(),
                                            recv_win->comm_->group()->rank(comm_->group()->actor(target_rank)),
                                            SMPI_RMA_TAG - 3 - count_, recv_win->comm_, op);

  // push request to receiver's win
  recv_win->mut_->lock();
  recv_win->requests_.push_back(rreq);
  recv_win->mut_->unlock();

  if (request != nullptr) {

    requests_.push_back(sreq);

  XBT_DEBUG("Leaving MPI_Win_Accumulate");
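
/* Get_accumulate: fetch the current content of the target window, then accumulate into it, all under the
 * target's atomic mutex so that concurrent accumulates do not interleave. */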
int Win::get_accumulate(const void* origin_addr, int origin_count, MPI_Datatype origin_datatype, void* result_addr,
                        int result_count, MPI_Datatype result_datatype, int target_rank, MPI_Aint target_disp,
                        int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request*)
  const Win* send_win = connected_wins_[target_rank];

  CHECK_WIN_LOCKED(send_win)
  CHECK_RMA_REMOTE_WIN("MPI_Get_Accumulate", send_win)

  XBT_DEBUG("Entering MPI_Get_accumulate from %d", target_rank);
  // Operations must be correctly ordered, so wait for each request here before issuing the next one. Slow.
  MPI_Request req = MPI_REQUEST_NULL;
  send_win->atomic_mut_->lock();
  get(result_addr, result_count, result_datatype, target_rank,
      target_disp, target_count, target_datatype, &req);
  if (req != MPI_REQUEST_NULL)
    Request::wait(&req, MPI_STATUS_IGNORE);

  accumulate(origin_addr, origin_count, origin_datatype, target_rank,
             target_disp, target_count, target_datatype, op, &req);
  if (req != MPI_REQUEST_NULL)
    Request::wait(&req, MPI_STATUS_IGNORE);
  send_win->atomic_mut_->unlock();
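
/* Compare-and-swap: fetch one element from the target and overwrite it with origin_addr only if it equals
 * compare_addr, under the target's atomic mutex. */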
int Win::compare_and_swap(const void* origin_addr, const void* compare_addr, void* result_addr, MPI_Datatype datatype,
                          int target_rank, MPI_Aint target_disp)
  const Win* send_win = connected_wins_[target_rank];

  CHECK_WIN_LOCKED(send_win)

  XBT_DEBUG("Entering MPI_Compare_and_swap with %d", target_rank);
  MPI_Request req = MPI_REQUEST_NULL;
  send_win->atomic_mut_->lock();
  get(result_addr, 1, datatype, target_rank,
      target_disp, 1, datatype, &req);
  if (req != MPI_REQUEST_NULL)
    Request::wait(&req, MPI_STATUS_IGNORE);
  if (not memcmp(result_addr, compare_addr, datatype->get_extent())) {
    put(origin_addr, 1, datatype, target_rank,
        target_disp, 1, datatype);
  }
  send_win->atomic_mut_->unlock();
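
/* Start an access epoch (MPI_Win_start): wait for a "post" message from every process of the given group. */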
int Win::start(MPI_Group group, int /*assert*/)
  /* From the MPI Forum's advice to implementors:
  The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
  will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
  the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
  matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
  MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
  implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
  to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
  called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
  origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
  must complete, without further dependencies. */

  //naive, blocking implementation.
  XBT_DEBUG("Entering MPI_Win_Start");
  std::vector<MPI_Request> reqs;
  for (int i = 0; i < group->size(); i++) {
    int src = comm_->group()->rank(group->actor(i));
    xbt_assert(src != MPI_UNDEFINED);
    reqs.emplace_back(Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 4, comm_));
  }
  int size = static_cast<int>(reqs.size());

  Request::startall(size, reqs.data());
  Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
  for (auto& req : reqs)
    Request::unref(&req);

  opened_++; // we're open for business !
  XBT_DEBUG("Leaving MPI_Win_Start");
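
/* Open an exposure epoch (MPI_Win_post): notify every process of the group that it may now access our window. */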
int Win::post(MPI_Group group, int /*assert*/)
  //let's make a synchronous send here
  XBT_DEBUG("Entering MPI_Win_Post");
  std::vector<MPI_Request> reqs;
  for (int i = 0; i < group->size(); i++) {
    int dst = comm_->group()->rank(group->actor(i));
    xbt_assert(dst != MPI_UNDEFINED);
    reqs.emplace_back(Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 4, comm_));
  }
  int size = static_cast<int>(reqs.size());

  Request::startall(size, reqs.data());
  Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
  for (auto& req : reqs)
    Request::unref(&req);

  opened_++; // we're open for business !
  XBT_DEBUG("Leaving MPI_Win_Post");
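
/* Close the access epoch (MPI_Win_complete): tell every target in dst_group_ that our accesses are done, then
 * wait for our own outstanding RMA requests. The matching MPI_Win_wait on the target side blocks until this
 * notification arrives. */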
  xbt_assert(opened_ != 0, "Complete called on an MPI_Win with no opened epoch");

  XBT_DEBUG("Entering MPI_Win_Complete");
  std::vector<MPI_Request> reqs;
  for (int i = 0; i < dst_group_->size(); i++) {
    int dst = comm_->group()->rank(dst_group_->actor(i));
    xbt_assert(dst != MPI_UNDEFINED);
    reqs.emplace_back(Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 5, comm_));
  }
  int size = static_cast<int>(reqs.size());

  XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
  Request::startall(size, reqs.data());
  Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
  for (auto& req : reqs)
    Request::unref(&req);

  int finished = finish_comms();
  XBT_DEBUG("Win_complete - Finished %d RMA calls", finished);

  opened_--; //we're closed for business !
  Group::unref(dst_group_);
  dst_group_ = MPI_GROUP_NULL;
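
/* Close the exposure epoch (MPI_Win_wait): block until every process of src_group_ has sent its "complete"
 * notification, then finish the pending RMA requests targeting our window. */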
  //naive, blocking implementation.
  XBT_DEBUG("Entering MPI_Win_Wait");
  std::vector<MPI_Request> reqs;
  for (int i = 0; i < src_group_->size(); i++) {
    int src = comm_->group()->rank(src_group_->actor(i));
    xbt_assert(src != MPI_UNDEFINED);
    reqs.emplace_back(Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 5, comm_));
  }
  int size = static_cast<int>(reqs.size());

  XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
  Request::startall(size, reqs.data());
  Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
  for (auto& req : reqs)
    Request::unref(&req);

  int finished = finish_comms();
  XBT_DEBUG("Win_wait - Finished %d RMA calls", finished);

  opened_--; //we're closed for business !
  Group::unref(src_group_);
  src_group_ = MPI_GROUP_NULL;
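
/* Passive-target lock: an exclusive lock acquires and keeps the target's lock mutex; a shared lock only grabs
 * it transiently, to wait for a possible exclusive holder. Pending RMA calls on both the origin and the target
 * are then flushed. */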
int Win::lock(int lock_type, int rank, int /*assert*/)
  MPI_Win target_win = connected_wins_[rank];

  if ((lock_type == MPI_LOCK_EXCLUSIVE && target_win->mode_ != MPI_LOCK_SHARED)|| target_win->mode_ == MPI_LOCK_EXCLUSIVE){
    target_win->lock_mut_->lock();
    // add lock_type to mode_, so that the unlock can tell whether we switched from EXCLUSIVE to SHARED
    // (in which case no release is needed in the unlock)
    target_win->mode_ += lock_type;
    if(lock_type == MPI_LOCK_SHARED){ // the window used to be exclusive, it's now shared
      target_win->lock_mut_->unlock();
    }
  } else if (not(target_win->mode_ == MPI_LOCK_SHARED && lock_type == MPI_LOCK_EXCLUSIVE))
    target_win->mode_ += lock_type; // don't set to exclusive if it's already shared

  target_win->lockers_.push_back(comm_->rank());

  int finished = finish_comms(rank);
  XBT_DEBUG("Win_lock %d - Finished %d RMA calls", rank, finished);
  finished = target_win->finish_comms(rank_);
  XBT_DEBUG("Win_lock target %d - Finished %d RMA calls", rank, finished);

int Win::lock_all(int assert){
  int retval = MPI_SUCCESS;
  for (int i = 0; i < comm_->size(); i++) {
    int ret = this->lock(MPI_LOCK_SHARED, i, assert);
    if (ret != MPI_SUCCESS)
      retval = ret;

int Win::unlock(int rank){
  MPI_Win target_win = connected_wins_[rank];
  int target_mode = target_win->mode_;
  target_win->mode_ = 0;
  target_win->lockers_.remove(comm_->rank());
  if (target_mode == MPI_LOCK_EXCLUSIVE){
    target_win->lock_mut_->unlock();
  }

  int finished = finish_comms(rank);
  XBT_DEBUG("Win_unlock %d - Finished %d RMA calls", rank, finished);
  finished = target_win->finish_comms(rank_);
  XBT_DEBUG("Win_unlock target %d - Finished %d RMA calls", rank, finished);

int Win::unlock_all(){
  int retval = MPI_SUCCESS;
  for (int i = 0; i < comm_->size(); i++) {
    int ret = this->unlock(i);
    if (ret != MPI_SUCCESS)
      retval = ret;

int Win::flush(int rank){
  MPI_Win target_win = connected_wins_[rank];
  int finished = finish_comms(rank_);
  XBT_DEBUG("Win_flush on local %d - Finished %d RMA calls", rank_, finished);
  finished = target_win->finish_comms(rank);
  XBT_DEBUG("Win_flush on remote %d - Finished %d RMA calls", rank, finished);

int Win::flush_local(int rank){
  int finished = finish_comms(rank);
  XBT_DEBUG("Win_flush_local for rank %d - Finished %d RMA calls", rank, finished);

int Win::flush_all(){
  int finished = finish_comms();
  XBT_DEBUG("Win_flush_all on local - Finished %d RMA calls", finished);
  for (int i = 0; i < comm_->size(); i++) {
    finished = connected_wins_[i]->finish_comms(rank_);
    XBT_DEBUG("Win_flush_all on %d - Finished %d RMA calls", i, finished);
  }

int Win::flush_local_all(){
  int finished = finish_comms();
  XBT_DEBUG("Win_flush_local_all - Finished %d RMA calls", finished);

Win* Win::f2c(int id){
  return static_cast<Win*>(F2C::f2c(id));
}
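
/* Wait for the RMA requests attached to this window: either all of them, or only those exchanged with a
 * given rank (used by lock/unlock/flush). */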
int Win::finish_comms(){
  //Finish own requests
  int size = static_cast<int>(requests_.size());
  MPI_Request* treqs = requests_.data();
  Request::waitall(size, treqs, MPI_STATUSES_IGNORE);

int Win::finish_comms(int rank){
  // Finish own requests
  // Let's see if we're either the destination or the sender of this request,
  // because we only wait for requests that we are responsible for.
  // Also use the process id here, since src() and dst() on the request return the process id,
  // NOT the rank (which only exists in the context of a communicator).
  aid_t proc_id = comm_->group()->actor(rank);
  auto it = std::stable_partition(begin(requests_), end(requests_), [proc_id](const MPI_Request& req) {
    return (req == MPI_REQUEST_NULL || (req->src() != proc_id && req->dst() != proc_id));
  });
  std::vector<MPI_Request> myreqqs(it, end(requests_));
  requests_.erase(it, end(requests_));
  int size = static_cast<int>(myreqqs.size());
  MPI_Request* treqs = myreqqs.data();
  Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
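
/* MPI_Win_shared_query: report the size, displacement unit and base address of the window owned by the given
 * rank (or of the first non-empty window when rank is MPI_PROC_NULL). */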
int Win::shared_query(int rank, MPI_Aint* size, int* disp_unit, void* baseptr) const
  const Win* target_win = rank != MPI_PROC_NULL ? connected_wins_[rank] : nullptr;
  for (int i = 0; not target_win && i < comm_->size(); i++) {
    if (connected_wins_[i]->size_ > 0)
      target_win = connected_wins_[i];
  }
  if (target_win) {
    *size = target_win->size_;
    *disp_unit = target_win->disp_unit_;
    *static_cast<void**>(baseptr) = target_win->base_;
  } else {
    *static_cast<void**>(baseptr) = nullptr;

MPI_Errhandler Win::errhandler()
  if (errhandler_ != MPI_ERRHANDLER_NULL)

void Win::set_errhandler(MPI_Errhandler errhandler)
{
  if (errhandler_ != MPI_ERRHANDLER_NULL)
    simgrid::smpi::Errhandler::unref(errhandler_);
  errhandler_ = errhandler;
  if (errhandler_ != MPI_ERRHANDLER_NULL)
    errhandler_->ref();
}

} // namespace simgrid