- int cnt=0;
- int rank;
- int tag=0;
- char *cptr; // to manipulate the void * buffers
- smpi_mpi_request_t *requests;
- smpi_mpi_request_t request;
- smpi_mpi_status_t status;
-
-
- smpi_bench_end();
-
- rank = smpi_mpi_comm_rank(comm);
-
- requests = xbt_malloc((comm->size-1) * sizeof(smpi_mpi_request_t));
- if (rank == root) {
- // i am the root: distribute my sendbuf
- //print_buffer_int(sendbuf, comm->size, xbt_strdup("rcvbuf"), rank);
- cptr = sendbuf;
- for (i=0; i < comm->size; i++) {
- if ( i!=root ) { // send to processes ...
-
- retval = smpi_create_request((void *)cptr, sendcount,
- datatype, root, i, tag, comm, &(requests[cnt]));
- if (NULL != requests[cnt] && MPI_SUCCESS == retval) {
- if (MPI_SUCCESS == retval) {
- smpi_mpi_isend(requests[cnt]);
- }
- }
- cnt++;
- }
- else { // ... except if it's me.
- memcpy(recvbuf, (void *)cptr, recvcount*recvtype->size*sizeof(char));
- }
- cptr += sendcount*datatype->size;
- }
- for(i=0; i<cnt; i++) { // wait for send to complete
- /* FIXME: waitall() should be slightly better */
- smpi_mpi_wait(requests[i], &status);
- xbt_mallocator_release(smpi_global->request_mallocator, requests[i]);
-
- }
- }
- else { // i am a non-root process: wait data from the root
- retval = smpi_create_request(recvbuf,recvcount,
- recvtype, root, rank, tag, comm, &request);
- if (NULL != request && MPI_SUCCESS == retval) {
- if (MPI_SUCCESS == retval) {
- smpi_mpi_irecv(request);
- }
- }
- smpi_mpi_wait(request, &status);
- xbt_mallocator_release(smpi_global->request_mallocator, request);
- }
- xbt_free(requests);
-
- smpi_bench_begin();
-
- return retval;
-}
-
-
-/**
- * MPI_Alltoall user entry point
- *
- * Uses the logic of OpenMPI (upto 1.2.7 or greater) for the optimizations
- * ompi/mca/coll/tuned/coll_tuned_module.c
- **/
-int SMPI_MPI_Alltoall(void *sendbuf, int sendcount, MPI_Datatype datatype,
- void *recvbuf, int recvcount, MPI_Datatype recvtype,
- MPI_Comm comm)
-{
- int retval = MPI_SUCCESS;
- int block_dsize;
- int rank;
+ xbt_dynar_t srcs = xbt_dynar_new (sizeof(int), xbt_free);
+ xbt_dynar_t dsts = xbt_dynar_new (sizeof(int), xbt_free);
+ xbt_dynar_t recvs = xbt_dynar_new (sizeof(int), xbt_free);
+ for (i = 0; i < count; i++){
+ MPI_Request req = requests[i]; //already received requests are no longer valid
+ if (req){
+ int *asrc = xbt_new(int, 1);
+ int *adst = xbt_new(int, 1);
+ int *arecv = xbt_new(int, 1);
+ *asrc = req->src;
+ *adst = req->dst;
+ *arecv = req->recv;
+ xbt_dynar_insert_at (srcs, i, asrc);
+ xbt_dynar_insert_at (dsts, i, adst);
+ xbt_dynar_insert_at (recvs, i, arecv);
+ }else{
+ int *t = xbt_new(int, 1);
+ xbt_dynar_insert_at (srcs, i, t);
+ xbt_dynar_insert_at (dsts, i, t);
+ xbt_dynar_insert_at (recvs, i, t);
+ }
+ }
+
+ //search for a suitable request to give the rank of current mpi proc
+ MPI_Request req = NULL;
+ for (i = 0; i < count && req == NULL; i++) {
+ req = requests[i];
+ }
+ MPI_Comm comm = (req)->comm;
+ int rank_traced = smpi_comm_rank(comm);
+ TRACE_smpi_ptp_in (rank_traced, -1, -1, __FUNCTION__);
+#endif
+ smpi_bench_end(-1, NULL); //FIXME
+ if(index == NULL) {
+ retval = MPI_ERR_ARG;
+ } else {
+ *index = smpi_mpi_waitany(count, requests, status);
+ retval = MPI_SUCCESS;
+ }
+ smpi_bench_begin(-1, NULL);
+#ifdef HAVE_TRACING
+ int src_traced, dst_traced, is_wait_for_receive;
+ xbt_dynar_get_cpy (srcs, *index, &src_traced);
+ xbt_dynar_get_cpy (dsts, *index, &dst_traced);
+ xbt_dynar_get_cpy (recvs, *index, &is_wait_for_receive);
+ if (is_wait_for_receive){
+ TRACE_smpi_recv (rank_traced, src_traced, dst_traced);
+ }
+ TRACE_smpi_ptp_out (rank_traced, src_traced, dst_traced, __FUNCTION__);
+ //clean-up of dynars
+ xbt_free (srcs);
+ xbt_free (dsts);
+ xbt_free (recvs);
+#endif
+ return retval;
+}
+
+/**
+ * MPI_Waitall: block until every request in 'requests' has completed.
+ *
+ * When tracing is enabled, the src/dst/recv fields of each request are
+ * saved before the wait, because smpi_mpi_waitall() invalidates them.
+ */
+int MPI_Waitall(int count, MPI_Request requests[], MPI_Status status[]) {
+
+#ifdef HAVE_TRACING
+  //save information from requests (no longer valid after the wait)
+  int i;
+  xbt_dynar_t srcs = xbt_dynar_new (sizeof(int), xbt_free);
+  xbt_dynar_t dsts = xbt_dynar_new (sizeof(int), xbt_free);
+  xbt_dynar_t recvs = xbt_dynar_new (sizeof(int), xbt_free);
+  for (i = 0; i < count; i++){
+    MPI_Request req = requests[i]; //all req should be valid in Waitall
+    int *asrc = xbt_new(int, 1);
+    int *adst = xbt_new(int, 1);
+    int *arecv = xbt_new(int, 1);
+    *asrc = req->src;
+    *adst = req->dst;
+    *arecv = req->recv;
+    xbt_dynar_insert_at (srcs, i, asrc);
+    xbt_dynar_insert_at (dsts, i, adst);
+    xbt_dynar_insert_at (recvs, i, arecv);
+  }
+
+// find my rank inside one of MPI_Comm's of the requests;
+// guard against count == 0 or an all-NULL request array, which would
+// otherwise dereference a NULL request below
+  MPI_Request req = NULL;
+  for (i = 0; i < count && req == NULL; i++) {
+    req = requests[i];
+  }
+  int rank_traced = (req != NULL) ? smpi_comm_rank(req->comm) : -1;
+  TRACE_smpi_ptp_in (rank_traced, -1, -1, __FUNCTION__);
+#endif
+  smpi_bench_end(-1, NULL); //FIXME
+  smpi_mpi_waitall(count, requests, status);
+  smpi_bench_begin(-1, NULL);
+#ifdef HAVE_TRACING
+  for (i = 0; i < count; i++){
+    int src_traced, dst_traced, is_wait_for_receive;
+    xbt_dynar_get_cpy (srcs, i, &src_traced);
+    xbt_dynar_get_cpy (dsts, i, &dst_traced);
+    xbt_dynar_get_cpy (recvs, i, &is_wait_for_receive);
+    if (is_wait_for_receive){
+      TRACE_smpi_recv (rank_traced, src_traced, dst_traced);
+    }
+  }
+  TRACE_smpi_ptp_out (rank_traced, -1, -1, __FUNCTION__);
+  //clean-up of dynars: xbt_dynar_free() releases the container AND its
+  //elements via the registered xbt_free destructor; a bare xbt_free()
+  //on the handle would leak the buffer and every stored int
+  xbt_dynar_free (&srcs);
+  xbt_dynar_free (&dsts);
+  xbt_dynar_free (&recvs);
+#endif
+  return MPI_SUCCESS;
+}
+
+/**
+ * MPI_Waitsome: wait until at least one of the given requests completes.
+ * The indices of the completed requests are stored in 'indices' and
+ * their number in 'outcount'.
+ */
+int MPI_Waitsome(int incount, MPI_Request requests[], int* outcount, int* indices, MPI_Status status[]) {
+  int retval = MPI_SUCCESS;
+
+  smpi_bench_end(-1, NULL); //FIXME
+  if(outcount != NULL && indices != NULL) {
+    *outcount = smpi_mpi_waitsome(incount, requests, indices, status);
+  } else {
+    retval = MPI_ERR_ARG;
+  }
+  smpi_bench_begin(-1, NULL);
+  return retval;
+}
+
+/**
+ * MPI_Bcast: broadcast 'count' elements of 'buf' from 'root' to every
+ * process of 'comm'. Returns MPI_ERR_COMM when comm is MPI_COMM_NULL.
+ */
+int MPI_Bcast(void* buf, int count, MPI_Datatype datatype, int root, MPI_Comm comm) {
+  int retval;
+  int rank = comm != MPI_COMM_NULL ? smpi_comm_rank(comm) : -1;
+
+#ifdef HAVE_TRACING
+  // do not look up the group of a NULL communicator: that path must
+  // reach the MPI_ERR_COMM branch below instead of crashing here
+  int root_traced = comm != MPI_COMM_NULL ?
+      smpi_group_rank(smpi_comm_group(comm), root) : -1;
+  TRACE_smpi_collective_in (rank, root_traced, __FUNCTION__);
+#endif
+  smpi_bench_end(rank, "Bcast");
+  if(comm == MPI_COMM_NULL) {
+    retval = MPI_ERR_COMM;
+  } else {
+    smpi_mpi_bcast(buf, count, datatype, root, comm);
+    retval = MPI_SUCCESS;
+  }
+  smpi_bench_begin(rank, "Bcast");
+#ifdef HAVE_TRACING
+  TRACE_smpi_collective_out (rank, root_traced, __FUNCTION__);
+#endif
+  return retval;
+}
+
+/**
+ * MPI_Barrier: block each process of 'comm' until all of them have
+ * entered the barrier. Returns MPI_ERR_COMM on a NULL communicator.
+ */
+int MPI_Barrier(MPI_Comm comm) {
+  int retval = MPI_SUCCESS;
+  int rank = (comm == MPI_COMM_NULL) ? -1 : smpi_comm_rank(comm);
+
+#ifdef HAVE_TRACING
+  TRACE_smpi_collective_in (rank, -1, __FUNCTION__);
+#endif
+  smpi_bench_end(rank, "Barrier");
+  if(comm != MPI_COMM_NULL) {
+    smpi_mpi_barrier(comm);
+  } else {
+    retval = MPI_ERR_COMM;
+  }
+  smpi_bench_begin(rank, "Barrier");
+#ifdef HAVE_TRACING
+  TRACE_smpi_collective_out (rank, -1, __FUNCTION__);
+#endif
+  return retval;
+}
+
+int MPI_Gather(void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm) {
+ int retval;
+ int rank = comm != MPI_COMM_NULL ? smpi_comm_rank(comm) : -1;