Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Sanitize the API of this_actor::parallel_execute()
authorMartin Quinson <martin.quinson@loria.fr>
Wed, 17 Oct 2018 16:11:12 +0000 (18:11 +0200)
committerMartin Quinson <martin.quinson@loria.fr>
Wed, 17 Oct 2018 16:11:12 +0000 (18:11 +0200)
- Use std::vector instead of C arrays (old API remains but is not
  documented -- it should be properly deprecated)
- The flop_amounts and comm_amounts arrays are not automatically freed
  by the internal functions, and should be properly cleaned by their
  creator.
  - EXCEPTION: in ptask model, sequential exec and regular comms don't
    have a real caller, so the internal function still has to free
    these arrays... Sick Sad World.
  - The proper solution would be to have the only copy of these arrays
    in the Action instead of having it in s4u.
  - But for now, Actions start as soon as created. So if you want to
    init them without starting, you have to have the data in s4u and
    only create the implementation side when you start the stuff.
  - That should obviously be fixed :) First step in that direction
    would be to have the constructor of each action NOT register the
    action in the LMM, but have an Action::start() in charge of this.
    For each subclass of Action.

examples/s4u/exec-ptask/s4u-exec-ptask.cpp
include/simgrid/s4u/Actor.hpp
src/s4u/s4u_Actor.cpp
src/simix/libsmx.cpp
src/simix/smx_host.cpp
src/surf/HostImpl.cpp
src/surf/ptask_L07.cpp
src/surf/ptask_L07.hpp

index e5b9b6b..1490771 100644 (file)
@@ -31,31 +31,29 @@ static void runner()
 
   XBT_INFO("First, build a classical parallel task, with 1 Gflop to execute on each node, "
            "and 10MB to exchange between each pair");
-  double* computation_amounts   = new double[hosts_count]();
-  double* communication_amounts = new double[hosts_count * hosts_count]();
 
-  for (int i               = 0; i < hosts_count; i++)
-    computation_amounts[i] = 1e9; // 1 Gflop
+  std::vector<double> computation_amounts;
+  std::vector<double> communication_amounts;
 
+  /* ------[ test 1 ]----------------- */
+  computation_amounts.assign(hosts.size(), 1e9 /*1Gflop*/);
+  communication_amounts.assign(hosts.size() * hosts.size(), 0);
   for (int i = 0; i < hosts_count; i++)
     for (int j = i + 1; j < hosts_count; j++)
       communication_amounts[i * hosts_count + j] = 1e7; // 10 MB
 
-  simgrid::s4u::this_actor::parallel_execute(hosts_count, hosts.data(), computation_amounts, communication_amounts);
+  simgrid::s4u::this_actor::parallel_execute(hosts, computation_amounts, communication_amounts);
 
+  /* ------[ test 2 ]----------------- */
   XBT_INFO("We can do the same with a timeout of one second enabled.");
