Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
add timers to GRAS
author mquinson <mquinson@48e7efb5-ca39-0410-a469-dd3cf9ba447f>
Sun, 13 Feb 2005 22:16:14 +0000 (22:16 +0000)
committer mquinson <mquinson@48e7efb5-ca39-0410-a469-dd3cf9ba447f>
Sun, 13 Feb 2005 22:16:14 +0000 (22:16 +0000)
git-svn-id: svn+ssh://scm.gforge.inria.fr/svn/simgrid/simgrid/trunk@1007 48e7efb5-ca39-0410-a469-dd3cf9ba447f

ChangeLog
include/Makefile.am
include/gras.h
include/gras/timer.h [new file with mode: 0644]
src/Makefile.am
src/gras/Msg/msg.c
src/gras/Msg/msg_interface.h
src/gras/Msg/msg_private.h
src/gras/Msg/timer.c [new file with mode: 0644]
testsuite/Makefile.am
testsuite/run_tests.in

index f95c784..77d2623 100644 (file)
--- a/ChangeLog
+++ b/ChangeLog
@@ -29,14 +29,18 @@ SimGrid (2.91) unstable; urgency=low
    - Cleanup the known architecture table. Reorder the entries to group what
      should be, and use a more consistent naming scheme.
      (some of the test dataset are still to be regenerated)
-   - Allow library to register globals on each process just as userdata does.
+   - New! Allow library to register globals on each process just as userdata
+     does. 
       This is implemented using a xbt_dict and not a xbt_set, so we lose the
        lookup time (for now).
       Use it in msg and trp.
       This cleans a lot the internals and helps enforcing privacy of the
        headers between the gras components.
+   - New! Add a timer mechanism, not unlike cron(8) and at(1). 
    - Bugfix: gras_os_time was delirious in RL.
-   - Reenable GRAS
+   - Bugfix: gras_trp_select/RL don't run into the wall when asked to select
+     onto 0 sockets.
+   - Reenable GRAS now that it works.
 
  --
 
index 449c553..7c40200 100644 (file)
@@ -18,7 +18,7 @@ nobase_include_HEADERS = \
        gras/datadesc.h gras/transport.h \
        gras/virtu.h gras/cond.h gras/process.h \
        \
-       gras/messages.h \
+       gras/messages.h gras/timer.h\
        \
        amok/base.h \
        amok/bandwidth.h
index e605f71..90ae48f 100644 (file)
@@ -20,5 +20,6 @@
 #include <gras/transport.h>
 #include <gras/datadesc.h>
 #include <gras/messages.h>
+#include <gras/timer.h>
 
 #endif /* GRAS_H */
diff --git a/include/gras/timer.h b/include/gras/timer.h
new file mode 100644 (file)
index 0000000..af1aa56
--- /dev/null
@@ -0,0 +1,55 @@
+/* $Id$ */
+
+/* timer - delayed and repetitive tasks                                     */
+/* module's public interface exported to end user.                          */
+
+/* Copyright (c) 2005 Martin Quinson. All rights reserved.                  */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
+#ifndef GRAS_TIMER_H
+#define GRAS_TIMER_H
+
+#include "xbt/misc.h"
+
+BEGIN_DECL()
+
+/** @addtogroup GRAS_timer
+ *  @brief Delayed and repetitive tasks (Communication facility)
+ *
+ *  This is how to have a specific function called only once after the
+ *  specified amount of time or a function executed every 5 mn until it gets 
+ *  removed. In the UNIX world, this is comparable to <tt>at</tt> and 
+ *  <tt>cron</tt>.
+ *
+ *  Note that these are very soft timers: the execution of the processes won't
+ *  get interrupted at all. This is on purpose: the GRAS programming model
+ *  is distributed sequential, so that users don't have to deal with mutexes
+ *  and such within a specific process.
+ *
+ *  Timers are served by the gras_msg_handle() function: if there is an
+ *  elapsed timer, the associated code gets executed before any incoming
+ *  connection is checked.
+ *
+ *  @{
+ */
+  /** Signature of the actions run by timers: no argument, no result. */
+  typedef void (*void_f_void_t)(void);
+  /* Registration: run `action` once after `delay` seconds, or every `interval` seconds. */
+  void gras_timer_delay(double delay, void_f_void_t action);
+  void gras_timer_repeat(double interval, void_f_void_t action);
+  /* Targeted cancellation: matches on both the interval and the action given at registration. */
+  xbt_error_t gras_timer_cancel_delay(double interval, void_f_void_t action);
+  xbt_error_t gras_timer_cancel_repeat(double interval, void_f_void_t action);
+  /* Bulk cancellation, one function per kind of timer; an error is returned if nothing was removed. */
+  xbt_error_t gras_timer_cancel_delay_all(void);
+  xbt_error_t gras_timer_cancel_repeat_all(void);
+  /* Bulk cancellation of every registered timer, delayed and repetitive alike. */
+  void gras_timer_cancel_all(void);
+
+/** @} */
+
+END_DECL()
+
+#endif /* GRAS_TIMER_H */
index adcf402..75ef81f 100644 (file)
@@ -129,13 +129,11 @@ COMMON_SRC=\
   gras/DataDesc/datadesc_interface.h  gras/DataDesc/datadesc_private.h \
   gras/DataDesc/ddt_parse.c           gras/DataDesc/ddt_parse.yy.c         gras/DataDesc/ddt_parse.yy.h \
   \
