Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
ONGOING work on exceptions plus minor cleanups.
[simgrid.git] / src / msg / gos.c
1 /*      $Id$     */
2
3 /* Copyright (c) 2002,2003,2004 Arnaud Legrand. All rights reserved.        */
4
5 /* This program is free software; you can redistribute it and/or modify it
6  * under the terms of the license (GNU LGPL) which comes with this package. */
7
8 #include"private.h"
9 #include"xbt/sysdep.h"
10 #include "xbt/error.h"
11 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(gos, msg,
12                                 "Logging specific to MSG (gos)");
13
14 /** \defgroup msg_gos_functions MSG Operating System Functions
15  *  \brief This section describes the functions that can be used
16  *  by an agent for handling some task.
17  */
18
19 /* \ingroup msg_gos_functions
20  * \brief This function is now deprecated and useless. Please stop using it.
21  */
22 MSG_error_t MSG_process_start(m_process_t process)
23 {
24   xbt_assert0(0,"This function is now deprecated and useless. Please stop using it.");
25   
26   return MSG_OK;
27 }
28
29 /** \ingroup msg_gos_functions
30  * \brief Listen on a channel and wait for receiving a task.
31  *
32  * It takes two parameters.
33  * \param task a memory location for storing a #m_task_t. It will
34    hold a task when this function will return. Thus \a task should not
35    be equal to \c NULL and \a *task should be equal to \c NULL. If one of
36    those two condition does not hold, there will be a warning message.
37  * \param channel the channel on which the agent should be
38    listening. This value has to be >=0 and < than the maximal
39    number of channels fixed with MSG_set_channel_number().
40  * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
41  * if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
42  */
43 MSG_error_t MSG_task_get(m_task_t * task,
44                          m_channel_t channel)
45 {
46   return MSG_task_get_with_time_out(task, channel, -1);
47 }
48
49 /** \ingroup msg_gos_functions
50  * \brief Listen on a channel and wait for receiving a task with a timeout.
51  *
52  * It takes three parameters.
53  * \param task a memory location for storing a #m_task_t. It will
54    hold a task when this function will return. Thus \a task should not
55    be equal to \c NULL and \a *task should be equal to \c NULL. If one of
56    those two condition does not hold, there will be a warning message.
57  * \param channel the channel on which the agent should be
58    listening. This value has to be >=0 and < than the maximal
59    number of channels fixed with MSG_set_channel_number().
60  * \param max_duration the maximum time to wait for a task before giving
61     up. In such a case, \a task will not be modified and will still be
62     equal to \c NULL when returning.
63  * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
64    if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
65  */
66
67 MSG_error_t MSG_task_get_with_time_out(m_task_t * task,
68                                        m_channel_t channel,
69                                        double max_duration)
70 {
71   m_process_t process = MSG_process_self();
72   m_task_t t = NULL;
73   m_host_t h = NULL;
74   simdata_task_t t_simdata = NULL;
75   simdata_host_t h_simdata = NULL;
76   int first_time = 1;
77   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
78   
79   CHECK_HOST();
80   /* Sanity check */
81   xbt_assert0(task,"Null pointer for the task\n");
82
83   if (*task) 
84     CRITICAL0("MSG_task_get() was asked to write in a non empty task struct.");
85
86   /* Get the task */
87   h = MSG_host_self();
88   h_simdata = h->simdata;
89
90   DEBUG2("Waiting for a task on channel %d (%s)", channel,h->name);
91
92   while ((t = xbt_fifo_shift(h_simdata->mbox[channel])) == NULL) {
93     if(max_duration>0) {
94       if(!first_time) {
95         MSG_RETURN(MSG_OK);
96       }
97     }
98     xbt_assert2(!(h_simdata->sleeping[channel]),
99                 "A process (%s(%d)) is already blocked on this channel",
100                 h_simdata->sleeping[channel]->name,
101                 h_simdata->sleeping[channel]->simdata->PID);
102     h_simdata->sleeping[channel] = process; /* I'm waiting. Wake me up when you're ready */
103     if(max_duration>0) {
104       __MSG_process_block(max_duration);
105     } else {
106       __MSG_process_block(-1);
107     }
108     if(surf_workstation_resource->extension_public->get_state(h_simdata->host) 
109        == SURF_CPU_OFF)
110       MSG_RETURN(MSG_HOST_FAILURE);
111     h_simdata->sleeping[channel] = NULL;
112     first_time = 0;
113     /* OK, we should both be ready now. Are you there ? */
114   }
115
116   t_simdata = t->simdata;
117   /*   *task = __MSG_task_copy(t); */
118   *task=t;
119
120   /* Transfer */
121   t_simdata->using++;
122
123   t_simdata->comm = surf_workstation_resource->extension_public->
124     communicate(MSG_process_get_host(t_simdata->sender)->simdata->host,
125                 h->simdata->host, t_simdata->message_size,t_simdata->rate);
126   
127   surf_workstation_resource->common_public->action_set_data(t_simdata->comm,t);
128
129   if(__MSG_process_isBlocked(t_simdata->sender)) 
130     __MSG_process_unblock(t_simdata->sender);
131
132   PAJE_PROCESS_PUSH_STATE(process,"C");  
133
134   do {
135     __MSG_task_wait_event(process, t);
136     state=surf_workstation_resource->common_public->action_get_state(t_simdata->comm);
137   } while (state==SURF_ACTION_RUNNING);
138
139   if(t->simdata->using>1) {
140     xbt_fifo_unshift(msg_global->process_to_run,process);
141     xbt_context_yield();
142   }
143
144   PAJE_PROCESS_POP_STATE(process);
145   PAJE_COMM_STOP(process,t,channel);
146
147   if(state == SURF_ACTION_DONE) {
148     if(surf_workstation_resource->common_public->action_free(t_simdata->comm)) 
149       t_simdata->comm = NULL;
150     MSG_RETURN(MSG_OK);
151   } else if(surf_workstation_resource->extension_public->get_state(h_simdata->host) 
152           == SURF_CPU_OFF) {
153     if(surf_workstation_resource->common_public->action_free(t_simdata->comm)) 
154       t_simdata->comm = NULL;
155     MSG_RETURN(MSG_HOST_FAILURE);
156   } else {
157     if(surf_workstation_resource->common_public->action_free(t_simdata->comm)) 
158       t_simdata->comm = NULL;
159     MSG_RETURN(MSG_TRANSFER_FAILURE);
160   }
161 }
162
163 /** \ingroup msg_gos_functions
164  * \brief Test whether there is a pending communication on a channel.
165  *
166  * It takes one parameter.
167  * \param channel the channel on which the agent should be
168    listening. This value has to be >=0 and < than the maximal
169    number of channels fixed with MSG_set_channel_number().
170  * \return 1 if there is a pending communication and 0 otherwise
171  */
172 int MSG_task_Iprobe(m_channel_t channel)
173 {
174   m_host_t h = NULL;
175   simdata_host_t h_simdata = NULL;
176
177   DEBUG2("Probing on channel %d (%s)", channel,h->name);
178   CHECK_HOST();
179   h = MSG_host_self();
180   h_simdata = h->simdata;
181   return(xbt_fifo_getFirstItem(h_simdata->mbox[channel])!=NULL);
182 }
183
184 /** \ingroup msg_gos_functions
185  * \brief Test whether there is a pending communication on a channel, and who sent it.
186  *
187  * It takes one parameter.
188  * \param channel the channel on which the agent should be
189    listening. This value has to be >=0 and < than the maximal
190    number of channels fixed with MSG_set_channel_number().
191  * \return -1 if there is no pending communication and the PID of the process who sent it otherwise
192  */
193 int MSG_task_probe_from(m_channel_t channel)
194 {
195   m_host_t h = NULL;
196   simdata_host_t h_simdata = NULL;
197   xbt_fifo_item_t item;
198   m_task_t t;
199
200   CHECK_HOST();
201   h = MSG_host_self();
202   h_simdata = h->simdata;
203
204   DEBUG2("Probing on channel %d (%s)", channel,h->name);
205    
206   item = xbt_fifo_getFirstItem(h->simdata->mbox[channel]);
207   if (!item || !(t = xbt_fifo_get_item_content(item)))
208     return -1;
209    
210   return MSG_process_get_PID(t->simdata->sender);
211 }
212
213 MSG_error_t MSG_channel_select_from(m_channel_t channel, double max_duration,
214                                     int *PID)
215 {
216   m_host_t h = NULL;
217   simdata_host_t h_simdata = NULL;
218   xbt_fifo_item_t item;
219   m_task_t t;
220   int first_time = 1;
221   m_process_t process = MSG_process_self();
222
223   if(PID) {
224     *PID = -1;
225   }
226
227   if(max_duration==0.0) {
228     return MSG_task_probe_from(channel);
229   } else {
230     CHECK_HOST();
231     h = MSG_host_self();
232     h_simdata = h->simdata;
233     
234     DEBUG2("Probing on channel %d (%s)", channel,h->name);
235     while(!(item = xbt_fifo_getFirstItem(h->simdata->mbox[channel]))) {
236       if(max_duration>0) {
237         if(!first_time) {
238           MSG_RETURN(MSG_OK);
239         }
240       }
241       xbt_assert2(!(h_simdata->sleeping[channel]),
242                   "A process (%s(%d)) is already blocked on this channel",
243                   h_simdata->sleeping[channel]->name,
244                   h_simdata->sleeping[channel]->simdata->PID);
245       h_simdata->sleeping[channel] = process; /* I'm waiting. Wake me up when you're ready */
246       if(max_duration>0) {
247         __MSG_process_block(max_duration);
248       } else {
249         __MSG_process_block(-1);
250       }
251       if(surf_workstation_resource->extension_public->get_state(h_simdata->host) 
252          == SURF_CPU_OFF) {
253         MSG_RETURN(MSG_HOST_FAILURE);
254       }
255       h_simdata->sleeping[channel] = NULL;
256       first_time = 0;
257     }
258     if (!item || !(t = xbt_fifo_get_item_content(item))) {
259       MSG_RETURN(MSG_OK);
260     }
261     if(PID) {
262       *PID = MSG_process_get_PID(t->simdata->sender);
263     }
264     MSG_RETURN(MSG_OK);
265   }
266 }
267 /** \ingroup msg_gos_functions
268  * \brief Put a task on a channel of an host and waits for the end of the
269  * transmission.
270  *
271  * This function is used for describing the behavior of an agent. It
272  * takes three parameter.
273  * \param task a #m_task_t to send on another location. This task
274    will not be usable anymore when the function will return. There is
275    no automatic task duplication and you have to save your parameters
276    before calling this function. Tasks are unique and once it has been
277    sent to another location, you should not access it anymore. You do
278    not need to call MSG_task_destroy() but to avoid using, as an
279    effect of inattention, this task anymore, you definitely should
280    renitialize it with #MSG_TASK_UNINITIALIZED. Note that this task
281    can be transfered iff it has been correctly created with
282    MSG_task_create().
283  * \param dest the destination of the message
284  * \param channel the channel on which the agent should put this
285    task. This value has to be >=0 and < than the maximal number of
286    channels fixed with MSG_set_channel_number().
287  * \return #MSG_FATAL if \a task is not properly initialized and
288  * #MSG_OK otherwise.
289  */
290 MSG_error_t MSG_task_put(m_task_t task,
291                          m_host_t dest, m_channel_t channel)
292 {
293   m_process_t process = MSG_process_self();
294   simdata_task_t task_simdata = NULL;
295   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
296   m_host_t local_host = NULL;
297   m_host_t remote_host = NULL;
298
299   CHECK_HOST();
300
301   task_simdata = task->simdata;
302   task_simdata->sender = process;
303   xbt_assert0(task_simdata->using==1,"Gargl!");
304   task_simdata->comm = NULL;
305   
306   local_host = ((simdata_process_t) process->simdata)->host;
307   remote_host = dest;
308
309   DEBUG4("Trying to send a task (%g Mb) from %s to %s on channel %d", 
310          task->simdata->message_size,local_host->name, remote_host->name, channel);
311
312   xbt_fifo_push(((simdata_host_t) remote_host->simdata)->
313                 mbox[channel], task);
314
315   PAJE_COMM_START(process,task,channel);
316     
317   if(remote_host->simdata->sleeping[channel]) 
318     __MSG_process_unblock(remote_host->simdata->sleeping[channel]);
319
320   process->simdata->put_host = dest;
321   process->simdata->put_channel = channel;
322   while(!(task_simdata->comm)) 
323     __MSG_process_block(-1);
324   surf_workstation_resource->common_public->action_use(task_simdata->comm);
325   process->simdata->put_host = NULL;
326   process->simdata->put_channel = -1;
327
328
329   PAJE_PROCESS_PUSH_STATE(process,"C");  
330
331   state=surf_workstation_resource->common_public->action_get_state(task_simdata->comm);
332   while (state==SURF_ACTION_RUNNING) {
333     __MSG_task_wait_event(process, task);
334     state=surf_workstation_resource->common_public->action_get_state(task_simdata->comm);
335   }
336     
337
338   PAJE_PROCESS_POP_STATE(process);  
339
340   if(state == SURF_ACTION_DONE) {
341     if(surf_workstation_resource->common_public->action_free(task_simdata->comm)) 
342       task_simdata->comm = NULL;
343     MSG_task_destroy(task);
344     MSG_RETURN(MSG_OK);
345   } else if(surf_workstation_resource->extension_public->get_state(local_host->simdata->host) 
346             == SURF_CPU_OFF) {
347     if(surf_workstation_resource->common_public->action_free(task_simdata->comm)) 
348       task_simdata->comm = NULL;
349     MSG_task_destroy(task);
350     MSG_RETURN(MSG_HOST_FAILURE);
351   } else { 
352     if(surf_workstation_resource->common_public->action_free(task_simdata->comm)) 
353       task_simdata->comm = NULL;
354     MSG_task_destroy(task);
355     MSG_RETURN(MSG_TRANSFER_FAILURE);
356   }
357 }
358
359 /** \ingroup msg_gos_functions
360  * \brief Does exactly the same as MSG_task_put but with a bounded transmition 
361  * rate.
362  *
363  * \sa MSG_task_put
364  */
365 MSG_error_t MSG_task_put_bounded(m_task_t task,
366                                  m_host_t dest, m_channel_t channel,
367                                  double max_rate)
368 {
369   MSG_error_t res = MSG_OK;
370   task->simdata->rate=max_rate;
371   res = MSG_task_put(task, dest, channel);
372   task->simdata->rate=-1.0;
373   return(res);
374 }
375
376 /** \ingroup msg_gos_functions
377  * \brief Executes a task and waits for its termination.
378  *
379  * This function is used for describing the behavior of an agent. It
380  * takes only one parameter.
381  * \param task a #m_task_t to execute on the location on which the
382    agent is running.
383  * \return #MSG_FATAL if \a task is not properly initialized and
384  * #MSG_OK otherwise.
385  */
386 MSG_error_t MSG_task_execute(m_task_t task)
387 {
388   m_process_t process = MSG_process_self();
389   MSG_error_t res;
390
391   DEBUG1("Computing on %s", process->simdata->host->name);
392
393   __MSG_task_execute(process, task);
394
395   PAJE_PROCESS_PUSH_STATE(process,"E");  
396   res = __MSG_wait_for_computation(process,task);
397   PAJE_PROCESS_POP_STATE(process);
398   return res;
399 }
400
401 void __MSG_task_execute(m_process_t process, m_task_t task)
402 {
403   simdata_task_t simdata = NULL;
404
405   CHECK_HOST();
406
407   simdata = task->simdata;
408
409   simdata->compute = surf_workstation_resource->extension_public->
410     execute(MSG_process_get_host(process)->simdata->host,
411             simdata->computation_amount);
412   surf_workstation_resource->common_public->action_set_data(simdata->compute,task);
413 }
414
415 MSG_error_t __MSG_wait_for_computation(m_process_t process, m_task_t task)
416 {
417   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
418   simdata_task_t simdata = task->simdata;
419
420   simdata->using++;
421   do {
422     __MSG_task_wait_event(process, task);
423     state=surf_workstation_resource->common_public->action_get_state(simdata->compute);
424   } while (state==SURF_ACTION_RUNNING);
425   simdata->using--;
426     
427
428   if(state == SURF_ACTION_DONE) {
429     if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
430       simdata->compute = NULL;
431     simdata->computation_amount = 0.0;
432     MSG_RETURN(MSG_OK);
433   } else if(surf_workstation_resource->extension_public->
434             get_state(MSG_process_get_host(process)->simdata->host) 
435             == SURF_CPU_OFF) {
436     if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
437       simdata->compute = NULL;
438     MSG_RETURN(MSG_HOST_FAILURE);
439   } else {
440     if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
441       simdata->compute = NULL;
442     MSG_RETURN(MSG_TASK_CANCELLED);
443   }
444 }
445 /** \ingroup m_task_management
446  * \brief Creates a new #m_task_t (a parallel one....).
447  *
448  * A constructor for #m_task_t taking six arguments and returning the 
449    corresponding object.
450  * \param name a name for the object. It is for user-level information
451    and can be NULL.
452  * \param host_nb the number of hosts implied in the parallel task.
453  * \param host_list an array of #host_nb m_host_t.
454  * \param computation_amount an array of #host_nb
455    doubles. computation_amount[i] is the total number of operations
456    that have to be performed on host_list[i].
457  * \param communication_amount an array of #host_nb*#host_nb doubles.
458  * \param data a pointer to any data may want to attach to the new
459    object.  It is for user-level information and can be NULL. It can
460    be retrieved with the function \ref MSG_task_get_data.
461  * \see m_task_t
462  * \return The new corresponding object.
463  */
464 m_task_t MSG_parallel_task_create(const char *name, 
465                                   int host_nb,
466                                   const m_host_t *host_list,
467                                   double *computation_amount,
468                                   double *communication_amount,
469                                   void *data)
470 {
471   simdata_task_t simdata = xbt_new0(s_simdata_task_t,1);
472   m_task_t task = xbt_new0(s_m_task_t,1);
473   int i;
474
475   /* Task structure */
476   task->name = xbt_strdup(name);
477   task->simdata = simdata;
478   task->data = data;
479
480   /* Simulator Data */
481   simdata->sleeping = xbt_dynar_new(sizeof(m_process_t),NULL);
482   simdata->rate = -1.0;
483   simdata->using = 1;
484   simdata->sender = NULL;
485   simdata->host_nb = host_nb;
486   
487   simdata->host_list = xbt_new0(void *, host_nb);
488   simdata->comp_amount = computation_amount;
489   simdata->comm_amount = communication_amount;
490
491   for(i=0;i<host_nb;i++)
492     simdata->host_list[i] = host_list[i]->simdata->host;
493
494   return task;
495 }
496
497
498 static void __MSG_parallel_task_execute(m_process_t process, m_task_t task)
499 {
500   simdata_task_t simdata = NULL;
501
502   CHECK_HOST();
503
504   simdata = task->simdata;
505
506   xbt_assert0(simdata->host_nb,"This is not a parallel task. Go to hell.");
507
508   simdata->compute = surf_workstation_resource->extension_public->
509   execute_parallel_task(task->simdata->host_nb,
510                         task->simdata->host_list,
511                         task->simdata->comp_amount,
512                         task->simdata->comm_amount,
513                         1.0,
514                         -1.0);
515   if(simdata->compute)
516     surf_workstation_resource->common_public->action_set_data(simdata->compute,task);
517 }
518
519 MSG_error_t MSG_parallel_task_execute(m_task_t task)
520 {
521   m_process_t process = MSG_process_self();
522   MSG_error_t res;
523
524   DEBUG0("Computing on a tons of guys");
525   
526   __MSG_parallel_task_execute(process, task);
527
528   if(task->simdata->compute)
529     res = __MSG_wait_for_computation(process,task);
530   else 
531     res = MSG_OK;
532
533   return res;  
534 }
535
536
537 /** \ingroup msg_gos_functions
538  * \brief Sleep for the specified number of seconds
539  *
540  * Makes the current process sleep until \a time seconds have elapsed.
541  *
542  * \param nb_sec a number of second
543  */
544 MSG_error_t MSG_process_sleep(double nb_sec)
545 {
546   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
547   m_process_t process = MSG_process_self();
548   m_task_t dummy = NULL;
549   simdata_task_t simdata = NULL;
550
551   CHECK_HOST();
552   dummy = MSG_task_create("MSG_sleep", nb_sec, 0.0, NULL);
553   simdata = dummy->simdata;
554
555   simdata->compute = surf_workstation_resource->extension_public->
556     sleep(MSG_process_get_host(process)->simdata->host,
557             simdata->computation_amount);
558   surf_workstation_resource->common_public->action_set_data(simdata->compute,dummy);
559
560   
561   simdata->using++;
562   do {
563     __MSG_task_wait_event(process, dummy);
564     state=surf_workstation_resource->common_public->action_get_state(simdata->compute);
565   } while (state==SURF_ACTION_RUNNING);
566   simdata->using--;
567     
568   if(state == SURF_ACTION_DONE) {
569     if(surf_workstation_resource->extension_public->
570        get_state(MSG_process_get_host(process)->simdata->host) 
571        == SURF_CPU_OFF) {
572       if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
573         simdata->compute = NULL;
574       MSG_RETURN(MSG_HOST_FAILURE);
575     }
576     if(__MSG_process_isBlocked(process)) {
577       __MSG_process_unblock(MSG_process_self());
578     }
579     if(surf_workstation_resource->extension_public->
580        get_state(MSG_process_get_host(process)->simdata->host) 
581        == SURF_CPU_OFF) {
582       if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
583         simdata->compute = NULL;
584       MSG_RETURN(MSG_HOST_FAILURE);
585     }
586     if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
587       simdata->compute = NULL;
588     MSG_task_destroy(dummy);
589     MSG_RETURN(MSG_OK);
590   } else MSG_RETURN(MSG_HOST_FAILURE);
591 }
592
593 /** \ingroup msg_gos_functions
594  * \brief Return the number of MSG tasks currently running on a
595  * the host of the current running process.
596  */
597 static int MSG_get_msgload(void) 
598 {
599   m_process_t process;
600    
601   CHECK_HOST();
602   
603   xbt_assert0(0, "This function is still to be specified correctly (what do you mean by 'load', exactly?). In the meantime, please don't use it");
604   process = MSG_process_self();
605   return xbt_fifo_size(process->simdata->host->simdata->process_list);
606 }
607
608 /** \ingroup msg_gos_functions
609  *
610  * \brief Return the the last value returned by a MSG function (except
611  * MSG_get_errno...).
612  */
613 MSG_error_t MSG_get_errno(void)
614 {
615   return PROCESS_GET_ERRNO();
616 }