Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
- Reput hook for raw sockets, needed for BW experiments
[simgrid.git] / src / nws_portability / messages.c
1 /* $Id$ */
2
3
4 #include "config_portability.h"
5
6 #include <stddef.h>          /* offsetof() */
7 #include <stdlib.h>          /* free() malloc() REALLOC() */
8 #ifdef WITH_LDAP
9 #include <ldap.h>
10 #endif
11 #ifdef WITH_THREAD
12 #include <pthread.h>
13 #endif
14
15 #include "diagnostic.h"      /* FAIL() LOG() */
16 #include "protocol.h"        /* Socket functions */
17 #include "messages.h"
18 #include "osutil.h"
19 #include "timeouts.h"
20
21 #if defined(WITH_THREAD)
22 pthread_mutex_t Message_lock = PTHREAD_MUTEX_INITIALIZER;
23 pthread_cond_t Message_wait = PTHREAD_COND_INITIALIZER;
24 int Message_busy = 0;
25 pthread_t Message_holding = (pthread_t)-1;
26 #endif
27
28 static void *lock = NULL;                       /* local mutex */
29
30 /*
31  * Info on registered listeners.  #message# is the message for which #listener#
32  * is registered; #image# the message image.  Note that, since we provide no
33  * way to terminate listening for messages, we can simply expand the list by
34  * one every time a new listener is registered.
35  */
36 typedef struct {
37         MessageType message;
38         const char *image;
39         ListenFunction listener;
40 } ListenerInfo;
41
42 static ListenerInfo *listeners = NULL;
43 static unsigned listenerCount = 0;
44 #ifdef WITH_LDAP
45 static LdapListenFunction ldapListener = NULL;
46 #endif
47
48 /*
49  * A header sent with messages.  #version# is the NWS version and is presently
50  * ignored, but it could be used for compatibility.  #message# is the actual
51  * message.  #dataSize# is the number of bytes that accompany the message.
52  */
53 static const DataDescriptor headerDescriptor[] =
54         {SIMPLE_MEMBER(UNSIGNED_INT_TYPE, 1, offsetof(MessageHeader, version)),
55         SIMPLE_MEMBER(UNSIGNED_INT_TYPE, 1, offsetof(MessageHeader, message)),
56         SIMPLE_MEMBER(UNSIGNED_INT_TYPE, 1, offsetof(MessageHeader, dataSize))};
57 #define headerDescriptorLength 3
58
59
60 #if defined(WITH_THREAD)
61  /*
62   * ** uses mutex (which might spin) to test and set a global variable
63   * **
64   * ** note that the pthread condition variable allows waiting under a lock
65   * ** and will block the calling thread
66   * */
67 void LockMessageSystem()
68 {
69         pthread_mutex_lock(&Message_lock);
70         while(Message_busy == 1)
71         {
72                 pthread_cond_wait(&Message_wait,&Message_lock);
73         }
74
75         Message_busy = 1;
76         Message_holding = pthread_self(); /* debugging info only */
77
78         pthread_mutex_unlock(&Message_lock);
79
80         return;
81 }
82
83 void UnlockMessageSystem()
84 {
85         pthread_mutex_lock(&Message_lock);
86
87         Message_busy = 0;
88         Message_holding = (pthread_t)-1; /* debugging info only */
89         pthread_cond_signal(&Message_wait);
90
91         pthread_mutex_unlock(&Message_lock);
92         return;
93 }
94 #else
95 #define LockMessageSystem()
96 #define UnlockMessageSystem()
97 #endif
98
99 /*
100  * Returns 1 or 0 depending on whether or not format conversion is required for
101  * data with the format described by the #howMany#-long array #description#.
102  *
103  *  I believe is thread safe (DataSize & DifferentFOrmat are thread safe)
104  */
105 static int
106 ConversionRequired(     const DataDescriptor *description,
107                         size_t howMany) {
108         int i;
109
110         if(DataSize(description, howMany, HOST_FORMAT) !=
111                         DataSize(description, howMany, NETWORK_FORMAT)) {
112                 return 1;
113         }
114
115         for(i = 0; i < howMany; i++) {
116                 if(description[i].type == STRUCT_TYPE) {
117                         if(ConversionRequired(description[i].members, description[i].length)) {
118                                 return 1;
119                         }
120                 } else if(DifferentFormat(description[i].type))
121                         return 1;
122         }
123
124         return 0;
125 }
126
127
128 /* it should be thread safe (all the conversions routines should be
129  * thread safe).
130  * */
131 int
132 RecvData(       Socket sd,
133                 void *data,
134                 const DataDescriptor *description,
135                 size_t howMany,
136                 double timeOut) {
137
138         void *converted;
139         int convertIt;
140         void *destination;
141         int recvResult;
142         size_t totalSize = DataSize(description, howMany, NETWORK_FORMAT);
143
144         LockMessageSystem();
145
146         converted = NULL;
147         convertIt = ConversionRequired(description, howMany);
148
149         if(convertIt) {
150                 converted = malloc(totalSize);
151                 if(converted == NULL) {
152                         UnlockMessageSystem();
153                         FAIL1("RecvData: memory allocation of %d bytes failed\n", totalSize);
154                 }
155                 destination = converted;
156         } else {
157                 destination = data;
158         }
159
160         /* use adaptive timeouts? */
161         if (timeOut < 0) {
162                 double start;
163
164                 /* adaptive timeout */
165                 start = CurrentTime();
166
167                 recvResult = RecvBytes(sd, destination, totalSize, GetTimeOut(RECV, Peer(sd), totalSize));
168                 /* we assume a failure is a timeout ... Shouldn't hurt
169                  * too much getting a bigger timeout anyway */
170                 SetTimeOut(RECV, Peer(sd), CurrentTime()-start, totalSize, !recvResult);
171         } else {
172                 recvResult = RecvBytes(sd, destination, totalSize, timeOut);
173         }
174         if (recvResult != 0) {
175                 if(DifferentOrder() || convertIt)
176                         ConvertData(data, destination, description, 
177                                 howMany, NETWORK_FORMAT);
178
179                 if(converted != NULL)
180                         free(converted);
181         }
182         UnlockMessageSystem();
183
184         return recvResult;
185 }
186
187
188 /* It should be thread safe (just read headerDescriptor[Lenght] and
189  * RecvByte is thread safe) */
190 int
191 RecvMessage(Socket sd,
192             MessageType message,
193             size_t *dataSize,
194             double timeOut) {
195         char *garbage;
196         MessageHeader header;
197
198         if(!RecvData(   sd,
199                         (void *)&header,
200                         headerDescriptor,
201                         headerDescriptorLength,
202                         timeOut)) {
203                 FAIL("RecvMessage: no message received\n");
204         }
205
206         LockMessageSystem();
207
208         if(header.message != message) {
209                 garbage = malloc(2048);
210                 if (garbage == NULL) {
211                         FAIL("RecvMessage: out of memory!");
212                 }
213                 /* get the rigth timeout */
214                 if (timeOut < 0) {
215                         timeOut = GetTimeOut(RECV, Peer(sd), 1);
216                 }
217                 while(header.dataSize > 0) {
218                         /* if we time out let's drop the socket */
219                         if (!RecvBytes(sd, garbage, (header.dataSize > sizeof(garbage)) ? sizeof(garbage) : header.dataSize, timeOut)) {
220                                 DROP_SOCKET(&sd);
221                                 WARN("RecvMessage: timeout on receiving non-handled message: dropping socket\n");
222                                 break;
223                         }
224                         header.dataSize -= sizeof(garbage);
225                 }
226                 free(garbage);
227
228                 UnlockMessageSystem();
229                 FAIL1("RecvMessage: unexpected message %d received\n", header.message);
230         }
231         *dataSize = header.dataSize;
232         UnlockMessageSystem();
233         return(1);
234 }
235
236 /* it should be thread safe */
237 int
238 RecvMessageAndDatas(Socket sd,
239                     MessageType message,
240                     void *data1,
241                     const DataDescriptor *description1,
242                     size_t howMany1,
243                     void *data2,
244                     const DataDescriptor *description2,
245                     size_t howMany2,
246                     double timeOut) {
247         size_t dataSize;
248
249         if (RecvMessage(sd, message, &dataSize, timeOut) != 1) {
250                 /* failed to receive message: errors already printed out
251                  * by RecvMessage() */
252                 return 0;
253         }
254
255         if(data1 != NULL) {
256                 if(!RecvData(sd, data1, description1, howMany1, timeOut)) {
257                         FAIL("RecvMessageAndDatas: data receive failed\n");
258                 }
259         }
260
261         if(data2 != NULL) {
262                 if(!RecvData(sd, data2, description2, howMany2, timeOut)) {
263                         FAIL("RecvMessageAndDatas: data receive failed\n");
264                 }
265         }
266
267         return(1);
268 }
269
270 /* 
271  * waits for timeOut seconds for incoming messages and calls the
272  * appropriate (registered) listener function.
273  */
274 void
275 ListenForMessages(double timeOut) {
276
277         MessageHeader header;
278         int i, ldap;
279         Socket sd;
280
281         if(!IncomingRequest(timeOut, &sd, &ldap))
282                 return;
283
284 #ifdef WITH_LDAP
285   /*
286    ** WARNING!  Not sure if ldapListener is thread safe (yet)
287    */
288   if (ldap) {
289     if (ldapListener == NULL) {
290       WARN2("Unexpected LDAP message received from %s on %d\n",
291             PeerName(sd), sd);
292       SendLdapDisconnect(&sd, LDAP_UNAVAILABLE);
293     }
294     else {
295       LOG2("Received LDAP message from %s on %d\n", PeerName(sd), sd);
296       ldapListener(&sd);
297     }
298   } else {
299 #endif
300
301         /* let's use the adaptive timeouts on receiving the header */
302         if(!RecvData(sd, (void *)&header, headerDescriptor, headerDescriptorLength, -1)) {
303                 /* Likely a connection closed by the other side.  There
304                  * doesn't seem to be any reliable way to detect this,
305                  * and, for some reason, select() reports it as a
306                  * connection ready for reading.  */
307                 DROP_SOCKET(&sd);
308                 return;
309         }
310
311         LockMessageSystem();
312
313         for(i = 0; i < listenerCount; i++) {
314                 if(listeners[i].message == header.message) {
315                         LOG3("Received %s message from %s on %d\n", listeners[i].image, PeerName(sd), sd);
316                         listeners[i].listener(&sd, header);
317                         break;
318                 }
319         }
320
321         if(i == listenerCount) {
322                 WARN3("Unknown message %d received from %s on %d\n", header.message, PeerName(sd), sd);
323                 DROP_SOCKET(&sd);
324         }
325 #ifdef WITH_LDAP
326    }
327 #endif
328 }
329
330
331
332 /* regsiters the functions which should be called upon the receive of the
333  * messageType message. Should be thread safe */
334 void
335 RegisterListener(MessageType message,
336                  const char *image,
337                  ListenFunction listener) {
338         LockMessageSystem();
339         if (!GetNWSLock(&lock)) {
340                 ERROR("RegisterListener: couldn't obtain the lock\n");
341         }
342         listeners = REALLOC(listeners, (listenerCount+1)*sizeof(ListenerInfo));
343         listeners[listenerCount].message = message;
344         listeners[listenerCount].image = image;
345         listeners[listenerCount].listener = listener;
346         listenerCount++;
347         ReleaseNWSLock(&lock);
348         UnlockMessageSystem();
349 }
350
351
352 #ifdef WITH_LDAP
353 void
354 RegisterLdapListener(LdapListenFunction listener) {
355   ldapListener = listener;
356 }
357
358
359 void
360 SendLdapDisconnect (Socket *sd,
361                     ber_int_t resultCode)
362 {
363   /*
364   ** Send an unsolicitied notice of disconnection, in compliance with the
365   ** LDAP RFC.  This notice is pre-created to allow us to send it
366   ** even if the lber libraries have failed (for example, due to a memory
367   ** shortage).
368   **
369   ** abortMessage contains the unsolicited notice of disconnection.  
370   ** abortMessageLength gives the length of the message, needed
371   ** due to the message's embedded NULLs.  errorOffset gives the location
372   ** of the error code within the message.
373   **
374   ** To create this message using the ber libraries, call ber_print as follows:
375   **   ber_printf(ber, "{it{essts}}", 0, LDAP_RES_EXTENDED, resultCode,
376   **     "", "", LDAP_TAG_EXOP_RES_OID, LDAP_NOTICE_OF_DISCONNECTION);
377   ** The components of an unsolicited disconnect message (and hence the
378   ** parameters for the above ber_printf) are as follows:
379   **   messageID (must be zero for unsoliticed notification), protocolOp,
380   **   resultCode, matchedDN, errorMessage, responseName (optional, and omitted
381   **   for an unsolicited disconnect), response (with tag)
382   **
383   */
384   static char abortMessage[] = "0$\x02\x01\x00x\x1f\x0a\x01\x02\x04\x00\x04"
385     "\x00\x8a\x16" "1.3.6.1.4.1.1466.20036";
386   int abortMessageLength = 38;
387   int errorOffset = 9;
388   abortMessage[errorOffset] = resultCode;
389   SendBytes(*sd, abortMessage, abortMessageLength, -1);
390   DROP_SOCKET(sd);
391 }
392 #endif
393
394 /* it should be thread safe (Convert*, SendBytes, DataSize abd
395  * DifferentOrder are thread safe) */
396 int
397 SendData(Socket sd,
398          const void *data,
399          const DataDescriptor *description,
400          size_t howMany,
401          double timeOut) {
402
403         void *converted;
404         int sendResult;
405         const void *source;
406         size_t totalSize = DataSize(description, howMany, NETWORK_FORMAT);
407
408         LockMessageSystem();
409         converted = NULL;
410
411         if(DifferentOrder() || ConversionRequired(description, howMany)) {
412                 converted = malloc(totalSize);
413                 if(converted == NULL) {
414                         UnlockMessageSystem();
415                         FAIL("SendData: memory allocation failed\n");
416                 }
417                 ConvertData(converted, data, description, howMany, HOST_FORMAT);
418                 source = converted;
419         } else {
420                 source = data;
421         }
422
423         /* use adaptive timeouts? */
424         if (timeOut < 0) {
425                 double start;
426
427                 /* adaptive timeout */
428                 start = CurrentTime();
429                 sendResult = SendBytes(sd, source, totalSize, GetTimeOut(SEND, Peer(sd),totalSize));
430                 /* we assume a failure is a timeout ... Shouldn't hurt
431                  * too much getting a bigger timeout anyway */
432                 SetTimeOut(SEND, Peer(sd), CurrentTime()-start, totalSize, !sendResult);
433         } else {
434                 sendResult = SendBytes(sd, source, totalSize, timeOut);
435         }
436         if(converted != NULL)
437                 free((void *)converted);
438
439         UnlockMessageSystem();
440         return sendResult;
441 }
442
443 /* it should be thread safe (SendData, DataSize are thread safe) */
444 int
445 SendMessageAndDatas(Socket sd,
446                     MessageType message,
447                     const void *data1,
448                     const DataDescriptor *description1,
449                     size_t howMany1,
450                     const void *data2,
451                     const DataDescriptor *description2,
452                     size_t howMany2,
453                     double timeOut) {
454
455         MessageHeader header;
456
457         LockMessageSystem();
458
459         header.version = NWS_VERSION;
460         header.message = message;
461         header.dataSize = 0;
462         if(data1 != NULL)
463                 header.dataSize += DataSize(description1, howMany1, NETWORK_FORMAT);
464         if(data2 != NULL)
465                 header.dataSize += DataSize(description2, howMany2, NETWORK_FORMAT);
466
467         UnlockMessageSystem();
468
469         if(!SendData(sd,
470                         &header,
471                         headerDescriptor,
472                         headerDescriptorLength,
473                         timeOut)) {
474                 FAIL("SendMessageAndDatas: header send failed \n");
475         }
476         if((data1 != NULL) && !SendData(sd, data1, description1, howMany1, timeOut)) {
477                 FAIL("SendMessageAndDatas: data1 send failed\n");
478         }
479         if((data2 != NULL) && !SendData(sd, data2, description2, howMany2, timeOut)) {
480                 FAIL("SendMessageAndDatas: data2 send failed\n");
481         }
482         return 1;
483 }
484
485 /*
486  * reads the NWS header associated with in incoming message and returns
487  * the message type.  returns -1 if the read fails
488  *
489  * it should be thread safe (RecvData is thread safe and header* are only
490  * read) 
491  */
492 int RecvMsgType(Socket sd, double timeout)
493 {
494         int status;
495         MessageHeader header;
496
497         status = RecvData(sd,
498                           &header,
499                           headerDescriptor,
500                           headerDescriptorLength,
501                           timeout);
502
503         if(status <= 0) {
504                 return(-1);
505         }
506
507         return((int)header.message);
508 }