/* Copyright (c) 2007-2019. The SimGrid Team. All rights reserved. */

/* This program is free software; you can redistribute it and/or modify it
 * under the terms of the license (GNU LGPL) which comes with this package. */
#include "smpi_win.hpp"

#include "smpi_coll.hpp"
#include "smpi_comm.hpp"
#include "smpi_datatype.hpp"
#include "smpi_info.hpp"
#include "smpi_keyvals.hpp"
#include "smpi_request.hpp"
#include "src/smpi/include/smpi_actor.hpp"
XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");
namespace simgrid{
namespace smpi{

std::unordered_map<int, smpi_key_elem> Win::keyvals_;
int Win::keyval_id_ = 0;
Win::Win(void* base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm, int allocated, int dynamic)
    : base_(base), size_(size), disp_unit_(disp_unit), assert_(0), info_(info), comm_(comm), allocated_(allocated), dynamic_(dynamic)
{
  int comm_size = comm->size();
  rank_         = comm->rank();
  XBT_DEBUG("Creating window");
  if (info != MPI_INFO_NULL)
    info->ref();
  name_                  = nullptr;
  opened_                = 0;
  group_                 = MPI_GROUP_NULL;
  requests_              = new std::vector<MPI_Request>();
  mut_                   = s4u::Mutex::create();
  lock_mut_              = s4u::Mutex::create();
  atomic_mut_            = s4u::Mutex::create();
  connected_wins_        = new MPI_Win[comm_size];
  connected_wins_[rank_] = this;
  count_                 = 0;
  if (rank_ == 0) { // rank 0 creates the barrier and broadcasts it below
    bar_ = new s4u::Barrier(comm_size);
  }
  mode_       = 0;
  errhandler_ = MPI_ERRORS_ARE_FATAL;
  comm->add_rma_win(this);
  comm->ref();

  colls::allgather(&(connected_wins_[rank_]), sizeof(MPI_Win), MPI_BYTE, connected_wins_, sizeof(MPI_Win), MPI_BYTE,
                   comm);

  colls::bcast(&(bar_), sizeof(s4u::Barrier*), MPI_BYTE, 0, comm);
}
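
/* Usage sketch (illustrative only; `buf`, `win` and the sizes are hypothetical): user code
 * reaches this constructor through MPI_Win_create:
 *
 *   int buf[100];
 *   MPI_Win win;
 *   MPI_Win_create(buf, 100 * sizeof(int), sizeof(int), MPI_INFO_NULL, MPI_COMM_WORLD, &win);
 *   // ... RMA epochs: fence, start/post/complete/wait, or lock/unlock ...
 *   MPI_Win_free(&win);
 */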
Win::~Win(){
  // As per the standard, perform a barrier to ensure every asynchronous communication is finished
  bar_->wait();

  int finished = finish_comms();
  XBT_DEBUG("Win destructor - Finished %d RMA calls", finished);

  delete requests_;
  delete[] connected_wins_;
  if (name_ != nullptr)
    xbt_free(name_);
  if (info_ != MPI_INFO_NULL)
    simgrid::smpi::Info::unref(info_);

  comm_->remove_rma_win(this);

  colls::barrier(comm_);
  Comm::unref(comm_);

  if (rank_ == 0)
    delete bar_;

  if (allocated_ != 0)
    xbt_free(base_);

  cleanup_attr<Win>();
}
int Win::attach(void* /*base*/, MPI_Aint size)
{
  if (not(base_ == MPI_BOTTOM || base_ == 0))
    return MPI_ERR_ARG;
  base_ = 0; // the actual address will be given in the RMA calls, as the displacement
  size_ += size;
  return MPI_SUCCESS;
}

int Win::detach(const void* /*base*/)
{
  base_ = MPI_BOTTOM;
  size_ = -1;
  return MPI_SUCCESS;
}
void Win::get_name(char* name, int* length){
  if (name_ == nullptr) {
    *length = 0;
    return;
  }
  *length = strlen(name_);
  strncpy(name, name_, *length + 1);
}
void Win::get_group(MPI_Group* group){
  if (comm_ != MPI_COMM_NULL) {
    *group = comm_->group();
  } else {
    *group = MPI_GROUP_NULL;
  }
}
MPI_Info Win::info()
{
  if (info_ == MPI_INFO_NULL)
    info_ = new Info();
  info_->ref();
  return info_;
}
MPI_Aint Win::size(){
  return size_;
}

void* Win::base(){
  return base_;
}

int Win::disp_unit(){
  return disp_unit_;
}

int Win::dynamic(){
  return dynamic_;
}
void Win::set_info(MPI_Info info)
{
  if (info_ != MPI_INFO_NULL)
    simgrid::smpi::Info::unref(info_);
  info_ = info;
  if (info_ != MPI_INFO_NULL)
    info_->ref();
}

void Win::set_name(const char* name){
  name_ = xbt_strdup(name);
}
int Win::fence(int assert)
{
  XBT_DEBUG("Entering fence");
  if (opened_ == 0)
    opened_ = 1;
  if (assert != MPI_MODE_NOPRECEDE) {
    // This is not the first fence => finalize what came before
    bar_->wait();
    mut_->lock();
    // This (simulated) mutex ensures that no process pushes to the vector of requests during the waitall.
    // Without it, the vector could get reallocated when another process pushes, which would invalidate
    // the array used by Request::waitall().
    // Another solution would be to copy the data and clean up the vector *before* Request::waitall.
    std::vector<MPI_Request>* reqs = requests_;
    int size = static_cast<int>(reqs->size());
    // start and wait for all requests that have been prepared by other processes
    if (size > 0) {
      MPI_Request* treqs = &(*reqs)[0];
      Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
    }
    count_ = 0; // reset the tag counter used by accumulate
    mut_->unlock();
  }

  if (assert == MPI_MODE_NOSUCCEED) // no operation may follow this fence, so mark the window as closed
    opened_ = 0;
  assert_ = assert;

  bar_->wait();
  XBT_DEBUG("Leaving fence");

  return MPI_SUCCESS;
}
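
/* Usage sketch (illustrative only; `val`, `peer` and `win` are hypothetical): fence-based
 * active target synchronization brackets RMA calls between two collective fences, so the
 * waitall() above is what completes the pending requests at the closing fence:
 *
 *   MPI_Win_fence(MPI_MODE_NOPRECEDE, win); // open the epoch
 *   MPI_Put(&val, 1, MPI_INT, peer, 0, 1, MPI_INT, win);
 *   MPI_Win_fence(MPI_MODE_NOSUCCEED, win); // close it, completing the put
 */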
int Win::put(const void* origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
             MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
{
  // get the pointer to the target window
  MPI_Win recv_win = connected_wins_[target_rank];

  if (opened_ == 0) { // neither fence nor post/start were called: check whether we hold a lock instead
    int locked = 0;
    for (auto const& it : recv_win->lockers_)
      if (it == comm_->rank())
        locked = 1;
    if (locked != 1)
      return MPI_ERR_WIN;
  }

  if (target_count * target_datatype->get_extent() > recv_win->size_)
    return MPI_ERR_ARG;

  void* recv_addr = static_cast<void*>(static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_);

  if (target_rank != comm_->rank()) { // This is not for myself, so we need to send messages
    XBT_DEBUG("Entering MPI_Put to remote rank %d", target_rank);
    // prepare the send request
    // TODO cheinrich Check for rank / pid conversion
    MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype, comm_->rank(), target_rank,
                                              SMPI_RMA_TAG + 1, comm_, MPI_OP_NULL);

    // prepare the receive request
    // TODO cheinrich Check for rank / pid conversion
    MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, recv_win->comm_->rank(),
                                              target_rank, SMPI_RMA_TAG + 1, recv_win->comm_, MPI_OP_NULL);

    // start the send
    sreq->start();

    if (request != nullptr) {
      *request = sreq;
    } else {
      mut_->lock();
      requests_->push_back(sreq);
      mut_->unlock();
    }

    // push the receive request to the receiver's window
    recv_win->mut_->lock();
    recv_win->requests_->push_back(rreq);
    rreq->start();
    recv_win->mut_->unlock();
  } else {
    XBT_DEBUG("Entering MPI_Put from myself to myself, rank %d", target_rank);
    Datatype::copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
    if (request != nullptr)
      *request = MPI_REQUEST_NULL;
  }

  return MPI_SUCCESS;
}
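
/* Implementation note: a put is simulated as a matched pair of point-to-point requests
 * tagged SMPI_RMA_TAG + 1: a send prepared on the origin plus a receive pushed onto the
 * target window's request vector. Both are drained later by the closing synchronization
 * call (fence, complete/wait or unlock) through finish_comms(). */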
int Win::get(void* origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
             MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
{
  // get the pointer to the window holding the data (its owner is the sender here)
  MPI_Win send_win = connected_wins_[target_rank];

  if (opened_ == 0) { // neither fence nor post/start were called: check whether we hold a lock instead
    int locked = 0;
    for (auto const& it : send_win->lockers_)
      if (it == comm_->rank())
        locked = 1;
    if (locked != 1)
      return MPI_ERR_WIN;
  }

  if (target_count * target_datatype->get_extent() > send_win->size_)
    return MPI_ERR_ARG;

  void* send_addr = static_cast<void*>(static_cast<char*>(send_win->base_) + target_disp * send_win->disp_unit_);
  XBT_DEBUG("Entering MPI_Get from %d", target_rank);

  if (target_rank != comm_->rank()) {
    // prepare the send request
    MPI_Request sreq = Request::rma_send_init(send_addr, target_count, target_datatype, target_rank,
                                              send_win->comm_->rank(), SMPI_RMA_TAG + 2, send_win->comm_, MPI_OP_NULL);

    // prepare the receive request
    MPI_Request rreq = Request::rma_recv_init(
        origin_addr, origin_count, origin_datatype, target_rank,
        comm_->rank(), // TODO cheinrich Check here if comm_->rank() and above send_win->comm_->rank() are correct
        SMPI_RMA_TAG + 2, comm_, MPI_OP_NULL);

    // start the send, with a process other than us as the sender
    sreq->start();
    // push the send request to the sender's window
    send_win->mut_->lock();
    send_win->requests_->push_back(sreq);
    send_win->mut_->unlock();

    // start the receive
    rreq->start();

    if (request != nullptr) {
      *request = rreq;
    } else {
      mut_->lock();
      requests_->push_back(rreq);
      mut_->unlock();
    }
  } else {
    Datatype::copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
    if (request != nullptr)
      *request = MPI_REQUEST_NULL;
  }

  return MPI_SUCCESS;
}
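
/* Implementation note: get mirrors put with the roles reversed: the owner of the target
 * window acts as the sender (tag SMPI_RMA_TAG + 2), and the receive request is the one
 * kept on (or returned to) the origin side. */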
int Win::accumulate(const void* origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
                    MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request* request)
{
  XBT_DEBUG("Entering MPI_Win_Accumulate");
  // get the pointer to the target window
  MPI_Win recv_win = connected_wins_[target_rank];

  if (opened_ == 0) { // neither fence nor post/start were called: check whether we hold a lock instead
    int locked = 0;
    for (auto const& it : recv_win->lockers_)
      if (it == comm_->rank())
        locked = 1;
    if (locked != 1)
      return MPI_ERR_WIN;
  }
  // FIXME: local version

  if (target_count * target_datatype->get_extent() > recv_win->size_)
    return MPI_ERR_ARG;

  void* recv_addr = static_cast<void*>(static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_);
  XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
  // The tag is used for ordering the operations, so subtract count_ from it. SMPI_RMA_TAG is set
  // below all the other tags we use, so this cannot collide with them.
  // prepare the send request
  MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype, comm_->rank(), target_rank,
                                            SMPI_RMA_TAG - 3 - count_, comm_, op);

  // prepare the receive request
  MPI_Request rreq =
      Request::rma_recv_init(recv_addr, target_count, target_datatype, recv_win->comm_->rank(),
                             recv_win->comm_->group()->rank(comm_->group()->actor(target_rank)),
                             SMPI_RMA_TAG - 3 - count_, recv_win->comm_, op);

  count_++;

  // start the send
  sreq->start();
  // push the receive request to the receiver's window
  recv_win->mut_->lock();
  recv_win->requests_->push_back(rreq);
  rreq->start();
  recv_win->mut_->unlock();

  if (request != nullptr) {
    *request = sreq;
  } else {
    mut_->lock();
    requests_->push_back(sreq);
    mut_->unlock();
  }

  XBT_DEBUG("Leaving MPI_Win_Accumulate");
  return MPI_SUCCESS;
}
int Win::get_accumulate(const void* origin_addr, int origin_count, MPI_Datatype origin_datatype, void* result_addr,
                        int result_count, MPI_Datatype result_datatype, int target_rank, MPI_Aint target_disp,
                        int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request*)
{
  // get the pointer to the target window
  MPI_Win send_win = connected_wins_[target_rank];

  if (opened_ == 0) { // neither fence nor post/start were called: check whether we hold a lock instead
    int locked = 0;
    for (auto const& it : send_win->lockers_)
      if (it == comm_->rank())
        locked = 1;
    if (locked != 1)
      return MPI_ERR_WIN;
  }

  if (target_count * target_datatype->get_extent() > send_win->size_)
    return MPI_ERR_ARG;

  XBT_DEBUG("Entering MPI_Get_accumulate from %d", target_rank);
  // Operations must be ordered correctly, so wait for each request here. This is slow.
  MPI_Request req = MPI_REQUEST_NULL;
  send_win->atomic_mut_->lock();
  get(result_addr, result_count, result_datatype, target_rank,
      target_disp, target_count, target_datatype, &req);
  if (req != MPI_REQUEST_NULL)
    Request::wait(&req, MPI_STATUS_IGNORE);
  if (op != MPI_NO_OP)
    accumulate(origin_addr, origin_count, origin_datatype, target_rank,
               target_disp, target_count, target_datatype, op, &req);
  if (req != MPI_REQUEST_NULL)
    Request::wait(&req, MPI_STATUS_IGNORE);
  send_win->atomic_mut_->unlock();
  return MPI_SUCCESS;
}
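
/* Implementation note: atomicity of get_accumulate is obtained by serializing all such
 * operations on a given target window behind its atomic_mut_, waiting for each embedded
 * get/accumulate before releasing the mutex. Correct, but slow, as noted above. */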
int Win::compare_and_swap(const void* origin_addr, void* compare_addr, void* result_addr, MPI_Datatype datatype,
                          int target_rank, MPI_Aint target_disp)
{
  // get the pointer to the target window
  MPI_Win send_win = connected_wins_[target_rank];

  if (opened_ == 0) { // neither fence nor post/start were called: check whether we hold a lock instead
    int locked = 0;
    for (auto const& it : send_win->lockers_)
      if (it == comm_->rank())
        locked = 1;
    if (locked != 1)
      return MPI_ERR_WIN;
  }

  XBT_DEBUG("Entering MPI_Compare_and_swap with %d", target_rank);
  MPI_Request req = MPI_REQUEST_NULL;
  send_win->atomic_mut_->lock();
  get(result_addr, 1, datatype, target_rank,
      target_disp, 1, datatype, &req);
  if (req != MPI_REQUEST_NULL)
    Request::wait(&req, MPI_STATUS_IGNORE);
  if (not memcmp(result_addr, compare_addr, datatype->get_extent())) {
    put(origin_addr, 1, datatype, target_rank,
        target_disp, 1, datatype);
  }
  send_win->atomic_mut_->unlock();
  return MPI_SUCCESS;
}
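
/* Usage sketch (illustrative only; `expected`, `desired`, `old`, `peer` and `win` are
 * hypothetical): the target value is fetched into result_addr, compared byte-wise with
 * compare_addr, and replaced by origin_addr only on a match:
 *
 *   int expected = 0, desired = 1, old;
 *   MPI_Compare_and_swap(&desired, &expected, &old, MPI_INT, peer, 0, win);
 */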
int Win::start(MPI_Group group, int /*assert*/)
{
  /* From the MPI forum's advice to implementors:
     The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
     will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
     the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
     matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
     MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
     implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
     to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
     called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
     origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
     must complete, without further dependencies. */
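
  /* Usage sketch (illustrative only; the groups and `win` are hypothetical) of the
   * generalized active target pattern that start/post/complete/wait implement:
   *
   *   // origin process:                      // target process:
   *   MPI_Win_start(target_group, 0, win);    MPI_Win_post(origin_group, 0, win);
   *   MPI_Put(...);                           // exposure epoch on the target
   *   MPI_Win_complete(win);                  MPI_Win_wait(win);
   */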
  // naive, blocking implementation
  int i             = 0;
  int j             = 0;
  int size          = group->size();
  MPI_Request* reqs = xbt_new0(MPI_Request, size);

  XBT_DEBUG("Entering MPI_Win_Start");
  while (j != size) {
    int src = comm_->group()->rank(group->actor(j));
    if (src != rank_ && src != MPI_UNDEFINED) { // TODO cheinrich: The check of MPI_UNDEFINED should be useless here
      reqs[i] = Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 4, comm_);
      i++;
    }
    j++;
  }
  size = i;
  Request::startall(size, reqs);
  Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
  for (i = 0; i < size; i++) {
    Request::unref(&reqs[i]);
  }
  xbt_free(reqs);
  opened_++; // we're open for business!
  group_ = group;
  group->ref();
  XBT_DEBUG("Leaving MPI_Win_Start");
  return MPI_SUCCESS;
}
int Win::post(MPI_Group group, int /*assert*/)
{
  // let's make a synchronous send here
  int i             = 0;
  int j             = 0;
  int size          = group->size();
  MPI_Request* reqs = xbt_new0(MPI_Request, size);

  XBT_DEBUG("Entering MPI_Win_Post");
  while (j != size) {
    int dst = comm_->group()->rank(group->actor(j));
    if (dst != rank_ && dst != MPI_UNDEFINED) {
      reqs[i] = Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 4, comm_);
      i++;
    }
    j++;
  }
  size = i;

  Request::startall(size, reqs);
  Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
  for (i = 0; i < size; i++) {
    Request::unref(&reqs[i]);
  }
  xbt_free(reqs);
  opened_++; // we're open for business!
  group_ = group;
  group->ref();
  XBT_DEBUG("Leaving MPI_Win_Post");
  return MPI_SUCCESS;
}
int Win::complete(){
  if (opened_ == 0)
    xbt_die("Complete called on an MPI_Win that was not opened with MPI_Win_start");

  XBT_DEBUG("Entering MPI_Win_Complete");
  int i             = 0;
  int j             = 0;
  int size          = group_->size();
  MPI_Request* reqs = xbt_new0(MPI_Request, size);

  while (j != size) {
    int dst = comm_->group()->rank(group_->actor(j));
    if (dst != rank_ && dst != MPI_UNDEFINED) {
      reqs[i] = Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 5, comm_);
      i++;
    }
    j++;
  }
  size = i;
  XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
  Request::startall(size, reqs);
  Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
  for (i = 0; i < size; i++) {
    Request::unref(&reqs[i]);
  }
  xbt_free(reqs);

  int finished = finish_comms();
  XBT_DEBUG("Win_complete - Finished %d RMA calls", finished);

  Group::unref(group_);
  opened_--; // we're closed for business!
  return MPI_SUCCESS;
}
int Win::wait(){
  // naive, blocking implementation
  XBT_DEBUG("Entering MPI_Win_Wait");
  int i             = 0;
  int j             = 0;
  int size          = group_->size();
  MPI_Request* reqs = xbt_new0(MPI_Request, size);

  while (j != size) {
    int src = comm_->group()->rank(group_->actor(j));
    if (src != rank_ && src != MPI_UNDEFINED) {
      reqs[i] = Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 5, comm_);
      i++;
    }
    j++;
  }
  size = i;
  XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
  Request::startall(size, reqs);
  Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
  for (i = 0; i < size; i++) {
    Request::unref(&reqs[i]);
  }
  xbt_free(reqs);
  int finished = finish_comms();
  XBT_DEBUG("Win_wait - Finished %d RMA calls", finished);

  Group::unref(group_);
  opened_--; // we're closed for business!
  return MPI_SUCCESS;
}
int Win::lock(int lock_type, int rank, int /*assert*/)
{
  MPI_Win target_win = connected_wins_[rank];

  if ((lock_type == MPI_LOCK_EXCLUSIVE && target_win->mode_ != MPI_LOCK_SHARED) || target_win->mode_ == MPI_LOCK_EXCLUSIVE) {
    target_win->lock_mut_->lock();
    target_win->mode_ += lock_type; // add the lock_type to differentiate the case where we are switching from
                                    // EXCLUSIVE to SHARED (no release needed in the unlock)
    if (lock_type == MPI_LOCK_SHARED) { // the window used to be exclusive; it's now shared
      target_win->lock_mut_->unlock();
    }
  } else if (not(target_win->mode_ == MPI_LOCK_SHARED && lock_type == MPI_LOCK_EXCLUSIVE))
    target_win->mode_ += lock_type; // don't set to exclusive if it's already shared

  target_win->lockers_.push_back(comm_->rank());

  int finished = finish_comms(rank);
  XBT_DEBUG("Win_lock %d - Finished %d RMA calls", rank, finished);
  finished = target_win->finish_comms(rank_);
  XBT_DEBUG("Win_lock target %d - Finished %d RMA calls", rank, finished);
  return MPI_SUCCESS;
}
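
/* Usage sketch (illustrative only; `val`, `peer` and `win` are hypothetical): passive
 * target synchronization, where lock(), the RMA calls and unlock() all happen on the
 * origin without the target calling into the library:
 *
 *   MPI_Win_lock(MPI_LOCK_EXCLUSIVE, peer, 0, win);
 *   MPI_Put(&val, 1, MPI_INT, peer, 0, 1, MPI_INT, win);
 *   MPI_Win_unlock(peer, win); // completes the put via finish_comms()
 */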
int Win::lock_all(int assert){
  int retval = MPI_SUCCESS;
  for (int i = 0; i < comm_->size(); i++) {
    int ret = this->lock(MPI_LOCK_SHARED, i, assert);
    if (ret != MPI_SUCCESS)
      retval = ret;
  }
  return retval;
}
int Win::unlock(int rank){
  MPI_Win target_win = connected_wins_[rank];
  int target_mode = target_win->mode_;
  target_win->mode_ = 0;
  target_win->lockers_.remove(comm_->rank());
  if (target_mode == MPI_LOCK_EXCLUSIVE) {
    target_win->lock_mut_->unlock();
  }

  int finished = finish_comms(rank);
  XBT_DEBUG("Win_unlock %d - Finished %d RMA calls", rank, finished);
  finished = target_win->finish_comms(rank_);
  XBT_DEBUG("Win_unlock target %d - Finished %d RMA calls", rank, finished);
  return MPI_SUCCESS;
}
int Win::unlock_all(){
  int retval = MPI_SUCCESS;
  for (int i = 0; i < comm_->size(); i++) {
    int ret = this->unlock(i);
    if (ret != MPI_SUCCESS)
      retval = ret;
  }
  return retval;
}
int Win::flush(int rank){
  MPI_Win target_win = connected_wins_[rank];
  int finished = finish_comms(rank_);
  XBT_DEBUG("Win_flush on local %d - Finished %d RMA calls", rank_, finished);
  finished = target_win->finish_comms(rank);
  XBT_DEBUG("Win_flush on remote %d - Finished %d RMA calls", rank, finished);
  return MPI_SUCCESS;
}

int Win::flush_local(int rank){
  int finished = finish_comms(rank);
  XBT_DEBUG("Win_flush_local for rank %d - Finished %d RMA calls", rank, finished);
  return MPI_SUCCESS;
}

int Win::flush_all(){
  int finished = finish_comms();
  XBT_DEBUG("Win_flush_all on local - Finished %d RMA calls", finished);
  for (int i = 0; i < comm_->size(); i++) {
    finished = connected_wins_[i]->finish_comms(rank_);
    XBT_DEBUG("Win_flush_all on %d - Finished %d RMA calls", i, finished);
  }
  return MPI_SUCCESS;
}

int Win::flush_local_all(){
  int finished = finish_comms();
  XBT_DEBUG("Win_flush_local_all - Finished %d RMA calls", finished);
  return MPI_SUCCESS;
}
Win* Win::f2c(int id){
  return static_cast<Win*>(F2C::f2c(id));
}
int Win::finish_comms(){
  mut_->lock();
  // Finish our own requests
  std::vector<MPI_Request>* reqqs = requests_;
  int size = static_cast<int>(reqqs->size());
  if (size > 0) {
    MPI_Request* treqs = &(*reqqs)[0];
    Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
    reqqs->clear();
  }
  mut_->unlock();
  return size;
}
int Win::finish_comms(int rank){
  mut_->lock();
  // Finish our own requests
  std::vector<MPI_Request>* reqqs = requests_;
  int size = static_cast<int>(reqqs->size());
  if (size > 0) {
    size = 0;
    std::vector<MPI_Request> myreqqs;
    std::vector<MPI_Request>::iterator iter = reqqs->begin();
    int proc_id = comm_->group()->actor(rank)->get_pid();
    while (iter != reqqs->end()) {
      // Let's see if we're either the destination or the sender of this request,
      // because we only wait for requests that we are responsible for.
      // Also use the process id here, since the request itself returns from src()
      // and dst() the process id, NOT the rank (which only exists in the context of a communicator).
      if (((*iter) != MPI_REQUEST_NULL) && (((*iter)->src() == proc_id) || ((*iter)->dst() == proc_id))) {
        myreqqs.push_back(*iter);
        iter = reqqs->erase(iter);
        size++;
      } else {
        ++iter;
      }
    }
    if (size > 0) {
      MPI_Request* treqs = &myreqqs[0];
      Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
      myreqqs.clear();
    }
  }
  mut_->unlock();
  return size;
}
int Win::shared_query(int rank, MPI_Aint* size, int* disp_unit, void* baseptr)
{
  MPI_Win target_win = rank != MPI_PROC_NULL ? connected_wins_[rank] : nullptr;
  for (int i = 0; not target_win && i < comm_->size(); i++) {
    if (connected_wins_[i]->size_ > 0)
      target_win = connected_wins_[i];
  }
  if (target_win) {
    *size                         = target_win->size_;
    *disp_unit                    = target_win->disp_unit_;
    *static_cast<void**>(baseptr) = target_win->base_;
  } else {
    *size                         = 0;
    *static_cast<void**>(baseptr) = xbt_malloc(0);
  }
  return MPI_SUCCESS;
}
MPI_Errhandler Win::errhandler()
{
  return errhandler_;
}

void Win::set_errhandler(MPI_Errhandler errhandler)
{
  errhandler_ = errhandler;
  if (errhandler_ != MPI_ERRHANDLER_NULL)
    errhandler_->ref();
}

} // namespace smpi
} // namespace simgrid