this->num_workers = num_workers;
this->synchro = new_synchro(mode);
- /* Create the pool of worker threads */
+ /* Create the pool of worker threads (the caller of apply() will be worker[0]) */
this->workers[0] = nullptr;
unsigned int core_bind = 0;
for (unsigned i = 1; i < num_workers; i++) {
// BoostContextFactory
-BoostContextFactory::BoostContextFactory()
- : ContextFactory("BoostContextFactory"), parallel_(SIMIX_context_is_parallel())
-{
- BoostContext::set_maestro(nullptr);
- if (parallel_)
- SwappedContext::initialize();
-}
+BoostContextFactory::BoostContextFactory() : SwappedContextFactory("BoostContextFactory") {}
-BoostContextFactory::~BoostContextFactory()
-{
- if (parallel_)
- SwappedContext::finalize();
-}
+BoostContextFactory::~BoostContextFactory() = default;
smx_context_t BoostContextFactory::create_context(std::function<void()> code, void_pfn_smxprocess_t cleanup_func,
smx_actor_t process)
{
if (parallel_)
- return this->new_context<ParallelBoostContext>(std::move(code), cleanup_func, process);
- return this->new_context<BoostContext>(std::move(code), cleanup_func, process);
-}
-
-void BoostContextFactory::run_all()
-{
- if (parallel_)
- ParallelBoostContext::run_all();
- else
- SwappedContext::run_all();
+ return this->new_context<ParallelBoostContext>(std::move(code), cleanup_func, process, this);
+ return this->new_context<BoostContext>(std::move(code), cleanup_func, process, this);
}
// BoostContext
-BoostContext::BoostContext(std::function<void()> code, void_pfn_smxprocess_t cleanup_func, smx_actor_t process)
- : SwappedContext(std::move(code), cleanup_func, process)
+BoostContext::BoostContext(std::function<void()> code, void_pfn_smxprocess_t cleanup_func, smx_actor_t process,
+ SwappedContextFactory* factory)
+ : SwappedContext(std::move(code), cleanup_func, process, factory)
{
/* if the user provided a function for the process then use it, otherwise it is the context for maestro */
}
// ParallelBoostContext
-
-void ParallelBoostContext::run_all()
-{
- threads_working_ = 0;
- if (parmap_ == nullptr)
- parmap_ = new simgrid::xbt::Parmap<smx_actor_t>(SIMIX_context_get_nthreads(), SIMIX_context_get_parallel_mode());
- parmap_->apply(
- [](smx_actor_t process) {
- ParallelBoostContext* context = static_cast<ParallelBoostContext*>(process->context_);
- context->resume();
- },
- simix_global->process_to_run);
-}
-
void ParallelBoostContext::suspend()
{
boost::optional<smx_actor_t> next_work = parmap_->next();
/** @brief Userspace context switching implementation based on Boost.Context */
class BoostContext : public SwappedContext {
public:
- BoostContext(std::function<void()> code, void_pfn_smxprocess_t cleanup_func, smx_actor_t process);
+ BoostContext(std::function<void()> code, void_pfn_smxprocess_t cleanup_func, smx_actor_t process,
+ SwappedContextFactory* factory);
~BoostContext() override;
void stop() override;
class ParallelBoostContext : public BoostContext {
public:
- ParallelBoostContext(std::function<void()> code, void_pfn_smxprocess_t cleanup_func, smx_actor_t process)
- : BoostContext(std::move(code), cleanup_func, process)
+ ParallelBoostContext(std::function<void()> code, void_pfn_smxprocess_t cleanup_func, smx_actor_t process,
+ SwappedContextFactory* factory)
+ : BoostContext(std::move(code), cleanup_func, process, factory)
{
}
void suspend() override;
void resume() override;
-
- static void run_all();
};
-class BoostContextFactory : public ContextFactory {
+class BoostContextFactory : public SwappedContextFactory {
public:
BoostContextFactory();
~BoostContextFactory() override;
Context* create_context(std::function<void()> code, void_pfn_smxprocess_t cleanup, smx_actor_t process) override;
- void run_all() override;
-
-private:
- bool parallel_;
};
}}} // namespace
// RawContextFactory
-RawContextFactory::RawContextFactory() : ContextFactory("RawContextFactory"), parallel_(SIMIX_context_is_parallel())
-{
- RawContext::set_maestro(nullptr);
- if (parallel_) {
- // TODO: choose dynamically when SIMIX_context_get_parallel_threshold() > 1
- SwappedContext::initialize();
- }
-}
+RawContextFactory::RawContextFactory() : SwappedContextFactory("RawContextFactory") {}
-RawContextFactory::~RawContextFactory()
-{
- if (parallel_)
- SwappedContext::finalize();
-}
+RawContextFactory::~RawContextFactory() = default;
Context* RawContextFactory::create_context(std::function<void()> code, void_pfn_smxprocess_t cleanup_func,
smx_actor_t process)
{
if (parallel_)
- return this->new_context<ParallelRawContext>(std::move(code), cleanup_func, process);
- return this->new_context<RawContext>(std::move(code), cleanup_func, process);
-}
-
-void RawContextFactory::run_all()
-{
- if (parallel_)
- ParallelRawContext::run_all();
- else
- SwappedContext::run_all();
+ return this->new_context<ParallelRawContext>(std::move(code), cleanup_func, process, this);
+ return this->new_context<RawContext>(std::move(code), cleanup_func, process, this);
}
// RawContext
-RawContext::RawContext(std::function<void()> code, void_pfn_smxprocess_t cleanup, smx_actor_t process)
- : SwappedContext(std::move(code), cleanup, process)
+RawContext::RawContext(std::function<void()> code, void_pfn_smxprocess_t cleanup, smx_actor_t process,
+ SwappedContextFactory* factory)
+ : SwappedContext(std::move(code), cleanup, process, factory)
{
if (has_code()) {
#if PTH_STACKGROWTH == -1
*/
class RawContext : public SwappedContext {
public:
- RawContext(std::function<void()> code, void_pfn_smxprocess_t cleanup_func, smx_actor_t process);
+ RawContext(std::function<void()> code, void_pfn_smxprocess_t cleanup_func, smx_actor_t process,
+ SwappedContextFactory* factory);
~RawContext() override;
void stop() override;
class ParallelRawContext : public RawContext {
public:
- ParallelRawContext(std::function<void()> code, void_pfn_smxprocess_t cleanup_func, smx_actor_t process)
- : RawContext(std::move(code), cleanup_func, process)
+ ParallelRawContext(std::function<void()> code, void_pfn_smxprocess_t cleanup_func, smx_actor_t process,
+ SwappedContextFactory* factory)
+ : RawContext(std::move(code), cleanup_func, process, factory)
{
}
void suspend() override;
static void run_all();
};
-class RawContextFactory : public ContextFactory {
+class RawContextFactory : public SwappedContextFactory {
public:
RawContextFactory();
~RawContextFactory() override;
Context* create_context(std::function<void()> code, void_pfn_smxprocess_t cleanup, smx_actor_t process) override;
- void run_all() override;
-
-private:
- bool parallel_;
};
}}} // namespace
#include "xbt/parmap.hpp"
#include "src/kernel/context/ContextSwapped.hpp"
+#include "src/kernel/context/ContextUnix.hpp" // FIXME: temporary reverse import
#ifdef _WIN32
#include <malloc.h>
thread_local uintptr_t SwappedContext::worker_id_; /* thread-specific storage for the thread id */
std::vector<SwappedContext*> SwappedContext::workers_context_; /* space to save the worker's context in each thread */
-void SwappedContext::initialize()
+SwappedContextFactory::SwappedContextFactory(std::string name)
+ : ContextFactory(name), parallel_(SIMIX_context_is_parallel())
{
- parmap_ = nullptr;
- workers_context_.clear();
- workers_context_.resize(SIMIX_context_get_nthreads(), nullptr);
+ SwappedContext::set_maestro(nullptr);
+ SwappedContext::initialize(parallel_);
+}
+SwappedContextFactory::~SwappedContextFactory()
+{
+ SwappedContext::finalize();
+}
+/** Maestro wants to run all ready actors */
+void SwappedContextFactory::run_all()
+{
+ /* This function is called by maestro at the beginning of a scheduling round to get all working threads executing some
+ * stuff It is much easier to understand what happens if you see the working threads as bodies that swap their soul
+ * for the ones of the simulated processes that must run.
+ */
+ if (parallel_) {
+ SwappedContext::threads_working_ = 0;
+
+ // We lazily create the parmap so that all options are actually processed when doing so.
+ if (SwappedContext::parmap_ == nullptr)
+ SwappedContext::parmap_ =
+ new simgrid::xbt::Parmap<smx_actor_t>(SIMIX_context_get_nthreads(), SIMIX_context_get_parallel_mode());
+
+ // Usually, Parmap::apply() executes the provided function on all elements of the array.
+ // Here, the executed function does not return the control to the parmap before all the array is processed:
+ // - suspend() should switch back to the worker_context (either maestro or one of its minions) to return
+ // the control to the parmap. Instead, it uses parmap_->next() to steal another work, and does it directly.
+ // It only yields back to worker_context when the work array is exhausted.
+ // - So, resume() is only launched from the parmap for the first job of each minion.
+ SwappedContext::parmap_->apply(
+ [](smx_actor_t process) {
+ SwappedContext* context = static_cast<SwappedContext*>(process->context_);
+ context->resume();
+ },
+ simix_global->process_to_run);
+ } else { // sequential execution
+ if (simix_global->process_to_run.empty())
+ return;
+ smx_actor_t first_process = simix_global->process_to_run.front();
+ SwappedContext::process_index_ = 1;
+ /* execute the first process */
+ static_cast<SwappedContext*>(first_process->context_)->resume();
+ }
+}
+
+void SwappedContext::initialize(bool parallel)
+{
+ parmap_ = nullptr; // will be created lazily with the right parameters if needed (ie, in parallel)
+ if (parallel) {
+ workers_context_.clear();
+ workers_context_.resize(SIMIX_context_get_nthreads(), nullptr);
+ }
}
void SwappedContext::finalize()
SwappedContext* SwappedContext::maestro_context_ = nullptr;
-SwappedContext::SwappedContext(std::function<void()> code, void_pfn_smxprocess_t cleanup_func, smx_actor_t process)
- : Context(std::move(code), cleanup_func, process)
+SwappedContext::SwappedContext(std::function<void()> code, void_pfn_smxprocess_t cleanup_func, smx_actor_t process,
+ SwappedContextFactory* factory)
+ : Context(std::move(code), cleanup_func, process), factory_(factory)
{
if (has_code()) {
if (smx_context_guard_size > 0 && not MC_is_active()) {
SwappedContext::~SwappedContext()
{
- if (stack_ == nullptr)
+ if (stack_ == nullptr) // maestro has no extra stack
return;
#if HAVE_VALGRIND_H
xbt_free(stack_);
}
-/** Maestro wants to run all read_to_run actors */
-void SwappedContext::run_all()
-{
- if (simix_global->process_to_run.empty())
- return;
- smx_actor_t first_process = simix_global->process_to_run.front();
- process_index_ = 1;
- /* execute the first process */
- static_cast<SwappedContext*>(first_process->context_)->resume();
-}
-
/** Maestro wants to yield back to a given actor */
void SwappedContext::resume()
{
- // Maestro is always the calling thread of this function (ie, self() == maestro)
- SwappedContext* old = static_cast<SwappedContext*>(self());
- Context::set_current(this);
- old->swap_into(this);
+ if (factory_->parallel_) {
+ THROW_IMPOSSIBLE;
+ } else { // sequential execution
+ // Maestro is always the calling thread of this function (ie, self() == maestro)
+ SwappedContext* old = static_cast<SwappedContext*>(self());
+ Context::set_current(this);
+ old->swap_into(this);
+ }
}
/** The actor wants to yield back to maestro */
void SwappedContext::suspend()
{
- /* determine the next context */
- SwappedContext* next_context;
- unsigned long int i = process_index_;
- process_index_++;
-
- if (i < simix_global->process_to_run.size()) {
- /* Actually swap into the next actor directly without transiting to maestro */
- XBT_DEBUG("Run next process");
- next_context = static_cast<SwappedContext*>(simix_global->process_to_run[i]->context_);
- } else {
- /* all processes were run, actually return to maestro */
- XBT_DEBUG("No more process to run");
- next_context = static_cast<SwappedContext*>(get_maestro());
+ if (factory_->parallel_) {
+ THROW_IMPOSSIBLE;
+ } else { // sequential execution
+ /* determine the next context */
+ SwappedContext* next_context;
+ unsigned long int i = process_index_;
+ process_index_++;
+
+ if (i < simix_global->process_to_run.size()) {
+ /* Actually swap into the next actor directly without transiting to maestro */
+ XBT_DEBUG("Run next process");
+ next_context = static_cast<SwappedContext*>(simix_global->process_to_run[i]->context_);
+ } else {
+ /* all processes were run, actually return to maestro */
+ XBT_DEBUG("No more process to run");
+ next_context = static_cast<SwappedContext*>(get_maestro());
+ }
+ Context::set_current(next_context);
+ this->swap_into(next_context);
}
- Context::set_current(next_context);
- this->swap_into(next_context);
}
} // namespace context
namespace simgrid {
namespace kernel {
namespace context {
+class SwappedContext;
+
+class SwappedContextFactory : public ContextFactory {
+ friend SwappedContext; // Reads whether we are in parallel mode
+public:
+ SwappedContextFactory(std::string name);
+ ~SwappedContextFactory() override;
+ void run_all() override;
+
+protected: // FIXME temporary internal exposure
+ bool parallel_;
+};
class SwappedContext : public Context {
public:
- SwappedContext(std::function<void()> code, void_pfn_smxprocess_t cleanup_func, smx_actor_t process);
+ SwappedContext(std::function<void()> code, void_pfn_smxprocess_t cleanup_func, smx_actor_t process,
+ SwappedContextFactory* factory);
virtual ~SwappedContext();
- static void initialize(); // Initialize the module, using the options
+ static void initialize(bool parallel); // Initialize the module, using the options
static void finalize(); // Finalize the module
virtual void suspend();
virtual void resume();
- static void run_all();
-
virtual void swap_into(SwappedContext* to) = 0; // Defined in subclasses
static SwappedContext* get_maestro() { return maestro_context_; }
static void set_maestro(SwappedContext* maestro) { maestro_context_ = maestro; }
-protected:
- void* stack_ = nullptr; /* the thread stack */
+ static unsigned long process_index_; // FIXME killme
/* For the parallel execution */
static simgrid::xbt::Parmap<smx_actor_t>* parmap_;
static std::atomic<uintptr_t> threads_working_;
static thread_local uintptr_t worker_id_;
+protected:
+ void* stack_ = nullptr; /* the thread stack */
+
private:
- static unsigned long process_index_;
static SwappedContext* maestro_context_;
+ /* For sequential and parallel run_all() */
+ SwappedContextFactory* factory_;
};
} // namespace context
// UContextFactory
-UContextFactory::UContextFactory() : ContextFactory("UContextFactory"), parallel_(SIMIX_context_is_parallel())
-{
- UContext::set_maestro(nullptr);
- if (parallel_)
- SwappedContext::initialize();
-}
-
-UContextFactory::~UContextFactory()
-{
- if (parallel_)
- SwappedContext::finalize();
-}
+UContextFactory::UContextFactory() : SwappedContextFactory("UContextFactory") {}
+UContextFactory::~UContextFactory() = default;
Context* UContextFactory::create_context(std::function<void()> code, void_pfn_smxprocess_t cleanup, smx_actor_t process)
{
if (parallel_)
- return new_context<ParallelUContext>(std::move(code), cleanup, process);
+ return new_context<ParallelUContext>(std::move(code), cleanup, process, this);
else
- return new_context<UContext>(std::move(code), cleanup, process);
+ return new_context<UContext>(std::move(code), cleanup, process, this);
}
-/* This function is called by maestro at the beginning of a scheduling round to get all working threads executing some
- * stuff It is much easier to understand what happens if you see the working threads as bodies that swap their soul for
- * the ones of the simulated processes that must run.
- */
-void UContextFactory::run_all()
-{
- if (parallel_)
- ParallelUContext::run_all();
- else
- SwappedContext::run_all();
-}
// UContext
-UContext::UContext(std::function<void()> code, void_pfn_smxprocess_t cleanup_func, smx_actor_t process)
- : SwappedContext(std::move(code), cleanup_func, process)
+UContext::UContext(std::function<void()> code, void_pfn_smxprocess_t cleanup_func, smx_actor_t process,
+ SwappedContextFactory* factory)
+ : SwappedContext(std::move(code), cleanup_func, process, factory)
{
/* if the user provided a function for the process then use it, otherwise it is the context for maestro */
if (has_code()) {
void ParallelUContext::run_all()
{
- threads_working_ = 0;
-
- // We lazily create the parmap so that all options are actually processed when doing so.
- if (parmap_ == nullptr)
- parmap_ = new simgrid::xbt::Parmap<smx_actor_t>(SIMIX_context_get_nthreads(), SIMIX_context_get_parallel_mode());
-
- // Usually, Parmap::apply() executes the provided function on all elements of the array.
- // Here, the executed function does not return the control to the parmap before all the array is processed:
- // - suspend() should switch back to the worker_context (either maestro or one of its minions) to return
- // the control to the parmap. Instead, it uses parmap_->next() to steal another work, and does it directly.
- // It only yields back to worker_context when the work array is exhausted.
- // - So, resume() is only launched from the parmap for the first job of each minion.
- parmap_->apply(
- [](smx_actor_t process) {
- ParallelUContext* context = static_cast<ParallelUContext*>(process->context_);
- context->resume();
- },
- simix_global->process_to_run);
}
/** Run one particular simulated process on the current thread.
class UContext : public SwappedContext {
public:
- UContext(std::function<void()> code, void_pfn_smxprocess_t cleanup_func, smx_actor_t process);
+ UContext(std::function<void()> code, void_pfn_smxprocess_t cleanup_func, smx_actor_t process,
+ SwappedContextFactory* factory);
~UContext() override;
void stop() override;
class ParallelUContext : public UContext {
public:
- ParallelUContext(std::function<void()> code, void_pfn_smxprocess_t cleanup_func, smx_actor_t process)
- : UContext(std::move(code), cleanup_func, process)
+ ParallelUContext(std::function<void()> code, void_pfn_smxprocess_t cleanup_func, smx_actor_t process,
+ SwappedContextFactory* factory)
+ : UContext(std::move(code), cleanup_func, process, factory)
{
}
void suspend() override;
static void run_all();
};
-class UContextFactory : public ContextFactory {
+class UContextFactory : public SwappedContextFactory {
public:
UContextFactory();
~UContextFactory() override;
- Context* create_context(std::function<void()> code, void_pfn_smxprocess_t cleanup, smx_actor_t process) override;
- void run_all() override;
-private:
- bool parallel_;
+ Context* create_context(std::function<void()> code, void_pfn_smxprocess_t cleanup, smx_actor_t process) override;
};
}}} // namespace