3 /* rpc - demo of the RPC features in GRAS */
5 /* Copyright (c) 2006 Martin Quinson. All rights reserved. */
7 /* This program is free software; you can redistribute it and/or modify it
8 * under the terms of the license (GNU LGPL) which comes with this package. */
/* Default log category of this file, named Rpc.
 * NOTE(review): presumably the INFO0..INFO4 calls below log through it;
 * confirm against the XBT logging documentation. */
12 XBT_LOG_NEW_DEFAULT_CATEGORY(Rpc,"Messages specific to this example");
14 int err=0; /* counter embedded in the message of each raised exception
(it is post-incremented by the THROW1 call in exception_raising): it makes
the messages more informative and
15 makes it possible to follow their propagation from server to client */
17 /* Register the messages which may be sent (common to client and server).
 *
 * Declares three RPC message types and one plain message type:
 *  - plain ping: RPC declared with the int data description for both of its
 *    payload arguments (the client calls it with the addresses of two
 *    variables, ping and pong);
 *  - raise exception and forward exception: RPCs declared with NULL payload
 *    descriptions (every call in this file passes NULL, NULL);
 *  - kill: declared with gras_msgtype_declare, not the rpc variant; the
 *    client sends it with gras_msg_send and a NULL payload.
 * NOTE(review): the closing brace of this function is not visible in this
 * excerpt. */
