Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Commit the autogenerated files
[simgrid.git] / examples / gras / rpc / rpc.c
1 /* $Id$ */
2
3 /* rpc - demo of the RPC features in GRAS                                   */
4
5 /* Copyright (c) 2006 Martin Quinson. All rights reserved.                  */
6
7 /* This program is free software; you can redistribute it and/or modify it
8  * under the terms of the license (GNU LGPL) which comes with this package. */
9
10 #include "gras.h"
11
12 XBT_LOG_NEW_DEFAULT_CATEGORY(Rpc,"Messages specific to this example");
13
14 int err=0; /* to make the message of the raised exception more informative and
15               even be able to follow their propagation from server to client*/
16
17 /* register messages which may be sent (common to client and server) */
18 static void register_messages(void) {
19   gras_msgtype_declare_rpc("plain ping",
20                            gras_datadesc_by_name("int"),
21                            gras_datadesc_by_name("int"));
22
23   gras_msgtype_declare_rpc("raise exception", NULL, NULL);
24   gras_msgtype_declare_rpc("forward exception", NULL, NULL);
25   gras_msgtype_declare("kill",NULL);
26 }
27
28 /* Function prototypes */
29 int server (int argc,char *argv[]);
30 int forwarder (int argc,char *argv[]);
31 int client (int argc,char *argv[]);
32
33 static void exception_raising(void) {
34   THROW1(unknown_error,42,"Some error we will catch on client side %d",err++);
35 }
36 static void exception_catching(void) {
37   int gotit = 0,i;
38   xbt_ex_t e;
39
40   for (i=0; i<5; i++) {
41     gotit=0;
42     TRY {
43       exception_raising();
44     } CATCH(e) {
45       gotit = 1;
46     }
47     if (!gotit) {
48       THROW0(unknown_error,0,"Didn't got the remote exception!");
49     }
50     xbt_assert2(e.category == unknown_error, "Got wrong category: %d (instead of %d)", 
51                 e.category,unknown_error);
52     xbt_assert1(e.value == 42, "Got wrong value: %d (!=42)", e.value);
53     xbt_assert1(!strncmp(e.msg,"Some error we will catch on client side",
54                          strlen("Some error we will catch on client side")), 
55                 "Got wrong message: %s", e.msg);
56     xbt_ex_free(e);
57   }
58 }
59
60 /* **********************************************************************
61  * Client code
62  * **********************************************************************/
63
64
65 int client(int argc,char *argv[]) {
66   xbt_ex_t e;
67   gras_socket_t toserver=NULL; /* peer */
68   gras_socket_t toforwarder=NULL; /* peer */
69
70   memset(&e,0,sizeof(xbt_ex_t));
71
72   int ping, pong, i;
73   volatile int gotit=0;
74
75   const char *host = "127.0.0.1";
76         int   port = 4000;
77
78   /* 1. Init the GRAS's infrastructure */
79   gras_init(&argc, argv);
80    
81   /* 2. Get the server's address. The command line override defaults when specified */
82   if (argc == 5) {
83     host=argv[1];
84     port=atoi(argv[2]);
85   } 
86   INFO2("Launch client (server on %s:%d)",host,port);
87
88   exception_catching();
89    
90   /* 3. Wait for the server & forwarder startup */
91   gras_os_sleep(2);
92    
93   /* 4. Create a socket to speak to the server */
94   TRY {
95     exception_catching();
96     toserver=gras_socket_client(host,port);
97     toforwarder=gras_socket_client(argv[3],atoi(argv[4]));
98   } CATCH(e) {
99     RETHROW0("Unable to connect to the server: %s");
100   }
101   INFO2("Connected to %s:%d.",host,port);    
102
103
104   /* 5. Register the messages. 
105         See, it doesn't have to be done completely at the beginning,
106         but only before use */
107   exception_catching();
108   register_messages();
109
110   /* 6. Keep the user informed of what's going on */
111   INFO2("Connected to server which is on %s:%d ", 
112         gras_socket_peer_name(toserver),gras_socket_peer_port(toserver));
113
114   /* 7. Prepare and send the ping message to the server */
115   ping = 1234;
116   TRY {
117     exception_catching();
118     gras_msg_rpccall(toserver, 6000.0, "plain ping", &ping, &pong);
119   } CATCH(e) {
120     gras_socket_close(toserver);
121     RETHROW0("Failed to execute a PING rpc on the server: %s");
122   }
123   exception_catching();
124
125   /* 8. Keep the user informed of what's going on, again */
126   INFO4("The answer to PING(%d) on %s:%d is PONG(%d)  ", 
127         ping, 
128         gras_socket_peer_name(toserver),gras_socket_peer_port(toserver),
129         pong);
130
131   /* 9. Call a RPC which raises an exception (to test exception propagation) */
132   INFO0("Call the exception raising RPC");
133   TRY {
134     gras_msg_rpccall(toserver, 6000.0, "raise exception", NULL, NULL);
135   } CATCH(e) {
136     gotit = 1; 
137     xbt_assert2(e.category == unknown_error, 
138                 "Got wrong category: %d (instead of %d)", 
139               e.category,unknown_error);
140     xbt_assert1(e.value == 42, "Got wrong value: %d (!=42)", e.value);
141     xbt_assert1(!strncmp(e.msg,"Some error we will catch on client side",
142                          strlen("Some error we will catch on client side")), 
143                 "Got wrong message: %s", e.msg);
144     INFO0("Got the expected exception when calling the exception raising RPC");
145     xbt_ex_free(e);
146   }
147
148   if (!gotit)
149     THROW0(unknown_error,0,"Didn't got the remote exception!");
150
151   INFO0("Called the exception raising RPC");
152   exception_catching();
153
154   /* doxygen_ignore */
155   for (i=0; i<5; i++) {
156         
157      INFO1("Call the exception raising RPC (i=%d)",i);
158      TRY {
159         gras_msg_rpccall(toserver, 6000.0, "raise exception", NULL, NULL);
160      } CATCH(e) {
161         gotit = 1;
162         xbt_ex_free(e);
163      }
164      if (!gotit) {
165         THROW0(unknown_error,0,"Didn't got the remote exception!");
166      }
167   }
168   /* doxygen_resume */
169   
170   /* 9. Call a RPC which raises an exception (to test that exception propagation works) */
171   for (i=0;i<5;i++) {
172     INFO1("Call the exception raising RPC on the forwarder (i=%d)",i);
173     TRY {
174       gras_msg_rpccall(toforwarder, 6000.0, "forward exception", NULL, NULL);
175     } CATCH(e) {
176       gotit = 1;
177     }
178     if (!gotit) {
179       THROW0(unknown_error,0,"Didn't got the remote exception!");
180     } 
181     xbt_assert1(e.value == 42, "Got wrong value: %d (!=42)", e.value);
182     xbt_assert1(!strncmp(e.msg,"Some error we will catch on client side",
183                          strlen("Some error we will catch on client side")), 
184                 "Got wrong message: %s", e.msg);
185     xbt_assert2(e.category == unknown_error, 
186                 "Got wrong category: %d (instead of %d)", 
187                 e.category,unknown_error);
188     INFO0("Got the expected exception when calling the exception raising RPC");
189     xbt_ex_free(e);
190     exception_catching();
191   }
192
193   INFO2("Ask %s:%d to die",gras_socket_peer_name(toforwarder),gras_socket_peer_port(toforwarder));
194   gras_msg_send(toforwarder,"kill",NULL);
195   INFO2("Ask %s:%d to die",gras_socket_peer_name(toserver),gras_socket_peer_port(toserver));
196   gras_msg_send(toserver,"kill",NULL);
197
198   /* 11. Cleanup the place before leaving */
199   gras_socket_close(toserver);
200   gras_socket_close(toforwarder);
201   gras_exit();
202   INFO0("Done.");
203   return 0;
204 } /* end_of_client */
205
206
207 /* **********************************************************************
208  * Forwarder code
209  * **********************************************************************/
210 typedef struct {
211   gras_socket_t server;
212   int done;
213 } s_forward_data_t, *forward_data_t;
214
215 static int forwarder_cb_kill(gras_msg_cb_ctx_t ctx,
216                              void             *payload_data) {
217   gras_socket_t expeditor = gras_msg_cb_ctx_from(ctx);
218   INFO2("Asked to die by %s:%d",gras_socket_peer_name(expeditor),gras_socket_peer_port(expeditor));
219   forward_data_t fdata=gras_userdata_get();
220   fdata->done = 1;
221   return 0;
222 }
223
224 static int forwarder_cb_forward_ex(gras_msg_cb_ctx_t ctx,
225                                    void             *payload_data) {
226   forward_data_t fdata=gras_userdata_get();
227
228   INFO0("Forward a request");
229   gras_msg_rpccall(fdata->server, 60, "raise exception",NULL,NULL);
230   return 0;
231 }
232
233 int forwarder (int argc,char *argv[]) {
234   gras_socket_t mysock;
235   int port;
236   forward_data_t fdata;
237
238   gras_init(&argc,argv);
239
240   xbt_assert(argc == 4);
241
242   fdata=gras_userdata_new(s_forward_data_t);
243   fdata->done = 0;
244   port=atoi(argv[1]);
245
246   INFO1("Launch forwarder (port=%d)", port);
247   mysock = gras_socket_server(port);
248
249   gras_os_sleep(1); /* wait for the server to be ready */
250   fdata->server=gras_socket_client(argv[2],atoi(argv[3]));
251
252   register_messages();
253   gras_cb_register("forward exception", &forwarder_cb_forward_ex);
254   gras_cb_register("kill",              &forwarder_cb_kill);
255
256   while (!fdata->done) {
257     gras_msg_handle(600.0);
258   }
259
260   gras_socket_close(mysock);
261   gras_socket_close(fdata->server);
262   free(fdata);
263   gras_exit();
264   INFO0("Done.");
265   return 0;
266 }
267
268 /* **********************************************************************
269  * Server code
270  * **********************************************************************/
271 typedef struct {
272   gras_socket_t server;
273   int done;
274 } s_server_data_t, *server_data_t;
275
276 static int server_cb_kill(gras_msg_cb_ctx_t ctx,
277                           void             *payload_data) {
278   gras_socket_t expeditor = gras_msg_cb_ctx_from(ctx);
279   INFO2("Asked to die by %s:%d",gras_socket_peer_name(expeditor),gras_socket_peer_port(expeditor));
280
281   server_data_t sdata=gras_userdata_get();
282   sdata->done = 1;
283   return 0;
284 }
285
286 static int server_cb_raise_ex(gras_msg_cb_ctx_t ctx,
287                               void             *payload_data) {
288   exception_raising();
289   return 0;
290 }
291
292 static int server_cb_ping(gras_msg_cb_ctx_t ctx,
293                           void             *payload_data) {
294                              
295   /* 1. Get the payload into the msg variable, and retrieve who called us */
296   int msg=*(int*)payload_data;
297   gras_socket_t expeditor = gras_msg_cb_ctx_from(ctx);
298    
299   /* 2. Log which client connected */
300   INFO3("Got message PING(%d) from %s:%d ", 
301         msg, 
302         gras_socket_peer_name(expeditor), gras_socket_peer_port(expeditor));
303   
304   /* 4. Change the value of the msg variable */
305   msg = 4321;
306
307   /* 5. Return as result */
308   gras_msg_rpcreturn(6000,ctx,&msg);
309   INFO0("Answered with PONG(4321)");
310      
311   /* 6. Cleanups, if any */
312
313   /* 7. Tell GRAS that we consummed this message */
314   return 0;
315 } /* end_of_server_cb_ping */
316
317
318 int server (int argc,char *argv[]) {
319   gras_socket_t mysock;
320   server_data_t sdata;
321
322   int port = 4000;
323   
324   /* 1. Init the GRAS infrastructure */
325   gras_init(&argc,argv);
326
327   /* 2. Get the port I should listen on from the command line, if specified */
328   if (argc == 2) 
329     port=atoi(argv[1]);
330
331   sdata=gras_userdata_new(s_server_data_t);
332   sdata->done = 0;
333
334   INFO1("Launch server (port=%d)", port);
335
336   /* 3. Create my master socket */
337   mysock = gras_socket_server(port);
338
339   /* 4. Register the known messages and register my callback */
340   register_messages();
341   gras_cb_register("plain ping",&server_cb_ping);
342   gras_cb_register("raise exception",&server_cb_raise_ex);
343   gras_cb_register("kill",&server_cb_kill);
344
345   INFO1("Listening on port %d ", gras_socket_my_port(mysock));
346
347   /* 5. Wait for the ping incomming messages */
348   
349   /** \bug if the server is gone before the forwarder tries to connect, 
350      it dies awfully with the following message. The problem stands somewhere
351      at the interface between the gras_socket_t and the msg mess. There is thus
352      no way for me to dive into this before this interface is rewritten 
353 ==15875== Invalid read of size 4
354 ==15875==    at 0x408B805: find_port (transport_plugin_sg.c:68)
355 ==15875==    by 0x408BD64: gras_trp_sg_socket_client (transport_plugin_sg.c:115)
356 ==15875==    by 0x404A38B: gras_socket_client_ext (transport.c:255)
357 ==15875==    by 0x404A605: gras_socket_client (transport.c:288)
358 ==15875==    by 0x804B49D: forwarder (rpc.c:245)
359 ==15875==    by 0x80491FB: launch_forwarder (_rpc_simulator.c:52)
360 ==15875==    by 0x406780B: __context_wrapper (context.c:164)
361 ==15875==    by 0x41A6CB3: pthread_start_thread (manager.c:310)
362 ==15875==    by 0x42AA549: clone (clone.S:119)
363 ==15875==  Address 0x433B49C is 44 bytes inside a block of size 48 free'd
364 ==15875==    at 0x401CF46: free (vg_replace_malloc.c:235)
365 ==15875==    by 0x408F1FA: gras_process_exit (sg_process.c:117)
366 ==15875==    by 0x4049386: gras_exit (gras.c:64)
367 ==15875==    by 0x804B936: server (rpc.c:345)
368 ==15875==    by 0x80492B1: launch_server (_rpc_simulator.c:69)
369 ==15875==    by 0x406780B: __context_wrapper (context.c:164)
370 ==15875==    by 0x41A6CB3: pthread_start_thread (manager.c:310)
371 ==15875==    by 0x42AA549: clone (clone.S:119)
372   */
373   while (!sdata->done) {
374     gras_msg_handle(600.0); 
375     exception_catching();
376   }
377    
378   /* 8. Free the allocated resources, and shut GRAS down */
379   free(sdata);
380   gras_socket_close(mysock);
381   gras_exit();
382    
383   INFO0("Done.");
384   return 0;
385 } /* end_of_server */
386