+ DEBUG1("gras_trp value %d",_gras_trp_started);
+ if (_gras_trp_started == 0) {
+ return;
+ }
+
+ if ( --_gras_trp_started == 0 ) {
+#ifdef HAVE_WINSOCK_H
+ if ( WSACleanup() == SOCKET_ERROR ) {
+ if ( WSAGetLastError() == WSAEINPROGRESS ) {
+ WSACancelBlockingCall();
+ WSACleanup();
+ }
+ }
+#endif
+
+ /* Delete the plugins */
+ xbt_dict_free(&_gras_trp_plugins);
+ }
+}
+
+
+void gras_trp_plugin_free(void *p) {
+ gras_trp_plugin_t plug = p;
+
+ if (plug) {
+ if (plug->exit) {
+ plug->exit(plug);
+ } else if (plug->data) {
+ DEBUG1("Plugin %s lacks exit(). Free data anyway.",plug->name);
+ free(plug->data);
+ }
+
+ free(plug->name);
+ free(plug);
+ }
+}
+
+
+/**
+ * gras_trp_socket_new:
+ *
+ * Malloc a new socket, and initialize it with defaults
+ */
+void gras_trp_socket_new(int incoming,
+ gras_socket_t *dst) {
+
+ gras_socket_t sock=xbt_new0(s_gras_socket_t,1);
+
+ VERB1("Create a new socket (%p)", (void*)sock);
+
+ sock->plugin = NULL;
+
+ sock->incoming = incoming ? 1:0;
+ sock->outgoing = incoming ? 0:1;
+ sock->accepting = incoming ? 1:0;
+ sock->meas = 0;
+ sock->recv_ok = 1;
+ sock->valid = 1;
+ sock->moredata = 0;
+
+ sock->sd = -1;
+ sock->port = -1;
+ sock->peer_port = -1;
+ sock->peer_name = NULL;
+ sock->peer_proc = NULL;
+
+ sock->data = NULL;
+ sock->bufdata = NULL;
+
+ *dst = sock;
+
+ xbt_dynar_push(((gras_trp_procdata_t)
+ gras_libdata_by_id(gras_trp_libdata_id))->sockets,dst);
+ XBT_OUT;
+}
+
+/**
+ * @brief Opens a server socket and makes it ready to be listened to.
+ * @param port: port on which you want to listen
+ * @param buf_size: size of the buffer (in byte) on the socket (for TCP sockets only). If 0, a sain default is used (32k, but may change)
+ * @param measurement: whether this socket is meant to convey measurement (if you don't know, use 0 to exchange regular messages)
+ *
+ * In real life, you'll get a TCP socket.
+ */
+gras_socket_t
+gras_socket_server_ext(unsigned short port,
+
+ unsigned long int buf_size,
+ int measurement) {
+
+ xbt_ex_t e;
+ gras_trp_plugin_t trp;
+ gras_socket_t sock;
+
+ DEBUG2("Create a server socket from plugin %s on port %d",
+ gras_if_RL() ? "tcp" : "sg",
+ port);
+ trp = gras_trp_plugin_get_by_name(gras_if_SG() ? "sg":"tcp");
+
+ /* defaults settings */
+ gras_trp_socket_new(1,&sock);
+ sock->plugin= trp;
+ sock->port=port;
+ sock->buf_size = buf_size>0 ? buf_size : 32*1024;
+ sock->meas = measurement;
+
+ /* Call plugin socket creation function */
+ DEBUG1("Prepare socket with plugin (fct=%p)",trp->socket_server);
+ TRY {
+ trp->socket_server(trp, sock);
+ DEBUG3("in=%c out=%c accept=%c",
+ sock->incoming?'y':'n',
+ sock->outgoing?'y':'n',
+ sock->accepting?'y':'n');
+ } CATCH(e) {
+ int cursor;
+ gras_socket_t sock_iter;
+ xbt_dynar_t socks = ((gras_trp_procdata_t) gras_libdata_by_id(gras_trp_libdata_id))->sockets;
+ xbt_dynar_foreach(socks, cursor, sock_iter) {
+ if (sock_iter==sock) {
+ xbt_dynar_cursor_rm(socks,&cursor);
+ }
+ }
+ free(sock);
+ RETHROW;
+ }
+
+ if (!measurement)
+ ((gras_trp_procdata_t) gras_libdata_by_id(gras_trp_libdata_id))->myport = port;
+ return sock;
+}
+/**
+ * @brief Opens a server socket on any port in the given range
+ *
+ * @param minport: first port we will try
+ * @param maxport: last port we will try
+ * @param buf_size: size of the buffer (in byte) on the socket (for TCP sockets only). If 0, a sain default is used (32k, but may change)
+ * @param measurement: whether this socket is meant to convey measurement (if you don't know, use 0 to exchange regular messages)
+ *
+ * If none of the provided ports works, raises the exception got when trying the last possibility
+ */
+gras_socket_t
+gras_socket_server_range(unsigned short minport, unsigned short maxport,
+ unsigned long int buf_size, int measurement) {
+
+ int port;
+ gras_socket_t res=NULL;
+ xbt_ex_t e;
+
+ for (port=minport; port<maxport;port ++) {
+ TRY {
+ res=gras_socket_server_ext(port,buf_size,measurement);
+ } CATCH(e) {
+ if (port==maxport)
+ RETHROW;
+ xbt_ex_free(e);
+ }
+ if (res)
+ return res;
+ }
+ THROW_IMPOSSIBLE;
+}
+
+/**
+ * @brief Opens a client socket to a remote host.
+ * @param host: who you want to connect to
+ * @param port: where you want to connect to on this host
+ * @param buf_size: size of the buffer (in bytes) on the socket (for TCP sockets only). If 0, a sain default is used (32k, but may change)
+ * @param measurement: whether this socket is meant to convey measurement (if you don't know, use 0 to exchange regular messages)
+ *
+ * In real life, you'll get a TCP socket.
+ */
+gras_socket_t
+gras_socket_client_ext(const char *host,
+ unsigned short port,
+
+ unsigned long int buf_size,
+ int measurement) {
+
+ xbt_ex_t e;
+ gras_trp_plugin_t trp;
+ gras_socket_t sock;
+
+ trp = gras_trp_plugin_get_by_name(gras_if_SG() ? "sg":"tcp");
+
+ DEBUG1("Create a client socket from plugin %s",gras_if_RL() ? "tcp" : "sg");
+ /* defaults settings */
+ gras_trp_socket_new(0,&sock);
+ sock->plugin= trp;
+ sock->peer_port = port;
+ sock->peer_name = (char*)strdup(host?host:"localhost");
+ sock->buf_size = buf_size>0 ? buf_size: 32*1024;
+ sock->meas = measurement;
+
+ /* plugin-specific */
+ TRY {
+ (*trp->socket_client)(trp, sock);
+ DEBUG3("in=%c out=%c accept=%c",
+ sock->incoming?'y':'n',
+ sock->outgoing?'y':'n',
+ sock->accepting?'y':'n');
+ } CATCH(e) {
+ xbt_dynar_pop(((gras_trp_procdata_t)
+ gras_libdata_by_id(gras_trp_libdata_id))->sockets,NULL);
+ free(sock);
+ RETHROW;
+ }
+
+ return sock;
+}
+
+/**
+ * gras_socket_server:
+ *
+ * Opens a server socket and make it ready to be listened to.
+ * In real life, you'll get a TCP socket.
+ */
+gras_socket_t
+gras_socket_server(unsigned short port) {
+ return gras_socket_server_ext(port,32*1024,0);
+}
+
+/** @brief Opens a client socket to a remote host */
+gras_socket_t
+gras_socket_client(const char *host,
+ unsigned short port) {
+ return gras_socket_client_ext(host,port,0,0);
+}
+
+/** @brief Opens a client socket to a remote host specified as '\a host:\a port' */
+gras_socket_t
+gras_socket_client_from_string(const char *host) {
+ xbt_peer_t p = xbt_peer_from_string(host);
+ gras_socket_t res = gras_socket_client_ext(p->name,p->port,0,0);
+ xbt_peer_free(p);
+ return res;
+}
+
+/** \brief Close socket */
+void gras_socket_close(gras_socket_t sock) {
+ xbt_dynar_t sockets = ((gras_trp_procdata_t) gras_libdata_by_id(gras_trp_libdata_id))->sockets;
+ gras_socket_t sock_iter = NULL;
+ int cursor;
+
+ XBT_IN;
+ VERB1("Close %p",sock);
+ if (sock == _gras_lastly_selected_socket) {
+ xbt_assert0(!gras_opt_trp_nomoredata_on_close || !sock->moredata,
+ "Closing a socket having more data in buffer while the nomoredata_on_close option is activated");
+
+ if (sock->moredata)
+ CRITICAL0("Closing a socket having more data in buffer. Option nomoredata_on_close disabled, so continuing.");
+ _gras_lastly_selected_socket=NULL;
+ }
+
+ /* FIXME: Issue an event when the socket is closed */
+ DEBUG1("sockets pointer before %p",sockets);
+ if (sock) {
+ /* FIXME: Cannot get the dynar mutex, because it can be already locked */
+// _xbt_dynar_foreach(sockets,cursor,sock_iter) {
+ for (cursor=0; cursor< xbt_dynar_length(sockets); cursor++) {
+ _xbt_dynar_cursor_get(sockets,&cursor,&sock_iter);
+ if (sock == sock_iter) {
+ DEBUG2("remove sock cursor %d dize %lu\n",cursor,xbt_dynar_length(sockets));
+ xbt_dynar_cursor_rm(sockets,&cursor);
+ if (sock->plugin->socket_close)
+ (* sock->plugin->socket_close)(sock);
+
+ /* free the memory */
+ if (sock->peer_name)
+ free(sock->peer_name);
+ free(sock);
+ XBT_OUT;
+ return;
+ }
+ }
+ WARN1("Ignoring request to free an unknown socket (%p). Execution stack:",sock);
+ xbt_backtrace_display_current();
+ }
+ XBT_OUT;
+}