Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
b0ea9b11365320d2e130ce71d57025b8bbb508c2
[simgrid.git] / src / msg / msg_mailbox.c
1 #include "mailbox.h"
2 #include "msg/private.h"
3
4 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(msg_mailbox, msg,
5                                 "Logging specific to MSG (mailbox)");
6
7 static xbt_dict_t msg_mailboxes = NULL;
8
9 void MSG_mailbox_mod_init(void)
10 {
11   msg_mailboxes = xbt_dict_new();
12 }
13
14 void MSG_mailbox_mod_exit(void)
15 {
16   xbt_dict_free(&msg_mailboxes);
17 }
18
19 msg_mailbox_t MSG_mailbox_create(const char *alias)
20 {
21   msg_mailbox_t mailbox = xbt_new0(s_msg_mailbox_t, 1);
22
23   mailbox->tasks = xbt_fifo_new();
24   mailbox->cond = NULL;
25   mailbox->alias = alias ? xbt_strdup(alias) : NULL;
26   mailbox->hostname = NULL;
27   mailbox->rdv = SIMIX_rdv_create(alias);
28   
29   return mailbox;
30 }
31
32 msg_mailbox_t MSG_mailbox_new(const char *alias)
33 {
34   msg_mailbox_t mailbox = MSG_mailbox_create(alias);
35
36   /* add the mbox in the dictionary */
37   xbt_dict_set(msg_mailboxes, alias, mailbox, MSG_mailbox_free);
38
39   return mailbox;
40 }
41
42 void MSG_mailbox_free(void *mailbox)
43 {
44   msg_mailbox_t _mailbox = (msg_mailbox_t) mailbox;
45
46   if (_mailbox->hostname)
47     free(_mailbox->hostname);
48
49   xbt_fifo_free(_mailbox->tasks);
50   free(_mailbox->alias);
51   SIMIX_rdv_destroy(_mailbox->rdv);
52   
53   free(_mailbox);
54 }
55
56 smx_cond_t MSG_mailbox_get_cond(msg_mailbox_t mailbox)
57 {
58   return mailbox->cond;
59 }
60
61 void MSG_mailbox_remove(msg_mailbox_t mailbox, m_task_t task)
62 {
63   xbt_fifo_remove(mailbox->tasks, task);
64 }
65
66 int MSG_mailbox_is_empty(msg_mailbox_t mailbox)
67 {
68   return (NULL == xbt_fifo_get_first_item(mailbox->tasks));
69 }
70
71 m_task_t MSG_mailbox_pop_head(msg_mailbox_t mailbox)
72 {
73   return (m_task_t) xbt_fifo_shift(mailbox->tasks);
74 }
75
76 m_task_t MSG_mailbox_get_head(msg_mailbox_t mailbox)
77 {
78   xbt_fifo_item_t item;
79
80   if (!(item = xbt_fifo_get_first_item(mailbox->tasks)))
81     return NULL;
82
83   return (m_task_t) xbt_fifo_get_item_content(item);
84 }
85
86
87 m_task_t MSG_mailbox_get_first_host_task(msg_mailbox_t mailbox, m_host_t host)
88 {
89   m_task_t task = NULL;
90   xbt_fifo_item_t item = NULL;
91
92   xbt_fifo_foreach(mailbox->tasks, item, task, m_task_t)
93     if (task->simdata->source == host) {
94     xbt_fifo_remove_item(mailbox->tasks, item);
95     return task;
96   }
97
98   return NULL;
99 }
100
101 int
102 MSG_mailbox_get_count_host_waiting_tasks(msg_mailbox_t mailbox, m_host_t host)
103 {
104   m_task_t task = NULL;
105   xbt_fifo_item_t item = NULL;
106   int count = 0;
107
108   xbt_fifo_foreach(mailbox->tasks, item, task, m_task_t) {
109     if (task->simdata->source == host)
110       count++;
111   }
112
113   return count;
114 }
115
116 void MSG_mailbox_set_cond(msg_mailbox_t mailbox, smx_cond_t cond)
117 {
118   mailbox->cond = cond;
119 }
120
121 const char *MSG_mailbox_get_alias(msg_mailbox_t mailbox)
122 {
123   return mailbox->alias;
124 }
125
126 const char *MSG_mailbox_get_hostname(msg_mailbox_t mailbox)
127 {
128   return mailbox->hostname;
129 }
130
131 void MSG_mailbox_set_hostname(msg_mailbox_t mailbox, const char *hostname)
132 {
133   mailbox->hostname = xbt_strdup(hostname);
134 }
135
136 msg_mailbox_t MSG_mailbox_get_by_alias(const char *alias)
137 {
138
139   msg_mailbox_t mailbox = xbt_dict_get_or_null(msg_mailboxes, alias);
140
141   if (!mailbox) {
142     mailbox = MSG_mailbox_new(alias);
143     MSG_mailbox_set_hostname(mailbox, MSG_host_self()->name);
144   }
145
146   return mailbox;
147 }
148
149 msg_mailbox_t MSG_mailbox_get_by_channel(m_host_t host, m_channel_t channel)
150 {
151   xbt_assert0((host != NULL), "Invalid host");
152   xbt_assert1((channel >= 0)
153               && (channel < msg_global->max_channel), "Invalid channel %d",
154               channel);
155
156   return host->simdata->mailboxes[(size_t) channel];
157 }
158
159 MSG_error_t
160 MSG_mailbox_get_task_ext(msg_mailbox_t mailbox, m_task_t *task,
161                          m_host_t host, double timeout)
162 {
163   xbt_ex_t e;
164   MSG_error_t ret;
165   smx_host_t smx_host;
166   size_t task_size = sizeof(void*);
167   CHECK_HOST();
168
169   /* Sanity check */
170   xbt_assert0(task, "Null pointer for the task storage");
171
172   if (*task)
173     CRITICAL0
174       ("MSG_task_get() was asked to write in a non empty task struct.");
175
176   smx_host = host ? host->simdata->smx_host : NULL;
177   
178   TRY{
179     SIMIX_network_recv(mailbox->rdv, timeout, task, &task_size);
180   }
181   CATCH(e){
182     switch(e.category){
183       case host_error:
184         ret = MSG_HOST_FAILURE;
185         break;
186       case network_error:
187         ret = MSG_TRANSFER_FAILURE;
188         break;
189       case timeout_error:
190         ret = MSG_TRANSFER_FAILURE;
191         break;      
192       default:
193         ret = MSG_OK;
194         RETHROW;
195         break;
196         /*xbt_die("Unhandled SIMIX network exception");*/
197     }
198     xbt_ex_free(e);
199     MSG_RETURN(ret);        
200   }
201  
202   MSG_RETURN (MSG_OK);
203 }
204
205 MSG_error_t
206 MSG_mailbox_put_with_timeout(msg_mailbox_t mailbox, m_task_t task,
207                              double timeout)
208 {
209   xbt_ex_t e;
210   MSG_error_t ret;
211   m_process_t process = MSG_process_self();
212   const char *hostname;
213   simdata_task_t t_simdata = NULL;
214   m_host_t local_host = NULL;
215   m_host_t remote_host = NULL;
216
217   CHECK_HOST();
218
219   t_simdata = task->simdata;
220   t_simdata->sender = process;
221   t_simdata->source = MSG_process_get_host(process);
222
223   xbt_assert0(t_simdata->refcount == 1,
224               "This task is still being used somewhere else. You cannot send it now. Go fix your code!");
225
226   t_simdata->comm = NULL;
227
228   /*t_simdata->refcount++;*/
229   local_host = ((simdata_process_t) process->simdata)->m_host;
230   msg_global->sent_msg++;
231
232   /* get the host name containing the mailbox */
233   hostname = MSG_mailbox_get_hostname(mailbox);
234
235   remote_host = MSG_get_host_by_name(hostname);
236
237   if (!remote_host)
238     THROW1(not_found_error, 0, "Host %s not fount", hostname);
239
240   DEBUG4("Trying to send a task (%g kB) from %s to %s on the channel %s",
241          t_simdata->message_size / 1000, local_host->name,
242          remote_host->name, MSG_mailbox_get_alias(mailbox));
243
244   TRY{
245     SIMIX_network_send(mailbox->rdv, t_simdata->message_size, t_simdata->rate,
246                        timeout, &task, sizeof(void *));
247   }
248
249   CATCH(e){
250     switch(e.category){
251       case host_error:
252         ret = MSG_HOST_FAILURE;
253         break;
254       case network_error:
255         ret = MSG_TRANSFER_FAILURE;
256         break;
257       case timeout_error:
258         ret = MSG_TRANSFER_FAILURE;
259         break;      
260       default:
261         ret = MSG_OK;
262         RETHROW;
263         break;
264         xbt_die("Unhandled SIMIX network exception");
265     }
266     xbt_ex_free(e);
267     MSG_RETURN(ret);        
268   }
269
270   /*t_simdata->refcount--;*/
271   MSG_RETURN (MSG_OK);
272 }