Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
New thread to receive messages. Not working yet.
authordonassbr <donassbr@48e7efb5-ca39-0410-a469-dd3cf9ba447f>
Wed, 18 Jul 2007 12:29:51 +0000 (12:29 +0000)
committerdonassbr <donassbr@48e7efb5-ca39-0410-a469-dd3cf9ba447f>
Wed, 18 Jul 2007 12:29:51 +0000 (12:29 +0000)
git-svn-id: svn+ssh://scm.gforge.inria.fr/svn/simgrid/simgrid/trunk@3859 48e7efb5-ca39-0410-a469-dd3cf9ba447f

src/gras/Msg/gras_msg_exchange.c
src/gras/Msg/gras_msg_listener.c
src/gras/Msg/gras_msg_mod.c
src/gras/Msg/msg_interface.h
src/gras/Virtu/virtu_private.h
src/gras/gras.c
src/simix/smx_process.c
src/xbt/xbt_sg_synchro.c

index 5a7b9cf..b47dae8 100644 (file)
@@ -92,8 +92,11 @@ gras_msg_wait_ext_(double           timeout,
     memset(&msg,sizeof(msg),0);
 
     TRY {
+                       xbt_queue_shift_timed(pd->msg_received,&msg,timeout ? timeout - now + start : 0);
+                       /*
       msg.expe = gras_trp_select(timeout ? timeout - now + start : 0);
       gras_msg_recv(msg.expe, &msg);
+                       */
     } CATCH(e) {
       if (e.category == system_error &&
          !strncmp("Socket closed by remote side",e.msg,
@@ -332,28 +335,14 @@ gras_msg_handle(double timeOut) {
     xbt_dynar_shift(pd->msg_queue,&msg);
   } else {
     TRY {
-      msg.expe = gras_trp_select(timeOut);
+                       xbt_queue_shift_timed(pd->msg_received,&msg,timeOut);
+//      msg.expe = gras_trp_select(timeOut);
     } CATCH(e) {
       if (e.category != timeout_error)
        RETHROW;
       xbt_ex_free(e);
       timeouted = 1;
     }
-
-    if (!timeouted) {
-      TRY {
-       /* FIXME: if not the right kind, queue it and recall ourself or goto >:-) */
-       gras_msg_recv(msg.expe, &msg);
-       DEBUG1("Received a msg from the socket kind:%s",
-              e_gras_msg_kind_names[msg.kind]);
-    
-      } CATCH(e) {
-       RETHROW4("Error while receiving a message on select()ed socket %p to [%s]%s:%d: %s",
-                msg.expe,
-                gras_socket_peer_proc(msg.expe),gras_socket_peer_name(msg.expe),
-                gras_socket_peer_port(msg.expe));
-      }
-    }
   }
 
   if (timeouted) {
index 028dffe..32dee00 100644 (file)
@@ -30,7 +30,7 @@ static void listener_function(void *p) {
   s_gras_msg_t msg;
 
   while (1) {
-    msg.expe = gras_trp_select(-1);
+    msg.expe = gras_trp_select(1000);
     gras_msg_recv(msg.expe, &msg);
     xbt_queue_push(me->incomming_messages, &msg);
   }
@@ -42,7 +42,7 @@ gras_msg_listener_launch(xbt_queue_t msg_exchange){
 
   arg->incomming_messages = msg_exchange;
 
-  arg->listener =  xbt_thread_create(listener_function,&arg);
+  arg->listener =  xbt_thread_create(listener_function,arg);
   return arg;
 }
 
index e1662a3..2ae723f 100644 (file)
@@ -30,6 +30,7 @@ static void *gras_msg_procdata_new() {
    res->timers        = xbt_dynar_new(sizeof(s_gras_timer_t), NULL);
    res->msg_to_receive_queue = xbt_fifo_new();
    res->msg_to_receive_queue_meas = xbt_fifo_new();
+        res->msg_received = xbt_queue_new(0,sizeof(s_gras_msg_t));
    
    return (void*)res;
 }
index 9c53b39..7c726d9 100644 (file)
@@ -41,6 +41,8 @@ typedef struct {
        /* queue storing the msgs that have to received and the process synchronization made (wait the surf action done) */
        xbt_fifo_t msg_to_receive_queue; /* elm type: s_gras_msg_t */
        xbt_fifo_t msg_to_receive_queue_meas; /* elm type: s_gras_msg_t */
+       xbt_queue_t msg_received;
+
 
 } s_gras_msg_procdata_t,*gras_msg_procdata_t;
 
index 394012c..4171d2e 100644 (file)
@@ -16,6 +16,7 @@
 #include "xbt/dynar.h"
 #include "gras/Virtu/virtu_interface.h"
 #include "simix/simix.h"
+#include "gras/Msg/msg_private.h"
 
 /** @brief Data for each process */
 typedef struct {
@@ -36,6 +37,7 @@ typedef struct {
        
   int pid; /* pid of process, only for SG */
   int ppid; /* ppid of process, only for SG */
+       gras_msg_listener_t listener;
 } gras_procdata_t;
 
 gras_procdata_t *gras_procdata_get(void);
index 7ca7b9e..98c538a 100644 (file)
 #include "xbt/virtu.h" /* set the XBT virtualization to use GRAS */
 #include "xbt/module.h" /* xbt_init/exit */
 #include "xbt/xbt_os_time.h" /* xbt_os_time */
+#include "xbt/synchro.h"
 
 #include "Virtu/virtu_interface.h" /* Module mechanism FIXME: deplace&rename */
+#include "Virtu/virtu_private.h"
 #include "gras_modinter.h"   /* module init/exit */
 #include "amok/amok_modinter.h"   /* module init/exit */
 #include "xbt_modinter.h"   /* module init/exit */
 
 #include "gras.h"
 #include "gras/process.h" /* FIXME: killme and put process_init in modinter */
-  
+#include "gras/Msg/msg_private.h"
 #include "portable.h" /* hexa_*(); signalling stuff */
 
 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(gras,XBT_LOG_ROOT_CAT,"All GRAS categories (cf. section \ref GRAS_API)");
@@ -46,6 +48,8 @@ static void gras_sigint_handler(int sig) {
 
 void gras_init(int *argc,char **argv) {
 
+       gras_procdata_t *pd;
+       gras_msg_procdata_t msg_pd;
   VERB0("Initialize GRAS");
 
   xbt_getpid = &gras_os_getpid;
@@ -83,12 +87,18 @@ void gras_init(int *argc,char **argv) {
    
   /* and then init amok */
   amok_init();
+       pd = gras_procdata_get();
+       msg_pd = gras_libdata_by_name("gras_msg");
+       pd->listener = gras_msg_listener_launch(msg_pd->msg_received);
 }
 
 void gras_exit(void) {
+       gras_procdata_t *pd;
   INFO0("Exiting GRAS");
   amok_exit();
   gras_moddata_leave();
+       pd = gras_procdata_get();
+       gras_msg_listener_shutdown(pd->listener);
   gras_process_exit();
   if (--gras_running_process == 0) {
     gras_msg_exit();
index 915bc2b..c41798c 100644 (file)
@@ -209,7 +209,7 @@ void *SIMIX_process_get_data(smx_process_t process)
 void SIMIX_process_set_data(smx_process_t process,void *data)
 {
   xbt_assert0((process != NULL), "Invalid parameters");
-  xbt_assert0((process->data == NULL), "Data already set");
+  //xbt_assert0((process->data == NULL), "Data already set");
   
   process->data = data;
    
index bf28586..45ec409 100644 (file)
@@ -24,10 +24,12 @@ typedef struct s_xbt_thread_ {
    smx_process_t s_process;
    void_f_pvoid_t *code;
    void *userparam;
+        void *father_data;
 } s_xbt_thread_t;
 
 static int xbt_thread_create_wrapper(int argc, char *argv[]) {
    xbt_thread_t t = (xbt_thread_t)SIMIX_process_get_data(SIMIX_process_self());
+        SIMIX_process_set_data(SIMIX_process_self(),t->father_data);
    (*t->code)(t->userparam);
    return 0;
 }
@@ -36,6 +38,7 @@ xbt_thread_t xbt_thread_create(void_f_pvoid_t* code, void* param)  {
    xbt_thread_t res = xbt_new0(s_xbt_thread_t,1);
    res->userparam = param;
    res->code = code;
+        res->father_data = SIMIX_process_get_data(SIMIX_process_self());
    res->s_process = SIMIX_process_create(NULL, 
                                         xbt_thread_create_wrapper, res,
                                         SIMIX_host_get_name(SIMIX_host_self()),