1 /* Copyright (c) 2007-2015. The SimGrid Team.
2 * All rights reserved. */
4 /* This program is free software; you can redistribute it and/or modify it
5 * under the terms of the license (GNU LGPL) which comes with this package. */
// Logging category for SMPI one-sided (RMA) operations, child of the "smpi" category.
10 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");
// File-global MSG barrier handle. NOTE(review): only the declaration is visible
// here; the elided surrounding lines presumably initialize/use it -- confirm.
12 msg_bar_t creation_bar = nullptr;
// Internal representation of an MPI_Win (RMA window).
// NOTE(review): the residual line numbering jumps (14 -> 21 -> 24 -> 28), so most
// fields are elided from this view (the functions below dereference base, size,
// disp_unit, info, comm, name, bar, mut, assert, opened and group), as is the
// closing "} s_smpi_mpi_win_t;" of the typedef.
14 typedef struct s_smpi_mpi_win{
// Pending RMA requests posted against this window; drained at synchronization
// points (fence/complete/wait). Accesses are bracketed by the window mutex
// in the functions below.
21 std::vector<MPI_Request> *requests;
// One MPI_Win handle per rank of the window's communicator (allgather-ed at
// creation) so any rank can reach any peer's window directly.
24 MPI_Win* connected_wins;
28 int count; // number of accumulate operations issued so far; added to the RMA tag so accumulates are matched in issue order
// Collectively create an RMA window over `comm`, exposing `size` bytes at
// `base` with displacement unit `disp_unit`. Must be called by all ranks of
// `comm` (it ends with collective allgather/bcast/barrier calls).
// NOTE(review): numbering gaps show elided lines -- the declaration of `win`,
// the copies of base/size/info/comm, the allgather's trailing arguments, and
// the final `return win;` are not visible here.
32 MPI_Win smpi_mpi_win_create( void *base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm){
35 int comm_size = smpi_comm_size(comm);
36 int rank=smpi_comm_rank(comm);
37 XBT_DEBUG("Creating window");
39 win = xbt_new(s_smpi_mpi_win_t, 1);
42 win->disp_unit = disp_unit;
45 if(info!=MPI_INFO_NULL)
50 win->group = MPI_GROUP_NULL;
51 win->requests = new std::vector<MPI_Request>(); // deleted in smpi_mpi_win_free
52 win->mut=xbt_mutex_init();
53 win->connected_wins = xbt_new0(MPI_Win, comm_size); // zero-filled, one slot per rank
54 win->connected_wins[rank] = win; // publish our own handle in our slot
57 win->bar = MSG_barrier_init(comm_size);
// Exchange window handles so peers can be addressed directly in put/get/accumulate.
59 mpi_coll_allgather_fun(&(win->connected_wins[rank]), sizeof(MPI_Win), MPI_BYTE, win->connected_wins, sizeof(MPI_Win),
// Broadcast rank 0's barrier handle so every rank waits on the same barrier object.
62 mpi_coll_bcast_fun(&(win->bar), sizeof(msg_bar_t), MPI_BYTE, 0, comm);
64 mpi_coll_barrier_fun(comm);
// Collectively destroy a window: synchronize with all peers, then release the
// request vector, the connected-window table, the optional name/info, and the
// barrier/mutex. NOTE(review): elided lines likely include the conditional
// guards' closing braces, freeing of *win itself, setting *win to
// MPI_WIN_NULL, and the return value; `rank` below appears unused in the
// visible lines, so it is presumably consumed by an elided statement
// (e.g. only one rank destroying the shared barrier) -- confirm.
69 int smpi_mpi_win_free( MPI_Win* win){
70 //As per the standard, perform a barrier to ensure every async comm is finished
71 MSG_barrier_wait((*win)->bar);
// Mutex protects the requests vector against concurrent RMA pushes from peers.
72 xbt_mutex_acquire((*win)->mut);
73 delete (*win)->requests;
74 xbt_mutex_release((*win)->mut);
75 xbt_free((*win)->connected_wins);
76 if ((*win)->name != nullptr){
77 xbt_free((*win)->name); // name was xbt_strdup-ed in smpi_mpi_win_set_name
79 if((*win)->info!=MPI_INFO_NULL){
80 MPI_Info_free(&(*win)->info);
83 mpi_coll_barrier_fun((*win)->comm);
84 int rank=smpi_comm_rank((*win)->comm);
86 MSG_barrier_destroy((*win)->bar);
87 xbt_mutex_destroy((*win)->mut);
// Copy the window's name into `name` and its length (excluding the NUL) into
// `*length`. NOTE(review): the body of the name==nullptr branch (presumably
// setting *length and returning early) is elided from this view.
93 void smpi_mpi_win_get_name(MPI_Win win, char* name, int* length){
94 if(win->name==nullptr){
99 *length = strlen(win->name);
100 strncpy(name, win->name, *length+1); // +1 so the terminating NUL is copied too
// Return the group of the window's communicator, or MPI_GROUP_NULL when the
// window has no valid communicator. NOTE(review): the `else` keyword and the
// branches' closing braces fall on elided lines.
103 void smpi_mpi_win_get_group(MPI_Win win, MPI_Group* group){
104 if(win->comm != MPI_COMM_NULL){
105 *group = smpi_comm_group(win->comm);
107 *group = MPI_GROUP_NULL;
// Set the window's name (duplicated; released in smpi_mpi_win_free).
// NOTE(review): a previously set name is not freed in the visible lines --
// repeated calls would leak unless an elided line handles it; confirm.
111 void smpi_mpi_win_set_name(MPI_Win win, char* name){
112 win->name = xbt_strdup(name);
// Fence synchronization: unless the epoch carries MPI_MODE_NOPRECEDE, wait for
// all peers at the barrier, then complete every RMA request queued on this
// window (both those we issued and those peers pushed to us). A second barrier
// closes the epoch. NOTE(review): elided lines include the statement started
// by the PREPARED check (presumably smpi_mpi_start(req)), reqs->clear(),
// closing braces and the return value.
115 int smpi_mpi_win_fence( int assert, MPI_Win win){
116 XBT_DEBUG("Entering fence");
119 if(assert != MPI_MODE_NOPRECEDE){
120 MSG_barrier_wait(win->bar);
121 xbt_mutex_acquire(win->mut);
122 std::vector<MPI_Request> *reqs = win->requests;
123 int size = static_cast<int>(reqs->size());
124 // start all requests that have been prepared by another process
126 for(auto req: *reqs){
127 if (req && (req->flags & PREPARED))
// Wait on the whole batch at once; &(*reqs)[0] is the vector's backing array.
131 MPI_Request* treqs = &(*reqs)[0];
133 smpi_mpi_waitall(size,treqs,MPI_STATUSES_IGNORE);
136 xbt_mutex_release(win->mut);
// Remember the epoch's assertion for later calls.
138 win->assert = assert;
140 MSG_barrier_wait(win->bar);
141 XBT_DEBUG("Leaving fence ");
// One-sided put: write `origin_count` elements from `origin_addr` into the
// target rank's window at `target_disp` (in units of the target's disp_unit).
// Remote case: pair an RMA send (queued on our window) with an RMA receive
// (queued on the target's window); both complete at the next synchronization.
// Local case: plain datatype copy. NOTE(review): elided lines include the body
// of the opened==0 check (presumably an error/die), the `else` before the
// local copy, closing braces and the return value.
146 int smpi_mpi_put( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
147 MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Win win)
149 if(win->opened==0)//check that post/start has been done
151 //get receiver pointer
152 MPI_Win recv_win = win->connected_wins[target_rank];
// Byte-address arithmetic: displacement is scaled by the TARGET's disp_unit.
154 void* recv_addr = static_cast<void*> ( static_cast<char*>(recv_win->base) + target_disp * recv_win->disp_unit);
155 XBT_DEBUG("Entering MPI_Put to %d", target_rank);
157 if(target_rank != smpi_comm_rank(win->comm)){
158 //prepare send_request
159 MPI_Request sreq = smpi_rma_send_init(origin_addr, origin_count, origin_datatype, smpi_process_index(),
160 smpi_group_index(smpi_comm_group(win->comm),target_rank), SMPI_RMA_TAG+1, win->comm, MPI_OP_NULL);
162 //prepare receiver request
163 MPI_Request rreq = smpi_rma_recv_init(recv_addr, target_count, target_datatype, smpi_process_index(),
164 smpi_group_index(smpi_comm_group(win->comm),target_rank), SMPI_RMA_TAG+1, recv_win->comm, MPI_OP_NULL);
166 //push request to receiver's win
167 xbt_mutex_acquire(recv_win->mut);
168 recv_win->requests->push_back(rreq);
169 xbt_mutex_release(recv_win->mut);
// Start the send immediately; the receive side is started by the target at sync time.
171 smpi_mpi_start(sreq);
173 //push request to sender's win
174 xbt_mutex_acquire(win->mut);
175 win->requests->push_back(sreq);
176 xbt_mutex_release(win->mut);
// Local put: no messaging needed, copy directly between buffers.
178 smpi_datatype_copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
// One-sided get: read `target_count` elements from the target rank's window at
// `target_disp` into `origin_addr`. Mirror of smpi_mpi_put with the transfer
// direction reversed: the send request is queued on the TARGET's window (data
// flows from the target) and the receive request on ours. Uses SMPI_RMA_TAG+2
// to stay distinct from put traffic. NOTE(review): elided lines include the
// opened==0 branch body, the trailing MPI_OP_NULL arguments of the two *_init
// calls, the `else` before the local copy, closing braces and the return value.
184 int smpi_mpi_get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
185 MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Win win)
187 if(win->opened==0)//check that post/start has been done
190 MPI_Win send_win = win->connected_wins[target_rank];
// Source address inside the target's exposed buffer, scaled by its disp_unit.
192 void* send_addr = static_cast<void*>(static_cast<char*>(send_win->base) + target_disp * send_win->disp_unit);
193 XBT_DEBUG("Entering MPI_Get from %d", target_rank);
195 if(target_rank != smpi_comm_rank(win->comm)){
196 //prepare send_request
197 MPI_Request sreq = smpi_rma_send_init(send_addr, target_count, target_datatype,
198 smpi_group_index(smpi_comm_group(win->comm),target_rank), smpi_process_index(), SMPI_RMA_TAG+2, send_win->comm,
201 //prepare receiver request
202 MPI_Request rreq = smpi_rma_recv_init(origin_addr, origin_count, origin_datatype,
203 smpi_group_index(smpi_comm_group(win->comm),target_rank), smpi_process_index(), SMPI_RMA_TAG+2, win->comm,
206 //start the send, with another process than us as sender.
207 smpi_mpi_start(sreq)
208 //push request to receiver's win
209 xbt_mutex_acquire(send_win->mut);
210 send_win->requests->push_back(sreq);
211 xbt_mutex_release(send_win->mut);
// Start our receive right away; it completes at the next synchronization.
214 smpi_mpi_start(rreq);
215 //push request to sender's win
216 xbt_mutex_acquire(win->mut);
217 win->requests->push_back(rreq);
218 xbt_mutex_release(win->mut);
// Local get: direct datatype copy from the window buffer.
220 smpi_datatype_copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
// One-sided accumulate: combine `origin_count` elements from `origin_addr`
// into the target window at `target_disp` using reduction operator `op`.
// Structured like smpi_mpi_put, but the tag embeds win->count so successive
// accumulates toward the same target are matched (and thus applied) in issue
// order, per the comment below. NOTE(review): elided lines include the
// opened==0 branch body, the increment of win->count, closing braces and the
// return value; the FIXME indicates the local (self-target) shortcut is not
// implemented here, unlike put/get.
227 int smpi_mpi_accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
228 MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Win win)
230 if(win->opened==0)//check that post/start has been done
232 //FIXME: local version
233 //get receiver pointer
234 MPI_Win recv_win = win->connected_wins[target_rank];
236 void* recv_addr = static_cast<void*>(static_cast<char*>(recv_win->base) + target_disp * recv_win->disp_unit);
237 XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
238 //As the tag will be used for ordering of the operations, add count to it
239 //prepare send_request
240 MPI_Request sreq = smpi_rma_send_init(origin_addr, origin_count, origin_datatype,
241 smpi_process_index(), smpi_group_index(smpi_comm_group(win->comm),target_rank), SMPI_RMA_TAG+3+win->count, win->comm, op);
243 //prepare receiver request
244 MPI_Request rreq = smpi_rma_recv_init(recv_addr, target_count, target_datatype,
245 smpi_process_index(), smpi_group_index(smpi_comm_group(win->comm),target_rank), SMPI_RMA_TAG+3+win->count, recv_win->comm, op);
248 //push request to receiver's win
249 xbt_mutex_acquire(recv_win->mut);
250 recv_win->requests->push_back(rreq);
251 xbt_mutex_release(recv_win->mut);
253 smpi_mpi_start(sreq);
255 //push request to sender's win
256 xbt_mutex_acquire(win->mut);
257 win->requests->push_back(sreq);
258 xbt_mutex_release(win->mut);
// MPI_Win_start: open an access epoch toward the ranks in `group`. This naive
// implementation blocks until every target has issued its matching
// MPI_Win_post, by receiving one zero-byte message (tag SMPI_RMA_TAG+4) from
// each non-self member of the group. NOTE(review): elided lines include the
// loop headers (the i/j counters and their bounds), the cleanup loop around
// the request_free call, storing `group` into win->group, closing braces and
// the return value; `assert` is unused in the visible lines.
263 int smpi_mpi_win_start(MPI_Group group, int assert, MPI_Win win){
264 /* From MPI forum advices
265 The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
266 will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
267 the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
268 matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
269 MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
270 implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
271 to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
272 called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
273 origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
274 must complete, without further dependencies. */
276 //naive, blocking implementation.
278 int size = smpi_group_size(group);
279 MPI_Request* reqs = xbt_new0(MPI_Request, size);
// One receive per group member, skipping ourselves and undefined ranks.
282 int src=smpi_group_index(group,j);
283 if(src!=smpi_process_index()&& src!=MPI_UNDEFINED){
284 reqs[i]=smpi_irecv_init(nullptr, 0, MPI_CHAR, src,SMPI_RMA_TAG+4, MPI_COMM_WORLD);
// Block until every target has posted.
290 smpi_mpi_startall(size, reqs);
291 smpi_mpi_waitall(size, reqs, MPI_STATUSES_IGNORE);
293 smpi_mpi_request_free(&reqs[i]);
296 win->opened++; //we're open for business !
// Keep a reference on the group until the matching complete/wait.
298 smpi_group_use(group);
// MPI_Win_post: open an exposure epoch toward the ranks in `group` by sending
// one zero-byte message (tag SMPI_RMA_TAG+4) to each non-self member --
// exactly what smpi_mpi_win_start is waiting to receive. NOTE(review): elided
// lines include the loop headers (i/j counters and bounds), the cleanup loop
// around request_free, storing `group` into win->group, closing braces and
// the return value; `assert` is unused in the visible lines.
302 int smpi_mpi_win_post(MPI_Group group, int assert, MPI_Win win){
303 //let's make a synchronous send here
305 int size = smpi_group_size(group);
306 MPI_Request* reqs = xbt_new0(MPI_Request, size);
// One notification per group member, skipping ourselves and undefined ranks.
309 int dst=smpi_group_index(group,j);
310 if(dst!=smpi_process_index() && dst!=MPI_UNDEFINED){
311 reqs[i]=smpi_mpi_send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG+4, MPI_COMM_WORLD);
318 smpi_mpi_startall(size, reqs);
319 smpi_mpi_waitall(size, reqs, MPI_STATUSES_IGNORE);
321 smpi_mpi_request_free(&reqs[i]);
324 win->opened++; //we're open for business !
// Keep a reference on the group until the matching complete/wait.
326 smpi_group_use(group);
// MPI_Win_complete: close the access epoch opened by smpi_mpi_win_start.
// Notifies every target (tag SMPI_RMA_TAG+5, matched by smpi_mpi_win_wait),
// then drains all RMA requests still queued on this window. NOTE(review):
// elided lines include the opened-state check guarding the xbt_die, the loop
// headers, the statement started by the PREPARED check (presumably
// smpi_mpi_start(req)), reqqs->clear(), xbt_free(reqs), closing braces and
// the return value.
330 int smpi_mpi_win_complete(MPI_Win win){
332 xbt_die("Complete called on already opened MPI_Win");
334 XBT_DEBUG("Entering MPI_Win_Complete");
336 int size = smpi_group_size(win->group);
337 MPI_Request* reqs = xbt_new0(MPI_Request, size);
// One completion notification per member of the epoch's group.
340 int dst=smpi_group_index(win->group,j);
341 if(dst!=smpi_process_index() && dst!=MPI_UNDEFINED){
342 reqs[i]=smpi_mpi_send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG+5, MPI_COMM_WORLD);
348 XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
349 smpi_mpi_startall(size, reqs);
350 smpi_mpi_waitall(size, reqs, MPI_STATUSES_IGNORE);
353 smpi_mpi_request_free(&reqs[i]);
357 //now we can finish RMA calls
358 xbt_mutex_acquire(win->mut);
359 std::vector<MPI_Request> *reqqs = win->requests;
360 size = static_cast<int>(reqqs->size());
362 XBT_DEBUG("Win_complete - Finishing %d RMA calls", size);
363 // start all requests that have been prepared by another process
364 for (auto req: *reqqs){
365 if (req && (req->flags & PREPARED))
// Wait on the whole batch; &(*reqqs)[0] is the vector's backing array.
369 MPI_Request* treqs = &(*reqqs)[0];
370 smpi_mpi_waitall(size,treqs,MPI_STATUSES_IGNORE);
372 xbt_mutex_release(win->mut);
// Drop the group reference taken in smpi_mpi_win_start.
374 smpi_group_unuse(win->group);
375 win->opened--; //we're closed for business !
// MPI_Win_wait: close the exposure epoch opened by smpi_mpi_win_post. Blocks
// until every origin has called smpi_mpi_win_complete (one SMPI_RMA_TAG+5
// message per non-self group member), then drains all RMA requests queued on
// this window. Mirror of smpi_mpi_win_complete with receives instead of
// sends. NOTE(review): elided lines include the loop headers, the statement
// started by the PREPARED check (presumably smpi_mpi_start(req)),
// reqqs->clear(), xbt_free(reqs), closing braces and the return value.
379 int smpi_mpi_win_wait(MPI_Win win){
380 //naive, blocking implementation.
381 XBT_DEBUG("Entering MPI_Win_Wait");
383 int size = smpi_group_size(win->group);
384 MPI_Request* reqs = xbt_new0(MPI_Request, size);
// One receive per member of the epoch's group, skipping self/undefined ranks.
387 int src=smpi_group_index(win->group,j);
388 if(src!=smpi_process_index() && src!=MPI_UNDEFINED){
389 reqs[i]=smpi_irecv_init(nullptr, 0, MPI_CHAR, src,SMPI_RMA_TAG+5, MPI_COMM_WORLD);
395 XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
396 smpi_mpi_startall(size, reqs);
397 smpi_mpi_waitall(size, reqs, MPI_STATUSES_IGNORE);
399 smpi_mpi_request_free(&reqs[i]);
402 xbt_mutex_acquire(win->mut);
403 std::vector<MPI_Request> *reqqs = win->requests;
404 size = static_cast<int>(reqqs->size());
406 XBT_DEBUG("Win_wait - Finishing %d RMA calls", size);
408 // start all requests that have been prepared by another process
409 for(auto req: *reqqs){
410 if (req && (req->flags & PREPARED))
// Wait on the whole batch; &(*reqqs)[0] is the vector's backing array.
414 MPI_Request* treqs = &(*reqqs)[0];
415 smpi_mpi_waitall(size,treqs,MPI_STATUSES_IGNORE);
417 xbt_mutex_release(win->mut);
// Drop the group reference taken in smpi_mpi_win_post.
419 smpi_group_unuse(win->group);
420 win->opened--; //we're opened for business !