Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
65b30f9eeee07841b9d366a9011a134e279c8fc9
[simgrid.git] / src / gras / Msg / gras_msg_exchange.c
1 /* $Id$ */
2
3 /* gras message exchanges                                                   */
4
5 /* Copyright (c) 2003, 2004, 2005, 2006, 2007 Martin Quinson.               */
6 /* All rights reserved.                                                     */
7
8 /* This program is free software; you can redistribute it and/or modify it
9  * under the terms of the license (GNU LGPL) which comes with this package. */
10
11 #include "xbt/ex.h"
12 #include "xbt/ex_interface.h"
13 #include "gras/Msg/msg_private.h"
14 #include "gras/Virtu/virtu_interface.h"
15 #include "gras/Transport/transport_interface.h" /* gras_select */
16
17 XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(gras_msg);
18
19
20 char _GRAS_header[6];
21 const char *e_gras_msg_kind_names[e_gras_msg_kind_count]=
22   {"UNKNOWN","ONEWAY","RPC call","RPC answer","RPC error"};
23
24
25 /** \brief Waits for a message to come in over a given socket. 
26  *
27  * @param timeout: How long should we wait for this message.
28  * @param msgt_want: type of awaited msg (or NULL if I'm enclined to accept any message)
29  * @param expe_want: awaited expeditot (match on hostname, not port; NULL if not relevant)
30  * @param filter: function returning true or false when passed a payload. Messages for which it returns false are not selected. (NULL if not relevant)
31  * @param filter_ctx: context passed as second argument of the filter (a pattern to match?)
32  * @param[out] msg_got: where to write the message we got
33  *
34  * Every message of another type received before the one waited will be queued
35  * and used by subsequent call to this function or gras_msg_handle().
36  */
37
38 void
39 gras_msg_wait_ext_(double           timeout,    
40
41                   gras_msgtype_t   msgt_want,
42                   gras_socket_t    expe_want,
43                   gras_msg_filter_t filter,
44                   void             *filter_ctx, 
45
46                   gras_msg_t       msg_got) {
47
48   s_gras_msg_t msg;
49   double start, now;
50   gras_msg_procdata_t pd=
51     (gras_msg_procdata_t)gras_libdata_by_id(gras_msg_libdata_id);
52   int cpt;
53
54   xbt_assert0(msg_got,"msg_got is an output parameter");
55
56   start = gras_os_time();
57   VERB1("Waiting for message '%s'",msgt_want?msgt_want->name:"(any)");
58
59   xbt_dynar_foreach(pd->msg_waitqueue,cpt,msg){
60     if ( (   !msgt_want || (msg.type->code == msgt_want->code)) 
61          && (!expe_want || (!strcmp( gras_socket_peer_name(msg.expe),
62                                      gras_socket_peer_name(expe_want))))
63          && (!filter || filter(&msg,filter_ctx))) {
64
65       memcpy(msg_got,&msg,sizeof(s_gras_msg_t));
66       xbt_dynar_cursor_rm(pd->msg_waitqueue, &cpt);
67       VERB0("The waited message was queued");
68       return;
69     }
70   }
71
72   xbt_dynar_foreach(pd->msg_queue,cpt,msg){
73     if ( (   !msgt_want || (msg.type->code == msgt_want->code)) 
74          && (!expe_want || (!strcmp( gras_socket_peer_name(msg.expe),
75                                      gras_socket_peer_name(expe_want))))
76          && (!filter || filter(&msg,filter_ctx))) {
77
78       memcpy(msg_got,&msg,sizeof(s_gras_msg_t));
79       xbt_dynar_cursor_rm(pd->msg_queue, &cpt);
80       VERB0("The waited message was queued");
81       return;
82     }
83   }
84
85   while (1) {
86     int need_restart;
87     xbt_ex_t e;
88
89   restart_receive: /* Goto here when the receive of a message failed */
90     need_restart=0;
91     now=gras_os_time();
92     memset(&msg,sizeof(msg),0);
93
94     TRY {
95       msg.expe = gras_trp_select(timeout ? timeout - now + start : 0);
96       gras_msg_recv(msg.expe, &msg);
97     } CATCH(e) {
98       if (e.category == system_error &&
99           !strncmp("Socket closed by remote side",e.msg,
100                   strlen("Socket closed by remote side"))) {
101         xbt_ex_free(e);
102         need_restart=1;
103       } else {
104         RETHROW;
105       }
106     }
107     if (need_restart)
108       goto restart_receive;
109
110     DEBUG0("Got a message from the socket");
111
112     if ( (   !msgt_want || (msg.type->code == msgt_want->code)) 
113          && (!expe_want || (!strcmp( gras_socket_peer_name(msg.expe),
114                                      gras_socket_peer_name(expe_want))))
115          && (!filter || filter(&msg,filter_ctx))) {
116
117       memcpy(msg_got,&msg,sizeof(s_gras_msg_t));
118       DEBUG0("Message matches expectations. Use it.");
119       return;
120     }
121     DEBUG0("Message does not match expectations. Queue it.");
122
123     /* not expected msg type. Queue it for later */
124     xbt_dynar_push(pd->msg_queue,&msg);
125     
126     now=gras_os_time();
127     if (now - start + 0.001 > timeout) {
128       THROW1(timeout_error,  now-start+0.001-timeout,
129              "Timeout while waiting for msg '%s'",
130              msgt_want?msgt_want->name:"(any)");
131     }
132   }
133
134   THROW_IMPOSSIBLE;
135 }
136 /** \brief Waits for a message to come in over a given socket. 
137  *
138  * @param timeout: How long should we wait for this message.
139  * @param msgt_want: type of awaited msg
140  * @param[out] expeditor: where to create a socket to answer the incomming message
141  * @param[out] payload: where to write the payload of the incomming message
142  * @return the error code (or no_error).
143  *
144  * Every message of another type received before the one waited will be queued
145  * and used by subsequent call to this function or gras_msg_handle().
146  */
147 void
148 gras_msg_wait_(double           timeout,    
149                gras_msgtype_t   msgt_want,
150                gras_socket_t   *expeditor,
151                void            *payload) {
152   s_gras_msg_t msg;
153
154   gras_msg_wait_ext_(timeout,
155                      msgt_want, NULL,      NULL, NULL,
156                      &msg);
157
158   if (msgt_want->ctn_type) {
159     xbt_assert1(payload,
160                 "Message type '%s' convey a payload you must accept",
161                 msgt_want->name);
162   } else {
163     xbt_assert1(!payload,
164                 "No payload was declared for message type '%s'",
165                 msgt_want->name);
166   }
167
168   if (payload) {
169     memcpy(payload,msg.payl,msg.payl_size);
170     free(msg.payl);
171   }
172
173   if (expeditor)
174     *expeditor = msg.expe;
175 }
176
177 static int gras_msg_wait_or_filter(gras_msg_t msg, void *ctx) {
178   xbt_dynar_t dyn=(xbt_dynar_t)ctx;
179   int res =  xbt_dynar_member(dyn,msg->type);
180   if (res)
181     VERB1("Got matching message (type=%s)",msg->type->name);
182   else
183     VERB0("Got message not matching our expectations");
184   return res;
185 }
186 /** \brief Waits for a message to come in over a given socket. 
187  *
188  * @param timeout: How long should we wait for this message.
189  * @param msgt_want: a dynar containing all accepted message type
190  * @param[out] ctx: the context of received message (in case it's a RPC call we want to answer to)
191  * @param[out] msgt_got: indice in the dynar of the type of the received message 
192  * @param[out] payload: where to write the payload of the incomming message
193  * @return the error code (or no_error).
194  *
195  * Every message of a type not in the accepted list received before the one
196  * waited will be queued and used by subsequent call to this function or
197  * gras_msg_handle().
198  *
199  * If you are interested in the context, pass the address of a s_gras_msg_cb_ctx_t variable.
200  */
201 void gras_msg_wait_or(double         timeout,
202                       xbt_dynar_t    msgt_want,
203                       gras_msg_cb_ctx_t *ctx,
204                       int           *msgt_got,
205                       void          *payload) {
206   s_gras_msg_t msg;
207
208   VERB1("Wait %f seconds for several message types",timeout);
209   gras_msg_wait_ext_(timeout,
210                      NULL, NULL,      
211                      &gras_msg_wait_or_filter, (void*)msgt_want,
212                      &msg);
213
214   if (msg.type->ctn_type) {
215     xbt_assert1(payload,
216                 "Message type '%s' convey a payload you must accept",
217                 msg.type->name);
218   } /* don't check the other side since some of the types may have a payload */
219
220   if (payload && msg.type->ctn_type) {
221     memcpy(payload,msg.payl,msg.payl_size);
222     free(msg.payl);
223   }
224
225   if (ctx) 
226     *ctx=gras_msg_cb_ctx_new(msg.expe, msg.type, msg.ID,
227                              (msg.kind == e_gras_msg_kind_rpccall), 60);
228
229   if (msgt_got)
230     *msgt_got = xbt_dynar_search(msgt_want,msg.type);
231 }
232
233
234 /** \brief Send the data pointed by \a payload as a message of type
235  * \a msgtype to the peer \a sock */
236 void
237 gras_msg_send_(gras_socket_t   sock,
238               gras_msgtype_t  msgtype,
239               void           *payload) {
240
241   if (msgtype->ctn_type) {
242     xbt_assert1(payload,
243                 "Message type '%s' convey a payload you must provide",
244                 msgtype->name);
245   } else {
246     xbt_assert1(!payload,
247                 "No payload was declared for message type '%s'",
248                 msgtype->name);
249   }
250
251   DEBUG2("Send a oneway message of type '%s'. Payload=%p",
252          msgtype->name,payload);
253   gras_msg_send_ext(sock, e_gras_msg_kind_oneway,0, msgtype, payload);
254   VERB2("Sent a oneway message of type '%s'. Payload=%p",
255         msgtype->name,payload);
256 }
257
258 /** @brief Handle all messages arriving within the given period
259  *
260  * @param period: How long to wait for incoming messages (in seconds)
261  *
262  * Messages are dealed with just like gras_msg_handle() would do. The
263  * difference is that gras_msg_handle() handles at most one message (or wait up
264  * to timeout second when no message arrives) while this function handles any
265  * amount of messages, and lasts the given period in any case.
266  */
267 void 
268 gras_msg_handleall(double period) {
269   xbt_ex_t e;
270   double begin=gras_os_time();
271   double now;
272
273   do {
274     now=gras_os_time();
275     TRY{
276       if (period - now + begin > 0)
277         gras_msg_handle(period - now + begin);
278     } CATCH(e) {
279       if (e.category != timeout_error) 
280         RETHROW0("Error while waiting for messages: %s");
281       xbt_ex_free(e);
282     }
283   } while (now - begin < period);
284 }
285
286 /** @brief Handle an incomming message or timer (or wait up to \a timeOut seconds)
287  *
288  * @param timeOut: How long to wait for incoming messages (in seconds)
289  * @return the error code (or no_error).
290  *
291  * Messages are passed to the callbacks. See also gras_msg_handleall().
292  */
293 void
294 gras_msg_handle(double timeOut) {
295   
296   double          untiltimer;
297    
298   int             cpt;
299   int volatile ran_ok;
300
301   s_gras_msg_t    msg;
302
303   gras_msg_procdata_t pd=(gras_msg_procdata_t)gras_libdata_by_id(gras_msg_libdata_id);
304   gras_cblist_t  *list=NULL;
305   gras_msg_cb_t       cb;
306   s_gras_msg_cb_ctx_t ctx;
307    
308   int timerexpected, timeouted;
309   xbt_ex_t e;
310
311   VERB1("Handling message within the next %.2fs",timeOut);
312   
313   untiltimer = gras_msg_timer_handle();
314   DEBUG1("Next timer in %f sec", untiltimer);
315   if (untiltimer == 0.0) {
316      /* A timer was already elapsed and handled */
317      return;
318   }
319   if (untiltimer != -1.0) {
320      timerexpected = 1;
321      timeOut = MIN(timeOut, untiltimer);
322   } else {
323      timerexpected = 0;
324   }
325    
326   /* get a message (from the queue or from the net) */
327   timeouted = 0;
328   if (xbt_dynar_length(pd->msg_queue)) {
329     DEBUG0("Get a message from the queue");
330     xbt_dynar_shift(pd->msg_queue,&msg);
331   } else {
332     TRY {
333       msg.expe = gras_trp_select(timeOut);
334     } CATCH(e) {
335       if (e.category != timeout_error)
336         RETHROW;
337       xbt_ex_free(e);
338       timeouted = 1;
339     }
340
341     if (!timeouted) {
342       TRY {
343         /* FIXME: if not the right kind, queue it and recall ourself or goto >:-) */
344         gras_msg_recv(msg.expe, &msg);
345         DEBUG1("Received a msg from the socket kind:%s",
346                e_gras_msg_kind_names[msg.kind]);
347     
348       } CATCH(e) {
349         RETHROW4("Error while receiving a message on select()ed socket %p to [%s]%s:%d: %s",
350                  msg.expe,
351                  gras_socket_peer_proc(msg.expe),gras_socket_peer_name(msg.expe),
352                  gras_socket_peer_port(msg.expe));
353       }
354     }
355   }
356
357   if (timeouted) {
358      if (timerexpected) {
359           
360         /* A timer elapsed before the arrival of any message even if we select()ed a bit */
361         untiltimer = gras_msg_timer_handle();
362         if (untiltimer == 0.0) {
363           /* we served a timer, we're done */
364           return;
365         } else {
366            xbt_assert1(untiltimer>0, "Negative timer (%f). I'm 'puzzeled'", untiltimer);
367            WARN1("No timer elapsed, in contrary to expectations (next in %f sec)",
368                   untiltimer);
369            THROW1(timeout_error,0,
370                   "No timer elapsed, in contrary to expectations (next in %f sec)",
371                   untiltimer);
372         }
373         
374      } else {
375         /* select timeouted, and no timer elapsed. Nothing to do */
376        THROW1(timeout_error, 0, "No new message or timer (delay was %f)",
377               timeOut);
378      }
379      
380   }
381    
382   /* A message was already there or arrived in the meanwhile. handle it */
383   xbt_dynar_foreach(pd->cbl_list,cpt,list) {
384     if (list->id == msg.type->code) {
385       break;
386     } else {
387       list=NULL;
388     }
389   }
390   if (!list) {
391     INFO3("No callback for message '%s' from %s:%d. Queue it for later gras_msg_wait() use.",
392           msg.type->name,
393           gras_socket_peer_name(msg.expe),gras_socket_peer_port(msg.expe));
394     xbt_dynar_push(pd->msg_waitqueue,&msg);
395     return; /* FIXME: maybe we should call ourselves again until the end of the timer or a proper msg is got */
396   }
397   
398   ctx.expeditor = msg.expe;
399   ctx.ID = msg.ID;
400   ctx.msgtype = msg.type;
401   ctx.answer_due = (msg.kind == e_gras_msg_kind_rpccall);
402
403   switch (msg.kind) {
404   case e_gras_msg_kind_oneway:
405   case e_gras_msg_kind_rpccall:
406     ran_ok=0;
407     TRY {
408       xbt_dynar_foreach(list->cbs,cpt,cb) { 
409         if (!ran_ok) {
410           DEBUG4("Use the callback #%d (@%p) for incomming msg %s (payload_size=%d)",
411                 cpt+1,cb,msg.type->name,msg.payl_size);
412           if (!(*cb)(&ctx,msg.payl)) {
413             /* cb handled the message */
414             free(msg.payl);
415             ran_ok = 1;
416           }
417         }
418       }
419     } CATCH(e) {
420       free(msg.payl);
421       if (msg.type->kind == e_gras_msg_kind_rpccall) {
422         char *old_file=e.file;
423         /* The callback raised an exception, propagate it on the network */
424         if (!e.remote) { 
425           /* Make sure we reduce the file name to its basename to avoid issues in tests */
426           char *new_file=strrchr(e.file,'/');
427           if (new_file)
428              e.file = new_file;
429           /* the exception is born on this machine */
430           e.host = (char*)gras_os_myname();
431           xbt_ex_setup_backtrace(&e);
432         } 
433         VERB5("Propagate %s exception ('%s') from '%s' RPC cb back to %s:%d",
434               (e.remote ? "remote" : "local"),
435               e.msg,
436               msg.type->name,
437               gras_socket_peer_name(msg.expe),
438               gras_socket_peer_port(msg.expe));
439         gras_msg_send_ext(msg.expe, e_gras_msg_kind_rpcerror,
440                           msg.ID, msg.type, &e);
441         e.file=old_file;
442         xbt_ex_free(e);
443         ctx.answer_due = 0;
444         ran_ok=1;
445       } else {
446         RETHROW0("Callback raised an exception: %s");
447       }
448     }
449     xbt_assert0(!(ctx.answer_due),
450                 "RPC callback didn't call gras_msg_rpcreturn");
451
452     if (!ran_ok)
453       THROW1(mismatch_error,0,
454              "Message '%s' refused by all registered callbacks", msg.type->name);
455     /* FIXME: gras_datadesc_free not implemented => leaking the payload */
456     break;
457
458
459   case e_gras_msg_kind_rpcanswer:
460     INFO1("Unexpected RPC answer discarded (type: %s)", msg.type->name);
461     WARN0("FIXME: gras_datadesc_free not implemented => leaking the payload");
462     return;
463
464   case e_gras_msg_kind_rpcerror:
465     INFO1("Unexpected RPC error discarded (type: %s)", msg.type->name);
466     WARN0("FIXME: gras_datadesc_free not implemented => leaking the payload");
467     return;
468
469   default:
470     THROW1(unknown_error,0,
471            "Cannot handle messages of kind %d yet",msg.type->kind);
472   }
473
474 }
475