Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
added smpi to cvs repository. still need to do a lot of integration work.
authormarkls <markls@48e7efb5-ca39-0410-a469-dd3cf9ba447f>
Tue, 12 Jun 2007 04:25:02 +0000 (04:25 +0000)
committermarkls <markls@48e7efb5-ca39-0410-a469-dd3cf9ba447f>
Tue, 12 Jun 2007 04:25:02 +0000 (04:25 +0000)
git-svn-id: svn+ssh://scm.gforge.inria.fr/svn/simgrid/simgrid/trunk@3599 48e7efb5-ca39-0410-a469-dd3cf9ba447f

14 files changed:
src/smpi/include/smpi.h [new file with mode: 0644]
src/smpi/sample/allreduce.c [new file with mode: 0644]
src/smpi/sample/alltoall.c [new file with mode: 0644]
src/smpi/sample/bcast.c [new file with mode: 0644]
src/smpi/sample/bcbench.c [new file with mode: 0644]
src/smpi/sample/first.c [new file with mode: 0644]
src/smpi/sample/matrix.c [new file with mode: 0644]
src/smpi/sample/mvmul.c [new file with mode: 0644]
src/smpi/sample/reduce.c [new file with mode: 0644]
src/smpi/sample/ring_c.c [new file with mode: 0644]
src/smpi/scripts/smpicc [new file with mode: 0755]
src/smpi/scripts/smpirun [new file with mode: 0755]
src/smpi/src/smpi_base.c [new file with mode: 0644]
src/smpi/src/smpi_mpi.c [new file with mode: 0644]