-  gras/Msg/msg.c                      \
+  gras/Msg/msg.c                      gras/Msg/timer.c                 \
   gras/Msg/msg_interface.h            gras/Msg/msg_private.h           \
   \
   gras/Virtu/process.c
 
-#gras/Msg/timer.c                 
-
 RL_SRC= \
   gras/Transport/rl_transport.c  gras/Transport/transport_plugin_tcp.c  gras/Transport/transport_plugin_file.c  \
   \
index 50c529f..cb358b3 100644 (file)
@@ -13,6 +13,8 @@
 #include "gras/Transport/transport_interface.h" /* gras_trp_chunk_send/recv */
 #include "gras/Virtu/virtu_interface.h"
 
+#define MIN(a,b) ((a) < (b) ? (a) : (b))
+
 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(gras_msg,gras,"High level messaging");
 
 xbt_set_t _gras_msgtype_set = NULL;
@@ -25,8 +27,9 @@ static char *make_namev(const char *name, short int ver);
 static void *gras_msg_procdata_new() {
    gras_msg_procdata_t res = xbt_new(s_gras_msg_procdata_t,1);
    
-   res->msg_queue = xbt_dynar_new(sizeof(gras_msg_t),     NULL);
+   res->msg_queue = xbt_dynar_new(sizeof(s_gras_msg_t),   NULL);
    res->cbl_list  = xbt_dynar_new(sizeof(gras_cblist_t *),gras_cbl_free);
+   res->timers    = xbt_dynar_new(sizeof(s_gras_timer_t), NULL);
    
    return (void*)res;
 }
@@ -39,6 +42,7 @@ static void gras_msg_procdata_free(void *data) {
    
    xbt_dynar_free(&( res->msg_queue ));
    xbt_dynar_free(&( res->cbl_list ));
+   xbt_dynar_free(&( res->timers ));
 }
 
 /*
@@ -295,7 +299,7 @@ gras_msg_wait(double           timeout,
   double start, now;
   gras_msg_procdata_t pd=(gras_msg_procdata_t)gras_libdata_get("gras_msg");
   int cpt;
-  gras_msg_t msg;
+  s_gras_msg_t msg;
   
   *expeditor = NULL;
   payload_got = NULL;
@@ -355,10 +359,12 @@ gras_msg_wait(double           timeout,
 xbt_error_t 
 gras_msg_handle(double timeOut) {
   
+  double          untiltimer;
+   
   xbt_error_t    errcode;
   int             cpt;
 
-  gras_msg_t      msg;
+  s_gras_msg_t    msg;
   gras_socket_t   expeditor;
   void           *payload=NULL;
   int             payload_size;
@@ -370,19 +376,41 @@ gras_msg_handle(double timeOut) {
 
   VERB1("Handling message within the next %.2fs",timeOut);
   
+  untiltimer = gras_msg_timer_handle();
+  DEBUG2("[%.0f] Next timer in %f sec", gras_os_time(), untiltimer);
+  if (untiltimer == 0.0) {
+     /* A timer was already elapsed */
+     return no_error;
+  }
+   
   /* get a message (from the queue or from the net) */
   if (xbt_dynar_length(pd->msg_queue)) {
     xbt_dynar_shift(pd->msg_queue,&msg);
     expeditor = msg.expeditor;
     msgtype   = msg.type;
     payload   = msg.payload;
-    
+    errcode   = no_error;
   } else {
-    TRY(gras_trp_select(timeOut, &expeditor));
-    TRY(gras_msg_recv(expeditor, &msgtype, &payload, &payload_size));
+    errcode = gras_trp_select(MIN(timeOut,untiltimer), &expeditor);
+    if (errcode != no_error && errcode != timeout_error)
+       return errcode;
+    if (errcode != timeout_error)
+       TRY(gras_msg_recv(expeditor, &msgtype, &payload, &payload_size));
+  }
+
+  if (errcode == timeout_error && untiltimer < timeOut) {
+     /* A timer elapsed before the arrival of any message even if we select()ed a bit */
+     untiltimer = gras_msg_timer_handle();
+     if (untiltimer == 0.0) {
+       return no_error;
+     } else {
+       WARN1("Weird. I computed that a timer should elapse shortly, but none did (I still should wait %f sec)",
+             untiltimer);
+       return timeout_error;
+     }
   }