-  computation_amounts   = new double[hosts_count]();
-  communication_amounts = new double[hosts_count * hosts_count]();
-
-  for (int i               = 0; i < hosts_count; i++)
-    computation_amounts[i] = 1e9; // 1 Gflop
-
+  computation_amounts.assign(hosts.size(), 1e9 /*1Gflop*/);
+  communication_amounts.assign(hosts.size() * hosts.size(), 0);
   for (int i = 0; i < hosts_count; i++)
     for (int j = i + 1; j < hosts_count; j++)
       communication_amounts[i * hosts_count + j] = 1e7; // 10 MB
 
   try {
-    simgrid::s4u::this_actor::parallel_execute(hosts_count, hosts.data(), computation_amounts, communication_amounts,
+    simgrid::s4u::this_actor::parallel_execute(hosts, computation_amounts, communication_amounts,
                                                1.0 /* timeout (in seconds)*/);
     XBT_WARN("Woops, this did not timeout as expected... Please report that bug.");
   } catch (xbt_ex& e) {
@@ -63,28 +61,34 @@ static void runner()
     XBT_DEBUG("Caught expected exception: %s", e.what());
   }
 
+  /* ------[ test 3 ]----------------- */
   XBT_INFO("Then, build a parallel task involving only computations and no communication (1 Gflop per node)");
-  computation_amounts = new double[hosts_count]();
-  for (int i               = 0; i < hosts_count; i++)
-    computation_amounts[i] = 1e9; // 1 Gflop
-  simgrid::s4u::this_actor::parallel_execute(hosts_count, hosts.data(), computation_amounts, nullptr /* no comm */);
+  computation_amounts.assign(hosts.size(), 1e9 /*1Gflop*/);
+  communication_amounts.clear(); /* no comm */
+  simgrid::s4u::this_actor::parallel_execute(hosts, computation_amounts, communication_amounts);
 
+  /* ------[ test 4 ]----------------- */
   XBT_INFO("Then, build a parallel task involving only heterogeneous computations and no communication");
-  computation_amounts = new double[hosts_count]();
-  for (int i               = 0; i < hosts_count; i++)
+  computation_amounts.resize(hosts.size());
+  for (int i = 0; i < hosts_count; i++)
     computation_amounts[i] = 5 * (i + 1) * 1e8; // 500Mflop, 1Gflop, 1.5Gflop
-  simgrid::s4u::this_actor::parallel_execute(hosts_count, hosts.data(), computation_amounts, nullptr /* no comm */);
+  communication_amounts.clear();                /* no comm */
+  simgrid::s4u::this_actor::parallel_execute(hosts, computation_amounts, communication_amounts);
 
+  /* ------[ test 5 ]----------------- */
   XBT_INFO("Then, build a parallel task with no computation nor communication (synchro only)");
-  computation_amounts   = new double[hosts_count]();
-  communication_amounts = new double[hosts_count * hosts_count]();
-  simgrid::s4u::this_actor::parallel_execute(hosts_count, hosts.data(), computation_amounts, communication_amounts);
+  computation_amounts.clear();
+  communication_amounts.clear();
+  simgrid::s4u::this_actor::parallel_execute(hosts, computation_amounts, communication_amounts);
 
+  /* ------[ test 6 ]----------------- */
   XBT_INFO("Finally, trick the ptask to do a 'remote execution', on host %s", hosts[1]->get_cname());
-  computation_amounts = new double[1]{1e9};
+  std::vector<simgrid::s4u::Host*> remote;
+  remote.push_back(hosts[1]);
+  computation_amounts.assign(1, 1e9);
+  communication_amounts.clear();
 
-  simgrid::s4u::Host* remote[] = {hosts[1]};
-  simgrid::s4u::this_actor::parallel_execute(1, remote, computation_amounts, nullptr);
+  simgrid::s4u::this_actor::parallel_execute(remote, computation_amounts, communication_amounts);
 
   XBT_INFO("Goodbye now!");
 }
index cbe234d..098c8b9 100644 (file)
@@ -433,6 +433,9 @@ XBT_PUBLIC void execute(double flop, double priority);
  * each host, using a vector of flops amount. Then, you should specify the amount of data exchanged between each
  * hosts during the parallel kernel. For that, a matrix of values is expected.
  *
+ * It is OK to build a parallel execution without any computation and/or without any communication.
+ * Just pass an empty vector to the corresponding parameter.
+ *
  * For example, if your list of hosts is ``[host0, host1]``, passing a vector ``[1000, 2000]`` as a `flops_amount`
  * vector means that `host0` should compute 1000 flops while `host1` will compute 2000 flops. A matrix of
  * communications' sizes of ``[0, 1, 2, 3]`` specifies the following data exchanges:
@@ -461,14 +464,21 @@ XBT_PUBLIC void execute(double flop, double priority);
  *
  * \endrst
  */
+XBT_PUBLIC void parallel_execute(std::vector<s4u::Host*> hosts, std::vector<double> flops_amounts,
+                                 std::vector<double> bytes_amounts);
 
-XBT_PUBLIC void parallel_execute(int host_nb, s4u::Host** host_list, double* flops_amount, double* bytes_amount);
 /** \rst
  * Block the actor until the built :ref:`parallel execution <API_s4u_parallel_execute>` completes, or until the timeout.
  * \endrst
  */
