Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
New functions: Comm::sendto_{init,async}
author Martin Quinson <martin.quinson@ens-rennes.fr>
Wed, 20 Jan 2021 16:47:32 +0000 (17:47 +0100)
committer Martin Quinson <martin.quinson@ens-rennes.fr>
Wed, 20 Jan 2021 16:47:32 +0000 (17:47 +0100)
Along with a new example.

15 files changed:
ChangeLog
MANIFEST.in
examples/s4u/CMakeLists.txt
examples/s4u/comm-host2host/s4u-comm-host2host.cpp [new file with mode: 0644]
examples/s4u/comm-host2host/s4u-comm-host2host.tesh [new file with mode: 0644]
include/simgrid/s4u/Activity.hpp
include/simgrid/s4u/Comm.hpp
include/simgrid/s4u/Host.hpp
src/kernel/activity/ActivityImpl.cpp
src/kernel/activity/ActivityImpl.hpp
src/kernel/activity/CommImpl.cpp
src/kernel/activity/CommImpl.hpp
src/s4u/s4u_Activity.cpp
src/s4u/s4u_Comm.cpp
src/s4u/s4u_Host.cpp

index b1684e8..64b9124 100644 (file)
--- a/ChangeLog
+++ b/ChangeLog
@@ -15,6 +15,8 @@ S4U:
    or Mailbox::get_async<void>() if you really want to play with void*.
  - Unify the interface of Activity::on_{start/activity}
  - New function: Comm::get_dst_data()
+ - New functions: Comm::sendto_{init,async} to initiate a communication
+   between two (possibly remote) hosts.
 
 ----------------------------------------------------------------------------
 
index 37aa366..0a79b29 100644 (file)
@@ -368,6 +368,8 @@ include examples/s4u/cloud-simple/s4u-cloud-simple.cpp
 include examples/s4u/cloud-simple/s4u-cloud-simple.tesh
 include examples/s4u/comm-dependent/s4u-comm-dependent.cpp
 include examples/s4u/comm-dependent/s4u-comm-dependent.tesh
+include examples/s4u/comm-host2host/s4u-comm-host2host.cpp
+include examples/s4u/comm-host2host/s4u-comm-host2host.tesh
 include examples/s4u/comm-ready/s4u-comm-ready.cpp
 include examples/s4u/comm-ready/s4u-comm-ready.tesh
 include examples/s4u/comm-ready/s4u-comm-ready_d.xml
@@ -2302,7 +2304,6 @@ include src/mc/inspect/mc_unw.hpp
 include src/mc/inspect/mc_unw_vmread.cpp
 include src/mc/mc_api.cpp
 include src/mc/mc_api.hpp
-include src/mc/udpor_global.hpp
 include src/mc/mc_base.cpp
 include src/mc/mc_base.h
 include src/mc/mc_client_api.cpp
@@ -2349,6 +2350,7 @@ include src/mc/sosp/Region.hpp
 include src/mc/sosp/Snapshot.cpp
 include src/mc/sosp/Snapshot.hpp
 include src/mc/sosp/Snapshot_test.cpp
+include src/mc/udpor_global.hpp
 include src/msg/msg_comm.cpp
 include src/msg/msg_global.cpp
 include src/msg/msg_legacy.cpp
index 1e9d560..b83673b 100644 (file)
@@ -63,7 +63,7 @@ foreach (example actor-create actor-daemon actor-exiting actor-join actor-kill
                  actor-lifetime actor-migrate actor-suspend actor-yield actor-stacksize
                  app-bittorrent app-chainsend app-pingpong app-token-ring
                  comm-ready comm-suspend comm-wait comm-waitany comm-waitall comm-waituntil
-                 comm-dependent
+                 comm-dependent comm-host2host
                  cloud-capping cloud-migration cloud-simple
                  dht-chord dht-kademlia
                  energy-exec energy-boot energy-link energy-vm energy-exec-ptask energy-wifi
@@ -71,7 +71,7 @@ foreach (example actor-create actor-daemon actor-exiting actor-join actor-kill
                  exec-async exec-basic exec-dvfs exec-ptask exec-remote exec-waitany exec-waitfor exec-dependent
                  maestro-set
                  mc-bugged1 mc-bugged2 mc-electric-fence mc-failing-assert
-                network-wifi
+                            network-wifi
                  io-async io-file-system io-file-remote io-disk-raw io-dependent
                  platform-failures platform-profile platform-properties
                  plugin-host-load plugin-link-load
diff --git a/examples/s4u/comm-host2host/s4u-comm-host2host.cpp b/examples/s4u/comm-host2host/s4u-comm-host2host.cpp
new file mode 100644 (file)
index 0000000..790b9cb
--- /dev/null
@@ -0,0 +1,67 @@
+/* Copyright (c) 2007-2021. The SimGrid Team. All rights reserved.          */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
+/* This simple example demonstrates the Comm::sendto_init() and Comm::sendto_async() functions,
+   which can be used to create a direct communication from one host to another without
+   relying on the mailbox mechanism.
+
+   There is not much to say, actually: the _init variant creates the communication and
+   leaves it unstarted (in case you want to modify this communication before it starts),
+   while the _async variant creates and starts it. In both cases, you need to wait() for it.
+
+   This is mostly useful when you want a centralized simulation of your setting,
+   with a central actor declaring all the communications occurring in your distributed system.
+  */
+
+#include <simgrid/s4u.hpp>
+namespace sg4 = simgrid::s4u;
+
+XBT_LOG_NEW_DEFAULT_CATEGORY(s4u_comm_host2host, "Messages specific for this s4u example");
+
+/* Single actor driving every direct host-to-host communication of this example:
+ *  - c12 (h1 -> h2, 1.5e7 bytes) is created and started at once with Comm::sendto_async();
+ *  - c34 (h3 -> h4, 1e7 bytes) is created with Comm::sendto_init() and only started one simulated second later;
+ *  - c14 (h1 -> h4, 100 bytes) is created with Comm::sendto_init() and implicitly started by wait().
+ * The state and remaining amount of c12 and c34 are logged after each step. */
+static void sender(sg4::Host* h1, sg4::Host* h2, sg4::Host* h3, sg4::Host* h4)
+{
+  XBT_INFO("Send c12 with sendto_async(%s -> %s), and c34 with sendto_init(%s -> %s)", h1->get_cname(), h2->get_cname(),
+           h3->get_cname(), h4->get_cname());
+
+  auto c12 = sg4::Comm::sendto_async(h1, h2, 1.5e7); // Creates and starts a direct communication
+
+  auto c34 = sg4::Comm::sendto_init(h3, h4); // Creates, but does not start, another direct communication
+  c34->set_remaining(1e7);                   // Specifies the amount of bytes to exchange in this comm
+
+  XBT_INFO("After creation,  c12 is %s (remaining: %.2e bytes); c34 is %s (remaining: %.2e bytes)",
+           c12->get_state_str(), c12->get_remaining(), c34->get_state_str(), c34->get_remaining());
+  sg4::this_actor::sleep_for(1);
+  XBT_INFO("One sec later,   c12 is %s (remaining: %.2e bytes); c34 is %s (remaining: %.2e bytes)",
+           c12->get_state_str(), c12->get_remaining(), c34->get_state_str(), c34->get_remaining());
+  c34->start();
+  XBT_INFO("After c34->start,c12 is %s (remaining: %.2e bytes); c34 is %s (remaining: %.2e bytes)",
+           c12->get_state_str(), c12->get_remaining(), c34->get_state_str(), c34->get_remaining());
+  c12->wait();
+  XBT_INFO("After c12->wait, c12 is %s (remaining: %.2e bytes); c34 is %s (remaining: %.2e bytes)",
+           c12->get_state_str(), c12->get_remaining(), c34->get_state_str(), c34->get_remaining());
+  c34->wait();
+  XBT_INFO("After c34->wait, c12 is %s (remaining: %.2e bytes); c34 is %s (remaining: %.2e bytes)",
+           c12->get_state_str(), c12->get_remaining(), c34->get_state_str(), c34->get_remaining());
+
+  /* As usual, you don't have to explicitly start communications that were just init()ed: wait() starts them
+   * automatically. */
+  auto c14 = sg4::Comm::sendto_init(h1, h4);
+  c14->set_remaining(100)->wait(); // Chaining 2 operations on this new communication
+}
+
+int main(int argc, char* argv[])
+{
+  sg4::Engine e(&argc, argv);
+  /* argv[1] is read unconditionally below, so refuse to run without a platform file. The check comes after the Engine
+   * constructor, which takes argc by pointer and may thus strip the SimGrid-specific flags from the command line. */
+  xbt_assert(argc > 1, "Usage: %s platform_file\n", argv[0]);
+  e.load_platform(argv[1]);
+
+  /* A single actor, located on Tremblay, issues all the communications of this example */
+  sg4::Actor::create("sender", sg4::Host::by_name("Tremblay"), sender, sg4::Host::by_name("Tremblay"),
+                     sg4::Host::by_name("Jupiter"), sg4::Host::by_name("Fafard"), sg4::Host::by_name("Ginette"));
+
+  e.run();
+
+  XBT_INFO("Total simulation time: %.3f", e.get_clock());
+
+  return 0;
+}
diff --git a/examples/s4u/comm-host2host/s4u-comm-host2host.tesh b/examples/s4u/comm-host2host/s4u-comm-host2host.tesh
new file mode 100644 (file)
index 0000000..5f3fc03
--- /dev/null
@@ -0,0 +1,12 @@
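+#!/usr/bin/env tesh
+
+# FIXME: the expected output below was copied from a ping-pong example (actors pinger/ponger exchanging through
+# mailboxes). It cannot match s4u-comm-host2host, whose only actor is "sender" and which logs the states of c12/c34.
+# Regenerate it from an actual run of this example.
+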
+$ ${bindir:=.}/s4u-comm-host2host ${platfdir}/small_platform.xml "--log=root.fmt:[%10.6r]%e(%i:%P@%h)%e%m%n"
+> [  0.000000] (1:pinger@Tremblay) Ping from mailbox Mailbox 1 to mailbox Mailbox 2
+> [  0.000000] (2:ponger@Jupiter) Pong from mailbox Mailbox 2 to mailbox Mailbox 1
+> [  0.019014] (2:ponger@Jupiter) Payload received : small communication (latency bound)
+> [  0.019014] (2:ponger@Jupiter) Ping time (latency bound) 0.019014
+> [  0.019014] (2:ponger@Jupiter) payload = 0.019
+> [150.178356] (1:pinger@Tremblay) Payload received : large communication (bandwidth bound)
+> [150.178356] (1:pinger@Tremblay) Pong time (bandwidth bound): 150.159
+> [150.178356] (0:maestro@) Total simulation time: 150.178
+
index e4fb4c8..1881740 100644 (file)
@@ -87,6 +87,8 @@ public:
   virtual Activity* cancel() = 0;
   /** Retrieve the current state of the activity */
   Activity::State get_state() const { return state_; }
+  /** Return a string representation of the activity's state (one of INITED, STARTING, STARTED, CANCELED, FINISHED) */
+  const char* get_state_str();
   void set_state(Activity::State state) { state_ = state; }
   /** Tests whether the given activity is terminated yet. */
   virtual bool test();
index a49d52a..86bbc5c 100644 (file)
@@ -20,8 +20,10 @@ namespace s4u {
  */
 class XBT_PUBLIC Comm : public Activity_T<Comm> {
   Mailbox* mailbox_                   = nullptr;
-  kernel::actor::ActorImpl* sender_   = nullptr;
+  kernel::actor::ActorImpl* sender_   = nullptr; /* specified for normal mailbox-based communications*/
   kernel::actor::ActorImpl* receiver_ = nullptr;
+  Host* from_                         = nullptr; /* specified only for direct host-to-host communications */
+  Host* to_                           = nullptr;
   double rate_                        = -1;
   void* dst_buff_                     = nullptr;
   size_t dst_buff_size_               = 0;
@@ -42,6 +44,12 @@ public:
 
   ~Comm() override;
 
+  /*! Creates a communication between the two given hosts, bypassing the mailbox mechanism. */
+  static CommPtr sendto_init(Host* from, Host* to);
+  /*! Creates and starts a communication of the given amount of bytes between the two given hosts, bypassing the
+   * mailbox mechanism. */
+  static CommPtr sendto_async(Host* from, Host* to, double simulated_size_in_bytes);
+
   static xbt::signal<void(Comm const&, bool is_sender)> on_start;
   static xbt::signal<void(Comm const&)> on_completion;
 
index c6691d7..d8b4d43 100644 (file)
@@ -173,7 +173,7 @@ public:
    * There is really no limit on the hosts involved. In particular, the actor does not have to be on one of the involved
    * hosts.
    */
-  ActivityPtr sendto_async(Host* dest, double byte_amount);
+  CommPtr sendto_async(Host* dest, double byte_amount);
 
 #ifndef DOXYGEN
   XBT_ATTRIB_DEPRECATED_v330("Please use Host::sendto()") void send_to(Host* dest, double byte_amount)
index 06a44e2..cd4b557 100644 (file)
@@ -40,6 +40,47 @@ double ActivityImpl::get_remaining() const
   return surf_action_ ? surf_action_->get_remains() : 0;
 }
 
+const char* ActivityImpl::get_state_str()
+{
+  switch (state_) {
+    case State::WAITING:
+      return "WAITING";
+
+    case State::READY:
+      return "READY";
+
+    case State::RUNNING:
+      return "RUNNING";
+
+    case State::CANCELED:
+      return "CANCELED";
+
+    case State::FAILED:
+      return "FAILED";
+
+    case State::DONE:
+      return "DONE";
+
+    case State::SRC_HOST_FAILURE:
+      return "SRC_HOST_FAILURE";
+
+    case State::DST_HOST_FAILURE:
+      return "DST_HOST_FAILURE";
+
+    case State::TIMEOUT:
+      return "TIMEOUT";
+
+    case State::SRC_TIMEOUT:
+      return "SRC_TIMEOUT";
+    case State::DST_TIMEOUT:
+      return "DST_TIMEOUT";
+
+    case State::LINK_FAILURE:
+      return "LINK_FAILURE";
+  }
+  THROW_IMPOSSIBLE;
+}
+
 bool ActivityImpl::test()
 {
   if (state_ != State::WAITING && state_ != State::RUNNING) {
index 853d3dc..dc4c84f 100644 (file)
@@ -73,6 +73,7 @@ public:
   virtual void register_simcall(smx_simcall_t simcall);
   void clean_action();
   virtual double get_remaining() const;
+  const char* get_state_str();
   // Support for the boost::intrusive_ptr<ActivityImpl> datatype
   friend XBT_PUBLIC void intrusive_ptr_add_ref(ActivityImpl* activity);
   friend XBT_PUBLIC void intrusive_ptr_release(ActivityImpl* activity);
index 9f4b218..5836273 100644 (file)
@@ -408,6 +408,11 @@ CommImpl& CommImpl::detach()
   return *this;
 }
 
+/* Constructor used for direct host-to-host communications (s4u::Comm::sendto_init/sendto_async): no actor and no
+ * mailbox are attached, so both endpoints are recorded right away in from_/to_. The communication is created in the
+ * READY state so that start() immediately creates the network action between from_ and to_.
+ * NOTE(review): detached_ is set to true, presumably because no actor owns this communication -- confirm. */
+CommImpl::CommImpl(s4u::Host* from, s4u::Host* to, double bytes) : size_(bytes), detached_(true), from_(from), to_(to)
+{
+  state_ = State::READY; // state_ belongs to ActivityImpl, so it cannot appear in the member-initializer list
+}
+
 CommImpl::~CommImpl()
 {
   XBT_DEBUG("Really free communication %p in state %d (detached = %d)", this, static_cast<int>(state_), detached_);
@@ -430,25 +435,26 @@ CommImpl* CommImpl::start()
 {
   /* If both the sender and the receiver are already there, start the communication */
   if (state_ == State::READY) {
-    s4u::Host* sender   = src_actor_->get_host();
-    s4u::Host* receiver = dst_actor_->get_host();
+    from_ = from_ != nullptr ? from_ : src_actor_->get_host();
+    to_   = to_ != nullptr ? to_ : dst_actor_->get_host();
 
-    surf_action_ = surf_network_model->communicate(sender, receiver, size_, rate_);
+    surf_action_ = surf_network_model->communicate(from_, to_, size_, rate_);
     surf_action_->set_activity(this);
     surf_action_->set_category(get_tracing_category());
     state_ = State::RUNNING;
 
-    XBT_DEBUG("Starting communication %p from '%s' to '%s' (surf_action: %p)", this, sender->get_cname(),
-              receiver->get_cname(), surf_action_);
+    XBT_DEBUG("Starting communication %p from '%s' to '%s' (surf_action: %p; state: %s)", this, from_->get_cname(),
+              to_->get_cname(), surf_action_, get_state_str());
 
     /* If a link is failed, detect it immediately */
     if (surf_action_->get_state() == resource::Action::State::FAILED) {
-      XBT_DEBUG("Communication from '%s' to '%s' failed to start because of a link failure", sender->get_cname(),
-                receiver->get_cname());
+      XBT_DEBUG("Communication from '%s' to '%s' failed to start because of a link failure", from_->get_cname(),
+                to_->get_cname());
       state_ = State::LINK_FAILURE;
       post();
 
-    } else if (src_actor_->is_suspended() || dst_actor_->is_suspended()) {
+    } else if ((src_actor_ != nullptr && src_actor_->is_suspended()) ||
+               (dst_actor_ != nullptr && dst_actor_->is_suspended())) {
       /* If any of the process is suspended, create the synchro but stop its execution,
          it will be restarted when the sender process resume */
       if (src_actor_->is_suspended())
@@ -561,7 +567,7 @@ void CommImpl::post()
   } else
     state_ = State::DONE;
 
-  XBT_DEBUG("SIMIX_post_comm: comm %p, state %d, src_proc %p, dst_proc %p, detached: %d", this, (int)state_,
+  XBT_DEBUG("CommImpl::post(): comm %p, state %s, src_proc %p, dst_proc %p, detached: %d", this, get_state_str(),
             src_actor_.get(), dst_actor_.get(), detached_);
 
   /* destroy the surf actions associated with the Simix communication */
@@ -707,8 +713,10 @@ void CommImpl::finish()
         if (src_actor_)
           src_actor_->activities_.remove(this);
       } else {
-        dst_actor_->activities_.remove(this);
-        src_actor_->activities_.remove(this);
+        if (dst_actor_ != nullptr)
+          dst_actor_->activities_.remove(this);
+        if (src_actor_ != nullptr)
+          src_actor_->activities_.remove(this);
       }
     }
   }
index ece1fdb..c47f748 100644 (file)
@@ -26,6 +26,9 @@ class XBT_PUBLIC CommImpl : public ActivityImpl_T<CommImpl> {
   MailboxImpl* mbox_ = nullptr; /* Rendez-vous where the comm is queued */
 
 public:
+  CommImpl() = default;
+  CommImpl(s4u::Host* from, s4u::Host* to, double bytes);
+
   enum class Type { SEND = 0, RECEIVE, READY, DONE };
 
   CommImpl& set_type(CommImpl::Type type);
@@ -68,6 +71,8 @@ expectations of the other side, too. See  */
   resource::Action* dst_timeout_ = nullptr; /* Surf's actions to instrument the timeouts */
   actor::ActorImplPtr src_actor_ = nullptr;
   actor::ActorImplPtr dst_actor_ = nullptr;
+  s4u::Host* from_               = nullptr; /* Pre-determined only for direct host-to-host communications */
+  s4u::Host* to_                 = nullptr; /* Otherwise, computed at start() time from the actors */
 
   /* Data to be transferred */
   unsigned char* src_buff_ = nullptr;
index 3e3b92f..529d08f 100644 (file)
@@ -65,6 +65,27 @@ Activity* Activity::resume()
   return this;
 }
 
+const char* Activity::get_state_str()
+{
+  /* Human-readable name of the current state, spelled exactly like the State enumerator (handy in log messages) */
+  switch (state_) {
+    case State::INITED:   return "INITED";
+    case State::STARTING: return "STARTING";
+    case State::STARTED:  return "STARTED";
+    case State::CANCELED: return "CANCELED";
+    case State::FINISHED: return "FINISHED";
+  }
+  /* Only reached if state_ holds a value that none of the cases above handles */
+  THROW_IMPOSSIBLE;
+}
+
 double Activity::get_remaining() const
 {
   if (state_ == State::INITED || state_ == State::STARTING)
index 0a2fd1d..f9c3c37 100644 (file)
@@ -115,12 +115,36 @@ CommPtr Comm::set_dst_data(void** buff, size_t size)
   return this;
 }
 
+/* Creates, without starting it, a direct communication from `from` to `to` that bypasses the mailbox mechanism.
+ * Both hosts are mandatory: Comm::start() only takes the host-to-host path when from_ or to_ is set, and would
+ * otherwise handle this Comm as a mailbox-based one although it has no mailbox. Checking here reports the misuse
+ * where it happens instead of at start() time. */
+CommPtr Comm::sendto_init(Host* from, Host* to)
+{
+  xbt_assert(from != nullptr && to != nullptr,
+             "Comm::sendto_init(): both the source and the destination hosts must be specified.");
+  CommPtr res(new Comm());
+  res->from_ = from;
+  res->to_   = to;
+
+  return res;
+}
+
+CommPtr Comm::sendto_async(Host* from, Host* to, double simulated_size_in_bytes)
+{
+  /* Same as sendto_init(), but the amount of bytes is set and the communication started before returning it */
+  CommPtr comm = sendto_init(from, to);
+  comm->set_remaining(simulated_size_in_bytes);
+  comm->start();
+  return comm;
+}
+
 Comm* Comm::start()
 {
   xbt_assert(get_state() == State::INITED || get_state() == State::STARTING,
              "You cannot use %s() once your communication started (not implemented)", __FUNCTION__);
-
-  if (src_buff_ != nullptr) { // Sender side
+  if (from_ != nullptr || to_ != nullptr) {
+    xbt_assert(from_ != nullptr && to_ != nullptr, "When either from_ or to_ is specified, both must be.");
+    xbt_assert(src_buff_ == nullptr && dst_buff_ == nullptr,
+               "Direct host-to-host communications cannot carry any data.");
+    pimpl_ = kernel::actor::simcall([this] {
+      auto res = new kernel::activity::CommImpl(this->from_, this->to_, this->get_remaining());
+      res->start();
+      return res;
+    });
+
+  } else if (src_buff_ != nullptr) { // Sender side
     on_start(*this, true /* is_sender*/);
     pimpl_ = simcall_comm_isend(sender_, mailbox_->get_impl(), remains_, rate_, src_buff_, src_buff_size_, match_fun_,
                                 clean_fun_, copy_data_function_, get_user_data(), detached_);
@@ -160,8 +184,10 @@ Comm* Comm::wait_for(double timeout)
       break;
 
     case State::INITED:
-    case State::STARTING: // It's not started yet. Do it in one simcall
-      if (src_buff_ != nullptr) {
+    case State::STARTING: // It's not started yet. Do it in one simcall if it's a regular communication
+      if (from_ != nullptr || to_ != nullptr) {
+        return start()->wait_for(timeout); // In the case of host2host comm, do it in two simcalls
+      } else if (src_buff_ != nullptr) {
         on_start(*this, true /*is_sender*/);
         simcall_comm_send(sender_, mailbox_->get_impl(), remains_, rate_, src_buff_, src_buff_size_, match_fun_,
                           copy_data_function_, get_user_data(), timeout);
index 02e6e33..8a70f85 100644 (file)
@@ -6,6 +6,7 @@
 #include "simgrid/host.h"
 #include "simgrid/kernel/routing/NetPoint.hpp"
 #include "simgrid/s4u/Actor.hpp"
+#include "simgrid/s4u/Comm.hpp"
 #include "simgrid/s4u/Engine.hpp"
 #include "simgrid/s4u/Exec.hpp"
 #include "simgrid/s4u/VirtualMachine.hpp"
@@ -180,12 +181,9 @@ void Host::sendto(Host* dest, double byte_amount)
   sendto_async(dest, byte_amount)->wait();
 }
 
-ActivityPtr Host::sendto_async(Host* dest, double byte_amount)
+/* Starts transferring byte_amount bytes from this host to dest, and returns that communication without waiting for it */
+CommPtr Host::sendto_async(Host* dest, double byte_amount)
 {
-  std::vector<Host*> m_host_list   = {this, dest};
-  std::vector<double> flops_amount = {0, 0};
-  std::vector<double> bytes_amount = {0, byte_amount, 0, 0};
-  return this_actor::exec_init(m_host_list, flops_amount, bytes_amount)->start();
+  // Direct host-to-host Comm (previously emulated with a parallel Exec task that computes nothing)
+  return Comm::sendto_async(this, dest, byte_amount);
 }
 
 /** Get the properties assigned to a host */