X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/blobdiff_plain/d155fd69fa99c97b3a9c86bb7f2e472c2e7332df..cc63d7b267be4c20633a0be7db63b3d88030bee6:/src/smpi/colls/reduce-arrival-pattern-aware.c diff --git a/src/smpi/colls/reduce-arrival-pattern-aware.c b/src/smpi/colls/reduce-arrival-pattern-aware.c index 0182a6b516..920a15c836 100644 --- a/src/smpi/colls/reduce-arrival-pattern-aware.c +++ b/src/smpi/colls/reduce-arrival-pattern-aware.c @@ -1,4 +1,10 @@ -#include "colls.h" +/* Copyright (c) 2013-2014. The SimGrid Team. + * All rights reserved. */ + +/* This program is free software; you can redistribute it and/or modify it + * under the terms of the license (GNU LGPL) which comes with this package. */ + +#include "colls_private.h" //#include int reduce_arrival_pattern_aware_segment_size_in_byte = 8192; @@ -18,10 +24,8 @@ int smpi_coll_tuned_reduce_arrival_pattern_aware(void *buf, void *rbuf, MPI_Op op, int root, MPI_Comm comm) { - int rank; - MPI_Comm_rank(comm, &rank); - - int tag = 50; + int rank = smpi_comm_rank(comm); + int tag = -COLL_TAG_REDUCE; MPI_Status status; MPI_Request request; MPI_Request *send_request_array; @@ -31,7 +35,7 @@ int smpi_coll_tuned_reduce_arrival_pattern_aware(void *buf, void *rbuf, MPI_Status temp_status_array[MAX_NODE]; - int size; + int size = smpi_comm_size(comm); int i; int sent_count; @@ -42,16 +46,12 @@ int smpi_coll_tuned_reduce_arrival_pattern_aware(void *buf, void *rbuf, int header_buf[HEADER_SIZE]; char temp_buf[MAX_NODE]; - MPI_Aint extent; - MPI_Type_extent(datatype, &extent); + MPI_Aint extent, lb; + smpi_datatype_extent(datatype, &lb, &extent); /* source and destination */ int to, from; - MPI_Comm_rank(MPI_COMM_WORLD, &rank); - MPI_Comm_size(MPI_COMM_WORLD, &size); - - /* segment is segment size in number of elements (not bytes) */ int segment = reduce_arrival_pattern_aware_segment_size_in_byte / extent; @@ -72,9 +72,9 @@ int smpi_coll_tuned_reduce_arrival_pattern_aware(void *buf, void *rbuf, } char *tmp_buf; - tmp_buf = (char *) malloc(count * extent); + tmp_buf = (char *) xbt_malloc(count * extent); - MPI_Sendrecv(buf, count, datatype, rank, tag, rbuf, count, datatype, rank, + smpi_mpi_sendrecv(buf, count, datatype, rank, tag, rbuf, count, datatype, rank, tag, comm, &status); @@ -88,9 +88,11 @@ int smpi_coll_tuned_reduce_arrival_pattern_aware(void *buf, void *rbuf, while (sent_count < (size - 1)) { for (i = 1; i < size; i++) { - if (already_received[i] == 0) - MPI_Iprobe(i, MPI_ANY_TAG, MPI_COMM_WORLD, &flag_array[i], - MPI_STATUSES_IGNORE); + if (already_received[i] == 0) { + smpi_mpi_iprobe(i, MPI_ANY_TAG, comm, &flag_array[i], + MPI_STATUSES_IGNORE); + simcall_process_sleep(0.0001); + } } header_index = 0; @@ -101,7 +103,7 @@ int smpi_coll_tuned_reduce_arrival_pattern_aware(void *buf, void *rbuf, /* 1-byte message arrive */ if ((flag_array[i] == 1) && (already_received[i] == 0)) { - MPI_Recv(temp_buf, 1, MPI_CHAR, i, tag, MPI_COMM_WORLD, &status); + smpi_mpi_recv(temp_buf, 1, MPI_CHAR, i, tag, comm, &status); header_buf[header_index] = i; header_index++; sent_count++; @@ -125,9 +127,9 @@ int smpi_coll_tuned_reduce_arrival_pattern_aware(void *buf, void *rbuf, to = header_buf[0]; from = header_buf[header_index - 1]; - MPI_Send(header_buf, HEADER_SIZE, MPI_INT, to, tag, comm); - MPI_Recv(tmp_buf, count, datatype, from, tag, comm, &status); - star_reduction(op, tmp_buf, rbuf, &count, &datatype); + smpi_mpi_send(header_buf, HEADER_SIZE, MPI_INT, to, tag, comm); + smpi_mpi_recv(tmp_buf, count, datatype, from, tag, comm, &status); + smpi_op_apply(op, tmp_buf, rbuf, &count, &datatype); } } /* while loop */ } @@ -137,12 +139,12 @@ int smpi_coll_tuned_reduce_arrival_pattern_aware(void *buf, void *rbuf, else { /* send 1-byte message to root */ - MPI_Send(temp_buf, 1, MPI_CHAR, 0, tag, comm); + smpi_mpi_send(temp_buf, 1, MPI_CHAR, 0, tag, comm); /* wait for header and data, forward when required */ - MPI_Recv(header_buf, HEADER_SIZE, MPI_INT, MPI_ANY_SOURCE, tag, comm, + smpi_mpi_recv(header_buf, HEADER_SIZE, MPI_INT, MPI_ANY_SOURCE, tag, comm, &status); - // MPI_Recv(buf,count,datatype,MPI_ANY_SOURCE,tag,comm,&status); + // smpi_mpi_recv(buf,count,datatype,MPI_ANY_SOURCE,tag,comm,&status); /* search for where it is */ int myordering = 0; @@ -152,7 +154,7 @@ int smpi_coll_tuned_reduce_arrival_pattern_aware(void *buf, void *rbuf, /* forward header */ if (header_buf[myordering + 1] != -1) { - MPI_Send(header_buf, HEADER_SIZE, MPI_INT, header_buf[myordering + 1], + smpi_mpi_send(header_buf, HEADER_SIZE, MPI_INT, header_buf[myordering + 1], tag, comm); } //printf("node %d ordering %d\n",rank,myordering); @@ -166,7 +168,7 @@ int smpi_coll_tuned_reduce_arrival_pattern_aware(void *buf, void *rbuf, } else { to = header_buf[myordering + 1]; } - MPI_Send(rbuf, count, datatype, to, tag, comm); + smpi_mpi_send(rbuf, count, datatype, to, tag, comm); } /* recv, reduce, send */ @@ -177,10 +179,9 @@ int smpi_coll_tuned_reduce_arrival_pattern_aware(void *buf, void *rbuf, to = header_buf[myordering + 1]; } from = header_buf[myordering - 1]; - MPI_Recv(tmp_buf, count, datatype, header_buf[myordering - 1], tag, - comm, &status); - star_reduction(op, tmp_buf, rbuf, &count, &datatype); - MPI_Send(rbuf, count, datatype, to, tag, comm); + smpi_mpi_recv(tmp_buf, count, datatype, from, tag, comm, &status); + smpi_op_apply(op, tmp_buf, rbuf, &count, &datatype); + smpi_mpi_send(rbuf, count, datatype, to, tag, comm); } } /* non-root */ } @@ -189,13 +190,13 @@ int smpi_coll_tuned_reduce_arrival_pattern_aware(void *buf, void *rbuf, // printf("node %d start\n",rank); send_request_array = - (MPI_Request *) malloc((size + pipe_length) * sizeof(MPI_Request)); + (MPI_Request *) xbt_malloc((size + pipe_length) * sizeof(MPI_Request)); recv_request_array = - (MPI_Request *) malloc((size + pipe_length) * sizeof(MPI_Request)); + (MPI_Request *) xbt_malloc((size + pipe_length) * sizeof(MPI_Request)); send_status_array = - (MPI_Status *) malloc((size + pipe_length) * sizeof(MPI_Status)); + (MPI_Status *) xbt_malloc((size + pipe_length) * sizeof(MPI_Status)); recv_status_array = - (MPI_Status *) malloc((size + pipe_length) * sizeof(MPI_Status)); + (MPI_Status *) xbt_malloc((size + pipe_length) * sizeof(MPI_Status)); if (rank == 0) { sent_count = 0; @@ -212,11 +213,11 @@ int smpi_coll_tuned_reduce_arrival_pattern_aware(void *buf, void *rbuf, //if (i == rank) //continue; if ((already_received[i] == 0) && (will_send[i] == 0)) { - MPI_Iprobe(i, MPI_ANY_TAG, MPI_COMM_WORLD, &flag_array[i], + smpi_mpi_iprobe(i, MPI_ANY_TAG, comm, &flag_array[i], &temp_status_array[i]); if (flag_array[i] == 1) { will_send[i] = 1; - MPI_Recv(&temp_buf[i], 1, MPI_CHAR, i, tag, MPI_COMM_WORLD, + smpi_mpi_recv(&temp_buf[i], 1, MPI_CHAR, i, tag, comm, &status); //printf("recv from %d\n",i); i = 1; @@ -248,14 +249,14 @@ int smpi_coll_tuned_reduce_arrival_pattern_aware(void *buf, void *rbuf, to = header_buf[0]; /* send header */ - MPI_Send(header_buf, HEADER_SIZE, MPI_INT, to, tag, comm); + smpi_mpi_send(header_buf, HEADER_SIZE, MPI_INT, to, tag, comm); /* recv data - pipeline */ from = header_buf[header_index - 1]; for (i = 0; i < pipe_length; i++) { - MPI_Recv(tmp_buf + (i * increment), segment, datatype, from, tag, + smpi_mpi_recv(tmp_buf + (i * increment), segment, datatype, from, tag, comm, &status); - star_reduction(op, tmp_buf + (i * increment), + smpi_op_apply(op, tmp_buf + (i * increment), (char *)rbuf + (i * increment), &segment, &datatype); } } @@ -266,13 +267,12 @@ int smpi_coll_tuned_reduce_arrival_pattern_aware(void *buf, void *rbuf, /* none root */ else { /* send 1-byte message to root */ - MPI_Send(temp_buf, 1, MPI_CHAR, 0, tag, comm); + smpi_mpi_send(temp_buf, 1, MPI_CHAR, 0, tag, comm); /* wait for header forward when required */ - MPI_Irecv(header_buf, HEADER_SIZE, MPI_INT, MPI_ANY_SOURCE, tag, comm, - &request); - MPI_Wait(&request, MPI_STATUS_IGNORE); + request=smpi_mpi_irecv(header_buf, HEADER_SIZE, MPI_INT, MPI_ANY_SOURCE, tag, comm); + smpi_mpi_wait(&request, MPI_STATUS_IGNORE); /* search for where it is */ int myordering = 0; @@ -283,7 +283,7 @@ int smpi_coll_tuned_reduce_arrival_pattern_aware(void *buf, void *rbuf, /* send header when required */ if (header_buf[myordering + 1] != -1) { - MPI_Send(header_buf, HEADER_SIZE, MPI_INT, header_buf[myordering + 1], + smpi_mpi_send(header_buf, HEADER_SIZE, MPI_INT, header_buf[myordering + 1], tag, comm); } @@ -297,27 +297,24 @@ int smpi_coll_tuned_reduce_arrival_pattern_aware(void *buf, void *rbuf, /* send only */ if (myordering == 0) { for (i = 0; i < pipe_length; i++) { - MPI_Isend((char *)rbuf + (i * increment), segment, datatype, to, tag, comm, - &send_request_array[i]); + send_request_array[i]= smpi_mpi_isend((char *)rbuf + (i * increment), segment, datatype, to, tag, comm); } - MPI_Waitall((pipe_length), send_request_array, send_status_array); + smpi_mpi_waitall((pipe_length), send_request_array, send_status_array); } /* receive, reduce, and send */ else { from = header_buf[myordering - 1]; for (i = 0; i < pipe_length; i++) { - MPI_Irecv(tmp_buf + (i * increment), segment, datatype, from, tag, - comm, &recv_request_array[i]); + recv_request_array[i]=smpi_mpi_irecv(tmp_buf + (i * increment), segment, datatype, from, tag, comm); } for (i = 0; i < pipe_length; i++) { - MPI_Wait(&recv_request_array[i], MPI_STATUS_IGNORE); - star_reduction(op, tmp_buf + (i * increment), (char *)rbuf + (i * increment), + smpi_mpi_wait(&recv_request_array[i], MPI_STATUS_IGNORE); + smpi_op_apply(op, tmp_buf + (i * increment), (char *)rbuf + (i * increment), &segment, &datatype); - MPI_Isend((char *)rbuf + (i * increment), segment, datatype, to, tag, comm, - &send_request_array[i]); + send_request_array[i]=smpi_mpi_isend((char *)rbuf + (i * increment), segment, datatype, to, tag, comm); } - MPI_Waitall((pipe_length), send_request_array, send_status_array); + smpi_mpi_waitall((pipe_length), send_request_array, send_status_array); } } /* non-root */ @@ -338,16 +335,16 @@ int smpi_coll_tuned_reduce_arrival_pattern_aware(void *buf, void *rbuf, */ if (root != 0) { if (rank == 0) { - MPI_Send(rbuf, count, datatype, root, tag, comm); + smpi_mpi_send(rbuf, count, datatype, root, tag, comm); } else if (rank == root) { - MPI_Recv(rbuf, count, datatype, 0, tag, comm, &status); + smpi_mpi_recv(rbuf, count, datatype, 0, tag, comm, &status); } } /* when count is not divisible by block size, use default BCAST for the remainder */ if ((remainder != 0) && (count > segment)) { - MPI_Reduce((char *)buf + (pipe_length * increment), + smpi_mpi_reduce((char *)buf + (pipe_length * increment), (char *)rbuf + (pipe_length * increment), remainder, datatype, op, root, comm); }