- :cpp:func:`Comm::on_send <simgrid::s4u::Comm::on_send_cb>`
:cpp:func:`Comm::on_recv <simgrid::s4u::Comm::on_recv_cb>`
+ - :cpp:func:`Comm::on_start <simgrid::s4u::Comm::on_start_cb>`
:cpp:func:`Comm::on_completion <simgrid::s4u::Comm::on_completion_cb>`
:cpp:func:`Comm::on_suspend <simgrid::s4u::Comm::on_suspend_cb>`
:cpp:func:`Comm::on_resume <simgrid::s4u::Comm::on_resume_cb>`
:cpp:func:`Comm::on_veto <simgrid::s4u::Comm::on_veto_cb>`
- - :cpp:func:`CommImpl::on_start <simgrid::s4u::CommImpl::on_start_cb>`
- :cpp:func:`CommImpl::on_completion <simgrid::s4u::CommImpl::on_completion_cb>`
- :cpp:func:`Exec::on_start <simgrid::s4u::Exec::on_start_cb>`
- :cpp:func:`Exec::on_completion <simgrid::s4u::Exec::on_completion_cb>`
:cpp:func:`Exec::on_completion <simgrid::s4u::Exec::on_completion_cb>`
:cpp:func:`Exec::on_suspend <simgrid::s4u::Exec::on_suspend_cb>`
:cpp:func:`Exec::on_resume <simgrid::s4u::Exec::on_resume_cb>`
:cpp:func:`Exec::on_veto <simgrid::s4u::Exec::on_veto_cb>`
- :cpp:func:`Io::on_start <simgrid::s4u::Io::on_start_cb>`
- :cpp:func:`Io::on_completion <simgrid::s4u::Io::on_completion_cb>`
:cpp:func:`Io::on_completion <simgrid::s4u::Io::on_completion_cb>`
:cpp:func:`Io::on_suspend <simgrid::s4u::Io::on_suspend_cb>`
:cpp:func:`Io::on_resume <simgrid::s4u::Io::on_resume_cb>`
> [ 1.000000] (0:maestro@) Exec 'parent' is complete (start time: 0.000000, finish time: 1.000000)
> [ 1.000000] (0:maestro@) Remove a dependency from 'parent' on 'transfer'
> [ 1.000000] (0:maestro@) 'transfer' is assigned to a resource and all dependencies are solved. Let's start
-> [ 2.083775] (0:maestro@) Comm 'transfer' is complete
> [ 2.083775] (0:maestro@) Remove a dependency from 'transfer' on 'child'
> [ 2.083775] (0:maestro@) 'child' is assigned to a resource and all dependencies are solved. Let's start
+> [ 2.083775] (0:maestro@) Comm 'transfer' is complete
> [ 3.083775] (0:maestro@) Exec 'child' is complete (start time: 2.083775, finish time: 3.083775)
> [ 3.083775] (0:maestro@) Simulation time 3.08378
> 13 2.000000 11 32
> 12 2.000000 11 32 14
> 13 2.025708 11 33
-> 12 2.025708 11 33 15
> 13 2.025708 11 32
+> 12 2.025708 11 33 15
> 15 2.025708 17 0 M 32 0
> 7 2.025708 10 32
> 6 2.025708 34 10 1 "emigrant-1"
> 13 4.025708 11 34
> 12 4.025708 11 34 14
> 13 4.025903 11 33
-> 12 4.025903 11 33 15
> 13 4.025903 11 34
+> 12 4.025903 11 33 15
> 15 4.025903 17 0 M 34 1
> 7 4.025903 10 34
> 6 4.025903 35 10 2 "emigrant-1"
> 13 6.025903 11 35
> 12 6.025903 11 35 14
> 13 6.044918 11 33
-> 12 6.044918 11 33 15
> 13 6.044918 11 35
+> 12 6.044918 11 33 15
> 15 6.044918 17 0 M 35 2
> 7 6.044918 10 35
> 6 6.044918 36 10 3 "emigrant-1"
> 13 8.044918 11 36
> 12 8.044918 11 36 14
> 13 8.070626 11 33
-> 12 8.070626 11 33 15
> 13 8.070626 11 36
+> 12 8.070626 11 33 15
> 15 8.070626 17 0 M 36 3
> 7 8.070626 10 36
> 6 8.070626 37 10 4 "emigrant-1"
> 13 10.070626 11 37
> 12 10.070626 11 37 14
> 13 10.087178 11 33
-> 12 10.087178 11 33 15
> 13 10.087178 11 37
+> 12 10.087178 11 33 15
> 15 10.087178 17 0 M 37 4
> 7 10.087178 10 37
> 6 10.087178 38 10 5 "emigrant-1"
> 13 12.087178 11 38
> 12 12.087178 11 38 14
> 13 12.112617 11 33
-> 12 12.112617 11 33 15
> 13 12.112617 11 38
+> 12 12.112617 11 33 15
> 15 12.112617 17 0 M 38 5
> 7 12.112617 10 38
> 6 12.112617 39 10 3 "emigrant-1"
> 13 14.112617 11 39
> 12 14.112617 11 39 14
> 13 14.138325 11 33
-> 12 14.138325 11 33 15
> 13 14.138325 11 39
+> 12 14.138325 11 33 15
> 15 14.138325 17 0 M 39 6
> 7 14.138325 10 39
> 6 14.138325 40 10 1 "emigrant-1"
> 13 16.138325 11 40
> 12 16.138325 11 40 14
> 13 16.138521 11 33
-> 12 16.138521 11 33 15
> 13 16.138521 11 40
+> 12 16.138521 11 33 15
> 15 16.138521 17 0 M 40 7
> 7 16.138521 10 40
> 6 16.138521 41 10 4 "emigrant-1"
*/
class XBT_PUBLIC Comm : public Activity_T<Comm> {
friend Mailbox; // Factory of comms
+ friend kernel::activity::CommImpl;
/* specified for normal mailbox-based communications*/
Mailbox* mailbox_ = nullptr;
kernel::actor::ActorImpl* sender_ = nullptr;
Comm() = default;
Comm* do_start() override;
+protected:
static xbt::signal<void(Comm const&)> on_send;
static xbt::signal<void(Comm const&)> on_recv;
static xbt::signal<void(Comm const&)> on_start;
-protected:
- void fire_on_completion() const override { on_completion(*this); }
+ void fire_on_completion() const override { /* Do nothing */ }
void fire_on_veto() const override { on_veto(const_cast<Comm&>(*this)); }
void fire_on_suspend() const override { on_suspend(*this); }
void fire_on_resume() const override { on_resume(*this); }
bool is_assigned() const override;
Actor* get_sender() const;
+ Actor* get_receiver() const;
/* Comm life cycle */
/** Start the comm, and ignore its result. It can be completely forgotten after that. */
});
s4u::Comm::on_completion_cb([](const s4u::Comm& c) {
- std::string pid = instr_pid(*s4u::Actor::self());
- if (pid == "-0") { //Comm is launched directly by Maestro, use the host as container
- Container::by_name(c.get_source()->get_name())->get_state("HOST_STATE")->pop_event();
- Container::by_name(c.get_destination()->get_name())->get_state("HOST_STATE")->pop_event();
- } else
- Container::by_name(pid)->get_state("ACTOR_STATE")->pop_event();
+ if (c.get_sender()) {
+ Container::by_name(instr_pid(*c.get_sender()))->get_state("ACTOR_STATE")->pop_event();
+ Container::by_name(instr_pid(*c.get_receiver()))->get_state("ACTOR_STATE")->pop_event();
+ } else {
+ Container::by_name(c.get_source()->get_name())->get_state("HOST_STATE")->pop_event();
+ Container::by_name(c.get_destination()->get_name())->get_state("HOST_STATE")->pop_event();
+ }
});
s4u::Comm::on_send_cb([](s4u::Comm const& c) {
std::string pid = instr_pid(*s4u::Actor::self());
model_action_->set_category(get_tracing_category());
set_start_time(model_action_->get_start_time());
set_state(State::RUNNING);
- on_start(*this);
XBT_DEBUG("Starting communication %p from '%s' to '%s' (model action: %p; state: %s)", this, from_->get_cname(),
to_->get_cname(), model_action_, get_state_str());
XBT_DEBUG("CommImpl::finish() comm %p, state %s, src_proc %p, dst_proc %p, detached: %d", this, get_state_str(),
src_actor_.get(), dst_actor_.get(), detached_);
- on_completion(*this);
+ s4u::Comm::on_completion(static_cast<const s4u::Comm&>(*this->get_iface()));
/* Update synchro state */
if (src_timeout_ && src_timeout_->get_state() == resource::Action::State::FINISHED)
/* This program is free software; you can redistribute it and/or modify it
* under the terms of the license (GNU LGPL) which comes with this package. */
+#include <simgrid/s4u/Comm.hpp>
#include <simgrid/s4u/Host.hpp>
#include "src/kernel/activity/CommImpl.hpp"
/************
* Resource *
************/
+static void update_bw_comm_start(const s4u::Comm& comm)
+{
+ auto* pimpl = static_cast<activity::CommImpl*>(comm.get_impl());
+
+ auto const* actionWifi = dynamic_cast<const kernel::resource::WifiLinkAction*>(pimpl->model_action_);
+ if (actionWifi == nullptr)
+ return;
+
+ if (auto* link_src = actionWifi->get_src_link()) {
+ link_src->inc_active_flux();
+ }
+ if (auto* link_dst = actionWifi->get_dst_link()) {
+ link_dst->inc_active_flux();
+ }
+}
WifiLinkImpl::WifiLinkImpl(const std::string& name, const std::vector<double>& bandwidths, lmm::System* system)
: StandardLinkImpl(name)
this->set_constraint(system->constraint_new(this, 1));
for (auto bandwidth : bandwidths)
bandwidths_.push_back({bandwidth, 1.0, nullptr});
- kernel::activity::CommImpl::on_start.connect(&update_bw_comm_start);
+ s4u::Comm::on_start_cb(&update_bw_comm_start);
s4u::Link::on_communication_state_change_cb(&update_bw_comm_end);
}
nb_active_flux_--;
}
-void WifiLinkImpl::update_bw_comm_start(const kernel::activity::CommImpl& comm)
-{
- auto const* actionWifi = dynamic_cast<const simgrid::kernel::resource::WifiLinkAction*>(comm.model_action_);
- if (actionWifi == nullptr)
- return;
-
- if (auto* link_src = actionWifi->get_src_link()) {
- link_src->inc_active_flux();
- }
- if (auto* link_dst = actionWifi->get_dst_link()) {
- link_dst->inc_active_flux();
- }
-}
-
void WifiLinkImpl::update_bw_comm_end(const simgrid::kernel::resource::NetworkAction& action,
simgrid::kernel::resource::Action::State /*state*/)
{
void set_latency(double) override;
bool toggle_callback();
- static void update_bw_comm_start(const kernel::activity::CommImpl&);
- static void update_bw_comm_end(const simgrid::kernel::resource::NetworkAction& action,
- simgrid::kernel::resource::Action::State state);
+ static void update_bw_comm_end(const NetworkAction& action, Action::State state);
void inc_active_flux();
void dec_active_flux();
static double wifi_link_dynamic_sharing(const WifiLinkImpl& link, double capacity, int n);
#include "simgrid/Exception.hpp"
#include "simgrid/host.h"
#include "simgrid/plugins/energy.h"
+#include "simgrid/s4u/Comm.hpp"
#include "simgrid/s4u/Engine.hpp"
#include "simgrid/s4u/Link.hpp"
#include "src/kernel/activity/CommImpl.hpp"
XBT_INFO("Total energy over all links: %f", total_energy);
}
-static void on_communication(const simgrid::kernel::activity::CommImpl& comm)
+static void on_communication(const simgrid::s4u::Comm& comm)
{
- for (auto const* link : comm.get_traversed_links()) {
+ auto* pimpl = static_cast<simgrid::kernel::activity::CommImpl*>(comm.get_impl());
+ for (auto const* link : pimpl->get_traversed_links()) {
if (link != nullptr && link->get_sharing_policy() != simgrid::s4u::Link::SharingPolicy::WIFI) {
XBT_DEBUG("Update %s on Comm Start/End", link->get_cname());
link->extension<LinkEnergy>()->update();
}
}
}
+
/* **************************** Public interface *************************** */
int sg_link_energy_is_inited()
link.extension<LinkEnergy>()->get_consumed_energy());
});
- simgrid::kernel::activity::CommImpl::on_start.connect(&on_communication);
- simgrid::kernel::activity::CommImpl::on_completion.connect(&on_communication);
+ simgrid::s4u::Comm::on_start_cb(&on_communication);
+ simgrid::s4u::Comm::on_completion_cb(&on_communication);
simgrid::s4u::Engine::on_simulation_end_cb(&on_simulation_end);
}
* under the terms of the license (GNU LGPL) which comes with this package. */
#include <simgrid/plugins/energy.h>
+#include <simgrid/s4u/Comm.hpp>
#include <simgrid/s4u/Engine.hpp>
#include <simgrid/s4u/Link.hpp>
using simgrid::plugin::LinkEnergyWifi;
/* **************************** events callback *************************** */
-static void on_communication(const simgrid::kernel::activity::CommImpl& comm)
+static void on_communication(const simgrid::s4u::Comm& comm)
{
- for (const auto* link : comm.get_traversed_links()) {
+ auto* pimpl = static_cast<simgrid::kernel::activity::CommImpl*>(comm.get_impl());
+ for (auto const* link : pimpl->get_traversed_links()) {
if (link != nullptr && link->get_sharing_policy() == simgrid::s4u::Link::SharingPolicy::WIFI) {
auto* link_energy = link->extension<LinkEnergyWifi>();
XBT_DEBUG("Update %s on Comm Start/End", link->get_cname());
}
});
- simgrid::kernel::activity::CommImpl::on_start.connect(&on_communication);
- simgrid::kernel::activity::CommImpl::on_completion.connect(&on_communication);
+ simgrid::s4u::Comm::on_start_cb(&on_communication);
+ simgrid::s4u::Comm::on_completion_cb(&on_communication);
}
* under the terms of the license (GNU LGPL) which comes with this package. */
#include <simgrid/plugins/load.h>
+#include <simgrid/s4u/Comm.hpp>
#include <simgrid/s4u/Engine.hpp>
#include "src/kernel/activity/CommImpl.hpp"
using simgrid::plugin::LinkLoad;
/* **************************** events callback *************************** */
-static void on_communication(const simgrid::kernel::activity::CommImpl& comm)
+static void on_communication(const simgrid::s4u::Comm& comm)
{
- for (const auto* link : comm.get_traversed_links()) {
+ auto* pimpl = static_cast<simgrid::kernel::activity::CommImpl*>(comm.get_impl());
+ for (auto const* link : pimpl->get_traversed_links()) {
if (link != nullptr && link->get_sharing_policy() != simgrid::s4u::Link::SharingPolicy::WIFI) {
auto* link_load = link->extension<LinkLoad>();
XBT_DEBUG("Update %s on Comm Start/End", link->get_cname());
});
// Call this plugin on some of the links' events.
- simgrid::kernel::activity::CommImpl::on_start.connect(&on_communication);
- simgrid::kernel::activity::CommImpl::on_completion.connect(&on_communication);
+ simgrid::s4u::Comm::on_start_cb(&on_communication);
+ simgrid::s4u::Comm::on_completion_cb(&on_communication);
simgrid::s4u::Link::on_onoff_cb([](simgrid::s4u::Link const& link) {
if (link.get_sharing_policy() != simgrid::s4u::Link::SharingPolicy::WIFI) {
namespace simgrid::s4u {
xbt::signal<void(Comm const&)> Comm::on_send;
xbt::signal<void(Comm const&)> Comm::on_recv;
+xbt::signal<void(Comm const&)> Comm::on_start;
CommPtr Comm::set_copy_data_callback(const std::function<void(kernel::activity::CommImpl*, void*, size_t)>& callback)
{
return sender ? sender->get_ciface() : nullptr;
}
+Actor* Comm::get_receiver() const
+{
+ kernel::actor::ActorImplPtr receiver = nullptr;
+ if (pimpl_)
+ receiver = boost::static_pointer_cast<kernel::activity::CommImpl>(pimpl_)->dst_actor_;
+ return receiver ? receiver->get_ciface() : nullptr;
+}
+
bool Comm::is_assigned() const
{
return (pimpl_ && boost::static_pointer_cast<kernel::activity::CommImpl>(pimpl_)->is_assigned()) ||
if (not detached_) {
pimpl_->set_iface(this);
pimpl_->set_actor(sender_);
+ // Only throw the signal when both sides are here and the status is READY
+ if (pimpl_->get_state() != kernel::activity::State::WAITING)
+ on_start(*this);
}
state_ = State::STARTED;