#ifndef SIMGRID_S4U_ACTIVITY_HPP
#define SIMGRID_S4U_ACTIVITY_HPP
+#include "xbt/asserts.h"
#include <simgrid/forward.h>
#include <xbt/signal.hpp>
Activity* set_remaining(double remains);
/** Put some user data onto the Activity */
- Activity* set_user_data(void* data)
- {
- user_data_ = data;
- return this;
- }
- /** Retrieve the user data of the Activity */
- void* get_user_data() { return user_data_; }
- kernel::activity::ActivityImplPtr get_impl() { return pimpl_; }
-
- #ifndef DOXYGEN
- XBT_ATTRIB_DEPRECATED_v324("Please use Activity::wait_for()") virtual void wait(double timeout) = 0;
- XBT_ATTRIB_DEPRECATED_v323("Please use Activity::get_state()") Activity::State getState() { return state_; }
- XBT_ATTRIB_DEPRECATED_v323("Please use Activity::get_remaining()") double getRemains() { return get_remaining(); }
- XBT_ATTRIB_DEPRECATED_v323("Please use Activity::set_remaining()") Activity* setRemains(double remains)
- {
- return set_remaining(remains);
- }
- #endif
+ /** Raw pointer to the implementation object, obtained from pimpl_ (which keeps holding it). */
+ kernel::activity::ActivityImpl* get_impl() const
+ {
+   return pimpl_.get();
+ }
private:
kernel::activity::ActivityImplPtr pimpl_ = nullptr;
Activity::State state_ = Activity::State::INITED;
double remains_ = 0;
- void* user_data_ = nullptr;
+};
+
+/** Typed layer on top of Activity, parameterized by the concrete activity class (CRTP).
+ *
+ * It stores the name, the tracing category and the user data of the activity. Every setter
+ * returns the object as an AnyActivity* so that calls can be chained without losing the
+ * concrete type. Read-only accessors are const so they can be used on const activities.
+ */
+template <class AnyActivity> class Activity_T : public Activity {
+private:
+  std::string name_             = "";
+  std::string tracing_category_ = "";
+  void* user_data_              = nullptr;
+
+public:
+  /** Set the name of the activity. Asserts that the activity is still in the INITED state. */
+  AnyActivity* set_name(const std::string& name)
+  {
+    xbt_assert(get_state() == State::INITED, "Cannot change the name of an activity after its start");
+    name_ = name;
+    return static_cast<AnyActivity*>(this);
+  }
+  /** Retrieve the name of the activity as a C++ string */
+  const std::string& get_name() const { return name_; }
+  /** Retrieve the name of the activity as a C string; the pointer refers to the stored std::string */
+  const char* get_cname() const { return name_.c_str(); }
+
+  /** Set the tracing category of the activity. Asserts that the activity is still in the INITED state. */
+  AnyActivity* set_tracing_category(const std::string& category)
+  {
+    xbt_assert(get_state() == State::INITED, "Cannot change the tracing category of an activity after its start");
+    tracing_category_ = category;
+    return static_cast<AnyActivity*>(this);
+  }
+  /** Retrieve the tracing category of the activity */
+  const std::string& get_tracing_category() const { return tracing_category_; }
+
+  /** Put some user data onto the activity. The pointer is only stored, never dereferenced here. */
+  AnyActivity* set_user_data(void* data)
+  {
+    user_data_ = data;
+    return static_cast<AnyActivity*>(this);
+  }
+
+  /** Retrieve the user data of the activity */
+  void* get_user_data() const { return user_data_; }
+  XBT_ATTRIB_DEPRECATED_v323("Please use Activity::set_user_data()") AnyActivity* setUserData(void* data)
+  {
+    return set_user_data(data);
+  }
+  // Forward to the non-deprecated getter so both entry points share a single implementation.
+  XBT_ATTRIB_DEPRECATED_v323("Please use Activity::get_user_data()") void* getUserData() { return get_user_data(); }
};
} // namespace s4u
#include <simgrid/s4u/Activity.hpp>
#include <atomic>
+ #include <string>
#include <vector>
namespace simgrid {
*
* Represents all asynchronous communications, that you can test or wait onto.
*/
-class XBT_PUBLIC Comm : public Activity {
+class XBT_PUBLIC Comm : public Activity_T<Comm> {
Mailbox* mailbox_ = nullptr;
kernel::actor::ActorImpl* sender_ = nullptr;
kernel::actor::ActorImpl* receiver_ = nullptr;
size_t dst_buff_size_ = 0;
void* src_buff_ = nullptr;
size_t src_buff_size_ = sizeof(void*);
+ std::string tracing_category_ = "";
std::atomic_int_fast32_t refcount_{0};
-
/* FIXME: expose these elements in the API */
- int detached_ = 0;
+ bool detached_ = false;
int (*match_fun_)(void*, void*, kernel::activity::CommImpl*) = nullptr;
void (*clean_fun_)(void*) = nullptr;
void (*copy_data_function_)(kernel::activity::CommImpl*, void*, size_t) = nullptr;
- Comm() : Activity() {}
+ Comm() = default;
public:
+ #ifndef DOXYGEN
friend XBT_PUBLIC void intrusive_ptr_release(Comm* c);
friend XBT_PUBLIC void intrusive_ptr_add_ref(Comm* c);
friend Mailbox; // Factory of comms
+ #endif
virtual ~Comm();
* That's a buffer where the sent data will be copied */
CommPtr set_dst_data(void** buff, size_t size);
+ CommPtr set_tracing_category(const std::string& category);
+
/** Retrieve the mailbox on which this comm acts */
Mailbox* get_mailbox();
/** Retrieve the size of the received data. Not to be mixed with @ref Activity::set_remaining() */
size_t get_dst_data_size();
- s4u::ActorPtr get_sender();
-
- #ifndef DOXYGEN
- XBT_ATTRIB_DEPRECATED_v324("Please use Comm::wait_for()") void wait(double t) override { wait_for(t); }
- XBT_ATTRIB_DEPRECATED_v323("Please use Comm::set_rate()") Activity* setRate(double rate)
- {
- return set_rate(rate).get();
- }
- XBT_ATTRIB_DEPRECATED_v323("Please use Comm::set_src_data()") Activity* setSrcData(void* buff)
- {
- return set_src_data(buff).get();
- }
- XBT_ATTRIB_DEPRECATED_v323("Please use Comm::set_src_data()") Activity* setSrcData(void* buff, size_t size)
- {
- return set_src_data(buff, size).get();
- }
- XBT_ATTRIB_DEPRECATED_v323("Please use Comm::set_src_data_size()") Activity* setSrcDataSize(size_t size)
- {
- return set_src_data_size(size).get();
- }
- XBT_ATTRIB_DEPRECATED_v323("Please use Comm::set_dst_data()") Activity* setDstData(void** buff)
- {
- return set_dst_data(buff).get();
- }
- XBT_ATTRIB_DEPRECATED_v323("Please use Comm::set_dst_data()") Activity* setDstData(void** buff, size_t size)
- {
- return set_dst_data(buff, size).get();
- }
- XBT_ATTRIB_DEPRECATED_v323("Please use Comm::get_dst_data_size()") size_t getDstDataSize()
- {
- return get_dst_data_size();
- }
- XBT_ATTRIB_DEPRECATED_v323("Please use Comm::get_mailbox()") Mailbox* getMailbox() { return get_mailbox(); }
- #endif
+ Actor* get_sender();
};
} // namespace s4u
} // namespace simgrid
#include <simgrid/forward.h>
#include <simgrid/s4u/Activity.hpp>
+ #include <simgrid/s4u/Actor.hpp>
#include <xbt/ex.h>
#include <atomic>
* They are generated from this_actor::exec_init() or Host::execute(), and can be used to model pools of threads or
* similar mechanisms.
*/
-class XBT_PUBLIC Exec : public Activity {
- std::string name_ = "";
+class XBT_PUBLIC Exec : public Activity_T<Exec> {
double priority_ = 1.0;
double bound_ = 0.0;
double timeout_ = 0.0;
- std::string tracing_category_ = "";
std::atomic_int_fast32_t refcount_{0};
Host* host_ = nullptr;
#ifndef DOXYGEN
Exec(Exec const&) = delete;
Exec& operator=(Exec const&) = delete;
- #endif
friend ExecSeq;
friend ExecPar;
friend XBT_PUBLIC void intrusive_ptr_release(Exec* e);
friend XBT_PUBLIC void intrusive_ptr_add_ref(Exec* e);
- static xbt::signal<void(Actor const&)> on_start;
- static xbt::signal<void(Actor const&)> on_completion;
+ #endif
+ static xbt::signal<void(Actor const&, Exec const&)> on_start;
+ static xbt::signal<void(Actor const&, Exec const&)> on_completion;
virtual Exec* start() override = 0;
virtual double get_remaining_ratio() = 0;
Exec* wait() override;
Exec* wait_for(double timeout) override;
+ /*! Take a vector of s4u::ExecPtr and return when one of them is finished.
+ * The return value is the rank of the first finished ExecPtr. */
+ static int wait_any(std::vector<ExecPtr>* execs) { return wait_any_for(execs, -1); }
+ /*! Same as wait_any, but with a timeout. If the timeout occurs, parameter last is returned.*/
+ static int wait_any_for(std::vector<ExecPtr>* execs, double timeout);
+
bool test() override;
ExecPtr set_bound(double bound);
- ExecPtr set_name(const std::string& name);
ExecPtr set_priority(double priority);
- ExecPtr set_tracing_category(const std::string& category);
ExecPtr set_timeout(double timeout);
Exec* cancel() override;
-
- XBT_ATTRIB_DEPRECATED_v323("Please use Exec::set_priority()") ExecPtr setPriority(double priority)
- {
- return set_priority(priority);
- }
- XBT_ATTRIB_DEPRECATED_v323("Please use Exec::set_bound()") ExecPtr setBound(double bound) { return set_bound(bound); }
- XBT_ATTRIB_DEPRECATED_v324("Please use Exec::wait_for()") void wait(double t) override { wait_for(t); }
+ const std::string& get_name() const { return name_; }
+ const char* get_cname() const { return name_.c_str(); }
+ Host* get_host() const;
+ unsigned int get_host_number() const;
+ double get_start_time() const;
+ double get_finish_time() const;
+ double get_cost() const;
};
class XBT_PUBLIC ExecSeq : public Exec {
double get_remaining() override;
double get_remaining_ratio() override;
-
- #ifndef DOXYGEN
- //////////////// Deprecated functions
- XBT_ATTRIB_DEPRECATED_v323("Please use Exec::set_host()") ExecPtr setHost(Host* host) { return set_host(host); }
- XBT_ATTRIB_DEPRECATED_v323("Please use Exec::get_host()") Host* getHost() { return get_host(); }
- XBT_ATTRIB_DEPRECATED_v323("Please use Exec::get_remaining_ratio()") double getRemainingRatio()
- {
- return get_remaining_ratio();
- }
- #endif
};
class XBT_PUBLIC ExecPar : public Exec {
std::vector<double> bytes_amounts_;
explicit ExecPar(const std::vector<s4u::Host*>& hosts, const std::vector<double>& flops_amounts,
const std::vector<double>& bytes_amounts);
- ExecPtr set_host(Host* host) override { return this; }
+ ExecPtr set_host(Host*) override { /* parallel exec cannot be moved */ THROW_UNIMPLEMENTED; }
public:
~ExecPar() = default;
/** I/O Activity, representing the asynchronous disk access.
*
- * They are generated from Storage::io_init() or Storage::read() and Storage::write().
+ * They are generated from Disk::io_init(), Disk::read(), Disk::read_async(), Disk::write() and Disk::write_async().
*/
-class XBT_PUBLIC Io : public Activity {
+class XBT_PUBLIC Io : public Activity_T<Io> {
public:
enum class OpType { READ, WRITE };
private:
Storage* storage_ = nullptr;
+ Disk* disk_ = nullptr;
sg_size_t size_ = 0;
OpType type_ = OpType::READ;
- std::string name_ = "";
std::atomic_int_fast32_t refcount_{0};
explicit Io(sg_storage_t storage, sg_size_t size, OpType type);
+ explicit Io(sg_disk_t disk, sg_size_t size, OpType type);
public:
+ #ifndef DOXYGEN
friend XBT_PUBLIC void intrusive_ptr_release(simgrid::s4u::Io* i);
friend XBT_PUBLIC void intrusive_ptr_add_ref(simgrid::s4u::Io* i);
+ friend Disk; // Factory of IOs
friend Storage; // Factory of IOs
+ #endif
~Io() = default;
double get_remaining() override;
sg_size_t get_performed_ioops();
-
- #ifndef DOXYGEN
- XBT_ATTRIB_DEPRECATED_v324("Please use Io::wait_for()") void wait(double t) override { wait_for(t); }
- #endif
};
} // namespace s4u
return this;
}
+ /** Set the tracing category of this communication.
+  *
+  * Asserts that the communication is still in the INITED state, then stores the category
+  * in tracing_category_ and returns this comm so that calls can be chained.
+  */
+ CommPtr Comm::set_tracing_category(const std::string& category)
+ {
+   // The message names a comm: this is Comm, not Exec.
+   xbt_assert(state_ == State::INITED, "Cannot change the tracing category of a comm after its start");
+   tracing_category_ = category;
+   return this;
+ }
+
Comm* Comm::start()
{
- xbt_assert(state_ == State::INITED, "You cannot use %s() once your communication started (not implemented)",
+ xbt_assert(get_state() == State::INITED, "You cannot use %s() once your communication started (not implemented)",
__FUNCTION__);
if (src_buff_ != nullptr) { // Sender side
on_sender_start(*Actor::self());
pimpl_ = simcall_comm_isend(sender_, mailbox_->get_impl(), remains_, rate_, src_buff_, src_buff_size_, match_fun_,
- clean_fun_, copy_data_function_, user_data_, detached_);
+ clean_fun_, copy_data_function_, get_user_data(), detached_);
} else if (dst_buff_ != nullptr) { // Receiver side
xbt_assert(not detached_, "Receive cannot be detached");
on_receiver_start(*Actor::self());
pimpl_ = simcall_comm_irecv(receiver_, mailbox_->get_impl(), dst_buff_, &dst_buff_size_, match_fun_,
- copy_data_function_, user_data_, rate_);
+ copy_data_function_, get_user_data(), rate_);
} else {
xbt_die("Cannot start a communication before specifying whether we are the sender or the receiver");
if (src_buff_ != nullptr) {
on_sender_start(*Actor::self());
simcall_comm_send(sender_, mailbox_->get_impl(), remains_, rate_, src_buff_, src_buff_size_, match_fun_,
- copy_data_function_, user_data_, timeout);
+ copy_data_function_, get_user_data(), timeout);
} else { // Receiver
on_receiver_start(*Actor::self());
simcall_comm_recv(receiver_, mailbox_->get_impl(), dst_buff_, &dst_buff_size_, match_fun_, copy_data_function_,
- user_data_, timeout, rate_);
+ get_user_data(), timeout, rate_);
}
state_ = State::FINISHED;
break;
Comm* Comm::cancel()
{
- simix::simcall([this] {
+ kernel::actor::simcall([this] {
if (pimpl_)
boost::static_pointer_cast<kernel::activity::CommImpl>(pimpl_)->cancel();
});
return mailbox_;
}
- ActorPtr Comm::get_sender()
+ Actor* Comm::get_sender() // Returns what sender_->ciface() yields, or nullptr when sender_ is null.
{
- return sender_ ? sender_->iface() : nullptr;
+ return sender_ ? sender_->ciface() : nullptr; // guard first: sender_ may be null
}
void intrusive_ptr_release(simgrid::s4u::Comm* c)
namespace simgrid {
namespace s4u {
- xbt::signal<void(Actor const&)> Exec::on_start;
- xbt::signal<void(Actor const&)> Exec::on_completion;
+ xbt::signal<void(Actor const&, Exec const&)> Exec::on_start;
+ xbt::signal<void(Actor const&, Exec const&)> Exec::on_completion;
Exec::Exec()
{
start();
simcall_execution_wait(pimpl_);
state_ = State::FINISHED;
- on_completion(*Actor::self());
+ on_completion(*Actor::self(), *this);
return this;
}
THROW_UNIMPLEMENTED;
}
+ /** Wait on several executions at once, with a timeout.
+  *
+  * The implementation pointer of every element of @p execs is collected, in order, into a
+  * temporary array that is handed to simcall_execution_waitany_for() together with
+  * @p timeout. The value returned by that simcall is forwarded unchanged.
+  */
+ int Exec::wait_any_for(std::vector<ExecPtr>* execs, double timeout)
+ {
+   // A vector owns the temporary array, so no manual new[] is needed.
+   std::vector<kernel::activity::ExecImpl*> rexecs(execs->size());
+   std::transform(begin(*execs), end(*execs), rexecs.begin(),
+                  [](const ExecPtr& exec) { return static_cast<kernel::activity::ExecImpl*>(exec->pimpl_.get()); });
+   return simcall_execution_waitany_for(rexecs.data(), rexecs.size(), timeout);
+ }
+
Exec* Exec::cancel() // Cancels the underlying ExecImpl through a simcall, then marks this Exec as CANCELED.
{
- simix::simcall([this] { boost::static_pointer_cast<kernel::activity::ExecImpl>(pimpl_)->cancel(); });
+ kernel::actor::simcall([this] { boost::static_pointer_cast<kernel::activity::ExecImpl>(pimpl_)->cancel(); }); // NOTE(review): pimpl_ is dereferenced without the null check that Comm::cancel() performs; confirm it is always set here
state_ = State::CANCELED;
return this; // same object, so calls can be chained
}
return this;
}
-ExecPtr Exec::set_name(const std::string& name)
-{
- xbt_assert(state_ == State::INITED, "Cannot change the name of an exec after its start");
- name_ = name;
- return this;
-}
-
+ /** Host reported by the underlying ExecImpl. */
+ Host* Exec::get_host() const
+ {
+   auto* exec_impl = static_cast<kernel::activity::ExecImpl*>(pimpl_.get());
+   return exec_impl->get_host();
+ }
+ /** Host count reported by the underlying ExecImpl. */
+ unsigned int Exec::get_host_number() const
+ {
+   auto* exec_impl = static_cast<kernel::activity::ExecImpl*>(pimpl_.get());
+   return exec_impl->get_host_number();
+ }
+ /** Start time of the surf action attached to the implementation, or -1 when there is no such action. */
+ double Exec::get_start_time() const
+ {
+   if (pimpl_->surf_action_ == nullptr)
+     return -1;
+   return pimpl_->surf_action_->get_start_time();
+ }
+ /** Finish time of the surf action attached to the implementation, or -1 when there is no such action. */
+ double Exec::get_finish_time() const
+ {
+   if (pimpl_->surf_action_ == nullptr)
+     return -1;
+   return pimpl_->surf_action_->get_finish_time();
+ }
+ /** Cost of the surf action attached to the implementation, or -1 when there is no such action. */
+ double Exec::get_cost() const
+ {
+   if (pimpl_->surf_action_ == nullptr)
+     return -1;
+   return pimpl_->surf_action_->get_cost();
+ }
+
/** @brief Change the execution priority.
*
* An execution with twice the priority will get twice the amount of flops when the resource is shared.
return this;
}
-ExecPtr Exec::set_tracing_category(const std::string& category)
-{
- xbt_assert(state_ == State::INITED, "Cannot change the tracing category of an exec after its start");
- tracing_category_ = category;
- return this;
-}
-
///////////// SEQUENTIAL EXECUTIONS ////////
ExecSeq::ExecSeq(sg_host_t host, double flops_amount) : Exec(), flops_amount_(flops_amount)
{
Exec* ExecSeq::start() // Configures the underlying ExecImpl from this object's fields, starts it, then fires on_start.
{
- simix::simcall([this] {
+ kernel::actor::simcall([this] { // the whole configuration chain runs inside one simcall
(*boost::static_pointer_cast<kernel::activity::ExecImpl>(pimpl_))
.set_name(name_)
.set_tracing_category(tracing_category_)
- .set_priority(1. / priority_)
+ .set_sharing_penalty(1. / priority_) // the implementation receives the inverse of priority_
.set_bound(bound_)
.set_flops_amount(flops_amount_)
.start();
});
state_ = State::STARTED;
- on_start(*Actor::self());
+ on_start(*Actor::self(), *this); // handlers receive both the calling actor and this Exec
return this;
}
/** @brief Returns the amount of flops that remain to be done, as reported by the underlying ExecImpl */
double ExecSeq::get_remaining()
{
- return simgrid::simix::simcall(
+ return simgrid::kernel::actor::simcall( // the query runs inside a simcall and its result is returned as is
[this]() { return boost::static_pointer_cast<simgrid::kernel::activity::ExecImpl>(pimpl_)->get_remaining(); });
}
- /** @brief Returns the ratio of elements that are still to do
+ /** @brief Returns the ratio of elements that are still to do
*
* The returned value is between 0 (completely done) and 1 (nothing done yet).
*/
double ExecSeq::get_remaining_ratio() // Forwards to ExecImpl::get_seq_remaining_ratio() through a simcall.
{
- return simgrid::simix::simcall([this]() {
+ return simgrid::kernel::actor::simcall([this]() { // the lambda's result is returned unchanged
return boost::static_pointer_cast<simgrid::kernel::activity::ExecImpl>(pimpl_)->get_seq_remaining_ratio();
});
}
Exec* ExecPar::start() // Configures the underlying ExecImpl with hosts_ and timeout_, starts it, then fires on_start.
{
- simix::simcall([this] {
+ kernel::actor::simcall([this] { // the whole configuration chain runs inside one simcall
(*boost::static_pointer_cast<kernel::activity::ExecImpl>(pimpl_))
.set_hosts(hosts_)
.set_timeout(timeout_)
.start();
});
state_ = State::STARTED;
- on_start(*Actor::self());
+ on_start(*Actor::self(), *this); // handlers receive both the calling actor and this Exec
return this;
}
double ExecPar::get_remaining_ratio() // Forwards to ExecImpl::get_par_remaining_ratio() through a simcall.
{
- return simix::simcall(
+ return kernel::actor::simcall( // the lambda's result is returned unchanged
[this]() { return boost::static_pointer_cast<kernel::activity::ExecImpl>(pimpl_)->get_par_remaining_ratio(); });
}