1 /* Copyright (c) 2007-2015. The SimGrid Team.
2 * All rights reserved. */
4 /* This program is free software; you can redistribute it and/or modify it
5 * under the terms of the license (GNU LGPL) which comes with this package. */
// Declare "smpi_rma" as a log subcategory of "smpi" and make it the default category of this file.
10 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");
// File-scope barrier handle, initially null.
// NOTE(review): no use of this variable is visible in this excerpt -- confirm it is still needed.
12 msg_bar_t creation_bar = nullptr;
// State stored behind an MPI_Win handle. Only some of the fields are visible in this excerpt.
14 typedef struct s_smpi_mpi_win{
// Requests attached to this window: appended by put/get/accumulate (possibly by a remote rank),
// then waited on by fence/complete/wait.
21 std::vector<MPI_Request> *requests;
// Array of window handles with one slot per rank of the communicator (comm_size entries), indexed by rank.
23 MPI_Win* connected_wins;
27 int count; // added to the tag of accumulate requests (SMPI_RMA_TAG+3+count) to order the accumulates
// Create an RMA window exposing `base` to the ranks of `comm`.
// Collective over `comm`: the visible code performs an allgather, a broadcast and a barrier on it.
// Parameters: base/size describe the exposed memory, disp_unit is the scaling factor applied to target
// displacements, info is an optional MPI_Info (MPI_INFO_NULL when absent), comm is the communicator.
// NOTE(review): several lines of this function are not visible in this excerpt.
31 MPI_Win smpi_mpi_win_create( void *base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm){
34 int comm_size = smpi_comm_size(comm);
35 int rank=smpi_comm_rank(comm);
36 XBT_DEBUG("Creating window");
// NOTE(review): xbt_new is used here while xbt_new0 is used below; presumably the memory is not zeroed,
// so every field has to be assigned explicitly -- confirm that none is left uninitialized.
38 win = xbt_new(s_smpi_mpi_win_t, 1);
41 win->disp_unit = disp_unit;
44 if(info!=MPI_INFO_NULL)
49 win->group = MPI_GROUP_NULL;
50 win->requests = new std::vector<MPI_Request>();
// One slot per rank; our own slot holds this window, the other slots are filled by the allgather below.
51 win->connected_wins = xbt_new0(MPI_Win, comm_size);
52 win->connected_wins[rank] = win;
// Barrier sized for comm_size participants.
55 win->bar = MSG_barrier_init(comm_size);
// Window handles are exchanged as raw bytes (sizeof(MPI_Win) MPI_BYTE elements per rank).
57 mpi_coll_allgather_fun(&(win->connected_wins[rank]), sizeof(MPI_Win), MPI_BYTE, win->connected_wins, sizeof(MPI_Win),
// The barrier handle is broadcast as raw bytes (root argument 0), overwriting win->bar on the receivers.
60 mpi_coll_bcast_fun(&(win->bar), sizeof(msg_bar_t), MPI_BYTE, 0, comm);
// Synchronize all ranks of comm before leaving.
62 mpi_coll_barrier_fun(comm);
// Release the resources attached to the window pointed to by `win`.
// Waits on the window barrier first, frees the request list, the peer table, the name and the info object,
// then synchronizes on the window's communicator before destroying the barrier.
// NOTE(review): several lines of this function are not visible in this excerpt.
67 int smpi_mpi_win_free( MPI_Win* win){
68 //As per the standard, perform a barrier to ensure every async comm is finished
69 MSG_barrier_wait((*win)->bar);
// requests was allocated with new, connected_wins with xbt_new0: each is released with the matching call.
70 delete (*win)->requests;
71 xbt_free((*win)->connected_wins);
72 if ((*win)->name != nullptr){
73 xbt_free((*win)->name);
75 if((*win)->info!=MPI_INFO_NULL){
76 MPI_Info_free(&(*win)->info);
// Second synchronization, this time on the communicator; presumably so that no rank is still waiting on
// bar when it gets destroyed below -- confirm.
79 mpi_coll_barrier_fun((*win)->comm);
80 int rank=smpi_comm_rank((*win)->comm);
82 MSG_barrier_destroy((*win)->bar);
// Copy the name of `win` into the caller-provided buffer `name` and store its length in `*length`.
// The handling of a window without a name is not visible in this excerpt.
88 void smpi_mpi_win_get_name(MPI_Win win, char* name, int* length){
89 if(win->name==nullptr){
// Length excludes the terminating NUL; the copy below transfers length+1 bytes, i.e. the NUL as well.
// NOTE(review): the copy is bounded by the source length, not by the size of `name`; the caller must
// provide a large enough buffer.
94 *length = strlen(win->name);
95 strncpy(name, win->name, *length+1);
// Store in `*group` the group of the communicator attached to `win`,
// or MPI_GROUP_NULL (the line introducing that branch is not visible in this excerpt).
98 void smpi_mpi_win_get_group(MPI_Win win, MPI_Group* group){
99 if(win->comm != MPI_COMM_NULL){
100 *group = smpi_comm_group(win->comm);
102 *group = MPI_GROUP_NULL;
// Set the name of `win` to a private copy of `name` (duplicated with xbt_strdup).
// NOTE(review): a previously set name is overwritten without being freed here, which looks like a leak
// when the name is set more than once -- confirm.
106 void smpi_mpi_win_set_name(MPI_Win win, char* name){
107 win->name = xbt_strdup(name);
// Fence synchronization on `win`: wait on the window barrier, complete every request attached to the
// window, record `assert` in the window and wait on the barrier again.
// NOTE(review): several lines of this function are not visible in this excerpt.
110 int smpi_mpi_win_fence( int assert, MPI_Win win){
111 XBT_DEBUG("Entering fence");
// NOTE(review): `assert` is compared by equality; MPI assertion values may be OR-ed together, so a
// combination containing MPI_MODE_NOPRECEDE would still take this branch -- confirm this is intended.
114 if(assert != MPI_MODE_NOPRECEDE){
115 MSG_barrier_wait(win->bar);
117 std::vector<MPI_Request> *reqs = win->requests;
118 int size = static_cast<int>(reqs->size());
119 // start all requests that have been prepared by another process
120 for(auto req: *reqs){
121 if (req && (req->flags & PREPARED))
// The vector storage is handed to waitall as a plain array of `size` requests.
// NOTE(review): element 0 is addressed even if the vector is empty, unless a guard sits on a line that
// is not visible here -- confirm.
125 MPI_Request* treqs = &(*reqs)[0];
126 smpi_mpi_waitall(size,treqs,MPI_STATUSES_IGNORE);
129 win->assert = assert;
// Closing synchronization on the window barrier.
131 MSG_barrier_wait(win->bar);
132 XBT_DEBUG("Leaving fence ");
// One-sided put: transfer origin_count elements of origin_datatype from origin_addr into the window of
// target_rank, at displacement target_disp (scaled by the target window's disp_unit).
// For a remote target a send/receive request pair is created with tag SMPI_RMA_TAG+1; the final
// smpi_datatype_copy is a direct local copy (the line introducing that branch is not visible in this
// excerpt; presumably it is the same-rank case).
137 int smpi_mpi_put( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
138 MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Win win)
140 if(win->opened==0)//check that post/start has been done
142 //get the window of the target rank
143 MPI_Win recv_win = win->connected_wins[target_rank];
// Destination address = base of the target window + target_disp * the target window's disp_unit (in bytes).
145 void* recv_addr = static_cast<void*> ( static_cast<char*>(recv_win->base) + target_disp * recv_win->disp_unit);
146 XBT_DEBUG("Entering MPI_Put to %d", target_rank);
148 if(target_rank != smpi_comm_rank(win->comm)){
149 //prepare send_request
150 MPI_Request sreq = smpi_rma_send_init(origin_addr, origin_count, origin_datatype, smpi_process_index(),
151 smpi_group_index(smpi_comm_group(win->comm),target_rank), SMPI_RMA_TAG+1, win->comm, MPI_OP_NULL);
153 //prepare receiver request
154 MPI_Request rreq = smpi_rma_recv_init(recv_addr, target_count, target_datatype, smpi_process_index(),
155 smpi_group_index(smpi_comm_group(win->comm),target_rank), SMPI_RMA_TAG+1, recv_win->comm, MPI_OP_NULL);
157 //push the receive request to the target's window; it is not started here
158 recv_win->requests->push_back(rreq);
// Only the send side is started by the origin.
161 smpi_mpi_start(sreq);
163 //push the send request to our own window
164 win->requests->push_back(sreq);
// Direct copy from the origin buffer to the computed target address.
166 smpi_datatype_copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
// One-sided get: transfer target_count elements of target_datatype from the window of target_rank, at
// displacement target_disp (scaled by the target window's disp_unit), into origin_addr.
// For a remote target a send/receive request pair is created with tag SMPI_RMA_TAG+2 and both requests
// are started here; the final smpi_datatype_copy is a direct local copy (the line introducing that
// branch is not visible in this excerpt; presumably it is the same-rank case).
172 int smpi_mpi_get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
173 MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Win win)
175 if(win->opened==0)//check that post/start has been done
// Window of the target rank, which is the data source of a get.
178 MPI_Win send_win = win->connected_wins[target_rank];
// Source address = base of the target window + target_disp * the target window's disp_unit (in bytes).
180 void* send_addr = static_cast<void*>(static_cast<char*>(send_win->base) + target_disp * send_win->disp_unit);
181 XBT_DEBUG("Entering MPI_Get from %d", target_rank);
183 if(target_rank != smpi_comm_rank(win->comm)){
184 //prepare send_request (the target is the sender, we are the receiver)
185 MPI_Request sreq = smpi_rma_send_init(send_addr, target_count, target_datatype,
186 smpi_group_index(smpi_comm_group(win->comm),target_rank), smpi_process_index(), SMPI_RMA_TAG+2, send_win->comm,
189 //prepare receiver request
190 MPI_Request rreq = smpi_rma_recv_init(origin_addr, origin_count, origin_datatype,
191 smpi_group_index(smpi_comm_group(win->comm),target_rank), smpi_process_index(), SMPI_RMA_TAG+2, win->comm,
194 //start the send, with another process than us as sender.
195 smpi_mpi_start(sreq);
197 //push the send request to the target's window
198 send_win->requests->push_back(sreq);
// The receive side is started by us as well.
201 smpi_mpi_start(rreq);
203 //push the receive request to our own window
204 win->requests->push_back(rreq);
// Direct copy from the computed target address to the origin buffer.
206 smpi_datatype_copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
// One-sided accumulate: send origin_count elements of origin_datatype to the window of target_rank, at
// displacement target_disp (scaled by the target window's disp_unit); `op` is passed to both requests.
// The requests use tag SMPI_RMA_TAG+3+win->count.
// Unlike put/get, no same-rank shortcut is visible here (see the FIXME below).
213 int smpi_mpi_accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
214 MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Win win)
216 if(win->opened==0)//check that post/start has been done
218 //FIXME: local version
219 //get the window of the target rank
220 MPI_Win recv_win = win->connected_wins[target_rank];
// Destination address = base of the target window + target_disp * the target window's disp_unit (in bytes).
222 void* recv_addr = static_cast<void*>(static_cast<char*>(recv_win->base) + target_disp * recv_win->disp_unit);
223 XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
224 //As the tag will be used for ordering of the operations, add count to it
225 //prepare send_request
226 MPI_Request sreq = smpi_rma_send_init(origin_addr, origin_count, origin_datatype,
227 smpi_process_index(), smpi_group_index(smpi_comm_group(win->comm),target_rank), SMPI_RMA_TAG+3+win->count, win->comm, op);
229 //prepare receiver request
230 MPI_Request rreq = smpi_rma_recv_init(recv_addr, target_count, target_datatype,
231 smpi_process_index(), smpi_group_index(smpi_comm_group(win->comm),target_rank), SMPI_RMA_TAG+3+win->count, recv_win->comm, op);
234 //push the receive request to the target's window; it is not started here
235 recv_win->requests->push_back(rreq);
// Only the send side is started by the origin.
237 smpi_mpi_start(sreq);
239 //push the send request to our own window
240 win->requests->push_back(sreq);
// Open an access epoch on `win` towards the processes of `group`.
// Blocks until a zero-byte message with tag SMPI_RMA_TAG+4 has been received from every process of the
// group other than ourselves, then increments win->opened and takes a reference on the group.
// NOTE(review): the loop headers and other lines are not visible in this excerpt.
245 int smpi_mpi_win_start(MPI_Group group, int assert, MPI_Win win){
246 /* From the MPI Forum's advice to implementors:
247 The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
248 will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
249 the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
250 matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
251 MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
252 implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
253 to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
254 called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
255 origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
256 must complete, without further dependencies. */
258 //naive, blocking implementation.
260 int size = smpi_group_size(group);
261 MPI_Request* reqs = xbt_new0(MPI_Request, size);
// One empty receive per group member other than this process.
264 int src=smpi_group_index(group,j);
265 if(src!=smpi_process_index()){
266 reqs[i]=smpi_irecv_init(nullptr, 0, MPI_CHAR, src,SMPI_RMA_TAG+4, MPI_COMM_WORLD);
// Start all the receives and block until they have all completed.
272 smpi_mpi_startall(size, reqs);
273 smpi_mpi_waitall(size, reqs, MPI_STATUSES_IGNORE);
275 smpi_mpi_request_free(&reqs[i]);
278 win->opened++; //we're open for business !
// Take a reference on the group.
280 smpi_group_use(group);
// Open an exposure epoch on `win` for the processes of `group`.
// Sends a zero-byte message with tag SMPI_RMA_TAG+4 to every process of the group other than ourselves
// (the counterpart of the receives posted by smpi_mpi_win_start), waits for these sends, then increments
// win->opened and takes a reference on the group.
// NOTE(review): the loop headers and other lines are not visible in this excerpt.
284 int smpi_mpi_win_post(MPI_Group group, int assert, MPI_Win win){
285 //let's make a synchronous send here
287 int size = smpi_group_size(group);
288 MPI_Request* reqs = xbt_new0(MPI_Request, size);
// One empty send per group member other than this process.
291 int dst=smpi_group_index(group,j);
292 if(dst!=smpi_process_index()){
293 reqs[i]=smpi_mpi_send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG+4, MPI_COMM_WORLD);
// Start all the sends and block until they have all completed.
300 smpi_mpi_startall(size, reqs);
301 smpi_mpi_waitall(size, reqs, MPI_STATUSES_IGNORE);
303 smpi_mpi_request_free(&reqs[i]);
306 win->opened++; //we're open for business !
// Take a reference on the group.
308 smpi_group_use(group);
// Close the access epoch on `win`.
// Sends a zero-byte message with tag SMPI_RMA_TAG+5 to every process of win->group other than ourselves,
// then completes all the requests attached to the window, releases the group reference and decrements
// win->opened.
// NOTE(review): the loop headers and other lines are not visible in this excerpt.
312 int smpi_mpi_win_complete(MPI_Win win){
// NOTE(review): the condition guarding this abort is not visible here. Since this function decrements
// `opened` at its end, confirm that the message ("already opened") matches the guard.
314 xbt_die("Complete called on already opened MPI_Win");
316 XBT_DEBUG("Entering MPI_Win_Complete");
318 int size = smpi_group_size(win->group);
319 MPI_Request* reqs = xbt_new0(MPI_Request, size);
// One empty send per group member other than this process.
322 int dst=smpi_group_index(win->group,j);
323 if(dst!=smpi_process_index()){
324 reqs[i]=smpi_mpi_send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG+5, MPI_COMM_WORLD);
330 XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
331 smpi_mpi_startall(size, reqs);
332 smpi_mpi_waitall(size, reqs, MPI_STATUSES_IGNORE);
335 smpi_mpi_request_free(&reqs[i]);
339 //now we can finish RMA calls
341 std::vector<MPI_Request> *reqqs = win->requests;
// `size` is reused: from here on it is the number of RMA requests attached to the window.
342 size = static_cast<int>(reqqs->size());
344 XBT_DEBUG("Win_complete - Finishing %d RMA calls", size);
345 // start all requests that have been prepared by another process
346 for (auto req: *reqqs){
347 if (req && (req->flags & PREPARED))
// The vector storage is handed to waitall as a plain array of `size` requests.
351 MPI_Request* treqs = &(*reqqs)[0];
352 smpi_mpi_waitall(size,treqs,MPI_STATUSES_IGNORE);
// Release the group reference.
354 smpi_group_unuse(win->group);
355 win->opened--; //we're closed for business !
// Close the exposure epoch on `win`.
// Receives a zero-byte message with tag SMPI_RMA_TAG+5 from every process of win->group other than
// ourselves (the counterpart of the sends issued by smpi_mpi_win_complete), then completes all the
// requests attached to the window, releases the group reference and decrements win->opened.
// NOTE(review): the loop headers and other lines are not visible in this excerpt.
359 int smpi_mpi_win_wait(MPI_Win win){
360 //naive, blocking implementation.
361 XBT_DEBUG("Entering MPI_Win_Wait");
363 int size = smpi_group_size(win->group);
364 MPI_Request* reqs = xbt_new0(MPI_Request, size);
// One empty receive per group member other than this process.
367 int src=smpi_group_index(win->group,j);
368 if(src!=smpi_process_index()){
369 reqs[i]=smpi_irecv_init(nullptr, 0, MPI_CHAR, src,SMPI_RMA_TAG+5, MPI_COMM_WORLD);
375 XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
376 smpi_mpi_startall(size, reqs);
377 smpi_mpi_waitall(size, reqs, MPI_STATUSES_IGNORE);
379 smpi_mpi_request_free(&reqs[i]);
383 std::vector<MPI_Request> *reqqs = win->requests;
// `size` is reused: from here on it is the number of RMA requests attached to the window.
384 size = static_cast<int>(reqqs->size());
// NOTE(review): this message says "Win_complete" although we are in smpi_mpi_win_wait; it looks like a
// copy-paste leftover.
386 XBT_DEBUG("Win_complete - Finishing %d RMA calls", size);
388 // start all requests that have been prepared by another process
389 for(auto req: *reqqs){
390 if (req && (req->flags & PREPARED))
// The vector storage is handed to waitall as a plain array of `size` requests.
394 MPI_Request* treqs = &(*reqqs)[0];
395 smpi_mpi_waitall(size,treqs,MPI_STATUSES_IGNORE);
// Release the group reference.
397 smpi_group_unuse(win->group);
398 win->opened--; //we're closed for business !