-      
-  /* handle it */
+   
+  /* A message was already there or arrived in the meanwhile. handle it */
   xbt_dynar_foreach(pd->cbl_list,cpt,list) {
     if (list->id == msgtype->code) {
       break;
index 28816ec..b16ed5a 100644 (file)
  */
 typedef struct {
   /*queue of msgs storing the ones got while msg_wait'ing for something else */
-  xbt_dynar_t msg_queue; /* elm type: gras_msg_t */
+  xbt_dynar_t msg_queue; /* elm type: s_gras_msg_t */
 
   /* registered callbacks for each message */
   xbt_dynar_t cbl_list; /* elm type: gras_cblist_t */
+   
+  /* registered timers */
+  xbt_dynar_t timers; /* elm type: s_gras_timer_t */
 
 } s_gras_msg_procdata_t,*gras_msg_procdata_t;
 
index b812469..1739d8b 100644 (file)
@@ -24,6 +24,7 @@
 #include "gras/virtu.h"
 
 #include "gras/messages.h"
+#include "gras/timer.h"
 #include "gras_modinter.h"
 
 #include "gras/Msg/msg_interface.h"
@@ -35,7 +36,7 @@ typedef struct {
   gras_msgtype_t  type;
   void           *payload;
   int             payload_size;
-} gras_msg_t;
+} s_gras_msg_t, *gras_msg_t;
 
 /**
  * gras_msgtype_t:
@@ -80,16 +81,15 @@ void gras_cblist_free(void *cbl);
 /* ********* *
  * * TIMER * *
  * ********* */
-typedef void (*void_f_void_t)(void);
-  
 typedef struct {
   double expiry;
   double period;
   void_f_void_t action;
   int repeat;
-} *gras_timer_t;
+} s_gras_timer_t, *gras_timer_t;
 
-extern xbt_dynar_t _gras_timers;
+/* returns 0 if it handled a timer, or the delay until next timer, or -1 if no armed timer */
+double gras_msg_timer_handle(void);
 
 
 #endif  /* GRAS_MESSAGE_PRIVATE_H */
