X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/blobdiff_plain/1363ce9624f4327f3ad5c934b15736a776637dfd..bfd4a01db7f2f21e67c9ce03d32eb47a9601f759:/include/simgrid/s4u/Task.hpp diff --git a/include/simgrid/s4u/Task.hpp b/include/simgrid/s4u/Task.hpp index 5c8050b86f..ea144e239d 100644 --- a/include/simgrid/s4u/Task.hpp +++ b/include/simgrid/s4u/Task.hpp @@ -10,6 +10,7 @@ #include #include #include +#include namespace simgrid::s4u { @@ -27,47 +28,62 @@ using IoTaskPtr = boost::intrusive_ptr; class XBT_PUBLIC Token : public xbt::Extendable {}; class Task { + std::string name_; - double amount_; - int queued_firings_ = 0; - int count_ = 0; - bool working_ = false; + + std::map amount_ = {{"instance_0", 0}, {"dispatcher", 0}, {"collector", 0}}; + std::map queued_firings_ = {{"instance_0", 0}, {"dispatcher", 0}, {"collector", 0}}; + std::map running_instances_ = {{"instance_0", 0}, {"dispatcher", 0}, {"collector", 0}}; + std::map count_ = {{"instance_0", 0}, {"dispatcher", 0}, {"collector", 0}}; + std::map parallelism_degree_ = {{"instance_0", 1}, {"dispatcher", 1}, {"collector", 1}}; + std::map internal_bytes_to_send_ = {{"instance_0", 0}, {"dispatcher", 0}}; + + std::function load_balancing_function_; std::set successors_ = {}; std::map predecessors_ = {}; std::atomic_int_fast32_t refcount_{0}; - bool ready_to_run() const; + bool ready_to_run(std::string instance); void receive(Task* source); std::shared_ptr token_ = nullptr; std::deque>> tokens_received_; - ActivityPtr previous_activity_; - ActivityPtr current_activity_; + std::map> current_activities_ = { + {"instance_0", {}}, {"dispatcher", {}}, {"collector", {}}}; + + inline static xbt::signal on_start; + xbt::signal on_this_start; + inline static xbt::signal on_completion; + xbt::signal on_this_completion; protected: explicit Task(const std::string& name); virtual ~Task() = default; - virtual void fire(); - void complete(); + virtual void fire(std::string instance); + void complete(std::string instance); - void set_current_activity(ActivityPtr a) { current_activity_ = a; } + void store_activity(ActivityPtr a, std::string instance) { current_activities_[instance].push_back(a); } - inline static xbt::signal on_start; - xbt::signal on_this_start; - inline static xbt::signal on_completion; - xbt::signal on_this_completion; + virtual void add_instances(int n); + virtual void remove_instances(int n); public: + void set_name(std::string name); const std::string& get_name() const { return name_; } const char* get_cname() const { return name_.c_str(); } - void set_amount(double amount); - double get_amount() const { return amount_; } - int get_count() const { return count_; } + void set_amount(double amount, std::string instance = "instance_0"); + double get_amount(std::string instance = "instance_0") const { return amount_.at(instance); } + int get_count(std::string instance = "collector") const { return count_.at(instance); } + void set_parallelism_degree(int n, std::string instance = "all"); + int get_parallelism_degree(std::string instance = "instance_0") const { return parallelism_degree_.at(instance); } + void set_internal_bytes(int bytes, std::string instance = "instance_0"); + double get_internal_bytes(std::string instance = "instance_0") const { return internal_bytes_to_send_.at(instance); } + void set_load_balancing_function(std::function func); void set_token(std::shared_ptr token); - std::shared_ptr get_next_token_from(TaskPtr t); + std::shared_ptr get_next_token_from(TaskPtr t) const { return tokens_received_.front().at(t); } void add_successor(TaskPtr t); void remove_successor(TaskPtr t); @@ -104,7 +120,7 @@ class CommTask : public Task { Host* destination_; explicit CommTask(const std::string& name); - void fire() override; + void fire(std::string instance) override; public: static CommTaskPtr init(const std::string& name); @@ -115,30 +131,33 @@ public: CommTaskPtr set_destination(Host* destination); Host* get_destination() const { return destination_; } CommTaskPtr set_bytes(double bytes); - double get_bytes() const { return get_amount(); } + double get_bytes() const { return get_amount("instance_0"); } }; class ExecTask : public Task { - Host* host_; + std::map host_ = {{"instance_0", nullptr}, {"dispatcher", nullptr}, {"collector", nullptr}}; explicit ExecTask(const std::string& name); - void fire() override; + void fire(std::string instance) override; public: static ExecTaskPtr init(const std::string& name); static ExecTaskPtr init(const std::string& name, double flops, Host* host); - ExecTaskPtr set_host(Host* host); - Host* get_host() const { return host_; } - ExecTaskPtr set_flops(double flops); - double get_flops() const { return get_amount(); } + ExecTaskPtr set_host(Host* host, std::string instance = "all"); + Host* get_host(std::string instance = "instance_0") const { return host_.at(instance); } + ExecTaskPtr set_flops(double flops, std::string instance = "instance_0"); + double get_flops(std::string instance = "instance_0") const { return get_amount(instance); } + + void add_instances(int n) override; + void remove_instances(int n) override; }; class IoTask : public Task { Disk* disk_; Io::OpType type_; explicit IoTask(const std::string& name); - void fire() override; + void fire(std::string instance) override; public: static IoTaskPtr init(const std::string& name); @@ -147,7 +166,7 @@ public: IoTaskPtr set_disk(Disk* disk); Disk* get_disk() const { return disk_; } IoTaskPtr set_bytes(double bytes); - double get_bytes() const { return get_amount(); } + double get_bytes() const { return get_amount("instance_0"); } IoTaskPtr set_op_type(Io::OpType type); Io::OpType get_op_type() const { return type_; } };