XBT_INFO("First, build a classical parallel task, with 1 Gflop to execute on each node, "
"and 10MB to exchange between each pair");
- double* computation_amounts = new double[hosts_count]();
- double* communication_amounts = new double[hosts_count * hosts_count]();
- for (int i = 0; i < hosts_count; i++)
- computation_amounts[i] = 1e9; // 1 Gflop
+ std::vector<double> computation_amounts;
+ std::vector<double> communication_amounts;
+ /* ------[ test 1 ]----------------- */
+ computation_amounts.assign(hosts.size(), 1e9 /*1Gflop*/);
+ communication_amounts.assign(hosts.size() * hosts.size(), 0);
for (int i = 0; i < hosts_count; i++)
for (int j = i + 1; j < hosts_count; j++)
communication_amounts[i * hosts_count + j] = 1e7; // 10 MB
- simgrid::s4u::this_actor::parallel_execute(hosts_count, hosts.data(), computation_amounts, communication_amounts);
+ simgrid::s4u::this_actor::parallel_execute(hosts, computation_amounts, communication_amounts);
+ /* ------[ test 2 ]----------------- */
XBT_INFO("We can do the same with a timeout of one second enabled.");
- computation_amounts = new double[hosts_count]();
- communication_amounts = new double[hosts_count * hosts_count]();
-
- for (int i = 0; i < hosts_count; i++)
- computation_amounts[i] = 1e9; // 1 Gflop
-
+ computation_amounts.assign(hosts.size(), 1e9 /*1Gflop*/);
+ communication_amounts.assign(hosts.size() * hosts.size(), 0);
for (int i = 0; i < hosts_count; i++)
for (int j = i + 1; j < hosts_count; j++)
communication_amounts[i * hosts_count + j] = 1e7; // 10 MB
try {
- simgrid::s4u::this_actor::parallel_execute(hosts_count, hosts.data(), computation_amounts, communication_amounts,
+ simgrid::s4u::this_actor::parallel_execute(hosts, computation_amounts, communication_amounts,
1.0 /* timeout (in seconds)*/);
XBT_WARN("Woops, this did not timeout as expected... Please report that bug.");
} catch (xbt_ex& e) {
XBT_DEBUG("Caught expected exception: %s", e.what());
}
+ /* ------[ test 3 ]----------------- */
XBT_INFO("Then, build a parallel task involving only computations and no communication (1 Gflop per node)");
- computation_amounts = new double[hosts_count]();
- for (int i = 0; i < hosts_count; i++)
- computation_amounts[i] = 1e9; // 1 Gflop
- simgrid::s4u::this_actor::parallel_execute(hosts_count, hosts.data(), computation_amounts, nullptr /* no comm */);
+ computation_amounts.assign(hosts.size(), 1e9 /*1Gflop*/);
+ communication_amounts.clear(); /* no comm */
+ simgrid::s4u::this_actor::parallel_execute(hosts, computation_amounts, communication_amounts);
+ /* ------[ test 4 ]----------------- */
XBT_INFO("Then, build a parallel task involving only heterogeneous computations and no communication");
- computation_amounts = new double[hosts_count]();
- for (int i = 0; i < hosts_count; i++)
+ computation_amounts.resize(hosts.size());
+ for (int i = 0; i < hosts_count; i++)
computation_amounts[i] = 5 * (i + 1) * 1e8; // 500Mflop, 1Gflop, 1.5Gflop
- simgrid::s4u::this_actor::parallel_execute(hosts_count, hosts.data(), computation_amounts, nullptr /* no comm */);
+ communication_amounts.clear(); /* no comm */
+ simgrid::s4u::this_actor::parallel_execute(hosts, computation_amounts, communication_amounts);
+ /* ------[ test 5 ]----------------- */
XBT_INFO("Then, build a parallel task with no computation nor communication (synchro only)");
- computation_amounts = new double[hosts_count]();
- communication_amounts = new double[hosts_count * hosts_count]();
- simgrid::s4u::this_actor::parallel_execute(hosts_count, hosts.data(), computation_amounts, communication_amounts);
+ computation_amounts.clear();
+ communication_amounts.clear();
+ simgrid::s4u::this_actor::parallel_execute(hosts, computation_amounts, communication_amounts);
+ /* ------[ test 6 ]----------------- */
XBT_INFO("Finally, trick the ptask to do a 'remote execution', on host %s", hosts[1]->get_cname());
- computation_amounts = new double[1]{1e9};
+ std::vector<simgrid::s4u::Host*> remote;
+ remote.push_back(hosts[1]);
+ computation_amounts.assign(1, 1e9);
+ communication_amounts.clear();
- simgrid::s4u::Host* remote[] = {hosts[1]};
- simgrid::s4u::this_actor::parallel_execute(1, remote, computation_amounts, nullptr);
+ simgrid::s4u::this_actor::parallel_execute(remote, computation_amounts, communication_amounts);
XBT_INFO("Goodbye now!");
}
* each host, using a vector of flops amount. Then, you should specify the amount of data exchanged between each
* hosts during the parallel kernel. For that, a matrix of values is expected.
*
+ * It is OK to build a parallel execution without any computation and/or without any communication.
+ * Just pass an empty vector to the corresponding parameter.
+ *
* For example, if your list of hosts is ``[host0, host1]``, passing a vector ``[1000, 2000]`` as a `flops_amount`
* vector means that `host0` should compute 1000 flops while `host1` will compute 2000 flops. A matrix of
* communications' sizes of ``[0, 1, 2, 3]`` specifies the following data exchanges:
*
* \endrst
*/
+XBT_PUBLIC void parallel_execute(std::vector<s4u::Host*> hosts, std::vector<double> flops_amounts,
+ std::vector<double> bytes_amounts);
-XBT_PUBLIC void parallel_execute(int host_nb, s4u::Host** host_list, double* flops_amount, double* bytes_amount);
/** \rst
* Block the actor until the built :ref:`parallel execution <API_s4u_parallel_execute>` completes, or until the timeout.
* \endrst
*/
+XBT_PUBLIC void parallel_execute(std::vector<s4u::Host*> hosts, std::vector<double> flops_amounts,
+ std::vector<double> bytes_amounts, double timeout);
+
+#ifndef DOXYGEN
+XBT_PUBLIC void parallel_execute(int host_nb, s4u::Host** host_list, double* flops_amount, double* bytes_amount);
XBT_PUBLIC void parallel_execute(int host_nb, s4u::Host** host_list, double* flops_amount, double* bytes_amount,
double timeout);
+#endif
XBT_PUBLIC ExecPtr exec_init(double flops_amounts);
XBT_PUBLIC ExecPtr exec_async(double flops_amounts);
exec_init(flops)->set_priority(priority)->start()->wait();
}
+void parallel_execute(std::vector<s4u::Host*> hosts, std::vector<double> flops_amounts,
+ std::vector<double> bytes_amounts)
+{
+ parallel_execute(hosts, flops_amounts, bytes_amounts, -1);
+}
+void parallel_execute(std::vector<s4u::Host*> hosts, std::vector<double> flops_amounts,
+ std::vector<double> bytes_amounts, double timeout)
+{
+ xbt_assert(hosts.size() > 0, "Your parallel executions must span over at least one host.");
+ xbt_assert(hosts.size() == flops_amounts.size() || flops_amounts.empty(),
+ "Host count (%zu) does not match flops_amount count (%zu).", hosts.size(), flops_amounts.size());
+  xbt_assert(hosts.size() * hosts.size() == bytes_amounts.size() || bytes_amounts.empty(),
+             "bytes_amounts must be a matrix of size host_count * host_count (%zu*%zu), but it's of size %zu.",
+             hosts.size(), hosts.size(), bytes_amounts.size());
+
+  /* The vectors live as parameters of parallel_execute(). No copy is created for simcall_execution_parallel_start(),
+   * but that's OK because simcall_execution_wait() is called from here too.
+   */
+ smx_activity_t s = simcall_execution_parallel_start("", hosts.size(), hosts.data(),
+ (flops_amounts.empty() ? nullptr : flops_amounts.data()),
+ (bytes_amounts.empty() ? nullptr : bytes_amounts.data()),
+ /* rate */ -1, timeout);
+ simcall_execution_wait(s);
+}
+
void parallel_execute(int host_nb, s4u::Host** host_list, double* flops_amount, double* bytes_amount, double timeout)
{
smx_activity_t s =
simcall_execution_parallel_start("", host_nb, host_list, flops_amount, bytes_amount, /* rate */ -1, timeout);
simcall_execution_wait(s);
+ delete[] flops_amount;
+ delete[] bytes_amount;
}
void parallel_execute(int host_nb, sg_host_t* host_list, double* flops_amount, double* bytes_amount)
{
/* checking for infinite values */
for (int i = 0 ; i < host_nb ; ++i) {
- xbt_assert(std::isfinite(flops_amount[i]), "flops_amount[%d] is not finite!", i);
+ if (flops_amount != nullptr)
+ xbt_assert(std::isfinite(flops_amount[i]), "flops_amount[%d] is not finite!", i);
if (bytes_amount != nullptr) {
for (int j = 0 ; j < host_nb ; ++j) {
xbt_assert(std::isfinite(bytes_amount[i + host_nb * j]),
simgrid::kernel::resource::Action* surf_action = nullptr;
simgrid::kernel::resource::Action* timeout_detector = nullptr;
if (not MC_is_active() && not MC_record_replay_is_active()) {
- sg_host_t* host_list_cpy = new sg_host_t[host_nb];
- std::copy_n(host_list, host_nb, host_list_cpy);
- surf_action = surf_host_model->execute_parallel(host_nb, host_list_cpy, flops_amount, bytes_amount, rate);
+ surf_action = surf_host_model->execute_parallel(host_nb, host_list, flops_amount, bytes_amount, rate);
if (timeout > 0) {
timeout_detector = host_list[0]->pimpl_cpu->sleep(timeout);
}
double* bytes_amount, double rate)
{
kernel::resource::Action* action = nullptr;
- if ((host_nb == 1) && (has_cost(bytes_amount, 0) <= 0)) {
+ if ((host_nb == 1) && (has_cost(bytes_amount, 0) <= 0) && (has_cost(flops_amount, 0) > 0)) {
action = host_list[0]->pimpl_cpu->execution_start(flops_amount[0]);
} else if ((host_nb == 1) && (has_cost(flops_amount, 0) <= 0)) {
action = surf_network_model->communicate(host_list[0], host_list[0], bytes_amount[0], rate);
" - Self-comms with one host only\n"
" - Communications with two hosts and no computation");
}
- delete[] host_list;
- delete[] flops_amount;
- delete[] bytes_amount;
return action;
}
double latency = 0.0;
this->set_last_update();
- this->hostList_->reserve(host_nb);
- for (int i = 0; i < host_nb; i++) {
- this->hostList_->push_back(host_list[i]);
- if (flops_amount[i] > 0)
- nb_used_host++;
- }
+ this->hostList_.reserve(host_nb);
+ for (int i = 0; i < host_nb; i++)
+ this->hostList_.push_back(host_list[i]);
+
+ if (flops_amount != nullptr)
+ for (int i = 0; i < host_nb; i++)
+ if (flops_amount[i] > 0)
+ nb_used_host++;
/* Compute the number of affected resources... */
if(bytes_amount != nullptr) {
double lat=0.0;
std::vector<kernel::resource::LinkImpl*> route;
- hostList_->at(i)->route_to(hostList_->at(j), route, &lat);
+ hostList_.at(i)->route_to(hostList_.at(j), route, &lat);
latency = std::max(latency, lat);
for (auto const& link : route)
if (latency_ > 0)
model->get_maxmin_system()->update_variable_weight(get_variable(), 0.0);
- for (int i = 0; i < host_nb; i++)
- model->get_maxmin_system()->expand(host_list[i]->pimpl_cpu->get_constraint(), get_variable(), flops_amount[i]);
+ if (flops_amount != nullptr)
+ for (int i = 0; i < host_nb; i++)
+ model->get_maxmin_system()->expand(host_list[i]->pimpl_cpu->get_constraint(), get_variable(), flops_amount[i]);
- if(bytes_amount != nullptr) {
+ if (bytes_amount != nullptr) {
for (int i = 0; i < host_nb; i++) {
for (int j = 0; j < host_nb; j++) {
if (bytes_amount[i * host_nb + j] > 0.0) {
std::vector<kernel::resource::LinkImpl*> route;
- hostList_->at(i)->route_to(hostList_->at(j), route, nullptr);
+ hostList_.at(i)->route_to(hostList_.at(j), route, nullptr);
for (auto const& link : route)
model->get_maxmin_system()->expand_add(link->get_constraint(), this->get_variable(),
this->set_cost(1.0);
this->set_remains(0.0);
}
- delete[] host_list;
}
kernel::resource::Action* NetworkL07Model::communicate(s4u::Host* src, s4u::Host* dst, double size, double rate)
host_list[1] = dst;
bytes_amount[1] = size;
- return hostModel_->execute_parallel(2, host_list, flops_amount, bytes_amount, rate);
+ kernel::resource::Action* res = hostModel_->execute_parallel(2, host_list, flops_amount, bytes_amount, rate);
+ static_cast<L07Action*>(res)->free_arrays_ = true;
+ return res;
}
Cpu* CpuL07Model::create_cpu(simgrid::s4u::Host* host, std::vector<double>* speed_per_pstate, int core)
kernel::resource::Action* CpuL07::execution_start(double size)
{
- sg_host_t* host_list = new sg_host_t[1]();
- double* flops_amount = new double[1]();
+ sg_host_t host_list[1] = {get_host()};
- host_list[0] = get_host();
+ double* flops_amount = new double[1]();
flops_amount[0] = size;
- return static_cast<CpuL07Model*>(get_model())->hostModel_->execute_parallel(1, host_list, flops_amount, nullptr, -1);
+ kernel::resource::Action* res =
+ static_cast<CpuL07Model*>(get_model())->hostModel_->execute_parallel(1, host_list, flops_amount, nullptr, -1);
+ static_cast<L07Action*>(res)->free_arrays_ = true;
+ return res;
}
kernel::resource::Action* CpuL07::sleep(double duration)
* Action *
**********/
-L07Action::~L07Action(){
- delete hostList_;
- delete[] communicationAmount_;
- delete[] computationAmount_;
+L07Action::~L07Action()
+{
+ if (free_arrays_) {
+ delete[] computationAmount_;
+ delete[] communicationAmount_;
+ }
}
void L07Action::updateBound()
{
double lat_current = 0.0;
- int hostNb = hostList_->size();
+ int hostNb = hostList_.size();
if (communicationAmount_ != nullptr) {
for (int i = 0; i < hostNb; i++) {
if (communicationAmount_[i * hostNb + j] > 0) {
double lat = 0.0;
std::vector<kernel::resource::LinkImpl*> route;
- hostList_->at(i)->route_to(hostList_->at(j), route, &lat);
+ hostList_.at(i)->route_to(hostList_.at(j), route, &lat);
lat_current = std::max(lat_current, lat * communicationAmount_[i * hostNb + j]);
}
friend Action *CpuL07::sleep(double duration);
friend Action* HostL07Model::execute_parallel(int host_nb, sg_host_t* host_list, double* flops_amount,
double* bytes_amount, double rate);
+ friend Action* NetworkL07Model::communicate(s4u::Host* src, s4u::Host* dst, double size, double rate);
public:
L07Action(kernel::resource::Model* model, int host_nb, sg_host_t* host_list, double* flops_amount,
void updateBound();
- std::vector<s4u::Host*>* hostList_ = new std::vector<s4u::Host*>();
- double *computationAmount_;
- double *communicationAmount_;
+ std::vector<s4u::Host*> hostList_;
+  double* computationAmount_;   /* pointer to the data that lives in the s4u action -- do not free unless free_arrays_ is set */
+  double* communicationAmount_; /* pointer to the data that lives in the s4u action -- do not free unless free_arrays_ is set */
double latency_;
double rate_;
+
+private:
+ bool free_arrays_ = false; // By default, computationAmount_ and friends are freed by caller. But not for sequential
+ // exec and regular comms
};
}