Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Disable broken test (yeah, that's bad)
[simgrid.git] / examples / gras / rpc / rpc.c
1 /* $Id$ */
2
3 /* rpc - demo of the RPC features in GRAS                                   */
4
5 /* Copyright (c) 2006 Martin Quinson. All rights reserved.                  */
6
7 /* This program is free software; you can redistribute it and/or modify it
8  * under the terms of the license (GNU LGPL) which comes with this package. */
9
10 #include "gras.h"
11
12 XBT_LOG_NEW_DEFAULT_CATEGORY(Rpc,"Messages specific to this example");
13
14 /* register messages which may be sent (common to client and server) */
15 static void register_messages(void) {
16   gras_msgtype_declare_rpc("plain ping",
17                            gras_datadesc_by_name("int"),
18                            gras_datadesc_by_name("int"));
19
20   gras_msgtype_declare_rpc("raise exception", NULL, NULL);
21   gras_msgtype_declare_rpc("forward exception", NULL, NULL);
22   gras_msgtype_declare("kill",NULL);
23 }
24
25 /* Function prototypes */
26 int server (int argc,char *argv[]);
27 int forwarder (int argc,char *argv[]);
28 int client (int argc,char *argv[]);
29
30 static void exception_raising(void) {
31   THROW0(unknown_error,42,"Some error we will catch on client side");
32 }
33 static void exception_catching(void) {
34   int gotit = 0,i;
35   xbt_ex_t e;
36
37   for (i=0; i<5; i++) {
38     gotit=0;
39     TRY {
40       exception_raising();
41     } CATCH(e) {
42       gotit = 1;
43     }
44     if (!gotit) {
45       THROW0(unknown_error,0,"Didn't got the remote exception!");
46     }
47     xbt_assert2(e.category == unknown_error, "Got wrong category: %d (instead of %d)", 
48                 e.category,unknown_error);
49     xbt_assert1(e.value == 42, "Got wrong value: %d (!=42)", e.value);
50     xbt_assert1(!strncmp(e.msg,"Some error we will catch on client side",
51                          strlen("Some error we will catch on client side")), 
52                 "Got wrong message: %s", e.msg);
53     xbt_ex_free(e);
54   }
55 }
56
57 /* **********************************************************************
58  * Client code
59  * **********************************************************************/
60
61
62 int client(int argc,char *argv[]) {
63   xbt_ex_t e;
64   gras_socket_t toserver=NULL; /* peer */
65   gras_socket_t toforwarder=NULL; /* peer */
66
67   int ping, pong, i;
68   volatile int gotit=0;
69
70
71   const char *host = "127.0.0.1";
72                 int   port = 4000;
73
74   memset(&e,0,sizeof(xbt_ex_t));
75
76   /* 1. Init the GRAS's infrastructure */
77   gras_init(&argc, argv);
78    
79   /* 2. Get the server's address. The command line override defaults when specified */
80   if (argc == 5) {
81     host=argv[1];
82     port=atoi(argv[2]);
83   } 
84   INFO2("Launch client (server on %s:%d)",host,port);
85
86   exception_catching();
87    
88   /* 3. Wait for the server & forwarder startup */
89   gras_os_sleep(2);
90    
91   /* 4. Create a socket to speak to the server */
92   TRY {
93     exception_catching();
94     toserver=gras_socket_client(host,port);
95     toforwarder=gras_socket_client(argv[3],atoi(argv[4]));
96   } CATCH(e) {
97     RETHROW0("Unable to connect to the server: %s");
98   }
99   INFO2("Connected to %s:%d.",host,port);    
100
101
102   /* 5. Register the messages. 
103         See, it doesn't have to be done completely at the beginning,
104         but only before use */
105   exception_catching();
106   register_messages();
107
108   /* 6. Keep the user informed of what's going on */
109   INFO2("Connected to server which is on %s:%d",
110         gras_socket_peer_name(toserver),gras_socket_peer_port(toserver));
111
112   /* 7. Prepare and send the ping message to the server */
113   ping = 1234;
114   TRY {
115     exception_catching();
116     gras_msg_rpccall(toserver, 6000.0, "plain ping", &ping, &pong);
117   } CATCH(e) {
118     gras_socket_close(toserver);
119     RETHROW0("Failed to execute a PING rpc on the server: %s");
120   }
121   exception_catching();
122
123   /* 8. Keep the user informed of what's going on, again */
124   INFO4("The answer to PING(%d) on %s:%d is PONG(%d)",
125         ping, 
126         gras_socket_peer_name(toserver),gras_socket_peer_port(toserver),
127         pong);
128
129   /* 9. Call a RPC which raises an exception (to test exception propagation) */
130   INFO0("Call the exception raising RPC");
131   TRY {
132     gras_msg_rpccall(toserver, 6000.0, "raise exception", NULL, NULL);
133   } CATCH(e) {
134     gotit = 1; 
135     xbt_assert2(e.category == unknown_error, 
136                 "Got wrong category: %d (instead of %d)", 
137               e.category,unknown_error);
138     xbt_assert1(e.value == 42, "Got wrong value: %d (!=42)", e.value);
139     xbt_assert1(!strncmp(e.msg,"Some error we will catch on client side",
140                          strlen("Some error we will catch on client side")), 
141                 "Got wrong message: %s", e.msg);
142     INFO0("Got the expected exception when calling the exception raising RPC");
143     xbt_ex_free(e);
144   }
145
146   if (!gotit)
147     THROW0(unknown_error,0,"Didn't got the remote exception!");
148
149   INFO0("Called the exception raising RPC");
150   exception_catching();
151
152   /* doxygen_ignore */
153   for (i=0; i<5; i++) {
154         
155      INFO1("Call the exception raising RPC (i=%d)",i);
156      TRY {
157         gras_msg_rpccall(toserver, 6000.0, "raise exception", NULL, NULL);
158      } CATCH(e) {
159         gotit = 1;
160         xbt_ex_free(e);
161      }
162      if (!gotit) {
163         THROW0(unknown_error,0,"Didn't got the remote exception!");
164      }
165   }
166   /* doxygen_resume */
167   
168   /* 9. Call a RPC which raises an exception (to test that exception propagation works) */
169   for (i=0;i<5;i++) {
170     INFO1("Call the exception raising RPC on the forwarder (i=%d)",i);
171     TRY {
172       gras_msg_rpccall(toforwarder, 6000.0, "forward exception", NULL, NULL);
173     } CATCH(e) {
174       gotit = 1;
175     }
176     if (!gotit) {
177       THROW0(unknown_error,0,"Didn't got the remote exception!");
178     } 
179     xbt_assert1(e.value == 42, "Got wrong value: %d (!=42)", e.value);
180     xbt_assert1(!strncmp(e.msg,"Some error we will catch on client side",
181                          strlen("Some error we will catch on client side")), 
182                 "Got wrong message: %s", e.msg);
183     xbt_assert2(e.category == unknown_error, 
184                 "Got wrong category: %d (instead of %d)", 
185                 e.category,unknown_error);
186     INFO0("Got the expected exception when calling the exception raising RPC");
187     xbt_ex_free(e);
188     exception_catching();
189   }
190
191   INFO2("Ask %s:%d to die",gras_socket_peer_name(toforwarder),gras_socket_peer_port(toforwarder));
192   gras_msg_send(toforwarder,"kill",NULL);
193   INFO2("Ask %s:%d to die",gras_socket_peer_name(toserver),gras_socket_peer_port(toserver));
194   gras_msg_send(toserver,"kill",NULL);
195
196   /* 11. Cleanup the place before leaving */
197   gras_socket_close(toserver);
198   gras_socket_close(toforwarder);
199   INFO0("Done.");
200   gras_exit();
201   return 0;
202 } /* end_of_client */
203
204
205 /* **********************************************************************
206  * Forwarder code
207  * **********************************************************************/
208 typedef struct {
209   gras_socket_t server;
210   int done;
211 } s_forward_data_t, *forward_data_t;
212
213 static int forwarder_cb_kill(gras_msg_cb_ctx_t ctx,
214                                  void             *payload_data) {
215   forward_data_t fdata;
216   gras_socket_t expeditor = gras_msg_cb_ctx_from(ctx);
217   INFO2("Asked to die by %s:%d",gras_socket_peer_name(expeditor),gras_socket_peer_port(expeditor));
218   fdata=gras_userdata_get();
219   fdata->done = 1;
220   return 0;
221 }
222
223 static int forwarder_cb_forward_ex(gras_msg_cb_ctx_t ctx,
224                                    void             *payload_data) {
225   forward_data_t fdata=gras_userdata_get();
226
227   INFO0("Forward a request");
228   gras_msg_rpccall(fdata->server, 60, "raise exception",NULL,NULL);
229   return 0;
230 }
231
232 int forwarder (int argc,char *argv[]) {
233   gras_socket_t mysock;
234   int port;
235   forward_data_t fdata;
236
237   gras_init(&argc,argv);
238
239   xbt_assert(argc == 4);
240
241   fdata=gras_userdata_new(s_forward_data_t);
242   fdata->done = 0;
243   port=atoi(argv[1]);
244
245   INFO1("Launch forwarder (port=%d)", port);
246   mysock = gras_socket_server(port);
247
248   gras_os_sleep(1); /* wait for the server to be ready */
249   fdata->server=gras_socket_client(argv[2],atoi(argv[3]));
250
251   register_messages();
252   gras_cb_register("forward exception", &forwarder_cb_forward_ex);
253   gras_cb_register("kill",              &forwarder_cb_kill);
254
255   while (!fdata->done) {
256     gras_msg_handle(600.0);
257   }
258
259   gras_socket_close(mysock);
260   gras_socket_close(fdata->server);
261   free(fdata);
262   INFO0("Done.");
263   gras_exit();
264   return 0;
265 }
266
267 /* **********************************************************************
268  * Server code
269  * **********************************************************************/
270 typedef struct {
271   gras_socket_t server;
272   int done;
273 } s_server_data_t, *server_data_t;
274
275 static int server_cb_kill(gras_msg_cb_ctx_t ctx,
276                           void             *payload_data) {
277   gras_socket_t expeditor = gras_msg_cb_ctx_from(ctx);
278   server_data_t sdata;
279
280   INFO2("Asked to die by %s:%d",gras_socket_peer_name(expeditor),gras_socket_peer_port(expeditor));
281
282   sdata=gras_userdata_get();
283   sdata->done = 1;
284   return 0;
285 }
286
287 static int server_cb_raise_ex(gras_msg_cb_ctx_t ctx,
288                               void             *payload_data) {
289   exception_raising();
290   return 0;
291 }
292
293 static int server_cb_ping(gras_msg_cb_ctx_t ctx,
294                           void             *payload_data) {
295                              
296   /* 1. Get the payload into the msg variable, and retrieve who called us */
297   int msg=*(int*)payload_data;
298   gras_socket_t expeditor = gras_msg_cb_ctx_from(ctx);
299    
300   /* 2. Log which client connected */
301   INFO3("Got message PING(%d) from %s:%d",
302         msg, 
303         gras_socket_peer_name(expeditor), gras_socket_peer_port(expeditor));
304   
305   /* 4. Change the value of the msg variable */
306   msg = 4321;
307
308   /* 5. Return as result */
309   gras_msg_rpcreturn(6000,ctx,&msg);
310   INFO0("Answered with PONG(4321)");
311      
312   /* 6. Cleanups, if any */
313
314   /* 7. Tell GRAS that we consummed this message */
315   return 0;
316 } /* end_of_server_cb_ping */
317
318
319 int server (int argc,char *argv[]) {
320   gras_socket_t mysock;
321   server_data_t sdata;
322
323   int port = 4000;
324   
325   /* 1. Init the GRAS infrastructure */
326   gras_init(&argc,argv);
327
328   /* 2. Get the port I should listen on from the command line, if specified */
329   if (argc == 2) 
330     port=atoi(argv[1]);
331
332   sdata=gras_userdata_new(s_server_data_t);
333   sdata->done = 0;
334
335   INFO1("Launch server (port=%d)", port);
336
337   /* 3. Create my master socket */
338   mysock = gras_socket_server(port);
339
340   /* 4. Register the known messages and register my callback */
341   register_messages();
342   gras_cb_register("plain ping",&server_cb_ping);
343   gras_cb_register("raise exception",&server_cb_raise_ex);
344   gras_cb_register("kill",&server_cb_kill);
345
346   INFO1("Listening on port %d", gras_socket_my_port(mysock));
347
348   /* 5. Wait for the ping incomming messages */
349   
350   /** \bug if the server is gone before the forwarder tries to connect, 
351      it dies awfully with the following message. The problem stands somewhere
352      at the interface between the gras_socket_t and the msg mess. There is thus
353      no way for me to dive into this before this interface is rewritten 
354 ==15875== Invalid read of size 4
355 ==15875==    at 0x408B805: find_port (transport_plugin_sg.c:68)
356 ==15875==    by 0x408BD64: gras_trp_sg_socket_client (transport_plugin_sg.c:115)
357 ==15875==    by 0x404A38B: gras_socket_client_ext (transport.c:255)
358 ==15875==    by 0x404A605: gras_socket_client (transport.c:288)
359 ==15875==    by 0x804B49D: forwarder (rpc.c:245)
360 ==15875==    by 0x80491FB: launch_forwarder (_rpc_simulator.c:52)
361 ==15875==    by 0x406780B: __context_wrapper (context.c:164)
362 ==15875==    by 0x41A6CB3: pthread_start_thread (manager.c:310)
363 ==15875==    by 0x42AA549: clone (clone.S:119)
364 ==15875==  Address 0x433B49C is 44 bytes inside a block of size 48 free'd
365 ==15875==    at 0x401CF46: free (vg_replace_malloc.c:235)
366 ==15875==    by 0x408F1FA: gras_process_exit (sg_process.c:117)
367 ==15875==    by 0x4049386: gras_exit (gras.c:64)
368 ==15875==    by 0x804B936: server (rpc.c:345)
369 ==15875==    by 0x80492B1: launch_server (_rpc_simulator.c:69)
370 ==15875==    by 0x406780B: __context_wrapper (context.c:164)
371 ==15875==    by 0x41A6CB3: pthread_start_thread (manager.c:310)
372 ==15875==    by 0x42AA549: clone (clone.S:119)
373   */
374   while (!sdata->done) {
375     gras_msg_handle(600.0); 
376     exception_catching();
377   }
378    
379   /* 8. Free the allocated resources, and shut GRAS down */
380   free(sdata);
381   gras_socket_close(mysock);
382   INFO0("Done.");
383   gras_exit();
384    
385   return 0;
386 } /* end_of_server */
387