1 /* Copyright (c) 2007-2015. The SimGrid Team.
2 * All rights reserved. */
4 /* This program is free software; you can redistribute it and/or modify it
5 * under the terms of the license (GNU LGPL) which comes with this package. */
/* Logging sub-category used by all the RMA operations in this file. */
10 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");
/* Internal state of an MPI window (MPI_Win).
 * NOTE(review): this extract elides several members (the original numbering
 * skips lines); code below also dereferences base, disp_unit, comm, group,
 * name, info, mut, bar, opened and assert — confirm against the full source. */
12 typedef struct s_smpi_mpi_win{
/* RMA requests queued on this window; drained at synchronization points
 * (fence/complete/wait) while holding the window mutex. */
19 std::vector<MPI_Request> *requests;
/* One window handle per rank of the communicator, so any process can reach
 * any peer's window directly (filled by an allgather at creation time). */
22 MPI_Win* connected_wins;
26 int count; //for ordering the accs (folded into the RMA tag in accumulate)
/** Create a window over the caller's local region starting at `base`.
 *
 * Collective over `comm`: each rank publishes its own window handle through an
 * allgather so that connected_wins[i] ends up holding rank i's window on every
 * process, and rank 0's MSG barrier is broadcast so that all ranks later
 * synchronize on the very same barrier object.
 *
 * NOTE(review): this extract elides several lines (e.g. base/size/comm/name
 * initialization, the body of the info branch, and the final return).
 */
30 MPI_Win smpi_mpi_win_create( void *base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm){
31 int comm_size = comm->size();
32 int rank = comm->rank();
33 XBT_DEBUG("Creating window");
35 MPI_Win win = xbt_new(s_smpi_mpi_win_t, 1);
38 win->disp_unit = disp_unit;
41 if(info!=MPI_INFO_NULL)
46 win->group = MPI_GROUP_NULL;
47 win->requests = new std::vector<MPI_Request>();
48 win->mut=xbt_mutex_init();
49 win->connected_wins = xbt_new0(MPI_Win, comm_size);
50 win->connected_wins[rank] = win;
53 win->bar = MSG_barrier_init(comm_size);
/* Exchange handles: afterwards connected_wins[i] is valid on every rank. */
55 mpi_coll_allgather_fun(&(win->connected_wins[rank]), sizeof(MPI_Win), MPI_BYTE, win->connected_wins, sizeof(MPI_Win),
/* All ranks must share one barrier object; rank 0's instance is kept. */
58 mpi_coll_bcast_fun(&(win->bar), sizeof(msg_bar_t), MPI_BYTE, 0, comm);
60 mpi_coll_barrier_fun(comm);
/** Destroy a window and release everything attached to it.
 *
 * First waits on the shared barrier so that every outstanding asynchronous
 * RMA communication targeting this window has finished before teardown.
 *
 * NOTE(review): this extract elides some lines — the use of `rank` computed
 * below and the final return are not visible here.
 */
65 int smpi_mpi_win_free( MPI_Win* win){
66 //As per the standard, perform a barrier to ensure every async comm is finished
67 MSG_barrier_wait((*win)->bar);
/* Hold the mutex so no peer is still pushing into `requests` while we delete it. */
68 xbt_mutex_acquire((*win)->mut);
69 delete (*win)->requests;
70 xbt_mutex_release((*win)->mut);
71 xbt_free((*win)->connected_wins);
72 if ((*win)->name != nullptr){
73 xbt_free((*win)->name);
75 if((*win)->info!=MPI_INFO_NULL){
76 MPI_Info_free(&(*win)->info);
/* Second rendezvous: make sure nobody still uses our connected_wins entry. */
79 mpi_coll_barrier_fun((*win)->comm);
80 int rank=(*win)->comm->rank();
82 MSG_barrier_destroy((*win)->bar);
83 xbt_mutex_destroy((*win)->mut);
/** Copy the window's name into `name` and its length (excluding NUL) into
 * `length`. NOTE(review): the branch taken when no name was ever set
 * (win->name == nullptr) is elided in this extract. */
89 void smpi_mpi_win_get_name(MPI_Win win, char* name, int* length){
90 if(win->name==nullptr){
95 *length = strlen(win->name);
/* +1 so the terminating NUL byte is copied along with the characters. */
96 strncpy(name, win->name, *length+1);
/** Return the group of the communicator the window was created over, or
 * MPI_GROUP_NULL when the window has no valid communicator. */
99 void smpi_mpi_win_get_group(MPI_Win win, MPI_Group* group){
100 if(win->comm != MPI_COMM_NULL){
101 *group = win->comm->group();
103 *group = MPI_GROUP_NULL;
/** Attach a (heap-copied) name to the window.
 * NOTE(review): a previously set name is not freed before being overwritten
 * here — confirm against the full source whether repeated calls can leak. */
107 void smpi_mpi_win_set_name(MPI_Win win, char* name){
108 win->name = xbt_strdup(name);
/** Synchronize all RMA operations on the window (MPI_Win_fence).
 *
 * Unless MPI_MODE_NOPRECEDE is asserted (first fence of an epoch), wait for
 * every peer on the shared barrier, then complete every request queued on
 * this window since the previous fence. The closing barrier separates this
 * epoch from the next one. NOTE(review): some lines are elided in this
 * extract (body of the opened==0 branch, the start of PREPARED requests,
 * request-vector cleanup and the return value).
 */
111 int smpi_mpi_win_fence(int assert, MPI_Win win)
113 XBT_DEBUG("Entering fence");
114 if (win->opened == 0)
116 if (assert != MPI_MODE_NOPRECEDE) {
117 // This is not the first fence => finalize what came before
118 MSG_barrier_wait(win->bar);
119 xbt_mutex_acquire(win->mut);
120 // This (simulated) mutex ensures that no process pushes to the vector of requests during the waitall.
121 // Without this, the vector could get redimensionned when another process pushes.
122 // This would result in the array used by smpi_mpi_waitall() to be invalidated.
123 // Another solution would be to copy the data and cleanup the vector *before* smpi_mpi_waitall
124 std::vector<MPI_Request> *reqs = win->requests;
125 int size = static_cast<int>(reqs->size());
126 // start all requests that have been prepared by another process
128 for (const auto& req : *reqs) {
129 if (req && (req->flags & PREPARED))
/* Raw pointer into the vector: safe because the mutex is held (see above). */
133 MPI_Request* treqs = &(*reqs)[0];
135 smpi_mpi_waitall(size, treqs, MPI_STATUSES_IGNORE);
138 xbt_mutex_release(win->mut);
140 win->assert = assert;
142 MSG_barrier_wait(win->bar);
143 XBT_DEBUG("Leaving fence");
/** One-sided put: write origin data into the target rank's window.
 *
 * Remote target: build a matching send/recv pair (tag SMPI_RMA_TAG+1), start
 * the send immediately, and queue both requests — the receive on the target's
 * window, the send on ours — to be completed at the next synchronization
 * point (fence/complete/wait). Same-rank target: plain datatype copy.
 * NOTE(review): the opened==0 error path, the start of rreq, the else branch
 * header and the return are elided in this extract.
 */
148 int smpi_mpi_put( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
149 MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Win win)
151 if(win->opened==0)//check that post/start has been done
153 //get receiver pointer
154 MPI_Win recv_win = win->connected_wins[target_rank];
/* Target address = target window base + displacement scaled by its disp_unit. */
156 void* recv_addr = static_cast<void*> ( static_cast<char*>(recv_win->base) + target_disp * recv_win->disp_unit);
157 XBT_DEBUG("Entering MPI_Put to %d", target_rank);
159 if(target_rank != win->comm->rank()){
160 //prepare send_request
161 MPI_Request sreq = smpi_rma_send_init(origin_addr, origin_count, origin_datatype, smpi_process_index(),
162 win->comm->group()->index(target_rank), SMPI_RMA_TAG+1, win->comm, MPI_OP_NULL);
164 //prepare receiver request
165 MPI_Request rreq = smpi_rma_recv_init(recv_addr, target_count, target_datatype, smpi_process_index(),
166 win->comm->group()->index(target_rank), SMPI_RMA_TAG+1, recv_win->comm, MPI_OP_NULL);
168 //push request to receiver's win
169 xbt_mutex_acquire(recv_win->mut);
170 recv_win->requests->push_back(rreq);
171 xbt_mutex_release(recv_win->mut);
173 smpi_mpi_start(sreq);
175 //push request to sender's win
176 xbt_mutex_acquire(win->mut);
177 win->requests->push_back(sreq);
178 xbt_mutex_release(win->mut);
/* Local (same-rank) case: no communication, copy directly.
 * (The enclosing else header is elided in this extract.) */
180 smpi_datatype_copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
/** One-sided get: read data from the target rank's window into origin_addr.
 *
 * Remote target: build a send/recv pair (tag SMPI_RMA_TAG+2) where the
 * *target* is the sender, start both requests here, queue the send on the
 * target's window and the receive on ours for completion at the next
 * synchronization point. Same-rank target: plain datatype copy.
 * NOTE(review): the opened==0 error path, trailing arguments of the two
 * *_init calls, the else branch header and the return are elided here.
 */
186 int smpi_mpi_get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
187 MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Win win)
189 if(win->opened==0)//check that post/start has been done
192 MPI_Win send_win = win->connected_wins[target_rank];
/* Source address inside the target's window, scaled by its disp_unit. */
194 void* send_addr = static_cast<void*>(static_cast<char*>(send_win->base) + target_disp * send_win->disp_unit);
195 XBT_DEBUG("Entering MPI_Get from %d", target_rank);
197 if(target_rank != win->comm->rank()){
198 //prepare send_request
199 MPI_Request sreq = smpi_rma_send_init(send_addr, target_count, target_datatype,
200 win->comm->group()->index(target_rank), smpi_process_index(), SMPI_RMA_TAG+2, send_win->comm,
203 //prepare receiver request
204 MPI_Request rreq = smpi_rma_recv_init(origin_addr, origin_count, origin_datatype,
205 win->comm->group()->index(target_rank), smpi_process_index(), SMPI_RMA_TAG+2, win->comm,
208 //start the send, with another process than us as sender.
209 smpi_mpi_start(sreq);
210 //push request to receiver's win
211 xbt_mutex_acquire(send_win->mut);
212 send_win->requests->push_back(sreq);
213 xbt_mutex_release(send_win->mut);
216 smpi_mpi_start(rreq);
217 //push request to sender's win
218 xbt_mutex_acquire(win->mut);
219 win->requests->push_back(rreq);
220 xbt_mutex_release(win->mut);
/* Local (same-rank) case: no communication, copy directly.
 * (The enclosing else header is elided in this extract.) */
222 smpi_datatype_copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
/** One-sided accumulate: combine origin data into the target window with `op`.
 *
 * Implemented like put, as a send/recv pair carrying the reduction operation
 * (passed to smpi_rma_*_init). The tag embeds win->count so that successive
 * accumulates on the same window are matched — and thus applied — in order.
 * NOTE(review): the opened==0 error path, the increment of win->count, the
 * start of rreq and the return are elided in this extract; the local
 * (same-rank) shortcut is explicitly not implemented (FIXME below).
 */
229 int smpi_mpi_accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
230 MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Win win)
232 if(win->opened==0)//check that post/start has been done
234 //FIXME: local version
235 //get receiver pointer
236 MPI_Win recv_win = win->connected_wins[target_rank];
/* Target address = target window base + displacement scaled by its disp_unit. */
238 void* recv_addr = static_cast<void*>(static_cast<char*>(recv_win->base) + target_disp * recv_win->disp_unit);
239 XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
240 //As the tag will be used for ordering of the operations, add count to it
241 //prepare send_request
242 MPI_Request sreq = smpi_rma_send_init(origin_addr, origin_count, origin_datatype,
243 smpi_process_index(), win->comm->group()->index(target_rank), SMPI_RMA_TAG+3+win->count, win->comm, op);
245 //prepare receiver request
246 MPI_Request rreq = smpi_rma_recv_init(recv_addr, target_count, target_datatype,
247 smpi_process_index(), win->comm->group()->index(target_rank), SMPI_RMA_TAG+3+win->count, recv_win->comm, op);
250 //push request to receiver's win
251 xbt_mutex_acquire(recv_win->mut);
252 recv_win->requests->push_back(rreq);
253 xbt_mutex_release(recv_win->mut);
255 smpi_mpi_start(sreq);
257 //push request to sender's win
258 xbt_mutex_acquire(win->mut);
259 win->requests->push_back(sreq);
260 xbt_mutex_release(win->mut);
/** MPI_Win_start: open an access epoch toward the processes in `group`.
 *
 * Naive blocking implementation: post one zero-byte receive per group member
 * (tag SMPI_RMA_TAG+4) and wait until the matching MPI_Win_post has been
 * issued by every target. NOTE(review): the loop headers, group bookkeeping,
 * reqs deallocation and return are elided in this extract.
 */
265 int smpi_mpi_win_start(MPI_Group group, int assert, MPI_Win win){
266 /* From MPI forum advices
267 The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
268 will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
269 the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
270 matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
271 MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
272 implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
273 to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
274 called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
275 origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
276 must complete, without further dependencies. */
278 //naive, blocking implementation.
281 int size = group->size();
282 MPI_Request* reqs = xbt_new0(MPI_Request, size);
285 int src = group->index(j);
/* Skip ourselves and ranks that are not in the communicator. */
286 if (src != smpi_process_index() && src != MPI_UNDEFINED) {
287 reqs[i] = smpi_irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 4, MPI_COMM_WORLD);
293 smpi_mpi_startall(size, reqs);
294 smpi_mpi_waitall(size, reqs, MPI_STATUSES_IGNORE);
296 smpi_mpi_request_free(&reqs[i]);
299 win->opened++; //we're open for business !
/** MPI_Win_post: expose the window to the processes in `group`.
 *
 * Counterpart of smpi_mpi_win_start: send one zero-byte message per group
 * member (tag SMPI_RMA_TAG+4) to unblock their pending MPI_Win_start.
 * NOTE(review): the loop headers, group bookkeeping, reqs deallocation and
 * return are elided in this extract.
 */
305 int smpi_mpi_win_post(MPI_Group group, int assert, MPI_Win win){
306 //let's make a synchronous send here
309 int size = group->size();
310 MPI_Request* reqs = xbt_new0(MPI_Request, size);
313 int dst=group->index(j);
/* Skip ourselves and ranks that are not in the communicator. */
314 if(dst!=smpi_process_index() && dst!=MPI_UNDEFINED){
315 reqs[i]=smpi_mpi_send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG+4, MPI_COMM_WORLD);
322 smpi_mpi_startall(size, reqs);
323 smpi_mpi_waitall(size, reqs, MPI_STATUSES_IGNORE);
325 smpi_mpi_request_free(&reqs[i]);
328 win->opened++; //we're open for business !
/** MPI_Win_complete: close the access epoch opened by MPI_Win_start.
 *
 * Sends one zero-byte message per member of the start group (tag
 * SMPI_RMA_TAG+5) to signal completion to the targets' MPI_Win_wait, then
 * drains every RMA request still queued on this window. NOTE(review): the
 * opened check around the xbt_die, loop headers, reqs deallocation, request
 * cleanup and return are elided in this extract.
 */
334 int smpi_mpi_win_complete(MPI_Win win){
336 xbt_die("Complete called on already opened MPI_Win");
338 XBT_DEBUG("Entering MPI_Win_Complete");
341 int size = win->group->size();
342 MPI_Request* reqs = xbt_new0(MPI_Request, size);
345 int dst=win->group->index(j);
/* Skip ourselves and ranks that are not in the communicator. */
346 if(dst!=smpi_process_index() && dst!=MPI_UNDEFINED){
347 reqs[i]=smpi_mpi_send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG+5, MPI_COMM_WORLD);
353 XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
354 smpi_mpi_startall(size, reqs);
355 smpi_mpi_waitall(size, reqs, MPI_STATUSES_IGNORE);
358 smpi_mpi_request_free(&reqs[i]);
362 //now we can finish RMA calls
/* Hold the mutex so no peer pushes to `requests` during the waitall
 * (same rationale as in smpi_mpi_win_fence). */
363 xbt_mutex_acquire(win->mut);
364 std::vector<MPI_Request> *reqqs = win->requests;
365 size = static_cast<int>(reqqs->size());
367 XBT_DEBUG("Win_complete - Finishing %d RMA calls", size);
369 // start all requests that have been prepared by another process
370 for (const auto& req : *reqqs) {
371 if (req && (req->flags & PREPARED))
375 MPI_Request* treqs = &(*reqqs)[0];
376 smpi_mpi_waitall(size, treqs, MPI_STATUSES_IGNORE);
379 xbt_mutex_release(win->mut);
382 win->opened--; //we're closed for business !
/** MPI_Win_wait: close the exposure epoch opened by MPI_Win_post.
 *
 * Blocks until every member of the post group has sent its completion
 * message (tag SMPI_RMA_TAG+5, emitted by MPI_Win_complete), then drains
 * every RMA request still queued on this window. NOTE(review): loop headers,
 * reqs deallocation, request cleanup and the lines following opened-- are
 * elided in this extract.
 */
386 int smpi_mpi_win_wait(MPI_Win win){
387 //naive, blocking implementation.
388 XBT_DEBUG("Entering MPI_Win_Wait");
390 int size = win->group->size();
391 MPI_Request* reqs = xbt_new0(MPI_Request, size);
394 int src=win->group->index(j);
/* Skip ourselves and ranks that are not in the communicator. */
395 if(src!=smpi_process_index() && src!=MPI_UNDEFINED){
396 reqs[i]=smpi_irecv_init(nullptr, 0, MPI_CHAR, src,SMPI_RMA_TAG+5, MPI_COMM_WORLD);
402 XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
403 smpi_mpi_startall(size, reqs);
404 smpi_mpi_waitall(size, reqs, MPI_STATUSES_IGNORE);
406 smpi_mpi_request_free(&reqs[i]);
/* Hold the mutex so no peer pushes to `requests` during the waitall
 * (same rationale as in smpi_mpi_win_fence). */
409 xbt_mutex_acquire(win->mut);
410 std::vector<MPI_Request> *reqqs = win->requests;
411 size = static_cast<int>(reqqs->size());
413 XBT_DEBUG("Win_wait - Finishing %d RMA calls", size);
415 // start all requests that have been prepared by another process
416 for (const auto& req : *reqqs) {
417 if (req && (req->flags & PREPARED))
421 MPI_Request* treqs = &(*reqqs)[0];
422 smpi_mpi_waitall(size, treqs, MPI_STATUSES_IGNORE);
425 xbt_mutex_release(win->mut);
428 win->opened--; //we're closed for business !