/* Copyright (c) 2007-2015. The SimGrid Team.
 * All rights reserved.                                                     */

/* This program is free software; you can redistribute it and/or modify it
 * under the terms of the license (GNU LGPL) which comes with this package. */

XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");

namespace simgrid{
namespace smpi{
std::unordered_map<int, smpi_key_elem> Win::keyvals_;
int Win::keyval_id_=0;

Win::Win(void *base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm): base_(base), size_(size), disp_unit_(disp_unit), assert_(0), info_(info), comm_(comm){
  int comm_size = comm->size();
  int rank      = comm->rank();
  XBT_DEBUG("Creating window");
  if(info!=MPI_INFO_NULL)
    info->ref();
  name_   = nullptr;
  opened_ = 0;
  group_  = MPI_GROUP_NULL;
  requests_ = new std::vector<MPI_Request>();
  mut_ = xbt_mutex_init();
  connected_wins_ = new MPI_Win[comm_size];
  connected_wins_[rank] = this;
  count_ = 0;
  if(rank==0){
    bar_ = MSG_barrier_init(comm_size);
  }
  // every process learns the other windows and the shared barrier handle
  Colls::allgather(&(connected_wins_[rank]), sizeof(MPI_Win), MPI_BYTE, connected_wins_, sizeof(MPI_Win),
                   MPI_BYTE, comm);

  Colls::bcast(&(bar_), sizeof(msg_bar_t), MPI_BYTE, 0, comm);

  Colls::barrier(comm);
}

Win::~Win(){
  //As per the standard, perform a barrier to ensure every async comm is finished
  //(see the example after this destructor)
  MSG_barrier_wait(bar_);
  xbt_mutex_acquire(mut_);
  delete requests_;
  xbt_mutex_release(mut_);
  delete[] connected_wins_;
  if (name_ != nullptr){
    xbt_free(name_);
  }
  if(info_!=MPI_INFO_NULL){
    MPI_Info_free(&info_);
  }

  Colls::barrier(comm_);
  int rank=comm_->rank();
  if(rank == 0)//only the process that created the barrier may destroy it
    MSG_barrier_destroy(bar_);
  xbt_mutex_destroy(mut_);
}

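#if 0
// Illustration only (hypothetical user code, not part of this file): the
// barrier in the destructor is what makes this pattern safe. Without it,
// one rank could free the window while a put issued by another rank in the
// last epoch is still in flight.
MPI_Win_fence(0, win);  // close the last access epoch
MPI_Win_free(&win);     // collective; the destructor's barrier drains pending traffic
#endif
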
void Win::get_name(char* name, int* length){
  if(name_==nullptr){//no name was ever set on this window
    *length = 0;
    return;
  }
  *length = strlen(name_);
  strncpy(name, name_, *length+1);
}

void Win::get_group(MPI_Group* group){
  if(comm_ != MPI_COMM_NULL){
    *group = comm_->group();
  } else {
    *group = MPI_GROUP_NULL;
  }
}

void Win::set_name(char* name){
  name_ = xbt_strdup(name);
}

int Win::fence(int assert)
{
  XBT_DEBUG("Entering fence");
  if(opened_==0)
    opened_=1;//the first fence opens the window
  if (assert != MPI_MODE_NOPRECEDE) {
    // This is not the first fence => finalize what came before
    MSG_barrier_wait(bar_);
    xbt_mutex_acquire(mut_);
    // This (simulated) mutex ensures that no process pushes to the vector of requests during the waitall.
    // Without it, the vector could get resized when another process pushes, which would invalidate
    // the array used by Request::waitall().
    // Another solution would be to copy the data and clean up the vector *before* Request::waitall
    // (see the sketch after this function).
    std::vector<MPI_Request> *reqs = requests_;
    int size = static_cast<int>(reqs->size());
    if (size > 0) {
      // start all requests that have been prepared by another process
      for (const auto& req : *reqs) {
        if (req && (req->flags() & PREPARED))
          req->start();
      }
      MPI_Request* treqs = &(*reqs)[0];
      Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
    }
    count_ = 0;//reset the ordering counter used by accumulate
    xbt_mutex_release(mut_);
  }
  MSG_barrier_wait(bar_);
  XBT_DEBUG("Leaving fence");

  return MPI_SUCCESS;
}

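#if 0
// Minimal sketch of the alternative mentioned in the fence comment above:
// swap the pending requests out while holding the mutex, then start and wait
// on the local copy without the lock. The helper name is hypothetical and
// nothing in this file uses it; it only illustrates the idea.
static void finish_pending_locally(std::vector<MPI_Request>* requests, xbt_mutex_t mut)
{
  std::vector<MPI_Request> local;
  xbt_mutex_acquire(mut);
  local.swap(*requests); // remote pushers now see an empty but valid vector
  xbt_mutex_release(mut);
  for (const auto& req : local) { // start whatever was only prepared
    if (req && (req->flags() & PREPARED))
      req->start();
  }
  if (!local.empty())
    Request::waitall(static_cast<int>(local.size()), &local[0], MPI_STATUSES_IGNORE);
}
#endif
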
int Win::put( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
              MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype)
{
  if(opened_==0)//check that post/start has been done
    return MPI_ERR_WIN;
  //get receiver pointer
  MPI_Win recv_win = connected_wins_[target_rank];

  if(target_count*target_datatype->get_extent()>recv_win->size_)
    return MPI_ERR_ARG;

  void* recv_addr = static_cast<void*> ( static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_);
  XBT_DEBUG("Entering MPI_Put to %d", target_rank);

  if(target_rank != comm_->rank()){
    //prepare send_request
    MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype, smpi_process_index(),
        comm_->group()->index(target_rank), SMPI_RMA_TAG+1, comm_, MPI_OP_NULL);

    //prepare receiver request
    MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, smpi_process_index(),
        comm_->group()->index(target_rank), SMPI_RMA_TAG+1, recv_win->comm_, MPI_OP_NULL);

    //push request to receiver's win
    xbt_mutex_acquire(recv_win->mut_);
    recv_win->requests_->push_back(rreq);
    xbt_mutex_release(recv_win->mut_);
    //start send
    sreq->start();

    //push request to sender's win
    xbt_mutex_acquire(mut_);
    requests_->push_back(sreq);
    xbt_mutex_release(mut_);
  }else{
    //local write: a plain memory copy is enough
    Datatype::copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
  }

  return MPI_SUCCESS;
}

int Win::get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
              MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype)
{
  if(opened_==0)//check that post/start has been done
    return MPI_ERR_WIN;
  //get sender pointer
  MPI_Win send_win = connected_wins_[target_rank];

  if(target_count*target_datatype->get_extent()>send_win->size_)
    return MPI_ERR_ARG;

  void* send_addr = static_cast<void*>(static_cast<char*>(send_win->base_) + target_disp * send_win->disp_unit_);
  XBT_DEBUG("Entering MPI_Get from %d", target_rank);

  if(target_rank != comm_->rank()){
    //prepare send_request
    MPI_Request sreq = Request::rma_send_init(send_addr, target_count, target_datatype,
        comm_->group()->index(target_rank), smpi_process_index(), SMPI_RMA_TAG+2, send_win->comm_,
        MPI_OP_NULL);

    //prepare receiver request
    MPI_Request rreq = Request::rma_recv_init(origin_addr, origin_count, origin_datatype,
        comm_->group()->index(target_rank), smpi_process_index(), SMPI_RMA_TAG+2, comm_,
        MPI_OP_NULL);

    //start the send, with another process than us as sender.
    sreq->start();
    //push request to the remote win (the data source)
    xbt_mutex_acquire(send_win->mut_);
    send_win->requests_->push_back(sreq);
    xbt_mutex_release(send_win->mut_);

    //start recv
    rreq->start();
    //push request to our own win (the receiving side)
    xbt_mutex_acquire(mut_);
    requests_->push_back(rreq);
    xbt_mutex_release(mut_);
  }else{
    //local read: a plain memory copy is enough
    Datatype::copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
  }

  return MPI_SUCCESS;
}

int Win::accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
              MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op)
{
  if(opened_==0)//check that post/start has been done
    return MPI_ERR_WIN;
  //FIXME: local version
  //get receiver pointer
  MPI_Win recv_win = connected_wins_[target_rank];

  if(target_count*target_datatype->get_extent()>recv_win->size_)
    return MPI_ERR_ARG;

  void* recv_addr = static_cast<void*>(static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_);
  XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
  //As the tag will be used for ordering of the operations, add count to it
  //(see the example after this function)
  //prepare send_request
  MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype,
      smpi_process_index(), comm_->group()->index(target_rank), SMPI_RMA_TAG+3+count_, comm_, op);

  //prepare receiver request
  MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype,
      smpi_process_index(), comm_->group()->index(target_rank), SMPI_RMA_TAG+3+count_, recv_win->comm_, op);

  count_++;
  //push request to receiver's win
  xbt_mutex_acquire(recv_win->mut_);
  recv_win->requests_->push_back(rreq);
  xbt_mutex_release(recv_win->mut_);
  //start send
  sreq->start();

  //push request to sender's win
  xbt_mutex_acquire(mut_);
  requests_->push_back(sreq);
  xbt_mutex_release(mut_);

  return MPI_SUCCESS;
}

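#if 0
// Example of the tag-based ordering above (hypothetical user code): both
// calls target the same location. Because each accumulate is sent with tag
// SMPI_RMA_TAG+3+count_ and count_ increases with every call, the matching
// receives apply the operations in issue order, so with one==1 and two==2 the
// target ends up holding ((x + 1) * 2), never ((x * 2) + 1).
MPI_Accumulate(&one, 1, MPI_INT, target, 0, 1, MPI_INT, MPI_SUM,  win);
MPI_Accumulate(&two, 1, MPI_INT, target, 0, 1, MPI_INT, MPI_PROD, win);
#endif
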
int Win::start(MPI_Group group, int assert){
  /* From the MPI forum's advice to implementors:
  The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
  will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
  the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
  matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
  MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
  implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
  to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
  called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
  origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
  must complete, without further dependencies. */
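  /* A concrete (hypothetical) picture of the pattern these calls implement,
   * origin on the left, target on the right:
   *   MPI_Win_start(grp, 0, win);   |  MPI_Win_post(grp, 0, win);
   *   MPI_Put(buf, ..., win);       |  // target does not touch the window
   *   MPI_Win_complete(win);        |  MPI_Win_wait(win);
   * This implementation takes the first, fully blocking option: start()
   * below waits for a post() message from every process of the group. */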
  //naive, blocking implementation.
  int i    = 0;
  int j    = 0;
  int size = group->size();
  MPI_Request* reqs = xbt_new0(MPI_Request, size);

  // wait for a matching post() from every other process of the group
  while (j != size) {
    int src = group->index(j);
    if (src != smpi_process_index() && src != MPI_UNDEFINED) {
      reqs[i] = Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 4, MPI_COMM_WORLD);
      i++;
    }
    j++;
  }
  size = i;
  Request::startall(size, reqs);
  Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
  for (i = 0; i < size; i++)
    Request::unref(&reqs[i]);
  xbt_free(reqs);
  opened_++; //we're open for business !
  group_ = group;
  group->ref();
  return MPI_SUCCESS;
}

int Win::post(MPI_Group group, int assert){
  //let's make a synchronous send here
  int i    = 0;
  int j    = 0;
  int size = group->size();
  MPI_Request* reqs = xbt_new0(MPI_Request, size);

  // tell every other process of the group that our window is exposed
  while (j != size) {
    int dst = group->index(j);
    if (dst != smpi_process_index() && dst != MPI_UNDEFINED) {
      reqs[i] = Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 4, MPI_COMM_WORLD);
      i++;
    }
    j++;
  }
  size = i;
  Request::startall(size, reqs);
  Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
  for (i = 0; i < size; i++)
    Request::unref(&reqs[i]);
  xbt_free(reqs);
  opened_++; //we're open for business !
  group_ = group;
  group->ref();
  return MPI_SUCCESS;
}

int Win::complete(){
  if(opened_==0)
    xbt_die("Complete called on an MPI_Win that was not opened");

  XBT_DEBUG("Entering MPI_Win_Complete");
  int i    = 0;
  int j    = 0;
  int size = group_->size();
  MPI_Request* reqs = xbt_new0(MPI_Request, size);

  // notify every process of the access group that our RMA calls are issued
  while (j != size) {
    int dst = group_->index(j);
    if (dst != smpi_process_index() && dst != MPI_UNDEFINED) {
      reqs[i] = Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 5, MPI_COMM_WORLD);
      i++;
    }
    j++;
  }
  size = i;
  XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
  Request::startall(size, reqs);
  Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
  for (i = 0; i < size; i++)
    Request::unref(&reqs[i]);
  xbt_free(reqs);

  //now we can finish RMA calls
  xbt_mutex_acquire(mut_);
  std::vector<MPI_Request> *reqqs = requests_;
  size = static_cast<int>(reqqs->size());

  XBT_DEBUG("Win_complete - Finishing %d RMA calls", size);
  if (size > 0) {
    // start all requests that have been prepared by another process
    for (const auto& req : *reqqs) {
      if (req && (req->flags() & PREPARED))
        req->start();
    }
    MPI_Request* treqs = &(*reqqs)[0];
    Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
  }
  xbt_mutex_release(mut_);

  Group::unref(group_);
  opened_--; //we're closed for business !
  return MPI_SUCCESS;
}

int Win::wait(){
  //naive, blocking implementation.
  XBT_DEBUG("Entering MPI_Win_Wait");
  int i    = 0;
  int j    = 0;
  int size = group_->size();
  MPI_Request* reqs = xbt_new0(MPI_Request, size);

  // wait for the complete() message of every process of the access group
  while (j != size) {
    int src = group_->index(j);
    if (src != smpi_process_index() && src != MPI_UNDEFINED) {
      reqs[i] = Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 5, MPI_COMM_WORLD);
      i++;
    }
    j++;
  }
  size = i;
  XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
  Request::startall(size, reqs);
  Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
  for (i = 0; i < size; i++)
    Request::unref(&reqs[i]);
  xbt_free(reqs);

  xbt_mutex_acquire(mut_);
  std::vector<MPI_Request> *reqqs = requests_;
  size = static_cast<int>(reqqs->size());

  XBT_DEBUG("Win_wait - Finishing %d RMA calls", size);
  if (size > 0) {
    // start all requests that have been prepared by another process
    for (const auto& req : *reqqs) {
      if (req && (req->flags() & PREPARED))
        req->start();
    }
    MPI_Request* treqs = &(*reqqs)[0];
    Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
  }
  xbt_mutex_release(mut_);

  Group::unref(group_);
  opened_--; //we're closed for business !
  return MPI_SUCCESS;
}

Win* Win::f2c(int id){
  return static_cast<Win*>(F2C::f2c(id));
}

} // namespace smpi
} // namespace simgrid