From b1020c2311048f26a586af241101434faa59445b Mon Sep 17 00:00:00 2001 From: Martin Quinson Date: Wed, 20 Jan 2021 17:47:32 +0100 Subject: [PATCH] New functions: Comm::sendto_{init,async} Along with a new example. --- ChangeLog | 2 + MANIFEST.in | 4 +- examples/s4u/CMakeLists.txt | 4 +- .../s4u/comm-host2host/s4u-comm-host2host.cpp | 67 +++++++++++++++++++ .../comm-host2host/s4u-comm-host2host.tesh | 12 ++++ include/simgrid/s4u/Activity.hpp | 2 + include/simgrid/s4u/Comm.hpp | 10 ++- include/simgrid/s4u/Host.hpp | 2 +- src/kernel/activity/ActivityImpl.cpp | 41 ++++++++++++ src/kernel/activity/ActivityImpl.hpp | 1 + src/kernel/activity/CommImpl.cpp | 30 ++++++--- src/kernel/activity/CommImpl.hpp | 5 ++ src/s4u/s4u_Activity.cpp | 21 ++++++ src/s4u/s4u_Comm.cpp | 34 ++++++++-- src/s4u/s4u_Host.cpp | 8 +-- 15 files changed, 218 insertions(+), 25 deletions(-) create mode 100644 examples/s4u/comm-host2host/s4u-comm-host2host.cpp create mode 100644 examples/s4u/comm-host2host/s4u-comm-host2host.tesh diff --git a/ChangeLog b/ChangeLog index b1684e8c58..64b9124bfb 100644 --- a/ChangeLog +++ b/ChangeLog @@ -15,6 +15,8 @@ S4U: or Mailbox::get_async() if you really want to play with void*. - Unify the interface of Activity::on_{start/activity} - New function: Comm::get_dst_data() + - New functions: Comm::sendto_{init,async} to initiate a communication + on between two (possibly remote) hosts. ---------------------------------------------------------------------------- diff --git a/MANIFEST.in b/MANIFEST.in index 37aa366127..0a79b29dc2 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -368,6 +368,8 @@ include examples/s4u/cloud-simple/s4u-cloud-simple.cpp include examples/s4u/cloud-simple/s4u-cloud-simple.tesh include examples/s4u/comm-dependent/s4u-comm-dependent.cpp include examples/s4u/comm-dependent/s4u-comm-dependent.tesh +include examples/s4u/comm-host2host/s4u-comm-host2host.cpp +include examples/s4u/comm-host2host/s4u-comm-host2host.tesh include examples/s4u/comm-ready/s4u-comm-ready.cpp include examples/s4u/comm-ready/s4u-comm-ready.tesh include examples/s4u/comm-ready/s4u-comm-ready_d.xml @@ -2302,7 +2304,6 @@ include src/mc/inspect/mc_unw.hpp include src/mc/inspect/mc_unw_vmread.cpp include src/mc/mc_api.cpp include src/mc/mc_api.hpp -include src/mc/udpor_global.hpp include src/mc/mc_base.cpp include src/mc/mc_base.h include src/mc/mc_client_api.cpp @@ -2349,6 +2350,7 @@ include src/mc/sosp/Region.hpp include src/mc/sosp/Snapshot.cpp include src/mc/sosp/Snapshot.hpp include src/mc/sosp/Snapshot_test.cpp +include src/mc/udpor_global.hpp include src/msg/msg_comm.cpp include src/msg/msg_global.cpp include src/msg/msg_legacy.cpp diff --git a/examples/s4u/CMakeLists.txt b/examples/s4u/CMakeLists.txt index 1e9d5603c6..b83673b633 100644 --- a/examples/s4u/CMakeLists.txt +++ b/examples/s4u/CMakeLists.txt @@ -63,7 +63,7 @@ foreach (example actor-create actor-daemon actor-exiting actor-join actor-kill actor-lifetime actor-migrate actor-suspend actor-yield actor-stacksize app-bittorrent app-chainsend app-pingpong app-token-ring comm-ready comm-suspend comm-wait comm-waitany comm-waitall comm-waituntil - comm-dependent + comm-dependent comm-host2host cloud-capping cloud-migration cloud-simple dht-chord dht-kademlia energy-exec energy-boot energy-link energy-vm energy-exec-ptask energy-wifi @@ -71,7 +71,7 @@ foreach (example actor-create actor-daemon actor-exiting actor-join actor-kill exec-async exec-basic exec-dvfs exec-ptask exec-remote exec-waitany exec-waitfor exec-dependent maestro-set mc-bugged1 mc-bugged2 mc-electric-fence mc-failing-assert - network-wifi + network-wifi io-async io-file-system io-file-remote io-disk-raw io-dependent platform-failures platform-profile platform-properties plugin-host-load plugin-link-load diff --git a/examples/s4u/comm-host2host/s4u-comm-host2host.cpp b/examples/s4u/comm-host2host/s4u-comm-host2host.cpp new file mode 100644 index 0000000000..790b9cb7ab --- /dev/null +++ b/examples/s4u/comm-host2host/s4u-comm-host2host.cpp @@ -0,0 +1,67 @@ +/* Copyright (c) 2007-2021. The SimGrid Team. All rights reserved. */ + +/* This program is free software; you can redistribute it and/or modify it + * under the terms of the license (GNU LGPL) which comes with this package. */ + +/* This simple example demonstrates the Comm::sento_init() Comm::sento_async() functions, + that can be used to create a direct communication from one host to another without + relying on the mailbox mechanism. + + There is not much to say, actually: The _init variant creates the communication and + leaves it unstarted (in case you want to modify this communication before it starts), + while the _async variant creates and start it. In both cases, you need to wait() it. + + It is mostly useful when you want to have a centralized simulation of your settings, + with a central actor declaring all communications occuring on your distributed system. + */ + +#include +namespace sg4 = simgrid::s4u; + +XBT_LOG_NEW_DEFAULT_CATEGORY(s4u_comm_host2host, "Messages specific for this s4u example"); + +static void sender(sg4::Host* h1, sg4::Host* h2, sg4::Host* h3, sg4::Host* h4) +{ + XBT_INFO("Send c12 with sendto_async(%s -> %s), and c34 with sendto_init(%s -> %s)", h1->get_cname(), h2->get_cname(), + h3->get_cname(), h4->get_cname()); + + auto c12 = sg4::Comm::sendto_async(h1, h2, 1.5e7); // Creates and start a direct communication + + auto c34 = sg4::Comm::sendto_init(h3, h4); // Creates but do not start another direct communication + c34->set_remaining(1e7); // Specify the amount of bytes to exchange in this comm + + XBT_INFO("After creation, c12 is %s (remaining: %.2e bytes); c34 is %s (remaining: %.2e bytes)", + c12->get_state_str(), c12->get_remaining(), c34->get_state_str(), c34->get_remaining()); + sg4::this_actor::sleep_for(1); + XBT_INFO("One sec later, c12 is %s (remaining: %.2e bytes); c34 is %s (remaining: %.2e bytes)", + c12->get_state_str(), c12->get_remaining(), c34->get_state_str(), c34->get_remaining()); + c34->start(); + XBT_INFO("After c34->start,c12 is %s (remaining: %.2e bytes); c34 is %s (remaining: %.2e bytes)", + c12->get_state_str(), c12->get_remaining(), c34->get_state_str(), c34->get_remaining()); + c12->wait(); + XBT_INFO("After c12->wait, c12 is %s (remaining: %.2e bytes); c34 is %s (remaining: %.2e bytes)", + c12->get_state_str(), c12->get_remaining(), c34->get_state_str(), c34->get_remaining()); + c34->wait(); + XBT_INFO("After c34->wait, c12 is %s (remaining: %.2e bytes); c34 is %s (remaining: %.2e bytes)", + c12->get_state_str(), c12->get_remaining(), c34->get_state_str(), c34->get_remaining()); + + /* As usual, you don't have to explicitly start communications that were just init()ed. The wait() will start it + * automatically. */ + auto c14 = sg4::Comm::sendto_init(h1, h4); + c14->set_remaining(100)->wait(); // Chaining 2 operations on this new communication +} + +int main(int argc, char* argv[]) +{ + sg4::Engine e(&argc, argv); + e.load_platform(argv[1]); + + sg4::Actor::create("sender", sg4::Host::by_name("Tremblay"), sender, sg4::Host::by_name("Tremblay"), + sg4::Host::by_name("Jupiter"), sg4::Host::by_name("Fafard"), sg4::Host::by_name("Ginette")); + + e.run(); + + XBT_INFO("Total simulation time: %.3f", e.get_clock()); + + return 0; +} diff --git a/examples/s4u/comm-host2host/s4u-comm-host2host.tesh b/examples/s4u/comm-host2host/s4u-comm-host2host.tesh new file mode 100644 index 0000000000..5f3fc03bf2 --- /dev/null +++ b/examples/s4u/comm-host2host/s4u-comm-host2host.tesh @@ -0,0 +1,12 @@ +#!/usr/bin/env tesh + +$ ${bindir:=.}/s4u-comm-host2host ${platfdir}/small_platform.xml "--log=root.fmt:[%10.6r]%e(%i:%P@%h)%e%m%n" +> [ 0.000000] (1:pinger@Tremblay) Ping from mailbox Mailbox 1 to mailbox Mailbox 2 +> [ 0.000000] (2:ponger@Jupiter) Pong from mailbox Mailbox 2 to mailbox Mailbox 1 +> [ 0.019014] (2:ponger@Jupiter) Payload received : small communication (latency bound) +> [ 0.019014] (2:ponger@Jupiter) Ping time (latency bound) 0.019014 +> [ 0.019014] (2:ponger@Jupiter) payload = 0.019 +> [150.178356] (1:pinger@Tremblay) Payload received : large communication (bandwidth bound) +> [150.178356] (1:pinger@Tremblay) Pong time (bandwidth bound): 150.159 +> [150.178356] (0:maestro@) Total simulation time: 150.178 + diff --git a/include/simgrid/s4u/Activity.hpp b/include/simgrid/s4u/Activity.hpp index e4fb4c8500..1881740875 100644 --- a/include/simgrid/s4u/Activity.hpp +++ b/include/simgrid/s4u/Activity.hpp @@ -87,6 +87,8 @@ public: virtual Activity* cancel() = 0; /** Retrieve the current state of the activity */ Activity::State get_state() const { return state_; } + /** Return a string representation of the activity's state (one of INITED, STARTING, STARTED, CANCELED, FINISHED) */ + const char* get_state_str(); void set_state(Activity::State state) { state_ = state; } /** Tests whether the given activity is terminated yet. */ virtual bool test(); diff --git a/include/simgrid/s4u/Comm.hpp b/include/simgrid/s4u/Comm.hpp index a49d52a1d7..86bbc5c51e 100644 --- a/include/simgrid/s4u/Comm.hpp +++ b/include/simgrid/s4u/Comm.hpp @@ -20,8 +20,10 @@ namespace s4u { */ class XBT_PUBLIC Comm : public Activity_T { Mailbox* mailbox_ = nullptr; - kernel::actor::ActorImpl* sender_ = nullptr; + kernel::actor::ActorImpl* sender_ = nullptr; /* specified for normal mailbox-based communications*/ kernel::actor::ActorImpl* receiver_ = nullptr; + Host* from_ = nullptr; /* specified only for direct host-to-host communications */ + Host* to_ = nullptr; double rate_ = -1; void* dst_buff_ = nullptr; size_t dst_buff_size_ = 0; @@ -42,6 +44,12 @@ public: ~Comm() override; + /*! Creates a communication beween the two given hosts, bypassing the mailbox mechanism. */ + static CommPtr sendto_init(Host* from, Host* to); + /*! Creates and start a communication of the given amount of bytes beween the two given hosts, bypassing the mailbox + * mechanism */ + static CommPtr sendto_async(Host* from, Host* to, double simulated_size_in_bytes); + static xbt::signal on_start; static xbt::signal on_completion; diff --git a/include/simgrid/s4u/Host.hpp b/include/simgrid/s4u/Host.hpp index c6691d751a..d8b4d43d41 100644 --- a/include/simgrid/s4u/Host.hpp +++ b/include/simgrid/s4u/Host.hpp @@ -173,7 +173,7 @@ public: * There is really no limit on the hosts involved. In particular, the actor does not have to be on one of the involved * hosts. */ - ActivityPtr sendto_async(Host* dest, double byte_amount); + CommPtr sendto_async(Host* dest, double byte_amount); #ifndef DOXYGEN XBT_ATTRIB_DEPRECATED_v330("Please use Host::sendto()") void send_to(Host* dest, double byte_amount) diff --git a/src/kernel/activity/ActivityImpl.cpp b/src/kernel/activity/ActivityImpl.cpp index 06a44e25d2..cd4b5570d6 100644 --- a/src/kernel/activity/ActivityImpl.cpp +++ b/src/kernel/activity/ActivityImpl.cpp @@ -40,6 +40,47 @@ double ActivityImpl::get_remaining() const return surf_action_ ? surf_action_->get_remains() : 0; } +const char* ActivityImpl::get_state_str() +{ + switch (state_) { + case State::WAITING: + return "WAITING"; + + case State::READY: + return "READY"; + + case State::RUNNING: + return "RUNNING"; + + case State::CANCELED: + return "CANCELED"; + + case State::FAILED: + return "FAILED"; + + case State::DONE: + return "DONE"; + + case State::SRC_HOST_FAILURE: + return "SRC_HOST_FAILURE"; + + case State::DST_HOST_FAILURE: + return "DST_HOST_FAILURE"; + + case State::TIMEOUT: + return "TIMEOUT"; + + case State::SRC_TIMEOUT: + return "SRC_TIMEOUT"; + case State::DST_TIMEOUT: + return "DST_TIMEOUT"; + + case State::LINK_FAILURE: + return "LINK_FAILURE"; + } + THROW_IMPOSSIBLE; +} + bool ActivityImpl::test() { if (state_ != State::WAITING && state_ != State::RUNNING) { diff --git a/src/kernel/activity/ActivityImpl.hpp b/src/kernel/activity/ActivityImpl.hpp index 853d3dc967..dc4c84f10c 100644 --- a/src/kernel/activity/ActivityImpl.hpp +++ b/src/kernel/activity/ActivityImpl.hpp @@ -73,6 +73,7 @@ public: virtual void register_simcall(smx_simcall_t simcall); void clean_action(); virtual double get_remaining() const; + const char* get_state_str(); // Support for the boost::intrusive_ptr datatype friend XBT_PUBLIC void intrusive_ptr_add_ref(ActivityImpl* activity); friend XBT_PUBLIC void intrusive_ptr_release(ActivityImpl* activity); diff --git a/src/kernel/activity/CommImpl.cpp b/src/kernel/activity/CommImpl.cpp index 9f4b218e56..5836273e1d 100644 --- a/src/kernel/activity/CommImpl.cpp +++ b/src/kernel/activity/CommImpl.cpp @@ -408,6 +408,11 @@ CommImpl& CommImpl::detach() return *this; } +CommImpl::CommImpl(s4u::Host* from, s4u::Host* to, double bytes) : size_(bytes), detached_(true), from_(from), to_(to) +{ + state_ = State::READY; +} + CommImpl::~CommImpl() { XBT_DEBUG("Really free communication %p in state %d (detached = %d)", this, static_cast(state_), detached_); @@ -430,25 +435,26 @@ CommImpl* CommImpl::start() { /* If both the sender and the receiver are already there, start the communication */ if (state_ == State::READY) { - s4u::Host* sender = src_actor_->get_host(); - s4u::Host* receiver = dst_actor_->get_host(); + from_ = from_ != nullptr ? from_ : src_actor_->get_host(); + to_ = to_ != nullptr ? to_ : dst_actor_->get_host(); - surf_action_ = surf_network_model->communicate(sender, receiver, size_, rate_); + surf_action_ = surf_network_model->communicate(from_, to_, size_, rate_); surf_action_->set_activity(this); surf_action_->set_category(get_tracing_category()); state_ = State::RUNNING; - XBT_DEBUG("Starting communication %p from '%s' to '%s' (surf_action: %p)", this, sender->get_cname(), - receiver->get_cname(), surf_action_); + XBT_DEBUG("Starting communication %p from '%s' to '%s' (surf_action: %p; state: %s)", this, from_->get_cname(), + to_->get_cname(), surf_action_, get_state_str()); /* If a link is failed, detect it immediately */ if (surf_action_->get_state() == resource::Action::State::FAILED) { - XBT_DEBUG("Communication from '%s' to '%s' failed to start because of a link failure", sender->get_cname(), - receiver->get_cname()); + XBT_DEBUG("Communication from '%s' to '%s' failed to start because of a link failure", from_->get_cname(), + to_->get_cname()); state_ = State::LINK_FAILURE; post(); - } else if (src_actor_->is_suspended() || dst_actor_->is_suspended()) { + } else if ((src_actor_ != nullptr && src_actor_->is_suspended()) || + (dst_actor_ != nullptr && dst_actor_->is_suspended())) { /* If any of the process is suspended, create the synchro but stop its execution, it will be restarted when the sender process resume */ if (src_actor_->is_suspended()) @@ -561,7 +567,7 @@ void CommImpl::post() } else state_ = State::DONE; - XBT_DEBUG("SIMIX_post_comm: comm %p, state %d, src_proc %p, dst_proc %p, detached: %d", this, (int)state_, + XBT_DEBUG("CommImpl::post(): comm %p, state %s, src_proc %p, dst_proc %p, detached: %d", this, get_state_str(), src_actor_.get(), dst_actor_.get(), detached_); /* destroy the surf actions associated with the Simix communication */ @@ -707,8 +713,10 @@ void CommImpl::finish() if (src_actor_) src_actor_->activities_.remove(this); } else { - dst_actor_->activities_.remove(this); - src_actor_->activities_.remove(this); + if (dst_actor_ != nullptr) + dst_actor_->activities_.remove(this); + if (src_actor_ != nullptr) + src_actor_->activities_.remove(this); } } } diff --git a/src/kernel/activity/CommImpl.hpp b/src/kernel/activity/CommImpl.hpp index ece1fdb6ef..c47f7489ff 100644 --- a/src/kernel/activity/CommImpl.hpp +++ b/src/kernel/activity/CommImpl.hpp @@ -26,6 +26,9 @@ class XBT_PUBLIC CommImpl : public ActivityImpl_T { MailboxImpl* mbox_ = nullptr; /* Rendez-vous where the comm is queued */ public: + CommImpl() = default; + CommImpl(s4u::Host* from, s4u::Host* to, double bytes); + enum class Type { SEND = 0, RECEIVE, READY, DONE }; CommImpl& set_type(CommImpl::Type type); @@ -68,6 +71,8 @@ expectations of the other side, too. See */ resource::Action* dst_timeout_ = nullptr; /* Surf's actions to instrument the timeouts */ actor::ActorImplPtr src_actor_ = nullptr; actor::ActorImplPtr dst_actor_ = nullptr; + s4u::Host* from_ = nullptr; /* Pre-determined only for direct host-to-host communications */ + s4u::Host* to_ = nullptr; /* Otherwise, computed at start() time from the actors */ /* Data to be transferred */ unsigned char* src_buff_ = nullptr; diff --git a/src/s4u/s4u_Activity.cpp b/src/s4u/s4u_Activity.cpp index 3e3b92f030..529d08f471 100644 --- a/src/s4u/s4u_Activity.cpp +++ b/src/s4u/s4u_Activity.cpp @@ -65,6 +65,27 @@ Activity* Activity::resume() return this; } +const char* Activity::get_state_str() +{ + switch (state_) { + case State::INITED: + return "INITED"; + + case State::STARTING: + return "STARTING"; + + case State::STARTED: + return "STARTED"; + + case State::CANCELED: + return "CANCELED"; + + case State::FINISHED: + return "FINISHED"; + } + THROW_IMPOSSIBLE; +} + double Activity::get_remaining() const { if (state_ == State::INITED || state_ == State::STARTING) diff --git a/src/s4u/s4u_Comm.cpp b/src/s4u/s4u_Comm.cpp index 0a2fd1d604..f9c3c37dea 100644 --- a/src/s4u/s4u_Comm.cpp +++ b/src/s4u/s4u_Comm.cpp @@ -115,12 +115,36 @@ CommPtr Comm::set_dst_data(void** buff, size_t size) return this; } +CommPtr Comm::sendto_init(Host* from, Host* to) +{ + CommPtr res(new Comm()); + res->from_ = from; + res->to_ = to; + + return res; +} +CommPtr Comm::sendto_async(Host* from, Host* to, double simulated_size_in_bytes) +{ + auto res = Comm::sendto_init(from, to); + res->set_remaining(simulated_size_in_bytes)->start(); + return res; +} + Comm* Comm::start() { xbt_assert(get_state() == State::INITED || get_state() == State::STARTING, "You cannot use %s() once your communication started (not implemented)", __FUNCTION__); - - if (src_buff_ != nullptr) { // Sender side + if (from_ != nullptr || to_ != nullptr) { + xbt_assert(from_ != nullptr && to_ != nullptr, "When either from_ or to_ is specified, both must be."); + xbt_assert(src_buff_ == nullptr && dst_buff_ == nullptr, + "Direct host-to-host communications cannot carry any data."); + pimpl_ = kernel::actor::simcall([this] { + auto res = new kernel::activity::CommImpl(this->from_, this->to_, this->get_remaining()); + res->start(); + return res; + }); + + } else if (src_buff_ != nullptr) { // Sender side on_start(*this, true /* is_sender*/); pimpl_ = simcall_comm_isend(sender_, mailbox_->get_impl(), remains_, rate_, src_buff_, src_buff_size_, match_fun_, clean_fun_, copy_data_function_, get_user_data(), detached_); @@ -160,8 +184,10 @@ Comm* Comm::wait_for(double timeout) break; case State::INITED: - case State::STARTING: // It's not started yet. Do it in one simcall - if (src_buff_ != nullptr) { + case State::STARTING: // It's not started yet. Do it in one simcall if it's a regular communication + if (from_ != nullptr || to_ != nullptr) { + return start()->wait_for(timeout); // In the case of host2host comm, do it in two simcalls + } else if (src_buff_ != nullptr) { on_start(*this, true /*is_sender*/); simcall_comm_send(sender_, mailbox_->get_impl(), remains_, rate_, src_buff_, src_buff_size_, match_fun_, copy_data_function_, get_user_data(), timeout); diff --git a/src/s4u/s4u_Host.cpp b/src/s4u/s4u_Host.cpp index 02e6e33222..8a70f85374 100644 --- a/src/s4u/s4u_Host.cpp +++ b/src/s4u/s4u_Host.cpp @@ -6,6 +6,7 @@ #include "simgrid/host.h" #include "simgrid/kernel/routing/NetPoint.hpp" #include "simgrid/s4u/Actor.hpp" +#include "simgrid/s4u/Comm.hpp" #include "simgrid/s4u/Engine.hpp" #include "simgrid/s4u/Exec.hpp" #include "simgrid/s4u/VirtualMachine.hpp" @@ -180,12 +181,9 @@ void Host::sendto(Host* dest, double byte_amount) sendto_async(dest, byte_amount)->wait(); } -ActivityPtr Host::sendto_async(Host* dest, double byte_amount) +CommPtr Host::sendto_async(Host* dest, double byte_amount) { - std::vector m_host_list = {this, dest}; - std::vector flops_amount = {0, 0}; - std::vector bytes_amount = {0, byte_amount, 0, 0}; - return this_actor::exec_init(m_host_list, flops_amount, bytes_amount)->start(); + return Comm::sendto_async(this, dest, byte_amount); } /** Get the properties assigned to a host */ -- 2.20.1