Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
some preliminary additions to implement more collectives
authorgenaud <genaud@48e7efb5-ca39-0410-a469-dd3cf9ba447f>
Sat, 27 Jun 2009 15:28:46 +0000 (15:28 +0000)
committergenaud <genaud@48e7efb5-ca39-0410-a469-dd3cf9ba447f>
Sat, 27 Jun 2009 15:28:46 +0000 (15:28 +0000)
git-svn-id: svn+ssh://scm.gforge.inria.fr/svn/simgrid/simgrid/trunk@6384 48e7efb5-ca39-0410-a469-dd3cf9ba447f

include/smpi/smpi.h
src/smpi/sample/reduce.c [new file with mode: 0644]
src/smpi/smpi_base.c
src/smpi/smpi_global.c
src/smpi/smpi_mpi.c

index bf5df23..ce5fea7 100644 (file)
@@ -21,6 +21,7 @@ SG_BEGIN_DECL()
 #define MPI_ERR_COUNT   6
 #define MPI_ERR_RANK    7
 #define MPI_ERR_TAG     8
 #define MPI_ERR_COUNT   6
 #define MPI_ERR_RANK    7
 #define MPI_ERR_TAG     8
+
 // MPI_Comm
      typedef struct smpi_mpi_communicator_t *smpi_mpi_communicator_t;
      typedef smpi_mpi_communicator_t MPI_Comm;
 // MPI_Comm
      typedef struct smpi_mpi_communicator_t *smpi_mpi_communicator_t;
      typedef smpi_mpi_communicator_t MPI_Comm;
@@ -53,10 +54,13 @@ SG_BEGIN_DECL()
 
        smpi_mpi_datatype_t mpi_byte;
        smpi_mpi_datatype_t mpi_int;
 
        smpi_mpi_datatype_t mpi_byte;
        smpi_mpi_datatype_t mpi_int;
+       smpi_mpi_datatype_t mpi_float;
        smpi_mpi_datatype_t mpi_double;
 
        smpi_mpi_op_t mpi_land;
        smpi_mpi_op_t mpi_sum;
        smpi_mpi_datatype_t mpi_double;
 
        smpi_mpi_op_t mpi_land;
        smpi_mpi_op_t mpi_sum;
+       smpi_mpi_op_t mpi_min;
+       smpi_mpi_op_t mpi_max;
 
      } s_smpi_mpi_global_t;
      typedef struct smpi_mpi_global_t *smpi_mpi_global_t;
 
      } s_smpi_mpi_global_t;
      typedef struct smpi_mpi_global_t *smpi_mpi_global_t;
@@ -68,11 +72,14 @@ SG_BEGIN_DECL()
 #define MPI_STATUS_IGNORE NULL
 
 #define MPI_BYTE          (smpi_mpi_global->mpi_byte)
 #define MPI_STATUS_IGNORE NULL
 
 #define MPI_BYTE          (smpi_mpi_global->mpi_byte)
-#define MPI_DOUBLE        (smpi_mpi_global->mpi_double)
 #define MPI_INT           (smpi_mpi_global->mpi_int)
 #define MPI_INT           (smpi_mpi_global->mpi_int)
+#define MPI_FLOAT         (smpi_mpi_global->mpi_float)
+#define MPI_DOUBLE        (smpi_mpi_global->mpi_double)
 
 #define MPI_LAND          (smpi_mpi_global->mpi_land)
 #define MPI_SUM           (smpi_mpi_global->mpi_sum)
 
 #define MPI_LAND          (smpi_mpi_global->mpi_land)
 #define MPI_SUM           (smpi_mpi_global->mpi_sum)
+#define MPI_MIN           (smpi_mpi_global->mpi_min)
+#define MPI_MAX           (smpi_mpi_global->mpi_max)
 
 // MPI macros
 #define MPI_Init(a, b) SMPI_MPI_Init(a, b)
 
 // MPI macros
 #define MPI_Init(a, b) SMPI_MPI_Init(a, b)
