*
* Represents all asynchronous communications, that you can test or wait onto.
*/
-class XBT_PUBLIC Comm : public Activity {
+class XBT_PUBLIC Comm : public Activity_T<Comm> {
Mailbox* mailbox_ = nullptr;
kernel::actor::ActorImpl* sender_ = nullptr;
kernel::actor::ActorImpl* receiver_ = nullptr;
void (*clean_fun_)(void*) = nullptr;
void (*copy_data_function_)(kernel::activity::CommImpl*, void*, size_t) = nullptr;
- Comm() : Activity() {}
+ Comm() = default;
public:
friend XBT_PUBLIC void intrusive_ptr_release(Comm* c);
virtual ~Comm();
- static xbt::signal<void(ActorPtr)> on_sender_start;
- static xbt::signal<void(ActorPtr)> on_receiver_start;
- static xbt::signal<void(ActorPtr)> on_completion;
+ static xbt::signal<void(Actor const&)> on_sender_start;
+ static xbt::signal<void(Actor const&)> on_receiver_start;
+ static xbt::signal<void(Actor const&)> on_completion;
/*! take a vector s4u::CommPtr and return when one of them is finished.
* The return value is the rank of the first finished CommPtr. */
* They are generated from this_actor::exec_init() or Host::execute(), and can be used to model pools of threads or
* similar mechanisms.
*/
-class XBT_PUBLIC Exec : public Activity {
- std::string name_ = "";
+class XBT_PUBLIC Exec : public Activity_T<Exec> {
double priority_ = 1.0;
double bound_ = 0.0;
double timeout_ = 0.0;
- std::string tracing_category_ = "";
std::atomic_int_fast32_t refcount_{0};
Host* host_ = nullptr;
friend ExecPar;
friend XBT_PUBLIC void intrusive_ptr_release(Exec* e);
friend XBT_PUBLIC void intrusive_ptr_add_ref(Exec* e);
- static xbt::signal<void(ActorPtr)> on_start;
- static xbt::signal<void(ActorPtr)> on_completion;
+ static xbt::signal<void(Actor const&)> on_start;
+ static xbt::signal<void(Actor const&)> on_completion;
virtual Exec* start() override = 0;
virtual double get_remaining_ratio() = 0;
bool test() override;
ExecPtr set_bound(double bound);
- ExecPtr set_name(const std::string& name);
ExecPtr set_priority(double priority);
- ExecPtr set_tracing_category(const std::string& category);
ExecPtr set_timeout(double timeout);
Exec* cancel() override;
namespace simgrid {
namespace s4u {
- xbt::signal<void(ActorPtr)> Comm::on_sender_start;
- xbt::signal<void(ActorPtr)> Comm::on_receiver_start;
- xbt::signal<void(ActorPtr)> Comm::on_completion;
+ xbt::signal<void(Actor const&)> Comm::on_sender_start;
+ xbt::signal<void(Actor const&)> Comm::on_receiver_start;
+ xbt::signal<void(Actor const&)> Comm::on_completion;
Comm::~Comm()
{
Comm* Comm::start()
{
- xbt_assert(state_ == State::INITED, "You cannot use %s() once your communication started (not implemented)",
+ xbt_assert(get_state() == State::INITED, "You cannot use %s() once your communication started (not implemented)",
__FUNCTION__);
if (src_buff_ != nullptr) { // Sender side
- on_sender_start(Actor::self());
+ on_sender_start(*Actor::self());
pimpl_ = simcall_comm_isend(sender_, mailbox_->get_impl(), remains_, rate_, src_buff_, src_buff_size_, match_fun_,
- clean_fun_, copy_data_function_, user_data_, detached_);
+ clean_fun_, copy_data_function_, get_user_data(), detached_);
} else if (dst_buff_ != nullptr) { // Receiver side
xbt_assert(not detached_, "Receive cannot be detached");
- on_receiver_start(Actor::self());
+ on_receiver_start(*Actor::self());
pimpl_ = simcall_comm_irecv(receiver_, mailbox_->get_impl(), dst_buff_, &dst_buff_size_, match_fun_,
- copy_data_function_, user_data_, rate_);
+ copy_data_function_, get_user_data(), rate_);
} else {
xbt_die("Cannot start a communication before specifying whether we are the sender or the receiver");
/** @brief Block the calling actor until the communication is finished, or until timeout
*
- * On timeout, an exception is thrown.
+ * On timeout, an exception is thrown and the communication is invalidated.
*
* @param timeout the amount of seconds to wait for the comm termination.
* Negative values denote infinite wait times. 0 as a timeout returns immediately. */
case State::INITED: // It's not started yet. Do it in one simcall
if (src_buff_ != nullptr) {
- on_sender_start(Actor::self());
+ on_sender_start(*Actor::self());
simcall_comm_send(sender_, mailbox_->get_impl(), remains_, rate_, src_buff_, src_buff_size_, match_fun_,
- copy_data_function_, user_data_, timeout);
+ copy_data_function_, get_user_data(), timeout);
} else { // Receiver
- on_receiver_start(Actor::self());
+ on_receiver_start(*Actor::self());
simcall_comm_recv(receiver_, mailbox_->get_impl(), dst_buff_, &dst_buff_size_, match_fun_, copy_data_function_,
- user_data_, timeout, rate_);
+ get_user_data(), timeout, rate_);
}
state_ = State::FINISHED;
break;
case State::STARTED:
simcall_comm_wait(pimpl_, timeout);
- on_completion(Actor::self());
+ on_completion(*Actor::self());
state_ = State::FINISHED;
break;
namespace simgrid {
namespace s4u {
- xbt::signal<void(ActorPtr)> Exec::on_start;
- xbt::signal<void(ActorPtr)> Exec::on_completion;
+ xbt::signal<void(Actor const&)> Exec::on_start;
+ xbt::signal<void(Actor const&)> Exec::on_completion;
Exec::Exec()
{
- pimpl_ = kernel::activity::ExecImplPtr(new kernel::activity::ExecImpl(get_name(), get_tracing_category()));
+ pimpl_ = kernel::activity::ExecImplPtr(new kernel::activity::ExecImpl());
}
bool Exec::test()
start();
simcall_execution_wait(pimpl_);
state_ = State::FINISHED;
- on_completion(Actor::self());
+ on_completion(*Actor::self());
return this;
}
return this;
}
-ExecPtr Exec::set_name(const std::string& name)
-{
- xbt_assert(state_ == State::INITED, "Cannot change the name of an exec after its start");
- name_ = name;
- return this;
-}
-
/** @brief Change the execution priority, don't you think?
*
* An execution with twice the priority will get twice the amount of flops when the resource is shared.
return this;
}
-ExecPtr Exec::set_tracing_category(const std::string& category)
-{
- xbt_assert(state_ == State::INITED, "Cannot change the tracing category of an exec after its start");
- tracing_category_ = category;
- return this;
-}
-
///////////// SEQUENTIAL EXECUTIONS ////////
ExecSeq::ExecSeq(sg_host_t host, double flops_amount) : Exec(), flops_amount_(flops_amount)
{
Exec* ExecSeq::start()
{
simix::simcall([this] {
- boost::static_pointer_cast<kernel::activity::ExecImpl>(pimpl_)->start(flops_amount_, 1. / priority_, bound_);
+ (*boost::static_pointer_cast<kernel::activity::ExecImpl>(pimpl_))
+ .set_name(name_)
+ .set_tracing_category(tracing_category_)
+ .set_priority(1. / priority_)
+ .set_bound(bound_)
+ .set_flops_amount(flops_amount_)
+ .start();
});
state_ = State::STARTED;
- on_start(Actor::self());
+ on_start(*Actor::self());
return this;
}
if (state_ == State::STARTED)
boost::static_pointer_cast<simgrid::kernel::activity::ExecImpl>(pimpl_)->migrate(host);
host_ = host;
- boost::static_pointer_cast<simgrid::kernel::activity::ExecImpl>(pimpl_)->host_ = host;
+ boost::static_pointer_cast<simgrid::kernel::activity::ExecImpl>(pimpl_)->set_host(host);
return this;
}
const std::vector<double>& bytes_amounts)
: Exec(), hosts_(hosts), flops_amounts_(flops_amounts), bytes_amounts_(bytes_amounts)
{
- // For parallel executions, we need a special host to run the timeout detector.
- host_ = hosts.front();
- boost::static_pointer_cast<simgrid::kernel::activity::ExecImpl>(pimpl_)->host_ = host_;
}
Exec* ExecPar::start()
{
simix::simcall([this] {
- boost::static_pointer_cast<kernel::activity::ExecImpl>(pimpl_)->set_timeout(timeout_);
- boost::static_pointer_cast<kernel::activity::ExecImpl>(pimpl_)->start(hosts_, flops_amounts_, bytes_amounts_);
+ (*boost::static_pointer_cast<kernel::activity::ExecImpl>(pimpl_))
+ .set_hosts(hosts_)
+ .set_timeout(timeout_)
+ .set_flops_amounts(flops_amounts_)
+ .set_bytes_amounts(bytes_amounts_)
+ .start();
});
state_ = State::STARTED;
- on_start(Actor::self());
+ on_start(*Actor::self());
return this;
}
+
double ExecPar::get_remaining_ratio()
{
return simix::simcall(