Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
4cb483b19ee8c8c9e2d6c34c99c17bd6c6842526
[simgrid.git] / src / gras / Msg / msg.c
1 /* $Id$ */
2
3 /* messaging - Function related to messaging (code shared between RL and SG)*/
4
5 /* Authors: Martin Quinson                                                  */
6 /* Copyright (C) 2003 the OURAGAN project.                                  */
7
8 /* This program is free software; you can redistribute it and/or modify it
9    under the terms of the license (GNU LGPL) which comes with this package. */
10
11 #include "Msg/msg_private.h"
12
13 GRAS_LOG_NEW_DEFAULT_SUBCATEGORY(msg,GRAS);
14
15 gras_set_t *_gras_msgtype_set = NULL;
16 static char GRAS_header[6];
17 static char *make_namev(const char *name, short int ver);
18
19 /**
20  * gras_msg_init:
21  *
22  * Initialize this submodule.
23  */
24 void gras_msg_init(void) {
25   gras_error_t errcode;
26   
27   /* only initialize once */
28   if (_gras_msgtype_set != NULL)
29     return;
30
31   VERB0("Initializing Msg");
32   
33   TRYFAIL(gras_set_new(&_gras_msgtype_set));
34
35   memcpy(GRAS_header,"GRAS", 4);
36   GRAS_header[4]=GRAS_PROTOCOL_VERSION;
37   GRAS_header[5]=(char)GRAS_THISARCH;
38 }
39
40 /**
41  * gras_msg_exit:
42  *
43  * Finalize the msg module
44  **/
45 void
46 gras_msg_exit(void) {
47   VERB0("Exiting Msg");
48   gras_set_free(&_gras_msgtype_set);
49   _gras_msgtype_set = NULL;
50 }
51
52 /**
53  * gras_msgtype_free:
54  *
55  * Reclamed memory
56  */
57 void gras_msgtype_free(void *t) {
58   gras_msgtype_t *msgtype=(gras_msgtype_t *)t;
59   if (msgtype) {
60     free(msgtype->name);
61     free(msgtype);
62   }
63 }
64
65 /**
66  * make_namev:
67  *
68  * Returns the versionned name of the message. If the version is 0, that's 
69  * the name unchanged. Pay attention to this before free'ing the result.
70  */
71 static char *make_namev(const char *name, short int ver) {
72   char *namev=malloc(strlen(name)+2+3+1);
73
74   if (!ver)
75     return (char *)name;
76
77   if (namev) {
78       sprintf(namev,"%s_v%d",name,ver);
79   }
80   return namev;
81 }
82
83 /**
84  * gras_msgtype_declare:
85  * @name: name as it should be used for logging messages (must be uniq)
86  * @payload: datadescription of the payload
87  *
88  * Registers a message to the GRAS mecanism.
89  */
90 gras_error_t
91 gras_msgtype_declare(const char            *name,
92                      gras_datadesc_type_t  *payload,
93                      gras_msgtype_t       **dst) {
94   return gras_msgtype_declare_v(name, 0, payload, dst);
95 }
96
97 /**
98  * gras_msgtype_declare_v:
99  * @name: name as it should be used for logging messages (must be uniq)
100  * @version: something like versionning symbol
101  * @payload: datadescription of the payload
102  *
103  * Registers a message to the GRAS mecanism. Use this version instead of 
104  * gras_msgtype_declare when you change the semantic or syntax of a message and
105  * want your programs to be able to deal with both versions. Internally, each
106  * will be handled as an independent message type, so you can register 
107  * differents for each of them.
108  */
109 gras_error_t
110 gras_msgtype_declare_v(const char            *name,
111                        short int              version,
112                        gras_datadesc_type_t  *payload,
113                        gras_msgtype_t       **dst) {
114  
115   gras_error_t    errcode;
116   gras_msgtype_t *msgtype;
117   char *namev=make_namev(name,version);
118   
119   if (!namev)
120     RAISE_MALLOC;
121
122   errcode = gras_set_get_by_name(_gras_msgtype_set,
123                                  namev,(gras_set_elm_t**)msgtype);
124   if (errcode == mismatch_error) {
125     /* create type */
126     if (! (msgtype = malloc(sizeof(gras_msgtype_t))) ) 
127       RAISE_MALLOC;
128
129     msgtype->name = (namev == name ? strdup(name) : namev);
130     msgtype->name_len = strlen(namev);
131     msgtype->version = version;
132     msgtype->ctn_type = payload;
133
134     TRY(gras_set_add(_gras_msgtype_set, (gras_set_elm_t*)msgtype,
135                      &gras_msgtype_free));
136
137     DEBUG2("Register version %d of message '%s'.", version, name);
138   } else if (errcode == no_error) { /* found */ 
139     if (namev != name)
140       free(namev);
141
142     gras_assert1(!gras_datadesc_type_cmp(msgtype->ctn_type, payload),
143                  "Message %s registred again with a different payload",
144                  namev);
145   } else {
146     if (namev != name)
147       free(namev);
148     return errcode; /* Error is set lookup */
149   }
150
151   return no_error;
152 }
153
154 /**
155  * gras_msgtype_by_name:
156  *
157  * Retrieve a datatype description from its name
158  */
159 gras_error_t 
160 gras_msgtype_by_name (const char     *name,
161                       gras_msgtype_t **dst) {
162   return gras_msgtype_by_namev(name,0,dst);
163 }
164 /**
165  * gras_msgtype_by_namev:
166  *
167  * Retrieve a datatype description from its name and version
168  */
169 gras_error_t
170 gras_msgtype_by_namev(const char      *name,
171                       short int        version,
172                       gras_msgtype_t **dst) {
173
174   gras_error_t errcode;
175   char *namev = make_namev(name,version); 
176
177   TRY(gras_set_get_by_name(_gras_msgtype_set, namev,
178                            (gras_set_elm_t**)dst));
179   if (name != namev) 
180     free(namev);
181
182   return no_error;
183 }
184
185 /**
186  * gras_msg_send:
187  *
188  * Send the given message on the given socket 
189  */
190 gras_error_t
191 gras_msg_send(gras_socket_t  *sock,
192               gras_msgtype_t *msgtype,
193               void           *payload) {
194
195   gras_error_t errcode;
196   static gras_datadesc_type_t *string_type=NULL;
197   if (!string_type)
198     TRY(gras_datadesc_by_name("string", &string_type));
199   
200   TRY(gras_trp_chunk_send(sock, GRAS_header, 6));
201
202   TRY(gras_datadesc_send(sock, string_type,       msgtype->name));
203   TRY(gras_datadesc_send(sock, msgtype->ctn_type, payload));
204
205   return no_error;
206 }
207 /**
208  * gras_msg_recv:
209  *
210  * receive the next message on the given socket (which should be dropped 
211  * when the function returns an error)
212  */
213 gras_error_t
214 gras_msg_recv(gras_socket_t   *sock,
215               gras_msgtype_t **msgtype,
216               void           **payload) {
217   
218   gras_error_t errcode;
219   static gras_datadesc_type_t *string_type=NULL;
220   char header[6];
221   int cpt;
222   int r_arch;
223   char *msg_name;
224
225   if (!string_type)
226     TRY(gras_datadesc_by_name("string", &string_type));
227   
228   TRY(gras_trp_chunk_recv(sock, header, 6));
229   for (cpt=0; cpt<4; cpt++)
230     if (header[cpt] != GRAS_header[cpt])
231       RAISE0(mismatch_error,"Incoming bytes do not look like a GRAS message");
232   if (header[4] != GRAS_header[4]) 
233     RAISE2(mismatch_error,"GRAS protocol mismatch (got %d, use %d)",
234            (int)header[4], (int)GRAS_header[4]);
235   r_arch = (int)header[5];
236
237   TRY(gras_datadesc_recv(sock, string_type, r_arch,(void**) &msg_name));
238   TRY(gras_set_get_by_name(_gras_msgtype_set,
239                            msg_name,(gras_set_elm_t**)msgtype));
240   TRY(gras_datadesc_recv(sock, (*msgtype)->ctn_type, r_arch, payload));
241
242   return no_error;
243 }
244
245 /**
246  * gras_msg_wait:
247  * @timeout: How long should we wait for this message.
248  * @id: id of awaited msg
249  * @Returns: the error code (or no_error).
250  *
251  * Waits for a message to come in over a given socket.
252  *
253  * Every message of another type received before the one waited will be queued
254  * and used by subsequent call to this function or MsgHandle().
255  */
256 gras_error_t
257 gras_msg_wait(double                 timeout,    
258               gras_msgtype_t        *msgt_want,
259               gras_socket_t        **expeditor,
260               void                 **payload) {
261
262   gras_msgtype_t *msgt_got;
263   gras_error_t errcode;
264   double start, now;
265   gras_procdata_t *pd=gras_procdata_get();
266   int cpt;
267   gras_msg_t msg;
268   
269   *expeditor = NULL;
270   *payload   = NULL;
271
272   VERB1("Waiting for message %s",msgt_want->name);
273
274   start = now = gras_time();
275
276   gras_dynar_foreach(pd->msg_queue,cpt,msg){
277     if (msg.type->code == msgt_want->code) {
278       *expeditor = msg.expeditor;
279       *payload   = msg.payload;
280       gras_dynar_cursor_rm(pd->msg_queue, &cpt);
281       VERB0("Waited message was queued");
282       return no_error;
283     }
284   }
285
286   while (1) {
287     TRY(gras_trp_select(timeout - now + start, expeditor));
288     TRY(gras_msg_recv(*expeditor, &msgt_got, payload));
289     if (msgt_got->code == msgt_want->code) {
290       VERB0("Got waited message");
291       return no_error;
292     }
293
294     /* not expected msg type. Queue it for later */
295     msg.expeditor = *expeditor;
296     msg.type      =  msgt_got;
297     msg.payload   = *payload;
298     TRY(gras_dynar_push(pd->msg_queue,&msg));
299     
300     now=gras_time();
301     if (now - start + 0.001 < timeout) {
302       RAISE1(timeout_error,"Timeout while waiting for msg %s",msgt_want->name);
303     }
304   }
305
306   RAISE_IMPOSSIBLE;
307 }
308
309 /**
310  * gras_msg_handle:
311  * @timeOut: How long to wait for incoming messages
312  * @Returns: the error code (or no_error).
313  *
314  * Waits up to #timeOut# seconds to see if a message comes in; if so, calls the
315  * registered listener for that message (see RegisterCallback()).
316  */
317 gras_error_t 
318 gras_msg_handle(double timeOut) {
319   
320   gras_error_t    errcode;
321   int             cpt;
322
323   gras_msg_t      msg;
324   gras_socket_t  *expeditor;
325   void           *payload;
326   gras_msgtype_t *msgtype;
327
328   gras_procdata_t*pd=gras_procdata_get();
329   gras_cblist_t  *list;
330   gras_cb_t       cb;
331
332
333
334   VERB1("Handling message within the next %.2s",timeOut);
335   
336   /* get a message (from the queue or from the net) */
337   if (gras_dynar_length(pd->msg_queue)) {
338     gras_dynar_shift(pd->msg_queue,&msg);
339     expeditor = msg.expeditor;
340     msgtype   = msg.type;
341     payload   = msg.payload;
342     
343   } else {
344     TRY(gras_trp_select(timeOut, &expeditor));
345     TRY(gras_msg_recv(expeditor, &msgtype, &payload));
346   }
347       
348   /* handle it */
349   gras_dynar_foreach(pd->cbl_list,cpt,list) {
350     if (list->id == msgtype->code) {
351       break;
352     } else {
353       list=NULL;
354     }
355   }
356   if (!list) {
357     INFO1("Unexpected message '%s' ignored", msgtype->name);
358     WARN0("FIXME: gras_datadesc_free not implemented => leaking the payload");
359     return no_error;
360   }
361   
362   gras_dynar_foreach(list->cbs,cpt,cb) { 
363     if (cb(expeditor,msgtype->ctn_type,payload)) {
364       /* cb handled the message */
365       return no_error;
366     }
367   }
368
369   INFO1("Message '%s' refused by all registered callbacks", msgtype->name);
370   WARN0("FIXME: gras_datadesc_free not implemented => leaking the payload");
371   return mismatch_error;
372 }
373
374 gras_error_t
375 gras_cb_register(gras_msgtype_t *msgtype,
376                  gras_cb_t cb) {
377   gras_error_t errcode;
378   gras_procdata_t *pd=gras_procdata_get();
379   gras_cblist_t *list;
380   int cpt;
381
382   /* search the list of cb for this message on this host (creating if NULL) */
383   gras_dynar_foreach(pd->cbl_list,cpt,list) {
384     if (list->id == msgtype->code) {
385       break;
386     } else {
387       list=NULL;
388     }
389   }
390   if (!list) {
391     /* First cb? Create room */
392     list = malloc(sizeof(gras_cblist_t));
393     if (!list)
394       RAISE_MALLOC;
395
396     list->id = msgtype->code;
397     TRY(gras_dynar_new(&(list->cbs), sizeof(gras_cb_t), NULL));
398     TRY(gras_dynar_push(pd->cbl_list,&list));
399   }
400
401   /* Insert the new one into the set */
402   TRY(gras_dynar_insert_at(list->cbs,0,cb));
403
404   return no_error;
405 }
406
407 void
408 gras_cb_unregister(gras_msgtype_t *msgtype,
409                    gras_cb_t cb) {
410
411   gras_procdata_t *pd=gras_procdata_get();
412   gras_cblist_t *list;
413   gras_cb_t cb_cpt;
414   int cpt;
415   int found = 0;
416
417   /* search the list of cb for this message on this host */
418   gras_dynar_foreach(pd->cbl_list,cpt,list) {
419     if (list->id == msgtype->code) {
420       break;
421     } else {
422       list=NULL;
423     }
424   }
425
426   /* Remove it from the set */
427   if (list) {
428     gras_dynar_foreach(list->cbs,cpt,cb_cpt) {
429       if (cb == cb_cpt) {
430         gras_dynar_cursor_rm(list->cbs, &cpt);
431         found = 1;
432       }
433     }
434   }
435   if (!found)
436     VERB1("Ignoring removal of unexisting callback to msg id %d",
437           msgtype->code);
438 }