Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
48d2b824c1a2ccc606aaceceea9fd41fe132837e
[simgrid.git] / src / msg / msg_mailbox.c
1 #include "mailbox.h"
2 #include "msg/private.h"
3
4 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(msg_mailbox, msg,
5                                 "Logging specific to MSG (mailbox)");
6
7 static xbt_dict_t msg_mailboxes = NULL;
8
9 void MSG_mailbox_mod_init(void)
10 {
11   msg_mailboxes = xbt_dict_new();
12 }
13
14 void MSG_mailbox_mod_exit(void)
15 {
16   xbt_dict_free(&msg_mailboxes);
17 }
18
19 msg_mailbox_t MSG_mailbox_create(const char *alias)
20 {
21   msg_mailbox_t mailbox = xbt_new0(s_msg_mailbox_t, 1);
22
23   mailbox->tasks = xbt_fifo_new();
24   mailbox->cond = NULL;
25   mailbox->alias = alias ? xbt_strdup(alias) : NULL;
26   mailbox->hostname = NULL;
27   mailbox->rdv = SIMIX_rdv_create(alias);
28   
29   return mailbox;
30 }
31
32 msg_mailbox_t MSG_mailbox_new(const char *alias)
33 {
34   msg_mailbox_t mailbox = MSG_mailbox_create(alias);
35
36   /* add the mbox in the dictionary */
37   xbt_dict_set(msg_mailboxes, alias, mailbox, MSG_mailbox_free);
38
39   return mailbox;
40 }
41
42 void MSG_mailbox_free(void *mailbox)
43 {
44   msg_mailbox_t _mailbox = (msg_mailbox_t) mailbox;
45
46   if (_mailbox->hostname)
47     free(_mailbox->hostname);
48
49   xbt_fifo_free(_mailbox->tasks);
50   free(_mailbox->alias);
51   SIMIX_rdv_destroy(_mailbox->rdv);
52   
53   free(_mailbox);
54 }
55
56 smx_cond_t MSG_mailbox_get_cond(msg_mailbox_t mailbox)
57 {
58   return mailbox->cond;
59 }
60
61 void MSG_mailbox_remove(msg_mailbox_t mailbox, m_task_t task)
62 {
63   xbt_fifo_remove(mailbox->tasks, task);
64 }
65
66 int MSG_mailbox_is_empty(msg_mailbox_t mailbox)
67 {
68   return (NULL == xbt_fifo_get_first_item(mailbox->tasks));
69 }
70
71 m_task_t MSG_mailbox_pop_head(msg_mailbox_t mailbox)
72 {
73   return (m_task_t) xbt_fifo_shift(mailbox->tasks);
74 }
75
76 m_task_t MSG_mailbox_get_head(msg_mailbox_t mailbox)
77 {
78   xbt_fifo_item_t item;
79
80   if (!(item = xbt_fifo_get_first_item(mailbox->tasks)))
81     return NULL;
82
83   return (m_task_t) xbt_fifo_get_item_content(item);
84 }
85
86
87 m_task_t MSG_mailbox_get_first_host_task(msg_mailbox_t mailbox, m_host_t host)
88 {
89   m_task_t task = NULL;
90   xbt_fifo_item_t item = NULL;
91
92   xbt_fifo_foreach(mailbox->tasks, item, task, m_task_t)
93     if (task->simdata->source == host) {
94     xbt_fifo_remove_item(mailbox->tasks, item);
95     return task;
96   }
97
98   return NULL;
99 }
100
101 int
102 MSG_mailbox_get_count_host_waiting_tasks(msg_mailbox_t mailbox, m_host_t host)
103 {
104   m_task_t task = NULL;
105   xbt_fifo_item_t item = NULL;
106   int count = 0;
107
108   xbt_fifo_foreach(mailbox->tasks, item, task, m_task_t) {
109     if (task->simdata->source == host)
110       count++;
111   }
112
113   return count;
114 }
115
116 void MSG_mailbox_set_cond(msg_mailbox_t mailbox, smx_cond_t cond)
117 {
118   mailbox->cond = cond;
119 }
120
121 const char *MSG_mailbox_get_alias(msg_mailbox_t mailbox)
122 {
123   return mailbox->alias;
124 }
125
126 const char *MSG_mailbox_get_hostname(msg_mailbox_t mailbox)
127 {
128   return mailbox->hostname;
129 }
130
131 void MSG_mailbox_set_hostname(msg_mailbox_t mailbox, const char *hostname)
132 {
133   mailbox->hostname = xbt_strdup(hostname);
134 }
135
136 msg_mailbox_t MSG_mailbox_get_by_alias(const char *alias)
137 {
138
139   msg_mailbox_t mailbox = xbt_dict_get_or_null(msg_mailboxes, alias);
140
141   if (!mailbox) {
142     mailbox = MSG_mailbox_new(alias);
143     MSG_mailbox_set_hostname(mailbox, MSG_host_self()->name);
144   }
145
146   return mailbox;
147 }
148
149 msg_mailbox_t MSG_mailbox_get_by_channel(m_host_t host, m_channel_t channel)
150 {
151   xbt_assert0((host != NULL), "Invalid host");
152   xbt_assert1((channel >= 0)
153               && (channel < msg_global->max_channel), "Invalid channel %d",
154               channel);
155
156   return host->simdata->mailboxes[(size_t) channel];
157 }
158
159 MSG_error_t
160 MSG_mailbox_get_task_ext(msg_mailbox_t mailbox, m_task_t *task,
161                          m_host_t host, double timeout)
162 {
163   xbt_ex_t e;
164   MSG_error_t ret;
165   smx_host_t smx_host;
166   size_t task_size = sizeof(void*);
167   CHECK_HOST();
168
169   /* Sanity check */
170   xbt_assert0(task, "Null pointer for the task storage");
171
172   if (*task)
173     CRITICAL0
174       ("MSG_task_get() was asked to write in a non empty task struct.");
175
176   smx_host = host ? host->simdata->smx_host : NULL;
177   
178   TRY{
179     SIMIX_network_recv(mailbox->rdv, timeout, task, &task_size, 
180                        comm_filter_get, smx_host);
181   }
182   CATCH(e){
183     switch(e.category){
184       case host_error:
185         ret = MSG_HOST_FAILURE;
186         break;
187       case network_error:
188         ret = MSG_TRANSFER_FAILURE;
189         break;
190       case timeout_error:
191         ret = MSG_TRANSFER_FAILURE;
192         break;      
193       default:
194         ret = MSG_OK;
195         RETHROW;
196         break;
197         /*xbt_die("Unhandled SIMIX network exception");*/
198     }
199     xbt_ex_free(e);
200     MSG_RETURN(ret);        
201   }
202  
203   MSG_RETURN (MSG_OK);
204 }
205
206 MSG_error_t
207 MSG_mailbox_put_with_timeout(msg_mailbox_t mailbox, m_task_t task,
208                              double timeout)
209 {
210   xbt_ex_t e;
211   MSG_error_t ret;
212   m_process_t process = MSG_process_self();
213   const char *hostname;
214   simdata_task_t t_simdata = NULL;
215   m_host_t local_host = NULL;
216   m_host_t remote_host = NULL;
217
218   CHECK_HOST();
219
220   t_simdata = task->simdata;
221   t_simdata->sender = process;
222   t_simdata->source = MSG_process_get_host(process);
223
224   xbt_assert0(t_simdata->refcount == 1,
225               "This task is still being used somewhere else. You cannot send it now. Go fix your code!");
226
227   t_simdata->comm = NULL;
228
229   /*t_simdata->refcount++;*/
230   local_host = ((simdata_process_t) process->simdata)->m_host;
231   msg_global->sent_msg++;
232
233   /* get the host name containing the mailbox */
234   hostname = MSG_mailbox_get_hostname(mailbox);
235
236   remote_host = MSG_get_host_by_name(hostname);
237
238   if (!remote_host)
239     THROW1(not_found_error, 0, "Host %s not fount", hostname);
240
241   DEBUG4("Trying to send a task (%g kB) from %s to %s on the channel %s",
242          t_simdata->message_size / 1000, local_host->name,
243          remote_host->name, MSG_mailbox_get_alias(mailbox));
244
245   TRY{
246     SIMIX_network_send(mailbox->rdv, t_simdata->message_size, t_simdata->rate,
247                        timeout, &task, sizeof(void *), comm_filter_put, NULL);
248   }
249
250   CATCH(e){
251     switch(e.category){
252       case host_error:
253         ret = MSG_HOST_FAILURE;
254         break;
255       case network_error:
256         ret = MSG_TRANSFER_FAILURE;
257         break;
258       case timeout_error:
259         ret = MSG_TRANSFER_FAILURE;
260         break;      
261       default:
262         ret = MSG_OK;
263         RETHROW;
264         break;
265         /*xbt_die("Unhandled SIMIX network exception");*/
266     }
267     xbt_ex_free(e);
268     MSG_RETURN(ret);        
269   }
270
271  /* t_simdata->refcount--;*/
272   MSG_RETURN (MSG_OK);
273 }