Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
36e6e0dbd2137e8881160320e0eb48d1ad56bddc
[simgrid.git] / include / simgrid / s4u / Task.hpp
1 #ifndef SIMGRID_S4U_TASK_H_
2 #define SIMGRID_S4U_TASK_H_
3
4 #include <simgrid/s4u/Activity.hpp>
5 #include <simgrid/s4u/Io.hpp>
6 #include <xbt/Extendable.hpp>
7
8 #include <atomic>
9 #include <deque>
10 #include <map>
11 #include <memory>
12 #include <set>
13
14 namespace simgrid::s4u {
15
16 class Task;
17 using TaskPtr = boost::intrusive_ptr<Task>;
18 XBT_PUBLIC void intrusive_ptr_release(Task* o);
19 XBT_PUBLIC void intrusive_ptr_add_ref(Task* o);
20 class ExecTask;
21 using ExecTaskPtr = boost::intrusive_ptr<ExecTask>;
22 class CommTask;
23 using CommTaskPtr = boost::intrusive_ptr<CommTask>;
24 class IoTask;
25 using IoTaskPtr = boost::intrusive_ptr<IoTask>;
26
27 class XBT_PUBLIC Token : public xbt::Extendable<Token> {};
28
29 class Task {
30   std::string name_;
31   double amount_;
32   int queued_firings_     = 0;
33   int count_              = 0;
34   int running_instances_  = 0;
35   int parallelism_degree_ = 1;
36
37   std::set<Task*> successors_                 = {};
38   std::map<Task*, unsigned int> predecessors_ = {};
39   std::atomic_int_fast32_t refcount_{0};
40
41   bool ready_to_run() const;
42   void receive(Task* source);
43
44   std::shared_ptr<Token> token_ = nullptr;
45   std::deque<std::map<TaskPtr, std::shared_ptr<Token>>> tokens_received_;
46   std::deque<ActivityPtr> current_activities_;
47
48   inline static xbt::signal<void(Task*)> on_start;
49   xbt::signal<void(Task*)> on_this_start;
50   inline static xbt::signal<void(Task*)> on_completion;
51   xbt::signal<void(Task*)> on_this_completion;
52
53 protected:
54   explicit Task(const std::string& name);
55   virtual ~Task() = default;
56
57   virtual void fire();
58   void complete();
59
60   void store_activity(ActivityPtr a) { current_activities_.push_back(a); }
61
62 public:
63   void set_name(std::string name);
64   const std::string& get_name() const { return name_; }
65   const char* get_cname() const { return name_.c_str(); }
66   void set_amount(double amount);
67   double get_amount() const { return amount_; }
68   int get_count() const { return count_; }
69   void set_parallelism_degree(int n);
70   int get_parallelism_degree() { return parallelism_degree_; }
71
72   void set_token(std::shared_ptr<Token> token);
73   std::shared_ptr<Token> get_next_token_from(TaskPtr t);
74
75   void add_successor(TaskPtr t);
76   void remove_successor(TaskPtr t);
77   void remove_all_successors();
78   const std::set<Task*>& get_successors() const { return successors_; }
79
80   void enqueue_firings(int n);
81
82   /** Add a callback fired before this task activity starts */
83   void on_this_start_cb(const std::function<void(Task*)>& func) { on_this_start.connect(func); }
84   /** Add a callback fired before a task activity starts.
85    * Triggered after the on_this_start function**/
86   static void on_start_cb(const std::function<void(Task*)>& cb) { on_start.connect(cb); }
87   /** Add a callback fired before this task activity ends */
88   void on_this_completion_cb(const std::function<void(Task*)>& func) { on_this_completion.connect(func); };
89   /** Add a callback fired after a task activity ends.
90    * Triggered after the on_this_end function, but before sending tokens to successors.**/
91   static void on_completion_cb(const std::function<void(Task*)>& cb) { on_completion.connect(cb); }
92
93 #ifndef DOXYGEN
94   friend void intrusive_ptr_release(Task* o)
95   {
96     if (o->refcount_.fetch_sub(1, std::memory_order_release) == 1) {
97       std::atomic_thread_fence(std::memory_order_acquire);
98       delete o;
99     }
100   }
101   friend void intrusive_ptr_add_ref(Task* o) { o->refcount_.fetch_add(1, std::memory_order_relaxed); }
102 #endif
103 };
104
105 class CommTask : public Task {
106   Host* source_;
107   Host* destination_;
108
109   explicit CommTask(const std::string& name);
110   void fire() override;
111
112 public:
113   static CommTaskPtr init(const std::string& name);
114   static CommTaskPtr init(const std::string& name, double bytes, Host* source, Host* destination);
115
116   CommTaskPtr set_source(Host* source);
117   Host* get_source() const { return source_; }
118   CommTaskPtr set_destination(Host* destination);
119   Host* get_destination() const { return destination_; }
120   CommTaskPtr set_bytes(double bytes);
121   double get_bytes() const { return get_amount(); }
122 };
123
124 class ExecTask : public Task {
125   Host* host_;
126
127   explicit ExecTask(const std::string& name);
128   void fire() override;
129
130 public:
131   static ExecTaskPtr init(const std::string& name);
132   static ExecTaskPtr init(const std::string& name, double flops, Host* host);
133
134   ExecTaskPtr set_host(Host* host);
135   Host* get_host() const { return host_; }
136   ExecTaskPtr set_flops(double flops);
137   double get_flops() const { return get_amount(); }
138 };
139
140 class IoTask : public Task {
141   Disk* disk_;
142   Io::OpType type_;
143   explicit IoTask(const std::string& name);
144   void fire() override;
145
146 public:
147   static IoTaskPtr init(const std::string& name);
148   static IoTaskPtr init(const std::string& name, double bytes, Disk* disk, Io::OpType type);
149
150   IoTaskPtr set_disk(Disk* disk);
151   Disk* get_disk() const { return disk_; }
152   IoTaskPtr set_bytes(double bytes);
153   double get_bytes() const { return get_amount(); }
154   IoTaskPtr set_op_type(Io::OpType type);
155   Io::OpType get_op_type() const { return type_; }
156 };
157 } // namespace simgrid::s4u
158 #endif