Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
More work on SG, almost done
authormquinson <mquinson@48e7efb5-ca39-0410-a469-dd3cf9ba447f>
Mon, 21 Jun 2004 17:52:38 +0000 (17:52 +0000)
committermquinson <mquinson@48e7efb5-ca39-0410-a469-dd3cf9ba447f>
Mon, 21 Jun 2004 17:52:38 +0000 (17:52 +0000)
git-svn-id: svn+ssh://scm.gforge.inria.fr/svn/simgrid/simgrid/trunk@138 48e7efb5-ca39-0410-a469-dd3cf9ba447f

src/gras/Transport/sg_transport.c
src/gras/Transport/transport_plugin_sg.c
src/gras/Virtu/sg_process.c
src/gras/Virtu/virtu_sg.h

index f6e3f05..bf56bc5 100644 (file)
 
 #include "Transport/transport_private.h"
 #include <msg.h>
 
 #include "Transport/transport_private.h"
 #include <msg.h>
+#include "Virtu/virtu_sg.h"
 
 
-//GRAS_LOG_EXTERNAL_CATEGORY(transport);
-//GRAS_LOG_DEFAULT_CATEGORY(transport);
+GRAS_LOG_EXTERNAL_CATEGORY(transport);
+GRAS_LOG_DEFAULT_CATEGORY(transport);
 
 /**
  * gras_trp_select:
 
 /**
  * gras_trp_select:
  * if timeout>0 and no message there, wait at most that amount of time before giving up.
  */
 gras_error_t 
  * if timeout>0 and no message there, wait at most that amount of time before giving up.
  */
 gras_error_t 
-gras_trp_select(double timeout,
+gras_trp_select(double timeout, 
                gras_socket_t **dst) {
 
   double startTime=gras_time();
   gras_procdata_t *pd=gras_procdata_get();
                gras_socket_t **dst) {
 
   double startTime=gras_time();
   gras_procdata_t *pd=gras_procdata_get();
+  gras_trp_sg_sock_data_t *sockdata;
+
+  int r_pid, cpt;
+  m_process_t remote;
+  gras_hostdata_t *remote_hd;
  
   do {
  
   do {
-    if (MSG_task_Iprobe((m_channel_t) pd->chan)) {
+    r_pid = MSG_task_probe_from((m_channel_t) pd->chan);
+    if (r_pid >= 0) {
       *dst = pd->sock;
       *dst = pd->sock;
+      sockdata = (*dst)->data;
+
+      remote = MSG_process_from_PID(r_pid);
+      sockdata->from_PID = r_pid;
+      sockdata->to_PID = MSG_process_self_PID();
+      sockdata->to_host = MSG_process_get_host(remote);
+
+      remote_hd=(gras_hostdata_t *)MSG_host_get_data(sockdata->to_host);
+      gras_assert0(remote_hd,"Run gras_process_init!!");
+
+      sockdata->to_chan = -1;
+      for (cpt=0; cpt< GRAS_MAX_CHANNEL; cpt++)
+       if (r_pid == remote_hd->proc[cpt])
+         sockdata->to_chan = cpt;
+
+      gras_assert0(sockdata->to_chan>0,
+                  "Got a message from a process without channel");
 
       return no_error;
     } else {
 
       return no_error;
     } else {
-      MSG_process_sleep(0.001);
+      MSG_process_sleep(0.01);
     }
   } while (gras_time()-startTime < timeout
           || MSG_task_Iprobe((m_channel_t) pd->chan));
     }
   } while (gras_time()-startTime < timeout
           || MSG_task_Iprobe((m_channel_t) pd->chan));
index 971870f..0e4a950 100644 (file)
@@ -57,18 +57,6 @@ typedef struct {
   int placeholder; /* nothing plugin specific so far */
 } gras_trp_sg_plug_data_t;
 
   int placeholder; /* nothing plugin specific so far */
 } gras_trp_sg_plug_data_t;
 
-/***
- *** Specific socket part
- ***/
-typedef struct {
-  int from_PID;    /* process which sent this message */
-  int to_PID;      /* process to which this message is destinated */
-
-  m_host_t to_host;   /* Who's on other side */
-  m_channel_t to_chan;/* Channel on which the other side is earing */
-} gras_trp_sg_sock_data_t;
-
-
 
 /***
  *** Code
 
 /***
  *** Code
@@ -202,18 +190,6 @@ gras_error_t gras_trp_sg_socket_server(gras_trp_plugin_t *self,
     pr.raw    = sock->raw;
     TRY(gras_dynar_push(hd->ports,&pr));
     
     pr.raw    = sock->raw;
     TRY(gras_dynar_push(hd->ports,&pr));
     
-    if (sock->raw) {
-      if (pd->rawSock) 
-       WARN1("asked to open two raw server sockets on %s, first one lost",
-             MSG_host_get_name(MSG_host_self()));
-      pd->rawSock = sock;
-    } else {
-      if (pd->sock) 
-       WARN1("asked to open two server sockets on %s, first one lost",
-             MSG_host_get_name(MSG_host_self()));
-      pd->sock = sock;
-    }
-
   default:
     return errcode;
   }
   default:
     return errcode;
   }
@@ -238,7 +214,6 @@ gras_error_t gras_trp_sg_socket_server(gras_trp_plugin_t *self,
 
 void gras_trp_sg_socket_close(gras_socket_t *sock){
   gras_hostdata_t *hd=(gras_hostdata_t *)MSG_host_get_data(MSG_host_self());
 
 void gras_trp_sg_socket_close(gras_socket_t *sock){
   gras_hostdata_t *hd=(gras_hostdata_t *)MSG_host_get_data(MSG_host_self());
-  gras_procdata_t *pd=gras_procdata_get();
   int cpt;
   gras_sg_portrec_t *pr;
 
   int cpt;
   gras_sg_portrec_t *pr;
 
@@ -253,11 +228,6 @@ void gras_trp_sg_socket_close(gras_socket_t *sock){
       if (pr->port == sock->port) {
        gras_dynar_cursor_rm(hd->ports, &cpt);
 
       if (pr->port == sock->port) {
        gras_dynar_cursor_rm(hd->ports, &cpt);
 
-       if (sock->raw) {
-         pd->rawSock = NULL;
-       } else {
-         pd->sock = NULL;
-       }
        return;
       }
     }
        return;
       }
     }
index fad11a9..a69254a 100644 (file)
@@ -48,7 +48,6 @@ gras_process_init() {
            MSG_host_get_name(MSG_host_self()),GRAS_MAX_CHANNEL);
 
   pd->chan = i;
            MSG_host_get_name(MSG_host_self()),GRAS_MAX_CHANNEL);
 
   pd->chan = i;
-  pd->sock = NULL;
   hd->proc[ i ] = MSG_process_self_PID();
 
   /* take a free RAW channel for this process */
   hd->proc[ i ] = MSG_process_self_PID();
 
   /* take a free RAW channel for this process */
@@ -59,9 +58,13 @@ gras_process_init() {
            MSG_host_get_name(MSG_host_self()),GRAS_MAX_CHANNEL);
   }
   pd->rawChan = i;
            MSG_host_get_name(MSG_host_self()),GRAS_MAX_CHANNEL);
   }
   pd->rawChan = i;
