X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/blobdiff_plain/9d2a6b99f1209bc178f050c8973143e591a4727a..d177472fea36c2a8f780637477591025663d30a4:/src/gras/Msg/gras_msg_listener.c diff --git a/src/gras/Msg/gras_msg_listener.c b/src/gras/Msg/gras_msg_listener.c index 028dffecf6..94d7863e4b 100644 --- a/src/gras/Msg/gras_msg_listener.c +++ b/src/gras/Msg/gras_msg_listener.c @@ -8,46 +8,79 @@ * under the terms of the license (GNU LGPL) which comes with this package. */ #include "gras/Msg/msg_private.h" +#include "gras/Transport/transport_private.h" XBT_LOG_NEW_DEFAULT_SUBCATEGORY(gras_msg_read,gras_msg,"Message reader thread"); #include "xbt/ex.h" #include "xbt/queue.h" #include "xbt/synchro.h" -//#include "gras/Virtu/virtu_interface.h" -//#include "gras/DataDesc/datadesc_interface.h" #include "gras/Transport/transport_interface.h" /* gras_select */ -//#include "portable.h" /* execinfo when available to propagate exceptions */ - typedef struct s_gras_msg_listener_ { xbt_queue_t incomming_messages; + gras_socket_t wakeup_sock_listener_side; + gras_socket_t wakeup_sock_master_side; xbt_thread_t listener; } s_gras_msg_listener_t; static void listener_function(void *p) { gras_msg_listener_t me = (gras_msg_listener_t)p; s_gras_msg_t msg; - + gras_msgtype_t msg_wakeup_listener_t = gras_msgtype_by_name("_wakeup_listener"); + DEBUG0("I'm the listener"); while (1) { + DEBUG0("Selecting"); msg.expe = gras_trp_select(-1); + DEBUG0("Select returned something"); gras_msg_recv(msg.expe, &msg); - xbt_queue_push(me->incomming_messages, &msg); + if (msg.type!=msg_wakeup_listener_t) + xbt_queue_push(me->incomming_messages, &msg); + else { + DEBUG0("Asked to get awake"); + } } } gras_msg_listener_t gras_msg_listener_launch(xbt_queue_t msg_exchange){ gras_msg_listener_t arg = xbt_new0(s_gras_msg_listener_t,1); + int my_port; + DEBUG0("Launch listener"); arg->incomming_messages = msg_exchange; - arg->listener = xbt_thread_create(listener_function,&arg); + /* get a free socket for the receiving part of the listener, taking care that it does not get saved as "myport" number */ + my_port = ((gras_trp_procdata_t) gras_libdata_by_id(gras_trp_libdata_id))->myport; + arg->wakeup_sock_listener_side=gras_socket_server_range(5000,6000,-1,0); + ((gras_trp_procdata_t) gras_libdata_by_id(gras_trp_libdata_id))->myport = my_port; + /* Connect the other part of the socket */ + arg->wakeup_sock_master_side=gras_socket_client(gras_os_myname(),gras_socket_my_port(arg->wakeup_sock_listener_side)); + + /* declare the message used to awake the listener from the master */ + gras_msgtype_declare("_wakeup_listener",gras_datadesc_by_name("char")); + + /* actually start the thread */ + arg->listener = xbt_thread_create("listener",listener_function,arg); + gras_os_sleep(0); /* TODO: useless? give the listener a chance to initialize even if the main is empty and we cancel it right afterward */ return arg; } void gras_msg_listener_shutdown(gras_msg_listener_t l) { + DEBUG0("Listener quit"); xbt_thread_cancel(l->listener); xbt_queue_free(&l->incomming_messages); xbt_free(l); } + +#include "gras/Virtu/virtu_private.h" /* procdata_t content */ +void gras_msg_listener_awake(){ + gras_procdata_t *pd; + char c = '1'; + + DEBUG0("Awaking the listener"); + pd = gras_procdata_get(); + if (pd->listener) { + gras_msg_send(pd->listener->wakeup_sock_master_side,"_wakeup_listener",&c); + } +}