/* Copyright (c) 2007-2017. The SimGrid Team.
 * All rights reserved. */

/* This program is free software; you can redistribute it and/or modify it
 * under the terms of the license (GNU LGPL) which comes with this package. */
XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");

std::unordered_map<int, smpi_key_elem> Win::keyvals_;
int Win::keyval_id_ = 0;
Win::Win(void *base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm, int allocated, int dynamic)
    : base_(base), size_(size), disp_unit_(disp_unit), assert_(0), info_(info), comm_(comm), allocated_(allocated),
      dynamic_(dynamic)
{
  int comm_size = comm->size();
  rank_         = comm->rank();

  XBT_DEBUG("Creating window");
  if(info!=MPI_INFO_NULL)
    info->ref(); // keep a reference on the user-provided info object
  group_ = MPI_GROUP_NULL;
  requests_ = new std::vector<MPI_Request>();
  mut_ = xbt_mutex_init();
  lock_mut_ = xbt_mutex_init();
  connected_wins_ = new MPI_Win[comm_size];
  connected_wins_[rank_] = this;

  bar_ = MSG_barrier_init(comm_size);

  comm->add_rma_win(this);
  Colls::allgather(&(connected_wins_[rank_]), sizeof(MPI_Win), MPI_BYTE, connected_wins_, sizeof(MPI_Win),
                   MPI_BYTE, comm);

  Colls::bcast(&(bar_), sizeof(msg_bar_t), MPI_BYTE, 0, comm);
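  // The barrier handle is broadcast from rank 0, so every process connected to this window
  // ends up waiting on the same simulated barrier.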
  //As per the standard, perform a barrier to make sure every asynchronous communication is finished
  MSG_barrier_wait(bar_);

  int finished = finish_comms();
  XBT_DEBUG("Win destructor - Finished %d RMA calls", finished);

  delete[] connected_wins_;
  if (name_ != nullptr){
    xbt_free(name_);
  }
  if(info_!=MPI_INFO_NULL){
    MPI_Info_free(&info_);
  }
  comm_->remove_rma_win(this);

  Colls::barrier(comm_);
  int rank = comm_->rank();
  if (rank == 0)
    MSG_barrier_destroy(bar_);
  xbt_mutex_destroy(mut_);
  xbt_mutex_destroy(lock_mut_);
int Win::attach (void *base, MPI_Aint size){
  if (!(base_ == MPI_BOTTOM || base_ == 0))
    return MPI_ERR_ARG;
  base_ = 0; // the actual address will be given in the RMA calls, as the displacement
int Win::detach (void *base){
void Win::get_name(char* name, int* length){
  *length = strlen(name_);
  strncpy(name, name_, *length+1);
void Win::get_group(MPI_Group* group){
  if(comm_ != MPI_COMM_NULL){
    *group = comm_->group();
  } else {
    *group = MPI_GROUP_NULL;
  }
}
MPI_Info Win::info(){
  if(info_== MPI_INFO_NULL)

MPI_Aint Win::size(){

int Win::disp_unit(){

void Win::set_info(MPI_Info info){
  if(info_!= MPI_INFO_NULL)

void Win::set_name(char* name){
  name_ = xbt_strdup(name);
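
// Win::fence: active-target synchronization. Unless MPI_MODE_NOPRECEDE is given, the process
// first completes every RMA request queued on its window; every fence then ends with a barrier
// so that all processes of the communicator leave it together.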
int Win::fence(int assert)
{
  XBT_DEBUG("Entering fence");
  if (assert != MPI_MODE_NOPRECEDE) {
    // This is not the first fence => finalize what came before
    MSG_barrier_wait(bar_);
    xbt_mutex_acquire(mut_);
    // This (simulated) mutex ensures that no process pushes to the vector of requests during the waitall.
    // Without it, the vector could get reallocated when another process pushes, which would invalidate
    // the array that Request::waitall() is working on.
    // Another solution would be to copy the data and clean up the vector *before* Request::waitall.
    std::vector<MPI_Request>* reqs = requests_;
    int size = static_cast<int>(reqs->size());
    // start all requests that have been prepared by another process
    for (const auto& req : *reqs) {
      if (req && (req->flags() & PREPARED))
        req->start();
    }

    MPI_Request* treqs = &(*reqs)[0];
    Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
    xbt_mutex_release(mut_);
  if(assert==MPI_MODE_NOSUCCEED)//no operation should follow this one, so mark the window as closed
    opened_=0;

  MSG_barrier_wait(bar_);
  XBT_DEBUG("Leaving fence");
int Win::put( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
              MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype)
{
  //get receiver pointer
  MPI_Win recv_win = connected_wins_[target_rank];
  if(opened_==0){//check that post/start has been done
    // no fence or start: check whether we hold a lock on the target instead
    for(auto it : recv_win->lockers_)
      if (it == comm_->rank())
  if(target_count*target_datatype->get_extent()>recv_win->size_)
    return MPI_ERR_ARG; // the access would fall outside of the target window
  void* recv_addr = static_cast<void*>(static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_);
  XBT_DEBUG("Entering MPI_Put to %d", target_rank);
  if(target_rank != comm_->rank()){
    //prepare send_request
    MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype, smpi_process()->index(),
                                              comm_->group()->index(target_rank), SMPI_RMA_TAG+1, comm_, MPI_OP_NULL);

    //prepare receiver request
    MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, smpi_process()->index(),
                                              comm_->group()->index(target_rank), SMPI_RMA_TAG+1, recv_win->comm_, MPI_OP_NULL);

    //push request to receiver's win
    xbt_mutex_acquire(recv_win->mut_);
    recv_win->requests_->push_back(rreq);
    xbt_mutex_release(recv_win->mut_);

    //push request to sender's win
    xbt_mutex_acquire(mut_);
    requests_->push_back(sreq);
    xbt_mutex_release(mut_);
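    // Local case: a put on our own rank reduces to a plain copy into the window memory.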
    Datatype::copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
int Win::get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
              MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype)
{
  MPI_Win send_win = connected_wins_[target_rank];
  if(opened_==0){//check that post/start has been done
    // no fence or start: check whether we hold a lock on the target instead
    for(auto it : send_win->lockers_)
      if (it == comm_->rank())
  if(target_count*target_datatype->get_extent()>send_win->size_)
    return MPI_ERR_ARG; // the access would fall outside of the target window
  void* send_addr = static_cast<void*>(static_cast<char*>(send_win->base_) + target_disp * send_win->disp_unit_);
  XBT_DEBUG("Entering MPI_Get from %d", target_rank);
  if(target_rank != comm_->rank()){
    //prepare send_request
    MPI_Request sreq = Request::rma_send_init(send_addr, target_count, target_datatype,
                                              comm_->group()->index(target_rank), smpi_process()->index(), SMPI_RMA_TAG+2, send_win->comm_,
                                              MPI_OP_NULL);

    //prepare receiver request
    MPI_Request rreq = Request::rma_recv_init(origin_addr, origin_count, origin_datatype,
                                              comm_->group()->index(target_rank), smpi_process()->index(), SMPI_RMA_TAG+2, comm_,
                                              MPI_OP_NULL);

    //start the send, with the target process (not us) acting as the sender
    sreq->start();
    //push request to the target's win
    xbt_mutex_acquire(send_win->mut_);
    send_win->requests_->push_back(sreq);
    xbt_mutex_release(send_win->mut_);

    //push request to our own win
    xbt_mutex_acquire(mut_);
    requests_->push_back(rreq);
    xbt_mutex_release(mut_);
    Datatype::copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
int Win::accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
                     MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op)
{
  //get receiver pointer
  MPI_Win recv_win = connected_wins_[target_rank];
  if(opened_==0){//check that post/start has been done
    // no fence or start: check whether we hold a lock on the target instead
    for(auto it : recv_win->lockers_)
      if (it == comm_->rank())
  //FIXME: local version

  if(target_count*target_datatype->get_extent()>recv_win->size_)
    return MPI_ERR_ARG; // the access would fall outside of the target window
  void* recv_addr = static_cast<void*>(static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_);
  XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
  // The tag is used to order the operations, so subtract count_ from it. SMPI_RMA_TAG is set
  // below all the other SMPI tags, which avoids collisions with them.
  //prepare send_request
  MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype,
                                            smpi_process()->index(), comm_->group()->index(target_rank), SMPI_RMA_TAG-3-count_, comm_, op);

  //prepare receiver request
  MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype,
                                            smpi_process()->index(), comm_->group()->index(target_rank), SMPI_RMA_TAG-3-count_, recv_win->comm_, op);

  //push request to receiver's win
  xbt_mutex_acquire(recv_win->mut_);
  recv_win->requests_->push_back(rreq);
  xbt_mutex_release(recv_win->mut_);

  //push request to sender's win
  xbt_mutex_acquire(mut_);
  requests_->push_back(sreq);
  xbt_mutex_release(mut_);
int Win::get_accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, void *result_addr,
                         int result_count, MPI_Datatype result_datatype, int target_rank, MPI_Aint target_disp,
                         int target_count, MPI_Datatype target_datatype, MPI_Op op){
  MPI_Win send_win = connected_wins_[target_rank];
  if(opened_==0){//check that post/start has been done
    // no fence or start: check whether we hold a lock on the target instead
    for(auto it : send_win->lockers_)
      if (it == comm_->rank())
  if(target_count*target_datatype->get_extent()>send_win->size_)
    return MPI_ERR_ARG; // the access would fall outside of the target window
  XBT_DEBUG("Entering MPI_Get_accumulate from %d", target_rank);

  get(result_addr, result_count, result_datatype, target_rank,
      target_disp, target_count, target_datatype);
  accumulate(origin_addr, origin_count, origin_datatype, target_rank,
             target_disp, target_count, target_datatype, op);
int Win::start(MPI_Group group, int assert){
  /* From the MPI forum's advice to implementors:
     The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
     will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
     the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
     matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
     MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
     implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
     to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
     called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
     origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
     must complete, without further dependencies. */
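  /* For reference, the general active target pattern being implemented here, written with the
   * standard MPI calls (illustrative sketch only, not code from this file):
   *   target:  MPI_Win_post(group, 0, win);   ...exposure epoch...   MPI_Win_wait(win);
   *   origin:  MPI_Win_start(group, 0, win);  MPI_Put(...);          MPI_Win_complete(win);
   */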
  //naive, blocking implementation.
  int size = group->size();
  MPI_Request* reqs = xbt_new0(MPI_Request, size);

      int src = group->index(j);
      if (src != smpi_process()->index() && src != MPI_UNDEFINED) {
        reqs[i] = Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 4, MPI_COMM_WORLD);

  Request::startall(size, reqs);
  Request::waitall(size, reqs, MPI_STATUSES_IGNORE);

    Request::unref(&reqs[i]);

  opened_++; //we're open for business!
int Win::post(MPI_Group group, int assert){
  //let's make a synchronous send here
  int size = group->size();
  MPI_Request* reqs = xbt_new0(MPI_Request, size);

      int dst = group->index(j);
      if(dst != smpi_process()->index() && dst != MPI_UNDEFINED){
        reqs[i] = Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG+4, MPI_COMM_WORLD);

  Request::startall(size, reqs);
  Request::waitall(size, reqs, MPI_STATUSES_IGNORE);

    Request::unref(&reqs[i]);

  opened_++; //we're open for business!
int Win::complete(){
  if(opened_==0)
    xbt_die("Complete called on already opened MPI_Win");

  XBT_DEBUG("Entering MPI_Win_Complete");
  int size = group_->size();
  MPI_Request* reqs = xbt_new0(MPI_Request, size);

      int dst = group_->index(j);
      if(dst != smpi_process()->index() && dst != MPI_UNDEFINED){
        reqs[i] = Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG+5, MPI_COMM_WORLD);

  XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
  Request::startall(size, reqs);
  Request::waitall(size, reqs, MPI_STATUSES_IGNORE);

    Request::unref(&reqs[i]);
  int finished = finish_comms();
  XBT_DEBUG("Win_complete - Finished %d RMA calls", finished);

  Group::unref(group_);
  opened_--; //we're closed for business!
int Win::wait(){
  //naive, blocking implementation.
  XBT_DEBUG("Entering MPI_Win_Wait");
  int size = group_->size();
  MPI_Request* reqs = xbt_new0(MPI_Request, size);

      int src = group_->index(j);
      if(src != smpi_process()->index() && src != MPI_UNDEFINED){
        reqs[i] = Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG+5, MPI_COMM_WORLD);

  XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
  Request::startall(size, reqs);
  Request::waitall(size, reqs, MPI_STATUSES_IGNORE);

    Request::unref(&reqs[i]);

  int finished = finish_comms();
  XBT_DEBUG("Win_wait - Finished %d RMA calls", finished);

  Group::unref(group_);
  opened_--; //we're closed for business!
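
// Passive-target synchronization (MPI_Win_lock / MPI_Win_unlock). mode_ accumulates the lock
// types currently granted on the target window: taking an exclusive lock, or any lock while an
// exclusive lock is held, goes through lock_mut_, and unlock() releases that mutex only when
// the window was exclusively locked.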
int Win::lock(int lock_type, int rank, int assert){
  MPI_Win target_win = connected_wins_[rank];
  if ((lock_type == MPI_LOCK_EXCLUSIVE && target_win->mode_ != MPI_LOCK_SHARED) || target_win->mode_ == MPI_LOCK_EXCLUSIVE){
    xbt_mutex_acquire(target_win->lock_mut_);
    target_win->mode_ += lock_type; // add the lock_type, to tell apart the case where we switch from EXCLUSIVE to SHARED (no release needed in the unlock)
    if(lock_type == MPI_LOCK_SHARED){ // the window used to be exclusive, it is now shared
      xbt_mutex_release(target_win->lock_mut_);
    }
  } else if(!(target_win->mode_ == MPI_LOCK_SHARED && lock_type == MPI_LOCK_EXCLUSIVE))
    target_win->mode_ += lock_type; // don't set to exclusive if it's already shared
  target_win->lockers_.push_back(comm_->rank());

  int finished = finish_comms();
  XBT_DEBUG("Win_lock %d - Finished %d RMA calls", rank, finished);
int Win::unlock(int rank){
  MPI_Win target_win = connected_wins_[rank];
  int target_mode = target_win->mode_;
  target_win->mode_ = 0;
  target_win->lockers_.remove(comm_->rank());
  if (target_mode == MPI_LOCK_EXCLUSIVE){
    xbt_mutex_release(target_win->lock_mut_);
  }

  int finished = finish_comms();
  XBT_DEBUG("Win_unlock %d - Finished %d RMA calls", rank, finished);
Win* Win::f2c(int id){
  return static_cast<Win*>(F2C::f2c(id));
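
// Complete all RMA requests queued on this window (starting the ones that are still in the
// PREPARED state) and return how many were finished.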
int Win::finish_comms(){
  xbt_mutex_acquire(mut_);
  //Finish own requests
  std::vector<MPI_Request>* reqqs = requests_;
  int size = static_cast<int>(reqqs->size());
  // start all requests that have been prepared by another process
  for (const auto& req : *reqqs) {
    if (req && (req->flags() & PREPARED))
      req->start();
  }
  MPI_Request* treqs = &(*reqqs)[0];
  Request::waitall(size, treqs, MPI_STATUSES_IGNORE);

  xbt_mutex_release(mut_);