Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
077f51c21595fbf711bfbd54a78b2f61eb4905c9
[simgrid.git] / src / gras / Msg / msg.c
1 /* $Id$ */
2
3 /* messaging - Function related to messaging (code shared between RL and SG)*/
4
5 /* Authors: Martin Quinson                                                  */
6 /* Copyright (C) 2003 the OURAGAN project.                                  */
7
8 /* This program is free software; you can redistribute it and/or modify it
9    under the terms of the license (GNU LGPL) which comes with this package. */
10
11 #include "Msg/msg_private.h"
12
13 GRAS_LOG_NEW_DEFAULT_SUBCATEGORY(msg,GRAS);
14
15 gras_set_t *_gras_msgtype_set = NULL;
16 static char GRAS_header[6];
17 static char *make_namev(const char *name, short int ver);
18
19 /**
20  * gras_msg_init:
21  *
22  * Initialize this submodule.
23  */
24 void gras_msg_init(void) {
25   gras_error_t errcode;
26   
27   /* only initialize once */
28   if (_gras_msgtype_set != NULL)
29     return;
30
31   VERB0("Initializing Msg");
32   
33   TRYFAIL(gras_set_new(&_gras_msgtype_set));
34
35   memcpy(GRAS_header,"GRAS", 4);
36   GRAS_header[4]=GRAS_PROTOCOL_VERSION;
37   GRAS_header[5]=(char)GRAS_THISARCH;
38 }
39
40 /**
41  * gras_msg_exit:
42  *
43  * Finalize the msg module
44  **/
45 void
46 gras_msg_exit(void) {
47   VERB0("Exiting Msg");
48   gras_set_free(&_gras_msgtype_set);
49   _gras_msgtype_set = NULL;
50 }
51
52 /**
53  * gras_msgtype_free:
54  *
55  * Reclamed memory
56  */
57 void gras_msgtype_free(void *t) {
58   gras_msgtype_t *msgtype=(gras_msgtype_t *)t;
59   if (msgtype) {
60     free(msgtype->name);
61     free(msgtype);
62   }
63 }
64
65 /**
66  * make_namev:
67  *
68  * Returns the versionned name of the message. If the version is 0, that's 
69  * the name unchanged. Pay attention to this before free'ing the result.
70  */
71 static char *make_namev(const char *name, short int ver) {
72   char *namev=malloc(strlen(name)+2+3+1);
73
74   if (!ver)
75     return (char *)name;
76
77   if (namev) {
78       sprintf(namev,"%s_v%d",name,ver);
79   }
80   return namev;
81 }
82
83 /**
84  * gras_msgtype_declare:
85  * @name: name as it should be used for logging messages (must be uniq)
86  * @payload: datadescription of the payload
87  *
88  * Registers a message to the GRAS mecanism.
89  */
90 gras_error_t
91 gras_msgtype_declare(const char            *name,
92                      gras_datadesc_type_t  *payload,
93                      gras_msgtype_t       **dst) {
94   return gras_msgtype_declare_v(name, 0, payload, dst);
95 }
96
97 /**
98  * gras_msgtype_declare_v:
99  * @name: name as it should be used for logging messages (must be uniq)
100  * @version: something like versionning symbol
101  * @payload: datadescription of the payload
102  *
103  * Registers a message to the GRAS mecanism. Use this version instead of 
104  * gras_msgtype_declare when you change the semantic or syntax of a message and
105  * want your programs to be able to deal with both versions. Internally, each
106  * will be handled as an independent message type, so you can register 
107  * differents for each of them.
108  */
109 gras_error_t
110 gras_msgtype_declare_v(const char            *name,
111                        short int              version,
112                        gras_datadesc_type_t  *payload,
113                        gras_msgtype_t       **dst) {
114  
115   gras_error_t    errcode;
116   gras_msgtype_t *msgtype;
117   char *namev=make_namev(name,version);
118   
119   if (!namev)
120     RAISE_MALLOC;
121
122   errcode = gras_set_get_by_name(_gras_msgtype_set,
123                                  namev,(gras_set_elm_t**)&msgtype);
124   if (errcode == mismatch_error) {
125     /* create type */
126     if (! (msgtype = malloc(sizeof(gras_msgtype_t))) ) 
127       RAISE_MALLOC;
128
129     msgtype->name = (namev == name ? strdup(name) : namev);
130     msgtype->name_len = strlen(namev);
131     msgtype->version = version;
132     msgtype->ctn_type = payload;
133
134     TRY(gras_set_add(_gras_msgtype_set, (gras_set_elm_t*)msgtype,
135                      &gras_msgtype_free));
136
137     DEBUG2("Register version %d of message '%s'.", version, name);
138   } else if (errcode == no_error) { /* found */ 
139     if (namev != name)
140       free(namev);
141
142     gras_assert1(!gras_datadesc_type_cmp(msgtype->ctn_type, payload),
143                  "Message %s registred again with a different payload",
144                  namev);
145   } else {
146     if (namev != name)
147       free(namev);
148     return errcode; /* Error is set lookup */
149   }
150
151   return no_error;
152 }
153
154 /**
155  * gras_msgtype_by_name:
156  *
157  * Retrieve a datatype description from its name
158  */
159 gras_error_t 
160 gras_msgtype_by_name (const char     *name,
161                       gras_msgtype_t **dst) {
162   return gras_msgtype_by_namev(name,0,dst);
163 }
164 /**
165  * gras_msgtype_by_namev:
166  *
167  * Retrieve a datatype description from its name and version
168  */
169 gras_error_t
170 gras_msgtype_by_namev(const char      *name,
171                       short int        version,
172                       gras_msgtype_t **dst) {
173
174   gras_error_t errcode;
175   char *namev = make_namev(name,version); 
176
177   TRY(gras_set_get_by_name(_gras_msgtype_set, namev,
178                            (gras_set_elm_t**)dst));
179   if (name != namev) 
180     free(namev);
181
182   return no_error;
183 }
184
185 /**
186  * gras_msg_send:
187  *
188  * Send the given message on the given socket 
189  */
190 gras_error_t
191 gras_msg_send(gras_socket_t  *sock,
192               gras_msgtype_t *msgtype,
193               void           *payload) {
194
195   gras_error_t errcode;
196   static gras_datadesc_type_t *string_type=NULL;
197   if (!string_type) {
198     string_type = gras_datadesc_by_name("string");
199     gras_assert(string_type);
200   }
201   
202   TRY(gras_trp_chunk_send(sock, GRAS_header, 6));
203
204   TRY(gras_datadesc_send(sock, string_type,       msgtype->name));
205   TRY(gras_datadesc_send(sock, msgtype->ctn_type, payload));
206
207   return no_error;
208 }
209 /**
210  * gras_msg_recv:
211  *
212  * receive the next message on the given socket (which should be dropped 
213  * when the function returns an error)
214  */
215 gras_error_t
216 gras_msg_recv(gras_socket_t   *sock,
217               gras_msgtype_t **msgtype,
218               void           **payload) {
219   
220   gras_error_t errcode;
221   static gras_datadesc_type_t *string_type=NULL;
222   char header[6];
223   int cpt;
224   int r_arch;
225   char *msg_name;
226
227   if (!string_type) {
228     string_type=gras_datadesc_by_name("string");
229     gras_assert(string_type);
230   }
231   
232   TRY(gras_trp_chunk_recv(sock, header, 6));
233   for (cpt=0; cpt<4; cpt++)
234     if (header[cpt] != GRAS_header[cpt])
235       RAISE0(mismatch_error,"Incoming bytes do not look like a GRAS message");
236   if (header[4] != GRAS_header[4]) 
237     RAISE2(mismatch_error,"GRAS protocol mismatch (got %d, use %d)",
238            (int)header[4], (int)GRAS_header[4]);
239   r_arch = (int)header[5];
240
241   TRY(gras_datadesc_recv(sock, string_type, r_arch,(void**) &msg_name));
242   TRY(gras_set_get_by_name(_gras_msgtype_set,
243                            msg_name,(gras_set_elm_t**)msgtype));
244   TRY(gras_datadesc_recv(sock, (*msgtype)->ctn_type, r_arch, payload));
245
246   return no_error;
247 }
248
249 /**
250  * gras_msg_wait:
251  * @timeout: How long should we wait for this message.
252  * @id: id of awaited msg
253  * @Returns: the error code (or no_error).
254  *
255  * Waits for a message to come in over a given socket.
256  *
257  * Every message of another type received before the one waited will be queued
258  * and used by subsequent call to this function or MsgHandle().
259  */
260 gras_error_t
261 gras_msg_wait(double                 timeout,    
262               gras_msgtype_t        *msgt_want,
263               gras_socket_t        **expeditor,
264               void                 **payload) {
265
266   gras_msgtype_t *msgt_got;
267   gras_error_t errcode;
268   double start, now;
269   gras_procdata_t *pd=gras_procdata_get();
270   int cpt;
271   gras_msg_t msg;
272   
273   *expeditor = NULL;
274   *payload   = NULL;
275
276   VERB1("Waiting for message %s",msgt_want->name);
277
278   start = now = gras_time();
279
280   gras_dynar_foreach(pd->msg_queue,cpt,msg){
281     if (msg.type->code == msgt_want->code) {
282       *expeditor = msg.expeditor;
283       *payload   = msg.payload;
284       gras_dynar_cursor_rm(pd->msg_queue, &cpt);
285       VERB0("Waited message was queued");
286       return no_error;
287     }
288   }
289
290   while (1) {
291     TRY(gras_trp_select(timeout - now + start, expeditor));
292     TRY(gras_msg_recv(*expeditor, &msgt_got, payload));
293     if (msgt_got->code == msgt_want->code) {
294       VERB0("Got waited message");
295       return no_error;
296     }
297
298     /* not expected msg type. Queue it for later */
299     msg.expeditor = *expeditor;
300     msg.type      =  msgt_got;
301     msg.payload   = *payload;
302     TRY(gras_dynar_push(pd->msg_queue,&msg));
303     
304     now=gras_time();
305     if (now - start + 0.001 < timeout) {
306       RAISE1(timeout_error,"Timeout while waiting for msg %s",msgt_want->name);
307     }
308   }
309
310   RAISE_IMPOSSIBLE;
311 }
312
313 /**
314  * gras_msg_handle:
315  * @timeOut: How long to wait for incoming messages
316  * @Returns: the error code (or no_error).
317  *
318  * Waits up to #timeOut# seconds to see if a message comes in; if so, calls the
319  * registered listener for that message (see RegisterCallback()).
320  */
321 gras_error_t 
322 gras_msg_handle(double timeOut) {
323   
324   gras_error_t    errcode;
325   int             cpt;
326
327   gras_msg_t      msg;
328   gras_socket_t  *expeditor;
329   void           *payload;
330   gras_msgtype_t *msgtype;
331
332   gras_procdata_t*pd=gras_procdata_get();
333   gras_cblist_t  *list;
334   gras_cb_t       cb;
335
336
337
338   VERB1("Handling message within the next %d",timeOut);
339   
340   /* get a message (from the queue or from the net) */
341   if (gras_dynar_length(pd->msg_queue)) {
342     gras_dynar_shift(pd->msg_queue,&msg);
343     expeditor = msg.expeditor;
344     msgtype   = msg.type;
345     payload   = msg.payload;
346     
347   } else {
348     TRY(gras_trp_select(timeOut, &expeditor));
349     TRY(gras_msg_recv(expeditor, &msgtype, &payload));
350   }
351       
352   /* handle it */
353   gras_dynar_foreach(pd->cbl_list,cpt,list) {
354     if (list->id == msgtype->code) {
355       break;
356     } else {
357       list=NULL;
358     }
359   }
360   if (!list) {
361     INFO1("Unexpected message '%s' ignored", msgtype->name);
362     WARN0("FIXME: gras_datadesc_free not implemented => leaking the payload");
363     return no_error;
364   }
365   
366   gras_dynar_foreach(list->cbs,cpt,cb) { 
367     if (cb(expeditor,msgtype->ctn_type,payload)) {
368       /* cb handled the message */
369       return no_error;
370     }
371   }
372
373   INFO1("Message '%s' refused by all registered callbacks", msgtype->name);
374   WARN0("FIXME: gras_datadesc_free not implemented => leaking the payload");
375   return mismatch_error;
376 }
377
378 gras_error_t
379 gras_cb_register(gras_msgtype_t *msgtype,
380                  gras_cb_t cb) {
381   gras_error_t errcode;
382   gras_procdata_t *pd=gras_procdata_get();
383   gras_cblist_t *list;
384   int cpt;
385
386   /* search the list of cb for this message on this host (creating if NULL) */
387   gras_dynar_foreach(pd->cbl_list,cpt,list) {
388     if (list->id == msgtype->code) {
389       break;
390     } else {
391       list=NULL;
392     }
393   }
394   if (!list) {
395     /* First cb? Create room */
396     list = malloc(sizeof(gras_cblist_t));
397     if (!list)
398       RAISE_MALLOC;
399
400     list->id = msgtype->code;
401     TRY(gras_dynar_new(&(list->cbs), sizeof(gras_cb_t), NULL));
402     TRY(gras_dynar_push(pd->cbl_list,&list));
403   }
404
405   /* Insert the new one into the set */
406   TRY(gras_dynar_insert_at(list->cbs,0,cb));
407
408   return no_error;
409 }
410
411 void
412 gras_cb_unregister(gras_msgtype_t *msgtype,
413                    gras_cb_t cb) {
414
415   gras_procdata_t *pd=gras_procdata_get();
416   gras_cblist_t *list;
417   gras_cb_t cb_cpt;
418   int cpt;
419   int found = 0;
420
421   /* search the list of cb for this message on this host */
422   gras_dynar_foreach(pd->cbl_list,cpt,list) {
423     if (list->id == msgtype->code) {
424       break;
425     } else {
426       list=NULL;
427     }
428   }
429
430   /* Remove it from the set */
431   if (list) {
432     gras_dynar_foreach(list->cbs,cpt,cb_cpt) {
433       if (cb == cb_cpt) {
434         gras_dynar_cursor_rm(list->cbs, &cpt);
435         found = 1;
436       }
437     }
438   }
439   if (!found)
440     VERB1("Ignoring removal of unexisting callback to msg id %d",
441           msgtype->code);
442 }