Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Plug memleak.
[simgrid.git] / src / msg / msg_mailbox.c
1 #include "mailbox.h"\r
2 #include "msg/private.h"\r
3 \r
4 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(msg_mailbox, msg,\r
5                                 "Logging specific to MSG (mailbox)");\r
6 \r
7 static xbt_dict_t \r
8 msg_mailboxes = NULL;\r
9 \r
10 void\r
11 MSG_mailbox_mod_init(void)\r
12 {\r
13         msg_mailboxes = xbt_dict_new(); \r
14 }\r
15 \r
16 void\r
17 MSG_mailbox_mod_exit(void)\r
18 {\r
19         xbt_dict_free(&msg_mailboxes);\r
20 }\r
21 \r
22 msg_mailbox_t\r
23 MSG_mailbox_create(const char *alias)\r
24 {\r
25         msg_mailbox_t mailbox = xbt_new0(s_msg_mailbox_t,1);\r
26         \r
27         mailbox->tasks = xbt_fifo_new();\r
28         mailbox->cond = NULL;\r
29         mailbox->alias = alias ? xbt_strdup(alias) : NULL;\r
30         mailbox->hostname = NULL;\r
31         \r
32         return mailbox;\r
33 }\r
34 \r
35 msg_mailbox_t\r
36 MSG_mailbox_new(const char *alias)\r
37 {\r
38         msg_mailbox_t mailbox = MSG_mailbox_create(alias);\r
39         \r
40         /* add the mbox in the dictionary */\r
41         xbt_dict_set(msg_mailboxes, alias, mailbox, MSG_mailbox_free);\r
42         \r
43         return mailbox;\r
44 }\r
45 \r
46 void\r
47 MSG_mailbox_free(void* mailbox)\r
48 {\r
49         msg_mailbox_t _mailbox = (msg_mailbox_t)mailbox;\r
50 \r
51         if(NULL != (_mailbox->hostname))\r
52                 free(_mailbox->hostname);\r
53 \r
54         xbt_fifo_free(_mailbox->tasks);\r
55         free(_mailbox->alias);\r
56         \r
57         free(_mailbox);\r
58 }\r
59 \r
60 void\r
61 MSG_mailbox_put(msg_mailbox_t mailbox, m_task_t task)\r
62 {\r
63         xbt_fifo_push(mailbox->tasks, task);\r
64 }\r
65 \r
66 smx_cond_t\r
67 MSG_mailbox_get_cond(msg_mailbox_t mailbox)\r
68 {\r
69         return mailbox->cond;\r
70 }\r
71 \r
72 void\r
73 MSG_mailbox_remove(msg_mailbox_t mailbox, m_task_t task)\r
74 {\r
75         xbt_fifo_remove(mailbox->tasks,task);\r
76 }\r
77 \r
78 int\r
79 MSG_mailbox_is_empty(msg_mailbox_t mailbox)\r
80 {\r
81         return (NULL == xbt_fifo_get_first_item(mailbox->tasks));\r
82 }\r
83 \r
84 m_task_t\r
85 MSG_mailbox_pop_head(msg_mailbox_t mailbox)\r
86 {\r
87         return (m_task_t)xbt_fifo_shift(mailbox->tasks);\r
88 }\r
89 \r
90 m_task_t\r
91 MSG_mailbox_get_head(msg_mailbox_t mailbox)\r
92 {\r
93         xbt_fifo_item_t item;\r
94         \r
95         if(NULL == (item = xbt_fifo_get_first_item(mailbox->tasks)))\r
96                 return NULL;\r
97                 \r
98         return (m_task_t)xbt_fifo_get_item_content(item);\r
99 }\r
100 \r
101 \r
102 m_task_t\r
103 MSG_mailbox_get_first_host_task(msg_mailbox_t mailbox, m_host_t host)\r
104 {\r
105  m_task_t task = NULL;\r
106  xbt_fifo_item_t item = NULL;\r
107 \r
108  xbt_fifo_foreach(mailbox->tasks, item, task, m_task_t)\r
109    if (task->simdata->source == host) {\r
110      xbt_fifo_remove_item(mailbox->tasks, item);\r
111      return task;\r
112    }\r
113 \r
114  return NULL;\r
115 }\r
116 \r
117 int\r
118 MSG_mailbox_get_count_host_waiting_tasks(msg_mailbox_t mailbox, m_host_t host)\r
119 {\r
120         m_task_t task = NULL;\r
121         xbt_fifo_item_t item = NULL;\r
122         int count = 0;\r
123         \r
124         xbt_fifo_foreach(mailbox->tasks, item, task, m_task_t) \r
125         {\r
126                 if (task->simdata->source == host)\r
127                         count++;\r
128         }\r
129                 \r
130         return count;\r
131 }\r
132 \r
133 void\r
134 MSG_mailbox_set_cond(msg_mailbox_t mailbox, smx_cond_t cond)\r
135 {\r
136         mailbox->cond = cond;\r
137 }\r
138 \r
139 const char*\r
140 MSG_mailbox_get_alias(msg_mailbox_t mailbox)\r
141 {\r
142         return mailbox->alias;\r
143 }\r
144 \r
145 const char*\r
146 MSG_mailbox_get_hostname(msg_mailbox_t mailbox)\r
147 {\r
148         return mailbox->hostname;\r
149 }\r
150 \r
151 void\r
152 MSG_mailbox_set_hostname(msg_mailbox_t mailbox, const char* hostname)\r
153 {\r
154         mailbox->hostname = xbt_strdup(hostname);\r
155 }\r
156 \r
157 msg_mailbox_t\r
158 MSG_mailbox_get_by_alias(const char* alias)\r
159 {\r
160 \r
161         msg_mailbox_t mailbox = xbt_dict_get_or_null(msg_mailboxes,alias);\r
162    \r
163         if(!mailbox)\r
164         {\r
165                 mailbox = MSG_mailbox_new(alias);\r
166                 MSG_mailbox_set_hostname(mailbox,MSG_host_self()->name);\r
167         }\r
168         \r
169         return mailbox; \r
170 }\r
171 \r
172 msg_mailbox_t\r
173 MSG_mailbox_get_by_channel(m_host_t host, m_channel_t channel)\r
174 {\r
175          xbt_assert0((host != NULL), "Invalid host");\r
176          xbt_assert1((channel >= 0)&& (channel < msg_global->max_channel), "Invalid channel %d",channel);\r
177 \r
178          return host->simdata->mailboxes[(size_t)channel];\r
179 }\r
180 \r
181 MSG_error_t \r
182 MSG_mailbox_get_task_ext(msg_mailbox_t mailbox, m_task_t* task, m_host_t host, double timeout)\r
183 {\r
184         m_process_t process = MSG_process_self();\r
185         m_task_t t = NULL;\r
186         m_host_t h = NULL;\r
187         simdata_task_t t_simdata = NULL;\r
188         simdata_host_t h_simdata = NULL;\r
189         int first_time = 1;\r
190         \r
191         smx_cond_t cond = NULL; //conditional wait if the task isn't on the channel yet\r
192         \r
193         CHECK_HOST();\r
194         \r
195         /* Sanity check */\r
196         xbt_assert0(task, "Null pointer for the task storage");\r
197         \r
198         if (*task)\r
199                 CRITICAL0("MSG_task_get() was asked to write in a non empty task struct.");\r
200         \r
201         /* Get the task */\r
202         h = MSG_host_self();\r
203         h_simdata = h->simdata;\r
204         \r
205         SIMIX_mutex_lock(h->simdata->mutex);\r
206         \r
207         while (1) \r
208         {\r
209                 /* if the mailbox is empty (has no task */\r
210                 if(!MSG_mailbox_is_empty(mailbox))\r
211                 {\r
212                         if(!host) \r
213                         {\r
214                                 /* pop the head of the mailbox */\r
215                                 t = MSG_mailbox_pop_head(mailbox);\r
216                                 break;\r
217                         } \r
218                         else \r
219                         {\r
220                                 /* get the first task of the host */\r
221                                 if(NULL != (t = MSG_mailbox_get_first_host_task(mailbox,host)))\r
222                                         break;\r
223                         }\r
224                 }\r
225         \r
226                 if(timeout > 0) \r
227                 {\r
228                         if (!first_time) \r
229                         {\r
230                                 SIMIX_mutex_unlock(h->simdata->mutex);\r
231                                 /* set the simix condition of the mailbox to NULL */\r
232                                 MSG_mailbox_set_cond(mailbox, NULL);\r
233                                 SIMIX_cond_destroy(cond);\r
234                                 MSG_RETURN(MSG_TRANSFER_FAILURE);\r
235                         }\r
236                 }\r
237                 \r
238                 xbt_assert1(!MSG_mailbox_get_cond(mailbox),"A process is already blocked on the channel %s", MSG_mailbox_get_alias(mailbox));\r
239                 \r
240                 cond = SIMIX_cond_init();\r
241                 \r
242                 /* set the condition of the mailbox */\r
243                 MSG_mailbox_set_cond(mailbox, cond);\r
244                 \r
245                 if (timeout > 0)\r
246                         SIMIX_cond_wait_timeout(cond, h->simdata->mutex, timeout);\r
247                 else \r
248                         SIMIX_cond_wait(MSG_mailbox_get_cond(mailbox), h->simdata->mutex);\r
249                         \r
250         \r
251                 if (SIMIX_host_get_state(h_simdata->smx_host) == 0)\r
252                         MSG_RETURN(MSG_HOST_FAILURE);\r
253         \r
254                 first_time = 0;\r
255         }\r
256         \r
257         SIMIX_mutex_unlock(h->simdata->mutex);\r
258         \r
259         DEBUG1("OK, got a task (%s)", t->name);\r
260         /* clean conditional */\r
261         if (cond) \r
262         {\r
263                 SIMIX_cond_destroy(cond);\r
264                 \r
265                 MSG_mailbox_set_cond(mailbox,NULL);\r
266         }\r
267         \r
268         t_simdata = t->simdata;\r
269         t_simdata->receiver = process;\r
270         *task = t;\r
271         \r
272         SIMIX_mutex_lock(t_simdata->mutex);\r
273         \r
274         /* Transfer */\r
275         /* create SIMIX action to the communication */\r
276         t_simdata->comm = SIMIX_action_communicate(\r
277                                                                                                 t_simdata->sender->simdata->m_host->simdata->smx_host,\r
278                                                                                                 process->simdata->m_host->simdata->smx_host,\r
279                                                                                                 t->name, \r
280                                                                                                 t_simdata->message_size,\r
281                                                                                                 t_simdata->rate\r
282                                                                                         );\r
283         \r
284         /* if the process is suspend, create the action but stop its execution, it will be restart when the sender process resume */\r
285         if (MSG_process_is_suspended(t_simdata->sender)) \r
286         {\r
287                 DEBUG1("Process sender (%s) suspended", t_simdata->sender->name);\r
288                 SIMIX_action_set_priority(t_simdata->comm, 0);\r
289         }\r
290         \r
291         process->simdata->waiting_task = t;\r
292         SIMIX_register_action_to_condition(t_simdata->comm, t_simdata->cond);\r
293         \r
294         while (1) \r
295         {\r
296                 SIMIX_cond_wait(t_simdata->cond, t_simdata->mutex);\r
297                 \r
298                 if (SIMIX_action_get_state(t_simdata->comm) != SURF_ACTION_RUNNING)\r
299                         break;\r
300         }\r
301         \r
302         SIMIX_unregister_action_to_condition(t_simdata->comm, t_simdata->cond);\r
303                 process->simdata->waiting_task = NULL;\r
304         \r
305         /* the task has already finished and the pointer must be null */\r
306         if (t->simdata->sender) \r
307         {\r
308                 t->simdata->sender->simdata->waiting_task = NULL;\r
309         }\r
310         \r
311         /* for this process, don't need to change in get function */\r
312         t->simdata->receiver = NULL;\r
313         SIMIX_mutex_unlock(t_simdata->mutex);\r
314         \r
315         \r
316         if (SIMIX_action_get_state(t_simdata->comm) == SURF_ACTION_DONE) \r
317         {\r
318                 SIMIX_action_destroy(t_simdata->comm);\r
319                 t_simdata->comm = NULL;\r
320                 t_simdata->using--;\r
321                 MSG_RETURN(MSG_OK);\r
322         } \r
323         else if (SIMIX_host_get_state(h_simdata->smx_host) == 0) \r
324         {\r
325                 SIMIX_action_destroy(t_simdata->comm);\r
326                 t_simdata->comm = NULL;\r
327                 t_simdata->using--;\r
328                 MSG_RETURN(MSG_HOST_FAILURE);\r
329         } \r
330         else \r
331         {\r
332                 SIMIX_action_destroy(t_simdata->comm);\r
333                 t_simdata->comm = NULL;\r
334                 t_simdata->using--;\r
335                 MSG_RETURN(MSG_TRANSFER_FAILURE);\r
336         }\r
337 }\r
338 \r
339 MSG_error_t \r
340 MSG_mailbox_put_with_timeout(msg_mailbox_t mailbox, m_task_t task, double timeout)\r
341 {\r
342         m_process_t process = MSG_process_self();\r
343         const char* hostname;\r
344         simdata_task_t task_simdata = NULL;\r
345         m_host_t local_host = NULL;\r
346         m_host_t remote_host = NULL;\r
347         smx_cond_t cond = NULL;\r
348 \r
349         CHECK_HOST();\r
350         \r
351         task_simdata = task->simdata;\r
352         task_simdata->sender = process;\r
353         task_simdata->source = MSG_process_get_host(process);\r
354         \r
355         xbt_assert0(task_simdata->using == 1,"This task is still being used somewhere else. You cannot send it now. Go fix your code!");\r
356         \r
357         task_simdata->comm = NULL;\r
358         \r
359         task_simdata->using++;\r
360         local_host = ((simdata_process_t) process->simdata)->m_host;\r
361         \r
362         /* get the host name containing the mailbox */\r
363         hostname = MSG_mailbox_get_hostname(mailbox);\r
364 \r
365         remote_host = MSG_get_host_by_name(hostname);\r
366 \r
367         if(NULL == remote_host)\r
368                 THROW1(not_found_error,0,"Host %s not fount", hostname);\r
369 \r
370 \r
371         DEBUG4("Trying to send a task (%g kB) from %s to %s on the channel aliased by the alias %s",task->simdata->message_size / 1000, local_host->name,remote_host->name, MSG_mailbox_get_alias(mailbox));\r
372         \r
373         SIMIX_mutex_lock(remote_host->simdata->mutex);\r
374 \r
375         /* put the task in the mailbox */\r
376         MSG_mailbox_put(mailbox,task);\r
377         \r
378         if(NULL != (cond = MSG_mailbox_get_cond(mailbox)))\r
379         {\r
380                 DEBUG0("Somebody is listening. Let's wake him up!");\r
381                 SIMIX_cond_signal(cond);\r
382         }\r
383 \r
384         \r
385         \r
386         SIMIX_mutex_unlock(remote_host->simdata->mutex);\r
387         \r
388         SIMIX_mutex_lock(task->simdata->mutex);\r
389 \r
390         process->simdata->waiting_task = task;\r
391         \r
392         if(timeout > 0) \r
393         {\r
394                 xbt_ex_t e;\r
395                 double time;\r
396                 double time_elapsed;\r
397                 time = SIMIX_get_clock();\r
398                 \r
399                 TRY \r
400                 {\r
401                         /*verify if the action that ends is the correct. Call the wait_timeout with the new time. If the timeout occurs, an exception is raised */\r
402                         while (1) \r
403                         {\r
404                                 time_elapsed = SIMIX_get_clock() - time;\r
405                                 SIMIX_cond_wait_timeout(task->simdata->cond, task->simdata->mutex,timeout - time_elapsed);\r
406                                 \r
407                                 if ((task->simdata->comm != NULL) && (SIMIX_action_get_state(task->simdata->comm) != SURF_ACTION_RUNNING))\r
408                                         break;\r
409                         }\r
410                 } \r
411                 CATCH(e) \r
412                 {\r
413                         if(e.category==timeout_error) \r
414                         {\r
415                                 xbt_ex_free(e);\r
416                                 /* verify if the timeout happened and the communication didn't started yet */\r
417                                 if (task->simdata->comm == NULL) \r
418                                 {\r
419                                         process->simdata->waiting_task = NULL;\r
420                                         \r
421                                         /* remove the task from the mailbox */\r
422                                         MSG_mailbox_remove(mailbox,task);\r
423                                         \r
424                                         if (task->simdata->receiver) \r
425                                         {\r
426                                                 task->simdata->receiver->simdata->waiting_task = NULL;\r
427                                         }\r
428                                         \r
429                                         task->simdata->sender = NULL;\r
430                                         \r
431                                         SIMIX_mutex_unlock(task->simdata->mutex);\r
432                                         MSG_RETURN(MSG_TRANSFER_FAILURE);\r
433                                 }\r
434                         } \r
435                         else \r
436                         {\r
437                                 RETHROW;\r
438                         }\r
439                 }\r
440         } \r
441         else \r
442         {\r
443                 while (1) \r
444                 {\r
445                         SIMIX_cond_wait(task->simdata->cond, task->simdata->mutex);\r
446                         \r
447                         if (SIMIX_action_get_state(task->simdata->comm) != SURF_ACTION_RUNNING)\r
448                                 break;\r
449                 }\r
450         }\r
451         \r
452         DEBUG1("Action terminated %s", task->name);\r
453         process->simdata->waiting_task = NULL;\r
454         \r
455         /* the task has already finished and the pointer must be null */\r
456         if (task->simdata->receiver) \r
457         {\r
458                 task->simdata->receiver->simdata->waiting_task = NULL;\r
459         }\r
460         \r
461         task->simdata->sender = NULL;\r
462         SIMIX_mutex_unlock(task->simdata->mutex);\r
463 \r
464         \r
465         if (SIMIX_action_get_state(task->simdata->comm) == SURF_ACTION_DONE)\r
466         {\r
467                 MSG_RETURN(MSG_OK);\r
468         } \r
469         else if (SIMIX_host_get_state(local_host->simdata->smx_host) == 0) \r
470         {\r
471                 MSG_RETURN(MSG_HOST_FAILURE);\r
472         } \r
473         else \r
474         {\r
475                 MSG_RETURN(MSG_TRANSFER_FAILURE);\r
476         }\r
477 }\r