/* Copyright (c) 2007-2019. The SimGrid Team. All rights reserved. */

/* This program is free software; you can redistribute it and/or modify it
 * under the terms of the license (GNU LGPL) which comes with this package. */

#include "smpi_win.hpp"

#include "smpi_coll.hpp"
#include "smpi_comm.hpp"
#include "smpi_datatype.hpp"
#include "smpi_info.hpp"
#include "smpi_keyvals.hpp"
#include "smpi_request.hpp"
#include "src/smpi/include/smpi_actor.hpp"

XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");

using simgrid::s4u::Actor;

namespace simgrid{
namespace smpi{

std::unordered_map<int, smpi_key_elem> Win::keyvals_;
int Win::keyval_id_=0;

Win::Win(void *base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm, int allocated, int dynamic)
    : base_(base), size_(size), disp_unit_(disp_unit), assert_(0), info_(info), comm_(comm), allocated_(allocated), dynamic_(dynamic)
{
  int comm_size = comm->size();
  rank_         = comm->rank();
  XBT_DEBUG("Creating window");
  if(info!=MPI_INFO_NULL)
    info->ref();
  name_                  = nullptr;
  opened_                = 0;
  group_                 = MPI_GROUP_NULL;
  requests_              = new std::vector<MPI_Request>();
  mut_                   = s4u::Mutex::create();
  lock_mut_              = s4u::Mutex::create();
  atomic_mut_            = s4u::Mutex::create();
  connected_wins_        = new MPI_Win[comm_size];
  connected_wins_[rank_] = this;
  count_                 = 0;
  mode_                  = 0;
  if(rank_==0){
    bar_ = new simgrid::s4u::Barrier(comm_size);
  }

  comm->add_rma_win(this);
  comm->ref();

  Colls::allgather(&(connected_wins_[rank_]), sizeof(MPI_Win), MPI_BYTE, connected_wins_, sizeof(MPI_Win),
                   MPI_BYTE, comm);

  Colls::bcast(&(bar_), sizeof(simgrid::s4u::Barrier*), MPI_BYTE, 0, comm);

  Colls::barrier(comm);
}

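/* Note on the bootstrap above: since every simulated rank lives inside the simulator's
 * address space, the allgather can legitimately exchange raw Win* pointers. Afterwards,
 * connected_wins_[r] points directly at rank r's window object, and the RMA calls below
 * simply dereference it instead of shipping window metadata over the (simulated) network.
 * The single Barrier allocated by rank 0 is shared the same way through the bcast. */
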
Win::~Win(){
  //As per the standard, perform a barrier to ensure every async comm is finished
  bar_->wait();

  int finished = finish_comms();
  XBT_DEBUG("Win destructor - Finished %d RMA calls", finished);

  delete requests_;
  delete[] connected_wins_;
  if (name_ != nullptr){
    xbt_free(name_);
  }
  if(info_!=MPI_INFO_NULL){
    MPI_Info_free(&info_);
  }

  comm_->remove_rma_win(this);

  Colls::barrier(comm_);
  Comm::unref(comm_);
  if (rank_ == 0)
    delete bar_;
  cleanup_attr<Win>();
}

int Win::attach (void *base, MPI_Aint size){
  if (not(base_ == MPI_BOTTOM || base_ == 0))
    return MPI_ERR_ARG;
  base_=0; // the actual target address will be given in each RMA call, as the displacement
  size_+=size;
  return MPI_SUCCESS;
}

int Win::detach (void *base){
  base_=MPI_BOTTOM;
  size_=-1;
  return MPI_SUCCESS;
}

void Win::get_name(char* name, int* length){
  if(name_==nullptr){
    *length=0;
    name=nullptr;
    return;
  }
  *length = strlen(name_);
  strncpy(name, name_, *length+1);
}

void Win::get_group(MPI_Group* group){
  if(comm_ != MPI_COMM_NULL){
    *group = comm_->group();
  } else {
    *group = MPI_GROUP_NULL;
  }
}

MPI_Info Win::info(){
  if(info_== MPI_INFO_NULL)
    info_ = new Info();
  info_->ref();
  return info_;
}

MPI_Aint Win::size(){
  return size_;
}

int Win::disp_unit(){
  return disp_unit_;
}

void Win::set_info(MPI_Info info){
  if(info_!= MPI_INFO_NULL)
    info_->unref();
  info_=info;
  if(info_!= MPI_INFO_NULL)
    info_->ref();
}

void Win::set_name(char* name){
  name_ = xbt_strdup(name);
}

int Win::fence(int assert)
{
  XBT_DEBUG("Entering fence");
  if (opened_ == 0)
    opened_=1;
  if (assert != MPI_MODE_NOPRECEDE) {
    // This is not the first fence => finalize what came before
    bar_->wait();
    mut_->lock();
    // This (simulated) mutex ensures that no process pushes to the vector of requests during the waitall.
    // Without this, the vector could get resized while another process pushes,
    // which would invalidate the array used by Request::waitall().
    // Another solution would be to copy the data and clean up the vector *before* Request::waitall
    std::vector<MPI_Request> *reqs = requests_;
    int size = static_cast<int>(reqs->size());
    // start all requests that have been prepared by another process
    if (size > 0) {
      MPI_Request* treqs = &(*reqs)[0];
      Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
    }
    count_=0;
    mut_->unlock();
  }

  if(assert==MPI_MODE_NOSUCCEED)//no operation may follow this fence, so mark the window as closed
    opened_=0;
  assert_ = assert;

  bar_->wait();
  XBT_DEBUG("Leaving fence");

  return MPI_SUCCESS;
}

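/* Design note: a fence thus closes the previous epoch with a barrier plus a waitall on the
 * requests that peers parked in our queue, and opens the next one with a second barrier.
 * Both sides of every pending RMA transfer are therefore done by the time fence() returns. */
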
int Win::put( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
              MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
{
  //get receiver pointer
  MPI_Win recv_win = connected_wins_[target_rank];

  if(opened_==0){//check that post/start has been done
    // no fence or start .. lock ok ?
    int locked=0;
    for (auto const& it : recv_win->lockers_)
      if (it == comm_->rank())
        locked = 1;
    if(locked != 1)
      return MPI_ERR_WIN;
  }

  if(target_count*target_datatype->get_extent()>recv_win->size_)
    return MPI_ERR_ARG;

  void* recv_addr = static_cast<void*> ( static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_);

  if (target_rank != comm_->rank()) { // This is not for myself, so we need to send messages
    XBT_DEBUG("Entering MPI_Put to remote rank %d", target_rank);
    // prepare send_request
    MPI_Request sreq =
        // TODO cheinrich Check for rank / pid conversion
        Request::rma_send_init(origin_addr, origin_count, origin_datatype, comm_->rank(), target_rank, SMPI_RMA_TAG + 1,
                               comm_, MPI_OP_NULL);

    //prepare receiver request
    // TODO cheinrich Check for rank / pid conversion
    MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, recv_win->comm_->rank(),
                                              target_rank, SMPI_RMA_TAG + 1, recv_win->comm_, MPI_OP_NULL);

    //start send
    sreq->start();

    if(request!=nullptr){
      *request=sreq;
    }else{
      mut_->lock();
      requests_->push_back(sreq);
      mut_->unlock();
    }

    //push request to receiver's win
    recv_win->mut_->lock();
    recv_win->requests_->push_back(rreq);
    rreq->start();
    recv_win->mut_->unlock();
  }else{
    XBT_DEBUG("Entering MPI_Put from myself to myself, rank %d", target_rank);
    Datatype::copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
    if(request!=nullptr)
      *request = MPI_REQUEST_NULL;
  }

  return MPI_SUCCESS;
}

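/* Design note: a put is modeled as an ordinary send/recv pair on SMPI_RMA_TAG + 1. The origin
 * starts and keeps the send request; the matching receive is started here too, but parked in
 * the *target's* request queue, so the target's next fence/complete/unlock will wait for it.
 * The self-to-self case degenerates into a plain Datatype::copy. */
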
int Win::get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
              MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
{
  //get sender pointer
  MPI_Win send_win = connected_wins_[target_rank];

  if(opened_==0){//check that post/start has been done
    // no fence or start .. lock ok ?
    int locked=0;
    for (auto const& it : send_win->lockers_)
      if (it == comm_->rank())
        locked = 1;
    if(locked != 1)
      return MPI_ERR_WIN;
  }

  if(target_count*target_datatype->get_extent()>send_win->size_)
    return MPI_ERR_ARG;

  void* send_addr = static_cast<void*>(static_cast<char*>(send_win->base_) + target_disp * send_win->disp_unit_);
  XBT_DEBUG("Entering MPI_Get from %d", target_rank);

  if(target_rank != comm_->rank()){
    //prepare send_request
    MPI_Request sreq = Request::rma_send_init(send_addr, target_count, target_datatype, target_rank,
                                              send_win->comm_->rank(), SMPI_RMA_TAG + 2, send_win->comm_, MPI_OP_NULL);

    //prepare receiver request
    MPI_Request rreq = Request::rma_recv_init(
        origin_addr, origin_count, origin_datatype, target_rank,
        comm_->rank(), // TODO cheinrich Check here if comm_->rank() and above send_win->comm_->rank() are correct
        SMPI_RMA_TAG + 2, comm_, MPI_OP_NULL);

    //start the send, with another process than us as sender.
    sreq->start();
    //push request to receiver's win
    send_win->mut_->lock();
    send_win->requests_->push_back(sreq);
    send_win->mut_->unlock();

    //start recv
    rreq->start();

    if(request!=nullptr){
      *request=rreq;
    }else{
      mut_->lock();
      requests_->push_back(rreq);
      mut_->unlock();
    }
  }else{
    Datatype::copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
    if(request!=nullptr)
      *request=MPI_REQUEST_NULL;
  }

  return MPI_SUCCESS;
}

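/* Symmetric to put(): for a get the *target* is the sender, so the send request lands in the
 * target's queue while we keep (or hand back through *request) the receive on the origin side. */
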
int Win::accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
              MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request* request)
{
  XBT_DEBUG("Entering MPI_Win_Accumulate");
  //get receiver pointer
  MPI_Win recv_win = connected_wins_[target_rank];

  if(opened_==0){//check that post/start has been done
    // no fence or start .. lock ok ?
    int locked=0;
    for (auto const& it : recv_win->lockers_)
      if (it == comm_->rank())
        locked = 1;
    if(locked != 1)
      return MPI_ERR_WIN;
  }
  //FIXME: local version

  if(target_count*target_datatype->get_extent()>recv_win->size_)
    return MPI_ERR_ARG;

  void* recv_addr = static_cast<void*>(static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_);
  XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
  // As the tag will be used for ordering the operations, subtract count_ from it
  // (SMPI_RMA_TAG is set below all the other tags we use, to avoid collisions)
  //prepare send_request

  MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype, comm_->rank(), target_rank,
                                            SMPI_RMA_TAG - 3 - count_, comm_, op);

  // prepare receiver request
  MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, recv_win->comm_->rank(),
                                            recv_win->comm_->group()->rank(comm_->group()->actor(target_rank)),
                                            SMPI_RMA_TAG - 3 - count_, recv_win->comm_, op);

  count_++;

  // start send
  sreq->start();
  // push request to receiver's win
  recv_win->mut_->lock();
  recv_win->requests_->push_back(rreq);
  rreq->start();
  recv_win->mut_->unlock();

  if (request != nullptr) {
    *request = sreq;
  } else {
    mut_->lock();
    requests_->push_back(sreq);
    mut_->unlock();
  }

  XBT_DEBUG("Leaving MPI_Win_Accumulate");
  return MPI_SUCCESS;
}

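/* Each accumulate thus gets its own decreasing tag (SMPI_RMA_TAG - 3 - count_): successive
 * accumulates to the same target match distinct receives and can be applied in issue order,
 * as MPI requires for accumulates (unlike put/get, which may complete in any order). */
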
int Win::get_accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, void *result_addr,
                         int result_count, MPI_Datatype result_datatype, int target_rank, MPI_Aint target_disp, int target_count,
                         MPI_Datatype target_datatype, MPI_Op op, MPI_Request* request){
  //get sender pointer
  MPI_Win send_win = connected_wins_[target_rank];

  if(opened_==0){//check that post/start has been done
    // no fence or start .. lock ok ?
    int locked=0;
    for (auto const& it : send_win->lockers_)
      if (it == comm_->rank())
        locked = 1;
    if(locked != 1)
      return MPI_ERR_WIN;
  }

  if(target_count*target_datatype->get_extent()>send_win->size_)
    return MPI_ERR_ARG;

  XBT_DEBUG("Entering MPI_Get_accumulate from %d", target_rank);
  // the ops need to be correctly ordered, so we finish each request here; this is slow.
  MPI_Request req = MPI_REQUEST_NULL;
  send_win->atomic_mut_->lock();
  get(result_addr, result_count, result_datatype, target_rank,
      target_disp, target_count, target_datatype, &req);
  if (req != MPI_REQUEST_NULL)
    Request::wait(&req, MPI_STATUS_IGNORE);
  if(op!=MPI_NO_OP)
    accumulate(origin_addr, origin_count, origin_datatype, target_rank,
               target_disp, target_count, target_datatype, op, &req);
  if (req != MPI_REQUEST_NULL)
    Request::wait(&req, MPI_STATUS_IGNORE);
  send_win->atomic_mut_->unlock();
  return MPI_SUCCESS;
}

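/* Note: atomicity is emulated with the target window's atomic_mut_: the embedded get and
 * accumulate are each completed while the mutex is held, so concurrent get_accumulate and
 * compare_and_swap calls on the same window cannot interleave. */
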
int Win::compare_and_swap(void *origin_addr, void *compare_addr,
                          void *result_addr, MPI_Datatype datatype, int target_rank,
                          MPI_Aint target_disp){
  //get sender pointer
  MPI_Win send_win = connected_wins_[target_rank];

  if(opened_==0){//check that post/start has been done
    // no fence or start .. lock ok ?
    int locked=0;
    for (auto const& it : send_win->lockers_)
      if (it == comm_->rank())
        locked = 1;
    if(locked != 1)
      return MPI_ERR_WIN;
  }

  XBT_DEBUG("Entering MPI_Compare_and_swap with %d", target_rank);
  MPI_Request req = MPI_REQUEST_NULL;
  send_win->atomic_mut_->lock();
  get(result_addr, 1, datatype, target_rank,
      target_disp, 1, datatype, &req);
  if (req != MPI_REQUEST_NULL)
    Request::wait(&req, MPI_STATUS_IGNORE);
  if (not memcmp(result_addr, compare_addr, datatype->get_extent())) {
    put(origin_addr, 1, datatype, target_rank,
        target_disp, 1, datatype);
  }
  send_win->atomic_mut_->unlock();
  return MPI_SUCCESS;
}

int Win::start(MPI_Group group, int assert){
  /* From the MPI forum's advice to implementors:
     The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
     will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
     the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
     matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
     MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
     implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
     to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
     called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
     origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
     must complete, without further dependencies. */

  //naive, blocking implementation.
  int i             = 0;
  int j             = 0;
  int size          = group->size();
  MPI_Request* reqs = xbt_new0(MPI_Request, size);

  XBT_DEBUG("Entering MPI_Win_Start");
  while (j != size) {
    int src = comm_->group()->rank(group->actor(j));
    if (src != rank_ && src != MPI_UNDEFINED) { // TODO cheinrich: The check of MPI_UNDEFINED should be useless here
      reqs[i] = Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 4, comm_);
      i++;
    }
    j++;
  }
  size = i;
  Request::startall(size, reqs);
  Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
  for (i = 0; i < size; i++) {
    Request::unref(&reqs[i]);
  }
  xbt_free(reqs);
  opened_++; //we're open for business !
  group_ = group;
  group->ref();
  XBT_DEBUG("Leaving MPI_Win_Start");
  return MPI_SUCCESS;
}

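/* Usage reminder (plain MPI, for orientation): start/complete delimit the access epoch on the
 * origin, while post/wait delimit the exposure epoch on the target:
 *   origin:  MPI_Win_start(grp, 0, win);  MPI_Put(...);  MPI_Win_complete(win);
 *   target:  MPI_Win_post(grp, 0, win);   ...            MPI_Win_wait(win);
 * This implementation takes the simplest legal option from the forum text above: start()
 * blocks until every target in the group has posted (one SMPI_RMA_TAG + 4 token each). */
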
int Win::post(MPI_Group group, int assert){
  //let's make a synchronous send here
  int i             = 0;
  int j             = 0;
  int size          = group->size();
  MPI_Request* reqs = xbt_new0(MPI_Request, size);

  XBT_DEBUG("Entering MPI_Win_Post");
  while(j!=size){
    int dst = comm_->group()->rank(group->actor(j));
    if (dst != rank_ && dst != MPI_UNDEFINED) {
      reqs[i] = Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 4, comm_);
      i++;
    }
    j++;
  }
  size = i;

  Request::startall(size, reqs);
  Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
  for (i = 0; i < size; i++) {
    Request::unref(&reqs[i]);
  }
  xbt_free(reqs);
  opened_++; //we're open for business !
  group_ = group;
  group->ref();
  XBT_DEBUG("Leaving MPI_Win_Post");
  return MPI_SUCCESS;
}

int Win::complete(){
  if(opened_==0)
    xbt_die("Complete called on a MPI_Win that was not opened");

  XBT_DEBUG("Entering MPI_Win_Complete");
  int i             = 0;
  int j             = 0;
  int size          = group_->size();
  MPI_Request* reqs = xbt_new0(MPI_Request, size);

  while(j!=size){
    int dst = comm_->group()->rank(group_->actor(j));
    if (dst != rank_ && dst != MPI_UNDEFINED) {
      reqs[i] = Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 5, comm_);
      i++;
    }
    j++;
  }
  size = i;
  XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
  Request::startall(size, reqs);
  Request::waitall(size, reqs, MPI_STATUSES_IGNORE);

  for (i = 0; i < size; i++) {
    Request::unref(&reqs[i]);
  }
  xbt_free(reqs);

  int finished = finish_comms();
  XBT_DEBUG("Win_complete - Finished %d RMA calls", finished);

  Group::unref(group_);
  opened_--; //we're closed for business !
  return MPI_SUCCESS;
}

int Win::wait(){
  //naive, blocking implementation.
  XBT_DEBUG("Entering MPI_Win_Wait");
  int i             = 0;
  int j             = 0;
  int size          = group_->size();
  MPI_Request* reqs = xbt_new0(MPI_Request, size);

  while(j!=size){
    int src = comm_->group()->rank(group_->actor(j));
    if (src != rank_ && src != MPI_UNDEFINED) {
      reqs[i] = Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 5, comm_);
      i++;
    }
    j++;
  }
  size = i;
  XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
  Request::startall(size, reqs);
  Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
  for (i = 0; i < size; i++) {
    Request::unref(&reqs[i]);
  }
  xbt_free(reqs);
  int finished = finish_comms();
  XBT_DEBUG("Win_wait - Finished %d RMA calls", finished);

  Group::unref(group_);
  opened_--; //we're closed for business !
  return MPI_SUCCESS;
}

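/* complete() and wait() pair up through the SMPI_RMA_TAG + 5 tokens: each origin sends one to
 * every target of its epoch, and each target's wait() blocks until they all arrived; both
 * sides then drain their pending RMA requests before closing the epoch. */
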
int Win::lock(int lock_type, int rank, int assert){
  MPI_Win target_win = connected_wins_[rank];

  if ((lock_type == MPI_LOCK_EXCLUSIVE && target_win->mode_ != MPI_LOCK_SHARED)|| target_win->mode_ == MPI_LOCK_EXCLUSIVE){
    target_win->lock_mut_->lock();
    // add the lock_type to differentiate the case where we are switching from EXCLUSIVE to SHARED (no release needed in the unlock)
    target_win->mode_ += lock_type;
    if(lock_type == MPI_LOCK_SHARED){ // the window used to be exclusive, it's now shared.
      target_win->lock_mut_->unlock();
    }
  } else if (not(target_win->mode_ == MPI_LOCK_SHARED && lock_type == MPI_LOCK_EXCLUSIVE))
    target_win->mode_ += lock_type; // don't set to exclusive if it's already shared

  target_win->lockers_.push_back(comm_->rank());

  int finished = finish_comms(rank);
  XBT_DEBUG("Win_lock %d - Finished %d RMA calls", rank, finished);
  finished = target_win->finish_comms(rank_);
  XBT_DEBUG("Win_lock target %d - Finished %d RMA calls", rank, finished);
  return MPI_SUCCESS;
}

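/* Design note: passive-target locks are mapped onto one s4u::Mutex per window. An exclusive
 * lock holds lock_mut_ until the matching unlock; a shared lock only grabs it long enough to
 * be sure no exclusive holder is active. mode_ records which kind of epoch is currently open,
 * so that unlock() knows whether lock_mut_ actually needs to be released. */
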
int Win::lock_all(int assert){
  int i=0;
  int retval = MPI_SUCCESS;
  for (i=0; i<comm_->size();i++){
    int ret = this->lock(MPI_LOCK_SHARED, i, assert);
    if(ret != MPI_SUCCESS)
      retval = ret;
  }
  return retval;
}

int Win::unlock(int rank){
  MPI_Win target_win = connected_wins_[rank];
  int target_mode = target_win->mode_;
  target_win->mode_= 0;
  target_win->lockers_.remove(comm_->rank());
  if (target_mode==MPI_LOCK_EXCLUSIVE){
    target_win->lock_mut_->unlock();
  }

  int finished = finish_comms(rank);
  XBT_DEBUG("Win_unlock %d - Finished %d RMA calls", rank, finished);
  finished = target_win->finish_comms(rank_);
  XBT_DEBUG("Win_unlock target %d - Finished %d RMA calls", rank, finished);
  return MPI_SUCCESS;
}

int Win::unlock_all(){
  int i=0;
  int retval = MPI_SUCCESS;
  for (i=0; i<comm_->size();i++){
    int ret = this->unlock(i);
    if (ret != MPI_SUCCESS)
      retval = ret;
  }
  return retval;
}

int Win::flush(int rank){
  MPI_Win target_win = connected_wins_[rank];
  int finished = finish_comms(rank_);
  XBT_DEBUG("Win_flush on local %d - Finished %d RMA calls", rank_, finished);
  finished = target_win->finish_comms(rank);
  XBT_DEBUG("Win_flush on remote %d - Finished %d RMA calls", rank, finished);
  return MPI_SUCCESS;
}

int Win::flush_local(int rank){
  int finished = finish_comms(rank);
  XBT_DEBUG("Win_flush_local for rank %d - Finished %d RMA calls", rank, finished);
  return MPI_SUCCESS;
}

int Win::flush_all(){
  int finished = finish_comms();
  XBT_DEBUG("Win_flush_all on local - Finished %d RMA calls", finished);
  for (int i = 0; i < comm_->size(); i++) {
    finished = connected_wins_[i]->finish_comms(rank_);
    XBT_DEBUG("Win_flush_all on %d - Finished %d RMA calls", i, finished);
  }
  return MPI_SUCCESS;
}

int Win::flush_local_all(){
  int finished = finish_comms();
  XBT_DEBUG("Win_flush_local_all - Finished %d RMA calls", finished);
  return MPI_SUCCESS;
}

Win* Win::f2c(int id){
  return static_cast<Win*>(F2C::f2c(id));
}

int Win::finish_comms(){
  mut_->lock();
  //Finish own requests
  std::vector<MPI_Request> *reqqs = requests_;
  int size = static_cast<int>(reqqs->size());
  if (size > 0) {
    MPI_Request* treqs = &(*reqqs)[0];
    Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
    reqqs->clear();
  }
  mut_->unlock();
  return size;
}

int Win::finish_comms(int rank){
  mut_->lock();
  //Finish own requests
  std::vector<MPI_Request> *reqqs = requests_;
  int size = static_cast<int>(reqqs->size());
  if (size > 0) {
    size = 0;
    std::vector<MPI_Request> myreqqs;
    std::vector<MPI_Request>::iterator iter = reqqs->begin();
    int proc_id = comm_->group()->actor(rank)->get_pid();
    while (iter != reqqs->end()){
      // Let's see if we're either the destination or the sender of this request,
      // because we only wait for requests that we are responsible for.
      // Also use the process id here, since src() and dst() on the request return
      // the process id, NOT the rank (which only exists in the context of a communicator).
      if (((*iter) != MPI_REQUEST_NULL) && (((*iter)->src() == proc_id) || ((*iter)->dst() == proc_id))) {
        myreqqs.push_back(*iter);
        iter = reqqs->erase(iter);
        size++;
      } else {
        ++iter;
      }
    }
    if (size > 0) {
      MPI_Request* treqs = &myreqqs[0];
      Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
      myreqqs.clear();
    }
  }
  mut_->unlock();
  return size;
}

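/* finish_comms(rank) is the workhorse behind lock/unlock/flush: it only drains the pending
 * requests that involve the given peer, whereas the zero-argument overload above drains the
 * whole queue (used by complete(), wait(), the flush_*all variants and the destructor). */
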
int Win::shared_query(int rank, MPI_Aint* size, int* disp_unit, void* baseptr)
{
  MPI_Win target_win = rank != MPI_PROC_NULL ? connected_wins_[rank] : nullptr;
  for (int i = 0; not target_win && i < comm_->size(); i++) {
    if (connected_wins_[i]->size_ > 0)
      target_win = connected_wins_[i];
  }
  if (target_win) {
    *size                         = target_win->size_;
    *disp_unit                    = target_win->disp_unit_;
    *static_cast<void**>(baseptr) = target_win->base_;
  } else {
    *size                         = 0;
    *static_cast<void**>(baseptr) = xbt_malloc(0);
  }
  return MPI_SUCCESS;
}

} // namespace smpi
} // namespace simgrid
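
/* Minimal usage sketch (standard MPI calls, shown for orientation; buf, src, n and peer are
 * placeholders). This is how an application exercises the methods above through SMPI:
 *   MPI_Win win;
 *   MPI_Win_create(buf, size, 1, MPI_INFO_NULL, MPI_COMM_WORLD, &win); // -> Win::Win
 *   MPI_Win_fence(0, win);                                             // -> Win::fence
 *   MPI_Put(src, n, MPI_INT, peer, 0, n, MPI_INT, win);                // -> Win::put
 *   MPI_Win_fence(0, win);                                             // completes the epoch
 *   MPI_Win_free(&win);                                                // -> Win::~Win
 */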