@@ -90,6 +97,7 @@ SG_BEGIN_DECL()
 #define MPI_Wait(a, b) SMPI_MPI_Wait(a, b)
 #define MPI_Comm_split(a, b, c, d) SMPI_MPI_Comm_split(a, b, c, d)
 #define MPI_Wtime() SMPI_MPI_Wtime()
 #define MPI_Wait(a, b) SMPI_MPI_Wait(a, b)
 #define MPI_Comm_split(a, b, c, d) SMPI_MPI_Comm_split(a, b, c, d)
 #define MPI_Wtime() SMPI_MPI_Wtime()
+#define MPI_Reduce( a, b, c, d, e, f, g) SMPI_MPI_Reduce( a, b, c, d, e, f, g) 
 
 // SMPI Functions
 XBT_PUBLIC(int) SMPI_MPI_Init(int *argc, char ***argv);
 
 // SMPI Functions
 XBT_PUBLIC(int) SMPI_MPI_Init(int *argc, char ***argv);
@@ -117,6 +125,9 @@ XBT_PUBLIC(int) SMPI_MPI_Comm_split(MPI_Comm comm, int color, int key,
                                     MPI_Comm * comm_out);
 XBT_PUBLIC(double) SMPI_MPI_Wtime(void);
 
                                     MPI_Comm * comm_out);
 XBT_PUBLIC(double) SMPI_MPI_Wtime(void);
 
+XBT_PUBLIC(int) SMPI_MPI_Reduce(void *sendbuf, void *recvbuf, int count, 
+                                   MPI_Datatype datatype, MPI_Op op, int root, MPI_Comm comm);
+
 // smpi functions
 XBT_IMPORT_NO_EXPORT(int) smpi_simulated_main(int argc, char **argv);
 XBT_PUBLIC(unsigned int) smpi_sleep(unsigned int);
 // smpi functions
 XBT_IMPORT_NO_EXPORT(int) smpi_simulated_main(int argc, char **argv);
 XBT_PUBLIC(unsigned int) smpi_sleep(unsigned int);
diff --git a/src/smpi/sample/reduce.c b/src/smpi/sample/reduce.c
new file mode 100644 (file)
index 0000000..b71c1a6
--- /dev/null
@@ -0,0 +1,27 @@
+#include <stdio.h>
+#include <mpi.h>
+
+int main (int argc, char **argv) {
+  int size, rank;
+  int root=0;
+  int value = 1;
+  int sum=-99;
+
+  double start_timer;
+
+
+  MPI_Init(&argc, &argv);
+  MPI_Comm_size(MPI_COMM_WORLD, &size);
+  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
+
+  start_timer = MPI_Wtime();
+
+  printf("rank %d has value %d\n", rank, value);
+  MPI_Reduce(&value, &sum, 1, MPI_INT, MPI_SUM, root, MPI_COMM_WORLD);
+  if ( rank == root) {
+          printf("On root: sum=%d\n",sum);
+           printf("Elapsed time=%lf s\n", MPI_Wtime()-start_timer);
+  }
+  MPI_Finalize();
+  return 0;
+}
index 1d619ba..6dbf4ff 100644 (file)
@@ -13,6 +13,9 @@ XBT_LOG_EXTERNAL_CATEGORY(smpi_util);
 
 smpi_mpi_global_t smpi_mpi_global = NULL;
 
 
 smpi_mpi_global_t smpi_mpi_global = NULL;
 
+/**
+ * Operations of MPI_OP : implemented=land,sum,min,max
+ **/
 void smpi_mpi_land_func(void *a, void *b, int *length,
                         MPI_Datatype * datatype);
 
 void smpi_mpi_land_func(void *a, void *b, int *length,
                         MPI_Datatype * datatype);
 
@@ -28,20 +31,119 @@ void smpi_mpi_land_func(void *a, void *b, int *length,
   }
 }
 
   }
 }
 
