Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Split the Comm observers to their own files
authorMartin Quinson <martin.quinson@ens-rennes.fr>
Fri, 18 Feb 2022 23:28:19 +0000 (00:28 +0100)
committerMartin Quinson <martin.quinson@ens-rennes.fr>
Fri, 18 Feb 2022 23:30:19 +0000 (00:30 +0100)
MANIFEST.in
src/kernel/activity/CommImpl.hpp
src/kernel/actor/CommObserver.cpp [new file with mode: 0644]
src/kernel/actor/CommObserver.hpp [new file with mode: 0644]
src/kernel/actor/SimcallObserver.cpp
src/kernel/actor/SimcallObserver.hpp
src/s4u/s4u_Activity.cpp
tools/cmake/DefinePackages.cmake

index 06bfb74..866f079 100644 (file)
@@ -2258,6 +2258,8 @@ include src/kernel/activity/SynchroRaw.cpp
 include src/kernel/activity/SynchroRaw.hpp
 include src/kernel/actor/ActorImpl.cpp
 include src/kernel/actor/ActorImpl.hpp
+include src/kernel/actor/CommObserver.cpp
+include src/kernel/actor/CommObserver.hpp
 include src/kernel/actor/SimcallObserver.cpp
 include src/kernel/actor/SimcallObserver.hpp
 include src/kernel/context/Context.cpp
index 3837b6b..d14f5dd 100644 (file)
@@ -8,7 +8,7 @@
 
 #include "src/kernel/activity/ActivityImpl.hpp"
 #include "src/kernel/actor/ActorImpl.hpp"
