3 #include "config_portability.h"
5 #include <unistd.h> /* close() pipe() read() write() */
6 #include <signal.h> /* signal() SIGPIPE */
7 #include <netinet/in.h> /* sometimes required for #include <arpa/inet.h> */
8 #include <netinet/tcp.h> /* TCP_NODELAY */
10 # include <inttypes.h>
12 #include <arpa/inet.h> /* inet_ntoa() */
13 #include <netdb.h> /* getprotobyname() */
14 #include <sys/time.h> /* struct timeval */
15 #include <errno.h> /* errno */
16 #include <sys/wait.h> /* waitpid() */
17 #include <sys/socket.h> /* getpeername() socket() */
23 #include "diagnostic.h"
31 static void *lock = NULL; /* local mutex */
33 /* Global variables: they need to be protected with locks when accessed */
34 #define MAX_NOTIFIES 40
35 static SocketFunction disconnectFunctions[MAX_NOTIFIES];
36 static fd_set connectedEars;
37 static fd_set connectedPipes;
38 static fd_set connectedSockets;
41 /* IncomingRequest requires some care when threads are in use. I
42 * don't want multiple IncomingRequest calls running at the same time:
43 * if that happens you may need to rewrite your code. Just to be
44 * sure this doesn't happen I use a cheat test-and-set that relies
45 * upon the global lock. It may be easier to use semaphores, but
46 * given that we don't use them anywhere else ....
49 static short running = 0;
52 /* This is used when it's time to call CloseDisconnections(): when
53 * receiving a SIGPIPE, this flag is set to 1 so that IncomingRequest,
54 * SendBytes and RecvBytes (the functions which operate on sockets) will
55 * call CloseDisconnections to pick up the disconnected socket */
56 static short needDisconnect = 0;
59 extern void LockMessageSystem();
60 extern void UnlockMessageSystem();
62 #define LockMessageSystem()
63 #define UnlockMessageSystem()
68 * Beginning of connection functions.
77 * Remove #sock# from all maintained socket sets.
79 * It should be thread safe.
82 ClearSocket(Socket sock) {
83 /* operates on global variables */
85 FD_CLR(sock, &connectedPipes);
86 FD_CLR(sock, &connectedSockets);
87 FD_CLR(sock, &connectedEars);
88 /* clear also the inUse state */
90 ReleaseNWSLock(&lock);
94 /* It should be thread safe */
96 ConditionSocket(Socket sd) {
99 if(setsockopt(sd, SOL_SOCKET, SO_KEEPALIVE, (char *)&one, sizeof(int)) < 0) {
100 WARN("ConditionSocket: keepalive option failed\n");
103 if(setsockopt(sd, TcpProtoNumber(), TCP_NODELAY, (char *)&one, sizeof(int)) < 0) {
104 WARN("ConditionSocket: couldn't set NODELAY flag\n");
112 * Time-out signal handler for CallAddr().
115 ConnectTimeOut(int sig) {
116 WARN("Connect timed out\n");
121 * Notifies all registered functions that #sock# has been closed.
123 * We should lock the call ...
126 DoDisconnectNotification(Socket sock) {
129 for(i = 0; i < MAX_NOTIFIES; i++) {
130 if(disconnectFunctions[i] != NULL) {
131 disconnectFunctions[i](sock);
138 * Time-out signal handler for RecvBytes() and SendBytes().
141 RecvTimeOut(int sig) {
142 WARN("Send/Receive timed out\n");
147 * Returns the tcp protocol number from the network protocol data base.
149 * getprotobyname() is not thread safe. We need to lock it.
152 TcpProtoNumber(void) {
153 struct protoent *fetchedEntry;
154 static int returnValue = 0;
156 if(returnValue == 0) {
158 fetchedEntry = getprotobyname("tcp");
159 if(fetchedEntry != NULL) {
160 returnValue = fetchedEntry->p_proto;
162 ReleaseNWSLock(&lock);
171 CallAddr(IPAddress addr,
176 struct sockaddr_in server; /* remote host address */
181 int tmp_errno, ret = 0;
184 memset((char *)&server, 0, sizeof(server));
185 server.sin_addr.s_addr = addr;
186 server.sin_family = AF_INET;
187 server.sin_port = htons((u_short)port);
189 sd = socket(AF_INET, SOCK_STREAM, 0);
193 ERROR("CallAddr: cannot create socket to server\n");
199 /* set the adaptive timeout or the user selected one */
203 /* adaptive timeouts */
204 ltimeout = GetTimeOut(CONN, addr, 0);
207 DDEBUG1("CallAddr: setting timer to %.2f\n", ltimeout);
208 if (SignalAlarm(ConnectTimeOut, &was) == 0) {
209 WARN("Failed to set the alarm signal! exiting\n");
212 SetRealTimer((unsigned int)ltimeout);
216 start = CurrentTime();
218 if(connect(sd, (struct sockaddr *)&server, sizeof(server)) < 0) {
220 /* save a copy of errno */
222 ReleaseNWSLock(&lock);
227 /* get how long it took to get it wrong */
228 start = CurrentTime() - start;
230 if(tmp_errno == EINTR) {
231 WARN("CallAddr: connect timed out\n");
233 ERROR1("CallAddr: connect failed (errno=%d)\n", tmp_errno);
237 /* get how long it took */
238 start = CurrentTime() - start;
242 /* print log message */
243 peer = IPAddressMachine_r(addr);
244 LOG4("CallAddr: connected socket %d to %s:%d in %.2f seconds\n",
245 sd, peer, port, start);
249 FD_SET(sd, &connectedSockets);
250 ReleaseNWSLock(&lock);
252 /* everything is cool */
258 SignalAlarm(was, NULL);
260 /* adaptive timeouts */
261 SetTimeOut(CONN, addr, start, 0, (ret==0));
269 /* it should be thread safe (we lock up access to connected*) */
271 CloseConnections(int closeEars,
278 for(i = 0; i < FD_SETSIZE; i++) {
280 tmp = FD_ISSET(i, &connectedEars);
281 ReleaseNWSLock(&lock);
289 for(i = 0; i < FD_SETSIZE; i++) {
291 tmp = FD_ISSET(i, &connectedPipes);
292 ReleaseNWSLock(&lock);
300 for(i = 0; i < FD_SETSIZE; i++) {
302 tmp = FD_ISSET(i, &connectedSockets);
303 ReleaseNWSLock(&lock);
314 * Returns 1 or 0 depending on whether or not #sd# is a connected socket.
316 * it should be thread safe.
319 IsConnected(Socket sd) {
320 struct sockaddr peer_name_buff;
322 SOCKLEN_T peer_name_buff_size = sizeof(peer_name_buff);
323 return(getpeername(sd, &peer_name_buff, &peer_name_buff_size) >= 0);
329 CloseDisconnections(void) {
331 int returnValue = 0, tmp;
333 for(i = 0; i < FD_SETSIZE; i++) {
335 tmp = FD_ISSET(i, &connectedSockets);
336 ReleaseNWSLock(&lock);
337 if(tmp && !IsConnected(i)) {
348 /* (fingers crossed): thread safe */
350 CloseSocket(Socket *sock,
355 struct timeval timeout;
358 DDEBUG1("CloseSocket: Closing connection %d\n", *sock);
360 if(*sock == NO_SOCKET) {
361 return 1; /* Already closed; nothing to do. */
364 if(waitForPeer > 0) {
366 FD_SET(sd, &readFDs);
367 timeout.tv_sec = waitForPeer;
370 if(select(FD_SETSIZE, &readFDs, NULL, NULL, &timeout) < 0) {
371 ERROR2("CloseSocket: no response on select %d %d\n", sd, errno);
377 tmp_errno = FD_ISSET(sd, &connectedPipes);
378 ReleaseNWSLock(&lock);
380 if(shutdown(sd, 2) < 0) {
383 ReleaseNWSLock(&lock);
385 /* The other side may have beaten us to the reset. */
386 if ((tmp_errno!=ENOTCONN) && (tmp_errno!=ECONNRESET)) {
387 WARN1("CloseSocket: shutdown error %d\n", tmp_errno);
395 ReleaseNWSLock(&lock);
397 WARN2("CloseSocket: close error %d (%s)\n", tmp_errno, strerror(tmp_errno));
401 DoDisconnectNotification(sd);
412 CreateLocalChild(pid_t *pid,
413 Socket *parentToChild,
414 Socket *childToParent) {
420 if(parentToChild != NULL) {
421 if(pipe(parentWrite) == -1) {
422 FAIL1("CreateLocalChild: couldn't get pipe, errno: %d\n", errno);
425 if(childToParent != NULL) {
426 if(pipe(childWrite) == -1) {
427 if(parentToChild != NULL) {
428 close(parentWrite[0]);
429 close(parentWrite[1]);
431 FAIL1("CreateLocalChild: couldn't get pipe, errno: %d\n", errno);
438 if(parentToChild != NULL) {
439 close(parentWrite[0]);
440 close(parentWrite[1]);
442 if(childToParent != NULL) {
443 close(childWrite[0]);
444 close(childWrite[1]);
446 FAIL2("CreateLocalChild: couldn't fork, errno: %d (%s)\n",
447 errno, strerror(errno));
450 /* Close descriptors that this process won't be using. */
451 if(parentToChild != NULL) {
452 myEnd = (*pid == 0) ? READ_END : WRITE_END;
453 close(parentWrite[1 - myEnd]);
454 FD_SET(parentWrite[myEnd], &connectedPipes);
455 *parentToChild = parentWrite[myEnd];
458 if(childToParent != NULL) {
459 myEnd = (*pid == 0) ? WRITE_END : READ_END;
460 close(childWrite[1 - myEnd]);
461 FD_SET(childWrite[myEnd], &connectedPipes);
462 *childToParent = childWrite[myEnd];
470 /* it should be thread safe (provided that setsockopt, bind and listen
471 * are thread safe) */
473 EstablishAnEar(unsigned short startingPort,
474 unsigned short endingPort,
476 unsigned short *earPort) {
481 Socket sd = NO_SOCKET;
482 struct sockaddr_in server;
484 for(port = startingPort; port <= endingPort; port++) {
485 server.sin_port = htons((u_short)port);
486 server.sin_addr.s_addr = INADDR_ANY;
487 server.sin_family = AF_INET;
488 if((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
489 ERROR("EstablishAnEar: socket allocation failed\n");
492 (void)setsockopt(sd, SOL_SOCKET, SO_REUSEADDR, (char *)&on, sizeof(on));
493 /* Set the socket buffer sizes to 32k, which just happens
494 * to correspond to the most common option value for
495 * tcpMessageMonitor activities. This allows us to use a
496 * client connection to conduct the experiment, rather
497 * than needing to configure and open a new connection.
499 (void)setsockopt(sd, SOL_SOCKET, SO_RCVBUF, (char *)&k32, sizeof(k32));
500 (void)setsockopt(sd, SOL_SOCKET, SO_SNDBUF, (char *)&k32, sizeof(k32));
501 if(bind(sd, (struct sockaddr *)&server, sizeof(server)) != -1 &&
502 listen(sd, 5) != -1) {
508 if(port > endingPort) {
509 FAIL2("EstablishAnEar: couldn't find a port between %d and %d\n", startingPort, endingPort);
513 FD_SET(sd, &connectedEars);
514 DDEBUG1("Openned an ear on sock %d\n",sd);
515 ReleaseNWSLock(&lock);
520 DDEBUG1("EstablistAnEar: connected socket %d\n", sd);
526 /* thread safe ... hopefully. We #ifdef HAVE_PTHREAD_H for performance
527 * reasons when threads are not required. */
529 IncomingRequest(double timeOut,
538 struct sockaddr_in peer_in;
539 SOCKLEN_T peer_in_len = sizeof(peer_in);
543 int tmp_errno, done = -1;
545 /* nextToService is used to make sure that every connection gets
546 * a chance to be serviced. If we always started checking at 0,
547 * very active connections with small descriptor numbers could
548 * starve larger-descriptor connections. */
549 /* static Socket nextToService = 0; */
550 /* Obi: I don't use the "static" approach since it may cause trouble
551 * with threads. Instead of locking I use CurrentTime() to
552 * (hopefully) start from a different place each time */
553 Socket nextToService = ((int)CurrentTime() % FD_SETSIZE);
555 /* let's check we are the only one running here ... */
556 #ifdef HAVE_PTHREAD_H
559 ReleaseNWSLock(&lock);
560 ERROR("IncomingRequest: another instance is running!\n");
564 ReleaseNWSLock(&lock);
569 wakeup = CurrentTime() + timeOut;
573 /* let's see if we need to check disconnected socket */
574 if (needDisconnect == 1) {
576 (void)CloseDisconnections();
579 /* is the timeout expired? */
581 if (now == -1 || now >= wakeup) {
583 WARN("IncomingRequest: time() failed.\n");
585 done = 0; /* didn't find anything */
586 break; /* let's get out of here */
589 /* Construct in readFds the union of connected ears,
590 * pipes, and sockets. */
591 /* connected* are global variables and even though we
592 * only read them we play safe and we lock */
595 for(i = 0; i < FD_SETSIZE; i++) {
596 if (FD_ISSET(i, &connectedSockets) && !FD_ISSET(i, &inUse)) {
599 if(FD_ISSET(i, &connectedPipes)) {
602 if(FD_ISSET(i, &connectedEars)) {
606 ReleaseNWSLock(&lock);
608 /* set the timeout */
609 tout.tv_sec = (unsigned int)wakeup - (unsigned int)now;
612 tmp_errno = select(FD_SETSIZE, &readFds, NULL, NULL, &tout);
613 if (tmp_errno == -1) {
614 /* save the errno value */
617 ReleaseNWSLock(&lock);
619 /* EINTR we have to go ahead and retry: nothing
621 if(tmp_errno == EINTR) {
623 } else if(tmp_errno == EINVAL) {
624 /* we blew it somehow -- (osf likely) */
625 /* can be because (from man page):
626 [EINVAL] The time limit specified by the
627 timeout parameter is invalid. The nfds
628 parameter is less than 0, or greater than
629 or equal to FD_SETSIZE. One of the
630 specified file descriptors refers to a
631 STREAM or multiplexer that is linked
632 (directly or indirectly) downstream from
635 ERROR4("IncomingRequest: invalid select - nfds: %d, rfds: %d, timeout: %d.%d", FD_SETSIZE, readFds, tout.tv_sec, tout.tv_usec);
637 ERROR1("IncomingRequest: select error %d\n", tmp_errno);
639 done = 0; /* didn't find anything */
640 break; /* done here */
641 } else if (tmp_errno == 0) {
642 /* this was a timeout */
646 /* let's find out what socket is available and for what */
648 /* we do an all around loop to check all sockets
649 * starting from nextToService until we have one
651 for (i = -1; i != nextToService; i=(i+1) % FD_SETSIZE) {
653 /* first time around */
657 if(!FD_ISSET(i, &readFds)) {
658 /* nothing to do here */
662 if(FD_ISSET(i, &connectedEars)) {
663 /* Add the new connection
664 * to connectedSockets. */
665 newSock = accept(i, (struct sockaddr *)&peer_in, &peer_in_len);
667 SocketFailure(SIGPIPE);
671 ConditionSocket(newSock);
672 peer = PeerName_r(newSock);
674 DDEBUG2("IncomingRequest: connected socket %d to %s\n", newSock, peer);
677 /* operating on a global variable */
679 FD_SET(newSock, &connectedSockets);
680 ReleaseNWSLock(&lock);
682 } else if(FD_ISSET(i, &connectedPipes)) {
683 /* we found a good one */
689 /* Existing socket connection. */
690 if(recv(i, &lookahead, 1, MSG_PEEK) > 0) {
693 *ldap = ((int) lookahead == LBER_SEQUENCE);
697 #ifdef HAVE_PTHREAD_H
698 /* the socket is in use:
699 * client needs to call
700 * SocketIsAvailable to
702 /* it only makes sense in
703 * a threaded environment */
706 ReleaseNWSLock(&lock);
711 /* This is how we find
712 * out about connections
714 * Drop it from our list
715 * of known connections.
717 DDEBUG1("IncomingRequest: Dropping closed connection %d\n", i);
727 #ifdef HAVE_PTHREAD_H
730 ReleaseNWSLock(&lock);
739 SocketIsAvailable(Socket sd) {
744 WARN("SocketIsAvailable: socket is negative\n");
748 /* check if the socket is in connectedSockets and is in use */
751 ReleaseNWSLock(&lock);
758 SocketInUse(Socket sd) {
762 WARN("SocketInUse: socket is negative\n");
768 ReleaseNWSLock(&lock);
778 struct timeval timeout;
786 FD_SET(sd, &readFds);
787 FD_SET(sd, &writeFds);
788 timeout.tv_sec = GetTimeOut(SEND, Peer(sd), 1);
791 return(select(FD_SETSIZE, NULL, &writeFds, NULL, &timeout) == 1);
798 NotifyOnDisconnection(SocketFunction notifyFn) {
801 /* operating on global variables */
803 for(i = 0; i < MAX_NOTIFIES; i++) {
804 if(disconnectFunctions[i] == NULL) {
805 disconnectFunctions[i] = notifyFn;
809 ReleaseNWSLock(&lock);
814 static pid_t passedPids[MAXPASSES];
815 static Socket passedSockets[MAXPASSES];
819 PassSocket(Socket *sock,
824 /* Clean up any sockets previously passed to children who have exited. */
825 for(i = 0; i < MAXPASSES; i++) {
826 if(passedPids[i] != 0) {
827 if((waitpid(passedPids[i], &childStat, WNOHANG) < 0) ||
828 WIFEXITED(childStat)) {
829 LOG1("PassSocket: Reclaiming connection %d\n", passedSockets[i]);
830 (void)shutdown(passedSockets[i], 2);
831 (void)close(passedSockets[i]);
832 DoDisconnectNotification(passedSockets[i]);
838 /* Record this socket in passedSockets and remove all other memory of it. */
839 for(i = 0; i < MAXPASSES; i++) {
840 if(passedPids[i] == 0) {
841 LOG2("PassSocket: Passing connection %d to %d\n", *sock, child);
842 passedPids[i] = child;
843 passedSockets[i] = *sock;
855 /* here to be used by dnsutil.c (GetPeerName) */
861 if (FD_ISSET(sd, &connectedPipes)) {
864 ReleaseNWSLock(&lock);
875 double start = 0, myTimeOut = 0;
876 int isPipe, tmp, done=1;
879 int recvd = 0, totalRecvd=0;
880 struct timeval tout, *tv;
883 /* let's see if we need to check disconnected socket */
884 if (needDisconnect == 1) {
886 (void)CloseDisconnections();
890 if (sd < 0 || bytes == NULL) {
891 WARN("RecvBytes: parameters out of range!\n");
895 /* connectedPipes is global */
897 isPipe = FD_ISSET(sd, &connectedPipes);
898 ReleaseNWSLock(&lock);
901 FD_SET(sd, &readFds);
903 /* select the adaptive timeouts or the passed in one. */
905 myTimeOut = (int)timeOut;
906 if (SignalAlarm(RecvTimeOut, &was) == 0) {
907 WARN("Failed to set the alarm signal! Exiting\n");
910 /* let's start the clock */
911 start = CurrentTime();
914 for(nextByte=(char*)bytes; totalRecvd < byteSize; totalRecvd += recvd) {
916 UnlockMessageSystem();
918 /* set the timeout if requested by the user */
920 myTimeOut = timeOut - (CurrentTime() - start);
926 tout.tv_sec = (int) myTimeOut;
929 /* 0 is the special flag for don't use touts */
933 tmp = select(FD_SETSIZE, &readFds, NULL, NULL, tv);
937 /* just in case another call modifies errno */
940 ReleaseNWSLock(&lock);
942 /* if interrupted, let's try again */
947 ERROR2("RecvBytes: select on %d failed (%s)\n", sd, strerror(tmp));
950 } else if (tmp == 0) {
954 ERROR1("RecvBytes: Socket %d timed out\n", sd);
960 /* let's read the data */
962 myTimeOut = timeOut - (CurrentTime() - start);
963 /* should always be > 0 since we didn't
964 * time out on select */
966 SetRealTimer((unsigned int)myTimeOut);
968 ERROR1("RecvBytes: trying to set negative timeout on socket %d\n", sd);
976 recvd = read(sd, nextByte, byteSize - totalRecvd);
979 recvd = recv(sd, nextByte, byteSize - totalRecvd, 0);
981 /* just in case another call modifies errno */
984 ReleaseNWSLock(&lock);
992 WARN2("RecvBytes: read failed errno:%d, %s\n", tmp, strerror(tmp));
993 ERROR3("RecvBytes: socket %d failed after %d of %d bytes\n", sd, totalRecvd, byteSize);
1000 /* resetting the sigalarm */
1002 SignalAlarm(was, NULL);
1009 /* it should be thread safe. */
1011 SendBytes( Socket sd,
1017 int sent =0, totalSent = 0;
1018 int isPipe, tmp, done = 1;
1019 struct timeval tout, *tv;
1020 double start, myTimeOut = 0;
1024 /* let's see if we need to check disconnected socket */
1025 if (needDisconnect == 1) {
1027 (void)CloseDisconnections();
1030 /* connectedPipes is global */
1032 isPipe = FD_ISSET(sd, &connectedPipes);
1033 ReleaseNWSLock(&lock);
1035 /* select the adaptive timeouts or the passed in one. */
1037 myTimeOut = (double) timeOut;
1038 if (SignalAlarm(RecvTimeOut, &was) == 0) {
1039 ERROR("Failed to set the alarm signal! Exiting\n");
1044 /* let's start the timer */
1045 start = CurrentTime();
1047 for(nextByte = (char*)bytes; totalSent < byteSize; totalSent += sent) {
1048 UnlockMessageSystem();
1050 /* set the timeout, and if we timed out get out */
1052 /* 0 is the special flag for don't use touts */
1053 myTimeOut = timeOut - (CurrentTime() - start);
1054 if (myTimeOut < 0) {
1059 tout.tv_sec = (int) myTimeOut;
1061 SetRealTimer((unsigned int)myTimeOut);
1066 sent = write(sd, nextByte, byteSize - totalSent);
1069 sent = send(sd, nextByte, byteSize - totalSent, 0);
1072 /* errno could be modified by another thread */
1075 ReleaseNWSLock(&lock);
1081 LockMessageSystem();
1083 ERROR3("SendBytes: send on socket %d failed (errno=%d %s)\n", sd, tmp, strerror(tmp));
1090 /* reset sigalarm */
1092 SignalAlarm(was, NULL);
1100 SocketFailure(int sig) {
1101 HoldSignal(SIGPIPE);
1103 if(signal(SIGPIPE, SocketFailure) == SIG_ERR) {
1104 WARN("SocketFailure: error resetting signal\n");
1106 ReleaseSignal(SIGPIPE);