-  pd->rawSock = NULL;
+
   hd->proc[ i ] = MSG_process_self_PID();
 
   hd->proc[ i ] = MSG_process_self_PID();
 
+  /* Connect a dummy socket to ourselves. It's returned by select() */
+  TRY(gras_socket_client(MSG_host_get_name(MSG_host_self()),
+                        pd->chan, &(pd->sock)));
+
   VERB2("Creating process '%s' (%d)",
           MSG_process_get_name(MSG_process_self()),
           MSG_process_self_PID());
   VERB2("Creating process '%s' (%d)",
           MSG_process_get_name(MSG_process_self()),
           MSG_process_self_PID());
@@ -76,7 +79,7 @@ gras_process_exit() {
   int cpt;
   gras_sg_portrec_t pr;
 
   int cpt;
   gras_sg_portrec_t pr;
 
-  gras_assert0(hd && pd,"Run gras_process_init!!\n");
+  gras_assert0(hd && pd,"Run gras_process_init!!");
 
   INFO2("GRAS: Finalizing process '%s' (%d)",
        MSG_process_get_name(MSG_process_self()),MSG_process_self_PID());
 
   INFO2("GRAS: Finalizing process '%s' (%d)",
        MSG_process_get_name(MSG_process_self()),MSG_process_self_PID());
index 53ad96a..7a56b86 100644 (file)
@@ -31,5 +31,14 @@ typedef struct {
 
 } gras_hostdata_t;
 
 
 } gras_hostdata_t;
 
+/* data for each socket (FIXME: find a better location for that)*/
+typedef struct {
+  int from_PID;    /* process which sent this message */
+  int to_PID;      /* process to which this message is destinated */
+
+  m_host_t to_host;   /* Who's on other side */
+  m_channel_t to_chan;/* Channel on which the other side is earing */
+} gras_trp_sg_sock_data_t;
+
 
 #endif /* VIRTU_SG_H */
 
 #endif /* VIRTU_SG_H */