Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Continuation of the internal cleanup; Copy/paste of the SG implementation (not workin...
author mquinson <mquinson@48e7efb5-ca39-0410-a469-dd3cf9ba447f>
Mon, 14 Jun 2004 20:28:06 +0000 (20:28 +0000)
committer mquinson <mquinson@48e7efb5-ca39-0410-a469-dd3cf9ba447f>
Mon, 14 Jun 2004 20:28:06 +0000 (20:28 +0000)
git-svn-id: svn+ssh://scm.gforge.inria.fr/svn/simgrid/simgrid/trunk@106 48e7efb5-ca39-0410-a469-dd3cf9ba447f

src/gras/Transport/transport_plugin_file.c
src/gras/Transport/transport_plugin_sg.c
src/gras/Transport/transport_plugin_tcp.c

index 8bbb942..de90623 100644 (file)
 
 GRAS_LOG_NEW_DEFAULT_SUBCATEGORY(trp_file,transport);
 
-typedef struct {
-  int buffsize;
-} gras_trp_tcp_sock_specific_t;
-
 /***
  *** Prototypes 
  ***/
@@ -35,7 +31,6 @@ gras_error_t gras_trp_file_chunk_recv(gras_socket_t *sd,
                                      char *data,
                                      size_t size);
 
-void         gras_trp_file_free_specific(void *s);
 
 /***
  *** Specific plugin part
@@ -43,47 +38,36 @@ void         gras_trp_file_free_specific(void *s);
 
 typedef struct {
   fd_set incoming_socks;
-} gras_trp_file_specific_t;
+} gras_trp_file_plug_data_t;
 
 /***
  *** Specific socket part
  ***/
 
 
+
 /***
  *** Code
  ***/
 gras_error_t
-gras_trp_file_init(gras_trp_plugin_t **dst) {
+gras_trp_file_setup(gras_trp_plugin_t *plug) {
 
-  gras_trp_plugin_t *res=malloc(sizeof(gras_trp_plugin_t));
-  gras_trp_file_specific_t *specif = malloc(sizeof(gras_trp_file_specific_t));
-  if (!res || !specif)
+  gras_trp_file_plug_data_t *file = malloc(sizeof(gras_trp_file_plug_data_t));
+  if (!file)
     RAISE_MALLOC;
 
-  FD_ZERO(&(specif->incoming_socks));
+  FD_ZERO(&(file->incoming_socks));
 
-  res->name = strdup("file");
-  res->socket_client = NULL;
-  res->socket_server = NULL;
-  res->socket_accept = NULL;
-  res->socket_close  = gras_trp_file_close;
+  plug->socket_close = gras_trp_file_close;
 
-  res->chunk_send    = gras_trp_file_chunk_send;
-  res->chunk_recv    = gras_trp_file_chunk_recv;
+  plug->chunk_send   = gras_trp_file_chunk_send;
+  plug->chunk_recv   = gras_trp_file_chunk_recv;
 
-  res->specific      = (void*)specif;
-  res->free_specific = gras_trp_file_free_specific;
+  plug->data         = (void*)file;
 
-  *dst = res;
   return no_error;
 }
 
-void gras_trp_file_free_specific(void *s) {
-  gras_trp_file_specific_t *specific = s;
-  free(specific);
-}
-
 /**
  * gras_socket_client_from_file:
  *
@@ -186,10 +170,10 @@ gras_socket_server_from_file(const char*path,
 }
 
 void gras_trp_file_close(gras_socket_t *sock){
-  gras_trp_file_specific_t *specific;
+  gras_trp_file_plug_data_t *data;
   
   if (!sock) return; /* close only once */
-  specific=sock->plugin->specific;
+  data=sock->plugin->data;
 
   if (sock->sd == 0) {
     DEBUG0("Do not close stdin");
@@ -199,7 +183,7 @@ void gras_trp_file_close(gras_socket_t *sock){
     DEBUG1("close file connection %d", sock->sd);
 
     /* forget about the socket */
-    FD_CLR(sock->sd, &(specific->incoming_socks));
+    FD_CLR(sock->sd, &(data->incoming_socks));
 
     /* close the socket */
     if(close(sock->sd) < 0) {
@@ -219,6 +203,7 @@ gras_trp_file_chunk_send(gras_socket_t *sock,
                         char *data,
                         size_t size) {
   
+  gras_assert0(sock->outgoing, "Cannot write on client file socket");
   gras_assert0(size >= 0, "Cannot send a negative amount of data");
 
   while (size) {
@@ -253,8 +238,8 @@ gras_trp_file_chunk_recv(gras_socket_t *sock,
                        char *data,
                        size_t size) {
 
-  /* TCP sockets are in duplex mode, don't check direction */
   gras_assert0(sock, "Cannot recv on an NULL socket");
+  gras_assert0(sock->incoming, "Cannot recv on client file socket");
   gras_assert0(size >= 0, "Cannot receive a negative amount of data");
   
   while (size) {
index 822812e..9b86f53 100644 (file)
 /* This program is free software; you can redistribute it and/or modify it
    under the terms of the license (GNU LGPL) which comes with this package. */
 
+#include <msg.h>
+
 #include "gras_private.h"
 #include "transport_private.h"
 
 GRAS_LOG_EXTERNAL_CATEGORY(transport);
 GRAS_LOG_NEW_DEFAULT_SUBCATEGORY(trp_sg,transport);
 
-
 /***
  *** Prototypes 
  ***/
-void         gras_trp_sg_exit(gras_trp_plugin_t *plugin);
-gras_error_t gras_trp_sg_socket_client(const char *host,
+gras_error_t gras_trp_sg_socket_client(gras_trp_plugin_t *self,
+                                      const char *host,
+                                      unsigned short port,
+                                      /* OUT */ gras_socket_t *sock);
+gras_error_t gras_trp_sg_socket_server(gras_trp_plugin_t *self,
                                       unsigned short port,
-                                      int raw, 
-                                      /* OUT */ gras_socket_t **dst);
-gras_error_t gras_trp_sg_socket_server(unsigned short port,
-                                      int raw, 
-                                      /* OUT */ gras_socket_t **dst);
-void         gras_trp_sg_socket_close(gras_socket_t **sd);
-gras_error_t gras_trp_sg_select(double timeOut,
-                               gras_socket_t **sd);
-
-gras_error_t gras_trp_sg_bloc_send(gras_socket_t *sd,
-                                  void *data,
-                                  size_t size,
-                                  double timeOut);
-
-gras_error_t gras_trp_sg_bloc_recv(gras_socket_t *sd,
-                                  void *data,
-                                  size_t size,
-                                  double timeOut);
-gras_error_t gras_trp_sg_flush(gras_socket_t *sd);
+                                      /* OUT */ gras_socket_t *sock);
+void         gras_trp_sg_socket_close(gras_socket_t *sd);
+
+gras_error_t gras_trp_sg_chunk_send(gras_socket_t *sd,
+                                   char *data,
+                                   size_t size);
+
+gras_error_t gras_trp_sg_chunk_recv(gras_socket_t *sd,
+                                   char *data,
+                                   size_t size);
+
+/* FIXME
+  gras_error_t gras_trp_sg_flush(gras_socket_t *sd);
+*/
 
 /***
  *** Specific plugin part
  ***/
 typedef struct {
-  int dummy;
-} gras_trp_sg_specific_t;
+  int placeholder; /* nothing plugin specific so far */
+} gras_trp_sg_plug_specific_t;
 
 /***
  *** Specific socket part
  ***/
+typedef struct {
+  int from_PID;    /* process which sent this message */
+  int to_PID;      /* process to which this message is destinated */
+
+  m_host_t to_host;   /* Who's on other side */
+  m_channel_t to_chan;/* Channel on which the other side is earing */
+} gras_trp_sg_sock_specific_t;
+
 
 /***
  *** Code
  ***/
 
 gras_error_t
-gras_trp_sg_init(gras_trp_plugin_t **dst) {
+gras_trp_sg_setup(gras_trp_plugin_t *plug) {
 
-  gras_trp_sg_specific_t *specific = malloc(sizeof(gras_trp_sg_specific_t));
-  if (!specific)
+  gras_trp_sg_plug_specific_t *sg=malloc(sizeof(gras_trp_sg_plug_specific_t));
+  if (!sg)
     RAISE_MALLOC;
 
+  plug->socket_client = gras_trp_sg_socket_client;
+  plug->socket_server = gras_trp_sg_socket_server;
+  plug->socket_close  = gras_trp_sg_socket_close;
+
+  plug->chunk_send    = gras_trp_sg_chunk_send;
+  plug->chunk_recv    = gras_trp_sg_chunk_recv;
+
+  plug->data      = sg; 
+
   return no_error;
 }
 
-void
-gras_trp_sg_exit(gras_trp_plugin_t *plugin) {
-  gras_trp_sg_specific_t *specific = (gras_trp_sg_specific_t*)plugin->specific;
-  free(specific);
+gras_error_t gras_trp_sg_socket_client(gras_trp_plugin_t *self,
+                                      const char *host,
+                                      unsigned short port,
+                                      /* OUT */ gras_socket_t *dst){
+
+  m_host_t peer;
+  gras_hostdata_t *hd;
+  int i;
+
+  /* make sure this socket will reach someone */
+  if (!(peer=MSG_get_host_by_name(host))) {
+      fprintf(stderr,"GRAS: can't connect to %s: no such host.\n",host);
+      return mismatch_error;
+  }
+  if (!(hd=(gras_hostdata_t *)MSG_host_get_data(peer))) {
+      fprintf(stderr,"GRAS: can't connect to %s: no process on this host.\n",host);
+      return mismatch_error;
+  }
+  for (i=0; i<hd->portLen && port != hd->port[i]; i++);
+  if (i == hd->portLen) {
+    fprintf(stderr,"GRAS: can't connect to %s:%d, no process listen on this port.\n",host,port);
+    return mismatch_error;
+  } 
+
+  if (hd->raw[i] && !raw) {
+    fprintf(stderr,"GRAS: can't connect to %s:%d in regular mode, the process listen in raw mode on this port.\n",host,port);
+    return mismatch_error;
+  }
+  if (!hd->raw[i] && raw) {
+    fprintf(stderr,"GRAS: can't connect to %s:%d in raw mode, the process listen in regular mode on this port.\n",host,port);
+    return mismatch_error;
+  }
+    
+
+  /* Create the socket */
+  if (!(*sock=(gras_sock_t*)malloc(sizeof(gras_sock_t)))) {
+      fprintf(stderr,"GRAS: openClientSocket: out of memory\n");
+      return malloc_error;
+  }    
+
+  (*sock)->server_sock  = 0;
+  (*sock)->raw_sock     = raw;
+  (*sock)->from_PID     = MSG_process_self_PID();
+  (*sock)->to_PID       = hd->proc[ hd->port2chan[i] ];
+  (*sock)->to_host      = peer;
+  (*sock)->to_port      = port;  
+  (*sock)->to_chan      = hd->port2chan[i];
+
+  /*
+  fprintf(stderr,"GRAS: %s (PID %d) connects in %s mode to %s:%d (to_PID=%d).\n",
+         MSG_process_get_name(MSG_process_self()), MSG_process_self_PID(),
+         raw?"RAW":"regular",host,port,(*sock)->to_PID);
+  */
 }
 
-gras_error_t gras_trp_sg_socket_client(const char *host,
+gras_error_t gras_trp_sg_socket_server(gras_trp_plugin_t *self,
                                       unsigned short port,
-                                      int raw, 
-                                      /* OUT */ gras_socket_t **dst){
-  RAISE_UNIMPLEMENTED;
-}
+                                      /* OUT */ gras_socket_t *dst){
 
-gras_error_t gras_trp_sg_socket_server(unsigned short port,
-                                      int raw, 
-                                      /* OUT */ gras_socket_t **dst){
-  RAISE_UNIMPLEMENTED;
-}
+  gras_hostdata_t *hd=(gras_hostdata_t *)MSG_host_get_data(MSG_host_self());
+  gras_procdata_t *pd=(gras_procdata_t *)MSG_process_get_data(MSG_process_self());
+  int port,i;
+  const char *host=MSG_host_get_name(MSG_host_self());
+
+  gras_assert0(hd,"Please run gras_process_init on each process");
+  gras_assert0(pd,"Please run gras_process_init on each process");
+
+  for (port=startingPort ; port <= endingPort ; port++) {
+    for (i=0; i<hd->portLen && hd->port[i] != port; i++);
+    if (i<hd->portLen && hd->port[i] == port)
+      continue;
+
+    /* port not used so far. Do it */
+    if (i == hd->portLen) {
+      /* need to enlarge the tables */
+      if (hd->portLen++) {
+       hd->port2chan=(int*)realloc(hd->port2chan,hd->portLen*sizeof(int));
+       hd->port     =(int*)realloc(hd->port     ,hd->portLen*sizeof(int));
+       hd->raw      =(int*)realloc(hd->raw      ,hd->portLen*sizeof(int));
+      } else {
+       hd->port2chan=(int*)malloc(hd->portLen*sizeof(int));
+       hd->port     =(int*)malloc(hd->portLen*sizeof(int));
+       hd->raw      =(int*)malloc(hd->portLen*sizeof(int));
+      }
+      if (!hd->port2chan || !hd->port || !hd->raw) {
+       fprintf(stderr,"GRAS: PANIC: A malloc error did lose all ports attribution on this host\n");
+       hd->portLen = 0;
+       return malloc_error;
+      }
+    }
+    hd->port2chan[ i ]=raw ? pd->rawChan : pd->chan;
+    hd->port[ i ]=port;
+    hd->raw[ i ]=raw;
+
+    /* Create the socket */
+    if (!(*sock=(gras_sock_t*)malloc(sizeof(gras_sock_t)))) {
+      fprintf(stderr,"GRAS: openServerSocket: out of memory\n");
+      return malloc_error;
+    }    
+    
+    (*sock)->server_sock  = 1;
+    (*sock)->raw_sock     = raw;
+    (*sock)->from_PID     = -1;
+    (*sock)->to_PID       = MSG_process_self_PID();
+    (*sock)->to_host      = MSG_host_self();
+    (*sock)->to_port      = port;  
+    (*sock)->to_chan      = pd->chan;
+
+    /*
+    fprintf(stderr,"GRAS: '%s' (%d) ears on %s:%d%s (%p).\n",
+           MSG_process_get_name(MSG_process_self()), MSG_process_self_PID(),
+           host,port,raw? " (mode RAW)":"",*sock);
+    */
+    return no_error;
+  }
+  /* if we go out of the previous for loop, that's that we didn't find any
+     suited port number */
 
-void gras_trp_sg_socket_close(gras_socket_t **sd){
-  ERROR1("%s not implemented",__FUNCTION__);
-  abort();
+  fprintf(stderr,
+         "GRAS: can't find an empty port between %d and %d to open a socket on host %s\n.",
+         startingPort,endingPort,host);
+  return mismatch_error;
 }
 
-gras_error_t gras_trp_sg_select(double timeOut,
-                               gras_socket_t **sd){
-  RAISE_UNIMPLEMENTED;
+void gras_trp_sg_socket_close(gras_socket_t *sd){
+  gras_hostdata_t *hd=(gras_hostdata_t *)MSG_host_get_data(MSG_host_self());
+  int i;
+
+  if (!sd) return;
+  gras_assert0(hd,"Please run gras_process_init on each process");
+
+  if (raw && !sd->raw_sock) {
+      fprintf(stderr,"GRAS: gras_rawsock_close: Was passed a regular socket. Please use gras_sock_close()\n");
+  }
+  if (!raw && sd->raw_sock) {
+      fprintf(stderr,"GRAS: grasSockClose: Was passed a raw socket. Please use gras_rawsock_close()\n");
+  }
+  if (sd->server_sock) {
+    /* server mode socket. Un register it from 'OS' tables */
+    for (i=0; 
+        i<hd->portLen && sd->to_port != hd->port[i]; 
+        i++);
+
+    if (i==hd->portLen) {
+      fprintf(stderr,"GRAS: closeSocket: The host does not know this server socket.\n");
+    } else {
+      memmove(&(hd->port[i]),      &(hd->port[i+1]),      (hd->portLen -i -1) * sizeof(int));
+      memmove(&(hd->raw[i]),       &(hd->raw[i+1]),       (hd->portLen -i -1) * sizeof(int));
+      memmove(&(hd->port2chan[i]), &(hd->port2chan[i+1]), (hd->portLen -i -1) * sizeof(int));
+      hd->portLen--;
+    }
+  } 
+  free(sd);
 }
+
+gras_error_t gras_trp_sg_chunk_send(gras_socket_t *sd,
+                                   char *data,
+                                   size_t size) {
+  m_task_t task=NULL;
+  static unsigned int count=0;
+  char name[256];
   
-gras_error_t gras_trp_sg_bloc_send(gras_socket_t *sd,
-                                  void *data,
-                                  size_t size,
-                                  double timeOut){
-  RAISE_UNIMPLEMENTED;
+  sprintf(name,"Chunk[%d]",count++);
+  task=MSG_task_create(name,0,((double)size)/(1024.0*1024.0),NULL);
+
+  if (MSG_task_put(task, sock->to_host,sock->to_chan) != MSG_OK) {
+    RAISE(system_error,"Problem during the MSG_task_put");
+  }
+
+  return no_error;
 }
 
-gras_error_t gras_trp_sg_bloc_recv(gras_socket_t *sd,
-                                  void *data,
-                                  size_t size,
-                                  double timeOut){
-  RAISE_UNIMPLEMENTED;
+gras_error_t gras_trp_sg_chunk_recv(gras_socket_t *sd,
+                                   char *data,
+                                   size_t size){
+  gras_procdata_t *pd=
+    (gras_procdata_t*)MSG_process_get_data(MSG_process_self());
+
+  unsigned int bytesTotal=0;
+  m_task_t task=NULL;
+
+  if (MSG_task_get(&task, (m_channel_t) pd->rawChan) != MSG_OK) {
+    fprintf(stderr,"GRAS: Error in MSG_task_get()\n");
+    return unknown_error;
+  }
+  if (MSG_task_destroy(task) != MSG_OK) {
+    fprintf(stderr,"GRAS: Error in MSG_task_destroy()\n");
+    return unknown_error;
+  }
+
+  return no_error;
 }
 
+/*FIXME
+
 gras_error_t gras_trp_sg_flush(gras_socket_t *sd){
   RAISE_UNIMPLEMENTED;
 }
-
+*/
index 4d29dbf..2f6f368 100644 (file)
 
 GRAS_LOG_NEW_DEFAULT_SUBCATEGORY(trp_tcp,transport);
 
-typedef struct {
-  int buffsize;
-} gras_trp_tcp_sock_specific_t;
-
 /***
  *** Prototypes 
  ***/
 gras_error_t gras_trp_tcp_socket_client(gras_trp_plugin_t *self,
                                        const char *host,
                                        unsigned short port,
-                                       int raw,
                                        /* OUT */ gras_socket_t *sock);
 gras_error_t gras_trp_tcp_socket_server(gras_trp_plugin_t *self,
                                        unsigned short port,
-                                       int raw,
                                        /* OUT */ gras_socket_t *sock);
 gras_error_t gras_trp_tcp_socket_accept(gras_socket_t  *sock,
                                        gras_socket_t **dst);
@@ -64,60 +58,54 @@ static int TcpProtoNumber(void);
  ***/
 
 typedef struct {
-  fd_set incoming_socks;
-} gras_trp_tcp_specific_t;
+  fd_set msg_socks;
+  fd_set raw_socks;
+} gras_trp_tcp_plug_data_t;
 
 /***
  *** Specific socket part
  ***/
 
+typedef struct {
+  int buffsize;
+} gras_trp_tcp_sock_data_t;
+
 
 /***
  *** Code
  ***/
 gras_error_t
-gras_trp_tcp_init(gras_trp_plugin_t **dst) {
+gras_trp_tcp_setup(gras_trp_plugin_t *plug) {
 
-  gras_trp_plugin_t *res=malloc(sizeof(gras_trp_plugin_t));
-  gras_trp_tcp_specific_t *tcp = malloc(sizeof(gras_trp_tcp_specific_t));
-  if (!res || !tcp)
+  gras_trp_tcp_plug_data_t *tcp = malloc(sizeof(gras_trp_tcp_plug_data_t));
+  if (!tcp)
     RAISE_MALLOC;
 
-  FD_ZERO(&(tcp->incoming_socks));
+  FD_ZERO(&(tcp->msg_socks));
+  FD_ZERO(&(tcp->raw_socks));
 
-  res->name = strdup("TCP");
-  res->socket_client = gras_trp_tcp_socket_client;
-  res->socket_server = gras_trp_tcp_socket_server;
-  res->socket_accept = gras_trp_tcp_socket_accept;
-  res->socket_close  = gras_trp_tcp_socket_close;
+  plug->socket_client = gras_trp_tcp_socket_client;
+  plug->socket_server = gras_trp_tcp_socket_server;
+  plug->socket_accept = gras_trp_tcp_socket_accept;
+  plug->socket_close  = gras_trp_tcp_socket_close;
 
-  res->chunk_send    = gras_trp_tcp_chunk_send;
-  res->chunk_recv    = gras_trp_tcp_chunk_recv;
+  plug->chunk_send    = gras_trp_tcp_chunk_send;
+  plug->chunk_recv    = gras_trp_tcp_chunk_recv;
 
-  res->specific      = (void*)tcp;
-  res->free_specific = gras_trp_tcp_free_specific;
+  plug->data      = (void*)tcp;
 
-  *dst = res;
   return no_error;
 }
 
-void gras_trp_tcp_free_specific(void *s) {
-  gras_trp_tcp_specific_t *specific = s;
-  free(specific);
-}
-
 gras_error_t gras_trp_tcp_socket_client(gras_trp_plugin_t *self,
                                        const char *host,
                                        unsigned short port,
-                                       int raw,
                                        /* OUT */ gras_socket_t *sock){
   
   struct sockaddr_in addr;
   struct hostent *he;
   struct in_addr *haddr;
 
-  gras_assert0(!raw,"Raw TCP sockets not implemented yet");
-   
   sock->incoming = 1; /* TCP sockets are duplex'ed */
 
   sock->sd = socket (AF_INET, SOCK_STREAM, 0);
@@ -159,15 +147,12 @@ gras_error_t gras_trp_tcp_socket_client(gras_trp_plugin_t *self,
  */
 gras_error_t gras_trp_tcp_socket_server(gras_trp_plugin_t *self,
                                        unsigned short port,
-                                       int raw,
                                        /* OUT */ gras_socket_t *sock){
 //  int size = bufSize * 1024;
   int on = 1;
   struct sockaddr_in server;
 
-  gras_assert0(!raw,"Raw TCP sockets not implemented yet");
-
-   gras_trp_tcp_specific_t *data=(gras_trp_tcp_specific_t*)self -> specific;
+  gras_trp_tcp_plug_data_t *tcp=(gras_trp_tcp_plug_data_t*)self->data;
  
   sock->outgoing  = 1; /* TCP => duplex mode */
 
@@ -194,7 +179,10 @@ gras_error_t gras_trp_tcp_socket_server(gras_trp_plugin_t *self,
     RAISE2(system_error,"Cannot listen to port %d: %s",port,strerror(errno));
   }
 
-  FD_SET(sock->sd, &(data->incoming_socks));
+  if (sock->raw)
+    FD_SET(sock->sd, &(tcp->raw_socks));
+  else
+    FD_SET(sock->sd, &(tcp->msg_socks));
 
   DEBUG2("Openned a server socket on port %d (sock %d)",port,sock->sd);
   
@@ -274,10 +262,10 @@ gras_trp_tcp_socket_accept(gras_socket_t  *sock,
 }
 
 void gras_trp_tcp_socket_close(gras_socket_t *sock){
-  gras_trp_tcp_specific_t *tcp;
+  gras_trp_tcp_plug_data_t *tcp;
   
   if (!sock) return; /* close only once */
-  tcp=sock->plugin->specific;
+  tcp=sock->plugin->data;
 
   DEBUG1("close tcp connection %d\n", sock->sd);
 
@@ -296,7 +284,10 @@ void gras_trp_tcp_socket_close(gras_socket_t *sock){
   } */
 
   /* forget about the socket */
-  FD_CLR(sock->sd, &(tcp->incoming_socks));
+  if (sock->raw)
+    FD_CLR(sock->sd, &(tcp->raw_socks));
+  else
+    FD_CLR(sock->sd, &(tcp->msg_socks));
 
   /* close the socket */
   if(close(sock->sd) < 0) {