Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
First steps for implemtation of MPI Onesided Operations :
authorAugustin Degomme <augustin.degomme@imag.fr>
Mon, 23 Jun 2014 15:30:01 +0000 (17:30 +0200)
committerAugustin Degomme <augustin.degomme@imag.fr>
Mon, 23 Jun 2014 15:48:09 +0000 (17:48 +0200)
Support for MPI_Win structures, MPI_Put, MPI_Get, MPI_Accumulate, MPI_Win_Fence

buildtools/Cmake/DefinePackages.cmake
include/smpi/smpi.h
src/smpi/private.h
src/smpi/smpi_base.c
src/smpi/smpi_global.c
src/smpi/smpi_mpi.c
src/smpi/smpi_mpi_dt.c
src/smpi/smpi_pmpi.c
src/smpi/smpi_rma.c [new file with mode: 0644]

index ca21665..662d2f8 100644 (file)
@@ -231,6 +231,7 @@ set(SMPI_SRC
   src/smpi/smpi_mpi_dt.c
   src/smpi/smpi_pmpi.c
   src/smpi/smpi_replay.c
+  src/smpi/smpi_rma.c
   src/smpi/smpi_topo.c
   )
 
index 3d0fb98..788e001 100644 (file)
@@ -68,6 +68,7 @@ SG_BEGIN_DECL()
 #define MPI_ERR_DIMS      17
 #define MPI_ERR_TOPOLOGY  18
 #define MPI_ERR_NO_MEM    19
+#define MPI_ERR_WIN       20
 #define MPI_ERRCODES_IGNORE (int *)0
 #define MPI_IDENT     0
 #define MPI_SIMILAR   1
@@ -79,6 +80,14 @@ SG_BEGIN_DECL()
 #define MPI_IO               0
 #define MPI_BSEND_OVERHEAD   0
 
+
+#define MPI_MODE_NOSTORE 0x1
+#define MPI_MODE_NOPUT 0x2
+#define MPI_MODE_NOPRECEDE 0x4
+#define MPI_MODE_NOSUCCEED 0x8
+#define MPI_MODE_NOCHECK 0x10
+
+
 #define MPI_KEYVAL_INVALID 0
 #define MPI_NULL_COPY_FN NULL
 #define MPI_NULL_DELETE_FN NULL
@@ -122,6 +131,7 @@ SG_BEGIN_DECL()
 #define MPI_ROOT 0
 #define MPI_INFO_NULL -1
 #define MPI_COMM_TYPE_SHARED    1
+#define MPI_WIN_NULL NULL
 
 #define MPI_VERSION 1
 #define MPI_SUBVERSION 1
@@ -182,6 +192,10 @@ typedef struct {
   int count;
 } MPI_Status;
 
+struct s_smpi_mpi_win;
+typedef struct s_smpi_mpi_win* MPI_Win;
+typedef int MPI_Info;
+
 #define MPI_STATUS_IGNORE ((MPI_Status*)NULL)
 #define MPI_STATUSES_IGNORE ((MPI_Status*)NULL)
 
@@ -251,6 +265,8 @@ XBT_PUBLIC_DATA( MPI_Op ) MPI_LXOR;
 XBT_PUBLIC_DATA( MPI_Op ) MPI_BAND;
 XBT_PUBLIC_DATA( MPI_Op ) MPI_BOR;
 XBT_PUBLIC_DATA( MPI_Op ) MPI_BXOR;
+//For accumulate
+XBT_PUBLIC_DATA( MPI_Op ) MPI_REPLACE;
 
 struct s_smpi_mpi_topology;
 typedef struct s_smpi_mpi_topology *MPI_Topology;
