+ // Wait for completion of isend's.
+ smpi_mpi_startall(size - 1, requests);
+ smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
+ xbt_free(requests);
+ }
+}
+
+/* Scatter slices of sendbuf from root to all ranks of comm: rank i is sent
+ * sendcounts[i] elements of sendtype located displs[i] elements into sendbuf.
+ * Non-root ranks receive recvcount x recvtype into recvbuf; root copies its own. */
+void smpi_mpi_scatterv(void *sendbuf, int *sendcounts, int *displs,
+                       MPI_Datatype sendtype, void *recvbuf, int recvcount,
+                       MPI_Datatype recvtype, int root, MPI_Comm comm)
+{
+  int system_tag = 666;
+  int rank, size, dst, index, sendsize, recvsize;
+  MPI_Request *requests;
+  char *base = (char *) sendbuf;
+
+  rank = smpi_comm_rank(comm);
+  size = smpi_comm_size(comm);
+  if (rank != root) {
+    // Recv buffer from root
+    smpi_mpi_recv(recvbuf, recvcount, recvtype, root, system_tag, comm,
+                  MPI_STATUS_IGNORE);
+  } else {
+    sendsize = smpi_datatype_size(sendtype);
+    recvsize = smpi_datatype_size(recvtype);
+    // displs[] is in sendtype elements, so scale to bytes (assumes size == extent).
+    memcpy(recvbuf, base + displs[root] * sendsize, recvcount * recvsize);
+    // One send request per non-root rank, hence size - 1 requests.
+    requests = xbt_new(MPI_Request, size - 1);
+    index = 0;
+    for (dst = 0; dst < size; dst++) {
+      if (dst != root) {
+        requests[index++] = smpi_isend_init(base + displs[dst] * sendsize,
+            sendcounts[dst], sendtype, dst, system_tag, comm);
+      }
+    }
+    // Start every send, then wait for all of them before freeing the requests.
+    smpi_mpi_startall(size - 1, requests);
+    smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
+    xbt_free(requests);
+  }
+}
+
+void smpi_mpi_reduce(void *sendbuf, void *recvbuf, int count,
+ MPI_Datatype datatype, MPI_Op op, int root,
+ MPI_Comm comm)
+{
+ int system_tag = 666;
+ int rank, size, src, index, datasize;
+ MPI_Request *requests;
+ void **tmpbufs;
+
+ rank = smpi_comm_rank(comm);
+ size = smpi_comm_size(comm);
+ if (rank != root) {
+ // Send buffer to root
+ smpi_mpi_send(sendbuf, count, datatype, root, system_tag, comm);
+ } else {
+ datasize = smpi_datatype_size(datatype);
+ // Local copy from root
+ memcpy(recvbuf, sendbuf, count * datasize * sizeof(char));
+ // Receive buffers from senders
+ //TODO: make a MPI_barrier here ?
+ requests = xbt_new(MPI_Request, size - 1);
+ tmpbufs = xbt_new(void *, size - 1);
+ index = 0;
+ for (src = 0; src < size; src++) {
+ if (src != root) {
+ tmpbufs[index] = xbt_malloc(count * datasize);
+ requests[index] =
+ smpi_irecv_init(tmpbufs[index], count, datatype, src,
+ system_tag, comm);
+ index++;
+ }
+ }
+ // Wait for completion of irecv's.
+ smpi_mpi_startall(size - 1, requests);
+ for (src = 0; src < size - 1; src++) {
+ index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
+ if (index == MPI_UNDEFINED) {
+ break;
+ }
+ smpi_op_apply(op, tmpbufs[index], recvbuf, &count, &datatype);