Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Adapt to lastest changes in dict: Create dicts before use
[simgrid.git] / src / nws_portability / protocol.c
1 /* $Id$ */
2
3 #include "config_portability.h"
4
5 #include <unistd.h>       /* close() pipe() read() write() */
6 #include <signal.h>       /* close() pipe() read() write() */
7 #include <netinet/in.h>   /* sometimes required for #include <arpa/inet.h> */
8 #include <netinet/tcp.h>  /* TCP_NODELAY */
9 #ifdef HAVE_INTTYPES_H
10 #       include <inttypes.h>
11 #endif
12 #include <arpa/inet.h>    /* inet_ntoa() */
13 #include <netdb.h>        /* getprotobyname() */
14 #include <sys/time.h>     /* struct timeval */
15 #include <errno.h>        /* errno */
16 #include <sys/wait.h>     /* waitpid() */
17 #include <sys/socket.h>   /* getpeername() socket() */
18 #include <stdlib.h>
19 #ifdef WITH_LDAP
20 #include <lber.h>
21 #endif
22
23 #include "diagnostic.h"
24 #include "osutil.h"
25 #include "protocol.h"
26 #include "strutil.h"
27 #include "dnsutil.h"
28 #include "timeouts.h"
29
30
31 static void *lock = NULL;               /* local mutex */
32
33 /* Global variables: they need to be protected with locks when accessed */
34 #define MAX_NOTIFIES 40
35 static SocketFunction disconnectFunctions[MAX_NOTIFIES];
36 static fd_set connectedEars;
37 static fd_set connectedPipes;
38 static fd_set connectedSockets;
39 static fd_set inUse;
40
41 /* IncomingRequest requires some care in case we have thread. I
42  * don't want multiple IncomingRequest called at the same time:
43  * if that happens you may need to rewrite your code. Just to be
44  * sure this doesn't happen I use a cheat test-and-set relying
45  * upon the global lock. It may be easier to use semaphores, but
46  * given that we don't use them anywhere else ....
47  */
48 #ifdef HAVE_PTHREAD_H
49 static short running = 0;
50 #endif
51
52 /* This is used when it's time to call CloseDisconnections(): when
53  * receiving a SIGPIPE, this flag is set to 1 so that IncomingRequest,
54  * SendBytes and RecvBytes (the functions which operates on sockets) will
55  * call CloseDisconnections to pick up the disconnected socket */
56 static short needDisconnect = 0;
57
58 #ifdef WITH_THREAD
59 extern void LockMessageSystem();
60 extern void UnlockMessageSystem();
61 #else
62 #define LockMessageSystem()
63 #define UnlockMessageSystem()
64 #endif
65
66
67 /*
68  * Beginning of connection functions.
69  */
70
71
72 static int
73 TcpProtoNumber(void);
74
75
76 /*
77  * Remove #sock# from all maintained socket sets.
78  *
79  * It should be thread safe.
80  */
81 void
82 ClearSocket(Socket sock) {
83         /* operates on global variables */
84         GetNWSLock(&lock);
85         FD_CLR(sock, &connectedPipes);
86         FD_CLR(sock, &connectedSockets);
87         FD_CLR(sock, &connectedEars);
88         /* clear also the inUse state */
89         FD_CLR(sock, &inUse);
90         ReleaseNWSLock(&lock);
91 }
92
93
94 /* It should be thread safe */
95 int
96 ConditionSocket(Socket sd) {
97         int one = 1;
98
99         if(setsockopt(sd, SOL_SOCKET, SO_KEEPALIVE, (char *)&one, sizeof(int)) < 0) {
100                 WARN("ConditionSocket: keepalive option failed\n");
101         }
102
103         if(setsockopt(sd, TcpProtoNumber(), TCP_NODELAY, (char *)&one, sizeof(int)) < 0) {
104                 WARN("ConditionSocket: couldn't set NODELAY flag\n");
105         }
106
107         return (1);
108 }
109
110
111 /*
112  * Time-out signal handler for CallAddr().
113  */
114 void
115 ConnectTimeOut(int sig) {
116         WARN("Connect timed out\n");
117 }
118
119
120 /*
121  * Notifies all registered functions that #sock# has been closed.
122  *
123  * We should lock the call ...
124  */
125 static void
126 DoDisconnectNotification(Socket sock) {
127         int i;
128
129         for(i = 0; i < MAX_NOTIFIES; i++) {
130                 if(disconnectFunctions[i] != NULL) {
131                         disconnectFunctions[i](sock);
132                 }
133         }
134 }
135
136
137 /*
138  * Time-out signal handler for RecvBytes().
139  */
140 void
141 RecvTimeOut(int sig) {
142         WARN("Send/Receive timed out\n");
143 }
144
145
146 /*
147  * Returns the tcp protocol number from the network protocol data base.
148  * 
149  * getprotobyname() is not thread safe. We need to lock it.
150  */
151 static int
152 TcpProtoNumber(void) {
153         struct protoent *fetchedEntry;
154         static int returnValue = 0;
155
156         if(returnValue == 0) {
157                 GetNWSLock(&lock);
158                 fetchedEntry = getprotobyname("tcp");
159                 if(fetchedEntry != NULL) {
160                         returnValue = fetchedEntry->p_proto;
161                 }
162                 ReleaseNWSLock(&lock);
163         }
164
165         return returnValue;
166 }
167
168
169 /* thread safe */
170 int
171 CallAddr(IPAddress addr,
172          short port,
173          Socket *sock,
174          double timeOut) {
175
176         struct sockaddr_in server; /* remote host address */
177         Socket sd;
178         double start;
179         double ltimeout = 0;
180         void (*was)(int);
181         int tmp_errno, ret = 0;
182         char *peer;
183
184         memset((char *)&server, 0, sizeof(server));
185         server.sin_addr.s_addr = addr;
186         server.sin_family = AF_INET;
187         server.sin_port = htons((u_short)port);
188
189         sd = socket(AF_INET, SOCK_STREAM, 0);
190
191         if(sd < 0) {
192                 *sock = NO_SOCKET;
193                 ERROR("CallAddr: cannot create socket to server\n");
194                 return 0;
195         }
196
197         ConditionSocket(sd);
198
199         /* set the adaptive timeout or the user selected one */
200         if (timeOut >= 0) {
201                 ltimeout = timeOut;
202         } else {
203                 /* adaptive timeouts */
204                 ltimeout = GetTimeOut(CONN, addr, 0);
205         }
206         if (ltimeout > 0) {
207                 DDEBUG1("CallAddr: setting timer to %.2f\n", ltimeout);
208                 if (SignalAlarm(ConnectTimeOut, &was) == 0) {
209                         WARN("Failed to set the alarm signal! exiting\n");
210                         return 0;
211                 }
212                 SetRealTimer((unsigned int)ltimeout);
213         }
214
215         /* let's time it */
216         start = CurrentTime();
217   
218         if(connect(sd, (struct sockaddr *)&server, sizeof(server)) < 0) {
219                 GetNWSLock(&lock);
220                 /* save a copy or errno */
221                 tmp_errno = errno;
222                 ReleaseNWSLock(&lock);
223
224                 shutdown(sd, 2);
225                 close(sd);
226
227                 /* get how long it took to get it wrong */
228                 start = CurrentTime() - start;
229
230                 if(tmp_errno == EINTR) {
231                         WARN("CallAddr: connect timed out\n");
232                 } else {
233                         ERROR1("CallAddr: connect failed (errno=%d)\n", tmp_errno);
234                 }
235                 *sock = NO_SOCKET;
236         } else {
237                 /* get how long it took */
238                 start = CurrentTime() - start;
239
240                 *sock = sd;
241
242                 /* print log message */
243                 peer = IPAddressMachine_r(addr);
244                 LOG4("CallAddr: connected socket %d to %s:%d in %.2f seconds\n", 
245                      sd, peer, port, start);
246                 FREE(peer);
247
248                 GetNWSLock(&lock);
249                 FD_SET(sd, &connectedSockets);
250                 ReleaseNWSLock(&lock);
251         
252                 /* everything is cool */
253                 ret = 1;
254         }
255
256         if (timeOut != 0) {
257                 RESETREALTIMER;
258                 SignalAlarm(was, NULL);
259                 if (timeOut < 0) {
260                         /* adaptive timeouts */
261                         SetTimeOut(CONN, addr, start, 0, (ret==0));
262                 }
263         }
264
265         return ret;
266 }
267
268
269 /* it should be thread safe (we lock up access to connected*) */
270 void
271 CloseConnections(int closeEars,
272                  int closePipes,
273                  int closeSockets) {
274         Socket dead;
275         int i, tmp;
276
277         if(closeEars) {
278                 for(i = 0; i < FD_SETSIZE; i++) {
279                         GetNWSLock(&lock);
280                         tmp = FD_ISSET(i, &connectedEars);
281                         ReleaseNWSLock(&lock);
282                         if(tmp) {
283                                 dead = i;
284                                 DROP_SOCKET(&dead);
285                         }
286                 }
287         }
288         if(closePipes) {
289                 for(i = 0; i < FD_SETSIZE; i++) {
290                         GetNWSLock(&lock);
291                         tmp = FD_ISSET(i, &connectedPipes);
292                         ReleaseNWSLock(&lock);
293                         if(tmp) {
294                                 dead = i;
295                                 DROP_SOCKET(&dead);
296                         }
297                 }
298         }
299         if(closeSockets) {
300                 for(i = 0; i < FD_SETSIZE; i++) {
301                         GetNWSLock(&lock);
302                         tmp = FD_ISSET(i, &connectedSockets);
303                         ReleaseNWSLock(&lock);
304                         if(tmp) {
305                                 dead = i;
306                                 DROP_SOCKET(&dead);
307                         }
308                 }
309         }
310 }
311
312
313 /*
314  * Returns 1 or 0 depending on whether or not #sd# is a connected socket.
315  *
316  * it should be thread safe.
317  */
318 static int
319 IsConnected(Socket sd) {
320         struct sockaddr peer_name_buff;
321
322         SOCKLEN_T peer_name_buff_size = sizeof(peer_name_buff);
323         return(getpeername(sd, &peer_name_buff, &peer_name_buff_size) >= 0);
324 }
325
326
327 /* thread safe */
328 int
329 CloseDisconnections(void) {
330         Socket dead, i;
331         int returnValue = 0, tmp;
332
333         for(i = 0; i < FD_SETSIZE; i++) {
334                 GetNWSLock(&lock);
335                 tmp = FD_ISSET(i, &connectedSockets);
336                 ReleaseNWSLock(&lock);
337                 if(tmp && !IsConnected(i)) {
338                         dead = i;
339                         DROP_SOCKET(&dead);
340                         returnValue++;
341                 }
342         }
343
344         return(returnValue);
345 }
346
347
348 /* (cross finger): thread safe */
349 int
350 CloseSocket(Socket *sock,
351             int waitForPeer) {
352
353         fd_set readFDs;
354         Socket sd = *sock;
355         struct timeval timeout;
356         int tmp_errno;
357
358         DDEBUG1("CloseSocket: Closing connection %d\n", *sock);
359
360         if(*sock == NO_SOCKET) {
361                 return 1;  /* Already closed; nothing to do. */
362         }
363
364         if(waitForPeer > 0) {
365                 FD_ZERO(&readFDs);
366                 FD_SET(sd, &readFDs);
367                 timeout.tv_sec = waitForPeer;
368                 timeout.tv_usec = 0;
369
370                 if(select(FD_SETSIZE, &readFDs, NULL, NULL, &timeout) < 0) {
371                         ERROR2("CloseSocket: no response on select %d %d\n", sd, errno);
372                         return 0;
373                 }
374         }
375
376         GetNWSLock(&lock);
377         tmp_errno = FD_ISSET(sd, &connectedPipes);
378         ReleaseNWSLock(&lock);
379         if(!tmp_errno) {
380                 if(shutdown(sd, 2) < 0) {
381                         GetNWSLock(&lock);
382                         tmp_errno = errno;
383                         ReleaseNWSLock(&lock);
384
385                         /* The other side may have beaten us to the reset. */
386                         if ((tmp_errno!=ENOTCONN) && (tmp_errno!=ECONNRESET)) {
387                                 WARN1("CloseSocket: shutdown error %d\n", tmp_errno);
388                         }
389                 }
390         }
391
392         if(close(sd) < 0) {
393                 GetNWSLock(&lock);
394                 tmp_errno = errno;
395                 ReleaseNWSLock(&lock);
396
397                 WARN2("CloseSocket: close error %d (%s)\n", tmp_errno, strerror(tmp_errno));
398         }
399
400         ClearSocket(sd);
401         DoDisconnectNotification(sd);
402         *sock = NO_SOCKET;
403
404         return(1);
405 }
406
407
408 #define READ_END 0
409 #define WRITE_END 1
410
411 int
412 CreateLocalChild(pid_t *pid,
413                  Socket *parentToChild,
414                  Socket *childToParent) {
415
416   int childWrite[2];
417   int parentWrite[2];
418   int myEnd;
419
420   if(parentToChild != NULL) {
421     if(pipe(parentWrite) == -1) {
422       FAIL1("CreateLocalChild: couldn't get pipe, errno: %d\n", errno);
423     }
424   }
425   if(childToParent != NULL) {
426     if(pipe(childWrite) == -1) {
427       if(parentToChild != NULL) {
428         close(parentWrite[0]);
429         close(parentWrite[1]);
430       }
431       FAIL1("CreateLocalChild: couldn't get pipe, errno: %d\n", errno);
432     }
433   }
434
435   *pid = fork();
436
437   if(*pid == -1) {
438     if(parentToChild != NULL) {
439       close(parentWrite[0]);
440       close(parentWrite[1]);
441     }
442     if(childToParent != NULL) {
443       close(childWrite[0]);
444       close(childWrite[1]);
445     }
446     FAIL2("CreateLocalChild: couldn't fork, errno: %d (%s)\n",
447           errno, strerror(errno));
448   }
449
450   /* Close descriptors that this process won't be using. */
451   if(parentToChild != NULL) {
452     myEnd = (*pid == 0) ? READ_END : WRITE_END;
453     close(parentWrite[1 - myEnd]);
454     FD_SET(parentWrite[myEnd], &connectedPipes);
455     *parentToChild = parentWrite[myEnd];
456   }
457
458   if(childToParent != NULL) {
459     myEnd = (*pid == 0) ? WRITE_END : READ_END;
460     close(childWrite[1 - myEnd]);
461     FD_SET(childWrite[myEnd], &connectedPipes);
462     *childToParent = childWrite[myEnd];
463   }
464
465   return(1);
466
467 }
468
469
470 /* it should be thread safe (provided that setsockopt, bind and listen
471  * are thread safe) */
472 int
473 EstablishAnEar(unsigned short startingPort,
474                unsigned short endingPort,
475                Socket *ear,
476                unsigned short *earPort) {
477
478         int k32 = 32 * 1024;
479         int on = 1;
480         unsigned short port;
481         Socket sd = NO_SOCKET;
482         struct sockaddr_in server;
483
484         for(port = startingPort; port <= endingPort; port++) {
485                 server.sin_port = htons((u_short)port);
486                 server.sin_addr.s_addr = INADDR_ANY;
487                 server.sin_family = AF_INET;
488                 if((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
489                         ERROR("EstablishAnEar: socket allocation failed\n");
490                         return 0;
491                 }
492                 (void)setsockopt(sd, SOL_SOCKET, SO_REUSEADDR, (char *)&on, sizeof(on));
493                 /* Set the socket buffer sizes to 32k, which just happens
494                  * to correspond to the most common option value for
495                  * tcpMessageMonitor activities.  This allows us to use a
496                  * client connection to conduct the experiment, rather
497                  * than needing to configure and open a new connection.
498                  * */
499                 (void)setsockopt(sd, SOL_SOCKET, SO_RCVBUF, (char *)&k32, sizeof(k32));
500                 (void)setsockopt(sd, SOL_SOCKET, SO_SNDBUF, (char *)&k32, sizeof(k32));
501                 if(bind(sd, (struct sockaddr *)&server, sizeof(server)) != -1 &&
502                                         listen(sd, 5) != -1) {
503                         break;
504                 }
505                 close(sd);
506         }
507
508         if(port > endingPort) {
509                 FAIL2("EstablishAnEar: couldn't find a port between %d and %d\n", startingPort, endingPort);
510         }
511
512         GetNWSLock(&lock);
513         FD_SET(sd, &connectedEars);
514         DDEBUG1("Openned an ear on sock %d\n",sd);
515         ReleaseNWSLock(&lock);
516
517         *ear = sd;
518         *earPort = port;
519
520         DDEBUG1("EstablistAnEar: connected socket %d\n", sd);
521
522         return(1);
523 }
524
525
526 /* thread safe ... hopefully. We #ifdef HAVE_THREAD_H for performance
527  * reasons when threads are not required. */
528 int
529 IncomingRequest(double timeOut,
530                 Socket *sd,
531                 int *ldap) {
532
533         Socket dead;
534         Socket i;
535         char lookahead;
536         Socket newSock;
537         double now;
538         struct sockaddr_in peer_in;
539         SOCKLEN_T peer_in_len = sizeof(peer_in);
540         fd_set readFds;
541         struct timeval tout;
542         double wakeup;
543         int tmp_errno, done = -1;
544
545         /* nextToService is used to make sure that every connection gets
546          * a chance to be serviced.  If we always started checking at 0,
547          * very active connections with small descriptor numbers could
548          * starve larger-descriptor connections.  */
549         /* static Socket nextToService = 0; */
550         /* Obi: I don't use the "static" approach since may be trouble
551          * with thread. Instead of locking I use the CurrentTime() to
552          * (hopefully) start from a different place each time */
553         Socket nextToService = ((int)CurrentTime() % FD_SETSIZE);
554
555         /* let's check we are the only one running here ... */
556 #ifdef HAVE_PTHREAD_H
557         GetNWSLock(&lock);
558         if (running != 0) {
559                 ReleaseNWSLock(&lock);
560                 ERROR("IncomingRequest: another instance is running!\n");
561                 return 0;
562         }
563         running = 1;
564         ReleaseNWSLock(&lock);
565 #endif
566
567         *sd = NO_SOCKET;
568         tout.tv_usec = 0;
569         wakeup = CurrentTime() + timeOut;
570
571         while (done == -1) {
572
573                 /* let's see if we need to check disconnected socket */
574                 if (needDisconnect == 1) {
575                         needDisconnect = 0;
576                         (void)CloseDisconnections();
577                 }
578
579                 /* is the timeout expired? */
580                 now = CurrentTime();
581                 if (now == -1 || now >= wakeup) {
582                         if (now == -1) {
583                                 WARN("IncomingRequest: time() failed.\n");
584                         }
585                         done = 0;       /* didn't find anything */
586                         break;          /* let's get out of here */
587                 }
588
589                 /* Construct in readFds the union of connected ears,
590                  * pipes, and sockets. */
591                 /* connected* are global variables and even though we
592                  * only read them we play safe and we lock */
593                 FD_ZERO(&readFds);
594                 GetNWSLock(&lock);
595                 for(i = 0; i < FD_SETSIZE; i++) {
596                         if (FD_ISSET(i, &connectedSockets) && !FD_ISSET(i, &inUse)) {
597                                 FD_SET(i, &readFds);
598                         }
599                         if(FD_ISSET(i, &connectedPipes)) {
600                                 FD_SET(i, &readFds);
601                         }
602                         if(FD_ISSET(i, &connectedEars)) {
603                                 FD_SET(i, &readFds);
604                         }
605                 }
606                 ReleaseNWSLock(&lock);
607
608                 /* set the timeout */
609                 tout.tv_sec = (unsigned int)wakeup - (unsigned int)now;
610                 tout.tv_usec = 0;
611
612                 tmp_errno = select(FD_SETSIZE, &readFds, NULL, NULL, &tout);
613                 if (tmp_errno == -1) {
614                         /* save the errno value */
615                         GetNWSLock(&lock);
616                         tmp_errno = errno;
617                         ReleaseNWSLock(&lock);
618
619                         /* EINTR we have to go ahead and retry: nothing
620                          * to do here */
621                         if(tmp_errno == EINTR) {
622                                 continue;
623                         } else if(tmp_errno == EINVAL) {
624                                 /* we blew it somehow -- (osf likely) */
625                                 /* can be because (from man page):
626                                 [EINVAL] The time limit specified by the
627                                 timeout parameter is invalid.  The nfds
628                                 parameter is less than 0, or greater than
629                                 or equal to FD_SETSIZE.  One of the
630                                 specified file descriptors refers to a
631                                 STREAM or multiplexer that is linked
632                                 (directly or indirectly) downstream from
633                                 a multiplexer.  */
634
635                                 ERROR4("IncomingRequest: invalid select - nfds: %d, rfds: %d, timeout: %d.%d", FD_SETSIZE, readFds, tout.tv_sec, tout.tv_usec);
636                         } else {
637                                 ERROR1("IncomingRequest: select error %d\n", tmp_errno);
638                         }
639                         done = 0;       /* didn't find anything */
640                         break;                   /* done here */
641                 } else if (tmp_errno == 0) {
642                         /* this was a timeout */
643                         continue;
644                 } 
645
646                 /* let's find out what socket is available and for what */
647
648                 /* we do an all around loop to check all sockets
649                  * starting from nextToService until we have one
650                  * to service */
651                 for (i = -1; i != nextToService; i=(i+1) % FD_SETSIZE) {
652                         if (i == -1) {
653                                 /* first time around */
654                                 i = nextToService;
655                         }
656
657                         if(!FD_ISSET(i, &readFds)) {
658                                 /* nothing to do here */
659                                 continue;
660                         }
661
662                         if(FD_ISSET(i, &connectedEars)) {
663                                 /* Add the new connection
664                                  * to connectedSockets. */
665                                 newSock = accept(i, (struct sockaddr *)&peer_in, &peer_in_len);
666                                 if(newSock == -1) {
667                                         SocketFailure(SIGPIPE);
668                                 } else {
669                                         char *peer;
670
671                                         ConditionSocket(newSock);
672                                         peer = PeerName_r(newSock);
673
674                                         DDEBUG2("IncomingRequest: connected socket %d to %s\n", newSock, peer);
675                                         FREE(peer);
676
677                                         /* operating on a global variable */
678                                         GetNWSLock(&lock);
679                                         FD_SET(newSock, &connectedSockets);
680                                         ReleaseNWSLock(&lock);
681                                 }
682                         } else if(FD_ISSET(i, &connectedPipes)) {
683                                 /* we found a good one */
684                                 *sd = i;
685                                 *ldap = 0;
686                                 done = 1;
687                                 break;
688                         } else {
689                                 /* Existing socket connection. */
690                                 if(recv(i, &lookahead, 1, MSG_PEEK) > 0) {
691                                         *sd = i;
692 #ifdef WITH_LDAP
693                                         *ldap = ((int) lookahead == LBER_SEQUENCE);
694 #else
695                                         *ldap = 0;
696 #endif
697 #ifdef HAVE_PTHREAD_H
698                                         /* the socket is in use:
699                                          * client needs to call
700                                          * SocketIsAvailable to
701                                          * free it */
702                                         /* it only makes sense in
703                                          * a threaded environment */
704                                         GetNWSLock(&lock);
705                                         FD_SET(i, &inUse);
706                                         ReleaseNWSLock(&lock);
707 #endif
708                                         done = 1;
709                                         break;
710                                 } else {
711                                         /* This is how we find
712                                          * out about connections
713                                          * closed by a peer.
714                                          * Drop it from our list
715                                          * of known connections.
716                                          */
717                                         DDEBUG1("IncomingRequest: Dropping closed connection %d\n", i);
718
719                                         dead = i;
720                                         DROP_SOCKET(&dead);
721                                 }
722                         }
723                 }
724         }
725
726         /* done */
727 #ifdef HAVE_PTHREAD_H
728         GetNWSLock(&lock);
729         running = 0;
730         ReleaseNWSLock(&lock);
731 #endif
732
733         return done;
734 }
735
736
737 /* thread safe */
738 int 
739 SocketIsAvailable(Socket sd) {
740         int ret = 1;
741
742         /* sanity check */
743         if (sd < 0) {
744                 WARN("SocketIsAvailable: socket is negative\n");
745                 return 0;
746         }
747
748         /* check if the socket is in connectedSockets and is in use */
749         GetNWSLock(&lock);
750         FD_CLR(sd, &inUse);
751         ReleaseNWSLock(&lock);
752
753         return ret;
754 }
755
756 /* thread safe */
757 int
758 SocketInUse(Socket sd) {
759
760         /* sanity check */
761         if (sd < 0) {
762                 WARN("SocketInUse: socket is negative\n");
763                 return 0;
764         }
765
766         GetNWSLock(&lock);
767         FD_SET(sd, &inUse);
768         ReleaseNWSLock(&lock);
769
770         return 1;
771 }
772
773 /* thread safe */
774 int
775 IsOkay(Socket sd) {
776         fd_set readFds;
777         fd_set writeFds;
778         struct timeval timeout;
779
780         if(sd < 0) {
781                 return 0;
782         }
783
784         FD_ZERO(&readFds);
785         FD_ZERO(&writeFds);
786         FD_SET(sd, &readFds);
787         FD_SET(sd, &writeFds);
788         timeout.tv_sec  = GetTimeOut(SEND, Peer(sd), 1);
789         timeout.tv_usec = 0;
790
791         return(select(FD_SETSIZE, NULL, &writeFds, NULL, &timeout) == 1);
792
793 }
794
795
796 /* thread safe */
797 void
798 NotifyOnDisconnection(SocketFunction notifyFn) {
799         int i;
800
801         /* operating on global variables */
802         GetNWSLock(&lock);
803         for(i = 0; i < MAX_NOTIFIES; i++) {
804                 if(disconnectFunctions[i] == NULL) {
805                         disconnectFunctions[i] = notifyFn;
806                         break;
807                 }
808         }
809         ReleaseNWSLock(&lock);
810 }
811
812
813 #define MAXPASSES 40
814 static pid_t passedPids[MAXPASSES];
815 static Socket passedSockets[MAXPASSES];
816
817
818 int
819 PassSocket(Socket *sock,
820            pid_t child) {
821
822   int i, childStat;
823
824   /* Clean up any sockets previously passed to children who have exited. */
825   for(i = 0; i < MAXPASSES; i++) {
826     if(passedPids[i] != 0) {
827       if((waitpid(passedPids[i], &childStat, WNOHANG) < 0) ||
828          WIFEXITED(childStat)) {
829         LOG1("PassSocket: Reclaiming connection %d\n", passedSockets[i]);
830         (void)shutdown(passedSockets[i], 2);
831         (void)close(passedSockets[i]);
832         DoDisconnectNotification(passedSockets[i]);
833         passedPids[i] = 0;
834       }
835     }
836   }
837
838   /* Record this socket in passedSockets and remove all other memory of it. */
839   for(i = 0; i < MAXPASSES; i++) {
840     if(passedPids[i] == 0) {
841       LOG2("PassSocket: Passing connection %d to %d\n", *sock, child);
842       passedPids[i] = child;
843       passedSockets[i] = *sock;
844       ClearSocket(*sock);
845       *sock = NO_SOCKET;
846       return(1);
847     }
848   }
849
850   return(0);
851
852 }
853
854
855 /* here to be used by dnsutil.c (GetPeerName) */
856 int
857 IsPipe(Socket sd) {
858         int ret = 0;
859
860         GetNWSLock(&lock);
861         if (FD_ISSET(sd, &connectedPipes)) {
862                 ret = 1;
863         }
864         ReleaseNWSLock(&lock);
865
866         return ret;
867 }
868         
869 int
870 RecvBytes(Socket sd,
871           void *bytes,
872           size_t byteSize,
873           double timeOut) {
874
875         double start = 0, myTimeOut = 0;
876         int isPipe, tmp, done=1;
877         char *nextByte;
878         fd_set readFds;
879         int recvd = 0, totalRecvd=0;
880         struct timeval tout, *tv;
881         void (*was)(int);
882
883         /* let's see if we need to check disconnected socket */
884         if (needDisconnect == 1) {
885                 needDisconnect = 0;
886                 (void)CloseDisconnections();
887         }
888
889         /* sanity check */
890         if (sd < 0 || bytes == NULL) {
891                 WARN("RecvBytes: parameters out of range!\n");
892                 return 0;
893         }
894
895         /* connectedPipes is global */
896         GetNWSLock(&lock);
897         isPipe = FD_ISSET(sd, &connectedPipes);
898         ReleaseNWSLock(&lock);
899
900         FD_ZERO(&readFds);
901         FD_SET(sd, &readFds);
902
903         /* select the adaptive timeouts or the passed in one. */
904         if (timeOut > 0) {
905                 myTimeOut = (int)timeOut;
906                 if (SignalAlarm(RecvTimeOut, &was) == 0) {
907                         WARN("Failed to set the alarm signal! Exiting\n");
908                         exit(1);
909                 }
910                 /* let's start the clock */
911                 start = CurrentTime();
912         }
913
914         for(nextByte=(char*)bytes; totalRecvd < byteSize; totalRecvd += recvd) {
915                 recvd = 0;
916                 UnlockMessageSystem();
917
918                 /* set the timeout if requested by the user */
919                 if (timeOut > 0) {
920                         myTimeOut = timeOut - (CurrentTime() - start);
921                         if (myTimeOut < 0) {
922                                 done = 0;
923                                 break;
924                         }
925                         tout.tv_usec = 0;
926                         tout.tv_sec = (int) myTimeOut;
927                         tv = &tout;
928                 } else {
929                         /* 0 is the special flag for don't use touts */
930                         tv = NULL;
931                 }
932
933                 tmp =  select(FD_SETSIZE, &readFds, NULL, NULL, tv);
934                 if (tmp == -1) {
935                         LockMessageSystem();
936
937                         /* just in case another call modify errno */
938                         GetNWSLock(&lock);
939                         tmp = errno;
940                         ReleaseNWSLock(&lock);
941
942                         /* if interrupted, let's try again */
943                         if(tmp == EINTR) {
944                                 continue;
945                         }
946
947                         ERROR2("RecvBytes: select on %d failed (%s)\n", sd, strerror(tmp));
948                         done = 0;
949                         break;
950                 } else if (tmp == 0) {
951                         LockMessageSystem();
952
953                         /* timed out */
954                         ERROR1("RecvBytes: Socket %d timed out\n", sd);
955
956                         done = 0;
957                         break;
958                 }
959
960                 /* let's read the data */
961                 if (timeOut > 0) {
962                         myTimeOut = timeOut - (CurrentTime() - start);
963                         /* should always be > 0 since we didn't
964                          * timed out on select */
965                         if (myTimeOut > 0) {
966                                 SetRealTimer((unsigned int)myTimeOut);
967                         } else {
968                                 ERROR1("RecvBytes: trying to set negative timeout on socket %d\n", sd);
969                                 done = 0;
970                                 break;
971                         }
972                 }
973
974                 if (isPipe) {
975                         /* pipe */
976                         recvd = read(sd, nextByte, byteSize - totalRecvd);
977                 } else {
978                         /* socket */
979                         recvd = recv(sd, nextByte, byteSize - totalRecvd, 0);
980                 }
981                 /* just in case another call modify errno */
982                 GetNWSLock(&lock);
983                 tmp = errno;
984                 ReleaseNWSLock(&lock);
985
986                 if (timeOut > 0) {
987                         RESETREALTIMER;
988                 }
989                 LockMessageSystem();
990
991                 if(recvd <= 0) {
992                         WARN2("RecvBytes: read failed errno:%d, %s\n", tmp, strerror(tmp));
993                         ERROR3("RecvBytes: socket %d failed after %d of %d bytes\n", sd, totalRecvd, byteSize);
994                         done = 0;
995                         break;
996                 }
997                 nextByte += recvd;
998         }
999
1000         /* resetting the sigalarm */
1001         if (timeOut > 0) {
1002                 SignalAlarm(was, NULL);
1003         }
1004         
1005         return totalRecvd;
1006 }
1007
1008
1009 /* it should be thread safe. */
1010 int
1011 SendBytes(      Socket sd,
1012                 const void *bytes,
1013                 size_t byteSize,
1014                 double timeOut) {
1015
1016         char *nextByte;
1017         int sent =0, totalSent = 0;
1018         int isPipe, tmp, done = 1;
1019         struct timeval tout, *tv;
1020         double start, myTimeOut = 0;
1021         void (*was)(int);
1022
1023
1024         /* let's see if we need to check disconnected socket */
1025         if (needDisconnect == 1) {
1026                 needDisconnect = 0;
1027                 (void)CloseDisconnections();
1028         }
1029
1030         /* connectedPipes is global */
1031         GetNWSLock(&lock);
1032         isPipe = FD_ISSET(sd, &connectedPipes);
1033         ReleaseNWSLock(&lock);
1034
1035         /* select the adaptive timeouts or the passed in one. */
1036         if (timeOut > 0) {
1037                 myTimeOut = (double) timeOut;
1038                 if (SignalAlarm(RecvTimeOut, &was) == 0) {
1039                         ERROR("Failed to set the alarm signal! Exiting\n");
1040                         exit(1);
1041                 }
1042         }
1043
1044         /* let's start the timer */
1045         start = CurrentTime();
1046
1047         for(nextByte = (char*)bytes; totalSent < byteSize; totalSent += sent) {
1048                 UnlockMessageSystem();
1049
1050                 /* set the timeout, and if we timed out get out */
1051                 if (timeOut > 0) {
1052                         /* 0 is the special flag for don't use touts */
1053                         myTimeOut = timeOut - (CurrentTime() - start);
1054                         if (myTimeOut < 0) {
1055                                 done = 0;
1056                                 break;
1057                         } 
1058                         tout.tv_usec = 0;
1059                         tout.tv_sec = (int) myTimeOut;
1060                         tv = &tout;
1061                         SetRealTimer((unsigned int)myTimeOut);
1062                 } 
1063
1064                 if (isPipe) {
1065                         /* pipe */
1066                         sent = write(sd, nextByte, byteSize - totalSent);
1067                 } else {
1068                         /* socket */
1069                         sent = send(sd, nextByte, byteSize - totalSent, 0);
1070                 }
1071
1072                 /* errno could be modified by another thread */
1073                 GetNWSLock(&lock);
1074                 tmp = errno;
1075                 ReleaseNWSLock(&lock);
1076
1077                 if (timeOut > 0) {
1078                         RESETREALTIMER;
1079                 }
1080
1081                 LockMessageSystem();
1082                 if(sent <= 0) {
1083                         ERROR3("SendBytes: send on socket %d failed (errno=%d %s)\n", sd, tmp, strerror(tmp));
1084                         done = 0;
1085                         break;
1086                 }
1087                 nextByte += sent;
1088         }
1089
1090         /* reset sigalalrm */
1091         if (timeOut > 0) {
1092                 SignalAlarm(was, NULL);
1093         }
1094
1095         return done;
1096 }
1097
1098
1099 void
1100 SocketFailure(int sig) {
1101         HoldSignal(SIGPIPE);
1102         needDisconnect = 1;
1103         if(signal(SIGPIPE, SocketFailure) == SIG_ERR) {
1104                 WARN("SocketFailure: error resetting signal\n");
1105         }
1106         ReleaseSignal(SIGPIPE);
1107 }