*/
void MSG_launch_application(const char *file)
{
+ xbt_assert0(msg_global,"MSG_global_init_args has to be called before MSG_launch_application.");
+ SIMIX_launch_application(file);
return;
}
*/
void MSG_function_register(const char *name,m_process_code_t code)
{
+ SIMIX_function_register(name, code);
return;
}
{
m_process_code_t code = NULL;
+ code = (m_process_code_t)SIMIX_get_registered_function(name);
return code;
}
*/
m_host_t MSG_get_host_by_name(const char *name)
{
- return NULL;
+ smx_host_t simix_h = NULL;
+
+ simix_h = SIMIX_host_get_by_name(name);
+ if (simix_h == NULL) {
+ return NULL;
+ }
+ else return (m_host_t)simix_h->data;
}
/** \ingroup msg_easier_life
*/
void MSG_create_environment(const char *file)
{
+ smx_host_t *workstation = NULL;
+ int i;
+
+ SIMIX_create_environment(file);
+
+ /* Initialize MSG hosts */
+ workstation = SIMIX_host_get_table();
+ for (i=0; i< SIMIX_host_get_number();i++) {
+ __MSG_host_create(workstation[i], NULL);
+ }
return;
}
#include "xbt/log.h"
#include "xbt/ex.h" /* ex_backtrace_display */
+XBT_LOG_NEW_DEFAULT_SUBCATEGORY(msg_kernel, msg, "Logging specific to MSG (kernel)");
+
MSG_Global_t msg_global = NULL;
-/* static void MarkAsFailed(m_task_t t, TBX_HashTable_t failedProcessList); */
-/* static xbt_fifo_t MSG_buildFailedHostList(double a, double b); */
/** \defgroup msg_simulation MSG simulation Functions
* \brief This section describes the functions you need to know to
*/
void MSG_global_init(int *argc, char **argv)
{
+ if (!msg_global) {
+ SIMIX_global_init(argc, argv);
+
+ msg_global = xbt_new0(s_MSG_Global_t,1);
+
+ msg_global->host = xbt_fifo_new();
+ msg_global->process_list = xbt_fifo_new();
+ msg_global->max_channel = 0;
+ msg_global->PID = 1;
+ }
return;
}
/** \ingroup msg_easier_life
* \brief Traces MSG events in the Paje format.
*/
+
void MSG_paje_output(const char *filename)
{
- return;
}
/** \defgroup m_channel_management Understanding channels
*/
MSG_error_t MSG_set_channel_number(int number)
{
+ xbt_assert0((msg_global) && (msg_global->max_channel == 0), "Channel number already set!");
+
+ msg_global->max_channel = number;
return MSG_OK;
}
*/
int MSG_get_channel_number(void)
{
- return 0;
+ xbt_assert0((msg_global)&&(msg_global->max_channel != 0), "Channel number not set yet!");
+
+ return msg_global->max_channel;
}
void __MSG_display_process_status(void)
{
}
+
/* FIXME: Yeah, I'll do it in a portable maner one day [Mt] */
#include <signal.h>
*/
MSG_error_t MSG_main(void)
{
+ smx_cond_t cond = NULL;
+ smx_action_t smx_action;
+ xbt_fifo_t actions_done = xbt_fifo_new();
+ xbt_fifo_t actions_failed = xbt_fifo_new();
+
+ /* Prepare to display some more info when dying on Ctrl-C pressing */
+ signal(SIGINT,inthandler);
+
+ /* Clean IO before the run */
+ fflush(stdout);
+ fflush(stderr);
+
+ //surf_solve(); /* Takes traces into account. Returns 0.0 */
+ /* xbt_fifo_size(msg_global->process_to_run) */
+
+ while (SIMIX_solve(actions_done, actions_failed) != -1.0) {
+
+ while ( (smx_action = xbt_fifo_pop(actions_failed)) ) {
+
+ xbt_fifo_item_t _cursor;
+
+ DEBUG1("** %s failed **",smx_action->name);
+ xbt_fifo_foreach(smx_action->cond_list,_cursor,cond,smx_cond_t) {
+ SIMIX_cond_broadcast(cond);
+ /* remove conditional from action */
+ xbt_fifo_remove(smx_action->cond_list,cond);
+ }
+ }
+
+ while ( (smx_action = xbt_fifo_pop(actions_done)) ) {
+ xbt_fifo_item_t _cursor;
+
+ DEBUG1("** %s done **",smx_action->name);
+ xbt_fifo_foreach(smx_action->cond_list,_cursor,cond,smx_cond_t) {
+ SIMIX_cond_broadcast(cond);
+ /* remove conditional from action */
+ xbt_fifo_remove(smx_action->cond_list,cond);
+ }
+ }
+ }
return MSG_OK;
}
*/
int MSG_process_killall(int reset_PIDs)
{
+ xbt_die("not implemented yet");
return 0;
}
*/
MSG_error_t MSG_clean(void)
{
+ xbt_fifo_item_t i = NULL;
+ m_host_t h = NULL;
+ m_process_t p = NULL;
+
+
+ while((p=xbt_fifo_pop(msg_global->process_list))) {
+ MSG_process_kill(p);
+ }
+
+ xbt_fifo_foreach(msg_global->host,i,h,m_host_t) {
+ __MSG_host_destroy(h);
+ }
+ xbt_fifo_free(msg_global->host);
+ xbt_fifo_free(msg_global->process_list);
+
+ free(msg_global);
+ SIMIX_clean();
+
return MSG_OK;
}
*/
double MSG_get_clock(void)
{
- return 0.0;
+ return SIMIX_get_clock();
}
m_host_t host)
{
+ xbt_die("not implemented yet");
MSG_RETURN(MSG_OK);
}
*/
int MSG_task_Iprobe(m_channel_t channel)
{
+ xbt_die("not implemented yet");
return 0;
}
*/
int MSG_task_probe_from(m_channel_t channel)
{
+ xbt_die("not implemented yet");
return 0;
}
MSG_error_t MSG_channel_select_from(m_channel_t channel, double max_duration,
int *PID)
{
+ xbt_die("not implemented yet");
MSG_RETURN(MSG_OK);
}
*/
int MSG_task_probe_from_host(int channel, m_host_t host)
{
+ xbt_die("not implemented yet");
return 0;
}
MSG_error_t MSG_task_put_with_timeout(m_task_t task, m_host_t dest,
m_channel_t channel, double max_duration)
{
+ xbt_die("not implemented yet");
MSG_RETURN(MSG_OK);
}
/** \ingroup msg_gos_functions
void *data)
{
m_task_t task = xbt_new0(s_m_task_t,1);
+ xbt_die("not implemented yet");
return task;
}
MSG_error_t MSG_parallel_task_execute(m_task_t task)
{
- m_process_t process = MSG_process_self();
- MSG_error_t res;
- DEBUG0("Computing on a tons of guys");
-
- __MSG_parallel_task_execute(process, task);
-
- if(task->simdata->compute)
- res = __MSG_wait_for_computation(process,task);
- else
- res = MSG_OK;
-
- return res;
+ xbt_die("not implemented yet");
+ return MSG_OK;
}
*/
MSG_error_t MSG_process_sleep(double nb_sec)
{
+ smx_action_t act_sleep;
+ m_process_t proc = MSG_process_self();
+ smx_mutex_t mutex;
+ smx_cond_t cond;
+ /* create action to sleep */
+ act_sleep = SIMIX_action_sleep(SIMIX_process_get_host(proc->simdata->smx_process),nb_sec);
+
+ mutex = SIMIX_mutex_init();
+ SIMIX_mutex_lock(mutex);
+ /* create conditional and register action to it */
+ cond = SIMIX_cond_init();
+
+ SIMIX_register_condition_to_action(act_sleep, cond);
+ SIMIX_register_action_to_condition(act_sleep, cond);
+ SIMIX_cond_wait(cond,mutex);
+ SIMIX_mutex_unlock(mutex);
+
+ /* remove variables */
+ SIMIX_action_destroy(act_sleep);
+ SIMIX_cond_destroy(cond);
+ SIMIX_mutex_destroy(mutex);
+
MSG_RETURN(MSG_OK);
}
*/
static int MSG_get_msgload(void)
{
+ xbt_die("not implemented yet");
return 0;
}
*/
/********************************* Host **************************************/
-m_host_t __MSG_host_create(const char *name,
- void *workstation,
- void *data)
+m_host_t __MSG_host_create(smx_host_t workstation, void *data)
{
+ const char * name;
+ simdata_host_t simdata = xbt_new0(s_simdata_host_t,1);
m_host_t host = xbt_new0(s_m_host_t,1);
+ int i;
+ name = SIMIX_host_get_name(workstation);
+ /* Host structure */
+ host->name = xbt_strdup(name);
+ host->simdata = simdata;
+ host->data = data;
+
+ simdata->host = workstation;
+
+ simdata->mbox = xbt_new0(xbt_fifo_t, msg_global->max_channel);
+ for (i = 0; i < msg_global->max_channel; i++)
+ simdata->mbox[i] = xbt_fifo_new();
+
+ simdata->sleeping = xbt_new0(smx_cond_t, msg_global->max_channel);
+ simdata->mutex = SIMIX_mutex_init();
+ SIMIX_host_set_data(workstation, host);
+
+ /* Update global variables */
+ xbt_fifo_unshift(msg_global->host, host);
+
return host;
}
or not and attach \a data to \a host if it is possible.
*/
MSG_error_t MSG_host_set_data(m_host_t host, void *data)
-{
+{
xbt_assert0((host!=NULL), "Invalid parameters");
xbt_assert0((host->data == NULL), "Data already set");
*/
void __MSG_host_destroy(m_host_t host)
{
+ simdata_host_t simdata = NULL;
+ int i = 0;
+
+ xbt_assert0((host != NULL), "Invalid parameters");
+
+ /* Clean Simulator data */
+ /* SIMIX host will be cleaned when MSG_clean calls SIMIX_clean */
+ simdata = (host)->simdata;
+
+ for (i = 0; i < msg_global->max_channel; i++)
+ xbt_fifo_free(simdata->mbox[i]);
+ free(simdata->mbox);
+ free(simdata->sleeping);
+ SIMIX_mutex_destroy(simdata->mutex);
+ free(simdata);
+
+ /* Clean host structure */
+ free(host->name);
+ free(host);
+
return;
}
xbt_assert0(0, "Not implemented yet");
return(0);
-/* return(surf_workstation_resource->extension_public->get_load(h->simdata->host)); */
}
/** \ingroup m_host_management
*/
double MSG_get_host_speed(m_host_t h)
{
- return 0.0;
+ xbt_assert0((h!= NULL), "Invalid parameters");
+
+ return(SIMIX_host_get_speed(h->simdata->host));
}
/** \ingroup msg_gos_functions
*/
int MSG_host_is_avail (m_host_t h)
{
- return 0;
+ xbt_assert0((h!= NULL), "Invalid parameters");
+ return (SIMIX_host_get_state(h->simdata->host));
}
static void MSG_process_cleanup(void *arg)
{
+ xbt_die("not implemented yet");
return;
}
m_process_code_t code, void *data,
m_host_t host, int argc, char **argv)
{
+ simdata_process_t simdata = xbt_new0(s_simdata_process_t,1);
m_process_t process = xbt_new0(s_m_process_t,1);
- return process;
+
+ xbt_assert0(((code != NULL) && (host != NULL)), "Invalid parameters");
+
+ /* Simulator Data */
+ simdata->PID = msg_global->PID++;
+ simdata->host = host;
+ simdata->argc = argc;
+ simdata->argv = argv;
+ simdata->smx_process = SIMIX_process_create_with_arguments(name, (smx_process_code_t)code,
+ (void*)process, host->simdata->host, argc, argv );
+
+ if (SIMIX_process_self()) {
+ simdata->PPID = MSG_process_get_PID(SIMIX_process_self()->data);
+ }
+ else simdata->PPID = -1;
+ simdata->last_errno=MSG_OK;
+
+
+ /* Process structure */
+ process->name = xbt_strdup(name);
+ process->simdata = simdata;
+ process->data = data;
+
+ return process;
}
/** \ingroup m_process_management
*/
void MSG_process_kill(m_process_t process)
{
+ xbt_die("not implemented yet");
return;
}
*/
MSG_error_t MSG_process_change_host(m_process_t process, m_host_t host)
{
+ xbt_die("not implemented yet");
return MSG_OK;
}
*/
m_process_t MSG_process_self(void)
{
- return NULL;
+ return (m_process_t)SIMIX_process_self()->data;
}
/** \ingroup m_process_management
*/
MSG_error_t MSG_process_suspend(m_process_t process)
{
- return MSG_OK;
+ xbt_die("not implemented yet");
+ return MSG_OK;
}
/** \ingroup m_process_management
*/
MSG_error_t MSG_process_resume(m_process_t process)
{
+ xbt_die("not implemented yet");
MSG_RETURN(MSG_OK);
}
*/
int MSG_process_is_suspended(m_process_t process)
{
- return 0;
+ xbt_die("not implemented yet");
+ return 0;
}
int __MSG_process_block(double max_duration, const char *info)
*/
void MSG_config(const char *name, ...)
{
+ va_list pa;
+ /* xbt_cfg_dump("msg_cfg_set","",_msg_cfg_set);*/
+ va_start(pa,name);
+ SIMIX_config(name,pa);
+ va_end(pa);
return;
}
#define METASIMGRID_PRIVATE_H
#include "msg/msg.h"
-#include "surf/surf.h"
+#include "simix/simix.h"
#include "xbt/fifo.h"
#include "xbt/dynar.h"
#include "xbt/swag.h"
#include "xbt/dict.h"
#include "xbt/context.h"
#include "xbt/config.h"
-#include "xbt/mallocator.h"
/**************** datatypes **********************************/
typedef struct simdata_host {
- void *host; /* SURF modeling */
+ smx_host_t host; /* SURF modeling */
xbt_fifo_t *mbox; /* array of FIFOs used as a mailboxes */
- m_process_t *sleeping; /* array of process used to know whether a local process is
+ smx_cond_t *sleeping; /* array of process used to know whether a local process is
waiting for a communication on a channel */
- xbt_fifo_t process_list;
+ smx_mutex_t mutex;
} s_simdata_host_t;
/********************************* Task **************************************/
typedef struct simdata_task {
- surf_action_t compute; /* SURF modeling of computation */
- surf_action_t comm; /* SURF modeling of communication */
+ smx_action_t compute; /* SURF modeling of computation */
+ smx_action_t comm; /* SURF modeling of communication */
double message_size; /* Data size */
double computation_amount; /* Computation size */
- xbt_dynar_t sleeping; /* process to wake-up */
+ smx_cond_t cond;
+ smx_mutex_t mutex;
m_process_t sender;
m_host_t source;
double priority;
typedef struct simdata_process {
m_host_t host; /* the host on which the process is running */
- xbt_context_t context; /* the context that executes the scheduler fonction */
+ smx_process_t smx_process;
int PID; /* used for debugging purposes */
int PPID; /* The parent PID */
- m_task_t waiting_task;
+ //m_task_t waiting_task;
int blocked;
int suspended;
m_host_t put_host; /* used for debugging purposes */
int argc; /* arguments number if any */
char **argv; /* arguments table if any */
MSG_error_t last_errno; /* the last value returned by a MSG_function */
- int paje_state; /* the number of states stacked with Paje */
} s_simdata_process_t;
typedef struct process_arg {
/************************** Global variables ********************************/
typedef struct MSG_Global {
xbt_fifo_t host;
- xbt_fifo_t process_to_run;
xbt_fifo_t process_list;
int max_channel;
- m_process_t current_process;
- xbt_dict_t registered_functions;
- FILE *paje_output;
- int paje_maxPID;
int PID;
int session;
- xbt_mallocator_t task_mallocator;
- xbt_mallocator_t task_simdata_mallocator;
} s_MSG_Global_t, *MSG_Global_t;
extern MSG_Global_t msg_global;
-/************************** Configuration support ********************************/
-void msg_config_init(void); /* create the config set, call this before use! */
-void msg_config_finalize(void); /* destroy the config set, call this at cleanup. */
-extern int _msg_init_status; /* 0: beginning of time;
- 1: pre-inited (cfg_set created);
- 2: inited (running) */
-extern xbt_cfg_t _msg_cfg_set;
-
/*************************************************************/
#define PROCESS_SET_ERRNO(val) (MSG_process_self()->simdata->last_errno=val)
get_state(MSG_host_self()->simdata->host)==SURF_CPU_ON,\
"Host failed, you cannot call this function.")
-m_host_t __MSG_host_create(const char *name, void *workstation,
- void *data);
+m_host_t __MSG_host_create(smx_host_t workstation, void *data);
void __MSG_host_destroy(m_host_t host);
void __MSG_task_execute(m_process_t process, m_task_t task);
MSG_error_t __MSG_wait_for_computation(m_process_t process, m_task_t task);
void __MSG_display_process_status(void);
-m_task_t task_mallocator_new_f(void);
-void task_mallocator_free_f(m_task_t task);
-void task_mallocator_reset_f(m_task_t task);
-
-
-
-#define PAJE_PROCESS_STATE(process,state)\
- if(msg_global->paje_output) \
- fprintf(msg_global->paje_output,"10 %f S_t %p %s\n",\
- surf_get_clock(), (process),(state))
-#define PAJE_PROCESS_PUSH_STATE(process,state,task)\
- if(msg_global->paje_output) \
- fprintf(msg_global->paje_output,"11 %f S_t %p %s \"%s\"\n",\
- surf_get_clock(), (process),(state),(task)?((m_task_t)(task))->name:" ")
-#define PAJE_PROCESS_POP_STATE(process)\
- if(msg_global->paje_output) \
- fprintf(msg_global->paje_output,"12 %f S_t %p\n",\
- surf_get_clock(), (process))
-
-#define PAJE_PROCESS_FREE(process)\
- if(msg_global->paje_output) \
- fprintf(msg_global->paje_output,"8 %f %p P_t\n", \
- surf_get_clock(), (process))
-#define PAJE_PROCESS_NEW(process)\
- if(msg_global->paje_output) \
- fprintf(msg_global->paje_output,"7 %f %p P_t %p \"%s %d (%d)\"\n", \
- surf_get_clock(), (process), (process)->simdata->host, \
- (process)->name, (process)->simdata->PID, msg_global->session)
-#define PAJE_COMM_START(process,task,channel)\
- if(msg_global->paje_output) \
- fprintf(msg_global->paje_output,\
- "16 %f Comm CUR \"CHANNEL_%d %s\" %p %p\n", \
- surf_get_clock(), channel, task->name, (process), task)
-#define PAJE_COMM_STOP(process,task,channel)\
- if(msg_global->paje_output) \
- fprintf(msg_global->paje_output,\
- "17 %f Comm CUR \"CHANNEL_%d %s\" %p %p\n", \
- surf_get_clock(), channel, task->name, (process), task)
-#define PAJE_HOST_NEW(host)\
- if(msg_global->paje_output)\
- fprintf(msg_global->paje_output,"7 %f %p H_t CUR \"%s\"\n",surf_get_clock(), \
- host, host->name)
-#define PAJE_HOST_FREE(host)\
- if(msg_global->paje_output)\
- fprintf(msg_global->paje_output,"8 %f %p H_t\n",surf_get_clock(), host);
#endif
m_task_t MSG_task_create(const char *name, double compute_duration,
double message_size, void *data)
{
- m_task_t task = xbt_mallocator_get(msg_global->task_mallocator);
+ m_task_t task = xbt_new(s_m_task_t,1);
+ simdata_task_t simdata = task->simdata;
+ task->simdata = simdata;
+ /* Task structure */
+ task->name = xbt_strdup(name);
+ task->data = data;
+
+ /* Simulator Data */
+ simdata->computation_amount = compute_duration;
+ simdata->message_size = message_size;
+ simdata->rate = -1.0;
+ simdata->priority = 1.0;
+ simdata->using = 1;
+ simdata->sender = NULL;
+ simdata->cond = SIMIX_cond_init();
+ simdata->mutex = SIMIX_mutex_init();
return task;
}
*/
MSG_error_t MSG_task_destroy(m_task_t task)
{
+ smx_action_t action = NULL;
+ xbt_assert0((task != NULL), "Invalid parameter");
+
+ /* why? if somebody is using, then you can't free! ok... but will return MSG_OK? when this task will be destroyed, isn't the code wrong? */
+ task->simdata->using--;
+ if(task->simdata->using>0) return MSG_OK;
+
+ if(task->name) free(task->name);
+
+ SIMIX_cond_destroy(task->simdata->cond);
+ SIMIX_mutex_destroy(task->simdata->mutex);
+
+ action = task->simdata->compute;
+ if(action) SIMIX_action_destroy(action);
+ action = task->simdata->comm;
+ if(action) SIMIX_action_destroy(action);
+ /* parallel tasks only */
+ if(task->simdata->host_list) xbt_free(task->simdata->host_list);
+
+ /* free main structures */
+ xbt_free(task->simdata);
+ xbt_free(task);
+
return MSG_OK;
}
*/
MSG_error_t MSG_task_cancel(m_task_t task)
{
+ xbt_assert0((task != NULL), "Invalid parameter");
+
+ if(task->simdata->compute) {
+ SIMIX_action_cancel(task->simdata->compute);
+ return MSG_OK;
+ }
+ if(task->simdata->comm) {
+ SIMIX_action_cancel(task->simdata->comm);
+ return MSG_OK;
+ }
+
return MSG_FATAL;
}
*/
double MSG_task_get_compute_duration(m_task_t task)
{
- return 0.0;
+ xbt_assert0((task != NULL) && (task->simdata != NULL), "Invalid parameter");
+
+ return task->simdata->computation_amount;
}
/** \ingroup m_task_management
*/
double MSG_task_get_remaining_computation(m_task_t task)
{
- return 0.0;
+ xbt_assert0((task != NULL) && (task->simdata != NULL), "Invalid parameter");
+
+ if(task->simdata->compute) {
+ return SIMIX_action_get_remains(task->simdata->compute);
+ } else {
+ return task->simdata->computation_amount;
+ }
}
/** \ingroup m_task_management
return task->simdata->message_size;
}
-MSG_error_t __MSG_task_wait_event(m_process_t process, m_task_t task)
-{
- return MSG_OK;
-}
/** \ingroup m_task_management
*/
void MSG_task_set_priority(m_task_t task, double priority)
{
+ xbt_assert0((task != NULL) && (task->simdata != NULL), "Invalid parameter");
+ task->simdata->priority = 1/priority;
+ if(task->simdata->compute)
+ SIMIX_action_set_priority(task->simdata->compute, task->simdata->priority);
}
-/* Mallocator functions */
-m_task_t task_mallocator_new_f(void)
-{
- m_task_t task = xbt_new(s_m_task_t, 1);
- simdata_task_t simdata = xbt_new0(s_simdata_task_t, 1);
- task->simdata = simdata;
- return task;
-}
-
-void task_mallocator_free_f(m_task_t task)
-{
- xbt_assert0((task != NULL), "Invalid parameter");
-
- xbt_free(task->simdata);
- xbt_free(task);
-
- return;
-}
-
-void task_mallocator_reset_f(m_task_t task)
-{
- memset(task->simdata, 0, sizeof(s_simdata_task_t));
-}