sg4::CommPtr comm = mailbox->put_init(computation_amount, 7e6);
exec->set_name("exec on sender")->add_successor(comm)->start();
- comm->set_name("comm to receiver")->vetoable_start();
+ comm->set_name("comm to receiver")->start();
exec->wait();
comm->wait();
}
sg4::CommPtr comm = mailbox->get_init()->set_dst_data((void**)&received, sizeof(double));
comm->set_name("comm from sender")->add_successor(exec)->start();
- exec->set_name("exec on receiver")->vetoable_start();
+ exec->set_name("exec on receiver")->start();
comm->wait();
exec->wait();
p Testing with default compound
$ ${bindir:=.}/s4u-comm-dependent ${platfdir}/small_platform.xml --log=s4u_activity.t:verbose "--log=root.fmt:[%6.2r]%e(%i:%a@%h)%e%m%n"
+> [ 0.00] (1:sender@Tremblay) 'exec on sender' is assigned to a resource and all dependencies are solved. Let's start
+> [ 0.00] (2:receiver@Jupiter) 'comm from sender' is assigned to a resource and all dependencies are solved. Let's start
> [ 2.00] (1:sender@Tremblay) Remove a dependency from 'exec on sender' on 'comm to receiver'
> [ 2.00] (1:sender@Tremblay) 'comm to receiver' is assigned to a resource and all dependencies are solved. Let's start
> [ 3.07] (2:receiver@Jupiter) Remove a dependency from 'comm from sender' on 'exec on receiver'
// Set the parameters (the name is for logging purposes only)
// + parent and child end after 1 second
- parent->set_name("parent")->set_flops_amount(tremblay->get_speed())->vetoable_start();
- transfer->set_name("transfer")->set_payload_size(125e6)->vetoable_start();
- child->set_name("child")->set_flops_amount(jupiter->get_speed())->vetoable_start();
+ parent->set_name("parent")->set_flops_amount(tremblay->get_speed())->start();
+ transfer->set_name("transfer")->set_payload_size(125e6)->start();
+ child->set_name("child")->set_flops_amount(jupiter->get_speed())->start();
// Schedule the different activities
parent->set_host(tremblay);
/* creation of a single Exec that will fail badly when the workstation stops */
XBT_INFO("First test: sequential Exec activity");
- sg4::ExecPtr exec = sg4::Exec::init()->set_name("Poor task")->set_flops_amount(2e10)->vetoable_start();
+ sg4::ExecPtr exec = sg4::Exec::init()->set_name("Poor task")->set_flops_amount(2e10)->start();
XBT_INFO("Schedule Activity '%s' on 'Faulty Host'", exec->get_cname());
exec->set_host(faulty);
/* Add a child Exec that depends on the 'Poor task' */
sg4::ExecPtr child = sg4::Exec::init()->set_name("Child")->set_flops_amount(2e10)->set_host(safe);
exec->add_successor(child);
- child->vetoable_start();
+ child->start();
XBT_INFO("Run the simulation");
e.run();
e.run();
XBT_INFO("Second test: parallel Exec activity");
- exec = sg4::Exec::init()->set_name("Poor parallel task")->set_flops_amounts({2e10, 2e10})->vetoable_start();
+ exec = sg4::Exec::init()->set_name("Poor parallel task")->set_flops_amounts({2e10, 2e10})->start();
XBT_INFO("Schedule Activity '%s' on 'Safe Host' and 'Faulty Host'", exec->get_cname());
exec->set_hosts({safe, faulty});
/* Add a child Exec that depends on the 'Poor parallel task' */
child = sg4::Exec::init()->set_name("Child")->set_flops_amount(2e10)->set_host(safe);
exec->add_successor(child);
- child->vetoable_start();
+ child->start();
XBT_INFO("Run the simulation");
e.run();
child->set_name("child")->set_flops_amount(carl->get_speed());
// Schedule and try to start the different activities
- parent->set_host(bob)->vetoable_start();
- write_output->set_disk(bob->get_disks().front())->vetoable_start();
- read_input->set_disk(carl->get_disks().front())->vetoable_start();
- child->set_host(carl)->vetoable_start();
+ parent->set_host(bob)->start();
+ write_output->set_disk(bob->get_disks().front())->start();
+ read_input->set_disk(carl->get_disks().front())->start();
+ child->set_host(carl)->start();
e.run();
second_parent->set_host(fafard);
// Start all activities that can actually start.
- first_parent->vetoable_start();
- second_parent->vetoable_start();
- child->vetoable_start();
+ first_parent->start();
+ second_parent->start();
+ child->start();
while (child->get_state() != sg4::Activity::State::FINISHED) {
e.run();
// Start the activities.
first_parent->start();
second_parent->start();
- // child uses a vetoable start to force it to wait for the completion of its predecessors
- child->vetoable_start();
+ child->start();
// wait for the completion of all activities
while (not pending_execs.empty()) {
#!/usr/bin/env tesh
$ ${bindir:=.}/s4u-exec-dependent ${platfdir}/small_platform.xml --log=s4u_activity.t:verbose "--log=root.fmt:[%10.6r]%e(%i:%a@%h)%e%m%n"
+> [ 0.000000] (1:worker@Fafard) 'parent 1' is assigned to a resource and all dependencies are solved. Let's start
+> [ 0.000000] (1:worker@Fafard) 'parent 2' is assigned to a resource and all dependencies are solved. Let's start
> [ 0.000000] (1:worker@Fafard) Activity 'child' vetoed. Dependencies: NOT solved; Ressources: NOT assigned
> [ 2.000000] (1:worker@Fafard) Remove a dependency from 'parent 1' on 'child'
> [ 2.000000] (1:worker@Fafard) Exec 'parent 1' is complete
double computation_amount = sg4::this_actor::get_host()->get_speed();
// Create an unassigned activity and start it. It will not actually start, because it's not assigned to any host yet
- sg4::ExecPtr exec = sg4::Exec::init()->set_flops_amount(computation_amount)->set_name("exec")->vetoable_start();
+ sg4::ExecPtr exec = sg4::Exec::init()->set_flops_amount(computation_amount)->set_name("exec")->start();
// Wait for a while
sg4::this_actor::sleep_for(10);
// Start the activities.
bob_compute->start();
- bob_write->vetoable_start();
- carl_read->vetoable_start();
- carl_compute->vetoable_start();
+ bob_write->start();
+ carl_read->start();
+ carl_compute->start();
// wait for the completion of all activities
while (not pending_activities.empty()) {
! output sort
$ ${bindir:=.}/s4u-io-dependent ${platfdir}/hosts_with_disks.xml --log=s4u_activity.t:verbose "--log=root.fmt:[%10.6r]%e(%i:%a@%h)%e%m%n"
+> [ 0.000000] (1:bob@bob) 'bob compute' is assigned to a resource and all dependencies are solved. Let's start
> [ 1.000000] (1:bob@bob) 'bob write' is assigned to a resource and all dependencies are solved. Let's start
> [ 1.000000] (1:bob@bob) Remove a dependency from 'bob compute' on 'bob write'
> [ 1.000000] (1:bob@bob) Activity 'bob compute' is complete
XBT_CVERB(s4u_activity, "Remove a dependency from '%s' on '%s'", get_cname(), b->get_cname());
b->dependencies_.erase(this);
if (b->dependencies_solved()) {
- b->vetoable_start();
+ b->start();
}
successors_.pop_back();
}
/*! Add a callback fired when the activity is resumed after being suspended */
static void on_resumed_cb(const std::function<void(Activity const&)>& cb) { on_resumed.connect(cb); }
- void vetoable_start()
+ XBT_ATTRIB_DEPRECATED_v334("All start() are vetoable now. Please use start() ") void vetoable_start()
+ {
+ start();
+ }
+ void start()
{
state_ = State::STARTING;
if (dependencies_solved() && is_assigned()) {
XBT_CVERB(s4u_activity, "'%s' is assigned to a resource and all dependencies are solved. Let's start", get_cname());
- start();
+ do_start();
} else {
if (vetoed_activities_ != nullptr)
vetoed_activities_->insert(this);
*
* This function is optional: you can call wait() even if you didn't call start()
*/
- virtual Activity* start() = 0;
+ virtual Activity* do_start() = 0;
/** Tests whether the given activity is terminated yet. */
virtual bool test();
/*! take a vector of s4u::ActivityPtr and return the rank of the first finished one (or -1 if none is done). */
{
return get_data<void>();
}
-
- AnyActivity* vetoable_start()
+ XBT_ATTRIB_DEPRECATED_v334("All start() are vetoable now. Please use start() ") AnyActivity* vetoable_start()
+ {
+ return start();
+ }
+ AnyActivity* start()
{
- Activity::vetoable_start();
+ Activity::start();
return static_cast<AnyActivity*>(this);
}
std::function<void(kernel::activity::CommImpl*, void*, size_t)> copy_data_function_;
Comm() = default;
+ Comm* do_start() override;
public:
/* signals and related callbacks */
Actor* get_sender() const;
/* Comm life cycle */
- Comm* start() override;
/** Start the comm, and ignore its result. It can be completely forgotten after that. */
Comm* detach();
/** Start the comm, and ignore its result. It can be completely forgotten after that. */
protected:
explicit Exec(kernel::activity::ExecImplPtr pimpl);
+ Exec* do_start() override;
void reset() const;
static void on_start_cb(const std::function<void(Exec const&)>& cb) { on_start.connect(cb); }
static ExecPtr init();
- Exec* start() override;
/*! take a vector of s4u::ExecPtr and return when one of them is finished.
* The return value is the rank of the first finished ExecPtr. */
protected:
explicit Io(kernel::activity::IoImplPtr pimpl);
+ Io* do_start() override;
public:
enum class OpType { READ, WRITE };
static void on_start_cb(const std::function<void(Io const&)>& cb) { on_start.connect(cb); }
static IoPtr init();
- Io* start() override;
/*! take a vector of s4u::IoPtr and return when one of them is finished.
* The return value is the rank of the first finished IoPtr. */
static ssize_t wait_any(const std::vector<IoPtr>& ios) { return wait_any_for(ios, -1); }
template <typename T> CommPtr Mailbox::get_async(T** data)
{
CommPtr res = get_init()->set_dst_data(reinterpret_cast<void**>(data), sizeof(void*));
- res->vetoable_start();
+ res->start();
return res;
}
std::string new_name = parent->get_name() + "_" + comm->get_name() + "_" + child->get_name();
- comm->set_name(new_name)->vetoable_start();
+ comm->set_name(new_name)->start();
}
static bool check_for_cycle(const std::vector<simgrid::s4u::ActivityPtr>& dag)
dax_lineno = 1;
auto root_task = Exec::init()->set_name("root")->set_flops_amount(0);
- root_task->vetoable_start();
+ root_task->start();
result.push_back(root_task);
auto end_task = Exec::init()->set_name("end")->set_flops_amount(0);
- end_task->vetoable_start();
+ end_task->start();
xbt_assert(dax_lex() == 0, "Parse error in %s: %s", filename.c_str(), dax__parse_err_msg());
dax__delete_buffer(input_buffer);
if (activities.find(name) == activities.end()) {
XBT_DEBUG("See <Exec id = %s amount = %.0f>", name.c_str(), amount);
- act = Exec::init()->set_name(name)->set_flops_amount(amount)->vetoable_start();
+ act = Exec::init()->set_name(name)->set_flops_amount(amount)->start();
activities.try_emplace(name, act);
if (name != "root" && name != "end")
dag.push_back(act);
}
/*Check if 'root' and 'end' nodes have been explicitly declared. If not, create them. */
if (activities.find("root") == activities.end())
- root = Exec::init()->set_name("root")->set_flops_amount(0)->vetoable_start();
+ root = Exec::init()->set_name("root")->set_flops_amount(0)->start();
else
root = activities.at("root");
if (activities.find("end") == activities.end())
- end = Exec::init()->set_name("end")->set_flops_amount(0)->vetoable_start();
+ end = Exec::init()->set_name("end")->set_flops_amount(0)->start();
else
end = activities.at("end");
std::string name = std::string(src_name) + "->" + dst_name;
XBT_DEBUG("See <Comm id=%s amount = %.0f>", name.c_str(), size);
if (activities.find(name) == activities.end()) {
- act = Comm::sendto_init()->set_name(name)->set_payload_size(size)->vetoable_start();
+ act = Comm::sendto_init()->set_name(name)->set_payload_size(size)->start();
src->add_successor(act);
act->add_successor(dst);
activities.try_emplace(name, act);
std::string name = std::string(A_dax__job_id) + "@" + A_dax__job_name;
runtime *= 4200000000.; /* Assume that timings were done on a 4.2GFlops machine. I mean, why not? */
XBT_DEBUG("See <job id=%s runtime=%s %.0f>", A_dax__job_id, A_dax__job_runtime, runtime);
- simgrid::s4u::current_job = simgrid::s4u::Exec::init()->set_name(name)->set_flops_amount(runtime)->vetoable_start();
+ simgrid::s4u::current_job = simgrid::s4u::Exec::init()->set_name(name)->set_flops_amount(runtime)->start();
simgrid::s4u::jobs.try_emplace(A_dax__job_id, simgrid::s4u::current_job);
simgrid::s4u::result.push_back(simgrid::s4u::current_job);
} catch (const std::invalid_argument&) {
Activity* Activity::wait_for(double timeout)
{
if (state_ == State::INITED)
- vetoable_start();
+ start();
if (state_ == State::FAILED) {
if (dynamic_cast<Comm*>(this))
return true;
if (state_ == State::INITED || state_ == State::STARTING)
- this->vetoable_start();
+ this->start();
kernel::actor::ActorImpl* issuer = kernel::actor::ActorImpl::self();
kernel::actor::ActivityTestSimcall observer{issuer, pimpl_.get()};
void execute(double flops, double priority)
{
- exec_init(flops)->set_priority(priority)->vetoable_start()->wait();
+ exec_init(flops)->set_priority(priority)->start()->wait();
}
void parallel_execute(const std::vector<s4u::Host*>& hosts, const std::vector<double>& flops_amounts,
ExecPtr exec_async(double flops)
{
ExecPtr res = exec_init(flops);
- res->vetoable_start();
+ res->start();
return res;
}
if (state_ == State::STARTING && remains_ <= 0)
XBT_DEBUG("This communication has a payload size of 0 byte. It cannot start yet");
else
- vetoable_start();
+ start();
return this;
}
if (state_ == State::STARTING && remains_ <= 0)
XBT_DEBUG("This communication has a payload size of 0 byte. It cannot start yet");
else
- vetoable_start();
+ start();
return this;
}
mailbox_ != nullptr;
}
-Comm* Comm::start()
+Comm* Comm::do_start()
{
xbt_assert(get_state() == State::INITED || get_state() == State::STARTING,
"You cannot use %s() once your communication started (not implemented)", __FUNCTION__);
"You cannot use %s() once your communication is %s (not implemented)", __FUNCTION__, get_state_str());
xbt_assert(dst_buff_ == nullptr && dst_buff_size_ == 0, "You can only detach sends, not recvs");
detached_ = true;
- vetoable_start();
+ start();
return this;
}
case State::INITED:
case State::STARTING: // It's not started yet. Do it in one simcall if it's a regular communication
if (get_source() != nullptr || get_destination() != nullptr) {
- return vetoable_start()->wait_for(timeout); // In the case of host2host comm, do it in two simcalls
+ return start()->wait_for(timeout); // In the case of host2host comm, do it in two simcalls
} else if (src_buff_ != nullptr) {
on_send(*this);
send(sender_, mailbox_, remains_, rate_, src_buff_, src_buff_size_, match_fun_, copy_data_function_,
IoPtr Disk::read_async(sg_size_t size) const
{
- return IoPtr(io_init(size, Io::OpType::READ))->vetoable_start();
+ return IoPtr(io_init(size, Io::OpType::READ))->start();
}
sg_size_t Disk::read(sg_size_t size) const
{
- return IoPtr(io_init(size, Io::OpType::READ))->vetoable_start()->wait()->get_performed_ioops();
+ return IoPtr(io_init(size, Io::OpType::READ))->start()->wait()->get_performed_ioops();
}
sg_size_t Disk::read(sg_size_t size, double priority) const
{
return IoPtr(io_init(size, Io::OpType::READ))
->set_priority(priority)
- ->vetoable_start()
+ ->start()
->wait()
->get_performed_ioops();
}
IoPtr Disk::write_async(sg_size_t size) const
{
- return IoPtr(io_init(size, Io::OpType::WRITE)->vetoable_start());
+ return IoPtr(io_init(size, Io::OpType::WRITE)->start());
}
sg_size_t Disk::write(sg_size_t size) const
{
- return IoPtr(io_init(size, Io::OpType::WRITE))->vetoable_start()->wait()->get_performed_ioops();
+ return IoPtr(io_init(size, Io::OpType::WRITE))->start()->wait()->get_performed_ioops();
}
sg_size_t Disk::write(sg_size_t size, double priority) const
{
return IoPtr(io_init(size, Io::OpType::WRITE))
->set_priority(priority)
- ->vetoable_start()
+ ->start()
->wait()
->get_performed_ioops();
}
return ExecPtr(static_cast<Exec*>(pimpl->get_iface()));
}
-Exec* Exec::start()
+Exec* Exec::do_start()
{
kernel::actor::simcall_answered([this] {
(*boost::static_pointer_cast<kernel::activity::ExecImpl>(pimpl_))
pimpl_.get(), [this, host] { boost::static_pointer_cast<kernel::activity::ExecImpl>(pimpl_)->set_host(host); });
if (state_ == State::STARTING)
- // Setting the host may allow to start the activity, let's try
- vetoable_start();
+ // Setting the host may allow to start the activity, let's try
+ start();
return this;
}
// Setting the host may allow to start the activity, let's try
if (state_ == State::STARTING)
- vetoable_start();
+ start();
return this;
}
if (state_ == State::STARTED)
cancel();
- vetoable_start();
+ start();
return this;
}
void sg_exec_start(sg_exec_t exec)
{
- exec->vetoable_start();
+ exec->start();
}
void sg_exec_cancel(sg_exec_t exec)
void Host::execute(double flops, double priority) const
{
- this_actor::exec_init(flops)->set_priority(1 / priority)->vetoable_start()->wait();
+ this_actor::exec_init(flops)->set_priority(1 / priority)->start()->wait();
}
Host* Host::seal()
if (state_ == State::STARTING && remains_ <= 0)
XBT_DEBUG("This IO has a size of 0 byte. It cannot start yet");
else
- vetoable_start();
+ start();
return this;
}
if (state_ == State::STARTING && remains_ <= 0)
XBT_DEBUG("This IO has a size of 0 byte. It cannot start yet");
else
- vetoable_start();
+ start();
return this;
}
-Io* Io::start()
+Io* Io::do_start()
{
kernel::actor::simcall_answered(
[this] { (*boost::static_pointer_cast<kernel::activity::IoImpl>(pimpl_)).set_name(get_name()).start(); });
// Setting the disk may allow to start the activity, let's try
if (state_ == State::STARTING)
- vetoable_start();
+ start();
return this;
}
xbt_assert(payload != nullptr, "You cannot send nullptr");
CommPtr res = put_init(payload, simulated_size_in_bytes);
- res->vetoable_start();
+ res->start();
return res;
}
{
xbt_assert(payload != nullptr, "You cannot send nullptr");
- put_init()->set_payload_size(simulated_size_in_bytes)->set_src_data(payload)->vetoable_start()->wait();
+ put_init()->set_payload_size(simulated_size_in_bytes)->set_src_data(payload)->start()->wait();
}
/** Blocking send with timeout */
{
xbt_assert(payload != nullptr, "You cannot send nullptr");
- put_init()->set_payload_size(simulated_size_in_bytes)->set_src_data(payload)->vetoable_start()->wait_for(timeout);
+ put_init()->set_payload_size(simulated_size_in_bytes)->set_src_data(payload)->start()->wait_for(timeout);
}
CommPtr Mailbox::get_init()
auto host = e.host_by_name("cpu0");
/* creation of the tasks and their dependencies */
- simgrid::s4u::ExecPtr Init = simgrid::s4u::Exec::init()->set_name("Init")->set_flops_amount(0)->vetoable_start();
- simgrid::s4u::CommPtr A = simgrid::s4u::Comm::sendto_init()->set_name("A")->set_payload_size(1e9)->vetoable_start();
- simgrid::s4u::CommPtr B = simgrid::s4u::Comm::sendto_init()->set_name("B")->vetoable_start();
- simgrid::s4u::ExecPtr C = simgrid::s4u::Exec::init()->set_name("C")->vetoable_start();
- simgrid::s4u::CommPtr D = simgrid::s4u::Comm::sendto_init()->set_name("D")->set_payload_size(1e9)->vetoable_start();
+ simgrid::s4u::ExecPtr Init = simgrid::s4u::Exec::init()->set_name("Init")->set_flops_amount(0)->start();
+ simgrid::s4u::CommPtr A = simgrid::s4u::Comm::sendto_init()->set_name("A")->set_payload_size(1e9)->start();
+ simgrid::s4u::CommPtr B = simgrid::s4u::Comm::sendto_init()->set_name("B")->start();
+ simgrid::s4u::ExecPtr C = simgrid::s4u::Exec::init()->set_name("C")->start();
+ simgrid::s4u::CommPtr D = simgrid::s4u::Comm::sendto_init()->set_name("D")->set_payload_size(1e9)->start();
std::vector<simgrid::s4u::ActivityPtr> activities = {Init, A, B, C, D};
Init->add_successor(A);
});
/* creation of the activities and their dependencies */
- simgrid::s4u::ExecPtr A = simgrid::s4u::Exec::init()->set_name("A")->vetoable_start();
- simgrid::s4u::ExecPtr B = simgrid::s4u::Exec::init()->set_name("B")->vetoable_start();
- simgrid::s4u::ExecPtr C = simgrid::s4u::Exec::init()->set_name("C")->vetoable_start();
- simgrid::s4u::ExecPtr D = simgrid::s4u::Exec::init()->set_name("D")->vetoable_start();
+ simgrid::s4u::ExecPtr A = simgrid::s4u::Exec::init()->set_name("A")->start();
+ simgrid::s4u::ExecPtr B = simgrid::s4u::Exec::init()->set_name("B")->start();
+ simgrid::s4u::ExecPtr C = simgrid::s4u::Exec::init()->set_name("C")->start();
+ simgrid::s4u::ExecPtr D = simgrid::s4u::Exec::init()->set_name("D")->start();
B->add_successor(A);
C->add_successor(A);