3 /* messaging - Function related to messaging (code shared between RL and SG)*/
5 /* Authors: Martin Quinson */
6 /* Copyright (C) 2003 the OURAGAN project. */
8 /* This program is free software; you can redistribute it and/or modify it
9 under the terms of the license (GNU LGPL) which comes with this package. */
11 #include "gras/Msg/msg_private.h"
12 #include "gras/DataDesc/datadesc_interface.h"
13 #include "gras/Transport/transport_interface.h" /* gras_trp_chunk_send/recv */
14 #include "gras/Virtu/virtu_interface.h"
/* Logging category used by the VERB/DEBUG/INFO/WARN macros of this file
 * (presumably declares "msg" as a sub-category of "gras" -- confirm against
 * the xbt logging macros). */
16 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(msg,gras,"High level messaging");
/* Set of all registered message types, looked up by (versioned) name.
 * NULL until gras_msg_init() creates it. */
18 xbt_set_t _gras_msgtype_set = NULL;
/* 6-byte prefix sent ahead of every message and checked on reception:
 * bytes 0-3 "GRAS", byte 4 protocol version, byte 5 sender architecture.
 * Filled in by gras_msg_init(). */
19 static char GRAS_header[6];
/* Forward declaration: builds the name under which a given version of a
 * message is stored in the set above. */
20 static char *make_namev(const char *name, short int ver);
25 * Initialize this submodule.
/*
 * gras_msg_init: set up the messaging submodule.
 *
 * Creates the set of registered message types and fills GRAS_header with the
 * six bytes that gras_msg_send() emits in front of every message.
 *
 * NOTE(review): this listing has gaps; the statement controlled by the guard
 * below and the closing brace are not visible. Presumably the guard returns
 * early when the set already exists -- confirm.
 */
