Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
1ac3636a793a6d73a61cb7697bac8a155efcdef8
[simgrid.git] / src / gras / Msg / msg.c
1 /* $Id$ */
2
3 /* messaging - Function related to messaging (code shared between RL and SG)*/
4
5 /* Copyright (c) 2003, 2004 Martin Quinson. All rights reserved.            */
6
7 /* This program is free software; you can redistribute it and/or modify it
8  * under the terms of the license (GNU LGPL) which comes with this package. */
9
10 #include "xbt/ex.h"
11 #include "gras/Msg/msg_private.h"
12 #include "gras/Virtu/virtu_interface.h"
13 #include "gras/DataDesc/datadesc_interface.h"
14 #include "gras/Transport/transport_interface.h" /* gras_select */
15
16 #ifndef MIN
17 #define MIN(a,b) ((a) < (b) ? (a) : (b))
18 #endif
19
20 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(gras_msg,gras,"High level messaging");
21
22 xbt_set_t _gras_msgtype_set = NULL;
23 static char *make_namev(const char *name, short int ver);
24 char _GRAS_header[6];
25
26 /*
27  * Creating procdata for this module
28  */
29 static void *gras_msg_procdata_new() {
30    gras_msg_procdata_t res = xbt_new(s_gras_msg_procdata_t,1);
31    
32    res->name = xbt_strdup("gras_msg");
33    res->name_len = 0;
34    res->msg_queue = xbt_dynar_new(sizeof(s_gras_msg_t),   NULL);
35    res->cbl_list  = xbt_dynar_new(sizeof(gras_cblist_t *),gras_cbl_free);
36    res->timers    = xbt_dynar_new(sizeof(s_gras_timer_t), NULL);
37    
38    return (void*)res;
39 }
40
41 /*
42  * Freeing procdata for this module
43  */
44 static void gras_msg_procdata_free(void *data) {
45    gras_msg_procdata_t res = (gras_msg_procdata_t)data;
46    
47    xbt_dynar_free(&( res->msg_queue ));
48    xbt_dynar_free(&( res->cbl_list ));
49    xbt_dynar_free(&( res->timers ));
50
51    free(res->name);
52    free(res);
53 }
54
55 /*
56  * Module registration
57  */
58 int gras_msg_libdata_id;
59 void gras_msg_register() {
60    gras_msg_libdata_id = gras_procdata_add("gras_msg",gras_msg_procdata_new, gras_msg_procdata_free);
61 }
62
63 /*
64  * Initialize this submodule.
65  */
66 void gras_msg_init(void) {
67   /* only initialize once */
68   if (_gras_msgtype_set != NULL)
69     return;
70
71   VERB0("Initializing Msg");
72   
73   _gras_msgtype_set = xbt_set_new();
74
75   memcpy(_GRAS_header,"GRAS", 4);
76   _GRAS_header[4]=GRAS_PROTOCOL_VERSION;
77   _GRAS_header[5]=(char)GRAS_THISARCH;
78 }
79
80 /*
81  * Finalize the msg module
82  */
83 void
84 gras_msg_exit(void) {
85   VERB0("Exiting Msg");
86   xbt_set_free(&_gras_msgtype_set);
87 }
88
89 /*
90  * Reclamed memory
91  */
92 void gras_msgtype_free(void *t) {
93   gras_msgtype_t msgtype=(gras_msgtype_t)t;
94   if (msgtype) {
95     free(msgtype->name);
96     free(msgtype);
97   }
98 }
99
100 /**
101  * make_namev:
102  *
103  * Returns the versionned name of the message. If the version is 0, that's 
104  * the name unchanged. Pay attention to this before free'ing the result.
105  */
106 static char *make_namev(const char *name, short int ver) {
107   char *namev;
108
109   if (!ver)
110     return (char *)name;
111
112   namev = (char*)xbt_malloc(strlen(name)+2+3+1);
113
114   if (namev)
115       sprintf(namev,"%s_v%d",name,ver);
116
117   return namev;
118 }
119
120 /** @brief declare a new message type of the given name. It only accepts the given datadesc as payload
121  *
122  * @param name: name as it should be used for logging messages (must be uniq)
123  * @param payload: datadescription of the payload
124  */
125 void gras_msgtype_declare(const char           *name,
126                           gras_datadesc_type_t  payload) {
127    gras_msgtype_declare_v(name, 0, payload);
128 }
129
130 /** @brief declare a new versionned message type of the given name and payload
131  *
132  * @param name: name as it should be used for logging messages (must be uniq)
133  * @param version: something like versionning symbol
134  * @param payload: datadescription of the payload
135  *
136  * Registers a message to the GRAS mechanism. Use this version instead of 
137  * gras_msgtype_declare when you change the semantic or syntax of a message and
138  * want your programs to be able to deal with both versions. Internally, each
139  * will be handled as an independent message type, so you can register 
140  * differents for each of them.
141  */
142 void
143 gras_msgtype_declare_v(const char           *name,
144                        short int             version,
145                        gras_datadesc_type_t  payload) {
146  
147   gras_msgtype_t msgtype=NULL;
148   char *namev=make_namev(name,version);
149   volatile int found = 0;
150   xbt_ex_t e;    
151   
152   TRY {
153     msgtype = (gras_msgtype_t)xbt_set_get_by_name(_gras_msgtype_set,namev);
154     found = 1;
155   } CATCH(e) {
156     if (e.category != not_found_error)
157       RETHROW;
158     xbt_ex_free(e);
159   }
160
161   if (found) {
162     VERB2("Re-register version %d of message '%s' (same payload, ignored).",
163           version, name);
164     xbt_assert3(!gras_datadesc_type_cmp(msgtype->ctn_type, payload),
165                  "Message %s re-registred with another payload (%s was %s)",
166                  namev,gras_datadesc_get_name(payload),
167                  gras_datadesc_get_name(msgtype->ctn_type));
168
169     return ; /* do really ignore it */
170
171   }
172
173   VERB3("Register version %d of message '%s' (payload: %s).", 
174         version, name, gras_datadesc_get_name(payload));    
175
176   msgtype = xbt_new(s_gras_msgtype_t,1);
177   msgtype->name = (namev == name ? strdup(name) : namev);
178   msgtype->name_len = strlen(namev);
179   msgtype->version = version;
180   msgtype->ctn_type = payload;
181
182   xbt_set_add(_gras_msgtype_set, (xbt_set_elm_t)msgtype,
183                &gras_msgtype_free);
184 }
185
186 /** @brief retrive an existing message type from its name. */
187 gras_msgtype_t gras_msgtype_by_name (const char *name) {
188   return gras_msgtype_by_namev(name,0);
189 }
190
191 /** @brief retrive an existing message type from its name and version. */
192 gras_msgtype_t gras_msgtype_by_namev(const char      *name,
193                                      short int        version) {
194   gras_msgtype_t res;
195   char *namev = make_namev(name,version); 
196
197   res = (gras_msgtype_t)xbt_set_get_by_name(_gras_msgtype_set, namev);
198   if (name != namev) 
199     free(namev);
200   
201   return res;
202 }
203 /** @brief retrive an existing message type from its name and version. */
204 gras_msgtype_t gras_msgtype_by_id(int id) {
205   return (gras_msgtype_t)xbt_set_get_by_id(_gras_msgtype_set, id);
206 }
207
208 /** \brief Waits for a message to come in over a given socket. 
209  *
210  * @param timeout: How long should we wait for this message.
211  * @param msgt_want: type of awaited msg (or NULL if I'm enclined to accept any message)
212  * @param expe_want: awaited expeditot (match on hostname, not port; NULL if not relevant)
213  * @param payl_filter: function returning true or false when passed a payload. Messages for which it returns false are not selected. (NULL if not relevant)
214  * @param filter_ctx: context passed as second argument of the filter (a pattern to match?)
215  * @param[out] msgt_got: where to write the descriptor of the message we got
216  * @param[out] expe_got: where to create a socket to answer the incomming message
217  * @param[out] payl_got: where to write the payload of the incomming message
218  *
219  * Every message of another type received before the one waited will be queued
220  * and used by subsequent call to this function or gras_msg_handle().
221  */
222
223 void
224 gras_msg_wait_ext(double           timeout,    
225                   gras_msgtype_t   msgt_want,
226                   gras_socket_t    expe_want,
227                   int_f_pvoid_pvoid_t payl_filter,
228                   void               *filter_ctx, 
229                   gras_msgtype_t  *msgt_got,
230                   gras_socket_t   *expe_got,
231                   void            *payl_got) {
232
233   s_gras_msg_t msg;
234   double start, now;
235   gras_msg_procdata_t pd=(gras_msg_procdata_t)gras_libdata_by_id(gras_msg_libdata_id);
236   int cpt;
237
238   xbt_assert0(msgt_want,"Cannot wait for the NULL message");
239
240   VERB1("Waiting for message '%s'",msgt_want->name);
241
242   start = now = gras_os_time();
243
244   xbt_dynar_foreach(pd->msg_queue,cpt,msg){
245     if ( (   !msgt_want || (msg.type->code == msgt_want->code)) 
246          && (!expe_want || (!strcmp( gras_socket_peer_name(msg.expe),
247                                      gras_socket_peer_name(expe_want))))
248          && (!payl_filter || payl_filter(msg.payl,filter_ctx))) {
249
250       if (expe_got)
251         *expe_got = msg.expe;
252       if (msgt_got)
253         *msgt_got = msg.type;
254       if (payl_got) 
255         memcpy(payl_got, msg.payl, msg.payl_size);
256       free(msg.payl);
257       xbt_dynar_cursor_rm(pd->msg_queue, &cpt);
258       VERB0("The waited message was queued");
259       return;
260     }
261   }
262
263   while (1) {
264     memset(&msg,sizeof(msg),0);
265
266     msg.expe = gras_trp_select(timeout - now + start);
267     gras_msg_recv(msg.expe, &msg);
268
269     if ( (   !msgt_want || (msg.type->code == msgt_want->code)) 
270          && (!expe_want || (!strcmp( gras_socket_peer_name(msg.expe),
271                                      gras_socket_peer_name(expe_want))))
272          && (!payl_filter || payl_filter(msg.payl,filter_ctx))) {
273
274       if (expe_got)
275         *expe_got=msg.expe;
276       if (msgt_got)
277         *msgt_got = msg.type;
278       if (payl_got) 
279         memcpy(payl_got, msg.payl, msg.payl_size);
280       free(msg.payl);
281       return;
282     }
283
284     /* not expected msg type. Queue it for later */
285     xbt_dynar_push(pd->msg_queue,&msg);
286     
287     now=gras_os_time();
288     if (now - start + 0.001 < timeout) {
289       THROW1(timeout_error,  now-start+0.001-timeout,
290              "Timeout while waiting for msg %s",msgt_want->name);
291     }
292   }
293
294   THROW_IMPOSSIBLE;
295 }
296 /** \brief Waits for a message to come in over a given socket. 
297  *
298  * @param timeout: How long should we wait for this message.
299  * @param msgt_want: type of awaited msg
300  * @param[out] expeditor: where to create a socket to answer the incomming message
301  * @param[out] payload: where to write the payload of the incomming message
302  * @return the error code (or no_error).
303  *
304  * Every message of another type received before the one waited will be queued
305  * and used by subsequent call to this function or gras_msg_handle().
306  */
307 void
308 gras_msg_wait(double           timeout,    
309               gras_msgtype_t   msgt_want,
310               gras_socket_t   *expeditor,
311               void            *payload) {
312
313   return gras_msg_wait_ext(timeout,
314                            msgt_want, NULL,      NULL, NULL,
315                            NULL,      expeditor, payload);
316 }
317
318
319 /** \brief Send the data pointed by \a payload as a message of type
320  * \a msgtype to the peer \a sock */
321 void
322 gras_msg_send(gras_socket_t   sock,
323               gras_msgtype_t  msgtype,
324               void           *payload) {
325
326   gras_msg_send_ext(sock, e_gras_msg_kind_oneway, msgtype, payload);
327 }
328
329 /** @brief Handle an incomming message or timer (or wait up to \a timeOut seconds)
330  *
331  * @param timeOut: How long to wait for incoming messages (in seconds)
332  * @return the error code (or no_error).
333  *
334  * Messages are passed to the callbacks.
335  */
336 void
337 gras_msg_handle(double timeOut) {
338   
339   double          untiltimer;
340    
341   int             cpt;
342
343   s_gras_msg_t    msg;
344   gras_socket_t   expeditor=NULL;
345   void           *payload=NULL;
346   gras_msgtype_t  msgtype=NULL;
347
348   gras_msg_procdata_t pd=(gras_msg_procdata_t)gras_libdata_by_id(gras_msg_libdata_id);
349   gras_cblist_t  *list=NULL;
350   gras_msg_cb_t       cb;
351    
352   int timerexpected, timeouted;
353   xbt_ex_t e;
354
355   VERB1("Handling message within the next %.2fs",timeOut);
356   
357   untiltimer = gras_msg_timer_handle();
358   DEBUG2("[%.0f] Next timer in %f sec", gras_os_time(), untiltimer);
359   if (untiltimer == 0.0) {
360      /* A timer was already elapsed and handled */
361      return;
362   }
363   if (untiltimer != -1.0) {
364      timerexpected = 1;
365      timeOut = MIN(timeOut, untiltimer);
366   } else {
367      timerexpected = 0;
368   }
369    
370   /* get a message (from the queue or from the net) */
371   timeouted = 0;
372   if (xbt_dynar_length(pd->msg_queue)) {
373     DEBUG0("Get a message from the queue");
374     xbt_dynar_shift(pd->msg_queue,&msg);
375     expeditor = msg.expe;
376     msgtype   = msg.type;
377     payload   = msg.payl;
378   } else {
379     TRY {
380       expeditor = gras_trp_select(timeOut);
381     } CATCH(e) {
382       if (e.category != timeout_error)
383         RETHROW;
384       xbt_ex_free(e);
385       timeouted = 1;
386     }
387
388     if (!timeouted) {
389       TRY {
390         /* FIXME: if not the right kind, queue it and recall ourself or goto >:-) */
391         gras_msg_recv(expeditor, &msg);
392         msgtype   = msg.type;
393         payload   = msg.payl;
394       } CATCH(e) {
395         RETHROW1("Error caught while receiving a message on select()ed socket %p: %s",
396                  expeditor);
397       }
398     }
399   }
400
401   if (timeouted) {
402      if (timerexpected) {
403           
404         /* A timer elapsed before the arrival of any message even if we select()ed a bit */
405         untiltimer = gras_msg_timer_handle();
406         if (untiltimer == 0.0) {
407           /* we served a timer, we're done */
408           return;
409         } else {
410            xbt_assert1(untiltimer>0, "Negative timer (%f). I'm 'puzzeled'", untiltimer);
411            WARN1("No timer elapsed, in contrary to expectations (next in %f sec)",
412                   untiltimer);
413            THROW1(timeout_error,0,
414                   "No timer elapsed, in contrary to expectations (next in %f sec)",
415                   untiltimer);
416         }
417         
418      } else {
419         /* select timeouted, and no timer elapsed. Nothing to do */
420        THROW0(timeout_error, 0, "No new message or timer");
421      }
422      
423   }
424    
425   /* A message was already there or arrived in the meanwhile. handle it */
426   xbt_dynar_foreach(pd->cbl_list,cpt,list) {
427     if (list->id == msgtype->code) {
428       break;
429     } else {
430       list=NULL;
431     }
432   }
433   if (!list) {
434     INFO1("No callback for the incomming '%s' message. Discarded.", 
435           msgtype->name);
436     WARN0("FIXME: gras_datadesc_free not implemented => leaking the payload");
437     return;
438   }
439   
440   xbt_dynar_foreach(list->cbs,cpt,cb) { 
441     VERB3("Use the callback #%d (@%p) for incomming msg %s",
442           cpt+1,cb,msgtype->name);
443     if ((*cb)(expeditor,payload)) {
444       /* cb handled the message */
445       free(payload);
446       return;
447     }
448   }
449
450   /* FIXME: gras_datadesc_free not implemented => leaking the payload */
451   THROW1(mismatch_error,0,
452          "Message '%s' refused by all registered callbacks", msgtype->name);
453 }
454
455 void
456 gras_cbl_free(void *data){
457   gras_cblist_t *list=*(void**)data;
458   if (list) {
459     xbt_dynar_free(&( list->cbs ));
460     free(list);
461   }
462 }
463
464 /** \brief Bind the given callback to the given message type 
465  *
466  * Several callbacks can be attached to a given message type. The lastly added one will get the message first, and 
467  * if it returns false, the message will be passed to the second one. 
468  * And so on until one of the callbacks accepts the message.
469  */
470 void
471 gras_cb_register(gras_msgtype_t msgtype,
472                  gras_msg_cb_t cb) {
473   gras_msg_procdata_t pd=(gras_msg_procdata_t)gras_libdata_by_id(gras_msg_libdata_id);
474   gras_cblist_t *list=NULL;
475   int cpt;
476
477   DEBUG2("Register %p as callback to '%s'",cb,msgtype->name);
478
479   /* search the list of cb for this message on this host (creating if NULL) */
480   xbt_dynar_foreach(pd->cbl_list,cpt,list) {
481     if (list->id == msgtype->code) {
482       break;
483     } else {
484       list=NULL;
485     }
486   }
487   if (!list) {
488     /* First cb? Create room */
489     list = xbt_new(gras_cblist_t,1);
490     list->id = msgtype->code;
491     list->cbs = xbt_dynar_new(sizeof(gras_msg_cb_t), NULL);
492     xbt_dynar_push(pd->cbl_list,&list);
493   }
494
495   /* Insert the new one into the set */
496   xbt_dynar_insert_at(list->cbs,0,&cb);
497 }
498
499 /** \brief Unbind the given callback from the given message type */
500 void
501 gras_cb_unregister(gras_msgtype_t msgtype,
502                    gras_msg_cb_t cb) {
503
504   gras_msg_procdata_t pd=(gras_msg_procdata_t)gras_libdata_by_id(gras_msg_libdata_id);
505   gras_cblist_t *list;
506   gras_msg_cb_t cb_cpt;
507   int cpt;
508   int found = 0;
509
510   /* search the list of cb for this message on this host */
511   xbt_dynar_foreach(pd->cbl_list,cpt,list) {
512     if (list->id == msgtype->code) {
513       break;
514     } else {
515       list=NULL;
516     }
517   }
518
519   /* Remove it from the set */
520   if (list) {
521     xbt_dynar_foreach(list->cbs,cpt,cb_cpt) {
522       if (cb == cb_cpt) {
523         xbt_dynar_cursor_rm(list->cbs, &cpt);
524         found = 1;
525       }
526     }
527   }
528   if (!found)
529     VERB1("Ignoring removal of unexisting callback to msg id %d",
530           msgtype->code);
531 }