3 /* file trp (transport) - send/receive a bunch of bytes in SG realm */
5 /* Note that this is only used to debug other parts of GRAS since message */
6 /* exchange in SG realm is implemented directly without mimicking real life */
7 /* This would be terribly inefficient. */
9 /* Authors: Martin Quinson */
10 /* Copyright (C) 2004 Martin Quinson. */
12 /* This program is free software; you can redistribute it and/or modify it
13 under the terms of the license (GNU LGPL) which comes with this package. */
17 #include "gras_private.h"
18 #include "transport_private.h"
20 GRAS_LOG_EXTERNAL_CATEGORY(transport);
21 GRAS_LOG_NEW_DEFAULT_SUBCATEGORY(trp_sg,transport);
/* Forward declarations of the SG transport entry points; each one is defined
   further down in this file. */
26 gras_error_t gras_trp_sg_socket_client(gras_trp_plugin_t *self,
29 /* OUT */ gras_socket_t *sock);
30 gras_error_t gras_trp_sg_socket_server(gras_trp_plugin_t *self,
32 /* OUT */ gras_socket_t *sock);
33 void gras_trp_sg_socket_close(gras_socket_t *sd);
35 gras_error_t gras_trp_sg_chunk_send(gras_socket_t *sd,
39 gras_error_t gras_trp_sg_chunk_recv(gras_socket_t *sd,
44 gras_error_t gras_trp_sg_flush(gras_socket_t *sd);
48 *** Specific plugin part
/* Plugin-wide private data of the SG transport; gras_trp_sg_setup() allocates
   one instance of this type. */
51 int placeholder; /* nothing plugin specific so far */
52 } gras_trp_sg_plug_specific_t;
55 *** Specific socket part
/* Per-socket private data of the SG transport. */
58 int from_PID; /* process which sent this message */
59 int to_PID; /* process for which this message is destined */
61 m_host_t to_host; /* Who's on the other side */
62 m_channel_t to_chan;/* Channel on which the other side is listening */
63 } gras_trp_sg_sock_specific_t;
/* Set up the SG transport plugin: allocate the plugin-specific data and
   install the socket and chunk callbacks of this file into `plug`.
   NOTE(review): gras_trp_sg_flush() is declared in this file, but no
   assignment of it to a plugin slot is visible in this excerpt -- confirm. */
