2 /* Copyright (c) 2007-2015. The SimGrid Team.
3 * All rights reserved. */
5 /* This program is free software; you can redistribute it and/or modify it
6 * under the terms of the license (GNU LGPL) which comes with this package. */
10 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");

// File-scope barrier handle; NOTE(review): not referenced in the visible code
// of this chunk — presumably used while windows are being created. Confirm
// against the full file.
14 xbt_bar_t creation_bar = NULL;

// Descriptor for one simulated MPI RMA window.
// NOTE(review): most fields are elided from this view; the functions below
// reference base, size, disp_unit, info, comm, name, opened, assert, group,
// requests and bar — confirm the full declaration in the complete file.
16 typedef struct s_smpi_mpi_win{
25 MPI_Win* connected_wins; // one window handle per rank of the communicator (filled by allgather in win_create)
// Create an RMA window over `comm`, exposing `size` bytes at `base` with the
// given displacement unit. Collective: every rank exchanges its window handle
// (allgather) and shares rank 0's barrier (bcast), then synchronizes.
// NOTE(review): several statements of this body are elided in this view
// (e.g. the assignments of base/size/comm, the info branch body, the trailing
// allgather arguments and the return) — confirm against the full file.
32 MPI_Win smpi_mpi_win_create( void *base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm){
35 int comm_size = smpi_comm_size(comm);
36 int rank=smpi_comm_rank(comm);
37 XBT_DEBUG("Creating window");
39 win = xbt_new(s_smpi_mpi_win_t, 1);
42 win->disp_unit = disp_unit;
45 if(info!=MPI_INFO_NULL)
50 win->group = MPI_GROUP_NULL; // no access/exposure epoch group yet
51 win->requests = xbt_dynar_new(sizeof(MPI_Request), NULL); // pending RMA requests targeting this window
52 win->connected_wins = xbt_new0(MPI_Win, comm_size);
53 win->connected_wins[rank] = win; // publish our own handle before the allgather
56 win->bar=xbt_barrier_init(comm_size);
// Every rank learns every other rank's window handle (handles are plain
// pointers, valid across simulated processes in SMPI's shared address space).
59 mpi_coll_allgather_fun(&(win->connected_wins[rank]), sizeof(MPI_Win), MPI_BYTE, win->connected_wins, sizeof(MPI_Win),
// All ranks adopt rank 0's barrier so fence/free synchronize on the same object.
62 mpi_coll_bcast_fun( &(win->bar), sizeof(xbt_bar_t), MPI_BYTE, 0, comm);
64 mpi_coll_barrier_fun(comm);
// Destroy a window (collective). Synchronizes first so no asynchronous RMA
// communication is still in flight, then releases every owned resource.
// NOTE(review): branch bodies and the return are partially elided here; the
// rank computed at line 82 is presumably used to let a single rank destroy
// the shared barrier — confirm against the full file.
69 int smpi_mpi_win_free( MPI_Win* win){
70 //As per the standard, perform a barrier to ensure every async comm is finished
71 xbt_barrier_wait((*win)->bar);
72 xbt_dynar_free(&(*win)->requests);
73 xbt_free((*win)->connected_wins);
74 if ((*win)->name != NULL){ // name is only set by smpi_mpi_win_set_name
75 xbt_free((*win)->name);
77 if((*win)->info!=MPI_INFO_NULL){
78 MPI_Info_free(&(*win)->info);
81 mpi_coll_barrier_fun((*win)->comm); // make sure nobody still uses the window before tearing it down
82 int rank=smpi_comm_rank((*win)->comm);
84 xbt_barrier_destroy((*win)->bar);
// Copy the window's name (and its length) into the caller's buffer.
// NOTE(review): the guard for an unset name (win->name == NULL) is elided in
// this view — the visible code would dereference NULL otherwise; confirm
// against the full file.
90 void smpi_mpi_win_get_name(MPI_Win win, char* name, int* length){
96 *length = strlen(win->name);
97 strcpy(name, win->name); // caller must supply a buffer of at least MPI_MAX_OBJECT_NAME
// Return the group of the communicator the window was created on, taking a
// reference on it (smpi_group_use) as MPI_Win_get_group hands ownership to
// the caller. NOTE(review): the else branch (comm == MPI_COMM_NULL) is elided
// in this view.
100 void smpi_mpi_win_get_group(MPI_Win win, MPI_Group* group){
101 if(win->comm != MPI_COMM_NULL){
102 *group = smpi_comm_group(win->comm);
103 smpi_group_use(*group); // caller is responsible for releasing this reference
// Attach a user-provided name to the window (duplicated; freed in win_free).
// NOTE(review): a previously set name is not freed here — repeated calls leak
// the old string; confirm whether callers guarantee a single call.
107 void smpi_mpi_win_set_name(MPI_Win win, char* name){
108 win->name = xbt_strdup(name);; // stray second ';' is harmless
// Fence synchronization: completes all RMA operations issued on this window
// since the previous fence, then synchronizes all ranks on the shared barrier.
// With MPI_MODE_NOPRECEDE the completion phase (first barrier + waitall) is
// skipped, since the caller asserts there are no preceding RMA operations.
// NOTE(review): the declarations of cpt/req, the free of treqs and the return
// are elided in this view — confirm against the full file.
111 int smpi_mpi_win_fence( int assert, MPI_Win win){
112 XBT_DEBUG("Entering fence");
115 if(assert != MPI_MODE_NOPRECEDE){
116 xbt_barrier_wait(win->bar); // wait until every rank has pushed its requests to our dynar
118 xbt_dynar_t reqs = win->requests;
119 int size = xbt_dynar_length(reqs);
122 // start all requests that have been prepared by another process
123 xbt_dynar_foreach(reqs, cpt, req){
124 if (req->flags & PREPARED) smpi_mpi_start(req);
// Swap in a fresh dynar before waiting, so new RMA ops can be queued while
// we drain the old batch.
127 MPI_Request* treqs = static_cast<MPI_Request*>(xbt_dynar_to_array(reqs));
128 win->requests=xbt_dynar_new(sizeof(MPI_Request), NULL);
129 smpi_mpi_waitall(size,treqs,MPI_STATUSES_IGNORE);
133 win->assert = assert; // remember the assertion for the next epoch
135 xbt_barrier_wait(win->bar); // closing barrier: all ranks leave the fence together
136 XBT_DEBUG("Leaving fence ");
// One-sided put: copy origin data into the target window at
// target_disp * disp_unit. For a remote target, a send request is prepared on
// our side and a matching receive request is pushed onto the *target's* window
// (tag RMA_TAG+1); both are completed at the next synchronization. A put to
// ourselves degenerates to a local datatype copy.
// NOTE(review): the error path when the epoch is not opened, and the return,
// are elided in this view — confirm against the full file.
141 int smpi_mpi_put( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
142 MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Win win)
144 if(!win->opened)//check that post/start has been done
146 //get receiver pointer
147 MPI_Win recv_win = win->connected_wins[target_rank];
// Resolve the target-side address: base + displacement scaled by the
// target's displacement unit.
149 void* recv_addr = (void*) ( ((char*)recv_win->base) + target_disp * recv_win->disp_unit);
150 XBT_DEBUG("Entering MPI_Put to %d", target_rank);
152 if(target_rank != smpi_comm_rank(win->comm)){
153 //prepare send_request
154 MPI_Request sreq = smpi_rma_send_init(origin_addr, origin_count, origin_datatype, smpi_process_index(),
155 smpi_group_index(smpi_comm_group(win->comm),target_rank), RMA_TAG+1, win->comm, MPI_OP_NULL);
157 //prepare receiver request
158 MPI_Request rreq = smpi_rma_recv_init(recv_addr, target_count, target_datatype, smpi_process_index(),
159 smpi_group_index(smpi_comm_group(win->comm),target_rank), RMA_TAG+1, recv_win->comm, MPI_OP_NULL);
161 //push request to receiver's win
162 xbt_dynar_push_as(recv_win->requests, MPI_Request, rreq);
165 smpi_mpi_start(sreq); // the send side starts immediately; the recv is started at the target's sync point
167 //push request to sender's win
168 xbt_dynar_push_as(win->requests, MPI_Request, sreq);
// Local put: no messaging needed, just convert/copy between datatypes.
170 smpi_datatype_copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
// One-sided get: fetch data from the target window into origin_addr. Mirror
// of smpi_mpi_put with roles reversed (tag RMA_TAG+2): the *target* is the
// sender, so the send request is pushed onto the target's window while our
// receive request is started right away and pushed onto our own window.
// A get from ourselves degenerates to a local datatype copy.
// NOTE(review): the error path when the epoch is not opened, the trailing
// arguments of the two *_init calls, and the return are elided in this view.
176 int smpi_mpi_get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
177 MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Win win)
179 if(!win->opened)//check that post/start has been done
182 MPI_Win send_win = win->connected_wins[target_rank];
// Resolve the source address inside the target's window.
184 void* send_addr = (void*)( ((char*)send_win->base) + target_disp * send_win->disp_unit);
185 XBT_DEBUG("Entering MPI_Get from %d", target_rank);
187 if(target_rank != smpi_comm_rank(win->comm)){
188 //prepare send_request
189 MPI_Request sreq = smpi_rma_send_init(send_addr, target_count, target_datatype,
190 smpi_group_index(smpi_comm_group(win->comm),target_rank), smpi_process_index(), RMA_TAG+2, send_win->comm,
193 //prepare receiver request
194 MPI_Request rreq = smpi_rma_recv_init(origin_addr, origin_count, origin_datatype,
195 smpi_group_index(smpi_comm_group(win->comm),target_rank), smpi_process_index(), RMA_TAG+2, win->comm,
198 //start the send, with another process than us as sender.
199 smpi_mpi_start(sreq);
201 //push request to receiver's win
202 xbt_dynar_push_as(send_win->requests, MPI_Request, sreq);
205 smpi_mpi_start(rreq); // our receive side is active immediately
207 //push request to sender's win
208 xbt_dynar_push_as(win->requests, MPI_Request, rreq);
// Local get: direct copy from the window into the origin buffer.
210 smpi_datatype_copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
// One-sided accumulate: like put (tag RMA_TAG+3) but the reduction operator
// `op` travels with the requests and is applied at the receiving side when the
// request completes. Unlike put/get there is no local fast path — the visible
// FIXME notes that the local version is missing, so a self-accumulate also
// goes through the request machinery.
// NOTE(review): the error path when the epoch is not opened and the return
// are elided in this view — confirm against the full file.
217 int smpi_mpi_accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
218 MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Win win)
220 if(!win->opened)//check that post/start has been done
222 //FIXME: local version
223 //get receiver pointer
224 MPI_Win recv_win = win->connected_wins[target_rank];
// Resolve the target-side address (base + scaled displacement).
226 void* recv_addr = (void*)( ((char*)recv_win->base) + target_disp * recv_win->disp_unit);
227 XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
229 //prepare send_request
230 MPI_Request sreq = smpi_rma_send_init(origin_addr, origin_count, origin_datatype,
231 smpi_process_index(), smpi_group_index(smpi_comm_group(win->comm),target_rank), RMA_TAG+3, win->comm, op);
233 //prepare receiver request
234 MPI_Request rreq = smpi_rma_recv_init(recv_addr, target_count, target_datatype,
235 smpi_process_index(), smpi_group_index(smpi_comm_group(win->comm),target_rank), RMA_TAG+3, recv_win->comm, op);
236 //push request to receiver's win
237 xbt_dynar_push_as(recv_win->requests, MPI_Request, rreq);
239 smpi_mpi_start(sreq); // send side starts now; receive completes at the target's sync point
241 //push request to sender's win
242 xbt_dynar_push_as(win->requests, MPI_Request, sreq);
// MPI_Win_start: open an access epoch towards `group`. Naive blocking
// implementation — post a zero-byte receive (tag RMA_TAG+4) from every member
// of the group and wait until all matching MPI_Win_post calls have sent their
// notification. Marks the window open and takes a reference on the group.
// NOTE(review): the loop headers, the i/j bookkeeping, the free of reqs, the
// win->group assignment and the return are elided in this view — confirm
// against the full file.
247 int smpi_mpi_win_start(MPI_Group group, int assert, MPI_Win win){
248 /* From MPI forum advices
249 The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
250 will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
251 the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
252 matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
253 MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
254 implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
255 to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
256 called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
257 origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
258 must complete, without further dependencies. */
260 //naive, blocking implementation.
262 int size = smpi_group_size(group);
263 MPI_Request* reqs = xbt_new0(MPI_Request, size);
265 // for(i=0;i<size;i++){
267 int src=smpi_group_index(group,j);
268 if(src!=smpi_process_index()){ // no sync message needed from ourselves
269 reqs[i]=smpi_irecv_init(NULL, 0, MPI_CHAR, src,RMA_TAG+4, MPI_COMM_WORLD);
275 smpi_mpi_startall(size, reqs);
276 smpi_mpi_waitall(size, reqs, MPI_STATUSES_IGNORE); // block until every target has posted
278 smpi_mpi_request_free(&reqs[i]);
281 win->opened++; //we're open for business !
283 smpi_group_use(group); // keep the group alive for the duration of the epoch
// MPI_Win_post: open an exposure epoch towards `group` by synchronously
// sending a zero-byte notification (tag RMA_TAG+4) to every member, matching
// the receives posted in smpi_mpi_win_start. Marks the window open and takes
// a reference on the group.
// NOTE(review): loop headers, i/j bookkeeping, the free of reqs, the
// win->group assignment and the return are elided in this view — confirm
// against the full file.
287 int smpi_mpi_win_post(MPI_Group group, int assert, MPI_Win win){
288 //let's make a synchronous send here
290 int size = smpi_group_size(group);
291 MPI_Request* reqs = xbt_new0(MPI_Request, size);
294 int dst=smpi_group_index(group,j);
295 if(dst!=smpi_process_index()){ // no notification to ourselves
296 reqs[i]=smpi_mpi_send_init(NULL, 0, MPI_CHAR, dst, RMA_TAG+4, MPI_COMM_WORLD);
303 smpi_mpi_startall(size, reqs);
304 smpi_mpi_waitall(size, reqs, MPI_STATUSES_IGNORE);
306 smpi_mpi_request_free(&reqs[i]);
309 win->opened++; //we're open for business !
311 smpi_group_use(group); // keep the group alive for the duration of the epoch
// MPI_Win_complete: close the access epoch opened by smpi_mpi_win_start.
// Sends a zero-byte completion notification (tag RMA_TAG+5) to every member
// of the epoch group, then drains and completes every pending RMA request on
// this window, releases the group reference and decrements the open counter.
// NOTE(review): the guard around the xbt_die (presumably `if (!win->opened)`
// — the message reads inverted for that check), loop headers, the frees of
// reqs/treqs and the return are elided in this view — confirm against the
// full file.
315 int smpi_mpi_win_complete(MPI_Win win){
317 xbt_die("Complete called on already opened MPI_Win");
318 // xbt_barrier_wait(win->bar);
319 //MPI_Comm comm = smpi_comm_new(win->group, NULL);
320 //mpi_coll_barrier_fun(comm);
321 //smpi_comm_destroy(comm);
323 XBT_DEBUG("Entering MPI_Win_Complete");
325 int size = smpi_group_size(win->group);
326 MPI_Request* reqs = xbt_new0(MPI_Request, size);
329 int dst=smpi_group_index(win->group,j);
330 if(dst!=smpi_process_index()){ // no notification to ourselves
331 reqs[i]=smpi_mpi_send_init(NULL, 0, MPI_CHAR, dst, RMA_TAG+5, MPI_COMM_WORLD);
337 XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
338 smpi_mpi_startall(size, reqs);
339 smpi_mpi_waitall(size, reqs, MPI_STATUSES_IGNORE);
342 smpi_mpi_request_free(&reqs[i]);
346 //now we can finish RMA calls
348 xbt_dynar_t reqqs = win->requests;
349 size = xbt_dynar_length(reqqs);
351 XBT_DEBUG("Win_complete - Finishing %d RMA calls", size);
354 // start all requests that have been prepared by another process
355 xbt_dynar_foreach(reqqs, cpt, req){
356 if (req->flags & PREPARED) smpi_mpi_start(req);
// Swap in a fresh dynar before waiting so new RMA ops can queue meanwhile.
359 MPI_Request* treqs = static_cast<MPI_Request*>(xbt_dynar_to_array(reqqs));
360 win->requests=xbt_dynar_new(sizeof(MPI_Request), NULL);
361 smpi_mpi_waitall(size,treqs,MPI_STATUSES_IGNORE);
363 smpi_group_unuse(win->group); // release the reference taken in win_start
364 win->opened--; //we're closed for business !
// MPI_Win_wait: close the exposure epoch opened by smpi_mpi_win_post.
// Blocks on a zero-byte notification (tag RMA_TAG+5) from every member of the
// epoch group — matching the sends in smpi_mpi_win_complete — then drains and
// completes every pending RMA request on this window, releases the group
// reference and decrements the open counter.
// NOTE(review): loop headers, the frees of reqs/treqs and the end of the
// function (past this view) are elided — confirm against the full file. The
// "Win_complete" text in the debug message at line 400 looks copy-pasted from
// smpi_mpi_win_complete.
368 int smpi_mpi_win_wait(MPI_Win win){
369 // xbt_barrier_wait(win->bar);
370 //MPI_Comm comm = smpi_comm_new(win->group, NULL);
371 //mpi_coll_barrier_fun(comm);
372 //smpi_comm_destroy(comm);
373 //naive, blocking implementation.
374 XBT_DEBUG("Entering MPI_Win_Wait");
376 int size = smpi_group_size(win->group);
377 MPI_Request* reqs = xbt_new0(MPI_Request, size);
379 // for(i=0;i<size;i++){
381 int src=smpi_group_index(win->group,j);
382 if(src!=smpi_process_index()){ // no notification expected from ourselves
383 reqs[i]=smpi_irecv_init(NULL, 0, MPI_CHAR, src,RMA_TAG+5, MPI_COMM_WORLD);
389 XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
390 smpi_mpi_startall(size, reqs);
391 smpi_mpi_waitall(size, reqs, MPI_STATUSES_IGNORE);
393 smpi_mpi_request_free(&reqs[i]);
397 xbt_dynar_t reqqs = win->requests;
398 size = xbt_dynar_length(reqqs);
400 XBT_DEBUG("Win_complete - Finishing %d RMA calls", size);
404 // start all requests that have been prepared by another process
405 xbt_dynar_foreach(reqqs, cpt, req){
406 if (req->flags & PREPARED) smpi_mpi_start(req);
// Swap in a fresh dynar before waiting so new RMA ops can queue meanwhile.
409 MPI_Request* treqs = static_cast<MPI_Request*>(xbt_dynar_to_array(reqqs));
410 win->requests=xbt_dynar_new(sizeof(MPI_Request), NULL);
411 smpi_mpi_waitall(size,treqs,MPI_STATUSES_IGNORE);
413 smpi_group_unuse(win->group); // release the reference taken in win_post
414 win->opened--; //we're opened for business !