Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
9b86f53debeb5502a5e1b3bb2fdc175c1e89ab02
[simgrid.git] / src / gras / Transport / transport_plugin_sg.c
1 /* $Id$ */
2
3 /* file trp (transport) - send/receive a bunch of bytes in SG realm         */
4
5 /* Note that this is only used to debug other parts of GRAS since message   */
6 /*  exchange in SG realm is implemented directly without mimicing real life */
7 /*  This would be terribly unefficient.                                     */
8
9 /* Authors: Martin Quinson                                                  */
10 /* Copyright (C) 2004 Martin Quinson.                                       */
11
12 /* This program is free software; you can redistribute it and/or modify it
13    under the terms of the license (GNU LGPL) which comes with this package. */
14
15 #include <msg.h>
16
17 #include "gras_private.h"
18 #include "transport_private.h"
19
20 GRAS_LOG_EXTERNAL_CATEGORY(transport);
21 GRAS_LOG_NEW_DEFAULT_SUBCATEGORY(trp_sg,transport);
22
23 /***
24  *** Prototypes 
25  ***/
26 gras_error_t gras_trp_sg_socket_client(gras_trp_plugin_t *self,
27                                        const char *host,
28                                        unsigned short port,
29                                        /* OUT */ gras_socket_t *sock);
30 gras_error_t gras_trp_sg_socket_server(gras_trp_plugin_t *self,
31                                        unsigned short port,
32                                        /* OUT */ gras_socket_t *sock);
33 void         gras_trp_sg_socket_close(gras_socket_t *sd);
34
35 gras_error_t gras_trp_sg_chunk_send(gras_socket_t *sd,
36                                     char *data,
37                                     size_t size);
38
39 gras_error_t gras_trp_sg_chunk_recv(gras_socket_t *sd,
40                                     char *data,
41                                     size_t size);
42
43 /* FIXME
44   gras_error_t gras_trp_sg_flush(gras_socket_t *sd);
45 */
46
47 /***
48  *** Specific plugin part
49  ***/
50 typedef struct {
51   int placeholder; /* nothing plugin specific so far */
52 } gras_trp_sg_plug_specific_t;
53
54 /***
55  *** Specific socket part
56  ***/
57 typedef struct {
58   int from_PID;    /* process which sent this message */
59   int to_PID;      /* process to which this message is destinated */
60
61   m_host_t to_host;   /* Who's on other side */
62   m_channel_t to_chan;/* Channel on which the other side is earing */
63 } gras_trp_sg_sock_specific_t;
64
65
66 /***
67  *** Code
68  ***/
69
70 gras_error_t
71 gras_trp_sg_setup(gras_trp_plugin_t *plug) {
72
73   gras_trp_sg_plug_specific_t *sg=malloc(sizeof(gras_trp_sg_plug_specific_t));
74   if (!sg)
75     RAISE_MALLOC;
76
77   plug->socket_client = gras_trp_sg_socket_client;
78   plug->socket_server = gras_trp_sg_socket_server;
79   plug->socket_close  = gras_trp_sg_socket_close;
80
81   plug->chunk_send    = gras_trp_sg_chunk_send;
82   plug->chunk_recv    = gras_trp_sg_chunk_recv;
83
84   plug->data      = sg; 
85
86   return no_error;
87 }
88
89 gras_error_t gras_trp_sg_socket_client(gras_trp_plugin_t *self,
90                                        const char *host,
91                                        unsigned short port,
92                                        /* OUT */ gras_socket_t *dst){
93
94   m_host_t peer;
95   gras_hostdata_t *hd;
96   int i;
97
98   /* make sure this socket will reach someone */
99   if (!(peer=MSG_get_host_by_name(host))) {
100       fprintf(stderr,"GRAS: can't connect to %s: no such host.\n",host);
101       return mismatch_error;
102   }
103   if (!(hd=(gras_hostdata_t *)MSG_host_get_data(peer))) {
104       fprintf(stderr,"GRAS: can't connect to %s: no process on this host.\n",host);
105       return mismatch_error;
106   }
107   for (i=0; i<hd->portLen && port != hd->port[i]; i++);
108   if (i == hd->portLen) {
109     fprintf(stderr,"GRAS: can't connect to %s:%d, no process listen on this port.\n",host,port);
110     return mismatch_error;
111   } 
112
113   if (hd->raw[i] && !raw) {
114     fprintf(stderr,"GRAS: can't connect to %s:%d in regular mode, the process listen in raw mode on this port.\n",host,port);
115     return mismatch_error;
116   }
117   if (!hd->raw[i] && raw) {
118     fprintf(stderr,"GRAS: can't connect to %s:%d in raw mode, the process listen in regular mode on this port.\n",host,port);
119     return mismatch_error;
120   }
121     
122
123   /* Create the socket */
124   if (!(*sock=(gras_sock_t*)malloc(sizeof(gras_sock_t)))) {
125       fprintf(stderr,"GRAS: openClientSocket: out of memory\n");
126       return malloc_error;
127   }    
128
129   (*sock)->server_sock  = 0;
130   (*sock)->raw_sock     = raw;
131   (*sock)->from_PID     = MSG_process_self_PID();
132   (*sock)->to_PID       = hd->proc[ hd->port2chan[i] ];
133   (*sock)->to_host      = peer;
134   (*sock)->to_port      = port;  
135   (*sock)->to_chan      = hd->port2chan[i];
136
137   /*
138   fprintf(stderr,"GRAS: %s (PID %d) connects in %s mode to %s:%d (to_PID=%d).\n",
139           MSG_process_get_name(MSG_process_self()), MSG_process_self_PID(),
140           raw?"RAW":"regular",host,port,(*sock)->to_PID);
141   */
142 }
143
144 gras_error_t gras_trp_sg_socket_server(gras_trp_plugin_t *self,
145                                        unsigned short port,
146                                        /* OUT */ gras_socket_t *dst){
147
148   gras_hostdata_t *hd=(gras_hostdata_t *)MSG_host_get_data(MSG_host_self());
149   gras_procdata_t *pd=(gras_procdata_t *)MSG_process_get_data(MSG_process_self());
150   int port,i;
151   const char *host=MSG_host_get_name(MSG_host_self());
152
153   gras_assert0(hd,"Please run gras_process_init on each process");
154   gras_assert0(pd,"Please run gras_process_init on each process");
155
156   for (port=startingPort ; port <= endingPort ; port++) {
157     for (i=0; i<hd->portLen && hd->port[i] != port; i++);
158     if (i<hd->portLen && hd->port[i] == port)
159       continue;
160
161     /* port not used so far. Do it */
162     if (i == hd->portLen) {
163       /* need to enlarge the tables */
164       if (hd->portLen++) {
165         hd->port2chan=(int*)realloc(hd->port2chan,hd->portLen*sizeof(int));
166         hd->port     =(int*)realloc(hd->port     ,hd->portLen*sizeof(int));
167         hd->raw      =(int*)realloc(hd->raw      ,hd->portLen*sizeof(int));
168       } else {
169         hd->port2chan=(int*)malloc(hd->portLen*sizeof(int));
170         hd->port     =(int*)malloc(hd->portLen*sizeof(int));
171         hd->raw      =(int*)malloc(hd->portLen*sizeof(int));
172       }
173       if (!hd->port2chan || !hd->port || !hd->raw) {
174         fprintf(stderr,"GRAS: PANIC: A malloc error did lose all ports attribution on this host\n");
175         hd->portLen = 0;
176         return malloc_error;
177       }
178     }
179     hd->port2chan[ i ]=raw ? pd->rawChan : pd->chan;
180     hd->port[ i ]=port;
181     hd->raw[ i ]=raw;
182
183     /* Create the socket */
184     if (!(*sock=(gras_sock_t*)malloc(sizeof(gras_sock_t)))) {
185       fprintf(stderr,"GRAS: openServerSocket: out of memory\n");
186       return malloc_error;
187     }    
188     
189     (*sock)->server_sock  = 1;
190     (*sock)->raw_sock     = raw;
191     (*sock)->from_PID     = -1;
192     (*sock)->to_PID       = MSG_process_self_PID();
193     (*sock)->to_host      = MSG_host_self();
194     (*sock)->to_port      = port;  
195     (*sock)->to_chan      = pd->chan;
196
197     /*
198     fprintf(stderr,"GRAS: '%s' (%d) ears on %s:%d%s (%p).\n",
199             MSG_process_get_name(MSG_process_self()), MSG_process_self_PID(),
200             host,port,raw? " (mode RAW)":"",*sock);
201     */
202     return no_error;
203   }
204   /* if we go out of the previous for loop, that's that we didn't find any
205      suited port number */
206
207   fprintf(stderr,
208           "GRAS: can't find an empty port between %d and %d to open a socket on host %s\n.",
209           startingPort,endingPort,host);
210   return mismatch_error;
211 }
212
213 void gras_trp_sg_socket_close(gras_socket_t *sd){
214   gras_hostdata_t *hd=(gras_hostdata_t *)MSG_host_get_data(MSG_host_self());
215   int i;
216
217   if (!sd) return;
218   gras_assert0(hd,"Please run gras_process_init on each process");
219
220   if (raw && !sd->raw_sock) {
221       fprintf(stderr,"GRAS: gras_rawsock_close: Was passed a regular socket. Please use gras_sock_close()\n");
222   }
223   if (!raw && sd->raw_sock) {
224       fprintf(stderr,"GRAS: grasSockClose: Was passed a raw socket. Please use gras_rawsock_close()\n");
225   }
226   if (sd->server_sock) {
227     /* server mode socket. Un register it from 'OS' tables */
228     for (i=0; 
229          i<hd->portLen && sd->to_port != hd->port[i]; 
230          i++);
231
232     if (i==hd->portLen) {
233       fprintf(stderr,"GRAS: closeSocket: The host does not know this server socket.\n");
234     } else {
235       memmove(&(hd->port[i]),      &(hd->port[i+1]),      (hd->portLen -i -1) * sizeof(int));
236       memmove(&(hd->raw[i]),       &(hd->raw[i+1]),       (hd->portLen -i -1) * sizeof(int));
237       memmove(&(hd->port2chan[i]), &(hd->port2chan[i+1]), (hd->portLen -i -1) * sizeof(int));
238       hd->portLen--;
239     }
240   } 
241   free(sd);
242 }
243
244 gras_error_t gras_trp_sg_chunk_send(gras_socket_t *sd,
245                                     char *data,
246                                     size_t size) {
247   m_task_t task=NULL;
248   static unsigned int count=0;
249   char name[256];
250   
251   sprintf(name,"Chunk[%d]",count++);
252   task=MSG_task_create(name,0,((double)size)/(1024.0*1024.0),NULL);
253
254   if (MSG_task_put(task, sock->to_host,sock->to_chan) != MSG_OK) {
255     RAISE(system_error,"Problem during the MSG_task_put");
256   }
257
258   return no_error;
259 }
260
261 gras_error_t gras_trp_sg_chunk_recv(gras_socket_t *sd,
262                                     char *data,
263                                     size_t size){
264   gras_procdata_t *pd=
265     (gras_procdata_t*)MSG_process_get_data(MSG_process_self());
266
267   unsigned int bytesTotal=0;
268   m_task_t task=NULL;
269
270   if (MSG_task_get(&task, (m_channel_t) pd->rawChan) != MSG_OK) {
271     fprintf(stderr,"GRAS: Error in MSG_task_get()\n");
272     return unknown_error;
273   }
274   if (MSG_task_destroy(task) != MSG_OK) {
275     fprintf(stderr,"GRAS: Error in MSG_task_destroy()\n");
276     return unknown_error;
277   }
278
279   return no_error;
280 }
281
282 /*FIXME
283
284 gras_error_t gras_trp_sg_flush(gras_socket_t *sd){
285   RAISE_UNIMPLEMENTED;
286 }
287 */