-#include "colls.h"
+/* Copyright (c) 2013-2014. The SimGrid Team.
+ * All rights reserved. */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
+#include "colls_private.h"
//#include <star-reduction.c>
int reduce_arrival_pattern_aware_segment_size_in_byte = 8192;
MPI_Op op, int root,
MPI_Comm comm)
{
- int rank;
- MPI_Comm_rank(comm, &rank);
-
- int tag = 50;
+ int rank = smpi_comm_rank(comm);
+ int tag = -COLL_TAG_REDUCE;
MPI_Status status;
MPI_Request request;
MPI_Request *send_request_array;
MPI_Status temp_status_array[MAX_NODE];
- int size;
+ int size = smpi_comm_size(comm);
int i;
int sent_count;
int header_buf[HEADER_SIZE];
char temp_buf[MAX_NODE];
- MPI_Aint extent;
- MPI_Type_extent(datatype, &extent);
+ MPI_Aint extent, lb;
+ smpi_datatype_extent(datatype, &lb, &extent);
/* source and destination */
int to, from;
- MPI_Comm_rank(MPI_COMM_WORLD, &rank);
- MPI_Comm_size(MPI_COMM_WORLD, &size);
-
-
/* segment is segment size in number of elements (not bytes) */
int segment = reduce_arrival_pattern_aware_segment_size_in_byte / extent;
}
char *tmp_buf;
- tmp_buf = (char *) malloc(count * extent);
+ tmp_buf = (char *) smpi_get_tmp_sendbuffer(count * extent);
- MPI_Sendrecv(buf, count, datatype, rank, tag, rbuf, count, datatype, rank,
+ smpi_mpi_sendrecv(buf, count, datatype, rank, tag, rbuf, count, datatype, rank,
tag, comm, &status);
while (sent_count < (size - 1)) {
for (i = 1; i < size; i++) {
- if (already_received[i] == 0)
- MPI_Iprobe(i, MPI_ANY_TAG, MPI_COMM_WORLD, &flag_array[i],
- MPI_STATUSES_IGNORE);
+ if (already_received[i] == 0) {
+ smpi_mpi_iprobe(i, MPI_ANY_TAG, comm, &flag_array[i],
+ MPI_STATUSES_IGNORE);
+ simcall_process_sleep(0.0001);
+ }
}
header_index = 0;
/* 1-byte message arrive */
if ((flag_array[i] == 1) && (already_received[i] == 0)) {
- MPI_Recv(temp_buf, 1, MPI_CHAR, i, tag, MPI_COMM_WORLD, &status);
+ smpi_mpi_recv(temp_buf, 1, MPI_CHAR, i, tag, comm, &status);
header_buf[header_index] = i;
header_index++;
sent_count++;
to = header_buf[0];
from = header_buf[header_index - 1];
- MPI_Send(header_buf, HEADER_SIZE, MPI_INT, to, tag, comm);
- MPI_Recv(tmp_buf, count, datatype, from, tag, comm, &status);
- star_reduction(op, tmp_buf, rbuf, &count, &datatype);
+ smpi_mpi_send(header_buf, HEADER_SIZE, MPI_INT, to, tag, comm);
+ smpi_mpi_recv(tmp_buf, count, datatype, from, tag, comm, &status);
+ smpi_op_apply(op, tmp_buf, rbuf, &count, &datatype);
}
} /* while loop */
}
else {
/* send 1-byte message to root */
- MPI_Send(temp_buf, 1, MPI_CHAR, 0, tag, comm);
+ smpi_mpi_send(temp_buf, 1, MPI_CHAR, 0, tag, comm);
/* wait for header and data, forward when required */
- MPI_Recv(header_buf, HEADER_SIZE, MPI_INT, MPI_ANY_SOURCE, tag, comm,
+ smpi_mpi_recv(header_buf, HEADER_SIZE, MPI_INT, MPI_ANY_SOURCE, tag, comm,
&status);
- // MPI_Recv(buf,count,datatype,MPI_ANY_SOURCE,tag,comm,&status);
+ // smpi_mpi_recv(buf,count,datatype,MPI_ANY_SOURCE,tag,comm,&status);
/* search for where it is */
int myordering = 0;
/* forward header */
if (header_buf[myordering + 1] != -1) {
- MPI_Send(header_buf, HEADER_SIZE, MPI_INT, header_buf[myordering + 1],
+ smpi_mpi_send(header_buf, HEADER_SIZE, MPI_INT, header_buf[myordering + 1],
tag, comm);
}
//printf("node %d ordering %d\n",rank,myordering);
} else {
to = header_buf[myordering + 1];
}
- MPI_Send(rbuf, count, datatype, to, tag, comm);
+ smpi_mpi_send(rbuf, count, datatype, to, tag, comm);
}
/* recv, reduce, send */
to = header_buf[myordering + 1];
}
from = header_buf[myordering - 1];
- MPI_Recv(tmp_buf, count, datatype, header_buf[myordering - 1], tag,
- comm, &status);
- star_reduction(op, tmp_buf, rbuf, &count, &datatype);
- MPI_Send(rbuf, count, datatype, to, tag, comm);
+ smpi_mpi_recv(tmp_buf, count, datatype, from, tag, comm, &status);
+ smpi_op_apply(op, tmp_buf, rbuf, &count, &datatype);
+ smpi_mpi_send(rbuf, count, datatype, to, tag, comm);
}
} /* non-root */
}
// printf("node %d start\n",rank);
send_request_array =
- (MPI_Request *) malloc((size + pipe_length) * sizeof(MPI_Request));
+ (MPI_Request *) xbt_malloc((size + pipe_length) * sizeof(MPI_Request));
recv_request_array =
- (MPI_Request *) malloc((size + pipe_length) * sizeof(MPI_Request));
+ (MPI_Request *) xbt_malloc((size + pipe_length) * sizeof(MPI_Request));
send_status_array =
- (MPI_Status *) malloc((size + pipe_length) * sizeof(MPI_Status));
+ (MPI_Status *) xbt_malloc((size + pipe_length) * sizeof(MPI_Status));
recv_status_array =
- (MPI_Status *) malloc((size + pipe_length) * sizeof(MPI_Status));
+ (MPI_Status *) xbt_malloc((size + pipe_length) * sizeof(MPI_Status));
if (rank == 0) {
sent_count = 0;
//if (i == rank)
//continue;
if ((already_received[i] == 0) && (will_send[i] == 0)) {
- MPI_Iprobe(i, MPI_ANY_TAG, MPI_COMM_WORLD, &flag_array[i],
+ smpi_mpi_iprobe(i, MPI_ANY_TAG, comm, &flag_array[i],
&temp_status_array[i]);
if (flag_array[i] == 1) {
will_send[i] = 1;
- MPI_Recv(&temp_buf[i], 1, MPI_CHAR, i, tag, MPI_COMM_WORLD,
+ smpi_mpi_recv(&temp_buf[i], 1, MPI_CHAR, i, tag, comm,
&status);
//printf("recv from %d\n",i);
i = 1;
to = header_buf[0];
/* send header */
- MPI_Send(header_buf, HEADER_SIZE, MPI_INT, to, tag, comm);
+ smpi_mpi_send(header_buf, HEADER_SIZE, MPI_INT, to, tag, comm);
/* recv data - pipeline */
from = header_buf[header_index - 1];
for (i = 0; i < pipe_length; i++) {
- MPI_Recv(tmp_buf + (i * increment), segment, datatype, from, tag,
+ smpi_mpi_recv(tmp_buf + (i * increment), segment, datatype, from, tag,
comm, &status);
- star_reduction(op, tmp_buf + (i * increment),
+ smpi_op_apply(op, tmp_buf + (i * increment),
(char *)rbuf + (i * increment), &segment, &datatype);
}
}
/* none root */
else {
/* send 1-byte message to root */
- MPI_Send(temp_buf, 1, MPI_CHAR, 0, tag, comm);
+ smpi_mpi_send(temp_buf, 1, MPI_CHAR, 0, tag, comm);
/* wait for header forward when required */
- MPI_Irecv(header_buf, HEADER_SIZE, MPI_INT, MPI_ANY_SOURCE, tag, comm,
- &request);
- MPI_Wait(&request, MPI_STATUS_IGNORE);
+ request=smpi_mpi_irecv(header_buf, HEADER_SIZE, MPI_INT, MPI_ANY_SOURCE, tag, comm);
+ smpi_mpi_wait(&request, MPI_STATUS_IGNORE);
/* search for where it is */
int myordering = 0;
/* send header when required */
if (header_buf[myordering + 1] != -1) {
- MPI_Send(header_buf, HEADER_SIZE, MPI_INT, header_buf[myordering + 1],
+ smpi_mpi_send(header_buf, HEADER_SIZE, MPI_INT, header_buf[myordering + 1],
tag, comm);
}
/* send only */
if (myordering == 0) {
for (i = 0; i < pipe_length; i++) {
- MPI_Isend((char *)rbuf + (i * increment), segment, datatype, to, tag, comm,
- &send_request_array[i]);
+ send_request_array[i]= smpi_mpi_isend((char *)rbuf + (i * increment), segment, datatype, to, tag, comm);
}
- MPI_Waitall((pipe_length), send_request_array, send_status_array);
+ smpi_mpi_waitall((pipe_length), send_request_array, send_status_array);
}
/* receive, reduce, and send */
else {
from = header_buf[myordering - 1];
for (i = 0; i < pipe_length; i++) {
- MPI_Irecv(tmp_buf + (i * increment), segment, datatype, from, tag,
- comm, &recv_request_array[i]);
+ recv_request_array[i]=smpi_mpi_irecv(tmp_buf + (i * increment), segment, datatype, from, tag, comm);
}
for (i = 0; i < pipe_length; i++) {
- MPI_Wait(&recv_request_array[i], MPI_STATUS_IGNORE);
- star_reduction(op, tmp_buf + (i * increment), (char *)rbuf + (i * increment),
+ smpi_mpi_wait(&recv_request_array[i], MPI_STATUS_IGNORE);
+ smpi_op_apply(op, tmp_buf + (i * increment), (char *)rbuf + (i * increment),
&segment, &datatype);
- MPI_Isend((char *)rbuf + (i * increment), segment, datatype, to, tag, comm,
- &send_request_array[i]);
+ send_request_array[i]=smpi_mpi_isend((char *)rbuf + (i * increment), segment, datatype, to, tag, comm);
}
- MPI_Waitall((pipe_length), send_request_array, send_status_array);
+ smpi_mpi_waitall((pipe_length), send_request_array, send_status_array);
}
} /* non-root */
*/
if (root != 0) {
if (rank == 0) {
- MPI_Send(rbuf, count, datatype, root, tag, comm);
+ smpi_mpi_send(rbuf, count, datatype, root, tag, comm);
} else if (rank == root) {
- MPI_Recv(rbuf, count, datatype, 0, tag, comm, &status);
+ smpi_mpi_recv(rbuf, count, datatype, 0, tag, comm, &status);
}
}
/* when count is not divisible by block size, use default BCAST for the remainder */
if ((remainder != 0) && (count > segment)) {
- MPI_Reduce((char *)buf + (pipe_length * increment),
+ smpi_mpi_reduce((char *)buf + (pipe_length * increment),
(char *)rbuf + (pipe_length * increment), remainder, datatype, op, root,
comm);
}
- free(tmp_buf);
+ smpi_free_tmp_buffer(tmp_buf);
return MPI_SUCCESS;
}