Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Bug Fix. This function used to return the last task of the list when no matching...
[simgrid.git] / src / msg / msg_mailbox.c
1 #include "mailbox.h"\r
2 #include "msg/private.h"\r
3 \r
4 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(msg_mailbox, msg,\r
5                                 "Logging specific to MSG (mailbox)");\r
6 \r
7 static xbt_dict_t \r
8 msg_mailboxes = NULL;\r
9 \r
10 void\r
11 MSG_mailbox_mod_init(void)\r
12 {\r
13         msg_mailboxes = xbt_dict_new(); \r
14 }\r
15 \r
16 void\r
17 MSG_mailbox_mod_exit(void)\r
18 {\r
19         xbt_dict_free(&msg_mailboxes);\r
20 }\r
21 \r
22 msg_mailbox_t\r
23 MSG_mailbox_new(const char *alias)\r
24 {\r
25         msg_mailbox_t mailbox = xbt_new0(s_msg_mailbox_t,1);\r
26         \r
27         mailbox->tasks = xbt_fifo_new();\r
28         mailbox->cond = NULL;\r
29         mailbox->alias = xbt_strdup(alias);\r
30         mailbox->hostname = NULL;\r
31         \r
32         /* add the mbox in the dictionary */\r
33         xbt_dict_set(msg_mailboxes, alias, mailbox, MSG_mailbox_free);\r
34         \r
35         return mailbox;\r
36 }\r
37 \r
38 msg_mailbox_t\r
39 MSG_mailbox_create(const char *alias)\r
40 {\r
41         msg_mailbox_t mailbox = xbt_new0(s_msg_mailbox_t,1);\r
42         \r
43         mailbox->tasks = xbt_fifo_new();\r
44         mailbox->cond = NULL;\r
45         mailbox->alias = xbt_strdup(alias);\r
46         mailbox->hostname = NULL;\r
47         \r
48         return mailbox;\r
49 }\r
50 \r
51 void\r
52 MSG_mailbox_free(void* mailbox)\r
53 {\r
54         msg_mailbox_t _mailbox = (msg_mailbox_t)mailbox;\r
55 \r
56         if(NULL != (_mailbox->hostname))\r
57                 free(_mailbox->hostname);\r
58 \r
59         free(_mailbox->alias);\r
60         \r
61         free(_mailbox);\r
62 }\r
63 \r
64 void\r
65 MSG_mailbox_put(msg_mailbox_t mailbox, m_task_t task)\r
66 {\r
67         xbt_fifo_push(mailbox->tasks, task);\r
68 }\r
69 \r
70 smx_cond_t\r
71 MSG_mailbox_get_cond(msg_mailbox_t mailbox)\r
72 {\r
73         return mailbox->cond;\r
74 }\r
75 \r
76 void\r
77 MSG_mailbox_remove(msg_mailbox_t mailbox, m_task_t task)\r
78 {\r
79         xbt_fifo_remove(mailbox->tasks,task);\r
80 }\r
81 \r
82 int\r
83 MSG_mailbox_is_empty(msg_mailbox_t mailbox)\r
84 {\r
85         return (NULL == xbt_fifo_get_first_item(mailbox->tasks));\r
86 }\r
87 \r
88 m_task_t\r
89 MSG_mailbox_pop_head(msg_mailbox_t mailbox)\r
90 {\r
91         return (m_task_t)xbt_fifo_shift(mailbox->tasks);\r
92 }\r
93 \r
94 m_task_t\r
95 MSG_mailbox_get_head(msg_mailbox_t mailbox)\r
96 {\r
97         xbt_fifo_item_t item;\r
98         \r
99         if(NULL == (item = xbt_fifo_get_first_item(mailbox->tasks)))\r
100                 return NULL;\r
101                 \r
102         return (m_task_t)xbt_fifo_get_item_content(item);\r
103 }\r
104 \r
105 \r
106 m_task_t\r
107 MSG_mailbox_get_first_host_task(msg_mailbox_t mailbox, m_host_t host)\r
108 {\r
109  m_task_t task = NULL;\r
110  xbt_fifo_item_t item = NULL;\r
111 \r
112  xbt_fifo_foreach(mailbox->tasks, item, task, m_task_t)\r
113    if (task->simdata->source == host) {\r
114      xbt_fifo_remove_item(mailbox->tasks, item);\r
115      return task;\r
116    }\r
117 \r
118  return NULL;\r
119 }\r
120 \r
121 int\r
122 MSG_mailbox_get_count_host_waiting_tasks(msg_mailbox_t mailbox, m_host_t host)\r
123 {\r
124         m_task_t task = NULL;\r
125         xbt_fifo_item_t item = NULL;\r
126         int count = 0;\r
127         \r
128         xbt_fifo_foreach(mailbox->tasks, item, task, m_task_t) \r
129         {\r
130                 if (task->simdata->source == host)\r
131                         count++;\r
132         }\r
133                 \r
134         return count;\r
135 }\r
136 \r
137 void\r
138 MSG_mailbox_set_cond(msg_mailbox_t mailbox, smx_cond_t cond)\r
139 {\r
140         mailbox->cond = cond;\r
141 }\r
142 \r
143 const char*\r
144 MSG_mailbox_get_alias(msg_mailbox_t mailbox)\r
145 {\r
146         return mailbox->alias;\r
147 }\r
148 \r
149 const char*\r
150 MSG_mailbox_get_hostname(msg_mailbox_t mailbox)\r
151 {\r
152         return mailbox->hostname;\r
153 }\r
154 \r
155 void\r
156 MSG_mailbox_set_hostname(msg_mailbox_t mailbox, const char* hostname)\r
157 {\r
158         mailbox->hostname = xbt_strdup(hostname);\r
159 }\r
160 \r
161 msg_mailbox_t\r
162 MSG_mailbox_get_by_alias(const char* alias)\r
163 {\r
164 \r
165         msg_mailbox_t mailbox = xbt_dict_get_or_null(msg_mailboxes,alias);\r
166    \r
167         if(!mailbox)\r
168         {\r
169                 mailbox = MSG_mailbox_new(alias);\r
170                 MSG_mailbox_set_hostname(mailbox,MSG_host_self()->name);\r
171         }\r
172         \r
173         return mailbox; \r
174 }\r
175 \r
176 msg_mailbox_t\r
177 MSG_mailbox_get_by_channel(m_host_t host, m_channel_t channel)\r
178 {\r
179          xbt_assert0((host != NULL), "Invalid host");\r
180          xbt_assert1((channel >= 0)&& (channel < msg_global->max_channel), "Invalid channel %d",channel);\r
181 \r
182          return host->simdata->mailboxes[(size_t)channel];\r
183 }\r
184 \r
185 MSG_error_t \r
186 MSG_mailbox_get_task_ext(msg_mailbox_t mailbox, m_task_t* task, m_host_t host, double timeout)\r
187 {\r
188         m_process_t process = MSG_process_self();\r
189         m_task_t t = NULL;\r
190         m_host_t h = NULL;\r
191         simdata_task_t t_simdata = NULL;\r
192         simdata_host_t h_simdata = NULL;\r
193         int first_time = 1;\r
194         \r
195         smx_cond_t cond = NULL; //conditional wait if the task isn't on the channel yet\r
196         \r
197         CHECK_HOST();\r
198         \r
199         /* Sanity check */\r
200         xbt_assert0(task, "Null pointer for the task storage");\r
201         \r
202         if (*task)\r
203                 CRITICAL0("MSG_task_get() was asked to write in a non empty task struct.");\r
204         \r
205         /* Get the task */\r
206         h = MSG_host_self();\r
207         h_simdata = h->simdata;\r
208         \r
209         SIMIX_mutex_lock(h->simdata->mutex);\r
210         \r
211         while (1) \r
212         {\r
213                 /* if the mailbox is empty (has no task */\r
214                 if(!MSG_mailbox_is_empty(mailbox))\r
215                 {\r
216                         if(!host) \r
217                         {\r
218                                 /* pop the head of the mailbox */\r
219                                 t = MSG_mailbox_pop_head(mailbox);\r
220                                 break;\r
221                         } \r
222                         else \r
223                         {\r
224                                 /* get the first task of the host */\r
225                                 if(NULL != (t = MSG_mailbox_get_first_host_task(mailbox,host)))\r
226                                         break;\r
227                         }\r
228                 }\r
229         \r
230                 if(timeout > 0) \r
231                 {\r
232                         if (!first_time) \r
233                         {\r
234                                 SIMIX_mutex_unlock(h->simdata->mutex);\r
235                                 /* set the simix condition of the mailbox to NULL */\r
236                                 MSG_mailbox_set_cond(mailbox, NULL);\r
237                                 SIMIX_cond_destroy(cond);\r
238                                 MSG_RETURN(MSG_TRANSFER_FAILURE);\r
239                         }\r
240                 }\r
241                 \r
242                 xbt_assert1(!MSG_mailbox_get_cond(mailbox),"A process is already blocked on the channel %s", MSG_mailbox_get_alias(mailbox));\r
243                 \r
244                 cond = SIMIX_cond_init();\r
245                 \r
246                 /* set the condition of the mailbox */\r
247                 MSG_mailbox_set_cond(mailbox, cond);\r
248                 \r
249                 if (timeout > 0)\r
250                         SIMIX_cond_wait_timeout(cond, h->simdata->mutex, timeout);\r
251                 else \r
252                         SIMIX_cond_wait(MSG_mailbox_get_cond(mailbox), h->simdata->mutex);\r
253                         \r
254         \r
255                 if (SIMIX_host_get_state(h_simdata->smx_host) == 0)\r
256                         MSG_RETURN(MSG_HOST_FAILURE);\r
257         \r
258                 first_time = 0;\r
259         }\r
260         \r
261         SIMIX_mutex_unlock(h->simdata->mutex);\r
262         \r
263         DEBUG1("OK, got a task (%s)", t->name);\r
264         /* clean conditional */\r
265         if (cond) \r
266         {\r
267                 SIMIX_cond_destroy(cond);\r
268                 \r
269                 MSG_mailbox_set_cond(mailbox,NULL);\r
270         }\r
271         \r
272         t_simdata = t->simdata;\r
273         t_simdata->receiver = process;\r
274         *task = t;\r
275         \r
276         SIMIX_mutex_lock(t_simdata->mutex);\r
277         \r
278         /* Transfer */\r
279         /* create SIMIX action to the communication */\r
280         t_simdata->comm = SIMIX_action_communicate(\r
281                                                                                                 t_simdata->sender->simdata->m_host->simdata->smx_host,\r
282                                                                                                 process->simdata->m_host->simdata->smx_host,\r
283                                                                                                 t->name, \r
284                                                                                                 t_simdata->message_size,\r
285                                                                                                 t_simdata->rate\r
286                                                                                         );\r
287         \r
288         /* if the process is suspend, create the action but stop its execution, it will be restart when the sender process resume */\r
289         if (MSG_process_is_suspended(t_simdata->sender)) \r
290         {\r
291                 DEBUG1("Process sender (%s) suspended", t_simdata->sender->name);\r
292                 SIMIX_action_set_priority(t_simdata->comm, 0);\r
293         }\r
294         \r
295         process->simdata->waiting_task = t;\r
296         SIMIX_register_action_to_condition(t_simdata->comm, t_simdata->cond);\r
297         \r
298         while (1) \r
299         {\r
300                 SIMIX_cond_wait(t_simdata->cond, t_simdata->mutex);\r
301                 \r
302                 if (SIMIX_action_get_state(t_simdata->comm) != SURF_ACTION_RUNNING)\r
303                         break;\r
304         }\r
305         \r
306         SIMIX_unregister_action_to_condition(t_simdata->comm, t_simdata->cond);\r
307                 process->simdata->waiting_task = NULL;\r
308         \r
309         /* the task has already finished and the pointer must be null */\r
310         if (t->simdata->sender) \r
311         {\r
312                 t->simdata->sender->simdata->waiting_task = NULL;\r
313         }\r
314         \r
315         /* for this process, don't need to change in get function */\r
316         t->simdata->receiver = NULL;\r
317         SIMIX_mutex_unlock(t_simdata->mutex);\r
318         \r
319         \r
320         if (SIMIX_action_get_state(t_simdata->comm) == SURF_ACTION_DONE) \r
321         {\r
322                 SIMIX_action_destroy(t_simdata->comm);\r
323                 t_simdata->comm = NULL;\r
324                 t_simdata->using--;\r
325                 MSG_RETURN(MSG_OK);\r
326         } \r
327         else if (SIMIX_host_get_state(h_simdata->smx_host) == 0) \r
328         {\r
329                 SIMIX_action_destroy(t_simdata->comm);\r
330                 t_simdata->comm = NULL;\r
331                 t_simdata->using--;\r
332                 MSG_RETURN(MSG_HOST_FAILURE);\r
333         } \r
334         else \r
335         {\r
336                 SIMIX_action_destroy(t_simdata->comm);\r
337                 t_simdata->comm = NULL;\r
338                 t_simdata->using--;\r
339                 MSG_RETURN(MSG_TRANSFER_FAILURE);\r
340         }\r
341 }\r
342 \r
343 MSG_error_t \r
344 MSG_mailbox_put_with_timeout(msg_mailbox_t mailbox, m_task_t task, double timeout)\r
345 {\r
346         m_process_t process = MSG_process_self();\r
347         const char* hostname;\r
348         simdata_task_t task_simdata = NULL;\r
349         m_host_t local_host = NULL;\r
350         m_host_t remote_host = NULL;\r
351         smx_cond_t cond = NULL;\r
352 \r
353         CHECK_HOST();\r
354         \r
355         task_simdata = task->simdata;\r
356         task_simdata->sender = process;\r
357         task_simdata->source = MSG_process_get_host(process);\r
358         \r
359         xbt_assert0(task_simdata->using == 1,"This task is still being used somewhere else. You cannot send it now. Go fix your code!");\r
360         \r
361         task_simdata->comm = NULL;\r
362         \r
363         task_simdata->using++;\r
364         local_host = ((simdata_process_t) process->simdata)->m_host;\r
365         \r
366         /* get the host name containing the mailbox */\r
367         hostname = MSG_mailbox_get_hostname(mailbox);\r
368 \r
369         remote_host = MSG_get_host_by_name(hostname);\r
370 \r
371         if(NULL == remote_host)\r
372                 THROW1(not_found_error,0,"Host %s not fount", hostname);\r
373 \r
374 \r
375         DEBUG4("Trying to send a task (%g kB) from %s to %s on the channel aliased by the alias %s",task->simdata->message_size / 1000, local_host->name,remote_host->name, MSG_mailbox_get_alias(mailbox));\r
376         \r
377         SIMIX_mutex_lock(remote_host->simdata->mutex);\r
378 \r
379         /* put the task in the mailbox */\r
380         MSG_mailbox_put(mailbox,task);\r
381         \r
382         if(NULL != (cond = MSG_mailbox_get_cond(mailbox)))\r
383         {\r
384                 DEBUG0("Somebody is listening. Let's wake him up!");\r
385                 SIMIX_cond_signal(cond);\r
386         }\r
387 \r
388         \r
389         \r
390         SIMIX_mutex_unlock(remote_host->simdata->mutex);\r
391         \r
392         SIMIX_mutex_lock(task->simdata->mutex);\r
393 \r
394         process->simdata->waiting_task = task;\r
395         \r
396         if(timeout > 0) \r
397         {\r
398                 xbt_ex_t e;\r
399                 double time;\r
400                 double time_elapsed;\r
401                 time = SIMIX_get_clock();\r
402                 \r
403                 TRY \r
404                 {\r
405                         /*verify if the action that ends is the correct. Call the wait_timeout with the new time. If the timeout occurs, an exception is raised */\r
406                         while (1) \r
407                         {\r
408                                 time_elapsed = SIMIX_get_clock() - time;\r
409                                 SIMIX_cond_wait_timeout(task->simdata->cond, task->simdata->mutex,timeout - time_elapsed);\r
410                                 \r
411                                 if ((task->simdata->comm != NULL) && (SIMIX_action_get_state(task->simdata->comm) != SURF_ACTION_RUNNING))\r
412                                         break;\r
413                         }\r
414                 } \r
415                 CATCH(e) \r
416                 {\r
417                         if(e.category==timeout_error) \r
418                         {\r
419                                 xbt_ex_free(e);\r
420                                 /* verify if the timeout happened and the communication didn't started yet */\r
421                                 if (task->simdata->comm == NULL) \r
422                                 {\r
423                                         process->simdata->waiting_task = NULL;\r
424                                         \r
425                                         /* remove the task from the mailbox */\r
426                                         MSG_mailbox_remove(mailbox,task);\r
427                                         \r
428                                         if (task->simdata->receiver) \r
429                                         {\r
430                                                 task->simdata->receiver->simdata->waiting_task = NULL;\r
431                                         }\r
432                                         \r
433                                         task->simdata->sender = NULL;\r
434                                         \r
435                                         SIMIX_mutex_unlock(task->simdata->mutex);\r
436                                         MSG_RETURN(MSG_TRANSFER_FAILURE);\r
437                                 }\r
438                         } \r
439                         else \r
440                         {\r
441                                 RETHROW;\r
442                         }\r
443                 }\r
444         } \r
445         else \r
446         {\r
447                 while (1) \r
448                 {\r
449                         SIMIX_cond_wait(task->simdata->cond, task->simdata->mutex);\r
450                         \r
451                         if (SIMIX_action_get_state(task->simdata->comm) != SURF_ACTION_RUNNING)\r
452                                 break;\r
453                 }\r
454         }\r
455         \r
456         DEBUG1("Action terminated %s", task->name);\r
457         process->simdata->waiting_task = NULL;\r
458         \r
459         /* the task has already finished and the pointer must be null */\r
460         if (task->simdata->receiver) \r
461         {\r
462                 task->simdata->receiver->simdata->waiting_task = NULL;\r
463         }\r
464         \r
465         task->simdata->sender = NULL;\r
466         SIMIX_mutex_unlock(task->simdata->mutex);\r
467 \r
468         \r
469         if (SIMIX_action_get_state(task->simdata->comm) == SURF_ACTION_DONE)\r
470         {\r
471                 MSG_RETURN(MSG_OK);\r
472         } \r
473         else if (SIMIX_host_get_state(local_host->simdata->smx_host) == 0) \r
474         {\r
475                 MSG_RETURN(MSG_HOST_FAILURE);\r
476         } \r
477         else \r
478         {\r
479                 MSG_RETURN(MSG_TRANSFER_FAILURE);\r
480         }\r
481 }\r