From: donassbr Date: Wed, 18 Jul 2007 12:29:51 +0000 (+0000) Subject: New thread to receive messages. Not working yet. X-Git-Tag: v3.3~1501 X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/commitdiff_plain/945f379f0d3d7f8e923ee114de2e492e1cb5486f New thread to receive messages. Not working yet. git-svn-id: svn+ssh://scm.gforge.inria.fr/svn/simgrid/simgrid/trunk@3859 48e7efb5-ca39-0410-a469-dd3cf9ba447f --- diff --git a/src/gras/Msg/gras_msg_exchange.c b/src/gras/Msg/gras_msg_exchange.c index 5a7b9cfa76..b47dae8e4f 100644 --- a/src/gras/Msg/gras_msg_exchange.c +++ b/src/gras/Msg/gras_msg_exchange.c @@ -92,8 +92,11 @@ gras_msg_wait_ext_(double timeout, memset(&msg,sizeof(msg),0); TRY { + xbt_queue_shift_timed(pd->msg_received,&msg,timeout ? timeout - now + start : 0); + /* msg.expe = gras_trp_select(timeout ? timeout - now + start : 0); gras_msg_recv(msg.expe, &msg); + */ } CATCH(e) { if (e.category == system_error && !strncmp("Socket closed by remote side",e.msg, @@ -332,28 +335,14 @@ gras_msg_handle(double timeOut) { xbt_dynar_shift(pd->msg_queue,&msg); } else { TRY { - msg.expe = gras_trp_select(timeOut); + xbt_queue_shift_timed(pd->msg_received,&msg,timeOut); +// msg.expe = gras_trp_select(timeOut); } CATCH(e) { if (e.category != timeout_error) RETHROW; xbt_ex_free(e); timeouted = 1; } - - if (!timeouted) { - TRY { - /* FIXME: if not the right kind, queue it and recall ourself or goto >:-) */ - gras_msg_recv(msg.expe, &msg); - DEBUG1("Received a msg from the socket kind:%s", - e_gras_msg_kind_names[msg.kind]); - - } CATCH(e) { - RETHROW4("Error while receiving a message on select()ed socket %p to [%s]%s:%d: %s", - msg.expe, - gras_socket_peer_proc(msg.expe),gras_socket_peer_name(msg.expe), - gras_socket_peer_port(msg.expe)); - } - } } if (timeouted) { diff --git a/src/gras/Msg/gras_msg_listener.c b/src/gras/Msg/gras_msg_listener.c index 028dffecf6..32dee00641 100644 --- a/src/gras/Msg/gras_msg_listener.c +++ b/src/gras/Msg/gras_msg_listener.c @@ -30,7 +30,7 @@ static void listener_function(void *p) { s_gras_msg_t msg; while (1) { - msg.expe = gras_trp_select(-1); + msg.expe = gras_trp_select(1000); gras_msg_recv(msg.expe, &msg); xbt_queue_push(me->incomming_messages, &msg); } @@ -42,7 +42,7 @@ gras_msg_listener_launch(xbt_queue_t msg_exchange){ arg->incomming_messages = msg_exchange; - arg->listener = xbt_thread_create(listener_function,&arg); + arg->listener = xbt_thread_create(listener_function,arg); return arg; } diff --git a/src/gras/Msg/gras_msg_mod.c b/src/gras/Msg/gras_msg_mod.c index e1662a3531..2ae723f198 100644 --- a/src/gras/Msg/gras_msg_mod.c +++ b/src/gras/Msg/gras_msg_mod.c @@ -30,6 +30,7 @@ static void *gras_msg_procdata_new() { res->timers = xbt_dynar_new(sizeof(s_gras_timer_t), NULL); res->msg_to_receive_queue = xbt_fifo_new(); res->msg_to_receive_queue_meas = xbt_fifo_new(); + res->msg_received = xbt_queue_new(0,sizeof(s_gras_msg_t)); return (void*)res; } diff --git a/src/gras/Msg/msg_interface.h b/src/gras/Msg/msg_interface.h index 9c53b396ab..7c726d93d0 100644 --- a/src/gras/Msg/msg_interface.h +++ b/src/gras/Msg/msg_interface.h @@ -41,6 +41,8 @@ typedef struct { /* queue storing the msgs that have to received and the process synchronization made (wait the surf action done) */ xbt_fifo_t msg_to_receive_queue; /* elm type: s_gras_msg_t */ xbt_fifo_t msg_to_receive_queue_meas; /* elm type: s_gras_msg_t */ + xbt_queue_t msg_received; + } s_gras_msg_procdata_t,*gras_msg_procdata_t; diff --git a/src/gras/Virtu/virtu_private.h b/src/gras/Virtu/virtu_private.h index 394012c658..4171d2e425 100644 --- a/src/gras/Virtu/virtu_private.h +++ b/src/gras/Virtu/virtu_private.h @@ -16,6 +16,7 @@ #include "xbt/dynar.h" #include "gras/Virtu/virtu_interface.h" #include "simix/simix.h" +#include "gras/Msg/msg_private.h" /** @brief Data for each process */ typedef struct { @@ -36,6 +37,7 @@ typedef struct { int pid; /* pid of process, only for SG */ int ppid; /* ppid of process, only for SG */ + gras_msg_listener_t listener; } gras_procdata_t; gras_procdata_t *gras_procdata_get(void); diff --git a/src/gras/gras.c b/src/gras/gras.c index 7ca7b9e55a..98c538a5dc 100644 --- a/src/gras/gras.c +++ b/src/gras/gras.c @@ -12,15 +12,17 @@ #include "xbt/virtu.h" /* set the XBT virtualization to use GRAS */ #include "xbt/module.h" /* xbt_init/exit */ #include "xbt/xbt_os_time.h" /* xbt_os_time */ +#include "xbt/synchro.h" #include "Virtu/virtu_interface.h" /* Module mechanism FIXME: deplace&rename */ +#include "Virtu/virtu_private.h" #include "gras_modinter.h" /* module init/exit */ #include "amok/amok_modinter.h" /* module init/exit */ #include "xbt_modinter.h" /* module init/exit */ #include "gras.h" #include "gras/process.h" /* FIXME: killme and put process_init in modinter */ - +#include "gras/Msg/msg_private.h" #include "portable.h" /* hexa_*(); signalling stuff */ XBT_LOG_NEW_DEFAULT_SUBCATEGORY(gras,XBT_LOG_ROOT_CAT,"All GRAS categories (cf. section \ref GRAS_API)"); @@ -46,6 +48,8 @@ static void gras_sigint_handler(int sig) { void gras_init(int *argc,char **argv) { + gras_procdata_t *pd; + gras_msg_procdata_t msg_pd; VERB0("Initialize GRAS"); xbt_getpid = &gras_os_getpid; @@ -83,12 +87,18 @@ void gras_init(int *argc,char **argv) { /* and then init amok */ amok_init(); + pd = gras_procdata_get(); + msg_pd = gras_libdata_by_name("gras_msg"); + pd->listener = gras_msg_listener_launch(msg_pd->msg_received); } void gras_exit(void) { + gras_procdata_t *pd; INFO0("Exiting GRAS"); amok_exit(); gras_moddata_leave(); + pd = gras_procdata_get(); + gras_msg_listener_shutdown(pd->listener); gras_process_exit(); if (--gras_running_process == 0) { gras_msg_exit(); diff --git a/src/simix/smx_process.c b/src/simix/smx_process.c index 915bc2bd28..c41798cc3e 100644 --- a/src/simix/smx_process.c +++ b/src/simix/smx_process.c @@ -209,7 +209,7 @@ void *SIMIX_process_get_data(smx_process_t process) void SIMIX_process_set_data(smx_process_t process,void *data) { xbt_assert0((process != NULL), "Invalid parameters"); - xbt_assert0((process->data == NULL), "Data already set"); + //xbt_assert0((process->data == NULL), "Data already set"); process->data = data; diff --git a/src/xbt/xbt_sg_synchro.c b/src/xbt/xbt_sg_synchro.c index bf28586441..45ec409a1d 100644 --- a/src/xbt/xbt_sg_synchro.c +++ b/src/xbt/xbt_sg_synchro.c @@ -24,10 +24,12 @@ typedef struct s_xbt_thread_ { smx_process_t s_process; void_f_pvoid_t *code; void *userparam; + void *father_data; } s_xbt_thread_t; static int xbt_thread_create_wrapper(int argc, char *argv[]) { xbt_thread_t t = (xbt_thread_t)SIMIX_process_get_data(SIMIX_process_self()); + SIMIX_process_set_data(SIMIX_process_self(),t->father_data); (*t->code)(t->userparam); return 0; } @@ -36,6 +38,7 @@ xbt_thread_t xbt_thread_create(void_f_pvoid_t* code, void* param) { xbt_thread_t res = xbt_new0(s_xbt_thread_t,1); res->userparam = param; res->code = code; + res->father_data = SIMIX_process_get_data(SIMIX_process_self()); res->s_process = SIMIX_process_create(NULL, xbt_thread_create_wrapper, res, SIMIX_host_get_name(SIMIX_host_self()),