3 /* Copyright (c) 2002,2003,2004 Arnaud Legrand. All rights reserved. */
5 /* This program is free software; you can redistribute it and/or modify it
6 * under the terms of the license (GNU LGPL) which comes with this package. */
10 #include "xbt/error.h"
11 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(gos, msg,
12 "Logging specific to MSG (gos)");
14 /** \ingroup msg_gos_functions
15 * \brief This function is now deprecated and useless. Please stop using it.
17 MSG_error_t MSG_process_start(m_process_t process)
19 xbt_assert0(0,"This function is now deprecated and useless. Please stop using it.");
/** \ingroup msg_gos_functions
 * \brief Listen on a channel and wait for receiving a task.
 *
 * It takes two parameters.
 * \param task a memory location for storing a #m_task_t. It will
   hold a task when this function returns. Thus \a task should not
   be equal to \c NULL and \a *task should be equal to \c NULL. If one of
   those two conditions does not hold, there will be a warning message.
 * \param channel the channel on which the agent should be
   listening. This value has to be >=0 and < than the maximal
   number of channels fixed with MSG_set_channel_number().
 * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
 * if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
 */
/* NOTE(review): this span is extraction-damaged. Every line starts with a
 * stray number, and the gaps in that numbering (38->41, 41->44, 45->47,
 * 64->66, 85->88, ...) indicate dropped lines: the second parameter, the
 * braces, the declarations/assignments of `t` and `h`, the test guarding the
 * CRITICAL0 call, the opening of the do-loop and the right-hand side of both
 * get_state() comparisons are not visible. Restore the function from a
 * pristine copy before changing any logic here. */
