1 /* Copyright (c) 2007-2017. The SimGrid Team.
2 * All rights reserved. */
4 /* This program is free software; you can redistribute it and/or modify it
5 * under the terms of the license (GNU LGPL) which comes with this package. */
// Log category used by the XBT_DEBUG calls in this file: "smpi_rma", declared as a
// sub-category of "smpi" and made the default category of this translation unit.
10 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");
// Class-wide (static) storage for window attribute keys. Presumably indexed by the ids handed
// out from keyval_id_ below — TODO confirm against the keyval creation code (not visible here).
14 std::unordered_map<int, smpi_key_elem> Win::keyvals_;
// Counter for window keyval identifiers, starting at 0; presumably incremented whenever a new
// keyval is created (the incrementing code is not visible here).
15 int Win::keyval_id_=0;
/* Collective constructor of an RMA window.
 *
 * base/size/disp_unit describe the local memory this rank exposes; info is kept in info_
 * (the destructor frees it with MPI_Info_free); comm is the communicator the window spans.
 *
 * Visible steps:
 *  - allocates the per-window request list (requests_) and two simulated mutexes:
 *    mut_ guards requests_, lock_mut_ backs lock()/unlock();
 *  - allocates connected_wins_ with one slot per rank of comm, stores `this` in its own slot,
 *    and allgathers the slots so every rank holds a direct pointer to every peer's Win
 *    (put/get/accumulate index it by target rank);
 *  - broadcasts rank 0's bar_ so all ranks end up sharing one barrier.
 *
 * NOTE(review): rank_ is read here but not assigned on any visible line; presumably it is set
 * from comm->rank() on a line not shown — confirm.
 * NOTE(review): MSG_barrier_init is unconditional in the visible lines; unless a hidden line
 * restricts it to rank 0, every non-root barrier is overwritten by the bcast and leaks.
 */
