From f537f6705eaf9d1d99186860f2da468a9d755417 Mon Sep 17 00:00:00 2001 From: alegrand Date: Fri, 4 Feb 2005 00:13:24 +0000 Subject: [PATCH] Adding Killall function Adding interface with Paje git-svn-id: svn+ssh://scm.gforge.inria.fr/svn/simgrid/simgrid/trunk@844 48e7efb5-ca39-0410-a469-dd3cf9ba447f --- include/msg/msg.h | 2 + src/msg/global.c | 210 ++++++++++++++++++++++++++++++++++++++++++-- src/msg/gos.c | 16 +++- src/msg/host.c | 4 + src/msg/m_process.c | 51 +++++++++-- src/msg/private.h | 71 +++++++++++++++ 6 files changed, 342 insertions(+), 12 deletions(-) diff --git a/include/msg/msg.h b/include/msg/msg.h index cff0463536..57edac096e 100644 --- a/include/msg/msg.h +++ b/include/msg/msg.h @@ -22,6 +22,7 @@ MSG_error_t MSG_clean(void); void MSG_function_register(const char *name, m_process_code_t code); m_process_code_t MSG_get_registered_function(const char *name); void MSG_launch_application(const char *file); +void MSG_paje_output(const char *filename); long double MSG_getClock(void); @@ -47,6 +48,7 @@ m_process_t MSG_process_create_with_arguments(const char *name, m_process_code_t code, void *data, m_host_t host, int argc, char **argv); void MSG_process_kill(m_process_t process); +int MSG_process_killall(int reset_PIDs); MSG_error_t MSG_get_arguments(int *argc, char ***argv); MSG_error_t MSG_set_arguments(m_process_t process,int argc, char *argv[]); diff --git a/src/msg/global.c b/src/msg/global.c index c29633b706..877d5f1d46 100644 --- a/src/msg/global.c +++ b/src/msg/global.c @@ -31,6 +31,9 @@ void MSG_global_init(void) MSG_global_init_args(&argc,argv); } +/** \ingroup msg_simulation + * \brief Initialize some MSG internal data. + */ void MSG_global_init_args(int *argc, char **argv) { if (!msg_global) { @@ -44,7 +47,174 @@ void MSG_global_init_args(int *argc, char **argv) msg_global->max_channel = 0; msg_global->current_process = NULL; msg_global->registered_functions = xbt_dict_new(); + msg_global->PID = 1; + } +} + +/** \ingroup msg_easier_life + * \brief Traces MSG events in the Paje format. + */ +void MSG_paje_output(const char *filename) +{ + int i; + const char *paje_preembule="%EventDef SetLimits 0\n" + "% StartTime date\n" + "% EndTime date\n" + "%EndEventDef\n" + "%EventDef PajeDefineContainerType 1\n" + "% NewType string\n" + "% ContainerType string\n" + "% NewName string\n" + "%EndEventDef\n" + "%EventDef PajeDefineEventType 2\n" + "% NewType string\n" + "% ContainerType string\n" + "% NewName string\n" + "%EndEventDef\n" + "%EventDef PajeDefineStateType 3\n" + "% NewType string\n" + "% ContainerType string\n" + "% NewName string\n" + "%EndEventDef\n" + "%EventDef PajeDefineVariableType 4\n" + "% NewType string\n" + "% ContainerType string\n" + "% NewName string\n" + "%EndEventDef\n" + "%EventDef PajeDefineLinkType 5\n" + "% NewType string\n" + "% ContainerType string\n" + "% SourceContainerType string\n" + "% DestContainerType string\n" + "% NewName string\n" + "%EndEventDef\n" + "%EventDef PajeDefineEntityValue 6\n" + "% NewValue string\n" + "% EntityType string\n" + "% NewName string\n" + "%EndEventDef\n" + "%EventDef PajeCreateContainer 7\n" + "% Time date\n" + "% NewContainer string\n" + "% NewContainerType string\n" + "% Container string\n" + "% NewName string\n" + "%EndEventDef\n" + "%EventDef PajeDestroyContainer 8\n" + "% Time date\n" + "% Name string\n" + "% Type string\n" + "%EndEventDef\n" + "%EventDef PajeNewEvent 9\n" + "% Time date\n" + "% EntityType string\n" + "% Container string\n" + "% Value string\n" + "%EndEventDef\n" + "%EventDef PajeSetState 10\n" + "% Time date\n" + "% EntityType string\n" + "% Container string\n" + "% Value string\n" + "%EndEventDef\n" + "%EventDef PajeSetState 101\n" + "% Time date\n" + "% EntityType string\n" + "% Container string\n" + "% Value string\n" + "% FileName string\n" + "% LineNumber int\n" + "%EndEventDef\n" + "%EventDef PajePushState 111\n" + "% Time date\n" + "% EntityType string\n" + "% Container string\n" + "% Value string\n" + "% FileName string\n" + "% LineNumber int\n" + "%EndEventDef\n" + "%EventDef PajePushState 11\n" + "% Time date\n" + "% EntityType string\n" + "% Container string\n" + "% Value string\n" + "%EndEventDef\n" + "%EventDef PajePopState 12\n" + "% Time date\n" + "% EntityType string\n" + "% Container string\n" + "%EndEventDef\n" + "%EventDef PajeSetVariable 13\n" + "% Time date\n" + "% EntityType string\n" + "% Container string\n" + "% Value double\n" + "%EndEventDef\n" + "%EventDef PajeAddVariable 14\n" + "% Time date\n" + "% EntityType string\n" + "% Container string\n" + "% Value double\n" + "%EndEventDef\n" + "%EventDef PajeSubVariable 15\n" + "% Time date\n" + "% EntityType string\n" + "% Container string\n" + "% Value double\n" + "%EndEventDef\n" + "%EventDef PajeStartLink 16\n" + "% Time date\n" + "% EntityType string\n" + "% Container string\n" + "% Value string\n" + "% SourceContainer string\n" + "% Key string\n" + "%EndEventDef\n" + "%EventDef PajeEndLink 17\n" + "% Time date\n" + "% EntityType string\n" + "% Container string\n" + "% Value string\n" + "% DestContainer string\n" + "% Key string\n" + "%EndEventDef\n"; + + const char *type_definitions = "1 Sim_t 0 Simulation_t\n" + "1 H_t Sim_t m_host_t\n" + "1 P_t H_t m_process_t\n" + "3 S_t P_t \"Process State\"\n" + "6 E S_t Executing\n" + "6 B S_t Blocked\n" + "6 C S_t Communicating\n" + "5 Comm Sim_t P_t P_t Communication_t\n"; + + const char *ext = ".trace"; + int ext_len = strlen(ext); + int len; + + xbt_assert0(msg_global, "Initialize MSG first\n"); + xbt_assert0(!msg_global->paje_output, "Paje output allready defined\n"); + xbt_assert0(filename, "Need a real file name\n"); + + len = strlen(filename); + if((lenpaje_output=fopen(filename,"w"); + xbt_assert1(msg_global->paje_output, "Failed to open %s \n",filename); + + fprintf(msg_global->paje_output,"%s",paje_preembule); + fprintf(msg_global->paje_output,"%s",type_definitions); + for(i=0; imax_channel; i++) { + fprintf(msg_global->paje_output, "6 COMM_%d Comm \"Channel %d\"\n" ,i,i); + } + fprintf(msg_global->paje_output, + "7 0.0 CUR Sim_t 0 \"MSG simulation\"\n"); + } /** \ingroup msg_simulation @@ -115,8 +285,6 @@ MSG_error_t MSG_main(void) { m_process_t process = NULL; int nbprocess,i; - long double Before=0.0; - long double Now=0.0; double elapsed_time = 0.0; /* Clean IO before the run */ @@ -129,13 +297,16 @@ MSG_error_t MSG_main(void) xbt_context_empty_trash(); while ((process = xbt_fifo_pop(msg_global->process_to_run))) { /* fprintf(stderr,"-> %s (%d)\n",process->name, process->simdata->PID); */ + DEBUG3("Scheduling %s(%d) on %s", + process->name,process->simdata->PID, + process->simdata->host->name); msg_global->current_process = process; xbt_context_schedule(process->simdata->context); msg_global->current_process = NULL; } - Before = MSG_getClock(); + DEBUG1("%Lg : Calling surf_solve",MSG_getClock()); elapsed_time = surf_solve(); - Now = MSG_getClock(); + DEBUG1("Elapsed_time %lg",elapsed_time); /* fprintf(stderr, "====== %Lg =====\n",Now); */ /* if (elapsed_time==0.0) { */ @@ -195,12 +366,13 @@ MSG_error_t MSG_main(void) "MSG: Congratulations ! Simulation terminated : all process are over\n"); return MSG_OK; } else { + xbt_fifo_item_t item = NULL; fprintf(stderr,"MSG: Oops ! Deadlock or code not perfectly clean.\n"); fprintf(stderr,"MSG: %d processes are still running, waiting for something.\n", nbprocess); /* List the process and their state */ fprintf(stderr,"MSG: () on : .\n"); - while ((process=xbt_fifo_pop(msg_global->process_list))) { + xbt_fifo_foreach(msg_global->process_list,item,process,m_process_t) { simdata_process_t p_simdata = (simdata_process_t) process->simdata; simdata_host_t h_simdata=(simdata_host_t)p_simdata->host->simdata; @@ -292,6 +464,30 @@ MSG_error_t MSG_main(void) /* return failedHostList; */ /* } */ +/** \ingroup msg_simulation + * \brief Kill all running process + + * \param reset_PIDs should we reset the PID numbers. A negative + * number means no reset and a positive number will be used to set the PID + * of the next newly created process. + */ +int MSG_process_killall(int reset_PIDs) +{ + xbt_fifo_item_t i = NULL; + m_process_t p = NULL; + + while((p=xbt_fifo_shift(msg_global->process_list))) { + MSG_process_kill(p); + } + xbt_context_empty_trash(); + if(reset_PIDs>0) { + msg_global->PID = reset_PIDs; + msg_global->session++; + } + + return msg_global->PID; +} + /** \ingroup msg_simulation * \brief Clean the MSG simulation */ @@ -315,6 +511,10 @@ MSG_error_t MSG_clean(void) xbt_fifo_free(msg_global->process_list); xbt_dict_free(&(msg_global->registered_functions)); + if(msg_global->paje_output) { + fclose(msg_global->paje_output); + msg_global->paje_output = NULL; + } xbt_free(msg_global); surf_finalize(); diff --git a/src/msg/gos.c b/src/msg/gos.c index 9041afd174..17f0326181 100644 --- a/src/msg/gos.c +++ b/src/msg/gos.c @@ -57,8 +57,10 @@ MSG_error_t MSG_task_get(m_task_t * task, h = MSG_host_self(); h_simdata = h->simdata; while ((t = xbt_fifo_pop(h_simdata->mbox[channel])) == NULL) { - xbt_assert0(!(h_simdata->sleeping[channel]), - "A process is already blocked on this channel"); + xbt_assert2(!(h_simdata->sleeping[channel]), + "A process (%s(%d)) is already blocked on this channel", + h_simdata->sleeping[channel]->name, + h_simdata->sleeping[channel]->simdata->PID); h_simdata->sleeping[channel] = process; /* I'm waiting. Wake me up when you're ready */ __MSG_process_block(); if(surf_workstation_resource->extension_public->get_state(h_simdata->host) @@ -84,6 +86,8 @@ MSG_error_t MSG_task_get(m_task_t * task, if(__MSG_process_isBlocked(t_simdata->sender)) __MSG_process_unblock(t_simdata->sender); + PAJE_PROCESS_STATE(process,"C"); + do { __MSG_task_wait_event(process, t); state=surf_workstation_resource->common_public->action_get_state(t_simdata->comm); @@ -94,6 +98,8 @@ MSG_error_t MSG_task_get(m_task_t * task, xbt_context_yield(); } + PAJE_COMM_STOP(process,t,channel); + if(state == SURF_ACTION_DONE) MSG_RETURN(MSG_OK); else if(surf_workstation_resource->extension_public->get_state(h_simdata->host) == SURF_CPU_OFF) @@ -192,6 +198,8 @@ MSG_error_t MSG_task_put(m_task_t task, xbt_fifo_push(((simdata_host_t) remote_host->simdata)-> mbox[channel], task); + + PAJE_COMM_START(process,task,channel); if(remote_host->simdata->sleeping[channel]) __MSG_process_unblock(remote_host->simdata->sleeping[channel]); @@ -204,6 +212,8 @@ MSG_error_t MSG_task_put(m_task_t task, process->simdata->put_channel = -1; /* } */ + PAJE_PROCESS_STATE(process,"C"); + state=surf_workstation_resource->common_public->action_get_state(task_simdata->comm); while (state==SURF_ACTION_RUNNING) { __MSG_task_wait_event(process, task); @@ -231,6 +241,7 @@ MSG_error_t MSG_task_put_bounded(m_task_t task, { task->simdata->rate=max_rate; return(MSG_task_put(task, dest, channel)); + task->simdata->rate=-1.0; } /** \ingroup msg_gos_functions @@ -248,6 +259,7 @@ MSG_error_t MSG_task_execute(m_task_t task) m_process_t process = MSG_process_self(); __MSG_task_execute(process, task); + PAJE_PROCESS_STATE(process,"E"); return __MSG_wait_for_computation(process,task); } diff --git a/src/msg/host.c b/src/msg/host.c index 11aa69a61b..d008e2b465 100644 --- a/src/msg/host.c +++ b/src/msg/host.c @@ -36,6 +36,8 @@ m_host_t __MSG_host_create(const char *name, xbt_fifo_push(msg_global->host, host); + PAJE_HOST_NEW(host); + return host; } @@ -110,6 +112,8 @@ void __MSG_host_destroy(m_host_t host) xbt_assert0((host != NULL), "Invalid parameters"); + PAJE_HOST_FREE(host); + /* Clean Simulator data */ simdata = (host)->simdata; diff --git a/src/msg/m_process.c b/src/msg/m_process.c index 2062073325..d049c4aa02 100644 --- a/src/msg/m_process.c +++ b/src/msg/m_process.c @@ -28,6 +28,9 @@ m_process_t MSG_process_create(const char *name, static void MSG_process_cleanup(void *arg) { + + PAJE_PROCESS_FREE(arg); + xbt_fifo_remove(msg_global->process_list, arg); xbt_fifo_remove(msg_global->process_to_run, arg); xbt_fifo_remove(((m_process_t) arg)->simdata->host->simdata->process_list, arg); @@ -67,12 +70,11 @@ m_process_t MSG_process_create_with_arguments(const char *name, simdata_process_t simdata = xbt_new0(s_simdata_process_t,1); m_process_t process = xbt_new0(s_m_process_t,1); m_process_t self = NULL; - static int PID = 1; xbt_assert0(((code != NULL) && (host != NULL)), "Invalid parameters"); /* Simulator Data */ - simdata->PID = PID++; + simdata->PID = msg_global->PID++; simdata->host = host; simdata->waiting_task = NULL; simdata->argc = argc; @@ -104,6 +106,8 @@ m_process_t MSG_process_create_with_arguments(const char *name, xbt_fifo_push(msg_global->process_list, process); xbt_fifo_push(msg_global->process_to_run, process); + PAJE_PROCESS_NEW(process); + return process; } @@ -114,9 +118,36 @@ m_process_t MSG_process_create_with_arguments(const char *name, */ void MSG_process_kill(m_process_t process) { + int i; + simdata_process_t p_simdata = process->simdata; + simdata_host_t h_simdata= p_simdata->host->simdata; + +/* fprintf(stderr,"Killing %s(%d) on %s.\n",process->name, */ +/* p_simdata->PID,p_simdata->host->name); */ + + for (i=0; imax_channel; i++) { + if (h_simdata->sleeping[i] == process) { + h_simdata->sleeping[i] = NULL; + break; + } + } + if (i==msg_global->max_channel) { + if(p_simdata->waiting_task) { + if(p_simdata->waiting_task->simdata->compute) + surf_workstation_resource->common_public-> + action_free(p_simdata->waiting_task->simdata->compute); + else if (p_simdata->waiting_task->simdata->comm) + surf_workstation_resource->common_public-> + action_change_state(p_simdata->waiting_task->simdata->comm,SURF_ACTION_FAILED); + else + fprintf(stderr,"UNKNOWN STATUS. Please report this bug.\n"); + } else { /* Must be trying to put a task somewhere */ + fprintf(stderr,"UNKNOWN STATUS. Please report this bug.\n"); + } + } + xbt_fifo_remove(msg_global->process_list,process); xbt_context_free(process->simdata->context); - MSG_process_cleanup(process); } /** \ingroup m_process_management @@ -289,6 +320,8 @@ MSG_error_t MSG_process_suspend(m_process_t process) xbt_assert0(((process) && (process->simdata)), "Invalid parameters"); + PAJE_PROCESS_STATE(process,"S"); + if(process!=MSG_process_self()) { simdata = process->simdata; @@ -341,6 +374,8 @@ MSG_error_t MSG_process_resume(m_process_t process) simdata = process->simdata; if(simdata->blocked) { + PAJE_PROCESS_STATE(process,"B"); + simdata->suspended = 0; /* He'll wake up by itself */ MSG_RETURN(MSG_OK); } @@ -352,10 +387,14 @@ MSG_error_t MSG_process_resume(m_process_t process) simdata_task = simdata->waiting_task->simdata; - if(simdata_task->compute) + if(simdata_task->compute) { surf_workstation_resource->common_public->resume(simdata_task->compute); - else + PAJE_PROCESS_STATE(process,"E"); + } + else { + PAJE_PROCESS_STATE(process,"C"); surf_workstation_resource->common_public->resume(simdata_task->comm); + } MSG_RETURN(MSG_OK); } @@ -380,6 +419,8 @@ MSG_error_t __MSG_process_block(void) m_task_t dummy = MSG_TASK_UNINITIALIZED; dummy = MSG_task_create("blocked", 0.0, 0, NULL); + PAJE_PROCESS_STATE(process,"B"); + process->simdata->blocked=1; __MSG_task_execute(process,dummy); surf_workstation_resource->common_public->suspend(dummy->simdata->compute); diff --git a/src/msg/private.h b/src/msg/private.h index 4e47231fee..b2cb6a13a6 100644 --- a/src/msg/private.h +++ b/src/msg/private.h @@ -64,6 +64,10 @@ typedef struct MSG_Global { int max_channel; m_process_t current_process; xbt_dict_t registered_functions; + FILE *paje_output; + int paje_maxPID; + int PID; + int session; } s_MSG_Global_t, *MSG_Global_t; extern MSG_Global_t msg_global; @@ -90,4 +94,71 @@ MSG_error_t __MSG_process_block(void); MSG_error_t __MSG_process_unblock(m_process_t process); int __MSG_process_isBlocked(m_process_t process); +/* #define ALVIN_SPECIAL_LOGING */ +#ifdef ALVIN_SPECIAL_LOGING +#define PAJE_PROCESS_STATE(process,state)\ + if(msg_global->paje_output) \ + fprintf(msg_global->paje_output,"10 %lg S_t P%d %s\n",\ + surf_get_clock(), process->simdata->PID,state) +#define PAJE_PROCESS_FREE(process) +#define PAJE_PROCESS_NEW(process)\ + if((msg_global->paje_output)) {\ + if((msg_global->session==0) || ((msg_global->session>0) && (process->simdata->PID > msg_global->paje_maxPID))) \ + fprintf(msg_global->paje_output,"7 %lg P%d P_t %p \"%s %d (%d)\"\n", \ + surf_get_clock(), process->simdata->PID, process->simdata->host, \ + process->name, process->simdata->PID, msg_global->session);\ + if(msg_global->paje_maxPIDsimdata->PID) msg_global->paje_maxPID=process->simdata->PID;\ + } +#define PAJE_COMM_START(process,task,channel)\ + if(msg_global->paje_output) \ + fprintf(msg_global->paje_output,\ + "16 %lg Comm CUR COMM_%d P%d %p\n", \ + surf_get_clock(), channel, process->simdata->PID, task) +#define PAJE_COMM_STOP(process,task,channel)\ + if(msg_global->paje_output) \ + fprintf(msg_global->paje_output,\ + "17 %lg Comm CUR COMM_%d P%d %p\n", \ + surf_get_clock(), channel, process->simdata->PID, task) +#define PAJE_HOST_NEW(host)\ + if(msg_global->paje_output)\ + fprintf(msg_global->paje_output,"7 %lg %p H_t CUR \"%s\"\n",surf_get_clock(), \ + host, host->name) +#define PAJE_HOST_FREE(host)\ + if(msg_global->paje_output)\ + fprintf(msg_global->paje_output,"8 %lg %p H_t\n",surf_get_clock(), host) + +#else + +#define PAJE_PROCESS_STATE(process,state)\ + if(msg_global->paje_output) \ + fprintf(msg_global->paje_output,"10 %lg S_t %p %s\n",\ + surf_get_clock(), process,state) +#define PAJE_PROCESS_FREE(process)\ + if(msg_global->paje_output) \ + fprintf(msg_global->paje_output,"8 %lg %p P_t\n", \ + surf_get_clock(), process) +#define PAJE_PROCESS_NEW(process)\ + if(msg_global->paje_output) \ + fprintf(msg_global->paje_output,"7 %lg %p P_t %p \"%s %d (%d)\"\n", \ + surf_get_clock(), process, process->simdata->host, \ + process->name, process->simdata->PID, msg_global->session) +#define PAJE_COMM_START(process,task,channel)\ + if(msg_global->paje_output) \ + fprintf(msg_global->paje_output,\ + "16 %lg Comm CUR COMM_%d %p %p\n", \ + surf_get_clock(), channel, process, task) +#define PAJE_COMM_STOP(process,task,channel)\ + if(msg_global->paje_output) \ + fprintf(msg_global->paje_output,\ + "17 %lg Comm CUR COMM_%d %p %p\n", \ + surf_get_clock(), channel, process, task) +#define PAJE_HOST_NEW(host)\ + if(msg_global->paje_output)\ + fprintf(msg_global->paje_output,"7 %lg %p H_t CUR \"%s\"\n",surf_get_clock(), \ + host, host->name) +#define PAJE_HOST_FREE(host)\ + if(msg_global->paje_output)\ + fprintf(msg_global->paje_output,"8 %lg %p H_t\n",surf_get_clock(), host); + +#endif /* Alvin_Special_Loging */ #endif -- 2.20.1