Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Kill old $Id$ command dating from CVS
[simgrid.git] / examples / gras / rpc / rpc.c
1 /* rpc - demo of the RPC features in GRAS                                   */
2
3 /* Copyright (c) 2006 Martin Quinson. All rights reserved.                  */
4
5 /* This program is free software; you can redistribute it and/or modify it
6  * under the terms of the license (GNU LGPL) which comes with this package. */
7
8 #include "gras.h"
9
10 XBT_LOG_NEW_DEFAULT_CATEGORY(Rpc, "Messages specific to this example");
11
12 /* register messages which may be sent (common to client and server) */
13 static void register_messages(void)
14 {
15   gras_msgtype_declare_rpc("plain ping",
16                            gras_datadesc_by_name("int"),
17                            gras_datadesc_by_name("int"));
18
19   gras_msgtype_declare_rpc("raise exception", NULL, NULL);
20   gras_msgtype_declare_rpc("forward exception", NULL, NULL);
21   gras_msgtype_declare("kill", NULL);
22 }
23
24 /* Function prototypes */
25 int server(int argc, char *argv[]);
26 int forwarder(int argc, char *argv[]);
27 int client(int argc, char *argv[]);
28
29 #define exception_msg       "Error for the client"
30 #define exception_raising() THROW0(unknown_error,42,exception_msg)
31
32 static void exception_catching(void)
33 {
34   int gotit = 0, i;
35   xbt_ex_t e;
36
37   for (i = 0; i < 5; i++) {
38     gotit = 0;
39     TRY {
40       exception_raising();
41     }
42     CATCH(e) {
43       gotit = 1;
44     }
45     if (!gotit) {
46       THROW0(unknown_error, 0, "Didn't got the remote exception!");
47     }
48     xbt_assert2(e.category == unknown_error,
49                 "Got wrong category: %d (instead of %d)", e.category,
50                 unknown_error);
51     xbt_assert1(e.value == 42, "Got wrong value: %d (!=42)", e.value);
52     xbt_assert1(!strncmp(e.msg, exception_msg, strlen(exception_msg)),
53                 "Got wrong message: %s", e.msg);
54     xbt_ex_free(e);
55   }
56 }
57
58 /* **********************************************************************
59  * Client code
60  * **********************************************************************/
61
62
63 int client(int argc, char *argv[])
64 {
65   xbt_ex_t e;
66   gras_socket_t toserver = NULL;        /* peer */
67   gras_socket_t toforwarder = NULL;     /* peer */
68
69   int ping, pong, i;
70   volatile int gotit = 0;
71
72
73   const char *host = "127.0.0.1";
74   int port = 4000;
75
76   memset(&e, 0, sizeof(xbt_ex_t));
77
78   /* 1. Init the GRAS's infrastructure */
79   gras_init(&argc, argv);
80
81   /* 2. Get the server's address. The command line override defaults when specified */
82   if (argc == 5) {
83     host = argv[1];
84     port = atoi(argv[2]);
85   }
86   INFO2("Launch client (server on %s:%d)", host, port);
87
88   exception_catching();
89
90   /* 3. Wait for the server & forwarder startup */
91   gras_os_sleep(2);
92
93   /* 4. Create a socket to speak to the server */
94   TRY {
95     exception_catching();
96     toserver = gras_socket_client(host, port);
97     toforwarder = gras_socket_client(argv[3], atoi(argv[4]));
98   }
99   CATCH(e) {
100     RETHROW0("Unable to connect to the server: %s");
101   }
102   INFO2("Connected to %s:%d.", host, port);
103
104
105   /* 5. Register the messages.
106      See, it doesn't have to be done completely at the beginning,
107      but only before use */
108   exception_catching();
109   register_messages();
110
111   /* 6. Keep the user informed of what's going on */
112   INFO2("Connected to server which is on %s:%d",
113         gras_socket_peer_name(toserver), gras_socket_peer_port(toserver));
114
115   /* 7. Prepare and send the ping message to the server */
116   ping = 1234;
117   TRY {
118     exception_catching();
119     gras_msg_rpccall(toserver, 6000.0, "plain ping", &ping, &pong);
120   }
121   CATCH(e) {
122     gras_socket_close(toserver);
123     RETHROW0("Failed to execute a PING rpc on the server: %s");
124   }
125   exception_catching();
126
127   /* 8. Keep the user informed of what's going on, again */
128   INFO4("The answer to PING(%d) on %s:%d is PONG(%d)",
129         ping,
130         gras_socket_peer_name(toserver), gras_socket_peer_port(toserver),
131         pong);
132
133   /* 9. Call a RPC which raises an exception (to test exception propagation) */
134   INFO0("Call the exception raising RPC");
135   TRY {
136     gras_msg_rpccall(toserver, 6000.0, "raise exception", NULL, NULL);
137   }
138   CATCH(e) {
139     gotit = 1;
140     xbt_assert2(e.category == unknown_error,
141                 "Got wrong category: %d (instead of %d)",
142                 e.category, unknown_error);
143     xbt_assert1(e.value == 42, "Got wrong value: %d (!=42)", e.value);
144     xbt_assert1(!strncmp(e.msg, exception_msg, strlen(exception_msg)),
145                 "Got wrong message: %s", e.msg);
146     INFO0
147       ("Got the expected exception when calling the exception raising RPC");
148     xbt_ex_free(e);
149   }
150
151   if (!gotit)
152     THROW0(unknown_error, 0, "Didn't got the remote exception!");
153
154   INFO0("Called the exception raising RPC");
155   exception_catching();
156
157   /* doxygen_ignore */
158   for (i = 0; i < 5; i++) {
159
160     INFO1("Call the exception raising RPC (i=%d)", i);
161     TRY {
162       gras_msg_rpccall(toserver, 6000.0, "raise exception", NULL, NULL);
163     }
164     CATCH(e) {
165       gotit = 1;
166       xbt_ex_free(e);
167     }
168     if (!gotit) {
169       THROW0(unknown_error, 0, "Didn't got the remote exception!");
170     }
171   }
172   /* doxygen_resume */
173
174   /* 9. Call a RPC which raises an exception (to test that exception propagation works) */
175   for (i = 0; i < 5; i++) {
176     INFO1("Call the exception raising RPC on the forwarder (i=%d)", i);
177     TRY {
178       gras_msg_rpccall(toforwarder, 6000.0, "forward exception", NULL, NULL);
179     }
180     CATCH(e) {
181       gotit = 1;
182     }
183     if (!gotit) {
184       THROW0(unknown_error, 0, "Didn't got the remote exception!");
185     }
186     xbt_assert1(e.value == 42, "Got wrong value: %d (!=42)", e.value);
187     xbt_assert1(!strncmp(e.msg, exception_msg, strlen(exception_msg)),
188                 "Got wrong message: %s", e.msg);
189     xbt_assert2(e.category == unknown_error,
190                 "Got wrong category: %d (instead of %d)",
191                 e.category, unknown_error);
192     INFO0
193       ("Got the expected exception when calling the exception raising RPC");
194     xbt_ex_free(e);
195     exception_catching();
196   }
197
198   INFO2("Ask %s:%d to die", gras_socket_peer_name(toforwarder),
199         gras_socket_peer_port(toforwarder));
200   gras_msg_send(toforwarder, "kill", NULL);
201   INFO2("Ask %s:%d to die", gras_socket_peer_name(toserver),
202         gras_socket_peer_port(toserver));
203   gras_msg_send(toserver, "kill", NULL);
204
205   /* 11. Cleanup the place before leaving */
206   gras_socket_close(toserver);
207   gras_socket_close(toforwarder);
208   INFO0("Done.");
209   gras_exit();
210   return 0;
211 }                               /* end_of_client */
212
213
214 /* **********************************************************************
215  * Forwarder code
216  * **********************************************************************/
217 typedef struct {
218   gras_socket_t server;
219   int done;
220 } s_forward_data_t, *forward_data_t;
221
222 static int forwarder_cb_kill(gras_msg_cb_ctx_t ctx, void *payload_data)
223 {
224   forward_data_t fdata;
225   gras_socket_t expeditor = gras_msg_cb_ctx_from(ctx);
226   INFO2("Asked to die by %s:%d", gras_socket_peer_name(expeditor),
227         gras_socket_peer_port(expeditor));
228   fdata = gras_userdata_get();
229   fdata->done = 1;
230   return 0;
231 }
232
233 static int forwarder_cb_forward_ex(gras_msg_cb_ctx_t ctx, void *payload_data)
234 {
235   forward_data_t fdata = gras_userdata_get();
236
237   INFO0("Forward a request");
238   gras_msg_rpccall(fdata->server, 60, "raise exception", NULL, NULL);
239   return 0;
240 }
241
242 int forwarder(int argc, char *argv[])
243 {
244   gras_socket_t mysock;
245   int port;
246   forward_data_t fdata;
247
248   gras_init(&argc, argv);
249
250   xbt_assert(argc == 4);
251
252   fdata = gras_userdata_new(s_forward_data_t);
253   fdata->done = 0;
254   port = atoi(argv[1]);
255
256   INFO1("Launch forwarder (port=%d)", port);
257   mysock = gras_socket_server(port);
258
259   gras_os_sleep(1);             /* wait for the server to be ready */
260   fdata->server = gras_socket_client(argv[2], atoi(argv[3]));
261
262   register_messages();
263   gras_cb_register("forward exception", &forwarder_cb_forward_ex);
264   gras_cb_register("kill", &forwarder_cb_kill);
265
266   while (!fdata->done) {
267     gras_msg_handle(600.0);
268   }
269
270   gras_socket_close(mysock);
271   gras_socket_close(fdata->server);
272   free(fdata);
273   INFO0("Done.");
274   gras_exit();
275   return 0;
276 }
277
278 /* **********************************************************************
279  * Server code
280  * **********************************************************************/
281 typedef struct {
282   gras_socket_t server;
283   int done;
284 } s_server_data_t, *server_data_t;
285
286 static int server_cb_kill(gras_msg_cb_ctx_t ctx, void *payload_data)
287 {
288   gras_socket_t expeditor = gras_msg_cb_ctx_from(ctx);
289   server_data_t sdata;
290
291   INFO2("Asked to die by %s:%d", gras_socket_peer_name(expeditor),
292         gras_socket_peer_port(expeditor));
293
294   sdata = gras_userdata_get();
295   sdata->done = 1;
296   return 0;
297 }
298
299 static int server_cb_raise_ex(gras_msg_cb_ctx_t ctx, void *payload_data)
300 {
301   exception_raising();
302   return 0;
303 }
304
305 static int server_cb_ping(gras_msg_cb_ctx_t ctx, void *payload_data)
306 {
307
308   /* 1. Get the payload into the msg variable, and retrieve who called us */
309   int msg = *(int *) payload_data;
310   gras_socket_t expeditor = gras_msg_cb_ctx_from(ctx);
311
312   /* 2. Log which client connected */
313   INFO3("Got message PING(%d) from %s:%d",
314         msg,
315         gras_socket_peer_name(expeditor), gras_socket_peer_port(expeditor));
316
317   /* 4. Change the value of the msg variable */
318   msg = 4321;
319
320   /* 5. Return as result */
321   gras_msg_rpcreturn(6000, ctx, &msg);
322   INFO0("Answered with PONG(4321)");
323
324   /* 6. Cleanups, if any */
325
326   /* 7. Tell GRAS that we consummed this message */
327   return 0;
328 }                               /* end_of_server_cb_ping */
329
330
331 int server(int argc, char *argv[])
332 {
333   gras_socket_t mysock;
334   server_data_t sdata;
335
336   int port = 4000;
337
338   /* 1. Init the GRAS infrastructure */
339   gras_init(&argc, argv);
340
341   /* 2. Get the port I should listen on from the command line, if specified */
342   if (argc == 2)
343     port = atoi(argv[1]);
344
345   sdata = gras_userdata_new(s_server_data_t);
346   sdata->done = 0;
347
348   INFO1("Launch server (port=%d)", port);
349
350   /* 3. Create my master socket */
351   mysock = gras_socket_server(port);
352
353   /* 4. Register the known messages and register my callback */
354   register_messages();
355   gras_cb_register("plain ping", &server_cb_ping);
356   gras_cb_register("raise exception", &server_cb_raise_ex);
357   gras_cb_register("kill", &server_cb_kill);
358
359   INFO1("Listening on port %d", gras_socket_my_port(mysock));
360
361   /* 5. Wait for the ping incomming messages */
362
363   /** \bug if the server is gone before the forwarder tries to connect,
364      it dies awfully with the following message. The problem stands somewhere
365      at the interface between the gras_socket_t and the msg mess. There is thus
366      no way for me to dive into this before this interface is rewritten
367 ==15875== Invalid read of size 4
368 ==15875==    at 0x408B805: find_port (transport_plugin_sg.c:68)
369 ==15875==    by 0x408BD64: gras_trp_sg_socket_client (transport_plugin_sg.c:115)
370 ==15875==    by 0x404A38B: gras_socket_client_ext (transport.c:255)
371 ==15875==    by 0x404A605: gras_socket_client (transport.c:288)
372 ==15875==    by 0x804B49D: forwarder (rpc.c:245)
373 ==15875==    by 0x80491FB: launch_forwarder (_rpc_simulator.c:52)
374 ==15875==    by 0x406780B: __context_wrapper (context.c:164)
375 ==15875==    by 0x41A6CB3: pthread_start_thread (manager.c:310)
376 ==15875==    by 0x42AA549: clone (clone.S:119)
377 ==15875==  Address 0x433B49C is 44 bytes inside a block of size 48 free'd
378 ==15875==    at 0x401CF46: free (vg_replace_malloc.c:235)
379 ==15875==    by 0x408F1FA: gras_process_exit (sg_process.c:117)
380 ==15875==    by 0x4049386: gras_exit (gras.c:64)
381 ==15875==    by 0x804B936: server (rpc.c:345)
382 ==15875==    by 0x80492B1: launch_server (_rpc_simulator.c:69)
383 ==15875==    by 0x406780B: __context_wrapper (context.c:164)
384 ==15875==    by 0x41A6CB3: pthread_start_thread (manager.c:310)
385 ==15875==    by 0x42AA549: clone (clone.S:119)
386   */
387   while (!sdata->done) {
388     gras_msg_handle(600.0);
389     exception_catching();
390   }
391
392   /* 8. Free the allocated resources, and shut GRAS down */
393   free(sdata);
394   gras_socket_close(mysock);
395   INFO0("Done.");
396   gras_exit();
397
398   return 0;
399 }                               /* end_of_server */