Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
SIGURS1 is not defined on windows (but signal() and signal.h are both here)
[simgrid.git] / examples / gras / rpc / rpc.c
1 /* $Id$ */
2
3 /* rpc - demo of the RPC features in GRAS                                   */
4
5 /* Copyright (c) 2006 Martin Quinson. All rights reserved.                  */
6
7 /* This program is free software; you can redistribute it and/or modify it
8  * under the terms of the license (GNU LGPL) which comes with this package. */
9
10 #include "gras.h"
11
12 XBT_LOG_NEW_DEFAULT_CATEGORY(Rpc,"Messages specific to this example");
13
14 int err=0; /* to make the message of the raised exception more informative and
15               even be able to follow their propagation from server to client*/
16
17 /* register messages which may be sent (common to client and server) */
18 static void register_messages(void) {
19   gras_msgtype_declare_rpc("plain ping",
20                            gras_datadesc_by_name("int"),
21                            gras_datadesc_by_name("int"));
22
23   gras_msgtype_declare_rpc("raise exception", NULL, NULL);
24   gras_msgtype_declare_rpc("forward exception", NULL, NULL);
25   gras_msgtype_declare("kill",NULL);
26 }
27
28 /* Function prototypes */
29 int server (int argc,char *argv[]);
30 int forwarder (int argc,char *argv[]);
31 int client (int argc,char *argv[]);
32
33 static void exception_raising(void) {
34   THROW1(unknown_error,42,"Some error we will catch on client side %d",err++);
35 }
36 static void exception_catching(void) {
37   int gotit = 0,i;
38   xbt_ex_t e;
39
40   for (i=0; i<5; i++) {
41     gotit=0;
42     TRY {
43       exception_raising();
44     } CATCH(e) {
45       gotit = 1;
46     }
47     if (!gotit) {
48       THROW0(unknown_error,0,"Didn't got the remote exception!");
49     }
50     xbt_assert2(e.category == unknown_error, "Got wrong category: %d (instead of %d)", 
51                 e.category,unknown_error);
52     xbt_assert1(e.value == 42, "Got wrong value: %d (!=42)", e.value);
53     xbt_assert1(!strncmp(e.msg,"Some error we will catch on client side",
54                          strlen("Some error we will catch on client side")), 
55                 "Got wrong message: %s", e.msg);
56     xbt_ex_free(e);
57   }
58 }
59
60 /* **********************************************************************
61  * Client code
62  * **********************************************************************/
63
64
65 int client(int argc,char *argv[]) {
66   xbt_ex_t e;
67   gras_socket_t toserver=NULL; /* peer */
68   gras_socket_t toforwarder=NULL; /* peer */
69
70   memset(&e,0,sizeof(xbt_ex_t));
71
72   int ping, pong, i;
73   volatile int gotit=0;
74
75   const char *host = "127.0.0.1";
76         int   port = 4000;
77
78   /* 1. Init the GRAS's infrastructure */
79   gras_init(&argc, argv);
80    
81   /* 2. Get the server's address. The command line override defaults when specified */
82   if (argc == 5) {
83     host=argv[1];
84     port=atoi(argv[2]);
85   } 
86   INFO2("Launch client (server on %s:%d)",host,port);
87
88   exception_catching();
89    
90   /* 3. Wait for the server & forwarder startup */
91   gras_os_sleep(2);
92    
93   /* 4. Create a socket to speak to the server */
94   TRY {
95     exception_catching();
96     toserver=gras_socket_client(host,port);
97     toforwarder=gras_socket_client(argv[3],atoi(argv[4]));
98   } CATCH(e) {
99     RETHROW0("Unable to connect to the server: %s");
100   }
101   INFO2("Connected to %s:%d.",host,port);    
102
103
104   /* 5. Register the messages. 
105         See, it doesn't have to be done completely at the beginning,
106         but only before use */
107   exception_catching();
108   register_messages();
109
110   /* 6. Keep the user informed of what's going on */
111   INFO2("Connected to server which is on %s:%d ", 
112         gras_socket_peer_name(toserver),gras_socket_peer_port(toserver));
113
114   /* 7. Prepare and send the ping message to the server */
115   ping = 1234;
116   TRY {
117     exception_catching();
118     gras_msg_rpccall(toserver, 6000.0,
119                      gras_msgtype_by_name("plain ping"), &ping, &pong);
120   } CATCH(e) {
121     gras_socket_close(toserver);
122     RETHROW0("Failed to execute a PING rpc on the server: %s");
123   }
124   exception_catching();
125
126   /* 8. Keep the user informed of what's going on, again */
127   INFO4("The answer to PING(%d) on %s:%d is PONG(%d)  ", 
128         ping, 
129         gras_socket_peer_name(toserver),gras_socket_peer_port(toserver),
130         pong);
131
132   /* 9. Call a RPC which raises an exception (to test exception propagation) */
133   INFO0("Call the exception raising RPC");
134   TRY {
135     gras_msg_rpccall(toserver, 6000.0,
136                      gras_msgtype_by_name("raise exception"), NULL, NULL);
137   } CATCH(e) {
138     gotit = 1; 
139     xbt_assert2(e.category == unknown_error, 
140                 "Got wrong category: %d (instead of %d)", 
141               e.category,unknown_error);
142     xbt_assert1(e.value == 42, "Got wrong value: %d (!=42)", e.value);
143     xbt_assert1(!strncmp(e.msg,"Some error we will catch on client side",
144                          strlen("Some error we will catch on client side")), 
145                 "Got wrong message: %s", e.msg);
146     INFO0("Got the expected exception when calling the exception raising RPC");
147     xbt_ex_free(e);
148   }
149
150   if (!gotit)
151     THROW0(unknown_error,0,"Didn't got the remote exception!");
152
153   INFO0("Called the exception raising RPC");
154   exception_catching();
155
156   /* doxygen_ignore */
157   for (i=0; i<5; i++) {
158         
159      INFO1("Call the exception raising RPC (i=%d)",i);
160      TRY {
161         gras_msg_rpccall(toserver, 6000.0,
162                          gras_msgtype_by_name("raise exception"), NULL, NULL);
163      } CATCH(e) {
164         gotit = 1;
165         xbt_ex_free(e);
166      }
167      if (!gotit) {
168         THROW0(unknown_error,0,"Didn't got the remote exception!");
169      }
170   }
171   /* doxygen_resume */
172   
173   /* 9. Call a RPC which raises an exception (to test that exception propagation works) */
174   for (i=0;i<5;i++) {
175     INFO1("Call the exception raising RPC on the forwarder (i=%d)",i);
176     TRY {
177       gras_msg_rpccall(toforwarder, 6000.0,
178                        gras_msgtype_by_name("forward exception"), NULL, NULL);
179     } CATCH(e) {
180       gotit = 1;
181     }
182     if (!gotit) {
183       THROW0(unknown_error,0,"Didn't got the remote exception!");
184     } 
185     xbt_assert1(e.value == 42, "Got wrong value: %d (!=42)", e.value);
186     xbt_assert1(!strncmp(e.msg,"Some error we will catch on client side",
187                          strlen("Some error we will catch on client side")), 
188                 "Got wrong message: %s", e.msg);
189     xbt_assert2(e.category == unknown_error, 
190                 "Got wrong category: %d (instead of %d)", 
191                 e.category,unknown_error);
192     INFO0("Got the expected exception when calling the exception raising RPC");
193     xbt_ex_free(e);
194     exception_catching();
195   }
196
197   INFO2("Ask %s:%d to die",gras_socket_peer_name(toforwarder),gras_socket_peer_port(toforwarder));
198   gras_msg_send(toforwarder,gras_msgtype_by_name("kill"),NULL);
199   INFO2("Ask %s:%d to die",gras_socket_peer_name(toserver),gras_socket_peer_port(toserver));
200   gras_msg_send(toserver,gras_msgtype_by_name("kill"),NULL);
201
202   /* 11. Cleanup the place before leaving */
203   gras_socket_close(toserver);
204   gras_socket_close(toforwarder);
205   gras_exit();
206   INFO0("Done.");
207   return 0;
208 } /* end_of_client */
209
210
211 /* **********************************************************************
212  * Forwarder code
213  * **********************************************************************/
214 typedef struct {
215   gras_socket_t server;
216   int done;
217 } s_forward_data_t, *forward_data_t;
218
219 static int forwarder_cb_kill(gras_msg_cb_ctx_t ctx,
220                              void             *payload_data) {
221   gras_socket_t expeditor = gras_msg_cb_ctx_from(ctx);
222   INFO2("Asked to die by %s:%d",gras_socket_peer_name(expeditor),gras_socket_peer_port(expeditor));
223   forward_data_t fdata=gras_userdata_get();
224   fdata->done = 1;
225   return 1;
226 }
227
228 static int forwarder_cb_forward_ex(gras_msg_cb_ctx_t ctx,
229                                    void             *payload_data) {
230   forward_data_t fdata=gras_userdata_get();
231
232   INFO0("Forward a request");
233   gras_msg_rpccall(fdata->server, 60,
234                    gras_msgtype_by_name("raise exception"),NULL,NULL);
235   return 1;
236 }
237
238 int forwarder (int argc,char *argv[]) {
239   gras_socket_t mysock;
240   int port;
241   forward_data_t fdata;
242
243   gras_init(&argc,argv);
244
245   xbt_assert(argc == 4);
246
247   fdata=gras_userdata_new(s_forward_data_t);
248   fdata->done = 0;
249   port=atoi(argv[1]);
250
251   INFO1("Launch forwarder (port=%d)", port);
252   mysock = gras_socket_server(port);
253
254   gras_os_sleep(1); /* wait for the server to be ready */
255   fdata->server=gras_socket_client(argv[2],atoi(argv[3]));
256
257   register_messages();
258   gras_cb_register(gras_msgtype_by_name("forward exception"),
259                    &forwarder_cb_forward_ex);
260   gras_cb_register(gras_msgtype_by_name("kill"),
261                    &forwarder_cb_kill);
262
263   while (!fdata->done) {
264     gras_msg_handle(600.0);
265   }
266
267   gras_socket_close(mysock);
268   gras_socket_close(fdata->server);
269   free(fdata);
270   gras_exit();
271   INFO0("Done.");
272   return 0;
273 }
274
275 /* **********************************************************************
276  * Server code
277  * **********************************************************************/
278 typedef struct {
279   gras_socket_t server;
280   int done;
281 } s_server_data_t, *server_data_t;
282
283 static int server_cb_kill(gras_msg_cb_ctx_t ctx,
284                           void             *payload_data) {
285   gras_socket_t expeditor = gras_msg_cb_ctx_from(ctx);
286   INFO2("Asked to die by %s:%d",gras_socket_peer_name(expeditor),gras_socket_peer_port(expeditor));
287
288   server_data_t sdata=gras_userdata_get();
289   sdata->done = 1;
290   return 1;
291 }
292
293 static int server_cb_raise_ex(gras_msg_cb_ctx_t ctx,
294                               void             *payload_data) {
295   exception_raising();
296   return 1;
297 }
298
299 static int server_cb_ping(gras_msg_cb_ctx_t ctx,
300                           void             *payload_data) {
301                              
302   /* 1. Get the payload into the msg variable, and retrieve who called us */
303   int msg=*(int*)payload_data;
304   gras_socket_t expeditor = gras_msg_cb_ctx_from(ctx);
305    
306   /* 2. Log which client connected */
307   INFO3("Got message PING(%d) from %s:%d ", 
308         msg, 
309         gras_socket_peer_name(expeditor), gras_socket_peer_port(expeditor));
310   
311   /* 4. Change the value of the msg variable */
312   msg = 4321;
313
314   /* 5. Return as result */
315   gras_msg_rpcreturn(6000,ctx,&msg);
316   INFO0("Answered with PONG(4321)");
317      
318   /* 6. Cleanups, if any */
319
320   /* 7. Tell GRAS that we consummed this message */
321   return 1;
322 } /* end_of_server_cb_ping */
323
324
325 int server (int argc,char *argv[]) {
326   gras_socket_t mysock;
327   server_data_t sdata;
328
329   int port = 4000;
330   
331   /* 1. Init the GRAS infrastructure */
332   gras_init(&argc,argv);
333
334   /* 2. Get the port I should listen on from the command line, if specified */
335   if (argc == 2) 
336     port=atoi(argv[1]);
337
338   sdata=gras_userdata_new(s_server_data_t);
339   sdata->done = 0;
340
341   INFO1("Launch server (port=%d)", port);
342
343   /* 3. Create my master socket */
344   mysock = gras_socket_server(port);
345
346   /* 4. Register the known messages and register my callback */
347   register_messages();
348   gras_cb_register(gras_msgtype_by_name("plain ping"),&server_cb_ping);
349   gras_cb_register(gras_msgtype_by_name("raise exception"),&server_cb_raise_ex);
350   gras_cb_register(gras_msgtype_by_name("kill"),&server_cb_kill);
351
352   INFO1("Listening on port %d ", gras_socket_my_port(mysock));
353
354   /* 5. Wait for the ping incomming messages */
355   
356   /** \bug if the server is gone before the forwarder tries to connect, 
357      it dies awfully with the following message. The problem stands somewhere
358      at the interface between the gras_socket_t and the msg mess. There is thus
359      no way for me to dive into this before this interface is rewritten 
360 ==15875== Invalid read of size 4
361 ==15875==    at 0x408B805: find_port (transport_plugin_sg.c:68)
362 ==15875==    by 0x408BD64: gras_trp_sg_socket_client (transport_plugin_sg.c:115)
363 ==15875==    by 0x404A38B: gras_socket_client_ext (transport.c:255)
364 ==15875==    by 0x404A605: gras_socket_client (transport.c:288)
365 ==15875==    by 0x804B49D: forwarder (rpc.c:245)
366 ==15875==    by 0x80491FB: launch_forwarder (_rpc_simulator.c:52)
367 ==15875==    by 0x406780B: __context_wrapper (context.c:164)
368 ==15875==    by 0x41A6CB3: pthread_start_thread (manager.c:310)
369 ==15875==    by 0x42AA549: clone (clone.S:119)
370 ==15875==  Address 0x433B49C is 44 bytes inside a block of size 48 free'd
371 ==15875==    at 0x401CF46: free (vg_replace_malloc.c:235)
372 ==15875==    by 0x408F1FA: gras_process_exit (sg_process.c:117)
373 ==15875==    by 0x4049386: gras_exit (gras.c:64)
374 ==15875==    by 0x804B936: server (rpc.c:345)
375 ==15875==    by 0x80492B1: launch_server (_rpc_simulator.c:69)
376 ==15875==    by 0x406780B: __context_wrapper (context.c:164)
377 ==15875==    by 0x41A6CB3: pthread_start_thread (manager.c:310)
378 ==15875==    by 0x42AA549: clone (clone.S:119)
379   */
380   while (!sdata->done) {
381     gras_msg_handle(600.0); 
382     exception_catching();
383   }
384    
385   /* 8. Free the allocated resources, and shut GRAS down */
386   free(sdata);
387   gras_socket_close(mysock);
388   gras_exit();
389    
390   INFO0("Done.");
391   return 0;
392 } /* end_of_server */
393