3 /* grasMsg - Function related to messaging (code shared between RL and SG) */
5 /* Authors: Martin Quinson */
6 /* Copyright (C) 2003 the OURAGAN project. */
8 /* This program is free software; you can redistribute it and/or modify it
9 under the terms of the license (GNU LGPL) which comes with this package. */
11 #include "gras_private.h"
14 /*@null@*/static gras_msgentry_t *grasMsgList = NULL;
15 static unsigned int grasMsgCount = 0;
18 * Register a new message type to the system
22 gras_msgtype_register(gras_msgid_t message,
26 gras_msgentry_t *entry=grasMsgEntryGet(message);
27 gras_cblist_t *cbl=gras_cb_get(message);
33 // fprintf(stderr,"Register message '%s' under ID %d. Sequence count=%d\n",
34 //name,(int)message,sequence_count);
36 if (entry) { /* Check that it's the same entry re-registered */
37 if (strcmp(name,entry->name)) {
38 fprintf(stderr,"Second registration of message %d with another name. (old=%s,new=%s)\n",
39 (int)message,entry->name,name);
42 if (sequence_count != entry->seqCount) {
44 "Second registration of message %s with another sequence count. (old=%d,new=%d)\n",
45 entry->name,entry->seqCount,sequence_count);
46 return mismatch_error;
49 va_start(ap, sequence_count);
50 for (i=0;i<sequence_count;i++) {
51 dd=va_arg(ap, DataDescriptor*);
52 ddCount=va_arg(ap, size_t);
53 if (ddCount != entry->ddCount[i]) {
55 "Different re-registration of message %s: DataDescriptor count is different in sequence %d (is %d, was %d)\n",
56 entry->name, i, ddCount, entry->ddCount[i]);
59 if (gras_datadesc_cmp(dd,ddCount, entry->dd[i],ddCount)) {
61 "Different re-registration of message %s: DataDescriptor of sequence %d is different\n",
68 } else { /* build a new entry */
70 grasMsgList = (gras_msgentry_t *)realloc(grasMsgList,sizeof(gras_msgentry_t)*grasMsgCount);
72 grasMsgList = (gras_msgentry_t *)malloc(sizeof(gras_msgentry_t)*grasMsgCount);
75 fprintf(stderr, "PANIC: memory allocation of %d bytes in gras_msgtype_register() failed (Message table LOST).\n",
76 sizeof(gras_msgentry_t)*grasMsgCount);
80 entry = &(grasMsgList[grasMsgCount-1]);
83 if (!(entry->name = strdup(name))) {
84 fprintf(stderr, "gras_msgtype_register: memory allocation failed.\n");
88 entry->seqCount = sequence_count;
90 if (!(entry->dd = (DataDescriptor**)malloc(sizeof(DataDescriptor*)*sequence_count))) {
91 fprintf(stderr, "gras_msgtype_register: memory allocation of %d bytes failed.\n",
92 sizeof(DataDescriptor*)*sequence_count);
97 if (!(entry->ddCount = (size_t*)malloc(sizeof(size_t)*sequence_count))) {
98 fprintf(stderr, "gras_msgtype_register: memory allocation of %d bytes failed.\n",
99 sizeof(size_t)*sequence_count);
109 va_start(ap, sequence_count);
110 for (i=0;i<sequence_count;i++) {
111 dd=va_arg(ap, DataDescriptor*);
112 ddCount=va_arg(ap, size_t);
114 entry->ddCount[i]=ddCount;
116 if (!(entry->dd[i] = (DataDescriptor*)malloc(sizeof(DataDescriptor)*ddCount))) {
117 fprintf(stderr, "gras_msgtype_register: memory allocation of %d bytes failed.\n",
118 sizeof(DataDescriptor)*ddCount);
119 for (i--;i>=0;i--) free(entry->dd[i]);
120 free(entry->ddCount);
129 memcpy(entry->dd[i],dd,sizeof(DataDescriptor)*ddCount);
134 fprintf(stderr,"Warning, message type %s registered twice on this host\n",
137 return gras_cb_create(entry->id);
144 * Retrieve the entry associated with a message id
147 grasMsgEntryGet(gras_msgid_t id) {
150 for (i=0 ; i<grasMsgCount && grasMsgList[i].id != id ; i++);
151 return i==grasMsgCount ? NULL : &grasMsgList[i];
155 * Create the appropriate header
157 gras_msgheader_t *grasMsgHeaderNew(gras_msgid_t msgId,
158 unsigned int dataSize,
159 unsigned int seqCount) {
160 gras_msgheader_t *res;
161 if (!(res=(gras_msgheader_t*)malloc(sizeof(gras_msgheader_t)))) {
162 fprintf(stderr,"gras_msg_new(): Malloc error (wanted %d bytes)\n",sizeof(gras_msgheader_t));
165 memset(res->version,0,sizeof(res->version));
166 strcpy(res->version,GRASVERSION);
167 res->message = msgId;
168 res->dataSize = dataSize;
169 res->seqCount = seqCount;
174 gras_msg_t *gras_msg_new_va(gras_msgid_t msgId,
175 e_gras_free_directive_t free_data,
180 unsigned int networkSize=0;
182 /* malloc the needed room, and sanity check */
183 if (!(res=(gras_msg_t*)malloc(sizeof(gras_msg_t)))) {
184 fprintf(stderr,"gras_msg_new(): Malloc error (wanted %d bytes)\n",sizeof(gras_msg_t));
187 res->freeDirective=free_data;
189 if (!(res->entry=grasMsgEntryGet(msgId))) {
190 fprintf(stderr,"gras_msg_new(): unknown msg id %d\n",msgId);
194 if (res->entry->seqCount != seqCount) {
195 fprintf(stderr,"Damnit: you passed %d sequences to build a %s msg, where %d were expected\n",
196 seqCount,res->entry->name,res->entry->seqCount);
201 if (!(res->dataCount=(unsigned int*)malloc(sizeof(unsigned int)*seqCount))) {
202 fprintf(stderr,"gras_msg_new(): Malloc error (wanted %d bytes)\n",
203 (sizeof(unsigned int)*seqCount));
207 if (!(res->data=(void**)malloc(sizeof(void*)*seqCount))) {
208 fprintf(stderr,"gras_msg_new(): Malloc error (wanted %d bytes)\n",
209 (sizeof(void*)*seqCount));
210 free(res->dataCount);
215 res->dataCount = NULL;
219 /* populate the message */
220 networkSize += DataSize(headerDescriptor,headerDescriptorCount,NETWORK_FORMAT);
221 networkSize += DataSize(countDescriptor,countDescriptorCount,NETWORK_FORMAT) * seqCount;
223 for (i=0; i<seqCount; i++) {
224 res->data[i]=va_arg(ap, void*);
225 res->dataCount[i]=va_arg(ap, int);
226 if (res->dataCount[i] > 1000) {
227 fprintf(stderr,"GRAS WARNING: datacount>1000 in a message. You may want to check the arguments passed to gras_msg_new().\n");
229 if (res->dataCount[i] < 0) {
230 fprintf(stderr,"GRAS ERROR: datacount<0 in a message. Check the arguments passed to gras_msg_new().\n");
231 free(res->dataCount);
237 networkSize += res->dataCount[i] *
238 DataSize(res->entry->dd[i],res->entry->ddCount[i],NETWORK_FORMAT);
241 /* finish filling the fields */
242 if (!(res->header=grasMsgHeaderNew(msgId,networkSize,seqCount))) {
244 free(res->dataCount);
252 gras_msg_t *gras_msg_new(gras_msgid_t msgId,
253 e_gras_free_directive_t free_data,
259 va_start(ap, seqCount);
260 res=gras_msg_new_va(msgId,free_data,seqCount,ap);
267 gras_msg_new_and_send(gras_sock_t *sd,
275 va_start(ap, seqCount);
276 msg=gras_msg_new_va(msgId,free_after_use,seqCount,ap);
278 if (!msg) return unknown_error;
280 return gras_msg_send(sd,msg,free_after_use);
284 gras_msg_t *gras_msg_copy(gras_msg_t *msg) {
288 fprintf(stderr,"gras_msg_copy: \n");
290 /* malloc the needed room, and sanity check */
291 if (!(res=(gras_msg_t*)malloc(sizeof(gras_msg_t)))) {
292 fprintf(stderr,"gras_msg_new(): Malloc error (wanted %d bytes)\n",sizeof(gras_msg_t));
295 res->freeDirective=free_after_use;
296 res->entry=msg->entry;
298 if (!(res->dataCount=(unsigned int*)malloc(sizeof(unsigned int)*res->entry->seqCount))) {
299 fprintf(stderr,"gras_msg_new(): Malloc error (wanted %d bytes)\n",
300 (sizeof(unsigned int)*res->entry->seqCount));
304 if (!(res->data=(void**)malloc(sizeof(void*)*res->entry->seqCount))) {
305 fprintf(stderr,"gras_msg_new(): Malloc error (wanted %d bytes)\n",
306 (sizeof(void*)*res->entry->seqCount));
307 free(res->dataCount);
312 /* populate the message */
313 for (i=0; i<res->entry->seqCount; i++) {
314 res->data[i]= gras_datadesc_copy_data(msg->entry->dd[i],msg->entry->ddCount[i],res->data[i]);
315 res->dataCount[i]=msg->dataCount[i];
318 /* finish filling the fields */
319 if (!(res->header=grasMsgHeaderNew(msg->header->message,
320 msg->header->dataSize,
321 msg->header->seqCount))) {
323 free(res->dataCount);
333 void gras_msg_free(gras_msg_t *msg) {
337 if (msg->freeDirective == free_after_use)
338 for (i=0; i<msg->entry->seqCount; i++)
340 gras_sock_close(msg->sock);
342 // data isn't copied by MsgNew
344 free (msg->dataCount);
348 gras_error_t gras_msg_handle(double timeOut) {
349 grasProcessData_t *pd=grasProcessDataGet();
351 gras_error_t errcode;
355 if (pd->grasMsgQueueLen) {
356 /* handle queued message */
358 msg = pd->grasMsgQueue[0];
359 memmove(pd->grasMsgQueue[0],pd->grasMsgQueue[1],(pd->grasMsgQueueLen-1)*sizeof(gras_msg_t));
360 pd->grasMsgQueueLen--;
361 if (pd->grasMsgQueueLen == 0) {
362 /* size reached 0. Free the queue so that the next enlargement (with malloc) don't leak*/
363 free(pd->grasMsgQueue);
364 /* if size!=0 don't loose the time to realloc to only gain 4 bytes */
366 fprintf(stderr,"%s:%d: gras_msg_handle: The message was queued\n",__FILE__,__LINE__);
369 /* receive a message from the net */
370 if ((errcode=grasMsgRecv(&msg,timeOut))) {
371 if (errcode == timeout_error) {
374 fprintf(stderr,"gras_msg_handle: error '%s' while receiving\n",gras_error_name(errcode));
381 fprintf(stderr,"GRAS: Handle an incomming message '%s' (datasize=%d, sd=%p)\n",
382 msg->entry->name,msg->header->dataSize,msg->sock);
385 if (!(cbl=gras_cb_get(msg->entry->id))) {
386 fprintf(stderr,"Message %s is not registered on this host.\n",
389 return mismatch_error;
392 for (i = cbl->cbCount - 1; i>=0 ; i--) {
393 if ((*(cbl->cb[i]))(msg)) {
394 // if (cbl->cbTTL[i] > 0 && (!--(cbl->cbTTL[i]))) {
395 //fprintf(stderr,"GRAS FIXME: Remove the callback from the queue after use if needed.\n");
403 "No callback of msg type %s accepts this message. Discarding it\n",
406 return mismatch_error;
412 gras_msg_wait(double timeOut,
414 gras_msg_t **message) {
416 gras_error_t errcode;
418 gras_msgentry_t *entry=grasMsgEntryGet(id);
419 grasProcessData_t *pd=grasProcessDataGet();
422 fprintf(stderr,"gras_msg_wait: message id %d is not registered\n",id);
423 return mismatch_error;
427 start=now=gras_time();
429 for (i=0;i<pd->grasMsgQueueLen;i++) {
430 if (pd->grasMsgQueue[i]->header->message == id) {
431 *message = pd->grasMsgQueue[i];
432 memmove(pd->grasMsgQueue[i],pd->grasMsgQueue[i+1],(pd->grasMsgQueueLen-i-1)*sizeof(gras_msg_t));
433 pd->grasMsgQueueLen--;
434 if (pd->grasMsgQueueLen == 0) {
435 /* size reached 0. Free the queue so that the next enlargement (with malloc) don't leak*/
436 free(pd->grasMsgQueue);
437 /* if size!=0 don't loose the time to realloc to only gain 4 bytes */
439 fprintf(stderr,"%s:%d: gras_msg_wait: The message was queued\n",__FILE__,__LINE__);
445 if ((errcode=grasMsgRecv(message,timeOut))) {
446 if (errcode != timeout_error)
447 fprintf(stderr,"gras_msg_wait: error '%s' while receiving\n",gras_error_name(errcode));
451 if ((*message)->header->message != id) {
452 fprintf(stderr,"gras_msg_wait: Got message %s while waiting for message %s. Queue it.\n",
453 (*message)->entry->name,entry->name);
454 if (pd->grasMsgQueueLen++) {
455 pd->grasMsgQueue = (gras_msg_t **)realloc(pd->grasMsgQueue,
456 sizeof(gras_msg_t)*pd->grasMsgQueueLen);
458 pd->grasMsgQueue = (gras_msg_t **)malloc(sizeof(gras_msg_t)*pd->grasMsgQueueLen);
460 if (!pd->grasMsgQueue) {
461 fprintf(stderr, "PANIC: memory allocation of %d bytes in gras_msg_wait() failed (Queued messages are LOST).\n",
462 sizeof(gras_msg_t)*pd->grasMsgQueueLen);
463 pd->grasMsgQueueLen=0;
466 pd->grasMsgQueue[pd->grasMsgQueueLen - 1] = *message;
469 // fprintf(stderr,"Waited for %s successfully\n",(*message)->entry->name);
473 if (now - start + 0.001 < timeOut)
474 return timeout_error;
480 gras_cb_register(gras_msgid_t message,
484 gras_cblist_t *cbl=gras_cb_get(message);
487 fprintf(stderr,"Try to register a callback for an unregistered message id %d\n",message);
490 if (cbl->cbCount++) {
491 cbl->cb = (gras_cb_t *)realloc(cbl->cb,
492 sizeof(gras_cb_t)*cbl->cbCount);
493 cbl->cbTTL = (int *)realloc(cbl->cbTTL, sizeof(int)*cbl->cbCount);
495 cbl->cb = (gras_cb_t *)malloc(sizeof(gras_cb_t)*cbl->cbCount);
496 cbl->cbTTL = (int *)malloc( sizeof(int)*cbl->cbCount);
498 if (!cbl->cb || !cbl->cbTTL) {
499 fprintf(stderr,"gras_cb_register(): Malloc error (All callbacks for msg %d lost)\n",
502 cbl->cbTTL=NULL; /* Yes, leaking here, but we're dead anyway */
506 cbl->cb [ cbl->cbCount-1 ]=cb;
507 cbl->cbTTL[ cbl->cbCount-1 ]=TTL;