Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
04dfd8ce40f869ed99938b869657bb4c6a53a76c
[simgrid.git] / src / gras / SG / gras_sg.c
1 /* $Id$ */
2
3 /* gras_rl - implementation of GRAS on top of the SimGrid simulator         */
4
5 /* Copyright (c) 2004 Martin Quinson. All rights reserved.                  */
6
7 /* This program is free software; you can redistribute it and/or modify it
8  * under the terms of the license (GNU LGPL) which comes with this package. */
9
10 #include <stdio.h>
11 #include <string.h>
12
13 #include "gras_sg.h"
14
15 XBT_LOG_DEFAULT_CATEGORY(GRAS);
16
17 /* **************************************************************************
18  * Openning/Maintaining/Closing connexions (private functions for both raw
19  * and regular sockets)
20  * **************************************************************************/
21 xbt_error_t
22 _gras_sock_server_open(unsigned short startingPort, unsigned short endingPort,
23                       int raw, unsigned int bufSize, /* OUT */ gras_sock_t **sock) {
24
25   gras_hostdata_t *hd=hd=(gras_hostdata_t *)MSG_host_get_data(MSG_host_self());
26   grasProcessData_t *pd=(grasProcessData_t *)MSG_process_get_data(MSG_process_self());
27   int port,i;
28   const char *host=MSG_host_get_name(MSG_host_self());
29
30   xbt_assert0(hd,"HostData=NULL !! Please run grasInit on each process\n");
31   xbt_assert0(pd,"ProcessData=NULL !! Please run grasInit on each process\n");
32
33   for (port=startingPort ; port <= endingPort ; port++) {
34     for (i=0; i<hd->portLen && hd->port[i] != port; i++);
35     if (i<hd->portLen && hd->port[i] == port)
36       continue;
37
38     /* port not used so far. Do it */
39     if (i == hd->portLen) {
40       /* need to enlarge the tables */
41       if (hd->portLen++) {
42         hd->port2chan=(int*)realloc(hd->port2chan,hd->portLen*sizeof(int));
43         hd->port     =(int*)realloc(hd->port     ,hd->portLen*sizeof(int));
44         hd->raw      =(int*)realloc(hd->raw      ,hd->portLen*sizeof(int));
45       } else {
46         hd->port2chan=(int*)malloc(hd->portLen*sizeof(int));
47         hd->port     =(int*)malloc(hd->portLen*sizeof(int));
48         hd->raw      =(int*)malloc(hd->portLen*sizeof(int));
49       }
50       if (!hd->port2chan || !hd->port || !hd->raw) {
51         fprintf(stderr,"GRAS: PANIC: A malloc error did lose all ports attribution on this host\n");
52         hd->portLen = 0;
53         return malloc_error;
54       }
55     }
56     hd->port2chan[ i ]=raw ? pd->rawChan : pd->chan;
57     hd->port[ i ]=port;
58     hd->raw[ i ]=raw;
59
60     /* Create the socket */
61     if (!(*sock=(gras_sock_t*)malloc(sizeof(gras_sock_t)))) {
62       fprintf(stderr,"GRAS: openServerSocket: out of memory\n");
63       return malloc_error;
64     }    
65     
66     (*sock)->server_sock  = 1;
67     (*sock)->raw_sock     = raw;
68     (*sock)->from_PID     = -1;
69     (*sock)->to_PID       = MSG_process_self_PID();
70     (*sock)->to_host      = MSG_host_self();
71     (*sock)->to_port      = port;  
72     (*sock)->to_chan      = pd->chan;
73
74     /*
75     fprintf(stderr,"GRAS: '%s' (%d) ears on %s:%d%s (%p).\n",
76             MSG_process_get_name(MSG_process_self()), MSG_process_self_PID(),
77             host,port,raw? " (mode RAW)":"",*sock);
78     */
79     return no_error;
80   }
81   /* if we go out of the previous for loop, that's that we didn't find any
82      suited port number */
83
84   fprintf(stderr,
85           "GRAS: can't find an empty port between %d and %d to open a socket on host %s\n.",
86           startingPort,endingPort,host);
87   return mismatch_error;
88 }
89
90 xbt_error_t
91 _gras_sock_client_open(const char *host, short port, int raw, unsigned int bufSize,
92                      /* OUT */ gras_sock_t **sock) {
93   m_host_t peer;
94   gras_hostdata_t *hd;
95   int i;
96
97   /* make sure this socket will reach someone */
98   if (!(peer=MSG_get_host_by_name(host))) {
99       fprintf(stderr,"GRAS: can't connect to %s: no such host.\n",host);
100       return mismatch_error;
101   }
102   if (!(hd=(gras_hostdata_t *)MSG_host_get_data(peer))) {
103       fprintf(stderr,"GRAS: can't connect to %s: no process on this host.\n",host);
104       return mismatch_error;
105   }
106   for (i=0; i<hd->portLen && port != hd->port[i]; i++);
107   if (i == hd->portLen) {
108     fprintf(stderr,"GRAS: can't connect to %s:%d, no process listen on this port.\n",host,port);
109     return mismatch_error;
110   } 
111
112   if (hd->raw[i] && !raw) {
113     fprintf(stderr,"GRAS: can't connect to %s:%d in regular mode, the process listen in raw mode on this port.\n",host,port);
114     return mismatch_error;
115   }
116   if (!hd->raw[i] && raw) {
117     fprintf(stderr,"GRAS: can't connect to %s:%d in raw mode, the process listen in regular mode on this port.\n",host,port);
118     return mismatch_error;
119   }
120     
121
122   /* Create the socket */
123   if (!(*sock=(gras_sock_t*)malloc(sizeof(gras_sock_t)))) {
124       fprintf(stderr,"GRAS: openClientSocket: out of memory\n");
125       return malloc_error;
126   }    
127
128   (*sock)->server_sock  = 0;
129   (*sock)->raw_sock     = raw;
130   (*sock)->from_PID     = MSG_process_self_PID();
131   (*sock)->to_PID       = hd->proc[ hd->port2chan[i] ];
132   (*sock)->to_host      = peer;
133   (*sock)->to_port      = port;  
134   (*sock)->to_chan      = hd->port2chan[i];
135
136   /*
137   fprintf(stderr,"GRAS: %s (PID %d) connects in %s mode to %s:%d (to_PID=%d).\n",
138           MSG_process_get_name(MSG_process_self()), MSG_process_self_PID(),
139           raw?"RAW":"regular",host,port,(*sock)->to_PID);
140   */
141   return no_error;
142 }
143
144 xbt_error_t _gras_sock_close(int raw, gras_sock_t *sd) {
145   gras_hostdata_t *hd=(gras_hostdata_t *)MSG_host_get_data(MSG_host_self());
146   int i;
147
148   xbt_assert0(hd,"HostData=NULL !! Please run grasInit on each process\n");
149
150   if (!sd) return no_error;
151   if (raw && !sd->raw_sock) {
152       fprintf(stderr,"GRAS: gras_rawsock_close: Was passed a regular socket. Please use gras_sock_close()\n");
153   }
154   if (!raw && sd->raw_sock) {
155       fprintf(stderr,"GRAS: grasSockClose: Was passed a raw socket. Please use gras_rawsock_close()\n");
156   }
157   if (sd->server_sock) {
158     /* server mode socket. Un register it from 'OS' tables */
159     for (i=0; 
160          i<hd->portLen && sd->to_port != hd->port[i]; 
161          i++);
162
163     if (i==hd->portLen) {
164       fprintf(stderr,"GRAS: closeSocket: The host does not know this server socket.\n");
165     } else {
166       memmove(&(hd->port[i]),      &(hd->port[i+1]),      (hd->portLen -i -1) * sizeof(int));
167       memmove(&(hd->raw[i]),       &(hd->raw[i+1]),       (hd->portLen -i -1) * sizeof(int));
168       memmove(&(hd->port2chan[i]), &(hd->port2chan[i+1]), (hd->portLen -i -1) * sizeof(int));
169       hd->portLen--;
170     }
171   } 
172   free(sd);
173   return no_error;
174 }
175
176 /* **************************************************************************
177  * Creating/Using regular sockets
178  * **************************************************************************/
179 xbt_error_t
180 gras_sock_server_open(unsigned short startingPort, unsigned short endingPort,
181                      /* OUT */ gras_sock_t **sock) {
182   return _gras_sock_server_open(startingPort,endingPort,0,0,sock);
183 }
184
185 xbt_error_t
186 gras_sock_client_open(const char *host, short port,
187                      /* OUT */ gras_sock_t **sock) {
188   return _gras_sock_client_open(host,port,0,0,sock);
189 }
190
191 xbt_error_t gras_sock_close(gras_sock_t *sd) {
192   return _gras_sock_close(0,sd);
193 }
194
195 unsigned short
196 gras_sock_get_my_port(gras_sock_t *sd) {
197   if (!sd || !sd->server_sock) return -1;
198   return sd->to_port;
199 }
200
201 unsigned short
202 gras_sock_get_peer_port(gras_sock_t *sd) {
203   if (!sd || sd->server_sock) return -1;
204   return sd->to_port;
205 }
206
207 char *
208 gras_sock_get_peer_name(gras_sock_t *sd) {
209   m_process_t proc;
210
211   if (!sd) return NULL;
212   if ((proc=MSG_process_from_PID(sd->to_PID))) {
213     return (char*) MSG_host_get_name(MSG_process_get_host(proc));
214   } else {
215     fprintf(stderr,"GRAS: try to access hostname of unknown process %d\n",sd->to_PID);
216     return (char*) "(dead or unknown host)";
217   }
218 }
219 /* **************************************************************************
220  * Creating/Using raw sockets
221  * **************************************************************************/
222 xbt_error_t gras_rawsock_server_open(unsigned short startingPort, unsigned short endingPort,
223                                   unsigned int bufSize, gras_rawsock_t **sock) {
224   return _gras_sock_server_open(startingPort,endingPort,1,bufSize,(gras_sock_t**)sock);
225 }
226
227 xbt_error_t gras_rawsock_client_open(const char *host, short port, 
228                                   unsigned int bufSize, gras_rawsock_t **sock) {
229   return _gras_sock_client_open(host,port,1,bufSize,(gras_sock_t**)sock);
230 }
231
232 xbt_error_t gras_rawsock_close(gras_rawsock_t *sd) {
233   return _gras_sock_close(1,(gras_sock_t*)sd);
234 }
235
236 unsigned short
237 gras_rawsock_get_peer_port(gras_rawsock_t *sd) {
238   if (!sd || !sd->server_sock) return -1;
239   return sd->to_port;
240 }
241
242 xbt_error_t
243 gras_rawsock_send(gras_rawsock_t *sock, unsigned int expSize, unsigned int msgSize) {
244   unsigned int bytesTotal;
245   static unsigned int count=0;
246   m_task_t task=NULL;
247   char name[256];
248
249   xbt_assert0(sock->raw_sock,"Asked to send raw data on a regular socket\n");
250
251   for(bytesTotal = 0;
252       bytesTotal < expSize;
253       bytesTotal += msgSize) {
254     
255     sprintf(name,"Raw data[%d]",count++);
256
257     task=MSG_task_create(name,0,((double)msgSize)/(1024.0*1024.0),NULL);
258     /*
259     fprintf(stderr, "%f:%s: gras_rawsock_send(%f %s -> %s) BEGIN\n",
260             gras_time(),
261             MSG_process_get_name(MSG_process_self()),
262             ((double)msgSize)/(1024.0*1024.0),
263             MSG_host_get_name( MSG_host_self()),
264             MSG_host_get_name( sock->to_host));
265     */
266     if (MSG_task_put(task, sock->to_host,sock->to_chan) != MSG_OK) {
267       fprintf(stderr,"GRAS: msgSend: Problem during the MSG_task_put()\n");
268       return unknown_error;
269     }
270     /*fprintf(stderr, "%f:%s: gras_rawsock_send(%f -> %s) END\n",
271             gras_time(),
272             MSG_process_get_name(MSG_process_self()),
273             ((double)msgSize)/(1024.0*1024.0),
274             MSG_host_get_name( sock->to_host));*/
275   }
276   return no_error;
277 }
278
279 xbt_error_t
280 gras_rawsock_recv(gras_rawsock_t *sock, unsigned int expSize, unsigned int msgSize,
281                 unsigned int timeout) {
282   grasProcessData_t *pd=(grasProcessData_t *)MSG_process_get_data(MSG_process_self());
283   unsigned int bytesTotal;
284   m_task_t task=NULL;
285   double startTime;
286
287   xbt_assert0(sock->raw_sock,"Asked to receive raw data on a regular socket\n");
288
289   startTime=gras_time();
290   for(bytesTotal = 0;
291       bytesTotal < expSize;
292       bytesTotal += msgSize) {
293     
294     task=NULL;
295     /*
296     fprintf(stderr, "%f:%s: gras_rawsock_recv() BEGIN\n",
297             gras_time(),
298             MSG_process_get_name(MSG_process_self()));
299     */
300     do {
301       if (MSG_task_Iprobe((m_channel_t) pd->rawChan)) { 
302         if (MSG_task_get(&task, (m_channel_t) pd->rawChan) != MSG_OK) {
303           fprintf(stderr,"GRAS: Error in MSG_task_get()\n");
304           return unknown_error;
305         }
306         if (MSG_task_destroy(task) != MSG_OK) {
307           fprintf(stderr,"GRAS: Error in MSG_task_destroy()\n");
308           return unknown_error;
309         }
310         /*
311         fprintf(stderr, "%f:%s: gras_rawsock_recv() END\n",
312                 gras_time(),
313                 MSG_process_get_name(MSG_process_self()));
314         */
315         break;
316       } else { 
317         MSG_process_sleep(0.0001);
318       }
319     } while (gras_time() - startTime < timeout);
320
321     if (gras_time() - startTime > timeout)
322       return timeout_error;
323   }
324   return no_error;
325 }
326
327
328 /* **************************************************************************
329  * Actually exchanging messages
330  * **************************************************************************/
331
332 xbt_error_t
333 grasMsgRecv(gras_msg_t **msg,
334             double timeOut) {
335
336   double startTime=gras_time();
337   grasProcessData_t *pd=grasProcessDataGet();
338   m_task_t task=NULL;
339
340   do {
341     if (MSG_task_Iprobe((m_channel_t) pd->chan)) {
342       if (MSG_task_get(&task, (m_channel_t) pd->chan) != MSG_OK) {
343         fprintf(stderr,"GRAS: Error in MSG_task_get()\n");
344         return unknown_error;
345       }
346       
347       *msg=(gras_msg_t*)MSG_task_get_data(task);
348       /*
349         { 
350         int i,j;
351         gras_msg_t *__dm_=*msg;
352         
353         fprintf(stderr, "grasMsgRecv(%s) = %d seq (",
354         __dm_->entry->name, __dm_->entry->seqCount );
355         
356         for (i=0; i<__dm_->entry->seqCount; i++) {
357         fprintf(stderr,"%d elem {",__dm_->dataCount[i]);
358         for (j=0; j<__dm_->dataCount[i]; j++) { 
359         fprintf(stderr,"%p; ",__dm_->data[i]);
360         }
361         fprintf(stderr,"},");
362         }
363         fprintf(stderr, ")\n");
364         }
365       */
366
367       if (MSG_task_destroy(task) != MSG_OK) {
368         fprintf(stderr,"GRAS: Error in MSG_task_destroy()\n");
369         return unknown_error;
370       }
371       return no_error;
372
373     } else {
374       MSG_process_sleep(1);
375     }
376   } while (gras_time()-startTime < timeOut || MSG_task_Iprobe((m_channel_t) pd->chan));
377   return timeout_error;
378 }
379
380 xbt_error_t
381 gras_msg_send(gras_sock_t *sd,
382             gras_msg_t *_msg,
383             e_xbt_free_directive_t freeDirective) {
384   
385   grasProcessData_t *pd=grasProcessDataGet();
386   m_task_t task;
387   static int count=0;
388   char name[256];
389   gras_msg_t *msg;
390   gras_sock_t *answer;
391
392   /* arg validity checks */
393   xbt_assert0(msg,"Trying to send NULL message");
394   xbt_assert0(sd ,"Trying to send over a NULL socket");
395   
396   if (!(answer=(gras_sock_t*)malloc(sizeof(gras_sock_t)))) {
397     RAISE_MALLOC;
398   }
399   answer->server_sock=0;
400   answer->raw_sock   =0;
401   answer->from_PID   = sd->to_PID;
402   answer->to_PID     = MSG_process_self_PID();
403   answer->to_host    = MSG_host_self();
404   answer->to_port    = 0;
405   answer->to_chan    = pd->chan;
406     
407   sprintf(name,"%s[%d]",_msg->entry->name,count++);
408   /* if freeDirective == free_never, we have to build a copy of the message */
409   if (freeDirective == free_never) {
410     msg=gras_msg_copy(_msg);
411   } else {
412     msg=_msg;
413   }
414   msg->sock = answer;
415
416   /*
417   fprintf(stderr,"Send %s with answer(%p)=",msg->entry->name,msg->sock);
418   fprintf(stderr,"(server=%d,raw=%d,fromPID=%d,toPID=%d,toHost=%p,toPort=%d,toChan=%d)\n",
419           msg->sock->server_sock,msg->sock->raw_sock,msg->sock->from_PID,
420           msg->sock->to_PID,msg->sock->to_host,msg->sock->to_port,msg->sock->to_chan);
421   fprintf(stderr,"Send over %p=(server=%d,raw=%d,fromPID=%d,toPID=%d,toHost=%p,toPort=%d,toChan=%d)\n",
422           sd,sd->server_sock,sd->raw_sock,sd->from_PID,sd->to_PID,sd->to_host,sd->to_port,sd->to_chan);
423   */
424
425   /*
426   { 
427     int i,j;
428     gras_msg_t *__dm_=msg;
429
430     fprintf(stderr, "gras_msg_send(%s) = %d seq (",
431             __dm_->entry->name, __dm_->entry->seqCount );
432
433     for (i=0; i<__dm_->entry->seqCount; i++) {
434       fprintf(stderr,"%d elem {",__dm_->dataCount[i]);
435       for (j=0; j<__dm_->dataCount[i]; j++) { 
436         fprintf(stderr,"%p; ",__dm_->data[i]);
437       }
438       fprintf(stderr,"},");
439     }
440     fprintf(stderr, ")\n");
441   }
442   */
443
444   /* Send it */
445   task=MSG_task_create(name,0,((double)msg->header->dataSize)/(1024.0*1024.0),msg);
446   if (MSG_task_put(task, sd->to_host,sd->to_chan) != MSG_OK) {
447     fprintf(stderr,"GRAS: msgSend: Problem during the MSG_task_put()\n");
448     return unknown_error;
449   }
450   return no_error;
451 }
452
453 gras_sock_t *gras_sock_new(void) {
454   return malloc(sizeof(gras_sock_t));
455 }
456
457 void grasSockFree(gras_sock_t *s) {
458   if (s) free (s);
459 }
460
461 /* **************************************************************************
462  * Process data
463  * **************************************************************************/
464 grasProcessData_t *grasProcessDataGet() {
465   return (grasProcessData_t *)MSG_process_get_data(MSG_process_self());
466 }
467
468 /* **************************************************************************
469  * Wrappers over OS functions
470  * **************************************************************************/
471 double gras_time() {
472   return MSG_getClock();
473 }
474
475 void gras_sleep(unsigned long sec, unsigned long usec) {
476   MSG_process_sleep((double)sec + ((double)usec)/1000000);
477 }
478