From d9e2eb51f2650b1468c22d15f636a6e24fdbf3cc Mon Sep 17 00:00:00 2001 From: Bruno Donassolo Date: Mon, 16 Aug 2021 16:09:58 +0200 Subject: [PATCH] Python: an example with disks --- MANIFEST.in | 2 + examples/python/CMakeLists.txt | 2 +- .../python/io-degradation/io-degradation.py | 124 ++++++++++++++++++ .../python/io-degradation/io-degradation.tesh | 32 +++++ src/bindings/python/simgrid_python.cpp | 54 +++++--- 5 files changed, 198 insertions(+), 16 deletions(-) create mode 100644 examples/python/io-degradation/io-degradation.py create mode 100644 examples/python/io-degradation/io-degradation.tesh diff --git a/MANIFEST.in b/MANIFEST.in index f392e2705b..07aa6d425a 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -529,6 +529,8 @@ include examples/python/exec-dvfs/exec-dvfs.py include examples/python/exec-dvfs/exec-dvfs.tesh include examples/python/exec-remote/exec-remote.py include examples/python/exec-remote/exec-remote.tesh +include examples/python/io-degradation/io-degradation.py +include examples/python/io-degradation/io-degradation.tesh include examples/python/network-nonlinear/network-nonlinear.py include examples/python/network-nonlinear/network-nonlinear.tesh include examples/smpi/NAS/DGraph.c diff --git a/examples/python/CMakeLists.txt b/examples/python/CMakeLists.txt index d8d91a7197..3756d27958 100644 --- a/examples/python/CMakeLists.txt +++ b/examples/python/CMakeLists.txt @@ -1,7 +1,7 @@ foreach(example actor-create actor-daemon actor-join actor-kill actor-migrate actor-suspend actor-yield actor-lifetime comm-wait comm-waitall comm-waitany exec-async exec-basic exec-dvfs exec-remote - network-nonlinear clusters-multicpu) + network-nonlinear clusters-multicpu io-degradation) set(tesh_files ${tesh_files} ${CMAKE_CURRENT_SOURCE_DIR}/${example}/${example}.tesh) set(examples_src ${examples_src} ${CMAKE_CURRENT_SOURCE_DIR}/${example}/${example}.py) diff --git a/examples/python/io-degradation/io-degradation.py b/examples/python/io-degradation/io-degradation.py new file mode 100644 index 0000000000..a81910b2de --- /dev/null +++ b/examples/python/io-degradation/io-degradation.py @@ -0,0 +1,124 @@ +# Copyright (c) 2006-2021. The SimGrid Team. All rights reserved. +# +# This program is free software; you can redistribute it and/or modify it +# under the terms of the license (GNU LGPL) which comes with this package. + +# This example shows how to simulate a non-linear resource sharing for disk +# operations. +# +# It is inspired on the paper +# "Adding Storage Simulation Capacities to the SimGridToolkit: Concepts, Models, and API" +# Available at : https://hal.inria.fr/hal-01197128/document +# +# It shows how to simulate concurrent operations degrading overall performance of IO +# operations (specifically the effects presented in Fig. 8 of the paper). + + +from simgrid import Actor, Engine, NetZone, Host, Disk, this_actor +import sys +import functools + + +def estimate_bw(disk: Disk, n_flows: int, read: bool): + """ Calculates the bandwidth for disk doing async operations """ + size = 100000 + cur_time = Engine.get_clock() + activities = [disk.read_async(size) if read else disk.write_async( + size) for _ in range(n_flows)] + + for act in activities: + act.wait() + + elapsed_time = Engine.get_clock() - cur_time + estimated_bw = float(size * n_flows) / elapsed_time + this_actor.info("Disk: %s, concurrent %s: %d, estimated bandwidth: %f" % ( + disk.name, "read" if read else "write", n_flows, estimated_bw)) + + +def host(): + # Estimating bw for each disk and considering concurrent flows + for n in range(1, 15, 2): + for disk in Host.current().get_disks(): + estimate_bw(disk, n, True) + estimate_bw(disk, n, False) + + +def ssd_dynamic_sharing(disk: Disk, op: str, capacity: float, n: int) -> float: + """ + Non-linear resource callback for SSD disks + + In this case, we have measurements for some resource sharing and directly use them to return the + correct value + :param disk: Disk on which the operation is happening (defined by the user through the std::bind) + :param op: read or write operation (defined by the user through the std::bind) + :param capacity: Resource current capacity in SimGrid + :param n: Number of activities sharing this resource + """ + # measurements for SSD disks + speed = { + "write": {1: 131.}, + "read": {1: 152., 2: 161., 3: 184., 4: 197., 5: 207., 6: 215., 7: 220., 8: 224., 9: 227., 10: 231., 11: 233., 12: 235., 13: 237., 14: 238., 15: 239.} + } + + # no special bandwidth for this disk sharing N flows, just returns maximal capacity + if (n in speed[op]): + capacity = speed[op][n] + + return capacity + + +def sata_dynamic_sharing(disk: Disk, capacity: float, n: int) -> float: + """ + Non-linear resource callback for SATA disks + + In this case, the degradation for read operations is linear and we have a formula that represents it. + + :param disk: Disk on which the operation is happening (defined by the user through the std::bind) + :param capacity: Resource current capacity in SimGrid + :param n: Number of activities sharing this resource + :return: New disk capacity + """ + return 68.3 - 1.7 * n + + +def create_ssd_disk(host: Host, disk_name: str): + """ Creates an SSD disk, setting the appropriate callback for non-linear resource sharing """ + disk = host.create_disk(disk_name, "240MBps", "170MBps") + disk.set_sharing_policy(Disk.Operation.READ, Disk.SharingPolicy.NONLINEAR, + functools.partial(ssd_dynamic_sharing, disk, "read")) + disk.set_sharing_policy(Disk.Operation.WRITE, Disk.SharingPolicy.NONLINEAR, + functools.partial(ssd_dynamic_sharing, disk, "write")) + disk.set_sharing_policy(Disk.Operation.READWRITE, + Disk.SharingPolicy.LINEAR) + + +def create_sata_disk(host: Host, disk_name: str): + """ Same for a SATA disk, only read operation follows a non-linear resource sharing """ + disk = host.create_disk(disk_name, "68MBps", "50MBps") + disk.set_sharing_policy(Disk.Operation.READ, Disk.SharingPolicy.NONLINEAR, + functools.partial(sata_dynamic_sharing, disk)) + # this is the default behavior, expliciting only to make it clearer + disk.set_sharing_policy(Disk.Operation.WRITE, Disk.SharingPolicy.LINEAR) + disk.set_sharing_policy(Disk.Operation.READWRITE, + Disk.SharingPolicy.LINEAR) + + +if __name__ == '__main__': + e = Engine(sys.argv) + # simple platform containing 1 host and 2 disk + zone = NetZone.create_full_zone("bob_zone") + bob = zone.create_host("bob", 1e6) + create_ssd_disk(bob, "Edel (SSD)") + create_sata_disk(bob, "Griffon (SATA II)") + zone.seal() + + Actor.create("", bob, host) + + e.run() + this_actor.info("Simulated time: %g" % Engine.get_clock()) + + # explicitly deleting Engine object to avoid segfault during cleanup phase. + # During Engine destruction, the cleanup of std::function linked to non_linear callback is called. + # If we let the cleanup by itself, it fails trying on its destruction because the python main program + # has already freed its variables + del(e) diff --git a/examples/python/io-degradation/io-degradation.tesh b/examples/python/io-degradation/io-degradation.tesh new file mode 100644 index 0000000000..c7feea236c --- /dev/null +++ b/examples/python/io-degradation/io-degradation.tesh @@ -0,0 +1,32 @@ +#!/usr/bin/env tesh + +$ ${pythoncmd:=python3} ${PYTHON_TOOL_OPTIONS:=} ${srcdir:=.}/io-degradation.py "--log=root.fmt:[%10.6r]%e(%i:%a@%h)%e%m%n" +> [657.894737] (1:@bob) Disk: Edel (SSD), concurrent read: 1, estimated bandwidth: 152.000000 +> [1421.253515] (1:@bob) Disk: Edel (SSD), concurrent write: 1, estimated bandwidth: 131.000000 +> [2922.755017] (1:@bob) Disk: Griffon (SATA II), concurrent read: 1, estimated bandwidth: 66.600000 +> [2922.757017] (1:@bob) Disk: Griffon (SATA II), concurrent write: 1, estimated bandwidth: 50000000.001182 +> [4553.191800] (1:@bob) Disk: Edel (SSD), concurrent read: 3, estimated bandwidth: 184.000000 +> [4553.193564] (1:@bob) Disk: Edel (SSD), concurrent write: 3, estimated bandwidth: 170000000.022058 +> [9300.029007] (1:@bob) Disk: Griffon (SATA II), concurrent read: 3, estimated bandwidth: 63.200000 +> [9300.035007] (1:@bob) Disk: Griffon (SATA II), concurrent write: 3, estimated bandwidth: 50000000.004972 +> [11715.493945] (1:@bob) Disk: Edel (SSD), concurrent read: 5, estimated bandwidth: 207.000000 +> [11715.496886] (1:@bob) Disk: Edel (SSD), concurrent write: 5, estimated bandwidth: 170000000.039581 +> [20076.700899] (1:@bob) Disk: Griffon (SATA II), concurrent read: 5, estimated bandwidth: 59.800000 +> [20076.710899] (1:@bob) Disk: Griffon (SATA II), concurrent write: 5, estimated bandwidth: 50000000.008004 +> [23258.529081] (1:@bob) Disk: Edel (SSD), concurrent read: 7, estimated bandwidth: 220.000000 +> [23258.533199] (1:@bob) Disk: Edel (SSD), concurrent write: 7, estimated bandwidth: 170000000.009542 +> [35669.880716] (1:@bob) Disk: Griffon (SATA II), concurrent read: 7, estimated bandwidth: 56.400000 +> [35669.894716] (1:@bob) Disk: Griffon (SATA II), concurrent write: 7, estimated bandwidth: 49999999.989814 +> [39634.652426] (1:@bob) Disk: Edel (SSD), concurrent read: 9, estimated bandwidth: 227.000000 +> [39634.657720] (1:@bob) Disk: Edel (SSD), concurrent write: 9, estimated bandwidth: 169999999.992853 +> [56615.789795] (1:@bob) Disk: Griffon (SATA II), concurrent read: 9, estimated bandwidth: 53.000000 +> [56615.807795] (1:@bob) Disk: Griffon (SATA II), concurrent write: 9, estimated bandwidth: 50000000.010025 +> [61336.837838] (1:@bob) Disk: Edel (SSD), concurrent read: 11, estimated bandwidth: 233.000000 +> [61336.844309] (1:@bob) Disk: Edel (SSD), concurrent write: 11, estimated bandwidth: 170000000.077813 +> [83514.263663] (1:@bob) Disk: Griffon (SATA II), concurrent read: 11, estimated bandwidth: 49.600000 +> [83514.285663] (1:@bob) Disk: Griffon (SATA II), concurrent write: 11, estimated bandwidth: 50000000.006350 +> [88999.517731] (1:@bob) Disk: Edel (SSD), concurrent read: 13, estimated bandwidth: 237.000000 +> [88999.525378] (1:@bob) Disk: Edel (SSD), concurrent write: 13, estimated bandwidth: 169999999.974881 +> [117138.053517] (1:@bob) Disk: Griffon (SATA II), concurrent read: 13, estimated bandwidth: 46.200000 +> [117138.079517] (1:@bob) Disk: Griffon (SATA II), concurrent write: 13, estimated bandwidth: 50000000.003806 +> [117138.079517] (0:maestro@) Simulated time: 117138 diff --git a/src/bindings/python/simgrid_python.cpp b/src/bindings/python/simgrid_python.cpp index 6d4d372974..037c2ffde0 100644 --- a/src/bindings/python/simgrid_python.cpp +++ b/src/bindings/python/simgrid_python.cpp @@ -330,12 +330,25 @@ PYBIND11_MODULE(simgrid, m) "This is the max potential speed."); /* Class Disk */ - py::class_>(m, "Disk", "Simulated disk") - .def("read", &simgrid::s4u::Disk::read, py::call_guard(), "Read data from disk") + py::class_> disk(m, "Disk", "Simulated disk"); + disk.def("read", &simgrid::s4u::Disk::read, py::call_guard(), "Read data from disk") .def("write", &simgrid::s4u::Disk::write, py::call_guard(), "Write data in disk") + .def("read_async", &simgrid::s4u::Disk::read_async, "Non-blocking read data from disk") + .def("write_async", &simgrid::s4u::Disk::write_async, "Non-blocking write data in disk") + .def("set_sharing_policy", &simgrid::s4u::Disk::set_sharing_policy, "Set sharing policy for this disk", + py::arg("op"), py::arg("policy"), py::arg("cb") = simgrid::s4u::NonLinearResourceCb()) .def("seal", &simgrid::s4u::Disk::seal, "Seal this disk") .def_property_readonly( "name", [](const simgrid::s4u::Disk* self) { return self->get_name(); }, "The name of this disk"); + py::enum_(disk, "SharingPolicy") + .value("NONLINEAR", simgrid::s4u::Disk::SharingPolicy::NONLINEAR) + .value("LINEAR", simgrid::s4u::Disk::SharingPolicy::LINEAR) + .export_values(); + py::enum_(disk, "Operation") + .value("READ", simgrid::s4u::Disk::Operation::READ) + .value("WRITE", simgrid::s4u::Disk::Operation::WRITE) + .value("READWRITE", simgrid::s4u::Disk::Operation::READWRITE) + .export_values(); /* Class NetPoint */ py::class_>( @@ -343,19 +356,19 @@ PYBIND11_MODULE(simgrid, m) /* Class Link */ py::class_> link(m, "Link", "Network link"); - link.def("set_latency", py::overload_cast(&simgrid::s4u::Link::set_latency), "Set the latency"); - link.def("set_latency", py::overload_cast(&simgrid::s4u::Link::set_latency), "Set the latency"); - link.def("set_sharing_policy", &simgrid::s4u::Link::set_sharing_policy, "Set sharing policy for this link"); - link.def("set_concurrency_limit", &simgrid::s4u::Link::set_concurrency_limit, "Set concurrency limit for this link"); - link.def("set_host_wifi_rate", &simgrid::s4u::Link::set_host_wifi_rate, - "Set level of communication speed of given host on this Wi-Fi link"); - link.def("seal", &simgrid::s4u::Link::seal, "Seal this link"); - link.def_property_readonly( - "name", - [](const simgrid::s4u::Link* self) { - return std::string(self->get_name().c_str()); // Convert from xbt::string because of MC - }, - "The name of this link"); + link.def("set_latency", py::overload_cast(&simgrid::s4u::Link::set_latency), "Set the latency") + .def("set_latency", py::overload_cast(&simgrid::s4u::Link::set_latency), "Set the latency") + .def("set_sharing_policy", &simgrid::s4u::Link::set_sharing_policy, "Set sharing policy for this link") + .def("set_concurrency_limit", &simgrid::s4u::Link::set_concurrency_limit, "Set concurrency limit for this link") + .def("set_host_wifi_rate", &simgrid::s4u::Link::set_host_wifi_rate, + "Set level of communication speed of given host on this Wi-Fi link") + .def("seal", &simgrid::s4u::Link::seal, "Seal this link") + .def_property_readonly( + "name", + [](const simgrid::s4u::Link* self) { + return std::string(self->get_name().c_str()); // Convert from xbt::string because of MC + }, + "The name of this link"); py::enum_(link, "SharingPolicy") .value("NONLINEAR", simgrid::s4u::Link::SharingPolicy::NONLINEAR) .value("WIFI", simgrid::s4u::Link::SharingPolicy::WIFI) @@ -450,6 +463,17 @@ PYBIND11_MODULE(simgrid, m) py::call_guard(), "Block until the completion of any communication in the list and return the index of the terminated one."); + /* Class Io */ + py::class_(m, "Io", "I/O activities") + .def("test", &simgrid::s4u::Io::test, py::call_guard(), "Test whether the I/O is terminated.") + .def("wait", &simgrid::s4u::Io::wait, py::call_guard(), + "Block until the completion of that I/O operation") + .def_static( + "wait_any_for", &simgrid::s4u::Io::wait_any_for, py::call_guard(), + "Block until the completion of any I/O in the list (or timeout) and return the index of the terminated one.") + .def_static("wait_any", &simgrid::s4u::Io::wait_any, py::call_guard(), + "Block until the completion of any I/O in the list and return the index of the terminated one."); + /* Class Exec */ py::class_(m, "Exec", "Execution") .def_property_readonly( -- 2.20.1