Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
MSG almost works. It enabled me to find some bugs in SURF. I'm going to optimize...
author: alegrand <alegrand@48e7efb5-ca39-0410-a469-dd3cf9ba447f>
Wed, 29 Dec 2004 03:15:51 +0000 (03:15 +0000)
committer: alegrand <alegrand@48e7efb5-ca39-0410-a469-dd3cf9ba447f>
Wed, 29 Dec 2004 03:15:51 +0000 (03:15 +0000)
git-svn-id: svn+ssh://scm.gforge.inria.fr/svn/simgrid/simgrid/trunk@701 48e7efb5-ca39-0410-a469-dd3cf9ba447f

include/msg/msg.h
src/msg/deployment.c
src/msg/global.c
src/msg/gos.c
src/msg/m_process.c
src/msg/private.h
src/msg/task.c
src/surf/cpu.c
testsuite/msg/msg_test.c

index a36cefd..2b4df2d 100644 (file)
@@ -42,7 +42,9 @@ m_host_t *MSG_get_host_table(void);
 m_process_t MSG_process_create(const char *name,
                               m_process_code_t code, void *data,
                               m_host_t host);
-
+m_process_t MSG_process_create_with_arguments(const char *name,
+                                             m_process_code_t code, void *data,
+                                             m_host_t host, int argc, char **argv);
 MSG_error_t MSG_get_arguments(int *argc, char ***argv);
 MSG_error_t MSG_set_arguments(m_process_t process,int argc, char *argv[]);
 
