3 /* Copyright (c) 2002,2003,2004 Arnaud Legrand. All rights reserved. */
5 /* This program is free software; you can redistribute it and/or modify it
6 * under the terms of the license (GNU LGPL) which comes with this package. */
10 #include "xbt/error.h"
/* Log category for this file: named "gos", declared with "msg" as its
 * second argument (presumably the parent category, and presumably the
 * category used by the DEBUGn/CRITICALn calls below -- TODO confirm
 * against the xbt logging headers). */
11 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(gos, msg,
12 "Logging specific to MSG (gos)");
14 /** \defgroup msg_gos_functions MSG Operating System Functions
15 * \brief This section describes the functions that can be used
16 * by an agent for handling some task.
19 /** \ingroup msg_gos_functions
20 * \brief This function is now deprecated and useless. Please stop using it.
/* Deprecated entry point. The only visible statement asserts on the
 * constant 0 with the deprecation text as its message; `process` is not
 * used by the visible code. */
/* NOTE(review): the embedded numbering jumps (22 -> 24), so the braces
 * and any return statement are not visible in this listing. */
22 MSG_error_t MSG_process_start(m_process_t process)
24 xbt_assert0(0,"This function is now deprecated and useless. Please stop using it.");
29 /** \ingroup msg_gos_functions
30 * \brief Listen on a channel and wait for receiving a task.
32 * It takes two parameters.
33 * \param task a memory location for storing a #m_task_t. It will
34 hold a task when this function returns. Thus \a task should not
35 be equal to \c NULL and \a *task should be equal to \c NULL. If one of
36 those two conditions does not hold, there will be a warning message.
37 * \param channel the channel on which the agent should be
38 listening. This value has to be >= 0 and less than the maximal
39 number of channels fixed with MSG_set_channel_number().
40 * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
41 * if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
/* MSG_task_get: receive a task on `channel`.
 * The visible code shifts a task out of h_simdata->mbox[channel]
 * (blocking while the mailbox is empty), starts a surf communication for
 * it, waits until that action is no longer SURF_ACTION_RUNNING, and then
 * maps the final state to a return code. */
/* NOTE(review): the embedded line numbers skip (43 -> 46, 52 -> 56, ...),
 * so the declarations of `h` and `t`, the braces, the `do` keyword and
 * several branch bodies/conditions are not visible in this listing. */
