From 1dc69874c312d5cc9fff17769e2a97d42b34f592 Mon Sep 17 00:00:00 2001 From: donassbr Date: Thu, 5 Jul 2007 12:04:38 +0000 Subject: [PATCH] It was more difficult to find the bug that I thought, but it seems working now. The AMOK test works. git-svn-id: svn+ssh://scm.gforge.inria.fr/svn/simgrid/simgrid/trunk@3660 48e7efb5-ca39-0410-a469-dd3cf9ba447f --- src/gras/Msg/msg.c | 2 + src/gras/Msg/msg_interface.h | 1 + src/gras/Transport/transport_interface.h | 3 + src/gras/Transport/transport_plugin_sg.c | 89 ++++++++---------------- src/gras/Virtu/sg_process.c | 10 ++- 5 files changed, 43 insertions(+), 62 deletions(-) diff --git a/src/gras/Msg/msg.c b/src/gras/Msg/msg.c index 19adf4341f..b24b4c505f 100644 --- a/src/gras/Msg/msg.c +++ b/src/gras/Msg/msg.c @@ -40,6 +40,7 @@ static void *gras_msg_procdata_new() { res->cbl_list = xbt_dynar_new(sizeof(gras_cblist_t *),gras_cbl_free); res->timers = xbt_dynar_new(sizeof(s_gras_timer_t), NULL); res->msg_to_receive_queue = xbt_fifo_new(); + res->msg_to_receive_queue_meas = xbt_fifo_new(); return (void*)res; } @@ -55,6 +56,7 @@ static void gras_msg_procdata_free(void *data) { xbt_dynar_free(&( res->cbl_list )); xbt_dynar_free(&( res->timers )); xbt_fifo_free( res->msg_to_receive_queue ); + xbt_fifo_free( res->msg_to_receive_queue_meas ); free(res->name); free(res); diff --git a/src/gras/Msg/msg_interface.h b/src/gras/Msg/msg_interface.h index 2cb52fe0d3..9c53b396ab 100644 --- a/src/gras/Msg/msg_interface.h +++ b/src/gras/Msg/msg_interface.h @@ -40,6 +40,7 @@ typedef struct { /* queue storing the msgs that have to received and the process synchronization made (wait the surf action done) */ xbt_fifo_t msg_to_receive_queue; /* elm type: s_gras_msg_t */ + xbt_fifo_t msg_to_receive_queue_meas; /* elm type: s_gras_msg_t */ } s_gras_msg_procdata_t,*gras_msg_procdata_t; diff --git a/src/gras/Transport/transport_interface.h b/src/gras/Transport/transport_interface.h index eb6064b8ee..379e5bca0a 100644 --- a/src/gras/Transport/transport_interface.h +++ b/src/gras/Transport/transport_interface.h @@ -105,7 +105,10 @@ typedef struct { /* SG only elements. In RL, they are part of the OS ;) */ smx_cond_t cond; smx_mutex_t mutex; + smx_cond_t cond_meas; + smx_mutex_t mutex_meas; xbt_fifo_t active_socket; + xbt_fifo_t active_socket_meas; } s_gras_trp_procdata_t,*gras_trp_procdata_t; diff --git a/src/gras/Transport/transport_plugin_sg.c b/src/gras/Transport/transport_plugin_sg.c index 32134a995e..ee6de67289 100644 --- a/src/gras/Transport/transport_plugin_sg.c +++ b/src/gras/Transport/transport_plugin_sg.c @@ -259,21 +259,15 @@ void gras_trp_sg_chunk_send_raw(gras_socket_t sock, smx_action_t act; /* simix action */ gras_trp_sg_sock_data_t *sock_data; - gras_trp_sg_sock_data_t *remote_sock_data; - gras_hostdata_t *hd; - gras_hostdata_t *remote_hd; gras_trp_procdata_t trp_remote_proc; gras_msg_procdata_t msg_remote_proc; gras_msg_t msg; /* message to send */ sock_data = (gras_trp_sg_sock_data_t *)sock->data; - remote_sock_data = ((gras_trp_sg_sock_data_t *)sock->data)->to_socket->data; - hd = (gras_hostdata_t *)SIMIX_host_get_data(SIMIX_host_self()); - remote_hd = (gras_hostdata_t *)SIMIX_host_get_data(sock_data->to_host); xbt_assert0(sock->meas, "SG chunk exchange shouldn't be used on non-measurement sockets"); - + SIMIX_mutex_lock(sock_data->mutex); sprintf(name,"Chunk[%d]",count++); /*initialize gras message */ msg = xbt_new(s_gras_msg_t,1); @@ -289,17 +283,16 @@ void gras_trp_sg_chunk_send_raw(gras_socket_t sock, /* put message on msg_queue */ msg_remote_proc = (gras_msg_procdata_t)gras_libdata_by_name_from_remote("gras_msg",sock_data->to_process); - xbt_fifo_push(msg_remote_proc->msg_to_receive_queue,msg); - - /* wake-up the receiver */ + xbt_fifo_push(msg_remote_proc->msg_to_receive_queue_meas,msg); + /* put his socket on the active_socket list */ trp_remote_proc = (gras_trp_procdata_t)gras_libdata_by_name_from_remote("gras_trp",sock_data->to_process); - SIMIX_cond_signal(trp_remote_proc->cond); + xbt_fifo_push(trp_remote_proc->active_socket_meas,sock); + /* wake-up the receiver */ + + SIMIX_cond_signal(trp_remote_proc->cond_meas); - SIMIX_mutex_lock(remote_sock_data->mutex); - //SIMIX_mutex_lock(remote_hd->mutex_port[sock->peer_port]); /* wait for the receiver */ - SIMIX_cond_wait(remote_sock_data->cond,remote_sock_data->mutex); - //SIMIX_cond_wait(remote_hd->cond_port[sock->peer_port], remote_hd->mutex_port[sock->peer_port]); + SIMIX_cond_wait(sock_data->cond,sock_data->mutex); /* creates simix action and waits its ends, waits in the sender host condition*/ DEBUG5("send chunk %s from %s to %s:%d (size=%ld)", @@ -307,68 +300,52 @@ void gras_trp_sg_chunk_send_raw(gras_socket_t sock, SIMIX_host_get_name(sock_data->to_host), sock->peer_port,size); act = SIMIX_action_communicate(SIMIX_host_self(), sock_data->to_host, name, size, -1); - /* - SIMIX_register_action_to_condition(act,remote_hd->cond_port[sock->peer_port]); - SIMIX_register_condition_to_action(act,remote_hd->cond_port[sock->peer_port]); - */ - SIMIX_register_action_to_condition(act,remote_sock_data->cond); - SIMIX_register_condition_to_action(act,remote_sock_data->cond); - - SIMIX_host_get_name(sock_data->to_host),SIMIX_process_get_name(sock_data->to_process), - - SIMIX_cond_wait(remote_sock_data->cond,remote_sock_data->mutex); - //SIMIX_cond_wait(remote_hd->cond_port[sock->peer_port], remote_hd->mutex_port[sock->peer_port]); + SIMIX_register_action_to_condition(act,sock_data->cond); + SIMIX_register_condition_to_action(act,sock_data->cond); + + SIMIX_cond_wait(sock_data->cond,sock_data->mutex); /* error treatmeant */ /* cleanup structures */ SIMIX_action_destroy(act); - SIMIX_mutex_unlock(remote_sock_data->mutex); - //SIMIX_mutex_unlock(remote_hd->mutex_port[sock->peer_port]); - SIMIX_cond_signal(remote_sock_data->cond); - //SIMIX_cond_signal(remote_hd->cond_port[sock->peer_port]); + SIMIX_mutex_unlock(sock_data->mutex); } int gras_trp_sg_chunk_recv(gras_socket_t sock, char *data, unsigned long int size){ - gras_trp_procdata_t pd=(gras_trp_procdata_t)gras_libdata_by_id(gras_trp_libdata_id); gras_trp_sg_sock_data_t *sock_data; - gras_hostdata_t *remote_hd; - gras_hostdata_t *local_hd; + gras_trp_sg_sock_data_t *remote_sock_data; + gras_socket_t remote_socket; gras_msg_t msg_got; gras_msg_procdata_t msg_procdata = (gras_msg_procdata_t)gras_libdata_by_name("gras_msg"); + gras_trp_procdata_t trp_proc=(gras_trp_procdata_t)gras_libdata_by_id(gras_trp_libdata_id); xbt_assert0(sock->meas, "SG chunk exchange shouldn't be used on non-measurement sockets"); - SIMIX_mutex_lock(pd->mutex); - if (xbt_fifo_size(msg_procdata->msg_to_receive_queue) == 0 ) { - SIMIX_cond_wait_timeout(pd->cond,pd->mutex,60); + SIMIX_mutex_lock(trp_proc->mutex_meas); + if (xbt_fifo_size(msg_procdata->msg_to_receive_queue_meas) == 0 ) { + SIMIX_cond_wait_timeout(trp_proc->cond_meas,trp_proc->mutex_meas,60); } - if (xbt_fifo_size(msg_procdata->msg_to_receive_queue) == 0 ) { + if (xbt_fifo_size(msg_procdata->msg_to_receive_queue_meas) == 0 ) { + SIMIX_mutex_unlock(trp_proc->mutex_meas); THROW0(timeout_error,0,"Timeout"); } - SIMIX_mutex_unlock(pd->mutex); - - sock_data = (gras_trp_sg_sock_data_t *)sock->data; - DEBUG3("Remote host %s, Remote Port: %d Local port %d", SIMIX_host_get_name(sock_data->to_host), sock->peer_port, sock->port); - remote_hd = (gras_hostdata_t *)SIMIX_host_get_data(sock_data->to_host); - local_hd = (gras_hostdata_t *)SIMIX_host_get_data(SIMIX_host_self()); + SIMIX_mutex_unlock(trp_proc->mutex_meas); + remote_socket = xbt_fifo_shift(trp_proc->active_socket_meas); + remote_sock_data = (gras_trp_sg_sock_data_t *)remote_socket->data; + msg_got = xbt_fifo_shift(msg_procdata->msg_to_receive_queue_meas); + sock_data = (gras_trp_sg_sock_data_t *)sock->data; - msg_got = xbt_fifo_shift(msg_procdata->msg_to_receive_queue); - - SIMIX_mutex_lock(sock_data->mutex); - //SIMIX_mutex_lock(local_hd->mutex_port[sock->port]); /* ok, I'm here, you can continue the communication */ - SIMIX_cond_signal(sock_data->cond); - //SIMIX_cond_signal(local_hd->cond_port[sock->port]); + SIMIX_cond_signal(remote_sock_data->cond); + SIMIX_mutex_lock(remote_sock_data->mutex); /* wait for communication end */ - SIMIX_cond_wait(sock_data->cond,sock_data->mutex); - //SIMIX_cond_wait(local_hd->cond_port[sock->port],local_hd->mutex_port[sock->port]); - + SIMIX_cond_wait(remote_sock_data->cond,remote_sock_data->mutex); if (msg_got->payl_size != size) THROW5(mismatch_error,0, @@ -382,13 +359,7 @@ int gras_trp_sg_chunk_recv(gras_socket_t sock, if (msg_got->payl) xbt_free(msg_got->payl); xbt_free(msg_got); - SIMIX_cond_wait(sock_data->cond,sock_data->mutex); - SIMIX_mutex_unlock(sock_data->mutex); - //SIMIX_mutex_unlock(local_hd->mutex_port[sock->port]); - /* - SIMIX_cond_wait(local_hd->cond_port[sock->port],local_hd->mutex_port[sock->port]); - SIMIX_mutex_unlock(local_hd->mutex_port[sock->port]); - */ + SIMIX_mutex_unlock(remote_sock_data->mutex); return 0; } diff --git a/src/gras/Virtu/sg_process.c b/src/gras/Virtu/sg_process.c index ff44d661e5..77951a53a9 100644 --- a/src/gras/Virtu/sg_process.c +++ b/src/gras/Virtu/sg_process.c @@ -48,7 +48,10 @@ gras_process_init() { trp_pd->mutex = SIMIX_mutex_init(); trp_pd->cond = SIMIX_cond_init(); + trp_pd->mutex_meas = SIMIX_mutex_init(); + trp_pd->cond_meas = SIMIX_cond_init(); trp_pd->active_socket = xbt_fifo_new(); + trp_pd->active_socket_meas = xbt_fifo_new(); VERB2("Creating process '%s' (%d)", SIMIX_process_get_name(SIMIX_process_self()), @@ -69,9 +72,10 @@ gras_process_exit() { SIMIX_mutex_destroy(trp_pd->mutex); SIMIX_cond_destroy(trp_pd->cond); xbt_fifo_free(trp_pd->active_socket); - //int myPID=gras_os_getpid(); - //int cpt; - //gras_sg_portrec_t pr; + SIMIX_mutex_destroy(trp_pd->mutex_meas); + SIMIX_cond_destroy(trp_pd->cond_meas); + xbt_fifo_free(trp_pd->active_socket_meas); + xbt_assert0(hd,"Run gras_process_init (ie, gras_init)!!"); -- 2.20.1