3 /* Copyright (c) 2009 Cristian Rosa.
4 All rights reserved. */
6 /* This program is free software; you can redistribute it and/or modify it
7 * under the terms of the license (GNU LGPL) which comes with this package. */
12 /******************************************************************************/
13 /* Rendez-Vous Points */
14 /******************************************************************************/
17 * \brief Creates a new rendez-vous point
18 * \param name The name of the rendez-vous point
19 * \return The created rendez-vous point
21 smx_rvpoint_t SIMIX_rvpoint_create(char *name)
23 smx_rvpoint_t rvp = xbt_new0(s_smx_rvpoint_t, 1);
24 rvp->name = xbt_strdup(name);
25 rvp->read = SIMIX_mutex_init();
26 rvp->write = SIMIX_mutex_init();
27 rvp->comm_mutex = SIMIX_mutex_init();
28 rvp->comm_fifo = xbt_fifo_new();
34 * \brief Destroy a rendez-vous point
35 * \param name The rendez-vous point to destroy
37 void SIMIX_rvpoint_destroy(smx_rvpoint_t rvp)
40 SIMIX_mutex_destroy(rvp->read);
41 SIMIX_mutex_destroy(rvp->write);
42 SIMIX_mutex_destroy(rvp->comm_mutex);
43 xbt_fifo_free(rvp->comm_fifo);
48 * \brief Push a communication request into a rendez-vous point
49 * The communications request are dequeued by the two functions below
50 * \param rvp The rendez-vous point
51 * \param comm The communication request
53 static inline void SIMIX_rvpoint_push(smx_rvpoint_t rvp, smx_comm_t comm)
55 xbt_fifo_push(rvp->comm_fifo, comm);
59 * \brief Checks if there is a receive communication request queued in a rendez-vous
60 * \param rvp The rendez-vous with the queue
61 * \return The communication request if found, NULL otherwise.
63 smx_comm_t SIMIX_rvpoint_get_receiver(smx_rvpoint_t rvp)
65 /* Get a communication request from the rendez-vous queue. If it is a receive
66 request then return it, otherwise put it again in the queue and return NULL
68 smx_comm_t comm_head = xbt_fifo_shift(rvp->comm_fifo);
70 if(comm_head != NULL && comm_head->dst_host != NULL)
73 xbt_fifo_unshift(rvp->comm_fifo, comm_head);
78 * \brief Checks if there is a send communication request queued in a rendez-vous
79 * \param rvp The rendez-vous with the queue
80 * \return The communication request if found, NULL otherwise.
82 smx_comm_t SIMIX_rvpoint_get_sender(smx_rvpoint_t rvp)
84 /* Get a communication request from the rendez-vous queue. If it is a send
85 request then return it, otherwise put it again in the queue and return NULL
87 smx_comm_t comm_head = xbt_fifo_shift(rvp->comm_fifo);
89 if(comm_head != NULL && comm_head->src_host != NULL)
92 xbt_fifo_unshift(rvp->comm_fifo, comm_head);
97 * \brief Get the communication mutex of the rendez-vous point
98 * \param rvp The rendez-vous point
100 static inline smx_mutex_t SIMIX_rvpoint_get_comm_mutex(smx_rvpoint_t rvp)
102 return rvp->comm_mutex;
105 /******************************************************************************/
106 /* Communication Requests */
107 /******************************************************************************/
110 * \brief Creates a new communication request
 * \param src_host The host starting the communication (by send)
 * \param dst_host The host receiving the communication (by recv)
