Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
handle non commutative reduce operations
authorAugustin Degomme <degomme@idpann.imag.fr>
Wed, 26 Jun 2013 09:14:04 +0000 (11:14 +0200)
committerAugustin Degomme <degomme@idpann.imag.fr>
Wed, 26 Jun 2013 09:14:04 +0000 (11:14 +0200)
src/smpi/colls/reduce-binomial.c
src/smpi/colls/reduce-ompi.c
src/smpi/smpi_base.c
teshsuite/smpi/mpich-test/coll/runtests

index 63de8fe..f2555f8 100644 (file)
@@ -13,7 +13,7 @@ int smpi_coll_tuned_reduce_binomial(void *sendbuf, void *recvbuf, int count,
   int tag = 4321;
   MPI_Aint extent;
   void *tmp_buf;
-
+  MPI_Aint true_lb, true_extent;
   if (count == 0)
     return 0;
   rank = smpi_comm_rank(comm);
@@ -22,29 +22,64 @@ int smpi_coll_tuned_reduce_binomial(void *sendbuf, void *recvbuf, int count,
   extent = smpi_datatype_get_extent(datatype);
 
   tmp_buf = (void *) xbt_malloc(count * extent);
-
+  int is_commutative = smpi_op_is_commute(op);
   smpi_mpi_sendrecv(sendbuf, count, datatype, rank, tag,
                recvbuf, count, datatype, rank, tag, comm, &status);
   mask = 1;
-  relrank = (rank - root + comm_size) % comm_size;
+  
+  int lroot;
+  if (is_commutative) 
+        lroot   = root;
+  else
+        lroot   = 0;
+  relrank = (rank - lroot + comm_size) % comm_size;
+
+  smpi_datatype_extent(datatype, &true_lb, &true_extent);
+
+  /* adjust for potential negative lower bound in datatype */
+  tmp_buf = (void *)((char*)tmp_buf - true_lb);
+    
+  /* If I'm not the root, then my recvbuf may not be valid, therefore
+     I have to allocate a temporary one */
+  if (rank != root) {
+      recvbuf = (void *) malloc(count*(max(extent,true_extent)));
+      recvbuf = (void *)((char*)recvbuf - true_lb);
+  }
+   if ((rank != root) || (sendbuf != MPI_IN_PLACE)) {
+      smpi_datatype_copy(sendbuf, count, datatype, recvbuf,count, datatype);
+  }
 
   while (mask < comm_size) {
     /* Receive */
     if ((mask & relrank) == 0) {
       source = (relrank | mask);
       if (source < comm_size) {
-        source = (source + root) % comm_size;
+        source = (source + lroot) % comm_size;
         smpi_mpi_recv(tmp_buf, count, datatype, source, tag, comm, &status);
-        smpi_op_apply(op, tmp_buf, recvbuf, &count, &datatype);
+        
+        if (is_commutative) {
+          smpi_op_apply(op, tmp_buf, recvbuf, &count, &datatype);
+        } else {
+          smpi_op_apply(op, recvbuf, tmp_buf, &count, &datatype);
+          smpi_datatype_copy(tmp_buf, count, datatype,recvbuf, count, datatype);
+        }
       }
     } else {
-      dst = ((relrank & (~mask)) + root) % comm_size;
+      dst = ((relrank & (~mask)) + lroot) % comm_size;
       smpi_mpi_send(recvbuf, count, datatype, dst, tag, comm);
       break;
     }
     mask <<= 1;
   }
 
+if (!is_commutative && (root != 0)){
+  if (rank == 0){
+    smpi_mpi_send(recvbuf, count, datatype, root,tag, comm);
+  }else if (rank == root){
+    smpi_mpi_recv(recvbuf, count, datatype, 0, tag, comm, &status);
+  }
+}
+
   free(tmp_buf);
 
   return 0;
index c5d767c..78294e8 100644 (file)
@@ -92,8 +92,8 @@ int smpi_coll_tuned_ompi_reduce_generic( void* sendbuf, void* recvbuf, int origi
            sendbuf to the accumbuf, in order to simplfy the loops */
         if (!smpi_op_is_commute(op)) {
             smpi_datatype_copy(
-                                                (char*)accumbuf, original_count, datatype,
-                                                (char*)sendtmpbuf, original_count, datatype);
+                                                (char*)sendtmpbuf, original_count, datatype,
+                                                (char*)accumbuf, original_count, datatype);
         }
         /* Allocate two buffers for incoming segments */
         real_segment_size = true_extent + (count_by_segment - 1) * extent;
@@ -521,8 +521,8 @@ int smpi_coll_tuned_reduce_ompi_in_order_binary( void *sendbuf, void *recvbuf,
                 return MPI_ERR_INTERN;
             }
             smpi_datatype_copy (
-                                                (char*)tmpbuf, count, datatype,
-                                                (char*)recvbuf, count, datatype);
+                                                (char*)recvbuf, count, datatype,
+                                                (char*)tmpbuf, count, datatype);
             use_this_sendbuf = tmpbuf;
         } else if (io_root == rank) {
             tmpbuf = (char *) malloc(text + (count - 1) * ext);
@@ -639,8 +639,7 @@ smpi_coll_tuned_reduce_ompi_basic_linear(void *sbuf, void *rbuf, int count,
     /* Initialize the receive buffer. */
 
     if (rank == (size - 1)) {
-        smpi_datatype_copy((char*)rbuf, count, dtype,
-                                                  (char*)sbuf, count, dtype);
+        smpi_datatype_copy((char*)sbuf, count, dtype,(char*)rbuf, count, dtype);
     } else {
         smpi_mpi_recv(rbuf, count, dtype, size - 1,
                                 MCA_COLL_BASE_TAG_REDUCE, comm,
@@ -664,8 +663,8 @@ smpi_coll_tuned_reduce_ompi_basic_linear(void *sbuf, void *rbuf, int count,
     }
 
     if (NULL != inplace_temp) {
-        smpi_datatype_copy((char*)sbuf, count, dtype,
-                                                  inplace_temp,count , dtype);
+        smpi_datatype_copy(inplace_temp, count, dtype,(char*)sbuf
+                                                  ,count , dtype);
         free(inplace_temp);
     }
     if (NULL != free_buffer) {
index 8664524..e4ab83b 100644 (file)
@@ -12,7 +12,7 @@
 #include "simix/smx_private.h"
 #include "surf/surf.h"
 #include "simgrid/sg_config.h"
-
+#include "colls/colls.h"
 
 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_base, smpi, "Logging specific to SMPI (base)");
 
@@ -1173,6 +1173,13 @@ void smpi_mpi_reduce(void *sendbuf, void *recvbuf, int count,
 
   rank = smpi_comm_rank(comm);
   size = smpi_comm_size(comm);
+  //non commutative case, use a working algo from openmpi
+  if(!smpi_op_is_commute(op)){
+    smpi_coll_tuned_reduce_ompi_basic_linear(sendbuf, recvbuf, count,
+                     datatype, op, root, comm);
+    return;
+  }
+  
   if(rank != root) {
     // Send buffer to root
     smpi_mpi_send(sendbuf, count, datatype, root, system_tag, comm);
index 08e3d85..1b56ab0 100755 (executable)
@@ -114,10 +114,8 @@ RunTest coll8 4
 
 RunTest coll9 4
 
-#smpi does not handle non commutative operations, removed
-#RunTest coll10 4
+RunTest coll10 4
 
-#smpi does not handle non commutative operations, removed
 RunTest coll11 4
 
 #weird manipulations of ranks in split, and comms -> deadlock, removed