Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Add a thread to listen to the network. Not used so far
author mquinson <mquinson@48e7efb5-ca39-0410-a469-dd3cf9ba447f>
Tue, 17 Jul 2007 14:08:26 +0000 (14:08 +0000)
committer mquinson <mquinson@48e7efb5-ca39-0410-a469-dd3cf9ba447f>
Tue, 17 Jul 2007 14:08:26 +0000 (14:08 +0000)
git-svn-id: svn+ssh://scm.gforge.inria.fr/svn/simgrid/simgrid/trunk@3832 48e7efb5-ca39-0410-a469-dd3cf9ba447f

src/Makefile.am
src/Makefile.in
src/gras/Msg/gras_msg_listener.c [new file with mode: 0644]
src/gras/Msg/msg_private.h

index 5685d01..36eea9f 100644 (file)
@@ -210,7 +210,7 @@ GRAS_COMMON_SRC= \
   gras/gras.c  \
   gras/Transport/transport.c          gras/Transport/transport_private.h  \
   gras/Msg/gras_msg_mod.c             gras/Msg/gras_msg_types.c           \
-  gras/Msg/gras_msg_exchange.c                                            \
+  gras/Msg/gras_msg_exchange.c        gras/Msg/gras_msg_listener.c        \
   gras/Msg/rpc.c                      gras/Msg/timer.c                    \
   gras/Msg/msg_interface.h            gras/Msg/msg_private.h              \
   \
index 2d2964a..ad36bbd 100644 (file)
@@ -83,9 +83,10 @@ am__objects_1 = snprintf.lo xbt_str.lo ex.lo xbt_virtu.lo \
        xbt_matrix.lo xbt_queue.lo xbt_peer.lo xbt_main.lo config.lo \
        cunit.lo graphxml_parse.lo
 am__objects_2 = gras.lo transport.lo gras_msg_mod.lo gras_msg_types.lo \
-       gras_msg_exchange.lo rpc.lo timer.lo process.lo gras_module.lo \
-       ddt_create.lo ddt_convert.lo ddt_exchange.lo cbps.lo \
-       datadesc.lo ddt_parse.lo ddt_parse.yy.lo
+       gras_msg_exchange.lo gras_msg_listener.lo rpc.lo timer.lo \
+       process.lo gras_module.lo ddt_create.lo ddt_convert.lo \
+       ddt_exchange.lo cbps.lo datadesc.lo ddt_parse.lo \
+       ddt_parse.yy.lo
 am__objects_3 = xbt_rl_synchro.lo xbt_rl_time.lo
 am__objects_4 = rl_stubs.lo xbt_os_thread.lo rl_transport.lo \
        transport_plugin_file.lo transport_plugin_tcp.lo rl_emul.lo \
@@ -123,12 +124,12 @@ am__libsimgrid_la_SOURCES_DIST = xbt/snprintf.c xbt/xbt_str.c xbt/ex.c \
        simdag/sd_workstation.c gras/gras.c gras/Transport/transport.c \
        gras/Transport/transport_private.h gras/Msg/gras_msg_mod.c \
        gras/Msg/gras_msg_types.c gras/Msg/gras_msg_exchange.c \
-       gras/Msg/rpc.c gras/Msg/timer.c gras/Msg/msg_interface.h \
-       gras/Msg/msg_private.h gras/Virtu/process.c \
-       gras/Virtu/gras_module.c gras/DataDesc/ddt_create.c \
-       gras/DataDesc/ddt_convert.c gras/DataDesc/ddt_exchange.c \
-       gras/DataDesc/cbps.c gras/DataDesc/datadesc.c \
-       gras/DataDesc/datadesc_interface.h \
+       gras/Msg/gras_msg_listener.c gras/Msg/rpc.c gras/Msg/timer.c \
+       gras/Msg/msg_interface.h gras/Msg/msg_private.h \
+       gras/Virtu/process.c gras/Virtu/gras_module.c \
+       gras/DataDesc/ddt_create.c gras/DataDesc/ddt_convert.c \
+       gras/DataDesc/ddt_exchange.c gras/DataDesc/cbps.c \
+       gras/DataDesc/datadesc.c gras/DataDesc/datadesc_interface.h \
        gras/DataDesc/datadesc_private.h gras/DataDesc/ddt_parse.c \
        gras/DataDesc/ddt_parse.yy.c gras/DataDesc/ddt_parse.yy.h \
        gras/Transport/sg_transport.c \
@@ -192,12 +193,12 @@ am__libsimgrid4java_la_SOURCES_DIST = xbt/snprintf.c xbt/xbt_str.c \
        gras/gras.c gras/Transport/transport.c \
        gras/Transport/transport_private.h gras/Msg/gras_msg_mod.c \
        gras/Msg/gras_msg_types.c gras/Msg/gras_msg_exchange.c \
