Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Python: an example with disks
author: Bruno Donassolo <bruno.donassolo@inria.fr>
Mon, 16 Aug 2021 14:09:58 +0000 (16:09 +0200)
committer: Bruno Donassolo <bruno.donassolo@inria.fr>
Tue, 17 Aug 2021 10:30:42 +0000 (12:30 +0200)
MANIFEST.in
examples/python/CMakeLists.txt
examples/python/io-degradation/io-degradation.py [new file with mode: 0644]
examples/python/io-degradation/io-degradation.tesh [new file with mode: 0644]
src/bindings/python/simgrid_python.cpp

index f392e27..07aa6d4 100644 (file)
@@ -529,6 +529,8 @@ include examples/python/exec-dvfs/exec-dvfs.py
 include examples/python/exec-dvfs/exec-dvfs.tesh
 include examples/python/exec-remote/exec-remote.py
 include examples/python/exec-remote/exec-remote.tesh
+include examples/python/io-degradation/io-degradation.py
+include examples/python/io-degradation/io-degradation.tesh
 include examples/python/network-nonlinear/network-nonlinear.py
 include examples/python/network-nonlinear/network-nonlinear.tesh
 include examples/smpi/NAS/DGraph.c
index d8d91a7..3756d27 100644 (file)
@@ -1,7 +1,7 @@
 foreach(example actor-create actor-daemon actor-join actor-kill actor-migrate actor-suspend actor-yield actor-lifetime
                 comm-wait comm-waitall comm-waitany
                 exec-async exec-basic exec-dvfs exec-remote
-                network-nonlinear clusters-multicpu)
+                network-nonlinear clusters-multicpu io-degradation)
   set(tesh_files    ${tesh_files}   ${CMAKE_CURRENT_SOURCE_DIR}/${example}/${example}.tesh)
   set(examples_src  ${examples_src} ${CMAKE_CURRENT_SOURCE_DIR}/${example}/${example}.py)
 
