Along with a new example.
or Mailbox::get_async<void>() if you really want to play with void*.
- Unify the interface of Activity::on_{start/activity}
- New function: Comm::get_dst_data()
+ - New functions: Comm::sendto_{init,async} to initiate a communication
+ between two (possibly remote) hosts.
----------------------------------------------------------------------------
include examples/s4u/cloud-simple/s4u-cloud-simple.tesh
include examples/s4u/comm-dependent/s4u-comm-dependent.cpp
include examples/s4u/comm-dependent/s4u-comm-dependent.tesh
+include examples/s4u/comm-host2host/s4u-comm-host2host.cpp
+include examples/s4u/comm-host2host/s4u-comm-host2host.tesh
include examples/s4u/comm-ready/s4u-comm-ready.cpp
include examples/s4u/comm-ready/s4u-comm-ready.tesh
include examples/s4u/comm-ready/s4u-comm-ready_d.xml
include src/mc/inspect/mc_unw_vmread.cpp
include src/mc/mc_api.cpp
include src/mc/mc_api.hpp
-include src/mc/udpor_global.hpp
include src/mc/mc_base.cpp
include src/mc/mc_base.h
include src/mc/mc_client_api.cpp
include src/mc/sosp/Snapshot.cpp
include src/mc/sosp/Snapshot.hpp
include src/mc/sosp/Snapshot_test.cpp
+include src/mc/udpor_global.hpp
include src/msg/msg_comm.cpp
include src/msg/msg_global.cpp
include src/msg/msg_legacy.cpp
actor-lifetime actor-migrate actor-suspend actor-yield actor-stacksize
app-bittorrent app-chainsend app-pingpong app-token-ring
comm-ready comm-suspend comm-wait comm-waitany comm-waitall comm-waituntil
- comm-dependent
+ comm-dependent comm-host2host
cloud-capping cloud-migration cloud-simple
dht-chord dht-kademlia
energy-exec energy-boot energy-link energy-vm energy-exec-ptask energy-wifi
exec-async exec-basic exec-dvfs exec-ptask exec-remote exec-waitany exec-waitfor exec-dependent
maestro-set
mc-bugged1 mc-bugged2 mc-electric-fence mc-failing-assert
- network-wifi
+ network-wifi
io-async io-file-system io-file-remote io-disk-raw io-dependent
platform-failures platform-profile platform-properties
plugin-host-load plugin-link-load
--- /dev/null
+/* Copyright (c) 2007-2021. The SimGrid Team. All rights reserved. */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
+/* This simple example demonstrates the Comm::sendto_init() and Comm::sendto_async() functions,
+ that can be used to create a direct communication from one host to another without
+ relying on the mailbox mechanism.
+
+ There is not much to say, actually: The _init variant creates the communication and
+ leaves it unstarted (in case you want to modify this communication before it starts),
+ while the _async variant creates and starts it. In both cases, you need to wait() for it.
+
+ It is mostly useful when you want to have a centralized simulation of your settings,
+ with a central actor declaring all communications occurring on your distributed system.
+ */
+
+#include <simgrid/s4u.hpp>
+namespace sg4 = simgrid::s4u;
+
+XBT_LOG_NEW_DEFAULT_CATEGORY(s4u_comm_host2host, "Messages specific for this s4u example");
+
+/* Actor body: issues two direct host-to-host communications (h1 -> h2 and h3 -> h4), logging the state and the
+ * remaining bytes of both at each step, then performs a last one (h1 -> h4) that is only started by its wait(). */
+static void sender(sg4::Host* h1, sg4::Host* h2, sg4::Host* h3, sg4::Host* h4)
+{
+ XBT_INFO("Send c12 with sendto_async(%s -> %s), and c34 with sendto_init(%s -> %s)", h1->get_cname(), h2->get_cname(),
+ h3->get_cname(), h4->get_cname());
+
+ auto c12 = sg4::Comm::sendto_async(h1, h2, 1.5e7); // Creates and starts a direct communication
+
+ auto c34 = sg4::Comm::sendto_init(h3, h4); // Creates but does not start another direct communication
+ c34->set_remaining(1e7); // Specify the amount of bytes to exchange in this comm
+
+ XBT_INFO("After creation, c12 is %s (remaining: %.2e bytes); c34 is %s (remaining: %.2e bytes)",
+ c12->get_state_str(), c12->get_remaining(), c34->get_state_str(), c34->get_remaining());
+ // Let some time pass before reporting again: c34 has still not been start()ed at that point
+ sg4::this_actor::sleep_for(1);
+ XBT_INFO("One sec later, c12 is %s (remaining: %.2e bytes); c34 is %s (remaining: %.2e bytes)",
+ c12->get_state_str(), c12->get_remaining(), c34->get_state_str(), c34->get_remaining());
+ // Explicitly start the communication that was only init()ed so far
+ c34->start();
+ XBT_INFO("After c34->start,c12 is %s (remaining: %.2e bytes); c34 is %s (remaining: %.2e bytes)",
+ c12->get_state_str(), c12->get_remaining(), c34->get_state_str(), c34->get_remaining());
+ // Block on each communication in turn, reporting the status of both after each wait
+ c12->wait();
+ XBT_INFO("After c12->wait, c12 is %s (remaining: %.2e bytes); c34 is %s (remaining: %.2e bytes)",
+ c12->get_state_str(), c12->get_remaining(), c34->get_state_str(), c34->get_remaining());
+ c34->wait();
+ XBT_INFO("After c34->wait, c12 is %s (remaining: %.2e bytes); c34 is %s (remaining: %.2e bytes)",
+ c12->get_state_str(), c12->get_remaining(), c34->get_state_str(), c34->get_remaining());
+
+ /* As usual, you don't have to explicitly start communications that were just init()ed. The wait() will start them
+ * automatically. */
+ auto c14 = sg4::Comm::sendto_init(h1, h4);
+ c14->set_remaining(100)->wait(); // Chaining 2 operations on this new communication
+}
+
+/* Entry point: loads the platform file given as first argument, creates the "sender" actor on host Tremblay with the
+ * four hosts used as communication endpoints, runs the simulation and logs the final simulated time. */
+int main(int argc, char* argv[])
+{
+  sg4::Engine e(&argc, argv);
+  /* Fail with a usage message rather than handing a missing argv[1] to load_platform().
+   * NOTE(review): checked after building the Engine because it receives &argc and presumably consumes its own
+   * options -- confirm. */
+  xbt_assert(argc > 1, "Usage: %s platform_file\n\tExample: %s platform.xml\n", argv[0], argv[0]);
+  e.load_platform(argv[1]);
+
+  sg4::Actor::create("sender", sg4::Host::by_name("Tremblay"), sender, sg4::Host::by_name("Tremblay"),
+                     sg4::Host::by_name("Jupiter"), sg4::Host::by_name("Fafard"), sg4::Host::by_name("Ginette"));
+
+  e.run();
+
+  XBT_INFO("Total simulation time: %.3f", e.get_clock());
+
+  return 0;
+}
--- /dev/null
+#!/usr/bin/env tesh
+
+$ ${bindir:=.}/s4u-comm-host2host ${platfdir}/small_platform.xml "--log=root.fmt:[%10.6r]%e(%i:%P@%h)%e%m%n"
+> [ 0.000000] (1:pinger@Tremblay) Ping from mailbox Mailbox 1 to mailbox Mailbox 2
+> [ 0.000000] (2:ponger@Jupiter) Pong from mailbox Mailbox 2 to mailbox Mailbox 1
+> [ 0.019014] (2:ponger@Jupiter) Payload received : small communication (latency bound)
+> [ 0.019014] (2:ponger@Jupiter) Ping time (latency bound) 0.019014
+> [ 0.019014] (2:ponger@Jupiter) payload = 0.019
+> [150.178356] (1:pinger@Tremblay) Payload received : large communication (bandwidth bound)
+> [150.178356] (1:pinger@Tremblay) Pong time (bandwidth bound): 150.159
+> [150.178356] (0:maestro@) Total simulation time: 150.178
+
virtual Activity* cancel() = 0;
/** Retrieve the current state of the activity */
Activity::State get_state() const { return state_; }
+ /** Return a string representation of the activity's state (one of INITED, STARTING, STARTED, CANCELED, FINISHED) */
+ const char* get_state_str();
void set_state(Activity::State state) { state_ = state; }
/** Tests whether the given activity is terminated yet. */
virtual bool test();
*/
class XBT_PUBLIC Comm : public Activity_T<Comm> {
Mailbox* mailbox_ = nullptr;
- kernel::actor::ActorImpl* sender_ = nullptr;
+ kernel::actor::ActorImpl* sender_ = nullptr; /* specified for normal mailbox-based communications*/
kernel::actor::ActorImpl* receiver_ = nullptr;
+ Host* from_ = nullptr; /* specified only for direct host-to-host communications */
+ Host* to_ = nullptr;
double rate_ = -1;
void* dst_buff_ = nullptr;
size_t dst_buff_size_ = 0;
~Comm() override;
+ /*! Creates a communication between the two given hosts, bypassing the mailbox mechanism. */
+ static CommPtr sendto_init(Host* from, Host* to);
+ /*! Creates and starts a communication of the given amount of bytes between the two given hosts, bypassing the
+ * mailbox mechanism */
+ static CommPtr sendto_async(Host* from, Host* to, double simulated_size_in_bytes);
+
static xbt::signal<void(Comm const&, bool is_sender)> on_start;
static xbt::signal<void(Comm const&)> on_completion;
* There is really no limit on the hosts involved. In particular, the actor does not have to be on one of the involved
* hosts.
*/
- ActivityPtr sendto_async(Host* dest, double byte_amount);
+ CommPtr sendto_async(Host* dest, double byte_amount);
#ifndef DOXYGEN
XBT_ATTRIB_DEPRECATED_v330("Please use Host::sendto()") void send_to(Host* dest, double byte_amount)
return surf_action_ ? surf_action_->get_remains() : 0;
}
+/** Gives the name of the current kernel-level state of this activity as a string literal. */
+const char* ActivityImpl::get_state_str()
+{
+  // Each label is spelled exactly like its State enumerator
+  switch (state_) {
+    case State::WAITING:          return "WAITING";
+    case State::READY:            return "READY";
+    case State::RUNNING:          return "RUNNING";
+    case State::CANCELED:         return "CANCELED";
+    case State::FAILED:           return "FAILED";
+    case State::DONE:             return "DONE";
+    case State::SRC_HOST_FAILURE: return "SRC_HOST_FAILURE";
+    case State::DST_HOST_FAILURE: return "DST_HOST_FAILURE";
+    case State::TIMEOUT:          return "TIMEOUT";
+    case State::SRC_TIMEOUT:      return "SRC_TIMEOUT";
+    case State::DST_TIMEOUT:      return "DST_TIMEOUT";
+    case State::LINK_FAILURE:     return "LINK_FAILURE";
+  }
+  // Only reached when state_ holds none of the values handled above
+  THROW_IMPOSSIBLE;
+}
+
bool ActivityImpl::test()
{
if (state_ != State::WAITING && state_ != State::RUNNING) {
virtual void register_simcall(smx_simcall_t simcall);
void clean_action();
virtual double get_remaining() const;
+ const char* get_state_str();
// Support for the boost::intrusive_ptr<ActivityImpl> datatype
friend XBT_PUBLIC void intrusive_ptr_add_ref(ActivityImpl* activity);
friend XBT_PUBLIC void intrusive_ptr_release(ActivityImpl* activity);
return *this;
}
+/** Builds a direct communication of @p bytes bytes from host @p from to host @p to.
+ *  The new communication is flagged as detached and is put in the READY state right away. */
+CommImpl::CommImpl(s4u::Host* from, s4u::Host* to, double bytes)
+    : size_(bytes)
+    , detached_(true)
+    , from_(from)
+    , to_(to)
+{
+  this->state_ = State::READY;
+}
+
CommImpl::~CommImpl()
{
XBT_DEBUG("Really free communication %p in state %d (detached = %d)", this, static_cast<int>(state_), detached_);
{
/* If both the sender and the receiver are already there, start the communication */
if (state_ == State::READY) {
- s4u::Host* sender = src_actor_->get_host();
- s4u::Host* receiver = dst_actor_->get_host();
+ from_ = from_ != nullptr ? from_ : src_actor_->get_host();
+ to_ = to_ != nullptr ? to_ : dst_actor_->get_host();
- surf_action_ = surf_network_model->communicate(sender, receiver, size_, rate_);
+ surf_action_ = surf_network_model->communicate(from_, to_, size_, rate_);
surf_action_->set_activity(this);
surf_action_->set_category(get_tracing_category());
state_ = State::RUNNING;
- XBT_DEBUG("Starting communication %p from '%s' to '%s' (surf_action: %p)", this, sender->get_cname(),
- receiver->get_cname(), surf_action_);
+ XBT_DEBUG("Starting communication %p from '%s' to '%s' (surf_action: %p; state: %s)", this, from_->get_cname(),
+ to_->get_cname(), surf_action_, get_state_str());
/* If a link is failed, detect it immediately */
if (surf_action_->get_state() == resource::Action::State::FAILED) {
- XBT_DEBUG("Communication from '%s' to '%s' failed to start because of a link failure", sender->get_cname(),
- receiver->get_cname());
+ XBT_DEBUG("Communication from '%s' to '%s' failed to start because of a link failure", from_->get_cname(),
+ to_->get_cname());
state_ = State::LINK_FAILURE;
post();
- } else if (src_actor_->is_suspended() || dst_actor_->is_suspended()) {
+ } else if ((src_actor_ != nullptr && src_actor_->is_suspended()) ||
+ (dst_actor_ != nullptr && dst_actor_->is_suspended())) {
/* If any of the process is suspended, create the synchro but stop its execution,
it will be restarted when the sender process resume */
if (src_actor_->is_suspended())
} else
state_ = State::DONE;
- XBT_DEBUG("SIMIX_post_comm: comm %p, state %d, src_proc %p, dst_proc %p, detached: %d", this, (int)state_,
+ XBT_DEBUG("CommImpl::post(): comm %p, state %s, src_proc %p, dst_proc %p, detached: %d", this, get_state_str(),
src_actor_.get(), dst_actor_.get(), detached_);
/* destroy the surf actions associated with the Simix communication */
if (src_actor_)
src_actor_->activities_.remove(this);
} else {
- dst_actor_->activities_.remove(this);
- src_actor_->activities_.remove(this);
+ if (dst_actor_ != nullptr)
+ dst_actor_->activities_.remove(this);
+ if (src_actor_ != nullptr)
+ src_actor_->activities_.remove(this);
}
}
}
MailboxImpl* mbox_ = nullptr; /* Rendez-vous where the comm is queued */
public:
+ CommImpl() = default;
+ CommImpl(s4u::Host* from, s4u::Host* to, double bytes);
+
enum class Type { SEND = 0, RECEIVE, READY, DONE };
CommImpl& set_type(CommImpl::Type type);
resource::Action* dst_timeout_ = nullptr; /* Surf's actions to instrument the timeouts */
actor::ActorImplPtr src_actor_ = nullptr;
actor::ActorImplPtr dst_actor_ = nullptr;
+ s4u::Host* from_ = nullptr; /* Pre-determined only for direct host-to-host communications */
+ s4u::Host* to_ = nullptr; /* Otherwise, computed at start() time from the actors */
/* Data to be transferred */
unsigned char* src_buff_ = nullptr;
return this;
}
+/** Gives the name of the current state of this activity as a string literal. */
+const char* Activity::get_state_str()
+{
+  // Each label is spelled exactly like its State enumerator
+  switch (state_) {
+    case State::INITED:   return "INITED";
+    case State::STARTING: return "STARTING";
+    case State::STARTED:  return "STARTED";
+    case State::CANCELED: return "CANCELED";
+    case State::FINISHED: return "FINISHED";
+  }
+  // Only reached when state_ holds none of the values handled above
+  THROW_IMPOSSIBLE;
+}
+
double Activity::get_remaining() const
{
if (state_ == State::INITED || state_ == State::STARTING)
return this;
}
+/** Creates a communication whose endpoints are the two given hosts, without starting it. */
+CommPtr Comm::sendto_init(Host* from, Host* to)
+{
+  CommPtr comm(new Comm());
+  // Record both endpoints on the freshly created communication
+  comm->from_ = from;
+  comm->to_   = to;
+  return comm;
+}
+/** Creates a communication between the two given hosts, sets its size to @p simulated_size_in_bytes and starts it. */
+CommPtr Comm::sendto_async(Host* from, Host* to, double simulated_size_in_bytes)
+{
+  CommPtr comm = Comm::sendto_init(from, to);
+  comm->set_remaining(simulated_size_in_bytes);
+  comm->start();
+  return comm;
+}
+
Comm* Comm::start()
{
xbt_assert(get_state() == State::INITED || get_state() == State::STARTING,
"You cannot use %s() once your communication started (not implemented)", __FUNCTION__);
-
- if (src_buff_ != nullptr) { // Sender side
+ if (from_ != nullptr || to_ != nullptr) {
+ xbt_assert(from_ != nullptr && to_ != nullptr, "When either from_ or to_ is specified, both must be.");
+ xbt_assert(src_buff_ == nullptr && dst_buff_ == nullptr,
+ "Direct host-to-host communications cannot carry any data.");
+ pimpl_ = kernel::actor::simcall([this] {
+ auto res = new kernel::activity::CommImpl(this->from_, this->to_, this->get_remaining());
+ res->start();
+ return res;
+ });
+
+ } else if (src_buff_ != nullptr) { // Sender side
on_start(*this, true /* is_sender*/);
pimpl_ = simcall_comm_isend(sender_, mailbox_->get_impl(), remains_, rate_, src_buff_, src_buff_size_, match_fun_,
clean_fun_, copy_data_function_, get_user_data(), detached_);
break;
case State::INITED:
- case State::STARTING: // It's not started yet. Do it in one simcall
- if (src_buff_ != nullptr) {
+ case State::STARTING: // It's not started yet. Do it in one simcall if it's a regular communication
+ if (from_ != nullptr || to_ != nullptr) {
+ return start()->wait_for(timeout); // In the case of host2host comm, do it in two simcalls
+ } else if (src_buff_ != nullptr) {
on_start(*this, true /*is_sender*/);
simcall_comm_send(sender_, mailbox_->get_impl(), remains_, rate_, src_buff_, src_buff_size_, match_fun_,
copy_data_function_, get_user_data(), timeout);
#include "simgrid/host.h"
#include "simgrid/kernel/routing/NetPoint.hpp"
#include "simgrid/s4u/Actor.hpp"
+#include "simgrid/s4u/Comm.hpp"
#include "simgrid/s4u/Engine.hpp"
#include "simgrid/s4u/Exec.hpp"
#include "simgrid/s4u/VirtualMachine.hpp"
sendto_async(dest, byte_amount)->wait();
}
-ActivityPtr Host::sendto_async(Host* dest, double byte_amount)
+CommPtr Host::sendto_async(Host* dest, double byte_amount)
{
- std::vector<Host*> m_host_list = {this, dest};
- std::vector<double> flops_amount = {0, 0};
- std::vector<double> bytes_amount = {0, byte_amount, 0, 0};
- return this_actor::exec_init(m_host_list, flops_amount, bytes_amount)->start();
+ return Comm::sendto_async(this, dest, byte_amount);
}
/** Get the properties assigned to a host */