/* Copyright (c) 2007-2015. The SimGrid Team.
 * All rights reserved. */

/* This program is free software; you can redistribute it and/or modify it
 * under the terms of the license (GNU LGPL) which comes with this package. */
XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");
namespace simgrid{
namespace smpi{

std::unordered_map<int, smpi_key_elem> Win::keyvals_;
int Win::keyval_id_ = 0;
Win::Win(void *base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm): base_(base), size_(size), disp_unit_(disp_unit), assert_(0), info_(info), comm_(comm){
  int comm_size = comm->size();
  int rank      = comm->rank();
  XBT_DEBUG("Creating window");
  if(info != MPI_INFO_NULL)
    info->ref();
  name_   = nullptr;
  opened_ = 0;
  group_  = MPI_GROUP_NULL;
  requests_ = new std::vector<MPI_Request>();
  mut_ = xbt_mutex_init();
  connected_wins_ = new MPI_Win[comm_size];
  connected_wins_[rank] = this;
  count_ = 0;
  // Only rank 0 creates the barrier; its handle is broadcast below.
  if(rank == 0){
    bar_ = MSG_barrier_init(comm_size);
  }
  // Let every process know the Win* of every other process, so that RMA
  // requests can later be pushed directly into the target's window.
  Colls::allgather(&(connected_wins_[rank]), sizeof(MPI_Win), MPI_BYTE, connected_wins_, sizeof(MPI_Win),
                   MPI_BYTE, comm);

  Colls::bcast(&(bar_), sizeof(msg_bar_t), MPI_BYTE, 0, comm);

  Colls::barrier(comm);
}
Win::~Win(){
  //As per the standard, perform a barrier to ensure every async comm is finished
  MSG_barrier_wait(bar_);
  xbt_mutex_acquire(mut_);
  delete requests_;
  xbt_mutex_release(mut_);
  delete[] connected_wins_;
  if (name_ != nullptr){
    xbt_free(name_);
  }
  if(info_ != MPI_INFO_NULL){
    MPI_Info_free(&info_);
  }

  Colls::barrier(comm_);
  int rank = comm_->rank();
  if(rank == 0)
    MSG_barrier_destroy(bar_);
  xbt_mutex_destroy(mut_);

  if(!attributes_.empty()){
    int flag;
    for(auto it = attributes_.begin(); it != attributes_.end(); it++){
      try{
        smpi_key_elem elem = keyvals_.at((*it).first);
        if (elem != nullptr && elem->delete_fn.win_delete_fn != nullptr)
          elem->delete_fn.win_delete_fn(this, (*it).first, (*it).second, &flag);
      }catch(const std::out_of_range& oor) {
        //already deleted, not a problem
      }
    }
  }
}
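/* Illustration (hypothetical user code, not part of this file): the delete
 * callbacks invoked above are the ones registered through the standard
 * window attribute interface, e.g.:
 *
 *   int free_payload(MPI_Win w, int key, void* val, void* extra) {
 *     free(val);           // called by ~Win() for every attached attribute
 *     return MPI_SUCCESS;
 *   }
 *   int key;
 *   MPI_Win_create_keyval(MPI_WIN_NULL_COPY_FN, free_payload, &key, nullptr);
 *   MPI_Win_set_attr(win, key, malloc(42));
 */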
void Win::get_name(char* name, int* length){
  if(name_ == nullptr){
    *length = 0;
    return;
  }
  *length = strlen(name_);
  // Copy the name plus its terminating NUL byte.
  strncpy(name, name_, *length+1);
}
void Win::get_group(MPI_Group* group){
  if(comm_ != MPI_COMM_NULL){
    *group = comm_->group();
  } else {
    *group = MPI_GROUP_NULL;
  }
}
int Win::disp_unit(){
  return disp_unit_;
}
void Win::set_name(char* name){
  name_ = xbt_strdup(name);
}
int Win::fence(int assert)
{
  XBT_DEBUG("Entering fence");
  if (opened_ == 0)
    opened_ = 1;
  if (assert != MPI_MODE_NOPRECEDE) {
    // This is not the first fence => finalize what came before
    MSG_barrier_wait(bar_);
    xbt_mutex_acquire(mut_);
    // This (simulated) mutex ensures that no process pushes to the vector of requests during the waitall.
    // Without it, the vector could get reallocated when another process pushes,
    // which would invalidate the array used by Request::waitall().
    // Another solution would be to copy the data and clean up the vector *before* Request::waitall.
    std::vector<MPI_Request> *reqs = requests_;
    int size = static_cast<int>(reqs->size());
    if (size > 0) {
      // start all requests that have been prepared by another process
      for (const auto& req : *reqs) {
        if (req && (req->flags() & PREPARED))
          Request::start(req);
      }

      MPI_Request* treqs = &(*reqs)[0];

      Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
    }
    count_ = 0;
    xbt_mutex_release(mut_);
  }
  assert_ = assert;

  MSG_barrier_wait(bar_);
  XBT_DEBUG("Leaving fence");

  return MPI_SUCCESS;
}
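/* Usage sketch (hypothetical user code): a fence-delimited epoch as driven
 * from the MPI layer. The first fence only opens the epoch; the closing
 * fence hits the barrier above, starts any PREPARED requests pushed by
 * peers, and waits for all of them.
 *
 *   MPI_Win_fence(MPI_MODE_NOPRECEDE, win);            // open the epoch
 *   MPI_Put(&x, 1, MPI_INT, peer, 0, 1, MPI_INT, win); // queued, not complete
 *   MPI_Win_fence(0, win);                             // everything done here
 */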
int Win::put( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
              MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype)
{
  if(opened_ == 0)//check that post/start has been done
    return MPI_ERR_WIN;
  //get receiver pointer
  MPI_Win recv_win = connected_wins_[target_rank];

  if(target_count*target_datatype->get_extent() > recv_win->size_)
    return MPI_ERR_ARG;

  void* recv_addr = static_cast<void*>(static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_);
  XBT_DEBUG("Entering MPI_Put to %d", target_rank);

  if(target_rank != comm_->rank()){
    //prepare send request
    MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype, smpi_process_index(),
        comm_->group()->index(target_rank), SMPI_RMA_TAG+1, comm_, MPI_OP_NULL);

    //prepare receive request
    MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, smpi_process_index(),
        comm_->group()->index(target_rank), SMPI_RMA_TAG+1, recv_win->comm_, MPI_OP_NULL);

    //push request to receiver's win
    xbt_mutex_acquire(recv_win->mut_);
    recv_win->requests_->push_back(rreq);
    xbt_mutex_release(recv_win->mut_);
    //start send
    sreq->start();

    //push request to sender's win
    xbt_mutex_acquire(mut_);
    requests_->push_back(sreq);
    xbt_mutex_release(mut_);
  }else{
    // Local put: a plain memory copy is enough.
    Datatype::copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
  }

  return MPI_SUCCESS;
}
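/* How the pairing above plays out (illustrative; ranks and calls are
 * hypothetical): the origin starts its send immediately and records it in
 * its own window, while the matching receive is parked in the *target's*
 * window in PREPARED state and is only started and waited for when the
 * target reaches its next synchronization (fence, complete or wait).
 *
 *   rank 0:  put(win, ..., target_rank=1)  // sreq started, rreq pushed to 1
 *   rank 1:  ...computation...             // rreq sits in requests_, PREPARED
 *   both:    fence(win)                    // rank 1 starts rreq, all complete
 */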
int Win::get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
              MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype)
{
  if(opened_ == 0)//check that post/start has been done
    return MPI_ERR_WIN;
  //get sender pointer
  MPI_Win send_win = connected_wins_[target_rank];

  if(target_count*target_datatype->get_extent() > send_win->size_)
    return MPI_ERR_ARG;

  void* send_addr = static_cast<void*>(static_cast<char*>(send_win->base_) + target_disp * send_win->disp_unit_);
  XBT_DEBUG("Entering MPI_Get from %d", target_rank);

  if(target_rank != comm_->rank()){
    //prepare send request
    MPI_Request sreq = Request::rma_send_init(send_addr, target_count, target_datatype,
        comm_->group()->index(target_rank), smpi_process_index(), SMPI_RMA_TAG+2, send_win->comm_,
        MPI_OP_NULL);

    //prepare receive request
    MPI_Request rreq = Request::rma_recv_init(origin_addr, origin_count, origin_datatype,
        comm_->group()->index(target_rank), smpi_process_index(), SMPI_RMA_TAG+2, comm_,
        MPI_OP_NULL);

    //start the send, with another process than us as sender
    sreq->start();
    //push the send request to the target's win (the target is the sender here)
    xbt_mutex_acquire(send_win->mut_);
    send_win->requests_->push_back(sreq);
    xbt_mutex_release(send_win->mut_);

    //start recv
    rreq->start();
    //push the receive request to our own win (we are the receiver)
    xbt_mutex_acquire(mut_);
    requests_->push_back(rreq);
    xbt_mutex_release(mut_);
  }else{
    // Local get: a plain memory copy is enough.
    Datatype::copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
  }

  return MPI_SUCCESS;
}
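/* Note the inversion with respect to put(): here the *target* is the sender.
 * Both requests are started right away by the origin (one of them on behalf
 * of the target), so the next synchronization only has to wait for them
 * rather than start them.
 */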
int Win::accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
                     MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op)
{
  if(opened_ == 0)//check that post/start has been done
    return MPI_ERR_WIN;
  //FIXME: local version
  //get receiver pointer
  MPI_Win recv_win = connected_wins_[target_rank];

  if(target_count*target_datatype->get_extent() > recv_win->size_)
    return MPI_ERR_ARG;

  void* recv_addr = static_cast<void*>(static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_);
  XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
  //As the tag will be used for ordering of the operations, add count to it
  //prepare send request
  MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype,
      smpi_process_index(), comm_->group()->index(target_rank), SMPI_RMA_TAG+3+count_, comm_, op);

  //prepare receive request
  MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype,
      smpi_process_index(), comm_->group()->index(target_rank), SMPI_RMA_TAG+3+count_, recv_win->comm_, op);

  count_++;
  //push request to receiver's win
  xbt_mutex_acquire(recv_win->mut_);
  recv_win->requests_->push_back(rreq);
  xbt_mutex_release(recv_win->mut_);
  //start send
  sreq->start();

  //push request to sender's win
  xbt_mutex_acquire(mut_);
  requests_->push_back(sreq);
  xbt_mutex_release(mut_);

  return MPI_SUCCESS;
}
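/* Why the per-window counter is folded into the tag (illustrative): two
 * accumulates from the same origin to the same target location must be
 * applied in program order, otherwise non-commutative combinations would be
 * racy. Giving the i-th accumulate the tag SMPI_RMA_TAG+3+i makes each
 * send/receive pair match deterministically:
 *
 *   accumulate(... MPI_SUM ...);   // tag = SMPI_RMA_TAG+3+0
 *   accumulate(... MPI_PROD ...);  // tag = SMPI_RMA_TAG+3+1, cannot overtake
 */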
int Win::start(MPI_Group group, int assert){
  /* From the MPI forum's advice:
  The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
  will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
  the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
  matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
  MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
  implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
  to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
  called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
  origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
  must complete, without further dependencies.
  (A usage sketch of the post/start/complete/wait sequence follows this function.) */

  //naive, blocking implementation
  int i = 0;
  int j = 0;
  int size = group->size();
  MPI_Request* reqs = xbt_new0(MPI_Request, size);

  // wait for the post messages of every other process in the group
  while (j != size) {
    int src = group->index(j);
    if (src != smpi_process_index() && src != MPI_UNDEFINED) {
      reqs[i] = Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 4, MPI_COMM_WORLD);
      i++;
    }
    j++;
  }
  size = i;
  Request::startall(size, reqs);
  Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
  for (i = 0; i < size; i++) {
    Request::unref(&reqs[i]);
  }
  xbt_free(reqs);
  opened_++; //we're open for business !
  group_ = group;
  group->ref();
  return MPI_SUCCESS;
}
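/* Usage sketch of general active target synchronization (hypothetical user
 * code): the origin brackets its RMA calls with start/complete while the
 * target brackets the exposure epoch with post/wait.
 *
 *   // target                         // origin
 *   MPI_Win_post(grp, 0, win);        MPI_Win_start(grp, 0, win);
 *                                     MPI_Put(..., win);
 *   MPI_Win_wait(win);                MPI_Win_complete(win);
 */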
int Win::post(MPI_Group group, int assert){
  //let's make a synchronous send here
  int i = 0;
  int j = 0;
  int size = group->size();
  MPI_Request* reqs = xbt_new0(MPI_Request, size);

  // notify every other process in the group that our exposure epoch is open
  while (j != size) {
    int dst = group->index(j);
    if (dst != smpi_process_index() && dst != MPI_UNDEFINED) {
      reqs[i] = Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG+4, MPI_COMM_WORLD);
      i++;
    }
    j++;
  }
  size = i;

  Request::startall(size, reqs);
  Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
  for (i = 0; i < size; i++) {
    Request::unref(&reqs[i]);
  }
  xbt_free(reqs);
  opened_++; //we're open for business !
  group_ = group;
  group->ref();
  return MPI_SUCCESS;
}
int Win::complete(){
  if(opened_ == 0)
    xbt_die("Complete called on an unopened MPI_Win");

  XBT_DEBUG("Entering MPI_Win_Complete");
  int i = 0;
  int j = 0;
  int size = group_->size();
  MPI_Request* reqs = xbt_new0(MPI_Request, size);

  // tell every process of the start group that our access epoch is done
  while (j != size) {
    int dst = group_->index(j);
    if (dst != smpi_process_index() && dst != MPI_UNDEFINED) {
      reqs[i] = Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG+5, MPI_COMM_WORLD);
      i++;
    }
    j++;
  }
  size = i;
  XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
  Request::startall(size, reqs);
  Request::waitall(size, reqs, MPI_STATUSES_IGNORE);

  for (i = 0; i < size; i++) {
    Request::unref(&reqs[i]);
  }
  xbt_free(reqs);

  //now we can finish RMA calls
  xbt_mutex_acquire(mut_);
  std::vector<MPI_Request> *reqqs = requests_;
  size = static_cast<int>(reqqs->size());

  XBT_DEBUG("Win_complete - Finishing %d RMA calls", size);
  if (size > 0) {
    // start all requests that have been prepared by another process
    for (const auto& req : *reqqs) {
      if (req && (req->flags() & PREPARED))
        Request::start(req);
    }

    MPI_Request* treqs = &(*reqqs)[0];
    Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
    reqqs->clear();
  }
  xbt_mutex_release(mut_);

  Group::unref(group_);
  opened_--; //we're closed for business !
  return MPI_SUCCESS;
}
int Win::wait(){
  //naive, blocking implementation
  XBT_DEBUG("Entering MPI_Win_Wait");
  int i = 0;
  int j = 0;
  int size = group_->size();
  MPI_Request* reqs = xbt_new0(MPI_Request, size);

  // wait for the complete messages of every process in the post group
  while (j != size) {
    int src = group_->index(j);
    if (src != smpi_process_index() && src != MPI_UNDEFINED) {
      reqs[i] = Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG+5, MPI_COMM_WORLD);
      i++;
    }
    j++;
  }
  size = i;
  XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
  Request::startall(size, reqs);
  Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
  for (i = 0; i < size; i++) {
    Request::unref(&reqs[i]);
  }
  xbt_free(reqs);
  xbt_mutex_acquire(mut_);
  std::vector<MPI_Request> *reqqs = requests_;
  size = static_cast<int>(reqqs->size());

  XBT_DEBUG("Win_wait - Finishing %d RMA calls", size);
  if (size > 0) {
    // start all requests that have been prepared by another process
    for (const auto& req : *reqqs) {
      if (req && (req->flags() & PREPARED))
        Request::start(req);
    }

    MPI_Request* treqs = &(*reqqs)[0];
    Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
    reqqs->clear();
  }
  xbt_mutex_release(mut_);

  Group::unref(group_);
  opened_--; //we're closed for business !
  return MPI_SUCCESS;
}
Win* Win::f2c(int id){
  return static_cast<Win*>(F2C::f2c(id));
}

}
}