From: Augustin Degomme
Date: Mon, 23 Jun 2014 15:30:01 +0000 (+0200)
Subject: First steps for the implementation of MPI one-sided operations:
X-Git-Tag: v3_12~964^2~14
X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/commitdiff_plain/2b4010b2a5fda082278a02a35a9e9eb18b58e110

First steps for the implementation of MPI one-sided operations:
support for MPI_Win structures, MPI_Put, MPI_Get, MPI_Accumulate and MPI_Win_fence.
---

diff --git a/buildtools/Cmake/DefinePackages.cmake b/buildtools/Cmake/DefinePackages.cmake
index ca2166553d..662d2f8c4b 100644
--- a/buildtools/Cmake/DefinePackages.cmake
+++ b/buildtools/Cmake/DefinePackages.cmake
@@ -231,6 +231,7 @@ set(SMPI_SRC
   src/smpi/smpi_mpi_dt.c
   src/smpi/smpi_pmpi.c
   src/smpi/smpi_replay.c
+  src/smpi/smpi_rma.c
   src/smpi/smpi_topo.c
   )
diff --git a/include/smpi/smpi.h b/include/smpi/smpi.h
index 3d0fb98dac..788e00189c 100644
--- a/include/smpi/smpi.h
+++ b/include/smpi/smpi.h
@@ -68,6 +68,7 @@ SG_BEGIN_DECL()
 #define MPI_ERR_DIMS 17
 #define MPI_ERR_TOPOLOGY 18
 #define MPI_ERR_NO_MEM 19
+#define MPI_ERR_WIN 20
 #define MPI_ERRCODES_IGNORE (int *)0
 #define MPI_IDENT 0
 #define MPI_SIMILAR 1
@@ -79,6 +80,14 @@ SG_BEGIN_DECL()
 #define MPI_IO 0
 #define MPI_BSEND_OVERHEAD 0
 
+
+#define MPI_MODE_NOSTORE 0x1
+#define MPI_MODE_NOPUT 0x2
+#define MPI_MODE_NOPRECEDE 0x4
+#define MPI_MODE_NOSUCCEED 0x8
+#define MPI_MODE_NOCHECK 0x10
+
+
 #define MPI_KEYVAL_INVALID 0
 #define MPI_NULL_COPY_FN NULL
 #define MPI_NULL_DELETE_FN NULL
@@ -122,6 +131,7 @@ SG_BEGIN_DECL()
 #define MPI_ROOT 0
 #define MPI_INFO_NULL -1
 #define MPI_COMM_TYPE_SHARED 1
+#define MPI_WIN_NULL NULL
 
 #define MPI_VERSION 1
 #define MPI_SUBVERSION 1
@@ -182,6 +192,10 @@ typedef struct {
   int count;
 } MPI_Status;
 
+struct s_smpi_mpi_win;
+typedef struct s_smpi_mpi_win* MPI_Win;
+typedef int MPI_Info;
+
 #define MPI_STATUS_IGNORE ((MPI_Status*)NULL)
 #define MPI_STATUSES_IGNORE ((MPI_Status*)NULL)
@@ -251,6 +265,8 @@ XBT_PUBLIC_DATA( MPI_Op ) MPI_LXOR;
 XBT_PUBLIC_DATA( MPI_Op ) MPI_BAND;
 XBT_PUBLIC_DATA( MPI_Op ) MPI_BOR;
 XBT_PUBLIC_DATA( MPI_Op ) MPI_BXOR;
+//For accumulate
+XBT_PUBLIC_DATA( MPI_Op ) MPI_REPLACE;
 
 struct s_smpi_mpi_topology;
 typedef struct s_smpi_mpi_topology *MPI_Topology;
@@ -531,11 +547,26 @@ MPI_CALL(XBT_PUBLIC(int), MPI_Get_library_version,
 MPI_CALL(XBT_PUBLIC(int), MPI_Reduce_local,(void *inbuf, void *inoutbuf, int count,
     MPI_Datatype datatype, MPI_Op op));
 
+MPI_CALL(XBT_PUBLIC(int), MPI_Win_free,( MPI_Win* win));
+
+MPI_CALL(XBT_PUBLIC(int), MPI_Win_create,( void *base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm, MPI_Win *win));
+
+MPI_CALL(XBT_PUBLIC(int), MPI_Win_fence,( int assert, MPI_Win win));
+
+MPI_CALL(XBT_PUBLIC(int), MPI_Get,( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
+    MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Win win));
+MPI_CALL(XBT_PUBLIC(int), MPI_Put,( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
+    MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Win win));
+MPI_CALL(XBT_PUBLIC(int), MPI_Accumulate,( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
+    MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Win win));
+MPI_CALL(XBT_PUBLIC(int), MPI_Alloc_mem, (MPI_Aint size, MPI_Info info, void *baseptr));
+MPI_CALL(XBT_PUBLIC(int), MPI_Free_mem, (void *base));
+
+
 //FIXME: these are not yet implemented
 typedef void MPI_Handler_function(MPI_Comm*, int*,
...); -typedef int MPI_Win; -typedef int MPI_Info; + typedef void* MPI_Errhandler; typedef int MPI_Copy_function(MPI_Comm oldcomm, int keyval, void* extra_state, void* attribute_val_in, @@ -638,9 +669,7 @@ MPI_CALL(XBT_PUBLIC(int), MPI_Get_elements, (MPI_Status* status, MPI_Datatype da MPI_CALL(XBT_PUBLIC(int), MPI_Dims_create, (int nnodes, int ndims, int* dims)); MPI_CALL(XBT_PUBLIC(int), MPI_Initialized, (int* flag)); MPI_CALL(XBT_PUBLIC(int), MPI_Pcontrol, (const int level )); -MPI_CALL(XBT_PUBLIC(int), MPI_Win_fence,( int assert, MPI_Win win)); -MPI_CALL(XBT_PUBLIC(int), MPI_Win_free,( MPI_Win* win)); -MPI_CALL(XBT_PUBLIC(int), MPI_Win_create,( void *base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm, MPI_Win *win)); + MPI_CALL(XBT_PUBLIC(int), MPI_Info_create,( MPI_Info *info)); MPI_CALL(XBT_PUBLIC(int), MPI_Info_set,( MPI_Info info, char *key, char *value)); MPI_CALL(XBT_PUBLIC(int), MPI_Info_get,(MPI_Info info,char *key,int valuelen, char *value, int *flag)); @@ -651,8 +680,8 @@ MPI_CALL(XBT_PUBLIC(int), MPI_Info_get_nkeys,( MPI_Info info, int *nkeys)); MPI_CALL(XBT_PUBLIC(int), MPI_Info_get_nthkey,( MPI_Info info, int n, char *key)); MPI_CALL(XBT_PUBLIC(int), MPI_Info_get_valuelen,( MPI_Info info, char *key, int *valuelen, int *flag)); -MPI_CALL(XBT_PUBLIC(int), MPI_Get,( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank, - MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Win win)); + +MPI_CALL(XBT_PUBLIC(int), MPI_Win_set_errhandler, (MPI_Win win, MPI_Errhandler errhandler)); MPI_CALL(XBT_PUBLIC(int), MPI_Type_get_envelope,(MPI_Datatype datatype,int *num_integers,int *num_addresses,int *num_datatypes, int *combiner)); MPI_CALL(XBT_PUBLIC(int), MPI_Type_get_contents,(MPI_Datatype datatype, int max_integers, int max_addresses, int max_datatypes, int* array_of_integers, MPI_Aint* array_of_addresses, diff --git a/src/smpi/private.h b/src/smpi/private.h index 2b8ce55c44..60297357c9 100644 --- a/src/smpi/private.h +++ b/src/smpi/private.h @@ -29,6 +29,8 @@ typedef struct s_smpi_process_data *smpi_process_data_t; #define SSEND 0x40 #define PREPARED 0x80 #define FINISHED 0x100 +#define RMA 0x200 +#define ACCUMULATE 0x400 enum smpi_process_state{ @@ -107,6 +109,7 @@ typedef struct s_smpi_mpi_request { #endif } s_smpi_mpi_request_t; + void smpi_process_destroy(void); void smpi_process_finalize(void); int smpi_process_finalized(void); @@ -146,6 +149,8 @@ smpi_process_data_t smpi_process_data(void); smpi_process_data_t smpi_process_remote_data(int index); void smpi_process_set_user_data(void *); void* smpi_process_get_user_data(void); +int smpi_process_get_win_id(); +void smpi_process_set_win_id(int); int smpi_process_count(void); MPI_Comm smpi_process_comm_world(void); smx_rdv_t smpi_process_mailbox(void); @@ -175,6 +180,7 @@ int is_datatype_valid(MPI_Datatype datatype); size_t smpi_datatype_size(MPI_Datatype datatype); MPI_Aint smpi_datatype_lb(MPI_Datatype datatype); MPI_Aint smpi_datatype_ub(MPI_Datatype datatype); +MPI_Datatype smpi_datatype_dup(MPI_Datatype datatype); int smpi_datatype_extent(MPI_Datatype datatype, MPI_Aint * lb, MPI_Aint * extent); MPI_Aint smpi_datatype_get_extent(MPI_Datatype datatype); @@ -254,6 +260,10 @@ MPI_Request smpi_irecv_init(void *buf, int count, MPI_Datatype datatype, int src, int tag, MPI_Comm comm); MPI_Request smpi_mpi_irecv(void *buf, int count, MPI_Datatype datatype, int src, int tag, MPI_Comm comm); +MPI_Request smpi_rma_send_init(void *buf, int count, MPI_Datatype datatype, + 
int src, int dst, int tag, MPI_Comm comm); +MPI_Request smpi_rma_recv_init(void *buf, int count, MPI_Datatype datatype, + int src, int dst, int tag, MPI_Comm comm); void smpi_mpi_recv(void *buf, int count, MPI_Datatype datatype, int src, int tag, MPI_Comm comm, MPI_Status * status); void smpi_mpi_send(void *buf, int count, MPI_Datatype datatype, int dst, @@ -317,6 +327,19 @@ void smpi_mpi_scan(void *sendbuf, void *recvbuf, int count, void smpi_mpi_exscan(void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm); +int smpi_mpi_win_free( MPI_Win* win); + +MPI_Win smpi_mpi_win_create( void *base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm); + +int smpi_mpi_win_fence( int assert, MPI_Win win); + +int smpi_mpi_get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank, + MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Win win); +int smpi_mpi_put( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank, + MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Win win); +int smpi_mpi_accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank, + MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Win win); + void nary_tree_bcast(void *buf, int count, MPI_Datatype datatype, int root, MPI_Comm comm, int arity); void nary_tree_barrier(MPI_Comm comm, int arity); diff --git a/src/smpi/smpi_base.c b/src/smpi/smpi_base.c index 9ac0887107..830cd2d623 100644 --- a/src/smpi/smpi_base.c +++ b/src/smpi/smpi_base.c @@ -326,7 +326,7 @@ void smpi_mpi_start(MPI_Request request) if (request->flags & RECV) { print_request("New recv", request); //FIXME: if receive is posted with a large size, but send is smaller, mailboxes may not match ! - if (request->size < sg_cfg_get_int("smpi/async_small_thres")) + if (request->flags & RMA || request->size < sg_cfg_get_int("smpi/async_small_thres")) mailbox = smpi_process_mailbox_small(); else mailbox = smpi_process_mailbox(); @@ -335,7 +335,8 @@ void smpi_mpi_start(MPI_Request request) smpi_datatype_use(request->old_type); smpi_comm_use(request->comm); request->action = simcall_comm_irecv(mailbox, request->buf, - &request->real_size, &match_recv, &smpi_comm_copy_buffer_callback, + &request->real_size, &match_recv, + (request->flags & ACCUMULATE)? NULL : &smpi_comm_copy_buffer_callback, request, -1.0); //integrate pseudo-timing for buffering of small messages, do not bother to execute the simcall if 0 @@ -361,7 +362,7 @@ void smpi_mpi_start(MPI_Request request) /* return;*/ /* }*/ print_request("New send", request); - if (request->size < sg_cfg_get_int("smpi/async_small_thres")) { // eager mode + if (request->flags & RMA || request->size < sg_cfg_get_int("smpi/async_small_thres")) { // eager mode mailbox = smpi_process_remote_mailbox_small(receiver); }else{ XBT_DEBUG("Send request %p is not in the permanent receive mailbox (buf: %p)",request,request->buf); @@ -411,7 +412,7 @@ void smpi_mpi_start(MPI_Request request) buf, request->real_size, &match_send, &xbt_free_f, // how to free the userdata if a detached send fails - &smpi_comm_copy_buffer_callback, + (request->flags & ACCUMULATE)? 
NULL : &smpi_comm_copy_buffer_callback, request, // detach if msg size < eager/rdv switch limit request->detached); @@ -455,6 +456,26 @@ void smpi_mpi_request_free(MPI_Request * request) } } + +MPI_Request smpi_rma_send_init(void *buf, int count, MPI_Datatype datatype, + int src, int dst, int tag, MPI_Comm comm) +{ + MPI_Request request = NULL; /* MC needs the comm to be set to NULL during the call */ + request = build_request(buf==MPI_BOTTOM ? (void*)0 : buf , count, datatype, src, dst, tag, + comm, RMA | NON_PERSISTENT | ISEND | SEND | PREPARED); + return request; +} + +MPI_Request smpi_rma_recv_init(void *buf, int count, MPI_Datatype datatype, + int src, int dst, int tag, MPI_Comm comm) +{ + MPI_Request request = NULL; /* MC needs the comm to be set to NULL during the call */ + request = build_request(buf==MPI_BOTTOM ? (void*)0 : buf, count, datatype, src, dst, tag, + comm, RMA | NON_PERSISTENT | RECV | PREPARED); + return request; +} + + MPI_Request smpi_isend_init(void *buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm) { @@ -484,8 +505,6 @@ MPI_Request smpi_mpi_issend(void *buf, int count, MPI_Datatype datatype, return request; } - - MPI_Request smpi_irecv_init(void *buf, int count, MPI_Datatype datatype, int src, int tag, MPI_Comm comm) { diff --git a/src/smpi/smpi_global.c b/src/smpi/smpi_global.c index 0072cda76e..8e48139f26 100644 --- a/src/smpi/smpi_global.c +++ b/src/smpi/smpi_global.c @@ -34,6 +34,7 @@ typedef struct s_smpi_process_data { char state; int sampling; /* inside an SMPI_SAMPLE_ block? */ char* instance_id; + int nb_wins; xbt_bar_t finalization_barrier; } s_smpi_process_data_t; @@ -102,6 +103,7 @@ void smpi_process_init(int *argc, char ***argv) (*argc)-=2; data->argc = argc; data->argv = argv; + data->nb_wins=0; // set the process attached to the mailbox simcall_rdv_set_receiver(data->mailbox_small, proc); XBT_DEBUG("<%d> New process in the game: %p", index, proc); @@ -203,6 +205,20 @@ void *smpi_process_get_user_data() return process_data->data; } + +int smpi_process_get_win_id() +{ + smpi_process_data_t process_data = smpi_process_data(); + return process_data->nb_wins; +} + +void smpi_process_set_win_id(int id) +{ + smpi_process_data_t process_data = smpi_process_data(); + process_data->nb_wins = id; +} + + int smpi_process_count(void) { return process_count; @@ -481,6 +497,7 @@ static void smpi_init_logs(){ XBT_LOG_CONNECT(smpi_mpi_dt); XBT_LOG_CONNECT(smpi_pmpi); XBT_LOG_CONNECT(smpi_replay); + XBT_LOG_CONNECT(smpi_rma); } diff --git a/src/smpi/smpi_mpi.c b/src/smpi/smpi_mpi.c index 99c527456b..e7899faaed 100644 --- a/src/smpi/smpi_mpi.c +++ b/src/smpi/smpi_mpi.c @@ -512,6 +512,13 @@ int MPI_Alltoallv(void *sendbuf, int *sendcounts, int *senddisps, comm); } +int MPI_Alloc_mem(MPI_Aint size, MPI_Info info, void *baseptr){ + return PMPI_Alloc_mem(size, info, baseptr); +} + +int MPI_Free_mem(void *baseptr){ + return PMPI_Free_mem(baseptr); +} int MPI_Get_processor_name(char *name, int *resultlen) { @@ -612,13 +619,17 @@ int MPI_Errhandler_set(MPI_Comm comm, MPI_Errhandler errhandler) { } int MPI_Comm_set_errhandler(MPI_Comm comm, MPI_Errhandler errhandler) { - return PMPI_Errhandler_set(comm, errhandler); + return PMPI_Comm_set_errhandler(comm, errhandler); } int MPI_Comm_get_errhandler(MPI_Comm comm, MPI_Errhandler* errhandler) { return PMPI_Errhandler_set(comm, errhandler); } +int MPI_Win_set_errhandler(MPI_Win win, MPI_Errhandler errhandler) { + return PMPI_Win_set_errhandler(win, errhandler); +} + int MPI_Type_contiguous(int count, 
MPI_Datatype old_type, MPI_Datatype* newtype) { return PMPI_Type_contiguous(count, old_type, newtype); } @@ -837,6 +848,18 @@ int MPI_Get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, target_disp, target_count,target_datatype, win); } +int MPI_Put( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank, + MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Win win){ + return PMPI_Put( origin_addr,origin_count, origin_datatype,target_rank, + target_disp, target_count,target_datatype, win); +} + +int MPI_Accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank, + MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Win win){ + return PMPI_Accumulate( origin_addr,origin_count, origin_datatype,target_rank, + target_disp, target_count,target_datatype,op, win); +} + int MPI_Type_get_envelope( MPI_Datatype datatype, int *num_integers, int *num_addresses, int *num_datatypes, int *combiner){ return PMPI_Type_get_envelope( datatype, num_integers, diff --git a/src/smpi/smpi_mpi_dt.c b/src/smpi/smpi_mpi_dt.c index 7cb76b7949..2ad759ca9a 100644 --- a/src/smpi/smpi_mpi_dt.c +++ b/src/smpi/smpi_mpi_dt.c @@ -145,6 +145,15 @@ MPI_Aint smpi_datatype_ub(MPI_Datatype datatype) return datatype->ub; } +MPI_Datatype smpi_datatype_dup(MPI_Datatype datatype) +{ + MPI_Datatype new_t= xbt_new(s_smpi_mpi_datatype_t,1); + memcpy(new_t, datatype, sizeof(s_smpi_mpi_datatype_t)); + if (datatype->has_subtype) + memcpy(new_t->substruct, datatype->substruct, sizeof(s_smpi_subtype_t)); + return new_t; +} + int smpi_datatype_extent(MPI_Datatype datatype, MPI_Aint * lb, MPI_Aint * extent) { @@ -292,6 +301,7 @@ s_smpi_mpi_vector_t* smpi_datatype_vector_create( int block_stride, new_t->block_stride = block_stride; new_t->block_length = block_length; new_t->block_count = block_count; + smpi_datatype_use(old_type); new_t->old_type = old_type; new_t->size_oldtype = size_oldtype; return new_t; @@ -346,7 +356,7 @@ void smpi_datatype_use(MPI_Datatype type){ void smpi_datatype_unuse(MPI_Datatype type){ if(type && type->in_use-- == 0 && (type->flags & DT_FLAG_DESTROYED)) smpi_datatype_free(&type); - + #ifdef HAVE_MC if(MC_is_active()) MC_ignore(&(type->in_use), sizeof(type->in_use)); @@ -404,6 +414,7 @@ void unserialize_contiguous( const void *contiguous_vector, } void free_contiguous(MPI_Datatype* d){ + smpi_datatype_unuse(((s_smpi_mpi_indexed_t *)(*d)->substruct)->old_type); } /* @@ -424,6 +435,7 @@ s_smpi_mpi_contiguous_t* smpi_datatype_contiguous_create( MPI_Aint lb, new_t->block_count = block_count; new_t->old_type = old_type; new_t->size_oldtype = size_oldtype; + smpi_datatype_use(old_type); return new_t; } @@ -491,6 +503,7 @@ int smpi_datatype_vector(int count, int blocklen, int stride, MPI_Datatype old_t } void free_vector(MPI_Datatype* d){ + smpi_datatype_unuse(((s_smpi_mpi_indexed_t *)(*d)->substruct)->old_type); } /* @@ -591,11 +604,13 @@ s_smpi_mpi_hvector_t* smpi_datatype_hvector_create( MPI_Aint block_stride, new_t->block_count = block_count; new_t->old_type = old_type; new_t->size_oldtype = size_oldtype; + smpi_datatype_use(old_type); return new_t; } //do nothing for vector types void free_hvector(MPI_Datatype* d){ + smpi_datatype_unuse(((s_smpi_mpi_indexed_t *)(*d)->substruct)->old_type); } int smpi_datatype_hvector(int count, int blocklen, MPI_Aint stride, MPI_Datatype old_type, MPI_Datatype* new_type) @@ -717,6 +732,7 @@ void unserialize_indexed( const void *contiguous_indexed, 
void free_indexed(MPI_Datatype* type){ xbt_free(((s_smpi_mpi_indexed_t *)(*type)->substruct)->block_lengths); xbt_free(((s_smpi_mpi_indexed_t *)(*type)->substruct)->block_indices); + smpi_datatype_unuse(((s_smpi_mpi_indexed_t *)(*type)->substruct)->old_type); } /* @@ -742,6 +758,7 @@ s_smpi_mpi_indexed_t* smpi_datatype_indexed_create( int* block_lengths, new_t->block_indices[i]=block_indices[i]; } new_t->block_count = block_count; + smpi_datatype_use(old_type); new_t->old_type = old_type; new_t->size_oldtype = size_oldtype; return new_t; @@ -878,6 +895,7 @@ void unserialize_hindexed( const void *contiguous_hindexed, void free_hindexed(MPI_Datatype* type){ xbt_free(((s_smpi_mpi_hindexed_t *)(*type)->substruct)->block_lengths); xbt_free(((s_smpi_mpi_hindexed_t *)(*type)->substruct)->block_indices); + smpi_datatype_unuse(((s_smpi_mpi_indexed_t *)(*type)->substruct)->old_type); } /* @@ -1041,6 +1059,9 @@ void unserialize_struct( const void *contiguous_struct, void free_struct(MPI_Datatype* type){ xbt_free(((s_smpi_mpi_struct_t *)(*type)->substruct)->block_lengths); xbt_free(((s_smpi_mpi_struct_t *)(*type)->substruct)->block_indices); + int i=0; + for (i = 0; i < ((s_smpi_mpi_struct_t *)(*type)->substruct)->block_count; i++) + smpi_datatype_unuse(((s_smpi_mpi_struct_t *)(*type)->substruct)->old_types[i]); xbt_free(((s_smpi_mpi_struct_t *)(*type)->substruct)->old_types); } @@ -1066,6 +1087,7 @@ s_smpi_mpi_struct_t* smpi_datatype_struct_create( int* block_lengths, new_t->block_lengths[i]=block_lengths[i]; new_t->block_indices[i]=block_indices[i]; new_t->old_types[i]=old_types[i]; + smpi_datatype_use(new_t->old_types[i]); } //new_t->block_lengths = block_lengths; //new_t->block_indices = block_indices; @@ -1150,7 +1172,7 @@ typedef struct s_smpi_mpi_op { #define BXOR_OP(a, b) (b) ^= (a) #define MAXLOC_OP(a, b) (b) = (a.value) < (b.value) ? (b) : (a) #define MINLOC_OP(a, b) (b) = (a.value) < (b.value) ? 
(a) : (b) -//TODO : MINLOC & MAXLOC +#define REPLACE_OP(a,b) (b) = (a) #define APPLY_FUNC(a, b, length, type, func) \ { \ @@ -1455,6 +1477,27 @@ static void maxloc_func(void *a, void *b, int *length, } } +static void replace_func(void *a, void *b, int *length, + MPI_Datatype * datatype) +{ + if (*datatype == MPI_CHAR) { + APPLY_FUNC(a, b, length, char, REPLACE_OP); + } else if (*datatype == MPI_SHORT) { + APPLY_FUNC(a, b, length, short, REPLACE_OP); + } else if (*datatype == MPI_INT) { + APPLY_FUNC(a, b, length, int, REPLACE_OP); + } else if (*datatype == MPI_LONG) { + APPLY_FUNC(a, b, length, long, REPLACE_OP); + } else if (*datatype == MPI_UNSIGNED_SHORT) { + APPLY_FUNC(a, b, length, unsigned short, REPLACE_OP); + } else if (*datatype == MPI_UNSIGNED) { + APPLY_FUNC(a, b, length, unsigned int, REPLACE_OP); + } else if (*datatype == MPI_UNSIGNED_LONG) { + APPLY_FUNC(a, b, length, unsigned long, REPLACE_OP); + } else if (*datatype == MPI_BYTE) { + APPLY_FUNC(a, b, length, uint8_t, REPLACE_OP); + } +} #define CREATE_MPI_OP(name, func) \ static s_smpi_mpi_op_t mpi_##name = { &(func) /* func */, TRUE }; \ @@ -1472,6 +1515,8 @@ CREATE_MPI_OP(MPI_BOR, bor_func); CREATE_MPI_OP(MPI_BXOR, bxor_func); CREATE_MPI_OP(MPI_MAXLOC, maxloc_func); CREATE_MPI_OP(MPI_MINLOC, minloc_func); +CREATE_MPI_OP(MPI_REPLACE, replace_func); + MPI_Op smpi_op_new(MPI_User_function * function, int commute) { diff --git a/src/smpi/smpi_pmpi.c b/src/smpi/smpi_pmpi.c index 4a9a8d22dd..c2aa98cae1 100644 --- a/src/smpi/smpi_pmpi.c +++ b/src/smpi/smpi_pmpi.c @@ -255,6 +255,18 @@ int PMPI_Type_ub(MPI_Datatype datatype, MPI_Aint * disp) return retval; } +int PMPI_Type_dup(MPI_Datatype datatype, MPI_Datatype *newtype){ + int retval = 0; + + if (datatype == MPI_DATATYPE_NULL) { + retval = MPI_ERR_TYPE; + } else { + *newtype = smpi_datatype_dup(datatype); + retval = MPI_SUCCESS; + } + return retval; +} + int PMPI_Op_create(MPI_User_function * function, int commute, MPI_Op * op) { int retval = 0; @@ -2697,6 +2709,147 @@ int PMPI_Type_create_resized(MPI_Datatype oldtype,MPI_Aint lb, MPI_Aint extent, } + +int PMPI_Win_create( void *base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm, MPI_Win *win){ + int retval = 0; + smpi_bench_end(); + if (comm == MPI_COMM_NULL) { + retval= MPI_ERR_COMM; + }else if ((base == NULL && size != 0) + || disp_unit <= 0 || size < 0 ){ + retval= MPI_ERR_OTHER; + }else{ + *win = smpi_mpi_win_create( base, size, disp_unit, info, comm); + retval = MPI_SUCCESS; + } + smpi_bench_begin(); + return retval; +} + +int PMPI_Win_free( MPI_Win* win){ + int retval = 0; + smpi_bench_end(); + if (win == NULL || *win == MPI_WIN_NULL) { + retval = MPI_ERR_WIN; + }else{ + retval=smpi_mpi_win_free(win); + } + smpi_bench_begin(); + return retval; +} + + +int PMPI_Win_fence( int assert, MPI_Win win){ + int retval = 0; + smpi_bench_end(); + if (win == MPI_WIN_NULL) { + retval = MPI_ERR_WIN; + } else { + retval = smpi_mpi_win_fence(assert, win); + } + smpi_bench_begin(); + return retval; +} + +int PMPI_Get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank, + MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Win win){ + int retval = 0; + smpi_bench_end(); + if (win == MPI_WIN_NULL) { + retval = MPI_ERR_WIN; + } else if (target_rank == MPI_PROC_NULL) { + retval = MPI_SUCCESS; + } else if (target_rank <0){ + retval = MPI_ERR_RANK; + } else if (target_disp <0){ + retval = MPI_ERR_ARG; + } else if (origin_count < 0 || target_count < 0) { + retval = MPI_ERR_COUNT; 
+ } else if (origin_addr==NULL && origin_count > 0){ + retval = MPI_ERR_COUNT; + } else if ((!is_datatype_valid(origin_datatype)) || + (!is_datatype_valid(target_datatype))) { + retval = MPI_ERR_TYPE; + } else { + retval = smpi_mpi_get( origin_addr, origin_count, origin_datatype, target_rank, target_disp, target_count, target_datatype, win); + } + smpi_bench_begin(); + return retval; +} + +int PMPI_Put( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank, + MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Win win){ + int retval = 0; + smpi_bench_end(); + if (win == MPI_WIN_NULL) { + retval = MPI_ERR_WIN; + } else if (target_rank == MPI_PROC_NULL) { + retval = MPI_SUCCESS; + } else if (target_rank <0){ + retval = MPI_ERR_RANK; + } else if (target_disp <0){ + retval = MPI_ERR_ARG; + } else if (origin_count < 0 || target_count < 0) { + retval = MPI_ERR_COUNT; + } else if (origin_addr==NULL && origin_count > 0){ + retval = MPI_ERR_COUNT; + } else if ((!is_datatype_valid(origin_datatype)) || + (!is_datatype_valid(target_datatype))) { + retval = MPI_ERR_TYPE; + } else { + retval = smpi_mpi_put( origin_addr, origin_count, origin_datatype, target_rank, target_disp, target_count, target_datatype, win); + } + smpi_bench_begin(); + return retval; +} + + +int PMPI_Accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank, + MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Win win){ + int retval = 0; + smpi_bench_end(); + if (win == MPI_WIN_NULL) { + retval = MPI_ERR_WIN; + } else if (target_rank == MPI_PROC_NULL) { + retval = MPI_SUCCESS; + } else if (target_rank <0){ + retval = MPI_ERR_RANK; + } else if (target_disp <0){ + retval = MPI_ERR_ARG; + } else if (origin_count < 0 || target_count < 0) { + retval = MPI_ERR_COUNT; + } else if (origin_addr==NULL && origin_count > 0){ + retval = MPI_ERR_COUNT; + } else if ((!is_datatype_valid(origin_datatype)) || + (!is_datatype_valid(target_datatype))) { + retval = MPI_ERR_TYPE; + } else if (op == MPI_OP_NULL) { + retval = MPI_ERR_OP; + } else { + retval = smpi_mpi_accumulate( origin_addr, origin_count, origin_datatype, target_rank, target_disp, target_count, target_datatype, op, win); + } + smpi_bench_begin(); + return retval; +} + + +int PMPI_Alloc_mem(MPI_Aint size, MPI_Info info, void *baseptr){ + void *ptr = xbt_malloc(size); + if(!ptr) + return MPI_ERR_NO_MEM; + else { + *(void **)baseptr = ptr; + return MPI_SUCCESS; + } +} + +int PMPI_Free_mem(void *baseptr){ + xbt_free(baseptr); + return MPI_SUCCESS; +} + + + /* The following calls are not yet implemented and will fail at runtime. */ /* Once implemented, please move them above this notice. 
*/ @@ -2705,10 +2858,6 @@ int PMPI_Type_create_resized(MPI_Datatype oldtype,MPI_Aint lb, MPI_Aint extent, return MPI_SUCCESS; \ } -int PMPI_Type_dup(MPI_Datatype datatype, MPI_Datatype *newtype){ - NOT_YET_IMPLEMENTED -} - int PMPI_Type_set_name(MPI_Datatype datatype, char * name) { NOT_YET_IMPLEMENTED @@ -2781,6 +2930,10 @@ int PMPI_Comm_set_errhandler(MPI_Comm comm, MPI_Errhandler errhandler) { NOT_YET_IMPLEMENTED } +int PMPI_Win_set_errhandler(MPI_Win win, MPI_Errhandler errhandler) { + NOT_YET_IMPLEMENTED +} + int PMPI_Comm_get_errhandler(MPI_Comm comm, MPI_Errhandler* errhandler) { NOT_YET_IMPLEMENTED } @@ -2942,18 +3095,6 @@ int PMPI_Get_elements(MPI_Status* status, MPI_Datatype datatype, int* elements) NOT_YET_IMPLEMENTED } -int PMPI_Win_fence( int assert, MPI_Win win){ - NOT_YET_IMPLEMENTED -} - -int PMPI_Win_free( MPI_Win* win){ - NOT_YET_IMPLEMENTED -} - -int PMPI_Win_create( void *base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm, MPI_Win *win){ - NOT_YET_IMPLEMENTED -} - int PMPI_Info_create( MPI_Info *info){ NOT_YET_IMPLEMENTED } @@ -2966,11 +3107,6 @@ int PMPI_Info_free( MPI_Info *info){ NOT_YET_IMPLEMENTED } -int PMPI_Get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank, - MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Win win){ - NOT_YET_IMPLEMENTED -} - int PMPI_Type_get_envelope( MPI_Datatype datatype, int *num_integers, int *num_addresses, int *num_datatypes, int *combiner){ NOT_YET_IMPLEMENTED diff --git a/src/smpi/smpi_rma.c b/src/smpi/smpi_rma.c new file mode 100644 index 0000000000..9ff12c8afe --- /dev/null +++ b/src/smpi/smpi_rma.c @@ -0,0 +1,224 @@ + +/* Copyright (c) 2007-2014. The SimGrid Team. + * All rights reserved. */ + +/* This program is free software; you can redistribute it and/or modify it + * under the terms of the license (GNU LGPL) which comes with this package. 
*/ + +#include "private.h" + +XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)"); + +#define RMA_TAG -1234 + +/* FIXME:using a global array of MPI_Win simplifies the way to exchange pointers and info, + * but it breaks distributed simulation + */ + +xbt_bar_t creation_bar = NULL; + +typedef struct s_smpi_mpi_win{ + void* base; + MPI_Aint size; + int disp_unit; + MPI_Comm comm; + //MPI_Info info + int assert; + xbt_dynar_t requests; + xbt_bar_t bar; + MPI_Win* connected_wins; +} s_smpi_mpi_win_t; + + +MPI_Win smpi_mpi_win_create( void *base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm){ + + MPI_Win win; + + int comm_size = smpi_comm_size(comm); + int rank=smpi_comm_rank(comm); + XBT_DEBUG("Creating window"); + + win = xbt_new(s_smpi_mpi_win_t, 1); + win->base = base; + win->size = size; + win->disp_unit = disp_unit; + win->assert = 0; + //win->info = info; + win->comm = comm; + win->requests = xbt_dynar_new(sizeof(MPI_Request), NULL); + win->connected_wins = xbt_malloc0(comm_size*sizeof(MPI_Win)); + win->connected_wins[rank] = win; + + if(rank==0){ + win->bar=xbt_barrier_init(comm_size); + } + + mpi_coll_allgather_fun(&(win->connected_wins[rank]), + sizeof(MPI_Win), + MPI_BYTE, + win->connected_wins, + sizeof(MPI_Win), + MPI_BYTE, + comm); + + mpi_coll_bcast_fun( &(win->bar), + sizeof(xbt_bar_t), + MPI_BYTE, + 0, + comm); + + mpi_coll_barrier_fun(comm); + + return win; +} + +int smpi_mpi_win_free( MPI_Win* win){ + + //As per the standard, perform a barrier to ensure every async comm is finished + xbt_barrier_wait((*win)->bar); + xbt_dynar_free(&(*win)->requests); + xbt_free((*win)->connected_wins); + xbt_free(*win); + win = MPI_WIN_NULL; + return MPI_SUCCESS; +} + + +int smpi_mpi_win_fence( int assert, MPI_Win win){ + + XBT_DEBUG("Entering fence"); + + if(assert != MPI_MODE_NOPRECEDE){ + xbt_barrier_wait(win->bar); + + xbt_dynar_t reqs = win->requests; + int size = xbt_dynar_length(reqs); + unsigned int cpt=0; + MPI_Request req; + // start all requests that have been prepared by another process + xbt_dynar_foreach(reqs, cpt, req){ + if (req->flags & PREPARED) smpi_mpi_start(req); + } + + MPI_Request* treqs = xbt_dynar_to_array(reqs); + smpi_mpi_waitall(size,treqs,MPI_STATUSES_IGNORE); + xbt_free(treqs); + win->requests=xbt_dynar_new(sizeof(MPI_Request), NULL); + + } + win->assert = assert; + + xbt_barrier_wait(win->bar); + XBT_DEBUG("Leaving fence "); + + return MPI_SUCCESS; +} + +int smpi_mpi_put( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank, + MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Win win) +{ + //get receiver pointer + MPI_Win recv_win = win->connected_wins[target_rank]; + + void* recv_addr = recv_win->base + target_disp * smpi_datatype_size(target_datatype)/* recv_win->disp_unit*/; + smpi_datatype_use(origin_datatype); + smpi_datatype_use(target_datatype); + XBT_DEBUG("Entering MPI_Put to %d", target_rank); + + if(target_rank != smpi_comm_rank(win->comm)){ + //prepare send_request + MPI_Request sreq = smpi_rma_send_init(origin_addr, origin_count, origin_datatype, + smpi_process_index(), smpi_group_index(smpi_comm_group(win->comm),target_rank), RMA_TAG+1, win->comm); + + //prepare receiver request + MPI_Request rreq = smpi_rma_recv_init(recv_addr, target_count, target_datatype, + smpi_process_index(), smpi_group_index(smpi_comm_group(win->comm),target_rank), RMA_TAG+1, recv_win->comm); + + //push request to receiver's win + 
xbt_dynar_push_as(recv_win->requests, MPI_Request, rreq); + + //start send + smpi_mpi_start(sreq); + + //push request to sender's win + xbt_dynar_push_as(win->requests, MPI_Request, sreq); + } + //perform actual copy + /*smpi_datatype_copy(origin_addr, origin_count, origin_datatype, + recv_addr, target_count, target_datatype);*/ + + return MPI_SUCCESS; +} + +int smpi_mpi_get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank, + MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Win win) +{ + //get sender pointer + MPI_Win send_win = win->connected_wins[target_rank]; + + void* send_addr = send_win->base + target_disp * smpi_datatype_size(target_datatype)/** send_win->disp_unit*/; + smpi_datatype_use(origin_datatype); + smpi_datatype_use(target_datatype); + XBT_DEBUG("Entering MPI_Get from %d", target_rank); + + if(target_rank != smpi_comm_rank(win->comm)){ + //prepare send_request + MPI_Request sreq = smpi_rma_send_init(send_addr, target_count, target_datatype, + smpi_group_index(smpi_comm_group(win->comm),target_rank), smpi_process_index(), RMA_TAG+2, send_win->comm); + + //prepare receiver request + MPI_Request rreq = smpi_rma_recv_init(origin_addr, origin_count, origin_datatype, + smpi_group_index(smpi_comm_group(win->comm),target_rank), smpi_process_index(), RMA_TAG+2, win->comm); + + //push request to receiver's win + xbt_dynar_push_as(send_win->requests, MPI_Request, sreq); + + //start recv + smpi_mpi_start(rreq); + + //push request to sender's win + xbt_dynar_push_as(win->requests, MPI_Request, rreq); + } + //perform actual copy + /*smpi_datatype_copy(send_addr, target_count, target_datatype, + origin_addr, origin_count, origin_datatype);*/ + + + return MPI_SUCCESS; +} + + +int smpi_mpi_accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank, + MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Win win) +{ + //get receiver pointer + MPI_Win recv_win = win->connected_wins[target_rank]; + + void* recv_addr = recv_win->base + target_disp * smpi_datatype_size(target_datatype) /** recv_win->disp_unit*/; + XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank); + + smpi_datatype_use(origin_datatype); + smpi_datatype_use(target_datatype); + + if(target_rank != smpi_comm_rank(win->comm)){ + //prepare send_request + MPI_Request sreq = smpi_rma_send_init(origin_addr, origin_count, origin_datatype, + smpi_process_index(), smpi_group_index(smpi_comm_group(win->comm),target_rank), RMA_TAG+3, win->comm); + + //prepare receiver request + MPI_Request rreq = smpi_rma_recv_init(NULL, 0, target_datatype, + smpi_process_index(), smpi_group_index(smpi_comm_group(win->comm),target_rank), RMA_TAG+3, recv_win->comm); + rreq->flags |= ACCUMULATE; + //push request to receiver's win + xbt_dynar_push_as(recv_win->requests, MPI_Request, rreq); + //start send + smpi_mpi_start(sreq); + //push request to sender's win + xbt_dynar_push_as(win->requests, MPI_Request, sreq); + } + //perform actual accumulation + smpi_op_apply(op, origin_addr, recv_addr, &origin_count, &origin_datatype); + + return MPI_SUCCESS; +} +
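
As an illustration of how the new entry points are meant to fit together, here is a small self-contained test program. It is not part of this commit, and the file name rma_fence_example.c is invented for the example; it only uses calls that this patch declares and implements in SMPI (MPI_Alloc_mem, MPI_Win_create, MPI_Win_fence, MPI_Put, MPI_Get, MPI_Accumulate with MPI_SUM, MPI_Win_free, MPI_Free_mem) and relies solely on the fence-based synchronization supported so far.

/* rma_fence_example.c -- illustrative sketch, not part of this commit.
 * Exercises the calls introduced here: MPI_Alloc_mem, MPI_Win_create,
 * MPI_Win_fence, MPI_Put, MPI_Get, MPI_Accumulate, MPI_Win_free and
 * MPI_Free_mem. Run with at least two processes. */
#include <stdio.h>
#include <mpi.h>

int main(int argc, char **argv)
{
  int rank, size, right, neighbour_val;
  int *buf;        /* buf[0]: Put/Get target, buf[1]: Accumulate target */
  MPI_Win win;

  MPI_Init(&argc, &argv);
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
  MPI_Comm_size(MPI_COMM_WORLD, &size);

  MPI_Alloc_mem(2 * sizeof(int), MPI_INFO_NULL, &buf);
  buf[0] = -1;
  buf[1] = 0;
  right = (rank + 1) % size;
  neighbour_val = -1;

  /* Expose the two ints of every process; displacements are counted in ints. */
  MPI_Win_create(buf, 2 * sizeof(int), sizeof(int), MPI_INFO_NULL,
                 MPI_COMM_WORLD, &win);

  /* First access epoch: every rank writes its id into slot 0 of its right
   * neighbour and adds its id into slot 1 of rank 0. */
  MPI_Win_fence(0, win);
  MPI_Put(&rank, 1, MPI_INT, right, 0, 1, MPI_INT, win);
  MPI_Accumulate(&rank, 1, MPI_INT, 0, 1, 1, MPI_INT, MPI_SUM, win);
  MPI_Win_fence(0, win);

  /* Second access epoch: read slot 0 of the right neighbour back; it holds
   * the value we put there ourselves, i.e. our own rank. */
  MPI_Win_fence(0, win);
  MPI_Get(&neighbour_val, 1, MPI_INT, right, 0, 1, MPI_INT, win);
  MPI_Win_fence(0, win);

  printf("[%d] read back %d from rank %d (expected %d)\n",
         rank, neighbour_val, right, rank);
  if (rank == 0)
    printf("[0] accumulated sum of ranks: %d (expected %d)\n",
           buf[1], size * (size - 1) / 2);

  MPI_Win_free(&win);
  MPI_Free_mem(buf);
  MPI_Finalize();
  return 0;
}

Built with smpicc and launched through smpirun with at least two processes, each rank should read back its own id and rank 0 should report the sum of all ranks; the program sticks to fence epochs because that is the only RMA synchronization mechanism this patch provides.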