diff --git a/examples/python/io-degradation/io-degradation.py b/examples/python/io-degradation/io-degradation.py
new file mode 100644 (file)
index 0000000..a81910b
--- /dev/null
@@ -0,0 +1,124 @@
+# Copyright (c) 2006-2021. The SimGrid Team. All rights reserved.
+#
+# This program is free software; you can redistribute it and/or modify it
+# under the terms of the license (GNU LGPL) which comes with this package.
+
+# This example shows how to simulate a non-linear resource sharing for disk
+# operations.
+#
+# It is inspired by the paper
+# "Adding Storage Simulation Capacities to the SimGrid Toolkit: Concepts, Models, and API"
+# Available at : https://hal.inria.fr/hal-01197128/document
+#
+# It shows how to simulate concurrent operations degrading overall performance of IO
+# operations (specifically the effects presented in Fig. 8 of the paper).
+
+
+from simgrid import Actor, Engine, NetZone, Host, Disk, this_actor
+import sys
+import functools
+
+
+def estimate_bw(disk: Disk, n_flows: int, read: bool):
+    """ Calculates the bandwidth for disk doing async operations """
+    size = 100000
+    cur_time = Engine.get_clock()
+    activities = [disk.read_async(size) if read else disk.write_async(
+        size) for _ in range(n_flows)]
+
+    for act in activities:
+        act.wait()
+
+    elapsed_time = Engine.get_clock() - cur_time
+    estimated_bw = float(size * n_flows) / elapsed_time
+    this_actor.info("Disk: %s, concurrent %s: %d, estimated bandwidth: %f" % (
+        disk.name, "read" if read else "write", n_flows, estimated_bw))
+
+
+def host():
+    # Estimating bw for each disk and considering concurrent flows
+    for n in range(1, 15, 2):
+        for disk in Host.current().get_disks():
+            estimate_bw(disk, n, True)
+            estimate_bw(disk, n, False)
+
+
+def ssd_dynamic_sharing(disk: Disk, op: str, capacity: float, n: int) -> float:
+    """
+    Non-linear resource callback for SSD disks
+
+    In this case, we have measurements for some resource sharing and directly use them to return the
+    correct value
+    :param disk: Disk on which the operation is happening (bound by the user through functools.partial)
+    :param op: read or write operation (bound by the user through functools.partial)
+    :param capacity: Resource current capacity in SimGrid
+    :param n: Number of activities sharing this resource
+    """
+    # measurements for SSD disks
+    speed = {
+        "write": {1: 131.},
+        "read": {1: 152., 2: 161., 3: 184., 4: 197., 5: 207., 6: 215., 7: 220., 8: 224., 9: 227., 10: 231., 11: 233., 12: 235., 13: 237., 14: 238., 15: 239.}
+    }
+
+    # use the measured bandwidth when we have a data point for N concurrent flows; otherwise keep the maximal capacity
+    if (n in speed[op]):
+        capacity = speed[op][n]
+
+    return capacity
+
+
+def sata_dynamic_sharing(disk: Disk, capacity: float, n: int) -> float:
+    """
+    Non-linear resource callback for SATA disks
+
+    In this case, the degradation for read operations is linear and we have a formula that represents it.
+
+    :param disk: Disk on which the operation is happening (bound by the user through functools.partial)
+    :param capacity: Resource current capacity in SimGrid
+    :param n: Number of activities sharing this resource
+    :return: New disk capacity
+    """
+    return 68.3 - 1.7 * n
+
+
+def create_ssd_disk(host: Host, disk_name: str):
+    """ Creates an SSD disk, setting the appropriate callback for non-linear resource sharing """
+    disk = host.create_disk(disk_name, "240MBps", "170MBps")
+    disk.set_sharing_policy(Disk.Operation.READ, Disk.SharingPolicy.NONLINEAR,
+                            functools.partial(ssd_dynamic_sharing, disk, "read"))
+    disk.set_sharing_policy(Disk.Operation.WRITE, Disk.SharingPolicy.NONLINEAR,
+                            functools.partial(ssd_dynamic_sharing, disk, "write"))
+    disk.set_sharing_policy(Disk.Operation.READWRITE,
+                            Disk.SharingPolicy.LINEAR)
+
+
+def create_sata_disk(host: Host, disk_name: str):
+    """ Same for a SATA disk, only read operation follows a non-linear resource sharing """
+    disk = host.create_disk(disk_name, "68MBps", "50MBps")
+    disk.set_sharing_policy(Disk.Operation.READ, Disk.SharingPolicy.NONLINEAR,
+                            functools.partial(sata_dynamic_sharing, disk))
+    # this is the default behavior; stated explicitly only for clarity
+    disk.set_sharing_policy(Disk.Operation.WRITE, Disk.SharingPolicy.LINEAR)
+    disk.set_sharing_policy(Disk.Operation.READWRITE,
+                            Disk.SharingPolicy.LINEAR)
+
+
+if __name__ == '__main__':
+    e = Engine(sys.argv)
+    # simple platform containing 1 host and 2 disks
+    zone = NetZone.create_full_zone("bob_zone")
+    bob = zone.create_host("bob", 1e6)
+    create_ssd_disk(bob, "Edel (SSD)")
+    create_sata_disk(bob, "Griffon (SATA II)")
+    zone.seal()
+
+    Actor.create("", bob, host)
+
+    e.run()
+    this_actor.info("Simulated time: %g" % Engine.get_clock())
+
+    # explicitly deleting Engine object to avoid segfault during cleanup phase.
+    # During Engine destruction, the cleanup of std::function linked to non_linear callback is called.
+    # If we let the cleanup by itself, it fails trying on its destruction because the python main program
+    # has already freed its variables
+    del(e)
diff --git a/examples/python/io-degradation/io-degradation.tesh b/examples/python/io-degradation/io-degradation.tesh
new file mode 100644 (file)
index 0000000..c7feea2
--- /dev/null
@@ -0,0 +1,32 @@
+#!/usr/bin/env tesh
+
+$ ${pythoncmd:=python3} ${PYTHON_TOOL_OPTIONS:=} ${srcdir:=.}/io-degradation.py "--log=root.fmt:[%10.6r]%e(%i:%a@%h)%e%m%n"
+> [657.894737] (1:@bob) Disk: Edel (SSD), concurrent read: 1, estimated bandwidth: 152.000000
+> [1421.253515] (1:@bob) Disk: Edel (SSD), concurrent write: 1, estimated bandwidth: 131.000000
+> [2922.755017] (1:@bob) Disk: Griffon (SATA II), concurrent read: 1, estimated bandwidth: 66.600000
+> [2922.757017] (1:@bob) Disk: Griffon (SATA II), concurrent write: 1, estimated bandwidth: 50000000.001182
+> [4553.191800] (1:@bob) Disk: Edel (SSD), concurrent read: 3, estimated bandwidth: 184.000000
+> [4553.193564] (1:@bob) Disk: Edel (SSD), concurrent write: 3, estimated bandwidth: 170000000.022058
+> [9300.029007] (1:@bob) Disk: Griffon (SATA II), concurrent read: 3, estimated bandwidth: 63.200000
+> [9300.035007] (1:@bob) Disk: Griffon (SATA II), concurrent write: 3, estimated bandwidth: 50000000.004972
+> [11715.493945] (1:@bob) Disk: Edel (SSD), concurrent read: 5, estimated bandwidth: 207.000000
+> [11715.496886] (1:@bob) Disk: Edel (SSD), concurrent write: 5, estimated bandwidth: 170000000.039581
+> [20076.700899] (1:@bob) Disk: Griffon (SATA II), concurrent read: 5, estimated bandwidth: 59.800000
+> [20076.710899] (1:@bob) Disk: Griffon (SATA II), concurrent write: 5, estimated bandwidth: 50000000.008004
+> [23258.529081] (1:@bob) Disk: Edel (SSD), concurrent read: 7, estimated bandwidth: 220.000000
+> [23258.533199] (1:@bob) Disk: Edel (SSD), concurrent write: 7, estimated bandwidth: 170000000.009542
+> [35669.880716] (1:@bob) Disk: Griffon (SATA II), concurrent read: 7, estimated bandwidth: 56.400000
+> [35669.894716] (1:@bob) Disk: Griffon (SATA II), concurrent write: 7, estimated bandwidth: 49999999.989814
+> [39634.652426] (1:@bob) Disk: Edel (SSD), concurrent read: 9, estimated bandwidth: 227.000000
+> [39634.657720] (1:@bob) Disk: Edel (SSD), concurrent write: 9, estimated bandwidth: 169999999.992853
+> [56615.789795] (1:@bob) Disk: Griffon (SATA II), concurrent read: 9, estimated bandwidth: 53.000000
+> [56615.807795] (1:@bob) Disk: Griffon (SATA II), concurrent write: 9, estimated bandwidth: 50000000.010025
+> [61336.837838] (1:@bob) Disk: Edel (SSD), concurrent read: 11, estimated bandwidth: 233.000000
+> [61336.844309] (1:@bob) Disk: Edel (SSD), concurrent write: 11, estimated bandwidth: 170000000.077813
+> [83514.263663] (1:@bob) Disk: Griffon (SATA II), concurrent read: 11, estimated bandwidth: 49.600000
+> [83514.285663] (1:@bob) Disk: Griffon (SATA II), concurrent write: 11, estimated bandwidth: 50000000.006350
+> [88999.517731] (1:@bob) Disk: Edel (SSD), concurrent read: 13, estimated bandwidth: 237.000000
+> [88999.525378] (1:@bob) Disk: Edel (SSD), concurrent write: 13, estimated bandwidth: 169999999.974881
+> [117138.053517] (1:@bob) Disk: Griffon (SATA II), concurrent read: 13, estimated bandwidth: 46.200000
+> [117138.079517] (1:@bob) Disk: Griffon (SATA II), concurrent write: 13, estimated bandwidth: 50000000.003806
+> [117138.079517] (0:maestro@) Simulated time: 117138
index 6d4d372..037c2ff 100644 (file)
@@ -330,12 +330,25 @@ PYBIND11_MODULE(simgrid, m)
           "This is the max potential speed.");
 
   /* Class Disk */