+/**
+ * sum two vectors element-wise
+ *
+ * @param a the first vectors 
+ * @param b the second vectors
+ * @return the second vector is modified and contains the element-wise sums
+ **/
 void smpi_mpi_sum_func(void *a, void *b, int *length,
                        MPI_Datatype * datatype);
 
 void smpi_mpi_sum_func(void *a, void *b, int *length, MPI_Datatype * datatype)
 {
 void smpi_mpi_sum_func(void *a, void *b, int *length,
                        MPI_Datatype * datatype);
 
 void smpi_mpi_sum_func(void *a, void *b, int *length, MPI_Datatype * datatype)
 {
-  int i;
-  if (*datatype == smpi_mpi_global->mpi_int) {
-    int *x = a, *y = b;
-    for (i = 0; i < *length; i++) {
-      y[i] = x[i] + y[i];
-    }
-  }
+         int i;
+         if (*datatype == smpi_mpi_global->mpi_byte) {
+                               char *x = a, *y = b;
+                               for (i = 0; i < *length; i++) {
+                                         y[i] = x[i] + y[i];
+                               }
+         } else {
+         if (*datatype == smpi_mpi_global->mpi_int) {
+                               int *x = a, *y = b;
+                               for (i = 0; i < *length; i++) {
+                                         y[i] = x[i] + y[i];
+                               }
+         } else {
+         if (*datatype == smpi_mpi_global->mpi_float) {
+                               float *x = a, *y = b;
+                               for (i = 0; i < *length; i++) {
+                                         y[i] = x[i] + y[i];
+                               }
+         } else {
+         if (*datatype == smpi_mpi_global->mpi_double) {
+                               double *x = a, *y = b;
+                               for (i = 0; i < *length; i++) {
+                                         y[i] = x[i] + y[i];
+                               }
+         }}}}
 }
 }
