Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
New function: gras_msg_wait_ext (for a finer control of accepted messages); introduce...
[simgrid.git] / src / gras / Msg / msg.c
1 /* $Id$ */
2
3 /* messaging - Function related to messaging (code shared between RL and SG)*/
4
5 /* Copyright (c) 2003, 2004 Martin Quinson. All rights reserved.            */
6
7 /* This program is free software; you can redistribute it and/or modify it
8  * under the terms of the license (GNU LGPL) which comes with this package. */
9
10 #include "xbt/ex.h"
11 #include "gras/Msg/msg_private.h"
12 #include "gras/Virtu/virtu_interface.h"
13 #include "gras/DataDesc/datadesc_interface.h"
14 #include "gras/Transport/transport_interface.h" /* gras_select */
15
16 #define MIN(a,b) ((a) < (b) ? (a) : (b))
17
18 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(gras_msg,gras,"High level messaging");
19
20 xbt_set_t _gras_msgtype_set = NULL;
21 static char *make_namev(const char *name, short int ver);
22 char _GRAS_header[6];
23
24 /*
25  * Creating procdata for this module
26  */
27 static void *gras_msg_procdata_new() {
28    gras_msg_procdata_t res = xbt_new(s_gras_msg_procdata_t,1);
29    
30    res->name = xbt_strdup("gras_msg");
31    res->name_len = 0;
32    res->msg_queue = xbt_dynar_new(sizeof(s_gras_msg_t),   NULL);
33    res->cbl_list  = xbt_dynar_new(sizeof(gras_cblist_t *),gras_cbl_free);
34    res->timers    = xbt_dynar_new(sizeof(s_gras_timer_t), NULL);
35    
36    return (void*)res;
37 }
38
39 /*
40  * Freeing procdata for this module
41  */
42 static void gras_msg_procdata_free(void *data) {
43    gras_msg_procdata_t res = (gras_msg_procdata_t)data;
44    
45    xbt_dynar_free(&( res->msg_queue ));
46    xbt_dynar_free(&( res->cbl_list ));
47    xbt_dynar_free(&( res->timers ));
48    
49    free(res);
50 }
51
52 /*
53  * Module registration
54  */
55 int gras_msg_libdata_id;
56 void gras_msg_register() {
57    gras_msg_libdata_id = gras_procdata_add("gras_msg",gras_msg_procdata_new, gras_msg_procdata_free);
58 }
59
60 /*
61  * Initialize this submodule.
62  */
63 void gras_msg_init(void) {
64   /* only initialize once */
65   if (_gras_msgtype_set != NULL)
66     return;
67
68   VERB0("Initializing Msg");
69   
70   _gras_msgtype_set = xbt_set_new();
71
72   memcpy(_GRAS_header,"GRAS", 4);
73   _GRAS_header[4]=GRAS_PROTOCOL_VERSION;
74   _GRAS_header[5]=(char)GRAS_THISARCH;
75 }
76
77 /*
78  * Finalize the msg module
79  */
80 void
81 gras_msg_exit(void) {
82   VERB0("Exiting Msg");
83   xbt_set_free(&_gras_msgtype_set);
84 }
85
86 /*
87  * Reclamed memory
88  */
89 void gras_msgtype_free(void *t) {
90   gras_msgtype_t msgtype=(gras_msgtype_t)t;
91   if (msgtype) {
92     free(msgtype->name);
93     free(msgtype);
94   }
95 }
96
97 /**
98  * make_namev:
99  *
100  * Returns the versionned name of the message. If the version is 0, that's 
101  * the name unchanged. Pay attention to this before free'ing the result.
102  */
103 static char *make_namev(const char *name, short int ver) {
104   char *namev;
105
106   if (!ver)
107     return (char *)name;
108
109   namev = (char*)xbt_malloc(strlen(name)+2+3+1);
110
111   if (namev)
112       sprintf(namev,"%s_v%d",name,ver);
113
114   return namev;
115 }
116
117 /** @brief declare a new message type of the given name. It only accepts the given datadesc as payload
118  *
119  * @param name: name as it should be used for logging messages (must be uniq)
120  * @param payload: datadescription of the payload
121  */
122 void gras_msgtype_declare(const char           *name,
123                           gras_datadesc_type_t  payload) {
124    gras_msgtype_declare_v(name, 0, payload);
125 }
126
127 /** @brief declare a new versionned message type of the given name and payload
128  *
129  * @param name: name as it should be used for logging messages (must be uniq)
130  * @param version: something like versionning symbol
131  * @param payload: datadescription of the payload
132  *
133  * Registers a message to the GRAS mechanism. Use this version instead of 
134  * gras_msgtype_declare when you change the semantic or syntax of a message and
135  * want your programs to be able to deal with both versions. Internally, each
136  * will be handled as an independent message type, so you can register 
137  * differents for each of them.
138  */
139 void
140 gras_msgtype_declare_v(const char           *name,
141                        short int             version,
142                        gras_datadesc_type_t  payload) {
143  
144   gras_msgtype_t msgtype=NULL;
145   char *namev=make_namev(name,version);
146   volatile int found = 0;
147   xbt_ex_t e;    
148   
149   TRY {
150     msgtype = (gras_msgtype_t)xbt_set_get_by_name(_gras_msgtype_set,namev);
151     found = 1;
152   } CATCH(e) {
153     if (e.category != not_found_error)
154       RETHROW;
155     xbt_ex_free(e);
156   }
157
158   if (found) {
159     VERB2("Re-register version %d of message '%s' (same payload, ignored).",
160           version, name);
161     xbt_assert3(!gras_datadesc_type_cmp(msgtype->ctn_type, payload),
162                  "Message %s re-registred with another payload (%s was %s)",
163                  namev,gras_datadesc_get_name(payload),
164                  gras_datadesc_get_name(msgtype->ctn_type));
165
166     return ; /* do really ignore it */
167
168   }
169
170   VERB3("Register version %d of message '%s' (payload: %s).", 
171         version, name, gras_datadesc_get_name(payload));    
172
173   msgtype = xbt_new(s_gras_msgtype_t,1);
174   msgtype->name = (namev == name ? strdup(name) : namev);
175   msgtype->name_len = strlen(namev);
176   msgtype->version = version;
177   msgtype->ctn_type = payload;
178
179   xbt_set_add(_gras_msgtype_set, (xbt_set_elm_t)msgtype,
180                &gras_msgtype_free);
181 }
182
183 /** @brief retrive an existing message type from its name. */
184 gras_msgtype_t gras_msgtype_by_name (const char *name) {
185   return gras_msgtype_by_namev(name,0);
186 }
187
188 /** @brief retrive an existing message type from its name and version. */
189 gras_msgtype_t gras_msgtype_by_namev(const char      *name,
190                                      short int        version) {
191   gras_msgtype_t res;
192   char *namev = make_namev(name,version); 
193
194   res = (gras_msgtype_t)xbt_set_get_by_name(_gras_msgtype_set, namev);
195   if (name != namev) 
196     free(namev);
197   
198   return res;
199 }
200 /** @brief retrive an existing message type from its name and version. */
201 gras_msgtype_t gras_msgtype_by_id(int id) {
202   return (gras_msgtype_t)xbt_set_get_by_id(_gras_msgtype_set, id);
203 }
204
205 /** \brief Waits for a message to come in over a given socket. 
206  *
207  * @param timeout: How long should we wait for this message.
208  * @param msgt_want: type of awaited msg (or NULL if I'm enclined to accept any message)
209  * @param expe_want: awaited expeditot (match on hostname, not port; NULL if not relevant)
210  * @param payl_filter: function returning true or false when passed a payload. Messages for which it returns false are not selected. (NULL if not relevant)
211  * @param filter_ctx: context passed as second argument of the filter (a pattern to match?)
212  * @param[out] msgt_got: where to write the descriptor of the message we got
213  * @param[out] expe_got: where to create a socket to answer the incomming message
214  * @param[out] payl_got: where to write the payload of the incomming message
215  *
216  * Every message of another type received before the one waited will be queued
217  * and used by subsequent call to this function or gras_msg_handle().
218  */
219
220 void
221 gras_msg_wait_ext(double           timeout,    
222                   gras_msgtype_t   msgt_want,
223                   gras_socket_t    expe_want,
224                   int_f_pvoid_pvoid_t payl_filter,
225                   void               *filter_ctx, 
226                   gras_msgtype_t  *msgt_got,
227                   gras_socket_t   *expe_got,
228                   void            *payl_got) {
229
230   s_gras_msg_t msg;
231   double start, now;
232   gras_msg_procdata_t pd=(gras_msg_procdata_t)gras_libdata_by_id(gras_msg_libdata_id);
233   int cpt;
234
235   xbt_assert0(msgt_want,"Cannot wait for the NULL message");
236
237   VERB1("Waiting for message '%s'",msgt_want->name);
238
239   start = now = gras_os_time();
240
241   xbt_dynar_foreach(pd->msg_queue,cpt,msg){
242     if ( (   !msgt_want || (msg.type->code == msgt_want->code)) 
243          && (!expe_want || (!strcmp( gras_socket_peer_name(msg.expe),
244                                      gras_socket_peer_name(expe_want))))
245          && (!payl_filter || payl_filter(msg.payl,filter_ctx))) {
246
247       if (expe_got)
248         *expe_got = msg.expe;
249       if (msgt_got)
250         *msgt_got = msg.type;
251       if (payl_got) 
252         memcpy(payl_got, msg.payl, msg.payl_size);
253       free(msg.payl);
254       xbt_dynar_cursor_rm(pd->msg_queue, &cpt);
255       VERB0("The waited message was queued");
256       return;
257     }
258   }
259
260   while (1) {
261     memset(&msg,sizeof(msg),0);
262
263     msg.expe = gras_trp_select(timeout - now + start);
264     gras_msg_recv(msg.expe, &msg);
265
266     if ( (   !msgt_want || (msg.type->code == msgt_want->code)) 
267          && (!expe_want || (!strcmp( gras_socket_peer_name(msg.expe),
268                                      gras_socket_peer_name(expe_want))))
269          && (!payl_filter || payl_filter(msg.payl,filter_ctx))) {
270
271       if (expe_got)
272         *expe_got=msg.expe;
273       if (msgt_got)
274         *msgt_got = msg.type;
275       if (payl_got) 
276         memcpy(payl_got, msg.payl, msg.payl_size);
277       free(msg.payl);
278       return;
279     }
280
281     /* not expected msg type. Queue it for later */
282     xbt_dynar_push(pd->msg_queue,&msg);
283     
284     now=gras_os_time();
285     if (now - start + 0.001 < timeout) {
286       THROW1(timeout_error,  now-start+0.001-timeout,
287              "Timeout while waiting for msg %s",msgt_want->name);
288     }
289   }
290
291   THROW_IMPOSSIBLE;
292 }
293 /** \brief Waits for a message to come in over a given socket. 
294  *
295  * @param timeout: How long should we wait for this message.
296  * @param msgt_want: type of awaited msg
297  * @param[out] expeditor: where to create a socket to answer the incomming message
298  * @param[out] payload: where to write the payload of the incomming message
299  * @return the error code (or no_error).
300  *
301  * Every message of another type received before the one waited will be queued
302  * and used by subsequent call to this function or gras_msg_handle().
303  */
304 void
305 gras_msg_wait(double           timeout,    
306               gras_msgtype_t   msgt_want,
307               gras_socket_t   *expeditor,
308               void            *payload) {
309
310   return gras_msg_wait_ext(timeout,
311                            msgt_want, NULL,      NULL, NULL,
312                            NULL,      expeditor, payload);
313 }
314
315
316 /** \brief Send the data pointed by \a payload as a message of type
317  * \a msgtype to the peer \a sock */
318 void
319 gras_msg_send(gras_socket_t   sock,
320               gras_msgtype_t  msgtype,
321               void           *payload) {
322
323   gras_msg_send_ext(sock, e_gras_msg_kind_oneway, msgtype, payload);
324 }
325
326 /** @brief Handle an incomming message or timer (or wait up to \a timeOut seconds)
327  *
328  * @param timeOut: How long to wait for incoming messages (in seconds)
329  * @return the error code (or no_error).
330  *
331  * Messages are passed to the callbacks.
332  */
333 void
334 gras_msg_handle(double timeOut) {
335   
336   double          untiltimer;
337    
338   int             cpt;
339
340   s_gras_msg_t    msg;
341   gras_socket_t   expeditor=NULL;
342   void           *payload=NULL;
343   gras_msgtype_t  msgtype=NULL;
344
345   gras_msg_procdata_t pd=(gras_msg_procdata_t)gras_libdata_by_id(gras_msg_libdata_id);
346   gras_cblist_t  *list=NULL;
347   gras_msg_cb_t       cb;
348    
349   int timerexpected, timeouted;
350   xbt_ex_t e;
351
352   VERB1("Handling message within the next %.2fs",timeOut);
353   
354   untiltimer = gras_msg_timer_handle();
355   DEBUG2("[%.0f] Next timer in %f sec", gras_os_time(), untiltimer);
356   if (untiltimer == 0.0) {
357      /* A timer was already elapsed and handled */
358      return;
359   }
360   if (untiltimer != -1.0) {
361      timerexpected = 1;
362      timeOut = MIN(timeOut, untiltimer);
363   } else {
364      timerexpected = 0;
365   }
366    
367   /* get a message (from the queue or from the net) */
368   timeouted = 0;
369   if (xbt_dynar_length(pd->msg_queue)) {
370     DEBUG0("Get a message from the queue");
371     xbt_dynar_shift(pd->msg_queue,&msg);
372     expeditor = msg.expe;
373     msgtype   = msg.type;
374     payload   = msg.payl;
375   } else {
376     TRY {
377       expeditor = gras_trp_select(timeOut);
378     } CATCH(e) {
379       if (e.category != timeout_error)
380         RETHROW;
381       xbt_ex_free(e);
382       timeouted = 1;
383     }
384
385     if (!timeouted) {
386       TRY {
387         /* FIXME: if not the right kind, queue it and recall ourself or goto >:-) */
388         gras_msg_recv(expeditor, &msg);
389         msgtype   = msg.type;
390         payload   = msg.payl;
391       } CATCH(e) {
392         RETHROW1("Error caught while receiving a message on select()ed socket %p: %s",
393                  expeditor);
394       }
395     }
396   }
397
398   if (timeouted) {
399      if (timerexpected) {
400           
401         /* A timer elapsed before the arrival of any message even if we select()ed a bit */
402         untiltimer = gras_msg_timer_handle();
403         if (untiltimer == 0.0) {
404           /* we served a timer, we're done */
405           return;
406         } else {
407            xbt_assert1(untiltimer>0, "Negative timer (%f). I'm 'puzzeled'", untiltimer);
408            WARN1("No timer elapsed, in contrary to expectations (next in %f sec)",
409                   untiltimer);
410            THROW1(timeout_error,0,
411                   "No timer elapsed, in contrary to expectations (next in %f sec)",
412                   untiltimer);
413         }
414         
415      } else {
416         /* select timeouted, and no timer elapsed. Nothing to do */
417        THROW0(timeout_error, 0, "No new message or timer");
418      }
419      
420   }
421    
422   /* A message was already there or arrived in the meanwhile. handle it */
423   xbt_dynar_foreach(pd->cbl_list,cpt,list) {
424     if (list->id == msgtype->code) {
425       break;
426     } else {
427       list=NULL;
428     }
429   }
430   if (!list) {
431     INFO1("No callback for the incomming '%s' message. Discarded.", 
432           msgtype->name);
433     WARN0("FIXME: gras_datadesc_free not implemented => leaking the payload");
434     return;
435   }
436   
437   xbt_dynar_foreach(list->cbs,cpt,cb) { 
438     VERB3("Use the callback #%d (@%p) for incomming msg %s",
439           cpt+1,cb,msgtype->name);
440     if ((*cb)(expeditor,payload)) {
441       /* cb handled the message */
442       free(payload);
443       return;
444     }
445   }
446
447   /* FIXME: gras_datadesc_free not implemented => leaking the payload */
448   THROW1(mismatch_error,0,
449          "Message '%s' refused by all registered callbacks", msgtype->name);
450 }
451
452 void
453 gras_cbl_free(void *data){
454   gras_cblist_t *list=*(void**)data;
455   if (list) {
456     xbt_dynar_free(&( list->cbs ));
457     free(list);
458   }
459 }
460
461 /** \brief Bind the given callback to the given message type 
462  *
463  * Several callbacks can be attached to a given message type. The lastly added one will get the message first, and 
464  * if it returns false, the message will be passed to the second one. 
465  * And so on until one of the callbacks accepts the message.
466  */
467 void
468 gras_cb_register(gras_msgtype_t msgtype,
469                  gras_msg_cb_t cb) {
470   gras_msg_procdata_t pd=(gras_msg_procdata_t)gras_libdata_by_id(gras_msg_libdata_id);
471   gras_cblist_t *list=NULL;
472   int cpt;
473
474   DEBUG2("Register %p as callback to '%s'",cb,msgtype->name);
475
476   /* search the list of cb for this message on this host (creating if NULL) */
477   xbt_dynar_foreach(pd->cbl_list,cpt,list) {
478     if (list->id == msgtype->code) {
479       break;
480     } else {
481       list=NULL;
482     }
483   }
484   if (!list) {
485     /* First cb? Create room */
486     list = xbt_new(gras_cblist_t,1);
487     list->id = msgtype->code;
488     list->cbs = xbt_dynar_new(sizeof(gras_msg_cb_t), NULL);
489     xbt_dynar_push(pd->cbl_list,&list);
490   }
491
492   /* Insert the new one into the set */
493   xbt_dynar_insert_at(list->cbs,0,&cb);
494 }
495
496 /** \brief Unbind the given callback from the given message type */
497 void
498 gras_cb_unregister(gras_msgtype_t msgtype,
499                    gras_msg_cb_t cb) {
500
501   gras_msg_procdata_t pd=(gras_msg_procdata_t)gras_libdata_by_id(gras_msg_libdata_id);
502   gras_cblist_t *list;
503   gras_msg_cb_t cb_cpt;
504   int cpt;
505   int found = 0;
506
507   /* search the list of cb for this message on this host */
508   xbt_dynar_foreach(pd->cbl_list,cpt,list) {
509     if (list->id == msgtype->code) {
510       break;
511     } else {
512       list=NULL;
513     }
514   }
515
516   /* Remove it from the set */
517   if (list) {
518     xbt_dynar_foreach(list->cbs,cpt,cb_cpt) {
519       if (cb == cb_cpt) {
520         xbt_dynar_cursor_rm(list->cbs, &cpt);
521         found = 1;
522       }
523     }
524   }
525   if (!found)
526     VERB1("Ignoring removal of unexisting callback to msg id %d",
527           msgtype->code);
528 }