Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Integrate Bruno's work on SIMIX onto main stream. Tests are broken, but it looks...
[simgrid.git] / src / gras_simix / Msg / gras_simix_msg.c
1 /* $Id$ */
2
3 /* messaging - Function related to messaging (code shared between RL and SG)*/
4
5 /* Copyright (c) 2003, 2004 Martin Quinson. All rights reserved.            */
6
7 /* This program is free software; you can redistribute it and/or modify it
8  * under the terms of the license (GNU LGPL) which comes with this package. */
9
10 #include "xbt/ex.h"
11 #include "xbt/ex_interface.h"
12 #include "gras_simix/Msg/gras_simix_msg_private.h"
13 #include "gras_simix/Virtu/gras_simix_virtu_interface.h"
14 #include "gras_simix/DataDesc/gras_simix_datadesc_interface.h"
15 #include "gras_simix/Transport/gras_simix_transport_interface.h" /* gras_select */
16 #include "portable.h" /* execinfo when available to propagate exceptions */
17
18 #ifndef MIN
19 #define MIN(a,b) ((a) < (b) ? (a) : (b))
20 #endif
21
22 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(gras_msg,gras,"High level messaging");
23
24 xbt_set_t _gras_msgtype_set = NULL;
25 static char *make_namev(const char *name, short int ver);
26 char _GRAS_header[6];
27 const char *e_gras_msg_kind_names[e_gras_msg_kind_count]=
28   {"UNKNOWN","ONEWAY","RPC call","RPC answer","RPC error"};
29
30 /*
31  * Creating procdata for this module
32  */
33 static void *gras_msg_procdata_new() {
34    gras_msg_procdata_t res = xbt_new(s_gras_msg_procdata_t,1);
35    
36    res->name = xbt_strdup("gras_msg");
37    res->name_len = 0;
38    res->msg_queue     = xbt_dynar_new(sizeof(s_gras_msg_t),   NULL);
39    res->msg_waitqueue = xbt_dynar_new(sizeof(s_gras_msg_t),   NULL);
40    res->cbl_list      = xbt_dynar_new(sizeof(gras_cblist_t *),gras_cbl_free);
41    res->timers        = xbt_dynar_new(sizeof(s_gras_timer_t), NULL);
42    res->msg_to_receive_queue = xbt_fifo_new();
43    
44    return (void*)res;
45 }
46
47 /*
48  * Freeing procdata for this module
49  */
50 static void gras_msg_procdata_free(void *data) {
51    gras_msg_procdata_t res = (gras_msg_procdata_t)data;
52    
53    xbt_dynar_free(&( res->msg_queue ));
54    xbt_dynar_free(&( res->msg_waitqueue ));
55    xbt_dynar_free(&( res->cbl_list ));
56    xbt_dynar_free(&( res->timers ));
57    xbt_fifo_free( res->msg_to_receive_queue );
58
59    free(res->name);
60    free(res);
61 }
62
63 /*
64  * Module registration
65  */
66 int gras_msg_libdata_id;
67 void gras_msg_register() {
68    gras_msg_libdata_id = gras_procdata_add("gras_msg",gras_msg_procdata_new, gras_msg_procdata_free);
69 }
70
71 /*
72  * Initialize this submodule.
73  */
74 void gras_msg_init(void) {
75   /* only initialize once */
76   if (_gras_msgtype_set != NULL)
77     return;
78
79   VERB0("Initializing Msg");
80   
81   _gras_msgtype_set = xbt_set_new();
82
83   memcpy(_GRAS_header,"GRAS", 4);
84   _GRAS_header[4]=GRAS_PROTOCOL_VERSION;
85   _GRAS_header[5]=(char)GRAS_THISARCH;
86    
87   gras_msg_ctx_mallocator = 
88      xbt_mallocator_new(1000,
89                         gras_msg_ctx_mallocator_new_f,
90                         gras_msg_ctx_mallocator_free_f,
91                         gras_msg_ctx_mallocator_reset_f);
92 }
93
94 /*
95  * Finalize the msg module
96  */
97 void
98 gras_msg_exit(void) {
99   VERB0("Exiting Msg");
100   xbt_set_free(&_gras_msgtype_set);
101
102   xbt_mallocator_free(gras_msg_ctx_mallocator);
103 }
104
105 /*
106  * Reclamed memory
107  */
108 void gras_msgtype_free(void *t) {
109   gras_msgtype_t msgtype=(gras_msgtype_t)t;
110   if (msgtype) {
111     free(msgtype->name);
112     free(msgtype);
113   }
114 }
115 /**
116  * Dump all declared message types (debugging purpose)
117  */
118 void gras_msgtype_dumpall(void) {   
119   xbt_set_cursor_t cursor;
120   gras_msgtype_t msgtype=NULL;
121    
122   INFO0("Dump of all registered messages:");
123   xbt_set_foreach(_gras_msgtype_set, cursor, msgtype) {
124     INFO6("  Message name: %s (v%d) %s; %s%s%s", 
125           msgtype->name, msgtype->version, e_gras_msg_kind_names[msgtype->kind],
126           gras_datadesc_get_name(msgtype->ctn_type),
127           (msgtype->kind==e_gras_msg_kind_rpccall ? " -> ":""),
128           (msgtype->kind==e_gras_msg_kind_rpccall ? gras_datadesc_get_name(msgtype->answer_type) : ""));
129   }   
130 }
131
132
133 /**
134  * make_namev:
135  *
136  * Returns the versionned name of the message. If the version is 0, that's 
137  * the name unchanged. Pay attention to this before free'ing the result.
138  */
139 static char *make_namev(const char *name, short int ver) {
140   char *namev;
141
142   if (!ver)
143     return (char *)name;
144
145   namev = (char*)xbt_malloc(strlen(name)+2+3+1);
146
147   if (namev)
148       sprintf(namev,"%s_v%d",name,ver);
149
150   return namev;
151 }
152
153 /* Internal function doing the crude work of registering messages */
154 void 
155 gras_msgtype_declare_ext(const char           *name,
156                          short int             version,
157                          e_gras_msg_kind_t     kind, 
158                          gras_datadesc_type_t  payload_request,
159                          gras_datadesc_type_t  payload_answer) {
160
161   gras_msgtype_t msgtype=NULL;
162   char *namev=make_namev(name,version);
163   volatile int found = 0;
164   xbt_ex_t e;    
165   
166   TRY {
167     msgtype = (gras_msgtype_t)xbt_set_get_by_name(_gras_msgtype_set,namev);
168     found = 1;
169   } CATCH(e) {
170     if (e.category != not_found_error)
171       RETHROW;
172     xbt_ex_free(e);
173   }
174
175   if (found) {
176     VERB2("Re-register version %d of message '%s' (same kind & payload, ignored).",
177           version, name);
178     xbt_assert3(msgtype->kind == kind,
179                 "Message %s re-registered as a %s (it was known as a %s)",
180                 namev,e_gras_msg_kind_names[kind],e_gras_msg_kind_names[msgtype->kind]);
181     xbt_assert3(!gras_datadesc_type_cmp(msgtype->ctn_type, payload_request),
182                  "Message %s re-registred with another payload (%s was %s)",
183                  namev,gras_datadesc_get_name(payload_request),
184                  gras_datadesc_get_name(msgtype->ctn_type));
185
186     xbt_assert3(!gras_datadesc_type_cmp(msgtype->answer_type, payload_answer),
187              "Message %s re-registred with another answer payload (%s was %s)",
188                  namev,gras_datadesc_get_name(payload_answer),
189                  gras_datadesc_get_name(msgtype->answer_type));
190
191     return ; /* do really ignore it */
192
193   }
194
195   VERB4("Register version %d of message '%s' "
196         "(payload: %s; answer payload: %s).", 
197         version, name, gras_datadesc_get_name(payload_request),
198         gras_datadesc_get_name(payload_answer));    
199
200   msgtype = xbt_new(s_gras_msgtype_t,1);
201   msgtype->name = (namev == name ? strdup(name) : namev);
202   msgtype->name_len = strlen(namev);
203   msgtype->version = version;
204   msgtype->kind = kind;
205   msgtype->ctn_type = payload_request;
206   msgtype->answer_type = payload_answer;
207
208   xbt_set_add(_gras_msgtype_set, (xbt_set_elm_t)msgtype,
209                &gras_msgtype_free);
210 }
211
212
213 /** @brief declare a new message type of the given name. It only accepts the given datadesc as payload
214  *
215  * @param name: name as it should be used for logging messages (must be uniq)
216  * @param payload: datadescription of the payload
217  */
218 void gras_msgtype_declare(const char           *name,
219                           gras_datadesc_type_t  payload) {
220    gras_msgtype_declare_ext(name, 0, e_gras_msg_kind_oneway, payload, NULL);
221 }
222
223
224
225 /** @brief declare a new versionned message type of the given name and payload
226  *
227  * @param name: name as it should be used for logging messages (must be uniq)
228  * @param version: something like versionning symbol
229  * @param payload: datadescription of the payload
230  *
231  * Registers a message to the GRAS mechanism. Use this version instead of 
232  * gras_msgtype_declare when you change the semantic or syntax of a message and
233  * want your programs to be able to deal with both versions. Internally, each
234  * will be handled as an independent message type, so you can register 
235  * differents for each of them.
236  */
237 void
238 gras_msgtype_declare_v(const char           *name,
239                        short int             version,
240                        gras_datadesc_type_t  payload) {
241  
242    gras_msgtype_declare_ext(name, version, 
243                             e_gras_msg_kind_oneway, payload, NULL);
244 }
245
246 /** @brief retrieve an existing message type from its name (raises an exception if it does not exist). */
247 gras_msgtype_t gras_msgtype_by_name (const char *name) {
248   return gras_msgtype_by_namev(name,0);
249 }
250 /** @brief retrieve an existing message type from its name (or NULL if it does not exist). */
251 gras_msgtype_t gras_msgtype_by_name_or_null (const char *name) {
252   xbt_ex_t e;
253   gras_msgtype_t res = NULL;
254    
255   TRY {
256      res = gras_msgtype_by_namev(name,0);
257   } CATCH(e) {
258      res = NULL;
259      xbt_ex_free(e);
260   }
261   return res;
262 }
263
264 /** @brief retrieve an existing message type from its name and version. */
265 gras_msgtype_t gras_msgtype_by_namev(const char      *name,
266                                      short int        version) {
267   gras_msgtype_t res = NULL;
268   char *namev = make_namev(name,version);
269   volatile int found=0;
270   xbt_ex_t e;
271
272   TRY {
273     res = (gras_msgtype_t)xbt_set_get_by_name(_gras_msgtype_set, namev);
274     found=1;
275   } CATCH(e) {
276     xbt_ex_free(e);
277   }
278   if (!found)
279     THROW1(not_found_error,0,"No registred message of that name: %s",name);
280
281   if (name != namev) 
282     free(namev);
283   
284   return res;
285 }
286 /** @brief retrieve an existing message type from its name and version. */
287 gras_msgtype_t gras_msgtype_by_id(int id) {
288   return (gras_msgtype_t)xbt_set_get_by_id(_gras_msgtype_set, id);
289 }
290
291 /** \brief Waits for a message to come in over a given socket. 
292  *
293  * @param timeout: How long should we wait for this message.
294  * @param msgt_want: type of awaited msg (or NULL if I'm enclined to accept any message)
295  * @param expe_want: awaited expeditot (match on hostname, not port; NULL if not relevant)
296  * @param filter: function returning true or false when passed a payload. Messages for which it returns false are not selected. (NULL if not relevant)
297  * @param filter_ctx: context passed as second argument of the filter (a pattern to match?)
298  * @param[out] msg_got: where to write the message we got
299  *
300  * Every message of another type received before the one waited will be queued
301  * and used by subsequent call to this function or gras_msg_handle().
302  */
303
304 void
305 gras_msg_wait_ext_(double           timeout,    
306
307                   gras_msgtype_t   msgt_want,
308                   gras_socket_t    expe_want,
309                   gras_msg_filter_t filter,
310                   void             *filter_ctx, 
311
312                   gras_msg_t       msg_got) {
313
314   s_gras_msg_t msg;
315   double start, now;
316   gras_msg_procdata_t pd=(gras_msg_procdata_t)gras_libdata_by_id(gras_msg_libdata_id);
317   int cpt;
318
319   xbt_assert0(msg_got,"msg_got is an output parameter");
320
321   start = gras_os_time();
322   VERB1("Waiting for message '%s'",msgt_want?msgt_want->name:"(any)");
323
324   xbt_dynar_foreach(pd->msg_waitqueue,cpt,msg){
325     if ( (   !msgt_want || (msg.type->code == msgt_want->code)) 
326          && (!expe_want || (!strcmp( gras_socket_peer_name(msg.expe),
327                                      gras_socket_peer_name(expe_want))))
328          && (!filter || filter(&msg,filter_ctx))) {
329
330       memcpy(msg_got,&msg,sizeof(s_gras_msg_t));
331       xbt_dynar_cursor_rm(pd->msg_waitqueue, &cpt);
332       VERB0("The waited message was queued");
333       return;
334     }
335   }
336
337   xbt_dynar_foreach(pd->msg_queue,cpt,msg){
338     if ( (   !msgt_want || (msg.type->code == msgt_want->code)) 
339          && (!expe_want || (!strcmp( gras_socket_peer_name(msg.expe),
340                                      gras_socket_peer_name(expe_want))))
341          && (!filter || filter(&msg,filter_ctx))) {
342
343       memcpy(msg_got,&msg,sizeof(s_gras_msg_t));
344       xbt_dynar_cursor_rm(pd->msg_queue, &cpt);
345       VERB0("The waited message was queued");
346       return;
347     }
348   }
349
350   while (1) {
351     int need_restart;
352     xbt_ex_t e;
353
354   restart_receive: /* Goto here when the receive of a message failed */
355     need_restart=0;
356     now=gras_os_time();
357     memset(&msg,sizeof(msg),0);
358
359     TRY {
360       msg.expe = gras_trp_select(timeout ? timeout - now + start : 0);
361       gras_msg_recv(msg.expe, &msg);
362     } CATCH(e) {
363       if (e.category == system_error &&
364           !strncmp("Socket closed by remote side",e.msg,
365                   strlen("Socket closed by remote side"))) {
366         xbt_ex_free(e);
367         need_restart=1;
368       } else {
369         RETHROW;
370       }
371     }
372     if (need_restart)
373       goto restart_receive;
374
375     DEBUG0("Got a message from the socket");
376
377     if ( (   !msgt_want || (msg.type->code == msgt_want->code)) 
378          && (!expe_want || (!strcmp( gras_socket_peer_name(msg.expe),
379                                      gras_socket_peer_name(expe_want))))
380          && (!filter || filter(&msg,filter_ctx))) {
381
382       memcpy(msg_got,&msg,sizeof(s_gras_msg_t));
383       DEBUG0("Message matches expectations. Use it.");
384       return;
385     }
386     DEBUG0("Message does not match expectations. Queue it.");
387
388     /* not expected msg type. Queue it for later */
389     xbt_dynar_push(pd->msg_queue,&msg);
390     
391     now=gras_os_time();
392     if (now - start + 0.001 > timeout) {
393       THROW1(timeout_error,  now-start+0.001-timeout,
394              "Timeout while waiting for msg '%s'",
395              msgt_want?msgt_want->name:"(any)");
396     }
397   }
398
399   THROW_IMPOSSIBLE;
400 }
401 /** \brief Waits for a message to come in over a given socket. 
402  *
403  * @param timeout: How long should we wait for this message.
404  * @param msgt_want: type of awaited msg
405  * @param[out] expeditor: where to create a socket to answer the incomming message
406  * @param[out] payload: where to write the payload of the incomming message
407  * @return the error code (or no_error).
408  *
409  * Every message of another type received before the one waited will be queued
410  * and used by subsequent call to this function or gras_msg_handle().
411  */
412 void
413 gras_msg_wait_(double           timeout,    
414                gras_msgtype_t   msgt_want,
415                gras_socket_t   *expeditor,
416                void            *payload) {
417   s_gras_msg_t msg;
418
419   gras_msg_wait_ext_(timeout,
420                      msgt_want, NULL,      NULL, NULL,
421                      &msg);
422
423   if (msgt_want->ctn_type) {
424     xbt_assert1(payload,
425                 "Message type '%s' convey a payload you must accept",
426                 msgt_want->name);
427   } else {
428     xbt_assert1(!payload,
429                 "No payload was declared for message type '%s'",
430                 msgt_want->name);
431   }
432
433   if (payload) {
434     memcpy(payload,msg.payl,msg.payl_size);
435     free(msg.payl);
436   }
437
438   if (expeditor)
439     *expeditor = msg.expe;
440 }
441
442 static int gras_msg_wait_or_filter(gras_msg_t msg, void *ctx) {
443   xbt_dynar_t dyn=(xbt_dynar_t)ctx;
444   int res =  xbt_dynar_member(dyn,msg->type);
445   if (res)
446     VERB1("Got matching message (type=%s)",msg->type->name);
447   else
448     VERB0("Got message not matching our expectations");
449   return res;
450 }
451 /** \brief Waits for a message to come in over a given socket. 
452  *
453  * @param timeout: How long should we wait for this message.
454  * @param msgt_want: a dynar containing all accepted message type
455  * @param[out] ctx: the context of received message (in case it's a RPC call we want to answer to)
456  * @param[out] msgt_got: indice in the dynar of the type of the received message 
457  * @param[out] payload: where to write the payload of the incomming message
458  * @return the error code (or no_error).
459  *
460  * Every message of a type not in the accepted list received before the one
461  * waited will be queued and used by subsequent call to this function or
462  * gras_msg_handle().
463  *
464  * If you are interested in the context, pass the address of a s_gras_msg_cb_ctx_t variable.
465  */
466 void gras_msg_wait_or(double         timeout,
467                       xbt_dynar_t    msgt_want,
468                       gras_msg_cb_ctx_t *ctx,
469                       int           *msgt_got,
470                       void          *payload) {
471   s_gras_msg_t msg;
472
473   VERB1("Wait %f seconds for several message types",timeout);
474   gras_msg_wait_ext_(timeout,
475                      NULL, NULL,      
476                      &gras_msg_wait_or_filter, (void*)msgt_want,
477                      &msg);
478
479   if (msg.type->ctn_type) {
480     xbt_assert1(payload,
481                 "Message type '%s' convey a payload you must accept",
482                 msg.type->name);
483   } /* don't check the other side since some of the types may have a payload */
484
485   if (payload && msg.type->ctn_type) {
486     memcpy(payload,msg.payl,msg.payl_size);
487     free(msg.payl);
488   }
489
490   if (ctx) 
491     *ctx=gras_msg_cb_ctx_new(msg.expe, msg.type, msg.ID,
492                              (msg.kind == e_gras_msg_kind_rpccall), 60);
493
494   if (msgt_got)
495     *msgt_got = xbt_dynar_search(msgt_want,msg.type);
496 }
497
498
499 /** \brief Send the data pointed by \a payload as a message of type
500  * \a msgtype to the peer \a sock */
501 void
502 gras_msg_send_(gras_socket_t   sock,
503               gras_msgtype_t  msgtype,
504               void           *payload) {
505
506   if (msgtype->ctn_type) {
507     xbt_assert1(payload,
508                 "Message type '%s' convey a payload you must provide",
509                 msgtype->name);
510   } else {
511     xbt_assert1(!payload,
512                 "No payload was declared for message type '%s'",
513                 msgtype->name);
514   }
515
516   DEBUG2("Send a oneway message of type '%s'. Payload=%p",
517          msgtype->name,payload);
518   gras_msg_send_ext(sock, e_gras_msg_kind_oneway,0, msgtype, payload);
519   VERB2("Sent a oneway message of type '%s'. Payload=%p",
520         msgtype->name,payload);
521 }
522
523 /** @brief Handle all messages arriving within the given period
524  *
525  * @param period: How long to wait for incoming messages (in seconds)
526  *
527  * Messages are dealed with just like gras_msg_handle() would do. The
528  * difference is that gras_msg_handle() handles at most one message (or wait up
529  * to timeout second when no message arrives) while this function handles any
530  * amount of messages, and lasts the given period in any case.
531  */
532 void 
533 gras_msg_handleall(double period) {
534   xbt_ex_t e;
535   double begin=gras_os_time();
536   double now;
537
538   do {
539     now=gras_os_time();
540     TRY{
541       if (period - now + begin > 0)
542         gras_msg_handle(period - now + begin);
543     } CATCH(e) {
544       if (e.category != timeout_error) 
545         RETHROW0("Error while waiting for messages: %s");
546       xbt_ex_free(e);
547     }
548   } while (now - begin < period);
549 }
550
551 /** @brief Handle an incomming message or timer (or wait up to \a timeOut seconds)
552  *
553  * @param timeOut: How long to wait for incoming messages (in seconds)
554  * @return the error code (or no_error).
555  *
556  * Messages are passed to the callbacks. See also gras_msg_handleall().
557  */
558 void
559 gras_msg_handle(double timeOut) {
560   
561   double          untiltimer;
562    
563   int             cpt;
564   int volatile ran_ok;
565
566   s_gras_msg_t    msg;
567
568   gras_msg_procdata_t pd=(gras_msg_procdata_t)gras_libdata_by_id(gras_msg_libdata_id);
569   gras_cblist_t  *list=NULL;
570   gras_msg_cb_t       cb;
571   s_gras_msg_cb_ctx_t ctx;
572    
573   int timerexpected, timeouted;
574   xbt_ex_t e;
575
576   VERB1("Handling message within the next %.2fs",timeOut);
577   
578   untiltimer = gras_msg_timer_handle();
579   DEBUG1("Next timer in %f sec", untiltimer);
580   if (untiltimer == 0.0) {
581      /* A timer was already elapsed and handled */
582      return;
583   }
584   if (untiltimer != -1.0) {
585      timerexpected = 1;
586      timeOut = MIN(timeOut, untiltimer);
587   } else {
588      timerexpected = 0;
589   }
590    
591   /* get a message (from the queue or from the net) */
592   timeouted = 0;
593   if (xbt_dynar_length(pd->msg_queue)) {
594     DEBUG0("Get a message from the queue");
595     xbt_dynar_shift(pd->msg_queue,&msg);
596   } else {
597     TRY {
598       msg.expe = gras_trp_select(timeOut);
599     } CATCH(e) {
600       if (e.category != timeout_error)
601         RETHROW;
602       xbt_ex_free(e);
603       timeouted = 1;
604     }
605
606     if (!timeouted) {
607       TRY {
608         /* FIXME: if not the right kind, queue it and recall ourself or goto >:-) */
609         gras_msg_recv(msg.expe, &msg);
610         DEBUG1("Received a msg from the socket kind:%s",
611                e_gras_msg_kind_names[msg.kind]);
612     
613       } CATCH(e) {
614         RETHROW4("Error while receiving a message on select()ed socket %p to [%s]%s:%d: %s",
615                  msg.expe,
616                  gras_socket_peer_proc(msg.expe),gras_socket_peer_name(msg.expe),
617                  gras_socket_peer_port(msg.expe));
618       }
619     }
620   }
621
622   if (timeouted) {
623      if (timerexpected) {
624           
625         /* A timer elapsed before the arrival of any message even if we select()ed a bit */
626         untiltimer = gras_msg_timer_handle();
627         if (untiltimer == 0.0) {
628           /* we served a timer, we're done */
629           return;
630         } else {
631            xbt_assert1(untiltimer>0, "Negative timer (%f). I'm 'puzzeled'", untiltimer);
632            WARN1("No timer elapsed, in contrary to expectations (next in %f sec)",
633                   untiltimer);
634            THROW1(timeout_error,0,
635                   "No timer elapsed, in contrary to expectations (next in %f sec)",
636                   untiltimer);
637         }
638         
639      } else {
640         /* select timeouted, and no timer elapsed. Nothing to do */
641        THROW1(timeout_error, 0, "No new message or timer (delay was %f)",
642               timeOut);
643      }
644      
645   }
646    
647   /* A message was already there or arrived in the meanwhile. handle it */
648   xbt_dynar_foreach(pd->cbl_list,cpt,list) {
649     if (list->id == msg.type->code) {
650       break;
651     } else {
652       list=NULL;
653     }
654   }
655   if (!list) {
656     INFO3("No callback for message '%s' from %s:%d. Queue it for later gras_msg_wait() use.",
657           msg.type->name,
658           gras_socket_peer_name(msg.expe),gras_socket_peer_port(msg.expe));
659     xbt_dynar_push(pd->msg_waitqueue,&msg);
660     return; /* FIXME: maybe we should call ourselves again until the end of the timer or a proper msg is got */
661   }
662   
663   ctx.expeditor = msg.expe;
664   ctx.ID = msg.ID;
665   ctx.msgtype = msg.type;
666   ctx.answer_due = (msg.kind == e_gras_msg_kind_rpccall);
667
668   switch (msg.kind) {
669   case e_gras_msg_kind_oneway:
670   case e_gras_msg_kind_rpccall:
671     ran_ok=0;
672     TRY {
673       xbt_dynar_foreach(list->cbs,cpt,cb) { 
674         if (!ran_ok) {
675           DEBUG4("Use the callback #%d (@%p) for incomming msg %s (payload_size=%d)",
676                 cpt+1,cb,msg.type->name,msg.payl_size);
677           if (!(*cb)(&ctx,msg.payl)) {
678             /* cb handled the message */
679             free(msg.payl);
680             ran_ok = 1;
681           }
682         }
683       }
684     } CATCH(e) {
685       free(msg.payl);
686       if (msg.type->kind == e_gras_msg_kind_rpccall) {
687         /* The callback raised an exception, propagate it on the network */
688         if (!e.remote) { /* the exception is born on this machine */
689           e.host = (char*)gras_os_myname();
690           xbt_ex_setup_backtrace(&e);
691         } 
692         VERB5("Propagate %s exception ('%s') from '%s' RPC cb back to %s:%d",
693               (e.remote ? "remote" : "local"),
694               e.msg,
695               msg.type->name,
696               gras_socket_peer_name(msg.expe),
697               gras_socket_peer_port(msg.expe));
698         gras_msg_send_ext(msg.expe, e_gras_msg_kind_rpcerror,
699                           msg.ID, msg.type, &e);
700         xbt_ex_free(e);
701         ctx.answer_due = 0;
702         ran_ok=1;
703       } else {
704         RETHROW0("Callback raised an exception: %s");
705       }
706     }
707     xbt_assert0(!(ctx.answer_due),
708                 "RPC callback didn't call gras_msg_rpcreturn");
709
710     if (!ran_ok)
711       THROW1(mismatch_error,0,
712              "Message '%s' refused by all registered callbacks", msg.type->name);
713     /* FIXME: gras_datadesc_free not implemented => leaking the payload */
714     break;
715
716
717   case e_gras_msg_kind_rpcanswer:
718     INFO1("Unexpected RPC answer discarded (type: %s)", msg.type->name);
719     WARN0("FIXME: gras_datadesc_free not implemented => leaking the payload");
720     return;
721
722   case e_gras_msg_kind_rpcerror:
723     INFO1("Unexpected RPC error discarded (type: %s)", msg.type->name);
724     WARN0("FIXME: gras_datadesc_free not implemented => leaking the payload");
725     return;
726
727   default:
728     THROW1(unknown_error,0,
729            "Cannot handle messages of kind %d yet",msg.type->kind);
730   }
731
732 }
733
734 void
735 gras_cbl_free(void *data){
736   gras_cblist_t *list=*(void**)data;
737   if (list) {
738     xbt_dynar_free(&( list->cbs ));
739     free(list);
740   }
741 }
742
743 /** \brief Bind the given callback to the given message type 
744  *
745  * Several callbacks can be attached to a given message type. The lastly added one will get the message first, and 
746  * if it returns a non-null value, the message will be passed to the second one. 
747  * And so on until one of the callbacks accepts the message.
748  */
749 void
750 gras_cb_register_(gras_msgtype_t msgtype,
751                   gras_msg_cb_t cb) {
752   gras_msg_procdata_t pd=(gras_msg_procdata_t)gras_libdata_by_id(gras_msg_libdata_id);
753   gras_cblist_t *list=NULL;
754   int cpt;
755
756   DEBUG2("Register %p as callback to '%s'",cb,msgtype->name);
757
758   /* search the list of cb for this message on this host (creating if NULL) */
759   xbt_dynar_foreach(pd->cbl_list,cpt,list) {
760     if (list->id == msgtype->code) {
761       break;
762     } else {
763       list=NULL;
764     }
765   }
766   if (!list) {
767     /* First cb? Create room */
768     list = xbt_new(gras_cblist_t,1);
769     list->id = msgtype->code;
770     list->cbs = xbt_dynar_new(sizeof(gras_msg_cb_t), NULL);
771     xbt_dynar_push(pd->cbl_list,&list);
772   }
773
774   /* Insert the new one into the set */
775   xbt_dynar_insert_at(list->cbs,0,&cb);
776 }
777
778 /** \brief Unbind the given callback from the given message type */
779 void
780 gras_cb_unregister_(gras_msgtype_t msgtype,
781                     gras_msg_cb_t cb) {
782
783   gras_msg_procdata_t pd=(gras_msg_procdata_t)gras_libdata_by_id(gras_msg_libdata_id);
784   gras_cblist_t *list;
785   gras_msg_cb_t cb_cpt;
786   int cpt;
787   int found = 0;
788
789   /* search the list of cb for this message on this host */
790   xbt_dynar_foreach(pd->cbl_list,cpt,list) {
791     if (list->id == msgtype->code) {
792       break;
793     } else {
794       list=NULL;
795     }
796   }
797
798   /* Remove it from the set */
799   if (list) {
800     xbt_dynar_foreach(list->cbs,cpt,cb_cpt) {
801       if (cb == cb_cpt) {
802         xbt_dynar_cursor_rm(list->cbs, &cpt);
803         found = 1;
804       }
805     }
806   }
807   if (!found)
808     VERB1("Ignoring removal of unexisting callback to msg id %d",
809           msgtype->code);
810 }
811
812 /** \brief Retrieve the expeditor of the message */
813 gras_socket_t gras_msg_cb_ctx_from(gras_msg_cb_ctx_t ctx) {
814   return ctx->expeditor;
815 }
816 /* \brief Creates a new message exchange context (user should never have to) */
817 gras_msg_cb_ctx_t gras_msg_cb_ctx_new(gras_socket_t expe, 
818                                       gras_msgtype_t msgtype,
819                                       unsigned long int ID,
820                                       int answer_due,
821                                       double timeout) {
822   gras_msg_cb_ctx_t res=xbt_new(s_gras_msg_cb_ctx_t,1);
823   res->expeditor = expe;
824   res->msgtype = msgtype;
825   res->ID = ID;
826   res->timeout = timeout;
827   res->answer_due = answer_due;
828
829   return res;
830 }
831 /* \brief Frees a message exchange context 
832  *
833  * This function is mainly useful with \ref gras_msg_wait_or, ie seldom.
834  */
835 void gras_msg_cb_ctx_free(gras_msg_cb_ctx_t ctx) {
836   free(ctx);
837 }