113 * \return the communication request
115 smx_comm_t SIMIX_communication_new(smx_host_t src_host, smx_host_t dst_host,
118 /* alloc structures */
119 smx_comm_t comm = xbt_new0(s_smx_comm_t, 1);
120 comm->cond = SIMIX_cond_init();
122 /* initialize them */
123 comm->src_host = src_host;
124 comm->dst_host = dst_host;
131 * \brief Destroy a communication request
132 * \param comm The request to be destroyed
134 void SIMIX_communication_destroy(smx_comm_t comm)
137 if(comm->refcount == 0){
138 if(comm->act != NULL)
139 SIMIX_action_destroy(comm->act);
141 if(comm->data != NULL)
142 xbt_free(comm->data);
144 xbt_free(comm->cond);
150 * \brief Increase the number of users of the communication.
151 * \param comm The communication request
152 * Each communication request can be used by more than one process, so it is
153 * necessary to know number of them at destroy time, to avoid freeing stuff that
154 * maybe is in use by others.
157 static inline void SIMIX_communication_use(smx_comm_t comm)
163 * \brief Start the simulation of a communication request
164 * \param comm The communicatino request
166 static inline void SIMIX_communication_start(smx_comm_t comm)
168 comm->act = SIMIX_action_communicate(comm->src_host, comm->dst_host, NULL,
169 comm->data_size, comm->rate);
171 SIMIX_register_action_to_condition(comm->act, comm->cond);
172 __SIMIX_cond_wait(comm->cond);
173 SIMIX_unregister_action_to_condition(comm->act, comm->cond);
176 /******************************************************************************/
177 /* Synchronous Communication */
178 /******************************************************************************/
180 void SIMIX_network_send(smx_rvpoint_t rdv, void *data, size_t size, double timeout, double rate)
182 /*double start_time = SIMIX_get_clock();*/
184 smx_host_t my_host = SIMIX_host_self();
186 smx_mutex_t rvp_comm_mutex = SIMIX_rvpoint_get_comm_mutex(rdv);
188 /* Lock the rendez-vous point */
189 SIMIX_mutex_lock(rvp_comm_mutex);
190 /* Copy the message to the network */
191 /*FIXME here the MC should allocate the space from the network storage area */
192 smx_net_data = xbt_malloc(size);
193 memcpy(smx_net_data, data, size);
195 /* Check if there is already a receive waiting in the rendez-vous point */
196 if((comm = SIMIX_rvpoint_get_receiver(rdv)) != NULL){
197 comm->src_host = my_host;
198 comm->data = smx_net_data;
199 comm->data_size = size;
201 SIMIX_communication_use(comm);
203 /* Unlock the rendez-vous point and start the communication action.*/
204 /* FIXME: if the semantic is non blocking, it shouldn't wait on the condition here */
205 SIMIX_mutex_unlock(rvp_comm_mutex);
206 SIMIX_communication_start(comm);
208 /* Nobody is at the rendez-vous point, so push the comm action into it */
210 comm = SIMIX_communication_new(my_host, NULL, rdv);
211 comm->data = smx_net_data;
212 comm->data_size = size;
214 SIMIX_communication_use(comm);
215 SIMIX_rvpoint_push(rdv, comm);
217 /* Wait for communication completion */
218 /* FIXME: if the semantic is non blocking, it shouldn't wait on the condition here */
219 /* FIXME: add timeout checking stuff */
220 SIMIX_mutex_unlock (rvp_comm_mutex);
221 __SIMIX_cond_wait(comm->cond);
224 /* Check for errors */
225 if (SIMIX_host_get_state(comm->dst_host) == 0){
226 THROW1(host_error, 0, "Destination host %s failed", comm->dst_host->name);
227 }else if(SIMIX_action_get_state(comm->act) == SURF_ACTION_FAILED){
228 THROW0(network_error, 0, "Link failure");
231 SIMIX_communication_destroy(comm);
235 void SIMIX_network_recv(smx_rvpoint_t rdv, void **data, size_t *size, double timeout)
237 /*double start_time = SIMIX_get_clock();*/
239 smx_host_t my_host = SIMIX_host_self();
240 smx_mutex_t rvp_comm_mutex = SIMIX_rvpoint_get_comm_mutex(rdv);
242 /* Lock the rendez-vous point */
243 SIMIX_mutex_lock(rvp_comm_mutex);
245 /* Check if there is already a send waiting in the rendez-vous point */
246 if((comm = SIMIX_rvpoint_get_sender(rdv)) != NULL){
247 comm->dst_host = my_host;
248 comm->dest_buff = data;
249 SIMIX_communication_use(comm);
251 /* Unlock the rendez-vous point and start the communication action.*/
252 /* FIXME: if the semantic is non blocking, it shouldn't wait on the condition here */
253 SIMIX_mutex_unlock(rvp_comm_mutex);
254 SIMIX_communication_start(comm);
256 /* Nobody is at the rendez-vous point, so push the comm action into it */
258 comm = SIMIX_communication_new(NULL, my_host, rdv);
259 comm->dest_buff = data;
260 SIMIX_communication_use(comm);
261 SIMIX_rvpoint_push(rdv, comm);
263 /* Wait for communication completion */
264 /* FIXME: if the semantic is non blocking, it shouldn't wait on the condition here */
265 /* FIXME: add timeout checking stuff*/
266 SIMIX_mutex_unlock (rvp_comm_mutex);
267 __SIMIX_cond_wait(comm->cond);
270 /* Check for errors */
271 if (SIMIX_host_get_state(comm->src_host) == 0){
272 THROW1(host_error, 0, "Source host %s failed", comm->src_host->name);
273 }else if(SIMIX_action_get_state(comm->act) == SURF_ACTION_FAILED){
274 THROW0(network_error, 0, "Link failure");
277 /* We are OK, let's copy the message to receiver's buffer */
278 *size = *size < comm->data_size ? *size : comm->data_size;
279 memcpy(*data, comm->data, *size);
281 SIMIX_communication_destroy(comm);
285 /******************************************************************************/
286 /* Asynchronous Communication */
287 /******************************************************************************/
290 void SIMIX_network_wait(smx_action_t comm, double timeout)
293 SIMIX_cond_wait_timeout(rvp_cond, rvp_comm_mutex, timeout - start_time);
295 SIMIX_cond_wait(rvp_cond, rvp_comm_mutex);
300 XBT_PUBLIC(int) SIMIX_network_test(smx_action_t comm)
302 if(SIMIX_action_get_state (comm) == SURF_ACTION_DONE){
305 return SIMIX_action_get_state (comm) == SURF_ACTION_DONE ? TRUE : FALSE;