3 /* Copyright (c) 2002,2003,2004 Arnaud Legrand. All rights reserved. */
5 /* This program is free software; you can redistribute it and/or modify it
6 * under the terms of the license (GNU LGPL) which comes with this package. */
10 #include "xbt/error.h"
11 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(gos, msg,
12 "Logging specific to MSG (gos)");
14 /** \ingroup msg_gos_functions
15 * \brief This function is now deprecated and useless. Please stop using it.
18 MSG_error_t MSG_process_start(m_process_t process)
20 xbt_assert0(0,"This function is now deprecated and useless. Please stop using it.");
25 /** \ingroup msg_gos_functions
26 * \brief Listen on a channel and wait for receiving a task.
28 * It takes two parameter.
29 * \param task a memory location for storing a #m_task_t. It will
30 hold a task when this function will return. Thus \a task should not
31 be equal to \c NULL and \a *task should be equal to \c NULL. If one of
32 those two condition does not hold, there will be a warning message.
33 * \param channel the channel on which the agent should be
34 listening. This value has to be >=0 and < than the maximal
35 number of channels fixed with MSG_set_channel_number().
36 * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
37 * if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
39 MSG_error_t MSG_task_get(m_task_t * task,
42 m_process_t process = MSG_process_self();
45 simdata_task_t t_simdata = NULL;
46 simdata_host_t h_simdata = NULL;
48 e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
52 xbt_assert0(task,"Null pointer for the task\n");
55 CRITICAL0("MSG_task_get() was asked to write in a non empty task struct.");
59 h_simdata = h->simdata;
60 while ((t = xbt_fifo_pop(h_simdata->mbox[channel])) == NULL) {
61 xbt_assert0(!(h_simdata->sleeping[channel]),
62 "A process is already blocked on this channel");
63 h_simdata->sleeping[channel] = process; /* I'm waiting. Wake me up when you're ready */
64 __MSG_process_block();
65 if(surf_workstation_resource->extension_public->get_state(h_simdata->host)
67 MSG_RETURN(MSG_HOST_FAILURE);
68 h_simdata->sleeping[channel] = NULL;
69 /* OK, we should both be ready now. Are you there ? */
72 t_simdata = t->simdata;
73 /* *task = __MSG_task_copy(t); */
79 t_simdata->comm = surf_workstation_resource->extension_public->
80 communicate(MSG_process_get_host(t_simdata->sender)->simdata->host,
81 h->simdata->host, t_simdata->message_size,t_simdata->rate);
83 surf_workstation_resource->common_public->action_set_data(t_simdata->comm,t);
85 if(__MSG_process_isBlocked(t_simdata->sender))
86 __MSG_process_unblock(t_simdata->sender);
89 __MSG_task_wait_event(process, t);
90 state=surf_workstation_resource->common_public->action_get_state(t_simdata->comm);
91 } while (state==SURF_ACTION_RUNNING);
93 if(state == SURF_ACTION_DONE) MSG_RETURN(MSG_OK);
94 else if(surf_workstation_resource->extension_public->get_state(h_simdata->host)
96 MSG_RETURN(MSG_HOST_FAILURE);
97 else MSG_RETURN(MSG_TRANSFER_FAILURE);
100 /** \ingroup msg_gos_functions
101 * \brief Test whether there is a pending communication on a channel.
103 * It takes one parameter.
104 * \param channel the channel on which the agent should be
105 listening. This value has to be >=0 and < than the maximal
106 number of channels fixed with MSG_set_channel_number().
107 * \return 1 if there is a pending communication and 0 otherwise
109 int MSG_task_Iprobe(m_channel_t channel)
112 simdata_host_t h_simdata = NULL;
116 h_simdata = h->simdata;
117 return(xbt_fifo_getFirstItem(h_simdata->mbox[channel])!=NULL);
120 /** \ingroup msg_gos_functions
121 * \brief Test whether there is a pending communication on a channel, and who sent it.
123 * It takes one parameter.
124 * \param channel the channel on which the agent should be
125 listening. This value has to be >=0 and < than the maximal
126 number of channels fixed with MSG_set_channel_number().
127 * \return -1 if there is no pending communication and the PID of the process who sent it otherwise
129 int MSG_task_probe_from(m_channel_t channel)
132 simdata_host_t h_simdata = NULL;
133 xbt_fifo_item_t item;
138 h_simdata = h->simdata;
140 item = xbt_fifo_getFirstItem(((simdata_host_t)h->simdata)->mbox[channel]);
141 if (!item || !(t = xbt_fifo_get_item_content(item)) || (simdata_task_t)t->simdata)
144 return MSG_process_get_PID(((simdata_task_t)t->simdata)->sender);
147 /** \ingroup msg_gos_functions
148 * \brief Put a task on a channel of an host and waits for the end of the
151 * This function is used for describing the behavior of an agent. It
152 * takes three parameter.
153 * \param task a #m_task_t to send on another location. This task
154 will not be usable anymore when the function will return. There is
155 no automatic task duplication and you have to save your parameters
156 before calling this function. Tasks are unique and once it has been
157 sent to another location, you should not access it anymore. You do
158 not need to call MSG_task_destroy() but to avoid using, as an
159 effect of inattention, this task anymore, you definitely should
160 renitialize it with #MSG_TASK_UNINITIALIZED. Note that this task
161 can be transfered iff it has been correctly created with
163 * \param dest the destination of the message
164 * \param channel the channel on which the agent should put this
165 task. This value has to be >=0 and < than the maximal number of
166 channels fixed with MSG_set_channel_number().
167 * \return #MSG_FATAL if \a task is not properly initialized and
170 MSG_error_t MSG_task_put(m_task_t task,
171 m_host_t dest, m_channel_t channel)
173 m_process_t process = MSG_process_self();
174 simdata_task_t task_simdata = NULL;
175 e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
176 m_host_t local_host = NULL;
177 m_host_t remote_host = NULL;
181 task_simdata = task->simdata;
182 task_simdata->sender = process;
183 xbt_assert0(task_simdata->using==1,"Gargl!");
184 task_simdata->comm = NULL;
186 local_host = ((simdata_process_t) process->simdata)->host;
189 xbt_fifo_push(((simdata_host_t) remote_host->simdata)->
190 mbox[channel], task);
192 if(remote_host->simdata->sleeping[channel])
193 __MSG_process_unblock(remote_host->simdata->sleeping[channel]);
195 process->simdata->put_host = dest;
196 process->simdata->put_channel = channel;
197 while(!(task_simdata->comm))
198 __MSG_process_block();
199 process->simdata->put_host = NULL;
200 process->simdata->put_channel = -1;
203 state=surf_workstation_resource->common_public->action_get_state(task_simdata->comm);
204 while (state==SURF_ACTION_RUNNING) {
205 __MSG_task_wait_event(process, task);
206 state=surf_workstation_resource->common_public->action_get_state(task_simdata->comm);
209 MSG_task_destroy(task);
211 if(state == SURF_ACTION_DONE) MSG_RETURN(MSG_OK);
212 else if(surf_workstation_resource->extension_public->get_state(local_host->simdata->host)
214 MSG_RETURN(MSG_HOST_FAILURE);
215 else MSG_RETURN(MSG_TRANSFER_FAILURE);
218 MSG_error_t MSG_task_put_bounded(m_task_t task,
219 m_host_t dest, m_channel_t channel,
220 long double max_rate)
222 task->simdata->rate=max_rate;
223 return(MSG_task_put(task, dest, channel));
226 /** \ingroup msg_gos_functions
227 * \brief Executes a task and waits for its termination.
229 * This function is used for describing the behavior of an agent. It
230 * takes only one parameter.
231 * \param task a #m_task_t to execute on the location on which the
233 * \return #MSG_FATAL if \a task is not properly initialized and
236 MSG_error_t MSG_task_execute(m_task_t task)
238 m_process_t process = MSG_process_self();
240 __MSG_task_execute(process, task);
241 return __MSG_wait_for_computation(process,task);
244 void __MSG_task_execute(m_process_t process, m_task_t task)
246 simdata_task_t simdata = NULL;
250 simdata = task->simdata;
252 simdata->compute = surf_workstation_resource->extension_public->
253 execute(MSG_process_get_host(process)->simdata->host,
254 simdata->computation_amount);
255 surf_workstation_resource->common_public->action_set_data(simdata->compute,task);
258 MSG_error_t __MSG_wait_for_computation(m_process_t process, m_task_t task)
260 e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
261 simdata_task_t simdata = task->simdata;
265 __MSG_task_wait_event(process, task);
266 state=surf_workstation_resource->common_public->action_get_state(simdata->compute);
267 } while (state==SURF_ACTION_RUNNING);
271 if(state == SURF_ACTION_DONE) MSG_RETURN(MSG_OK);
272 else if(surf_workstation_resource->extension_public->
273 get_state(MSG_process_get_host(process)->simdata->host)
275 MSG_RETURN(MSG_HOST_FAILURE);
276 else MSG_RETURN(MSG_TRANSFER_FAILURE);
279 /** \ingroup msg_gos_functions
280 * \brief Sleep for the specified number of seconds
282 * Makes the current process sleep until \a time seconds have elapsed.
284 * \param nb_sec a number of second
286 MSG_error_t MSG_process_sleep(long double nb_sec)
288 e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
289 m_process_t process = MSG_process_self();
290 m_task_t dummy = NULL;
291 simdata_task_t simdata = NULL;
294 dummy = MSG_task_create("MSG_sleep", nb_sec, 0.0, NULL);
295 simdata = dummy->simdata;
297 simdata->compute = surf_workstation_resource->extension_public->
298 sleep(MSG_process_get_host(process)->simdata->host,
299 simdata->computation_amount);
300 surf_workstation_resource->common_public->action_set_data(simdata->compute,dummy);
305 __MSG_task_wait_event(process, dummy);
306 state=surf_workstation_resource->common_public->action_get_state(simdata->compute);
307 } while (state==SURF_ACTION_RUNNING);
310 if(state == SURF_ACTION_DONE) {
311 if(surf_workstation_resource->extension_public->
312 get_state(MSG_process_get_host(process)->simdata->host)
314 MSG_RETURN(MSG_HOST_FAILURE);
316 if(__MSG_process_isBlocked(process)) {
317 __MSG_process_unblock(MSG_process_self());
319 if(surf_workstation_resource->extension_public->
320 get_state(MSG_process_get_host(process)->simdata->host)
322 MSG_RETURN(MSG_HOST_FAILURE);
323 MSG_task_destroy(dummy);
325 } else MSG_RETURN(MSG_HOST_FAILURE);
328 /** \ingroup msg_gos_functions
329 * \brief Return the number of MSG tasks currently running on a
330 * the host of the current running process.
332 int MSG_get_msgload(void)
335 xbt_assert0(0,"Not implemented yet!");
340 /** \ingroup msg_gos_functions
342 * \brief Return the the last value returned by a MSG function (except
345 MSG_error_t MSG_get_errno(void)
347 return PROCESS_GET_ERRNO();