Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Add ptasks in the Python bindings
author Tom Cornebize <tom.cornebize@intel.com>
Wed, 29 Jun 2022 13:24:48 +0000 (15:24 +0200)
committer Tom Cornebize <tom.cornebize@intel.com>
Wed, 29 Jun 2022 14:18:02 +0000 (16:18 +0200)
- This adds several functions to the Python bindings for manipulating
  ptasks, such as parallel_execute or exec_init.
- This also adds the Engine.set_config function, needed to change the
  configuration to 'host/model:ptask_L07' from within the script.
- Finally, an exec-ptask.py example is added, an exact translation of the
  C++ example s4u-exec-ptask.cpp.

ChangeLog
examples/python/exec-ptask/exec-ptask.py [new file with mode: 0644]
examples/python/exec-ptask/exec-ptask.tesh [new file with mode: 0644]
src/bindings/python/simgrid_python.cpp

index 6503665..3dc42c3 100644 (file)
--- a/ChangeLog
+++ b/ChangeLog
@@ -19,7 +19,13 @@ Python:
     - Engine:
       - Engine.host_by_name [example: examples/python/comm-host2host/]
       - Engine.mailbox_by_name_or_create [example: examples/python/comm-pingpong/]
+      - Engine.set_config
     - Mailbox: Mailbox.ready [example: examples/python/comm-ready/]
+    - Ptask [example: examples/python/exec-ptask/]:
+      - this_actor.exec_init
+      - this_actor.parallel_execute
+      - Exec.suspend
+      - Exec.wait_for
   - Added an AssertionError exception that may be thrown in case of error. For instance, creating two hosts with the
     same name will now throw this exception instead of killing the interpreter.
 
