xbt_dynar_free(&( res->msg_queue ));
xbt_dynar_free(&( res->cbl_list ));
xbt_dynar_free(&( res->timers ));
+
+ free(res);
}
/*
DEBUG3("send '%s' to %s:%d", msgtype->name,
gras_socket_peer_name(sock),gras_socket_peer_port(sock));
- TRY(gras_trp_chunk_send(sock, GRAS_header, 6));
+ TRYOLD(gras_trp_chunk_send(sock, GRAS_header, 6));
- TRY(gras_datadesc_send(sock, string_type, &msgtype->name));
- TRY(gras_datadesc_send(sock, msgtype->ctn_type, payload));
- TRY(gras_trp_flush(sock));
+ TRYOLD(gras_datadesc_send(sock, string_type, &msgtype->name));
+ if (msgtype->ctn_type)
+ TRYOLD(gras_datadesc_send(sock, msgtype->ctn_type, payload));
+ TRYOLD(gras_trp_flush(sock));
return no_error;
}
int r_arch;
char *msg_name=NULL;
+ xbt_assert1(!gras_socket_is_meas(sock),
+ "Asked to receive a message on the measurement socket %p", sock);
if (!string_type) {
string_type=gras_datadesc_by_name("string");
xbt_assert(string_type);
}
- TRY(gras_trp_chunk_recv(sock, header, 6));
+ errcode=gras_trp_chunk_recv(sock, header, 6);
+ if (errcode!=no_error)
+ RAISE2(errcode,"Got '%s' while trying to get the mesage header on socket %p",
+ xbt_error_name(errcode),sock);
for (cpt=0; cpt<4; cpt++)
if (header[cpt] != GRAS_header[cpt])
RAISE2(mismatch_error,"Incoming bytes do not look like a GRAS message (header='%.4s' not '%.4s')",header,GRAS_header);
DEBUG2("Handle an incoming message using protocol %d (remote is %s)",
(int)header[4],gras_datadesc_arch_name(r_arch));
- TRY(gras_datadesc_recv(sock, string_type, r_arch, &msg_name));
+ TRYOLD(gras_datadesc_recv(sock, string_type, r_arch, &msg_name));
errcode = xbt_set_get_by_name(_gras_msgtype_set,
msg_name,(xbt_set_elm_t*)msgtype);
if (errcode != no_error)
/* FIXME: Survive unknown messages */
free(msg_name);
- *payload_size=gras_datadesc_size((*msgtype)->ctn_type);
- xbt_assert2(*payload_size > 0,
- "%s %s",
- "Dynamic array as payload is forbided for now (FIXME?).",
- "Reference to dynamic array is allowed.");
- *payload = xbt_malloc(*payload_size);
- TRY(gras_datadesc_recv(sock, (*msgtype)->ctn_type, r_arch, *payload));
-
+ if ((*msgtype)->ctn_type) {
+ *payload_size=gras_datadesc_size((*msgtype)->ctn_type);
+ xbt_assert2(*payload_size > 0,
+ "%s %s",
+ "Dynamic array as payload is forbided for now (FIXME?).",
+ "Reference to dynamic array is allowed.");
+ *payload = xbt_malloc(*payload_size);
+ TRYOLD(gras_datadesc_recv(sock, (*msgtype)->ctn_type, r_arch, *payload));
+ } else {
+ *payload = NULL;
+ *payload_size = 0;
+ }
return no_error;
}
gras_msg_procdata_t pd=(gras_msg_procdata_t)gras_libdata_get("gras_msg");
int cpt;
s_gras_msg_t msg;
+ gras_socket_t expeditor_res = NULL;
- *expeditor = NULL;
payload_got = NULL;
if (!msgt_want)
RAISE0(mismatch_error,
"Cannot wait for the NULL message (did msgtype_by_name fail?)");
- VERB1("Waiting for message %s",msgt_want->name);
+ VERB1("Waiting for message '%s'",msgt_want->name);
start = now = gras_os_time();
xbt_dynar_foreach(pd->msg_queue,cpt,msg){
if (msg.type->code == msgt_want->code) {
- *expeditor = msg.expeditor;
+ if (expeditor)
+ *expeditor = msg.expeditor;
memcpy(payload, msg.payload, msg.payload_size);
free(msg.payload);
xbt_dynar_cursor_rm(pd->msg_queue, &cpt);
}
while (1) {
- TRY(gras_trp_select(timeout - now + start, expeditor));
- TRY(gras_msg_recv(*expeditor, &msgt_got, &payload_got, &payload_size_got));
+ TRYOLD(gras_trp_select(timeout - now + start, &expeditor_res));
+ TRYOLD(gras_msg_recv(expeditor_res, &msgt_got, &payload_got, &payload_size_got));
if (msgt_got->code == msgt_want->code) {
+ if (expeditor)
+ *expeditor=expeditor_res;
memcpy(payload, payload_got, payload_size_got);
free(payload_got);
VERB0("Got waited message");
}
/* not expected msg type. Queue it for later */
- msg.expeditor = *expeditor;
- msg.type = msgt_got;
- msg.payload = payload;
+ msg.expeditor = expeditor_res;
+ msg.type = msgt_got;
+ msg.payload = payload;
msg.payload_size = payload_size_got;
xbt_dynar_push(pd->msg_queue,&msg);
errcode = gras_trp_select(timeOut, &expeditor);
if (errcode != no_error && errcode != timeout_error)
return errcode;
- if (errcode != timeout_error)
- TRY(gras_msg_recv(expeditor, &msgtype, &payload, &payload_size));
+ if (errcode == no_error) {
+ errcode = gras_msg_recv(expeditor, &msgtype, &payload, &payload_size);
+ if (errcode != no_error)
+ RAISE2(errcode, "Error '%s' while receiving a message on select()ed socket %p",
+ xbt_error_name(errcode),expeditor);
+ }
}
if (errcode == timeout_error ) {
return no_error;
} else {
xbt_assert1(untiltimer>0, "Negative timer (%f). I'm puzzeled", untiltimer);
- ERROR1("No timer elapsed, in contrary to expectations (next in %f sec)",
+ WARN1("No timer elapsed, in contrary to expectations (next in %f sec)",
untiltimer);
return timeout_error;
}
gras_cblist_t *list=NULL;
int cpt;
- DEBUG2("Register %p as callback to %s",cb,msgtype->name);
+ DEBUG2("Register %p as callback to '%s'",cb,msgtype->name);
/* search the list of cb for this message on this host (creating if NULL) */
xbt_dynar_foreach(pd->cbl_list,cpt,list) {