git-svn-id: svn+ssh://scm.gforge.inria.fr/svn/simgrid/simgrid/trunk@3750
48e7efb5-ca39-0410-a469-
dd3cf9ba447f
xbt_assert1(!gras_socket_is_meas(sock),
"Asked to send a message on the measurement socket %p", sock);
xbt_assert1(!gras_socket_is_meas(sock),
"Asked to send a message on the measurement socket %p", sock);
- /* got the mutex my port */
- DEBUG1("Sock port %d",sock->port);
- SIMIX_mutex_lock(sock_data->mutex);
-
/*initialize gras message */
msg = xbt_new(s_gras_msg_t,1);
msg->expe = sock;
/*initialize gras message */
msg = xbt_new(s_gras_msg_t,1);
msg->expe = sock;
whole_payload_size = gras_datadesc_copy(msgtype->ctn_type,
payload, msg->payl);
}
whole_payload_size = gras_datadesc_copy(msgtype->ctn_type,
payload, msg->payl);
}
- /* put message on msg_queue */
- msg_remote_proc = (gras_msg_procdata_t)
- gras_libdata_by_name_from_remote("gras_msg",sock_data->to_process);
- xbt_fifo_push(msg_remote_proc->msg_to_receive_queue,msg);
-
- /* wake-up the receiver */
+
+ /* put the selectable socket on the queue */
trp_remote_proc = (gras_trp_procdata_t)
gras_libdata_by_name_from_remote("gras_trp",sock_data->to_process);
trp_remote_proc = (gras_trp_procdata_t)
gras_libdata_by_name_from_remote("gras_trp",sock_data->to_process);
- xbt_fifo_push(trp_remote_proc->msg_selectable_sockets,sock);
- SIMIX_cond_signal(trp_remote_proc->msg_select_cond);
+ xbt_queue_push(trp_remote_proc->msg_selectable_sockets,&sock);
+
+ /* put message on msg_queue */
+ msg_remote_proc = (gras_msg_procdata_t)
+ gras_libdata_by_name_from_remote("gras_msg",sock_data->to_process);
+ xbt_fifo_push(msg_remote_proc->msg_to_receive_queue,msg);
/* wait for the receiver */
SIMIX_cond_wait(sock_data->cond, sock_data->mutex);
/* wait for the receiver */
SIMIX_cond_wait(sock_data->cond, sock_data->mutex);
(gras_trp_procdata_t) gras_libdata_by_id(gras_trp_libdata_id);
gras_trp_sg_sock_data_t *sockdata;
gras_trp_plugin_t trp;
(gras_trp_procdata_t) gras_libdata_by_id(gras_trp_libdata_id);
gras_trp_sg_sock_data_t *sockdata;
gras_trp_plugin_t trp;
- gras_socket_t active_socket;
+ gras_socket_t active_socket = NULL;
gras_trp_sg_sock_data_t *active_socket_data;
gras_socket_t sock_iter; /* iterating over all sockets */
gras_trp_sg_sock_data_t *active_socket_data;
gras_socket_t sock_iter; /* iterating over all sockets */
- DEBUG0("Trying to get the lock pd, trp_select");
- SIMIX_mutex_lock(pd->msg_select_mutex);
DEBUG3("select on %s@%s with timeout=%f",
SIMIX_process_get_name(SIMIX_process_self()),
SIMIX_host_get_name(SIMIX_host_self()),
timeout);
DEBUG3("select on %s@%s with timeout=%f",
SIMIX_process_get_name(SIMIX_process_self()),
SIMIX_host_get_name(SIMIX_host_self()),
timeout);
- if (xbt_fifo_size(pd->msg_selectable_sockets) == 0) {
- /* message didn't arrive yet, wait */
- SIMIX_cond_wait_timeout(pd->msg_select_cond,pd->msg_select_mutex,timeout);
- }
-
- if (xbt_fifo_size(pd->msg_selectable_sockets) == 0) {
+ TRY {
+ xbt_queue_shift_timed(pd->msg_selectable_sockets,
+ &active_socket, timeout);
+ } CATCH(e) {
+ RETHROW;
+ }
+ if (active_socket == NULL) {
- SIMIX_mutex_unlock(pd->msg_select_mutex);
THROW0(timeout_error,0,"Timeout");
}
THROW0(timeout_error,0,"Timeout");
}
- active_socket = xbt_fifo_shift(pd->msg_selectable_sockets);
active_socket_data = (gras_trp_sg_sock_data_t*)active_socket->data;
/* Ok, got something. Open a socket back to the expeditor */
active_socket_data = (gras_trp_sg_sock_data_t*)active_socket->data;
/* Ok, got something. Open a socket back to the expeditor */
if ( (sock_data->to_socket == active_socket) &&
(sock_data->to_host == SIMIX_process_get_host(active_socket_data->from_process)) ) {
if ( (sock_data->to_socket == active_socket) &&
(sock_data->to_host == SIMIX_process_get_host(active_socket_data->from_process)) ) {
- SIMIX_mutex_unlock(pd->msg_select_mutex);
res->peer_port,
SIMIX_process_get_name(sockdata->to_process), res->port);
res->peer_port,
SIMIX_process_get_name(sockdata->to_process), res->port);
- SIMIX_mutex_unlock(pd->msg_select_mutex);
#include "portable.h" /* sometimes needed for fd_set */
#include "simix/simix.h"
#include "portable.h" /* sometimes needed for fd_set */
#include "simix/simix.h"
/* SG only elements. In RL, they are part of the OS ;) */
/* List of sockets ready to be select()ed */
/* SG only elements. In RL, they are part of the OS ;) */
/* List of sockets ready to be select()ed */
- xbt_fifo_t msg_selectable_sockets; /* regular sockets */
- xbt_fifo_t meas_selectable_sockets;/* measurement ones */
-
- /* Synchronisation on msg_selectable_sockets */
- smx_cond_t msg_select_cond;
- smx_mutex_t msg_select_mutex;
- /* Synchronisation on meas_selectable_sockets */
- smx_cond_t meas_select_cond;
- smx_mutex_t meas_select_mutex;
-
+ xbt_queue_t msg_selectable_sockets; /* regular sockets */
+ xbt_queue_t meas_selectable_sockets;/* measurement ones */
+
} s_gras_trp_procdata_t,*gras_trp_procdata_t;
/* Display the content of our socket set (debugging purpose) */
} s_gras_trp_procdata_t,*gras_trp_procdata_t;
/* Display the content of our socket set (debugging purpose) */
- /* put message on msg_queue */
- msg_remote_proc = (gras_msg_procdata_t)
- gras_libdata_by_name_from_remote("gras_msg", sock_data->to_process);
- xbt_fifo_push(msg_remote_proc->msg_to_receive_queue_meas,msg);
-
- /* put his socket on the selectable socket list */
+ /* put his socket on the selectable socket queue */
trp_remote_proc = (gras_trp_procdata_t)
gras_libdata_by_name_from_remote("gras_trp",sock_data->to_process);
trp_remote_proc = (gras_trp_procdata_t)
gras_libdata_by_name_from_remote("gras_trp",sock_data->to_process);
+ xbt_queue_push(trp_remote_proc->meas_selectable_sockets,&sock);
- xbt_fifo_push(trp_remote_proc->meas_selectable_sockets,sock);
+ /* put message on msg_queue */
+ msg_remote_proc = (gras_msg_procdata_t)
+ gras_libdata_by_name_from_remote("gras_msg", sock_data->to_process);
- /* wake-up the receiver */
- SIMIX_cond_signal(trp_remote_proc->meas_select_cond);
+ xbt_fifo_push(msg_remote_proc->msg_to_receive_queue_meas,msg);
/* wait for the receiver */
SIMIX_cond_wait(sock_data->cond,sock_data->mutex);
/* wait for the receiver */
SIMIX_cond_wait(sock_data->cond,sock_data->mutex);
unsigned long int size){
gras_trp_sg_sock_data_t *sock_data;
gras_trp_sg_sock_data_t *remote_sock_data;
unsigned long int size){
gras_trp_sg_sock_data_t *sock_data;
gras_trp_sg_sock_data_t *remote_sock_data;
- gras_socket_t remote_socket;
+ gras_socket_t remote_socket= NULL;
gras_msg_t msg_got;
gras_msg_procdata_t msg_procdata =
(gras_msg_procdata_t)gras_libdata_by_name("gras_msg");
gras_trp_procdata_t trp_proc =
(gras_trp_procdata_t)gras_libdata_by_id(gras_trp_libdata_id);
gras_msg_t msg_got;
gras_msg_procdata_t msg_procdata =
(gras_msg_procdata_t)gras_libdata_by_name("gras_msg");
gras_trp_procdata_t trp_proc =
(gras_trp_procdata_t)gras_libdata_by_id(gras_trp_libdata_id);
xbt_assert0(sock->meas,
"SG chunk exchange shouldn't be used on non-measurement sockets");
xbt_assert0(sock->meas,
"SG chunk exchange shouldn't be used on non-measurement sockets");
-
- SIMIX_mutex_lock(trp_proc->meas_select_mutex);
- if (xbt_fifo_size(msg_procdata->msg_to_receive_queue_meas) == 0 ) {
- SIMIX_cond_wait_timeout(trp_proc->meas_select_cond,
- trp_proc->meas_select_mutex,60);
- }
- if (xbt_fifo_size(msg_procdata->msg_to_receive_queue_meas) == 0 ) {
- SIMIX_mutex_unlock(trp_proc->meas_select_mutex);
+ TRY {
+ xbt_queue_shift_timed(trp_proc->meas_selectable_sockets,
+ &remote_socket, 60);
+ } CATCH(e) {
+ RETHROW;
+ }
+
+ if (remote_socket == NULL) {
THROW0(timeout_error,0,"Timeout");
THROW0(timeout_error,0,"Timeout");
- }
- SIMIX_mutex_unlock(trp_proc->meas_select_mutex);
-
- remote_socket = xbt_fifo_shift(trp_proc->meas_selectable_sockets);
- remote_sock_data = (gras_trp_sg_sock_data_t *)remote_socket->data;
+ }
+
+ remote_sock_data = (gras_trp_sg_sock_data_t *)remote_socket->data;
msg_got = xbt_fifo_shift(msg_procdata->msg_to_receive_queue_meas);
sock_data = (gras_trp_sg_sock_data_t *)sock->data;
msg_got = xbt_fifo_shift(msg_procdata->msg_to_receive_queue_meas);
sock_data = (gras_trp_sg_sock_data_t *)sock->data;
- trp_pd->msg_selectable_sockets = xbt_fifo_new();
- trp_pd->msg_select_mutex = SIMIX_mutex_init();
- trp_pd->msg_select_cond = SIMIX_cond_init();
+ trp_pd->msg_selectable_sockets = xbt_queue_new(1000,sizeof(gras_socket_t));
- trp_pd->meas_selectable_sockets = xbt_fifo_new();
- trp_pd->meas_select_mutex = SIMIX_mutex_init();
- trp_pd->meas_select_cond = SIMIX_cond_init();
+ trp_pd->meas_selectable_sockets = xbt_queue_new(1000,sizeof(gras_socket_t));
VERB2("Creating process '%s' (%d)",
SIMIX_process_get_name(SIMIX_process_self()),
VERB2("Creating process '%s' (%d)",
SIMIX_process_get_name(SIMIX_process_self()),
gras_trp_procdata_t trp_pd=
(gras_trp_procdata_t)gras_libdata_by_name("gras_trp");
gras_trp_procdata_t trp_pd=
(gras_trp_procdata_t)gras_libdata_by_name("gras_trp");
- SIMIX_mutex_destroy(trp_pd->msg_select_mutex);
- SIMIX_cond_destroy(trp_pd->msg_select_cond);
- xbt_fifo_free(trp_pd->msg_selectable_sockets);
+ xbt_queue_free(&trp_pd->msg_selectable_sockets);
- SIMIX_mutex_destroy(trp_pd->meas_select_mutex);
- SIMIX_cond_destroy(trp_pd->meas_select_cond);
- xbt_fifo_free(trp_pd->meas_selectable_sockets);
+ xbt_queue_free(&trp_pd->meas_selectable_sockets);
xbt_assert0(hd,"Run gras_process_init (ie, gras_init)!!");
xbt_assert0(hd,"Run gras_process_init (ie, gras_init)!!");