3 /* Copyright (c) 2002,2003,2004 Arnaud Legrand. All rights reserved. */
5 /* This program is free software; you can redistribute it and/or modify it
6 * under the terms of the license (GNU LGPL) which comes with this package. */
10 #include "xbt/error.h"
11 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(gos, msg,
12 "Logging specific to MSG (gos)");
14 /** \ingroup msg_gos_functions
15 * \brief This function is now deprecated and useless. Please stop using it.
18 MSG_error_t MSG_process_start(m_process_t process)
20 xbt_assert0(0,"This function is now deprecated and useless. Please stop using it.");
25 /** \ingroup msg_gos_functions
26 * \brief Listen on a channel and wait for receiving a task.
28 * It takes two parameter.
29 * \param task a memory location for storing a #m_task_t. It will
30 hold a task when this function will return. Thus \a task should not
31 be equal to \c NULL and \a *task should be equal to \c NULL. If one of
32 those two condition does not hold, there will be a warning message.
33 * \param channel the channel on which the agent should be
34 listening. This value has to be >=0 and < than the maximal
35 number of channels fixed with MSG_set_channel_number().
36 * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
37 * if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
39 MSG_error_t MSG_task_get(m_task_t * task,
42 m_process_t process = MSG_process_self();
45 simdata_task_t t_simdata = NULL;
46 simdata_host_t h_simdata = NULL;
48 e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
52 xbt_assert0(task,"Null pointer for the task\n");
55 CRITICAL0("MSG_task_get() was asked to write in a non empty task struct.");
59 h_simdata = h->simdata;
60 while ((t = xbt_fifo_pop(h_simdata->mbox[channel])) == NULL) {
61 xbt_assert0(!(h_simdata->sleeping[channel]),
62 "A process is already blocked on this channel");
63 h_simdata->sleeping[channel] = process; /* I'm waiting. Wake me up when you're ready */
64 __MSG_process_block();
65 if(surf_workstation_resource->extension_public->get_state(h_simdata->host)
67 MSG_RETURN(MSG_HOST_FAILURE);
68 h_simdata->sleeping[channel] = NULL;
69 /* OK, we should both be ready now. Are you there ? */
72 t_simdata = t->simdata;
73 /* *task = __MSG_task_copy(t); */
79 t_simdata->comm = surf_workstation_resource->extension_public->
80 communicate(MSG_process_get_host(t_simdata->sender)->simdata->host,
81 h->simdata->host, t_simdata->message_size,t_simdata->rate);
83 surf_workstation_resource->common_public->action_set_data(t_simdata->comm,t);
85 if(__MSG_process_isBlocked(t_simdata->sender))
86 __MSG_process_unblock(t_simdata->sender);
89 __MSG_task_wait_event(process, t);
90 state=surf_workstation_resource->common_public->action_get_state(t_simdata->comm);
91 } while (state==SURF_ACTION_RUNNING);
93 if(t->simdata->using>1) {
94 xbt_fifo_unshift(msg_global->process_to_run,process);
98 if(state == SURF_ACTION_DONE) MSG_RETURN(MSG_OK);
99 else if(surf_workstation_resource->extension_public->get_state(h_simdata->host)
101 MSG_RETURN(MSG_HOST_FAILURE);
102 else MSG_RETURN(MSG_TRANSFER_FAILURE);
105 /** \ingroup msg_gos_functions
106 * \brief Test whether there is a pending communication on a channel.
108 * It takes one parameter.
109 * \param channel the channel on which the agent should be
110 listening. This value has to be >=0 and < than the maximal
111 number of channels fixed with MSG_set_channel_number().
112 * \return 1 if there is a pending communication and 0 otherwise
114 int MSG_task_Iprobe(m_channel_t channel)
117 simdata_host_t h_simdata = NULL;
121 h_simdata = h->simdata;
122 return(xbt_fifo_getFirstItem(h_simdata->mbox[channel])!=NULL);
125 /** \ingroup msg_gos_functions
126 * \brief Test whether there is a pending communication on a channel, and who sent it.
128 * It takes one parameter.
129 * \param channel the channel on which the agent should be
130 listening. This value has to be >=0 and < than the maximal
131 number of channels fixed with MSG_set_channel_number().
132 * \return -1 if there is no pending communication and the PID of the process who sent it otherwise
134 int MSG_task_probe_from(m_channel_t channel)
137 simdata_host_t h_simdata = NULL;
138 xbt_fifo_item_t item;
143 h_simdata = h->simdata;
145 item = xbt_fifo_getFirstItem(((simdata_host_t)h->simdata)->mbox[channel]);
146 if (!item || !(t = xbt_fifo_get_item_content(item)) || (simdata_task_t)t->simdata)
149 return MSG_process_get_PID(((simdata_task_t)t->simdata)->sender);
152 /** \ingroup msg_gos_functions
153 * \brief Put a task on a channel of an host and waits for the end of the
156 * This function is used for describing the behavior of an agent. It
157 * takes three parameter.
158 * \param task a #m_task_t to send on another location. This task
159 will not be usable anymore when the function will return. There is
160 no automatic task duplication and you have to save your parameters
161 before calling this function. Tasks are unique and once it has been
162 sent to another location, you should not access it anymore. You do
163 not need to call MSG_task_destroy() but to avoid using, as an
164 effect of inattention, this task anymore, you definitely should
165 renitialize it with #MSG_TASK_UNINITIALIZED. Note that this task
166 can be transfered iff it has been correctly created with
168 * \param dest the destination of the message
169 * \param channel the channel on which the agent should put this
170 task. This value has to be >=0 and < than the maximal number of
171 channels fixed with MSG_set_channel_number().
172 * \return #MSG_FATAL if \a task is not properly initialized and
175 MSG_error_t MSG_task_put(m_task_t task,
176 m_host_t dest, m_channel_t channel)
178 m_process_t process = MSG_process_self();
179 simdata_task_t task_simdata = NULL;
180 e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
181 m_host_t local_host = NULL;
182 m_host_t remote_host = NULL;
186 task_simdata = task->simdata;
187 task_simdata->sender = process;
188 xbt_assert0(task_simdata->using==1,"Gargl!");
189 task_simdata->comm = NULL;
191 local_host = ((simdata_process_t) process->simdata)->host;
194 xbt_fifo_push(((simdata_host_t) remote_host->simdata)->
195 mbox[channel], task);
197 if(remote_host->simdata->sleeping[channel])
198 __MSG_process_unblock(remote_host->simdata->sleeping[channel]);
200 process->simdata->put_host = dest;
201 process->simdata->put_channel = channel;
202 while(!(task_simdata->comm))
203 __MSG_process_block();
204 process->simdata->put_host = NULL;
205 process->simdata->put_channel = -1;
208 state=surf_workstation_resource->common_public->action_get_state(task_simdata->comm);
209 while (state==SURF_ACTION_RUNNING) {
210 __MSG_task_wait_event(process, task);
211 state=surf_workstation_resource->common_public->action_get_state(task_simdata->comm);
214 MSG_task_destroy(task);
216 if(state == SURF_ACTION_DONE) MSG_RETURN(MSG_OK);
217 else if(surf_workstation_resource->extension_public->get_state(local_host->simdata->host)
219 MSG_RETURN(MSG_HOST_FAILURE);
220 else MSG_RETURN(MSG_TRANSFER_FAILURE);
223 MSG_error_t MSG_task_put_bounded(m_task_t task,
224 m_host_t dest, m_channel_t channel,
225 long double max_rate)
227 task->simdata->rate=max_rate;
228 return(MSG_task_put(task, dest, channel));
231 /** \ingroup msg_gos_functions
232 * \brief Executes a task and waits for its termination.
234 * This function is used for describing the behavior of an agent. It
235 * takes only one parameter.
236 * \param task a #m_task_t to execute on the location on which the
238 * \return #MSG_FATAL if \a task is not properly initialized and
241 MSG_error_t MSG_task_execute(m_task_t task)
243 m_process_t process = MSG_process_self();
245 __MSG_task_execute(process, task);
246 return __MSG_wait_for_computation(process,task);
249 void __MSG_task_execute(m_process_t process, m_task_t task)
251 simdata_task_t simdata = NULL;
255 simdata = task->simdata;
257 simdata->compute = surf_workstation_resource->extension_public->
258 execute(MSG_process_get_host(process)->simdata->host,
259 simdata->computation_amount);
260 surf_workstation_resource->common_public->action_set_data(simdata->compute,task);
263 MSG_error_t __MSG_wait_for_computation(m_process_t process, m_task_t task)
265 e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
266 simdata_task_t simdata = task->simdata;
270 __MSG_task_wait_event(process, task);
271 state=surf_workstation_resource->common_public->action_get_state(simdata->compute);
272 } while (state==SURF_ACTION_RUNNING);
276 if(state == SURF_ACTION_DONE) MSG_RETURN(MSG_OK);
277 else if(surf_workstation_resource->extension_public->
278 get_state(MSG_process_get_host(process)->simdata->host)
280 MSG_RETURN(MSG_HOST_FAILURE);
281 else MSG_RETURN(MSG_TRANSFER_FAILURE);
284 /** \ingroup msg_gos_functions
285 * \brief Sleep for the specified number of seconds
287 * Makes the current process sleep until \a time seconds have elapsed.
289 * \param nb_sec a number of second
291 MSG_error_t MSG_process_sleep(long double nb_sec)
293 e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
294 m_process_t process = MSG_process_self();
295 m_task_t dummy = NULL;
296 simdata_task_t simdata = NULL;
299 dummy = MSG_task_create("MSG_sleep", nb_sec, 0.0, NULL);
300 simdata = dummy->simdata;
302 simdata->compute = surf_workstation_resource->extension_public->
303 sleep(MSG_process_get_host(process)->simdata->host,
304 simdata->computation_amount);
305 surf_workstation_resource->common_public->action_set_data(simdata->compute,dummy);
310 __MSG_task_wait_event(process, dummy);
311 state=surf_workstation_resource->common_public->action_get_state(simdata->compute);
312 } while (state==SURF_ACTION_RUNNING);
315 if(state == SURF_ACTION_DONE) {
316 if(surf_workstation_resource->extension_public->
317 get_state(MSG_process_get_host(process)->simdata->host)
319 MSG_RETURN(MSG_HOST_FAILURE);
321 if(__MSG_process_isBlocked(process)) {
322 __MSG_process_unblock(MSG_process_self());
324 if(surf_workstation_resource->extension_public->
325 get_state(MSG_process_get_host(process)->simdata->host)
327 MSG_RETURN(MSG_HOST_FAILURE);
328 MSG_task_destroy(dummy);
330 } else MSG_RETURN(MSG_HOST_FAILURE);
333 /** \ingroup msg_gos_functions
334 * \brief Return the number of MSG tasks currently running on a
335 * the host of the current running process.
337 int MSG_get_msgload(void)
340 xbt_assert0(0,"Not implemented yet!");
345 /** \ingroup msg_gos_functions
347 * \brief Return the the last value returned by a MSG function (except
350 MSG_error_t MSG_get_errno(void)
352 return PROCESS_GET_ERRNO();