Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
SD_task_t simplification
[simgrid.git] / src / simdag / sd_task.cpp
index a9cae7e..1631fa6 100644 (file)
@@ -20,8 +20,8 @@ static void __SD_task_destroy_scheduling_data(SD_task_t task)
 
   xbt_free(task->flops_amount);
   xbt_free(task->bytes_amount);
-  task->flops_amount = nullptr;
   task->bytes_amount = nullptr;
+  task->flops_amount = nullptr;
 }
 
 void* SD_task_new_f()
@@ -58,8 +58,6 @@ void SD_task_recycle_f(void *t)
   task->successors->clear();
 
   /* scheduling parameters */
-  task->host_count = 0;
-  task->host_list = nullptr;
   task->flops_amount = nullptr;
   task->bytes_amount = nullptr;
   task->rate = -1;
@@ -73,7 +71,6 @@ void SD_task_free_f(void *t)
   delete task->outputs;
   delete task->predecessors;
   delete task->successors;
-
   xbt_free(task);
 }
 
@@ -94,17 +91,15 @@ SD_task_t SD_task_create(const char *name, void *data, double amount)
   task->name = xbt_strdup(name);
   task->amount = amount;
   task->remains = amount;
-
+  task->allocation = new std::vector<sg_host_t>();
   return task;
 }
 
-static inline SD_task_t SD_task_create_sized(const char *name, void *data, double amount, int ws_count)
+static inline SD_task_t SD_task_create_sized(const char *name, void *data, double amount, int count)
 {
   SD_task_t task = SD_task_create(name, data, amount);
-  task->bytes_amount = xbt_new0(double, ws_count * ws_count);
-  task->flops_amount = xbt_new0(double, ws_count);
-  task->host_count = ws_count;
-  task->host_list = xbt_new0(sg_host_t, ws_count);
+  task->bytes_amount = xbt_new0(double, count * count);
+  task->flops_amount = xbt_new0(double, count);
   return task;
 }
 