-#include "src/kernel/actor/SimcallObserver.hpp"
+#include "src/kernel/actor/CommObserver.hpp"
 
 namespace simgrid {
 namespace kernel {
diff --git a/src/kernel/actor/CommObserver.cpp b/src/kernel/actor/CommObserver.cpp
new file mode 100644 (file)
index 0000000..ccb7df9
--- /dev/null
@@ -0,0 +1,166 @@
+/* Copyright (c) 2019-2022. The SimGrid Team. All rights reserved.          */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
+#include "simgrid/s4u/Host.hpp"
+#include "src/kernel/activity/CommImpl.hpp"
+#include "src/kernel/activity/MailboxImpl.hpp"
+#include "src/kernel/activity/MutexImpl.hpp"
+#include "src/kernel/actor/ActorImpl.hpp"
+#include "src/kernel/actor/SimcallObserver.hpp"
+#include "src/mc/mc_config.hpp"
+
+#include <sstream>
+
+XBT_LOG_NEW_DEFAULT_SUBCATEGORY(obs_comm, mc_observer, "Logging specific to the Communication simcalls observation");
+
+namespace simgrid {
+namespace kernel {
+namespace actor {
+
+ActivityTestanySimcall::ActivityTestanySimcall(ActorImpl* actor, const std::vector<activity::ActivityImpl*>& activities)
+    : ResultingSimcall(actor, -1), activities_(activities)
+{
+}
+
+int ActivityTestanySimcall::get_max_consider()
+{
+  indexes_.clear();
+  // list all the activities that are ready
+  for (unsigned i = 0; i < activities_.size(); i++)
+    if (activities_[i]->test(get_issuer()))
+      indexes_.push_back(i);
+  return indexes_.size() + 1;
+}
+
+void ActivityTestanySimcall::prepare(int times_considered)
+{
+  if (times_considered < static_cast<int>(indexes_.size()))
+    next_value_ = indexes_.at(times_considered);
+  else
+    next_value_ = -1;
+}
+static void serialize_activity_test(const activity::ActivityImpl* act, std::stringstream& stream)
+{
+  if (auto* comm = dynamic_cast<activity::CommImpl const*>(act)) {
+    stream << "  " << (short)mc::Transition::Type::COMM_TEST;
+    stream << ' ' << (uintptr_t)comm;
+    stream << ' ' << (comm->src_actor_ != nullptr ? comm->src_actor_->get_pid() : -1);
+    stream << ' ' << (comm->dst_actor_ != nullptr ? comm->dst_actor_->get_pid() : -1);
+    stream << ' ' << comm->get_mailbox_id();
+    stream << ' ' << (uintptr_t)comm->src_buff_ << ' ' << (uintptr_t)comm->dst_buff_ << ' ' << comm->src_buff_size_;
+  } else {
+    stream << (short)mc::Transition::Type::UNKNOWN;
+  }
+}
+void ActivityTestanySimcall::serialize(std::stringstream& stream) const
+{
+  stream << (short)mc::Transition::Type::TESTANY << ' ' << activities_.size() << ' ';
+  for (auto const& act : activities_) {
+    serialize_activity_test(act, stream);
+    stream << ' ';
+  }
+}
+void ActivityTestSimcall::serialize(std::stringstream& stream) const
+{
+  serialize_activity_test(activity_, stream);
+}
+static void serialize_activity_wait(const activity::ActivityImpl* act, bool timeout, std::stringstream& stream)
+{
+  if (auto* comm = dynamic_cast<activity::CommImpl const*>(act)) {
+    stream << (short)mc::Transition::Type::COMM_WAIT << ' ';
+    stream << timeout << ' ' << (uintptr_t)comm;
+
+    stream << ' ' << (comm->src_actor_ != nullptr ? comm->src_actor_->get_pid() : -1);
+    stream << ' ' << (comm->dst_actor_ != nullptr ? comm->dst_actor_->get_pid() : -1);
+    stream << ' ' << comm->get_mailbox_id();
+    stream << ' ' << (uintptr_t)comm->src_buff_ << ' ' << (uintptr_t)comm->dst_buff_ << ' ' << comm->src_buff_size_;
+  } else {
+    stream << (short)mc::Transition::Type::UNKNOWN;
+  }
+}
+
+void ActivityWaitSimcall::serialize(std::stringstream& stream) const
+{
+  serialize_activity_wait(activity_, timeout_ > 0, stream);
+}
+void ActivityWaitanySimcall::serialize(std::stringstream& stream) const
+{
+  stream << (short)mc::Transition::Type::WAITANY << ' ' << activities_.size() << ' ';
+  for (auto const& act : activities_) {
+    serialize_activity_wait(act, timeout_ > 0, stream);
+    stream << ' ';
+  }
+}
+ActivityWaitanySimcall::ActivityWaitanySimcall(ActorImpl* actor, const std::vector<activity::ActivityImpl*>& activities,
+                                               double timeout)
+    : ResultingSimcall(actor, -1), activities_(activities), timeout_(timeout)
+{
+}
+
+bool ActivityWaitSimcall::is_enabled()
+{
+  // FIXME: if _sg_mc_timeout == 1 and if we have either a sender or receiver timeout, the transition is enabled
+  // because even if the communication is not ready, it can timeout and won't block.
+
+  return activity_->test(get_issuer());
+}
+
+bool ActivityWaitanySimcall::is_enabled()
+{
+  // list all the activities that are ready
+  indexes_.clear();
+  for (unsigned i = 0; i < activities_.size(); i++)
+    if (activities_[i]->test(get_issuer()))
+      indexes_.push_back(i);
+
+  //  if (_sg_mc_timeout && timeout_)  FIXME: deal with the potential timeout of the WaitAny
+
+  // FIXME: even if the WaitAny has no timeout, some of the activities may still have one.
+  // we should iterate over the vector searching for them
+  return not indexes_.empty();
+}
+
+int ActivityWaitanySimcall::get_max_consider()
+{
+  // list all the activities that are ready
+  indexes_.clear();
+  for (unsigned i = 0; i < activities_.size(); i++)
+    if (activities_[i]->test(get_issuer()))
+      indexes_.push_back(i);
+
+  int res = indexes_.size();
+  //  if (_sg_mc_timeout && timeout_)
+  //    res++;
+
+  return res;
+}
+
+void ActivityWaitanySimcall::prepare(int times_considered)
+{
+  if (times_considered < static_cast<int>(indexes_.size()))
+    next_value_ = indexes_.at(times_considered);
+  else
+    next_value_ = -1;
+}
+
+void CommIsendSimcall::serialize(std::stringstream& stream) const
+{
+  stream << (short)mc::Transition::Type::COMM_SEND << ' ';
+  stream << (uintptr_t)comm_ << ' ' << mbox_->get_id() << ' ' << (uintptr_t)src_buff_ << ' ' << src_buff_size_ << ' '
+         << tag_;
+  XBT_DEBUG("SendObserver comm:%p mbox:%u buff:%p size:%zu tag:%d", comm_, mbox_->get_id(), src_buff_, src_buff_size_,
+            tag_);
+}
+
+void CommIrecvSimcall::serialize(std::stringstream& stream) const
+{
+  stream << (short)mc::Transition::Type::COMM_RECV << ' ';
+  stream << (uintptr_t)comm_ << ' ' << mbox_->get_id() << ' ' << (uintptr_t)dst_buff_ << ' ' << tag_;
+  XBT_DEBUG("RecvObserver comm:%p mbox:%u buff:%p tag:%d", comm_, mbox_->get_id(), dst_buff_, tag_);
+}
+
+} // namespace actor
+} // namespace kernel
+} // namespace simgrid
diff --git a/src/kernel/actor/CommObserver.hpp b/src/kernel/actor/CommObserver.hpp
new file mode 100644 (file)
index 0000000..c64f2b0
--- /dev/null
@@ -0,0 +1,179 @@
+/* Copyright (c) 2019-2022. The SimGrid Team. All rights reserved.          */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
+#ifndef SIMGRID_MC_SIMCALL_COMM_OBSERVER_HPP
+#define SIMGRID_MC_SIMCALL_COMM_OBSERVER_HPP
+
+#include "simgrid/forward.h"
+#include "src/kernel/actor/SimcallObserver.hpp"
+#include "src/mc/transition/Transition.hpp"
+#include "xbt/asserts.h"
+
+#include <string>
+
+namespace simgrid {
+namespace kernel {
+namespace actor {
+
+class ActivityTestSimcall : public ResultingSimcall<bool> {
+  activity::ActivityImpl* const activity_;
+
+public:
+  ActivityTestSimcall(ActorImpl* actor, activity::ActivityImpl* activity)
+      : ResultingSimcall(actor, true), activity_(activity)
+  {
+  }
+  bool is_visible() const override { return true; }
+  activity::ActivityImpl* get_activity() const { return activity_; }
+  void serialize(std::stringstream& stream) const override;
+};
+
+class ActivityTestanySimcall : public ResultingSimcall<ssize_t> {
+  const std::vector<activity::ActivityImpl*>& activities_;
+  std::vector<int> indexes_; // indexes in activities_ pointing to ready activities (=whose test() is positive)
+  int next_value_ = 0;
+
+public:
+  ActivityTestanySimcall(ActorImpl* actor, const std::vector<activity::ActivityImpl*>& activities);
+  bool is_visible() const override { return true; }
+  bool is_enabled() override { return true; /* can return -1 if no activity is ready */ }
+  void serialize(std::stringstream& stream) const override;
+  int get_max_consider() override;
+  void prepare(int times_considered) override;
+  const std::vector<activity::ActivityImpl*>& get_activities() const { return activities_; }
+  int get_value() const { return next_value_; }
+};
+
+class ActivityWaitSimcall : public ResultingSimcall<bool> {
+  activity::ActivityImpl* activity_;
+  const double timeout_;
+
+public:
+  ActivityWaitSimcall(ActorImpl* actor, activity::ActivityImpl* activity, double timeout)
+      : ResultingSimcall(actor, false), activity_(activity), timeout_(timeout)
+  {
+  }
+  void serialize(std::stringstream& stream) const override;
+  bool is_visible() const override { return true; }
+  bool is_enabled() override;
+  activity::ActivityImpl* get_activity() const { return activity_; }
+  void set_activity(activity::ActivityImpl* activity) { activity_ = activity; }
+  double get_timeout() const { return timeout_; }
+};
+
+class ActivityWaitanySimcall : public ResultingSimcall<ssize_t> {
+  const std::vector<activity::ActivityImpl*>& activities_;
+  std::vector<int> indexes_; // indexes in activities_ pointing to ready activities (=whose test() is positive)
+  const double timeout_;
+  int next_value_ = 0;
+
+public:
+  ActivityWaitanySimcall(ActorImpl* actor, const std::vector<activity::ActivityImpl*>& activities, double timeout);
+  bool is_enabled() override;
+  void serialize(std::stringstream& stream) const override;
+  bool is_visible() const override { return true; }
+  void prepare(int times_considered) override;
+  int get_max_consider() override;
+  const std::vector<activity::ActivityImpl*>& get_activities() const { return activities_; }
+  double get_timeout() const { return timeout_; }
+  int get_value() const { return next_value_; }
+};
+
+class CommIsendSimcall : public SimcallObserver {
+  activity::MailboxImpl* mbox_;
+  double payload_size_;
+  double rate_;
+  unsigned char* src_buff_;
+  size_t src_buff_size_;
+  void* payload_;
+  bool detached_;
+  activity::CommImpl* comm_;
+  int tag_;
+
+  bool (*match_fun_)(void*, void*, activity::CommImpl*);
+  void (*clean_fun_)(void*); // used to free the synchro in case of problem after a detached send
+  void (*copy_data_fun_)(activity::CommImpl*, void*, size_t); // used to copy data if not default one
+
+public:
+  CommIsendSimcall(ActorImpl* actor, activity::MailboxImpl* mbox, double payload_size, double rate,
+                   unsigned char* src_buff, size_t src_buff_size, bool (*match_fun)(void*, void*, activity::CommImpl*),
+                   void (*clean_fun)(void*), // used to free the synchro in case of problem after a detached send
+                   void (*copy_data_fun)(activity::CommImpl*, void*, size_t), // used to copy data if not default one
+                   void* payload, bool detached)
+      : SimcallObserver(actor)
+      , mbox_(mbox)
+      , payload_size_(payload_size)
+      , rate_(rate)
+      , src_buff_(src_buff)
+      , src_buff_size_(src_buff_size)
+      , payload_(payload)
+      , detached_(detached)
+      , match_fun_(match_fun)
+      , clean_fun_(clean_fun)
+      , copy_data_fun_(copy_data_fun)
+  {
+  }
+  void serialize(std::stringstream& stream) const override;
+  bool is_visible() const override { return true; }
+  activity::MailboxImpl* get_mailbox() const { return mbox_; }
+  double get_payload_size() const { return payload_size_; }
+  double get_rate() const { return rate_; }
+  unsigned char* get_src_buff() const { return src_buff_; }
+  size_t get_src_buff_size() const { return src_buff_size_; }
+  void* get_payload() const { return payload_; }
+  bool is_detached() const { return detached_; }
+  void set_comm(activity::CommImpl* comm) { comm_ = comm; }
+  void set_tag(int tag) { tag_ = tag; }
+
+  auto get_match_fun() const { return match_fun_; }
+  auto get_clean_fun() const { return clean_fun_; }
+  auto get_copy_data_fun() const { return copy_data_fun_; }
+};
+
+class CommIrecvSimcall : public SimcallObserver {
+  activity::MailboxImpl* mbox_;
+  unsigned char* dst_buff_;
+  size_t* dst_buff_size_;
+  void* payload_;
+  double rate_;
+  int tag_;
+  activity::CommImpl* comm_;
+
+  bool (*match_fun_)(void*, void*, activity::CommImpl*);
+  void (*copy_data_fun_)(activity::CommImpl*, void*, size_t); // used to copy data if not default one
+
+public:
+  CommIrecvSimcall(ActorImpl* actor, activity::MailboxImpl* mbox, unsigned char* dst_buff, size_t* dst_buff_size,
+                   bool (*match_fun)(void*, void*, activity::CommImpl*),
+                   void (*copy_data_fun)(activity::CommImpl*, void*, size_t), void* payload, double rate)
+      : SimcallObserver(actor)
+      , mbox_(mbox)
+      , dst_buff_(dst_buff)
+      , dst_buff_size_(dst_buff_size)
+      , payload_(payload)
+      , rate_(rate)
+      , match_fun_(match_fun)
+      , copy_data_fun_(copy_data_fun)
+  {
+  }
+  void serialize(std::stringstream& stream) const override;
+  bool is_visible() const override { return true; }
+  activity::MailboxImpl* get_mailbox() const { return mbox_; }
+  double get_rate() const { return rate_; }
+  unsigned char* get_dst_buff() const { return dst_buff_; }
+  size_t* get_dst_buff_size() const { return dst_buff_size_; }
+  void* get_payload() const { return payload_; }
+  void set_comm(activity::CommImpl* comm) { comm_ = comm; }
+  void set_tag(int tag) { tag_ = tag; }
+
+  auto get_match_fun() const { return match_fun_; };
+  auto get_copy_data_fun() const { return copy_data_fun_; }
+};
+
+} // namespace actor
+} // namespace kernel
+} // namespace simgrid
+
+#endif
index 6da0ea5..3809e1a 100644 (file)
@@ -108,148 +108,6 @@ bool SemAcquireSimcall::is_enabled()
   return true;
 }
 
-ActivityTestanySimcall::ActivityTestanySimcall(ActorImpl* actor, const std::vector<activity::ActivityImpl*>& activities)
-    : ResultingSimcall(actor, -1), activities_(activities)
-{
-}
-
-int ActivityTestanySimcall::get_max_consider()
-{
-  indexes_.clear();
-  // list all the activities that are ready
-  for (unsigned i = 0; i < activities_.size(); i++)
-    if (activities_[i]->test(get_issuer()))
-      indexes_.push_back(i);
-  return indexes_.size() + 1;
-}
-
-void ActivityTestanySimcall::prepare(int times_considered)
-{
-  if (times_considered < static_cast<int>(indexes_.size()))
-    next_value_ = indexes_.at(times_considered);
-  else
-    next_value_ = -1;
-}
-static void serialize_activity_test(const activity::ActivityImpl* act, std::stringstream& stream)
-{
-  if (auto* comm = dynamic_cast<activity::CommImpl const*>(act)) {
-    stream << "  " << (short)mc::Transition::Type::COMM_TEST;
-    stream << ' ' << (uintptr_t)comm;
-    stream << ' ' << (comm->src_actor_ != nullptr ? comm->src_actor_->get_pid() : -1);
-    stream << ' ' << (comm->dst_actor_ != nullptr ? comm->dst_actor_->get_pid() : -1);
-    stream << ' ' << comm->get_mailbox_id();
-    stream << ' ' << (uintptr_t)comm->src_buff_ << ' ' << (uintptr_t)comm->dst_buff_ << ' ' << comm->src_buff_size_;
-  } else {
-    stream << (short)mc::Transition::Type::UNKNOWN;
-  }
-}
-void ActivityTestanySimcall::serialize(std::stringstream& stream) const
-{
-  stream << (short)mc::Transition::Type::TESTANY << ' ' << activities_.size() << ' ';
-  for (auto const& act : activities_) {
-    serialize_activity_test(act, stream);
-    stream << ' ';
-  }
-}
-void ActivityTestSimcall::serialize(std::stringstream& stream) const
-{
-  serialize_activity_test(activity_, stream);
-}
-static void serialize_activity_wait(const activity::ActivityImpl* act, bool timeout, std::stringstream& stream)
-{
-  if (auto* comm = dynamic_cast<activity::CommImpl const*>(act)) {
-    stream << (short)mc::Transition::Type::COMM_WAIT << ' ';
-    stream << timeout << ' ' << (uintptr_t)comm;
-
-    stream << ' ' << (comm->src_actor_ != nullptr ? comm->src_actor_->get_pid() : -1);
-    stream << ' ' << (comm->dst_actor_ != nullptr ? comm->dst_actor_->get_pid() : -1);
-    stream << ' ' << comm->get_mailbox_id();
-    stream << ' ' << (uintptr_t)comm->src_buff_ << ' ' << (uintptr_t)comm->dst_buff_ << ' ' << comm->src_buff_size_;
-  } else {
-    stream << (short)mc::Transition::Type::UNKNOWN;
-  }
-}
-
-void ActivityWaitSimcall::serialize(std::stringstream& stream) const
-{
-  serialize_activity_wait(activity_, timeout_ > 0, stream);
-}
-void ActivityWaitanySimcall::serialize(std::stringstream& stream) const
-{
-  stream << (short)mc::Transition::Type::WAITANY << ' ' << activities_.size() << ' ';
-  for (auto const& act : activities_) {
-    serialize_activity_wait(act, timeout_ > 0, stream);
-    stream << ' ';
-  }
-}
-ActivityWaitanySimcall::ActivityWaitanySimcall(ActorImpl* actor, const std::vector<activity::ActivityImpl*>& activities,
-                                               double timeout)
-    : ResultingSimcall(actor, -1), activities_(activities), timeout_(timeout)
-{
-}
-
-bool ActivityWaitSimcall::is_enabled()
-{
-  // FIXME: if _sg_mc_timeout == 1 and if we have either a sender or receiver timeout, the transition is enabled
-  // because even if the communication is not ready, it can timeout and won't block.
-
-  return activity_->test(get_issuer());
-}
-
-bool ActivityWaitanySimcall::is_enabled()
-{
-  // list all the activities that are ready
-  indexes_.clear();
-  for (unsigned i = 0; i < activities_.size(); i++)
-    if (activities_[i]->test(get_issuer()))
-      indexes_.push_back(i);
-
-  //  if (_sg_mc_timeout && timeout_)  FIXME: deal with the potential timeout of the WaitAny
-
-  // FIXME: even if the WaitAny has no timeout, some of the activities may still have one.
-  // we should iterate over the vector searching for them
-  return not indexes_.empty();
-}
-
-int ActivityWaitanySimcall::get_max_consider()
-{
-  // list all the activities that are ready
-  indexes_.clear();
-  for (unsigned i = 0; i < activities_.size(); i++)
-    if (activities_[i]->test(get_issuer()))
-      indexes_.push_back(i);
-
-  int res = indexes_.size();
-  //  if (_sg_mc_timeout && timeout_)
-  //    res++;
-
-  return res;
-}
-
-void ActivityWaitanySimcall::prepare(int times_considered)
-{
-  if (times_considered < static_cast<int>(indexes_.size()))
-    next_value_ = indexes_.at(times_considered);
-  else
-    next_value_ = -1;
-}
-
-void CommIsendSimcall::serialize(std::stringstream& stream) const
-{
-  stream << (short)mc::Transition::Type::COMM_SEND << ' ';
-  stream << (uintptr_t)comm_ << ' ' << mbox_->get_id() << ' ' << (uintptr_t)src_buff_ << ' ' << src_buff_size_ << ' '
-         << tag_;
-  XBT_DEBUG("SendObserver comm:%p mbox:%u buff:%p size:%zu tag:%d", comm_, mbox_->get_id(), src_buff_, src_buff_size_,
-            tag_);
-}
-
-void CommIrecvSimcall::serialize(std::stringstream& stream) const
-{
-  stream << (short)mc::Transition::Type::COMM_RECV << ' ';
-  stream << (uintptr_t)comm_ << ' ' << mbox_->get_id() << ' ' << (uintptr_t)dst_buff_ << ' ' << tag_;
-  XBT_DEBUG("RecvObserver comm:%p mbox:%u buff:%p tag:%d", comm_, mbox_->get_id(), dst_buff_, tag_);
-}
-
 } // namespace actor
 } // namespace kernel
 } // namespace simgrid
