3 /* Copyright (c) 2002,2003,2004 Arnaud Legrand. All rights reserved. */
5 /* This program is free software; you can redistribute it and/or modify it
6 * under the terms of the license (GNU LGPL) which comes with this package. */
10 #include "xbt/error.h"
/* Logging category of this file: "gos", declared with "msg" as its parent
 * argument. Presumably this also becomes the default category for the
 * logging macros used below (e.g. CRITICAL0) -- confirm against xbt/log.h. */
11 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(gos, msg,
12 "Logging specific to MSG (gos)");
14 /** \defgroup msg_gos_functions MSG Operating System Functions
15 * \brief This section describes the functions that can be used
16 * by an agent for handling some task.
19 /** \ingroup msg_gos_functions
20 * \brief This function is now deprecated and useless. Please stop using it.
/* Deprecated entry point kept for source compatibility. The only visible
 * statement asserts a constant-false condition whose message tells the
 * caller to stop using the function; `process` is not used by it. */
22 MSG_error_t MSG_process_start(m_process_t process)
24 xbt_assert0(0,"This function is now deprecated and useless. Please stop using it.");
29 /** \ingroup msg_gos_functions
30 * \brief Listen on a channel and wait for receiving a task.
32 * It takes two parameters.
33 * \param task a memory location for storing a #m_task_t. It will
34 hold a task when this function returns. Thus \a task should not
35 be equal to \c NULL and \a *task should be equal to \c NULL. If one of
36 those two conditions does not hold, there will be a warning message.
37 * \param channel the channel on which the agent should be
38 listening. This value has to be >=0 and < than the maximal
39 number of channels fixed with MSG_set_channel_number().
40 * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
41 * if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
/* Blocking receive on `channel`: pop a task from the mailbox of host `h`,
 * start the SURF communication from the sender's host to `h`, wait until
 * that action is no longer SURF_ACTION_RUNNING, then translate the final
 * action state into an MSG_error_t. */