diff --git a/examples/python/exec-ptask/exec-ptask.py b/examples/python/exec-ptask/exec-ptask.py
new file mode 100644 (file)
index 0000000..d5ddb6a
--- /dev/null
@@ -0,0 +1,114 @@
+# Copyright (c) 2018-2022. The SimGrid Team. All rights reserved.
+#
+# This program is free software; you can redistribute it and/or modify it
+# under the terms of the license (GNU LGPL) which comes with this package.
+
+
+# This script does exactly the same thing as file s4u-exec-ptask.cpp
+
+import sys
+from simgrid import Actor, Engine, Host, this_actor, TimeoutException
+
+
+def runner():
+    """Exercise parallel tasks (ptasks) on the hosts of the loaded platform.
+
+    Tests 3, 5 and 6 use hard-coded vectors sized for exactly 3 hosts (3 flop amounts,
+    3x3 communication matrices), so the platform is expected to declare 3 hosts.
+    Communication matrices are linearized row by row: entry [i * n + j] holds the
+    bytes sent from host i to host j, n being the number of hosts.
+    """
+    hosts = Engine.instance.all_hosts
+    hosts_count = len(hosts)
+
+    # Test 1
+    this_actor.info("First, build a classical parallel activity, with 1 Gflop to execute on each node, "
+                    "and 10MB to exchange between each pair")
+    computation_amounts = [1e9] * hosts_count
+    communication_amounts = [0] * hosts_count * hosts_count
+    for i in range(hosts_count):
+        for j in range(i + 1, hosts_count):
+            communication_amounts[i * hosts_count + j] = 1e7
+    this_actor.parallel_execute(hosts, computation_amounts, communication_amounts)
+
+    # Test 2
+    this_actor.info("We can do the same with a timeout of 10 seconds enabled.")
+    activity = this_actor.exec_init(hosts, computation_amounts, communication_amounts)
+    try:
+        activity.wait_for(10.0)
+    except TimeoutException:
+        this_actor.info("Caught the expected timeout exception.")
+        activity.cancel()
+    else:
+        sys.exit("Woops, this did not timeout as expected... Please report that bug.")
+
+    # Test 3
+    this_actor.info("Then, build a parallel activity involving only computations (of different amounts) and no communication")
+    computation_amounts = [3e8, 6e8, 1e9]
+    communication_amounts = []
+    this_actor.parallel_execute(hosts, computation_amounts, communication_amounts)
+
+    # Test 4
+    this_actor.info("Then, build a parallel activity with no computation nor communication (synchro only)")
+    computation_amounts = []
+    this_actor.parallel_execute(hosts, computation_amounts, communication_amounts)
+
+    # Test 5
+    this_actor.info("Then, Monitor the execution of a parallel activity")
+    computation_amounts = [1e6] * hosts_count
+    communication_amounts = [0, 1e6, 0, 0, 0, 1e6, 1e6, 0, 0]
+    activity = this_actor.exec_init(hosts, computation_amounts, communication_amounts)
+    activity.start()
+    while not activity.test():
+        ratio = activity.remaining_ratio * 100
+        this_actor.info(f"Remaining flop ratio: {ratio:.0f}%")
+        this_actor.sleep_for(5)
+    activity.wait()
+
+    # Test 6
+    this_actor.info("Finally, simulate a malleable task (a parallel execution that gets reconfigured after its start).")
+    this_actor.info("  - Start a regular parallel execution, with both comm and computation")
+    computation_amounts = [1e6] * hosts_count
+    communication_amounts = [0, 1e6, 0, 0, 1e6, 0, 1e6, 0, 0]
+    activity = this_actor.exec_init(hosts, computation_amounts, communication_amounts)
+    activity.start()
+    this_actor.sleep_for(10)
+    remaining_ratio = activity.remaining_ratio
+    this_actor.info(f"  - After 10 seconds, {remaining_ratio*100:.2f}% remains to be done. Change it from 3 hosts to 2 hosts only.")
+    this_actor.info("    Let's first suspend the task.")
+    activity.suspend()
+    this_actor.info("  - Now, simulate the reconfiguration (modeled as a comm from the removed host to the remaining ones).")
+    rescheduling_comp = [0, 0, 0]
+    # Only row 2 is non-zero: the removed host (index 2) sends to hosts 0 and 1
+    rescheduling_comm = [0, 0, 0, 0, 0, 0, 25000, 25000, 0]
+    this_actor.parallel_execute(hosts, rescheduling_comp, rescheduling_comm)
+    this_actor.info("  - Now, let's cancel the old task and create a new task with modified comm and computation vectors:")
+    this_actor.info("    What was already done is removed, and the load of the removed host is shared between remaining ones.")
+    for i in range(2):
+        # remove what we've done so far, for both comm and compute load
+        computation_amounts[i] *= remaining_ratio
+        communication_amounts[i] *= remaining_ratio
+        # The work from 1 must be shared between 2 remaining ones. 1/2=50% of extra work for each
+        computation_amounts[i] *= 1.5
+        communication_amounts[i] *= 1.5
+    hosts = hosts[:2]
+    computation_amounts = computation_amounts[:2]
+    remaining_comm = communication_amounts[1]
+    # Rebuild a 2x2 matrix where hosts 0 and 1 each send remaining_comm to the other
+    communication_amounts = [0, remaining_comm, remaining_comm, 0]
+    activity.cancel()
+    activity = this_actor.exec_init(hosts, computation_amounts, communication_amounts)
+    this_actor.info("  - Done, let's wait for the task completion")
+    activity.wait()
+    this_actor.info("Goodbye now!")
+
+
+if __name__ == "__main__":
+    if len(sys.argv) != 2:
+        sys.exit(f"Syntax: {sys.argv[0]} <platform_file>")
+    platform = sys.argv[1]
+    engine = Engine.instance
+    Engine.set_config("host/model:ptask_L07")  # /!\ this is required for running ptasks
+    engine.load_platform(platform)
+    Actor.create("foo", engine.host_by_name("MyHost1"), runner)
+    engine.run()
diff --git a/examples/python/exec-ptask/exec-ptask.tesh b/examples/python/exec-ptask/exec-ptask.tesh
new file mode 100644 (file)
index 0000000..0c1ce91
--- /dev/null
@@ -0,0 +1,27 @@
+#!/usr/bin/env tesh
+
+$ ${pythoncmd:=python3} ${PYTHON_TOOL_OPTIONS:=} ${srcdir}/exec-ptask.py ${platfdir}/energy_platform.xml
+> [0.000000] [xbt_cfg/INFO] Configuration change: Set 'contexts/factory' to 'thread'
+> [0.000000] [xbt_cfg/INFO] Configuration change: Set 'host/model' to 'ptask_L07'
+> [0.000000] [xbt_cfg/INFO] Switching to the L07 model to handle parallel tasks.
+> [MyHost1:foo:(1) 0.000000] [python/INFO] First, build a classical parallel activity, with 1 Gflop to execute on each node, and 10MB to exchange between each pair
+> [MyHost1:foo:(1) 300.000000] [python/INFO] We can do the same with a timeout of 10 seconds enabled.
+> [MyHost1:foo:(1) 310.000000] [python/INFO] Caught the expected timeout exception.
+> [MyHost1:foo:(1) 310.000000] [python/INFO] Then, build a parallel activity involving only computations (of different amounts) and no communication
+> [MyHost1:foo:(1) 320.000000] [python/INFO] Then, build a parallel activity with no computation nor communication (synchro only)
+> [MyHost1:foo:(1) 320.000000] [python/INFO] Then, Monitor the execution of a parallel activity
+> [MyHost1:foo:(1) 320.000000] [python/INFO] Remaining flop ratio: 100%
+> [MyHost1:foo:(1) 325.000000] [python/INFO] Remaining flop ratio: 83%
+> [MyHost1:foo:(1) 330.000000] [python/INFO] Remaining flop ratio: 67%
+> [MyHost1:foo:(1) 335.000000] [python/INFO] Remaining flop ratio: 50%
+> [MyHost1:foo:(1) 340.000000] [python/INFO] Remaining flop ratio: 33%
+> [MyHost1:foo:(1) 345.000000] [python/INFO] Remaining flop ratio: 17%
+> [MyHost1:foo:(1) 350.000000] [python/INFO] Finally, simulate a malleable task (a parallel execution that gets reconfigured after its start).
+> [MyHost1:foo:(1) 350.000000] [python/INFO]   - Start a regular parallel execution, with both comm and computation
+> [MyHost1:foo:(1) 360.000000] [python/INFO]   - After 10 seconds, 50.00% remains to be done. Change it from 3 hosts to 2 hosts only.
+> [MyHost1:foo:(1) 360.000000] [python/INFO]     Let's first suspend the task.
+> [MyHost1:foo:(1) 360.000000] [python/INFO]   - Now, simulate the reconfiguration (modeled as a comm from the removed host to the remaining ones).
+> [MyHost1:foo:(1) 360.500000] [python/INFO]   - Now, let's cancel the old task and create a new task with modified comm and computation vectors:
+> [MyHost1:foo:(1) 360.500000] [python/INFO]     What was already done is removed, and the load of the removed host is shared between remaining ones.
+> [MyHost1:foo:(1) 360.500000] [python/INFO]   - Done, let's wait for the task completion
+> [MyHost1:foo:(1) 375.500000] [python/INFO] Goodbye now!
index 256129e..e1c42ed 100644 (file)
@@ -123,6 +123,14 @@ PYBIND11_MODULE(simgrid, m)
            py::call_guard<py::gil_scoped_release>())
       .def("exec_async", py::overload_cast<double>(&simgrid::s4u::this_actor::exec_async),
            py::call_guard<py::gil_scoped_release>())
