Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
a bit more debugs
[simgrid.git] / src / gras / Msg / gras_msg_exchange.c
index 65b30f9..89b7cbf 100644 (file)
@@ -12,7 +12,6 @@
 #include "xbt/ex_interface.h"
 #include "gras/Msg/msg_private.h"
 #include "gras/Virtu/virtu_interface.h"
-#include "gras/Transport/transport_interface.h" /* gras_select */
 
 XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(gras_msg);
 
@@ -92,8 +91,11 @@ gras_msg_wait_ext_(double           timeout,
     memset(&msg,sizeof(msg),0);
 
     TRY {
+                       xbt_queue_shift_timed(pd->msg_received,&msg,timeout ? timeout - now + start : 0);
+                       /*
       msg.expe = gras_trp_select(timeout ? timeout - now + start : 0);
       gras_msg_recv(msg.expe, &msg);
+                       */
     } CATCH(e) {
       if (e.category == system_error &&
          !strncmp("Socket closed by remote side",e.msg,
@@ -288,7 +290,9 @@ gras_msg_handleall(double period) {
  * @param timeOut: How long to wait for incoming messages (in seconds)
  * @return the error code (or no_error).
  *
- * Messages are passed to the callbacks. See also gras_msg_handleall().
+ * Any message arriving in the given interval is passed to the callbacks.
+ * 
+ * @sa gras_msg_handleall().
  */
 void
 gras_msg_handle(double timeOut) {
@@ -330,28 +334,15 @@ gras_msg_handle(double timeOut) {
     xbt_dynar_shift(pd->msg_queue,&msg);
   } else {
     TRY {
-      msg.expe = gras_trp_select(timeOut);
+       xbt_queue_shift_timed(pd->msg_received,&msg,timeOut);
+//      msg.expe = gras_trp_select(timeOut);
     } CATCH(e) {
       if (e.category != timeout_error)
        RETHROW;
+      DEBUG0("Damn. Timeout while getting a message from the queue");
       xbt_ex_free(e);
       timeouted = 1;
     }
-
-    if (!timeouted) {
-      TRY {
-       /* FIXME: if not the right kind, queue it and recall ourself or goto >:-) */
-       gras_msg_recv(msg.expe, &msg);
-       DEBUG1("Received a msg from the socket kind:%s",
-              e_gras_msg_kind_names[msg.kind]);
-    
-      } CATCH(e) {
-       RETHROW4("Error while receiving a message on select()ed socket %p to [%s]%s:%d: %s",
-                msg.expe,
-                gras_socket_peer_proc(msg.expe),gras_socket_peer_name(msg.expe),
-                gras_socket_peer_port(msg.expe));
-      }
-    }
   }
 
   if (timeouted) {
@@ -446,9 +437,12 @@ gras_msg_handle(double timeOut) {
        RETHROW0("Callback raised an exception: %s");
       }
     }
-    xbt_assert0(!(ctx.answer_due),
-               "RPC callback didn't call gras_msg_rpcreturn");
 
+    xbt_assert1(! ctx.answer_due,
+               "Bug in user code: RPC callback to message '%s' didn't call gras_msg_rpcreturn",msg.type->name);
+    if (ctx.answer_due)
+       CRITICAL1("BUGS BOTH IN USER CODE (RPC callback to message '%s' didn't call gras_msg_rpcreturn) "
+                "AND IN SIMGRID (process wasn't killed by an assert)",msg.type->name);
     if (!ran_ok)
       THROW1(mismatch_error,0,
             "Message '%s' refused by all registered callbacks", msg.type->name);