Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Merge branch 'master' of git+ssh://scm.gforge.inria.fr//gitroot/simgrid/simgrid
[simgrid.git] / src / msg / msg_task.cpp
1 /* Copyright (c) 2004-2015. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "msg_private.h"
8 #include "src/simix/smx_private.h"
9 #include "xbt/sysdep.h"
10 #include "xbt/log.h"
11
12 /** @addtogroup m_task_management
13  *
14  *  Since most scheduling algorithms rely on a concept of task  that can be either <em>computed</em> locally or
15  *  <em>transferred</em> on another processor, it seems to be the right level of abstraction for our purposes.
16  *  A <em>task</em> may then be defined by a <em>computing amount</em>, a <em>message size</em> and
17  *  some <em>private data</em>.
18  */
19
20 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(msg_task, msg, "Logging specific to MSG (task)");
21
22 /********************************* Task **************************************/
23 /** \ingroup m_task_management
24  * \brief Creates a new #msg_task_t.
25  *
26  * A constructor for #msg_task_t taking four arguments and returning the corresponding object.
27  * \param name a name for the object. It is for user-level information and can be NULL.
28  * \param flop_amount a value of the processing amount (in flop) needed to process this new task.
29  * If 0, then it cannot be executed with MSG_task_execute(). This value has to be >=0.
30  * \param message_size a value of the amount of data (in bytes) needed to transfer this new task. If 0, then it cannot
31  * be transfered with MSG_task_send() and MSG_task_recv(). This value has to be >=0.
32  * \param data a pointer to any data may want to attach to the new object.  It is for user-level information and can
33  * be NULL. It can be retrieved with the function \ref MSG_task_get_data.
34  * \see msg_task_t
35  * \return The new corresponding object.
36  */
37 msg_task_t MSG_task_create(const char *name, double flop_amount, double message_size, void *data)
38 {
39   msg_task_t task = xbt_new(s_msg_task_t, 1);
40   simdata_task_t simdata = xbt_new(s_simdata_task_t, 1);
41   task->simdata = simdata;
42
43   /* Task structure */
44   task->name = xbt_strdup(name);
45   task->data = data;
46
47   /* Simulator Data */
48   simdata->compute = NULL;
49   simdata->comm = NULL;
50   simdata->bytes_amount = message_size;
51   simdata->flops_amount = flop_amount;
52   simdata->sender = NULL;
53   simdata->receiver = NULL;
54   simdata->source = NULL;
55   simdata->priority = 1.0;
56   simdata->bound = 0;
57   simdata->affinity_mask_db = xbt_dict_new_homogeneous(NULL);
58   simdata->rate = -1.0;
59   simdata->isused = 0;
60
61   simdata->host_nb = 0;
62   simdata->host_list = NULL;
63   simdata->flops_parallel_amount = NULL;
64   simdata->bytes_parallel_amount = NULL;
65   TRACE_msg_task_create(task);
66
67   return task;
68 }
69
70 /** \ingroup m_task_management
71  * \brief Creates a new #msg_task_t (a parallel one....).
72  *
73  * A constructor for #msg_task_t taking six arguments and returning the corresponding object.
74  * \param name a name for the object. It is for user-level information and can be NULL.
75  * \param host_nb the number of hosts implied in the parallel task.
76  * \param host_list an array of \p host_nb msg_host_t.
77  * \param flops_amount an array of \p host_nb doubles.
78  *        flops_amount[i] is the total number of operations that have to be performed on host_list[i].
79  * \param bytes_amount an array of \p host_nb* \p host_nb doubles.
80  * \param data a pointer to any data may want to attach to the new object.
81  *             It is for user-level information and can be NULL.
82  *             It can be retrieved with the function \ref MSG_task_get_data.
83  * \see msg_task_t
84  * \return The new corresponding object.
85  */
86 msg_task_t MSG_parallel_task_create(const char *name, int host_nb, const msg_host_t * host_list,
87                                     double *flops_amount, double *bytes_amount, void *data)
88 {
89   msg_task_t task = MSG_task_create(name, 0, 0, data);
90   simdata_task_t simdata = task->simdata;
91   int i;
92
93   /* Simulator Data specific to parallel tasks */
94   simdata->host_nb = host_nb;
95   simdata->host_list = xbt_new0(sg_host_t, host_nb);
96   simdata->flops_parallel_amount = flops_amount;
97   simdata->bytes_parallel_amount = bytes_amount;
98
99   for (i = 0; i < host_nb; i++)
100     simdata->host_list[i] = host_list[i];
101
102   return task;
103 }
104
105 /*************** Begin GPU ***************/
106 /** \ingroup m_task_management
107  * \brief Creates a new #msg_gpu_task_t.
108
109  * A constructor for #msg_gpu_task_t taking four arguments and returning a pointer to the new created GPU task.
110
111  * \param name a name for the object. It is for user-level information and can be NULL.
112  * \param flops_amount a value of the processing amount (in flop)needed to process this new task. If 0, then it cannot
113  * be executed with MSG_gpu_task_execute(). This value has to be >=0.
114  * \param dispatch_latency time in seconds to load this task on the GPU
115  * \param collect_latency time in seconds to transfer result from the GPU back to the CPU (host) when done
116
117  * \see msg_gpu_task_t
118  * \return The new corresponding object.
119  */
120 msg_gpu_task_t MSG_gpu_task_create(const char *name, double flops_amount, double dispatch_latency,
121                                    double collect_latency)
122 {
123   msg_gpu_task_t task = xbt_new(s_msg_gpu_task_t, 1);
124   simdata_gpu_task_t simdata = xbt_new(s_simdata_gpu_task_t, 1);
125   task->simdata = simdata;
126   /* Task structure */
127   task->name = xbt_strdup(name);
128
129   /* Simulator Data */
130   simdata->flops_amount = flops_amount;
131   simdata->dispatch_latency   = dispatch_latency;
132   simdata->collect_latency    = collect_latency;
133
134   /* TRACE_msg_gpu_task_create(task); FIXME*/
135   return task;
136 }
137 /*************** End GPU ***************/
138
139 /** \ingroup m_task_management
140  * \brief Return the user data of a #msg_task_t.
141  *
142  * This function checks whether \a task is a valid pointer and return the user data associated to \a task if possible.
143  */
144 void *MSG_task_get_data(msg_task_t task)
145 {
146   xbt_assert((task != NULL), "Invalid parameter");
147   return (task->data);
148 }
149
150 /** \ingroup m_task_management
151  * \brief Sets the user data of a #msg_task_t.
152  *
153  * This function allows to associate a new pointer to the user data associated of \a task.
154  */
155 void MSG_task_set_data(msg_task_t task, void *data)
156 {
157   xbt_assert((task != NULL), "Invalid parameter");
158   task->data = data;
159 }
160
161 /** \ingroup m_task_management
162  * \brief Sets a function to be called when a task has just been copied.
163  * \param callback a callback function
164  */
165 void MSG_task_set_copy_callback(void (*callback) (msg_task_t task, msg_process_t sender, msg_process_t receiver)) {
166
167   msg_global->task_copy_callback = callback;
168
169   if (callback) {
170     SIMIX_comm_set_copy_data_callback(MSG_comm_copy_data_from_SIMIX);
171   } else {
172     SIMIX_comm_set_copy_data_callback(SIMIX_comm_copy_pointer_callback);
173   }
174 }
175
176 /** \ingroup m_task_management
177  * \brief Return the sender of a #msg_task_t.
178  *
179  * This functions returns the #msg_process_t which sent this task
180  */
181 msg_process_t MSG_task_get_sender(msg_task_t task)
182 {
183   xbt_assert(task, "Invalid parameters");
184   return ((simdata_task_t) task->simdata)->sender;
185 }
186
187 /** \ingroup m_task_management
188  * \brief Return the source of a #msg_task_t.
189  *
190  * This functions returns the #msg_host_t from which this task was sent
191  */
192 msg_host_t MSG_task_get_source(msg_task_t task)
193 {
194   xbt_assert(task, "Invalid parameters");
195   return ((simdata_task_t) task->simdata)->source;
196 }
197
198 /** \ingroup m_task_management
199  * \brief Return the name of a #msg_task_t.
200  *
201  * This functions returns the name of a #msg_task_t as specified on creation
202  */
203 const char *MSG_task_get_name(msg_task_t task)
204 {
205   xbt_assert(task, "Invalid parameters");
206   return task->name;
207 }
208
209 /** \ingroup m_task_management
210  * \brief Sets the name of a #msg_task_t.
211  *
212  * This functions allows to associate a name to a task
213  */
214 void MSG_task_set_name(msg_task_t task, const char *name)
215 {
216   xbt_assert(task, "Invalid parameters");
217   task->name = xbt_strdup(name);
218 }
219
220 /** \ingroup m_task_management
221  * \brief Destroy a #msg_task_t.
222  *
223  * Destructor for #msg_task_t. Note that you should free user data, if any, \b before calling this function.
224  *
225  * Only the process that owns the task can destroy it.
226  * The owner changes after a successful send.
227  * If a task is successfully sent, the receiver becomes the owner and is supposed to destroy it. The sender should not
228  * use it anymore.
229  * If the task failed to be sent, the sender remains the owner of the task.
230  */
231 msg_error_t MSG_task_destroy(msg_task_t task)
232 {
233   smx_synchro_t action = NULL;
234   xbt_assert((task != NULL), "Invalid parameter");
235
236   if (task->simdata->isused) {
237     /* the task is being sent or executed: cancel it first */
238     MSG_task_cancel(task);
239   }
240   TRACE_msg_task_destroy(task);
241
242   xbt_free(task->name);
243
244   action = task->simdata->compute;
245   if (action)
246     simcall_execution_destroy(action);
247
248   /* parallel tasks only */
249   xbt_free(task->simdata->host_list);
250
251   xbt_dict_free(&task->simdata->affinity_mask_db);
252
253   /* free main structures */
254   xbt_free(task->simdata);
255   xbt_free(task);
256
257   return MSG_OK;
258 }
259
260 /** \ingroup m_task_usage
261  * \brief Cancel a #msg_task_t.
262  * \param task the task to cancel. If it was executed or transfered, it stops the process that were working on it.
263  */
264 msg_error_t MSG_task_cancel(msg_task_t task)
265 {
266   xbt_assert((task != NULL), "Cannot cancel a NULL task");
267
268   if (task->simdata->compute) {
269     simcall_execution_cancel(task->simdata->compute);
270   }
271   else if (task->simdata->comm) {
272     simdata_task_t simdata = task->simdata;
273     simcall_comm_cancel(simdata->comm);
274     if (msg_global->debug_multiple_use && simdata->isused!=0)
275       xbt_ex_free(*(xbt_ex_t*)simdata->isused);
276     simdata->isused = 0;
277   }
278   return MSG_OK;
279 }
280
281 /** \ingroup m_task_management
282  * \brief Returns the remaining amount of flops needed to execute a task #msg_task_t.
283  *
284  * Once a task has been processed, this amount is set to 0. If you want, you can reset this value with
285  * #MSG_task_set_flops_amount before restarting the task.
286  */
287 double MSG_task_get_flops_amount(msg_task_t task) {
288   if (task->simdata->compute) {
289     return simcall_execution_get_remains(task->simdata->compute);
290   } else {
291     return task->simdata->flops_amount;
292   }
293 }
294
295 /** \ingroup m_task_management
296  * \brief set the computation amount needed to process a task #msg_task_t.
297  *
298  * \warning If the computation is ongoing (already started and not finished),
299  * it is not modified by this call. Moreover, after its completion, the ongoing execution with set the flops_amount to
300  * zero, overriding any value set during the execution.
301  */
302 void MSG_task_set_flops_amount(msg_task_t task, double flops_amount)
303 {
304   task->simdata->flops_amount = flops_amount;
305 }
306
307 /** \ingroup m_task_management
308  * \brief set the amount data attached with a task #msg_task_t.
309  *
310  * \warning If the transfer is ongoing (already started and not finished), it is not modified by this call.
311  */
312 void MSG_task_set_bytes_amount(msg_task_t task, double data_size)
313 {
314   task->simdata->bytes_amount = data_size;
315 }
316
317 /** \ingroup m_task_management
318  * \brief Returns the total amount received by a task #msg_task_t.
319  *        If the communication does not exist it will return 0.
320  *        So, if the communication has FINISHED or FAILED it returns zero.
321  */
322 double MSG_task_get_remaining_communication(msg_task_t task)
323 {
324   xbt_assert((task != NULL) && (task->simdata != NULL), "Invalid parameter");
325   XBT_DEBUG("calling simcall_communication_get_remains(%p)", task->simdata->comm);
326   return simcall_comm_get_remains(task->simdata->comm);
327 }
328
329 /** \ingroup m_task_management
330  * \brief Returns the size of the data attached to a task #msg_task_t.
331  */
332 double MSG_task_get_bytes_amount(msg_task_t task)
333 {
334   xbt_assert((task != NULL) && (task->simdata != NULL), "Invalid parameter");
335   return task->simdata->bytes_amount;
336 }
337
338 /** \ingroup m_task_management
339  * \brief Changes the priority of a computation task. This priority doesn't affect the transfer rate. A priority of 2
340  *        will make a task receive two times more cpu power than the other ones.
341  */
342 void MSG_task_set_priority(msg_task_t task, double priority)
343 {
344   xbt_assert((task != NULL) && (task->simdata != NULL), "Invalid parameter");
345   task->simdata->priority = 1 / priority;
346   if (task->simdata->compute)
347     simcall_execution_set_priority(task->simdata->compute,
348         task->simdata->priority);
349 }
350
351 /** \ingroup m_task_management
352  * \brief Changes the maximum CPU utilization of a computation task.
353  *        Unit is flops/s.
354  *
355  * For VMs, there is a pitfall. Please see MSG_vm_set_bound().
356  */
357 void MSG_task_set_bound(msg_task_t task, double bound)
358 {
359   xbt_assert(task, "Invalid parameter");
360   xbt_assert(task->simdata, "Invalid parameter");
361
362   if (bound == 0)
363     XBT_INFO("bound == 0 means no capping (i.e., unlimited).");
364
365   task->simdata->bound = bound;
366   if (task->simdata->compute)
367     simcall_execution_set_bound(task->simdata->compute, task->simdata->bound);
368 }
369
370 /** \ingroup m_task_management
371  * \brief Changes the CPU affinity of a computation task.
372  *
373  * When pinning the given task to the first CPU core of the given host, use 0x01 for the mask value. Each bit of the
374  * mask value corresponds to each CPU core. See taskset(1) on Linux.
375  *
376  * \param task a target task
377  * \param host the host having a multi-core CPU
378  * \param mask the bit mask of a new CPU affinity setting for the task
379  *
380  * Usage:
381  * 0. Define a host with multiple cores.
382  *    \<host id="PM0" power="1E8" core="2"/\>
383  *
384  * 1. Pin a given task to the first CPU core of a host.
385  *   MSG_task_set_affinity(task, pm0, 0x01);
386  *
387  * 2. Pin a given task to the third CPU core of a host. Turn on the third bit of the mask.
388  *   MSG_task_set_affinity(task, pm0, 0x04); // 0x04 == 100B
389  *
390  * 3. Pin a given VM to the first CPU core of a host.
391  *   MSG_vm_set_affinity(vm, pm0, 0x01);
392  *
393  * See examples/msg/cloud/multicore.c for more information.
394  *
395  * Note:
396  * 1. The current code does not allow an affinity of a task to multiple cores.
397  *    The mask value 0x03 (i.e., a given task will be executed on the first core or the second core) is not allowed.
398  *    The mask value 0x01 or 0x02 works. See cpu_cas01.c for details.
399  *
400  * 2. It is recommended to first compare simulation results in both the Lazy and Full calculation modes
401  *    (using --cfg=cpu/optim:Full or not). Fix cpu_cas01.c if you find wrong results in the Lazy mode.
402  */
403 void MSG_task_set_affinity(msg_task_t task, msg_host_t host, unsigned long mask)
404 {
405   xbt_assert(task, "Invalid parameter");
406   xbt_assert(task->simdata, "Invalid parameter");
407
408   if (mask == 0) {
409     /* 0 means clear */
410       /* We need remove_ext() not throwing exception. */
411       void *ret = xbt_dict_get_or_null_ext(task->simdata->affinity_mask_db, (char *) host, sizeof(msg_host_t));
412       if (ret != NULL)
413         xbt_dict_remove_ext(task->simdata->affinity_mask_db, (char *) host, sizeof(host));
414   } else
415     xbt_dict_set_ext(task->simdata->affinity_mask_db, (char *) host, sizeof(host), (void *)(uintptr_t) mask, NULL);
416
417   /* We set affinity data of this task. If the task is being executed, we actually change the affinity setting of the
418    * task. Otherwise, this change will be applied when the task is executed. */
419   if (!task->simdata->compute) {
420     /* task is not yet executed */
421     XBT_INFO("set affinity(0x%04lx@%s) for %s (not active now)", mask, MSG_host_get_name(host),
422              MSG_task_get_name(task));
423     return;
424   }
425
426   {
427     smx_synchro_t compute = task->simdata->compute;
428     msg_host_t host_now = compute->execution.host;  // simix_private.h is necessary
429     if (host_now != host) {
430       /* task is not yet executed on this host */
431       XBT_INFO("set affinity(0x%04lx@%s) for %s (not active now)", mask, MSG_host_get_name(host),
432                MSG_task_get_name(task));
433       return;
434     }
435
436     /* task is being executed on this host. so change the affinity now */
437     {
438       /* check it works. remove me if it works. */
439       xbt_assert((unsigned long)(uintptr_t) xbt_dict_get_or_null_ext(task->simdata->affinity_mask_db,
440                  (char *) host, sizeof(msg_host_t)) == mask);
441     }
442
443     XBT_INFO("set affinity(0x%04lx@%s) for %s", mask, MSG_host_get_name(host), MSG_task_get_name(task));
444     simcall_execution_set_affinity(task->simdata->compute, host, mask);
445   }
446 }