namespace activity {
class XBT_PUBLIC ActivityImpl {
+ std::atomic_int_fast32_t refcount_{0};
+ std::string name_; /* Activity name if any */
public:
+ virtual ~ActivityImpl();
ActivityImpl() = default;
explicit ActivityImpl(const std::string& name) : name_(name) {}
- virtual ~ActivityImpl();
e_smx_state_t state_ = SIMIX_WAITING; /* State of the activity */
std::list<smx_simcall_t> simcalls_; /* List of simcalls waiting for this activity */
resource::Action* surf_action_ = nullptr;
const std::string& get_name() const { return name_; }
const char* get_cname() const { return name_.c_str(); }
void set_name(const std::string& name) { name_ = name; }
+ void set_category(const std::string& category);
virtual void suspend();
virtual void resume();
virtual void post() = 0; // What to do when a simcall terminates
virtual void finish() = 0;
- void set_category(const std::string& category);
// boost::intrusive_ptr<ActivityImpl> support:
friend XBT_PUBLIC void intrusive_ptr_add_ref(ActivityImpl* activity);
friend XBT_PUBLIC void intrusive_ptr_release(ActivityImpl* activity);
-private:
- std::atomic_int_fast32_t refcount_{0};
- std::string name_; /* Activity name if any */
-
-public:
static xbt::signal<void(ActivityImpl const&)> on_suspended;
static xbt::signal<void(ActivityImpl const&)> on_resumed;
};
XBT_DEBUG("send from mailbox %p", mbox);
/* Prepare a synchro describing us, so that it gets passed to the user-provided filter of other side */
- simgrid::kernel::activity::CommImplPtr this_comm = simgrid::kernel::activity::CommImplPtr(
- new simgrid::kernel::activity::CommImpl(simgrid::kernel::activity::CommImpl::Type::SEND));
+ simgrid::kernel::activity::CommImplPtr this_comm =
+ simgrid::kernel::activity::CommImplPtr(new simgrid::kernel::activity::CommImpl());
+ this_comm->set_type(simgrid::kernel::activity::CommImpl::Type::SEND);
/* Look for communication synchro matching our needs. We also provide a description of
* ourself so that the other side also gets a chance of choosing if it wants to match with us.
XBT_DEBUG("Receive already pushed");
other_comm->state_ = SIMIX_READY;
- other_comm->type = simgrid::kernel::activity::CommImpl::Type::READY;
+ other_comm->set_type(simgrid::kernel::activity::CommImpl::Type::READY);
}
src_proc->comms.push_back(other_comm);
/* Setup the communication synchro */
other_comm->src_actor_ = src_proc;
- other_comm->task_size_ = task_size;
- other_comm->rate_ = rate;
- other_comm->src_buff_ = src_buff;
- other_comm->src_buff_size_ = src_buff_size;
other_comm->src_data_ = data;
+ (*other_comm).set_src_buff(src_buff, src_buff_size).set_size(task_size).set_rate(rate);
other_comm->match_fun = match_fun;
other_comm->copy_data_fun = copy_data_fun;
simix_match_func_t match_fun, void (*copy_data_fun)(simgrid::kernel::activity::CommImpl*, void*, size_t),
void* data, double rate)
{
- simgrid::kernel::activity::CommImplPtr this_synchro = simgrid::kernel::activity::CommImplPtr(
- new simgrid::kernel::activity::CommImpl(simgrid::kernel::activity::CommImpl::Type::RECEIVE));
+ simgrid::kernel::activity::CommImplPtr this_synchro =
+ simgrid::kernel::activity::CommImplPtr(new simgrid::kernel::activity::CommImpl());
+ this_synchro->set_type(simgrid::kernel::activity::CommImpl::Type::RECEIVE);
XBT_DEBUG("recv from mbox %p. this_synchro=%p", mbox, this_synchro.get());
simgrid::kernel::activity::CommImplPtr other_comm;
if (other_comm->surf_action_ && other_comm->remains() < 1e-12) {
XBT_DEBUG("comm %p has been already sent, and is finished, destroy it", other_comm.get());
other_comm->state_ = SIMIX_DONE;
- other_comm->type = simgrid::kernel::activity::CommImpl::Type::DONE;
+ other_comm->set_type(simgrid::kernel::activity::CommImpl::Type::DONE);
other_comm->mbox = nullptr;
}
}
XBT_DEBUG("Match my %p with the existing %p", this_synchro.get(), other_comm.get());
other_comm->state_ = SIMIX_READY;
- other_comm->type = simgrid::kernel::activity::CommImpl::Type::READY;
+ other_comm->set_type(simgrid::kernel::activity::CommImpl::Type::READY);
}
receiver->comms.push_back(other_comm);
}
/* Setup communication synchro */
other_comm->dst_actor_ = receiver;
- other_comm->dst_buff_ = dst_buff;
- other_comm->dst_buff_size_ = dst_buff_size;
other_comm->dst_data_ = data;
+ other_comm->set_dst_buff(dst_buff, dst_buff_size);
- if (rate > -1.0 && (other_comm->rate_ < 0.0 || rate < other_comm->rate_))
- other_comm->rate_ = rate;
+ if (rate > -1.0 && (other_comm->get_rate() < 0.0 || rate < other_comm->get_rate()))
+ other_comm->set_rate(rate);
other_comm->match_fun = match_fun;
other_comm->copy_data_fun = copy_data_fun;
namespace kernel {
namespace activity {
-CommImpl::CommImpl(CommImpl::Type type) : type(type)
+CommImpl& CommImpl::set_type(CommImpl::Type type)
{
- state_ = SIMIX_WAITING;
- src_data_ = nullptr;
- dst_data_ = nullptr;
- XBT_DEBUG("Create comm activity %p", this);
+ type_ = type;
+ return *this;
+}
+
+CommImpl& CommImpl::set_size(double size)
+{
+ size_ = size;
+ return *this;
+}
+
+CommImpl& CommImpl::set_rate(double rate)
+{
+ rate_ = rate;
+ return *this;
+}
+
+CommImpl& CommImpl::set_src_buff(void* buff, size_t size)
+{
+ src_buff_ = buff;
+ src_buff_size_ = size;
+ return *this;
+}
+
+CommImpl& CommImpl::set_dst_buff(void* buff, size_t* size)
+{
+ dst_buff_ = buff;
+ dst_buff_size_ = size;
+ return *this;
}
CommImpl::~CommImpl()
}
/** @brief Starts the simulation of a communication synchro. */
-void CommImpl::start()
+CommImpl* CommImpl::start()
{
/* If both the sender and the receiver are already there, start the communication */
if (state_ == SIMIX_READY) {
s4u::Host* sender = src_actor_->get_host();
s4u::Host* receiver = dst_actor_->get_host();
- surf_action_ = surf_network_model->communicate(sender, receiver, task_size_, rate_);
+ surf_action_ = surf_network_model->communicate(sender, receiver, size_, rate_);
surf_action_->set_data(this);
state_ = SIMIX_RUNNING;
surf_action_->suspend();
}
}
+
+ return this;
}
/** @brief Copy the communication data from the sender's buffer to the receiver's one */
~CommImpl() override;
void cleanupSurf();
+ double rate_ = 0.0;
+ double size_ = 0.0;
+
public:
enum class Type { SEND = 0, RECEIVE, READY, DONE };
- explicit CommImpl(Type type);
- void start();
+ CommImpl& set_type(CommImpl::Type type);
+ CommImpl& set_size(double size);
+ double get_rate() { return rate_; }
+ CommImpl& set_rate(double rate);
+ CommImpl& set_src_buff(void* buff, size_t size);
+ CommImpl& set_dst_buff(void* buff, size_t* size);
+
+ CommImpl* start();
void copy_data();
void suspend() override;
void resume() override;
void cancel();
double remains();
- CommImpl::Type type; /* Type of the communication (SIMIX_COMM_SEND or SIMIX_COMM_RECEIVE) */
+ CommImpl::Type type_; /* Type of the communication (SIMIX_COMM_SEND or SIMIX_COMM_RECEIVE) */
MailboxImpl* mbox = nullptr; /* Rendez-vous where the comm is queued */
#if SIMGRID_HAVE_MC
void (*copy_data_fun)(CommImpl*, void*, size_t) = nullptr;
/* Surf action data */
- resource::Action* surf_action_ = nullptr; /* The Surf communication action encapsulated */
resource::Action* src_timeout_ = nullptr; /* Surf's actions to instrument the timeouts */
resource::Action* dst_timeout_ = nullptr; /* Surf's actions to instrument the timeouts */
actor::ActorImplPtr src_actor_ = nullptr;
actor::ActorImplPtr dst_actor_ = nullptr;
- double rate_ = 0.0;
- double task_size_ = 0.0;
/* Data to be transfered */
void* src_buff_ = nullptr;
namespace kernel {
namespace activity {
-ExecImpl::ExecImpl(const std::string& name, const std::string& tracing_category) : ActivityImpl(name)
-{
- this->state_ = SIMIX_RUNNING;
- this->set_category(tracing_category);
-
- XBT_DEBUG("Create exec %p", this);
-}
-
ExecImpl::~ExecImpl()
{
if (timeout_detector_)
XBT_DEBUG("Destroy exec %p", this);
}
-ExecImpl* ExecImpl::set_host(s4u::Host* host)
+ExecImpl& ExecImpl::set_host(s4u::Host* host)
{
host_ = host;
- return this;
+ return *this;
+}
+
+ExecImpl& ExecImpl::set_name(const std::string& name)
+{
+ ActivityImpl::set_name(name);
+ return *this;
+}
+
+ExecImpl& ExecImpl::set_tracing_category(const std::string& category)
+{
+ ActivityImpl::set_category(category);
+ return *this;
}
-ExecImpl* ExecImpl::set_timeout(double timeout)
+ExecImpl& ExecImpl::set_timeout(double timeout)
{
if (timeout > 0 && not MC_is_active() && not MC_record_replay_is_active()) {
timeout_detector_ = host_->pimpl_cpu->sleep(timeout);
timeout_detector_->set_data(this);
}
- return this;
+ return *this;
}
ExecImpl* ExecImpl::start(double flops_amount, double priority, double bound)
{
+ state_ = SIMIX_RUNNING;
if (not MC_is_active() && not MC_record_replay_is_active()) {
surf_action_ = host_->pimpl_cpu->execution_start(flops_amount);
surf_action_->set_data(this);
ExecImpl* ExecImpl::start(const std::vector<s4u::Host*>& hosts, const std::vector<double>& flops_amounts,
const std::vector<double>& bytes_amounts)
{
+ state_ = SIMIX_RUNNING;
/* set surf's synchro */
if (not MC_is_active() && not MC_record_replay_is_active()) {
surf_action_ = surf_host_model->execute_parallel(hosts, flops_amounts.data(), bytes_amounts.data(), -1);
~ExecImpl();
public:
- explicit ExecImpl(const std::string& name, const std::string& tracing_category);
ExecImpl* start(double flops_amount, double priority, double bound);
ExecImpl* start(const std::vector<s4u::Host*>& hosts, const std::vector<double>& flops_amounts,
const std::vector<double>& bytes_amounts);
- ExecImpl* set_host(s4u::Host* host);
- ExecImpl* set_timeout(double timeout);
+ ExecImpl& set_name(const std::string& name);
+ ExecImpl& set_tracing_category(const std::string& category);
+ ExecImpl& set_host(s4u::Host* host);
+ ExecImpl& set_timeout(double timeout);
+
void cancel();
void post() override;
void finish() override;
namespace kernel {
namespace activity {
-IoImplPtr IoImpl::set_name(const std::string& name)
+IoImpl& IoImpl::set_name(const std::string& name)
{
ActivityImpl::set_name(name);
- return this;
+ return *this;
}
-IoImplPtr IoImpl::set_type(s4u::Io::OpType type)
+IoImpl& IoImpl::set_type(s4u::Io::OpType type)
{
type_ = type;
- return this;
+ return *this;
}
-IoImplPtr IoImpl::set_size(sg_size_t size)
+IoImpl& IoImpl::set_size(sg_size_t size)
{
size_ = size;
- return this;
+ return *this;
}
-IoImplPtr IoImpl::set_storage(resource::StorageImpl* storage)
+IoImpl& IoImpl::set_storage(resource::StorageImpl* storage)
{
storage_ = storage;
- return this;
+ return *this;
}
IoImpl* IoImpl::start()
sg_size_t performed_ioops_ = 0;
public:
- IoImplPtr set_name(const std::string& name);
- IoImplPtr set_size(sg_size_t size);
- IoImplPtr set_type(s4u::Io::OpType type);
- IoImplPtr set_storage(resource::StorageImpl* storage);
+ IoImpl& set_name(const std::string& name);
+ IoImpl& set_size(sg_size_t size);
+ IoImpl& set_type(s4u::Io::OpType type);
+ IoImpl& set_storage(resource::StorageImpl* storage);
sg_size_t get_performed_ioops() { return performed_ioops_; }
CommImplPtr this_comm;
CommImpl::Type smx_type;
if (type == 1) {
- this_comm = CommImplPtr(new CommImpl(CommImpl::Type::SEND));
+ this_comm = CommImplPtr(new CommImpl());
+ this_comm->set_type(CommImpl::Type::SEND);
smx_type = CommImpl::Type::RECEIVE;
} else {
- this_comm = CommImplPtr(new CommImpl(CommImpl::Type::RECEIVE));
+ this_comm = CommImplPtr(new CommImpl());
+ this_comm->set_type(CommImpl::Type::RECEIVE);
smx_type = CommImpl::Type::SEND;
}
CommImplPtr other_comm = nullptr;
for (auto it = comm_queue.begin(); it != comm_queue.end(); it++) {
CommImplPtr& comm = *it;
- if (comm->type == CommImpl::Type::SEND) {
+ if (comm->type_ == CommImpl::Type::SEND) {
other_user_data = comm->src_data_;
- } else if (comm->type == CommImpl::Type::RECEIVE) {
+ } else if (comm->type_ == CommImpl::Type::RECEIVE) {
other_user_data = comm->dst_data_;
}
- if (comm->type == type && (match_fun == nullptr || match_fun(this_user_data, other_user_data, comm.get())) &&
+ if (comm->type_ == type && (match_fun == nullptr || match_fun(this_user_data, other_user_data, comm.get())) &&
(not comm->match_fun || comm->match_fun(other_user_data, this_user_data, my_synchro.get()))) {
XBT_DEBUG("Found a matching communication synchro %p", comm.get());
#if SIMGRID_HAVE_MC
}
XBT_DEBUG("Sorry, communication synchro %p does not match our needs:"
" its type is %d but we are looking for a comm of type %d (or maybe the filtering didn't match)",
- comm.get(), (int)comm->type, (int)type);
+ comm.get(), (int)comm->type_, (int)type);
}
XBT_DEBUG("No matching communication synchro found");
return nullptr;
return nullptr;
} else {
- return activity::ExecImplPtr(new activity::ExecImpl("suspend", ""))->set_host(host_)->start(0.0, 1.0, 0.0);
+ activity::ExecImpl* exec = new activity::ExecImpl();
+ (*exec).set_name("suspend").set_host(host_).start(0.0, 1.0, 0.0);
+ return activity::ExecImplPtr(exec);
}
}
}
/* On the other hand if it hasn't a timeout, check if the comm is ready.*/
else if (act->detached && act->src_actor_ == nullptr &&
- act->type == simgrid::kernel::activity::CommImpl::Type::READY)
+ act->type_ == simgrid::kernel::activity::CommImpl::Type::READY)
return (act->dst_actor_ != nullptr);
return (act->src_actor_ && act->dst_actor_);
}
simcall_comm_wait__get__timeout(r2) <= 0)
return false;
- if ((r1->call == SIMCALL_COMM_ISEND) && (synchro2->type == kernel::activity::CommImpl::Type::SEND) &&
+ if ((r1->call == SIMCALL_COMM_ISEND) && (synchro2->type_ == kernel::activity::CommImpl::Type::SEND) &&
(synchro2->src_buff_ != simcall_comm_isend__get__src_buff(r1)) && simcall_comm_wait__get__timeout(r2) <= 0)
return false;
- if ((r1->call == SIMCALL_COMM_IRECV) && (synchro2->type == kernel::activity::CommImpl::Type::RECEIVE) &&
+ if ((r1->call == SIMCALL_COMM_IRECV) && (synchro2->type_ == kernel::activity::CommImpl::Type::RECEIVE) &&
(synchro2->dst_buff_ != simcall_comm_irecv__get__dst_buff(r1)) && simcall_comm_wait__get__timeout(r2) <= 0)
return false;
}
simgrid::kernel::activity::CommImpl* act = temp_act.getBuffer();
if (act->src_actor_.get() && act->dst_actor_.get())
state->transition.argument = 0;
- else if (act->src_actor_.get() == nullptr && act->type == simgrid::kernel::activity::CommImpl::Type::READY &&
+ else if (act->src_actor_.get() == nullptr && act->type_ == simgrid::kernel::activity::CommImpl::Type::READY &&
act->detached == 1)
state->transition.argument = 0;
else
Exec::Exec()
{
- pimpl_ = kernel::activity::ExecImplPtr(new kernel::activity::ExecImpl(name_, tracing_category_));
+ pimpl_ = kernel::activity::ExecImplPtr(new kernel::activity::ExecImpl());
}
bool Exec::test()
Exec* ExecSeq::start()
{
simix::simcall([this] {
- boost::static_pointer_cast<kernel::activity::ExecImpl>(pimpl_)->start(flops_amount_, 1. / priority_, bound_);
+ (*boost::static_pointer_cast<kernel::activity::ExecImpl>(pimpl_))
+ .set_name(name_)
+ .set_tracing_category(tracing_category_)
+ .start(flops_amount_, 1. / priority_, bound_);
});
state_ = State::STARTED;
on_start(*Actor::self());
namespace simgrid {
namespace s4u {
-Io::Io(sg_storage_t storage, sg_size_t size, OpType type) : Activity(), storage_(storage), size_(size), type_(type)
+Io::Io(sg_storage_t storage, sg_size_t size, OpType type) : storage_(storage), size_(size), type_(type)
{
Activity::set_remaining(size_);
pimpl_ = kernel::activity::IoImplPtr(new kernel::activity::IoImpl());
Io* Io::start()
{
simix::simcall([this] {
- boost::static_pointer_cast<kernel::activity::IoImpl>(pimpl_)
- ->set_name(name_)
- ->set_storage(storage_->get_impl())
- ->set_size(size_)
- ->set_type(type_)
- ->start();
+ (*boost::static_pointer_cast<kernel::activity::IoImpl>(pimpl_))
+ .set_name(name_)
+ .set_storage(storage_->get_impl())
+ .set_size(size_)
+ .set_type(type_)
+ .start();
});
state_ = State::STARTED;
return this;
double priority, double bound, sg_host_t host)
{
return simgrid::simix::simcall([name, category, flops_amount, priority, bound, host] {
- return simgrid::kernel::activity::ExecImplPtr(
- new simgrid::kernel::activity::ExecImpl(std::move(name), std::move(category)))
- ->set_host(host)
- ->start(flops_amount, priority, bound);
+ simgrid::kernel::activity::ExecImpl* exec = new simgrid::kernel::activity::ExecImpl();
+ (*exec).set_name(name).set_tracing_category(category).set_host(host).start(flops_amount, priority, bound);
+ return simgrid::kernel::activity::ExecImplPtr(exec);
});
}
if (bytes_amount != nullptr)
bytes_parallel_amount = std::vector<double>(bytes_amount, bytes_amount + host_nb * host_nb);
return simgrid::simix::simcall([name, hosts, flops_parallel_amount, bytes_parallel_amount, timeout] {
- return simgrid::kernel::activity::ExecImplPtr(new simgrid::kernel::activity::ExecImpl(std::move(name), ""))
- ->set_timeout(timeout)
- ->start(hosts, flops_parallel_amount, bytes_parallel_amount);
+ simgrid::kernel::activity::ExecImpl* exec = new simgrid::kernel::activity::ExecImpl();
+ (*exec).set_name(name).set_timeout(timeout).start(hosts, flops_parallel_amount, bytes_parallel_amount);
+ return simgrid::kernel::activity::ExecImplPtr(exec);
});
}