Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Add a indent rule to format the code in an uniform way, avoiding breaking the branche...
[simgrid.git] / src / gras / Msg / gras_msg_exchange.c
1 /* $Id$ */
2
3 /* gras message exchanges                                                   */
4
5 /* Copyright (c) 2003-2009. The SimGrid Team. All rights reserved.          */
6
7 /* This program is free software; you can redistribute it and/or modify it
8  * under the terms of the license (GNU LGPL) which comes with this package. */
9
10 #include "xbt/ex.h"
11 #include "xbt/ex_interface.h"
12 #include "gras/Msg/msg_private.h"
13 #include "gras/Virtu/virtu_interface.h"
14
15 XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(gras_msg);
16
17
18 char _GRAS_header[6];
19 const char *e_gras_msg_kind_names[e_gras_msg_kind_count]=
20 {"UNKNOWN","ONEWAY","RPC call","RPC answer","RPC error"};
21
22
23 /** \brief Waits for a message to come in over a given socket.
24  *
25  * @param timeout: How long should we wait for this message.
26  * @param msgt_want: type of awaited msg (or NULL if I'm enclined to accept any message)
27  * @param expe_want: awaited expeditot (match on hostname, not port; NULL if not relevant)
28  * @param filter: function returning true or false when passed a payload. Messages for which it returns false are not selected. (NULL if not relevant)
29  * @param filter_ctx: context passed as second argument of the filter (a pattern to match?)
30  * @param[out] msg_got: where to write the message we got
31  *
32  * Every message of another type received before the one waited will be queued
33  * and used by subsequent call to this function or gras_msg_handle().
34  */
35
36 void
37 gras_msg_wait_ext_(double           timeout,
38                 gras_msgtype_t   msgt_want,
39                 gras_socket_t    expe_want,
40                 gras_msg_filter_t filter,
41                 void             *filter_ctx,
42
43                 gras_msg_t       msg_got) {
44
45         s_gras_msg_t msg;
46         double start, now;
47         gras_msg_procdata_t pd=
48                 (gras_msg_procdata_t)gras_libdata_by_id(gras_msg_libdata_id);
49         unsigned int cpt;
50
51         xbt_assert0(msg_got,"msg_got is an output parameter");
52
53         start = gras_os_time();
54         VERB2("Waiting for message '%s' for %fs",msgt_want?msgt_want->name:"(any)", timeout);
55
56         xbt_dynar_foreach(pd->msg_waitqueue,cpt,msg){
57                 if ( (   !msgt_want || (msg.type->code == msgt_want->code))
58                                 && (!expe_want || (!strcmp( gras_socket_peer_name(msg.expe),
59                                                 gras_socket_peer_name(expe_want))))
60                                                 && (!filter || filter(&msg,filter_ctx))) {
61
62                         memcpy(msg_got,&msg,sizeof(s_gras_msg_t));
63                         xbt_dynar_cursor_rm(pd->msg_waitqueue, &cpt);
64                         VERB0("The waited message was queued");
65                         return;
66                 }
67         }
68
69         xbt_dynar_foreach(pd->msg_queue,cpt,msg){
70                 if ( (   !msgt_want || (msg.type->code == msgt_want->code))
71                                 && (!expe_want || (!strcmp( gras_socket_peer_name(msg.expe),
72                                                 gras_socket_peer_name(expe_want))))
73                                                 && (!filter || filter(&msg,filter_ctx))) {
74
75                         memcpy(msg_got,&msg,sizeof(s_gras_msg_t));
76                         xbt_dynar_cursor_rm(pd->msg_queue, &cpt);
77                         VERB0("The waited message was queued");
78                         return;
79                 }
80         }
81
82         while (1) {
83                 int need_restart;
84                 xbt_ex_t e;
85
86                 restart_receive: /* Goto here when the receive of a message failed */
87                 need_restart=0;
88                 now=gras_os_time();
89                 memset(&msg,sizeof(msg),0);
90
91                 TRY {
92                         xbt_queue_shift_timed(pd->msg_received,&msg,timeout ? timeout - now + start : 0);
93                 } CATCH(e) {
94                         if (e.category == system_error &&
95                                         !strncmp("Socket closed by remote side",e.msg,
96                                                         strlen("Socket closed by remote side"))) {
97                                 xbt_ex_free(e);
98                                 need_restart=1;
99                         }       else {
100                                 RETHROW;
101                         }
102                 }
103                 if (need_restart)
104                         goto restart_receive;
105
106                 DEBUG0("Got a message from the socket");
107
108                 if ( (   !msgt_want || (msg.type->code == msgt_want->code))
109                                 && (!expe_want || (!strcmp( gras_socket_peer_name(msg.expe),
110                                                 gras_socket_peer_name(expe_want))))
111                                                 && (!filter || filter(&msg,filter_ctx))) {
112
113                         memcpy(msg_got,&msg,sizeof(s_gras_msg_t));
114                         DEBUG0("Message matches expectations. Use it.");
115                         return;
116                 }
117                 DEBUG0("Message does not match expectations. Queue it.");
118
119                 /* not expected msg type. Queue it for later */
120                 xbt_dynar_push(pd->msg_queue,&msg);
121
122                 now=gras_os_time();
123                 if (now - start + 0.001 > timeout) {
124                         THROW1(timeout_error,  now-start+0.001-timeout,
125                                         "Timeout while waiting for msg '%s'",
126                                         msgt_want?msgt_want->name:"(any)");
127                 }
128         }
129
130         THROW_IMPOSSIBLE;
131 }
132 /** \brief Waits for a message to come in over a given socket.
133  *
134  * @param timeout: How long should we wait for this message.
135  * @param msgt_want: type of awaited msg
136  * @param[out] expeditor: where to create a socket to answer the incomming message
137  * @param[out] payload: where to write the payload of the incomming message
138  * @return the error code (or no_error).
139  *
140  * Every message of another type received before the one waited will be queued
141  * and used by subsequent call to this function or gras_msg_handle().
142  */
143 void
144 gras_msg_wait_(double           timeout,
145                 gras_msgtype_t   msgt_want,
146                 gras_socket_t   *expeditor,
147                 void            *payload) {
148         s_gras_msg_t msg;
149
150         gras_msg_wait_ext_(timeout,
151                         msgt_want, NULL,      NULL, NULL,
152                         &msg);
153
154         if (msgt_want->ctn_type) {
155                 xbt_assert1(payload,
156                                 "Message type '%s' convey a payload you must accept",
157                                 msgt_want->name);
158         } else {
159                 xbt_assert1(!payload,
160                                 "No payload was declared for message type '%s'",
161                                 msgt_want->name);
162         }
163
164         if (payload) {
165                 memcpy(payload,msg.payl,msg.payl_size);
166                 free(msg.payl);
167         }
168
169         if (expeditor)
170                 *expeditor = msg.expe;
171 }
172
173 static int gras_msg_wait_or_filter(gras_msg_t msg, void *ctx) {
174         xbt_dynar_t dyn=(xbt_dynar_t)ctx;
175         int res =  xbt_dynar_member(dyn,msg->type);
176         if (res)
177                 VERB1("Got matching message (type=%s)",msg->type->name);
178         else
179                 VERB0("Got message not matching our expectations");
180         return res;
181 }
182 /** \brief Waits for a message to come in over a given socket.
183  *
184  * @param timeout: How long should we wait for this message.
185  * @param msgt_want: a dynar containing all accepted message type
186  * @param[out] ctx: the context of received message (in case it's a RPC call we want to answer to)
187  * @param[out] msgt_got: indice in the dynar of the type of the received message
188  * @param[out] payload: where to write the payload of the incomming message
189  * @return the error code (or no_error).
190  *
191  * Every message of a type not in the accepted list received before the one
192  * waited will be queued and used by subsequent call to this function or
193  * gras_msg_handle().
194  *
195  * If you are interested in the context, pass the address of a s_gras_msg_cb_ctx_t variable.
196  */
197 void gras_msg_wait_or(double         timeout,
198                 xbt_dynar_t    msgt_want,
199                 gras_msg_cb_ctx_t *ctx,
200                 int           *msgt_got,
201                 void          *payload) {
202         s_gras_msg_t msg;
203
204         VERB1("Wait %f seconds for several message types",timeout);
205         gras_msg_wait_ext_(timeout,
206                         NULL, NULL,
207                         &gras_msg_wait_or_filter, (void*)msgt_want,
208                         &msg);
209
210         if (msg.type->ctn_type) {
211                 xbt_assert1(payload,
212                                 "Message type '%s' convey a payload you must accept",
213                                 msg.type->name);
214         } /* don't check the other side since some of the types may have a payload */
215
216         if (payload && msg.type->ctn_type) {
217                 memcpy(payload,msg.payl,msg.payl_size);
218                 free(msg.payl);
219         }
220
221         if (ctx)
222                 *ctx=gras_msg_cb_ctx_new(msg.expe, msg.type, msg.ID,
223                                 (msg.kind == e_gras_msg_kind_rpccall), 60);
224
225         if (msgt_got)
226                 *msgt_got = xbt_dynar_search(msgt_want,msg.type);
227 }
228
229
230 /** \brief Send the data pointed by \a payload as a message of type
231  * \a msgtype to the peer \a sock */
232 void
233 gras_msg_send_(gras_socket_t   sock,
234                 gras_msgtype_t  msgtype,
235                 void           *payload) {
236
237         if (msgtype->ctn_type) {
238                 xbt_assert1(payload,
239                                 "Message type '%s' convey a payload you must provide",
240                                 msgtype->name);
241         } else {
242                 xbt_assert1(!payload,
243                                 "No payload was declared for message type '%s'",
244                                 msgtype->name);
245         }
246
247         DEBUG2("Send a oneway message of type '%s'. Payload=%p",
248                         msgtype->name,payload);
249         gras_msg_send_ext(sock, e_gras_msg_kind_oneway,0, msgtype, payload);
250         VERB2("Sent a oneway message of type '%s'. Payload=%p",
251                         msgtype->name,payload);
252 }
253
254 /** @brief Handle all messages arriving within the given period
255  *
256  * @param period: How long to wait for incoming messages (in seconds)
257  *
258  * Messages are dealed with just like gras_msg_handle() would do. The
259  * difference is that gras_msg_handle() handles at most one message (or wait up
260  * to timeout second when no message arrives) while this function handles any
261  * amount of messages, and lasts the given period in any case.
262  */
263 void
264 gras_msg_handleall(double period) {
265         xbt_ex_t e;
266         double begin=gras_os_time();
267         double now;
268
269         do {
270                 now=gras_os_time();
271                 TRY{
272                         if (period - now + begin > 0)
273                                 gras_msg_handle(period - now + begin);
274                 } CATCH(e) {
275                         if (e.category != timeout_error)
276                                 RETHROW0("Error while waiting for messages: %s");
277                         xbt_ex_free(e);
278                 }
279                 /* Epsilon to avoid numerical stability issues were the waited interval is so small that the global clock cannot notice the increment */
280         } while (period - now + begin > 0);
281 }
282
283 /** @brief Handle an incomming message or timer (or wait up to \a timeOut seconds)
284  *
285  * @param timeOut: How long to wait for incoming messages (in seconds)
286  * @return the error code (or no_error).
287  *
288  * Any message arriving in the given interval is passed to the callbacks.
289  *
290  * @sa gras_msg_handleall().
291  */
292 void
293 gras_msg_handle(double timeOut) {
294
295         double          untiltimer;
296
297         unsigned int cpt;
298         int volatile ran_ok;
299
300         s_gras_msg_t    msg;
301
302         gras_msg_procdata_t pd=(gras_msg_procdata_t)gras_libdata_by_id(gras_msg_libdata_id);
303         gras_cblist_t  *list=NULL;
304         gras_msg_cb_t       cb;
305         s_gras_msg_cb_ctx_t ctx;
306
307         int timerexpected, timeouted;
308         xbt_ex_t e;
309
310         VERB1("Handling message within the next %.2fs",timeOut);
311
312         untiltimer = gras_msg_timer_handle();
313         DEBUG1("Next timer in %f sec", untiltimer);
314         if (untiltimer == 0.0) {
315                 /* A timer was already elapsed and handled */
316                 return;
317         }
318         if (untiltimer != -1.0) {
319                 timerexpected = 1;
320                 timeOut = MIN(timeOut, untiltimer);
321         } else {
322                 timerexpected = 0;
323         }
324
325         /* get a message (from the queue or from the net) */
326         timeouted = 0;
327         if (xbt_dynar_length(pd->msg_queue)) {
328                 DEBUG0("Get a message from the queue");
329                 xbt_dynar_shift(pd->msg_queue,&msg);
330         } else {
331                 TRY {
332                         xbt_queue_shift_timed(pd->msg_received,&msg,timeOut);
333                         //      msg.expe = gras_trp_select(timeOut);
334                 } CATCH(e) {
335                         if (e.category != timeout_error)
336                                 RETHROW;
337                         DEBUG0("Damn. Timeout while getting a message from the queue");
338                         xbt_ex_free(e);
339                         timeouted = 1;
340                 }
341         }
342
343         if (timeouted) {
344                 if (timerexpected) {
345
346                         /* A timer elapsed before the arrival of any message even if we select()ed a bit */
347                         untiltimer = gras_msg_timer_handle();
348                         if (untiltimer == 0.0) {
349                                 /* we served a timer, we're done */
350                                 return;
351                         } else {
352                                 xbt_assert1(untiltimer>0, "Negative timer (%f). I'm 'puzzeled'", untiltimer);
353                                 WARN1("No timer elapsed, in contrary to expectations (next in %f sec)",
354                                                 untiltimer);
355                                 THROW1(timeout_error,0,
356                                                 "No timer elapsed, in contrary to expectations (next in %f sec)",
357                                                 untiltimer);
358                         }
359
360                 } else {
361                         /* select timeouted, and no timer elapsed. Nothing to do */
362                         THROW1(timeout_error, 0, "No new message or timer (delay was %f)",
363                                         timeOut);
364                 }
365
366         }
367
368         /* A message was already there or arrived in the meanwhile. handle it */
369         xbt_dynar_foreach(pd->cbl_list,cpt,list) {
370                 if (list->id == msg.type->code) {
371                         break;
372                 } else {
373                         list=NULL;
374                 }
375         }
376         if (!list) {
377                 INFO4("No callback for message '%s' (type:%s) from %s:%d. Queue it for later gras_msg_wait() use.",
378                                 msg.type->name,e_gras_msg_kind_names[msg.kind],
379                                 gras_socket_peer_name(msg.expe),gras_socket_peer_port(msg.expe));
380                 xbt_dynar_push(pd->msg_waitqueue,&msg);
381                 return; /* FIXME: maybe we should call ourselves again until the end of the timer or a proper msg is got */
382         }
383
384         ctx.expeditor = msg.expe;
385         ctx.ID = msg.ID;
386         ctx.msgtype = msg.type;
387         ctx.answer_due = (msg.kind == e_gras_msg_kind_rpccall);
388
389         switch (msg.kind) {
390         case e_gras_msg_kind_oneway:
391         case e_gras_msg_kind_rpccall:
392                 ran_ok=0;
393                 TRY {
394                         xbt_dynar_foreach(list->cbs,cpt,cb) {
395                                 if (!ran_ok) {
396                                         DEBUG4("Use the callback #%d (@%p) for incomming msg '%s' (payload_size=%d)",
397                                                         cpt+1,cb,msg.type->name,msg.payl_size);
398                                         if (!(*cb)(&ctx,msg.payl)) {
399                                                 /* cb handled the message */
400                                                 free(msg.payl);
401                                                 ran_ok = 1;
402                                         }
403                                 }
404                         }
405                 } CATCH(e) {
406                         free(msg.payl);
407                         if (msg.type->kind == e_gras_msg_kind_rpccall) {
408                                 char *old_file=e.file;
409                                 /* The callback raised an exception, propagate it on the network */
410                                 if (!e.remote) {
411                                         /* Make sure we reduce the file name to its basename to avoid issues in tests */
412                                         char *new_file=strrchr(e.file,'/');
413                                         if (new_file)
414                                                 e.file = new_file;
415                                         /* the exception is born on this machine */
416                                         e.host = (char*)gras_os_myname();
417                                         xbt_ex_setup_backtrace(&e);
418                                 }
419                                 INFO5("Propagate %s exception ('%s') from '%s' RPC cb back to %s:%d",
420                                                 (e.remote ? "remote" : "local"),
421                                                 e.msg,
422                                                 msg.type->name,
423                                                 gras_socket_peer_name(msg.expe),
424                                                 gras_socket_peer_port(msg.expe));
425                                 if (XBT_LOG_ISENABLED(gras_msg,xbt_log_priority_verbose))
426                                         xbt_ex_display(&e);
427                                 gras_msg_send_ext(msg.expe, e_gras_msg_kind_rpcerror,
428                                                 msg.ID, msg.type, &e);
429                                 e.file=old_file;
430                                 xbt_ex_free(e);
431                                 ctx.answer_due = 0;
432                                 ran_ok=1;
433                         } else {
434                                 RETHROW4("Callback #%d (@%p) to message '%s' (payload size: %d) raised an exception: %s",
435                                                 cpt+1,cb,msg.type->name,msg.payl_size);
436                         }
437                 }
438
439                 xbt_assert1(! ctx.answer_due,
440                                 "Bug in user code: RPC callback to message '%s' didn't call gras_msg_rpcreturn",msg.type->name);
441                 if (ctx.answer_due)
442                         CRITICAL1("BUGS BOTH IN USER CODE (RPC callback to message '%s' didn't call gras_msg_rpcreturn) "
443                                         "AND IN SIMGRID (process wasn't killed by an assert)",msg.type->name);
444                 if (!ran_ok)
445                         THROW1(mismatch_error,0,
446                                         "Message '%s' refused by all registered callbacks (maybe your callback misses a 'return 0' at the end)", msg.type->name);
447                 /* FIXME: gras_datadesc_free not implemented => leaking the payload */
448                 break;
449
450
451         case e_gras_msg_kind_rpcanswer:
452                 INFO3("Unexpected RPC answer discarded (type: %s; from:%s:%d)", msg.type->name,
453                                 gras_socket_peer_name(msg.expe),gras_socket_peer_port(msg.expe));
454                 WARN0("FIXME: gras_datadesc_free not implemented => leaking the payload");
455                 return;
456
457         case e_gras_msg_kind_rpcerror:
458                 INFO3("Unexpected RPC error discarded (type: %s; from:%s:%d)", msg.type->name,
459                                 gras_socket_peer_name(msg.expe),gras_socket_peer_port(msg.expe));
460                 WARN0("FIXME: gras_datadesc_free not implemented => leaking the payload");
461                 return;
462
463         default:
464                 THROW1(unknown_error,0,
465                                 "Cannot handle messages of kind %d yet",msg.type->kind);
466         }
467
468 }
469