src/xbt/graphxml_parse.c
src/xbt/setset.c
src/xbt/parmap.c
- src/xbt/xbt_replay_trace_reader.c
+ src/xbt/xbt_replay.c
src/xbt/lib.c
src/xbt/automaton.c
src/xbt/datadesc/ddt_create.c
include/xbt/queue.h
include/xbt/setset.h
include/xbt/mmalloc.h
- include/xbt/replay_trace_reader.h
+ include/xbt/replay.h
include/xbt/parmap.h
include/xbt/automaton.h
include/xbt/automatonparse_promela.h
* C file, and the events to react to in a separate text file.
* Declare a function handling each of the events that you want to
* accept in your trace files, register them using \ref
- * MSG_action_register in your main, and then use \ref
+ * xbt_replay_action_register in your main, and then use \ref
* MSG_action_trace_run to launch the simulation. You can either
* have one trace file containing all your events, or a file per
* simulated process. Check the tesh files in the example directory
MSG_launch_application(argv[2]);
/* Action registration */
- MSG_action_register("init", action_init);
- MSG_action_register("finalize", action_finalize);
- MSG_action_register("comm_size",action_comm_size);
- MSG_action_register("send", action_send);
- MSG_action_register("Isend", action_Isend);
- MSG_action_register("recv", action_recv);
- MSG_action_register("Irecv", action_Irecv);
- MSG_action_register("wait", action_wait);
- MSG_action_register("barrier", action_barrier);
- MSG_action_register("bcast", action_bcast);
- MSG_action_register("reduce", action_reduce);
- MSG_action_register("allReduce",action_allReduce);
- MSG_action_register("sleep", action_sleep);
- MSG_action_register("compute", action_compute);
+ xbt_replay_action_register("init", action_init);
+ xbt_replay_action_register("finalize", action_finalize);
+ xbt_replay_action_register("comm_size",action_comm_size);
+ xbt_replay_action_register("send", action_send);
+ xbt_replay_action_register("Isend", action_Isend);
+ xbt_replay_action_register("recv", action_recv);
+ xbt_replay_action_register("Irecv", action_Irecv);
+ xbt_replay_action_register("wait", action_wait);
+ xbt_replay_action_register("barrier", action_barrier);
+ xbt_replay_action_register("bcast", action_bcast);
+ xbt_replay_action_register("reduce", action_reduce);
+ xbt_replay_action_register("allReduce",action_allReduce);
+ xbt_replay_action_register("sleep", action_sleep);
+ xbt_replay_action_register("compute", action_compute);
/* Actually do the simulation using MSG_action_trace_run */
double timeout);
/************************** Action handling **********************************/
-typedef void (*msg_action_fun) (const char *const *args);
-XBT_PUBLIC(void) MSG_action_register(const char *action_name,
- msg_action_fun function);
-XBT_PUBLIC(void) MSG_action_unregister(const char *action_name);
MSG_error_t MSG_action_trace_run(char *path);
#ifdef MSG_USE_DEPRECATED
typedef struct tmgr_trace *tmgr_trace_t; /**< Opaque structure defining an availability trace */
-XBT_PUBLIC(tmgr_trace_t) tmgr_trace_new(const char *filename);
+XBT_PUBLIC(tmgr_trace_t) tmgr_trace_new_from_file(const char *filename);
XBT_PUBLIC(tmgr_trace_t) tmgr_trace_new_from_string(const char *id,
const char *input,
double periodicity);
+XBT_PUBLIC(tmgr_trace_t) tmgr_trace_new_uniform(double alpha, double beta);
+XBT_PUBLIC(tmgr_trace_t) tmgr_trace_new_exponential(double lambda);
+XBT_PUBLIC(tmgr_trace_t) tmgr_trace_new_weibull(double lambda, double k);
/** Defines whether a given resource is working or not */
typedef enum {
--- /dev/null
+/* xbt/replay.h -- Tools to parse a replay file and dispatch its actions */
+
+/* Copyright (c) 2010. The SimGrid Team.
+ * All rights reserved. */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
+#ifndef XBT_REPLAY_H
+#define XBT_REPLAY_H
+
+#include <stdio.h>     /* FILE */
+#include "xbt/misc.h"  /* SG_BEGIN_DECL */
+#include "xbt/dict.h"  /* xbt_dict_t */
+
+SG_BEGIN_DECL()
+
+/* Opaque handle on a per-process trace file being read */
+typedef struct s_replay_reader *xbt_replay_reader_t;
+/* Handler of one kind of action; args is the trace line split into words */
+typedef void (*action_fun) (const char *const *args);
+
+/* FIXME(review): being `static` in a header, these two dictionaries are
+ * duplicated in every translation unit including this file. Only the copies
+ * living in xbt_replay.c are filled by _xbt_replay_action_init(); any other
+ * includer touching these names works on its own, separate objects.
+ * They should become `extern` here with one definition in xbt_replay.c. */
+static xbt_dict_t action_funs;
+static xbt_dict_t action_queues;
+
+/* To split the file if a unique one is given (specific variable for the other case live in runner()) */
+/* FIXME(review): this is a (tentative) definition, not an `extern`
+ * declaration; sharing it across translation units relies on common-symbol
+ * merging by the toolchain. */
+FILE *action_fp;
+
+
+/* Line-by-line reader used for per-process trace files */
+xbt_replay_reader_t xbt_replay_reader_new(const char*filename);
+const char **xbt_replay_reader_get(xbt_replay_reader_t reader);
+void xbt_replay_reader_free(xbt_replay_reader_t *reader);
+const char *xbt_replay_reader_position(xbt_replay_reader_t reader);
+
+/* Setup/teardown of the action tables; called by the layer hosting the
+ * replay (see _MSG_action_init() and _MSG_action_exit()) */
+void _xbt_replay_action_init(void);
+void _xbt_replay_action_exit(void);
+
+/* Process body replaying the actions addressed to argv[0] */
+int xbt_replay_action_runner(int argc, char *argv[]);
+
+XBT_PUBLIC(void) xbt_replay_action_register(const char *action_name,
+                                            action_fun function);
+XBT_PUBLIC(void) xbt_replay_action_unregister(const char *action_name);
+
+SG_END_DECL()
+
+#endif /* XBT_REPLAY_H */
+++ /dev/null
-/* xbt/replay_trace_reader.h -- Tools to parse a replay file */
-
-/* Copyright (c) 2010. The SimGrid Team.
- * All rights reserved. */
-
-/* This program is free software; you can redistribute it and/or modify it
- * under the terms of the license (GNU LGPL) which comes with this package. */
-
-#ifndef XBT_REPLAY_TRACE_READER_H
-#define XBT_REPLAY_TRACE_READER_H
-#include "xbt/misc.h" /* SG_BEGIN_DECL */
-
-SG_BEGIN_DECL()
-
-typedef struct s_replay_trace_reader *xbt_replay_trace_reader_t;
-
-xbt_replay_trace_reader_t xbt_replay_trace_reader_new(const char*filename);
-const char **xbt_replay_trace_reader_get(xbt_replay_trace_reader_t reader);
-void xbt_replay_trace_reader_free(xbt_replay_trace_reader_t *reader);
-const char *xbt_replay_trace_reader_position(xbt_replay_trace_reader_t reader);
-
-SG_END_DECL()
-
-#endif /* XBT_REPLAY_TRACE_READER_H */
#include "msg_private.h"
#include "xbt/str.h"
#include "xbt/dynar.h"
-#include "xbt/replay_trace_reader.h"
+#include "xbt/replay.h"
XBT_LOG_NEW_DEFAULT_SUBCATEGORY(msg_action, msg,
"MSG actions for trace driven simulation");
-static xbt_dict_t action_funs;
-static xbt_dict_t action_queues;
-
-/* To split the file if a unique one is given (specific variable for the other case live in runner()) */
-static FILE *action_fp = NULL;
-static char *action_line = NULL;
-static size_t action_len = 0;
-
-static const char **action_get_action(char *name);
-
-/** \ingroup msg_trace_driven
- * \brief Registers a function to handle a kind of action
- *
- * Registers a function to handle a kind of action
- * This table is then used by #MSG_action_trace_run
- *
- * The argument of the function is the line describing the action, splitted on spaces with xbt_str_split_quoted()
- *
- * \param action_name the reference name of the action.
- * \param function prototype given by the type: void...(xbt_dynar_t action)
- */
-void MSG_action_register(const char *action_name, msg_action_fun function)
-{
- xbt_dict_set(action_funs, action_name, function, NULL);
-}
-
-/** \ingroup msg_trace_driven
- * \brief Unregisters a function, which handled a kind of action
- *
- * \param action_name the reference name of the action.
- */
-void MSG_action_unregister(const char *action_name)
-{
- xbt_dict_remove(action_funs, action_name);
-}
-
-static int MSG_action_runner(int argc, char *argv[])
-{
- const char **evt;
- if (action_fp) { // A unique trace file
-
- while ((evt = action_get_action(argv[0]))) {
- msg_action_fun function =
- (msg_action_fun)xbt_dict_get(action_funs, evt[1]);
- function(evt);
- free(evt);
- }
- } else { // Should have got my trace file in argument
- xbt_assert(argc >= 2,
- "No '%s' agent function provided, no simulation-wide trace file provided to MSG_action_trace_run(), "
- "and no process-wide trace file provided in deployment file. Aborting.",
- argv[0]
- );
- xbt_replay_trace_reader_t reader = xbt_replay_trace_reader_new(argv[1]);
- while ((evt=xbt_replay_trace_reader_get(reader))) {
- if (!strcmp(argv[0],evt[0])) {
- msg_action_fun function =
- (msg_action_fun)xbt_dict_get(action_funs, evt[1]);
- function(evt);
- free(evt);
- } else {
- XBT_WARN("%s: Ignore trace element not for me",
- xbt_replay_trace_reader_position(reader));
- }
- }
- xbt_replay_trace_reader_free(&reader);
- }
- return 0;
-}
+/* Sets up the replay tables through _xbt_replay_action_init() and hands
+ * xbt_replay_action_runner to MSG_function_register_default(). */
void _MSG_action_init()
{
- action_funs = xbt_dict_new_homogeneous(NULL);
- action_queues = xbt_dict_new_homogeneous(NULL);
- MSG_function_register_default(MSG_action_runner);
+ _xbt_replay_action_init();
+ MSG_function_register_default(xbt_replay_action_runner);
}
+/* Releases the replay tables; the actual work now lives in
+ * _xbt_replay_action_exit(). */
void _MSG_action_exit()
{
- xbt_dict_free(&action_queues);
- xbt_dict_free(&action_funs);
+ _xbt_replay_action_exit();
}
-static const char **action_get_action(char *name)
-{
- xbt_dynar_t evt = NULL;
- char *evtname = NULL;
-
- xbt_dynar_t myqueue = xbt_dict_get_or_null(action_queues, name);
- if (myqueue == NULL || xbt_dynar_is_empty(myqueue)) { // nothing stored for me. Read the file further
-
- if (action_fp == NULL) { // File closed now. There's nothing more to read. I'm out of here
- goto todo_done;
- }
- // Read lines until I reach something for me (which breaks in loop body)
- // or end of file reached
- while (getline(&action_line, &action_len, action_fp) != -1) {
- // cleanup and split the string I just read
- char *comment = strchr(action_line, '#');
- if (comment != NULL)
- *comment = '\0';
- xbt_str_trim(action_line, NULL);
- if (action_line[0] == '\0')
- continue;
- /* we cannot split in place here because we parse&store several lines for
- * the colleagues... */
- evt = xbt_str_split_quoted(action_line);
-
- // if it's for me, I'm done
- evtname = xbt_dynar_get_as(evt, 0, char *);
- if (!strcmp(name, evtname)) {
- return xbt_dynar_to_array(evt);
- } else {
- // Else, I have to store it for the relevant colleague
- xbt_dynar_t otherqueue =
- xbt_dict_get_or_null(action_queues, evtname);
- if (otherqueue == NULL) { // Damn. Create the queue of that guy
- otherqueue =
- xbt_dynar_new(sizeof(xbt_dynar_t), xbt_dynar_free_voidp);
- xbt_dict_set(action_queues, evtname, otherqueue, NULL);
- }
- xbt_dynar_push(otherqueue, &evt);
- }
- }
- goto todo_done; // end of file reached while searching in vain for more work
- } else {
- // Get something from my queue and return it
- xbt_dynar_shift(myqueue, &evt);
- return xbt_dynar_to_array(evt);
- }
-
-
- // I did all my actions for me in the file (either I closed the file, or a colleague did)
- // Let's cleanup before leaving
-todo_done:
- if (myqueue != NULL) {
- xbt_dynar_free(&myqueue);
- xbt_dict_remove(action_queues, name);
- }
- return NULL;
-}
/** \ingroup msg_trace_driven
* \brief A trace loader
xbt_dynar_t todo;
xbt_dict_cursor_t cursor;
+ action_fp=NULL;
if (path) {
action_fp = fopen(path, "r");
xbt_assert(action_fp != NULL, "Cannot open %s: %s", path,
}
}
- free(action_line);
if (path)
fclose(action_fp);
xbt_dict_free(&action_queues);
#include "xbt/log.h"
#include "xbt/virtu.h"
#include "xbt/ex.h" /* ex_backtrace_display */
+#include "xbt/replay.h"
XBT_LOG_NEW_CATEGORY(msg, "All MSG categories");
XBT_LOG_NEW_DEFAULT_SUBCATEGORY(msg_kernel, msg,
trace = xbt_new0(s_surf_cpu_ti_trace_t, 1);
trace->time_points =
xbt_malloc0(sizeof(double) *
- (xbt_dynar_length(power_trace->event_list) + 1));
+ (xbt_dynar_length(power_trace->s_list.event_list) + 1));
trace->integral =
xbt_malloc0(sizeof(double) *
- (xbt_dynar_length(power_trace->event_list) + 1));
- trace->nb_points = xbt_dynar_length(power_trace->event_list);
- xbt_dynar_foreach(power_trace->event_list, cpt, val) {
+ (xbt_dynar_length(power_trace->s_list.event_list) + 1));
+ trace->nb_points = xbt_dynar_length(power_trace->s_list.event_list);
+ xbt_dynar_foreach(power_trace->s_list.event_list, cpt, val) {
trace->time_points[i] = time;
trace->integral[i] = integral;
integral += val.delta * val.value;
}
/* only one point available, fixed trace */
- if (xbt_dynar_length(power_trace->event_list) == 1) {
- xbt_dynar_get_cpy(power_trace->event_list, 0, &val);
+ if (xbt_dynar_length(power_trace->s_list.event_list) == 1) {
+ xbt_dynar_get_cpy(power_trace->s_list.event_list, 0, &val);
trace->type = TRACE_FIXED;
trace->value = val.value;
return trace;
trace->power_trace = power_trace;
/* count the total time of trace file */
- xbt_dynar_foreach(power_trace->event_list, cpt, val) {
+ xbt_dynar_foreach(power_trace->s_list.event_list, cpt, val) {
total_time += val.delta;
}
trace->trace = surf_cpu_ti_trace_new(power_trace);
if (state_trace)
cpu->state_event =
tmgr_history_add_trace(history, state_trace, 0.0, 0, cpu);
- if (power_trace && xbt_dynar_length(power_trace->event_list) > 1) {
+ if (power_trace && xbt_dynar_length(power_trace->s_list.event_list) > 1) {
/* add a fake trace event if periodicity == 0 */
- xbt_dynar_get_cpy(power_trace->event_list,
- xbt_dynar_length(power_trace->event_list) - 1, &val);
+ xbt_dynar_get_cpy(power_trace->s_list.event_list,
+ xbt_dynar_length(power_trace->s_list.event_list) - 1, &val);
if (val.delta == 0) {
empty_trace = tmgr_empty_trace_new();
cpu->power_event =
cpu->avail_trace = cpu_ti_parse_trace(trace, cpu->power_scale);
/* add a fake trace event if periodicity == 0 */
- if (trace && xbt_dynar_length(trace->event_list) > 1) {
+ if (trace && xbt_dynar_length(trace->s_list.event_list) > 1) {
s_tmgr_event_t val;
- xbt_dynar_get_cpy(trace->event_list,
- xbt_dynar_length(trace->event_list) - 1, &val);
+ xbt_dynar_get_cpy(trace->s_list.event_list,
+ xbt_dynar_length(trace->s_list.event_list) - 1, &val);
if (val.delta == 0) {
tmgr_trace_t empty_trace;
empty_trace = tmgr_empty_trace_new();
xbt_swag_insert(cpu, cpu_ti_modified_cpu);
power_trace = cpu->avail_trace->power_trace;
- xbt_dynar_get_cpy(power_trace->event_list,
- xbt_dynar_length(power_trace->event_list) - 1, &val);
+ xbt_dynar_get_cpy(power_trace->s_list.event_list,
+ xbt_dynar_length(power_trace->s_list.event_list) - 1, &val);
/* free old trace */
surf_cpu_ti_free_tmgr(cpu->avail_trace);
cpu->power_scale = val.value;
point =
surf_cpu_ti_binary_search(trace->trace->time_points, reduced_a, 0,
trace->trace->nb_points - 1);
- xbt_dynar_get_cpy(trace->power_trace->event_list, point, &val);
+ xbt_dynar_get_cpy(trace->power_trace->s_list.event_list, point, &val);
return val.value;
}
xbt_dict_set(patterns, "radical", bprintf("%d", i), NULL);
char *avail_file = xbt_str_varsubst(cluster->availability_trace, patterns);
XBT_DEBUG("\tavailability_file=\"%s\"", avail_file);
- host.power_trace = tmgr_trace_new(avail_file);
+ host.power_trace = tmgr_trace_new_from_file(avail_file);
xbt_free(avail_file);
} else {
XBT_DEBUG("\tavailability_file=\"\"");
if (strcmp(cluster->state_trace, "")) {
char *avail_file = xbt_str_varsubst(cluster->state_trace, patterns);
XBT_DEBUG("\tstate_file=\"%s\"", avail_file);
- host.state_trace = tmgr_trace_new(avail_file);
+ host.state_trace = tmgr_trace_new_from_file(avail_file);
xbt_free(avail_file);
} else {
XBT_DEBUG("\tstate_file=\"\"");
host.power_peak = get_cpu_power(A_surfxml_host_power);
host.power_scale = surf_parse_get_double( A_surfxml_host_availability);
host.core_amount = surf_parse_get_int(A_surfxml_host_core);
- host.power_trace = tmgr_trace_new(A_surfxml_host_availability_file);
- host.state_trace = tmgr_trace_new(A_surfxml_host_state_file);
+ host.power_trace = tmgr_trace_new_from_file(A_surfxml_host_availability_file);
+ host.state_trace = tmgr_trace_new_from_file(A_surfxml_host_state_file);
xbt_assert((A_surfxml_host_state == A_surfxml_host_state_ON) ||
(A_surfxml_host_state == A_surfxml_host_state_OFF), "Invalid state");
if (A_surfxml_host_state == A_surfxml_host_state_ON)
peer.bw_out = surf_parse_get_double(A_surfxml_peer_bw_out);
peer.lat = surf_parse_get_double(A_surfxml_peer_lat);
peer.coord = A_surfxml_peer_coordinates;
- peer.availability_trace = tmgr_trace_new(A_surfxml_peer_availability_file);
- peer.state_trace = tmgr_trace_new(A_surfxml_peer_state_file);
+ peer.availability_trace = tmgr_trace_new_from_file(A_surfxml_peer_availability_file);
+ peer.state_trace = tmgr_trace_new_from_file(A_surfxml_peer_state_file);
surfxml_call_cb_functions(STag_surfxml_peer_cb_list);
sg_platf_new_peer(&peer);
link.id = A_surfxml_link_id;
link.bandwidth = surf_parse_get_double(A_surfxml_link_bandwidth);
- link.bandwidth_trace = tmgr_trace_new(A_surfxml_link_bandwidth_file);
+ link.bandwidth_trace = tmgr_trace_new_from_file(A_surfxml_link_bandwidth_file);
link.latency = surf_parse_get_double(A_surfxml_link_latency);
- link.latency_trace = tmgr_trace_new(A_surfxml_link_latency_file);
+ link.latency_trace = tmgr_trace_new_from_file(A_surfxml_link_latency_file);
switch (A_surfxml_link_state) {
case A_surfxml_link_state_ON:
surf_parse_error("invalid state for link %s", link.id);
break;
}
- link.state_trace = tmgr_trace_new(A_surfxml_link_state_file);
+ link.state_trace = tmgr_trace_new_from_file(A_surfxml_link_state_file);
switch (A_surfxml_link_sharing_policy) {
case A_surfxml_link_sharing_policy_SHARED:
{
tmgr_trace_t trace;
if (!trace_file || strcmp(trace_file, "") != 0) {
- trace = tmgr_trace_new(trace_file);
+ trace = tmgr_trace_new_from_file(trace_file);
} else if (strcmp(surfxml_pcdata, "") == 0) {
trace = NULL;
} else {
free(h);
}
+/* Builds a trace descriptor tagged e_trace_uniform carrying the two
+ * distribution parameters. No event date is generated yet (see FIXME). */
+tmgr_trace_t tmgr_trace_new_uniform(double alpha, double beta)
+{
+  tmgr_trace_t res = xbt_new0(s_tmgr_trace_t, 1);
+
+  res->s_uniform.alpha = alpha;
+  res->s_uniform.beta = beta;
+  res->type = e_trace_uniform;
+
+  //FIXME Generate a new event date
+
+  return res;
+}
+
+
+/* Builds a trace descriptor tagged e_trace_exponential carrying the rate
+ * parameter. No event date is generated yet (see FIXME). */
+tmgr_trace_t tmgr_trace_new_exponential(double lambda)
+{
+  tmgr_trace_t res = xbt_new0(s_tmgr_trace_t, 1);
+
+  res->s_exponential.lambda = lambda;
+  res->type = e_trace_exponential;
+
+  // FIXME Generate a new event date
+
+  return res;
+}
+
+/* Builds a trace descriptor tagged e_trace_weibull carrying both
+ * distribution parameters. No event date is generated yet (see FIXME). */
+tmgr_trace_t tmgr_trace_new_weibull(double lambda, double k)
+{
+  tmgr_trace_t res = xbt_new0(s_tmgr_trace_t, 1);
+
+  res->s_weibull.lambda = lambda;
+  res->s_weibull.k = k;
+  res->type = e_trace_weibull;
+
+  // FIXME Generate a new event date
+
+  return res;
+}
+
tmgr_trace_t tmgr_trace_new_from_string(const char *id, const char *input,
double periodicity)
{
"Invalid periodicity %lg (must be positive)", periodicity);
trace = xbt_new0(s_tmgr_trace_t, 1);
- trace->event_list = xbt_dynar_new(sizeof(s_tmgr_event_t), NULL);
+ trace->type = e_trace_list;
+ trace->s_list.event_list = xbt_dynar_new(sizeof(s_tmgr_event_t), NULL);
list = xbt_str_split(input, "\n\r");
}
last_event->delta = event.delta - last_event->delta;
}
- xbt_dynar_push(trace->event_list, &event);
+ xbt_dynar_push(trace->s_list.event_list, &event);
last_event =
- xbt_dynar_get_ptr(trace->event_list,
- xbt_dynar_length(trace->event_list) - 1);
+ xbt_dynar_get_ptr(trace->s_list.event_list,
+ xbt_dynar_length(trace->s_list.event_list) - 1);
}
if (last_event)
last_event->delta = periodicity;
return trace;
}
-tmgr_trace_t tmgr_trace_new(const char *filename)
+tmgr_trace_t tmgr_trace_new_from_file(const char *filename)
{
char *tstr = NULL;
FILE *f = NULL;
s_tmgr_event_t event;
trace = xbt_new0(s_tmgr_trace_t, 1);
- trace->event_list = xbt_dynar_new(sizeof(s_tmgr_event_t), NULL);
+ trace->s_list.event_list = xbt_dynar_new(sizeof(s_tmgr_event_t), NULL);
event.delta = 0.0;
event.value = 0.0;
- xbt_dynar_push(trace->event_list, &event);
+ xbt_dynar_push(trace->s_list.event_list, &event);
return trace;
}
{
if (!trace)
return;
- xbt_dynar_free(&(trace->event_list));
+ xbt_dynar_free(&(trace->s_list.event_list));
free(trace);
}
trace_event->idx = offset;
trace_event->model = model;
- xbt_assert((trace_event->idx < xbt_dynar_length(trace->event_list)),
+ xbt_assert((trace_event->idx < xbt_dynar_length(trace->s_list.event_list)),
"You're referring to an event that does not exist!");
xbt_heap_push(h->heap, trace_event, start_time);
return NULL;
trace = trace_event->trace;
- event = xbt_dynar_get_ptr(trace->event_list, trace_event->idx);
+ event = xbt_dynar_get_ptr(trace->s_list.event_list, trace_event->idx);
*value = event->value;
*model = trace_event->model;
- if (trace_event->idx < xbt_dynar_length(trace->event_list) - 1) {
+ if (trace_event->idx < xbt_dynar_length(trace->s_list.event_list) - 1) {
xbt_heap_push(h->heap, trace_event, event_date + event->delta);
trace_event->idx++;
} else if (event->delta > 0) { /* Last element, checking for periodicity */
double value;
} s_tmgr_event_t, *tmgr_event_t;
+/* Discriminant telling which member of the union in s_tmgr_trace_t is valid */
+enum e_trace_type {
+ e_trace_list, e_trace_uniform, e_trace_exponential, e_trace_weibull
+};
+
+/* A trace is a tagged union: `type` selects the member in use.
+ *  - e_trace_list: s_list, an explicit dynar of s_tmgr_event_t
+ *    (set by tmgr_trace_new_from_string());
+ *  - e_trace_uniform / e_trace_exponential / e_trace_weibull: the parameters
+ *    given to the matching tmgr_trace_new_*() constructor.
+ * The union is anonymous, so members are reached as trace->s_list, etc.
+ * NOTE(review): code reading s_list.event_list is only meaningful when
+ * type == e_trace_list -- confirm callers check the tag. */
typedef struct tmgr_trace {
- xbt_dynar_t event_list;
+ enum e_trace_type type;
+ union {
+ struct {
+ xbt_dynar_t event_list;
+ } s_list;
+ struct {
+ double alpha;
+ double beta;
+ /* presumably the next generated event; not filled in by the constructor yet */
+ s_tmgr_event_t next_event;
+ /* and probably other things */
+ } s_uniform;
+ struct {
+ double lambda;
+ /* presumably the next generated event; not filled in by the constructor yet */
+ s_tmgr_event_t next_event;
+ /* and probably other things */
+ } s_exponential;
+ struct {
+ double lambda;
+ double k;
+ /* presumably the next generated event; not filled in by the constructor yet */
+ s_tmgr_event_t next_event;
+ /* and probably other things */
+ } s_weibull;
+ };
} s_tmgr_trace_t;
+
typedef struct tmgr_trace_event {
tmgr_trace_t trace;
unsigned int idx;
--- /dev/null
+/* Copyright (c) 2010. The SimGrid Team.
+ * All rights reserved. */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+#include "simgrid_config.h" //For getline, keep that include first
+
+#include "gras_config.h"
+#include <errno.h>
+#include "xbt/sysdep.h"
+#include "xbt/log.h"
+#include "xbt/str.h"
+#include "xbt/replay.h"
+
+XBT_LOG_NEW_DEFAULT_SUBCATEGORY(replay,xbt,"Replay trace reader");
+
+/* State of one trace file read line by line through xbt_replay_reader_get() */
+typedef struct s_replay_reader {
+ /* stream opened on `filename` by xbt_replay_reader_new() */
+ FILE *fp;
+ /* getline() buffer and its size; the words returned by
+  * xbt_replay_reader_get() are split in place inside this buffer */
+ char *line;
+ size_t line_len;
+ /* last "file:line" string built by xbt_replay_reader_position() */
+ char *position; /* stable storage */
+ /* private copy of the file name, and count of getline() calls so far */
+ char *filename; int linenum;
+} s_xbt_replay_reader_t;
+
+static char *action_line = NULL;
+static size_t action_len = 0;
+
+
+static const char **action_get_action(char *name);
+
+/* Creates a reader on the trace file `filename`.
+ * The file is opened for reading; failure to open it trips the assertion.
+ * The file name is duplicated, so the caller keeps ownership of `filename`.
+ * Release the result with xbt_replay_reader_free(). */
+xbt_replay_reader_t xbt_replay_reader_new(const char *filename)
+{
+  xbt_replay_reader_t reader = xbt_new0(s_xbt_replay_reader_t, 1);
+
+  reader->fp = fopen(filename, "r");
+  xbt_assert(reader->fp != NULL, "Cannot open %s: %s", filename,
+             strerror(errno));
+
+  reader->filename = xbt_strdup(filename);
+
+  return reader;
+}
+
+/* Returns "filename:linenum" for the reader's current position.
+ * The string is owned by the reader: it is replaced by the next call and
+ * released by xbt_replay_reader_free(), so the caller must not free it. */
+const char *xbt_replay_reader_position(xbt_replay_reader_t reader)
+{
+  char *fresh = bprintf("%s:%d", reader->filename, reader->linenum);
+
+  free(reader->position);
+  reader->position = fresh;
+
+  return fresh;
+}
+
+/* Returns the next meaningful line of the trace, split into words, or NULL
+ * once the end of the file is reached.
+ *
+ * Everything after a '#' is dropped and the line is trimmed; lines left
+ * empty are skipped. Skipping is done with a loop rather than by recursing,
+ * so a long run of blank/comment lines cannot grow the stack.
+ *
+ * The returned array comes from xbt_dynar_to_array() and is to be free()d by
+ * the caller. Its words are split in place inside the reader's line buffer,
+ * hence only valid until the next call on the same reader.
+ *
+ * linenum counts the lines actually read: it is no longer bumped when
+ * getline() reports the end of the file. */
+const char **xbt_replay_reader_get(xbt_replay_reader_t reader)
+{
+  for (;;) {
+    ssize_t nread = getline(&reader->line, &reader->line_len, reader->fp);
+    if (nread == -1)
+      return NULL;              /* end of file */
+    reader->linenum++;
+
+    char *comment = strchr(reader->line, '#');
+    if (comment != NULL)
+      *comment = '\0';
+    xbt_str_trim(reader->line, NULL);
+    if (reader->line[0] == '\0')
+      continue;                 /* Get next line */
+
+    xbt_dynar_t d = xbt_str_split_quoted_in_place(reader->line);
+    if (xbt_dynar_is_empty(d)) {
+      xbt_dynar_free(&d);
+      continue;                 /* Get next line */
+    }
+    return xbt_dynar_to_array(d);
+  }
+}
+
+/* Closes the trace file and releases everything owned by the reader, then
+ * resets the caller's handle to NULL.
+ * Passing NULL, or a handle that is already NULL, is a no-op. */
+void xbt_replay_reader_free(xbt_replay_reader_t *reader)
+{
+  if (reader == NULL || *reader == NULL)
+    return;
+
+  free((*reader)->filename);
+  free((*reader)->position);
+  fclose((*reader)->fp);
+  free((*reader)->line);
+  free(*reader);
+  *reader = NULL;
+}
+
+/** \ingroup xbt_replay
+ * \brief Registers a function to handle a kind of action
+ *
+ * Stores \a function under \a action_name in the table that
+ * #xbt_replay_action_runner looks up (with the second word of each trace
+ * line as key) to find the handler to call.
+ *
+ * The handler receives the line describing the action, split into words by
+ * xbt_str_split_quoted() or its in-place variant.
+ *
+ * \param action_name the reference name of the action.
+ * \param function handler of type #action_fun: void f(const char *const *args)
+ */
+void xbt_replay_action_register(const char *action_name, action_fun function)
+{
+ xbt_dict_set(action_funs, action_name, function, NULL);
+}
+
+/** \ingroup xbt_replay
+ * \brief Unregisters a function, which handled a kind of action
+ *
+ * Removes the entry that #xbt_replay_action_register stored under
+ * \a action_name.
+ *
+ * \param action_name the reference name of the action.
+ */
+void xbt_replay_action_unregister(const char *action_name)
+{
+ xbt_dict_remove(action_funs, action_name);
+}
+
+/* Creates the two tables of the module: action name -> handler, and
+ * process name -> queue of pending events. Must run before any
+ * registration; undone by _xbt_replay_action_exit(). */
+void _xbt_replay_action_init()
+{
+ action_funs = xbt_dict_new_homogeneous(NULL);
+ action_queues = xbt_dict_new_homogeneous(NULL);
+}
+
+/* Releases the tables created by _xbt_replay_action_init() and the line
+ * buffer used to read the shared trace file. */
+void _xbt_replay_action_exit()
+{
+  xbt_dict_free(&action_queues);
+  xbt_dict_free(&action_funs);
+  free(action_line);
+  /* getline() trusts this pointer/size pair: reset both so that a later
+   * init + replay does not hand it a freed buffer */
+  action_line = NULL;
+  action_len = 0;
+}
+
+/* Process body replaying every action addressed to the process argv[0].
+ *
+ * If a simulation-wide trace file is open (action_fp), events come from
+ * action_get_action(). Otherwise argv[1] must name this process' own trace
+ * file, which is read with an xbt_replay_reader; lines whose first word is
+ * not argv[0] are reported and ignored.
+ *
+ * For each event, the handler registered under its second word (evt[1]) is
+ * looked up in action_funs and called with the whole word array.
+ *
+ * Always returns 0. */
+int xbt_replay_action_runner(int argc, char *argv[])
+{
+  const char **evt;
+  if (action_fp) {              // A unique trace file
+
+    while ((evt = action_get_action(argv[0]))) {
+      action_fun function =
+        (action_fun)xbt_dict_get(action_funs, evt[1]);
+      function(evt);
+      free(evt);
+    }
+  } else {                      // Should have got my trace file in argument
+    xbt_assert(argc >= 2,
+               "No '%s' agent function provided, no simulation-wide trace file provided, "
+               "and no process-wide trace file provided in deployment file. Aborting.",
+               argv[0]
+        );
+    xbt_replay_reader_t reader = xbt_replay_reader_new(argv[1]);
+    while ((evt=xbt_replay_reader_get(reader))) {
+      if (!strcmp(argv[0],evt[0])) {
+        action_fun function =
+          (action_fun)xbt_dict_get(action_funs, evt[1]);
+        function(evt);
+      } else {
+        XBT_WARN("%s: Ignore trace element not for me",
+                 xbt_replay_reader_position(reader));
+      }
+      /* The word array is ours whether or not the line was handled:
+       * release it on both paths (ignored lines used to leak it). */
+      free(evt);
+    }
+    xbt_replay_reader_free(&reader);
+  }
+  return 0;
+}
+
+
+/* Returns the next event of the shared trace file (action_fp) addressed to
+ * the process `name`, as an array built by xbt_dynar_to_array(), or NULL
+ * when nothing is left for that process.
+ *
+ * A pending event in the process' own queue (action_queues) is served
+ * first. Otherwise the file is read further; each line meant for another
+ * process is parked in that process' queue, created on demand.
+ *
+ * On the NULL path, the caller's queue (if any) is freed and removed from
+ * action_queues. */
+static const char **action_get_action(char *name)
+{
+ xbt_dynar_t evt = NULL;
+ char *evtname = NULL;
+
+ xbt_dynar_t myqueue = xbt_dict_get_or_null(action_queues, name);
+ if (myqueue == NULL || xbt_dynar_is_empty(myqueue)) { // nothing stored for me. Read the file further
+
+ if (action_fp == NULL) { // File closed now. There's nothing more to read. I'm out of here
+ goto todo_done;
+ }
+ // Read lines until I reach something for me (which breaks in loop body)
+ // or end of file reached
+ while (getline(&action_line, &action_len, action_fp) != -1) {
+ // cleanup and split the string I just read
+ char *comment = strchr(action_line, '#');
+ if (comment != NULL)
+ *comment = '\0';
+ xbt_str_trim(action_line, NULL);
+ if (action_line[0] == '\0')
+ continue;
+ /* we cannot split in place here because we parse&store several lines for
+ * the colleagues... */
+ evt = xbt_str_split_quoted(action_line);
+
+ // if it's for me, I'm done
+ evtname = xbt_dynar_get_as(evt, 0, char *);
+ if (!strcmp(name, evtname)) {
+ return xbt_dynar_to_array(evt);
+ } else {
+ // Else, I have to store it for the relevant colleague
+ xbt_dynar_t otherqueue =
+ xbt_dict_get_or_null(action_queues, evtname);
+ if (otherqueue == NULL) { // Damn. Create the queue of that guy
+ otherqueue =
+ xbt_dynar_new(sizeof(xbt_dynar_t), xbt_dynar_free_voidp);
+ xbt_dict_set(action_queues, evtname, otherqueue, NULL);
+ }
+ xbt_dynar_push(otherqueue, &evt);
+ }
+ }
+ goto todo_done; // end of file reached while searching in vain for more work
+ } else {
+ // Get something from my queue and return it
+ xbt_dynar_shift(myqueue, &evt);
+ return xbt_dynar_to_array(evt);
+ }
+
+
+ // I did all my actions for me in the file (either I closed the file, or a colleague did)
+ // Let's cleanup before leaving
+todo_done:
+ if (myqueue != NULL) {
+ xbt_dynar_free(&myqueue);
+ xbt_dict_remove(action_queues, name);
+ }
+ return NULL;
+}
+++ /dev/null
-/* Copyright (c) 2010. The SimGrid Team.
- * All rights reserved. */
-
-/* This program is free software; you can redistribute it and/or modify it
- * under the terms of the license (GNU LGPL) which comes with this package. */
-#include "simgrid_config.h" //For getline, keep that include first
-
-#include "gras_config.h"
-#include <errno.h>
-#include "xbt/sysdep.h"
-#include "xbt/log.h"
-#include "xbt/str.h"
-#include "xbt/replay_trace_reader.h"
-
-XBT_LOG_NEW_DEFAULT_SUBCATEGORY(replay,xbt,"Replay trace reader");
-
-typedef struct s_replay_trace_reader {
- FILE *fp;
- char *line;
- size_t line_len;
- char *position; /* stable storage */
- char *filename; int linenum;
-} s_xbt_replay_trace_reader_t;
-
-xbt_replay_trace_reader_t xbt_replay_trace_reader_new(const char *filename)
-{
- xbt_replay_trace_reader_t res = xbt_new0(s_xbt_replay_trace_reader_t,1);
- res->fp = fopen(filename, "r");
- xbt_assert(res->fp != NULL, "Cannot open %s: %s", filename,
- strerror(errno));
- res->filename = xbt_strdup(filename);
- return res;
-}
-
-const char *xbt_replay_trace_reader_position(xbt_replay_trace_reader_t reader)
-{
- free(reader->position);
- reader->position = bprintf("%s:%d",reader->filename,reader->linenum);
- return reader->position;
-}
-
-const char **xbt_replay_trace_reader_get(xbt_replay_trace_reader_t reader)
-{
- ssize_t read;
- xbt_dynar_t d;
- read = getline(&reader->line, &reader->line_len, reader->fp);
- //XBT_INFO("got from trace: %s",reader->line);
- reader->linenum++;
- if (read==-1)
- return NULL; /* end of file */
- char *comment = strchr(reader->line, '#');
- if (comment != NULL)
- *comment = '\0';
- xbt_str_trim(reader->line, NULL);
- if (reader->line[0] == '\0')
- return xbt_replay_trace_reader_get(reader); /* Get next line */
-
- d=xbt_str_split_quoted_in_place(reader->line);
- if (xbt_dynar_is_empty(d)) {
- xbt_dynar_free(&d);
- return xbt_replay_trace_reader_get(reader); /* Get next line */
- }
- return xbt_dynar_to_array(d);
-}
-
-void xbt_replay_trace_reader_free(xbt_replay_trace_reader_t *reader)
-{
- free((*reader)->filename);
- free((*reader)->position);
- fclose((*reader)->fp);
- free((*reader)->line);
- free(*reader);
- *reader=NULL;
-}
void test(void)
{
tmgr_history_t history = tmgr_history_new();
- tmgr_trace_t trace_A = tmgr_trace_new("trace_A.txt");
- tmgr_trace_t trace_B = tmgr_trace_new("trace_B.txt");
+ tmgr_trace_t trace_A = tmgr_trace_new_from_file("trace_A.txt");
+ tmgr_trace_t trace_B = tmgr_trace_new_from_file("trace_B.txt");
double next_event_date = -1.0;
double value = -1.0;
char *resource = NULL;