1 /* Copyright (c) 2007-2015. The SimGrid Team.
2 * All rights reserved. */
4 /* This program is free software; you can redistribute it and/or modify it
5 * under the terms of the license (GNU LGPL) which comes with this package. */
// Logging category for SMPI one-sided (RMA) operations in this translation unit.
10 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");
// Per-class storage of window attribute keyvals (MPI_Win_create_keyval) and the
// counter used to hand out the next keyval id.
14 std::unordered_map<int, smpi_key_elem> Win::keyvals_;
15 int Win::keyval_id_=0;
// Constructor: wrap the user buffer [base, base+size) as an MPI window over `comm`.
// Allocates the per-window bookkeeping (request vector, mutexes, peer-window table,
// simulated barrier) and makes the window known to every rank of the communicator.
// NOTE(review): this chunk is elided — the `if(info...)` body, rank_ initialization
// and the tail of the allgather call are not visible here.
17 Win::Win(void *base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm): base_(base), size_(size), disp_unit_(disp_unit), assert_(0), info_(info), comm_(comm){
18 int comm_size = comm->size();
20 XBT_DEBUG("Creating window");
// Keep a reference on the user-provided info object (body elided — presumably info->ref()).
21 if(info!=MPI_INFO_NULL)
25 group_ = MPI_GROUP_NULL;
// Pending RMA requests targeting this window; guarded by mut_ (see fence()/finish_comms()).
26 requests_ = new std::vector<MPI_Request>();
27 mut_=xbt_mutex_init();
// lock_mut_ serializes MPI_LOCK_EXCLUSIVE passive-target epochs (see lock()/unlock()).
28 lock_mut_=xbt_mutex_init();
// One entry per rank: pointer to that rank's Win object, filled by the allgather below.
29 connected_wins_ = new MPI_Win[comm_size];
30 connected_wins_[rank_] = this;
// Simulated barrier used by fence() and the destructor to synchronize all ranks.
33 bar_ = MSG_barrier_init(comm_size);
36 comm->add_rma_win(this);
// Exchange window pointers so any rank can directly reach a peer's Win (shared-memory
// simulation trick: pointers are valid across simulated processes).
38 Colls::allgather(&(connected_wins_[rank_]), sizeof(MPI_Win), MPI_BYTE, connected_wins_, sizeof(MPI_Win),
// Every rank adopts rank 0's barrier handle so they all wait on the same object.
41 Colls::bcast(&(bar_), sizeof(msg_bar_t), MPI_BYTE, 0, comm);
// Destructor body (opening line elided in this chunk): drain pending RMA traffic,
// then tear down all per-window resources in reverse order of construction.
47 //As per the standard, perform a barrier to ensure every async comm is finished
48 MSG_barrier_wait(bar_);
// Complete (waitall) every request still queued on this window before freeing anything.
50 int finished = finish_comms();
51 XBT_DEBUG("Win destructor - Finished %d RMA calls", finished);
54 delete[] connected_wins_;
// name_ was xbt_strdup'ed in set_name(); the xbt_free call is on an elided line.
55 if (name_ != nullptr){
58 if(info_!=MPI_INFO_NULL){
59 MPI_Info_free(&info_);
62 comm_->remove_rma_win(this);
// Second synchronization so no peer still holds a pointer into this window
// (rank computation below presumably guards the barrier destruction — elided).
64 Colls::barrier(comm_);
65 int rank=comm_->rank();
67 MSG_barrier_destroy(bar_);
68 xbt_mutex_destroy(mut_);
69 xbt_mutex_destroy(lock_mut_);
// Copy the window's name into the caller-provided buffer and report its length.
// NOTE(review): the branch handling an unset name_ (nullptr) is elided here —
// presumably it returns an empty name before these lines run.
74 void Win::get_name(char* name, int* length){
80 *length = strlen(name_);
// +1 copies the terminating NUL along with the characters.
81 strncpy(name, name_, *length+1);
// Return the group of the communicator the window was created over,
// or MPI_GROUP_NULL if the window has no live communicator.
84 void Win::get_group(MPI_Group* group){
85 if(comm_ != MPI_COMM_NULL){
86 *group = comm_->group();
// else-branch (elided `} else {`): no communicator => null group.
88 *group = MPI_GROUP_NULL;
// Accessor: displacement unit of the window (body elided — returns disp_unit_).
104 int Win::disp_unit(){
// Store a private copy of the name; ownership of the duplicate stays with the window
// (freed in the destructor).
109 void Win::set_name(char* name){
110 name_ = xbt_strdup(name);
// Active-target synchronization: MPI_Win_fence. Unless the caller asserted
// MPI_MODE_NOPRECEDE, complete every RMA request accumulated since the previous
// fence, with a barrier on each side so all ranks enter and leave together.
113 int Win::fence(int assert)
115 XBT_DEBUG("Entering fence");
118 if (assert != MPI_MODE_NOPRECEDE) {
119 // This is not the first fence => finalize what came before
120 MSG_barrier_wait(bar_);
121 xbt_mutex_acquire(mut_);
122 // This (simulated) mutex ensures that no process pushes to the vector of requests during the waitall.
123 // Without this, the vector could get redimensionned when another process pushes.
124 // This would result in the array used by Request::waitall() to be invalidated.
125 // Another solution would be to copy the data and cleanup the vector *before* Request::waitall
126 std::vector<MPI_Request> *reqs = requests_;
127 int size = static_cast<int>(reqs->size());
128 // start all requests that have been prepared by another process
// (the Request::start call inside this loop is on an elided line)
130 for (const auto& req : *reqs) {
131 if (req && (req->flags() & PREPARED))
// waitall consumes the vector's contiguous storage directly as a C array.
135 MPI_Request* treqs = &(*reqs)[0];
137 Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
140 xbt_mutex_release(mut_);
// Closing barrier: no rank proceeds until every rank's epoch is finished.
144 MSG_barrier_wait(bar_);
145 XBT_DEBUG("Leaving fence");
// MPI_Put: write `origin_count` elements from origin_addr into the target rank's
// window at target_disp (scaled by the target's disp_unit). Remote targets get a
// matched pair of RMA send/recv requests queued on both windows; a local target
// degenerates to a plain datatype copy (in the elided else-branch).
150 int Win::put( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
151 MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype)
153 //get receiver pointer
154 MPI_Win recv_win = connected_wins_[target_rank];
// Outside an active epoch, the access is only legal if we hold a lock on the target
// (the error return when no lock matches is on an elided line).
156 if(opened_==0){//check that post/start has been done
157 // no fence or start .. lock ok ?
159 for(auto it : recv_win->lockers_)
160 if (it == comm_->rank())
// Bounds check: the transfer must fit inside the target window (error path elided).
166 if(target_count*target_datatype->get_extent()>recv_win->size_)
169 void* recv_addr = static_cast<void*> ( static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_);
170 XBT_DEBUG("Entering MPI_Put to %d", target_rank);
172 if(target_rank != comm_->rank()){
173 //prepare send_request
174 MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype, smpi_process()->index(),
175 comm_->group()->index(target_rank), SMPI_RMA_TAG+1, comm_, MPI_OP_NULL);
177 //prepare receiver request
// Note: the recv request is created on the *target's* communicator (recv_win->comm_).
178 MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, smpi_process()->index(),
179 comm_->group()->index(target_rank), SMPI_RMA_TAG+1, recv_win->comm_, MPI_OP_NULL);
181 //push request to receiver's win
// Each window's request vector is guarded by its own mutex (see fence()).
182 xbt_mutex_acquire(recv_win->mut_);
183 recv_win->requests_->push_back(rreq);
184 xbt_mutex_release(recv_win->mut_);
188 //push request to sender's win
189 xbt_mutex_acquire(mut_);
190 requests_->push_back(sreq);
191 xbt_mutex_release(mut_);
// Local put (elided else-branch): copy directly, no messaging needed.
193 Datatype::copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
// MPI_Get: read `target_count` elements from the target rank's window into
// origin_addr. Mirror image of put(): for a remote target the *send* request is
// queued on the target's window (data flows target -> origin); a local target
// is a plain datatype copy (elided else-branch).
199 int Win::get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
200 MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype)
203 MPI_Win send_win = connected_wins_[target_rank];
// Outside an epoch, require a matching lock on the target (error path elided).
205 if(opened_==0){//check that post/start has been done
206 // no fence or start .. lock ok ?
208 for(auto it : send_win->lockers_)
209 if (it == comm_->rank())
// Bounds check against the target window size (error path elided).
215 if(target_count*target_datatype->get_extent()>send_win->size_)
218 void* send_addr = static_cast<void*>(static_cast<char*>(send_win->base_) + target_disp * send_win->disp_unit_);
219 XBT_DEBUG("Entering MPI_Get from %d", target_rank);
221 if(target_rank != comm_->rank()){
222 //prepare send_request
// Sender is the *target* process here; trailing MPI_OP_NULL argument is elided.
223 MPI_Request sreq = Request::rma_send_init(send_addr, target_count, target_datatype,
224 comm_->group()->index(target_rank), smpi_process()->index(), SMPI_RMA_TAG+2, send_win->comm_,
227 //prepare receiver request
228 MPI_Request rreq = Request::rma_recv_init(origin_addr, origin_count, origin_datatype,
229 comm_->group()->index(target_rank), smpi_process()->index(), SMPI_RMA_TAG+2, comm_,
232 //start the send, with another process than us as sender.
234 //push request to receiver's win
235 xbt_mutex_acquire(send_win->mut_);
236 send_win->requests_->push_back(sreq);
237 xbt_mutex_release(send_win->mut_);
241 //push request to sender's win
// Our own recv request goes into our window's queue, under our mutex.
242 xbt_mutex_acquire(mut_);
243 requests_->push_back(rreq);
244 xbt_mutex_release(mut_);
// Local get (elided else-branch): copy directly from the window buffer.
246 Datatype::copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
// MPI_Accumulate: like put(), but the data is combined into the target buffer
// with reduction operator `op`. The tag embeds count_ so successive accumulates
// to the same target stay ordered (per the comment below).
253 int Win::accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
254 MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op)
257 //get receiver pointer
258 MPI_Win recv_win = connected_wins_[target_rank];
// Outside an epoch, require a matching lock on the target (error path elided).
260 if(opened_==0){//check that post/start has been done
261 // no fence or start .. lock ok ?
263 for(auto it : recv_win->lockers_)
264 if (it == comm_->rank())
269 //FIXME: local version
// Bounds check against the target window size (error path elided).
271 if(target_count*target_datatype->get_extent()>recv_win->size_)
274 void* recv_addr = static_cast<void*>(static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_);
275 XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
276 //As the tag will be used for ordering of the operations, add count to it
277 //prepare send_request
278 MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype,
279 smpi_process()->index(), comm_->group()->index(target_rank), SMPI_RMA_TAG+3+count_, comm_, op);
281 //prepare receiver request
282 MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype,
283 smpi_process()->index(), comm_->group()->index(target_rank), SMPI_RMA_TAG+3+count_, recv_win->comm_, op);
286 //push request to receiver's win
287 xbt_mutex_acquire(recv_win->mut_);
288 recv_win->requests_->push_back(rreq);
289 xbt_mutex_release(recv_win->mut_);
293 //push request to sender's win
294 xbt_mutex_acquire(mut_);
295 requests_->push_back(sreq);
296 xbt_mutex_release(mut_);
// MPI_Win_start: open an access epoch toward the ranks in `group`. Naive blocking
// scheme: post one zero-byte irecv per remote group member and wait for the matching
// send issued by each target's post() (tag SMPI_RMA_TAG+4), then mark the window open.
301 int Win::start(MPI_Group group, int assert){
302 /* From MPI forum advices
303 The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
304 will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
305 the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
306 matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
307 MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
308 implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
309 to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
310 called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
311 origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
312 must complete, without further dependencies. */
314 //naive, blocking implementation.
317 int size = group->size();
318 MPI_Request* reqs = xbt_new0(MPI_Request, size);
// Loop header and the i/j counters' management are on elided lines; self and
// MPI_UNDEFINED ranks are skipped, so `size` is re-derived from `i` (elided).
321 int src = group->index(j);
322 if (src != smpi_process()->index() && src != MPI_UNDEFINED) {
323 reqs[i] = Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 4, MPI_COMM_WORLD);
// Block until every member of the group has called post().
329 Request::startall(size, reqs);
330 Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
332 Request::unref(&reqs[i]);
// opened_ is a counter, not a flag: start/post epochs can nest.
335 opened_++; //we're open for business !
// MPI_Win_post: open an exposure epoch toward `group`. Symmetric to start():
// send one zero-byte message per remote group member (tag SMPI_RMA_TAG+4) to
// release their blocking start(), then mark the window open.
341 int Win::post(MPI_Group group, int assert){
342 //let's make a synchronous send here
345 int size = group->size();
346 MPI_Request* reqs = xbt_new0(MPI_Request, size);
// Loop header / counter bookkeeping elided; self and MPI_UNDEFINED are skipped.
349 int dst=group->index(j);
350 if(dst!=smpi_process()->index() && dst!=MPI_UNDEFINED){
351 reqs[i]=Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG+4, MPI_COMM_WORLD);
358 Request::startall(size, reqs);
359 Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
361 Request::unref(&reqs[i]);
364 opened_++; //we're open for business !
// MPI_Win_complete body (opening line and the opened_ guard condition are elided):
// close the access epoch opened by start(). Notify every member of group_ with a
// zero-byte send (tag SMPI_RMA_TAG+5, matched by the targets' wait()), finish our
// pending RMA requests, then drop the group reference and the epoch counter.
// NOTE(review): the xbt_die message reads "already opened" but this guard
// presumably fires when the window was *not* opened — confirm against full source.
372 xbt_die("Complete called on already opened MPI_Win");
374 XBT_DEBUG("Entering MPI_Win_Complete");
377 int size = group_->size();
378 MPI_Request* reqs = xbt_new0(MPI_Request, size);
// Loop header / counter bookkeeping elided; self and MPI_UNDEFINED are skipped.
381 int dst=group_->index(j);
382 if(dst!=smpi_process()->index() && dst!=MPI_UNDEFINED){
383 reqs[i]=Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG+5, MPI_COMM_WORLD);
389 XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
390 Request::startall(size, reqs);
391 Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
394 Request::unref(&reqs[i]);
// Per the MPI forum note in start(): complete must not return before local puts finish.
398 int finished = finish_comms();
399 XBT_DEBUG("Win_complete - Finished %d RMA calls", finished);
401 Group::unref(group_);
402 opened_--; //we're closed for business !
// MPI_Win_wait body (opening line elided): close the exposure epoch opened by
// post(). Block on one zero-byte irecv per group member (tag SMPI_RMA_TAG+5,
// matching the origins' complete()), then finish pending RMA requests and
// release the group and the epoch counter.
407 //naive, blocking implementation.
408 XBT_DEBUG("Entering MPI_Win_Wait");
410 int size = group_->size();
411 MPI_Request* reqs = xbt_new0(MPI_Request, size);
// Loop header / counter bookkeeping elided; self and MPI_UNDEFINED are skipped.
414 int src=group_->index(j);
415 if(src!=smpi_process()->index() && src!=MPI_UNDEFINED){
416 reqs[i]=Request::irecv_init(nullptr, 0, MPI_CHAR, src,SMPI_RMA_TAG+5, MPI_COMM_WORLD);
422 XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
423 Request::startall(size, reqs);
424 Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
426 Request::unref(&reqs[i]);
429 int finished = finish_comms();
430 XBT_DEBUG("Win_wait - Finished %d RMA calls", finished);
432 Group::unref(group_);
433 opened_--; //we're opened for business !
// MPI_Win_lock (passive target): first drain our own pending RMA calls, then for
// an exclusive lock block on the target window's lock_mut_ (shared locks skip it —
// the branch handling MPI_LOCK_SHARED is on elided lines). Finally record our rank
// in the target's lockers_ list so put/get/accumulate can validate access.
437 int Win::lock(int lock_type, int rank, int assert){
438 MPI_Win target_win = connected_wins_[rank];
440 int finished = finish_comms();
441 XBT_DEBUG("Win_lock - Finished %d RMA calls", finished);
443 //window already locked, we have to wait
444 if (lock_type == MPI_LOCK_EXCLUSIVE)
445 xbt_mutex_acquire(target_win->lock_mut_);
// lockers_ is shared state on the target window, so mutate it under target's mut_.
447 xbt_mutex_acquire(target_win->mut_);
448 target_win->lockers_.push_back(comm_->rank());
449 xbt_mutex_release(target_win->mut_);
// MPI_Win_unlock: finish our pending RMA calls, remove ourselves from the target's
// locker list, then release the exclusive-lock mutex. The try_acquire/release pair
// makes the release safe whether or not we actually held lock_mut_ (shared locks
// never acquired it in lock()).
454 int Win::unlock(int rank){
455 MPI_Win target_win = connected_wins_[rank];
457 int finished = finish_comms();
458 XBT_DEBUG("Win_unlock - Finished %d RMA calls", finished);
460 xbt_mutex_acquire(target_win->mut_);
461 target_win->lockers_.remove(comm_->rank());
462 xbt_mutex_release(target_win->mut_);
// Ensure the mutex is held by us before releasing it (no-op acquire if already held
// by us from an exclusive lock; see NOTE above — semantics depend on xbt mutexes).
464 xbt_mutex_try_acquire(target_win->lock_mut_);
465 xbt_mutex_release(target_win->lock_mut_);
// Fortran-to-C handle translation: look up the Win object registered under `id`.
469 Win* Win::f2c(int id){
470 return static_cast<Win*>(F2C::f2c(id));
// Complete every RMA request currently queued on this window and return how many
// were finished. Same mutex discipline as fence(): mut_ prevents peers from pushing
// into requests_ (and reallocating its storage) while waitall() walks the array.
// NOTE(review): the vector cleanup after waitall and the `return size` are elided.
474 int Win::finish_comms(){
475 //Finish own requests
476 std::vector<MPI_Request> *reqqs = requests_;
477 int size = static_cast<int>(reqqs->size());
479 xbt_mutex_acquire(mut_);
480 // start all requests that have been prepared by another process
// (the Request::start call inside this loop is on an elided line)
481 for (const auto& req : *reqqs) {
482 if (req && (req->flags() & PREPARED))
486 MPI_Request* treqs = &(*reqqs)[0];
487 Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
489 xbt_mutex_release(mut_);