71 gras_trp_sg_setup(gras_trp_plugin_t *plug) {
/* NOTE(review): no check of this malloc() result is visible in this excerpt;
   confirm that a failed allocation is handled. */
73 gras_trp_sg_plug_specific_t *sg=malloc(sizeof(gras_trp_sg_plug_specific_t));
/* Socket life-cycle callbacks. */
77 plug->socket_client = gras_trp_sg_socket_client;
78 plug->socket_server = gras_trp_sg_socket_server;
79 plug->socket_close = gras_trp_sg_socket_close;
/* Data exchange callbacks. */
81 plug->chunk_send = gras_trp_sg_chunk_send;
82 plug->chunk_recv = gras_trp_sg_chunk_recv;
/* Open a client socket in the SG realm.
 *
 * Checks, in order, that the named host exists, that it carries GRAS host
 * data, that one of its registered ports equals `port`, and that the raw /
 * regular mode of that port matches the requested one.  Every failed check
 * prints a diagnostic on stderr and returns mismatch_error.  On success the
 * socket descriptor is allocated and filled in, and a trace line is printed
 * on stderr.
 *
 * NOTE(review): only `self` and the OUT socket pointer are visible in the
 * parameter list of this excerpt; `host`, `port` and `raw` are used below but
 * their declarations are not shown.  The OUT parameter is named `dst` here
 * while the body writes through `*sock` -- confirm that these names agree in
 * the full source.
 */
89 gras_error_t gras_trp_sg_socket_client(gras_trp_plugin_t *self,
92 /* OUT */ gras_socket_t *dst){
98 /* make sure this socket will reach someone */
99 if (!(peer=MSG_get_host_by_name(host))) {
100 fprintf(stderr,"GRAS: can't connect to %s: no such host.\n",host);
101 return mismatch_error;
103 if (!(hd=(gras_hostdata_t *)MSG_host_get_data(peer))) {
104 fprintf(stderr,"GRAS: can't connect to %s: no process on this host.\n",host);
105 return mismatch_error;
/* Linear scan of the host's port table; the loop body is empty, so i ends at
   hd->portLen when `port` is not registered. */
107 for (i=0; i<hd->portLen && port != hd->port[i]; i++);
108 if (i == hd->portLen) {
109 fprintf(stderr,"GRAS: can't connect to %s:%d, no process listen on this port.\n",host,port);
110 return mismatch_error;
/* The mode (raw vs. regular) requested by the client must match the one
   recorded for this port in hd->raw[]. */
113 if (hd->raw[i] && !raw) {
114 fprintf(stderr,"GRAS: can't connect to %s:%d in regular mode, the process listen in raw mode on this port.\n",host,port);
115 return mismatch_error;
117 if (!hd->raw[i] && raw) {
118 fprintf(stderr,"GRAS: can't connect to %s:%d in raw mode, the process listen in regular mode on this port.\n",host,port);
119 return mismatch_error;
123 /* Create the socket */
/* NOTE(review): the return statement of this out-of-memory path is not
   visible in this excerpt. */
124 if (!(*sock=(gras_sock_t*)malloc(sizeof(gras_sock_t)))) {
125 fprintf(stderr,"GRAS: openClientSocket: out of memory\n");
129 (*sock)->server_sock = 0;
130 (*sock)->raw_sock = raw;
131 (*sock)->from_PID = MSG_process_self_PID();
/* The channel mapped to this port (hd->port2chan[i]) indexes hd->proc to
   find the PID of the peer process. */
132 (*sock)->to_PID = hd->proc[ hd->port2chan[i] ];
133 (*sock)->to_host = peer;
134 (*sock)->to_port = port;
135 (*sock)->to_chan = hd->port2chan[i];
/* Trace the new connection on stderr. */
138 fprintf(stderr,"GRAS: %s (PID %d) connects in %s mode to %s:%d (to_PID=%d).\n",
139 MSG_process_get_name(MSG_process_self()), MSG_process_self_PID(),
140 raw?"RAW":"regular",host,port,(*sock)->to_PID);
/* Open a server socket in the SG realm.
 *
 * Tries each port from startingPort to endingPort (inclusive) and uses the
 * first one that is not yet present in this host's port table: the tables
 * are (re)allocated, the channel of the port is recorded in hd->port2chan,
 * and the socket descriptor is allocated and filled in.  When the loop ends
 * without finding a free port, the trailing code reports it and returns
 * mismatch_error.
 *
 * NOTE(review): `startingPort`, `endingPort` and `raw` are used below but are
 * not declared in the visible part of the signature, and the OUT parameter
 * is named `dst` while the body writes through `*sock` -- confirm against
 * the full source.
 */
144 gras_error_t gras_trp_sg_socket_server(gras_trp_plugin_t *self,
146 /* OUT */ gras_socket_t *dst){
148 gras_hostdata_t *hd=(gras_hostdata_t *)MSG_host_get_data(MSG_host_self());
149 gras_procdata_t *pd=(gras_procdata_t *)MSG_process_get_data(MSG_process_self());
151 const char *host=MSG_host_get_name(MSG_host_self());
/* Both the host data and the process data must already be attached. */
153 gras_assert0(hd,"Please run gras_process_init on each process");
154 gras_assert0(pd,"Please run gras_process_init on each process");
156 for (port=startingPort ; port <= endingPort ; port++) {
/* Linear scan: i stops on the entry equal to `port`, or at hd->portLen when
   the port is not registered yet. */
157 for (i=0; i<hd->portLen && hd->port[i] != port; i++);
158 if (i<hd->portLen && hd->port[i] == port)
161 /* port not used so far. Do it */
162 if (i == hd->portLen) {
163 /* need to enlarge the tables */
/* NOTE(review): each realloc() result overwrites the pointer it was given,
   so a failed realloc() loses the previous table (which is what the PANIC
   message below reports).  The conditions selecting between the realloc()
   and the malloc() variants are not visible in this excerpt. */
165 hd->port2chan=(int*)realloc(hd->port2chan,hd->portLen*sizeof(int));
166 hd->port =(int*)realloc(hd->port ,hd->portLen*sizeof(int));
167 hd->raw =(int*)realloc(hd->raw ,hd->portLen*sizeof(int));
169 hd->port2chan=(int*)malloc(hd->portLen*sizeof(int));
170 hd->port =(int*)malloc(hd->portLen*sizeof(int));
171 hd->raw =(int*)malloc(hd->portLen*sizeof(int));
173 if (!hd->port2chan || !hd->port || !hd->raw) {
174 fprintf(stderr,"GRAS: PANIC: A malloc error did lose all ports attribution on this host\n");
/* Raw sockets are mapped to the process' raw channel, regular ones to its
   normal channel. */
179 hd->port2chan[ i ]=raw ? pd->rawChan : pd->chan;
183 /* Create the socket */
/* NOTE(review): the return statement of this out-of-memory path is not
   visible in this excerpt. */
184 if (!(*sock=(gras_sock_t*)malloc(sizeof(gras_sock_t)))) {
185 fprintf(stderr,"GRAS: openServerSocket: out of memory\n");
189 (*sock)->server_sock = 1;
190 (*sock)->raw_sock = raw;
191 (*sock)->from_PID = -1;
192 (*sock)->to_PID = MSG_process_self_PID();
193 (*sock)->to_host = MSG_host_self();
194 (*sock)->to_port = port;
/* NOTE(review): to_chan is always pd->chan here, whereas port2chan above
   uses pd->rawChan for raw sockets -- confirm this is intended. */
195 (*sock)->to_chan = pd->chan;
/* Trace the newly opened server socket on stderr. */
198 fprintf(stderr,"GRAS: '%s' (%d) ears on %s:%d%s (%p).\n",
199 MSG_process_get_name(MSG_process_self()), MSG_process_self_PID(),
200 host,port,raw? " (mode RAW)":"",*sock);
204 /* if we go out of the previous for loop, that's that we didn't find any
205 suited port number */
208 "GRAS: can't find an empty port between %d and %d to open a socket on host %s\n.",
209 startingPort,endingPort,host);
210 return mismatch_error;
/* Close a socket of the SG realm.
 *
 * Prints a diagnostic when the raw flag of the socket does not match `raw`.
 * For a server socket, the socket's port is looked up in this host's tables
 * and the entries following it in hd->port, hd->raw and hd->port2chan are
 * shifted down by one slot to drop it; an unknown port is reported on
 * stderr.
 *
 * NOTE(review): `raw` is not a visible parameter of this function, and
 * neither the update of hd->portLen nor the release of `sd` is visible in
 * this excerpt -- confirm against the full source.
 */
213 void gras_trp_sg_socket_close(gras_socket_t *sd){
214 gras_hostdata_t *hd=(gras_hostdata_t *)MSG_host_get_data(MSG_host_self());
218 gras_assert0(hd,"Please run gras_process_init on each process");
/* The closing function used must match the kind of socket (raw / regular). */
220 if (raw && !sd->raw_sock) {
221 fprintf(stderr,"GRAS: gras_rawsock_close: Was passed a regular socket. Please use gras_sock_close()\n");
223 if (!raw && sd->raw_sock) {
224 fprintf(stderr,"GRAS: grasSockClose: Was passed a raw socket. Please use gras_rawsock_close()\n");
226 if (sd->server_sock) {
227 /* server mode socket. Unregister it from 'OS' tables */
229 i<hd->portLen && sd->to_port != hd->port[i];
232 if (i==hd->portLen) {
233 fprintf(stderr,"GRAS: closeSocket: The host does not know this server socket.\n");
/* Move the entries after index i one slot down in each table; memmove() is
   used because source and destination ranges overlap. */
235 memmove(&(hd->port[i]), &(hd->port[i+1]), (hd->portLen -i -1) * sizeof(int));
236 memmove(&(hd->raw[i]), &(hd->raw[i+1]), (hd->portLen -i -1) * sizeof(int));
237 memmove(&(hd->port2chan[i]), &(hd->port2chan[i+1]), (hd->portLen -i -1) * sizeof(int));
/* Send a chunk of data in the SG realm.
 *
 * Creates an MSG task named "Chunk[<counter>]" and puts it to sock->to_host
 * on channel sock->to_chan; a failed MSG_task_put() raises system_error.
 *
 * NOTE(review): the remaining parameters and the declarations of `name`,
 * `task`, `sock` and `size` are not visible in this excerpt.
 */
244 gras_error_t gras_trp_sg_chunk_send(gras_socket_t *sd,
/* Function-local counter that persists across calls; the visible code only
   uses it to build the task name. */
248 static unsigned int count=0;
/* NOTE(review): `count` is unsigned but is formatted with %d, and sprintf()
   does not bound the write -- confirm that `name` is large enough. */
251 sprintf(name,"Chunk[%d]",count++);
/* The third argument is `size` divided by 1024*1024 -- TODO confirm the unit
   MSG_task_create() expects for it. */
252 task=MSG_task_create(name,0,((double)size)/(1024.0*1024.0),NULL);
254 if (MSG_task_put(task, sock->to_host,sock->to_chan) != MSG_OK) {
255 RAISE(system_error,"Problem during the MSG_task_put");
/* Receive a chunk of data in the SG realm.
 *
 * Gets one MSG task on channel pd->rawChan and then destroys it; if either
 * step fails, a message is printed on stderr and unknown_error is returned.
 *
 * NOTE(review): the remaining parameters and the start of the declaration
 * that the cast below belongs to are not visible in this excerpt.
 */
261 gras_error_t gras_trp_sg_chunk_recv(gras_socket_t *sd,
265 (gras_procdata_t*)MSG_process_get_data(MSG_process_self());
/* NOTE(review): no use of bytesTotal is visible in this excerpt. */
267 unsigned int bytesTotal=0;
/* NOTE(review): this always receives on pd->rawChan, while
   gras_trp_sg_chunk_send() puts on sock->to_chan -- confirm that both
   designate the same channel. */
270 if (MSG_task_get(&task, (m_channel_t) pd->rawChan) != MSG_OK) {
271 fprintf(stderr,"GRAS: Error in MSG_task_get()\n");
272 return unknown_error;
274 if (MSG_task_destroy(task) != MSG_OK) {
275 fprintf(stderr,"GRAS: Error in MSG_task_destroy()\n");
276 return unknown_error;
284 gras_error_t gras_trp_sg_flush(gras_socket_t *sd){