Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
4159c9c3a82ad5d9f227831d370f1f1995c0ae1d
[simgrid.git] / src / gras / SG / gras_sg.c
1 /* $Id$ */
2
3 /* gras_rl - implementation of GRAS on top of the SimGrid simulator         */
4
5 /* Authors: Martin Quinson                                                  */
6 /* Copyright (C) 2003 the OURAGAN project.                                  */
7
8 /* This program is free software; you can redistribute it and/or modify it
9    under the terms of the license (GNU LGPL) which comes with this package. */
10
11 #include <stdio.h>
12 #include <string.h>
13
14 #include "gras_sg.h"
15
16 GRAS_LOG_DEFAULT_CATEGORY(GRAS);
17
18 gras_error_t
19 gras_process_init() {
20   gras_hostdata_t *hd=(gras_hostdata_t *)MSG_host_get_data(MSG_host_self());
21   grasProcessData_t *pd;
22   int i;
23   
24   if (!(pd=(grasProcessData_t *)malloc(sizeof(grasProcessData_t)))) {
25     fprintf(stderr,"grasInit: out of memory\n");
26     return malloc_error;
27   }
28   pd->grasMsgQueueLen=0;
29   pd->grasMsgQueue = NULL;
30
31   pd->grasCblListLen = 0;
32   pd->grasCblList = NULL;
33
34   if (MSG_process_set_data(MSG_process_self(),(void*)pd) != MSG_OK) {
35     return unknown_error;
36   }
37
38   if (!hd) {
39     if (!(hd=(gras_hostdata_t *)malloc(sizeof(gras_hostdata_t)))) {
40       fprintf(stderr,"grasInit: out of memory\n");
41       return malloc_error;
42     }
43     hd->portLen = 0;
44     hd->port=NULL;
45     hd->port2chan=NULL;
46     for (i=0; i<MAX_CHANNEL; i++) {
47       hd->proc[i]=0;
48     }
49
50     if (MSG_host_set_data(MSG_host_self(),(void*)hd) != MSG_OK) {
51       return unknown_error;
52     }
53   }
54   
55   /* take a free channel for this process */
56   for (i=0; i<MAX_CHANNEL && hd->proc[i]; i++);
57   if (i == MAX_CHANNEL) {
58         fprintf(stderr,
59             "GRAS: Can't add a new process on %s, because all channel are already in use. Please increase MAX CHANNEL (which is %d for now) and recompile GRAS\n.",
60             MSG_host_get_name(MSG_host_self()),MAX_CHANNEL);
61         return system_error;
62   }
63   pd->chan = i;
64   hd->proc[ i ] = MSG_process_self_PID();
65
66   /* take a free channel for this process */
67   for (i=0; i<MAX_CHANNEL && hd->proc[i]; i++);
68   if (i == MAX_CHANNEL) {
69         fprintf(stderr,
70             "GRAS: Can't add a new process on %s, because all channel are already in use. Please increase MAX CHANNEL (which is %d for now) and recompile GRAS\n.",
71             MSG_host_get_name(MSG_host_self()),MAX_CHANNEL);
72         return system_error;
73   }
74   pd->rawChan = i;
75   hd->proc[ i ] = MSG_process_self_PID();
76
77   /*
78   fprintf(stderr,"GRAS: Creating process '%s' (%d)\n",
79           MSG_process_get_name(MSG_process_self()),MSG_process_self_PID());
80   */
81   return no_error;
82 }
83
84 gras_error_t
85 gras_process_finalize() {
86   gras_hostdata_t *hd=(gras_hostdata_t *)MSG_host_get_data(MSG_host_self());
87   grasProcessData_t *pd=(grasProcessData_t *)MSG_process_get_data(MSG_process_self());
88   int myPID=MSG_process_self_PID();
89   int i;
90
91   gras_assert0(hd && pd,"Run gras_process_init!!\n");
92
93   
94   fprintf(stderr,"GRAS: Finalizing process '%s' (%d)\n",
95           MSG_process_get_name(MSG_process_self()),MSG_process_self_PID());
96   if (pd->grasMsgQueueLen) {
97     fprintf(stderr,"GRAS: Warning: process %d terminated, but some queued messages where not handled\n",MSG_process_self_PID());
98   }
99     
100   for (i=0; i< MAX_CHANNEL; i++)
101     if (myPID == hd->proc[i])
102       hd->proc[i] = 0;
103
104   for (i=0; i<hd->portLen; i++) {
105     if (hd->port2chan[ i ] == pd->chan) {
106       memmove(&(hd->port[i]),      &(hd->port[i+1]),      (hd->portLen -i -1) * sizeof(int));
107       memmove(&(hd->port2chan[i]), &(hd->port2chan[i+1]), (hd->portLen -i -1) * sizeof(int));
108       hd->portLen--;
109       i--; /* counter the effect of the i++ at the end of the iteration */
110     }
111   }
112
113   return no_error;
114 }
115 /* **************************************************************************
116  * Openning/Maintaining/Closing connexions (private functions for both raw
117  * and regular sockets)
118  * **************************************************************************/
119 gras_error_t
120 _gras_sock_server_open(unsigned short startingPort, unsigned short endingPort,
121                       int raw, unsigned int bufSize, /* OUT */ gras_sock_t **sock) {
122
123   gras_hostdata_t *hd=hd=(gras_hostdata_t *)MSG_host_get_data(MSG_host_self());
124   grasProcessData_t *pd=(grasProcessData_t *)MSG_process_get_data(MSG_process_self());
125   int port,i;
126   const char *host=MSG_host_get_name(MSG_host_self());
127
128   gras_assert0(hd,"HostData=NULL !! Please run grasInit on each process\n");
129   gras_assert0(pd,"ProcessData=NULL !! Please run grasInit on each process\n");
130
131   for (port=startingPort ; port <= endingPort ; port++) {
132     for (i=0; i<hd->portLen && hd->port[i] != port; i++);
133     if (i<hd->portLen && hd->port[i] == port)
134       continue;
135
136     /* port not used so far. Do it */
137     if (i == hd->portLen) {
138       /* need to enlarge the tables */
139       if (hd->portLen++) {
140         hd->port2chan=(int*)realloc(hd->port2chan,hd->portLen*sizeof(int));
141         hd->port     =(int*)realloc(hd->port     ,hd->portLen*sizeof(int));
142         hd->raw      =(int*)realloc(hd->raw      ,hd->portLen*sizeof(int));
143       } else {
144         hd->port2chan=(int*)malloc(hd->portLen*sizeof(int));
145         hd->port     =(int*)malloc(hd->portLen*sizeof(int));
146         hd->raw      =(int*)malloc(hd->portLen*sizeof(int));
147       }
148       if (!hd->port2chan || !hd->port || !hd->raw) {
149         fprintf(stderr,"GRAS: PANIC: A malloc error did lose all ports attribution on this host\n");
150         hd->portLen = 0;
151         return malloc_error;
152       }
153     }
154     hd->port2chan[ i ]=raw ? pd->rawChan : pd->chan;
155     hd->port[ i ]=port;
156     hd->raw[ i ]=raw;
157
158     /* Create the socket */
159     if (!(*sock=(gras_sock_t*)malloc(sizeof(gras_sock_t)))) {
160       fprintf(stderr,"GRAS: openServerSocket: out of memory\n");
161       return malloc_error;
162     }    
163     
164     (*sock)->server_sock  = 1;
165     (*sock)->raw_sock     = raw;
166     (*sock)->from_PID     = -1;
167     (*sock)->to_PID       = MSG_process_self_PID();
168     (*sock)->to_host      = MSG_host_self();
169     (*sock)->to_port      = port;  
170     (*sock)->to_chan      = pd->chan;
171
172     /*
173     fprintf(stderr,"GRAS: '%s' (%d) ears on %s:%d%s (%p).\n",
174             MSG_process_get_name(MSG_process_self()), MSG_process_self_PID(),
175             host,port,raw? " (mode RAW)":"",*sock);
176     */
177     return no_error;
178   }
179   /* if we go out of the previous for loop, that's that we didn't find any
180      suited port number */
181
182   fprintf(stderr,
183           "GRAS: can't find an empty port between %d and %d to open a socket on host %s\n.",
184           startingPort,endingPort,host);
185   return mismatch_error;
186 }
187
188 gras_error_t
189 _gras_sock_client_open(const char *host, short port, int raw, unsigned int bufSize,
190                      /* OUT */ gras_sock_t **sock) {
191   m_host_t peer;
192   gras_hostdata_t *hd;
193   int i;
194
195   /* make sure this socket will reach someone */
196   if (!(peer=MSG_get_host_by_name(host))) {
197       fprintf(stderr,"GRAS: can't connect to %s: no such host.\n",host);
198       return mismatch_error;
199   }
200   if (!(hd=(gras_hostdata_t *)MSG_host_get_data(peer))) {
201       fprintf(stderr,"GRAS: can't connect to %s: no process on this host.\n",host);
202       return mismatch_error;
203   }
204   for (i=0; i<hd->portLen && port != hd->port[i]; i++);
205   if (i == hd->portLen) {
206     fprintf(stderr,"GRAS: can't connect to %s:%d, no process listen on this port.\n",host,port);
207     return mismatch_error;
208   } 
209
210   if (hd->raw[i] && !raw) {
211     fprintf(stderr,"GRAS: can't connect to %s:%d in regular mode, the process listen in raw mode on this port.\n",host,port);
212     return mismatch_error;
213   }
214   if (!hd->raw[i] && raw) {
215     fprintf(stderr,"GRAS: can't connect to %s:%d in raw mode, the process listen in regular mode on this port.\n",host,port);
216     return mismatch_error;
217   }
218     
219
220   /* Create the socket */
221   if (!(*sock=(gras_sock_t*)malloc(sizeof(gras_sock_t)))) {
222       fprintf(stderr,"GRAS: openClientSocket: out of memory\n");
223       return malloc_error;
224   }    
225
226   (*sock)->server_sock  = 0;
227   (*sock)->raw_sock     = raw;
228   (*sock)->from_PID     = MSG_process_self_PID();
229   (*sock)->to_PID       = hd->proc[ hd->port2chan[i] ];
230   (*sock)->to_host      = peer;
231   (*sock)->to_port      = port;  
232   (*sock)->to_chan      = hd->port2chan[i];
233
234   /*
235   fprintf(stderr,"GRAS: %s (PID %d) connects in %s mode to %s:%d (to_PID=%d).\n",
236           MSG_process_get_name(MSG_process_self()), MSG_process_self_PID(),
237           raw?"RAW":"regular",host,port,(*sock)->to_PID);
238   */
239   return no_error;
240 }
241
242 gras_error_t _gras_sock_close(int raw, gras_sock_t *sd) {
243   gras_hostdata_t *hd=(gras_hostdata_t *)MSG_host_get_data(MSG_host_self());
244   int i;
245
246   gras_assert0(hd,"HostData=NULL !! Please run grasInit on each process\n");
247
248   if (!sd) return no_error;
249   if (raw && !sd->raw_sock) {
250       fprintf(stderr,"GRAS: gras_rawsock_close: Was passed a regular socket. Please use gras_sock_close()\n");
251   }
252   if (!raw && sd->raw_sock) {
253       fprintf(stderr,"GRAS: grasSockClose: Was passed a raw socket. Please use gras_rawsock_close()\n");
254   }
255   if (sd->server_sock) {
256     /* server mode socket. Un register it from 'OS' tables */
257     for (i=0; 
258          i<hd->portLen && sd->to_port != hd->port[i]; 
259          i++);
260
261     if (i==hd->portLen) {
262       fprintf(stderr,"GRAS: closeSocket: The host does not know this server socket.\n");
263     } else {
264       memmove(&(hd->port[i]),      &(hd->port[i+1]),      (hd->portLen -i -1) * sizeof(int));
265       memmove(&(hd->raw[i]),       &(hd->raw[i+1]),       (hd->portLen -i -1) * sizeof(int));
266       memmove(&(hd->port2chan[i]), &(hd->port2chan[i+1]), (hd->portLen -i -1) * sizeof(int));
267       hd->portLen--;
268     }
269   } 
270   free(sd);
271   return no_error;
272 }
273
274 /* **************************************************************************
275  * Creating/Using regular sockets
276  * **************************************************************************/
277 gras_error_t
278 gras_sock_server_open(unsigned short startingPort, unsigned short endingPort,
279                      /* OUT */ gras_sock_t **sock) {
280   return _gras_sock_server_open(startingPort,endingPort,0,0,sock);
281 }
282
283 gras_error_t
284 gras_sock_client_open(const char *host, short port,
285                      /* OUT */ gras_sock_t **sock) {
286   return _gras_sock_client_open(host,port,0,0,sock);
287 }
288
289 gras_error_t gras_sock_close(gras_sock_t *sd) {
290   return _gras_sock_close(0,sd);
291 }
292
293 unsigned short
294 gras_sock_get_my_port(gras_sock_t *sd) {
295   if (!sd || !sd->server_sock) return -1;
296   return sd->to_port;
297 }
298
299 unsigned short
300 gras_sock_get_peer_port(gras_sock_t *sd) {
301   if (!sd || sd->server_sock) return -1;
302   return sd->to_port;
303 }
304
305 char *
306 gras_sock_get_peer_name(gras_sock_t *sd) {
307   m_process_t proc;
308
309   if (!sd) return NULL;
310   if ((proc=MSG_process_from_PID(sd->to_PID))) {
311     return (char*) MSG_host_get_name(MSG_process_get_host(proc));
312   } else {
313     fprintf(stderr,"GRAS: try to access hostname of unknown process %d\n",sd->to_PID);
314     return (char*) "(dead or unknown host)";
315   }
316 }
317 /* **************************************************************************
318  * Creating/Using raw sockets
319  * **************************************************************************/
320 gras_error_t gras_rawsock_server_open(unsigned short startingPort, unsigned short endingPort,
321                                   unsigned int bufSize, gras_rawsock_t **sock) {
322   return _gras_sock_server_open(startingPort,endingPort,1,bufSize,(gras_sock_t**)sock);
323 }
324
325 gras_error_t gras_rawsock_client_open(const char *host, short port, 
326                                   unsigned int bufSize, gras_rawsock_t **sock) {
327   return _gras_sock_client_open(host,port,1,bufSize,(gras_sock_t**)sock);
328 }
329
330 gras_error_t gras_rawsock_close(gras_rawsock_t *sd) {
331   return _gras_sock_close(1,(gras_sock_t*)sd);
332 }
333
334 unsigned short
335 gras_rawsock_get_peer_port(gras_rawsock_t *sd) {
336   if (!sd || !sd->server_sock) return -1;
337   return sd->to_port;
338 }
339
340 gras_error_t
341 gras_rawsock_send(gras_rawsock_t *sock, unsigned int expSize, unsigned int msgSize) {
342   unsigned int bytesTotal;
343   static unsigned int count=0;
344   m_task_t task=NULL;
345   char name[256];
346
347   gras_assert0(sock->raw_sock,"Asked to send raw data on a regular socket\n");
348
349   for(bytesTotal = 0;
350       bytesTotal < expSize;
351       bytesTotal += msgSize) {
352     
353     sprintf(name,"Raw data[%d]",count++);
354
355     task=MSG_task_create(name,0,((double)msgSize)/(1024.0*1024.0),NULL);
356     /*
357     fprintf(stderr, "%f:%s: gras_rawsock_send(%f %s -> %s) BEGIN\n",
358             gras_time(),
359             MSG_process_get_name(MSG_process_self()),
360             ((double)msgSize)/(1024.0*1024.0),
361             MSG_host_get_name( MSG_host_self()),
362             MSG_host_get_name( sock->to_host));
363     */
364     if (MSG_task_put(task, sock->to_host,sock->to_chan) != MSG_OK) {
365       fprintf(stderr,"GRAS: msgSend: Problem during the MSG_task_put()\n");
366       return unknown_error;
367     }
368     /*fprintf(stderr, "%f:%s: gras_rawsock_send(%f -> %s) END\n",
369             gras_time(),
370             MSG_process_get_name(MSG_process_self()),
371             ((double)msgSize)/(1024.0*1024.0),
372             MSG_host_get_name( sock->to_host));*/
373   }
374   return no_error;
375 }
376
377 gras_error_t
378 gras_rawsock_recv(gras_rawsock_t *sock, unsigned int expSize, unsigned int msgSize,
379                 unsigned int timeout) {
380   grasProcessData_t *pd=(grasProcessData_t *)MSG_process_get_data(MSG_process_self());
381   unsigned int bytesTotal;
382   m_task_t task=NULL;
383   double startTime;
384
385   gras_assert0(sock->raw_sock,"Asked to receive raw data on a regular socket\n");
386
387   startTime=gras_time();
388   for(bytesTotal = 0;
389       bytesTotal < expSize;
390       bytesTotal += msgSize) {
391     
392     task=NULL;
393     /*
394     fprintf(stderr, "%f:%s: gras_rawsock_recv() BEGIN\n",
395             gras_time(),
396             MSG_process_get_name(MSG_process_self()));
397     */
398     do {
399       if (MSG_task_Iprobe((m_channel_t) pd->rawChan)) { 
400         if (MSG_task_get(&task, (m_channel_t) pd->rawChan) != MSG_OK) {
401           fprintf(stderr,"GRAS: Error in MSG_task_get()\n");
402           return unknown_error;
403         }
404         if (MSG_task_destroy(task) != MSG_OK) {
405           fprintf(stderr,"GRAS: Error in MSG_task_destroy()\n");
406           return unknown_error;
407         }
408         /*
409         fprintf(stderr, "%f:%s: gras_rawsock_recv() END\n",
410                 gras_time(),
411                 MSG_process_get_name(MSG_process_self()));
412         */
413         break;
414       } else { 
415         MSG_process_sleep(0.0001);
416       }
417     } while (gras_time() - startTime < timeout);
418
419     if (gras_time() - startTime > timeout)
420       return timeout_error;
421   }
422   return no_error;
423 }
424
425
426 /* **************************************************************************
427  * Actually exchanging messages
428  * **************************************************************************/
429
430 gras_error_t
431 grasMsgRecv(gras_msg_t **msg,
432             double timeOut) {
433
434   double startTime=gras_time();
435   grasProcessData_t *pd=grasProcessDataGet();
436   m_task_t task=NULL;
437
438   do {
439     if (MSG_task_Iprobe((m_channel_t) pd->chan)) {
440       if (MSG_task_get(&task, (m_channel_t) pd->chan) != MSG_OK) {
441         fprintf(stderr,"GRAS: Error in MSG_task_get()\n");
442         return unknown_error;
443       }
444       
445       *msg=(gras_msg_t*)MSG_task_get_data(task);
446       /*
447         { 
448         int i,j;
449         gras_msg_t *__dm_=*msg;
450         
451         fprintf(stderr, "grasMsgRecv(%s) = %d seq (",
452         __dm_->entry->name, __dm_->entry->seqCount );
453         
454         for (i=0; i<__dm_->entry->seqCount; i++) {
455         fprintf(stderr,"%d elem {",__dm_->dataCount[i]);
456         for (j=0; j<__dm_->dataCount[i]; j++) { 
457         fprintf(stderr,"%p; ",__dm_->data[i]);
458         }
459         fprintf(stderr,"},");
460         }
461         fprintf(stderr, ")\n");
462         }
463       */
464
465       if (MSG_task_destroy(task) != MSG_OK) {
466         fprintf(stderr,"GRAS: Error in MSG_task_destroy()\n");
467         return unknown_error;
468       }
469       return no_error;
470
471     } else {
472       MSG_process_sleep(1);
473     }
474   } while (gras_time()-startTime < timeOut || MSG_task_Iprobe((m_channel_t) pd->chan));
475   return timeout_error;
476 }
477
478 gras_error_t
479 gras_msg_send(gras_sock_t *sd,
480             gras_msg_t *_msg,
481             e_gras_free_directive_t freeDirective) {
482   
483   grasProcessData_t *pd=grasProcessDataGet();
484   m_task_t task;
485   static int count=0;
486   char name[256];
487   gras_msg_t *msg;
488   gras_sock_t *answer;
489
490   /* arg validity checks */
491   gras_assert0(msg,"Trying to send NULL message");
492   gras_assert0(sd ,"Trying to send over a NULL socket");
493   
494   if (!(answer=(gras_sock_t*)malloc(sizeof(gras_sock_t)))) {
495     RAISE_MALLOC;
496   }
497   answer->server_sock=0;
498   answer->raw_sock   =0;
499   answer->from_PID   = sd->to_PID;
500   answer->to_PID     = MSG_process_self_PID();
501   answer->to_host    = MSG_host_self();
502   answer->to_port    = 0;
503   answer->to_chan    = pd->chan;
504     
505   sprintf(name,"%s[%d]",_msg->entry->name,count++);
506   /* if freeDirective == free_never, we have to build a copy of the message */
507   if (freeDirective == free_never) {
508     msg=gras_msg_copy(_msg);
509   } else {
510     msg=_msg;
511   }
512   msg->sock = answer;
513
514   /*
515   fprintf(stderr,"Send %s with answer(%p)=",msg->entry->name,msg->sock);
516   fprintf(stderr,"(server=%d,raw=%d,fromPID=%d,toPID=%d,toHost=%p,toPort=%d,toChan=%d)\n",
517           msg->sock->server_sock,msg->sock->raw_sock,msg->sock->from_PID,
518           msg->sock->to_PID,msg->sock->to_host,msg->sock->to_port,msg->sock->to_chan);
519   fprintf(stderr,"Send over %p=(server=%d,raw=%d,fromPID=%d,toPID=%d,toHost=%p,toPort=%d,toChan=%d)\n",
520           sd,sd->server_sock,sd->raw_sock,sd->from_PID,sd->to_PID,sd->to_host,sd->to_port,sd->to_chan);
521   */
522
523   /*
524   { 
525     int i,j;
526     gras_msg_t *__dm_=msg;
527
528     fprintf(stderr, "gras_msg_send(%s) = %d seq (",
529             __dm_->entry->name, __dm_->entry->seqCount );
530
531     for (i=0; i<__dm_->entry->seqCount; i++) {
532       fprintf(stderr,"%d elem {",__dm_->dataCount[i]);
533       for (j=0; j<__dm_->dataCount[i]; j++) { 
534         fprintf(stderr,"%p; ",__dm_->data[i]);
535       }
536       fprintf(stderr,"},");
537     }
538     fprintf(stderr, ")\n");
539   }
540   */
541
542   /* Send it */
543   task=MSG_task_create(name,0,((double)msg->header->dataSize)/(1024.0*1024.0),msg);
544   if (MSG_task_put(task, sd->to_host,sd->to_chan) != MSG_OK) {
545     fprintf(stderr,"GRAS: msgSend: Problem during the MSG_task_put()\n");
546     return unknown_error;
547   }
548   return no_error;
549 }
550
551 gras_sock_t *gras_sock_new(void) {
552   return malloc(sizeof(gras_sock_t));
553 }
554
555 void grasSockFree(gras_sock_t *s) {
556   if (s) free (s);
557 }
558
559 /* **************************************************************************
560  * Process data
561  * **************************************************************************/
562 grasProcessData_t *grasProcessDataGet() {
563   return (grasProcessData_t *)MSG_process_get_data(MSG_process_self());
564 }
565
566 /* **************************************************************************
567  * Wrappers over OS functions
568  * **************************************************************************/
569 double gras_time() {
570   return MSG_getClock();
571 }
572
573 void gras_sleep(unsigned long sec, unsigned long usec) {
574   MSG_process_sleep((double)sec + ((double)usec)/1000000);
575 }
576