status->MPI_TAG = (*request)->tag;
status->MPI_ERROR = MPI_SUCCESS;
status->_count = (*request)->size; // size in bytes
- status->_cancelled = 0; // FIXME: cancellation of requests not handled yet
+ status->_cancelled = 0; // FIXME: cancellation of requests not handled yet
}
DEBUG3("finishing wait for %p [data = %p, complete = %d]", *request, data, data->complete);
// data == *request if sender is first to finish its wait
int rank, size, src, index, datasize;
MPI_Request* requests;
void** tmpbufs;
-
+
rank = smpi_comm_rank(comm);
size = smpi_comm_size(comm);
if(rank != root) {
} else {
datasize = smpi_datatype_size(datatype);
// Local copy from root
- memcpy(recvbuf, sendbuf, count * datasize * sizeof(char));
+ memcpy(recvbuf, sendbuf, count * datasize * sizeof(char));
// Receive buffers from senders
//TODO: make a MPI_barrier here ?
requests = xbt_new(MPI_Request, size - 1);
int rank, size, other, index, datasize;
MPI_Request* requests;
void** tmpbufs;
-
+
rank = smpi_comm_rank(comm);
size = smpi_comm_size(comm);
datasize = smpi_datatype_size(datatype);
// Local copy from self
- memcpy(recvbuf, sendbuf, count * datasize * sizeof(char));
+ memcpy(recvbuf, sendbuf, count * datasize * sizeof(char));
// Send/Recv buffers to/from others;
//TODO: make a MPI_barrier here ?
requests = xbt_new(MPI_Request, 2 * (size - 1));
xbt_free(requests);
*/
}
+
+void smpi_mpi_scan(void* sendbuf, void* recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm) {
+ int system_tag = 666;
+ int rank, size, other, index, datasize;
+ int total;
+ MPI_Request* requests;
+ void** tmpbufs;
+
+ rank = smpi_comm_rank(comm);
+ size = smpi_comm_size(comm);
+ datasize = smpi_datatype_size(datatype);
+ // Local copy from self
+ memcpy(recvbuf, sendbuf, count * datasize * sizeof(char));
+ // Send/Recv buffers to/from others;
+ total = rank + (size - (rank + 1));
+ requests = xbt_new(MPI_Request, total);
+ tmpbufs = xbt_new(void*, rank);
+ index = 0;
+ for(other = 0; other < rank; other++) {
+ tmpbufs[index] = xbt_malloc(count * datasize);
+ requests[index] = smpi_mpi_irecv(tmpbufs[index], count, datatype, other, system_tag, comm);
+ index++;
+ }
+ for(other = rank + 1; other < size; other++) {
+ requests[index] = smpi_mpi_isend(sendbuf, count, datatype, other, system_tag, comm);
+ index++;
+ }
+ // Wait for completion of all comms.
+ for(other = 0; other < total; other++) {
+ index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
+ if(index == MPI_UNDEFINED) {
+ break;
+ }
+ if(index < rank) {
+ // #Request is below rank: it's a irecv
+ smpi_op_apply(op, tmpbufs[index], recvbuf, &count, &datatype);
+ }
+ }
+ for(index = 0; index < size - 1; index++) {
+ xbt_free(tmpbufs[index]);
+ }
+ xbt_free(tmpbufs);
+ xbt_free(requests);
+}