Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Deprecating stupidly named functions...
[simgrid.git] / src / msg / gos.c
1 /*      $Id$     */
2
3 /* Copyright (c) 2002,2003,2004 Arnaud Legrand. All rights reserved.        */
4
5 /* This program is free software; you can redistribute it and/or modify it
6  * under the terms of the license (GNU LGPL) which comes with this package. */
7
8 #include "private.h"
9 #include "xbt/sysdep.h"
10 #include "xbt/log.h"
11 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(gos, msg,
12                                 "Logging specific to MSG (gos)");
13
14 /** \defgroup msg_gos_functions MSG Operating System Functions
15  *  \brief This section describes the functions that can be used
16  *  by an agent for handling some task.
17  */
18
19 /** \ingroup msg_gos_functions
20  * \brief Listen on a channel and wait for receiving a task.
21  *
22  * It takes two parameters.
23  * \param task a memory location for storing a #m_task_t. It will
24    hold a task when this function will return. Thus \a task should not
25    be equal to \c NULL and \a *task should be equal to \c NULL. If one of
26    those two condition does not hold, there will be a warning message.
27  * \param channel the channel on which the agent should be
28    listening. This value has to be >=0 and < than the maximal
29    number of channels fixed with MSG_set_channel_number().
30  * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
31  * if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
32  */
33 MSG_error_t MSG_task_get(m_task_t * task,
34                          m_channel_t channel)
35 {
36   return MSG_task_get_with_time_out(task, channel, -1);
37 }
38
39 /** \ingroup msg_gos_functions
40  * \brief Listen on a channel and wait for receiving a task with a timeout.
41  *
42  * It takes three parameters.
43  * \param task a memory location for storing a #m_task_t. It will
44    hold a task when this function will return. Thus \a task should not
45    be equal to \c NULL and \a *task should be equal to \c NULL. If one of
46    those two condition does not hold, there will be a warning message.
47  * \param channel the channel on which the agent should be
48    listening. This value has to be >=0 and < than the maximal
49    number of channels fixed with MSG_set_channel_number().
50  * \param max_duration the maximum time to wait for a task before giving
51     up. In such a case, \a task will not be modified and will still be
52     equal to \c NULL when returning.
53  * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
54    if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
55  */
56
57 MSG_error_t MSG_task_get_with_time_out(m_task_t * task,
58                                        m_channel_t channel,
59                                        double max_duration)
60 {
61   m_process_t process = MSG_process_self();
62   m_task_t t = NULL;
63   m_host_t h = NULL;
64   simdata_task_t t_simdata = NULL;
65   simdata_host_t h_simdata = NULL;
66   int first_time = 1;
67   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
68   
69   CHECK_HOST();
70   /* Sanity check */
71   xbt_assert0(task,"Null pointer for the task\n");
72
73   if (*task) 
74     CRITICAL0("MSG_task_get() was asked to write in a non empty task struct.");
75
76   /* Get the task */
77   h = MSG_host_self();
78   h_simdata = h->simdata;
79
80   DEBUG2("Waiting for a task on channel %d (%s)", channel,h->name);
81
82   while ((t = xbt_fifo_shift(h_simdata->mbox[channel])) == NULL) {
83     if(max_duration>0) {
84       if(!first_time) {
85         MSG_RETURN(MSG_OK);
86       }
87     }
88     xbt_assert2(!(h_simdata->sleeping[channel]),
89                 "A process (%s(%d)) is already blocked on this channel",
90                 h_simdata->sleeping[channel]->name,
91                 h_simdata->sleeping[channel]->simdata->PID);
92     h_simdata->sleeping[channel] = process; /* I'm waiting. Wake me up when you're ready */
93     if(max_duration>0) {
94       __MSG_process_block(max_duration);
95     } else {
96       __MSG_process_block(-1);
97     }
98     if(surf_workstation_resource->extension_public->get_state(h_simdata->host) 
99        == SURF_CPU_OFF)
100       MSG_RETURN(MSG_HOST_FAILURE);
101     h_simdata->sleeping[channel] = NULL;
102     first_time = 0;
103     /* OK, we should both be ready now. Are you there ? */
104   }
105
106   t_simdata = t->simdata;
107   /*   *task = __MSG_task_copy(t); */
108   *task=t;
109
110   /* Transfer */
111   t_simdata->using++;
112
113   t_simdata->comm = surf_workstation_resource->extension_public->
114     communicate(MSG_process_get_host(t_simdata->sender)->simdata->host,
115                 h->simdata->host, t_simdata->message_size,t_simdata->rate);
116   
117   surf_workstation_resource->common_public->action_set_data(t_simdata->comm,t);
118
119   if(__MSG_process_isBlocked(t_simdata->sender)) 
120     __MSG_process_unblock(t_simdata->sender);
121
122   PAJE_PROCESS_PUSH_STATE(process,"C");  
123
124   do {
125     __MSG_task_wait_event(process, t);
126     state=surf_workstation_resource->common_public->action_get_state(t_simdata->comm);
127   } while (state==SURF_ACTION_RUNNING);
128
129   if(t->simdata->using>1) {
130     xbt_fifo_unshift(msg_global->process_to_run,process);
131     xbt_context_yield();
132   }
133
134   PAJE_PROCESS_POP_STATE(process);
135   PAJE_COMM_STOP(process,t,channel);
136
137   if(state == SURF_ACTION_DONE) {
138     if(surf_workstation_resource->common_public->action_free(t_simdata->comm)) 
139       t_simdata->comm = NULL;
140     MSG_RETURN(MSG_OK);
141   } else if(surf_workstation_resource->extension_public->get_state(h_simdata->host) 
142           == SURF_CPU_OFF) {
143     if(surf_workstation_resource->common_public->action_free(t_simdata->comm)) 
144       t_simdata->comm = NULL;
145     MSG_RETURN(MSG_HOST_FAILURE);
146   } else {
147     if(surf_workstation_resource->common_public->action_free(t_simdata->comm)) 
148       t_simdata->comm = NULL;
149     MSG_RETURN(MSG_TRANSFER_FAILURE);
150   }
151 }
152
153 /** \ingroup msg_gos_functions
154  * \brief Test whether there is a pending communication on a channel.
155  *
156  * It takes one parameter.
157  * \param channel the channel on which the agent should be
158    listening. This value has to be >=0 and < than the maximal
159    number of channels fixed with MSG_set_channel_number().
160  * \return 1 if there is a pending communication and 0 otherwise
161  */
162 int MSG_task_Iprobe(m_channel_t channel)
163 {
164   m_host_t h = NULL;
165   simdata_host_t h_simdata = NULL;
166
167   DEBUG2("Probing on channel %d (%s)", channel,h->name);
168   CHECK_HOST();
169   h = MSG_host_self();
170   h_simdata = h->simdata;
171   return(xbt_fifo_get_first_item(h_simdata->mbox[channel])!=NULL);
172 }
173
174 /** \ingroup msg_gos_functions
175  * \brief Test whether there is a pending communication on a channel, and who sent it.
176  *
177  * It takes one parameter.
178  * \param channel the channel on which the agent should be
179    listening. This value has to be >=0 and < than the maximal
180    number of channels fixed with MSG_set_channel_number().
181  * \return -1 if there is no pending communication and the PID of the process who sent it otherwise
182  */
183 int MSG_task_probe_from(m_channel_t channel)
184 {
185   m_host_t h = NULL;
186   simdata_host_t h_simdata = NULL;
187   xbt_fifo_item_t item;
188   m_task_t t;
189
190   CHECK_HOST();
191   h = MSG_host_self();
192   h_simdata = h->simdata;
193
194   DEBUG2("Probing on channel %d (%s)", channel,h->name);
195    
196   item = xbt_fifo_get_first_item(h->simdata->mbox[channel]);
197   if (!item || !(t = xbt_fifo_get_item_content(item)))
198     return -1;
199    
200   return MSG_process_get_PID(t->simdata->sender);
201 }
202
203 MSG_error_t MSG_channel_select_from(m_channel_t channel, double max_duration,
204                                     int *PID)
205 {
206   m_host_t h = NULL;
207   simdata_host_t h_simdata = NULL;
208   xbt_fifo_item_t item;
209   m_task_t t;
210   int first_time = 1;
211   m_process_t process = MSG_process_self();
212
213   if(PID) {
214     *PID = -1;
215   }
216
217   if(max_duration==0.0) {
218     return MSG_task_probe_from(channel);
219   } else {
220     CHECK_HOST();
221     h = MSG_host_self();
222     h_simdata = h->simdata;
223     
224     DEBUG2("Probing on channel %d (%s)", channel,h->name);
225     while(!(item = xbt_fifo_get_first_item(h->simdata->mbox[channel]))) {
226       if(max_duration>0) {
227         if(!first_time) {
228           MSG_RETURN(MSG_OK);
229         }
230       }
231       xbt_assert2(!(h_simdata->sleeping[channel]),
232                   "A process (%s(%d)) is already blocked on this channel",
233                   h_simdata->sleeping[channel]->name,
234                   h_simdata->sleeping[channel]->simdata->PID);
235       h_simdata->sleeping[channel] = process; /* I'm waiting. Wake me up when you're ready */
236       if(max_duration>0) {
237         __MSG_process_block(max_duration);
238       } else {
239         __MSG_process_block(-1);
240       }
241       if(surf_workstation_resource->extension_public->get_state(h_simdata->host) 
242          == SURF_CPU_OFF) {
243         MSG_RETURN(MSG_HOST_FAILURE);
244       }
245       h_simdata->sleeping[channel] = NULL;
246       first_time = 0;
247     }
248     if (!item || !(t = xbt_fifo_get_item_content(item))) {
249       MSG_RETURN(MSG_OK);
250     }
251     if(PID) {
252       *PID = MSG_process_get_PID(t->simdata->sender);
253     }
254     MSG_RETURN(MSG_OK);
255   }
256 }
257 /** \ingroup msg_gos_functions
258  * \brief Put a task on a channel of an host and waits for the end of the
259  * transmission.
260  *
261  * This function is used for describing the behavior of an agent. It
262  * takes three parameter.
263  * \param task a #m_task_t to send on another location. This task
264    will not be usable anymore when the function will return. There is
265    no automatic task duplication and you have to save your parameters
266    before calling this function. Tasks are unique and once it has been
267    sent to another location, you should not access it anymore. You do
268    not need to call MSG_task_destroy() but to avoid using, as an
269    effect of inattention, this task anymore, you definitely should
270    renitialize it with #MSG_TASK_UNINITIALIZED. Note that this task
271    can be transfered iff it has been correctly created with
272    MSG_task_create().
273  * \param dest the destination of the message
274  * \param channel the channel on which the agent should put this
275    task. This value has to be >=0 and < than the maximal number of
276    channels fixed with MSG_set_channel_number().
277  * \return #MSG_FATAL if \a task is not properly initialized and
278  * #MSG_OK otherwise.
279  */
280 MSG_error_t MSG_task_put(m_task_t task,
281                          m_host_t dest, m_channel_t channel)
282 {
283   m_process_t process = MSG_process_self();
284   simdata_task_t task_simdata = NULL;
285   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
286   m_host_t local_host = NULL;
287   m_host_t remote_host = NULL;
288
289   CHECK_HOST();
290
291   task_simdata = task->simdata;
292   task_simdata->sender = process;
293   xbt_assert0(task_simdata->using==1,"Gargl!");
294   task_simdata->comm = NULL;
295   
296   local_host = ((simdata_process_t) process->simdata)->host;
297   remote_host = dest;
298
299   DEBUG4("Trying to send a task (%g Mb) from %s to %s on channel %d", 
300          task->simdata->message_size,local_host->name, remote_host->name, channel);
301
302   xbt_fifo_push(((simdata_host_t) remote_host->simdata)->
303                 mbox[channel], task);
304
305   PAJE_COMM_START(process,task,channel);
306     
307   if(remote_host->simdata->sleeping[channel]) 
308     __MSG_process_unblock(remote_host->simdata->sleeping[channel]);
309
310   process->simdata->put_host = dest;
311   process->simdata->put_channel = channel;
312   while(!(task_simdata->comm)) 
313     __MSG_process_block(-1);
314   surf_workstation_resource->common_public->action_use(task_simdata->comm);
315   process->simdata->put_host = NULL;
316   process->simdata->put_channel = -1;
317
318
319   PAJE_PROCESS_PUSH_STATE(process,"C");  
320
321   state=surf_workstation_resource->common_public->action_get_state(task_simdata->comm);
322   while (state==SURF_ACTION_RUNNING) {
323     __MSG_task_wait_event(process, task);
324     state=surf_workstation_resource->common_public->action_get_state(task_simdata->comm);
325   }
326     
327
328   PAJE_PROCESS_POP_STATE(process);  
329
330   if(state == SURF_ACTION_DONE) {
331     if(surf_workstation_resource->common_public->action_free(task_simdata->comm)) 
332       task_simdata->comm = NULL;
333     MSG_task_destroy(task);
334     MSG_RETURN(MSG_OK);
335   } else if(surf_workstation_resource->extension_public->get_state(local_host->simdata->host) 
336             == SURF_CPU_OFF) {
337     if(surf_workstation_resource->common_public->action_free(task_simdata->comm)) 
338       task_simdata->comm = NULL;
339     MSG_task_destroy(task);
340     MSG_RETURN(MSG_HOST_FAILURE);
341   } else { 
342     if(surf_workstation_resource->common_public->action_free(task_simdata->comm)) 
343       task_simdata->comm = NULL;
344     MSG_task_destroy(task);
345     MSG_RETURN(MSG_TRANSFER_FAILURE);
346   }
347 }
348
349 /** \ingroup msg_gos_functions
350  * \brief Does exactly the same as MSG_task_put but with a bounded transmition 
351  * rate.
352  *
353  * \sa MSG_task_put
354  */
355 MSG_error_t MSG_task_put_bounded(m_task_t task,
356                                  m_host_t dest, m_channel_t channel,
357                                  double max_rate)
358 {
359   MSG_error_t res = MSG_OK;
360   task->simdata->rate=max_rate;
361   res = MSG_task_put(task, dest, channel);
362   task->simdata->rate=-1.0;
363   return(res);
364 }
365
366 /** \ingroup msg_gos_functions
367  * \brief Executes a task and waits for its termination.
368  *
369  * This function is used for describing the behavior of an agent. It
370  * takes only one parameter.
371  * \param task a #m_task_t to execute on the location on which the
372    agent is running.
373  * \return #MSG_FATAL if \a task is not properly initialized and
374  * #MSG_OK otherwise.
375  */
376 MSG_error_t MSG_task_execute(m_task_t task)
377 {
378   m_process_t process = MSG_process_self();
379   MSG_error_t res;
380
381   DEBUG1("Computing on %s", process->simdata->host->name);
382
383   __MSG_task_execute(process, task);
384
385   PAJE_PROCESS_PUSH_STATE(process,"E");  
386   res = __MSG_wait_for_computation(process,task);
387   PAJE_PROCESS_POP_STATE(process);
388   return res;
389 }
390
391 void __MSG_task_execute(m_process_t process, m_task_t task)
392 {
393   simdata_task_t simdata = NULL;
394
395   CHECK_HOST();
396
397   simdata = task->simdata;
398
399   simdata->compute = surf_workstation_resource->extension_public->
400     execute(MSG_process_get_host(process)->simdata->host,
401             simdata->computation_amount);
402   surf_workstation_resource->common_public->
403     set_priority(simdata->compute, simdata->priority);
404
405   surf_workstation_resource->common_public->action_set_data(simdata->compute,task);
406 }
407
408 MSG_error_t __MSG_wait_for_computation(m_process_t process, m_task_t task)
409 {
410   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
411   simdata_task_t simdata = task->simdata;
412
413   simdata->using++;
414   do {
415     __MSG_task_wait_event(process, task);
416     state=surf_workstation_resource->common_public->action_get_state(simdata->compute);
417   } while (state==SURF_ACTION_RUNNING);
418   simdata->using--;
419     
420
421   if(state == SURF_ACTION_DONE) {
422     if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
423       simdata->compute = NULL;
424     simdata->computation_amount = 0.0;
425     MSG_RETURN(MSG_OK);
426   } else if(surf_workstation_resource->extension_public->
427             get_state(MSG_process_get_host(process)->simdata->host) 
428             == SURF_CPU_OFF) {
429     if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
430       simdata->compute = NULL;
431     MSG_RETURN(MSG_HOST_FAILURE);
432   } else {
433     if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
434       simdata->compute = NULL;
435     MSG_RETURN(MSG_TASK_CANCELLED);
436   }
437 }
438 /** \ingroup m_task_management
439  * \brief Creates a new #m_task_t (a parallel one....).
440  *
441  * A constructor for #m_task_t taking six arguments and returning the 
442    corresponding object.
443  * \param name a name for the object. It is for user-level information
444    and can be NULL.
445  * \param host_nb the number of hosts implied in the parallel task.
446  * \param host_list an array of #host_nb m_host_t.
447  * \param computation_amount an array of #host_nb
448    doubles. computation_amount[i] is the total number of operations
449    that have to be performed on host_list[i].
450  * \param communication_amount an array of #host_nb*#host_nb doubles.
451  * \param data a pointer to any data may want to attach to the new
452    object.  It is for user-level information and can be NULL. It can
453    be retrieved with the function \ref MSG_task_get_data.
454  * \see m_task_t
455  * \return The new corresponding object.
456  */
457 m_task_t MSG_parallel_task_create(const char *name, 
458                                   int host_nb,
459                                   const m_host_t *host_list,
460                                   double *computation_amount,
461                                   double *communication_amount,
462                                   void *data)
463 {
464   simdata_task_t simdata = xbt_new0(s_simdata_task_t,1);
465   m_task_t task = xbt_new0(s_m_task_t,1);
466   int i;
467
468   /* Task structure */
469   task->name = xbt_strdup(name);
470   task->simdata = simdata;
471   task->data = data;
472
473   /* Simulator Data */
474   simdata->sleeping = xbt_dynar_new(sizeof(m_process_t),NULL);
475   simdata->rate = -1.0;
476   simdata->using = 1;
477   simdata->sender = NULL;
478   simdata->host_nb = host_nb;
479   
480   simdata->host_list = xbt_new0(void *, host_nb);
481   simdata->comp_amount = computation_amount;
482   simdata->comm_amount = communication_amount;
483
484   for(i=0;i<host_nb;i++)
485     simdata->host_list[i] = host_list[i]->simdata->host;
486
487   return task;
488 }
489
490
491 static void __MSG_parallel_task_execute(m_process_t process, m_task_t task)
492 {
493   simdata_task_t simdata = NULL;
494
495   CHECK_HOST();
496
497   simdata = task->simdata;
498
499   xbt_assert0(simdata->host_nb,"This is not a parallel task. Go to hell.");
500
501   simdata->compute = surf_workstation_resource->extension_public->
502   execute_parallel_task(task->simdata->host_nb,
503                         task->simdata->host_list,
504                         task->simdata->comp_amount,
505                         task->simdata->comm_amount,
506                         1.0,
507                         -1.0);
508   if(simdata->compute)
509     surf_workstation_resource->common_public->action_set_data(simdata->compute,task);
510 }
511
512 MSG_error_t MSG_parallel_task_execute(m_task_t task)
513 {
514   m_process_t process = MSG_process_self();
515   MSG_error_t res;
516
517   DEBUG0("Computing on a tons of guys");
518   
519   __MSG_parallel_task_execute(process, task);
520
521   if(task->simdata->compute)
522     res = __MSG_wait_for_computation(process,task);
523   else 
524     res = MSG_OK;
525
526   return res;  
527 }
528
529
530 /** \ingroup msg_gos_functions
531  * \brief Sleep for the specified number of seconds
532  *
533  * Makes the current process sleep until \a time seconds have elapsed.
534  *
535  * \param nb_sec a number of second
536  */
537 MSG_error_t MSG_process_sleep(double nb_sec)
538 {
539   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
540   m_process_t process = MSG_process_self();
541   m_task_t dummy = NULL;
542   simdata_task_t simdata = NULL;
543
544   CHECK_HOST();
545   dummy = MSG_task_create("MSG_sleep", nb_sec, 0.0, NULL);
546   simdata = dummy->simdata;
547
548   simdata->compute = surf_workstation_resource->extension_public->
549     sleep(MSG_process_get_host(process)->simdata->host,
550             simdata->computation_amount);
551   surf_workstation_resource->common_public->action_set_data(simdata->compute,dummy);
552
553   
554   simdata->using++;
555   do {
556     __MSG_task_wait_event(process, dummy);
557     state=surf_workstation_resource->common_public->action_get_state(simdata->compute);
558   } while (state==SURF_ACTION_RUNNING);
559   simdata->using--;
560     
561   if(state == SURF_ACTION_DONE) {
562     if(surf_workstation_resource->extension_public->
563        get_state(MSG_process_get_host(process)->simdata->host) 
564        == SURF_CPU_OFF) {
565       if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
566         simdata->compute = NULL;
567       MSG_RETURN(MSG_HOST_FAILURE);
568     }
569     if(__MSG_process_isBlocked(process)) {
570       __MSG_process_unblock(MSG_process_self());
571     }
572     if(surf_workstation_resource->extension_public->
573        get_state(MSG_process_get_host(process)->simdata->host) 
574        == SURF_CPU_OFF) {
575       if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
576         simdata->compute = NULL;
577       MSG_RETURN(MSG_HOST_FAILURE);
578     }
579     if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
580       simdata->compute = NULL;
581     MSG_task_destroy(dummy);
582     MSG_RETURN(MSG_OK);
583   } else MSG_RETURN(MSG_HOST_FAILURE);
584 }
585
586 /** \ingroup msg_gos_functions
587  * \brief Return the number of MSG tasks currently running on a
588  * the host of the current running process.
589  */
590 static int MSG_get_msgload(void) 
591 {
592   m_process_t process;
593    
594   CHECK_HOST();
595   
596   xbt_assert0(0, "This function is still to be specified correctly (what do you mean by 'load', exactly?). In the meantime, please don't use it");
597   process = MSG_process_self();
598   return xbt_fifo_size(process->simdata->host->simdata->process_list);
599 }
600
601 /** \ingroup msg_gos_functions
602  *
603  * \brief Return the the last value returned by a MSG function (except
604  * MSG_get_errno...).
605  */
606 MSG_error_t MSG_get_errno(void)
607 {
608   return PROCESS_GET_ERRNO();
609 }