Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
plug an harmless leak
[simgrid.git] / src / gras / Msg / msg.c
1 /* $Id$ */
2
3 /* messaging - Function related to messaging (code shared between RL and SG)*/
4
5 /* Copyright (c) 2003, 2004 Martin Quinson. All rights reserved.            */
6
7 /* This program is free software; you can redistribute it and/or modify it
8  * under the terms of the license (GNU LGPL) which comes with this package. */
9
10 #include "xbt/ex.h"
11 #include "gras/Msg/msg_private.h"
12 #include "gras/Virtu/virtu_interface.h"
13 #include "gras/DataDesc/datadesc_interface.h"
14 #include "gras/Transport/transport_interface.h" /* gras_select */
15
16 #define MIN(a,b) ((a) < (b) ? (a) : (b))
17
18 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(gras_msg,gras,"High level messaging");
19
20 xbt_set_t _gras_msgtype_set = NULL;
21 static char *make_namev(const char *name, short int ver);
22 char _GRAS_header[6];
23
24 /*
25  * Creating procdata for this module
26  */
27 static void *gras_msg_procdata_new() {
28    gras_msg_procdata_t res = xbt_new(s_gras_msg_procdata_t,1);
29    
30    res->name = xbt_strdup("gras_msg");
31    res->name_len = 0;
32    res->msg_queue = xbt_dynar_new(sizeof(s_gras_msg_t),   NULL);
33    res->cbl_list  = xbt_dynar_new(sizeof(gras_cblist_t *),gras_cbl_free);
34    res->timers    = xbt_dynar_new(sizeof(s_gras_timer_t), NULL);
35    
36    return (void*)res;
37 }
38
39 /*
40  * Freeing procdata for this module
41  */
42 static void gras_msg_procdata_free(void *data) {
43    gras_msg_procdata_t res = (gras_msg_procdata_t)data;
44    
45    xbt_dynar_free(&( res->msg_queue ));
46    xbt_dynar_free(&( res->cbl_list ));
47    xbt_dynar_free(&( res->timers ));
48
49    free(res->name);
50    free(res);
51 }
52
53 /*
54  * Module registration
55  */
56 int gras_msg_libdata_id;
57 void gras_msg_register() {
58    gras_msg_libdata_id = gras_procdata_add("gras_msg",gras_msg_procdata_new, gras_msg_procdata_free);
59 }
60
61 /*
62  * Initialize this submodule.
63  */
64 void gras_msg_init(void) {
65   /* only initialize once */
66   if (_gras_msgtype_set != NULL)
67     return;
68
69   VERB0("Initializing Msg");
70   
71   _gras_msgtype_set = xbt_set_new();
72
73   memcpy(_GRAS_header,"GRAS", 4);
74   _GRAS_header[4]=GRAS_PROTOCOL_VERSION;
75   _GRAS_header[5]=(char)GRAS_THISARCH;
76 }
77
78 /*
79  * Finalize the msg module
80  */
81 void
82 gras_msg_exit(void) {
83   VERB0("Exiting Msg");
84   xbt_set_free(&_gras_msgtype_set);
85 }
86
87 /*
88  * Reclamed memory
89  */
90 void gras_msgtype_free(void *t) {
91   gras_msgtype_t msgtype=(gras_msgtype_t)t;
92   if (msgtype) {
93     free(msgtype->name);
94     free(msgtype);
95   }
96 }
97
98 /**
99  * make_namev:
100  *
101  * Returns the versionned name of the message. If the version is 0, that's 
102  * the name unchanged. Pay attention to this before free'ing the result.
103  */
104 static char *make_namev(const char *name, short int ver) {
105   char *namev;
106
107   if (!ver)
108     return (char *)name;
109
110   namev = (char*)xbt_malloc(strlen(name)+2+3+1);
111
112   if (namev)
113       sprintf(namev,"%s_v%d",name,ver);
114
115   return namev;
116 }
117
118 /** @brief declare a new message type of the given name. It only accepts the given datadesc as payload
119  *
120  * @param name: name as it should be used for logging messages (must be uniq)
121  * @param payload: datadescription of the payload
122  */
123 void gras_msgtype_declare(const char           *name,
124                           gras_datadesc_type_t  payload) {
125    gras_msgtype_declare_v(name, 0, payload);
126 }
127
128 /** @brief declare a new versionned message type of the given name and payload
129  *
130  * @param name: name as it should be used for logging messages (must be uniq)
131  * @param version: something like versionning symbol
132  * @param payload: datadescription of the payload
133  *
134  * Registers a message to the GRAS mechanism. Use this version instead of 
135  * gras_msgtype_declare when you change the semantic or syntax of a message and
136  * want your programs to be able to deal with both versions. Internally, each
137  * will be handled as an independent message type, so you can register 
138  * differents for each of them.
139  */
140 void
141 gras_msgtype_declare_v(const char           *name,
142                        short int             version,
143                        gras_datadesc_type_t  payload) {
144  
145   gras_msgtype_t msgtype=NULL;
146   char *namev=make_namev(name,version);
147   volatile int found = 0;
148   xbt_ex_t e;    
149   
150   TRY {
151     msgtype = (gras_msgtype_t)xbt_set_get_by_name(_gras_msgtype_set,namev);
152     found = 1;
153   } CATCH(e) {
154     if (e.category != not_found_error)
155       RETHROW;
156     xbt_ex_free(e);
157   }
158
159   if (found) {
160     VERB2("Re-register version %d of message '%s' (same payload, ignored).",
161           version, name);
162     xbt_assert3(!gras_datadesc_type_cmp(msgtype->ctn_type, payload),
163                  "Message %s re-registred with another payload (%s was %s)",
164                  namev,gras_datadesc_get_name(payload),
165                  gras_datadesc_get_name(msgtype->ctn_type));
166
167     return ; /* do really ignore it */
168
169   }
170
171   VERB3("Register version %d of message '%s' (payload: %s).", 
172         version, name, gras_datadesc_get_name(payload));    
173
174   msgtype = xbt_new(s_gras_msgtype_t,1);
175   msgtype->name = (namev == name ? strdup(name) : namev);
176   msgtype->name_len = strlen(namev);
177   msgtype->version = version;
178   msgtype->ctn_type = payload;
179
180   xbt_set_add(_gras_msgtype_set, (xbt_set_elm_t)msgtype,
181                &gras_msgtype_free);
182 }
183
184 /** @brief retrive an existing message type from its name. */
185 gras_msgtype_t gras_msgtype_by_name (const char *name) {
186   return gras_msgtype_by_namev(name,0);
187 }
188
189 /** @brief retrive an existing message type from its name and version. */
190 gras_msgtype_t gras_msgtype_by_namev(const char      *name,
191                                      short int        version) {
192   gras_msgtype_t res;
193   char *namev = make_namev(name,version); 
194
195   res = (gras_msgtype_t)xbt_set_get_by_name(_gras_msgtype_set, namev);
196   if (name != namev) 
197     free(namev);
198   
199   return res;
200 }
201 /** @brief retrive an existing message type from its name and version. */
202 gras_msgtype_t gras_msgtype_by_id(int id) {
203   return (gras_msgtype_t)xbt_set_get_by_id(_gras_msgtype_set, id);
204 }
205
206 /** \brief Waits for a message to come in over a given socket. 
207  *
208  * @param timeout: How long should we wait for this message.
209  * @param msgt_want: type of awaited msg (or NULL if I'm enclined to accept any message)
210  * @param expe_want: awaited expeditot (match on hostname, not port; NULL if not relevant)
211  * @param payl_filter: function returning true or false when passed a payload. Messages for which it returns false are not selected. (NULL if not relevant)
212  * @param filter_ctx: context passed as second argument of the filter (a pattern to match?)
213  * @param[out] msgt_got: where to write the descriptor of the message we got
214  * @param[out] expe_got: where to create a socket to answer the incomming message
215  * @param[out] payl_got: where to write the payload of the incomming message
216  *
217  * Every message of another type received before the one waited will be queued
218  * and used by subsequent call to this function or gras_msg_handle().
219  */
220
221 void
222 gras_msg_wait_ext(double           timeout,    
223                   gras_msgtype_t   msgt_want,
224                   gras_socket_t    expe_want,
225                   int_f_pvoid_pvoid_t payl_filter,
226                   void               *filter_ctx, 
227                   gras_msgtype_t  *msgt_got,
228                   gras_socket_t   *expe_got,
229                   void            *payl_got) {
230
231   s_gras_msg_t msg;
232   double start, now;
233   gras_msg_procdata_t pd=(gras_msg_procdata_t)gras_libdata_by_id(gras_msg_libdata_id);
234   int cpt;
235
236   xbt_assert0(msgt_want,"Cannot wait for the NULL message");
237
238   VERB1("Waiting for message '%s'",msgt_want->name);
239
240   start = now = gras_os_time();
241
242   xbt_dynar_foreach(pd->msg_queue,cpt,msg){
243     if ( (   !msgt_want || (msg.type->code == msgt_want->code)) 
244          && (!expe_want || (!strcmp( gras_socket_peer_name(msg.expe),
245                                      gras_socket_peer_name(expe_want))))
246          && (!payl_filter || payl_filter(msg.payl,filter_ctx))) {
247
248       if (expe_got)
249         *expe_got = msg.expe;
250       if (msgt_got)
251         *msgt_got = msg.type;
252       if (payl_got) 
253         memcpy(payl_got, msg.payl, msg.payl_size);
254       free(msg.payl);
255       xbt_dynar_cursor_rm(pd->msg_queue, &cpt);
256       VERB0("The waited message was queued");
257       return;
258     }
259   }
260
261   while (1) {
262     memset(&msg,sizeof(msg),0);
263
264     msg.expe = gras_trp_select(timeout - now + start);
265     gras_msg_recv(msg.expe, &msg);
266
267     if ( (   !msgt_want || (msg.type->code == msgt_want->code)) 
268          && (!expe_want || (!strcmp( gras_socket_peer_name(msg.expe),
269                                      gras_socket_peer_name(expe_want))))
270          && (!payl_filter || payl_filter(msg.payl,filter_ctx))) {
271
272       if (expe_got)
273         *expe_got=msg.expe;
274       if (msgt_got)
275         *msgt_got = msg.type;
276       if (payl_got) 
277         memcpy(payl_got, msg.payl, msg.payl_size);
278       free(msg.payl);
279       return;
280     }
281
282     /* not expected msg type. Queue it for later */
283     xbt_dynar_push(pd->msg_queue,&msg);
284     
285     now=gras_os_time();
286     if (now - start + 0.001 < timeout) {
287       THROW1(timeout_error,  now-start+0.001-timeout,
288              "Timeout while waiting for msg %s",msgt_want->name);
289     }
290   }
291
292   THROW_IMPOSSIBLE;
293 }
294 /** \brief Waits for a message to come in over a given socket. 
295  *
296  * @param timeout: How long should we wait for this message.
297  * @param msgt_want: type of awaited msg
298  * @param[out] expeditor: where to create a socket to answer the incomming message
299  * @param[out] payload: where to write the payload of the incomming message
300  * @return the error code (or no_error).
301  *
302  * Every message of another type received before the one waited will be queued
303  * and used by subsequent call to this function or gras_msg_handle().
304  */
305 void
306 gras_msg_wait(double           timeout,    
307               gras_msgtype_t   msgt_want,
308               gras_socket_t   *expeditor,
309               void            *payload) {
310
311   return gras_msg_wait_ext(timeout,
312                            msgt_want, NULL,      NULL, NULL,
313                            NULL,      expeditor, payload);
314 }
315
316
317 /** \brief Send the data pointed by \a payload as a message of type
318  * \a msgtype to the peer \a sock */
319 void
320 gras_msg_send(gras_socket_t   sock,
321               gras_msgtype_t  msgtype,
322               void           *payload) {
323
324   gras_msg_send_ext(sock, e_gras_msg_kind_oneway, msgtype, payload);
325 }
326
327 /** @brief Handle an incomming message or timer (or wait up to \a timeOut seconds)
328  *
329  * @param timeOut: How long to wait for incoming messages (in seconds)
330  * @return the error code (or no_error).
331  *
332  * Messages are passed to the callbacks.
333  */
334 void
335 gras_msg_handle(double timeOut) {
336   
337   double          untiltimer;
338    
339   int             cpt;
340
341   s_gras_msg_t    msg;
342   gras_socket_t   expeditor=NULL;
343   void           *payload=NULL;
344   gras_msgtype_t  msgtype=NULL;
345
346   gras_msg_procdata_t pd=(gras_msg_procdata_t)gras_libdata_by_id(gras_msg_libdata_id);
347   gras_cblist_t  *list=NULL;
348   gras_msg_cb_t       cb;
349    
350   int timerexpected, timeouted;
351   xbt_ex_t e;
352
353   VERB1("Handling message within the next %.2fs",timeOut);
354   
355   untiltimer = gras_msg_timer_handle();
356   DEBUG2("[%.0f] Next timer in %f sec", gras_os_time(), untiltimer);
357   if (untiltimer == 0.0) {
358      /* A timer was already elapsed and handled */
359      return;
360   }
361   if (untiltimer != -1.0) {
362      timerexpected = 1;
363      timeOut = MIN(timeOut, untiltimer);
364   } else {
365      timerexpected = 0;
366   }
367    
368   /* get a message (from the queue or from the net) */
369   timeouted = 0;
370   if (xbt_dynar_length(pd->msg_queue)) {
371     DEBUG0("Get a message from the queue");
372     xbt_dynar_shift(pd->msg_queue,&msg);
373     expeditor = msg.expe;
374     msgtype   = msg.type;
375     payload   = msg.payl;
376   } else {
377     TRY {
378       expeditor = gras_trp_select(timeOut);
379     } CATCH(e) {
380       if (e.category != timeout_error)
381         RETHROW;
382       xbt_ex_free(e);
383       timeouted = 1;
384     }
385
386     if (!timeouted) {
387       TRY {
388         /* FIXME: if not the right kind, queue it and recall ourself or goto >:-) */
389         gras_msg_recv(expeditor, &msg);
390         msgtype   = msg.type;
391         payload   = msg.payl;
392       } CATCH(e) {
393         RETHROW1("Error caught while receiving a message on select()ed socket %p: %s",
394                  expeditor);
395       }
396     }
397   }
398
399   if (timeouted) {
400      if (timerexpected) {
401           
402         /* A timer elapsed before the arrival of any message even if we select()ed a bit */
403         untiltimer = gras_msg_timer_handle();
404         if (untiltimer == 0.0) {
405           /* we served a timer, we're done */
406           return;
407         } else {
408            xbt_assert1(untiltimer>0, "Negative timer (%f). I'm 'puzzeled'", untiltimer);
409            WARN1("No timer elapsed, in contrary to expectations (next in %f sec)",
410                   untiltimer);
411            THROW1(timeout_error,0,
412                   "No timer elapsed, in contrary to expectations (next in %f sec)",
413                   untiltimer);
414         }
415         
416      } else {
417         /* select timeouted, and no timer elapsed. Nothing to do */
418        THROW0(timeout_error, 0, "No new message or timer");
419      }
420      
421   }
422    
423   /* A message was already there or arrived in the meanwhile. handle it */
424   xbt_dynar_foreach(pd->cbl_list,cpt,list) {
425     if (list->id == msgtype->code) {
426       break;
427     } else {
428       list=NULL;
429     }
430   }
431   if (!list) {
432     INFO1("No callback for the incomming '%s' message. Discarded.", 
433           msgtype->name);
434     WARN0("FIXME: gras_datadesc_free not implemented => leaking the payload");
435     return;
436   }
437   
438   xbt_dynar_foreach(list->cbs,cpt,cb) { 
439     VERB3("Use the callback #%d (@%p) for incomming msg %s",
440           cpt+1,cb,msgtype->name);
441     if ((*cb)(expeditor,payload)) {
442       /* cb handled the message */
443       free(payload);
444       return;
445     }
446   }
447
448   /* FIXME: gras_datadesc_free not implemented => leaking the payload */
449   THROW1(mismatch_error,0,
450          "Message '%s' refused by all registered callbacks", msgtype->name);
451 }
452
453 void
454 gras_cbl_free(void *data){
455   gras_cblist_t *list=*(void**)data;
456   if (list) {
457     xbt_dynar_free(&( list->cbs ));
458     free(list);
459   }
460 }
461
462 /** \brief Bind the given callback to the given message type 
463  *
464  * Several callbacks can be attached to a given message type. The lastly added one will get the message first, and 
465  * if it returns false, the message will be passed to the second one. 
466  * And so on until one of the callbacks accepts the message.
467  */
468 void
469 gras_cb_register(gras_msgtype_t msgtype,
470                  gras_msg_cb_t cb) {
471   gras_msg_procdata_t pd=(gras_msg_procdata_t)gras_libdata_by_id(gras_msg_libdata_id);
472   gras_cblist_t *list=NULL;
473   int cpt;
474
475   DEBUG2("Register %p as callback to '%s'",cb,msgtype->name);
476
477   /* search the list of cb for this message on this host (creating if NULL) */
478   xbt_dynar_foreach(pd->cbl_list,cpt,list) {
479     if (list->id == msgtype->code) {
480       break;
481     } else {
482       list=NULL;
483     }
484   }
485   if (!list) {
486     /* First cb? Create room */
487     list = xbt_new(gras_cblist_t,1);
488     list->id = msgtype->code;
489     list->cbs = xbt_dynar_new(sizeof(gras_msg_cb_t), NULL);
490     xbt_dynar_push(pd->cbl_list,&list);
491   }
492
493   /* Insert the new one into the set */
494   xbt_dynar_insert_at(list->cbs,0,&cb);
495 }
496
497 /** \brief Unbind the given callback from the given message type */
498 void
499 gras_cb_unregister(gras_msgtype_t msgtype,
500                    gras_msg_cb_t cb) {
501
502   gras_msg_procdata_t pd=(gras_msg_procdata_t)gras_libdata_by_id(gras_msg_libdata_id);
503   gras_cblist_t *list;
504   gras_msg_cb_t cb_cpt;
505   int cpt;
506   int found = 0;
507
508   /* search the list of cb for this message on this host */
509   xbt_dynar_foreach(pd->cbl_list,cpt,list) {
510     if (list->id == msgtype->code) {
511       break;
512     } else {
513       list=NULL;
514     }
515   }
516
517   /* Remove it from the set */
518   if (list) {
519     xbt_dynar_foreach(list->cbs,cpt,cb_cpt) {
520       if (cb == cb_cpt) {
521         xbt_dynar_cursor_rm(list->cbs, &cpt);
522         found = 1;
523       }
524     }
525   }
526   if (!found)
527     VERB1("Ignoring removal of unexisting callback to msg id %d",
528           msgtype->code);
529 }