Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Let's shild the user against himself: check that he provided a payload variable when...
[simgrid.git] / src / gras / Msg / rpc.c
1 /* $Id$ */
2
3 /* rpc - RPC implementation on top of GRAS messages                         */
4
5 /* Copyright (c) 2005 Martin Quinson. All rights reserved.                  */
6
7 /* This program is free software; you can redistribute it and/or modify it
8  * under the terms of the license (GNU LGPL) which comes with this package. */
9
10 #include "gras/Msg/msg_private.h"
11                 
12 xbt_set_t _gras_rpctype_set = NULL;
13
14 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(gras_msg_rpc,gras_msg,"RPC mecanism");
15
16
17 /** @brief declare a new versionned RPC type of the given name and payloads
18  *
19  * @param name: name as it should be used for logging messages (must be uniq)
20  * @param payload_request: datatype of request
21  * @param payload_answer: datatype of answer
22  *
23  * Registers a new RPC message to the GRAS mechanism. RPC are constituted of a pair 
24  * of messages. 
25  */
26 void
27 gras_msgtype_declare_rpc(const char           *name,
28                          gras_datadesc_type_t  payload_request,
29                          gras_datadesc_type_t  payload_answer) {
30
31   gras_msgtype_declare_ext(name, 0, 
32                            e_gras_msg_kind_rpccall, 
33                            payload_request, payload_answer);
34
35 }
36
37 /** @brief declare a new versionned RPC type of the given name and payloads
38  *
39  * @param name: name as it should be used for logging messages (must be uniq)
40  * @param version: something like versionning symbol
41  * @param payload_request: datatype of request
42  * @param payload_answer: datatype of answer
43  *
44  * Registers a new RPC message to the GRAS mechanism. RPC are constituted of a pair 
45  * of messages. 
46  *
47  * Use this version instead of gras_rpctype_declare when you change the
48  * semantic or syntax of a message and want your programs to be able to deal
49  * with both versions. Internally, each will be handled as an independent
50  * message type, so you can register differents for each of them.
51  */
52 void
53 gras_msgtype_declare_rpc_v(const char           *name,
54                            short int             version,
55                            gras_datadesc_type_t  payload_request,
56                            gras_datadesc_type_t  payload_answer) {
57
58   gras_msgtype_declare_ext(name, version, 
59                            e_gras_msg_kind_rpccall, 
60                            payload_request, payload_answer);
61
62 }
63
64 static unsigned long int last_msg_ID = 0;
65
66 static int msgfilter_rpcID(gras_msg_t msg, void* ctx) {
67   unsigned long int ID= *(unsigned long int*)ctx;
68   int res = msg->ID == ID && 
69     (msg->kind == e_gras_msg_kind_rpcanswer || msg->kind == e_gras_msg_kind_rpcerror);
70
71   DEBUG5("Filter a message of ID %lu, type '%s' and kind '%s'. Waiting for ID=%lu. %s",
72          msg->ID,msg->type->name,e_gras_msg_kind_names[msg->kind],ID,
73          res?"take it": "reject");
74   return res;
75 }
76
77 /** @brief Launch a RPC call, but do not block for the answer */
78 gras_msg_cb_ctx_t 
79 gras_msg_rpc_async_call(gras_socket_t server,
80                         double timeOut,
81                         gras_msgtype_t msgtype,
82                         void *request) {
83   gras_msg_cb_ctx_t ctx = xbt_new0(s_gras_msg_cb_ctx_t,1);
84
85   if (msgtype->ctn_type) {
86     xbt_assert1(request,
87                 "RPC type '%s' convey a payload you must provide",
88                 msgtype->name);
89   } else {
90     xbt_assert1(!request,
91                 "No payload was declared for RPC type '%s'",
92                 msgtype->name);
93   }
94
95   ctx->ID = last_msg_ID++;
96   ctx->expeditor = server;
97   ctx->msgtype=msgtype;
98   ctx->timeout=timeOut;
99
100   VERB5("Send to %s:%d a RPC of type '%s' (ID=%lu) (exception%s caught)",
101         gras_socket_peer_name(server),
102         gras_socket_peer_port(server),
103         msgtype->name,ctx->ID,
104         (__xbt_ex_ctx()->ctx_caught?"":" not"));
105
106   gras_msg_send_ext(server, e_gras_msg_kind_rpccall, ctx->ID, msgtype, request);
107
108   return ctx;
109 }
110
111 /** @brief Wait teh answer of a RPC call previously launched asynchronously */
112 void gras_msg_rpc_async_wait(gras_msg_cb_ctx_t ctx,
113                              void *answer) {
114   s_gras_msg_t received;
115
116   if (ctx->msgtype->answer_type) {
117     xbt_assert1(answer,
118                 "Answers to RPC '%s' convey a payload you must accept",
119                 ctx->msgtype->name);
120   } else {
121     xbt_assert1(!answer,
122                 "No payload was declared for answers to RPC '%s'",
123                 ctx->msgtype->name);
124   }
125
126   gras_msg_wait_ext(ctx->timeout,
127                     ctx->msgtype, NULL, msgfilter_rpcID, &ctx->ID,
128                     &received);
129   free(ctx);
130   if (received.kind == e_gras_msg_kind_rpcerror) {
131     xbt_ex_t e;
132     memcpy(&e,received.payl,received.payl_size);
133     free(received.payl);
134     VERB3("Raise a remote exception cat:%d comming from %s (%s)",
135           e.category, e.host, e.msg);
136      __xbt_ex_ctx()->ctx_ex.msg      = e.msg;
137      __xbt_ex_ctx()->ctx_ex.category = e.category;
138      __xbt_ex_ctx()->ctx_ex.value    = e.value;
139      __xbt_ex_ctx()->ctx_ex.remote   = 1;
140      __xbt_ex_ctx()->ctx_ex.host     = e.host;
141      __xbt_ex_ctx()->ctx_ex.procname = e.procname;
142      __xbt_ex_ctx()->ctx_ex.pid      = e.pid;
143      __xbt_ex_ctx()->ctx_ex.file     = e.file;
144      __xbt_ex_ctx()->ctx_ex.line     = e.line;
145      __xbt_ex_ctx()->ctx_ex.func     = e.func;
146      __xbt_ex_ctx()->ctx_ex.used     = e.used;
147      __xbt_ex_ctx()->ctx_ex.bt_strings = e.bt_strings;
148      memset(&__xbt_ex_ctx()->ctx_ex.bt,0,
149             sizeof(__xbt_ex_ctx()->ctx_ex.bt));
150     DO_THROW(__xbt_ex_ctx()->ctx_ex);
151   }
152   memcpy(answer,received.payl,received.payl_size);
153   free(received.payl);
154 }
155
156 /** @brief Conduct a RPC call */
157 void gras_msg_rpccall(gras_socket_t server,
158                       double timeout,
159                       gras_msgtype_t msgtype,
160                       void *request, void *answer) {
161
162   gras_msg_cb_ctx_t ctx;
163
164   ctx= gras_msg_rpc_async_call(server, timeout,msgtype,request);
165   gras_msg_rpc_async_wait(ctx, answer);
166 }
167
168
169 /** @brief Return the result of a RPC call
170  *
171  * It done before the actual return of the callback so that the callback can do
172  * some cleanups before leaving.
173  */
174
175 void gras_msg_rpcreturn(double timeOut,gras_msg_cb_ctx_t ctx,void *answer) {
176   gras_msg_send_ext(ctx->expeditor, e_gras_msg_kind_rpcanswer, 
177                     ctx->ID, ctx->msgtype, answer);
178 }
179