From 0d379536a99d65399ab2a1b212ecd5a2718945dc Mon Sep 17 00:00:00 2001 From: mquinson Date: Sun, 13 Feb 2005 22:16:14 +0000 Subject: [PATCH] add timers to GRAS git-svn-id: svn+ssh://scm.gforge.inria.fr/svn/simgrid/simgrid/trunk@1007 48e7efb5-ca39-0410-a469-dd3cf9ba447f --- ChangeLog | 8 +- include/Makefile.am | 2 +- include/gras.h | 1 + include/gras/timer.h | 55 +++++++++++ src/Makefile.am | 4 +- src/gras/Msg/msg.c | 44 +++++++-- src/gras/Msg/msg_interface.h | 5 +- src/gras/Msg/msg_private.h | 10 +- src/gras/Msg/timer.c | 180 +++++++++++++++++++++++++++++++++++ testsuite/Makefile.am | 6 +- testsuite/run_tests.in | 1 + 11 files changed, 294 insertions(+), 22 deletions(-) create mode 100644 include/gras/timer.h create mode 100644 src/gras/Msg/timer.c diff --git a/ChangeLog b/ChangeLog index f95c784812..77d2623283 100644 --- a/ChangeLog +++ b/ChangeLog @@ -29,14 +29,18 @@ SimGrid (2.91) unstable; urgency=low - Cleanup the known architecture table. Reorder the entries to group what should be, and use a more consistant naming scheme. (some of the test dataset are still to be regenerated) - - Allow library to register globals on each process just as userdata does. + - New! Allow library to register globals on each process just as userdata + does. This is implemented using a xbt_dict and not a xbt_set, so we loose the lookup time (for now). Use it in msg and trp. This cleans a lot the internals and helps enforcing privacy of the headers between the gras components. + - New! Add a timer mecanism, not unlike cron(8) and at(1). - Bugfix: gras_os_time was delirious in RL. - - Reenable GRAS + - Bugfix: gras_trp_select/RL don't run into the wall when asked to select + onto 0 sockets. + - Reenable GRAS now that it works. -- diff --git a/include/Makefile.am b/include/Makefile.am index 449c553f37..7c40200f69 100644 --- a/include/Makefile.am +++ b/include/Makefile.am @@ -18,7 +18,7 @@ nobase_include_HEADERS = \ gras/datadesc.h gras/transport.h \ gras/virtu.h gras/cond.h gras/process.h \ \ - gras/messages.h \ + gras/messages.h gras/timer.h\ \ amok/base.h \ amok/bandwidth.h diff --git a/include/gras.h b/include/gras.h index e605f71af4..90ae48fd5c 100644 --- a/include/gras.h +++ b/include/gras.h @@ -20,5 +20,6 @@ #include #include #include +#include #endif /* GRAS_H */ diff --git a/include/gras/timer.h b/include/gras/timer.h new file mode 100644 index 0000000000..af1aa56037 --- /dev/null +++ b/include/gras/timer.h @@ -0,0 +1,55 @@ +/* $Id$ */ + +/* timer - delayed and repetitive tasks */ +/* module's public interface exported to end user. */ + +/* Copyright (c) 2005 Martin Quinson. All rights reserved. */ + +/* This program is free software; you can redistribute it and/or modify it + * under the terms of the license (GNU LGPL) which comes with this package. */ + +#ifndef GRAS_TIMER_H +#define GRAS_TIMER_H + +#include "xbt/misc.h" + +BEGIN_DECL() + +/** @addtogroup GRAS_timer + * @brief Delayed and repetitive tasks (Communication facility) + * + * This is how to have a specific function called only once after the + * specified amount of time or a function executed every 5 mn until it gets + * removed. In the UNIX world, this is comparable to at and + * cron. + * + * Note that this is very soft timers: the execution of the processes won't + * get interrupted at all. This is on purpose: the GRAS programming model + * is distributed sequential, so that users don't have to deal with mutexes + * and such within a specific process. + * + * Timers are served by the gras_handle() function: if there is an elapsed + * timer, the associated code gets executed before any incomming connexion + * are checked. + * + * @{ + */ + + typedef void (*void_f_void_t)(void); + + void gras_timer_delay(double delay, void_f_void_t action); + void gras_timer_repeat(double interval, void_f_void_t action); + + xbt_error_t gras_timer_cancel_delay(double interval, void_f_void_t action); + xbt_error_t gras_timer_cancel_repeat(double interval, void_f_void_t action); + + xbt_error_t gras_timer_cancel_delay_all(void); + xbt_error_t gras_timer_cancel_repeat_all(void); + + void gras_timer_cancel_all(void); + +/** @} */ + +END_DECL() + +#endif /* GRAS_TIMER_H */ diff --git a/src/Makefile.am b/src/Makefile.am index adcf402393..75ef81f5c5 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -129,13 +129,11 @@ COMMON_SRC=\ gras/DataDesc/datadesc_interface.h gras/DataDesc/datadesc_private.h \ gras/DataDesc/ddt_parse.c gras/DataDesc/ddt_parse.yy.c gras/DataDesc/ddt_parse.yy.h \ \ - gras/Msg/msg.c \ + gras/Msg/msg.c gras/Msg/timer.c \ gras/Msg/msg_interface.h gras/Msg/msg_private.h \ \ gras/Virtu/process.c -#gras/Msg/timer.c - RL_SRC= \ gras/Transport/rl_transport.c gras/Transport/transport_plugin_tcp.c gras/Transport/transport_plugin_file.c \ \ diff --git a/src/gras/Msg/msg.c b/src/gras/Msg/msg.c index 50c529f1db..cb358b3415 100644 --- a/src/gras/Msg/msg.c +++ b/src/gras/Msg/msg.c @@ -13,6 +13,8 @@ #include "gras/Transport/transport_interface.h" /* gras_trp_chunk_send/recv */ #include "gras/Virtu/virtu_interface.h" +#define MIN(a,b) ((a) < (b) ? (a) : (b)) + XBT_LOG_NEW_DEFAULT_SUBCATEGORY(gras_msg,gras,"High level messaging"); xbt_set_t _gras_msgtype_set = NULL; @@ -25,8 +27,9 @@ static char *make_namev(const char *name, short int ver); static void *gras_msg_procdata_new() { gras_msg_procdata_t res = xbt_new(s_gras_msg_procdata_t,1); - res->msg_queue = xbt_dynar_new(sizeof(gras_msg_t), NULL); + res->msg_queue = xbt_dynar_new(sizeof(s_gras_msg_t), NULL); res->cbl_list = xbt_dynar_new(sizeof(gras_cblist_t *),gras_cbl_free); + res->timers = xbt_dynar_new(sizeof(s_gras_timer_t), NULL); return (void*)res; } @@ -39,6 +42,7 @@ static void gras_msg_procdata_free(void *data) { xbt_dynar_free(&( res->msg_queue )); xbt_dynar_free(&( res->cbl_list )); + xbt_dynar_free(&( res->timers )); } /* @@ -295,7 +299,7 @@ gras_msg_wait(double timeout, double start, now; gras_msg_procdata_t pd=(gras_msg_procdata_t)gras_libdata_get("gras_msg"); int cpt; - gras_msg_t msg; + s_gras_msg_t msg; *expeditor = NULL; payload_got = NULL; @@ -355,10 +359,12 @@ gras_msg_wait(double timeout, xbt_error_t gras_msg_handle(double timeOut) { + double untiltimer; + xbt_error_t errcode; int cpt; - gras_msg_t msg; + s_gras_msg_t msg; gras_socket_t expeditor; void *payload=NULL; int payload_size; @@ -370,19 +376,41 @@ gras_msg_handle(double timeOut) { VERB1("Handling message within the next %.2fs",timeOut); + untiltimer = gras_msg_timer_handle(); + DEBUG2("[%.0f] Next timer in %f sec", gras_os_time(), untiltimer); + if (untiltimer == 0.0) { + /* A timer was already elapsed */ + return no_error; + } + /* get a message (from the queue or from the net) */ if (xbt_dynar_length(pd->msg_queue)) { xbt_dynar_shift(pd->msg_queue,&msg); expeditor = msg.expeditor; msgtype = msg.type; payload = msg.payload; - + errcode = no_error; } else { - TRY(gras_trp_select(timeOut, &expeditor)); - TRY(gras_msg_recv(expeditor, &msgtype, &payload, &payload_size)); + errcode = gras_trp_select(MIN(timeOut,untiltimer), &expeditor); + if (errcode != no_error && errcode != timeout_error) + return errcode; + if (errcode != timeout_error) + TRY(gras_msg_recv(expeditor, &msgtype, &payload, &payload_size)); + } + + if (errcode == timeout_error && untiltimer < timeOut) { + /* A timer elapsed before the arrival of any message even if we select()ed a bit */ + untiltimer = gras_msg_timer_handle(); + if (untiltimer == 0.0) { + return no_error; + } else { + WARN1("Weird. I computed that a timer should elapse shortly, but none did (I still should wait %f sec)", + untiltimer); + return timeout_error; + } } - - /* handle it */ + + /* A message was already there or arrived in the meanwhile. handle it */ xbt_dynar_foreach(pd->cbl_list,cpt,list) { if (list->id == msgtype->code) { break; diff --git a/src/gras/Msg/msg_interface.h b/src/gras/Msg/msg_interface.h index 28816ec3f0..b16ed5a3d6 100644 --- a/src/gras/Msg/msg_interface.h +++ b/src/gras/Msg/msg_interface.h @@ -21,10 +21,13 @@ */ typedef struct { /*queue of msgs storing the ones got while msg_wait'ing for something else */ - xbt_dynar_t msg_queue; /* elm type: gras_msg_t */ + xbt_dynar_t msg_queue; /* elm type: s_gras_msg_t */ /* registered callbacks for each message */ xbt_dynar_t cbl_list; /* elm type: gras_cblist_t */ + + /* registered timers */ + xbt_dynar_t timers; /* elm type: s_gras_timer_t */ } s_gras_msg_procdata_t,*gras_msg_procdata_t; diff --git a/src/gras/Msg/msg_private.h b/src/gras/Msg/msg_private.h index b8124694a5..1739d8b4fc 100644 --- a/src/gras/Msg/msg_private.h +++ b/src/gras/Msg/msg_private.h @@ -24,6 +24,7 @@ #include "gras/virtu.h" #include "gras/messages.h" +#include "gras/timer.h" #include "gras_modinter.h" #include "gras/Msg/msg_interface.h" @@ -35,7 +36,7 @@ typedef struct { gras_msgtype_t type; void *payload; int payload_size; -} gras_msg_t; +} s_gras_msg_t, *gras_msg_t; /** * gras_msgtype_t: @@ -80,16 +81,15 @@ void gras_cblist_free(void *cbl); /* ********* * * * TIMER * * * ********* */ -typedef void (*void_f_void_t)(void); - typedef struct { double expiry; double period; void_f_void_t action; int repeat; -} *gras_timer_t; +} s_gras_timer_t, *gras_timer_t; -extern xbt_dynar_t _gras_timers; +/* returns 0 if it handled a timer, or the delay until next timer, or -1 if no armed timer */ +double gras_msg_timer_handle(void); #endif /* GRAS_MESSAGE_PRIVATE_H */ diff --git a/src/gras/Msg/timer.c b/src/gras/Msg/timer.c new file mode 100644 index 0000000000..8af69cced2 --- /dev/null +++ b/src/gras/Msg/timer.c @@ -0,0 +1,180 @@ +/* $Id$ */ + +/* timer - Delayed and repetitive actions */ + +/* Copyright (c) 2005 Martin Quinson. All rights reserved. */ + +/* This program is free software; you can redistribute it and/or modify it + * under the terms of the license (GNU LGPL) which comes with this package. */ + + +#include "gras/Msg/msg_private.h" +#include "gras/timer.h" +#include "gras/Virtu/virtu_interface.h" + + +XBT_LOG_NEW_DEFAULT_SUBCATEGORY(gras_timer,gras, + "Delayed and repetitive actions"); + +/** @brief Request \a action to be called once in \a delay seconds */ +void gras_timer_delay(double delay, void_f_void_t action) { + gras_msg_procdata_t pd=(gras_msg_procdata_t)gras_libdata_get("gras_msg"); + + gras_timer_t timer = xbt_dynar_push_ptr(pd->timers); + + VERB1("Register delayed action %p", action); + timer->period = delay; + timer->expiry = delay+gras_os_time(); + timer->action = action; + timer->repeat = FALSE; +} + +/** @brief Request \a action to be called every \a interval seconds */ +void gras_timer_repeat(double interval, void_f_void_t action) { + gras_msg_procdata_t pd=(gras_msg_procdata_t)gras_libdata_get("gras_msg"); + + gras_timer_t timer = xbt_dynar_push_ptr(pd->timers); + + VERB1("Register repetitive action %p", action); + timer->period = interval; + timer->expiry = interval+gras_os_time(); + timer->action = action; + timer->repeat = TRUE; +} + +/** @brief Cancel a delayed task */ +xbt_error_t gras_timer_cancel_delay(double interval, void_f_void_t action) { + gras_msg_procdata_t pd=(gras_msg_procdata_t)gras_libdata_get("gras_msg"); + int cursor,found; + s_gras_timer_t timer; + + found = FALSE; + xbt_dynar_foreach(pd->timers,cursor,timer){ + if (timer.repeat == FALSE && + timer.period == interval && + timer.action == action) { + + found = TRUE; + xbt_dynar_cursor_rm(pd->timers, &cursor); + } + } + + if (!found) + RAISE2(mismatch_error,"Cannot remove the action %p delayed of %f second: not found", + action,interval); + + return no_error; +} + +/** @brief Cancel a repetitive task */ +xbt_error_t gras_timer_cancel_repeat(double interval, void_f_void_t action) { + gras_msg_procdata_t pd=(gras_msg_procdata_t)gras_libdata_get("gras_msg"); + int cursor,found; + s_gras_timer_t timer; + + found = FALSE; + xbt_dynar_foreach(pd->timers,cursor,timer){ + if (timer.repeat == TRUE && + timer.period == interval && + timer.action == action) { + + found = TRUE; + xbt_dynar_cursor_rm(pd->timers, &cursor); + } + } + + if (!found) + RAISE2(mismatch_error,"Cannot remove the action %p delayed of %f second: not found", + action,interval); + + return no_error; +} + +/** @brief Cancel all delayed tasks */ +xbt_error_t gras_timer_cancel_delay_all(void) { + gras_msg_procdata_t pd=(gras_msg_procdata_t)gras_libdata_get("gras_msg"); + int cursor, found; + s_gras_timer_t timer; + + found = FALSE; + xbt_dynar_foreach(pd->timers,cursor,timer){ + if (timer.repeat == FALSE) { + + found = TRUE; + xbt_dynar_cursor_rm(pd->timers, &cursor); + } + } + + if (!found) + RAISE0(mismatch_error,"No delayed action to remove"); + + return no_error; +} + +/** @brief Cancel all repetitive tasks */ +xbt_error_t gras_timer_cancel_repeat_all(void){ + gras_msg_procdata_t pd=(gras_msg_procdata_t)gras_libdata_get("gras_msg"); + int cursor, found; + s_gras_timer_t timer; + + found = FALSE; + xbt_dynar_foreach(pd->timers,cursor,timer){ + if (timer.repeat == FALSE) { + + found = TRUE; + xbt_dynar_cursor_rm(pd->timers, &cursor); + } + } + + if (!found) + RAISE0(mismatch_error,"No repetitive action to remove"); + + return no_error; +} + +/** @brief Cancel all delayed and repetitive tasks */ +void gras_timer_cancel_all(void) { + gras_msg_procdata_t pd=(gras_msg_procdata_t)gras_libdata_get("gras_msg"); + xbt_dynar_reset( pd->timers ); +} + + +/* returns 0 if it handled a timer, or the delay until next timer, or -1 if no armed timer */ +double gras_msg_timer_handle(void) { + gras_msg_procdata_t pd=(gras_msg_procdata_t)gras_libdata_get("gras_msg"); + int cursor; + gras_timer_t timer; + double now=gras_os_time(); + double untilnext = -1.0; + + for (cursor=0; cursor < xbt_dynar_length(pd->timers); cursor++) { + double untilthis; + + timer = xbt_dynar_get_ptr (pd->timers, cursor); + untilthis = timer->expiry - now; + + DEBUG2("Action %p expires in %f", timer->action, untilthis); + + if (untilthis <= 0.0) { + + DEBUG5("[%.0f] Serve %s action %p (%f<%f)",gras_os_time(), + timer->repeat ? "repetitive" : "delayed", timer->action, + timer->expiry, now); + timer->action(); + + if (timer->repeat) { + timer->expiry = now + timer->period; + DEBUG4("[%.0f] Re-arm repetitive action %p for %f (period=%f)", + gras_os_time(), + timer->action, timer->expiry, timer->period); + } else { + DEBUG2("[%.0f] Remove %p now that it's done", gras_os_time(), timer->action); + xbt_dynar_cursor_rm(pd->timers, &cursor); + } + return 0.0; + } else if (untilthis < untilnext || untilnext == -1) { + untilnext = untilthis; + } + } + return untilnext; +} diff --git a/testsuite/Makefile.am b/testsuite/Makefile.am index d66715563e..e836cda0af 100644 --- a/testsuite/Makefile.am +++ b/testsuite/Makefile.am @@ -33,7 +33,8 @@ xbt_tests = \ RL_tests = \ gras/trp_tcp_client gras/trp_tcp_server \ gras/trp_file_client gras/trp_file_server \ - gras/datadesc_usage + gras/datadesc_usage \ + gras/timer_usage SG_tests = \ surf/maxmin_usage surf/maxmin_bench \ @@ -83,8 +84,9 @@ gras_trp_tcp_client_LDADD= $(LDADD_RL) gras_trp_tcp_server_LDADD= $(LDADD_RL) gras_trp_file_client_LDADD= $(LDADD_RL) gras_trp_file_server_LDADD= $(LDADD_RL) +gras_timer_usage_LDADD= $(LDADD_RL) -gras_datadesc_usage_SOURCES= gras/datadesc_usage.c gras/datadesc_structs.c +gras_datadesc_usage_SOURCES= gras/datadesc_usage.c gras/datadesc_structs.c gras_datadesc_usage_LDADD= $(LDADD_RL) gras/datadesc_structs.c: gras/mk_datadesc_structs.pl diff --git a/testsuite/run_tests.in b/testsuite/run_tests.in index 2deaf29f4b..eb7c46a013 100755 --- a/testsuite/run_tests.in +++ b/testsuite/run_tests.in @@ -43,6 +43,7 @@ surf_TESTS="surf/maxmin_usage@EXEEXT@; \ msg_TESTS=" msg/msg_test@EXEEXT@ --surf-path=@top_srcdir@/examples/msg/;" gras_TESTS="gras/trp_tcp_usage; gras/trp_file_usage; \ + gras/timer_usage@EXEEXT@; \ gras/datadesc_usage@EXEEXT@; \ gras/datadesc_usage@EXEEXT@ --read @srcdir@/gras/datadesc.little32; \ gras/datadesc_usage@EXEEXT@ --read @srcdir@/gras/datadesc.little32_4;\ -- 2.20.1