Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
The buf plugin is dead on SG. Thanks to the verbosity of simulated exceptions for...
[simgrid.git] / src / msg / gos.c
1 /*      $Id$     */
2
3 /* Copyright (c) 2002,2003,2004 Arnaud Legrand. All rights reserved.        */
4
5 /* This program is free software; you can redistribute it and/or modify it
6  * under the terms of the license (GNU LGPL) which comes with this package. */
7
8 #include "private.h"
9 #include "xbt/sysdep.h"
10 #include "xbt/log.h"
11 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(gos, msg,
12                                 "Logging specific to MSG (gos)");
13
14 /** \defgroup msg_gos_functions MSG Operating System Functions
15  *  \brief This section describes the functions that can be used
16  *  by an agent for handling some task.
17  */
18
19 /** \ingroup msg_gos_functions
20  * \brief Listen on a channel and wait for receiving a task.
21  *
22  * It takes two parameters.
23  * \param task a memory location for storing a #m_task_t. It will
24    hold a task when this function will return. Thus \a task should not
25    be equal to \c NULL and \a *task should be equal to \c NULL. If one of
26    those two condition does not hold, there will be a warning message.
27  * \param channel the channel on which the agent should be
28    listening. This value has to be >=0 and < than the maximal
29    number of channels fixed with MSG_set_channel_number().
30  * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
31  * if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
32  */
33 MSG_error_t MSG_task_get(m_task_t * task,
34                          m_channel_t channel)
35 {
36   return MSG_task_get_with_time_out(task, channel, -1);
37 }
38
39 /** \ingroup msg_gos_functions
40  * \brief Listen on a channel and wait for receiving a task with a timeout.
41  *
42  * It takes three parameters.
43  * \param task a memory location for storing a #m_task_t. It will
44    hold a task when this function will return. Thus \a task should not
45    be equal to \c NULL and \a *task should be equal to \c NULL. If one of
46    those two condition does not hold, there will be a warning message.
47  * \param channel the channel on which the agent should be
48    listening. This value has to be >=0 and < than the maximal
49    number of channels fixed with MSG_set_channel_number().
50  * \param max_duration the maximum time to wait for a task before giving
51     up. In such a case, \a task will not be modified and will still be
52     equal to \c NULL when returning.
53  * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
54    if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
55  */
56
57 MSG_error_t MSG_task_get_with_time_out(m_task_t * task,
58                                        m_channel_t channel,
59                                        double max_duration)
60 {
61   m_process_t process = MSG_process_self();
62   m_task_t t = NULL;
63   m_host_t h = NULL;
64   simdata_task_t t_simdata = NULL;
65   simdata_host_t h_simdata = NULL;
66   int first_time = 1;
67   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
68   
69   CHECK_HOST();
70   xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
71   /* Sanity check */
72   xbt_assert0(task,"Null pointer for the task\n");
73
74   if (*task) 
75     CRITICAL0("MSG_task_get() was asked to write in a non empty task struct.");
76
77   /* Get the task */
78   h = MSG_host_self();
79   h_simdata = h->simdata;
80
81   DEBUG2("Waiting for a task on channel %d (%s)", channel,h->name);
82
83   while ((t = xbt_fifo_shift(h_simdata->mbox[channel])) == NULL) {
84     if(max_duration>0) {
85       if(!first_time) {
86         MSG_RETURN(MSG_OK);
87       }
88     }
89     xbt_assert2(!(h_simdata->sleeping[channel]),
90                 "A process (%s(%d)) is already blocked on this channel",
91                 h_simdata->sleeping[channel]->name,
92                 h_simdata->sleeping[channel]->simdata->PID);
93     h_simdata->sleeping[channel] = process; /* I'm waiting. Wake me up when you're ready */
94     if(max_duration>0) {
95       __MSG_process_block(max_duration);
96     } else {
97       __MSG_process_block(-1);
98     }
99     if(surf_workstation_resource->extension_public->get_state(h_simdata->host) 
100        == SURF_CPU_OFF)
101       MSG_RETURN(MSG_HOST_FAILURE);
102     h_simdata->sleeping[channel] = NULL;
103     first_time = 0;
104     /* OK, we should both be ready now. Are you there ? */
105   }
106
107   DEBUG1("OK, got a task (%s)", t->name);
108
109   t_simdata = t->simdata;
110   /*   *task = __MSG_task_copy(t); */
111   *task=t;
112
113   /* Transfer */
114   t_simdata->using++;
115
116   DEBUG0("Calling SURF for communication creation");
117   t_simdata->comm = surf_workstation_resource->extension_public->
118     communicate(MSG_process_get_host(t_simdata->sender)->simdata->host,
119                 h->simdata->host, t_simdata->message_size,t_simdata->rate);
120   
121   surf_workstation_resource->common_public->action_set_data(t_simdata->comm,t);
122
123   if(__MSG_process_isBlocked(t_simdata->sender))
124     __MSG_process_unblock(t_simdata->sender);
125
126   PAJE_PROCESS_PUSH_STATE(process,"C");  
127
128   do {
129     DEBUG0("Waiting for action termination");
130     __MSG_task_wait_event(process, t);
131     state=surf_workstation_resource->common_public->action_get_state(t_simdata->comm);
132   } while (state==SURF_ACTION_RUNNING);
133   DEBUG0("Action terminated");
134
135   if(t->simdata->using>1) {
136     xbt_fifo_unshift(msg_global->process_to_run,process);
137     xbt_context_yield();
138   }
139
140   PAJE_PROCESS_POP_STATE(process);
141   PAJE_COMM_STOP(process,t,channel);
142
143   if(state == SURF_ACTION_DONE) {
144     if(surf_workstation_resource->common_public->action_free(t_simdata->comm)) 
145       t_simdata->comm = NULL;
146     MSG_RETURN(MSG_OK);
147   } else if(surf_workstation_resource->extension_public->get_state(h_simdata->host) 
148           == SURF_CPU_OFF) {
149     if(surf_workstation_resource->common_public->action_free(t_simdata->comm)) 
150       t_simdata->comm = NULL;
151     MSG_RETURN(MSG_HOST_FAILURE);
152   } else {
153     if(surf_workstation_resource->common_public->action_free(t_simdata->comm)) 
154       t_simdata->comm = NULL;
155     MSG_RETURN(MSG_TRANSFER_FAILURE);
156   }
157 }
158
159 /** \ingroup msg_gos_functions
160  * \brief Test whether there is a pending communication on a channel.
161  *
162  * It takes one parameter.
163  * \param channel the channel on which the agent should be
164    listening. This value has to be >=0 and < than the maximal
165    number of channels fixed with MSG_set_channel_number().
166  * \return 1 if there is a pending communication and 0 otherwise
167  */
168 int MSG_task_Iprobe(m_channel_t channel)
169 {
170   m_host_t h = NULL;
171   simdata_host_t h_simdata = NULL;
172
173   xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
174   DEBUG2("Probing on channel %d (%s)", channel,h->name);
175   CHECK_HOST();
176   h = MSG_host_self();
177   h_simdata = h->simdata;
178   return(xbt_fifo_get_first_item(h_simdata->mbox[channel])!=NULL);
179 }
180
181 /** \ingroup msg_gos_functions
182  * \brief Test whether there is a pending communication on a channel, and who sent it.
183  *
184  * It takes one parameter.
185  * \param channel the channel on which the agent should be
186    listening. This value has to be >=0 and < than the maximal
187    number of channels fixed with MSG_set_channel_number().
188  * \return -1 if there is no pending communication and the PID of the process who sent it otherwise
189  */
190 int MSG_task_probe_from(m_channel_t channel)
191 {
192   m_host_t h = NULL;
193   simdata_host_t h_simdata = NULL;
194   xbt_fifo_item_t item;
195   m_task_t t;
196
197   xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
198   CHECK_HOST();
199   h = MSG_host_self();
200   h_simdata = h->simdata;
201
202   DEBUG2("Probing on channel %d (%s)", channel,h->name);
203    
204   item = xbt_fifo_get_first_item(h->simdata->mbox[channel]);
205   if (!item || !(t = xbt_fifo_get_item_content(item)))
206     return -1;
207    
208   return MSG_process_get_PID(t->simdata->sender);
209 }
210
211 MSG_error_t MSG_channel_select_from(m_channel_t channel, double max_duration,
212                                     int *PID)
213 {
214   m_host_t h = NULL;
215   simdata_host_t h_simdata = NULL;
216   xbt_fifo_item_t item;
217   m_task_t t;
218   int first_time = 1;
219   m_process_t process = MSG_process_self();
220
221   xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
222   if(PID) {
223     *PID = -1;
224   }
225
226   if(max_duration==0.0) {
227     return MSG_task_probe_from(channel);
228   } else {
229     CHECK_HOST();
230     h = MSG_host_self();
231     h_simdata = h->simdata;
232     
233     DEBUG2("Probing on channel %d (%s)", channel,h->name);
234     while(!(item = xbt_fifo_get_first_item(h->simdata->mbox[channel]))) {
235       if(max_duration>0) {
236         if(!first_time) {
237           MSG_RETURN(MSG_OK);
238         }
239       }
240       xbt_assert2(!(h_simdata->sleeping[channel]),
241                   "A process (%s(%d)) is already blocked on this channel",
242                   h_simdata->sleeping[channel]->name,
243                   h_simdata->sleeping[channel]->simdata->PID);
244       h_simdata->sleeping[channel] = process; /* I'm waiting. Wake me up when you're ready */
245       if(max_duration>0) {
246         __MSG_process_block(max_duration);
247       } else {
248         __MSG_process_block(-1);
249       }
250       if(surf_workstation_resource->extension_public->get_state(h_simdata->host) 
251          == SURF_CPU_OFF) {
252         MSG_RETURN(MSG_HOST_FAILURE);
253       }
254       h_simdata->sleeping[channel] = NULL;
255       first_time = 0;
256     }
257     if (!item || !(t = xbt_fifo_get_item_content(item))) {
258       MSG_RETURN(MSG_OK);
259     }
260     if(PID) {
261       *PID = MSG_process_get_PID(t->simdata->sender);
262     }
263     MSG_RETURN(MSG_OK);
264   }
265 }
266 /** \ingroup msg_gos_functions
267  * \brief Put a task on a channel of an host and waits for the end of the
268  * transmission.
269  *
270  * This function is used for describing the behavior of an agent. It
271  * takes three parameter.
272  * \param task a #m_task_t to send on another location. This task
273    will not be usable anymore when the function will return. There is
274    no automatic task duplication and you have to save your parameters
275    before calling this function. Tasks are unique and once it has been
276    sent to another location, you should not access it anymore. You do
277    not need to call MSG_task_destroy() but to avoid using, as an
278    effect of inattention, this task anymore, you definitely should
279    renitialize it with #MSG_TASK_UNINITIALIZED. Note that this task
280    can be transfered iff it has been correctly created with
281    MSG_task_create().
282  * \param dest the destination of the message
283  * \param channel the channel on which the agent should put this
284    task. This value has to be >=0 and < than the maximal number of
285    channels fixed with MSG_set_channel_number().
286  * \return #MSG_FATAL if \a task is not properly initialized and
287  * #MSG_OK otherwise.
288  */
289 MSG_error_t MSG_task_put(m_task_t task,
290                          m_host_t dest, m_channel_t channel)
291 {
292   m_process_t process = MSG_process_self();
293   simdata_task_t task_simdata = NULL;
294   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
295   m_host_t local_host = NULL;
296   m_host_t remote_host = NULL;
297
298   CHECK_HOST();
299
300   xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
301
302   task_simdata = task->simdata;
303   task_simdata->sender = process;
304   task_simdata->source = MSG_process_get_host(process);
305   xbt_assert0(task_simdata->using==1,"Gargl!");
306   task_simdata->comm = NULL;
307   
308   local_host = ((simdata_process_t) process->simdata)->host;
309   remote_host = dest;
310
311   DEBUG4("Trying to send a task (%g Mb) from %s to %s on channel %d", 
312          task->simdata->message_size,local_host->name, remote_host->name, channel);
313
314   xbt_fifo_push(((simdata_host_t) remote_host->simdata)->
315                 mbox[channel], task);
316
317   PAJE_COMM_START(process,task,channel);
318     
319   if(remote_host->simdata->sleeping[channel]) {
320     DEBUG0("Somebody is listening. Let's wake him up!");
321     __MSG_process_unblock(remote_host->simdata->sleeping[channel]);
322   }
323
324   process->simdata->put_host = dest;
325   process->simdata->put_channel = channel;
326   while(!(task_simdata->comm)) {
327     DEBUG0("Communication not initiated yet. Let's block!");
328     __MSG_process_block(-1);
329   }
330   DEBUG0("Registering to this communication");
331   surf_workstation_resource->common_public->action_use(task_simdata->comm);
332   process->simdata->put_host = NULL;
333   process->simdata->put_channel = -1;
334
335
336   PAJE_PROCESS_PUSH_STATE(process,"C");  
337
338   state=surf_workstation_resource->common_public->action_get_state(task_simdata->comm);
339   while (state==SURF_ACTION_RUNNING) {
340     DEBUG0("Waiting for action termination");
341     __MSG_task_wait_event(process, task);
342     state=surf_workstation_resource->common_public->action_get_state(task_simdata->comm);
343   }
344   DEBUG0("Action terminated");
345
346   PAJE_PROCESS_POP_STATE(process);  
347
348   if(state == SURF_ACTION_DONE) {
349     if(surf_workstation_resource->common_public->action_free(task_simdata->comm)) 
350       task_simdata->comm = NULL;
351     MSG_task_destroy(task);
352     MSG_RETURN(MSG_OK);
353   } else if(surf_workstation_resource->extension_public->get_state(local_host->simdata->host) 
354             == SURF_CPU_OFF) {
355     if(surf_workstation_resource->common_public->action_free(task_simdata->comm)) 
356       task_simdata->comm = NULL;
357     MSG_task_destroy(task);
358     MSG_RETURN(MSG_HOST_FAILURE);
359   } else { 
360     if(surf_workstation_resource->common_public->action_free(task_simdata->comm)) 
361       task_simdata->comm = NULL;
362     MSG_task_destroy(task);
363     MSG_RETURN(MSG_TRANSFER_FAILURE);
364   }
365 }
366
367 /** \ingroup msg_gos_functions
368  * \brief Does exactly the same as MSG_task_put but with a bounded transmition 
369  * rate.
370  *
371  * \sa MSG_task_put
372  */
373 MSG_error_t MSG_task_put_bounded(m_task_t task,
374                                  m_host_t dest, m_channel_t channel,
375                                  double max_rate)
376 {
377   MSG_error_t res = MSG_OK;
378   task->simdata->rate=max_rate;
379   res = MSG_task_put(task, dest, channel);
380   task->simdata->rate=-1.0;
381   return(res);
382 }
383
384 /** \ingroup msg_gos_functions
385  * \brief Executes a task and waits for its termination.
386  *
387  * This function is used for describing the behavior of an agent. It
388  * takes only one parameter.
389  * \param task a #m_task_t to execute on the location on which the
390    agent is running.
391  * \return #MSG_FATAL if \a task is not properly initialized and
392  * #MSG_OK otherwise.
393  */
394 MSG_error_t MSG_task_execute(m_task_t task)
395 {
396   m_process_t process = MSG_process_self();
397   MSG_error_t res;
398
399   DEBUG1("Computing on %s", process->simdata->host->name);
400
401   __MSG_task_execute(process, task);
402
403   PAJE_PROCESS_PUSH_STATE(process,"E");  
404   res = __MSG_wait_for_computation(process,task);
405   PAJE_PROCESS_POP_STATE(process);
406   return res;
407 }
408
409 void __MSG_task_execute(m_process_t process, m_task_t task)
410 {
411   simdata_task_t simdata = NULL;
412
413   CHECK_HOST();
414
415   simdata = task->simdata;
416
417   simdata->compute = surf_workstation_resource->extension_public->
418     execute(MSG_process_get_host(process)->simdata->host,
419             simdata->computation_amount);
420   surf_workstation_resource->common_public->
421     set_priority(simdata->compute, simdata->priority);
422
423   surf_workstation_resource->common_public->action_set_data(simdata->compute,task);
424 }
425
426 MSG_error_t __MSG_wait_for_computation(m_process_t process, m_task_t task)
427 {
428   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
429   simdata_task_t simdata = task->simdata;
430
431   simdata->using++;
432   do {
433     __MSG_task_wait_event(process, task);
434     state=surf_workstation_resource->common_public->action_get_state(simdata->compute);
435   } while (state==SURF_ACTION_RUNNING);
436   simdata->using--;
437     
438
439   if(state == SURF_ACTION_DONE) {
440     if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
441       simdata->compute = NULL;
442     simdata->computation_amount = 0.0;
443     MSG_RETURN(MSG_OK);
444   } else if(surf_workstation_resource->extension_public->
445             get_state(MSG_process_get_host(process)->simdata->host) 
446             == SURF_CPU_OFF) {
447     if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
448       simdata->compute = NULL;
449     MSG_RETURN(MSG_HOST_FAILURE);
450   } else {
451     if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
452       simdata->compute = NULL;
453     MSG_RETURN(MSG_TASK_CANCELLED);
454   }
455 }
456 /** \ingroup m_task_management
457  * \brief Creates a new #m_task_t (a parallel one....).
458  *
459  * A constructor for #m_task_t taking six arguments and returning the 
460    corresponding object.
461  * \param name a name for the object. It is for user-level information
462    and can be NULL.
463  * \param host_nb the number of hosts implied in the parallel task.
464  * \param host_list an array of #host_nb m_host_t.
465  * \param computation_amount an array of #host_nb
466    doubles. computation_amount[i] is the total number of operations
467    that have to be performed on host_list[i].
468  * \param communication_amount an array of #host_nb*#host_nb doubles.
469  * \param data a pointer to any data may want to attach to the new
470    object.  It is for user-level information and can be NULL. It can
471    be retrieved with the function \ref MSG_task_get_data.
472  * \see m_task_t
473  * \return The new corresponding object.
474  */
475 m_task_t MSG_parallel_task_create(const char *name, 
476                                   int host_nb,
477                                   const m_host_t *host_list,
478                                   double *computation_amount,
479                                   double *communication_amount,
480                                   void *data)
481 {
482   simdata_task_t simdata = xbt_new0(s_simdata_task_t,1);
483   m_task_t task = xbt_new0(s_m_task_t,1);
484   int i;
485
486   /* Task structure */
487   task->name = xbt_strdup(name);
488   task->simdata = simdata;
489   task->data = data;
490
491   /* Simulator Data */
492   simdata->sleeping = xbt_dynar_new(sizeof(m_process_t),NULL);
493   simdata->rate = -1.0;
494   simdata->using = 1;
495   simdata->sender = NULL;
496   simdata->source = NULL;
497   simdata->host_nb = host_nb;
498   
499   simdata->host_list = xbt_new0(void *, host_nb);
500   simdata->comp_amount = computation_amount;
501   simdata->comm_amount = communication_amount;
502
503   for(i=0;i<host_nb;i++)
504     simdata->host_list[i] = host_list[i]->simdata->host;
505
506   return task;
507 }
508
509
510 static void __MSG_parallel_task_execute(m_process_t process, m_task_t task)
511 {
512   simdata_task_t simdata = NULL;
513
514   CHECK_HOST();
515
516   simdata = task->simdata;
517
518   xbt_assert0(simdata->host_nb,"This is not a parallel task. Go to hell.");
519
520   simdata->compute = surf_workstation_resource->extension_public->
521   execute_parallel_task(task->simdata->host_nb,
522                         task->simdata->host_list,
523                         task->simdata->comp_amount,
524                         task->simdata->comm_amount,
525                         1.0,
526                         -1.0);
527   if(simdata->compute)
528     surf_workstation_resource->common_public->action_set_data(simdata->compute,task);
529 }
530
531 MSG_error_t MSG_parallel_task_execute(m_task_t task)
532 {
533   m_process_t process = MSG_process_self();
534   MSG_error_t res;
535
536   DEBUG0("Computing on a tons of guys");
537   
538   __MSG_parallel_task_execute(process, task);
539
540   if(task->simdata->compute)
541     res = __MSG_wait_for_computation(process,task);
542   else 
543     res = MSG_OK;
544
545   return res;  
546 }
547
548
549 /** \ingroup msg_gos_functions
550  * \brief Sleep for the specified number of seconds
551  *
552  * Makes the current process sleep until \a time seconds have elapsed.
553  *
554  * \param nb_sec a number of second
555  */
556 MSG_error_t MSG_process_sleep(double nb_sec)
557 {
558   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
559   m_process_t process = MSG_process_self();
560   m_task_t dummy = NULL;
561   simdata_task_t simdata = NULL;
562
563   CHECK_HOST();
564   dummy = MSG_task_create("MSG_sleep", nb_sec, 0.0, NULL);
565   simdata = dummy->simdata;
566
567   simdata->compute = surf_workstation_resource->extension_public->
568     sleep(MSG_process_get_host(process)->simdata->host,
569             simdata->computation_amount);
570   surf_workstation_resource->common_public->action_set_data(simdata->compute,dummy);
571
572   
573   simdata->using++;
574   do {
575     __MSG_task_wait_event(process, dummy);
576     state=surf_workstation_resource->common_public->action_get_state(simdata->compute);
577   } while (state==SURF_ACTION_RUNNING);
578   simdata->using--;
579     
580   if(state == SURF_ACTION_DONE) {
581     if(surf_workstation_resource->extension_public->
582        get_state(MSG_process_get_host(process)->simdata->host) 
583        == SURF_CPU_OFF) {
584       if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
585         simdata->compute = NULL;
586       MSG_RETURN(MSG_HOST_FAILURE);
587     }
588     if(__MSG_process_isBlocked(process)) {
589       __MSG_process_unblock(MSG_process_self());
590     }
591     if(surf_workstation_resource->extension_public->
592        get_state(MSG_process_get_host(process)->simdata->host) 
593        == SURF_CPU_OFF) {
594       if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
595         simdata->compute = NULL;
596       MSG_RETURN(MSG_HOST_FAILURE);
597     }
598     if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
599       simdata->compute = NULL;
600     MSG_task_destroy(dummy);
601     MSG_RETURN(MSG_OK);
602   } else MSG_RETURN(MSG_HOST_FAILURE);
603 }
604
605 /** \ingroup msg_gos_functions
606  * \brief Return the number of MSG tasks currently running on a
607  * the host of the current running process.
608  */
609 static int MSG_get_msgload(void) 
610 {
611   m_process_t process;
612    
613   CHECK_HOST();
614   
615   xbt_assert0(0, "This function is still to be specified correctly (what do you mean by 'load', exactly?). In the meantime, please don't use it");
616   process = MSG_process_self();
617   return xbt_fifo_size(process->simdata->host->simdata->process_list);
618 }
619
620 /** \ingroup msg_gos_functions
621  *
622  * \brief Return the the last value returned by a MSG function (except
623  * MSG_get_errno...).
624  */
625 MSG_error_t MSG_get_errno(void)
626 {
627   return PROCESS_GET_ERRNO();
628 }