From 7ff29e8672c30f460d929a39dfda8e912b8da6e7 Mon Sep 17 00:00:00 2001
From: genaud
Date: Sat, 27 Jun 2009 15:28:46 +0000
Subject: [PATCH] some preliminary additions to implement more collectives

git-svn-id: svn+ssh://scm.gforge.inria.fr/svn/simgrid/simgrid/trunk@6384 48e7efb5-ca39-0410-a469-dd3cf9ba447f
---
 include/smpi/smpi.h      |  13 ++++-
 src/smpi/sample/reduce.c |  27 +++++++++
 src/smpi/smpi_base.c     | 116 ++++++++++++++++++++++++++++++++++++---
 src/smpi/smpi_global.c   |  14 +++--
 src/smpi/smpi_mpi.c      |  70 +++++++++++++++++++++++
 5 files changed, 228 insertions(+), 12 deletions(-)
 create mode 100644 src/smpi/sample/reduce.c

diff --git a/include/smpi/smpi.h b/include/smpi/smpi.h
index bf5df23687..ce5fea7844 100644
--- a/include/smpi/smpi.h
+++ b/include/smpi/smpi.h
@@ -21,6 +21,7 @@ SG_BEGIN_DECL()
 #define MPI_ERR_COUNT 6
 #define MPI_ERR_RANK 7
 #define MPI_ERR_TAG 8
+
 // MPI_Comm
 typedef struct smpi_mpi_communicator_t *smpi_mpi_communicator_t;
 typedef smpi_mpi_communicator_t MPI_Comm;
@@ -53,10 +54,13 @@ SG_BEGIN_DECL()
 
   smpi_mpi_datatype_t mpi_byte;
   smpi_mpi_datatype_t mpi_int;
+  smpi_mpi_datatype_t mpi_float;
   smpi_mpi_datatype_t mpi_double;
 
   smpi_mpi_op_t mpi_land;
   smpi_mpi_op_t mpi_sum;
+  smpi_mpi_op_t mpi_min;
+  smpi_mpi_op_t mpi_max;
 } s_smpi_mpi_global_t;
 typedef struct smpi_mpi_global_t *smpi_mpi_global_t;
@@ -68,11 +72,14 @@ SG_BEGIN_DECL()
 #define MPI_STATUS_IGNORE NULL
 
 #define MPI_BYTE (smpi_mpi_global->mpi_byte)
-#define MPI_DOUBLE (smpi_mpi_global->mpi_double)
 #define MPI_INT (smpi_mpi_global->mpi_int)
+#define MPI_FLOAT (smpi_mpi_global->mpi_float)
+#define MPI_DOUBLE (smpi_mpi_global->mpi_double)
 
 #define MPI_LAND (smpi_mpi_global->mpi_land)
 #define MPI_SUM (smpi_mpi_global->mpi_sum)
+#define MPI_MIN (smpi_mpi_global->mpi_min)
+#define MPI_MAX (smpi_mpi_global->mpi_max)
 
 // MPI macros
 #define MPI_Init(a, b) SMPI_MPI_Init(a, b)
@@ -90,6 +97,7 @@ SG_BEGIN_DECL()
 #define MPI_Wait(a, b) SMPI_MPI_Wait(a, b)
 #define MPI_Comm_split(a, b, c, d) SMPI_MPI_Comm_split(a, b, c, d)
 #define MPI_Wtime() SMPI_MPI_Wtime()
+#define MPI_Reduce(a, b, c, d, e, f, g) SMPI_MPI_Reduce(a, b, c, d, e, f, g)
 
 // SMPI Functions
 XBT_PUBLIC(int) SMPI_MPI_Init(int *argc, char ***argv);
@@ -117,6 +125,9 @@ XBT_PUBLIC(int) SMPI_MPI_Comm_split(MPI_Comm comm, int color, int key,
                                     MPI_Comm * comm_out);
 XBT_PUBLIC(double) SMPI_MPI_Wtime(void);
 
+XBT_PUBLIC(int) SMPI_MPI_Reduce(void *sendbuf, void *recvbuf, int count,
+                                MPI_Datatype datatype, MPI_Op op, int root, MPI_Comm comm);
+
 // smpi functions
 XBT_IMPORT_NO_EXPORT(int) smpi_simulated_main(int argc, char **argv);
 XBT_PUBLIC(unsigned int) smpi_sleep(unsigned int);
