Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
msg_simix alpha. All functions implemented.
[simgrid.git] / src / msg_simix / msg_simix_gos.c
1 #include "msg_simix_private.h"
2 #include "xbt/sysdep.h"
3 #include "xbt/log.h"
4
5 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(msg_gos, msg, "Logging specific to MSG (gos)");
6
7 /** \defgroup msg_gos_functions MSG Operating System Functions
8  *  \brief This section describes the functions that can be used
9  *  by an agent for handling some task.
10  */
11
12 static MSG_error_t __MSG_task_get_with_time_out_from_host(m_task_t * task,
13                                                         m_channel_t channel,
14                                                         double max_duration,
15                                                         m_host_t host)
16 {
17
18   m_process_t process = MSG_process_self();
19   m_task_t t = NULL;
20   m_host_t h = NULL;
21   simdata_task_t t_simdata = NULL;
22   simdata_host_t h_simdata = NULL;
23   int first_time = 1;
24   xbt_fifo_item_t item = NULL;
25
26         smx_cond_t cond = NULL;                 //conditional wait if the task isn't on the channel yet
27
28   CHECK_HOST();
29   xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
30   /* Sanity check */
31   xbt_assert0(task,"Null pointer for the task\n");
32
33   if (*task) 
34     CRITICAL0("MSG_task_get() was asked to write in a non empty task struct.");
35
36   /* Get the task */
37   h = MSG_host_self();
38   h_simdata = h->simdata;
39
40   DEBUG2("Waiting for a task on channel %d (%s)", channel,h->name);
41
42         SIMIX_mutex_lock(h->simdata->mutex);
43   while (1) {
44                 if(xbt_fifo_size(h_simdata->mbox[channel])>0) {
45                         if(!host) {
46                                 t = xbt_fifo_shift(h_simdata->mbox[channel]);
47                                 break;
48                         } else {
49                                 xbt_fifo_foreach(h->simdata->mbox[channel],item,t,m_task_t) {
50                                         if(t->simdata->source==host) break;
51                                 }
52                                 if(item) {
53                                         xbt_fifo_remove_item(h->simdata->mbox[channel],item);
54                                         break;
55                                 } 
56                         }
57                 }
58                 
59                 if(max_duration>0) {
60                         if(!first_time) {
61                                 MSG_RETURN(MSG_TRANSFER_FAILURE);
62                         }
63                 }
64                 xbt_assert1(!(h_simdata->sleeping[channel]),"A process is already blocked on channel %d", channel);
65         
66                 cond = SIMIX_cond_init();
67                 h_simdata->sleeping[channel] = cond;
68                 if (max_duration > 0) {
69                         SIMIX_cond_wait_timeout(cond, h->simdata->mutex, max_duration);
70                 }
71                 else SIMIX_cond_wait(h_simdata->sleeping[channel],h->simdata->mutex);
72
73                 first_time = 0;
74         }
75         SIMIX_mutex_unlock(h->simdata->mutex);
76
77   DEBUG1("OK, got a task (%s)", t->name);
78         /* clean conditional */
79         if (cond) {
80                 SIMIX_cond_destroy(cond);
81                 h_simdata->sleeping[channel] = NULL;
82         }
83
84   t_simdata = t->simdata;
85   /*   *task = __MSG_task_copy(t); */
86   *task=t;
87
88         SIMIX_mutex_lock(t_simdata->mutex);
89
90   /* Transfer */
91   t_simdata->using++;
92         /* create SIMIX action to the communication */
93         t_simdata->comm = SIMIX_action_communicate(t_simdata->sender->simdata->host->simdata->host,
94                                                                                                                                                                                 process->simdata->host->simdata->host,t->name, t_simdata->message_size, t_simdata->rate); 
95                                                                                                                                                                                 /*
96         if (MSG_process_is_suspended(t_simdata->sender)) {
97                 SIMIX_set_priority(t_simdata->comm,0);
98                 t_simdata->comm = SIMIX_action_communicate(t_simdata->sender->simdata->host->simdata->host,
99                                                                                                                                                                                         process->simdata->host->simdata->host,t->name, t_simdata->message_size, t_simdata->rate); 
100         }
101                                                                                                                                                                                         */
102                 /* if the process is suspend, create the action but stop its execution, it will be restart when the sender process resume */
103         SIMIX_register_action_to_condition(t_simdata->comm, t_simdata->cond);
104         SIMIX_register_condition_to_action(t_simdata->comm, t_simdata->cond);
105         SIMIX_cond_wait(t_simdata->cond,t_simdata->mutex);
106
107         /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
108         t->simdata->comm = NULL;
109         t->simdata->compute = NULL;
110         SIMIX_mutex_unlock(t_simdata->mutex);
111
112         //MSG_task_destroy(t);
113
114         MSG_RETURN(MSG_OK);
115 }
116         
117 /** \ingroup msg_gos_functions
118  * \brief Listen on a channel and wait for receiving a task.
119  *
120  * It takes two parameters.
121  * \param task a memory location for storing a #m_task_t. It will
122    hold a task when this function will return. Thus \a task should not
123    be equal to \c NULL and \a *task should be equal to \c NULL. If one of
124    those two condition does not hold, there will be a warning message.
125  * \param channel the channel on which the agent should be
126    listening. This value has to be >=0 and < than the maximal
127    number of channels fixed with MSG_set_channel_number().
128  * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
129  * if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
130  */
131 MSG_error_t MSG_task_get(m_task_t * task,
132                          m_channel_t channel)
133 {
134   return MSG_task_get_with_time_out(task, channel, -1);
135 }
136
137 /** \ingroup msg_gos_functions
138  * \brief Listen on a channel and wait for receiving a task with a timeout.
139  *
140  * It takes three parameters.
141  * \param task a memory location for storing a #m_task_t. It will
142    hold a task when this function will return. Thus \a task should not
143    be equal to \c NULL and \a *task should be equal to \c NULL. If one of
144    those two condition does not hold, there will be a warning message.
145  * \param channel the channel on which the agent should be
146    listening. This value has to be >=0 and < than the maximal
147    number of channels fixed with MSG_set_channel_number().
148  * \param max_duration the maximum time to wait for a task before giving
149     up. In such a case, #MSG_TRANSFER_FAILURE will be returned, \a task 
150     will not be modified and will still be
151     equal to \c NULL when returning. 
152  * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
153    if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
154  */
155 MSG_error_t MSG_task_get_with_time_out(m_task_t * task,
156                                        m_channel_t channel,
157                                        double max_duration)
158 {
159   return __MSG_task_get_with_time_out_from_host(task, channel, max_duration, NULL);
160 }
161
162 /** \ingroup msg_gos_functions
163  * \brief Listen on \a channel and waits for receiving a task from \a host.
164  *
165  * It takes three parameters.
166  * \param task a memory location for storing a #m_task_t. It will
167    hold a task when this function will return. Thus \a task should not
168    be equal to \c NULL and \a *task should be equal to \c NULL. If one of
169    those two condition does not hold, there will be a warning message.
170  * \param channel the channel on which the agent should be
171    listening. This value has to be >=0 and < than the maximal
172    number of channels fixed with MSG_set_channel_number().
173  * \param host the host that is to be watched.
174  * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
175    if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
176  */
177 MSG_error_t MSG_task_get_from_host(m_task_t * task, int channel, 
178                                    m_host_t host)
179 {
180   return __MSG_task_get_with_time_out_from_host(task, channel, -1, host);
181 }
182
183 /** \ingroup msg_gos_functions
184  * \brief Test whether there is a pending communication on a channel.
185  *
186  * It takes one parameter.
187  * \param channel the channel on which the agent should be
188    listening. This value has to be >=0 and < than the maximal
189    number of channels fixed with MSG_set_channel_number().
190  * \return 1 if there is a pending communication and 0 otherwise
191  */
192 int MSG_task_Iprobe(m_channel_t channel)
193 {
194   m_host_t h = NULL;
195
196   xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
197   CHECK_HOST();
198
199   DEBUG2("Probing on channel %d (%s)", channel,h->name);
200
201   h = MSG_host_self();
202   return(xbt_fifo_get_first_item(h->simdata->mbox[channel])!=NULL);
203 }
204
205 /** \ingroup msg_gos_functions
206  * \brief Test whether there is a pending communication on a channel, and who sent it.
207  *
208  * It takes one parameter.
209  * \param channel the channel on which the agent should be
210    listening. This value has to be >=0 and < than the maximal
211    number of channels fixed with MSG_set_channel_number().
212  * \return -1 if there is no pending communication and the PID of the process who sent it otherwise
213  */
214 int MSG_task_probe_from(m_channel_t channel)
215 {
216   m_host_t h = NULL;
217   xbt_fifo_item_t item;
218   m_task_t t;
219
220   xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
221   CHECK_HOST();
222
223   h = MSG_host_self();
224
225   DEBUG2("Probing on channel %d (%s)", channel,h->name);
226    
227   item = xbt_fifo_get_first_item(h->simdata->mbox[channel]);
228   if ( (!item) || (!(t = xbt_fifo_get_item_content(item))) )
229     return -1;
230    
231   return MSG_process_get_PID(t->simdata->sender);
232 }
233
234 /** \ingroup msg_gos_functions
235  * \brief Wait for at most \a max_duration second for a task reception
236    on \a channel. *\a PID is updated with the PID of the first process
237    that triggered this event if any.
238  *
239  * It takes three parameters:
240  * \param channel the channel on which the agent should be
241    listening. This value has to be >=0 and < than the maximal.
242    number of channels fixed with MSG_set_channel_number().
243  * \param PID a memory location for storing an int.
244  * \param max_duration the maximum time to wait for a task before
245     giving up. In the case of a reception, *\a PID will be updated
246     with the PID of the first process to send a task.
247  * \return #MSG_HOST_FAILURE if the host is shut down in the meantime
248    and #MSG_OK otherwise.
249  */
250 MSG_error_t MSG_channel_select_from(m_channel_t channel, double max_duration,
251                 int *PID)
252 {
253         m_host_t h = NULL;
254         simdata_host_t h_simdata = NULL;
255         xbt_fifo_item_t item;
256         m_task_t t;
257         int first_time = 1;
258         smx_cond_t cond;
259
260         xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
261         if(PID) {
262                 *PID = -1;
263         }
264
265         if(max_duration==0.0) {
266                 *PID = MSG_task_probe_from(channel);
267                 MSG_RETURN(MSG_OK);
268         } else {
269                 CHECK_HOST();
270                 h = MSG_host_self();
271                 h_simdata = h->simdata;
272
273                 DEBUG2("Probing on channel %d (%s)", channel,h->name);
274                 while(!(item = xbt_fifo_get_first_item(h->simdata->mbox[channel]))) {
275                         if(max_duration>0) {
276                                 if(!first_time) {
277                                         MSG_RETURN(MSG_OK);
278                                 }
279                         }
280                         SIMIX_mutex_lock(h_simdata->mutex);
281                         xbt_assert1(!(h_simdata->sleeping[channel]),
282                                         "A process is already blocked on this channel %d", channel);
283                         cond = SIMIX_cond_init();
284                         h_simdata->sleeping[channel] = cond; /* I'm waiting. Wake me up when you're ready */
285                         if(max_duration>0) {
286                                 SIMIX_cond_wait_timeout(cond,h_simdata->mutex, max_duration);
287                         } else {
288                                 SIMIX_cond_wait(cond,h_simdata->mutex);
289                         }
290                         SIMIX_cond_destroy(cond);
291                         SIMIX_mutex_unlock(h_simdata->mutex);
292                         if(SIMIX_host_get_state(h_simdata->host)==0) {
293                                 MSG_RETURN(MSG_HOST_FAILURE);
294                         }
295                         h_simdata->sleeping[channel] = NULL;
296                         first_time = 0;
297                 }
298                 if (!item || !(t = xbt_fifo_get_item_content(item))) {
299                         MSG_RETURN(MSG_OK);
300                 }
301                 if(PID) {
302                         *PID = MSG_process_get_PID(t->simdata->sender);
303                 }
304                 MSG_RETURN(MSG_OK);
305         }
306 }
307
308
309 /** \ingroup msg_gos_functions
310
311  * \brief Return the number of tasks waiting to be received on a \a
312    channel and sent by \a host.
313  *
314  * It takes two parameters.
315  * \param channel the channel on which the agent should be
316    listening. This value has to be >=0 and < than the maximal
317    number of channels fixed with MSG_set_channel_number().
318  * \param host the host that is to be watched.
319  * \return the number of tasks waiting to be received on \a channel
320    and sent by \a host.
321  */
322 int MSG_task_probe_from_host(int channel, m_host_t host)
323 {
324         xbt_die("not implemented yet");
325         return 0;
326 }
327
328 /** \ingroup msg_gos_functions \brief Put a task on a channel of an
329  * host (with a timeout on the waiting of the destination host) and
330  * waits for the end of the transmission.
331  *
332  * This function is used for describing the behavior of an agent. It
333  * takes four parameter.
334  * \param task a #m_task_t to send on another location. This task
335    will not be usable anymore when the function will return. There is
336    no automatic task duplication and you have to save your parameters
337    before calling this function. Tasks are unique and once it has been
338    sent to another location, you should not access it anymore. You do
339    not need to call MSG_task_destroy() but to avoid using, as an
340    effect of inattention, this task anymore, you definitely should
341    renitialize it with #MSG_TASK_UNINITIALIZED. Note that this task
342    can be transfered iff it has been correctly created with
343    MSG_task_create().
344  * \param dest the destination of the message
345  * \param channel the channel on which the agent should put this
346    task. This value has to be >=0 and < than the maximal number of
347    channels fixed with MSG_set_channel_number().
348  * \param max_duration the maximum time to wait for a task before giving
349     up. In such a case, #MSG_TRANSFER_FAILURE will be returned, \a task 
350     will not be modified 
351  * \return #MSG_FATAL if \a task is not properly initialized and
352    #MSG_OK otherwise. Returns #MSG_HOST_FAILURE if the host on which
353    this function was called was shut down. Returns
354    #MSG_TRANSFER_FAILURE if the transfer could not be properly done
355    (network failure, dest failure, timeout...)
356  */
357 MSG_error_t MSG_task_put_with_timeout(m_task_t task, m_host_t dest, 
358                                       m_channel_t channel, double max_duration)
359 {
360
361
362   m_process_t process = MSG_process_self();
363   simdata_task_t task_simdata = NULL;
364   m_host_t local_host = NULL;
365   m_host_t remote_host = NULL;
366   CHECK_HOST();
367
368   xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
369
370   task_simdata = task->simdata;
371   task_simdata->sender = process;
372   task_simdata->source = MSG_process_get_host(process);
373   xbt_assert0(task_simdata->using==1,
374               "This taks is still being used somewhere else. You cannot send it now. Go fix your code!");
375   task_simdata->comm = NULL;
376   
377   local_host = ((simdata_process_t) process->simdata)->host;
378   remote_host = dest;
379
380   DEBUG4("Trying to send a task (%g kB) from %s to %s on channel %d", 
381          task->simdata->message_size/1000,local_host->name, remote_host->name, channel);
382
383         SIMIX_mutex_lock(remote_host->simdata->mutex);
384   xbt_fifo_push(((simdata_host_t) remote_host->simdata)->
385                 mbox[channel], task);
386
387   
388   if(remote_host->simdata->sleeping[channel]) {
389     DEBUG0("Somebody is listening. Let's wake him up!");
390                 SIMIX_cond_signal(remote_host->simdata->sleeping[channel]);
391   }
392         SIMIX_mutex_unlock(remote_host->simdata->mutex);
393
394   process->simdata->put_host = dest;
395   process->simdata->put_channel = channel;
396         SIMIX_mutex_lock(task->simdata->mutex);
397  // DEBUG4("Task sent (%g kB) from %s to %s on channel %d, waiting...", task->simdata->message_size/1000,local_host->name, remote_host->name, channel);
398         DEBUG0("Waiting action finish!");
399         if (max_duration >0) {
400                 SIMIX_cond_wait_timeout(task->simdata->cond,task->simdata->mutex,max_duration);
401         }
402         else {
403                 SIMIX_cond_wait(task->simdata->cond,task->simdata->mutex);
404         }
405         DEBUG1("Action terminated %s",task->name);    
406         task->simdata->using--;
407         SIMIX_mutex_unlock(task->simdata->mutex);
408
409
410         MSG_RETURN(MSG_OK);
411 }
412 /** \ingroup msg_gos_functions
413  * \brief Put a task on a channel of an host and waits for the end of the
414  * transmission.
415  *
416  * This function is used for describing the behavior of an agent. It
417  * takes three parameter.
418  * \param task a #m_task_t to send on another location. This task
419    will not be usable anymore when the function will return. There is
420    no automatic task duplication and you have to save your parameters
421    before calling this function. Tasks are unique and once it has been
422    sent to another location, you should not access it anymore. You do
423    not need to call MSG_task_destroy() but to avoid using, as an
424    effect of inattention, this task anymore, you definitely should
425    renitialize it with #MSG_TASK_UNINITIALIZED. Note that this task
426    can be transfered iff it has been correctly created with
427    MSG_task_create().
428  * \param dest the destination of the message
429  * \param channel the channel on which the agent should put this
430    task. This value has to be >=0 and < than the maximal number of
431    channels fixed with MSG_set_channel_number().
432  * \return #MSG_FATAL if \a task is not properly initialized and
433  * #MSG_OK otherwise. Returns #MSG_HOST_FAILURE if the host on which
434  * this function was called was shut down. Returns
435  * #MSG_TRANSFER_FAILURE if the transfer could not be properly done
436  * (network failure, dest failure)
437  */
438 MSG_error_t MSG_task_put(m_task_t task,
439                          m_host_t dest, m_channel_t channel)
440 {
441   return MSG_task_put_with_timeout(task, dest, channel, -1.0);
442 }
443
444 /** \ingroup msg_gos_functions
445  * \brief Does exactly the same as MSG_task_put but with a bounded transmition 
446  * rate.
447  *
448  * \sa MSG_task_put
449  */
450 MSG_error_t MSG_task_put_bounded(m_task_t task,
451                                  m_host_t dest, m_channel_t channel,
452                                  double max_rate)
453 {
454   MSG_error_t res = MSG_OK;
455   task->simdata->rate=max_rate;
456   res = MSG_task_put(task, dest, channel);
457   return(res);
458 }
459
460 /** \ingroup msg_gos_functions
461  * \brief Executes a task and waits for its termination.
462  *
463  * This function is used for describing the behavior of an agent. It
464  * takes only one parameter.
465  * \param task a #m_task_t to execute on the location on which the
466    agent is running.
467  * \return #MSG_FATAL if \a task is not properly initialized and
468  * #MSG_OK otherwise.
469  */
470 MSG_error_t MSG_task_execute(m_task_t task)
471 {
472         simdata_task_t simdata = NULL;
473
474   CHECK_HOST();
475
476   simdata = task->simdata;
477   xbt_assert0((!simdata->compute)&&(task->simdata->using==1),
478               "This taks is executed somewhere else. Go fix your code!");
479         
480         DEBUG1("Computing on %s", MSG_process_self()->simdata->host->name);
481   simdata->using++;
482         SIMIX_mutex_lock(simdata->mutex);
483   simdata->compute = SIMIX_action_execute(SIMIX_host_self(), task->name, simdata->computation_amount);
484         SIMIX_action_set_priority(simdata->compute, simdata->priority);
485
486         SIMIX_register_action_to_condition(simdata->compute, simdata->cond);
487         SIMIX_register_condition_to_action(simdata->compute, simdata->cond);
488         
489         SIMIX_cond_wait(simdata->cond, simdata->mutex);
490
491         /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
492         simdata->comm = NULL;
493         simdata->compute = NULL;
494
495         SIMIX_mutex_unlock(simdata->mutex);
496   simdata->using--;
497         MSG_RETURN(MSG_OK);
498 }
499
500
501 /** \ingroup m_task_management
502  * \brief Creates a new #m_task_t (a parallel one....).
503  *
504  * A constructor for #m_task_t taking six arguments and returning the 
505    corresponding object.
506  * \param name a name for the object. It is for user-level information
507    and can be NULL.
508  * \param host_nb the number of hosts implied in the parallel task.
509  * \param host_list an array of \p host_nb m_host_t.
510  * \param computation_amount an array of \p host_nb
511    doubles. computation_amount[i] is the total number of operations
512    that have to be performed on host_list[i].
513  * \param communication_amount an array of \p host_nb* \p host_nb doubles.
514  * \param data a pointer to any data may want to attach to the new
515    object.  It is for user-level information and can be NULL. It can
516    be retrieved with the function \ref MSG_task_get_data.
517  * \see m_task_t
518  * \return The new corresponding object.
519  */
520 m_task_t MSG_parallel_task_create(const char *name, 
521                                   int host_nb,
522                                   const m_host_t *host_list,
523                                   double *computation_amount,
524                                   double *communication_amount,
525                                   void *data)
526 {
527   m_task_t task = xbt_new0(s_m_task_t,1);
528         xbt_die("not implemented yet");
529   return task;
530 }
531
532
533 static void __MSG_parallel_task_execute(m_process_t process, m_task_t task)
534 {
535         return;
536 }
537
538 MSG_error_t MSG_parallel_task_execute(m_task_t task)
539 {
540
541         xbt_die("not implemented yet");
542   return MSG_OK;  
543 }
544
545
546 /** \ingroup msg_gos_functions
547  * \brief Sleep for the specified number of seconds
548  *
549  * Makes the current process sleep until \a time seconds have elapsed.
550  *
551  * \param nb_sec a number of second
552  */
553 MSG_error_t MSG_process_sleep(double nb_sec)
554 {
555         smx_action_t act_sleep;
556         m_process_t proc = MSG_process_self();
557         smx_mutex_t mutex;
558         smx_cond_t cond;
559         /* create action to sleep */
560         act_sleep = SIMIX_action_sleep(SIMIX_process_get_host(proc->simdata->smx_process),nb_sec);
561         
562         mutex = SIMIX_mutex_init();
563         SIMIX_mutex_lock(mutex);
564         /* create conditional and register action to it */
565         cond = SIMIX_cond_init();
566
567         SIMIX_register_condition_to_action(act_sleep, cond);
568         SIMIX_register_action_to_condition(act_sleep, cond);
569         SIMIX_cond_wait(cond,mutex);
570         SIMIX_mutex_unlock(mutex);
571
572         /* remove variables */
573         SIMIX_cond_destroy(cond);
574         SIMIX_mutex_destroy(mutex);
575
576         MSG_RETURN(MSG_OK);
577 }
578
579 /** \ingroup msg_gos_functions
580  * \brief Return the number of MSG tasks currently running on
581  * the host of the current running process.
582  */
583 static int MSG_get_msgload(void) 
584 {
585         xbt_die("not implemented yet");
586         return 0;
587 }
588
589 /** \ingroup msg_gos_functions
590  *
591  * \brief Return the last value returned by a MSG function (except
592  * MSG_get_errno...).
593  */
594 MSG_error_t MSG_get_errno(void)
595 {
596   return PROCESS_GET_ERRNO();
597 }