Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Tentative implementation of Activity::suspend() and resume()
author Martin Quinson <martin.quinson@ens-rennes.fr>
Wed, 8 Jul 2020 14:48:57 +0000 (16:48 +0200)
committer Martin Quinson <martin.quinson@ens-rennes.fr>
Wed, 8 Jul 2020 15:16:23 +0000 (17:16 +0200)
I decided not to change the State when the activity is suspended,
because that State merely denotes whether pimpl_ was created already
or not.

This commit also adds an is_suspended() method, as well as an example.

docs/source/app_s4u.rst
examples/README.rst
examples/s4u/CMakeLists.txt
examples/s4u/comm-suspend/s4u-comm-suspend.cpp [new file with mode: 0644]
examples/s4u/comm-suspend/s4u-comm-suspend.tesh [new file with mode: 0644]
examples/s4u/comm-suspend/s4u-comm-suspend_d.xml [new file with mode: 0644]
include/simgrid/s4u/Activity.hpp
src/s4u/s4u_Activity.cpp
src/s4u/s4u_Comm.cpp
src/s4u/s4u_Exec.cpp
src/s4u/s4u_Io.cpp

index fdb8c90..1342527 100644 (file)
@@ -1708,6 +1708,17 @@ Activities life cycle
       .. autodoxymethod:: simgrid::s4u::Activity::wait_until(double time_limit)
       .. autodoxymethod:: simgrid::s4u::Activity::vetoable_start()
 
+Suspending and resuming an activity
+-----------------------------------
+
+.. tabs::
+
+   .. group-tab:: C++
+
+      .. autodoxymethod:: simgrid::s4u::Activity::suspend
+      .. autodoxymethod:: simgrid::s4u::Activity::resume
+      .. autodoxymethod:: simgrid::s4u::Activity::is_suspended
+
 .. _API_s4u_Comm:
 
 =============
index 3ac1648..f334c3b 100644 (file)
@@ -301,6 +301,21 @@ Communications on the Network
 
          See also :cpp:func:`sg_mailbox_put_async()` and :cpp:func:`sg_comm__wait()`.
 
+ - **Suspending communications:**
+   The ``suspend()`` and ``resume()`` functions allow you to block the
+   progression of a given communication for a while and then unblock it.
+   ``is_suspended()`` can be used to retrieve whether the activity is
+   currently blocked or not.
+   
+   .. tabs::
+
+      .. example-tab:: examples/s4u/comm-suspend/s4u-comm-suspend.cpp
+
+         See also :cpp:func:`simgrid::s4u::Activity::suspend()`,
+         :cpp:func:`simgrid::s4u::Activity::resume()` and
+         :cpp:func:`simgrid::s4u::Activity::is_suspended()`.
+
+        
  - **Waiting for all communications in a set:**
    The ``wait_all()`` function is useful when you want to block until
    all activities in a given set have completed. 