diff --git a/src/smpi/sample/reduce.c b/src/smpi/sample/reduce.c
new file mode 100644
index 0000000000..b71c1a6934
--- /dev/null
+++ b/src/smpi/sample/reduce.c
@@ -0,0 +1,27 @@
+#include <stdio.h>
+#include <mpi.h>
+
+int main(int argc, char **argv) {
+  int size, rank;
+  int root = 0;
+  int value = 1;
+  int sum = -99;
+
+  double start_timer;
+
+
+  MPI_Init(&argc, &argv);
+  MPI_Comm_size(MPI_COMM_WORLD, &size);
+  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
+
+  start_timer = MPI_Wtime();
+
+  printf("rank %d has value %d\n", rank, value);
+  MPI_Reduce(&value, &sum, 1, MPI_INT, MPI_SUM, root, MPI_COMM_WORLD);
+  if (rank == root) {
+    printf("On root: sum=%d\n", sum);
+    printf("Elapsed time=%lf s\n", MPI_Wtime() - start_timer);
+  }
+  MPI_Finalize();
+  return 0;
+}
diff --git a/src/smpi/smpi_base.c b/src/smpi/smpi_base.c
index 1d619ba9de..6dbf4ff887 100644
--- a/src/smpi/smpi_base.c
+++ b/src/smpi/smpi_base.c
@@ -13,6 +13,9 @@
 XBT_LOG_EXTERNAL_CATEGORY(smpi_util);
 
 smpi_mpi_global_t smpi_mpi_global = NULL;
 
+/**
+ * MPI_Op operations implemented so far: land, sum, min, max
+ **/
 void smpi_mpi_land_func(void *a, void *b, int *length,
                         MPI_Datatype * datatype);
 
@@ -28,20 +31,119 @@ void smpi_mpi_land_func(void *a, void *b, int *length,
   }
 }
 
+/**
+ * sum two vectors element-wise
+ *
+ * @param a the first vector
+ * @param b the second vector
+ * @return the second vector is modified in place and contains the element-wise sums
+ **/
 void smpi_mpi_sum_func(void *a, void *b, int *length,
                        MPI_Datatype * datatype);
 
 void smpi_mpi_sum_func(void *a, void *b, int *length, MPI_Datatype * datatype)
 {
-  int i;
-  if (*datatype == smpi_mpi_global->mpi_int) {
-    int *x = a, *y = b;
-    for (i = 0; i < *length; i++) {
-      y[i] = x[i] + y[i];
-    }
-  }
+  int i;
+  if (*datatype == smpi_mpi_global->mpi_byte) {
+    char *x = a, *y = b;
+    for (i = 0; i < *length; i++) {
+      y[i] = x[i] + y[i];
+    }
+  } else if (*datatype == smpi_mpi_global->mpi_int) {
+    int *x = a, *y = b;
+    for (i = 0; i < *length; i++) {
+      y[i] = x[i] + y[i];
+    }
+  } else if (*datatype == smpi_mpi_global->mpi_float) {
+    float *x = a, *y = b;
+    for (i = 0; i < *length; i++) {
+      y[i] = x[i] + y[i];
+    }
+  } else if (*datatype == smpi_mpi_global->mpi_double) {
+    double *x = a, *y = b;
+    for (i = 0; i < *length; i++) {
+      y[i] = x[i] + y[i];
+    }
+  }
 }
