From: Martin Quinson Date: Sat, 15 Jul 2023 09:12:17 +0000 (+0000) Subject: Merge branch 'fix-comm-signal' into 'master' X-Git-Tag: v3.35~146 X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/commitdiff_plain/6c16c61daeab4491da3abfba8e1b657308f8dfd4?hp=37e14b779a6605fe42a0e0caae0b3303783f8acd Merge branch 'fix-comm-signal' into 'master' update comm status BEFORE sending signals See merge request simgrid/simgrid!167 --- diff --git a/ChangeLog b/ChangeLog index 603816a154..6e5f715a92 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,5 +1,8 @@ SimGrid (3.34.1) not released (Target: fall 2023) +S4U: + - New class ActivitySet to ease wait_any()/test_any()/wait_all() + Python: - Make the host_load plugin available from Python. See examples/python/plugin-host-load diff --git a/MANIFEST.in b/MANIFEST.in index f6767fe96c..3896bde51f 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -120,10 +120,14 @@ include examples/c/plugin-host-load/plugin-host-load.c include examples/c/plugin-host-load/plugin-host-load.tesh include examples/c/synchro-semaphore/synchro-semaphore.c include examples/c/synchro-semaphore/synchro-semaphore.tesh -include examples/cpp/activity-testany/s4u-activity-testany.cpp -include examples/cpp/activity-testany/s4u-activity-testany.tesh -include examples/cpp/activity-waitany/s4u-activity-waitany.cpp -include examples/cpp/activity-waitany/s4u-activity-waitany.tesh +include examples/cpp/activityset-testany/s4u-activityset-testany.cpp +include examples/cpp/activityset-testany/s4u-activityset-testany.tesh +include examples/cpp/activityset-waitall/s4u-activityset-waitall.cpp +include examples/cpp/activityset-waitall/s4u-activityset-waitall.tesh +include examples/cpp/activityset-waitallfor/s4u-activityset-waitallfor.cpp +include examples/cpp/activityset-waitallfor/s4u-activityset-waitallfor.tesh +include examples/cpp/activityset-waitany/s4u-activityset-waitany.cpp +include examples/cpp/activityset-waitany/s4u-activityset-waitany.tesh include 
examples/cpp/actor-create/s4u-actor-create.cpp include examples/cpp/actor-create/s4u-actor-create.tesh include examples/cpp/actor-create/s4u-actor-create_d.xml @@ -190,16 +194,10 @@ include examples/cpp/comm-ready/s4u-comm-ready.cpp include examples/cpp/comm-ready/s4u-comm-ready.tesh include examples/cpp/comm-suspend/s4u-comm-suspend.cpp include examples/cpp/comm-suspend/s4u-comm-suspend.tesh -include examples/cpp/comm-testany/s4u-comm-testany.cpp -include examples/cpp/comm-testany/s4u-comm-testany.tesh include examples/cpp/comm-throttling/s4u-comm-throttling.cpp include examples/cpp/comm-throttling/s4u-comm-throttling.tesh include examples/cpp/comm-wait/s4u-comm-wait.cpp include examples/cpp/comm-wait/s4u-comm-wait.tesh -include examples/cpp/comm-waitall/s4u-comm-waitall.cpp -include examples/cpp/comm-waitall/s4u-comm-waitall.tesh -include examples/cpp/comm-waitany/s4u-comm-waitany.cpp -include examples/cpp/comm-waitany/s4u-comm-waitany.tesh include examples/cpp/comm-waituntil/s4u-comm-waituntil.cpp include examples/cpp/comm-waituntil/s4u-comm-waituntil.tesh include examples/cpp/dag-comm/s4u-dag-comm.cpp @@ -291,8 +289,6 @@ include examples/cpp/exec-threads/s4u-exec-threads.cpp include examples/cpp/exec-threads/s4u-exec-threads.tesh include examples/cpp/exec-unassigned/s4u-exec-unassigned.cpp include examples/cpp/exec-unassigned/s4u-exec-unassigned.tesh -include examples/cpp/exec-waitany/s4u-exec-waitany.cpp -include examples/cpp/exec-waitany/s4u-exec-waitany.tesh include examples/cpp/exec-waitfor/s4u-exec-waitfor.cpp include examples/cpp/exec-waitfor/s4u-exec-waitfor.tesh include examples/cpp/io-async/s4u-io-async.cpp @@ -1951,6 +1947,7 @@ include include/simgrid/plugins/ns3.hpp include include/simgrid/plugins/photovoltaic.hpp include include/simgrid/s4u.hpp include include/simgrid/s4u/Activity.hpp +include include/simgrid/s4u/ActivitySet.hpp include include/simgrid/s4u/Actor.hpp include include/simgrid/s4u/Barrier.hpp include include/simgrid/s4u/Comm.hpp @@ 
-2328,6 +2325,7 @@ include src/plugins/vm/VmLiveMigration.cpp include src/plugins/vm/VmLiveMigration.hpp include src/plugins/vm/dirty_page_tracking.cpp include src/s4u/s4u_Activity.cpp +include src/s4u/s4u_ActivitySet.cpp include src/s4u/s4u_Actor.cpp include src/s4u/s4u_Barrier.cpp include src/s4u/s4u_Comm.cpp diff --git a/examples/cpp/CMakeLists.txt b/examples/cpp/CMakeLists.txt index 66187a8b84..e1e53908e1 100644 --- a/examples/cpp/CMakeLists.txt +++ b/examples/cpp/CMakeLists.txt @@ -153,19 +153,19 @@ endif() # Deal with each example -foreach (example activity-testany activity-waitany +foreach (example activityset-testany activityset-waitany activityset-waitall activityset-waitallfor actor-create actor-daemon actor-exiting actor-join actor-kill actor-lifetime actor-migrate actor-suspend actor-yield actor-stacksize app-bittorrent app-chainsend app-token-ring battery-degradation battery-simple battery-energy - comm-pingpong comm-ready comm-suspend comm-testany comm-wait comm-waitany comm-waitall comm-waituntil + comm-pingpong comm-ready comm-suspend comm-wait comm-waituntil comm-dependent comm-host2host comm-failure comm-throttling cloud-capping cloud-migration cloud-simple dag-comm dag-from-json-simple dag-from-dax-simple dag-from-dax dag-from-dot-simple dag-from-dot dag-failure dag-io dag-scheduling dag-simple dag-tuto dht-chord dht-kademlia energy-exec energy-boot energy-link energy-vm energy-exec-ptask energy-wifi engine-filtering engine-run-partial - exec-async exec-basic exec-dvfs exec-remote exec-waitany exec-waitfor exec-dependent exec-unassigned + exec-async exec-basic exec-dvfs exec-remote exec-waitfor exec-dependent exec-unassigned exec-ptask-multicore exec-ptask-multicore-latency exec-cpu-nonlinear exec-cpu-factors exec-failure exec-threads maestro-set mc-bugged1 mc-bugged1-liveness mc-bugged2 mc-bugged2-liveness mc-centralized-mutex mc-electric-fence mc-failing-assert diff --git a/examples/cpp/activity-testany/s4u-activity-testany.cpp 
b/examples/cpp/activityset-testany/s4u-activityset-testany.cpp similarity index 68% rename from examples/cpp/activity-testany/s4u-activity-testany.cpp rename to examples/cpp/activityset-testany/s4u-activityset-testany.cpp index 55b273ffd6..8128c24755 100644 --- a/examples/cpp/activity-testany/s4u-activity-testany.cpp +++ b/examples/cpp/activityset-testany/s4u-activityset-testany.cpp @@ -22,26 +22,25 @@ static void bob() auto comm = mbox->get_async(&payload); auto io = disk->read_async(3e8); - std::vector pending_activities = {boost::dynamic_pointer_cast(exec), - boost::dynamic_pointer_cast(comm), - boost::dynamic_pointer_cast(io)}; + sg4::ActivitySet pending_activities; + pending_activities.push(exec); + pending_activities.push(comm); + pending_activities.push(io); XBT_INFO("Sleep_for a while"); sg4::this_actor::sleep_for(1); XBT_INFO("Test for completed activities"); while (not pending_activities.empty()) { - ssize_t changed_pos = sg4::Activity::test_any(pending_activities); - if (changed_pos != -1) { - auto* completed_one = pending_activities[changed_pos].get(); - if (dynamic_cast(completed_one)) + auto completed_one = pending_activities.test_any(); + if (completed_one != nullptr) { + if (boost::dynamic_pointer_cast(completed_one)) XBT_INFO("Completed a Comm"); - if (dynamic_cast(completed_one)) + if (boost::dynamic_pointer_cast(completed_one)) XBT_INFO("Completed an Exec"); - if (dynamic_cast(completed_one)) + if (boost::dynamic_pointer_cast(completed_one)) XBT_INFO("Completed an I/O"); - pending_activities.erase(pending_activities.begin() + changed_pos); - } else { // nothing matches, wait for a little bit + } else { XBT_INFO("Nothing matches, test again in 0.5s"); sg4::this_actor::sleep_for(.5); } diff --git a/examples/cpp/activity-testany/s4u-activity-testany.tesh b/examples/cpp/activityset-testany/s4u-activityset-testany.tesh similarity index 88% rename from examples/cpp/activity-testany/s4u-activity-testany.tesh rename to 
examples/cpp/activityset-testany/s4u-activityset-testany.tesh index ab4d8208b0..c590b36890 100644 --- a/examples/cpp/activity-testany/s4u-activity-testany.tesh +++ b/examples/cpp/activityset-testany/s4u-activityset-testany.tesh @@ -1,6 +1,6 @@ #!/usr/bin/env tesh -$ ${bindir:=.}/s4u-activity-testany ${platfdir}/hosts_with_disks.xml "--log=root.fmt:[%4.2r]%e[%5a]%e%m%n" +$ ${bindir:=.}/s4u-activityset-testany ${platfdir}/hosts_with_disks.xml "--log=root.fmt:[%4.2r]%e[%5a]%e%m%n" > [0.00] [alice] Send 'Message' > [0.00] [ bob] Create my asynchronous activities > [0.00] [ bob] Sleep_for a while diff --git a/examples/cpp/activityset-waitall/s4u-activityset-waitall.cpp b/examples/cpp/activityset-waitall/s4u-activityset-waitall.cpp new file mode 100644 index 0000000000..176e50fcd6 --- /dev/null +++ b/examples/cpp/activityset-waitall/s4u-activityset-waitall.cpp @@ -0,0 +1,55 @@ +/* Copyright (c) 2010-2023. The SimGrid Team. All rights reserved. */ + +/* This program is free software; you can redistribute it and/or modify it + * under the terms of the license (GNU LGPL) which comes with this package. 
*/ + +#include "simgrid/s4u.hpp" +#include +#include +#include +namespace sg4 = simgrid::s4u; + +XBT_LOG_NEW_DEFAULT_CATEGORY(s4u_activity_waittany, "Messages specific for this s4u example"); + +static void bob() +{ + sg4::Mailbox* mbox = sg4::Mailbox::by_name("mbox"); + const sg4::Disk* disk = sg4::Host::current()->get_disks().front(); + std::string* payload; + + XBT_INFO("Create my asynchronous activities"); + auto exec = sg4::this_actor::exec_async(5e9); + auto comm = mbox->get_async(&payload); + auto io = disk->read_async(3e8); + + sg4::ActivitySet pending_activities({boost::dynamic_pointer_cast(exec), + boost::dynamic_pointer_cast(comm), + boost::dynamic_pointer_cast(io)}); + + XBT_INFO("Wait for asynchrounous activities to complete, all in one shot."); + pending_activities.wait_all(); + + XBT_INFO("All activities are completed."); + delete payload; +} + +static void alice() +{ + auto* payload = new std::string("Message"); + XBT_INFO("Send '%s'", payload->c_str()); + sg4::Mailbox::by_name("mbox")->put(payload, 6e8); +} + +int main(int argc, char* argv[]) +{ + sg4::Engine e(&argc, argv); + + e.load_platform(argv[1]); + + sg4::Actor::create("bob", e.host_by_name("bob"), bob); + sg4::Actor::create("alice", e.host_by_name("alice"), alice); + + e.run(); + + return 0; +} diff --git a/examples/cpp/activityset-waitall/s4u-activityset-waitall.tesh b/examples/cpp/activityset-waitall/s4u-activityset-waitall.tesh new file mode 100644 index 0000000000..6f0142951d --- /dev/null +++ b/examples/cpp/activityset-waitall/s4u-activityset-waitall.tesh @@ -0,0 +1,7 @@ +#!/usr/bin/env tesh + +$ ${bindir:=.}/s4u-activityset-waitall ${platfdir}/hosts_with_disks.xml "--log=root.fmt:[%7.6r]%e[%5a]%e%m%n" +> [0.000000] [alice] Send 'Message' +> [0.000000] [ bob] Create my asynchronous activities +> [0.000000] [ bob] Wait for asynchrounous activities to complete, all in one shot. +> [5.197828] [ bob] All activities are completed. 
diff --git a/examples/cpp/activity-waitany/s4u-activity-waitany.cpp b/examples/cpp/activityset-waitallfor/s4u-activityset-waitallfor.cpp similarity index 67% rename from examples/cpp/activity-waitany/s4u-activity-waitany.cpp rename to examples/cpp/activityset-waitallfor/s4u-activityset-waitallfor.cpp index c19c2e6d60..b1d405d1ef 100644 --- a/examples/cpp/activity-waitany/s4u-activity-waitany.cpp +++ b/examples/cpp/activityset-waitallfor/s4u-activityset-waitallfor.cpp @@ -22,22 +22,22 @@ static void bob() auto comm = mbox->get_async(&payload); auto io = disk->read_async(3e8); - std::vector pending_activities = {boost::dynamic_pointer_cast(exec), - boost::dynamic_pointer_cast(comm), - boost::dynamic_pointer_cast(io)}; + sg4::ActivitySet pending_activities({exec, comm, io}); XBT_INFO("Wait for asynchrounous activities to complete"); while (not pending_activities.empty()) { - ssize_t changed_pos = sg4::Activity::wait_any(pending_activities); - if (changed_pos != -1) { - auto* completed_one = pending_activities[changed_pos].get(); - if (dynamic_cast(completed_one)) + try { + pending_activities.wait_all_for(1); + } catch (simgrid::TimeoutException& e) { + XBT_INFO("Not all activities are terminated yet."); + } + while (auto completed_one = pending_activities.test_any()) { + if (boost::dynamic_pointer_cast(completed_one)) XBT_INFO("Completed a Comm"); - if (dynamic_cast(completed_one)) + if (boost::dynamic_pointer_cast(completed_one)) XBT_INFO("Completed an Exec"); - if (dynamic_cast(completed_one)) + if (boost::dynamic_pointer_cast(completed_one)) XBT_INFO("Completed an I/O"); - pending_activities.erase(pending_activities.begin() + changed_pos); } } XBT_INFO("Last activity is complete"); diff --git a/examples/cpp/activityset-waitallfor/s4u-activityset-waitallfor.tesh b/examples/cpp/activityset-waitallfor/s4u-activityset-waitallfor.tesh new file mode 100644 index 0000000000..d0c1016965 --- /dev/null +++ b/examples/cpp/activityset-waitallfor/s4u-activityset-waitallfor.tesh 
@@ -0,0 +1,15 @@ +#!/usr/bin/env tesh + +$ ${bindir:=.}/s4u-activityset-waitallfor ${platfdir}/hosts_with_disks.xml "--log=root.fmt:[%7.6r]%e[%5a]%e%m%n" +> [0.000000] [alice] Send 'Message' +> [0.000000] [ bob] Create my asynchronous activities +> [0.000000] [ bob] Wait for asynchrounous activities to complete +> [1.000000] [ bob] Not all activities are terminated yet. +> [2.000000] [ bob] Not all activities are terminated yet. +> [3.000000] [ bob] Not all activities are terminated yet. +> [3.000000] [ bob] Completed an I/O +> [4.000000] [ bob] Not all activities are terminated yet. +> [5.000000] [ bob] Not all activities are terminated yet. +> [5.000000] [ bob] Completed an Exec +> [5.197828] [ bob] Completed a Comm +> [5.197828] [ bob] Last activity is complete diff --git a/examples/cpp/activityset-waitany/s4u-activityset-waitany.cpp b/examples/cpp/activityset-waitany/s4u-activityset-waitany.cpp new file mode 100644 index 0000000000..1878a13043 --- /dev/null +++ b/examples/cpp/activityset-waitany/s4u-activityset-waitany.cpp @@ -0,0 +1,64 @@ +/* Copyright (c) 2010-2023. The SimGrid Team. All rights reserved. */ + +/* This program is free software; you can redistribute it and/or modify it + * under the terms of the license (GNU LGPL) which comes with this package. 
*/ + +#include "simgrid/s4u.hpp" +#include +#include +#include +namespace sg4 = simgrid::s4u; + +XBT_LOG_NEW_DEFAULT_CATEGORY(s4u_activity_waittany, "Messages specific for this s4u example"); + +static void bob() +{ + sg4::Mailbox* mbox = sg4::Mailbox::by_name("mbox"); + const sg4::Disk* disk = sg4::Host::current()->get_disks().front(); + std::string* payload; + + XBT_INFO("Create my asynchronous activities"); + auto exec = sg4::this_actor::exec_async(5e9); + auto comm = mbox->get_async(&payload); + auto io = disk->read_async(3e8); + + sg4::ActivitySet pending_activities({boost::dynamic_pointer_cast(exec), + boost::dynamic_pointer_cast(comm), + boost::dynamic_pointer_cast(io)}); + + XBT_INFO("Wait for asynchrounous activities to complete"); + while (not pending_activities.empty()) { + auto completed_one = pending_activities.wait_any(); + if (completed_one != nullptr) { + if (boost::dynamic_pointer_cast(completed_one)) + XBT_INFO("Completed a Comm"); + if (boost::dynamic_pointer_cast(completed_one)) + XBT_INFO("Completed an Exec"); + if (boost::dynamic_pointer_cast(completed_one)) + XBT_INFO("Completed an I/O"); + } + } + XBT_INFO("Last activity is complete"); + delete payload; +} + +static void alice() +{ + auto* payload = new std::string("Message"); + XBT_INFO("Send '%s'", payload->c_str()); + sg4::Mailbox::by_name("mbox")->put(payload, 6e8); +} + +int main(int argc, char* argv[]) +{ + sg4::Engine e(&argc, argv); + + e.load_platform(argv[1]); + + sg4::Actor::create("bob", e.host_by_name("bob"), bob); + sg4::Actor::create("alice", e.host_by_name("alice"), alice); + + e.run(); + + return 0; +} diff --git a/examples/cpp/activity-waitany/s4u-activity-waitany.tesh b/examples/cpp/activityset-waitany/s4u-activityset-waitany.tesh similarity index 75% rename from examples/cpp/activity-waitany/s4u-activity-waitany.tesh rename to examples/cpp/activityset-waitany/s4u-activityset-waitany.tesh index 7b15aa64cd..bc33cc46f1 100644 --- 
a/examples/cpp/activity-waitany/s4u-activity-waitany.tesh +++ b/examples/cpp/activityset-waitany/s4u-activityset-waitany.tesh @@ -1,6 +1,6 @@ #!/usr/bin/env tesh -$ ${bindir:=.}/s4u-activity-waitany ${platfdir}/hosts_with_disks.xml "--log=root.fmt:[%7.6r]%e[%5a]%e%m%n" +$ ${bindir:=.}/s4u-activityset-waitany ${platfdir}/hosts_with_disks.xml "--log=root.fmt:[%7.6r]%e[%5a]%e%m%n" > [0.000000] [alice] Send 'Message' > [0.000000] [ bob] Create my asynchronous activities > [0.000000] [ bob] Wait for asynchrounous activities to complete diff --git a/examples/cpp/comm-testany/s4u-comm-testany.cpp b/examples/cpp/comm-testany/s4u-comm-testany.cpp deleted file mode 100644 index f06cb1621b..0000000000 --- a/examples/cpp/comm-testany/s4u-comm-testany.cpp +++ /dev/null @@ -1,73 +0,0 @@ -/* Copyright (c) 2010-2023. The SimGrid Team. All rights reserved. */ - -/* This program is free software; you can redistribute it and/or modify it - * under the terms of the license (GNU LGPL) which comes with this package. 
*/ - -#include "simgrid/s4u.hpp" -#include -#include -#include -namespace sg4 = simgrid::s4u; - -XBT_LOG_NEW_DEFAULT_CATEGORY(s4u_comm_testany, "Messages specific for this s4u example"); - -static void rank0() -{ - sg4::Mailbox* mbox = sg4::Mailbox::by_name("rank0"); - std::string* msg1; - std::string* msg2; - std::string* msg3; - - XBT_INFO("Post my asynchronous receives"); - auto comm1 = mbox->get_async(&msg1); - auto comm2 = mbox->get_async(&msg2); - auto comm3 = mbox->get_async(&msg3); - std::vector pending_comms = {comm1, comm2, comm3}; - - XBT_INFO("Send some data to rank-1"); - for (int i = 0; i < 3; i++) - sg4::Mailbox::by_name("rank1")->put(new int(i), 1); - - XBT_INFO("Test for completed comms"); - while (not pending_comms.empty()) { - ssize_t flag = sg4::Comm::test_any(pending_comms); - if (flag != -1) { - pending_comms.erase(pending_comms.begin() + flag); - XBT_INFO("Remove a pending comm."); - } else // nothing matches, wait for a little bit - sg4::this_actor::sleep_for(0.1); - } - XBT_INFO("Last comm is complete"); - delete msg1; - delete msg2; - delete msg3; -} - -static void rank1() -{ - sg4::Mailbox* rank0_mbox = sg4::Mailbox::by_name("rank0"); - sg4::Mailbox* rank1_mbox = sg4::Mailbox::by_name("rank1"); - - for (int i = 0; i < 3; i++) { - auto res = rank1_mbox->get_unique(); - XBT_INFO("Received %d", *res); - std::string msg_content = "Message " + std::to_string(i); - auto* payload = new std::string(msg_content); - XBT_INFO("Send '%s'", msg_content.c_str()); - rank0_mbox->put(payload, 1e6); - } -} - -int main(int argc, char* argv[]) -{ - sg4::Engine e(&argc, argv); - - e.load_platform(argv[1]); - - sg4::Actor::create("rank-0", e.host_by_name("Tremblay"), rank0); - sg4::Actor::create("rank-1", e.host_by_name("Fafard"), rank1); - - e.run(); - - return 0; -} diff --git a/examples/cpp/comm-testany/s4u-comm-testany.tesh b/examples/cpp/comm-testany/s4u-comm-testany.tesh deleted file mode 100644 index 0f19916447..0000000000 --- 
a/examples/cpp/comm-testany/s4u-comm-testany.tesh +++ /dev/null @@ -1,16 +0,0 @@ -#!/usr/bin/env tesh - -$ ${bindir:=.}/s4u-comm-testany ${platfdir}/small_platform.xml "--log=root.fmt:[%10.6r]%e[%8h]%e[%a]%e%m%n" -> [ 0.000000] [Tremblay] [rank-0] Post my asynchronous receives -> [ 0.000000] [Tremblay] [rank-0] Send some data to rank-1 -> [ 0.025708] [ Fafard] [rank-1] Received 0 -> [ 0.025708] [ Fafard] [rank-1] Send 'Message 0' -> [ 0.209813] [ Fafard] [rank-1] Received 1 -> [ 0.209813] [ Fafard] [rank-1] Send 'Message 1' -> [ 0.393918] [Tremblay] [rank-0] Test for completed comms -> [ 0.393918] [ Fafard] [rank-1] Received 2 -> [ 0.393918] [ Fafard] [rank-1] Send 'Message 2' -> [ 0.393918] [Tremblay] [rank-0] Remove a pending comm. -> [ 0.393918] [Tremblay] [rank-0] Remove a pending comm. -> [ 0.593918] [Tremblay] [rank-0] Remove a pending comm. -> [ 0.593918] [Tremblay] [rank-0] Last comm is complete diff --git a/examples/cpp/comm-waitall/s4u-comm-waitall.cpp b/examples/cpp/comm-waitall/s4u-comm-waitall.cpp deleted file mode 100644 index 5c3f863626..0000000000 --- a/examples/cpp/comm-waitall/s4u-comm-waitall.cpp +++ /dev/null @@ -1,94 +0,0 @@ -/* Copyright (c) 2010-2023. The SimGrid Team. All rights reserved. */ - -/* This program is free software; you can redistribute it and/or modify it - * under the terms of the license (GNU LGPL) which comes with this package. */ - -/* This example shows how to block on the completion of a set of communications. - * - * As for the other asynchronous examples, the sender initiate all the messages it wants to send and - * pack the resulting simgrid::s4u::CommPtr objects in a vector. All messages thus occur concurrently. 
- * - * The sender then blocks until all ongoing communication terminate, using simgrid::s4u::Comm::wait_all() - * - */ - -#include "simgrid/s4u.hpp" -#include -#include -#include -namespace sg4 = simgrid::s4u; - -XBT_LOG_NEW_DEFAULT_CATEGORY(s4u_async_waitall, "Messages specific for this s4u example"); - -static void sender(unsigned int messages_count, unsigned int receivers_count, long msg_size) -{ - if (messages_count == 0 || receivers_count == 0) { - XBT_WARN("Sender has nothing to do. Bail out!"); - return; - } - // sphinx-doc: init-begin (this line helps the doc to build; ignore it) - /* Vector in which we store all ongoing communications */ - std::vector pending_comms; - - /* Make a vector of the mailboxes to use */ - std::vector mboxes; - for (unsigned int i = 0; i < receivers_count; i++) - mboxes.push_back(sg4::Mailbox::by_name("receiver-" + std::to_string(i))); - // sphinx-doc: init-end - - /* Start dispatching all messages to receivers, in a round robin fashion */ - for (unsigned int i = 0; i < messages_count; i++) { - std::string msg_content = "Message " + std::to_string(i); - // Copy the data we send: the 'msg_content' variable is not a stable storage location. 
- // It will be destroyed when this actor leaves the loop, ie before the receiver gets it - auto* payload = new std::string(msg_content); - - XBT_INFO("Send '%s' to '%s'", msg_content.c_str(), mboxes[i % receivers_count]->get_cname()); - - /* Create a communication representing the ongoing communication, and store it in pending_comms */ - sg4::CommPtr comm = mboxes[i % receivers_count]->put_async(payload, msg_size); - pending_comms.push_back(comm); - } - - /* Start sending messages to let the workers know that they should stop */ // sphinx-doc: put-begin - for (unsigned int i = 0; i < receivers_count; i++) { - XBT_INFO("Send 'finalize' to 'receiver-%u'", i); - sg4::CommPtr comm = mboxes[i]->put_async(new std::string("finalize"), 0); - pending_comms.push_back(comm); - } - XBT_INFO("Done dispatching all messages"); - - /* Now that all message exchanges were initiated, wait for their completion in one single call */ - sg4::Comm::wait_all(pending_comms); - // sphinx-doc: put-end - - XBT_INFO("Goodbye now!"); -} - -/* Receiver actor expects 1 argument: its ID */ -static void receiver(int id) -{ - sg4::Mailbox* mbox = sg4::Mailbox::by_name("receiver-" + std::to_string(id)); - XBT_INFO("Wait for my first message"); - for (bool cont = true; cont;) { - auto received = mbox->get_unique(); - XBT_INFO("I got a '%s'.", received->c_str()); - cont = (*received != "finalize"); // If it's a finalize message, we're done - // Receiving the message was all we were supposed to do - } -} - -int main(int argc, char* argv[]) -{ - sg4::Engine e(&argc, argv); - - e.load_platform(argv[1]); - - sg4::Actor::create("sender", e.host_by_name("Tremblay"), sender, 5, 2, 1e6); - sg4::Actor::create("receiver", e.host_by_name("Ruby"), receiver, 0); - sg4::Actor::create("receiver", e.host_by_name("Perl"), receiver, 1); - - e.run(); - - return 0; -} diff --git a/examples/cpp/comm-waitall/s4u-comm-waitall.tesh b/examples/cpp/comm-waitall/s4u-comm-waitall.tesh deleted file mode 100644 index 
cdf73659e3..0000000000 --- a/examples/cpp/comm-waitall/s4u-comm-waitall.tesh +++ /dev/null @@ -1,21 +0,0 @@ -#!/usr/bin/env tesh - -$ ${bindir:=.}/s4u-comm-waitall ${platfdir}/small_platform_fatpipe.xml "--log=root.fmt:[%10.6r]%e(%i:%a@%h)%e%m%n" -> [ 0.000000] (2:receiver@Ruby) Wait for my first message -> [ 0.000000] (3:receiver@Perl) Wait for my first message -> [ 0.000000] (1:sender@Tremblay) Send 'Message 0' to 'receiver-0' -> [ 0.000000] (1:sender@Tremblay) Send 'Message 1' to 'receiver-1' -> [ 0.000000] (1:sender@Tremblay) Send 'Message 2' to 'receiver-0' -> [ 0.000000] (1:sender@Tremblay) Send 'Message 3' to 'receiver-1' -> [ 0.000000] (1:sender@Tremblay) Send 'Message 4' to 'receiver-0' -> [ 0.000000] (1:sender@Tremblay) Send 'finalize' to 'receiver-0' -> [ 0.000000] (1:sender@Tremblay) Send 'finalize' to 'receiver-1' -> [ 0.000000] (1:sender@Tremblay) Done dispatching all messages -> [ 0.004022] (2:receiver@Ruby) I got a 'Message 0'. -> [ 0.004022] (3:receiver@Perl) I got a 'Message 1'. -> [ 0.008043] (2:receiver@Ruby) I got a 'Message 2'. -> [ 0.008043] (3:receiver@Perl) I got a 'Message 3'. -> [ 0.009995] (3:receiver@Perl) I got a 'finalize'. -> [ 0.012065] (2:receiver@Ruby) I got a 'Message 4'. -> [ 0.014016] (2:receiver@Ruby) I got a 'finalize'. -> [ 0.014016] (1:sender@Tremblay) Goodbye now! diff --git a/examples/cpp/comm-waitany/s4u-comm-waitany.cpp b/examples/cpp/comm-waitany/s4u-comm-waitany.cpp deleted file mode 100644 index 802a4fc253..0000000000 --- a/examples/cpp/comm-waitany/s4u-comm-waitany.cpp +++ /dev/null @@ -1,107 +0,0 @@ -/* Copyright (c) 2010-2023. The SimGrid Team. All rights reserved. */ - -/* This program is free software; you can redistribute it and/or modify it - * under the terms of the license (GNU LGPL) which comes with this package. */ - -/* This example shows how to use simgrid::s4u::this_actor::wait_any() to wait for the first occurring event. 
- * - * As for the other asynchronous examples, the sender initiate all the messages it wants to send and - * pack the resulting simgrid::s4u::CommPtr objects in a vector. All messages thus occur concurrently. - * - * The sender then loops until there is no ongoing communication. Using wait_any() ensures that the sender - * will notice events as soon as they occur even if it does not follow the order of the container. - * - * Here, finalize messages will terminate earlier because their size is 0, so they travel faster than the - * other messages of this application. As expected, the trace shows that the finalize of worker 1 is - * processed before 'Message 5' that is sent to worker 0. - * - */ - -#include "simgrid/s4u.hpp" -#include -#include -#include -namespace sg4 = simgrid::s4u; - -XBT_LOG_NEW_DEFAULT_CATEGORY(s4u_comm_waitall, "Messages specific for this s4u example"); - -static void sender(unsigned int messages_count, unsigned int receivers_count, long msg_size) -{ - if (messages_count == 0 || receivers_count == 0) { - XBT_WARN("Sender has nothing to do. Bail out!"); - return; - } - /* Vector in which we store all ongoing communications */ - std::vector pending_comms; - - /* Make a vector of the mailboxes to use */ - std::vector mboxes; - for (unsigned int i = 0; i < receivers_count; i++) - mboxes.push_back(sg4::Mailbox::by_name("receiver-" + std::to_string(i))); - - /* Start dispatching all messages to receivers, in a round robin fashion */ - for (unsigned int i = 0; i < messages_count; i++) { - std::string msg_content = "Message " + std::to_string(i); - // Copy the data we send: the 'msg_content' variable is not a stable storage location. 
- // It will be destroyed when this actor leaves the loop, ie before the receiver gets it - auto* payload = new std::string(msg_content); - - XBT_INFO("Send '%s' to '%s'", msg_content.c_str(), mboxes[i % receivers_count]->get_cname()); - - /* Create a communication representing the ongoing communication, and store it in pending_comms */ - sg4::CommPtr comm = mboxes[i % receivers_count]->put_async(payload, msg_size); - pending_comms.push_back(comm); - } - - /* Start sending messages to let the workers know that they should stop */ - for (unsigned int i = 0; i < receivers_count; i++) { - XBT_INFO("Send 'finalize' to 'receiver-%u'", i); - sg4::CommPtr comm = mboxes[i]->put_async(new std::string("finalize"), 0); - pending_comms.push_back(comm); - } - XBT_INFO("Done dispatching all messages"); - - /* Now that all message exchanges were initiated, wait for their completion, in order of termination. - * - * This loop waits for first terminating message with wait_any() and remove it with erase(), until all comms are - * terminated - * Even in this simple example, the pending comms do not terminate in the exact same order of creation. 
- */ - while (not pending_comms.empty()) { - ssize_t changed_pos = sg4::Comm::wait_any(pending_comms); - pending_comms.erase(pending_comms.begin() + changed_pos); - if (changed_pos != 0) - XBT_INFO("Remove the %zdth pending comm: it terminated earlier than another comm that was initiated first.", - changed_pos); - } - - XBT_INFO("Goodbye now!"); -} - -/* Receiver actor expects 1 argument: its ID */ -static void receiver(int id) -{ - sg4::Mailbox* mbox = sg4::Mailbox::by_name("receiver-" + std::to_string(id)); - XBT_INFO("Wait for my first message"); - for (bool cont = true; cont;) { - auto received = mbox->get_unique(); - XBT_INFO("I got a '%s'.", received->c_str()); - cont = (*received != "finalize"); // If it's a finalize message, we're done - // Receiving the message was all we were supposed to do - } -} - -int main(int argc, char* argv[]) -{ - sg4::Engine e(&argc, argv); - - e.load_platform(argv[1]); - - sg4::Actor::create("sender", e.host_by_name("Tremblay"), sender, 6, 2, 1e6); - sg4::Actor::create("receiver", e.host_by_name("Fafard"), receiver, 0); - sg4::Actor::create("receiver", e.host_by_name("Jupiter"), receiver, 1); - - e.run(); - - return 0; -} diff --git a/examples/cpp/comm-waitany/s4u-comm-waitany.tesh b/examples/cpp/comm-waitany/s4u-comm-waitany.tesh deleted file mode 100644 index 5fe9dc925e..0000000000 --- a/examples/cpp/comm-waitany/s4u-comm-waitany.tesh +++ /dev/null @@ -1,27 +0,0 @@ -#!/usr/bin/env tesh - -p Testing this_actor->wait_any() - -! 
output sort 19 -$ ${bindir:=.}/s4u-comm-waitany ${platfdir}/small_platform.xml "--log=root.fmt:[%10.6r]%e(%i:%a@%h)%e%m%n" -> [ 0.000000] (1:sender@Tremblay) Send 'Message 0' to 'receiver-0' -> [ 0.000000] (2:receiver@Fafard) Wait for my first message -> [ 0.000000] (3:receiver@Jupiter) Wait for my first message -> [ 0.000000] (1:sender@Tremblay) Send 'Message 1' to 'receiver-1' -> [ 0.000000] (1:sender@Tremblay) Send 'Message 2' to 'receiver-0' -> [ 0.000000] (1:sender@Tremblay) Send 'Message 3' to 'receiver-1' -> [ 0.000000] (1:sender@Tremblay) Send 'Message 4' to 'receiver-0' -> [ 0.000000] (1:sender@Tremblay) Send 'Message 5' to 'receiver-1' -> [ 0.000000] (1:sender@Tremblay) Send 'finalize' to 'receiver-0' -> [ 0.000000] (1:sender@Tremblay) Send 'finalize' to 'receiver-1' -> [ 0.000000] (1:sender@Tremblay) Done dispatching all messages -> [ 0.158397] (2:receiver@Fafard) I got a 'Message 0'. -> [ 0.169155] (3:receiver@Jupiter) I got a 'Message 1'. -> [ 0.316794] (2:receiver@Fafard) I got a 'Message 2'. -> [ 0.338309] (3:receiver@Jupiter) I got a 'Message 3'. -> [ 0.475190] (2:receiver@Fafard) I got a 'Message 4'. -> [ 0.500898] (2:receiver@Fafard) I got a 'finalize'. -> [ 0.500898] (1:sender@Tremblay) Remove the 1th pending comm: it terminated earlier than another comm that was initiated first. -> [ 0.507464] (3:receiver@Jupiter) I got a 'Message 5'. -> [ 0.526478] (3:receiver@Jupiter) I got a 'finalize'. -> [ 0.526478] (1:sender@Tremblay) Goodbye now! diff --git a/examples/cpp/exec-waitany/s4u-exec-waitany.cpp b/examples/cpp/exec-waitany/s4u-exec-waitany.cpp deleted file mode 100644 index c30dd319a9..0000000000 --- a/examples/cpp/exec-waitany/s4u-exec-waitany.cpp +++ /dev/null @@ -1,63 +0,0 @@ -/* Copyright (c) 2019-2023. The SimGrid Team. All rights reserved. */ - -/* This program is free software; you can redistribute it and/or modify it - * under the terms of the license (GNU LGPL) which comes with this package. 
*/ - -#include "simgrid/s4u.hpp" -#include -#include -#include - -XBT_LOG_NEW_DEFAULT_CATEGORY(s4u_exec_waitany, "Messages specific for this s4u example"); -namespace sg4 = simgrid::s4u; - -static void worker(bool with_timeout) -{ - /* Vector in which we store all pending executions*/ - std::vector pending_executions; - - for (int i = 0; i < 3; i++) { - std::string name = "Exec-" + std::to_string(i); - double amount = (6 * (i % 2) + i + 1) * sg4::this_actor::get_host()->get_speed(); - - sg4::ExecPtr exec = sg4::this_actor::exec_init(amount)->set_name(name); - pending_executions.push_back(exec); - exec->start(); - - XBT_INFO("Activity %s has started for %.0f seconds", name.c_str(), - amount / sg4::this_actor::get_host()->get_speed()); - } - - /* Now that executions were initiated, wait for their completion, in order of termination. - * - * This loop waits for first terminating execution with wait_any() and remove it with erase(), until all execs are - * terminated. - */ - while (not pending_executions.empty()) { - ssize_t pos; - if (with_timeout) - pos = sg4::Exec::wait_any_for(pending_executions, 4); - else - pos = sg4::Exec::wait_any(pending_executions); - - if (pos < 0) { - XBT_INFO("Do not wait any longer for an activity"); - pending_executions.clear(); - } else { - XBT_INFO("Activity '%s' (at position %zd) is complete", pending_executions[pos]->get_cname(), pos); - pending_executions.erase(pending_executions.begin() + pos); - } - XBT_INFO("%zu activities remain pending", pending_executions.size()); - } -} - -int main(int argc, char* argv[]) -{ - sg4::Engine e(&argc, argv); - e.load_platform(argv[1]); - sg4::Actor::create("worker", e.host_by_name("Tremblay"), worker, false); - sg4::Actor::create("worker_timeout", e.host_by_name("Tremblay"), worker, true); - e.run(); - - return 0; -} diff --git a/examples/cpp/exec-waitany/s4u-exec-waitany.tesh b/examples/cpp/exec-waitany/s4u-exec-waitany.tesh deleted file mode 100644 index 072cab5269..0000000000 --- 
a/examples/cpp/exec-waitany/s4u-exec-waitany.tesh +++ /dev/null @@ -1,22 +0,0 @@ -#!/usr/bin/env tesh - -! output sort 19 -$ ${bindir:=.}/s4u-exec-waitany ${platfdir}/multicore_machine.xml "--log=root.fmt:[%10.6r]%e[%14P]%e%m%n" -> [ 0.000000] [ worker] Activity Exec-0 has started for 1 seconds -> [ 0.000000] [worker_timeout] Activity Exec-0 has started for 1 seconds -> [ 0.000000] [ worker] Activity Exec-1 has started for 8 seconds -> [ 0.000000] [worker_timeout] Activity Exec-1 has started for 8 seconds -> [ 0.000000] [ worker] Activity Exec-2 has started for 3 seconds -> [ 0.000000] [worker_timeout] Activity Exec-2 has started for 3 seconds -> [ 1.000000] [worker_timeout] Activity 'Exec-0' (at position 0) is complete -> [ 1.000000] [worker_timeout] 2 activities remain pending -> [ 1.000000] [ worker] Activity 'Exec-0' (at position 0) is complete -> [ 1.000000] [ worker] 2 activities remain pending -> [ 3.000000] [worker_timeout] Activity 'Exec-2' (at position 1) is complete -> [ 3.000000] [worker_timeout] 1 activities remain pending -> [ 3.000000] [ worker] Activity 'Exec-2' (at position 1) is complete -> [ 3.000000] [ worker] 1 activities remain pending -> [ 7.000000] [worker_timeout] Do not wait any longer for an activity -> [ 7.000000] [worker_timeout] 0 activities remain pending -> [ 8.000000] [ worker] Activity 'Exec-1' (at position 0) is complete -> [ 8.000000] [ worker] 0 activities remain pending diff --git a/examples/python/plugin-host-load/plugin-host-load.py b/examples/python/plugin-host-load/plugin-host-load.py index 9fce74332a..71d3ac8f4c 100644 --- a/examples/python/plugin-host-load/plugin-host-load.py +++ b/examples/python/plugin-host-load/plugin-host-load.py @@ -48,7 +48,7 @@ far: {host.computed_flops:.0E}, average load as reported by the HostLoad plugin: this_actor.info(f'Run an activity of {100E6:.0E} flops') this_actor.execute(100E6) this_actor.info(f'Done working on my activity; this took {Engine.clock - start}s; current peak speed: 
{host.speed:.0E} flop/s; number of flops computed so far: {host.computed_flops:.0E}') - Engine + start = Engine.clock this_actor.info("========= Requesting a reset of the computation and load counters") host.reset_load() diff --git a/include/simgrid/forward.h b/include/simgrid/forward.h index 07e7cbcb52..6c726824c4 100644 --- a/include/simgrid/forward.h +++ b/include/simgrid/forward.h @@ -19,8 +19,14 @@ namespace s4u { class Activity; /** Smart pointer to a simgrid::s4u::Activity */ using ActivityPtr = boost::intrusive_ptr; -XBT_PUBLIC void intrusive_ptr_release(const Activity* actor); -XBT_PUBLIC void intrusive_ptr_add_ref(const Activity* actor); +XBT_PUBLIC void intrusive_ptr_release(const Activity* act); +XBT_PUBLIC void intrusive_ptr_add_ref(const Activity* act); + +class ActivitySet; +/** Smart pointer to a simgrid::s4u::ActivitySet */ +using ActivitySetPtr = boost::intrusive_ptr; +XBT_PUBLIC void intrusive_ptr_release(const ActivitySet* as); +XBT_PUBLIC void intrusive_ptr_add_ref(const ActivitySet* as); class Actor; /** Smart pointer to a simgrid::s4u::Actor */ @@ -31,8 +37,8 @@ XBT_PUBLIC void intrusive_ptr_add_ref(const Actor* actor); class Barrier; /** Smart pointer to a simgrid::s4u::Barrier */ using BarrierPtr = boost::intrusive_ptr; -XBT_PUBLIC void intrusive_ptr_release(Barrier* m); -XBT_PUBLIC void intrusive_ptr_add_ref(Barrier* m); +XBT_PUBLIC void intrusive_ptr_release(Barrier* b); +XBT_PUBLIC void intrusive_ptr_add_ref(Barrier* b); class Comm; /** Smart pointer to a simgrid::s4u::Comm */ diff --git a/include/simgrid/s4u.hpp b/include/simgrid/s4u.hpp index 77a71df2c4..b2853229c2 100644 --- a/include/simgrid/s4u.hpp +++ b/include/simgrid/s4u.hpp @@ -22,9 +22,11 @@ #include #include #include -#include #include +#include +#include + #include #endif /* SIMGRID_S4U_S4U_H */ diff --git a/include/simgrid/s4u/Activity.hpp b/include/simgrid/s4u/Activity.hpp index b66268ef3b..9973a00854 100644 --- a/include/simgrid/s4u/Activity.hpp +++
b/include/simgrid/s4u/Activity.hpp @@ -34,6 +34,7 @@ namespace s4u { */ class XBT_PUBLIC Activity : public xbt::Extendable { #ifndef DOXYGEN + friend ActivitySet; friend Comm; friend Exec; friend Io; diff --git a/include/simgrid/s4u/ActivitySet.hpp b/include/simgrid/s4u/ActivitySet.hpp new file mode 100644 index 0000000000..86942a3a70 --- /dev/null +++ b/include/simgrid/s4u/ActivitySet.hpp @@ -0,0 +1,86 @@ +/* Copyright (c) 2006-2023. The SimGrid Team. All rights reserved. */ + +/* This program is free software; you can redistribute it and/or modify it + * under the terms of the license (GNU LGPL) which comes with this package. */ + +#ifndef SIMGRID_S4U_ACTIVITYSET_HPP +#define SIMGRID_S4U_ACTIVITYSET_HPP + +#include +#include + +#include + +namespace simgrid { + +extern template class XBT_PUBLIC xbt::Extendable; + +namespace s4u { +/** @brief ActivitySet + * + * This class is a container of activities, allowing to wait for the completion of any or all activities in the set. + * This is somewhat similar to the select(2) system call under UNIX, allowing you to wait for the next event about these + * activities. + */ +class XBT_PUBLIC ActivitySet : public xbt::Extendable { + std::vector + activities_; // We use a vector instead of a set to improve reproducibility across architectures + std::vector failed_activities_; + +public: + ActivitySet() = default; + ActivitySet(const std::vector init) : activities_(init) {} + ~ActivitySet() = default; + + /** Add an activity to the set */ + void push(ActivityPtr a) { activities_.push_back(a); } + /** Remove that activity from the set (no-op if the activity is not in the set) */ + void erase(ActivityPtr a); + + /** Get the amount of activities in the set. Failed activities (if any) are not counted */ + int size() { return activities_.size(); } + /** Return whether the set is empty.
Failed activities (if any) are not counted */ + int empty() { return activities_.empty(); } + + /** Wait for the completion of all activities in the set, but not longer than the provided timeout + * + * On timeout, an exception is raised. + * + * In any case, the completed activities remain in the set. Use test_any() to retrieve them. + */ + void wait_all_for(double timeout); + /** Wait for the completion of all activities in the set. The set is NOT emptied afterward. */ + void wait_all() { wait_all_for(-1); } + /** Returns the first terminated activity if any, or ActivityPtr(nullptr) if no activity is terminated */ + ActivityPtr test_any(); + + /** Wait for the completion of one activity from the set, but not longer than the provided timeout. + * + * See wait_any() for details. + * + * @return the first terminated activity, which is automatically removed from the set. + */ + + ActivityPtr wait_any_for(double timeout); + /** Wait for the completion of one activity from the set. + * + * If an activity fails during that time, an exception is raised, and the failed activity is marked as failed in the + * set. Use get_failed_activity() to retrieve it. + * + * If more than one activity failed, the other ones are also removed from the set. Use get_failed_activity() several + * times to retrieve them all. + * + * @return the first terminated activity, which is automatically removed from the set. If more than one activity + * terminated at the same timestamp, then the other ones are still in the set. Use either test_any() or wait_any() to + * retrieve the other ones.
+ */ + ActivityPtr wait_any() { return wait_any_for(-1); } + + ActivityPtr get_failed_activity(); + bool has_failed_activities() { return not failed_activities_.empty(); } +}; + +} // namespace s4u +} // namespace simgrid + +#endif diff --git a/src/kernel/actor/CommObserver.hpp b/src/kernel/actor/CommObserver.hpp index c8e2c8c3fb..ab036510ee 100644 --- a/src/kernel/actor/CommObserver.hpp +++ b/src/kernel/actor/CommObserver.hpp @@ -110,7 +110,7 @@ public: const std::function& clean_fun, // used to free the synchro in case of problem after a detached send const std::function& copy_data_fun, // used to copy data if not default one - void* payload, bool detached, std::string fun_call) + void* payload, bool detached, std::string_view fun_call) : SimcallObserver(actor) , mbox_(mbox) , payload_size_(payload_size) @@ -160,7 +160,7 @@ public: CommIrecvSimcall(ActorImpl* actor, activity::MailboxImpl* mbox, unsigned char* dst_buff, size_t* dst_buff_size, const std::function& match_fun, const std::function& copy_data_fun, void* payload, - double rate, std::string fun_call) + double rate, std::string_view fun_call) : SimcallObserver(actor) , mbox_(mbox) , dst_buff_(dst_buff) diff --git a/src/mc/api/strategy/UniformStrategy.hpp b/src/mc/api/strategy/UniformStrategy.hpp index 0b665a807b..472c571d3a 100644 --- a/src/mc/api/strategy/UniformStrategy.hpp +++ b/src/mc/api/strategy/UniformStrategy.hpp @@ -48,7 +48,7 @@ public: chosen = xbt::random::uniform_int(0, possibilities-1); for (auto const& [aid, actor] : actors_to_run_) { - if (((not actor.is_todo()) && must_be_todo) || actor.is_done() || (not actor.is_enabled())) + if (((not actor.is_todo()) && must_be_todo) || actor.is_done() || (not actor.is_enabled())) continue; if (chosen == 0) { return std::make_pair(aid, valuation.at(aid)); diff --git a/src/s4u/s4u_ActivitySet.cpp b/src/s4u/s4u_ActivitySet.cpp new file mode 100644 index 0000000000..4d704206ca --- /dev/null +++ b/src/s4u/s4u_ActivitySet.cpp @@ -0,0 +1,98 @@ +/* Copyright 
(c) 2023-. The SimGrid Team. All rights reserved. */ + +/* This program is free software; you can redistribute it and/or modify it + * under the terms of the license (GNU LGPL) which comes with this package. */ + +#include "src/kernel/activity/ActivityImpl.hpp" +#include "src/kernel/actor/ActorImpl.hpp" +#include "src/kernel/actor/CommObserver.hpp" +#include +#include + +XBT_LOG_NEW_DEFAULT_SUBCATEGORY(s4u_activityset, s4u_activity, "S4U set of activities"); + +namespace simgrid { + +template class xbt::Extendable; + +namespace s4u { + +void ActivitySet::erase(ActivityPtr a) +{ + for (auto it = activities_.begin(); it != activities_.end(); it++) + if (*it == a) { + activities_.erase(it); + return; + } +} + +void ActivitySet::wait_all_for(double timeout) +{ + if (timeout < 0.0) { + for (const auto& act : activities_) + act->wait(); + + } else { + + double deadline = Engine::get_clock() + timeout; + for (const auto& act : activities_) + act->wait_until(deadline); + } +} + +ActivityPtr ActivitySet::test_any() +{ + std::vector act_impls(activities_.size()); + std::transform(begin(activities_), end(activities_), begin(act_impls), + [](const ActivityPtr& act) { return act->pimpl_.get(); }); + + kernel::actor::ActorImpl* issuer = kernel::actor::ActorImpl::self(); + kernel::actor::ActivityTestanySimcall observer{issuer, act_impls, "test_any"}; + ssize_t changed_pos = kernel::actor::simcall_answered( + [&observer] { + return kernel::activity::ActivityImpl::test_any(observer.get_issuer(), observer.get_activities()); + }, + &observer); + if (changed_pos == -1) + return ActivityPtr(nullptr); + + auto ret = activities_.at(changed_pos); + erase(ret); + ret->complete(Activity::State::FINISHED); + return ret; +} + +ActivityPtr ActivitySet::wait_any_for(double timeout) +{ + std::vector act_impls(activities_.size()); + std::transform(begin(activities_), end(activities_), begin(act_impls), + [](const ActivityPtr& activity) { return activity->pimpl_.get(); }); + + 
kernel::actor::ActorImpl* issuer = kernel::actor::ActorImpl::self(); + kernel::actor::ActivityWaitanySimcall observer{issuer, act_impls, timeout, "wait_any_for"}; + ssize_t changed_pos = kernel::actor::simcall_blocking( + [&observer] { + kernel::activity::ActivityImpl::wait_any_for(observer.get_issuer(), observer.get_activities(), + observer.get_timeout()); + }, + &observer); + xbt_assert(changed_pos != -1, + "ActivityImpl::wait_any_for is not supposed to return -1 but instead to raise exceptions"); + + auto ret = activities_.at(changed_pos); + erase(ret); + ret->complete(Activity::State::FINISHED); + return ret; +} + +ActivityPtr ActivitySet::get_failed_activity() +{ + if (failed_activities_.empty()) + return ActivityPtr(nullptr); + auto ret = failed_activities_.back(); + failed_activities_.pop_back(); + return ret; +} + +} // namespace s4u +} // namespace simgrid diff --git a/tools/cmake/DefinePackages.cmake b/tools/cmake/DefinePackages.cmake index 20baa0b82e..a6ffe30e44 100644 --- a/tools/cmake/DefinePackages.cmake +++ b/tools/cmake/DefinePackages.cmake @@ -458,6 +458,7 @@ set(PLUGINS_SRC set(S4U_SRC src/s4u/s4u_Activity.cpp + src/s4u/s4u_ActivitySet.cpp src/s4u/s4u_Actor.cpp src/s4u/s4u_Barrier.cpp src/s4u/s4u_Comm.cpp @@ -677,6 +678,7 @@ set(headers_to_install include/simgrid/vm.h include/simgrid/zone.h include/simgrid/s4u/Activity.hpp + include/simgrid/s4u/ActivitySet.hpp include/simgrid/s4u/Actor.hpp include/simgrid/s4u/Barrier.hpp include/simgrid/s4u/Comm.hpp diff --git a/tools/jenkins/build.sh b/tools/jenkins/build.sh index 9c8fc76470..a9d9d64f6b 100755 --- a/tools/jenkins/build.sh +++ b/tools/jenkins/build.sh @@ -2,6 +2,13 @@ # This script is used by various build projects on Jenkins +case "$JENKINS_HOME" in +*-qualif) + echo "Build skipped on $JENKINS_HOME." + exit 0 + ;; +esac + # See https://ci.inria.fr/simgrid/job/SimGrid/configure # See https://ci.inria.fr/simgrid/job/Simgrid-Windows/configure