-  py::class_<simgrid::s4u::Disk, std::unique_ptr<simgrid::s4u::Disk, py::nodelete>>(m, "Disk", "Simulated disk")
-      .def("read", &simgrid::s4u::Disk::read, py::call_guard<GilScopedRelease>(), "Read data from disk")
+  py::class_<simgrid::s4u::Disk, std::unique_ptr<simgrid::s4u::Disk, py::nodelete>> disk(m, "Disk", "Simulated disk");
+  disk.def("read", &simgrid::s4u::Disk::read, py::call_guard<GilScopedRelease>(), "Read data from disk")
       .def("write", &simgrid::s4u::Disk::write, py::call_guard<GilScopedRelease>(), "Write data in disk")
+      .def("read_async", &simgrid::s4u::Disk::read_async, "Non-blocking read data from disk")
+      .def("write_async", &simgrid::s4u::Disk::write_async, "Non-blocking write data in disk")
+      .def("set_sharing_policy", &simgrid::s4u::Disk::set_sharing_policy, "Set sharing policy for this disk",
+           py::arg("op"), py::arg("policy"), py::arg("cb") = simgrid::s4u::NonLinearResourceCb())
       .def("seal", &simgrid::s4u::Disk::seal, "Seal this disk")
       .def_property_readonly(
           "name", [](const simgrid::s4u::Disk* self) { return self->get_name(); }, "The name of this disk");