-       gras/Msg/rpc.c gras/Msg/timer.c gras/Msg/msg_interface.h \
-       gras/Msg/msg_private.h gras/Virtu/process.c \
-       gras/Virtu/gras_module.c gras/DataDesc/ddt_create.c \
-       gras/DataDesc/ddt_convert.c gras/DataDesc/ddt_exchange.c \
-       gras/DataDesc/cbps.c gras/DataDesc/datadesc.c \
-       gras/DataDesc/datadesc_interface.h \
+       gras/Msg/gras_msg_listener.c gras/Msg/rpc.c gras/Msg/timer.c \
+       gras/Msg/msg_interface.h gras/Msg/msg_private.h \
+       gras/Virtu/process.c gras/Virtu/gras_module.c \
+       gras/DataDesc/ddt_create.c gras/DataDesc/ddt_convert.c \
+       gras/DataDesc/ddt_exchange.c gras/DataDesc/cbps.c \
+       gras/DataDesc/datadesc.c gras/DataDesc/datadesc_interface.h \
        gras/DataDesc/datadesc_private.h gras/DataDesc/ddt_parse.c \
        gras/DataDesc/ddt_parse.yy.c gras/DataDesc/ddt_parse.yy.h \
        gras/Transport/sg_transport.c \
@@ -576,7 +577,7 @@ GRAS_COMMON_SRC = \
   gras/gras.c  \
   gras/Transport/transport.c          gras/Transport/transport_private.h  \
   gras/Msg/gras_msg_mod.c             gras/Msg/gras_msg_types.c           \
-  gras/Msg/gras_msg_exchange.c                                            \
+  gras/Msg/gras_msg_exchange.c        gras/Msg/gras_msg_listener.c        \
   gras/Msg/rpc.c                      gras/Msg/timer.c                    \
   gras/Msg/msg_interface.h            gras/Msg/msg_private.h              \
   \
@@ -789,6 +790,7 @@ distclean-compile:
 @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/gras.Plo@am__quote@
 @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/gras_module.Plo@am__quote@
 @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/gras_msg_exchange.Plo@am__quote@
+@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/gras_msg_listener.Plo@am__quote@
 @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/gras_msg_mod.Plo@am__quote@
 @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/gras_msg_types.Plo@am__quote@
 @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/gtnets_interface.Plo@am__quote@
@@ -1128,6 +1130,13 @@ gras_msg_exchange.lo: gras/Msg/gras_msg_exchange.c
 @AMDEP_TRUE@@am__fastdepCC_FALSE@      DEPDIR=$(DEPDIR) $(CCDEPMODE) $(depcomp) @AMDEPBACKSLASH@
 @am__fastdepCC_FALSE@  $(LIBTOOL) --tag=CC $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CC) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(AM_CFLAGS) $(CFLAGS) -c -o gras_msg_exchange.lo `test -f 'gras/Msg/gras_msg_exchange.c' || echo '$(srcdir)/'`gras/Msg/gras_msg_exchange.c
 
+gras_msg_listener.lo: gras/Msg/gras_msg_listener.c
+@am__fastdepCC_TRUE@   $(LIBTOOL) --tag=CC $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CC) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(AM_CFLAGS) $(CFLAGS) -MT gras_msg_listener.lo -MD -MP -MF $(DEPDIR)/gras_msg_listener.Tpo -c -o gras_msg_listener.lo `test -f 'gras/Msg/gras_msg_listener.c' || echo '$(srcdir)/'`gras/Msg/gras_msg_listener.c
+@am__fastdepCC_TRUE@   mv -f $(DEPDIR)/gras_msg_listener.Tpo $(DEPDIR)/gras_msg_listener.Plo
+@AMDEP_TRUE@@am__fastdepCC_FALSE@      source='gras/Msg/gras_msg_listener.c' object='gras_msg_listener.lo' libtool=yes @AMDEPBACKSLASH@
+@AMDEP_TRUE@@am__fastdepCC_FALSE@      DEPDIR=$(DEPDIR) $(CCDEPMODE) $(depcomp) @AMDEPBACKSLASH@
+@am__fastdepCC_FALSE@  $(LIBTOOL) --tag=CC $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CC) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(AM_CFLAGS) $(CFLAGS) -c -o gras_msg_listener.lo `test -f 'gras/Msg/gras_msg_listener.c' || echo '$(srcdir)/'`gras/Msg/gras_msg_listener.c
+
 rpc.lo: gras/Msg/rpc.c
 @am__fastdepCC_TRUE@   $(LIBTOOL) --tag=CC $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CC) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(AM_CFLAGS) $(CFLAGS) -MT rpc.lo -MD -MP -MF $(DEPDIR)/rpc.Tpo -c -o rpc.lo `test -f 'gras/Msg/rpc.c' || echo '$(srcdir)/'`gras/Msg/rpc.c
 @am__fastdepCC_TRUE@   mv -f $(DEPDIR)/rpc.Tpo $(DEPDIR)/rpc.Plo
