3 /* Copyright (c) 2002,2003,2004 Arnaud Legrand. All rights reserved. */
5 /* This program is free software; you can redistribute it and/or modify it
6 * under the terms of the license (GNU LGPL) which comes with this package. */
10 #include "xbt/error.h"
/* Log category for this file: named "gos" with "msg" given as its parent.
 * NOTE(review): judging by the macro name this also becomes the default
 * category for the CRITICAL0-style calls below - confirm in the xbt log API. */
11 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(gos, msg,
12 "Logging specific to MSG (gos)");
14 /** \ingroup msg_gos_functions
15 * \brief This function is now deprecated and useless. Please stop using it.
/* Deprecated stub: the only visible statement asserts on the constant 0, so
 * calling it is presumably meant to fail with the deprecation message.
 * NOTE(review): this listing skips source lines (embedded numbers jump from
 * 17 to 19); the braces and any return statement are not visible here. */
17 MSG_error_t MSG_process_start(m_process_t process)
19 xbt_assert0(0,"This function is now deprecated and useless. Please stop using it.");
24 /** \ingroup msg_gos_functions
25 * \brief Listen on a channel and wait for receiving a task.
27 * It takes two parameters.
28 * \param task a memory location for storing a #m_task_t. It will
29 hold a task when this function will return. Thus \a task should not
30 be equal to \c NULL and \a *task should be equal to \c NULL. If one of
31 those two conditions does not hold, there will be a warning message.
32 * \param channel the channel on which the agent should be
33 listening. This value has to be >=0 and < than the maximal
34 number of channels fixed with MSG_set_channel_number().
35 * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
36 * if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
/* Receive side of a task transfer. The visible code pops a task from the
 * mailbox `mbox[channel]` of host `h`, blocking while the mailbox is empty,
 * then starts a communication action from the sender's host to this host and
 * waits until that action is no longer running.
 * Visible results: MSG_OK when the action ends in SURF_ACTION_DONE, otherwise
 * MSG_HOST_FAILURE or MSG_TRANSFER_FAILURE depending on a host-state test.
 * NOTE(review): the header comment documents MSG_FATAL / MSG_WARNING results,
 * which do not appear on any visible line - confirm against the full source.
 * NOTE(review): this listing skips source lines (see the gaps in the embedded
 * numbers); the `channel` parameter, the declarations and assignments of `t`
 * and `h`, several conditions and the `do {` opener are not visible. */
