Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Python network-nonlinear example
author Bruno Donassolo <bruno.donassolo@inria.fr>
Wed, 11 Aug 2021 17:37:22 +0000 (19:37 +0200)
committer Bruno Donassolo <bruno.donassolo@inria.fr>
Mon, 16 Aug 2021 08:58:53 +0000 (10:58 +0200)
Implement network-nonlinear using python bindings.

Add bindings support for many methods (Link, NetZone, etc.); however, they are not fully covered yet.

MANIFEST.in
examples/python/CMakeLists.txt
examples/python/network-nonlinear/network-nonlinear.py [new file with mode: 0644]
examples/python/network-nonlinear/network-nonlinear.tesh [new file with mode: 0644]
src/bindings/python/simgrid_python.cpp

index e2dda2b..b219eaa 100644 (file)
@@ -527,6 +527,8 @@ include examples/python/exec-dvfs/exec-dvfs.py
 include examples/python/exec-dvfs/exec-dvfs.tesh
 include examples/python/exec-remote/exec-remote.py
 include examples/python/exec-remote/exec-remote.tesh
+include examples/python/network-nonlinear/network-nonlinear.py
+include examples/python/network-nonlinear/network-nonlinear.tesh
 include examples/smpi/NAS/DGraph.c
 include examples/smpi/NAS/DGraph.h
 include examples/smpi/NAS/README.install
index c7b515c..168f7b8 100644 (file)
@@ -1,6 +1,7 @@
 foreach(example actor-create actor-daemon actor-join actor-kill actor-migrate actor-suspend actor-yield actor-lifetime
                 comm-wait comm-waitall comm-waitany
-                exec-async exec-basic exec-dvfs exec-remote)
+                exec-async exec-basic exec-dvfs exec-remote
+                network-nonlinear)
   set(tesh_files    ${tesh_files}   ${CMAKE_CURRENT_SOURCE_DIR}/${example}/${example}.tesh)
   set(examples_src  ${examples_src} ${CMAKE_CURRENT_SOURCE_DIR}/${example}/${example}.py)
 
