Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Move datadesc and TCP sockets from GRAS to XBT.
[simgrid.git] / examples / gras / rpc / rpc.c
1 /* rpc - demo of the RPC features in GRAS                                   */
2
3 /* Copyright (c) 2006, 2007, 2009, 2010. The SimGrid Team.
4  * All rights reserved.                                                     */
5
6 /* This program is free software; you can redistribute it and/or modify it
7  * under the terms of the license (GNU LGPL) which comes with this package. */
8
9 #include "gras.h"
10
11 XBT_LOG_NEW_DEFAULT_CATEGORY(Rpc, "Messages specific to this example");
12
13 /* register messages which may be sent (common to client and server) */
14 static void register_messages(void)
15 {
16   gras_msgtype_declare_rpc("plain ping",
17                            xbt_datadesc_by_name("int"),
18                            xbt_datadesc_by_name("int"));
19
20   gras_msgtype_declare_rpc("raise exception", NULL, NULL);
21   gras_msgtype_declare_rpc("forward exception", NULL, NULL);
22   gras_msgtype_declare("kill", NULL);
23 }
24
25 /* Function prototypes */
26 int server(int argc, char *argv[]);
27 int forwarder(int argc, char *argv[]);
28 int client(int argc, char *argv[]);
29
30 #define exception_msg       "Error for the client"
31 #define exception_raising() THROWF(unknown_error,42,exception_msg)
32
33 static void exception_catching(void)
34 {
35   int gotit = 0, i;
36   xbt_ex_t e;
37
38   for (i = 0; i < 5; i++) {
39     gotit = 0;
40     TRY {
41       exception_raising();
42     }
43     CATCH(e) {
44       gotit = 1;
45       xbt_assert(e.category == unknown_error,
46                  "Got wrong category: %d (instead of %d)", e.category,
47                  unknown_error);
48       xbt_assert(e.value == 42, "Got wrong value: %d (!=42)", e.value);
49       xbt_assert(!strncmp(e.msg, exception_msg, strlen(exception_msg)),
50                  "Got wrong message: %s", e.msg);
51       xbt_ex_free(e);
52     }
53     if (!gotit) {
54       THROWF(unknown_error, 0, "Didn't got the remote exception!");
55     }
56   }
57 }
58
59 /* **********************************************************************
60  * Client code
61  * **********************************************************************/
62
63 static void client_create_sockets(xbt_socket_t *toserver,
64                                   xbt_socket_t *toforwarder,
65                                   const char *srv_host, int srv_port,
66                                   const char *fwd_host, int fwd_port)
67 {
68   TRY {
69     exception_catching();
70     *toserver = gras_socket_client(srv_host, srv_port);
71     *toforwarder = gras_socket_client(fwd_host, fwd_port);
72   }
73   CATCH_ANONYMOUS {
74     RETHROWF("Unable to connect to the server: %s");
75   }
76 }
77
78 int client(int argc, char *argv[])
79 {
80   xbt_ex_t e;
81   xbt_socket_t toserver = NULL;        /* peer */
82   xbt_socket_t toforwarder = NULL;     /* peer */
83
84   int ping, pong, i;
85   volatile int gotit = 0;
86
87
88   const char *host = "127.0.0.1";
89   int port = 4001;
90
91   memset(&e, 0, sizeof(xbt_ex_t));
92
93   /* 1. Init the GRAS's infrastructure */
94   gras_init(&argc, argv);
95
96   /* 2. Get the server's address. The command line override defaults when specified */
97   if (argc == 5) {
98     host = argv[1];
99     port = atoi(argv[2]);
100   }
101   XBT_INFO("Launch client (server on %s:%d)", host, port);
102
103   exception_catching();
104
105   /* 3. Wait for the server & forwarder startup */
106   gras_os_sleep(2);
107
108   /* 4. Create a socket to speak to the server */
109   client_create_sockets(&toserver, &toforwarder,
110                         host, port, argv[3], atoi(argv[4]));
111   XBT_INFO("Connected to %s:%d.", host, port);
112
113
114   /* 5. Register the messages.
115      See, it doesn't have to be done completely at the beginning,
116      but only before use */
117   exception_catching();
118   register_messages();
119
120   /* 6. Keep the user informed of what's going on */
121   XBT_INFO("Connected to server which is on %s:%d",
122         xbt_socket_peer_name(toserver), xbt_socket_peer_port(toserver));
123
124   /* 7. Prepare and send the ping message to the server */
125   ping = 1234;
126   TRY {
127     exception_catching();
128     gras_msg_rpccall(toserver, 6000.0, "plain ping", &ping, &pong);
129   }
130   CATCH_ANONYMOUS {
131     gras_socket_close(toserver);
132     RETHROWF("Failed to execute a PING rpc on the server: %s");
133   }
134   exception_catching();
135
136   /* 8. Keep the user informed of what's going on, again */
137   XBT_INFO("The answer to PING(%d) on %s:%d is PONG(%d)",
138         ping,
139         xbt_socket_peer_name(toserver), xbt_socket_peer_port(toserver),
140         pong);
141
142   /* 9. Call a RPC which raises an exception (to test exception propagation) */
143   XBT_INFO("Call the exception raising RPC");
144   TRY {
145     gras_msg_rpccall(toserver, 6000.0, "raise exception", NULL, NULL);
146   }
147   CATCH(e) {
148     gotit = 1;
149     xbt_assert(e.category == unknown_error,
150                 "Got wrong category: %d (instead of %d)",
151                 e.category, unknown_error);
152     xbt_assert(e.value == 42, "Got wrong value: %d (!=42)", e.value);
153     xbt_assert(!strncmp(e.msg, exception_msg, strlen(exception_msg)),
154                 "Got wrong message: %s", e.msg);
155     XBT_INFO
156         ("Got the expected exception when calling the exception raising RPC");
157     xbt_ex_free(e);
158   }
159
160   if (!gotit)
161     THROWF(unknown_error, 0, "Didn't got the remote exception!");
162
163   XBT_INFO("Called the exception raising RPC");
164   exception_catching();
165
166   /* doxygen_ignore */
167   for (i = 0; i < 5; i++) {
168
169     XBT_INFO("Call the exception raising RPC (i=%d)", i);
170     TRY {
171       gras_msg_rpccall(toserver, 6000.0, "raise exception", NULL, NULL);
172     }
173     CATCH(e) {
174       gotit = 1;
175       xbt_ex_free(e);
176     }
177     if (!gotit) {
178       THROWF(unknown_error, 0, "Didn't got the remote exception!");
179     }
180   }
181   /* doxygen_resume */
182
183   /* 9. Call a RPC which raises an exception (to test that exception propagation works) */
184   for (i = 0; i < 5; i++) {
185     XBT_INFO("Call the exception raising RPC on the forwarder (i=%d)", i);
186     TRY {
187       gras_msg_rpccall(toforwarder, 6000.0, "forward exception", NULL,
188                        NULL);
189     }
190     CATCH(e) {
191       gotit = 1;
192       xbt_assert(e.value == 42, "Got wrong value: %d (!=42)", e.value);
193       xbt_assert(!strncmp(e.msg, exception_msg, strlen(exception_msg)),
194                  "Got wrong message: %s", e.msg);
195       xbt_assert(e.category == unknown_error,
196                  "Got wrong category: %d (instead of %d)",
197                  e.category, unknown_error);
198       XBT_INFO
199         ("Got the expected exception when calling the exception raising RPC");
200       xbt_ex_free(e);
201     }
202     if (!gotit) {
203       THROWF(unknown_error, 0, "Didn't got the remote exception!");
204     }
205     exception_catching();
206   }
207
208   XBT_INFO("Ask %s:%d to die", xbt_socket_peer_name(toforwarder),
209         xbt_socket_peer_port(toforwarder));
210   gras_msg_send(toforwarder, "kill", NULL);
211   XBT_INFO("Ask %s:%d to die", xbt_socket_peer_name(toserver),
212         xbt_socket_peer_port(toserver));
213   gras_msg_send(toserver, "kill", NULL);
214
215   /* 11. Cleanup the place before leaving */
216   gras_socket_close(toserver);
217   gras_socket_close(toforwarder);
218   XBT_INFO("Done.");
219   gras_exit();
220   return 0;
221 }                               /* end_of_client */
222
223
224 /* **********************************************************************
225  * Forwarder code
226  * **********************************************************************/
227 typedef struct {
228   xbt_socket_t server;
229   int done;
230 } s_forward_data_t, *forward_data_t;
231
232 static int forwarder_cb_kill(gras_msg_cb_ctx_t ctx, void *payload_data)
233 {
234   forward_data_t fdata;
235   xbt_socket_t expeditor = gras_msg_cb_ctx_from(ctx);
236   XBT_INFO("Asked to die by %s:%d", xbt_socket_peer_name(expeditor),
237         xbt_socket_peer_port(expeditor));
238   fdata = gras_userdata_get();
239   fdata->done = 1;
240   return 0;
241 }
242
243 static int forwarder_cb_forward_ex(gras_msg_cb_ctx_t ctx,
244                                    void *payload_data)
245 {
246   forward_data_t fdata = gras_userdata_get();
247
248   XBT_INFO("Forward a request");
249   gras_msg_rpccall(fdata->server, 60, "raise exception", NULL, NULL);
250   return 0;
251 }
252
253 int forwarder(int argc, char *argv[])
254 {
255   xbt_socket_t mysock;
256   int port;
257   forward_data_t fdata;
258
259   gras_init(&argc, argv);
260
261   xbt_assert(argc == 4);
262
263   fdata = gras_userdata_new(s_forward_data_t);
264   fdata->done = 0;
265   port = atoi(argv[1]);
266
267   XBT_INFO("Launch forwarder (port=%d)", port);
268   mysock = gras_socket_server(port);
269
270   gras_os_sleep(1);             /* wait for the server to be ready */
271   fdata->server = gras_socket_client(argv[2], atoi(argv[3]));
272
273   register_messages();
274   gras_cb_register("forward exception", &forwarder_cb_forward_ex);
275   gras_cb_register("kill", &forwarder_cb_kill);
276
277   while (!fdata->done) {
278     gras_msg_handle(600.0);
279   }
280
281   gras_socket_close(mysock);
282   gras_socket_close(fdata->server);
283   free(fdata);
284   XBT_INFO("Done.");
285   gras_exit();
286   return 0;
287 }
288
289 /* **********************************************************************
290  * Server code
291  * **********************************************************************/
292 typedef struct {
293   xbt_socket_t server;
294   int done;
295 } s_server_data_t, *server_data_t;
296
297 static int server_cb_kill(gras_msg_cb_ctx_t ctx, void *payload_data)
298 {
299   xbt_socket_t expeditor = gras_msg_cb_ctx_from(ctx);
300   server_data_t sdata;
301
302   XBT_INFO("Asked to die by %s:%d", xbt_socket_peer_name(expeditor),
303         xbt_socket_peer_port(expeditor));
304
305   sdata = gras_userdata_get();
306   sdata->done = 1;
307   return 0;
308 }
309
310 static int server_cb_raise_ex(gras_msg_cb_ctx_t ctx, void *payload_data)
311 {
312   exception_raising();
313   return 0;
314 }
315
316 static int server_cb_ping(gras_msg_cb_ctx_t ctx, void *payload_data)
317 {
318
319   /* 1. Get the payload into the msg variable, and retrieve who called us */
320   int msg = *(int *) payload_data;
321   xbt_socket_t expeditor = gras_msg_cb_ctx_from(ctx);
322
323   /* 2. Log which client connected */
324   XBT_INFO("Got message PING(%d) from %s:%d",
325         msg,
326         xbt_socket_peer_name(expeditor),
327         xbt_socket_peer_port(expeditor));
328
329   /* 4. Change the value of the msg variable */
330   msg = 4321;
331
332   /* 5. Return as result */
333   gras_msg_rpcreturn(6000, ctx, &msg);
334   XBT_INFO("Answered with PONG(4321)");
335
336   /* 6. Cleanups, if any */
337
338   /* 7. Tell GRAS that we consummed this message */
339   return 0;
340 }                               /* end_of_server_cb_ping */
341
342
343 int server(int argc, char *argv[])
344 {
345   xbt_socket_t mysock;
346   server_data_t sdata;
347
348   int port = 4001;
349
350   /* 1. Init the GRAS infrastructure */
351   gras_init(&argc, argv);
352
353   /* 2. Get the port I should listen on from the command line, if specified */
354   if (argc == 2)
355     port = atoi(argv[1]);
356
357   sdata = gras_userdata_new(s_server_data_t);
358   sdata->done = 0;
359
360   XBT_INFO("Launch server (port=%d)", port);
361
362   /* 3. Create my master socket */
363   mysock = gras_socket_server(port);
364
365   /* 4. Register the known messages and register my callback */
366   register_messages();
367   gras_cb_register("plain ping", &server_cb_ping);
368   gras_cb_register("raise exception", &server_cb_raise_ex);
369   gras_cb_register("kill", &server_cb_kill);
370
371   XBT_INFO("Listening on port %d", xbt_socket_my_port(mysock));
372
373   /* 5. Wait for the ping incomming messages */
374
375   /** \bug if the server is gone before the forwarder tries to connect,
376      it dies awfully with the following message. The problem stands somewhere
377      at the interface between the xbt_socket_t and the msg mess. There is thus
378      no way for me to dive into this before this interface is rewritten
379 ==15875== Invalid read of size 4
380 ==15875==    at 0x408B805: find_port (transport_plugin_sg.c:68)
381 ==15875==    by 0x408BD64: gras_trp_sg_socket_client (transport_plugin_sg.c:115)
382 ==15875==    by 0x404A38B: gras_socket_client_ext (transport.c:255)
383 ==15875==    by 0x404A605: gras_socket_client (transport.c:288)
384 ==15875==    by 0x804B49D: forwarder (rpc.c:245)
385 ==15875==    by 0x80491FB: launch_forwarder (_rpc_simulator.c:52)
386 ==15875==    by 0x406780B: __context_wrapper (context.c:164)
387 ==15875==    by 0x41A6CB3: pthread_start_thread (manager.c:310)
388 ==15875==    by 0x42AA549: clone (clone.S:119)
389 ==15875==  Address 0x433B49C is 44 bytes inside a block of size 48 free'd
390 ==15875==    at 0x401CF46: free (vg_replace_malloc.c:235)
391 ==15875==    by 0x408F1FA: gras_process_exit (sg_process.c:117)
392 ==15875==    by 0x4049386: gras_exit (gras.c:64)
393 ==15875==    by 0x804B936: server (rpc.c:345)
394 ==15875==    by 0x80492B1: launch_server (_rpc_simulator.c:69)
395 ==15875==    by 0x406780B: __context_wrapper (context.c:164)
396 ==15875==    by 0x41A6CB3: pthread_start_thread (manager.c:310)
397 ==15875==    by 0x42AA549: clone (clone.S:119)
398   */
399   while (!sdata->done) {
400     gras_msg_handle(600.0);
401     exception_catching();
402   }
403
404   /* 8. Free the allocated resources, and shut GRAS down */
405   free(sdata);
406   gras_socket_close(mysock);
407   XBT_INFO("Done.");
408   gras_exit();
409
410   return 0;
411 }                               /* end_of_server */