Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Kill old $Id$ command dating from CVS
[simgrid.git] / src / gras / Msg / gras_msg_exchange.c
1 /* gras message exchanges                                                   */
2
3 /* Copyright (c) 2003-2009. The SimGrid Team. All rights reserved.          */
4
5 /* This program is free software; you can redistribute it and/or modify it
6  * under the terms of the license (GNU LGPL) which comes with this package. */
7
8 #include "xbt/ex.h"
9 #include "xbt/ex_interface.h"
10 #include "gras/Msg/msg_private.h"
11 #include "gras/Virtu/virtu_interface.h"
12
13 XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(gras_msg);
14
15
16 char _GRAS_header[6];
17 const char *e_gras_msg_kind_names[e_gras_msg_kind_count] =
18   { "UNKNOWN", "ONEWAY", "RPC call", "RPC answer", "RPC error" };
19
20
21 /** \brief Waits for a message to come in over a given socket.
22  *
23  * @param timeout: How long should we wait for this message.
24  * @param msgt_want: type of awaited msg (or NULL if I'm enclined to accept any message)
25  * @param expe_want: awaited expeditot (match on hostname, not port; NULL if not relevant)
26  * @param filter: function returning true or false when passed a payload. Messages for which it returns false are not selected. (NULL if not relevant)
27  * @param filter_ctx: context passed as second argument of the filter (a pattern to match?)
28  * @param[out] msg_got: where to write the message we got
29  *
30  * Every message of another type received before the one waited will be queued
31  * and used by subsequent call to this function or gras_msg_handle().
32  */
33
34 void
35 gras_msg_wait_ext_(double timeout,
36                    gras_msgtype_t msgt_want,
37                    gras_socket_t expe_want,
38                    gras_msg_filter_t filter,
39                    void *filter_ctx, gras_msg_t msg_got)
40 {
41
42   s_gras_msg_t msg;
43   double start, now;
44   gras_msg_procdata_t pd =
45     (gras_msg_procdata_t) gras_libdata_by_id(gras_msg_libdata_id);
46   unsigned int cpt;
47
48   xbt_assert0(msg_got, "msg_got is an output parameter");
49
50   start = gras_os_time();
51   VERB2("Waiting for message '%s' for %fs",
52         msgt_want ? msgt_want->name : "(any)", timeout);
53
54   xbt_dynar_foreach(pd->msg_waitqueue, cpt, msg) {
55     if ((!msgt_want || (msg.type->code == msgt_want->code))
56         && (!expe_want || (!strcmp(gras_socket_peer_name(msg.expe),
57                                    gras_socket_peer_name(expe_want))))
58         && (!filter || filter(&msg, filter_ctx))) {
59
60       memcpy(msg_got, &msg, sizeof(s_gras_msg_t));
61       xbt_dynar_cursor_rm(pd->msg_waitqueue, &cpt);
62       VERB0("The waited message was queued");
63       return;
64     }
65   }
66
67   xbt_dynar_foreach(pd->msg_queue, cpt, msg) {
68     if ((!msgt_want || (msg.type->code == msgt_want->code))
69         && (!expe_want || (!strcmp(gras_socket_peer_name(msg.expe),
70                                    gras_socket_peer_name(expe_want))))
71         && (!filter || filter(&msg, filter_ctx))) {
72
73       memcpy(msg_got, &msg, sizeof(s_gras_msg_t));
74       xbt_dynar_cursor_rm(pd->msg_queue, &cpt);
75       VERB0("The waited message was queued");
76       return;
77     }
78   }
79
80   while (1) {
81     int need_restart;
82     xbt_ex_t e;
83
84   restart_receive:             /* Goto here when the receive of a message failed */
85     need_restart = 0;
86     now = gras_os_time();
87     memset(&msg, 0, sizeof(msg));
88
89     TRY {
90       xbt_queue_shift_timed(pd->msg_received, &msg,
91                             timeout ? timeout - now + start : 0);
92     }
93     CATCH(e) {
94       if (e.category == system_error &&
95           !strncmp("Socket closed by remote side", e.msg,
96                    strlen("Socket closed by remote side"))) {
97         xbt_ex_free(e);
98         need_restart = 1;
99       } else {
100         RETHROW;
101       }
102     }
103     if (need_restart)
104       goto restart_receive;
105
106     DEBUG0("Got a message from the socket");
107
108     if ((!msgt_want || (msg.type->code == msgt_want->code))
109         && (!expe_want || (!strcmp(gras_socket_peer_name(msg.expe),
110                                    gras_socket_peer_name(expe_want))))
111         && (!filter || filter(&msg, filter_ctx))) {
112
113       memcpy(msg_got, &msg, sizeof(s_gras_msg_t));
114       DEBUG0("Message matches expectations. Use it.");
115       return;
116     }
117     DEBUG0("Message does not match expectations. Queue it.");
118
119     /* not expected msg type. Queue it for later */
120     xbt_dynar_push(pd->msg_queue, &msg);
121
122     now = gras_os_time();
123     if (now - start + 0.001 > timeout) {
124       THROW1(timeout_error, now - start + 0.001 - timeout,
125              "Timeout while waiting for msg '%s'",
126              msgt_want ? msgt_want->name : "(any)");
127     }
128   }
129
130   THROW_IMPOSSIBLE;
131 }
132
133 /** \brief Waits for a message to come in over a given socket.
134  *
135  * @param timeout: How long should we wait for this message.
136  * @param msgt_want: type of awaited msg
137  * @param[out] expeditor: where to create a socket to answer the incomming message
138  * @param[out] payload: where to write the payload of the incomming message
139  * @return the error code (or no_error).
140  *
141  * Every message of another type received before the one waited will be queued
142  * and used by subsequent call to this function or gras_msg_handle().
143  */
144 void
145 gras_msg_wait_(double timeout,
146                gras_msgtype_t msgt_want,
147                gras_socket_t * expeditor, void *payload)
148 {
149   s_gras_msg_t msg;
150
151   gras_msg_wait_ext_(timeout, msgt_want, NULL, NULL, NULL, &msg);
152
153   if (msgt_want->ctn_type) {
154     xbt_assert1(payload,
155                 "Message type '%s' convey a payload you must accept",
156                 msgt_want->name);
157   } else {
158     xbt_assert1(!payload,
159                 "No payload was declared for message type '%s'",
160                 msgt_want->name);
161   }
162
163   if (payload) {
164     memcpy(payload, msg.payl, msg.payl_size);
165     free(msg.payl);
166   }
167
168   if (expeditor)
169     *expeditor = msg.expe;
170 }
171
172 static int gras_msg_wait_or_filter(gras_msg_t msg, void *ctx)
173 {
174   xbt_dynar_t dyn = (xbt_dynar_t) ctx;
175   int res = xbt_dynar_member(dyn, msg->type);
176   if (res)
177     VERB1("Got matching message (type=%s)", msg->type->name);
178   else
179     VERB0("Got message not matching our expectations");
180   return res;
181 }
182
183 /** \brief Waits for a message to come in over a given socket.
184  *
185  * @param timeout: How long should we wait for this message.
186  * @param msgt_want: a dynar containing all accepted message type
187  * @param[out] ctx: the context of received message (in case it's a RPC call we want to answer to)
188  * @param[out] msgt_got: indice in the dynar of the type of the received message
189  * @param[out] payload: where to write the payload of the incomming message
190  * @return the error code (or no_error).
191  *
192  * Every message of a type not in the accepted list received before the one
193  * waited will be queued and used by subsequent call to this function or
194  * gras_msg_handle().
195  *
196  * If you are interested in the context, pass the address of a s_gras_msg_cb_ctx_t variable.
197  */
198 void gras_msg_wait_or(double timeout,
199                       xbt_dynar_t msgt_want,
200                       gras_msg_cb_ctx_t * ctx, int *msgt_got, void *payload)
201 {
202   s_gras_msg_t msg;
203
204   VERB1("Wait %f seconds for several message types", timeout);
205   gras_msg_wait_ext_(timeout,
206                      NULL, NULL,
207                      &gras_msg_wait_or_filter, (void *) msgt_want, &msg);
208
209   if (msg.type->ctn_type) {
210     xbt_assert1(payload,
211                 "Message type '%s' convey a payload you must accept",
212                 msg.type->name);
213   }
214   /* don't check the other side since some of the types may have a payload */
215   if (payload && msg.type->ctn_type) {
216     memcpy(payload, msg.payl, msg.payl_size);
217     free(msg.payl);
218   }
219
220   if (ctx)
221     *ctx = gras_msg_cb_ctx_new(msg.expe, msg.type, msg.ID,
222                                (msg.kind == e_gras_msg_kind_rpccall), 60);
223
224   if (msgt_got)
225     *msgt_got = xbt_dynar_search(msgt_want, msg.type);
226 }
227
228
229 /** \brief Send the data pointed by \a payload as a message of type
230  * \a msgtype to the peer \a sock */
231 void gras_msg_send_(gras_socket_t sock, gras_msgtype_t msgtype, void *payload)
232 {
233
234   if (msgtype->ctn_type) {
235     xbt_assert1(payload,
236                 "Message type '%s' convey a payload you must provide",
237                 msgtype->name);
238   } else {
239     xbt_assert1(!payload,
240                 "No payload was declared for message type '%s'",
241                 msgtype->name);
242   }
243
244   DEBUG2("Send a oneway message of type '%s'. Payload=%p",
245          msgtype->name, payload);
246   gras_msg_send_ext(sock, e_gras_msg_kind_oneway, 0, msgtype, payload);
247   VERB2("Sent a oneway message of type '%s'. Payload=%p",
248         msgtype->name, payload);
249 }
250
251 /** @brief Handle all messages arriving within the given period
252  *
253  * @param period: How long to wait for incoming messages (in seconds)
254  *
255  * Messages are dealed with just like gras_msg_handle() would do. The
256  * difference is that gras_msg_handle() handles at most one message (or wait up
257  * to timeout second when no message arrives) while this function handles any
258  * amount of messages, and lasts the given period in any case.
259  */
260 void gras_msg_handleall(double period)
261 {
262   xbt_ex_t e;
263   double begin = gras_os_time();
264   double now;
265
266   do {
267     now = gras_os_time();
268     TRY {
269       if (period - now + begin > 0)
270         gras_msg_handle(period - now + begin);
271     }
272     CATCH(e) {
273       if (e.category != timeout_error)
274         RETHROW0("Error while waiting for messages: %s");
275       xbt_ex_free(e);
276     }
277     /* Epsilon to avoid numerical stability issues were the waited interval is so small that the global clock cannot notice the increment */
278   } while (period - now + begin > 0);
279 }
280
281 /** @brief Handle an incomming message or timer (or wait up to \a timeOut seconds)
282  *
283  * @param timeOut: How long to wait for incoming messages (in seconds)
284  * @return the error code (or no_error).
285  *
286  * Any message arriving in the given interval is passed to the callbacks.
287  *
288  * @sa gras_msg_handleall().
289  */
290 void gras_msg_handle(double timeOut)
291 {
292
293   double untiltimer;
294
295   unsigned int cpt;
296   int volatile ran_ok;
297
298   s_gras_msg_t msg;
299
300   gras_msg_procdata_t pd =
301     (gras_msg_procdata_t) gras_libdata_by_id(gras_msg_libdata_id);
302   gras_cblist_t *list = NULL;
303   gras_msg_cb_t cb;
304   s_gras_msg_cb_ctx_t ctx;
305
306   int timerexpected, timeouted;
307   xbt_ex_t e;
308
309   VERB1("Handling message within the next %.2fs", timeOut);
310
311   untiltimer = gras_msg_timer_handle();
312   DEBUG1("Next timer in %f sec", untiltimer);
313   if (untiltimer == 0.0) {
314     /* A timer was already elapsed and handled */
315     return;
316   }
317   if (untiltimer != -1.0) {
318     timerexpected = 1;
319     timeOut = MIN(timeOut, untiltimer);
320   } else {
321     timerexpected = 0;
322   }
323
324   /* get a message (from the queue or from the net) */
325   timeouted = 0;
326   if (xbt_dynar_length(pd->msg_queue)) {
327     DEBUG0("Get a message from the queue");
328     xbt_dynar_shift(pd->msg_queue, &msg);
329   } else {
330     TRY {
331       xbt_queue_shift_timed(pd->msg_received, &msg, timeOut);
332       //      msg.expe = gras_trp_select(timeOut);
333     }
334     CATCH(e) {
335       if (e.category != timeout_error)
336         RETHROW;
337       DEBUG0("Damn. Timeout while getting a message from the queue");
338       xbt_ex_free(e);
339       timeouted = 1;
340     }
341   }
342
343   if (timeouted) {
344     if (timerexpected) {
345
346       /* A timer elapsed before the arrival of any message even if we select()ed a bit */
347       untiltimer = gras_msg_timer_handle();
348       if (untiltimer == 0.0) {
349         /* we served a timer, we're done */
350         return;
351       } else {
352         xbt_assert1(untiltimer > 0, "Negative timer (%f). I'm 'puzzeled'",
353                     untiltimer);
354         WARN1
355           ("No timer elapsed, in contrary to expectations (next in %f sec)",
356            untiltimer);
357         THROW1(timeout_error, 0,
358                "No timer elapsed, in contrary to expectations (next in %f sec)",
359                untiltimer);
360       }
361
362     } else {
363       /* select timeouted, and no timer elapsed. Nothing to do */
364       THROW1(timeout_error, 0, "No new message or timer (delay was %f)",
365              timeOut);
366     }
367
368   }
369
370   /* A message was already there or arrived in the meanwhile. handle it */
371   xbt_dynar_foreach(pd->cbl_list, cpt, list) {
372     if (list->id == msg.type->code) {
373       break;
374     } else {
375       list = NULL;
376     }
377   }
378   if (!list) {
379     INFO4
380       ("No callback for message '%s' (type:%s) from %s:%d. Queue it for later gras_msg_wait() use.",
381        msg.type->name, e_gras_msg_kind_names[msg.kind],
382        gras_socket_peer_name(msg.expe), gras_socket_peer_port(msg.expe));
383     xbt_dynar_push(pd->msg_waitqueue, &msg);
384     return;                     /* FIXME: maybe we should call ourselves again until the end of the timer or a proper msg is got */
385   }
386
387   ctx.expeditor = msg.expe;
388   ctx.ID = msg.ID;
389   ctx.msgtype = msg.type;
390   ctx.answer_due = (msg.kind == e_gras_msg_kind_rpccall);
391
392   switch (msg.kind) {
393   case e_gras_msg_kind_oneway:
394   case e_gras_msg_kind_rpccall:
395     ran_ok = 0;
396     TRY {
397       xbt_dynar_foreach(list->cbs, cpt, cb) {
398         if (!ran_ok) {
399           DEBUG4
400             ("Use the callback #%d (@%p) for incomming msg '%s' (payload_size=%d)",
401              cpt + 1, cb, msg.type->name, msg.payl_size);
402           if (!(*cb) (&ctx, msg.payl)) {
403             /* cb handled the message */
404             free(msg.payl);
405             ran_ok = 1;
406           }
407         }
408       }
409     }
410     CATCH(e) {
411       free(msg.payl);
412       if (msg.type->kind == e_gras_msg_kind_rpccall) {
413         char *old_file = e.file;
414         /* The callback raised an exception, propagate it on the network */
415         if (!e.remote) {
416           /* Make sure we reduce the file name to its basename to avoid issues in tests */
417           char *new_file = strrchr(e.file, '/');
418           if (new_file)
419             e.file = new_file;
420           /* the exception is born on this machine */
421           e.host = (char *) gras_os_myname();
422           xbt_ex_setup_backtrace(&e);
423         }
424         INFO5("Propagate %s exception ('%s') from '%s' RPC cb back to %s:%d",
425               (e.remote ? "remote" : "local"),
426               e.msg,
427               msg.type->name,
428               gras_socket_peer_name(msg.expe),
429               gras_socket_peer_port(msg.expe));
430         if (XBT_LOG_ISENABLED(gras_msg, xbt_log_priority_verbose))
431           xbt_ex_display(&e);
432         gras_msg_send_ext(msg.expe, e_gras_msg_kind_rpcerror,
433                           msg.ID, msg.type, &e);
434         e.file = old_file;
435         xbt_ex_free(e);
436         ctx.answer_due = 0;
437         ran_ok = 1;
438       } else {
439         RETHROW4
440           ("Callback #%d (@%p) to message '%s' (payload size: %d) raised an exception: %s",
441            cpt + 1, cb, msg.type->name, msg.payl_size);
442       }
443     }
444
445     xbt_assert1(!ctx.answer_due,
446                 "Bug in user code: RPC callback to message '%s' didn't call gras_msg_rpcreturn",
447                 msg.type->name);
448     if (ctx.answer_due)
449       CRITICAL1
450         ("BUGS BOTH IN USER CODE (RPC callback to message '%s' didn't call gras_msg_rpcreturn) "
451          "AND IN SIMGRID (process wasn't killed by an assert)",
452          msg.type->name);
453     if (!ran_ok)
454       THROW1(mismatch_error, 0,
455              "Message '%s' refused by all registered callbacks (maybe your callback misses a 'return 0' at the end)",
456              msg.type->name);
457     /* FIXME: gras_datadesc_free not implemented => leaking the payload */
458     break;
459
460
461   case e_gras_msg_kind_rpcanswer:
462     INFO3("Unexpected RPC answer discarded (type: %s; from:%s:%d)",
463           msg.type->name, gras_socket_peer_name(msg.expe),
464           gras_socket_peer_port(msg.expe));
465     WARN0("FIXME: gras_datadesc_free not implemented => leaking the payload");
466     return;
467
468   case e_gras_msg_kind_rpcerror:
469     INFO3("Unexpected RPC error discarded (type: %s; from:%s:%d)",
470           msg.type->name, gras_socket_peer_name(msg.expe),
471           gras_socket_peer_port(msg.expe));
472     WARN0("FIXME: gras_datadesc_free not implemented => leaking the payload");
473     return;
474
475   default:
476     THROW1(unknown_error, 0,
477            "Cannot handle messages of kind %d yet", msg.type->kind);
478   }
479
480 }