From 334c91befa04ae9840e7ba3a4cca0fdeb6b32957 Mon Sep 17 00:00:00 2001 From: donassbr Date: Thu, 12 Jul 2007 13:57:50 +0000 Subject: [PATCH] Use a queue to control the selectable sockets. We don't need anymore the mutex and contidions on the transport structure. git-svn-id: svn+ssh://scm.gforge.inria.fr/svn/simgrid/simgrid/trunk@3750 48e7efb5-ca39-0410-a469-dd3cf9ba447f --- src/gras/Msg/sg_msg.c | 20 +++++------ src/gras/Transport/sg_transport.c | 22 +++++-------- src/gras/Transport/transport_interface.h | 14 +++----- src/gras/Transport/transport_plugin_sg.c | 42 +++++++++++------------- src/gras/Virtu/sg_process.c | 16 +++------ 5 files changed, 44 insertions(+), 70 deletions(-) diff --git a/src/gras/Msg/sg_msg.c b/src/gras/Msg/sg_msg.c index 2a9c84a7b6..0012b3e79b 100644 --- a/src/gras/Msg/sg_msg.c +++ b/src/gras/Msg/sg_msg.c @@ -44,10 +44,6 @@ void gras_msg_send_ext(gras_socket_t sock, xbt_assert1(!gras_socket_is_meas(sock), "Asked to send a message on the measurement socket %p", sock); - /* got the mutex my port */ - DEBUG1("Sock port %d",sock->port); - SIMIX_mutex_lock(sock_data->mutex); - /*initialize gras message */ msg = xbt_new(s_gras_msg_t,1); msg->expe = sock; @@ -73,17 +69,17 @@ void gras_msg_send_ext(gras_socket_t sock, whole_payload_size = gras_datadesc_copy(msgtype->ctn_type, payload, msg->payl); } - /* put message on msg_queue */ - msg_remote_proc = (gras_msg_procdata_t) - gras_libdata_by_name_from_remote("gras_msg",sock_data->to_process); - xbt_fifo_push(msg_remote_proc->msg_to_receive_queue,msg); - - /* wake-up the receiver */ + + /* put the selectable socket on the queue */ trp_remote_proc = (gras_trp_procdata_t) gras_libdata_by_name_from_remote("gras_trp",sock_data->to_process); - xbt_fifo_push(trp_remote_proc->msg_selectable_sockets,sock); - SIMIX_cond_signal(trp_remote_proc->msg_select_cond); + xbt_queue_push(trp_remote_proc->msg_selectable_sockets,&sock); + + /* put message on msg_queue */ + msg_remote_proc = (gras_msg_procdata_t) + gras_libdata_by_name_from_remote("gras_msg",sock_data->to_process); + xbt_fifo_push(msg_remote_proc->msg_to_receive_queue,msg); /* wait for the receiver */ SIMIX_cond_wait(sock_data->cond, sock_data->mutex); diff --git a/src/gras/Transport/sg_transport.c b/src/gras/Transport/sg_transport.c index 729920570b..50b7dfe2bf 100644 --- a/src/gras/Transport/sg_transport.c +++ b/src/gras/Transport/sg_transport.c @@ -33,29 +33,27 @@ gras_socket_t gras_trp_select(double timeout) { (gras_trp_procdata_t) gras_libdata_by_id(gras_trp_libdata_id); gras_trp_sg_sock_data_t *sockdata; gras_trp_plugin_t trp; - gras_socket_t active_socket; + gras_socket_t active_socket = NULL; gras_trp_sg_sock_data_t *active_socket_data; gras_socket_t sock_iter; /* iterating over all sockets */ + xbt_ex_t e; int cursor; - DEBUG0("Trying to get the lock pd, trp_select"); - SIMIX_mutex_lock(pd->msg_select_mutex); DEBUG3("select on %s@%s with timeout=%f", SIMIX_process_get_name(SIMIX_process_self()), SIMIX_host_get_name(SIMIX_host_self()), timeout); - if (xbt_fifo_size(pd->msg_selectable_sockets) == 0) { - /* message didn't arrive yet, wait */ - SIMIX_cond_wait_timeout(pd->msg_select_cond,pd->msg_select_mutex,timeout); - } - - if (xbt_fifo_size(pd->msg_selectable_sockets) == 0) { + TRY { + xbt_queue_shift_timed(pd->msg_selectable_sockets, + &active_socket, timeout); + } CATCH(e) { + RETHROW; + } + if (active_socket == NULL) { DEBUG0("TIMEOUT"); - SIMIX_mutex_unlock(pd->msg_select_mutex); THROW0(timeout_error,0,"Timeout"); } - active_socket = xbt_fifo_shift(pd->msg_selectable_sockets); active_socket_data = (gras_trp_sg_sock_data_t*)active_socket->data; /* Ok, got something. Open a socket back to the expeditor */ @@ -72,7 +70,6 @@ gras_socket_t gras_trp_select(double timeout) { if ( (sock_data->to_socket == active_socket) && (sock_data->to_host == SIMIX_process_get_host(active_socket_data->from_process)) ) { - SIMIX_mutex_unlock(pd->msg_select_mutex); return sock_iter; } } @@ -121,7 +118,6 @@ gras_socket_t gras_trp_select(double timeout) { res->peer_port, SIMIX_process_get_name(sockdata->to_process), res->port); - SIMIX_mutex_unlock(pd->msg_select_mutex); return res; } diff --git a/src/gras/Transport/transport_interface.h b/src/gras/Transport/transport_interface.h index 63547d229a..fde430f6a4 100644 --- a/src/gras/Transport/transport_interface.h +++ b/src/gras/Transport/transport_interface.h @@ -14,6 +14,7 @@ #include "portable.h" /* sometimes needed for fd_set */ #include "simix/simix.h" +#include "xbt/queue.h" /*** *** Options @@ -109,16 +110,9 @@ typedef struct { /* SG only elements. In RL, they are part of the OS ;) */ /* List of sockets ready to be select()ed */ - xbt_fifo_t msg_selectable_sockets; /* regular sockets */ - xbt_fifo_t meas_selectable_sockets;/* measurement ones */ - - /* Synchronisation on msg_selectable_sockets */ - smx_cond_t msg_select_cond; - smx_mutex_t msg_select_mutex; - /* Synchronisation on meas_selectable_sockets */ - smx_cond_t meas_select_cond; - smx_mutex_t meas_select_mutex; - + xbt_queue_t msg_selectable_sockets; /* regular sockets */ + xbt_queue_t meas_selectable_sockets;/* measurement ones */ + } s_gras_trp_procdata_t,*gras_trp_procdata_t; /* Display the content of our socket set (debugging purpose) */ diff --git a/src/gras/Transport/transport_plugin_sg.c b/src/gras/Transport/transport_plugin_sg.c index b46e05e00d..86eb93462e 100644 --- a/src/gras/Transport/transport_plugin_sg.c +++ b/src/gras/Transport/transport_plugin_sg.c @@ -282,20 +282,17 @@ void gras_trp_sg_chunk_send_raw(gras_socket_t sock, msg->payl = NULL; } - /* put message on msg_queue */ - msg_remote_proc = (gras_msg_procdata_t) - gras_libdata_by_name_from_remote("gras_msg", sock_data->to_process); - xbt_fifo_push(msg_remote_proc->msg_to_receive_queue_meas,msg); - - /* put his socket on the selectable socket list */ + /* put his socket on the selectable socket queue */ trp_remote_proc = (gras_trp_procdata_t) gras_libdata_by_name_from_remote("gras_trp",sock_data->to_process); + xbt_queue_push(trp_remote_proc->meas_selectable_sockets,&sock); - xbt_fifo_push(trp_remote_proc->meas_selectable_sockets,sock); + /* put message on msg_queue */ + msg_remote_proc = (gras_msg_procdata_t) + gras_libdata_by_name_from_remote("gras_msg", sock_data->to_process); - /* wake-up the receiver */ - SIMIX_cond_signal(trp_remote_proc->meas_select_cond); + xbt_fifo_push(msg_remote_proc->msg_to_receive_queue_meas,msg); /* wait for the receiver */ SIMIX_cond_wait(sock_data->cond,sock_data->mutex); @@ -325,29 +322,28 @@ int gras_trp_sg_chunk_recv(gras_socket_t sock, unsigned long int size){ gras_trp_sg_sock_data_t *sock_data; gras_trp_sg_sock_data_t *remote_sock_data; - gras_socket_t remote_socket; + gras_socket_t remote_socket= NULL; gras_msg_t msg_got; gras_msg_procdata_t msg_procdata = (gras_msg_procdata_t)gras_libdata_by_name("gras_msg"); gras_trp_procdata_t trp_proc = (gras_trp_procdata_t)gras_libdata_by_id(gras_trp_libdata_id); + xbt_ex_t e; xbt_assert0(sock->meas, "SG chunk exchange shouldn't be used on non-measurement sockets"); - - SIMIX_mutex_lock(trp_proc->meas_select_mutex); - if (xbt_fifo_size(msg_procdata->msg_to_receive_queue_meas) == 0 ) { - SIMIX_cond_wait_timeout(trp_proc->meas_select_cond, - trp_proc->meas_select_mutex,60); - } - if (xbt_fifo_size(msg_procdata->msg_to_receive_queue_meas) == 0 ) { - SIMIX_mutex_unlock(trp_proc->meas_select_mutex); + TRY { + xbt_queue_shift_timed(trp_proc->meas_selectable_sockets, + &remote_socket, 60); + } CATCH(e) { + RETHROW; + } + + if (remote_socket == NULL) { THROW0(timeout_error,0,"Timeout"); - } - SIMIX_mutex_unlock(trp_proc->meas_select_mutex); - - remote_socket = xbt_fifo_shift(trp_proc->meas_selectable_sockets); - remote_sock_data = (gras_trp_sg_sock_data_t *)remote_socket->data; + } + + remote_sock_data = (gras_trp_sg_sock_data_t *)remote_socket->data; msg_got = xbt_fifo_shift(msg_procdata->msg_to_receive_queue_meas); sock_data = (gras_trp_sg_sock_data_t *)sock->data; diff --git a/src/gras/Virtu/sg_process.c b/src/gras/Virtu/sg_process.c index 48cd2732ec..af417a4699 100644 --- a/src/gras/Virtu/sg_process.c +++ b/src/gras/Virtu/sg_process.c @@ -60,13 +60,9 @@ gras_process_init() { } else pd->ppid = -1; - trp_pd->msg_selectable_sockets = xbt_fifo_new(); - trp_pd->msg_select_mutex = SIMIX_mutex_init(); - trp_pd->msg_select_cond = SIMIX_cond_init(); + trp_pd->msg_selectable_sockets = xbt_queue_new(1000,sizeof(gras_socket_t)); - trp_pd->meas_selectable_sockets = xbt_fifo_new(); - trp_pd->meas_select_mutex = SIMIX_mutex_init(); - trp_pd->meas_select_cond = SIMIX_cond_init(); + trp_pd->meas_selectable_sockets = xbt_queue_new(1000,sizeof(gras_socket_t)); VERB2("Creating process '%s' (%d)", SIMIX_process_get_name(SIMIX_process_self()), @@ -88,13 +84,9 @@ gras_process_exit() { gras_trp_procdata_t trp_pd= (gras_trp_procdata_t)gras_libdata_by_name("gras_trp"); - SIMIX_mutex_destroy(trp_pd->msg_select_mutex); - SIMIX_cond_destroy(trp_pd->msg_select_cond); - xbt_fifo_free(trp_pd->msg_selectable_sockets); + xbt_queue_free(&trp_pd->msg_selectable_sockets); - SIMIX_mutex_destroy(trp_pd->meas_select_mutex); - SIMIX_cond_destroy(trp_pd->meas_select_cond); - xbt_fifo_free(trp_pd->meas_selectable_sockets); + xbt_queue_free(&trp_pd->meas_selectable_sockets); xbt_assert0(hd,"Run gras_process_init (ie, gras_init)!!"); -- 2.20.1