// MPI error codes (MPI_ERR_RANK / MPI_ERR_TAG are what smpi_create_request
// reports for an out-of-range rank / negative tag)
#define MPI_ERR_COUNT 6
#define MPI_ERR_RANK 7
#define MPI_ERR_TAG 8
+
// MPI_Comm: an MPI communicator is a pointer handle to an SMPI communicator
// structure (fields such as `size` and `index_to_rank_map` are used by the
// SMPI implementation)
typedef struct smpi_mpi_communicator_t *smpi_mpi_communicator_t;
typedef smpi_mpi_communicator_t MPI_Comm;
smpi_mpi_datatype_t mpi_byte;
smpi_mpi_datatype_t mpi_int;
+ smpi_mpi_datatype_t mpi_float;
smpi_mpi_datatype_t mpi_double;
smpi_mpi_op_t mpi_land;
smpi_mpi_op_t mpi_sum;
+ smpi_mpi_op_t mpi_min;
+ smpi_mpi_op_t mpi_max;
} s_smpi_mpi_global_t;
// Pointer handle to the global SMPI state, which holds the predefined
// datatypes and reduction operators exposed through the macros below
typedef struct smpi_mpi_global_t *smpi_mpi_global_t;
// Pass MPI_STATUS_IGNORE (NULL) where a status argument is not wanted
#define MPI_STATUS_IGNORE NULL
// Predefined datatypes: each one is a field of the global SMPI state
#define MPI_BYTE (smpi_mpi_global->mpi_byte)
-#define MPI_DOUBLE (smpi_mpi_global->mpi_double)
#define MPI_INT (smpi_mpi_global->mpi_int)
+#define MPI_FLOAT (smpi_mpi_global->mpi_float)
+#define MPI_DOUBLE (smpi_mpi_global->mpi_double)
// Predefined reduction operators: their callbacks are installed by
// smpi_global_init()
#define MPI_LAND (smpi_mpi_global->mpi_land)
#define MPI_SUM (smpi_mpi_global->mpi_sum)
+#define MPI_MIN (smpi_mpi_global->mpi_min)
+#define MPI_MAX (smpi_mpi_global->mpi_max)
// MPI macros: public MPI names mapped onto their SMPI_MPI_* implementations
#define MPI_Init(a, b) SMPI_MPI_Init(a, b)
#define MPI_Wait(a, b) SMPI_MPI_Wait(a, b)
#define MPI_Comm_split(a, b, c, d) SMPI_MPI_Comm_split(a, b, c, d)
#define MPI_Wtime() SMPI_MPI_Wtime()
+#define MPI_Reduce( a, b, c, d, e, f, g) SMPI_MPI_Reduce( a, b, c, d, e, f, g)
// SMPI Functions: implementations reached through the MPI_* macros above
// SMPI_MPI_Init is what MPI_Init(a, b) expands to
XBT_PUBLIC(int) SMPI_MPI_Init(int *argc, char ***argv);
MPI_Comm * comm_out);
// Implementation behind MPI_Wtime()
XBT_PUBLIC(double) SMPI_MPI_Wtime(void);
// Implementation behind MPI_Reduce(sendbuf, recvbuf, count, datatype, op,
// root, comm); `op` is one of the predefined MPI_LAND/SUM/MIN/MAX handles
+XBT_PUBLIC(int) SMPI_MPI_Reduce(void *sendbuf, void *recvbuf, int count,
+ MPI_Datatype datatype, MPI_Op op, int root, MPI_Comm comm);
+
// smpi functions
// NOTE(review): smpi_simulated_main presumably is the application's entry
// point as seen by the SMPI runtime -- confirm against the build setup
XBT_IMPORT_NO_EXPORT(int) smpi_simulated_main(int argc, char **argv);
XBT_PUBLIC(unsigned int) smpi_sleep(unsigned int);
--- /dev/null
+#include <stdio.h>
+#include <mpi.h>
+
+int main (int argc, char **argv) {
+ int size, rank;
+ int root=0;
+ int value = 1;
+ int sum=-99;
+
+ double start_timer;
+
+
+ MPI_Init(&argc, &argv);
+ MPI_Comm_size(MPI_COMM_WORLD, &size);
+ MPI_Comm_rank(MPI_COMM_WORLD, &rank);
+
+ start_timer = MPI_Wtime();
+
+ printf("rank %d has value %d\n", rank, value);
+ MPI_Reduce(&value, &sum, 1, MPI_INT, MPI_SUM, root, MPI_COMM_WORLD);
+ if ( rank == root) {
+ printf("On root: sum=%d\n",sum);
+ printf("Elapsed time=%lf s\n", MPI_Wtime()-start_timer);
+ }
+ MPI_Finalize();
+ return 0;
+}
// Global SMPI state (predefined datatypes and reduction operators).
// NOTE(review): starts out NULL; it must be allocated before
// smpi_global_init() dereferences it -- the allocation site is not visible
// in this excerpt.
smpi_mpi_global_t smpi_mpi_global = NULL;
+/**
+ * Reduction-operator callbacks backing the predefined MPI_Op handles
+ * (implemented: land, sum, min, max).
+ **/
void smpi_mpi_land_func(void *a, void *b, int *length,
MPI_Datatype * datatype);
}
}
/**
 * Element-wise sum of two vectors, accumulated in place into the second:
 * b[i] = a[i] + b[i] for every 0 <= i < *length.
 *
 * @param a        first operand vector (only read)
 * @param b        second operand vector; overwritten with the sums
 * @param length   number of elements in each vector
 * @param datatype element type: MPI_BYTE (handled as char), MPI_INT,
 *                 MPI_FLOAT and MPI_DOUBLE are supported; for any other
 *                 datatype the function leaves b untouched
 **/
