Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
do not try to malloc 0-sized arrays
[simgrid.git] / src / msg / msg_mailbox.c
1 #include "mailbox.h"\r
2 #include "msg/private.h"\r
3 \r
4 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(msg_mailbox, msg,\r
5                                 "Logging specific to MSG (mailbox)");\r
6 \r
7 static xbt_dict_t \r
8 msg_mailboxes = NULL;\r
9 \r
10 void\r
11 MSG_mailbox_mod_init(void)\r
12 {\r
13         msg_mailboxes = xbt_dict_new(); \r
14 }\r
15 \r
16 void\r
17 MSG_mailbox_mod_exit(void)\r
18 {\r
19         xbt_dict_free(&msg_mailboxes);\r
20 }\r
21 \r
22 msg_mailbox_t\r
23 MSG_mailbox_create(const char *alias)\r
24 {\r
25         msg_mailbox_t mailbox = xbt_new0(s_msg_mailbox_t,1);\r
26         \r
27         mailbox->tasks = xbt_fifo_new();\r
28         mailbox->cond = NULL;\r
29         mailbox->alias = alias ? xbt_strdup(alias) : NULL;\r
30         mailbox->hostname = NULL;\r
31         \r
32         return mailbox;\r
33 }\r
34 \r
35 msg_mailbox_t\r
36 MSG_mailbox_new(const char *alias)\r
37 {\r
38         msg_mailbox_t mailbox = MSG_mailbox_create(alias);\r
39         \r
40         /* add the mbox in the dictionary */\r
41         xbt_dict_set(msg_mailboxes, alias, mailbox, MSG_mailbox_free);\r
42         \r
43         return mailbox;\r
44 }\r
45 \r
46 void\r
47 MSG_mailbox_free(void* mailbox)\r
48 {\r
49         msg_mailbox_t _mailbox = (msg_mailbox_t)mailbox;\r
50 \r
51         if(NULL != (_mailbox->hostname))\r
52                 free(_mailbox->hostname);\r
53 \r
54         free(_mailbox->alias);\r
55         \r
56         free(_mailbox);\r
57 }\r
58 \r
59 void\r
60 MSG_mailbox_put(msg_mailbox_t mailbox, m_task_t task)\r
61 {\r
62         xbt_fifo_push(mailbox->tasks, task);\r
63 }\r
64 \r
65 smx_cond_t\r
66 MSG_mailbox_get_cond(msg_mailbox_t mailbox)\r
67 {\r
68         return mailbox->cond;\r
69 }\r
70 \r
71 void\r
72 MSG_mailbox_remove(msg_mailbox_t mailbox, m_task_t task)\r
73 {\r
74         xbt_fifo_remove(mailbox->tasks,task);\r
75 }\r
76 \r
77 int\r
78 MSG_mailbox_is_empty(msg_mailbox_t mailbox)\r
79 {\r
80         return (NULL == xbt_fifo_get_first_item(mailbox->tasks));\r
81 }\r
82 \r
83 m_task_t\r
84 MSG_mailbox_pop_head(msg_mailbox_t mailbox)\r
85 {\r
86         return (m_task_t)xbt_fifo_shift(mailbox->tasks);\r
87 }\r
88 \r
89 m_task_t\r
90 MSG_mailbox_get_head(msg_mailbox_t mailbox)\r
91 {\r
92         xbt_fifo_item_t item;\r
93         \r
94         if(NULL == (item = xbt_fifo_get_first_item(mailbox->tasks)))\r
95                 return NULL;\r
96                 \r
97         return (m_task_t)xbt_fifo_get_item_content(item);\r
98 }\r
99 \r
100 \r
101 m_task_t\r
102 MSG_mailbox_get_first_host_task(msg_mailbox_t mailbox, m_host_t host)\r
103 {\r
104  m_task_t task = NULL;\r
105  xbt_fifo_item_t item = NULL;\r
106 \r
107  xbt_fifo_foreach(mailbox->tasks, item, task, m_task_t)\r
108    if (task->simdata->source == host) {\r
109      xbt_fifo_remove_item(mailbox->tasks, item);\r
110      return task;\r
111    }\r
112 \r
113  return NULL;\r
114 }\r
115 \r
116 int\r
117 MSG_mailbox_get_count_host_waiting_tasks(msg_mailbox_t mailbox, m_host_t host)\r
118 {\r
119         m_task_t task = NULL;\r
120         xbt_fifo_item_t item = NULL;\r
121         int count = 0;\r
122         \r
123         xbt_fifo_foreach(mailbox->tasks, item, task, m_task_t) \r
124         {\r
125                 if (task->simdata->source == host)\r
126                         count++;\r
127         }\r
128                 \r
129         return count;\r
130 }\r
131 \r
132 void\r
133 MSG_mailbox_set_cond(msg_mailbox_t mailbox, smx_cond_t cond)\r
134 {\r
135         mailbox->cond = cond;\r
136 }\r
137 \r
138 const char*\r
139 MSG_mailbox_get_alias(msg_mailbox_t mailbox)\r
140 {\r
141         return mailbox->alias;\r
142 }\r
143 \r
144 const char*\r
145 MSG_mailbox_get_hostname(msg_mailbox_t mailbox)\r
146 {\r
147         return mailbox->hostname;\r
148 }\r
149 \r
150 void\r
151 MSG_mailbox_set_hostname(msg_mailbox_t mailbox, const char* hostname)\r
152 {\r
153         mailbox->hostname = xbt_strdup(hostname);\r
154 }\r
155 \r
156 msg_mailbox_t\r
157 MSG_mailbox_get_by_alias(const char* alias)\r
158 {\r
159 \r
160         msg_mailbox_t mailbox = xbt_dict_get_or_null(msg_mailboxes,alias);\r
161    \r
162         if(!mailbox)\r
163         {\r
164                 mailbox = MSG_mailbox_new(alias);\r
165                 MSG_mailbox_set_hostname(mailbox,MSG_host_self()->name);\r
166         }\r
167         \r
168         return mailbox; \r
169 }\r
170 \r
171 msg_mailbox_t\r
172 MSG_mailbox_get_by_channel(m_host_t host, m_channel_t channel)\r
173 {\r
174          xbt_assert0((host != NULL), "Invalid host");\r
175          xbt_assert1((channel >= 0)&& (channel < msg_global->max_channel), "Invalid channel %d",channel);\r
176 \r
177          return host->simdata->mailboxes[(size_t)channel];\r
178 }\r
179 \r
180 MSG_error_t \r
181 MSG_mailbox_get_task_ext(msg_mailbox_t mailbox, m_task_t* task, m_host_t host, double timeout)\r
182 {\r
183         m_process_t process = MSG_process_self();\r
184         m_task_t t = NULL;\r
185         m_host_t h = NULL;\r
186         simdata_task_t t_simdata = NULL;\r
187         simdata_host_t h_simdata = NULL;\r
188         int first_time = 1;\r
189         \r
190         smx_cond_t cond = NULL; //conditional wait if the task isn't on the channel yet\r
191         \r
192         CHECK_HOST();\r
193         \r
194         /* Sanity check */\r
195         xbt_assert0(task, "Null pointer for the task storage");\r
196         \r
197         if (*task)\r
198                 CRITICAL0("MSG_task_get() was asked to write in a non empty task struct.");\r
199         \r
200         /* Get the task */\r
201         h = MSG_host_self();\r
202         h_simdata = h->simdata;\r
203         \r
204         SIMIX_mutex_lock(h->simdata->mutex);\r
205         \r
206         while (1) \r
207         {\r
208                 /* if the mailbox is empty (has no task */\r
209                 if(!MSG_mailbox_is_empty(mailbox))\r
210                 {\r
211                         if(!host) \r
212                         {\r
213                                 /* pop the head of the mailbox */\r
214                                 t = MSG_mailbox_pop_head(mailbox);\r
215                                 break;\r
216                         } \r
217                         else \r
218                         {\r
219                                 /* get the first task of the host */\r
220                                 if(NULL != (t = MSG_mailbox_get_first_host_task(mailbox,host)))\r
221                                         break;\r
222                         }\r
223                 }\r
224         \r
225                 if(timeout > 0) \r
226                 {\r
227                         if (!first_time) \r
228                         {\r
229                                 SIMIX_mutex_unlock(h->simdata->mutex);\r
230                                 /* set the simix condition of the mailbox to NULL */\r
231                                 MSG_mailbox_set_cond(mailbox, NULL);\r
232                                 SIMIX_cond_destroy(cond);\r
233                                 MSG_RETURN(MSG_TRANSFER_FAILURE);\r
234                         }\r
235                 }\r
236                 \r
237                 xbt_assert1(!MSG_mailbox_get_cond(mailbox),"A process is already blocked on the channel %s", MSG_mailbox_get_alias(mailbox));\r
238                 \r
239                 cond = SIMIX_cond_init();\r
240                 \r
241                 /* set the condition of the mailbox */\r
242                 MSG_mailbox_set_cond(mailbox, cond);\r
243                 \r
244                 if (timeout > 0)\r
245                         SIMIX_cond_wait_timeout(cond, h->simdata->mutex, timeout);\r
246                 else \r
247                         SIMIX_cond_wait(MSG_mailbox_get_cond(mailbox), h->simdata->mutex);\r
248                         \r
249         \r
250                 if (SIMIX_host_get_state(h_simdata->smx_host) == 0)\r
251                         MSG_RETURN(MSG_HOST_FAILURE);\r
252         \r
253                 first_time = 0;\r
254         }\r
255         \r
256         SIMIX_mutex_unlock(h->simdata->mutex);\r
257         \r
258         DEBUG1("OK, got a task (%s)", t->name);\r
259         /* clean conditional */\r
260         if (cond) \r
261         {\r
262                 SIMIX_cond_destroy(cond);\r
263                 \r
264                 MSG_mailbox_set_cond(mailbox,NULL);\r
265         }\r
266         \r
267         t_simdata = t->simdata;\r
268         t_simdata->receiver = process;\r
269         *task = t;\r
270         \r
271         SIMIX_mutex_lock(t_simdata->mutex);\r
272         \r
273         /* Transfer */\r
274         /* create SIMIX action to the communication */\r
275         t_simdata->comm = SIMIX_action_communicate(\r
276                                                                                                 t_simdata->sender->simdata->m_host->simdata->smx_host,\r
277                                                                                                 process->simdata->m_host->simdata->smx_host,\r
278                                                                                                 t->name, \r
279                                                                                                 t_simdata->message_size,\r
280                                                                                                 t_simdata->rate\r
281                                                                                         );\r
282         \r
283         /* if the process is suspend, create the action but stop its execution, it will be restart when the sender process resume */\r
284         if (MSG_process_is_suspended(t_simdata->sender)) \r
285         {\r
286                 DEBUG1("Process sender (%s) suspended", t_simdata->sender->name);\r
287                 SIMIX_action_set_priority(t_simdata->comm, 0);\r
288         }\r
289         \r
290         process->simdata->waiting_task = t;\r
291         SIMIX_register_action_to_condition(t_simdata->comm, t_simdata->cond);\r
292         \r
293         while (1) \r
294         {\r
295                 SIMIX_cond_wait(t_simdata->cond, t_simdata->mutex);\r
296                 \r
297                 if (SIMIX_action_get_state(t_simdata->comm) != SURF_ACTION_RUNNING)\r
298                         break;\r
299         }\r
300         \r
301         SIMIX_unregister_action_to_condition(t_simdata->comm, t_simdata->cond);\r
302                 process->simdata->waiting_task = NULL;\r
303         \r
304         /* the task has already finished and the pointer must be null */\r
305         if (t->simdata->sender) \r
306         {\r
307                 t->simdata->sender->simdata->waiting_task = NULL;\r
308         }\r
309         \r
310         /* for this process, don't need to change in get function */\r
311         t->simdata->receiver = NULL;\r
312         SIMIX_mutex_unlock(t_simdata->mutex);\r
313         \r
314         \r
315         if (SIMIX_action_get_state(t_simdata->comm) == SURF_ACTION_DONE) \r
316         {\r
317                 SIMIX_action_destroy(t_simdata->comm);\r
318                 t_simdata->comm = NULL;\r
319                 t_simdata->using--;\r
320                 MSG_RETURN(MSG_OK);\r
321         } \r
322         else if (SIMIX_host_get_state(h_simdata->smx_host) == 0) \r
323         {\r
324                 SIMIX_action_destroy(t_simdata->comm);\r
325                 t_simdata->comm = NULL;\r
326                 t_simdata->using--;\r
327                 MSG_RETURN(MSG_HOST_FAILURE);\r
328         } \r
329         else \r
330         {\r
331                 SIMIX_action_destroy(t_simdata->comm);\r
332                 t_simdata->comm = NULL;\r
333                 t_simdata->using--;\r
334                 MSG_RETURN(MSG_TRANSFER_FAILURE);\r
335         }\r
336 }\r
337 \r
338 MSG_error_t \r
339 MSG_mailbox_put_with_timeout(msg_mailbox_t mailbox, m_task_t task, double timeout)\r
340 {\r
341         m_process_t process = MSG_process_self();\r
342         const char* hostname;\r
343         simdata_task_t task_simdata = NULL;\r
344         m_host_t local_host = NULL;\r
345         m_host_t remote_host = NULL;\r
346         smx_cond_t cond = NULL;\r
347 \r
348         CHECK_HOST();\r
349         \r
350         task_simdata = task->simdata;\r
351         task_simdata->sender = process;\r
352         task_simdata->source = MSG_process_get_host(process);\r
353         \r
354         xbt_assert0(task_simdata->using == 1,"This task is still being used somewhere else. You cannot send it now. Go fix your code!");\r
355         \r
356         task_simdata->comm = NULL;\r
357         \r
358         task_simdata->using++;\r
359         local_host = ((simdata_process_t) process->simdata)->m_host;\r
360         \r
361         /* get the host name containing the mailbox */\r
362         hostname = MSG_mailbox_get_hostname(mailbox);\r
363 \r
364         remote_host = MSG_get_host_by_name(hostname);\r
365 \r
366         if(NULL == remote_host)\r
367                 THROW1(not_found_error,0,"Host %s not fount", hostname);\r
368 \r
369 \r
370         DEBUG4("Trying to send a task (%g kB) from %s to %s on the channel aliased by the alias %s",task->simdata->message_size / 1000, local_host->name,remote_host->name, MSG_mailbox_get_alias(mailbox));\r
371         \r
372         SIMIX_mutex_lock(remote_host->simdata->mutex);\r
373 \r
374         /* put the task in the mailbox */\r
375         MSG_mailbox_put(mailbox,task);\r
376         \r
377         if(NULL != (cond = MSG_mailbox_get_cond(mailbox)))\r
378         {\r
379                 DEBUG0("Somebody is listening. Let's wake him up!");\r
380                 SIMIX_cond_signal(cond);\r
381         }\r
382 \r
383         \r
384         \r
385         SIMIX_mutex_unlock(remote_host->simdata->mutex);\r
386         \r
387         SIMIX_mutex_lock(task->simdata->mutex);\r
388 \r
389         process->simdata->waiting_task = task;\r
390         \r
391         if(timeout > 0) \r
392         {\r
393                 xbt_ex_t e;\r
394                 double time;\r
395                 double time_elapsed;\r
396                 time = SIMIX_get_clock();\r
397                 \r
398                 TRY \r
399                 {\r
400                         /*verify if the action that ends is the correct. Call the wait_timeout with the new time. If the timeout occurs, an exception is raised */\r
401                         while (1) \r
402                         {\r
403                                 time_elapsed = SIMIX_get_clock() - time;\r
404                                 SIMIX_cond_wait_timeout(task->simdata->cond, task->simdata->mutex,timeout - time_elapsed);\r
405                                 \r
406                                 if ((task->simdata->comm != NULL) && (SIMIX_action_get_state(task->simdata->comm) != SURF_ACTION_RUNNING))\r
407                                         break;\r
408                         }\r
409                 } \r
410                 CATCH(e) \r
411                 {\r
412                         if(e.category==timeout_error) \r
413                         {\r
414                                 xbt_ex_free(e);\r
415                                 /* verify if the timeout happened and the communication didn't started yet */\r
416                                 if (task->simdata->comm == NULL) \r
417                                 {\r
418                                         process->simdata->waiting_task = NULL;\r
419                                         \r
420                                         /* remove the task from the mailbox */\r
421                                         MSG_mailbox_remove(mailbox,task);\r
422                                         \r
423                                         if (task->simdata->receiver) \r
424                                         {\r
425                                                 task->simdata->receiver->simdata->waiting_task = NULL;\r
426                                         }\r
427                                         \r
428                                         task->simdata->sender = NULL;\r
429                                         \r
430                                         SIMIX_mutex_unlock(task->simdata->mutex);\r
431                                         MSG_RETURN(MSG_TRANSFER_FAILURE);\r
432                                 }\r
433                         } \r
434                         else \r
435                         {\r
436                                 RETHROW;\r
437                         }\r
438                 }\r
439         } \r
440         else \r
441         {\r
442                 while (1) \r
443                 {\r
444                         SIMIX_cond_wait(task->simdata->cond, task->simdata->mutex);\r
445                         \r
446                         if (SIMIX_action_get_state(task->simdata->comm) != SURF_ACTION_RUNNING)\r
447                                 break;\r
448                 }\r
449         }\r
450         \r
451         DEBUG1("Action terminated %s", task->name);\r
452         process->simdata->waiting_task = NULL;\r
453         \r
454         /* the task has already finished and the pointer must be null */\r
455         if (task->simdata->receiver) \r
456         {\r
457                 task->simdata->receiver->simdata->waiting_task = NULL;\r
458         }\r
459         \r
460         task->simdata->sender = NULL;\r
461         SIMIX_mutex_unlock(task->simdata->mutex);\r
462 \r
463         \r
464         if (SIMIX_action_get_state(task->simdata->comm) == SURF_ACTION_DONE)\r
465         {\r
466                 MSG_RETURN(MSG_OK);\r
467         } \r
468         else if (SIMIX_host_get_state(local_host->simdata->smx_host) == 0) \r
469         {\r
470                 MSG_RETURN(MSG_HOST_FAILURE);\r
471         } \r
472         else \r
473         {\r
474                 MSG_RETURN(MSG_TRANSFER_FAILURE);\r
475         }\r
476 }\r