-#include "colls.h"
+#include "colls_private.h"
//#include <star-reduction.c>
int reduce_arrival_pattern_aware_segment_size_in_byte = 8192;
MPI_Comm comm)
{
int rank;
- MPI_Comm_rank(comm, &rank);
+ rank = smpi_comm_rank(comm);
- int tag = 50;
+ int tag = -COLL_TAG_REDUCE;
MPI_Status status;
MPI_Request request;
MPI_Request *send_request_array;
}
char *tmp_buf;
- tmp_buf = (char *) malloc(count * extent);
+ tmp_buf = (char *) xbt_malloc(count * extent);
smpi_mpi_sendrecv(buf, count, datatype, rank, tag, rbuf, count, datatype, rank,
tag, comm, &status);
while (sent_count < (size - 1)) {
for (i = 1; i < size; i++) {
- if (already_received[i] == 0)
- smpi_mpi_iprobe(i, MPI_ANY_TAG, MPI_COMM_WORLD, &flag_array[i],
- MPI_STATUSES_IGNORE);
+ if (already_received[i] == 0) {
+ smpi_mpi_iprobe(i, MPI_ANY_TAG, comm, &flag_array[i],
+ MPI_STATUSES_IGNORE);
+ simcall_process_sleep(0.0001);
+ }
}
header_index = 0;
/* 1-byte message arrive */
if ((flag_array[i] == 1) && (already_received[i] == 0)) {
- smpi_mpi_recv(temp_buf, 1, MPI_CHAR, i, tag, MPI_COMM_WORLD, &status);
+ smpi_mpi_recv(temp_buf, 1, MPI_CHAR, i, tag, comm, &status);
header_buf[header_index] = i;
header_index++;
sent_count++;
smpi_mpi_send(header_buf, HEADER_SIZE, MPI_INT, to, tag, comm);
smpi_mpi_recv(tmp_buf, count, datatype, from, tag, comm, &status);
- star_reduction(op, tmp_buf, rbuf, &count, &datatype);
+ smpi_op_apply(op, tmp_buf, rbuf, &count, &datatype);
}
} /* while loop */
}
/* wait for header and data, forward when required */
smpi_mpi_recv(header_buf, HEADER_SIZE, MPI_INT, MPI_ANY_SOURCE, tag, comm,
&status);
- // MPI_Recv(buf,count,datatype,MPI_ANY_SOURCE,tag,comm,&status);
+ // smpi_mpi_recv(buf,count,datatype,MPI_ANY_SOURCE,tag,comm,&status);
/* search for where it is */
int myordering = 0;
from = header_buf[myordering - 1];
smpi_mpi_recv(tmp_buf, count, datatype, header_buf[myordering - 1], tag,
comm, &status);
- star_reduction(op, tmp_buf, rbuf, &count, &datatype);
+ smpi_op_apply(op, tmp_buf, rbuf, &count, &datatype);
smpi_mpi_send(rbuf, count, datatype, to, tag, comm);
}
} /* non-root */
// printf("node %d start\n",rank);
send_request_array =
- (MPI_Request *) malloc((size + pipe_length) * sizeof(MPI_Request));
+ (MPI_Request *) xbt_malloc((size + pipe_length) * sizeof(MPI_Request));
recv_request_array =
- (MPI_Request *) malloc((size + pipe_length) * sizeof(MPI_Request));
+ (MPI_Request *) xbt_malloc((size + pipe_length) * sizeof(MPI_Request));
send_status_array =
- (MPI_Status *) malloc((size + pipe_length) * sizeof(MPI_Status));
+ (MPI_Status *) xbt_malloc((size + pipe_length) * sizeof(MPI_Status));
recv_status_array =
- (MPI_Status *) malloc((size + pipe_length) * sizeof(MPI_Status));
+ (MPI_Status *) xbt_malloc((size + pipe_length) * sizeof(MPI_Status));
if (rank == 0) {
sent_count = 0;
//if (i == rank)
//continue;
if ((already_received[i] == 0) && (will_send[i] == 0)) {
- smpi_mpi_iprobe(i, MPI_ANY_TAG, MPI_COMM_WORLD, &flag_array[i],
+ smpi_mpi_iprobe(i, MPI_ANY_TAG, comm, &flag_array[i],
&temp_status_array[i]);
if (flag_array[i] == 1) {
will_send[i] = 1;
- smpi_mpi_recv(&temp_buf[i], 1, MPI_CHAR, i, tag, MPI_COMM_WORLD,
+ smpi_mpi_recv(&temp_buf[i], 1, MPI_CHAR, i, tag, comm,
&status);
//printf("recv from %d\n",i);
i = 1;
for (i = 0; i < pipe_length; i++) {
smpi_mpi_recv(tmp_buf + (i * increment), segment, datatype, from, tag,
comm, &status);
- star_reduction(op, tmp_buf + (i * increment),
+ smpi_op_apply(op, tmp_buf + (i * increment),
(char *)rbuf + (i * increment), &segment, &datatype);
}
}
}
for (i = 0; i < pipe_length; i++) {
smpi_mpi_wait(&recv_request_array[i], MPI_STATUS_IGNORE);
- star_reduction(op, tmp_buf + (i * increment), (char *)rbuf + (i * increment),
+ smpi_op_apply(op, tmp_buf + (i * increment), (char *)rbuf + (i * increment),
&segment, &datatype);
send_request_array[i]=smpi_mpi_isend((char *)rbuf + (i * increment), segment, datatype, to, tag, comm);
}