diff --git a/src/gras/Msg/timer.c b/src/gras/Msg/timer.c
new file mode 100644 (file)
index 0000000..8af69cc
--- /dev/null
@@ -0,0 +1,180 @@
+/* $Id$ */
+
+/* timer - Delayed and repetitive actions                                   */
+
+/* Copyright (c) 2005 Martin Quinson. All rights reserved.                  */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
+
+#include "gras/Msg/msg_private.h"
+#include "gras/timer.h"
+#include "gras/Virtu/virtu_interface.h"
+
+
+XBT_LOG_NEW_DEFAULT_SUBCATEGORY(gras_timer,gras,
+                               "Delayed and repetitive actions");
+
+/** @brief Schedule \a action for a single execution, \a delay seconds from now */
+void gras_timer_delay(double delay, void_f_void_t action) {
+  gras_msg_procdata_t procdata;
+  gras_timer_t slot;
+
+  /* timers are stored in the data that the "gras_msg" module keeps for this process */
+  procdata = (gras_msg_procdata_t)gras_libdata_get("gras_msg");
+  /* push a new element on the timer list, and fill it in place */
+  slot = xbt_dynar_push_ptr(procdata->timers);
+
+  VERB1("Register delayed action %p", action);
+  slot->repeat = FALSE;
+  slot->action = action;
+  slot->period = delay;
+  slot->expiry = gras_os_time()+delay;
+}
+
+/** @brief Schedule \a action for execution every \a interval seconds, starting one interval from now */
+void gras_timer_repeat(double interval, void_f_void_t action) {
+  gras_msg_procdata_t procdata;
+  gras_timer_t slot;
+
+  /* timers are stored in the data that the "gras_msg" module keeps for this process */
+  procdata = (gras_msg_procdata_t)gras_libdata_get("gras_msg");
+  /* push a new element on the timer list, and fill it in place */
+  slot = xbt_dynar_push_ptr(procdata->timers);
+
+  VERB1("Register repetitive action %p", action);
+  slot->repeat = TRUE;
+  slot->action = action;
+  slot->period = interval;
+  slot->expiry = gras_os_time()+interval;
+}
+
+/** @brief Cancel the delayed task(s) registered with this \a interval and \a action
+ *
+ *  Every one-shot timer matching both criteria is removed from the list.
+ *  A mismatch_error is raised when no timer matched.
+ */
+xbt_error_t gras_timer_cancel_delay(double interval, void_f_void_t action) {
+  gras_msg_procdata_t procdata=(gras_msg_procdata_t)gras_libdata_get("gras_msg");
+  s_gras_timer_t current;
+  int removed = FALSE;
+  int idx;
+
+  xbt_dynar_foreach(procdata->timers,idx,current){
+     /* only one-shot timers are concerned here */
+     if (current.repeat == FALSE) {
+       if (current.action == action && current.period == interval) {
+          removed = TRUE;
+          xbt_dynar_cursor_rm(procdata->timers, &idx);
+       }
+     }
+  }
+
+  if (!removed)
+     RAISE2(mismatch_error,"Cannot remove the action %p delayed of %f second: not found",
+           action,interval);
+
+  return no_error;
+}
+
+/** @brief Cancel a repetitive task
+ *
+ *  Removes every repetitive timer registered with exactly this \a interval
+ *  and this \a action.
+ *
+ *  @param interval period (in seconds) the task was registered with
+ *  @param action   function the task was registered with
+ *  @return no_error if at least one timer was removed; a mismatch_error is
+ *          raised if no such repetitive task is registered
+ */
+xbt_error_t gras_timer_cancel_repeat(double interval, void_f_void_t action) {
+  gras_msg_procdata_t pd=(gras_msg_procdata_t)gras_libdata_get("gras_msg");
+  int cursor,found;
+  s_gras_timer_t timer;
+
+  found = FALSE;
+  xbt_dynar_foreach(pd->timers,cursor,timer){
+     if (timer.repeat == TRUE     &&
+        timer.period == interval &&
+        timer.action == action) {
+       
+       found = TRUE;
+       xbt_dynar_cursor_rm(pd->timers, &cursor);
+     }
+  }
+  
+  /* the message names a *repetitive* action: it used to be a verbatim copy
+     of the one of gras_timer_cancel_delay(), which was misleading */
+  if (!found)
+     RAISE2(mismatch_error,"Cannot remove the action %p repeated every %f second: not found",
+           action,interval);
+   
+  return no_error;
+}
+
+/** @brief Cancel every delayed (one-shot) task, leaving the repetitive ones armed
+ *
+ *  A mismatch_error is raised when there was no delayed task to remove.
+ */
+xbt_error_t gras_timer_cancel_delay_all(void) {
+  gras_msg_procdata_t procdata=(gras_msg_procdata_t)gras_libdata_get("gras_msg");
+  s_gras_timer_t current;
+  int removed = FALSE;
+  int idx;
+
+  xbt_dynar_foreach(procdata->timers,idx,current){
+     if (FALSE == current.repeat) {
+       removed = TRUE;
+       xbt_dynar_cursor_rm(procdata->timers, &idx);
+     }
+  }
+
+  if (!removed)
+     RAISE0(mismatch_error,"No delayed action to remove");
+
+  return no_error;
+}
+
+/** @brief Cancel all repetitive tasks
+ *
+ *  Removes every timer flagged as repetitive from the timer list of the
+ *  process; the delayed (one-shot) ones are left untouched.
+ *
+ *  @return no_error if at least one repetitive task was removed; a
+ *          mismatch_error is raised if there was none
+ */
+xbt_error_t gras_timer_cancel_repeat_all(void){
+  gras_msg_procdata_t pd=(gras_msg_procdata_t)gras_libdata_get("gras_msg");
+  int cursor, found;
+  s_gras_timer_t timer;
+
+  found = FALSE;
+  xbt_dynar_foreach(pd->timers,cursor,timer){
+     /* select the repetitive timers: this test used to be "== FALSE" (as in
+        gras_timer_cancel_delay_all), which removed the delayed ones instead */
+     if (timer.repeat   == TRUE) {
+       
+       found = TRUE;
+       xbt_dynar_cursor_rm(pd->timers, &cursor);
+     }
+  }
+  
+  if (!found)
+     RAISE0(mismatch_error,"No repetitive action to remove");
+   
+  return no_error;
+}
+
+/** @brief Forget every registered timer, whether delayed or repetitive */
+void gras_timer_cancel_all(void) {
+  gras_msg_procdata_t procdata;
+
+  procdata = (gras_msg_procdata_t)gras_libdata_get("gras_msg");
+  /* resetting the list handles both kinds at once */
+  xbt_dynar_reset(procdata->timers);
+}
+
+   
+/** @brief Serve at most one elapsed timer of the current process
+ *
+ * Walks the timer list from its first element. The first timer whose expiry
+ * date is reached gets served: a repetitive one is re-armed one period after
+ * the date sampled on entry, a delayed one is removed from the list, and
+ * the action is then called.
+ *
+ * The bookkeeping on the list is done BEFORE calling the action. The action
+ * is user code which may register or cancel timers; this can move or drop
+ * elements of the dynar, so neither the pointer to the served timer nor its
+ * index can be trusted once the action has run.
+ *
+ * @return 0 if it handled a timer, or the delay until next timer, or -1 if
+ *         no armed timer
+ */
+double gras_msg_timer_handle(void) {
+  gras_msg_procdata_t pd=(gras_msg_procdata_t)gras_libdata_get("gras_msg");
+  int cursor;
+  gras_timer_t timer;
+  double now=gras_os_time();
+  double untilnext = -1.0;
+  
+  for (cursor=0; cursor < xbt_dynar_length(pd->timers); cursor++) {
+     double untilthis;
+     
+     timer = xbt_dynar_get_ptr (pd->timers, cursor);
+     untilthis = timer->expiry - now;
+     
+     DEBUG2("Action %p expires in %f", timer->action, untilthis);
+     
+     if (untilthis <= 0.0) {
+       /* keep our own copy: `timer` points into the dynar storage */
+       void_f_void_t action = timer->action;
+       
+       DEBUG5("[%.0f] Serve %s action %p (%f<%f)",gras_os_time(),
+             timer->repeat ? "repetitive" : "delayed", action,
+             timer->expiry, now);
+       
+       if (timer->repeat) {    
+          timer->expiry = now + timer->period;
+          DEBUG4("[%.0f] Re-arm repetitive action %p for %f (period=%f)",
+                gras_os_time(),
+                action, timer->expiry, timer->period);
+       } else {
+          DEBUG2("[%.0f] Remove one-shot action %p before running it",
+                gras_os_time(), action);
+          xbt_dynar_cursor_rm(pd->timers, &cursor);
+       }
+       
+       /* nothing below touches the timer list anymore */
+       action();
+       return 0.0;
+     } else if (untilthis < untilnext || untilnext == -1) {
+       /* first armed timer seen so far, or one closer than the previous best */
+       untilnext = untilthis;
+     }
+  }
+  return untilnext;
+}
index d667155..e836cda 100644 (file)
@@ -33,7 +33,8 @@ xbt_tests =                                                      \
 RL_tests =                                              \
        gras/trp_tcp_client   gras/trp_tcp_server       \
        gras/trp_file_client  gras/trp_file_server      \
