Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Use xbt_ex_setup_backtrace to actually setup the backtrace to avoid dupplicating...
[simgrid.git] / src / gras / Msg / rpc.c
1 /* $Id$ */
2
3 /* rpc - RPC implementation on top of GRAS messages                         */
4
5 /* Copyright (c) 2005 Martin Quinson. All rights reserved.                  */
6
7 /* This program is free software; you can redistribute it and/or modify it
8  * under the terms of the license (GNU LGPL) which comes with this package. */
9
10 #include "gras/Msg/msg_private.h"
11                 
12 //XBT_LOG_NEW_DEFAULT_SUBCATEGORY(gras_rpc,gras_msg,"RPCing");
13
14 xbt_set_t _gras_rpctype_set = NULL;
15
16 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(gras_rpc,gras_msg,"RPC mecanism");
17
18
19 /** @brief declare a new versionned RPC type of the given name and payloads
20  *
21  * @param name: name as it should be used for logging messages (must be uniq)
22  * @param payload_request: datatype of request
23  * @param payload_answer: datatype of answer
24  *
25  * Registers a new RPC message to the GRAS mechanism. RPC are constituted of a pair 
26  * of messages. 
27  */
28 void
29 gras_msgtype_declare_rpc(const char           *name,
30                          gras_datadesc_type_t  payload_request,
31                          gras_datadesc_type_t  payload_answer) {
32
33   gras_msgtype_declare_ext(name, 0, 
34                            e_gras_msg_kind_rpccall, 
35                            payload_request, payload_answer);
36
37 }
38
39 /** @brief declare a new versionned RPC type of the given name and payloads
40  *
41  * @param name: name as it should be used for logging messages (must be uniq)
42  * @param version: something like versionning symbol
43  * @param payload_request: datatype of request
44  * @param payload_answer: datatype of answer
45  *
46  * Registers a new RPC message to the GRAS mechanism. RPC are constituted of a pair 
47  * of messages. 
48  *
49  * Use this version instead of gras_rpctype_declare when you change the
50  * semantic or syntax of a message and want your programs to be able to deal
51  * with both versions. Internally, each will be handled as an independent
52  * message type, so you can register differents for each of them.
53  */
54 void
55 gras_msgtype_declare_rpc_v(const char           *name,
56                            short int             version,
57                            gras_datadesc_type_t  payload_request,
58                            gras_datadesc_type_t  payload_answer) {
59
60   gras_msgtype_declare_ext(name, version, 
61                            e_gras_msg_kind_rpccall, 
62                            payload_request, payload_answer);
63
64 }
65
66 static unsigned long int last_msg_ID = 0;
67
68 static int msgfilter_rpcID(gras_msg_t msg, void* ctx) {
69   unsigned long int ID= *(unsigned long int*)ctx;
70   int res = msg->ID == ID && 
71     (msg->kind == e_gras_msg_kind_rpcanswer || msg->kind == e_gras_msg_kind_rpcerror);
72
73   DEBUG5("Filter a message of ID %lu, type '%s' and kind '%s'. Waiting for ID=%lu. %s",
74          msg->ID,msg->type->name,e_gras_msg_kind_names[msg->kind],ID,
75          res?"take it": "reject");
76   return res;
77 }
78
79 /** @brief Launch a RPC call, but do not block for the answer */
80 gras_msg_cb_ctx_t 
81 gras_msg_rpc_async_call(gras_socket_t server,
82                         double timeOut,
83                         gras_msgtype_t msgtype,
84                         void *request) {
85   gras_msg_cb_ctx_t ctx = xbt_new0(s_gras_msg_cb_ctx_t,1);
86
87   ctx->ID = last_msg_ID++;
88   ctx->expeditor = server;
89   ctx->msgtype=msgtype;
90   ctx->timeout=timeOut;
91
92   VERB5("Send to %s:%d a RPC of type '%s' (ID=%lu) (exception%s caught)",
93         gras_socket_peer_name(server),
94         gras_socket_peer_port(server),
95         msgtype->name,ctx->ID,
96         (__xbt_ex_ctx()->ctx_caught?"":" not"));
97
98   gras_msg_send_ext(server, e_gras_msg_kind_rpccall, ctx->ID, msgtype, request);
99
100   return ctx;
101 }
102
103 /** @brief Wait teh answer of a RPC call previously launched asynchronously */
104 void gras_msg_rpc_async_wait(gras_msg_cb_ctx_t ctx,
105                              void *answer) {
106   s_gras_msg_t received;
107
108   gras_msg_wait_ext(ctx->timeout,
109                     ctx->msgtype, NULL, msgfilter_rpcID, &ctx->ID,
110                     &received);
111   free(ctx);
112   if (received.kind == e_gras_msg_kind_rpcerror) {
113     /* Damn. Got an exception. Extract it and revive it */
114     xbt_ex_t e;
115     memcpy(&e,received.payl,received.payl_size);
116     free(received.payl);
117     VERB3("Raise a remote exception cat:%d comming from %s %s",
118           e.category, e.host,
119           (__xbt_ex_ctx()->ctx_caught?"caught":"not caught"));
120      __xbt_ex_ctx()->ctx_ex.msg      = e.msg;
121      __xbt_ex_ctx()->ctx_ex.category = e.category;
122      __xbt_ex_ctx()->ctx_ex.value    = e.value;
123      __xbt_ex_ctx()->ctx_ex.remote   = 1;
124      __xbt_ex_ctx()->ctx_ex.host     = e.host;
125      __xbt_ex_ctx()->ctx_ex.procname = e.procname;
126      __xbt_ex_ctx()->ctx_ex.file     = e.file;
127      __xbt_ex_ctx()->ctx_ex.line     = e.line;
128      __xbt_ex_ctx()->ctx_ex.func     = e.func;
129      __xbt_ex_ctx()->ctx_ex.used     = e.used;
130      __xbt_ex_ctx()->ctx_ex.bt_strings = e.bt_strings;
131     DO_THROW(__xbt_ex_ctx()->ctx_ex);
132   }
133   memcpy(answer,received.payl,received.payl_size);
134   free(received.payl);
135 }
136
137 /** @brief Conduct a RPC call */
138 void gras_msg_rpccall(gras_socket_t server,
139                       double timeout,
140                       gras_msgtype_t msgtype,
141                       void *request, void *answer) {
142
143   gras_msg_cb_ctx_t ctx;
144
145   ctx= gras_msg_rpc_async_call(server, timeout,msgtype,request);
146   gras_msg_rpc_async_wait(ctx, answer);
147 }
148
149
150 /** @brief Return the result of a RPC call
151  *
152  * It done before the actual return of the callback so that the callback can do
153  * some cleanups before leaving.
154  */
155
156 void gras_msg_rpcreturn(double timeOut,gras_msg_cb_ctx_t ctx,void *answer) {
157   gras_msg_send_ext(ctx->expeditor, e_gras_msg_kind_rpcanswer, 
158                     ctx->ID, ctx->msgtype, answer);
159 }
160