return res;
}
+/* Mallocator plumbing: gras_msg_ctx_mallocator is the pool the RPC code draws its callback contexts from (xbt_mallocator_get) and gives them back to (xbt_mallocator_release) */
+xbt_mallocator_t gras_msg_ctx_mallocator = NULL; /* starts out NULL; NOTE(review): it is not assigned in the visible code -- presumably created during module init, confirm */
+void* gras_msg_ctx_mallocator_new_f(void) { /* returns one freshly obtained s_gras_msg_cb_ctx_t; presumably the "new" callback of gras_msg_ctx_mallocator -- confirm at the registration site */
+ return xbt_new0(s_gras_msg_cb_ctx_t,1); /* one element; going by its name xbt_new0 presumably zero-fills -- confirm */
+}
+void gras_msg_ctx_mallocator_free_f(void* ctx) { /* hands ctx to xbt_free; presumably the "free" callback of gras_msg_ctx_mallocator -- confirm at the registration site */
+ xbt_free(ctx);
+}
+/* Clear every byte of the s_gras_msg_cb_ctx_t pointed to by ctx so that no field keeps a value from an earlier use. ctx must point to a full s_gras_msg_cb_ctx_t. */
+void gras_msg_ctx_mallocator_reset_f(void* ctx) {
+ /* memset's parameters are (destination, fill byte, byte count): the fill value 0 goes second and the size last; with the two swapped the call writes zero bytes and clears nothing */
+ memset(ctx, 0, sizeof(s_gras_msg_cb_ctx_t));
+}
+
/** @brief Launch a RPC call, but do not block for the answer */
gras_msg_cb_ctx_t
-gras_msg_rpc_async_call(gras_socket_t server,
- double timeOut,
- gras_msgtype_t msgtype,
- void *request) {
- gras_msg_cb_ctx_t ctx = xbt_new0(s_gras_msg_cb_ctx_t,1);
+gras_msg_rpc_async_call_(gras_socket_t server,
+ double timeOut,
+ gras_msgtype_t msgtype,
+ void *request) {
+ gras_msg_cb_ctx_t ctx = xbt_mallocator_get(gras_msg_ctx_mallocator);
if (msgtype->ctn_type) {
xbt_assert1(request,
return ctx;
}
-/** @brief Wait teh answer of a RPC call previously launched asynchronously */
+/** @brief Wait for the answer to an RPC call previously launched asynchronously */
void gras_msg_rpc_async_wait(gras_msg_cb_ctx_t ctx,
void *answer) {
xbt_ex_t e;
TRY {
/* The filter returns 1 when we eat an old RPC answer to something canceled */
do {
- gras_msg_wait_ext(ctx->timeout,
- ctx->msgtype, NULL, msgfilter_rpcID, &ctx->ID,
- &received);
+ gras_msg_wait_ext_(ctx->timeout,
+ ctx->msgtype, NULL, msgfilter_rpcID, &ctx->ID,
+ &received);
} while (received.ID != ctx->ID);
} CATCH(e) {
RETHROW;
}
- free(ctx);
+ xbt_mallocator_release(gras_msg_ctx_mallocator, ctx);
if (received.kind == e_gras_msg_kind_rpcerror) {
xbt_ex_t e;
memcpy(&e,received.payl,received.payl_size);
}
/** @brief Conduct a RPC call */
-void gras_msg_rpccall(gras_socket_t server,
- double timeout,
- gras_msgtype_t msgtype,
- void *request, void *answer) {
+void gras_msg_rpccall_(gras_socket_t server,
+ double timeout,
+ gras_msgtype_t msgtype,
+ void *request, void *answer) { /* two steps: start the call through the async entry point, then pass the context it returns, together with answer, to gras_msg_rpc_async_wait */
gras_msg_cb_ctx_t ctx;
- ctx= gras_msg_rpc_async_call(server, timeout,msgtype,request);
+ ctx = gras_msg_rpc_async_call_(server, timeout,msgtype,request); /* server, timeout, msgtype and request are forwarded unchanged; ctx is released inside gras_msg_rpc_async_wait (xbt_mallocator_release), so it is not freed here */
gras_msg_rpc_async_wait(ctx, answer);
}
*/
void gras_msg_rpcreturn(double timeOut,gras_msg_cb_ctx_t ctx,void *answer) {
+ xbt_assert0(ctx->answer_due,
+ "RPC return not allowed here. Either not a RPC message or already returned a result");
+ ctx->answer_due = 0;
DEBUG5("Return to RPC '%s' from %s:%d (tOut=%f, payl=%p)",
ctx->msgtype->name,
gras_socket_peer_name(ctx->expeditor),gras_socket_peer_port(ctx->expeditor),