Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Implemented the MSG_task_get_source() function. (See ChangeLog)
[simgrid.git] / src / msg / gos.c
1 /*      $Id$     */
2
3 /* Copyright (c) 2002,2003,2004 Arnaud Legrand. All rights reserved.        */
4
5 /* This program is free software; you can redistribute it and/or modify it
6  * under the terms of the license (GNU LGPL) which comes with this package. */
7
8 #include "private.h"
9 #include "xbt/sysdep.h"
10 #include "xbt/log.h"
11 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(gos, msg,
12                                 "Logging specific to MSG (gos)");
13
14 /** \defgroup msg_gos_functions MSG Operating System Functions
15  *  \brief This section describes the functions that can be used
16  *  by an agent for handling some task.
17  */
18
19 /** \ingroup msg_gos_functions
20  * \brief Listen on a channel and wait for receiving a task.
21  *
22  * It takes two parameters.
23  * \param task a memory location for storing a #m_task_t. It will
24    hold a task when this function will return. Thus \a task should not
25    be equal to \c NULL and \a *task should be equal to \c NULL. If one of
26    those two condition does not hold, there will be a warning message.
27  * \param channel the channel on which the agent should be
28    listening. This value has to be >=0 and < than the maximal
29    number of channels fixed with MSG_set_channel_number().
30  * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
31  * if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
32  */
33 MSG_error_t MSG_task_get(m_task_t * task,
34                          m_channel_t channel)
35 {
36   return MSG_task_get_with_time_out(task, channel, -1);
37 }
38
39 /** \ingroup msg_gos_functions
40  * \brief Listen on a channel and wait for receiving a task with a timeout.
41  *
42  * It takes three parameters.
43  * \param task a memory location for storing a #m_task_t. It will
44    hold a task when this function will return. Thus \a task should not
45    be equal to \c NULL and \a *task should be equal to \c NULL. If one of
46    those two condition does not hold, there will be a warning message.
47  * \param channel the channel on which the agent should be
48    listening. This value has to be >=0 and < than the maximal
49    number of channels fixed with MSG_set_channel_number().
50  * \param max_duration the maximum time to wait for a task before giving
51     up. In such a case, \a task will not be modified and will still be
52     equal to \c NULL when returning.
53  * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
54    if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
55  */
56
57 MSG_error_t MSG_task_get_with_time_out(m_task_t * task,
58                                        m_channel_t channel,
59                                        double max_duration)
60 {
61   m_process_t process = MSG_process_self();
62   m_task_t t = NULL;
63   m_host_t h = NULL;
64   simdata_task_t t_simdata = NULL;
65   simdata_host_t h_simdata = NULL;
66   int first_time = 1;
67   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
68   
69   CHECK_HOST();
70   xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
71   /* Sanity check */
72   xbt_assert0(task,"Null pointer for the task\n");
73
74   if (*task) 
75     CRITICAL0("MSG_task_get() was asked to write in a non empty task struct.");
76
77   /* Get the task */
78   h = MSG_host_self();
79   h_simdata = h->simdata;
80
81   DEBUG2("Waiting for a task on channel %d (%s)", channel,h->name);
82
83   while ((t = xbt_fifo_shift(h_simdata->mbox[channel])) == NULL) {
84     if(max_duration>0) {
85       if(!first_time) {
86         MSG_RETURN(MSG_OK);
87       }
88     }
89     xbt_assert2(!(h_simdata->sleeping[channel]),
90                 "A process (%s(%d)) is already blocked on this channel",
91                 h_simdata->sleeping[channel]->name,
92                 h_simdata->sleeping[channel]->simdata->PID);
93     h_simdata->sleeping[channel] = process; /* I'm waiting. Wake me up when you're ready */
94     if(max_duration>0) {
95       __MSG_process_block(max_duration);
96     } else {
97       __MSG_process_block(-1);
98     }
99     if(surf_workstation_resource->extension_public->get_state(h_simdata->host) 
100        == SURF_CPU_OFF)
101       MSG_RETURN(MSG_HOST_FAILURE);
102     h_simdata->sleeping[channel] = NULL;
103     first_time = 0;
104     /* OK, we should both be ready now. Are you there ? */
105   }
106
107   t_simdata = t->simdata;
108   /*   *task = __MSG_task_copy(t); */
109   *task=t;
110
111   /* Transfer */
112   t_simdata->using++;
113
114   t_simdata->comm = surf_workstation_resource->extension_public->
115     communicate(MSG_process_get_host(t_simdata->sender)->simdata->host,
116                 h->simdata->host, t_simdata->message_size,t_simdata->rate);
117   
118   surf_workstation_resource->common_public->action_set_data(t_simdata->comm,t);
119
120   if(__MSG_process_isBlocked(t_simdata->sender)) 
121     __MSG_process_unblock(t_simdata->sender);
122
123   PAJE_PROCESS_PUSH_STATE(process,"C");  
124
125   do {
126     __MSG_task_wait_event(process, t);
127     state=surf_workstation_resource->common_public->action_get_state(t_simdata->comm);
128   } while (state==SURF_ACTION_RUNNING);
129
130   if(t->simdata->using>1) {
131     xbt_fifo_unshift(msg_global->process_to_run,process);
132     xbt_context_yield();
133   }
134
135   PAJE_PROCESS_POP_STATE(process);
136   PAJE_COMM_STOP(process,t,channel);
137
138   if(state == SURF_ACTION_DONE) {
139     if(surf_workstation_resource->common_public->action_free(t_simdata->comm)) 
140       t_simdata->comm = NULL;
141     MSG_RETURN(MSG_OK);
142   } else if(surf_workstation_resource->extension_public->get_state(h_simdata->host) 
143           == SURF_CPU_OFF) {
144     if(surf_workstation_resource->common_public->action_free(t_simdata->comm)) 
145       t_simdata->comm = NULL;
146     MSG_RETURN(MSG_HOST_FAILURE);
147   } else {
148     if(surf_workstation_resource->common_public->action_free(t_simdata->comm)) 
149       t_simdata->comm = NULL;
150     MSG_RETURN(MSG_TRANSFER_FAILURE);
151   }
152 }
153
154 /** \ingroup msg_gos_functions
155  * \brief Test whether there is a pending communication on a channel.
156  *
157  * It takes one parameter.
158  * \param channel the channel on which the agent should be
159    listening. This value has to be >=0 and < than the maximal
160    number of channels fixed with MSG_set_channel_number().
161  * \return 1 if there is a pending communication and 0 otherwise
162  */
163 int MSG_task_Iprobe(m_channel_t channel)
164 {
165   m_host_t h = NULL;
166   simdata_host_t h_simdata = NULL;
167
168   xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
169   DEBUG2("Probing on channel %d (%s)", channel,h->name);
170   CHECK_HOST();
171   h = MSG_host_self();
172   h_simdata = h->simdata;
173   return(xbt_fifo_get_first_item(h_simdata->mbox[channel])!=NULL);
174 }
175
176 /** \ingroup msg_gos_functions
177  * \brief Test whether there is a pending communication on a channel, and who sent it.
178  *
179  * It takes one parameter.
180  * \param channel the channel on which the agent should be
181    listening. This value has to be >=0 and < than the maximal
182    number of channels fixed with MSG_set_channel_number().
183  * \return -1 if there is no pending communication and the PID of the process who sent it otherwise
184  */
185 int MSG_task_probe_from(m_channel_t channel)
186 {
187   m_host_t h = NULL;
188   simdata_host_t h_simdata = NULL;
189   xbt_fifo_item_t item;
190   m_task_t t;
191
192   xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
193   CHECK_HOST();
194   h = MSG_host_self();
195   h_simdata = h->simdata;
196
197   DEBUG2("Probing on channel %d (%s)", channel,h->name);
198    
199   item = xbt_fifo_get_first_item(h->simdata->mbox[channel]);
200   if (!item || !(t = xbt_fifo_get_item_content(item)))
201     return -1;
202    
203   return MSG_process_get_PID(t->simdata->sender);
204 }
205
206 MSG_error_t MSG_channel_select_from(m_channel_t channel, double max_duration,
207                                     int *PID)
208 {
209   m_host_t h = NULL;
210   simdata_host_t h_simdata = NULL;
211   xbt_fifo_item_t item;
212   m_task_t t;
213   int first_time = 1;
214   m_process_t process = MSG_process_self();
215
216   xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
217   if(PID) {
218     *PID = -1;
219   }
220
221   if(max_duration==0.0) {
222     return MSG_task_probe_from(channel);
223   } else {
224     CHECK_HOST();
225     h = MSG_host_self();
226     h_simdata = h->simdata;
227     
228     DEBUG2("Probing on channel %d (%s)", channel,h->name);
229     while(!(item = xbt_fifo_get_first_item(h->simdata->mbox[channel]))) {
230       if(max_duration>0) {
231         if(!first_time) {
232           MSG_RETURN(MSG_OK);
233         }
234       }
235       xbt_assert2(!(h_simdata->sleeping[channel]),
236                   "A process (%s(%d)) is already blocked on this channel",
237                   h_simdata->sleeping[channel]->name,
238                   h_simdata->sleeping[channel]->simdata->PID);
239       h_simdata->sleeping[channel] = process; /* I'm waiting. Wake me up when you're ready */
240       if(max_duration>0) {
241         __MSG_process_block(max_duration);
242       } else {
243         __MSG_process_block(-1);
244       }
245       if(surf_workstation_resource->extension_public->get_state(h_simdata->host) 
246          == SURF_CPU_OFF) {
247         MSG_RETURN(MSG_HOST_FAILURE);
248       }
249       h_simdata->sleeping[channel] = NULL;
250       first_time = 0;
251     }
252     if (!item || !(t = xbt_fifo_get_item_content(item))) {
253       MSG_RETURN(MSG_OK);
254     }
255     if(PID) {
256       *PID = MSG_process_get_PID(t->simdata->sender);
257     }
258     MSG_RETURN(MSG_OK);
259   }
260 }
261 /** \ingroup msg_gos_functions
262  * \brief Put a task on a channel of an host and waits for the end of the
263  * transmission.
264  *
265  * This function is used for describing the behavior of an agent. It
266  * takes three parameter.
267  * \param task a #m_task_t to send on another location. This task
268    will not be usable anymore when the function will return. There is
269    no automatic task duplication and you have to save your parameters
270    before calling this function. Tasks are unique and once it has been
271    sent to another location, you should not access it anymore. You do
272    not need to call MSG_task_destroy() but to avoid using, as an
273    effect of inattention, this task anymore, you definitely should
274    renitialize it with #MSG_TASK_UNINITIALIZED. Note that this task
275    can be transfered iff it has been correctly created with
276    MSG_task_create().
277  * \param dest the destination of the message
278  * \param channel the channel on which the agent should put this
279    task. This value has to be >=0 and < than the maximal number of
280    channels fixed with MSG_set_channel_number().
281  * \return #MSG_FATAL if \a task is not properly initialized and
282  * #MSG_OK otherwise.
283  */
284 MSG_error_t MSG_task_put(m_task_t task,
285                          m_host_t dest, m_channel_t channel)
286 {
287   m_process_t process = MSG_process_self();
288   simdata_task_t task_simdata = NULL;
289   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
290   m_host_t local_host = NULL;
291   m_host_t remote_host = NULL;
292
293   CHECK_HOST();
294
295   xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
296
297   task_simdata = task->simdata;
298   task_simdata->sender = process;
299   task_simdata->source = MSG_process_get_host(process);
300   xbt_assert0(task_simdata->using==1,"Gargl!");
301   task_simdata->comm = NULL;
302   
303   local_host = ((simdata_process_t) process->simdata)->host;
304   remote_host = dest;
305
306   DEBUG4("Trying to send a task (%g Mb) from %s to %s on channel %d", 
307          task->simdata->message_size,local_host->name, remote_host->name, channel);
308
309   xbt_fifo_push(((simdata_host_t) remote_host->simdata)->
310                 mbox[channel], task);
311
312   PAJE_COMM_START(process,task,channel);
313     
314   if(remote_host->simdata->sleeping[channel]) 
315     __MSG_process_unblock(remote_host->simdata->sleeping[channel]);
316
317   process->simdata->put_host = dest;
318   process->simdata->put_channel = channel;
319   while(!(task_simdata->comm)) 
320     __MSG_process_block(-1);
321   surf_workstation_resource->common_public->action_use(task_simdata->comm);
322   process->simdata->put_host = NULL;
323   process->simdata->put_channel = -1;
324
325
326   PAJE_PROCESS_PUSH_STATE(process,"C");  
327
328   state=surf_workstation_resource->common_public->action_get_state(task_simdata->comm);
329   while (state==SURF_ACTION_RUNNING) {
330     __MSG_task_wait_event(process, task);
331     state=surf_workstation_resource->common_public->action_get_state(task_simdata->comm);
332   }
333     
334
335   PAJE_PROCESS_POP_STATE(process);  
336
337   if(state == SURF_ACTION_DONE) {
338     if(surf_workstation_resource->common_public->action_free(task_simdata->comm)) 
339       task_simdata->comm = NULL;
340     MSG_task_destroy(task);
341     MSG_RETURN(MSG_OK);
342   } else if(surf_workstation_resource->extension_public->get_state(local_host->simdata->host) 
343             == SURF_CPU_OFF) {
344     if(surf_workstation_resource->common_public->action_free(task_simdata->comm)) 
345       task_simdata->comm = NULL;
346     MSG_task_destroy(task);
347     MSG_RETURN(MSG_HOST_FAILURE);
348   } else { 
349     if(surf_workstation_resource->common_public->action_free(task_simdata->comm)) 
350       task_simdata->comm = NULL;
351     MSG_task_destroy(task);
352     MSG_RETURN(MSG_TRANSFER_FAILURE);
353   }
354 }
355
356 /** \ingroup msg_gos_functions
357  * \brief Does exactly the same as MSG_task_put but with a bounded transmition 
358  * rate.
359  *
360  * \sa MSG_task_put
361  */
362 MSG_error_t MSG_task_put_bounded(m_task_t task,
363                                  m_host_t dest, m_channel_t channel,
364                                  double max_rate)
365 {
366   MSG_error_t res = MSG_OK;
367   task->simdata->rate=max_rate;
368   res = MSG_task_put(task, dest, channel);
369   task->simdata->rate=-1.0;
370   return(res);
371 }
372
373 /** \ingroup msg_gos_functions
374  * \brief Executes a task and waits for its termination.
375  *
376  * This function is used for describing the behavior of an agent. It
377  * takes only one parameter.
378  * \param task a #m_task_t to execute on the location on which the
379    agent is running.
380  * \return #MSG_FATAL if \a task is not properly initialized and
381  * #MSG_OK otherwise.
382  */
383 MSG_error_t MSG_task_execute(m_task_t task)
384 {
385   m_process_t process = MSG_process_self();
386   MSG_error_t res;
387
388   DEBUG1("Computing on %s", process->simdata->host->name);
389
390   __MSG_task_execute(process, task);
391
392   PAJE_PROCESS_PUSH_STATE(process,"E");  
393   res = __MSG_wait_for_computation(process,task);
394   PAJE_PROCESS_POP_STATE(process);
395   return res;
396 }
397
398 void __MSG_task_execute(m_process_t process, m_task_t task)
399 {
400   simdata_task_t simdata = NULL;
401
402   CHECK_HOST();
403
404   simdata = task->simdata;
405
406   simdata->compute = surf_workstation_resource->extension_public->
407     execute(MSG_process_get_host(process)->simdata->host,
408             simdata->computation_amount);
409   surf_workstation_resource->common_public->
410     set_priority(simdata->compute, simdata->priority);
411
412   surf_workstation_resource->common_public->action_set_data(simdata->compute,task);
413 }
414
415 MSG_error_t __MSG_wait_for_computation(m_process_t process, m_task_t task)
416 {
417   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
418   simdata_task_t simdata = task->simdata;
419
420   simdata->using++;
421   do {
422     __MSG_task_wait_event(process, task);
423     state=surf_workstation_resource->common_public->action_get_state(simdata->compute);
424   } while (state==SURF_ACTION_RUNNING);
425   simdata->using--;
426     
427
428   if(state == SURF_ACTION_DONE) {
429     if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
430       simdata->compute = NULL;
431     simdata->computation_amount = 0.0;
432     MSG_RETURN(MSG_OK);
433   } else if(surf_workstation_resource->extension_public->
434             get_state(MSG_process_get_host(process)->simdata->host) 
435             == SURF_CPU_OFF) {
436     if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
437       simdata->compute = NULL;
438     MSG_RETURN(MSG_HOST_FAILURE);
439   } else {
440     if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
441       simdata->compute = NULL;
442     MSG_RETURN(MSG_TASK_CANCELLED);
443   }
444 }
445 /** \ingroup m_task_management
446  * \brief Creates a new #m_task_t (a parallel one....).
447  *
448  * A constructor for #m_task_t taking six arguments and returning the 
449    corresponding object.
450  * \param name a name for the object. It is for user-level information
451    and can be NULL.
452  * \param host_nb the number of hosts implied in the parallel task.
453  * \param host_list an array of #host_nb m_host_t.
454  * \param computation_amount an array of #host_nb
455    doubles. computation_amount[i] is the total number of operations
456    that have to be performed on host_list[i].
457  * \param communication_amount an array of #host_nb*#host_nb doubles.
458  * \param data a pointer to any data may want to attach to the new
459    object.  It is for user-level information and can be NULL. It can
460    be retrieved with the function \ref MSG_task_get_data.
461  * \see m_task_t
462  * \return The new corresponding object.
463  */
464 m_task_t MSG_parallel_task_create(const char *name, 
465                                   int host_nb,
466                                   const m_host_t *host_list,
467                                   double *computation_amount,
468                                   double *communication_amount,
469                                   void *data)
470 {
471   simdata_task_t simdata = xbt_new0(s_simdata_task_t,1);
472   m_task_t task = xbt_new0(s_m_task_t,1);
473   int i;
474
475   /* Task structure */
476   task->name = xbt_strdup(name);
477   task->simdata = simdata;
478   task->data = data;
479
480   /* Simulator Data */
481   simdata->sleeping = xbt_dynar_new(sizeof(m_process_t),NULL);
482   simdata->rate = -1.0;
483   simdata->using = 1;
484   simdata->sender = NULL;
485   simdata->source = NULL;
486   simdata->host_nb = host_nb;
487   
488   simdata->host_list = xbt_new0(void *, host_nb);
489   simdata->comp_amount = computation_amount;
490   simdata->comm_amount = communication_amount;
491
492   for(i=0;i<host_nb;i++)
493     simdata->host_list[i] = host_list[i]->simdata->host;
494
495   return task;
496 }
497
498
499 static void __MSG_parallel_task_execute(m_process_t process, m_task_t task)
500 {
501   simdata_task_t simdata = NULL;
502
503   CHECK_HOST();
504
505   simdata = task->simdata;
506
507   xbt_assert0(simdata->host_nb,"This is not a parallel task. Go to hell.");
508
509   simdata->compute = surf_workstation_resource->extension_public->
510   execute_parallel_task(task->simdata->host_nb,
511                         task->simdata->host_list,
512                         task->simdata->comp_amount,
513                         task->simdata->comm_amount,
514                         1.0,
515                         -1.0);
516   if(simdata->compute)
517     surf_workstation_resource->common_public->action_set_data(simdata->compute,task);
518 }
519
520 MSG_error_t MSG_parallel_task_execute(m_task_t task)
521 {
522   m_process_t process = MSG_process_self();
523   MSG_error_t res;
524
525   DEBUG0("Computing on a tons of guys");
526   
527   __MSG_parallel_task_execute(process, task);
528
529   if(task->simdata->compute)
530     res = __MSG_wait_for_computation(process,task);
531   else 
532     res = MSG_OK;
533
534   return res;  
535 }
536
537
538 /** \ingroup msg_gos_functions
539  * \brief Sleep for the specified number of seconds
540  *
541  * Makes the current process sleep until \a time seconds have elapsed.
542  *
543  * \param nb_sec a number of second
544  */
545 MSG_error_t MSG_process_sleep(double nb_sec)
546 {
547   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
548   m_process_t process = MSG_process_self();
549   m_task_t dummy = NULL;
550   simdata_task_t simdata = NULL;
551
552   CHECK_HOST();
553   dummy = MSG_task_create("MSG_sleep", nb_sec, 0.0, NULL);
554   simdata = dummy->simdata;
555
556   simdata->compute = surf_workstation_resource->extension_public->
557     sleep(MSG_process_get_host(process)->simdata->host,
558             simdata->computation_amount);
559   surf_workstation_resource->common_public->action_set_data(simdata->compute,dummy);
560
561   
562   simdata->using++;
563   do {
564     __MSG_task_wait_event(process, dummy);
565     state=surf_workstation_resource->common_public->action_get_state(simdata->compute);
566   } while (state==SURF_ACTION_RUNNING);
567   simdata->using--;
568     
569   if(state == SURF_ACTION_DONE) {
570     if(surf_workstation_resource->extension_public->
571        get_state(MSG_process_get_host(process)->simdata->host) 
572        == SURF_CPU_OFF) {
573       if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
574         simdata->compute = NULL;
575       MSG_RETURN(MSG_HOST_FAILURE);
576     }
577     if(__MSG_process_isBlocked(process)) {
578       __MSG_process_unblock(MSG_process_self());
579     }
580     if(surf_workstation_resource->extension_public->
581        get_state(MSG_process_get_host(process)->simdata->host) 
582        == SURF_CPU_OFF) {
583       if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
584         simdata->compute = NULL;
585       MSG_RETURN(MSG_HOST_FAILURE);
586     }
587     if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
588       simdata->compute = NULL;
589     MSG_task_destroy(dummy);
590     MSG_RETURN(MSG_OK);
591   } else MSG_RETURN(MSG_HOST_FAILURE);
592 }
593
594 /** \ingroup msg_gos_functions
595  * \brief Return the number of MSG tasks currently running on a
596  * the host of the current running process.
597  */
598 static int MSG_get_msgload(void) 
599 {
600   m_process_t process;
601    
602   CHECK_HOST();
603   
604   xbt_assert0(0, "This function is still to be specified correctly (what do you mean by 'load', exactly?). In the meantime, please don't use it");
605   process = MSG_process_self();
606   return xbt_fifo_size(process->simdata->host->simdata->process_list);
607 }
608
609 /** \ingroup msg_gos_functions
610  *
611  * \brief Return the the last value returned by a MSG function (except
612  * MSG_get_errno...).
613  */
614 MSG_error_t MSG_get_errno(void)
615 {
616   return PROCESS_GET_ERRNO();
617 }