+XBT_PUBLIC void parallel_execute(std::vector<s4u::Host*> hosts, std::vector<double> flops_amounts,
+                                 std::vector<double> bytes_amounts, double timeout);
+
+#ifndef DOXYGEN
+XBT_PUBLIC void parallel_execute(int host_nb, s4u::Host** host_list, double* flops_amount, double* bytes_amount);
 XBT_PUBLIC void parallel_execute(int host_nb, s4u::Host** host_list, double* flops_amount, double* bytes_amount,
                                  double timeout);
+#endif
 
 XBT_PUBLIC ExecPtr exec_init(double flops_amounts);
 XBT_PUBLIC ExecPtr exec_async(double flops_amounts);
index 945dfb4..56d2827 100644 (file)
@@ -293,11 +293,38 @@ void execute(double flops, double priority)
   exec_init(flops)->set_priority(priority)->start()->wait();
 }
 
+void parallel_execute(std::vector<s4u::Host*> hosts, std::vector<double> flops_amounts,
+                      std::vector<double> bytes_amounts)
+{
+  parallel_execute(hosts, flops_amounts, bytes_amounts, -1);
+}
+void parallel_execute(std::vector<s4u::Host*> hosts, std::vector<double> flops_amounts,
+                      std::vector<double> bytes_amounts, double timeout)
+{
+  xbt_assert(hosts.size() > 0, "Your parallel executions must span over at least one host.");
+  xbt_assert(hosts.size() == flops_amounts.size() || flops_amounts.empty(),
+             "Host count (%zu) does not match flops_amount count (%zu).", hosts.size(), flops_amounts.size());
+  xbt_assert(hosts.size() * hosts.size() == bytes_amounts.size() || bytes_amounts.empty(),
+             "bytes_amounts must be a matrix of size host_count * host_count (%zu*%zu), but it's of size %zu.",
+             hosts.size(), hosts.size(), bytes_amounts.size());
+
+  /* The vectors live as parameter of parallel_execute. No copy is created for simcall_execution_parallel_start(),
+   * but that's OK because simcall_execution_wait() is called from here too.
+   */
+  smx_activity_t s = simcall_execution_parallel_start("", hosts.size(), hosts.data(),
+                                                      (flops_amounts.empty() ? nullptr : flops_amounts.data()),
+                                                      (bytes_amounts.empty() ? nullptr : bytes_amounts.data()),
+                                                      /* rate */ -1, timeout);
+  simcall_execution_wait(s);
+}
+
 void parallel_execute(int host_nb, s4u::Host** host_list, double* flops_amount, double* bytes_amount, double timeout)
 {
   smx_activity_t s =
       simcall_execution_parallel_start("", host_nb, host_list, flops_amount, bytes_amount, /* rate */ -1, timeout);
   simcall_execution_wait(s);
+  delete[] flops_amount;
+  delete[] bytes_amount;
 }
 
 void parallel_execute(int host_nb, sg_host_t* host_list, double* flops_amount, double* bytes_amount)