diff --git a/src/smpi/include/smpi.h b/src/smpi/include/smpi.h
new file mode 100644 (file)
index 0000000..26de7f2
--- /dev/null
@@ -0,0 +1,121 @@
+#define DEFAULT_POWER 100
+
+#define MPI_ANY_SOURCE -1
+
+// errorcodes
+#define MPI_SUCCESS     0
+#define MPI_ERR_COMM    1
+#define MPI_ERR_ARG     2
+#define MPI_ERR_TYPE    3
+#define MPI_ERR_REQUEST 4
+#define MPI_ERR_INTERN  5
+#define MPI_ERR_COUNT   6
+#define MPI_ERR_RANK    7
+#define MPI_ERR_TAG     8
+
+#include <stdlib.h>
+#include <msg/msg.h>
+
+typedef enum { MPI_PORT = 0, SEND_SYNC_PORT, RECV_SYNC_PORT, MAX_CHANNEL } channel_t;
+
+// MPI_Comm
+struct smpi_mpi_communicator_t {
+  int id;
+  int size;
+  int barrier;
+  m_host_t *hosts;
+  m_process_t *processes;
+};
+typedef struct smpi_mpi_communicator_t smpi_mpi_communicator_t;
+typedef smpi_mpi_communicator_t *MPI_Comm;
+extern smpi_mpi_communicator_t smpi_mpi_comm_world;
+#define MPI_COMM_WORLD (&smpi_mpi_comm_world)
+
+// MPI_Status
+struct smpi_mpi_status_t {
+  int MPI_SOURCE;
+};
+typedef struct smpi_mpi_status_t smpi_mpi_status_t;
+typedef smpi_mpi_status_t MPI_Status;
+extern smpi_mpi_status_t smpi_mpi_status_ignore;
+#define MPI_STATUS_IGNORE (&smpi_mpi_status_ignore)
+
+// MPI_Datatype
+struct smpi_mpi_datatype_t {
+//  int type;
+  size_t size;
+};
+typedef struct smpi_mpi_datatype_t smpi_mpi_datatype_t;
+typedef smpi_mpi_datatype_t *MPI_Datatype;
+// FIXME: add missing datatypes
+extern smpi_mpi_datatype_t smpi_mpi_byte;
+#define MPI_BYTE (&smpi_mpi_byte)
+extern smpi_mpi_datatype_t smpi_mpi_int;
+#define MPI_INT (&smpi_mpi_int)
+extern smpi_mpi_datatype_t smpi_mpi_double;
+#define MPI_DOUBLE (&smpi_mpi_double)
+
+struct smpi_waitlist_node_t {
+  m_process_t process;
+  struct smpi_waitlist_node_t *next;
+};
+typedef struct smpi_waitlist_node_t smpi_waitlist_node_t;
+
+// FIXME: maybe it isn't appropriate to have the next pointer inside
+// MPI_Request
+struct smpi_mpi_request_t {
+  void *buf;
+  int count;
+  smpi_mpi_datatype_t *datatype;
+  int src;
+  int dst;
+  int tag;
+  smpi_mpi_communicator_t *comm;
+  short int completed;
+  smpi_waitlist_node_t *waitlist;
+  struct smpi_mpi_request_t *next;
+  int fwdthrough;
+};
+typedef struct smpi_mpi_request_t smpi_mpi_request_t;
+typedef smpi_mpi_request_t *MPI_Request;
+
+// MPI_Op
+struct smpi_mpi_op_t {
+  void (*func)(void *x, void *y, void *z);
+};
+typedef struct smpi_mpi_op_t smpi_mpi_op_t;
+typedef smpi_mpi_op_t *MPI_Op;
+extern smpi_mpi_op_t smpi_mpi_land;
+#define MPI_LAND (&smpi_mpi_land)
+extern smpi_mpi_op_t smpi_mpi_sum;
+#define MPI_SUM (&smpi_mpi_sum)
+
+// smpi_received_t
+struct smpi_received_t {
+  int commid;
+  int src;
+  int dst;
+  int tag;
+  int fwdthrough;
+  void *data;
+  struct smpi_received_t *next;
+};
+typedef struct smpi_received_t smpi_received_t;
+
+// sender/receiver (called by main routine)
+int smpi_sender(int argc, char *argv[]);
+int smpi_receiver(int argc, char *argv[]);
+
+// smpi functions
+int smpi_comm_rank(smpi_mpi_communicator_t *comm, m_host_t host);
+void smpi_isend(smpi_mpi_request_t*);
+void smpi_irecv(smpi_mpi_request_t*);
+void smpi_barrier(smpi_mpi_communicator_t *comm);
+void smpi_wait(smpi_mpi_request_t *request, smpi_mpi_status_t *status);
+void smpi_wait_all(int count, smpi_mpi_request_t **requests, smpi_mpi_status_t *statuses);
+void smpi_wait_all_nostatus(int count, smpi_mpi_request_t **requests);
+void smpi_bench_begin();
+void smpi_bench_end();
+int smpi_create_request(void *buf, int count, smpi_mpi_datatype_t *datatype, int src, int dst, int tag, smpi_mpi_communicator_t *comm, smpi_mpi_request_t **request);
+unsigned int smpi_sleep(unsigned int);
+void smpi_exit(int);
diff --git a/src/smpi/sample/allreduce.c b/src/smpi/sample/allreduce.c
new file mode 100644 (file)
index 0000000..1f8cd88
--- /dev/null
@@ -0,0 +1,29 @@
+#include <stdio.h>
+#include <stdlib.h>
+#include <mpi.h>
+
+int main(int argc, char *argv[]) {
+  int rank, size;
+  int i;
+  int *sendbuf, *recvbuf;
+  MPI_Init(&argc, &argv);
+  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
+  MPI_Comm_size(MPI_COMM_WORLD, &size);
+  sendbuf = malloc(sizeof(int) * size);
+  recvbuf = malloc(sizeof(int) * size);
+  for (i = 0; i < size; i++) {
+    sendbuf[i] = 0;
+    recvbuf[i] = 0;
+  }
+  sendbuf[rank] = rank + 1;
+  MPI_Allreduce(sendbuf, recvbuf, size, MPI_INT, MPI_SUM, MPI_COMM_WORLD);
+  printf("node %d has: ", rank);
+  for (i = 0; i < size; i++) {
+    printf("%d ", recvbuf[i]);
+  }
+  printf("\n");
+  free(sendbuf);
+  free(recvbuf);
+  MPI_Finalize();
+  return 0;
+}
diff --git a/src/smpi/sample/alltoall.c b/src/smpi/sample/alltoall.c
new file mode 100644 (file)
index 0000000..c11a6dd
--- /dev/null
@@ -0,0 +1,173 @@
+#include "mpi.h"
+#include <stdlib.h>
+#include <stdio.h>
+#include <string.h>
+#include <errno.h>
+
+#ifndef EXIT_SUCCESS
+#define EXIT_SUCCESS 0
+#define EXIT_FAILURE 1
+#endif
+
+// sandler says, compile with mpicc -v alltoalldemo.c
+// run with mpirun -np 3 a.out -m 5
+
+int main( int argc, char *argv[] )
+{
+    int rank, size;
+    int chunk = 128;
+    int i;
+     int j; // added by sandler
+    int *sb;
+    int *rb;
+    int status, gstatus;
+
+    MPI_Init(&argc,&argv);
+    MPI_Comm_rank(MPI_COMM_WORLD,&rank);
+    MPI_Comm_size(MPI_COMM_WORLD,&size);
+     if (rank==0) {
+        printf("size: %d\n", size);
+    }
+    for ( i=1 ; i < argc ; ++i ) {
+        if ( argv[i][0] != '-' ) {
+                // added by sandler
+                fprintf(stderr, "Unrecognized option %s\n", argv[i]);fflush(stderr);
+            continue;
+            }
+        switch(argv[i][1]) {
+            case 'm':
+                chunk = atoi(argv[++i]);
+                     if (rank==0) {
+                        printf("chunk: %d\n", chunk);
+                    }
+                break;
+            default:
+                fprintf(stderr, "Unrecognized argument %s\n", argv[i]);fflush(stderr);
+                MPI_Abort(MPI_COMM_WORLD,EXIT_FAILURE);
+        }
+    }
+    sb = (int *)malloc(size*chunk*sizeof(int));
+    if ( !sb ) {
+        perror( "can't allocate send buffer" );fflush(stderr);
+        MPI_Abort(MPI_COMM_WORLD,EXIT_FAILURE);
+    }
+    rb = (int *)malloc(size*chunk*sizeof(int));
+    if ( !rb ) {
+        perror( "can't allocate recv buffer");fflush(stderr);
+        free(sb);
+        MPI_Abort(MPI_COMM_WORLD, EXIT_FAILURE);
+    }
+     
+     
+     /* original deino.net:
+    for ( i=0 ; i < size*chunk ; ++i ) {
+        sb[i] = sb[i] = rank + 1;
+        rb[i] = 0;
+    }
+    */
+        // written by sandler
+        
+        if (rank==0) printf("note in the following:\n"
+        "if you were to compare the sending buffer and the receiving buffer on the SAME processor, \n"
+        "you might think that the values were getting wiped out.  However, each row IS going somewhere. \n"
+        "The 0th row of processor 0 goes to the 0th row of processor 0\n"
+        "The 1st row of processor 0 goes to the 0th row of processor 1.  (Go look at rb for processor 1!)\n"
+        "\n"
+        "Too bad the values don't come out in a deterministic order. That's life!\n"
+        "\n"
+        "Now look at the receiving buffer for processor 0.\n"
+        "The 0th row is from processor 0 (itself).\n"
+        "The 1st row on processor 0 is from the 0th row on processor 1. (Go look at the sb of processor 1!)\n"
+        "\n"
+        "Apparently this is the intended behavior.\n"
+        "\n"
+        "Note that each row is always moved as one chunk, unchangeable.\n"
+        "\n"
+        "TODO: draw a diagram\n"
+        );
+        
+        for (i=0; i<size; i++) {
+            for (j=0; j<chunk; j++) {
+                int offset = i*chunk + j; // note the multiplier is chunk, not size
+
+                sb[offset] = rank*100 + i*10 + j;
+                rb[offset] = -1;
+            }
+        }
+    
+    
+        // this clearly shows what is NOT indended to be done, in that the rb on a processor is the same as the sb on the processor 
+        // in this intialization: on processor 0, only the 0th row gets normal values.
+        // on processor 1, only the 1st row gets normal values.
+        // when you look the rb, it looks like nothing happened.  this is because, say, for processor 1, the 1st row got sent to itself.
+        /*
+        for (i=0; i<size; i++) {
+            for (j=0; j<chunk; j++) {
+                int offset = i*chunk + j; // note the multiplier is chunk, not size
+                
+                if (i==rank) 
+                    sb[offset] = rank*100 + i*10 + j;
+                else 
+                    sb[offset] = 999;
+                    
+                rb[i*chunk + j] = 999;
+            }
+        }
+    */
+        
+        // this does printgrid("sb", rank, size, chunk, sb);
+        //added by sandler
+        printf("[processor %d] To send:\n", rank);
+        for (i=0; i<size; i++) {
+            for (j=0; j<chunk; j++) {
+                // note the multiplier is chunk, not size
+                printf("%03d ", sb[i*chunk+j]);       
+            }
+            printf("\n");
+        }
+        printf("\n");
+
+/*
+        // for another variation, could send out a bunch of characters, like
+        p r o c e s s o r 0 r o w 0
+        p r o c e s s o r 0 r o w 1
+        p r o c e s s o r 0 r o w 2
+        
+        then you'd get back
+        
+        p r o c e s s o r 0 r o w 0
+        p r o c e s s o r 1 r o w 0
+        p r o c e s s o r 2 r o w 0
+*/      
+        
+
+    status = MPI_Alltoall(sb, chunk, MPI_INT, rb, chunk, MPI_INT, MPI_COMM_WORLD);
+
+        // this does printgrid("rb", rank, size, chunk, rb);
+        // added by sandler
+        printf("[processor %d] Received:\n", rank);
+        for (i=0; i<size; i++) {
+            for (j=0; j<chunk; j++) {
+                printf("%03d ", rb[i*chunk+j]);       
+            }
+            printf("\n");
+        }
+        printf("\n");
+
+
+    MPI_Allreduce( &status, &gstatus, 1, MPI_INT, MPI_SUM, MPI_COMM_WORLD );
+    if (rank == 0) {
+        if (gstatus != 0) {
+            printf("all_to_all returned %d\n",gstatus);fflush(stdout);
+        }
+
+        
+   }
+
+
+
+    free(sb);
+    free(rb);
+    MPI_Finalize();
+    return(EXIT_SUCCESS);
+}
diff --git a/src/smpi/sample/bcast.c b/src/smpi/sample/bcast.c
new file mode 100644 (file)
index 0000000..2d84c13
--- /dev/null
@@ -0,0 +1,18 @@
+#include <stdio.h>
+#include <mpi.h>
+
+int main (int argc, char **argv) {
+  int size, rank;
+  int value = 3;
+  MPI_Init(&argc, &argv);
+  MPI_Comm_size(MPI_COMM_WORLD, &size);
+  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
+  if (0 == rank) {
+    value = 17;
+  }
+  printf("node %d has value %d\n", rank, value);
+  MPI_Bcast(&value, 1, MPI_INT, 0, MPI_COMM_WORLD);
+  printf("node %d has value %d\n", rank, value);
+  MPI_Finalize();
+  return 0;
+}
diff --git a/src/smpi/sample/bcbench.c b/src/smpi/sample/bcbench.c
new file mode 100644 (file)
index 0000000..85d3656
--- /dev/null
@@ -0,0 +1,87 @@
+#include <stdio.h>
+#include <stdlib.h>
+#include <mpi.h>
+
+#define GETTIMEOFDAY_ERROR 1
+
+#define N_START 1
+#define N_STOP  1024*1024
+#define N_NEXT  (N*2)
+#define ITER    100
+#define ONE_MILLION 1000000.0
+#define RAND_SEED 842270
+
+int main(int argc, char* argv[]) {
+
+  int size, rank;
+  int N, I;
+  struct timeval *start_time, *stop_time;
+  double seconds;
+  int i, j, k;
+  char *buffer;
+  int check;
+
+  srandom(RAND_SEED);
+
+  MPI_Init(&argc, &argv);
+
+  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
+  MPI_Comm_size(MPI_COMM_WORLD, &size);
+
+  if (0 == rank) {
+    start_time = (struct timeval *)malloc(sizeof(struct timeval));
+    stop_time  = (struct timeval *)malloc(sizeof(struct timeval));
+  }
+
+  for (N = N_START ; N <= N_STOP ; N = N_NEXT) {
+
+    buffer = malloc(sizeof(char) * N);
+
+    if (0 == rank) {
+      for (j = 0; j < N; j++) {
+        buffer[j] = (char)(random() % 256);
+      }
+      if (-1 == gettimeofday(start_time, NULL)) {
+        printf("couldn't set start_time on node 0!\n");
+        MPI_Abort(MPI_COMM_WORLD, GETTIMEOFDAY_ERROR);
+        exit(EXIT_FAILURE);
+      }
+    }
+
+    for (i = 0; i < ITER; i++) {
+      MPI_Bcast(buffer, N, MPI_BYTE, 0, MPI_COMM_WORLD);
+      if (0 == rank) {
+        for (j = 1; j < size; j++) {
+          MPI_Recv(&check, 1, MPI_INT, MPI_ANY_SOURCE, 1, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
+        }
+      } else {
+        MPI_Send(&rank, 1, MPI_INT, 0, 1, MPI_COMM_WORLD);
+      }
+    }
+
+    if (0 == rank) {
+      if (-1 == gettimeofday(stop_time, NULL)) {
+        printf("couldn't set start_time on node 0!\n");
+        MPI_Abort(MPI_COMM_WORLD, GETTIMEOFDAY_ERROR);
+        exit(EXIT_FAILURE);
+      }
+      seconds = (double)(stop_time->tv_sec - start_time->tv_sec) + (double)(stop_time->tv_usec - start_time->tv_usec) / ONE_MILLION;
+    }
+
+    free(buffer);
+
+    if (0 == rank) {
+      printf("N: %10d, iter: %d, time: %10f s, avg rate: %12f Mbps\n", N, ITER, seconds, ((double)N * ITER * 8) / (1024.0 * 1024.0 * seconds));
+    }
+
+  }
+
+  if (0 == rank) {
+    free(start_time);
+    free(stop_time);
+  }
+
+  MPI_Finalize();
+
+  return 0;
+}
diff --git a/src/smpi/sample/first.c b/src/smpi/sample/first.c
new file mode 100644 (file)
index 0000000..d78aeac
--- /dev/null
@@ -0,0 +1,56 @@
+/* A first simple SPMD example program using MPI                  */
+
+/* The program consists of on receiver process and N-1 sender     */
+/* processes. The sender processes send a message consisting      */
+/* of their process identifier (id) and the total number of       */
+/* processes (ntasks) to the receiver. The receiver process       */
+/* prints out the values it receives in the messeges from the     */
+/* senders.                                                       */
+
+/* Compile the program with 'mpicc first.c -o first'              */
+/* To run the program, using four of the computers specified in   */
+/* your hostfile, do 'mpirun -machinefile hostfile -np 4 first    */
+
+#include <stdio.h>
+#include <mpi.h>
+main(int argc, char *argv[]) {
+  const int tag = 42;         /* Message tag */
+  int id, ntasks, source_id, dest_id, err, i;
+  MPI_Status status;
+  int msg[2];     /* Message array */
+  
+  err = MPI_Init(&argc, &argv); /* Initialize MPI */
+  if (err != MPI_SUCCESS) {
+    printf("MPI initialization failed!\n");
+    exit(1);
+  }
+  err = MPI_Comm_size(MPI_COMM_WORLD, &ntasks); /* Get nr of tasks */
+  err = MPI_Comm_rank(MPI_COMM_WORLD, &id); /* Get id of this process */
+  if (ntasks < 2) {
+    printf("You have to use at least 2 processors to run this program\n");
+    MPI_Finalize();      /* Quit if there is only one processor */
+    exit(0);
+  }
+  
+  if (id == 0) {    /* Process 0 (the receiver) does this */
+    for (i=1; i<ntasks; i++) {
+      err = MPI_Recv(msg, 2, MPI_INT, MPI_ANY_SOURCE, tag, MPI_COMM_WORLD, \
+         &status);          /* Receive a message */
+      source_id = status.MPI_SOURCE;  /* Get id of sender */
+      printf("Received message %d %d from process %d\n", msg[0], msg[1], \
+       source_id);
+    }
+  }
+  else {      /* Processes 1 to N-1 (the senders) do this */
+    msg[0] = id;    /* Put own identifier in the message */
+    msg[1] = ntasks;          /* and total number of processes */
+    dest_id = 0;    /* Destination address */
+    sleep(3);
+    err = MPI_Send(msg, 2, MPI_INT, dest_id, tag, MPI_COMM_WORLD);
+  }
+  
+  err = MPI_Finalize();          /* Terminate MPI */
+  if (id==0) printf("Ready\n");
+  return 0;
+}
+
diff --git a/src/smpi/sample/matrix.c b/src/smpi/sample/matrix.c
new file mode 100644 (file)
index 0000000..4e10a8e
--- /dev/null
@@ -0,0 +1,254 @@
+/*
+ * Mark Stillwell
+ * ICS691: High Performance Computing
+ * Fall 2006
+ * Homework 3, Exercise 2, Step 1
+ */
+#include <stdio.h>
+#include <stdlib.h>
+#include <libgen.h>
+#include <mpi.h>
+
+#define ITERATIONS         10
+#define STEPS              1
+#define STEP_SIZE          0
+
+#define USAGE_ERROR        1
+#define MALLOC_ERROR       2
+#define GETTIMEOFDAY_ERROR 3
+
+void * checked_malloc(int rank, const char * varname, size_t size) {
+  void * ptr;
+  ptr = malloc(size);
+  if (NULL == ptr) {
+    printf("node %d could not malloc memory for %s.\n", rank, varname);
+    MPI_Abort(MPI_COMM_WORLD, MALLOC_ERROR);
+    exit(MALLOC_ERROR);
+  }
+  return ptr;
+}
+
+int main(int argc, char* argv[]) {
+
+  // timing/system variables
+  int iteration, iterations = ITERATIONS;
+  int step, steps = STEPS, step_size = STEP_SIZE;
+  long usecs, total_usecs;
+  struct timeval *start_time, *stop_time;
+  char *program;
+
+  // mpi/communications variables
+  int rank;
+  int row, col;
+  MPI_Comm row_comm, col_comm;
+
+  // algorithm variables
+  int N_start, N, P;
+  int *A, *A_t, *B, *C, *D, *a, *b, *abuf, *bbuf;
+  int n, i, j, k, I, J;
+
+  MPI_Init(&argc, &argv);
+
+  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
+
+  if (0 == rank) {
+    int size;
+    MPI_Comm_size(MPI_COMM_WORLD, &size);
+
+    program = basename(argv[0]);
+
+    // root node parses cmdline args
+    /*
+    if (3 > argc || !isdigit(*argv[1]) || !isdigit(*argv[2])) {
+      printf("usage:\n%s <N> <P> [<iterations>]\n", program);
+      MPI_Abort(MPI_COMM_WORLD, USAGE_ERROR);
+      exit(USAGE_ERROR);
+    }
+    */
+
+    //N_start = atoi(argv[1]);
+    //P = atoi(argv[2]);
+    N_start = 100;
+    P = 2;
+
+    /*
+    if (4 <= argc && isdigit(*argv[3])) {
+      iterations = atoi(argv[3]);
+    }
+
+    if (5 <= argc && isdigit(*argv[4])) {
+      steps = atoi(argv[4]);
+    }
+
+    if (6 <= argc && isdigit(*argv[5])) {
+      step_size = atoi(argv[5]);
+    }
+    */
+
+    if (P*P != size) {
+      printf("P^2 must equal size.\n");
+      MPI_Abort(MPI_COMM_WORLD, USAGE_ERROR);
+      exit(USAGE_ERROR);
+    }
+
+    start_time = (struct timeval *)checked_malloc(rank, "start_time", sizeof(struct timeval));
+    stop_time  = (struct timeval *)checked_malloc(rank, "stop_time",  sizeof(struct timeval));
+
+  }
+
+  // send command line parameters except N, since it can vary
+  MPI_Bcast(&P, 1, MPI_INT, 0, MPI_COMM_WORLD);
+  MPI_Bcast(&iterations, 1, MPI_INT, 0, MPI_COMM_WORLD);
+  MPI_Bcast(&steps, 1, MPI_INT, 0, MPI_COMM_WORLD);
+  MPI_Bcast(&step_size, 1, MPI_INT, 0, MPI_COMM_WORLD);
+
+  row = rank / P;
+  col = rank % P;
+
+  // create row/column communicators
+  MPI_Comm_split(MPI_COMM_WORLD, row, col, &row_comm);
+  MPI_Comm_split(MPI_COMM_WORLD, col, row, &col_comm);
+
+  for (step = 0; step < steps; step++) {
+
+    total_usecs = 0;
+
+    if (0 == rank) {
+      N = N_start + step * step_size;
+      if ((N/P)*P != N) {
+        printf("P must divide N and %d does not divide %d.\n", P, N);
+        N = -1;
+      }
+    }
+
+    MPI_Bcast(&N, 1, MPI_INT, 0, MPI_COMM_WORLD);
+
+    // if root passes N = -1, skip this round
+    if (-1 == N) continue;
+
+    n = N / P;
+
+    // initialize matrix components
+    A   = (int *)checked_malloc(rank, "A",   n*n*sizeof(int));
+    A_t = (int *)checked_malloc(rank, "A_t", n*n*sizeof(int));
+    B   = (int *)checked_malloc(rank, "B",   n*n*sizeof(int));
+    C   = (int *)checked_malloc(rank, "C",   n*n*sizeof(int));
+    D   = (int *)checked_malloc(rank, "D",   n*n*sizeof(int));
+
+    for (i = 0; i < n; i++) {
+      for (j = 0; j < n; j++) {
+
+        I = n*row+i;
+        J = n*col+j;
+
+        A[n*i+j] = I+J;
+        B[n*i+j] = I;
+
+        // d is the check matrix
+        D[n*i+j] = 0;
+        for (k = 0; k < N; k++) {
+          // A[I,k] = I+k
+          // B[k,J] = k
+          D[n*i+j] += (I+k) * k;
+        }
+
+      }
+    }
+
+    // buffers
+    abuf = (int *)checked_malloc(rank, "abuf", n*sizeof(int));
+    bbuf = (int *)checked_malloc(rank, "bbuf", n*sizeof(int));
+
+    for (iteration = 0; iteration < iterations; iteration++) {
+
+      for (i = 0; i < n*n; i++) {
+        C[i] = 0;
+      }
+
+      // node zero sets start time
+      if (0 == rank && -1 == gettimeofday(start_time, NULL)) {
+        printf("couldn't set start_time on node 0!\n");
+        MPI_Abort(MPI_COMM_WORLD, GETTIMEOFDAY_ERROR);
+        exit(GETTIMEOFDAY_ERROR);
+      }
+
+      // populate transpose of A
+      for (i = 0; i < n; i++) {
+        for (j = 0; j < n; j++) {
+          A_t[n*i+j] = A[n*j+i];
+        }
+      }
+
+      // perform calculations
+      for (k = 0; k < N; k++) {
+
+        if (k/n == col) {
+          a = A_t + n*(k%n);
+        } else {
+          a = abuf;
+        }
+
+        if (k/n == row) {
+          b = B + n*(k%n);
+        } else {
+          b = bbuf;
+        }
+
+        MPI_Bcast(a, n, MPI_INT, k/n, row_comm);
+        MPI_Bcast(b, n, MPI_INT, k/n, col_comm);
+
+        for (i = 0; i < n; i++) {
+          for (j = 0; j < n; j++) {
+            C[n*i+j] += a[i] * b[j];
+          }
+        }
+
+      } // for k
+
+      // node zero sets stop time
+      if (0 == rank && -1 == gettimeofday(stop_time, NULL)) {
+        printf("couldn't set stop_time on node 0!\n");
+        MPI_Abort(MPI_COMM_WORLD, GETTIMEOFDAY_ERROR);
+        exit(GETTIMEOFDAY_ERROR);
+      }
+
+      // check calculation
+      for (i = 0; i < n*n && C[i] == D[i]; i++);
+      j = (n*n == i);
+      MPI_Reduce(&j, &k, 1, MPI_INT, MPI_LAND, 0, MPI_COMM_WORLD);
+
+      // node zero prints stats
+      if (0 == rank) {
+        usecs = (stop_time->tv_sec*1000000+stop_time->tv_usec) - (start_time->tv_sec*1000000+start_time->tv_usec);
+        printf("prog: %s, N: %d, P: %d, procs: %d, time: %d us, check: %d\n", program, N, P, P*P, usecs, k);
+        total_usecs += usecs;
+      }
+
+    }
+
+    // node 0 prints final stats
+    if (0 == rank) {
+      printf("prog: %s, N: %d, P: %d, procs: %d, iterations: %d, avg. time: %d us\n",
+          program, N, P, P*P, iterations, total_usecs / iterations);
+    }
+
+    // free data structures
+    free(A);
+    free(A_t);
+    free(B);
+    free(C);
+    free(D);
+    free(abuf);
+    free(bbuf);
+
+  }
+
+  if (0 == rank) {
+    free(start_time);
+    free(stop_time);
+  }
+
+  MPI_Finalize();
+
+  return 0;
+}
diff --git a/src/smpi/sample/mvmul.c b/src/smpi/sample/mvmul.c
new file mode 100644 (file)
index 0000000..aa32e30
--- /dev/null
@@ -0,0 +1,218 @@
+#include <stdio.h>
+#include <stdlib.h>
+#include <mpi.h>
+
+#define ITERATIONS         10
+
+#define USAGE_ERROR        1
+#define SANITY_ERROR       2
+#define GETTIMEOFDAY_ERROR 3
+
+int main(int argc, char* argv[]) {
+
+  int size, rank;
+  int N, n, i, j, k, current_iteration, successful_iterations = 0;
+  double *matrix, *vector, *vcalc, *vcheck;
+  MPI_Status status;
+  struct timeval *start_time, *stop_time;
+  long parallel_usecs, parallel_usecs_total = 0, sequential_usecs, sequential_usecs_total = 0;
+
+  MPI_Init(&argc, &argv);
+
+  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
+  MPI_Comm_size(MPI_COMM_WORLD, &size);
+
+  if (0 == rank) {
+
+    // root node parses cmdline args
+    if (2 > argc || !isdigit(*argv[1])) {
+      printf("usage:\n%s <size>\n", argv[0]);
+      MPI_Abort(MPI_COMM_WORLD, USAGE_ERROR);
+      exit(USAGE_ERROR);
+    }
+
+    N = atoi(argv[1]);
+
+    start_time     = (struct timeval *)malloc(sizeof(struct timeval));
+    stop_time      = (struct timeval *)malloc(sizeof(struct timeval));
+
+  }
+
+  for(current_iteration = 0; current_iteration < ITERATIONS; current_iteration++) {
+
+    if (0 == rank) {
+
+      matrix         = (double *)malloc(N*N*sizeof(double));
+      vector         = (double *)malloc(N*sizeof(double));
+
+      for(i = 0; i < N*N; i++) {
+        matrix[i] = (double)rand()/((double)RAND_MAX + 1);
+      }
+
+      for(i = 0; i < N; i++) {
+        vector[i] = (double)rand()/((double)RAND_MAX + 1);
+      }
+
+      // for the sake of argument, the parallel algorithm begins
+      // when the root node begins to transmit the matrix to the
+      // workers.
+      if (-1 == gettimeofday(start_time, NULL)) {
+        printf("couldn't set start_time on node 0!\n");
+        MPI_Abort(MPI_COMM_WORLD, GETTIMEOFDAY_ERROR);
+        exit(GETTIMEOFDAY_ERROR);
+      }
+
+      for(i = 1; i < size; i++) {
+        MPI_Send(&N, 1, MPI_INT, i, 0, MPI_COMM_WORLD);
+      }
+
+    } else {
+      MPI_Recv(&N, 1, MPI_INT, 0, 0, MPI_COMM_WORLD, &status);
+    }
+
+    // this algorithm uses at most N processors...
+    if (rank < N) {
+
+      if (size > N) size = N;
+      n = N / size + ((rank < (N % size)) ? 1 : 0);
+
+      if (0 == rank) {
+
+        for(i = 1, j = n; i < size && j < N; i++, j+=k) {
+          k = N / size + ((i < (N % size)) ? 1 : 0);
+          MPI_Send(matrix+N*j, N*k, MPI_DOUBLE, i, 0, MPI_COMM_WORLD);
+          MPI_Send(vector, N, MPI_DOUBLE, i, 0, MPI_COMM_WORLD);
+        }
+
+        // sanity check
+        #ifdef DEBUG
+        if(i != size || j != N) {
+          printf("index calc error: i = %d, size = %d, j = %d, N = %d\n", i, size, j, N);
+          MPI_Abort(MPI_COMM_WORLD, SANITY_ERROR);
+          exit(SANITY_ERROR);
+        }
+        #endif
+
+        vcalc = (double *)malloc(N*sizeof(double));
+
+      } else {
+
+        matrix = (double *)malloc(N*n*sizeof(double));
+        vector = (double *)malloc(N*sizeof(double));
+        vcalc  = (double *)malloc(n*sizeof(double));
+
+        MPI_Recv(matrix, N*n, MPI_DOUBLE, 0, 0, MPI_COMM_WORLD, &status);
+        MPI_Recv(vector, N, MPI_DOUBLE, 0, 0, MPI_COMM_WORLD, &status);
+
+      }
+
+      for(i = 0; i < n; i++) {
+        vcalc[i] = 0.0;
+        for(j = 0; j < N; j++) {
+          vcalc[i] += matrix[N*i+j] * vector[j];
+        }
+      }
+
+      if (0 != rank) {
+        MPI_Send(vcalc, n, MPI_DOUBLE, 0, 0, MPI_COMM_WORLD);
+      } else {
+
+        for(i = 1, j = n; i < size && j < N; i++, j+=k) {
+          k = N / size + ((i < (N % size)) ? 1 : 0);
+          MPI_Recv(vcalc+j, k, MPI_DOUBLE, i, 0, MPI_COMM_WORLD, &status);
+        }
+
+        // sanity check
+        #ifdef DEBUG
+        if(i != size || j != N) {
+          printf("index calc error 2: i = %d, size = %d, j = %d, N = %d\n", i, size, j, N);
+          MPI_Abort(MPI_COMM_WORLD, SANITY_ERROR);
+          exit(SANITY_ERROR);
+        }
+        #endif
+
+        if (-1 == gettimeofday(stop_time, NULL)) {
+          printf("couldn't set stop_time on node 0!\n");
+          MPI_Abort(MPI_COMM_WORLD, GETTIMEOFDAY_ERROR);
+          exit(GETTIMEOFDAY_ERROR);
+        }
+
+        parallel_usecs = (stop_time->tv_sec*1000000+stop_time->tv_usec) - (start_time->tv_sec*1000000+start_time->tv_usec);
+
+        if (-1 == gettimeofday(start_time, NULL)) {
+          printf("couldn't set start_time on node 0!\n");
+          MPI_Abort(MPI_COMM_WORLD, GETTIMEOFDAY_ERROR);
+          exit(GETTIMEOFDAY_ERROR);
+        }
+
+        // calculate serially
+        vcheck = (double *)malloc(N*sizeof(double));
+        for(i = 0; i < N; i++) {
+          vcheck[i] = 0.0;
+          for(j = 0; j < N; j++) {
+            vcheck[i] += matrix[N*i+j] * vector[j];
+          }
+        }
+
+        if (-1 == gettimeofday(stop_time, NULL)) {
+          printf("couldn't set stop_time on node 0!\n");
+          MPI_Abort(MPI_COMM_WORLD, GETTIMEOFDAY_ERROR);
+          exit(GETTIMEOFDAY_ERROR);
+        }
+
+        sequential_usecs = (stop_time->tv_sec*1000000+stop_time->tv_usec) - (start_time->tv_sec*1000000+start_time->tv_usec);
+
+        // verify correctness
+        for(i = 0; i < N && vcalc[i] == vcheck[i]; i++);
+        
+        printf("prog: blocking, i: %d ", current_iteration);
+
+        if (i == N) {
+          printf("ptime: %d us, stime: %d us, speedup: %.3f, nodes: %d, efficiency: %.3f\n",
+              parallel_usecs,
+              sequential_usecs,
+              (double)sequential_usecs / (double)parallel_usecs,
+              size,
+              (double)sequential_usecs / ((double)parallel_usecs * (double)size));
+
+          parallel_usecs_total += parallel_usecs;
+          sequential_usecs_total += sequential_usecs;
+          successful_iterations++;
+        } else {
+          printf("parallel calc != serial calc, ");
+        }
+
+        free(vcheck);
+
+      }
+
+      free(matrix);
+      free(vector);
+      free(vcalc);
+    }
+
+  }
+
+  if(0 == rank) {
+    printf("prog: blocking, ");
+    if(0 < successful_iterations) {
+      printf("iterations: %d, avg. ptime: %d us, avg. stime: %d us, avg. speedup: %.3f, nodes: %d, avg. efficiency: %.3f\n",
+          successful_iterations,
+          parallel_usecs_total / successful_iterations,
+          sequential_usecs_total / successful_iterations,
+          (double)sequential_usecs_total / (double)parallel_usecs_total,
+          size,
+          (double)sequential_usecs_total / ((double)parallel_usecs_total * (double)size));
+    } else {
+      printf("no successful iterations!\n");
+    }
+
+    free(start_time);
+    free(stop_time);
+
+  }
+
+  MPI_Finalize();
+
+  return 0;
+}
diff --git a/src/smpi/sample/reduce.c b/src/smpi/sample/reduce.c
new file mode 100644 (file)
index 0000000..9fc0be2
--- /dev/null
@@ -0,0 +1,31 @@
+#include <stdio.h>
+#include <stdlib.h>
+#include <mpi.h>
+
+int main(int argc, char *argv[]) {
+  int rank, size;
+  int i;
+  int *sendbuf, *recvbuf;
+  MPI_Init(&argc, &argv);
+  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
+  MPI_Comm_size(MPI_COMM_WORLD, &size);
+  sendbuf = malloc(sizeof(int) * size);
+  recvbuf = malloc(sizeof(int) * size);
+  for (i = 0; i < size; i++) {
+    sendbuf[i] = 0;
+    recvbuf[i] = 0;
+  }
+  sendbuf[rank] = rank + 1;
+  MPI_Reduce(sendbuf, recvbuf, size, MPI_INT, MPI_SUM, 0, MPI_COMM_WORLD);
+  if (0 == rank) {
+    printf("nodes: ", rank);
+    for (i = 0; i < size; i++) {
+      printf("%d ", recvbuf[i]);
+    }
+    printf("\n");
+  }
+  free(sendbuf);
+  free(recvbuf);
+  MPI_Finalize();
+  return 0;
+}
diff --git a/src/smpi/sample/ring_c.c b/src/smpi/sample/ring_c.c
new file mode 100644 (file)
index 0000000..c301ab3
--- /dev/null
@@ -0,0 +1,81 @@
+/*
+ * Copyright (c) 2004-2006 The Trustees of Indiana University and Indiana
+ *                         University Research and Technology
+ *                         Corporation.  All rights reserved.
+ * Copyright (c) 2006      Cisco Systems, Inc.  All rights reserved.
+ *
+ * Simple ring test program
+ */
+
+#include <stdio.h>
+#include "mpi.h"
+
+int main(int argc, char *argv[])
+{
+    int rank, size, next, prev, message, tag = 201;
+
+    /* Start up MPI */
+
+    MPI_Init(&argc, &argv);
+    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
+    MPI_Comm_size(MPI_COMM_WORLD, &size);
+    /* Calculate the rank of the next process in the ring.  Use the
+       modulus operator so that the last process "wraps around" to
+       rank zero. */
+
+    next = (rank + 1) % size;
+    prev = (rank + size - 1) % size;
+
+    /* If we are the "master" process (i.e., MPI_COMM_WORLD rank 0),
+       put the number of times to go around the ring in the
+       message. */
+
+    if (0 == rank) {
+        message = 10;
+
+        printf("Process 0 sending %d to %d, tag %d (%d processes in ring)\n", 
+               message, next, tag, size);
+        MPI_Send(&message, 1, MPI_INT, next, tag, MPI_COMM_WORLD); 
+        printf("Process 0 sent to %d\n", next);
+    }
+
+    /* Pass the message around the ring.  The exit mechanism works as
+       follows: the message (a positive integer) is passed around the
+       ring.  Each time it passes rank 0, it is decremented.  When
+       each processes receives a message containing a 0 value, it
+       passes the message on to the next process and then quits.  By
+       passing the 0 message first, every process gets the 0 message
+       and can quit normally. */
+
+    sleep(3);
+
+    while (1) {
+        MPI_Recv(&message, 1, MPI_INT, prev, tag, MPI_COMM_WORLD, 
+                 MPI_STATUS_IGNORE);
+
+        if (0 == rank) {
+            --message;
+            printf("Process 0 decremented value: %d\n", message);
+        }
+
+        MPI_Send(&message, 1, MPI_INT, next, tag, MPI_COMM_WORLD);
+        if (0 == message) {
+            printf("Process %d exiting\n", rank);
+            break;
+        }
+    }
+
+    /* The last process does one extra send to process 0, which needs
+       to be received before the program can exit */
+
+    if (0 == rank) {
+        MPI_Recv(&message, 1, MPI_INT, prev, tag, MPI_COMM_WORLD,
+                 MPI_STATUS_IGNORE);
+    }
+    
+    /* All done */
+
+    MPI_Finalize();
+    return 0;
+}
diff --git a/src/smpi/scripts/smpicc b/src/smpi/scripts/smpicc
new file mode 100755 (executable)
index 0000000..87700d6
--- /dev/null
@@ -0,0 +1,80 @@
+#!/bin/sh
+#FIXME: .. paths...
+SIMGRID_INCLUDE="${SIMGRID_HOME}/include"
+SIMGRID_LIB="${SIMGRID_HOME}/lib"
+CC="gcc"
+
+SMPI_INCLUDE="${SMPI_HOME}/include"
+SMPI_LIB="${SMPI_HOME}/lib"
+SEED="221238"
+
+TMPDIR="$(mktemp -d tmpXXXXXXX)"
+
+function modsource {
+  SOURCE="$1"
+  SOURCEFILE="$(basename ${SOURCE})"
+  SOURCEDIR="${SOURCE%${SOURCEFILE}}"
+  if [ -n "${SOURCEDIR}" ]; then
+    mkdir -p ${TMPDIR}${SOURCEDIR}
+  fi
+  TMPSOURCE="${TMPDIR}${SOURCE}"
+  cat > ${TMPSOURCE} <<HEADER
+#define SEED ${SEED}
+#include "smpi.h"
+#include "msg/msg.h"
+#include "xbt/sysdep.h"
+#include "xbt/log.h"
+#include "xbt/asserts.h"
+#define sleep(x) smpi_sleep(x)
+#define gettimeofday(x, y) smpi_gettimeofday(x, y)
+HEADER
+  # very simplistic transform, will probably want full parser for next version
+  grep -v "mpi.h" < ${SOURCE} | perl -pe 's/main/smpi_main/;' >> ${TMPSOURCE}
+  grep -q "smpi_main" ${TMPSOURCE}
+  if [ $? -eq 0 ]; then
+    cat >> ${TMPSOURCE} <<FOOTER
+XBT_LOG_NEW_DEFAULT_CATEGORY(msg_simulation, "Messages specific to this simulation");
+int main(int argc, char *argv[]) {
+  MSG_error_t result;
+  srand(SEED);
+  MSG_global_init(&argc, argv);
+  MSG_set_channel_number(MAX_CHANNEL);
+  MSG_function_register("smpi_main",     smpi_main);
+  MSG_function_register("smpi_sender",   smpi_sender);
+  MSG_function_register("smpi_receiver", smpi_receiver);
+  MSG_create_environment(argv[1]);
+  MSG_launch_application(argv[2]);
+  result = MSG_main();
+  INFO1("simulation time %g", MSG_get_clock());
+  MSG_clean();
+  return (MSG_OK != result);
+}
+FOOTER
+  fi
+}
+
+INCLUDEARGS=""
+LINKARGS="-L${SMPI_LIB} -lsmpi -L${SIMGRID_LIB} -lsimgrid"
+
+CMDLINE=""
+while [ -n "$1" ]; do
+  ARG="$1"
+  shift
+  if [ "${ARG}" = "-c" ]; then
+      LINKARGS=""
+      CMDLINE="${CMDLINE} -c "
+  elif [ "${ARG%.c}" != "${ARG}" ]; then
+    INCLUDEARGS="-I${SMPI_INCLUDE} -I${SIMGRID_INCLUDE} "
+    SRCFILE="$(realpath ${ARG})"
+    modsource ${SRCFILE}
+    CMDLINE="${CMDLINE} ${TMPDIR}${SRCFILE} "
+  else
+    CMDLINE="${CMDLINE} ${ARG} "
+  fi
+done
+
+CMDLINE="${CC} ${INCLUDEARGS}${CMDLINE}${LINKARGS}"
+
+#echo "${CMDLINE}"
+${CMDLINE}
+rm -rf ${TMPDIR}
diff --git a/src/smpi/scripts/smpirun b/src/smpi/scripts/smpirun
new file mode 100755 (executable)
index 0000000..69abe80
--- /dev/null
@@ -0,0 +1,98 @@
+#!/bin/sh
+DEFAULT_LOOPBACK_BANDWIDTH="498000000"
+DEFAULT_LOOPBACK_LATENCY="0.000004"
+DEFAULT_NETWORK_BANDWIDTH="$((26 * 1024 * 1024))"
+DEFAULT_NETWORK_LATENCY="0.000005"
+DEFAULT_NUMPROCS="4"
+DEFAULT_POWER="100"
+
+LOOPBACK_BANDWIDTH="${DEFAULT_LOOPBACK_BANDWIDTH}"
+LOOPBACK_LATENCY="${DEFAULT_LOOPBACK_LATENCY}"
+NETWORK_BANDWIDTH="${DEFAULT_NETWORK_BANDWIDTH}"
+NETWORK_LATENCY="${DEFAULT_NETWORK_LATENCY}"
+NUMPROCS="${DEFAULT_NUMPROCS}"
+POWER="${DEFAULT_POWER}"
+
+while true; do
+  case "$1" in
+   "-np")
+      NUMPROCS="$2"
+      shift 2
+    ;;
+   "-bandwidth")
+      NETWORK_BANDWIDTH="$2"
+      shift 2
+    ;;
+   "-latency")
+      NETWORK_LATENCY="$2"
+      shift 2
+    ;;
+    *)
+      break
+    ;;
+  esac
+done
+
+EXEC="$1"
+shift
+
+PLATFORMTMP="$(mktemp tmpXXXXXX)"
+#PLATFORMTMP="pla.xml"
+
+cat > ${PLATFORMTMP} <<PLATFORMHEAD
+<?xml version='1.0'?>
+<!DOCTYPE platform_description SYSTEM "surfxml.dtd">
+<platform_description version="1">
+PLATFORMHEAD
+
+for (( i=${NUMPROCS}; $i ; i=$i-1 )) do
+  echo "  <cpu name=\"host$i\" power=\"${POWER}\"/>" >> ${PLATFORMTMP}
+  echo "  <network_link name=\"loop$i\" bandwidth=\"${LOOPBACK_BANDWIDTH}\" latency=\"${LOOPBACK_LATENCY}\"/>" >> ${PLATFORMTMP}
+  echo "  <network_link name=\"link$i\" bandwidth=\"${NETWORK_BANDWIDTH}\" latency=\"${NETWORK_LATENCY}\"/>" >> ${PLATFORMTMP}
+done
+
+for (( i=${NUMPROCS}; $i ; i=$i-1 )) do
+  for (( j=${NUMPROCS}; $j ; j=$j-1 )) do
+    if [ $i -eq $j ]; then
+      echo "  <route src=\"host$i\" dst=\"host$j\"><route_element name=\"loop$i\"/></route>" >> ${PLATFORMTMP}
+    else
+      echo "  <route src=\"host$i\" dst=\"host$j\"><route_element name=\"link$i\"/><route_element name=\"link$j\"/></route>" >> ${PLATFORMTMP}
+    fi
+  done
+done
+
+cat >> ${PLATFORMTMP} <<PLATFORMFOOT
+</platform_description>
+PLATFORMFOOT
+
+APPLICATIONTMP="$(mktemp tmpXXXXXX)"
+#APPLICATIONTMP="app.xml"
+
+cat > ${APPLICATIONTMP} <<APPLICATIONHEAD
+<?xml version='1.0'?>
+<!DOCTYPE platform_description SYSTEM "surfxml.dtd">
+<platform_description version="1">
+APPLICATIONHEAD
+
+for (( i=${NUMPROCS}; $i ; i=$i-1 )) do
+  echo "  <process host=\"host$i\" function=\"smpi_main\">" >> ${APPLICATIONTMP}
+  for ARG in $*; do
+    echo "    <argument value=\"${ARG}\"/>" >> ${APPLICATIONTMP}
+  done
+  echo "  </process>" >> ${APPLICATIONTMP}
+done
+
+for (( i=${NUMPROCS}; $i ; i=$i-1 )) do
+  echo "  <process host=\"host$i\" function=\"smpi_sender\"/>" >> ${APPLICATIONTMP}
+done
+
+for (( i=${NUMPROCS}; $i ; i=$i-1 )) do
+  echo "  <process host=\"host$i\" function=\"smpi_receiver\"/>" >> ${APPLICATIONTMP}
+done
+
+cat >> ${APPLICATIONTMP} <<APPLICATIONFOOT
+</platform_description>
+APPLICATIONFOOT
+
+${EXEC} ${PLATFORMTMP} ${APPLICATIONTMP}
+rm ${PLATFORMTMP} ${APPLICATIONTMP}
diff --git a/src/smpi/src/smpi_base.c b/src/smpi/src/smpi_base.c
new file mode 100644 (file)
index 0000000..02bb9a8
--- /dev/null
@@ -0,0 +1,557 @@
+#include <stdio.h>
+#include <sys/time.h>
+#include "msg/msg.h"
+#include "xbt/sysdep.h"
+#include "xbt/xbt_portability.h"
+#include "smpi.h"
+
+smpi_mpi_request_t **smpi_pending_send_requests      = NULL;
+smpi_mpi_request_t **smpi_last_pending_send_requests = NULL;
+
+smpi_mpi_request_t **smpi_pending_recv_requests      = NULL;
+smpi_mpi_request_t **smpi_last_pending_recv_requests = NULL;
+
+smpi_received_t **smpi_received                      = NULL;
+smpi_received_t **smpi_last_received                 = NULL;
+
+m_process_t *smpi_sender_processes                   = NULL;
+m_process_t *smpi_receiver_processes                 = NULL;
+
+int smpi_running_hosts = 0;
+
+smpi_mpi_communicator_t smpi_mpi_comm_world;
+
+smpi_mpi_status_t smpi_mpi_status_ignore;
+
+smpi_mpi_datatype_t smpi_mpi_byte;
+smpi_mpi_datatype_t smpi_mpi_int;
+smpi_mpi_datatype_t smpi_mpi_double;
+
+smpi_mpi_op_t smpi_mpi_land;
+smpi_mpi_op_t smpi_mpi_sum;
+
+static xbt_os_timer_t smpi_timer;
+static int smpi_benchmarking;
+static double smpi_reference;
+
+void smpi_mpi_land_func(void *x, void *y, void *z) {
+  *(int *)z = *(int *)x && *(int *)y;
+}
+
+void smpi_mpi_sum_func(void *x, void *y, void *z) {
+  *(int *)z = *(int *)x + *(int *)y;
+}
+
+void smpi_mpi_init() {
+  int i;
+  int size, rank;
+  m_host_t *hosts;
+  m_host_t host;
+  double duration;
+  m_task_t mtask;
+
+  // will eventually need mutex
+  smpi_running_hosts++;
+
+  // initialize some local variables
+  size  = MSG_get_host_number();
+  host  = MSG_host_self();
+  hosts = MSG_get_host_table();
+  for(i = 0; i < size && host != hosts[i]; i++);
+  rank  = i;
+
+  // node 0 sets the globals
+  if (0 == rank) {
+
+    // global communicator
+    smpi_mpi_comm_world.id           = 0;
+    smpi_mpi_comm_world.size         = size;
+    smpi_mpi_comm_world.barrier      = 0;
+    smpi_mpi_comm_world.hosts        = hosts;
+    smpi_mpi_comm_world.processes    = xbt_malloc(sizeof(m_process_t) * size);
+    smpi_mpi_comm_world.processes[0] = MSG_process_self();
+
+    // mpi datatypes
+    smpi_mpi_byte.size               = (size_t)1;
+    smpi_mpi_int.size                = sizeof(int);
+    smpi_mpi_double.size             = sizeof(double);
+
+    // mpi operations
+    smpi_mpi_land.func               = &smpi_mpi_land_func;
+    smpi_mpi_sum.func                = &smpi_mpi_sum_func;
+
+    // smpi globals
+    smpi_pending_send_requests       = xbt_malloc(sizeof(smpi_mpi_request_t*) * size);
+    smpi_last_pending_send_requests  = xbt_malloc(sizeof(smpi_mpi_request_t*) * size);
+    smpi_pending_recv_requests       = xbt_malloc(sizeof(smpi_mpi_request_t*) * size);
+    smpi_last_pending_recv_requests  = xbt_malloc(sizeof(smpi_mpi_request_t*) * size);
+    smpi_received                    = xbt_malloc(sizeof(smpi_received_t*) * size);
+    smpi_last_received               = xbt_malloc(sizeof(smpi_received_t*) * size);
+    smpi_sender_processes            = xbt_malloc(sizeof(m_process_t) * size);
+    smpi_receiver_processes          = xbt_malloc(sizeof(m_process_t) * size);
+    for(i = 0; i < size; i++) {
+      smpi_pending_send_requests[i]      = NULL;
+      smpi_last_pending_send_requests[i] = NULL;
+      smpi_pending_recv_requests[i]      = NULL;
+      smpi_last_pending_recv_requests[i] = NULL;
+      smpi_received[i]                   = NULL;
+      smpi_last_received[i]              = NULL;
+    }
+    smpi_timer                      = xbt_os_timer_new();
+    smpi_reference                  = DEFAULT_POWER;
+    smpi_benchmarking               = 0;
+
+    // tell send/recv nodes to begin
+    for(i = 0; i < size; i++) {
+      mtask = MSG_task_create("READY", 0, 0, NULL);
+      MSG_task_put(mtask, hosts[i], SEND_SYNC_PORT);
+      mtask = (m_task_t)0;
+      MSG_task_get_from_host(&mtask, SEND_SYNC_PORT, hosts[i]);
+      MSG_task_destroy(mtask);
+      mtask = MSG_task_create("READY", 0, 0, NULL);
+      MSG_task_put(mtask, hosts[i], RECV_SYNC_PORT);
+      mtask = (m_task_t)0;
+      MSG_task_get_from_host(&mtask, RECV_SYNC_PORT, hosts[i]);
+      MSG_task_destroy(mtask);
+    }
+
+    // now everyone else
+    for(i = 1; i < size; i++) {
+      mtask = MSG_task_create("READY", 0, 0, NULL);
+      MSG_task_put(mtask, hosts[i], MPI_PORT);
+    }
+
+  } else {
+    // everyone needs to wait for node 0 to finish
+    mtask = (m_task_t)0;
+    MSG_task_get(&mtask, MPI_PORT);
+    MSG_task_destroy(mtask);
+    smpi_mpi_comm_world.processes[rank] = MSG_process_self();
+  }
+
+  // now that mpi_comm_world_processes is set, it's safe to set a barrier
+  smpi_barrier(&smpi_mpi_comm_world);
+}
+
+void smpi_mpi_finalize() {
+  int i;
+  smpi_running_hosts--;
+  if (0 <= smpi_running_hosts) {
+    for(i = 0; i < smpi_mpi_comm_world.size; i++) {
+      if(MSG_process_is_suspended(smpi_sender_processes[i])) {
+        MSG_process_resume(smpi_sender_processes[i]);
+      }
+      if(MSG_process_is_suspended(smpi_receiver_processes[i])) {
+        MSG_process_resume(smpi_receiver_processes[i]);
+      }
+    }
+  } else {
+    xbt_free(smpi_mpi_comm_world.processes);
+    xbt_free(smpi_pending_send_requests);
+    xbt_free(smpi_last_pending_send_requests);
+    xbt_free(smpi_pending_recv_requests);
+    xbt_free(smpi_last_pending_recv_requests);
+    xbt_free(smpi_received);
+    xbt_free(smpi_last_received);
+    xbt_free(smpi_sender_processes);
+    xbt_free(smpi_receiver_processes);
+    xbt_os_timer_free(smpi_timer);
+  }
+}
+
+void smpi_complete(smpi_mpi_request_t *request) {
+  smpi_waitlist_node_t *current, *next;
+  request->completed = 1;
+  request->next      = NULL;
+  current = request->waitlist;
+  while(NULL != current) {
+    if(MSG_process_is_suspended(current->process)) {
+      MSG_process_resume(current->process);
+    }
+    next = current->next;
+    xbt_free(current);
+    current = next;
+  }
+  request->waitlist  = NULL;
+}
+
+int smpi_host_rank_self() {
+  return smpi_comm_rank(&smpi_mpi_comm_world, MSG_host_self());
+}
+
+void smpi_isend(smpi_mpi_request_t *sendreq) {
+  int rank = smpi_host_rank_self();
+  if (NULL == smpi_last_pending_send_requests[rank]) {
+    smpi_pending_send_requests[rank] = sendreq;
+  } else {
+    smpi_last_pending_send_requests[rank]->next = sendreq;
+  }
+  smpi_last_pending_send_requests[rank] = sendreq;
+  if (MSG_process_is_suspended(smpi_sender_processes[rank])) {
+    MSG_process_resume(smpi_sender_processes[rank]);
+  }
+}
+
+void smpi_match_requests(int rank) {
+  smpi_mpi_request_t *frequest, *prequest, *crequest;
+  smpi_received_t *freceived, *preceived, *creceived;
+  size_t dsize;
+  short int match;
+  frequest  = smpi_pending_recv_requests[rank];
+  prequest  = NULL;
+  crequest  = frequest;
+  while(NULL != crequest) {
+    freceived = smpi_received[rank];
+    preceived = NULL;
+    creceived = freceived;
+    match     = 0;
+    while(NULL != creceived && !match) {
+      if(crequest->comm->id == creceived->commid && 
+        (MPI_ANY_SOURCE == crequest->src || crequest->src == creceived->src) && 
+        crequest->tag == creceived->tag) {
+
+        // we have a match!
+        match = 1;
+
+        // pull the request from the queue
+        if(NULL == prequest) {
+          frequest = crequest->next;
+          smpi_pending_recv_requests[rank] = frequest;
+        } else {
+          prequest->next = crequest->next;
+        }
+        if(crequest == smpi_last_pending_recv_requests[rank]) {
+          smpi_last_pending_recv_requests[rank] = prequest;
+        }
+
+        // pull the received data from the queue
+        if(NULL == preceived) {
+          freceived = creceived->next;
+          smpi_received[rank] = freceived;
+        } else {
+          preceived->next = creceived->next;
+        }
+        if(creceived == smpi_last_received[rank]) {
+          smpi_last_received[rank] = preceived;
+        }
+
+        // for when request->src is any source
+        crequest->src = creceived->src;
+
+        // calculate data size
+        dsize = crequest->count * crequest->datatype->size;
+
+        // copy data to buffer
+        memcpy(crequest->buf, creceived->data, dsize);
+
+        // fwd through
+        crequest->fwdthrough = creceived->fwdthrough;
+
+        // get rid of received data node, no longer needed
+        xbt_free(creceived->data);
+        xbt_free(creceived);
+
+        if (crequest->fwdthrough == rank) {
+          smpi_complete(crequest);
+        } else {
+          crequest->src = rank;
+          crequest->dst = (rank + 1) % crequest->comm->size;
+          smpi_isend(crequest);
+        }
+
+      } else {
+        preceived = creceived;
+        creceived = creceived->next;
+      }
+    }
+    prequest = crequest;
+    crequest = crequest->next;
+  }
+}
+
+void smpi_bench_begin() {
+  xbt_assert0(!smpi_benchmarking, "Already benchmarking");
+  smpi_benchmarking = 1;
+  xbt_os_timer_start(smpi_timer);
+  return;
+}
+
+void smpi_bench_end() {
+  m_task_t ctask = NULL;
+  double duration;
+  xbt_assert0(smpi_benchmarking, "Not benchmarking yet");
+  smpi_benchmarking = 0;
+  xbt_os_timer_stop(smpi_timer);
+  duration = xbt_os_timer_elapsed(smpi_timer);
+  ctask = MSG_task_create("computation", duration * smpi_reference, 0 , NULL);
+  MSG_task_execute(ctask);
+  MSG_task_destroy(ctask);
+  return;
+}
+
+int smpi_create_request(void *buf, int count, smpi_mpi_datatype_t *datatype,
+  int src, int dst, int tag, smpi_mpi_communicator_t *comm, smpi_mpi_request_t **request) {
+  int retval = MPI_SUCCESS;
+  *request = NULL;
+  if (NULL == buf && 0 < count) {
+    retval = MPI_ERR_INTERN;
+  } else if (0 > count) {
+    retval = MPI_ERR_COUNT;
+  } else if (NULL == datatype) {
+    retval = MPI_ERR_TYPE;
+  } else if (NULL == comm) {
+    retval = MPI_ERR_COMM;
+  } else if (MPI_ANY_SOURCE != src && (0 > src || comm->size <= src)) {
+    retval = MPI_ERR_RANK;
+  } else if (0 > dst || comm->size <= dst) {
+    retval = MPI_ERR_RANK;
+  } else if (0 > tag) {
+    retval = MPI_ERR_TAG;
+  } else {
+    *request = xbt_malloc(sizeof(smpi_mpi_request_t));
+    (*request)->buf        = buf;
+    (*request)->count      = count;
+    (*request)->datatype   = datatype;
+    (*request)->src        = src;
+    (*request)->dst        = dst;
+    (*request)->tag        = tag;
+    (*request)->comm       = comm;
+    (*request)->completed  = 0;
+    (*request)->fwdthrough = dst;
+    (*request)->waitlist   = NULL;
+    (*request)->next       = NULL;
+  }
+  return retval;
+}
+
+void smpi_barrier(smpi_mpi_communicator_t *comm) {
+  int i;
+  comm->barrier++;
+  if(comm->barrier < comm->size) {
+    MSG_process_suspend(MSG_process_self());
+  } else {
+    comm->barrier = 0;
+    for(i = 0; i < comm->size; i++) {
+      if (MSG_process_is_suspended(comm->processes[i])) {
+        MSG_process_resume(comm->processes[i]);
+      }
+    }
+  }
+}
+
+int smpi_comm_rank(smpi_mpi_communicator_t *comm, m_host_t host) {
+  int i;
+  for(i = 0; i < comm->size && host != comm->hosts[i]; i++);
+  if (i >= comm->size) i = -1;
+  return i;
+}
+
+void smpi_irecv(smpi_mpi_request_t *recvreq) {
+  int rank = smpi_host_rank_self();
+  if (NULL == smpi_pending_recv_requests[rank]) {
+    smpi_pending_recv_requests[rank] = recvreq;
+  } else if (NULL != smpi_last_pending_recv_requests[rank]) {
+    smpi_last_pending_recv_requests[rank]->next = recvreq;
+  } else { // can't happen!
+    fprintf(stderr, "smpi_pending_recv_requests not null while smpi_last_pending_recv_requests null!\n");
+  }
+  smpi_last_pending_recv_requests[rank] = recvreq;
+  smpi_match_requests(rank);
+  if (MSG_process_is_suspended(smpi_receiver_processes[rank])) {
+    MSG_process_resume(smpi_receiver_processes[rank]);
+  }
+}
+
+void smpi_wait(smpi_mpi_request_t *request, smpi_mpi_status_t *status) {
+  smpi_waitlist_node_t *waitnode, *current;
+  if (NULL != request) {
+    if (!request->completed) {
+      waitnode = xbt_malloc(sizeof(smpi_waitlist_node_t));
+      waitnode->process = MSG_process_self();
+      waitnode->next    = NULL;
+      if (NULL == request->waitlist) {
+        request->waitlist = waitnode;
+      } else {
+        for(current = request->waitlist; NULL != current->next; current = current->next);
+        current->next = waitnode;
+      }
+      MSG_process_suspend(waitnode->process);
+    }
+    if (NULL != status && MPI_STATUS_IGNORE != status) {
+      status->MPI_SOURCE = request->src;
+    }
+  }
+}
+
+void smpi_wait_all(int count, smpi_mpi_request_t **requests, smpi_mpi_status_t *statuses) {
+  int i;
+  for (i = 0; i < count; i++) {
+    smpi_wait(requests[i], &statuses[i]);
+  }
+}
+
+void smpi_wait_all_nostatus(int count, smpi_mpi_request_t **requests) {
+  int i;
+  for (i = 0; i < count; i++) {
+    smpi_wait(requests[i], MPI_STATUS_IGNORE);
+  }
+}
+
+int smpi_sender(int argc, char *argv[]) {
+  m_process_t process;
+  char taskname[50];
+  size_t dsize;
+  void *data;
+  m_host_t dhost;
+  m_task_t mtask;
+  int rank, fc, ft;
+  smpi_mpi_request_t *sendreq;
+
+  process = MSG_process_self();
+
+  // wait for init
+  mtask = (m_task_t)0;
+  MSG_task_get(&mtask, SEND_SYNC_PORT);
+
+  rank = smpi_host_rank_self();
+
+  smpi_sender_processes[rank] = process;
+
+  // ready!
+  MSG_task_put(mtask, MSG_task_get_source(mtask), SEND_SYNC_PORT);
+
+  while (0 < smpi_running_hosts) {
+    sendreq = smpi_pending_send_requests[rank];
+    if (NULL != sendreq) {
+
+      // pull from queue if not a fwd or no more to fwd
+      if (sendreq->dst == sendreq->fwdthrough) {
+        smpi_pending_send_requests[rank] = sendreq->next;
+        if(sendreq == smpi_last_pending_send_requests[rank]) {
+          smpi_last_pending_send_requests[rank] = NULL;
+        }
+        ft = sendreq->dst;
+      } else {
+        fc = ((sendreq->fwdthrough - sendreq->dst + sendreq->comm->size) % sendreq->comm->size) / 2;
+        ft = (sendreq->dst + fc) % sendreq->comm->size;
+        //printf("node %d sending broadcast to node %d through node %d\n", rank, sendreq->dst, ft);
+      }
+
+      // create task to send
+      sprintf(taskname, "comm:%d,src:%d,dst:%d,tag:%d,ft:%d", sendreq->comm->id, sendreq->src, sendreq->dst, sendreq->tag, ft);
+      dsize = sendreq->count * sendreq->datatype->size;
+      data  = xbt_malloc(dsize);
+      memcpy(data, sendreq->buf, dsize);
+      mtask = MSG_task_create(taskname, 0, dsize, data);
+
+      // figure out which host to send it to
+      dhost = sendreq->comm->hosts[sendreq->dst];
+
+      // send task
+      #ifdef DEBUG
+        printf("host %s attempting to send to host %s\n", MSG_host_get_name(MSG_host_self()), MSG_host_get_name(dhost));
+      #endif
+      MSG_task_put(mtask, dhost, MPI_PORT);
+
+      if (sendreq->dst == sendreq->fwdthrough) {
+        smpi_complete(sendreq);
+      } else {
+        sendreq->dst = (sendreq->dst + fc + 1) % sendreq->comm->size;
+      }
+
+    } else {
+      MSG_process_suspend(process);
+    }
+  }
+  return 0;
+}
+
+int smpi_receiver(int argc, char **argv) {
+  m_process_t process;
+  m_task_t mtask;
+  smpi_received_t *received;
+  int rank;
+  smpi_mpi_request_t *recvreq;
+
+  process = MSG_process_self();
+
+  // wait for init
+  mtask = (m_task_t)0;
+  MSG_task_get(&mtask, RECV_SYNC_PORT);
+
+  rank = smpi_host_rank_self();
+
+  // potential race condition...
+  smpi_receiver_processes[rank] = process;
+
+  // ready!
+  MSG_task_put(mtask, MSG_task_get_source(mtask), RECV_SYNC_PORT);
+
+  while (0 < smpi_running_hosts) {
+    recvreq = smpi_pending_recv_requests[rank];
+    if (NULL != recvreq) {
+      mtask = (m_task_t)0;
+
+      #ifdef DEBUG
+        printf("host %s waiting to receive from anyone, but first in queue is (%d,%d,%d).\n",
+         MSG_host_get_name(MSG_host_self()), recvreq->src, recvreq->dst, recvreq->tag);
+      #endif
+      MSG_task_get(&mtask, MPI_PORT);
+
+      received = xbt_malloc(sizeof(smpi_received_t));
+
+      sscanf(MSG_task_get_name(mtask), "comm:%d,src:%d,dst:%d,tag:%d,ft:%d",
+        &received->commid, &received->src, &received->dst, &received->tag, &received->fwdthrough);
+      received->data = MSG_task_get_data(mtask);
+      received->next = NULL;
+
+      if (NULL == smpi_last_received[rank]) {
+        smpi_received[rank] = received;
+      } else {
+        smpi_last_received[rank]->next = received;
+      }
+      smpi_last_received[rank] = received;
+
+      MSG_task_destroy(mtask);
+
+      smpi_match_requests(rank);
+
+    } else {
+      MSG_process_suspend(process);
+    }
+  }
+  return 0;
+}
+
+// FIXME: move into own file
+int smpi_gettimeofday(struct timeval *tv, struct timezone *tz) {
+  double now;
+  int retval = 0;
+  smpi_bench_end();
+  if (NULL == tv) {
+    retval = -1;
+  } else {
+    now = MSG_get_clock();
+    tv->tv_sec  = now;
+    tv->tv_usec = ((now - (double)tv->tv_sec) * 1000000.0);
+  }
+  smpi_bench_begin();
+  return retval;
+}
+
+unsigned int smpi_sleep(unsigned int seconds) {
+  m_task_t task = NULL;
+  smpi_bench_end();
+  task = MSG_task_create("sleep", seconds * DEFAULT_POWER, 0, NULL);
+  MSG_task_execute(task);
+  MSG_task_destroy(task);
+  smpi_bench_begin();
+  return 0;
+}
+
+void smpi_exit(int status) {
+  smpi_bench_end();
+  smpi_running_hosts--;
+  MSG_process_kill(MSG_process_self());
+  return;
+}
diff --git a/src/smpi/src/smpi_mpi.c b/src/smpi/src/smpi_mpi.c
new file mode 100644 (file)
index 0000000..4dc2648
--- /dev/null
@@ -0,0 +1,300 @@
+#include <stdio.h>
+#include <sys/time.h>
+#include "msg/msg.h"
+#include "xbt/sysdep.h"
+#include "xbt/xbt_portability.h"
+#include "smpi.h"
+
+int MPI_Init(int *argc, char ***argv) {
+  smpi_mpi_init();
+  smpi_bench_begin();
+  return MPI_SUCCESS;
+}
+
+int MPI_Finalize() {
+  smpi_bench_end();
+  smpi_mpi_finalize();
+  return MPI_SUCCESS;
+}
+
+// right now this just exits the current node, should send abort signal to all
+// hosts in the communicator;
+int MPI_Abort(MPI_Comm comm, int errorcode) {
+  smpi_exit(errorcode);
+}
+
+int MPI_Comm_size(MPI_Comm comm, int *size) {
+  int retval = MPI_SUCCESS;
+  smpi_bench_end();
+  if (NULL == comm) {
+    retval = MPI_ERR_COMM;
+  } else if (NULL == size) {
+    retval = MPI_ERR_ARG;
+  } else {
+    *size = comm->size;
+  }
+  smpi_bench_begin();
+  return retval;
+}
+
+int MPI_Comm_rank(MPI_Comm comm, int *rank) {
+  int retval = MPI_SUCCESS;
+  smpi_bench_end();
+  if (NULL == comm) {
+    retval = MPI_ERR_COMM;
+  } else if (NULL == rank) {
+    retval = MPI_ERR_ARG;
+  } else {
+    *rank = smpi_comm_rank(comm, MSG_host_self());
+  }
+  smpi_bench_begin();
+  return retval;
+}
+
+/*
+int MPI_Comm_split(MPI_Comm comm, int color, int key, MPI_Comm *newcomm) {
+  int retval = MPI_SUCCESS;
+  m_host_t host = MSG_host_self();
+  int rank = smpi_comm_rank(comm, host);
+  smpi_mpi_comm_split_table_node_t *split_table; 
+  split_table = xbt_malloc(sizeof(smpi_mpi_comm_split_table_node_t) * comm->size);
+  split_table[rank].color = color;
+  split_table[rank].key   = key;
+  split_table[rank].host  = host;
+  smpi_mpi_alltoall(
+  return retval;
+}
+*/
+
+int MPI_Type_size(MPI_Datatype datatype, size_t *size) {
+  int retval = MPI_SUCCESS;
+  smpi_bench_end();
+  if (NULL == datatype) {
+    retval = MPI_ERR_TYPE;
+  } else if (NULL == size) {
+    retval = MPI_ERR_ARG;
+  } else {
+    *size = datatype->size;
+  }
+  smpi_bench_begin();
+  return retval;
+}
+
+int MPI_Wait(MPI_Request *request, MPI_Status *status) {
+  int retval = MPI_SUCCESS;
+  smpi_bench_end();
+  if (NULL == request) {
+    retval = MPI_ERR_REQUEST;
+  } else if (NULL == status) {
+    retval = MPI_ERR_ARG;
+  } else {
+    smpi_wait(*request, status);
+  }
+  smpi_bench_begin();
+  return retval;
+}
+
+int MPI_Waitall(int count, MPI_Request *requests, MPI_Status *statuses) {
+  int retval = MPI_SUCCESS;
+  smpi_bench_end();
+  if (NULL == requests) {
+    retval = MPI_ERR_REQUEST;
+  } else if (NULL == statuses) {
+    retval = MPI_ERR_ARG;
+  } else {
+    smpi_wait_all(count, requests, statuses);
+  }
+  smpi_bench_begin();
+  return MPI_ERR_INTERN;
+}
+
+int MPI_Irecv(void *buf, int count, MPI_Datatype datatype, int src, int tag, MPI_Comm comm, MPI_Request *request) {
+  int retval = MPI_SUCCESS;
+  int dst;
+  smpi_mpi_request_t *recvreq;
+  smpi_bench_end();
+  dst = smpi_comm_rank(comm, MSG_host_self());
+  retval = smpi_create_request(buf, count, datatype, src, dst, tag, comm, &recvreq);
+  if (NULL != recvreq) {
+    smpi_irecv(recvreq);
+    if (NULL != request) {
+      *request = recvreq;
+    }
+  }
+  smpi_bench_begin();
+  return retval;
+}
+
+int MPI_Recv(void *buf, int count, MPI_Datatype datatype, int src, int tag, MPI_Comm comm, MPI_Status *status) {
+  int retval = MPI_SUCCESS;
+  int dst;
+  smpi_mpi_request_t *recvreq;
+  smpi_bench_end();
+  dst = smpi_comm_rank(comm, MSG_host_self());
+  retval = smpi_create_request(buf, count, datatype, src, dst, tag, comm, &recvreq);
+  if (NULL != recvreq) {
+    smpi_irecv(recvreq);
+    smpi_wait(recvreq, status);
+    xbt_free(recvreq);
+  }
+  smpi_bench_begin();
+  return retval;
+}
+
+int MPI_Isend(void *buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm, MPI_Request *request) {
+  int retval = MPI_SUCCESS;
+  int src;
+  smpi_mpi_request_t *sendreq;
+  smpi_bench_end();
+  src = smpi_comm_rank(comm, MSG_host_self());
+  retval = smpi_create_request(buf, count, datatype, src, dst, tag, comm, &sendreq);
+  if (NULL != sendreq) {
+    smpi_isend(sendreq);
+    if (NULL != request) {
+      *request = sendreq;
+    }
+  }
+  smpi_bench_begin();
+  return retval;
+}
+
+int MPI_Send(void *buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm) {
+  int retval = MPI_SUCCESS;
+  int src;
+  smpi_mpi_request_t *sendreq;
+  smpi_bench_end();
+  src = smpi_comm_rank(comm, MSG_host_self());
+  retval = smpi_create_request(buf, count, datatype, src, dst, tag, comm, &sendreq);
+  if (NULL != sendreq) {
+    smpi_isend(sendreq);
+    smpi_wait(sendreq, MPI_STATUS_IGNORE);
+  }
+  smpi_bench_begin();
+  return retval;
+}
+
+int MPI_Bcast(void *buf, int count, MPI_Datatype datatype, int root, MPI_Comm comm) {
+  int i;
+  int rank;
+  smpi_mpi_request_t *request;
+  smpi_bench_end();
+
+  rank = smpi_comm_rank(comm, MSG_host_self());
+
+  if (root == rank) {
+    smpi_create_request(buf, count, datatype, root, (rank + 1) % comm->size, 0, comm, &request);
+    if (comm->size > 2) {
+      request->fwdthrough = (rank - 1 + comm->size) % comm->size;
+    }
+    smpi_isend(request);
+    smpi_wait(request, MPI_STATUS_IGNORE);
+  } else {
+    smpi_create_request(buf, count, datatype, MPI_ANY_SOURCE, rank, 0, comm, &request);
+    if (NULL != request) {
+      smpi_irecv(request);
+      smpi_wait(request, MPI_STATUS_IGNORE);
+    }
+  }
+
+  smpi_bench_begin();
+  return MPI_SUCCESS;
+}
+
+int MPI_Barrier(MPI_Comm comm) {
+  smpi_bench_end();
+  smpi_barrier(comm);
+  smpi_bench_begin();
+  return MPI_SUCCESS;
+}
+
+// FIXME: instead of everyone sending in order, might be a good idea to send to next...
+int MPI_Alltoall(void *sendbuf, int sendcount, MPI_Datatype sendtype, void *recvbuf, int recvcount, MPI_Datatype recvtype, MPI_Comm comm) {
+  int i;
+  int rank;
+  smpi_mpi_request_t **sendreqs, **recvreqs;
+  smpi_bench_end();
+
+  rank = smpi_comm_rank(comm, MSG_host_self());
+
+  sendreqs = xbt_malloc(sizeof(smpi_mpi_request_t*) * comm->size);
+  recvreqs = xbt_malloc(sizeof(smpi_mpi_request_t*) * comm->size);
+
+  for (i = 0; i < comm->size; i++) {
+    if (rank == i) {
+      sendreqs[i] = NULL;
+      recvreqs[i] = NULL;
+      memcpy(recvbuf + recvtype->size * recvcount * i, sendbuf + sendtype->size * sendcount * i, recvtype->size * recvcount);
+    } else {
+      smpi_create_request(sendbuf + sendtype->size * sendcount * i, sendcount, sendtype, rank, i, 0, comm, sendreqs + i);
+      smpi_isend(sendreqs[i]);
+      smpi_create_request(recvbuf + recvtype->size * recvcount * i, recvcount, recvtype, i, rank, 0, comm, recvreqs + i);
+      smpi_irecv(recvreqs[i]);
+    }
+  } 
+
+  smpi_wait_all_nostatus(comm->size, sendreqs);
+  smpi_wait_all_nostatus(comm->size, recvreqs);
+
+  xbt_free(sendreqs);
+  xbt_free(recvreqs);
+
+  smpi_bench_begin();
+  return MPI_SUCCESS;
+}
+
+// FIXME: mpi routines shouldn't call mpi routines, complexity belongs at a lower level
+// also, there's probably a really clever way to overlap everything for this one...
+int MPI_Allreduce(void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm) {
+  MPI_Reduce(sendbuf, recvbuf, count, datatype, op, 0, comm);
+  MPI_Bcast(recvbuf, count, datatype, 0, comm);
+  return MPI_SUCCESS;
+}
+
+// FIXME: check if behavior defined when send/recv bufs are same...
+int MPI_Reduce(void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype, MPI_Op op, int root, MPI_Comm comm) {
+  int i, j;
+  int rank;
+  smpi_mpi_request_t **requests;
+  void **scratchbuf;
+  smpi_bench_end();
+
+  rank = smpi_comm_rank(comm, MSG_host_self());
+
+  if (root == rank) {
+    requests = xbt_malloc(sizeof(smpi_mpi_request_t*) * comm->size);
+    scratchbuf = xbt_malloc(sizeof(void*) * comm->size);
+    memcpy(recvbuf, sendbuf, datatype->size * count);
+    for (i = 0; i < comm->size; i++) {
+      if (rank == i) {
+        requests[i] = NULL;
+        scratchbuf[i] = NULL;
+      } else {
+        scratchbuf[i] = xbt_malloc(datatype->size * count);
+        smpi_create_request(scratchbuf[i], count, datatype, MPI_ANY_SOURCE, rank, 0, comm, requests + i);
+        smpi_irecv(requests[i]);
+      }
+    }
+    smpi_wait_all_nostatus(comm->size, requests); // FIXME: use wait_any for slight performance gain
+    for (i = 0; i < comm->size; i++) {
+      if (rank != i) {
+        for (j = 0; j < count; j++) {
+          op->func(scratchbuf[i] + datatype->size * j, recvbuf + datatype->size * j, recvbuf + datatype->size * j);
+        }
+        xbt_free(requests[i]);
+        xbt_free(scratchbuf[i]);
+      }
+    }
+    xbt_free(requests);
+    xbt_free(scratchbuf);
+  } else {
+    requests = xbt_malloc(sizeof(smpi_mpi_request_t*));
+    smpi_create_request(sendbuf, count, datatype, rank, root, 0, comm, requests);
+    smpi_isend(*requests);
+    smpi_wait(*requests, MPI_STATUS_IGNORE);
+    xbt_free(*requests);
+    xbt_free(requests);
+  }
+
+  smpi_bench_begin();
+  return MPI_SUCCESS;
+}