+  py::enum_<simgrid::s4u::Disk::SharingPolicy>(disk, "SharingPolicy")
+      .value("NONLINEAR", simgrid::s4u::Disk::SharingPolicy::NONLINEAR)
+      .value("LINEAR", simgrid::s4u::Disk::SharingPolicy::LINEAR)
+      .export_values();
+  py::enum_<simgrid::s4u::Disk::Operation>(disk, "Operation")
+      .value("READ", simgrid::s4u::Disk::Operation::READ)
+      .value("WRITE", simgrid::s4u::Disk::Operation::WRITE)
+      .value("READWRITE", simgrid::s4u::Disk::Operation::READWRITE)
+      .export_values();
 
   /* Class NetPoint */
   py::class_<simgrid::kernel::routing::NetPoint, std::unique_ptr<simgrid::kernel::routing::NetPoint, py::nodelete>>(
@@ -343,19 +356,19 @@ PYBIND11_MODULE(simgrid, m)
 
   /* Class Link */
   py::class_<simgrid::s4u::Link, std::unique_ptr<simgrid::s4u::Link, py::nodelete>> link(m, "Link", "Network link");
-  link.def("set_latency", py::overload_cast<const std::string&>(&simgrid::s4u::Link::set_latency), "Set the latency");
-  link.def("set_latency", py::overload_cast<double>(&simgrid::s4u::Link::set_latency), "Set the latency");
-  link.def("set_sharing_policy", &simgrid::s4u::Link::set_sharing_policy, "Set sharing policy for this link");
-  link.def("set_concurrency_limit", &simgrid::s4u::Link::set_concurrency_limit, "Set concurrency limit for this link");
-  link.def("set_host_wifi_rate", &simgrid::s4u::Link::set_host_wifi_rate,
-           "Set level of communication speed of given host on this Wi-Fi link");
-  link.def("seal", &simgrid::s4u::Link::seal, "Seal this link");
-  link.def_property_readonly(
-      "name",
-      [](const simgrid::s4u::Link* self) {
-        return std::string(self->get_name().c_str()); // Convert from xbt::string because of MC
-      },
-      "The name of this link");
+  link.def("set_latency", py::overload_cast<const std::string&>(&simgrid::s4u::Link::set_latency), "Set the latency")
+      .def("set_latency", py::overload_cast<double>(&simgrid::s4u::Link::set_latency), "Set the latency")
+      .def("set_sharing_policy", &simgrid::s4u::Link::set_sharing_policy, "Set sharing policy for this link")
+      .def("set_concurrency_limit", &simgrid::s4u::Link::set_concurrency_limit, "Set concurrency limit for this link")
+      .def("set_host_wifi_rate", &simgrid::s4u::Link::set_host_wifi_rate,
+           "Set level of communication speed of given host on this Wi-Fi link")
+      .def("seal", &simgrid::s4u::Link::seal, "Seal this link")
+      .def_property_readonly(
+          "name",
+          [](const simgrid::s4u::Link* self) {
+            return std::string(self->get_name().c_str()); // Convert from xbt::string because of MC
+          },
+          "The name of this link");
   py::enum_<simgrid::s4u::Link::SharingPolicy>(link, "SharingPolicy")
       .value("NONLINEAR", simgrid::s4u::Link::SharingPolicy::NONLINEAR)
       .value("WIFI", simgrid::s4u::Link::SharingPolicy::WIFI)
@@ -450,6 +463,17 @@ PYBIND11_MODULE(simgrid, m)
           py::call_guard<GilScopedRelease>(),
           "Block until the completion of any communication in the list and return the index of the terminated one.");
 
+  /* Class Io */
+  py::class_<simgrid::s4u::Io, simgrid::s4u::IoPtr>(m, "Io", "I/O activities")
+      .def("test", &simgrid::s4u::Io::test, py::call_guard<GilScopedRelease>(), "Test whether the I/O is terminated.")
+      .def("wait", &simgrid::s4u::Io::wait, py::call_guard<GilScopedRelease>(),
+           "Block until the completion of that I/O operation")
+      .def_static(
+          "wait_any_for", &simgrid::s4u::Io::wait_any_for, py::call_guard<GilScopedRelease>(),
+          "Block until the completion of any I/O in the list (or timeout) and return the index of the terminated one.")
+      .def_static("wait_any", &simgrid::s4u::Io::wait_any, py::call_guard<GilScopedRelease>(),
+                  "Block until the completion of any I/O in the list and return the index of the terminated one.");
+
   /* Class Exec */
   py::class_<simgrid::s4u::Exec, simgrid::s4u::ExecPtr>(m, "Exec", "Execution")
       .def_property_readonly(