38 MSG_error_t MSG_task_get(m_task_t * task,
41 m_process_t process = MSG_process_self();
44 simdata_task_t t_simdata = NULL;
45 simdata_host_t h_simdata = NULL;
47 e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
/* `task` itself must be a valid output location. */
51 xbt_assert0(task,"Null pointer for the task\n");
/* The condition guarding this log call is on a line that is not shown. */
54 CRITICAL0("MSG_task_get() was asked to write in a non empty task struct.");
58 h_simdata = h->simdata;
/* Keep popping until the mailbox yields a task; block in between. */
59 while ((t = xbt_fifo_pop(h_simdata->mbox[channel])) == NULL) {
/* Only one process may wait on a given channel of a host at a time. */
60 xbt_assert2(!(h_simdata->sleeping[channel]),
61 "A process (%s(%d)) is already blocked on this channel",
62 h_simdata->sleeping[channel]->name,
63 h_simdata->sleeping[channel]->simdata->PID);
64 h_simdata->sleeping[channel] = process; /* I'm waiting. Wake me up when you're ready */
65 __MSG_process_block();
/* Host state is re-checked after waking up; the value it is compared with is
 * on a missing line.
 * NOTE(review): on this early return the sleeping[channel] slot is still set
 * to this process, since the reset below comes after - confirm this is intended. */
66 if(surf_workstation_resource->extension_public->get_state(h_simdata->host)
68 MSG_RETURN(MSG_HOST_FAILURE);
69 h_simdata->sleeping[channel] = NULL;
70 /* OK, we should both be ready now. Are you there ? */
73 t_simdata = t->simdata;
74 /* *task = __MSG_task_copy(t); */
/* Create the transfer action between the sender's host and this host, using
 * the task's message_size and rate, and attach the task to it as user data. */
80 t_simdata->comm = surf_workstation_resource->extension_public->
81 communicate(MSG_process_get_host(t_simdata->sender)->simdata->host,
82 h->simdata->host, t_simdata->message_size,t_simdata->rate);
84 surf_workstation_resource->common_public->action_set_data(t_simdata->comm,t);
/* Wake the sender if it is currently blocked. */
86 if(__MSG_process_isBlocked(t_simdata->sender))
87 __MSG_process_unblock(t_simdata->sender);
89 PAJE_PROCESS_STATE(process,"C");
/* Wait on the task until the action leaves SURF_ACTION_RUNNING (loop opener
 * not visible in this listing). */
92 __MSG_task_wait_event(process, t);
93 state=surf_workstation_resource->common_public->action_get_state(t_simdata->comm);
94 } while (state==SURF_ACTION_RUNNING);
/* When the task's `using` counter is above 1, this process is put back into
 * msg_global->process_to_run; the rest of this branch is not visible. */
96 if(t->simdata->using>1) {
97 xbt_fifo_unshift(msg_global->process_to_run,process);
101 PAJE_COMM_STOP(process,t,channel);
/* Map the final action state to the MSG result. */
103 if(state == SURF_ACTION_DONE) MSG_RETURN(MSG_OK);
104 else if(surf_workstation_resource->extension_public->get_state(h_simdata->host)
106 MSG_RETURN(MSG_HOST_FAILURE);
107 else MSG_RETURN(MSG_TRANSFER_FAILURE);
110 /** \ingroup msg_gos_functions
111 * \brief Test whether there is a pending communication on a channel.
113 * It takes one parameter.
114 * \param channel the channel on which the agent should be
115 listening. This value has to be >=0 and < than the maximal
116 number of channels fixed with MSG_set_channel_number().
117 * \return 1 if there is a pending communication and 0 otherwise
/* Non-blocking probe: evaluates to non-zero when the fifo `mbox[channel]` of
 * host `h` has a first item, and to 0 otherwise. Nothing is removed from the
 * mailbox by the visible code.
 * NOTE(review): the declaration and assignment of `h` are on lines that this
 * listing does not show. */
119 int MSG_task_Iprobe(m_channel_t channel)
122 simdata_host_t h_simdata = NULL;
126 h_simdata = h->simdata;
127 return(xbt_fifo_getFirstItem(h_simdata->mbox[channel])!=NULL);
130 /** \ingroup msg_gos_functions
131 * \brief Test whether there is a pending communication on a channel, and who sent it.
133 * It takes one parameter.
134 * \param channel the channel on which the agent should be
135 listening. This value has to be >=0 and < than the maximal
136 number of channels fixed with MSG_set_channel_number().
137 * \return -1 if there is no pending communication and the PID of the process who sent it otherwise
/* Look at the first task waiting in `mbox[channel]` of host `h` without
 * removing it, and report the PID of the process recorded as its sender.
 * The guard below covers the "nothing usable is pending" cases: no fifo item,
 * an item without content, or a task without simdata. The statement it
 * controls is on a line this listing does not show (the header comment
 * documents -1 for that case).
 * The last operand of the guard must test for a NULL simdata. Written without
 * the negation it is true for every well-formed task, so a pending task would
 * never be reported, while a task with NULL simdata would reach the
 * dereference in the final return.
 * NOTE(review): the declarations of `h` and `t`, and the assignment of `h`,
 * are on lines that are not visible here. */
139 int MSG_task_probe_from(m_channel_t channel)
142 simdata_host_t h_simdata = NULL;
143 xbt_fifo_item_t item;
148 h_simdata = h->simdata;
150 item = xbt_fifo_getFirstItem(((simdata_host_t)h->simdata)->mbox[channel]);
151 if (!item || !(t = xbt_fifo_get_item_content(item)) || !((simdata_task_t)t->simdata))
154 return MSG_process_get_PID(((simdata_task_t)t->simdata)->sender);
157 /** \ingroup msg_gos_functions
158 * \brief Put a task on a channel of a host and wait for the end of the
161 * This function is used for describing the behavior of an agent. It
162 * takes three parameters.
163 * \param task a #m_task_t to send on another location. This task
164 will not be usable anymore when the function will return. There is
165 no automatic task duplication and you have to save your parameters
166 before calling this function. Tasks are unique and once it has been
167 sent to another location, you should not access it anymore. You do
168 not need to call MSG_task_destroy() but to avoid using, as an
169 effect of inattention, this task anymore, you definitely should
170 reinitialize it with #MSG_TASK_UNINITIALIZED. Note that this task
171 can be transferred iff it has been correctly created with
173 * \param dest the destination of the message
174 * \param channel the channel on which the agent should put this
175 task. This value has to be >=0 and < than the maximal number of
176 channels fixed with MSG_set_channel_number().
177 * \return #MSG_FATAL if \a task is not properly initialized and
/* Send side of a task transfer. The visible code records the calling process
 * as the task's sender, appends the task to `mbox[channel]` of `remote_host`,
 * wakes a process sleeping on that channel if there is one, blocks until the
 * task's `comm` action has been set, then waits until that action is no longer
 * running and finally calls MSG_task_destroy() on the task.
 * Visible results: MSG_OK when the action ends in SURF_ACTION_DONE, otherwise
 * MSG_HOST_FAILURE or MSG_TRANSFER_FAILURE depending on a test of the local
 * host's state.
 * NOTE(review): this listing skips source lines; in particular `remote_host`
 * is initialised to NULL and no assignment to it is visible before its first
 * use - presumably it is set from `dest` on a missing line, confirm. */
180 MSG_error_t MSG_task_put(m_task_t task,
181 m_host_t dest, m_channel_t channel)
183 m_process_t process = MSG_process_self();
184 simdata_task_t task_simdata = NULL;
185 e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
186 m_host_t local_host = NULL;
187 m_host_t remote_host = NULL;
191 task_simdata = task->simdata;
192 task_simdata->sender = process;
/* The task must have a `using` count of exactly 1 when it is sent. */
193 xbt_assert0(task_simdata->using==1,"Gargl!");
/* Cleared here; the blocking loop below waits for it to become non-NULL. */
194 task_simdata->comm = NULL;
196 local_host = ((simdata_process_t) process->simdata)->host;
/* Deposit the task in the destination mailbox. */
199 xbt_fifo_push(((simdata_host_t) remote_host->simdata)->
200 mbox[channel], task);
202 PAJE_COMM_START(process,task,channel);
/* Wake the process waiting on this channel, if any. */
204 if(remote_host->simdata->sleeping[channel])
205 __MSG_process_unblock(remote_host->simdata->sleeping[channel]);
/* Publish where this process is putting while it is blocked, and clear that
 * information once `comm` is available. */
207 process->simdata->put_host = dest;
208 process->simdata->put_channel = channel;
209 while(!(task_simdata->comm))
210 __MSG_process_block();
211 process->simdata->put_host = NULL;
212 process->simdata->put_channel = -1;
215 PAJE_PROCESS_STATE(process,"C");
/* Wait until the action leaves SURF_ACTION_RUNNING (loop closer not visible). */
217 state=surf_workstation_resource->common_public->action_get_state(task_simdata->comm);
218 while (state==SURF_ACTION_RUNNING) {
219 __MSG_task_wait_event(process, task);
220 state=surf_workstation_resource->common_public->action_get_state(task_simdata->comm);
/* The sender gives up the task here, whatever the outcome. */
223 MSG_task_destroy(task);
/* Map the final action state to the MSG result. */
225 if(state == SURF_ACTION_DONE) MSG_RETURN(MSG_OK);
226 else if(surf_workstation_resource->extension_public->get_state(local_host->simdata->host)
228 MSG_RETURN(MSG_HOST_FAILURE);
229 else MSG_RETURN(MSG_TRANSFER_FAILURE);
232 /** \ingroup msg_gos_functions
233 * \brief Does exactly the same as MSG_task_put but with a bounded transmission
/* Same as MSG_task_put(), but first stores `max_rate` in the task's `rate`
 * field (MSG_task_get() passes that field to the communication action).
 * The result of MSG_task_put() is returned unchanged.
 * The rate is deliberately not reset afterwards: a statement placed after the
 * return can never run, and MSG_task_put() hands the task to
 * MSG_task_destroy() before returning, so the task must not be touched once
 * the call has come back.
 * NOTE(review): the braces of this function are on lines that this listing
 * does not show. */
238 MSG_error_t MSG_task_put_bounded(m_task_t task,
239 m_host_t dest, m_channel_t channel,
240 long double max_rate)
242 task->simdata->rate=max_rate;
243 return(MSG_task_put(task, dest, channel));
247 /** \ingroup msg_gos_functions
248 * \brief Executes a task and waits for its termination.
250 * This function is used for describing the behavior of an agent. It
251 * takes only one parameter.
252 * \param task a #m_task_t to execute on the location on which the
254 * \return #MSG_FATAL if \a task is not properly initialized and
/* Run `task` on behalf of the calling process: start the computation with
 * __MSG_task_execute(), then return whatever __MSG_wait_for_computation()
 * reports once the computation is over.
 * NOTE(review): this listing skips source lines; the braces and any argument
 * checks are not visible. */
257 MSG_error_t MSG_task_execute(m_task_t task)
259 m_process_t process = MSG_process_self();
261 __MSG_task_execute(process, task);
262 PAJE_PROCESS_STATE(process,"E");
263 return __MSG_wait_for_computation(process,task);
/* Start the computation of `task` without waiting for it: an execute action
 * is created on the host of `process` for the task's computation_amount,
 * stored in the task's `compute` field, and the task is attached to the
 * action as its data.
 * NOTE(review): lines between the declaration and the assignment of `simdata`
 * are not visible in this listing. */
266 void __MSG_task_execute(m_process_t process, m_task_t task)
268 simdata_task_t simdata = NULL;
272 simdata = task->simdata;
274 simdata->compute = surf_workstation_resource->extension_public->
275 execute(MSG_process_get_host(process)->simdata->host,
276 simdata->computation_amount);
277 surf_workstation_resource->common_public->action_set_data(simdata->compute,task);
/* Wait until the `compute` action of `task` is no longer running and turn its
 * final state into an MSG result: MSG_OK for SURF_ACTION_DONE, otherwise
 * MSG_HOST_FAILURE or MSG_TRANSFER_FAILURE depending on a test of the state of
 * the host of `process`.
 * NOTE(review): MSG_TRANSFER_FAILURE is an odd result for a computation; it
 * looks copied from the communication functions - confirm.
 * NOTE(review): the `do {` opener, the value the host state is compared with
 * and the lines between the loop and the result mapping are not visible. */
280 MSG_error_t __MSG_wait_for_computation(m_process_t process, m_task_t task)
282 e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
283 simdata_task_t simdata = task->simdata;
/* Wait on the task, then re-read the action state, until it stops running. */
287 __MSG_task_wait_event(process, task);
288 state=surf_workstation_resource->common_public->action_get_state(simdata->compute);
289 } while (state==SURF_ACTION_RUNNING);
293 if(state == SURF_ACTION_DONE) MSG_RETURN(MSG_OK);
294 else if(surf_workstation_resource->extension_public->
295 get_state(MSG_process_get_host(process)->simdata->host)
297 MSG_RETURN(MSG_HOST_FAILURE);
298 else MSG_RETURN(MSG_TRANSFER_FAILURE);
301 /** \ingroup msg_gos_functions
302 * \brief Sleep for the specified number of seconds
304 * Makes the current process sleep until \a nb_sec seconds have elapsed.
306 * \param nb_sec the number of seconds to sleep
/* Put the calling process to sleep. A throw-away task named "MSG_sleep" is
 * created with `nb_sec` as its second argument, a sleep action for the task's
 * computation_amount is started on the process's host, and the process waits
 * until that action is no longer running.
 * Visible results: MSG_HOST_FAILURE on several paths; the success return is on
 * a line this listing does not show.
 * NOTE(review): on the visible lines `dummy` is destroyed in one place only,
 * after the second host-state check. If MSG_RETURN leaves the function, the
 * MSG_HOST_FAILURE paths would skip that call and leak the task - confirm
 * against the full source.
 * NOTE(review): braces, the `do {` opener and the values the host state is
 * compared with are on lines that are not visible. */
308 MSG_error_t MSG_process_sleep(long double nb_sec)
310 e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
311 m_process_t process = MSG_process_self();
312 m_task_t dummy = NULL;
313 simdata_task_t simdata = NULL;
/* The placeholder task carries the sleep duration in its computation amount. */
316 dummy = MSG_task_create("MSG_sleep", nb_sec, 0.0, NULL);
317 simdata = dummy->simdata;
319 simdata->compute = surf_workstation_resource->extension_public->
320 sleep(MSG_process_get_host(process)->simdata->host,
321 simdata->computation_amount);
322 surf_workstation_resource->common_public->action_set_data(simdata->compute,dummy);
/* Wait on the task, then re-read the action state, until it stops running. */
327 __MSG_task_wait_event(process, dummy);
328 state=surf_workstation_resource->common_public->action_get_state(simdata->compute);
329 } while (state==SURF_ACTION_RUNNING);
332 if(state == SURF_ACTION_DONE) {
/* First host-state check after the sleep action completed. */
333 if(surf_workstation_resource->extension_public->
334 get_state(MSG_process_get_host(process)->simdata->host)
336 MSG_RETURN(MSG_HOST_FAILURE);
/* If the process is flagged as blocked at this point, unblock it. */
338 if(__MSG_process_isBlocked(process)) {
339 __MSG_process_unblock(MSG_process_self());
/* Second host-state check, then release the placeholder task. */
341 if(surf_workstation_resource->extension_public->
342 get_state(MSG_process_get_host(process)->simdata->host)
344 MSG_RETURN(MSG_HOST_FAILURE);
345 MSG_task_destroy(dummy);
/* Any final state other than SURF_ACTION_DONE is reported as a host failure. */
347 } else MSG_RETURN(MSG_HOST_FAILURE);
350 /** \ingroup msg_gos_functions
351 * \brief Return the number of MSG tasks currently running on
352 * the host of the currently running process.
/* Placeholder: the only visible statement asserts on the constant 0 with the
 * message "Not implemented yet!", so no load value is computed here.
 * NOTE(review): the braces and any return statement are on lines that this
 * listing does not show. */
354 int MSG_get_msgload(void)
357 xbt_assert0(0,"Not implemented yet!");
362 /** \ingroup msg_gos_functions
364 * \brief Return the last value returned by a MSG function (except
/* Return the value produced by the PROCESS_GET_ERRNO() macro.
 * NOTE(review): presumably this is the code stored by MSG_RETURN for the
 * current process; neither macro is defined in the visible lines - confirm. */
367 MSG_error_t MSG_get_errno(void)
369 return PROCESS_GET_ERRNO();