43 MSG_error_t MSG_task_get(m_task_t * task,
46 m_process_t process = MSG_process_self();
49 simdata_task_t t_simdata = NULL;
50 simdata_host_t h_simdata = NULL;
52 e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
/* A NULL `task` pointer is handled by an assertion here. */
56 xbt_assert0(task,"Null pointer for the task\n");
59 CRITICAL0("MSG_task_get() was asked to write in a non empty task struct.");
63 h_simdata = h->simdata;
65 DEBUG2("Waiting for a task on channel %d (%s)", channel,h->name);
/* Take the next task from the channel mailbox. While the mailbox is
 * empty: check that no other process is already registered as sleeping on
 * this channel, register ourselves, and block. */
67 while ((t = xbt_fifo_shift(h_simdata->mbox[channel])) == NULL) {
68 xbt_assert2(!(h_simdata->sleeping[channel]),
69 "A process (%s(%d)) is already blocked on this channel",
70 h_simdata->sleeping[channel]->name,
71 h_simdata->sleeping[channel]->simdata->PID);
72 h_simdata->sleeping[channel] = process; /* I'm waiting. Wake me up when you're ready */
73 __MSG_process_block();
/* After waking up, the state of our own host is queried; the value it is
 * compared against is on a line not shown. */
74 if(surf_workstation_resource->extension_public->get_state(h_simdata->host)
76 MSG_RETURN(MSG_HOST_FAILURE);
77 h_simdata->sleeping[channel] = NULL;
78 /* OK, we should both be ready now. Are you there ? */
81 t_simdata = t->simdata;
82 /* *task = __MSG_task_copy(t); */
/* Create the communication action from the sender's host to h's host,
 * using the task's message_size and rate, and attach the task to the
 * action as its data. */
88 t_simdata->comm = surf_workstation_resource->extension_public->
89 communicate(MSG_process_get_host(t_simdata->sender)->simdata->host,
90 h->simdata->host, t_simdata->message_size,t_simdata->rate);
92 surf_workstation_resource->common_public->action_set_data(t_simdata->comm,t);
/* Wake the sender if it is currently blocked (MSG_task_put blocks until
 * the task's comm field becomes non-NULL). */
94 if(__MSG_process_isBlocked(t_simdata->sender))
95 __MSG_process_unblock(t_simdata->sender);
97 PAJE_PROCESS_PUSH_STATE(process,"C");
/* Keep waiting on task events for as long as the communication action
 * reports SURF_ACTION_RUNNING. */
100 __MSG_task_wait_event(process, t);
101 state=surf_workstation_resource->common_public->action_get_state(t_simdata->comm);
102 } while (state==SURF_ACTION_RUNNING);
/* If the task's `using` counter is still above 1, this process is put
 * back at the head of msg_global->process_to_run. The rest of this branch
 * is not visible in the listing. */
104 if(t->simdata->using>1) {
105 xbt_fifo_unshift(msg_global->process_to_run,process);
109 PAJE_PROCESS_POP_STATE(process);
110 PAJE_COMM_STOP(process,t,channel);
/* Every exit path calls action_free() on the communication; comm is reset
 * to NULL only when action_free() returns non-zero. */
112 if(state == SURF_ACTION_DONE) {
113 if(surf_workstation_resource->common_public->action_free(t_simdata->comm))
114 t_simdata->comm = NULL;
116 } else if(surf_workstation_resource->extension_public->get_state(h_simdata->host)
118 if(surf_workstation_resource->common_public->action_free(t_simdata->comm))
119 t_simdata->comm = NULL;
120 MSG_RETURN(MSG_HOST_FAILURE);
122 if(surf_workstation_resource->common_public->action_free(t_simdata->comm))
123 t_simdata->comm = NULL;
124 MSG_RETURN(MSG_TRANSFER_FAILURE);
128 /** \ingroup msg_gos_functions
129 * \brief Test whether there is a pending communication on a channel.
131 * It takes one parameter.
132 * \param channel the channel on which the agent should be
133 listening. This value has to be >= 0 and less than the maximal
134 number of channels fixed with MSG_set_channel_number().
135 * \return 1 if there is a pending communication and 0 otherwise
/* MSG_task_Iprobe: report whether the mailbox of `channel` on host `h`
 * currently has a first item. The result is the value of the `!= NULL`
 * comparison, i.e. 1 or 0. */
/* NOTE(review): the declaration and assignment of `h` are on lines not
 * shown in this listing (embedded numbering skips 137 -> 140 -> 142). */
137 int MSG_task_Iprobe(m_channel_t channel)
140 simdata_host_t h_simdata = NULL;
142 DEBUG2("Probing on channel %d (%s)", channel,h->name);
145 h_simdata = h->simdata;
/* Only the first item is looked up; presumably the queue itself is left
 * untouched -- TODO confirm against xbt_fifo_getFirstItem(). */
146 return(xbt_fifo_getFirstItem(h_simdata->mbox[channel])!=NULL);
149 /** \ingroup msg_gos_functions
150 * \brief Test whether there is a pending communication on a channel, and who sent it.
152 * It takes one parameter.
153 * \param channel the channel on which the agent should be
154 listening. This value has to be >= 0 and less than the maximal
155 number of channels fixed with MSG_set_channel_number().
156 * \return -1 if there is no pending communication and the PID of the process who sent it otherwise
/* MSG_task_probe_from: look at the first item of the mailbox of `channel`
 * on host `h` and return the PID of the sender of the task it holds. */
/* NOTE(review): the declarations of `h` and `t` and the body of the
 * "nothing pending" branch are on lines not shown in this listing. */
158 int MSG_task_probe_from(m_channel_t channel)
161 simdata_host_t h_simdata = NULL;
162 xbt_fifo_item_t item;
/* h_simdata is assigned here, but the lookup below goes through
 * h->simdata directly, so the visible code never reads h_simdata. */
167 h_simdata = h->simdata;
169 DEBUG2("Probing on channel %d (%s)", channel,h->name);
171 item = xbt_fifo_getFirstItem(h->simdata->mbox[channel]);
/* Covers both "no item in the mailbox" and "item with no content"; the
 * statement executed in that case is not visible here. */
172 if (!item || !(t = xbt_fifo_get_item_content(item)))
175 return MSG_process_get_PID(t->simdata->sender);
178 /** \ingroup msg_gos_functions
179 * \brief Put a task on a channel of a host and wait for the end of the
182 * This function is used for describing the behavior of an agent. It
183 * takes three parameters.
184 * \param task a #m_task_t to send on another location. This task
185 will not be usable anymore when the function returns. There is
186 no automatic task duplication and you have to save your parameters
187 before calling this function. Tasks are unique and once one has been
188 sent to another location, you should not access it anymore. You do
189 not need to call MSG_task_destroy() but, to avoid accidentally
190 using this task again, you definitely should
191 reinitialize it with #MSG_TASK_UNINITIALIZED. Note that this task
192 can be transferred iff it has been correctly created with
194 * \param dest the destination of the message
195 * \param channel the channel on which the agent should put this
196 task. This value has to be >= 0 and less than the maximal number of
197 channels fixed with MSG_set_channel_number().
198 * \return #MSG_FATAL if \a task is not properly initialized and
/* MSG_task_put: send `task` to `channel` of a remote host.
 * The visible code records the calling process as the sender, pushes the
 * task onto the remote mailbox, wakes a process sleeping on that channel
 * if there is one, blocks until the task's comm action exists, waits for
 * that action to leave SURF_ACTION_RUNNING, and maps the final state to a
 * return code. */
/* NOTE(review): the embedded line numbers skip (208 -> 212, 215 -> 217 ->
 * 220, ...), so the braces, the assignment of `remote_host` and several
 * branch bodies/conditions are not visible in this listing. */
201 MSG_error_t MSG_task_put(m_task_t task,
202 m_host_t dest, m_channel_t channel)
204 m_process_t process = MSG_process_self();
205 simdata_task_t task_simdata = NULL;
206 e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
207 m_host_t local_host = NULL;
208 m_host_t remote_host = NULL;
212 task_simdata = task->simdata;
213 task_simdata->sender = process;
/* The task's `using` counter must be exactly 1 when it is sent. */
214 xbt_assert0(task_simdata->using==1,"Gargl!");
/* comm starts out NULL; the wait loop further down relies on it. */
215 task_simdata->comm = NULL;
217 local_host = ((simdata_process_t) process->simdata)->host;
220 DEBUG4("Trying to send a task (%lg Mb) from %s to %s on channel %d",
221 task->simdata->message_size,local_host->name, remote_host->name, channel);
/* Queue the task at the tail of the destination mailbox. */
223 xbt_fifo_push(((simdata_host_t) remote_host->simdata)->
224 mbox[channel], task);
226 PAJE_COMM_START(process,task,channel);
/* If a process is registered as sleeping on that channel, wake it. */
228 if(remote_host->simdata->sleeping[channel])
229 __MSG_process_unblock(remote_host->simdata->sleeping[channel]);
/* Record where we are putting while blocked, then block until comm is
 * non-NULL. Nothing visible in this function sets comm to a non-NULL
 * value; MSG_task_get assigns it when it picks the task up. */
231 process->simdata->put_host = dest;
232 process->simdata->put_channel = channel;
233 while(!(task_simdata->comm))
234 __MSG_process_block();
235 surf_workstation_resource->common_public->action_use(task_simdata->comm);
236 process->simdata->put_host = NULL;
237 process->simdata->put_channel = -1;
240 PAJE_PROCESS_PUSH_STATE(process,"C");
/* Wait on task events for as long as the action is SURF_ACTION_RUNNING. */
242 state=surf_workstation_resource->common_public->action_get_state(task_simdata->comm);
243 while (state==SURF_ACTION_RUNNING) {
244 __MSG_task_wait_event(process, task);
245 state=surf_workstation_resource->common_public->action_get_state(task_simdata->comm);
/* NOTE(review): task_simdata is still dereferenced below, after this
 * MSG_task_destroy() call. That is only safe if the task's simdata
 * outlives the call (e.g. because the receiver still holds it) --
 * TODO confirm. */
248 MSG_task_destroy(task);
250 PAJE_PROCESS_POP_STATE(process);
/* Every exit path calls action_free() on the communication; comm is reset
 * to NULL only when action_free() returns non-zero. */
252 if(state == SURF_ACTION_DONE) {
253 if(surf_workstation_resource->common_public->action_free(task_simdata->comm))
254 task_simdata->comm = NULL;
256 } else if(surf_workstation_resource->extension_public->get_state(local_host->simdata->host)
258 if(surf_workstation_resource->common_public->action_free(task_simdata->comm))
259 task_simdata->comm = NULL;
260 MSG_RETURN(MSG_HOST_FAILURE);
262 if(surf_workstation_resource->common_public->action_free(task_simdata->comm))
263 task_simdata->comm = NULL;
264 MSG_RETURN(MSG_TRANSFER_FAILURE);
268 /** \ingroup msg_gos_functions
269 * \brief Does exactly the same as MSG_task_put but with a bounded transmission
/* MSG_task_put_bounded: store `max_rate` in the task's rate field and
 * delegate to MSG_task_put(), whose result is returned. */
/* NOTE(review): the declaration of `max_rate` and the braces are on lines
 * not shown in this listing (embedded numbering skips 275 -> 278). */
274 MSG_error_t MSG_task_put_bounded(m_task_t task,
275 m_host_t dest, m_channel_t channel,
278 task->simdata->rate=max_rate;
279 return(MSG_task_put(task, dest, channel));
/* NOTE(review): this statement follows the return directly and is
 * therefore never executed, so the rate is never reset to -1.0 here.
 * Moving it before the return would write to a task that has already been
 * handed to MSG_task_put(); decide whether to drop it -- TODO confirm. */
280 task->simdata->rate=-1.0;
283 /** \ingroup msg_gos_functions
284 * \brief Executes a task and waits for its termination.
286 * This function is used for describing the behavior of an agent. It
287 * takes only one parameter.
288 * \param task a #m_task_t to execute on the location on which the
290 * \return #MSG_FATAL if \a task is not properly initialized and
/* MSG_task_execute: start the computation of `task` for the calling
 * process via __MSG_task_execute(), then wait for it with
 * __MSG_wait_for_computation(), whose result is stored in `res`. */
/* NOTE(review): the declaration of `res`, the braces and the return
 * statement are on lines not shown in this listing. */
293 MSG_error_t MSG_task_execute(m_task_t task)
295 m_process_t process = MSG_process_self();
298 DEBUG1("Computing on %s", process->simdata->host->name);
300 __MSG_task_execute(process, task);
/* The wait is bracketed by a PAJE state push ("E") and pop. */
302 PAJE_PROCESS_PUSH_STATE(process,"E");
303 res = __MSG_wait_for_computation(process,task);
304 PAJE_PROCESS_POP_STATE(process);
/* __MSG_task_execute: create the surf execute action for `task` on the
 * host of `process`, using the task's computation_amount, store it in
 * simdata->compute and attach the task to the action as its data.
 * The visible code does not wait for the action. */
/* NOTE(review): braces and any lines between the visible ones are not
 * shown in this listing (embedded numbering skips 308 -> 310 -> 314). */
308 void __MSG_task_execute(m_process_t process, m_task_t task)
310 simdata_task_t simdata = NULL;
314 simdata = task->simdata;
316 simdata->compute = surf_workstation_resource->extension_public->
317 execute(MSG_process_get_host(process)->simdata->host,
318 simdata->computation_amount);
319 surf_workstation_resource->common_public->action_set_data(simdata->compute,task);
/* __MSG_wait_for_computation: wait on task events until the task's
 * compute action is no longer SURF_ACTION_RUNNING, then free the action
 * and map its final state to a return code. */
/* NOTE(review): the `do` keyword, the braces, the success return and the
 * value the host state is compared against are on lines not shown in
 * this listing. */
322 MSG_error_t __MSG_wait_for_computation(m_process_t process, m_task_t task)
324 e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
325 simdata_task_t simdata = task->simdata;
329 __MSG_task_wait_event(process, task);
330 state=surf_workstation_resource->common_public->action_get_state(simdata->compute);
331 } while (state==SURF_ACTION_RUNNING);
/* Every exit path calls action_free() on the compute action; the compute
 * field is reset to NULL only when action_free() returns non-zero. */
335 if(state == SURF_ACTION_DONE) {
336 if(surf_workstation_resource->common_public->action_free(simdata->compute))
337 simdata->compute = NULL;
339 } else if(surf_workstation_resource->extension_public->
340 get_state(MSG_process_get_host(process)->simdata->host)
342 if(surf_workstation_resource->common_public->action_free(simdata->compute))
343 simdata->compute = NULL;
344 MSG_RETURN(MSG_HOST_FAILURE);
/* NOTE(review): the remaining path reports MSG_TRANSFER_FAILURE although
 * this function waits for a computation, not a transfer -- confirm this
 * is the intended error code. */
346 if(surf_workstation_resource->common_public->action_free(simdata->compute))
347 simdata->compute = NULL;
348 MSG_RETURN(MSG_TRANSFER_FAILURE);
352 /** \ingroup msg_gos_functions
353 * \brief Sleep for the specified number of seconds
355 * Makes the current process sleep until \a nb_sec seconds have elapsed.
357 * \param nb_sec the number of seconds to sleep
/* MSG_process_sleep: make the calling process sleep.
 * A task named "MSG_sleep" is created with nb_sec as its second argument,
 * a surf sleep action is started on the process's host with that task's
 * computation_amount, and the function waits until the action is no
 * longer SURF_ACTION_RUNNING before cleaning up. */
/* NOTE(review): the `do` keyword, the braces, the success return and the
 * values the host state is compared against are on lines not shown in
 * this listing. */
359 MSG_error_t MSG_process_sleep(double nb_sec)
361 e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
362 m_process_t process = MSG_process_self();
363 m_task_t dummy = NULL;
364 simdata_task_t simdata = NULL;
/* Presumably the second argument of MSG_task_create() becomes the task's
 * computation_amount, which is what is passed to sleep() below --
 * TODO confirm against MSG_task_create(). */
367 dummy = MSG_task_create("MSG_sleep", nb_sec, 0.0, NULL);
368 simdata = dummy->simdata;
370 simdata->compute = surf_workstation_resource->extension_public->
371 sleep(MSG_process_get_host(process)->simdata->host,
372 simdata->computation_amount);
373 surf_workstation_resource->common_public->action_set_data(simdata->compute,dummy);
378 __MSG_task_wait_event(process, dummy);
379 state=surf_workstation_resource->common_public->action_get_state(simdata->compute);
380 } while (state==SURF_ACTION_RUNNING);
383 if(state == SURF_ACTION_DONE) {
/* NOTE(review): on the two MSG_HOST_FAILURE paths inside this branch the
 * sleep action is freed, but no MSG_task_destroy(dummy) is visible before
 * the return, so the dummy task looks leaked there -- confirm. */
384 if(surf_workstation_resource->extension_public->
385 get_state(MSG_process_get_host(process)->simdata->host)
387 if(surf_workstation_resource->common_public->action_free(simdata->compute))
388 simdata->compute = NULL;
389 MSG_RETURN(MSG_HOST_FAILURE);
/* If the process is flagged as blocked, unblock it. The call uses
 * MSG_process_self(), which is also what `process` was initialised from. */
391 if(__MSG_process_isBlocked(process)) {
392 __MSG_process_unblock(MSG_process_self());
394 if(surf_workstation_resource->extension_public->
395 get_state(MSG_process_get_host(process)->simdata->host)
397 if(surf_workstation_resource->common_public->action_free(simdata->compute))
398 simdata->compute = NULL;
399 MSG_RETURN(MSG_HOST_FAILURE);
/* Remaining path of this branch: free the action and destroy the dummy
 * task. */
401 if(surf_workstation_resource->common_public->action_free(simdata->compute))
402 simdata->compute = NULL;
403 MSG_task_destroy(dummy);
/* Any final state other than SURF_ACTION_DONE is reported as
 * MSG_HOST_FAILURE; no cleanup of the action or of dummy is visible on
 * this path. */
405 } else MSG_RETURN(MSG_HOST_FAILURE);
408 /** \ingroup msg_gos_functions
409 * \brief Return the number of MSG tasks currently running on
410 * the host of the currently running process.
/* MSG_get_msgload: return the size of the process_list of the host on
 * which the calling process runs. */
/* NOTE(review): the doc comment speaks of "MSG tasks", but the visible
 * code takes the size of a list named process_list -- confirm which of
 * the two is meant. The declaration of `process` is on a line not shown
 * in this listing. */
412 int MSG_get_msgload(void)
418 process = MSG_process_self();
419 return xbt_fifo_size(process->simdata->host->simdata->process_list);
422 /** \ingroup msg_gos_functions
424 * \brief Return the last value returned by an MSG function (except
/* MSG_get_errno: return whatever the PROCESS_GET_ERRNO() macro evaluates
 * to; presumably this is the error value stored for the current process
 * by MSG_RETURN() -- TODO confirm against the macro definitions. */
427 MSG_error_t MSG_get_errno(void)
429 return PROCESS_GET_ERRNO();