Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
d3444a2af7e5b13fa4ee79679813a5f69fc860e7
[simgrid.git] / src / msg / msg_mailbox.c
1 #include "mailbox.h"
2 #include "msg/private.h"
3
4 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(msg_mailbox, msg,
5                                 "Logging specific to MSG (mailbox)");
6
7 static xbt_dict_t 
8 msg_mailboxes = NULL;
9
10 void
11 MSG_mailbox_mod_init(void)
12 {
13         msg_mailboxes = xbt_dict_new(); 
14 }
15
16 void
17 MSG_mailbox_mod_exit(void)
18 {
19         xbt_dict_free(&msg_mailboxes);
20 }
21
22 msg_mailbox_t
23 MSG_mailbox_create(const char *alias)
24 {
25         msg_mailbox_t mailbox = xbt_new0(s_msg_mailbox_t,1);
26         
27         mailbox->tasks = xbt_fifo_new();
28         mailbox->cond = NULL;
29         mailbox->alias = alias ? xbt_strdup(alias) : NULL;
30         mailbox->hostname = NULL;
31         
32         return mailbox;
33 }
34
35 msg_mailbox_t
36 MSG_mailbox_new(const char *alias)
37 {
38         msg_mailbox_t mailbox = MSG_mailbox_create(alias);
39         
40         /* add the mbox in the dictionary */
41         xbt_dict_set(msg_mailboxes, alias, mailbox, MSG_mailbox_free);
42         
43         return mailbox;
44 }
45
46 void
47 MSG_mailbox_free(void* mailbox)
48 {
49         msg_mailbox_t _mailbox = (msg_mailbox_t)mailbox;
50
51         if(NULL != (_mailbox->hostname))
52                 free(_mailbox->hostname);
53
54         xbt_fifo_free(_mailbox->tasks);
55         free(_mailbox->alias);
56         
57         free(_mailbox);
58 }
59
60 void
61 MSG_mailbox_put(msg_mailbox_t mailbox, m_task_t task)
62 {
63         xbt_fifo_push(mailbox->tasks, task);
64 }
65
66 smx_cond_t
67 MSG_mailbox_get_cond(msg_mailbox_t mailbox)
68 {
69         return mailbox->cond;
70 }
71
72 void
73 MSG_mailbox_remove(msg_mailbox_t mailbox, m_task_t task)
74 {
75         xbt_fifo_remove(mailbox->tasks,task);
76 }
77
78 int
79 MSG_mailbox_is_empty(msg_mailbox_t mailbox)
80 {
81         return (NULL == xbt_fifo_get_first_item(mailbox->tasks));
82 }
83
84 m_task_t
85 MSG_mailbox_pop_head(msg_mailbox_t mailbox)
86 {
87         return (m_task_t)xbt_fifo_shift(mailbox->tasks);
88 }
89
90 m_task_t
91 MSG_mailbox_get_head(msg_mailbox_t mailbox)
92 {
93         xbt_fifo_item_t item;
94         
95         if(NULL == (item = xbt_fifo_get_first_item(mailbox->tasks)))
96                 return NULL;
97                 
98         return (m_task_t)xbt_fifo_get_item_content(item);
99 }
100
101
102 m_task_t
103 MSG_mailbox_get_first_host_task(msg_mailbox_t mailbox, m_host_t host)
104 {
105  m_task_t task = NULL;
106  xbt_fifo_item_t item = NULL;
107
108  xbt_fifo_foreach(mailbox->tasks, item, task, m_task_t)
109    if (task->simdata->source == host) {
110      xbt_fifo_remove_item(mailbox->tasks, item);
111      return task;
112    }
113
114  return NULL;
115 }
116
117 int
118 MSG_mailbox_get_count_host_waiting_tasks(msg_mailbox_t mailbox, m_host_t host)
119 {
120         m_task_t task = NULL;
121         xbt_fifo_item_t item = NULL;
122         int count = 0;
123         
124         xbt_fifo_foreach(mailbox->tasks, item, task, m_task_t) 
125         {
126                 if (task->simdata->source == host)
127                         count++;
128         }
129                 
130         return count;
131 }
132
133 void
134 MSG_mailbox_set_cond(msg_mailbox_t mailbox, smx_cond_t cond)
135 {
136         mailbox->cond = cond;
137 }
138
139 const char*
140 MSG_mailbox_get_alias(msg_mailbox_t mailbox)
141 {
142         return mailbox->alias;
143 }
144
145 const char*
146 MSG_mailbox_get_hostname(msg_mailbox_t mailbox)
147 {
148         return mailbox->hostname;
149 }
150
151 void
152 MSG_mailbox_set_hostname(msg_mailbox_t mailbox, const char* hostname)
153 {
154         mailbox->hostname = xbt_strdup(hostname);
155 }
156
157 msg_mailbox_t
158 MSG_mailbox_get_by_alias(const char* alias)
159 {
160
161         msg_mailbox_t mailbox = xbt_dict_get_or_null(msg_mailboxes,alias);
162    
163         if(!mailbox)
164         {
165                 mailbox = MSG_mailbox_new(alias);
166                 MSG_mailbox_set_hostname(mailbox,MSG_host_self()->name);
167         }
168         
169         return mailbox; 
170 }
171
172 msg_mailbox_t
173 MSG_mailbox_get_by_channel(m_host_t host, m_channel_t channel)
174 {
175          xbt_assert0((host != NULL), "Invalid host");
176          xbt_assert1((channel >= 0)&& (channel < msg_global->max_channel), "Invalid channel %d",channel);
177
178          return host->simdata->mailboxes[(size_t)channel];
179 }
180
181 MSG_error_t 
182 MSG_mailbox_get_task_ext(msg_mailbox_t mailbox, m_task_t* task, m_host_t host, double timeout)
183 {
184         m_process_t process = MSG_process_self();
185         m_task_t t = NULL;
186         m_host_t h = NULL;
187         simdata_task_t t_simdata = NULL;
188         simdata_host_t h_simdata = NULL;
189         int first_time = 1;
190         
191         smx_cond_t cond = NULL; //conditional wait if the task isn't on the channel yet
192         
193         CHECK_HOST();
194         
195         /* Sanity check */
196         xbt_assert0(task, "Null pointer for the task storage");
197         
198         if (*task)
199                 CRITICAL0("MSG_task_get() was asked to write in a non empty task struct.");
200         
201         /* Get the task */
202         h = MSG_host_self();
203         h_simdata = h->simdata;
204         
205         SIMIX_mutex_lock(h->simdata->mutex);
206         
207         while (1) 
208         {
209                 /* if the mailbox is empty (has no task */
210                 if(!MSG_mailbox_is_empty(mailbox))
211                 {
212                         if(!host) 
213                         {
214                                 /* pop the head of the mailbox */
215                                 t = MSG_mailbox_pop_head(mailbox);
216                                 break;
217                         } 
218                         else 
219                         {
220                                 /* get the first task of the host */
221                                 if(NULL != (t = MSG_mailbox_get_first_host_task(mailbox,host)))
222                                         break;
223                         }
224                 }
225         
226                 if(timeout > 0) 
227                 {
228                         if (!first_time) 
229                         {
230                                 SIMIX_mutex_unlock(h->simdata->mutex);
231                                 /* set the simix condition of the mailbox to NULL */
232                                 MSG_mailbox_set_cond(mailbox, NULL);
233                                 SIMIX_cond_destroy(cond);
234                                 MSG_RETURN(MSG_TRANSFER_FAILURE);
235                         }
236                 }
237                 
238                 xbt_assert1(!MSG_mailbox_get_cond(mailbox),"A process is already blocked on the channel %s", MSG_mailbox_get_alias(mailbox));
239                 
240                 cond = SIMIX_cond_init();
241                 
242                 /* set the condition of the mailbox */
243                 MSG_mailbox_set_cond(mailbox, cond);
244                 
245                 if (timeout > 0)
246                         SIMIX_cond_wait_timeout(cond, h->simdata->mutex, timeout);
247                 else 
248                         SIMIX_cond_wait(MSG_mailbox_get_cond(mailbox), h->simdata->mutex);
249                         
250         
251                 if (SIMIX_host_get_state(h_simdata->smx_host) == 0)
252                         MSG_RETURN(MSG_HOST_FAILURE);
253         
254                 first_time = 0;
255         }
256         
257         SIMIX_mutex_unlock(h->simdata->mutex);
258         
259         DEBUG1("OK, got a task (%s)", t->name);
260         /* clean conditional */
261         if (cond) 
262         {
263                 SIMIX_cond_destroy(cond);
264                 
265                 MSG_mailbox_set_cond(mailbox,NULL);
266         }
267         
268         t_simdata = t->simdata;
269         t_simdata->receiver = process;
270         *task = t;
271         
272         SIMIX_mutex_lock(t_simdata->mutex);
273         
274         /* Transfer */
275         /* create SIMIX action to the communication */
276         t_simdata->comm = SIMIX_action_communicate(
277                                                                                                 t_simdata->sender->simdata->m_host->simdata->smx_host,
278                                                                                                 process->simdata->m_host->simdata->smx_host,
279                                                                                                 t->name, 
280                                                                                                 t_simdata->message_size,
281                                                                                                 t_simdata->rate
282                                                                                         );
283         
284         /* if the process is suspend, create the action but stop its execution, it will be restart when the sender process resume */
285         if (MSG_process_is_suspended(t_simdata->sender)) 
286         {
287                 DEBUG1("Process sender (%s) suspended", t_simdata->sender->name);
288                 SIMIX_action_set_priority(t_simdata->comm, 0);
289         }
290         
291         process->simdata->waiting_task = t;
292         SIMIX_register_action_to_condition(t_simdata->comm, t_simdata->cond);
293         
294         while (1) 
295         {
296                 SIMIX_cond_wait(t_simdata->cond, t_simdata->mutex);
297                 
298                 if (SIMIX_action_get_state(t_simdata->comm) != SURF_ACTION_RUNNING)
299                         break;
300         }
301         
302         SIMIX_unregister_action_to_condition(t_simdata->comm, t_simdata->cond);
303                 process->simdata->waiting_task = NULL;
304         
305         /* the task has already finished and the pointer must be null */
306         if (t->simdata->sender) 
307         {
308                 t->simdata->sender->simdata->waiting_task = NULL;
309         }
310         
311         /* for this process, don't need to change in get function */
312         t->simdata->receiver = NULL;
313         SIMIX_mutex_unlock(t_simdata->mutex);
314         
315         
316         if (SIMIX_action_get_state(t_simdata->comm) == SURF_ACTION_DONE) 
317         {
318                 SIMIX_action_destroy(t_simdata->comm);
319                 t_simdata->comm = NULL;
320                 t_simdata->using--;
321                 MSG_RETURN(MSG_OK);
322         } 
323         else if (SIMIX_host_get_state(h_simdata->smx_host) == 0) 
324         {
325                 SIMIX_action_destroy(t_simdata->comm);
326                 t_simdata->comm = NULL;
327                 t_simdata->using--;
328                 MSG_RETURN(MSG_HOST_FAILURE);
329         } 
330         else 
331         {
332                 SIMIX_action_destroy(t_simdata->comm);
333                 t_simdata->comm = NULL;
334                 t_simdata->using--;
335                 MSG_RETURN(MSG_TRANSFER_FAILURE);
336         }
337 }
338
339 MSG_error_t 
340 MSG_mailbox_put_with_timeout(msg_mailbox_t mailbox, m_task_t task, double timeout)
341 {
342         m_process_t process = MSG_process_self();
343         const char* hostname;
344         simdata_task_t task_simdata = NULL;
345         m_host_t local_host = NULL;
346         m_host_t remote_host = NULL;
347         smx_cond_t cond = NULL;
348
349         CHECK_HOST();
350         
351         task_simdata = task->simdata;
352         task_simdata->sender = process;
353         task_simdata->source = MSG_process_get_host(process);
354         
355         xbt_assert0(task_simdata->using == 1,"This task is still being used somewhere else. You cannot send it now. Go fix your code!");
356         
357         task_simdata->comm = NULL;
358         
359         task_simdata->using++;
360         local_host = ((simdata_process_t) process->simdata)->m_host;
361         
362         /* get the host name containing the mailbox */
363         hostname = MSG_mailbox_get_hostname(mailbox);
364
365         remote_host = MSG_get_host_by_name(hostname);
366
367         if(NULL == remote_host)
368                 THROW1(not_found_error,0,"Host %s not fount", hostname);
369
370
371         DEBUG4("Trying to send a task (%g kB) from %s to %s on the channel aliased by the alias %s",task->simdata->message_size / 1000, local_host->name,remote_host->name, MSG_mailbox_get_alias(mailbox));
372         
373         SIMIX_mutex_lock(remote_host->simdata->mutex);
374
375         /* put the task in the mailbox */
376         MSG_mailbox_put(mailbox,task);
377         
378         if(NULL != (cond = MSG_mailbox_get_cond(mailbox)))
379         {
380                 DEBUG0("Somebody is listening. Let's wake him up!");
381                 SIMIX_cond_signal(cond);
382         }
383
384         
385         
386         SIMIX_mutex_unlock(remote_host->simdata->mutex);
387         
388         SIMIX_mutex_lock(task->simdata->mutex);
389
390         process->simdata->waiting_task = task;
391         
392         if(timeout > 0) 
393         {
394                 xbt_ex_t e;
395                 double time;
396                 double time_elapsed;
397                 time = SIMIX_get_clock();
398                 
399                 TRY 
400                 {
401                         /*verify if the action that ends is the correct. Call the wait_timeout with the new time. If the timeout occurs, an exception is raised */
402                         while (1) 
403                         {
404                                 time_elapsed = SIMIX_get_clock() - time;
405                                 SIMIX_cond_wait_timeout(task->simdata->cond, task->simdata->mutex,timeout - time_elapsed);
406                                 
407                                 if ((task->simdata->comm != NULL) && (SIMIX_action_get_state(task->simdata->comm) != SURF_ACTION_RUNNING))
408                                         break;
409                         }
410                 } 
411                 CATCH(e) 
412                 {
413                         if(e.category==timeout_error) 
414                         {
415                                 xbt_ex_free(e);
416                                 /* verify if the timeout happened and the communication didn't started yet */
417                                 if (task->simdata->comm == NULL) 
418                                 {
419                                         process->simdata->waiting_task = NULL;
420                                         
421                                         /* remove the task from the mailbox */
422                                         MSG_mailbox_remove(mailbox,task);
423                                         
424                                         if (task->simdata->receiver) 
425                                         {
426                                                 task->simdata->receiver->simdata->waiting_task = NULL;
427                                         }
428                                         
429                                         task->simdata->sender = NULL;
430                                         
431                                         SIMIX_mutex_unlock(task->simdata->mutex);
432                                         MSG_RETURN(MSG_TRANSFER_FAILURE);
433                                 }
434                         } 
435                         else 
436                         {
437                                 RETHROW;
438                         }
439                 }
440         } 
441         else 
442         {
443                 while (1) 
444                 {
445                         SIMIX_cond_wait(task->simdata->cond, task->simdata->mutex);
446                         
447                         if (SIMIX_action_get_state(task->simdata->comm) != SURF_ACTION_RUNNING)
448                                 break;
449                 }
450         }
451         
452         DEBUG1("Action terminated %s", task->name);
453         process->simdata->waiting_task = NULL;
454         
455         /* the task has already finished and the pointer must be null */
456         if (task->simdata->receiver) 
457         {
458                 task->simdata->receiver->simdata->waiting_task = NULL;
459         }
460         
461         task->simdata->sender = NULL;
462         SIMIX_mutex_unlock(task->simdata->mutex);
463
464         
465         if (SIMIX_action_get_state(task->simdata->comm) == SURF_ACTION_DONE)
466         {
467                 MSG_RETURN(MSG_OK);
468         } 
469         else if (SIMIX_host_get_state(local_host->simdata->smx_host) == 0) 
470         {
471                 MSG_RETURN(MSG_HOST_FAILURE);
472         } 
473         else 
474         {
475                 MSG_RETURN(MSG_TRANSFER_FAILURE);
476         }
477 }