4 #include "config_portability.h"
6 #include <stddef.h> /* offsetof() */
7 #include <stdlib.h> /* free() malloc() REALLOC() */
15 #include "diagnostic.h" /* FAIL() LOG() */
16 #include "protocol.h" /* Socket functions */
// State shared by LockMessageSystem() and UnlockMessageSystem(): Message_lock is the
// mutex both functions take, Message_wait is the condition variable waiters block on,
// and Message_holding records the holding thread for debugging only.
21 #if defined(WITH_THREAD)
22 pthread_mutex_t Message_lock = PTHREAD_MUTEX_INITIALIZER;
23 pthread_cond_t Message_wait = PTHREAD_COND_INITIALIZER;
// NOTE(review): casting -1 to pthread_t assumes an arithmetic or pointer type; POSIX
// allows pthread_t to be a structure.
25 pthread_t Message_holding = (pthread_t)-1;
// Handle passed to GetNWSLock() and ReleaseNWSLock() by RegisterListener().
28 static void *lock = NULL; /* local mutex */
31 * Info on registered listeners. #message# is the message for which #listener#
32 * is registered; #image# the message image. Note that, since we provide no
33 * way to terminate listening for messages, we can simply expand the list by
34 * one every time a new listener is registered.
39 ListenFunction listener;
// Listener table: RegisterListener() writes new entries at index listenerCount and
// ListenForMessages() scans indices 0 to listenerCount-1 for a matching message type.
42 static ListenerInfo *listeners = NULL;
43 static unsigned listenerCount = 0;
// LDAP handler installed by RegisterLdapListener(); ListenForMessages() treats NULL
// as no handler registered.
45 static LdapListenFunction ldapListener = NULL;
49 * A header sent with messages. #version# is the NWS version and is presently
50 * ignored, but it could be used for compatibility. #message# is the actual
51 * message. #dataSize# is the number of bytes that accompany the message.
// Descriptor used when sending and receiving a MessageHeader: three single
// unsigned-int members, in the order version, message, dataSize.
53 static const DataDescriptor headerDescriptor[] =
54 {SIMPLE_MEMBER(UNSIGNED_INT_TYPE, 1, offsetof(MessageHeader, version)),
55 SIMPLE_MEMBER(UNSIGNED_INT_TYPE, 1, offsetof(MessageHeader, message)),
56 SIMPLE_MEMBER(UNSIGNED_INT_TYPE, 1, offsetof(MessageHeader, dataSize))};
// Hand-maintained element count of headerDescriptor; keep it in step with the
// initializer above (three SIMPLE_MEMBER entries).
57 #define headerDescriptorLength 3
60 #if defined(WITH_THREAD)
62 * ** uses mutex (which might spin) to test and set a global variable
64 * ** note that the pthread condition variable allows waiting under a lock
65 * ** and will block the calling thread
// Takes Message_lock, waits on Message_wait for as long as Message_busy is 1,
// records the calling thread in Message_holding, then releases Message_lock.
67 void LockMessageSystem()
69 pthread_mutex_lock(&Message_lock);
// The flag is re-tested in a loop because pthread_cond_wait may return without the
// condition having changed.
70 while(Message_busy == 1)
72 pthread_cond_wait(&Message_wait,&Message_lock);
76 Message_holding = pthread_self(); /* debugging info only */
78 pthread_mutex_unlock(&Message_lock);
// Takes Message_lock, resets the debugging holder id, signals Message_wait so a
// waiting thread can proceed, then releases Message_lock.
83 void UnlockMessageSystem()
85 pthread_mutex_lock(&Message_lock);
88 Message_holding = (pthread_t)-1; /* debugging info only */
89 pthread_cond_signal(&Message_wait);
91 pthread_mutex_unlock(&Message_lock);
// Empty expansions: with these definitions every LockMessageSystem() and
// UnlockMessageSystem() call compiles to nothing. Presumably this is the branch
// used when WITH_THREAD is not defined - confirm against the surrounding #if.
95 #define LockMessageSystem()
96 #define UnlockMessageSystem()
100 * Returns 1 or 0 depending on whether or not format conversion is required for
101 * data with the format described by the #howMany#-long array #description#.
103 * I believe this is thread safe (DataSize & DifferentFormat are thread safe)
106 ConversionRequired( const DataDescriptor *description,
// First test: the host-format and network-format sizes of the whole description differ.
110 if(DataSize(description, howMany, HOST_FORMAT) !=
111 DataSize(description, howMany, NETWORK_FORMAT)) {
// Second test: examine each of the howMany descriptors in turn.
115 for(i = 0; i < howMany; i++) {
116 if(description[i].type == STRUCT_TYPE) {
// Nested struct: recurse over its member descriptors.
117 if(ConversionRequired(description[i].members, description[i].length)) {
// Any other type: ask DifferentFormat() about that type.
120 } else if(DifferentFormat(description[i].type))
128 /* it should be thread safe (all the conversion routines should be
134 const DataDescriptor *description,
// Number of bytes to read: the size of the described data in network format.
142 size_t totalSize = DataSize(description, howMany, NETWORK_FORMAT);
147 convertIt = ConversionRequired(description, howMany);
// Buffer of totalSize bytes; destination is pointed at it below.
150 converted = malloc(totalSize);
151 if(converted == NULL) {
// The message system is released before the failure is reported.
152 UnlockMessageSystem();
// NOTE(review): totalSize is a size_t but is formatted with %d; if FAIL1 is
// printf-like this is a format mismatch - confirm.
153 FAIL1("RecvData: memory allocation of %d bytes failed\n", totalSize);
155 destination = converted;
160 /* use adaptive timeouts? */
164 /* adaptive timeout */
165 start = CurrentTime();
167 recvResult = RecvBytes(sd, destination, totalSize, GetTimeOut(RECV, Peer(sd), totalSize));
168 /* we assume a failure is a timeout ... Shouldn't hurt
169 * too much getting a bigger timeout anyway */
// Reports the elapsed time to SetTimeOut(); the last argument is nonzero when the
// receive failed.
170 SetTimeOut(RECV, Peer(sd), CurrentTime()-start, totalSize, !recvResult);
// Non-adaptive path: timeOut is passed to RecvBytes() unchanged.
172 recvResult = RecvBytes(sd, destination, totalSize, timeOut);
// On success, convert from the receive buffer into data when the byte order differs
// or a format conversion is required.
174 if (recvResult != 0) {
175 if(DifferentOrder() || convertIt)
176 ConvertData(data, destination, description,
177 howMany, NETWORK_FORMAT);
179 if(converted != NULL)
182 UnlockMessageSystem();
188 /* It should be thread safe (just read headerDescriptor[Length] and
189 * RecvByte is thread safe) */
191 RecvMessage(Socket sd,
196 MessageHeader header;
201 headerDescriptorLength,
203 FAIL("RecvMessage: no message received\n");
// The header carries a different message type than the one requested: drain its
// payload from the socket, then fail.
208 if(header.message != message) {
209 garbage = malloc(2048);
210 if (garbage == NULL) {
// NOTE(review): no UnlockMessageSystem() is visible before this FAIL, unlike the
// exits further down - confirm the message system is not left locked here.
211 FAIL("RecvMessage: out of memory!");
213 /* get the right timeout */
215 timeOut = GetTimeOut(RECV, Peer(sd), 1);
217 while(header.dataSize > 0) {
218 /* if we time out let's drop the socket */
// NOTE(review): garbage is a pointer obtained from malloc(2048), so sizeof(garbage)
// is the size of a pointer, not 2048; each pass reads at most that many bytes.
219 if (!RecvBytes(sd, garbage, (header.dataSize > sizeof(garbage)) ? sizeof(garbage) : header.dataSize, timeOut)) {
221 WARN("RecvMessage: timeout on receiving non-handled message: dropping socket\n");
// NOTE(review): this subtracts sizeof(garbage) even when fewer bytes were read. If
// dataSize is unsigned (headerDescriptor lists it as UNSIGNED_INT_TYPE), a final
// partial chunk wraps it around instead of reaching 0 - confirm and clamp.
224 header.dataSize -= sizeof(garbage);
228 UnlockMessageSystem();
229 FAIL1("RecvMessage: unexpected message %d received\n", header.message);
// Expected message type: hand the payload size from the header back to the caller.
231 *dataSize = header.dataSize;
232 UnlockMessageSystem();
236 /* it should be thread safe */
238 RecvMessageAndDatas(Socket sd,
241 const DataDescriptor *description1,
244 const DataDescriptor *description2,
// Step 1: read the header and require RecvMessage() to report success (1).
249 if (RecvMessage(sd, message, &dataSize, timeOut) != 1) {
250 /* failed to receive message: errors already printed out
251 * by RecvMessage() */
// Step 2: read the first payload as described by description1.
256 if(!RecvData(sd, data1, description1, howMany1, timeOut)) {
257 FAIL("RecvMessageAndDatas: data receive failed\n");
// Step 3: read the second payload as described by description2.
262 if(!RecvData(sd, data2, description2, howMany2, timeOut)) {
// NOTE(review): same text as the first payload failure, so the log cannot tell
// which of the two receives failed.
263 FAIL("RecvMessageAndDatas: data receive failed\n");
271 * waits for timeOut seconds for incoming messages and calls the
272 * appropriate (registered) listener function.
275 ListenForMessages(double timeOut) {
277 MessageHeader header;
// Ask IncomingRequest() for a pending request, passing timeOut; it fills in sd and
// ldap (presumably a flag marking LDAP traffic - confirm).
281 if(!IncomingRequest(timeOut, &sd, &ldap))
286 ** WARNING! Not sure if ldapListener is thread safe (yet)
// No LDAP handler registered: warn and send the peer a disconnect notice.
289 if (ldapListener == NULL) {
290 WARN2("Unexpected LDAP message received from %s on %d\n",
292 SendLdapDisconnect(&sd, LDAP_UNAVAILABLE);
295 LOG2("Received LDAP message from %s on %d\n", PeerName(sd), sd);
301 /* let's use the adaptive timeouts on receiving the header */
// Passes -1 as the timeout; per the comment above this selects adaptive timeouts.
302 if(!RecvData(sd, (void *)&header, headerDescriptor, headerDescriptorLength, -1)) {
303 /* Likely a connection closed by the other side. There
304 * doesn't seem to be any reliable way to detect this,
305 * and, for some reason, select() reports it as a
306 * connection ready for reading. */
// Linear search of the registered listeners for this message type.
313 for(i = 0; i < listenerCount; i++) {
314 if(listeners[i].message == header.message) {
315 LOG3("Received %s message from %s on %d\n", listeners[i].image, PeerName(sd), sd);
// The handler receives the socket by address and the header by value.
316 listeners[i].listener(&sd, header);
// The index reaching listenerCount is treated as no listener having matched.
321 if(i == listenerCount) {
322 WARN3("Unknown message %d received from %s on %d\n", header.message, PeerName(sd), sd);
332 /* registers the function that should be called upon receipt of the
333 * messageType message. Should be thread safe */
335 RegisterListener(MessageType message,
337 ListenFunction listener) {
// Table updates are guarded by the file-local lock.
339 if (!GetNWSLock(&lock)) {
340 ERROR("RegisterListener: couldn't obtain the lock\n");
// Grow the table by one slot and fill it in.
// NOTE(review): the REALLOC result is used with no visible NULL check - confirm
// that REALLOC cannot return NULL.
342 listeners = REALLOC(listeners, (listenerCount+1)*sizeof(ListenerInfo));
343 listeners[listenerCount].message = message;
// Stores the image pointer itself, not a copy, so the string must outlive the
// registration (it is read again when a matching message is logged).
344 listeners[listenerCount].image = image;
345 listeners[listenerCount].listener = listener;
347 ReleaseNWSLock(&lock);
// NOTE(review): confirm this is paired with a LockMessageSystem() call earlier in
// the function.
348 UnlockMessageSystem();
354 RegisterLdapListener(LdapListenFunction listener) {
// Installs listener as the LDAP handler, replacing any previous value. This is a
// plain pointer store; no lock is taken on this line.
355 ldapListener = listener;
360 SendLdapDisconnect (Socket *sd,
361 ber_int_t resultCode)
364 ** Send an unsolicitied notice of disconnection, in compliance with the
365 ** LDAP RFC. This notice is pre-created to allow us to send it
366 ** even if the lber libraries have failed (for example, due to a memory
369 ** abortMessage contains the unsolicited notice of disconnection.
370 ** abortMessageLength gives the length of the message, needed
371 ** due to the message's embedded NULLs. errorOffset gives the location
372 ** of the error code within the message.
374 ** To create this message using the ber libraries, call ber_print as follows:
375 ** ber_printf(ber, "{it{essts}}", 0, LDAP_RES_EXTENDED, resultCode,
376 ** "", "", LDAP_TAG_EXOP_RES_OID, LDAP_NOTICE_OF_DISCONNECTION);
377 ** The components of an unsolicited disconnect message (and hence the
378 ** parameters for the above ber_printf) are as follows:
379 ** messageID (must be zero for unsoliticed notification), protocolOp,
380 ** resultCode, matchedDN, errorMessage, responseName (optional, and omitted
381 ** for an unsolicited disconnect), response (with tag)
// The three adjacent literals concatenate to 38 bytes, matching abortMessageLength.
// The OID is presumably kept as its own literal so that its leading digit is not
// absorbed into the preceding hex escape.
384 static char abortMessage[] = "0$\x02\x01\x00x\x1f\x0a\x01\x02\x04\x00\x04"
385 "\x00\x8a\x16" "1.3.6.1.4.1.1466.20036";
386 int abortMessageLength = 38;
// Patches the result code into the static template. The slot is a single char, so
// only that much of resultCode is kept, and the buffer is shared by every call.
388 abortMessage[errorOffset] = resultCode;
// Sends exactly abortMessageLength bytes, with a timeout argument of -1.
389 SendBytes(*sd, abortMessage, abortMessageLength, -1);
394 /* it should be thread safe (Convert*, SendBytes, DataSize and
395 * DifferentOrder are thread safe) */
399 const DataDescriptor *description,
// Number of bytes to send: the size of the described data in network format.
406 size_t totalSize = DataSize(description, howMany, NETWORK_FORMAT);
// Byte order or format differs from the network form: build a converted copy.
411 if(DifferentOrder() || ConversionRequired(description, howMany)) {
412 converted = malloc(totalSize);
413 if(converted == NULL) {
// The message system is released before the failure is reported.
414 UnlockMessageSystem();
415 FAIL("SendData: memory allocation failed\n");
417 ConvertData(converted, data, description, howMany, HOST_FORMAT);
423 /* use adaptive timeouts? */
427 /* adaptive timeout */
428 start = CurrentTime();
429 sendResult = SendBytes(sd, source, totalSize, GetTimeOut(SEND, Peer(sd),totalSize));
430 /* we assume a failure is a timeout ... Shouldn't hurt
431 * too much getting a bigger timeout anyway */
// Reports the elapsed time to SetTimeOut(); the last argument is nonzero when the
// send failed.
432 SetTimeOut(SEND, Peer(sd), CurrentTime()-start, totalSize, !sendResult);
// Non-adaptive path: timeOut is passed to SendBytes() unchanged.
434 sendResult = SendBytes(sd, source, totalSize, timeOut);
// Release the converted copy if one was allocated.
436 if(converted != NULL)
437 free((void *)converted);
439 UnlockMessageSystem();
443 /* it should be thread safe (SendData, DataSize are thread safe) */
445 SendMessageAndDatas(Socket sd,
448 const DataDescriptor *description1,
451 const DataDescriptor *description2,
455 MessageHeader header;
// Fill in the header; dataSize accumulates the network-format sizes of the payloads.
459 header.version = NWS_VERSION;
460 header.message = message;
463 header.dataSize += DataSize(description1, howMany1, NETWORK_FORMAT);
465 header.dataSize += DataSize(description2, howMany2, NETWORK_FORMAT);
467 UnlockMessageSystem();
472 headerDescriptorLength,
474 FAIL("SendMessageAndDatas: header send failed \n");
// Each payload is sent only when its data pointer is non-NULL; a failed send is
// reported with a message naming which payload it was.
476 if((data1 != NULL) && !SendData(sd, data1, description1, howMany1, timeOut)) {
477 FAIL("SendMessageAndDatas: data1 send failed\n");
479 if((data2 != NULL) && !SendData(sd, data2, description2, howMany2, timeOut)) {
480 FAIL("SendMessageAndDatas: data2 send failed\n");
486 * reads the NWS header associated with an incoming message and returns
487 * the message type. returns -1 if the read fails
489 * it should be thread safe (RecvData is thread safe and header* are only
492 int RecvMsgType(Socket sd, double timeout)
495 MessageHeader header;
// Read one header via RecvData() (headerDescriptorLength descriptor entries).
497 status = RecvData(sd,
500 headerDescriptorLength,
// Only the message field is returned; version and dataSize are not used here.
// NOTE(review): header.message appears to be unsigned (UNSIGNED_INT_TYPE in
// headerDescriptor) and is cast to int, the same type that carries the -1 failure
// value - confirm no valid message type collides with it.
507 return((int)header.message);