Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
small improvements suggested by Arnaud
[simgrid.git] / src / gras / Msg / rpc.c
1 /* $Id$ */
2
3 /* rpc - RPC implementation on top of GRAS messages                         */
4
5 /* Copyright (c) 2005 Martin Quinson. All rights reserved.                  */
6
7 /* This program is free software; you can redistribute it and/or modify it
8  * under the terms of the license (GNU LGPL) which comes with this package. */
9
10 #include "gras/Msg/msg_private.h"
11
12 xbt_set_t _gras_rpctype_set = NULL;
13 xbt_dynar_t _gras_rpc_cancelled = NULL;
14
15 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(gras_msg_rpc, gras_msg, "RPC mecanism");
16
17
18 /** @brief declare a new versionned RPC type of the given name and payloads
19  *
20  * @param name: name as it should be used for logging messages (must be uniq)
21  * @param payload_request: datatype of request
22  * @param payload_answer: datatype of answer
23  *
24  * Registers a new RPC message to the GRAS mechanism. RPC are constituted of a pair
25  * of messages.
26  */
27 void
28 gras_msgtype_declare_rpc(const char *name,
29                          gras_datadesc_type_t payload_request,
30                          gras_datadesc_type_t payload_answer)
31 {
32
33   gras_msgtype_declare_ext(name, 0,
34                            e_gras_msg_kind_rpccall,
35                            payload_request, payload_answer);
36
37 }
38
39 /** @brief declare a new versionned RPC type of the given name and payloads
40  *
41  * @param name: name as it should be used for logging messages (must be uniq)
42  * @param version: something like versionning symbol
43  * @param payload_request: datatype of request
44  * @param payload_answer: datatype of answer
45  *
46  * Registers a new RPC message to the GRAS mechanism. RPC are constituted of a pair
47  * of messages.
48  *
49  * Use this version instead of gras_rpctype_declare when you change the
50  * semantic or syntax of a message and want your programs to be able to deal
51  * with both versions. Internally, each will be handled as an independent
52  * message type, so you can register differents for each of them.
53  */
54 void
55 gras_msgtype_declare_rpc_v(const char *name,
56                            short int version,
57                            gras_datadesc_type_t payload_request,
58                            gras_datadesc_type_t payload_answer)
59 {
60
61   gras_msgtype_declare_ext(name, version,
62                            e_gras_msg_kind_rpccall,
63                            payload_request, payload_answer);
64
65 }
66
67 static unsigned long int last_msg_ID = 0;
68
69 static int msgfilter_rpcID(gras_msg_t msg, void *ctx)
70 {
71   unsigned long int ID = *(unsigned long int *) ctx;
72   int res = msg->ID == ID &&
73     (msg->kind == e_gras_msg_kind_rpcanswer
74      || msg->kind == e_gras_msg_kind_rpcerror);
75   unsigned int cursor;
76   gras_msg_cb_ctx_t rpc_ctx;
77
78
79   DEBUG5
80     ("Filter a message of ID %lu, type '%s' and kind '%s'. Waiting for ID=%lu. %s",
81      msg->ID, msg->type->name, e_gras_msg_kind_names[msg->kind], ID,
82      res ? "take it" : "reject");
83
84   if (res && !_gras_rpc_cancelled)
85     return res;
86
87   /* Check whether it is an old answer to a message we already canceled */
88   xbt_dynar_foreach(_gras_rpc_cancelled, cursor, rpc_ctx) {
89     if (msg->ID == rpc_ctx->ID && msg->kind == e_gras_msg_kind_rpcanswer) {
90       VERB1
91         ("Got an answer to the already canceled (timeouted?) RPC %ld. Ignore it (leaking the payload!).",
92          msg->ID);
93       xbt_dynar_cursor_rm(_gras_rpc_cancelled, &cursor);
94       return 1;
95     }
96   }
97
98   return res;
99 }
100
101 /* Mallocator cruft */
102 xbt_mallocator_t gras_msg_ctx_mallocator = NULL;
103 void *gras_msg_ctx_mallocator_new_f(void)
104 {
105   return xbt_new0(s_gras_msg_cb_ctx_t, 1);
106 }
107
108 void gras_msg_ctx_mallocator_free_f(void *ctx)
109 {
110   xbt_free(ctx);
111 }
112
113 void gras_msg_ctx_mallocator_reset_f(void *ctx)
114 {
115   memset(ctx, sizeof(s_gras_msg_cb_ctx_t), 0);
116 }
117
118 /** @brief Launch a RPC call, but do not block for the answer */
119 gras_msg_cb_ctx_t
120 gras_msg_rpc_async_call_(gras_socket_t server,
121                          double timeOut,
122                          gras_msgtype_t msgtype, void *request)
123 {
124   gras_msg_cb_ctx_t ctx = xbt_mallocator_get(gras_msg_ctx_mallocator);
125
126   if (msgtype->ctn_type) {
127     xbt_assert1(request,
128                 "RPC type '%s' convey a payload you must provide",
129                 msgtype->name);
130   } else {
131     xbt_assert1(!request,
132                 "No payload was declared for RPC type '%s'", msgtype->name);
133   }
134
135   ctx->ID = last_msg_ID++;
136   ctx->expeditor = server;
137   ctx->msgtype = msgtype;
138   ctx->timeout = timeOut;
139
140   VERB4("Send to %s:%d a RPC of type '%s' (ID=%lu)",
141         gras_socket_peer_name(server),
142         gras_socket_peer_port(server), msgtype->name, ctx->ID);
143
144   gras_msg_send_ext(server, e_gras_msg_kind_rpccall, ctx->ID, msgtype,
145                     request);
146
147   return ctx;
148 }
149
150 /** @brief Wait the answer of a RPC call previously launched asynchronously */
151 void gras_msg_rpc_async_wait(gras_msg_cb_ctx_t ctx, void *answer)
152 {
153   xbt_ex_t e;
154   s_gras_msg_t received;
155
156   if (ctx->msgtype->answer_type) {
157     xbt_assert1(answer,
158                 "Answers to RPC '%s' convey a payload you must accept",
159                 ctx->msgtype->name);
160   } else {
161     xbt_assert1(!answer,
162                 "No payload was declared for answers to RPC '%s'",
163                 ctx->msgtype->name);
164   }
165
166   TRY {
167     /* The filter returns 1 when we eat an old RPC answer to something canceled */
168     do {
169       gras_msg_wait_ext_(ctx->timeout,
170                          ctx->msgtype, NULL, msgfilter_rpcID, &ctx->ID,
171                          &received);
172     } while (received.ID != ctx->ID);
173
174   }
175   CATCH(e) {
176     if (!_gras_rpc_cancelled)
177       _gras_rpc_cancelled = xbt_dynar_new(sizeof(ctx), NULL);
178     xbt_dynar_push(_gras_rpc_cancelled, &ctx);
179     INFO5("canceled RPC %ld pushed onto the stack (%s from %s:%d) Reason: %s",
180           ctx->ID, ctx->msgtype->name,
181           gras_socket_peer_name(ctx->expeditor),
182           gras_socket_peer_port(ctx->expeditor), e.msg);
183     RETHROW;
184   }
185
186   xbt_mallocator_release(gras_msg_ctx_mallocator, ctx);
187   if (received.kind == e_gras_msg_kind_rpcerror) {
188     xbt_ex_t e;
189     memcpy(&e, received.payl, received.payl_size);
190     free(received.payl);
191     VERB3("Raise a remote exception cat:%d comming from %s (%s)",
192           e.category, e.host, e.msg);
193     __xbt_ex_ctx()->ctx_ex.msg = e.msg;
194     __xbt_ex_ctx()->ctx_ex.category = e.category;
195     __xbt_ex_ctx()->ctx_ex.value = e.value;
196     __xbt_ex_ctx()->ctx_ex.remote = 1;
197     __xbt_ex_ctx()->ctx_ex.host = e.host;
198     __xbt_ex_ctx()->ctx_ex.procname = e.procname;
199     __xbt_ex_ctx()->ctx_ex.pid = e.pid;
200     __xbt_ex_ctx()->ctx_ex.file = e.file;
201     __xbt_ex_ctx()->ctx_ex.line = e.line;
202     __xbt_ex_ctx()->ctx_ex.func = e.func;
203     __xbt_ex_ctx()->ctx_ex.used = e.used;
204     __xbt_ex_ctx()->ctx_ex.bt_strings = e.bt_strings;
205     memset(&__xbt_ex_ctx()->ctx_ex.bt, 0, sizeof(__xbt_ex_ctx()->ctx_ex.bt));
206     DO_THROW(__xbt_ex_ctx()->ctx_ex);
207   }
208   memcpy(answer, received.payl, received.payl_size);
209   free(received.payl);
210 }
211
212 /** @brief Conduct a RPC call */
213 void gras_msg_rpccall_(gras_socket_t server,
214                        double timeout,
215                        gras_msgtype_t msgtype, void *request, void *answer)
216 {
217
218   gras_msg_cb_ctx_t ctx;
219
220   ctx = gras_msg_rpc_async_call_(server, timeout, msgtype, request);
221   gras_msg_rpc_async_wait(ctx, answer);
222 }
223
224
225 /** @brief Return the result of a RPC call
226  *
227  * It done before the actual return of the callback so that the callback can do
228  * some cleanups before leaving.
229  */
230
231 void gras_msg_rpcreturn(double timeOut, gras_msg_cb_ctx_t ctx, void *answer)
232 {
233   xbt_assert0(ctx->answer_due,
234               "RPC return not allowed here. Either not a RPC message or already returned a result");
235   ctx->answer_due = 0;
236   DEBUG5("Return to RPC '%s' from %s:%d (tOut=%f, payl=%p)",
237          ctx->msgtype->name,
238          gras_socket_peer_name(ctx->expeditor),
239          gras_socket_peer_port(ctx->expeditor), timeOut, answer);
240   gras_msg_send_ext(ctx->expeditor, e_gras_msg_kind_rpcanswer, ctx->ID,
241                     ctx->msgtype, answer);
242 }