1 /* Copyright (c) 2007-2017. The SimGrid Team.
2 * All rights reserved. */
4 /* This program is free software; you can redistribute it and/or modify it
5 * under the terms of the license (GNU LGPL) which comes with this package. */
// Logging subcategory for SMPI one-sided (RMA) operations.
10 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");
// Class-wide storage for window attribute keyvals (MPI_Win_create_keyval)
// and the counter used to hand out fresh keyval ids.
14 std::unordered_map<int, smpi_key_elem> Win::keyvals_;
15 int Win::keyval_id_=0;
// Construct an MPI window over [base, base+size) on every rank of `comm`.
// `disp_unit` scales target displacements in RMA calls; `allocated`/`dynamic`
// record how the backing memory was obtained (e.g. MPI_Win_allocate /
// MPI_Win_create_dynamic paths).
17 Win::Win(void *base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm, int allocated, int dynamic): base_(base), size_(size), disp_unit_(disp_unit), assert_(0), info_(info), comm_(comm), allocated_(allocated), dynamic_(dynamic){
18 int comm_size = comm->size();
20 XBT_DEBUG("Creating window");
21 if(info!=MPI_INFO_NULL)
// NOTE(review): the statement controlled by this `if` is elided from this
// excerpt — presumably it takes a reference on `info`; confirm in full source.
25 group_ = MPI_GROUP_NULL;
// Vector of in-flight RMA requests targeting this window; guarded by mut_.
26 requests_ = new std::vector<MPI_Request>();
// Simulated mutexes: mut_ protects requests_, lock_mut_ implements
// MPI_Win_lock exclusion, atomic_mut_ serializes atomic RMA operations.
27 mut_=xbt_mutex_init();
28 lock_mut_=xbt_mutex_init();
29 atomic_mut_=xbt_mutex_init();
// One slot per rank so any process can reach any peer's window object directly.
30 connected_wins_ = new MPI_Win[comm_size];
31 connected_wins_[rank_] = this;
// Barrier shared by all ranks of the window, used by fence() and the destructor.
34 bar_ = MSG_barrier_init(comm_size);
38 comm->add_rma_win(this);
// Exchange each rank's window pointer, then share rank 0's barrier handle.
40 Colls::allgather(&(connected_wins_[rank_]), sizeof(MPI_Win), MPI_BYTE, connected_wins_, sizeof(MPI_Win),
43 Colls::bcast(&(bar_), sizeof(msg_bar_t), MPI_BYTE, 0, comm);
// Destructor body (the `Win::~Win()` signature is elided from this excerpt):
// drains all pending RMA communications, then releases every resource
// acquired in the constructor (connected_wins_, name, info, barrier, mutexes).
49 //As per the standard, perform a barrier to ensure every async comm is finished
50 MSG_barrier_wait(bar_);
52 int finished = finish_comms();
53 XBT_DEBUG("Win destructor - Finished %d RMA calls", finished);
56 delete[] connected_wins_;
57 if (name_ != nullptr){
// NOTE(review): the statement freeing name_ is elided from this excerpt.
60 if(info_!=MPI_INFO_NULL){
61 MPI_Info_free(&info_);
64 comm_->remove_rma_win(this);
// Synchronize before destroying the shared barrier so no rank is still using it.
66 Colls::barrier(comm_);
67 int rank=comm_->rank();
69 MSG_barrier_destroy(bar_);
70 xbt_mutex_destroy(mut_);
71 xbt_mutex_destroy(lock_mut_);
72 xbt_mutex_destroy(atomic_mut_);
// Attach a memory region to a dynamic window (MPI_Win_attach).
// Only valid while no region is attached yet: base_ must still be
// MPI_BOTTOM or 0.
80 int Win::attach (void *base, MPI_Aint size){
81 if (!(base_ == MPI_BOTTOM || base_ == 0))
// NOTE(review): the error-return controlled by this `if` is elided here.
83 base_=0;//actually the address will be given in the RMA calls, as being the disp.
// Detach a previously attached region from a dynamic window (MPI_Win_detach).
// NOTE(review): the function body is elided from this excerpt.
88 int Win::detach (void *base){
// Copy this window's name into `name` and its length into `length`
// (MPI_Win_get_name). Caller must provide a buffer of at least
// MPI_MAX_OBJECT_NAME characters per the MPI standard.
94 void Win::get_name(char* name, int* length){
100 *length = strlen(name_);
// +1 so the terminating NUL byte is copied as well.
101 strncpy(name, name_, *length+1);
// Return the group of processes attached to this window (MPI_Win_get_group):
// the group of the window's communicator, or MPI_GROUP_NULL if none.
104 void Win::get_group(MPI_Group* group){
105 if(comm_ != MPI_COMM_NULL){
106 *group = comm_->group();
// NOTE(review): the `else` keyword between the two branches is elided here.
108 *group = MPI_GROUP_NULL;
// Return the info object associated with this window (MPI_Win_get_info).
// NOTE(review): branch bodies and the return statement are elided from this
// excerpt — presumably a fresh info object is created when none exists yet.
112 MPI_Info Win::info(){
113 if(info_== MPI_INFO_NULL)
// Accessor: size in bytes of the window memory (body elided from this excerpt).
123 MPI_Aint Win::size(){
// Accessor: displacement unit used to scale target offsets in RMA calls
// (body elided from this excerpt).
131 int Win::disp_unit(){
// Replace this window's info object (MPI_Win_set_info).
139 void Win::set_info(MPI_Info info){
140 if(info_!= MPI_INFO_NULL)
// NOTE(review): the release of the old info and the assignment of the new one
// are elided from this excerpt.
// Set this window's name (MPI_Win_set_name); xbt_strdup gives the window its
// own copy of the string.
145 void Win::set_name(char* name){
146 name_ = xbt_strdup(name);
// Collective active-target synchronization (MPI_Win_fence): completes the
// preceding RMA epoch (unless MPI_MODE_NOPRECEDE says there was none) and
// opens a new one, with barriers separating the epochs across all ranks.
149 int Win::fence(int assert)
151 XBT_DEBUG("Entering fence");
154 if (assert != MPI_MODE_NOPRECEDE) {
155 // This is not the first fence => finalize what came before
156 MSG_barrier_wait(bar_);
157 xbt_mutex_acquire(mut_);
158 // This (simulated) mutex ensures that no process pushes to the vector of requests during the waitall.
159 // Without this, the vector could get redimensionned when another process pushes.
160 // This would result in the array used by Request::waitall() to be invalidated.
161 // Another solution would be to copy the data and cleanup the vector *before* Request::waitall
162 std::vector<MPI_Request> *reqs = requests_;
163 int size = static_cast<int>(reqs->size());
164 // start all requests that have been prepared by another process
166 MPI_Request* treqs = &(*reqs)[0];
167 Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
170 xbt_mutex_release(mut_);
173 if(assert==MPI_MODE_NOSUCCEED)//there should be no ops after this one, tell we are closed.
// NOTE(review): the statement controlled by this `if` (closing the epoch) is
// elided from this excerpt.
177 MSG_barrier_wait(bar_);
178 XBT_DEBUG("Leaving fence");
// MPI_Put: write `origin_count` elements from origin_addr into the target
// rank's window at target_disp (scaled by the target's disp_unit).
// Remote case builds a matched rma send/recv request pair; local case is a
// plain datatype copy.
183 int Win::put( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
184 MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype)
186 //get receiver pointer
187 MPI_Win recv_win = connected_wins_[target_rank];
// opened_ == 0 means no active-target epoch (post/start/fence) is open: the
// call is only legal if we hold a passive-target lock on the target window.
189 if(opened_==0){//check that post/start has been done
190 // no fence or start .. lock ok ?
192 for(auto it : recv_win->lockers_)
193 if (it == comm_->rank())
// NOTE(review): the lock-found/error handling after this check is elided here.
// Bounds check: the access must fit inside the target window.
199 if(target_count*target_datatype->get_extent()>recv_win->size_)
202 void* recv_addr = static_cast<void*> ( static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_);
203 XBT_DEBUG("Entering MPI_Put to %d", target_rank);
205 if(target_rank != comm_->rank()){
206 //prepare send_request
207 MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype, smpi_process()->index(),
208 comm_->group()->index(target_rank), SMPI_RMA_TAG+1, comm_, MPI_OP_NULL);
210 //prepare receiver request
211 MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, smpi_process()->index(),
212 comm_->group()->index(target_rank), SMPI_RMA_TAG+1, recv_win->comm_, MPI_OP_NULL);
216 //push request to receiver's win
217 xbt_mutex_acquire(recv_win->mut_);
218 recv_win->requests_->push_back(rreq);
220 xbt_mutex_release(recv_win->mut_);
221 //push request to sender's win
222 xbt_mutex_acquire(mut_);
223 requests_->push_back(sreq);
224 xbt_mutex_release(mut_);
// Local put: no communication needed, just copy between datatypes.
226 Datatype::copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
// MPI_Get: read `target_count` elements from the target rank's window at
// target_disp into origin_addr. Mirror image of put(): the target acts as
// sender, we act as receiver; local case is a plain datatype copy.
232 int Win::get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
233 MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype)
236 MPI_Win send_win = connected_wins_[target_rank];
// No active-target epoch open: require a passive-target lock on the target.
238 if(opened_==0){//check that post/start has been done
239 // no fence or start .. lock ok ?
241 for(auto it : send_win->lockers_)
242 if (it == comm_->rank())
// NOTE(review): the lock-found/error handling after this check is elided here.
// Bounds check against the target window size.
248 if(target_count*target_datatype->get_extent()>send_win->size_)
251 void* send_addr = static_cast<void*>(static_cast<char*>(send_win->base_) + target_disp * send_win->disp_unit_);
252 XBT_DEBUG("Entering MPI_Get from %d", target_rank);
254 if(target_rank != comm_->rank()){
255 //prepare send_request
256 MPI_Request sreq = Request::rma_send_init(send_addr, target_count, target_datatype,
257 comm_->group()->index(target_rank), smpi_process()->index(), SMPI_RMA_TAG+2, send_win->comm_,
260 //prepare receiver request
261 MPI_Request rreq = Request::rma_recv_init(origin_addr, origin_count, origin_datatype,
262 comm_->group()->index(target_rank), smpi_process()->index(), SMPI_RMA_TAG+2, comm_,
265 //start the send, with another process than us as sender.
267 //push request to receiver's win
268 xbt_mutex_acquire(send_win->mut_);
269 send_win->requests_->push_back(sreq);
270 xbt_mutex_release(send_win->mut_);
274 //push request to sender's win
275 xbt_mutex_acquire(mut_);
276 requests_->push_back(rreq);
277 xbt_mutex_release(mut_);
// Local get: copy directly out of our own window memory.
279 Datatype::copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
// MPI_Accumulate: combine origin data into the target window memory with
// reduction operation `op`. Uses one decreasing tag per call (SMPI_RMA_TAG-3-count_)
// so successive accumulates to the same target stay ordered.
286 int Win::accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
287 MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op)
290 //get receiver pointer
291 MPI_Win recv_win = connected_wins_[target_rank];
// No active-target epoch open: require a passive-target lock on the target.
293 if(opened_==0){//check that post/start has been done
294 // no fence or start .. lock ok ?
296 for(auto it : recv_win->lockers_)
297 if (it == comm_->rank())
// NOTE(review): the lock-found/error handling after this check is elided here.
302 //FIXME: local version
// Bounds check against the target window size.
304 if(target_count*target_datatype->get_extent()>recv_win->size_)
307 void* recv_addr = static_cast<void*>(static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_);
308 XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
309 //As the tag will be used for ordering of the operations, substract count from it (to avoid collisions with other SMPI tags, SMPI_RMA_TAG is set below all the other ones we use )
310 //prepare send_request
312 MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype,
313 smpi_process()->index(), comm_->group()->index(target_rank), SMPI_RMA_TAG-3-count_, comm_, op);
315 //prepare receiver request
316 MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype,
317 smpi_process()->index(), comm_->group()->index(target_rank), SMPI_RMA_TAG-3-count_, recv_win->comm_, op);
// NOTE(review): the increment of count_ appears to be elided from this excerpt.
323 //push request to receiver's win
324 xbt_mutex_acquire(recv_win->mut_);
325 recv_win->requests_->push_back(rreq);
327 xbt_mutex_release(recv_win->mut_);
328 //push request to sender's win
329 xbt_mutex_acquire(mut_);
330 requests_->push_back(sreq);
331 xbt_mutex_release(mut_);
// MPI_Get_accumulate: atomically fetch the current target value into
// result_addr, then accumulate origin data into the target with `op`.
// Atomicity is simulated by serializing under the target's atomic_mut_.
336 int Win::get_accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, void *result_addr,
337 int result_count, MPI_Datatype result_datatype, int target_rank, MPI_Aint target_disp, int target_count,
338 MPI_Datatype target_datatype, MPI_Op op){
341 MPI_Win send_win = connected_wins_[target_rank];
// No active-target epoch open: require a passive-target lock on the target.
343 if(opened_==0){//check that post/start has been done
344 // no fence or start .. lock ok ?
346 for(auto it : send_win->lockers_)
347 if (it == comm_->rank())
// NOTE(review): the lock-found/error handling after this check is elided here.
353 if(target_count*target_datatype->get_extent()>send_win->size_)
356 XBT_DEBUG("Entering MPI_Get_accumulate from %d", target_rank);
357 //need to be sure ops are correctly ordered, so finish request here ? slow.
// NOTE(review): the declaration of `req` is elided from this excerpt.
359 xbt_mutex_acquire(send_win->atomic_mut_);
360 get(result_addr, result_count, result_datatype, target_rank,
361 target_disp, target_count, target_datatype, &req);
362 if (req != MPI_REQUEST_NULL)
363 Request::wait(&req, MPI_STATUS_IGNORE);
365 accumulate(origin_addr, origin_count, origin_datatype, target_rank,
366 target_disp, target_count, target_datatype, op, &req);
367 if (req != MPI_REQUEST_NULL)
368 Request::wait(&req, MPI_STATUS_IGNORE);
369 xbt_mutex_release(send_win->atomic_mut_);
// MPI_Compare_and_swap: fetch the target value into result_addr; if it equals
// *compare_addr (bytewise memcmp over the datatype extent), write
// *origin_addr to the target. Serialized under the target's atomic_mut_.
374 int Win::compare_and_swap(void *origin_addr, void *compare_addr,
375 void *result_addr, MPI_Datatype datatype, int target_rank,
376 MPI_Aint target_disp){
378 MPI_Win send_win = connected_wins_[target_rank];
// No active-target epoch open: require a passive-target lock on the target.
380 if(opened_==0){//check that post/start has been done
381 // no fence or start .. lock ok ?
383 for(auto it : send_win->lockers_)
384 if (it == comm_->rank())
// NOTE(review): the lock-found/error handling after this check is elided here.
390 XBT_DEBUG("Entering MPI_Compare_and_swap with %d", target_rank);
// NOTE(review): the declaration of `req` is elided from this excerpt.
392 xbt_mutex_acquire(send_win->atomic_mut_);
393 get(result_addr, 1, datatype, target_rank,
394 target_disp, 1, datatype, &req);
395 if (req != MPI_REQUEST_NULL)
396 Request::wait(&req, MPI_STATUS_IGNORE);
397 if(! memcmp (result_addr, compare_addr, datatype->get_extent() )){
398 put(origin_addr, 1, datatype, target_rank,
399 target_disp, 1, datatype);
401 xbt_mutex_release(send_win->atomic_mut_);
// MPI_Win_start: open an access epoch towards `group`. This naive blocking
// implementation waits for a zero-byte sync message (tag SMPI_RMA_TAG+4)
// from every member of the group, i.e. until each target has called post().
405 int Win::start(MPI_Group group, int assert){
406 /* From MPI forum advices
407 The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
408 will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
409 the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
410 matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
411 MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
412 implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
413 to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
414 called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
415 origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
416 must complete, without further dependencies. */
418 //naive, blocking implementation.
421 int size = group->size();
422 MPI_Request* reqs = xbt_new0(MPI_Request, size);
// NOTE(review): the loop headers declaring i and j are elided from this excerpt.
425 int src = group->index(j);
// Skip ourselves and ranks not present in the group.
426 if (src != smpi_process()->index() && src != MPI_UNDEFINED) {
427 reqs[i] = Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 4, MPI_COMM_WORLD);
433 Request::startall(size, reqs);
434 Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
436 Request::unref(&reqs[i]);
439 opened_++; //we're open for business !
// MPI_Win_post: open an exposure epoch towards `group` by sending a
// zero-byte sync message (tag SMPI_RMA_TAG+4) to every group member,
// unblocking their matching start() calls.
445 int Win::post(MPI_Group group, int assert){
446 //let's make a synchronous send here
449 int size = group->size();
450 MPI_Request* reqs = xbt_new0(MPI_Request, size);
// NOTE(review): the loop headers declaring i and j are elided from this excerpt.
453 int dst=group->index(j);
// Skip ourselves and ranks not present in the group.
454 if(dst!=smpi_process()->index() && dst!=MPI_UNDEFINED){
455 reqs[i]=Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG+4, MPI_COMM_WORLD);
462 Request::startall(size, reqs);
463 Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
465 Request::unref(&reqs[i]);
468 opened_++; //we're open for business !
// Body of Win::complete() (MPI_Win_complete; the signature is elided from
// this excerpt): notify every member of the access group with a sync message
// (tag SMPI_RMA_TAG+5), finish our pending RMA requests, and close the epoch.
476 xbt_die("Complete called on already opened MPI_Win");
478 XBT_DEBUG("Entering MPI_Win_Complete");
481 int size = group_->size();
482 MPI_Request* reqs = xbt_new0(MPI_Request, size);
// NOTE(review): the loop headers declaring i and j are elided from this excerpt.
485 int dst=group_->index(j);
486 if(dst!=smpi_process()->index() && dst!=MPI_UNDEFINED){
487 reqs[i]=Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG+5, MPI_COMM_WORLD);
493 XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
494 Request::startall(size, reqs);
495 Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
498 Request::unref(&reqs[i]);
// All sync messages sent: drain our own pending RMA requests.
502 int finished = finish_comms();
503 XBT_DEBUG("Win_complete - Finished %d RMA calls", finished);
505 Group::unref(group_);
506 opened_--; //we're closed for business !
// Body of Win::wait() (MPI_Win_wait; the signature is elided from this
// excerpt): block until a sync message (tag SMPI_RMA_TAG+5) arrives from
// every member of the exposure group — i.e. all origins called complete() —
// then finish pending RMA requests and close the epoch.
511 //naive, blocking implementation.
512 XBT_DEBUG("Entering MPI_Win_Wait");
515 int size = group_->size();
516 MPI_Request* reqs = xbt_new0(MPI_Request, size);
// NOTE(review): the loop headers declaring i and j are elided from this excerpt.
519 int src=group_->index(j);
520 if(src!=smpi_process()->index() && src!=MPI_UNDEFINED){
521 reqs[i]=Request::irecv_init(nullptr, 0, MPI_CHAR, src,SMPI_RMA_TAG+5, MPI_COMM_WORLD);
527 XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
528 Request::startall(size, reqs);
529 Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
531 Request::unref(&reqs[i]);
// All origins are done: drain our own pending RMA requests.
534 int finished = finish_comms();
535 XBT_DEBUG("Win_wait - Finished %d RMA calls", finished);
537 Group::unref(group_);
538 opened_--; //we're opened for business !
// MPI_Win_lock: acquire a passive-target lock on `rank`'s window.
// An exclusive request — or any request while the target is exclusively
// held — blocks on the target's lock_mut_; mode_ accumulates lock types so
// unlock() can tell whether a release of lock_mut_ is needed.
542 int Win::lock(int lock_type, int rank, int assert){
546 MPI_Win target_win = connected_wins_[rank];
548 if ((lock_type == MPI_LOCK_EXCLUSIVE && target_win->mode_ != MPI_LOCK_SHARED)|| target_win->mode_ == MPI_LOCK_EXCLUSIVE){
549 xbt_mutex_acquire(target_win->lock_mut_);
550 target_win->mode_+= lock_type;//add the lock_type to differentiate case when we are switching from EXCLUSIVE to SHARED (no release needed in the unlock)
551 if(lock_type == MPI_LOCK_SHARED){//the window used to be exclusive, it's now shared.
552 xbt_mutex_release(target_win->lock_mut_);
554 } else if(!(target_win->mode_==MPI_LOCK_SHARED && lock_type == MPI_LOCK_EXCLUSIVE))
555 target_win->mode_+= lock_type; // don't set to exclusive if it's already shared
// Record ourselves as a locker so RMA calls outside an epoch pass the check.
557 target_win->lockers_.push_back(comm_->rank());
// Complete our own pending RMA requests before returning.
559 int finished = finish_comms();
560 XBT_DEBUG("Win_lock %d - Finished %d RMA calls", rank, finished);
// MPI_Win_lock_all: take a shared lock on every rank's window in turn,
// returning the first non-success code encountered.
565 int Win::lock_all(int assert){
// NOTE(review): the declaration of `i` is elided from this excerpt.
567 int retval = MPI_SUCCESS;
568 for (i=0; i<comm_->size();i++){
569 int ret = this->lock(MPI_LOCK_SHARED, i, assert);
570 if(ret != MPI_SUCCESS)
// NOTE(review): the statement recording `ret` into `retval` is elided here.
// MPI_Win_unlock: release our passive-target lock on `rank`'s window —
// reset its mode, remove ourselves from its lockers list, release lock_mut_
// if the lock was exclusive, and complete our pending RMA requests.
576 int Win::unlock(int rank){
580 MPI_Win target_win = connected_wins_[rank];
581 int target_mode = target_win->mode_;
582 target_win->mode_= 0;
583 target_win->lockers_.remove(comm_->rank());
584 if (target_mode==MPI_LOCK_EXCLUSIVE){
585 xbt_mutex_release(target_win->lock_mut_);
588 int finished = finish_comms();
589 XBT_DEBUG("Win_unlock %d - Finished %d RMA calls", rank, finished);
// MPI_Win_unlock_all: release the lock on every rank's window in turn,
// returning the first non-success code encountered.
594 int Win::unlock_all(){
// NOTE(review): the declaration of `i` is elided from this excerpt.
596 int retval = MPI_SUCCESS;
597 for (i=0; i<comm_->size();i++){
598 int ret = this->unlock(i);
599 if(ret != MPI_SUCCESS)
// NOTE(review): the statement recording `ret` into `retval` is elided here.
// MPI_Win_flush: complete all outstanding RMA operations between us and
// `rank` — both the requests we hold that involve `rank` and the requests
// the target window holds that involve us.
605 int Win::flush(int rank){
606 MPI_Win target_win = connected_wins_[rank];
607 int finished = finish_comms(rank);
608 XBT_DEBUG("Win_flush on local %d - Finished %d RMA calls", rank_, finished);
609 finished = target_win->finish_comms(rank_);
610 XBT_DEBUG("Win_flush on remote %d - Finished %d RMA calls", rank, finished);
// MPI_Win_flush_local: complete, at the origin side only, our pending RMA
// requests that involve `rank`.
614 int Win::flush_local(int rank){
615 int finished = finish_comms(rank);
616 XBT_DEBUG("Win_flush_local for rank %d - Finished %d RMA calls", rank, finished);
// MPI_Win_flush_all: complete all of our own pending RMA requests, then, for
// every rank, the requests its window holds that involve us.
620 int Win::flush_all(){
// NOTE(review): the declarations of `i` and `finished` are elided from this excerpt.
623 finished = finish_comms();
624 XBT_DEBUG("Win_flush_all on local - Finished %d RMA calls", finished);
625 for (i=0; i<comm_->size();i++){
626 finished = connected_wins_[i]->finish_comms(rank_);
627 XBT_DEBUG("Win_flush_all on %d - Finished %d RMA calls", i, finished);
// MPI_Win_flush_local_all: complete, at the origin side, every pending RMA
// request registered on this window.
632 int Win::flush_local_all(){
633 int finished = finish_comms();
634 XBT_DEBUG("Win_flush_local_all - Finished %d RMA calls", finished);
// Translate a Fortran integer handle back into its C++ Win object (the
// F2C registry stores the mapping).
638 Win* Win::f2c(int id){
639 return static_cast<Win*>(F2C::f2c(id));
// Wait for every pending RMA request registered on this window and return
// how many were completed. mut_ is held across the waitall so no peer can
// push into requests_ and invalidate the underlying array (see fence()).
643 int Win::finish_comms(){
644 xbt_mutex_acquire(mut_);
645 //Finish own requests
646 std::vector<MPI_Request> *reqqs = requests_;
647 int size = static_cast<int>(reqqs->size());
// NOTE(review): a size>0 guard and the post-waitall cleanup of reqqs appear
// to be elided from this excerpt.
649 MPI_Request* treqs = &(*reqqs)[0];
650 Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
653 xbt_mutex_release(mut_);
// Rank-filtered variant: complete only the pending requests whose source or
// destination is `rank` (used by the flush family). Matching requests are
// moved out of requests_ into a temporary vector before waiting on them,
// all under mut_ for the same reason as finish_comms().
657 int Win::finish_comms(int rank){
658 xbt_mutex_acquire(mut_);
659 //Finish own requests
660 std::vector<MPI_Request> *reqqs = requests_;
661 int size = static_cast<int>(reqqs->size());
// Partition: requests involving `rank` are spliced into myreqqs.
664 std::vector<MPI_Request>* myreqqs = new std::vector<MPI_Request>();
665 std::vector<MPI_Request>::iterator iter = reqqs->begin();
666 while (iter != reqqs->end()){
667 if(((*iter)->src() == rank) || ((*iter)->dst() == rank)){
668 myreqqs->push_back(*iter);
669 iter = reqqs->erase(iter);
// NOTE(review): the else branch advancing `iter`, the recomputation of `size`
// from myreqqs, and the cleanup/delete of myreqqs are elided from this excerpt.
676 MPI_Request* treqs = &(*myreqqs)[0];
677 Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
682 xbt_mutex_release(mut_);