Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Merge back master branch
[simgrid.git] / examples / gras / rpc / rpc.c
1 /* rpc - demo of the RPC features in GRAS                                   */
2
3 /* Copyright (c) 2006, 2007, 2009, 2010. The SimGrid Team.
4  * All rights reserved.                                                     */
5
6 /* This program is free software; you can redistribute it and/or modify it
7  * under the terms of the license (GNU LGPL) which comes with this package. */
8
9 #include "gras.h"
10
11 XBT_LOG_NEW_DEFAULT_CATEGORY(Rpc, "Messages specific to this example");
12
13 /* register messages which may be sent (common to client and server) */
14 static void register_messages(void)
15 {
16   gras_msgtype_declare_rpc("plain ping",
17                            gras_datadesc_by_name("int"),
18                            gras_datadesc_by_name("int"));
19
20   gras_msgtype_declare_rpc("raise exception", NULL, NULL);
21   gras_msgtype_declare_rpc("forward exception", NULL, NULL);
22   gras_msgtype_declare("kill", NULL);
23 }
24
25 /* Function prototypes */
26 int server(int argc, char *argv[]);
27 int forwarder(int argc, char *argv[]);
28 int client(int argc, char *argv[]);
29
30 #define exception_msg       "Error for the client"
31 #define exception_raising() THROWF(unknown_error,42,exception_msg)
32
33 static void exception_catching(void)
34 {
35   int gotit = 0, i;
36   xbt_ex_t e;
37
38   for (i = 0; i < 5; i++) {
39     gotit = 0;
40     TRY {
41       exception_raising();
42     }
43     CATCH(e) {
44       gotit = 1;
45     }
46     if (!gotit) {
47       THROWF(unknown_error, 0, "Didn't got the remote exception!");
48     }
49     xbt_assert(e.category == unknown_error,
50                 "Got wrong category: %d (instead of %d)", e.category,
51                 unknown_error);
52     xbt_assert(e.value == 42, "Got wrong value: %d (!=42)", e.value);
53     xbt_assert(!strncmp(e.msg, exception_msg, strlen(exception_msg)),
54                 "Got wrong message: %s", e.msg);
55     xbt_ex_free(e);
56   }
57 }
58
59 /* **********************************************************************
60  * Client code
61  * **********************************************************************/
62
63
64 int client(int argc, char *argv[])
65 {
66   xbt_ex_t e;
67   gras_socket_t toserver = NULL;        /* peer */
68   gras_socket_t toforwarder = NULL;     /* peer */
69
70   int ping, pong, i;
71   volatile int gotit = 0;
72
73
74   const char *host = "127.0.0.1";
75   int port = 4001;
76
77   memset(&e, 0, sizeof(xbt_ex_t));
78
79   /* 1. Init the GRAS's infrastructure */
80   gras_init(&argc, argv);
81
82   /* 2. Get the server's address. The command line override defaults when specified */
83   if (argc == 5) {
84     host = argv[1];
85     port = atoi(argv[2]);
86   }
87   XBT_INFO("Launch client (server on %s:%d)", host, port);
88
89   exception_catching();
90
91   /* 3. Wait for the server & forwarder startup */
92   gras_os_sleep(2);
93
94   /* 4. Create a socket to speak to the server */
95   TRY {
96     exception_catching();
97     toserver = gras_socket_client(host, port);
98     toforwarder = gras_socket_client(argv[3], atoi(argv[4]));
99   }
100   CATCH(e) {
101     RETHROWF("Unable to connect to the server: %s");
102   }
103   XBT_INFO("Connected to %s:%d.", host, port);
104
105
106   /* 5. Register the messages.
107      See, it doesn't have to be done completely at the beginning,
108      but only before use */
109   exception_catching();
110   register_messages();
111
112   /* 6. Keep the user informed of what's going on */
113   XBT_INFO("Connected to server which is on %s:%d",
114         gras_socket_peer_name(toserver), gras_socket_peer_port(toserver));
115
116   /* 7. Prepare and send the ping message to the server */
117   ping = 1234;
118   TRY {
119     exception_catching();
120     gras_msg_rpccall(toserver, 6000.0, "plain ping", &ping, &pong);
121   }
122   CATCH(e) {
123     gras_socket_close(toserver);
124     RETHROWF("Failed to execute a PING rpc on the server: %s");
125   }
126   exception_catching();
127
128   /* 8. Keep the user informed of what's going on, again */
129   XBT_INFO("The answer to PING(%d) on %s:%d is PONG(%d)",
130         ping,
131         gras_socket_peer_name(toserver), gras_socket_peer_port(toserver),
132         pong);
133
134   /* 9. Call a RPC which raises an exception (to test exception propagation) */
135   XBT_INFO("Call the exception raising RPC");
136   TRY {
137     gras_msg_rpccall(toserver, 6000.0, "raise exception", NULL, NULL);
138   }
139   CATCH(e) {
140     gotit = 1;
141     xbt_assert(e.category == unknown_error,
142                 "Got wrong category: %d (instead of %d)",
143                 e.category, unknown_error);
144     xbt_assert(e.value == 42, "Got wrong value: %d (!=42)", e.value);
145     xbt_assert(!strncmp(e.msg, exception_msg, strlen(exception_msg)),
146                 "Got wrong message: %s", e.msg);
147     XBT_INFO
148         ("Got the expected exception when calling the exception raising RPC");
149     xbt_ex_free(e);
150   }
151
152   if (!gotit)
153     THROWF(unknown_error, 0, "Didn't got the remote exception!");
154
155   XBT_INFO("Called the exception raising RPC");
156   exception_catching();
157
158   /* doxygen_ignore */
159   for (i = 0; i < 5; i++) {
160
161     XBT_INFO("Call the exception raising RPC (i=%d)", i);
162     TRY {
163       gras_msg_rpccall(toserver, 6000.0, "raise exception", NULL, NULL);
164     }
165     CATCH(e) {
166       gotit = 1;
167       xbt_ex_free(e);
168     }
169     if (!gotit) {
170       THROWF(unknown_error, 0, "Didn't got the remote exception!");
171     }
172   }
173   /* doxygen_resume */
174
175   /* 9. Call a RPC which raises an exception (to test that exception propagation works) */
176   for (i = 0; i < 5; i++) {
177     XBT_INFO("Call the exception raising RPC on the forwarder (i=%d)", i);
178     TRY {
179       gras_msg_rpccall(toforwarder, 6000.0, "forward exception", NULL,
180                        NULL);
181     }
182     CATCH(e) {
183       gotit = 1;
184     }
185     if (!gotit) {
186       THROWF(unknown_error, 0, "Didn't got the remote exception!");
187     }
188     xbt_assert(e.value == 42, "Got wrong value: %d (!=42)", e.value);
189     xbt_assert(!strncmp(e.msg, exception_msg, strlen(exception_msg)),
190                 "Got wrong message: %s", e.msg);
191     xbt_assert(e.category == unknown_error,
192                 "Got wrong category: %d (instead of %d)",
193                 e.category, unknown_error);
194     XBT_INFO
195         ("Got the expected exception when calling the exception raising RPC");
196     xbt_ex_free(e);
197     exception_catching();
198   }
199
200   XBT_INFO("Ask %s:%d to die", gras_socket_peer_name(toforwarder),
201         gras_socket_peer_port(toforwarder));
202   gras_msg_send(toforwarder, "kill", NULL);
203   XBT_INFO("Ask %s:%d to die", gras_socket_peer_name(toserver),
204         gras_socket_peer_port(toserver));
205   gras_msg_send(toserver, "kill", NULL);
206
207   /* 11. Cleanup the place before leaving */
208   gras_socket_close(toserver);
209   gras_socket_close(toforwarder);
210   XBT_INFO("Done.");
211   gras_exit();
212   return 0;
213 }                               /* end_of_client */
214
215
216 /* **********************************************************************
217  * Forwarder code
218  * **********************************************************************/
219 typedef struct {
220   gras_socket_t server;
221   int done;
222 } s_forward_data_t, *forward_data_t;
223
224 static int forwarder_cb_kill(gras_msg_cb_ctx_t ctx, void *payload_data)
225 {
226   forward_data_t fdata;
227   gras_socket_t expeditor = gras_msg_cb_ctx_from(ctx);
228   XBT_INFO("Asked to die by %s:%d", gras_socket_peer_name(expeditor),
229         gras_socket_peer_port(expeditor));
230   fdata = gras_userdata_get();
231   fdata->done = 1;
232   return 0;
233 }
234
235 static int forwarder_cb_forward_ex(gras_msg_cb_ctx_t ctx,
236                                    void *payload_data)
237 {
238   forward_data_t fdata = gras_userdata_get();
239
240   XBT_INFO("Forward a request");
241   gras_msg_rpccall(fdata->server, 60, "raise exception", NULL, NULL);
242   return 0;
243 }
244
245 int forwarder(int argc, char *argv[])
246 {
247   gras_socket_t mysock;
248   int port;
249   forward_data_t fdata;
250
251   gras_init(&argc, argv);
252
253   xbt_assert(argc == 4);
254
255   fdata = gras_userdata_new(s_forward_data_t);
256   fdata->done = 0;
257   port = atoi(argv[1]);
258
259   XBT_INFO("Launch forwarder (port=%d)", port);
260   mysock = gras_socket_server(port);
261
262   gras_os_sleep(1);             /* wait for the server to be ready */
263   fdata->server = gras_socket_client(argv[2], atoi(argv[3]));
264
265   register_messages();
266   gras_cb_register("forward exception", &forwarder_cb_forward_ex);
267   gras_cb_register("kill", &forwarder_cb_kill);
268
269   while (!fdata->done) {
270     gras_msg_handle(600.0);
271   }
272
273   gras_socket_close(mysock);
274   gras_socket_close(fdata->server);
275   free(fdata);
276   XBT_INFO("Done.");
277   gras_exit();
278   return 0;
279 }
280
281 /* **********************************************************************
282  * Server code
283  * **********************************************************************/
284 typedef struct {
285   gras_socket_t server;
286   int done;
287 } s_server_data_t, *server_data_t;
288
289 static int server_cb_kill(gras_msg_cb_ctx_t ctx, void *payload_data)
290 {
291   gras_socket_t expeditor = gras_msg_cb_ctx_from(ctx);
292   server_data_t sdata;
293
294   XBT_INFO("Asked to die by %s:%d", gras_socket_peer_name(expeditor),
295         gras_socket_peer_port(expeditor));
296
297   sdata = gras_userdata_get();
298   sdata->done = 1;
299   return 0;
300 }
301
302 static int server_cb_raise_ex(gras_msg_cb_ctx_t ctx, void *payload_data)
303 {
304   exception_raising();
305   return 0;
306 }
307
308 static int server_cb_ping(gras_msg_cb_ctx_t ctx, void *payload_data)
309 {
310
311   /* 1. Get the payload into the msg variable, and retrieve who called us */
312   int msg = *(int *) payload_data;
313   gras_socket_t expeditor = gras_msg_cb_ctx_from(ctx);
314
315   /* 2. Log which client connected */
316   XBT_INFO("Got message PING(%d) from %s:%d",
317         msg,
318         gras_socket_peer_name(expeditor),
319         gras_socket_peer_port(expeditor));
320
321   /* 4. Change the value of the msg variable */
322   msg = 4321;
323
324   /* 5. Return as result */
325   gras_msg_rpcreturn(6000, ctx, &msg);
326   XBT_INFO("Answered with PONG(4321)");
327
328   /* 6. Cleanups, if any */
329
330   /* 7. Tell GRAS that we consummed this message */
331   return 0;
332 }                               /* end_of_server_cb_ping */
333
334
335 int server(int argc, char *argv[])
336 {
337   gras_socket_t mysock;
338   server_data_t sdata;
339
340   int port = 4001;
341
342   /* 1. Init the GRAS infrastructure */
343   gras_init(&argc, argv);
344
345   /* 2. Get the port I should listen on from the command line, if specified */
346   if (argc == 2)
347     port = atoi(argv[1]);
348
349   sdata = gras_userdata_new(s_server_data_t);
350   sdata->done = 0;
351
352   XBT_INFO("Launch server (port=%d)", port);
353
354   /* 3. Create my master socket */
355   mysock = gras_socket_server(port);
356
357   /* 4. Register the known messages and register my callback */
358   register_messages();
359   gras_cb_register("plain ping", &server_cb_ping);
360   gras_cb_register("raise exception", &server_cb_raise_ex);
361   gras_cb_register("kill", &server_cb_kill);
362
363   XBT_INFO("Listening on port %d", gras_socket_my_port(mysock));
364
365   /* 5. Wait for the ping incomming messages */
366
367   /** \bug if the server is gone before the forwarder tries to connect,
368      it dies awfully with the following message. The problem stands somewhere
369      at the interface between the gras_socket_t and the msg mess. There is thus
370      no way for me to dive into this before this interface is rewritten
371 ==15875== Invalid read of size 4
372 ==15875==    at 0x408B805: find_port (transport_plugin_sg.c:68)
373 ==15875==    by 0x408BD64: gras_trp_sg_socket_client (transport_plugin_sg.c:115)
374 ==15875==    by 0x404A38B: gras_socket_client_ext (transport.c:255)
375 ==15875==    by 0x404A605: gras_socket_client (transport.c:288)
376 ==15875==    by 0x804B49D: forwarder (rpc.c:245)
377 ==15875==    by 0x80491FB: launch_forwarder (_rpc_simulator.c:52)
378 ==15875==    by 0x406780B: __context_wrapper (context.c:164)
379 ==15875==    by 0x41A6CB3: pthread_start_thread (manager.c:310)
380 ==15875==    by 0x42AA549: clone (clone.S:119)
381 ==15875==  Address 0x433B49C is 44 bytes inside a block of size 48 free'd
382 ==15875==    at 0x401CF46: free (vg_replace_malloc.c:235)
383 ==15875==    by 0x408F1FA: gras_process_exit (sg_process.c:117)
384 ==15875==    by 0x4049386: gras_exit (gras.c:64)
385 ==15875==    by 0x804B936: server (rpc.c:345)
386 ==15875==    by 0x80492B1: launch_server (_rpc_simulator.c:69)
387 ==15875==    by 0x406780B: __context_wrapper (context.c:164)
388 ==15875==    by 0x41A6CB3: pthread_start_thread (manager.c:310)
389 ==15875==    by 0x42AA549: clone (clone.S:119)
390   */
391   while (!sdata->done) {
392     gras_msg_handle(600.0);
393     exception_catching();
394   }
395
396   /* 8. Free the allocated resources, and shut GRAS down */
397   free(sdata);
398   gras_socket_close(mysock);
399   XBT_INFO("Done.");
400   gras_exit();
401
402   return 0;
403 }                               /* end_of_server */