Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Added a memset in function client() to avoid and "uninitialized" warning.
[simgrid.git] / examples / gras / rpc / rpc.c
1 /* $Id$ */
2
3 /* rpc - demo of the RPC features in GRAS                                   */
4
5 /* Copyright (c) 2006 Martin Quinson. All rights reserved.                  */
6
7 /* This program is free software; you can redistribute it and/or modify it
8  * under the terms of the license (GNU LGPL) which comes with this package. */
9
10 #include "gras.h"
11
12 XBT_LOG_NEW_DEFAULT_CATEGORY(Rpc,"Messages specific to this example");
13
14 int err=0; /* to make the message of the raised exception more informative and
15               even be able to follow their propagation from server to client*/
16
17 /* register messages which may be sent (common to client and server) */
18 static void register_messages(void) {
19   gras_msgtype_declare_rpc("plain ping",
20                            gras_datadesc_by_name("int"),
21                            gras_datadesc_by_name("int"));
22
23   gras_msgtype_declare_rpc("raise exception", NULL, NULL);
24   gras_msgtype_declare_rpc("forward exception", NULL, NULL);
25   gras_msgtype_declare("kill",NULL);
26 }
27
28 /* Function prototypes */
29 int server (int argc,char *argv[]);
30 int forwarder (int argc,char *argv[]);
31 int client (int argc,char *argv[]);
32
33 static void exception_raising(void) {
34   THROW1(unknown_error,42,"Some error we will catch on client side %d",err++);
35 }
36 static void exception_catching(void) {
37   int gotit = 0,i;
38   xbt_ex_t e;
39
40   for (i=0; i<5; i++) {
41     gotit=0;
42     TRY {
43       exception_raising();
44     } CATCH(e) {
45       gotit = 1;
46     }
47     if (!gotit) {
48       THROW0(unknown_error,0,"Didn't got the remote exception!");
49     }
50     xbt_assert2(e.category == unknown_error, "Got wrong category: %d (instead of %d)", 
51                 e.category,unknown_error);
52     xbt_assert1(e.value == 42, "Got wrong value: %d (!=42)", e.value);
53     xbt_assert1(!strncmp(e.msg,"Some error we will catch on client side",
54                          strlen("Some error we will catch on client side")), 
55                 "Got wrong message: %s", e.msg);
56     xbt_ex_free(e);
57   }
58 }
59
60 /* **********************************************************************
61  * Client code
62  * **********************************************************************/
63
64
65 int client(int argc,char *argv[]) {
66   xbt_ex_t e;
67   gras_socket_t toserver=NULL; /* peer */
68   gras_socket_t toforwarder=NULL; /* peer */
69
70   memset(&e,0,sizeof(xbt_ex_t));
71
72   int ping, pong, i;
73   volatile int gotit=0;
74
75   const char *host = "127.0.0.1";
76         int   port = 4000;
77
78   /* 1. Init the GRAS's infrastructure */
79   gras_init(&argc, argv);
80    
81   /* 2. Get the server's address. The command line override defaults when specified */
82   if (argc == 5) {
83     host=argv[1];
84     port=atoi(argv[2]);
85   } 
86   INFO2("Launch client (server on %s:%d)",host,port);
87
88   exception_catching();
89    
90   /* 3. Wait for the server & forwarder startup */
91   gras_os_sleep(2);
92    
93   /* 4. Create a socket to speak to the server */
94   TRY {
95     exception_catching();
96     toserver=gras_socket_client(host,port);
97     toforwarder=gras_socket_client(argv[3],atoi(argv[4]));
98   } CATCH(e) {
99     RETHROW0("Unable to connect to the server: %s");
100   }
101   INFO2("Connected to %s:%d.",host,port);    
102
103
104   /* 5. Register the messages. 
105         See, it doesn't have to be done completely at the beginning,
106         but only before use */
107   exception_catching();
108   register_messages();
109
110   /* 6. Keep the user informed of what's going on */
111   INFO2("Connected to server which is on %s:%d ", 
112         gras_socket_peer_name(toserver),gras_socket_peer_port(toserver));
113
114   /* 7. Prepare and send the ping message to the server */
115   ping = 1234;
116   TRY {
117     exception_catching();
118     gras_msg_rpccall(toserver, 6000.0,
119                      gras_msgtype_by_name("plain ping"), &ping, &pong);
120   } CATCH(e) {
121     gras_socket_close(toserver);
122     RETHROW0("Failed to execute a PING rpc on the server: %s");
123   }
124   exception_catching();
125
126   /* 8. Keep the user informed of what's going on, again */
127   INFO4("The answer to PING(%d) on %s:%d is PONG(%d)  ", 
128         ping, 
129         gras_socket_peer_name(toserver),gras_socket_peer_port(toserver),
130         pong);
131
132   /* 9. Call a RPC which raises an exception (to test exception propagation) */
133   INFO0("Call the exception raising RPC");
134   TRY {
135     gras_msg_rpccall(toserver, 6000.0,
136                      gras_msgtype_by_name("raise exception"), NULL, NULL);
137   } CATCH(e) {
138     gotit = 1; 
139     xbt_assert2(e.category == unknown_error, 
140                 "Got wrong category: %d (instead of %d)", 
141               e.category,unknown_error);
142     xbt_assert1(e.value == 42, "Got wrong value: %d (!=42)", e.value);
143     xbt_assert1(!strncmp(e.msg,"Some error we will catch on client side",
144                          strlen("Some error we will catch on client side")), 
145                 "Got wrong message: %s", e.msg);
146     INFO0("Got the expected exception when calling the exception raising RPC");
147     xbt_ex_free(e);
148   }
149
150   if (!gotit)
151     THROW0(unknown_error,0,"Didn't got the remote exception!");
152
153   INFO0("Called the exception raising RPC");
154   exception_catching();
155
156   /* doxygen_ignore */
157   for (i=0; i<5; i++) {
158         
159      INFO1("Call the exception raising RPC (i=%d)",i);
160      TRY {
161         gras_msg_rpccall(toserver, 6000.0,
162                          gras_msgtype_by_name("raise exception"), NULL, NULL);
163      } CATCH(e) {
164         gotit = 1;
165         xbt_ex_free(e);
166      }
167      if (!gotit) {
168         THROW0(unknown_error,0,"Didn't got the remote exception!");
169      }
170   }
171   /* doxygen_resume */
172   
173   /* 9. Call a RPC which raises an exception (to test that exception propagation works) */
174   for (i=0;i<5;i++) {
175     INFO1("Call the exception raising RPC on the forwarder (i=%d)",i);
176     TRY {
177       gras_msg_rpccall(toforwarder, 6000.0,
178                        gras_msgtype_by_name("forward exception"), NULL, NULL);
179     } CATCH(e) {
180       gotit = 1;
181     }
182     if (!gotit) {
183       THROW0(unknown_error,0,"Didn't got the remote exception!");
184     } 
185     xbt_assert1(e.value == 42, "Got wrong value: %d (!=42)", e.value);
186     xbt_assert1(!strncmp(e.msg,"Some error we will catch on client side",
187                          strlen("Some error we will catch on client side")), 
188                 "Got wrong message: %s", e.msg);
189     xbt_assert2(e.category == unknown_error, 
190                 "Got wrong category: %d (instead of %d)", 
191                 e.category,unknown_error);
192     INFO0("Got the expected exception when calling the exception raising RPC");
193     xbt_ex_free(e);
194     exception_catching();
195   }
196   
197   gras_msg_send(toserver,gras_msgtype_by_name("kill"),NULL);
198   gras_msg_send(toforwarder,gras_msgtype_by_name("kill"),NULL);
199
200   /* 11. Cleanup the place before leaving */
201   gras_socket_close(toserver);
202   gras_socket_close(toforwarder);
203   gras_exit();
204   INFO0("Done.");
205   return 0;
206 } /* end_of_client */
207
208
209 /* **********************************************************************
210  * Forwarder code
211  * **********************************************************************/
212 typedef struct {
213   gras_socket_t server;
214   int done;
215 } s_forward_data_t, *forward_data_t;
216
217 static int forwarder_cb_kill(gras_msg_cb_ctx_t ctx,
218                              void             *payload_data) {
219   forward_data_t fdata=gras_userdata_get();
220   fdata->done = 1;
221   return 1;
222 }
223
224 static int forwarder_cb_forward_ex(gras_msg_cb_ctx_t ctx,
225                                    void             *payload_data) {
226   forward_data_t fdata=gras_userdata_get();
227
228   gras_msg_rpccall(fdata->server, 60,
229                    gras_msgtype_by_name("raise exception"),NULL,NULL);
230   return 1;
231 }
232
233 int forwarder (int argc,char *argv[]) {
234   gras_socket_t mysock;
235   int port;
236   forward_data_t fdata;
237
238   gras_init(&argc,argv);
239
240   xbt_assert(argc == 4);
241
242   fdata=gras_userdata_new(s_forward_data_t);
243   fdata->done = 0;
244   port=atoi(argv[1]);
245
246   INFO1("Launch forwarder (port=%d)", port);
247   mysock = gras_socket_server(port);
248
249   gras_os_sleep(1); /* wait for the server to be ready */
250   fdata->server=gras_socket_client(argv[2],atoi(argv[3]));
251
252   register_messages();
253   gras_cb_register(gras_msgtype_by_name("forward exception"),
254                    &forwarder_cb_forward_ex);
255   gras_cb_register(gras_msgtype_by_name("kill"),
256                    &forwarder_cb_kill);
257
258   while (!fdata->done) {
259     gras_msg_handle(600.0);
260   }
261
262   gras_socket_close(mysock);
263   gras_socket_close(fdata->server);
264   free(fdata);
265   gras_exit();
266   INFO0("Done.");
267   return 0;
268 }
269
270 /* **********************************************************************
271  * Server code
272  * **********************************************************************/
273 typedef struct {
274   gras_socket_t server;
275   int done;
276 } s_server_data_t, *server_data_t;
277
278 static int server_cb_kill(gras_msg_cb_ctx_t ctx,
279                           void             *payload_data) {
280   server_data_t sdata=gras_userdata_get();
281   sdata->done = 1;
282   return 1;
283 }
284
285 static int server_cb_raise_ex(gras_msg_cb_ctx_t ctx,
286                               void             *payload_data) {
287   exception_raising();
288   return 1;
289 }
290
291 static int server_cb_ping(gras_msg_cb_ctx_t ctx,
292                           void             *payload_data) {
293                              
294   /* 1. Get the payload into the msg variable, and retrieve who called us */
295   int msg=*(int*)payload_data;
296   gras_socket_t expeditor = gras_msg_cb_ctx_from(ctx);
297    
298   /* 2. Log which client connected */
299   INFO3("Got message PING(%d) from %s:%d ", 
300         msg, 
301         gras_socket_peer_name(expeditor), gras_socket_peer_port(expeditor));
302   
303   /* 4. Change the value of the msg variable */
304   msg = 4321;
305
306   /* 5. Return as result */
307   gras_msg_rpcreturn(6000,ctx,&msg);
308   INFO0("Answered with PONG(4321)");
309      
310   /* 6. Cleanups, if any */
311
312   /* 7. Tell GRAS that we consummed this message */
313   return 1;
314 } /* end_of_server_cb_ping */
315
316
317 int server (int argc,char *argv[]) {
318   gras_socket_t mysock;
319   server_data_t sdata;
320
321   int port = 4000;
322   
323   /* 1. Init the GRAS infrastructure */
324   gras_init(&argc,argv);
325
326   /* 2. Get the port I should listen on from the command line, if specified */
327   if (argc == 2) 
328     port=atoi(argv[1]);
329
330   sdata=gras_userdata_new(s_server_data_t);
331   sdata->done = 0;
332
333   INFO1("Launch server (port=%d)", port);
334
335   /* 3. Create my master socket */
336   mysock = gras_socket_server(port);
337
338   /* 4. Register the known messages and register my callback */
339   register_messages();
340   gras_cb_register(gras_msgtype_by_name("plain ping"),&server_cb_ping);
341   gras_cb_register(gras_msgtype_by_name("raise exception"),&server_cb_raise_ex);
342   gras_cb_register(gras_msgtype_by_name("kill"),&server_cb_kill);
343
344   INFO1("Listening on port %d ", gras_socket_my_port(mysock));
345
346   /* 5. Wait for the ping incomming messages */
347   
348   /** \bug if the server is gone before the forwarder tries to connect, 
349      it dies awfully with the following message. The problem stands somewhere
350      at the interface between the gras_socket_t and the msg mess. There is thus
351      no way for me to dive into this before this interface is rewritten 
352 ==15875== Invalid read of size 4
353 ==15875==    at 0x408B805: find_port (transport_plugin_sg.c:68)
354 ==15875==    by 0x408BD64: gras_trp_sg_socket_client (transport_plugin_sg.c:115)
355 ==15875==    by 0x404A38B: gras_socket_client_ext (transport.c:255)
356 ==15875==    by 0x404A605: gras_socket_client (transport.c:288)
357 ==15875==    by 0x804B49D: forwarder (rpc.c:245)
358 ==15875==    by 0x80491FB: launch_forwarder (_rpc_simulator.c:52)
359 ==15875==    by 0x406780B: __context_wrapper (context.c:164)
360 ==15875==    by 0x41A6CB3: pthread_start_thread (manager.c:310)
361 ==15875==    by 0x42AA549: clone (clone.S:119)
362 ==15875==  Address 0x433B49C is 44 bytes inside a block of size 48 free'd
363 ==15875==    at 0x401CF46: free (vg_replace_malloc.c:235)
364 ==15875==    by 0x408F1FA: gras_process_exit (sg_process.c:117)
365 ==15875==    by 0x4049386: gras_exit (gras.c:64)
366 ==15875==    by 0x804B936: server (rpc.c:345)
367 ==15875==    by 0x80492B1: launch_server (_rpc_simulator.c:69)
368 ==15875==    by 0x406780B: __context_wrapper (context.c:164)
369 ==15875==    by 0x41A6CB3: pthread_start_thread (manager.c:310)
370 ==15875==    by 0x42AA549: clone (clone.S:119)
371   */
372   while (!sdata->done) {
373     gras_msg_handle(600.0); 
374     exception_catching();
375   }
376    
377   /* 8. Free the allocated resources, and shut GRAS down */
378   free(sdata);
379   gras_socket_close(mysock);
380   gras_exit();
381    
382   INFO0("Done.");
383   return 0;
384 } /* end_of_server */
385