Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Fix copyright headers
[simgrid.git] / examples / gras / rpc / rpc.c
1 /* rpc - demo of the RPC features in GRAS                                   */
2
3 /* Copyright (c) 2006, 2007, 2009, 2010. The SimGrid Team.
4  * All rights reserved.                                                     */
5
6 /* This program is free software; you can redistribute it and/or modify it
7  * under the terms of the license (GNU LGPL) which comes with this package. */
8
9 #include "gras.h"
10
11 XBT_LOG_NEW_DEFAULT_CATEGORY(Rpc, "Messages specific to this example");
12
13 /* register messages which may be sent (common to client and server) */
14 static void register_messages(void)
15 {
16   gras_msgtype_declare_rpc("plain ping",
17                            gras_datadesc_by_name("int"),
18                            gras_datadesc_by_name("int"));
19
20   gras_msgtype_declare_rpc("raise exception", NULL, NULL);
21   gras_msgtype_declare_rpc("forward exception", NULL, NULL);
22   gras_msgtype_declare("kill", NULL);
23 }
24
25 /* Function prototypes */
26 int server(int argc, char *argv[]);
27 int forwarder(int argc, char *argv[]);
28 int client(int argc, char *argv[]);
29
30 #define exception_msg       "Error for the client"
31 #define exception_raising() THROW0(unknown_error,42,exception_msg)
32
33 static void exception_catching(void)
34 {
35   int gotit = 0, i;
36   xbt_ex_t e;
37
38   for (i = 0; i < 5; i++) {
39     gotit = 0;
40     TRY {
41       exception_raising();
42     }
43     CATCH(e) {
44       gotit = 1;
45     }
46     if (!gotit) {
47       THROW0(unknown_error, 0, "Didn't got the remote exception!");
48     }
49     xbt_assert2(e.category == unknown_error,
50                 "Got wrong category: %d (instead of %d)", e.category,
51                 unknown_error);
52     xbt_assert1(e.value == 42, "Got wrong value: %d (!=42)", e.value);
53     xbt_assert1(!strncmp(e.msg, exception_msg, strlen(exception_msg)),
54                 "Got wrong message: %s", e.msg);
55     xbt_ex_free(e);
56   }
57 }
58
59 /* **********************************************************************
60  * Client code
61  * **********************************************************************/
62
63
64 int client(int argc, char *argv[])
65 {
66   xbt_ex_t e;
67   gras_socket_t toserver = NULL;        /* peer */
68   gras_socket_t toforwarder = NULL;     /* peer */
69
70   int ping, pong, i;
71   volatile int gotit = 0;
72
73
74   const char *host = "127.0.0.1";
75   int port = 4000;
76
77   memset(&e, 0, sizeof(xbt_ex_t));
78
79   /* 1. Init the GRAS's infrastructure */
80   gras_init(&argc, argv);
81
82   /* 2. Get the server's address. The command line override defaults when specified */
83   if (argc == 5) {
84     host = argv[1];
85     port = atoi(argv[2]);
86   }
87   INFO2("Launch client (server on %s:%d)", host, port);
88
89   exception_catching();
90
91   /* 3. Wait for the server & forwarder startup */
92   gras_os_sleep(2);
93
94   /* 4. Create a socket to speak to the server */
95   TRY {
96     exception_catching();
97     toserver = gras_socket_client(host, port);
98     toforwarder = gras_socket_client(argv[3], atoi(argv[4]));
99   }
100   CATCH(e) {
101     RETHROW0("Unable to connect to the server: %s");
102   }
103   INFO2("Connected to %s:%d.", host, port);
104
105
106   /* 5. Register the messages.
107      See, it doesn't have to be done completely at the beginning,
108      but only before use */
109   exception_catching();
110   register_messages();
111
112   /* 6. Keep the user informed of what's going on */
113   INFO2("Connected to server which is on %s:%d",
114         gras_socket_peer_name(toserver), gras_socket_peer_port(toserver));
115
116   /* 7. Prepare and send the ping message to the server */
117   ping = 1234;
118   TRY {
119     exception_catching();
120     gras_msg_rpccall(toserver, 6000.0, "plain ping", &ping, &pong);
121   }
122   CATCH(e) {
123     gras_socket_close(toserver);
124     RETHROW0("Failed to execute a PING rpc on the server: %s");
125   }
126   exception_catching();
127
128   /* 8. Keep the user informed of what's going on, again */
129   INFO4("The answer to PING(%d) on %s:%d is PONG(%d)",
130         ping,
131         gras_socket_peer_name(toserver), gras_socket_peer_port(toserver),
132         pong);
133
134   /* 9. Call a RPC which raises an exception (to test exception propagation) */
135   INFO0("Call the exception raising RPC");
136   TRY {
137     gras_msg_rpccall(toserver, 6000.0, "raise exception", NULL, NULL);
138   }
139   CATCH(e) {
140     gotit = 1;
141     xbt_assert2(e.category == unknown_error,
142                 "Got wrong category: %d (instead of %d)",
143                 e.category, unknown_error);
144     xbt_assert1(e.value == 42, "Got wrong value: %d (!=42)", e.value);
145     xbt_assert1(!strncmp(e.msg, exception_msg, strlen(exception_msg)),
146                 "Got wrong message: %s", e.msg);
147     INFO0
148       ("Got the expected exception when calling the exception raising RPC");
149     xbt_ex_free(e);
150   }
151
152   if (!gotit)
153     THROW0(unknown_error, 0, "Didn't got the remote exception!");
154
155   INFO0("Called the exception raising RPC");
156   exception_catching();
157
158   /* doxygen_ignore */
159   for (i = 0; i < 5; i++) {
160
161     INFO1("Call the exception raising RPC (i=%d)", i);
162     TRY {
163       gras_msg_rpccall(toserver, 6000.0, "raise exception", NULL, NULL);
164     }
165     CATCH(e) {
166       gotit = 1;
167       xbt_ex_free(e);
168     }
169     if (!gotit) {
170       THROW0(unknown_error, 0, "Didn't got the remote exception!");
171     }
172   }
173   /* doxygen_resume */
174
175   /* 9. Call a RPC which raises an exception (to test that exception propagation works) */
176   for (i = 0; i < 5; i++) {
177     INFO1("Call the exception raising RPC on the forwarder (i=%d)", i);
178     TRY {
179       gras_msg_rpccall(toforwarder, 6000.0, "forward exception", NULL, NULL);
180     }
181     CATCH(e) {
182       gotit = 1;
183     }
184     if (!gotit) {
185       THROW0(unknown_error, 0, "Didn't got the remote exception!");
186     }
187     xbt_assert1(e.value == 42, "Got wrong value: %d (!=42)", e.value);
188     xbt_assert1(!strncmp(e.msg, exception_msg, strlen(exception_msg)),
189                 "Got wrong message: %s", e.msg);
190     xbt_assert2(e.category == unknown_error,
191                 "Got wrong category: %d (instead of %d)",
192                 e.category, unknown_error);
193     INFO0
194       ("Got the expected exception when calling the exception raising RPC");
195     xbt_ex_free(e);
196     exception_catching();
197   }
198
199   INFO2("Ask %s:%d to die", gras_socket_peer_name(toforwarder),
200         gras_socket_peer_port(toforwarder));
201   gras_msg_send(toforwarder, "kill", NULL);
202   INFO2("Ask %s:%d to die", gras_socket_peer_name(toserver),
203         gras_socket_peer_port(toserver));
204   gras_msg_send(toserver, "kill", NULL);
205
206   /* 11. Cleanup the place before leaving */
207   gras_socket_close(toserver);
208   gras_socket_close(toforwarder);
209   INFO0("Done.");
210   gras_exit();
211   return 0;
212 }                               /* end_of_client */
213
214
215 /* **********************************************************************
216  * Forwarder code
217  * **********************************************************************/
218 typedef struct {
219   gras_socket_t server;
220   int done;
221 } s_forward_data_t, *forward_data_t;
222
223 static int forwarder_cb_kill(gras_msg_cb_ctx_t ctx, void *payload_data)
224 {
225   forward_data_t fdata;
226   gras_socket_t expeditor = gras_msg_cb_ctx_from(ctx);
227   INFO2("Asked to die by %s:%d", gras_socket_peer_name(expeditor),
228         gras_socket_peer_port(expeditor));
229   fdata = gras_userdata_get();
230   fdata->done = 1;
231   return 0;
232 }
233
234 static int forwarder_cb_forward_ex(gras_msg_cb_ctx_t ctx, void *payload_data)
235 {
236   forward_data_t fdata = gras_userdata_get();
237
238   INFO0("Forward a request");
239   gras_msg_rpccall(fdata->server, 60, "raise exception", NULL, NULL);
240   return 0;
241 }
242
243 int forwarder(int argc, char *argv[])
244 {
245   gras_socket_t mysock;
246   int port;
247   forward_data_t fdata;
248
249   gras_init(&argc, argv);
250
251   xbt_assert(argc == 4);
252
253   fdata = gras_userdata_new(s_forward_data_t);
254   fdata->done = 0;
255   port = atoi(argv[1]);
256
257   INFO1("Launch forwarder (port=%d)", port);
258   mysock = gras_socket_server(port);
259
260   gras_os_sleep(1);             /* wait for the server to be ready */
261   fdata->server = gras_socket_client(argv[2], atoi(argv[3]));
262
263   register_messages();
264   gras_cb_register("forward exception", &forwarder_cb_forward_ex);
265   gras_cb_register("kill", &forwarder_cb_kill);
266
267   while (!fdata->done) {
268     gras_msg_handle(600.0);
269   }
270
271   gras_socket_close(mysock);
272   gras_socket_close(fdata->server);
273   free(fdata);
274   INFO0("Done.");
275   gras_exit();
276   return 0;
277 }
278
279 /* **********************************************************************
280  * Server code
281  * **********************************************************************/
282 typedef struct {
283   gras_socket_t server;
284   int done;
285 } s_server_data_t, *server_data_t;
286
287 static int server_cb_kill(gras_msg_cb_ctx_t ctx, void *payload_data)
288 {
289   gras_socket_t expeditor = gras_msg_cb_ctx_from(ctx);
290   server_data_t sdata;
291
292   INFO2("Asked to die by %s:%d", gras_socket_peer_name(expeditor),
293         gras_socket_peer_port(expeditor));
294
295   sdata = gras_userdata_get();
296   sdata->done = 1;
297   return 0;
298 }
299
300 static int server_cb_raise_ex(gras_msg_cb_ctx_t ctx, void *payload_data)
301 {
302   exception_raising();
303   return 0;
304 }
305
306 static int server_cb_ping(gras_msg_cb_ctx_t ctx, void *payload_data)
307 {
308
309   /* 1. Get the payload into the msg variable, and retrieve who called us */
310   int msg = *(int *) payload_data;
311   gras_socket_t expeditor = gras_msg_cb_ctx_from(ctx);
312
313   /* 2. Log which client connected */
314   INFO3("Got message PING(%d) from %s:%d",
315         msg,
316         gras_socket_peer_name(expeditor), gras_socket_peer_port(expeditor));
317
318   /* 4. Change the value of the msg variable */
319   msg = 4321;
320
321   /* 5. Return as result */
322   gras_msg_rpcreturn(6000, ctx, &msg);
323   INFO0("Answered with PONG(4321)");
324
325   /* 6. Cleanups, if any */
326
327   /* 7. Tell GRAS that we consummed this message */
328   return 0;
329 }                               /* end_of_server_cb_ping */
330
331
332 int server(int argc, char *argv[])
333 {
334   gras_socket_t mysock;
335   server_data_t sdata;
336
337   int port = 4000;
338
339   /* 1. Init the GRAS infrastructure */
340   gras_init(&argc, argv);
341
342   /* 2. Get the port I should listen on from the command line, if specified */
343   if (argc == 2)
344     port = atoi(argv[1]);
345
346   sdata = gras_userdata_new(s_server_data_t);
347   sdata->done = 0;
348
349   INFO1("Launch server (port=%d)", port);
350
351   /* 3. Create my master socket */
352   mysock = gras_socket_server(port);
353
354   /* 4. Register the known messages and register my callback */
355   register_messages();
356   gras_cb_register("plain ping", &server_cb_ping);
357   gras_cb_register("raise exception", &server_cb_raise_ex);
358   gras_cb_register("kill", &server_cb_kill);
359
360   INFO1("Listening on port %d", gras_socket_my_port(mysock));
361
362   /* 5. Wait for the ping incomming messages */
363
364   /** \bug if the server is gone before the forwarder tries to connect,
365      it dies awfully with the following message. The problem stands somewhere
366      at the interface between the gras_socket_t and the msg mess. There is thus
367      no way for me to dive into this before this interface is rewritten
368 ==15875== Invalid read of size 4
369 ==15875==    at 0x408B805: find_port (transport_plugin_sg.c:68)
370 ==15875==    by 0x408BD64: gras_trp_sg_socket_client (transport_plugin_sg.c:115)
371 ==15875==    by 0x404A38B: gras_socket_client_ext (transport.c:255)
372 ==15875==    by 0x404A605: gras_socket_client (transport.c:288)
373 ==15875==    by 0x804B49D: forwarder (rpc.c:245)
374 ==15875==    by 0x80491FB: launch_forwarder (_rpc_simulator.c:52)
375 ==15875==    by 0x406780B: __context_wrapper (context.c:164)
376 ==15875==    by 0x41A6CB3: pthread_start_thread (manager.c:310)
377 ==15875==    by 0x42AA549: clone (clone.S:119)
378 ==15875==  Address 0x433B49C is 44 bytes inside a block of size 48 free'd
379 ==15875==    at 0x401CF46: free (vg_replace_malloc.c:235)
380 ==15875==    by 0x408F1FA: gras_process_exit (sg_process.c:117)
381 ==15875==    by 0x4049386: gras_exit (gras.c:64)
382 ==15875==    by 0x804B936: server (rpc.c:345)
383 ==15875==    by 0x80492B1: launch_server (_rpc_simulator.c:69)
384 ==15875==    by 0x406780B: __context_wrapper (context.c:164)
385 ==15875==    by 0x41A6CB3: pthread_start_thread (manager.c:310)
386 ==15875==    by 0x42AA549: clone (clone.S:119)
387   */
388   while (!sdata->done) {
389     gras_msg_handle(600.0);
390     exception_catching();
391   }
392
393   /* 8. Free the allocated resources, and shut GRAS down */
394   free(sdata);
395   gras_socket_close(mysock);
396   INFO0("Done.");
397   gras_exit();
398
399   return 0;
400 }                               /* end_of_server */