--- /dev/null
+# Copyright (c) 2018-2022. The SimGrid Team. All rights reserved.
+#
+# This program is free software; you can redistribute it and/or modify it
+# under the terms of the license (GNU LGPL) which comes with this package.
+
+
+# This script does exactly the same thing as file s4u-exec-ptask.cpp
+
+import sys
+from simgrid import Actor, Engine, Host, this_actor, TimeoutException
+
+def runner():
+ hosts = Engine.instance.all_hosts
+ hosts_count = len(hosts)
+
+ # Test 1
+ this_actor.info("First, build a classical parallel activity, with 1 Gflop to execute on each node, "
+ "and 10MB to exchange between each pair")
+ computation_amounts = [1e9]*hosts_count
+ communication_amounts = [0]*hosts_count*hosts_count
+ for i in range(hosts_count):
+ for j in range(i+1, hosts_count):
+ communication_amounts[i * hosts_count + j] = 1e7
+ this_actor.parallel_execute(hosts, computation_amounts, communication_amounts)
+
+ # Test 2
+ this_actor.info("We can do the same with a timeout of 10 seconds enabled.")
+ activity = this_actor.exec_init(hosts, computation_amounts, communication_amounts)
+ try:
+ activity.wait_for(10.0)
+ sys.exit("Woops, this did not timeout as expected... Please report that bug.")
+ except TimeoutException:
+ this_actor.info("Caught the expected timeout exception.")
+ activity.cancel()
+
+ # Test 3
+ this_actor.info("Then, build a parallel activity involving only computations (of different amounts) and no communication")
+ computation_amounts = [3e8, 6e8, 1e9]
+ communication_amounts = []
+ this_actor.parallel_execute(hosts, computation_amounts, communication_amounts)
+
+ # Test 4
+ this_actor.info("Then, build a parallel activity with no computation nor communication (synchro only)")
+ computation_amounts = []
+ this_actor.parallel_execute(hosts, computation_amounts, communication_amounts)
+
+ # Test 5
+ this_actor.info("Then, Monitor the execution of a parallel activity")
+ computation_amounts = [1e6]*hosts_count
+ communication_amounts = [0, 1e6, 0, 0, 0, 1e6, 1e6, 0, 0]
+ activity = this_actor.exec_init(hosts, computation_amounts, communication_amounts)
+ activity.start()
+ while not activity.test():
+ ratio = activity.remaining_ratio * 100
+ this_actor.info(f"Remaining flop ratio: {ratio:.0f}%")
+ this_actor.sleep_for(5)
+ activity.wait()
+
+ # Test 6
+ this_actor.info("Finally, simulate a malleable task (a parallel execution that gets reconfigured after its start).")
+ this_actor.info(" - Start a regular parallel execution, with both comm and computation")
+ computation_amounts = [1e6]*hosts_count
+ communication_amounts = [0, 1e6, 0, 0, 1e6, 0, 1e6, 0, 0]
+ activity = this_actor.exec_init(hosts, computation_amounts, communication_amounts)
+ activity.start()
+ this_actor.sleep_for(10)
+ remaining_ratio = activity.remaining_ratio
+ this_actor.info(f" - After 10 seconds, {remaining_ratio*100:.2f}% remains to be done. Change it from 3 hosts to 2 hosts only.")
+ this_actor.info(" Let's first suspend the task.")
+ activity.suspend()
+ this_actor.info(" - Now, simulate the reconfiguration (modeled as a comm from the removed host to the remaining ones).")
+ rescheduling_comp = [0, 0, 0]
+ rescheduling_comm = [0, 0, 0, 0, 0, 0, 25000, 25000, 0]
+ this_actor.parallel_execute(hosts, rescheduling_comp, rescheduling_comm)
+ this_actor.info(" - Now, let's cancel the old task and create a new task with modified comm and computation vectors:")
+ this_actor.info(" What was already done is removed, and the load of the removed host is shared between remaining ones.")
+ for i in range(2):
+ # remove what we've done so far, for both comm and compute load
+ computation_amounts[i] *= remaining_ratio
+ communication_amounts[i] *= remaining_ratio
+ # The work from 1 must be shared between 2 remaining ones. 1/2=50% of extra work for each
+ computation_amounts[i] *= 1.5;
+ communication_amounts[i] *= 1.5;
+ hosts = hosts[:2]
+ computation_amounts = computation_amounts[:2]
+ remaining_comm = communication_amounts[1]
+ communication_amounts = [0, remaining_comm, remaining_comm, 0]
+ activity.cancel()
+ activity = this_actor.exec_init(hosts, computation_amounts, communication_amounts)
+ this_actor.info(" - Done, let's wait for the task completion")
+ activity.wait()
+ this_actor.info("Goodbye now!")
+
+
+if __name__ == "__main__":
+ if len(sys.argv) != 2:
+ sys.exit(f"Syntax: {sys.argv[0]} <platform_file>")
+ platform = sys.argv[1]
+ engine = Engine.instance
+ Engine.set_config("host/model:ptask_L07") # /!\ this is required for running ptasks
+ engine.load_platform(platform)
+ Actor.create("foo", engine.host_by_name("MyHost1"), runner)
+ engine.run()
--- /dev/null
+#!/usr/bin/env tesh
+
+$ ${pythoncmd:=python3} ${PYTHON_TOOL_OPTIONS:=} ${srcdir}/exec-ptask.py ${platfdir}/energy_platform.xml
+> [0.000000] [xbt_cfg/INFO] Configuration change: Set 'contexts/factory' to 'thread'
+> [0.000000] [xbt_cfg/INFO] Configuration change: Set 'host/model' to 'ptask_L07'
+> [0.000000] [xbt_cfg/INFO] Switching to the L07 model to handle parallel tasks.
+> [MyHost1:foo:(1) 0.000000] [python/INFO] First, build a classical parallel activity, with 1 Gflop to execute on each node, and 10MB to exchange between each pair
+> [MyHost1:foo:(1) 300.000000] [python/INFO] We can do the same with a timeout of 10 seconds enabled.
+> [MyHost1:foo:(1) 310.000000] [python/INFO] Caught the expected timeout exception.
+> [MyHost1:foo:(1) 310.000000] [python/INFO] Then, build a parallel activity involving only computations (of different amounts) and no communication
+> [MyHost1:foo:(1) 320.000000] [python/INFO] Then, build a parallel activity with no computation nor communication (synchro only)
+> [MyHost1:foo:(1) 320.000000] [python/INFO] Then, Monitor the execution of a parallel activity
+> [MyHost1:foo:(1) 320.000000] [python/INFO] Remaining flop ratio: 100%
+> [MyHost1:foo:(1) 325.000000] [python/INFO] Remaining flop ratio: 83%
+> [MyHost1:foo:(1) 330.000000] [python/INFO] Remaining flop ratio: 67%
+> [MyHost1:foo:(1) 335.000000] [python/INFO] Remaining flop ratio: 50%
+> [MyHost1:foo:(1) 340.000000] [python/INFO] Remaining flop ratio: 33%
+> [MyHost1:foo:(1) 345.000000] [python/INFO] Remaining flop ratio: 17%
+> [MyHost1:foo:(1) 350.000000] [python/INFO] Finally, simulate a malleable task (a parallel execution that gets reconfigured after its start).
+> [MyHost1:foo:(1) 350.000000] [python/INFO] - Start a regular parallel execution, with both comm and computation
+> [MyHost1:foo:(1) 360.000000] [python/INFO] - After 10 seconds, 50.00% remains to be done. Change it from 3 hosts to 2 hosts only.
+> [MyHost1:foo:(1) 360.000000] [python/INFO] Let's first suspend the task.
+> [MyHost1:foo:(1) 360.000000] [python/INFO] - Now, simulate the reconfiguration (modeled as a comm from the removed host to the remaining ones).
+> [MyHost1:foo:(1) 360.500000] [python/INFO] - Now, let's cancel the old task and create a new task with modified comm and computation vectors:
+> [MyHost1:foo:(1) 360.500000] [python/INFO] What was already done is removed, and the load of the removed host is shared between remaining ones.
+> [MyHost1:foo:(1) 360.500000] [python/INFO] - Done, let's wait for the task completion
+> [MyHost1:foo:(1) 375.500000] [python/INFO] Goodbye now!
py::call_guard<py::gil_scoped_release>())
.def("exec_async", py::overload_cast<double>(&simgrid::s4u::this_actor::exec_async),
py::call_guard<py::gil_scoped_release>())
+ .def("parallel_execute", &simgrid::s4u::this_actor::parallel_execute,
+ py::call_guard<py::gil_scoped_release>(),
+ "Run a parallel task (requires the 'ptask_L07' model)")
+ .def("exec_init",
+ py::overload_cast<const std::vector<simgrid::s4u::Host*>&, const std::vector<double>&,
+ const std::vector<double>&> (&simgrid::s4u::this_actor::exec_init),
+ py::call_guard<py::gil_scoped_release>(),
+ "Initiate a parallel task (requires the 'ptask_L07' model)")
.def("get_host", &simgrid::s4u::this_actor::get_host, "Retrieves host on which the current actor is located")
.def("set_host", &simgrid::s4u::this_actor::set_host, py::call_guard<py::gil_scoped_release>(),
"Moves the current actor to another host.", py::arg("dest"))
"Retrieve the root netzone, containing all others.")
.def("netpoint_by_name", &Engine::netpoint_by_name_or_null)
.def("netzone_by_name", &Engine::netzone_by_name_or_null)
+ .def_static("set_config", py::overload_cast<const std::string&>(&Engine::set_config), // static in C++: callable as Engine.set_config(...) or on an instance
+ "Change one of SimGrid's configurations")
.def("load_platform", &Engine::load_platform, "Load a platform file describing the environment")
.def("load_deployment", &Engine::load_deployment, "Load a deployment file and launch the actors that it contains")
.def("mailbox_by_name_or_create", &Engine::mailbox_by_name_or_create,
"Test whether the execution is terminated.")
.def("cancel", &simgrid::s4u::Exec::cancel, py::call_guard<py::gil_scoped_release>(), "Cancel that execution.")
.def("start", &simgrid::s4u::Exec::start, py::call_guard<py::gil_scoped_release>(), "Start that execution.")
+ .def("suspend", &simgrid::s4u::Exec::suspend, py::call_guard<py::gil_scoped_release>(), "Suspend that execution.")
.def("wait", &simgrid::s4u::Exec::wait, py::call_guard<py::gil_scoped_release>(),
- "Block until the completion of that execution.");
+ "Block until the completion of that execution.")
+ .def("wait_for", &simgrid::s4u::Exec::wait_for, py::call_guard<py::gil_scoped_release>(),
+ py::arg("timeout"),
+ "Block until the completion of that activity, or raises TimeoutException after the specified timeout.");
/* Class Semaphore */
py::class_<Semaphore, SemaphorePtr>(m, "Semaphore",