3 /* Copyright (c) 2002,2003,2004 Arnaud Legrand. All rights reserved. */
5 /* This program is free software; you can redistribute it and/or modify it
6 * under the terms of the license (GNU LGPL) which comes with this package. */
10 #include "xbt/error.h"
/* Logging setup for this file: registers "gos" as the default log
 * subcategory (presumably a child of the "msg" category - confirm against
 * the xbt log documentation). */
11 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(gos, msg,
12 "Logging specific to MSG (gos)");
14 /** \ingroup msg_gos_functions
15 * \brief This function is now deprecated and useless. Please stop using it.
/* Deprecated entry point. The only visible statement asserts on a
 * constant-false condition with a "deprecated" message.
 * NOTE(review): this listing omits some original lines (the embedded numbers
 * jump 17 -> 19 -> 24), so the braces and any return statement are not
 * visible here. */
17 MSG_error_t MSG_process_start(m_process_t process)
19 xbt_assert0(0,"This function is now deprecated and useless. Please stop using it.");
24 /** \ingroup msg_gos_functions
25 * \brief Listen on a channel and wait for receiving a task.
27 * It takes two parameters.
28 * \param task a memory location for storing a #m_task_t. It will
29 hold a task when this function returns. Thus \a task should not
30 be equal to \c NULL and \a *task should be equal to \c NULL. If one of
31 those two conditions does not hold, there will be a warning message.
32 * \param channel the channel on which the agent should be
33 listening. This value has to be >= 0 and less than the maximal
34 number of channels fixed with MSG_set_channel_number().
35 * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
36 * if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
/* Receive one task on `channel` from the mailbox of host `h`.
 *
 * Visible flow: pop a task from h's mailbox for `channel`, blocking while it
 * is empty; start a surf communication from the sender's host to h; wait
 * until that action is no longer SURF_ACTION_RUNNING; then report MSG_OK,
 * MSG_HOST_FAILURE or MSG_TRANSFER_FAILURE through MSG_RETURN.
 *
 * NOTE(review): this listing drops some original lines (embedded numbers
 * skip, e.g. 38 -> 41), so the `channel` parameter line, the declarations and
 * assignments of `t` and `h` (presumably the calling process' host), the
 * store into the caller's task slot and several braces/conditions are not
 * visible here.
 * NOTE(review): the header comment advertises #MSG_FATAL and #MSG_WARNING,
 * but the visible code asserts on a NULL `task` and logs with CRITICAL0
 * instead - confirm which contract is current.
 */
