3 /* Copyright (c) 2002,2003,2004 Arnaud Legrand. All rights reserved. */
5 /* This program is free software; you can redistribute it and/or modify it
6 * under the terms of the license (GNU LGPL) which comes with this package. */
10 #include "xbt/error.h"
11 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(gos, msg,
12 "Logging specific to MSG (gos)");
14 /** \ingroup msg_gos_functions
15 * \brief This function is now deprecated and useless. Please stop using it.
17 MSG_error_t MSG_process_start(m_process_t process)
19 xbt_assert0(0,"This function is now deprecated and useless. Please stop using it.");
24 /** \ingroup msg_gos_functions
25 * \brief Listen on a channel and wait for receiving a task.
27 * It takes two parameter.
28 * \param task a memory location for storing a #m_task_t. It will
29 hold a task when this function will return. Thus \a task should not
30 be equal to \c NULL and \a *task should be equal to \c NULL. If one of
31 those two condition does not hold, there will be a warning message.
32 * \param channel the channel on which the agent should be
33 listening. This value has to be >=0 and < than the maximal
34 number of channels fixed with MSG_set_channel_number().
35 * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
36 * if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
38 MSG_error_t MSG_task_get(m_task_t * task,
41 m_process_t process = MSG_process_self();
44 simdata_task_t t_simdata = NULL;
45 simdata_host_t h_simdata = NULL;
47 e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
51 xbt_assert0(task,"Null pointer for the task\n");
54 CRITICAL0("MSG_task_get() was asked to write in a non empty task struct.");
58 h_simdata = h->simdata;
59 while ((t = xbt_fifo_pop(h_simdata->mbox[channel])) == NULL) {
60 xbt_assert2(!(h_simdata->sleeping[channel]),
61 "A process (%s(%d)) is already blocked on this channel",
62 h_simdata->sleeping[channel]->name,
63 h_simdata->sleeping[channel]->simdata->PID);
64 h_simdata->sleeping[channel] = process; /* I'm waiting. Wake me up when you're ready */
65 __MSG_process_block();
66 if(surf_workstation_resource->extension_public->get_state(h_simdata->host)
68 MSG_RETURN(MSG_HOST_FAILURE);
69 h_simdata->sleeping[channel] = NULL;
70 /* OK, we should both be ready now. Are you there ? */
73 t_simdata = t->simdata;
74 /* *task = __MSG_task_copy(t); */
80 t_simdata->comm = surf_workstation_resource->extension_public->
81 communicate(MSG_process_get_host(t_simdata->sender)->simdata->host,
82 h->simdata->host, t_simdata->message_size,t_simdata->rate);
84 surf_workstation_resource->common_public->action_set_data(t_simdata->comm,t);
86 if(__MSG_process_isBlocked(t_simdata->sender))
87 __MSG_process_unblock(t_simdata->sender);
89 PAJE_PROCESS_PUSH_STATE(process,"C");
92 __MSG_task_wait_event(process, t);
93 state=surf_workstation_resource->common_public->action_get_state(t_simdata->comm);
94 } while (state==SURF_ACTION_RUNNING);
96 if(t->simdata->using>1) {
97 xbt_fifo_unshift(msg_global->process_to_run,process);
101 PAJE_PROCESS_POP_STATE(process);
102 PAJE_COMM_STOP(process,t,channel);
104 if(state == SURF_ACTION_DONE) MSG_RETURN(MSG_OK);
105 else if(surf_workstation_resource->extension_public->get_state(h_simdata->host)
107 MSG_RETURN(MSG_HOST_FAILURE);
108 else MSG_RETURN(MSG_TRANSFER_FAILURE);
111 /** \ingroup msg_gos_functions
112 * \brief Test whether there is a pending communication on a channel.
114 * It takes one parameter.
115 * \param channel the channel on which the agent should be
116 listening. This value has to be >=0 and < than the maximal
117 number of channels fixed with MSG_set_channel_number().
118 * \return 1 if there is a pending communication and 0 otherwise
120 int MSG_task_Iprobe(m_channel_t channel)
123 simdata_host_t h_simdata = NULL;
127 h_simdata = h->simdata;
128 return(xbt_fifo_getFirstItem(h_simdata->mbox[channel])!=NULL);
131 /** \ingroup msg_gos_functions
132 * \brief Test whether there is a pending communication on a channel, and who sent it.
134 * It takes one parameter.
135 * \param channel the channel on which the agent should be
136 listening. This value has to be >=0 and < than the maximal
137 number of channels fixed with MSG_set_channel_number().
138 * \return -1 if there is no pending communication and the PID of the process who sent it otherwise
140 int MSG_task_probe_from(m_channel_t channel)
143 simdata_host_t h_simdata = NULL;
144 xbt_fifo_item_t item;
149 h_simdata = h->simdata;
151 item = xbt_fifo_getFirstItem(((simdata_host_t)h->simdata)->mbox[channel]);
152 if (!item || !(t = xbt_fifo_get_item_content(item)) || (simdata_task_t)t->simdata)
155 return MSG_process_get_PID(((simdata_task_t)t->simdata)->sender);
158 /** \ingroup msg_gos_functions
159 * \brief Put a task on a channel of an host and waits for the end of the
162 * This function is used for describing the behavior of an agent. It
163 * takes three parameter.
164 * \param task a #m_task_t to send on another location. This task
165 will not be usable anymore when the function will return. There is
166 no automatic task duplication and you have to save your parameters
167 before calling this function. Tasks are unique and once it has been
168 sent to another location, you should not access it anymore. You do
169 not need to call MSG_task_destroy() but to avoid using, as an
170 effect of inattention, this task anymore, you definitely should
171 renitialize it with #MSG_TASK_UNINITIALIZED. Note that this task
172 can be transfered iff it has been correctly created with
174 * \param dest the destination of the message
175 * \param channel the channel on which the agent should put this
176 task. This value has to be >=0 and < than the maximal number of
177 channels fixed with MSG_set_channel_number().
178 * \return #MSG_FATAL if \a task is not properly initialized and
181 MSG_error_t MSG_task_put(m_task_t task,
182 m_host_t dest, m_channel_t channel)
184 m_process_t process = MSG_process_self();
185 simdata_task_t task_simdata = NULL;
186 e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
187 m_host_t local_host = NULL;
188 m_host_t remote_host = NULL;
192 task_simdata = task->simdata;
193 task_simdata->sender = process;
194 xbt_assert0(task_simdata->using==1,"Gargl!");
195 task_simdata->comm = NULL;
197 local_host = ((simdata_process_t) process->simdata)->host;
200 xbt_fifo_push(((simdata_host_t) remote_host->simdata)->
201 mbox[channel], task);
203 PAJE_COMM_START(process,task,channel);
205 if(remote_host->simdata->sleeping[channel])
206 __MSG_process_unblock(remote_host->simdata->sleeping[channel]);
208 process->simdata->put_host = dest;
209 process->simdata->put_channel = channel;
210 while(!(task_simdata->comm))
211 __MSG_process_block();
212 process->simdata->put_host = NULL;
213 process->simdata->put_channel = -1;
216 PAJE_PROCESS_PUSH_STATE(process,"C");
218 state=surf_workstation_resource->common_public->action_get_state(task_simdata->comm);
219 while (state==SURF_ACTION_RUNNING) {
220 __MSG_task_wait_event(process, task);
221 state=surf_workstation_resource->common_public->action_get_state(task_simdata->comm);
224 MSG_task_destroy(task);
226 PAJE_PROCESS_POP_STATE(process);
228 if(state == SURF_ACTION_DONE) MSG_RETURN(MSG_OK);
229 else if(surf_workstation_resource->extension_public->get_state(local_host->simdata->host)
231 MSG_RETURN(MSG_HOST_FAILURE);
232 else MSG_RETURN(MSG_TRANSFER_FAILURE);
235 /** \ingroup msg_gos_functions
236 * \brief Does exactly the same as MSG_task_put but with a bounded transmition
241 MSG_error_t MSG_task_put_bounded(m_task_t task,
242 m_host_t dest, m_channel_t channel,
243 long double max_rate)
245 task->simdata->rate=max_rate;
246 return(MSG_task_put(task, dest, channel));
247 task->simdata->rate=-1.0;
250 /** \ingroup msg_gos_functions
251 * \brief Executes a task and waits for its termination.
253 * This function is used for describing the behavior of an agent. It
254 * takes only one parameter.
255 * \param task a #m_task_t to execute on the location on which the
257 * \return #MSG_FATAL if \a task is not properly initialized and
260 MSG_error_t MSG_task_execute(m_task_t task)
262 m_process_t process = MSG_process_self();
264 __MSG_task_execute(process, task);
266 PAJE_PROCESS_PUSH_STATE(process,"E");
267 res = __MSG_wait_for_computation(process,task);
268 PAJE_PROCESS_POP_STATE(process);
272 void __MSG_task_execute(m_process_t process, m_task_t task)
274 simdata_task_t simdata = NULL;
278 simdata = task->simdata;
280 simdata->compute = surf_workstation_resource->extension_public->
281 execute(MSG_process_get_host(process)->simdata->host,
282 simdata->computation_amount);
283 surf_workstation_resource->common_public->action_set_data(simdata->compute,task);
286 MSG_error_t __MSG_wait_for_computation(m_process_t process, m_task_t task)
288 e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
289 simdata_task_t simdata = task->simdata;
293 __MSG_task_wait_event(process, task);
294 state=surf_workstation_resource->common_public->action_get_state(simdata->compute);
295 } while (state==SURF_ACTION_RUNNING);
299 if(state == SURF_ACTION_DONE) MSG_RETURN(MSG_OK);
300 else if(surf_workstation_resource->extension_public->
301 get_state(MSG_process_get_host(process)->simdata->host)
303 MSG_RETURN(MSG_HOST_FAILURE);
304 else MSG_RETURN(MSG_TRANSFER_FAILURE);
307 /** \ingroup msg_gos_functions
308 * \brief Sleep for the specified number of seconds
310 * Makes the current process sleep until \a time seconds have elapsed.
312 * \param nb_sec a number of second
314 MSG_error_t MSG_process_sleep(long double nb_sec)
316 e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
317 m_process_t process = MSG_process_self();
318 m_task_t dummy = NULL;
319 simdata_task_t simdata = NULL;
322 dummy = MSG_task_create("MSG_sleep", nb_sec, 0.0, NULL);
323 simdata = dummy->simdata;
325 simdata->compute = surf_workstation_resource->extension_public->
326 sleep(MSG_process_get_host(process)->simdata->host,
327 simdata->computation_amount);
328 surf_workstation_resource->common_public->action_set_data(simdata->compute,dummy);
333 __MSG_task_wait_event(process, dummy);
334 state=surf_workstation_resource->common_public->action_get_state(simdata->compute);
335 } while (state==SURF_ACTION_RUNNING);
338 if(state == SURF_ACTION_DONE) {
339 if(surf_workstation_resource->extension_public->
340 get_state(MSG_process_get_host(process)->simdata->host)
342 MSG_RETURN(MSG_HOST_FAILURE);
344 if(__MSG_process_isBlocked(process)) {
345 __MSG_process_unblock(MSG_process_self());
347 if(surf_workstation_resource->extension_public->
348 get_state(MSG_process_get_host(process)->simdata->host)
350 MSG_RETURN(MSG_HOST_FAILURE);
351 MSG_task_destroy(dummy);
353 } else MSG_RETURN(MSG_HOST_FAILURE);
356 /** \ingroup msg_gos_functions
357 * \brief Return the number of MSG tasks currently running on a
358 * the host of the current running process.
360 int MSG_get_msgload(void)
363 xbt_assert0(0,"Not implemented yet!");
368 /** \ingroup msg_gos_functions
370 * \brief Return the the last value returned by a MSG function (except
373 MSG_error_t MSG_get_errno(void)
375 return PROCESS_GET_ERRNO();