From 70ed180f3ce3495678d048e9e396ef5eb65a6a99 Mon Sep 17 00:00:00 2001 From: pini Date: Mon, 8 Feb 2010 14:37:10 +0000 Subject: [PATCH] SMPI over SIMIX_network in a two days rush. git-svn-id: svn+ssh://scm.gforge.inria.fr/svn/simgrid/simgrid/trunk@7063 48e7efb5-ca39-0410-a469-dd3cf9ba447f --- examples/smpi/bcast.tesh | 22 +- examples/smpi/compute2.c | 2 +- examples/smpi/compute3.c | 4 +- examples/smpi/split.c | 7 +- include/smpi/mpi.h | 2 + include/smpi/smpi.h | 370 ++++---- src/smpi/private.h | 224 ++--- src/smpi/smpi_base.c | 792 +++++++++------- src/smpi/smpi_bench.c | 82 +- src/smpi/smpi_coll.c | 991 +++++--------------- src/smpi/smpi_coll_private.h | 20 - src/smpi/smpi_comm.c | 34 + src/smpi/smpi_global.c | 410 +++------ src/smpi/smpi_group.c | 109 +++ src/smpi/smpi_mpi.c | 1560 ++++++++++++++++---------------- src/smpi/smpi_mpi_dt.c | 415 +++++++-- src/smpi/smpi_mpi_dt_private.h | 14 - src/smpi/smpi_receiver.c | 102 --- src/smpi/smpi_sender.c | 110 --- src/smpi/smpi_util.c | 27 +- src/smpi/smpicc.in | 1 + 21 files changed, 2409 insertions(+), 2889 deletions(-) create mode 100644 src/smpi/smpi_comm.c create mode 100644 src/smpi/smpi_group.c delete mode 100644 src/smpi/smpi_receiver.c delete mode 100644 src/smpi/smpi_sender.c diff --git a/examples/smpi/bcast.tesh b/examples/smpi/bcast.tesh index 74f4da56c7..4a5ebe4d5b 100644 --- a/examples/smpi/bcast.tesh +++ b/examples/smpi/bcast.tesh @@ -12,8 +12,8 @@ $ ../../src/smpi/smpirun -map -hostfile ${srcdir:=.}/hostfile -platform ${srcdir > node 1 has value 3 before broadcast > node 2 has value 3 before broadcast > node 1 has value 17 after broadcast -> node 0 has value 17 after broadcast > node 2 has value 17 after broadcast +> node 0 has value 17 after broadcast @@ -33,12 +33,12 @@ $ ../../src/smpi/smpirun -map -hostfile ${srcdir:=.}/hostfile -platform ${srcdir > node 3 has value 3 before broadcast > node 4 has value 3 before broadcast > node 5 has value 3 before broadcast -> node 0 has value 17 after broadcast > node 3 has value 17 after broadcast +> node 4 has value 17 after broadcast > node 2 has value 17 after broadcast +> node 0 has value 17 after broadcast > node 5 has value 17 after broadcast > node 1 has value 17 after broadcast -> node 4 has value 17 after broadcast # Another test @@ -69,15 +69,15 @@ $ ../../src/smpi/smpirun -map -hostfile ${srcdir:=.}/hostfile -platform ${srcdir > node 9 has value 3 before broadcast > node 10 has value 3 before broadcast > node 11 has value 3 before broadcast -> node 0 has value 17 after broadcast -> node 7 has value 17 after broadcast > node 3 has value 17 after broadcast -> node 8 has value 17 after broadcast -> node 5 has value 17 after broadcast -> node 11 has value 17 after broadcast -> node 1 has value 17 after broadcast -> node 9 has value 17 after broadcast -> node 2 has value 17 after broadcast > node 6 has value 17 after broadcast > node 4 has value 17 after broadcast +> node 0 has value 17 after broadcast +> node 9 has value 17 after broadcast +> node 5 has value 17 after broadcast > node 10 has value 17 after broadcast +> node 8 has value 17 after broadcast +> node 7 has value 17 after broadcast +> node 1 has value 17 after broadcast +> node 11 has value 17 after broadcast +> node 2 has value 17 after broadcast diff --git a/examples/smpi/compute2.c b/examples/smpi/compute2.c index 676944f99d..ea524408a4 100644 --- a/examples/smpi/compute2.c +++ b/examples/smpi/compute2.c @@ -7,7 +7,7 @@ int main(int argc, char *argv[]) double d; MPI_Init(&argc, &argv); d = 2.0; - SMPI_DO_ONCE { +/* 
SMPI_DO_ONCE */{ for (i = 0; i < atoi(argv[1]); i++) { if (d < 10000) { d = d * d; diff --git a/examples/smpi/compute3.c b/examples/smpi/compute3.c index c31fb23f36..d25a531b10 100644 --- a/examples/smpi/compute3.c +++ b/examples/smpi/compute3.c @@ -7,7 +7,7 @@ int main(int argc, char *argv[]) double d; MPI_Init(&argc, &argv); d = 2.0; - SMPI_DO_ONCE { +/* SMPI_DO_ONCE */{ for (i = 0; i < atoi(argv[1]); i++) { if (d < 10000) { d = d * d; @@ -17,7 +17,7 @@ int main(int argc, char *argv[]) } printf("%d %f\n", i, d); } - SMPI_DO_ONCE { +/* SMPI_DO_ONCE */{ for (i = 0; i < 2 * atoi(argv[1]); i++) { if (d < 10000) { d = d * d; diff --git a/examples/smpi/split.c b/examples/smpi/split.c index f451b877af..a87a4e18e4 100644 --- a/examples/smpi/split.c +++ b/examples/smpi/split.c @@ -7,9 +7,10 @@ int main(int argc, char *argv[]) MPI_Comm localcomm; MPI_Init(&argc, &argv); MPI_Comm_rank(MPI_COMM_WORLD, &worldrank); - MPI_Comm_split(MPI_COMM_WORLD, worldrank % 2, worldrank, &localcomm); - MPI_Comm_rank(localcomm, &localrank); - printf("node with world rank %d has local rank %d\n", worldrank, localrank); + printf("MPI_Comm_split is not implemented\n"); + //MPI_Comm_split(MPI_COMM_WORLD, worldrank % 2, worldrank, &localcomm); + //MPI_Comm_rank(localcomm, &localrank); + //printf("node with world rank %d has local rank %d\n", worldrank, localrank); MPI_Finalize(); return 0; } diff --git a/include/smpi/mpi.h b/include/smpi/mpi.h index c32b8def26..e177954747 100644 --- a/include/smpi/mpi.h +++ b/include/smpi/mpi.h @@ -8,8 +8,10 @@ #include #include +/* #define sleep(x) smpi_sleep(x) #define gettimeofday(x, y) smpi_gettimeofday(x, y) #define main(x, y) smpi_simulated_main(x, y) +*/ #endif diff --git a/include/smpi/smpi.h b/include/smpi/smpi.h index a3ad1369ef..016e9ececd 100644 --- a/include/smpi/smpi.h +++ b/include/smpi/smpi.h @@ -7,201 +7,211 @@ #include SG_BEGIN_DECL() +#define MPI_THREAD_SINGLE 0 +#define MPI_THREAD_FUNNELED 1 +#define MPI_THREAD_SERIALIZED 2 +#define MPI_THREAD_MULTIPLE 3 + +//FIXME: check values +#define MPI_MAX_PROCESSOR_NAME 100 +#define MPI_MAX_ERROR_STRING 100 +#define MPI_MAX_DATAREP_STRIN 100 +#define MPI_MAX_INFO_KEY 100 +#define MPI_MAX_INFO_VAL 100 +#define MPI_MAX_OBJECT_NAME 100 +#define MPI_MAX_PORT_NAME 100 + #define SMPI_RAND_SEED 5 #define MPI_ANY_SOURCE -1 #define MPI_ANY_TAG -1 #define MPI_UNDEFINED -1 + // errorcodes -#define MPI_SUCCESS 0 -#define MPI_ERR_COMM 1 -#define MPI_ERR_ARG 2 -#define MPI_ERR_TYPE 3 -#define MPI_ERR_REQUEST 4 -#define MPI_ERR_INTERN 5 -#define MPI_ERR_COUNT 6 -#define MPI_ERR_RANK 7 -#define MPI_ERR_TAG 8 -#define MPI_ERR_TRUNCATE 9 -// MPI_Comm - typedef struct smpi_mpi_communicator_t *smpi_mpi_communicator_t; - typedef smpi_mpi_communicator_t MPI_Comm; - -// MPI_Datatype - typedef struct smpi_mpi_datatype_t *smpi_mpi_datatype_t; - typedef smpi_mpi_datatype_t MPI_Datatype; - -// MPI_Request - typedef struct smpi_mpi_request_t *smpi_mpi_request_t; - typedef smpi_mpi_request_t MPI_Request; - -// MPI_Op - typedef struct smpi_mpi_op_t *smpi_mpi_op_t; - typedef smpi_mpi_op_t MPI_Op; - -// MPI_Status - struct smpi_mpi_status_t { - int MPI_SOURCE; - int MPI_TAG; - int MPI_ERROR; - }; - typedef struct smpi_mpi_status_t smpi_mpi_status_t; - typedef smpi_mpi_status_t MPI_Status; - -// global SMPI data structure - typedef struct smpi_mpi_global_t { - - smpi_mpi_communicator_t mpi_comm_world; - - smpi_mpi_datatype_t mpi_byte; - smpi_mpi_datatype_t mpi_char; - smpi_mpi_datatype_t mpi_int; - smpi_mpi_datatype_t mpi_float; - smpi_mpi_datatype_t 
mpi_double; - - smpi_mpi_op_t mpi_land; - smpi_mpi_op_t mpi_sum; - smpi_mpi_op_t mpi_prod; - smpi_mpi_op_t mpi_min; - smpi_mpi_op_t mpi_max; - - } s_smpi_mpi_global_t; - typedef struct smpi_mpi_global_t *smpi_mpi_global_t; - extern smpi_mpi_global_t smpi_mpi_global; - -#define MPI_COMM_WORLD (smpi_mpi_global->mpi_comm_world) -#define MPI_COMM_NULL NULL +#define MPI_SUCCESS 0 +#define MPI_ERR_COMM 1 +#define MPI_ERR_ARG 2 +#define MPI_ERR_TYPE 3 +#define MPI_ERR_REQUEST 4 +#define MPI_ERR_INTERN 5 +#define MPI_ERR_COUNT 6 +#define MPI_ERR_RANK 7 +#define MPI_ERR_TAG 8 +#define MPI_ERR_TRUNCATE 9 +#define MPI_ERR_GROUP 10 +#define MPI_ERR_OP 11 + +#define MPI_IDENT 0 +#define MPI_SIMILAR 1 +#define MPI_UNEQUAL 2 +#define MPI_CONGRUENT 3 + +typedef ptrdiff_t MPI_Aint; +typedef long long MPI_Offset; + +struct s_smpi_mpi_datatype; +typedef struct s_smpi_mpi_datatype* MPI_Datatype; + +typedef struct { + int MPI_SOURCE; + int MPI_TAG; + int MPI_ERROR; +} MPI_Status; #define MPI_STATUS_IGNORE NULL -#define MPI_Aint ptrdiff_t - -#define MPI_BYTE (smpi_mpi_global->mpi_byte) -#define MPI_CHAR (smpi_mpi_global->mpi_char) -#define MPI_INT (smpi_mpi_global->mpi_int) -#define MPI_FLOAT (smpi_mpi_global->mpi_float) -#define MPI_DOUBLE (smpi_mpi_global->mpi_double) - -#define MPI_LAND (smpi_mpi_global->mpi_land) -#define MPI_SUM (smpi_mpi_global->mpi_sum) -#define MPI_PROD (smpi_mpi_global->mpi_prod) -#define MPI_MIN (smpi_mpi_global->mpi_min) -#define MPI_MAX (smpi_mpi_global->mpi_max) - -// MPI macros -#define MPI_Init(a, b) SMPI_MPI_Init(a, b) -#define MPI_Finalize() SMPI_MPI_Finalize() -#define MPI_Abort(a, b) SMPI_MPI_Abort(a, b) -#define MPI_Comm_size(a, b) SMPI_MPI_Comm_size(a, b) -#define MPI_Comm_split(a, b, c, d) SMPI_MPI_Comm_split(a, b, c, d) -#define MPI_Comm_rank(a, b) SMPI_MPI_Comm_rank(a, b) -#define MPI_Type_size(a, b) SMPI_MPI_Type_size(a, b) -#define MPI_Type_get_extent(a, b, c) SMPI_MPI_Type_get_extent(a, b, c) -#define MPI_Type_lb(a, b) SMPI_MPI_Type_lb(a, b) -#define MPI_Type_ub(a, b) SMPI_MPI_Type_ub(a, b) - -#define MPI_Barrier(a) SMPI_MPI_Barrier(a) -#define MPI_Irecv(a, b, c, d, e, f, g) SMPI_MPI_Irecv(a, b, c, d, e, f, g) -#define MPI_Recv(a, b, c, d, e, f, g) SMPI_MPI_Recv(a, b, c, d, e, f, g) -#define MPI_Isend(a, b, c, d, e, f, g) SMPI_MPI_Isend(a, b, c, d, e, f, g) -#define MPI_Send(a, b, c, d, e, f) SMPI_MPI_Send(a, b, c, d, e, f) -#define MPI_Sendrecv( a, b, c, d, e, f, g, h, i, j, k, l) SMPI_MPI_Sendrecv(a, b, c, d, e, f, g, h, i, j, k, l) -#define MPI_Bcast(a, b, c, d, e) SMPI_MPI_Bcast(a, b, c, d, e) -#define MPI_Wait(a, b) SMPI_MPI_Wait(a, b) -#define MPI_Waitall(a, b, c) SMPI_MPI_Waitall(a, b, c) -#define MPI_Waitany(a, b, c, d) SMPI_MPI_Waitany(a, b, c, d) -#define MPI_Wtime() SMPI_MPI_Wtime() -#define MPI_Reduce(a, b, c, d, e, f, g) SMPI_MPI_Reduce(a, b, c, d, e, f, g) -#define MPI_Allreduce(a, b, c, d, e, f) SMPI_MPI_Allreduce(a, b, c, d, e, f) -#define MPI_Scatter(a, b, c, d, e, f, g, h ) SMPI_MPI_Scatter(a, b, c, d, e, f, g, h) -#define MPI_Alltoall(a, b, c, d, e, f, g ) SMPI_MPI_Alltoall(a, b, c, d, e, f, g) -#define MPI_Alltoallv(a, b, c, d, e, f, g, h, i) SMPI_MPI_Alltoallv(a, b, c, d, e, f, g, h, i) -#define MPI_Gather(a, b, c, d, e, f, g, h) SMPI_MPI_Gather(a, b, c, d, e, f, g, h) -#define MPI_Gatherv(a, b, c, d, e, f, g, h, i) SMPI_MPI_Gatherv(a, b, c, d, e, f, g, h, i) -#define MPI_Scatterv(a, b, c, d, e, f, g, h, i) SMPI_MPI_Scatterv(a, b, c, d, e, f, g, h, i) -#define MPI_Reduce_scatter(a, b, c, d, e, f) SMPI_MPI_Reduce_scatter(a, b, c, d, e, f) -#define 
MPI_Allgather(a, b, c, d, e, f, g) SMPI_MPI_Allgather(a, b, c, d, e, f, g) -#define MPI_Allgatherv(a, b, c, d, e, f, g, h) SMPI_MPI_Allgatherv(a, b, c, d, e, f, g, h) - -// SMPI Functions -XBT_PUBLIC(int) SMPI_MPI_Init(int *argc, char ***argv); -XBT_PUBLIC(int) SMPI_MPI_Finalize(void); -XBT_PUBLIC(int) SMPI_MPI_Abort(MPI_Comm comm, int errorcode); -XBT_PUBLIC(int) SMPI_MPI_Comm_size(MPI_Comm comm, int *size); -XBT_PUBLIC(int) SMPI_MPI_Comm_rank(MPI_Comm comm, int *rank); -XBT_PUBLIC(int) SMPI_MPI_Type_size(MPI_Datatype datatype, size_t * size); -XBT_PUBLIC(int) SMPI_MPI_Type_get_extent(MPI_Datatype datatype, MPI_Aint* lb, MPI_Aint *extent); -XBT_PUBLIC(int) SMPI_MPI_Type_lb(MPI_Datatype datatype, MPI_Aint* disp); -XBT_PUBLIC(int) SMPI_MPI_Type_ub(MPI_Datatype datatype, MPI_Aint* disp); - - -XBT_PUBLIC(int) SMPI_MPI_Barrier(MPI_Comm comm); -XBT_PUBLIC(int) SMPI_MPI_Irecv(void *buf, int count, MPI_Datatype datatype, - int src, int tag, MPI_Comm comm, - MPI_Request * request); -XBT_PUBLIC(int) SMPI_MPI_Recv(void *buf, int count, MPI_Datatype datatype, - int src, int tag, MPI_Comm comm, - MPI_Status * status); -XBT_PUBLIC(int) SMPI_MPI_Isend(void *buf, int count, MPI_Datatype datatype, - int dst, int tag, MPI_Comm comm, - MPI_Request * request); -XBT_PUBLIC(int) SMPI_MPI_Send(void *buf, int count, MPI_Datatype datatype, - int dst, int tag, MPI_Comm comm); - -XBT_PUBLIC(int) SMPI_MPI_Sendrecv(void *sendbuf, int sendcount, MPI_Datatype sendtype, int dest, int sendtag, - void *recvbuf, int recvcount, MPI_Datatype recvtype, int source, int recvtag, - MPI_Comm comm, MPI_Status *status); - -XBT_PUBLIC(int) SMPI_MPI_Bcast(void *buf, int count, MPI_Datatype datatype, - int root, MPI_Comm comm); -XBT_PUBLIC(int) SMPI_MPI_Wait(MPI_Request * request, MPI_Status * status); -XBT_PUBLIC(int) SMPI_MPI_Waitall(int count, MPI_Request requests[], - MPI_Status status[]); -XBT_PUBLIC(int) SMPI_MPI_Waitany(int count, MPI_Request requests[], - int *index, MPI_Status status[]); -XBT_PUBLIC(int) SMPI_MPI_Comm_split(MPI_Comm comm, int color, int key, - MPI_Comm * comm_out); -XBT_PUBLIC(double) SMPI_MPI_Wtime(void); - -XBT_PUBLIC(int) SMPI_MPI_Reduce(void *sendbuf, void *recvbuf, int count, - MPI_Datatype datatype, MPI_Op op, int root, - MPI_Comm comm); -XBT_PUBLIC(int) SMPI_MPI_Allreduce(void *sendbuf, void *recvbuf, int count, - MPI_Datatype datatype, MPI_Op op, MPI_Comm comm); - -XBT_PUBLIC(int) SMPI_MPI_Scatter(void *sendbuf, int sendcount, MPI_Datatype datatype, - void *recvbuf, int recvcount, MPI_Datatype recvtype,int root, MPI_Comm comm); - -XBT_PUBLIC(int) SMPI_MPI_Alltoall(void *sendbuf, int sendcount, MPI_Datatype datatype, - void *recvbuf, int recvcount, MPI_Datatype recvtype, MPI_Comm comm); -XBT_PUBLIC(int) SMPI_MPI_Alltoallv(void *sendbuf, int *scounts, int *sdisps, MPI_Datatype datatype, - void *recvbuf, int *rcounts, int *rdisps, MPI_Datatype recvtype, - MPI_Comm comm); -XBT_PUBLIC(int) SMPI_MPI_Gather(void* sendbuf, int sendcount, MPI_Datatype sendtype, - void* recvbuf, int recvcount, MPI_Datatype recvtype, - int root, MPI_Comm comm); -XBT_PUBLIC(int) SMPI_MPI_Gatherv(void* sendbuf, int sendcount, MPI_Datatype sendtype, - void* recvbuf, int* recvcounts, int* displs, MPI_Datatype recvtype, - int root, MPI_Comm comm); -XBT_PUBLIC(int) SMPI_MPI_Scatterv(void* sendbuf, int *sendcounts, int *displs, MPI_Datatype sendtype, - void* recvbuf, int recvcount, MPI_Datatype recvtype, - int root, MPI_Comm comm); -XBT_PUBLIC(int) SMPI_MPI_Reduce_scatter(void* sendbuf, void* recvbuf, int *recvcounts, - MPI_Datatype 
datatype, MPI_Op op, MPI_Comm comm); -XBT_PUBLIC(int) SMPI_MPI_Allgather(void* sendbuf, int sendcount, MPI_Datatype sendtype, - void* recvbuf, int recvcount, MPI_Datatype recvtype, MPI_Comm comm); -XBT_PUBLIC(int) SMPI_MPI_Allgatherv(void* sendbuf, int sendcount, MPI_Datatype sendtype, - void* recvbuf, int *recvcounts, int *displs, MPI_Datatype recvtype, - MPI_Comm comm); + +#define MPI_DATATYPE_NULL NULL +extern MPI_Datatype MPI_CHAR; +extern MPI_Datatype MPI_SHORT; +extern MPI_Datatype MPI_INT; +extern MPI_Datatype MPI_LONG; +extern MPI_Datatype MPI_LONG_LONG; +#define MPI_LONG_LONG_INT MPI_LONG_LONG +extern MPI_Datatype MPI_SIGNED_CHAR; +extern MPI_Datatype MPI_UNSIGNED_CHAR; +extern MPI_Datatype MPI_UNSIGNED_SHORT; +extern MPI_Datatype MPI_UNSIGNED; +extern MPI_Datatype MPI_UNSIGNED_LONG; +extern MPI_Datatype MPI_UNSIGNED_LONG_LONG; +extern MPI_Datatype MPI_FLOAT; +extern MPI_Datatype MPI_DOUBLE; +extern MPI_Datatype MPI_LONG_DOUBLE; +extern MPI_Datatype MPI_WCHAR; +extern MPI_Datatype MPI_C_BOOL; +extern MPI_Datatype MPI_INT8_T; +extern MPI_Datatype MPI_INT16_T; +extern MPI_Datatype MPI_INT32_T; +extern MPI_Datatype MPI_INT64_T; +extern MPI_Datatype MPI_UINT8_T; +#define MPI_BYTE MPI_UINT8_T +extern MPI_Datatype MPI_UINT16_T; +extern MPI_Datatype MPI_UINT32_T; +extern MPI_Datatype MPI_UINT64_T; +extern MPI_Datatype MPI_C_FLOAT_COMPLEX; +#define MPI_C_COMPLEX MPI_C_FLOAT_COMPLEX +extern MPI_Datatype MPI_C_DOUBLE_COMPLEX; +extern MPI_Datatype MPI_C_LONG_DOUBLE_COMPLEX; +extern MPI_Datatype MPI_AINT; +extern MPI_Datatype MPI_OFFSET; + +typedef void MPI_User_function(void* invec, void* inoutvec, int* len, MPI_Datatype* datatype); +struct s_smpi_mpi_op; +typedef struct s_smpi_mpi_op* MPI_Op; + +#define MPI_OP_NULL NULL +extern MPI_Op MPI_MAX; +extern MPI_Op MPI_MIN; +extern MPI_Op MPI_SUM; +extern MPI_Op MPI_PROD; +extern MPI_Op MPI_LAND; +extern MPI_Op MPI_LOR; +extern MPI_Op MPI_LXOR; +extern MPI_Op MPI_BAND; +extern MPI_Op MPI_BOR; +extern MPI_Op MPI_BXOR; + +struct s_smpi_mpi_group; +typedef struct s_smpi_mpi_group* MPI_Group; + +#define MPI_GROUP_NULL NULL +extern MPI_Group MPI_GROUP_EMPTY; + +struct s_smpi_mpi_communicator; +typedef struct s_smpi_mpi_communicator* MPI_Comm; + +#define MPI_COMM_NULL NULL +extern MPI_Comm MPI_COMM_WORLD; + +struct s_smpi_mpi_request; +typedef struct s_smpi_mpi_request* MPI_Request; + +#define MPI_REQUEST_NULL NULL + +XBT_PUBLIC(int) MPI_Init(int* argc, char*** argv); +XBT_PUBLIC(int) MPI_Finalize(void); +XBT_PUBLIC(int) MPI_Init_thread(int* argc, char*** argv, int required, int* provided); +XBT_PUBLIC(int) MPI_Query_thread(int* provided); +XBT_PUBLIC(int) MPI_Is_thread_main(int* flag); +XBT_PUBLIC(int) MPI_Abort(MPI_Comm comm, int errorcode); +XBT_PUBLIC(double) MPI_Wtime(void); + +XBT_PUBLIC(int) MPI_Type_size(MPI_Datatype datatype, size_t* size); +XBT_PUBLIC(int) MPI_Type_get_extent(MPI_Datatype datatype, MPI_Aint* lb, MPI_Aint* extent); +XBT_PUBLIC(int) MPI_Type_lb(MPI_Datatype datatype, MPI_Aint* disp); +XBT_PUBLIC(int) MPI_Type_ub(MPI_Datatype datatype, MPI_Aint* disp); + +XBT_PUBLIC(int) MPI_Op_create(MPI_User_function* function, int commute, MPI_Op* op); +XBT_PUBLIC(int) MPI_Op_free(MPI_Op* op); + +XBT_PUBLIC(int) MPI_Group_free(MPI_Group *group); +XBT_PUBLIC(int) MPI_Group_size(MPI_Group group, int* size); +XBT_PUBLIC(int) MPI_Group_rank(MPI_Group group, int* rank); +XBT_PUBLIC(int) MPI_Group_translate_ranks (MPI_Group group1, int n, int* ranks1, MPI_Group group2, int* ranks2); +XBT_PUBLIC(int) MPI_Group_compare(MPI_Group group1, MPI_Group 
group2, int* result); +XBT_PUBLIC(int) MPI_Group_union(MPI_Group group1, MPI_Group group2, MPI_Group* newgroup); +XBT_PUBLIC(int) MPI_Group_intersection(MPI_Group group1, MPI_Group group2, MPI_Group* newgroup); +XBT_PUBLIC(int) MPI_Group_difference(MPI_Group group1, MPI_Group group2, MPI_Group* newgroup); +XBT_PUBLIC(int) MPI_Group_incl(MPI_Group group, int n, int* ranks, MPI_Group* newgroup); +XBT_PUBLIC(int) MPI_Group_excl(MPI_Group group, int n, int* ranks, MPI_Group* newgroup); +XBT_PUBLIC(int) MPI_Group_range_incl(MPI_Group group, int n, int ranges[][3], MPI_Group* newgroup); +XBT_PUBLIC(int) MPI_Group_range_excl(MPI_Group group, int n, int ranges[][3], MPI_Group* newgroup); + +XBT_PUBLIC(int) MPI_Comm_rank(MPI_Comm comm, int* rank); +XBT_PUBLIC(int) MPI_Comm_size(MPI_Comm comm, int* size); +XBT_PUBLIC(int) MPI_Comm_group(MPI_Comm comm, MPI_Group* group); +XBT_PUBLIC(int) MPI_Comm_compare(MPI_Comm comm1, MPI_Comm comm2, int* result); +XBT_PUBLIC(int) MPI_Comm_dup(MPI_Comm comm, MPI_Comm* newcomm); +XBT_PUBLIC(int) MPI_Comm_create(MPI_Comm comm, MPI_Group group, MPI_Comm* newcomm); +XBT_PUBLIC(int) MPI_Comm_free(MPI_Comm* comm); + +XBT_PUBLIC(int) MPI_Irecv(void* buf, int count, MPI_Datatype datatype, int src, int tag, MPI_Comm comm, MPI_Request* request); +XBT_PUBLIC(int) MPI_Isend(void* buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm, MPI_Request* request); +XBT_PUBLIC(int) MPI_Recv(void* buf, int count, MPI_Datatype datatype, int src, int tag, MPI_Comm comm, MPI_Status* status); +XBT_PUBLIC(int) MPI_Send(void* buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm); +XBT_PUBLIC(int) MPI_Sendrecv(void* sendbuf, int sendcount, MPI_Datatype sendtype, int dst, int sendtag, void* recvbuf, int recvcount, MPI_Datatype recvtype, int src, int recvtag, MPI_Comm comm, MPI_Status* status); +XBT_PUBLIC(int) MPI_Sendrecv_replace(void* buf, int count, MPI_Datatype datatype, int dst, int sendtag, int src, int recvtag, MPI_Comm comm, MPI_Status* status); + +XBT_PUBLIC(int) MPI_Test(MPI_Request* request, int* flag, MPI_Status* status); +XBT_PUBLIC(int) MPI_Testany(int count, MPI_Request requests[], int* index, int* flag, MPI_Status* status); +XBT_PUBLIC(int) MPI_Wait(MPI_Request* request, MPI_Status* status); +XBT_PUBLIC(int) MPI_Waitany(int count, MPI_Request requests[], int* index, MPI_Status* status); +XBT_PUBLIC(int) MPI_Waitall(int count, MPI_Request requests[], MPI_Status status[]); +XBT_PUBLIC(int) MPI_Waitsome(int incount, MPI_Request requests[], int* outcount, int* indices, MPI_Status status[]); + +XBT_PUBLIC(int) MPI_Bcast(void* buf, int count, MPI_Datatype datatype, int root, MPI_Comm comm); +XBT_PUBLIC(int) MPI_Barrier(MPI_Comm comm); +XBT_PUBLIC(int) MPI_Gather(void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm); +XBT_PUBLIC(int) MPI_Gatherv(void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int* recvcounts, int* displs, MPI_Datatype recvtype, int root, MPI_Comm comm); +XBT_PUBLIC(int) MPI_Allgather(void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int recvcount, MPI_Datatype recvtype, MPI_Comm comm); +XBT_PUBLIC(int) MPI_Allgatherv(void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int* recvcounts, int* displs, MPI_Datatype recvtype, MPI_Comm comm); +XBT_PUBLIC(int) MPI_Scatter(void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm); 
+XBT_PUBLIC(int) MPI_Scatterv(void* sendbuf, int* sendcounts, int* displs, MPI_Datatype sendtype, void* recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm); +XBT_PUBLIC(int) MPI_Reduce(void* sendbuf, void* recvbuf, int count, MPI_Datatype datatype, MPI_Op op, int root, MPI_Comm comm); +XBT_PUBLIC(int) MPI_Allreduce(void* sendbuf, void* recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm); +XBT_PUBLIC(int) MPI_Reduce_scatter(void* sendbuf, void* recvbuf, int* recvcounts, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm); +XBT_PUBLIC(int) MPI_Alltoall(void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int recvcount, MPI_Datatype recvtype, MPI_Comm comm); +XBT_PUBLIC(int) MPI_Alltoallv(void* sendbuf, int* sendcounts, int* senddisps, MPI_Datatype sendtype, void* recvbuf, int *recvcounts, int* recvdisps, MPI_Datatype recvtype, MPI_Comm comm); + +/* +TODO +XBT_PUBLIC(int) MPI_Comm_split(MPI_Comm comm, int color, int key, + MPI_Comm* comm_out); +*/ // smpi functions -XBT_IMPORT_NO_EXPORT(int) smpi_simulated_main(int argc, char **argv); +XBT_IMPORT_NO_EXPORT(int) smpi_simulated_main(int argc, char** argv); +/* XBT_PUBLIC(unsigned int) smpi_sleep(unsigned int); XBT_PUBLIC(void) smpi_exit(int); -XBT_PUBLIC(int) smpi_gettimeofday(struct timeval *tv, struct timezone *tz); +XBT_PUBLIC(int) smpi_gettimeofday(struct timeval* tv, struct timezone* tz); +*/ -XBT_PUBLIC(void) smpi_do_once_1(const char *file, int line); +/* +TODO +XBT_PUBLIC(void) smpi_do_once_1(const char* file, int line); XBT_PUBLIC(int) smpi_do_once_2(void); XBT_PUBLIC(void) smpi_do_once_3(void); #define SMPI_DO_ONCE for (smpi_do_once_1(__FILE__, __LINE__); smpi_do_once_2(); smpi_do_once_3()) +*/ SG_END_DECL() #endif diff --git a/src/smpi/private.h b/src/smpi/private.h index 7f9170c948..3962ca02e9 100644 --- a/src/smpi/private.h +++ b/src/smpi/private.h @@ -3,167 +3,97 @@ #include "xbt/mallocator.h" #include "xbt/xbt_os_time.h" - #include "simix/simix.h" - #include "smpi/smpi.h" -#define SMPI_DEFAULT_SPEED 100 -#define SMPI_REQUEST_MALLOCATOR_SIZE 100 -#define SMPI_MESSAGE_MALLOCATOR_SIZE 100 - -// smpi mpi communicator -typedef struct smpi_mpi_communicator_t { - int size; - int barrier_count; - smx_mutex_t barrier_mutex; - smx_cond_t barrier_cond; +struct s_smpi_process_data; +typedef struct s_smpi_process_data* smpi_process_data_t; - int *rank_to_index_map; - int *index_to_rank_map; - -} s_smpi_mpi_communicator_t; - -// smpi mpi datatype -typedef struct smpi_mpi_datatype_t { - size_t size; - ptrdiff_t lb; - ptrdiff_t ub; - uint16_t flags; /* flags: has it been committed, etc ...*/ - uint16_t id; /* unused so far : data id, normally the index in the data array. 
*/ -} s_smpi_mpi_datatype_t; - -// smpi mpi request -typedef struct smpi_mpi_request_t { - smpi_mpi_communicator_t comm; +typedef struct s_smpi_mpi_request { + MPI_Comm comm; int src; int dst; int tag; - - void *buf; - int count; - smpi_mpi_datatype_t datatype; - - short int completed:1; - short int consumed:1; /* for waitany */ - - smx_mutex_t mutex; - smx_cond_t cond; - - void *data; - int forward; - + size_t size; + smx_rdv_t rdv; + smx_comm_t pair; + int complete; } s_smpi_mpi_request_t; -// smpi mpi op -typedef struct smpi_mpi_op_t { - void (*func) (void *a, void *b, int *length, MPI_Datatype * datatype); -} s_smpi_mpi_op_t; - -// smpi received message -typedef struct smpi_received_message_t { - smpi_mpi_communicator_t comm; - int src; - int tag; - - void *buf; - - void *data; - int forward; - -} s_smpi_received_message_t; -typedef struct smpi_received_message_t *smpi_received_message_t; - -typedef struct smpi_do_once_duration_node_t { - char *file; - int line; - double duration; - struct smpi_do_once_duration_node_t *next; -} s_smpi_do_once_duration_node_t; -typedef struct smpi_do_once_duration_node_t *smpi_do_once_duration_node_t; - -typedef struct smpi_global_t { - - // config vars - double reference_speed; - - // state vars - int process_count; - xbt_mallocator_t request_mallocator; - xbt_mallocator_t message_mallocator; - - smx_process_t *main_processes; - - xbt_os_timer_t timer; - smx_cond_t timer_cond; - - // keeps track of previous times - smpi_do_once_duration_node_t do_once_duration_nodes; - smx_mutex_t do_once_mutex; - double *do_once_duration; - -} s_smpi_global_t; -typedef struct smpi_global_t *smpi_global_t; -extern smpi_global_t smpi_global; - -typedef struct smpi_host_data_t { - int index; - smx_mutex_t mutex; - smx_cond_t cond; - - smx_process_t main; - smx_process_t sender; - smx_process_t receiver; - - int finalize; /* so that main process stops its sender&receiver */ - - xbt_fifo_t pending_recv_request_queue; - xbt_fifo_t pending_send_request_queue; - xbt_fifo_t received_message_queue; -} s_smpi_process_data_t; -typedef struct smpi_host_data_t *smpi_process_data_t; - -// function prototypes -void smpi_process_init(int *argc, char ***argv); -void smpi_process_finalize(void); -int smpi_mpi_comm_rank(smpi_mpi_communicator_t comm); -int smpi_mpi_type_get_extent(MPI_Datatype datatype, MPI_Aint *lb, MPI_Aint *extent); - -int smpi_mpi_bcast(void *buf, int count, MPI_Datatype datatype, int root, MPI_Comm comm); -int smpi_mpi_barrier(smpi_mpi_communicator_t comm); - -int smpi_mpi_isend(smpi_mpi_request_t request); -int smpi_mpi_irecv(smpi_mpi_request_t request); -int smpi_mpi_reduce(void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype, MPI_Op op, int root, MPI_Comm comm); -int smpi_mpi_sendrecv(void *sendbuf, int sendcount, MPI_Datatype sendtype, int dest, int sendtag, - void *recvbuf, int recvcount, MPI_Datatype recvtype, int source, int recvtag, - MPI_Comm comm, MPI_Status *status); -int smpi_mpi_wait(smpi_mpi_request_t request, smpi_mpi_status_t * status); -int smpi_mpi_waitall(int count, smpi_mpi_request_t requests[], smpi_mpi_status_t status[]); -int smpi_mpi_waitany(int count, smpi_mpi_request_t requests[], int *index, smpi_mpi_status_t status[]); +smpi_process_data_t smpi_process_data(void); +smpi_process_data_t smpi_process_remote_data(int index); +void smpi_global_init(void); +void smpi_global_destroy(void); +void smpi_process_init(int* argc, char*** argv); +void smpi_process_destroy(void); +int smpi_process_count(void); +int smpi_process_index(void); 
+xbt_os_timer_t smpi_process_timer(void); +void smpi_process_post_send(MPI_Comm comm, MPI_Request request); +void smpi_process_post_recv(MPI_Request request); + +size_t smpi_datatype_size(MPI_Datatype datatype); +MPI_Aint smpi_datatype_lb(MPI_Datatype datatype); +MPI_Aint smpi_datatype_ub(MPI_Datatype datatype); +int smpi_datatype_extent(MPI_Datatype datatype, MPI_Aint* lb, MPI_Aint * extent); +int smpi_datatype_copy(void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int recvcount, MPI_Datatype recvtype); + +MPI_Op smpi_op_new(MPI_User_function* function, int commute); +void smpi_op_destroy(MPI_Op op); +void smpi_op_apply(MPI_Op op, void* invec, void* inoutvec, int* len, MPI_Datatype* datatype); + +MPI_Group smpi_group_new(int size); +void smpi_group_destroy(MPI_Group group); +void smpi_group_set_mapping(MPI_Group group, int index, int rank); +int smpi_group_index(MPI_Group group, int rank); +int smpi_group_rank(MPI_Group group, int index); +int smpi_group_use(MPI_Group group); +int smpi_group_unuse(MPI_Group group); +int smpi_group_size(MPI_Group group); +int smpi_group_compare(MPI_Group group1, MPI_Group group2); + +MPI_Comm smpi_comm_new(MPI_Group group); +void smpi_comm_destroy(MPI_Comm comm); +MPI_Group smpi_comm_group(MPI_Comm comm); +int smpi_comm_size(MPI_Comm comm); +int smpi_comm_rank(MPI_Comm comm); + +MPI_Request smpi_mpi_isend(void* buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm); +MPI_Request smpi_mpi_irecv(void* buf, int count, MPI_Datatype datatype, int src, int tag, MPI_Comm comm); +void smpi_mpi_recv(void* buf, int count, MPI_Datatype datatype, int src, int tag, MPI_Comm comm, MPI_Status* status); +void smpi_mpi_send(void* buf, int count, MPI_Datatype datatype, int src, int tag, MPI_Comm comm); +void smpi_mpi_sendrecv(void* sendbuf, int sendcount, MPI_Datatype sendtype, int dst, int sendtag, void* recvbuf, int recvcount, MPI_Datatype recvtype, int src, int recvtag, MPI_Comm comm, MPI_Status* status); +int smpi_mpi_test(MPI_Request* request, MPI_Status* status); +int smpi_mpi_testany(int count, MPI_Request requests[], int* index, MPI_Status* status); +void smpi_mpi_wait(MPI_Request* request, MPI_Status* status); +int smpi_mpi_waitany(int count, MPI_Request requests[], MPI_Status* status); +void smpi_mpi_waitall(int count, MPI_Request requests[], MPI_Status status[]); +int smpi_mpi_waitsome(int incount, MPI_Request requests[], int* indices, MPI_Status status[]); +void smpi_mpi_bcast(void* buf, int count, MPI_Datatype datatype, int root, MPI_Comm comm); +void smpi_mpi_barrier(MPI_Comm comm); +void smpi_mpi_gather(void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm); +void smpi_mpi_gatherv(void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int* recvcounts, int* displs, MPI_Datatype recvtype, int root, MPI_Comm comm); +void smpi_mpi_allgather(void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int recvcount, MPI_Datatype recvtype, MPI_Comm comm); +void smpi_mpi_allgatherv(void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int* recvcounts, int* displs, MPI_Datatype recvtype, MPI_Comm comm); +void smpi_mpi_scatter(void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm); +void smpi_mpi_scatterv(void* sendbuf, int* sendcounts, int* displs, MPI_Datatype sendtype, void* recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm); 
+void smpi_mpi_reduce(void* sendbuf, void* recvbuf, int count, MPI_Datatype datatype, MPI_Op op, int root, MPI_Comm comm); +void smpi_mpi_allreduce(void* sendbuf, void* recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm); + +void nary_tree_bcast(void* buf, int count, MPI_Datatype datatype, int root, MPI_Comm comm, int arity); +void nary_tree_barrier(MPI_Comm comm, int arity); + +int smpi_coll_tuned_alltoall_bruck(void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int recvcount, MPI_Datatype recvtype, MPI_Comm comm); +int smpi_coll_tuned_alltoall_basic_linear(void *sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int recvcount, MPI_Datatype recvtype, MPI_Comm comm); +int smpi_coll_tuned_alltoall_pairwise(void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int recvcount, MPI_Datatype recvtype, MPI_Comm comm); +int smpi_coll_basic_alltoallv(void* sendbuf, int* sendcounts, int* senddisps, MPI_Datatype sendtype, void* recvbuf, int *recvcounts, int* recvdisps, MPI_Datatype recvtype, MPI_Comm comm); // utilities +void smpi_bench_init(void); +void smpi_bench_destroy(void); void smpi_execute(double duration); -void smpi_start_timer(void); -double smpi_stop_timer(void); void smpi_bench_begin(void); void smpi_bench_end(void); -void smpi_bench_skip(void); - -void smpi_init(void); -void smpi_global_init(void); -void smpi_global_destroy(void); -int smpi_process_index(void); -smx_mutex_t smpi_process_mutex(void); -smx_cond_t smpi_process_cond(void); -int smpi_run_simulation(int *argc, char **argv); -int smpi_create_request(void *buf, int count, smpi_mpi_datatype_t datatype, - int src, int dst, int tag, - smpi_mpi_communicator_t comm, - smpi_mpi_request_t * request); - -int smpi_sender(int argc, char *argv[]); -int smpi_receiver(int argc, char *argv[]); #endif diff --git a/src/smpi/smpi_base.c b/src/smpi/smpi_base.c index f0605ea5eb..96028af185 100644 --- a/src/smpi/smpi_base.c +++ b/src/smpi/smpi_base.c @@ -13,410 +13,514 @@ XBT_LOG_EXTERNAL_CATEGORY(smpi_receiver); XBT_LOG_EXTERNAL_CATEGORY(smpi_sender); XBT_LOG_EXTERNAL_CATEGORY(smpi_util); -smpi_mpi_global_t smpi_mpi_global = NULL; - - -/** - * Operations of MPI_OP : implemented=land,sum,min,max - **/ -void smpi_mpi_land_func(void *a, void *b, int *length, - MPI_Datatype * datatype); - -void smpi_mpi_land_func(void *a, void *b, int *length, - MPI_Datatype * datatype) -{ - int i; - if (*datatype == smpi_mpi_global->mpi_int) { - int *x = a, *y = b; - for (i = 0; i < *length; i++) { - y[i] = x[i] && y[i]; - } +void smpi_process_init(int* argc, char*** argv) { + int index; + smpi_process_data_t data; + smx_process_t proc; + + proc = SIMIX_process_self(); + index = atoi((*argv)[1]); + data = smpi_process_remote_data(index); + SIMIX_process_set_data(proc, data); + DEBUG2("<%d> New process in the game: %p", index, proc); + if (*argc > 2) { + memmove(&(*argv)[1], &(*argv)[2], sizeof(char *) * (*argc - 2)); + (*argv)[(*argc) - 1] = NULL; } + (*argc)--; } -/** - * sum two vectors element-wise - * - * @param a the first vectors - * @param b the second vectors - * @return the second vector is modified and contains the element-wise sums - **/ -void smpi_mpi_sum_func(void *a, void *b, int *length, - MPI_Datatype * datatype); - -void smpi_mpi_sum_func(void *a, void *b, int *length, MPI_Datatype * datatype) -{ - int i; - if ((*datatype == smpi_mpi_global->mpi_byte) || (*datatype == smpi_mpi_global->mpi_char)) { - char *x = a, *y = b; - for (i = 0; i < *length; i++) { - y[i] = x[i] + y[i]; - } - } else if 
(*datatype == smpi_mpi_global->mpi_int) { - int *x = a, *y = b; - for (i = 0; i < *length; i++) { - y[i] = x[i] + y[i]; - } - } else if (*datatype == smpi_mpi_global->mpi_float) { - float *x = a, *y = b; - for (i = 0; i < *length; i++) { - y[i] = x[i] + y[i]; - } - } else if (*datatype == smpi_mpi_global->mpi_double) { - double *x = a, *y = b; - for (i = 0; i < *length; i++) { - y[i] = x[i] + y[i]; - } - } +void smpi_process_destroy(void) { + int index = smpi_process_index(); + + DEBUG1("<%d> Process left the game", index); } -/** - *i multiply two vectors element-wise - * - * @param a the first vectors - * @param b the second vectors - * @return the second vector is modified and contains the element-wise products - **/ -void smpi_mpi_prod_func(void *a, void *b, int *length, - MPI_Datatype * datatype); - -void smpi_mpi_prod_func(void *a, void *b, int *length, MPI_Datatype * datatype) -{ - int i; - if ((*datatype == smpi_mpi_global->mpi_byte) || (*datatype == smpi_mpi_global->mpi_char)) { - char *x = a, *y = b; - for (i = 0; i < *length; i++) { - y[i] = x[i] * y[i]; - } - } else if (*datatype == smpi_mpi_global->mpi_int) { - int *x = a, *y = b; - for (i = 0; i < *length; i++) { - y[i] = x[i] * y[i]; - } - } else if (*datatype == smpi_mpi_global->mpi_float) { - float *x = a, *y = b; - for (i = 0; i < *length; i++) { - y[i] = x[i] * y[i]; - } - } else if (*datatype == smpi_mpi_global->mpi_double) { - double *x = a, *y = b; - for (i = 0; i < *length; i++) { - y[i] = x[i] * y[i]; - } - } + +/* MPI Low level calls */ +MPI_Request smpi_mpi_isend(void* buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm) { + MPI_Request request; + + request = xbt_new(s_smpi_mpi_request_t, 1); + request->comm = comm; + request->src = smpi_comm_rank(comm); + request->dst = dst; + request->tag = tag; + request->size = smpi_datatype_size(datatype) * count; + request->complete = 0; + smpi_process_post_send(comm, request); + request->pair = SIMIX_network_isend(request->rdv, request->size, -1.0, buf, request->size, request); + return request; } -/** - * compute the min of two vectors element-wise - **/ -void smpi_mpi_min_func(void *a, void *b, int *length, - MPI_Datatype * datatype); - -void smpi_mpi_min_func(void *a, void *b, int *length, MPI_Datatype * datatype) -{ - int i; - if ((*datatype == smpi_mpi_global->mpi_byte) || (*datatype == smpi_mpi_global->mpi_char)) { - char *x = a, *y = b; - for (i = 0; i < *length; i++) { - y[i] = x[i] < y[i] ? x[i] : y[i]; - } - } else { - if (*datatype == smpi_mpi_global->mpi_int) { - int *x = a, *y = b; - for (i = 0; i < *length; i++) { - y[i] = x[i] < y[i] ? x[i] : y[i]; - } - } else { - if (*datatype == smpi_mpi_global->mpi_float) { - float *x = a, *y = b; - for (i = 0; i < *length; i++) { - y[i] = x[i] < y[i] ? x[i] : y[i]; - } - } else { - if (*datatype == smpi_mpi_global->mpi_double) { - double *x = a, *y = b; - for (i = 0; i < *length; i++) { - y[i] = x[i] < y[i] ? 
x[i] : y[i]; - } - } - } - } - } +MPI_Request smpi_mpi_irecv(void* buf, int count, MPI_Datatype datatype, int src, int tag, MPI_Comm comm) { + MPI_Request request; + + request = xbt_new(s_smpi_mpi_request_t, 1); + request->comm = comm; + request->src = src; + request->dst = smpi_comm_rank(comm); + request->tag = tag; + request->size = smpi_datatype_size(datatype) * count; + request->complete = 0; + smpi_process_post_recv(request); + request->pair = SIMIX_network_irecv(request->rdv, buf, &request->size); + return request; } -/** - * compute the max of two vectors element-wise - **/ -void smpi_mpi_max_func(void *a, void *b, int *length, - MPI_Datatype * datatype); - -void smpi_mpi_max_func(void *a, void *b, int *length, MPI_Datatype * datatype) -{ - int i; - if ((*datatype == smpi_mpi_global->mpi_byte) || (*datatype == smpi_mpi_global->mpi_char)) { - char *x = a, *y = b; - for (i = 0; i < *length; i++) { - y[i] = x[i] > y[i] ? x[i] : y[i]; - } - } else if (*datatype == smpi_mpi_global->mpi_int) { - int *x = a, *y = b; - for (i = 0; i < *length; i++) { - y[i] = x[i] > y[i] ? x[i] : y[i]; - } - } else if (*datatype == smpi_mpi_global->mpi_float) { - float *x = a, *y = b; - for (i = 0; i < *length; i++) { - y[i] = x[i] > y[i] ? x[i] : y[i]; - } - } else if (*datatype == smpi_mpi_global->mpi_double) { - double *x = a, *y = b; - for (i = 0; i < *length; i++) { - y[i] = x[i] > y[i] ? x[i] : y[i]; - } +void smpi_mpi_recv(void* buf, int count, MPI_Datatype datatype, int src, int tag, MPI_Comm comm, MPI_Status* status) { + MPI_Request request; - } + request = smpi_mpi_irecv(buf, count, datatype, src, tag, comm); + smpi_mpi_wait(&request, status); } +void smpi_mpi_send(void* buf, int count, MPI_Datatype datatype, int src, int tag, MPI_Comm comm) { + MPI_Request request; + request = smpi_mpi_isend(buf, count, datatype, src, tag, comm); + smpi_mpi_wait(&request, MPI_STATUS_IGNORE); +} +void smpi_mpi_sendrecv(void* sendbuf, int sendcount, MPI_Datatype sendtype, int dst, int sendtag, void* recvbuf, int recvcount, MPI_Datatype recvtype, int src, int recvtag, MPI_Comm comm, MPI_Status* status) { + MPI_Request requests[2]; + MPI_Status stats[2]; -/** - * tell the MPI rank of the calling process (from its SIMIX process id) - **/ -int smpi_mpi_comm_rank(smpi_mpi_communicator_t comm) -{ - return comm->index_to_rank_map[smpi_process_index()]; + requests[0] = smpi_mpi_isend(sendbuf, sendcount, sendtype, dst, sendtag, comm); + requests[1] = smpi_mpi_irecv(recvbuf, recvcount, recvtype, src, recvtag, comm); + smpi_mpi_waitall(2, requests, stats); + if(status != MPI_STATUS_IGNORE) { + // Copy receive status + memcpy(status, &stats[1], sizeof(MPI_Status)); + } } -void smpi_process_init(int *argc, char ***argv) -{ - smpi_process_data_t pdata; +static void finish_wait(MPI_Request* request, MPI_Status* status) { + MPI_Request data = (MPI_Request)SIMIX_communication_get_data((*request)->pair); - // initialize some local variables + xbt_assert0(data != MPI_REQUEST_NULL, "Erroneous situation"); + if(status != MPI_STATUS_IGNORE) { + status->MPI_SOURCE = (*request)->src; + status->MPI_TAG = (*request)->tag; + status->MPI_ERROR = MPI_SUCCESS; + } + DEBUG3("finishing wait for %p [data = %p, complete = %d]", *request, data, data->complete); + // data == *request if sender is first to finish its wait + // data != *request if receiver is first to finish its wait + if(data->complete == 0) { + // first arrives here + data->complete = 1; + if(data != *request) { + // receveiver cleans its part + xbt_free(*request); + } + } else { + 
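+    // complete == 1 here: the peer already went through finish_wait first,
+    // so this side must release whatever request structures remain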
// second arrives here + if(data != *request) { + // receiver cleans everything + xbt_free(data); + } + xbt_free(*request); + } + *request = MPI_REQUEST_NULL; +} - pdata = xbt_new(s_smpi_process_data_t, 1); - SIMIX_process_set_data(SIMIX_process_self(), pdata); +int smpi_mpi_test(MPI_Request* request, MPI_Status* status) { + MPI_Request data = (MPI_Request)SIMIX_communication_get_data((*request)->pair); + int flag = data && data->complete == 1; - /* get rank from command line, and remove it from argv */ - pdata->index = atoi((*argv)[1]); - DEBUG1("I'm rank <%d>", pdata->index); - if (*argc > 2) { - memmove(&(*argv)[1], &(*argv)[2], sizeof(char *) * (*argc - 2)); - (*argv)[(*argc) - 1] = NULL; + if(flag) { + finish_wait(request, status); } - (*argc)--; - - pdata->mutex = SIMIX_mutex_init(); - pdata->cond = SIMIX_cond_init(); - pdata->finalize = 0; - - pdata->pending_recv_request_queue = xbt_fifo_new(); - pdata->pending_send_request_queue = xbt_fifo_new(); - pdata->received_message_queue = xbt_fifo_new(); - - pdata->main = SIMIX_process_self(); - pdata->sender = SIMIX_process_create("smpi_sender", - smpi_sender, pdata, - SIMIX_host_get_name(SIMIX_host_self()), - 0, NULL, - /*props */ NULL); - pdata->receiver = SIMIX_process_create("smpi_receiver", - smpi_receiver, pdata, - SIMIX_host_get_name(SIMIX_host_self - ()), 0, NULL, - /*props */ NULL); - - smpi_global->main_processes[pdata->index] = SIMIX_process_self(); - return; + return flag; } -void smpi_process_finalize() -{ - smpi_process_data_t pdata = SIMIX_process_get_data(SIMIX_process_self()); +int smpi_mpi_testany(int count, MPI_Request requests[], int* index, MPI_Status* status) { + MPI_Request data; + int i, flag; - pdata->finalize = 2; /* Tell sender and receiver to quit */ - SIMIX_process_resume(pdata->sender); - SIMIX_process_resume(pdata->receiver); - while (pdata->finalize > 0) { /* wait until it's done */ - SIMIX_cond_wait(pdata->cond, pdata->mutex); + *index = MPI_UNDEFINED; + flag = 0; + for(i = 0; i < count; i++) { + if(requests[i] != MPI_REQUEST_NULL) { + data = (MPI_Request)SIMIX_communication_get_data(requests[i]->pair); + if(data != MPI_REQUEST_NULL && data->complete == 1) { + finish_wait(&requests[i], status); + *index = i; + flag = 1; + break; + } + } } + return flag; +} - SIMIX_mutex_destroy(pdata->mutex); - SIMIX_cond_destroy(pdata->cond); - xbt_fifo_free(pdata->pending_recv_request_queue); - xbt_fifo_free(pdata->pending_send_request_queue); - xbt_fifo_free(pdata->received_message_queue); - xbt_free(pdata); +void smpi_mpi_wait(MPI_Request* request, MPI_Status* status) { + MPI_Request data = (MPI_Request)SIMIX_communication_get_data((*request)->pair); + + DEBUG6("wait for request %p (%p: %p) [src = %d, dst = %d, tag = %d]", + *request, (*request)->pair, data, (*request)->src, (*request)->dst, (*request)->tag); + // data is null if receiver waits before sender enters the rdv + if(data == MPI_REQUEST_NULL || data->complete == 0) { + SIMIX_network_wait((*request)->pair, -1.0); + } + finish_wait(request, status); } +int smpi_mpi_waitany(int count, MPI_Request requests[], MPI_Status* status) { + xbt_dynar_t comms; + MPI_Request data; + int i, size, index; + int* map; + + index = MPI_UNDEFINED; + if(count > 0) { + // First check for already completed requests + for(i = 0; i < count; i++) { + if(requests[i] != MPI_REQUEST_NULL) { + data = (MPI_Request)SIMIX_communication_get_data(requests[i]->pair); + if(data != MPI_REQUEST_NULL && data->complete == 1) { + index = i; + break; + } + } + } + if(index == MPI_UNDEFINED) { + // 
Otherwise, wait for a request to complete + comms = xbt_dynar_new(sizeof(smx_comm_t), NULL); + map = xbt_new(int, count); + size = 0; + DEBUG0("Wait for one of"); + for(i = 0; i < count; i++) { + if(requests[i] != MPI_REQUEST_NULL && requests[i]->complete == 0) { + DEBUG4(" request %p [src = %d, dst = %d, tag = %d]", + requests[i], requests[i]->src, requests[i]->dst, requests[i]->tag); + xbt_dynar_push(comms, &requests[i]->pair); + map[size] = i; + size++; + } + } + if(size > 0) { + index = SIMIX_network_waitany(comms); + index = map[index]; + } + xbt_free(map); + xbt_dynar_free_container(&comms); + } + if(index != MPI_UNDEFINED) { + finish_wait(&requests[index], status); + } + } + return index; +} -/*int smpi_mpi_barrier(smpi_mpi_communicator_t comm) -{ +void smpi_mpi_waitall(int count, MPI_Request requests[], MPI_Status status[]) { + int index; + MPI_Status stat; - SIMIX_mutex_lock(comm->barrier_mutex); - ++comm->barrier_count; - if (comm->barrier_count > comm->size) { // only happens on second barrier... - comm->barrier_count = 0; - } else if (comm->barrier_count == comm->size) { - SIMIX_cond_broadcast(comm->barrier_cond); + while(count > 0) { + index = smpi_mpi_waitany(count, requests, &stat); + if(index == MPI_UNDEFINED) { + break; + } + if(status != MPI_STATUS_IGNORE) { + memcpy(&status[index], &stat, sizeof(stat)); + } + // Move the last request to the found position + requests[index] = requests[count - 1]; + requests[count - 1] = MPI_REQUEST_NULL; + count--; } - while (comm->barrier_count < comm->size) { - SIMIX_cond_wait(comm->barrier_cond, comm->barrier_mutex); +} + +int smpi_mpi_waitsome(int incount, MPI_Request requests[], int* indices, MPI_Status status[]) { + MPI_Request data; + int i, count; + + count = 0; + for(i = 0; i < count; i++) { + if(requests[i] != MPI_REQUEST_NULL) { + data = (MPI_Request)SIMIX_communication_get_data(requests[i]->pair); + if(data != MPI_REQUEST_NULL && data->complete == 1) { + finish_wait(&requests[i], status != MPI_STATUS_IGNORE ? 
&status[i] : MPI_STATUS_IGNORE); + indices[count] = i; + count++; + } + } } - SIMIX_mutex_unlock(comm->barrier_mutex); + return count; +} - return MPI_SUCCESS; +void smpi_mpi_bcast(void* buf, int count, MPI_Datatype datatype, int root, MPI_Comm comm) { + // arity=2: a binary tree, arity=4 seem to be a good setting (see P2P-MPI)) + nary_tree_bcast(buf, count, datatype, root, comm, 4); +} + +void smpi_mpi_barrier(MPI_Comm comm) { + // arity=2: a binary tree, arity=4 seem to be a good setting (see P2P-MPI)) + nary_tree_barrier(comm, 4); } -*/ -int smpi_mpi_isend(smpi_mpi_request_t request) -{ - smpi_process_data_t pdata = SIMIX_process_get_data(SIMIX_process_self()); - int retval = MPI_SUCCESS; +void smpi_mpi_gather(void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm) { + int system_tag = 666; + int rank, size, src, index, sendsize, recvsize; + MPI_Request* requests; - if (NULL == request) { - retval = MPI_ERR_INTERN; + rank = smpi_comm_rank(comm); + size = smpi_comm_size(comm); + if(rank != root) { + // Send buffer to root + smpi_mpi_send(sendbuf, sendcount, sendtype, root, system_tag, comm); } else { - xbt_fifo_push(pdata->pending_send_request_queue, request); - SIMIX_process_resume(pdata->sender); + sendsize = smpi_datatype_size(sendtype); + recvsize = smpi_datatype_size(recvtype); + // Local copy from root + memcpy(&((char*)recvbuf)[root * recvcount * recvsize], sendbuf, sendcount * sendsize * sizeof(char)); + // Receive buffers from senders + requests = xbt_new(MPI_Request, size - 1); + index = 0; + for(src = 0; src < size; src++) { + if(src != root) { + requests[index] = smpi_mpi_irecv(&((char*)recvbuf)[src * recvcount * recvsize], recvcount, recvtype, src, system_tag, comm); + index++; + } + } + // Wait for completion of irecv's. + smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE); + xbt_free(requests); } - - return retval; } -int smpi_mpi_irecv(smpi_mpi_request_t request) -{ - int retval = MPI_SUCCESS; - smpi_process_data_t pdata = SIMIX_process_get_data(SIMIX_process_self()); +void smpi_mpi_gatherv(void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int* recvcounts, int* displs, MPI_Datatype recvtype, int root, MPI_Comm comm) { + int system_tag = 666; + int rank, size, src, index, sendsize; + MPI_Request* requests; - if (NULL == request) { - retval = MPI_ERR_INTERN; + rank = smpi_comm_rank(comm); + size = smpi_comm_size(comm); + if(rank != root) { + // Send buffer to root + smpi_mpi_send(sendbuf, sendcount, sendtype, root, system_tag, comm); } else { - xbt_fifo_push(pdata->pending_recv_request_queue, request); - - if (SIMIX_process_is_suspended(pdata->receiver)) { - SIMIX_process_resume(pdata->receiver); + sendsize = smpi_datatype_size(sendtype); + // Local copy from root + memcpy(&((char*)recvbuf)[displs[root]], sendbuf, sendcount * sendsize * sizeof(char)); + // Receive buffers from senders + requests = xbt_new(MPI_Request, size - 1); + index = 0; + for(src = 0; src < size; src++) { + if(src != root) { + requests[index] = smpi_mpi_irecv(&((char*)recvbuf)[displs[src]], recvcounts[src], recvtype, src, system_tag, comm); + index++; + } } + // Wait for completion of irecv's. 
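+    // each pending irecv stores recvcounts[src] elements of recvtype from rank src
+    // into recvbuf at byte offset displs[src]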
+ smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE); + xbt_free(requests); } - - return retval; } -void print_req( smpi_mpi_request_t r ); -void print_req( smpi_mpi_request_t r ) { - fprintf(stderr,"***req %p-> src=%d dst=%d tag=%d completed=0x%x consumed=0x%x\n",r,r->src,r->dst,r->tag,r->completed,r->consumed); +void smpi_mpi_allgather(void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int recvcount, MPI_Datatype recvtype, MPI_Comm comm) { + int system_tag = 666; + int rank, size, other, index, sendsize, recvsize; + MPI_Request* requests; + + rank = smpi_comm_rank(comm); + size = smpi_comm_size(comm); + sendsize = smpi_datatype_size(sendtype); + recvsize = smpi_datatype_size(recvtype); + // Local copy from self + memcpy(&((char*)recvbuf)[rank * recvcount * recvsize], sendbuf, sendcount * sendsize * sizeof(char)); + // Send/Recv buffers to/from others; + requests = xbt_new(MPI_Request, 2 * (size - 1)); + index = 0; + for(other = 0; other < size; other++) { + if(other != rank) { + requests[index] = smpi_mpi_isend(sendbuf, sendcount, sendtype, other, system_tag, comm); + index++; + requests[index] = smpi_mpi_irecv(&((char*)recvbuf)[other * recvcount * recvsize], recvcount, recvtype, other, system_tag, comm); + index++; + } + } + // Wait for completion of all comms. + smpi_mpi_waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE); + xbt_free(requests); } +void smpi_mpi_allgatherv(void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int* recvcounts, int* displs, MPI_Datatype recvtype, MPI_Comm comm) { + int system_tag = 666; + int rank, size, other, index, sendsize, recvsize; + MPI_Request* requests; + + rank = smpi_comm_rank(comm); + size = smpi_comm_size(comm); + sendsize = smpi_datatype_size(sendtype); + recvsize = smpi_datatype_size(recvtype); + // Local copy from self + memcpy(&((char*)recvbuf)[displs[rank]], sendbuf, sendcount * sendsize * sizeof(char)); + // Send buffers to others; + requests = xbt_new(MPI_Request, 2 * (size - 1)); + index = 0; + for(other = 0; other < size; other++) { + if(other != rank) { + requests[index] = smpi_mpi_isend(sendbuf, sendcount, sendtype, other, system_tag, comm); + index++; + requests[index] = smpi_mpi_irecv(&((char*)recvbuf)[displs[other]], recvcounts[other], recvtype, other, system_tag, comm); + index++; + } + } + // Wait for completion of all comms. + smpi_mpi_waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE); + xbt_free(requests); +} -/** - * wait and friends ... 
- **/ -int smpi_mpi_wait(smpi_mpi_request_t request, smpi_mpi_status_t * status) -{ - int retval = MPI_SUCCESS; +void smpi_mpi_scatter(void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm) { + int system_tag = 666; + int rank, size, dst, index, sendsize, recvsize; + MPI_Request* requests; - if (NULL == request) { - retval = MPI_ERR_INTERN; + rank = smpi_comm_rank(comm); + size = smpi_comm_size(comm); + if(rank != root) { + // Recv buffer from root + smpi_mpi_recv(recvbuf, recvcount, recvtype, root, system_tag, comm, MPI_STATUS_IGNORE); } else { - - DEBUG3("entered smpi_mpi_wait() for req_src=%d,req_dst=%d,req_tag=%d", - request->src,request->dst,request->tag); - SIMIX_mutex_lock(request->mutex); -//#define DEBUG_STEPH -#ifdef DEBUG_STEPH - print_req( request ); //@@ -#endif - while (!request->completed) { - SIMIX_cond_wait(request->cond, request->mutex); - } - if (NULL != status) { - status->MPI_SOURCE = request->src; - status->MPI_TAG = request->tag; - status->MPI_ERROR = MPI_SUCCESS; + sendsize = smpi_datatype_size(sendtype); + recvsize = smpi_datatype_size(recvtype); + // Local copy from root + memcpy(recvbuf, &((char*)sendbuf)[root * sendcount * sendsize], recvcount * recvsize * sizeof(char)); + // Send buffers to receivers + requests = xbt_new(MPI_Request, size - 1); + index = 0; + for(dst = 0; dst < size; dst++) { + if(dst != root) { + requests[index] = smpi_mpi_isend(&((char*)sendbuf)[dst * sendcount * sendsize], sendcount, sendtype, dst, system_tag, comm); + index++; + } } - SIMIX_mutex_unlock(request->mutex); + // Wait for completion of isend's. + smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE); + xbt_free(requests); } - - return retval; } -/** - * waitall - **/ -int smpi_mpi_waitall(int count, smpi_mpi_request_t requests[], - smpi_mpi_status_t status[]) -{ - int cpt; - int index; - int retval; - smpi_mpi_status_t stat; - - for (cpt = 0; cpt < count; cpt++) { - retval = smpi_mpi_waitany(count, requests, &index, &stat); - if (retval != MPI_SUCCESS) - return retval; - if (MPI_STATUS_IGNORE != status) - memcpy(&(status[index]), &stat, sizeof(stat)); - } - return MPI_SUCCESS; -} +void smpi_mpi_scatterv(void* sendbuf, int* sendcounts, int* displs, MPI_Datatype sendtype, void* recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm) { + int system_tag = 666; + int rank, size, dst, index, sendsize, recvsize; + MPI_Request* requests; -/** - * waitany - **/ -int smpi_mpi_waitany(int count, smpi_mpi_request_t * requests, int *index, - smpi_mpi_status_t * status) -{ - int cpt; - - DEBUG0("entering smpi_wait_any() ..."); - *index = MPI_UNDEFINED; - if (NULL == requests) { - return MPI_ERR_INTERN; - } - /* First check if one of them is already done */ - for (cpt = 0; cpt < count; cpt++) { - DEBUG2(" exam req[%d] of msg from <%d>",cpt,requests[cpt]->src); - if (requests[cpt]->completed && !requests[cpt]->consumed) { /* got ya */ - DEBUG2("smpi_wait_any() found match req[%d] of msg from <%d>",cpt,requests[cpt]->src); - *index = cpt; - goto found_request; + rank = smpi_comm_rank(comm); + size = smpi_comm_size(comm); + if(rank != root) { + // Recv buffer from root + smpi_mpi_recv(recvbuf, recvcount, recvtype, root, system_tag, comm, MPI_STATUS_IGNORE); + } else { + sendsize = smpi_datatype_size(sendtype); + recvsize = smpi_datatype_size(recvtype); + // Local copy from root + memcpy(recvbuf, &((char*)sendbuf)[displs[root]], recvcount * recvsize * sizeof(char)); + // Send buffers to receivers + 
requests = xbt_new(MPI_Request, size - 1); + index = 0; + for(dst = 0; dst < size; dst++) { + if(dst != root) { + requests[index] = smpi_mpi_isend(&((char*)sendbuf)[displs[dst]], sendcounts[dst], sendtype, dst, system_tag, comm); + index++; + } } + // Wait for completion of isend's. + smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE); + xbt_free(requests); } - /* If none found, block */ - /* FIXME: should use a SIMIX_cond_waitany, when implemented. For now, block on the first one */ - while (1) { - for (cpt = 0; cpt < count; cpt++) { - -#ifdef DEBUG_STEPH - print_req( requests[cpt] ); -#endif - if (!requests[cpt]->completed) { /* this one is not done, wait on it */ - DEBUG3("smpi_waitany() blocked waiting a msg <%d> -> <%d>, tag=%d",requests[cpt]->src,requests[cpt]->dst,requests[cpt]->tag); - while (!requests[cpt]->completed) - SIMIX_cond_wait(requests[cpt]->cond, requests[cpt]->mutex); - - *index = cpt; - goto found_request; +} + +void smpi_mpi_reduce(void* sendbuf, void* recvbuf, int count, MPI_Datatype datatype, MPI_Op op, int root, MPI_Comm comm) { + int system_tag = 666; + int rank, size, src, index, datasize; + MPI_Request* requests; + void** tmpbufs; + + rank = smpi_comm_rank(comm); + size = smpi_comm_size(comm); + if(rank != root) { + // Send buffer to root + smpi_mpi_send(sendbuf, count, datatype, root, system_tag, comm); + } else { + datasize = smpi_datatype_size(datatype); + // Local copy from root + memcpy(recvbuf, sendbuf, count * datasize * sizeof(char)); + // Receive buffers from senders + //TODO: make a MPI_barrier here ? + requests = xbt_new(MPI_Request, size - 1); + tmpbufs = xbt_new(void*, size - 1); + index = 0; + for(src = 0; src < size; src++) { + if(src != root) { + tmpbufs[index] = xbt_malloc(count * datasize); + requests[index] = smpi_mpi_irecv(tmpbufs[index], count, datatype, src, system_tag, comm); + index++; + } + } + // Wait for completion of irecv's. + for(src = 0; src < size - 1; src++) { + index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE); + if(index == MPI_UNDEFINED) { + break; } + smpi_op_apply(op, tmpbufs[index], recvbuf, &count, &datatype); } - if (cpt == count) /* they are all done. Damn user */ - return MPI_ERR_REQUEST; + for(index = 0; index < size - 1; index++) { + xbt_free(tmpbufs[index]); + } + xbt_free(tmpbufs); + xbt_free(requests); } +} -found_request: -#ifdef DEBUG_STEPH - print_req( requests[cpt] ); -#endif - requests[*index]->consumed = 1; -#ifdef DEBUG_STEPH - print_req( requests[cpt] ); -#endif - DEBUG2("smpi_waitany() request %p unblocked ... mark *req[%d]->consumed",requests[*index],cpt); - if (NULL != status) { - status->MPI_SOURCE = requests[*index]->src; - status->MPI_TAG = requests[*index]->tag; - status->MPI_ERROR = MPI_SUCCESS; +void smpi_mpi_allreduce(void* sendbuf, void* recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm) { + smpi_mpi_reduce(sendbuf, recvbuf, count, datatype, op, 0, comm); + smpi_mpi_bcast(recvbuf, count, datatype, 0, comm); + +/* +FIXME: buggy implementation + + int system_tag = 666; + int rank, size, other, index, datasize; + MPI_Request* requests; + void** tmpbufs; + + rank = smpi_comm_rank(comm); + size = smpi_comm_size(comm); + datasize = smpi_datatype_size(datatype); + // Local copy from self + memcpy(recvbuf, sendbuf, count * datasize * sizeof(char)); + // Send/Recv buffers to/from others; + //TODO: make a MPI_barrier here ? 
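+    // contributions are folded into recvbuf in the order the receives complete
+    // (see the waitany loop below), which assumes op is commutative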
+ requests = xbt_new(MPI_Request, 2 * (size - 1)); + tmpbufs = xbt_new(void*, size - 1); + index = 0; + for(other = 0; other < size; other++) { + if(other != rank) { + tmpbufs[index / 2] = xbt_malloc(count * datasize); + requests[index] = smpi_mpi_isend(sendbuf, count, datatype, other, system_tag, comm); + requests[index + 1] = smpi_mpi_irecv(tmpbufs[index / 2], count, datatype, other, system_tag, comm); + index += 2; + } } - return MPI_SUCCESS; - + // Wait for completion of all comms. + for(other = 0; other < 2 * (size - 1); other++) { + index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE); + if(index == MPI_UNDEFINED) { + break; + } + if((index & 1) == 1) { + // Request is odd: it's a irecv + smpi_op_apply(op, tmpbufs[index / 2], recvbuf, &count, &datatype); + } + } + for(index = 0; index < size - 1; index++) { + xbt_free(tmpbufs[index]); + } + xbt_free(tmpbufs); + xbt_free(requests); +*/ } diff --git a/src/smpi/smpi_bench.c b/src/smpi/smpi_bench.c index cc81f08ebb..c15fc3533a 100644 --- a/src/smpi/smpi_bench.c +++ b/src/smpi/smpi_bench.c @@ -6,77 +6,69 @@ XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_bench, smpi, void smpi_execute(double duration) { - smx_host_t host = SIMIX_host_self(); - smx_mutex_t mutex = smpi_process_mutex(); - smx_cond_t cond = smpi_process_cond(); + smx_host_t host; smx_action_t action; + smx_mutex_t mutex; + smx_cond_t cond; e_surf_action_state_t state; - if (duration < 0.001) - return; - DEBUG1("Sleep for %f to handle real computation time",duration); - SIMIX_mutex_lock(mutex); - action = - SIMIX_action_execute(host, "execute", - duration * smpi_global->reference_speed); - - SIMIX_register_action_to_condition(action, cond); - for (state = SIMIX_action_get_state(action); - state == SURF_ACTION_READY || - state == SURF_ACTION_RUNNING; state = SIMIX_action_get_state(action) - ) { - SIMIX_cond_wait(cond, mutex); + if(duration > 0.001) { + host = SIMIX_host_self(); + mutex = SIMIX_mutex_init(); + cond = SIMIX_cond_init(); + DEBUG1("Sleep for %f to handle real computation time", duration); + duration *= xbt_cfg_get_double(_surf_cfg_set, "reference_speed"); + action = SIMIX_action_sleep(host, duration); + SIMIX_mutex_lock(mutex); + SIMIX_register_action_to_condition(action, cond); + for (state = SIMIX_action_get_state(action); + state == SURF_ACTION_READY || + state == SURF_ACTION_RUNNING; state = SIMIX_action_get_state(action)) { + SIMIX_cond_wait(cond, mutex); + } + SIMIX_unregister_action_to_condition(action, cond); + SIMIX_mutex_unlock(mutex); + SIMIX_action_destroy(action); + SIMIX_cond_destroy(cond); + SIMIX_mutex_destroy(mutex); } - SIMIX_unregister_action_to_condition(action, cond); - SIMIX_action_destroy(action); - - SIMIX_mutex_unlock(mutex); - - return; -} - -void smpi_start_timer() -{ - xbt_os_timer_start(smpi_global->timer); -} - -double smpi_stop_timer() -{ - double duration; - xbt_os_timer_stop(smpi_global->timer); - duration = xbt_os_timer_elapsed(smpi_global->timer); - return duration; } void smpi_bench_begin() { - smpi_start_timer(); + xbt_os_timer_start(smpi_process_timer()); } void smpi_bench_end() { - smpi_execute(smpi_stop_timer()); + xbt_os_timer_t timer = smpi_process_timer(); + + xbt_os_timer_stop(timer); + smpi_execute(xbt_os_timer_elapsed(timer)); } +/* +TODO void smpi_do_once_1(const char *file, int line) { smpi_do_once_duration_node_t curr, prev; + smpi_bench_end(); SIMIX_mutex_lock(smpi_global->do_once_mutex); prev = NULL; - for (curr = smpi_global->do_once_duration_nodes; - NULL != curr && (strcmp(curr->file, file) || curr->line != 
line); - curr = curr->next) { + for(curr = smpi_global->do_once_duration_nodes; + NULL != curr && (strcmp(curr->file, file) || curr->line != line); + curr = curr->next) { prev = curr; } - if (NULL == curr) { + if(NULL == curr) { curr = xbt_new(s_smpi_do_once_duration_node_t, 1); curr->file = xbt_strdup(file); curr->line = line; curr->duration = -1; curr->next = NULL; - if (NULL == prev) { + if(NULL == prev) { smpi_global->do_once_duration_nodes = curr; } else { prev->next = curr; @@ -88,7 +80,8 @@ void smpi_do_once_1(const char *file, int line) int smpi_do_once_2() { double duration = *(smpi_global->do_once_duration); - if (0 > duration) { + + if(0 > duration) { smpi_start_timer(); return 1; } @@ -102,3 +95,4 @@ void smpi_do_once_3() { *(smpi_global->do_once_duration) = smpi_stop_timer(); } +*/ diff --git a/src/smpi/smpi_coll.c b/src/smpi/smpi_coll.c index 42217104de..c5a935a950 100644 --- a/src/smpi/smpi_coll.c +++ b/src/smpi/smpi_coll.c @@ -8,9 +8,7 @@ /* This program is free software; you can redistribute it and/or modify it * * under the terms of the license (GNU LGPL) which comes with this package. */ - #include -#include #include #include @@ -20,805 +18,304 @@ XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_coll, smpi, "Logging specific to SMPI (coll)"); -/* proc_tree taken and translated from P2P-MPI */ - -struct proc_tree { - int PROCTREE_A; - int numChildren; - int * child; - int parent; - int me; - int root; - int isRoot; +struct s_proc_tree { + int PROCTREE_A; + int numChildren; + int * child; + int parent; + int me; + int root; + int isRoot; }; -typedef struct proc_tree * proc_tree_t; - - - -/* prototypes */ -void build_tree( int index, int extent, proc_tree_t *tree); -proc_tree_t alloc_tree( int arity ); -void free_tree( proc_tree_t tree); -void print_tree(proc_tree_t tree); - - - +typedef struct s_proc_tree * proc_tree_t; /** * alloc and init **/ -proc_tree_t alloc_tree( int arity ) { - proc_tree_t tree = malloc(1*sizeof(struct proc_tree)); - int i; - - tree->PROCTREE_A = arity; - tree->isRoot = 0; - tree->numChildren = 0; - tree->child = malloc(arity*sizeof(int)); - for (i=0; i < arity; i++) { - tree->child[i] = -1; - } - tree->root = -1; - tree->parent = -1; - return( tree ); +static proc_tree_t alloc_tree(int arity) { + proc_tree_t tree; + int i; + + tree = xbt_new(struct s_proc_tree, 1); + tree->PROCTREE_A = arity; + tree->isRoot = 0; + tree->numChildren = 0; + tree->child = xbt_new(int, arity); + for(i = 0; i < arity; i++) { + tree->child[i] = -1; + } + tree->root = -1; + tree->parent = -1; + return tree; } /** * free **/ -void free_tree( proc_tree_t tree) { - free (tree->child ); - free(tree); +static void free_tree(proc_tree_t tree) { + xbt_free(tree->child ); + xbt_free(tree); } - - /** * Build the tree depending on a process rank (index) and the group size (extent) * @param index the rank of the calling process * @param extent the total number of processes **/ -void build_tree( int index, int extent, proc_tree_t *tree) { - int places = (*tree)->PROCTREE_A * index; - int i; - int ch; - int pr; - - (*tree)->me = index; - (*tree)->root = 0 ; - - for (i = 1; i <= (*tree)->PROCTREE_A; i++) { - ++places; - ch = (*tree)->PROCTREE_A * index + i + (*tree)->root; - //printf("places %d\n",places); - ch %= extent; - if (places < extent) { - //printf("ch <%d> = <%d>\n",i,ch); - //printf("adding to the tree at index <%d>\n\n",i-1); - (*tree)->child[i - 1] = ch; - (*tree)->numChildren++; - } - else { - //fprintf(stderr,"not adding to the tree\n"); - } - } - 
//fprintf(stderr,"procTree.numChildren <%d>\n",(*tree)->numChildren); - - if (index == (*tree)->root) { - (*tree)->isRoot = 1; - } - else { - (*tree)->isRoot = 0; - pr = (index - 1) / (*tree)->PROCTREE_A; - (*tree)->parent = pr; - } +static void build_tree(int index, int extent, proc_tree_t* tree) { + int places = (*tree)->PROCTREE_A * index; + int i, ch, pr; + + (*tree)->me = index; + (*tree)->root = 0 ; + for(i = 1; i <= (*tree)->PROCTREE_A; i++) { + ++places; + ch = (*tree)->PROCTREE_A * index + i + (*tree)->root; + ch %= extent; + if(places < extent) { + (*tree)->child[i - 1] = ch; + (*tree)->numChildren++; + } + } + if(index == (*tree)->root) { + (*tree)->isRoot = 1; + } else { + (*tree)->isRoot = 0; + pr = (index - 1) / (*tree)->PROCTREE_A; + (*tree)->parent = pr; + } } -/** - * prints the tree as a graphical representation - **/ -void print_tree(proc_tree_t tree) { - int i; - char *spacer; - if (-1 != tree->parent ) { - printf("[%d]\n +---",tree->parent); - spacer= strdup(" "); - } - else { - spacer=strdup(""); - } - printf("<%d>\n",tree->me); - for (i=0;i < tree->numChildren; i++) { - printf("%s +--- %d\n", spacer,tree->child[i]); - } - free(spacer); -} - /** * bcast **/ -int tree_bcast(void *buf, int count, MPI_Datatype datatype, int root, MPI_Comm comm, proc_tree_t tree); -int tree_bcast( void *buf, int count, MPI_Datatype datatype, int root, - MPI_Comm comm, proc_tree_t tree) -{ - int system_tag = 999; // used negative int but smpi_create_request() declares this illegal (to be checked) - int rank; - int retval = MPI_SUCCESS; - int i; - smpi_mpi_request_t request; - smpi_mpi_request_t * requests; - - rank = smpi_mpi_comm_rank(comm); - - /* wait for data from my parent in the tree */ - if (!tree->isRoot) { - DEBUG3("<%d> tree_bcast(): i am not root: recv from %d, tag=%d)",rank,tree->parent,system_tag+rank); - retval = smpi_create_request(buf, count, datatype, - tree->parent, rank, - system_tag + rank, - comm, &request); - if (MPI_SUCCESS != retval) { - printf("** internal error: smpi_create_request() rank=%d returned retval=%d, %s:%d\n", - rank,retval,__FILE__,__LINE__); - } - smpi_mpi_irecv(request); - DEBUG2("<%d> tree_bcast(): waiting on irecv from %d",rank, tree->parent); - smpi_mpi_wait(request, MPI_STATUS_IGNORE); - xbt_mallocator_release(smpi_global->request_mallocator, request); - } - - requests = xbt_malloc( tree->numChildren * sizeof(smpi_mpi_request_t)); - DEBUG2("<%d> creates %d requests (1 per child)\n",rank,tree->numChildren); - - /* iniates sends to ranks lower in the tree */ - for (i=0; i < tree->numChildren; i++) { - if (tree->child[i] != -1) { - DEBUG3("<%d> send to <%d>,tag=%d",rank,tree->child[i], system_tag+tree->child[i]); - retval = smpi_create_request(buf, count, datatype, - rank, tree->child[i], - system_tag + tree->child[i], - comm, &(requests[i])); - if (MPI_SUCCESS != retval) { - printf("** internal error: smpi_create_request() rank=%d returned retval=%d, %s:%d\n", - rank,retval,__FILE__,__LINE__); - } - smpi_mpi_isend(requests[i]); - /* FIXME : we should not wait immediately here. See next FIXME. */ - smpi_mpi_wait( requests[i], MPI_STATUS_IGNORE); - xbt_mallocator_release(smpi_global->request_mallocator, requests[i]); - } - } - /* FIXME : normally, we sould wait only once all isend have been issued: - * this is the following commented code. 
It deadlocks, probably because - * of a bug in the sender process */ - - /* wait for completion of sends */ - /* printf("[%d] wait for %d send completions\n",rank,tree->numChildren); - smpi_mpi_waitall( tree->numChildren, requests, MPI_STATUS_IGNORE); - printf("[%d] reqs completed\n)",rank); - */ - - xbt_free(requests); - return(retval); - /* checked ok with valgrind --leak-check=full*/ +static void tree_bcast(void* buf, int count, MPI_Datatype datatype, int root, MPI_Comm comm, proc_tree_t tree) { + int system_tag = 999; // used negative int but smpi_create_request() declares this illegal (to be checked) + int rank, i; + MPI_Request* requests; + + rank = smpi_comm_rank(comm); + /* wait for data from my parent in the tree */ + if(!tree->isRoot) { + DEBUG3("<%d> tree_bcast(): i am not root: recv from %d, tag=%d)", + rank, tree->parent, system_tag + rank); + smpi_mpi_recv(buf, count, datatype, tree->parent, system_tag + rank, comm, MPI_STATUS_IGNORE); + } + requests = xbt_new(MPI_Request, tree->numChildren); + DEBUG2("<%d> creates %d requests (1 per child)\n", rank, tree->numChildren); + /* iniates sends to ranks lower in the tree */ + for(i = 0; i < tree->numChildren; i++) { + if(tree->child[i] == -1) { + requests[i] = MPI_REQUEST_NULL; + } else { + DEBUG3("<%d> send to <%d>, tag=%d", rank, tree->child[i], system_tag + tree->child[i]); + requests[i] = smpi_mpi_isend(buf, count, datatype, tree->child[i], system_tag + tree->child[i], comm); + } + } + smpi_mpi_waitall(tree->numChildren, requests, MPI_STATUS_IGNORE); + xbt_free(requests); } - /** * anti-bcast **/ -int tree_antibcast(void *buf, int count, MPI_Datatype datatype, int root, MPI_Comm comm, proc_tree_t tree); -int tree_antibcast( void *buf, int count, MPI_Datatype datatype, int root, - MPI_Comm comm, proc_tree_t tree) -{ - int system_tag = 999; // used negative int but smpi_create_request() declares this illegal (to be checked) - int rank; - int retval = MPI_SUCCESS; - int i; - smpi_mpi_request_t request; - smpi_mpi_request_t * requests; - - rank = smpi_mpi_comm_rank(comm); - - //------------------anti-bcast------------------- - - // everyone sends to its parent, except root. - if (!tree->isRoot) { - retval = smpi_create_request(buf, count, datatype, - rank,tree->parent, - system_tag + rank, - comm, &request); - if (MPI_SUCCESS != retval) { - ERROR4("** internal error: smpi_create_request() rank=%d returned retval=%d, %s:%d\n", - rank,retval,__FILE__,__LINE__); - } - smpi_mpi_isend(request); - smpi_mpi_wait(request, MPI_STATUS_IGNORE); - xbt_mallocator_release(smpi_global->request_mallocator, request); - } - - //every one receives as many messages as it has children - requests = xbt_malloc( tree->numChildren * sizeof(smpi_mpi_request_t)); - for (i=0; i < tree->numChildren; i++) { - if (tree->child[i] != -1) { - retval = smpi_create_request(buf, count, datatype, - tree->child[i], rank, - system_tag + tree->child[i], - comm, &(requests[i])); - if (MPI_SUCCESS != retval) { - printf("** internal error: smpi_create_request() rank=%d returned retval=%d, %s:%d\n", - rank,retval,__FILE__,__LINE__); - } - smpi_mpi_irecv(requests[i]); - /* FIXME : we should not wait immediately here. See next FIXME. 
*/ - smpi_mpi_wait( requests[i], MPI_STATUS_IGNORE); - xbt_mallocator_release(smpi_global->request_mallocator, requests[i]); - } - } - xbt_free(requests); - return(retval); - - /* checked ok with valgrind --leak-check=full*/ +static void tree_antibcast(void* buf, int count, MPI_Datatype datatype, int root, MPI_Comm comm, proc_tree_t tree) { + int system_tag = 999; // used negative int but smpi_create_request() declares this illegal (to be checked) + int rank, i; + MPI_Request* requests; + + rank = smpi_comm_rank(comm); + // everyone sends to its parent, except root. + if(!tree->isRoot) { + DEBUG3("<%d> tree_antibcast(): i am not root: send to %d, tag=%d)", + rank, tree->parent, system_tag + rank); + smpi_mpi_send(buf, count, datatype, tree->parent, system_tag + rank, comm); + } + //every one receives as many messages as it has children + requests = xbt_new(MPI_Request, tree->numChildren); + DEBUG2("<%d> creates %d requests (1 per child)\n", rank, tree->numChildren); + for(i = 0; i < tree->numChildren; i++) { + if(tree->child[i] == -1) { + requests[i] = MPI_REQUEST_NULL; + } else { + DEBUG3("<%d> recv from <%d>, tag=%d", rank, tree->child[i], system_tag + tree->child[i]); + requests[i] = smpi_mpi_irecv(buf, count, datatype, tree->child[i], system_tag + tree->child[i], comm); + } + } + smpi_mpi_waitall(tree->numChildren, requests, MPI_STATUS_IGNORE); + xbt_free(requests); } /** * bcast with a binary, ternary, or whatever tree .. **/ -int nary_tree_bcast(void *buf, int count, MPI_Datatype datatype, int root, - MPI_Comm comm, int arity) -{ -int rank; -int retval; - - rank = smpi_mpi_comm_rank( comm ); - DEBUG2("<%d> entered nary_tree_bcast(), arity=%d",rank,arity); - // arity=2: a binary tree, arity=4 seem to be a good setting (see P2P-MPI)) - proc_tree_t tree = alloc_tree( arity ); - build_tree( rank, comm->size, &tree ); - - retval = tree_bcast( buf, count, datatype, root, comm, tree ); - - free_tree( tree ); - return( retval ); +void nary_tree_bcast(void* buf, int count, MPI_Datatype datatype, int root, MPI_Comm comm, int arity) { + proc_tree_t tree = alloc_tree(arity); + int rank, size; + + rank = smpi_comm_rank(comm); + size = smpi_comm_size(comm); + build_tree(rank, size, &tree); + tree_bcast(buf, count, datatype, root, comm, tree); + free_tree(tree); } - /** - * Barrier + * barrier with a binary, ternary, or whatever tree .. 
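+ * (implemented as an anti-broadcast of a one-byte dummy message up the tree,
+ *  followed by a broadcast of the same dummy back down)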
**/ -int nary_tree_barrier( MPI_Comm comm , int arity) -{ - int rank; - int retval = MPI_SUCCESS; - char dummy='$'; - - rank = smpi_mpi_comm_rank(comm); - // arity=2: a binary tree, arity=4 seem to be a good setting (see P2P-MPI)) - proc_tree_t tree = alloc_tree( arity ); - - build_tree( rank, comm->size, &tree ); - - retval = tree_antibcast( &dummy, 1, MPI_CHAR, 0, comm, tree); - if (MPI_SUCCESS != retval) { - printf("[%s:%d] ** Error: tree_antibcast() returned retval=%d\n",__FILE__,__LINE__,retval); - } - else { - retval = tree_bcast(&dummy, 1, MPI_CHAR, 0, comm, tree); - } - - free_tree( tree ); - return(retval); - - /* checked ok with valgrind --leak-check=full*/ +void nary_tree_barrier(MPI_Comm comm, int arity) { + proc_tree_t tree = alloc_tree( arity ); + int rank, size; + char dummy='$'; + + rank = smpi_comm_rank(comm); + size = smpi_comm_size(comm); + build_tree(rank, size, &tree); + tree_antibcast(&dummy, 1, MPI_CHAR, 0, comm, tree); + tree_bcast(&dummy, 1, MPI_CHAR, 0, comm, tree); + free_tree(tree); } - /** - * Alltoall pairwise + * Alltoall Bruck * - * this algorithm performs size steps (1<=s<=size) and - * at each step s, a process p sends iand receive to.from a unique distinct remote process - * size=5 : s=1: 4->0->1, 0->1->2, 1->2->3, ... - * s=2: 3->0->2, 4->1->3, 0->2->4, 1->3->0 , 2->4->1 - * .... - * Openmpi calls this routine when the message size sent to each rank is greater than 3000 bytes + * Openmpi calls this routine when the message size sent to each rank < 2000 bytes and size < 12 **/ - -int smpi_coll_tuned_alltoall_pairwise (void *sendbuf, int sendcount, MPI_Datatype datatype, - void* recvbuf, int recvcount, MPI_Datatype recvdatatype, MPI_Comm comm) -{ - int retval = MPI_SUCCESS; - int rank; - int size = comm->size; - int step; - int sendto, recvfrom; - int tag_alltoall=999; - void * tmpsend, *tmprecv; - - rank = smpi_mpi_comm_rank(comm); - INFO1("<%d> algorithm alltoall_pairwise() called.\n",rank); - - - /* Perform pairwise exchange - starting from 1 so the local copy is last */ - for (step = 1; step < size+1; step++) { - - /* who do we talk to in this step? */ - sendto = (rank+step)%size; - recvfrom = (rank+size-step)%size; - - /* where from are we sending and where from are we receiving actual data ? */ - tmpsend = (char*)sendbuf+sendto*datatype->size*sendcount; - tmprecv = (char*)recvbuf+recvfrom*recvdatatype->size*recvcount; - - /* send and receive */ - /* in OpenMPI, they use : - err = ompi_coll_tuned_sendrecv( tmpsend, scount, sdtype, sendto, MCA_COLL_BASE_TAG_ALLTOALL, - tmprecv, rcount, rdtype, recvfrom, MCA_COLL_BASE_TAG_ALLTOALL, - comm, MPI_STATUS_IGNORE, rank); - */ - retval = MPI_Sendrecv(tmpsend, sendcount, datatype, sendto, tag_alltoall, - tmprecv, recvcount, recvdatatype, recvfrom, tag_alltoall, - comm, MPI_STATUS_IGNORE); - } - return(retval); -} - -/** - * helper: copy a datatype into another (in the simple case dt1=dt2) -**/ -int copy_dt( void *sbuf, int scount, const MPI_Datatype sdtype, void *rbuf, int rcount, const MPI_Datatype rdtype); -int copy_dt( void *sbuf, int scount, const MPI_Datatype sdtype, - void *rbuf, int rcount, const MPI_Datatype rdtype) -{ - /* First check if we really have something to do */ - if (0 == rcount) { - return ((0 == scount) ? MPI_SUCCESS : MPI_ERR_TRUNCATE); - } - /* If same datatypes used, just copy. */ - if (sdtype == rdtype) { - int count = ( scount < rcount ? scount : rcount ); - memcpy( rbuf, sbuf, sdtype->size*count); - return ((scount > rcount) ? 
MPI_ERR_TRUNCATE : MPI_SUCCESS); - } - /* FIXME: cases - * - If receive packed. - * - If send packed - * to be treated once we have the MPI_Pack things ... - **/ - return( MPI_SUCCESS ); +int smpi_coll_tuned_alltoall_bruck(void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int recvcount, MPI_Datatype recvtype, MPI_Comm comm) { + DEBUG0("coll:tuned:alltoall_intra_bruck ** NOT IMPLEMENTED YET**"); + return MPI_SUCCESS; } /** * Alltoall basic_linear **/ -int smpi_coll_tuned_alltoall_basic_linear(void *sbuf, int scount, MPI_Datatype sdtype, - void* rbuf, int rcount, MPI_Datatype rdtype, MPI_Comm comm) -{ - int i; - int system_alltoall_tag = 888; - int rank; - int size = comm->size; - int err; - char *psnd; - char *prcv; - int nreq = 0; - MPI_Aint lb; - MPI_Aint sndinc; - MPI_Aint rcvinc; - MPI_Request *reqs; - - /* Initialize. */ - rank = smpi_mpi_comm_rank(comm); - DEBUG1("<%d> algorithm alltoall_basic_linear() called.",rank); - - err = smpi_mpi_type_get_extent(sdtype, &lb, &sndinc); - err = smpi_mpi_type_get_extent(rdtype, &lb, &rcvinc); - sndinc *= scount; - rcvinc *= rcount; - /* simple optimization */ - psnd = ((char *) sbuf) + (rank * sndinc); - prcv = ((char *) rbuf) + (rank * rcvinc); - - err = copy_dt( psnd, scount, sdtype, prcv, rcount, rdtype ); - if (MPI_SUCCESS != err) { - return err; - } - - /* If only one process, we're done. */ - if (1 == size) { - return MPI_SUCCESS; - } - - /* Initiate all send/recv to/from others. */ - reqs = xbt_malloc(2*(size-1) * sizeof(smpi_mpi_request_t)); - - prcv = (char *) rbuf; - psnd = (char *) sbuf; - - /* Post all receives first -- a simple optimization */ - for (i = (rank + 1) % size; i != rank; i = (i + 1) % size) { - err = smpi_create_request( prcv + (i * rcvinc), rcount, rdtype, - i, rank, - system_alltoall_tag, - comm, &(reqs[nreq])); - if (MPI_SUCCESS != err) { - DEBUG2("<%d> failed to create request for rank %d",rank,i); - for (i=0;i< nreq;i++) - xbt_mallocator_release(smpi_global->request_mallocator, reqs[i]); - return err; - } - nreq++; - } - /* Now post all sends in reverse order - * - We would like to minimize the search time through message queue - * when messages actually arrive in the order in which they were posted. - * */ - for (i = (rank + size - 1) % size; i != rank; i = (i + size - 1) % size ) { - err = smpi_create_request (psnd + (i * sndinc), scount, sdtype, - rank, i, - system_alltoall_tag, - comm, &(reqs[nreq])); - if (MPI_SUCCESS != err) { - DEBUG2("<%d> failed to create request for rank %d\n",rank,i); - for (i=0;i< nreq;i++) - xbt_mallocator_release(smpi_global->request_mallocator, reqs[i]); - return err; - } - nreq++; - } - - /* Start your engines. This will never return an error. */ - for ( i=0; i< nreq/2; i++ ) { - DEBUG3("<%d> issued irecv request reqs[%d]=%p",rank,i,reqs[i]); - smpi_mpi_irecv( reqs[i] ); - } - for ( i= nreq/2; i issued isend request reqs[%d]=%p",rank,i,reqs[i]); - smpi_mpi_isend( reqs[i] ); - } - - - /* Wait for them all. If there's an error, note that we don't - * care what the error was -- just that there *was* an error. The - * PML will finish all requests, even if one or more of them fail. - * i.e., by the end of this call, all the requests are free-able. - * So free them anyway -- even if there was an error, and return - * the error after we free everything. 
*/ - - DEBUG2("<%d> wait for %d requests",rank,nreq); - // waitall is buggy: use a loop instead for the moment - // err = smpi_mpi_waitall(nreq, reqs, MPI_STATUS_IGNORE); - for (i=0;irequest_mallocator, reqs[i]); - } - xbt_free( reqs ); - return err; +int smpi_coll_tuned_alltoall_basic_linear(void *sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int recvcount, MPI_Datatype recvtype, MPI_Comm comm) { + int system_tag = 888; + int i, rank, size, err, count; + MPI_Aint lb, sendinc, recvinc; + MPI_Request *requests; + + /* Initialize. */ + rank = smpi_comm_rank(comm); + size = smpi_comm_size(comm); + DEBUG1("<%d> algorithm alltoall_basic_linear() called.", rank); + err = smpi_datatype_extent(sendtype, &lb, &sendinc); + err = smpi_datatype_extent(recvtype, &lb, &recvinc); + sendinc *= sendcount; + recvinc *= recvcount; + /* simple optimization */ + err = smpi_datatype_copy(&((char*)sendbuf)[rank * sendinc], sendcount, sendtype, &((char*)recvbuf)[rank * recvinc], recvcount, recvtype); + if(err == MPI_SUCCESS && size > 1) { + /* Initiate all send/recv to/from others. */ + requests = xbt_new(MPI_Request, 2 * (size - 1)); + /* Post all receives first -- a simple optimization */ + count = 0; + for(i = (rank + 1) % size; i != rank; i = (i + 1) % size) { + requests[count] = smpi_mpi_irecv(&((char*)recvbuf)[i * recvinc], recvcount, recvtype, i, system_tag, comm); + count++; + } + /* Now post all sends in reverse order + * - We would like to minimize the search time through message queue + * when messages actually arrive in the order in which they were posted. + * TODO: check the previous assertion + */ + for(i = (rank + size - 1) % size; i != rank; i = (i + size - 1) % size ) { + requests[count] = smpi_mpi_isend(&((char*)sendbuf)[i * sendinc], sendcount, sendtype, i, system_tag, comm); + count++; + } + /* Wait for them all. If there's an error, note that we don't + * care what the error was -- just that there *was* an error. The + * PML will finish all requests, even if one or more of them fail. + * i.e., by the end of this call, all the requests are free-able. + * So free them anyway -- even if there was an error, and return + * the error after we free everything. + */ + DEBUG2("<%d> wait for %d requests", rank, count); + smpi_mpi_waitall(count, requests, MPI_STATUS_IGNORE); + xbt_free(requests); + } + return err; } - /** - * Alltoall Bruck + * Alltoall pairwise * - * Openmpi calls this routine when the message size sent to each rank < 2000 bytes and size < 12 + * this algorithm performs size steps (1<=s<=size) and + * at each step s, a process p sends iand receive to.from a unique distinct remote process + * size=5 : s=1: 4->0->1, 0->1->2, 1->2->3, ... + * s=2: 3->0->2, 4->1->3, 0->2->4, 1->3->0 , 2->4->1 + * .... 
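+ * (i.e. at step s, process p sends to (p + s) % size and receives from (p - s + size) % size)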
+ * Openmpi calls this routine when the message size sent to each rank is greater than 3000 bytes **/ - - -int smpi_coll_tuned_alltoall_bruck(void *sendbuf, int sendcount, MPI_Datatype sdtype, - void* recvbuf, int recvcount, MPI_Datatype rdtype, MPI_Comm comm) -{ -/* int size = comm->size; - int i, k, line = -1; - int sendto, recvfrom, distance, *displs=NULL, *blen=NULL; - int maxpacksize, packsize, position; - char * tmpbuf=NULL, *packbuf=NULL; - ptrdiff_t lb, sext, rext; - int err = 0; - int weallocated = 0; - MPI_Datatype iddt; - - rank = smpi_mpi_comm_rank(comm); -*/ - INFO0("coll:tuned:alltoall_intra_bruck ** NOT IMPLEMENTED YET**"); -/* - displs = xbt_malloc(size*sizeof(int)); - blen = xbt_malloc(size*sizeof(int)); - - weallocated = 1; -*/ - /* Prepare for packing data */ -/* - err = MPI_Pack_size( scount*size, sdtype, comm, &maxpacksize ); - if (err != MPI_SUCCESS) { } -*/ - /* pack buffer allocation */ -/* packbuf = (char*) malloc((unsigned) maxpacksize); - if (packbuf == NULL) { } -*/ - /* tmp buffer allocation for message data */ -/* tmpbuf = xbt_malloc(scount*size*sext); - if (tmpbuf == NULL) { } -*/ - - /* Step 1 - local rotation - shift up by rank */ -/* err = ompi_ddt_copy_content_same_ddt (sdtype, (int32_t) ((size-rank)*scount), - tmpbuf, ((char*)sbuf)+rank*scount*sext); - if (err<0) { - line = __LINE__; err = -1; goto err_hndl; - } - - if (rank != 0) { - err = ompi_ddt_copy_content_same_ddt (sdtype, (int32_t) (rank*scount), - tmpbuf+(size-rank)*scount*sext, (char*)sbuf); - if (err<0) { - line = __LINE__; err = -1; goto err_hndl; - } - } -*/ - /* perform communication step */ -/* for (distance = 1; distance < size; distance<<=1) { -*/ - /* send data to "sendto" */ -/* sendto = (rank+distance)%size; - recvfrom = (rank-distance+size)%size; - packsize = 0; - k = 0; -*/ - /* create indexed datatype */ -// for (i = 1; i < size; i++) { -// if ((i&distance) == distance) { -// displs[k] = i*scount; blen[k] = scount; -// k++; -// } -// } - /* Set indexes and displacements */ -// err = MPI_Type_indexed(k, blen, displs, sdtype, &iddt); -// if (err != MPI_SUCCESS) { line = __LINE__; goto err_hndl; } -// /* Commit the new datatype */ -/// err = MPI_Type_commit(&iddt); -// if (err != MPI_SUCCESS) { line = __LINE__; goto err_hndl; } - - /* have the new distribution ddt, pack and exchange data */ -// err = MPI_Pack(tmpbuf, 1, iddt, packbuf, maxpacksize, &packsize, comm); -// if (err != MPI_SUCCESS) { line = __LINE__; goto err_hndl; } - - /* Sendreceive */ -// err = ompi_coll_tuned_sendrecv ( packbuf, packsize, MPI_PACKED, sendto, -// MCA_COLL_BASE_TAG_ALLTOALL, -// rbuf, packsize, MPI_PACKED, recvfrom, -// MCA_COLL_BASE_TAG_ALLTOALL, -// comm, MPI_STATUS_IGNORE, rank); -// if (err != MPI_SUCCESS) { line = __LINE__; goto err_hndl; } - - /* Unpack data from rbuf to tmpbuf */ -// position = 0; -// err = MPI_Unpack(rbuf, packsize, &position, -// tmpbuf, 1, iddt, comm); -// if (err != MPI_SUCCESS) { line = __LINE__; goto err_hndl; } - - /* free ddt */ -// err = MPI_Type_free(&iddt); -// if (err != MPI_SUCCESS) { line = __LINE__; goto err_hndl; } -// } /* end of for (distance = 1... 
*/ - - /* Step 3 - local rotation - */ -// for (i = 0; i < size; i++) { - -// err = ompi_ddt_copy_content_same_ddt (rdtype, (int32_t) rcount, -// ((char*)rbuf)+(((rank-i+size)%size)*rcount*rext), -// tmpbuf+i*rcount*rext); -// -// if (err<0) { -// line = __LINE__; err = -1; goto err_hndl; -// } -// } - - /* Step 4 - clean up */ -/* if (tmpbuf != NULL) free(tmpbuf); - if (packbuf != NULL) free(packbuf); - if (weallocated) { - if (displs != NULL) free(displs); - if (blen != NULL) free(blen); - } - return OMPI_SUCCESS; - -err_hndl: - OPAL_OUTPUT((ompi_coll_tuned_stream,"%s:%4d\tError occurred %d, rank %2d", __FILE__,line,err,rank)); - if (tmpbuf != NULL) free(tmpbuf); - if (packbuf != NULL) free(packbuf); - if (weallocated) { - if (displs != NULL) free(displs); - if (blen != NULL) free(blen); - } - return err; - */ - return -1; /* FIXME: to be changed*/ -} - -static void print_buffer_int(void *buf, int len, char *msg, int rank); -static void print_buffer_int(void *buf, int len, char *msg, int rank) -{ - int tmp, *v; - fprintf(stderr,"**<%d> %s (#%d): ", rank, msg,len); - for (tmp = 0; tmp < len; tmp++) { - v = buf; - fprintf(stderr,"[%d (%p)]", v[tmp],v+tmp); +int smpi_coll_tuned_alltoall_pairwise(void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int recvcount, MPI_Datatype recvtype, MPI_Comm comm) { + int system_tag = 999; + int rank, size, step, sendto, recvfrom, sendsize, recvsize; + + rank = smpi_comm_rank(comm); + size = smpi_comm_size(comm); + DEBUG1("<%d> algorithm alltoall_pairwise() called.", rank); + sendsize = smpi_datatype_size(sendtype); + recvsize = smpi_datatype_size(recvtype); + /* Perform pairwise exchange - starting from 1 so the local copy is last */ + for(step = 1; step < size + 1; step++) { + /* who do we talk to in this step? */ + sendto = (rank+step)%size; + recvfrom = (rank+size-step)%size; + /* send and receive */ + smpi_mpi_sendrecv(&((char*)sendbuf)[sendto * sendsize * sendcount], sendcount, sendtype, sendto, system_tag, &((char*)recvbuf)[recvfrom * recvsize * recvcount], recvcount, recvtype, recvfrom, system_tag, comm, MPI_STATUS_IGNORE); } - fprintf(stderr,"\n"); - free(msg); -} - - - -/** - * alltoallv basic - **/ - -int smpi_coll_basic_alltoallv(void *sbuf, int *scounts, int *sdisps, MPI_Datatype sdtype, - void *rbuf, int *rcounts, int *rdisps, MPI_Datatype rdtype, - MPI_Comm comm) { - - int i; - int system_alltoallv_tag = 889; - int rank; - int size = comm->size; - int err; - char *psnd; - char *prcv; - //int nreq = 0; - int rreq = 0; - int sreq = 0; - MPI_Aint lb; - MPI_Aint sndextent; - MPI_Aint rcvextent; - MPI_Request *reqs; - - /* Initialize. */ - rank = smpi_mpi_comm_rank(comm); - DEBUG1("<%d> algorithm basic_alltoallv() called.",rank); - - err = smpi_mpi_type_get_extent(sdtype, &lb, &sndextent); - err = smpi_mpi_type_get_extent(rdtype, &lb, &rcvextent); - - psnd = (char *)sbuf; - //print_buffer_int(psnd,size*size,xbt_strdup("sbuff"),rank); - - /* copy the local sbuf to rbuf when it's me */ - psnd = ((char *) sbuf) + (sdisps[rank] * sndextent); - prcv = ((char *) rbuf) + (rdisps[rank] * rcvextent); - - if (0 != scounts[rank]) { - err = copy_dt( psnd, scounts[rank], sdtype, prcv, rcounts[rank], rdtype ); - if (MPI_SUCCESS != err) { - return err; - } - } - - /* If only one process, we're done. */ - if (1 == size) { - return MPI_SUCCESS; - } - - /* Initiate all send/recv to/from others. 
*/ - reqs = xbt_malloc(2*(size-1) * sizeof(smpi_mpi_request_t)); - - - /* Create all receives that will be posted first */ - for (i = 0; i < size; ++i) { - if (i == rank || 0 == rcounts[i]) { - DEBUG3("<%d> skip req creation i=%d,rcounts[i]=%d",rank,i, rcounts[i]); - continue; - } - prcv = ((char *) rbuf) + (rdisps[i] * rcvextent); - - err = smpi_create_request( prcv, rcounts[i], rdtype, - i, rank, - system_alltoallv_tag, - comm, &(reqs[rreq])); - if (MPI_SUCCESS != err) { - DEBUG2("<%d> failed to create request for rank %d",rank,i); - for (i=0;i< rreq;i++) - xbt_mallocator_release(smpi_global->request_mallocator, reqs[i]); - return err; - } - rreq++; - } - DEBUG2("<%d> %d irecv reqs created",rank,rreq); - /* Now create all sends */ - for (i = 0; i < size; ++i) { - if (i == rank || 0 == scounts[i]) { - DEBUG3("<%d> skip req creation i=%d,scounts[i]=%d",rank,i, scounts[i]); - continue; - } - psnd = ((char *) sbuf) + (sdisps[i] * sndextent); - - //fprintf(stderr,"<%d> send %d elems to <%d>\n",rank,scounts[i],i); - //print_buffer_int(psnd,scounts[i],xbt_strdup("sbuff part"),rank); - err = smpi_create_request (psnd, scounts[i], sdtype, - rank, i, - system_alltoallv_tag, - comm, &(reqs[rreq+sreq])); - if (MPI_SUCCESS != err) { - DEBUG2("<%d> failed to create request for rank %d\n",rank,i); - for (i=0;i< rreq+sreq;i++) - xbt_mallocator_release(smpi_global->request_mallocator, reqs[i]); - return err; - } - sreq++; - } - DEBUG2("<%d> %d isend reqs created",rank,sreq); - - /* Start your engines. This will never return an error. */ - for ( i=0; i< rreq; i++ ) { - DEBUG3("<%d> issued irecv request reqs[%d]=%p",rank,i,reqs[i]); - smpi_mpi_irecv( reqs[i] ); - } - DEBUG3("<%d> for (i=%d;i<%d)",rank,rreq,sreq); - for ( i=rreq; i< rreq+sreq; i++ ) { - DEBUG3("<%d> issued isend request reqs[%d]=%p",rank,i,reqs[i]); - smpi_mpi_isend( reqs[i] ); - } - - - /* Wait for them all. If there's an error, note that we don't - * care what the error was -- just that there *was* an error. The - * PML will finish all requests, even if one or more of them fail. - * i.e., by the end of this call, all the requests are free-able. - * So free them anyway -- even if there was an error, and return - * the error after we free everything. 
*/ - - DEBUG2("<%d> wait for %d requests",rank,rreq+sreq); - // waitall is buggy: use a loop instead for the moment - // err = smpi_mpi_waitall(nreq, reqs, MPI_STATUS_IGNORE); - for (i=0;i< rreq+sreq;i++) { - err = smpi_mpi_wait( reqs[i], MPI_STATUS_IGNORE); - } - - /* Free the reqs */ - /* nreq might be < 2*(size-1) since some request creations are skipped */ - for (i=0;i< rreq+sreq;i++) { - xbt_mallocator_release(smpi_global->request_mallocator, reqs[i]); - } - xbt_free( reqs ); - return err; + return MPI_SUCCESS; } - - - -/** - * ----------------------------------------------------------------------------------------------------- - * example usage - * ----------------------------------------------------------------------------------------------------- - **/ -/* - * int main() { - - int rank; - int size=12; - - proc_tree_t tree; - for (rank=0;rank algorithm basic_alltoallv() called.", rank); + err = smpi_datatype_extent(sendtype, &lb, &sendextent); + err = smpi_datatype_extent(recvtype, &lb, &recvextent); + /* Local copy from self */ + err = smpi_datatype_copy(&((char*)sendbuf)[senddisps[rank] * sendextent], sendcounts[rank], sendtype, &((char*)recvbuf)[recvdisps[rank] * recvextent], recvcounts[rank], recvtype); + if(err == MPI_SUCCESS && size > 1) { + /* Initiate all send/recv to/from others. */ + requests = xbt_new(MPI_Request, 2 * (size - 1)); + rcount = 0; + /* Create all receives that will be posted first */ + for(i = 0; i < size; ++i) { + if(i == rank || recvcounts[i] == 0) { + DEBUG3("<%d> skip request creation [src = %d, recvcounts[src] = %d]", rank, i, recvcounts[i]); + continue; } - printf("-------------- bcast ----------\n"); - for (rank=0;rank %d irecv requests created", rank, rcount); + scount = rcount; + /* Now create all sends */ + for(i = 0; i < size; ++i) { + if(i == rank || sendcounts[i] == 0) { + DEBUG3("<%d> skip request creation [dst = %d, sendcounts[dst] = %d]", rank, i, sendcounts[i]); + continue; } - - + requests[scount] = smpi_mpi_isend(&((char*)sendbuf)[senddisps[i] * sendextent], sendcounts[i], sendtype, i, system_tag, comm); + scount++; + } + DEBUG2("<%d> %d isend requests created", rank, scount); + /* Wait for them all. If there's an error, note that we don't + * care what the error was -- just that there *was* an error. The + * PML will finish all requests, even if one or more of them fail. + * i.e., by the end of this call, all the requests are free-able. + * So free them anyway -- even if there was an error, and return + * the error after we free everything. 
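+     * FIXME: scount already includes rcount at this point, so rcount + scount
+     * over-counts the posted requests (and may exceed the 2 * (size - 1) entries
+     * allocated above).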
+ */ + DEBUG2("<%d> wait for %d requests", rank, rcount + scount); + smpi_mpi_waitall(rcount + scount, requests, MPI_STATUS_IGNORE); + xbt_free(requests); + } + return err; } -*/ - - - diff --git a/src/smpi/smpi_coll_private.h b/src/smpi/smpi_coll_private.h index 31a0bd7b06..f062c7c684 100644 --- a/src/smpi/smpi_coll_private.h +++ b/src/smpi/smpi_coll_private.h @@ -7,23 +7,3 @@ * **/ #include "private.h" - -int nary_tree_bcast(void *buf, int count, MPI_Datatype datatype, int root, MPI_Comm comm,int arity); -int nary_tree_barrier( MPI_Comm comm, int arity ); - -int smpi_coll_tuned_alltoall_bruck(void *sbuf, int scount, MPI_Datatype sdtype, - void* rbuf, int rcount, MPI_Datatype rdtype, - MPI_Comm comm); - -int smpi_coll_tuned_alltoall_pairwise (void *sendbuf, int sendcount, MPI_Datatype datatype, - void* recvbuf, int recvcount, MPI_Datatype recvdatatype, - MPI_Comm comm); - -int smpi_coll_tuned_alltoall_basic_linear(void *sbuf, int scount, MPI_Datatype sdtype, - void* rbuf, int rcount, MPI_Datatype rdtype, MPI_Comm comm); - -int smpi_coll_basic_alltoallv(void *sendbuf, int *scounts, int *sdisps, MPI_Datatype datatype, - void *recvbuf, int *rcounts, int *rdisps, MPI_Datatype recvtype, - MPI_Comm comm); - - diff --git a/src/smpi/smpi_comm.c b/src/smpi/smpi_comm.c new file mode 100644 index 0000000000..59aa85d390 --- /dev/null +++ b/src/smpi/smpi_comm.c @@ -0,0 +1,34 @@ +#include "private.h" + +XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_comm, smpi, + "Logging specific to SMPI (comm)"); + +typedef struct s_smpi_mpi_communicator { + MPI_Group group; +} s_smpi_mpi_communicator_t; + +MPI_Comm smpi_comm_new(MPI_Group group) { + MPI_Comm comm; + + comm = xbt_new(s_smpi_mpi_communicator_t, 1); + comm->group = group; + smpi_group_use(comm->group); + return comm; +} + +void smpi_comm_destroy(MPI_Comm comm) { + smpi_group_destroy(comm->group); + xbt_free(comm); +} + +MPI_Group smpi_comm_group(MPI_Comm comm) { + return comm->group; +} + +int smpi_comm_size(MPI_Comm comm) { + return smpi_group_size(smpi_comm_group(comm)); +} + +int smpi_comm_rank(MPI_Comm comm) { + return smpi_group_rank(smpi_comm_group(comm), smpi_process_index()); +} diff --git a/src/smpi/smpi_global.c b/src/smpi/smpi_global.c index 4f60f00bf9..66a1a1a839 100644 --- a/src/smpi/smpi_global.c +++ b/src/smpi/smpi_global.c @@ -1,4 +1,4 @@ -#include +#include #include "private.h" #include "smpi_mpi_dt_private.h" @@ -8,338 +8,150 @@ XBT_LOG_NEW_CATEGORY(smpi, "All SMPI categories"); XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_kernel, smpi, "Logging specific to SMPI (kernel)"); -smpi_global_t smpi_global = NULL; +typedef struct s_smpi_process_data { + int index; + xbt_fifo_t pending_sent; + xbt_fifo_t pending_recv; + xbt_os_timer_t timer; +} s_smpi_process_data_t; -void *smpi_request_new(void); +static smpi_process_data_t* process_data = NULL; +static int process_count = 0; -void *smpi_request_new() -{ - smpi_mpi_request_t request = xbt_new(s_smpi_mpi_request_t, 1); - - request->buf = NULL; - request->completed = 0; - request->consumed = 0; - request->mutex = SIMIX_mutex_init(); - request->cond = SIMIX_cond_init(); - request->data = NULL; - request->forward = 0; +MPI_Comm MPI_COMM_WORLD = MPI_COMM_NULL; - return request; +smpi_process_data_t smpi_process_data(void) { + return SIMIX_process_get_data(SIMIX_process_self()); } -void smpi_request_free(void *pointer); - -void smpi_request_free(void *pointer) -{ - - smpi_mpi_request_t request = pointer; - - SIMIX_cond_destroy(request->cond); - SIMIX_mutex_destroy(request->mutex); - xbt_free(request); - - return; 
+smpi_process_data_t smpi_process_remote_data(int index) { + return process_data[index]; } -void smpi_request_reset(void *pointer); - -void smpi_request_reset(void *pointer) -{ - smpi_mpi_request_t request = pointer; - - request->buf = NULL; - request->completed = 0; - request->consumed = 0; - request->data = NULL; - request->forward = 0; - - return; +int smpi_process_count(void) { + return process_count; } +int smpi_process_index(void) { + smpi_process_data_t data = smpi_process_data(); -void *smpi_message_new(void); - -void *smpi_message_new() -{ - smpi_received_message_t message = xbt_new(s_smpi_received_message_t, 1); - message->buf = NULL; - return message; + return data->index; } -void smpi_message_free(void *pointer); +xbt_os_timer_t smpi_process_timer(void) { + smpi_process_data_t data = smpi_process_data(); -void smpi_message_free(void *pointer) -{ - xbt_free(pointer); - return; + return data->timer; } -void smpi_message_reset(void *pointer); - -void smpi_message_reset(void *pointer) -{ - smpi_received_message_t message = pointer; - message->buf = NULL; - return; -} +void smpi_process_post_send(MPI_Comm comm, MPI_Request request) { + int index = smpi_group_index(smpi_comm_group(comm), request->dst); + smpi_process_data_t data = smpi_process_remote_data(index); + xbt_fifo_item_t item; + MPI_Request req; -int smpi_create_request(void *buf, int count, smpi_mpi_datatype_t datatype, - int src, int dst, int tag, - smpi_mpi_communicator_t comm, - smpi_mpi_request_t * requestptr) -{ - int retval = MPI_SUCCESS; - - smpi_mpi_request_t request = NULL; - - // parameter checking prob belongs in smpi_mpi, but this is less repeat code - if (NULL == buf) { - retval = MPI_ERR_INTERN; - } else if (0 > count) { - retval = MPI_ERR_COUNT; - } else if (NULL == datatype) { - retval = MPI_ERR_TYPE; - } else if (MPI_ANY_SOURCE != src && (0 > src || comm->size <= src)) { - retval = MPI_ERR_RANK; - } else if (0 > dst || comm->size <= dst) { - retval = MPI_ERR_RANK; - } else if (MPI_ANY_TAG != tag && 0 > tag) { - retval = MPI_ERR_TAG; - } else if (NULL == comm) { - retval = MPI_ERR_COMM; - } else if (NULL == requestptr) { - retval = MPI_ERR_ARG; - } else { - request = xbt_mallocator_get(smpi_global->request_mallocator); - request->comm = comm; - request->src = src; - request->dst = dst; - request->tag = tag; - request->buf = buf; - request->datatype = datatype; - request->count = count; - - *requestptr = request; + DEBUG4("isend for request %p [src = %d, dst = %d, tag = %d]", + request, request->src, request->dst, request->tag); + xbt_fifo_foreach(data->pending_recv, item, req, MPI_Request) { + if(req->comm == request->comm + && (req->src == MPI_ANY_SOURCE || req->src == request->src) + && (req->tag == MPI_ANY_TAG || req->tag == request->tag)){ + DEBUG4("find matching request %p [src = %d, dst = %d, tag = %d]", + req, req->src, req->dst, req->tag); + xbt_fifo_remove_item(data->pending_recv, item); + /* Materialize the *_ANY_* fields from corresponding irecv request */ + req->src = request->src; + req->tag = request->tag; + request->rdv = req->rdv; + return; + } else { + DEBUG4("not matching request %p [src = %d, dst = %d, tag = %d]", + req, req->src, req->dst, req->tag); + } } - return retval; -} - -/* FIXME: understand what they do and put the prototypes in a header file (live in smpi_base.c) */ -void smpi_mpi_land_func(void *a, void *b, int *length, - MPI_Datatype * datatype); -void smpi_mpi_sum_func(void *a, void *b, int *length, - MPI_Datatype * datatype); -void smpi_mpi_prod_func(void *a, void *b, int 
*length, - MPI_Datatype * datatype); -void smpi_mpi_min_func(void *a, void *b, int *length, - MPI_Datatype * datatype); -void smpi_mpi_max_func(void *a, void *b, int *length, - MPI_Datatype * datatype); - -void smpi_init() -{ - smpi_global = xbt_new(s_smpi_global_t, 1); + request->rdv = SIMIX_rdv_create(NULL); + xbt_fifo_push(data->pending_sent, request); +} + +void smpi_process_post_recv(MPI_Request request) { + smpi_process_data_t data = smpi_process_data(); + xbt_fifo_item_t item; + MPI_Request req; + + DEBUG4("irecv for request %p [src = %d, dst = %d, tag = %d]", + request, request->src, request->dst, request->tag); + xbt_fifo_foreach(data->pending_sent, item, req, MPI_Request) { + if(req->comm == request->comm + && (request->src == MPI_ANY_SOURCE || req->src == request->src) + && (request->tag == MPI_ANY_TAG || req->tag == request->tag)){ + DEBUG4("find matching request %p [src = %d, dst = %d, tag = %d]", + req, req->src, req->dst, req->tag); + xbt_fifo_remove_item(data->pending_sent, item); + /* Materialize the *_ANY_* fields from the irecv request */ + request->src = req->src; + request->tag = req->tag; + request->rdv = req->rdv; + return; + } else { + DEBUG4("not matching request %p [src = %d, dst = %d, tag = %d]", + req, req->src, req->dst, req->tag); + } + } + request->rdv = SIMIX_rdv_create(NULL); + xbt_fifo_push(data->pending_recv, request); } -void smpi_global_init() -{ +void smpi_global_init(void) { int i; - - /* Connect our log channels: that must be done manually under windows */ -#ifdef XBT_LOG_CONNECT - XBT_LOG_CONNECT(smpi_base, smpi); - XBT_LOG_CONNECT(smpi_bench, smpi); - XBT_LOG_CONNECT(smpi_kernel, smpi); - XBT_LOG_CONNECT(smpi_mpi, smpi); - XBT_LOG_CONNECT(smpi_receiver, smpi); - XBT_LOG_CONNECT(smpi_sender, smpi); - XBT_LOG_CONNECT(smpi_util, smpi); -#endif - - // config vars - smpi_global->reference_speed = - xbt_cfg_get_double(_surf_cfg_set, "reference_speed"); - - // mallocators - smpi_global->request_mallocator = - xbt_mallocator_new(SMPI_REQUEST_MALLOCATOR_SIZE, smpi_request_new, - smpi_request_free, smpi_request_reset); - smpi_global->message_mallocator = - xbt_mallocator_new(SMPI_MESSAGE_MALLOCATOR_SIZE, smpi_message_new, - smpi_message_free, smpi_message_reset); - - smpi_global->process_count = SIMIX_process_count(); - DEBUG1("There is %d processes", smpi_global->process_count); - - // sender/receiver processes - smpi_global->main_processes = - xbt_new(smx_process_t, smpi_global->process_count); - - // timers - smpi_global->timer = xbt_os_timer_new(); - smpi_global->timer_cond = SIMIX_cond_init(); - - smpi_global->do_once_duration_nodes = NULL; - smpi_global->do_once_duration = NULL; - smpi_global->do_once_mutex = SIMIX_mutex_init(); - - - smpi_mpi_global = xbt_new(s_smpi_mpi_global_t, 1); - - // global communicator - smpi_mpi_global->mpi_comm_world = xbt_new(s_smpi_mpi_communicator_t, 1); - smpi_mpi_global->mpi_comm_world->size = smpi_global->process_count; - smpi_mpi_global->mpi_comm_world->barrier_count = 0; - smpi_mpi_global->mpi_comm_world->barrier_mutex = SIMIX_mutex_init(); - smpi_mpi_global->mpi_comm_world->barrier_cond = SIMIX_cond_init(); - smpi_mpi_global->mpi_comm_world->rank_to_index_map = - xbt_new(int, smpi_global->process_count); - smpi_mpi_global->mpi_comm_world->index_to_rank_map = - xbt_new(int, smpi_global->process_count); - for (i = 0; i < smpi_global->process_count; i++) { - smpi_mpi_global->mpi_comm_world->rank_to_index_map[i] = i; - smpi_mpi_global->mpi_comm_world->index_to_rank_map[i] = i; + MPI_Group group; + + process_count = 
SIMIX_process_count(); + process_data = xbt_new(smpi_process_data_t, process_count); + for(i = 0; i < process_count; i++) { + process_data[i] = xbt_new(s_smpi_process_data_t, 1); + process_data[i]->index = i; + process_data[i]->pending_sent = xbt_fifo_new(); + process_data[i]->pending_recv = xbt_fifo_new(); + process_data[i]->timer = xbt_os_timer_new(); } - - // mpi datatypes - smpi_mpi_global->mpi_byte = xbt_new(s_smpi_mpi_datatype_t, 1); /* we can think of it as a placeholder for value*/ - smpi_mpi_global->mpi_byte->size = (size_t) 1; - smpi_mpi_global->mpi_byte->lb = (ptrdiff_t) 0; - smpi_mpi_global->mpi_byte->ub = smpi_mpi_global->mpi_byte->lb + smpi_mpi_global->mpi_byte->size; - smpi_mpi_global->mpi_byte->flags = DT_FLAG_BASIC; - - smpi_mpi_global->mpi_char = xbt_new(s_smpi_mpi_datatype_t, 1); - smpi_mpi_global->mpi_char->size = (size_t) 1; - smpi_mpi_global->mpi_char->lb = (ptrdiff_t) 0; //&(smpi_mpi_global->mpi_char); - smpi_mpi_global->mpi_char->ub = smpi_mpi_global->mpi_char->lb + smpi_mpi_global->mpi_char->size; - smpi_mpi_global->mpi_char->flags = DT_FLAG_BASIC; - - smpi_mpi_global->mpi_int = xbt_new(s_smpi_mpi_datatype_t, 1); - smpi_mpi_global->mpi_int->size = sizeof(int); - smpi_mpi_global->mpi_int->lb = (ptrdiff_t) 0; // &(smpi_mpi_global->mpi_int); - smpi_mpi_global->mpi_int->ub = smpi_mpi_global->mpi_int->lb + smpi_mpi_global->mpi_int->size; - smpi_mpi_global->mpi_int->flags = DT_FLAG_BASIC; - - smpi_mpi_global->mpi_float = xbt_new(s_smpi_mpi_datatype_t, 1); - smpi_mpi_global->mpi_float->size = sizeof(float); - smpi_mpi_global->mpi_float->lb = (ptrdiff_t) 0; // &(smpi_mpi_global->mpi_float); - smpi_mpi_global->mpi_float->ub = smpi_mpi_global->mpi_float->lb + smpi_mpi_global->mpi_float->size; - smpi_mpi_global->mpi_float->flags = DT_FLAG_BASIC; - - smpi_mpi_global->mpi_double = xbt_new(s_smpi_mpi_datatype_t, 1); - smpi_mpi_global->mpi_double->size = sizeof(double); - smpi_mpi_global->mpi_double->lb = (ptrdiff_t) 0; //&(smpi_mpi_global->mpi_float); - smpi_mpi_global->mpi_double->ub = smpi_mpi_global->mpi_double->lb + smpi_mpi_global->mpi_double->size; - smpi_mpi_global->mpi_double->flags = DT_FLAG_BASIC; - - // mpi operations - smpi_mpi_global->mpi_land = xbt_new(s_smpi_mpi_op_t, 1); - smpi_mpi_global->mpi_land->func = smpi_mpi_land_func; - smpi_mpi_global->mpi_sum = xbt_new(s_smpi_mpi_op_t, 1); - smpi_mpi_global->mpi_sum->func = smpi_mpi_sum_func; - smpi_mpi_global->mpi_prod = xbt_new(s_smpi_mpi_op_t, 1); - smpi_mpi_global->mpi_prod->func = smpi_mpi_prod_func; - smpi_mpi_global->mpi_min = xbt_new(s_smpi_mpi_op_t, 1); - smpi_mpi_global->mpi_min->func = smpi_mpi_min_func; - smpi_mpi_global->mpi_max = xbt_new(s_smpi_mpi_op_t, 1); - smpi_mpi_global->mpi_max->func = smpi_mpi_max_func; - -} - -void smpi_global_destroy() -{ - smpi_do_once_duration_node_t curr, next; - - // processes - xbt_free(smpi_global->main_processes); - - // mallocators - xbt_mallocator_free(smpi_global->request_mallocator); - xbt_mallocator_free(smpi_global->message_mallocator); - - xbt_os_timer_free(smpi_global->timer); - SIMIX_cond_destroy(smpi_global->timer_cond); - - for (curr = smpi_global->do_once_duration_nodes; NULL != curr; curr = next) { - next = curr->next; - xbt_free(curr->file); - xbt_free(curr); + group = smpi_group_new(process_count); + MPI_COMM_WORLD = smpi_comm_new(group); + for(i = 0; i < process_count; i++) { + smpi_group_set_mapping(group, i, i); } - - SIMIX_mutex_destroy(smpi_global->do_once_mutex); - - xbt_free(smpi_global); - smpi_global = NULL; - - /* free smpi_mpi_global */ - 
SIMIX_mutex_destroy(smpi_mpi_global->mpi_comm_world->barrier_mutex); - SIMIX_cond_destroy(smpi_mpi_global->mpi_comm_world->barrier_cond); - xbt_free(smpi_mpi_global->mpi_comm_world->rank_to_index_map); - xbt_free(smpi_mpi_global->mpi_comm_world); - - xbt_free(smpi_mpi_global->mpi_byte); - xbt_free(smpi_mpi_global->mpi_char); - xbt_free(smpi_mpi_global->mpi_int); - xbt_free(smpi_mpi_global->mpi_double); - xbt_free(smpi_mpi_global->mpi_float); - - xbt_free(smpi_mpi_global->mpi_land); - xbt_free(smpi_mpi_global->mpi_sum); - xbt_free(smpi_mpi_global->mpi_prod); - xbt_free(smpi_mpi_global->mpi_max); - xbt_free(smpi_mpi_global->mpi_min); - - xbt_free(smpi_mpi_global); - } -int smpi_process_index() -{ - smpi_process_data_t pdata = - (smpi_process_data_t) SIMIX_process_get_data(SIMIX_process_self()); - return pdata->index; -} - -smx_mutex_t smpi_process_mutex() -{ - smpi_process_data_t pdata = - (smpi_process_data_t) SIMIX_process_get_data(SIMIX_process_self()); - return pdata->mutex; -} - -smx_cond_t smpi_process_cond() -{ - smpi_process_data_t pdata = - (smpi_process_data_t) SIMIX_process_get_data(SIMIX_process_self()); - return pdata->cond; -} +void smpi_global_destroy(void) { + int count = smpi_process_count(); + int i; -static void smpi_cfg_cb_host_speed(const char *name, int pos) -{ - smpi_global->reference_speed = - xbt_cfg_get_double_at(_surf_cfg_set, name, pos); + smpi_comm_destroy(MPI_COMM_WORLD); + MPI_COMM_WORLD = MPI_COMM_NULL; + for(i = 0; i < count; i++) { + xbt_os_timer_free(process_data[i]->timer); + xbt_fifo_free(process_data[i]->pending_recv); + xbt_fifo_free(process_data[i]->pending_sent); + xbt_free(process_data[i]); + } + xbt_free(process_data); + process_data = NULL; } -int smpi_run_simulation(int *argc, char **argv) +int main(int argc, char **argv) { srand(SMPI_RAND_SEED); double default_reference_speed = 20000.0; xbt_cfg_register(&_surf_cfg_set, "reference_speed", "Power of the host running the simulation (in flop/s). Used to bench the operations.", - xbt_cfgelm_double, &default_reference_speed, 1, 1, - smpi_cfg_cb_host_speed, NULL); + xbt_cfgelm_double, &default_reference_speed, 1, 1, NULL, NULL); int default_display_timing = 0; xbt_cfg_register(&_surf_cfg_set, "display_timing", "Boolean indicating whether we should display the timing after simulation.", xbt_cfgelm_int, &default_display_timing, 1, 1, NULL, NULL); - // Allocate minimal things before parsing command line arguments - smpi_init(); - - SIMIX_global_init(argc, argv); - + SIMIX_global_init(&argc, argv); // parse the platform file: get the host list SIMIX_create_environment(argv[1]); @@ -347,7 +159,6 @@ int smpi_run_simulation(int *argc, char **argv) SIMIX_function_register("smpi_simulated_main", smpi_simulated_main); SIMIX_launch_application(argv[2]); - // must initialize globals between creating environment and launching app.... 
smpi_global_init(); /* Clean IO before the run */ @@ -357,18 +168,11 @@ int smpi_run_simulation(int *argc, char **argv) while (SIMIX_solve(NULL, NULL) != -1.0); - // FIXME: cleanup incomplete - if (xbt_cfg_get_int(_surf_cfg_set, "display_timing")) INFO1("simulation time %g", SIMIX_get_clock()); smpi_global_destroy(); - SIMIX_clean(); + SIMIX_clean(); return 0; } - -int main(int argc, char** argv) -{ - return smpi_run_simulation(&argc, argv); -} diff --git a/src/smpi/smpi_group.c b/src/smpi/smpi_group.c new file mode 100644 index 0000000000..0d7b4f8a0f --- /dev/null +++ b/src/smpi/smpi_group.c @@ -0,0 +1,109 @@ +#include "private.h" + +XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_group, smpi, + "Logging specific to SMPI (group)"); + +typedef struct s_smpi_mpi_group { + int size; + int* rank_to_index_map; + int* index_to_rank_map; + int refcount; +} s_smpi_mpi_group_t; + +static s_smpi_mpi_group_t mpi_MPI_GROUP_EMPTY = { + 0, /* size */ + NULL, /* rank_to_index_map */ + NULL, /* index_to_rank_map */ + 1, /* refcount: start > 0 so that this group never gets freed */ +}; +MPI_Group MPI_GROUP_EMPTY = &mpi_MPI_GROUP_EMPTY; + +MPI_Group smpi_group_new(int size) { + MPI_Group group; + int i, count; + + count = smpi_process_count(); + group = xbt_new(s_smpi_mpi_group_t, 1); + group->size = size; + group->rank_to_index_map = xbt_new(int, size); + group->index_to_rank_map = xbt_new(int, count); + group->refcount = 0; + for(i = 0; i < size; i++) { + group->rank_to_index_map[i] = MPI_UNDEFINED; + } + for(i = 0; i < count; i++) { + group->index_to_rank_map[i] = MPI_UNDEFINED; + } + return group; +} + +void smpi_group_destroy(MPI_Group group) { + if(smpi_group_unuse(group) <= 0) { + xbt_free(group->rank_to_index_map); + xbt_free(group->index_to_rank_map); + xbt_free(group); + } +} + +void smpi_group_set_mapping(MPI_Group group, int index, int rank) { + if(rank < group->size && index < smpi_process_count()) { + group->rank_to_index_map[rank] = index; + group->index_to_rank_map[index] = rank; + } +} + +int smpi_group_index(MPI_Group group, int rank) { + int index = MPI_UNDEFINED; + + if(rank < group->size) { + index = group->rank_to_index_map[rank]; + } + return index; +} + +int smpi_group_rank(MPI_Group group, int index) { + int rank = MPI_UNDEFINED; + + if(index < smpi_process_count()) { + rank = group->index_to_rank_map[index]; + } + return rank; +} + +int smpi_group_use(MPI_Group group) { + group->refcount++; + return group->refcount; +} + +int smpi_group_unuse(MPI_Group group) { + group->refcount--; + return group->refcount; +} + +int smpi_group_size(MPI_Group group) { + return group->size; +} + +int smpi_group_compare(MPI_Group group1, MPI_Group group2) { + int result; + int i, index, rank, size; + + result = MPI_IDENT; + if(smpi_group_size(group1) != smpi_group_size(group2)) { + result = MPI_UNEQUAL; + } else { + size = smpi_group_size(group2); + for(i = 0; i < size; i++) { + index = smpi_group_index(group1, i); + rank = smpi_group_rank(group2, index); + if(rank == MPI_UNDEFINED) { + result = MPI_UNEQUAL; + break; + } + if(rank != i) { + result = MPI_SIMILAR; + } + } + } + return result; +} diff --git a/src/smpi/smpi_mpi.c b/src/smpi/smpi_mpi.c index 20ff68f150..5436c0a4cc 100644 --- a/src/smpi/smpi_mpi.c +++ b/src/smpi/smpi_mpi.c @@ -1,16 +1,5 @@ /* $Id$tag */ -/* smpi_mpi.c -- - * - * Eventually will contain the user level MPI primitives and its corresponding - * internal wrapper. The implementations of these primitives should go to specific - * files. 
For example, a SMPI_MPI_Bcast() in this file, should call the wrapper - * smpi_mpi_bcast(), which decides which implementation to call. Currently, it - * calls nary_tree_bcast() in smpi_coll.c. (Stéphane Genaud). - * */ - - - #include "private.h" #include "smpi_coll_private.h" #include "smpi_mpi_dt_private.h" @@ -18,990 +7,1019 @@ XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_mpi, smpi, "Logging specific to SMPI (mpi)"); -int SMPI_MPI_Init(int *argc, char ***argv) -{ +/* MPI User level calls */ + +int MPI_Init(int* argc, char*** argv) { smpi_process_init(argc, argv); smpi_bench_begin(); return MPI_SUCCESS; } -int SMPI_MPI_Finalize() -{ +int MPI_Finalize(void) { smpi_bench_end(); - smpi_process_finalize(); + smpi_process_destroy(); return MPI_SUCCESS; } -// right now this just exits the current node, should send abort signal to all -// hosts in the communicator (TODO) -int SMPI_MPI_Abort(MPI_Comm comm, int errorcode) -{ - smpi_exit(errorcode); - return 0; +int MPI_Init_thread(int* argc, char*** argv, int required, int* provided) { + if(provided != NULL) { + *provided = MPI_THREAD_MULTIPLE; + } + return MPI_Init(argc, argv); } -int SMPI_MPI_Comm_size(MPI_Comm comm, int *size) -{ - int retval = MPI_SUCCESS; +int MPI_Query_thread(int* provided) { + int retval; smpi_bench_end(); - - if (NULL == comm) { - retval = MPI_ERR_COMM; - } else if (NULL == size) { + if(provided == NULL) { retval = MPI_ERR_ARG; } else { - *size = comm->size; + *provided = MPI_THREAD_MULTIPLE; + retval = MPI_SUCCESS; } - smpi_bench_begin(); - return retval; } -int SMPI_MPI_Comm_rank(MPI_Comm comm, int *rank) -{ - int retval = MPI_SUCCESS; +int MPI_Is_thread_main(int* flag) { + int retval; smpi_bench_end(); - - if (NULL == comm) { - retval = MPI_ERR_COMM; - } else if (NULL == rank) { + if(flag == NULL) { retval = MPI_ERR_ARG; } else { - *rank = smpi_mpi_comm_rank(comm); + *flag = smpi_process_index() == 0; + retval = MPI_SUCCESS; } - smpi_bench_begin(); - return retval; } +int MPI_Abort(MPI_Comm comm, int errorcode) { + smpi_bench_end(); + smpi_process_destroy(); + // FIXME: should kill all processes in comm instead + SIMIX_process_kill(SIMIX_process_self()); + return MPI_SUCCESS; +} - -/** - * Barrier - **/ -int SMPI_MPI_Barrier(MPI_Comm comm) -{ - int retval = MPI_SUCCESS; - int arity=4; +double MPI_Wtime(void) { + double time; smpi_bench_end(); + time = SIMIX_get_clock(); + smpi_bench_begin(); + return time; +} - if (NULL == comm) { - retval = MPI_ERR_COMM; - } else { +int MPI_Type_size(MPI_Datatype datatype, size_t* size) { + int retval; - /* - * original implemantation: - * retval = smpi_mpi_barrier(comm); - * this one is unrealistic: it just cond_waits, means no time. 
- */ - retval = nary_tree_barrier( comm, arity ); + smpi_bench_end(); + if(datatype == MPI_DATATYPE_NULL) { + retval = MPI_ERR_TYPE; + } else if(size == NULL) { + retval = MPI_ERR_ARG; + } else { + *size = smpi_datatype_size(datatype); + retval = MPI_SUCCESS; } - smpi_bench_begin(); - return retval; } +int MPI_Type_get_extent(MPI_Datatype datatype, MPI_Aint* lb, MPI_Aint* extent) { + int retval; + smpi_bench_end(); + if(datatype == MPI_DATATYPE_NULL) { + retval = MPI_ERR_TYPE; + } else if(lb == NULL || extent == NULL) { + retval = MPI_ERR_ARG; + } else { + retval = smpi_datatype_extent(datatype, lb, extent); + } + smpi_bench_begin(); + return retval; +} -int SMPI_MPI_Irecv(void *buf, int count, MPI_Datatype datatype, int src, - int tag, MPI_Comm comm, MPI_Request * request) -{ - int retval = MPI_SUCCESS; - int rank; +int MPI_Type_lb(MPI_Datatype datatype, MPI_Aint* disp) { + int retval; smpi_bench_end(); - rank = smpi_mpi_comm_rank(comm); - retval = smpi_create_request(buf, count, datatype, src, rank, tag, comm, - request); - if (NULL != *request && MPI_SUCCESS == retval) { - retval = smpi_mpi_irecv(*request); + if(datatype == MPI_DATATYPE_NULL) { + retval = MPI_ERR_TYPE; + } else if(disp == NULL) { + retval = MPI_ERR_ARG; + } else { + *disp = smpi_datatype_lb(datatype); + retval = MPI_SUCCESS; } - smpi_bench_begin(); - return retval; } -int SMPI_MPI_Recv(void *buf, int count, MPI_Datatype datatype, int src, - int tag, MPI_Comm comm, MPI_Status * status) -{ - int retval = MPI_SUCCESS; - int rank; - smpi_mpi_request_t request; +int MPI_Type_ub(MPI_Datatype datatype, MPI_Aint* disp) { + int retval; smpi_bench_end(); - - rank = smpi_mpi_comm_rank(comm); - retval = smpi_create_request(buf, count, datatype, src, rank, tag, comm, - &request); - if (NULL != request && MPI_SUCCESS == retval) { - retval = smpi_mpi_irecv(request); - if (MPI_SUCCESS == retval) { - retval = smpi_mpi_wait(request, status); - } - xbt_mallocator_release(smpi_global->request_mallocator, request); + if(datatype == MPI_DATATYPE_NULL) { + retval = MPI_ERR_TYPE; + } else if(disp == NULL) { + retval = MPI_ERR_ARG; + } else { + *disp = smpi_datatype_ub(datatype); + retval = MPI_SUCCESS; } - smpi_bench_begin(); - return retval; } -int SMPI_MPI_Isend(void *buf, int count, MPI_Datatype datatype, int dst, - int tag, MPI_Comm comm, MPI_Request * request) -{ - int retval = MPI_SUCCESS; - int rank; +int MPI_Op_create(MPI_User_function* function, int commute, MPI_Op* op) { + int retval; smpi_bench_end(); - - rank = smpi_mpi_comm_rank(comm); - retval = smpi_create_request(buf, count, datatype, rank, dst, tag, comm, - request); - if (NULL != *request && MPI_SUCCESS == retval) { - retval = smpi_mpi_isend(*request); + if(function == NULL || op == NULL) { + retval = MPI_ERR_ARG; + } else { + *op = smpi_op_new(function, commute); + retval = MPI_SUCCESS; } - smpi_bench_begin(); - return retval; } -/** - * MPI_Send user level - **/ -int SMPI_MPI_Send(void *buf, int count, MPI_Datatype datatype, int dst, - int tag, MPI_Comm comm) -{ - int retval = MPI_SUCCESS; - int rank; - smpi_mpi_request_t request; - - smpi_bench_end(); - - rank = smpi_mpi_comm_rank(comm); - retval = smpi_create_request(buf, count, datatype, rank, dst, tag, comm, - &request); - if (NULL != request && MPI_SUCCESS == retval) { - retval = smpi_mpi_isend(request); - if (MPI_SUCCESS == retval) { - smpi_mpi_wait(request, MPI_STATUS_IGNORE); - } - xbt_mallocator_release(smpi_global->request_mallocator, request); - } +int MPI_Op_free(MPI_Op* op) { + int retval; + smpi_bench_end(); + 
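/* Usage sketch for the MPI_Op_create / MPI_Op_free pair handled here
 * (illustrative user code, not part of this file).  The callback signature
 * matches MPI_User_function as used by the builtin operations in
 * smpi_mpi_dt.c: the second buffer is combined in place.
 *
 *   static void my_sum(void* in, void* inout, int* len, MPI_Datatype* dtype) {
 *     int i;
 *     for(i = 0; i < *len; i++)
 *       ((int*)inout)[i] += ((int*)in)[i];   // example assumes MPI_INT data
 *   }
 *
 *   int local = 1, global = 0;
 *   MPI_Op op;
 *   MPI_Op_create(my_sum, 1, &op);           // second argument: commutative
 *   MPI_Reduce(&local, &global, 1, MPI_INT, op, 0, MPI_COMM_WORLD);
 *   MPI_Op_free(&op);                        // op is reset to MPI_OP_NULL
 */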
if(op == NULL) { + retval = MPI_ERR_ARG; + } else if(*op == MPI_OP_NULL) { + retval = MPI_ERR_OP; + } else { + smpi_op_destroy(*op); + *op = MPI_OP_NULL; + retval = MPI_SUCCESS; + } smpi_bench_begin(); - return retval; } +int MPI_Group_free(MPI_Group *group) { + int retval; -/** - * MPI_Sendrecv internal level - **/ -int smpi_mpi_sendrecv(void *sendbuf, int sendcount, MPI_Datatype sendtype, int dest, int sendtag, - void *recvbuf, int recvcount, MPI_Datatype recvtype, int source, int recvtag, - MPI_Comm comm, MPI_Status *status) -{ -int rank; -int retval = MPI_SUCCESS; -smpi_mpi_request_t srequest; -smpi_mpi_request_t rrequest; - - rank = smpi_mpi_comm_rank(comm); - - /* send */ - retval = smpi_create_request(sendbuf, sendcount, sendtype, - rank,dest,sendtag, - comm, &srequest); - smpi_mpi_isend(srequest); - - /* recv */ - retval = smpi_create_request(recvbuf, recvcount, recvtype, - source, rank,recvtag, - comm, &rrequest); - smpi_mpi_irecv(rrequest); - - smpi_mpi_wait(srequest, MPI_STATUS_IGNORE); - //printf("[%d] isend request src=%d dst=%d tag=%d COMPLETED (retval=%d) \n",rank,rank,dest,sendtag,retval); - smpi_mpi_wait(rrequest, MPI_STATUS_IGNORE); - //printf("[%d] irecv request src=%d -> dst=%d tag=%d COMPLETED (retval=%d)\n",rank,source,rank,recvtag,retval); - - return(retval); + smpi_bench_end(); + if(group == NULL) { + retval = MPI_ERR_ARG; + } else { + smpi_group_destroy(*group); + *group = MPI_GROUP_NULL; + retval = MPI_SUCCESS; + } + smpi_bench_begin(); + return retval; } -/** - * MPI_Sendrecv user entry point - **/ -int SMPI_MPI_Sendrecv(void *sendbuf, int sendcount, MPI_Datatype sendtype, int dest, int sendtag, - void *recvbuf, int recvcount, MPI_Datatype recvtype, int source, int recvtag, - MPI_Comm comm, MPI_Status *status) -{ -int retval = MPI_SUCCESS; + +int MPI_Group_size(MPI_Group group, int* size) { + int retval; smpi_bench_end(); - smpi_mpi_sendrecv( sendbuf, sendcount, sendtype, dest, sendtag, - recvbuf, recvcount, recvtype, source, recvtag, - comm, status); + if(group == MPI_GROUP_NULL) { + retval = MPI_ERR_GROUP; + } else if(size == NULL) { + retval = MPI_ERR_ARG; + } else { + *size = smpi_group_size(group); + retval = MPI_SUCCESS; + } smpi_bench_begin(); - return retval; - - } -/** - * MPI_Wait and friends - **/ -int SMPI_MPI_Wait(MPI_Request * request, MPI_Status * status) -{ +int MPI_Group_rank(MPI_Group group, int* rank) { int retval; smpi_bench_end(); - retval = smpi_mpi_wait(*request, status); + if(group == MPI_GROUP_NULL) { + retval = MPI_ERR_GROUP; + } else if(rank == NULL) { + retval = MPI_ERR_ARG; + } else { + *rank = smpi_group_rank(group, smpi_process_index()); + retval = MPI_SUCCESS; + } smpi_bench_begin(); return retval; } -int SMPI_MPI_Waitall(int count, MPI_Request requests[], MPI_Status status[]) -{ - int retval; +int MPI_Group_translate_ranks (MPI_Group group1, int n, int* ranks1, MPI_Group group2, int* ranks2) { + int retval, i, index; smpi_bench_end(); - retval = smpi_mpi_waitall(count, requests, status); + if(group1 == MPI_GROUP_NULL || group2 == MPI_GROUP_NULL) { + retval = MPI_ERR_GROUP; + } else { + for(i = 0; i < n; i++) { + index = smpi_group_index(group1, ranks1[i]); + ranks2[i] = smpi_group_rank(group2, index); + } + retval = MPI_SUCCESS; + } smpi_bench_begin(); return retval; } -int SMPI_MPI_Waitany(int count, MPI_Request requests[], int *index, - MPI_Status status[]) -{ +int MPI_Group_compare(MPI_Group group1, MPI_Group group2, int* result) { int retval; smpi_bench_end(); - retval = smpi_mpi_waitany(count, requests, index, status); + 
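/* Usage sketch for the group query calls implemented around here
 * (illustrative user code, not part of this file; assumes MPI_COMM_WORLD has
 * at least two processes).  Ranks are translated through the shared process
 * index, so a rank absent from the target group comes back as MPI_UNDEFINED.
 *
 *   MPI_Group world, sub;
 *   int wrank, translated, ranks[2] = { 0, 1 };
 *   MPI_Comm_group(MPI_COMM_WORLD, &world);
 *   MPI_Group_rank(world, &wrank);
 *   MPI_Group_incl(world, 2, ranks, &sub);   // keep world ranks 0 and 1
 *   MPI_Group_translate_ranks(world, 1, &wrank, sub, &translated);
 *   // translated == wrank when wrank < 2, MPI_UNDEFINED otherwise
 *   MPI_Group_free(&sub);
 *   MPI_Group_free(&world);
 */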
if(group1 == MPI_GROUP_NULL || group2 == MPI_GROUP_NULL) { + retval = MPI_ERR_GROUP; + } else if(result == NULL) { + retval = MPI_ERR_ARG; + } else { + *result = smpi_group_compare(group1, group2); + retval = MPI_SUCCESS; + } smpi_bench_begin(); return retval; } -/** - * MPI_Bcast - **/ - -/** - * flat bcast - **/ -int flat_tree_bcast(void *buf, int count, MPI_Datatype datatype, int root, MPI_Comm comm); -int flat_tree_bcast(void *buf, int count, MPI_Datatype datatype, int root, - MPI_Comm comm) -{ - int rank; - int retval = MPI_SUCCESS; - smpi_mpi_request_t request; - - rank = smpi_mpi_comm_rank(comm); - if (rank == root) { - retval = smpi_create_request(buf, count, datatype, root, - (root + 1) % comm->size, 0, comm, &request); - request->forward = comm->size - 1; - smpi_mpi_isend(request); - } else { - retval = smpi_create_request(buf, count, datatype, MPI_ANY_SOURCE, rank, - 0, comm, &request); - smpi_mpi_irecv(request); - } +int MPI_Group_union(MPI_Group group1, MPI_Group group2, MPI_Group* newgroup) { + int retval, i, proc1, proc2, size, size2; - smpi_mpi_wait(request, MPI_STATUS_IGNORE); - xbt_mallocator_release(smpi_global->request_mallocator, request); + smpi_bench_end(); + if(group1 == MPI_GROUP_NULL || group2 == MPI_GROUP_NULL) { + retval = MPI_ERR_GROUP; + } else if(newgroup == NULL) { + retval = MPI_ERR_ARG; + } else { + size = smpi_group_size(group1); + size2 = smpi_group_size(group2); + for(i = 0; i < size2; i++) { + proc2 = smpi_group_index(group2, i); + proc1 = smpi_group_rank(group1, proc2); + if(proc1 == MPI_UNDEFINED) { + size++; + } + } + if(size == 0) { + *newgroup = MPI_GROUP_EMPTY; + } else { + *newgroup = smpi_group_new(size); + size2 = smpi_group_size(group1); + for(i = 0; i < size2; i++) { + proc1 = smpi_group_index(group1, i); + smpi_group_set_mapping(*newgroup, proc1, i); + } + for(i = size2; i < size; i++) { + proc2 = smpi_group_index(group2, i - size2); + smpi_group_set_mapping(*newgroup, proc2, i); + } + } + smpi_group_use(*newgroup); + retval = MPI_SUCCESS; + } + smpi_bench_begin(); + return retval; +} - return(retval); +int MPI_Group_intersection(MPI_Group group1, MPI_Group group2, MPI_Group* newgroup) { + int retval, i, proc1, proc2, size, size2; + smpi_bench_end(); + if(group1 == MPI_GROUP_NULL || group2 == MPI_GROUP_NULL) { + retval = MPI_ERR_GROUP; + } else if(newgroup == NULL) { + retval = MPI_ERR_ARG; + } else { + size = smpi_group_size(group1); + size2 = smpi_group_size(group2); + for(i = 0; i < size2; i++) { + proc2 = smpi_group_index(group2, i); + proc1 = smpi_group_rank(group1, proc2); + if(proc1 == MPI_UNDEFINED) { + size--; + } + } + if(size == 0) { + *newgroup = MPI_GROUP_EMPTY; + } else { + *newgroup = smpi_group_new(size); + size2 = smpi_group_size(group1); + for(i = 0; i < size2; i++) { + proc1 = smpi_group_index(group1, i); + proc2 = smpi_group_rank(group2, proc1); + if(proc2 != MPI_UNDEFINED) { + smpi_group_set_mapping(*newgroup, proc1, i); + } + } + } + smpi_group_use(*newgroup); + retval = MPI_SUCCESS; + } + smpi_bench_begin(); + return retval; } -/** - * Bcast internal level - **/ -int smpi_mpi_bcast(void *buf, int count, MPI_Datatype datatype, int root, - MPI_Comm comm) -{ - int retval = MPI_SUCCESS; - int rank = smpi_mpi_comm_rank(comm); - DEBUG1("<%d> entered smpi_mpi_bcast(). 
Calls nary_tree_bcast()",rank); - //retval = flat_tree_bcast(buf, count, datatype, root, comm); - retval = nary_tree_bcast(buf, count, datatype, root, comm, 2 ); +int MPI_Group_difference(MPI_Group group1, MPI_Group group2, MPI_Group* newgroup) { + int retval, i, proc1, proc2, size, size2; + + smpi_bench_end(); + if(group1 == MPI_GROUP_NULL || group2 == MPI_GROUP_NULL) { + retval = MPI_ERR_GROUP; + } else if(newgroup == NULL) { + retval = MPI_ERR_ARG; + } else { + size = size2 = smpi_group_size(group1); + for(i = 0; i < size2; i++) { + proc1 = smpi_group_index(group1, i); + proc2 = smpi_group_rank(group2, proc1); + if(proc2 != MPI_UNDEFINED) { + size--; + } + } + if(size == 0) { + *newgroup = MPI_GROUP_EMPTY; + } else { + *newgroup = smpi_group_new(size); + for(i = 0; i < size2; i++) { + proc1 = smpi_group_index(group1, i); + proc2 = smpi_group_rank(group2, proc1); + if(proc2 == MPI_UNDEFINED) { + smpi_group_set_mapping(*newgroup, proc1, i); + } + } + } + smpi_group_use(*newgroup); + retval = MPI_SUCCESS; + } + smpi_bench_begin(); return retval; } -/** - * Bcast user entry point - **/ -int SMPI_MPI_Bcast(void *buf, int count, MPI_Datatype datatype, int root, - MPI_Comm comm) -{ - int retval = MPI_SUCCESS; +int MPI_Group_incl(MPI_Group group, int n, int* ranks, MPI_Group* newgroup) { + int retval, i, index; smpi_bench_end(); - smpi_mpi_bcast(buf,count,datatype,root,comm); + if(group == MPI_GROUP_NULL) { + retval = MPI_ERR_GROUP; + } else if(newgroup == NULL) { + retval = MPI_ERR_ARG; + } else { + if(n == 0) { + *newgroup = MPI_GROUP_EMPTY; + } else if(n == smpi_group_size(group)) { + *newgroup = group; + } else { + *newgroup = smpi_group_new(n); + for(i = 0; i < n; i++) { + index = smpi_group_index(group, ranks[i]); + smpi_group_set_mapping(*newgroup, index, i); + } + } + smpi_group_use(*newgroup); + retval = MPI_SUCCESS; + } smpi_bench_begin(); - return retval; } +int MPI_Group_excl(MPI_Group group, int n, int* ranks, MPI_Group* newgroup) { + int retval, i, size, rank, index; - -#ifdef DEBUG_REDUCE -/** - * debugging helper function - **/ -static void print_buffer_int(void *buf, int len, char *msg, int rank) -{ - int tmp, *v; - printf("**[%d] %s: ", rank, msg); - for (tmp = 0; tmp < len; tmp++) { - v = buf; - printf("[%d]", v[tmp]); + smpi_bench_end(); + if(group == MPI_GROUP_NULL) { + retval = MPI_ERR_GROUP; + } else if(newgroup == NULL) { + retval = MPI_ERR_ARG; + } else { + if(n == 0) { + *newgroup = group; + } else if(n == smpi_group_size(group)) { + *newgroup = MPI_GROUP_EMPTY; + } else { + size = smpi_group_size(group) - n; + *newgroup = smpi_group_new(size); + rank = 0; + while(rank < size) { + for(i = 0; i < n; i++) { + if(ranks[i] == rank) { + break; + } + } + if(i >= n) { + index = smpi_group_index(group, rank); + smpi_group_set_mapping(*newgroup, index, rank); + rank++; + } + } + } + smpi_group_use(*newgroup); + retval = MPI_SUCCESS; } - printf("\n"); - free(msg); + smpi_bench_begin(); + return retval; } -static void print_buffer_double(void *buf, int len, char *msg, int rank) -{ - int tmp; - double *v; - printf("**[%d] %s: ", rank, msg); - for (tmp = 0; tmp < len; tmp++) { - v = buf; - printf("[%lf]", v[tmp]); + +int MPI_Group_range_incl(MPI_Group group, int n, int ranges[][3], MPI_Group* newgroup) { + int retval, i, j, rank, size, index; + + smpi_bench_end(); + if(group == MPI_GROUP_NULL) { + retval = MPI_ERR_GROUP; + } else if(newgroup == NULL) { + retval = MPI_ERR_ARG; + } else { + if(n == 0) { + *newgroup = MPI_GROUP_EMPTY; + } else { + size = 0; + for(i = 0; i < n; i++) { 
+ for(rank = ranges[i][0]; /* First */ + rank >= 0 && rank <= ranges[i][1]; /* Last */ + rank += ranges[i][2] /* Stride */) { + size++; + } + } + if(size == smpi_group_size(group)) { + *newgroup = group; + } else { + *newgroup = smpi_group_new(size); + j = 0; + for(i = 0; i < n; i++) { + for(rank = ranges[i][0]; /* First */ + rank >= 0 && rank <= ranges[i][1]; /* Last */ + rank += ranges[i][2] /* Stride */) { + index = smpi_group_index(group, rank); + smpi_group_set_mapping(*newgroup, index, j); + j++; + } + } + } + } + smpi_group_use(*newgroup); + retval = MPI_SUCCESS; } - printf("\n"); - free(msg); + smpi_bench_begin(); + return retval; } +int MPI_Group_range_excl(MPI_Group group, int n, int ranges[][3], MPI_Group* newgroup) { + int retval, i, newrank, rank, size, index, add; -#endif -/** - * MPI_Reduce internal level - **/ -int smpi_mpi_reduce(void *sendbuf, void *recvbuf, int count, - MPI_Datatype datatype, MPI_Op op, int root, MPI_Comm comm) -{ - int retval = MPI_SUCCESS; - int rank; - int size; - int i; - int system_tag = 666; - smpi_mpi_request_t *requests; - smpi_mpi_request_t request; - - smpi_bench_end(); - - rank = smpi_mpi_comm_rank(comm); - size = comm->size; - DEBUG1("<%d> entered smpi_mpi_reduce()",rank); - - if (rank != root) { // if i am not ROOT, simply send my buffer to root - -#ifdef DEBUG_REDUCE - print_buffer_int(sendbuf, count, xbt_strdup("sndbuf"), rank); -#endif - retval = smpi_create_request(sendbuf, count, datatype, rank, root, system_tag, comm, - &request); - smpi_mpi_isend(request); - smpi_mpi_wait(request, MPI_STATUS_IGNORE); - xbt_mallocator_release(smpi_global->request_mallocator, request); - - } else { - // i am the ROOT: wait for all buffers by creating one request by sender - int src; - requests = xbt_malloc((size-1) * sizeof(smpi_mpi_request_t)); - - void **tmpbufs = xbt_malloc((size-1) * sizeof(void *)); - for (i = 0; i < size-1; i++) { - // we need 1 buffer per request to store intermediate receptions - tmpbufs[i] = xbt_malloc(count * datatype->size); - } - // root: initiliaze recv buf with my own snd buf - memcpy(recvbuf, sendbuf, count * datatype->size * sizeof(char)); - - // i can not use: 'request->forward = size-1;' (which would progagate size-1 receive reqs) - // since we should op values as soon as one receiving request matches. - for (i = 0; i < size-1; i++) { - // reminder: for smpi_create_request() the src is always the process sending. - src = i < root ? i : i + 1; - retval = smpi_create_request(tmpbufs[i], count, datatype, - src, root, system_tag, comm, &(requests[i])); - if (NULL != requests[i] && MPI_SUCCESS == retval) { - if (MPI_SUCCESS == retval) { - smpi_mpi_irecv(requests[i]); - } - } - } - // now, wait for completion of all irecv's. - for (i = 0; i < size-1; i++) { - int index = MPI_UNDEFINED; - smpi_mpi_waitany( size-1, requests, &index, MPI_STATUS_IGNORE); - DEBUG3("<%d> waitany() unblocked by reception (completes request[%d]) (%d reqs remaining)", - rank,index,size-i-2); -#ifdef DEBUG_REDUCE - print_buffer_int(tmpbufs[index], count, bprintf("tmpbufs[index=%d] (value received)", index), - rank); -#endif - - // arg 2 is modified - op->func(tmpbufs[index], recvbuf, &count, &datatype); -#ifdef DEBUG_REDUCE - print_buffer_int(recvbuf, count, xbt_strdup("rcvbuf"), rank); -#endif - xbt_free(tmpbufs[index]); - /* FIXME: with the following line, it generates an - * [xbt_ex/CRITICAL] Conditional list not empty 162518800. - * Fixed ? 
- */ - xbt_mallocator_release(smpi_global->request_mallocator, requests[index]); - } - xbt_free(requests); - xbt_free(tmpbufs); + smpi_bench_end(); + if(group == MPI_GROUP_NULL) { + retval = MPI_ERR_GROUP; + } else if(newgroup == NULL) { + retval = MPI_ERR_ARG; + } else { + if(n == 0) { + *newgroup = group; + } else { + size = smpi_group_size(group); + for(i = 0; i < n; i++) { + for(rank = ranges[i][0]; /* First */ + rank >= 0 && rank <= ranges[i][1]; /* Last */ + rank += ranges[i][2] /* Stride */) { + size--; + } + } + if(size == 0) { + *newgroup = MPI_GROUP_EMPTY; + } else { + *newgroup = smpi_group_new(size); + newrank = 0; + while(newrank < size) { + for(i = 0; i < n; i++) { + add = 1; + for(rank = ranges[i][0]; /* First */ + rank >= 0 && rank <= ranges[i][1]; /* Last */ + rank += ranges[i][2] /* Stride */) { + if(rank == newrank) { + add = 0; + break; + } + } + if(add == 1) { + index = smpi_group_index(group, newrank); + smpi_group_set_mapping(*newgroup, index, newrank); + } + } } - return retval; + } + } + smpi_group_use(*newgroup); + retval = MPI_SUCCESS; + } + smpi_bench_begin(); + return retval; } -/** - * MPI_Reduce user entry point - **/ -int SMPI_MPI_Reduce(void *sendbuf, void *recvbuf, int count, - MPI_Datatype datatype, MPI_Op op, int root, MPI_Comm comm) -{ -int retval = MPI_SUCCESS; +int MPI_Comm_rank(MPI_Comm comm, int* rank) { + int retval; - smpi_bench_end(); + smpi_bench_end(); + if(comm == MPI_COMM_NULL) { + retval = MPI_ERR_COMM; + } else { + *rank = smpi_comm_rank(comm); + retval = MPI_SUCCESS; + } + smpi_bench_begin(); + return retval; +} - retval = smpi_mpi_reduce(sendbuf, recvbuf, count, datatype, op, root, comm); +int MPI_Comm_size(MPI_Comm comm, int* size) { + int retval; - smpi_bench_begin(); - return retval; + smpi_bench_end(); + if(comm == MPI_COMM_NULL) { + retval = MPI_ERR_COMM; + } else if(size == NULL) { + retval = MPI_ERR_ARG; + } else { + *size = smpi_comm_size(comm); + retval = MPI_SUCCESS; + } + smpi_bench_begin(); + return retval; } +int MPI_Comm_group(MPI_Comm comm, MPI_Group* group) { + int retval; + smpi_bench_end(); + if(comm == MPI_COMM_NULL) { + retval = MPI_ERR_COMM; + } else if(group == NULL) { + retval = MPI_ERR_ARG; + } else { + *group = smpi_comm_group(comm); + retval = MPI_SUCCESS; + } + smpi_bench_begin(); + return retval; +} -/** - * MPI_Allreduce - * - * Same as MPI_Reduce except that the result appears in the receive buffer of all the group members. 
- **/ -int SMPI_MPI_Allreduce( void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype, - MPI_Op op, MPI_Comm comm ) -{ -int retval = MPI_SUCCESS; -int root=0; // arbitrary choice - - smpi_bench_end(); +int MPI_Comm_compare(MPI_Comm comm1, MPI_Comm comm2, int* result) { + int retval; - retval = smpi_mpi_reduce( sendbuf, recvbuf, count, datatype, op, root, comm); - if (MPI_SUCCESS != retval) - return(retval); + smpi_bench_end(); + if(comm1 == MPI_COMM_NULL || comm2 == MPI_COMM_NULL) { + retval = MPI_ERR_COMM; + } else if(result == NULL) { + retval = MPI_ERR_ARG; + } else { + if(comm1 == comm2) { /* Same communicators means same groups */ + *result = MPI_IDENT; + } else { + *result = smpi_group_compare(smpi_comm_group(comm1), smpi_comm_group(comm2)); + if(*result == MPI_IDENT) { + *result = MPI_CONGRUENT; + } + } + retval = MPI_SUCCESS; + } + smpi_bench_begin(); + return retval; +} - retval = smpi_mpi_bcast( sendbuf, count, datatype, root, comm); +int MPI_Comm_dup(MPI_Comm comm, MPI_Comm* newcomm) { + int retval; - smpi_bench_end(); - return( retval ); + smpi_bench_end(); + if(comm == MPI_COMM_NULL) { + retval = MPI_ERR_COMM; + } else if(newcomm == NULL) { + retval = MPI_ERR_ARG; + } else { + *newcomm = smpi_comm_new(smpi_comm_group(comm)); + retval = MPI_SUCCESS; + } + smpi_bench_begin(); + return retval; } +int MPI_Comm_create(MPI_Comm comm, MPI_Group group, MPI_Comm* newcomm) { + int retval; -/** - * MPI_Scatter user entry point - **/ -int SMPI_MPI_Scatter(void *sendbuf, int sendcount, MPI_Datatype datatype, - void *recvbuf, int recvcount, MPI_Datatype recvtype, - int root, MPI_Comm comm) -{ - int retval = MPI_SUCCESS; - int i; - int cnt=0; - int rank; - int tag=0; - char *cptr; // to manipulate the void * buffers - smpi_mpi_request_t *requests; - smpi_mpi_request_t request; - smpi_mpi_status_t status; - - - smpi_bench_end(); - - rank = smpi_mpi_comm_rank(comm); - - requests = xbt_malloc((comm->size-1) * sizeof(smpi_mpi_request_t)); - if (rank == root) { - // i am the root: distribute my sendbuf - //print_buffer_int(sendbuf, comm->size, xbt_strdup("rcvbuf"), rank); - cptr = sendbuf; - for (i=0; i < comm->size; i++) { - if ( i!=root ) { // send to processes ... - - retval = smpi_create_request((void *)cptr, sendcount, - datatype, root, i, tag, comm, &(requests[cnt])); - if (NULL != requests[cnt] && MPI_SUCCESS == retval) { - if (MPI_SUCCESS == retval) { - smpi_mpi_isend(requests[cnt]); - } - } - cnt++; - } - else { // ... except if it's me. 
- memcpy(recvbuf, (void *)cptr, recvcount*recvtype->size*sizeof(char)); - } - cptr += sendcount*datatype->size; - } - for(i=0; irequest_mallocator, requests[i]); - - } - } - else { // i am a non-root process: wait data from the root - retval = smpi_create_request(recvbuf,recvcount, - recvtype, root, rank, tag, comm, &request); - if (NULL != request && MPI_SUCCESS == retval) { - if (MPI_SUCCESS == retval) { - smpi_mpi_irecv(request); - } - } - smpi_mpi_wait(request, &status); - xbt_mallocator_release(smpi_global->request_mallocator, request); - } - xbt_free(requests); - + smpi_bench_end(); + if(comm == MPI_COMM_NULL) { + retval = MPI_ERR_COMM; + } else if(group == MPI_GROUP_NULL) { + retval = MPI_ERR_GROUP; + } else if(newcomm == NULL) { + retval = MPI_ERR_ARG; + } else { + *newcomm = smpi_comm_new(group); + retval = MPI_SUCCESS; + } smpi_bench_begin(); - return retval; } - -/** - * MPI_Alltoall user entry point - * - * Uses the logic of OpenMPI (upto 1.2.7 or greater) for the optimizations - * ompi/mca/coll/tuned/coll_tuned_module.c - **/ -int SMPI_MPI_Alltoall(void *sendbuf, int sendcount, MPI_Datatype datatype, - void *recvbuf, int recvcount, MPI_Datatype recvtype, - MPI_Comm comm) -{ - int retval = MPI_SUCCESS; - int block_dsize; - int rank; +int MPI_Comm_free(MPI_Comm* comm) { + int retval; smpi_bench_end(); + if(comm == NULL) { + retval = MPI_ERR_ARG; + } else if(*comm == MPI_COMM_NULL) { + retval = MPI_ERR_COMM; + } else { + smpi_comm_destroy(*comm); + *comm = MPI_COMM_NULL; + retval = MPI_SUCCESS; + } + smpi_bench_begin(); + return retval; +} - rank = smpi_mpi_comm_rank(comm); - block_dsize = datatype->size * sendcount; - DEBUG2("<%d> optimized alltoall() called. Block size sent to each rank: %d bytes.",rank,block_dsize); - - if ((block_dsize < 200) && (comm->size > 12)) { - retval = smpi_coll_tuned_alltoall_bruck(sendbuf, sendcount, datatype, - recvbuf, recvcount, recvtype, comm); +int MPI_Irecv(void* buf, int count, MPI_Datatype datatype, int src, int tag, MPI_Comm comm, MPI_Request* request) { + int retval; - } else if (block_dsize < 3000) { - retval = smpi_coll_tuned_alltoall_basic_linear(sendbuf, sendcount, datatype, - recvbuf, recvcount, recvtype, comm); + smpi_bench_end(); + if(request == NULL) { + retval = MPI_ERR_ARG; } else { - - retval = smpi_coll_tuned_alltoall_pairwise(sendbuf, sendcount, datatype, - recvbuf, recvcount, recvtype, comm); + *request = smpi_mpi_irecv(buf, count, datatype, src, tag, comm); + retval = MPI_SUCCESS; } - smpi_bench_begin(); - return retval; } -/** - * MPI_Alltoallv user entry point - * - * As in OpenMPI, alltoallv is not optimized - * ompi/mca/coll/basic/coll_basic_alltoallv.c - **/ -int SMPI_MPI_Alltoallv(void *sendbuf, int *scounts, int *sdisps, MPI_Datatype datatype, - void *recvbuf, int *rcounts, int *rdisps, MPI_Datatype recvtype, - MPI_Comm comm) -{ - int retval = MPI_SUCCESS; - int rank; +int MPI_Isend(void* buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm, MPI_Request* request) { + int retval; smpi_bench_end(); - rank = smpi_mpi_comm_rank(comm); - DEBUG1("<%d> basic alltoallv() called.",rank); - - retval = smpi_coll_basic_alltoallv(sendbuf, scounts, sdisps, datatype, - recvbuf, rcounts, rdisps, recvtype, - comm); + if(request == NULL) { + retval = MPI_ERR_ARG; + } else { + *request = smpi_mpi_isend(buf, count, datatype, dst, tag, comm); + retval = MPI_SUCCESS; + } smpi_bench_begin(); return retval; } +int MPI_Recv(void* buf, int count, MPI_Datatype datatype, int src, int tag, MPI_Comm comm, MPI_Status* status) { + 
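/* Usage sketch for the point-to-point wrappers defined here (illustrative
 * user code, not part of this file; assumes exactly two ranks in
 * MPI_COMM_WORLD).  The receive is posted before the send and both requests
 * are completed with MPI_Waitall, which is the pattern these nonblocking
 * wrappers are meant to support.
 *
 *   int me, peer, out = 42, in = 0;
 *   MPI_Request reqs[2];
 *   MPI_Status stats[2];
 *   MPI_Comm_rank(MPI_COMM_WORLD, &me);
 *   peer = 1 - me;
 *   MPI_Irecv(&in, 1, MPI_INT, peer, 0, MPI_COMM_WORLD, &reqs[0]);
 *   MPI_Isend(&out, 1, MPI_INT, peer, 0, MPI_COMM_WORLD, &reqs[1]);
 *   MPI_Waitall(2, reqs, stats);
 */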
smpi_bench_end(); + smpi_mpi_recv(buf, count, datatype, src, tag, comm, status); + smpi_bench_begin(); + return MPI_SUCCESS; +} + +int MPI_Send(void* buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm) { + smpi_bench_end(); + smpi_mpi_send(buf, count, datatype, dst, tag, comm); + smpi_bench_begin(); + return MPI_SUCCESS; +} +int MPI_Sendrecv(void* sendbuf, int sendcount, MPI_Datatype sendtype, int dst, int sendtag, void* recvbuf, int recvcount, MPI_Datatype recvtype, int src, int recvtag, MPI_Comm comm, MPI_Status* status) { + smpi_bench_end(); + smpi_mpi_sendrecv(sendbuf, sendcount, sendtype, dst, sendtag, recvbuf, recvcount, recvtype, src, recvtag, comm, status); + smpi_bench_begin(); + return MPI_SUCCESS; +} +int MPI_Sendrecv_replace(void* buf, int count, MPI_Datatype datatype, int dst, int sendtag, int src, int recvtag, MPI_Comm comm, MPI_Status* status) { + //TODO: suboptimal implementation + void* recvbuf; + int retval, size; -// used by comm_split to sort ranks based on key values -int smpi_compare_rankkeys(const void *a, const void *b); -int smpi_compare_rankkeys(const void *a, const void *b) -{ - int *x = (int *) a; - int *y = (int *) b; + size = smpi_datatype_size(datatype) * count; + recvbuf = xbt_new(char, size); + retval = MPI_Sendrecv(buf, count, datatype, dst, sendtag, recvbuf, count, datatype, src, recvtag, comm, status); + memcpy(buf, recvbuf, size * sizeof(char)); + xbt_free(recvbuf); + return retval; +} - if (x[1] < y[1]) - return -1; +int MPI_Test(MPI_Request* request, int* flag, MPI_Status* status) { + int retval; - if (x[1] == y[1]) { - if (x[0] < y[0]) - return -1; - if (x[0] == y[0]) - return 0; - return 1; + smpi_bench_end(); + if(request == NULL || flag == NULL) { + retval = MPI_ERR_ARG; + } else if(*request == MPI_REQUEST_NULL) { + retval = MPI_ERR_REQUEST; + } else { + *flag = smpi_mpi_test(request, status); + retval = MPI_SUCCESS; } - - return 1; + smpi_bench_begin(); + return retval; } -int SMPI_MPI_Comm_split(MPI_Comm comm, int color, int key, - MPI_Comm * comm_out) -{ - int retval = MPI_SUCCESS; +int MPI_Testany(int count, MPI_Request requests[], int* index, int* flag, MPI_Status* status) { + int retval; + + smpi_bench_end(); + if(index == NULL || flag == NULL) { + retval = MPI_ERR_ARG; + } else { + *flag = smpi_mpi_testany(count, requests, index, status); + retval = MPI_SUCCESS; + } + smpi_bench_begin(); + return retval; +} - int index, rank; - smpi_mpi_request_t request; - int colorkey[2]; - smpi_mpi_status_t status; +int MPI_Wait(MPI_Request* request, MPI_Status* status) { + int retval; smpi_bench_end(); + if(request == NULL) { + retval = MPI_ERR_ARG; + } else if(*request == MPI_REQUEST_NULL) { + retval = MPI_ERR_REQUEST; + } else { + smpi_mpi_wait(request, status); + retval = MPI_SUCCESS; + } + smpi_bench_begin(); + return retval; +} - // FIXME: need to test parameters +int MPI_Waitany(int count, MPI_Request requests[], int* index, MPI_Status* status) { + int retval; - index = smpi_process_index(); - rank = comm->index_to_rank_map[index]; + smpi_bench_end(); + if(index == NULL) { + retval = MPI_ERR_ARG; + } else { + *index = smpi_mpi_waitany(count, requests, status); + retval = MPI_SUCCESS; + } + smpi_bench_begin(); + return retval; +} - // default output - comm_out = NULL; +int MPI_Waitall(int count, MPI_Request requests[], MPI_Status status[]) { + smpi_bench_end(); + smpi_mpi_waitall(count, requests, status); + smpi_bench_begin(); + return MPI_SUCCESS; +} - // root node does most of the real work - if (0 == rank) { - int 
colormap[comm->size]; - int keymap[comm->size]; - int rankkeymap[comm->size * 2]; - int i, j; - smpi_mpi_communicator_t tempcomm = NULL; - int count; - int indextmp; +int MPI_Waitsome(int incount, MPI_Request requests[], int* outcount, int* indices, MPI_Status status[]) { + int retval; - colormap[0] = color; - keymap[0] = key; + smpi_bench_end(); + if(outcount == NULL || indices == NULL) { + retval = MPI_ERR_ARG; + } else { + *outcount = smpi_mpi_waitsome(incount, requests, indices, status); + retval = MPI_SUCCESS; + } + smpi_bench_begin(); + return retval; +} - // FIXME: use scatter/gather or similar instead of individual comms - for (i = 1; i < comm->size; i++) { - retval = smpi_create_request(colorkey, 2, MPI_INT, MPI_ANY_SOURCE, - rank, MPI_ANY_TAG, comm, &request); - smpi_mpi_irecv(request); - smpi_mpi_wait(request, &status); - colormap[status.MPI_SOURCE] = colorkey[0]; - keymap[status.MPI_SOURCE] = colorkey[1]; - xbt_mallocator_release(smpi_global->request_mallocator, request); - } +int MPI_Bcast(void* buf, int count, MPI_Datatype datatype, int root, MPI_Comm comm) { + int retval; - for (i = 0; i < comm->size; i++) { - if (MPI_UNDEFINED == colormap[i]) { - continue; - } - // make a list of nodes with current color and sort by keys - count = 0; - for (j = i; j < comm->size; j++) { - if (colormap[i] == colormap[j]) { - colormap[j] = MPI_UNDEFINED; - rankkeymap[count * 2] = j; - rankkeymap[count * 2 + 1] = keymap[j]; - count++; - } - } - qsort(rankkeymap, count, sizeof(int) * 2, &smpi_compare_rankkeys); - - // new communicator - tempcomm = xbt_new(s_smpi_mpi_communicator_t, 1); - tempcomm->barrier_count = 0; - tempcomm->size = count; - tempcomm->barrier_mutex = SIMIX_mutex_init(); - tempcomm->barrier_cond = SIMIX_cond_init(); - tempcomm->rank_to_index_map = xbt_new(int, count); - tempcomm->index_to_rank_map = xbt_new(int, smpi_global->process_count); - for (j = 0; j < smpi_global->process_count; j++) { - tempcomm->index_to_rank_map[j] = -1; - } - for (j = 0; j < count; j++) { - indextmp = comm->rank_to_index_map[rankkeymap[j * 2]]; - tempcomm->rank_to_index_map[j] = indextmp; - tempcomm->index_to_rank_map[indextmp] = j; - } - for (j = 0; j < count; j++) { - if (rankkeymap[j * 2]) { - retval = smpi_create_request(&j, 1, MPI_INT, 0, - rankkeymap[j * 2], 0, comm, &request); - request->data = tempcomm; - smpi_mpi_isend(request); - smpi_mpi_wait(request, &status); - xbt_mallocator_release(smpi_global->request_mallocator, request); - } else { - *comm_out = tempcomm; - } - } - } + smpi_bench_end(); + if(comm == MPI_COMM_NULL) { + retval = MPI_ERR_COMM; } else { - colorkey[0] = color; - colorkey[1] = key; - retval = smpi_create_request(colorkey, 2, MPI_INT, rank, 0, 0, comm, - &request); - smpi_mpi_isend(request); - smpi_mpi_wait(request, &status); - xbt_mallocator_release(smpi_global->request_mallocator, request); - if (MPI_UNDEFINED != color) { - retval = smpi_create_request(colorkey, 1, MPI_INT, 0, rank, 0, comm, - &request); - smpi_mpi_irecv(request); - smpi_mpi_wait(request, &status); - *comm_out = request->data; - } + smpi_mpi_bcast(buf, count, datatype, root, comm); + retval = MPI_SUCCESS; } + smpi_bench_begin(); + return retval; +} + +int MPI_Barrier(MPI_Comm comm) { + int retval; + smpi_bench_end(); + if(comm == MPI_COMM_NULL) { + retval = MPI_ERR_COMM; + } else { + smpi_mpi_barrier(comm); + retval = MPI_SUCCESS; + } smpi_bench_begin(); + return retval; +} + +int MPI_Gather(void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int recvcount, MPI_Datatype recvtype, int 
root, MPI_Comm comm) { + int retval; + smpi_bench_end(); + if(comm == MPI_COMM_NULL) { + retval = MPI_ERR_COMM; + } else if(sendtype == MPI_DATATYPE_NULL || recvtype == MPI_DATATYPE_NULL) { + retval = MPI_ERR_TYPE; + } else { + smpi_mpi_gather(sendbuf, sendcount, sendtype, recvbuf, recvcount, recvtype, root, comm); + retval = MPI_SUCCESS; + } + smpi_bench_begin(); return retval; } -double SMPI_MPI_Wtime(void) -{ - double time; +int MPI_Gatherv(void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int* recvcounts, int* displs, MPI_Datatype recvtype, int root, MPI_Comm comm) { + int retval; smpi_bench_end(); - time = SIMIX_get_clock(); + if(comm == MPI_COMM_NULL) { + retval = MPI_ERR_COMM; + } else if(sendtype == MPI_DATATYPE_NULL || recvtype == MPI_DATATYPE_NULL) { + retval = MPI_ERR_TYPE; + } else if(recvcounts == NULL || displs == NULL) { + retval = MPI_ERR_ARG; + } else { + smpi_mpi_gatherv(sendbuf, sendcount, sendtype, recvbuf, recvcounts, displs, recvtype, root, comm); + retval = MPI_SUCCESS; + } smpi_bench_begin(); - return time; + return retval; } -int SMPI_MPI_Gather(void* sendbuf, int sendcount, MPI_Datatype sendtype, - void* recvbuf, int recvcount, MPI_Datatype recvtype, - int root, MPI_Comm comm) -{ - int retval = MPI_SUCCESS; - int system_tag = 666; - int rank, size; - - smpi_bench_end(); - rank = smpi_mpi_comm_rank(comm); - size = comm->size; - if(rank != root) { - // Send buffer to root - smpi_mpi_request_t request; - - retval = smpi_create_request(sendbuf, sendcount, sendtype, - rank, root, system_tag, comm, &request); - smpi_mpi_isend(request); - smpi_mpi_wait(request, MPI_STATUS_IGNORE); - xbt_mallocator_release(smpi_global->request_mallocator, request); - } else { - // Receive buffers from senders - int src; - smpi_mpi_request_t* requests; - - requests = xbt_malloc((size-1) * sizeof(smpi_mpi_request_t)); - for(src = 0; src < size; src++) { - if(src == root) { - // Local copy from root - memcpy(&((char*)recvbuf)[src*recvcount*recvtype->size], - sendbuf, sendcount*sendtype->size*sizeof(char)); - } else { - int index = src < root ? src : src - 1; - retval = smpi_create_request(&((char*)recvbuf)[src*recvcount*recvtype->size], - recvcount, recvtype, src, root, system_tag, - comm, &requests[index]); - if(NULL != requests[index] && MPI_SUCCESS == retval) { - smpi_mpi_irecv(requests[index]); - } - } - } - // Wait for completion of irecv's. 
- for(src = 0; src < size - 1; src++) { - int index = MPI_UNDEFINED; - smpi_mpi_waitany(size - 1, requests, &index, MPI_STATUS_IGNORE); - xbt_mallocator_release(smpi_global->request_mallocator, requests[index]); - } - xbt_free(requests); +int MPI_Allgather(void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int recvcount, MPI_Datatype recvtype, MPI_Comm comm) { + int retval; + + smpi_bench_end(); + if(comm == MPI_COMM_NULL) { + retval = MPI_ERR_COMM; + } else if(sendtype == MPI_DATATYPE_NULL || recvtype == MPI_DATATYPE_NULL) { + retval = MPI_ERR_TYPE; + } else { + smpi_mpi_allgather(sendbuf, sendcount, sendtype, recvbuf, recvcount, recvtype, comm); + retval = MPI_SUCCESS; } smpi_bench_begin(); return retval; } -int SMPI_MPI_Gatherv(void* sendbuf, int sendcount, MPI_Datatype sendtype, - void* recvbuf, int* recvcounts, int* displs, MPI_Datatype recvtype, - int root, MPI_Comm comm) -{ - int retval = MPI_SUCCESS; - int system_tag = 666; - int rank, size; +int MPI_Allgatherv(void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int* recvcounts, int* displs, MPI_Datatype recvtype, MPI_Comm comm) { + int retval; smpi_bench_end(); - rank = smpi_mpi_comm_rank(comm); - size = comm->size; - if(rank != root) { - // Send buffer to root - smpi_mpi_request_t request; + if(comm == MPI_COMM_NULL) { + retval = MPI_ERR_COMM; + } else if(sendtype == MPI_DATATYPE_NULL || recvtype == MPI_DATATYPE_NULL) { + retval = MPI_ERR_TYPE; + } else if(recvcounts == NULL || displs == NULL) { + retval = MPI_ERR_ARG; + } else { + smpi_mpi_allgatherv(sendbuf, sendcount, sendtype, recvbuf, recvcounts, displs, recvtype, comm); + retval = MPI_SUCCESS; + } + smpi_bench_begin(); + return retval; +} + +int MPI_Scatter(void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm) { + int retval; - retval = smpi_create_request(sendbuf, sendcount, sendtype, - rank, root, system_tag, comm, &request); - smpi_mpi_isend(request); - smpi_mpi_wait(request, MPI_STATUS_IGNORE); - xbt_mallocator_release(smpi_global->request_mallocator, request); + smpi_bench_end(); + if(comm == MPI_COMM_NULL) { + retval = MPI_ERR_COMM; + } else if(sendtype == MPI_DATATYPE_NULL || recvtype == MPI_DATATYPE_NULL) { + retval = MPI_ERR_TYPE; } else { - // Receive buffers from senders - int src; - smpi_mpi_request_t* requests; - - requests = xbt_malloc((size-1) * sizeof(smpi_mpi_request_t)); - for(src = 0; src < size; src++) { - if(src == root) { - // Local copy from root - memcpy(&((char*)recvbuf)[displs[src]], - sendbuf, sendcount*sendtype->size*sizeof(char)); - } else { - int index = src < root ? src : src - 1; - retval = smpi_create_request(&((char*)recvbuf)[displs[src]], - recvcounts[src], recvtype, src, root, system_tag, - comm, &requests[index]); - if(NULL != requests[index] && MPI_SUCCESS == retval) { - smpi_mpi_irecv(requests[index]); - } - } - } - // Wait for completion of irecv's. 
- for(src = 0; src < size - 1; src++) { - int index = MPI_UNDEFINED; - smpi_mpi_waitany(size - 1, requests, &index, MPI_STATUS_IGNORE); - xbt_mallocator_release(smpi_global->request_mallocator, requests[index]); - } - xbt_free(requests); + smpi_mpi_scatter(sendbuf, sendcount, sendtype, recvbuf, recvcount, recvtype, root, comm); + retval = MPI_SUCCESS; } smpi_bench_begin(); return retval; } -int SMPI_MPI_Scatterv(void* sendbuf, int *sendcounts, int *displs, MPI_Datatype sendtype, - void* recvbuf, int recvcount, MPI_Datatype recvtype, - int root, MPI_Comm comm) -{ - int retval = MPI_SUCCESS; - int system_tag = 666; - int rank, size; +int MPI_Scatterv(void* sendbuf, int* sendcounts, int* displs, MPI_Datatype sendtype, void* recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm) { + int retval; smpi_bench_end(); - rank = smpi_mpi_comm_rank(comm); - size = comm->size; - if(rank != root) { - // Receive buffer from root - smpi_mpi_request_t request; + if(comm == MPI_COMM_NULL) { + retval = MPI_ERR_COMM; + } else if(sendtype == MPI_DATATYPE_NULL || recvtype == MPI_DATATYPE_NULL) { + retval = MPI_ERR_TYPE; + } else if(sendcounts == NULL || displs == NULL) { + retval = MPI_ERR_ARG; + } else { + smpi_mpi_scatterv(sendbuf, sendcounts, displs, sendtype, recvbuf, recvcount, recvtype, root, comm); + retval = MPI_SUCCESS; + } + smpi_bench_begin(); + return retval; +} - retval = smpi_create_request(recvbuf, recvcount, recvtype, - root, rank, system_tag, comm, &request); - smpi_mpi_isend(request); - smpi_mpi_wait(request, MPI_STATUS_IGNORE); - xbt_mallocator_release(smpi_global->request_mallocator, request); +int MPI_Reduce(void* sendbuf, void* recvbuf, int count, MPI_Datatype datatype, MPI_Op op, int root, MPI_Comm comm) { + int retval; + + smpi_bench_end(); + if(comm == MPI_COMM_NULL) { + retval = MPI_ERR_COMM; + } else if(datatype == MPI_DATATYPE_NULL || op == MPI_OP_NULL) { + retval = MPI_ERR_ARG; } else { - // Send buffers to receivers - int dst; - smpi_mpi_request_t* requests; + smpi_mpi_reduce(sendbuf, recvbuf, count, datatype, op, root, comm); + retval = MPI_SUCCESS; + } + smpi_bench_begin(); + return retval; +} - requests = xbt_malloc((size-1) * sizeof(smpi_mpi_request_t)); - for(dst = 0; dst < size; dst++) { - if(dst == root) { - // Local copy from root - memcpy(recvbuf, &((char*)sendbuf)[displs[dst]], - sendcounts[dst]*sendtype->size*sizeof(char)); - } else { - int index = dst < root ? dst : dst - 1; - retval = smpi_create_request(&((char*)sendbuf)[displs[dst]], sendcounts[dst], sendtype, - root, dst, system_tag, comm, &requests[index]); - if(NULL != requests[index] && MPI_SUCCESS == retval) { - smpi_mpi_isend(requests[index]); - } - } - } - // Wait for completion of isend's. 
- for(dst = 0; dst < size - 1; dst++) { - int index = MPI_UNDEFINED; - smpi_mpi_waitany(size - 1, requests, &index, MPI_STATUS_IGNORE); - xbt_mallocator_release(smpi_global->request_mallocator, requests[index]); - } - xbt_free(requests); +int MPI_Allreduce(void* sendbuf, void* recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm) { + int retval; + + smpi_bench_end(); + if(comm == MPI_COMM_NULL) { + retval = MPI_ERR_COMM; + } else if(datatype == MPI_DATATYPE_NULL) { + retval = MPI_ERR_TYPE; + } else if(op == MPI_OP_NULL) { + retval = MPI_ERR_OP; + } else { + smpi_mpi_allreduce(sendbuf, recvbuf, count, datatype, op, comm); + retval = MPI_SUCCESS; } smpi_bench_begin(); return retval; } -int SMPI_MPI_Reduce_scatter(void* sendbuf, void* recvbuf, int *recvcounts, - MPI_Datatype datatype, MPI_Op op, MPI_Comm comm) -{ - // FIXME: Suboptimal implementation - int retval = MPI_SUCCESS; - int count = 0; - int root = 0; - int i, rank; +int MPI_Reduce_scatter(void* sendbuf, void* recvbuf, int* recvcounts, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm) { + int retval, i, rank, size, count; int* displs; smpi_bench_end(); - rank = smpi_mpi_comm_rank(comm); - displs = xbt_new(int, comm->size); - for(i = 0; i < comm->size; i++) { - count += recvcounts[i]; - displs[i] = 0; + if(comm == MPI_COMM_NULL) { + retval = MPI_ERR_COMM; + } else if(datatype == MPI_DATATYPE_NULL) { + retval = MPI_ERR_TYPE; + } else if(op == MPI_OP_NULL) { + retval = MPI_ERR_OP; + } else if(recvcounts == NULL) { + retval = MPI_ERR_ARG; + } else { + /* arbitrarily choose root as rank 0 */ + /* TODO: faster direct implementation ? */ + rank = smpi_comm_rank(comm); + size = smpi_comm_size(comm); + count = 0; + displs = xbt_new(int, size); + for(i = 0; i < size; i++) { + count += recvcounts[i]; + displs[i] = 0; + } + smpi_mpi_reduce(sendbuf, recvbuf, count, datatype, op, 0, comm); + smpi_mpi_scatterv(recvbuf, recvcounts, displs, datatype, recvbuf, recvcounts[rank], datatype, 0, comm); + xbt_free(displs); + retval = MPI_SUCCESS; } - retval = smpi_mpi_reduce(sendbuf, recvbuf, count, datatype, op, root, comm); - retval = SMPI_MPI_Scatterv(recvbuf, recvcounts, displs, datatype, recvbuf, recvcounts[rank], datatype, root, comm); - xbt_free(displs); smpi_bench_begin(); return retval; } -int SMPI_MPI_Allgather(void* sendbuf, int sendcount, MPI_Datatype sendtype, - void* recvbuf, int recvcount, MPI_Datatype recvtype, MPI_Comm comm) -{ - // FIXME: Suboptimal implementation - int root = 0; - int retval; +/** + * MPI_Alltoall user entry point + * + * Uses the logic of OpenMPI (upto 1.2.7 or greater) for the optimizations + * ompi/mca/coll/tuned/coll_tuned_module.c + **/ + +int MPI_Alltoall(void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int recvcount, MPI_Datatype recvtype, MPI_Comm comm) { + int retval, size, sendsize; smpi_bench_end(); - retval = SMPI_MPI_Gather(sendbuf, sendcount, sendtype, - recvbuf, recvcount, recvtype, root, comm); - if(retval == MPI_SUCCESS) { - retval = SMPI_MPI_Bcast(recvbuf, recvcount, recvtype, root, comm); + if(comm == MPI_COMM_NULL) { + retval = MPI_ERR_COMM; + } else if(sendtype == MPI_DATATYPE_NULL || recvtype == MPI_DATATYPE_NULL) { + retval = MPI_ERR_TYPE; + } else { + size = smpi_comm_size(comm); + sendsize = smpi_datatype_size(sendtype) * sendcount; + if(sendsize < 200 && size > 12) { + retval = smpi_coll_tuned_alltoall_bruck(sendbuf, sendcount, sendtype, recvbuf, recvcount, recvtype, comm); + } else if(sendsize < 3000) { + retval = smpi_coll_tuned_alltoall_basic_linear(sendbuf, 
sendcount, sendtype, recvbuf, recvcount, recvtype, comm); + } else { + retval = smpi_coll_tuned_alltoall_pairwise(sendbuf, sendcount, sendtype, recvbuf, recvcount, recvtype, comm); + } } smpi_bench_begin(); return retval; } -int SMPI_MPI_Allgatherv(void* sendbuf, int sendcount, MPI_Datatype sendtype, - void* recvbuf, int *recvcounts, int *displs, MPI_Datatype recvtype, - MPI_Comm comm) -{ - // FIXME: Suboptimal implementation - int root = 0; - int last, retval; +int MPI_Alltoallv(void* sendbuf, int* sendcounts, int* senddisps, MPI_Datatype sendtype, void* recvbuf, int *recvcounts, int* recvdisps, MPI_Datatype recvtype, MPI_Comm comm) { + int retval; smpi_bench_end(); - retval = SMPI_MPI_Gatherv(sendbuf, sendcount, sendtype, - recvbuf, recvcounts, displs, recvtype, root, comm); - if(retval == MPI_SUCCESS) { - last = comm->size - 1; - retval = SMPI_MPI_Bcast(recvbuf, displs[last] + recvcounts[last], recvtype, root, comm); + if(comm == MPI_COMM_NULL) { + retval = MPI_ERR_COMM; + } else if(sendtype == MPI_DATATYPE_NULL || recvtype == MPI_DATATYPE_NULL) { + retval = MPI_ERR_TYPE; + } else if(sendcounts == NULL || senddisps == NULL || recvcounts == NULL || recvdisps == NULL) { + retval = MPI_ERR_ARG; + } else { + retval = smpi_coll_basic_alltoallv(sendbuf, sendcounts, senddisps, sendtype, recvbuf, recvcounts, recvdisps, recvtype, comm); } smpi_bench_begin(); return retval; diff --git a/src/smpi/smpi_mpi_dt.c b/src/smpi/smpi_mpi_dt.c index 8e74105fef..30575b5e17 100644 --- a/src/smpi/smpi_mpi_dt.c +++ b/src/smpi/smpi_mpi_dt.c @@ -21,81 +21,362 @@ XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_mpi_dt, smpi, "Logging specific to SMPI (datatype)"); +typedef struct s_smpi_mpi_datatype { + size_t size; + MPI_Aint lb; + MPI_Aint ub; + int flags; +} s_smpi_mpi_datatype_t; -/** - * Get the lower bound and extent for a Datatype - * The extent of a datatype is defined to be the span from the first byte to the last byte - * occupied by entries in this datatype, rounded up to satisfy alignment requirements (epsilon). - * - * For typemap T = {(t_0,disp_0), ..., (t_n-1,disp_n-1)} - * lb(T) = min_j disp_j - * ub(T) = max_j (disp_j+sizeof(t_j)) + epsilon - * extent(T) = ub(T) - lb(T) - * - * FIXME: this an incomplete implementation as we do not support yet MPI_Type_commit. - * Hence, this can be called only for primitive type MPI_INT, MPI_DOUBLE, ... 
- * - * remark: MPI-1 has also the deprecated - * int MPI_Type_extent(MPI_Datatype datatype, *MPI_Aint *extent); - * - **/ -int smpi_mpi_type_get_extent(MPI_Datatype datatype, MPI_Aint *lb, MPI_Aint *extent) { - - if ( DT_FLAG_COMMITED != (datatype-> flags & DT_FLAG_COMMITED) ) - return( MPI_ERR_TYPE ); - *lb = datatype->lb; - *extent = datatype->ub - datatype->lb; - return( MPI_SUCCESS ); -} - - -/** - * query extent and lower bound of the type - **/ -int SMPI_MPI_Type_get_extent( MPI_Datatype datatype, MPI_Aint *lb, MPI_Aint *extent) -{ - return( smpi_mpi_type_get_extent( datatype, lb, extent)); -} - -/** - * query the size of the type - **/ -int SMPI_MPI_Type_size(MPI_Datatype datatype, size_t * size) -{ - int retval = MPI_SUCCESS; - - smpi_bench_end(); - - if (NULL == datatype) { +#define CREATE_MPI_DATATYPE(name, type) \ + static s_smpi_mpi_datatype_t mpi_##name = { \ + sizeof(type), /* size */ \ + 0, /* lb */ \ + sizeof(type), /* ub = lb + size */ \ + DT_FLAG_BASIC /* flags */ \ + }; \ + MPI_Datatype name = &mpi_##name; + +// Predefined data types +CREATE_MPI_DATATYPE(MPI_CHAR, char); +CREATE_MPI_DATATYPE(MPI_SHORT, short); +CREATE_MPI_DATATYPE(MPI_INT, int); +CREATE_MPI_DATATYPE(MPI_LONG, long); +CREATE_MPI_DATATYPE(MPI_LONG_LONG, long long); +CREATE_MPI_DATATYPE(MPI_SIGNED_CHAR, signed char); +CREATE_MPI_DATATYPE(MPI_UNSIGNED_CHAR, unsigned char); +CREATE_MPI_DATATYPE(MPI_UNSIGNED_SHORT, unsigned short); +CREATE_MPI_DATATYPE(MPI_UNSIGNED, unsigned int); +CREATE_MPI_DATATYPE(MPI_UNSIGNED_LONG, unsigned long); +CREATE_MPI_DATATYPE(MPI_UNSIGNED_LONG_LONG, unsigned long long); +CREATE_MPI_DATATYPE(MPI_FLOAT, float); +CREATE_MPI_DATATYPE(MPI_DOUBLE, double); +CREATE_MPI_DATATYPE(MPI_LONG_DOUBLE, long double); +CREATE_MPI_DATATYPE(MPI_WCHAR, wchar_t); +CREATE_MPI_DATATYPE(MPI_C_BOOL, _Bool); +CREATE_MPI_DATATYPE(MPI_INT8_T, int8_t); +CREATE_MPI_DATATYPE(MPI_INT16_T, int16_t); +CREATE_MPI_DATATYPE(MPI_INT32_T, int32_t); +CREATE_MPI_DATATYPE(MPI_INT64_T, int64_t); +CREATE_MPI_DATATYPE(MPI_UINT8_T, uint8_t); +CREATE_MPI_DATATYPE(MPI_UINT16_T, uint16_t); +CREATE_MPI_DATATYPE(MPI_UINT32_T, uint32_t); +CREATE_MPI_DATATYPE(MPI_UINT64_T, uint64_t); +CREATE_MPI_DATATYPE(MPI_C_FLOAT_COMPLEX, float _Complex); +CREATE_MPI_DATATYPE(MPI_C_DOUBLE_COMPLEX, double _Complex); +CREATE_MPI_DATATYPE(MPI_C_LONG_DOUBLE_COMPLEX, long double _Complex); +CREATE_MPI_DATATYPE(MPI_AINT, MPI_Aint); +CREATE_MPI_DATATYPE(MPI_OFFSET, MPI_Offset); + +size_t smpi_datatype_size(MPI_Datatype datatype) { + return datatype->size; +} + +MPI_Aint smpi_datatype_lb(MPI_Datatype datatype) { + return datatype->lb; +} + +MPI_Aint smpi_datatype_ub(MPI_Datatype datatype) { + return datatype->ub; +} + +int smpi_datatype_extent(MPI_Datatype datatype, MPI_Aint* lb, MPI_Aint * extent) { + int retval; + + if((datatype->flags & DT_FLAG_COMMITED) != DT_FLAG_COMMITED) { retval = MPI_ERR_TYPE; - } else if (NULL == size) { - retval = MPI_ERR_ARG; } else { - *size = datatype->size; + *lb = datatype->lb; + *extent = datatype->ub - datatype->lb; + retval = MPI_SUCCESS; } + return MPI_SUCCESS; +} - smpi_bench_begin(); +int smpi_datatype_copy(void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int recvcount, MPI_Datatype recvtype) { + int retval, count; - return retval; + /* First check if we really have something to do */ + if(recvcount == 0) { + retval = sendcount == 0 ? MPI_SUCCESS : MPI_ERR_TRUNCATE; + } else if(sendtype == recvtype) { + /* If same datatypes used, just copy. */ + count = sendcount < recvcount ? 
sendcount : recvcount; + memcpy(recvbuf, sendbuf, smpi_datatype_size(sendtype) * count); + retval = sendcount > recvcount ? MPI_ERR_TRUNCATE : MPI_SUCCESS; + } else { + /* FIXME: cases + * - If receive packed. + * - If send packed + * to be treated once we have the MPI_Pack things ... + **/ + retval = MPI_SUCCESS; + } + return retval; } +typedef struct s_smpi_mpi_op { + MPI_User_function* func; +} s_smpi_mpi_op_t; + +#define MAX_OP(a, b) (b) = (a) < (b) ? (b) : (a) +#define MIN_OP(a, b) (b) = (a) < (b) ? (a) : (b) +#define SUM_OP(a, b) (b) += (a) +#define PROD_OP(a, b) (b) *= (a) +#define LAND_OP(a, b) (b) = (a) && (b) +#define LOR_OP(a, b) (b) = (a) || (b) +#define LXOR_OP(a, b) (b) = (!(a) && (b)) || ((a) && !(b)) +#define BAND_OP(a, b) (b) &= (a) +#define BOR_OP(a, b) (b) |= (a) +#define BXOR_OP(a, b) (b) ^= (a) +//TODO : MINLOC & MAXLOC + +#define APPLY_FUNC(a, b, length, type, func) \ + { \ + int i; \ + type* x = (type*)(a); \ + type* y = (type*)(b); \ + for(i = 0; i < *(length); i++) { \ + func(x[i], y[i]); \ + } \ + } -/* Deprecated Functions. - * The MPI-2 standard deprecated a number of routines because MPI-2 provides better versions. - * This routine is one of those that was deprecated. The routine may continue to be used, but - * new code should use the replacement routine. The replacement for this routine is MPI_Type_Get_extent. - **/ -int SMPI_MPI_Type_ub( MPI_Datatype datatype, MPI_Aint *displacement) -{ - if ( DT_FLAG_COMMITED != (datatype->flags & DT_FLAG_COMMITED) ) - return( MPI_ERR_TYPE ); - *displacement = datatype->ub; - return( MPI_SUCCESS ); +static void max_func(void* a, void* b, int* length, MPI_Datatype* datatype) { + if(*datatype == MPI_SHORT) { + APPLY_FUNC(a, b, length, short, MAX_OP); + } else if(*datatype == MPI_INT) { + APPLY_FUNC(a, b, length, int, MAX_OP); + } else if(*datatype == MPI_LONG) { + APPLY_FUNC(a, b, length, long, MAX_OP); + } else if(*datatype == MPI_UNSIGNED_SHORT) { + APPLY_FUNC(a, b, length, unsigned short, MAX_OP); + } else if(*datatype == MPI_UNSIGNED) { + APPLY_FUNC(a, b, length, unsigned int, MAX_OP); + } else if(*datatype == MPI_UNSIGNED_LONG) { + APPLY_FUNC(a, b, length, unsigned long, MAX_OP); + } else if(*datatype == MPI_FLOAT) { + APPLY_FUNC(a, b, length, float, MAX_OP); + } else if(*datatype == MPI_DOUBLE) { + APPLY_FUNC(a, b, length, double, MAX_OP); + } else if(*datatype == MPI_LONG_DOUBLE) { + APPLY_FUNC(a, b, length, long double, MAX_OP); + } +} + +static void min_func(void* a, void* b, int* length, MPI_Datatype* datatype) { + if(*datatype == MPI_SHORT) { + APPLY_FUNC(a, b, length, short, MIN_OP); + } else if(*datatype == MPI_INT) { + APPLY_FUNC(a, b, length, int, MIN_OP); + } else if(*datatype == MPI_LONG) { + APPLY_FUNC(a, b, length, long, MIN_OP); + } else if(*datatype == MPI_UNSIGNED_SHORT) { + APPLY_FUNC(a, b, length, unsigned short, MIN_OP); + } else if(*datatype == MPI_UNSIGNED) { + APPLY_FUNC(a, b, length, unsigned int, MIN_OP); + } else if(*datatype == MPI_UNSIGNED_LONG) { + APPLY_FUNC(a, b, length, unsigned long, MIN_OP); + } else if(*datatype == MPI_FLOAT) { + APPLY_FUNC(a, b, length, float, MIN_OP); + } else if(*datatype == MPI_DOUBLE) { + APPLY_FUNC(a, b, length, double, MIN_OP); + } else if(*datatype == MPI_LONG_DOUBLE) { + APPLY_FUNC(a, b, length, long double, MIN_OP); + } +} + +static void sum_func(void* a, void* b, int* length, MPI_Datatype* datatype) { + if(*datatype == MPI_SHORT) { + APPLY_FUNC(a, b, length, short, SUM_OP); + } else if(*datatype == MPI_INT) { + APPLY_FUNC(a, b, length, int, SUM_OP); + } else 
if(*datatype == MPI_LONG) { + APPLY_FUNC(a, b, length, long, SUM_OP); + } else if(*datatype == MPI_UNSIGNED_SHORT) { + APPLY_FUNC(a, b, length, unsigned short, SUM_OP); + } else if(*datatype == MPI_UNSIGNED) { + APPLY_FUNC(a, b, length, unsigned int, SUM_OP); + } else if(*datatype == MPI_UNSIGNED_LONG) { + APPLY_FUNC(a, b, length, unsigned long, SUM_OP); + } else if(*datatype == MPI_FLOAT) { + APPLY_FUNC(a, b, length, float, SUM_OP); + } else if(*datatype == MPI_DOUBLE) { + APPLY_FUNC(a, b, length, double, SUM_OP); + } else if(*datatype == MPI_LONG_DOUBLE) { + APPLY_FUNC(a, b, length, long double, SUM_OP); + } else if(*datatype == MPI_C_FLOAT_COMPLEX) { + APPLY_FUNC(a, b, length, float _Complex, SUM_OP); + } else if(*datatype == MPI_C_DOUBLE_COMPLEX) { + APPLY_FUNC(a, b, length, double _Complex, SUM_OP); + } else if(*datatype == MPI_C_LONG_DOUBLE_COMPLEX) { + APPLY_FUNC(a, b, length, long double _Complex, SUM_OP); + } +} + +static void prod_func(void* a, void* b, int* length, MPI_Datatype* datatype) { + if(*datatype == MPI_SHORT) { + APPLY_FUNC(a, b, length, short, PROD_OP); + } else if(*datatype == MPI_INT) { + APPLY_FUNC(a, b, length, int, PROD_OP); + } else if(*datatype == MPI_LONG) { + APPLY_FUNC(a, b, length, long, PROD_OP); + } else if(*datatype == MPI_UNSIGNED_SHORT) { + APPLY_FUNC(a, b, length, unsigned short, PROD_OP); + } else if(*datatype == MPI_UNSIGNED) { + APPLY_FUNC(a, b, length, unsigned int, PROD_OP); + } else if(*datatype == MPI_UNSIGNED_LONG) { + APPLY_FUNC(a, b, length, unsigned long, PROD_OP); + } else if(*datatype == MPI_FLOAT) { + APPLY_FUNC(a, b, length, float, PROD_OP); + } else if(*datatype == MPI_DOUBLE) { + APPLY_FUNC(a, b, length, double, PROD_OP); + } else if(*datatype == MPI_LONG_DOUBLE) { + APPLY_FUNC(a, b, length, long double, PROD_OP); + } else if(*datatype == MPI_C_FLOAT_COMPLEX) { + APPLY_FUNC(a, b, length, float _Complex, PROD_OP); + } else if(*datatype == MPI_C_DOUBLE_COMPLEX) { + APPLY_FUNC(a, b, length, double _Complex, PROD_OP); + } else if(*datatype == MPI_C_LONG_DOUBLE_COMPLEX) { + APPLY_FUNC(a, b, length, long double _Complex, PROD_OP); + } } -int SMPI_MPI_Type_lb( MPI_Datatype datatype, MPI_Aint *displacement) -{ - if ( DT_FLAG_COMMITED != (datatype->flags & DT_FLAG_COMMITED) ) - return( MPI_ERR_TYPE ); - *displacement = datatype->lb; - return( MPI_SUCCESS ); + +static void land_func(void* a, void* b, int* length, MPI_Datatype* datatype) { + if(*datatype == MPI_SHORT) { + APPLY_FUNC(a, b, length, short, LAND_OP); + } else if(*datatype == MPI_INT) { + APPLY_FUNC(a, b, length, int, LAND_OP); + } else if(*datatype == MPI_LONG) { + APPLY_FUNC(a, b, length, long, LAND_OP); + } else if(*datatype == MPI_UNSIGNED_SHORT) { + APPLY_FUNC(a, b, length, unsigned short, LAND_OP); + } else if(*datatype == MPI_UNSIGNED) { + APPLY_FUNC(a, b, length, unsigned int, LAND_OP); + } else if(*datatype == MPI_UNSIGNED_LONG) { + APPLY_FUNC(a, b, length, unsigned long, LAND_OP); + } else if(*datatype == MPI_C_BOOL) { + APPLY_FUNC(a, b, length, _Bool, LAND_OP); + } +} + +static void lor_func(void* a, void* b, int* length, MPI_Datatype* datatype) { + if(*datatype == MPI_SHORT) { + APPLY_FUNC(a, b, length, short, LOR_OP); + } else if(*datatype == MPI_INT) { + APPLY_FUNC(a, b, length, int, LOR_OP); + } else if(*datatype == MPI_LONG) { + APPLY_FUNC(a, b, length, long, LOR_OP); + } else if(*datatype == MPI_UNSIGNED_SHORT) { + APPLY_FUNC(a, b, length, unsigned short, LOR_OP); + } else if(*datatype == MPI_UNSIGNED) { + APPLY_FUNC(a, b, length, unsigned int, LOR_OP); + } else 
if(*datatype == MPI_UNSIGNED_LONG) { + APPLY_FUNC(a, b, length, unsigned long, LOR_OP); + } else if(*datatype == MPI_C_BOOL) { + APPLY_FUNC(a, b, length, _Bool, LOR_OP); + } +} + +static void lxor_func(void* a, void* b, int* length, MPI_Datatype* datatype) { + if(*datatype == MPI_SHORT) { + APPLY_FUNC(a, b, length, short, LXOR_OP); + } else if(*datatype == MPI_INT) { + APPLY_FUNC(a, b, length, int, LXOR_OP); + } else if(*datatype == MPI_LONG) { + APPLY_FUNC(a, b, length, long, LXOR_OP); + } else if(*datatype == MPI_UNSIGNED_SHORT) { + APPLY_FUNC(a, b, length, unsigned short, LXOR_OP); + } else if(*datatype == MPI_UNSIGNED) { + APPLY_FUNC(a, b, length, unsigned int, LXOR_OP); + } else if(*datatype == MPI_UNSIGNED_LONG) { + APPLY_FUNC(a, b, length, unsigned long, LXOR_OP); + } else if(*datatype == MPI_C_BOOL) { + APPLY_FUNC(a, b, length, _Bool, LXOR_OP); + } +} + +static void band_func(void* a, void* b, int* length, MPI_Datatype* datatype) { + if(*datatype == MPI_SHORT) { + APPLY_FUNC(a, b, length, short, BAND_OP); + } else if(*datatype == MPI_INT) { + APPLY_FUNC(a, b, length, int, BAND_OP); + } else if(*datatype == MPI_LONG) { + APPLY_FUNC(a, b, length, long, BAND_OP); + } else if(*datatype == MPI_UNSIGNED_SHORT) { + APPLY_FUNC(a, b, length, unsigned short, BAND_OP); + } else if(*datatype == MPI_UNSIGNED) { + APPLY_FUNC(a, b, length, unsigned int, BAND_OP); + } else if(*datatype == MPI_UNSIGNED_LONG) { + APPLY_FUNC(a, b, length, unsigned long, BAND_OP); + } else if(*datatype == MPI_BYTE) { + APPLY_FUNC(a, b, length, uint8_t, BAND_OP); + } +} + +static void bor_func(void* a, void* b, int* length, MPI_Datatype* datatype) { + if(*datatype == MPI_SHORT) { + APPLY_FUNC(a, b, length, short, BOR_OP); + } else if(*datatype == MPI_INT) { + APPLY_FUNC(a, b, length, int, BOR_OP); + } else if(*datatype == MPI_LONG) { + APPLY_FUNC(a, b, length, long, BOR_OP); + } else if(*datatype == MPI_UNSIGNED_SHORT) { + APPLY_FUNC(a, b, length, unsigned short, BOR_OP); + } else if(*datatype == MPI_UNSIGNED) { + APPLY_FUNC(a, b, length, unsigned int, BOR_OP); + } else if(*datatype == MPI_UNSIGNED_LONG) { + APPLY_FUNC(a, b, length, unsigned long, BOR_OP); + } else if(*datatype == MPI_BYTE) { + APPLY_FUNC(a, b, length, uint8_t, BOR_OP); + } +} + +static void bxor_func(void* a, void* b, int* length, MPI_Datatype* datatype) { + if(*datatype == MPI_SHORT) { + APPLY_FUNC(a, b, length, short, BXOR_OP); + } else if(*datatype == MPI_INT) { + APPLY_FUNC(a, b, length, int, BXOR_OP); + } else if(*datatype == MPI_LONG) { + APPLY_FUNC(a, b, length, long, BXOR_OP); + } else if(*datatype == MPI_UNSIGNED_SHORT) { + APPLY_FUNC(a, b, length, unsigned short, BXOR_OP); + } else if(*datatype == MPI_UNSIGNED) { + APPLY_FUNC(a, b, length, unsigned int, BXOR_OP); + } else if(*datatype == MPI_UNSIGNED_LONG) { + APPLY_FUNC(a, b, length, unsigned long, BXOR_OP); + } else if(*datatype == MPI_BYTE) { + APPLY_FUNC(a, b, length, uint8_t, BXOR_OP); + } +} + +#define CREATE_MPI_OP(name, func) \ + static s_smpi_mpi_op_t mpi_##name = { &(func) /* func */ }; \ + MPI_Op name = &mpi_##name; + +CREATE_MPI_OP(MPI_MAX, max_func); +CREATE_MPI_OP(MPI_MIN, min_func); +CREATE_MPI_OP(MPI_SUM, sum_func); +CREATE_MPI_OP(MPI_PROD, prod_func); +CREATE_MPI_OP(MPI_LAND, land_func); +CREATE_MPI_OP(MPI_LOR, lor_func); +CREATE_MPI_OP(MPI_LXOR, lxor_func); +CREATE_MPI_OP(MPI_BAND, band_func); +CREATE_MPI_OP(MPI_BOR, bor_func); +CREATE_MPI_OP(MPI_BXOR, bxor_func); + +MPI_Op smpi_op_new(MPI_User_function* function, int commute) { + MPI_Op op; + + //FIXME: add commute param + 
op = xbt_new(s_smpi_mpi_op_t, 1); + op->func = function; + return op; +} + +void smpi_op_destroy(MPI_Op op) { + xbt_free(op); +} + +void smpi_op_apply(MPI_Op op, void* invec, void* inoutvec, int* len, MPI_Datatype* datatype) { + op->func(invec, inoutvec, len, datatype); } diff --git a/src/smpi/smpi_mpi_dt_private.h b/src/smpi/smpi_mpi_dt_private.h index 47105276d7..cb548a9993 100644 --- a/src/smpi/smpi_mpi_dt_private.h +++ b/src/smpi/smpi_mpi_dt_private.h @@ -6,7 +6,6 @@ **/ #include "private.h" -/* flags for the datatypes. */ #define DT_FLAG_DESTROYED 0x0001 /**< user destroyed but some other layers still have a reference */ #define DT_FLAG_COMMITED 0x0002 /**< ready to be used for a send/recv operation */ #define DT_FLAG_CONTIGUOUS 0x0004 /**< contiguous datatype */ @@ -25,16 +24,3 @@ * datatypes. The DT_FLAG_BASIC is held by all predefined contiguous datatypes. */ #define DT_FLAG_BASIC (DT_FLAG_PREDEFINED | DT_FLAG_CONTIGUOUS | DT_FLAG_NO_GAPS | DT_FLAG_DATA | DT_FLAG_COMMITED) - - - -int smpi_mpi_type_get_extent(MPI_Datatype datatype, MPI_Aint *lb, MPI_Aint *extent); - -/* Deprecated Functions. - * The MPI-2 standard deprecated a number of routines because MPI-2 provides better versions. - * This routine is one of those that was deprecated. The routine may continue to be used, but - * new code should use the replacement routine. The replacement for this routine is MPI_Type_Get_extent. - **/ -int SMPI_MPI_Type_ub( MPI_Datatype datatype, MPI_Aint *displacement); -int SMPI_MPI_Type_lb( MPI_Datatype datatype, MPI_Aint *displacement); - diff --git a/src/smpi/smpi_receiver.c b/src/smpi/smpi_receiver.c deleted file mode 100644 index 999f5612fe..0000000000 --- a/src/smpi/smpi_receiver.c +++ /dev/null @@ -1,102 +0,0 @@ -#include "private.h" - -XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_receiver, smpi, - "Logging specific to SMPI (receiver)"); - -int smpi_receiver(int argc, char *argv[]) -{ - smpi_process_data_t mydata = SIMIX_process_get_data(SIMIX_process_self()); - smx_process_t self; - int index = mydata->index; - - xbt_fifo_t request_queue; - xbt_fifo_t message_queue; - - smpi_mpi_request_t request; - smpi_received_message_t message; - - xbt_fifo_item_t request_item; - xbt_fifo_item_t message_item; - - self = SIMIX_process_self(); - - request_queue = mydata->pending_recv_request_queue; - message_queue = mydata->received_message_queue; - - DEBUG0("Up and running"); - - - while (1) { - // FIXME: better algorithm, maybe some kind of balanced tree? or a heap? 
- - DEBUG0("Look for matching"); - xbt_fifo_foreach(request_queue, request_item, request, smpi_mpi_request_t) { - xbt_fifo_foreach(message_queue, message_item, message, - smpi_received_message_t) { - -//#define DEBUG_MATCH -#ifdef DEBUG_MATCH - printf("[%s] try match (req_src=%d,msg_src=%d)x(req_tag=%d,msg_tag=%d)\n", - __FILE__,request->src,message->src,request->tag, message->tag); -#endif - if (request->comm == message->comm && - (MPI_ANY_SOURCE == request->src || request->src == message->src) - && (MPI_ANY_TAG == request->tag || request->tag == message->tag)) { - xbt_fifo_remove_item(request_queue, request_item); - xbt_fifo_free_item(request_item); - xbt_fifo_remove_item(message_queue, message_item); - xbt_fifo_free_item(message_item); - DEBUG5("found matching request %p: (req_src=%d,msg_src=%d)x(req_tag=%d,msg_tag=%d)", - request,request->src,message->src,request->tag, message->tag); - goto stopsearch; - } else { - DEBUG5("fail to match request %p: (req_src=%d,msg_src=%d)x(req_tag=%d,msg_tag=%d)", - request,request->src,message->src,request->tag, message->tag); - } - } - } - - request = NULL; - message = NULL; - - stopsearch: - if (NULL != request) { - if (NULL == message) - DIE_IMPOSSIBLE; - - SIMIX_mutex_lock(request->mutex); - memcpy(request->buf, message->buf, - request->datatype->size * request->count); - request->src = message->src; - request->data = message->data; - request->forward = message->forward; - - if (0 == request->forward) { - request->completed = 1; - SIMIX_cond_broadcast(request->cond); - } else { - request->src = request->comm->index_to_rank_map[index]; - request->dst = (request->src + 1) % request->comm->size; - smpi_mpi_isend(request); - } - - SIMIX_mutex_unlock(request->mutex); - - xbt_free(message->buf); - xbt_mallocator_release(smpi_global->message_mallocator, message); - - } else if (mydata->finalize > 0) { /* main wants me to die and nothing to do */ - // FIXME: display the list of remaining requests and messages (user code synchronization faulty?) - DEBUG0("Main wants me to die and I'm done. Bye, guys."); - mydata->finalize--; - SIMIX_cond_signal(mydata->cond); - return 0; - } else { - DEBUG0("Nothing to do. Let's get a nap"); - SIMIX_process_suspend(self); - DEBUG0("=== Uh? Someone called me? 
==="); - } - } - - return 0; -} diff --git a/src/smpi/smpi_sender.c b/src/smpi/smpi_sender.c deleted file mode 100644 index eb5e727d58..0000000000 --- a/src/smpi/smpi_sender.c +++ /dev/null @@ -1,110 +0,0 @@ -#include "private.h" - -XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_sender, smpi, - "Logging specific to SMPI (sender)"); - -int smpi_sender(int argc, char *argv[]) -{ - smpi_process_data_t mydata = SIMIX_process_get_data(SIMIX_process_self()); - smx_process_t self; - smx_host_t shost; - - int index; - - xbt_fifo_t request_queue; - - smpi_mpi_request_t request; - - smx_host_t dhost; - - smx_action_t action; - - e_surf_action_state_t state; - - smpi_received_message_t message; - - int dindex; - - self = SIMIX_process_self(); - shost = SIMIX_host_self(); - - index = mydata->index; - - DEBUG0("Up and running"); - - request_queue = mydata->pending_send_request_queue; - - while (1) { - request = xbt_fifo_shift(request_queue); - - if (NULL != request) { - message = xbt_mallocator_get(smpi_global->message_mallocator); - - SIMIX_mutex_lock(request->mutex); - - message->comm = request->comm; - message->src = request->comm->index_to_rank_map[index]; - message->tag = request->tag; - message->data = request->data; - message->buf = xbt_malloc(request->datatype->size * request->count); - memcpy(message->buf, request->buf, - request->datatype->size * request->count); - - dindex = request->comm->rank_to_index_map[request->dst]; - smpi_process_data_t remote_process = - SIMIX_process_get_data(smpi_global->main_processes[dindex]); - dhost = SIMIX_process_get_host(smpi_global->main_processes[dindex]); - - DEBUG4("handle send request %p to %s (req_dst=%d,req_tag=%d)", - request,SIMIX_host_get_name(dhost),request->dst,message->tag); - message->forward = (request->forward - 1) / 2; - request->forward = request->forward / 2; - - if (0 < request->forward) { - request->dst = - (request->dst + message->forward + 1) % request->comm->size; - xbt_fifo_push(request_queue, request); - } else { - DEBUG4("DONE Handling send request %p to %s (req_dst=%d,req_tag=%d)", - request, SIMIX_host_get_name(dhost),request->dst,message->tag); - request->completed = 1; - } - - action = - SIMIX_action_communicate(shost, dhost, "communication", - request->datatype->size * request->count, - -1.0); - - SIMIX_register_action_to_condition(action, request->cond); - - for (state = SIMIX_action_get_state(action); - state == SURF_ACTION_READY || - state == SURF_ACTION_RUNNING; - state = SIMIX_action_get_state(action) - ) { - SIMIX_cond_wait(request->cond, request->mutex); - } - - xbt_fifo_push(remote_process->received_message_queue, message); - - SIMIX_unregister_action_to_condition(action, request->cond); - SIMIX_action_destroy(action); - - SIMIX_mutex_unlock(request->mutex); - - // wake up receiver if necessary - SIMIX_process_resume(remote_process->receiver); - - } else if (mydata->finalize > 0) { /* main wants me to die and nothing to do */ - DEBUG0("===Main wants me to die and I'm done. Bye, guys.==="); - mydata->finalize--; - SIMIX_cond_signal(mydata->cond); - return 0; - } else { - DEBUG0("Nothing to do. Let's get a nap"); - SIMIX_process_suspend(self); - DEBUG0("===Uh? 
Someone called me?==="); - } - } - return 0; -} diff --git a/src/smpi/smpi_util.c b/src/smpi/smpi_util.c index 91649a8c35..1e2fb2c079 100644 --- a/src/smpi/smpi_util.c +++ b/src/smpi/smpi_util.c @@ -3,6 +3,7 @@ XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_util, smpi, "Logging specific to SMPI (utilities)"); +/* int smpi_gettimeofday(struct timeval *tv, struct timezone *tz) { double now; @@ -23,34 +24,13 @@ int smpi_gettimeofday(struct timeval *tv, struct timezone *tz) unsigned int smpi_sleep(unsigned int seconds) { smx_host_t host; - smx_mutex_t mutex; - smx_cond_t cond; smx_action_t action; - e_surf_action_state_t state; smpi_bench_end(); - host = SIMIX_host_self(); - mutex = smpi_process_mutex(); - cond = smpi_process_cond(); - - SIMIX_mutex_lock(mutex); - - // FIXME: explicit conversion to double? - action = SIMIX_action_sleep(host, seconds); - - SIMIX_register_action_to_condition(action, cond); - for (state = SIMIX_action_get_state(action); - state == SURF_ACTION_READY || - state == SURF_ACTION_RUNNING; state = SIMIX_action_get_state(action) - ) { - SIMIX_cond_wait(cond, mutex); - } - SIMIX_unregister_action_to_condition(action, cond); + action = SIMIX_action_sleep(host, (double)seconds); + smpi_process_wait_action(action); SIMIX_action_destroy(action); - - SIMIX_mutex_unlock(mutex); - smpi_bench_begin(); return 0; } @@ -62,3 +42,4 @@ void smpi_exit(int status) SIMIX_process_kill(SIMIX_process_self()); return; } +*/ diff --git a/src/smpi/smpicc.in b/src/smpi/smpicc.in index 42f87574a3..886399a6b1 100755 --- a/src/smpi/smpicc.in +++ b/src/smpi/smpicc.in @@ -4,6 +4,7 @@ prefix="@prefix@" exec_prefix="@exec_prefix@" CC="gcc" +CFLAGS="-Dmain=smpi_simulated_main" #FIXME: probably only want the last two pairs after 'make install' INCLUDEARGS="-I@top_srcdir@/include -I@top_srcdir@/include/smpi -I@includedir@ -I@includedir@/smpi" -- 2.20.1
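
Note on the reduction operators introduced in smpi_mpi.c above: every predefined MPI_Op is just an s_smpi_mpi_op_t holding an MPI_User_function, each function dispatches on the datatype and runs an element-wise macro (SUM_OP, MAX_OP, ...) over the two buffers via APPLY_FUNC, and smpi_op_apply simply calls op->func(invec, inoutvec, len, datatype). The standalone C sketch below illustrates that pattern under simplified assumptions: example_datatype_t, EX_INT/EX_DOUBLE and example_sum_func are hypothetical stand-ins for the real MPI_Datatype handles and sum_func, and the buffer contents are made up; it shows the macro mechanics only, not code from the SimGrid tree.

/* Standalone illustration of the element-wise reduction pattern used by the
 * MPI_Op implementation in the patch.  Only the macro mechanics match the
 * real code; the datatype enum and function names are stand-ins. */
#include <stdio.h>

/* Same shape as the SUM_OP / APPLY_FUNC macros added in smpi_mpi.c. */
#define SUM_OP(a, b) (b) += (a)
#define APPLY_FUNC(a, b, length, type, func) \
  {                                          \
    int i;                                   \
    type* x = (type*)(a);                    \
    type* y = (type*)(b);                    \
    for (i = 0; i < *(length); i++) {        \
      func(x[i], y[i]);                      \
    }                                        \
  }

/* Stand-in for MPI_Datatype: the real code compares against MPI_INT, MPI_DOUBLE, ... */
typedef enum { EX_INT, EX_DOUBLE } example_datatype_t;

/* Same calling convention as MPI_User_function: (invec, inoutvec, len, datatype). */
static void example_sum_func(void* a, void* b, int* length, example_datatype_t* datatype)
{
  if (*datatype == EX_INT) {
    APPLY_FUNC(a, b, length, int, SUM_OP);
  } else if (*datatype == EX_DOUBLE) {
    APPLY_FUNC(a, b, length, double, SUM_OP);
  }
}

int main(void)
{
  /* invec plays the role of a remote contribution, inoutvec the local partial
   * result; after the call inoutvec holds the element-wise sum, which is what
   * smpi_op_apply(MPI_SUM, ...) produces when combining buffers in a reduce. */
  int invec[3]    = { 1, 2, 3 };
  int inoutvec[3] = { 10, 20, 30 };
  int len = 3;
  example_datatype_t type = EX_INT;

  example_sum_func(invec, inoutvec, &len, &type);

  printf("%d %d %d\n", inoutvec[0], inoutvec[1], inoutvec[2]); /* prints: 11 22 33 */
  return 0;
}

Keeping the per-type dispatch in one function per operator, with a single shared APPLY_FUNC loop, is what lets the patch cover the arithmetic, logical and bitwise operators without duplicating the iteration logic in each of them.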