/* Copyright (c) 2007-2021. The SimGrid Team. All rights reserved.          */

/* This program is free software; you can redistribute it and/or modify it
 * under the terms of the license (GNU LGPL) which comes with this package. */

#include "smpi_win.hpp"

#include "private.hpp"
#include "smpi_coll.hpp"
#include "smpi_comm.hpp"
#include "smpi_datatype.hpp"
#include "smpi_info.hpp"
#include "smpi_keyvals.hpp"
#include "smpi_request.hpp"
#include "src/smpi/include/smpi_actor.hpp"

#include <algorithm>

XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");

#define CHECK_RMA_REMOTE_WIN(fun, win)\
  if(target_count*target_datatype->get_extent()>win->size_){\
    XBT_WARN("%s: Trying to move %zd, which exceeds the window size on target process %d : %zd - Bailing out.",\
    fun, target_count*target_datatype->get_extent(), target_rank, win->size_);\
    simgrid::smpi::utils::set_current_buffer(1,"win_base",win->base_);\
    return MPI_ERR_RMA_RANGE;\
  }

#define CHECK_WIN_LOCKED(win)\
  if(opened_==0){ /*check that post/start has been done*/\
    bool locked = false;\
    for (auto const& it : win->lockers_)\
      if (it == comm_->rank())\
        locked = true;\
    if(not locked)\
      return MPI_ERR_WIN;\
  }

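// Note: both macros expand inside the RMA entry points below and return early
// with an MPI error code on failure, so they are only usable in member
// functions that return int.
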
namespace simgrid{
namespace smpi{
std::unordered_map<int, smpi_key_elem> Win::keyvals_;
int Win::keyval_id_=0;

Win::Win(void* base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm, int allocated, int dynamic)
    : base_(base)
    , size_(size)
    , disp_unit_(disp_unit)
    , info_(info)
    , comm_(comm)
    , connected_wins_(comm->size())
    , rank_(comm->rank())
    , allocated_(allocated)
    , dynamic_(dynamic)
{
  XBT_DEBUG("Creating window");
  if(info!=MPI_INFO_NULL)
    info->ref();
  connected_wins_[rank_] = this;
  if(rank_==0){
    bar_ = new s4u::Barrier(comm->size());
  }
  errhandler_->ref();
  comm->add_rma_win(this);
  comm->ref();

  colls::allgather(&connected_wins_[rank_], sizeof(MPI_Win), MPI_BYTE, connected_wins_.data(), sizeof(MPI_Win),
                   MPI_BYTE, comm);

  colls::bcast(&bar_, sizeof(s4u::Barrier*), MPI_BYTE, 0, comm);

  colls::barrier(comm);
  this->add_f();
}

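/* Design note: windows are plain simulation objects, so instead of registering
 * memory with a NIC, each process allgathers the raw Win* of every peer into
 * connected_wins_. RMA calls can then dereference a remote window's metadata
 * (base_, size_, disp_unit_) directly, and rank 0's s4u::Barrier is broadcast
 * so that every process synchronizes on the same barrier instance. */
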
Win::~Win(){
  //As per the standard, perform a barrier to ensure every async comm is finished
  bar_->wait();

  int finished = finish_comms();
  XBT_DEBUG("Win destructor - Finished %d RMA calls", finished);

  if (info_ != MPI_INFO_NULL)
    simgrid::smpi::Info::unref(info_);
  if (errhandler_ != MPI_ERRHANDLER_NULL)
    simgrid::smpi::Errhandler::unref(errhandler_);

  comm_->remove_rma_win(this);

  colls::barrier(comm_);
  Comm::unref(comm_);

  if (rank_ == 0)
    delete bar_;

  if (allocated_)
    xbt_free(base_);

  F2C::free_f(this->f2c_id());
  cleanup_attr<Win>();
}

int Win::attach(void* /*base*/, MPI_Aint size)
{
  if (not(base_ == MPI_BOTTOM || base_ == nullptr))
    return MPI_ERR_ARG;
  base_ = nullptr; // the actual address will be given in each RMA call, as the displacement
  size_ += size;
  return MPI_SUCCESS;
}

int Win::detach(const void* /*base*/)
{
  base_ = MPI_BOTTOM;
  size_ = -1;
  return MPI_SUCCESS;
}

void Win::get_name(char* name, int* length) const
{
  *length = static_cast<int>(name_.length());
  if (not name_.empty()) {
    name_.copy(name, *length);
    name[*length] = '\0';
  }
}

void Win::get_group(MPI_Group* group){
  if(comm_ != MPI_COMM_NULL){
    *group = comm_->group();
  } else {
    *group = MPI_GROUP_NULL;
  }
}

MPI_Info Win::info()
{
  if (info_ == MPI_INFO_NULL)
    info_ = new Info();
  info_->ref();
  return info_;
}

int Win::rank() const
{
  return rank_;
}

MPI_Comm Win::comm() const
{
  return comm_;
}

MPI_Aint Win::size() const
{
  return size_;
}

void* Win::base() const
{
  return base_;
}

int Win::disp_unit() const
{
  return disp_unit_;
}

int Win::dynamic() const
{
  return dynamic_;
}

void Win::set_info(MPI_Info info)
{
  if (info_ != MPI_INFO_NULL)
    simgrid::smpi::Info::unref(info_);
  info_ = info;
  if (info_ != MPI_INFO_NULL)
    info_->ref();
}

void Win::set_name(const char* name){
  name_ = std::string(name);
}

int Win::fence(int assert)
{
  XBT_DEBUG("Entering fence");
  if (opened_ == 0)
    opened_ = 1;
  if (assert != MPI_MODE_NOPRECEDE) {
    // This is not the first fence => finalize what came before
    bar_->wait();
    mut_->lock();
    // This (simulated) mutex ensures that no process pushes to the vector of requests during the waitall.
    // Without this, the vector could get resized while another process pushes,
    // which would invalidate the array handed to Request::waitall().
    // Another solution would be to copy the data and clean up the vector *before* Request::waitall.

    // complete all requests that other processes have queued in our window
    if (not requests_.empty()) {
      int size           = static_cast<int>(requests_.size());
      MPI_Request* treqs = requests_.data();
      Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
    }
    count_ = 0;
    mut_->unlock();
  }

  if (assert == MPI_MODE_NOSUCCEED) // there should be no ops after this one; signal that we are closed.
    opened_ = 0;
  assert_ = assert;

  bar_->wait();
  XBT_DEBUG("Leaving fence");

  return MPI_SUCCESS;
}

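/* Illustrative sketch (comment only, not compiled): the fence-based
 * active-target pattern that this method implements on each process. The
 * buffer names, N and peer are hypothetical.
 *
 *   MPI_Win win;
 *   MPI_Win_create(buf, N * sizeof(double), sizeof(double), MPI_INFO_NULL, MPI_COMM_WORLD, &win);
 *   MPI_Win_fence(MPI_MODE_NOPRECEDE, win); // first fence: nothing to finalize
 *   MPI_Put(src, N, MPI_DOUBLE, peer, 0, N, MPI_DOUBLE, win);
 *   MPI_Win_fence(0, win);                  // closes the epoch: waits on all pending requests
 *   MPI_Win_fence(MPI_MODE_NOSUCCEED, win); // last fence: the window is marked closed (opened_ = 0)
 *   MPI_Win_free(&win);
 */
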
int Win::put(const void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
              MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
{
  //get receiver pointer
  Win* recv_win = connected_wins_[target_rank];

  CHECK_WIN_LOCKED(recv_win)
  CHECK_RMA_REMOTE_WIN("MPI_Put", recv_win)

  void* recv_addr = static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_;

  if (target_rank != comm_->rank()) { // This is not for myself, so we need to send messages
    XBT_DEBUG("Entering MPI_Put to remote rank %d", target_rank);
    // prepare send_request
    MPI_Request sreq =
        // TODO cheinrich Check for rank / pid conversion
        Request::rma_send_init(origin_addr, origin_count, origin_datatype, comm_->rank(), target_rank, SMPI_RMA_TAG + 1,
                               comm_, MPI_OP_NULL);

    //prepare receiver request
    // TODO cheinrich Check for rank / pid conversion
    MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, recv_win->comm_->rank(),
                                              target_rank, SMPI_RMA_TAG + 1, recv_win->comm_, MPI_OP_NULL);

    //start send
    sreq->start();

    if(request!=nullptr){
      *request=sreq;
    }else{
      mut_->lock();
      requests_.push_back(sreq);
      mut_->unlock();
    }

    //push request to receiver's win
    recv_win->mut_->lock();
    recv_win->requests_.push_back(rreq);
    rreq->start();
    recv_win->mut_->unlock();
  } else {
    XBT_DEBUG("Entering MPI_Put from myself to myself, rank %d", target_rank);
    Datatype::copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
    if(request!=nullptr)
      *request = MPI_REQUEST_NULL;
  }

  return MPI_SUCCESS;
}

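/* Design note: a one-sided put is simulated as a matched pair of point-to-point
 * requests on tag SMPI_RMA_TAG + 1; the receive side is queued in the target
 * window so that the next synchronization call can wait on it. When `request`
 * is non-null, the caller gets the send request back, which is how the
 * request-returning variant (MPI_Rput) can be built on top; a hypothetical
 * caller-side sketch:
 *
 *   MPI_Request req;
 *   MPI_Rput(src, n, MPI_DOUBLE, peer, 0, n, MPI_DOUBLE, win, &req);
 *   MPI_Wait(&req, MPI_STATUS_IGNORE); // local completion only
 */
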
int Win::get(void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
              MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
{
  //get sender pointer
  Win* send_win = connected_wins_[target_rank];

  CHECK_WIN_LOCKED(send_win)
  CHECK_RMA_REMOTE_WIN("MPI_Get", send_win)

  const void* send_addr = static_cast<void*>(static_cast<char*>(send_win->base_) + target_disp * send_win->disp_unit_);
  XBT_DEBUG("Entering MPI_Get from %d", target_rank);

  if(target_rank != comm_->rank()){
    //prepare send_request
    MPI_Request sreq = Request::rma_send_init(send_addr, target_count, target_datatype, target_rank,
                                              send_win->comm_->rank(), SMPI_RMA_TAG + 2, send_win->comm_, MPI_OP_NULL);

    //prepare receiver request
    MPI_Request rreq = Request::rma_recv_init(
        origin_addr, origin_count, origin_datatype, target_rank,
        comm_->rank(), // TODO cheinrich Check here if comm_->rank() and above send_win->comm_->rank() are correct
        SMPI_RMA_TAG + 2, comm_, MPI_OP_NULL);

    //start the send, with another process than us as sender.
    sreq->start();
    //push request to sender's win
    send_win->mut_->lock();
    send_win->requests_.push_back(sreq);
    send_win->mut_->unlock();

    //start recv
    rreq->start();

    if(request!=nullptr){
      *request=rreq;
    }else{
      mut_->lock();
      requests_.push_back(rreq);
      mut_->unlock();
    }
  } else {
    Datatype::copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
    if(request!=nullptr)
      *request=MPI_REQUEST_NULL;
  }

  return MPI_SUCCESS;
}

int Win::accumulate(const void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
                    MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request* request)
{
  XBT_DEBUG("Entering MPI_Win_Accumulate");
  //get receiver pointer
  Win* recv_win = connected_wins_[target_rank];

  //FIXME: local version
  CHECK_WIN_LOCKED(recv_win)
  CHECK_RMA_REMOTE_WIN("MPI_Accumulate", recv_win)

  void* recv_addr = static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_;
  XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
  // As the tag will be used for ordering of the operations, subtract count from it (to avoid collisions with other
  // SMPI tags: SMPI_RMA_TAG is set below all the other tags we use)
  // prepare send_request

  MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype, comm_->rank(), target_rank,
                                            SMPI_RMA_TAG - 3 - count_, comm_, op);

  // prepare receiver request
  MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, recv_win->comm_->rank(),
                                            recv_win->comm_->group()->rank(comm_->group()->actor(target_rank)),
                                            SMPI_RMA_TAG - 3 - count_, recv_win->comm_, op);

  count_++;

  // start send
  sreq->start();
  // push request to receiver's win
  recv_win->mut_->lock();
  recv_win->requests_.push_back(rreq);
  rreq->start();
  recv_win->mut_->unlock();

  if (request != nullptr) {
    *request = sreq;
  } else {
    mut_->lock();
    requests_.push_back(sreq);
    mut_->unlock();
  }

  XBT_DEBUG("Leaving MPI_Win_Accumulate");
  return MPI_SUCCESS;
}

int Win::get_accumulate(const void* origin_addr, int origin_count, MPI_Datatype origin_datatype, void* result_addr,
                        int result_count, MPI_Datatype result_datatype, int target_rank, MPI_Aint target_disp,
                        int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request*)
{
  //get sender pointer
  const Win* send_win = connected_wins_[target_rank];

  CHECK_WIN_LOCKED(send_win)
  CHECK_RMA_REMOTE_WIN("MPI_Get_Accumulate", send_win)

  XBT_DEBUG("Entering MPI_Get_accumulate from %d", target_rank);
  //need to be sure ops are correctly ordered, so finish request here ? slow.
  MPI_Request req = MPI_REQUEST_NULL;
  send_win->atomic_mut_->lock();
  get(result_addr, result_count, result_datatype, target_rank,
      target_disp, target_count, target_datatype, &req);
  if (req != MPI_REQUEST_NULL)
    Request::wait(&req, MPI_STATUS_IGNORE);
  if (op != MPI_NO_OP)
    accumulate(origin_addr, origin_count, origin_datatype, target_rank,
               target_disp, target_count, target_datatype, op, &req);
  if (req != MPI_REQUEST_NULL)
    Request::wait(&req, MPI_STATUS_IGNORE);
  send_win->atomic_mut_->unlock();
  return MPI_SUCCESS;
}

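/* Illustrative sketch (comment only): fetch-and-op is the single-element case
 * of get_accumulate. The target's atomic_mut_ serializes the embedded get and
 * accumulate so that concurrent updates cannot interleave between them; the
 * names below are hypothetical.
 *
 *   long old;
 *   long one = 1;
 *   MPI_Fetch_and_op(&one, &old, MPI_LONG, peer, 0, MPI_SUM, win);
 */
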
int Win::compare_and_swap(const void* origin_addr, const void* compare_addr, void* result_addr, MPI_Datatype datatype,
                          int target_rank, MPI_Aint target_disp)
{
  //get sender pointer
  const Win* send_win = connected_wins_[target_rank];

  CHECK_WIN_LOCKED(send_win)

  XBT_DEBUG("Entering MPI_Compare_and_swap with %d", target_rank);
  MPI_Request req = MPI_REQUEST_NULL;
  send_win->atomic_mut_->lock();
  get(result_addr, 1, datatype, target_rank,
      target_disp, 1, datatype, &req);
  if (req != MPI_REQUEST_NULL)
    Request::wait(&req, MPI_STATUS_IGNORE);
  if (not memcmp(result_addr, compare_addr, datatype->get_extent())) {
    put(origin_addr, 1, datatype, target_rank,
        target_disp, 1, datatype);
  }
  send_win->atomic_mut_->unlock();
  return MPI_SUCCESS;
}

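/* Illustrative sketch (comment only): atomicity here is emulated, not a
 * hardware CAS. The operation is a get, a local memcmp, and a conditional put,
 * all serialized by the target's atomic_mut_; variable names are hypothetical.
 *
 *   int expected = 0, desired = 1, old;
 *   MPI_Compare_and_swap(&desired, &expected, &old, MPI_INT, peer, 0, win);
 *   // the swap happened iff old == expected
 */
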
int Win::start(MPI_Group group, int /*assert*/)
{
  /* From the MPI forum's advice to implementors:
  The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
  will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
  the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
  matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
  MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
  implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
  to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
  called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
  origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
  must complete, without further dependencies. */

  //naive, blocking implementation.
  int i = 0;
  int j = 0;
  int size = group->size();
  std::vector<MPI_Request> reqs(size);

  XBT_DEBUG("Entering MPI_Win_Start");
  while (j != size) {
    int src = comm_->group()->rank(group->actor(j));
    if (src != rank_ && src != MPI_UNDEFINED) { // TODO cheinrich: The check of MPI_UNDEFINED should be useless here
      reqs[i] = Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 4, comm_);
      i++;
    }
    j++;
  }
  size = i;
  Request::startall(size, reqs.data());
  Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
  for (i = 0; i < size; i++) {
    Request::unref(&reqs[i]);
  }
  opened_++; // we're open for business!
  group_ = group;
  group->ref();
  XBT_DEBUG("Leaving MPI_Win_Start");
  return MPI_SUCCESS;
}

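/* Illustrative sketch (comment only): the generalized active-target pattern
 * that start/complete (origin side) and post/wait (target side) implement,
 * with hypothetical groups and ranks:
 *
 *   // origin process                        // target process
 *   MPI_Win_start(target_grp, 0, win);       MPI_Win_post(origin_grp, 0, win);
 *   MPI_Put(src, n, MPI_INT, tgt,            // ... target computes elsewhere ...
 *           0, n, MPI_INT, win);
 *   MPI_Win_complete(win);                   MPI_Win_wait(win);
 */
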
int Win::post(MPI_Group group, int /*assert*/)
{
  //let's make a synchronous send here
  int i = 0;
  int j = 0;
  int size = group->size();
  std::vector<MPI_Request> reqs(size);

  XBT_DEBUG("Entering MPI_Win_Post");
  while(j!=size){
    int dst = comm_->group()->rank(group->actor(j));
    if (dst != rank_ && dst != MPI_UNDEFINED) {
      reqs[i] = Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 4, comm_);
      i++;
    }
    j++;
  }
  size = i;

  Request::startall(size, reqs.data());
  Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
  for(i=0;i<size;i++){
    Request::unref(&reqs[i]);
  }
  opened_++; // we're open for business!
  group_ = group;
  group->ref();
  XBT_DEBUG("Leaving MPI_Win_Post");
  return MPI_SUCCESS;
}

int Win::complete(){
  xbt_assert(opened_ != 0, "Complete called on a window with no open epoch (MPI_Win_start not called?)");

  XBT_DEBUG("Entering MPI_Win_Complete");
  int i = 0;
  int j = 0;
  int size = group_->size();
  std::vector<MPI_Request> reqs(size);

  while(j!=size){
    int dst = comm_->group()->rank(group_->actor(j));
    if (dst != rank_ && dst != MPI_UNDEFINED) {
      reqs[i] = Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 5, comm_);
      i++;
    }
    j++;
  }
  size = i;
  XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
  Request::startall(size, reqs.data());
  Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
  for(i=0;i<size;i++){
    Request::unref(&reqs[i]);
  }

  int finished = finish_comms();
  XBT_DEBUG("Win_complete - Finished %d RMA calls", finished);

  Group::unref(group_);
  opened_--; // we're closed for business!
  return MPI_SUCCESS;
}

int Win::wait(){
  //naive, blocking implementation.
  XBT_DEBUG("Entering MPI_Win_Wait");
  int i = 0;
  int j = 0;
  int size = group_->size();
  std::vector<MPI_Request> reqs(size);

  while(j!=size){
    int src = comm_->group()->rank(group_->actor(j));
    if (src != rank_ && src != MPI_UNDEFINED) {
      reqs[i] = Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 5, comm_);
      i++;
    }
    j++;
  }
  size = i;
  XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
  Request::startall(size, reqs.data());
  Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
  for(i=0;i<size;i++){
    Request::unref(&reqs[i]);
  }
  int finished = finish_comms();
  XBT_DEBUG("Win_wait - Finished %d RMA calls", finished);

  Group::unref(group_);
  opened_--; // we're closed for business!
  return MPI_SUCCESS;
}

int Win::lock(int lock_type, int rank, int /*assert*/)
{
  MPI_Win target_win = connected_wins_[rank];

  if ((lock_type == MPI_LOCK_EXCLUSIVE && target_win->mode_ != MPI_LOCK_SHARED) || target_win->mode_ == MPI_LOCK_EXCLUSIVE){
    target_win->lock_mut_->lock();
    // add the lock_type to differentiate the case where we are switching from EXCLUSIVE to SHARED
    // (no release needed in the unlock)
    target_win->mode_ += lock_type;
    if(lock_type == MPI_LOCK_SHARED){ // the window used to be exclusive; it's now shared.
      target_win->lock_mut_->unlock();
    }
  } else if (not(target_win->mode_ == MPI_LOCK_SHARED && lock_type == MPI_LOCK_EXCLUSIVE))
    target_win->mode_ += lock_type; // don't set to exclusive if it's already shared

  target_win->lockers_.push_back(comm_->rank());

  int finished = finish_comms(rank);
  XBT_DEBUG("Win_lock %d - Finished %d RMA calls", rank, finished);
  finished = target_win->finish_comms(rank_);
  XBT_DEBUG("Win_lock target %d - Finished %d RMA calls", rank, finished);
  return MPI_SUCCESS;
}

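/* Illustrative sketch (comment only): passive-target synchronization as
 * modeled here. An exclusive lock takes the target's lock_mut_; a shared lock
 * merely registers the caller in lockers_ so that the CHECK_WIN_LOCKED test in
 * the RMA calls passes. Names below are hypothetical.
 *
 *   MPI_Win_lock(MPI_LOCK_EXCLUSIVE, peer, 0, win);
 *   MPI_Put(src, n, MPI_DOUBLE, peer, 0, n, MPI_DOUBLE, win);
 *   MPI_Win_unlock(peer, win); // flushes pending requests on both sides
 */
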
int Win::lock_all(int assert){
  int retval = MPI_SUCCESS;
  for (int i = 0; i < comm_->size(); i++) {
    int ret = this->lock(MPI_LOCK_SHARED, i, assert);
    if (ret != MPI_SUCCESS)
      retval = ret;
  }
  return retval;
}

int Win::unlock(int rank){
  MPI_Win target_win = connected_wins_[rank];
  int target_mode = target_win->mode_;
  target_win->mode_ = 0;
  target_win->lockers_.remove(comm_->rank());
  if (target_mode==MPI_LOCK_EXCLUSIVE){
    target_win->lock_mut_->unlock();
  }

  int finished = finish_comms(rank);
  XBT_DEBUG("Win_unlock %d - Finished %d RMA calls", rank, finished);
  finished = target_win->finish_comms(rank_);
  XBT_DEBUG("Win_unlock target %d - Finished %d RMA calls", rank, finished);
  return MPI_SUCCESS;
}

int Win::unlock_all(){
  int retval = MPI_SUCCESS;
  for (int i = 0; i < comm_->size(); i++) {
    int ret = this->unlock(i);
    if (ret != MPI_SUCCESS)
      retval = ret;
  }
  return retval;
}

int Win::flush(int rank){
  MPI_Win target_win = connected_wins_[rank];
  int finished = finish_comms(rank_);
  XBT_DEBUG("Win_flush on local %d - Finished %d RMA calls", rank_, finished);
  finished = target_win->finish_comms(rank);
  XBT_DEBUG("Win_flush on remote %d - Finished %d RMA calls", rank, finished);
  return MPI_SUCCESS;
}

int Win::flush_local(int rank){
  int finished = finish_comms(rank);
  XBT_DEBUG("Win_flush_local for rank %d - Finished %d RMA calls", rank, finished);
  return MPI_SUCCESS;
}

int Win::flush_all(){
  int finished = finish_comms();
  XBT_DEBUG("Win_flush_all on local - Finished %d RMA calls", finished);
  for (int i = 0; i < comm_->size(); i++) {
    finished = connected_wins_[i]->finish_comms(rank_);
    XBT_DEBUG("Win_flush_all on %d - Finished %d RMA calls", i, finished);
  }
  return MPI_SUCCESS;
}

int Win::flush_local_all(){
  int finished = finish_comms();
  XBT_DEBUG("Win_flush_local_all - Finished %d RMA calls", finished);
  return MPI_SUCCESS;
}

Win* Win::f2c(int id){
  return static_cast<Win*>(F2C::f2c(id));
}

int Win::finish_comms(){
  mut_->lock();
  //Finish own requests
  int size = static_cast<int>(requests_.size());
  if (size > 0) {
    MPI_Request* treqs = requests_.data();
    Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
    requests_.clear();
  }
  mut_->unlock();
  return size;
}

int Win::finish_comms(int rank){
  mut_->lock();
  // Finish own requests
  // Let's see if we're either the destination or the sender of this request
  // because we only wait for requests that we are responsible for.
  // Also use the process id here since the request itself returns from src()
  // and dst() the process id, NOT the rank (which only exists in the context of a communicator).
  int proc_id = comm_->group()->actor(rank);
  auto it = std::stable_partition(begin(requests_), end(requests_), [proc_id](const MPI_Request& req) {
    return (req == MPI_REQUEST_NULL || (req->src() != proc_id && req->dst() != proc_id));
  });
  std::vector<MPI_Request> myreqqs(it, end(requests_));
  requests_.erase(it, end(requests_));
  int size = static_cast<int>(myreqqs.size());
  if (size > 0) {
    MPI_Request* treqs = myreqqs.data();
    Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
    myreqqs.clear();
  }
  mut_->unlock();
  return size;
}

int Win::shared_query(int rank, MPI_Aint* size, int* disp_unit, void* baseptr) const
{
  const Win* target_win = rank != MPI_PROC_NULL ? connected_wins_[rank] : nullptr;
  for (int i = 0; not target_win && i < comm_->size(); i++) {
    if (connected_wins_[i]->size_ > 0)
      target_win = connected_wins_[i];
  }
  if (target_win) {
    *size = target_win->size_;
    *disp_unit = target_win->disp_unit_;
    *static_cast<void**>(baseptr) = target_win->base_;
  } else {
    *size = 0;
    *static_cast<void**>(baseptr) = nullptr;
  }
  return MPI_SUCCESS;
}

MPI_Errhandler Win::errhandler()
{
  if (errhandler_ != MPI_ERRHANDLER_NULL)
    errhandler_->ref();
  return errhandler_;
}

void Win::set_errhandler(MPI_Errhandler errhandler)
{
  if (errhandler_ != MPI_ERRHANDLER_NULL)
    simgrid::smpi::Errhandler::unref(errhandler_);
  errhandler_ = errhandler;
  if (errhandler_ != MPI_ERRHANDLER_NULL)
    errhandler_->ref();
}
} // namespace smpi
} // namespace simgrid