index 5961f08..4f70f84 100644 (file)
@@ -62,7 +62,7 @@ endif()
 foreach (example actor-create actor-daemon actor-exiting actor-join actor-kill
                  actor-lifetime actor-migrate actor-suspend actor-yield actor-stacksize
                  app-bittorrent app-chainsend app-pingpong app-token-ring
-                 comm-ready comm-wait comm-waitany comm-waitall comm-waituntil
+                 comm-ready comm-suspend comm-wait comm-waitany comm-waitall comm-waituntil
                  comm-dependent
                  cloud-capping cloud-migration cloud-simple
                  dht-chord dht-kademlia
diff --git a/examples/s4u/comm-suspend/s4u-comm-suspend.cpp b/examples/s4u/comm-suspend/s4u-comm-suspend.cpp
new file mode 100644 (file)
index 0000000..144a029
--- /dev/null
@@ -0,0 +1,71 @@
+/* Copyright (c) 2010-2020. The SimGrid Team. All rights reserved.          */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
+/* This example shows how to suspend and resume an asynchronous communication. */
+
+#include "simgrid/s4u.hpp"
+#include <cstdlib>
+#include <iostream>
+#include <string>
+
+XBT_LOG_NEW_DEFAULT_CATEGORY(s4u_comm_suspend, "Messages specific for this s4u example");
+
+/* Sender actor: creates a communication on the "receiver" mailbox, then starts, suspends, resumes and waits for it. */
+static void sender(int argc, char**)
+{
+  xbt_assert(argc == 1, "Expecting no parameter from the XML deployment file but got %d", argc - 1);
+  simgrid::s4u::Mailbox* mbox = simgrid::s4u::Mailbox::by_name("receiver");
+
+  // The payload is allocated on the heap so that it remains valid after this function returns;
+  // the receiver takes care of deleting it once the message is received.
+  std::string* payload = new std::string("Sent message");
+
+  /* Create the communication without starting it, so that it can be started, suspended and resumed explicitly */
+  simgrid::s4u::CommPtr comm = mbox->put_init(payload, 13194230);
+  XBT_INFO("Suspend the communication before it starts (remaining: %.0f bytes) and wait a second.",
+           comm->get_remaining());
+  simgrid::s4u::this_actor::sleep_for(1);
+  XBT_INFO("Now, start the communication (remaining: %.0f bytes) and wait another second.", comm->get_remaining());
+  comm->start();
+  simgrid::s4u::this_actor::sleep_for(1);
+  // NOTE(review): nothing calls suspend() before start(), nor sleeps between suspend() and resume() -- confirm intent
+  XBT_INFO("There is still %.0f bytes to transfer in this communication. Suspend it for one second.",
+           comm->get_remaining());
+  comm->suspend();
+  XBT_INFO("Now there is %.0f bytes to transfer. Resume it and wait for its completion.", comm->get_remaining());
+  comm->resume();
+  comm->wait();
+  XBT_INFO("There is %f bytes to transfer after the communication completion.", comm->get_remaining());
+  XBT_INFO("Suspending a completed activity is a no-op.");
+  comm->suspend();
+}
+
+/* Receiver actor: blocks on the "receiver" mailbox, logs the received string and then frees it. */
+static void receiver(int, char**)
+{
+  simgrid::s4u::Mailbox* mailbox = simgrid::s4u::Mailbox::by_name("receiver");
+  XBT_INFO("Wait for the message.");
+
+  // The sender allocated the payload with new: it is up to this side to delete it
+  const auto* message = static_cast<std::string*>(mailbox->get());
+  XBT_INFO("I got '%s'.", message->c_str());
+
+  delete message;
+}
+
+/* Entry point: registers both actor functions, loads the platform and deployment files, then runs the simulation. */
+int main(int argc, char* argv[])
+{
+  xbt_assert(argc > 2, "Usage: %s platform_file deployment_file\n", argv[0]);
+  simgrid::s4u::Engine engine(&argc, argv);
+
+  engine.register_function("sender", &sender);
+  engine.register_function("receiver", &receiver);
+  engine.load_platform(argv[1]);
+  engine.load_deployment(argv[2]);
+
+  engine.run();
+  return 0;
+}
diff --git a/examples/s4u/comm-suspend/s4u-comm-suspend.tesh b/examples/s4u/comm-suspend/s4u-comm-suspend.tesh
new file mode 100644 (file)
index 0000000..8d05012
--- /dev/null
@@ -0,0 +1,11 @@
+#!/usr/bin/env tesh
+
+$ ${bindir:=.}/s4u-comm-suspend ${platfdir}/small_platform.xml s4u-comm-suspend_d.xml "--log=root.fmt:[%10.6r]%e(%i:%P@%h)%e%m%n"
+> [  0.000000] (1:sender@Tremblay) Suspend the communication before it starts (remaining: 13194230 bytes) and wait a second.
+> [  0.000000] (2:receiver@Jupiter) Wait for the message.
+> [  1.000000] (1:sender@Tremblay) Now, start the communication (remaining: 13194230 bytes) and wait another second.
+> [  2.000000] (1:sender@Tremblay) There is still 6660438 bytes to transfer in this communication. Suspend it for one second.
+> [  2.000000] (1:sender@Tremblay) Now there is 6660438 bytes to transfer. Resume it and wait for its completion.
+> [  3.000000] (2:receiver@Jupiter) I got 'Sent message'.
+> [  3.000000] (1:sender@Tremblay) There is 0.000000 bytes to transfer after the communication completion.
+> [  3.000000] (1:sender@Tremblay) Suspending a completed activity is a no-op.
diff --git a/examples/s4u/comm-suspend/s4u-comm-suspend_d.xml b/examples/s4u/comm-suspend/s4u-comm-suspend_d.xml
new file mode 100644 (file)
index 0000000..3c24900
--- /dev/null
@@ -0,0 +1,6 @@
+<?xml version='1.0'?>
+<!DOCTYPE platform SYSTEM "https://simgrid.org/simgrid.dtd">
+<platform version="4.1">
+  <actor host="Tremblay" function="sender" />
+  <actor host="Jupiter" function="receiver" />
+</platform>
index 2572d61..4a8789a 100644 (file)
@@ -76,12 +76,12 @@ public:
    * This function is optional: you can call wait() even if you didn't call start()
    */
   virtual Activity* start() = 0;
-  /** Blocks until the activity is terminated */
+  /** Blocks the current actor until the activity is terminated */
   virtual Activity* wait() = 0;
-  /** Blocks until the activity is terminated, or until the timeout is elapsed
+  /** Blocks the current actor until the activity is terminated, or until the timeout is elapsed\n
    *  Raises: timeout exception.*/
   virtual Activity* wait_for(double timeout) = 0;
-  /** Blocks until the activity is terminated, or until the time limit is reached
+  /** Blocks the current actor until the activity is terminated, or until the time limit is reached\n
    * Raises: timeout exception. */
   void wait_until(double time_limit);
 
@@ -93,6 +93,13 @@ public:
   /** Tests whether the given activity is terminated yet. */
   virtual bool test();
 
+  /** Blocks the progression of this activity until it gets resumed. Does nothing if it is already suspended. */
+  virtual Activity* suspend();
+  /** Unblocks the progression of this activity if it was suspended previously. Does nothing otherwise. */
+  virtual Activity* resume();
+  /** Whether or not the progression of this activity is blocked (read-only query, usable on const activities) */
+  bool is_suspended() const { return suspended_; }
+
   virtual const char* get_cname() const       = 0;
   virtual const std::string& get_name() const = 0;
 
@@ -127,6 +134,7 @@ private:
   kernel::activity::ActivityImplPtr pimpl_ = nullptr;
   Activity::State state_                   = Activity::State::INITED;
   double remains_                          = 0;
+  bool suspended_                          = false;
   std::vector<ActivityPtr> successors_;
   std::set<ActivityPtr> dependencies_;
   std::atomic_int_fast32_t refcount_{0};
index f1a24ae..18d27cf 100644 (file)
@@ -42,9 +42,35 @@ bool Activity::test()
   return false;
 }
 
