Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
More informative error message
[simgrid.git] / src / simdag / sd_task.c
1 /* Copyright (c) 2007-2009 Da SimGrid Team.  All rights reserved.           */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "private.h"
7 #include "simdag/simdag.h"
8 #include "xbt/sysdep.h"
9 #include "xbt/dynar.h"
10
11 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(sd_task, sd,
12                                 "Logging specific to SimDag (task)");
13
14 static void __SD_task_remove_dependencies(SD_task_t task);
15 static void __SD_task_destroy_scheduling_data(SD_task_t task);
16
17 /**
18  * \brief Creates a new task.
19  *
20  * \param name the name of the task (can be \c NULL)
21  * \param data the user data you want to associate with the task (can be \c NULL)
22  * \param amount amount of the task
23  * \return the new task
24  * \see SD_task_destroy()
25  */
26 SD_task_t SD_task_create(const char *name, void *data, double amount)
27 {
28
29   SD_task_t task;
30   SD_CHECK_INIT_DONE();
31
32   task = xbt_new(s_SD_task_t, 1);
33
34   /* general information */
35   task->data = data;            /* user data */
36   task->name = xbt_strdup(name);
37   task->kind = 0;
38   task->state_hookup.prev = NULL;
39   task->state_hookup.next = NULL;
40   task->state_set = sd_global->not_scheduled_task_set;
41   task->state = SD_NOT_SCHEDULED;
42   xbt_swag_insert(task, task->state_set);
43
44   task->amount = amount;
45   task->remains = amount;
46   task->start_time = -1.0;
47   task->finish_time = -1.0;
48   task->surf_action = NULL;
49   task->watch_points = 0;
50
51   /* dependencies */
52   task->tasks_before = xbt_dynar_new(sizeof(SD_dependency_t), NULL);
53   task->tasks_after = xbt_dynar_new(sizeof(SD_dependency_t), NULL);
54
55   /* scheduling parameters */
56   task->workstation_nb = 0;
57   task->workstation_list = NULL;
58   task->computation_amount = NULL;
59   task->communication_amount = NULL;
60   task->rate = 0;
61
62   sd_global->task_number++;
63
64   return task;
65 }
66
67 /**
68  * \brief Returns the user data of a task
69  *
70  * \param task a task
71  * \return the user data associated with this task (can be \c NULL)
72  * \see SD_task_set_data()
73  */
74 void *SD_task_get_data(SD_task_t task)
75 {
76   SD_CHECK_INIT_DONE();
77   xbt_assert0(task != NULL, "Invalid parameter");
78   return task->data;
79 }
80
81 /**
82  * \brief Sets the user data of a task
83  *
84  * The new data can be \c NULL. The old data should have been freed first
85  * if it was not \c NULL.
86  *
87  * \param task a task
88  * \param data the new data you want to associate with this task
89  * \see SD_task_get_data()
90  */
91 void SD_task_set_data(SD_task_t task, void *data)
92 {
93   SD_CHECK_INIT_DONE();
94   xbt_assert0(task != NULL, "Invalid parameter");
95   task->data = data;
96 }
97
98 /**
99  * \brief Returns the state of a task
100  *
101  * \param task a task
102  * \return the current \ref e_SD_task_state_t "state" of this task:
103  * #SD_NOT_SCHEDULED, #SD_SCHEDULED, #SD_READY, #SD_RUNNING, #SD_DONE or #SD_FAILED
104  * \see e_SD_task_state_t
105  */
106 e_SD_task_state_t SD_task_get_state(SD_task_t task)
107 {
108   SD_CHECK_INIT_DONE();
109   xbt_assert0(task != NULL, "Invalid parameter");
110   return task->state;
111 }
112
113 /* Changes the state of a task. Updates the swags and the flag sd_global->watch_point_reached.
114  */
115 void __SD_task_set_state(SD_task_t task, e_SD_task_state_t new_state)
116 {
117   xbt_swag_remove(task, task->state_set);
118   switch (new_state) {
119   case SD_NOT_SCHEDULED:
120     task->state_set = sd_global->not_scheduled_task_set;
121     break;
122   case SD_SCHEDULED:
123     task->state_set = sd_global->scheduled_task_set;
124     break;
125   case SD_READY:
126     task->state_set = sd_global->ready_task_set;
127     break;
128   case SD_IN_FIFO:
129     task->state_set = sd_global->in_fifo_task_set;
130     break;
131   case SD_RUNNING:
132     task->state_set = sd_global->running_task_set;
133     task->start_time =
134       surf_workstation_model->action_get_start_time(task->surf_action);
135     break;
136   case SD_DONE:
137     task->state_set = sd_global->done_task_set;
138     task->finish_time =
139       surf_workstation_model->action_get_finish_time(task->surf_action);
140     task->remains = 0;
141     break;
142   case SD_FAILED:
143     task->state_set = sd_global->failed_task_set;
144     break;
145   default:
146     xbt_assert0(0, "Invalid state");
147   }
148   xbt_swag_insert(task, task->state_set);
149   task->state = new_state;
150
151   if (task->watch_points & new_state) {
152     INFO1("Watch point reached with task '%s'!", SD_task_get_name(task));
153     sd_global->watch_point_reached = 1;
154     SD_task_unwatch(task, new_state);   /* remove the watch point */
155   }
156 }
157
158 /**
159  * \brief Returns the name of a task
160  *
161  * \param task a task
162  * \return the name of this task (can be \c NULL)
163  */
164 const char *SD_task_get_name(SD_task_t task)
165 {
166   SD_CHECK_INIT_DONE();
167   xbt_assert0(task != NULL, "Invalid parameter");
168   return task->name;
169 }
170
171 /** @brief Returns the dynar of the parents of a task
172  *
173  * \param task a task
174  * \return a newly allocated dynar comprising the parents of this task
175  */
176
177 xbt_dynar_t SD_task_get_parents(SD_task_t task)
178 {
179   unsigned int i;
180   xbt_dynar_t parents;
181   SD_dependency_t dep;
182   SD_CHECK_INIT_DONE();
183   xbt_assert0(task != NULL, "Invalid parameter");
184
185   parents = xbt_dynar_new(sizeof(SD_task_t), NULL);
186   xbt_dynar_foreach(task->tasks_before, i, dep){
187     xbt_dynar_push(parents, &(dep->src));
188   }
189   return parents;
190 }
191
192 /** @brief Returns the dynar of the parents of a task
193  *
194  * \param task a task
195  * \return a newly allocated dynar comprising the parents of this task
196  */
197 xbt_dynar_t SD_task_get_children(SD_task_t task)
198 {
199   unsigned int i;
200   xbt_dynar_t children;
201   SD_dependency_t dep;
202   SD_CHECK_INIT_DONE();
203   xbt_assert0(task != NULL, "Invalid parameter");
204
205   children = xbt_dynar_new(sizeof(SD_task_t), NULL);
206   xbt_dynar_foreach(task->tasks_after, i, dep){
207     xbt_dynar_push(children, &(dep->dst));
208   }
209   return children;
210 }
211
212 /**
213  * \brief Returns the amount of workstations involved in a task
214  *
215  * Only call this on already scheduled tasks!
216  * \param task a task
217  */
218 int SD_task_get_workstation_count(SD_task_t task)
219 {
220   SD_CHECK_INIT_DONE();
221   xbt_assert0(task != NULL, "Invalid parameter");
222   //  xbt_assert1( task->state_set != sd_global->scheduled_task_set, 
223   //           "Unscheduled task %s", task->name);
224   return task->workstation_nb;
225 }
226
227 /**
228  * \brief Returns the list of workstations involved in a task
229  *
230  * Only call this on already scheduled tasks!
231  * \param task a task
232  */
233 SD_workstation_t* SD_task_get_workstation_list(SD_task_t task)
234 {
235   SD_CHECK_INIT_DONE();
236   xbt_assert0(task != NULL, "Invalid parameter");
237   //xbt_assert1( task->state_set != sd_global->scheduled_task_set, 
238   //           "Unscheduled task %s", task->name);
239   return task->workstation_list;
240 }
241
242 /**
243  * \brief Returns the total amount of a task
244  *
245  * \param task a task
246  * \return the total amount of this task
247  * \see SD_task_get_remaining_amount()
248  */
249 double SD_task_get_amount(SD_task_t task)
250 {
251   SD_CHECK_INIT_DONE();
252   xbt_assert0(task != NULL, "Invalid parameter");
253   return task->amount;
254 }
255
256 /**
257  * \brief Returns the remaining amount of a task
258  *
259  * \param task a task
260  * \return the remaining amount of this task
261  * \see SD_task_get_amount()
262  */
263 double SD_task_get_remaining_amount(SD_task_t task)
264 {
265   SD_CHECK_INIT_DONE();
266   xbt_assert0(task != NULL, "Invalid parameter");
267
268   if (task->surf_action)
269     return surf_workstation_model->get_remains(task->surf_action);
270   else
271     return task->remains;
272 }
273
274 int SD_task_get_kind(SD_task_t task) {
275   return task->kind;
276 }
277
278 /** @brief Displays debugging informations about a task */
279 void SD_task_dump(SD_task_t task)
280 {
281   unsigned int counter;
282   SD_dependency_t dependency;
283   char *statename;
284
285   INFO1("Displaying task %s",SD_task_get_name(task));
286   statename=bprintf("%s %s %s %s %s %s %s",
287       (task->state&SD_NOT_SCHEDULED?"not scheduled":""),
288       (task->state&SD_SCHEDULED?"scheduled":""),
289       (task->state&SD_READY?"ready":"not ready"),
290       (task->state&SD_IN_FIFO?"in fifo":""),
291       (task->state&SD_RUNNING?"running":""),
292       (task->state&SD_DONE?"done":""),
293       (task->state&SD_FAILED?"failed":""));
294   INFO1("  - state: %s",statename);
295   free(statename);
296
297   if (task->kind!=0) {
298     switch(task->kind){
299     case SD_TASK_COMM_E2E:
300       INFO0("  - kind: end-to-end communication");
301       break;
302     case SD_TASK_COMP_SEQ:
303       INFO0("  - kind: sequential computation");
304       break;
305     default:
306       INFO1("  - (unknown kind %d)",task->kind);
307     }
308   }
309   INFO1("  - amount: %.0f",SD_task_get_amount(task));
310   if (xbt_dynar_length(task->tasks_before)) {
311     INFO0("  - pre-dependencies:");
312     xbt_dynar_foreach(task->tasks_before,counter,dependency) {
313       INFO1("    %s",SD_task_get_name(dependency->src));
314     }
315   }
316   if (xbt_dynar_length(task->tasks_after)) {
317     INFO0("  - post-dependencies:");
318     xbt_dynar_foreach(task->tasks_after,counter,dependency) {
319       INFO1("    %s",SD_task_get_name(dependency->dst));
320     }
321   }
322 }
323 /** @brief Dumps the task in dotty formalism into the FILE* passed as second argument */
324 void SD_task_dotty(SD_task_t task,void* out) {
325   unsigned int counter;
326   SD_dependency_t dependency;
327   fprintf(out, "  T%ld [label=\"%.20s\"",(unsigned long int)task, task->name);
328   switch(task->kind){
329     case SD_TASK_COMM_E2E:
330       fprintf(out,", shape=box");
331       break;
332     case SD_TASK_COMP_SEQ:
333       fprintf(out,", shape=circle");
334       break;
335   }
336   fprintf(out,"];\n");
337   xbt_dynar_foreach(task->tasks_before,counter,dependency) {
338     fprintf(out," T%p -> T%p;\n",dependency->src, dependency->dst);
339   }
340 }
341
342 /* Destroys a dependency between two tasks.
343  */
344 static void __SD_task_dependency_destroy(void *dependency)
345 {
346   if (((SD_dependency_t) dependency)->name != NULL)
347     xbt_free(((SD_dependency_t) dependency)->name);
348   xbt_free(dependency);
349 }
350
351 /**
352  * \brief Adds a dependency between two tasks
353  *
354  * \a dst will depend on \a src, ie \a dst will not start before \a src is finished.
355  * Their \ref e_SD_task_state_t "state" must be #SD_NOT_SCHEDULED, #SD_SCHEDULED or #SD_READY.
356  *
357  * \param name the name of the new dependency (can be \c NULL)
358  * \param data the user data you want to associate with this dependency (can be \c NULL)
359  * \param src the task which must be executed first
360  * \param dst the task you want to make depend on \a src
361  * \see SD_task_dependency_remove()
362  */
363 void SD_task_dependency_add(const char *name, void *data, SD_task_t src,
364                             SD_task_t dst)
365 {
366   xbt_dynar_t dynar;
367   int length;
368   int found = 0;
369   int i;
370   SD_dependency_t dependency;
371
372   SD_CHECK_INIT_DONE();
373   xbt_assert0(src != NULL && dst != NULL, "Invalid parameter");
374
375   dynar = src->tasks_after;
376   length = xbt_dynar_length(dynar);
377
378   if (src == dst)
379     THROW1(arg_error, 0,
380            "Cannot add a dependency between task '%s' and itself",
381            SD_task_get_name(src));
382
383   if (!__SD_task_is_not_scheduled(src)
384       && !__SD_task_is_scheduled_or_ready(src))
385     THROW1(arg_error, 0,
386            "Task '%s' must be SD_NOT_SCHEDULED, SD_SCHEDULED or SD_READY",
387            SD_task_get_name(src));
388
389   if (!__SD_task_is_not_scheduled(dst)
390       && !__SD_task_is_scheduled_or_ready(dst))
391     THROW1(arg_error, 0,
392            "Task '%s' must be SD_NOT_SCHEDULED, SD_SCHEDULED or SD_READY",
393            SD_task_get_name(dst));
394
395   DEBUG2("SD_task_dependency_add: src = %s, dst = %s", SD_task_get_name(src),
396          SD_task_get_name(dst));
397   for (i = 0; i < length && !found; i++) {
398     xbt_dynar_get_cpy(dynar, i, &dependency);
399     found = (dependency->dst == dst);
400     DEBUG2("Dependency %d: dependency->dst = %s", i,
401            SD_task_get_name(dependency->dst));
402   }
403
404   if (found)
405     THROW2(arg_error, 0,
406            "A dependency already exists between task '%s' and task '%s'",
407            SD_task_get_name(src), SD_task_get_name(dst));
408
409   dependency = xbt_new(s_SD_dependency_t, 1);
410
411   dependency->name = xbt_strdup(name); /* xbt_strdup is cleaver enough to deal with NULL args itself */
412   dependency->data = data;
413   dependency->src = src;
414   dependency->dst = dst;
415
416   /* src must be executed before dst */
417   xbt_dynar_push(src->tasks_after, &dependency);
418   xbt_dynar_push(dst->tasks_before, &dependency);
419
420   /* if the task was ready, then dst->tasks_before is not empty anymore,
421      so we must go back to state SD_SCHEDULED */
422   if (__SD_task_is_ready(dst)) {
423     DEBUG1("SD_task_dependency_add: %s was ready and becomes scheduled!",
424            SD_task_get_name(dst));
425     __SD_task_set_state(dst, SD_SCHEDULED);
426   }
427
428   /*  __SD_print_dependencies(src);
429      __SD_print_dependencies(dst); */
430 }
431
432 /**
433  * \brief Indacates whether there is a dependency between two tasks.
434  *
435  * \param src a task
436  * \param dst a task depending on \a src
437  *
438  * If src is NULL, checks whether dst has any pre-dependency.
439  * If dst is NULL, checks whether src has any post-dependency.
440  */
441 int SD_task_dependency_exists(SD_task_t src, SD_task_t dst)
442 {
443   unsigned int counter;
444   SD_dependency_t dependency;
445
446   SD_CHECK_INIT_DONE();
447   xbt_assert0(src != NULL || dst != NULL, "Invalid parameter: both src and dst are NULL");
448
449   if (src) {
450     if (dst) {
451       xbt_dynar_foreach(src->tasks_after,counter,dependency) {
452         if (dependency->dst == dst)
453           return 1;
454       }
455     } else {
456       return xbt_dynar_length(src->tasks_after);
457     }
458   } else {
459     return xbt_dynar_length(dst->tasks_before);
460   }
461   return 0;
462 }
463
464 /**
465  * \brief Remove a dependency between two tasks
466  *
467  * \param src a task
468  * \param dst a task depending on \a src
469  * \see SD_task_dependency_add()
470  */
471 void SD_task_dependency_remove(SD_task_t src, SD_task_t dst)
472 {
473
474   xbt_dynar_t dynar;
475   int length;
476   int found = 0;
477   int i;
478   SD_dependency_t dependency;
479
480   SD_CHECK_INIT_DONE();
481   xbt_assert0(src != NULL && dst != NULL, "Invalid parameter");
482
483   /* remove the dependency from src->tasks_after */
484   dynar = src->tasks_after;
485   length = xbt_dynar_length(dynar);
486
487   for (i = 0; i < length && !found; i++) {
488     xbt_dynar_get_cpy(dynar, i, &dependency);
489     if (dependency->dst == dst) {
490       xbt_dynar_remove_at(dynar, i, NULL);
491       found = 1;
492     }
493   }
494   if (!found)
495     THROW4(arg_error, 0,
496            "No dependency found between task '%s' and '%s': task '%s' is not a successor of task '%s'",
497            SD_task_get_name(src), SD_task_get_name(dst),
498            SD_task_get_name(dst), SD_task_get_name(src));
499
500   /* remove the dependency from dst->tasks_before */
501   dynar = dst->tasks_before;
502   length = xbt_dynar_length(dynar);
503   found = 0;
504
505   for (i = 0; i < length && !found; i++) {
506     xbt_dynar_get_cpy(dynar, i, &dependency);
507     if (dependency->src == src) {
508       xbt_dynar_remove_at(dynar, i, NULL);
509       __SD_task_dependency_destroy(dependency);
510       found = 1;
511     }
512   }
513   /* should never happen... */
514   xbt_assert4(found,
515               "SimDag error: task '%s' is a successor of '%s' but task '%s' is not a predecessor of task '%s'",
516               SD_task_get_name(dst), SD_task_get_name(src),
517               SD_task_get_name(src), SD_task_get_name(dst));
518
519   /* if the task was scheduled and dst->tasks_before is empty now, we can make it ready */
520   if (xbt_dynar_length(dst->tasks_before) == 0 && __SD_task_is_scheduled(dst))
521     __SD_task_set_state(dst, SD_READY);
522
523   /*  __SD_print_dependencies(src);
524      __SD_print_dependencies(dst); */
525 }
526
527 /**
528  * \brief Returns the user data associated with a dependency between two tasks
529  *
530  * \param src a task
531  * \param dst a task depending on \a src
532  * \return the user data associated with this dependency (can be \c NULL)
533  * \see SD_task_dependency_add()
534  */
535 void *SD_task_dependency_get_data(SD_task_t src, SD_task_t dst)
536 {
537
538   xbt_dynar_t dynar;
539   int length;
540   int found = 0;
541   int i;
542   SD_dependency_t dependency;
543
544
545   SD_CHECK_INIT_DONE();
546   xbt_assert0(src != NULL && dst != NULL, "Invalid parameter");
547
548   dynar = src->tasks_after;
549   length = xbt_dynar_length(dynar);
550
551   for (i = 0; i < length && !found; i++) {
552     xbt_dynar_get_cpy(dynar, i, &dependency);
553     found = (dependency->dst == dst);
554   }
555   if (!found)
556     THROW2(arg_error, 0, "No dependency found between task '%s' and '%s'",
557            SD_task_get_name(src), SD_task_get_name(dst));
558   return dependency->data;
559 }
560
561 /* temporary function for debugging */
562 static void __SD_print_watch_points(SD_task_t task)
563 {
564   static const int state_masks[] =
565     { SD_SCHEDULED, SD_RUNNING, SD_READY, SD_DONE, SD_FAILED };
566   static const char *state_names[] =
567     { "scheduled", "running", "ready", "done", "failed" };
568   int i;
569
570   INFO2("Task '%s' watch points (%x): ", SD_task_get_name(task),
571         task->watch_points);
572
573
574   for (i = 0; i < 5; i++) {
575     if (task->watch_points & state_masks[i])
576       INFO1("%s ", state_names[i]);
577   }
578 }
579
580 /**
581  * \brief Adds a watch point to a task
582  *
583  * SD_simulate() will stop as soon as the \ref e_SD_task_state_t "state" of this
584  * task becomes the one given in argument. The
585  * watch point is then automatically removed.
586  *
587  * \param task a task
588  * \param state the \ref e_SD_task_state_t "state" you want to watch
589  * (cannot be #SD_NOT_SCHEDULED)
590  * \see SD_task_unwatch()
591  */
592 void SD_task_watch(SD_task_t task, e_SD_task_state_t state)
593 {
594   SD_CHECK_INIT_DONE();
595   xbt_assert0(task != NULL, "Invalid parameter");
596
597   if (state & SD_NOT_SCHEDULED)
598     THROW0(arg_error, 0,
599            "Cannot add a watch point for state SD_NOT_SCHEDULED");
600
601   task->watch_points = task->watch_points | state;
602   /*  __SD_print_watch_points(task); */
603 }
604
605 /**
606  * \brief Removes a watch point from a task
607  *
608  * \param task a task
609  * \param state the \ref e_SD_task_state_t "state" you no longer want to watch
610  * \see SD_task_watch()
611  */
612 void SD_task_unwatch(SD_task_t task, e_SD_task_state_t state)
613 {
614   SD_CHECK_INIT_DONE();
615   xbt_assert0(task != NULL, "Invalid parameter");
616   xbt_assert0(state != SD_NOT_SCHEDULED,
617               "SimDag error: Cannot have a watch point for state SD_NOT_SCHEDULED");
618
619   task->watch_points = task->watch_points & ~state;
620   /*  __SD_print_watch_points(task); */
621 }
622
623 /**
624  * \brief Returns an approximative estimation of the execution time of a task.
625  *
626  * The estimation is very approximative because the value returned is the time
627  * the task would take if it was executed now and if it was the only task.
628  *
629  * \param task the task to evaluate
630  * \param workstation_nb number of workstations on which the task would be executed
631  * \param workstation_list the workstations on which the task would be executed
632  * \param computation_amount computation amount for each workstation
633  * \param communication_amount communication amount between each pair of workstations
634  * \param rate task execution speed rate
635  * \see SD_schedule()
636  */
637 double SD_task_get_execution_time(SD_task_t task,
638                                   int workstation_nb,
639                                   const SD_workstation_t * workstation_list,
640                                   const double *computation_amount,
641                                   const double *communication_amount,
642                                   double rate)
643 {
644   double time, max_time = 0.0;
645   int i, j;
646   SD_CHECK_INIT_DONE();
647   xbt_assert0(task != NULL && workstation_nb > 0 && workstation_list != NULL
648               && computation_amount != NULL
649               && communication_amount != NULL, "Invalid parameter");
650
651   /* the task execution time is the maximum execution time of the parallel tasks */
652
653   for (i = 0; i < workstation_nb; i++) {
654     time =
655       SD_workstation_get_computation_time(workstation_list[i],
656                                           computation_amount[i]);
657
658     for (j = 0; j < workstation_nb; j++) {
659       time +=
660         SD_route_get_communication_time(workstation_list[i],
661                                         workstation_list[j],
662                                         communication_amount[i *
663                                                              workstation_nb +
664                                                              j]);
665     }
666
667     if (time > max_time) {
668       max_time = time;
669     }
670   }
671   return max_time * SD_task_get_amount(task);
672 }
673 static inline void SD_task_do_schedule(SD_task_t task) {
674   SD_CHECK_INIT_DONE();
675
676    if (!__SD_task_is_not_scheduled(task))
677      THROW1(arg_error, 0, "Task '%s' has already been scheduled",
678             SD_task_get_name(task));
679
680  /* update the task state */
681   if (xbt_dynar_length(task->tasks_before) == 0)
682     __SD_task_set_state(task, SD_READY);
683   else
684     __SD_task_set_state(task, SD_SCHEDULED);
685 }
686
687 /**
688  * \brief Schedules a task
689  *
690  * The task state must be #SD_NOT_SCHEDULED.
691  * Once scheduled, a task will be executed as soon as possible in SD_simulate(),
692  * i.e. when its dependencies are satisfied.
693  *
694  * \param task the task you want to schedule
695  * \param workstation_nb number of workstations on which the task will be executed
696  * \param workstation_list the workstations on which the task will be executed
697  * \param computation_amount computation amount for each workstation
698  * \param communication_amount communication amount between each pair of workstations
699  * \param rate task execution speed rate
700  * \see SD_task_unschedule()
701  */
702 void SD_task_schedule(SD_task_t task, int workstation_count,
703                       const SD_workstation_t * workstation_list,
704                       const double *computation_amount,
705                       const double *communication_amount, double rate)
706 {
707   xbt_assert0(workstation_count > 0, "workstation_nb must be positive");
708
709   int communication_nb;
710
711   task->workstation_nb = workstation_count;
712   task->rate = rate;
713
714   task->computation_amount = xbt_new(double, workstation_count);
715   memcpy(task->computation_amount, computation_amount,
716          sizeof(double) * workstation_count);
717
718   communication_nb = workstation_count * workstation_count;
719   task->communication_amount = xbt_new(double, communication_nb);
720   memcpy(task->communication_amount, communication_amount,
721          sizeof(double) * communication_nb);
722
723   task->workstation_list = xbt_new(SD_workstation_t, workstation_count);
724   memcpy(task->workstation_list, workstation_list,
725          sizeof(SD_workstation_t) * workstation_count);
726
727   SD_task_do_schedule(task);
728 }
729 /**
730  * \brief Unschedules a task
731  *
732  * The task state must be #SD_SCHEDULED, #SD_READY, #SD_RUNNING or #SD_FAILED.
733  * If you call this function, the task state becomes #SD_NOT_SCHEDULED.
734  * Call SD_task_schedule() to schedule it again.
735  *
736  * \param task the task you want to unschedule
737  * \see SD_task_schedule()
738  */
739 void SD_task_unschedule(SD_task_t task)
740 {
741   SD_CHECK_INIT_DONE();
742   xbt_assert0(task != NULL, "Invalid parameter");
743
744   if (task->state_set != sd_global->scheduled_task_set &&
745       task->state_set != sd_global->ready_task_set &&
746       task->state_set != sd_global->running_task_set &&
747       task->state_set != sd_global->failed_task_set)
748     THROW1(arg_error, 0,
749            "Task %s: the state must be SD_SCHEDULED, SD_READY, SD_RUNNING or SD_FAILED",
750            SD_task_get_name(task));
751
752   if (__SD_task_is_scheduled_or_ready(task))    /* if the task is scheduled or ready */
753     __SD_task_destroy_scheduling_data(task);
754
755   if (__SD_task_is_running(task))       /* the task should become SD_FAILED */
756     surf_workstation_model->action_cancel(task->surf_action);
757   else
758     __SD_task_set_state(task, SD_NOT_SCHEDULED);
759   task->remains = task->amount;
760   task->start_time = -1.0;
761 }
762
763 /* Destroys the data memorised by SD_task_schedule. Task state must be SD_SCHEDULED or SD_READY.
764  */
765 static void __SD_task_destroy_scheduling_data(SD_task_t task)
766 {
767   SD_CHECK_INIT_DONE();
768   if (!__SD_task_is_scheduled_or_ready(task) && !__SD_task_is_in_fifo(task))
769     THROW1(arg_error, 0,
770            "Task '%s' must be SD_SCHEDULED, SD_READY or SD_IN_FIFO",
771            SD_task_get_name(task));
772
773   xbt_free(task->computation_amount);
774   xbt_free(task->communication_amount);
775 }
776
777 /* Runs a task. This function is directly called by __SD_task_try_to_run if the task
778  * doesn't have to wait in fifos. Otherwise, it is called by __SD_task_just_done when
779  * the task gets out of its fifos.
780  */
781 void __SD_task_really_run(SD_task_t task)
782 {
783
784   int i;
785   void **surf_workstations;
786
787   SD_CHECK_INIT_DONE();
788   xbt_assert0(task != NULL, "Invalid parameter");
789   xbt_assert2(__SD_task_is_ready_or_in_fifo(task),
790               "Task '%s' is not ready or in a fifo! Task state: %d",
791               SD_task_get_name(task), SD_task_get_state(task));
792   xbt_assert1(task->workstation_list != NULL,
793               "Task '%s': workstation_list is NULL!", SD_task_get_name(task));
794
795
796
797   DEBUG1("Really running task '%s'", SD_task_get_name(task));
798
799   /* set this task as current task for the workstations in sequential mode */
800   for (i = 0; i < task->workstation_nb; i++) {
801     if (SD_workstation_get_access_mode(task->workstation_list[i]) ==
802         SD_WORKSTATION_SEQUENTIAL_ACCESS) {
803       task->workstation_list[i]->current_task = task;
804       xbt_assert0(__SD_workstation_is_busy(task->workstation_list[i]),
805                   "The workstation should be busy now");
806     }
807   }
808
809   DEBUG1("Task '%s' set as current task for its workstations",
810          SD_task_get_name(task));
811
812   /* start the task */
813
814   /* we have to create a Surf workstation array instead of the SimDag workstation array */
815   surf_workstations = xbt_new(void *, task->workstation_nb);
816
817   for (i = 0; i < task->workstation_nb; i++) {
818     surf_workstations[i] = task->workstation_list[i]->surf_workstation;
819   }
820
821   task->surf_action = NULL;
822   if ((task->workstation_nb == 1) && (task->communication_amount[0] == 0.0)) {
823     task->surf_action =
824       surf_workstation_model->extension.
825       workstation.execute(surf_workstations[0], task->computation_amount[0]);
826   } else if ((task->workstation_nb == 1)
827              && (task->computation_amount[0] == 0.0)) {
828     task->surf_action =
829       surf_workstation_model->extension.
830       workstation.communicate(surf_workstations[0], surf_workstations[0],
831                               task->communication_amount[0], task->rate);
832   } else if ((task->workstation_nb == 2)
833              && (task->computation_amount[0] == 0.0)
834              && (task->computation_amount[1] == 0.0)) {
835     int nb = 0;
836     double value = 0.0;
837
838     for (i = 0; i < task->workstation_nb * task->workstation_nb; i++) {
839       if (task->communication_amount[i] > 0.0) {
840         nb++;
841         value = task->communication_amount[i];
842       }
843     }
844     if (nb == 1) {
845       task->surf_action =
846         surf_workstation_model->extension.
847         workstation.communicate(surf_workstations[0], surf_workstations[1],
848                                 value, task->rate);
849     }
850   }
851   if (!task->surf_action) {
852     double *computation_amount = xbt_new(double, task->workstation_nb);
853     double *communication_amount = xbt_new(double, task->workstation_nb *
854                                            task->workstation_nb);
855
856     memcpy(computation_amount, task->computation_amount, sizeof(double) *
857            task->workstation_nb);
858     memcpy(communication_amount, task->communication_amount,
859            sizeof(double) * task->workstation_nb * task->workstation_nb);
860
861     task->surf_action =
862       surf_workstation_model->extension.
863       workstation.execute_parallel_task(task->workstation_nb,
864                                         surf_workstations, computation_amount,
865                                         communication_amount, task->amount,
866                                         task->rate);
867   } else {
868     xbt_free(surf_workstations);
869   }
870
871   surf_workstation_model->action_data_set(task->surf_action, task);
872
873   DEBUG1("surf_action = %p", task->surf_action);
874
875   __SD_task_destroy_scheduling_data(task);      /* now the scheduling data are not useful anymore */
876   __SD_task_set_state(task, SD_RUNNING);
877   xbt_assert2(__SD_task_is_running(task), "Bad state of task '%s': %d",
878               SD_task_get_name(task), SD_task_get_state(task));
879
880 }
881
882 /* Tries to run a task. This function is called by SD_simulate() when a scheduled task becomes SD_READY
883  * (ie when its dependencies are satisfied).
884  * If one of the workstations where the task is scheduled on is busy (in sequential mode),
885  * the task doesn't start.
886  * Returns whether the task has started.
887  */
888 int __SD_task_try_to_run(SD_task_t task)
889 {
890
891   int can_start = 1;
892   int i;
893   SD_workstation_t workstation;
894
895   SD_CHECK_INIT_DONE();
896   xbt_assert0(task != NULL, "Invalid parameter");
897   xbt_assert2(__SD_task_is_ready(task),
898               "Task '%s' is not ready! Task state: %d",
899               SD_task_get_name(task), SD_task_get_state(task));
900
901
902   for (i = 0; i < task->workstation_nb; i++) {
903     can_start = !__SD_workstation_is_busy(task->workstation_list[i]);
904   }
905
906   DEBUG2("Task '%s' can start: %d", SD_task_get_name(task), can_start);
907
908   if (!can_start) {             /* if the task cannot start and is not in the fifos yet */
909     for (i = 0; i < task->workstation_nb; i++) {
910       workstation = task->workstation_list[i];
911       if (workstation->access_mode == SD_WORKSTATION_SEQUENTIAL_ACCESS) {
912         DEBUG2("Pushing task '%s' in the fifo of workstation '%s'",
913                SD_task_get_name(task), SD_workstation_get_name(workstation));
914         xbt_fifo_push(workstation->task_fifo, task);
915       }
916     }
917     __SD_task_set_state(task, SD_IN_FIFO);
918     xbt_assert2(__SD_task_is_in_fifo(task), "Bad state of task '%s': %d",
919                 SD_task_get_name(task), SD_task_get_state(task));
920     DEBUG1("Task '%s' state is now SD_IN_FIFO", SD_task_get_name(task));
921   } else {
922     __SD_task_really_run(task);
923   }
924
925   return can_start;
926 }
927
928 /* This function is called by SD_simulate when a task is done.
929  * It updates task->state and task->action and executes if necessary the tasks
930  * which were waiting in fifos for the end of `task'
931  */
932 void __SD_task_just_done(SD_task_t task)
933 {
934   int i, j;
935   SD_workstation_t workstation;
936
937   SD_task_t candidate;
938   int candidate_nb = 0;
939   int candidate_capacity = 8;
940   SD_task_t *candidates;
941   int can_start = 1;
942
943   SD_CHECK_INIT_DONE();
944   xbt_assert0(task != NULL, "Invalid parameter");
945   xbt_assert1(__SD_task_is_running(task),
946               "The task must be running! Task state: %d",
947               SD_task_get_state(task));
948   xbt_assert1(task->workstation_list != NULL,
949               "Task '%s': workstation_list is NULL!", SD_task_get_name(task));
950
951
952   candidates = xbt_new(SD_task_t, 8);
953
954   __SD_task_set_state(task, SD_DONE);
955   surf_workstation_model->action_unref(task->surf_action);
956   task->surf_action = NULL;
957
958   DEBUG0("Looking for candidates");
959
960   /* if the task was executed on sequential workstations,
961      maybe we can execute the next task of the fifo for each workstation */
962   for (i = 0; i < task->workstation_nb; i++) {
963     workstation = task->workstation_list[i];
964     DEBUG2("Workstation '%s': access_mode = %d",
965            SD_workstation_get_name(workstation), workstation->access_mode);
966     if (workstation->access_mode == SD_WORKSTATION_SEQUENTIAL_ACCESS) {
967       xbt_assert1(workstation->task_fifo != NULL,
968                   "Workstation '%s' has sequential access but no fifo!",
969                   SD_workstation_get_name(workstation));
970       xbt_assert2(workstation->current_task =
971                   task, "Workstation '%s': current task should be '%s'",
972                   SD_workstation_get_name(workstation),
973                   SD_task_get_name(task));
974
975       /* the task is over so we can release the workstation */
976       workstation->current_task = NULL;
977
978       DEBUG0("Getting candidate in fifo");
979       candidate =
980         xbt_fifo_get_item_content(xbt_fifo_get_first_item
981                                   (workstation->task_fifo));
982
983       if (candidate != NULL) {
984         DEBUG1("Candidate: '%s'", SD_task_get_name(candidate));
985         xbt_assert2(__SD_task_is_in_fifo(candidate),
986                     "Bad state of candidate '%s': %d",
987                     SD_task_get_name(candidate),
988                     SD_task_get_state(candidate));
989       }
990
991       DEBUG1("Candidate in fifo: %p", candidate);
992
993       /* if there was a task waiting for my place */
994       if (candidate != NULL) {
995         /* Unfortunately, we are not sure yet that we can execute the task now,
996            because the task can be waiting more deeply in some other workstation's fifos...
997            So we memorize all candidate tasks, and then we will check for each candidate
998            whether or not all its workstations are available. */
999
1000         /* realloc if necessary */
1001         if (candidate_nb == candidate_capacity) {
1002           candidate_capacity *= 2;
1003           candidates =
1004             xbt_realloc(candidates, sizeof(SD_task_t) * candidate_capacity);
1005         }
1006
1007         /* register the candidate */
1008         candidates[candidate_nb++] = candidate;
1009         candidate->fifo_checked = 0;
1010       }
1011     }
1012   }
1013
1014   DEBUG1("Candidates found: %d", candidate_nb);
1015
1016   /* now we check every candidate task */
1017   for (i = 0; i < candidate_nb; i++) {
1018     candidate = candidates[i];
1019
1020     if (candidate->fifo_checked) {
1021       continue;                 /* we have already evaluated that task */
1022     }
1023
1024     xbt_assert2(__SD_task_is_in_fifo(candidate),
1025                 "Bad state of candidate '%s': %d",
1026                 SD_task_get_name(candidate), SD_task_get_state(candidate));
1027
1028     for (j = 0; j < candidate->workstation_nb && can_start; j++) {
1029       workstation = candidate->workstation_list[j];
1030
1031       /* I can start on this workstation if the workstation is shared
1032          or if I am the first task in the fifo */
1033       can_start = workstation->access_mode == SD_WORKSTATION_SHARED_ACCESS ||
1034         candidate ==
1035         xbt_fifo_get_item_content(xbt_fifo_get_first_item
1036                                   (workstation->task_fifo));
1037     }
1038
1039     DEBUG2("Candidate '%s' can start: %d", SD_task_get_name(candidate),
1040            can_start);
1041
1042     /* now we are sure that I can start! */
1043     if (can_start) {
1044       for (j = 0; j < candidate->workstation_nb && can_start; j++) {
1045         workstation = candidate->workstation_list[j];
1046
1047         /* update the fifo */
1048         if (workstation->access_mode == SD_WORKSTATION_SEQUENTIAL_ACCESS) {
1049           candidate = xbt_fifo_shift(workstation->task_fifo);   /* the return value is stored just for debugging */
1050           DEBUG1("Head of the fifo: '%s'",
1051                  (candidate != NULL) ? SD_task_get_name(candidate) : "NULL");
1052           xbt_assert0(candidate == candidates[i],
1053                       "Error in __SD_task_just_done: bad first task in the fifo");
1054         }
1055       }                         /* for each workstation */
1056
1057       /* finally execute the task */
1058       DEBUG2("Task '%s' state: %d", SD_task_get_name(candidate),
1059              SD_task_get_state(candidate));
1060       __SD_task_really_run(candidate);
1061
1062       DEBUG4
1063         ("Calling __SD_task_is_running: task '%s', state set: %p, running_task_set: %p, is running: %d",
1064          SD_task_get_name(candidate), candidate->state_set,
1065          sd_global->running_task_set, __SD_task_is_running(candidate));
1066       xbt_assert2(__SD_task_is_running(candidate),
1067                   "Bad state of task '%s': %d", SD_task_get_name(candidate),
1068                   SD_task_get_state(candidate));
1069       DEBUG0("Okay, the task is running.");
1070
1071     }                           /* can start */
1072     candidate->fifo_checked = 1;
1073   }                             /* for each candidate */
1074
1075   xbt_free(candidates);
1076 }
1077
1078 /* Remove all dependencies associated with a task. This function is called when the task is destroyed.
1079  */
1080 static void __SD_task_remove_dependencies(SD_task_t task)
1081 {
1082   /* we must destroy the dependencies carefuly (with SD_dependency_remove)
1083      because each one is stored twice */
1084   SD_dependency_t dependency;
1085   while (xbt_dynar_length(task->tasks_before) > 0) {
1086     xbt_dynar_get_cpy(task->tasks_before, 0, &dependency);
1087     SD_task_dependency_remove(dependency->src, dependency->dst);
1088   }
1089
1090   while (xbt_dynar_length(task->tasks_after) > 0) {
1091     xbt_dynar_get_cpy(task->tasks_after, 0, &dependency);
1092     SD_task_dependency_remove(dependency->src, dependency->dst);
1093   }
1094 }
1095
1096 /**
1097  * \brief Returns the start time of a task
1098  *
1099  * The task state must be SD_RUNNING, SD_DONE or SD_FAILED.
1100  *
1101  * \param task: a task
1102  * \return the start time of this task
1103  */
1104 double SD_task_get_start_time(SD_task_t task)
1105 {
1106   SD_CHECK_INIT_DONE();
1107   xbt_assert0(task != NULL, "Invalid parameter");
1108   if (task->surf_action)
1109     return surf_workstation_model->action_get_start_time(task->surf_action);
1110   else
1111     return task->start_time;
1112 }
1113
1114 /**
1115  * \brief Returns the finish time of a task
1116  *
1117  * The task state must be SD_RUNNING, SD_DONE or SD_FAILED.
1118  * If the state is not completed yet, the returned value is an
1119  * estimation of the task finish time. This value can fluctuate
1120  * until the task is completed.
1121  *
1122  * \param task: a task
1123  * \return the start time of this task
1124  */
1125 double SD_task_get_finish_time(SD_task_t task)
1126 {
1127   SD_CHECK_INIT_DONE();
1128   xbt_assert0(task != NULL, "Invalid parameter");
1129
1130   if (task->surf_action)        /* should never happen as actions are destroyed right after their completion */
1131     return surf_workstation_model->action_get_finish_time(task->surf_action);
1132   else
1133     return task->finish_time;
1134 }
1135
1136 /**
1137  * \brief Destroys a task.
1138  *
1139  * The user data (if any) should have been destroyed first.
1140  *
1141  * \param task the task you want to destroy
1142  * \see SD_task_create()
1143  */
1144 void SD_task_destroy(SD_task_t task)
1145 {
1146   SD_CHECK_INIT_DONE();
1147   xbt_assert0(task != NULL, "Invalid parameter");
1148
1149   DEBUG1("Destroying task %s...", SD_task_get_name(task));
1150
1151   __SD_task_remove_dependencies(task);
1152   /* if the task was scheduled or ready we have to free the scheduling parameters */
1153   if (__SD_task_is_scheduled_or_ready(task))
1154     __SD_task_destroy_scheduling_data(task);
1155   xbt_swag_remove(task,task->state_set);
1156
1157   if (task->name != NULL)
1158     xbt_free(task->name);
1159
1160   if (task->surf_action != NULL)
1161     surf_workstation_model->action_unref(task->surf_action);
1162
1163   if (task->workstation_list != NULL)
1164     xbt_free(task->workstation_list);
1165
1166   xbt_dynar_free(&task->tasks_before);
1167   xbt_dynar_free(&task->tasks_after);
1168   xbt_free(task);
1169
1170   sd_global->task_number--;
1171
1172   DEBUG0("Task destroyed.");
1173 }
1174
1175
1176 static inline SD_task_t SD_task_create_sized(const char*name,void*data,double amount,int ws_count) {
1177   SD_task_t task = SD_task_create(name,data,amount);
1178   task->communication_amount = xbt_new0(double,ws_count*ws_count);
1179   task->computation_amount = xbt_new0(double,ws_count);
1180   task->workstation_nb = ws_count;
1181   task->workstation_list = xbt_new0(SD_workstation_t,ws_count);
1182   return task;
1183 }
1184 /** @brief create a end-to-end communication task that can then be auto-scheduled
1185  *
1186  * Auto-scheduling mean that the task can be used with SD_task_schedulev(). This
1187  * allows to specify the task costs at creation, and decorelate them from the
1188  * scheduling process where you just specify which resource should deliver the
1189  * mandatory power.
1190  *
1191  * A end-to-end communication must be scheduled on 2 hosts, and the amount
1192  * specified at creation is sent from hosts[0] to hosts[1].
1193  */
1194 SD_task_t SD_task_create_comm_e2e(const char*name, void *data, double amount) {
1195   SD_task_t res = SD_task_create_sized(name,data,amount,2);
1196   res->communication_amount[2] = amount;
1197   res->kind=SD_TASK_COMM_E2E;
1198   return res;
1199 }
1200 /** @brief create a sequential computation task that can then be auto-scheduled
1201  *
1202  * Auto-scheduling mean that the task can be used with SD_task_schedulev(). This
1203  * allows to specify the task costs at creation, and decorelate them from the
1204  * scheduling process where you just specify which resource should deliver the
1205  * mandatory power.
1206  *
1207  * A sequential computation must be scheduled on 1 host, and the amount
1208  * specified at creation to be run on hosts[0].
1209  */
1210 SD_task_t SD_task_create_comp_seq(const char*name, void *data, double amount) {
1211   SD_task_t res = SD_task_create_sized(name,data,amount,1);
1212   res->computation_amount[0]=amount;
1213   res->kind=SD_TASK_COMP_SEQ;
1214   return res;
1215 }
1216
1217 /** @brief Auto-schedules a task.
1218  *
1219  * Auto-scheduling mean that the task can be used with SD_task_schedulev(). This
1220  * allows to specify the task costs at creation, and decorelate them from the
1221  * scheduling process where you just specify which resource should deliver the
1222  * mandatory power.
1223  *
1224  * To be auto-schedulable, a task must be created with SD_task_create_comm_e2e() or
1225  * SD_task_create_comp_seq(). Check their definitions for the exact semantic of each
1226  * of them.
1227  *
1228  * @todo
1229  * We should create tasks kind for the following categories:
1230  *  - Point to point communication (done)
1231  *  - Sequential computation       (done)
1232  *  - group communication (redistribution, several kinds)
1233  *  - parallel tasks with no internal communication (one kind per speedup model such as amdal)
1234  *  - idem+ internal communication. Task type not enough since we cannot store comm cost alongside to comp one)
1235  */
1236 void SD_task_schedulev(SD_task_t task, int count, const SD_workstation_t*list) {
1237   int i;
1238   xbt_assert1(task->kind != 0,"Task %s is not typed. Cannot automatically schedule it.",SD_task_get_name(task));
1239   switch(task->kind) {
1240   case SD_TASK_COMM_E2E:
1241   case SD_TASK_COMP_SEQ:
1242     xbt_assert(task->workstation_nb==count);
1243     for (i=0;i<count;i++)
1244       task->workstation_list[i]=list[i];
1245     SD_task_do_schedule(task);
1246     break;
1247   default:
1248     xbt_die(bprintf("Kind of task %s not supported by SD_task_schedulev()",
1249           SD_task_get_name(task)));
1250   }
1251   if (task->kind == SD_TASK_COMM_E2E) {
1252     VERB4("Schedule comm task %s between %s -> %s. It costs %.f bytes",
1253         SD_task_get_name(task),
1254         SD_workstation_get_name(task->workstation_list[0]),SD_workstation_get_name(task->workstation_list[1]),
1255         task->communication_amount[2]);
1256
1257   }
1258   /* Iterate over all childs and parent being COMM_E2E to say where I am located (and start them if ready) */
1259   if (task->kind == SD_TASK_COMP_SEQ) {
1260     VERB3("Schedule computation task %s on %s. It costs %.f flops",
1261         SD_task_get_name(task),SD_workstation_get_name(task->workstation_list[0]),
1262         task->computation_amount[0]);
1263     SD_dependency_t dep;
1264     unsigned int cpt;
1265     xbt_dynar_foreach(task->tasks_before,cpt,dep) {
1266       SD_task_t before = dep->src;
1267       if (before->kind == SD_TASK_COMM_E2E) {
1268         before->workstation_list[1] = task->workstation_list[0];
1269         if (before->workstation_list[0] && __SD_task_is_not_scheduled(before)) {
1270           SD_task_do_schedule(before);
1271           VERB4("Auto-Schedule comm task %s between %s -> %s. It costs %.f bytes",
1272               SD_task_get_name(before),
1273               SD_workstation_get_name(before->workstation_list[0]),SD_workstation_get_name(before->workstation_list[1]),
1274               before->communication_amount[2]);
1275         }
1276       }
1277     }
1278     xbt_dynar_foreach(task->tasks_after,cpt,dep) {
1279       SD_task_t after = dep->dst;
1280       if (after->kind == SD_TASK_COMM_E2E) {
1281         after->workstation_list[0] = task->workstation_list[0];
1282         if (after->workstation_list[1] && __SD_task_is_not_scheduled(after)) {
1283           SD_task_do_schedule(after);
1284           VERB4("Auto-Schedule comm task %s between %s -> %s. It costs %.f bytes",
1285               SD_task_get_name(after),
1286               SD_workstation_get_name(after->workstation_list[0]),SD_workstation_get_name(after->workstation_list[1]),
1287               after->communication_amount[2]);
1288
1289         }
1290       }
1291     }
1292   }
1293 }
1294 /** @brief autoschedule a task on a list of workstations
1295  *
1296  * This function is very similar to SD_task_schedulev(),
1297  * but takes the list of workstations to schedule onto as separate parameters.
1298  * It builds a proper vector of workstations and then call SD_task_schedulev()
1299  */
1300 void SD_task_schedulel(SD_task_t task, int count, ...) {
1301   va_list ap;
1302   SD_workstation_t *list=xbt_new(SD_workstation_t,count);
1303   int i;
1304   va_start(ap,count);
1305   for (i=0;i<count;i++) {
1306       list[i] = va_arg(ap,SD_workstation_t);
1307   }
1308   va_end(ap);
1309   SD_task_schedulev(task,count,list);
1310   free(list);
1311 }