void smpi_mpi_sum_func(void *a, void *b, int *length,
                       MPI_Datatype * datatype);
void smpi_mpi_sum_func(void *a, void *b, int *length, MPI_Datatype * datatype)
{
  int i;

  if (*datatype == smpi_mpi_global->mpi_byte) {
    char *x = a, *y = b;
    for (i = 0; i < *length; i++) {
      y[i] = x[i] + y[i];
    }
  } else if (*datatype == smpi_mpi_global->mpi_int) {
    int *x = a, *y = b;
    for (i = 0; i < *length; i++) {
      y[i] = x[i] + y[i];
    }
  } else if (*datatype == smpi_mpi_global->mpi_float) {
    float *x = a, *y = b;
    for (i = 0; i < *length; i++) {
      y[i] = x[i] + y[i];
    }
  } else if (*datatype == smpi_mpi_global->mpi_double) {
    double *x = a, *y = b;
    for (i = 0; i < *length; i++) {
      y[i] = x[i] + y[i];
    }
  }
  /* any other datatype: intentionally a no-op (b is left unchanged) */
}
+/**
+ * compute the min of two vectors element-wise
+ **/
+void smpi_mpi_min_func(void *a, void *b, int *length, MPI_Datatype * datatype);
+
+void smpi_mpi_min_func(void *a, void *b, int *length, MPI_Datatype * datatype)
+{
+ int i;
+ if (*datatype == smpi_mpi_global->mpi_byte) {
+ char *x = a, *y = b;
+ for (i = 0; i < *length; i++) {
+ y[i] = x[i] < y[i] ? x[i] : y[i];
+ }
+ } else {
+ if (*datatype == smpi_mpi_global->mpi_int) {
+ int *x = a, *y = b;
+ for (i = 0; i < *length; i++) {
+ y[i] = x[i] < y[i] ? x[i] : y[i];
+ }
+ } else {
+ if (*datatype == smpi_mpi_global->mpi_float) {
+ float *x = a, *y = b;
+ for (i = 0; i < *length; i++) {
+ y[i] = x[i] < y[i] ? x[i] : y[i];
+ }
+ } else {
+ if (*datatype == smpi_mpi_global->mpi_double) {
+ double *x = a, *y = b;
+ for (i = 0; i < *length; i++) {
+ y[i] = x[i] < y[i] ? x[i] : y[i];
+ }
+
+ }}}}
+}
+/**
+ * compute the max of two vectors element-wise
+ **/
+void smpi_mpi_max_func(void *a, void *b, int *length, MPI_Datatype * datatype);
+
+void smpi_mpi_max_func(void *a, void *b, int *length, MPI_Datatype * datatype)
+{
+ int i;
+ if (*datatype == smpi_mpi_global->mpi_byte) {
+ char *x = a, *y = b;
+ for (i = 0; i < *length; i++) {
+ y[i] = x[i] > y[i] ? x[i] : y[i];
+ }
+ } else {
+ if (*datatype == smpi_mpi_global->mpi_int) {
+ int *x = a, *y = b;
+ for (i = 0; i > *length; i++) {
+ y[i] = x[i] < y[i] ? x[i] : y[i];
+ }
+ } else {
+ if (*datatype == smpi_mpi_global->mpi_float) {
+ float *x = a, *y = b;
+ for (i = 0; i > *length; i++) {
+ y[i] = x[i] < y[i] ? x[i] : y[i];
+ }
+ } else {
+ if (*datatype == smpi_mpi_global->mpi_double) {
+ double *x = a, *y = b;
+ for (i = 0; i > *length; i++) {
+ y[i] = x[i] < y[i] ? x[i] : y[i];
+ }
+
+ }}}}
+}
+
+
+
+/**
+ * Return the MPI rank of the calling process in `comm`, looked up from its SIMIX process index.
+ **/
int smpi_mpi_comm_rank(smpi_mpi_communicator_t comm)
{
return comm->index_to_rank_map[smpi_process_index()];
smpi_mpi_request_t request = NULL;
+ printf("in create-req(): MPI_ANY_SOURCE=%d,src=%d,comm->size=%d\n",MPI_ANY_SOURCE,src,comm->size);
// parameter checking prob belongs in smpi_mpi, but this is less repeat code
if (NULL == buf) {
retval = MPI_ERR_INTERN;
} else if (MPI_ANY_SOURCE != src && (0 > src || comm->size <= src)) {
retval = MPI_ERR_RANK;
} else if (0 > dst || comm->size <= dst) {
+ printf("err MPI_ERR_RANK => MPI_ANY_SOURCE=%d,src=%d,dst=%d,comm->size=%d\n",MPI_ANY_SOURCE,src,dst,comm->size);
retval = MPI_ERR_RANK;
} else if (MPI_ANY_TAG != tag && 0 > tag) {
retval = MPI_ERR_TAG;
return retval;
}
/* Reduction-operator callbacks installed into the global MPI_Op handles by
 * smpi_global_init(). The sum/min/max implementations fold vector `a` into
 * vector `b` element-wise (b[i] = a[i] op b[i]) for *length elements of
 * *datatype.
 * FIXME: move these prototypes into a shared header instead of redeclaring
 * them here; the definitions live in smpi_base.c. */
-void smpi_mpi_land_func(void *a, void *b, int *length,
- MPI_Datatype * datatype);
-void smpi_mpi_sum_func(void *a, void *b, int *length,
- MPI_Datatype * datatype);
+void smpi_mpi_land_func(void *a, void *b, int *length, MPI_Datatype * datatype);
+void smpi_mpi_sum_func(void *a, void *b, int *length, MPI_Datatype * datatype);
+void smpi_mpi_min_func(void *a, void *b, int *length, MPI_Datatype * datatype);
+void smpi_mpi_max_func(void *a, void *b, int *length, MPI_Datatype * datatype);
/*
 * Set up the predefined reduction operators of the global SMPI state:
 * allocate the MPI_SUM / MPI_MIN / MPI_MAX handles and install each one's
 * element-wise callback (MPI_LAND only gets its callback here).
 */
void smpi_global_init()
{
  /* NOTE(review): mpi_land is not allocated in this excerpt; it is assumed
   * to be allocated before this point -- confirm. */
  smpi_mpi_global->mpi_land->func = smpi_mpi_land_func;
  smpi_mpi_global->mpi_sum = xbt_new(s_smpi_mpi_op_t, 1);
  smpi_mpi_global->mpi_sum->func = smpi_mpi_sum_func;
  smpi_mpi_global->mpi_min = xbt_new(s_smpi_mpi_op_t, 1);
  smpi_mpi_global->mpi_min->func = smpi_mpi_min_func;
  smpi_mpi_global->mpi_max = xbt_new(s_smpi_mpi_op_t, 1);
  /* each handle must receive its own callback: assigning this to mpi_min
   * would turn MPI_MIN into a max and leave MPI_MAX's func uninitialized */
  smpi_mpi_global->mpi_max->func = smpi_mpi_max_func;
}
return smpi_mpi_wait(*request, status);
}
+/**
+ * MPI_Bcast
+ **/
int SMPI_MPI_Bcast(void *buf, int count, MPI_Datatype datatype, int root,
MPI_Comm comm)
{
return retval;
}
+/**
+ * MPI_Reduce
+ **/
+
+int SMPI_MPI_Reduce( void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype,
+ MPI_Op op, int root, MPI_Comm comm )
+{
+ int retval = MPI_SUCCESS;
+ int rank;
+ int size;
+ int i;
+ smpi_mpi_request_t *tabrequest;
+
+ smpi_bench_end();
+
+ rank = smpi_mpi_comm_rank(comm);
+ size = comm->size;
+
+ tabrequest = malloc((size)*sizeof(smpi_mpi_request_t));
+ if (NULL==tabrequest) {
+ fprintf(stderr,"[smpi] %s:%d : cannot alloc memory for %d requests. Exiting.\n",__FILE__,__LINE__,size);
+ exit(1);
+ }
+
+ if (rank != root) { // if i am not root, simply send my buffer to root
+ retval = smpi_create_request(sendbuf, count, datatype,
+ rank, root, 0, comm, &(tabrequest[rank]));
+ smpi_mpi_isend(tabrequest[rank]);
+ smpi_mpi_wait(tabrequest[rank], MPI_STATUS_IGNORE);
+ //printf("DEBUG: rank %d sent my sendbuf to root (rank %d)\n",rank,root);
+ } else {
+ // i am the root: wait for all buffers by creating requests
+ // i can not use: 'request->forward = size-1;' (which would progagate size-1 receive reqs)
+ // since we should op values as soon as one receiving request matches.
+ for (i=0; i<comm->size; i++) {
+ if ( rank != i ) { // except for me
+ // reminder: for smpi_create_request() the src is always the process sending.
+ retval = smpi_create_request(recvbuf, count, datatype, MPI_ANY_SOURCE, root,
+ 0, comm, &(tabrequest[i]));
+ if (NULL != tabrequest[i] && MPI_SUCCESS == retval) {
+ if (MPI_SUCCESS == retval) {
+ smpi_mpi_irecv(tabrequest[i]);
+ }
+ }
+ }
+ }
+ // now, wait for completion of all irecv's.
+ // FIXME: we should implement smpi_waill_all for a more asynchronous behavior
+ for (i=0; i<comm->size; i++) {
+ if ( rank != i ) { // except for me
+ smpi_mpi_wait(tabrequest[i], MPI_STATUS_IGNORE);
+
+ // FIXME: the core part is here. To be written ...
+ fprintf(stderr,"[smpi] %s:%d : MPI_Reduce *Not yet implemented*.\n",__FILE__,__LINE__);
+
+ }
+
+ }
+ }
+ for (i=0; i<comm->size; i++)
+ xbt_mallocator_release(smpi_global->request_mallocator, tabrequest[i]);
+
+ smpi_bench_begin();
+
+ return retval;
+}
+
// used by comm_split to sort ranks based on key values
int smpi_compare_rankkeys(const void *a, const void *b);
int smpi_compare_rankkeys(const void *a, const void *b)