-/* Copyright (c) 2007-2014. The SimGrid Team.
+/* Copyright (c) 2007-2015. The SimGrid Team.
* All rights reserved. */
/* This program is free software; you can redistribute it and/or modify it
xbt_bar_t bar;
MPI_Win* connected_wins;
char* name;
+ int opened;
+ MPI_Group group;
} s_smpi_mpi_win_t;
info->refcount++;
win->comm = comm;
win->name = NULL;
+ win->opened = 0;
+ win->group = MPI_GROUP_NULL;
win->requests = xbt_dynar_new(sizeof(MPI_Request), NULL);
win->connected_wins = xbt_malloc0(comm_size*sizeof(MPI_Win));
win->connected_wins[rank] = win;
MPI_Info_free(&(*win)->info);
}
xbt_free(*win);
- win = MPI_WIN_NULL;
+ *win = MPI_WIN_NULL;
return MPI_SUCCESS;
}
}
void smpi_mpi_win_get_group(MPI_Win win, MPI_Group* group){
- if(win->comm != MPI_COMM_NULL)
+ if(win->comm != MPI_COMM_NULL){
*group = smpi_comm_group(win->comm);
+ smpi_group_use(*group);
+ }
}
void smpi_mpi_win_set_name(MPI_Win win, char* name){
- win->name = strdup(name);;
+ win->name = xbt_strdup(name);;
}
int smpi_mpi_win_fence( int assert, MPI_Win win){
XBT_DEBUG("Entering fence");
-
+ if(!win->opened)
+ win->opened=1;
if(assert != MPI_MODE_NOPRECEDE){
xbt_barrier_wait(win->bar);
int smpi_mpi_put( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Win win)
{
+
+ if(!win->opened)//check that post/start has been done
+ return MPI_ERR_WIN;
//get receiver pointer
MPI_Win recv_win = win->connected_wins[target_rank];
int smpi_mpi_get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Win win)
{
+ if(!win->opened)//check that post/start has been done
+ return MPI_ERR_WIN;
//get sender pointer
MPI_Win send_win = win->connected_wins[target_rank];
int smpi_mpi_accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Win win)
{
+ if(!win->opened)//check that post/start has been done
+ return MPI_ERR_WIN;
//FIXME: local version
//get receiver pointer
MPI_Win recv_win = win->connected_wins[target_rank];
return MPI_SUCCESS;
}
+/* Open an RMA access epoch towards the processes in 'group'.
+ * Naive, blocking implementation: receive a zero-byte sync message
+ * (tag RMA_TAG+4) from every remote member of the group, i.e. block until
+ * each target process has issued the matching MPI_Win_post. */
+int smpi_mpi_win_start(MPI_Group group, int assert, MPI_Win win){
+  /* From MPI forum advices
+  The call to MPI_WIN_COMPLETE does not return until the put call has completed at
+  the origin; and the target window will be accessed by the put operation only
+  after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by the target
+  process. This still leaves much choice to implementors. The call to
+  MPI_WIN_START can block until the matching call to MPI_WIN_POST occurs at all
+  target processes. One can also have implementations where the call to
+  MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching
+  call to MPI_WIN_POST occurred; or implementations where the first two calls are
+  nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call to
+  MPI_WIN_POST occurred; or even implementations where all three calls can
+  complete before any target process called MPI_WIN_POST --- the data put must be
+  buffered, in this last case, so as to allow the put to complete at the origin
+  ahead of its completion at the target. However, once the call to MPI_WIN_POST is
+  issued, the sequence above must complete, without further dependencies.
+  */
+
+  //naive, blocking implementation.
+  int i=0,j=0;
+  int size = smpi_group_size(group);
+  MPI_Request* reqs = xbt_new0(MPI_Request, size);
+
+  while(j!=size){
+    int src=smpi_group_index(group,j);
+    if(src!=smpi_process_index()){
+      // no self-synchronization needed: only wait for remote peers
+      reqs[i]=smpi_irecv_init(NULL, 0, MPI_CHAR, src,RMA_TAG+4, MPI_COMM_WORLD);
+      i++;
+    }
+    j++;
+  }
+  size=i; // actual number of remote peers we synchronize with
+  smpi_mpi_startall(size, reqs);
+  smpi_mpi_waitall(size, reqs, MPI_STATUSES_IGNORE);
+  for(i=0;i<size;i++){
+    smpi_mpi_request_free(&reqs[i]);
+  }
+  xbt_free(reqs);
+  win->opened++; //we're open for business !
+  win->group=group;
+  smpi_group_use(group); // keep a reference on the group until the epoch ends
+  return MPI_SUCCESS;
+}
+
+/* Open an RMA exposure epoch for the processes in 'group'.
+ * Sends a zero-byte sync message (tag RMA_TAG+4) to every remote member of
+ * the group, unblocking their matching MPI_Win_start. */
+int smpi_mpi_win_post(MPI_Group group, int assert, MPI_Win win){
+  //let's make a synchronous send here
+  int i=0,j=0;
+  int size = smpi_group_size(group);
+  MPI_Request* reqs = xbt_new0(MPI_Request, size);
+
+  while(j!=size){
+    int dst=smpi_group_index(group,j);
+    if(dst!=smpi_process_index()){
+      // no message to ourselves: only remote peers need to be released
+      reqs[i]=smpi_mpi_send_init(NULL, 0, MPI_CHAR, dst,
+        RMA_TAG+4, MPI_COMM_WORLD);
+      i++;
+    }
+    j++;
+  }
+  size=i; // actual number of remote peers contacted
+
+  smpi_mpi_startall(size, reqs);
+  smpi_mpi_waitall(size, reqs, MPI_STATUSES_IGNORE);
+  for(i=0;i<size;i++){
+    smpi_mpi_request_free(&reqs[i]);
+  }
+  xbt_free(reqs);
+  win->opened++; //we're open for business !
+  win->group=group;
+  smpi_group_use(group); // keep a reference on the group until the epoch ends
+  return MPI_SUCCESS;
+}
+
+/* Close the access epoch opened by smpi_mpi_win_start: notify every member
+ * of the start group (tag RMA_TAG+5), then start and wait for all RMA
+ * requests that were queued on this window during the epoch. */
+int smpi_mpi_win_complete(MPI_Win win){
+  if(win->opened==0)
+    xbt_die("Complete called on a MPI_Win that was not opened");
+
+  XBT_DEBUG("Entering MPI_Win_Complete");
+  int i=0,j=0;
+  int size = smpi_group_size(win->group);
+  MPI_Request* reqs = xbt_new0(MPI_Request, size);
+
+  while(j!=size){
+    int dst=smpi_group_index(win->group,j);
+    if(dst!=smpi_process_index()){
+      // no self-notification needed: only remote peers are waiting
+      reqs[i]=smpi_mpi_send_init(NULL, 0, MPI_CHAR, dst,
+        RMA_TAG+5, MPI_COMM_WORLD);
+      i++;
+    }
+    j++;
+  }
+  size=i; // actual number of remote peers to notify
+  XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
+  smpi_mpi_startall(size, reqs);
+  smpi_mpi_waitall(size, reqs, MPI_STATUSES_IGNORE);
+
+  for(i=0;i<size;i++){
+    smpi_mpi_request_free(&reqs[i]);
+  }
+  xbt_free(reqs);
+
+  //now we can finish RMA calls
+
+  xbt_dynar_t reqqs = win->requests;
+  size = xbt_dynar_length(reqqs);
+
+  XBT_DEBUG("Win_complete - Finishing %d RMA calls", size);
+  unsigned int cpt=0;
+  MPI_Request req;
+  // start all requests that have been prepared by another process
+  xbt_dynar_foreach(reqqs, cpt, req){
+    if (req->flags & PREPARED) smpi_mpi_start(req);
+  }
+
+  // NOTE(review): xbt_dynar_to_array seems to consume the dynar, hence the
+  // fresh xbt_dynar_new below — confirm against xbt/dynar documentation
+  MPI_Request* treqs = xbt_dynar_to_array(reqqs);
+  smpi_mpi_waitall(size,treqs,MPI_STATUSES_IGNORE);
+  xbt_free(treqs);
+  win->requests=xbt_dynar_new(sizeof(MPI_Request), NULL);
+  win->opened--; //we're closed for business !
+  return MPI_SUCCESS;
+}
+
+
+
+/* Close the exposure epoch opened by smpi_mpi_win_post: wait for the
+ * MPI_Win_complete notification (tag RMA_TAG+5) from every remote member of
+ * the post group, then start and wait for the RMA requests queued on this
+ * window by the origin processes. */
+int smpi_mpi_win_wait(MPI_Win win){
+  //naive, blocking implementation.
+  XBT_DEBUG("Entering MPI_Win_Wait");
+  int i=0,j=0;
+  int size = smpi_group_size(win->group);
+  MPI_Request* reqs = xbt_new0(MPI_Request, size);
+
+  while(j!=size){
+    int src=smpi_group_index(win->group,j);
+    if(src!=smpi_process_index()){
+      // no self-synchronization needed: only wait for remote peers
+      reqs[i]=smpi_irecv_init(NULL, 0, MPI_CHAR, src,RMA_TAG+5, MPI_COMM_WORLD);
+      i++;
+    }
+    j++;
+  }
+  size=i; // actual number of remote peers we wait for
+  XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
+  smpi_mpi_startall(size, reqs);
+  smpi_mpi_waitall(size, reqs, MPI_STATUSES_IGNORE);
+  for(i=0;i<size;i++){
+    smpi_mpi_request_free(&reqs[i]);
+  }
+  xbt_free(reqs);
+
+  xbt_dynar_t reqqs = win->requests;
+  size = xbt_dynar_length(reqqs);
+
+  XBT_DEBUG("Win_wait - Finishing %d RMA calls", size);
+
+  unsigned int cpt=0;
+  MPI_Request req;
+  // start all requests that have been prepared by another process
+  xbt_dynar_foreach(reqqs, cpt, req){
+    if (req->flags & PREPARED) smpi_mpi_start(req);
+  }
+
+  // NOTE(review): xbt_dynar_to_array seems to consume the dynar, hence the
+  // fresh xbt_dynar_new below — confirm against xbt/dynar documentation
+  MPI_Request* treqs = xbt_dynar_to_array(reqqs);
+  smpi_mpi_waitall(size,treqs,MPI_STATUSES_IGNORE);
+  xbt_free(treqs);
+  win->requests=xbt_dynar_new(sizeof(MPI_Request), NULL);
+  win->opened--; //we're closed for business !
+  return MPI_SUCCESS;
+}