Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
ec226a7b94fca188be9f5ec078b9a7127fffa841
[simgrid.git] / src / msg / msg_gos.cpp
1 /* Copyright (c) 2004-2016. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "src/simix/smx_private.h" /* MSG_task_listen looks inside the rdv directly. Not clean. */
7 #include "msg_private.h"
8 #include "mc/mc.h"
9 #include "xbt/log.h"
10 #include "xbt/sysdep.h"
11
12 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(msg_gos, msg,
13                                 "Logging specific to MSG (gos)");
14
15 /** \ingroup msg_task_usage
16  * \brief Executes a task and waits for its termination.
17  *
18  * This function is used for describing the behavior of a process. It takes only one parameter.
19  * \param task a #msg_task_t to execute on the location on which the process is running.
20  * \return #MSG_OK if the task was successfully completed, #MSG_TASK_CANCELED or #MSG_HOST_FAILURE otherwise
21  */
22 msg_error_t MSG_task_execute(msg_task_t task)
23 {
24   /* TODO: add this to other locations */
25   msg_host_t host = MSG_process_get_host(MSG_process_self());
26   MSG_host_add_task(host, task);
27
28   msg_error_t ret = MSG_parallel_task_execute(task);
29
30   MSG_host_del_task(host, task);
31
32   return ret;
33 }
34
35 /** \ingroup msg_task_usage
36  * \brief Executes a parallel task and waits for its termination.
37  *
38  * \param task a #msg_task_t to execute on the location on which the process is running.
39  *
40  * \return #MSG_OK if the task was successfully completed, #MSG_TASK_CANCELED
41  * or #MSG_HOST_FAILURE otherwise
42  */
43 msg_error_t MSG_parallel_task_execute(msg_task_t task)
44 {
45   xbt_ex_t e;
46   simdata_task_t simdata = task->simdata;
47   simdata_process_t p_simdata = (simdata_process_t) SIMIX_process_self_get_data();
48   e_smx_state_t comp_state;
49   msg_error_t status = MSG_OK;
50
51   TRACE_msg_task_execute_start(task);
52
53   xbt_assert((!simdata->compute) && (task->simdata->isused == 0),
54              "This task is executed somewhere else. Go fix your code! %d", task->simdata->isused!=NULL);
55
56   XBT_DEBUG("Computing on %s", MSG_process_get_name(MSG_process_self()));
57
58   if (simdata->flops_amount == 0 && !simdata->host_nb) {
59     TRACE_msg_task_execute_end(task);
60     return MSG_OK;
61   }
62
63   TRY {
64     if (msg_global->debug_multiple_use)
65       MSG_BT(simdata->isused, "Using Backtrace");
66     else
67       simdata->isused = (void*)1;
68
69     if (simdata->host_nb > 0) {
70       simdata->compute = static_cast<simgrid::simix::Exec*>(
71           simcall_execution_parallel_start(task->name, simdata->host_nb,simdata->host_list,
72                                                        simdata->flops_parallel_amount, simdata->bytes_parallel_amount,
73                                                        1.0, -1.0));
74       XBT_DEBUG("Parallel execution action created: %p", simdata->compute);
75     } else {
76       unsigned long affinity_mask =
77          (unsigned long)(uintptr_t) xbt_dict_get_or_null_ext(simdata->affinity_mask_db, (char *) p_simdata->m_host,
78                                                              sizeof(msg_host_t));
79       XBT_DEBUG("execute %s@%s with affinity(0x%04lx)",
80                 MSG_task_get_name(task), MSG_host_get_name(p_simdata->m_host), affinity_mask);
81
82           simdata->compute = static_cast<simgrid::simix::Exec*>(
83               simcall_execution_start(task->name, simdata->flops_amount, simdata->priority,
84                                                  simdata->bound, affinity_mask));
85     }
86     simcall_set_category(simdata->compute, task->category);
87     p_simdata->waiting_action = simdata->compute;
88     comp_state = simcall_execution_wait(simdata->compute);
89
90     p_simdata->waiting_action = NULL;
91
92     if (msg_global->debug_multiple_use && simdata->isused!=0)
93       xbt_ex_free(*(xbt_ex_t*)simdata->isused);
94     simdata->isused = 0;
95
96     XBT_DEBUG("Execution task '%s' finished in state %d", task->name, (int)comp_state);
97   }
98   CATCH(e) {
99     switch (e.category) {
100     case cancel_error:
101       status = MSG_TASK_CANCELED;
102       break;
103     case host_error:
104       status = MSG_HOST_FAILURE;
105       break;
106     default:
107       RETHROW;
108     }
109     xbt_ex_free(e);
110   }
111   /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
112   simdata->flops_amount = 0.0;
113   simdata->comm = NULL;
114   simdata->compute = NULL;
115   TRACE_msg_task_execute_end(task);
116
117   MSG_RETURN(status);
118 }
119
120 /** \ingroup msg_task_usage
121  * \brief Sleep for the specified number of seconds
122  *
123  * Makes the current process sleep until \a time seconds have elapsed.
124  *
125  * \param nb_sec a number of second
126  */
127 msg_error_t MSG_process_sleep(double nb_sec)
128 {
129   xbt_ex_t e;
130   msg_error_t status = MSG_OK;
131   /*msg_process_t proc = MSG_process_self();*/
132
133   TRACE_msg_process_sleep_in(MSG_process_self());
134
135   TRY {
136     simcall_process_sleep(nb_sec);
137   }
138   CATCH(e) {
139     switch (e.category) {
140     case cancel_error:
141       XBT_DEBUG("According to the JAVA API, a sleep call should only deal with HostFailureException, WTF here ?"); 
142       // adsein: MSG_TASK_CANCELED is assigned when someone kills the process that made the sleep, this is not
143       // correct. For instance, when the node is turned off, the error should be MSG_HOST_FAILURE, which is by the way
144       // and according to the JAVA document, the only exception that can be triggered by MSG_Process_sleep call.
145       // To avoid possible impacts in the code, I just raised a host_failure exception for the moment in the JAVA code
146       // and did not change anythings at the C level.
147       // See comment in the jmsg_process.c file, function JNIEXPORT void JNICALL Java_org_simgrid_msg_Process_sleep(JNIEnv *env, jclass cls, jlong jmillis, jint jnanos) 
148       status = MSG_TASK_CANCELED;
149       break;
150     default:
151       RETHROW;
152     }
153     xbt_ex_free(e);
154   }
155
156   TRACE_msg_process_sleep_out(MSG_process_self());
157   MSG_RETURN(status);
158 }
159
160 /** \ingroup msg_task_usage
161  * \brief Receives a task from a mailbox.
162  *
163  * This is a blocking function, the execution flow will be blocked until the task is received. See #MSG_task_irecv
164  * for receiving tasks asynchronously.
165  *
166  * \param task a memory location for storing a #msg_task_t.
167  * \param alias name of the mailbox to receive the task from
168  *
169  * \return Returns
170  * #MSG_OK if the task was successfully received,
171  * #MSG_HOST_FAILURE, or #MSG_TRANSFER_FAILURE otherwise.
172  */
173 msg_error_t MSG_task_receive(msg_task_t * task, const char *alias)
174 {
175   return MSG_task_receive_with_timeout(task, alias, -1);
176 }
177
178 /** \ingroup msg_task_usage
179  * \brief Receives a task from a mailbox at a given rate.
180  *
181  * \param task a memory location for storing a #msg_task_t.
182  * \param alias name of the mailbox to receive the task from
183  * \param rate limit the reception to rate bandwidth
184  *
185  * \return Returns
186  * #MSG_OK if the task was successfully received,
187  * #MSG_HOST_FAILURE, or #MSG_TRANSFER_FAILURE otherwise.
188  */
189 msg_error_t MSG_task_receive_bounded(msg_task_t * task, const char *alias, double rate)
190 {
191   return MSG_task_receive_with_timeout_bounded(task, alias, -1, rate);
192 }
193
194 /** \ingroup msg_task_usage
195  * \brief Receives a task from a mailbox with a given timeout.
196  *
197  * This is a blocking function with a timeout, the execution flow will be blocked until the task is received or the
198  * timeout is achieved. See #MSG_task_irecv for receiving tasks asynchronously.  You can provide a -1 timeout
199  * to obtain an infinite timeout.
200  *
201  * \param task a memory location for storing a #msg_task_t.
202  * \param alias name of the mailbox to receive the task from
203  * \param timeout is the maximum wait time for completion (if -1, this call is the same as #MSG_task_receive)
204  *
205  * \return Returns
206  * #MSG_OK if the task was successfully received,
207  * #MSG_HOST_FAILURE, or #MSG_TRANSFER_FAILURE, or #MSG_TIMEOUT otherwise.
208  */
209 msg_error_t MSG_task_receive_with_timeout(msg_task_t * task, const char *alias, double timeout)
210 {
211   return MSG_task_receive_ext(task, alias, timeout, NULL);
212 }
213
214 /** \ingroup msg_task_usage
215  * \brief Receives a task from a mailbox with a given timeout and at a given rate.
216  *
217  * \param task a memory location for storing a #msg_task_t.
218  * \param alias name of the mailbox to receive the task from
219  * \param timeout is the maximum wait time for completion (if -1, this call is the same as #MSG_task_receive)
220  *  \param rate limit the reception to rate bandwidth
221  *
222  * \return Returns
223  * #MSG_OK if the task was successfully received,
224  * #MSG_HOST_FAILURE, or #MSG_TRANSFER_FAILURE, or #MSG_TIMEOUT otherwise.
225  */
226 msg_error_t MSG_task_receive_with_timeout_bounded(msg_task_t * task, const char *alias, double timeout,double rate)
227 {
228   return MSG_task_receive_ext_bounded(task, alias, timeout, NULL, rate);
229 }
230
231 /** \ingroup msg_task_usage
232  * \brief Receives a task from a mailbox from a specific host with a given timeout.
233  *
234  * This is a blocking function with a timeout, the execution flow will be blocked until the task is received or the
235  * timeout is achieved. See #MSG_task_irecv for receiving tasks asynchronously. You can provide a -1 timeout
236  * to obtain an infinite timeout.
237  *
238  * \param task a memory location for storing a #msg_task_t.
239  * \param alias name of the mailbox to receive the task from
240  * \param timeout is the maximum wait time for completion (provide -1 for no timeout)
241  * \param host a #msg_host_t host from where the task was sent
242  *
243  * \return Returns
244  * #MSG_OK if the task was successfully received,
245 * #MSG_HOST_FAILURE, or #MSG_TRANSFER_FAILURE, or #MSG_TIMEOUT otherwise.
246  */
247 msg_error_t MSG_task_receive_ext(msg_task_t * task, const char *alias, double timeout, msg_host_t host)
248 {
249   xbt_ex_t e;
250   msg_error_t ret = MSG_OK;
251   XBT_DEBUG("MSG_task_receive_ext: Trying to receive a message on mailbox '%s'", alias);
252   TRY {
253     ret = MSG_mailbox_get_task_ext(MSG_mailbox_get_by_alias(alias), task, host, timeout);
254   }
255   CATCH(e) {
256     switch (e.category) {
257     case cancel_error:          /* may be thrown by MSG_mailbox_get_by_alias */
258       ret = MSG_HOST_FAILURE;
259       break;
260     default:
261       RETHROW;
262     }
263     xbt_ex_free(e);
264   }
265   return ret;
266 }
267
268 /** \ingroup msg_task_usage
269  * \brief Receives a task from a mailbox from a specific host with a given timeout  and at a given rate.
270  *
271  * \param task a memory location for storing a #msg_task_t.
272  * \param alias name of the mailbox to receive the task from
273  * \param timeout is the maximum wait time for completion (provide -1 for no timeout)
274  * \param host a #msg_host_t host from where the task was sent
275  * \param rate limit the reception to rate bandwidth
276  *
277  * \return Returns
278  * #MSG_OK if the task was successfully received,
279 * #MSG_HOST_FAILURE, or #MSG_TRANSFER_FAILURE, or #MSG_TIMEOUT otherwise.
280  */
281 msg_error_t MSG_task_receive_ext_bounded(msg_task_t * task, const char *alias, double timeout, msg_host_t host,
282                                          double rate)
283 {
284   XBT_DEBUG("MSG_task_receive_ext: Trying to receive a message on mailbox '%s'", alias);
285   return MSG_mailbox_get_task_ext_bounded(MSG_mailbox_get_by_alias(alias), task, host, timeout, rate);
286 }
287
288 /* Internal function used to factorize code between MSG_task_isend_with_matching() and MSG_task_dsend(). */
289 static inline msg_comm_t MSG_task_isend_internal(msg_task_t task, const char *alias,
290                                                      int (*match_fun)(void*,void*, smx_synchro_t),
291                                                      void *match_data, void_f_pvoid_t cleanup, int detached)
292 {
293   simdata_task_t t_simdata = NULL;
294   msg_process_t process = MSG_process_self();
295   msg_mailbox_t mailbox = MSG_mailbox_get_by_alias(alias);
296   int call_end = TRACE_msg_task_put_start(task);
297
298   /* Prepare the task to send */
299   t_simdata = task->simdata;
300   t_simdata->sender = process;
301   t_simdata->source = ((simdata_process_t) SIMIX_process_self_get_data())->m_host;
302
303   if (t_simdata->isused != 0) {
304     if (msg_global->debug_multiple_use){
305       XBT_ERROR("This task is already used in there:");
306       xbt_backtrace_display((xbt_ex_t*) t_simdata->isused);
307       XBT_ERROR("And you try to reuse it from here:");
308       xbt_backtrace_display_current();
309     } else {
310       xbt_assert(t_simdata->isused == 0,
311                  "This task is still being used somewhere else. You cannot send it now. Go fix your code!"
312                  "(use --cfg=msg/debug-multiple-use:on to get the backtrace of the other process)");
313     }
314   }
315
316   if (msg_global->debug_multiple_use)
317     MSG_BT(t_simdata->isused, "Using Backtrace");
318   else
319     t_simdata->isused = (void*)1;
320   t_simdata->comm = NULL;
321   msg_global->sent_msg++;
322
323   /* Send it by calling SIMIX network layer */
324   smx_synchro_t act = simcall_comm_isend(SIMIX_process_self(), mailbox, t_simdata->bytes_amount, t_simdata->rate,
325                                          task, sizeof(void *), match_fun, cleanup, NULL, match_data,detached);
326   t_simdata->comm = static_cast<simgrid::simix::Comm*>(act); /* FIXME: is the field t_simdata->comm still useful? */
327
328   msg_comm_t comm;
329   if (detached) {
330     comm = NULL;
331   } else {
332     comm = xbt_new0(s_msg_comm_t, 1);
333     comm->task_sent = task;
334     comm->task_received = NULL;
335     comm->status = MSG_OK;
336     comm->s_comm = act;
337   }
338
339   if (TRACE_is_enabled())
340     simcall_set_category(act, task->category);
341   if (call_end)
342     TRACE_msg_task_put_end();
343
344   return comm;
345 }
346
347 /** \ingroup msg_task_usage
348  * \brief Sends a task on a mailbox.
349  *
350  * This is a non blocking function: use MSG_comm_wait() or MSG_comm_test() to end the communication.
351  *
352  * \param task a #msg_task_t to send on another location.
353  * \param alias name of the mailbox to sent the task to
354  * \return the msg_comm_t communication created
355  */
356 msg_comm_t MSG_task_isend(msg_task_t task, const char *alias)
357 {
358   return MSG_task_isend_internal(task, alias, NULL, NULL, NULL, 0);
359 }
360
361 /** \ingroup msg_task_usage
362  * \brief Sends a task on a mailbox with a maximum rate
363  *
364  * This is a non blocking function: use MSG_comm_wait() or MSG_comm_test() to end the communication. The maxrate
365  * parameter allows the application to limit the bandwidth utilization of network links when sending the task.
366  *
367  * \param task a #msg_task_t to send on another location.
368  * \param alias name of the mailbox to sent the task to
369  * \param maxrate the maximum communication rate for sending this task .
370  * \return the msg_comm_t communication created
371  */
372 msg_comm_t MSG_task_isend_bounded(msg_task_t task, const char *alias, double maxrate)
373 {
374   task->simdata->rate = maxrate;
375   return MSG_task_isend_internal(task, alias, NULL, NULL, NULL, 0);
376 }
377
378 /** \ingroup msg_task_usage
379  * \brief Sends a task on a mailbox, with support for matching requests
380  *
381  * This is a non blocking function: use MSG_comm_wait() or MSG_comm_test() to end the communication.
382  *
383  * \param task a #msg_task_t to send on another location.
384  * \param alias name of the mailbox to sent the task to
385  * \param match_fun boolean function which parameters are:
386  *        - match_data_provided_here
387  *        - match_data_provided_by_other_side_if_any
388  *        - the_smx_synchro_describing_the_other_side
389  * \param match_data user provided data passed to match_fun
390  * \return the msg_comm_t communication created
391  */
392 msg_comm_t MSG_task_isend_with_matching(msg_task_t task, const char *alias,
393                                         int (*match_fun)(void*, void*, smx_synchro_t), void *match_data)
394 {
395   return MSG_task_isend_internal(task, alias, match_fun, match_data, NULL, 0);
396 }
397
398 /** \ingroup msg_task_usage
399  * \brief Sends a task on a mailbox.
400  *
401  * This is a non blocking detached send function.
402  * Think of it as a best effort send. Keep in mind that the third parameter is only called if the communication fails.
403  * If the communication does work, it is responsibility of the receiver code to free anything related to the task, as
404  * usual. More details on this can be obtained on
405  * <a href="http://lists.gforge.inria.fr/pipermail/simgrid-user/2011-November/002649.html">this thread</a>
406  * in the SimGrid-user mailing list archive.
407  *
408  * \param task a #msg_task_t to send on another location.
409  * \param alias name of the mailbox to sent the task to
410  * \param cleanup a function to destroy the task if the communication fails, e.g. MSG_task_destroy
411  * (if NULL, no function will be called)
412  */
413 void MSG_task_dsend(msg_task_t task, const char *alias, void_f_pvoid_t cleanup)
414 {
415   MSG_task_isend_internal(task, alias, NULL, NULL, cleanup, 1);
416 }
417
418 /** \ingroup msg_task_usage
419  * \brief Sends a task on a mailbox with a maximal rate.
420  *
421  * This is a non blocking detached send function.
422  * Think of it as a best effort send. Keep in mind that the third parameter is only called if the communication fails.
423  * If the communication does work, it is responsibility of the receiver code to free anything related to the task, as
424  * usual. More details on this can be obtained on
425  * <a href="http://lists.gforge.inria.fr/pipermail/simgrid-user/2011-November/002649.html">this thread</a>
426  * in the SimGrid-user mailing list archive.
427  *
428  * \param task a #msg_task_t to send on another location.
429  * \param alias name of the mailbox to sent the task to
430  * \param cleanup a function to destroy the task if the
431  * communication fails, e.g. MSG_task_destroy
432  * (if NULL, no function will be called)
433  * \param maxrate the maximum communication rate for sending this task
434  *
435  */
436 void MSG_task_dsend_bounded(msg_task_t task, const char *alias, void_f_pvoid_t cleanup, double maxrate)
437 {
438   task->simdata->rate = maxrate;
439   MSG_task_dsend(task, alias, cleanup);
440 }
441
442 /** \ingroup msg_task_usage
443  * \brief Starts listening for receiving a task from an asynchronous communication.
444  *
445  * This is a non blocking function: use MSG_comm_wait() or MSG_comm_test() to end the communication.
446  *
447  * \param task a memory location for storing a #msg_task_t. has to be valid until the end of the communication.
448  * \param name of the mailbox to receive the task on
449  * \return the msg_comm_t communication created
450  */
451 msg_comm_t MSG_task_irecv(msg_task_t *task, const char *name)
452 {
453   return MSG_task_irecv_bounded(task, name, -1.0);
454 }
455
456 /** \ingroup msg_task_usage
457  * \brief Starts listening for receiving a task from an asynchronous communication at a given rate.
458  *
459  * \param task a memory location for storing a #msg_task_t. has to be valid until the end of the communication.
460  * \param name of the mailbox to receive the task on
461  * \param rate limit the bandwidth to the given rate
462  * \return the msg_comm_t communication created
463  */
464 msg_comm_t MSG_task_irecv_bounded(msg_task_t *task, const char *name, double rate)
465 {
466   smx_mailbox_t rdv = MSG_mailbox_get_by_alias(name);
467
468   /* FIXME: these functions are not traceable */
469   /* Sanity check */
470   xbt_assert(task, "Null pointer for the task storage");
471
472   if (*task)
473     XBT_CRITICAL("MSG_task_irecv() was asked to write in a non empty task struct.");
474
475   /* Try to receive it by calling SIMIX network layer */
476   msg_comm_t comm = xbt_new0(s_msg_comm_t, 1);
477   comm->task_sent = NULL;
478   comm->task_received = task;
479   comm->status = MSG_OK;
480   comm->s_comm = simcall_comm_irecv(MSG_process_self(), rdv, task, NULL, NULL, NULL, NULL, rate);
481
482   return comm;
483 }
484
485 /** \ingroup msg_task_usage
486  * \brief Checks whether a communication is done, and if yes, finalizes it.
487  * \param comm the communication to test
488  * \return TRUE if the communication is finished
489  * (but it may have failed, use MSG_comm_get_status() to know its status)
490  * or FALSE if the communication is not finished yet
491  * If the status is FALSE, don't forget to use MSG_process_sleep() after the test.
492  */
493 int MSG_comm_test(msg_comm_t comm)
494 {
495   xbt_ex_t e;
496   int finished = 0;
497
498   TRY {
499     finished = simcall_comm_test(comm->s_comm);
500
501     if (finished && comm->task_received != NULL) {
502       /* I am the receiver */
503       if (msg_global->debug_multiple_use && (*comm->task_received)->simdata->isused!=0)
504         xbt_ex_free(*(xbt_ex_t*)(*comm->task_received)->simdata->isused);
505       (*comm->task_received)->simdata->isused = 0;
506     }
507   }
508   CATCH(e) {
509     switch (e.category) {
510       case network_error:
511         comm->status = MSG_TRANSFER_FAILURE;
512         finished = 1;
513         break;
514       case timeout_error:
515         comm->status = MSG_TIMEOUT;
516         finished = 1;
517         break;
518       default:
519         RETHROW;
520     }
521     xbt_ex_free(e);
522   }
523
524   return finished;
525 }
526
527 /** \ingroup msg_task_usage
528  * \brief This function checks if a communication is finished.
529  * \param comms a vector of communications
530  * \return the position of the finished communication if any
531  * (but it may have failed, use MSG_comm_get_status() to know its status),
532  * or -1 if none is finished
533  */
534 int MSG_comm_testany(xbt_dynar_t comms)
535 {
536   xbt_ex_t e;
537   int finished_index = -1;
538
539   /* create the equivalent dynar with SIMIX objects */
540   xbt_dynar_t s_comms = xbt_dynar_new(sizeof(smx_synchro_t), NULL);
541   msg_comm_t comm;
542   unsigned int cursor;
543   xbt_dynar_foreach(comms, cursor, comm) {
544     xbt_dynar_push(s_comms, &comm->s_comm);
545   }
546
547   msg_error_t status = MSG_OK;
548   TRY {
549     finished_index = simcall_comm_testany(s_comms);
550   }
551   CATCH(e) {
552     switch (e.category) {
553       case network_error:
554         finished_index = e.value;
555         status = MSG_TRANSFER_FAILURE;
556         break;
557       case timeout_error:
558         finished_index = e.value;
559         status = MSG_TIMEOUT;
560         break;
561       default:
562         RETHROW;
563     }
564     xbt_ex_free(e);
565   }
566   xbt_dynar_free(&s_comms);
567
568   if (finished_index != -1) {
569     comm = xbt_dynar_get_as(comms, finished_index, msg_comm_t);
570     /* the communication is finished */
571     comm->status = status;
572
573     if (status == MSG_OK && comm->task_received != NULL) {
574       /* I am the receiver */
575       if (msg_global->debug_multiple_use && (*comm->task_received)->simdata->isused!=0)
576         xbt_ex_free(*(xbt_ex_t*)(*comm->task_received)->simdata->isused);
577       (*comm->task_received)->simdata->isused = 0;
578     }
579   }
580
581   return finished_index;
582 }
583
584 /** \ingroup msg_task_usage
585  * \brief Destroys a communication.
586  * \param comm the communication to destroy.
587  */
588 void MSG_comm_destroy(msg_comm_t comm)
589 {
590   xbt_free(comm);
591 }
592
593 /** \ingroup msg_task_usage
594  * \brief Wait for the completion of a communication.
595  *
596  * It takes two parameters.
597  * \param comm the communication to wait.
598  * \param timeout Wait until the communication terminates or the timeout occurs.
599  *                You can provide a -1 timeout to obtain an infinite timeout.
600  * \return msg_error_t
601  */
602 msg_error_t MSG_comm_wait(msg_comm_t comm, double timeout)
603 {
604   xbt_ex_t e;
605   TRY {
606     simcall_comm_wait(comm->s_comm, timeout);
607
608     if (comm->task_received != NULL) {
609       /* I am the receiver */
610       if (msg_global->debug_multiple_use && (*comm->task_received)->simdata->isused!=0)
611         xbt_ex_free(*(xbt_ex_t*)(*comm->task_received)->simdata->isused);
612       (*comm->task_received)->simdata->isused = 0;
613     }
614
615     /* FIXME: these functions are not traceable */
616   }
617   CATCH(e) {
618     switch (e.category) {
619     case network_error:
620       comm->status = MSG_TRANSFER_FAILURE;
621       break;
622     case timeout_error:
623       comm->status = MSG_TIMEOUT;
624       break;
625     default:
626       RETHROW;
627     }
628     xbt_ex_free(e);
629   }
630
631   return comm->status;
632 }
633
634 /** \ingroup msg_task_usage
635 * \brief This function is called by a sender and permit to wait for each communication
636 *
637 * \param comm a vector of communication
638 * \param nb_elem is the size of the comm vector
639 * \param timeout for each call of MSG_comm_wait
640 */
641 void MSG_comm_waitall(msg_comm_t * comm, int nb_elem, double timeout)
642 {
643   int i = 0;
644   for (i = 0; i < nb_elem; i++) {
645     MSG_comm_wait(comm[i], timeout);
646   }
647 }
648
649 /** \ingroup msg_task_usage
650  * \brief This function waits for the first communication finished in a list.
651  * \param comms a vector of communications
652  * \return the position of the first finished communication
653  * (but it may have failed, use MSG_comm_get_status() to know its status)
654  */
655 int MSG_comm_waitany(xbt_dynar_t comms)
656 {
657   xbt_ex_t e;
658   int finished_index = -1;
659
660   /* create the equivalent dynar with SIMIX objects */
661   xbt_dynar_t s_comms = xbt_dynar_new(sizeof(smx_synchro_t), NULL);
662   msg_comm_t comm;
663   unsigned int cursor;
664   xbt_dynar_foreach(comms, cursor, comm) {
665     xbt_dynar_push(s_comms, &comm->s_comm);
666   }
667
668   msg_error_t status = MSG_OK;
669   TRY {
670     finished_index = simcall_comm_waitany(s_comms);
671   }
672   CATCH(e) {
673     switch (e.category) {
674       case network_error:
675         finished_index = e.value;
676         status = MSG_TRANSFER_FAILURE;
677         break;
678       case timeout_error:
679         finished_index = e.value;
680         status = MSG_TIMEOUT;
681         break;
682       default:
683         RETHROW;
684     }
685     xbt_ex_free(e);
686   }
687
688   xbt_assert(finished_index != -1, "WaitAny returned -1");
689   xbt_dynar_free(&s_comms);
690
691   comm = xbt_dynar_get_as(comms, finished_index, msg_comm_t);
692   /* the communication is finished */
693   comm->status = status;
694
695   if (comm->task_received != NULL) {
696     /* I am the receiver */
697     if (msg_global->debug_multiple_use && (*comm->task_received)->simdata->isused!=0)
698       xbt_ex_free(*(xbt_ex_t*)(*comm->task_received)->simdata->isused);
699     (*comm->task_received)->simdata->isused = 0;
700   }
701
702   return finished_index;
703 }
704
705 /**
706  * \ingroup msg_task_usage
707  * \brief Returns the error (if any) that occurred during a finished communication.
708  * \param comm a finished communication
709  * \return the status of the communication, or #MSG_OK if no error occurred
710  * during the communication
711  */
712 msg_error_t MSG_comm_get_status(msg_comm_t comm) {
713
714   return comm->status;
715 }
716
717 /** \ingroup msg_task_usage
718  * \brief Get a task (#msg_task_t) from a communication
719  *
720  * \param comm the communication where to get the task
721  * \return the task from the communication
722  */
723 msg_task_t MSG_comm_get_task(msg_comm_t comm)
724 {
725   xbt_assert(comm, "Invalid parameter");
726
727   return comm->task_received ? *comm->task_received : comm->task_sent;
728 }
729
730 /**
731  * \brief This function is called by SIMIX in kernel mode to copy the data of a comm.
732  * \param comm the comm
733  * \param buff the data copied
734  * \param buff_size size of the buffer
735  */
736 void MSG_comm_copy_data_from_SIMIX(smx_synchro_t comm, void* buff, size_t buff_size) {
737   // copy the task
738   SIMIX_comm_copy_pointer_callback(comm, buff, buff_size);
739
740   // notify the user callback if any
741   if (msg_global->task_copy_callback) {
742     msg_task_t task = (msg_task_t) buff;
743     msg_global->task_copy_callback(task, simcall_comm_get_src_proc(comm), simcall_comm_get_dst_proc(comm));
744   }
745 }
746
747 /** \ingroup msg_task_usage
748  * \brief Sends a task to a mailbox
749  *
750  * This is a blocking function, the execution flow will be blocked until the task is sent (and received on the other
751  * side if #MSG_task_receive is used).
752  * See #MSG_task_isend for sending tasks asynchronously.
753  *
754  * \param task the task to be sent
755  * \param alias the mailbox name to where the task is sent
756  *
757  * \return Returns #MSG_OK if the task was successfully sent,
758  * #MSG_HOST_FAILURE, or #MSG_TRANSFER_FAILURE otherwise.
759  */
760 msg_error_t MSG_task_send(msg_task_t task, const char *alias)
761 {
762   XBT_DEBUG("MSG_task_send: Trying to send a message on mailbox '%s'", alias);
763   return MSG_task_send_with_timeout(task, alias, -1);
764 }
765
766 /** \ingroup msg_task_usage
767  * \brief Sends a task to a mailbox with a maximum rate
768  *
769  * This is a blocking function, the execution flow will be blocked until the task is sent. The maxrate parameter allows
770  * the application to limit the bandwidth utilization of network links when sending the task.
771  *
772  * \param task the task to be sent
773  * \param alias the mailbox name to where the task is sent
774  * \param maxrate the maximum communication rate for sending this task
775  *
776  * \return Returns #MSG_OK if the task was successfully sent,
777  * #MSG_HOST_FAILURE, or #MSG_TRANSFER_FAILURE otherwise.
778  */
779 msg_error_t MSG_task_send_bounded(msg_task_t task, const char *alias, double maxrate)
780 {
781   task->simdata->rate = maxrate;
782   return MSG_task_send(task, alias);
783 }
784
785 /** \ingroup msg_task_usage
786  * \brief Sends a task to a mailbox with a timeout
787  *
788  * This is a blocking function, the execution flow will be blocked until the task is sent or the timeout is achieved.
789  *
790  * \param task the task to be sent
791  * \param alias the mailbox name to where the task is sent
792  * \param timeout is the maximum wait time for completion (if -1, this call is the same as #MSG_task_send)
793  *
794  * \return Returns #MSG_OK if the task was successfully sent,
795  * #MSG_HOST_FAILURE, or #MSG_TRANSFER_FAILURE, or #MSG_TIMEOUT otherwise.
796  */
797 msg_error_t MSG_task_send_with_timeout(msg_task_t task, const char *alias, double timeout)
798 {
799   msg_error_t ret = MSG_OK;
800   simdata_task_t t_simdata = NULL;
801   msg_process_t process = MSG_process_self();
802   simdata_process_t p_simdata = (simdata_process_t) SIMIX_process_self_get_data();
803   msg_mailbox_t mailbox = MSG_mailbox_get_by_alias(alias);
804
805   int call_end = TRACE_msg_task_put_start(task);    //must be after CHECK_HOST()
806
807   /* Prepare the task to send */
808   t_simdata = task->simdata;
809   t_simdata->sender = process;
810   t_simdata->source = ((simdata_process_t) SIMIX_process_self_get_data())->m_host;
811
812   if (t_simdata->isused != 0) {
813     if (msg_global->debug_multiple_use){
814       XBT_ERROR("This task is already used in there:");
815       xbt_backtrace_display((xbt_ex_t*) t_simdata->isused);
816       XBT_ERROR("And you try to reuse it from here:");
817       xbt_backtrace_display_current();
818     } else {
819       xbt_assert(t_simdata->isused == 0,
820                  "This task is still being used somewhere else. You cannot send it now. Go fix your code!"
821                  " (use --cfg=msg/debug-multiple-use:on to get the backtrace of the other process)");
822     }
823   }
824
825   if (msg_global->debug_multiple_use)
826     MSG_BT(t_simdata->isused, "Using Backtrace");
827   else
828     t_simdata->isused = (void*)1;
829   t_simdata->comm = NULL;
830   msg_global->sent_msg++;
831
832   p_simdata->waiting_task = task;
833
834   xbt_ex_t e;
835   /* Try to send it by calling SIMIX network layer */
836   TRY {
837     smx_synchro_t comm = NULL; /* MC needs the comm to be set to NULL during the simix call  */
838     comm = simcall_comm_isend(SIMIX_process_self(), mailbox,t_simdata->bytes_amount,
839                               t_simdata->rate, task, sizeof(void *), NULL, NULL, NULL, task, 0);
840     if (TRACE_is_enabled())
841       simcall_set_category(comm, task->category);
842      t_simdata->comm = static_cast<simgrid::simix::Comm*>(comm);
843      simcall_comm_wait(comm, timeout);
844   }
845
846   CATCH(e) {
847     switch (e.category) {
848     case cancel_error:
849       ret = MSG_HOST_FAILURE;
850       break;
851     case network_error:
852       ret = MSG_TRANSFER_FAILURE;
853       break;
854     case timeout_error:
855       ret = MSG_TIMEOUT;
856       break;
857     default:
858       RETHROW;
859     }
860     xbt_ex_free(e);
861
862     /* If the send failed, it is not used anymore */
863     if (msg_global->debug_multiple_use && t_simdata->isused!=0)
864       xbt_ex_free(*(xbt_ex_t*)t_simdata->isused);
865     t_simdata->isused = 0;
866   }
867
868   p_simdata->waiting_task = NULL;
869   if (call_end)
870     TRACE_msg_task_put_end();
871   MSG_RETURN(ret);
872 }
873
874 /** \ingroup msg_task_usage
875  * \brief Sends a task to a mailbox with a timeout and with a maximum rate
876  *
877  * This is a blocking function, the execution flow will be blocked until the task is sent or the timeout is achieved.
878  *
879  * \param task the task to be sent
880  * \param alias the mailbox name to where the task is sent
881  * \param timeout is the maximum wait time for completion (if -1, this call is the same as #MSG_task_send)
882  * \param maxrate the maximum communication rate for sending this task
883  *
884  * \return Returns #MSG_OK if the task was successfully sent,
885  * #MSG_HOST_FAILURE, or #MSG_TRANSFER_FAILURE, or #MSG_TIMEOUT otherwise.
886  */
887 msg_error_t MSG_task_send_with_timeout_bounded(msg_task_t task, const char *alias, double timeout, double maxrate)
888 {
889   task->simdata->rate = maxrate;
890   return MSG_task_send_with_timeout(task, alias, timeout);
891 }
892
893 /** \ingroup msg_task_usage
894  * \brief Check if there is a communication going on in a mailbox.
895  *
896  * \param alias the name of the mailbox to be considered
897  *
898  * \return Returns 1 if there is a communication, 0 otherwise
899  */
900 int MSG_task_listen(const char *alias)
901 {
902   smx_mailbox_t mbox = MSG_mailbox_get_by_alias(alias);
903   return !MSG_mailbox_is_empty(mbox) || (mbox->permanent_receiver && !mbox->done_comm_queue->empty());
904 }
905
906 /** \ingroup msg_task_usage
907  * \brief Look if there is a communication on a mailbox and return the PID of the sender process.
908  *
909  * \param alias the name of the mailbox to be considered
910  *
911  * \return Returns the PID of sender process,
912  * -1 if there is no communication in the mailbox.
913  */
914 int MSG_task_listen_from(const char *alias)
915 {
916   msg_task_t task;
917
918   if (NULL == (task = MSG_mailbox_get_head(MSG_mailbox_get_by_alias(alias))))
919     return -1;
920
921   return MSG_process_get_PID(task->simdata->sender);
922 }
923
924 /** \ingroup msg_task_usage
925  * \brief Sets the tracing category of a task.
926  *
927  * This function should be called after the creation of a MSG task, to define the category of that task. The
928  * first parameter task must contain a task that was  created with the function #MSG_task_create. The second
929  * parameter category must contain a category that was previously declared with the function #TRACE_category
930  * (or with #TRACE_category_with_color).
931  *
932  * See \ref tracing for details on how to trace the (categorized) resource utilization.
933  *
934  * \param task the task that is going to be categorized
935  * \param category the name of the category to be associated to the task
936  *
937  * \see MSG_task_get_category, TRACE_category, TRACE_category_with_color
938  */
939 void MSG_task_set_category (msg_task_t task, const char *category)
940 {
941   TRACE_msg_set_task_category (task, category);
942 }
943
944 /** \ingroup msg_task_usage
945  *
946  * \brief Gets the current tracing category of a task.
947  *
948  * \param task the task to be considered
949  *
950  * \see MSG_task_set_category
951  *
952  * \return Returns the name of the tracing category of the given task, NULL otherwise
953  */
954 const char *MSG_task_get_category (msg_task_t task)
955 {
956   return task->category;
957 }
958
959 /**
960  * \brief Returns the value of a given AS or router property
961  *
962  * \param asr the name of a router or AS
963  * \param name a property name
964  * \return value of a property (or NULL if property not set)
965  */
966 const char *MSG_as_router_get_property_value(const char* asr, const char *name)
967 {
968   return (char*) xbt_dict_get_or_null(MSG_as_router_get_properties(asr), name);
969 }
970
971 /**
972  * \brief Returns a xbt_dict_t consisting of the list of properties assigned to
973  * a the AS or router
974  *
975  * \param asr the name of a router or AS
976  * \return a dict containing the properties
977  */
978 xbt_dict_t MSG_as_router_get_properties(const char* asr)
979 {
980   return (simcall_asr_get_properties(asr));
981 }
982
983 /**
984  * \brief Change the value of a given AS or router
985  *
986  * \param asr the name of a router or AS
987  * \param name a property name
988  * \param value what to change the property to
989  * \param free_ctn the freeing function to use to kill the value on need
990  */
991 void MSG_as_router_set_property_value(const char* asr, const char *name, char *value,void_f_pvoid_t free_ctn) {
992   xbt_dict_set(MSG_as_router_get_properties(asr), name, value,free_ctn);
993 }