.. autodoxymethod:: simgrid::s4u::Activity::wait_until(double time_limit)
.. autodoxymethod:: simgrid::s4u::Activity::vetoable_start()
+Suspending and resuming an activity
+-----------------------------------
+
+.. tabs::
+
+ .. group-tab:: C++
+
+ .. autodoxymethod:: simgrid::s4u::Activity::suspend
+ .. autodoxymethod:: simgrid::s4u::Activity::resume
+ .. autodoxymethod:: simgrid::s4u::Activity::is_suspended
+
.. _API_s4u_Comm:
=============
See also :cpp:func:`sg_mailbox_put_async()` and :cpp:func:`sg_comm_wait()`.
+ - **Suspending communications:**
+ The ``suspend()`` and ``resume()`` functions allow you to block the
+ progression of a given communication for a while and then unblock it.
+ ``is_suspended()`` can be used to retrieve whether the activity is
+ currently blocked or not.
+
+ .. tabs::
+
+ .. example-tab:: examples/s4u/comm-suspend/s4u-comm-suspend.cpp
+
+ See also :cpp:func:`simgrid::s4u::Activity::suspend()`,
+ :cpp:func:`simgrid::s4u::Activity::resume()` and
+ :cpp:func:`simgrid::s4u::Activity::is_suspended()`.
+
+
- **Waiting for all communications in a set:**
The ``wait_all()`` function is useful when you want to block until
all activities in a given set have completed.
foreach (example actor-create actor-daemon actor-exiting actor-join actor-kill
actor-lifetime actor-migrate actor-suspend actor-yield actor-stacksize
app-bittorrent app-chainsend app-pingpong app-token-ring
- comm-ready comm-wait comm-waitany comm-waitall comm-waituntil
+ comm-ready comm-suspend comm-wait comm-waitany comm-waitall comm-waituntil
comm-dependent
cloud-capping cloud-migration cloud-simple
dht-chord dht-kademlia
--- /dev/null
+/* Copyright (c) 2010-2020. The SimGrid Team. All rights reserved. */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
+/* This example shows how to suspend and resume an asynchronous communication. */
+
+#include "simgrid/s4u.hpp"
+#include <cstdlib>
+#include <iostream>
+#include <string>
+
+/* Default log category of this example, named after the example itself (comm-suspend) */
+XBT_LOG_NEW_DEFAULT_CATEGORY(s4u_comm_suspend, "Messages specific for this s4u example");
+
+static void sender(int argc, char**) // Actor code: sends one message on the "receiver" mailbox, logging the remaining bytes at each step
+{
+ xbt_assert(argc == 1, "Expecting no parameter from the XML deployment file but got %d", argc - 1);
+
+ simgrid::s4u::Mailbox* mbox = simgrid::s4u::Mailbox::by_name("receiver");
+
+ // The payload is allocated on the heap so that it outlives this function:
+ // the receiver actor is the one that deletes it once the message is received.
+ std::string* payload = new std::string("Sent message");
+
+ /* Prepare the communication with put_init(); it is only started by the explicit comm->start() call below */
+ simgrid::s4u::CommPtr comm = mbox->put_init(payload, 13194230); // second argument: amount to transfer (logged below as bytes)
+ XBT_INFO("Suspend the communication before it starts (remaining: %.0f bytes) and wait a second.",
+ comm->get_remaining());
+ simgrid::s4u::this_actor::sleep_for(1); // NOTE(review): no suspend() call is made before start(), despite the message above -- confirm intent
+ XBT_INFO("Now, start the communication (remaining: %.0f bytes) and wait another second.", comm->get_remaining());
+ comm->start();
+ simgrid::s4u::this_actor::sleep_for(1);
+
+ XBT_INFO("There is still %.0f bytes to transfer in this communication. Suspend it for one second.",
+ comm->get_remaining());
+ comm->suspend(); // NOTE(review): resume() follows with no sleep_for() in between, although the message announces one second -- confirm intent
+ XBT_INFO("Now there is %.0f bytes to transfer. Resume it and wait for its completion.", comm->get_remaining());
+ comm->resume();
+ comm->wait(); // blocks this actor until the communication is terminated
+ XBT_INFO("There is %f bytes to transfer after the communication completion.", comm->get_remaining());
+ XBT_INFO("Suspending a completed activity is a no-op.");
+ comm->suspend(); // called after wait() returned, i.e. on an already completed communication
+}
+
+static void receiver(int, char**)
+{
+  // Actor code: receives one message on the mailbox named "receiver" (the same name the sender uses).
+  simgrid::s4u::Mailbox* inbox = simgrid::s4u::Mailbox::by_name("receiver");
+
+  XBT_INFO("Wait for the message.");
+  // get() hands back an untyped pointer; the sender put a heap-allocated std::string behind it.
+  const auto* message = static_cast<std::string*>(inbox->get());
+  XBT_INFO("I got '%s'.", message->c_str());
+
+  // The receiving side releases the payload that the sender allocated with new.
+  delete message;
+}
+
+int main(int argc, char* argv[])
+{
+  // Two positional arguments are mandatory: the platform file and the deployment file.
+  xbt_assert(argc > 2, "Usage: %s platform_file deployment_file\n", argv[0]);
+
+  simgrid::s4u::Engine engine(&argc, argv);
+
+  // Bind the function names used in the deployment file to the actor code of this example.
+  engine.register_function("sender", &sender);
+  engine.register_function("receiver", &receiver);
+
+  // Describe the simulated world, then run the simulation.
+  engine.load_platform(argv[1]);
+  engine.load_deployment(argv[2]);
+  engine.run();
+
+  return 0;
+}
--- /dev/null
+#!/usr/bin/env tesh
+
+$ ${bindir:=.}/s4u-comm-suspend ${platfdir}/small_platform.xml s4u-comm-suspend_d.xml "--log=root.fmt:[%10.6r]%e(%i:%P@%h)%e%m%n"
+> [ 0.000000] (1:sender@Tremblay) Suspend the communication before it starts (remaining: 13194230 bytes) and wait a second.
+> [ 0.000000] (2:receiver@Jupiter) Wait for the message.
+> [ 1.000000] (1:sender@Tremblay) Now, start the communication (remaining: 13194230 bytes) and wait another second.
+> [ 2.000000] (1:sender@Tremblay) There is still 6660438 bytes to transfer in this communication. Suspend it for one second.
+> [ 2.000000] (1:sender@Tremblay) Now there is 6660438 bytes to transfer. Resume it and wait for its completion.
+> [ 3.000000] (2:receiver@Jupiter) I got 'Sent message'.
+> [ 3.000000] (1:sender@Tremblay) There is 0.000000 bytes to transfer after the communication completion.
+> [ 3.000000] (1:sender@Tremblay) Suspending a completed activity is a no-op.
--- /dev/null
+<?xml version='1.0'?>
+<!DOCTYPE platform SYSTEM "https://simgrid.org/simgrid.dtd">
+<platform version="4.1">
+ <actor host="Tremblay" function="sender" />
+ <actor host="Jupiter" function="receiver" />
+</platform>
* This function is optional: you can call wait() even if you didn't call start()
*/
virtual Activity* start() = 0;
- /** Blocks until the activity is terminated */
+ /** Blocks the current actor until the activity is terminated */
virtual Activity* wait() = 0;
- /** Blocks until the activity is terminated, or until the timeout is elapsed
+ /** Blocks the current actor until the activity is terminated, or until the timeout is elapsed\n
* Raises: timeout exception.*/
virtual Activity* wait_for(double timeout) = 0;
- /** Blocks until the activity is terminated, or until the time limit is reached
+ /** Blocks the current actor until the activity is terminated, or until the time limit is reached\n
* Raises: timeout exception. */
void wait_until(double time_limit);
/** Tests whether the given activity is terminated yet. */
virtual bool test();
+ /** Blocks the progression of this activity until it gets resumed */
+ virtual Activity* suspend();
+ /** Unblocks the progression of this activity if it was suspended previously */
+ virtual Activity* resume();
+ /** Whether or not the progression of this activity is blocked.
+  *
+  * Read-only query: it only returns the suspension flag and never modifies the activity,
+  * so it is const-qualified and can be called through a const Activity. */
+ bool is_suspended() const { return suspended_; }
+
virtual const char* get_cname() const = 0;
virtual const std::string& get_name() const = 0;
kernel::activity::ActivityImplPtr pimpl_ = nullptr;
Activity::State state_ = Activity::State::INITED;
double remains_ = 0;
+ bool suspended_ = false;
std::vector<ActivityPtr> successors_;
std::set<ActivityPtr> dependencies_;
std::atomic_int_fast32_t refcount_{0};
return false;
}
+/* Marks this activity as suspended, and forwards the request to the implementation when the
+ * activity is already started. Suspending an already suspended activity changes nothing.
+ * Returns this activity. */
+Activity* Activity::suspend()
+{
+  if (not suspended_) {
+    suspended_ = true;
+
+    // Only a started activity has its implementation suspended right away
+    if (state_ == State::STARTED)
+      pimpl_->suspend();
+  }
+
+  return this;
+}
+
+/* Unblocks the progression of this activity if it was suspended previously.
+ * Resuming an activity that is not suspended changes nothing. Returns this activity. */
+Activity* Activity::resume()
+{
+  if (not suspended_)
+    return this; // nothing to restore when it's not suspended
+
+  // Clear the flag: otherwise is_suspended() would keep reporting true, any later suspend() would
+  // return early as "already suspended", and an activity resumed before being started would still
+  // get suspended by start(), which checks this flag.
+  suspended_ = false;
+
+  if (state_ == State::STARTED)
+    pimpl_->resume();
+
+  return this;
+}
+
double Activity::get_remaining() const
{
- return remains_;
+ if (state_ == State::INITED || state_ == State::STARTING)
+ return remains_;
+ else
+ return pimpl_->get_remaining();
}
Activity* Activity::set_remaining(double remains)
} else {
xbt_die("Cannot start a communication before specifying whether we are the sender or the receiver");
}
+
+ if (suspended_)
+ pimpl_->suspend();
+
state_ = State::STARTED;
return this;
}
.set_flops_amount(flops_amounts_.front())
.start();
});
+
+ if (suspended_)
+ pimpl_->suspend();
+
state_ = State::STARTED;
on_start(*Actor::self(), *this);
return this;
.start();
}
});
+
+ if (suspended_)
+ pimpl_->suspend();
+
state_ = State::STARTED;
return this;
}