From: Gabriel Corona Date: Tue, 19 Jan 2016 13:49:13 +0000 (+0100) Subject: [simix] Attach context X-Git-Tag: v3_13~906 X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/commitdiff_plain/ebbf93ef2f2d1164e90a579d18328a570fd67acb [simix] Attach context This is expected to be used in order to use the main caller/thread as one of the simulated processes. This can be used in order to expose an API where the maestro is hidden and the main thread is the main process and can create other simulated processes. --- diff --git a/examples/msg/sendrecv/CMakeLists.txt b/examples/msg/sendrecv/CMakeLists.txt index fbfa614149..cfd35f8008 100644 --- a/examples/msg/sendrecv/CMakeLists.txt +++ b/examples/msg/sendrecv/CMakeLists.txt @@ -1,9 +1,11 @@ set(EXECUTABLE_OUTPUT_PATH "${CMAKE_CURRENT_BINARY_DIR}") add_executable(sendrecv sendrecv.c) +add_executable(sendrecv_main sendrecv_main.c) ### Add definitions for compile target_link_libraries(sendrecv simgrid ) +target_link_libraries(sendrecv_main simgrid ) set(tesh_files ${tesh_files} @@ -16,11 +18,13 @@ set(tesh_files set(xml_files ${xml_files} ${CMAKE_CURRENT_SOURCE_DIR}/deployment_sendrecv.xml + ${CMAKE_CURRENT_SOURCE_DIR}/deployment_sendrecv_main.xml PARENT_SCOPE ) set(examples_src ${examples_src} ${CMAKE_CURRENT_SOURCE_DIR}/sendrecv.c + ${CMAKE_CURRENT_SOURCE_DIR}/sendrecv_main.c PARENT_SCOPE ) set(bin_files diff --git a/examples/msg/sendrecv/deployment_sendrecv_main.xml b/examples/msg/sendrecv/deployment_sendrecv_main.xml new file mode 100644 index 0000000000..5b5309735a --- /dev/null +++ b/examples/msg/sendrecv/deployment_sendrecv_main.xml @@ -0,0 +1,5 @@ + + + + + diff --git a/examples/msg/sendrecv/sendrecv_CLM03_main.tesh b/examples/msg/sendrecv/sendrecv_CLM03_main.tesh new file mode 100644 index 0000000000..8cfd78411c --- /dev/null +++ b/examples/msg/sendrecv/sendrecv_CLM03_main.tesh @@ -0,0 +1,21 @@ +#! ./tesh + +p Testing the deprecated CM02 network model + +! 
output sort 19 +$ $SG_TEST_EXENV sendrecv/sendrecv_main$EXEEXT ${srcdir:=.}/../platforms/small_platform.xml ${srcdir:=.}/sendrecv/deployment_sendrecv_main.xml --cfg=cpu/model:Cas01 --cfg=network/model:CM02 "--log=root.fmt:[%10.6r]%e(%i:%P@%h)%e%m%n" +> [ 0.000000] (0:maestro@) Configuration change: Set 'cpu/model' to 'Cas01' +> [ 0.000000] (0:maestro@) Configuration change: Set 'network/model' to 'CM02' +> [ 0.000000] (0:maestro@) test_all +> [ 0.000000] (1:sender@Tremblay) sender +> [ 0.000000] (1:sender@Tremblay) host = Jupiter +> [ 0.000000] (1:sender@Tremblay) task_la->data = 0.000000e+00 +> [ 0.000000] (2:receiver@Jupiter) receiver +> [ 0.001462] (2:receiver@Jupiter) Task received : latency task +> [ 0.001462] (2:receiver@Jupiter) Communic. time 1.461656e-03 +> [ 0.001462] (2:receiver@Jupiter) --- la 0.001462 ---- +> [ 0.001462] (1:sender@Tremblay) task_bw->data = 1.461656e-03 +> [138.703988] (2:receiver@Jupiter) Task received : bandwidth task +> [138.703988] (2:receiver@Jupiter) Communic. time 1.387025e+02 +> [138.703988] (2:receiver@Jupiter) --- bw 7209674.030423 ---- +> [138.703988] (0:maestro@) Total simulation time: 1.387040e+02 diff --git a/examples/msg/sendrecv/sendrecv_main.c b/examples/msg/sendrecv/sendrecv_main.c new file mode 100644 index 0000000000..684693488c --- /dev/null +++ b/examples/msg/sendrecv/sendrecv_main.c @@ -0,0 +1,201 @@ +/* Copyright (c) 2007-2015. The SimGrid Team. + * All rights reserved. */ + +/* This program is free software; you can redistribute it and/or modify it + * under the terms of the license (GNU LGPL) which comes with this package. */ + +#include + +#include "simgrid/msg.h" /* Yeah! If you want to use msg, you need to include simgrid/msg.h */ +#include "xbt/sysdep.h" /* calloc */ + +/* Create a log channel to have nice outputs. */ +#include "xbt/log.h" +#include "xbt/asserts.h" + +/** @addtogroup MSG_examples + * + * - sendrecv/sendrecv.c: Ping-pong example. It's hard to + * think of a simpler example. 
The tesh files laying in the + * directory are instructive concerning the way to pass options to the simulators (as described in \ref options). + */ + +XBT_LOG_NEW_DEFAULT_CATEGORY(msg_test, + "Messages specific for this msg example"); + +int sender(int argc, char *argv[]); +int receiver(int argc, char *argv[]); + +double task_comm_size_lat = 1; +double task_comm_size_bw = 10e8; + +/** Emitter function */ +int sender(int argc, char *argv[]) +{ + msg_host_t host = NULL; + double time; + msg_task_t task_la = NULL; + msg_task_t task_bw = NULL; + char sprintf_buffer_la[64]; + char sprintf_buffer_bw[64]; + + XBT_INFO("sender"); + + /*host = xbt_new0(msg_host_t,1); */ + + XBT_INFO("host = %s", argv[1]); + + host = MSG_host_by_name(argv[1]); + + if (host == NULL) { + XBT_INFO("Unknown host %s. Stopping Now! ", argv[1]); + abort(); + } + + /* Latency */ + time = MSG_get_clock(); + sprintf(sprintf_buffer_la, "latency task"); + task_la = + MSG_task_create(sprintf_buffer_la, 0.0, task_comm_size_lat, NULL); + task_la->data = xbt_new(double, 1); + *(double *) task_la->data = time; + XBT_INFO("task_la->data = %e", *((double *) task_la->data)); + MSG_task_send(task_la, argv[1]); + + /* Bandwidth */ + time = MSG_get_clock(); + sprintf(sprintf_buffer_bw, "bandwidth task"); + task_bw = + MSG_task_create(sprintf_buffer_bw, 0.0, task_comm_size_bw, NULL); + task_bw->data = xbt_new(double, 1); + *(double *) task_bw->data = time; + XBT_INFO("task_bw->data = %e", *((double *) task_bw->data)); + MSG_task_send(task_bw, argv[1]); + + return 0; +} /* end_of_client */ + +/** Receiver function */ +int receiver(int argc, char *argv[]) +{ + double time, time1, sender_time; + msg_task_t task_la = NULL; + msg_task_t task_bw = NULL; + int a; + double communication_time = 0; + + XBT_INFO("receiver"); + + /* Get Latency */ + a = MSG_task_receive(&task_la,MSG_host_get_name(MSG_host_self())); + if (a == MSG_OK) { + time1 = MSG_get_clock(); + sender_time = *((double *) (task_la->data)); + time = 
sender_time; + communication_time = time1 - time; + XBT_INFO("Task received : %s", task_la->name); + xbt_free(task_la->data); + MSG_task_destroy(task_la); + XBT_INFO("Communic. time %e", communication_time); + XBT_INFO("--- la %f ----", communication_time); + } else { + xbt_die("Unexpected behavior"); + } + + /* Get Bandwidth */ + a = MSG_task_receive(&task_bw,MSG_host_get_name(MSG_host_self())); + if (a == MSG_OK) { + time1 = MSG_get_clock(); + sender_time = *((double *) (task_bw->data)); + time = sender_time; + communication_time = time1 - time; + XBT_INFO("Task received : %s", task_bw->name); + xbt_free(task_bw->data); + MSG_task_destroy(task_bw); + XBT_INFO("Communic. time %e", communication_time); + XBT_INFO("--- bw %f ----", task_comm_size_bw / communication_time); + } else { + xbt_die("Unexpected behavior"); + } + + + return 0; +} /* end_of_receiver */ + +struct application { + const char* platform_file; + const char* application_file; +}; + +/** Test function */ +static msg_error_t test_all(struct application* app) +{ + msg_error_t res = MSG_OK; + + XBT_INFO("test_all"); + + /* Simulation setting */ + MSG_create_environment(app->platform_file); + + /* Become one of the simulated process. 
+ * + * This must be done after the creation of the platform + * because we are depending attaching to a host.*/ + MSG_process_attach("sender", NULL, MSG_host_by_name("Tremblay"), NULL); + + /* Application deployment */ + MSG_function_register("receiver", receiver); + + MSG_launch_application(app->application_file); + + // Execute the sender code: + const char* argv[3] = { "sender", "Jupiter", NULL }; + sender(2, (char**) argv); + + MSG_process_detach(); + return res; +} /* end_of_test_all */ + +static +void maestro(void* data) +{ + // struct application* app = (struct application*) data; + MSG_main(); +} + +/** Main function */ +int main(int argc, char *argv[]) +{ + msg_error_t res = MSG_OK; + +#ifdef _MSC_VER + unsigned int prev_exponent_format = + _set_output_format(_TWO_DIGIT_EXPONENT); +#endif + + struct application app; + app.platform_file = argv[1]; + app.application_file = argv[2]; + + SIMIX_set_maestro(maestro, &app); + MSG_init(&argc, argv); + + if (argc != 3) { + XBT_CRITICAL("Usage: %s platform_file deployment_file\n", + argv[0]); + xbt_die("example: %s msg_platform.xml msg_deployment.xml\n",argv[0]); + } + + res = test_all(&app); + + XBT_INFO("Total simulation time: %e", MSG_get_clock()); + +#ifdef _MSC_VER + _set_output_format(prev_exponent_format); +#endif + + if (res == MSG_OK) + return 0; + else + return 1; +} /* end_of_main */ diff --git a/include/simgrid/msg.h b/include/simgrid/msg.h index 93b95399a5..97c370b44c 100644 --- a/include/simgrid/msg.h +++ b/include/simgrid/msg.h @@ -336,6 +336,11 @@ XBT_PUBLIC(msg_process_t) MSG_process_create_with_environment(const char char **argv, xbt_dict_t properties); +XBT_PUBLIC(msg_process_t) MSG_process_attach( + const char *name, void *data, + msg_host_t host, xbt_dict_t properties); +XBT_PUBLIC(void) MSG_process_detach(void); + XBT_PUBLIC(void) MSG_process_kill(msg_process_t process); XBT_PUBLIC(int) MSG_process_killall(int reset_PIDs); XBT_PUBLIC(msg_error_t) MSG_process_migrate(msg_process_t process, 
msg_host_t host); diff --git a/include/simgrid/simix.h b/include/simgrid/simix.h index bd12391e93..984f1c440a 100644 --- a/include/simgrid/simix.h +++ b/include/simgrid/simix.h @@ -174,6 +174,11 @@ XBT_PUBLIC(int) SIMIX_is_maestro(); /* Initialization and exit */ XBT_PUBLIC(void) SIMIX_global_init(int *argc, char **argv); +/* Set the code to execute in the maestro + * + * If no maestro code is registered (the default), the main thread + * is assumed to be the maestro. */ +XBT_PUBLIC(void) SIMIX_set_maestro(void (*code)(void*), void* data); XBT_PUBLIC(void) SIMIX_function_register_process_cleanup(void_pfn_smxprocess_t function); XBT_PUBLIC(void) SIMIX_function_register_process_create(smx_creation_func_t function); @@ -210,6 +215,24 @@ XBT_PUBLIC(void) SIMIX_process_set_function(const char* process_host, double process_start_time, double process_kill_time); +/*********************************** Host *************************************/ +/* Functions for running a process in main() + * + * 1. create the maestro process + * 2. attach (create a context and wait for maestro to give control back to you) + * 3. do your process job + * 4. 
detach (this waits for the simulation to terminate) + */ + +XBT_PUBLIC(void) SIMIX_maestro_create(void (*code)(void*), void* data); +XBT_PUBLIC(smx_process_t) SIMIX_process_attach( + const char* name, + void *data, + const char* hostname, + xbt_dict_t properties, + smx_process_t parent_process); +XBT_PUBLIC(void) SIMIX_process_detach(void); + /*********************************** Host *************************************/ XBT_PUBLIC(sg_host_t) SIMIX_host_self(void); XBT_PUBLIC(const char*) SIMIX_host_self_get_name(void); diff --git a/include/simgrid/simix.hpp b/include/simgrid/simix.hpp index 2f19728b8d..3d3e660d28 100644 --- a/include/simgrid/simix.hpp +++ b/include/simgrid/simix.hpp @@ -79,6 +79,65 @@ typename std::result_of::type kernel(F&& code) return promise.get_future().get(); } +class args { +private: + int argc_; + char** argv_; +public: + + // Main constructors + args() : argc_(0), argv_(nullptr) {} + args(int argc, char** argv) : argc_(argc), argv_(argv) {} + + // Free + void clear() + { + for (int i = 0; i < this->argc_; i++) + free(this->argv_[i]); + free(this->argv_); + this->argc_ = 0; + this->argv_ = nullptr; + } + ~args() { clear(); } + + // Copy + args(args const& that) = delete; + args& operator=(args const& that) = delete; + + // Move: + args(args&& that) : argc_(that.argc_), argv_(that.argv_) + { + that.argc_ = 0; + that.argv_ = nullptr; + } + args& operator=(args&& that) + { + this->argc_ = that.argc_; + this->argv_ = that.argv_; + that.argc_ = 0; + that.argv_ = nullptr; + return *this; + } + + int argc() const { return argc_; } + char** argv() { return argv_; } + const char*const* argv() const { return argv_; } + char* operator[](std::size_t i) { return argv_[i]; } +}; + +inline +std::function wrap_main(xbt_main_func_t code, int argc, char **argv) +{ + if (code) { + auto arg = std::make_shared(argc, argv); + return [=]() { + code(arg->argc(), arg->argv()); + }; + } + // TODO, we should free argv + else return std::function(); +} + class 
Context; class ContextFactory; @@ -91,6 +150,16 @@ public: virtual ~ContextFactory(); virtual Context* create_context(std::function code, void_pfn_smxprocess_t cleanup, smx_process_t process) = 0; + + // Optional methods for attaching main() as a context: + + /** Creates a context from the current context of execution + * + * This will not work on all implementation of `ContextFactory`. + */ + virtual Context* attach(void_pfn_smxprocess_t cleanup_func, smx_process_t process); + virtual Context* create_maestro(std::function code, smx_process_t process); + virtual void run_all() = 0; virtual Context* self(); std::string const& name() const @@ -143,6 +212,26 @@ public: virtual void suspend() = 0; }; +XBT_PUBLIC_CLASS AttachContext : public Context { +public: + + AttachContext(std::function code, + void_pfn_smxprocess_t cleanup_func, + smx_process_t process) + : Context(std::move(code), cleanup_func, process) + {} + + ~AttachContext(); + + /** Called by the context when it is ready to give control + * to the maestro. + */ + virtual void attach_start() = 0; + + /** Called by the context when it has finished its job */ + virtual void attach_stop() = 0; +}; + } } diff --git a/src/msg/msg_process.cpp b/src/msg/msg_process.cpp index a941ea21ba..ab3bc9bbe0 100644 --- a/src/msg/msg_process.cpp +++ b/src/msg/msg_process.cpp @@ -181,6 +181,59 @@ msg_process_t MSG_process_create_with_environment(const char *name, return process; } +static +int MSG_maestro(int argc, char** argv) +{ + int res = MSG_main(); + return res; +} + +/* Become a process in the simulation + * + * Currently this can only be called by the main thread (once) and only work + * with some thread factories (currently ThreadContextFactory). + * + * In the future, it might be extended in order to attach other threads created + * by a third party library. 
+ */ +msg_process_t MSG_process_attach( + const char *name, void *data, + msg_host_t host, xbt_dict_t properties) +{ + xbt_assert(host != NULL, "Invalid parameters: host and code params must not be NULL"); + simdata_process_t simdata = xbt_new0(s_simdata_process_t, 1); + msg_process_t process; + + /* Simulator data for MSG */ + simdata->waiting_action = NULL; + simdata->waiting_task = NULL; + simdata->m_host = host; + simdata->argc = 0; + simdata->argv = NULL; + simdata->data = data; + simdata->last_errno = MSG_OK; + + /* Let's create the process: SIMIX may decide to start it right now, + * even before returning the flow control to us */ + process = SIMIX_process_attach(name, simdata, sg_host_get_name(host), properties, NULL); + if (!process) + xbt_die("Could not attach"); + simcall_process_on_exit(process,(int_f_pvoid_pvoid_t)TRACE_msg_process_kill,process); + return process; +} + +/** Detach a process attached with `MSG_process_attach()` + * + * This is called when the current process has finished its job. + * Used in the main thread, it waits for the simulation to finish before + * returning. When it returns, the other simulated processes and the maestro + * are destroyed. 
+ */ +void MSG_process_detach(void) +{ + SIMIX_process_detach(); +} + /** \ingroup m_process_management * \param process poor victim * diff --git a/src/simix/Context.cpp b/src/simix/Context.cpp index 7115e3ab37..4c478a2d20 100644 --- a/src/simix/Context.cpp +++ b/src/simix/Context.cpp @@ -15,6 +15,7 @@ #include "mc/mc.h" #include +#include void SIMIX_process_set_cleanup_function( smx_process_t process, void_pfn_smxprocess_t cleanup) @@ -22,69 +23,6 @@ void SIMIX_process_set_cleanup_function( process->context->set_cleanup(cleanup); } -namespace simgrid { -namespace simix { - -class XBT_PRIVATE args { -private: - int argc_; - char** argv_; -public: - - // Main constructors - args() : argc_(0), argv_(nullptr) {} - args(int argc, char** argv) : argc_(argc), argv_(argv) {} - - // Free - void clear() - { - for (int i = 0; i < this->argc_; i++) - free(this->argv_[i]); - free(this->argv_); - this->argc_ = 0; - this->argv_ = nullptr; - } - ~args() { clear(); } - - // Copy - args(args const& that) = delete; - args& operator=(args const& that) = delete; - - // Move: - args(args&& that) : argc_(that.argc_), argv_(that.argv_) - { - that.argc_ = 0; - that.argv_ = nullptr; - } - args& operator=(args&& that) - { - this->argc_ = that.argc_; - this->argv_ = that.argv_; - that.argc_ = 0; - that.argv_ = nullptr; - return *this; - } - - int argc() const { return argc_; } - char** argv() { return argv_; } - const char*const* argv() const { return argv_; } - char* operator[](std::size_t i) { return argv_[i]; } -}; - -} -} - -static -std::function wrap_main(xbt_main_func_t code, int argc, char **argv) -{ - if (code) { - auto arg = std::make_shared(argc, argv); - return [=]() { - code(arg->argc(), arg->argv()); - }; - } else return std::function(); -} - /** * \brief creates a new context for a user level process * \param code a main function @@ -101,7 +39,7 @@ smx_context_t SIMIX_context_new( if (!simix_global) xbt_die("simix is not initialized, please call MSG_init first"); return 
simix_global->context_factory->create_context( - wrap_main(code, argc, argv), cleanup_func, simix_process); + simgrid::simix::wrap_main(code, argc, argv), cleanup_func, simix_process); } namespace simgrid { @@ -125,6 +63,18 @@ void ContextFactory::declare_context(void* context, std::size_t size) #endif } +Context* ContextFactory::attach(void_pfn_smxprocess_t cleanup_func, smx_process_t process) +{ + xbt_die("Cannot attach with this ContextFactory.\n" + "Try using --cfg=contexts/factory:thread instead.\n"); +} + +Context* ContextFactory::create_maestro(std::function code, smx_process_t process) +{ + xbt_die("Cannot create_maestro with this ContextFactory.\n" + "Try using --cfg=contexts/factory:thread instead.\n"); +} + Context::Context(std::function code, void_pfn_smxprocess_t cleanup_func, smx_process_t process) : code_(std::move(code)), process_(process), iwannadie(false) @@ -153,5 +103,9 @@ void Context::stop() this->iwannadie = true; } +AttachContext::~AttachContext() +{ +} + } } \ No newline at end of file diff --git a/src/simix/ThreadContext.cpp b/src/simix/ThreadContext.cpp index c3a3c0099d..c84dc52918 100644 --- a/src/simix/ThreadContext.cpp +++ b/src/simix/ThreadContext.cpp @@ -52,35 +52,7 @@ ThreadContext* ThreadContextFactory::create_context( std::function code, void_pfn_smxprocess_t cleanup, smx_process_t process) { - return this->new_context(std::move(code), cleanup, process); -} - -ThreadContext::ThreadContext(std::function code, - void_pfn_smxprocess_t cleanup, smx_process_t process) - : Context(std::move(code), cleanup, process) -{ - /* If the user provided a function for the process then use it - otherwise is the context for maestro */ - if (has_code()) { - this->begin_ = xbt_os_sem_init(0); - this->end_ = xbt_os_sem_init(0); - if (smx_context_stack_size_was_set) - xbt_os_thread_setstacksize(smx_context_stack_size); - if (smx_context_guard_size_was_set) - xbt_os_thread_setguardsize(smx_context_guard_size); - - /* create and start the process */ - /* 
NOTE: The first argument to xbt_os_thread_create used to be the process * - * name, but now the name is stored at SIMIX level, so we pass a null */ - this->thread_ = - xbt_os_thread_create(NULL, ThreadContext::wrapper, this, this); - - /* wait the starting of the newly created process */ - xbt_os_sem_acquire(this->end_); - - } else { - xbt_os_thread_set_extra_data(this); - } + return this->new_context(std::move(code), cleanup, process, !code); } void ThreadContextFactory::run_all() @@ -111,6 +83,51 @@ ThreadContext* ThreadContextFactory::self() return static_cast(xbt_os_thread_get_extra_data()); } +ThreadContext* ThreadContextFactory::attach(void_pfn_smxprocess_t cleanup_func, smx_process_t process) +{ + return this->new_context( + std::function(), cleanup_func, process, false); +} + +ThreadContext* ThreadContextFactory::create_maestro(std::function code, smx_process_t process) +{ + return this->new_context(std::move(code), nullptr, process, true); +} + +ThreadContext::ThreadContext(std::function code, + void_pfn_smxprocess_t cleanup, smx_process_t process, bool maestro) + : AttachContext(std::move(code), cleanup, process) +{ + // We do not need the semaphores when maestro is in main: + // if (!(maestro && !code)) { + this->begin_ = xbt_os_sem_init(0); + this->end_ = xbt_os_sem_init(0); + // } + + /* If the user provided a function for the process then use it */ + if (has_code()) { + if (smx_context_stack_size_was_set) + xbt_os_thread_setstacksize(smx_context_stack_size); + if (smx_context_guard_size_was_set) + xbt_os_thread_setguardsize(smx_context_guard_size); + + /* create and start the process */ + /* NOTE: The first argument to xbt_os_thread_create used to be the process * + * name, but now the name is stored at SIMIX level, so we pass a null */ + this->thread_ = + xbt_os_thread_create(NULL, + maestro ? 
ThreadContext::maestro_wrapper : ThreadContext::wrapper, + this, this); + /* wait the starting of the newly created process */ + xbt_os_sem_acquire(this->end_); + } + + /* Otherwise, we attach to the current thread */ + else { + xbt_os_thread_set_extra_data(this); + } +} + ThreadContext::~ThreadContext() { /* check if this is the context of maestro (it doesn't have a real thread) */ @@ -138,15 +155,49 @@ void *ThreadContext::wrapper(void *param) #endif /* Tell the maestro we are starting, and wait for its green light */ xbt_os_sem_release(context->end_); + xbt_os_sem_acquire(context->begin_); if (smx_ctx_thread_sem) /* parallel run */ xbt_os_sem_acquire(smx_ctx_thread_sem); (*context)(); context->stop(); + + return nullptr; +} + +void *ThreadContext::maestro_wrapper(void *param) +{ + ThreadContext* context = static_cast(param); + +#ifndef WIN32 + /* Install alternate signal stack, for SIGSEGV handler. */ + stack_t stack; + stack.ss_sp = sigsegv_stack; + stack.ss_size = sizeof sigsegv_stack; + stack.ss_flags = 0; + sigaltstack(&stack, nullptr); +#endif + /* Tell the caller we are starting */ + xbt_os_sem_release(context->end_); + + // Wait for the caller to give control back to us: + xbt_os_sem_acquire(context->begin_); + (*context)(); + + // Tell main that we have finished: + xbt_os_sem_release(context->end_); + return nullptr; } +void ThreadContext::start() +{ + xbt_os_sem_acquire(this->begin_); + if (smx_ctx_thread_sem) /* parallel run */ + xbt_os_sem_acquire(smx_ctx_thread_sem); +} + void ThreadContext::stop() { Context::stop(); @@ -169,5 +220,25 @@ void ThreadContext::suspend() xbt_os_sem_acquire(smx_ctx_thread_sem); } +void ThreadContext::attach_start() +{ + // We're breaking the layers here by depending on the upper layer: + ThreadContext* maestro = (ThreadContext*) simix_global->maestro_process->context; + xbt_os_sem_release(maestro->begin_); + this->start(); +} + +void ThreadContext::attach_stop() +{ + if (smx_ctx_thread_sem) + 
xbt_os_sem_release(smx_ctx_thread_sem); + xbt_os_sem_release(this->end_); + + ThreadContext* maestro = (ThreadContext*) simix_global->maestro_process->context; + xbt_os_sem_acquire(maestro->end_); + + xbt_os_thread_set_extra_data(nullptr); +} + } } diff --git a/src/simix/ThreadContext.hpp b/src/simix/ThreadContext.hpp index d706a339b1..abed18789f 100644 --- a/src/simix/ThreadContext.hpp +++ b/src/simix/ThreadContext.hpp @@ -18,15 +18,17 @@ namespace simix { class ThreadContext; class ThreadContextFactory; -class ThreadContext : public Context { +class ThreadContext : public AttachContext { public: friend ThreadContextFactory; ThreadContext(std::function code, void_pfn_smxprocess_t cleanup_func, - smx_process_t process); + smx_process_t process, bool maestro =false); ~ThreadContext(); void stop() override; void suspend() override; + void attach_start() override; + void attach_stop() override; private: /** A portable thread */ xbt_os_thread_t thread_ = nullptr; @@ -36,6 +38,9 @@ private: xbt_os_sem_t end_ = nullptr; private: static void* wrapper(void *param); + static void* maestro_wrapper(void *param); +public: + void start(); }; class ThreadContextFactory : public ContextFactory { @@ -46,6 +51,10 @@ public: void_pfn_smxprocess_t cleanup_func, smx_process_t process) override; void run_all() override; ThreadContext* self() override; + + // Optional methods: + ThreadContext* attach(void_pfn_smxprocess_t cleanup_func, smx_process_t process) override; + ThreadContext* create_maestro(std::function code, smx_process_t process) override; }; } diff --git a/src/simix/smx_global.cpp b/src/simix/smx_global.cpp index acc00af0fa..080e606dfc 100644 --- a/src/simix/smx_global.cpp +++ b/src/simix/smx_global.cpp @@ -175,6 +175,15 @@ static void SIMIX_storage_create_(smx_storage_t storage) SIMIX_storage_create(key, storage, NULL); } +static void (*maestro_code)(void*) = nullptr; +static void* maestro_data = nullptr; + +void SIMIX_set_maestro(void (*code)(void*), void* data) +{ + 
maestro_code = code; + maestro_data = data; +} + /** * \ingroup SIMIX_API * \brief Initialize SIMIX internal data. @@ -218,7 +227,10 @@ void SIMIX_global_init(int *argc, char **argv) surf_init(argc, argv); /* Initialize SURF structures */ SIMIX_context_mod_init(); - SIMIX_create_maestro_process(); + + // Either create a new context with maestro or create + // a context object with the current context mestro): + SIMIX_maestro_create(maestro_code, maestro_data); /* context exception handlers */ __xbt_running_ctx_fetch = SIMIX_process_get_running_context; diff --git a/src/simix/smx_private.h b/src/simix/smx_private.h index b194599ab7..e1bfefe2f4 100644 --- a/src/simix/smx_private.h +++ b/src/simix/smx_private.h @@ -213,7 +213,7 @@ typedef struct s_smx_synchro { XBT_PRIVATE void SIMIX_context_mod_init(void); XBT_PRIVATE void SIMIX_context_mod_exit(void); -smx_context_t SIMIX_context_new( +XBT_PRIVATE smx_context_t SIMIX_context_new( xbt_main_func_t code, int argc, char **argv, void_pfn_smxprocess_t cleanup_func, smx_process_t simix_process); diff --git a/src/simix/smx_process.cpp b/src/simix/smx_process.cpp index 9ced079ade..65d1f6c271 100644 --- a/src/simix/smx_process.cpp +++ b/src/simix/smx_process.cpp @@ -103,7 +103,8 @@ void SIMIX_process_cleanup(smx_process_t process) XBT_DEBUG("%p should not be run anymore",process); xbt_swag_remove(process, simix_global->process_list); - xbt_swag_remove(process, sg_host_simix(process->host)->process_list); + if (process->host) + xbt_swag_remove(process, sg_host_simix(process->host)->process_list); xbt_swag_insert(process, simix_global->process_to_destroy); process->context->iwannadie = 0; @@ -141,21 +142,32 @@ void SIMIX_process_empty_trash(void) /** * \brief Creates and runs the maestro process */ -void SIMIX_create_maestro_process() +void SIMIX_maestro_create(void (*code)(void*), void* data) { smx_process_t maestro = NULL; - /* Create maestro process and intilialize it */ maestro = xbt_new0(s_smx_process_t, 1); maestro->pid = 
simix_process_maxpid++; maestro->ppid = -1; - maestro->name = (char *) ""; + maestro->name = (char*) ""; + maestro->data = data; maestro->running_ctx = (xbt_running_ctx_t*) xbt_malloc0(sizeof(xbt_running_ctx_t)); XBT_RUNNING_CTX_INITIALIZE(maestro->running_ctx); - maestro->context = SIMIX_context_new(NULL, 0, NULL, NULL, maestro); + + if (!code) { + maestro->context = SIMIX_context_new(NULL, 0, nullptr, NULL, maestro); + } else { + if (!simix_global) + xbt_die("simix is not initialized, please call MSG_init first"); + maestro->context = + simix_global->context_factory->create_maestro( + std::bind(code, data), maestro); + } + maestro->simcall.issuer = maestro; simix_global->maestro_process = maestro; } + /** * \brief Stops a process. * @@ -327,6 +339,110 @@ smx_process_t SIMIX_process_create( return process; } +smx_process_t SIMIX_process_attach( + const char* name, + void *data, + const char* hostname, + xbt_dict_t properties, + smx_process_t parent_process) +{ + // This is mostly a copy/paste from SIMIX_process_new(), + // it'd be nice to share some code between those two functions. 
+ + sg_host_t host = sg_host_by_name(hostname); + XBT_DEBUG("Attach process %s on host '%s'", name, hostname); + + if (host->is_off()) { + XBT_WARN("Cannot launch process '%s' on failed host '%s'", + name, hostname); + return nullptr; + } + + smx_process_t process = xbt_new0(s_smx_process_t, 1); + /* Process data */ + process->pid = simix_process_maxpid++; + process->name = xbt_strdup(name); + process->host = host; + process->data = data; + process->comms = xbt_fifo_new(); + process->simcall.issuer = process; + process->ppid = -1; + /* Initiliaze data segment to default value */ + SIMIX_segment_index_set(process, -1); + if (parent_process != NULL) { + process->ppid = SIMIX_process_get_PID(parent_process); + /* SMPI process have their own data segment and + each other inherit from their father */ + #ifdef HAVE_SMPI + if(smpi_privatize_global_variables){ + if(parent_process->pid != 0){ + SIMIX_segment_index_set(process, parent_process->segment_index); + } else { + SIMIX_segment_index_set(process, process->pid - 1); + } + } + #endif + } + + /* Process data for auto-restart */ + process->auto_restart = false; + process->code = nullptr; + process->argc = 0; + process->argv = nullptr; + + XBT_VERB("Create context %s", process->name); + if (!simix_global) + xbt_die("simix is not initialized, please call MSG_init first"); + process->context = simix_global->context_factory->attach( + simix_global->cleanup_process_function, process); + + process->running_ctx = (xbt_running_ctx_t*) xbt_malloc0(sizeof(xbt_running_ctx_t)); + XBT_RUNNING_CTX_INITIALIZE(process->running_ctx); + + if(MC_is_active()){ + MC_ignore_heap(process->running_ctx, sizeof(*process->running_ctx)); + } + + /* Add properties */ + process->properties = properties; + + /* Add the process to it's host process list */ + xbt_swag_insert(process, sg_host_simix(host)->process_list); + + /* Now insert it in the global process list and in the process to run list */ + xbt_swag_insert(process, 
simix_global->process_list); + XBT_DEBUG("Inserting %s(%s) in the to_run list", process->name, sg_host_get_name(host)); + xbt_dynar_push_as(simix_global->process_to_run, smx_process_t, process); + + /* Tracing the process creation */ + TRACE_msg_process_create(process->name, process->pid, process->host); + + auto context = dynamic_cast(process->context); + if (!context) + xbt_die("Not a suitable context"); + + context->attach_start(); + return process; +} + +void SIMIX_process_detach(void) +{ + auto context = dynamic_cast(SIMIX_context_self()); + if (!context) + xbt_die("Not a suitable context"); + + simix_global->cleanup_process_function(context->process()); + + // Let maestro ignore we are still alive: + // xbt_swag_remove(context->process(), simix_global->process_list); + + // TODDO, Remove from proces list: + // xbt_swag_remove(process, sg_host_simix(host)->process_list); + + context->attach_stop(); + // delete context; +} + /** * \brief Executes the processes from simix_global->process_to_run. 
* @@ -906,7 +1022,11 @@ void SIMIX_process_yield(smx_process_t self) /* callback: context fetching */ xbt_running_ctx_t *SIMIX_process_get_running_context(void) { - return SIMIX_process_self()->running_ctx; + smx_process_t process = SIMIX_process_self(); + if (process) + return process->running_ctx; + else + return nullptr; } /* callback: termination */ diff --git a/src/simix/smx_process_private.h b/src/simix/smx_process_private.h index febf7a58b0..76653dc8f6 100644 --- a/src/simix/smx_process_private.h +++ b/src/simix/smx_process_private.h @@ -75,11 +75,11 @@ XBT_PRIVATE smx_process_t SIMIX_process_create( xbt_dict_t properties, int auto_restart, smx_process_t parent_process); + XBT_PRIVATE void SIMIX_process_runall(void); XBT_PRIVATE void SIMIX_process_kill(smx_process_t process, smx_process_t issuer); XBT_PRIVATE void SIMIX_process_killall(smx_process_t issuer, int reset_pid); XBT_PRIVATE smx_process_t SIMIX_process_create_from_wrapper(smx_process_arg_t args); -XBT_PRIVATE void SIMIX_create_maestro_process(void); XBT_PRIVATE void SIMIX_process_stop(smx_process_t arg); XBT_PRIVATE void SIMIX_process_cleanup(smx_process_t arg); XBT_PRIVATE void SIMIX_process_empty_trash(void); diff --git a/src/xbt/xbt_os_thread.c b/src/xbt/xbt_os_thread.c index 095a28c435..78c47333eb 100644 --- a/src/xbt/xbt_os_thread.c +++ b/src/xbt/xbt_os_thread.c @@ -103,10 +103,13 @@ void xbt_os_thread_mod_preinit(void) "pthread_key_create failed for xbt_self_thread_key"); main_thread = xbt_new(s_xbt_os_thread_t, 1); + main_thread->name = NULL; + main_thread->detached = 0; main_thread->name = (char *) "main"; - main_thread->start_routine = NULL; main_thread->param = NULL; + main_thread->start_routine = NULL; main_thread->running_ctx = xbt_new(xbt_running_ctx_t, 1); + main_thread->extra_data = NULL; XBT_RUNNING_CTX_INITIALIZE(main_thread->running_ctx); if ((errcode = pthread_setspecific(xbt_self_thread_key, main_thread))) @@ -1299,7 +1302,11 @@ void xbt_os_thread_set_extra_data(void *data) void 
*xbt_os_thread_get_extra_data(void) { - return xbt_os_thread_self()->extra_data; + xbt_os_thread_t thread = xbt_os_thread_self(); + if (thread) + return xbt_os_thread_self()->extra_data; + else + return NULL; } xbt_os_rmutex_t xbt_os_rmutex_init(void) diff --git a/tools/cmake/Tests.cmake b/tools/cmake/Tests.cmake index 8b7fb84fe1..4f8e30ff3c 100644 --- a/tools/cmake/Tests.cmake +++ b/tools/cmake/Tests.cmake @@ -210,6 +210,7 @@ IF(NOT enable_memcheck) ADD_TESH_FACTORIES(msg-sendrecv-CLM03 "thread;ucontext;raw;boost" --setenv srcdir=${CMAKE_HOME_DIRECTORY}/examples/msg --cd ${CMAKE_BINARY_DIR}/examples/msg ${CMAKE_HOME_DIRECTORY}/examples/msg/sendrecv/sendrecv_CLM03.tesh) ADD_TESH_FACTORIES(msg-sendrecv-Vegas "thread;ucontext;raw;boost" --setenv srcdir=${CMAKE_HOME_DIRECTORY}/examples/msg --cd ${CMAKE_BINARY_DIR}/examples/msg ${CMAKE_HOME_DIRECTORY}/examples/msg/sendrecv/sendrecv_Vegas.tesh) ADD_TESH_FACTORIES(msg-sendrecv-Reno "thread;ucontext;raw;boost" --setenv srcdir=${CMAKE_HOME_DIRECTORY}/examples/msg --cd ${CMAKE_BINARY_DIR}/examples/msg ${CMAKE_HOME_DIRECTORY}/examples/msg/sendrecv/sendrecv_Reno.tesh) + ADD_TESH_FACTORIES(msg-sendrecv-CLM03-main "thread" --setenv srcdir=${CMAKE_HOME_DIRECTORY}/examples/msg --cd ${CMAKE_BINARY_DIR}/examples/msg ${CMAKE_HOME_DIRECTORY}/examples/msg/sendrecv/sendrecv_CLM03_main.tesh) ADD_TESH_FACTORIES(msg-suspend "thread;ucontext;raw;boost" --setenv srcdir=${CMAKE_HOME_DIRECTORY}/examples/msg --cd ${CMAKE_BINARY_DIR}/examples/msg ${CMAKE_HOME_DIRECTORY}/examples/msg/suspend/suspend.tesh) # This one is not usable: # ADD_TESH_FACTORIES(msg-exception "thread;ucontext;raw" --setenv srcdir=${CMAKE_HOME_DIRECTORY}/examples/msg --cd ${CMAKE_BINARY_DIR}/examples/msg ${CMAKE_HOME_DIRECTORY}/examples/msg/exception/exception.tesh)