+/* Marks this activity as suspended; if it is already started, the underlying pimpl_ is suspended right away. */
+Activity* Activity::suspend()
+{
+  if (not suspended_) { // suspending twice changes nothing
+    suspended_ = true;
+    const bool already_started = (state_ == State::STARTED);
+    if (already_started)
+      pimpl_->suspend();
+  }
+  return this;
+}
+
+/* Lifts a previous suspend(): clears the suspended flag and, if the activity is started, resumes its pimpl_. */
+Activity* Activity::resume()
+{
+  if (not suspended_)
+    return this; // nothing to restore when it's not suspended
+  suspended_ = false; // else is_suspended() stays true and any later suspend() is silently ignored
+  if (state_ == State::STARTED)
+    pimpl_->resume();
+  return this;
+}
+
 double Activity::get_remaining() const
 {
-  return remains_;
+  if (state_ == State::INITED || state_ == State::STARTING)
+    return remains_;
+  else
+    return pimpl_->get_remaining();
 }
 
 Activity* Activity::set_remaining(double remains)
index 4d5bb78..9141385 100644 (file)
@@ -138,6 +138,10 @@ Comm* Comm::start()
   } else {
     xbt_die("Cannot start a communication before specifying whether we are the sender or the receiver");
   }
+
+  if (suspended_)
+    pimpl_->suspend();
+
   state_ = State::STARTED;
   return this;
 }
index b0c7175..7908468 100644 (file)
@@ -180,6 +180,10 @@ Exec* Exec::start()
           .set_flops_amount(flops_amounts_.front())
           .start();
     });
+
+  if (suspended_)
+    pimpl_->suspend();
+
   state_ = State::STARTED;
   on_start(*Actor::self(), *this);
   return this;
index cc9794a..4831daf 100644 (file)
@@ -44,6 +44,10 @@ Io* Io::start()
           .start();
     }
   });
+
+  if (suspended_)
+    pimpl_->suspend();
+
   state_ = State::STARTED;
   return this;
 }