memset(&msg,sizeof(msg),0);
TRY {
+ xbt_queue_shift_timed(pd->msg_received,&msg,timeout ? timeout - now + start : 0);
+ /*
msg.expe = gras_trp_select(timeout ? timeout - now + start : 0);
gras_msg_recv(msg.expe, &msg);
+ */
} CATCH(e) {
if (e.category == system_error &&
!strncmp("Socket closed by remote side",e.msg,
xbt_dynar_shift(pd->msg_queue,&msg);
} else {
TRY {
- msg.expe = gras_trp_select(timeOut);
+ xbt_queue_shift_timed(pd->msg_received,&msg,timeOut);
+// msg.expe = gras_trp_select(timeOut);
} CATCH(e) {
if (e.category != timeout_error)
RETHROW;
xbt_ex_free(e);
timeouted = 1;
}
-
- if (!timeouted) {
- TRY {
- /* FIXME: if not the right kind, queue it and recall ourself or goto >:-) */
- gras_msg_recv(msg.expe, &msg);
- DEBUG1("Received a msg from the socket kind:%s",
- e_gras_msg_kind_names[msg.kind]);
-
- } CATCH(e) {
- RETHROW4("Error while receiving a message on select()ed socket %p to [%s]%s:%d: %s",
- msg.expe,
- gras_socket_peer_proc(msg.expe),gras_socket_peer_name(msg.expe),
- gras_socket_peer_port(msg.expe));
- }
- }
}
if (timeouted) {
s_gras_msg_t msg;
while (1) {
- msg.expe = gras_trp_select(-1);
+ msg.expe = gras_trp_select(1000);
gras_msg_recv(msg.expe, &msg);
xbt_queue_push(me->incomming_messages, &msg);
}
arg->incomming_messages = msg_exchange;
- arg->listener = xbt_thread_create(listener_function,&arg);
+ arg->listener = xbt_thread_create(listener_function,arg);
return arg;
}
res->timers = xbt_dynar_new(sizeof(s_gras_timer_t), NULL);
res->msg_to_receive_queue = xbt_fifo_new();
res->msg_to_receive_queue_meas = xbt_fifo_new();
+ res->msg_received = xbt_queue_new(0,sizeof(s_gras_msg_t));
return (void*)res;
}
/* queue storing the msgs that have to received and the process synchronization made (wait the surf action done) */
xbt_fifo_t msg_to_receive_queue; /* elm type: s_gras_msg_t */
xbt_fifo_t msg_to_receive_queue_meas; /* elm type: s_gras_msg_t */
+ xbt_queue_t msg_received;
+
} s_gras_msg_procdata_t,*gras_msg_procdata_t;
#include "xbt/dynar.h"
#include "gras/Virtu/virtu_interface.h"
#include "simix/simix.h"
+#include "gras/Msg/msg_private.h"
/** @brief Data for each process */
typedef struct {
int pid; /* pid of process, only for SG */
int ppid; /* ppid of process, only for SG */
+ gras_msg_listener_t listener;
} gras_procdata_t;
gras_procdata_t *gras_procdata_get(void);
#include "xbt/virtu.h" /* set the XBT virtualization to use GRAS */
#include "xbt/module.h" /* xbt_init/exit */
#include "xbt/xbt_os_time.h" /* xbt_os_time */
+#include "xbt/synchro.h"
#include "Virtu/virtu_interface.h" /* Module mechanism FIXME: deplace&rename */
+#include "Virtu/virtu_private.h"
#include "gras_modinter.h" /* module init/exit */
#include "amok/amok_modinter.h" /* module init/exit */
#include "xbt_modinter.h" /* module init/exit */
#include "gras.h"
#include "gras/process.h" /* FIXME: killme and put process_init in modinter */
-
+#include "gras/Msg/msg_private.h"
#include "portable.h" /* hexa_*(); signalling stuff */
XBT_LOG_NEW_DEFAULT_SUBCATEGORY(gras,XBT_LOG_ROOT_CAT,"All GRAS categories (cf. section \ref GRAS_API)");
void gras_init(int *argc,char **argv) {
+ gras_procdata_t *pd;
+ gras_msg_procdata_t msg_pd;
VERB0("Initialize GRAS");
xbt_getpid = &gras_os_getpid;
/* and then init amok */
amok_init();
+ pd = gras_procdata_get();
+ msg_pd = gras_libdata_by_name("gras_msg");
+ pd->listener = gras_msg_listener_launch(msg_pd->msg_received);
}
void gras_exit(void) {
+ gras_procdata_t *pd;
INFO0("Exiting GRAS");
amok_exit();
gras_moddata_leave();
+ pd = gras_procdata_get();
+ gras_msg_listener_shutdown(pd->listener);
gras_process_exit();
if (--gras_running_process == 0) {
gras_msg_exit();
void SIMIX_process_set_data(smx_process_t process,void *data)
{
xbt_assert0((process != NULL), "Invalid parameters");
- xbt_assert0((process->data == NULL), "Data already set");
+ //xbt_assert0((process->data == NULL), "Data already set");
process->data = data;
smx_process_t s_process;
void_f_pvoid_t *code;
void *userparam;
+ void *father_data;
} s_xbt_thread_t;
static int xbt_thread_create_wrapper(int argc, char *argv[]) {
xbt_thread_t t = (xbt_thread_t)SIMIX_process_get_data(SIMIX_process_self());
+ SIMIX_process_set_data(SIMIX_process_self(),t->father_data);
(*t->code)(t->userparam);
return 0;
}
xbt_thread_t res = xbt_new0(s_xbt_thread_t,1);
res->userparam = param;
res->code = code;
+ res->father_data = SIMIX_process_get_data(SIMIX_process_self());
res->s_process = SIMIX_process_create(NULL,
xbt_thread_create_wrapper, res,
SIMIX_host_get_name(SIMIX_host_self()),