17 Win::Win(void *base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm): base_(base), size_(size), disp_unit_(disp_unit), assert_(0), info_(info), comm_(comm){
18 int comm_size = comm->size();
20 XBT_DEBUG("Creating window");
21 if(info!=MPI_INFO_NULL)
25 group_ = MPI_GROUP_NULL;
// Requests queued by put/get/accumulate (local or remote) and drained by fence()/finish_comms().
26 requests_ = new std::vector<MPI_Request>();
27 mut_=xbt_mutex_init();
28 lock_mut_=xbt_mutex_init();
// One slot per rank of comm; our own slot is filled here, the others by the allgather below.
29 connected_wins_ = new MPI_Win[comm_size];
30 connected_wins_[rank_] = this;
33 bar_ = MSG_barrier_init(comm_size);
37 comm->add_rma_win(this);
// Exchange raw Win pointers (as MPI_BYTE). This is only meaningful because the simulated ranks
// presumably share one address space — confirm.
39 Colls::allgather(&(connected_wins_[rank_]), sizeof(MPI_Win), MPI_BYTE, connected_wins_, sizeof(MPI_Win),
42 Colls::bcast(&(bar_), sizeof(msg_bar_t), MPI_BYTE, 0, comm);
// NOTE(review): the opening line of this destructor (Win::~Win) and several of its lines are not
// visible in this excerpt; the comments below describe only the visible statements.
// Visible teardown order: wait on the shared barrier, complete outstanding RMA requests, free
// owned buffers/info, detach from the communicator, synchronize on comm_, then destroy the
// barrier and both simulated mutexes.
48 //As per the standard, perform a barrier to ensure every async comm is finished
49 MSG_barrier_wait(bar_);
51 int finished = finish_comms();
52 XBT_DEBUG("Win destructor - Finished %d RMA calls", finished);
// NOTE(review): requests_ (allocated with new in the constructor) is not deleted on any visible
// line; confirm a hidden line frees it.
55 delete[] connected_wins_;
56 if (name_ != nullptr){
59 if(info_!=MPI_INFO_NULL){
60 MPI_Info_free(&info_);
63 comm_->remove_rma_win(this);
// Presumably ensures no rank still waits on bar_ before it is destroyed below.
65 Colls::barrier(comm_);
66 int rank=comm_->rank();
// NOTE(review): `rank` is unused on the visible lines. bar_ is shared by all ranks (bcast from
// rank 0 in the constructor), so a hidden line presumably restricts this destroy to one rank —
// confirm, otherwise the same barrier is destroyed once per rank.
68 MSG_barrier_destroy(bar_);
69 xbt_mutex_destroy(mut_);
70 xbt_mutex_destroy(lock_mut_);
/* Copies the window name into the caller-supplied buffer `name` and stores its length in
 * *length (strlen, i.e. excluding the terminating NUL). The copy includes the NUL, so `name`
 * must hold at least *length+1 bytes (presumably MPI_MAX_OBJECT_NAME, as the MPI standard asks).
 * NOTE(review): the lines between the signature and the strlen call are not visible; presumably
 * they handle name_ == nullptr, since strlen(nullptr) is undefined — confirm.
 */
75 void Win::get_name(char* name, int* length){
81 *length = strlen(name_);
82 strncpy(name, name_, *length+1);
/* Stores in *group the group of the window's communicator, or MPI_GROUP_NULL when the window
 * has no communicator (the `else` line between the two assignments is not visible).
 * NOTE(review): MPI_Win_get_group is specified to return a new group handle that the caller
 * frees; whether comm_->group() takes a reference is not visible here — confirm.
 */
85 void Win::get_group(MPI_Group* group){
86 if(comm_ != MPI_COMM_NULL){
87 *group = comm_->group();
89 *group = MPI_GROUP_NULL;
// Accessor for the displacement unit given to the constructor (stored in disp_unit_, which
// put/get/accumulate use to scale target_disp); its body is not visible in this excerpt.
105 int Win::disp_unit(){
/* Sets the window name to a heap copy of `name` (xbt_strdup).
 * NOTE(review): any previously set name_ is overwritten without being freed on the visible
 * lines, so naming a window twice leaks the first copy — confirm no hidden line frees it.
 */
110 void Win::set_name(char* name){
111 name_ = xbt_strdup(name);
/* Fence synchronization (MPI_Win_fence).
 *
 * Unless the caller asserted MPI_MODE_NOPRECEDE (no RMA call precedes this fence), the
 * following happens:
 *  - every rank first meets on the shared barrier, so all peers have queued their requests
 *    on this window;
 *  - this rank then starts the requests that other processes prepared here (PREPARED flag);
 *  - it waits for all requests, holding mut_ throughout.
 * The trailing barrier presumably keeps any rank from issuing new RMA calls before every
 * rank has drained its queue.
 *
 * NOTE(review): `assert` is compared with == instead of being tested as a bit mask, so a
 * combined assertion such as MPI_MODE_NOPRECEDE|MPI_MODE_NOSTORE takes the "not first fence" path.
 * NOTE(review): &(*reqs)[0] is undefined on an empty vector; presumably a hidden line guards
 * this section with size > 0 — confirm.
 */
114 int Win::fence(int assert)
116 XBT_DEBUG("Entering fence");
119 if (assert != MPI_MODE_NOPRECEDE) {
120 // This is not the first fence => finalize what came before
121 MSG_barrier_wait(bar_);
122 xbt_mutex_acquire(mut_);
123 // This (simulated) mutex ensures that no process pushes to the vector of requests during the waitall.
124 // Without this, the vector could get resized (reallocated) when another process pushes.
125 // This would invalidate the array passed to Request::waitall().
126 // Another solution would be to copy the data and clean up the vector *before* Request::waitall
127 std::vector<MPI_Request> *reqs = requests_;
128 int size = static_cast<int>(reqs->size());
129 // start all requests that have been prepared by another process
131 for (const auto& req : *reqs) {
132 if (req && (req->flags() & PREPARED))
136 MPI_Request* treqs = &(*reqs)[0];
138 Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
141 xbt_mutex_release(mut_);
145 MSG_barrier_wait(bar_);
146 XBT_DEBUG("Leaving fence");
/* MPI_Put: writes origin_count elements of origin_datatype from origin_addr into the window of
 * target_rank, at target_disp * disp_unit_ bytes past that window's base.
 *
 * Outside a fence/start epoch (opened_ == 0), the visible loop looks for this rank in the
 * target's lockers_, presumably to require a lock() on it first.
 *
 * Remote target:
 *  - a matching send/receive pair is built with tag SMPI_RMA_TAG+1;
 *  - the receive is queued on the target window's request list and the send on ours, each
 *    under the owning window's mut_;
 *  - both are waited on later by fence()/finish_comms().
 * Local target: the data is copied directly with Datatype::copy (the `else` line is not
 * visible).
 *
 * NOTE(review): the size check compares only target_count * extent with the window size and
 * ignores target_disp, so a put near the end of the window can overrun it. target_rank indexes
 * connected_wins_ without a visible range check.
 */
151 int Win::put( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
152 MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype)
154 //get receiver pointer
155 MPI_Win recv_win = connected_wins_[target_rank];
157 if(opened_==0){//check that post/start has been done
158 // no fence or start .. lock ok ?
160 for(auto it : recv_win->lockers_)
161 if (it == comm_->rank())
167 if(target_count*target_datatype->get_extent()>recv_win->size_)
170 void* recv_addr = static_cast<void*> ( static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_);
171 XBT_DEBUG("Entering MPI_Put to %d", target_rank);
173 if(target_rank != comm_->rank()){
174 //prepare send_request
175 MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype, smpi_process()->index(),
176 comm_->group()->index(target_rank), SMPI_RMA_TAG+1, comm_, MPI_OP_NULL);
178 //prepare receiver request
179 MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, smpi_process()->index(),
180 comm_->group()->index(target_rank), SMPI_RMA_TAG+1, recv_win->comm_, MPI_OP_NULL);
182 //push request to receiver's win
183 xbt_mutex_acquire(recv_win->mut_);
184 recv_win->requests_->push_back(rreq);
185 xbt_mutex_release(recv_win->mut_);
189 //push request to sender's win
190 xbt_mutex_acquire(mut_);
191 requests_->push_back(sreq);
192 xbt_mutex_release(mut_);
194 Datatype::copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
/* MPI_Get: reads target_count elements of target_datatype from target_rank's window (at
 * target_disp * disp_unit_ bytes past its base) into origin_addr.
 *
 * Remote target: the roles are reversed compared with put(), because the target is the sender
 * of the data.
 *  - A matching send/receive pair is built with tag SMPI_RMA_TAG+2.
 *  - The send is queued on the target window's request list, so it is started and waited on
 *    when that window fences or finishes its comms.
 *  - The receive is queued on ours.
 * Local target: plain Datatype::copy (the `else` line is not visible).
 *
 * NOTE(review): as in put(), the size check ignores target_disp, and target_rank indexes
 * connected_wins_ without a visible range check.
 */
200 int Win::get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
201 MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype)
204 MPI_Win send_win = connected_wins_[target_rank];
206 if(opened_==0){//check that post/start has been done
207 // no fence or start .. lock ok ?
209 for(auto it : send_win->lockers_)
210 if (it == comm_->rank())
216 if(target_count*target_datatype->get_extent()>send_win->size_)
219 void* send_addr = static_cast<void*>(static_cast<char*>(send_win->base_) + target_disp * send_win->disp_unit_);
220 XBT_DEBUG("Entering MPI_Get from %d", target_rank);
222 if(target_rank != comm_->rank()){
223 //prepare send_request
224 MPI_Request sreq = Request::rma_send_init(send_addr, target_count, target_datatype,
225 comm_->group()->index(target_rank), smpi_process()->index(), SMPI_RMA_TAG+2, send_win->comm_,
228 //prepare receiver request
229 MPI_Request rreq = Request::rma_recv_init(origin_addr, origin_count, origin_datatype,
230 comm_->group()->index(target_rank), smpi_process()->index(), SMPI_RMA_TAG+2, comm_,
233 //start the send, with another process than us as sender.
235 //push the send request to the target's win: the target is the sender of the data
236 xbt_mutex_acquire(send_win->mut_);
237 send_win->requests_->push_back(sreq);
238 xbt_mutex_release(send_win->mut_);
242 //push the receive request to our own win: we are the receiver of the data
243 xbt_mutex_acquire(mut_);
244 requests_->push_back(rreq);
245 xbt_mutex_release(mut_);
247 Datatype::copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
/* MPI_Accumulate: combines origin data into target_rank's window (at target_disp * disp_unit_
 * bytes past its base) using `op`.
 *
 * No local shortcut is visible: a send/receive pair is built even when target_rank is this
 * rank (see the FIXME below).
 *  - The tag SMPI_RMA_TAG-3-count_ gives successive accumulates distinct tags, so they are
 *    matched, and therefore applied, in issue order.
 *  - Presumably count_ is incremented on a hidden line and reset by fence() — TODO confirm.
 *  - The receive is queued on the target window and the send on ours, each under that
 *    window's mut_.
 *
 * NOTE(review): as in put(), the size check ignores target_disp.
 */
254 int Win::accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
255 MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op)
258 //get receiver pointer
259 MPI_Win recv_win = connected_wins_[target_rank];
261 if(opened_==0){//check that post/start has been done
262 // no fence or start .. lock ok ?
264 for(auto it : recv_win->lockers_)
265 if (it == comm_->rank())
270 //FIXME: local version
272 if(target_count*target_datatype->get_extent()>recv_win->size_)
275 void* recv_addr = static_cast<void*>(static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_);
276 XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
277 //As the tag will be used for ordering of the operations, subtract count_ from it (to avoid collisions with other SMPI tags, SMPI_RMA_TAG is set below all the other ones we use )
278 //prepare send_request
280 MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype,
281 smpi_process()->index(), comm_->group()->index(target_rank), SMPI_RMA_TAG-3-count_, comm_, op);
283 //prepare receiver request
284 MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype,
285 smpi_process()->index(), comm_->group()->index(target_rank), SMPI_RMA_TAG-3-count_, recv_win->comm_, op);
288 //push request to receiver's win
289 xbt_mutex_acquire(recv_win->mut_);
290 recv_win->requests_->push_back(rreq);
291 xbt_mutex_release(recv_win->mut_);
295 //push request to sender's win
296 xbt_mutex_acquire(mut_);
297 requests_->push_back(sreq);
298 xbt_mutex_release(mut_);
/* MPI_Get_accumulate: fetches the current target contents into result_addr, then accumulates
 * the origin data into the target with `op`. It is implemented as get() followed by
 * accumulate(), after repeating their lock and size checks.
 *
 * NOTE(review): the return codes of get() and accumulate() are ignored.
 * NOTE(review): the two operations are queued independently. Nothing visible makes the
 * fetch-and-update atomic with respect to accumulates from other origins, which MPI requires
 * for MPI_Get_accumulate — confirm whether request ordering at completion time is enough.
 */
303 int Win::get_accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, void *result_addr,
304 int result_count, MPI_Datatype result_datatype, int target_rank, MPI_Aint target_disp, int target_count,
305 MPI_Datatype target_datatype, MPI_Op op){
308 MPI_Win send_win = connected_wins_[target_rank];
310 if(opened_==0){//check that post/start has been done
311 // no fence or start .. lock ok ?
313 for(auto it : send_win->lockers_)
314 if (it == comm_->rank())
320 if(target_count*target_datatype->get_extent()>send_win->size_)
323 XBT_DEBUG("Entering MPI_Get_accumulate from %d", target_rank);
325 get(result_addr, result_count, result_datatype, target_rank,
326 target_disp, target_count, target_datatype);
327 accumulate(origin_addr, origin_count, origin_datatype, target_rank,
328 target_disp, target_count, target_datatype, op);
/* MPI_Win_start: opens an access epoch towards the processes of `group` (post/start/complete/wait
 * synchronization).
 *
 * This is a naive blocking implementation.
 *  - It builds one zero-byte receive (tag SMPI_RMA_TAG+4, on MPI_COMM_WORLD, addressed by
 *    process index) from every group member other than this process.
 *  - It blocks until each of them has called post(), which sends the matching message.
 *  - Slots of reqs that are not filled stay zeroed (xbt_new0); presumably startall/waitall skip
 *    null entries — confirm.
 * NOTE(review): several lines are not visible: the loop headers and i/j counters, the freeing
 * of reqs, and the recording of `group` into group_ that complete() relies on.
 */
334 int Win::start(MPI_Group group, int assert){
335 /* From the MPI standard's advice to implementors:
336 The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
337 will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
338 the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
339 matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
340 MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
341 implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
342 to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
343 called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
344 origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
345 must complete, without further dependencies. */
347 //naive, blocking implementation.
350 int size = group->size();
351 MPI_Request* reqs = xbt_new0(MPI_Request, size);
354 int src = group->index(j);
355 if (src != smpi_process()->index() && src != MPI_UNDEFINED) {
356 reqs[i] = Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 4, MPI_COMM_WORLD);
362 Request::startall(size, reqs);
363 Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
365 Request::unref(&reqs[i]);
368 opened_++; //we're open for business !
/* MPI_Win_post: opens an exposure epoch for the processes of `group` (post/start/complete/wait
 * synchronization).
 *
 * Sends one zero-byte message (tag SMPI_RMA_TAG+4, on MPI_COMM_WORLD) to every group member
 * other than this process. These match the receives built by start() on the origin side.
 * NOTE(review): the comment below says "synchronous send", but Request::send_init is used, not a
 * synchronous-mode variant. Whether this blocks until matched depends on SMPI send semantics —
 * confirm.
 * NOTE(review): as in start(), the loop headers, counters and cleanup of reqs are not visible.
 */
374 int Win::post(MPI_Group group, int assert){
375 //let's make a synchronous send here
378 int size = group->size();
379 MPI_Request* reqs = xbt_new0(MPI_Request, size);
382 int dst=group->index(j);
383 if(dst!=smpi_process()->index() && dst!=MPI_UNDEFINED){
384 reqs[i]=Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG+4, MPI_COMM_WORLD);
391 Request::startall(size, reqs);
392 Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
394 Request::unref(&reqs[i]);
397 opened_++; //we're open for business !
/* Body of MPI_Win_complete (its signature and the condition guarding xbt_die are not visible).
 *
 * Closes the access epoch opened by start().
 *  - Sends one zero-byte message (tag SMPI_RMA_TAG+5, MPI_COMM_WORLD) to every process of
 *    group_ other than this one; wait() on the target side receives these messages.
 *  - Completes all pending RMA requests of this window.
 *  - Releases group_ and decrements opened_.
 * NOTE(review): the die message says "already opened", yet complete() is only legal on an opened
 * window. Confirm the hidden guard tests opened_==0 and consider rewording the message.
 */
405 xbt_die("Complete called on already opened MPI_Win");
407 XBT_DEBUG("Entering MPI_Win_Complete");
410 int size = group_->size();
411 MPI_Request* reqs = xbt_new0(MPI_Request, size);
414 int dst=group_->index(j);
415 if(dst!=smpi_process()->index() && dst!=MPI_UNDEFINED){
416 reqs[i]=Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG+5, MPI_COMM_WORLD);
422 XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
423 Request::startall(size, reqs);
424 Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
427 Request::unref(&reqs[i]);
431 int finished = finish_comms();
432 XBT_DEBUG("Win_complete - Finished %d RMA calls", finished);
434 Group::unref(group_);
435 opened_--; //we're closed for business !
/* Body of MPI_Win_wait (its signature is not visible).
 *
 * Closes the exposure epoch opened by post().
 *  - Waits for one zero-byte message (tag SMPI_RMA_TAG+5, MPI_COMM_WORLD) from every process of
 *    group_ other than this one, i.e. until each origin has called complete().
 *  - Completes the pending RMA requests of this window.
 *  - Releases group_ and decrements opened_.
 */
440 //naive, blocking implementation.
441 XBT_DEBUG("Entering MPI_Win_Wait");
444 int size = group_->size();
445 MPI_Request* reqs = xbt_new0(MPI_Request, size);
448 int src=group_->index(j);
449 if(src!=smpi_process()->index() && src!=MPI_UNDEFINED){
450 reqs[i]=Request::irecv_init(nullptr, 0, MPI_CHAR, src,SMPI_RMA_TAG+5, MPI_COMM_WORLD);
456 XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
457 Request::startall(size, reqs);
458 Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
460 Request::unref(&reqs[i]);
463 int finished = finish_comms();
464 XBT_DEBUG("Win_wait - Finished %d RMA calls", finished);
466 Group::unref(group_);
467 opened_--; //we're closed for business !
/* MPI_Win_lock: starts a passive-target access epoch on the window of `rank`.
 *
 * The target's state lives in target_win->mode_, to which lock_type is added. The simulated
 * mutex lock_mut_ provides exclusion:
 *  - The mutex path is taken for an exclusive request on a window whose mode_ is not
 *    MPI_LOCK_SHARED, or for any request while mode_ is MPI_LOCK_EXCLUSIVE. It blocks until
 *    lock_mut_ is free.
 *  - A shared request that went through the mutex path releases it again immediately. Per the
 *    original comment, unlock() then need not release it.
 *  - Otherwise mode_ is updated without touching the mutex.
 * The caller is then recorded in target_win->lockers_, which put/get/accumulate check, and this
 * window's own pending requests are completed.
 *
 * NOTE(review): an exclusive request while mode_ == MPI_LOCK_SHARED takes the else branch and
 * neither blocks nor changes mode_. It is therefore granted concurrently with the shared holders.
 * NOTE(review): the mode_ arithmetic depends on the numeric values of MPI_LOCK_SHARED and
 * MPI_LOCK_EXCLUSIVE, which are not visible here. Two shared lockers make
 * mode_ == 2*MPI_LOCK_SHARED, which no longer compares equal to MPI_LOCK_SHARED. A later
 * exclusive request then takes the (unheld) mutex while those shared holders are active.
 * NOTE(review): `assert` is unused on the visible lines, and `rank` is not range-checked.
 */
471 int Win::lock(int lock_type, int rank, int assert){
475 MPI_Win target_win = connected_wins_[rank];
477 if ((lock_type == MPI_LOCK_EXCLUSIVE && target_win->mode_ != MPI_LOCK_SHARED)|| target_win->mode_ == MPI_LOCK_EXCLUSIVE){
478 xbt_mutex_acquire(target_win->lock_mut_);
479 target_win->mode_+= lock_type;//add the lock_type to differentiate case when we are switching from EXCLUSIVE to SHARED (no release needed in the unlock)
480 if(lock_type == MPI_LOCK_SHARED){//the window used to be exclusive, it's now shared.
481 xbt_mutex_release(target_win->lock_mut_);
483 } else if(!(target_win->mode_==MPI_LOCK_SHARED && lock_type == MPI_LOCK_EXCLUSIVE))
484 target_win->mode_+= lock_type; // don't set to exclusive if it's already shared
486 target_win->lockers_.push_back(comm_->rank());
488 int finished = finish_comms();
489 XBT_DEBUG("Win_lock %d - Finished %d RMA calls", rank, finished);
/* MPI_Win_unlock: ends the passive-target epoch on the window of `rank`.
 *
 * Steps:
 *  - Clears the target's lock mode and removes this rank from its lockers_.
 *  - Releases lock_mut_ only if the mode was exactly MPI_LOCK_EXCLUSIVE, which is the only case
 *    where lock() leaves the mutex held. Shared locks release it in lock() or never take it.
 *  - Completes this window's pending requests.
 * NOTE(review): mode_ is reset to 0 unconditionally, even if other ranks still hold shared locks
 * on the target. A later exclusive lock() can then be granted while they are still in their
 * epoch.
 */
494 int Win::unlock(int rank){
498 MPI_Win target_win = connected_wins_[rank];
499 int target_mode = target_win->mode_;
500 target_win->mode_= 0;
501 target_win->lockers_.remove(comm_->rank());
502 if (target_mode==MPI_LOCK_EXCLUSIVE){
503 xbt_mutex_release(target_win->lock_mut_);
506 int finished = finish_comms();
507 XBT_DEBUG("Win_unlock %d - Finished %d RMA calls", rank, finished);
/* Converts a Fortran window handle into the corresponding Win object by looking it up in the
 * generic F2C handle table. The static_cast performs no type check, so `id` is assumed to refer
 * to a window. Behavior for unknown ids is whatever F2C::f2c does (not visible here).
 */
512 Win* Win::f2c(int id){
513 return static_cast<Win*>(F2C::f2c(id));
/* Completes every RMA request queued on this window.
 *
 * Holds mut_ for the whole wait, so that no other process can push to requests_ and reallocate
 * it mid-wait. Under the lock, it starts the requests that other processes prepared on this
 * window (PREPARED flag), then waits for all of them.
 * Callers log the result as the number of finished RMA calls. The return statement is not
 * visible; presumably it returns `size`, and presumably the vector is cleared on a hidden line.
 * NOTE(review): &(*reqqs)[0] is undefined on an empty vector; presumably a hidden line guards
 * this with size > 0 — confirm. This drain logic duplicates the one in fence(); both could
 * share one helper.
 */
517 int Win::finish_comms(){
518 xbt_mutex_acquire(mut_);
519 //Finish own requests
520 std::vector<MPI_Request> *reqqs = requests_;
521 int size = static_cast<int>(reqqs->size());
523 // start all requests that have been prepared by another process
524 for (const auto& req : *reqqs) {
525 if (req && (req->flags() & PREPARED))
529 MPI_Request* treqs = &(*reqqs)[0];
530 Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
533 xbt_mutex_release(mut_);