From 3dac5ff35d86983862226dd0ecf222c22f87b9d4 Mon Sep 17 00:00:00 2001 From: Bruno Donassolo Date: Wed, 11 Aug 2021 19:37:22 +0200 Subject: [PATCH] Python network-nonlinear example Implement network-nonlinear using python bindings. Add bindings support for many methods (Link, NetZone, etc), hovewer not fully covered. --- MANIFEST.in | 2 + examples/python/CMakeLists.txt | 3 +- .../network-nonlinear/network-nonlinear.py | 131 ++++++++++++++++++ .../network-nonlinear/network-nonlinear.tesh | 42 ++++++ src/bindings/python/simgrid_python.cpp | 95 ++++++++++++- 5 files changed, 266 insertions(+), 7 deletions(-) create mode 100644 examples/python/network-nonlinear/network-nonlinear.py create mode 100644 examples/python/network-nonlinear/network-nonlinear.tesh diff --git a/MANIFEST.in b/MANIFEST.in index e2dda2b2a1..b219eaa4a4 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -527,6 +527,8 @@ include examples/python/exec-dvfs/exec-dvfs.py include examples/python/exec-dvfs/exec-dvfs.tesh include examples/python/exec-remote/exec-remote.py include examples/python/exec-remote/exec-remote.tesh +include examples/python/network-nonlinear/network-nonlinear.py +include examples/python/network-nonlinear/network-nonlinear.tesh include examples/smpi/NAS/DGraph.c include examples/smpi/NAS/DGraph.h include examples/smpi/NAS/README.install diff --git a/examples/python/CMakeLists.txt b/examples/python/CMakeLists.txt index c7b515c18d..168f7b80e3 100644 --- a/examples/python/CMakeLists.txt +++ b/examples/python/CMakeLists.txt @@ -1,6 +1,7 @@ foreach(example actor-create actor-daemon actor-join actor-kill actor-migrate actor-suspend actor-yield actor-lifetime comm-wait comm-waitall comm-waitany - exec-async exec-basic exec-dvfs exec-remote) + exec-async exec-basic exec-dvfs exec-remote + network-nonlinear) set(tesh_files ${tesh_files} ${CMAKE_CURRENT_SOURCE_DIR}/${example}/${example}.tesh) set(examples_src ${examples_src} ${CMAKE_CURRENT_SOURCE_DIR}/${example}/${example}.py) diff --git a/examples/python/network-nonlinear/network-nonlinear.py b/examples/python/network-nonlinear/network-nonlinear.py new file mode 100644 index 0000000000..60572c0e01 --- /dev/null +++ b/examples/python/network-nonlinear/network-nonlinear.py @@ -0,0 +1,131 @@ +# Copyright (c) 2006-2021. The SimGrid Team. All rights reserved. +# +# This program is free software; you can redistribute it and/or modify it +# under the terms of the license (GNU LGPL) which comes with this package. + +# This example shows how to simulate a non-linear resource sharing for +# network links. + +from simgrid import Actor, Engine, Comm, Mailbox, NetZone, Link, LinkInRoute, this_actor +import sys +import functools + +class Sender: + """ + Send a series of messages to mailbox "receiver" + """ + def __init__(self, msg_count, msg_size=int(1e6)): + self.msg_count = msg_count + self.msg_size = msg_size + + # Actors that are created as object will execute their __call__ method. + # So, the following constitutes the main function of the Sender actor. + def __call__(self): + pending_comms = [] + mbox = Mailbox.by_name("receiver") + + for i in range(self.msg_count): + msg = "Message " + str(i) + size = self.msg_size * (i + 1) + this_actor.info("Send '%s' to '%s, msg size: %d'" % (msg, mbox.name, size)) + comm = mbox.put_async(msg, size) + pending_comms.append(comm) + + this_actor.info("Done dispatching all messages") + + # Now that all message exchanges were initiated, wait for their completion in one single call + Comm.wait_all(pending_comms) + + this_actor.info("Goodbye now!") + +class Receiver: + """ + Receiver actor: wait for N messages on the mailbox "receiver" + """ + + def __init__(self, msg_count=10): + self.msg_count = msg_count + + def __call__(self): + mbox = Mailbox.by_name("receiver") + + pending_msgs = [] + pending_comms = [] + + this_actor.info("Wait for %d messages asynchronously" % self.msg_count) + for i in range(self.msg_count): + comm, data = mbox.get_async() + pending_comms.append(comm) + pending_msgs.append(data) + + while len(pending_comms) > 0: + index = Comm.wait_any(pending_comms) + msg = pending_msgs[index].get() + this_actor.info("I got '%s'." % msg) + del pending_comms[index] + del pending_msgs[index] + +#################################################################################################### +def link_nonlinear(link, capacity, n): + """ + Non-linear resource sharing for links + + Note that the callback is called twice in this example: + 1) link UP: with the number of active flows (from 9 to 1) + 2) link DOWN: with 0 active flows. A crosstraffic communication is happing + in the down link, but it's not considered as an active flow. + """ + # emulates a degradation in link according to the number of flows + # you probably want something more complex than that and based on real + # experiments + capacity = min(capacity, capacity * (1.0 - (n - 1) / 10.0)) + this_actor.info("Link %s, %d active communications, new capacity %f" % (link.name, n, capacity)) + return capacity + +def load_platform(): + """ + Create a simple 2-hosts platform */ + ________ __________ + | Sender |===============| Receiver | + |________| Link1 |__________| + + """ + zone = NetZone.create_full_zone("Zone1") + sender = zone.create_host("sender", 1).seal() + receiver = zone.create_host("receiver", 1).seal() + + link = zone.create_split_duplex_link("link1", 1e6) + # setting same callbacks (could be different) for link UP/DOWN in split-duplex link + link.get_link_up().set_sharing_policy( + Link.SharingPolicy.NONLINEAR, + functools.partial(link_nonlinear, link.get_link_up())) + link.get_link_down().set_sharing_policy( + Link.SharingPolicy.NONLINEAR, + functools.partial(link_nonlinear, link.get_link_down())) + link.set_latency(10e-6).seal() + + # create routes between nodes + zone.add_route(sender.get_netpoint(), receiver.get_netpoint(), None, None, + [LinkInRoute(link, LinkInRoute.Direction.UP)], True) + zone.seal() + + # create actors Sender/Receiver + Actor.create("receiver", receiver, Receiver(9)) + Actor.create("sender", sender, Sender(9)) + +################################################################################################### + +if __name__ == '__main__': + e = Engine(sys.argv) + + # create platform + load_platform() + + # runs the simulation + e.run() + + # explicitly deleting Engine object to avoid segfault during cleanup phase. + # During Engine destruction, the cleanup of std::function linked to link_non_linear callback is called. + # If we let the cleanup by itself, it fails trying on its destruction because the python main program + # has already freed its variables + del(e) diff --git a/examples/python/network-nonlinear/network-nonlinear.tesh b/examples/python/network-nonlinear/network-nonlinear.tesh new file mode 100644 index 0000000000..a09ff002e3 --- /dev/null +++ b/examples/python/network-nonlinear/network-nonlinear.tesh @@ -0,0 +1,42 @@ +$ ${pythoncmd:=python3} ${PYTHON_TOOL_OPTIONS:=} ${srcdir:=.}/network-nonlinear.py "--log=root.fmt:[%10.6r]%e(%i:%a@%h)%e%m%n" +>[ 0.000000] (1:receiver@receiver) Wait for 9 messages asynchronously +>[ 0.000000] (2:sender@sender) Send 'Message 0' to 'receiver, msg size: 1000000' +>[ 0.000000] (2:sender@sender) Send 'Message 1' to 'receiver, msg size: 2000000' +>[ 0.000000] (2:sender@sender) Send 'Message 2' to 'receiver, msg size: 3000000' +>[ 0.000000] (2:sender@sender) Send 'Message 3' to 'receiver, msg size: 4000000' +>[ 0.000000] (2:sender@sender) Send 'Message 4' to 'receiver, msg size: 5000000' +>[ 0.000000] (2:sender@sender) Send 'Message 5' to 'receiver, msg size: 6000000' +>[ 0.000000] (2:sender@sender) Send 'Message 6' to 'receiver, msg size: 7000000' +>[ 0.000000] (2:sender@sender) Send 'Message 7' to 'receiver, msg size: 8000000' +>[ 0.000000] (2:sender@sender) Send 'Message 8' to 'receiver, msg size: 9000000' +>[ 0.000000] (2:sender@sender) Done dispatching all messages +>[ 0.000000] (0:maestro@) Link link1_UP, 0 active communications, new capacity 1000000.000000 +>[ 0.000000] (0:maestro@) Link link1_DOWN, 0 active communications, new capacity 1000000.000000 +>[ 0.000130] (0:maestro@) Link link1_UP, 9 active communications, new capacity 200000.000000 +>[ 0.000130] (0:maestro@) Link link1_DOWN, 0 active communications, new capacity 1000000.000000 +>[ 46.391883] (1:receiver@receiver) I got 'Message 0'. +>[ 46.391883] (0:maestro@) Link link1_UP, 8 active communications, new capacity 300000.000000 +>[ 46.391883] (0:maestro@) Link link1_DOWN, 0 active communications, new capacity 1000000.000000 +>[ 73.883292] (1:receiver@receiver) I got 'Message 1'. +>[ 73.883292] (0:maestro@) Link link1_UP, 7 active communications, new capacity 400000.000000 +>[ 73.883292] (0:maestro@) Link link1_DOWN, 0 active communications, new capacity 1000000.000000 +>[ 91.924529] (1:receiver@receiver) I got 'Message 2'. +>[ 91.924529] (0:maestro@) Link link1_UP, 6 active communications, new capacity 500000.000000 +>[ 91.924529] (0:maestro@) Link link1_DOWN, 0 active communications, new capacity 1000000.000000 +>[104.295663] (1:receiver@receiver) I got 'Message 3'. +>[104.295663] (0:maestro@) Link link1_UP, 5 active communications, new capacity 600000.000000 +>[104.295663] (0:maestro@) Link link1_DOWN, 0 active communications, new capacity 1000000.000000 +>[112.886728] (1:receiver@receiver) I got 'Message 4'. +>[112.886728] (0:maestro@) Link link1_UP, 4 active communications, new capacity 700000.000000 +>[112.886728] (0:maestro@) Link link1_DOWN, 0 active communications, new capacity 1000000.000000 +>[118.777744] (1:receiver@receiver) I got 'Message 5'. +>[118.777744] (0:maestro@) Link link1_UP, 3 active communications, new capacity 800000.000000 +>[118.777744] (0:maestro@) Link link1_DOWN, 0 active communications, new capacity 1000000.000000 +>[122.643724] (1:receiver@receiver) I got 'Message 6'. +>[122.643724] (0:maestro@) Link link1_UP, 2 active communications, new capacity 900000.000000 +>[122.643724] (0:maestro@) Link link1_DOWN, 0 active communications, new capacity 1000000.000000 +>[124.934674] (1:receiver@receiver) I got 'Message 7'. +>[124.934674] (0:maestro@) Link link1_UP, 1 active communications, new capacity 1000000.000000 +>[124.934674] (0:maestro@) Link link1_DOWN, 0 active communications, new capacity 1000000.000000 +>[125.965602] (1:receiver@receiver) I got 'Message 8'. +>[125.965602] (2:sender@sender) Goodbye now! diff --git a/src/bindings/python/simgrid_python.cpp b/src/bindings/python/simgrid_python.cpp index 78cb91cb98..ee852108dd 100644 --- a/src/bindings/python/simgrid_python.cpp +++ b/src/bindings/python/simgrid_python.cpp @@ -39,6 +39,7 @@ #pragma GCC diagnostic pop #endif +#include "simgrid/kernel/routing/NetPoint.hpp" #include "src/kernel/context/Context.hpp" #include #include @@ -46,7 +47,9 @@ #include #include #include +#include #include +#include #include #include @@ -74,6 +77,14 @@ std::string get_simgrid_version() return simgrid::xbt::string_printf("%i.%i.%i", major, minor, patch); } +/** @brief Wrap for mailbox::get_async */ +class PyGetAsync { + std::unique_ptr data = std::make_unique(); + +public: + PyObject** get() const { return data.get(); } +}; + /* Classes GilScopedAcquire and GilScopedRelease have the same purpose as pybind11::gil_scoped_acquire and * pybind11::gil_scoped_release. Refer to the manual of pybind11 for details: * https://pybind11.readthedocs.io/en/stable/advanced/misc.html#global-interpreter-lock-gil @@ -210,11 +221,29 @@ PYBIND11_MODULE(simgrid, m) }, "Registers the main function of an actor"); + /* Class Netzone */ + py::class_>(m, "NetZone", + "Networking Zones") + .def_static("create_full_zone", &simgrid::s4u::create_full_zone, "Creates a netzone of type FullZone") + .def("add_route", + py::overload_cast&, bool>(&simgrid::s4u::NetZone::add_route), + "Add a route between 2 netpoints") + .def("create_host", py::overload_cast(&simgrid::s4u::NetZone::create_host), + "Creates a host") + .def("create_split_duplex_link", + py::overload_cast(&simgrid::s4u::NetZone::create_split_duplex_link), + "Creates a split-duplex link") + .def("seal", &simgrid::s4u::NetZone::seal, "Seal this NetZone"); + /* Class Host */ py::class_>(m, "Host", "Simulated host") .def("by_name", &Host::by_name, "Retrieves a host from its name, or die") .def("get_pstate_count", &Host::get_pstate_count, "Retrieve the count of defined pstate levels") .def("get_pstate_speed", &Host::get_pstate_speed, "Retrieve the maximal speed at the given pstate") + .def("get_netpoint", &Host::get_netpoint, "Retrieve the netpoint associated to this host") + .def("seal", &Host::seal, "Seal this host") .def_property( "pstate", &Host::get_pstate, [](Host* h, int i) { @@ -238,6 +267,47 @@ PYBIND11_MODULE(simgrid, m) "The peak computing speed in flops/s at the current pstate, taking the external load into account. " "This is the max potential speed."); + /* Class NetPoint */ + py::class_>( + m, "NetPoint", "NetPoint object"); + + /* Class Link */ + py::class_> link(m, "Link", "Network link"); + link.def("set_latency", py::overload_cast(&simgrid::s4u::Link::set_latency), "Set the latency"); + link.def("set_latency", py::overload_cast(&simgrid::s4u::Link::set_latency), "Set the latency"); + link.def("set_sharing_policy", &simgrid::s4u::Link::set_sharing_policy, "Set sharing policy for this link"); + link.def("seal", &simgrid::s4u::Link::seal, "Seal this link"); + link.def_property_readonly( + "name", + [](const simgrid::s4u::Link* self) { + return std::string(self->get_name().c_str()); // Convert from xbt::string because of MC + }, + "The name of this link"); + py::enum_(link, "SharingPolicy") + .value("NONLINEAR", simgrid::s4u::Link::SharingPolicy::NONLINEAR) + .value("WIFI", simgrid::s4u::Link::SharingPolicy::WIFI) + .value("SPLITDUPLEX", simgrid::s4u::Link::SharingPolicy::SPLITDUPLEX) + .value("SHARED", simgrid::s4u::Link::SharingPolicy::SHARED) + .value("FATPIPE", simgrid::s4u::Link::SharingPolicy::FATPIPE) + .export_values(); + + /* Class LinkInRoute */ + py::class_ linkinroute(m, "LinkInRoute", "Abstraction to add link in routes"); + linkinroute.def(py::init()); + linkinroute.def(py::init()); + py::enum_(linkinroute, "Direction") + .value("UP", simgrid::s4u::LinkInRoute::Direction::UP) + .value("DOWN", simgrid::s4u::LinkInRoute::Direction::DOWN) + .value("NONE", simgrid::s4u::LinkInRoute::Direction::NONE) + .export_values(); + + /* Class Split-Duplex Link */ + py::class_>(m, "SplitDuplexLink", + "Network split-duplex link") + .def("get_link_up", &simgrid::s4u::SplitDuplexLink::get_link_up, "Get link direction up") + .def("get_link_down", &simgrid::s4u::SplitDuplexLink::get_link_down, "Get link direction down"); + /* Class Mailbox */ py::class_>(m, "Mailbox", "Mailbox") .def( @@ -272,12 +342,25 @@ PYBIND11_MODULE(simgrid, m) return data; }, py::call_guard(), "Blocking data reception") - .def("set_receiver", - [](Mailbox* self, ActorPtr actor) { - self->set_receiver(actor); - }, - py::call_guard(), - "Sets the actor as permanent receiver"); + .def( + "get_async", + [](Mailbox* self) -> std::tuple { + PyGetAsync wrap; + auto comm = self->get_async(wrap.get()); + return std::make_tuple(std::move(comm), std::move(wrap)); + }, + py::call_guard(), + "Non-blocking data reception. Use data.get() to get the python object after the communication has finished") + .def( + "set_receiver", [](Mailbox* self, ActorPtr actor) { self->set_receiver(actor); }, + py::call_guard(), "Sets the actor as permanent receiver"); + + /* Class PyGetAsync */ + py::class_(m, "PyGetAsync", "Wrapper for async get communications") + .def(py::init<>()) + .def( + "get", [](PyGetAsync* self) { return py::reinterpret_steal(*(self->get())); }, + "Get python object after async communication in receiver side"); /* Class Comm */ py::class_(m, "Comm", "Communication") -- 2.20.1