index 3cf1577..b53e22e 100644 (file)
@@ -38,7 +38,7 @@ extern char *yytext;
 void MSG_launch_application(const char *file) 
 {
   char *host_name = NULL;
-  int argc = 0 ;
+  int argc = -1 ;
   char **argv = NULL;
   m_process_t process = NULL ;
   m_host_t host = NULL;
@@ -67,9 +67,10 @@ void MSG_launch_application(const char *file)
       host = MSG_get_host_by_name(host_name);
       xbt_assert1(host, "Unknown host %s",host_name);
 
-     process = MSG_process_create(argv[0], code, NULL, host);
-     MSG_set_arguments(process, argc, argv);
-     xbt_free(host_name); 
+      process = MSG_process_create_with_arguments(argv[0], code, NULL, host,argc,argv);
+      argc=-1;
+      argv=NULL;
+      xbt_free(host_name); 
     }
     else {
       CRITICAL1("Parse error line %d\n", surf_line_pos);
@@ -148,10 +149,9 @@ MSG_error_t MSG_set_arguments(m_process_t process,int argc, char *argv[])
 {
   simdata_process_t simdata = NULL;
 
-  xbt_assert0((process) && (process->simdata), "Invalid parameters");
-  simdata = process->simdata;
-  simdata->argc = argc;
-  simdata->argv = argv;
+  xbt_assert0(0,"Deprecated ! Do not use anymore. "
+             "Use MSG_process_create_with_arguments instead.\n");
+
   return MSG_OK;
 }
 
index fee4cb9..0d13dba 100644 (file)
@@ -30,6 +30,7 @@ void MSG_global_init(void)
     msg_global = xbt_new0(s_MSG_Global_t,1);
 
     surf_init(&argc, argv);    /* Initialize some common structures */
+    xbt_context_init();
     msg_global->host = xbt_fifo_new();
     msg_global->process_to_run = xbt_fifo_new();
     msg_global->process_list = xbt_fifo_new();
@@ -120,19 +121,22 @@ MSG_error_t MSG_main(void)
   fflush(stderr);
 
   surf_solve(); /* Takes traces into account. Returns 0.0 */
-  while (xbt_fifo_size(msg_global->process_list)) {
+  while (xbt_fifo_size(msg_global->process_to_run)) {
     while ((process = xbt_fifo_pop(msg_global->process_to_run))) {
+      fprintf(stderr,"-> %s (%d)\n",process->name, process->simdata->PID);
       msg_global->current_process = process;
-      xbt_context_yield(process->simdata->context);
+      xbt_context_schedule(process->simdata->context);
       msg_global->current_process = NULL;
     }
     Before = MSG_getClock();
     elapsed_time = surf_solve();
     Now = MSG_getClock();
+    fprintf(stderr, "====== %Lg =====\n",Now);
 
-    if (elapsed_time==0.0)
-      break;
-    
+    if (elapsed_time==0.0) {
+      fprintf(stderr, "No change in time\n");
+/*       break; */
+    }
 /*     /\* Handle Failures *\/ */
 /*     { */
 /*       xbt_fifo_t failedHostList = MSG_buildFailedHostList(Before,Now); */
@@ -191,17 +195,34 @@ MSG_error_t MSG_main(void)
     {
       surf_action_t action = NULL;
       surf_resource_t resource = NULL;
+      m_task_t task = NULL;
       
       xbt_dynar_foreach(resource_list, i, resource) {
        while ((action =
               xbt_swag_extract(resource->common_public->states.
                                failed_action_set))) {
-/*       xbt_fifo_insert(msg_global->process_to_run, process); */
+         task = action->data;
+         if(task) {
+           int _cursor;
+           fprintf(stderr,"** %s **\n",task->name);
+           xbt_dynar_foreach(task->simdata->sleeping,_cursor,process) {
+             xbt_fifo_unshift(msg_global->process_to_run, process);
+           }
+           process=NULL;
+         }
        }
        while ((action =
               xbt_swag_extract(resource->common_public->states.
                                done_action_set))) {
-/*       xbt_fifo_insert(msg_global->process_to_run, process); */
+         task = action->data;
+         if(task) {
+           int _cursor;
+           fprintf(stderr,"** %s **\n",task->name);
+           xbt_dynar_foreach(task->simdata->sleeping,_cursor,process) {
+             xbt_fifo_unshift(msg_global->process_to_run, process);
+           }
+           process=NULL;
+         }
        }
       }
     }
@@ -216,7 +237,7 @@ MSG_error_t MSG_main(void)
     fprintf(stderr,"MSG: %d processes are still running, waiting for something.\n",
            nbprocess);
     /*  List the process and their state */
-    fprintf(stderr,"MSG: <process>(<pid>) on <host>(<pid>): <status>.\n");
+    fprintf(stderr,"MSG: <process>(<pid>) on <host>: <status>.\n");
     while ((process=xbt_fifo_pop(msg_global->process_list))) {
       simdata_process_t p_simdata = (simdata_process_t) process->simdata;
       simdata_host_t h_simdata=(simdata_host_t)p_simdata->host->simdata;
@@ -231,16 +252,22 @@ MSG_error_t MSG_main(void)
        }
       }
       if (i==msg_global->max_channel) {
-       if(p_simdata->waiting_task) 
-         fprintf(stderr,"Waiting for %s to finish.\n",p_simdata->waiting_task->name);
+       if(p_simdata->waiting_task) {
+         if(p_simdata->waiting_task->simdata->compute)
+           fprintf(stderr,"Waiting for %s to finish.\n",p_simdata->waiting_task->name);
+         else if (p_simdata->waiting_task->simdata->comm)
+           fprintf(stderr,"Waiting for %s to be finished transfered.\n",
+                   p_simdata->waiting_task->name);
+         else
+           fprintf(stderr,"UNKNOWN STATUS. Please report this bug.\n");
+       }
        else { /* Must be trying to put a task somewhere */
-         CRITICAL0("Well! I don't know. I have to fix this part of the code. ;)");
-/*       if(p_simdata->put_host) { */
-/*         fprintf(stderr,"Trying to send a task on Host %s, channel %d.\n", */
-/*                 p_simdata->put_host->name, p_simdata->put_channel); */
-/*       } else { */
-/*         fprintf(stderr,"UNKNOWN STATUS. Please report this bug.\n"); */
-/*       } */
+         if(p_simdata->put_host) {
+           fprintf(stderr,"Trying to send a task on Host %s, channel %d.\n",
+                   p_simdata->put_host->name, p_simdata->put_channel);
+         } else {
+           fprintf(stderr,"UNKNOWN STATUS. Please report this bug.\n");
+         }
        }       
       } 
     }
index d167c7a..18094ea 100644 (file)
@@ -77,7 +77,8 @@ MSG_error_t MSG_task_get(m_task_t * task,
   t_simdata->using++;
   t_simdata->comm = surf_workstation_resource->extension_public->
     communicate(MSG_process_get_host(t_simdata->sender)->simdata->host,
-               h, t_simdata->message_size);
+               h->simdata->host, t_simdata->message_size);
+  surf_workstation_resource->common_public->action_set_data(t_simdata->comm,t);
 
   do {
     __MSG_task_wait_event(process, t);
@@ -157,9 +158,14 @@ MSG_error_t MSG_task_put(m_task_t task,
     
   if(remote_host->simdata->sleeping[channel]) 
     MSG_process_resume(remote_host->simdata->sleeping[channel]);
-  else 
+  else {
+    process->simdata->put_host = dest;
+    process->simdata->put_channel = channel;
     MSG_process_suspend(process);
-  
+    process->simdata->put_host = NULL;
+    process->simdata->put_channel = -1;
+  }
+
   do {
     __MSG_task_wait_event(process, task);
     state=surf_workstation_resource->common_public->action_get_state(task_simdata->comm);
@@ -203,6 +209,7 @@ void __MSG_task_execute(m_process_t process, m_task_t task)
   simdata->compute = surf_workstation_resource->extension_public->
     execute(MSG_process_get_host(process)->simdata->host,
            simdata->computation_amount);
+  surf_workstation_resource->common_public->action_set_data(simdata->compute,task);
 }
 
 MSG_error_t __MSG_wait_for_computation(m_process_t process, m_task_t task)
@@ -247,6 +254,7 @@ MSG_error_t MSG_process_sleep(long double nb_sec)
   simdata->compute = surf_workstation_resource->extension_public->
     sleep(MSG_process_get_host(process)->simdata->host,
            simdata->computation_amount);
+  surf_workstation_resource->common_public->action_set_data(simdata->compute,dummy);
 
   
   simdata->using++;
index 15ec2c9..a3bd124 100644 (file)
@@ -38,6 +38,19 @@ m_process_t MSG_process_create(const char *name,
                               m_process_code_t code, void *data,
                               m_host_t host)
 {
+  return MSG_process_create_with_arguments(name, code, data, host, -1, NULL);
+}
+
+static void MSG_process_cleanup(void *arg)
+{
+  xbt_fifo_remove(msg_global->process_list, arg);
+  xbt_fifo_remove(msg_global->process_to_run, arg);
+}
+
+m_process_t MSG_process_create_with_arguments(const char *name,
+                                             m_process_code_t code, void *data,
+                                             m_host_t host, int argc, char **argv)
+{
   simdata_process_t simdata = xbt_new0(s_simdata_process_t,1);
   m_process_t process = xbt_new0(s_m_process_t,1);
   m_process_t self = NULL;
@@ -49,9 +62,11 @@ m_process_t MSG_process_create(const char *name,
   simdata->PID = PID++;
   simdata->host = host;
   simdata->waiting_task = NULL;
-  simdata->argc = -1;
-  simdata->argv = NULL;
-  simdata->context = xbt_context_new(code, simdata->argc, simdata->argv);
+  simdata->argc = argc;
+  simdata->argv = argv;
+  simdata->context = xbt_context_new(code, NULL, NULL, 
+                                    MSG_process_cleanup, process, 
+                                    simdata->argc, simdata->argv);
 
   if((self=msg_global->current_process)) {
     simdata->PPID = MSG_process_get_PID(self);
@@ -72,6 +87,10 @@ m_process_t MSG_process_create(const char *name,
   self = msg_global->current_process;
   xbt_context_start(process->simdata->context);
   msg_global->current_process = self;
+
+  xbt_fifo_push(msg_global->process_list, process);
+  xbt_fifo_push(msg_global->process_to_run, process);
+
   return process;
 }
 
@@ -258,12 +277,11 @@ MSG_error_t MSG_process_suspend(m_process_t process)
                "Got a problem in deciding which action to choose !");
     if(simdata_task->compute) 
       surf_workstation_resource->extension_public->suspend(simdata_task->compute);
-    else 
+    else
       surf_workstation_resource->extension_public->suspend(simdata_task->comm);
-
   } else {
     m_task_t dummy = MSG_TASK_UNINITIALIZED;
-    dummy = MSG_task_create("suspended", 0.01, 0, NULL);
+    dummy = MSG_task_create("suspended", 0.0, 0, NULL);
 
     __MSG_task_execute(process,dummy);
     surf_workstation_resource->extension_public->suspend(dummy->simdata->compute);
index b3f64e8..8c8af4d 100644 (file)
@@ -46,6 +46,8 @@ typedef struct simdata_process {
   int PID;                     /* used for debugging purposes */
   int PPID;                    /* The parent PID */
   m_task_t waiting_task;        
+  m_host_t put_host;           /* used for debugging purposes */
+  m_channel_t put_channel;     /* used for debugging purposes */
   int argc;                     /* arguments number if any */
   char **argv;                  /* arguments table if any */
   MSG_error_t last_errno;       /* the last value returned by a MSG_function */
index d0f6001..530be8b 100644 (file)
@@ -80,12 +80,13 @@ MSG_error_t MSG_task_destroy(m_task_t task)
   int i;
 
   xbt_assert0((task != NULL), "Invalid parameter");
-  xbt_assert0((xbt_dynar_length(task->simdata->sleeping)==0), 
-             "Task still used. Cannot destroy it now!");
 
   task->simdata->using--;
   if(task->simdata->using>0) return MSG_OK;
 
+  xbt_assert0((xbt_dynar_length(task->simdata->sleeping)==0), 
+             "Task still used. There is a problem. Cannot destroy it now!");
+
   if(task->name) xbt_free(task->name);
 
   xbt_dynar_free(&(task->simdata->sleeping));
@@ -154,13 +155,20 @@ MSG_error_t MSG_task_destroy(m_task_t task)
 
 MSG_error_t __MSG_task_wait_event(m_process_t process, m_task_t task)
 {
+  int _cursor;
+  m_process_t proc = NULL;
+
   xbt_assert0(((task != NULL)
               && (task->simdata != NULL)), "Invalid parameters");
 
-  xbt_dynar_push(task->simdata->sleeping, process);
+  xbt_dynar_push(task->simdata->sleeping, &process);
   process->simdata->waiting_task = task;
-  xbt_context_yield(process->simdata->context);
+  xbt_context_yield();
   process->simdata->waiting_task = NULL;
+  xbt_dynar_foreach(task->simdata->sleeping,_cursor,proc) {
+    if(proc==process) 
+      xbt_dynar_remove_at(task->simdata->sleeping,_cursor,&proc);
+  }
 
   return MSG_OK;
 }
index e9ecb06..ca1056e 100644 (file)
@@ -199,7 +199,8 @@ static void update_actions_state(double now, double delta)
     if (action->generic_action.max_duration != NO_MAX_DURATION)
       action->generic_action.max_duration -= delta;
 /*     if(action->generic_action.remains<.00001) action->generic_action.remains=0; */
-    if (action->generic_action.remains <= 0) {
+    if ((action->generic_action.remains <= 0) && 
+       (lmm_get_variable_weight(action->variable)>0)) {
       action->generic_action.finish = surf_get_clock();
       action_change_state((surf_action_t) action, SURF_ACTION_DONE);
     } else if ((action->generic_action.max_duration != NO_MAX_DURATION) &&
@@ -313,7 +314,7 @@ static void action_resume(surf_action_t action)
 
 static int action_is_suspended(surf_action_t action)
 {
-  return (lmm_get_variable_weight(maxmin_system, ((surf_action_cpu_t) action)->variable) == 0.0);
+  return (lmm_get_variable_weight(((surf_action_cpu_t) action)->variable) == 0.0);
 }
 
 static e_surf_cpu_state_t get_state(void *cpu)
index 6359c75..e377e44 100644 (file)
@@ -12,7 +12,7 @@
 #include "msg/msg.h"
 
 /** This flag enable the debugging messages from PRINT_DEBUG_MESSAGE() */
-#undef VERBOSE
+#define VERBOSE
 #include "messages.h"
 
 int unix_emitter(int argc, char *argv[]);
@@ -27,6 +27,17 @@ typedef enum {
   MAX_CHANNEL
 } channel_t;
 
+void print_args(int argc, char** argv);
+void print_args(int argc, char** argv)
+{
+  int i ; 
+
+  fprintf(stderr,"<");
+  for(i=0; i<argc; i++) 
+    fprintf(stderr,"%s ",argv[i]);
+  fprintf(stderr,">\n");
+}
+
 /** The number of task each slave will process */
 #define NB_TASK 3
 int unix_emitter(int argc, char *argv[])
@@ -37,6 +48,8 @@ int unix_emitter(int argc, char *argv[])
   m_task_t *todo = NULL;
 
   int i;
+  PRINT_MESSAGE("Hello !");
+  print_args(argc,argv);
 
   {                  /* Process organisation */
     slaves_count = argc - 1;
@@ -93,6 +106,8 @@ int unix_receiver(int argc, char *argv[])
   m_task_t *todo = (m_task_t *) calloc(NB_TASK, sizeof(m_task_t));
   int i;
 
+  PRINT_MESSAGE("Hello !");
+  print_args(argc,argv);
 
   for (i = 0; i < NB_TASK;) {
     int a;
@@ -112,6 +127,7 @@ int unix_receiver(int argc, char *argv[])
     }
   }
   free(todo);
+  PRINT_MESSAGE("I'm done. See you!\n");
   return 0;
 }
 
@@ -135,13 +151,13 @@ void test_all(const char *platform_file,const char *application_file, double sha
     MSG_launch_application(application_file);
   }
   MSG_main();
-/*   printf("Simulation time %g\n",MSG_getClock()); */
-  MSG_clean();
+  printf("Simulation time %Lg\n",MSG_getClock());
+/*   MSG_clean(); */
 }
 
 int main(int argc, char *argv[])
 {
   test_all("msg_platform.txt","msg_deployment.txt",-.1);
-  test_all("msg_platform.txt","msg_deployment.txt",.1);
+/*   test_all("msg_platform.txt","msg_deployment.txt",.1); */
   return (0);
 }