- if (op != MPI_OP_NULL && not op->is_commutative()) {
- return Coll_reduce_ompi_basic_linear::reduce(sendtmpbuf, recvbuf, count, datatype, op, root, comm);
- }
-
- if( sendbuf == MPI_IN_PLACE ) {
- sendtmpbuf = static_cast<char *>(smpi_get_tmp_sendbuffer(count*datatype->get_extent()));
- Datatype::copy(recvbuf, count, datatype,sendtmpbuf, count, datatype);
- }
-
- if(rank != root) {
- // Send buffer to root
- Request::send(sendtmpbuf, count, datatype, root, system_tag, comm);
- } else {
- datatype->extent(&lb, &dataext);
- // Local copy from root
- if (sendtmpbuf != nullptr && recvbuf != nullptr)
- Datatype::copy(sendtmpbuf, count, datatype, recvbuf, count, datatype);
- // Receive buffers from senders
- MPI_Request *requests = xbt_new(MPI_Request, size - 1);
- void **tmpbufs = xbt_new(void *, size - 1);
- int index = 0;
- for (int src = 0; src < size; src++) {
- if (src != root) {
- if (not smpi_process()->replaying())
- tmpbufs[index] = xbt_malloc(count * dataext);
- else
- tmpbufs[index] = smpi_get_tmp_sendbuffer(count * dataext);
- requests[index] =
- Request::irecv_init(tmpbufs[index], count, datatype, src, system_tag, comm);
- index++;
- }
- }
- // Wait for completion of irecv's.
- Request::startall(size - 1, requests);
- for (int src = 0; src < size - 1; src++) {
- index = Request::waitany(size - 1, requests, MPI_STATUS_IGNORE);
- XBT_DEBUG("finished waiting any request with index %d", index);
- if(index == MPI_UNDEFINED) {
- break;
- }else{
- Request::unref(&requests[index]);
- }
- if(op) /* op can be MPI_OP_NULL that does nothing */
- if(op!=MPI_OP_NULL) op->apply( tmpbufs[index], recvbuf, &count, datatype);
- }
- for(index = 0; index < size - 1; index++) {
- smpi_free_tmp_buffer(tmpbufs[index]);
- }
- xbt_free(tmpbufs);
- xbt_free(requests);
-
- }
- if( sendbuf == MPI_IN_PLACE ) {
- smpi_free_tmp_buffer(sendtmpbuf);