include examples/c/plugin-host-load/plugin-host-load.tesh
include examples/c/synchro-semaphore/synchro-semaphore.c
include examples/c/synchro-semaphore/synchro-semaphore.tesh
+include examples/cpp/activity-testany/s4u-activity-testany.cpp
+include examples/cpp/activity-testany/s4u-activity-testany.tesh
+include examples/cpp/activity-waitany/s4u-activity-waitany.cpp
+include examples/cpp/activity-waitany/s4u-activity-waitany.tesh
include examples/cpp/actor-create/s4u-actor-create.cpp
include examples/cpp/actor-create/s4u-actor-create.tesh
include examples/cpp/actor-create/s4u-actor-create_d.xml
> [ 3.000000] (maestro@) Oops! Deadlock or code not perfectly clean.
> [ 3.000000] (maestro@) 1 actors are still running, waiting for something.
> [ 3.000000] (maestro@) Legend of the following listing: "Actor <pid> (<name>@<host>): <status>"
-> [ 3.000000] (maestro@) Actor 3 (C@Ginette): waiting for communication activity 0xdeadbeef () in state WAITING to finish
+> [ 3.000000] (maestro@) Actor 3 (C@Ginette): waiting for synchronization activity 0xdeadbeef () in state WAITING to finish
> [ 3.000000] (C@Ginette) I was killed!
> [ 3.000000] (C@Ginette) The backtrace would be displayed here if --log=no_loc would not have been passed
# Deal with each example
-foreach (example actor-create actor-daemon actor-exiting actor-join actor-kill
+foreach (example activity-testany activity-waitany
+ actor-create actor-daemon actor-exiting actor-join actor-kill
actor-lifetime actor-migrate actor-suspend actor-yield actor-stacksize
app-bittorrent app-chainsend app-token-ring
comm-pingpong comm-ready comm-serialize comm-suspend comm-testany comm-wait comm-waitany comm-waitall comm-waituntil
--- /dev/null
+/* Copyright (c) 2010-2022. The SimGrid Team. All rights reserved. */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
+#include "simgrid/s4u.hpp"
+#include <cstdlib>
+#include <iostream>
+#include <string>
+namespace sg4 = simgrid::s4u;
+
+XBT_LOG_NEW_DEFAULT_CATEGORY(s4u_activity_testany, "Messages specific for this s4u example");
+
+static void bob()
+{
+ sg4::Mailbox* mbox = sg4::Mailbox::by_name(std::string("mbox"));
+ const sg4::Disk* disk = sg4::Host::current()->get_disks().front();
+ std::string* payload;
+
+ XBT_INFO("Create my asynchronous activities");
+ auto exec = sg4::this_actor::exec_async(5e9);
+ auto comm = mbox->get_async(&payload);
+ auto io = disk->read_async(3e8);
+
+ std::vector<sg4::ActivityPtr> pending_activities = {boost::dynamic_pointer_cast<sg4::Activity>(exec),
+ boost::dynamic_pointer_cast<sg4::Activity>(comm),
+ boost::dynamic_pointer_cast<sg4::Activity>(io)};
+
+ XBT_INFO("Sleep_for a while");
+ sg4::this_actor::sleep_for(1);
+
+ XBT_INFO("Test for completed activities");
+ while (not pending_activities.empty()) {
+ ssize_t changed_pos = sg4::Activity::test_any(pending_activities);
+ if (changed_pos != -1) {
+ auto* completed_one = pending_activities[changed_pos].get();
+ if (dynamic_cast<sg4::Comm*>(completed_one))
+ XBT_INFO("Completed a Comm");
+ if (dynamic_cast<sg4::Exec*>(completed_one))
+ XBT_INFO("Completed an Exec");
+ if (dynamic_cast<sg4::Io*>(completed_one))
+ XBT_INFO("Completed an I/O");
+ pending_activities.erase(pending_activities.begin() + changed_pos);
+ } else { // nothing matches, wait for a little bit
+ XBT_INFO("Nothing matches, test again in 0.5s");
+ sg4::this_actor::sleep_for(.5);
+ }
+ }
+ XBT_INFO("Last activity is complete");
+ delete payload;
+}
+
+static void alice()
+{
+ auto* payload = new std::string("Message");
+ XBT_INFO("Send '%s'", payload->c_str());
+ sg4::Mailbox::by_name(std::string("mbox"))->put(payload, 6e8);
+}
+
+int main(int argc, char* argv[])
+{
+ sg4::Engine e(&argc, argv);
+
+ e.load_platform(argv[1]);
+
+ sg4::Actor::create("bob", e.host_by_name("bob"), bob);
+ sg4::Actor::create("alice", e.host_by_name("alice"), alice);
+
+ e.run();
+
+ return 0;
+}
--- /dev/null
+#!/usr/bin/env tesh
+
+$ ${bindir:=.}/s4u-activity-testany ${platfdir}/hosts_with_disks.xml "--log=root.fmt:[%4.2r]%e[%5a]%e%m%n"
+> [0.00] [alice] Send 'Message'
+> [0.00] [ bob] Create my asynchronous activities
+> [0.00] [ bob] Sleep_for a while
+> [1.00] [ bob] Test for completed activities
+> [1.00] [ bob] Nothing matches, test again in 0.5s
+> [1.50] [ bob] Nothing matches, test again in 0.5s
+> [2.00] [ bob] Nothing matches, test again in 0.5s
+> [2.50] [ bob] Nothing matches, test again in 0.5s
+> [3.00] [ bob] Completed an I/O
+> [3.00] [ bob] Nothing matches, test again in 0.5s
+> [3.50] [ bob] Nothing matches, test again in 0.5s
+> [4.00] [ bob] Nothing matches, test again in 0.5s
+> [4.50] [ bob] Nothing matches, test again in 0.5s
+> [5.00] [ bob] Completed an Exec
+> [5.00] [ bob] Nothing matches, test again in 0.5s
+> [5.50] [ bob] Completed a Comm
+> [5.50] [ bob] Last activity is complete
--- /dev/null
+/* Copyright (c) 2010-2022. The SimGrid Team. All rights reserved. */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
+#include "simgrid/s4u.hpp"
+#include <cstdlib>
+#include <iostream>
+#include <string>
+namespace sg4 = simgrid::s4u;
+
+XBT_LOG_NEW_DEFAULT_CATEGORY(s4u_activity_waittany, "Messages specific for this s4u example");
+
+static void bob()
+{
+ sg4::Mailbox* mbox = sg4::Mailbox::by_name(std::string("mbox"));
+ const sg4::Disk* disk = sg4::Host::current()->get_disks().front();
+ std::string* payload;
+
+ XBT_INFO("Create my asynchronous activities");
+ auto exec = sg4::this_actor::exec_async(5e9);
+ auto comm = mbox->get_async(&payload);
+ auto io = disk->read_async(3e8);
+
+ std::vector<sg4::ActivityPtr> pending_activities = {boost::dynamic_pointer_cast<sg4::Activity>(exec),
+ boost::dynamic_pointer_cast<sg4::Activity>(comm),
+ boost::dynamic_pointer_cast<sg4::Activity>(io)};
+
+ XBT_INFO("Wait for asynchrounous activities to complete");
+ while (not pending_activities.empty()) {
+ ssize_t changed_pos = sg4::Activity::wait_any(pending_activities);
+ if (changed_pos != -1) {
+ auto* completed_one = pending_activities[changed_pos].get();
+ if (dynamic_cast<sg4::Comm*>(completed_one))
+ XBT_INFO("Completed a Comm");
+ if (dynamic_cast<sg4::Exec*>(completed_one))
+ XBT_INFO("Completed an Exec");
+ if (dynamic_cast<sg4::Io*>(completed_one))
+ XBT_INFO("Completed an I/O");
+ pending_activities.erase(pending_activities.begin() + changed_pos);
+ }
+ }
+ XBT_INFO("Last activity is complete");
+ delete payload;
+}
+
+static void alice()
+{
+ auto* payload = new std::string("Message");
+ XBT_INFO("Send '%s'", payload->c_str());
+ sg4::Mailbox::by_name(std::string("mbox"))->put(payload, 6e8);
+}
+
+int main(int argc, char* argv[])
+{
+ sg4::Engine e(&argc, argv);
+
+ e.load_platform(argv[1]);
+
+ sg4::Actor::create("bob", e.host_by_name("bob"), bob);
+ sg4::Actor::create("alice", e.host_by_name("alice"), alice);
+
+ e.run();
+
+ return 0;
+}
--- /dev/null
+#!/usr/bin/env tesh
+
+$ ${bindir:=.}/s4u-activity-waitany ${platfdir}/hosts_with_disks.xml "--log=root.fmt:[%7.6r]%e[%5a]%e%m%n"
+> [0.000000] [alice] Send 'Message'
+> [0.000000] [ bob] Create my asynchronous activities
+> [0.000000] [ bob] Wait for asynchrounous activities to complete
+> [3.000000] [ bob] Completed an I/O
+> [5.000000] [ bob] Completed an Exec
+> [5.197828] [ bob] Completed a Comm
+> [5.197828] [ bob] Last activity is complete
> [ 3.000000] (maestro@) Oops! Deadlock or code not perfectly clean.
> [ 3.000000] (maestro@) 1 actors are still running, waiting for something.
> [ 3.000000] (maestro@) Legend of the following listing: "Actor <pid> (<name>@<host>): <status>"
-> [ 3.000000] (maestro@) Actor 3 (C@Ginette): waiting for communication activity 0xdeadbeef () in state WAITING to finish
+> [ 3.000000] (maestro@) Actor 3 (C@Ginette): waiting for synchronization activity 0xdeadbeef () in state WAITING to finish
> [ 3.000000] (C@Ginette) I was killed!
> [ 3.000000] (C@Ginette) The backtrace would be displayed here if --log=no_loc would not have been passed
> [ 3.000000] (maestro@) Actor C terminates now
static void test()
{
- std::vector<simgrid::s4u::IoPtr> pending_ios;
+ std::vector<simgrid::s4u::ActivityPtr> pending_activities;
simgrid::s4u::ExecPtr bob_compute = simgrid::s4u::this_actor::exec_init(1e9);
+ pending_activities.push_back(boost::dynamic_pointer_cast<simgrid::s4u::Activity>(bob_compute));
simgrid::s4u::IoPtr bob_write =
simgrid::s4u::Host::current()->get_disks().front()->io_init(4000000, simgrid::s4u::Io::OpType::WRITE);
- pending_ios.push_back(bob_write);
+ pending_activities.push_back(boost::dynamic_pointer_cast<simgrid::s4u::Activity>(bob_write));
simgrid::s4u::IoPtr carl_read =
simgrid::s4u::Host::by_name("carl")->get_disks().front()->io_init(4000000, simgrid::s4u::Io::OpType::READ);
- pending_ios.push_back(carl_read);
+ pending_activities.push_back(boost::dynamic_pointer_cast<simgrid::s4u::Activity>(carl_read));
simgrid::s4u::ExecPtr carl_compute = simgrid::s4u::Host::by_name("carl")->exec_init(1e9);
+ pending_activities.push_back(boost::dynamic_pointer_cast<simgrid::s4u::Activity>(carl_compute));
// Name the activities (for logging purposes only)
bob_compute->set_name("bob compute");
carl_compute->vetoable_start();
// wait for the completion of all activities
- bob_compute->wait();
- while (not pending_ios.empty()) {
- ssize_t changed_pos = simgrid::s4u::Io::wait_any(pending_ios);
- XBT_INFO("Io '%s' is complete", pending_ios[changed_pos]->get_cname());
- pending_ios.erase(pending_ios.begin() + changed_pos);
+ while (not pending_activities.empty()) {
+ ssize_t changed_pos = simgrid::s4u::Activity::wait_any(pending_activities);
+ XBT_INFO("Activity '%s' is complete", pending_activities[changed_pos]->get_cname());
+ pending_activities.erase(pending_activities.begin() + changed_pos);
}
- carl_compute->wait();
}
int main(int argc, char* argv[])
$ ${bindir:=.}/s4u-io-dependent ${platfdir}/hosts_with_disks.xml --log=s4u_activity.t:verbose "--log=root.fmt:[%10.6r]%e(%i:%a@%h)%e%m%n"
> [ 1.000000] (1:bob@bob) 'bob write' is assigned to a resource and all dependencies are solved. Let's start
> [ 1.000000] (1:bob@bob) Remove a dependency from 'bob compute' on 'bob write'
+> [ 1.000000] (1:bob@bob) Activity 'bob compute' is complete
> [ 1.100000] (1:bob@bob) 'carl read' is assigned to a resource and all dependencies are solved. Let's start
-> [ 1.100000] (1:bob@bob) Io 'bob write' is complete
+> [ 1.100000] (1:bob@bob) Activity 'bob write' is complete
> [ 1.100000] (1:bob@bob) Remove a dependency from 'bob write' on 'carl read'
-> [ 1.140000] (1:bob@bob) Io 'carl read' is complete
+> [ 1.140000] (1:bob@bob) Activity 'carl read' is complete
> [ 1.140000] (1:bob@bob) 'carl compute' is assigned to a resource and all dependencies are solved. Let's start
> [ 1.140000] (1:bob@bob) Remove a dependency from 'carl read' on 'carl compute'
+> [ 2.140000] (1:bob@bob) Activity 'carl compute' is complete
> [ 2.140000] (0:maestro@) Simulation time 2.14
> [ 0.000000] (0:maestro@) Counter-example execution trace:
> [ 0.000000] (0:maestro@) [(1)HostA (server)] iRecv(dst=(1)HostA (server), buff=(verbose only), size=(verbose only))
> [ 0.000000] (0:maestro@) [(2)HostB (client)] iSend(src=(2)HostB (client), buff=(verbose only), size=(verbose only))
-> [ 0.000000] (0:maestro@) [(1)HostA (server)] Wait(comm=(verbose only) [(2)HostB (client)-> (1)HostA (server)])
+> [ 0.000000] (0:maestro@) [(1)HostA (server)] Wait(comm=(verbose only) [(2)HostB (client) -> (1)HostA (server)])
> [ 0.000000] (0:maestro@) [(1)HostA (server)] iRecv(dst=(1)HostA (server), buff=(verbose only), size=(verbose only))
-> [ 0.000000] (0:maestro@) [(2)HostB (client)] Wait(comm=(verbose only) [(2)HostB (client)-> (1)HostA (server)])
+> [ 0.000000] (0:maestro@) [(2)HostB (client)] Wait(comm=(verbose only) [(2)HostB (client) -> (1)HostA (server)])
> [ 0.000000] (0:maestro@) [(4)HostD (client)] iSend(src=(4)HostD (client), buff=(verbose only), size=(verbose only))
-> [ 0.000000] (0:maestro@) [(1)HostA (server)] Wait(comm=(verbose only) [(4)HostD (client)-> (1)HostA (server)])
+> [ 0.000000] (0:maestro@) [(1)HostA (server)] Wait(comm=(verbose only) [(4)HostD (client) -> (1)HostA (server)])
> [ 0.000000] (0:maestro@) [(1)HostA (server)] iRecv(dst=(1)HostA (server), buff=(verbose only), size=(verbose only))
> [ 0.000000] (0:maestro@) [(3)HostC (client)] iSend(src=(3)HostC (client), buff=(verbose only), size=(verbose only))
-> [ 0.000000] (0:maestro@) [(1)HostA (server)] Wait(comm=(verbose only) [(3)HostC (client)-> (1)HostA (server)])
+> [ 0.000000] (0:maestro@) [(1)HostA (server)] Wait(comm=(verbose only) [(3)HostC (client) -> (1)HostA (server)])
> [ 0.000000] (0:maestro@) Path = 1;2;1;1;2;4;1;1;3;1
> [ 0.000000] (0:maestro@) Expanded states = 22
> [ 0.000000] (0:maestro@) Visited states = 56
*/
virtual Activity* start() = 0;
/** Blocks the current actor until the activity is terminated */
+ /** Tests whether the given activity is terminated yet. */
+ virtual bool test();
+ /*! take a vector s4u::ActivityPtr and return the rank of the first finished one (or -1 if none is done). */
+ static ssize_t test_any(const std::vector<ActivityPtr>& activities);
+
Activity* wait() { return wait_for(-1.0); }
/** Blocks the current actor until the activity is terminated, or until the timeout is elapsed\n
* Raises: timeout exception.*/
/** Blocks the current actor until the activity is terminated, or until the time limit is reached\n
* Raises: timeout exception. */
void wait_until(double time_limit);
+ /*! take a vector of s4u::ActivityPtr and return when one of them is finished.
+ * The return value is the rank of the first finished ActivityPtr. */
+ static ssize_t wait_any(const std::vector<ActivityPtr>& activities) { return wait_any_for(activities, -1); }
+ /*! Same as wait_any, but with a timeout. If the timeout occurs, parameter last is returned.*/
+ static ssize_t wait_any_for(const std::vector<ActivityPtr>& activities, double timeout);
/** Cancel that activity */
Activity* cancel();
/** Return a string representation of the activity's state (one of INITED, STARTING, STARTED, CANCELED, FINISHED) */
const char* get_state_str() const;
void set_state(Activity::State state) { state_ = state; }
- /** Tests whether the given activity is terminated yet. */
- virtual bool test();
/** Blocks the progression of this activity until it gets resumed */
virtual Activity* suspend();
Comm* start() override;
Comm* wait_for(double timeout) override;
- bool test() override;
/** Start the comm, and ignore its result. It can be completely forgotten after that. */
Comm* detach();
* under the terms of the license (GNU LGPL) which comes with this package. */
#include <simgrid/modelchecker.h>
+#include <simgrid/s4u/Engine.hpp>
#include "src/kernel/activity/ActivityImpl.hpp"
+#include "src/kernel/activity/CommImpl.hpp"
#include "src/kernel/activity/SynchroRaw.hpp"
#include "src/kernel/actor/ActorImpl.hpp"
#include "src/kernel/actor/SimcallObserver.hpp"
+#include "src/kernel/resource/CpuImpl.hpp"
#include "src/mc/mc_replay.hpp"
#include <boost/range/algorithm.hpp>
return to_c_str(state_);
}
-bool ActivityImpl::test()
+bool ActivityImpl::test(actor::ActorImpl* issuer)
{
+ // Associate this simcall to the synchro
+ auto* observer = dynamic_cast<kernel::actor::ActivityTestSimcall*>(issuer->simcall_.observer_);
+ if (observer)
+ register_simcall(&issuer->simcall_);
+
if (state_ != State::WAITING && state_ != State::RUNNING) {
finish();
+ issuer->exception_ = nullptr; // Do not propagate exception in that case
return true;
}
+
+ if (observer) {
+ observer->set_result(false);
+ issuer->waiting_synchro_ = nullptr;
+ unregister_simcall(&issuer->simcall_);
+ issuer->simcall_answer();
+ }
return false;
}
+ssize_t ActivityImpl::test_any(actor::ActorImpl* issuer, const std::vector<ActivityImpl*>& activities)
+{
+ if (MC_is_active() || MC_record_replay_is_active()) {
+ int idx = issuer->simcall_.mc_value_;
+ xbt_assert(idx == -1 || activities[idx]->test(issuer));
+ return idx;
+ }
+
+ for (std::size_t i = 0; i < activities.size(); ++i) {
+ if (activities[i]->test(issuer)) {
+ auto* observer = dynamic_cast<kernel::actor::ActivityTestanySimcall*>(issuer->simcall_.observer_);
+ xbt_assert(observer != nullptr);
+ observer->set_result(i);
+ issuer->simcall_answer();
+ return i;
+ }
+ }
+ issuer->simcall_answer();
+ return -1;
+}
+
void ActivityImpl::wait_for(actor::ActorImpl* issuer, double timeout)
{
- XBT_DEBUG("Wait for execution of synchro %p, state %s", this, to_c_str(state_));
+ XBT_DEBUG("Wait for execution of synchro %p, state %s", this, get_state_str());
xbt_assert(std::isfinite(timeout), "timeout is not finite!");
/* Associate this simcall to the synchro */
xbt_assert(not MC_is_active() && not MC_record_replay_is_active(), "MC is currently not supported here.");
/* If the synchro is already finished then perform the error handling */
- if (state_ != State::RUNNING) {
+ if (state_ != State::WAITING && state_ != State::RUNNING) {
finish();
} else {
+ auto* comm = dynamic_cast<CommImpl*>(this);
+ if (comm != nullptr) {
+ resource::Action* sleep = issuer->get_host()->get_cpu()->sleep(timeout);
+ sleep->set_activity(comm);
+
+ if (issuer == comm->src_actor_)
+ comm->src_timeout_ = sleep;
+ else
+ comm->dst_timeout_ = sleep;
+ }
/* we need a sleep action (even when the timeout is infinite) to be notified of host failures */
RawImplPtr synchro(new RawImpl([this, issuer]() {
this->unregister_simcall(&issuer->simcall_);
issuer->waiting_synchro_ = nullptr;
- auto* observer = dynamic_cast<kernel::actor::ActivityWaitSimcall*>(issuer->simcall_.observer_);
+ issuer->exception_ = nullptr;
+ auto* observer = dynamic_cast<kernel::actor::ActivityWaitSimcall*>(issuer->simcall_.observer_);
xbt_assert(observer != nullptr);
observer->set_result(true);
}));
}
}
+void ActivityImpl::wait_any_for(actor::ActorImpl* issuer, const std::vector<ActivityImpl*>& activities, double timeout)
+{
+ XBT_DEBUG("Wait for execution of any synchro");
+ if (timeout < 0.0) {
+ issuer->simcall_.timeout_cb_ = nullptr;
+ } else {
+ issuer->simcall_.timeout_cb_ = timer::Timer::set(s4u::Engine::get_clock() + timeout, [issuer, &activities]() {
+ issuer->simcall_.timeout_cb_ = nullptr;
+ for (auto* act : activities)
+ act->unregister_simcall(&issuer->simcall_);
+ // default result (-1) is set in actor::ActivityWaitanySimcall
+ issuer->simcall_answer();
+ });
+ }
+
+ for (auto* act : activities) {
+ /* associate this simcall to the the synchro */
+ act->simcalls_.push_back(&issuer->simcall_);
+ /* see if the synchro is already finished */
+ if (act->get_state() != State::WAITING && act->get_state() != State::RUNNING) {
+ act->finish();
+ break;
+ }
+ }
+ XBT_DEBUG("Exit from ActivityImlp::wait_any_for");
+}
+
void ActivityImpl::suspend()
{
if (surf_action_ == nullptr)
state_ = State::CANCELED;
}
+void ActivityImpl::handle_activity_waitany(smx_simcall_t simcall)
+{
+ /* If a waitany simcall is waiting for this synchro to finish, then remove it from the other synchros in the waitany
+ * list. Afterwards, get the position of the actual synchro in the waitany list and return it as the result of the
+ * simcall */
+ if (auto* observer = dynamic_cast<actor::ActivityWaitanySimcall*>(simcall->observer_)) {
+ if (simcall->timeout_cb_) {
+ simcall->timeout_cb_->remove();
+ simcall->timeout_cb_ = nullptr;
+ }
+
+ auto activities = observer->get_activities();
+ for (auto* act : activities)
+ act->unregister_simcall(simcall);
+
+ if (not MC_is_active() && not MC_record_replay_is_active()) {
+ auto element = std::find(activities.begin(), activities.end(), this);
+ int rank = element != activities.end() ? static_cast<int>(std::distance(activities.begin(), element)) : -1;
+ auto* observer = dynamic_cast<kernel::actor::ActivityWaitanySimcall*>(simcall->observer_);
+ observer->set_result(rank);
+ }
+ }
+}
+
// boost::intrusive_ptr<Activity> support:
void intrusive_ptr_add_ref(ActivityImpl* activity)
{
void set_finish_time(double finish_time) { finish_time_ = finish_time; }
double get_finish_time() const { return finish_time_; }
- virtual bool test();
+ virtual bool test(actor::ActorImpl* issuer);
+ static ssize_t test_any(actor::ActorImpl* issuer, const std::vector<ActivityImpl*>& activities);
+
virtual void wait_for(actor::ActorImpl* issuer, double timeout);
+ static void wait_any_for(actor::ActorImpl* issuer, const std::vector<ActivityImpl*>& activities, double timeout);
virtual ActivityImpl& set_timeout(double) { THROW_UNIMPLEMENTED; }
virtual void suspend();
void register_simcall(smx_simcall_t simcall);
void unregister_simcall(smx_simcall_t simcall);
+ void handle_activity_waitany(smx_simcall_t simcall);
void clean_action();
virtual double get_remaining() const;
// Support for the boost::intrusive_ptr<ActivityImpl> datatype
comm->wait_for(simcall->issuer_, timeout);
}
-bool simcall_HANDLER_comm_test(smx_simcall_t, simgrid::kernel::activity::CommImpl* comm)
+bool simcall_HANDLER_comm_test(smx_simcall_t simcall, simgrid::kernel::activity::CommImpl* comm)
{
- return comm->test();
+ return comm->test(simcall->issuer_);
}
ssize_t simcall_HANDLER_comm_testany(smx_simcall_t simcall, simgrid::kernel::activity::CommImpl* comms[], size_t count)
{
- std::vector<simgrid::kernel::activity::CommImpl*> comms_vec(comms, comms + count);
- return simgrid::kernel::activity::CommImpl::test_any(simcall->issuer_, comms_vec);
+ std::vector<simgrid::kernel::activity::ActivityImpl*> comms_vec(comms, comms + count);
+ return simgrid::kernel::activity::ActivityImpl::test_any(simcall->issuer_, comms_vec);
}
void simcall_HANDLER_comm_waitany(smx_simcall_t simcall, simgrid::kernel::activity::CommImpl* comms[], size_t count,
copied_ = true;
}
-bool CommImpl::test()
+bool CommImpl::test(actor::ActorImpl* issuer)
{
if ((MC_is_active() || MC_record_replay_is_active()) && src_actor_ && dst_actor_)
set_state(State::DONE);
- return ActivityImpl::test();
+ return ActivityImpl::test(issuer);
}
void CommImpl::wait_for(actor::ActorImpl* issuer, double timeout)
/* Associate this simcall to the wait synchro */
register_simcall(&issuer->simcall_);
-
if (MC_is_active() || MC_record_replay_is_active()) {
int idx = issuer->simcall_.mc_value_;
if (idx == 0) {
finish();
return;
}
-
- /* If the synchro has already finish perform the error handling, */
- /* otherwise set up a waiting timeout on the right side */
- if (get_state() != State::WAITING && get_state() != State::RUNNING) {
- finish();
- } else { /* we need a sleep action (even when there is no timeout) to be notified of host failures */
- resource::Action* sleep = issuer->get_host()->get_cpu()->sleep(timeout);
- sleep->set_activity(this);
-
- if (issuer == src_actor_)
- src_timeout_ = sleep;
- else
- dst_timeout_ = sleep;
- }
-}
-
-ssize_t CommImpl::test_any(const actor::ActorImpl* issuer, const std::vector<CommImpl*>& comms)
-{
- if (MC_is_active() || MC_record_replay_is_active()) {
- int idx = issuer->simcall_.mc_value_;
- xbt_assert(idx == -1 || comms[idx]->test());
- return idx;
- }
-
- for (std::size_t i = 0; i < comms.size(); ++i) {
- if (comms[i]->test())
- return i;
- }
- return -1;
+ ActivityImpl::wait_for(issuer, timeout);
}
void CommImpl::wait_any_for(actor::ActorImpl* issuer, const std::vector<CommImpl*>& comms, double timeout)
comm->finish();
return;
}
-
- if (timeout < 0.0) {
- issuer->simcall_.timeout_cb_ = nullptr;
- } else {
- issuer->simcall_.timeout_cb_ = timer::Timer::set(s4u::Engine::get_clock() + timeout, [issuer, comms]() {
- // FIXME: Vector `comms' is copied here. Use a reference once its lifetime is extended (i.e. when the simcall is
- // modernized).
- issuer->simcall_.timeout_cb_ = nullptr;
- for (auto* comm : comms)
- comm->unregister_simcall(&issuer->simcall_);
- simcall_comm_waitany__set__result(&issuer->simcall_, -1);
- issuer->simcall_answer();
- });
- }
-
- for (auto* comm : comms) {
- /* associate this simcall to the the synchro */
- comm->simcalls_.push_back(&issuer->simcall_);
-
- /* see if the synchro is already finished */
- if (comm->get_state() != State::WAITING && comm->get_state() != State::RUNNING) {
- comm->finish();
- break;
- }
- }
+ std::vector<ActivityImpl*> activities(comms.begin(), comms.end());
+ ActivityImpl::wait_any_for(issuer, activities, timeout);
}
void CommImpl::suspend()
if (simcall->call_ == simix::Simcall::NONE) // FIXME: maybe a better way to handle this case
continue; // if actor handling comm is killed
- if (simcall->call_ == simix::Simcall::COMM_WAITANY) {
- CommImpl** comms = simcall_comm_waitany__get__comms(simcall);
- size_t count = simcall_comm_waitany__get__count(simcall);
- for (size_t i = 0; i < count; i++)
- comms[i]->unregister_simcall(simcall);
- if (simcall->timeout_cb_) {
- simcall->timeout_cb_->remove();
- simcall->timeout_cb_ = nullptr;
- }
- if (not MC_is_active() && not MC_record_replay_is_active()) {
- auto element = std::find(comms, comms + count, this);
- ssize_t rank = (element != comms + count) ? element - comms : -1;
- simcall_comm_waitany__set__result(simcall, rank);
- }
- }
+
+ handle_activity_waitany(simcall);
/* Check out for errors */
set_exception(simcall->issuer_);
simcall->issuer_->simcall_answer();
}
- /* if there is an exception during a waitany or a testany, indicate the position of the failed communication */
- if (simcall->issuer_->exception_ &&
- (simcall->call_ == simix::Simcall::COMM_WAITANY || simcall->call_ == simix::Simcall::COMM_TESTANY)) {
- // First retrieve the rank of our failing synchro
- CommImpl** comms;
- size_t count;
- if (simcall->call_ == simix::Simcall::COMM_WAITANY) {
- comms = simcall_comm_waitany__get__comms(simcall);
- count = simcall_comm_waitany__get__count(simcall);
- } else {
- /* simcall->call_ == simix::Simcall::COMM_TESTANY */
- comms = simcall_comm_testany__get__comms(simcall);
- count = simcall_comm_testany__get__count(simcall);
- }
- auto element = std::find(comms, comms + count, this);
- ssize_t rank = (element != comms + count) ? element - comms : -1;
- // In order to modify the exception we have to rethrow it:
- try {
- std::rethrow_exception(simcall->issuer_->exception_);
- } catch (Exception& e) {
- e.set_value(rank);
- }
- }
simcall->issuer_->waiting_synchro_ = nullptr;
simcall->issuer_->activities_.remove(this);
std::vector<s4u::Link*> get_traversed_links() const;
void copy_data();
- bool test() override;
+ bool test(actor::ActorImpl* issuer) override;
void wait_for(actor::ActorImpl* issuer, double timeout) override;
- static ssize_t test_any(const actor::ActorImpl* issuer, const std::vector<CommImpl*>& comms);
static void wait_any_for(actor::ActorImpl* issuer, const std::vector<CommImpl*>& comms, double timeout);
CommImpl* start();
if (simcall->call_ == simix::Simcall::NONE) // FIXME: maybe a better way to handle this case
continue; // if process handling comm is killed
- /* If a waitany simcall is waiting for this synchro to finish, then remove it from the other synchros in the waitany
- * list. Afterwards, get the position of the actual synchro in the waitany list and return it as the result of the
- * simcall */
- if (auto* observer = dynamic_cast<actor::ExecutionWaitanySimcall*>(simcall->observer_)) {
- const auto& execs = observer->get_execs();
-
- for (auto* exec : execs)
- exec->unregister_simcall(simcall);
-
- if (simcall->timeout_cb_) {
- simcall->timeout_cb_->remove();
- simcall->timeout_cb_ = nullptr;
- }
-
- if (not MC_is_active() && not MC_record_replay_is_active()) {
- auto element = std::find(execs.begin(), execs.end(), this);
- int rank = element != execs.end() ? static_cast<int>(std::distance(execs.begin(), element)) : -1;
- observer->set_result(rank);
- }
- }
+ handle_activity_waitany(simcall);
set_exception(simcall->issuer_);
return this;
}
-void ExecImpl::wait_any_for(actor::ActorImpl* issuer, const std::vector<ExecImpl*>& execs, double timeout)
-{
- if (timeout < 0.0) {
- issuer->simcall_.timeout_cb_ = nullptr;
- } else {
- issuer->simcall_.timeout_cb_ = timer::Timer::set(s4u::Engine::get_clock() + timeout, [issuer, &execs]() {
- issuer->simcall_.timeout_cb_ = nullptr;
- for (auto* exec : execs)
- exec->unregister_simcall(&issuer->simcall_);
- // default result (-1) is set in actor::ExecutionWaitanySimcall
- issuer->simcall_answer();
- });
- }
-
- for (auto* exec : execs) {
- /* associate this simcall to the the synchro */
- exec->simcalls_.push_back(&issuer->simcall_);
- /* see if the synchro is already finished */
- if (exec->get_state() != State::WAITING && exec->get_state() != State::RUNNING) {
- exec->finish();
- break;
- }
- }
-}
-
/*************
* Callbacks *
*************/
void finish() override;
void reset();
- static void wait_any_for(actor::ActorImpl* issuer, const std::vector<ExecImpl*>& execs, double timeout);
static xbt::signal<void(ExecImpl const&, s4u::Host*)> on_migration;
};
if (simcall->call_ == simix::Simcall::NONE) // FIXME: maybe a better way to handle this case
continue; // if process handling comm is killed
- if (auto* observer = dynamic_cast<kernel::actor::IoWaitanySimcall*>(simcall->observer_)) { // simcall is a wait_any?
- const auto& ios = observer->get_ios();
-
- for (auto* io : ios) {
- io->unregister_simcall(simcall);
-
- if (simcall->timeout_cb_) {
- simcall->timeout_cb_->remove();
- simcall->timeout_cb_ = nullptr;
- }
- }
-
- if (not MC_is_active() && not MC_record_replay_is_active()) {
- auto element = std::find(ios.begin(), ios.end(), this);
- int rank = element != ios.end() ? static_cast<int>(std::distance(ios.begin(), element)) : -1;
- observer->set_result(rank);
- }
- }
+
+ handle_activity_waitany(simcall);
set_exception(simcall->issuer_);
}
}
-void IoImpl::wait_any_for(actor::ActorImpl* issuer, const std::vector<IoImpl*>& ios, double timeout)
-{
- if (timeout < 0.0) {
- issuer->simcall_.timeout_cb_ = nullptr;
- } else {
- issuer->simcall_.timeout_cb_ = timer::Timer::set(s4u::Engine::get_clock() + timeout, [issuer, &ios]() {
- issuer->simcall_.timeout_cb_ = nullptr;
- for (auto* io : ios)
- io->unregister_simcall(&issuer->simcall_);
- // default result (-1) is set in actor::IoWaitanySimcall
- issuer->simcall_answer();
- });
- }
-
- for (auto* io : ios) {
- /* associate this simcall to the the synchro */
- io->simcalls_.push_back(&issuer->simcall_);
-
- /* see if the synchro is already finished */
- if (io->get_state() != State::WAITING && io->get_state() != State::RUNNING) {
- io->finish();
- break;
- }
- }
-}
-
} // namespace activity
} // namespace kernel
} // namespace simgrid
void post() override;
void set_exception(actor::ActorImpl* issuer) override;
void finish() override;
- static void wait_any_for(actor::ActorImpl* issuer, const std::vector<IoImpl*>& ios, double timeout);
};
} // namespace activity
} // namespace kernel
/* Go into sleep and return control to maestro */
context_->suspend();
-
/* Ok, maestro returned control to us */
XBT_DEBUG("Control returned to me: '%s'", get_cname());
e.rethrow_nested(XBT_THROW_POINT, boost::core::demangle(typeid(e).name()) + " raised in kernel mode.");
}
}
-
#if HAVE_SMPI
if (not finished_)
smpi_switch_data_segment(get_iface());
#include "src/kernel/actor/SimcallObserver.hpp"
#include "simgrid/s4u/Host.hpp"
+#include "src/kernel/activity/CommImpl.hpp"
#include "src/kernel/activity/MutexImpl.hpp"
#include "src/kernel/actor/ActorImpl.hpp"
+#include "src/mc/mc_config.hpp"
XBT_LOG_NEW_DEFAULT_SUBCATEGORY(mc_observer, mc, "Logging specific to MC simcall observation");
issuer_->get_cname());
}
-std::string SimcallObserver::dot_label() const
+std::string SimcallObserver::dot_label(int /*times_considered*/) const
{
if (issuer_->get_host())
- return xbt::string_printf("[(%ld)%s] ", issuer_->get_pid(), issuer_->get_cname());
+ return xbt::string_printf("[(%ld)%s] ", issuer_->get_pid(), issuer_->get_host()->get_cname());
return xbt::string_printf("[(%ld)] ", issuer_->get_pid());
}
return SimcallObserver::to_string(times_considered) + "MC_RANDOM(" + std::to_string(times_considered) + ")";
}
-std::string RandomSimcall::dot_label() const
+std::string RandomSimcall::dot_label(int times_considered) const
{
- return SimcallObserver::dot_label() + "MC_RANDOM(" + std::to_string(next_value_) + ")";
+ return SimcallObserver::dot_label(times_considered) + "MC_RANDOM(" + std::to_string(next_value_) + ")";
}
void RandomSimcall::prepare(int times_considered)
return SimcallObserver::to_string(times_considered) + "Mutex UNLOCK";
}
-std::string MutexUnlockSimcall::dot_label() const
+std::string MutexUnlockSimcall::dot_label(int times_considered) const
{
- return SimcallObserver::dot_label() + "Mutex UNLOCK";
+ return SimcallObserver::dot_label(times_considered) + "Mutex UNLOCK";
}
std::string MutexLockSimcall::to_string(int times_considered) const
return res;
}
-std::string MutexLockSimcall::dot_label() const
+std::string MutexLockSimcall::dot_label(int times_considered) const
{
- return SimcallObserver::dot_label() + (blocking_ ? "Mutex LOCK" : "Mutex TRYLOCK");
+ return SimcallObserver::dot_label(times_considered) + (blocking_ ? "Mutex LOCK" : "Mutex TRYLOCK");
}
bool MutexLockSimcall::is_enabled() const
return res;
}
-std::string ConditionWaitSimcall::dot_label() const
+std::string ConditionWaitSimcall::dot_label(int times_considered) const
{
- return SimcallObserver::dot_label() + "Condition WAIT";
+ return SimcallObserver::dot_label(times_considered) + "Condition WAIT";
}
bool ConditionWaitSimcall::is_enabled() const
return res;
}
-std::string SemAcquireSimcall::dot_label() const
+std::string SemAcquireSimcall::dot_label(int times_considered) const
{
- return SimcallObserver::dot_label() + "Sem ACQUIRE";
+ return SimcallObserver::dot_label(times_considered) + "Sem ACQUIRE";
}
bool SemAcquireSimcall::is_enabled() const
return true;
}
-std::string ExecutionWaitanySimcall::to_string(int times_considered) const
+int ActivityTestanySimcall::get_max_consider() const
{
- std::string res = SimcallObserver::to_string(times_considered) + "Execution WAITANY";
- res += "(" + (timeout_ == -1.0 ? "" : std::to_string(timeout_)) + ")";
+ // Only Comms are of interest to MC for now. When all types of activities can be consider, this function can simply
+ // return the size of activities_.
+ int count = 0;
+ for (const auto& act : activities_)
+ if (dynamic_cast<activity::CommImpl*>(act) != nullptr)
+ count++;
+ return count;
+}
+
+std::string ActivityTestanySimcall::to_string(int times_considered) const
+{
+ std::string res = SimcallObserver::to_string(times_considered);
+ if (times_considered == -1) {
+ res += "TestAny FALSE(-)";
+ } else {
+ res += "TestAny(" + xbt::string_printf("(%d of %zu)", times_considered + 1, activities_.size());
+ }
+
return res;
}
-std::string ExecutionWaitanySimcall::dot_label() const
+std::string ActivityTestanySimcall::dot_label(int times_considered) const
{
- return SimcallObserver::dot_label() + "Execution WAITANY";
+ std::string res = SimcallObserver::dot_label(times_considered) + "TestAny ";
+ if (times_considered == -1) {
+ res += "FALSE";
+ } else {
+ res += xbt::string_printf("TRUE [%d of %zu]", times_considered + 1, activities_.size());
+ }
+ return res;
}
-std::string IoWaitanySimcall::to_string(int times_considered) const
+std::string ActivityTestSimcall::to_string(int times_considered) const
{
- std::string res = SimcallObserver::to_string(times_considered) + "I/O WAITANY";
- res += "(" + (timeout_ == -1.0 ? "" : std::to_string(timeout_)) + ")";
+ std::string res = SimcallObserver::to_string(times_considered) + "Test ";
+ auto* comm = dynamic_cast<activity::CommImpl*>(activity_);
+ if (comm) {
+ if (comm->src_actor_.get() == nullptr || comm->dst_actor_.get() == nullptr) {
+ res += "FALSE(comm=";
+ res += XBT_LOG_ISENABLED(mc_observer, xbt_log_priority_verbose) ? xbt::string_printf("%p)", comm)
+ : "(verbose only))";
+ } else {
+ res += "TRUE(comm=";
+
+ auto src = comm->src_actor_;
+ auto dst = comm->dst_actor_;
+ res +=
+ XBT_LOG_ISENABLED(mc_observer, xbt_log_priority_verbose) ? xbt::string_printf("%p", comm) : "(verbose only) ";
+ res += xbt::string_printf("[(%ld)%s (%s) ", src->get_pid(), src->get_host()->get_cname(), src->get_cname()) +
+ "-> " +
+ xbt::string_printf("(%ld)%s (%s)])", dst->get_pid(), dst->get_host()->get_cname(), dst->get_cname());
+ }
+ }
+ return res;
+}
+
+std::string ActivityTestSimcall::dot_label(int times_considered) const
+{
+ std::string res = SimcallObserver::dot_label(times_considered) + "Test ";
+ auto* comm = dynamic_cast<activity::CommImpl*>(activity_);
+ if (comm && (comm->src_actor_.get() == nullptr || comm->dst_actor_.get() == nullptr)) {
+ res += "FALSE";
+ } else {
+ res += "TRUE";
+ }
+ return res;
+}
+
+std::string ActivityWaitSimcall::to_string(int times_considered) const
+{
+ std::string res = SimcallObserver::to_string(times_considered);
+ auto* comm = dynamic_cast<activity::CommImpl*>(activity_);
+ if (comm == nullptr)
+ return res;
+ if (times_considered == -1) {
+ res += "WaitTimeout(comm=" + XBT_LOG_ISENABLED(mc_observer, xbt_log_priority_verbose)
+ ? xbt::string_printf("%p)", comm)
+ : "(verbose only))";
+ } else {
+ res += "Wait(comm=";
+
+ auto src = comm->src_actor_;
+ auto dst = comm->dst_actor_;
+ res +=
+ XBT_LOG_ISENABLED(mc_observer, xbt_log_priority_verbose) ? xbt::string_printf("%p", comm) : "(verbose only) ";
+ res += xbt::string_printf("[(%ld)%s (%s) ", src->get_pid(), src->get_host()->get_cname(), src->get_cname()) +
+ "-> " + xbt::string_printf("(%ld)%s (%s)])", dst->get_pid(), dst->get_host()->get_cname(), dst->get_cname());
+ }
return res;
}
-std::string IoWaitanySimcall::dot_label() const
+std::string ActivityWaitSimcall::dot_label(int times_considered) const
{
- return SimcallObserver::dot_label() + "I/O WAITANY";
+ std::string res = SimcallObserver::dot_label(times_considered);
+ res += (times_considered == -1) ? "WaitTimeout " : "Wait ";
+
+ auto* comm = dynamic_cast<activity::CommImpl*>(activity_);
+ if (comm) {
+ auto src = comm->src_actor_;
+ auto dst = comm->dst_actor_;
+ res += " [(" + std::to_string(src ? src->get_pid() : 0) + ")";
+ res += "->(" + std::to_string(dst ? dst->get_pid() : 0) + ")]";
+ }
+ return res;
+}
+
+bool ActivityWaitSimcall::is_enabled() const
+{
+ /* FIXME: check also that src and dst processes are not suspended */
+ const auto* comm = dynamic_cast<activity::CommImpl*>(activity_);
+ if (comm == nullptr)
+ return true;
+
+ if (comm->src_timeout_ || comm->dst_timeout_) {
+ /* If it has a timeout it will be always be enabled (regardless of who declared the timeout),
+ * because even if the communication is not ready, it can timeout and won't block. */
+ if (_sg_mc_timeout == 1)
+ return true;
+ }
+ /* On the other hand if it hasn't a timeout, check if the comm is ready.*/
+ else if (comm->detached() && comm->src_actor_ == nullptr && comm->get_state() == activity::State::READY)
+ return (comm->dst_actor_ != nullptr);
+ return (comm->src_actor_ && comm->dst_actor_);
+}
+
+std::string ActivityWaitanySimcall::dot_label(int times_considered) const
+{
+ return SimcallObserver::dot_label(times_considered) +
+ xbt::string_printf("WaitAny [%d of %zu]", times_considered + 1, activities_.size());
+}
+
+bool ActivityWaitanySimcall::is_enabled() const
+{
+ // FIXME: deal with other kind of activities (Exec and I/Os)
+ for (auto act : activities_) {
+ const auto* comm = dynamic_cast<activity::CommImpl*>(act);
+ if (comm != nullptr && comm->src_actor_ && comm->dst_actor_)
+ return true;
+ }
+ return false;
+}
+
+int ActivityWaitanySimcall::get_max_consider() const
+{
+ // Only Comms are of interest to MC for now. When all types of activities can be consider, this function can simply
+ // return the size of activities_.
+ int count = 0;
+ for (const auto& act : activities_)
+ if (dynamic_cast<activity::CommImpl*>(act) != nullptr)
+ count++;
+ return count;
+}
+
+std::string ActivityWaitanySimcall::to_string(int times_considered) const
+{
+ std::string res = SimcallObserver::to_string(times_considered) + "WaitAny(";
+ size_t count = activities_.size();
+ if (count > 0) {
+ if (auto* comm = dynamic_cast<kernel::activity::CommImpl*>(activities_[times_considered]))
+ res += "comm=" + XBT_LOG_ISENABLED(mc_observer, xbt_log_priority_verbose)
+ ? xbt::string_printf("%p", comm)
+ : "(verbose only)" + xbt::string_printf("(%d of %zu))", times_considered + 1, count);
+ } else
+ res += "comm at idx " + std::to_string(times_considered) + ")";
+ return res;
}
} // namespace actor
* Most simcalls are not visible from the MC because they don't have an observer at all. */
virtual bool is_visible() const { return true; }
virtual std::string to_string(int times_considered) const = 0;
- virtual std::string dot_label() const = 0;
+ virtual std::string dot_label(int times_considered) const = 0;
};
template <class T> class ResultingSimcall : public SimcallObserver {
T result_;
public:
- ResultingSimcall(smx_actor_t actor, T default_result) : SimcallObserver(actor), result_(default_result) {}
+ ResultingSimcall(ActorImpl* actor, T default_result) : SimcallObserver(actor), result_(default_result) {}
void set_result(T res) { result_ = res; }
T get_result() const { return result_; }
};
int next_value_ = 0;
public:
- RandomSimcall(smx_actor_t actor, int min, int max) : SimcallObserver(actor), min_(min), max_(max)
+ RandomSimcall(ActorImpl* actor, int min, int max) : SimcallObserver(actor), min_(min), max_(max)
{
xbt_assert(min < max);
}
int get_max_consider() const override;
void prepare(int times_considered) override;
std::string to_string(int times_considered) const override;
- std::string dot_label() const override;
+ std::string dot_label(int times_considered) const override;
int get_value() const { return next_value_; }
bool depends(SimcallObserver* other) override;
};
activity::MutexImpl* const mutex_;
public:
- MutexSimcall(smx_actor_t actor, activity::MutexImpl* mutex) : SimcallObserver(actor), mutex_(mutex) {}
+ MutexSimcall(ActorImpl* actor, activity::MutexImpl* mutex) : SimcallObserver(actor), mutex_(mutex) {}
activity::MutexImpl* get_mutex() const { return mutex_; }
bool depends(SimcallObserver* other) override;
};
public:
SimcallObserver* clone() override { return new MutexUnlockSimcall(get_issuer(), get_mutex()); }
std::string to_string(int times_considered) const override;
- std::string dot_label() const override;
+ std::string dot_label(int times_considered) const override;
};
class MutexLockSimcall : public MutexSimcall {
const bool blocking_;
public:
- MutexLockSimcall(smx_actor_t actor, activity::MutexImpl* mutex, bool blocking = true)
+ MutexLockSimcall(ActorImpl* actor, activity::MutexImpl* mutex, bool blocking = true)
: MutexSimcall(actor, mutex), blocking_(blocking)
{
}
SimcallObserver* clone() override { return new MutexLockSimcall(get_issuer(), get_mutex(), blocking_); }
bool is_enabled() const override;
std::string to_string(int times_considered) const override;
- std::string dot_label() const override;
+ std::string dot_label(int times_considered) const override;
};
class ConditionWaitSimcall : public ResultingSimcall<bool> {
const double timeout_;
public:
- ConditionWaitSimcall(smx_actor_t actor, activity::ConditionVariableImpl* cond, activity::MutexImpl* mutex,
+ ConditionWaitSimcall(ActorImpl* actor, activity::ConditionVariableImpl* cond, activity::MutexImpl* mutex,
double timeout = -1.0)
: ResultingSimcall(actor, false), cond_(cond), mutex_(mutex), timeout_(timeout)
{
bool is_enabled() const override;
bool is_visible() const override { return false; }
std::string to_string(int times_considered) const override;
- std::string dot_label() const override;
+ std::string dot_label(int times_considered) const override;
activity::ConditionVariableImpl* get_cond() const { return cond_; }
activity::MutexImpl* get_mutex() const { return mutex_; }
double get_timeout() const { return timeout_; }
const double timeout_;
public:
- SemAcquireSimcall(smx_actor_t actor, activity::SemaphoreImpl* sem, double timeout = -1.0)
+ SemAcquireSimcall(ActorImpl* actor, activity::SemaphoreImpl* sem, double timeout = -1.0)
: ResultingSimcall(actor, false), sem_(sem), timeout_(timeout)
{
}
bool is_enabled() const override;
bool is_visible() const override { return false; }
std::string to_string(int times_considered) const override;
- std::string dot_label() const override;
+ std::string dot_label(int times_considered) const override;
activity::SemaphoreImpl* get_sem() const { return sem_; }
double get_timeout() const { return timeout_; }
};
-class ActivityWaitSimcall : public ResultingSimcall<bool> {
+class ActivityTestSimcall : public ResultingSimcall<bool> {
activity::ActivityImpl* const activity_;
- const double timeout_;
public:
- ActivityWaitSimcall(smx_actor_t actor, activity::ActivityImpl* activity, double timeout)
- : ResultingSimcall(actor, false), activity_(activity), timeout_(timeout)
+ ActivityTestSimcall(ActorImpl* actor, activity::ActivityImpl* activity)
+ : ResultingSimcall(actor, true), activity_(activity)
{
}
- SimcallObserver* clone() override { return new ActivityWaitSimcall(get_issuer(), activity_, timeout_); }
- bool is_visible() const override { return false; }
- std::string to_string(int times_considered) const override { return SimcallObserver::to_string(times_considered); }
- std::string dot_label() const override { return SimcallObserver::dot_label(); }
+ SimcallObserver* clone() override { return new ActivityTestSimcall(get_issuer(), activity_); }
+ bool is_visible() const override { return true; }
+ std::string to_string(int times_considered) const override;
+ std::string dot_label(int times_considered) const override;
activity::ActivityImpl* get_activity() const { return activity_; }
- double get_timeout() const { return timeout_; }
};
-class ExecutionWaitanySimcall : public ResultingSimcall<ssize_t> {
- const std::vector<activity::ExecImpl*>& execs_;
+class ActivityTestanySimcall : public ResultingSimcall<ssize_t> {
+ const std::vector<activity::ActivityImpl*>& activities_;
+
+public:
+ ActivityTestanySimcall(ActorImpl* actor, const std::vector<activity::ActivityImpl*>& activities)
+ : ResultingSimcall(actor, -1), activities_(activities)
+ {
+ }
+ SimcallObserver* clone() override { return new ActivityTestanySimcall(get_issuer(), activities_); }
+ bool is_visible() const override { return true; }
+ int get_max_consider() const override;
+ std::string to_string(int times_considered) const override;
+ std::string dot_label(int times_considered) const override;
+ const std::vector<activity::ActivityImpl*>& get_activities() const { return activities_; }
+};
+
+class ActivityWaitSimcall : public ResultingSimcall<bool> {
+ activity::ActivityImpl* activity_;
const double timeout_;
public:
- ExecutionWaitanySimcall(smx_actor_t actor, const std::vector<activity::ExecImpl*>& execs, double timeout)
- : ResultingSimcall(actor, -1), execs_(execs), timeout_(timeout)
+ ActivityWaitSimcall(ActorImpl* actor, activity::ActivityImpl* activity, double timeout)
+ : ResultingSimcall(actor, false), activity_(activity), timeout_(timeout)
{
}
- SimcallObserver* clone() override { return new ExecutionWaitanySimcall(get_issuer(), execs_, timeout_); }
- bool is_visible() const override { return false; }
+ SimcallObserver* clone() override { return new ActivityWaitSimcall(get_issuer(), activity_, timeout_); }
+ bool is_visible() const override { return true; }
+ bool is_enabled() const override;
std::string to_string(int times_considered) const override;
- std::string dot_label() const override;
- const std::vector<activity::ExecImpl*>& get_execs() const { return execs_; }
+ std::string dot_label(int times_considered) const override;
+ activity::ActivityImpl* get_activity() const { return activity_; }
+ void set_activity(activity::ActivityImpl* activity) { activity_ = activity; }
double get_timeout() const { return timeout_; }
};
-class IoWaitanySimcall : public ResultingSimcall<ssize_t> {
- const std::vector<activity::IoImpl*>& ios_;
+class ActivityWaitanySimcall : public ResultingSimcall<ssize_t> {
+ const std::vector<activity::ActivityImpl*>& activities_;
const double timeout_;
public:
- IoWaitanySimcall(smx_actor_t actor, const std::vector<activity::IoImpl*>& ios, double timeout)
- : ResultingSimcall(actor, -1), ios_(ios), timeout_(timeout)
+ ActivityWaitanySimcall(ActorImpl* actor, const std::vector<activity::ActivityImpl*>& activities, double timeout)
+ : ResultingSimcall(actor, -1), activities_(activities), timeout_(timeout)
{
}
- SimcallObserver* clone() override { return new IoWaitanySimcall(get_issuer(), ios_, timeout_); }
- bool is_visible() const override { return false; }
+ SimcallObserver* clone() override { return new ActivityWaitanySimcall(get_issuer(), activities_, timeout_); }
+ bool is_enabled() const override;
+ bool is_visible() const override { return true; }
+ int get_max_consider() const override;
std::string to_string(int times_considered) const override;
- std::string dot_label() const override;
- const std::vector<activity::IoImpl*>& get_ios() const { return ios_; }
+ std::string dot_label(int times_considered) const override;
+ const std::vector<activity::ActivityImpl*>& get_activities() const { return activities_; }
double get_timeout() const { return timeout_; }
};
+
} // namespace actor
} // namespace kernel
} // namespace simgrid
return XBT_LOG_ISENABLED(Api, xbt_log_priority_verbose) ? std::to_string(buff_size) : "(verbose only)";
}
-static void simcall_translate(smx_simcall_t req,
- simgrid::mc::Remote<simgrid::kernel::activity::CommImpl>& buffered_comm);
+static void simcall_translate(smx_simcall_t req, Remote<kernel::activity::CommImpl>& buffered_comm);
static bool request_is_enabled_by_idx(const RemoteProcess& process, smx_simcall_t req, unsigned int idx)
{
- kernel::activity::CommImpl* remote_act = nullptr;
+ kernel::activity::ActivityImpl* remote_act = nullptr;
+ if (auto wait = dynamic_cast<kernel::actor::ActivityWaitSimcall*>(req->observer_))
+ /* FIXME: check also that src and dst processes are not suspended */
+ remote_act = wait->get_activity();
+ else if (auto waitany = dynamic_cast<kernel::actor::ActivityWaitanySimcall*>(req->observer_))
+ remote_act = waitany->get_activities().at(idx);
+ else if (auto testany = dynamic_cast<kernel::actor::ActivityTestanySimcall*>(req->observer_))
+ remote_act = testany->get_activities().at(idx);
+
switch (req->call_) {
case Simcall::COMM_WAIT:
- /* FIXME: check also that src and dst processes are not suspended */
- remote_act = simcall_comm_wait__getraw__comm(req);
- break;
-
case Simcall::COMM_WAITANY:
- remote_act = process.read(remote(simcall_comm_waitany__get__comms(req) + idx));
- break;
-
case Simcall::COMM_TESTANY:
- remote_act = process.read(remote(simcall_comm_testany__get__comms(req) + idx));
break;
-
default:
return true;
}
Remote<kernel::activity::CommImpl> temp_comm;
- process.read(temp_comm, remote(remote_act));
+ process.read(temp_comm, remote(static_cast<kernel::activity::CommImpl*>(remote_act)));
const kernel::activity::CommImpl* comm = temp_comm.get_buffer();
return comm->src_actor_.get() && comm->dst_actor_.get();
}
}
}
-simgrid::kernel::activity::CommImpl* Api::get_comm_or_nullptr(smx_simcall_t const r) const
+kernel::activity::CommImpl* Api::get_comm_or_nullptr(smx_simcall_t const r) const
{
- if (r->call_ == Simcall::COMM_WAIT)
- return simcall_comm_wait__get__comm(r);
- if (r->call_ == Simcall::COMM_TEST)
- return simcall_comm_test__get__comm(r);
+ if (auto wait = dynamic_cast<kernel::actor::ActivityWaitSimcall*>(r->observer_))
+ return static_cast<kernel::activity::CommImpl*>(wait->get_activity());
+ if (auto test = dynamic_cast<kernel::actor::ActivityTestSimcall*>(r->observer_))
+ return static_cast<kernel::activity::CommImpl*>(test->get_activity());
return nullptr;
}
break;
}
- case Simcall::COMM_WAIT: {
- simgrid::kernel::activity::CommImpl* remote_act = simcall_comm_wait__get__comm(req);
- if (value == -1) {
- type = "WaitTimeout";
- args = "comm=" + pointer_to_string(remote_act);
- } else {
- type = "Wait";
-
- simgrid::mc::Remote<simgrid::kernel::activity::CommImpl> temp_activity;
- const simgrid::kernel::activity::CommImpl* act;
- mc_model_checker->get_remote_process().read(temp_activity, remote(remote_act));
- act = temp_activity.get_buffer();
-
- smx_actor_t src_proc =
- mc_model_checker->get_remote_process().resolve_actor(simgrid::mc::remote(act->src_actor_.get()));
- smx_actor_t dst_proc =
- mc_model_checker->get_remote_process().resolve_actor(simgrid::mc::remote(act->dst_actor_.get()));
- args = "comm=" + pointer_to_string(remote_act);
- args += " [" + get_actor_string(src_proc) + "-> " + get_actor_string(dst_proc) + "]";
- }
- break;
- }
-
- case Simcall::COMM_TEST: {
- simgrid::kernel::activity::CommImpl* remote_act = simcall_comm_test__get__comm(req);
- simgrid::mc::Remote<simgrid::kernel::activity::CommImpl> temp_activity;
- const simgrid::kernel::activity::CommImpl* act;
- mc_model_checker->get_remote_process().read(temp_activity, remote(remote_act));
- act = temp_activity.get_buffer();
-
- if (act->src_actor_.get() == nullptr || act->dst_actor_.get() == nullptr) {
- type = "Test FALSE";
- args = "comm=" + pointer_to_string(remote_act);
- } else {
- type = "Test TRUE";
-
- smx_actor_t src_proc =
- mc_model_checker->get_remote_process().resolve_actor(simgrid::mc::remote(act->src_actor_.get()));
- smx_actor_t dst_proc =
- mc_model_checker->get_remote_process().resolve_actor(simgrid::mc::remote(act->dst_actor_.get()));
- args = "comm=" + pointer_to_string(remote_act);
- args += " [" + get_actor_string(src_proc) + " -> " + get_actor_string(dst_proc) + "]";
- }
- break;
- }
-
- case Simcall::COMM_WAITANY: {
- type = "WaitAny";
- size_t count = simcall_comm_waitany__get__count(req);
- if (count > 0) {
- simgrid::kernel::activity::CommImpl* remote_sync;
- remote_sync =
- mc_model_checker->get_remote_process().read(remote(simcall_comm_waitany__get__comms(req) + value));
- args = "comm=" + pointer_to_string(remote_sync) + xbt::string_printf("(%d of %zu)", value + 1, count);
- } else
- args = "comm at idx " + std::to_string(value);
- break;
- }
-
+ case Simcall::COMM_WAIT:
+ // See ActivityWaitSimcall::to_string(int times_considered)
+ case Simcall::COMM_TEST:
+ // See ActivityTestSimcall::to_string(int times_considered)
+ case Simcall::COMM_WAITANY:
+ // See ActivityWaitanySimcall::to_string(int times_considered)
case Simcall::COMM_TESTANY:
- if (value == -1) {
- type = "TestAny FALSE";
- args = "-";
- } else {
- type = "TestAny";
- args = xbt::string_printf("(%d of %zu)", value + 1, simcall_comm_testany__get__count(req));
- }
+ // See ActivityTestanySimcall::to_string(int times_considered)
break;
default:
break;
case Simcall::COMM_WAIT:
- if (value == -1) {
- label = "[" + get_actor_dot_label(issuer) + "] WaitTimeout";
- } else {
- kernel::activity::ActivityImpl* remote_act = simcall_comm_wait__get__comm(req);
- Remote<kernel::activity::CommImpl> temp_comm;
- mc_model_checker->get_remote_process().read(temp_comm,
- remote(static_cast<kernel::activity::CommImpl*>(remote_act)));
- const kernel::activity::CommImpl* comm = temp_comm.get_buffer();
-
- const kernel::actor::ActorImpl* src_proc =
- mc_model_checker->get_remote_process().resolve_actor(mc::remote(comm->src_actor_.get()));
- const kernel::actor::ActorImpl* dst_proc =
- mc_model_checker->get_remote_process().resolve_actor(mc::remote(comm->dst_actor_.get()));
- label = "[" + get_actor_dot_label(issuer) + "] Wait";
- label += " [(" + std::to_string(src_proc ? src_proc->get_pid() : 0) + ")";
- label += "->(" + std::to_string(dst_proc ? dst_proc->get_pid() : 0) + ")]";
- }
- break;
-
- case Simcall::COMM_TEST: {
- kernel::activity::ActivityImpl* remote_act = simcall_comm_test__get__comm(req);
- Remote<simgrid::kernel::activity::CommImpl> temp_comm;
- mc_model_checker->get_remote_process().read(temp_comm,
- remote(static_cast<kernel::activity::CommImpl*>(remote_act)));
- const kernel::activity::CommImpl* comm = temp_comm.get_buffer();
- if (comm->src_actor_.get() == nullptr || comm->dst_actor_.get() == nullptr) {
- label = "[" + get_actor_dot_label(issuer) + "] Test FALSE";
- } else {
- label = "[" + get_actor_dot_label(issuer) + "] Test TRUE";
- }
- break;
- }
-
+ // See ActivityWaitSimcall::dot_label(int times_considered)
+ case Simcall::COMM_TEST:
+ // See ActivityTestSimcall::dot_label(int times_considered)
case Simcall::COMM_WAITANY:
- label = "[" + get_actor_dot_label(issuer) + "] WaitAny";
- label += xbt::string_printf(" [%d of %zu]", value + 1, simcall_comm_waitany__get__count(req));
- break;
-
+ // See ActivityWaittanySimcall::dot_label(int times_considered)
case Simcall::COMM_TESTANY:
- if (value == -1) {
- label = "[" + get_actor_dot_label(issuer) + "] TestAny FALSE";
- } else {
- label = "[" + get_actor_dot_label(issuer) + "] TestAny TRUE";
- label += xbt::string_printf(" [%d of %zu]", value + 1, simcall_comm_testany__get__count(req));
- }
+ // See ActivityTestanySimcall::dot_label(int times_considered)
break;
default:
simgrid::kernel::activity::CommImpl* get_comm_or_nullptr(smx_simcall_t const r) const;
bool request_depend_asymmetric(smx_simcall_t r1, smx_simcall_t r2) const;
simgrid::mc::ActorInformation* actor_info_cast(smx_actor_t actor) const;
+
+public:
std::string get_actor_string(smx_actor_t actor) const;
std::string get_actor_dot_label(smx_actor_t actor) const;
-public:
// No copy:
Api(Api const&) = delete;
void operator=(Api const&) = delete;
* - if the wait will succeed immediately (if both peer of the comm are there already or if the mutex is available)
* - if a timeout is provided, because we can fire the timeout if the transition is not ready without blocking in this
* transition for ever.
- *
+ * This is controlled in the is_enabled() method of the corresponding observers.
*/
// Called from both MCer and MCed:
bool actor_is_enabled(smx_actor_t actor)
if (req->observer_ != nullptr)
return req->observer_->is_enabled();
- using simix::Simcall;
- switch (req->call_) {
- case Simcall::NONE:
- return false;
-
- case Simcall::COMM_WAIT: {
- /* FIXME: check also that src and dst processes are not suspended */
- const kernel::activity::CommImpl* act = simcall_comm_wait__getraw__comm(req);
-
- if (act->src_timeout_ || act->dst_timeout_) {
- /* If it has a timeout it will be always be enabled (regardless of who declared the timeout),
- * because even if the communication is not ready, it can timeout and won't block. */
- if (_sg_mc_timeout == 1)
- return true;
- }
- /* On the other hand if it hasn't a timeout, check if the comm is ready.*/
- else if (act->detached() && act->src_actor_ == nullptr &&
- act->get_state() == simgrid::kernel::activity::State::READY)
- return (act->dst_actor_ != nullptr);
- return (act->src_actor_ && act->dst_actor_);
- }
-
- case Simcall::COMM_WAITANY: {
- simgrid::kernel::activity::CommImpl** comms = simcall_comm_waitany__get__comms(req);
- size_t count = simcall_comm_waitany__get__count(req);
- for (unsigned int index = 0; index < count; ++index) {
- auto const* comm = comms[index];
- if (comm->src_actor_ && comm->dst_actor_)
- return true;
- }
- return false;
- }
-
- default:
- /* The rest of the requests are always enabled */
- return true;
- }
+ if (req->call_ == simix::Simcall::NONE)
+ return false;
+ else
+ /* The rest of the requests are always enabled */
+ return true;
}
/* This is the list of requests that are visible from the checker algorithm.
return req->observer_->is_visible();
using simix::Simcall;
- return req->call_ == Simcall::COMM_ISEND || req->call_ == Simcall::COMM_IRECV || req->call_ == Simcall::COMM_WAIT ||
- req->call_ == Simcall::COMM_WAITANY || req->call_ == Simcall::COMM_TEST || req->call_ == Simcall::COMM_TESTANY;
+ return req->call_ == Simcall::COMM_ISEND || req->call_ == Simcall::COMM_IRECV;
}
}
const kernel::actor::ActorImpl* actor = kernel::actor::ActorImpl::by_pid(msg_simcall->aid);
xbt_assert(actor != nullptr, "Invalid pid %ld", msg_simcall->aid);
xbt_assert(actor->simcall_.observer_, "The transition of %s has no observer", actor->get_cname());
- std::string value = actor->simcall_.observer_->dot_label();
+ std::string value = actor->simcall_.observer_->dot_label(msg_simcall->time_considered);
// Send result:
s_mc_message_simcall_to_string_answer_t answer{MessageType::SIMCALL_TO_STRING_ANSWER, {0}};
ssize_t finished_index = -1;
/* Create the equivalent array with SIMIX objects: */
- std::vector<simgrid::kernel::activity::CommImpl*> s_comms;
+ std::vector<simgrid::s4u::CommPtr> s_comms;
s_comms.reserve(xbt_dynar_length(comms));
msg_comm_t comm;
unsigned int cursor;
- xbt_dynar_foreach (comms, cursor, comm) {
- s_comms.push_back(static_cast<simgrid::kernel::activity::CommImpl*>(comm->s_comm->get_impl()));
- }
+ xbt_dynar_foreach (comms, cursor, comm)
+ s_comms.push_back(comm->s_comm);
msg_error_t status = MSG_OK;
try {
- finished_index = simcall_comm_testany(s_comms.data(), s_comms.size());
+ finished_index = simgrid::s4u::Comm::test_any(s_comms);
} catch (const simgrid::TimeoutException& e) {
finished_index = e.get_value();
status = MSG_TIMEOUT;
ssize_t finished_index = -1;
/* Create the equivalent array with SIMIX objects: */
- std::vector<simgrid::kernel::activity::CommImpl*> s_comms;
+ std::vector<simgrid::s4u::CommPtr> s_comms;
s_comms.reserve(xbt_dynar_length(comms));
msg_comm_t comm;
unsigned int cursor;
xbt_dynar_foreach (comms, cursor, comm) {
- s_comms.push_back(static_cast<simgrid::kernel::activity::CommImpl*>(comm->s_comm->get_impl()));
+ s_comms.push_back(comm->s_comm);
}
msg_error_t status = MSG_OK;
try {
- finished_index = simcall_comm_waitany(s_comms.data(), s_comms.size(), -1);
+ finished_index = simgrid::s4u::Comm::wait_any_for(s_comms, -1);
} catch (const simgrid::TimeoutException& e) {
finished_index = e.get_value();
status = MSG_TIMEOUT;
*/
double MSG_task_get_remaining_communication(const_msg_task_t task)
{
- XBT_DEBUG("calling simcall_communication_get_remains(%p)", task->comm.get());
+ XBT_DEBUG("calling s4u::Comm::get_remaining (%p)", task->comm.get());
return task->comm->get_remaining();
}
#include <simgrid/Exception.hpp>
#include <simgrid/s4u/Activity.hpp>
+#include <simgrid/s4u/Comm.hpp>
#include <simgrid/s4u/Engine.hpp>
#include <simgrid/s4u/Exec.hpp>
#include <simgrid/s4u/Io.hpp>
vetoable_start();
if (state_ == State::FAILED) {
+ if (dynamic_cast<Comm*>(this))
+ throw NetworkFailureException(XBT_THROW_POINT, "Cannot wait for a failed comm");
if (dynamic_cast<Exec*>(this))
throw HostFailureException(XBT_THROW_POINT, "Cannot wait for a failed exec");
if (dynamic_cast<Io*>(this))
if (state_ == State::INITED || state_ == State::STARTING)
this->vetoable_start();
- if (kernel::actor::simcall([this] { return this->get_impl()->test(); })) {
+ kernel::actor::ActorImpl* issuer = kernel::actor::ActorImpl::self();
+ kernel::actor::ActivityTestSimcall observer{issuer, pimpl_.get()};
+ if (kernel::actor::simcall_blocking([&observer] { observer.get_activity()->test(observer.get_issuer()); },
+ &observer)) {
complete(State::FINISHED);
return true;
}
-
return false;
}
+ssize_t Activity::test_any(const std::vector<ActivityPtr>& activities)
+{
+ std::vector<kernel::activity::ActivityImpl*> ractivities(activities.size());
+ std::transform(begin(activities), end(activities), begin(ractivities),
+ [](const ActivityPtr& act) { return act->pimpl_.get(); });
+
+ kernel::actor::ActorImpl* issuer = kernel::actor::ActorImpl::self();
+ kernel::actor::ActivityTestanySimcall observer{issuer, ractivities};
+ ssize_t changed_pos = kernel::actor::simcall_blocking(
+ [&observer] { kernel::activity::ActivityImpl::test_any(observer.get_issuer(), observer.get_activities()); },
+ &observer);
+ if (changed_pos != -1)
+ activities.at(changed_pos)->complete(State::FINISHED);
+ return changed_pos;
+}
+
+ssize_t Activity::wait_any_for(const std::vector<ActivityPtr>& activities, double timeout)
+{
+ std::vector<kernel::activity::ActivityImpl*> ractivities(activities.size());
+ std::transform(begin(activities), end(activities), begin(ractivities),
+ [](const ActivityPtr& activity) { return activity->pimpl_.get(); });
+
+ kernel::actor::ActorImpl* issuer = kernel::actor::ActorImpl::self();
+ kernel::actor::ActivityWaitanySimcall observer{issuer, ractivities, timeout};
+ ssize_t changed_pos = kernel::actor::simcall_blocking(
+ [&observer] {
+ kernel::activity::ActivityImpl::wait_any_for(observer.get_issuer(), observer.get_activities(),
+ observer.get_timeout());
+ },
+ &observer);
+ if (changed_pos != -1)
+ activities.at(changed_pos)->complete(State::FINISHED);
+ return changed_pos;
+}
+
Activity* Activity::cancel()
{
kernel::actor::simcall([this] {
#include "src/kernel/activity/CommImpl.hpp"
#include "src/kernel/actor/ActorImpl.hpp"
+#include "src/kernel/actor/SimcallObserver.hpp"
XBT_LOG_NEW_DEFAULT_SUBCATEGORY(s4u_comm, s4u_activity, "S4U asynchronous communications");
ssize_t Comm::wait_any_for(const std::vector<CommPtr>& comms, double timeout)
{
- std::vector<kernel::activity::CommImpl*> rcomms(comms.size());
- std::transform(begin(comms), end(comms), begin(rcomms),
- [](const CommPtr& comm) { return static_cast<kernel::activity::CommImpl*>(comm->pimpl_.get()); });
+ std::vector<ActivityPtr> activities;
+ for (const auto& comm : comms)
+ activities.push_back(boost::dynamic_pointer_cast<Activity>(comm));
ssize_t changed_pos;
try {
- changed_pos = simcall_comm_waitany(rcomms.data(), rcomms.size(), timeout);
+ changed_pos = Activity::wait_any_for(activities, timeout);
} catch (const NetworkFailureException& e) {
for (auto c : comms) {
if (c->pimpl_->get_state() == kernel::activity::State::FAILED) {
}
e.rethrow_nested(XBT_THROW_POINT, boost::core::demangle(typeid(e).name()) + " raised in kernel mode.");
}
- if (changed_pos != -1)
- comms.at(changed_pos)->complete(State::FINISHED);
return changed_pos;
}
on_recv(*this);
pimpl_ = simcall_comm_irecv(receiver_, mailbox_->get_impl(), dst_buff_, &dst_buff_size_, match_fun_,
copy_data_function_, get_data<void>(), rate_);
-
} else {
xbt_die("Cannot start a communication before specifying whether we are the sender or the receiver");
}
* Negative values denote infinite wait times. 0 as a timeout returns immediately. */
Comm* Comm::wait_for(double timeout)
{
+ XBT_DEBUG("Calling Comm::wait_for with state %s", get_state_str());
+ kernel::actor::ActorImpl* issuer = nullptr;
switch (state_) {
case State::FINISHED:
break;
case State::FAILED:
throw NetworkFailureException(XBT_THROW_POINT, "Cannot wait for a failed communication");
-
case State::INITED:
case State::STARTING: // It's not started yet. Do it in one simcall if it's a regular communication
if (from_ != nullptr || to_ != nullptr) {
get_data<void>(), timeout, rate_);
}
break;
-
case State::STARTED:
try {
- simcall_comm_wait(get_impl(), timeout);
+ issuer = kernel::actor::ActorImpl::self();
+ kernel::actor::ActivityWaitSimcall observer{issuer, pimpl_.get(), timeout};
+ if (kernel::actor::simcall_blocking(
+ [&observer] { observer.get_activity()->wait_for(observer.get_issuer(), observer.get_timeout()); },
+ &observer)) {
+ throw TimeoutException(XBT_THROW_POINT, "Timeouted");
+ }
} catch (const NetworkFailureException& e) {
+ issuer->simcall_.observer_ = nullptr; // Comm failed on network failure, reset the observer to nullptr
complete(State::FAILED);
e.rethrow_nested(XBT_THROW_POINT, boost::core::demangle(typeid(e).name()) + " raised in kernel mode.");
}
ssize_t Comm::test_any(const std::vector<CommPtr>& comms)
{
- std::vector<kernel::activity::CommImpl*> rcomms(comms.size());
- std::transform(begin(comms), end(comms), begin(rcomms),
- [](const CommPtr& comm) { return static_cast<kernel::activity::CommImpl*>(comm->pimpl_.get()); });
- ssize_t changed_pos = simcall_comm_testany(rcomms.data(), rcomms.size());
- if (changed_pos != -1)
- comms.at(changed_pos)->complete(State::FINISHED);
- return changed_pos;
+ std::vector<ActivityPtr> activities;
+ for (const auto& comm : comms)
+ activities.push_back(boost::dynamic_pointer_cast<Activity>(comm));
+ return Activity::test_any(activities);
}
Comm* Comm::detach()
return this;
}
-bool Comm::test() // TODO: merge with Activity::test, once modernized
-{
- xbt_assert(state_ == State::INITED || state_ == State::STARTED || state_ == State::STARTING ||
- state_ == State::CANCELED || state_ == State::FINISHED);
-
- if (state_ == State::CANCELED || state_ == State::FINISHED)
- return true;
-
- if (state_ == State::INITED || state_ == State::STARTING)
- this->vetoable_start();
-
- if (simcall_comm_test(get_impl())) {
- complete(State::FINISHED);
- return true;
- }
- return false;
-}
-
Mailbox* Comm::get_mailbox() const
{
return mailbox_;
ssize_t Exec::wait_any_for(const std::vector<ExecPtr>& execs, double timeout)
{
- std::vector<kernel::activity::ExecImpl*> rexecs(execs.size());
- std::transform(begin(execs), end(execs), begin(rexecs),
- [](const ExecPtr& exec) { return static_cast<kernel::activity::ExecImpl*>(exec->pimpl_.get()); });
-
- kernel::actor::ActorImpl* issuer = kernel::actor::ActorImpl::self();
- kernel::actor::ExecutionWaitanySimcall observer{issuer, rexecs, timeout};
- ssize_t changed_pos = kernel::actor::simcall_blocking(
- [&observer] {
- kernel::activity::ExecImpl::wait_any_for(observer.get_issuer(), observer.get_execs(), observer.get_timeout());
- },
- &observer);
- if (changed_pos != -1)
- execs.at(changed_pos)->complete(State::FINISHED);
- return changed_pos;
+ std::vector<ActivityPtr> activities;
+ for (const auto& exec : execs)
+ activities.push_back(boost::dynamic_pointer_cast<Activity>(exec));
+ return Activity::wait_any_for(activities, timeout);
}
/** @brief change the execution bound
ssize_t Io::wait_any_for(const std::vector<IoPtr>& ios, double timeout)
{
- std::vector<kernel::activity::IoImpl*> rios(ios.size());
- std::transform(begin(ios), end(ios), begin(rios),
- [](const IoPtr& io) { return static_cast<kernel::activity::IoImpl*>(io->pimpl_.get()); });
-
- kernel::actor::ActorImpl* issuer = kernel::actor::ActorImpl::self();
- kernel::actor::IoWaitanySimcall observer{issuer, rios, timeout};
- ssize_t changed_pos = kernel::actor::simcall_blocking(
- [&observer] {
- kernel::activity::IoImpl::wait_any_for(observer.get_issuer(), observer.get_ios(), observer.get_timeout());
- },
- &observer);
- if (changed_pos != -1)
- ios.at(changed_pos)->complete(State::FINISHED);
- return changed_pos;
+ std::vector<ActivityPtr> activities;
+ for (const auto& io : ios)
+ activities.push_back(boost::dynamic_pointer_cast<Activity>(io));
+ return Activity::wait_any_for(activities, timeout);
}
IoPtr Io::set_disk(const_sg_disk_t disk)
#include "mc/mc.h"
#include "private.hpp"
#include "simgrid/Exception.hpp"
+#include "simgrid/s4u/ConditionVariable.hpp"
#include "simgrid/s4u/Exec.hpp"
#include "simgrid/s4u/Mutex.hpp"
-#include "simgrid/s4u/ConditionVariable.hpp"
#include "smpi_comm.hpp"
#include "smpi_datatype.hpp"
#include "smpi_host.hpp"
#include "smpi_op.hpp"
#include "src/kernel/activity/CommImpl.hpp"
+#include "src/kernel/actor/ActorImpl.hpp"
+#include "src/kernel/actor/SimcallObserver.hpp"
#include "src/mc/mc_replay.hpp"
#include "src/smpi/include/smpi_actor.hpp"
if (((*request)->flags_ & (MPI_REQ_PREPARED | MPI_REQ_FINISHED)) == 0) {
if ((*request)->action_ != nullptr && ((*request)->flags_ & MPI_REQ_CANCELLED) == 0){
try{
- *flag = simcall_comm_test((*request)->action_.get());
+ kernel::actor::ActorImpl* issuer = kernel::actor::ActorImpl::self();
+ kernel::actor::ActivityTestSimcall observer{issuer, (*request)->action_.get()};
+ *flag = kernel::actor::simcall_blocking([&observer] { observer.get_activity()->test(observer.get_issuer()); },
+ &observer);
} catch (const Exception&) {
*flag = 0;
return ret;
int Request::testany(int count, MPI_Request requests[], int *index, int* flag, MPI_Status * status)
{
- std::vector<simgrid::kernel::activity::CommImpl*> comms;
+ std::vector<simgrid::kernel::activity::ActivityImpl*> comms;
comms.reserve(count);
*flag = 0;
std::vector<int> map; /** Maps all matching comms back to their location in requests **/
for (int i = 0; i < count; i++) {
if ((requests[i] != MPI_REQUEST_NULL) && requests[i]->action_ && not(requests[i]->flags_ & MPI_REQ_PREPARED)) {
- comms.push_back(static_cast<simgrid::kernel::activity::CommImpl*>(requests[i]->action_.get()));
+ comms.push_back(requests[i]->action_.get());
map.push_back(i);
}
}
simgrid::s4u::this_actor::sleep_for(nsleeps * smpi_test_sleep);
ssize_t i;
try{
- i = simcall_comm_testany(comms.data(), comms.size()); // The i-th element in comms matches!
+ kernel::actor::ActorImpl* issuer = kernel::actor::ActorImpl::self();
+ kernel::actor::ActivityTestanySimcall observer{issuer, comms};
+ i = kernel::actor::simcall_blocking(
+ [&observer] { kernel::activity::ActivityImpl::test_any(observer.get_issuer(), observer.get_activities()); },
+ &observer);
} catch (const Exception&) {
XBT_DEBUG("Exception in testany");
return 0;
if ((*request)->action_ != nullptr){
try{
// this is not a detached send
- simcall_comm_wait((*request)->action_.get(), -1.0);
+ kernel::actor::ActorImpl* issuer = kernel::actor::ActorImpl::self();
+ kernel::actor::ActivityWaitSimcall observer{issuer, (*request)->action_.get(), -1};
+ kernel::actor::simcall_blocking(
+ [&observer] { observer.get_activity()->wait_for(observer.get_issuer(), observer.get_timeout()); },
+ &observer);
} catch (const CancelException&) {
XBT_VERB("Request cancelled");
}
if(count > 0) {
// Wait for a request to complete
- std::vector<simgrid::kernel::activity::CommImpl*> comms;
+ std::vector<simgrid::kernel::activity::ActivityImpl*> comms;
std::vector<int> map;
XBT_DEBUG("Wait for one of %d", count);
for(int i = 0; i < count; i++) {
not(requests[i]->flags_ & MPI_REQ_FINISHED)) {
if (requests[i]->action_ != nullptr) {
XBT_DEBUG("Waiting any %p ", requests[i]);
- comms.push_back(static_cast<simgrid::kernel::activity::CommImpl*>(requests[i]->action_.get()));
+ comms.push_back(requests[i]->action_.get());
map.push_back(i);
} else {
// This is a finished detached request, let's return this one
XBT_DEBUG("Enter waitany for %zu comms", comms.size());
ssize_t i;
try{
- i = simcall_comm_waitany(comms.data(), comms.size(), -1);
+ kernel::actor::ActorImpl* issuer = kernel::actor::ActorImpl::self();
+ kernel::actor::ActivityWaitanySimcall observer{issuer, comms, -1};
+ i = kernel::actor::simcall_blocking(
+ [&observer] {
+ kernel::activity::ActivityImpl::wait_any_for(observer.get_issuer(), observer.get_activities(),
+ observer.get_timeout());
+ },
+ &observer);
} catch (const CancelException&) {
XBT_INFO("request cancelled");
i = -1;