From 9d2a6b99f1209bc178f050c8973143e591a4727a Mon Sep 17 00:00:00 2001 From: mquinson Date: Tue, 17 Jul 2007 14:08:26 +0000 Subject: [PATCH] Add a thread to listen the network. Not used so far git-svn-id: svn+ssh://scm.gforge.inria.fr/svn/simgrid/simgrid/trunk@3832 48e7efb5-ca39-0410-a469-dd3cf9ba447f --- src/Makefile.am | 2 +- src/Makefile.in | 41 ++++++++++++++---------- src/gras/Msg/gras_msg_listener.c | 53 ++++++++++++++++++++++++++++++++ src/gras/Msg/msg_private.h | 9 ++++++ 4 files changed, 88 insertions(+), 17 deletions(-) create mode 100644 src/gras/Msg/gras_msg_listener.c diff --git a/src/Makefile.am b/src/Makefile.am index 5685d01247..36eea9f4f7 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -210,7 +210,7 @@ GRAS_COMMON_SRC= \ gras/gras.c \ gras/Transport/transport.c gras/Transport/transport_private.h \ gras/Msg/gras_msg_mod.c gras/Msg/gras_msg_types.c \ - gras/Msg/gras_msg_exchange.c \ + gras/Msg/gras_msg_exchange.c gras/Msg/gras_msg_listener.c \ gras/Msg/rpc.c gras/Msg/timer.c \ gras/Msg/msg_interface.h gras/Msg/msg_private.h \ \ diff --git a/src/Makefile.in b/src/Makefile.in index 2d2964a844..ad36bbd590 100644 --- a/src/Makefile.in +++ b/src/Makefile.in @@ -83,9 +83,10 @@ am__objects_1 = snprintf.lo xbt_str.lo ex.lo xbt_virtu.lo \ xbt_matrix.lo xbt_queue.lo xbt_peer.lo xbt_main.lo config.lo \ cunit.lo graphxml_parse.lo am__objects_2 = gras.lo transport.lo gras_msg_mod.lo gras_msg_types.lo \ - gras_msg_exchange.lo rpc.lo timer.lo process.lo gras_module.lo \ - ddt_create.lo ddt_convert.lo ddt_exchange.lo cbps.lo \ - datadesc.lo ddt_parse.lo ddt_parse.yy.lo + gras_msg_exchange.lo gras_msg_listener.lo rpc.lo timer.lo \ + process.lo gras_module.lo ddt_create.lo ddt_convert.lo \ + ddt_exchange.lo cbps.lo datadesc.lo ddt_parse.lo \ + ddt_parse.yy.lo am__objects_3 = xbt_rl_synchro.lo xbt_rl_time.lo am__objects_4 = rl_stubs.lo xbt_os_thread.lo rl_transport.lo \ transport_plugin_file.lo transport_plugin_tcp.lo rl_emul.lo \ @@ -123,12 +124,12 @@ am__libsimgrid_la_SOURCES_DIST = xbt/snprintf.c xbt/xbt_str.c xbt/ex.c \ simdag/sd_workstation.c gras/gras.c gras/Transport/transport.c \ gras/Transport/transport_private.h gras/Msg/gras_msg_mod.c \ gras/Msg/gras_msg_types.c gras/Msg/gras_msg_exchange.c \ - gras/Msg/rpc.c gras/Msg/timer.c gras/Msg/msg_interface.h \ - gras/Msg/msg_private.h gras/Virtu/process.c \ - gras/Virtu/gras_module.c gras/DataDesc/ddt_create.c \ - gras/DataDesc/ddt_convert.c gras/DataDesc/ddt_exchange.c \ - gras/DataDesc/cbps.c gras/DataDesc/datadesc.c \ - gras/DataDesc/datadesc_interface.h \ + gras/Msg/gras_msg_listener.c gras/Msg/rpc.c gras/Msg/timer.c \ + gras/Msg/msg_interface.h gras/Msg/msg_private.h \ + gras/Virtu/process.c gras/Virtu/gras_module.c \ + gras/DataDesc/ddt_create.c gras/DataDesc/ddt_convert.c \ + gras/DataDesc/ddt_exchange.c gras/DataDesc/cbps.c \ + gras/DataDesc/datadesc.c gras/DataDesc/datadesc_interface.h \ gras/DataDesc/datadesc_private.h gras/DataDesc/ddt_parse.c \ gras/DataDesc/ddt_parse.yy.c gras/DataDesc/ddt_parse.yy.h \ gras/Transport/sg_transport.c \ @@ -192,12 +193,12 @@ am__libsimgrid4java_la_SOURCES_DIST = xbt/snprintf.c xbt/xbt_str.c \ gras/gras.c gras/Transport/transport.c \ gras/Transport/transport_private.h gras/Msg/gras_msg_mod.c \ gras/Msg/gras_msg_types.c gras/Msg/gras_msg_exchange.c \ - gras/Msg/rpc.c gras/Msg/timer.c gras/Msg/msg_interface.h \ - gras/Msg/msg_private.h gras/Virtu/process.c \ - gras/Virtu/gras_module.c gras/DataDesc/ddt_create.c \ - gras/DataDesc/ddt_convert.c gras/DataDesc/ddt_exchange.c \ - gras/DataDesc/cbps.c gras/DataDesc/datadesc.c \ - gras/DataDesc/datadesc_interface.h \ + gras/Msg/gras_msg_listener.c gras/Msg/rpc.c gras/Msg/timer.c \ + gras/Msg/msg_interface.h gras/Msg/msg_private.h \ + gras/Virtu/process.c gras/Virtu/gras_module.c \ + gras/DataDesc/ddt_create.c gras/DataDesc/ddt_convert.c \ + gras/DataDesc/ddt_exchange.c gras/DataDesc/cbps.c \ + gras/DataDesc/datadesc.c gras/DataDesc/datadesc_interface.h \ gras/DataDesc/datadesc_private.h gras/DataDesc/ddt_parse.c \ gras/DataDesc/ddt_parse.yy.c gras/DataDesc/ddt_parse.yy.h \ gras/Transport/sg_transport.c \ @@ -576,7 +577,7 @@ GRAS_COMMON_SRC = \ gras/gras.c \ gras/Transport/transport.c gras/Transport/transport_private.h \ gras/Msg/gras_msg_mod.c gras/Msg/gras_msg_types.c \ - gras/Msg/gras_msg_exchange.c \ + gras/Msg/gras_msg_exchange.c gras/Msg/gras_msg_listener.c \ gras/Msg/rpc.c gras/Msg/timer.c \ gras/Msg/msg_interface.h gras/Msg/msg_private.h \ \ @@ -789,6 +790,7 @@ distclean-compile: @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/gras.Plo@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/gras_module.Plo@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/gras_msg_exchange.Plo@am__quote@ +@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/gras_msg_listener.Plo@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/gras_msg_mod.Plo@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/gras_msg_types.Plo@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/gtnets_interface.Plo@am__quote@ @@ -1128,6 +1130,13 @@ gras_msg_exchange.lo: gras/Msg/gras_msg_exchange.c @AMDEP_TRUE@@am__fastdepCC_FALSE@ DEPDIR=$(DEPDIR) $(CCDEPMODE) $(depcomp) @AMDEPBACKSLASH@ @am__fastdepCC_FALSE@ $(LIBTOOL) --tag=CC $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CC) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(AM_CFLAGS) $(CFLAGS) -c -o gras_msg_exchange.lo `test -f 'gras/Msg/gras_msg_exchange.c' || echo '$(srcdir)/'`gras/Msg/gras_msg_exchange.c +gras_msg_listener.lo: gras/Msg/gras_msg_listener.c +@am__fastdepCC_TRUE@ $(LIBTOOL) --tag=CC $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CC) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(AM_CFLAGS) $(CFLAGS) -MT gras_msg_listener.lo -MD -MP -MF $(DEPDIR)/gras_msg_listener.Tpo -c -o gras_msg_listener.lo `test -f 'gras/Msg/gras_msg_listener.c' || echo '$(srcdir)/'`gras/Msg/gras_msg_listener.c +@am__fastdepCC_TRUE@ mv -f $(DEPDIR)/gras_msg_listener.Tpo $(DEPDIR)/gras_msg_listener.Plo +@AMDEP_TRUE@@am__fastdepCC_FALSE@ source='gras/Msg/gras_msg_listener.c' object='gras_msg_listener.lo' libtool=yes @AMDEPBACKSLASH@ +@AMDEP_TRUE@@am__fastdepCC_FALSE@ DEPDIR=$(DEPDIR) $(CCDEPMODE) $(depcomp) @AMDEPBACKSLASH@ +@am__fastdepCC_FALSE@ $(LIBTOOL) --tag=CC $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CC) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(AM_CFLAGS) $(CFLAGS) -c -o gras_msg_listener.lo `test -f 'gras/Msg/gras_msg_listener.c' || echo '$(srcdir)/'`gras/Msg/gras_msg_listener.c + rpc.lo: gras/Msg/rpc.c @am__fastdepCC_TRUE@ $(LIBTOOL) --tag=CC $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CC) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(AM_CFLAGS) $(CFLAGS) -MT rpc.lo -MD -MP -MF $(DEPDIR)/rpc.Tpo -c -o rpc.lo `test -f 'gras/Msg/rpc.c' || echo '$(srcdir)/'`gras/Msg/rpc.c @am__fastdepCC_TRUE@ mv -f $(DEPDIR)/rpc.Tpo $(DEPDIR)/rpc.Plo diff --git a/src/gras/Msg/gras_msg_listener.c b/src/gras/Msg/gras_msg_listener.c new file mode 100644 index 0000000000..028dffecf6 --- /dev/null +++ b/src/gras/Msg/gras_msg_listener.c @@ -0,0 +1,53 @@ +/* $Id$ */ + +/* Thread in charge of listening the network and queuing incoming messages */ + +/* Copyright (c) 2007 Martin Quinson. All rights reserved. */ + +/* This program is free software; you can redistribute it and/or modify it + * under the terms of the license (GNU LGPL) which comes with this package. */ + +#include "gras/Msg/msg_private.h" +XBT_LOG_NEW_DEFAULT_SUBCATEGORY(gras_msg_read,gras_msg,"Message reader thread"); + +#include "xbt/ex.h" +#include "xbt/queue.h" +#include "xbt/synchro.h" + +//#include "gras/Virtu/virtu_interface.h" +//#include "gras/DataDesc/datadesc_interface.h" +#include "gras/Transport/transport_interface.h" /* gras_select */ +//#include "portable.h" /* execinfo when available to propagate exceptions */ + + +typedef struct s_gras_msg_listener_ { + xbt_queue_t incomming_messages; + xbt_thread_t listener; +} s_gras_msg_listener_t; + +static void listener_function(void *p) { + gras_msg_listener_t me = (gras_msg_listener_t)p; + s_gras_msg_t msg; + + while (1) { + msg.expe = gras_trp_select(-1); + gras_msg_recv(msg.expe, &msg); + xbt_queue_push(me->incomming_messages, &msg); + } +} + +gras_msg_listener_t +gras_msg_listener_launch(xbt_queue_t msg_exchange){ + gras_msg_listener_t arg = xbt_new0(s_gras_msg_listener_t,1); + + arg->incomming_messages = msg_exchange; + + arg->listener = xbt_thread_create(listener_function,&arg); + return arg; +} + +void gras_msg_listener_shutdown(gras_msg_listener_t l) { + xbt_thread_cancel(l->listener); + xbt_queue_free(&l->incomming_messages); + xbt_free(l); +} diff --git a/src/gras/Msg/msg_private.h b/src/gras/Msg/msg_private.h index 540e000a64..07c142529f 100644 --- a/src/gras/Msg/msg_private.h +++ b/src/gras/Msg/msg_private.h @@ -17,6 +17,7 @@ #include "xbt/sysdep.h" #include "xbt/log.h" #include "xbt/dynar.h" +#include "xbt/queue.h" #include "xbt/set.h" #include "gras/transport.h" #include "gras/datadesc.h" @@ -73,6 +74,13 @@ void gras_msg_send_ext(gras_socket_t sock, gras_msgtype_t msgtype, void *payload); +/* The thread in charge of receiving messages and queuing them */ +typedef struct s_gras_msg_listener_ *gras_msg_listener_t; +gras_msg_listener_t +gras_msg_listener_launch(xbt_queue_t msg_exchange); +/* The caller has the responsability to cleanup the queues himself */ +void gras_msg_listener_shutdown(gras_msg_listener_t); + /** * gras_cblist_t: * @@ -121,6 +129,7 @@ gras_msg_cb_ctx_t gras_msg_cb_ctx_new(gras_socket_t expe, double timeout); + /* We deploy a mallocator on the RPC contextes */ #include "xbt/mallocator.h" extern xbt_mallocator_t gras_msg_ctx_mallocator; -- 2.20.1