diff --git a/examples/python/network-nonlinear/network-nonlinear.py b/examples/python/network-nonlinear/network-nonlinear.py
new file mode 100644 (file)
index 0000000..60572c0
--- /dev/null
@@ -0,0 +1,131 @@
+# Copyright (c) 2006-2021. The SimGrid Team. All rights reserved.
+#
+# This program is free software; you can redistribute it and/or modify it
+# under the terms of the license (GNU LGPL) which comes with this package.
+
+# This example shows how to simulate a non-linear resource sharing for
+# network links.
+
+from simgrid import Actor, Engine, Comm, Mailbox, NetZone, Link, LinkInRoute, this_actor
+import sys
+import functools
+
+class Sender:
+  """
+  Send a series of messages to mailbox "receiver"
+  """
+  def __init__(self, msg_count, msg_size=int(1e6)):
+    self.msg_count = msg_count
+    self.msg_size = msg_size
+  
+  # Actors that are created as objects will execute their __call__ method.
+  # So, the following constitutes the main function of the Sender actor.
+  def __call__(self):
+    pending_comms = []
+    mbox = Mailbox.by_name("receiver")
+
+    for i in range(self.msg_count):
+      msg = "Message " + str(i)
+      size = self.msg_size * (i + 1)
+      this_actor.info("Send '%s' to '%s, msg size: %d'" % (msg, mbox.name, size))
+      comm = mbox.put_async(msg, size)
+      pending_comms.append(comm)
+
+    this_actor.info("Done dispatching all messages")
+
+    # Now that all message exchanges were initiated, wait for their completion in one single call
+    Comm.wait_all(pending_comms)
+
+    this_actor.info("Goodbye now!")
+
+class Receiver:
+  """
+  Receiver actor: wait for N messages on the mailbox "receiver"
+  """
+
+  def __init__(self, msg_count=10):
+    self.msg_count = msg_count
+
+  def __call__(self):
+    mbox = Mailbox.by_name("receiver")
+
+    pending_msgs = []
+    pending_comms = []
+
+    this_actor.info("Wait for %d messages asynchronously" % self.msg_count)
+    for i in range(self.msg_count):
+      comm, data = mbox.get_async()
+      pending_comms.append(comm)
+      pending_msgs.append(data)
+    
+    while len(pending_comms) > 0:
+      index = Comm.wait_any(pending_comms)
+      msg = pending_msgs[index].get()
+      this_actor.info("I got '%s'." % msg)
+      del pending_comms[index]
+      del pending_msgs[index]
+
+####################################################################################################
+def link_nonlinear(link, capacity, n):
+  """
+  Non-linear resource sharing for links
+
+  Note that the callback is called twice in this example:
+   1) link UP: with the number of active flows (from 9 to 1)
+   2) link DOWN: with 0 active flows. A crosstraffic communication is happening
+   in the down link, but it's not considered as an active flow.
+  """
+  # emulates a degradation in link according to the number of flows
+  # you probably want something more complex than that and based on real
+  # experiments
+  capacity = min(capacity, capacity * (1.0 - (n - 1) / 10.0))
+  this_actor.info("Link %s, %d active communications, new capacity %f" % (link.name, n, capacity))
+  return capacity
+
+def load_platform():
+  """
+  Create a simple 2-hosts platform
+   ________                 __________
+  | Sender |===============| Receiver |
+  |________|    Link1      |__________|
+  
+  """
+  zone = NetZone.create_full_zone("Zone1")
+  sender = zone.create_host("sender", 1).seal()
+  receiver = zone.create_host("receiver", 1).seal()
+
+  link = zone.create_split_duplex_link("link1", 1e6)
+  # setting same callbacks (could be different) for link UP/DOWN in split-duplex link
+  link.get_link_up().set_sharing_policy(
+      Link.SharingPolicy.NONLINEAR,
+      functools.partial(link_nonlinear, link.get_link_up()))
+  link.get_link_down().set_sharing_policy(
+      Link.SharingPolicy.NONLINEAR,
+      functools.partial(link_nonlinear, link.get_link_down()))
+  link.set_latency(10e-6).seal()
+
+  # create routes between nodes
+  zone.add_route(sender.get_netpoint(), receiver.get_netpoint(), None, None,
+                 [LinkInRoute(link, LinkInRoute.Direction.UP)], True)
+  zone.seal()
+
+  # create actors Sender/Receiver
+  Actor.create("receiver", receiver, Receiver(9))
+  Actor.create("sender", sender, Sender(9))
+
+###################################################################################################
+
+if __name__ == '__main__':
+  e = Engine(sys.argv)
+
+  # create platform
+  load_platform()
+
+  # runs the simulation
+  e.run()
+
+  # explicitly deleting Engine object to avoid segfault during cleanup phase.
+  # During Engine destruction, the cleanup of the std::function linked to the link_nonlinear callback is called.
+  # If we let the cleanup happen by itself, that destruction fails because the python main program
+  # has already freed its variables
+  del(e)
diff --git a/examples/python/network-nonlinear/network-nonlinear.tesh b/examples/python/network-nonlinear/network-nonlinear.tesh
new file mode 100644 (file)
index 0000000..a09ff00
--- /dev/null
@@ -0,0 +1,42 @@
+$ ${pythoncmd:=python3} ${PYTHON_TOOL_OPTIONS:=} ${srcdir:=.}/network-nonlinear.py "--log=root.fmt:[%10.6r]%e(%i:%a@%h)%e%m%n"
+>[  0.000000] (1:receiver@receiver) Wait for 9 messages asynchronously
+>[  0.000000] (2:sender@sender) Send 'Message 0' to 'receiver, msg size: 1000000'
+>[  0.000000] (2:sender@sender) Send 'Message 1' to 'receiver, msg size: 2000000'
+>[  0.000000] (2:sender@sender) Send 'Message 2' to 'receiver, msg size: 3000000'
+>[  0.000000] (2:sender@sender) Send 'Message 3' to 'receiver, msg size: 4000000'
+>[  0.000000] (2:sender@sender) Send 'Message 4' to 'receiver, msg size: 5000000'
+>[  0.000000] (2:sender@sender) Send 'Message 5' to 'receiver, msg size: 6000000'
+>[  0.000000] (2:sender@sender) Send 'Message 6' to 'receiver, msg size: 7000000'
+>[  0.000000] (2:sender@sender) Send 'Message 7' to 'receiver, msg size: 8000000'
+>[  0.000000] (2:sender@sender) Send 'Message 8' to 'receiver, msg size: 9000000'
+>[  0.000000] (2:sender@sender) Done dispatching all messages
+>[  0.000000] (0:maestro@) Link link1_UP, 0 active communications, new capacity 1000000.000000
+>[  0.000000] (0:maestro@) Link link1_DOWN, 0 active communications, new capacity 1000000.000000
+>[  0.000130] (0:maestro@) Link link1_UP, 9 active communications, new capacity 200000.000000
+>[  0.000130] (0:maestro@) Link link1_DOWN, 0 active communications, new capacity 1000000.000000
+>[ 46.391883] (1:receiver@receiver) I got 'Message 0'.
+>[ 46.391883] (0:maestro@) Link link1_UP, 8 active communications, new capacity 300000.000000
+>[ 46.391883] (0:maestro@) Link link1_DOWN, 0 active communications, new capacity 1000000.000000
+>[ 73.883292] (1:receiver@receiver) I got 'Message 1'.
+>[ 73.883292] (0:maestro@) Link link1_UP, 7 active communications, new capacity 400000.000000
+>[ 73.883292] (0:maestro@) Link link1_DOWN, 0 active communications, new capacity 1000000.000000
+>[ 91.924529] (1:receiver@receiver) I got 'Message 2'.
+>[ 91.924529] (0:maestro@) Link link1_UP, 6 active communications, new capacity 500000.000000
+>[ 91.924529] (0:maestro@) Link link1_DOWN, 0 active communications, new capacity 1000000.000000
+>[104.295663] (1:receiver@receiver) I got 'Message 3'.
+>[104.295663] (0:maestro@) Link link1_UP, 5 active communications, new capacity 600000.000000
+>[104.295663] (0:maestro@) Link link1_DOWN, 0 active communications, new capacity 1000000.000000
+>[112.886728] (1:receiver@receiver) I got 'Message 4'.
+>[112.886728] (0:maestro@) Link link1_UP, 4 active communications, new capacity 700000.000000
+>[112.886728] (0:maestro@) Link link1_DOWN, 0 active communications, new capacity 1000000.000000
+>[118.777744] (1:receiver@receiver) I got 'Message 5'.
+>[118.777744] (0:maestro@) Link link1_UP, 3 active communications, new capacity 800000.000000
+>[118.777744] (0:maestro@) Link link1_DOWN, 0 active communications, new capacity 1000000.000000
+>[122.643724] (1:receiver@receiver) I got 'Message 6'.
+>[122.643724] (0:maestro@) Link link1_UP, 2 active communications, new capacity 900000.000000
+>[122.643724] (0:maestro@) Link link1_DOWN, 0 active communications, new capacity 1000000.000000
+>[124.934674] (1:receiver@receiver) I got 'Message 7'.
+>[124.934674] (0:maestro@) Link link1_UP, 1 active communications, new capacity 1000000.000000
+>[124.934674] (0:maestro@) Link link1_DOWN, 0 active communications, new capacity 1000000.000000
+>[125.965602] (1:receiver@receiver) I got 'Message 8'.
+>[125.965602] (2:sender@sender) Goodbye now!
index 78cb91c..ee85210 100644 (file)
@@ -39,6 +39,7 @@
 #pragma GCC diagnostic pop
 #endif
 
