X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/blobdiff_plain/9ce9ffe42a1e578ec44ccf1cb804dff809d32c5d..d9e121bcb34f3cca05904dd7622a1968ffa77a37:/src/smpi/colls/reduce-arrival-pattern-aware.c diff --git a/src/smpi/colls/reduce-arrival-pattern-aware.c b/src/smpi/colls/reduce-arrival-pattern-aware.c index cf5df9eae6..7ae69a8fda 100644 --- a/src/smpi/colls/reduce-arrival-pattern-aware.c +++ b/src/smpi/colls/reduce-arrival-pattern-aware.c @@ -1,4 +1,4 @@ -#include "colls.h" +#include "colls_private.h" //#include int reduce_arrival_pattern_aware_segment_size_in_byte = 8192; @@ -19,9 +19,9 @@ int smpi_coll_tuned_reduce_arrival_pattern_aware(void *buf, void *rbuf, MPI_Comm comm) { int rank; - MPI_Comm_rank(comm, &rank); + rank = smpi_comm_rank(comm); - int tag = 50; + int tag = -COLL_TAG_REDUCE; MPI_Status status; MPI_Request request; MPI_Request *send_request_array; @@ -72,7 +72,7 @@ int smpi_coll_tuned_reduce_arrival_pattern_aware(void *buf, void *rbuf, } char *tmp_buf; - tmp_buf = (char *) malloc(count * extent); + tmp_buf = (char *) xbt_malloc(count * extent); smpi_mpi_sendrecv(buf, count, datatype, rank, tag, rbuf, count, datatype, rank, tag, comm, &status); @@ -88,9 +88,11 @@ int smpi_coll_tuned_reduce_arrival_pattern_aware(void *buf, void *rbuf, while (sent_count < (size - 1)) { for (i = 1; i < size; i++) { - if (already_received[i] == 0) - smpi_mpi_iprobe(i, MPI_ANY_TAG, MPI_COMM_WORLD, &flag_array[i], - MPI_STATUSES_IGNORE); + if (already_received[i] == 0) { + smpi_mpi_iprobe(i, MPI_ANY_TAG, comm, &flag_array[i], + MPI_STATUSES_IGNORE); + simcall_process_sleep(0.0001); + } } header_index = 0; @@ -101,7 +103,7 @@ int smpi_coll_tuned_reduce_arrival_pattern_aware(void *buf, void *rbuf, /* 1-byte message arrive */ if ((flag_array[i] == 1) && (already_received[i] == 0)) { - smpi_mpi_recv(temp_buf, 1, MPI_CHAR, i, tag, MPI_COMM_WORLD, &status); + smpi_mpi_recv(temp_buf, 1, MPI_CHAR, i, tag, comm, &status); header_buf[header_index] = i; header_index++; sent_count++; @@ -127,7 +129,7 @@ int smpi_coll_tuned_reduce_arrival_pattern_aware(void *buf, void *rbuf, smpi_mpi_send(header_buf, HEADER_SIZE, MPI_INT, to, tag, comm); smpi_mpi_recv(tmp_buf, count, datatype, from, tag, comm, &status); - star_reduction(op, tmp_buf, rbuf, &count, &datatype); + smpi_op_apply(op, tmp_buf, rbuf, &count, &datatype); } } /* while loop */ } @@ -142,7 +144,7 @@ int smpi_coll_tuned_reduce_arrival_pattern_aware(void *buf, void *rbuf, /* wait for header and data, forward when required */ smpi_mpi_recv(header_buf, HEADER_SIZE, MPI_INT, MPI_ANY_SOURCE, tag, comm, &status); - // MPI_Recv(buf,count,datatype,MPI_ANY_SOURCE,tag,comm,&status); + // smpi_mpi_recv(buf,count,datatype,MPI_ANY_SOURCE,tag,comm,&status); /* search for where it is */ int myordering = 0; @@ -179,7 +181,7 @@ int smpi_coll_tuned_reduce_arrival_pattern_aware(void *buf, void *rbuf, from = header_buf[myordering - 1]; smpi_mpi_recv(tmp_buf, count, datatype, header_buf[myordering - 1], tag, comm, &status); - star_reduction(op, tmp_buf, rbuf, &count, &datatype); + smpi_op_apply(op, tmp_buf, rbuf, &count, &datatype); smpi_mpi_send(rbuf, count, datatype, to, tag, comm); } } /* non-root */ @@ -189,13 +191,13 @@ int smpi_coll_tuned_reduce_arrival_pattern_aware(void *buf, void *rbuf, // printf("node %d start\n",rank); send_request_array = - (MPI_Request *) malloc((size + pipe_length) * sizeof(MPI_Request)); + (MPI_Request *) xbt_malloc((size + pipe_length) * sizeof(MPI_Request)); recv_request_array = - (MPI_Request *) malloc((size + pipe_length) * sizeof(MPI_Request)); + (MPI_Request *) xbt_malloc((size + pipe_length) * sizeof(MPI_Request)); send_status_array = - (MPI_Status *) malloc((size + pipe_length) * sizeof(MPI_Status)); + (MPI_Status *) xbt_malloc((size + pipe_length) * sizeof(MPI_Status)); recv_status_array = - (MPI_Status *) malloc((size + pipe_length) * sizeof(MPI_Status)); + (MPI_Status *) xbt_malloc((size + pipe_length) * sizeof(MPI_Status)); if (rank == 0) { sent_count = 0; @@ -212,11 +214,11 @@ int smpi_coll_tuned_reduce_arrival_pattern_aware(void *buf, void *rbuf, //if (i == rank) //continue; if ((already_received[i] == 0) && (will_send[i] == 0)) { - smpi_mpi_iprobe(i, MPI_ANY_TAG, MPI_COMM_WORLD, &flag_array[i], + smpi_mpi_iprobe(i, MPI_ANY_TAG, comm, &flag_array[i], &temp_status_array[i]); if (flag_array[i] == 1) { will_send[i] = 1; - smpi_mpi_recv(&temp_buf[i], 1, MPI_CHAR, i, tag, MPI_COMM_WORLD, + smpi_mpi_recv(&temp_buf[i], 1, MPI_CHAR, i, tag, comm, &status); //printf("recv from %d\n",i); i = 1; @@ -255,7 +257,7 @@ int smpi_coll_tuned_reduce_arrival_pattern_aware(void *buf, void *rbuf, for (i = 0; i < pipe_length; i++) { smpi_mpi_recv(tmp_buf + (i * increment), segment, datatype, from, tag, comm, &status); - star_reduction(op, tmp_buf + (i * increment), + smpi_op_apply(op, tmp_buf + (i * increment), (char *)rbuf + (i * increment), &segment, &datatype); } } @@ -309,7 +311,7 @@ int smpi_coll_tuned_reduce_arrival_pattern_aware(void *buf, void *rbuf, } for (i = 0; i < pipe_length; i++) { smpi_mpi_wait(&recv_request_array[i], MPI_STATUS_IGNORE); - star_reduction(op, tmp_buf + (i * increment), (char *)rbuf + (i * increment), + smpi_op_apply(op, tmp_buf + (i * increment), (char *)rbuf + (i * increment), &segment, &datatype); send_request_array[i]=smpi_mpi_isend((char *)rbuf + (i * increment), segment, datatype, to, tag, comm); }