3 /* Copyright (c) 2002,2003,2004 Arnaud Legrand. All rights reserved. */
5 /* This program is free software; you can redistribute it and/or modify it
6 * under the terms of the license (GNU LGPL) which comes with this package. */
10 #include "xbt/error.h"
11 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(gos, msg,
12 "Logging specific to MSG (gos)");
14 /** \defgroup msg_gos_functions MSG Operating System Functions
15 * \brief This section describes the functions that can be used
16 * by an agent for handling some task.
19 /** \ingroup msg_gos_functions
20 * \brief This function is now deprecated and useless. Please stop using it.
22 MSG_error_t MSG_process_start(m_process_t process)
24 xbt_assert0(0,"This function is now deprecated and useless. Please stop using it.");
29 /** \ingroup msg_gos_functions
30 * \brief Listen on a channel and wait for receiving a task.
32 * It takes two parameter.
33 * \param task a memory location for storing a #m_task_t. It will
34 hold a task when this function will return. Thus \a task should not
35 be equal to \c NULL and \a *task should be equal to \c NULL. If one of
36 those two condition does not hold, there will be a warning message.
37 * \param channel the channel on which the agent should be
38 listening. This value has to be >=0 and < than the maximal
39 number of channels fixed with MSG_set_channel_number().
40 * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
41 * if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
43 MSG_error_t MSG_task_get(m_task_t * task,
46 m_process_t process = MSG_process_self();
49 simdata_task_t t_simdata = NULL;
50 simdata_host_t h_simdata = NULL;
52 e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
56 xbt_assert0(task,"Null pointer for the task\n");
59 CRITICAL0("MSG_task_get() was asked to write in a non empty task struct.");
63 h_simdata = h->simdata;
65 DEBUG2("Waiting for a task on channel %d (%s)", channel,h->name);
67 while ((t = xbt_fifo_pop(h_simdata->mbox[channel])) == NULL) {
68 xbt_assert2(!(h_simdata->sleeping[channel]),
69 "A process (%s(%d)) is already blocked on this channel",
70 h_simdata->sleeping[channel]->name,
71 h_simdata->sleeping[channel]->simdata->PID);
72 h_simdata->sleeping[channel] = process; /* I'm waiting. Wake me up when you're ready */
73 __MSG_process_block();
74 if(surf_workstation_resource->extension_public->get_state(h_simdata->host)
76 MSG_RETURN(MSG_HOST_FAILURE);
77 h_simdata->sleeping[channel] = NULL;
78 /* OK, we should both be ready now. Are you there ? */
81 t_simdata = t->simdata;
82 /* *task = __MSG_task_copy(t); */
88 t_simdata->comm = surf_workstation_resource->extension_public->
89 communicate(MSG_process_get_host(t_simdata->sender)->simdata->host,
90 h->simdata->host, t_simdata->message_size,t_simdata->rate);
92 surf_workstation_resource->common_public->action_set_data(t_simdata->comm,t);
94 if(__MSG_process_isBlocked(t_simdata->sender))
95 __MSG_process_unblock(t_simdata->sender);
97 PAJE_PROCESS_PUSH_STATE(process,"C");
100 __MSG_task_wait_event(process, t);
101 state=surf_workstation_resource->common_public->action_get_state(t_simdata->comm);
102 } while (state==SURF_ACTION_RUNNING);
104 if(t->simdata->using>1) {
105 xbt_fifo_unshift(msg_global->process_to_run,process);
109 PAJE_PROCESS_POP_STATE(process);
110 PAJE_COMM_STOP(process,t,channel);
112 if(state == SURF_ACTION_DONE) MSG_RETURN(MSG_OK);
113 else if(surf_workstation_resource->extension_public->get_state(h_simdata->host)
115 MSG_RETURN(MSG_HOST_FAILURE);
116 else MSG_RETURN(MSG_TRANSFER_FAILURE);
119 /** \ingroup msg_gos_functions
120 * \brief Test whether there is a pending communication on a channel.
122 * It takes one parameter.
123 * \param channel the channel on which the agent should be
124 listening. This value has to be >=0 and < than the maximal
125 number of channels fixed with MSG_set_channel_number().
126 * \return 1 if there is a pending communication and 0 otherwise
128 int MSG_task_Iprobe(m_channel_t channel)
131 simdata_host_t h_simdata = NULL;
133 DEBUG2("Probing on channel %d (%s)", channel,h->name);
136 h_simdata = h->simdata;
137 return(xbt_fifo_getFirstItem(h_simdata->mbox[channel])!=NULL);
140 /** \ingroup msg_gos_functions
141 * \brief Test whether there is a pending communication on a channel, and who sent it.
143 * It takes one parameter.
144 * \param channel the channel on which the agent should be
145 listening. This value has to be >=0 and < than the maximal
146 number of channels fixed with MSG_set_channel_number().
147 * \return -1 if there is no pending communication and the PID of the process who sent it otherwise
149 int MSG_task_probe_from(m_channel_t channel)
152 simdata_host_t h_simdata = NULL;
153 xbt_fifo_item_t item;
158 h_simdata = h->simdata;
160 DEBUG2("Probing on channel %d (%s)", channel,h->name);
162 item = xbt_fifo_getFirstItem(h->simdata->mbox[channel]);
163 if (!item || !(t = xbt_fifo_get_item_content(item)))
166 return MSG_process_get_PID(t->simdata->sender);
169 /** \ingroup msg_gos_functions
170 * \brief Put a task on a channel of an host and waits for the end of the
173 * This function is used for describing the behavior of an agent. It
174 * takes three parameter.
175 * \param task a #m_task_t to send on another location. This task
176 will not be usable anymore when the function will return. There is
177 no automatic task duplication and you have to save your parameters
178 before calling this function. Tasks are unique and once it has been
179 sent to another location, you should not access it anymore. You do
180 not need to call MSG_task_destroy() but to avoid using, as an
181 effect of inattention, this task anymore, you definitely should
182 renitialize it with #MSG_TASK_UNINITIALIZED. Note that this task
183 can be transfered iff it has been correctly created with
185 * \param dest the destination of the message
186 * \param channel the channel on which the agent should put this
187 task. This value has to be >=0 and < than the maximal number of
188 channels fixed with MSG_set_channel_number().
189 * \return #MSG_FATAL if \a task is not properly initialized and
192 MSG_error_t MSG_task_put(m_task_t task,
193 m_host_t dest, m_channel_t channel)
195 m_process_t process = MSG_process_self();
196 simdata_task_t task_simdata = NULL;
197 e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
198 m_host_t local_host = NULL;
199 m_host_t remote_host = NULL;
203 task_simdata = task->simdata;
204 task_simdata->sender = process;
205 xbt_assert0(task_simdata->using==1,"Gargl!");
206 task_simdata->comm = NULL;
208 local_host = ((simdata_process_t) process->simdata)->host;
211 DEBUG4("Trying to send a task (%lg Mb) from %s to %s on channel %d",
212 task->simdata->message_size,local_host->name, remote_host->name, channel);
214 xbt_fifo_push(((simdata_host_t) remote_host->simdata)->
215 mbox[channel], task);
217 PAJE_COMM_START(process,task,channel);
219 if(remote_host->simdata->sleeping[channel])
220 __MSG_process_unblock(remote_host->simdata->sleeping[channel]);
222 process->simdata->put_host = dest;
223 process->simdata->put_channel = channel;
224 while(!(task_simdata->comm))
225 __MSG_process_block();
226 process->simdata->put_host = NULL;
227 process->simdata->put_channel = -1;
230 PAJE_PROCESS_PUSH_STATE(process,"C");
232 state=surf_workstation_resource->common_public->action_get_state(task_simdata->comm);
233 while (state==SURF_ACTION_RUNNING) {
234 __MSG_task_wait_event(process, task);
235 state=surf_workstation_resource->common_public->action_get_state(task_simdata->comm);
238 MSG_task_destroy(task);
240 PAJE_PROCESS_POP_STATE(process);
242 if(state == SURF_ACTION_DONE) MSG_RETURN(MSG_OK);
243 else if(surf_workstation_resource->extension_public->get_state(local_host->simdata->host)
245 MSG_RETURN(MSG_HOST_FAILURE);
246 else MSG_RETURN(MSG_TRANSFER_FAILURE);
249 /** \ingroup msg_gos_functions
250 * \brief Does exactly the same as MSG_task_put but with a bounded transmition
255 MSG_error_t MSG_task_put_bounded(m_task_t task,
256 m_host_t dest, m_channel_t channel,
259 task->simdata->rate=max_rate;
260 return(MSG_task_put(task, dest, channel));
261 task->simdata->rate=-1.0;
264 /** \ingroup msg_gos_functions
265 * \brief Executes a task and waits for its termination.
267 * This function is used for describing the behavior of an agent. It
268 * takes only one parameter.
269 * \param task a #m_task_t to execute on the location on which the
271 * \return #MSG_FATAL if \a task is not properly initialized and
274 MSG_error_t MSG_task_execute(m_task_t task)
276 m_process_t process = MSG_process_self();
279 DEBUG1("Computing on %s", process->simdata->host->name);
281 __MSG_task_execute(process, task);
283 PAJE_PROCESS_PUSH_STATE(process,"E");
284 res = __MSG_wait_for_computation(process,task);
285 PAJE_PROCESS_POP_STATE(process);
289 void __MSG_task_execute(m_process_t process, m_task_t task)
291 simdata_task_t simdata = NULL;
295 simdata = task->simdata;
297 simdata->compute = surf_workstation_resource->extension_public->
298 execute(MSG_process_get_host(process)->simdata->host,
299 simdata->computation_amount);
300 surf_workstation_resource->common_public->action_set_data(simdata->compute,task);
303 MSG_error_t __MSG_wait_for_computation(m_process_t process, m_task_t task)
305 e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
306 simdata_task_t simdata = task->simdata;
310 __MSG_task_wait_event(process, task);
311 state=surf_workstation_resource->common_public->action_get_state(simdata->compute);
312 } while (state==SURF_ACTION_RUNNING);
316 if(state == SURF_ACTION_DONE) MSG_RETURN(MSG_OK);
317 else if(surf_workstation_resource->extension_public->
318 get_state(MSG_process_get_host(process)->simdata->host)
320 MSG_RETURN(MSG_HOST_FAILURE);
321 else MSG_RETURN(MSG_TRANSFER_FAILURE);
324 /** \ingroup msg_gos_functions
325 * \brief Sleep for the specified number of seconds
327 * Makes the current process sleep until \a time seconds have elapsed.
329 * \param nb_sec a number of second
331 MSG_error_t MSG_process_sleep(double nb_sec)
333 e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
334 m_process_t process = MSG_process_self();
335 m_task_t dummy = NULL;
336 simdata_task_t simdata = NULL;
339 dummy = MSG_task_create("MSG_sleep", nb_sec, 0.0, NULL);
340 simdata = dummy->simdata;
342 simdata->compute = surf_workstation_resource->extension_public->
343 sleep(MSG_process_get_host(process)->simdata->host,
344 simdata->computation_amount);
345 surf_workstation_resource->common_public->action_set_data(simdata->compute,dummy);
350 __MSG_task_wait_event(process, dummy);
351 state=surf_workstation_resource->common_public->action_get_state(simdata->compute);
352 } while (state==SURF_ACTION_RUNNING);
355 if(state == SURF_ACTION_DONE) {
356 if(surf_workstation_resource->extension_public->
357 get_state(MSG_process_get_host(process)->simdata->host)
359 MSG_RETURN(MSG_HOST_FAILURE);
361 if(__MSG_process_isBlocked(process)) {
362 __MSG_process_unblock(MSG_process_self());
364 if(surf_workstation_resource->extension_public->
365 get_state(MSG_process_get_host(process)->simdata->host)
367 MSG_RETURN(MSG_HOST_FAILURE);
368 MSG_task_destroy(dummy);
370 } else MSG_RETURN(MSG_HOST_FAILURE);
373 /** \ingroup msg_gos_functions
374 * \brief Return the number of MSG tasks currently running on a
375 * the host of the current running process.
377 int MSG_get_msgload(void)
380 xbt_assert0(0,"Not implemented yet!");
385 /** \ingroup msg_gos_functions
387 * \brief Return the the last value returned by a MSG function (except
390 MSG_error_t MSG_get_errno(void)
392 return PROCESS_GET_ERRNO();