index 4242b2d..f489e0c 100644 (file)
@@ -145,161 +145,6 @@ public:
   double get_timeout() const { return timeout_; }
 };
 
-class ActivityTestSimcall : public ResultingSimcall<bool> {
-  activity::ActivityImpl* const activity_;
-
-public:
-  ActivityTestSimcall(ActorImpl* actor, activity::ActivityImpl* activity)
-      : ResultingSimcall(actor, true), activity_(activity)
-  {
-  }
-  bool is_visible() const override { return true; }
-  activity::ActivityImpl* get_activity() const { return activity_; }
-  void serialize(std::stringstream& stream) const override;
-};
-
-class ActivityTestanySimcall : public ResultingSimcall<ssize_t> {
-  const std::vector<activity::ActivityImpl*>& activities_;
-  std::vector<int> indexes_; // indexes in activities_ pointing to ready activities (=whose test() is positive)
-  int next_value_ = 0;
-
-public:
-  ActivityTestanySimcall(ActorImpl* actor, const std::vector<activity::ActivityImpl*>& activities);
-  bool is_visible() const override { return true; }
-  bool is_enabled() override { return true; /* can return -1 if no activity is ready */ }
-  void serialize(std::stringstream& stream) const override;
-  int get_max_consider() override;
-  void prepare(int times_considered) override;
-  const std::vector<activity::ActivityImpl*>& get_activities() const { return activities_; }
-  int get_value() const { return next_value_; }
-};
-
-class ActivityWaitSimcall : public ResultingSimcall<bool> {
-  activity::ActivityImpl* activity_;
-  const double timeout_;
-
-public:
-  ActivityWaitSimcall(ActorImpl* actor, activity::ActivityImpl* activity, double timeout)
-      : ResultingSimcall(actor, false), activity_(activity), timeout_(timeout)
-  {
-  }
-  void serialize(std::stringstream& stream) const override;
-  bool is_visible() const override { return true; }
-  bool is_enabled() override;
-  activity::ActivityImpl* get_activity() const { return activity_; }
-  void set_activity(activity::ActivityImpl* activity) { activity_ = activity; }
-  double get_timeout() const { return timeout_; }
-};
-
-class ActivityWaitanySimcall : public ResultingSimcall<ssize_t> {
-  const std::vector<activity::ActivityImpl*>& activities_;
-  std::vector<int> indexes_; // indexes in activities_ pointing to ready activities (=whose test() is positive)
-  const double timeout_;
-  int next_value_ = 0;
-
-public:
-  ActivityWaitanySimcall(ActorImpl* actor, const std::vector<activity::ActivityImpl*>& activities, double timeout);
-  bool is_enabled() override;
-  void serialize(std::stringstream& stream) const override;
-  bool is_visible() const override { return true; }
-  void prepare(int times_considered) override;
-  int get_max_consider() override;
-  const std::vector<activity::ActivityImpl*>& get_activities() const { return activities_; }
-  double get_timeout() const { return timeout_; }
-  int get_value() const { return next_value_; }
-};
-
-class CommIsendSimcall : public SimcallObserver {
-  activity::MailboxImpl* mbox_;
-  double payload_size_;
-  double rate_;
-  unsigned char* src_buff_;
-  size_t src_buff_size_;
-  void* payload_;
-  bool detached_;
-  activity::CommImpl* comm_;
-  int tag_;
-
-  bool (*match_fun_)(void*, void*, activity::CommImpl*);
-  void (*clean_fun_)(void*); // used to free the synchro in case of problem after a detached send
-  void (*copy_data_fun_)(activity::CommImpl*, void*, size_t); // used to copy data if not default one
-
-public:
-  CommIsendSimcall(ActorImpl* actor, activity::MailboxImpl* mbox, double payload_size, double rate,
-                   unsigned char* src_buff, size_t src_buff_size, bool (*match_fun)(void*, void*, activity::CommImpl*),
-                   void (*clean_fun)(void*), // used to free the synchro in case of problem after a detached send
-                   void (*copy_data_fun)(activity::CommImpl*, void*, size_t), // used to copy data if not default one
-                   void* payload, bool detached)
-      : SimcallObserver(actor)
-      , mbox_(mbox)
-      , payload_size_(payload_size)
-      , rate_(rate)
-      , src_buff_(src_buff)
-      , src_buff_size_(src_buff_size)
-      , payload_(payload)
-      , detached_(detached)
-      , match_fun_(match_fun)
-      , clean_fun_(clean_fun)
-      , copy_data_fun_(copy_data_fun)
-  {
-  }
-  void serialize(std::stringstream& stream) const override;
-  bool is_visible() const override { return true; }
-  activity::MailboxImpl* get_mailbox() const { return mbox_; }
-  double get_payload_size() const { return payload_size_; }
-  double get_rate() const { return rate_; }
-  unsigned char* get_src_buff() const { return src_buff_; }
-  size_t get_src_buff_size() const { return src_buff_size_; }
-  void* get_payload() const { return payload_; }
-  bool is_detached() const { return detached_; }
-  void set_comm(activity::CommImpl* comm) { comm_ = comm; }
-  void set_tag(int tag) { tag_ = tag; }
-
-  auto get_match_fun() const { return match_fun_; }
-  auto get_clean_fun() const { return clean_fun_; }
-  auto get_copy_data_fun() const { return copy_data_fun_; }
-};
-
-class CommIrecvSimcall : public SimcallObserver {
-  activity::MailboxImpl* mbox_;
-  unsigned char* dst_buff_;
-  size_t* dst_buff_size_;
-  void* payload_;
-  double rate_;
-  int tag_;
-  activity::CommImpl* comm_;
-
-  bool (*match_fun_)(void*, void*, activity::CommImpl*);
-  void (*copy_data_fun_)(activity::CommImpl*, void*, size_t); // used to copy data if not default one
-
-public:
-  CommIrecvSimcall(ActorImpl* actor, activity::MailboxImpl* mbox, unsigned char* dst_buff, size_t* dst_buff_size,
-                   bool (*match_fun)(void*, void*, activity::CommImpl*),
-                   void (*copy_data_fun)(activity::CommImpl*, void*, size_t), void* payload, double rate)
-      : SimcallObserver(actor)
-      , mbox_(mbox)
-      , dst_buff_(dst_buff)
-      , dst_buff_size_(dst_buff_size)
-      , payload_(payload)
-      , rate_(rate)
-      , match_fun_(match_fun)
-      , copy_data_fun_(copy_data_fun)
-  {
-  }
-  void serialize(std::stringstream& stream) const override;
-  bool is_visible() const override { return true; }
-  activity::MailboxImpl* get_mailbox() const { return mbox_; }
-  double get_rate() const { return rate_; }
-  unsigned char* get_dst_buff() const { return dst_buff_; }
-  size_t* get_dst_buff_size() const { return dst_buff_size_; }
-  void* get_payload() const { return payload_; }
-  void set_comm(activity::CommImpl* comm) { comm_ = comm; }
-  void set_tag(int tag) { tag_ = tag; }
-
-  auto get_match_fun() const { return match_fun_; };
-  auto get_copy_data_fun() const { return copy_data_fun_; }
-};
-
 } // namespace actor
 } // namespace kernel
 } // namespace simgrid
index aed5f01..32f7bd1 100644 (file)
@@ -13,7 +13,7 @@
 
 #include "src/kernel/activity/ActivityImpl.hpp"
 #include "src/kernel/actor/ActorImpl.hpp"
-#include "src/kernel/actor/SimcallObserver.hpp"
+#include "src/kernel/actor/CommObserver.hpp"
 
 XBT_LOG_EXTERNAL_CATEGORY(s4u);
 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(s4u_activity, s4u, "S4U activities");
index df10af4..b7faed5 100644 (file)
@@ -413,6 +413,8 @@ set(SIMIX_SRC
   src/kernel/activity/SynchroRaw.hpp
   src/kernel/actor/ActorImpl.cpp
   src/kernel/actor/ActorImpl.hpp
+  src/kernel/actor/CommObserver.cpp
+  src/kernel/actor/CommObserver.hpp
   src/kernel/actor/SimcallObserver.cpp
   src/kernel/actor/SimcallObserver.hpp