set(EXECUTABLE_OUTPUT_PATH "${CMAKE_CURRENT_BINARY_DIR}")
add_executable(sendrecv sendrecv.c)
+add_executable(sendrecv_main sendrecv_main.c)
### Add definitions for compile
target_link_libraries(sendrecv simgrid )
+target_link_libraries(sendrecv_main simgrid )
set(tesh_files
${tesh_files}
set(xml_files
${xml_files}
${CMAKE_CURRENT_SOURCE_DIR}/deployment_sendrecv.xml
+ ${CMAKE_CURRENT_SOURCE_DIR}/deployment_sendrecv_main.xml
PARENT_SCOPE
)
set(examples_src
${examples_src}
${CMAKE_CURRENT_SOURCE_DIR}/sendrecv.c
+ ${CMAKE_CURRENT_SOURCE_DIR}/sendrecv_main.c
PARENT_SCOPE
)
set(bin_files
--- /dev/null
+<?xml version='1.0'?>
+<!DOCTYPE platform SYSTEM "http://simgrid.gforge.inria.fr/simgrid/simgrid.dtd">
+<platform version="4"><!-- For using with ping_pong, platform_sendrecv.xml -->
+ <process host="Jupiter" function="receiver"/>
+</platform>
--- /dev/null
+#! ./tesh
+
+p Testing the deprecated CM02 network model
+
+! output sort 19
+$ $SG_TEST_EXENV sendrecv/sendrecv_main$EXEEXT ${srcdir:=.}/../platforms/small_platform.xml ${srcdir:=.}/sendrecv/deployment_sendrecv_main.xml --cfg=cpu/model:Cas01 --cfg=network/model:CM02 "--log=root.fmt:[%10.6r]%e(%i:%P@%h)%e%m%n"
+> [ 0.000000] (0:maestro@) Configuration change: Set 'cpu/model' to 'Cas01'
+> [ 0.000000] (0:maestro@) Configuration change: Set 'network/model' to 'CM02'
+> [ 0.000000] (0:maestro@) test_all
+> [ 0.000000] (1:sender@Tremblay) sender
+> [ 0.000000] (1:sender@Tremblay) host = Jupiter
+> [ 0.000000] (1:sender@Tremblay) task_la->data = 0.000000e+00
+> [ 0.000000] (2:receiver@Jupiter) receiver
+> [ 0.001462] (2:receiver@Jupiter) Task received : latency task
+> [ 0.001462] (2:receiver@Jupiter) Communic. time 1.461656e-03
+> [ 0.001462] (2:receiver@Jupiter) --- la 0.001462 ----
+> [ 0.001462] (1:sender@Tremblay) task_bw->data = 1.461656e-03
+> [138.703988] (2:receiver@Jupiter) Task received : bandwidth task
+> [138.703988] (2:receiver@Jupiter) Communic. time 1.387025e+02
+> [138.703988] (2:receiver@Jupiter) --- bw 7209674.030423 ----
+> [138.703988] (0:maestro@) Total simulation time: 1.387040e+02
--- /dev/null
+/* Copyright (c) 2007-2015. The SimGrid Team.
+ * All rights reserved. */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
+#include <stdio.h>
+
+#include "simgrid/msg.h" /* Yeah! If you want to use msg, you need to include simgrid/msg.h */
+#include "xbt/sysdep.h" /* calloc */
+
+/* Create a log channel to have nice outputs. */
+#include "xbt/log.h"
+#include "xbt/asserts.h"
+
+/** @addtogroup MSG_examples
+ *
+ * - <b>sendrecv/sendrecv_main.c: Ping-pong example where the sender runs in main()</b>.
+ * It's hard to think of a simpler example. The tesh files lying in the
+ * directory are instructive concerning the way to pass options to the simulators (as described in \ref options).
+ */
+
+XBT_LOG_NEW_DEFAULT_CATEGORY(msg_test,
+ "Messages specific for this msg example");
+
+int sender(int argc, char *argv[]);
+int receiver(int argc, char *argv[]);
+
+double task_comm_size_lat = 1;
+double task_comm_size_bw = 10e8;
+
+/** Emitter function
+ *
+ * Sends two tasks to the mailbox named by argv[1]: first a "latency task"
+ * of task_comm_size_lat bytes, then a "bandwidth task" of task_comm_size_bw
+ * bytes. Each task carries a heap-allocated double holding the simulated
+ * clock read just before the task was created (the receiver frees it).
+ *
+ * Aborts if argv[1] does not name a known host. Always returns 0.
+ */
+int sender(int argc, char *argv[])
+{
+  char name_la[64];
+  char name_bw[64];
+  msg_task_t latency_task = NULL;
+  msg_task_t bandwidth_task = NULL;
+  msg_host_t target = NULL;
+  double now;
+
+  XBT_INFO("sender");
+  XBT_INFO("host = %s", argv[1]);
+
+  /* The destination must exist in the platform */
+  target = MSG_host_by_name(argv[1]);
+  if (target == NULL) {
+    XBT_INFO("Unknown host %s. Stopping Now! ", argv[1]);
+    abort();
+  }
+
+  /* Latency: a tiny payload stamped with the current clock */
+  now = MSG_get_clock();
+  sprintf(name_la, "latency task");
+  latency_task = MSG_task_create(name_la, 0.0, task_comm_size_lat, NULL);
+  latency_task->data = xbt_new(double, 1);
+  *(double *) latency_task->data = now;
+  XBT_INFO("task_la->data = %e", *((double *) latency_task->data));
+  MSG_task_send(latency_task, argv[1]);
+
+  /* Bandwidth: a large payload stamped with the current clock */
+  now = MSG_get_clock();
+  sprintf(name_bw, "bandwidth task");
+  bandwidth_task = MSG_task_create(name_bw, 0.0, task_comm_size_bw, NULL);
+  bandwidth_task->data = xbt_new(double, 1);
+  *(double *) bandwidth_task->data = now;
+  XBT_INFO("task_bw->data = %e", *((double *) bandwidth_task->data));
+  MSG_task_send(bandwidth_task, argv[1]);
+
+  return 0;
+}                               /* end_of_sender */
+
+/** Receiver function
+ *
+ * Receives two tasks on the mailbox named after the local host. The data
+ * of each task is read as the sender's clock at emission time; the
+ * difference with the current clock is logged as the communication time.
+ * The first task gives the latency figure, the second one the bandwidth
+ * figure (task_comm_size_bw divided by the communication time).
+ *
+ * Dies if a receive does not return MSG_OK. Always returns 0 otherwise.
+ */
+int receiver(int argc, char *argv[])
+{
+  msg_task_t latency_task = NULL;
+  msg_task_t bandwidth_task = NULL;
+  double received_at, sent_at;
+  double elapsed = 0;
+  int status;
+
+  XBT_INFO("receiver");
+
+  /* Get Latency */
+  status = MSG_task_receive(&latency_task, MSG_host_get_name(MSG_host_self()));
+  if (status != MSG_OK) {
+    xbt_die("Unexpected behavior");
+  }
+  received_at = MSG_get_clock();
+  sent_at = *((double *) (latency_task->data));
+  elapsed = received_at - sent_at;
+  XBT_INFO("Task received : %s", latency_task->name);
+  /* The timestamp was allocated by the sender; release it with the task */
+  xbt_free(latency_task->data);
+  MSG_task_destroy(latency_task);
+  XBT_INFO("Communic. time %e", elapsed);
+  XBT_INFO("--- la %f ----", elapsed);
+
+  /* Get Bandwidth */
+  status = MSG_task_receive(&bandwidth_task, MSG_host_get_name(MSG_host_self()));
+  if (status != MSG_OK) {
+    xbt_die("Unexpected behavior");
+  }
+  received_at = MSG_get_clock();
+  sent_at = *((double *) (bandwidth_task->data));
+  elapsed = received_at - sent_at;
+  XBT_INFO("Task received : %s", bandwidth_task->name);
+  xbt_free(bandwidth_task->data);
+  MSG_task_destroy(bandwidth_task);
+  XBT_INFO("Communic. time %e", elapsed);
+  XBT_INFO("--- bw %f ----", task_comm_size_bw / elapsed);
+
+  return 0;
+}                               /* end_of_receiver */
+
+struct application { /* Input files of the simulation, handed from main() to test_all() */
+ const char* platform_file; /* passed to MSG_create_environment() */
+ const char* application_file; /* passed to MSG_launch_application() */
+};
+
+/** Test function
+ *
+ * Loads the platform, attaches the calling thread to the simulation as the
+ * "sender" process on host Tremblay, deploys the application described in
+ * app->application_file, runs the sender code inline and finally detaches.
+ *
+ * Always returns MSG_OK.
+ */
+static msg_error_t test_all(struct application* app)
+{
+  /* Arguments handed to sender(): its name and the destination host */
+  const char* sender_argv[3] = { "sender", "Jupiter", NULL };
+
+  XBT_INFO("test_all");
+
+  /* Simulation setting */
+  MSG_create_environment(app->platform_file);
+
+  /* Become one of the simulated processes.
+   *
+   * This must be done after the creation of the platform
+   * because attaching requires an existing host. */
+  MSG_process_attach("sender", NULL, MSG_host_by_name("Tremblay"), NULL);
+
+  /* Application deployment */
+  MSG_function_register("receiver", receiver);
+  MSG_launch_application(app->application_file);
+
+  /* Execute the sender code in the current (attached) thread */
+  sender(2, (char**) sender_argv);
+
+  MSG_process_detach();
+  return MSG_OK;
+}                               /* end_of_test_all */
+
+/** Maestro code registered through SIMIX_set_maestro() in main().
+ *
+ * The data pointer is not used: this only runs the MSG main loop.
+ */
+static void maestro(void* data)
+{
+  (void) data;
+  MSG_main();
+}
+
+/** Main function
+ *
+ * Usage: <program> platform_file deployment_file [simgrid options]
+ *
+ * Registers maestro() as the maestro code, initializes MSG, validates the
+ * command line, then runs test_all(). Returns 0 when test_all() reports
+ * MSG_OK and 1 otherwise.
+ */
+int main(int argc, char *argv[])
+{
+  msg_error_t res = MSG_OK;
+  struct application app = { NULL, NULL };
+
+#ifdef _MSC_VER
+  unsigned int prev_exponent_format =
+      _set_output_format(_TWO_DIGIT_EXPONENT);
+#endif
+
+  /* The maestro code must be registered before MSG_init() */
+  SIMIX_set_maestro(maestro, &app);
+  MSG_init(&argc, argv);
+
+  if (argc != 3) {
+    XBT_CRITICAL("Usage: %s platform_file deployment_file\n",
+                 argv[0]);
+    xbt_die("example: %s msg_platform.xml msg_deployment.xml\n",argv[0]);
+  }
+
+  /* Only read the file names once the argument count has been validated:
+   * reading argv[1]/argv[2] earlier is an out-of-bounds access when the
+   * program is started without enough arguments. */
+  app.platform_file = argv[1];
+  app.application_file = argv[2];
+
+  res = test_all(&app);
+
+  XBT_INFO("Total simulation time: %e", MSG_get_clock());
+
+#ifdef _MSC_VER
+  _set_output_format(prev_exponent_format);
+#endif
+
+  return (res == MSG_OK) ? 0 : 1;
+}                               /* end_of_main */
char **argv,
xbt_dict_t
properties);
+XBT_PUBLIC(msg_process_t) MSG_process_attach(
+ const char *name, void *data,
+ msg_host_t host, xbt_dict_t properties);
+XBT_PUBLIC(void) MSG_process_detach(void);
+
XBT_PUBLIC(void) MSG_process_kill(msg_process_t process);
XBT_PUBLIC(int) MSG_process_killall(int reset_PIDs);
XBT_PUBLIC(msg_error_t) MSG_process_migrate(msg_process_t process, msg_host_t host);
/* Initialization and exit */
XBT_PUBLIC(void) SIMIX_global_init(int *argc, char **argv);
+/* Set the code to execute in the maestro
+ *
+ * If no maestro code is registered (the default), the main thread
+ * is assumed to be the maestro. */
+XBT_PUBLIC(void) SIMIX_set_maestro(void (*code)(void*), void* data);
XBT_PUBLIC(void) SIMIX_function_register_process_cleanup(void_pfn_smxprocess_t function);
XBT_PUBLIC(void) SIMIX_function_register_process_create(smx_creation_func_t function);
double process_start_time,
double process_kill_time);
+/****************************** Process attachment *****************************/
+/* Functions for running a process in main()
+ *
+ * 1. create the maestro process
+ * 2. attach (create a context and wait for maestro to give control back to you)
+ * 3. do your process's job
+ * 4. detach (this waits for the simulation to terminate)
+ */
+
+XBT_PUBLIC(void) SIMIX_maestro_create(void (*code)(void*), void* data);
+XBT_PUBLIC(smx_process_t) SIMIX_process_attach(
+ const char* name,
+ void *data,
+ const char* hostname,
+ xbt_dict_t properties,
+ smx_process_t parent_process);
+XBT_PUBLIC(void) SIMIX_process_detach(void);
+
/*********************************** Host *************************************/
XBT_PUBLIC(sg_host_t) SIMIX_host_self(void);
XBT_PUBLIC(const char*) SIMIX_host_self_get_name(void);
return promise.get_future().get();
}
+/** Owning wrapper around a malloc-allocated (argc, argv) pair.
+ *
+ * The wrapper takes ownership of `argv` and of every `argv[i]` for
+ * i < argc, and releases them with free() on clear() or destruction.
+ * It is movable but not copyable.
+ */
+class args {
+private:
+  int argc_;
+  char** argv_;
+public:
+
+  // Main constructors
+  args() : argc_(0), argv_(nullptr) {}
+  args(int argc, char** argv) : argc_(argc), argv_(argv) {}
+
+  // Free every owned string, then the array itself, and reset to empty
+  void clear()
+  {
+    for (int i = 0; i < this->argc_; i++)
+      free(this->argv_[i]);
+    free(this->argv_);
+    this->argc_ = 0;
+    this->argv_ = nullptr;
+  }
+  ~args() { clear(); }
+
+  // Copy
+  args(args const& that) = delete;
+  args& operator=(args const& that) = delete;
+
+  // Move: steal the buffers and leave the source empty
+  args(args&& that) : argc_(that.argc_), argv_(that.argv_)
+  {
+    that.argc_ = 0;
+    that.argv_ = nullptr;
+  }
+  args& operator=(args&& that)
+  {
+    // Self-move must keep the content; otherwise release what we own
+    // before taking over the buffers of `that` (no leak).
+    if (this != &that) {
+      this->clear();
+      this->argc_ = that.argc_;
+      this->argv_ = that.argv_;
+      that.argc_ = 0;
+      that.argv_ = nullptr;
+    }
+    return *this;
+  }
+
+  int argc() const { return argc_; }
+  char** argv() { return argv_; }
+  const char*const* argv() const { return argv_; }
+  char* operator[](std::size_t i) { return argv_[i]; }
+};
+
+/** Wrap a C-style main function and its arguments into a nullary callable.
+ *
+ * When `code` is set, the (argc, argv) pair is stored in a shared `args`
+ * object captured by the returned closure. When `code` is null, an empty
+ * std::function is returned.
+ */
+inline
+std::function<void()> wrap_main(xbt_main_func_t code, int argc, char **argv)
+{
+  if (!code) {
+    // TODO, we should free argv
+    return std::function<void()>();
+  }
+
+  auto owned_args = std::make_shared<simgrid::simix::args>(argc, argv);
+  return [code, owned_args]() {
+    code(owned_args->argc(), owned_args->argv());
+  };
+}
+
class Context;
class ContextFactory;
virtual ~ContextFactory();
virtual Context* create_context(std::function<void()> code,
void_pfn_smxprocess_t cleanup, smx_process_t process) = 0;
+
+ // Optional methods for attaching main() as a context:
+
+ /** Creates a context from the current context of execution
+ *
+ * This will not work on all implementation of `ContextFactory`.
+ */
+ virtual Context* attach(void_pfn_smxprocess_t cleanup_func, smx_process_t process);
+ virtual Context* create_maestro(std::function<void()> code, smx_process_t process);
+
virtual void run_all() = 0;
virtual Context* self();
std::string const& name() const
virtual void suspend() = 0;
};
+/** A Context which can be attached to an already running thread of
+ *  execution and later detached from it.
+ */
+XBT_PUBLIC_CLASS AttachContext : public Context {
+public:
+
+  AttachContext(std::function<void()> code,
+                void_pfn_smxprocess_t cleanup_func,
+                smx_process_t process)
+    : Context(std::move(code), cleanup_func, process)
+  {}
+
+  ~AttachContext();
+
+  /** Hook invoked by the attached context once it is ready to hand
+   *  control over to the maestro.
+   */
+  virtual void attach_start() = 0;
+
+  /** Hook invoked by the attached context once its job is done. */
+  virtual void attach_stop() = 0;
+};
+
}
}
return process;
}
+/* Entry point with a main-like signature which runs the MSG main loop
+ * and forwards its result. The arguments are ignored. */
+static int MSG_maestro(int argc, char** argv)
+{
+  return MSG_main();
+}
+
+/* Become a process in the simulation
+ *
+ * Currently this can only be called by the main thread (once) and only work
+ * with some thread factories (currently ThreadContextFactory).
+ *
+ * In the future, it might be extended in order to attach other threads created
+ * by a third party library.
+ *
+ * \param name       name of the attached process
+ * \param data       user data stored in the MSG process data
+ * \param host       host on which the process is attached (must not be NULL)
+ * \param properties properties handed to SIMIX_process_attach()
+ * \return the attached process; dies if the attachment fails
+ */
+msg_process_t MSG_process_attach(
+  const char *name, void *data,
+  msg_host_t host, xbt_dict_t properties)
+{
+  /* There is no code parameter here: only the host can be invalid */
+  xbt_assert(host != NULL, "Invalid parameters: host param must not be NULL");
+  simdata_process_t simdata = xbt_new0(s_simdata_process_t, 1);
+  msg_process_t process;
+
+  /* Simulator data for MSG */
+  simdata->waiting_action = NULL;
+  simdata->waiting_task = NULL;
+  simdata->m_host = host;
+  simdata->argc = 0;
+  simdata->argv = NULL;
+  simdata->data = data;
+  simdata->last_errno = MSG_OK;
+
+  /* Attach the current thread as a SIMIX process on this host */
+  process = SIMIX_process_attach(name, simdata, sg_host_get_name(host), properties, NULL);
+  if (!process)
+    xbt_die("Could not attach");
+  simcall_process_on_exit(process,(int_f_pvoid_pvoid_t)TRACE_msg_process_kill,process);
+  return process;
+}
+
+/** Detach a process previously attached with `MSG_process_attach()`
+ *
+ * To be called once the current process has finished its job.
+ * When used in the main thread, it waits for the simulation to finish
+ * before returning. When it returns, the other simulated processes and
+ * the maestro are destroyed.
+ *
+ * This simply delegates to SIMIX_process_detach().
+ */
+void MSG_process_detach(void)
+{
+  SIMIX_process_detach();
+}
+
/** \ingroup m_process_management
* \param process poor victim
*
#include "mc/mc.h"
#include <src/simix/smx_private.h>
+#include <src/simix/smx_private.hpp>
void SIMIX_process_set_cleanup_function(
smx_process_t process, void_pfn_smxprocess_t cleanup)
process->context->set_cleanup(cleanup);
}
-namespace simgrid {
-namespace simix {
-
-class XBT_PRIVATE args {
-private:
- int argc_;
- char** argv_;
-public:
-
- // Main constructors
- args() : argc_(0), argv_(nullptr) {}
- args(int argc, char** argv) : argc_(argc), argv_(argv) {}
-
- // Free
- void clear()
- {
- for (int i = 0; i < this->argc_; i++)
- free(this->argv_[i]);
- free(this->argv_);
- this->argc_ = 0;
- this->argv_ = nullptr;
- }
- ~args() { clear(); }
-
- // Copy
- args(args const& that) = delete;
- args& operator=(args const& that) = delete;
-
- // Move:
- args(args&& that) : argc_(that.argc_), argv_(that.argv_)
- {
- that.argc_ = 0;
- that.argv_ = nullptr;
- }
- args& operator=(args&& that)
- {
- this->argc_ = that.argc_;
- this->argv_ = that.argv_;
- that.argc_ = 0;
- that.argv_ = nullptr;
- return *this;
- }
-
- int argc() const { return argc_; }
- char** argv() { return argv_; }
- const char*const* argv() const { return argv_; }
- char* operator[](std::size_t i) { return argv_[i]; }
-};
-
-}
-}
-
-static
-std::function<void()> wrap_main(xbt_main_func_t code, int argc, char **argv)
-{
- if (code) {
- auto arg = std::make_shared<simgrid::simix::args>(argc, argv);
- return [=]() {
- code(arg->argc(), arg->argv());
- };
- } else return std::function<void()>();
-}
-
/**
* \brief creates a new context for a user level process
* \param code a main function
if (!simix_global)
xbt_die("simix is not initialized, please call MSG_init first");
return simix_global->context_factory->create_context(
- wrap_main(code, argc, argv), cleanup_func, simix_process);
+ simgrid::simix::wrap_main(code, argc, argv), cleanup_func, simix_process);
}
namespace simgrid {
#endif
}
+/** Default implementation: attaching is not supported, so this dies.
+ *  Factories which support it override this method.
+ */
+Context* ContextFactory::attach(void_pfn_smxprocess_t cleanup_func, smx_process_t process)
+{
+  xbt_die("Cannot attach with this ContextFactory.\nTry using --cfg=contexts/factory:thread instead.\n");
+}
+
+/** Default implementation: a dedicated maestro context is not supported,
+ *  so this dies. Factories which support it override this method.
+ */
+Context* ContextFactory::create_maestro(std::function<void()> code, smx_process_t process)
+{
+  xbt_die("Cannot create_maestro with this ContextFactory.\nTry using --cfg=contexts/factory:thread instead.\n");
+}
+
Context::Context(std::function<void()> code,
void_pfn_smxprocess_t cleanup_func, smx_process_t process)
: code_(std::move(code)), process_(process), iwannadie(false)
this->iwannadie = true;
}
+// Out-of-line destructor: there is nothing specific to release.
+AttachContext::~AttachContext() = default;
+
}
}
\ No newline at end of file
std::function<void()> code,
void_pfn_smxprocess_t cleanup, smx_process_t process)
{
- return this->new_context<ThreadContext>(std::move(code), cleanup, process);
-}
-
-ThreadContext::ThreadContext(std::function<void()> code,
- void_pfn_smxprocess_t cleanup, smx_process_t process)
- : Context(std::move(code), cleanup, process)
-{
- /* If the user provided a function for the process then use it
- otherwise is the context for maestro */
- if (has_code()) {
- this->begin_ = xbt_os_sem_init(0);
- this->end_ = xbt_os_sem_init(0);
- if (smx_context_stack_size_was_set)
- xbt_os_thread_setstacksize(smx_context_stack_size);
- if (smx_context_guard_size_was_set)
- xbt_os_thread_setguardsize(smx_context_guard_size);
-
- /* create and start the process */
- /* NOTE: The first argument to xbt_os_thread_create used to be the process *
- * name, but now the name is stored at SIMIX level, so we pass a null */
- this->thread_ =
- xbt_os_thread_create(NULL, ThreadContext::wrapper, this, this);
-
- /* wait the starting of the newly created process */
- xbt_os_sem_acquire(this->end_);
-
- } else {
- xbt_os_thread_set_extra_data(this);
- }
+ return this->new_context<ThreadContext>(std::move(code), cleanup, process, !code);
}
void ThreadContextFactory::run_all()
return static_cast<ThreadContext*>(xbt_os_thread_get_extra_data());
}
+/** Create a (non-maestro) ThreadContext without any code: such a context
+ *  binds itself to the calling thread instead of spawning a new one.
+ */
+ThreadContext* ThreadContextFactory::attach(void_pfn_smxprocess_t cleanup_func, smx_process_t process)
+{
+  const bool is_maestro = false;
+  return this->new_context<ThreadContext>(
+    std::function<void()>(), cleanup_func, process, is_maestro);
+}
+
+/** Create the maestro ThreadContext running `code`, without any cleanup
+ *  function.
+ */
+ThreadContext* ThreadContextFactory::create_maestro(std::function<void()> code, smx_process_t process)
+{
+  const bool is_maestro = true;
+  return this->new_context<ThreadContext>(
+    std::move(code), nullptr, process, is_maestro);
+}
+
+/** Build a thread-based context.
+ *
+ * With some code, a new thread is spawned (running maestro_wrapper when
+ * `maestro` is set, wrapper otherwise) and the constructor waits until
+ * that thread signals it has started. Without code, the context is bound
+ * to the calling thread.
+ */
+ThreadContext::ThreadContext(std::function<void()> code,
+    void_pfn_smxprocess_t cleanup, smx_process_t process, bool maestro)
+  : AttachContext(std::move(code), cleanup, process)
+{
+  // Both semaphores are always created. They are not strictly needed when
+  // the maestro lives in main() (maestro without code).
+  this->begin_ = xbt_os_sem_init(0);
+  this->end_ = xbt_os_sem_init(0);
+
+  /* No code was provided: attach to the current thread */
+  if (!has_code()) {
+    xbt_os_thread_set_extra_data(this);
+    return;
+  }
+
+  /* Some code was provided: run it in a dedicated thread */
+  if (smx_context_stack_size_was_set)
+    xbt_os_thread_setstacksize(smx_context_stack_size);
+  if (smx_context_guard_size_was_set)
+    xbt_os_thread_setguardsize(smx_context_guard_size);
+
+  /* create and start the process */
+  /* NOTE: The first argument to xbt_os_thread_create used to be the process *
+   * name, but now the name is stored at SIMIX level, so we pass a null */
+  void* (*entry_point)(void*) =
+    maestro ? ThreadContext::maestro_wrapper : ThreadContext::wrapper;
+  this->thread_ = xbt_os_thread_create(NULL, entry_point, this, this);
+
+  /* wait the starting of the newly created process */
+  xbt_os_sem_acquire(this->end_);
+}
+
+
ThreadContext::~ThreadContext()
{
/* check if this is the context of maestro (it doesn't have a real thread) */
#endif
/* Tell the maestro we are starting, and wait for its green light */
xbt_os_sem_release(context->end_);
+
xbt_os_sem_acquire(context->begin_);
if (smx_ctx_thread_sem) /* parallel run */
xbt_os_sem_acquire(smx_ctx_thread_sem);
(*context)();
context->stop();
+
+ return nullptr;
+}
+
+/** Thread entry point used when the maestro runs in its own thread.
+ *
+ * It signals its creator through `end_`, waits on `begin_` until it is
+ * given control, runs the context code, then signals `end_` again.
+ */
+void *ThreadContext::maestro_wrapper(void *param)
+{
+  ThreadContext* self = static_cast<ThreadContext*>(param);
+
+#ifndef WIN32
+  /* Install alternate signal stack, for SIGSEGV handler. */
+  stack_t altstack;
+  altstack.ss_sp = sigsegv_stack;
+  altstack.ss_size = sizeof sigsegv_stack;
+  altstack.ss_flags = 0;
+  sigaltstack(&altstack, nullptr);
+#endif
+
+  /* Tell the caller we are starting */
+  xbt_os_sem_release(self->end_);
+
+  // Block until the caller gives control back to us, then run the code:
+  xbt_os_sem_acquire(self->begin_);
+  (*self)();
+
+  // Tell main that we have finished:
+  xbt_os_sem_release(self->end_);
+
return nullptr;
}
+/** Block until this context is given the green light through `begin_`;
+ *  in parallel runs, additionally take a slot of `smx_ctx_thread_sem`.
+ */
+void ThreadContext::start()
+{
+  xbt_os_sem_acquire(this->begin_);
+  if (smx_ctx_thread_sem != nullptr) {
+    /* parallel run */
+    xbt_os_sem_acquire(smx_ctx_thread_sem);
+  }
+}
+
+
void ThreadContext::stop()
{
Context::stop();
xbt_os_sem_acquire(smx_ctx_thread_sem);
}
+/** Hand control over to the maestro, then wait for our own turn. */
+void ThreadContext::attach_start()
+{
+  // We're breaking the layers here by depending on the upper layer:
+  ThreadContext* maestro_context =
+    (ThreadContext*) simix_global->maestro_process->context;
+
+  // Wake the maestro up, then block until it schedules us:
+  xbt_os_sem_release(maestro_context->begin_);
+  this->start();
+}
+
+
+/** Signal that the attached context is done, wait for the maestro to
+ *  finish, then unbind the context from the current thread.
+ */
+void ThreadContext::attach_stop()
+{
+  // Give our parallel-run slot back (if any) and signal our termination:
+  if (smx_ctx_thread_sem != nullptr)
+    xbt_os_sem_release(smx_ctx_thread_sem);
+  xbt_os_sem_release(this->end_);
+
+  // Wait until the maestro signals its own termination:
+  ThreadContext* maestro_context =
+    (ThreadContext*) simix_global->maestro_process->context;
+  xbt_os_sem_acquire(maestro_context->end_);
+
+  // The current thread is not a simulated process anymore:
+  xbt_os_thread_set_extra_data(nullptr);
+}
+
+
}
}
class ThreadContext;
class ThreadContextFactory;
-class ThreadContext : public Context {
+class ThreadContext : public AttachContext {
public:
friend ThreadContextFactory;
ThreadContext(std::function<void()> code,
void_pfn_smxprocess_t cleanup_func,
- smx_process_t process);
+ smx_process_t process, bool maestro =false);
~ThreadContext();
void stop() override;
void suspend() override;
+ void attach_start() override;
+ void attach_stop() override;
private:
/** A portable thread */
xbt_os_thread_t thread_ = nullptr;
xbt_os_sem_t end_ = nullptr;
private:
static void* wrapper(void *param);
+ static void* maestro_wrapper(void *param);
+public:
+ void start();
};
class ThreadContextFactory : public ContextFactory {
void_pfn_smxprocess_t cleanup_func, smx_process_t process) override;
void run_all() override;
ThreadContext* self() override;
+
+ // Optional methods:
+ ThreadContext* attach(void_pfn_smxprocess_t cleanup_func, smx_process_t process) override;
+ ThreadContext* create_maestro(std::function<void()> code, smx_process_t process) override;
};
}
SIMIX_storage_create(key, storage, NULL);
}
+static void (*maestro_code)(void*) = nullptr;
+static void* maestro_data = nullptr;
+
+/* Record the code (and its argument) that SIMIX_global_init() hands to
+ * SIMIX_maestro_create(). Nothing is executed here. */
+void SIMIX_set_maestro(void (*code)(void*), void* data)
+{
+  maestro_data = data;
+  maestro_code = code;
+}
+
/**
* \ingroup SIMIX_API
* \brief Initialize SIMIX internal data.
surf_init(argc, argv); /* Initialize SURF structures */
SIMIX_context_mod_init();
- SIMIX_create_maestro_process();
+
+ // Either create a new context running the maestro code, or create
+ // a context object wrapping the current context (maestro):
+ SIMIX_maestro_create(maestro_code, maestro_data);
/* context exception handlers */
__xbt_running_ctx_fetch = SIMIX_process_get_running_context;
XBT_PRIVATE void SIMIX_context_mod_init(void);
XBT_PRIVATE void SIMIX_context_mod_exit(void);
-smx_context_t SIMIX_context_new(
+XBT_PRIVATE smx_context_t SIMIX_context_new(
xbt_main_func_t code, int argc, char **argv,
void_pfn_smxprocess_t cleanup_func,
smx_process_t simix_process);
XBT_DEBUG("%p should not be run anymore",process);
xbt_swag_remove(process, simix_global->process_list);
- xbt_swag_remove(process, sg_host_simix(process->host)->process_list);
+ if (process->host)
+ xbt_swag_remove(process, sg_host_simix(process->host)->process_list);
xbt_swag_insert(process, simix_global->process_to_destroy);
process->context->iwannadie = 0;
/**
* \brief Creates and runs the maestro process
*/
-void SIMIX_create_maestro_process()
+void SIMIX_maestro_create(void (*code)(void*), void* data)
{
smx_process_t maestro = NULL;
-
/* Create maestro process and intilialize it */
maestro = xbt_new0(s_smx_process_t, 1);
maestro->pid = simix_process_maxpid++;
maestro->ppid = -1;
- maestro->name = (char *) "";
+ maestro->name = (char*) "";
+ maestro->data = data;
maestro->running_ctx = (xbt_running_ctx_t*) xbt_malloc0(sizeof(xbt_running_ctx_t));
XBT_RUNNING_CTX_INITIALIZE(maestro->running_ctx);
- maestro->context = SIMIX_context_new(NULL, 0, NULL, NULL, maestro);
+
+  if (code) {
+    // Some maestro code was registered: ask the factory for a dedicated
+    // maestro context running code(data).
+    if (!simix_global)
+      xbt_die("simix is not initialized, please call MSG_init first");
+    maestro->context =
+      simix_global->context_factory->create_maestro(
+        std::bind(code, data), maestro);
+  } else {
+    // No maestro code: create a context without any code of its own.
+    maestro->context = SIMIX_context_new(NULL, 0, nullptr, NULL, maestro);
+  }
+
maestro->simcall.issuer = maestro;
simix_global->maestro_process = maestro;
}
+
/**
* \brief Stops a process.
*
return process;
}
+/**
+ * \brief Attach the current thread of execution as a new simulated process
+ *
+ * \param name           name of the process (copied)
+ * \param data           user data stored in the process
+ * \param hostname       name of the host on which the process is attached
+ * \param properties     properties stored in the process
+ * \param parent_process parent of the process, or NULL
+ * \return the new process, or nullptr if the host is unknown or off
+ *
+ * The process is registered and scheduled, then `attach_start()` is called
+ * on its context before returning. Dies if simix is not initialized or if
+ * the context factory does not produce an AttachContext.
+ */
+smx_process_t SIMIX_process_attach(
+  const char* name,
+  void *data,
+  const char* hostname,
+  xbt_dict_t properties,
+  smx_process_t parent_process)
+{
+  // This is mostly a copy/paste from SIMIX_process_new(),
+  // it'd be nice to share some code between those two functions.
+
+  sg_host_t host = sg_host_by_name(hostname);
+  XBT_DEBUG("Attach process %s on host '%s'", name, hostname);
+
+  // Do not dereference the result of a failed host lookup:
+  if (host == nullptr) {
+    XBT_WARN("Cannot launch process '%s' on unknown host '%s'",
+             name, hostname);
+    return nullptr;
+  }
+
+  if (host->is_off()) {
+    XBT_WARN("Cannot launch process '%s' on failed host '%s'",
+             name, hostname);
+    return nullptr;
+  }
+
+  smx_process_t process = xbt_new0(s_smx_process_t, 1);
+  /* Process data */
+  process->pid = simix_process_maxpid++;
+  process->name = xbt_strdup(name);
+  process->host = host;
+  process->data = data;
+  process->comms = xbt_fifo_new();
+  process->simcall.issuer = process;
+  process->ppid = -1;
+  /* Initialize data segment to default value */
+  SIMIX_segment_index_set(process, -1);
+  if (parent_process != NULL) {
+    process->ppid = SIMIX_process_get_PID(parent_process);
+    /* SMPI processes have their own data segment and
+       each other inherits from its father */
+    #ifdef HAVE_SMPI
+    if(smpi_privatize_global_variables){
+      if(parent_process->pid != 0){
+        SIMIX_segment_index_set(process, parent_process->segment_index);
+      } else {
+        SIMIX_segment_index_set(process, process->pid - 1);
+      }
+    }
+    #endif
+  }
+
+  /* Process data for auto-restart: an attached process has no code to rerun */
+  process->auto_restart = false;
+  process->code = nullptr;
+  process->argc = 0;
+  process->argv = nullptr;
+
+  XBT_VERB("Create context %s", process->name);
+  if (!simix_global)
+    xbt_die("simix is not initialized, please call MSG_init first");
+  process->context = simix_global->context_factory->attach(
+    simix_global->cleanup_process_function, process);
+
+  process->running_ctx = (xbt_running_ctx_t*) xbt_malloc0(sizeof(xbt_running_ctx_t));
+  XBT_RUNNING_CTX_INITIALIZE(process->running_ctx);
+
+  if(MC_is_active()){
+    MC_ignore_heap(process->running_ctx, sizeof(*process->running_ctx));
+  }
+
+  /* Add properties */
+  process->properties = properties;
+
+  /* Add the process to its host's process list */
+  xbt_swag_insert(process, sg_host_simix(host)->process_list);
+
+  /* Now insert it in the global process list and in the process to run list */
+  xbt_swag_insert(process, simix_global->process_list);
+  XBT_DEBUG("Inserting %s(%s) in the to_run list", process->name, sg_host_get_name(host));
+  xbt_dynar_push_as(simix_global->process_to_run, smx_process_t, process);
+
+  /* Tracing the process creation */
+  TRACE_msg_process_create(process->name, process->pid, process->host);
+
+  /* Only contexts supporting attachment can be used here */
+  auto context = dynamic_cast<simgrid::simix::AttachContext*>(process->context);
+  if (!context)
+    xbt_die("Not a suitable context");
+
+  context->attach_start();
+  return process;
+}
+
+/**
+ * \brief Detach the current process, which must have been attached
+ *
+ * Runs the registered process cleanup function on the current process and
+ * then calls `attach_stop()` on its context. Dies if the current context
+ * is not an AttachContext.
+ */
+void SIMIX_process_detach(void)
+{
+  auto self_context = dynamic_cast<simgrid::simix::AttachContext*>(SIMIX_context_self());
+  if (!self_context)
+    xbt_die("Not a suitable context");
+
+  simix_global->cleanup_process_function(self_context->process());
+
+  // TODO: the process is currently removed neither from the global
+  // process list nor from the process list of its host, and the context
+  // is not deleted here.
+
+  self_context->attach_stop();
+}
+
/**
* \brief Executes the processes from simix_global->process_to_run.
*
/* callback: context fetching */
xbt_running_ctx_t *SIMIX_process_get_running_context(void)
{
- return SIMIX_process_self()->running_ctx;
+  // There may be no current process: report that with a null pointer
+  smx_process_t self = SIMIX_process_self();
+  return self ? self->running_ctx : nullptr;
}
/* callback: termination */
xbt_dict_t properties,
int auto_restart,
smx_process_t parent_process);
+
XBT_PRIVATE void SIMIX_process_runall(void);
XBT_PRIVATE void SIMIX_process_kill(smx_process_t process, smx_process_t issuer);
XBT_PRIVATE void SIMIX_process_killall(smx_process_t issuer, int reset_pid);
XBT_PRIVATE smx_process_t SIMIX_process_create_from_wrapper(smx_process_arg_t args);
-XBT_PRIVATE void SIMIX_create_maestro_process(void);
XBT_PRIVATE void SIMIX_process_stop(smx_process_t arg);
XBT_PRIVATE void SIMIX_process_cleanup(smx_process_t arg);
XBT_PRIVATE void SIMIX_process_empty_trash(void);
"pthread_key_create failed for xbt_self_thread_key");
main_thread = xbt_new(s_xbt_os_thread_t, 1);
+ main_thread->name = NULL;
+ main_thread->detached = 0;
main_thread->name = (char *) "main";
- main_thread->start_routine = NULL;
main_thread->param = NULL;
+ main_thread->start_routine = NULL;
main_thread->running_ctx = xbt_new(xbt_running_ctx_t, 1);
+ main_thread->extra_data = NULL;
XBT_RUNNING_CTX_INITIALIZE(main_thread->running_ctx);
if ((errcode = pthread_setspecific(xbt_self_thread_key, main_thread)))
void *xbt_os_thread_get_extra_data(void)
{
- return xbt_os_thread_self()->extra_data;
+  /* Returns the extra data attached to the current thread, or NULL when
+   * xbt_os_thread_self() reports no current thread. The thread is looked
+   * up once and the checked pointer is reused. */
+  xbt_os_thread_t thread = xbt_os_thread_self();
+  if (thread)
+    return thread->extra_data;
+  else
+    return NULL;
}
xbt_os_rmutex_t xbt_os_rmutex_init(void)
ADD_TESH_FACTORIES(msg-sendrecv-CLM03 "thread;ucontext;raw;boost" --setenv srcdir=${CMAKE_HOME_DIRECTORY}/examples/msg --cd ${CMAKE_BINARY_DIR}/examples/msg ${CMAKE_HOME_DIRECTORY}/examples/msg/sendrecv/sendrecv_CLM03.tesh)
ADD_TESH_FACTORIES(msg-sendrecv-Vegas "thread;ucontext;raw;boost" --setenv srcdir=${CMAKE_HOME_DIRECTORY}/examples/msg --cd ${CMAKE_BINARY_DIR}/examples/msg ${CMAKE_HOME_DIRECTORY}/examples/msg/sendrecv/sendrecv_Vegas.tesh)
ADD_TESH_FACTORIES(msg-sendrecv-Reno "thread;ucontext;raw;boost" --setenv srcdir=${CMAKE_HOME_DIRECTORY}/examples/msg --cd ${CMAKE_BINARY_DIR}/examples/msg ${CMAKE_HOME_DIRECTORY}/examples/msg/sendrecv/sendrecv_Reno.tesh)
+ ADD_TESH_FACTORIES(msg-sendrecv-CLM03-main "thread" --setenv srcdir=${CMAKE_HOME_DIRECTORY}/examples/msg --cd ${CMAKE_BINARY_DIR}/examples/msg ${CMAKE_HOME_DIRECTORY}/examples/msg/sendrecv/sendrecv_CLM03_main.tesh)
ADD_TESH_FACTORIES(msg-suspend "thread;ucontext;raw;boost" --setenv srcdir=${CMAKE_HOME_DIRECTORY}/examples/msg --cd ${CMAKE_BINARY_DIR}/examples/msg ${CMAKE_HOME_DIRECTORY}/examples/msg/suspend/suspend.tesh)
# This one is not usable:
# ADD_TESH_FACTORIES(msg-exception "thread;ucontext;raw" --setenv srcdir=${CMAKE_HOME_DIRECTORY}/examples/msg --cd ${CMAKE_BINARY_DIR}/examples/msg ${CMAKE_HOME_DIRECTORY}/examples/msg/exception/exception.tesh)