+#include "simgrid/kernel/routing/NetPoint.hpp"
 #include "src/kernel/context/Context.hpp"
 #include <simgrid/Exception.hpp>
 #include <simgrid/s4u/Actor.hpp>
@@ -46,7 +47,9 @@
 #include <simgrid/s4u/Engine.hpp>
 #include <simgrid/s4u/Exec.hpp>
 #include <simgrid/s4u/Host.hpp>
+#include <simgrid/s4u/Link.hpp>
 #include <simgrid/s4u/Mailbox.hpp>
+#include <simgrid/s4u/NetZone.hpp>
 #include <simgrid/version.h>
 
 #include <algorithm>
@@ -74,6 +77,14 @@ std::string get_simgrid_version()
   return simgrid::xbt::string_printf("%i.%i.%i", major, minor, patch);
 }
 
+/** @brief Wrapper for Mailbox::get_async */
+class PyGetAsync {
+  std::unique_ptr<PyObject*> data = std::make_unique<PyObject*>();
+
+public:
+  PyObject** get() const { return data.get(); }
+};
+
 /* Classes GilScopedAcquire and GilScopedRelease have the same purpose as pybind11::gil_scoped_acquire and
  * pybind11::gil_scoped_release.  Refer to the manual of pybind11 for details:
  * https://pybind11.readthedocs.io/en/stable/advanced/misc.html#global-interpreter-lock-gil
@@ -210,11 +221,29 @@ PYBIND11_MODULE(simgrid, m)
           },
           "Registers the main function of an actor");
 
+  /* Class NetZone */
+  py::class_<simgrid::s4u::NetZone, std::unique_ptr<simgrid::s4u::NetZone, py::nodelete>>(m, "NetZone",
+                                                                                          "Networking Zones")
+      .def_static("create_full_zone", &simgrid::s4u::create_full_zone, "Creates a netzone of type FullZone")
+      .def("add_route",
+           py::overload_cast<simgrid::kernel::routing::NetPoint*, simgrid::kernel::routing::NetPoint*,
+                             simgrid::kernel::routing::NetPoint*, simgrid::kernel::routing::NetPoint*,
+                             const std::vector<simgrid::s4u::LinkInRoute>&, bool>(&simgrid::s4u::NetZone::add_route),
+           "Add a route between 2 netpoints")
+      .def("create_host", py::overload_cast<const std::string&, double>(&simgrid::s4u::NetZone::create_host),
+           "Creates a host")
+      .def("create_split_duplex_link",
+           py::overload_cast<const std::string&, double>(&simgrid::s4u::NetZone::create_split_duplex_link),
+           "Creates a split-duplex link")
+      .def("seal", &simgrid::s4u::NetZone::seal, "Seal this NetZone");
+
   /* Class Host */
   py::class_<simgrid::s4u::Host, std::unique_ptr<Host, py::nodelete>>(m, "Host", "Simulated host")
       .def("by_name", &Host::by_name, "Retrieves a host from its name, or die")
       .def("get_pstate_count", &Host::get_pstate_count, "Retrieve the count of defined pstate levels")
       .def("get_pstate_speed", &Host::get_pstate_speed, "Retrieve the maximal speed at the given pstate")