@@ -191,7 +186,6 @@ SD_task_t SD_task_create_comp_par_amdahl(const char *name, void *data, double fl
 SD_task_t SD_task_create_comm_par_mxn_1d_block(const char *name, void *data, double amount)
 {
   SD_task_t res = SD_task_create(name, data, amount);
-  res->host_list=nullptr;
   res->kind = SD_TASK_COMM_PAR_MXN_1D_BLOCK;
 
   return res;
@@ -232,7 +226,7 @@ void SD_task_destroy(SD_task_t task)
   if (task->surf_action != nullptr)
     task->surf_action->unref();
 
-  xbt_free(task->host_list);
+  delete task->allocation;
   xbt_free(task->bytes_amount);
   xbt_free(task->flops_amount);
 
@@ -256,8 +250,7 @@ void *SD_task_get_data(SD_task_t task)
 /**
  * \brief Sets the user data of a task
  *
- * The new data can be \c nullptr. The old data should have been freed first
- * if it was not \c nullptr.
+ * The new data can be \c nullptr. The old data should have been freed first, if it was not \c nullptr.
  *
  * \param task a task
  * \param data the new data you want to associate with this task
@@ -344,12 +337,13 @@ void SD_task_set_state(SD_task_t task, e_SD_task_state_t new_state)
       task->finish_time = surf_get_clock();
     task->surf_action->unref();
     task->surf_action = nullptr;
+    task->allocation->clear();
   }
 
   task->state = new_state;
 
   if (task->watch_points & new_state) {
-    XBT_VERB("Watch point reached with task '%s'!", SD_task_get_name(task));
+    XBT_VERB("Watch point reached with task '%s'!", task->name);
     sd_global->watch_point_reached = true;
     SD_task_unwatch(task, new_state);   /* remove the watch point */
   }
@@ -416,7 +410,7 @@ xbt_dynar_t SD_task_get_children(SD_task_t task)
  */
 int SD_task_get_workstation_count(SD_task_t task)
 {
-  return task->host_count;
+  return task->allocation->size();
 }
 
 /**
@@ -427,7 +421,7 @@ int SD_task_get_workstation_count(SD_task_t task)
  */
 sg_host_t *SD_task_get_workstation_list(SD_task_t task)
 {
-  return task->host_list;
+  return &(*(task->allocation))[0];
 }
 
 /**
@@ -532,18 +526,18 @@ void SD_task_dump(SD_task_t task)
   if ((task->inputs->size()+ task->predecessors->size()) > 0) {
     XBT_INFO("  - pre-dependencies:");
     for (auto it : *task->predecessors)
-      XBT_INFO("    %s", SD_task_get_name(it));
+      XBT_INFO("    %s", it->name);
 
     for (auto it: *task->inputs)
-      XBT_INFO("    %s", SD_task_get_name(it));
+      XBT_INFO("    %s", it->name);
   }
   if ((task->outputs->size() + task->successors->size()) > 0) {
     XBT_INFO("  - post-dependencies:");
 
     for (auto it : *task->successors)
-      XBT_INFO("    %s", SD_task_get_name(it));
+      XBT_INFO("    %s", it->name);
     for (auto it : *task->outputs)
-      XBT_INFO("    %s", SD_task_get_name(it));
+      XBT_INFO("    %s", it->name);
   }
 }
 
@@ -778,9 +772,8 @@ static inline void SD_task_do_schedule(SD_task_t task)
 void SD_task_schedule(SD_task_t task, int host_count, const sg_host_t * host_list,
                       const double *flops_amount, const double *bytes_amount, double rate)
 {
-  xbt_assert(host_count > 0, "workstation_nb must be positive");
+  xbt_assert(host_count > 0, "host_count must be positive");
 
-  task->host_count = host_count;
   task->rate = rate;
 
   if (flops_amount) {
@@ -800,8 +793,8 @@ void SD_task_schedule(SD_task_t task, int host_count, const sg_host_t * host_lis
     task->bytes_amount = nullptr;
   }
 
-  task->host_list =  static_cast<sg_host_t*>(xbt_realloc(task->host_list, sizeof(sg_host_t) * host_count));
-  memcpy(task->host_list, host_list, sizeof(sg_host_t) * host_count);
+  for(int i =0; i<host_count; i++)
+    task->allocation->push_back(host_list[i]);
 
   SD_task_do_schedule(task);
 }
@@ -825,9 +818,7 @@ void SD_task_unschedule(SD_task_t task)
       && ((task->kind == SD_TASK_COMP_PAR_AMDAHL) || (task->kind == SD_TASK_COMM_PAR_MXN_1D_BLOCK))) {
           /* Don't free scheduling data for typed tasks */
     __SD_task_destroy_scheduling_data(task);
-    xbt_free(task->host_list);
-    task->host_list=nullptr;
-    task->host_count = 0;
+    task->allocation->clear();
   }
 
   if (SD_task_get_state(task) == SD_RUNNING)
@@ -847,16 +838,17 @@ void SD_task_unschedule(SD_task_t task)
 void SD_task_run(SD_task_t task)
 {
   xbt_assert(task->state == SD_RUNNABLE, "Task '%s' is not runnable! Task state: %d", task->name, (int) task->state);
-  xbt_assert(task->host_list != nullptr, "Task '%s': workstation_list is nullptr!", task->name);
+  xbt_assert(task->allocation != nullptr, "Task '%s': host_list is nullptr!", task->name);
 
   XBT_VERB("Executing task '%s'", task->name);
 
   /* Copy the elements of the task into the action */
-  int host_nb = task->host_count;
+  int host_nb = task->allocation->size();
+  XBT_DEBUG("%d", host_nb);
   sg_host_t *hosts = xbt_new(sg_host_t, host_nb);
-
-  for (int i = 0; i < host_nb; i++)
-    hosts[i] =  task->host_list[i];
+  int i =0;
+  for (auto host: *task->allocation)
+    hosts[i++] = host;
 
   double *flops_amount = xbt_new0(double, host_nb);
   double *bytes_amount = xbt_new0(double, host_nb * host_nb);
@@ -911,18 +903,15 @@ double SD_task_get_finish_time(SD_task_t task)
     return task->finish_time;
 }
 
