- Cleanup the known architecture table. Reorder the entries to group what
should be, and use a more consistant naming scheme.
(some of the test dataset are still to be regenerated)
- - Allow library to register globals on each process just as userdata does.
+ - New! Allow library to register globals on each process just as userdata
+ does.
This is implemented using a xbt_dict and not a xbt_set, so we loose the
lookup time (for now).
Use it in msg and trp.
This cleans a lot the internals and helps enforcing privacy of the
headers between the gras components.
+ - New! Add a timer mecanism, not unlike cron(8) and at(1).
- Bugfix: gras_os_time was delirious in RL.
- - Reenable GRAS
+ - Bugfix: gras_trp_select/RL don't run into the wall when asked to select
+ onto 0 sockets.
+ - Reenable GRAS now that it works.
--
gras/datadesc.h gras/transport.h \
gras/virtu.h gras/cond.h gras/process.h \
\
- gras/messages.h \
+ gras/messages.h gras/timer.h\
\
amok/base.h \
amok/bandwidth.h
#include <gras/transport.h>
#include <gras/datadesc.h>
#include <gras/messages.h>
+#include <gras/timer.h>
#endif /* GRAS_H */
--- /dev/null
+/* $Id$ */
+
+/* timer - delayed and repetitive tasks */
+/* module's public interface exported to end user. */
+
+/* Copyright (c) 2005 Martin Quinson. All rights reserved. */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
+#ifndef GRAS_TIMER_H
+#define GRAS_TIMER_H
+
+#include "xbt/misc.h"
+
+BEGIN_DECL()
+
+/** @addtogroup GRAS_timer
+ * @brief Delayed and repetitive tasks (Communication facility)
+ *
+ * This is how to have a specific function called only once after the
+ * specified amount of time or a function executed every 5 mn until it gets
+ * removed. In the UNIX world, this is comparable to <tt>at</tt> and
+ * <tt>cron</tt>.
+ *
+ * Note that this is very soft timers: the execution of the processes won't
+ * get interrupted at all. This is on purpose: the GRAS programming model
+ * is distributed sequential, so that users don't have to deal with mutexes
+ * and such within a specific process.
+ *
+ * Timers are served by the gras_handle() function: if there is an elapsed
+ * timer, the associated code gets executed before any incomming connexion
+ * are checked.
+ *
+ * @{
+ */
+
+ typedef void (*void_f_void_t)(void);
+
+ void gras_timer_delay(double delay, void_f_void_t action);
+ void gras_timer_repeat(double interval, void_f_void_t action);
+
+ xbt_error_t gras_timer_cancel_delay(double interval, void_f_void_t action);
+ xbt_error_t gras_timer_cancel_repeat(double interval, void_f_void_t action);
+
+ xbt_error_t gras_timer_cancel_delay_all(void);
+ xbt_error_t gras_timer_cancel_repeat_all(void);
+
+ void gras_timer_cancel_all(void);
+
+/** @} */
+
+END_DECL()
+
+#endif /* GRAS_TIMER_H */
gras/DataDesc/datadesc_interface.h gras/DataDesc/datadesc_private.h \
gras/DataDesc/ddt_parse.c gras/DataDesc/ddt_parse.yy.c gras/DataDesc/ddt_parse.yy.h \
\
- gras/Msg/msg.c \
+ gras/Msg/msg.c gras/Msg/timer.c \
gras/Msg/msg_interface.h gras/Msg/msg_private.h \
\
gras/Virtu/process.c
-#gras/Msg/timer.c
-
RL_SRC= \
gras/Transport/rl_transport.c gras/Transport/transport_plugin_tcp.c gras/Transport/transport_plugin_file.c \
\
#include "gras/Transport/transport_interface.h" /* gras_trp_chunk_send/recv */
#include "gras/Virtu/virtu_interface.h"
+#define MIN(a,b) ((a) < (b) ? (a) : (b))
+
XBT_LOG_NEW_DEFAULT_SUBCATEGORY(gras_msg,gras,"High level messaging");
xbt_set_t _gras_msgtype_set = NULL;
static void *gras_msg_procdata_new() {
gras_msg_procdata_t res = xbt_new(s_gras_msg_procdata_t,1);
- res->msg_queue = xbt_dynar_new(sizeof(gras_msg_t), NULL);
+ res->msg_queue = xbt_dynar_new(sizeof(s_gras_msg_t), NULL);
res->cbl_list = xbt_dynar_new(sizeof(gras_cblist_t *),gras_cbl_free);
+ res->timers = xbt_dynar_new(sizeof(s_gras_timer_t), NULL);
return (void*)res;
}
xbt_dynar_free(&( res->msg_queue ));
xbt_dynar_free(&( res->cbl_list ));
+ xbt_dynar_free(&( res->timers ));
}
/*
double start, now;
gras_msg_procdata_t pd=(gras_msg_procdata_t)gras_libdata_get("gras_msg");
int cpt;
- gras_msg_t msg;
+ s_gras_msg_t msg;
*expeditor = NULL;
payload_got = NULL;
xbt_error_t
gras_msg_handle(double timeOut) {
+ double untiltimer;
+
xbt_error_t errcode;
int cpt;
- gras_msg_t msg;
+ s_gras_msg_t msg;
gras_socket_t expeditor;
void *payload=NULL;
int payload_size;
VERB1("Handling message within the next %.2fs",timeOut);
+ untiltimer = gras_msg_timer_handle();
+ DEBUG2("[%.0f] Next timer in %f sec", gras_os_time(), untiltimer);
+ if (untiltimer == 0.0) {
+ /* A timer was already elapsed */
+ return no_error;
+ }
+
/* get a message (from the queue or from the net) */
if (xbt_dynar_length(pd->msg_queue)) {
xbt_dynar_shift(pd->msg_queue,&msg);
expeditor = msg.expeditor;
msgtype = msg.type;
payload = msg.payload;
-
+ errcode = no_error;
} else {
- TRY(gras_trp_select(timeOut, &expeditor));
- TRY(gras_msg_recv(expeditor, &msgtype, &payload, &payload_size));
+ errcode = gras_trp_select(MIN(timeOut,untiltimer), &expeditor);
+ if (errcode != no_error && errcode != timeout_error)
+ return errcode;
+ if (errcode != timeout_error)
+ TRY(gras_msg_recv(expeditor, &msgtype, &payload, &payload_size));
+ }
+
+ if (errcode == timeout_error && untiltimer < timeOut) {
+ /* A timer elapsed before the arrival of any message even if we select()ed a bit */
+ untiltimer = gras_msg_timer_handle();
+ if (untiltimer == 0.0) {
+ return no_error;
+ } else {
+ WARN1("Weird. I computed that a timer should elapse shortly, but none did (I still should wait %f sec)",
+ untiltimer);
+ return timeout_error;
+ }
}
-
- /* handle it */
+
+ /* A message was already there or arrived in the meanwhile. handle it */
xbt_dynar_foreach(pd->cbl_list,cpt,list) {
if (list->id == msgtype->code) {
break;
*/
typedef struct {
/*queue of msgs storing the ones got while msg_wait'ing for something else */
- xbt_dynar_t msg_queue; /* elm type: gras_msg_t */
+ xbt_dynar_t msg_queue; /* elm type: s_gras_msg_t */
/* registered callbacks for each message */
xbt_dynar_t cbl_list; /* elm type: gras_cblist_t */
+
+ /* registered timers */
+ xbt_dynar_t timers; /* elm type: s_gras_timer_t */
} s_gras_msg_procdata_t,*gras_msg_procdata_t;
#include "gras/virtu.h"
#include "gras/messages.h"
+#include "gras/timer.h"
#include "gras_modinter.h"
#include "gras/Msg/msg_interface.h"
gras_msgtype_t type;
void *payload;
int payload_size;
-} gras_msg_t;
+} s_gras_msg_t, *gras_msg_t;
/**
* gras_msgtype_t:
/* ********* *
* * TIMER * *
* ********* */
-typedef void (*void_f_void_t)(void);
-
typedef struct {
double expiry;
double period;
void_f_void_t action;
int repeat;
-} *gras_timer_t;
+} s_gras_timer_t, *gras_timer_t;
-extern xbt_dynar_t _gras_timers;
+/* returns 0 if it handled a timer, or the delay until next timer, or -1 if no armed timer */
+double gras_msg_timer_handle(void);
#endif /* GRAS_MESSAGE_PRIVATE_H */
--- /dev/null
+/* $Id$ */
+
+/* timer - Delayed and repetitive actions */
+
+/* Copyright (c) 2005 Martin Quinson. All rights reserved. */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
+
+#include "gras/Msg/msg_private.h"
+#include "gras/timer.h"
+#include "gras/Virtu/virtu_interface.h"
+
+
+XBT_LOG_NEW_DEFAULT_SUBCATEGORY(gras_timer,gras,
+ "Delayed and repetitive actions");
+
+/** @brief Request \a action to be called once in \a delay seconds */
+void gras_timer_delay(double delay, void_f_void_t action) {
+ gras_msg_procdata_t pd=(gras_msg_procdata_t)gras_libdata_get("gras_msg");
+
+ gras_timer_t timer = xbt_dynar_push_ptr(pd->timers);
+
+ VERB1("Register delayed action %p", action);
+ timer->period = delay;
+ timer->expiry = delay+gras_os_time();
+ timer->action = action;
+ timer->repeat = FALSE;
+}
+
+/** @brief Request \a action to be called every \a interval seconds */
+void gras_timer_repeat(double interval, void_f_void_t action) {
+ gras_msg_procdata_t pd=(gras_msg_procdata_t)gras_libdata_get("gras_msg");
+
+ gras_timer_t timer = xbt_dynar_push_ptr(pd->timers);
+
+ VERB1("Register repetitive action %p", action);
+ timer->period = interval;
+ timer->expiry = interval+gras_os_time();
+ timer->action = action;
+ timer->repeat = TRUE;
+}
+
+/** @brief Cancel a delayed task */
+xbt_error_t gras_timer_cancel_delay(double interval, void_f_void_t action) {
+ gras_msg_procdata_t pd=(gras_msg_procdata_t)gras_libdata_get("gras_msg");
+ int cursor,found;
+ s_gras_timer_t timer;
+
+ found = FALSE;
+ xbt_dynar_foreach(pd->timers,cursor,timer){
+ if (timer.repeat == FALSE &&
+ timer.period == interval &&
+ timer.action == action) {
+
+ found = TRUE;
+ xbt_dynar_cursor_rm(pd->timers, &cursor);
+ }
+ }
+
+ if (!found)
+ RAISE2(mismatch_error,"Cannot remove the action %p delayed of %f second: not found",
+ action,interval);
+
+ return no_error;
+}
+
+/** @brief Cancel a repetitive task */
+xbt_error_t gras_timer_cancel_repeat(double interval, void_f_void_t action) {
+ gras_msg_procdata_t pd=(gras_msg_procdata_t)gras_libdata_get("gras_msg");
+ int cursor,found;
+ s_gras_timer_t timer;
+
+ found = FALSE;
+ xbt_dynar_foreach(pd->timers,cursor,timer){
+ if (timer.repeat == TRUE &&
+ timer.period == interval &&
+ timer.action == action) {
+
+ found = TRUE;
+ xbt_dynar_cursor_rm(pd->timers, &cursor);
+ }
+ }
+
+ if (!found)
+ RAISE2(mismatch_error,"Cannot remove the action %p delayed of %f second: not found",
+ action,interval);
+
+ return no_error;
+}
+
+/** @brief Cancel all delayed tasks */
+xbt_error_t gras_timer_cancel_delay_all(void) {
+ gras_msg_procdata_t pd=(gras_msg_procdata_t)gras_libdata_get("gras_msg");
+ int cursor, found;
+ s_gras_timer_t timer;
+
+ found = FALSE;
+ xbt_dynar_foreach(pd->timers,cursor,timer){
+ if (timer.repeat == FALSE) {
+
+ found = TRUE;
+ xbt_dynar_cursor_rm(pd->timers, &cursor);
+ }
+ }
+
+ if (!found)
+ RAISE0(mismatch_error,"No delayed action to remove");
+
+ return no_error;
+}
+
+/** @brief Cancel all repetitive tasks */
+xbt_error_t gras_timer_cancel_repeat_all(void){
+ gras_msg_procdata_t pd=(gras_msg_procdata_t)gras_libdata_get("gras_msg");
+ int cursor, found;
+ s_gras_timer_t timer;
+
+ found = FALSE;
+ xbt_dynar_foreach(pd->timers,cursor,timer){
+ if (timer.repeat == FALSE) {
+
+ found = TRUE;
+ xbt_dynar_cursor_rm(pd->timers, &cursor);
+ }
+ }
+
+ if (!found)
+ RAISE0(mismatch_error,"No repetitive action to remove");
+
+ return no_error;
+}
+
+/** @brief Cancel all delayed and repetitive tasks */
+void gras_timer_cancel_all(void) {
+ gras_msg_procdata_t pd=(gras_msg_procdata_t)gras_libdata_get("gras_msg");
+ xbt_dynar_reset( pd->timers );
+}
+
+
+/* returns 0 if it handled a timer, or the delay until next timer, or -1 if no armed timer */
+double gras_msg_timer_handle(void) {
+ gras_msg_procdata_t pd=(gras_msg_procdata_t)gras_libdata_get("gras_msg");
+ int cursor;
+ gras_timer_t timer;
+ double now=gras_os_time();
+ double untilnext = -1.0;
+
+ for (cursor=0; cursor < xbt_dynar_length(pd->timers); cursor++) {
+ double untilthis;
+
+ timer = xbt_dynar_get_ptr (pd->timers, cursor);
+ untilthis = timer->expiry - now;
+
+ DEBUG2("Action %p expires in %f", timer->action, untilthis);
+
+ if (untilthis <= 0.0) {
+
+ DEBUG5("[%.0f] Serve %s action %p (%f<%f)",gras_os_time(),
+ timer->repeat ? "repetitive" : "delayed", timer->action,
+ timer->expiry, now);
+ timer->action();
+
+ if (timer->repeat) {
+ timer->expiry = now + timer->period;
+ DEBUG4("[%.0f] Re-arm repetitive action %p for %f (period=%f)",
+ gras_os_time(),
+ timer->action, timer->expiry, timer->period);
+ } else {
+ DEBUG2("[%.0f] Remove %p now that it's done", gras_os_time(), timer->action);
+ xbt_dynar_cursor_rm(pd->timers, &cursor);
+ }
+ return 0.0;
+ } else if (untilthis < untilnext || untilnext == -1) {
+ untilnext = untilthis;
+ }
+ }
+ return untilnext;
+}
RL_tests = \
gras/trp_tcp_client gras/trp_tcp_server \
gras/trp_file_client gras/trp_file_server \
- gras/datadesc_usage
+ gras/datadesc_usage \
+ gras/timer_usage
SG_tests = \
surf/maxmin_usage surf/maxmin_bench \
gras_trp_tcp_server_LDADD= $(LDADD_RL)
gras_trp_file_client_LDADD= $(LDADD_RL)
gras_trp_file_server_LDADD= $(LDADD_RL)
+gras_timer_usage_LDADD= $(LDADD_RL)
-gras_datadesc_usage_SOURCES= gras/datadesc_usage.c gras/datadesc_structs.c
+gras_datadesc_usage_SOURCES= gras/datadesc_usage.c gras/datadesc_structs.c
gras_datadesc_usage_LDADD= $(LDADD_RL)
gras/datadesc_structs.c: gras/mk_datadesc_structs.pl
msg_TESTS=" msg/msg_test@EXEEXT@ --surf-path=@top_srcdir@/examples/msg/;"
gras_TESTS="gras/trp_tcp_usage; gras/trp_file_usage; \
+ gras/timer_usage@EXEEXT@; \
gras/datadesc_usage@EXEEXT@; \
gras/datadesc_usage@EXEEXT@ --read @srcdir@/gras/datadesc.little32; \
gras/datadesc_usage@EXEEXT@ --read @srcdir@/gras/datadesc.little32_4;\