#ifndef SIMGRID_S4U_ACTIVITY_HPP
#define SIMGRID_S4U_ACTIVITY_HPP
+#include "xbt/asserts.h"
+#include <atomic>
#include <simgrid/forward.h>
#include <xbt/signal.hpp>
Activity* set_remaining(double remains);
/** Put some user data onto the Activity */
- Activity* set_user_data(void* data)
- {
- user_data_ = data;
- return this;
- }
- /** Retrieve the user data of the Activity */
- void* get_user_data() { return user_data_; }
kernel::activity::ActivityImpl* get_impl() const { return pimpl_.get(); }
kernel::activity::ActivityImplPtr pimpl_ = nullptr;
Activity::State state_ = Activity::State::INITED;
double remains_ = 0;
- void* user_data_ = nullptr;
+};
+
+/** @brief CRTP layer between Activity and the concrete activity classes.
+ *
+ * Holds the state that concrete activities would otherwise each duplicate (name, tracing category,
+ * user data and the intrusive reference counter). The setters return @c AnyActivity* instead of
+ * @c Activity* so that calls can be chained without losing the concrete type.
+ *
+ * @tparam AnyActivity the concrete class deriving from Activity_T<AnyActivity>
+ */
+template <class AnyActivity> class Activity_T : public Activity {
+private:
+  std::string name_             = "";
+  std::string tracing_category_ = "";
+  void* user_data_              = nullptr;
+  /* Number of intrusive pointers currently referring to this activity */
+  std::atomic_int_fast32_t refcount_{0};
+
+public:
+#ifndef DOXYGEN
+  /* Hooks found by ADL when an intrusive pointer to AnyActivity is released or copied */
+  friend XBT_PUBLIC void intrusive_ptr_release(AnyActivity* a)
+  {
+    /* fetch_sub returns the previous value: 1 means that the last reference was just dropped */
+    if (a->refcount_.fetch_sub(1, std::memory_order_release) == 1) {
+      /* Pairs with the release decrements above before the object is destroyed */
+      std::atomic_thread_fence(std::memory_order_acquire);
+      delete a;
+    }
+  }
+  friend XBT_PUBLIC void intrusive_ptr_add_ref(AnyActivity* a) { a->refcount_.fetch_add(1, std::memory_order_relaxed); }
+#endif
+  /** Set the name of the activity. Asserts that the activity is still in the INITED state.
+   *  @return this activity, as its concrete type */
+  AnyActivity* set_name(const std::string& name)
+  {
+    xbt_assert(get_state() == State::INITED, "Cannot change the name of an activity after its start");
+    name_ = name;
+    return static_cast<AnyActivity*>(this);
+  }
+  /** Retrieve the name of the activity as a C++ string */
+  const std::string& get_name() const { return name_; }
+  /** Retrieve the name of the activity as a C string (valid as long as the name is not changed) */
+  const char* get_cname() const { return name_.c_str(); }
+
+  /** Set the tracing category of the activity. Asserts that the activity is still in the INITED state.
+   *  @return this activity, as its concrete type */
+  AnyActivity* set_tracing_category(const std::string& category)
+  {
+    xbt_assert(get_state() == State::INITED, "Cannot change the tracing category of an activity after its start");
+    tracing_category_ = category;
+    return static_cast<AnyActivity*>(this);
+  }
+  /** Retrieve the tracing category of the activity */
+  const std::string& get_tracing_category() const { return tracing_category_; }
+
+  /** Put some user data onto the activity. The pointer is stored as is.
+   *  @return this activity, as its concrete type */
+  AnyActivity* set_user_data(void* data)
+  {
+    user_data_ = data;
+    return static_cast<AnyActivity*>(this);
+  }
+
+  /** Retrieve the user data of the activity */
+  void* get_user_data() const { return user_data_; }
};
} // namespace s4u
#include <simgrid/forward.h>
#include <simgrid/s4u/Activity.hpp>
-#include <atomic>
#include <string>
#include <vector>
*
* Represents all asynchronous communications, that you can test or wait onto.
*/
-class XBT_PUBLIC Comm : public Activity {
+class XBT_PUBLIC Comm : public Activity_T<Comm> {
Mailbox* mailbox_ = nullptr;
kernel::actor::ActorImpl* sender_ = nullptr;
kernel::actor::ActorImpl* receiver_ = nullptr;
void* src_buff_ = nullptr;
size_t src_buff_size_ = sizeof(void*);
std::string tracing_category_ = "";
- std::atomic_int_fast32_t refcount_{0};
/* FIXME: expose these elements in the API */
bool detached_ = false;
int (*match_fun_)(void*, void*, kernel::activity::CommImpl*) = nullptr;
void (*clean_fun_)(void*) = nullptr;
void (*copy_data_function_)(kernel::activity::CommImpl*, void*, size_t) = nullptr;
- Comm() : Activity() {}
+ Comm() = default;
public:
#ifndef DOXYGEN
- friend XBT_PUBLIC void intrusive_ptr_release(Comm* c);
- friend XBT_PUBLIC void intrusive_ptr_add_ref(Comm* c);
friend Mailbox; // Factory of comms
#endif
#include <simgrid/s4u/Actor.hpp>
#include <xbt/ex.h>
-#include <atomic>
-
namespace simgrid {
namespace s4u {
* They are generated from this_actor::exec_init() or Host::execute(), and can be used to model pools of threads or
* similar mechanisms.
*/
-class XBT_PUBLIC Exec : public Activity {
- std::string name_ = "";
+class XBT_PUBLIC Exec : public Activity_T<Exec> {
double priority_ = 1.0;
double bound_ = 0.0;
double timeout_ = 0.0;
- std::string tracing_category_ = "";
- std::atomic_int_fast32_t refcount_{0};
protected:
Exec();
- virtual ~Exec() = default;
public:
+ virtual ~Exec() = default;
#ifndef DOXYGEN
Exec(Exec const&) = delete;
Exec& operator=(Exec const&) = delete;
friend ExecSeq;
friend ExecPar;
- friend XBT_PUBLIC void intrusive_ptr_release(Exec* e);
- friend XBT_PUBLIC void intrusive_ptr_add_ref(Exec* e);
#endif
static xbt::signal<void(Actor const&, Exec const&)> on_start;
static xbt::signal<void(Actor const&, Exec const&)> on_completion;
bool test() override;
ExecPtr set_bound(double bound);
- ExecPtr set_name(const std::string& name);
ExecPtr set_priority(double priority);
- ExecPtr set_tracing_category(const std::string& category);
ExecPtr set_timeout(double timeout);
Exec* cancel() override;
- const std::string& get_name() const { return name_; }
- const char* get_cname() const { return name_.c_str(); }
Host* get_host() const;
unsigned int get_host_number() const;
double get_start_time() const;
#include <simgrid/forward.h>
#include <simgrid/s4u/Activity.hpp>
-#include <atomic>
#include <string>
namespace simgrid {
* They are generated from Disk::io_init(), Disk::read() Disk::read_async(), Disk::write() and Disk::write_async().
*/
-class XBT_PUBLIC Io : public Activity {
+class XBT_PUBLIC Io : public Activity_T<Io> {
public:
enum class OpType { READ, WRITE };
Disk* disk_ = nullptr;
sg_size_t size_ = 0;
OpType type_ = OpType::READ;
- std::string name_ = "";
- std::atomic_int_fast32_t refcount_{0};
explicit Io(sg_storage_t storage, sg_size_t size, OpType type);
explicit Io(sg_disk_t disk, sg_size_t size, OpType type);
public:
#ifndef DOXYGEN
- friend XBT_PUBLIC void intrusive_ptr_release(simgrid::s4u::Io* i);
- friend XBT_PUBLIC void intrusive_ptr_add_ref(simgrid::s4u::Io* i);
friend Disk; // Factory of IOs
friend Storage; // Factory of IOs
#endif
/* This program is free software; you can redistribute it and/or modify it
* under the terms of the license (GNU LGPL) which comes with this package. */
-#include "xbt/asserts.h"
#include "xbt/log.h"
#include "simgrid/s4u/Activity.hpp"
Comm* Comm::start()
{
- xbt_assert(state_ == State::INITED, "You cannot use %s() once your communication started (not implemented)",
+ xbt_assert(get_state() == State::INITED, "You cannot use %s() once your communication started (not implemented)",
__FUNCTION__);
if (src_buff_ != nullptr) { // Sender side
on_sender_start(*Actor::self());
pimpl_ = simcall_comm_isend(sender_, mailbox_->get_impl(), remains_, rate_, src_buff_, src_buff_size_, match_fun_,
- clean_fun_, copy_data_function_, user_data_, detached_);
+ clean_fun_, copy_data_function_, get_user_data(), detached_);
} else if (dst_buff_ != nullptr) { // Receiver side
xbt_assert(not detached_, "Receive cannot be detached");
on_receiver_start(*Actor::self());
pimpl_ = simcall_comm_irecv(receiver_, mailbox_->get_impl(), dst_buff_, &dst_buff_size_, match_fun_,
- copy_data_function_, user_data_, rate_);
+ copy_data_function_, get_user_data(), rate_);
} else {
xbt_die("Cannot start a communication before specifying whether we are the sender or the receiver");
if (src_buff_ != nullptr) {
on_sender_start(*Actor::self());
simcall_comm_send(sender_, mailbox_->get_impl(), remains_, rate_, src_buff_, src_buff_size_, match_fun_,
- copy_data_function_, user_data_, timeout);
+ copy_data_function_, get_user_data(), timeout);
} else { // Receiver
on_receiver_start(*Actor::self());
simcall_comm_recv(receiver_, mailbox_->get_impl(), dst_buff_, &dst_buff_size_, match_fun_, copy_data_function_,
- user_data_, timeout, rate_);
+ get_user_data(), timeout, rate_);
}
state_ = State::FINISHED;
break;
return sender_ ? sender_->ciface() : nullptr;
}
-void intrusive_ptr_release(simgrid::s4u::Comm* c)
-{
- if (c->refcount_.fetch_sub(1, std::memory_order_release) == 1) {
- std::atomic_thread_fence(std::memory_order_acquire);
- delete c;
- }
-}
-void intrusive_ptr_add_ref(simgrid::s4u::Comm* c)
-{
- c->refcount_.fetch_add(1, std::memory_order_relaxed);
-}
} // namespace s4u
} // namespace simgrid
return this;
}
-void intrusive_ptr_release(simgrid::s4u::Exec* e)
-{
- if (e->refcount_.fetch_sub(1, std::memory_order_release) == 1) {
- std::atomic_thread_fence(std::memory_order_acquire);
- delete e;
- }
-}
-
-void intrusive_ptr_add_ref(simgrid::s4u::Exec* e)
-{
- e->refcount_.fetch_add(1, std::memory_order_relaxed);
-}
-
/** @brief change the execution bound
* This means changing the maximal amount of flops per second that it may consume, regardless of what the host may
* deliver. Currently, this cannot be changed once the exec started.
return this;
}
-ExecPtr Exec::set_name(const std::string& name)
-{
- xbt_assert(state_ == State::INITED, "Cannot change the name of an exec after its start");
- name_ = name;
- return this;
-}
-
/** @brief Retrieve the host on which this activity takes place.
* If it runs on more than one host, only the first host is returned.
*/
return this;
}
-ExecPtr Exec::set_tracing_category(const std::string& category)
-{
- xbt_assert(state_ == State::INITED, "Cannot change the tracing category of an exec after its start");
- tracing_category_ = category;
- return this;
-}
-
///////////// SEQUENTIAL EXECUTIONS ////////
ExecSeq::ExecSeq(sg_host_t host, double flops_amount) : Exec(), flops_amount_(flops_amount)
{
{
kernel::actor::simcall([this] {
(*boost::static_pointer_cast<kernel::activity::ExecImpl>(pimpl_))
- .set_name(name_)
- .set_tracing_category(tracing_category_)
+ .set_name(get_name())
+ .set_tracing_category(get_tracing_category())
.set_sharing_penalty(1. / priority_)
.set_bound(bound_)
.set_flops_amount(flops_amount_)
kernel::actor::simcall([this] {
if (storage_) {
(*boost::static_pointer_cast<kernel::activity::IoImpl>(pimpl_))
- .set_name(name_)
+ .set_name(get_name())
.set_storage(storage_->get_impl())
.set_size(size_)
.set_type(type_)
.start();
} else {
(*boost::static_pointer_cast<kernel::activity::IoImpl>(pimpl_))
- .set_name(name_)
+ .set_name(get_name())
.set_disk(disk_->get_impl())
.set_size(size_)
.set_type(type_)
[this]() { return boost::static_pointer_cast<kernel::activity::IoImpl>(pimpl_)->get_performed_ioops(); });
}
-void intrusive_ptr_release(simgrid::s4u::Io* i)
-{
- if (i->refcount_.fetch_sub(1, std::memory_order_release) == 1) {
- std::atomic_thread_fence(std::memory_order_acquire);
- delete i;
- }
-}
-
-void intrusive_ptr_add_ref(simgrid::s4u::Io* i)
-{
- i->refcount_.fetch_add(1, std::memory_order_relaxed);
-}
} // namespace s4u
} // namespace simgrid