+      .def("get_netpoint", &Host::get_netpoint, "Retrieve the netpoint associated to this host")
+      .def("seal", &Host::seal, "Seal this host")
       .def_property(
           "pstate", &Host::get_pstate,
           [](Host* h, int i) {
@@ -238,6 +267,47 @@ PYBIND11_MODULE(simgrid, m)
           "The peak computing speed in flops/s at the current pstate, taking the external load into account. "
           "This is the max potential speed.");
 
+  /* Class NetPoint */
+  py::class_<simgrid::kernel::routing::NetPoint, std::unique_ptr<simgrid::kernel::routing::NetPoint, py::nodelete>>(
+      m, "NetPoint", "NetPoint object");
+
+  /* Class Link */
+  py::class_<simgrid::s4u::Link, std::unique_ptr<simgrid::s4u::Link, py::nodelete>> link(m, "Link", "Network link");
+  link.def("set_latency", py::overload_cast<const std::string&>(&simgrid::s4u::Link::set_latency), "Set the latency");
+  link.def("set_latency", py::overload_cast<double>(&simgrid::s4u::Link::set_latency), "Set the latency");
+  link.def("set_sharing_policy", &simgrid::s4u::Link::set_sharing_policy, "Set sharing policy for this link");
+  link.def("seal", &simgrid::s4u::Link::seal, "Seal this link");
+  link.def_property_readonly(
+      "name",
+      [](const simgrid::s4u::Link* self) {
+        return std::string(self->get_name().c_str()); // Convert from xbt::string because of MC
+      },
+      "The name of this link");
+  py::enum_<simgrid::s4u::Link::SharingPolicy>(link, "SharingPolicy")
+      .value("NONLINEAR", simgrid::s4u::Link::SharingPolicy::NONLINEAR)
+      .value("WIFI", simgrid::s4u::Link::SharingPolicy::WIFI)
+      .value("SPLITDUPLEX", simgrid::s4u::Link::SharingPolicy::SPLITDUPLEX)
+      .value("SHARED", simgrid::s4u::Link::SharingPolicy::SHARED)
+      .value("FATPIPE", simgrid::s4u::Link::SharingPolicy::FATPIPE)
+      .export_values();
+
+  /* Class LinkInRoute */
+  py::class_<simgrid::s4u::LinkInRoute> linkinroute(m, "LinkInRoute", "Abstraction to add link in routes");
+  linkinroute.def(py::init<const simgrid::s4u::Link*>());
+  linkinroute.def(py::init<const simgrid::s4u::Link*, simgrid::s4u::LinkInRoute::Direction>());
+  py::enum_<simgrid::s4u::LinkInRoute::Direction>(linkinroute, "Direction")
+      .value("UP", simgrid::s4u::LinkInRoute::Direction::UP)
+      .value("DOWN", simgrid::s4u::LinkInRoute::Direction::DOWN)
+      .value("NONE", simgrid::s4u::LinkInRoute::Direction::NONE)
+      .export_values();
+
+  /* Class Split-Duplex Link */
+  py::class_<simgrid::s4u::SplitDuplexLink, simgrid::s4u::Link,
+             std::unique_ptr<simgrid::s4u::SplitDuplexLink, py::nodelete>>(m, "SplitDuplexLink",
+                                                                           "Network split-duplex link")
+      .def("get_link_up", &simgrid::s4u::SplitDuplexLink::get_link_up, "Get link direction up")
+      .def("get_link_down", &simgrid::s4u::SplitDuplexLink::get_link_down, "Get link direction down");
+
   /* Class Mailbox */
   py::class_<simgrid::s4u::Mailbox, std::unique_ptr<Mailbox, py::nodelete>>(m, "Mailbox", "Mailbox")
       .def(
@@ -272,12 +342,25 @@ PYBIND11_MODULE(simgrid, m)
             return data;
           },
           py::call_guard<GilScopedRelease>(), "Blocking data reception")