@@ -531,11 +547,26 @@ MPI_CALL(XBT_PUBLIC(int), MPI_Get_library_version,
 MPI_CALL(XBT_PUBLIC(int), MPI_Reduce_local,(void *inbuf, void *inoutbuf, int count,
     MPI_Datatype datatype, MPI_Op op));
 
+MPI_CALL(XBT_PUBLIC(int), MPI_Win_free,( MPI_Win* win));
+
+MPI_CALL(XBT_PUBLIC(int), MPI_Win_create,( void *base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm, MPI_Win *win));
+
+MPI_CALL(XBT_PUBLIC(int), MPI_Win_fence,( int assert,  MPI_Win win));
+
+MPI_CALL(XBT_PUBLIC(int), MPI_Get,( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
+    MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Win win));
+MPI_CALL(XBT_PUBLIC(int), MPI_Put,( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
+    MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Win win));
+MPI_CALL(XBT_PUBLIC(int), MPI_Accumulate,( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
+    MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Win win));
+MPI_CALL(XBT_PUBLIC(int), MPI_Alloc_mem, (MPI_Aint size, MPI_Info info, void *baseptr));
+MPI_CALL(XBT_PUBLIC(int), MPI_Free_mem, (void *base));
+
+
 //FIXME: these are not yet implemented
 
 typedef void MPI_Handler_function(MPI_Comm*, int*, ...);
-typedef int MPI_Win;
-typedef int MPI_Info;
+
 typedef void* MPI_Errhandler;
 
 typedef int MPI_Copy_function(MPI_Comm oldcomm, int keyval, void* extra_state, void* attribute_val_in,
@@ -638,9 +669,7 @@ MPI_CALL(XBT_PUBLIC(int), MPI_Get_elements, (MPI_Status* status, MPI_Datatype da
 MPI_CALL(XBT_PUBLIC(int), MPI_Dims_create, (int nnodes, int ndims, int* dims));
 MPI_CALL(XBT_PUBLIC(int), MPI_Initialized, (int* flag));
 MPI_CALL(XBT_PUBLIC(int), MPI_Pcontrol, (const int level ));
-MPI_CALL(XBT_PUBLIC(int), MPI_Win_fence,( int assert,  MPI_Win win));
-MPI_CALL(XBT_PUBLIC(int), MPI_Win_free,( MPI_Win* win));
-MPI_CALL(XBT_PUBLIC(int), MPI_Win_create,( void *base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm, MPI_Win *win));
+
 MPI_CALL(XBT_PUBLIC(int), MPI_Info_create,( MPI_Info *info));
 MPI_CALL(XBT_PUBLIC(int), MPI_Info_set,( MPI_Info info, char *key, char *value));
 MPI_CALL(XBT_PUBLIC(int), MPI_Info_get,(MPI_Info info,char *key,int valuelen, char *value, int *flag));
@@ -651,8 +680,8 @@ MPI_CALL(XBT_PUBLIC(int), MPI_Info_get_nkeys,( MPI_Info info, int *nkeys));
 MPI_CALL(XBT_PUBLIC(int), MPI_Info_get_nthkey,( MPI_Info info, int n, char *key));
 MPI_CALL(XBT_PUBLIC(int), MPI_Info_get_valuelen,( MPI_Info info, char *key, int *valuelen, int *flag));
 
-MPI_CALL(XBT_PUBLIC(int), MPI_Get,( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
-    MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Win win));
+
+MPI_CALL(XBT_PUBLIC(int), MPI_Win_set_errhandler, (MPI_Win win, MPI_Errhandler errhandler));
 MPI_CALL(XBT_PUBLIC(int), MPI_Type_get_envelope,(MPI_Datatype datatype,int *num_integers,int *num_addresses,int *num_datatypes, int *combiner));
 MPI_CALL(XBT_PUBLIC(int), MPI_Type_get_contents,(MPI_Datatype datatype, int max_integers, int max_addresses,
                             int max_datatypes, int* array_of_integers, MPI_Aint* array_of_addresses, 
index 2b8ce55..6029735 100644 (file)
@@ -29,6 +29,8 @@ typedef struct s_smpi_process_data *smpi_process_data_t;
 #define SSEND          0x40
 #define PREPARED       0x80
 #define FINISHED       0x100
+#define RMA            0x200
+#define ACCUMULATE     0x400
 
 
 enum smpi_process_state{
@@ -107,6 +109,7 @@ typedef struct s_smpi_mpi_request {
 #endif
 } s_smpi_mpi_request_t;
 
+
 void smpi_process_destroy(void);
 void smpi_process_finalize(void);
 int smpi_process_finalized(void);
@@ -146,6 +149,8 @@ smpi_process_data_t smpi_process_data(void);
 smpi_process_data_t smpi_process_remote_data(int index);
 void smpi_process_set_user_data(void *);
 void* smpi_process_get_user_data(void);
+int smpi_process_get_win_id();
+void smpi_process_set_win_id(int);
 int smpi_process_count(void);
 MPI_Comm smpi_process_comm_world(void);
 smx_rdv_t smpi_process_mailbox(void);
@@ -175,6 +180,7 @@ int is_datatype_valid(MPI_Datatype datatype);
 size_t smpi_datatype_size(MPI_Datatype datatype);
 MPI_Aint smpi_datatype_lb(MPI_Datatype datatype);
 MPI_Aint smpi_datatype_ub(MPI_Datatype datatype);
+MPI_Datatype smpi_datatype_dup(MPI_Datatype datatype);
 int smpi_datatype_extent(MPI_Datatype datatype, MPI_Aint * lb,
                          MPI_Aint * extent);
 MPI_Aint smpi_datatype_get_extent(MPI_Datatype datatype);
@@ -254,6 +260,10 @@ MPI_Request smpi_irecv_init(void *buf, int count, MPI_Datatype datatype,
                             int src, int tag, MPI_Comm comm);
 MPI_Request smpi_mpi_irecv(void *buf, int count, MPI_Datatype datatype,
                            int src, int tag, MPI_Comm comm);
+MPI_Request smpi_rma_send_init(void *buf, int count, MPI_Datatype datatype,
+                            int src, int dst, int tag, MPI_Comm comm);
+MPI_Request smpi_rma_recv_init(void *buf, int count, MPI_Datatype datatype,
+                            int src, int dst, int tag, MPI_Comm comm);
 void smpi_mpi_recv(void *buf, int count, MPI_Datatype datatype, int src,
                    int tag, MPI_Comm comm, MPI_Status * status);
 void smpi_mpi_send(void *buf, int count, MPI_Datatype datatype, int dst,
@@ -317,6 +327,19 @@ void smpi_mpi_scan(void *sendbuf, void *recvbuf, int count,
 void smpi_mpi_exscan(void *sendbuf, void *recvbuf, int count,
                    MPI_Datatype datatype, MPI_Op op, MPI_Comm comm);
 
+int smpi_mpi_win_free( MPI_Win* win);
+
+MPI_Win smpi_mpi_win_create( void *base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm);
+
+int smpi_mpi_win_fence( int assert,  MPI_Win win);
+
+int smpi_mpi_get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
+              MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Win win);
+int smpi_mpi_put( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
+              MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Win win);
+int smpi_mpi_accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
+              MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Win win);
+
 void nary_tree_bcast(void *buf, int count, MPI_Datatype datatype, int root,
                      MPI_Comm comm, int arity);
 void nary_tree_barrier(MPI_Comm comm, int arity);
index 9ac0887..830cd2d 100644 (file)
@@ -326,7 +326,7 @@ void smpi_mpi_start(MPI_Request request)
   if (request->flags & RECV) {
     print_request("New recv", request);
     //FIXME: if receive is posted with a large size, but send is smaller, mailboxes may not match !
-    if (request->size < sg_cfg_get_int("smpi/async_small_thres"))
+    if (request->flags & RMA || request->size < sg_cfg_get_int("smpi/async_small_thres"))
       mailbox = smpi_process_mailbox_small();
     else
       mailbox = smpi_process_mailbox();
@@ -335,7 +335,8 @@ void smpi_mpi_start(MPI_Request request)
     smpi_datatype_use(request->old_type);
     smpi_comm_use(request->comm);
     request->action = simcall_comm_irecv(mailbox, request->buf,
-                                         &request->real_size, &match_recv, &smpi_comm_copy_buffer_callback,
+                                         &request->real_size, &match_recv,
+                                         (request->flags & ACCUMULATE)? NULL : &smpi_comm_copy_buffer_callback,
                                          request, -1.0);
 
     //integrate pseudo-timing for buffering of small messages, do not bother to execute the simcall if 0
@@ -361,7 +362,7 @@ void smpi_mpi_start(MPI_Request request)
 /*      return;*/
 /*    }*/
     print_request("New send", request);
-    if (request->size < sg_cfg_get_int("smpi/async_small_thres")) { // eager mode
+    if (request->flags & RMA || request->size < sg_cfg_get_int("smpi/async_small_thres")) { // eager mode
       mailbox = smpi_process_remote_mailbox_small(receiver);
     }else{
       XBT_DEBUG("Send request %p is not in the permanent receive mailbox (buf: %p)",request,request->buf);
@@ -411,7 +412,7 @@ void smpi_mpi_start(MPI_Request request)
                          buf, request->real_size,
                          &match_send,
                          &xbt_free_f, // how to free the userdata if a detached send fails
-                         &smpi_comm_copy_buffer_callback,
+                         (request->flags & ACCUMULATE)? NULL : &smpi_comm_copy_buffer_callback,
                          request,
                          // detach if msg size < eager/rdv switch limit
                          request->detached);
@@ -455,6 +456,26 @@ void smpi_mpi_request_free(MPI_Request * request)
   }
 }
 
+
+MPI_Request smpi_rma_send_init(void *buf, int count, MPI_Datatype datatype,
+                            int src, int dst, int tag, MPI_Comm comm)
+{
+  MPI_Request request = NULL; /* MC needs the comm to be set to NULL during the call */
+  request = build_request(buf==MPI_BOTTOM ? (void*)0 : buf , count, datatype, src, dst, tag,
+                          comm, RMA | NON_PERSISTENT | ISEND | SEND | PREPARED);
+  return request;
+}
+
+MPI_Request smpi_rma_recv_init(void *buf, int count, MPI_Datatype datatype,
+                            int src, int dst, int tag, MPI_Comm comm)
+{
+  MPI_Request request = NULL; /* MC needs the comm to be set to NULL during the call */
+  request = build_request(buf==MPI_BOTTOM ? (void*)0 : buf, count, datatype,  src, dst, tag,
+                          comm, RMA | NON_PERSISTENT | RECV | PREPARED);
+  return request;
+}
+
+
 MPI_Request smpi_isend_init(void *buf, int count, MPI_Datatype datatype,
                             int dst, int tag, MPI_Comm comm)
 {
@@ -484,8 +505,6 @@ MPI_Request smpi_mpi_issend(void *buf, int count, MPI_Datatype datatype,
   return request;
 }
 
-
-
 MPI_Request smpi_irecv_init(void *buf, int count, MPI_Datatype datatype,
                             int src, int tag, MPI_Comm comm)
 {
index 0072cda..8e48139 100644 (file)
@@ -34,6 +34,7 @@ typedef struct s_smpi_process_data {
   char state;
   int sampling;                 /* inside an SMPI_SAMPLE_ block? */
   char* instance_id;
+  int nb_wins;
   xbt_bar_t finalization_barrier;
 } s_smpi_process_data_t;
 
@@ -102,6 +103,7 @@ void smpi_process_init(int *argc, char ***argv)
     (*argc)-=2;
     data->argc = argc;
     data->argv = argv;
+    data->nb_wins=0;
     // set the process attached to the mailbox
     simcall_rdv_set_receiver(data->mailbox_small, proc);
     XBT_DEBUG("<%d> New process in the game: %p", index, proc);
@@ -203,6 +205,20 @@ void *smpi_process_get_user_data()
   return process_data->data;
 }
 
+
+int smpi_process_get_win_id()
+{
+  smpi_process_data_t process_data = smpi_process_data();
+  return process_data->nb_wins;
+}
+
+void smpi_process_set_win_id(int id)
+{
+  smpi_process_data_t process_data = smpi_process_data();
+  process_data->nb_wins = id;
+}
+
+
 int smpi_process_count(void)
 {
   return process_count;
@@ -481,6 +497,7 @@ static void smpi_init_logs(){
   XBT_LOG_CONNECT(smpi_mpi_dt);
   XBT_LOG_CONNECT(smpi_pmpi);
   XBT_LOG_CONNECT(smpi_replay);
+  XBT_LOG_CONNECT(smpi_rma);
 
 }
 
index 99c5274..e7899fa 100644 (file)
@@ -512,6 +512,13 @@ int MPI_Alltoallv(void *sendbuf, int *sendcounts, int *senddisps,
                         comm);
 }
 
+int MPI_Alloc_mem(MPI_Aint size, MPI_Info info, void *baseptr){
+  return PMPI_Alloc_mem(size, info, baseptr);
+}
+
+int MPI_Free_mem(void *baseptr){
+  return PMPI_Free_mem(baseptr);
+}
 
 int MPI_Get_processor_name(char *name, int *resultlen)
 {
@@ -612,13 +619,17 @@ int MPI_Errhandler_set(MPI_Comm comm, MPI_Errhandler errhandler) {
 }
 
 int MPI_Comm_set_errhandler(MPI_Comm comm, MPI_Errhandler errhandler) {
-  return PMPI_Errhandler_set(comm, errhandler);
+  return PMPI_Comm_set_errhandler(comm, errhandler);
 }
 
 int MPI_Comm_get_errhandler(MPI_Comm comm, MPI_Errhandler* errhandler) {
   return PMPI_Errhandler_set(comm, errhandler);
 }
 
+int MPI_Win_set_errhandler(MPI_Win win, MPI_Errhandler errhandler) {
+  return PMPI_Win_set_errhandler(win, errhandler);
+}
+
 int MPI_Type_contiguous(int count, MPI_Datatype old_type, MPI_Datatype* newtype) {
   return PMPI_Type_contiguous(count, old_type, newtype);
 }
@@ -837,6 +848,18 @@ int MPI_Get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype,
       target_disp, target_count,target_datatype, win);
 }
 
+int MPI_Put( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
+    MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Win win){
+  return PMPI_Put( origin_addr,origin_count, origin_datatype,target_rank,
+      target_disp, target_count,target_datatype, win);
+}
+
+int MPI_Accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
+    MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Win win){
+  return PMPI_Accumulate( origin_addr,origin_count, origin_datatype,target_rank,
+      target_disp, target_count,target_datatype,op, win);
+}
+
 int MPI_Type_get_envelope( MPI_Datatype datatype, int *num_integers,
                           int *num_addresses, int *num_datatypes, int *combiner){
   return PMPI_Type_get_envelope(  datatype, num_integers,
index 7cb76b7..2ad759c 100644 (file)
@@ -145,6 +145,15 @@ MPI_Aint smpi_datatype_ub(MPI_Datatype datatype)
   return datatype->ub;
 }
 
+MPI_Datatype smpi_datatype_dup(MPI_Datatype datatype)
+{
+  MPI_Datatype new_t= xbt_new(s_smpi_mpi_datatype_t,1);
+  memcpy(new_t, datatype, sizeof(s_smpi_mpi_datatype_t));
+  if (datatype->has_subtype)
+    memcpy(new_t->substruct, datatype->substruct, sizeof(s_smpi_subtype_t));
+  return new_t;
+}
+
 int smpi_datatype_extent(MPI_Datatype datatype, MPI_Aint * lb,
                          MPI_Aint * extent)
 {
@@ -292,6 +301,7 @@ s_smpi_mpi_vector_t* smpi_datatype_vector_create( int block_stride,
   new_t->block_stride = block_stride;
   new_t->block_length = block_length;
   new_t->block_count = block_count;
+  smpi_datatype_use(old_type);
   new_t->old_type = old_type;
   new_t->size_oldtype = size_oldtype;
   return new_t;
@@ -346,7 +356,7 @@ void smpi_datatype_use(MPI_Datatype type){
 void smpi_datatype_unuse(MPI_Datatype type){
   if(type && type->in_use-- == 0 && (type->flags & DT_FLAG_DESTROYED))
     smpi_datatype_free(&type);
-  
+
 #ifdef HAVE_MC
   if(MC_is_active())
     MC_ignore(&(type->in_use), sizeof(type->in_use));
@@ -404,6 +414,7 @@ void unserialize_contiguous( const void *contiguous_vector,
 }
 
 void free_contiguous(MPI_Datatype* d){
+  smpi_datatype_unuse(((s_smpi_mpi_indexed_t *)(*d)->substruct)->old_type);
 }
 
 /*
@@ -424,6 +435,7 @@ s_smpi_mpi_contiguous_t* smpi_datatype_contiguous_create( MPI_Aint lb,
   new_t->block_count = block_count;
   new_t->old_type = old_type;
   new_t->size_oldtype = size_oldtype;
+  smpi_datatype_use(old_type);
   return new_t;
 }
 
@@ -491,6 +503,7 @@ int smpi_datatype_vector(int count, int blocklen, int stride, MPI_Datatype old_t
 }
 
 void free_vector(MPI_Datatype* d){
+  smpi_datatype_unuse(((s_smpi_mpi_indexed_t *)(*d)->substruct)->old_type);
 }
 
 /*
@@ -591,11 +604,13 @@ s_smpi_mpi_hvector_t* smpi_datatype_hvector_create( MPI_Aint block_stride,
   new_t->block_count = block_count;
   new_t->old_type = old_type;
   new_t->size_oldtype = size_oldtype;
+  smpi_datatype_use(old_type);
   return new_t;
 }
 
 //do nothing for vector types
 void free_hvector(MPI_Datatype* d){
+  smpi_datatype_unuse(((s_smpi_mpi_indexed_t *)(*d)->substruct)->old_type);
 }
 
 int smpi_datatype_hvector(int count, int blocklen, MPI_Aint stride, MPI_Datatype old_type, MPI_Datatype* new_type)
@@ -717,6 +732,7 @@ void unserialize_indexed( const void *contiguous_indexed,
 void free_indexed(MPI_Datatype* type){
   xbt_free(((s_smpi_mpi_indexed_t *)(*type)->substruct)->block_lengths);
   xbt_free(((s_smpi_mpi_indexed_t *)(*type)->substruct)->block_indices);
+  smpi_datatype_unuse(((s_smpi_mpi_indexed_t *)(*type)->substruct)->old_type);
 }
 
 /*
@@ -742,6 +758,7 @@ s_smpi_mpi_indexed_t* smpi_datatype_indexed_create( int* block_lengths,
     new_t->block_indices[i]=block_indices[i];
   }
   new_t->block_count = block_count;
+  smpi_datatype_use(old_type);
   new_t->old_type = old_type;
   new_t->size_oldtype = size_oldtype;
   return new_t;
@@ -878,6 +895,7 @@ void unserialize_hindexed( const void *contiguous_hindexed,
 void free_hindexed(MPI_Datatype* type){
   xbt_free(((s_smpi_mpi_hindexed_t *)(*type)->substruct)->block_lengths);
   xbt_free(((s_smpi_mpi_hindexed_t *)(*type)->substruct)->block_indices);
+  smpi_datatype_unuse(((s_smpi_mpi_indexed_t *)(*type)->substruct)->old_type);
 }
 
 /*
@@ -1041,6 +1059,9 @@ void unserialize_struct( const void *contiguous_struct,
 void free_struct(MPI_Datatype* type){
   xbt_free(((s_smpi_mpi_struct_t *)(*type)->substruct)->block_lengths);
   xbt_free(((s_smpi_mpi_struct_t *)(*type)->substruct)->block_indices);
+  int i=0;
+  for (i = 0; i < ((s_smpi_mpi_struct_t *)(*type)->substruct)->block_count; i++)
+    smpi_datatype_unuse(((s_smpi_mpi_struct_t *)(*type)->substruct)->old_types[i]);
   xbt_free(((s_smpi_mpi_struct_t *)(*type)->substruct)->old_types);
 }
 
@@ -1066,6 +1087,7 @@ s_smpi_mpi_struct_t* smpi_datatype_struct_create( int* block_lengths,
     new_t->block_lengths[i]=block_lengths[i];
     new_t->block_indices[i]=block_indices[i];
     new_t->old_types[i]=old_types[i];
+    smpi_datatype_use(new_t->old_types[i]);
   }
   //new_t->block_lengths = block_lengths;
   //new_t->block_indices = block_indices;
@@ -1150,7 +1172,7 @@ typedef struct s_smpi_mpi_op {
 #define BXOR_OP(a, b) (b) ^= (a)
 #define MAXLOC_OP(a, b)  (b) = (a.value) < (b.value) ? (b) : (a)
 #define MINLOC_OP(a, b)  (b) = (a.value) < (b.value) ? (a) : (b)
-//TODO : MINLOC & MAXLOC
+#define REPLACE_OP(a,b) (b) = (a)
 
 #define APPLY_FUNC(a, b, length, type, func) \
 {                                          \
@@ -1455,6 +1477,27 @@ static void maxloc_func(void *a, void *b, int *length,
   }
 }
 
+static void replace_func(void *a, void *b, int *length,
+                        MPI_Datatype * datatype)
+{
+  if (*datatype == MPI_CHAR) {
+    APPLY_FUNC(a, b, length, char, REPLACE_OP);
+  } else if (*datatype == MPI_SHORT) {
+    APPLY_FUNC(a, b, length, short, REPLACE_OP);
+  } else if (*datatype == MPI_INT) {
+    APPLY_FUNC(a, b, length, int, REPLACE_OP);
+  } else if (*datatype == MPI_LONG) {
+    APPLY_FUNC(a, b, length, long, REPLACE_OP);
+  } else if (*datatype == MPI_UNSIGNED_SHORT) {
+    APPLY_FUNC(a, b, length, unsigned short, REPLACE_OP);
+  } else if (*datatype == MPI_UNSIGNED) {
+    APPLY_FUNC(a, b, length, unsigned int, REPLACE_OP);
+  } else if (*datatype == MPI_UNSIGNED_LONG) {
+    APPLY_FUNC(a, b, length, unsigned long, REPLACE_OP);
+  } else if (*datatype == MPI_BYTE) {
+    APPLY_FUNC(a, b, length, uint8_t, REPLACE_OP);
+  }
+}
 
 #define CREATE_MPI_OP(name, func)                             \
   static s_smpi_mpi_op_t mpi_##name = { &(func) /* func */, TRUE }; \
@@ -1472,6 +1515,8 @@ CREATE_MPI_OP(MPI_BOR, bor_func);
 CREATE_MPI_OP(MPI_BXOR, bxor_func);
 CREATE_MPI_OP(MPI_MAXLOC, maxloc_func);
 CREATE_MPI_OP(MPI_MINLOC, minloc_func);
+CREATE_MPI_OP(MPI_REPLACE, replace_func);
+
 
 MPI_Op smpi_op_new(MPI_User_function * function, int commute)
 {
index 4a9a8d2..c2aa98c 100644 (file)
@@ -255,6 +255,18 @@ int PMPI_Type_ub(MPI_Datatype datatype, MPI_Aint * disp)
   return retval;
 }
 
+int PMPI_Type_dup(MPI_Datatype datatype, MPI_Datatype *newtype){
+  int retval = 0;
+
+  if (datatype == MPI_DATATYPE_NULL) {
+    retval = MPI_ERR_TYPE;
+  } else {
+    *newtype = smpi_datatype_dup(datatype);
+    retval = MPI_SUCCESS;
+  }
+  return retval;
+}
+
 int PMPI_Op_create(MPI_User_function * function, int commute, MPI_Op * op)
 {
   int retval = 0;
@@ -2697,6 +2709,147 @@ int PMPI_Type_create_resized(MPI_Datatype oldtype,MPI_Aint lb, MPI_Aint extent,
 }
 
 
+
+int PMPI_Win_create( void *base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm, MPI_Win *win){
+  int retval = 0;
+  smpi_bench_end();
+  if (comm == MPI_COMM_NULL) {
+    retval= MPI_ERR_COMM;
+  }else if ((base == NULL && size != 0)
+            || disp_unit <= 0 || size < 0 ){
+    retval= MPI_ERR_OTHER;
+  }else{
+    *win = smpi_mpi_win_create( base, size, disp_unit, info, comm);
+    retval = MPI_SUCCESS;
+  }
+  smpi_bench_begin();
+  return retval;
+}
+
+int PMPI_Win_free( MPI_Win* win){
+  int retval = 0;
+  smpi_bench_end();
+  if (win == NULL || *win == MPI_WIN_NULL) {
+    retval = MPI_ERR_WIN;
+  }else{
+    retval=smpi_mpi_win_free(win);
+  }
+  smpi_bench_begin();
+  return retval;
+}
+
+
+int PMPI_Win_fence( int assert,  MPI_Win win){
+  int retval = 0;
+  smpi_bench_end();
+  if (win == MPI_WIN_NULL) {
+    retval = MPI_ERR_WIN;
+  } else {
+    retval = smpi_mpi_win_fence(assert, win);
+  }
+  smpi_bench_begin();
+  return retval;
+}
+
+int PMPI_Get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
+              MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Win win){
+  int retval = 0;
+  smpi_bench_end();
+  if (win == MPI_WIN_NULL) {
+    retval = MPI_ERR_WIN;
+  } else if (target_rank == MPI_PROC_NULL) {
+    retval = MPI_SUCCESS;
+  } else if (target_rank <0){
+    retval = MPI_ERR_RANK;
+  } else if (target_disp <0){
+      retval = MPI_ERR_ARG;
+  } else if (origin_count < 0 || target_count < 0) {
+    retval = MPI_ERR_COUNT;
+  } else if (origin_addr==NULL && origin_count > 0){
+    retval = MPI_ERR_COUNT;
+  } else if ((!is_datatype_valid(origin_datatype)) ||
+            (!is_datatype_valid(target_datatype))) {
+    retval = MPI_ERR_TYPE;
+  } else {
+    retval = smpi_mpi_get( origin_addr, origin_count, origin_datatype, target_rank, target_disp, target_count, target_datatype, win);
+  }
+  smpi_bench_begin();
+  return retval;
+}
+
+int PMPI_Put( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
+              MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Win win){
+  int retval = 0;
+  smpi_bench_end();
+  if (win == MPI_WIN_NULL) {
+    retval = MPI_ERR_WIN;
+  } else if (target_rank == MPI_PROC_NULL) {
+    retval = MPI_SUCCESS;
+  } else if (target_rank <0){
+    retval = MPI_ERR_RANK;
+  } else if (target_disp <0){
+    retval = MPI_ERR_ARG;
+  } else if (origin_count < 0 || target_count < 0) {
+    retval = MPI_ERR_COUNT;
+  } else if (origin_addr==NULL && origin_count > 0){
+    retval = MPI_ERR_COUNT;
+  } else if ((!is_datatype_valid(origin_datatype)) ||
+            (!is_datatype_valid(target_datatype))) {
+    retval = MPI_ERR_TYPE;
+  } else {
+    retval = smpi_mpi_put( origin_addr, origin_count, origin_datatype, target_rank, target_disp, target_count, target_datatype, win);
+  }
+  smpi_bench_begin();
+  return retval;
+}
+
+
+int PMPI_Accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
+              MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Win win){
+  int retval = 0;
+  smpi_bench_end();
+  if (win == MPI_WIN_NULL) {
+    retval = MPI_ERR_WIN;
+  } else if (target_rank == MPI_PROC_NULL) {
+    retval = MPI_SUCCESS;
+  } else if (target_rank <0){
+    retval = MPI_ERR_RANK;
+  } else if (target_disp <0){
+    retval = MPI_ERR_ARG;
+  } else if (origin_count < 0 || target_count < 0) {
+    retval = MPI_ERR_COUNT;
+  } else if (origin_addr==NULL && origin_count > 0){
+    retval = MPI_ERR_COUNT;
+  } else if ((!is_datatype_valid(origin_datatype)) ||
+            (!is_datatype_valid(target_datatype))) {
+    retval = MPI_ERR_TYPE;
+  } else if (op == MPI_OP_NULL) {
+    retval = MPI_ERR_OP;
+  } else {
+    retval = smpi_mpi_accumulate( origin_addr, origin_count, origin_datatype, target_rank, target_disp, target_count, target_datatype, op, win);
+  }
+  smpi_bench_begin();
+  return retval;
+}
+
+
+int PMPI_Alloc_mem(MPI_Aint size, MPI_Info info, void *baseptr){
+  void *ptr = xbt_malloc(size);
+  if(!ptr)
+    return MPI_ERR_NO_MEM;
+  else {
+    *(void **)baseptr = ptr;
+    return MPI_SUCCESS;
+  }
+}
+
+int PMPI_Free_mem(void *baseptr){
+  xbt_free(baseptr);
+  return MPI_SUCCESS;
+}
+
+
+
 /* The following calls are not yet implemented and will fail at runtime. */
 /* Once implemented, please move them above this notice. */
 
@@ -2705,10 +2858,6 @@ int PMPI_Type_create_resized(MPI_Datatype oldtype,MPI_Aint lb, MPI_Aint extent,
     return MPI_SUCCESS;                                                 \
   }
 
-int PMPI_Type_dup(MPI_Datatype datatype, MPI_Datatype *newtype){
-  NOT_YET_IMPLEMENTED
-}
-
 int PMPI_Type_set_name(MPI_Datatype  datatype, char * name)
 {
   NOT_YET_IMPLEMENTED
@@ -2781,6 +2930,10 @@ int PMPI_Comm_set_errhandler(MPI_Comm comm, MPI_Errhandler errhandler) {
   NOT_YET_IMPLEMENTED
 }
 
+int PMPI_Win_set_errhandler(MPI_Win win, MPI_Errhandler errhandler) {
+  NOT_YET_IMPLEMENTED
+}
+
 int PMPI_Comm_get_errhandler(MPI_Comm comm, MPI_Errhandler* errhandler) {
   NOT_YET_IMPLEMENTED
 }
@@ -2942,18 +3095,6 @@ int PMPI_Get_elements(MPI_Status* status, MPI_Datatype datatype, int* elements)
   NOT_YET_IMPLEMENTED
 }
 
-int PMPI_Win_fence( int assert,  MPI_Win win){
-  NOT_YET_IMPLEMENTED
-}
-
-int PMPI_Win_free( MPI_Win* win){
-  NOT_YET_IMPLEMENTED
-}
-
-int PMPI_Win_create( void *base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm, MPI_Win *win){
-  NOT_YET_IMPLEMENTED
-}
-
 int PMPI_Info_create( MPI_Info *info){
   NOT_YET_IMPLEMENTED
 }
@@ -2966,11 +3107,6 @@ int PMPI_Info_free( MPI_Info *info){
   NOT_YET_IMPLEMENTED
 }
 
-int PMPI_Get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
-              MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Win win){
-  NOT_YET_IMPLEMENTED
-}
-
 int PMPI_Type_get_envelope( MPI_Datatype datatype, int *num_integers,
                             int *num_addresses, int *num_datatypes, int *combiner){
   NOT_YET_IMPLEMENTED
diff --git a/src/smpi/smpi_rma.c b/src/smpi/smpi_rma.c
new file mode 100644 (file)
index 0000000..9ff12c8
--- /dev/null
@@ -0,0 +1,224 @@
+
+/* Copyright (c) 2007-2014. The SimGrid Team.
+ * All rights reserved.                                                     */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
+#include "private.h"
+
+XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");
+
+#define RMA_TAG -1234
+
+/* FIXME:using a global array of MPI_Win simplifies the way to exchange pointers and info,
+ * but it breaks distributed simulation
+ */
+
+xbt_bar_t creation_bar = NULL;
+
+typedef struct s_smpi_mpi_win{
+  void* base;
+  MPI_Aint size;
+  int disp_unit;
+  MPI_Comm comm;
+  //MPI_Info info
+  int assert;
+  xbt_dynar_t requests;
+  xbt_bar_t bar;
+  MPI_Win* connected_wins;
+} s_smpi_mpi_win_t;
+
+
+MPI_Win smpi_mpi_win_create( void *base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm){
+
+  MPI_Win win;
+  
+  int comm_size = smpi_comm_size(comm);
+  int rank=smpi_comm_rank(comm);
+  XBT_DEBUG("Creating window");
+
+  win = xbt_new(s_smpi_mpi_win_t, 1);
+  win->base = base;
+  win->size = size;
+  win->disp_unit = disp_unit;
+  win->assert = 0;
+  //win->info = info;
+  win->comm = comm;
+  win->requests = xbt_dynar_new(sizeof(MPI_Request), NULL);
+  win->connected_wins = xbt_malloc0(comm_size*sizeof(MPI_Win));
+  win->connected_wins[rank] = win;
+  
+  if(rank==0){
+    win->bar=xbt_barrier_init(comm_size);
+  }
+  
+  mpi_coll_allgather_fun(&(win->connected_wins[rank]),
+                     sizeof(MPI_Win),
+                     MPI_BYTE,
+                     win->connected_wins,
+                     sizeof(MPI_Win),
+                     MPI_BYTE,
+                     comm);
+                     
+  mpi_coll_bcast_fun( &(win->bar),
+                     sizeof(xbt_bar_t),
+                     MPI_BYTE,
+                     0,
+                     comm);
+                     
+  mpi_coll_barrier_fun(comm);
+  
+  return win;
+}
+
+int smpi_mpi_win_free( MPI_Win* win){
+
+  //As per the standard, perform a barrier to ensure every async comm is finished
+  xbt_barrier_wait((*win)->bar);
+  xbt_dynar_free(&(*win)->requests);
+  xbt_free((*win)->connected_wins);
+  xbt_free(*win);
+  win = MPI_WIN_NULL;
+  return MPI_SUCCESS;
+}
+
+
+int smpi_mpi_win_fence( int assert,  MPI_Win win){
+
+  XBT_DEBUG("Entering fence");
+
+  if(assert != MPI_MODE_NOPRECEDE){
+    xbt_barrier_wait(win->bar);
+
+    xbt_dynar_t reqs = win->requests;
+    int size = xbt_dynar_length(reqs);
+    unsigned int cpt=0;
+    MPI_Request req;
+    // start all requests that have been prepared by another process
+    xbt_dynar_foreach(reqs, cpt, req){
+      if (req->flags & PREPARED) smpi_mpi_start(req);
+    }
+
+    MPI_Request* treqs = xbt_dynar_to_array(reqs);
+    smpi_mpi_waitall(size,treqs,MPI_STATUSES_IGNORE);
+    xbt_free(treqs);
+    win->requests=xbt_dynar_new(sizeof(MPI_Request), NULL);
+
+  }
+  win->assert = assert;
+  
+  xbt_barrier_wait(win->bar);
+  XBT_DEBUG("Leaving fence ");
+
+  return MPI_SUCCESS;
+}
+
+int smpi_mpi_put( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
+              MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Win win)
+{
+  //get receiver pointer
+  MPI_Win recv_win = win->connected_wins[target_rank];
+
+  void* recv_addr = recv_win->base + target_disp * smpi_datatype_size(target_datatype)/* recv_win->disp_unit*/;
+  smpi_datatype_use(origin_datatype);
+  smpi_datatype_use(target_datatype);
+  XBT_DEBUG("Entering MPI_Put to %d", target_rank);
+
+  if(target_rank != smpi_comm_rank(win->comm)){
+    //prepare send_request
+    MPI_Request sreq = smpi_rma_send_init(origin_addr, origin_count, origin_datatype,
+        smpi_process_index(), smpi_group_index(smpi_comm_group(win->comm),target_rank), RMA_TAG+1, win->comm);
+
+    //prepare receiver request
+    MPI_Request rreq = smpi_rma_recv_init(recv_addr, target_count, target_datatype,
+        smpi_process_index(), smpi_group_index(smpi_comm_group(win->comm),target_rank), RMA_TAG+1, recv_win->comm);
+
+    //push request to receiver's win
+    xbt_dynar_push_as(recv_win->requests, MPI_Request, rreq);
+
+    //start send
+    smpi_mpi_start(sreq);
+
+    //push request to sender's win
+    xbt_dynar_push_as(win->requests, MPI_Request, sreq);
+  }
+  //perform actual copy
+  /*smpi_datatype_copy(origin_addr, origin_count, origin_datatype,
+                    recv_addr, target_count, target_datatype);*/
+
+  return MPI_SUCCESS;
+}
+
+int smpi_mpi_get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
+              MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Win win)
+{
+  //get sender pointer
+  MPI_Win send_win = win->connected_wins[target_rank];
+
+  void* send_addr = send_win->base + target_disp * smpi_datatype_size(target_datatype)/** send_win->disp_unit*/;
+  smpi_datatype_use(origin_datatype);
+  smpi_datatype_use(target_datatype);
+  XBT_DEBUG("Entering MPI_Get from %d", target_rank);
+
+  if(target_rank != smpi_comm_rank(win->comm)){
+    //prepare send_request
+    MPI_Request sreq = smpi_rma_send_init(send_addr, target_count, target_datatype,
+        smpi_group_index(smpi_comm_group(win->comm),target_rank), smpi_process_index(), RMA_TAG+2, send_win->comm);
+
+    //prepare receiver request
+    MPI_Request rreq = smpi_rma_recv_init(origin_addr, origin_count, origin_datatype,
+        smpi_group_index(smpi_comm_group(win->comm),target_rank), smpi_process_index(), RMA_TAG+2, win->comm);
+
+    //push request to receiver's win
+    xbt_dynar_push_as(send_win->requests, MPI_Request, sreq);
+
+    //start recv
+    smpi_mpi_start(rreq);
+
+    //push request to sender's win
+    xbt_dynar_push_as(win->requests, MPI_Request, rreq);
+  }
+  //perform actual copy
+  /*smpi_datatype_copy(send_addr, target_count, target_datatype,
+                     origin_addr, origin_count, origin_datatype);*/
+
+
+  return MPI_SUCCESS;
+}
+
+
+int smpi_mpi_accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
+              MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Win win)
+{
+  //get receiver pointer
+  MPI_Win recv_win = win->connected_wins[target_rank];
+
+  void* recv_addr = recv_win->base + target_disp * smpi_datatype_size(target_datatype) /** recv_win->disp_unit*/;
+  XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
+
+  smpi_datatype_use(origin_datatype);
+  smpi_datatype_use(target_datatype);
+
+  if(target_rank != smpi_comm_rank(win->comm)){
+    //prepare send_request
+    MPI_Request sreq = smpi_rma_send_init(origin_addr, origin_count, origin_datatype,
+        smpi_process_index(), smpi_group_index(smpi_comm_group(win->comm),target_rank), RMA_TAG+3, win->comm);
+
+    //prepare receiver request
+    MPI_Request rreq = smpi_rma_recv_init(NULL, 0, target_datatype,
+        smpi_process_index(), smpi_group_index(smpi_comm_group(win->comm),target_rank), RMA_TAG+3, recv_win->comm);
+    rreq->flags |= ACCUMULATE;
+    //push request to receiver's win
+    xbt_dynar_push_as(recv_win->requests, MPI_Request, rreq);
+    //start send
+    smpi_mpi_start(sreq);
+    //push request to sender's win
+    xbt_dynar_push_as(win->requests, MPI_Request, sreq);
+   }
+  //perform actual accumulation
+  smpi_op_apply(op, origin_addr, recv_addr, &origin_count, &origin_datatype);
+
+  return MPI_SUCCESS;
+}
+