Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
eafe695971f346d4a3d3be1a02c235b0f29b5121
[simgrid.git] / src / gras / Common / gras_msg.c
1 /* $Id$ */
2
3 /* grasMsg - Function related to messaging (code shared between RL and SG)  */
4
5 /* Authors: Martin Quinson                                                  */
6 /* Copyright (C) 2003 the OURAGAN project.                                  */
7
8 /* This program is free software; you can redistribute it and/or modify it
9    under the terms of the license (GNU LGPL) which comes with this package. */
10
11 #include "gras_private.h"
12 #include <string.h>
13
14 /*@null@*/static gras_msgentry_t *grasMsgList = NULL;
15 static unsigned int grasMsgCount = 0;
16
17 /**
18  * Register a new message type to the system
19  */
20
21 gras_error_t
22 gras_msgtype_register(gras_msgid_t message,
23             const char *name,
24             int sequence_count,
25             ...) {
26   gras_msgentry_t *entry=grasMsgEntryGet(message);
27   gras_cblist_t *cbl=gras_cb_get(message);
28   int i;
29   DataDescriptor *dd;
30   size_t ddCount;
31   va_list ap;
32
33   //  fprintf(stderr,"Register message '%s' under ID %d. Sequence count=%d\n",
34   //name,(int)message,sequence_count);
35
36   if (entry) { /* Check that it's the same entry re-registered */
37     if (strcmp(name,entry->name)) {
38       fprintf(stderr,"Second registration of message %d with another name. (old=%s,new=%s)\n",
39               (int)message,entry->name,name);
40       return malloc_error;
41     }
42     if (sequence_count != entry->seqCount) {
43       fprintf(stderr,
44               "Second registration of message %s with another sequence count. (old=%d,new=%d)\n",
45               entry->name,entry->seqCount,sequence_count);
46       return mismatch_error;
47     }
48
49     va_start(ap, sequence_count);
50     for (i=0;i<sequence_count;i++) {
51       dd=va_arg(ap, DataDescriptor*);
52       ddCount=va_arg(ap, size_t);
53       if (ddCount != entry->ddCount[i]) {
54         fprintf(stderr,
55                 "Different re-registration of message %s: DataDescriptor count is different in sequence %d (is %d, was %d)\n",
56                 entry->name, i, ddCount, entry->ddCount[i]);
57         return sanity_error;
58       }
59       if (gras_datadesc_cmp(dd,ddCount, entry->dd[i],ddCount)) {
60         fprintf(stderr,
61                 "Different re-registration of message %s: DataDescriptor of sequence %d is different\n",
62                 entry->name, i);
63         return sanity_error;
64       }
65     }
66     va_end(ap);
67
68   } else { /* build a new entry */
69     if (grasMsgCount++) {
70       grasMsgList = (gras_msgentry_t *)realloc(grasMsgList,sizeof(gras_msgentry_t)*grasMsgCount);
71     } else {
72       grasMsgList = (gras_msgentry_t *)malloc(sizeof(gras_msgentry_t)*grasMsgCount);
73     }
74     if (!grasMsgList) {
75       fprintf(stderr, "PANIC: memory allocation of %d bytes in gras_msgtype_register() failed (Message table LOST).\n",
76               sizeof(gras_msgentry_t)*grasMsgCount);
77       grasMsgCount=0;
78       return malloc_error;
79     }
80     entry = &(grasMsgList[grasMsgCount-1]);
81
82     entry->id = message;
83     if (!(entry->name = strdup(name))) {
84       fprintf(stderr, "gras_msgtype_register: memory allocation failed.\n");
85       grasMsgCount--;
86       return malloc_error;
87     }
88     entry->seqCount = sequence_count;
89     if (sequence_count) {
90       if (!(entry->dd = (DataDescriptor**)malloc(sizeof(DataDescriptor*)*sequence_count))) {
91         fprintf(stderr, "gras_msgtype_register: memory allocation of %d bytes failed.\n",
92                 sizeof(DataDescriptor*)*sequence_count);
93         free(entry->name);
94         grasMsgCount--;
95         return malloc_error;
96       }
97       if (!(entry->ddCount = (size_t*)malloc(sizeof(size_t)*sequence_count))) {
98         fprintf(stderr, "gras_msgtype_register: memory allocation of %d bytes failed.\n",
99                 sizeof(size_t)*sequence_count);
100         free(entry->dd);
101         free(entry->name);
102         grasMsgCount--;
103         return malloc_error;
104       }
105     } else {
106       entry->dd=NULL;
107       entry->ddCount=NULL;
108     }
109     va_start(ap, sequence_count);
110     for (i=0;i<sequence_count;i++) {
111       dd=va_arg(ap, DataDescriptor*);
112       ddCount=va_arg(ap, size_t);
113
114       entry->ddCount[i]=ddCount;
115       if (ddCount) {
116         if (!(entry->dd[i] = (DataDescriptor*)malloc(sizeof(DataDescriptor)*ddCount))) {
117           fprintf(stderr, "gras_msgtype_register: memory allocation of %d bytes failed.\n",
118                   sizeof(DataDescriptor)*ddCount);
119           for (i--;i>=0;i--) free(entry->dd[i]);
120           free(entry->ddCount);
121           free(entry->dd);
122           free(entry->name);
123           grasMsgCount--;
124           return malloc_error;
125         }
126       } else {
127         entry->dd[i]=NULL;
128       }
129       memcpy(entry->dd[i],dd,sizeof(DataDescriptor)*ddCount);
130     }
131     va_end(ap);
132   }
133   if (cbl) {
134     fprintf(stderr,"Warning, message type %s registered twice on this host\n",
135             entry->name);
136   } else {
137     return gras_cb_create(entry->id);
138   }
139
140   return no_error;
141 }
142
143 /*
144  * Retrieve the entry associated with a message id
145  */
146 gras_msgentry_t *
147 grasMsgEntryGet(gras_msgid_t id) {
148   int i;
149
150   for (i=0 ; i<grasMsgCount && grasMsgList[i].id != id ; i++);
151   return i==grasMsgCount ? NULL : &grasMsgList[i];
152 }
153
154 /*
155  * Create the appropriate header
156  */
157 gras_msgheader_t *grasMsgHeaderNew(gras_msgid_t msgId, 
158                                   unsigned int dataSize,
159                                   unsigned int seqCount) {
160   gras_msgheader_t *res;
161   if (!(res=(gras_msgheader_t*)malloc(sizeof(gras_msgheader_t)))) {
162     fprintf(stderr,"gras_msg_new(): Malloc error (wanted %d bytes)\n",sizeof(gras_msgheader_t));
163     return NULL;
164   }    
165   memset(res->version,0,sizeof(res->version));
166   strcpy(res->version,GRASVERSION);
167   res->message = msgId;
168   res->dataSize = dataSize;
169   res->seqCount = seqCount;
170
171   return res;
172 }
173
174 gras_msg_t *gras_msg_new_va(gras_msgid_t msgId,
175                         e_gras_free_directive_t free_data,
176                         int seqCount,
177                         va_list ap) {
178   gras_msg_t *res;
179   int i;
180   unsigned int networkSize=0;
181    
182   /* malloc the needed room, and sanity check */
183   if (!(res=(gras_msg_t*)malloc(sizeof(gras_msg_t)))) {
184     fprintf(stderr,"gras_msg_new(): Malloc error (wanted %d bytes)\n",sizeof(gras_msg_t));
185     return NULL;
186   }
187   res->freeDirective=free_data;
188
189   if (!(res->entry=grasMsgEntryGet(msgId))) {
190     fprintf(stderr,"gras_msg_new(): unknown msg id %d\n",msgId);
191     free(res);
192     return NULL;
193   }
194   if (res->entry->seqCount != seqCount) {
195     fprintf(stderr,"Damnit: you passed %d sequences to build a %s msg, where %d were expected\n",
196             seqCount,res->entry->name,res->entry->seqCount);
197     free(res);
198     return NULL;
199   }
200   if (seqCount) {
201     if (!(res->dataCount=(unsigned int*)malloc(sizeof(unsigned int)*seqCount))) {
202       fprintf(stderr,"gras_msg_new(): Malloc error (wanted %d bytes)\n",
203               (sizeof(unsigned int)*seqCount));
204       free(res);
205       return NULL;
206     }
207     if (!(res->data=(void**)malloc(sizeof(void*)*seqCount))) {
208       fprintf(stderr,"gras_msg_new(): Malloc error (wanted %d bytes)\n",
209               (sizeof(void*)*seqCount));
210       free(res->dataCount);
211       free(res);
212       return NULL;
213     }
214   } else {
215     res->dataCount = NULL;
216     res->data = NULL;
217   }
218   
219   /* populate the message */
220   networkSize += DataSize(headerDescriptor,headerDescriptorCount,NETWORK_FORMAT);
221   networkSize += DataSize(countDescriptor,countDescriptorCount,NETWORK_FORMAT) * seqCount;
222
223   for (i=0; i<seqCount; i++) {
224     res->data[i]=va_arg(ap, void*);
225     res->dataCount[i]=va_arg(ap, int);
226     if (res->dataCount[i] > 1000) {
227       fprintf(stderr,"GRAS WARNING: datacount>1000 in a message. You may want to check the arguments passed to gras_msg_new().\n");
228     }
229     if (res->dataCount[i] < 0) {
230       fprintf(stderr,"GRAS ERROR: datacount<0 in a message. Check the arguments passed to gras_msg_new().\n");
231       free(res->dataCount);
232       free(res->data);
233       free(res);
234       return NULL;
235     }
236
237     networkSize += res->dataCount[i] * 
238       DataSize(res->entry->dd[i],res->entry->ddCount[i],NETWORK_FORMAT);
239   }
240
241   /* finish filling the fields */
242   if (!(res->header=grasMsgHeaderNew(msgId,networkSize,seqCount))) {
243     free(res->data);
244     free(res->dataCount);
245     free(res);
246     return NULL;
247   }
248   res->sock=NULL;
249   return res;
250 }
251
252 gras_msg_t *gras_msg_new(gras_msgid_t msgId,
253                       e_gras_free_directive_t free_data,
254                       int seqCount,
255                       ...) {
256   gras_msg_t *res;
257   va_list ap;
258
259   va_start(ap, seqCount);
260   res=gras_msg_new_va(msgId,free_data,seqCount,ap);
261   va_end(ap);
262
263   return res;
264 }
265
266 gras_error_t
267 gras_msg_new_and_send(gras_sock_t *sd,
268                gras_msgid_t msgId,
269                int seqCount,
270                ...) {
271
272   gras_msg_t *msg;
273   va_list ap;
274
275   va_start(ap, seqCount);
276   msg=gras_msg_new_va(msgId,free_after_use,seqCount,ap);
277   va_end(ap);
278   if (!msg) return unknown_error;
279   
280   return gras_msg_send(sd,msg,free_after_use);
281 }
282
283
284 gras_msg_t *gras_msg_copy(gras_msg_t *msg) {
285   gras_msg_t *res;
286   int i;
287
288   fprintf(stderr,"gras_msg_copy: \n");
289
290   /* malloc the needed room, and sanity check */
291   if (!(res=(gras_msg_t*)malloc(sizeof(gras_msg_t)))) {
292     fprintf(stderr,"gras_msg_new(): Malloc error (wanted %d bytes)\n",sizeof(gras_msg_t));
293     return NULL;
294   }
295   res->freeDirective=free_after_use;
296   res->entry=msg->entry;
297
298   if (!(res->dataCount=(unsigned int*)malloc(sizeof(unsigned int)*res->entry->seqCount))) {
299     fprintf(stderr,"gras_msg_new(): Malloc error (wanted %d bytes)\n",
300             (sizeof(unsigned int)*res->entry->seqCount));
301     free(res);
302     return NULL;
303   }
304   if (!(res->data=(void**)malloc(sizeof(void*)*res->entry->seqCount))) {
305     fprintf(stderr,"gras_msg_new(): Malloc error (wanted %d bytes)\n",
306             (sizeof(void*)*res->entry->seqCount));
307     free(res->dataCount);
308     free(res);
309     return NULL;
310   }
311   
312   /* populate the message */
313   for (i=0; i<res->entry->seqCount; i++) {
314     res->data[i]= gras_datadesc_copy_data(msg->entry->dd[i],msg->entry->ddCount[i],res->data[i]);
315     res->dataCount[i]=msg->dataCount[i];
316   }
317   
318   /* finish filling the fields */
319   if (!(res->header=grasMsgHeaderNew(msg->header->message,
320                                      msg->header->dataSize,
321                                      msg->header->seqCount))) {
322     free(res->data);
323     free(res->dataCount);
324     free(res);
325     return NULL;
326   }
327   res->sock=msg->sock;
328
329   return res;
330 }
331
332
333 void gras_msg_free(gras_msg_t *msg) {
334   int i;
335
336   if (!msg) return;
337   if (msg->freeDirective == free_after_use)
338     for (i=0; i<msg->entry->seqCount; i++) 
339       free(msg->data[i]);
340   gras_sock_close(msg->sock);
341   free(msg->header);
342   // data isn't copied by MsgNew
343   free (msg->data);
344   free (msg->dataCount);
345   free (msg);
346 }
347
348 gras_error_t gras_msg_handle(double timeOut) {
349   grasProcessData_t *pd=grasProcessDataGet();
350   int i;
351   gras_error_t errcode;
352   gras_msg_t *msg;
353   gras_cblist_t *cbl;
354
355   if (pd->grasMsgQueueLen) {
356     /* handle queued message */
357
358     msg = pd->grasMsgQueue[0];
359     memmove(pd->grasMsgQueue[0],pd->grasMsgQueue[1],(pd->grasMsgQueueLen-1)*sizeof(gras_msg_t));
360     pd->grasMsgQueueLen--;
361     if (pd->grasMsgQueueLen == 0) {
362       /* size reached 0. Free the queue so that the next enlargement (with malloc) don't leak*/
363       free(pd->grasMsgQueue);
364       /* if size!=0 don't loose the time to realloc to only gain 4 bytes */
365     }
366     fprintf(stderr,"%s:%d: gras_msg_handle: The message was queued\n",__FILE__,__LINE__);
367     return no_error;
368   } else {
369     /* receive a message from the net */
370     if ((errcode=grasMsgRecv(&msg,timeOut))) {
371       if (errcode == timeout_error) {
372         return no_error;
373       } else {
374         fprintf(stderr,"gras_msg_handle: error '%s' while receiving\n",gras_error_name(errcode));
375         return errcode;
376       }
377     }
378   }
379   
380   /*
381   fprintf(stderr,"GRAS: Handle an incomming message '%s' (datasize=%d, sd=%p)\n",
382           msg->entry->name,msg->header->dataSize,msg->sock);
383   */  
384
385   if (!(cbl=gras_cb_get(msg->entry->id))) {
386     fprintf(stderr,"Message %s is not registered on this host.\n",
387             msg->entry->name);
388     gras_msg_free(msg);
389     return mismatch_error;
390   }
391     
392   for (i = cbl->cbCount - 1; i>=0 ; i--) {
393     if ((*(cbl->cb[i]))(msg)) {
394       //      if (cbl->cbTTL[i] > 0 && (!--(cbl->cbTTL[i]))) {
395       //fprintf(stderr,"GRAS FIXME: Remove the callback from the queue after use if needed.\n");
396       //}
397       break;
398     }
399   }
400
401   if (i<0) {
402     fprintf(stderr,
403             "No callback of msg type %s accepts this message. Discarding it\n",
404             msg->entry->name);
405     gras_msg_free(msg);
406     return mismatch_error;
407   } 
408   return no_error;
409 }
410
411 gras_error_t 
412 gras_msg_wait(double timeOut,
413             gras_msgid_t id,
414             gras_msg_t **message) {
415   int i;
416   gras_error_t errcode;
417   double start,now;
418   gras_msgentry_t *entry=grasMsgEntryGet(id);
419   grasProcessData_t *pd=grasProcessDataGet();
420
421   if (!entry) {
422     fprintf(stderr,"gras_msg_wait: message id %d is not registered\n",id);
423     return mismatch_error;
424   }
425     
426   *message = NULL;
427   start=now=gras_time();
428   
429   for (i=0;i<pd->grasMsgQueueLen;i++) {
430     if (pd->grasMsgQueue[i]->header->message == id) {
431       *message = pd->grasMsgQueue[i];
432       memmove(pd->grasMsgQueue[i],pd->grasMsgQueue[i+1],(pd->grasMsgQueueLen-i-1)*sizeof(gras_msg_t));
433       pd->grasMsgQueueLen--;
434       if (pd->grasMsgQueueLen == 0) {
435         /* size reached 0. Free the queue so that the next enlargement (with malloc) don't leak*/
436         free(pd->grasMsgQueue);
437         /* if size!=0 don't loose the time to realloc to only gain 4 bytes */
438       }
439       fprintf(stderr,"%s:%d: gras_msg_wait: The message was queued\n",__FILE__,__LINE__);
440       return no_error;
441     }
442   }
443
444   while (1) {    
445     if ((errcode=grasMsgRecv(message,timeOut))) {
446       if (errcode != timeout_error)
447         fprintf(stderr,"gras_msg_wait: error '%s' while receiving\n",gras_error_name(errcode));
448       return errcode;
449     }
450     
451     if ((*message)->header->message != id) {
452       fprintf(stderr,"gras_msg_wait: Got message %s while waiting for message %s. Queue it.\n",
453               (*message)->entry->name,entry->name);
454       if (pd->grasMsgQueueLen++) {
455         pd->grasMsgQueue = (gras_msg_t **)realloc(pd->grasMsgQueue,
456                                                  sizeof(gras_msg_t)*pd->grasMsgQueueLen);
457       } else {
458         pd->grasMsgQueue = (gras_msg_t **)malloc(sizeof(gras_msg_t)*pd->grasMsgQueueLen);
459       }
460       if (!pd->grasMsgQueue) {
461         fprintf(stderr, "PANIC: memory allocation of %d bytes in gras_msg_wait() failed (Queued messages are LOST).\n",
462                 sizeof(gras_msg_t)*pd->grasMsgQueueLen);
463         pd->grasMsgQueueLen=0;
464         return malloc_error;
465       }
466       pd->grasMsgQueue[pd->grasMsgQueueLen - 1] = *message;
467       *message=NULL;
468     } else {
469       //      fprintf(stderr,"Waited for %s successfully\n",(*message)->entry->name);
470       return no_error;
471     }
472     now=gras_time();
473     if (now - start + 0.001 < timeOut)
474       return timeout_error;
475   }
476 }
477
478
479 gras_error_t
480 gras_cb_register(gras_msgid_t message,
481                      int TTL,
482                      gras_cb_t cb) {
483
484   gras_cblist_t *cbl=gras_cb_get(message);
485
486   if (!cbl) {
487     fprintf(stderr,"Try to register a callback for an unregistered message id %d\n",message);
488     return sanity_error;
489   }
490   if (cbl->cbCount++) {
491     cbl->cb = (gras_cb_t *)realloc(cbl->cb,
492                                                 sizeof(gras_cb_t)*cbl->cbCount);
493     cbl->cbTTL = (int *)realloc(cbl->cbTTL, sizeof(int)*cbl->cbCount);
494   } else {
495     cbl->cb = (gras_cb_t *)malloc(sizeof(gras_cb_t)*cbl->cbCount);
496     cbl->cbTTL = (int *)malloc( sizeof(int)*cbl->cbCount);
497   }
498   if (!cbl->cb || !cbl->cbTTL) {
499     fprintf(stderr,"gras_cb_register(): Malloc error (All callbacks for msg %d lost)\n",
500             message);
501     cbl->cb=NULL;
502     cbl->cbTTL=NULL; /* Yes, leaking here, but we're dead anyway */
503     cbl->cbCount=0;
504     return malloc_error;
505   }
506   cbl->cb   [ cbl->cbCount-1 ]=cb;
507   cbl->cbTTL[ cbl->cbCount-1 ]=TTL;
508
509   return no_error;
510 }