-      .def("set_receiver",
-        [](Mailbox* self, ActorPtr actor) {
-          self->set_receiver(actor);
-        },
-        py::call_guard<GilScopedRelease>(),
-        "Sets the actor as permanent receiver");
+      .def(
+          "get_async",
+          [](Mailbox* self) -> std::tuple<simgrid::s4u::CommPtr, PyGetAsync> {
+            PyGetAsync wrap;
+            auto comm = self->get_async(wrap.get());
+            return std::make_tuple(std::move(comm), std::move(wrap));
+          },
+          py::call_guard<GilScopedRelease>(),
+          "Non-blocking data reception. Use data.get() to get the python object after the communication has finished")
+      .def(
+          "set_receiver", [](Mailbox* self, ActorPtr actor) { self->set_receiver(actor); },
+          py::call_guard<GilScopedRelease>(), "Sets the actor as permanent receiver");
+
+  /* Class PyGetAsync */
+  py::class_<PyGetAsync>(m, "PyGetAsync", "Wrapper for async get communications")
+      .def(py::init<>())
+      .def(
+          "get", [](PyGetAsync* self) { return py::reinterpret_steal<py::object>(*(self->get())); },
+          "Get python object after async communication in receiver side");
 
   /* Class Comm */
   py::class_<simgrid::s4u::Comm, simgrid::s4u::CommPtr>(m, "Comm", "Communication")