Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
use the right index for sendrecv optimization
[simgrid.git] / src / smpi / smpi_base.c
index baf4891..d7b67c0 100644 (file)
@@ -12,7 +12,7 @@
 #include "simix/smx_private.h"
 #include "surf/surf.h"
 #include "simgrid/sg_config.h"
-
+#include "colls/colls.h"
 
 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_base, smpi, "Logging specific to SMPI (base)");
 
@@ -527,7 +527,12 @@ void smpi_mpi_sendrecv(void *sendbuf, int sendcount, MPI_Datatype sendtype,
 {
   MPI_Request requests[2];
   MPI_Status stats[2];
-
+  int myid=smpi_process_index();
+  if ((smpi_group_index(smpi_comm_group(comm), dst) == myid) && (smpi_group_index(smpi_comm_group(comm), src) == myid)) {
+      smpi_datatype_copy(sendbuf, sendcount, sendtype,
+                                     recvbuf, recvcount, recvtype);
+      return;
+  }
   requests[0] =
     smpi_isend_init(sendbuf, sendcount, sendtype, dst, sendtag, comm);
   requests[1] =
@@ -907,7 +912,7 @@ void smpi_mpi_gather(void *sendbuf, int sendcount, MPI_Datatype sendtype,
                      void *recvbuf, int recvcount, MPI_Datatype recvtype,
                      int root, MPI_Comm comm)
 {
-  int system_tag = 666;
+  int system_tag = COLL_TAG_GATHER;
   int rank, size, src, index;
   MPI_Aint lb = 0, recvext = 0;
   MPI_Request *requests;
@@ -941,11 +946,36 @@ void smpi_mpi_gather(void *sendbuf, int sendcount, MPI_Datatype sendtype,
   }
 }
 
+
+void smpi_mpi_reduce_scatter(void *sendbuf, void *recvbuf, int *recvcounts,
+                       MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
+{
+    int i, size, count;
+    int *displs;
+    int rank = smpi_process_index();
+    void *tmpbuf;
+
+    /* arbitrarily choose root as rank 0 */
+    size = smpi_comm_size(comm);
+    count = 0;
+    displs = xbt_new(int, size);
+    for (i = 0; i < size; i++) {
+      displs[i] = count;
+      count += recvcounts[i];
+    }
+    tmpbuf=(void*)xbt_malloc(count*smpi_datatype_get_extent(datatype));
+    mpi_coll_reduce_fun(sendbuf, tmpbuf, count, datatype, op, 0, comm);
+    smpi_mpi_scatterv(tmpbuf, recvcounts, displs, datatype, recvbuf,
+                      recvcounts[rank], datatype, 0, comm);
+    xbt_free(displs);
+    xbt_free(tmpbuf);
+}
+
 void smpi_mpi_gatherv(void *sendbuf, int sendcount, MPI_Datatype sendtype,
                       void *recvbuf, int *recvcounts, int *displs,
                       MPI_Datatype recvtype, int root, MPI_Comm comm)
 {
-  int system_tag = 666;
+  int system_tag = COLL_TAG_GATHERV;
   int rank, size, src, index;
   MPI_Aint lb = 0, recvext = 0;
   MPI_Request *requests;
@@ -985,7 +1015,7 @@ void smpi_mpi_allgather(void *sendbuf, int sendcount,
                         int recvcount, MPI_Datatype recvtype,
                         MPI_Comm comm)
 {
-  int system_tag = 666;
+  int system_tag = COLL_TAG_ALLGATHER;
   int rank, size, other, index;
   MPI_Aint lb = 0, recvext = 0;
   MPI_Request *requests;
@@ -1024,7 +1054,7 @@ void smpi_mpi_allgatherv(void *sendbuf, int sendcount,
                          int *recvcounts, int *displs,
                          MPI_Datatype recvtype, MPI_Comm comm)
 {
-  int system_tag = 666;
+  int system_tag = COLL_TAG_ALLGATHERV;
   int rank, size, other, index;
   MPI_Aint lb = 0, recvext = 0;
   MPI_Request *requests;
@@ -1062,7 +1092,7 @@ void smpi_mpi_scatter(void *sendbuf, int sendcount, MPI_Datatype sendtype,
                       void *recvbuf, int recvcount, MPI_Datatype recvtype,
                       int root, MPI_Comm comm)
 {
-  int system_tag = 666;
+  int system_tag = COLL_TAG_SCATTER;
   int rank, size, dst, index;
   MPI_Aint lb = 0, sendext = 0;
   MPI_Request *requests;
@@ -1101,7 +1131,7 @@ void smpi_mpi_scatterv(void *sendbuf, int *sendcounts, int *displs,
                        MPI_Datatype sendtype, void *recvbuf, int recvcount,
                        MPI_Datatype recvtype, int root, MPI_Comm comm)
 {
-  int system_tag = 666;
+  int system_tag = COLL_TAG_SCATTERV;
   int rank, size, dst, index;
   MPI_Aint lb = 0, sendext = 0;
   MPI_Request *requests;
@@ -1140,23 +1170,35 @@ void smpi_mpi_reduce(void *sendbuf, void *recvbuf, int count,
                      MPI_Datatype datatype, MPI_Op op, int root,
                      MPI_Comm comm)
 {
-  int system_tag = 666;
+  int system_tag = COLL_TAG_REDUCE;
   int rank, size, src, index;
   MPI_Aint lb = 0, dataext = 0;
   MPI_Request *requests;
   void **tmpbufs;
 
+  char* sendtmpbuf = (char*) sendbuf;
+  if( sendbuf == MPI_IN_PLACE ) {
+      sendtmpbuf = (char *)recvbuf;
+  }
+
   rank = smpi_comm_rank(comm);
   size = smpi_comm_size(comm);
+  //non commutative case, use a working algo from openmpi
+  if(!smpi_op_is_commute(op)){
+    smpi_coll_tuned_reduce_ompi_basic_linear(sendtmpbuf, recvbuf, count,
+                     datatype, op, root, comm);
+    return;
+  }
+  
   if(rank != root) {
     // Send buffer to root
-    smpi_mpi_send(sendbuf, count, datatype, root, system_tag, comm);
+    smpi_mpi_send(sendtmpbuf, count, datatype, root, system_tag, comm);
   } else {
     // FIXME: check for errors
     smpi_datatype_extent(datatype, &lb, &dataext);
     // Local copy from root
-    if (sendbuf && recvbuf)
-      smpi_datatype_copy(sendbuf, count, datatype, recvbuf, count, datatype);
+    if (sendtmpbuf && recvbuf)
+      smpi_datatype_copy(sendtmpbuf, count, datatype, recvbuf, count, datatype);
     // Receive buffers from senders
     //TODO: make a MPI_barrier here ?
     requests = xbt_new(MPI_Request, size - 1);
@@ -1202,7 +1244,7 @@ void smpi_mpi_allreduce(void *sendbuf, void *recvbuf, int count,
 void smpi_mpi_scan(void *sendbuf, void *recvbuf, int count,
                    MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
 {
-  int system_tag = 666;
+  int system_tag = 888;
   int rank, size, other, index;
   MPI_Aint lb = 0, dataext = 0;
   MPI_Request *requests;