Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
987db03b9c8c38d9f55d4be200d2c4ceb225a94a
[simgrid.git] / src / gras / Msg / rpc.c
1 /* $Id$ */
2
3 /* rpc - RPC implementation on top of GRAS messages                         */
4
5 /* Copyright (c) 2005 Martin Quinson. All rights reserved.                  */
6
7 /* This program is free software; you can redistribute it and/or modify it
8  * under the terms of the license (GNU LGPL) which comes with this package. */
9
10 #include "gras/Msg/msg_private.h"
11                 
12 xbt_set_t _gras_rpctype_set = NULL;
13
14 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(gras_msg_rpc,gras_msg,"RPC mecanism");
15
16
17 /** @brief declare a new versionned RPC type of the given name and payloads
18  *
19  * @param name: name as it should be used for logging messages (must be uniq)
20  * @param payload_request: datatype of request
21  * @param payload_answer: datatype of answer
22  *
23  * Registers a new RPC message to the GRAS mechanism. RPC are constituted of a pair 
24  * of messages. 
25  */
26 void
27 gras_msgtype_declare_rpc(const char           *name,
28                          gras_datadesc_type_t  payload_request,
29                          gras_datadesc_type_t  payload_answer) {
30
31   gras_msgtype_declare_ext(name, 0, 
32                            e_gras_msg_kind_rpccall, 
33                            payload_request, payload_answer);
34
35 }
36
37 /** @brief declare a new versionned RPC type of the given name and payloads
38  *
39  * @param name: name as it should be used for logging messages (must be uniq)
40  * @param version: something like versionning symbol
41  * @param payload_request: datatype of request
42  * @param payload_answer: datatype of answer
43  *
44  * Registers a new RPC message to the GRAS mechanism. RPC are constituted of a pair 
45  * of messages. 
46  *
47  * Use this version instead of gras_rpctype_declare when you change the
48  * semantic or syntax of a message and want your programs to be able to deal
49  * with both versions. Internally, each will be handled as an independent
50  * message type, so you can register differents for each of them.
51  */
52 void
53 gras_msgtype_declare_rpc_v(const char           *name,
54                            short int             version,
55                            gras_datadesc_type_t  payload_request,
56                            gras_datadesc_type_t  payload_answer) {
57
58   gras_msgtype_declare_ext(name, version, 
59                            e_gras_msg_kind_rpccall, 
60                            payload_request, payload_answer);
61
62 }
63
64 static unsigned long int last_msg_ID = 0;
65
66 static int msgfilter_rpcID(gras_msg_t msg, void* ctx) {
67   unsigned long int ID= *(unsigned long int*)ctx;
68   int res = msg->ID == ID && 
69     (msg->kind == e_gras_msg_kind_rpcanswer || msg->kind == e_gras_msg_kind_rpcerror);
70
71   DEBUG5("Filter a message of ID %lu, type '%s' and kind '%s'. Waiting for ID=%lu. %s",
72          msg->ID,msg->type->name,e_gras_msg_kind_names[msg->kind],ID,
73          res?"take it": "reject");
74   return res;
75 }
76
77 /** @brief Launch a RPC call, but do not block for the answer */
78 gras_msg_cb_ctx_t 
79 gras_msg_rpc_async_call(gras_socket_t server,
80                         double timeOut,
81                         gras_msgtype_t msgtype,
82                         void *request) {
83   gras_msg_cb_ctx_t ctx = xbt_new0(s_gras_msg_cb_ctx_t,1);
84
85   if (msgtype->ctn_type) {
86     xbt_assert1(request,
87                 "RPC type '%s' convey a payload you must provide",
88                 msgtype->name);
89   } else {
90     xbt_assert1(!request,
91                 "No payload was declared for RPC type '%s'",
92                 msgtype->name);
93   }
94
95   ctx->ID = last_msg_ID++;
96   ctx->expeditor = server;
97   ctx->msgtype=msgtype;
98   ctx->timeout=timeOut;
99
100   VERB4("Send to %s:%d a RPC of type '%s' (ID=%lu)",
101         gras_socket_peer_name(server),
102         gras_socket_peer_port(server),
103         msgtype->name,ctx->ID);
104
105   gras_msg_send_ext(server, e_gras_msg_kind_rpccall, ctx->ID, msgtype, request);
106
107   return ctx;
108 }
109
110 /** @brief Wait teh answer of a RPC call previously launched asynchronously */
111 void gras_msg_rpc_async_wait(gras_msg_cb_ctx_t ctx,
112                              void *answer) {
113   s_gras_msg_t received;
114
115   if (ctx->msgtype->answer_type) {
116     xbt_assert1(answer,
117                 "Answers to RPC '%s' convey a payload you must accept",
118                 ctx->msgtype->name);
119   } else {
120     xbt_assert1(!answer,
121                 "No payload was declared for answers to RPC '%s'",
122                 ctx->msgtype->name);
123   }
124
125   gras_msg_wait_ext(ctx->timeout,
126                     ctx->msgtype, NULL, msgfilter_rpcID, &ctx->ID,
127                     &received);
128   free(ctx);
129   if (received.kind == e_gras_msg_kind_rpcerror) {
130     xbt_ex_t e;
131     memcpy(&e,received.payl,received.payl_size);
132     free(received.payl);
133     VERB3("Raise a remote exception cat:%d comming from %s (%s)",
134           e.category, e.host, e.msg);
135      __xbt_ex_ctx()->ctx_ex.msg      = e.msg;
136      __xbt_ex_ctx()->ctx_ex.category = e.category;
137      __xbt_ex_ctx()->ctx_ex.value    = e.value;
138      __xbt_ex_ctx()->ctx_ex.remote   = 1;
139      __xbt_ex_ctx()->ctx_ex.host     = e.host;
140      __xbt_ex_ctx()->ctx_ex.procname = e.procname;
141      __xbt_ex_ctx()->ctx_ex.pid      = e.pid;
142      __xbt_ex_ctx()->ctx_ex.file     = e.file;
143      __xbt_ex_ctx()->ctx_ex.line     = e.line;
144      __xbt_ex_ctx()->ctx_ex.func     = e.func;
145      __xbt_ex_ctx()->ctx_ex.used     = e.used;
146      __xbt_ex_ctx()->ctx_ex.bt_strings = e.bt_strings;
147      memset(&__xbt_ex_ctx()->ctx_ex.bt,0,
148             sizeof(__xbt_ex_ctx()->ctx_ex.bt));
149     DO_THROW(__xbt_ex_ctx()->ctx_ex);
150   }
151   memcpy(answer,received.payl,received.payl_size);
152   free(received.payl);
153 }
154
155 /** @brief Conduct a RPC call */
156 void gras_msg_rpccall(gras_socket_t server,
157                       double timeout,
158                       gras_msgtype_t msgtype,
159                       void *request, void *answer) {
160
161   gras_msg_cb_ctx_t ctx;
162
163   ctx= gras_msg_rpc_async_call(server, timeout,msgtype,request);
164   gras_msg_rpc_async_wait(ctx, answer);
165 }
166
167
168 /** @brief Return the result of a RPC call
169  *
170  * It done before the actual return of the callback so that the callback can do
171  * some cleanups before leaving.
172  */
173
174 void gras_msg_rpcreturn(double timeOut,gras_msg_cb_ctx_t ctx,void *answer) {
175   DEBUG5("Return to RPC '%s' from %s:%d (tOut=%f, payl=%p)",
176          ctx->msgtype->name,
177          gras_socket_peer_name(ctx->expeditor),gras_socket_peer_port(ctx->expeditor),
178          timeOut,answer);
179   gras_msg_send_ext(ctx->expeditor, e_gras_msg_kind_rpcanswer, 
180                     ctx->ID, ctx->msgtype, answer);
181 }
182