Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Split up a too long file, taking the oportunity to sanitize the file naming spacec
[simgrid.git] / src / gras / Msg / gras_msg_exchange.c
1 /* $Id$ */
2
3 /* gras message exchanges                                                   */
4
5 /* Copyright (c) 2003, 2004, 2005, 2006, 2007 Martin Quinson.               */
6 /* All rights reserved.                                                     */
7
8 /* This program is free software; you can redistribute it and/or modify it
9  * under the terms of the license (GNU LGPL) which comes with this package. */
10
11 #include "xbt/ex.h"
12 #include "xbt/ex_interface.h"
13 #include "gras/Msg/msg_private.h"
14 #include "gras/Virtu/virtu_interface.h"
15 #include "gras/Transport/transport_interface.h" /* gras_select */
16
17 XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(gras_msg);
18
19
20 char _GRAS_header[6];
21 const char *e_gras_msg_kind_names[e_gras_msg_kind_count]=
22   {"UNKNOWN","ONEWAY","RPC call","RPC answer","RPC error"};
23
24
25
26 /** \brief Waits for a message to come in over a given socket. 
27  *
28  * @param timeout: How long should we wait for this message.
29  * @param msgt_want: type of awaited msg (or NULL if I'm enclined to accept any message)
30  * @param expe_want: awaited expeditot (match on hostname, not port; NULL if not relevant)
31  * @param filter: function returning true or false when passed a payload. Messages for which it returns false are not selected. (NULL if not relevant)
32  * @param filter_ctx: context passed as second argument of the filter (a pattern to match?)
33  * @param[out] msg_got: where to write the message we got
34  *
35  * Every message of another type received before the one waited will be queued
36  * and used by subsequent call to this function or gras_msg_handle().
37  */
38
39 void
40 gras_msg_wait_ext_(double           timeout,    
41
42                   gras_msgtype_t   msgt_want,
43                   gras_socket_t    expe_want,
44                   gras_msg_filter_t filter,
45                   void             *filter_ctx, 
46
47                   gras_msg_t       msg_got) {
48
49   s_gras_msg_t msg;
50   double start, now;
51   gras_msg_procdata_t pd=
52     (gras_msg_procdata_t)gras_libdata_by_id(gras_msg_libdata_id);
53   int cpt;
54
55   xbt_assert0(msg_got,"msg_got is an output parameter");
56
57   start = gras_os_time();
58   VERB1("Waiting for message '%s'",msgt_want?msgt_want->name:"(any)");
59
60   xbt_dynar_foreach(pd->msg_waitqueue,cpt,msg){
61     if ( (   !msgt_want || (msg.type->code == msgt_want->code)) 
62          && (!expe_want || (!strcmp( gras_socket_peer_name(msg.expe),
63                                      gras_socket_peer_name(expe_want))))
64          && (!filter || filter(&msg,filter_ctx))) {
65
66       memcpy(msg_got,&msg,sizeof(s_gras_msg_t));
67       xbt_dynar_cursor_rm(pd->msg_waitqueue, &cpt);
68       VERB0("The waited message was queued");
69       return;
70     }
71   }
72
73   xbt_dynar_foreach(pd->msg_queue,cpt,msg){
74     if ( (   !msgt_want || (msg.type->code == msgt_want->code)) 
75          && (!expe_want || (!strcmp( gras_socket_peer_name(msg.expe),
76                                      gras_socket_peer_name(expe_want))))
77          && (!filter || filter(&msg,filter_ctx))) {
78
79       memcpy(msg_got,&msg,sizeof(s_gras_msg_t));
80       xbt_dynar_cursor_rm(pd->msg_queue, &cpt);
81       VERB0("The waited message was queued");
82       return;
83     }
84   }
85
86   while (1) {
87     int need_restart;
88     xbt_ex_t e;
89
90   restart_receive: /* Goto here when the receive of a message failed */
91     need_restart=0;
92     now=gras_os_time();
93     memset(&msg,sizeof(msg),0);
94
95     TRY {
96       msg.expe = gras_trp_select(timeout ? timeout - now + start : 0);
97       gras_msg_recv(msg.expe, &msg);
98     } CATCH(e) {
99       if (e.category == system_error &&
100           !strncmp("Socket closed by remote side",e.msg,
101                   strlen("Socket closed by remote side"))) {
102         xbt_ex_free(e);
103         need_restart=1;
104       } else {
105         RETHROW;
106       }
107     }
108     if (need_restart)
109       goto restart_receive;
110
111     DEBUG0("Got a message from the socket");
112
113     if ( (   !msgt_want || (msg.type->code == msgt_want->code)) 
114          && (!expe_want || (!strcmp( gras_socket_peer_name(msg.expe),
115                                      gras_socket_peer_name(expe_want))))
116          && (!filter || filter(&msg,filter_ctx))) {
117
118       memcpy(msg_got,&msg,sizeof(s_gras_msg_t));
119       DEBUG0("Message matches expectations. Use it.");
120       return;
121     }
122     DEBUG0("Message does not match expectations. Queue it.");
123
124     /* not expected msg type. Queue it for later */
125     xbt_dynar_push(pd->msg_queue,&msg);
126     
127     now=gras_os_time();
128     if (now - start + 0.001 > timeout) {
129       THROW1(timeout_error,  now-start+0.001-timeout,
130              "Timeout while waiting for msg '%s'",
131              msgt_want?msgt_want->name:"(any)");
132     }
133   }
134
135   THROW_IMPOSSIBLE;
136 }
137 /** \brief Waits for a message to come in over a given socket. 
138  *
139  * @param timeout: How long should we wait for this message.
140  * @param msgt_want: type of awaited msg
141  * @param[out] expeditor: where to create a socket to answer the incomming message
142  * @param[out] payload: where to write the payload of the incomming message
143  * @return the error code (or no_error).
144  *
145  * Every message of another type received before the one waited will be queued
146  * and used by subsequent call to this function or gras_msg_handle().
147  */
148 void
149 gras_msg_wait_(double           timeout,    
150                gras_msgtype_t   msgt_want,
151                gras_socket_t   *expeditor,
152                void            *payload) {
153   s_gras_msg_t msg;
154
155   gras_msg_wait_ext_(timeout,
156                      msgt_want, NULL,      NULL, NULL,
157                      &msg);
158
159   if (msgt_want->ctn_type) {
160     xbt_assert1(payload,
161                 "Message type '%s' convey a payload you must accept",
162                 msgt_want->name);
163   } else {
164     xbt_assert1(!payload,
165                 "No payload was declared for message type '%s'",
166                 msgt_want->name);
167   }
168
169   if (payload) {
170     memcpy(payload,msg.payl,msg.payl_size);
171     free(msg.payl);
172   }
173
174   if (expeditor)
175     *expeditor = msg.expe;
176 }
177
178 static int gras_msg_wait_or_filter(gras_msg_t msg, void *ctx) {
179   xbt_dynar_t dyn=(xbt_dynar_t)ctx;
180   int res =  xbt_dynar_member(dyn,msg->type);
181   if (res)
182     VERB1("Got matching message (type=%s)",msg->type->name);
183   else
184     VERB0("Got message not matching our expectations");
185   return res;
186 }
187 /** \brief Waits for a message to come in over a given socket. 
188  *
189  * @param timeout: How long should we wait for this message.
190  * @param msgt_want: a dynar containing all accepted message type
191  * @param[out] ctx: the context of received message (in case it's a RPC call we want to answer to)
192  * @param[out] msgt_got: indice in the dynar of the type of the received message 
193  * @param[out] payload: where to write the payload of the incomming message
194  * @return the error code (or no_error).
195  *
196  * Every message of a type not in the accepted list received before the one
197  * waited will be queued and used by subsequent call to this function or
198  * gras_msg_handle().
199  *
200  * If you are interested in the context, pass the address of a s_gras_msg_cb_ctx_t variable.
201  */
202 void gras_msg_wait_or(double         timeout,
203                       xbt_dynar_t    msgt_want,
204                       gras_msg_cb_ctx_t *ctx,
205                       int           *msgt_got,
206                       void          *payload) {
207   s_gras_msg_t msg;
208
209   VERB1("Wait %f seconds for several message types",timeout);
210   gras_msg_wait_ext_(timeout,
211                      NULL, NULL,      
212                      &gras_msg_wait_or_filter, (void*)msgt_want,
213                      &msg);
214
215   if (msg.type->ctn_type) {
216     xbt_assert1(payload,
217                 "Message type '%s' convey a payload you must accept",
218                 msg.type->name);
219   } /* don't check the other side since some of the types may have a payload */
220
221   if (payload && msg.type->ctn_type) {
222     memcpy(payload,msg.payl,msg.payl_size);
223     free(msg.payl);
224   }
225
226   if (ctx) 
227     *ctx=gras_msg_cb_ctx_new(msg.expe, msg.type, msg.ID,
228                              (msg.kind == e_gras_msg_kind_rpccall), 60);
229
230   if (msgt_got)
231     *msgt_got = xbt_dynar_search(msgt_want,msg.type);
232 }
233
234
235 /** \brief Send the data pointed by \a payload as a message of type
236  * \a msgtype to the peer \a sock */
237 void
238 gras_msg_send_(gras_socket_t   sock,
239               gras_msgtype_t  msgtype,
240               void           *payload) {
241
242   if (msgtype->ctn_type) {
243     xbt_assert1(payload,
244                 "Message type '%s' convey a payload you must provide",
245                 msgtype->name);
246   } else {
247     xbt_assert1(!payload,
248                 "No payload was declared for message type '%s'",
249                 msgtype->name);
250   }
251
252   DEBUG2("Send a oneway message of type '%s'. Payload=%p",
253          msgtype->name,payload);
254   gras_msg_send_ext(sock, e_gras_msg_kind_oneway,0, msgtype, payload);
255   VERB2("Sent a oneway message of type '%s'. Payload=%p",
256         msgtype->name,payload);
257 }
258
259 /** @brief Handle all messages arriving within the given period
260  *
261  * @param period: How long to wait for incoming messages (in seconds)
262  *
263  * Messages are dealed with just like gras_msg_handle() would do. The
264  * difference is that gras_msg_handle() handles at most one message (or wait up
265  * to timeout second when no message arrives) while this function handles any
266  * amount of messages, and lasts the given period in any case.
267  */
268 void 
269 gras_msg_handleall(double period) {
270   xbt_ex_t e;
271   double begin=gras_os_time();
272   double now;
273
274   do {
275     now=gras_os_time();
276     TRY{
277       if (period - now + begin > 0)
278         gras_msg_handle(period - now + begin);
279     } CATCH(e) {
280       if (e.category != timeout_error) 
281         RETHROW0("Error while waiting for messages: %s");
282       xbt_ex_free(e);
283     }
284   } while (now - begin < period);
285 }
286
287 /** @brief Handle an incomming message or timer (or wait up to \a timeOut seconds)
288  *
289  * @param timeOut: How long to wait for incoming messages (in seconds)
290  * @return the error code (or no_error).
291  *
292  * Messages are passed to the callbacks. See also gras_msg_handleall().
293  */
294 void
295 gras_msg_handle(double timeOut) {
296   
297   double          untiltimer;
298    
299   int             cpt;
300   int volatile ran_ok;
301
302   s_gras_msg_t    msg;
303
304   gras_msg_procdata_t pd=(gras_msg_procdata_t)gras_libdata_by_id(gras_msg_libdata_id);
305   gras_cblist_t  *list=NULL;
306   gras_msg_cb_t       cb;
307   s_gras_msg_cb_ctx_t ctx;
308    
309   int timerexpected, timeouted;
310   xbt_ex_t e;
311
312   VERB1("Handling message within the next %.2fs",timeOut);
313   
314   untiltimer = gras_msg_timer_handle();
315   DEBUG1("Next timer in %f sec", untiltimer);
316   if (untiltimer == 0.0) {
317      /* A timer was already elapsed and handled */
318      return;
319   }
320   if (untiltimer != -1.0) {
321      timerexpected = 1;
322      timeOut = MIN(timeOut, untiltimer);
323   } else {
324      timerexpected = 0;
325   }
326    
327   /* get a message (from the queue or from the net) */
328   timeouted = 0;
329   if (xbt_dynar_length(pd->msg_queue)) {
330     DEBUG0("Get a message from the queue");
331     xbt_dynar_shift(pd->msg_queue,&msg);
332   } else {
333     TRY {
334       msg.expe = gras_trp_select(timeOut);
335     } CATCH(e) {
336       if (e.category != timeout_error)
337         RETHROW;
338       xbt_ex_free(e);
339       timeouted = 1;
340     }
341
342     if (!timeouted) {
343       TRY {
344         /* FIXME: if not the right kind, queue it and recall ourself or goto >:-) */
345         gras_msg_recv(msg.expe, &msg);
346         DEBUG1("Received a msg from the socket kind:%s",
347                e_gras_msg_kind_names[msg.kind]);
348     
349       } CATCH(e) {
350         RETHROW4("Error while receiving a message on select()ed socket %p to [%s]%s:%d: %s",
351                  msg.expe,
352                  gras_socket_peer_proc(msg.expe),gras_socket_peer_name(msg.expe),
353                  gras_socket_peer_port(msg.expe));
354       }
355     }
356   }
357
358   if (timeouted) {
359      if (timerexpected) {
360           
361         /* A timer elapsed before the arrival of any message even if we select()ed a bit */
362         untiltimer = gras_msg_timer_handle();
363         if (untiltimer == 0.0) {
364           /* we served a timer, we're done */
365           return;
366         } else {
367            xbt_assert1(untiltimer>0, "Negative timer (%f). I'm 'puzzeled'", untiltimer);
368            WARN1("No timer elapsed, in contrary to expectations (next in %f sec)",
369                   untiltimer);
370            THROW1(timeout_error,0,
371                   "No timer elapsed, in contrary to expectations (next in %f sec)",
372                   untiltimer);
373         }
374         
375      } else {
376         /* select timeouted, and no timer elapsed. Nothing to do */
377        THROW1(timeout_error, 0, "No new message or timer (delay was %f)",
378               timeOut);
379      }
380      
381   }
382    
383   /* A message was already there or arrived in the meanwhile. handle it */
384   xbt_dynar_foreach(pd->cbl_list,cpt,list) {
385     if (list->id == msg.type->code) {
386       break;
387     } else {
388       list=NULL;
389     }
390   }
391   if (!list) {
392     INFO3("No callback for message '%s' from %s:%d. Queue it for later gras_msg_wait() use.",
393           msg.type->name,
394           gras_socket_peer_name(msg.expe),gras_socket_peer_port(msg.expe));
395     xbt_dynar_push(pd->msg_waitqueue,&msg);
396     return; /* FIXME: maybe we should call ourselves again until the end of the timer or a proper msg is got */
397   }
398   
399   ctx.expeditor = msg.expe;
400   ctx.ID = msg.ID;
401   ctx.msgtype = msg.type;
402   ctx.answer_due = (msg.kind == e_gras_msg_kind_rpccall);
403
404   switch (msg.kind) {
405   case e_gras_msg_kind_oneway:
406   case e_gras_msg_kind_rpccall:
407     ran_ok=0;
408     TRY {
409       xbt_dynar_foreach(list->cbs,cpt,cb) { 
410         if (!ran_ok) {
411           DEBUG4("Use the callback #%d (@%p) for incomming msg %s (payload_size=%d)",
412                 cpt+1,cb,msg.type->name,msg.payl_size);
413           if (!(*cb)(&ctx,msg.payl)) {
414             /* cb handled the message */
415             free(msg.payl);
416             ran_ok = 1;
417           }
418         }
419       }
420     } CATCH(e) {
421       free(msg.payl);
422       if (msg.type->kind == e_gras_msg_kind_rpccall) {
423         char *old_file=e.file;
424         /* The callback raised an exception, propagate it on the network */
425         if (!e.remote) { 
426           /* Make sure we reduce the file name to its basename to avoid issues in tests */
427           char *new_file=strrchr(e.file,'/');
428           if (new_file)
429              e.file = new_file;
430           /* the exception is born on this machine */
431           e.host = (char*)gras_os_myname();
432           xbt_ex_setup_backtrace(&e);
433         } 
434         VERB5("Propagate %s exception ('%s') from '%s' RPC cb back to %s:%d",
435               (e.remote ? "remote" : "local"),
436               e.msg,
437               msg.type->name,
438               gras_socket_peer_name(msg.expe),
439               gras_socket_peer_port(msg.expe));
440         gras_msg_send_ext(msg.expe, e_gras_msg_kind_rpcerror,
441                           msg.ID, msg.type, &e);
442         e.file=old_file;
443         xbt_ex_free(e);
444         ctx.answer_due = 0;
445         ran_ok=1;
446       } else {
447         RETHROW0("Callback raised an exception: %s");
448       }
449     }
450     xbt_assert0(!(ctx.answer_due),
451                 "RPC callback didn't call gras_msg_rpcreturn");
452
453     if (!ran_ok)
454       THROW1(mismatch_error,0,
455              "Message '%s' refused by all registered callbacks", msg.type->name);
456     /* FIXME: gras_datadesc_free not implemented => leaking the payload */
457     break;
458
459
460   case e_gras_msg_kind_rpcanswer:
461     INFO1("Unexpected RPC answer discarded (type: %s)", msg.type->name);
462     WARN0("FIXME: gras_datadesc_free not implemented => leaking the payload");
463     return;
464
465   case e_gras_msg_kind_rpcerror:
466     INFO1("Unexpected RPC error discarded (type: %s)", msg.type->name);
467     WARN0("FIXME: gras_datadesc_free not implemented => leaking the payload");
468     return;
469
470   default:
471     THROW1(unknown_error,0,
472            "Cannot handle messages of kind %d yet",msg.type->kind);
473   }
474
475 }
476