27 void gras_msg_init(void) {
30 /* only initialize once */
31 if (_gras_msgtype_set != NULL)
34 VERB0("Initializing Msg");
36 _gras_msgtype_set = xbt_set_new();
/* bytes 0-3: the magic string, copied without its trailing NUL */
38 memcpy(GRAS_header,"GRAS", 4);
/* byte 4: protocol version, compared by the receiver against its own */
39 GRAS_header[4]=GRAS_PROTOCOL_VERSION;
/* byte 5: code of the local architecture, used by the receiver to decode */
40 GRAS_header[5]=(char)GRAS_THISARCH;
46 * Finalize the msg module
51 xbt_set_free(&_gras_msgtype_set);
/*
 * gras_msgtype_free: release the resources of a message type.
 * @t: the message type, received as an untyped pointer
 *
 * Frees the name string held by the message type.
 *
 * NOTE(review): lines are missing from this listing around the statement
 * below, so a possible NULL check and the release of the structure itself
 * cannot be seen here -- confirm against the full file.
 */
59 void gras_msgtype_free(void *t) {
60 gras_msgtype_t msgtype=(gras_msgtype_t)t;
62 xbt_free(msgtype->name);
70 * Returns the versioned name of the message. If the version is 0, that's
71 * the name unchanged. Pay attention to this before freeing the result.
/*
 * make_namev: build the versioned name of a message.
 * @name: base name of the message (must not be NULL)
 * @ver: version of the message
 *
 * Returns @name itself (not a copy) when @ver is 0, so callers must compare
 * the result against @name before freeing it. For any other version, returns
 * a newly allocated string of the form "<name>_v<ver>" owned by the caller.
 */
static char *make_namev(const char *name, short int ver) {
  char *namev;

  /* version 0 keeps the plain name; nothing is allocated */
  if (!ver)
    return (char *)name;

  /* Room for the name, "_v", the decimal rendering of any short int (at most
   * 3 digits per 8-bit byte, plus one character for the sign) and the
   * trailing NUL. A fixed allowance of 3 characters for the version is too
   * small once the version needs 4 characters or more. */
  namev = (char*)xbt_malloc(strlen(name) + 2 + (3 * sizeof(ver) + 1) + 1);
  if (namev)
    sprintf(namev, "%s_v%d", name, ver);

  return namev;
}
88 * gras_msgtype_declare:
89 * @name: name as it should be used for logging messages (must be unique)
90 * @payload: datadescription of the payload
92 * Registers a message to the GRAS mechanism.
/*
 * gras_msgtype_declare: register a message type without versioning.
 * @name: name of the message
 * @payload: description of the data carried by the message
 *
 * Shorthand for gras_msgtype_declare_v() with version 0.
 * (The closing brace falls in a gap of this listing.)
 */
94 void gras_msgtype_declare(const char *name,
95 gras_datadesc_type_t payload) {
/* version 0: the message is stored under its plain name */
96 gras_msgtype_declare_v(name, 0, payload);
100 * gras_msgtype_declare_v:
101 * @name: name as it should be used for logging messages (must be unique)
102 * @version: version number of the message
103 * @payload: datadescription of the payload
105 * Registers a message to the GRAS mechanism. Use this version instead of
106 * gras_msgtype_declare when you change the semantics or syntax of a message and
107 * want your programs to be able to deal with both versions. Internally, each
108 * will be handled as an independent message type, so you can register
109 * different callbacks for each of them.
/*
 * gras_msgtype_declare_v: register one version of a message type.
 * @name: base name of the message
 * @payload: description of the data carried by the message
 *
 * The message type is stored in _gras_msgtype_set under the name built by
 * make_namev(name, version). If that name is already registered, the call
 * only asserts that the payload description is the same and returns without
 * adding anything.
 *
 * NOTE(review): this listing has gaps; the return type, the `version`
 * parameter, the declaration of `errcode` and several closing braces are not
 * visible.
 */
112 gras_msgtype_declare_v(const char *name,
114 gras_datadesc_type_t payload) {
117 gras_msgtype_t msgtype;
/* key used in the set; equal to `name` itself when version is 0 */
118 char *namev=make_namev(name,version);
/* is this (name, version) pair already known? */
120 errcode = xbt_set_get_by_name(_gras_msgtype_set,
121 namev,(xbt_set_elm_t*)&msgtype);
123 if (errcode == no_error) {
124 VERB2("Re-register version %d of message '%s' (same payload, ignored).",
/* a second registration is only tolerated with an identical payload */
126 xbt_assert3(!gras_datadesc_type_cmp(msgtype->ctn_type, payload),
127 "Message %s re-registred with another payload (%s was %s)",
128 namev,gras_datadesc_get_name(payload),
129 gras_datadesc_get_name(msgtype->ctn_type));
/* NOTE(review): for a non-zero version namev is a fresh allocation; no
 * release of it is visible before this return (one line is missing from the
 * listing just above) -- confirm it is freed on this path. */
131 return ; /* do really ignore it */
/* any lookup failure other than mismatch_error is unexpected (presumably
 * mismatch_error is what the set reports for an unknown name -- confirm) */
134 xbt_assert_error(mismatch_error); /* expect this error */
135 VERB3("Register version %d of message '%s' (payload: %s).",
136 version, name, gras_datadesc_get_name(payload));
138 msgtype = xbt_new(s_gras_msgtype_t,1);
/* the message type must own its name: copy it when make_namev() handed back
 * the caller's string, otherwise keep the string make_namev() allocated */
139 msgtype->name = (namev == name ? strdup(name) : namev);
140 msgtype->name_len = strlen(namev);
141 msgtype->version = version;
142 msgtype->ctn_type = payload;
/* the remaining argument(s) of this call fall in a gap of the listing */
144 xbt_set_add(_gras_msgtype_set, (xbt_set_elm_t)msgtype,
149 * gras_msgtype_by_name:
151 * Retrieve a message type from its name
/*
 * gras_msgtype_by_name: look up an unversioned message type.
 * @name: name of the message
 *
 * Shorthand for gras_msgtype_by_namev() with version 0; returns whatever
 * that function returns. (The closing brace falls in a gap of this listing.)
 */
153 gras_msgtype_t gras_msgtype_by_name (const char *name) {
154 return gras_msgtype_by_namev(name,0);
157 * gras_msgtype_by_namev:
159 * Retrieve a message type from its name and version
/*
 * gras_msgtype_by_namev: look up a message type by name and version.
 * @name: base name of the message
 *
 * Searches _gras_msgtype_set for the name built by make_namev(name, version).
 * A warning mentioning a NULL result is logged below, so callers should
 * presumably expect NULL for an unknown message -- confirm.
 *
 * NOTE(review): this listing has gaps; the `version` parameter, the
 * declarations of `res` and `errcode`, the statement controlled by the error
 * test, the condition guarding the warning, any release of `namev` and the
 * return statement are not visible.
 */
161 gras_msgtype_t gras_msgtype_by_namev(const char *name,
/* key used in the set; equal to `name` itself when version is 0 */
166 char *namev = make_namev(name,version);
168 errcode = xbt_set_get_by_name(_gras_msgtype_set, namev,
169 (xbt_set_elm_t*)&res);
170 if (errcode != no_error)
173 WARN1("msgtype_by_name(%s) returns NULL",namev);
183 * Send the given message on the given socket
/*
 * gras_msg_send: send one message on a socket.
 * @sock: socket to write to
 * @msgtype: type of the message; a NULL type is reported as mismatch_error
 *
 * On the wire a message is, in order: the 6-byte GRAS_header, the message
 * name encoded with the "string" data description, and the payload encoded
 * with the data description of the message type. The socket is flushed at
 * the end. Each step is wrapped in TRY (presumably returning the error code
 * to the caller on failure -- confirm against the macro).
 *
 * NOTE(review): this listing has gaps; the return type, the `payload`
 * parameter, the conditions guarding the RAISE0 and the string_type lookup,
 * and the final return are not visible.
 */
186 gras_msg_send(gras_socket_t sock,
187 gras_msgtype_t msgtype,
/* description of the "string" type, kept across calls once looked up */
191 static gras_datadesc_type_t string_type=NULL;
194 RAISE0(mismatch_error,
195 "Cannot send the NULL message (did msgtype_by_name fail?)");
198 string_type = gras_datadesc_by_name("string");
199 xbt_assert(string_type);
202 DEBUG3("send '%s' to %s:%d", msgtype->name,
203 gras_socket_peer_name(sock),gras_socket_peer_port(sock));
/* 1. fixed header: magic, protocol version, local architecture */
204 TRY(gras_trp_chunk_send(sock, GRAS_header, 6));
/* 2. name of the message, so that the receiver can find its type */
206 TRY(gras_datadesc_send(sock, string_type, &msgtype->name));
/* 3. the payload itself */
207 TRY(gras_datadesc_send(sock, msgtype->ctn_type, payload));
208 TRY(gras_trp_flush(sock));
215 * receive the next message on the given socket.
/*
 * gras_msg_recv: read one message from a socket.
 * @sock: socket to read from
 * @msgtype: output; receives the registered type whose name was read
 *
 * Counterpart of gras_msg_send(): reads the 6-byte header, checks the "GRAS"
 * magic and the protocol version, takes the sender architecture from byte 5,
 * reads the message name, looks the type up in _gras_msgtype_set, then
 * allocates *payload and decodes the payload into it. The payload buffer is
 * allocated here; the visible caller gras_msg_wait() frees it after use.
 *
 * NOTE(review): this listing has gaps; the return type, the `payload` and
 * `payload_size` parameters, the declarations of header, cpt, r_arch,
 * msg_name and errcode, the macro that opens the error report after the
 * lookup, any release of msg_name and the final return are not visible.
 */
218 gras_msg_recv(gras_socket_t sock,
219 gras_msgtype_t *msgtype,
/* description of the "string" type, kept across calls once looked up */
224 static gras_datadesc_type_t string_type=NULL;
231 string_type=gras_datadesc_by_name("string");
232 xbt_assert(string_type);
235 TRY(gras_trp_chunk_recv(sock, header, 6));
/* bytes 0-3 must be the magic string */
236 for (cpt=0; cpt<4; cpt++)
237 if (header[cpt] != GRAS_header[cpt])
238 RAISE0(mismatch_error,"Incoming bytes do not look like a GRAS message");
/* byte 4 must be the protocol version this side speaks */
239 if (header[4] != GRAS_header[4])
240 RAISE2(mismatch_error,"GRAS protocol mismatch (got %d, use %d)",
241 (int)header[4], (int)GRAS_header[4]);
/* byte 5: architecture of the sender, needed to decode what follows */
242 r_arch = (int)header[5];
243 DEBUG2("Handle an incoming message using protocol %d (remote is %s)",
244 (int)header[4],gras_datadesc_arch_name(r_arch));
/* name of the message, then the matching registered type */
246 TRY(gras_datadesc_recv(sock, string_type, r_arch, &msg_name));
247 errcode = xbt_set_get_by_name(_gras_msgtype_set,
248 msg_name,(xbt_set_elm_t*)msgtype);
249 if (errcode != no_error)
251 "Got error %s while retrieving the type associated to messages '%s'",
252 xbt_error_name(errcode),msg_name);
253 /* FIXME: Survive unknown messages */
/* the payload buffer is sized from the data description of the type */
256 *payload_size=gras_datadesc_size((*msgtype)->ctn_type);
257 xbt_assert2(*payload_size > 0,
259 "Dynamic array as payload is forbided for now (FIXME?).",
260 "Reference to dynamic array is allowed.");
261 *payload = xbt_malloc(*payload_size);
/* NOTE(review): if this last step fails, no release of *payload is visible */
262 TRY(gras_datadesc_recv(sock, (*msgtype)->ctn_type, r_arch, *payload));
269 * @timeout: How long should we wait for this message.
270 * @id: id of awaited msg
271 * @Returns: the error code (or no_error).
273 * Waits for a message to come in over a given socket.
275 * Every message of another type received before the awaited one is queued
276 * and used by subsequent calls to this function or MsgHandle().
/*
 * gras_msg_wait: wait for a message of a given type.
 * @timeout: how long to wait
 * @msgt_want: type of the awaited message; NULL is reported as mismatch_error
 * @expeditor: output; receives the socket the message came from
 *
 * First searches the per-process queue of already received messages for one
 * whose type code matches. Otherwise reads messages from the network: a
 * matching one is copied into the caller's `payload` buffer, any other one is
 * pushed on the queue for later use.
 *
 * NOTE(review): this listing has gaps; the return type, the `payload`
 * parameter, the declarations of payload_got, errcode, start, now, cpt and
 * msg, the loop around the network part, the return statements and the
 * closing braces are not visible.
 */
279 gras_msg_wait(double timeout,
280 gras_msgtype_t msgt_want,
281 gras_socket_t *expeditor,
284 gras_msgtype_t msgt_got;
286 int payload_size_got;
/* per-process data: holds the queue of messages received but not yet used */
289 gras_procdata_t *pd=gras_procdata_get();
297 RAISE0(mismatch_error,
298 "Cannot wait for the NULL message (did msgtype_by_name fail?)");
300 VERB1("Waiting for message %s",msgt_want->name);
302 start = now = gras_os_time();
/* 1. was the awaited message already received and queued? */
304 xbt_dynar_foreach(pd->msg_queue,cpt,msg){
305 if (msg.type->code == msgt_want->code) {
306 *expeditor = msg.expeditor;
/* hand the data over to the caller, then drop the queued copy */
307 memcpy(payload, msg.payload, msg.payload_size);
308 xbt_free(msg.payload);
309 xbt_dynar_cursor_rm(pd->msg_queue, &cpt);
310 VERB0("The waited message was queued");
/* 2. read from the network; the select is given the time still left */
316 TRY(gras_trp_select(timeout - now + start, expeditor));
317 TRY(gras_msg_recv(*expeditor, &msgt_got, &payload_got, &payload_size_got));
318 if (msgt_got->code == msgt_want->code) {
319 memcpy(payload, payload_got, payload_size_got);
320 xbt_free(payload_got);
321 VERB0("Got waited message");
325 /* not expected msg type. Queue it for later */
326 msg.expeditor = *expeditor;
/* NOTE(review): this stores the caller's output buffer `payload`, not the
 * freshly received `payload_got`. When the queued entry is consumed above,
 * msg.payload is copied from and then freed, so the received data would be
 * lost and the caller's buffer freed. Looks like it should be payload_got --
 * confirm. (The assignment of msg.type falls in a gap just above.) */
328 msg.payload = payload;
329 msg.payload_size = payload_size_got;
330 xbt_dynar_push(pd->msg_queue,&msg);
/* NOTE(review): as written the timeout is raised while the elapsed time is
 * still BELOW the timeout, which looks inverted -- confirm. The statement
 * refreshing `now` is not visible in this listing. */
333 if (now - start + 0.001 < timeout) {
334 RAISE1(timeout_error,"Timeout while waiting for msg %s",msgt_want->name);
343 * @timeOut: How long to wait for incoming messages
344 * @Returns: the error code (or no_error).
346 * Waits up to #timeOut# seconds to see if a message comes in; if so, calls the
347 * registered listener for that message (see RegisterCallback()).
/*
 * gras_msg_handle: get one message and pass it to the registered callbacks.
 * @timeOut: how long to wait for a message on the network
 *
 * The message is taken from the per-process queue when it is not empty,
 * otherwise read from the network. The callback list whose id equals the
 * code of the message type is searched, and its callbacks are called in turn
 * with the sender socket and the payload; a callback returning non-zero has
 * handled the message. When every callback refuses it, mismatch_error is
 * returned. The log messages below state that the payload is leaked when the
 * message is discarded or refused.
 *
 * NOTE(review): this listing has gaps; the return type, the declarations of
 * errcode, cpt, msg, payload, payload_size, list and cb, the else branch, the
 * assignment of msgtype on the queue path, the exits of the search loop and
 * the successful returns are not visible.
 */
350 gras_msg_handle(double timeOut) {
356 gras_socket_t expeditor;
359 gras_msgtype_t msgtype;
/* per-process data: message queue and callback lists */
361 gras_procdata_t*pd=gras_procdata_get();
367 VERB1("Handling message within the next %.2fs",timeOut);
369 /* get a message (from the queue or from the net) */
370 if (xbt_dynar_length(pd->msg_queue)) {
/* oldest queued message first */
371 xbt_dynar_shift(pd->msg_queue,&msg);
372 expeditor = msg.expeditor;
374 payload = msg.payload;
377 TRY(gras_trp_select(timeOut, &expeditor));
378 TRY(gras_msg_recv(expeditor, &msgtype, &payload, &payload_size));
/* find the callback list registered for this message type */
382 xbt_dynar_foreach(pd->cbl_list,cpt,list) {
383 if (list->id == msgtype->code) {
390 INFO1("No callback for the incomming '%s' message. Discarded.",
392 WARN0("FIXME: gras_datadesc_free not implemented => leaking the payload");
/* offer the message to each callback until one accepts it */
396 xbt_dynar_foreach(list->cbs,cpt,cb) {
397 INFO3("Use the callback #%d (@%p) for incomming msg %s",
398 cpt+1,cb,msgtype->name);
399 if ((*cb)(expeditor,payload)) {
400 /* cb handled the message */
406 INFO1("Message '%s' refused by all registered callbacks", msgtype->name);
407 WARN0("FIXME: gras_datadesc_free not implemented => leaking the payload");
408 return mismatch_error;
/*
 * gras_cbl_free: release a callback list.
 * @data: address of a slot holding the pointer to the callback list
 *
 * Frees the dynamic array of callbacks held by the list.
 *
 * NOTE(review): this listing has gaps; the return type, any release of the
 * list structure itself and the closing brace are not visible.
 */
412 gras_cbl_free(void *data){
/* the argument points at the stored pointer, not at the list itself */
413 gras_cblist_t *list=*(void**)data;
415 xbt_dynar_free(&( list->cbs ));
/*
 * gras_cb_register: attach a callback to a message type.
 * @msgtype: message type the callback is for
 *
 * Searches the per-process callback lists for the one whose id equals the
 * code of the message type, creating and storing a new list when needed, and
 * inserts the callback at position 0 of that list.
 *
 * NOTE(review): this listing has gaps; the return type, the `cb` parameter,
 * the declaration of cpt, the exit of the search loop, the test deciding
 * whether a list must be created and the closing braces are not visible.
 */
421 gras_cb_register(gras_msgtype_t msgtype,
423 gras_procdata_t *pd=gras_procdata_get();
424 gras_cblist_t *list=NULL;
427 DEBUG2("Register %p as callback to %s",cb,msgtype->name);
429 /* search the list of cb for this message on this host (creating if NULL) */
430 xbt_dynar_foreach(pd->cbl_list,cpt,list) {
431 if (list->id == msgtype->code) {
438 /* First cb? Create room */
439 list = xbt_new(gras_cblist_t,1);
440 list->id = msgtype->code;
441 list->cbs = xbt_dynar_new(sizeof(gras_cb_t), NULL);
/* the dynar stores the pointer to the list, hence the address-of */
442 xbt_dynar_push(pd->cbl_list,&list);
445 /* Insert the new one into the set */
/* index 0: the newest callback is placed ahead of the older ones */
446 xbt_dynar_insert_at(list->cbs,0,&cb);
450 gras_cb_unregister(gras_msgtype_t msgtype,
453 gras_procdata_t *pd=gras_procdata_get();
459 /* search the list of cb for this message on this host */
460 xbt_dynar_foreach(pd->cbl_list,cpt,list) {
461 if (list->id == msgtype->code) {
468 /* Remove it from the set */
470 xbt_dynar_foreach(list->cbs,cpt,cb_cpt) {
472 xbt_dynar_cursor_rm(list->cbs, &cpt);
478 VERB1("Ignoring removal of unexisting callback to msg id %d",