3 /* gras_rl - implementation of GRAS on real life */
5 /* Authors: Martin Quinson */
6 /* Copyright (C) 2003 the OURAGAN project. */
8 /* This program is free software; you can redistribute it and/or modify it
9 under the terms of the license (GNU LGPL) which comes with this package. */
16 #include <unistd.h> /* sleep() */
18 #include <sys/types.h>
19 #include <sys/socket.h>
20 #include <netinet/in.h> /* struct in_addr */
/* Presumably registers "rl" as this file's default log category (macro is defined elsewhere — confirm). */
29 GRAS_LOG_NEW_DEFAULT_CATEGORY(rl);
/* File-local pointer to the per-process GRAS state; assigned in gras_process_init()
 * and handed out by grasProcessDataGet(). */
32 static grasProcessData_t *_grasProcessData;
34 /* Prototypes of internal functions */
35 static int grasConversionRequired(const DataDescriptor *description, size_t howMany);
/* NOTE(review): the return-type line of the following prototype is not visible in this excerpt. */
37 _gras_rawsock_exchange(gras_rawsock_t *sd, int sender, unsigned int timeout,
38 unsigned int expSize, unsigned int msgSize);
/*
 * Allocate the process-wide GRAS state (_grasProcessData) and reset its
 * fields: message queue (length 0, NULL), grasCblList (length 0, NULL) and
 * userdata (NULL).
 * On allocation failure a diagnostic is written to stderr.
 * NOTE(review): the return statements are not visible in this excerpt.
 */