43 MSG_error_t MSG_task_get(m_task_t * task,
46 m_process_t process = MSG_process_self();
49 simdata_task_t t_simdata = NULL;
50 simdata_host_t h_simdata = NULL;
52 e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
/* `task` is the output slot, so it must not be NULL. */
56 xbt_assert0(task,"Null pointer for the task\n");
59 CRITICAL0("MSG_task_get() was asked to write in a non empty task struct.");
63 h_simdata = h->simdata;
/* As long as the mailbox of this channel is empty, register this process as
 * the sleeper of the channel and block until somebody wakes it up. */
64 while ((t = xbt_fifo_pop(h_simdata->mbox[channel])) == NULL) {
/* The sleeping[] slot holds a single process, so nobody else may already be
 * waiting on this channel. */
65 xbt_assert2(!(h_simdata->sleeping[channel]),
66 "A process (%s(%d)) is already blocked on this channel",
67 h_simdata->sleeping[channel]->name,
68 h_simdata->sleeping[channel]->simdata->PID);
69 h_simdata->sleeping[channel] = process; /* I'm waiting. Wake me up when you're ready */
70 __MSG_process_block();
/* Once unblocked, the state of the local host is queried; this branch
 * reports MSG_HOST_FAILURE. */
71 if(surf_workstation_resource->extension_public->get_state(h_simdata->host)
73 MSG_RETURN(MSG_HOST_FAILURE);
/* Woken up: free the sleeper slot before looking at the mailbox again. */
74 h_simdata->sleeping[channel] = NULL;
75 /* OK, we should both be ready now. Are you there ? */
78 t_simdata = t->simdata;
79 /* *task = __MSG_task_copy(t); */
/* Create the SURF communication action from the sender's host to this host,
 * using the task's message_size and rate, and remember it in the task. */
85 t_simdata->comm = surf_workstation_resource->extension_public->
86 communicate(MSG_process_get_host(t_simdata->sender)->simdata->host,
87 h->simdata->host, t_simdata->message_size,t_simdata->rate);
/* Attach the task to the action as its user data. */
89 surf_workstation_resource->common_public->action_set_data(t_simdata->comm,t);
/* MSG_task_put() blocks the sender until `comm` is set; now that it is set,
 * wake the sender up if it is blocked. */
91 if(__MSG_process_isBlocked(t_simdata->sender))
92 __MSG_process_unblock(t_simdata->sender);
/* Paje trace: push state "C" for the duration of the transfer. */
94 PAJE_PROCESS_PUSH_STATE(process,"C");
/* Wait for events on the task until the transfer leaves the RUNNING state. */
97 __MSG_task_wait_event(process, t);
98 state=surf_workstation_resource->common_public->action_get_state(t_simdata->comm);
99 } while (state==SURF_ACTION_RUNNING);
/* If the task's `using` counter is still above 1, put this process back
 * into msg_global->process_to_run with xbt_fifo_unshift(). */
101 if(t->simdata->using>1) {
102 xbt_fifo_unshift(msg_global->process_to_run,process);
106 PAJE_PROCESS_POP_STATE(process);
107 PAJE_COMM_STOP(process,t,channel);
/* Map the final action state: DONE is MSG_OK; otherwise the local host state
 * is tested to choose between MSG_HOST_FAILURE and MSG_TRANSFER_FAILURE. */
109 if(state == SURF_ACTION_DONE) MSG_RETURN(MSG_OK);
110 else if(surf_workstation_resource->extension_public->get_state(h_simdata->host)
112 MSG_RETURN(MSG_HOST_FAILURE);
113 else MSG_RETURN(MSG_TRANSFER_FAILURE);
116 /** \ingroup msg_gos_functions
117 * \brief Test whether there is a pending communication on a channel.
119 * It takes one parameter.
120 * \param channel the channel on which the agent should be
121 listening. This value has to be >=0 and < than the maximal
122 number of channels fixed with MSG_set_channel_number().
123 * \return 1 if there is a pending communication and 0 otherwise
/* Non-blocking probe: evaluates to 1 when the mailbox of `channel` on host
 * `h` has a first item, 0 otherwise. The mailbox is only inspected; nothing
 * is popped from it here. */
125 int MSG_task_Iprobe(m_channel_t channel)
128 simdata_host_t h_simdata = NULL;
132 h_simdata = h->simdata;
133 return(xbt_fifo_getFirstItem(h_simdata->mbox[channel])!=NULL);
136 /** \ingroup msg_gos_functions
137 * \brief Test whether there is a pending communication on a channel, and who sent it.
139 * It takes one parameter.
140 * \param channel the channel on which the agent should be
141 listening. This value has to be >=0 and < than the maximal
142 number of channels fixed with MSG_set_channel_number().
143 * \return -1 if there is no pending communication and the PID of the process who sent it otherwise
/* Non-blocking probe that also identifies the sender: looks at the first
 * item queued in the mailbox of `channel` on host `h` and yields the PID of
 * the process recorded as the task's sender. The "nothing pending" guard
 * covers three cases: no first item, an item without content, and a task
 * without simdata. */
145 int MSG_task_probe_from(m_channel_t channel)
148 simdata_host_t h_simdata = NULL;
149 xbt_fifo_item_t item;
154 h_simdata = h->simdata;
156 item = xbt_fifo_getFirstItem(((simdata_host_t)h->simdata)->mbox[channel]);
/* The simdata operand has to be negated like the two others: the guard is
 * about *missing* data. Without the negation the guard was true for every
 * task that has simdata, which is exactly the data read on the next line. */
157 if (!item || !(t = xbt_fifo_get_item_content(item)) || !((simdata_task_t)t->simdata))
160 return MSG_process_get_PID(((simdata_task_t)t->simdata)->sender);
163 /** \ingroup msg_gos_functions
164 * \brief Put a task on a channel of a host and wait for the end of the
167 * This function is used for describing the behavior of an agent. It
168 * takes three parameters.
169 * \param task a #m_task_t to send on another location. This task
170 will not be usable anymore when the function will return. There is
171 no automatic task duplication and you have to save your parameters
172 before calling this function. Tasks are unique and once it has been
173 sent to another location, you should not access it anymore. You do
174 not need to call MSG_task_destroy() but to avoid using, as an
175 effect of inattention, this task anymore, you definitely should
176 reinitialize it with #MSG_TASK_UNINITIALIZED. Note that this task
177 can be transferred iff it has been correctly created with
179 * \param dest the destination of the message
180 * \param channel the channel on which the agent should put this
181 task. This value has to be >=0 and < than the maximal number of
182 channels fixed with MSG_set_channel_number().
183 * \return #MSG_FATAL if \a task is not properly initialized and
/* Blocking send: queue `task` in the mailbox of `channel` on remote_host,
 * wake a receiver sleeping on that channel, block until the receiver has
 * created the communication action (task_simdata->comm), wait for that
 * action to leave SURF_ACTION_RUNNING, drop the task with MSG_task_destroy()
 * and translate the final action state into an MSG_error_t. */
186 MSG_error_t MSG_task_put(m_task_t task,
187 m_host_t dest, m_channel_t channel)
189 m_process_t process = MSG_process_self();
190 simdata_task_t task_simdata = NULL;
191 e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
192 m_host_t local_host = NULL;
193 m_host_t remote_host = NULL;
197 task_simdata = task->simdata;
/* Record the calling process as the sender; MSG_task_get() reads this field
 * to find the source host and to unblock the sender. */
198 task_simdata->sender = process;
/* The `using` counter of the task must be exactly 1 at this point. */
199 xbt_assert0(task_simdata->using==1,"Gargl!");
/* Cleared so that the blocking loop below can detect the moment the
 * receiver stores the communication action. */
200 task_simdata->comm = NULL;
202 local_host = ((simdata_process_t) process->simdata)->host;
/* Deposit the task in the receiver-side mailbox of this channel. */
205 xbt_fifo_push(((simdata_host_t) remote_host->simdata)->
206 mbox[channel], task);
208 PAJE_COMM_START(process,task,channel);
/* If a process is registered as sleeping on that channel, wake it up. */
210 if(remote_host->simdata->sleeping[channel])
211 __MSG_process_unblock(remote_host->simdata->sleeping[channel]);
/* Publish where this process is putting while it is blocked, block until
 * the communication action exists, then reset both fields. */
213 process->simdata->put_host = dest;
214 process->simdata->put_channel = channel;
215 while(!(task_simdata->comm))
216 __MSG_process_block();
217 process->simdata->put_host = NULL;
218 process->simdata->put_channel = -1;
/* Paje trace: push state "C" for the duration of the transfer. */
221 PAJE_PROCESS_PUSH_STATE(process,"C");
/* Wait for events on the task as long as the action is RUNNING. */
223 state=surf_workstation_resource->common_public->action_get_state(task_simdata->comm);
224 while (state==SURF_ACTION_RUNNING) {
225 __MSG_task_wait_event(process, task);
226 state=surf_workstation_resource->common_public->action_get_state(task_simdata->comm);
/* The sender is done with the task: callers must not use it afterwards. */
229 MSG_task_destroy(task);
231 PAJE_PROCESS_POP_STATE(process);
/* Map the final action state: DONE is MSG_OK; otherwise the local host state
 * is tested to choose between MSG_HOST_FAILURE and MSG_TRANSFER_FAILURE. */
233 if(state == SURF_ACTION_DONE) MSG_RETURN(MSG_OK);
234 else if(surf_workstation_resource->extension_public->get_state(local_host->simdata->host)
236 MSG_RETURN(MSG_HOST_FAILURE);
237 else MSG_RETURN(MSG_TRANSFER_FAILURE);
240 /** \ingroup msg_gos_functions
241 * \brief Does exactly the same as MSG_task_put but with a bounded transmission
/* Same as MSG_task_put(), except that `max_rate` is first stored in the
 * task's simdata->rate, the value MSG_task_get() hands to the SURF
 * communicate() call. The result of MSG_task_put() is returned unchanged. */
246 MSG_error_t MSG_task_put_bounded(m_task_t task,
247 m_host_t dest, m_channel_t channel,
248 long double max_rate)
250 task->simdata->rate=max_rate;
/* The rate is deliberately not reset after the call. The former
 * "rate = -1.0" statement sat after the return and could never run, and
 * MSG_task_put() passes the task to MSG_task_destroy() before returning,
 * so the sender must not touch the task afterwards anyway. */
251 return(MSG_task_put(task, dest, channel));
255 /** \ingroup msg_gos_functions
256 * \brief Executes a task and waits for its termination.
258 * This function is used for describing the behavior of an agent. It
259 * takes only one parameter.
260 * \param task a #m_task_t to execute on the location on which the
262 * \return #MSG_FATAL if \a task is not properly initialized and
/* Run `task` on behalf of the calling process: start the computation with
 * __MSG_task_execute(), then wait for it in __MSG_wait_for_computation(),
 * whose result is stored in `res`. The wait is bracketed by a Paje state
 * push/pop with the state name "E". */
265 MSG_error_t MSG_task_execute(m_task_t task)
267 m_process_t process = MSG_process_self();
269 __MSG_task_execute(process, task);
271 PAJE_PROCESS_PUSH_STATE(process,"E");
272 res = __MSG_wait_for_computation(process,task);
273 PAJE_PROCESS_POP_STATE(process);
/* Start the computation of `task` without waiting for it: create a SURF
 * execute action on the host of `process` for simdata->computation_amount,
 * store the action in simdata->compute and attach the task to the action as
 * its user data. */
277 void __MSG_task_execute(m_process_t process, m_task_t task)
279 simdata_task_t simdata = NULL;
283 simdata = task->simdata;
285 simdata->compute = surf_workstation_resource->extension_public->
286 execute(MSG_process_get_host(process)->simdata->host,
287 simdata->computation_amount);
288 surf_workstation_resource->common_public->action_set_data(simdata->compute,task);
/* Wait until the compute action stored in task->simdata->compute is no
 * longer SURF_ACTION_RUNNING, then translate its final state into an
 * MSG_error_t. */
291 MSG_error_t __MSG_wait_for_computation(m_process_t process, m_task_t task)
293 e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
294 simdata_task_t simdata = task->simdata;
/* Wait for events on the task, re-reading the action state after each one. */
298 __MSG_task_wait_event(process, task);
299 state=surf_workstation_resource->common_public->action_get_state(simdata->compute);
300 } while (state==SURF_ACTION_RUNNING);
/* DONE is MSG_OK; otherwise the state of the process's host is tested to
 * choose between MSG_HOST_FAILURE and the fallback below.
 * NOTE(review): the fallback is MSG_TRANSFER_FAILURE although no transfer is
 * involved in a computation -- confirm this is the intended code. */
304 if(state == SURF_ACTION_DONE) MSG_RETURN(MSG_OK);
305 else if(surf_workstation_resource->extension_public->
306 get_state(MSG_process_get_host(process)->simdata->host)
308 MSG_RETURN(MSG_HOST_FAILURE);
309 else MSG_RETURN(MSG_TRANSFER_FAILURE);
312 /** \ingroup msg_gos_functions
313 * \brief Sleep for the specified number of seconds
315 * Makes the current process sleep until \a nb_sec seconds have elapsed.
317 * \param nb_sec the number of seconds to sleep
/* Put the calling process to sleep: a throw-away task named "MSG_sleep" is
 * created, a SURF sleep action is started on the process's host, and the
 * process waits until that action is no longer SURF_ACTION_RUNNING. */
319 MSG_error_t MSG_process_sleep(long double nb_sec)
321 e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
322 m_process_t process = MSG_process_self();
323 m_task_t dummy = NULL;
324 simdata_task_t simdata = NULL;
/* nb_sec is passed as the second argument of MSG_task_create(); the sleep
 * action below is sized from simdata->computation_amount, which presumably
 * holds that same value -- confirm against MSG_task_create(). */
327 dummy = MSG_task_create("MSG_sleep", nb_sec, 0.0, NULL);
328 simdata = dummy->simdata;
330 simdata->compute = surf_workstation_resource->extension_public->
331 sleep(MSG_process_get_host(process)->simdata->host,
332 simdata->computation_amount);
/* Attach the dummy task to the action as its user data. */
333 surf_workstation_resource->common_public->action_set_data(simdata->compute,dummy);
/* Wait for events on the dummy task until the sleep action stops RUNNING. */
338 __MSG_task_wait_event(process, dummy);
339 state=surf_workstation_resource->common_public->action_get_state(simdata->compute);
340 } while (state==SURF_ACTION_RUNNING);
/* Even when the action is DONE, the host state is tested and may turn the
 * result into MSG_HOST_FAILURE.
 * NOTE(review): the MSG_HOST_FAILURE exits visible in this function come
 * before MSG_task_destroy(dummy) or bypass it; confirm whether the dummy
 * task is released on those paths. */
343 if(state == SURF_ACTION_DONE) {
344 if(surf_workstation_resource->extension_public->
345 get_state(MSG_process_get_host(process)->simdata->host)
347 MSG_RETURN(MSG_HOST_FAILURE);
/* If the process is flagged as blocked at this point, unblock it; the host
 * state is then tested once more. */
349 if(__MSG_process_isBlocked(process)) {
350 __MSG_process_unblock(MSG_process_self());
352 if(surf_workstation_resource->extension_public->
353 get_state(MSG_process_get_host(process)->simdata->host)
355 MSG_RETURN(MSG_HOST_FAILURE);
356 MSG_task_destroy(dummy);
/* Any final state other than DONE is reported as a host failure. */
358 } else MSG_RETURN(MSG_HOST_FAILURE);
361 /** \ingroup msg_gos_functions
362 * \brief Return the number of MSG tasks currently running on
363 * the host of the currently running process.
/* Placeholder: the only visible statement asserts a constant-false condition
 * with the message "Not implemented yet!"; no load is computed here. */
365 int MSG_get_msgload(void)
368 xbt_assert0(0,"Not implemented yet!");
373 /** \ingroup msg_gos_functions
375 * \brief Return the last value returned by an MSG function (except
/* Accessor returning whatever PROCESS_GET_ERRNO() evaluates to; presumably
 * the value the MSG_RETURN() macro stored last for the calling process --
 * confirm against the macro definitions. */
378 MSG_error_t MSG_get_errno(void)
380 return PROCESS_GET_ERRNO();