Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
write down the TODO
[simgrid.git] / src / gras / Msg / msg.c
1 /* $Id$ */
2
3 /* messaging - Function related to messaging (code shared between RL and SG)*/
4
5 /* Copyright (c) 2003, 2004 Martin Quinson. All rights reserved.            */
6
7 /* This program is free software; you can redistribute it and/or modify it
8  * under the terms of the license (GNU LGPL) which comes with this package. */
9
10
11 #include "gras/Msg/msg_private.h"
12 #include "gras/DataDesc/datadesc_interface.h"
13 #include "gras/Transport/transport_interface.h" /* gras_trp_chunk_send/recv */
14 #include "gras/Virtu/virtu_interface.h"
15
16 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(msg,gras,"High level messaging");
17
18 xbt_set_t _gras_msgtype_set = NULL;
19 static char GRAS_header[6];
20 static char *make_namev(const char *name, short int ver);
21
22 /*
23  * Initialize this submodule.
24  */
25 void gras_msg_init(void) {
26   /* only initialize once */
27   if (_gras_msgtype_set != NULL)
28     return;
29
30   VERB0("Initializing Msg");
31   
32   _gras_msgtype_set = xbt_set_new();
33
34   memcpy(GRAS_header,"GRAS", 4);
35   GRAS_header[4]=GRAS_PROTOCOL_VERSION;
36   GRAS_header[5]=(char)GRAS_THISARCH;
37 }
38
39 /*
40  * Finalize the msg module
41  */
42 void
43 gras_msg_exit(void) {
44   VERB0("Exiting Msg");
45   xbt_set_free(&_gras_msgtype_set);
46 }
47
48 /*
49  * Reclamed memory
50  */
51 void gras_msgtype_free(void *t) {
52   gras_msgtype_t msgtype=(gras_msgtype_t)t;
53   if (msgtype) {
54     xbt_free(msgtype->name);
55     xbt_free(msgtype);
56   }
57 }
58
59 /**
60  * make_namev:
61  *
62  * Returns the versionned name of the message. If the version is 0, that's 
63  * the name unchanged. Pay attention to this before free'ing the result.
64  */
65 static char *make_namev(const char *name, short int ver) {
66   char *namev;
67
68   if (!ver)
69     return (char *)name;
70
71   namev = (char*)xbt_malloc(strlen(name)+2+3+1);
72
73   if (namev)
74       sprintf(namev,"%s_v%d",name,ver);
75
76   return namev;
77 }
78
79 /**
80  * @param name: name as it should be used for logging messages (must be uniq)
81  * @param payload: datadescription of the payload
82  */
83 void gras_msgtype_declare(const char           *name,
84                           gras_datadesc_type_t  payload) {
85    gras_msgtype_declare_v(name, 0, payload);
86 }
87
88 /**
89  * @param name: name as it should be used for logging messages (must be uniq)
90  * @param version: something like versionning symbol
91  * @param payload: datadescription of the payload
92  *
93  * Registers a message to the GRAS mecanism. Use this version instead of 
94  * gras_msgtype_declare when you change the semantic or syntax of a message and
95  * want your programs to be able to deal with both versions. Internally, each
96  * will be handled as an independent message type, so you can register 
97  * differents for each of them.
98  */
99 void
100 gras_msgtype_declare_v(const char           *name,
101                        short int             version,
102                        gras_datadesc_type_t  payload) {
103  
104   xbt_error_t   errcode;
105   gras_msgtype_t msgtype;
106   char *namev=make_namev(name,version);
107   
108   errcode = xbt_set_get_by_name(_gras_msgtype_set,
109                                  namev,(xbt_set_elm_t*)&msgtype);
110
111   if (errcode == no_error) {
112     VERB2("Re-register version %d of message '%s' (same payload, ignored).",
113           version, name);
114     xbt_assert3(!gras_datadesc_type_cmp(msgtype->ctn_type, payload),
115                  "Message %s re-registred with another payload (%s was %s)",
116                  namev,gras_datadesc_get_name(payload),
117                  gras_datadesc_get_name(msgtype->ctn_type));
118
119     return ; /* do really ignore it */
120
121   }
122   xbt_assert_error(mismatch_error); /* expect this error */
123   VERB3("Register version %d of message '%s' (payload: %s).", 
124         version, name, gras_datadesc_get_name(payload));    
125
126   msgtype = xbt_new(s_gras_msgtype_t,1);
127   msgtype->name = (namev == name ? strdup(name) : namev);
128   msgtype->name_len = strlen(namev);
129   msgtype->version = version;
130   msgtype->ctn_type = payload;
131
132   xbt_set_add(_gras_msgtype_set, (xbt_set_elm_t)msgtype,
133                &gras_msgtype_free);
134 }
135
136 /*
137  * Retrieve a msgtype description from its name
138  */
139 gras_msgtype_t gras_msgtype_by_name (const char *name) {
140   return gras_msgtype_by_namev(name,0);
141 }
142 /*
143  * Retrieve a msgtype description from its name and version
144  */
145 gras_msgtype_t gras_msgtype_by_namev(const char      *name,
146                                      short int        version) {
147   gras_msgtype_t res;
148
149   xbt_error_t errcode;
150   char *namev = make_namev(name,version); 
151
152   errcode = xbt_set_get_by_name(_gras_msgtype_set, namev,
153                                  (xbt_set_elm_t*)&res);
154   if (errcode != no_error)
155     res = NULL;
156   if (!res) 
157      WARN1("msgtype_by_name(%s) returns NULL",namev);
158   if (name != namev) 
159     xbt_free(namev);
160   
161   return res;
162 }
163
164 /*
165  * Send the given message on the given socket 
166  */
167 xbt_error_t
168 gras_msg_send(gras_socket_t   sock,
169               gras_msgtype_t  msgtype,
170               void           *payload) {
171
172   xbt_error_t errcode;
173   static gras_datadesc_type_t string_type=NULL;
174
175   if (!msgtype)
176     RAISE0(mismatch_error,
177            "Cannot send the NULL message (did msgtype_by_name fail?)");
178
179   if (!string_type) {
180     string_type = gras_datadesc_by_name("string");
181     xbt_assert(string_type);
182   }
183
184   DEBUG3("send '%s' to %s:%d", msgtype->name, 
185          gras_socket_peer_name(sock),gras_socket_peer_port(sock));
186   TRY(gras_trp_chunk_send(sock, GRAS_header, 6));
187
188   TRY(gras_datadesc_send(sock, string_type,   &msgtype->name));
189   TRY(gras_datadesc_send(sock, msgtype->ctn_type, payload));
190   TRY(gras_trp_flush(sock));
191
192   return no_error;
193 }
194 /*
195  * receive the next message on the given socket.  
196  */
197 xbt_error_t
198 gras_msg_recv(gras_socket_t    sock,
199               gras_msgtype_t  *msgtype,
200               void           **payload,
201               int             *payload_size) {
202
203   xbt_error_t errcode;
204   static gras_datadesc_type_t string_type=NULL;
205   char header[6];
206   int cpt;
207   int r_arch;
208   char *msg_name=NULL;
209
210   if (!string_type) {
211     string_type=gras_datadesc_by_name("string");
212     xbt_assert(string_type);
213   }
214   
215   TRY(gras_trp_chunk_recv(sock, header, 6));
216   for (cpt=0; cpt<4; cpt++)
217     if (header[cpt] != GRAS_header[cpt])
218       RAISE0(mismatch_error,"Incoming bytes do not look like a GRAS message");
219   if (header[4] != GRAS_header[4]) 
220     RAISE2(mismatch_error,"GRAS protocol mismatch (got %d, use %d)",
221            (int)header[4], (int)GRAS_header[4]);
222   r_arch = (int)header[5];
223   DEBUG2("Handle an incoming message using protocol %d (remote is %s)",
224          (int)header[4],gras_datadesc_arch_name(r_arch));
225
226   TRY(gras_datadesc_recv(sock, string_type, r_arch, &msg_name));
227   errcode = xbt_set_get_by_name(_gras_msgtype_set,
228                                  msg_name,(xbt_set_elm_t*)msgtype);
229   if (errcode != no_error)
230     RAISE2(errcode,
231            "Got error %s while retrieving the type associated to messages '%s'",
232            xbt_error_name(errcode),msg_name);
233   /* FIXME: Survive unknown messages */
234   xbt_free(msg_name);
235
236   *payload_size=gras_datadesc_size((*msgtype)->ctn_type);
237   xbt_assert2(*payload_size > 0,
238                "%s %s",
239                "Dynamic array as payload is forbided for now (FIXME?).",
240                "Reference to dynamic array is allowed.");
241   *payload = xbt_malloc(*payload_size);
242   TRY(gras_datadesc_recv(sock, (*msgtype)->ctn_type, r_arch, *payload));
243
244   return no_error;
245 }
246
247 /**
248  * @param timeout: How long should we wait for this message.
249  * @param msgt_want: type of awaited msg
250  * @param[out] expeditor: where to create a socket to answer the incomming message
251  * @param[out] payload: where to write the payload of the incomming message
252  * @return the error code (or no_error).
253  *
254  * Every message of another type received before the one waited will be queued
255  * and used by subsequent call to this function or MsgHandle().
256  */
257 xbt_error_t
258 gras_msg_wait(double           timeout,    
259               gras_msgtype_t   msgt_want,
260               gras_socket_t   *expeditor,
261               void            *payload) {
262
263   gras_msgtype_t msgt_got;
264   void *payload_got;
265   int payload_size_got;
266   xbt_error_t errcode;
267   double start, now;
268   gras_procdata_t *pd=gras_procdata_get();
269   int cpt;
270   gras_msg_t msg;
271   
272   *expeditor = NULL;
273   payload_got = NULL;
274
275   if (!msgt_want)
276     RAISE0(mismatch_error,
277            "Cannot wait for the NULL message (did msgtype_by_name fail?)");
278
279   VERB1("Waiting for message %s",msgt_want->name);
280
281   start = now = gras_os_time();
282
283   xbt_dynar_foreach(pd->msg_queue,cpt,msg){
284     if (msg.type->code == msgt_want->code) {
285       *expeditor = msg.expeditor;
286       memcpy(payload, msg.payload, msg.payload_size);
287       xbt_free(msg.payload);
288       xbt_dynar_cursor_rm(pd->msg_queue, &cpt);
289       VERB0("The waited message was queued");
290       return no_error;
291     }
292   }
293
294   while (1) {
295     TRY(gras_trp_select(timeout - now + start, expeditor));
296     TRY(gras_msg_recv(*expeditor, &msgt_got, &payload_got, &payload_size_got));
297     if (msgt_got->code == msgt_want->code) {
298       memcpy(payload, payload_got, payload_size_got);
299       xbt_free(payload_got);
300       VERB0("Got waited message");
301       return no_error;
302     }
303
304     /* not expected msg type. Queue it for later */
305     msg.expeditor = *expeditor;
306     msg.type      =  msgt_got;
307     msg.payload   =  payload;
308     msg.payload_size = payload_size_got;
309     xbt_dynar_push(pd->msg_queue,&msg);
310     
311     now=gras_os_time();
312     if (now - start + 0.001 < timeout) {
313       RAISE1(timeout_error,"Timeout while waiting for msg %s",msgt_want->name);
314     }
315   }
316
317   RAISE_IMPOSSIBLE;
318 }
319
320 /**
321  * @param timeOut: How long to wait for incoming messages
322  * @return the error code (or no_error).
323  *
324  * Waits up to \a timeOut seconds to see if a message comes in; if so, calls the
325  * registered listener for that message (see \ref gras_cb_register()).
326  */
327 xbt_error_t 
328 gras_msg_handle(double timeOut) {
329   
330   xbt_error_t    errcode;
331   int             cpt;
332
333   gras_msg_t      msg;
334   gras_socket_t   expeditor;
335   void           *payload=NULL;
336   int             payload_size;
337   gras_msgtype_t  msgtype;
338
339   gras_procdata_t*pd=gras_procdata_get();
340   gras_cblist_t  *list;
341   gras_cb_t       cb;
342
343
344
345   VERB1("Handling message within the next %.2fs",timeOut);
346   
347   /* get a message (from the queue or from the net) */
348   if (xbt_dynar_length(pd->msg_queue)) {
349     xbt_dynar_shift(pd->msg_queue,&msg);
350     expeditor = msg.expeditor;
351     msgtype   = msg.type;
352     payload   = msg.payload;
353     
354   } else {
355     TRY(gras_trp_select(timeOut, &expeditor));
356     TRY(gras_msg_recv(expeditor, &msgtype, &payload, &payload_size));
357   }
358       
359   /* handle it */
360   xbt_dynar_foreach(pd->cbl_list,cpt,list) {
361     if (list->id == msgtype->code) {
362       break;
363     } else {
364       list=NULL;
365     }
366   }
367   if (!list) {
368     INFO1("No callback for the incomming '%s' message. Discarded.", 
369           msgtype->name);
370     WARN0("FIXME: gras_datadesc_free not implemented => leaking the payload");
371     return no_error;
372   }
373   
374   xbt_dynar_foreach(list->cbs,cpt,cb) { 
375     INFO3("Use the callback #%d (@%p) for incomming msg %s",
376           cpt+1,cb,msgtype->name);
377     if ((*cb)(expeditor,payload)) {
378       /* cb handled the message */
379       xbt_free(payload);
380       return no_error;
381     }
382   }
383
384   INFO1("Message '%s' refused by all registered callbacks", msgtype->name);
385   WARN0("FIXME: gras_datadesc_free not implemented => leaking the payload");
386   return mismatch_error;
387 }
388
389 void
390 gras_cbl_free(void *data){
391   gras_cblist_t *list=*(void**)data;
392   if (list) {
393     xbt_dynar_free(&( list->cbs ));
394     xbt_free(list);
395   }
396 }
397
398 void
399 gras_cb_register(gras_msgtype_t msgtype,
400                  gras_cb_t cb) {
401   gras_procdata_t *pd=gras_procdata_get();
402   gras_cblist_t *list=NULL;
403   int cpt;
404
405   DEBUG2("Register %p as callback to %s",cb,msgtype->name);
406
407   /* search the list of cb for this message on this host (creating if NULL) */
408   xbt_dynar_foreach(pd->cbl_list,cpt,list) {
409     if (list->id == msgtype->code) {
410       break;
411     } else {
412       list=NULL;
413     }
414   }
415   if (!list) {
416     /* First cb? Create room */
417     list = xbt_new(gras_cblist_t,1);
418     list->id = msgtype->code;
419     list->cbs = xbt_dynar_new(sizeof(gras_cb_t), NULL);
420     xbt_dynar_push(pd->cbl_list,&list);
421   }
422
423   /* Insert the new one into the set */
424   xbt_dynar_insert_at(list->cbs,0,&cb);
425 }
426
427 void
428 gras_cb_unregister(gras_msgtype_t msgtype,
429                    gras_cb_t cb) {
430
431   gras_procdata_t *pd=gras_procdata_get();
432   gras_cblist_t *list;
433   gras_cb_t cb_cpt;
434   int cpt;
435   int found = 0;
436
437   /* search the list of cb for this message on this host */
438   xbt_dynar_foreach(pd->cbl_list,cpt,list) {
439     if (list->id == msgtype->code) {
440       break;
441     } else {
442       list=NULL;
443     }
444   }
445
446   /* Remove it from the set */
447   if (list) {
448     xbt_dynar_foreach(list->cbs,cpt,cb_cpt) {
449       if (cb == cb_cpt) {
450         xbt_dynar_cursor_rm(list->cbs, &cpt);
451         found = 1;
452       }
453     }
454   }
455   if (!found)
456     VERB1("Ignoring removal of unexisting callback to msg id %d",
457           msgtype->code);
458 }