index 0dbe5b1..cff0c83 100644 (file)
@@ -73,7 +73,8 @@ smx_activity_t simcall_execution_parallel_start(std::string name, int host_nb, s
 {
   /* checking for infinite values */
   for (int i = 0 ; i < host_nb ; ++i) {
-    xbt_assert(std::isfinite(flops_amount[i]), "flops_amount[%d] is not finite!", i);
+    if (flops_amount != nullptr)
+      xbt_assert(std::isfinite(flops_amount[i]), "flops_amount[%d] is not finite!", i);
     if (bytes_amount != nullptr) {
       for (int j = 0 ; j < host_nb ; ++j) {
         xbt_assert(std::isfinite(bytes_amount[i + host_nb * j]),
index 745a67d..8c42e6c 100644 (file)
@@ -63,9 +63,7 @@ simgrid::kernel::activity::ExecImplPtr SIMIX_execution_parallel_start(std::strin
   simgrid::kernel::resource::Action* surf_action      = nullptr;
   simgrid::kernel::resource::Action* timeout_detector = nullptr;
   if (not MC_is_active() && not MC_record_replay_is_active()) {
-    sg_host_t* host_list_cpy = new sg_host_t[host_nb];
-    std::copy_n(host_list, host_nb, host_list_cpy);
-    surf_action = surf_host_model->execute_parallel(host_nb, host_list_cpy, flops_amount, bytes_amount, rate);
+    surf_action = surf_host_model->execute_parallel(host_nb, host_list, flops_amount, bytes_amount, rate);
     if (timeout > 0) {
       timeout_detector = host_list[0]->pimpl_cpu->sleep(timeout);
     }
index f718d80..ce622c4 100644 (file)
@@ -57,7 +57,7 @@ kernel::resource::Action* HostModel::execute_parallel(int host_nb, s4u::Host** h
                                                       double* bytes_amount, double rate)
 {
   kernel::resource::Action* action = nullptr;
-  if ((host_nb == 1) && (has_cost(bytes_amount, 0) <= 0)) {
+  if ((host_nb == 1) && (has_cost(bytes_amount, 0) <= 0) && (has_cost(flops_amount, 0) > 0)) {
     action = host_list[0]->pimpl_cpu->execution_start(flops_amount[0]);
   } else if ((host_nb == 1) && (has_cost(flops_amount, 0) <= 0)) {
     action = surf_network_model->communicate(host_list[0], host_list[0], bytes_amount[0], rate);
@@ -87,9 +87,6 @@ kernel::resource::Action* HostModel::execute_parallel(int host_nb, s4u::Host** h
         " - Self-comms with one host only\n"
         " - Communications with two hosts and no computation");
   }
-  delete[] host_list;
-  delete[] flops_amount;
-  delete[] bytes_amount;
   return action;
 }
 
index 75f1027..5943f79 100644 (file)
@@ -147,12 +147,14 @@ L07Action::L07Action(kernel::resource::Model* model, int host_nb, sg_host_t* hos
   double latency = 0.0;
   this->set_last_update();
 
-  this->hostList_->reserve(host_nb);
-  for (int i = 0; i < host_nb; i++) {
-    this->hostList_->push_back(host_list[i]);
-    if (flops_amount[i] > 0)
-      nb_used_host++;
-  }
+  this->hostList_.reserve(host_nb);
+  for (int i = 0; i < host_nb; i++)
+    this->hostList_.push_back(host_list[i]);
+
+  if (flops_amount != nullptr)
+    for (int i = 0; i < host_nb; i++)
+      if (flops_amount[i] > 0)
+        nb_used_host++;
 
   /* Compute the number of affected resources... */
   if(bytes_amount != nullptr) {
@@ -165,7 +167,7 @@ L07Action::L07Action(kernel::resource::Model* model, int host_nb, sg_host_t* hos
           double lat=0.0;
 
           std::vector<kernel::resource::LinkImpl*> route;
-          hostList_->at(i)->route_to(hostList_->at(j), route, &lat);
+          hostList_.at(i)->route_to(hostList_.at(j), route, &lat);
           latency = std::max(latency, lat);
 
           for (auto const& link : route)
@@ -185,15 +187,16 @@ L07Action::L07Action(kernel::resource::Model* model, int host_nb, sg_host_t* hos
   if (latency_ > 0)
     model->get_maxmin_system()->update_variable_weight(get_variable(), 0.0);
 
-  for (int i = 0; i < host_nb; i++)
-    model->get_maxmin_system()->expand(host_list[i]->pimpl_cpu->get_constraint(), get_variable(), flops_amount[i]);
+  if (flops_amount != nullptr)
+    for (int i = 0; i < host_nb; i++)
+      model->get_maxmin_system()->expand(host_list[i]->pimpl_cpu->get_constraint(), get_variable(), flops_amount[i]);
 
-  if(bytes_amount != nullptr) {
+  if (bytes_amount != nullptr) {
     for (int i = 0; i < host_nb; i++) {
       for (int j = 0; j < host_nb; j++) {
         if (bytes_amount[i * host_nb + j] > 0.0) {
           std::vector<kernel::resource::LinkImpl*> route;
-          hostList_->at(i)->route_to(hostList_->at(j), route, nullptr);
+          hostList_.at(i)->route_to(hostList_.at(j), route, nullptr);
 
           for (auto const& link : route)
             model->get_maxmin_system()->expand_add(link->get_constraint(), this->get_variable(),
@@ -207,7 +210,6 @@ L07Action::L07Action(kernel::resource::Model* model, int host_nb, sg_host_t* hos
     this->set_cost(1.0);
     this->set_remains(0.0);
   }
-  delete[] host_list;
 }
 
 kernel::resource::Action* NetworkL07Model::communicate(s4u::Host* src, s4u::Host* dst, double size, double rate)
@@ -220,7 +222,9 @@ kernel::resource::Action* NetworkL07Model::communicate(s4u::Host* src, s4u::Host
   host_list[1]    = dst;
   bytes_amount[1] = size;
 
-  return hostModel_->execute_parallel(2, host_list, flops_amount, bytes_amount, rate);
+  kernel::resource::Action* res = hostModel_->execute_parallel(2, host_list, flops_amount, bytes_amount, rate);
+  static_cast<L07Action*>(res)->free_arrays_ = true;
+  return res;
 }
 
 Cpu* CpuL07Model::create_cpu(simgrid::s4u::Host* host, std::vector<double>* speed_per_pstate, int core)
@@ -261,13 +265,15 @@ LinkL07::LinkL07(NetworkL07Model* model, const std::string& name, double bandwid
 
 kernel::resource::Action* CpuL07::execution_start(double size)
 {
-  sg_host_t* host_list = new sg_host_t[1]();
-  double* flops_amount = new double[1]();
+  sg_host_t host_list[1] = {get_host()};
 
-  host_list[0]    = get_host();
+  double* flops_amount = new double[1]();
   flops_amount[0] = size;
 
-  return static_cast<CpuL07Model*>(get_model())->hostModel_->execute_parallel(1, host_list, flops_amount, nullptr, -1);
+  kernel::resource::Action* res =
+      static_cast<CpuL07Model*>(get_model())->hostModel_->execute_parallel(1, host_list, flops_amount, nullptr, -1);
+  static_cast<L07Action*>(res)->free_arrays_ = true;
+  return res;
 }
 
 kernel::resource::Action* CpuL07::sleep(double duration)
@@ -378,17 +384,19 @@ LinkL07::~LinkL07() = default;
  * Action *
  **********/
 
-L07Action::~L07Action(){
-  delete hostList_;
-  delete[] communicationAmount_;
-  delete[] computationAmount_;
+L07Action::~L07Action()
+{
+  if (free_arrays_) {
+    delete[] computationAmount_;
+    delete[] communicationAmount_;
+  }
 }
 
 void L07Action::updateBound()
 {
   double lat_current = 0.0;
 
-  int hostNb = hostList_->size();
+  int hostNb = hostList_.size();
 
   if (communicationAmount_ != nullptr) {
     for (int i = 0; i < hostNb; i++) {
@@ -397,7 +405,7 @@ void L07Action::updateBound()
         if (communicationAmount_[i * hostNb + j] > 0) {
           double lat = 0.0;
           std::vector<kernel::resource::LinkImpl*> route;
-          hostList_->at(i)->route_to(hostList_->at(j), route, &lat);
+          hostList_.at(i)->route_to(hostList_.at(j), route, &lat);
 
           lat_current = std::max(lat_current, lat * communicationAmount_[i * hostNb + j]);
         }
index 98d6940..70efcaa 100644 (file)
@@ -106,6 +106,7 @@ class L07Action : public CpuAction {
   friend Action *CpuL07::sleep(double duration);
   friend Action* HostL07Model::execute_parallel(int host_nb, sg_host_t* host_list, double* flops_amount,
                                                 double* bytes_amount, double rate);
+  friend Action* NetworkL07Model::communicate(s4u::Host* src, s4u::Host* dst, double size, double rate);
 
 public:
   L07Action(kernel::resource::Model* model, int host_nb, sg_host_t* host_list, double* flops_amount,
@@ -114,11 +115,15 @@ public:
 
   void updateBound();
 
-  std::vector<s4u::Host*>* hostList_ = new std::vector<s4u::Host*>();
-  double *computationAmount_;
-  double *communicationAmount_;
+  std::vector<s4u::Host*> hostList_;
+  double* computationAmount_;   /* pointer to the data that lives in s4u action -- do not free unless free_arrays_ is set */
+  double* communicationAmount_; /* pointer to the data that lives in s4u action -- do not free unless free_arrays_ is set */
   double latency_;
   double rate_;
+
+private:
+  bool free_arrays_ = false; // By default, computationAmount_ and friends are freed by caller. But not for sequential
+                             // exec and regular comms
 };
 
 }