38 MSG_error_t MSG_task_get(m_task_t * task,
41 m_process_t process = MSG_process_self();
44 simdata_task_t t_simdata = NULL;
45 simdata_host_t h_simdata = NULL;
47 e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
51 xbt_assert0(task,"Null pointer for the task\n");
/* The condition guarding this message is on a line missing from the listing. */
54 CRITICAL0("MSG_task_get() was asked to write in a non empty task struct.");
58 h_simdata = h->simdata;
/* Keep trying to pop a task; while the mailbox is empty, register as the
 * single sleeper of this channel and block. */
59 while ((t = xbt_fifo_pop(h_simdata->mbox[channel])) == NULL) {
/* Only one process may wait on a given channel of a host at a time. */
60 xbt_assert2(!(h_simdata->sleeping[channel]),
61 "A process (%s(%d)) is already blocked on this channel",
62 h_simdata->sleeping[channel]->name,
63 h_simdata->sleeping[channel]->simdata->PID);
64 h_simdata->sleeping[channel] = process; /* I'm waiting. Wake me up when you're ready */
65 __MSG_process_block();
/* Host state check after wake-up; the right-hand side of the comparison is
 * on a line missing from this listing.
 * NOTE(review): if MSG_RETURN leaves the function, sleeping[channel] still
 * points at this process on that path because the reset below comes after -
 * confirm this is intended. */
66 if(surf_workstation_resource->extension_public->get_state(h_simdata->host)
68 MSG_RETURN(MSG_HOST_FAILURE);
69 h_simdata->sleeping[channel] = NULL;
70 /* OK, we should both be ready now. Are you there ? */
73 t_simdata = t->simdata;
74 /* *task = __MSG_task_copy(t); */
/* Start the transfer from the sender's host to h, using the task's message
 * size and rate, and remember the action in the task. */
80 t_simdata->comm = surf_workstation_resource->extension_public->
81 communicate(MSG_process_get_host(t_simdata->sender)->simdata->host,
82 h->simdata->host, t_simdata->message_size,t_simdata->rate);
/* Attach the task to the action as its user data. */
84 surf_workstation_resource->common_public->action_set_data(t_simdata->comm,t);
/* Wake the sender if it is blocked (MSG_task_put blocks until comm is set). */
86 if(__MSG_process_isBlocked(t_simdata->sender))
87 __MSG_process_unblock(t_simdata->sender);
89 // PAJE_PROCESS_STATE(process,"C");
90 PAJE_PROCESS_PUSH_STATE(process,"C");
/* Wait on task events until the communication leaves the RUNNING state
 * (the opening of this do/while is on a line missing from the listing). */
93 __MSG_task_wait_event(process, t);
94 state=surf_workstation_resource->common_public->action_get_state(t_simdata->comm);
95 } while (state==SURF_ACTION_RUNNING);
/* While the task still has more than one user, put this process back at the
 * front of the run queue. NOTE(review): the rest of this branch is not
 * visible here. */
97 if(t->simdata->using>1) {
98 xbt_fifo_unshift(msg_global->process_to_run,process);
102 PAJE_PROCESS_POP_STATE(process);
103 PAJE_COMM_STOP(process,t,channel);
/* Map the final action state to the returned error code: DONE -> MSG_OK;
 * otherwise a host-state check (operand line missing) selects
 * MSG_HOST_FAILURE, else MSG_TRANSFER_FAILURE. */
105 if(state == SURF_ACTION_DONE) MSG_RETURN(MSG_OK);
106 else if(surf_workstation_resource->extension_public->get_state(h_simdata->host)
108 MSG_RETURN(MSG_HOST_FAILURE);
109 else MSG_RETURN(MSG_TRANSFER_FAILURE);
112 /** \ingroup msg_gos_functions
113 * \brief Test whether there is a pending communication on a channel.
115 * It takes one parameter.
116 * \param channel the channel on which the agent should be
117 listening. This value has to be >= 0 and less than the maximal
118 number of channels fixed with MSG_set_channel_number().
119 * \return 1 if there is a pending communication and 0 otherwise
/* Non-blocking probe: yields 1 when the mailbox of host `h` for `channel`
 * has a first item and 0 otherwise.
 * NOTE(review): the declaration and assignment of `h` are on lines missing
 * from this listing (embedded numbers jump 124 -> 128). */
121 int MSG_task_Iprobe(m_channel_t channel)
124 simdata_host_t h_simdata = NULL;
128 h_simdata = h->simdata;
129 return(xbt_fifo_getFirstItem(h_simdata->mbox[channel])!=NULL);
132 /** \ingroup msg_gos_functions
133 * \brief Test whether there is a pending communication on a channel, and who sent it.
135 * It takes one parameter.
136 * \param channel the channel on which the agent should be
137 listening. This value has to be >= 0 and less than the maximal
138 number of channels fixed with MSG_set_channel_number().
139 * \return -1 if there is no pending communication and the PID of the process who sent it otherwise
/* Probe the first pending task on `channel` of host `h` and report the PID
 * of its sender.
 * NOTE(review): the declarations of `h` and `t`, the assignment of `h` and
 * the statement guarded by the `if` below are on lines missing from this
 * listing (presumably `return -1;`, per the header comment). */
141 int MSG_task_probe_from(m_channel_t channel)
144 simdata_host_t h_simdata = NULL;
145 xbt_fifo_item_t item;
/* h_simdata is assigned here but the lookup below re-casts h->simdata
 * instead of using it. */
150 h_simdata = h->simdata;
152 item = xbt_fifo_getFirstItem(((simdata_host_t)h->simdata)->mbox[channel]);
/* NOTE(review): the third clause is true when t->simdata is NON-NULL (the
 * cast is not negated), so the guarded statement runs for every task that
 * has simdata, and the final return is only reached when t->simdata is NULL,
 * where it would dereference that NULL pointer. This looks like a missing
 * `!` before `(simdata_task_t)t->simdata` - confirm and fix. */
153 if (!item || !(t = xbt_fifo_get_item_content(item)) || (simdata_task_t)t->simdata)
156 return MSG_process_get_PID(((simdata_task_t)t->simdata)->sender);
159 /** \ingroup msg_gos_functions
160 * \brief Put a task on a channel of a host and wait for the end of the
163 * This function is used for describing the behavior of an agent. It
164 * takes three parameters.
165 * \param task a #m_task_t to send on another location. This task
166 will not be usable anymore when the function returns. There is
167 no automatic task duplication and you have to save your parameters
168 before calling this function. Tasks are unique and once one has been
169 sent to another location, you should not access it anymore. You do
170 not need to call MSG_task_destroy() but to avoid using, as an
171 effect of inattention, this task anymore, you definitely should
172 reinitialize it with #MSG_TASK_UNINITIALIZED. Note that this task
173 can be transferred iff it has been correctly created with
175 * \param dest the destination of the message
176 * \param channel the channel on which the agent should put this
177 task. This value has to be >= 0 and less than the maximal number of
178 channels fixed with MSG_set_channel_number().
179 * \return #MSG_FATAL if \a task is not properly initialized and
/* Send `task` to the mailbox `channel` of a remote host and wait for the
 * transfer to finish.
 *
 * Visible flow: record the calling process as sender, push the task onto the
 * remote host's mailbox, wake a receiver sleeping on that channel, block
 * until the task's comm action has been set, wait until that action is no
 * longer SURF_ACTION_RUNNING, call MSG_task_destroy() on the task, then
 * report MSG_OK, MSG_HOST_FAILURE or MSG_TRANSFER_FAILURE through MSG_RETURN.
 *
 * NOTE(review): this listing drops some original lines (embedded numbers
 * skip, e.g. 189 -> 193 and 198 -> 201); in particular the assignment of
 * `remote_host` (presumably from `dest`) is not visible although it is
 * dereferenced below.
 */
182 MSG_error_t MSG_task_put(m_task_t task,
183 m_host_t dest, m_channel_t channel)
185 m_process_t process = MSG_process_self();
186 simdata_task_t task_simdata = NULL;
187 e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
188 m_host_t local_host = NULL;
189 m_host_t remote_host = NULL;
193 task_simdata = task->simdata;
194 task_simdata->sender = process;
/* The task must have exactly one user when it is sent. */
195 xbt_assert0(task_simdata->using==1,"Gargl!");
/* comm stays NULL until it is set elsewhere (MSG_task_get assigns it when it
 * starts the transfer); the wait loop below relies on that. */
196 task_simdata->comm = NULL;
198 local_host = ((simdata_process_t) process->simdata)->host;
/* Deposit the task in the destination mailbox. */
201 xbt_fifo_push(((simdata_host_t) remote_host->simdata)->
202 mbox[channel], task);
204 PAJE_COMM_START(process,task,channel);
/* Wake the process sleeping on this channel of the remote host, if any. */
206 if(remote_host->simdata->sleeping[channel])
207 __MSG_process_unblock(remote_host->simdata->sleeping[channel]);
/* Record where this process is putting while it is blocked, then clear that
 * bookkeeping once the communication action exists. */
209 process->simdata->put_host = dest;
210 process->simdata->put_channel = channel;
211 while(!(task_simdata->comm))
212 __MSG_process_block();
213 process->simdata->put_host = NULL;
214 process->simdata->put_channel = -1;
217 // PAJE_PROCESS_STATE(process,"C");
218 PAJE_PROCESS_PUSH_STATE(process,"C");
/* Wait on task events until the communication leaves the RUNNING state. */
220 state=surf_workstation_resource->common_public->action_get_state(task_simdata->comm);
221 while (state==SURF_ACTION_RUNNING) {
222 __MSG_task_wait_event(process, task);
223 state=surf_workstation_resource->common_public->action_get_state(task_simdata->comm);
/* Give up this side's use of the task; callers must not touch it afterwards
 * (see the header comment). */
226 MSG_task_destroy(task);
228 PAJE_PROCESS_POP_STATE(process);
/* Map the final action state to the returned error code: DONE -> MSG_OK;
 * otherwise a check of the local host state (operand line missing) selects
 * MSG_HOST_FAILURE, else MSG_TRANSFER_FAILURE. */
230 if(state == SURF_ACTION_DONE) MSG_RETURN(MSG_OK);
231 else if(surf_workstation_resource->extension_public->get_state(local_host->simdata->host)
233 MSG_RETURN(MSG_HOST_FAILURE);
234 else MSG_RETURN(MSG_TRANSFER_FAILURE);
237 /** \ingroup msg_gos_functions
238 * \brief Does exactly the same as MSG_task_put but with a bounded transmission
/* Same as MSG_task_put(), but first stores `max_rate` in the task's rate
 * field, which MSG_task_get() passes to the surf communicate() call.
 * NOTE(review): the braces are on lines missing from this listing. */
243 MSG_error_t MSG_task_put_bounded(m_task_t task,
244 m_host_t dest, m_channel_t channel,
245 long double max_rate)
247 task->simdata->rate=max_rate;
248 return(MSG_task_put(task, dest, channel));
/* NOTE(review): unreachable - this statement follows the return above, so
 * the rate is never reset to -1.0. Before moving it ahead of the return,
 * check whether the task is still valid at that point: MSG_task_put() calls
 * MSG_task_destroy(task) before returning. */
249 task->simdata->rate=-1.0;
252 /** \ingroup msg_gos_functions
253 * \brief Executes a task and waits for its termination.
255 * This function is used for describing the behavior of an agent. It
256 * takes only one parameter.
257 * \param task a #m_task_t to execute on the location on which the
259 * \return #MSG_FATAL if \a task is not properly initialized and
/* Run `task` on behalf of the calling process: start the computation with
 * __MSG_task_execute(), then wait for it with __MSG_wait_for_computation()
 * and keep that result in `res`.
 * NOTE(review): the declaration of `res` and the final return are on lines
 * missing from this listing (embedded numbers jump 264 -> 266 and
 * 271 -> 275). */
262 MSG_error_t MSG_task_execute(m_task_t task)
264 m_process_t process = MSG_process_self();
266 __MSG_task_execute(process, task);
268 // PAJE_PROCESS_STATE(process,"E");
269 PAJE_PROCESS_PUSH_STATE(process,"E");
270 res = __MSG_wait_for_computation(process,task);
271 PAJE_PROCESS_POP_STATE(process);
/* Start (without waiting for) the computation of `task` on the host of
 * `process`: creates a surf execute action for the task's
 * computation_amount, stores it in the task's `compute` field and attaches
 * the task to the action as its user data.
 * NOTE(review): some original lines are missing from this listing (embedded
 * numbers jump 277 -> 281). */
275 void __MSG_task_execute(m_process_t process, m_task_t task)
277 simdata_task_t simdata = NULL;
281 simdata = task->simdata;
283 simdata->compute = surf_workstation_resource->extension_public->
284 execute(MSG_process_get_host(process)->simdata->host,
285 simdata->computation_amount);
286 surf_workstation_resource->common_public->action_set_data(simdata->compute,task);
/* Wait until the `compute` action of `task` is no longer
 * SURF_ACTION_RUNNING, then report the outcome through MSG_RETURN: MSG_OK
 * when the action is DONE; otherwise a check of the process' host state
 * (comparison operand on a line missing from this listing) selects
 * MSG_HOST_FAILURE, else MSG_TRANSFER_FAILURE.
 * NOTE(review): MSG_TRANSFER_FAILURE is returned here for a computation, not
 * a transfer - confirm this code is the intended one.
 * NOTE(review): the opening of the do/while and the lines between it and the
 * result mapping are not visible (embedded numbers jump 292 -> 296 and
 * 298 -> 302). */
289 MSG_error_t __MSG_wait_for_computation(m_process_t process, m_task_t task)
291 e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
292 simdata_task_t simdata = task->simdata;
296 __MSG_task_wait_event(process, task);
297 state=surf_workstation_resource->common_public->action_get_state(simdata->compute);
298 } while (state==SURF_ACTION_RUNNING);
302 if(state == SURF_ACTION_DONE) MSG_RETURN(MSG_OK);
303 else if(surf_workstation_resource->extension_public->
304 get_state(MSG_process_get_host(process)->simdata->host)
306 MSG_RETURN(MSG_HOST_FAILURE);
307 else MSG_RETURN(MSG_TRANSFER_FAILURE);
310 /** \ingroup msg_gos_functions
311 * \brief Sleep for the specified number of seconds
313 * Makes the current process sleep until \a nb_sec seconds have elapsed.
315 * \param nb_sec the number of seconds to sleep
/* Put the calling process to sleep by creating a temporary task named
 * "MSG_sleep", starting a surf sleep action for it on the process' host and
 * waiting until that action is no longer SURF_ACTION_RUNNING.
 *
 * NOTE(review): this listing drops some original lines (embedded numbers
 * skip, e.g. 322 -> 325, 331 -> 336, 354 -> 356), so the opening of the
 * do/while, the comparison operands of the host-state checks and the
 * successful return are not visible here.
 * NOTE(review): MSG_task_destroy(dummy) appears only after the host-state
 * checks inside the DONE branch; the MSG_RETURN(MSG_HOST_FAILURE) statements
 * before it and in the final else branch come with no visible destroy, so
 * `dummy` may be leaked on those paths - confirm.
 */
317 MSG_error_t MSG_process_sleep(long double nb_sec)
319 e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
320 m_process_t process = MSG_process_self();
321 m_task_t dummy = NULL;
322 simdata_task_t simdata = NULL;
/* nb_sec is handed to MSG_task_create(); the sleep action below is then
 * given simdata->computation_amount (presumably that same value - confirm
 * against MSG_task_create). */
325 dummy = MSG_task_create("MSG_sleep", nb_sec, 0.0, NULL);
326 simdata = dummy->simdata;
328 simdata->compute = surf_workstation_resource->extension_public->
329 sleep(MSG_process_get_host(process)->simdata->host,
330 simdata->computation_amount);
/* Attach the temporary task to the action as its user data. */
331 surf_workstation_resource->common_public->action_set_data(simdata->compute,dummy);
/* Wait on task events until the sleep action leaves the RUNNING state. */
336 __MSG_task_wait_event(process, dummy);
337 state=surf_workstation_resource->common_public->action_get_state(simdata->compute);
338 } while (state==SURF_ACTION_RUNNING);
341 if(state == SURF_ACTION_DONE) {
/* First host-state check after the sleep completed. */
342 if(surf_workstation_resource->extension_public->
343 get_state(MSG_process_get_host(process)->simdata->host)
345 MSG_RETURN(MSG_HOST_FAILURE);
/* Unblock this process if it is currently marked as blocked. */
347 if(__MSG_process_isBlocked(process)) {
348 __MSG_process_unblock(MSG_process_self());
/* Second host-state check, performed before the temporary task is released. */
350 if(surf_workstation_resource->extension_public->
351 get_state(MSG_process_get_host(process)->simdata->host)
353 MSG_RETURN(MSG_HOST_FAILURE);
354 MSG_task_destroy(dummy);
/* Any final state other than DONE is reported as a host failure. */
356 } else MSG_RETURN(MSG_HOST_FAILURE);
359 /** \ingroup msg_gos_functions
360 * \brief Return the number of MSG tasks currently running on
361 * the host of the currently running process.
/* Placeholder: the only visible statement asserts on a constant-false
 * condition with the message "Not implemented yet!".
 * NOTE(review): the braces and any return statement are on lines missing
 * from this listing. */
363 int MSG_get_msgload(void)
366 xbt_assert0(0,"Not implemented yet!");
371 /** \ingroup msg_gos_functions
373 * \brief Return the last value returned by a MSG function (except
/* Returns whatever PROCESS_GET_ERRNO() evaluates to (presumably the error
 * code last stored by MSG_RETURN for the current process - confirm against
 * the macro definitions).
 * NOTE(review): the braces are on lines missing from this listing. */
376 MSG_error_t MSG_get_errno(void)
378 return PROCESS_GET_ERRNO();