3 /* Copyright (c) 2002,2003,2004 Arnaud Legrand. All rights reserved. */
5 /* This program is free software; you can redistribute it and/or modify it
6 * under the terms of the license (GNU LGPL) which comes with this package. */
10 #include "xbt/error.h"
/* Declares the log category gos as a child of msg for this file. Judging by
 * the macro name it also becomes the default category for the logging macros
 * used below (e.g. CRITICAL0) - TODO confirm against the xbt log header. */
11 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(gos, msg,
12 "Logging specific to MSG (gos)");
14 /** \ingroup msg_gos_functions
15 * \brief This function is now deprecated and useless. Please stop using it.
/* Deprecated entry point kept for source compatibility. The process argument
 * is not used by the visible code: the body only asserts on the constant 0,
 * so every call is presumably rejected with the message below. */
18 MSG_error_t MSG_process_start(m_process_t process)
20 xbt_assert0(0,"This function is now deprecated and useless. Please stop using it.");
25 /** \ingroup msg_gos_functions
26 * \brief Listen on a channel and wait for receiving a task.
28 * It takes two parameters.
29 * \param task a memory location for storing a #m_task_t. It will
30 hold a task when this function will return. Thus \a task should not
31 be equal to \c NULL and \a *task should be equal to \c NULL. If one of
32 those two conditions does not hold, there will be a warning message.
33 * \param channel the channel on which the agent should be
34 listening. This value has to be >= 0 and less than the maximal
35 number of channels fixed with MSG_set_channel_number().
36 * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
37 * if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
/* Receive side of a task exchange.
 *
 * Visible flow: pop a task from mbox[channel] of host h, suspending the
 * calling process while the mailbox is empty; create the surf communication
 * action from the host of the sender to h; resume the sender if it is
 * suspended; then wait until the action is no longer RUNNING.
 *
 * Status handed to MSG_RETURN(): MSG_OK when the action ends DONE, otherwise
 * MSG_HOST_FAILURE when the host-state check matches, else
 * MSG_TRANSFER_FAILURE.
 * NOTE(review): the visible code never returns MSG_FATAL or MSG_WARNING,
 * although the doxygen block advertises them - confirm and update the doc. */