41 gras_error_t gras_process_init() {
42 if (!(_grasProcessData=(grasProcessData_t *)malloc(sizeof(grasProcessData_t)))) {
/* sizeof yields a size_t: cast it and print it with %lu instead of the mismatched %d */
43 fprintf(stderr,"gras_process_init: cannot malloc %lu bytes\n",(unsigned long)sizeof(grasProcessData_t));
46 _grasProcessData->grasMsgQueueLen=0;
47 _grasProcessData->grasMsgQueue = NULL;
49 _grasProcessData->grasCblListLen = 0;
50 _grasProcessData->grasCblList = NULL;
52 _grasProcessData->userdata = NULL;
/*
 * Process teardown hook. The visible body only prints a "not implemented"
 * notice (tagged with the function name) on stderr; nothing is released here.
 */
55 gras_error_t gras_process_finalize() {
56 fprintf(stderr,"FIXME: %s not implemented (=> leaking on exit :)\n",__FUNCTION__);
60 /* **************************************************************************
61 * Opening/Maintaining/Closing connections
62 * **************************************************************************/
/*
 * Open a client connection to host:port and hand the new socket back
 * through *sock.
 * Visible steps: allocate *sock, clear its peer_addr, resolve up to 10
 * addresses for host with IPAddressValues(), then try CallAddr() on each
 * address in turn.
 * NOTE(review): the return type, local declarations, error returns and
 * closing braces are not visible in this excerpt.
 */
64 gras_sock_client_open(const char *host, short port,
65 /* OUT */ gras_sock_t **sock) {
/* Room for at most 10 resolved addresses; the same bound is passed to IPAddressValues() below */
68 IPAddress addresses[10];
72 if (!(*sock=malloc(sizeof(gras_sock_t)))) {
73 fprintf(stderr,"Malloc error\n");
76 (*sock)->peer_addr=NULL;
/* A zero address count is treated as a resolution failure */
78 if (!(addrCount = IPAddressValues(host, addresses, 10))) {
79 fprintf(stderr,"grasOpenClientSocket: address retrieval of '%s' failed\n",host);
/* The i<10 test mirrors the size of addresses[]; presumably a safety net in case addrCount exceeds it */
83 for(i = 0; i < addrCount && i<10 ; i++) {
84 if(CallAddr(addresses[i], port, &sd, -1)) {
91 fprintf(stderr,"grasOpenClientSocket: something wicked happenned while connecting to %s:%d",
/*
 * Open a server socket and return it through *sock.
 * Visible steps: allocate *sock, then ask EstablishAnEar() for a socket given
 * startingPort and endingPort (it also fills the local `port`); a zero result
 * from EstablishAnEar() leads to unknown_error. peer_addr is cleared afterwards.
 * NOTE(review): whether *sock is freed on the EstablishAnEar() failure path
 * cannot be told from the lines visible here.
 */
98 gras_sock_server_open(unsigned short startingPort, unsigned short endingPort,
99 /* OUT */ gras_sock_t **sock) {
103 if (!(*sock=malloc(sizeof(gras_sock_t)))) {
104 fprintf(stderr,"Malloc error\n");
108 if (!EstablishAnEar(startingPort,endingPort,&((*sock)->sock),&port)) {
110 return unknown_error;
112 (*sock)->peer_addr=NULL;
/*
 * Close a socket: the descriptor is handed to DROP_SOCKET(), then peer_addr
 * is freed when it is set.
 * NOTE(review): any NULL check on `sock` and the release of `sock` itself are
 * not visible in this excerpt.
 */
118 gras_error_t gras_sock_close(gras_sock_t *sock) {
120 DROP_SOCKET(&(sock->sock));
/* free(NULL) is a no-op, so this guard is purely defensive */
121 if (sock->peer_addr) free(sock->peer_addr);
/*
 * Accessor taking a socket; judging by the name it reports the local port.
 * NOTE(review): the return type and body are not visible in this excerpt.
 */
128 gras_sock_get_my_port(gras_sock_t *sd) {
/*
 * Return the value PeerNamePort() reports for the descriptor held in sd.
 * NOTE(review): no NULL check on sd is visible before it is dereferenced.
 */
134 gras_sock_get_peer_port(gras_sock_t *sd) {
136 return PeerNamePort(sd->sock);
/*
 * Return the peer name of sd, stored in sd->peer_name.
 * Returns NULL when sd is NULL. The name is obtained from PeerName_r(); the
 * literal "unknown" is stored on the alternative branch (the condition
 * selecting between the two is not visible in this excerpt).
 * The returned pointer is sd->peer_name itself.
 */
140 gras_sock_get_peer_name(gras_sock_t *sd) {
143 if (!sd) return NULL;
144 tmp=PeerName_r(sd->sock);
/* NOTE(review): unbounded copy — confirm peer_name is large enough for any PeerName_r() result */
146 strcpy(sd->peer_name,tmp);
149 strcpy(sd->peer_name,"unknown");
152 return sd->peer_name;
156 /* **************************************************************************
158 * **************************************************************************/
161 * Returns 1 or 0 depending on whether or not format conversion is required for
162 * data with the format described by the #howMany#-long array #description#.
/*
 * Checks performed, in order: total size in host vs. network format,
 * per-element format differences (recursing into STRUCT_TYPE members),
 * and finally the result of DifferentOrder().
 */
164 int grasConversionRequired(const DataDescriptor *description, size_t howMany){
/* First test: the host and network layouts differ in size */
167 if(DataSize(description, howMany, HOST_FORMAT) !=
168 DataSize(description, howMany, NETWORK_FORMAT)) {
/* Second test: inspect each of the howMany descriptors */
172 for(i = 0; i < howMany; i++) {
173 if(description[i].type == STRUCT_TYPE) {
/* Nested structure: apply the same check to its members */
174 if(grasConversionRequired(description[i].members, description[i].length)) {
177 } else if(DifferentFormat(description[i].type))
/* Nothing differed so far: defer to DifferentOrder() (presumably a byte-order test) */
181 return DifferentOrder();
184 /* **************************************************************************
185 * Actually exchanging messages
186 * **************************************************************************/
189 * Discard data on the socket because of failure on our side
/*
 * Read and throw away pending bytes from sd, at most sizeof(garbage) bytes
 * per RecvBytes() call.
 * NOTE(review): the declarations of `garbage` and `s` and the enclosing loop
 * are not visible in this excerpt.
 */
192 gras_msg_discard(gras_sock_t *sd, size_t size) {
/* The receive result is deliberately ignored (cast to void) */
197 (void)RecvBytes(sd->sock,
199 (s > sizeof(garbage)) ? sizeof(garbage) : s,
200 GetTimeOut(RECV, Peer(sd->sock), 2048));
/* NOTE(review): a full buffer size is subtracted even when fewer bytes were requested;
 * if `s` is unsigned this wraps around instead of going negative — confirm the type of `s`. */
201 s -= sizeof(garbage);
/*
 * Receive `repetition` elements laid out as `description` (of
 * `description_length` descriptors) from sd.
 *
 * A host-format buffer of hostSize*repetition bytes is malloc'ed and returned
 * through *data. When grasConversionRequired() says so, the bytes are first
 * received into a temporary network-format buffer (`converted`) and then
 * converted element by element with ConvertData().
 *
 * The receive timeout is adaptive: GetTimeOut() supplies it and SetTimeOut()
 * is fed the measured duration and the success/failure of the call.
 *
 * Returns 0 when gras_unlock() fails; callers in this file treat a zero
 * result as failure and a non-zero result as a byte count.
 * NOTE(review): the `data` parameter line, some locals, the final return and
 * the closing braces are not visible in this excerpt.
 */
205 int grasDataRecv( gras_sock_t *sd,
207 const DataDescriptor *description,
208 size_t description_length,
209 unsigned int repetition) {
211 void *converted=NULL;
213 void *destination; /* where to receive the data from the net (*data or converted)*/
215 size_t netSize,hostSize;
216 double start; /* for timeouts */
217 char *net,*host; /* iterators */
/* Size of one element in each representation */
222 netSize = DataSize(description, description_length, NETWORK_FORMAT);
223 hostSize = DataSize(description, description_length, HOST_FORMAT);
225 if (!(*data=malloc(hostSize*repetition))) {
/* The product is a size_t: cast it and print it with %lu instead of the mismatched %d */
227 ERROR1("grasDataRecv: memory allocation of %lu bytes failed\n", (unsigned long)(hostSize*repetition));
231 convertIt = grasConversionRequired(description, description_length);
234 if (!(converted = malloc(netSize*repetition))) {
/* Same size_t formatting fix as above */
237 ERROR1("RecvData: memory allocation of %lu bytes failed\n", (unsigned long)(netSize*repetition));
240 destination = converted;
245 /* adaptive timeout */
246 start = CurrentTime();
248 recvResult = RecvBytes(sd->sock, destination, netSize*repetition,
249 GetTimeOut(RECV, Peer(sd->sock), netSize*repetition));
250 /* we assume a failure is a timeout ... Shouldn't hurt
251 * too much getting a bigger timeout anyway */
252 SetTimeOut(RECV, sd->sock, CurrentTime()-start, netSize*repetition, !recvResult);
/* Hex dump of the received bytes on stderr; repetition is unsigned (%u), the sizes are size_t (cast, %lu) */
254 fprintf(stderr,"RECV [seqLen=%u;netsize=%lu;hostsize=%lu] : (",
255 repetition,(unsigned long)netSize,(unsigned long)hostSize);
256 for (i=0; i<netSize * repetition; i++) {
257 if (i) fputc('.',stderr);
258 fprintf(stderr,"%02X",((unsigned char*)destination)[i]);
260 fprintf(stderr,") on %p.\n",destination);
/* Only convert when something was actually received */
262 if (recvResult != 0) {
264 for (i=0, net=(char*)converted, host=(char*)*data;
266 i++, net += netSize, host += hostSize) {
267 ConvertData((void*)host, (void*)net, description, description_length, NETWORK_FORMAT);
270 if(converted != NULL)
273 if (!gras_unlock()) return 0;
/*
 * Send `repetition` elements laid out as `description` over sd.
 *
 * When grasConversionRequired() says so, the elements are first converted one
 * by one with ConvertData() into a temporary buffer of netSize*repetition
 * bytes, which is freed after the send. The bytes sent are dumped in hex on
 * stderr.
 *
 * The send timeout is adaptive: GetTimeOut() supplies it and SetTimeOut() is
 * fed the measured duration and the success/failure of the call.
 * NOTE(review): some parameters, locals, the return statements and the
 * closing braces are not visible in this excerpt.
 */
278 gras_error_t grasDataSend(gras_sock_t *sd,
280 const DataDescriptor *description,
281 size_t description_length,
285 char *net,*host; /* iterators */
/* Size of one element in each representation */
288 size_t netSize = DataSize(description, description_length, NETWORK_FORMAT);
289 size_t hostSize = DataSize(description, description_length, HOST_FORMAT);
290 double start; /* for timeouts */
295 if(grasConversionRequired(description, description_length)) {
296 converted = malloc(netSize * repetition);
297 if(converted == NULL) {
/* The product involves a size_t: cast it and print it with %lu instead of the mismatched %d */
299 fprintf(stderr,"grasDataSend: memory allocation of %lu bytes failed.\n",(unsigned long)(netSize * repetition));
303 for (i=0, net=(char*)converted, host=(char*)data;
305 i++, net += netSize, host += hostSize)
306 ConvertData((void*)net, (void*)host, description, description_length, HOST_FORMAT);
312 fprintf(stderr,"SEND (");
313 for (i=0; i<netSize * repetition; i++) {
314 if (i) fputc('.',stderr);
315 fprintf(stderr,"%02X",((unsigned char*)source)[i]);
317 fprintf(stderr,") from %p\n",source);
318 // grasDataDescDump((const DataDescriptor *)description, description_length);
320 /* adaptive timeout */
321 start = CurrentTime();
322 sendResult = SendBytes(sd->sock, source, netSize * repetition,
323 GetTimeOut(SEND, Peer(sd->sock),netSize*repetition));
324 /* we assume a failure is a timeout ... Shouldn't hurt
325 * too much getting a bigger timeout anyway */
326 SetTimeOut(SEND, sd->sock, CurrentTime()-start, netSize * repetition, !sendResult);
/* Release the temporary network-format buffer, if one was allocated */
327 if(converted != NULL)
328 free((void *)converted);
/*
 * Receive one message from the network.
 *
 * Visible sequence: allocate a socket wrapper, wait for an incoming request
 * (timeout_error if none), allocate the message, receive the header, check
 * the GRAS version prefix, look up the message entry, receive the per-sequence
 * element counts, then receive each data sequence. header->dataSize is
 * decremented after every successful receive and must be zero at the end.
 *
 * Error values visible here: timeout_error, thread_error, network_error,
 * mismatch_error, unknown_error.
 * NOTE(review): the return type, remaining parameters, most locals, several
 * cleanup lines and the closing braces are not visible in this excerpt.
 */
335 grasMsgRecv(gras_msg_t **msg,
339 size_t recvd; /* num of bytes received */
343 if (!(sd = (gras_sock_t*)malloc(sizeof(gras_sock_t)))) {
344 fprintf(stderr,"grasMsgRecv: Malloc error\n");
/* NOTE(review): sd was allocated above; no release is visible before the timeout return — confirm */
348 if(!IncomingRequest(timeOut, &(sd->sock), &dummyldap)) {
350 return timeout_error;
357 if (!(res = malloc(sizeof(gras_msg_t)))) {
365 res->freeDirective=free_after_use;
367 if (!gras_unlock()) return thread_error;
/* Header first: a zero result from grasDataRecv() means nothing was received */
369 if(!(recvd=grasDataRecv( sd,
370 (void **)&(res->header),
371 headerDescriptor,headerDescriptorCount,1))) {
372 fprintf(stderr,"grasMsgRecv: no message received\n");
373 return network_error;
/* dataSize now counts the bytes still expected after the header */
375 res->header->dataSize -= recvd;
377 fprintf(stderr,"Received header=ver:'%s' msg:%d size:%d seqCount:%d\n",
378 res->header->version, res->header->message,
379 res->header->dataSize, res->header->seqCount);
/* Only the first strlen(GRASVERSION) characters of the peer's version are compared */
380 if (strncmp(res->header->version,GRASVERSION,strlen(GRASVERSION))) {
381 /* The other side does not use the same version as us. Let's panic */
/* Drain the socket: keep reading while recv() returns a full 2046-byte chunk */
384 while ((junkint=recv(sd->sock,junk,2046,0)) == 2046);
385 fprintf(stderr,"PANIC: Got a message from a peer (%s:%d) using GRAS version '%10s' while this side use version '%s'.\n",
386 gras_sock_get_peer_name(sd),gras_sock_get_peer_port(sd),
387 res->header->version,GRASVERSION);
388 return mismatch_error;
390 if (!(res->entry=grasMsgEntryGet(res->header->message))) {
391 /* This message is not registered on our side, discard it */
392 gras_msg_discard(sd,res->header->dataSize);
/* Save the message id in i for the diagnostic below */
394 i= res->header->message;
399 fprintf(stderr,"grasMsgRecv: unregistered message %d received from %s\n",
400 i,gras_sock_get_peer_name(sd));
401 return mismatch_error;
/* NOTE(review): seqCount is passed here as the descriptor count with repetition 1, whereas
 * gras_msg_send() passes countDescriptorCount as the count and seqCount as the repetition — confirm. */
404 if (!(recvd=grasDataRecv( sd,
405 (void **)&(res->dataCount),
406 countDescriptor,res->entry->seqCount,1))) {
407 gras_msg_discard(sd,res->header->dataSize);
408 i = res->header->message;
412 fprintf(stderr, "grasMsgRecv: Error while getting elemCounts in message %d received from %s\n",
413 i,gras_sock_get_peer_name(sd));
414 return network_error;
416 res->header->dataSize -= recvd;
418 if (!gras_lock()) return thread_error;
/* One data pointer per sequence of the message */
419 if (!(res->data=(void**)malloc(sizeof(void*)*res->entry->seqCount))) {
420 gras_msg_discard(sd,res->header->dataSize);
422 i= res->header->message;
424 free(res->dataCount);
429 fprintf(stderr,"grasMsgRecv: Out of memory while receiving message %d received from %s\n",
430 i,gras_sock_get_peer_name(sd));
433 if (!gras_unlock()) return thread_error;
/* Receive every sequence, using the element counts received above as repetition */
435 for (i=0;i<res->entry->seqCount;i++) {
437 if(!(recvd=grasDataRecv( sd,
439 (const DataDescriptor*)res->entry->dd[i],
440 res->entry->ddCount[i],
441 res->dataCount[i]) )) {
442 gras_msg_discard(sd,res->header->dataSize);
/* Roll back: free the sequences already received */
443 for (i--;i>=0;i--) free(res->data[i]);
444 free(res->dataCount);
/* NOTE(review): after the rollback loop i is -1, yet it is printed below as the message id;
 * the other error paths reload i from header->message first — confirm against the hidden lines. */
448 fprintf(stderr,"grasDataRecv: Transmision error while receiving message %d received from %s\n",
449 i,gras_sock_get_peer_name(sd));
450 return network_error;
452 res->header->dataSize -= recvd;
/* Every announced byte should have been consumed by now */
455 if (res->header->dataSize) {
456 fprintf(stderr,"Damnit dataSize = %d != 0 after receiving message %d received from %s\n",
457 res->header->dataSize,i,gras_sock_get_peer_name(sd));
458 return unknown_error;
464 * Send a message to the network
/*
 * Send a message over sd in three stages, each through grasDataSend():
 * the header, the per-sequence element counts, then every data sequence.
 * A non-zero result from grasDataSend() is reported on stderr with its
 * gras_error_name(). The message is released with gras_msg_free() when
 * freeDirective is free_after_use.
 * NOTE(review): the return type, the `msg` parameter line, the error returns
 * and the closing braces are not visible in this excerpt.
 */
468 gras_msg_send(gras_sock_t *sd,
470 e_gras_free_directive_t freeDirective) {
472 gras_error_t errcode;
475 /* arg validity checks */
476 gras_assert0(msg,"Trying to send NULL message");
477 gras_assert0(sd, "Trying to send over a NULL socket");
/* Trace of the header about to be sent */
480 fprintf(stderr,"Header to send=ver:'%s' msg:%d size:%d seqCount:%d\n",
481 msg->header->version, msg->header->message,
482 msg->header->dataSize, msg->header->seqCount);
484 /* Send the message */
485 if ((errcode=grasDataSend(sd,
487 headerDescriptor,headerDescriptorCount,1))) {
488 fprintf(stderr,"gras_msg_send: Error '%s' while sending header of message %s to %s:%d\n",
489 gras_error_name(errcode),msg->entry->name,gras_sock_get_peer_name(sd),gras_sock_get_peer_port(sd));
/* One count per sequence of the message */
493 if ((errcode=grasDataSend(sd,
494 msg->dataCount,countDescriptor,countDescriptorCount,
495 msg->entry->seqCount))) {
496 fprintf(stderr,"gras_msg_send: Error '%s' while sending sequence counts of message %s to %s\n",
497 gras_error_name(errcode),msg->entry->name,gras_sock_get_peer_name(sd));
/* Each sequence is sent with its own descriptor and element count */
501 for (i=0; i<msg->entry->seqCount; i++) {
502 if ((errcode=grasDataSend(sd,
504 (const DataDescriptor*)msg->entry->dd[i],msg->entry->ddCount[i],
505 msg->dataCount[i]))) {
506 fprintf(stderr,"gras_msg_send: Error '%s' while sending sequence %d of message %s to %s\n",
507 gras_error_name(errcode),i,msg->entry->name,gras_sock_get_peer_name(sd));
512 if (freeDirective == free_after_use) gras_msg_free(msg);
/*
 * Allocate storage for one gras_sock_t and return it.
 * The memory is not initialised, and the result is NULL when malloc() fails.
 */
516 gras_sock_t *gras_sock_new(void) {
517 return malloc(sizeof(gras_sock_t));
/*
 * Takes a socket object; judging by the name it releases it.
 * NOTE(review): the body is not visible in this excerpt.
 */
520 void grasSockFree(gras_sock_t *s) {
524 /* **************************************************************************
525 * Creating/Using raw sockets
526 * **************************************************************************/
/*
 * Open a listening raw socket on a port taken from startingPort..endingPort
 * and return it through *sock.
 *
 * For each candidate port: create a TCP socket, request bufSize for both the
 * receive and send buffers, then bind to INADDR_ANY on that port and listen
 * with a backlog of 1. If the port counter ends up past endingPort, an error
 * is reported and mismatch_error is returned.
 * NOTE(review): the loop increment, the success branch, the other returns and
 * the closing braces are not visible in this excerpt.
 */
527 gras_error_t gras_rawsock_server_open(unsigned short startingPort,
528 unsigned short endingPort,
529 unsigned int bufSize, gras_rawsock_t **sock) {
530 struct sockaddr_in sockaddr;
533 if (!(*sock=malloc(sizeof(gras_rawsock_t)))) {
534 fprintf(stderr,"Malloc error\n");
539 for((*sock)->port = startingPort;
540 (*sock)->port <= endingPort;
542 if(((*sock)->sock = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
543 ERROR0("gras_rawsock_server_open: socket creation failed\n");
/* The results of both setsockopt() calls are ignored: a refused buffer size is not an error here */
548 setsockopt((*sock)->sock, SOL_SOCKET, SO_RCVBUF, (char *)&bufSize, sizeof(bufSize));
549 setsockopt((*sock)->sock, SOL_SOCKET, SO_SNDBUF, (char *)&bufSize, sizeof(bufSize));
/* Address to bind: any local interface, current candidate port */
551 memset(&sockaddr, 0, sizeof(sockaddr));
552 sockaddr.sin_port = htons((unsigned short)(*sock)->port);
553 sockaddr.sin_addr.s_addr = INADDR_ANY;
554 sockaddr.sin_family = AF_INET;
/* Both bind() and listen() must succeed for this port to be usable */
556 if (bind((*sock)->sock,
557 (struct sockaddr *)&sockaddr, sizeof(sockaddr)) != -1 &&
558 listen((*sock)->sock, 1) != -1) {
561 close((*sock)->sock);
/* Port counter ran past the range: no port could be used */
564 if((*sock)->port > endingPort) {
565 fprintf(stderr,"gras_rawsock_server_open: couldn't find a port between %d and %d\n",
566 startingPort, endingPort);
568 return mismatch_error;
/*
 * Signal handler installed for SIGALRM by gras_rawsock_client_open().
 * NOTE(review): the body is not visible in this excerpt.
 */
575 void Dummy(int sig) {
/*
 * Open a raw client socket to host:port and return it through *sock.
 *
 * Up to 10 addresses are resolved for host. For each one: create a TCP
 * socket, request bufSize for both the receive and send buffers, install
 * Dummy() as SIGALRM handler, arm a timer from GetTimeOut(CONN, ...) and try
 * connect(). When no address could be contacted, network_error is returned.
 * NOTE(review): some locals, the success branch, the other returns and the
 * closing braces are not visible in this excerpt.
 */
579 gras_error_t gras_rawsock_client_open(const char *host, short port,
580 unsigned int bufSize, gras_rawsock_t **sock) {
/* Room for at most 10 resolved addresses; the same bound is passed to IPAddressValues() below */
582 IPAddress addresses[10];
584 struct sockaddr_in sockaddr;
586 if (!(*sock=malloc(sizeof(gras_rawsock_t)))) {
587 fprintf(stderr,"Malloc error\n");
/* A zero address count is treated as a resolution failure */
592 if (!(addrCount = IPAddressValues(host, addresses, 10))) {
593 fprintf(stderr,"grasOpenClientSocket: address retrieval of '%s' failed\n",host);
/* Port and family are the same for every candidate address; only sin_addr changes in the loop */
597 (*sock)->port = port;
598 memset(&sockaddr, 0, sizeof(sockaddr));
599 sockaddr.sin_port = htons((unsigned short)(*sock)->port);
600 sockaddr.sin_family = AF_INET;
602 for(i = 0; i < addrCount && i<10 ; i++) {
603 if(((*sock)->sock = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
604 fprintf(stderr,"gras_rawsock_client_open: socket creation failed\n");
/* The results of both setsockopt() calls are ignored */
609 setsockopt((*sock)->sock, SOL_SOCKET, SO_RCVBUF,
610 (char *)&bufSize, sizeof(bufSize));
611 setsockopt((*sock)->sock, SOL_SOCKET, SO_SNDBUF,
612 (char *)&bufSize, sizeof(bufSize));
614 sockaddr.sin_addr.s_addr = addresses[i];
/* Remember the previous SIGALRM handler in `was`, then arm the connection timer */
616 was = signal(SIGALRM,Dummy);
617 SetRealTimer(GetTimeOut(CONN, addresses[i], 0));
618 if(connect((*sock)->sock,
619 (struct sockaddr *)&sockaddr, sizeof(sockaddr)) == 0) {
626 close((*sock)->sock);
633 "grasOpenClientRawSocket: Unable to contact %s:%d\n",
635 return network_error;
/*
 * Close a raw socket by handing its descriptor to CloseSocket().
 * NOTE(review): any NULL check on sd, the release of sd and the return
 * statement are not visible in this excerpt.
 */
641 gras_error_t gras_rawsock_close(gras_rawsock_t *sd) {
643 CloseSocket(&(sd->sock), 0);
/*
 * Accessor taking a raw socket; judging by the name it reports the peer port.
 * NOTE(review): the body is not visible in this excerpt.
 */
649 unsigned short gras_rawsock_get_peer_port(gras_rawsock_t *sd) {
654 /* FIXME: RL ignores the provided timeout and computes an appropriate one */
/*
 * Common engine behind gras_rawsock_send() and gras_rawsock_recv(): moves
 * expSize bytes over sd in messages of msgSize bytes, sending when `sender`
 * is non-zero and receiving otherwise. A scratch buffer of msgSize bytes is
 * used for every transfer.
 * The `timeout` argument is not used in the lines visible here; the timeout
 * comes from GetTimeOut() instead.
 * NOTE(review): the return type, several locals, the outer loop header, the
 * branch selecting send vs. receive and the closing braces are not visible
 * in this excerpt.
 */
656 _gras_rawsock_exchange(gras_rawsock_t *sd, int sender, unsigned int timeout,
657 unsigned int expSize, unsigned int msgSize){
660 int bytesThisMessage;
670 struct timeval timeOut;
672 if((expData = (char *)malloc(msgSize)) == NULL) {
673 fprintf(stderr,"gras_rawsock_send: malloc %d failed\n", msgSize);
677 /* let's get information on the caller (needed later on) */
678 name = PeerName_r(sd->sock);
679 IPAddressValue(name, &addr);
/* NOTE(review): expSize/msgSize divides by zero when msgSize is 0 — confirm callers never pass 0 */
682 ltimeout = (int)GetTimeOut((sender ? SEND : RECV), addr, expSize/msgSize +1);
685 SetRealTimer(ltimeout);
689 bytesTotal < expSize;
690 bytesTotal += bytesThisMessage) {
691 for(bytesThisMessage = 0;
692 bytesThisMessage < msgSize;
693 bytesThisMessage += bytesThisCall) {
/* Only the part of the current message not yet transferred is requested */
697 bytesThisCall = send(sd->sock, expData, msgSize - bytesThisMessage, 0);
/* Receive side: wait for readability with select() before calling recv() */
701 FD_SET(sd->sock,&rd_set);
702 timeOut.tv_sec = ltimeout;
705 * YUK! The timeout can get to be REALLY large if the
706 * amount of data gets big -- fence it at 60 secs
708 if ((ltimeout <= 0) || (ltimeout > 60)) {
711 rv = select(sd->sock+1,&rd_set,NULL,NULL,&timeOut);
713 bytesThisCall = recv(sd->sock, expData, msgSize-bytesThisMessage, 0);
/*
 * Receive expSize bytes in msgSize-byte messages on raw socket sd.
 * Thin wrapper: calls _gras_rawsock_exchange() with sender=0 and forwards
 * the timeout.
 */
723 gras_rawsock_recv(gras_rawsock_t *sd, unsigned int expSize, unsigned int msgSize,
724 unsigned int timeout) {
725 return _gras_rawsock_exchange(sd,0,timeout,expSize,msgSize);
/*
 * Send expSize bytes in msgSize-byte messages on raw socket sd.
 * Thin wrapper: calls _gras_rawsock_exchange() with sender=1 and a timeout
 * argument of 0.
 */
728 gras_rawsock_send(gras_rawsock_t *sd, unsigned int expSize, unsigned int msgSize){
729 return _gras_rawsock_exchange(sd,1,0,expSize,msgSize);
735 /* **************************************************************************
737 * **************************************************************************/
/*
 * Return the file-local process data pointer (_grasProcessData), as set by
 * gras_process_init().
 */
738 grasProcessData_t *grasProcessDataGet() {
739 return _grasProcessData;
742 /* **************************************************************************
743 * Wrappers over OS functions
744 * **************************************************************************/
/*
 * Sleep wrapper over sleep()/usleep().
 * In the visible lines, the whole seconds contained in usec are slept with
 * sleep() and the remainder with usleep().
 * NOTE(review): the handling of `sec` is not visible in this excerpt.
 */
749 void gras_sleep(unsigned long sec,unsigned long usec) {
/* usec may hold one second or more: sleep that whole-second part first */
751 if (usec/1000000) sleep(usec/1000000);
/* usleep() only receives the part below 1000000 microseconds; its result is ignored */
752 (void)usleep(usec % 1000000);