18 static void register_messages(void) {
19 gras_msgtype_declare_rpc("plain ping",
20 gras_datadesc_by_name("int"),
21 gras_datadesc_by_name("int"));
23 gras_msgtype_declare_rpc("raise exception", NULL, NULL);
24 gras_msgtype_declare_rpc("forward exception", NULL, NULL);
25 gras_msgtype_declare("kill",NULL);
28 /* Function prototypes: entry points of the three processes defined below
 * (server, forwarder and client). Each one takes the usual argc/argv pair. */
29 int server (int argc,char *argv[]);
30 int forwarder (int argc,char *argv[]);
31 int client (int argc,char *argv[]);
/* Raise the exception that the client side expects to catch: category
 * unknown_error, value 42, and a message ending with the current value of
 * the global err counter, which is post-incremented on every call.
 * NOTE(review): the closing brace is not visible in this excerpt. */
33 static void exception_raising(void) {
34 THROW1(unknown_error,42,"Some error we will catch on client side %d",err++);
/* Check that an exception with the expected content is obtained.
 *
 * NOTE(review): original lines 37 to 47 and the end of the function are not
 * visible in this excerpt, so the declaration of e and the construct that
 * fills it are not shown. Presumably a TRY/CATCH around exception_raising();
 * confirm against the full file. */
36 static void exception_catching(void) {
/* Failure path: reported when no exception was obtained (the guarding
 * condition is not visible here). */
48 THROW0(unknown_error,0,"Didn't got the remote exception!");
/* The exception held in e must have the unknown_error category ... */
50 xbt_assert2(e.category == unknown_error, "Got wrong category: %d (instead of %d)",
51 e.category,unknown_error);
/* ... the value 42 ... */
52 xbt_assert1(e.value == 42, "Got wrong value: %d (!=42)", e.value);
/* ... and a message starting with the expected text; only the prefix is
 * compared, so the trailing counter is ignored. */
53 xbt_assert1(!strncmp(e.msg,"Some error we will catch on client side",
54 strlen("Some error we will catch on client side")),
55 "Got wrong message: %s", e.msg);
60 /* **********************************************************************
62 * **********************************************************************/
/* Client process.
 *
 * Visible behavior: connects to the server and to the forwarder, runs the
 * plain ping RPC on the server, calls the raise exception RPC on the server
 * and the forward exception RPC on the forwarder, checking each time the
 * content of the exception held in e (category unknown_error, value 42,
 * expected message prefix), then sends kill to both peers and closes both
 * sockets. exception_catching() is called between the steps.
 *
 * argv[3] and argv[4] are used as the host and port of the forwarder.
 *
 * NOTE(review): many original lines are missing from this excerpt (the
 * declarations of e, i, port, ping and pong, the TRY/CATCH scaffolding, the
 * argument parsing and the return statement). The RETHROW0 and THROW0 calls
 * and the gras_socket_close at original line 121 presumably sit inside
 * CATCH blocks or guarded branches; confirm against the full file. */
65 int client(int argc,char *argv[]) {
67 gras_socket_t toserver=NULL; /* peer */
68 gras_socket_t toforwarder=NULL; /* peer */
74 const char *host = "127.0.0.1";
77 memset(&e,0,sizeof(xbt_ex_t));
79 /* 1. Init the GRAS's infrastructure */
80 gras_init(&argc, argv);
82 /* 2. Get the server's address. The command line overrides the defaults when specified */
87 INFO2("Launch client (server on %s:%d)",host,port);
91 /* 3. Wait for the server & forwarder startup */
94 /* 4. Create a socket to speak to the server */
97 toserver=gras_socket_client(host,port);
98 toforwarder=gras_socket_client(argv[3],atoi(argv[4]));
100 RETHROW0("Unable to connect to the server: %s");
102 INFO2("Connected to %s:%d.",host,port);
105 /* 5. Register the messages.
106 See, it doesn't have to be done completely at the beginning,
107 but only before use */
108 exception_catching();
111 /* 6. Keep the user informed of what's going on */
112 INFO2("Connected to server which is on %s:%d ",
113 gras_socket_peer_name(toserver),gras_socket_peer_port(toserver));
115 /* 7. Prepare and send the ping message to the server */
118 exception_catching();
119 gras_msg_rpccall(toserver, 6000.0, "plain ping", &ping, &pong);
121 gras_socket_close(toserver);
122 RETHROW0("Failed to execute a PING rpc on the server: %s");
124 exception_catching();
126 /* 8. Keep the user informed of what's going on, again */
127 INFO4("The answer to PING(%d) on %s:%d is PONG(%d) ",
129 gras_socket_peer_name(toserver),gras_socket_peer_port(toserver),
132 /* 9. Call a RPC which raises an exception (to test exception propagation) */
133 INFO0("Call the exception raising RPC");
135 gras_msg_rpccall(toserver, 6000.0, "raise exception", NULL, NULL);
138 xbt_assert2(e.category == unknown_error,
139 "Got wrong category: %d (instead of %d)",
140 e.category,unknown_error);
141 xbt_assert1(e.value == 42, "Got wrong value: %d (!=42)", e.value);
142 xbt_assert1(!strncmp(e.msg,"Some error we will catch on client side",
143 strlen("Some error we will catch on client side")),
144 "Got wrong message: %s", e.msg);
145 INFO0("Got the expected exception when calling the exception raising RPC");
150 THROW0(unknown_error,0,"Didn't got the remote exception!");
152 INFO0("Called the exception raising RPC");
153 exception_catching();
/* Repeat the exception raising RPC five times (i = 0..4). */
156 for (i=0; i<5; i++) {
158 INFO1("Call the exception raising RPC (i=%d)",i);
160 gras_msg_rpccall(toserver, 6000.0, "raise exception", NULL, NULL);
166 THROW0(unknown_error,0,"Didn't got the remote exception!");
171 /* 10. Call the exception forwarding RPC on the forwarder (to test that exception propagation works) */
173 INFO1("Call the exception raising RPC on the forwarder (i=%d)",i);
175 gras_msg_rpccall(toforwarder, 6000.0, "forward exception", NULL, NULL);
180 THROW0(unknown_error,0,"Didn't got the remote exception!");
182 xbt_assert1(e.value == 42, "Got wrong value: %d (!=42)", e.value);
183 xbt_assert1(!strncmp(e.msg,"Some error we will catch on client side",
184 strlen("Some error we will catch on client side")),
185 "Got wrong message: %s", e.msg);
186 xbt_assert2(e.category == unknown_error,
187 "Got wrong category: %d (instead of %d)",
188 e.category,unknown_error);
189 INFO0("Got the expected exception when calling the exception raising RPC");
191 exception_catching();
/* Ask both peers to stop: the forwarder first, then the server. */
194 INFO2("Ask %s:%d to die",gras_socket_peer_name(toforwarder),gras_socket_peer_port(toforwarder));
195 gras_msg_send(toforwarder,"kill",NULL);
196 INFO2("Ask %s:%d to die",gras_socket_peer_name(toserver),gras_socket_peer_port(toserver));
197 gras_msg_send(toserver,"kill",NULL);
199 /* 11. Cleanup the place before leaving */
200 gras_socket_close(toserver);
201 gras_socket_close(toforwarder);
205 } /* end_of_client */
208 /* **********************************************************************
210 * **********************************************************************/
212 gras_socket_t server; /* socket to the server: opened in forwarder(), used by forwarder_cb_forward_ex() */
214 } s_forward_data_t, *forward_data_t; /* NOTE(review): the struct header and the done member read by forwarder() are not visible in this excerpt */
/* Callback registered by forwarder() for the kill message.
 *
 * Logs the peer name and port of the sender, then fetches the user data of
 * the process.
 * NOTE(review): the rest of the body is not visible in this excerpt;
 * presumably it sets the done flag polled by the main loop of forwarder()
 * and returns a status to GRAS. Confirm against the full file. */
216 static int forwarder_cb_kill(gras_msg_cb_ctx_t ctx,
217 void *payload_data) {
218 forward_data_t fdata;
219 gras_socket_t expeditor = gras_msg_cb_ctx_from(ctx); /* socket of the sender */
220 INFO2("Asked to die by %s:%d",gras_socket_peer_name(expeditor),gras_socket_peer_port(expeditor));
221 fdata=gras_userdata_get();
/* Callback registered by forwarder() for the forward exception RPC.
 *
 * Relays the request by calling the raise exception RPC on the server
 * socket stored in the user data, with a timeout argument of 60 and no
 * payload in either direction.
 * NOTE(review): the end of the body is not visible in this excerpt, so how
 * the outcome is handed back to the caller cannot be seen here. */
226 static int forwarder_cb_forward_ex(gras_msg_cb_ctx_t ctx,
227 void *payload_data) {
228 forward_data_t fdata=gras_userdata_get();
230 INFO0("Forward a request");
231 gras_msg_rpccall(fdata->server, 60, "raise exception",NULL,NULL);
/* Forwarder process.
 *
 * Requires exactly four command line entries (argc == 4); argv[2] and
 * argv[3] are used as the host and port of the server. Opens a listening
 * socket, connects to the server, registers the forward exception and kill
 * callbacks, then handles incoming messages until the done member of the
 * user data becomes true, and finally closes both sockets.
 *
 * NOTE(review): the declaration and assignment of port, the initialization
 * of fdata->done, the message registration, the end of the loop and the
 * return statement are not visible in this excerpt. The listening port
 * presumably comes from argv[1]; confirm against the full file. */
235 int forwarder (int argc,char *argv[]) {
236 gras_socket_t mysock;
238 forward_data_t fdata;
240 gras_init(&argc,argv);
242 xbt_assert(argc == 4);
244 fdata=gras_userdata_new(s_forward_data_t);
248 INFO1("Launch forwarder (port=%d)", port);
249 mysock = gras_socket_server(port);
251 gras_os_sleep(1); /* wait for the server to be ready */
252 fdata->server=gras_socket_client(argv[2],atoi(argv[3]));
255 gras_cb_register("forward exception", &forwarder_cb_forward_ex);
256 gras_cb_register("kill", &forwarder_cb_kill);
/* Main loop: handle one message at a time, with a timeout argument of 600.0,
 * until done is set. */
258 while (!fdata->done) {
259 gras_msg_handle(600.0);
262 gras_socket_close(mysock);
263 gras_socket_close(fdata->server);
270 /* **********************************************************************
272 * **********************************************************************/
274 gras_socket_t server; /* NOTE(review): no use of this member is visible in this excerpt; confirm its purpose */
276 } s_server_data_t, *server_data_t; /* NOTE(review): the struct header and the done member read by server() are not visible in this excerpt */
/* Callback registered by server() for the kill message.
 *
 * Logs the peer name and port of the sender, then fetches the user data of
 * the process into sdata.
 * NOTE(review): the declaration of sdata and the rest of the body are not
 * visible in this excerpt; presumably it sets the done flag polled by the
 * main loop of server(). Confirm against the full file. */
278 static int server_cb_kill(gras_msg_cb_ctx_t ctx,
279 void *payload_data) {
280 gras_socket_t expeditor = gras_msg_cb_ctx_from(ctx); /* socket of the sender */
283 INFO2("Asked to die by %s:%d",gras_socket_peer_name(expeditor),gras_socket_peer_port(expeditor));
285 sdata=gras_userdata_get();
/* Callback registered by server() for the raise exception RPC.
 * NOTE(review): the body is not visible in this excerpt; presumably it calls
 * exception_raising() so that the exception travels back to the caller.
 * Confirm against the full file. */
290 static int server_cb_raise_ex(gras_msg_cb_ctx_t ctx,
291 void *payload_data) {
/* Callback registered by server() for the plain ping RPC.
 *
 * Reads the int payload, logs it together with the peer name and port of
 * the caller, and sends the local msg variable back as the RPC result with
 * a timeout argument of 6000.
 * NOTE(review): step 3, the statement of step 4 that changes msg and the
 * return statement of step 7 are not visible in this excerpt. The log text
 * suggests that msg is set to 4321 before being returned; confirm. */
296 static int server_cb_ping(gras_msg_cb_ctx_t ctx,
297 void *payload_data) {
299 /* 1. Get the payload into the msg variable, and retrieve who called us */
300 int msg=*(int*)payload_data;
301 gras_socket_t expeditor = gras_msg_cb_ctx_from(ctx);
303 /* 2. Log which client connected */
304 INFO3("Got message PING(%d) from %s:%d ",
306 gras_socket_peer_name(expeditor), gras_socket_peer_port(expeditor));
308 /* 4. Change the value of the msg variable */
311 /* 5. Return as result */
312 gras_msg_rpcreturn(6000,ctx,&msg);
313 INFO0("Answered with PONG(4321)");
315 /* 6. Cleanups, if any */
317 /* 7. Tell GRAS that we consumed this message */
319 } /* end_of_server_cb_ping */
/* Server process.
 *
 * Visible behavior: initializes GRAS, allocates the user data, opens a
 * listening socket on port, registers the plain ping, raise exception and
 * kill callbacks, logs the listening port, then handles incoming messages
 * until the done member of the user data becomes true, calling
 * exception_catching() after each handled message, and finally closes the
 * listening socket.
 *
 * NOTE(review): the declarations of port and sdata, the command line
 * parsing, the message registration, the end of the loop, gras_exit and the
 * return statement are not visible in this excerpt.
 * NOTE(review): in this excerpt the bug report comment placed before the
 * main loop has no visible closing marker; confirm that it is properly
 * closed in the full file, otherwise the main loop would be commented out. */
322 int server (int argc,char *argv[]) {
323 gras_socket_t mysock;
328 /* 1. Init the GRAS infrastructure */
329 gras_init(&argc,argv);
331 /* 2. Get the port I should listen on from the command line, if specified */
335 sdata=gras_userdata_new(s_server_data_t);
338 INFO1("Launch server (port=%d)", port);
340 /* 3. Create my master socket */
341 mysock = gras_socket_server(port);
343 /* 4. Register the known messages and register my callback */
345 gras_cb_register("plain ping",&server_cb_ping);
346 gras_cb_register("raise exception",&server_cb_raise_ex);
347 gras_cb_register("kill",&server_cb_kill);
349 INFO1("Listening on port %d ", gras_socket_my_port(mysock));
351 /* 5. Wait for the ping incoming messages */
353 /** \bug if the server is gone before the forwarder tries to connect,
354 it dies awfully with the following message. The problem stands somewhere
355 at the interface between the gras_socket_t and the msg mess. There is thus
356 no way for me to dive into this before this interface is rewritten
357 ==15875== Invalid read of size 4
358 ==15875== at 0x408B805: find_port (transport_plugin_sg.c:68)
359 ==15875== by 0x408BD64: gras_trp_sg_socket_client (transport_plugin_sg.c:115)
360 ==15875== by 0x404A38B: gras_socket_client_ext (transport.c:255)
361 ==15875== by 0x404A605: gras_socket_client (transport.c:288)
362 ==15875== by 0x804B49D: forwarder (rpc.c:245)
363 ==15875== by 0x80491FB: launch_forwarder (_rpc_simulator.c:52)
364 ==15875== by 0x406780B: __context_wrapper (context.c:164)
365 ==15875== by 0x41A6CB3: pthread_start_thread (manager.c:310)
366 ==15875== by 0x42AA549: clone (clone.S:119)
367 ==15875== Address 0x433B49C is 44 bytes inside a block of size 48 free'd
368 ==15875== at 0x401CF46: free (vg_replace_malloc.c:235)
369 ==15875== by 0x408F1FA: gras_process_exit (sg_process.c:117)
370 ==15875== by 0x4049386: gras_exit (gras.c:64)
371 ==15875== by 0x804B936: server (rpc.c:345)
372 ==15875== by 0x80492B1: launch_server (_rpc_simulator.c:69)
373 ==15875== by 0x406780B: __context_wrapper (context.c:164)
374 ==15875== by 0x41A6CB3: pthread_start_thread (manager.c:310)
375 ==15875== by 0x42AA549: clone (clone.S:119)
377 while (!sdata->done) {
378 gras_msg_handle(600.0);
379 exception_catching();
382 /* 8. Free the allocated resources, and shut GRAS down */
384 gras_socket_close(mysock);
389 } /* end_of_server */