39 MSG_error_t MSG_task_get(m_task_t * task,
42 m_process_t process = MSG_process_self();
45 simdata_task_t t_simdata = NULL;
46 simdata_host_t h_simdata = NULL;
48 e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
/* A NULL output location is treated as a programming error. */
52 xbt_assert0(task,"Null pointer for the task\n");
55 CRITICAL0("MSG_task_get() was asked to write in a non empty task struct.");
59 h_simdata = h->simdata;
/* Keep trying to dequeue; each failed attempt parks this process until
 * somebody resumes it (MSG_task_put resumes sleeping[channel]). */
60 while ((t = xbt_fifo_pop(h_simdata->mbox[channel])) == NULL) {
/* Only one waiter per channel is supported. */
61 xbt_assert0(!(h_simdata->sleeping[channel]),
62 "A process is already blocked on this channel");
63 h_simdata->sleeping[channel] = process; /* I'm waiting. Wake me up when you're ready */
64 MSG_process_suspend(process);
/* Woken up: give up if the host-state check fires (presumably the host went
 * down meanwhile - TODO confirm the compared value).
 * NOTE(review): on this early return sleeping[channel] still points at this
 * process, because the reset only happens on the line after the return. */
65 if(surf_workstation_resource->extension_public->get_state(h_simdata->host)
67 MSG_RETURN(MSG_HOST_FAILURE);
68 h_simdata->sleeping[channel] = NULL;
69 /* OK, we should both be ready now. Are you there ? */
72 t_simdata = t->simdata;
73 /* *task = __MSG_task_copy(t); */
/* Start the simulated transfer from the host of the sender to this host,
 * sized by message_size, and attach the task to the action as user data. */
78 t_simdata->comm = surf_workstation_resource->extension_public->
79 communicate(MSG_process_get_host(t_simdata->sender)->simdata->host,
80 h->simdata->host, t_simdata->message_size);
81 surf_workstation_resource->common_public->action_set_data(t_simdata->comm,t);
/* MSG_task_put keeps the sender suspended until comm is set; now that it is,
 * let the sender proceed. */
83 if(MSG_process_isSuspended(t_simdata->sender))
84 MSG_process_resume(t_simdata->sender);
/* Wait on task events until the transfer leaves the RUNNING state. */
87 __MSG_task_wait_event(process, t);
88 state=surf_workstation_resource->common_public->action_get_state(t_simdata->comm);
89 } while (state==SURF_ACTION_RUNNING);
/* Translate the final action state into an MSG status. */
91 if(state == SURF_ACTION_DONE) MSG_RETURN(MSG_OK);
92 else if(surf_workstation_resource->extension_public->get_state(h_simdata->host)
94 MSG_RETURN(MSG_HOST_FAILURE);
95 else MSG_RETURN(MSG_TRANSFER_FAILURE);
98 /** \ingroup msg_gos_functions
99 * \brief Test whether there is a pending communication on a channel.
101 * It takes one parameter.
102 * \param channel the channel on which the agent should be
103 listening. This value has to be >= 0 and less than the maximal
104 number of channels fixed with MSG_set_channel_number().
105 * \return 1 if there is a pending communication and 0 otherwise
/* Non-blocking probe of mbox[channel] on host h. Returns 1 when
 * xbt_fifo_getFirstItem() yields an item and 0 otherwise. No pop is issued,
 * so presumably the pending task stays queued for a later MSG_task_get. */
107 int MSG_task_Iprobe(m_channel_t channel)
110 simdata_host_t h_simdata = NULL;
114 h_simdata = h->simdata;
115 return(xbt_fifo_getFirstItem(h_simdata->mbox[channel])!=NULL);
118 /** \ingroup msg_gos_functions
119 * \brief Test whether there is a pending communication on a channel, and who sent it.
121 * It takes one parameter.
122 * \param channel the channel on which the agent should be
123 listening. This value has to be >= 0 and less than the maximal
124 number of channels fixed with MSG_set_channel_number().
125 * \return -1 if there is no pending communication and the PID of the process who sent it otherwise
/* Peeks at the first item queued in mbox[channel] of host h and returns the
 * PID of the process recorded as its sender. The documented result when
 * nothing usable is pending is -1. The item is only looked at through
 * xbt_fifo_getFirstItem(); no pop is issued here. */
127 int MSG_task_probe_from(m_channel_t channel)
130 simdata_host_t h_simdata = NULL;
131 xbt_fifo_item_t item;
136 h_simdata = h->simdata;
138 item = xbt_fifo_getFirstItem(((simdata_host_t)h->simdata)->mbox[channel]);
/* Bail out when there is no item, the item carries no task, or the task has
 * no simdata. The simdata clause has to be negated: written without the
 * negation, the early exit fired for every well-formed task and the sender
 * lookup below was reached only with a NULL simdata. */
139 if (!item || !(t = xbt_fifo_get_item_content(item)) || !((simdata_task_t)t->simdata))
142 return MSG_process_get_PID(((simdata_task_t)t->simdata)->sender);
145 /** \ingroup msg_gos_functions
146 * \brief Puts a task on a channel of a host and waits for the end of the
149 * This function is used for describing the behavior of an agent. It
150 * takes three parameters.
151 * \param task a #m_task_t to send on another location. This task
152 will not be usable anymore when the function will return. There is
153 no automatic task duplication and you have to save your parameters
154 before calling this function. Tasks are unique and once it has been
155 sent to another location, you should not access it anymore. You do
156 not need to call MSG_task_destroy() but to avoid using, as an
157 effect of inattention, this task anymore, you definitely should
158 reinitialize it with #MSG_TASK_UNINITIALIZED. Note that this task
159 can be transferred iff it has been correctly created with
161 * \param dest the destination of the message
162 * \param channel the channel on which the agent should put this
163 task. This value has to be >= 0 and less than the maximal number of
164 channels fixed with MSG_set_channel_number().
165 * \return #MSG_FATAL if \a task is not properly initialized and
/* Send side of a task exchange.
 *
 * Visible flow: mark the calling process as sender, push the task on
 * mbox[channel] of remote_host, wake a receiver parked on that channel, stay
 * suspended until a communication action has been attached to the task, wait
 * until that action is no longer RUNNING, then destroy the task.
 *
 * Status handed to MSG_RETURN(): MSG_OK when the action ends DONE, otherwise
 * MSG_HOST_FAILURE when the host-state check on local_host matches, else
 * MSG_TRANSFER_FAILURE. */
168 MSG_error_t MSG_task_put(m_task_t task,
169 m_host_t dest, m_channel_t channel)
171 m_process_t process = MSG_process_self();
172 simdata_task_t task_simdata = NULL;
173 e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
174 m_host_t local_host = NULL;
175 m_host_t remote_host = NULL;
179 task_simdata = task->simdata;
180 task_simdata->sender = process;
/* The task must have a using count of exactly 1 (presumably: nobody else is
 * holding it - TODO confirm the meaning of the field). */
181 xbt_assert0(task_simdata->using==1,"Gargl!");
/* comm doubles as the handshake flag: it stays NULL until the receiver has
 * created the communication action in MSG_task_get. */
182 task_simdata->comm = NULL;
184 local_host = ((simdata_process_t) process->simdata)->host;
/* Queue the task on the destination mailbox (remote_host is presumably dest;
 * verify where it is assigned). */
187 xbt_fifo_push(((simdata_host_t) remote_host->simdata)->
188 mbox[channel], task);
/* A receiver already blocked in MSG_task_get on this channel gets woken up. */
190 if(remote_host->simdata->sleeping[channel])
191 MSG_process_resume(remote_host->simdata->sleeping[channel]);
/* Record the pending destination while waiting for the receiver; both fields
 * are cleared again right after the wait. */
193 process->simdata->put_host = dest;
194 process->simdata->put_channel = channel;
/* Re-suspend after every wake-up that did not come with a comm action. */
195 while(!(task_simdata->comm))
196 MSG_process_suspend(process);
197 process->simdata->put_host = NULL;
198 process->simdata->put_channel = -1;
/* Wait on task events until the transfer leaves the RUNNING state. */
202 __MSG_task_wait_event(process, task);
203 state=surf_workstation_resource->common_public->action_get_state(task_simdata->comm);
204 } while (state==SURF_ACTION_RUNNING);
/* The task is destroyed on the sender side whatever the outcome, which is
 * why callers must not touch it after this function. */
206 MSG_task_destroy(task);
/* Translate the final action state into an MSG status. */
208 if(state == SURF_ACTION_DONE) MSG_RETURN(MSG_OK);
209 else if(surf_workstation_resource->extension_public->get_state(local_host->simdata->host)
211 MSG_RETURN(MSG_HOST_FAILURE);
212 else MSG_RETURN(MSG_TRANSFER_FAILURE);
215 /** \ingroup msg_gos_functions
216 * \brief Executes a task and waits for its termination.
218 * This function is used for describing the behavior of an agent. It
219 * takes only one parameter.
220 * \param task a #m_task_t to execute on the location on which the
222 * \return #MSG_FATAL if \a task is not properly initialized and
/* Runs task on behalf of the calling process: starts the computation with
 * __MSG_task_execute() and then returns whatever status
 * __MSG_wait_for_computation() reports once it stops waiting. */
225 MSG_error_t MSG_task_execute(m_task_t task)
227 m_process_t process = MSG_process_self();
229 __MSG_task_execute(process, task);
230 return __MSG_wait_for_computation(process,task);
/* Starts the computation of task without waiting for it: creates a surf
 * execute action on the host of process, sized by the computation_amount of
 * the task, stores it in simdata->compute and attaches the task to the
 * action as user data. */
233 void __MSG_task_execute(m_process_t process, m_task_t task)
235 simdata_task_t simdata = NULL;
239 simdata = task->simdata;
241 simdata->compute = surf_workstation_resource->extension_public->
242 execute(MSG_process_get_host(process)->simdata->host,
243 simdata->computation_amount);
244 surf_workstation_resource->common_public->action_set_data(simdata->compute,task);
/* Blocks process until the compute action of task is no longer RUNNING and
 * maps the final action state to an MSG status: MSG_OK when DONE, otherwise
 * MSG_HOST_FAILURE when the host-state check on the host of process matches,
 * else MSG_TRANSFER_FAILURE.
 * NOTE(review): a transfer failure code for a computation looks odd - confirm
 * this is the intended status for that case. */
247 MSG_error_t __MSG_wait_for_computation(m_process_t process, m_task_t task)
249 e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
250 simdata_task_t simdata = task->simdata;
/* Wait on task events, re-reading the action state after each one. */
254 __MSG_task_wait_event(process, task);
255 state=surf_workstation_resource->common_public->action_get_state(simdata->compute);
256 } while (state==SURF_ACTION_RUNNING);
260 if(state == SURF_ACTION_DONE) MSG_RETURN(MSG_OK);
261 else if(surf_workstation_resource->extension_public->
262 get_state(MSG_process_get_host(process)->simdata->host)
264 MSG_RETURN(MSG_HOST_FAILURE);
265 else MSG_RETURN(MSG_TRANSFER_FAILURE);
268 /** \ingroup msg_gos_functions
269 * \brief Sleep for the specified number of seconds
271 * Makes the current process sleep until \a nb_sec seconds have elapsed.
273 * \param nb_sec the number of seconds to sleep
/* Puts the calling process to sleep using a throwaway task named MSG_sleep.
 *
 * Visible flow: create the dummy task with nb_sec as its second argument,
 * start a surf sleep action on the host of the process sized by the
 * computation_amount of that task, wait until the action is no longer
 * RUNNING, then re-check the host state and honour a suspension that was
 * requested in the meantime.
 *
 * Every visible failure path hands MSG_HOST_FAILURE to MSG_RETURN().
 * NOTE(review): dummy is destroyed only on the last visible path; the
 * MSG_HOST_FAILURE returns shown here do not destroy it - check for a leak. */
275 MSG_error_t MSG_process_sleep(long double nb_sec)
277 e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
278 m_process_t process = MSG_process_self();
279 m_task_t dummy = NULL;
280 simdata_task_t simdata = NULL;
/* The dummy task carries the duration (presumably nb_sec ends up in
 * computation_amount - confirm against MSG_task_create). */
283 dummy = MSG_task_create("MSG_sleep", nb_sec, 0.0, NULL);
284 simdata = dummy->simdata;
/* Start the sleep action and attach the dummy task to it as user data. */
286 simdata->compute = surf_workstation_resource->extension_public->
287 sleep(MSG_process_get_host(process)->simdata->host,
288 simdata->computation_amount);
289 surf_workstation_resource->common_public->action_set_data(simdata->compute,dummy);
/* Wait on task events until the sleep action leaves the RUNNING state. */
294 __MSG_task_wait_event(process, dummy);
295 state=surf_workstation_resource->common_public->action_get_state(simdata->compute);
296 } while (state==SURF_ACTION_RUNNING);
299 if(state == SURF_ACTION_DONE) {
/* Even a completed sleep is reported as a failure when the host-state check
 * matches. */
300 if(surf_workstation_resource->extension_public->
301 get_state(MSG_process_get_host(process)->simdata->host)
303 MSG_RETURN(MSG_HOST_FAILURE);
/* The process was flagged as suspended while it slept: park it now that the
 * sleep is over (presumably until somebody resumes it). */
305 if(MSG_process_isSuspended(process)) {
306 MSG_process_suspend(MSG_process_self());
/* Same host-state check again after a possible suspension. */
308 if(surf_workstation_resource->extension_public->
309 get_state(MSG_process_get_host(process)->simdata->host)
311 MSG_RETURN(MSG_HOST_FAILURE);
312 MSG_task_destroy(dummy);
314 } else MSG_RETURN(MSG_HOST_FAILURE);
317 /** \ingroup msg_gos_functions
318 * \brief Return the number of MSG tasks currently running on
319 * the host of the currently running process.
/* Placeholder: the only visible statement asserts on the constant 0 with a
 * not-implemented message, so no load value is computed here. */
321 int MSG_get_msgload(void)
324 xbt_assert0(0,"Not implemented yet!");
329 /** \ingroup msg_gos_functions
331 * \brief Return the last value returned by a MSG function (except
/* Returns the value produced by PROCESS_GET_ERRNO(). Presumably this is the
 * status last stored by MSG_RETURN() for the current process - TODO confirm
 * against the macro definitions. */
334 MSG_error_t MSG_get_errno(void)
336 return PROCESS_GET_ERRNO();