Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Major refactoring to make the test stricter and more robust
[simgrid.git] / examples / gras / rpc / rpc.c
1 /* $Id$ */
2
3 /* rpc - demo of the RPC features in GRAS                                   */
4
5 /* Copyright (c) 2006 Martin Quinson. All rights reserved.                  */
6
7 /* This program is free software; you can redistribute it and/or modify it
8  * under the terms of the license (GNU LGPL) which comes with this package. */
9
10 #include "gras.h"
11
12 XBT_LOG_NEW_DEFAULT_CATEGORY(Rpc,"Messages specific to this example");
13
14 int err=0; /* to make the message of the raised exception more informative and
15               even be able to follow their propagation from server to client*/
16
17 /* register messages which may be sent (common to client and server) */
18 static void register_messages(void) {
19   gras_msgtype_declare_rpc("plain ping",
20                            gras_datadesc_by_name("int"),
21                            gras_datadesc_by_name("int"));
22
23   gras_msgtype_declare_rpc("raise exception", NULL, NULL);
24   gras_msgtype_declare_rpc("forward exception", NULL, NULL);
25   gras_msgtype_declare("kill",NULL);
26 }
27
28 /* Function prototypes */
29 int server (int argc,char *argv[]);
30 int forwarder (int argc,char *argv[]);
31 int client (int argc,char *argv[]);
32
33 static void exception_raising(void) {
34   THROW1(unknown_error,42,"Some error we will catch on client side %d",err++);
35 }
36 static void exception_catching(void) {
37   int gotit = 0,i;
38   xbt_ex_t e;
39
40   for (i=0; i<5; i++) {
41     gotit=0;
42     TRY {
43       exception_raising();
44     } CATCH(e) {
45       gotit = 1;
46     }
47     if (!gotit) {
48       THROW0(unknown_error,0,"Didn't got the remote exception!");
49     }
50     xbt_assert2(e.category == unknown_error, "Got wrong category: %d (instead of %d)", 
51                 e.category,unknown_error);
52     xbt_assert1(e.value == 42, "Got wrong value: %d (!=42)", e.value);
53     xbt_assert1(!strncmp(e.msg,"Some error we will catch on client side",
54                          strlen("Some error we will catch on client side")), 
55                 "Got wrong message: %s", e.msg);
56     xbt_ex_free(&e);
57   }
58 }
59
60 /* **********************************************************************
61  * Client code
62  * **********************************************************************/
63
64
65 int client(int argc,char *argv[]) {
66   xbt_ex_t e; 
67   gras_socket_t toserver=NULL; /* peer */
68   gras_socket_t toforwarder=NULL; /* peer */
69
70   
71
72   int ping, pong, i;
73   volatile int gotit=0;
74
75   const char *host = "127.0.0.1";
76         int   port = 4000;
77
78   /* 1. Init the GRAS's infrastructure */
79   gras_init(&argc, argv);
80    
81   /* 2. Get the server's address. The command line override defaults when specified */
82   if (argc == 5) {
83     host=argv[1];
84     port=atoi(argv[2]);
85   } 
86   INFO2("Launch client (server on %s:%d)",host,port);
87
88   exception_catching();
89    
90   /* 3. Wait for the server & forwarder startup */
91   gras_os_sleep(2);
92    
93   /* 4. Create a socket to speak to the server */
94   TRY {
95     exception_catching();
96     toserver=gras_socket_client(host,port);
97     toforwarder=gras_socket_client(argv[3],atoi(argv[4]));
98   } CATCH(e) {
99     RETHROW0("Unable to connect to the server: %s");
100   }
101   INFO2("Connected to %s:%d.",host,port);    
102
103
104   /* 5. Register the messages. 
105         See, it doesn't have to be done completely at the beginning,
106         but only before use */
107   exception_catching();
108   register_messages();
109
110   /* 6. Keep the user informed of what's going on */
111   INFO2("Connected to server which is on %s:%d ", 
112         gras_socket_peer_name(toserver),gras_socket_peer_port(toserver));
113
114   /* 7. Prepare and send the ping message to the server */
115   ping = 1234;
116   TRY {
117     exception_catching();
118     gras_msg_rpccall(toserver, 6000.0,
119                      gras_msgtype_by_name("plain ping"), &ping, &pong);
120   } CATCH(e) {
121     gras_socket_close(toserver);
122     RETHROW0("Failed to execute a PING rpc on the server: %s");
123   }
124   exception_catching();
125
126   /* 8. Keep the user informed of what's going on, again */
127   INFO4("The answer to PING(%d) on %s:%d is PONG(%d)  ", 
128         ping, 
129         gras_socket_peer_name(toserver),gras_socket_peer_port(toserver),
130         pong);
131
132   /* 9. Call a RPC which raises an exception (to test exception propagation) */
133   INFO0("Call the exception raising RPC");
134   TRY {
135     gras_msg_rpccall(toserver, 6000.0,
136                      gras_msgtype_by_name("raise exception"), NULL, NULL);
137   } CATCH(e) {
138     gotit = 1; 
139     xbt_assert2(e.category == unknown_error, 
140                 "Got wrong category: %d (instead of %d)", 
141               e.category,unknown_error);
142     xbt_assert1(e.value == 42, "Got wrong value: %d (!=42)", e.value);
143     xbt_assert1(!strncmp(e.msg,"Some error we will catch on client side",
144                          strlen("Some error we will catch on client side")), 
145                 "Got wrong message: %s", e.msg);
146     INFO0("Got the expected exception when calling the exception raising RPC");
147     xbt_ex_free(&e);
148   }
149
150   if (!gotit)
151     THROW0(unknown_error,0,"Didn't got the remote exception!");
152
153   INFO0("Called the exception raising RPC");
154   exception_catching();
155
156   /* doxygen_ignore */
157   for (i=0; i<5; i++) {
158         
159      INFO1("Call the exception raising RPC (i=%d)",i);
160      TRY {
161         gras_msg_rpccall(toserver, 6000.0,
162                          gras_msgtype_by_name("raise exception"), NULL, NULL);
163      } CATCH(e) {
164         gotit = 1;
165         xbt_ex_free(&e);
166      }
167      if (!gotit) {
168         THROW0(unknown_error,0,"Didn't got the remote exception!");
169      }
170   }
171   /* doxygen_resume */
172   
173   /* 9. Call a RPC which raises an exception (to test that exception propagation works) */
174   for (i=0;i<5;i++) {
175     INFO1("Call the exception raising RPC on the forwarder (i=%d)",i);
176     TRY {
177       gras_msg_rpccall(toforwarder, 6000.0,
178                        gras_msgtype_by_name("forward exception"), NULL, NULL);
179     } CATCH(e) {
180       gotit = 1;
181     }
182     if (!gotit) {
183       THROW0(unknown_error,0,"Didn't got the remote exception!");
184     } 
185     xbt_assert1(e.value == 42, "Got wrong value: %d (!=42)", e.value);
186     xbt_assert1(!strncmp(e.msg,"Some error we will catch on client side",
187                          strlen("Some error we will catch on client side")), 
188                 "Got wrong message: %s", e.msg);
189     xbt_assert2(e.category == unknown_error, 
190                 "Got wrong category: %d (instead of %d)", 
191                 e.category,unknown_error);
192     INFO0("Got the expected exception when calling the exception raising RPC");
193     xbt_ex_free(&e);
194     exception_catching();
195   }
196   
197   gras_msg_send(toserver,gras_msgtype_by_name("kill"),NULL);
198   gras_msg_send(toforwarder,gras_msgtype_by_name("kill"),NULL);
199
200   /* 11. Cleanup the place before leaving */
201   gras_socket_close(toserver);
202   gras_socket_close(toforwarder);
203   gras_exit();
204   INFO0("Done.");
205   return 0;
206 } /* end_of_client */
207
208
209 /* **********************************************************************
210  * Forwarder code
211  * **********************************************************************/
212 typedef struct {
213   gras_socket_t server;
214   int done;
215 } s_forward_data_t, *forward_data_t;
216
217 static int forwarder_cb_kill(gras_msg_cb_ctx_t ctx,
218                              void             *payload_data) {
219   forward_data_t fdata=gras_userdata_get();
220   fdata->done = 1;
221   return 1;
222 }
223
224 static int forwarder_cb_forward_ex(gras_msg_cb_ctx_t ctx,
225                                    void             *payload_data) {
226   forward_data_t fdata=gras_userdata_get();
227
228   gras_msg_rpccall(fdata->server, 60,
229                    gras_msgtype_by_name("raise exception"),NULL,NULL);
230   return 1;
231 }
232
233 int forwarder (int argc,char *argv[]) {
234   gras_socket_t mysock;
235   int port;
236   forward_data_t fdata;
237
238   gras_init(&argc,argv);
239
240   xbt_assert(argc == 4);
241
242   fdata=gras_userdata_new(s_forward_data_t);
243   fdata->done = 0;
244   port=atoi(argv[1]);
245
246   INFO1("Launch forwarder (port=%d)", port);
247   mysock = gras_socket_server(port);
248
249   gras_os_sleep(1); /* wait for the server to be ready */
250   fdata->server=gras_socket_client(argv[2],atoi(argv[3]));
251
252   register_messages();
253   gras_cb_register(gras_msgtype_by_name("forward exception"),
254                    &forwarder_cb_forward_ex);
255   gras_cb_register(gras_msgtype_by_name("kill"),
256                    &forwarder_cb_kill);
257
258   while (!fdata->done) {
259     gras_msg_handle(600.0);
260   }
261
262   gras_socket_close(mysock);
263   gras_socket_close(fdata->server);
264   free(fdata);
265   gras_exit();
266   INFO0("Done.");
267   return 0;
268 }
269
270 /* **********************************************************************
271  * Server code
272  * **********************************************************************/
273 typedef struct {
274   gras_socket_t server;
275   int done;
276 } s_server_data_t, *server_data_t;
277
278 static int server_cb_kill(gras_msg_cb_ctx_t ctx,
279                           void             *payload_data) {
280   server_data_t sdata=gras_userdata_get();
281   sdata->done = 1;
282   return 1;
283 }
284
285 static int server_cb_raise_ex(gras_msg_cb_ctx_t ctx,
286                               void             *payload_data) {
287   exception_raising();
288   return 1;
289 }
290
291 static int server_cb_ping(gras_msg_cb_ctx_t ctx,
292                           void             *payload_data) {
293                              
294   /* 1. Get the payload into the msg variable, and retrieve who called us */
295   int msg=*(int*)payload_data;
296   gras_socket_t expeditor = gras_msg_cb_ctx_from(ctx);
297    
298   /* 2. Log which client connected */
299   INFO3("Got message PING(%d) from %s:%d ", 
300         msg, 
301         gras_socket_peer_name(expeditor), gras_socket_peer_port(expeditor));
302   
303   /* 4. Change the value of the msg variable */
304   msg = 4321;
305
306   /* 5. Return as result */
307   gras_msg_rpcreturn(6000,ctx,&msg);
308   INFO0("Answered with PONG(4321)");
309      
310   /* 6. Cleanups, if any */
311
312   /* 7. Tell GRAS that we consummed this message */
313   return 1;
314 } /* end_of_server_cb_ping */
315
316
317 int server (int argc,char *argv[]) {
318   gras_socket_t mysock;
319   server_data_t sdata;
320
321   int port = 4000;
322   
323   /* 1. Init the GRAS infrastructure */
324   gras_init(&argc,argv);
325
326   /* 2. Get the port I should listen on from the command line, if specified */
327   if (argc == 2) 
328     port=atoi(argv[1]);
329
330   sdata=gras_userdata_new(s_server_data_t);
331   sdata->done = 0;
332
333   INFO1("Launch server (port=%d)", port);
334
335   /* 3. Create my master socket */
336   mysock = gras_socket_server(port);
337
338   /* 4. Register the known messages and register my callback */
339   register_messages();
340   gras_cb_register(gras_msgtype_by_name("plain ping"),&server_cb_ping);
341   gras_cb_register(gras_msgtype_by_name("raise exception"),&server_cb_raise_ex);
342   gras_cb_register(gras_msgtype_by_name("kill"),&server_cb_kill);
343
344   INFO1("Listening on port %d ", gras_socket_my_port(mysock));
345
346   /* 5. Wait for the ping incomming messages */
347   
348   /* BUG: if the server is gone before the forwarder tries to connect, 
349      it dies awfully with :
350 ==15875== Invalid read of size 4
351 ==15875==    at 0x408B805: find_port (transport_plugin_sg.c:68)
352 ==15875==    by 0x408BD64: gras_trp_sg_socket_client (transport_plugin_sg.c:115)
353 ==15875==    by 0x404A38B: gras_socket_client_ext (transport.c:255)
354 ==15875==    by 0x404A605: gras_socket_client (transport.c:288)
355 ==15875==    by 0x804B49D: forwarder (rpc.c:245)
356 ==15875==    by 0x80491FB: launch_forwarder (_rpc_simulator.c:52)
357 ==15875==    by 0x406780B: __context_wrapper (context.c:164)
358 ==15875==    by 0x41A6CB3: pthread_start_thread (manager.c:310)
359 ==15875==    by 0x42AA549: clone (clone.S:119)
360 ==15875==  Address 0x433B49C is 44 bytes inside a block of size 48 free'd
361 ==15875==    at 0x401CF46: free (vg_replace_malloc.c:235)
362 ==15875==    by 0x408F1FA: gras_process_exit (sg_process.c:117)
363 ==15875==    by 0x4049386: gras_exit (gras.c:64)
364 ==15875==    by 0x804B936: server (rpc.c:345)
365 ==15875==    by 0x80492B1: launch_server (_rpc_simulator.c:69)
366 ==15875==    by 0x406780B: __context_wrapper (context.c:164)
367 ==15875==    by 0x41A6CB3: pthread_start_thread (manager.c:310)
368 ==15875==    by 0x42AA549: clone (clone.S:119)
369   */
370   while (!sdata->done) {
371     gras_msg_handle(600.0); 
372     exception_catching();
373   }
374    
375   /* 8. Free the allocated resources, and shut GRAS down */
376   free(sdata);
377   gras_socket_close(mysock);
378   gras_exit();
379    
380   INFO0("Done.");
381   return 0;
382 } /* end_of_server */
383