Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Add some missing copyright headers
[simgrid.git] / examples / gras / rpc / rpc.c
1 /* $Id$ */
2
3 /* rpc - demo of the RPC features in GRAS                                   */
4
5 /* Copyright (c) 2006 Martin Quinson. All rights reserved.                  */
6
7 /* This program is free software; you can redistribute it and/or modify it
8  * under the terms of the license (GNU LGPL) which comes with this package. */
9
10 #include "gras.h"
11
12 XBT_LOG_NEW_DEFAULT_CATEGORY(Rpc, "Messages specific to this example");
13
14 /* register messages which may be sent (common to client and server) */
15 static void register_messages(void)
16 {
17   gras_msgtype_declare_rpc("plain ping",
18                            gras_datadesc_by_name("int"),
19                            gras_datadesc_by_name("int"));
20
21   gras_msgtype_declare_rpc("raise exception", NULL, NULL);
22   gras_msgtype_declare_rpc("forward exception", NULL, NULL);
23   gras_msgtype_declare("kill", NULL);
24 }
25
26 /* Function prototypes */
27 int server(int argc, char *argv[]);
28 int forwarder(int argc, char *argv[]);
29 int client(int argc, char *argv[]);
30
31 #define exception_msg       "Error for the client"
32 #define exception_raising() THROW0(unknown_error,42,exception_msg)
33
34 static void exception_catching(void)
35 {
36   int gotit = 0, i;
37   xbt_ex_t e;
38
39   for (i = 0; i < 5; i++) {
40     gotit = 0;
41     TRY {
42       exception_raising();
43     }
44     CATCH(e) {
45       gotit = 1;
46     }
47     if (!gotit) {
48       THROW0(unknown_error, 0, "Didn't got the remote exception!");
49     }
50     xbt_assert2(e.category == unknown_error,
51                 "Got wrong category: %d (instead of %d)", e.category,
52                 unknown_error);
53     xbt_assert1(e.value == 42, "Got wrong value: %d (!=42)", e.value);
54     xbt_assert1(!strncmp(e.msg, exception_msg, strlen(exception_msg)),
55                 "Got wrong message: %s", e.msg);
56     xbt_ex_free(e);
57   }
58 }
59
60 /* **********************************************************************
61  * Client code
62  * **********************************************************************/
63
64
65 int client(int argc, char *argv[])
66 {
67   xbt_ex_t e;
68   gras_socket_t toserver = NULL;        /* peer */
69   gras_socket_t toforwarder = NULL;     /* peer */
70
71   int ping, pong, i;
72   volatile int gotit = 0;
73
74
75   const char *host = "127.0.0.1";
76   int port = 4000;
77
78   memset(&e, 0, sizeof(xbt_ex_t));
79
80   /* 1. Init the GRAS's infrastructure */
81   gras_init(&argc, argv);
82
83   /* 2. Get the server's address. The command line override defaults when specified */
84   if (argc == 5) {
85     host = argv[1];
86     port = atoi(argv[2]);
87   }
88   INFO2("Launch client (server on %s:%d)", host, port);
89
90   exception_catching();
91
92   /* 3. Wait for the server & forwarder startup */
93   gras_os_sleep(2);
94
95   /* 4. Create a socket to speak to the server */
96   TRY {
97     exception_catching();
98     toserver = gras_socket_client(host, port);
99     toforwarder = gras_socket_client(argv[3], atoi(argv[4]));
100   }
101   CATCH(e) {
102     RETHROW0("Unable to connect to the server: %s");
103   }
104   INFO2("Connected to %s:%d.", host, port);
105
106
107   /* 5. Register the messages.
108      See, it doesn't have to be done completely at the beginning,
109      but only before use */
110   exception_catching();
111   register_messages();
112
113   /* 6. Keep the user informed of what's going on */
114   INFO2("Connected to server which is on %s:%d",
115         gras_socket_peer_name(toserver), gras_socket_peer_port(toserver));
116
117   /* 7. Prepare and send the ping message to the server */
118   ping = 1234;
119   TRY {
120     exception_catching();
121     gras_msg_rpccall(toserver, 6000.0, "plain ping", &ping, &pong);
122   }
123   CATCH(e) {
124     gras_socket_close(toserver);
125     RETHROW0("Failed to execute a PING rpc on the server: %s");
126   }
127   exception_catching();
128
129   /* 8. Keep the user informed of what's going on, again */
130   INFO4("The answer to PING(%d) on %s:%d is PONG(%d)",
131         ping,
132         gras_socket_peer_name(toserver), gras_socket_peer_port(toserver),
133         pong);
134
135   /* 9. Call a RPC which raises an exception (to test exception propagation) */
136   INFO0("Call the exception raising RPC");
137   TRY {
138     gras_msg_rpccall(toserver, 6000.0, "raise exception", NULL, NULL);
139   }
140   CATCH(e) {
141     gotit = 1;
142     xbt_assert2(e.category == unknown_error,
143                 "Got wrong category: %d (instead of %d)",
144                 e.category, unknown_error);
145     xbt_assert1(e.value == 42, "Got wrong value: %d (!=42)", e.value);
146     xbt_assert1(!strncmp(e.msg, exception_msg, strlen(exception_msg)),
147                 "Got wrong message: %s", e.msg);
148     INFO0
149       ("Got the expected exception when calling the exception raising RPC");
150     xbt_ex_free(e);
151   }
152
153   if (!gotit)
154     THROW0(unknown_error, 0, "Didn't got the remote exception!");
155
156   INFO0("Called the exception raising RPC");
157   exception_catching();
158
159   /* doxygen_ignore */
160   for (i = 0; i < 5; i++) {
161
162     INFO1("Call the exception raising RPC (i=%d)", i);
163     TRY {
164       gras_msg_rpccall(toserver, 6000.0, "raise exception", NULL, NULL);
165     }
166     CATCH(e) {
167       gotit = 1;
168       xbt_ex_free(e);
169     }
170     if (!gotit) {
171       THROW0(unknown_error, 0, "Didn't got the remote exception!");
172     }
173   }
174   /* doxygen_resume */
175
176   /* 9. Call a RPC which raises an exception (to test that exception propagation works) */
177   for (i = 0; i < 5; i++) {
178     INFO1("Call the exception raising RPC on the forwarder (i=%d)", i);
179     TRY {
180       gras_msg_rpccall(toforwarder, 6000.0, "forward exception", NULL, NULL);
181     }
182     CATCH(e) {
183       gotit = 1;
184     }
185     if (!gotit) {
186       THROW0(unknown_error, 0, "Didn't got the remote exception!");
187     }
188     xbt_assert1(e.value == 42, "Got wrong value: %d (!=42)", e.value);
189     xbt_assert1(!strncmp(e.msg, exception_msg, strlen(exception_msg)),
190                 "Got wrong message: %s", e.msg);
191     xbt_assert2(e.category == unknown_error,
192                 "Got wrong category: %d (instead of %d)",
193                 e.category, unknown_error);
194     INFO0
195       ("Got the expected exception when calling the exception raising RPC");
196     xbt_ex_free(e);
197     exception_catching();
198   }
199
200   INFO2("Ask %s:%d to die", gras_socket_peer_name(toforwarder),
201         gras_socket_peer_port(toforwarder));
202   gras_msg_send(toforwarder, "kill", NULL);
203   INFO2("Ask %s:%d to die", gras_socket_peer_name(toserver),
204         gras_socket_peer_port(toserver));
205   gras_msg_send(toserver, "kill", NULL);
206
207   /* 11. Cleanup the place before leaving */
208   gras_socket_close(toserver);
209   gras_socket_close(toforwarder);
210   INFO0("Done.");
211   gras_exit();
212   return 0;
213 }                               /* end_of_client */
214
215
216 /* **********************************************************************
217  * Forwarder code
218  * **********************************************************************/
219 typedef struct {
220   gras_socket_t server;
221   int done;
222 } s_forward_data_t, *forward_data_t;
223
224 static int forwarder_cb_kill(gras_msg_cb_ctx_t ctx, void *payload_data)
225 {
226   forward_data_t fdata;
227   gras_socket_t expeditor = gras_msg_cb_ctx_from(ctx);
228   INFO2("Asked to die by %s:%d", gras_socket_peer_name(expeditor),
229         gras_socket_peer_port(expeditor));
230   fdata = gras_userdata_get();
231   fdata->done = 1;
232   return 0;
233 }
234
235 static int forwarder_cb_forward_ex(gras_msg_cb_ctx_t ctx, void *payload_data)
236 {
237   forward_data_t fdata = gras_userdata_get();
238
239   INFO0("Forward a request");
240   gras_msg_rpccall(fdata->server, 60, "raise exception", NULL, NULL);
241   return 0;
242 }
243
244 int forwarder(int argc, char *argv[])
245 {
246   gras_socket_t mysock;
247   int port;
248   forward_data_t fdata;
249
250   gras_init(&argc, argv);
251
252   xbt_assert(argc == 4);
253
254   fdata = gras_userdata_new(s_forward_data_t);
255   fdata->done = 0;
256   port = atoi(argv[1]);
257
258   INFO1("Launch forwarder (port=%d)", port);
259   mysock = gras_socket_server(port);
260
261   gras_os_sleep(1);             /* wait for the server to be ready */
262   fdata->server = gras_socket_client(argv[2], atoi(argv[3]));
263
264   register_messages();
265   gras_cb_register("forward exception", &forwarder_cb_forward_ex);
266   gras_cb_register("kill", &forwarder_cb_kill);
267
268   while (!fdata->done) {
269     gras_msg_handle(600.0);
270   }
271
272   gras_socket_close(mysock);
273   gras_socket_close(fdata->server);
274   free(fdata);
275   INFO0("Done.");
276   gras_exit();
277   return 0;
278 }
279
280 /* **********************************************************************
281  * Server code
282  * **********************************************************************/
283 typedef struct {
284   gras_socket_t server;
285   int done;
286 } s_server_data_t, *server_data_t;
287
288 static int server_cb_kill(gras_msg_cb_ctx_t ctx, void *payload_data)
289 {
290   gras_socket_t expeditor = gras_msg_cb_ctx_from(ctx);
291   server_data_t sdata;
292
293   INFO2("Asked to die by %s:%d", gras_socket_peer_name(expeditor),
294         gras_socket_peer_port(expeditor));
295
296   sdata = gras_userdata_get();
297   sdata->done = 1;
298   return 0;
299 }
300
301 static int server_cb_raise_ex(gras_msg_cb_ctx_t ctx, void *payload_data)
302 {
303   exception_raising();
304   return 0;
305 }
306
307 static int server_cb_ping(gras_msg_cb_ctx_t ctx, void *payload_data)
308 {
309
310   /* 1. Get the payload into the msg variable, and retrieve who called us */
311   int msg = *(int *) payload_data;
312   gras_socket_t expeditor = gras_msg_cb_ctx_from(ctx);
313
314   /* 2. Log which client connected */
315   INFO3("Got message PING(%d) from %s:%d",
316         msg,
317         gras_socket_peer_name(expeditor), gras_socket_peer_port(expeditor));
318
319   /* 4. Change the value of the msg variable */
320   msg = 4321;
321
322   /* 5. Return as result */
323   gras_msg_rpcreturn(6000, ctx, &msg);
324   INFO0("Answered with PONG(4321)");
325
326   /* 6. Cleanups, if any */
327
328   /* 7. Tell GRAS that we consummed this message */
329   return 0;
330 }                               /* end_of_server_cb_ping */
331
332
333 int server(int argc, char *argv[])
334 {
335   gras_socket_t mysock;
336   server_data_t sdata;
337
338   int port = 4000;
339
340   /* 1. Init the GRAS infrastructure */
341   gras_init(&argc, argv);
342
343   /* 2. Get the port I should listen on from the command line, if specified */
344   if (argc == 2)
345     port = atoi(argv[1]);
346
347   sdata = gras_userdata_new(s_server_data_t);
348   sdata->done = 0;
349
350   INFO1("Launch server (port=%d)", port);
351
352   /* 3. Create my master socket */
353   mysock = gras_socket_server(port);
354
355   /* 4. Register the known messages and register my callback */
356   register_messages();
357   gras_cb_register("plain ping", &server_cb_ping);
358   gras_cb_register("raise exception", &server_cb_raise_ex);
359   gras_cb_register("kill", &server_cb_kill);
360
361   INFO1("Listening on port %d", gras_socket_my_port(mysock));
362
363   /* 5. Wait for the ping incomming messages */
364
365   /** \bug if the server is gone before the forwarder tries to connect,
366      it dies awfully with the following message. The problem stands somewhere
367      at the interface between the gras_socket_t and the msg mess. There is thus
368      no way for me to dive into this before this interface is rewritten
369 ==15875== Invalid read of size 4
370 ==15875==    at 0x408B805: find_port (transport_plugin_sg.c:68)
371 ==15875==    by 0x408BD64: gras_trp_sg_socket_client (transport_plugin_sg.c:115)
372 ==15875==    by 0x404A38B: gras_socket_client_ext (transport.c:255)
373 ==15875==    by 0x404A605: gras_socket_client (transport.c:288)
374 ==15875==    by 0x804B49D: forwarder (rpc.c:245)
375 ==15875==    by 0x80491FB: launch_forwarder (_rpc_simulator.c:52)
376 ==15875==    by 0x406780B: __context_wrapper (context.c:164)
377 ==15875==    by 0x41A6CB3: pthread_start_thread (manager.c:310)
378 ==15875==    by 0x42AA549: clone (clone.S:119)
379 ==15875==  Address 0x433B49C is 44 bytes inside a block of size 48 free'd
380 ==15875==    at 0x401CF46: free (vg_replace_malloc.c:235)
381 ==15875==    by 0x408F1FA: gras_process_exit (sg_process.c:117)
382 ==15875==    by 0x4049386: gras_exit (gras.c:64)
383 ==15875==    by 0x804B936: server (rpc.c:345)
384 ==15875==    by 0x80492B1: launch_server (_rpc_simulator.c:69)
385 ==15875==    by 0x406780B: __context_wrapper (context.c:164)
386 ==15875==    by 0x41A6CB3: pthread_start_thread (manager.c:310)
387 ==15875==    by 0x42AA549: clone (clone.S:119)
388   */
389   while (!sdata->done) {
390     gras_msg_handle(600.0);
391     exception_catching();
392   }
393
394   /* 8. Free the allocated resources, and shut GRAS down */
395   free(sdata);
396   gras_socket_close(mysock);
397   INFO0("Done.");
398   gras_exit();
399
400   return 0;
401 }                               /* end_of_server */