+
+/**
+ * compute the min of two vectors element-wise
+ **/
+void smpi_mpi_min_func(void *a, void *b, int *length, MPI_Datatype * datatype);
+
+void smpi_mpi_min_func(void *a, void *b, int *length, MPI_Datatype * datatype)
+{
+  int i;
+  if (*datatype == smpi_mpi_global->mpi_byte) {
+    char *x = a, *y = b;
+    for (i = 0; i < *length; i++) {
+      y[i] = x[i] < y[i] ? x[i] : y[i];
+    }
+  } else if (*datatype == smpi_mpi_global->mpi_int) {
+    int *x = a, *y = b;
+    for (i = 0; i < *length; i++) {
+      y[i] = x[i] < y[i] ? x[i] : y[i];
+    }
+  } else if (*datatype == smpi_mpi_global->mpi_float) {
+    float *x = a, *y = b;
+    for (i = 0; i < *length; i++) {
+      y[i] = x[i] < y[i] ? x[i] : y[i];
+    }
+  } else if (*datatype == smpi_mpi_global->mpi_double) {
+    double *x = a, *y = b;
+    for (i = 0; i < *length; i++) {
+      y[i] = x[i] < y[i] ? x[i] : y[i];
+    }
+  }
+}
+
+/**
+ * compute the max of two vectors element-wise
+ **/
+void smpi_mpi_max_func(void *a, void *b, int *length, MPI_Datatype * datatype);
+
+void smpi_mpi_max_func(void *a, void *b, int *length, MPI_Datatype * datatype)
+{
+  int i;
+  if (*datatype == smpi_mpi_global->mpi_byte) {
+    char *x = a, *y = b;
+    for (i = 0; i < *length; i++) {
+      y[i] = x[i] > y[i] ? x[i] : y[i];
+    }
+  } else if (*datatype == smpi_mpi_global->mpi_int) {
+    int *x = a, *y = b;
+    for (i = 0; i < *length; i++) {
+      y[i] = x[i] > y[i] ? x[i] : y[i];
+    }
+  } else if (*datatype == smpi_mpi_global->mpi_float) {
+    float *x = a, *y = b;
+    for (i = 0; i < *length; i++) {
+      y[i] = x[i] > y[i] ? x[i] : y[i];
+    }
+  } else if (*datatype == smpi_mpi_global->mpi_double) {
+    double *x = a, *y = b;
+    for (i = 0; i < *length; i++) {
+      y[i] = x[i] > y[i] ? x[i] : y[i];
+    }
+  }
+}
+
+/**
+ * return the MPI rank of the calling process (from its SIMIX process id)
+ **/
 int smpi_mpi_comm_rank(smpi_mpi_communicator_t comm)
 {
   return comm->index_to_rank_map[smpi_process_index()];
diff --git a/src/smpi/smpi_global.c b/src/smpi/smpi_global.c
index b458bce363..4971ece6d8 100644
--- a/src/smpi/smpi_global.c
+++ b/src/smpi/smpi_global.c
@@ -89,6 +89,7 @@ int smpi_create_request(void *buf, int count, smpi_mpi_datatype_t datatype,
   smpi_mpi_request_t request = NULL;
 
+  printf("in create-req(): MPI_ANY_SOURCE=%d,src=%d,comm->size=%d\n",MPI_ANY_SOURCE,src,comm->size);
   // parameter checking prob belongs in smpi_mpi, but this is less repeat code
   if (NULL == buf) {
     retval = MPI_ERR_INTERN;
@@ -99,6 +100,7 @@ int smpi_create_request(void *buf, int count, smpi_mpi_datatype_t datatype,
   } else if (MPI_ANY_SOURCE != src && (0 > src || comm->size <= src)) {
     retval = MPI_ERR_RANK;
   } else if (0 > dst || comm->size <= dst) {
+    printf("err MPI_ERR_RANK => MPI_ANY_SOURCE=%d,src=%d,dst=%d,comm->size=%d\n",MPI_ANY_SOURCE,src,dst,comm->size);
     retval = MPI_ERR_RANK;
   } else if (MPI_ANY_TAG != tag && 0 > tag) {
     retval = MPI_ERR_TAG;
@@ -121,10 +123,10 @@ int smpi_create_request(void *buf, int count, smpi_mpi_datatype_t datatype,
   return retval;
 }
 /* FIXME: understand what they do and put the prototypes in a header file (live in smpi_base.c) */
-void smpi_mpi_land_func(void *a, void *b, int *length,
-                        MPI_Datatype * datatype);
-void smpi_mpi_sum_func(void *a, void *b, int *length,
-                       MPI_Datatype * datatype);
+void smpi_mpi_land_func(void *a, void *b, int *length, MPI_Datatype * datatype);
+void smpi_mpi_sum_func(void *a, void *b, int *length, MPI_Datatype * datatype);
+void smpi_mpi_min_func(void *a, void *b, int *length, MPI_Datatype * datatype);
+void smpi_mpi_max_func(void *a, void *b, int *length, MPI_Datatype * datatype);
 
 void smpi_global_init()
 {
@@ -197,6 +199,10 @@ void smpi_global_init()
   smpi_mpi_global->mpi_land->func = smpi_mpi_land_func;
   smpi_mpi_global->mpi_sum = xbt_new(s_smpi_mpi_op_t, 1);
   smpi_mpi_global->mpi_sum->func = smpi_mpi_sum_func;
+  smpi_mpi_global->mpi_min = xbt_new(s_smpi_mpi_op_t, 1);
+  smpi_mpi_global->mpi_min->func = smpi_mpi_min_func;
+  smpi_mpi_global->mpi_max = xbt_new(s_smpi_mpi_op_t, 1);
+  smpi_mpi_global->mpi_max->func = smpi_mpi_max_func;
 }
 
diff --git a/src/smpi/smpi_mpi.c b/src/smpi/smpi_mpi.c
index 08d3eba5f4..d19913a062 100644
--- a/src/smpi/smpi_mpi.c
+++ b/src/smpi/smpi_mpi.c
@@ -186,6 +186,9 @@ int SMPI_MPI_Wait(MPI_Request * request, MPI_Status * status)
   return smpi_mpi_wait(*request, status);
 }
 
+/**
+ * MPI_Bcast
+ **/
 int SMPI_MPI_Bcast(void *buf, int count, MPI_Datatype datatype, int root,
                    MPI_Comm comm)
 {
@@ -217,6 +220,73 @@ int SMPI_MPI_Bcast(void *buf, int count, MPI_Datatype datatype, int root,
   return retval;
 }
 
+/**
+ * MPI_Reduce (preliminary version: the actual reduction of values is still a FIXME below)
+ **/
+int SMPI_MPI_Reduce(void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype,
+                    MPI_Op op, int root, MPI_Comm comm)
+{
+  int retval = MPI_SUCCESS;
+  int rank;
+  int size;
+  int i;
+  smpi_mpi_request_t *tabrequest;
+
+  smpi_bench_end();
+
+  rank = smpi_mpi_comm_rank(comm);
+  size = comm->size;
+
+  // one request slot per rank; calloc keeps unused slots NULL so they can be skipped on release
+  tabrequest = calloc(size, sizeof(smpi_mpi_request_t));
+  if (NULL == tabrequest) {
+    fprintf(stderr, "[smpi] %s:%d : cannot alloc memory for %d requests. Exiting.\n", __FILE__, __LINE__, size);
Exiting.\n",__FILE__,__LINE__,size); + exit(1); + } + + if (rank != root) { // if i am not root, simply send my buffer to root + retval = smpi_create_request(sendbuf, count, datatype, + rank, root, 0, comm, &(tabrequest[rank])); + smpi_mpi_isend(tabrequest[rank]); + smpi_mpi_wait(tabrequest[rank], MPI_STATUS_IGNORE); + //printf("DEBUG: rank %d sent my sendbuf to root (rank %d)\n",rank,root); + } else { + // i am the root: wait for all buffers by creating requests + // i can not use: 'request->forward = size-1;' (which would progagate size-1 receive reqs) + // since we should op values as soon as one receiving request matches. + for (i=0; isize; i++) { + if ( rank != i ) { // except for me + // reminder: for smpi_create_request() the src is always the process sending. + retval = smpi_create_request(recvbuf, count, datatype, MPI_ANY_SOURCE, root, + 0, comm, &(tabrequest[i])); + if (NULL != tabrequest[i] && MPI_SUCCESS == retval) { + if (MPI_SUCCESS == retval) { + smpi_mpi_irecv(tabrequest[i]); + } + } + } + } + // now, wait for completion of all irecv's. + // FIXME: we should implement smpi_waill_all for a more asynchronous behavior + for (i=0; isize; i++) { + if ( rank != i ) { // except for me + smpi_mpi_wait(tabrequest[i], MPI_STATUS_IGNORE); + + // FIXME: the core part is here. To be written ... + fprintf(stderr,"[smpi] %s:%d : MPI_Reduce *Not yet implemented*.\n",__FILE__,__LINE__); + + } + + } + } + for (i=0; isize; i++) + xbt_mallocator_release(smpi_global->request_mallocator, tabrequest[i]); + + smpi_bench_begin(); + + return retval; +} + // used by comm_split to sort ranks based on key values int smpi_compare_rankkeys(const void *a, const void *b); int smpi_compare_rankkeys(const void *a, const void *b) -- 2.20.1