Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
leaks --
[simgrid.git] / src / smpi / colls / smpi_mvapich2_selector.c
index d9e3565..b15b5ad 100644 (file)
@@ -3,7 +3,7 @@
 /* Copyright (c) 2009-2010, 2013-2014. The SimGrid Team.
  * All rights reserved.                                                     */
 
-/* This program is xbt_free software; you can redistribute it and/or modify it
+/* This program is free software; you can redistribute it and/or modify it
  * under the terms of the license (GNU LGPL) which comes with this package. */
 
 #include "colls_private.h"
@@ -61,7 +61,7 @@ int smpi_coll_tuned_alltoall_mvapich2( void *sendbuf, int sendcount,
           mv2_alltoall_thresholds_table[conf_index][range].in_place_algo_table[range_threshold].min
           ||nbytes > mv2_alltoall_thresholds_table[conf_index][range].in_place_algo_table[range_threshold].max
       ) {
-          tmp_buf = (char *)xbt_malloc( comm_size * recvcount * recvtype_size );
+          tmp_buf = (char *)smpi_get_tmp_sendbuffer( comm_size * recvcount * recvtype_size );
           mpi_errno = smpi_datatype_copy((char *)recvbuf,
               comm_size*recvcount, recvtype,
               (char *)tmp_buf,
@@ -70,7 +70,7 @@ int smpi_coll_tuned_alltoall_mvapich2( void *sendbuf, int sendcount,
           mpi_errno = MV2_Alltoall_function(tmp_buf, recvcount, recvtype,
               recvbuf, recvcount, recvtype,
               comm );
-          xbt_free(tmp_buf);
+          smpi_free_tmp_buffer(tmp_buf);
       } else {
           mpi_errno = MPIR_Alltoall_inplace_MV2(sendbuf, sendcount, sendtype,
               recvbuf, recvcount, recvtype,
@@ -92,12 +92,12 @@ int smpi_coll_tuned_allgather_mvapich2(void *sendbuf, int sendcount, MPI_Datatyp
   int mpi_errno = MPI_SUCCESS;
   int nbytes = 0, comm_size, recvtype_size;
   int range = 0;
-  //int partial_sub_ok = 0;
+  int partial_sub_ok = 0;
   int conf_index = 0;
   int range_threshold = 0;
   int is_two_level = 0;
-  //int local_size = -1;
-  //MPI_Comm shmem_comm;
+  int local_size = -1;
+  MPI_Comm shmem_comm;
   //MPI_Comm *shmem_commptr=NULL;
   /* Get the size of the communicator */
   comm_size = smpi_comm_size(comm);
@@ -106,34 +106,35 @@ int smpi_coll_tuned_allgather_mvapich2(void *sendbuf, int sendcount, MPI_Datatyp
 
   if(mv2_allgather_table_ppn_conf==NULL)
     init_mv2_allgather_tables_stampede();
+    
+  if(smpi_comm_get_leaders_comm(comm)==MPI_COMM_NULL){
+    smpi_comm_init_smp(comm);
+  }
 
-  //int i;
-  /* check if safe to use partial subscription mode */
-  /*  if (comm->ch.shmem_coll_ok == 1 && comm->ch.is_uniform) {
-
-        shmem_comm = comm->ch.shmem_comm;
-        MPID_Comm_get_ptr(shmem_comm, shmem_commptr);
-        local_size = shmem_commptr->local_size;
-        i = 0;
-        if (mv2_allgather_table_ppn_conf[0] == -1) {
-            // Indicating user defined tuning
-            conf_index = 0;
-            goto conf_check_end;
-        }
-        do {
-            if (local_size == mv2_allgather_table_ppn_conf[i]) {
-                conf_index = i;
-                partial_sub_ok = 1;
-                break;
-            }
-            i++;
-        } while(i < mv2_allgather_num_ppn_conf);
+  int i;
+  if (smpi_comm_is_uniform(comm)){
+    shmem_comm = smpi_comm_get_intra_comm(comm);
+    local_size = smpi_comm_size(shmem_comm);
+    i = 0;
+    if (mv2_allgather_table_ppn_conf[0] == -1) {
+      // Indicating user defined tuning
+      conf_index = 0;
+      goto conf_check_end;
     }
-
+    do {
+      if (local_size == mv2_allgather_table_ppn_conf[i]) {
+        conf_index = i;
+        partial_sub_ok = 1;
+        break;
+      }
+      i++;
+    } while(i < mv2_allgather_num_ppn_conf);
+  }
   conf_check_end:
-    if (partial_sub_ok != 1) {
-        conf_index = 0;
-    }*/
+  if (partial_sub_ok != 1) {
+    conf_index = 0;
+  }
+  
   /* Search for the corresponding system size inside the tuning table */
   while ((range < (mv2_size_allgather_tuning_table[conf_index] - 1)) &&
       (comm_size >
@@ -158,24 +159,21 @@ int smpi_coll_tuned_allgather_mvapich2(void *sendbuf, int sendcount, MPI_Datatyp
 
   /* intracommunicator */
   if(is_two_level ==1){
-
-      /*       if(comm->ch.shmem_coll_ok == 1){
-            MPIR_T_PVAR_COUNTER_INC(MV2, mv2_num_shmem_coll_calls, 1);
-          if (1 == comm->ch.is_blocked) {
-                mpi_errno = MPIR_2lvl_Allgather_MV2(sendbuf, sendcount, sendtype,
-                                                   recvbuf, recvcount, recvtype,
-                                                   comm, errflag);
-          }
-          else {
-              mpi_errno = MPIR_Allgather_intra(sendbuf, sendcount, sendtype,
-                                               recvbuf, recvcount, recvtype,
-                                               comm, errflag);
-          }
-        } else {*/
+    if(partial_sub_ok ==1){
+      if (smpi_comm_is_blocked(comm)){
+      mpi_errno = MPIR_2lvl_Allgather_MV2(sendbuf, sendcount, sendtype,
+                            recvbuf, recvcount, recvtype,
+                            comm);
+      }else{
+      mpi_errno = smpi_coll_tuned_allgather_mpich(sendbuf, sendcount, sendtype,
+                            recvbuf, recvcount, recvtype,
+                            comm);
+      }
+    } else {
       mpi_errno = MPIR_Allgather_RD_MV2(sendbuf, sendcount, sendtype,
           recvbuf, recvcount, recvtype,
           comm);
-      //     }
+    }
   } else if(MV2_Allgather_function == &MPIR_Allgather_Bruck_MV2
       || MV2_Allgather_function == &MPIR_Allgather_RD_MV2
       || MV2_Allgather_function == &MPIR_Allgather_Ring_MV2) {
@@ -361,7 +359,7 @@ int smpi_coll_tuned_allreduce_mvapich2(void *sendbuf,
   int nbytes = 0;
   int range = 0, range_threshold = 0, range_threshold_intra = 0;
   int is_two_level = 0;
-  //int is_commutative = 0;
+  int is_commutative = 0;
   MPI_Aint true_lb, true_extent;
 
   sendtype_size=smpi_datatype_size(datatype);
@@ -429,16 +427,16 @@ int smpi_coll_tuned_allreduce_mvapich2(void *sendbuf,
 
     if(is_two_level == 1){
         // check if shm is ready, if not use other algorithm first
-        /*if ((comm->ch.shmem_coll_ok == 1)
-                    && (mv2_enable_shmem_allreduce)
-                    && (is_commutative)
-                    && (mv2_enable_shmem_collectives)) {
-                    mpi_errno = MPIR_Allreduce_two_level_MV2(sendbuf, recvbuf, count,
+        if (is_commutative) {
+          if(smpi_comm_get_leaders_comm(comm)==MPI_COMM_NULL){
+            smpi_comm_init_smp(comm);
+          }
+          mpi_errno = MPIR_Allreduce_two_level_MV2(sendbuf, recvbuf, count,
                                                      datatype, op, comm);
-                } else {*/
+                } else {
         mpi_errno = MPIR_Allreduce_pt2pt_rd_MV2(sendbuf, recvbuf, count,
             datatype, op, comm);
-        // }
+        }
     } else {
         mpi_errno = MV2_Allreduce_function(sendbuf, recvbuf, count,
             datatype, op, comm);
@@ -485,9 +483,198 @@ int smpi_coll_tuned_bcast_mvapich2(void *buffer,
    MPI_Datatype datatype,
    int root, MPI_Comm comm)
{
    /* Runtime algorithm selector for MPI_Bcast, ported from MVAPICH2:
     * picks the inter-leader and intra-node broadcast functions from the
     * Stampede tuning tables based on communicator size and message size
     * (nbytes), then dispatches either a two-level (inter-node phase +
     * intra-node phase) or a flat broadcast. Returns an MPI error code. */
+    int mpi_errno = MPI_SUCCESS;
+    int comm_size/*, rank*/;
+    int two_level_bcast = 1;
+    size_t nbytes = 0; 
+    int range = 0;
+    int range_threshold = 0;
+    int range_threshold_intra = 0;
+    int is_homogeneous, is_contig;
+    MPI_Aint type_size;
+    //, position;
+    void *tmp_buf = NULL;
+    MPI_Comm shmem_comm;
+    //MPID_Datatype *dtp;
+
+    if (count == 0)
+        return MPI_SUCCESS;
    /* Lazily build the SMP sub-communicators (intra-node + leaders) that
     * the two-level algorithms below rely on. */
+    if(smpi_comm_get_leaders_comm(comm)==MPI_COMM_NULL){
+      smpi_comm_init_smp(comm);
+    }
+    if(!mv2_bcast_thresholds_table)
+      init_mv2_bcast_tables_stampede();
+    comm_size = smpi_comm_size(comm);
+    //rank = smpi_comm_rank(comm);
+
    /* NOTE(review): is_contig and is_homogeneous are hard-coded to 1 in
     * this port (the MPICH datatype introspection is commented out), so
     * every "!is_contig || !is_homogeneous" branch below is currently
     * unreachable dead code. */
+    is_contig=1;
+/*    if (HANDLE_GET_KIND(datatype) == HANDLE_KIND_BUILTIN)*/
+/*        is_contig = 1;*/
+/*    else {*/
+/*        MPID_Datatype_get_ptr(datatype, dtp);*/
+/*        is_contig = dtp->is_contig;*/
+/*    }*/
+
+    is_homogeneous = 1;
+
+    /* MPI_Type_size() might not give the accurate size of the packed
+     * datatype for heterogeneous systems (because of padding, encoding,
+     * etc). On the other hand, MPI_Pack_size() can become very
+     * expensive, depending on the implementation, especially for
+     * heterogeneous systems. We want to use MPI_Type_size() wherever
+     * possible, and MPI_Pack_size() in other places.
+     */
+    //if (is_homogeneous) {
+        type_size=smpi_datatype_size(datatype);
+
+   /* } else {
+        MPIR_Pack_size_impl(1, datatype, &type_size);
+    }*/
+    nbytes = (size_t) (count) * (type_size);
+
+    /* Search for the corresponding system size inside the tuning table */
+    while ((range < (mv2_size_bcast_tuning_table - 1)) &&
+           (comm_size > mv2_bcast_thresholds_table[range].numproc)) {
+        range++;
+    }
+    /* Search for corresponding inter-leader function */
+    while ((range_threshold < (mv2_bcast_thresholds_table[range].size_inter_table - 1))
+           && (nbytes >
+               mv2_bcast_thresholds_table[range].inter_leader[range_threshold].max)
+           && (mv2_bcast_thresholds_table[range].inter_leader[range_threshold].max != -1)) {
+        range_threshold++;
+    }
+
+    /* Search for corresponding intra-node function */
+    while ((range_threshold_intra <
+            (mv2_bcast_thresholds_table[range].size_intra_table - 1))
+           && (nbytes >
+               mv2_bcast_thresholds_table[range].intra_node[range_threshold_intra].max)
+           && (mv2_bcast_thresholds_table[range].intra_node[range_threshold_intra].max !=
+               -1)) {
+        range_threshold_intra++;
+    }
 
-  //TODO : Bcast really needs intra/inter phases in mvapich. Default to mpich if not available
-  return smpi_coll_tuned_bcast_mpich(buffer, count, datatype, root, comm);
    /* Resolve the function pointers selected by the table lookups above.
     * NOTE(review): MV2_Bcast_function and MV2_Bcast_intra_node_function
     * appear to be globals shared with other selector calls — presumably
     * single-threaded use is assumed; verify against the rest of SMPI. */
+    MV2_Bcast_function =
+        mv2_bcast_thresholds_table[range].inter_leader[range_threshold].
+        MV2_pt_Bcast_function;
+
+    MV2_Bcast_intra_node_function =
+        mv2_bcast_thresholds_table[range].
+        intra_node[range_threshold_intra].MV2_pt_Bcast_function;
+
+/*    if (mv2_user_bcast_intra == NULL && */
+/*            MV2_Bcast_intra_node_function == &MPIR_Knomial_Bcast_intra_node_MV2) {*/
+/*            MV2_Bcast_intra_node_function = &MPIR_Shmem_Bcast_MV2;*/
+/*    }*/
+
+    if (mv2_bcast_thresholds_table[range].inter_leader[range_threshold].
+        zcpy_pipelined_knomial_factor != -1) {
+        zcpy_knomial_factor = 
+            mv2_bcast_thresholds_table[range].inter_leader[range_threshold].
+            zcpy_pipelined_knomial_factor;
+    }
+
+    if (mv2_pipelined_zcpy_knomial_factor != -1) {
+        zcpy_knomial_factor = mv2_pipelined_zcpy_knomial_factor;
+    }
+
+    if(MV2_Bcast_intra_node_function == NULL) {
+        /* if tuning table do not have any intra selection, set func pointer to
+        ** default one for mcast intra node */
+        MV2_Bcast_intra_node_function = &MPIR_Shmem_Bcast_MV2;
+    }
+
+    /* Set value of pipeline segment size */
+    bcast_segment_size = mv2_bcast_thresholds_table[range].bcast_segment_size;
+    
+    /* Set value of inter node knomial factor */
+    mv2_inter_node_knomial_factor = mv2_bcast_thresholds_table[range].inter_node_knomial_factor;
+
+    /* Set value of intra node knomial factor */
+    mv2_intra_node_knomial_factor = mv2_bcast_thresholds_table[range].intra_node_knomial_factor;
+
+    /* Check if we will use a two level algorithm or not */
+    two_level_bcast =
+#if defined(_MCST_SUPPORT_)
+        mv2_bcast_thresholds_table[range].is_two_level_bcast[range_threshold] 
+        || comm->ch.is_mcast_ok;
+#else
+        mv2_bcast_thresholds_table[range].is_two_level_bcast[range_threshold];
+#endif
+     if (two_level_bcast == 1) {
+        if (!is_contig || !is_homogeneous) {
            /* NOTE(review): if this path ever becomes reachable (see the
             * is_contig/is_homogeneous note above), tmp_buf is never
             * released on any path below — upstream MVAPICH2 frees it
             * after the unpack step, which is commented out here. TODO
             * confirm a smpi_free_tmp_buffer(tmp_buf) is needed. */
+            tmp_buf=(void *)smpi_get_tmp_sendbuffer(nbytes);
+
+/*            position = 0;*/
+/*            if (rank == root) {*/
+/*                mpi_errno =*/
+/*                    MPIR_Pack_impl(buffer, count, datatype, tmp_buf, nbytes, &position);*/
+/*                if (mpi_errno)*/
+/*                    MPIU_ERR_POP(mpi_errno);*/
+/*            }*/
+        }
+#ifdef CHANNEL_MRAIL_GEN2
+        if ((mv2_enable_zcpy_bcast == 1) &&
+              (&MPIR_Pipelined_Bcast_Zcpy_MV2 == MV2_Bcast_function)) {  
+            if (!is_contig || !is_homogeneous) {
+                mpi_errno = MPIR_Pipelined_Bcast_Zcpy_MV2(tmp_buf, nbytes, MPI_BYTE,
+                                                 root, comm);
+            } else { 
+                mpi_errno = MPIR_Pipelined_Bcast_Zcpy_MV2(buffer, count, datatype,
+                                                 root, comm);
+            } 
+        } else 
+#endif /* defined(CHANNEL_MRAIL_GEN2) */
+        { 
            /* Two-level broadcast: inter-node phase among node leaders,
             * then an intra-node phase rooted at INTRA_NODE_ROOT. */
+            shmem_comm = smpi_comm_get_intra_comm(comm);
+            if (!is_contig || !is_homogeneous) {
+                mpi_errno =
+                    MPIR_Bcast_tune_inter_node_helper_MV2(tmp_buf, nbytes, MPI_BYTE,
+                                                          root, comm);
+            } else {
+                mpi_errno =
+                    MPIR_Bcast_tune_inter_node_helper_MV2(buffer, count, datatype, root,
+                                                          comm);
+            }
+
+            /* We are now done with the inter-node phase */
+
+
+                    root = INTRA_NODE_ROOT;
+   
+
+                if (!is_contig || !is_homogeneous) {
+                    mpi_errno = MV2_Bcast_intra_node_function(tmp_buf, nbytes,
+                                                              MPI_BYTE, root, shmem_comm);
+                } else {
+                    mpi_errno = MV2_Bcast_intra_node_function(buffer, count,
+                                                              datatype, root, shmem_comm);
+
+                }
+        } 
+/*        if (!is_contig || !is_homogeneous) {*/
+/*            if (rank != root) {*/
+/*                position = 0;*/
+/*                mpi_errno = MPIR_Unpack_impl(tmp_buf, nbytes, &position, buffer,*/
+/*                                             count, datatype);*/
+/*            }*/
+/*        }*/
+    } else {
+        /* We use Knomial for intra node */
+        MV2_Bcast_intra_node_function = &MPIR_Knomial_Bcast_intra_node_MV2;
+/*        if (mv2_enable_shmem_bcast == 0) {*/
+            /* Fall back to non-tuned version */
+/*            MPIR_Bcast_intra_MV2(buffer, count, datatype, root, comm);*/
+/*        } else {*/
+            mpi_errno = MV2_Bcast_function(buffer, count, datatype, root,
+                                           comm);
+
+/*        }*/
+    }
+
+
+    return mpi_errno;
 
 }
 
@@ -572,14 +759,16 @@ int smpi_coll_tuned_reduce_mvapich2( void *sendbuf,
   /* We call Reduce function */
   if(is_two_level == 1)
     {
-      /* if (comm->ch.shmem_coll_ok == 1
-            && is_commutative == 1) {
-            mpi_errno = MPIR_Reduce_two_level_helper_MV2(sendbuf, recvbuf, count, 
-                                           datatype, op, root, comm, errflag);
-        } else {*/
+       if (is_commutative == 1) {
+         if(smpi_comm_get_leaders_comm(comm)==MPI_COMM_NULL){
+           smpi_comm_init_smp(comm);
+         }
+         mpi_errno = MPIR_Reduce_two_level_helper_MV2(sendbuf, recvbuf, count, 
+                                           datatype, op, root, comm);
+        } else {
       mpi_errno = MPIR_Reduce_binomial_MV2(sendbuf, recvbuf, count,
           datatype, op, root, comm);
-      //}
+      }
     } else if(MV2_Reduce_function == &MPIR_Reduce_inter_knomial_wrapper_MV2 ){
         if(is_commutative ==1)
           {
@@ -677,7 +866,7 @@ int smpi_coll_tuned_reduce_scatter_mvapich2(void *sendbuf, void *recvbuf, int *r
           recvcnts, datatype,
           op, comm);
   }
-
+  xbt_free(disps);
   return mpi_errno;
 
 }
@@ -690,7 +879,7 @@ int smpi_coll_tuned_scatter_mvapich2(void *sendbuf,
     void *recvbuf,
     int recvcnt,
     MPI_Datatype recvtype,
-    int root, MPI_Comm comm_ptr)
+    int root, MPI_Comm comm)
 {
   int range = 0, range_threshold = 0, range_threshold_intra = 0;
   int mpi_errno = MPI_SUCCESS;
@@ -699,16 +888,20 @@ int smpi_coll_tuned_scatter_mvapich2(void *sendbuf,
   int recvtype_size, sendtype_size;
   int partial_sub_ok = 0;
   int conf_index = 0;
-  //  int local_size = -1;
-  //  int i;
-  //   MPI_Comm shmem_comm;
+    int local_size = -1;
+    int i;
+     MPI_Comm shmem_comm;
   //    MPID_Comm *shmem_commptr=NULL;
   if(mv2_scatter_thresholds_table==NULL)
     init_mv2_scatter_tables_stampede();
 
-  comm_size = smpi_comm_size(comm_ptr);
+  if(smpi_comm_get_leaders_comm(comm)==MPI_COMM_NULL){
+    smpi_comm_init_smp(comm);
+  }
+  
+  comm_size = smpi_comm_size(comm);
 
-  rank = smpi_comm_rank(comm_ptr);
+  rank = smpi_comm_rank(comm);
 
   if (rank == root) {
       sendtype_size=smpi_datatype_size(sendtype);
@@ -717,29 +910,28 @@ int smpi_coll_tuned_scatter_mvapich2(void *sendbuf,
       recvtype_size=smpi_datatype_size(recvtype);
       nbytes = recvcnt * recvtype_size;
   }
-  /*
+  
     // check if safe to use partial subscription mode 
-    if (comm_ptr->ch.shmem_coll_ok == 1 && comm_ptr->ch.is_uniform) {
+    if (smpi_comm_is_uniform(comm)) {
 
-        shmem_comm = comm_ptr->ch.shmem_comm;
-        MPID_Comm_get_ptr(shmem_comm, shmem_commptr);
-        local_size = shmem_commptr->local_size;
+        shmem_comm = smpi_comm_get_intra_comm(comm);
+        local_size = smpi_comm_size(shmem_comm);
         i = 0;
         if (mv2_scatter_table_ppn_conf[0] == -1) {
             // Indicating user defined tuning 
             conf_index = 0;
-            goto conf_check_end;
+        }else{
+            do {
+                if (local_size == mv2_scatter_table_ppn_conf[i]) {
+                    conf_index = i;
+                    partial_sub_ok = 1;
+                    break;
+                }
+                i++;
+            } while(i < mv2_scatter_num_ppn_conf);
         }
-        do {
-            if (local_size == mv2_scatter_table_ppn_conf[i]) {
-                conf_index = i;
-                partial_sub_ok = 1;
-                break;
-            }
-            i++;
-        } while(i < mv2_scatter_num_ppn_conf);
     }
-   */
+   
   if (partial_sub_ok != 1) {
       conf_index = 0;
   }
@@ -772,9 +964,9 @@ int smpi_coll_tuned_scatter_mvapich2(void *sendbuf,
 
   if(MV2_Scatter_function == &MPIR_Scatter_mcst_wrap_MV2) {
 #if defined(_MCST_SUPPORT_)
-      if(comm_ptr->ch.is_mcast_ok == 1
+      if(comm->ch.is_mcast_ok == 1
           && mv2_use_mcast_scatter == 1
-          && comm_ptr->ch.shmem_coll_ok == 1) {
+          && comm->ch.shmem_coll_ok == 1) {
           MV2_Scatter_function = &MPIR_Scatter_mcst_MV2;
       } else
 #endif /*#if defined(_MCST_SUPPORT_) */
@@ -792,25 +984,24 @@ int smpi_coll_tuned_scatter_mvapich2(void *sendbuf,
 
   if( (MV2_Scatter_function == &MPIR_Scatter_MV2_two_level_Direct) ||
       (MV2_Scatter_function == &MPIR_Scatter_MV2_two_level_Binomial)) {
-      /* if( comm_ptr->ch.shmem_coll_ok == 1 &&
-             comm_ptr->ch.is_global_block == 1 ) {
+       if( smpi_comm_is_blocked(comm)) {
              MV2_Scatter_intra_function = mv2_scatter_thresholds_table[conf_index][range].intra_node[range_threshold_intra]
                                 .MV2_pt_Scatter_function;
 
              mpi_errno =
                    MV2_Scatter_function(sendbuf, sendcnt, sendtype,
                                         recvbuf, recvcnt, recvtype, root,
-                                        comm_ptr);
-         } else {*/
+                                        comm);
+         } else {
       mpi_errno = MPIR_Scatter_MV2_Binomial(sendbuf, sendcnt, sendtype,
           recvbuf, recvcnt, recvtype, root,
-          comm_ptr);
+          comm);
 
-      //}
+      }
   } else {
       mpi_errno = MV2_Scatter_function(sendbuf, sendcnt, sendtype,
           recvbuf, recvcnt, recvtype, root,
-          comm_ptr);
+          comm);
   }
   return (mpi_errno);
 }