1 /* Copyright (c) 2007-2019. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
6 #include "smpi_win.hpp"
9 #include "smpi_coll.hpp"
10 #include "smpi_comm.hpp"
11 #include "smpi_datatype.hpp"
12 #include "smpi_info.hpp"
13 #include "smpi_keyvals.hpp"
14 #include "smpi_request.hpp"
15 #include "src/smpi/include/smpi_actor.hpp"
17 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");
// Static storage for window attribute keyvals (MPI_Win_create_keyval & co.)
// and the monotonically increasing id handed out for the next keyval.
22 std::unordered_map<int, smpi_key_elem> Win::keyvals_;
23 int Win::keyval_id_=0;
// Window constructor.
// Records the local RMA segment (base/size/disp_unit, plus ownership flags)
// and wires this window to all peers of 'comm':
//  - connected_wins_[i] will hold rank i's Win* after the allgather below,
//  - a single s4u::Barrier (the one created on rank 0) is shared by bcast.
// Also registers the window with the communicator via add_rma_win().
// NOTE(review): several initializer-list entries and statements are elided
// in this excerpt; comments only describe what is visible.
25 Win::Win(void* base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm, int allocated, int dynamic)
28 , disp_unit_(disp_unit)
32 , allocated_(allocated)
35 XBT_DEBUG("Creating window");
36 if(info!=MPI_INFO_NULL)
38 int comm_size = comm->size();
41 group_ = MPI_GROUP_NULL;
// Per-window request vector plus the three mutexes guarding it:
// mut_ for requests_, lock_mut_ for MPI_Win_lock, atomic_mut_ for the
// read-modify-write operations (accumulate/CAS ordering).
42 requests_ = new std::vector<MPI_Request>();
43 mut_ = s4u::Mutex::create();
44 lock_mut_ = s4u::Mutex::create();
45 atomic_mut_ = s4u::Mutex::create();
46 connected_wins_ = new MPI_Win[comm_size];
47 connected_wins_[rank_] = this;
50 bar_ = new s4u::Barrier(comm_size);
53 errhandler_=MPI_ERRORS_ARE_FATAL;
55 comm->add_rma_win(this);
// Exchange the local Win* so every rank can reach every peer's window
// directly, then make everyone use rank 0's barrier object.
58 colls::allgather(&(connected_wins_[rank_]), sizeof(MPI_Win), MPI_BYTE, connected_wins_, sizeof(MPI_Win), MPI_BYTE,
61 colls::bcast(&(bar_), sizeof(s4u::Barrier*), MPI_BYTE, 0, comm);
// Destructor body (the ~Win() header is elided in this excerpt).
// Drains every pending asynchronous RMA request, frees per-rank resources,
// unregisters from the communicator and barriers so that no peer is still
// referencing this window when it disappears.
67 //As per the standard, perform a barrier to ensure every async comm is finished
70 int finished = finish_comms();
71 XBT_DEBUG("Win destructor - Finished %d RMA calls", finished);
74 delete[] connected_wins_;
75 if (name_ != nullptr){
// Drop our references on the attached Info and Errhandler objects, if any.
78 if (info_ != MPI_INFO_NULL)
79 simgrid::smpi::Info::unref(info_);
80 if (errhandler_ != MPI_ERRHANDLER_NULL)
81 simgrid::smpi::Errhandler::unref(errhandler_);
83 comm_->remove_rma_win(this);
// Collective: all ranks must reach this barrier before teardown completes.
85 colls::barrier(comm_);
97 int Win::attach(void* /*base*/, MPI_Aint size)
// Only dynamic windows (base_ == MPI_BOTTOM or 0) accept attach; for them
// the actual target address travels with each RMA call as the displacement.
99 if (not(base_ == MPI_BOTTOM || base_ == 0))
101 base_=0;//actually the address will be given in the RMA calls, as being the disp.
// Counterpart of attach() for dynamic windows; body elided in this excerpt.
106 int Win::detach(const void* /*base*/)
113 void Win::get_name(char* name, int* length){
// Copy the stored window name and its length to the caller's buffers.
// (The name_ == nullptr early-out is elided in this excerpt.)
119 *length = strlen(name_);
// *length+1 so the terminating NUL is copied as well.
120 strncpy(name, name_, *length+1);
// Return the group of the communicator this window was created over,
// or MPI_GROUP_NULL if the communicator is gone.
123 void Win::get_group(MPI_Group* group){
124 if(comm_ != MPI_COMM_NULL){
125 *group = comm_->group();
127 *group = MPI_GROUP_NULL;
// Fragments of three simple accessors (bodies mostly elided in this excerpt):
// the info() lazy-creation check, size() and disp_unit() getters.
133 if (info_ == MPI_INFO_NULL)
143 MPI_Aint Win::size(){
151 int Win::disp_unit(){
159 void Win::set_info(MPI_Info info)
// Replace the window's info object: drop the reference on the old one,
// then (in elided lines) adopt and ref the new one when it is non-null.
161 if (info_ != MPI_INFO_NULL)
162 simgrid::smpi::Info::unref(info_);
164 if (info_ != MPI_INFO_NULL)
// Store a private copy of the window name (freed in the destructor).
168 void Win::set_name(const char* name){
169 name_ = xbt_strdup(name);
// MPI_Win_fence: active-target synchronization. Unless the caller asserts
// MPI_MODE_NOPRECEDE (first fence of an epoch), wait for all requests that
// were queued on this window since the previous fence.
172 int Win::fence(int assert)
174 XBT_DEBUG("Entering fence");
177 if (assert != MPI_MODE_NOPRECEDE) {
178 // This is not the first fence => finalize what came before
181 // This (simulated) mutex ensures that no process pushes to the vector of requests during the waitall.
182 // Without this, the vector could get redimensioned when another process pushes.
183 // This would result in the array used by Request::waitall() to be invalidated.
184 // Another solution would be to copy the data and cleanup the vector *before* Request::waitall
185 std::vector<MPI_Request> *reqs = requests_;
186 int size = static_cast<int>(reqs->size());
187 // start all requests that have been prepared by another process
// waitall consumes the vector's backing array directly.
189 MPI_Request* treqs = &(*reqs)[0];
190 Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
196 if(assert==MPI_MODE_NOSUCCEED)//there should be no ops after this one, tell we are closed.
201 XBT_DEBUG("Leaving fence");
// MPI_Put implementation: copy 'origin' data into the target rank's window.
// Remote case is modeled as a matched rma_send/rma_recv pair on tag
// SMPI_RMA_TAG+1; the local case degenerates to a Datatype::copy.
// When 'request' is non-null the origin-side request is returned to the
// caller (rput), otherwise it is queued on this window for the next sync.
206 int Win::put(const void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
207 MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
209 //get receiver pointer
210 MPI_Win recv_win = connected_wins_[target_rank];
212 if(opened_==0){//check that post/start has been done
213 // no fence or start .. lock ok ?
// Passive target: accept the op if we currently hold a lock on the target.
215 for (auto const& it : recv_win->lockers_)
216 if (it == comm_->rank())
// Bounds check: the transfer must fit inside the target window.
222 if(target_count*target_datatype->get_extent()>recv_win->size_)
// Dynamic/static windows alike: target address = base + disp * disp_unit.
225 void* recv_addr = static_cast<void*> ( static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_);
227 if (target_rank != comm_->rank()) { // This is not for myself, so we need to send messages
228 XBT_DEBUG("Entering MPI_Put to remote rank %d", target_rank);
229 // prepare send_request
231 // TODO cheinrich Check for rank / pid conversion
232 Request::rma_send_init(origin_addr, origin_count, origin_datatype, comm_->rank(), target_rank, SMPI_RMA_TAG + 1,
235 //prepare receiver request
236 // TODO cheinrich Check for rank / pid conversion
237 MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, recv_win->comm_->rank(),
238 target_rank, SMPI_RMA_TAG + 1, recv_win->comm_, MPI_OP_NULL);
243 if(request!=nullptr){
// No caller-visible request: keep the send queued on this window so fence/
// complete/unlock can wait on it.
247 requests_->push_back(sreq);
251 //push request to receiver's win
// mut_ protects the receiver's request vector against concurrent pushes.
252 recv_win->mut_->lock();
253 recv_win->requests_->push_back(rreq);
255 recv_win->mut_->unlock();
257 XBT_DEBUG("Entering MPI_Put from myself to myself, rank %d", target_rank);
// Local put: plain typed memory copy, no messaging needed.
258 Datatype::copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
260 *request = MPI_REQUEST_NULL;
// MPI_Get implementation: fetch data from the target rank's window into
// 'origin_addr'. Mirrors put(): remote case is a rma_send posted on the
// target's window plus a rma_recv on ours (tag SMPI_RMA_TAG+2); the local
// case is a direct Datatype::copy.
266 int Win::get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
267 MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
270 MPI_Win send_win = connected_wins_[target_rank];
272 if(opened_==0){//check that post/start has been done
273 // no fence or start .. lock ok ?
// Passive target: accept if we hold a lock on the target window.
275 for (auto const& it : send_win->lockers_)
276 if (it == comm_->rank())
// Bounds check against the target window size.
282 if(target_count*target_datatype->get_extent()>send_win->size_)
285 void* send_addr = static_cast<void*>(static_cast<char*>(send_win->base_) + target_disp * send_win->disp_unit_);
286 XBT_DEBUG("Entering MPI_Get from %d", target_rank);
288 if(target_rank != comm_->rank()){
289 //prepare send_request
290 MPI_Request sreq = Request::rma_send_init(send_addr, target_count, target_datatype, target_rank,
291 send_win->comm_->rank(), SMPI_RMA_TAG + 2, send_win->comm_, MPI_OP_NULL);
293 //prepare receiver request
294 MPI_Request rreq = Request::rma_recv_init(
295 origin_addr, origin_count, origin_datatype, target_rank,
296 comm_->rank(), // TODO cheinrich Check here if comm_->rank() and above send_win->comm_->rank() are correct
297 SMPI_RMA_TAG + 2, comm_, MPI_OP_NULL);
299 //start the send, with another process than us as sender.
301 //push request to receiver's win
// The send request belongs to the target's window; guard its vector.
302 send_win->mut_->lock();
303 send_win->requests_->push_back(sreq);
304 send_win->mut_->unlock();
309 if(request!=nullptr){
// No caller-visible request: queue the recv on this window for later sync.
313 requests_->push_back(rreq);
// Local get: plain typed memory copy.
317 Datatype::copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
319 *request=MPI_REQUEST_NULL;
// MPI_Accumulate implementation: like put() but the receive side applies
// reduction operator 'op' instead of overwriting. Each accumulate gets a
// distinct tag (SMPI_RMA_TAG - 3 - count_) so successive operations on the
// same target stay ordered and do not match each other.
324 int Win::accumulate(const void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
325 MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request* request)
327 XBT_DEBUG("Entering MPI_Win_Accumulate");
328 //get receiver pointer
329 MPI_Win recv_win = connected_wins_[target_rank];
331 if(opened_==0){//check that post/start has been done
332 // no fence or start .. lock ok ?
334 for (auto const& it : recv_win->lockers_)
335 if (it == comm_->rank())
340 //FIXME: local version
// Bounds check against the target window size.
342 if(target_count*target_datatype->get_extent()>recv_win->size_)
345 void* recv_addr = static_cast<void*>(static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_);
346 XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
347 // As the tag will be used for ordering of the operations, subtract count from it (to avoid collisions with other
348 // SMPI tags, SMPI_RMA_TAG is set below all the other ones we use)
349 // prepare send_request
351 MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype, comm_->rank(), target_rank,
352 SMPI_RMA_TAG - 3 - count_, comm_, op);
354 // prepare receiver request
355 MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, recv_win->comm_->rank(),
356 recv_win->comm_->group()->rank(comm_->group()->actor(target_rank)), SMPI_RMA_TAG - 3 - count_, recv_win->comm_, op);
362 // push request to receiver's win
363 recv_win->mut_->lock();
364 recv_win->requests_->push_back(rreq);
366 recv_win->mut_->unlock();
368 if (request != nullptr) {
// No caller-visible request: queue the send for the next synchronization.
372 requests_->push_back(sreq);
376 XBT_DEBUG("Leaving MPI_Win_Accumulate");
// MPI_Get_accumulate implementation: fetch the target value into
// 'result_addr', then accumulate 'origin_addr' into the target, both under
// the target's atomic_mut_ so the fetch+update pair is atomic with respect
// to other get_accumulate/compare_and_swap calls on that window.
380 int Win::get_accumulate(const void* origin_addr, int origin_count, MPI_Datatype origin_datatype, void* result_addr,
381 int result_count, MPI_Datatype result_datatype, int target_rank, MPI_Aint target_disp,
382 int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request*)
385 MPI_Win send_win = connected_wins_[target_rank];
387 if(opened_==0){//check that post/start has been done
388 // no fence or start .. lock ok ?
390 for (auto const& it : send_win->lockers_)
391 if (it == comm_->rank())
// Bounds check against the target window size.
397 if(target_count*target_datatype->get_extent()>send_win->size_)
400 XBT_DEBUG("Entering MPI_Get_accumulate from %d", target_rank);
401 //need to be sure ops are correctly ordered, so finish request here ? slow.
403 send_win->atomic_mut_->lock();
// Step 1: read the current target value synchronously.
404 get(result_addr, result_count, result_datatype, target_rank,
405 target_disp, target_count, target_datatype, &req);
406 if (req != MPI_REQUEST_NULL)
407 Request::wait(&req, MPI_STATUS_IGNORE);
// Step 2: apply the accumulate, also waited on before releasing the mutex.
409 accumulate(origin_addr, origin_count, origin_datatype, target_rank,
410 target_disp, target_count, target_datatype, op, &req);
411 if (req != MPI_REQUEST_NULL)
412 Request::wait(&req, MPI_STATUS_IGNORE);
413 send_win->atomic_mut_->unlock();
// MPI_Compare_and_swap implementation: atomically (under the target's
// atomic_mut_) read one element into 'result_addr'; if it equals
// '*compare_addr' byte-for-byte, write '*origin_addr' back to the target.
417 int Win::compare_and_swap(const void *origin_addr, void *compare_addr,
418 void *result_addr, MPI_Datatype datatype, int target_rank,
419 MPI_Aint target_disp){
421 MPI_Win send_win = connected_wins_[target_rank];
423 if(opened_==0){//check that post/start has been done
424 // no fence or start .. lock ok ?
426 for (auto const& it : send_win->lockers_)
427 if (it == comm_->rank())
433 XBT_DEBUG("Entering MPI_Compare_and_swap with %d", target_rank);
434 MPI_Request req = MPI_REQUEST_NULL;
435 send_win->atomic_mut_->lock();
// Synchronous read of the current target element.
436 get(result_addr, 1, datatype, target_rank,
437 target_disp, 1, datatype, &req);
438 if (req != MPI_REQUEST_NULL)
439 Request::wait(&req, MPI_STATUS_IGNORE);
// Raw byte comparison over the datatype's extent decides the swap.
440 if (not memcmp(result_addr, compare_addr, datatype->get_extent())) {
441 put(origin_addr, 1, datatype, target_rank,
442 target_disp, 1, datatype);
444 send_win->atomic_mut_->unlock();
// MPI_Win_start: open an access epoch towards 'group'. Implemented naively
// and blocking: wait for one zero-byte sync message (tag SMPI_RMA_TAG+4)
// from every member of the group, i.e. until each target called post().
448 int Win::start(MPI_Group group, int /*assert*/)
450 /* From MPI forum advices
451 The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
452 will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
453 the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
454 matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
455 MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
456 implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
457 to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
458 called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
459 origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
460 must complete, without further dependencies. */
462 //naive, blocking implementation.
465 int size = group->size();
466 MPI_Request* reqs = xbt_new0(MPI_Request, size);
468 XBT_DEBUG("Entering MPI_Win_Start");
// Translate each group member into a rank of comm_ and post a recv for its
// "post" notification; self and unknown ranks are skipped.
470 int src = comm_->group()->rank(group->actor(j));
471 if (src != rank_ && src != MPI_UNDEFINED) { // TODO cheinrich: The check of MPI_UNDEFINED should be useless here
472 reqs[i] = Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 4, comm_);
478 Request::startall(size, reqs);
479 Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
480 for (i = 0; i < size; i++) {
481 Request::unref(&reqs[i]);
// opened_ > 0 marks the window as inside an active epoch (checked by the
// RMA calls above).
484 opened_++; //we're open for business !
487 XBT_DEBUG("Leaving MPI_Win_Start");
// MPI_Win_post: open an exposure epoch for 'group'. Sends one zero-byte
// sync message (tag SMPI_RMA_TAG+4) to each group member so their matching
// start() can proceed.
491 int Win::post(MPI_Group group, int /*assert*/)
493 //let's make a synchronous send here
496 int size = group->size();
497 MPI_Request* reqs = xbt_new0(MPI_Request, size);
499 XBT_DEBUG("Entering MPI_Win_Post");
// Notify each group member (translated to a comm_ rank); skip self.
501 int dst = comm_->group()->rank(group->actor(j));
502 if (dst != rank_ && dst != MPI_UNDEFINED) {
503 reqs[i] = Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 4, comm_);
510 Request::startall(size, reqs);
511 Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
513 Request::unref(&reqs[i]);
516 opened_++; //we're open for business !
519 XBT_DEBUG("Leaving MPI_Win_Post");
// Body of MPI_Win_complete (function header elided in this excerpt).
// Closes the access epoch opened by start(): notifies every member of
// group_ with a zero-byte message on tag SMPI_RMA_TAG+5, drains all pending
// RMA requests, then releases the group and decrements opened_.
525 xbt_die("Complete called on already opened MPI_Win");
527 XBT_DEBUG("Entering MPI_Win_Complete");
530 int size = group_->size();
531 MPI_Request* reqs = xbt_new0(MPI_Request, size);
// One completion message per epoch partner (self skipped).
534 int dst = comm_->group()->rank(group_->actor(j));
535 if (dst != rank_ && dst != MPI_UNDEFINED) {
536 reqs[i] = Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 5, comm_);
542 XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
543 Request::startall(size, reqs);
544 Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
547 Request::unref(&reqs[i]);
// All operations of this epoch must be finished before the epoch closes.
551 int finished = finish_comms();
552 XBT_DEBUG("Win_complete - Finished %d RMA calls", finished);
554 Group::unref(group_);
555 opened_--; //we're closed for business !
// Body of MPI_Win_wait (function header elided in this excerpt).
// Closes the exposure epoch opened by post(): waits for the completion
// message (tag SMPI_RMA_TAG+5) from every member of group_, drains pending
// RMA requests, then releases the group and decrements opened_.
560 //naive, blocking implementation.
561 XBT_DEBUG("Entering MPI_Win_Wait");
564 int size = group_->size();
565 MPI_Request* reqs = xbt_new0(MPI_Request, size);
// One recv per epoch partner (self skipped).
568 int src = comm_->group()->rank(group_->actor(j));
569 if (src != rank_ && src != MPI_UNDEFINED) {
570 reqs[i] = Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 5, comm_);
576 XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
577 Request::startall(size, reqs);
578 Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
580 Request::unref(&reqs[i]);
583 int finished = finish_comms();
584 XBT_DEBUG("Win_wait - Finished %d RMA calls", finished);
586 Group::unref(group_);
587 opened_--; //we're opened for business !
// MPI_Win_lock: passive-target lock on the window of 'rank'.
// mode_ tracks the lock state as a sum of MPI_LOCK_* values; the real
// mutual exclusion is target_win->lock_mut_, which stays held only for
// exclusive locks (shared lockers release it immediately after acquiring).
591 int Win::lock(int lock_type, int rank, int /*assert*/)
593 MPI_Win target_win = connected_wins_[rank];
// Block if we want exclusivity, or if someone already holds it exclusively.
595 if ((lock_type == MPI_LOCK_EXCLUSIVE && target_win->mode_ != MPI_LOCK_SHARED)|| target_win->mode_ == MPI_LOCK_EXCLUSIVE){
596 target_win->lock_mut_->lock();
597 target_win->mode_+= lock_type;//add the lock_type to differentiate case when we are switching from EXCLUSIVE to SHARED (no release needed in the unlock)
598 if(lock_type == MPI_LOCK_SHARED){//the window used to be exclusive, it's now shared.
599 target_win->lock_mut_->unlock();
601 } else if (not(target_win->mode_ == MPI_LOCK_SHARED && lock_type == MPI_LOCK_EXCLUSIVE))
602 target_win->mode_ += lock_type; // don't set to exclusive if it's already shared
// Record ourselves as a locker so put/get/accumulate accept our ops.
604 target_win->lockers_.push_back(comm_->rank());
// Flush both directions so previously queued ops do not leak into the epoch.
606 int finished = finish_comms(rank);
607 XBT_DEBUG("Win_lock %d - Finished %d RMA calls", rank, finished);
608 finished = target_win->finish_comms(rank_);
609 XBT_DEBUG("Win_lock target %d - Finished %d RMA calls", rank, finished);
// MPI_Win_lock_all: take a shared lock on every rank's window in turn,
// returning the first failure encountered (if any).
613 int Win::lock_all(int assert){
615 int retval = MPI_SUCCESS;
616 for (i=0; i<comm_->size();i++){
617 int ret = this->lock(MPI_LOCK_SHARED, i, assert);
618 if(ret != MPI_SUCCESS)
// MPI_Win_unlock: release our lock on 'rank''s window. The lock_mut_ is
// only released when the previous mode was exclusive (shared lockers never
// kept it held — see lock()). Finishes pending comms in both directions.
624 int Win::unlock(int rank){
625 MPI_Win target_win = connected_wins_[rank];
626 int target_mode = target_win->mode_;
627 target_win->mode_= 0;
628 target_win->lockers_.remove(comm_->rank());
629 if (target_mode==MPI_LOCK_EXCLUSIVE){
630 target_win->lock_mut_->unlock();
633 int finished = finish_comms(rank);
634 XBT_DEBUG("Win_unlock %d - Finished %d RMA calls", rank, finished);
635 finished = target_win->finish_comms(rank_);
636 XBT_DEBUG("Win_unlock target %d - Finished %d RMA calls", rank, finished);
// MPI_Win_unlock_all: release the locks taken by lock_all(), one per rank,
// returning the first failure encountered (if any).
640 int Win::unlock_all(){
642 int retval = MPI_SUCCESS;
643 for (i=0; i<comm_->size();i++){
644 int ret = this->unlock(i);
645 if (ret != MPI_SUCCESS)
// MPI_Win_flush: complete all outstanding RMA operations between us and
// 'rank' — both the ones queued locally and the ones queued on the target.
651 int Win::flush(int rank){
652 MPI_Win target_win = connected_wins_[rank];
653 int finished = finish_comms(rank_);
654 XBT_DEBUG("Win_flush on local %d - Finished %d RMA calls", rank_, finished);
655 finished = target_win->finish_comms(rank);
656 XBT_DEBUG("Win_flush on remote %d - Finished %d RMA calls", rank, finished);
// MPI_Win_flush_local: complete only the locally queued operations that
// involve 'rank'; the target side is left untouched.
660 int Win::flush_local(int rank){
661 int finished = finish_comms(rank);
662 XBT_DEBUG("Win_flush_local for rank %d - Finished %d RMA calls", rank, finished);
// MPI_Win_flush_all: complete every outstanding operation of ours, on the
// local window and on every peer's window.
666 int Win::flush_all(){
667 int finished = finish_comms();
668 XBT_DEBUG("Win_flush_all on local - Finished %d RMA calls", finished);
669 for (int i = 0; i < comm_->size(); i++) {
670 finished = connected_wins_[i]->finish_comms(rank_);
671 XBT_DEBUG("Win_flush_all on %d - Finished %d RMA calls", i, finished);
// MPI_Win_flush_local_all: complete every locally queued operation,
// regardless of target rank.
676 int Win::flush_local_all(){
677 int finished = finish_comms();
678 XBT_DEBUG("Win_flush_local_all - Finished %d RMA calls", finished);
// Fortran-to-C handle translation: resolve the integer id through the F2C
// registry and downcast to Win.
682 Win* Win::f2c(int id){
683 return static_cast<Win*>(F2C::f2c(id));
// Wait on every request queued on this window and return how many were
// completed. (Locking and vector cleanup lines are elided in this excerpt.)
686 int Win::finish_comms(){
688 //Finish own requests
689 std::vector<MPI_Request> *reqqs = requests_;
690 int size = static_cast<int>(reqqs->size());
692 MPI_Request* treqs = &(*reqqs)[0];
693 Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
// Rank-filtered variant of finish_comms(): wait only on the queued requests
// whose source or destination is 'rank', leaving the rest queued. Returns
// the number of requests completed.
700 int Win::finish_comms(int rank){
702 //Finish own requests
703 std::vector<MPI_Request> *reqqs = requests_;
704 int size = static_cast<int>(reqqs->size());
// Move the matching requests into a private vector so waitall() operates
// on a stable array while others may keep pushing to requests_.
707 std::vector<MPI_Request> myreqqs;
708 std::vector<MPI_Request>::iterator iter = reqqs->begin();
709 int proc_id = comm_->group()->actor(rank)->get_pid();
710 while (iter != reqqs->end()){
711 // Let's see if we're either the destination or the sender of this request
712 // because we only wait for requests that we are responsible for.
713 // Also use the process id here since the request itself returns from src()
714 // and dst() the process id, NOT the rank (which only exists in the context of a communicator).
715 if (((*iter) != MPI_REQUEST_NULL) && (((*iter)->src() == proc_id) || ((*iter)->dst() == proc_id))) {
716 myreqqs.push_back(*iter);
717 iter = reqqs->erase(iter);
724 MPI_Request* treqs = &myreqqs[0];
725 Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
// MPI_Win_shared_query: report size/disp_unit/base of 'rank''s window.
// With MPI_PROC_NULL, fall back to the first connected window with a
// non-zero size; when nothing matches, baseptr is set to nullptr.
733 int Win::shared_query(int rank, MPI_Aint* size, int* disp_unit, void* baseptr)
735 MPI_Win target_win = rank != MPI_PROC_NULL ? connected_wins_[rank] : nullptr;
736 for (int i = 0; not target_win && i < comm_->size(); i++) {
737 if (connected_wins_[i]->size_ > 0)
738 target_win = connected_wins_[i];
741 *size = target_win->size_;
742 *disp_unit = target_win->disp_unit_;
// baseptr is really a void** in disguise, per the MPI binding.
743 *static_cast<void**>(baseptr) = target_win->base_;
746 *static_cast<void**>(baseptr) = nullptr;
// Accessor for the window's error handler (reference management and return
// statements are elided in this excerpt).
751 MPI_Errhandler Win::errhandler()
753 if (errhandler_ != MPI_ERRHANDLER_NULL)
// Replace the window's error handler: unref the previous one, store the new
// one, and (in the elided line) take a reference on it when non-null.
758 void Win::set_errhandler(MPI_Errhandler errhandler)
760 if (errhandler_ != MPI_ERRHANDLER_NULL)
761 simgrid::smpi::Errhandler::unref(errhandler_);
762 errhandler_ = errhandler;
763 if (errhandler_ != MPI_ERRHANDLER_NULL)
767 } // namespace simgrid