Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
refactor rdv to mailbox for consistency
[simgrid.git] / src / msg / msg_mailbox.cpp
1 /* Mailboxes in MSG */
2
3 /* Copyright (c) 2008-2015. The SimGrid Team.
4  * All rights reserved.                                                     */
5
6 /* This program is free software; you can redistribute it and/or modify it
7  * under the terms of the license (GNU LGPL) which comes with this package. */
8
9 #include "simgrid/msg.h"
10 #include "msg_private.h"
11
12 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(msg_mailbox, msg, "Logging specific to MSG (mailbox)");
13
14 msg_mailbox_t MSG_mailbox_new(const char *alias)
15 {
16   return simcall_mbox_create(alias);
17 }
18
19 void MSG_mailbox_free(void *mailbox)
20 {
21   simcall_mbox_destroy((msg_mailbox_t)mailbox);
22 }
23
24 int MSG_mailbox_is_empty(msg_mailbox_t mailbox)
25 {
26   return (NULL == simcall_mbox_get_head(mailbox));
27 }
28
29 msg_task_t MSG_mailbox_get_head(msg_mailbox_t mailbox)
30 {
31   smx_synchro_t comm = simcall_mbox_get_head(mailbox);
32
33   if (!comm)
34     return NULL;
35
36   return (msg_task_t) simcall_comm_get_src_data(comm);
37 }
38
39 int MSG_mailbox_get_count_host_waiting_tasks(msg_mailbox_t mailbox, msg_host_t host)
40 {
41   return simcall_mbox_comm_count_by_host(mailbox, host);
42 }
43
44 msg_mailbox_t MSG_mailbox_get_by_alias(const char *alias)
45 {
46   msg_mailbox_t mailbox = simcall_mbox_get_by_name(alias);
47
48   if (!mailbox)
49     mailbox = MSG_mailbox_new(alias);
50
51   return mailbox;
52 }
53
54 /** \ingroup msg_mailbox_management
55  * \brief Set the mailbox to receive in asynchronous mode
56  *
57  * All messages sent to this mailbox will be transferred to the receiver without waiting for the receive call.
58  * The receive call will still be necessary to use the received data.
59  * If there is a need to receive some messages asynchronously, and some not, two different mailboxes should be used.
60  *
61  * \param alias The name of the mailbox
62  */
63 void MSG_mailbox_set_async(const char *alias){
64   msg_mailbox_t mailbox = MSG_mailbox_get_by_alias(alias);
65
66   simcall_mbox_set_receiver(mailbox, SIMIX_process_self());
67   XBT_VERB("%s mailbox set to receive eagerly for process %p\n",alias, SIMIX_process_self());
68 }
69
70 /** \ingroup msg_mailbox_management
71  * \brief Get a task from a mailbox on a given host
72  *
73  * \param mailbox The mailbox where the task was sent
74  * \param task a memory location for storing a #msg_task_t.
75  * \param host a #msg_host_t host from where the task was sent
76  * \param timeout a timeout
77
78  * \return Returns
79  * #MSG_OK if the task was successfully received,
80  * #MSG_HOST_FAILURE, or #MSG_TRANSFER_FAILURE otherwise.
81  */
82 msg_error_t MSG_mailbox_get_task_ext(msg_mailbox_t mailbox, msg_task_t *task, msg_host_t host, double timeout)
83 {
84   return MSG_mailbox_get_task_ext_bounded(mailbox, task, host, timeout, -1.0);
85 }
86
87 /** \ingroup msg_mailbox_management
88  * \brief Get a task from a mailbox on a given host at a given rate
89  *
90  * \param mailbox The mailbox where the task was sent
91  * \param task a memory location for storing a #msg_task_t.
92  * \param host a #msg_host_t host from where the task was sent
93  * \param timeout a timeout
94  * \param rate a rate
95
96  * \return Returns
97  * #MSG_OK if the task was successfully received,
98  * #MSG_HOST_FAILURE, or #MSG_TRANSFER_FAILURE otherwise.
99  */
100 msg_error_t MSG_mailbox_get_task_ext_bounded(msg_mailbox_t mailbox, msg_task_t * task, msg_host_t host, double timeout,
101                                              double rate)
102 {
103   xbt_ex_t e;
104   msg_error_t ret = MSG_OK;
105   /* We no longer support getting a task from a specific host */
106   if (host)
107     THROW_UNIMPLEMENTED;
108
109   TRACE_msg_task_get_start();
110   double start_time = MSG_get_clock();
111
112   /* Sanity check */
113   xbt_assert(task, "Null pointer for the task storage");
114
115   if (*task)
116     XBT_WARN("Asked to write the received task in a non empty struct -- proceeding.");
117
118   /* Try to receive it by calling SIMIX network layer */
119   TRY {
120     simcall_comm_recv(MSG_process_self(), mailbox, task, NULL, NULL, NULL, NULL, timeout, rate);
121     XBT_DEBUG("Got task %s from %p",(*task)->name,mailbox);
122     if (msg_global->debug_multiple_use && (*task)->simdata->isused!=0)
123       xbt_ex_free(*(xbt_ex_t*)(*task)->simdata->isused);
124     (*task)->simdata->isused = 0;
125   }
126   CATCH(e) {
127     switch (e.category) {
128     case cancel_error:
129       ret = MSG_HOST_FAILURE;
130       break;
131     case network_error:
132       ret = MSG_TRANSFER_FAILURE;
133       break;
134     case timeout_error:
135       ret = MSG_TIMEOUT;
136       break;
137     case host_error:
138       ret = MSG_HOST_FAILURE;
139       break;
140     default:
141       RETHROW;
142     }
143     xbt_ex_free(e);
144   }
145
146   if (ret != MSG_HOST_FAILURE && ret != MSG_TRANSFER_FAILURE && ret != MSG_TIMEOUT) {
147     TRACE_msg_task_get_end(start_time, *task);
148   }
149   MSG_RETURN(ret);
150 }
151
152 msg_error_t MSG_mailbox_put_with_timeout(msg_mailbox_t mailbox, msg_task_t task, double timeout)
153 {
154   msg_error_t ret = MSG_OK;
155   simdata_task_t t_simdata = NULL;
156   msg_process_t process = MSG_process_self();
157   simdata_process_t p_simdata = (simdata_process_t) SIMIX_process_self_get_data();
158
159   int call_end = TRACE_msg_task_put_start(task);    //must be after CHECK_HOST()
160
161   /* Prepare the task to send */
162   t_simdata = task->simdata;
163   t_simdata->sender = process;
164   t_simdata->source = ((simdata_process_t) SIMIX_process_self_get_data())->m_host;
165
166   if (t_simdata->isused != 0) {
167     if (msg_global->debug_multiple_use){
168       XBT_ERROR("This task is already used in there:");
169       xbt_backtrace_display((xbt_ex_t*) t_simdata->isused);
170       XBT_ERROR("And you try to reuse it from here:");
171       xbt_backtrace_display_current();
172     } else {
173       xbt_assert(t_simdata->isused == 0,
174                  "This task is still being used somewhere else. You cannot send it now. Go fix your code!"
175                  " (use --cfg=msg/debug_multiple_use:on to get the backtrace of the other process)");
176     }
177   }
178
179   if (msg_global->debug_multiple_use)
180     MSG_BT(t_simdata->isused, "Using Backtrace");
181   else
182     t_simdata->isused = (void*)1;
183   t_simdata->comm = NULL;
184   msg_global->sent_msg++;
185
186   p_simdata->waiting_task = task;
187
188   xbt_ex_t e;
189   /* Try to send it by calling SIMIX network layer */
190   TRY {
191     smx_synchro_t comm = NULL; /* MC needs the comm to be set to NULL during the simix call  */
192     comm = simcall_comm_isend(SIMIX_process_self(), mailbox,t_simdata->bytes_amount,
193                               t_simdata->rate, task, sizeof(void *), NULL, NULL, NULL, task, 0);
194     if (TRACE_is_enabled())
195       simcall_set_category(comm, task->category);
196      t_simdata->comm = comm;
197      simcall_comm_wait(comm, timeout);
198   }
199
200   CATCH(e) {
201     switch (e.category) {
202     case cancel_error:
203       ret = MSG_HOST_FAILURE;
204       break;
205     case network_error:
206       ret = MSG_TRANSFER_FAILURE;
207       break;
208     case timeout_error:
209       ret = MSG_TIMEOUT;
210       break;
211     default:
212       RETHROW;
213     }
214     xbt_ex_free(e);
215
216     /* If the send failed, it is not used anymore */
217     if (msg_global->debug_multiple_use && t_simdata->isused!=0)
218       xbt_ex_free(*(xbt_ex_t*)t_simdata->isused);
219     t_simdata->isused = 0;
220   }
221
222   p_simdata->waiting_task = NULL;
223   if (call_end)
224     TRACE_msg_task_put_end();
225   MSG_RETURN(ret);
226 }