1 /* Copyright (c) 2007-2015. The SimGrid Team.
2 * All rights reserved. */
4 /* This program is free software; you can redistribute it and/or modify it
5 * under the terms of the license (GNU LGPL) which comes with this package. */
// Declares the "smpi_rma" logging subcategory, child of "smpi"; presumably the category used by
// the XBT_DEBUG calls of this file.
10 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");
// File-level barrier handle, initially null.
// NOTE(review): none of the window routines below reference it — confirm it is still needed.
14 xbt_bar_t creation_bar = nullptr;
// Bookkeeping record behind an MPI_Win handle (one record per rank taking part in the window).
16 typedef struct s_smpi_mpi_win{
// Requests attached to this window: put/get/accumulate append to it, fence/complete/wait
// start the PREPARED ones and wait for all of them.
23 std::vector<MPI_Request> *requests;
// Windows of all the ranks of the communicator, indexed by rank (our own window included).
25 MPI_Win* connected_wins;
// Creates the calling rank's RMA window on communicator `comm`.
// `disp_unit` is recorded as the window's displacement unit; `info` gets a dedicated treatment
// when it is not MPI_INFO_NULL. The new window starts with no group, an empty request vector
// and a table of peer windows holding one slot per rank of `comm`. The ranks then exchange
// their window handles so that each of them can reach the windows of the others.
32 MPI_Win smpi_mpi_win_create( void *base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm){
35 int comm_size = smpi_comm_size(comm);
36 int rank=smpi_comm_rank(comm);
37 XBT_DEBUG("Creating window");
// NOTE(review): xbt_new (unlike the xbt_new0 used below) presumably does not zero the record,
// so every field must be assigned explicitly — confirm.
39 win = xbt_new(s_smpi_mpi_win_t, 1);
42 win->disp_unit = disp_unit;
45 if(info!=MPI_INFO_NULL)
50 win->group = MPI_GROUP_NULL;
51 win->requests = new std::vector<MPI_Request>();
// zero-initialized table, one slot per rank of the communicator
52 win->connected_wins = xbt_new0(MPI_Win, comm_size);
// our own slot is known locally; it is also the send buffer of the allgather below
53 win->connected_wins[rank] = win;
56 win->bar=xbt_barrier_init(comm_size);
// window handles are exchanged as raw bytes: sizeof(MPI_Win) elements of MPI_BYTE per rank
58 mpi_coll_allgather_fun(&(win->connected_wins[rank]), sizeof(MPI_Win), MPI_BYTE, win->connected_wins, sizeof(MPI_Win),
// the barrier handle is broadcast as raw bytes from root 0, presumably so that all the ranks
// end up sharing the very same barrier object
61 mpi_coll_bcast_fun( &(win->bar), sizeof(xbt_bar_t), MPI_BYTE, 0, comm);
// final synchronization of the ranks of comm
63 mpi_coll_barrier_fun(comm);
// Releases the window designated by `*win`.
// Waits on the window's barrier first, then frees the request vector, the table of peer
// windows, the name (when one is set) and the info object (when it is not MPI_INFO_NULL),
// synchronizes on the window's communicator and finally destroys the barrier.
68 int smpi_mpi_win_free( MPI_Win* win){
69 //As per the standard, perform a barrier to ensure every async comm is finished
70 xbt_barrier_wait((*win)->bar);
// the vector was allocated with new, hence delete (and not xbt_free)
71 delete (*win)->requests;
72 xbt_free((*win)->connected_wins);
73 if ((*win)->name != nullptr){
74 xbt_free((*win)->name);
76 if((*win)->info!=MPI_INFO_NULL){
77 MPI_Info_free(&(*win)->info);
// collective synchronization on the window's communicator before the barrier goes away
80 mpi_coll_barrier_fun((*win)->comm);
// rank of the caller in the window's communicator
81 int rank=smpi_comm_rank((*win)->comm);
83 xbt_barrier_destroy((*win)->bar);
// Copies the name of `win` into `name` and stores its length, terminating NUL excluded,
// into `*length`. A window without a name (win->name == nullptr) is handled as a separate case.
// `name` must have room for *length+1 bytes.
// NOTE(review): presumably the caller provides an MPI_MAX_OBJECT_NAME-sized buffer — confirm.
89 void smpi_mpi_win_get_name(MPI_Win win, char* name, int* length){
90 if(win->name==nullptr){
95 *length = strlen(win->name);
// *length+1 so that the terminating NUL is copied too
96 strncpy(name, win->name, *length+1);
// Stores into `*group` the group of the window's communicator, provided that this communicator
// is not MPI_COMM_NULL.
99 void smpi_mpi_win_get_group(MPI_Win win, MPI_Group* group){
100 if(win->comm != MPI_COMM_NULL){
101 *group = smpi_comm_group(win->comm);
// Sets the name of `win` to a copy of `name` (duplicated with xbt_strdup; smpi_mpi_win_free
// releases it with xbt_free).
// NOTE(review): the previous win->name is not released before being overwritten, so naming a
// window twice looks like a leak — confirm and free the old value if so.
105 void smpi_mpi_win_set_name(MPI_Win win, char* name){
106 win->name = xbt_strdup(name);
// MPI_Win_fence backend for `win`.
// When `assert` differs from MPI_MODE_NOPRECEDE, the window barrier is awaited; the requests
// attached to the window that are flagged PREPARED are started, and all the attached requests
// are waited for. `assert` is then recorded in the window and the barrier is awaited again
// before leaving.
// NOTE(review): `assert` is compared by equality; MPI assertions can be OR-ed together, so a
// bit test may be what is intended — confirm against the MPI standard.
109 int smpi_mpi_win_fence( int assert, MPI_Win win){
110 XBT_DEBUG("Entering fence");
113 if(assert != MPI_MODE_NOPRECEDE){
114 xbt_barrier_wait(win->bar);
116 std::vector<MPI_Request> *reqs = win->requests;
117 int size = static_cast<int>(reqs->size());
118 // start all requests that have been prepared by another process
119 for(auto req: *reqs){
120 if (req && (req->flags & PREPARED))
// the vector storage is handed to waitall as a plain array of `size` requests
// NOTE(review): taking the address of element 0 is undefined when the vector is empty;
// reqs->data() would be safe if that can happen here.
124 MPI_Request* treqs = &(*reqs)[0];
125 smpi_mpi_waitall(size,treqs,MPI_STATUSES_IGNORE);
127 win->assert = assert;
129 xbt_barrier_wait(win->bar);
130 XBT_DEBUG("Leaving fence ");
// MPI_Put backend: writes the origin buffer into the window of `target_rank`.
// The destination address is the base of the target's window plus
// target_disp * (displacement unit of the target's window).
// For a remote target, a send request and a matching receive request are built with tag
// RMA_TAG+1; the receive is queued on the target's window (to be started by the target later
// on), while the send is started right away and queued on our own window.
135 int smpi_mpi_put( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
136 MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Win win)
138 if(win->opened==0)//check that post/start has been done
140 //get receiver pointer
141 MPI_Win recv_win = win->connected_wins[target_rank];
// the displacement is expressed in units of the *target* window
143 void* recv_addr = static_cast<void*> ( static_cast<char*>(recv_win->base) + target_disp * recv_win->disp_unit);
144 XBT_DEBUG("Entering MPI_Put to %d", target_rank);
146 if(target_rank != smpi_comm_rank(win->comm)){
147 //prepare send_request
148 MPI_Request sreq = smpi_rma_send_init(origin_addr, origin_count, origin_datatype, smpi_process_index(),
149 smpi_group_index(smpi_comm_group(win->comm),target_rank), RMA_TAG+1, win->comm, MPI_OP_NULL);
151 //prepare receiver request
152 MPI_Request rreq = smpi_rma_recv_init(recv_addr, target_count, target_datatype, smpi_process_index(),
153 smpi_group_index(smpi_comm_group(win->comm),target_rank), RMA_TAG+1, recv_win->comm, MPI_OP_NULL);
155 //push request to receiver's win
156 recv_win->requests->push_back(rreq);
// only the send is started here; the receive stays queued on the target's window
159 smpi_mpi_start(sreq);
161 //push request to sender's win
162 win->requests->push_back(sreq);
// direct datatype copy into the window
// NOTE(review): presumably the path taken when the target is the calling rank — confirm.
164 smpi_datatype_copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
// MPI_Get backend: reads from the window of `target_rank` into the origin buffer.
// The source address is the base of the target's window plus
// target_disp * (displacement unit of the target's window).
// For a remote target, a send request (issued on behalf of the target) and a receive request
// (ours) are built with tag RMA_TAG+2; both are started from here, the send being queued on
// the target's window and the receive on our own window.
170 int smpi_mpi_get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
171 MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Win win)
173 if(win->opened==0)//check that post/start has been done
// window of the rank that owns the data to fetch
176 MPI_Win send_win = win->connected_wins[target_rank];
// the displacement is expressed in units of the *target* window
178 void* send_addr = static_cast<void*>(static_cast<char*>(send_win->base) + target_disp * send_win->disp_unit);
179 XBT_DEBUG("Entering MPI_Get from %d", target_rank);
181 if(target_rank != smpi_comm_rank(win->comm)){
182 //prepare send_request
183 MPI_Request sreq = smpi_rma_send_init(send_addr, target_count, target_datatype,
184 smpi_group_index(smpi_comm_group(win->comm),target_rank), smpi_process_index(), RMA_TAG+2, send_win->comm,
187 //prepare receiver request
188 MPI_Request rreq = smpi_rma_recv_init(origin_addr, origin_count, origin_datatype,
189 smpi_group_index(smpi_comm_group(win->comm),target_rank), smpi_process_index(), RMA_TAG+2, win->comm,
192 //start the send, with another process than us as sender.
193 smpi_mpi_start(sreq);
195 //push the send request to the target's win (the target is the sender of a get)
196 send_win->requests->push_back(sreq);
199 smpi_mpi_start(rreq);
201 //push the receive request to our own win (the origin is the receiver of a get)
202 win->requests->push_back(rreq);
// direct datatype copy out of the window
// NOTE(review): presumably the path taken when the target is the calling rank — confirm.
204 smpi_datatype_copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
// MPI_Accumulate backend: combines the origin buffer into the window of `target_rank` with `op`.
// The destination address is the base of the target's window plus
// target_disp * (displacement unit of the target's window).
// A send request and a receive request are built with tag RMA_TAG+3 and both carry `op`;
// the receive is queued on the target's window, the send is started and queued on our own
// window. No same-rank shortcut exists yet (see the FIXME below).
211 int smpi_mpi_accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
212 MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Win win)
214 if(win->opened==0)//check that post/start has been done
216 //FIXME: local version
217 //get receiver pointer
218 MPI_Win recv_win = win->connected_wins[target_rank];
// the displacement is expressed in units of the *target* window
220 void* recv_addr = static_cast<void*>(static_cast<char*>(recv_win->base) + target_disp * recv_win->disp_unit);
221 XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
223 //prepare send_request
224 MPI_Request sreq = smpi_rma_send_init(origin_addr, origin_count, origin_datatype,
225 smpi_process_index(), smpi_group_index(smpi_comm_group(win->comm),target_rank), RMA_TAG+3, win->comm, op);
227 //prepare receiver request
228 MPI_Request rreq = smpi_rma_recv_init(recv_addr, target_count, target_datatype,
229 smpi_process_index(), smpi_group_index(smpi_comm_group(win->comm),target_rank), RMA_TAG+3, recv_win->comm, op);
230 //push request to receiver's win
231 recv_win->requests->push_back(rreq);
// only the send is started here; the receive stays queued on the target's window
233 smpi_mpi_start(sreq);
235 //push request to sender's win
236 win->requests->push_back(sreq);
// MPI_Win_start backend: opens an access epoch on `win` towards the processes of `group`.
// One zero-byte receive (tag RMA_TAG+4, on MPI_COMM_WORLD) is prepared for each member of the
// group other than the calling process; these receives are started and waited for, which
// blocks until the matching messages sent by smpi_mpi_win_post (same tag) have arrived.
// The window's `opened` counter is then incremented.
// NOTE(review): `assert` looks unused — confirm.
241 int smpi_mpi_win_start(MPI_Group group, int assert, MPI_Win win){
242 /* From MPI forum advices
243 The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
244 will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
245 the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
246 matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
247 MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
248 implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
249 to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
250 called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
251 origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
252 must complete, without further dependencies. */
254 //naive, blocking implementation.
256 int size = smpi_group_size(group);
// zero-initialized array with one slot per member of the group
257 MPI_Request* reqs = xbt_new0(MPI_Request, size);
// global process index of the j-th member of the group
260 int src=smpi_group_index(group,j);
// no synchronization message is exchanged with ourselves
261 if(src!=smpi_process_index()){
262 reqs[i]=smpi_irecv_init(nullptr, 0, MPI_CHAR, src,RMA_TAG+4, MPI_COMM_WORLD);
268 smpi_mpi_startall(size, reqs);
269 smpi_mpi_waitall(size, reqs, MPI_STATUSES_IGNORE);
// the synchronization requests are released once completed
271 smpi_mpi_request_free(&reqs[i]);
274 win->opened++; //we're open for business !
// presumably takes a reference on the group, released by smpi_group_unuse when the epoch ends
276 smpi_group_use(group);
// MPI_Win_post backend: opens an exposure epoch on `win` for the processes of `group`.
// One zero-byte send (tag RMA_TAG+4, on MPI_COMM_WORLD) is prepared for each member of the
// group other than the calling process; these sends are started and waited for. They match
// the receives posted by smpi_mpi_win_start (same tag). The window's `opened` counter is then
// incremented.
// NOTE(review): `assert` looks unused — confirm.
280 int smpi_mpi_win_post(MPI_Group group, int assert, MPI_Win win){
281 //let's make a synchronous send here
283 int size = smpi_group_size(group);
// zero-initialized array with one slot per member of the group
284 MPI_Request* reqs = xbt_new0(MPI_Request, size);
// global process index of the j-th member of the group
287 int dst=smpi_group_index(group,j);
// no synchronization message is exchanged with ourselves
288 if(dst!=smpi_process_index()){
289 reqs[i]=smpi_mpi_send_init(nullptr, 0, MPI_CHAR, dst, RMA_TAG+4, MPI_COMM_WORLD);
296 smpi_mpi_startall(size, reqs);
297 smpi_mpi_waitall(size, reqs, MPI_STATUSES_IGNORE);
// the synchronization requests are released once completed
299 smpi_mpi_request_free(&reqs[i]);
302 win->opened++; //we're open for business !
// presumably takes a reference on the group, released by smpi_group_unuse when the epoch ends
304 smpi_group_use(group);
// MPI_Win_complete backend: closes the access epoch on `win`.
// A zero-byte send (tag RMA_TAG+5, on MPI_COMM_WORLD) is issued to each member of win->group
// other than the calling process and waited for; smpi_mpi_win_wait posts the matching
// receives. The requests attached to the window are then completed: those flagged PREPARED
// are started and all of them are waited for. Finally the group is released and the `opened`
// counter is decremented.
308 int smpi_mpi_win_complete(MPI_Win win){
// NOTE(review): the wording of this message ("already opened") looks at odds with a function
// that closes an epoch — check it against the guarding condition.
310 xbt_die("Complete called on already opened MPI_Win");
312 XBT_DEBUG("Entering MPI_Win_Complete");
314 int size = smpi_group_size(win->group);
// zero-initialized array with one slot per member of the group
315 MPI_Request* reqs = xbt_new0(MPI_Request, size);
// global process index of the j-th member of the group
318 int dst=smpi_group_index(win->group,j);
// no synchronization message is exchanged with ourselves
319 if(dst!=smpi_process_index()){
320 reqs[i]=smpi_mpi_send_init(nullptr, 0, MPI_CHAR, dst, RMA_TAG+5, MPI_COMM_WORLD);
326 XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
327 smpi_mpi_startall(size, reqs);
328 smpi_mpi_waitall(size, reqs, MPI_STATUSES_IGNORE);
// the synchronization requests are released once completed
331 smpi_mpi_request_free(&reqs[i]);
335 //now we can finish RMA calls
337 std::vector<MPI_Request> *reqqs = win->requests;
// `size` is reused: from here on it is the number of RMA requests attached to the window
338 size = static_cast<int>(reqqs->size());
340 XBT_DEBUG("Win_complete - Finishing %d RMA calls", size);
341 // start all requests that have been prepared by another process
342 for (auto req: *reqqs){
343 if (req && (req->flags & PREPARED))
// the vector storage is handed to waitall as a plain array of `size` requests
// NOTE(review): taking the address of element 0 is undefined when the vector is empty;
// reqqs->data() would be safe if that can happen here.
347 MPI_Request* treqs = &(*reqqs)[0];
348 smpi_mpi_waitall(size,treqs,MPI_STATUSES_IGNORE);
350 smpi_group_unuse(win->group);
351 win->opened--; //we're closed for business !
// MPI_Win_wait backend: closes the exposure epoch on `win`.
// A zero-byte receive (tag RMA_TAG+5, on MPI_COMM_WORLD) is posted for each member of
// win->group other than the calling process and waited for; these receives match the sends
// issued by smpi_mpi_win_complete. The requests attached to the window are then completed:
// those flagged PREPARED are started and all of them are waited for. Finally the group is
// released and the `opened` counter is decremented.
355 int smpi_mpi_win_wait(MPI_Win win){
356 //naive, blocking implementation.
357 XBT_DEBUG("Entering MPI_Win_Wait");
359 int size = smpi_group_size(win->group);
// zero-initialized array with one slot per member of the group
360 MPI_Request* reqs = xbt_new0(MPI_Request, size);
// global process index of the j-th member of the group
363 int src=smpi_group_index(win->group,j);
// no synchronization message is exchanged with ourselves
364 if(src!=smpi_process_index()){
365 reqs[i]=smpi_irecv_init(nullptr, 0, MPI_CHAR, src,RMA_TAG+5, MPI_COMM_WORLD);
371 XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
372 smpi_mpi_startall(size, reqs);
373 smpi_mpi_waitall(size, reqs, MPI_STATUSES_IGNORE);
// the synchronization requests are released once completed
375 smpi_mpi_request_free(&reqs[i]);
379 std::vector<MPI_Request> *reqqs = win->requests;
// `size` is reused: from here on it is the number of RMA requests attached to the window
380 size = static_cast<int>(reqqs->size());
// NOTE(review): the message below says "Win_complete" although this is the Win_wait path.
382 XBT_DEBUG("Win_complete - Finishing %d RMA calls", size);
384 // start all requests that have been prepared by another process
385 for(auto req: *reqqs){
386 if (req && (req->flags & PREPARED))
// the vector storage is handed to waitall as a plain array of `size` requests
// NOTE(review): taking the address of element 0 is undefined when the vector is empty;
// reqqs->data() would be safe if that can happen here.
390 MPI_Request* treqs = &(*reqqs)[0];
391 smpi_mpi_waitall(size,treqs,MPI_STATUSES_IGNORE);
393 smpi_group_unuse(win->group);
394 win->opened--; //we're closed for business !