-       gras/datadesc_usage
+       gras/datadesc_usage                             \
+       gras/timer_usage
 
 SG_tests =                                              \
        surf/maxmin_usage surf/maxmin_bench \
@@ -83,8 +84,9 @@ gras_trp_tcp_client_LDADD=     $(LDADD_RL)
 gras_trp_tcp_server_LDADD=     $(LDADD_RL)
 gras_trp_file_client_LDADD=    $(LDADD_RL)
 gras_trp_file_server_LDADD=    $(LDADD_RL)
+gras_timer_usage_LDADD=        $(LDADD_RL)
 
-gras_datadesc_usage_SOURCES= gras/datadesc_usage.c gras/datadesc_structs.c
+gras_datadesc_usage_SOURCES=   gras/datadesc_usage.c gras/datadesc_structs.c
 gras_datadesc_usage_LDADD=     $(LDADD_RL)
 
 gras/datadesc_structs.c: gras/mk_datadesc_structs.pl
index 2deaf29..eb7c46a 100755 (executable)
@@ -43,6 +43,7 @@ surf_TESTS="surf/maxmin_usage@EXEEXT@;                                        \
 msg_TESTS=" msg/msg_test@EXEEXT@ --surf-path=@top_srcdir@/examples/msg/;"
 
 gras_TESTS="gras/trp_tcp_usage;      gras/trp_file_usage;                        \
+           gras/timer_usage@EXEEXT@;                                            \
            gras/datadesc_usage@EXEEXT@;                                         \
             gras/datadesc_usage@EXEEXT@ --read @srcdir@/gras/datadesc.little32;  \
            gras/datadesc_usage@EXEEXT@ --read @srcdir@/gras/datadesc.little32_4;\