Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
aadc18fda0547f4b24a83c801451443a5054a664
[simgrid.git] / src / msg / msg_mailbox.c
1 #include "mailbox.h"\r
2 #include "msg/private.h"\r
3 \r
4 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(msg_mailbox, msg,\r
5                                 "Logging specific to MSG (mailbox)");\r
6 \r
7 static xbt_dict_t \r
8 msg_mailboxes = NULL;\r
9 \r
10 void\r
11 MSG_mailbox_mod_init(void)\r
12 {\r
13         msg_mailboxes = xbt_dict_new(); \r
14 }\r
15 \r
16 void\r
17 MSG_mailbox_mod_exit(void)\r
18 {\r
19         xbt_dict_free(&msg_mailboxes);\r
20 }\r
21 \r
22 msg_mailbox_t\r
23 MSG_mailbox_new(const char *alias)\r
24 {\r
25         msg_mailbox_t mailbox = xbt_new0(s_msg_mailbox_t,1);\r
26         \r
27         mailbox->tasks = xbt_fifo_new();\r
28         mailbox->cond = NULL;\r
29         mailbox->alias = xbt_strdup(alias);\r
30         mailbox->hostname = NULL;\r
31         \r
32         /* add the mbox in the dictionary */\r
33         xbt_dict_set(msg_mailboxes, alias, mailbox, MSG_mailbox_free);\r
34         \r
35         return mailbox;\r
36 }\r
37 \r
38 msg_mailbox_t\r
39 MSG_mailbox_create(const char *alias)\r
40 {\r
41         msg_mailbox_t mailbox = xbt_new0(s_msg_mailbox_t,1);\r
42         \r
43         mailbox->tasks = xbt_fifo_new();\r
44         mailbox->cond = NULL;\r
45         mailbox->alias = xbt_strdup(alias);\r
46         mailbox->hostname = NULL;\r
47         \r
48         return mailbox;\r
49 }\r
50 \r
51 void\r
52 MSG_mailbox_free(void* mailbox)\r
53 {\r
54         msg_mailbox_t _mailbox = (msg_mailbox_t)mailbox;\r
55 \r
56         if(NULL != (_mailbox->hostname))\r
57                 free(_mailbox->hostname);\r
58 \r
59         free(_mailbox->alias);\r
60         \r
61         free(_mailbox);\r
62 }\r
63 \r
64 void\r
65 MSG_mailbox_put(msg_mailbox_t mailbox, m_task_t task)\r
66 {\r
67         xbt_fifo_push(mailbox->tasks, task);\r
68 }\r
69 \r
70 smx_cond_t\r
71 MSG_mailbox_get_cond(msg_mailbox_t mailbox)\r
72 {\r
73         return mailbox->cond;\r
74 }\r
75 \r
76 void\r
77 MSG_mailbox_remove(msg_mailbox_t mailbox, m_task_t task)\r
78 {\r
79         xbt_fifo_remove(mailbox->tasks,task);\r
80 }\r
81 \r
82 int\r
83 MSG_mailbox_is_empty(msg_mailbox_t mailbox)\r
84 {\r
85         return (NULL == xbt_fifo_get_first_item(mailbox->tasks));\r
86 }\r
87 \r
88 m_task_t\r
89 MSG_mailbox_pop_head(msg_mailbox_t mailbox)\r
90 {\r
91         return (m_task_t)xbt_fifo_shift(mailbox->tasks);\r
92 }\r
93 \r
94 m_task_t\r
95 MSG_mailbox_get_head(msg_mailbox_t mailbox)\r
96 {\r
97         xbt_fifo_item_t item;\r
98         \r
99         if(NULL == (item = xbt_fifo_get_first_item(mailbox->tasks)))\r
100                 return NULL;\r
101                 \r
102         return (m_task_t)xbt_fifo_get_item_content(item);\r
103 }\r
104 \r
105 m_task_t\r
106 MSG_mailbox_get_first_host_task(msg_mailbox_t mailbox, m_host_t host)\r
107 {\r
108         m_task_t task = NULL;\r
109         xbt_fifo_item_t item = NULL;\r
110         \r
111         xbt_fifo_foreach(mailbox->tasks, item, task, m_task_t) \r
112         {\r
113                 if (task->simdata->source == host)\r
114                         break;\r
115         }\r
116         \r
117         if(item) \r
118                 xbt_fifo_remove_item(mailbox->tasks, item);\r
119                 \r
120         return task;\r
121 }\r
122 \r
123 int\r
124 MSG_mailbox_get_count_host_waiting_tasks(msg_mailbox_t mailbox, m_host_t host)\r
125 {\r
126         m_task_t task = NULL;\r
127         xbt_fifo_item_t item = NULL;\r
128         int count = 0;\r
129         \r
130         xbt_fifo_foreach(mailbox->tasks, item, task, m_task_t) \r
131         {\r
132                 if (task->simdata->source == host)\r
133                         count++;\r
134         }\r
135                 \r
136         return count;\r
137 }\r
138 \r
139 void\r
140 MSG_mailbox_set_cond(msg_mailbox_t mailbox, smx_cond_t cond)\r
141 {\r
142         mailbox->cond = cond;\r
143 }\r
144 \r
145 const char*\r
146 MSG_mailbox_get_alias(msg_mailbox_t mailbox)\r
147 {\r
148         return mailbox->alias;\r
149 }\r
150 \r
151 const char*\r
152 MSG_mailbox_get_hostname(msg_mailbox_t mailbox)\r
153 {\r
154         return mailbox->hostname;\r
155 }\r
156 \r
157 void\r
158 MSG_mailbox_set_hostname(msg_mailbox_t mailbox, const char* hostname)\r
159 {\r
160         mailbox->hostname = xbt_strdup(hostname);\r
161 }\r
162 \r
163 msg_mailbox_t\r
164 MSG_mailbox_get_by_alias(const char* alias)\r
165 {\r
166 \r
167         msg_mailbox_t mailbox = xbt_dict_get_or_null(msg_mailboxes,alias);\r
168    \r
169         if(!mailbox)\r
170         {\r
171                 mailbox = MSG_mailbox_new(alias);\r
172                 MSG_mailbox_set_hostname(mailbox,MSG_host_self()->name);\r
173         }\r
174         \r
175         return mailbox; \r
176 }\r
177 \r
178 msg_mailbox_t\r
179 MSG_mailbox_get_by_channel(m_host_t host, m_channel_t channel)\r
180 {\r
181          xbt_assert0((host != NULL), "Invalid host");\r
182          xbt_assert1((channel >= 0)&& (channel < msg_global->max_channel), "Invalid channel %d",channel);\r
183 \r
184          return host->simdata->mailboxes[(size_t)channel];\r
185 }\r
186 \r
187 MSG_error_t \r
188 MSG_mailbox_get_task_ext(msg_mailbox_t mailbox, m_task_t* task, m_host_t host, double timeout)\r
189 {\r
190         m_process_t process = MSG_process_self();\r
191         m_task_t t = NULL;\r
192         m_host_t h = NULL;\r
193         simdata_task_t t_simdata = NULL;\r
194         simdata_host_t h_simdata = NULL;\r
195         int first_time = 1;\r
196         \r
197         smx_cond_t cond = NULL; //conditional wait if the task isn't on the channel yet\r
198         \r
199         CHECK_HOST();\r
200         \r
201         /* Sanity check */\r
202         xbt_assert0(task, "Null pointer for the task storage");\r
203         \r
204         if (*task)\r
205                 CRITICAL0("MSG_task_get() was asked to write in a non empty task struct.");\r
206         \r
207         /* Get the task */\r
208         h = MSG_host_self();\r
209         h_simdata = h->simdata;\r
210         \r
211         SIMIX_mutex_lock(h->simdata->mutex);\r
212         \r
213         while (1) \r
214         {\r
215                 /* if the mailbox is empty (has no task */\r
216                 if(!MSG_mailbox_is_empty(mailbox))\r
217                 {\r
218                         if(!host) \r
219                         {\r
220                                 /* pop the head of the mailbox */\r
221                                 t = MSG_mailbox_pop_head(mailbox);\r
222                                 break;\r
223                         } \r
224                         else \r
225                         {\r
226                                 /* get the first task of the host */\r
227                                 if(NULL != (t = MSG_mailbox_get_first_host_task(mailbox,host)))\r
228                                         break;\r
229                         }\r
230                 }\r
231         \r
232                 if(timeout > 0) \r
233                 {\r
234                         if (!first_time) \r
235                         {\r
236                                 SIMIX_mutex_unlock(h->simdata->mutex);\r
237                                 /* set the simix condition of the mailbox to NULL */\r
238                                 MSG_mailbox_set_cond(mailbox, NULL);\r
239                                 SIMIX_cond_destroy(cond);\r
240                                 MSG_RETURN(MSG_TRANSFER_FAILURE);\r
241                         }\r
242                 }\r
243                 \r
244                 xbt_assert1(!MSG_mailbox_get_cond(mailbox),"A process is already blocked on the channel %s", MSG_mailbox_get_alias(mailbox));\r
245                 \r
246                 cond = SIMIX_cond_init();\r
247                 \r
248                 /* set the condition of the mailbox */\r
249                 MSG_mailbox_set_cond(mailbox, cond);\r
250                 \r
251                 if (timeout > 0)\r
252                         SIMIX_cond_wait_timeout(cond, h->simdata->mutex, timeout);\r
253                 else \r
254                         SIMIX_cond_wait(MSG_mailbox_get_cond(mailbox), h->simdata->mutex);\r
255                         \r
256         \r
257                 if (SIMIX_host_get_state(h_simdata->smx_host) == 0)\r
258                         MSG_RETURN(MSG_HOST_FAILURE);\r
259         \r
260                 first_time = 0;\r
261         }\r
262         \r
263         SIMIX_mutex_unlock(h->simdata->mutex);\r
264         \r
265         DEBUG1("OK, got a task (%s)", t->name);\r
266         /* clean conditional */\r
267         if (cond) \r
268         {\r
269                 SIMIX_cond_destroy(cond);\r
270                 \r
271                 MSG_mailbox_set_cond(mailbox,NULL);\r
272         }\r
273         \r
274         t_simdata = t->simdata;\r
275         t_simdata->receiver = process;\r
276         *task = t;\r
277         \r
278         SIMIX_mutex_lock(t_simdata->mutex);\r
279         \r
280         /* Transfer */\r
281         /* create SIMIX action to the communication */\r
282         t_simdata->comm = SIMIX_action_communicate(\r
283                                                                                                 t_simdata->sender->simdata->m_host->simdata->smx_host,\r
284                                                                                                 process->simdata->m_host->simdata->smx_host,\r
285                                                                                                 t->name, \r
286                                                                                                 t_simdata->message_size,\r
287                                                                                                 t_simdata->rate\r
288                                                                                         );\r
289         \r
290         /* if the process is suspend, create the action but stop its execution, it will be restart when the sender process resume */\r
291         if (MSG_process_is_suspended(t_simdata->sender)) \r
292         {\r
293                 DEBUG1("Process sender (%s) suspended", t_simdata->sender->name);\r
294                 SIMIX_action_set_priority(t_simdata->comm, 0);\r
295         }\r
296         \r
297         process->simdata->waiting_task = t;\r
298         SIMIX_register_action_to_condition(t_simdata->comm, t_simdata->cond);\r
299         \r
300         while (1) \r
301         {\r
302                 SIMIX_cond_wait(t_simdata->cond, t_simdata->mutex);\r
303                 \r
304                 if (SIMIX_action_get_state(t_simdata->comm) != SURF_ACTION_RUNNING)\r
305                         break;\r
306         }\r
307         \r
308         SIMIX_unregister_action_to_condition(t_simdata->comm, t_simdata->cond);\r
309                 process->simdata->waiting_task = NULL;\r
310         \r
311         /* the task has already finished and the pointer must be null */\r
312         if (t->simdata->sender) \r
313         {\r
314                 t->simdata->sender->simdata->waiting_task = NULL;\r
315         }\r
316         \r
317         /* for this process, don't need to change in get function */\r
318         t->simdata->receiver = NULL;\r
319         SIMIX_mutex_unlock(t_simdata->mutex);\r
320         \r
321         \r
322         if (SIMIX_action_get_state(t_simdata->comm) == SURF_ACTION_DONE) \r
323         {\r
324                 SIMIX_action_destroy(t_simdata->comm);\r
325                 t_simdata->comm = NULL;\r
326                 t_simdata->using--;\r
327                 MSG_RETURN(MSG_OK);\r
328         } \r
329         else if (SIMIX_host_get_state(h_simdata->smx_host) == 0) \r
330         {\r
331                 SIMIX_action_destroy(t_simdata->comm);\r
332                 t_simdata->comm = NULL;\r
333                 t_simdata->using--;\r
334                 MSG_RETURN(MSG_HOST_FAILURE);\r
335         } \r
336         else \r
337         {\r
338                 SIMIX_action_destroy(t_simdata->comm);\r
339                 t_simdata->comm = NULL;\r
340                 t_simdata->using--;\r
341                 MSG_RETURN(MSG_TRANSFER_FAILURE);\r
342         }\r
343 }\r
344 \r
345 MSG_error_t \r
346 MSG_mailbox_put_with_timeout(msg_mailbox_t mailbox, m_task_t task, double timeout)\r
347 {\r
348         m_process_t process = MSG_process_self();\r
349         const char* hostname;\r
350         simdata_task_t task_simdata = NULL;\r
351         m_host_t local_host = NULL;\r
352         m_host_t remote_host = NULL;\r
353         smx_cond_t cond = NULL;\r
354 \r
355         CHECK_HOST();\r
356         \r
357         task_simdata = task->simdata;\r
358         task_simdata->sender = process;\r
359         task_simdata->source = MSG_process_get_host(process);\r
360         \r
361         xbt_assert0(task_simdata->using == 1,"This task is still being used somewhere else. You cannot send it now. Go fix your code!");\r
362         \r
363         task_simdata->comm = NULL;\r
364         \r
365         task_simdata->using++;\r
366         local_host = ((simdata_process_t) process->simdata)->m_host;\r
367         \r
368         /* get the host name containing the mailbox */\r
369         hostname = MSG_mailbox_get_hostname(mailbox);\r
370 \r
371         remote_host = MSG_get_host_by_name(hostname);\r
372 \r
373         if(NULL == remote_host)\r
374                 THROW1(not_found_error,0,"Host %s not fount", hostname);\r
375 \r
376 \r
377         DEBUG4("Trying to send a task (%g kB) from %s to %s on the channel aliased by the alias %s",task->simdata->message_size / 1000, local_host->name,remote_host->name, MSG_mailbox_get_alias(mailbox));\r
378         \r
379         SIMIX_mutex_lock(remote_host->simdata->mutex);\r
380 \r
381         /* put the task in the mailbox */\r
382         MSG_mailbox_put(mailbox,task);\r
383         \r
384         if(NULL != (cond = MSG_mailbox_get_cond(mailbox)))\r
385         {\r
386                 DEBUG0("Somebody is listening. Let's wake him up!");\r
387                 SIMIX_cond_signal(cond);\r
388         }\r
389 \r
390         \r
391         \r
392         SIMIX_mutex_unlock(remote_host->simdata->mutex);\r
393         \r
394         SIMIX_mutex_lock(task->simdata->mutex);\r
395 \r
396         process->simdata->waiting_task = task;\r
397         \r
398         if(timeout > 0) \r
399         {\r
400                 xbt_ex_t e;\r
401                 double time;\r
402                 double time_elapsed;\r
403                 time = SIMIX_get_clock();\r
404                 \r
405                 TRY \r
406                 {\r
407                         /*verify if the action that ends is the correct. Call the wait_timeout with the new time. If the timeout occurs, an exception is raised */\r
408                         while (1) \r
409                         {\r
410                                 time_elapsed = SIMIX_get_clock() - time;\r
411                                 SIMIX_cond_wait_timeout(task->simdata->cond, task->simdata->mutex,timeout - time_elapsed);\r
412                                 \r
413                                 if ((task->simdata->comm != NULL) && (SIMIX_action_get_state(task->simdata->comm) != SURF_ACTION_RUNNING))\r
414                                         break;\r
415                         }\r
416                 } \r
417                 CATCH(e) \r
418                 {\r
419                         if(e.category==timeout_error) \r
420                         {\r
421                                 xbt_ex_free(e);\r
422                                 /* verify if the timeout happened and the communication didn't started yet */\r
423                                 if (task->simdata->comm == NULL) \r
424                                 {\r
425                                         process->simdata->waiting_task = NULL;\r
426                                         \r
427                                         /* remove the task from the mailbox */\r
428                                         MSG_mailbox_remove(mailbox,task);\r
429                                         \r
430                                         if (task->simdata->receiver) \r
431                                         {\r
432                                                 task->simdata->receiver->simdata->waiting_task = NULL;\r
433                                         }\r
434                                         \r
435                                         task->simdata->sender = NULL;\r
436                                         \r
437                                         SIMIX_mutex_unlock(task->simdata->mutex);\r
438                                         MSG_RETURN(MSG_TRANSFER_FAILURE);\r
439                                 }\r
440                         } \r
441                         else \r
442                         {\r
443                                 RETHROW;\r
444                         }\r
445                 }\r
446         } \r
447         else \r
448         {\r
449                 while (1) \r
450                 {\r
451                         SIMIX_cond_wait(task->simdata->cond, task->simdata->mutex);\r
452                         \r
453                         if (SIMIX_action_get_state(task->simdata->comm) != SURF_ACTION_RUNNING)\r
454                                 break;\r
455                 }\r
456         }\r
457         \r
458         DEBUG1("Action terminated %s", task->name);\r
459         process->simdata->waiting_task = NULL;\r
460         \r
461         /* the task has already finished and the pointer must be null */\r
462         if (task->simdata->receiver) \r
463         {\r
464                 task->simdata->receiver->simdata->waiting_task = NULL;\r
465         }\r
466         \r
467         task->simdata->sender = NULL;\r
468         SIMIX_mutex_unlock(task->simdata->mutex);\r
469 \r
470         \r
471         if (SIMIX_action_get_state(task->simdata->comm) == SURF_ACTION_DONE)\r
472         {\r
473                 MSG_RETURN(MSG_OK);\r
474         } \r
475         else if (SIMIX_host_get_state(local_host->simdata->smx_host) == 0) \r
476         {\r
477                 MSG_RETURN(MSG_HOST_FAILURE);\r
478         } \r
479         else \r
480         {\r
481                 MSG_RETURN(MSG_TRANSFER_FAILURE);\r
482         }\r
483 }\r