diff --git a/src/gras/Msg/gras_msg_listener.c b/src/gras/Msg/gras_msg_listener.c
new file mode 100644 (file)
index 0000000..028dffe
--- /dev/null
@@ -0,0 +1,53 @@
+/* $Id$ */
+
+/* Thread in charge of listening to the network, queuing incoming messages */
+
+/* Copyright (c) 2007 Martin Quinson. All rights reserved.                  */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
+#include "gras/Msg/msg_private.h"
+XBT_LOG_NEW_DEFAULT_SUBCATEGORY(gras_msg_read,gras_msg,"Message reader thread");
+
+#include "xbt/ex.h"
+#include "xbt/queue.h"
+#include "xbt/synchro.h"
+
+//#include "gras/Virtu/virtu_interface.h"
+//#include "gras/DataDesc/datadesc_interface.h"
+#include "gras/Transport/transport_interface.h" /* gras_select */
+//#include "portable.h" /* execinfo when available to propagate exceptions */
+
+
+typedef struct s_gras_msg_listener_ { /* state of one message listener */
+  xbt_queue_t incomming_messages; /* queue onto which listener_function() pushes each received message */
+  xbt_thread_t listener;          /* thread handle returned by xbt_thread_create() */
+} s_gras_msg_listener_t;
+
+static void listener_function(void *p) {
+  gras_msg_listener_t self = p; /* opaque thread argument, interpreted as the listener descriptor */
+  s_gras_msg_t incoming;        /* scratch message, overwritten on every iteration */
+  /* Endless loop: pick a socket, read one message from it, push it on the queue */
+  for (;;) {
+    incoming.expe = gras_trp_select(-1); /* presumably -1 means "no timeout" -- TODO confirm */
+    gras_msg_recv(incoming.expe, &incoming);
+    xbt_queue_push(self->incomming_messages, &incoming);
+  }
+}
+
+/* Allocate a listener descriptor, bind it to msg_exchange and start its thread.
+ * Returns the descriptor, to be released with gras_msg_listener_shutdown(). */
+gras_msg_listener_t gras_msg_listener_launch(xbt_queue_t msg_exchange){
+  gras_msg_listener_t arg = xbt_new0(s_gras_msg_listener_t,1);
+  arg->incomming_messages = msg_exchange;
+  /* Pass the descriptor itself: listener_function() casts its argument to gras_msg_listener_t */
+  arg->listener =  xbt_thread_create(listener_function,arg);
+  return arg;
+}
+
+void gras_msg_listener_shutdown(gras_msg_listener_t l) { /* cancel the listener thread, then release l */
+  xbt_thread_cancel(l->listener);
+  xbt_queue_free(&l->incomming_messages); /* NOTE(review): frees the queue given to launch(), yet msg_private.h says the caller cleans up the queues -- confirm ownership */
+  xbt_free(l);
+}
index 540e000..07c1425 100644 (file)
@@ -17,6 +17,7 @@
 #include "xbt/sysdep.h"
 #include "xbt/log.h"
 #include "xbt/dynar.h"
+#include "xbt/queue.h"
 #include "xbt/set.h"
 #include "gras/transport.h"
 #include "gras/datadesc.h"
@@ -73,6 +74,13 @@ void gras_msg_send_ext(gras_socket_t   sock,
                       gras_msgtype_t  msgtype,
                       void           *payload);
 
+/* The thread in charge of receiving messages and queuing them */
+typedef struct s_gras_msg_listener_ *gras_msg_listener_t;
+gras_msg_listener_t
+gras_msg_listener_launch(xbt_queue_t msg_exchange);
+/* The caller has the responsibility to clean up the queues himself */
+void gras_msg_listener_shutdown(gras_msg_listener_t);
+
 /**
  * gras_cblist_t:
  *
@@ -121,6 +129,7 @@ gras_msg_cb_ctx_t gras_msg_cb_ctx_new(gras_socket_t expe,
                                                 double timeout);
 
 
+
 /* We deploy a mallocator on the RPC contextes */
 #include "xbt/mallocator.h"
 extern xbt_mallocator_t gras_msg_ctx_mallocator;