Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
dd652eaa5c4e08bb588421ebe4c6c04f11ef3012
[simgrid.git] / src / msg / gos.c
1 /*      $Id$     */
2
3 /* Copyright (c) 2002,2003,2004 Arnaud Legrand. All rights reserved.        */
4
5 /* This program is free software; you can redistribute it and/or modify it
6  * under the terms of the license (GNU LGPL) which comes with this package. */
7
8 #include"private.h"
9 #include"xbt/sysdep.h"
10 #include "xbt/error.h"
11 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(gos, msg,
12                                 "Logging specific to MSG (gos)");
13
14 /** \defgroup msg_gos_functions MSG Operating System Functions
15  *  \brief This section describes the functions that can be used
16  *  by an agent for handling some task.
17  */
18
19 /* \ingroup msg_gos_functions
20  * \brief This function is now deprecated and useless. Please stop using it.
21  */
22 MSG_error_t MSG_process_start(m_process_t process)
23 {
24   xbt_assert0(0,"This function is now deprecated and useless. Please stop using it.");
25   
26   return MSG_OK;
27 }
28
29 /** \ingroup msg_gos_functions
30  * \brief Listen on a channel and wait for receiving a task.
31  *
32  * It takes two parameters.
33  * \param task a memory location for storing a #m_task_t. It will
34    hold a task when this function will return. Thus \a task should not
35    be equal to \c NULL and \a *task should be equal to \c NULL. If one of
36    those two condition does not hold, there will be a warning message.
37  * \param channel the channel on which the agent should be
38    listening. This value has to be >=0 and < than the maximal
39    number of channels fixed with MSG_set_channel_number().
40  * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
41  * if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
42  */
43 MSG_error_t MSG_task_get(m_task_t * task,
44                          m_channel_t channel)
45 {
46   return MSG_task_get_with_time_out(task, channel, -1);
47 }
48
49 /** \ingroup msg_gos_functions
50  * \brief Listen on a channel and wait for receiving a task with a timeout.
51  *
52  * It takes three parameters.
53  * \param task a memory location for storing a #m_task_t. It will
54    hold a task when this function will return. Thus \a task should not
55    be equal to \c NULL and \a *task should be equal to \c NULL. If one of
56    those two condition does not hold, there will be a warning message.
57  * \param channel the channel on which the agent should be
58    listening. This value has to be >=0 and < than the maximal
59    number of channels fixed with MSG_set_channel_number().
60  * \param max_duration the maximum time to wait for a task before giving
61     up. In such a case, \a task will not be modified and will still be
62     equal to \c NULL when returning.
63  * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
64    if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
65  */
66
67 MSG_error_t MSG_task_get_with_time_out(m_task_t * task,
68                                        m_channel_t channel,
69                                        double max_duration)
70 {
71   m_process_t process = MSG_process_self();
72   m_task_t t = NULL;
73   m_host_t h = NULL;
74   simdata_task_t t_simdata = NULL;
75   simdata_host_t h_simdata = NULL;
76   int warning = 0;
77   int first_time = 1;
78   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
79   
80   CHECK_HOST();
81   /* Sanity check */
82   xbt_assert0(task,"Null pointer for the task\n");
83
84   if (*task) 
85     CRITICAL0("MSG_task_get() was asked to write in a non empty task struct.");
86
87   /* Get the task */
88   h = MSG_host_self();
89   h_simdata = h->simdata;
90
91   DEBUG2("Waiting for a task on channel %d (%s)", channel,h->name);
92
93   while ((t = xbt_fifo_shift(h_simdata->mbox[channel])) == NULL) {
94     if(max_duration>0) {
95       if(!first_time) {
96         MSG_RETURN(MSG_OK);
97       }
98     }
99     xbt_assert2(!(h_simdata->sleeping[channel]),
100                 "A process (%s(%d)) is already blocked on this channel",
101                 h_simdata->sleeping[channel]->name,
102                 h_simdata->sleeping[channel]->simdata->PID);
103     h_simdata->sleeping[channel] = process; /* I'm waiting. Wake me up when you're ready */
104     if(max_duration>0) {
105       __MSG_process_block(max_duration);
106     } else {
107       __MSG_process_block(-1);
108     }
109     if(surf_workstation_resource->extension_public->get_state(h_simdata->host) 
110        == SURF_CPU_OFF)
111       MSG_RETURN(MSG_HOST_FAILURE);
112     h_simdata->sleeping[channel] = NULL;
113     first_time = 0;
114     /* OK, we should both be ready now. Are you there ? */
115   }
116
117   t_simdata = t->simdata;
118   /*   *task = __MSG_task_copy(t); */
119   *task=t;
120
121   /* Transfer */
122   t_simdata->using++;
123
124   t_simdata->comm = surf_workstation_resource->extension_public->
125     communicate(MSG_process_get_host(t_simdata->sender)->simdata->host,
126                 h->simdata->host, t_simdata->message_size,t_simdata->rate);
127   
128   surf_workstation_resource->common_public->action_set_data(t_simdata->comm,t);
129
130   if(__MSG_process_isBlocked(t_simdata->sender)) 
131     __MSG_process_unblock(t_simdata->sender);
132
133   PAJE_PROCESS_PUSH_STATE(process,"C");  
134
135   do {
136     __MSG_task_wait_event(process, t);
137     state=surf_workstation_resource->common_public->action_get_state(t_simdata->comm);
138   } while (state==SURF_ACTION_RUNNING);
139
140   if(t->simdata->using>1) {
141     xbt_fifo_unshift(msg_global->process_to_run,process);
142     xbt_context_yield();
143   }
144
145   PAJE_PROCESS_POP_STATE(process);
146   PAJE_COMM_STOP(process,t,channel);
147
148   if(state == SURF_ACTION_DONE) {
149     if(surf_workstation_resource->common_public->action_free(t_simdata->comm)) 
150       t_simdata->comm = NULL;
151     MSG_RETURN(MSG_OK);
152   } else if(surf_workstation_resource->extension_public->get_state(h_simdata->host) 
153           == SURF_CPU_OFF) {
154     if(surf_workstation_resource->common_public->action_free(t_simdata->comm)) 
155       t_simdata->comm = NULL;
156     MSG_RETURN(MSG_HOST_FAILURE);
157   } else {
158     if(surf_workstation_resource->common_public->action_free(t_simdata->comm)) 
159       t_simdata->comm = NULL;
160     MSG_RETURN(MSG_TRANSFER_FAILURE);
161   }
162 }
163
164 /** \ingroup msg_gos_functions
165  * \brief Test whether there is a pending communication on a channel.
166  *
167  * It takes one parameter.
168  * \param channel the channel on which the agent should be
169    listening. This value has to be >=0 and < than the maximal
170    number of channels fixed with MSG_set_channel_number().
171  * \return 1 if there is a pending communication and 0 otherwise
172  */
173 int MSG_task_Iprobe(m_channel_t channel)
174 {
175   m_host_t h = NULL;
176   simdata_host_t h_simdata = NULL;
177
178   DEBUG2("Probing on channel %d (%s)", channel,h->name);
179   CHECK_HOST();
180   h = MSG_host_self();
181   h_simdata = h->simdata;
182   return(xbt_fifo_getFirstItem(h_simdata->mbox[channel])!=NULL);
183 }
184
185 /** \ingroup msg_gos_functions
186  * \brief Test whether there is a pending communication on a channel, and who sent it.
187  *
188  * It takes one parameter.
189  * \param channel the channel on which the agent should be
190    listening. This value has to be >=0 and < than the maximal
191    number of channels fixed with MSG_set_channel_number().
192  * \return -1 if there is no pending communication and the PID of the process who sent it otherwise
193  */
194 int MSG_task_probe_from(m_channel_t channel)
195 {
196   m_host_t h = NULL;
197   simdata_host_t h_simdata = NULL;
198   xbt_fifo_item_t item;
199   m_task_t t;
200
201   CHECK_HOST();
202   h = MSG_host_self();
203   h_simdata = h->simdata;
204
205   DEBUG2("Probing on channel %d (%s)", channel,h->name);
206    
207   item = xbt_fifo_getFirstItem(h->simdata->mbox[channel]);
208   if (!item || !(t = xbt_fifo_get_item_content(item)))
209     return -1;
210    
211   return MSG_process_get_PID(t->simdata->sender);
212 }
213
214 MSG_error_t MSG_channel_select_from(m_channel_t channel, double max_duration,
215                                     int *PID)
216 {
217   m_host_t h = NULL;
218   simdata_host_t h_simdata = NULL;
219   xbt_fifo_item_t item;
220   m_task_t t;
221   int first_time = 1;
222   m_process_t process = MSG_process_self();
223
224   if(PID) {
225     *PID = -1;
226   }
227
228   if(max_duration==0.0) {
229     return MSG_task_probe_from(channel);
230   } else {
231     CHECK_HOST();
232     h = MSG_host_self();
233     h_simdata = h->simdata;
234     
235     DEBUG2("Probing on channel %d (%s)", channel,h->name);
236     while(!(item = xbt_fifo_getFirstItem(h->simdata->mbox[channel]))) {
237       if(max_duration>0) {
238         if(!first_time) {
239           MSG_RETURN(MSG_OK);
240         }
241       }
242       xbt_assert2(!(h_simdata->sleeping[channel]),
243                   "A process (%s(%d)) is already blocked on this channel",
244                   h_simdata->sleeping[channel]->name,
245                   h_simdata->sleeping[channel]->simdata->PID);
246       h_simdata->sleeping[channel] = process; /* I'm waiting. Wake me up when you're ready */
247       if(max_duration>0) {
248         __MSG_process_block(max_duration);
249       } else {
250         __MSG_process_block(-1);
251       }
252       if(surf_workstation_resource->extension_public->get_state(h_simdata->host) 
253          == SURF_CPU_OFF) {
254         MSG_RETURN(MSG_HOST_FAILURE);
255       }
256       h_simdata->sleeping[channel] = NULL;
257       first_time = 0;
258     }
259     if (!item || !(t = xbt_fifo_get_item_content(item))) {
260       MSG_RETURN(MSG_OK);
261     }
262     if(PID) {
263       *PID = MSG_process_get_PID(t->simdata->sender);
264     }
265     MSG_RETURN(MSG_OK);
266   }
267 }
268 /** \ingroup msg_gos_functions
269  * \brief Put a task on a channel of an host and waits for the end of the
270  * transmission.
271  *
272  * This function is used for describing the behavior of an agent. It
273  * takes three parameter.
274  * \param task a #m_task_t to send on another location. This task
275    will not be usable anymore when the function will return. There is
276    no automatic task duplication and you have to save your parameters
277    before calling this function. Tasks are unique and once it has been
278    sent to another location, you should not access it anymore. You do
279    not need to call MSG_task_destroy() but to avoid using, as an
280    effect of inattention, this task anymore, you definitely should
281    renitialize it with #MSG_TASK_UNINITIALIZED. Note that this task
282    can be transfered iff it has been correctly created with
283    MSG_task_create().
284  * \param dest the destination of the message
285  * \param channel the channel on which the agent should put this
286    task. This value has to be >=0 and < than the maximal number of
287    channels fixed with MSG_set_channel_number().
288  * \return #MSG_FATAL if \a task is not properly initialized and
289  * #MSG_OK otherwise.
290  */
291 MSG_error_t MSG_task_put(m_task_t task,
292                          m_host_t dest, m_channel_t channel)
293 {
294   m_process_t process = MSG_process_self();
295   simdata_task_t task_simdata = NULL;
296   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
297   m_host_t local_host = NULL;
298   m_host_t remote_host = NULL;
299
300   CHECK_HOST();
301
302   task_simdata = task->simdata;
303   task_simdata->sender = process;
304   xbt_assert0(task_simdata->using==1,"Gargl!");
305   task_simdata->comm = NULL;
306   
307   local_host = ((simdata_process_t) process->simdata)->host;
308   remote_host = dest;
309
310   DEBUG4("Trying to send a task (%g Mb) from %s to %s on channel %d", 
311          task->simdata->message_size,local_host->name, remote_host->name, channel);
312
313   xbt_fifo_push(((simdata_host_t) remote_host->simdata)->
314                 mbox[channel], task);
315
316   PAJE_COMM_START(process,task,channel);
317     
318   if(remote_host->simdata->sleeping[channel]) 
319     __MSG_process_unblock(remote_host->simdata->sleeping[channel]);
320
321   process->simdata->put_host = dest;
322   process->simdata->put_channel = channel;
323   while(!(task_simdata->comm)) 
324     __MSG_process_block(-1);
325   surf_workstation_resource->common_public->action_use(task_simdata->comm);
326   process->simdata->put_host = NULL;
327   process->simdata->put_channel = -1;
328
329
330   PAJE_PROCESS_PUSH_STATE(process,"C");  
331
332   state=surf_workstation_resource->common_public->action_get_state(task_simdata->comm);
333   while (state==SURF_ACTION_RUNNING) {
334     __MSG_task_wait_event(process, task);
335     state=surf_workstation_resource->common_public->action_get_state(task_simdata->comm);
336   }
337     
338
339   PAJE_PROCESS_POP_STATE(process);  
340
341   if(state == SURF_ACTION_DONE) {
342     if(surf_workstation_resource->common_public->action_free(task_simdata->comm)) 
343       task_simdata->comm = NULL;
344     MSG_task_destroy(task);
345     MSG_RETURN(MSG_OK);
346   } else if(surf_workstation_resource->extension_public->get_state(local_host->simdata->host) 
347             == SURF_CPU_OFF) {
348     if(surf_workstation_resource->common_public->action_free(task_simdata->comm)) 
349       task_simdata->comm = NULL;
350     MSG_task_destroy(task);
351     MSG_RETURN(MSG_HOST_FAILURE);
352   } else { 
353     if(surf_workstation_resource->common_public->action_free(task_simdata->comm)) 
354       task_simdata->comm = NULL;
355     MSG_task_destroy(task);
356     MSG_RETURN(MSG_TRANSFER_FAILURE);
357   }
358 }
359
360 /** \ingroup msg_gos_functions
361  * \brief Does exactly the same as MSG_task_put but with a bounded transmition 
362  * rate.
363  *
364  * \sa MSG_task_put
365  */
366 MSG_error_t MSG_task_put_bounded(m_task_t task,
367                                  m_host_t dest, m_channel_t channel,
368                                  double max_rate)
369 {
370   MSG_error_t res = MSG_OK;
371   task->simdata->rate=max_rate;
372   res = MSG_task_put(task, dest, channel);
373   task->simdata->rate=-1.0;
374   return(res);
375 }
376
377 /** \ingroup msg_gos_functions
378  * \brief Executes a task and waits for its termination.
379  *
380  * This function is used for describing the behavior of an agent. It
381  * takes only one parameter.
382  * \param task a #m_task_t to execute on the location on which the
383    agent is running.
384  * \return #MSG_FATAL if \a task is not properly initialized and
385  * #MSG_OK otherwise.
386  */
387 MSG_error_t MSG_task_execute(m_task_t task)
388 {
389   m_process_t process = MSG_process_self();
390   MSG_error_t res;
391
392   DEBUG1("Computing on %s", process->simdata->host->name);
393
394   __MSG_task_execute(process, task);
395
396   PAJE_PROCESS_PUSH_STATE(process,"E");  
397   res = __MSG_wait_for_computation(process,task);
398   PAJE_PROCESS_POP_STATE(process);
399   return res;
400 }
401
402 void __MSG_task_execute(m_process_t process, m_task_t task)
403 {
404   simdata_task_t simdata = NULL;
405
406   CHECK_HOST();
407
408   simdata = task->simdata;
409
410   simdata->compute = surf_workstation_resource->extension_public->
411     execute(MSG_process_get_host(process)->simdata->host,
412             simdata->computation_amount);
413   surf_workstation_resource->common_public->action_set_data(simdata->compute,task);
414 }
415
416 MSG_error_t __MSG_wait_for_computation(m_process_t process, m_task_t task)
417 {
418   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
419   simdata_task_t simdata = task->simdata;
420
421   simdata->using++;
422   do {
423     __MSG_task_wait_event(process, task);
424     state=surf_workstation_resource->common_public->action_get_state(simdata->compute);
425   } while (state==SURF_ACTION_RUNNING);
426   simdata->using--;
427     
428
429   if(state == SURF_ACTION_DONE) {
430     if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
431       simdata->compute = NULL;
432     simdata->computation_amount = 0.0;
433     MSG_RETURN(MSG_OK);
434   } else if(surf_workstation_resource->extension_public->
435             get_state(MSG_process_get_host(process)->simdata->host) 
436             == SURF_CPU_OFF) {
437     if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
438       simdata->compute = NULL;
439     MSG_RETURN(MSG_HOST_FAILURE);
440   } else {
441     if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
442       simdata->compute = NULL;
443     MSG_RETURN(MSG_TASK_CANCELLED);
444   }
445 }
446 /** \ingroup m_task_management
447  * \brief Creates a new #m_task_t (a parallel one....).
448  *
449  * A constructor for #m_task_t taking six arguments and returning the 
450    corresponding object.
451  * \param name a name for the object. It is for user-level information
452    and can be NULL.
453  * \param host_nb the number of hosts implied in the parallel task.
454  * \param host_list an array of #host_nb m_host_t.
455  * \param computation_amount an array of #host_nb
456    doubles. computation_amount[i] is the total number of operations
457    that have to be performed on host_list[i].
458  * \param communication_amount an array of #host_nb*#host_nb doubles.
459  * \param data a pointer to any data may want to attach to the new
460    object.  It is for user-level information and can be NULL. It can
461    be retrieved with the function \ref MSG_task_get_data.
462  * \see m_task_t
463  * \return The new corresponding object.
464  */
465 m_task_t MSG_parallel_task_create(const char *name, 
466                                   int host_nb,
467                                   const m_host_t *host_list,
468                                   double *computation_amount,
469                                   double *communication_amount,
470                                   void *data)
471 {
472   simdata_task_t simdata = xbt_new0(s_simdata_task_t,1);
473   m_task_t task = xbt_new0(s_m_task_t,1);
474   int i;
475
476   /* Task structure */
477   task->name = xbt_strdup(name);
478   task->simdata = simdata;
479   task->data = data;
480
481   /* Simulator Data */
482   simdata->sleeping = xbt_dynar_new(sizeof(m_process_t),NULL);
483   simdata->rate = -1.0;
484   simdata->using = 1;
485   simdata->sender = NULL;
486   simdata->host_nb = host_nb;
487   
488   simdata->host_list = xbt_new0(void *, host_nb);
489   simdata->comp_amount = computation_amount;
490   simdata->comm_amount = communication_amount;
491
492   for(i=0;i<host_nb;i++)
493     simdata->host_list[i] = host_list[i]->simdata->host;
494
495   return task;
496 }
497
498
499 static void __MSG_parallel_task_execute(m_process_t process, m_task_t task)
500 {
501   simdata_task_t simdata = NULL;
502
503   CHECK_HOST();
504
505   simdata = task->simdata;
506
507   xbt_assert0(simdata->host_nb,"This is not a parallel task. Go to hell.");
508
509   simdata->compute = surf_workstation_resource->extension_public->
510   execute_parallel_task(task->simdata->host_nb,
511                         task->simdata->host_list,
512                         task->simdata->comp_amount,
513                         task->simdata->comm_amount,
514                         1.0,
515                         -1.0);
516   if(simdata->compute)
517     surf_workstation_resource->common_public->action_set_data(simdata->compute,task);
518 }
519
520 MSG_error_t MSG_parallel_task_execute(m_task_t task)
521 {
522   m_process_t process = MSG_process_self();
523   MSG_error_t res;
524
525   DEBUG0("Computing on a tons of guys");
526   
527   __MSG_parallel_task_execute(process, task);
528
529   if(task->simdata->compute)
530     res = __MSG_wait_for_computation(process,task);
531   else 
532     res = MSG_OK;
533
534   return res;  
535 }
536
537
538 /** \ingroup msg_gos_functions
539  * \brief Sleep for the specified number of seconds
540  *
541  * Makes the current process sleep until \a time seconds have elapsed.
542  *
543  * \param nb_sec a number of second
544  */
545 MSG_error_t MSG_process_sleep(double nb_sec)
546 {
547   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
548   m_process_t process = MSG_process_self();
549   m_task_t dummy = NULL;
550   simdata_task_t simdata = NULL;
551
552   CHECK_HOST();
553   dummy = MSG_task_create("MSG_sleep", nb_sec, 0.0, NULL);
554   simdata = dummy->simdata;
555
556   simdata->compute = surf_workstation_resource->extension_public->
557     sleep(MSG_process_get_host(process)->simdata->host,
558             simdata->computation_amount);
559   surf_workstation_resource->common_public->action_set_data(simdata->compute,dummy);
560
561   
562   simdata->using++;
563   do {
564     __MSG_task_wait_event(process, dummy);
565     state=surf_workstation_resource->common_public->action_get_state(simdata->compute);
566   } while (state==SURF_ACTION_RUNNING);
567   simdata->using--;
568     
569   if(state == SURF_ACTION_DONE) {
570     if(surf_workstation_resource->extension_public->
571        get_state(MSG_process_get_host(process)->simdata->host) 
572        == SURF_CPU_OFF) {
573       if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
574         simdata->compute = NULL;
575       MSG_RETURN(MSG_HOST_FAILURE);
576     }
577     if(__MSG_process_isBlocked(process)) {
578       __MSG_process_unblock(MSG_process_self());
579     }
580     if(surf_workstation_resource->extension_public->
581        get_state(MSG_process_get_host(process)->simdata->host) 
582        == SURF_CPU_OFF) {
583       if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
584         simdata->compute = NULL;
585       MSG_RETURN(MSG_HOST_FAILURE);
586     }
587     if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
588       simdata->compute = NULL;
589     MSG_task_destroy(dummy);
590     MSG_RETURN(MSG_OK);
591   } else MSG_RETURN(MSG_HOST_FAILURE);
592 }
593
594 /** \ingroup msg_gos_functions
595  * \brief Return the number of MSG tasks currently running on a
596  * the host of the current running process.
597  */
598 static int MSG_get_msgload(void) 
599 {
600   m_process_t process;
601    
602   CHECK_HOST();
603   
604   xbt_assert0(0, "This function is still to be specified correctly (what do you mean by 'load', exactly?). In the meantime, please don't use it");
605   process = MSG_process_self();
606   return xbt_fifo_size(process->simdata->host->simdata->process_list);
607 }
608
609 /** \ingroup msg_gos_functions
610  *
611  * \brief Return the the last value returned by a MSG function (except
612  * MSG_get_errno...).
613  */
614 MSG_error_t MSG_get_errno(void)
615 {
616   return PROCESS_GET_ERRNO();
617 }