Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
First wave of GRAS API breaking: gras_cb_register wants a message name (char*) as...
[simgrid.git] / examples / gras / rpc / rpc.c
1 /* $Id$ */
2
3 /* rpc - demo of the RPC features in GRAS                                   */
4
5 /* Copyright (c) 2006 Martin Quinson. All rights reserved.                  */
6
7 /* This program is free software; you can redistribute it and/or modify it
8  * under the terms of the license (GNU LGPL) which comes with this package. */
9
10 #include "gras.h"
11
12 XBT_LOG_NEW_DEFAULT_CATEGORY(Rpc,"Messages specific to this example");
13
14 int err=0; /* to make the message of the raised exception more informative and
15               even be able to follow their propagation from server to client*/
16
17 /* register messages which may be sent (common to client and server) */
18 static void register_messages(void) {
19   gras_msgtype_declare_rpc("plain ping",
20                            gras_datadesc_by_name("int"),
21                            gras_datadesc_by_name("int"));
22
23   gras_msgtype_declare_rpc("raise exception", NULL, NULL);
24   gras_msgtype_declare_rpc("forward exception", NULL, NULL);
25   gras_msgtype_declare("kill",NULL);
26 }
27
28 /* Function prototypes */
29 int server (int argc,char *argv[]);
30 int forwarder (int argc,char *argv[]);
31 int client (int argc,char *argv[]);
32
33 static void exception_raising(void) {
34   THROW1(unknown_error,42,"Some error we will catch on client side %d",err++);
35 }
36 static void exception_catching(void) {
37   int gotit = 0,i;
38   xbt_ex_t e;
39
40   for (i=0; i<5; i++) {
41     gotit=0;
42     TRY {
43       exception_raising();
44     } CATCH(e) {
45       gotit = 1;
46     }
47     if (!gotit) {
48       THROW0(unknown_error,0,"Didn't got the remote exception!");
49     }
50     xbt_assert2(e.category == unknown_error, "Got wrong category: %d (instead of %d)", 
51                 e.category,unknown_error);
52     xbt_assert1(e.value == 42, "Got wrong value: %d (!=42)", e.value);
53     xbt_assert1(!strncmp(e.msg,"Some error we will catch on client side",
54                          strlen("Some error we will catch on client side")), 
55                 "Got wrong message: %s", e.msg);
56     xbt_ex_free(e);
57   }
58 }
59
60 /* **********************************************************************
61  * Client code
62  * **********************************************************************/
63
64
65 int client(int argc,char *argv[]) {
66   xbt_ex_t e;
67   gras_socket_t toserver=NULL; /* peer */
68   gras_socket_t toforwarder=NULL; /* peer */
69
70   memset(&e,0,sizeof(xbt_ex_t));
71
72   int ping, pong, i;
73   volatile int gotit=0;
74
75   const char *host = "127.0.0.1";
76         int   port = 4000;
77
78   /* 1. Init the GRAS's infrastructure */
79   gras_init(&argc, argv);
80    
81   /* 2. Get the server's address. The command line override defaults when specified */
82   if (argc == 5) {
83     host=argv[1];
84     port=atoi(argv[2]);
85   } 
86   INFO2("Launch client (server on %s:%d)",host,port);
87
88   exception_catching();
89    
90   /* 3. Wait for the server & forwarder startup */
91   gras_os_sleep(2);
92    
93   /* 4. Create a socket to speak to the server */
94   TRY {
95     exception_catching();
96     toserver=gras_socket_client(host,port);
97     toforwarder=gras_socket_client(argv[3],atoi(argv[4]));
98   } CATCH(e) {
99     RETHROW0("Unable to connect to the server: %s");
100   }
101   INFO2("Connected to %s:%d.",host,port);    
102
103
104   /* 5. Register the messages. 
105         See, it doesn't have to be done completely at the beginning,
106         but only before use */
107   exception_catching();
108   register_messages();
109
110   /* 6. Keep the user informed of what's going on */
111   INFO2("Connected to server which is on %s:%d ", 
112         gras_socket_peer_name(toserver),gras_socket_peer_port(toserver));
113
114   /* 7. Prepare and send the ping message to the server */
115   ping = 1234;
116   TRY {
117     exception_catching();
118     gras_msg_rpccall(toserver, 6000.0,
119                      gras_msgtype_by_name("plain ping"), &ping, &pong);
120   } CATCH(e) {
121     gras_socket_close(toserver);
122     RETHROW0("Failed to execute a PING rpc on the server: %s");
123   }
124   exception_catching();
125
126   /* 8. Keep the user informed of what's going on, again */
127   INFO4("The answer to PING(%d) on %s:%d is PONG(%d)  ", 
128         ping, 
129         gras_socket_peer_name(toserver),gras_socket_peer_port(toserver),
130         pong);
131
132   /* 9. Call a RPC which raises an exception (to test exception propagation) */
133   INFO0("Call the exception raising RPC");
134   TRY {
135     gras_msg_rpccall(toserver, 6000.0,
136                      gras_msgtype_by_name("raise exception"), NULL, NULL);
137   } CATCH(e) {
138     gotit = 1; 
139     xbt_assert2(e.category == unknown_error, 
140                 "Got wrong category: %d (instead of %d)", 
141               e.category,unknown_error);
142     xbt_assert1(e.value == 42, "Got wrong value: %d (!=42)", e.value);
143     xbt_assert1(!strncmp(e.msg,"Some error we will catch on client side",
144                          strlen("Some error we will catch on client side")), 
145                 "Got wrong message: %s", e.msg);
146     INFO0("Got the expected exception when calling the exception raising RPC");
147     xbt_ex_free(e);
148   }
149
150   if (!gotit)
151     THROW0(unknown_error,0,"Didn't got the remote exception!");
152
153   INFO0("Called the exception raising RPC");
154   exception_catching();
155
156   /* doxygen_ignore */
157   for (i=0; i<5; i++) {
158         
159      INFO1("Call the exception raising RPC (i=%d)",i);
160      TRY {
161         gras_msg_rpccall(toserver, 6000.0,
162                          gras_msgtype_by_name("raise exception"), NULL, NULL);
163      } CATCH(e) {
164         gotit = 1;
165         xbt_ex_free(e);
166      }
167      if (!gotit) {
168         THROW0(unknown_error,0,"Didn't got the remote exception!");
169      }
170   }
171   /* doxygen_resume */
172   
173   /* 9. Call a RPC which raises an exception (to test that exception propagation works) */
174   for (i=0;i<5;i++) {
175     INFO1("Call the exception raising RPC on the forwarder (i=%d)",i);
176     TRY {
177       gras_msg_rpccall(toforwarder, 6000.0,
178                        gras_msgtype_by_name("forward exception"), NULL, NULL);
179     } CATCH(e) {
180       gotit = 1;
181     }
182     if (!gotit) {
183       THROW0(unknown_error,0,"Didn't got the remote exception!");
184     } 
185     xbt_assert1(e.value == 42, "Got wrong value: %d (!=42)", e.value);
186     xbt_assert1(!strncmp(e.msg,"Some error we will catch on client side",
187                          strlen("Some error we will catch on client side")), 
188                 "Got wrong message: %s", e.msg);
189     xbt_assert2(e.category == unknown_error, 
190                 "Got wrong category: %d (instead of %d)", 
191                 e.category,unknown_error);
192     INFO0("Got the expected exception when calling the exception raising RPC");
193     xbt_ex_free(e);
194     exception_catching();
195   }
196
197   INFO2("Ask %s:%d to die",gras_socket_peer_name(toforwarder),gras_socket_peer_port(toforwarder));
198   gras_msg_send(toforwarder,gras_msgtype_by_name("kill"),NULL);
199   INFO2("Ask %s:%d to die",gras_socket_peer_name(toserver),gras_socket_peer_port(toserver));
200   gras_msg_send(toserver,gras_msgtype_by_name("kill"),NULL);
201
202   /* 11. Cleanup the place before leaving */
203   gras_socket_close(toserver);
204   gras_socket_close(toforwarder);
205   gras_exit();
206   INFO0("Done.");
207   return 0;
208 } /* end_of_client */
209
210
211 /* **********************************************************************
212  * Forwarder code
213  * **********************************************************************/
214 typedef struct {
215   gras_socket_t server;
216   int done;
217 } s_forward_data_t, *forward_data_t;
218
219 static int forwarder_cb_kill(gras_msg_cb_ctx_t ctx,
220                              void             *payload_data) {
221   gras_socket_t expeditor = gras_msg_cb_ctx_from(ctx);
222   INFO2("Asked to die by %s:%d",gras_socket_peer_name(expeditor),gras_socket_peer_port(expeditor));
223   forward_data_t fdata=gras_userdata_get();
224   fdata->done = 1;
225   return 1;
226 }
227
228 static int forwarder_cb_forward_ex(gras_msg_cb_ctx_t ctx,
229                                    void             *payload_data) {
230   forward_data_t fdata=gras_userdata_get();
231
232   INFO0("Forward a request");
233   gras_msg_rpccall(fdata->server, 60,
234                    gras_msgtype_by_name("raise exception"),NULL,NULL);
235   return 1;
236 }
237
238 int forwarder (int argc,char *argv[]) {
239   gras_socket_t mysock;
240   int port;
241   forward_data_t fdata;
242
243   gras_init(&argc,argv);
244
245   xbt_assert(argc == 4);
246
247   fdata=gras_userdata_new(s_forward_data_t);
248   fdata->done = 0;
249   port=atoi(argv[1]);
250
251   INFO1("Launch forwarder (port=%d)", port);
252   mysock = gras_socket_server(port);
253
254   gras_os_sleep(1); /* wait for the server to be ready */
255   fdata->server=gras_socket_client(argv[2],atoi(argv[3]));
256
257   register_messages();
258   gras_cb_register("forward exception", &forwarder_cb_forward_ex);
259   gras_cb_register("kill",              &forwarder_cb_kill);
260
261   while (!fdata->done) {
262     gras_msg_handle(600.0);
263   }
264
265   gras_socket_close(mysock);
266   gras_socket_close(fdata->server);
267   free(fdata);
268   gras_exit();
269   INFO0("Done.");
270   return 0;
271 }
272
273 /* **********************************************************************
274  * Server code
275  * **********************************************************************/
276 typedef struct {
277   gras_socket_t server;
278   int done;
279 } s_server_data_t, *server_data_t;
280
281 static int server_cb_kill(gras_msg_cb_ctx_t ctx,
282                           void             *payload_data) {
283   gras_socket_t expeditor = gras_msg_cb_ctx_from(ctx);
284   INFO2("Asked to die by %s:%d",gras_socket_peer_name(expeditor),gras_socket_peer_port(expeditor));
285
286   server_data_t sdata=gras_userdata_get();
287   sdata->done = 1;
288   return 1;
289 }
290
291 static int server_cb_raise_ex(gras_msg_cb_ctx_t ctx,
292                               void             *payload_data) {
293   exception_raising();
294   return 1;
295 }
296
297 static int server_cb_ping(gras_msg_cb_ctx_t ctx,
298                           void             *payload_data) {
299                              
300   /* 1. Get the payload into the msg variable, and retrieve who called us */
301   int msg=*(int*)payload_data;
302   gras_socket_t expeditor = gras_msg_cb_ctx_from(ctx);
303    
304   /* 2. Log which client connected */
305   INFO3("Got message PING(%d) from %s:%d ", 
306         msg, 
307         gras_socket_peer_name(expeditor), gras_socket_peer_port(expeditor));
308   
309   /* 4. Change the value of the msg variable */
310   msg = 4321;
311
312   /* 5. Return as result */
313   gras_msg_rpcreturn(6000,ctx,&msg);
314   INFO0("Answered with PONG(4321)");
315      
316   /* 6. Cleanups, if any */
317
318   /* 7. Tell GRAS that we consummed this message */
319   return 1;
320 } /* end_of_server_cb_ping */
321
322
323 int server (int argc,char *argv[]) {
324   gras_socket_t mysock;
325   server_data_t sdata;
326
327   int port = 4000;
328   
329   /* 1. Init the GRAS infrastructure */
330   gras_init(&argc,argv);
331
332   /* 2. Get the port I should listen on from the command line, if specified */
333   if (argc == 2) 
334     port=atoi(argv[1]);
335
336   sdata=gras_userdata_new(s_server_data_t);
337   sdata->done = 0;
338
339   INFO1("Launch server (port=%d)", port);
340
341   /* 3. Create my master socket */
342   mysock = gras_socket_server(port);
343
344   /* 4. Register the known messages and register my callback */
345   register_messages();
346   gras_cb_register("plain ping",&server_cb_ping);
347   gras_cb_register("raise exception",&server_cb_raise_ex);
348   gras_cb_register("kill",&server_cb_kill);
349
350   INFO1("Listening on port %d ", gras_socket_my_port(mysock));
351
352   /* 5. Wait for the ping incomming messages */
353   
354   /** \bug if the server is gone before the forwarder tries to connect, 
355      it dies awfully with the following message. The problem stands somewhere
356      at the interface between the gras_socket_t and the msg mess. There is thus
357      no way for me to dive into this before this interface is rewritten 
358 ==15875== Invalid read of size 4
359 ==15875==    at 0x408B805: find_port (transport_plugin_sg.c:68)
360 ==15875==    by 0x408BD64: gras_trp_sg_socket_client (transport_plugin_sg.c:115)
361 ==15875==    by 0x404A38B: gras_socket_client_ext (transport.c:255)
362 ==15875==    by 0x404A605: gras_socket_client (transport.c:288)
363 ==15875==    by 0x804B49D: forwarder (rpc.c:245)
364 ==15875==    by 0x80491FB: launch_forwarder (_rpc_simulator.c:52)
365 ==15875==    by 0x406780B: __context_wrapper (context.c:164)
366 ==15875==    by 0x41A6CB3: pthread_start_thread (manager.c:310)
367 ==15875==    by 0x42AA549: clone (clone.S:119)
368 ==15875==  Address 0x433B49C is 44 bytes inside a block of size 48 free'd
369 ==15875==    at 0x401CF46: free (vg_replace_malloc.c:235)
370 ==15875==    by 0x408F1FA: gras_process_exit (sg_process.c:117)
371 ==15875==    by 0x4049386: gras_exit (gras.c:64)
372 ==15875==    by 0x804B936: server (rpc.c:345)
373 ==15875==    by 0x80492B1: launch_server (_rpc_simulator.c:69)
374 ==15875==    by 0x406780B: __context_wrapper (context.c:164)
375 ==15875==    by 0x41A6CB3: pthread_start_thread (manager.c:310)
376 ==15875==    by 0x42AA549: clone (clone.S:119)
377   */
378   while (!sdata->done) {
379     gras_msg_handle(600.0); 
380     exception_catching();
381   }
382    
383   /* 8. Free the allocated resources, and shut GRAS down */
384   free(sdata);
385   gras_socket_close(mysock);
386   gras_exit();
387    
388   INFO0("Done.");
389   return 0;
390 } /* end_of_server */
391