void MSG_function_register(const char *name, m_process_code_t code);
m_process_code_t MSG_get_registered_function(const char *name);
void MSG_launch_application(const char *file);
+void MSG_paje_output(const char *filename);
long double MSG_getClock(void);
m_process_code_t code, void *data,
m_host_t host, int argc, char **argv);
void MSG_process_kill(m_process_t process);
+int MSG_process_killall(int reset_PIDs);
MSG_error_t MSG_get_arguments(int *argc, char ***argv);
MSG_error_t MSG_set_arguments(m_process_t process,int argc, char *argv[]);
MSG_global_init_args(&argc,argv);
}
+/** \ingroup msg_simulation
+ * \brief Initialize some MSG internal data.
+ *
+ * The body only runs while msg_global is still unset. The statements
+ * visible here reset the channel count and the current process, create
+ * the dictionary of registered functions and start PID numbering at 1.
+ *
+ * NOTE(review): the statement that allocates msg_global is not visible in
+ * this excerpt; presumably it sits between the test and the first
+ * assignment -- confirm against the complete file.
+ *
+ * \param argc pointer to the argument count (not read by the visible lines)
+ * \param argv argument vector (not read by the visible lines)
+ */
void MSG_global_init_args(int *argc, char **argv)
{
if (!msg_global) {
msg_global->max_channel = 0;
msg_global->current_process = NULL;
msg_global->registered_functions = xbt_dict_new();
+ msg_global->PID = 1; /* PID counter starts at 1 */
+ }
+}
+
+/** \ingroup msg_easier_life
+ * \brief Traces MSG events in the Paje format.
+ *
+ * Opens \a filename for writing, stores the stream in
+ * msg_global->paje_output and writes the trace header: the Paje event
+ * definitions, the MSG type hierarchy, one link value per channel
+ * (0 .. msg_global->max_channel-1) and the root "MSG simulation" container.
+ *
+ * MSG must be initialized and no Paje output may be defined yet; both are
+ * checked with assertions, as are a NULL \a filename and a failing fopen().
+ * A name that does not end with ".trace" is only reported, not rejected.
+ *
+ * \param filename name of the trace file to create
+ */
+void MSG_paje_output(const char *filename)
+{
+  int i;
+  const char *paje_preamble="%EventDef SetLimits 0\n"
+    "% StartTime date\n"
+    "% EndTime date\n"
+    "%EndEventDef\n"
+    "%EventDef PajeDefineContainerType 1\n"
+    "% NewType string\n"
+    "% ContainerType string\n"
+    "% NewName string\n"
+    "%EndEventDef\n"
+    "%EventDef PajeDefineEventType 2\n"
+    "% NewType string\n"
+    "% ContainerType string\n"
+    "% NewName string\n"
+    "%EndEventDef\n"
+    "%EventDef PajeDefineStateType 3\n"
+    "% NewType string\n"
+    "% ContainerType string\n"
+    "% NewName string\n"
+    "%EndEventDef\n"
+    "%EventDef PajeDefineVariableType 4\n"
+    "% NewType string\n"
+    "% ContainerType string\n"
+    "% NewName string\n"
+    "%EndEventDef\n"
+    "%EventDef PajeDefineLinkType 5\n"
+    "% NewType string\n"
+    "% ContainerType string\n"
+    "% SourceContainerType string\n"
+    "% DestContainerType string\n"
+    "% NewName string\n"
+    "%EndEventDef\n"
+    "%EventDef PajeDefineEntityValue 6\n"
+    "% NewValue string\n"
+    "% EntityType string\n"
+    "% NewName string\n"
+    "%EndEventDef\n"
+    "%EventDef PajeCreateContainer 7\n"
+    "% Time date\n"
+    "% NewContainer string\n"
+    "% NewContainerType string\n"
+    "% Container string\n"
+    "% NewName string\n"
+    "%EndEventDef\n"
+    "%EventDef PajeDestroyContainer 8\n"
+    "% Time date\n"
+    "% Name string\n"
+    "% Type string\n"
+    "%EndEventDef\n"
+    "%EventDef PajeNewEvent 9\n"
+    "% Time date\n"
+    "% EntityType string\n"
+    "% Container string\n"
+    "% Value string\n"
+    "%EndEventDef\n"
+    "%EventDef PajeSetState 10\n"
+    "% Time date\n"
+    "% EntityType string\n"
+    "% Container string\n"
+    "% Value string\n"
+    "%EndEventDef\n"
+    "%EventDef PajeSetState 101\n"
+    "% Time date\n"
+    "% EntityType string\n"
+    "% Container string\n"
+    "% Value string\n"
+    "% FileName string\n"
+    "% LineNumber int\n"
+    "%EndEventDef\n"
+    "%EventDef PajePushState 111\n"
+    "% Time date\n"
+    "% EntityType string\n"
+    "% Container string\n"
+    "% Value string\n"
+    "% FileName string\n"
+    "% LineNumber int\n"
+    "%EndEventDef\n"
+    "%EventDef PajePushState 11\n"
+    "% Time date\n"
+    "% EntityType string\n"
+    "% Container string\n"
+    "% Value string\n"
+    "%EndEventDef\n"
+    "%EventDef PajePopState 12\n"
+    "% Time date\n"
+    "% EntityType string\n"
+    "% Container string\n"
+    "%EndEventDef\n"
+    "%EventDef PajeSetVariable 13\n"
+    "% Time date\n"
+    "% EntityType string\n"
+    "% Container string\n"
+    "% Value double\n"
+    "%EndEventDef\n"
+    "%EventDef PajeAddVariable 14\n"
+    "% Time date\n"
+    "% EntityType string\n"
+    "% Container string\n"
+    "% Value double\n"
+    "%EndEventDef\n"
+    "%EventDef PajeSubVariable 15\n"
+    "% Time date\n"
+    "% EntityType string\n"
+    "% Container string\n"
+    "% Value double\n"
+    "%EndEventDef\n"
+    "%EventDef PajeStartLink 16\n"
+    "% Time date\n"
+    "% EntityType string\n"
+    "% Container string\n"
+    "% Value string\n"
+    "% SourceContainer string\n"
+    "% Key string\n"
+    "%EndEventDef\n"
+    "%EventDef PajeEndLink 17\n"
+    "% Time date\n"
+    "% EntityType string\n"
+    "% Container string\n"
+    "% Value string\n"
+    "% DestContainer string\n"
+    "% Key string\n"
+    "%EndEventDef\n";
+
+  const char *type_definitions = "1 Sim_t 0 Simulation_t\n"
+    "1 H_t Sim_t m_host_t\n"
+    "1 P_t H_t m_process_t\n"
+    "3 S_t P_t \"Process State\"\n"
+    "6 E S_t Executing\n"
+    "6 B S_t Blocked\n"
+    "6 C S_t Communicating\n"
+    "5 Comm Sim_t P_t P_t Communication_t\n";
+
+  const char *ext = ".trace";
+  size_t ext_len = strlen(ext);
+  size_t len;
+
+  /* All preconditions are checked once, before filename is dereferenced. */
+  xbt_assert0(msg_global, "Initialize MSG first\n");
+  xbt_assert0(!msg_global->paje_output, "Paje output allready defined\n");
+  xbt_assert0(filename, "Need a real file name\n");
+
+  /* A missing ".trace" suffix is reported but the file is still created. */
+  len = strlen(filename);
+  if((len<ext_len) || (strncmp(filename+len-ext_len,ext,ext_len))) {
+    CRITICAL2("%s does not end by \"%s\". It may cause troubles when using Paje\n",
+              filename,ext);
}
+
+  msg_global->paje_output=fopen(filename,"w");
+  xbt_assert1(msg_global->paje_output, "Failed to open %s \n",filename);
+
+  /* Header: event definitions, then the MSG type hierarchy. */
+  fprintf(msg_global->paje_output,"%s",paje_preamble);
+  fprintf(msg_global->paje_output,"%s",type_definitions);
+  /* One link value per channel known at this point. */
+  for(i=0; i<msg_global->max_channel; i++) {
+    fprintf(msg_global->paje_output, "6 COMM_%d Comm \"Channel %d\"\n" ,i,i);
+  }
+  /* Root container of the whole trace. */
+  fprintf(msg_global->paje_output,
+          "7 0.0 CUR Sim_t 0 \"MSG simulation\"\n");
+
}
/** \ingroup msg_simulation
{
m_process_t process = NULL;
int nbprocess,i;
- long double Before=0.0;
- long double Now=0.0;
double elapsed_time = 0.0;
/* Clean IO before the run */
xbt_context_empty_trash();
while ((process = xbt_fifo_pop(msg_global->process_to_run))) {
/* fprintf(stderr,"-> %s (%d)\n",process->name, process->simdata->PID); */
+ DEBUG3("Scheduling %s(%d) on %s",
+ process->name,process->simdata->PID,
+ process->simdata->host->name);
msg_global->current_process = process;
xbt_context_schedule(process->simdata->context);
msg_global->current_process = NULL;
}
- Before = MSG_getClock();
+ DEBUG1("%Lg : Calling surf_solve",MSG_getClock());
elapsed_time = surf_solve();
- Now = MSG_getClock();
+ DEBUG1("Elapsed_time %lg",elapsed_time);
/* fprintf(stderr, "====== %Lg =====\n",Now); */
/* if (elapsed_time==0.0) { */
"MSG: Congratulations ! Simulation terminated : all process are over\n");
return MSG_OK;
} else {
+ xbt_fifo_item_t item = NULL;
fprintf(stderr,"MSG: Oops ! Deadlock or code not perfectly clean.\n");
fprintf(stderr,"MSG: %d processes are still running, waiting for something.\n",
nbprocess);
/* List the process and their state */
fprintf(stderr,"MSG: <process>(<pid>) on <host>: <status>.\n");
- while ((process=xbt_fifo_pop(msg_global->process_list))) {
+ xbt_fifo_foreach(msg_global->process_list,item,process,m_process_t) {
simdata_process_t p_simdata = (simdata_process_t) process->simdata;
simdata_host_t h_simdata=(simdata_host_t)p_simdata->host->simdata;
/* return failedHostList; */
/* } */
+/** \ingroup msg_simulation
+ * \brief Kill all running process
+ *
+ * Every process still present in msg_global->process_list is taken out of
+ * the list and killed with MSG_process_kill(); the context trash is then
+ * emptied.
+ *
+ * \param reset_PIDs should we reset the PID numbers. Zero or a negative
+ * number means no reset and a positive number will be used to set the PID
+ * of the next newly created process. A reset also increments
+ * msg_global->session.
+ * \return the value of msg_global->PID once the call is over
+ */
+int MSG_process_killall(int reset_PIDs)
+{
+  m_process_t p = NULL;
+
+  /* Shift until the list is empty: each victim leaves the list first. */
+  while((p=xbt_fifo_shift(msg_global->process_list))) {
+    MSG_process_kill(p);
+  }
+  xbt_context_empty_trash();
+
+  if(reset_PIDs>0) {
+    msg_global->PID = reset_PIDs;
+    msg_global->session++;
+  }
+
+  return msg_global->PID;
+}
+
+
/** \ingroup msg_simulation
* \brief Clean the MSG simulation
*/
xbt_fifo_free(msg_global->process_list);
xbt_dict_free(&(msg_global->registered_functions));
+ if(msg_global->paje_output) {
+ fclose(msg_global->paje_output);
+ msg_global->paje_output = NULL;
+ }
xbt_free(msg_global);
surf_finalize();
h = MSG_host_self();
h_simdata = h->simdata;
while ((t = xbt_fifo_pop(h_simdata->mbox[channel])) == NULL) {
- xbt_assert0(!(h_simdata->sleeping[channel]),
- "A process is already blocked on this channel");
+ xbt_assert2(!(h_simdata->sleeping[channel]),
+ "A process (%s(%d)) is already blocked on this channel",
+ h_simdata->sleeping[channel]->name,
+ h_simdata->sleeping[channel]->simdata->PID);
h_simdata->sleeping[channel] = process; /* I'm waiting. Wake me up when you're ready */
__MSG_process_block();
if(surf_workstation_resource->extension_public->get_state(h_simdata->host)
if(__MSG_process_isBlocked(t_simdata->sender))
__MSG_process_unblock(t_simdata->sender);
+ PAJE_PROCESS_STATE(process,"C");
+
do {
__MSG_task_wait_event(process, t);
state=surf_workstation_resource->common_public->action_get_state(t_simdata->comm);
xbt_context_yield();
}
+ PAJE_COMM_STOP(process,t,channel);
+
if(state == SURF_ACTION_DONE) MSG_RETURN(MSG_OK);
else if(surf_workstation_resource->extension_public->get_state(h_simdata->host)
== SURF_CPU_OFF)
xbt_fifo_push(((simdata_host_t) remote_host->simdata)->
mbox[channel], task);
+
+ PAJE_COMM_START(process,task,channel);
if(remote_host->simdata->sleeping[channel])
__MSG_process_unblock(remote_host->simdata->sleeping[channel]);
process->simdata->put_channel = -1;
/* } */
+ PAJE_PROCESS_STATE(process,"C");
+
state=surf_workstation_resource->common_public->action_get_state(task_simdata->comm);
while (state==SURF_ACTION_RUNNING) {
__MSG_task_wait_event(process, task);
{
task->simdata->rate=max_rate;
return(MSG_task_put(task, dest, channel));
+ task->simdata->rate=-1.0;
}
/** \ingroup msg_gos_functions
m_process_t process = MSG_process_self();
__MSG_task_execute(process, task);
+ PAJE_PROCESS_STATE(process,"E");
return __MSG_wait_for_computation(process,task);
}
xbt_fifo_push(msg_global->host, host);
+ PAJE_HOST_NEW(host);
+
return host;
}
xbt_assert0((host != NULL), "Invalid parameters");
+ PAJE_HOST_FREE(host);
+
/* Clean Simulator data */
simdata = (host)->simdata;
static void MSG_process_cleanup(void *arg)
{
+
+ PAJE_PROCESS_FREE(arg);
+
xbt_fifo_remove(msg_global->process_list, arg);
xbt_fifo_remove(msg_global->process_to_run, arg);
xbt_fifo_remove(((m_process_t) arg)->simdata->host->simdata->process_list, arg);
simdata_process_t simdata = xbt_new0(s_simdata_process_t,1);
m_process_t process = xbt_new0(s_m_process_t,1);
m_process_t self = NULL;
- static int PID = 1;
xbt_assert0(((code != NULL) && (host != NULL)), "Invalid parameters");
/* Simulator Data */
- simdata->PID = PID++;
+ simdata->PID = msg_global->PID++;
simdata->host = host;
simdata->waiting_task = NULL;
simdata->argc = argc;
xbt_fifo_push(msg_global->process_list, process);
xbt_fifo_push(msg_global->process_to_run, process);
+ PAJE_PROCESS_NEW(process);
+
return process;
}
*/
void MSG_process_kill(m_process_t process)
{
+ int i; /* channel index; equals max_channel after the loop when no slot matched */
+ simdata_process_t p_simdata = process->simdata;
+ simdata_host_t h_simdata= p_simdata->host->simdata;
+/* Kill `process`: first release whatever it is currently waiting on, then
+ * drop it from the global process list and free its execution context. */
+/* fprintf(stderr,"Killing %s(%d) on %s.\n",process->name, */
+/* p_simdata->PID,p_simdata->host->name); */
+/* Case 1: the process is the registered sleeper on one of its host's
+ * channels; clearing that slot is all that is needed. */
+ for (i=0; i<msg_global->max_channel; i++) {
+ if (h_simdata->sleeping[i] == process) {
+ h_simdata->sleeping[i] = NULL;
+ break; /* leaves i < max_channel, which skips the block below */
+ }
+ }
+ if (i==msg_global->max_channel) { /* Case 2: not sleeping on any channel */
+ if(p_simdata->waiting_task) {
+ if(p_simdata->waiting_task->simdata->compute) /* waiting on a computation: free its action */
+ surf_workstation_resource->common_public->
+ action_free(p_simdata->waiting_task->simdata->compute);
+ else if (p_simdata->waiting_task->simdata->comm) /* waiting on a communication: mark its action failed */
+ surf_workstation_resource->common_public->
+ action_change_state(p_simdata->waiting_task->simdata->comm,SURF_ACTION_FAILED);
+ else /* waiting task with neither action set: only reported */
+ fprintf(stderr,"UNKNOWN STATUS. Please report this bug.\n");
+ } else { /* Must be trying to put a task somewhere */
+ fprintf(stderr,"UNKNOWN STATUS. Please report this bug.\n");
+ }
+ }
+/* Finally forget the process and free its context.
+ * NOTE(review): MSG_process_cleanup() is no longer called directly here;
+ * presumably freeing the context triggers it -- confirm. */
xbt_fifo_remove(msg_global->process_list,process);
xbt_context_free(process->simdata->context);
- MSG_process_cleanup(process);
}
/** \ingroup m_process_management
xbt_assert0(((process) && (process->simdata)), "Invalid parameters");
+ PAJE_PROCESS_STATE(process,"S");
+
if(process!=MSG_process_self()) {
simdata = process->simdata;
simdata = process->simdata;
if(simdata->blocked) {
+ PAJE_PROCESS_STATE(process,"B");
+
simdata->suspended = 0; /* He'll wake up by itself */
MSG_RETURN(MSG_OK);
}
simdata_task = simdata->waiting_task->simdata;
- if(simdata_task->compute)
+ if(simdata_task->compute) {
surf_workstation_resource->common_public->resume(simdata_task->compute);
- else
+ PAJE_PROCESS_STATE(process,"E");
+ }
+ else {
+ PAJE_PROCESS_STATE(process,"C");
surf_workstation_resource->common_public->resume(simdata_task->comm);
+ }
MSG_RETURN(MSG_OK);
}
m_task_t dummy = MSG_TASK_UNINITIALIZED;
dummy = MSG_task_create("blocked", 0.0, 0, NULL);
+ PAJE_PROCESS_STATE(process,"B");
+
process->simdata->blocked=1;
__MSG_task_execute(process,dummy);
surf_workstation_resource->common_public->suspend(dummy->simdata->compute);
int max_channel;
m_process_t current_process;
xbt_dict_t registered_functions;
+ FILE *paje_output;
+ int paje_maxPID;
+ int PID;
+ int session;
} s_MSG_Global_t, *MSG_Global_t;
extern MSG_Global_t msg_global;
MSG_error_t __MSG_process_unblock(m_process_t process);
int __MSG_process_isBlocked(m_process_t process);
+/* #define ALVIN_SPECIAL_LOGING */
+#ifdef ALVIN_SPECIAL_LOGING
+#define PAJE_PROCESS_STATE(process,state)\
+ if(msg_global->paje_output) \
+ fprintf(msg_global->paje_output,"10 %lg S_t P%d %s\n",\
+ surf_get_clock(), process->simdata->PID,state)
+#define PAJE_PROCESS_FREE(process)
+#define PAJE_PROCESS_NEW(process)\
+ if((msg_global->paje_output)) {\
+ if((msg_global->session==0) || ((msg_global->session>0) && (process->simdata->PID > msg_global->paje_maxPID))) \
+ fprintf(msg_global->paje_output,"7 %lg P%d P_t %p \"%s %d (%d)\"\n", \
+ surf_get_clock(), process->simdata->PID, process->simdata->host, \
+ process->name, process->simdata->PID, msg_global->session);\
+ if(msg_global->paje_maxPID<process->simdata->PID) msg_global->paje_maxPID=process->simdata->PID;\
+ }
+#define PAJE_COMM_START(process,task,channel)\
+ if(msg_global->paje_output) \
+ fprintf(msg_global->paje_output,\
+ "16 %lg Comm CUR COMM_%d P%d %p\n", \
+ surf_get_clock(), channel, process->simdata->PID, task)
+#define PAJE_COMM_STOP(process,task,channel)\
+ if(msg_global->paje_output) \
+ fprintf(msg_global->paje_output,\
+ "17 %lg Comm CUR COMM_%d P%d %p\n", \
+ surf_get_clock(), channel, process->simdata->PID, task)
+#define PAJE_HOST_NEW(host)\
+ if(msg_global->paje_output)\
+ fprintf(msg_global->paje_output,"7 %lg %p H_t CUR \"%s\"\n",surf_get_clock(), \
+ host, host->name)
+#define PAJE_HOST_FREE(host)\
+ if(msg_global->paje_output)\
+ fprintf(msg_global->paje_output,"8 %lg %p H_t\n",surf_get_clock(), host)
+
+#else
+
+#define PAJE_PROCESS_STATE(process,state)\
+ if(msg_global->paje_output) \
+ fprintf(msg_global->paje_output,"10 %lg S_t %p %s\n",\
+ surf_get_clock(), process,state)
+#define PAJE_PROCESS_FREE(process)\
+ if(msg_global->paje_output) \
+ fprintf(msg_global->paje_output,"8 %lg %p P_t\n", \
+ surf_get_clock(), process)
+#define PAJE_PROCESS_NEW(process)\
+ if(msg_global->paje_output) \
+ fprintf(msg_global->paje_output,"7 %lg %p P_t %p \"%s %d (%d)\"\n", \
+ surf_get_clock(), process, process->simdata->host, \
+ process->name, process->simdata->PID, msg_global->session)
+#define PAJE_COMM_START(process,task,channel)\
+ if(msg_global->paje_output) \
+ fprintf(msg_global->paje_output,\
+ "16 %lg Comm CUR COMM_%d %p %p\n", \
+ surf_get_clock(), channel, process, task)
+#define PAJE_COMM_STOP(process,task,channel)\
+ if(msg_global->paje_output) \
+ fprintf(msg_global->paje_output,\
+ "17 %lg Comm CUR COMM_%d %p %p\n", \
+ surf_get_clock(), channel, process, task)
+#define PAJE_HOST_NEW(host)\
+ if(msg_global->paje_output)\
+ fprintf(msg_global->paje_output,"7 %lg %p H_t CUR \"%s\"\n",surf_get_clock(), \
+ host, host->name)
+#define PAJE_HOST_FREE(host)\
+ if(msg_global->paje_output)\
+ fprintf(msg_global->paje_output,"8 %lg %p H_t\n",surf_get_clock(), host);
+
+#endif /* Alvin_Special_Loging */
#endif