+/**
+ * compute the min of two vectors element-wise
+ **/
+void smpi_mpi_min_func(void *a, void *b, int *length, MPI_Datatype * datatype);
+
+void smpi_mpi_min_func(void *a, void *b, int *length, MPI_Datatype * datatype)
+{
+         int i;
+         if (*datatype == smpi_mpi_global->mpi_byte) {
+                               char *x = a, *y = b;
+                               for (i = 0; i < *length; i++) {
+                                         y[i] = x[i] < y[i] ? x[i] : y[i];
+                               }
+         } else {
+         if (*datatype == smpi_mpi_global->mpi_int) {
+                               int *x = a, *y = b;
+                               for (i = 0; i < *length; i++) {
+                                         y[i] = x[i] < y[i] ? x[i] : y[i];
+                               }
+         } else {
+         if (*datatype == smpi_mpi_global->mpi_float) {
+                               float *x = a, *y = b;
+                               for (i = 0; i < *length; i++) {
+                                         y[i] = x[i] < y[i] ? x[i] : y[i];
+                               }
+         } else {
+         if (*datatype == smpi_mpi_global->mpi_double) {
+                               double *x = a, *y = b;
+                               for (i = 0; i < *length; i++) {
+                                         y[i] = x[i] < y[i] ? x[i] : y[i];
+                               }
+
+         }}}}
+}
+/**
+ * compute the max of two vectors element-wise
+ **/
+void smpi_mpi_max_func(void *a, void *b, int *length, MPI_Datatype * datatype);
+
+void smpi_mpi_max_func(void *a, void *b, int *length, MPI_Datatype * datatype)
+{
+         int i;
+         if (*datatype == smpi_mpi_global->mpi_byte) {
+                               char *x = a, *y = b;
+                               for (i = 0; i < *length; i++) {
+                                         y[i] = x[i] > y[i] ? x[i] : y[i];
+                               }
+         } else {
+         if (*datatype == smpi_mpi_global->mpi_int) {
+                               int *x = a, *y = b;
+                               for (i = 0; i > *length; i++) {
+                                         y[i] = x[i] < y[i] ? x[i] : y[i];
+                               }
+         } else {
+         if (*datatype == smpi_mpi_global->mpi_float) {
+                               float *x = a, *y = b;
+                               for (i = 0; i > *length; i++) {
+                                         y[i] = x[i] < y[i] ? x[i] : y[i];
+                               }
+         } else {
+         if (*datatype == smpi_mpi_global->mpi_double) {
+                               double *x = a, *y = b;
+                               for (i = 0; i > *length; i++) {
+                                         y[i] = x[i] < y[i] ? x[i] : y[i];
+                               }
+
+         }}}}
+}
+
+
+
 
 
+/**
+ * tell the MPI rank of the calling process (from its SIMIX process id)
+ **/
 int smpi_mpi_comm_rank(smpi_mpi_communicator_t comm)
 {
   return comm->index_to_rank_map[smpi_process_index()];
 int smpi_mpi_comm_rank(smpi_mpi_communicator_t comm)
 {
   return comm->index_to_rank_map[smpi_process_index()];
index b458bce..4971ece 100644 (file)
@@ -89,6 +89,7 @@ int smpi_create_request(void *buf, int count, smpi_mpi_datatype_t datatype,
 
   smpi_mpi_request_t request = NULL;
 
 
   smpi_mpi_request_t request = NULL;
 
+           printf("in create-req():  MPI_ANY_SOURCE=%d,src=%d,comm->size=%d\n",MPI_ANY_SOURCE,src,comm->size);
   // parameter checking prob belongs in smpi_mpi, but this is less repeat code
   if (NULL == buf) {
     retval = MPI_ERR_INTERN;
   // parameter checking prob belongs in smpi_mpi, but this is less repeat code
   if (NULL == buf) {
     retval = MPI_ERR_INTERN;
@@ -99,6 +100,7 @@ int smpi_create_request(void *buf, int count, smpi_mpi_datatype_t datatype,
   } else if (MPI_ANY_SOURCE != src && (0 > src || comm->size <= src)) {
     retval = MPI_ERR_RANK;
   } else if (0 > dst || comm->size <= dst) {
   } else if (MPI_ANY_SOURCE != src && (0 > src || comm->size <= src)) {
     retval = MPI_ERR_RANK;
   } else if (0 > dst || comm->size <= dst) {
+           printf("err MPI_ERR_RANK => MPI_ANY_SOURCE=%d,src=%d,dst=%d,comm->size=%d\n",MPI_ANY_SOURCE,src,dst,comm->size);
     retval = MPI_ERR_RANK;
   } else if (MPI_ANY_TAG != tag && 0 > tag) {
     retval = MPI_ERR_TAG;
     retval = MPI_ERR_RANK;
   } else if (MPI_ANY_TAG != tag && 0 > tag) {
     retval = MPI_ERR_TAG;
@@ -121,10 +123,10 @@ int smpi_create_request(void *buf, int count, smpi_mpi_datatype_t datatype,
   return retval;
 }
 /* FIXME: understand what they do and put the prototypes in a header file (live in smpi_base.c) */
   return retval;
 }
 /* FIXME: understand what they do and put the prototypes in a header file (live in smpi_base.c) */
-void smpi_mpi_land_func(void *a, void *b, int *length,
-                        MPI_Datatype * datatype);
-void smpi_mpi_sum_func(void *a, void *b, int *length,
-                       MPI_Datatype * datatype);
+void smpi_mpi_land_func(void *a, void *b, int *length, MPI_Datatype * datatype);
+void smpi_mpi_sum_func(void *a, void *b, int *length, MPI_Datatype * datatype);
+void smpi_mpi_min_func(void *a, void *b, int *length, MPI_Datatype * datatype);
+void smpi_mpi_max_func(void *a, void *b, int *length, MPI_Datatype * datatype);
 
 void smpi_global_init()
 {
 
 void smpi_global_init()
 {
@@ -197,6 +199,10 @@ void smpi_global_init()
   smpi_mpi_global->mpi_land->func = smpi_mpi_land_func;
   smpi_mpi_global->mpi_sum = xbt_new(s_smpi_mpi_op_t, 1);
   smpi_mpi_global->mpi_sum->func = smpi_mpi_sum_func;
   smpi_mpi_global->mpi_land->func = smpi_mpi_land_func;
   smpi_mpi_global->mpi_sum = xbt_new(s_smpi_mpi_op_t, 1);
   smpi_mpi_global->mpi_sum->func = smpi_mpi_sum_func;
+  smpi_mpi_global->mpi_min = xbt_new(s_smpi_mpi_op_t, 1);
+  smpi_mpi_global->mpi_min->func = smpi_mpi_min_func;
+  smpi_mpi_global->mpi_max = xbt_new(s_smpi_mpi_op_t, 1);
+  smpi_mpi_global->mpi_min->func = smpi_mpi_max_func;
 
 }
 
 
 }
 
index 08d3eba..d19913a 100644 (file)
@@ -186,6 +186,9 @@ int SMPI_MPI_Wait(MPI_Request * request, MPI_Status * status)
   return smpi_mpi_wait(*request, status);
 }
 
   return smpi_mpi_wait(*request, status);
 }
 
+/**
+ * MPI_Bcast
+ **/
 int SMPI_MPI_Bcast(void *buf, int count, MPI_Datatype datatype, int root,
                    MPI_Comm comm)
 {
 int SMPI_MPI_Bcast(void *buf, int count, MPI_Datatype datatype, int root,
                    MPI_Comm comm)
 {
@@ -217,6 +220,73 @@ int SMPI_MPI_Bcast(void *buf, int count, MPI_Datatype datatype, int root,
   return retval;
 }
 
   return retval;
 }
 
+/**
+ * MPI_Reduce
+ **/
+
+int SMPI_MPI_Reduce( void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype, 
+                        MPI_Op op, int root, MPI_Comm comm )
+{
+  int retval = MPI_SUCCESS;
+  int rank;
+  int size;
+  int i;
+  smpi_mpi_request_t *tabrequest; 
+
+  smpi_bench_end();
+
+  rank = smpi_mpi_comm_rank(comm);
+  size = comm->size; 
+
+  tabrequest = malloc((size)*sizeof(smpi_mpi_request_t));
+  if (NULL==tabrequest) {
+       fprintf(stderr,"[smpi] %s:%d : cannot alloc memory for %d requests. Exiting.\n",__FILE__,__LINE__,size);
+       exit(1);
+  }
+
+  if (rank != root) { // if i am not root, simply send my buffer to root
+           retval = smpi_create_request(sendbuf, count, datatype, 
+                                 rank, root, 0, comm, &(tabrequest[rank]));
+           smpi_mpi_isend(tabrequest[rank]);
+           smpi_mpi_wait(tabrequest[rank], MPI_STATUS_IGNORE);
+           //printf("DEBUG: rank %d sent my sendbuf to root (rank %d)\n",rank,root);
+  } else {
+           // i am the root: wait for all buffers by creating requests
+           // i can not use: 'request->forward = size-1;' (which would progagate size-1 receive reqs)
+           // since we should op values as soon as one receiving request matches.
+           for (i=0; i<comm->size; i++) {
+                       if ( rank != i ) { // except for me
+                                 // reminder: for smpi_create_request() the src is always the process sending.
+                                 retval = smpi_create_request(recvbuf, count, datatype, MPI_ANY_SOURCE, root, 
+                                                       0, comm, &(tabrequest[i]));
+                                 if (NULL != tabrequest[i] && MPI_SUCCESS == retval) {
+                                           if (MPI_SUCCESS == retval) {
+                                                       smpi_mpi_irecv(tabrequest[i]);
+                                           }
+                                 }
+                       }
+           }
+           // now, wait for completion of all irecv's.
+           // FIXME: we should implement smpi_waill_all for a more asynchronous behavior
+           for (i=0; i<comm->size; i++) {
+                       if ( rank != i ) { // except for me
+                                 smpi_mpi_wait(tabrequest[i], MPI_STATUS_IGNORE);
+
+                                 // FIXME: the core part is here. To be written ...
+                                 fprintf(stderr,"[smpi] %s:%d : MPI_Reduce *Not yet implemented*.\n",__FILE__,__LINE__);
+
+                       }
+
+           }
+  }
+  for (i=0; i<comm->size; i++) 
+           xbt_mallocator_release(smpi_global->request_mallocator, tabrequest[i]);
+
+  smpi_bench_begin();
+
+  return retval;
+}
+
 // used by comm_split to sort ranks based on key values
 int smpi_compare_rankkeys(const void *a, const void *b);
 int smpi_compare_rankkeys(const void *a, const void *b)
 // used by comm_split to sort ranks based on key values
 int smpi_compare_rankkeys(const void *a, const void *b);
 int smpi_compare_rankkeys(const void *a, const void *b)