-void SD_task_distribute_comp_amdahl(SD_task_t task, int ws_count)
+void SD_task_distribute_comp_amdahl(SD_task_t task, int count)
 {
   xbt_assert(task->kind == SD_TASK_COMP_PAR_AMDAHL, "Task %s is not a SD_TASK_COMP_PAR_AMDAHL typed task."
               "Cannot use this function.", task->name);
-  task->flops_amount = xbt_new0(double, ws_count);
-  task->bytes_amount = xbt_new0(double, ws_count * ws_count);
-  xbt_free(task->host_list);
-  task->host_count = ws_count;
-  task->host_list = xbt_new0(sg_host_t, ws_count);
-
-  for (int i=0; i<ws_count; i++){
-    task->flops_amount[i] = (task->alpha + (1 - task->alpha)/ws_count) * task->amount;
+  task->flops_amount = xbt_new0(double, count);
+  task->bytes_amount = xbt_new0(double, count * count);
+
+  for (int i=0; i<count; i++){
+    task->flops_amount[i] = (task->alpha + (1 - task->alpha)/count) * task->amount;
   }
 }
 
@@ -945,92 +934,74 @@ void SD_task_distribute_comp_amdahl(SD_task_t task, int ws_count)
 void SD_task_schedulev(SD_task_t task, int count, const sg_host_t * list)
 {
   xbt_assert(task->kind != 0, "Task %s is not typed. Cannot automatically schedule it.", SD_task_get_name(task));
-  switch (task->kind) {
-  case SD_TASK_COMP_PAR_AMDAHL:
-    SD_task_distribute_comp_amdahl(task, count);
-    /* no break */
-  case SD_TASK_COMM_E2E:
-  case SD_TASK_COMP_SEQ:
-    xbt_assert(task->host_count == count, "Got %d locations, but were expecting %d locations", count,task->host_count);
-    for (int i=0; i<count; i++)
-      task->host_list[i] = list[i];
-    if (SD_task_get_kind(task)== SD_TASK_COMP_SEQ && !task->flops_amount){
+
+  for(int i =0; i<count; i++)
+    task->allocation->push_back(list[i]);
+
+  if (task->kind == SD_TASK_COMP_SEQ) {
+    if (!task->flops_amount){
       /*This task has failed and is rescheduled. Reset the flops_amount*/
       task->flops_amount = xbt_new0(double, 1);
       task->flops_amount[0] = task->remains;
     }
+    XBT_VERB("Schedule computation task %s on %s. It costs %.f flops", task->name,
+             sg_host_get_name(task->allocation->at(0)), task->flops_amount[0]);
     SD_task_do_schedule(task);
-    break;
-  default:
-    xbt_die("Kind of task %s not supported by SD_task_schedulev()", SD_task_get_name(task));
-  }
-
-  if (task->kind == SD_TASK_COMM_E2E) {
-    XBT_VERB("Schedule comm task %s between %s -> %s. It costs %.f bytes", SD_task_get_name(task),
-          sg_host_get_name(task->host_list[0]), sg_host_get_name(task->host_list[1]), task->bytes_amount[2]);
-  }
-
-  /* Iterate over all inputs and outputs to say where I am located (and start them if runnable) */
-  if (task->kind == SD_TASK_COMP_SEQ) {
-    XBT_VERB("Schedule computation task %s on %s. It costs %.f flops", SD_task_get_name(task),
-          sg_host_get_name(task->host_list[0]), task->flops_amount[0]);
 
+    /* Iterate over all inputs and outputs to say where I am located (and start them if runnable) */
     for (auto input : *task->inputs){
-      input->host_list[1] = task->host_list[0];
-      if (input->host_list[0] && (SD_task_get_state(input) < SD_SCHEDULED)) {
+      input->allocation->push_back(task->allocation->front());
+      if (input->allocation->size () == 2) {
         SD_task_do_schedule(input);
         XBT_VERB ("Auto-Schedule comm task %s between %s -> %s. It costs %.f bytes", SD_task_get_name(input),
-                  sg_host_get_name(input->host_list[0]), sg_host_get_name(input->host_list[1]), input->bytes_amount[2]);
+                  sg_host_get_name(input->allocation->at(0)), sg_host_get_name(input->allocation->at(1)),
+                  input->bytes_amount[2]);
       }
     }
 
     for (auto output : *task->outputs){
-      output->host_list[0] = task->host_list[0];
-      if (output->host_list[1] && (SD_task_get_state(output) < SD_SCHEDULED)) {
+      output->allocation->insert(output->allocation->begin(),task->allocation->front());
+      if (output->allocation->size() == 2) {
         SD_task_do_schedule(output);
         XBT_VERB ("Auto-Schedule comm task %s between %s -> %s. It costs %.f bytes", SD_task_get_name(output),
-                  sg_host_get_name(output->host_list[0]), sg_host_get_name(output->host_list[1]),
+                  sg_host_get_name(output->allocation->at(0)), sg_host_get_name(output->allocation->at(1)),
                   output->bytes_amount[2]);
       }
     }
   }
 
-  /* Iterate over all children and parents being MXN_1D_BLOCK to say where I am located (and start them if runnable) */
   if (task->kind == SD_TASK_COMP_PAR_AMDAHL) {
-    XBT_VERB("Schedule computation task %s on %d workstations. %.f flops will be distributed following Amdahl's Law",
-          SD_task_get_name(task), task->host_count, task->flops_amount[0]);
+    SD_task_distribute_comp_amdahl(task, count);
+    XBT_VERB("Schedule computation task %s on %zu workstations. %.f flops will be distributed following Amdahl's Law",
+             task->name, task->allocation->size(), task->flops_amount[0]);
+    SD_task_do_schedule(task);
+
+    /* Iterate over all inputs and outputs to say where I am located (and start them if runnable) */
     for (auto input : *task->inputs){
-      if (!input->host_list){
-        XBT_VERB("Sender side of Task %s is not scheduled yet", SD_task_get_name(input));
-        input->host_list = xbt_new0(sg_host_t, count);
-        input->host_count = count;
-        XBT_VERB("Fill the workstation list with list of Task '%s'", SD_task_get_name(task));
-        for (int i=0; i<count; i++)
-          input->host_list[i] = task->host_list[i];
+      if (input->allocation->empty()){
+        XBT_VERB("Sender side of Task %s is not scheduled yet", input->name);
+        XBT_VERB("Fill the workstation list with list of Task '%s'", task->name);
+        for (int i=0; i<count;i++)
+          input->allocation->push_back(task->allocation->at(i));
       } else {
-        XBT_VERB("Build communication matrix for task '%s'", SD_task_get_name(input));
-        int src_nb, dst_nb;
-        double src_start, src_end, dst_start, dst_end;
-        src_nb = input->host_count;
-        dst_nb = count;
-        input->host_list = static_cast<sg_host_t*>(xbt_realloc(input->host_list, (input->host_count+count)*sizeof(sg_host_t)));
-        for (int i=0; i<count; i++)
-          input->host_list[input->host_count+i] = task->host_list[i];
-
-        input->host_count += count;
+        XBT_VERB("Build communication matrix for task '%s'", input->name);
+        int src_nb = input->allocation->size();
+        int dst_nb = count;
+        for (int i=0; i<count;i++)
+          input->allocation->push_back(task->allocation->at(i));
         xbt_free(input->flops_amount);
         xbt_free(input->bytes_amount);
-        input->flops_amount = xbt_new0(double, input->host_count);
-        input->bytes_amount = xbt_new0(double, input->host_count* input->host_count);
+        input->flops_amount = xbt_new0(double, input->allocation->size());
+        input->bytes_amount = xbt_new0(double, input->allocation->size() * input->allocation->size());
 
         for (int i=0; i<src_nb; i++) {
-          src_start = i*input->amount/src_nb;
-          src_end = src_start + input->amount/src_nb;
+          double src_start = i*input->amount/src_nb;
+          double src_end = src_start + input->amount/src_nb;
           for (int j=0; j<dst_nb; j++) {
-            dst_start = j*input->amount/dst_nb;
-            dst_end = dst_start + input->amount/dst_nb;
-            XBT_VERB("(%s->%s): (%.2f, %.2f)-> (%.2f, %.2f)", sg_host_get_name(input->host_list[i]),
-                sg_host_get_name(input->host_list[src_nb+j]), src_start, src_end, dst_start, dst_end);
+            double dst_start = j*input->amount/dst_nb;
+            double dst_end = dst_start + input->amount/dst_nb;
+            XBT_VERB("(%s->%s): (%.2f, %.2f)-> (%.2f, %.2f)", sg_host_get_name(input->allocation->at(i)),
+                sg_host_get_name(input->allocation->at(src_nb+j)), src_start, src_end, dst_start, dst_end);
             if ((src_end <= dst_start) || (dst_end <= src_start)) {
               input->bytes_amount[i*(src_nb+dst_nb)+src_nb+j]=0.0;
             } else {
@@ -1043,36 +1014,28 @@ void SD_task_schedulev(SD_task_t task, int count, const sg_host_t * list)
         if (SD_task_get_state(input)< SD_SCHEDULED) {
           SD_task_do_schedule(input);
           XBT_VERB ("Auto-Schedule redistribution task %s. Send %.f bytes from %d hosts to %d hosts.",
-                    SD_task_get_name(input),input->amount, src_nb, dst_nb);
+                    input->name,input->amount, src_nb, dst_nb);
         }
       }
     }
 
     for (auto output : *task->outputs) {
-      if (!output->host_list){
+      if (output->allocation->empty()){
         XBT_VERB("Receiver side of Task '%s' is not scheduled yet", SD_task_get_name(output));
-        output->host_list = xbt_new0(sg_host_t, count);
-        output->host_count = count;
         XBT_VERB("Fill the workstation list with list of Task '%s'", SD_task_get_name(task));
-        for (int i=0; i<count; i++)
-          output->host_list[i] = task->host_list[i];
+        for (int i=0; i<count;i++)
+          output->allocation->push_back(task->allocation->at(i));
       } else {
         double src_start, src_end, dst_start, dst_end;
         int src_nb = count;
-        int dst_nb = output->host_count;
-        output->host_list = static_cast<sg_host_t*>(xbt_realloc(output->host_list, (output->host_count+count)*sizeof(sg_host_t)));
-        for (int i=output->host_count - 1; i>=0; i--)
-          output->host_list[count+i] = output->host_list[i];
-        for (int i=0; i<count; i++)
-          output->host_list[i] = task->host_list[i];
-
-        output->host_count += count;
-
+        int dst_nb = output->allocation->size();
+        for (int i=0; i<count;i++)
+          output->allocation->insert(output->allocation->begin()+i, task->allocation->at(i));
         xbt_free(output->flops_amount);
         xbt_free(output->bytes_amount);
 
-        output->flops_amount = xbt_new0(double, output->host_count);
-        output->bytes_amount = xbt_new0(double, output->host_count* output->host_count);
+        output->flops_amount = xbt_new0(double, output->allocation->size());
+        output->bytes_amount = xbt_new0(double, output->allocation->size() * output->allocation->size());
 
         for (int i=0; i<src_nb; i++) {
           src_start = i*output->amount/src_nb;
@@ -1111,9 +1074,9 @@ void SD_task_schedulel(SD_task_t task, int count, ...)
   va_list ap;
   sg_host_t *list = xbt_new(sg_host_t, count);
   va_start(ap, count);
-  for (int i=0; i<count; i++) {
+  for (int i=0; i<count; i++)
     list[i] = va_arg(ap, sg_host_t);
-  }
+
   va_end(ap);
   SD_task_schedulev(task, count, list);
   free(list);