Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
on exit, close all the sockets left open and others leak plugs
[simgrid.git] / src / gras / SG / gras_sg.c
1 /* $Id$ */
2
3 /* gras_sg - legacy implementation of GRAS on top of the SimGrid simulator  */
4 /* This file should be KILLED whenever the raw sockets work in the new gras */
5
6 /* Copyright (c) 2003 Martin Quinson. All rights reserved.                  */
7
8 /* This program is free software; you can redistribute it and/or modify it
9  * under the terms of the license (GNU LGPL) which comes with this package. */
10
11 #include <stdio.h>
12 #include <string.h>
13
14 #include "gras_sg.h"
15
16 XBT_LOG_DEFAULT_CATEGORY(GRAS);
17
18 /* **************************************************************************
19  * Openning/Maintaining/Closing connexions (private functions for both raw
20  * and regular sockets)
21  * **************************************************************************/
22 xbt_error_t
23 _gras_sock_server_open(unsigned short startingPort, unsigned short endingPort,
24                       int raw, unsigned int bufSize, /* OUT */ gras_sock_t **sock) {
25
26   gras_hostdata_t *hd=hd=(gras_hostdata_t *)MSG_host_get_data(MSG_host_self());
27   grasProcessData_t *pd=(grasProcessData_t *)MSG_process_get_data(MSG_process_self());
28   int port,i;
29   const char *host=MSG_host_get_name(MSG_host_self());
30
31   xbt_assert0(hd,"HostData=NULL !! Please run grasInit on each process\n");
32   xbt_assert0(pd,"ProcessData=NULL !! Please run grasInit on each process\n");
33
34   for (port=startingPort ; port <= endingPort ; port++) {
35     for (i=0; i<hd->portLen && hd->port[i] != port; i++);
36     if (i<hd->portLen && hd->port[i] == port)
37       continue;
38
39     /* port not used so far. Do it */
40     if (i == hd->portLen) {
41       /* need to enlarge the tables */
42       if (hd->portLen++) {
43         hd->port2chan=(int*)realloc(hd->port2chan,hd->portLen*sizeof(int));
44         hd->port     =(int*)realloc(hd->port     ,hd->portLen*sizeof(int));
45         hd->raw      =(int*)realloc(hd->raw      ,hd->portLen*sizeof(int));
46       } else {
47         hd->port2chan=(int*)malloc(hd->portLen*sizeof(int));
48         hd->port     =(int*)malloc(hd->portLen*sizeof(int));
49         hd->raw      =(int*)malloc(hd->portLen*sizeof(int));
50       }
51       if (!hd->port2chan || !hd->port || !hd->raw) {
52         fprintf(stderr,"GRAS: PANIC: A malloc error did lose all ports attribution on this host\n");
53         hd->portLen = 0;
54         return malloc_error;
55       }
56     }
57     hd->port2chan[ i ]=raw ? pd->rawChan : pd->chan;
58     hd->port[ i ]=port;
59     hd->raw[ i ]=raw;
60
61     /* Create the socket */
62     if (!(*sock=(gras_sock_t*)malloc(sizeof(gras_sock_t)))) {
63       fprintf(stderr,"GRAS: openServerSocket: out of memory\n");
64       return malloc_error;
65     }    
66     
67     (*sock)->server_sock  = 1;
68     (*sock)->raw_sock     = raw;
69     (*sock)->from_PID     = -1;
70     (*sock)->to_PID       = MSG_process_self_PID();
71     (*sock)->to_host      = MSG_host_self();
72     (*sock)->to_port      = port;  
73     (*sock)->to_chan      = pd->chan;
74
75     /*
76     fprintf(stderr,"GRAS: '%s' (%d) ears on %s:%d%s (%p).\n",
77             MSG_process_get_name(MSG_process_self()), MSG_process_self_PID(),
78             host,port,raw? " (mode RAW)":"",*sock);
79     */
80     return no_error;
81   }
82   /* if we go out of the previous for loop, that's that we didn't find any
83      suited port number */
84
85   fprintf(stderr,
86           "GRAS: can't find an empty port between %d and %d to open a socket on host %s\n.",
87           startingPort,endingPort,host);
88   return mismatch_error;
89 }
90
91 xbt_error_t
92 _gras_sock_client_open(const char *host, short port, int raw, unsigned int bufSize,
93                      /* OUT */ gras_sock_t **sock) {
94   m_host_t peer;
95   gras_hostdata_t *hd;
96   int i;
97
98   /* make sure this socket will reach someone */
99   if (!(peer=MSG_get_host_by_name(host))) {
100       fprintf(stderr,"GRAS: can't connect to %s: no such host.\n",host);
101       return mismatch_error;
102   }
103   if (!(hd=(gras_hostdata_t *)MSG_host_get_data(peer))) {
104       fprintf(stderr,"GRAS: can't connect to %s: no process on this host.\n",host);
105       return mismatch_error;
106   }
107   for (i=0; i<hd->portLen && port != hd->port[i]; i++);
108   if (i == hd->portLen) {
109     fprintf(stderr,"GRAS: can't connect to %s:%d, no process listen on this port.\n",host,port);
110     return mismatch_error;
111   } 
112
113   if (hd->raw[i] && !raw) {
114     fprintf(stderr,"GRAS: can't connect to %s:%d in regular mode, the process listen in raw mode on this port.\n",host,port);
115     return mismatch_error;
116   }
117   if (!hd->raw[i] && raw) {
118     fprintf(stderr,"GRAS: can't connect to %s:%d in raw mode, the process listen in regular mode on this port.\n",host,port);
119     return mismatch_error;
120   }
121     
122
123   /* Create the socket */
124   if (!(*sock=(gras_sock_t*)malloc(sizeof(gras_sock_t)))) {
125       fprintf(stderr,"GRAS: openClientSocket: out of memory\n");
126       return malloc_error;
127   }    
128
129   (*sock)->server_sock  = 0;
130   (*sock)->raw_sock     = raw;
131   (*sock)->from_PID     = MSG_process_self_PID();
132   (*sock)->to_PID       = hd->proc[ hd->port2chan[i] ];
133   (*sock)->to_host      = peer;
134   (*sock)->to_port      = port;  
135   (*sock)->to_chan      = hd->port2chan[i];
136
137   /*
138   fprintf(stderr,"GRAS: %s (PID %d) connects in %s mode to %s:%d (to_PID=%d).\n",
139           MSG_process_get_name(MSG_process_self()), MSG_process_self_PID(),
140           raw?"RAW":"regular",host,port,(*sock)->to_PID);
141   */
142   return no_error;
143 }
144
145 xbt_error_t _gras_sock_close(int raw, gras_sock_t *sd) {
146   gras_hostdata_t *hd=(gras_hostdata_t *)MSG_host_get_data(MSG_host_self());
147   int i;
148
149   xbt_assert0(hd,"HostData=NULL !! Please run grasInit on each process\n");
150
151   if (!sd) return no_error;
152   if (raw && !sd->raw_sock) {
153       fprintf(stderr,"GRAS: gras_rawsock_close: Was passed a regular socket. Please use gras_sock_close()\n");
154   }
155   if (!raw && sd->raw_sock) {
156       fprintf(stderr,"GRAS: grasSockClose: Was passed a raw socket. Please use gras_rawsock_close()\n");
157   }
158   if (sd->server_sock) {
159     /* server mode socket. Un register it from 'OS' tables */
160     for (i=0; 
161          i<hd->portLen && sd->to_port != hd->port[i]; 
162          i++);
163
164     if (i==hd->portLen) {
165       fprintf(stderr,"GRAS: closeSocket: The host does not know this server socket.\n");
166     } else {
167       memmove(&(hd->port[i]),      &(hd->port[i+1]),      (hd->portLen -i -1) * sizeof(int));
168       memmove(&(hd->raw[i]),       &(hd->raw[i+1]),       (hd->portLen -i -1) * sizeof(int));
169       memmove(&(hd->port2chan[i]), &(hd->port2chan[i+1]), (hd->portLen -i -1) * sizeof(int));
170       hd->portLen--;
171     }
172   } 
173   free(sd);
174   return no_error;
175 }
176
177 /* **************************************************************************
178  * Creating/Using regular sockets
179  * **************************************************************************/
180 xbt_error_t
181 gras_sock_server_open(unsigned short startingPort, unsigned short endingPort,
182                      /* OUT */ gras_sock_t **sock) {
183   return _gras_sock_server_open(startingPort,endingPort,0,0,sock);
184 }
185
186 xbt_error_t
187 gras_sock_client_open(const char *host, short port,
188                      /* OUT */ gras_sock_t **sock) {
189   return _gras_sock_client_open(host,port,0,0,sock);
190 }
191
192 xbt_error_t gras_sock_close(gras_sock_t *sd) {
193   return _gras_sock_close(0,sd);
194 }
195
196 unsigned short
197 gras_sock_get_my_port(gras_sock_t *sd) {
198   if (!sd || !sd->server_sock) return -1;
199   return sd->to_port;
200 }
201
202 unsigned short
203 gras_sock_get_peer_port(gras_sock_t *sd) {
204   if (!sd || sd->server_sock) return -1;
205   return sd->to_port;
206 }
207
208 char *
209 gras_sock_get_peer_name(gras_sock_t *sd) {
210   m_process_t proc;
211
212   if (!sd) return NULL;
213   if ((proc=MSG_process_from_PID(sd->to_PID))) {
214     return (char*) MSG_host_get_name(MSG_process_get_host(proc));
215   } else {
216     fprintf(stderr,"GRAS: try to access hostname of unknown process %d\n",sd->to_PID);
217     return (char*) "(dead or unknown host)";
218   }
219 }
220 /* **************************************************************************
221  * Creating/Using raw sockets
222  * **************************************************************************/
223 xbt_error_t gras_rawsock_server_open(unsigned short startingPort, unsigned short endingPort,
224                                   unsigned int bufSize, gras_rawsock_t **sock) {
225   return _gras_sock_server_open(startingPort,endingPort,1,bufSize,(gras_sock_t**)sock);
226 }
227
228 xbt_error_t gras_rawsock_client_open(const char *host, short port, 
229                                   unsigned int bufSize, gras_rawsock_t **sock) {
230   return _gras_sock_client_open(host,port,1,bufSize,(gras_sock_t**)sock);
231 }
232
233 xbt_error_t gras_rawsock_close(gras_rawsock_t *sd) {
234   return _gras_sock_close(1,(gras_sock_t*)sd);
235 }
236
237 unsigned short
238 gras_rawsock_get_peer_port(gras_rawsock_t *sd) {
239   if (!sd || !sd->server_sock) return -1;
240   return sd->to_port;
241 }
242
243 xbt_error_t
244 gras_rawsock_send(gras_rawsock_t *sock, unsigned int expSize, unsigned int msgSize) {
245   unsigned int bytesTotal;
246   static unsigned int count=0;
247   m_task_t task=NULL;
248   char name[256];
249
250   xbt_assert0(sock->raw_sock,"Asked to send raw data on a regular socket\n");
251
252   for(bytesTotal = 0;
253       bytesTotal < expSize;
254       bytesTotal += msgSize) {
255     
256     sprintf(name,"Raw data[%d]",count++);
257
258     task=MSG_task_create(name,0,((double)msgSize)/(1024.0*1024.0),NULL);
259     /*
260     fprintf(stderr, "%f:%s: gras_rawsock_send(%f %s -> %s) BEGIN\n",
261             gras_time(),
262             MSG_process_get_name(MSG_process_self()),
263             ((double)msgSize)/(1024.0*1024.0),
264             MSG_host_get_name( MSG_host_self()),
265             MSG_host_get_name( sock->to_host));
266     */
267     if (MSG_task_put(task, sock->to_host,sock->to_chan) != MSG_OK) {
268       fprintf(stderr,"GRAS: msgSend: Problem during the MSG_task_put()\n");
269       return unknown_error;
270     }
271     /*fprintf(stderr, "%f:%s: gras_rawsock_send(%f -> %s) END\n",
272             gras_time(),
273             MSG_process_get_name(MSG_process_self()),
274             ((double)msgSize)/(1024.0*1024.0),
275             MSG_host_get_name( sock->to_host));*/
276   }
277   return no_error;
278 }
279
280 xbt_error_t
281 gras_rawsock_recv(gras_rawsock_t *sock, unsigned int expSize, unsigned int msgSize,
282                 unsigned int timeout) {
283   grasProcessData_t *pd=(grasProcessData_t *)MSG_process_get_data(MSG_process_self());
284   unsigned int bytesTotal;
285   m_task_t task=NULL;
286   double startTime;
287
288   xbt_assert0(sock->raw_sock,"Asked to receive raw data on a regular socket\n");
289
290   startTime=gras_time();
291   for(bytesTotal = 0;
292       bytesTotal < expSize;
293       bytesTotal += msgSize) {
294     
295     task=NULL;
296     /*
297     fprintf(stderr, "%f:%s: gras_rawsock_recv() BEGIN\n",
298             gras_time(),
299             MSG_process_get_name(MSG_process_self()));
300     */
301     do {
302       if (MSG_task_Iprobe((m_channel_t) pd->rawChan)) { 
303         if (MSG_task_get(&task, (m_channel_t) pd->rawChan) != MSG_OK) {
304           fprintf(stderr,"GRAS: Error in MSG_task_get()\n");
305           return unknown_error;
306         }
307         if (MSG_task_destroy(task) != MSG_OK) {
308           fprintf(stderr,"GRAS: Error in MSG_task_destroy()\n");
309           return unknown_error;
310         }
311         /*
312         fprintf(stderr, "%f:%s: gras_rawsock_recv() END\n",
313                 gras_time(),
314                 MSG_process_get_name(MSG_process_self()));
315         */
316         break;
317       } else { 
318         MSG_process_sleep(0.0001);
319       }
320     } while (gras_time() - startTime < timeout);
321
322     if (gras_time() - startTime > timeout)
323       return timeout_error;
324   }
325   return no_error;
326 }
327
328
329 /* **************************************************************************
330  * Actually exchanging messages
331  * **************************************************************************/
332
333 xbt_error_t
334 grasMsgRecv(gras_msg_t **msg,
335             double timeOut) {
336
337   double startTime=gras_time();
338   grasProcessData_t *pd=grasProcessDataGet();
339   m_task_t task=NULL;
340
341   do {
342     if (MSG_task_Iprobe((m_channel_t) pd->chan)) {
343       if (MSG_task_get(&task, (m_channel_t) pd->chan) != MSG_OK) {
344         fprintf(stderr,"GRAS: Error in MSG_task_get()\n");
345         return unknown_error;
346       }
347       
348       *msg=(gras_msg_t*)MSG_task_get_data(task);
349       /*
350         { 
351         int i,j;
352         gras_msg_t *__dm_=*msg;
353         
354         fprintf(stderr, "grasMsgRecv(%s) = %d seq (",
355         __dm_->entry->name, __dm_->entry->seqCount );
356         
357         for (i=0; i<__dm_->entry->seqCount; i++) {
358         fprintf(stderr,"%d elem {",__dm_->dataCount[i]);
359         for (j=0; j<__dm_->dataCount[i]; j++) { 
360         fprintf(stderr,"%p; ",__dm_->data[i]);
361         }
362         fprintf(stderr,"},");
363         }
364         fprintf(stderr, ")\n");
365         }
366       */
367
368       if (MSG_task_destroy(task) != MSG_OK) {
369         fprintf(stderr,"GRAS: Error in MSG_task_destroy()\n");
370         return unknown_error;
371       }
372       return no_error;
373
374     } else {
375       MSG_process_sleep(1);
376     }
377   } while (gras_time()-startTime < timeOut || MSG_task_Iprobe((m_channel_t) pd->chan));
378   return timeout_error;
379 }
380
381 xbt_error_t
382 gras_msg_send(gras_sock_t *sd,
383             gras_msg_t *_msg,
384             e_xbt_free_directive_t freeDirective) {
385   
386   grasProcessData_t *pd=grasProcessDataGet();
387   m_task_t task;
388   static int count=0;
389   char name[256];
390   gras_msg_t *msg;
391   gras_sock_t *answer;
392
393   /* arg validity checks */
394   xbt_assert0(msg,"Trying to send NULL message");
395   xbt_assert0(sd ,"Trying to send over a NULL socket");
396   
397   if (!(answer=(gras_sock_t*)malloc(sizeof(gras_sock_t)))) {
398     RAISE_MALLOC;
399   }
400   answer->server_sock=0;
401   answer->raw_sock   =0;
402   answer->from_PID   = sd->to_PID;
403   answer->to_PID     = MSG_process_self_PID();
404   answer->to_host    = MSG_host_self();
405   answer->to_port    = 0;
406   answer->to_chan    = pd->chan;
407     
408   sprintf(name,"%s[%d]",_msg->entry->name,count++);
409   /* if freeDirective == free_never, we have to build a copy of the message */
410   if (freeDirective == free_never) {
411     msg=gras_msg_copy(_msg);
412   } else {
413     msg=_msg;
414   }
415   msg->sock = answer;
416
417   /*
418   fprintf(stderr,"Send %s with answer(%p)=",msg->entry->name,msg->sock);
419   fprintf(stderr,"(server=%d,raw=%d,fromPID=%d,toPID=%d,toHost=%p,toPort=%d,toChan=%d)\n",
420           msg->sock->server_sock,msg->sock->raw_sock,msg->sock->from_PID,
421           msg->sock->to_PID,msg->sock->to_host,msg->sock->to_port,msg->sock->to_chan);
422   fprintf(stderr,"Send over %p=(server=%d,raw=%d,fromPID=%d,toPID=%d,toHost=%p,toPort=%d,toChan=%d)\n",
423           sd,sd->server_sock,sd->raw_sock,sd->from_PID,sd->to_PID,sd->to_host,sd->to_port,sd->to_chan);
424   */
425
426   /*
427   { 
428     int i,j;
429     gras_msg_t *__dm_=msg;
430
431     fprintf(stderr, "gras_msg_send(%s) = %d seq (",
432             __dm_->entry->name, __dm_->entry->seqCount );
433
434     for (i=0; i<__dm_->entry->seqCount; i++) {
435       fprintf(stderr,"%d elem {",__dm_->dataCount[i]);
436       for (j=0; j<__dm_->dataCount[i]; j++) { 
437         fprintf(stderr,"%p; ",__dm_->data[i]);
438       }
439       fprintf(stderr,"},");
440     }
441     fprintf(stderr, ")\n");
442   }
443   */
444
445   /* Send it */
446   task=MSG_task_create(name,0,((double)msg->header->dataSize)/(1024.0*1024.0),msg);
447   if (MSG_task_put(task, sd->to_host,sd->to_chan) != MSG_OK) {
448     fprintf(stderr,"GRAS: msgSend: Problem during the MSG_task_put()\n");
449     return unknown_error;
450   }
451   return no_error;
452 }
453
454 gras_sock_t *gras_sock_new(void) {
455   return malloc(sizeof(gras_sock_t));
456 }
457
458 void grasSockFree(gras_sock_t *s) {
459   if (s) free (s);
460 }
461
462 /* **************************************************************************
463  * Process data
464  * **************************************************************************/
465 grasProcessData_t *grasProcessDataGet() {
466   return (grasProcessData_t *)MSG_process_get_data(MSG_process_self());
467 }
468
469 /* **************************************************************************
470  * Wrappers over OS functions
471  * **************************************************************************/
472 double gras_time() {
473   return MSG_getClock();
474 }
475
476 void gras_sleep(unsigned long sec, unsigned long usec) {
477   MSG_process_sleep((double)sec + ((double)usec)/1000000);
478 }
479