38 MSG_error_t MSG_task_get(m_task_t * task,
41 m_process_t process = MSG_process_self();
44 simdata_task_t t_simdata = NULL;
45 simdata_host_t h_simdata = NULL;
47 e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
/* The output location itself must be provided. */
51 xbt_assert0(task,"Null pointer for the task\n");
/* NOTE(review): the condition that guards this message is not visible. */
54 CRITICAL0("MSG_task_get() was asked to write in a non empty task struct.");
58 h_simdata = h->simdata;
/* Pop the next task from this host's mailbox for `channel`; as long as the
 * mailbox is empty, register as the waiter on that channel and block. */
59 while ((t = xbt_fifo_pop(h_simdata->mbox[channel])) == NULL) {
/* Only one process may wait on a given channel of a host. */
60 xbt_assert0(!(h_simdata->sleeping[channel]),
61 "A process is already blocked on this channel");
62 h_simdata->sleeping[channel] = process; /* I'm waiting. Wake me up when you're ready */
63 __MSG_process_block();
/* NOTE(review): the right-hand side of this comparison is missing (64->66).
 * Also note the ordering: this early exit happens before sleeping[channel]
 * is reset two lines below. If MSG_RETURN leaves the function (presumably),
 * the slot stays set and the "already blocked" assertion above would fire on
 * the next call for this channel -- confirm whether that is intended. */
64 if(surf_workstation_resource->extension_public->get_state(h_simdata->host)
66 MSG_RETURN(MSG_HOST_FAILURE);
67 h_simdata->sleeping[channel] = NULL;
68 /* OK, we should both be ready now. Are you there ? */
71 t_simdata = t->simdata;
72 /* *task = __MSG_task_copy(t); */
/* Create the SURF communication action from the sender's host to this host,
 * passing the task's message_size and rate, then attach the task to it. */
78 t_simdata->comm = surf_workstation_resource->extension_public->
79 communicate(MSG_process_get_host(t_simdata->sender)->simdata->host,
80 h->simdata->host, t_simdata->message_size,t_simdata->rate);
82 surf_workstation_resource->common_public->action_set_data(t_simdata->comm,t);
/* MSG_task_put() blocks the sender while the task's comm is still unset;
 * now that it is set, wake the sender if it is blocked. */
84 if(__MSG_process_isBlocked(t_simdata->sender))
85 __MSG_process_unblock(t_simdata->sender);
/* Body of a do { } while loop whose opening line is missing: keep waiting
 * for events until the communication action is no longer RUNNING. */
88 __MSG_task_wait_event(process, t);
89 state=surf_workstation_resource->common_public->action_get_state(t_simdata->comm);
90 } while (state==SURF_ACTION_RUNNING);
/* When the task's `using` counter is still above 1, this process is put back
 * on msg_global->process_to_run. NOTE(review): the rest of this block
 * (93->97) is not visible. */
92 if(t->simdata->using>1) {
93 xbt_fifo_unshift(msg_global->process_to_run,process);
/* Translate the final action state into the function's result. */
97 if(state == SURF_ACTION_DONE) MSG_RETURN(MSG_OK);
/* NOTE(review): comparison operand missing here as well (98->100). */
98 else if(surf_workstation_resource->extension_public->get_state(h_simdata->host)
100 MSG_RETURN(MSG_HOST_FAILURE);
101 else MSG_RETURN(MSG_TRANSFER_FAILURE);
104 /** \ingroup msg_gos_functions
105 * \brief Test whether there is a pending communication on a channel.
107 * It takes one parameter.
108 * \param channel the channel on which the agent should be
109 listening. This value has to be >=0 and < than the maximal
110 number of channels fixed with MSG_set_channel_number().
111 * \return 1 if there is a pending communication and 0 otherwise
113 int MSG_task_Iprobe(m_channel_t channel)
116 simdata_host_t h_simdata = NULL;
120 h_simdata = h->simdata;
121 return(xbt_fifo_getFirstItem(h_simdata->mbox[channel])!=NULL);
124 /** \ingroup msg_gos_functions
125 * \brief Test whether there is a pending communication on a channel, and who sent it.
127 * It takes one parameter.
128 * \param channel the channel on which the agent should be
129 listening. This value has to be >=0 and < than the maximal
130 number of channels fixed with MSG_set_channel_number().
131 * \return -1 if there is no pending communication and the PID of the process who sent it otherwise
133 int MSG_task_probe_from(m_channel_t channel)
136 simdata_host_t h_simdata = NULL;
137 xbt_fifo_item_t item;
142 h_simdata = h->simdata;
144 item = xbt_fifo_getFirstItem(((simdata_host_t)h->simdata)->mbox[channel]);
145 if (!item || !(t = xbt_fifo_get_item_content(item)) || (simdata_task_t)t->simdata)
148 return MSG_process_get_PID(((simdata_task_t)t->simdata)->sender);
/** \ingroup msg_gos_functions
 * \brief Put a task on a channel of a host and wait for the end of the
 * transmission.
 *
 * This function is used for describing the behavior of an agent. It
 * takes three parameters.
 * \param task a #m_task_t to send to another location. This task
   will not be usable anymore when the function returns. There is
   no automatic task duplication and you have to save your parameters
   before calling this function. Tasks are unique and once one has been
   sent to another location, you should not access it anymore. You do
   not need to call MSG_task_destroy() but to avoid using, as an
   effect of inattention, this task anymore, you definitely should
   reinitialize it with #MSG_TASK_UNINITIALIZED. Note that this task
   can be transferred iff it has been correctly created with
   MSG_task_create().
 * \param dest the destination of the message
 * \param channel the channel on which the agent should put this
   task. This value has to be >=0 and < than the maximal number of
   channels fixed with MSG_set_channel_number().
 * \return #MSG_FATAL if \a task is not properly initialized and
 * (the remainder of this sentence is missing from this copy of the file).
 */
/* NOTE(review): this span is extraction-damaged. Every line starts with a
 * stray number, and the gaps in that numbering (175->177, 181->185,
 * 190->193, 204->207, 210->213, 216->218, ...) indicate dropped lines: the
 * braces, at least one assignment and the right-hand side of the get_state()
 * comparison are not visible. Restore the function from a pristine copy
 * before changing any logic here. */
174 MSG_error_t MSG_task_put(m_task_t task,
175 m_host_t dest, m_channel_t channel)
177 m_process_t process = MSG_process_self();
178 simdata_task_t task_simdata = NULL;
179 e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
180 m_host_t local_host = NULL;
181 m_host_t remote_host = NULL;
/* Record the calling process as the sender. The task must have a `using`
 * count of exactly 1, and its communication action starts out unset. */
185 task_simdata = task->simdata;
186 task_simdata->sender = process;
187 xbt_assert0(task_simdata->using==1,"Gargl!");
188 task_simdata->comm = NULL;
190 local_host = ((simdata_process_t) process->simdata)->host;
/* NOTE(review): `remote_host` is initialised to NULL above and dereferenced
 * here with no visible assignment in between (gap 190->193). Presumably the
 * lost line was `remote_host = dest;` -- confirm. */
/* Append the task to the destination mailbox for `channel`. */
193 xbt_fifo_push(((simdata_host_t) remote_host->simdata)->
194 mbox[channel], task);
/* Wake the process registered as waiting on that channel, if there is one. */
196 if(remote_host->simdata->sleeping[channel])
197 __MSG_process_unblock(remote_host->simdata->sleeping[channel]);
/* Record the destination and channel in the process's simdata, then block
 * until the task's comm is set (MSG_task_get() sets it and unblocks the
 * sender); the record is cleared afterwards. */
199 process->simdata->put_host = dest;
200 process->simdata->put_channel = channel;
201 while(!(task_simdata->comm))
202 __MSG_process_block();
203 process->simdata->put_host = NULL;
204 process->simdata->put_channel = -1;
/* Wait for events until the communication action is no longer RUNNING.
 * NOTE(review): the closing brace of this loop (210->213) is not visible. */
207 state=surf_workstation_resource->common_public->action_get_state(task_simdata->comm);
208 while (state==SURF_ACTION_RUNNING) {
209 __MSG_task_wait_event(process, task);
210 state=surf_workstation_resource->common_public->action_get_state(task_simdata->comm);
/* MSG_task_destroy() is called on the sent task here, whatever the final
 * state of the transfer. */
213 MSG_task_destroy(task);
/* Translate the final action state into the function's result; the host
 * examined on failure is the sender's own (local_host). */
215 if(state == SURF_ACTION_DONE) MSG_RETURN(MSG_OK);
/* NOTE(review): the right-hand side of this comparison is missing (216->218). */
216 else if(surf_workstation_resource->extension_public->get_state(local_host->simdata->host)
218 MSG_RETURN(MSG_HOST_FAILURE);
219 else MSG_RETURN(MSG_TRANSFER_FAILURE);
222 /** \ingroup msg_gos_functions
223 * \brief Does exactly the same as MSG_task_put but with a bounded transmition
228 MSG_error_t MSG_task_put_bounded(m_task_t task,
229 m_host_t dest, m_channel_t channel,
230 long double max_rate)
232 task->simdata->rate=max_rate;
233 return(MSG_task_put(task, dest, channel));
236 /** \ingroup msg_gos_functions
237 * \brief Executes a task and waits for its termination.
239 * This function is used for describing the behavior of an agent. It
240 * takes only one parameter.
241 * \param task a #m_task_t to execute on the location on which the
243 * \return #MSG_FATAL if \a task is not properly initialized and
246 MSG_error_t MSG_task_execute(m_task_t task)
248 m_process_t process = MSG_process_self();
250 __MSG_task_execute(process, task);
251 return __MSG_wait_for_computation(process,task);
254 void __MSG_task_execute(m_process_t process, m_task_t task)
256 simdata_task_t simdata = NULL;
260 simdata = task->simdata;
262 simdata->compute = surf_workstation_resource->extension_public->
263 execute(MSG_process_get_host(process)->simdata->host,
264 simdata->computation_amount);
265 surf_workstation_resource->common_public->action_set_data(simdata->compute,task);
/* Wait until the task's `compute` action is no longer RUNNING and translate
 * its final state into an MSG_error_t.
 *
 * NOTE(review): this span is extraction-damaged. Every line starts with a
 * stray number, and the gaps in that numbering (268->270, 271->275,
 * 277->281, 283->285) indicate dropped lines: the braces, the opening of the
 * do-loop (and whatever preceded it) and the right-hand side of the
 * get_state() comparison are not visible. Restore the function from a
 * pristine copy before changing any logic here. */
268 MSG_error_t __MSG_wait_for_computation(m_process_t process, m_task_t task)
270 e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
271 simdata_task_t simdata = task->simdata;
/* Body of a do { } while loop whose opening line is missing. */
275 __MSG_task_wait_event(process, task);
276 state=surf_workstation_resource->common_public->action_get_state(simdata->compute);
277 } while (state==SURF_ACTION_RUNNING);
281 if(state == SURF_ACTION_DONE) MSG_RETURN(MSG_OK);
/* NOTE(review): the right-hand side of this comparison is missing (283->285). */
282 else if(surf_workstation_resource->extension_public->
283 get_state(MSG_process_get_host(process)->simdata->host)
285 MSG_RETURN(MSG_HOST_FAILURE);
/* NOTE(review): the fallback reports a *transfer* failure although the action
 * waited on is the task's computation -- looks carried over from the
 * communication functions; confirm the intended code. */
286 else MSG_RETURN(MSG_TRANSFER_FAILURE);
/** \ingroup msg_gos_functions
 * \brief Sleep for the specified number of seconds
 *
 * Makes the current process sleep until \a nb_sec seconds have elapsed.
 *
 * \param nb_sec a number of seconds
 */
/* NOTE(review): this span is extraction-damaged. Every line starts with a
 * stray number, and the gaps in that numbering (296->298, 301->304,
 * 310->315, 317->320, 322->324, 327->329, 330->332, 333->335) indicate
 * dropped lines: the braces, the opening of the do-loop, the right-hand side
 * of both get_state() comparisons and whatever followed MSG_task_destroy()
 * are not visible. Restore the function from a pristine copy before changing
 * any logic here. */
296 MSG_error_t MSG_process_sleep(long double nb_sec)
298 e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
299 m_process_t process = MSG_process_self();
300 m_task_t dummy = NULL;
301 simdata_task_t simdata = NULL;
/* A throw-away task carries the sleep. nb_sec is passed as the second
 * argument of MSG_task_create() and the duration handed to SURF below is
 * read back from simdata->computation_amount -- presumably the same value;
 * confirm against MSG_task_create(). */
304 dummy = MSG_task_create("MSG_sleep", nb_sec, 0.0, NULL);
305 simdata = dummy->simdata;
/* Create the SURF sleep action on this process's host and attach the dummy
 * task to it. */
307 simdata->compute = surf_workstation_resource->extension_public->
308 sleep(MSG_process_get_host(process)->simdata->host,
309 simdata->computation_amount);
310 surf_workstation_resource->common_public->action_set_data(simdata->compute,dummy);
/* Body of a do { } while loop whose opening line is missing: wait for events
 * until the sleep action is no longer RUNNING. */
315 __MSG_task_wait_event(process, dummy);
316 state=surf_workstation_resource->common_public->action_get_state(simdata->compute);
317 } while (state==SURF_ACTION_RUNNING);
320 if(state == SURF_ACTION_DONE) {
/* NOTE(review): comparison operand missing (322->324). MSG_task_destroy(dummy)
 * is only reached further down, so if MSG_RETURN leaves the function
 * (presumably) every MSG_HOST_FAILURE exit in this function skips it and
 * `dummy` is not destroyed -- confirm and release it on those paths. */
321 if(surf_workstation_resource->extension_public->
322 get_state(MSG_process_get_host(process)->simdata->host)
324 MSG_RETURN(MSG_HOST_FAILURE);
/* `process` was obtained from MSG_process_self() above, so this unblocks the
 * calling process itself when it is flagged as blocked. */
326 if(__MSG_process_isBlocked(process)) {
327 __MSG_process_unblock(MSG_process_self());
/* NOTE(review): same host-state check as a few lines above, repeated after
 * the unblock; operand missing here too (330->332). */
329 if(surf_workstation_resource->extension_public->
330 get_state(MSG_process_get_host(process)->simdata->host)
332 MSG_RETURN(MSG_HOST_FAILURE);
333 MSG_task_destroy(dummy);
/* NOTE(review): the statement that produced the success result (333->335)
 * is not visible. Any final state other than DONE is reported as a host
 * failure. */
335 } else MSG_RETURN(MSG_HOST_FAILURE);
/** \ingroup msg_gos_functions
 * \brief Return the number of MSG tasks currently running on
 * the host of the current running process.
 *
 * Not implemented: the body consists of an assertion on a constant-false
 * condition, so any call trips it.
 *
 * \return a placeholder value (see the review note in the body).
 */
int MSG_get_msgload(void)
{
  xbt_assert0(0, "Not implemented yet!");

  /* NOTE(review): the tail of this function is missing from the damaged copy
   * this was rebuilt from. A non-void function needs a return value; 0 is a
   * placeholder, not a measured load -- confirm the original value against a
   * pristine copy. */
  return 0;
}
350 /** \ingroup msg_gos_functions
352 * \brief Return the the last value returned by a MSG function (except
355 MSG_error_t MSG_get_errno(void)
357 return PROCESS_GET_ERRNO();