+      .def("parallel_execute", &simgrid::s4u::this_actor::parallel_execute, py::call_guard<py::gil_scoped_release>(),
+           py::arg("hosts"), py::arg("flops_amounts"), py::arg("bytes_amounts"),
+           "Run a parallel task and block until it completes (requires the 'ptask_L07' model)")
+      .def("exec_init",
+           py::overload_cast<const std::vector<simgrid::s4u::Host*>&, const std::vector<double>&,
+                             const std::vector<double>&>(&simgrid::s4u::this_actor::exec_init),
+           py::call_guard<py::gil_scoped_release>(), py::arg("hosts"), py::arg("flops_amounts"),
+           py::arg("bytes_amounts"), "Initiate a parallel task (requires the 'ptask_L07' model)")
       .def("get_host", &simgrid::s4u::this_actor::get_host, "Retrieves host on which the current actor is located")
       .def("set_host", &simgrid::s4u::this_actor::set_host, py::call_guard<py::gil_scoped_release>(),
            "Moves the current actor to another host.", py::arg("dest"))
@@ -219,6 +227,8 @@ PYBIND11_MODULE(simgrid, m)
                              "Retrieve the root netzone, containing all others.")
       .def("netpoint_by_name", &Engine::netpoint_by_name_or_null)
       .def("netzone_by_name", &Engine::netzone_by_name_or_null)
+      .def_static("set_config", py::overload_cast<const std::string&>(&Engine::set_config),
+                  "Change one of SimGrid's configurations, given as a 'name:value' string")
       .def("load_platform", &Engine::load_platform, "Load a platform file describing the environment")
       .def("load_deployment", &Engine::load_deployment, "Load a deployment file and launch the actors that it contains")
       .def("mailbox_by_name_or_create", &Engine::mailbox_by_name_or_create,
@@ -840,8 +850,12 @@ PYBIND11_MODULE(simgrid, m)
            "Test whether the execution is terminated.")
       .def("cancel", &simgrid::s4u::Exec::cancel, py::call_guard<py::gil_scoped_release>(), "Cancel that execution.")
       .def("start", &simgrid::s4u::Exec::start, py::call_guard<py::gil_scoped_release>(), "Start that execution.")
+      .def("suspend", &simgrid::s4u::Exec::suspend, py::call_guard<py::gil_scoped_release>(), "Suspend that execution.")
       .def("wait", &simgrid::s4u::Exec::wait, py::call_guard<py::gil_scoped_release>(),
-           "Block until the completion of that execution.");
+           "Block until the completion of that execution.")
+      .def("wait_for", &simgrid::s4u::Exec::wait_for, py::call_guard<py::gil_scoped_release>(),
+           py::arg("timeout"),
+           "Block until the completion of that execution, or raise TimeoutException after the specified timeout.");
 
   /* Class Semaphore */
   py::class_<Semaphore, SemaphorePtr>(m, "Semaphore",