From 325e11b2f4756b1912369af33486b0b6513cf315 Mon Sep 17 00:00:00 2001 From: donassbr Date: Thu, 26 Apr 2007 08:39:17 +0000 Subject: [PATCH] A lot of changes made. Testing phase git-svn-id: svn+ssh://scm.gforge.inria.fr/svn/simgrid/simgrid/trunk@3448 48e7efb5-ca39-0410-a469-dd3cf9ba447f --- src/gras_simix/Msg/gras_simix_msg.c | 10 +- src/gras_simix/Msg/gras_simix_msg_interface.h | 3 + src/gras_simix/Msg/gras_simix_msg_private.h | 2 +- src/gras_simix/Msg/gras_simix_rl_msg.c | 6 +- src/gras_simix/Msg/gras_simix_rpc.c | 2 +- src/gras_simix/Msg/gras_simix_sg_msg.c | 104 ++++++++++++------ src/gras_simix/Msg/gras_simix_timer.c | 4 +- .../Transport/gras_simix_rl_transport.c | 2 +- .../Transport/gras_simix_sg_transport.c | 79 ++++++------- .../Transport/gras_simix_transport.c | 5 +- .../gras_simix_transport_interface.h | 4 +- .../gras_simix_transport_plugin_file.c | 2 +- .../gras_simix_transport_plugin_sg.c | 74 +++++++------ .../gras_simix_transport_plugin_tcp.c | 2 +- src/gras_simix/Virtu/gras_simix_process.c | 3 - src/gras_simix/Virtu/gras_simix_rl_dns.c | 2 +- src/gras_simix/Virtu/gras_simix_rl_emul.c | 2 +- src/gras_simix/Virtu/gras_simix_rl_process.c | 2 +- src/gras_simix/Virtu/gras_simix_sg_emul.c | 55 +++++++++ src/gras_simix/Virtu/gras_simix_sg_process.c | 27 +++-- src/gras_simix/Virtu/gras_simix_virtu_rl.h | 2 +- src/gras_simix/Virtu/gras_simix_virtu_sg.h | 8 +- 22 files changed, 253 insertions(+), 147 deletions(-) diff --git a/src/gras_simix/Msg/gras_simix_msg.c b/src/gras_simix/Msg/gras_simix_msg.c index 6fcf0ad434..58bf165e96 100644 --- a/src/gras_simix/Msg/gras_simix_msg.c +++ b/src/gras_simix/Msg/gras_simix_msg.c @@ -9,10 +9,10 @@ #include "xbt/ex.h" #include "xbt/ex_interface.h" -#include "gras/Msg/msg_private.h" -#include "gras/Virtu/virtu_interface.h" -#include "gras/DataDesc/datadesc_interface.h" -#include "gras/Transport/transport_interface.h" /* gras_select */ +#include "gras_simix/Msg/gras_simix_msg_private.h" +#include "gras_simix/Virtu/gras_simix_virtu_interface.h" +#include "gras_simix/DataDesc/gras_simix_datadesc_interface.h" +#include "gras_simix/Transport/gras_simix_transport_interface.h" /* gras_select */ #include "portable.h" /* execinfo when available to propagate exceptions */ #ifndef MIN @@ -39,6 +39,7 @@ static void *gras_msg_procdata_new() { res->msg_waitqueue = xbt_dynar_new(sizeof(s_gras_msg_t), NULL); res->cbl_list = xbt_dynar_new(sizeof(gras_cblist_t *),gras_cbl_free); res->timers = xbt_dynar_new(sizeof(s_gras_timer_t), NULL); + res->msg_to_receive_queue = xbt_dynar_new(sizeof(s_gras_msg_t), NULL); return (void*)res; } @@ -53,6 +54,7 @@ static void gras_msg_procdata_free(void *data) { xbt_dynar_free(&( res->msg_waitqueue )); xbt_dynar_free(&( res->cbl_list )); xbt_dynar_free(&( res->timers )); + xbt_dynar_free(&( res->msg_to_receive_queue )); free(res->name); free(res); diff --git a/src/gras_simix/Msg/gras_simix_msg_interface.h b/src/gras_simix/Msg/gras_simix_msg_interface.h index 3595ead608..256e44f30e 100644 --- a/src/gras_simix/Msg/gras_simix_msg_interface.h +++ b/src/gras_simix/Msg/gras_simix_msg_interface.h @@ -36,6 +36,9 @@ typedef struct { /* registered timers */ xbt_dynar_t timers; /* elm type: s_gras_timer_t */ + + /* queue storing the msgs that have to received and the process synchronization made (wait the surf action done) */ + xbt_dynar_t msg_to_receive_queue; /* elm type: s_gras_msg_t */ } s_gras_msg_procdata_t,*gras_msg_procdata_t; diff --git a/src/gras_simix/Msg/gras_simix_msg_private.h b/src/gras_simix/Msg/gras_simix_msg_private.h index 540e000a64..2773ad10e7 100644 --- a/src/gras_simix/Msg/gras_simix_msg_private.h +++ b/src/gras_simix/Msg/gras_simix_msg_private.h @@ -26,7 +26,7 @@ #include "gras/timer.h" #include "gras_modinter.h" -#include "gras/Msg/msg_interface.h" +#include "gras_simix/Msg/gras_simix_msg_interface.h" extern char _GRAS_header[6]; diff --git a/src/gras_simix/Msg/gras_simix_rl_msg.c b/src/gras_simix/Msg/gras_simix_rl_msg.c index a4c4e160d5..b9b65387b3 100644 --- a/src/gras_simix/Msg/gras_simix_rl_msg.c +++ b/src/gras_simix/Msg/gras_simix_rl_msg.c @@ -8,10 +8,10 @@ * under the terms of the license (GNU LGPL) which comes with this package. */ #include "xbt/ex.h" -#include "gras/Msg/msg_private.h" +#include "gras_simix/Msg/gras_simix_msg_private.h" -#include "gras/DataDesc/datadesc_interface.h" -#include "gras/Transport/transport_interface.h" /* gras_trp_send/recv */ +#include "gras_simix/DataDesc/gras_simix_datadesc_interface.h" +#include "gras_simix/Transport/gras_simix_transport_interface.h" /* gras_trp_send/recv */ XBT_LOG_EXTERNAL_CATEGORY(gras_msg); XBT_LOG_DEFAULT_CATEGORY(gras_msg); diff --git a/src/gras_simix/Msg/gras_simix_rpc.c b/src/gras_simix/Msg/gras_simix_rpc.c index 89374859ad..84ec9e9bf6 100644 --- a/src/gras_simix/Msg/gras_simix_rpc.c +++ b/src/gras_simix/Msg/gras_simix_rpc.c @@ -7,7 +7,7 @@ /* This program is free software; you can redistribute it and/or modify it * under the terms of the license (GNU LGPL) which comes with this package. */ -#include "gras/Msg/msg_private.h" +#include "gras_simix/Msg/gras_simix_msg_private.h" xbt_set_t _gras_rpctype_set = NULL; xbt_dynar_t _gras_rpc_cancelled = NULL; diff --git a/src/gras_simix/Msg/gras_simix_sg_msg.c b/src/gras_simix/Msg/gras_simix_sg_msg.c index 444950f4e9..f5e6e172fb 100644 --- a/src/gras_simix/Msg/gras_simix_sg_msg.c +++ b/src/gras_simix/Msg/gras_simix_sg_msg.c @@ -27,26 +27,35 @@ void gras_msg_send_ext(gras_socket_t sock, unsigned long int ID, gras_msgtype_t msgtype, void *payload) { -/* - m_task_t task=NULL; - gras_trp_sg_sock_data_t *sock_data = (gras_trp_sg_sock_data_t *)sock->data; - gras_msg_t msg; - int whole_payload_size=0;*/ /* msg->payload_size is used to memcpy the payload. + smx_action_t act; /* simix action */ + gras_trp_sg_sock_data_t *sock_data; + gras_hostdata_t *hd; + gras_trp_procdata_t trp_remote_proc; + gras_msg_procdata_t msg_remote_proc; + gras_msg_t msg; /* message to send */ + int whole_payload_size=0; /* msg->payload_size is used to memcpy the payload. This is used to report the load onto the simulator. It also counts the size of pointed stuff */ - /* - char *name; + + sock_data = (gras_trp_sg_sock_data_t *)sock->data; + hd = (gras_hostdata_t *)SIMIX_host_get_data(SIMIX_host_self()); xbt_assert1(!gras_socket_is_meas(sock), "Asked to send a message on the measurement socket %p", sock); - - msg=xbt_new0(s_gras_msg_t,1); - msg->type=msgtype; - msg->ID = ID; - + + /* got the mutex my port */ + DEBUG1("Sock port %d",sock->port); + SIMIX_mutex_lock(hd->mutex_port[sock->port]); + + /*initialize gras message */ + msg = xbt_new(s_gras_msg_t,1); + msg->expe = sock; + msg->kind = kind; + msg->type = msgtype; + msg->ID = ID; if (kind == e_gras_msg_kind_rpcerror) { - */ /* error on remote host, carfull, payload is an exception */ - /*msg->payl_size=gras_datadesc_size(gras_datadesc_by_name("ex_t")); + /* error on remote host, carfull, payload is an exception */ + msg->payl_size=gras_datadesc_size(gras_datadesc_by_name("ex_t")); msg->payl=xbt_malloc(msg->payl_size); whole_payload_size = gras_datadesc_copy(gras_datadesc_by_name("ex_t"), payload,msg->payl); @@ -63,9 +72,37 @@ void gras_msg_send_ext(gras_socket_t sock, whole_payload_size = gras_datadesc_copy(msgtype->ctn_type, payload, msg->payl); } + /* put message on msg_queue */ + msg_remote_proc = (gras_msg_procdata_t)gras_libdata_by_name_from_remote("gras_msg",sock_data->to_process); + xbt_dynar_push(msg_remote_proc->msg_to_receive_queue,msg); + + /* wake-up the receiver */ + trp_remote_proc = (gras_trp_procdata_t)gras_libdata_by_name_from_remote("gras_trp",sock_data->to_process); + trp_remote_proc->active_socket = sock; + + SIMIX_cond_signal(trp_remote_proc->cond); + + /* wait for the receiver */ + SIMIX_cond_wait(hd->cond_port[sock->port], hd->mutex_port[sock->port]); + + /* creates simix action and waits its ends, waits in the sender host condition*/ + act = SIMIX_action_communicate(sock_data->to_host, SIMIX_host_self(),msgtype->name, msg->payl_size, -1); + SIMIX_register_action_to_condition(act,hd->cond_port[sock->port]); + SIMIX_register_condition_to_action(act,hd->cond_port[sock->port]); + + + SIMIX_cond_wait(hd->cond_port[sock->port], hd->mutex_port[sock->port]); + /* error treatmeant */ - msg->kind = kind; + /* cleanup structures */ + SIMIX_action_destroy(act); + SIMIX_mutex_unlock(hd->mutex_port[sock->port]); + VERB5("Sent to %s(%s) a message type '%s' kind '%s' ID %lu", + SIMIX_host_get_name(sock_data->to_host),SIMIX_process_get_name(sock_data->to_process), + msg->type->name,e_gras_msg_kind_names[msg->kind], msg->ID); + +/* if (XBT_LOG_ISENABLED(gras_msg,xbt_log_priority_verbose)) { asprintf(&name,"type:'%s';kind:'%s';ID %lu from %s:%d to %s:%d", msg->type->name, e_gras_msg_kind_names[msg->kind], msg->ID, @@ -99,37 +136,40 @@ void gras_msg_send_ext(gras_socket_t sock, void gras_msg_recv(gras_socket_t sock, gras_msg_t msg) { -/* - m_task_t task=NULL; + + gras_trp_sg_sock_data_t *sock_data; + gras_hostdata_t *remote_hd; s_gras_msg_t msg_got; - gras_trp_procdata_t pd=(gras_trp_procdata_t)gras_libdata_by_name("gras_trp"); - gras_msg_procdata_t msg = (gras_msg_procdata_t)gras_libdata_by_name("gras_msg"); + gras_msg_procdata_t msg_procdata = (gras_msg_procdata_t)gras_libdata_by_name("gras_msg"); + xbt_assert1(!gras_socket_is_meas(sock), "Asked to receive a message on the measurement socket %p", sock); xbt_assert0(msg,"msg is an out parameter of gras_msg_recv..."); - if (xbt_dynar_length(msg->msg_queue) == 0 ) { + sock_data = (gras_trp_sg_sock_data_t *)sock->data; + DEBUG3("Remote host %s, Remote Port: %d Local port %d", SIMIX_host_get_name(sock_data->to_host), sock->peer_port, sock->port); + remote_hd = (gras_hostdata_t *)SIMIX_host_get_data(sock_data->to_host); + + if (xbt_dynar_length(msg_procdata->msg_to_receive_queue) == 0 ) { THROW_IMPOSSIBLE; } - xbt_dynar_shift(msg->msg_queue,&msg_got); - -// if (MSG_task_get(&task, pd->chan) != MSG_OK) -// THROW0(system_error,0,"Error in MSG_task_get()"); + xbt_dynar_shift(msg_procdata->msg_to_receive_queue,&msg_got); - msg_got=MSG_task_get_data(task); + SIMIX_mutex_lock(remote_hd->mutex_port[sock->peer_port]); +/* ok, I'm here, you can continuate the communication */ + SIMIX_cond_signal(remote_hd->cond_port[sock->peer_port]); +/* wait for communication end */ + SIMIX_cond_wait(remote_hd->cond_port[sock->peer_port],remote_hd->mutex_port[sock->peer_port]); - msg_got->expe= msg->expe; - memcpy(msg,msg_got,sizeof(s_gras_msg_t)); + msg_got.expe= msg->expe; + memcpy(msg,&msg_got,sizeof(s_gras_msg_t)); - free(msg_got); - if (MSG_task_destroy(task) != MSG_OK) - THROW0(system_error,0,"Error in MSG_task_destroy()"); + SIMIX_mutex_unlock(remote_hd->mutex_port[sock->peer_port]); - VERB3("Received a message type '%s' kind '%s' ID %lu",// from %s", + VERB3("Received a message type '%s' kind '%s' ID %lu",// from %s", msg->type->name, e_gras_msg_kind_names[msg->kind], msg->ID); - */ } diff --git a/src/gras_simix/Msg/gras_simix_timer.c b/src/gras_simix/Msg/gras_simix_timer.c index 616250284a..d15e482b9c 100644 --- a/src/gras_simix/Msg/gras_simix_timer.c +++ b/src/gras_simix/Msg/gras_simix_timer.c @@ -8,9 +8,9 @@ * under the terms of the license (GNU LGPL) which comes with this package. */ #include "xbt/ex.h" -#include "gras/Msg/msg_private.h" +#include "gras_simix/Msg/gras_simix_msg_private.h" #include "gras/timer.h" -#include "gras/Virtu/virtu_interface.h" +#include "gras_simix/Virtu/gras_simix_virtu_interface.h" XBT_LOG_NEW_DEFAULT_SUBCATEGORY(gras_timer,gras, diff --git a/src/gras_simix/Transport/gras_simix_rl_transport.c b/src/gras_simix/Transport/gras_simix_rl_transport.c index 7d7c7a413c..9407947ed7 100644 --- a/src/gras_simix/Transport/gras_simix_rl_transport.c +++ b/src/gras_simix/Transport/gras_simix_rl_transport.c @@ -9,7 +9,7 @@ #include "xbt/ex.h" #include "portable.h" -#include "gras/Transport/transport_private.h" +#include "gras_simix/Transport/gras_simix_transport_private.h" XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(gras_trp); /* check transport_private.h for an explanation of this variable */ diff --git a/src/gras_simix/Transport/gras_simix_sg_transport.c b/src/gras_simix/Transport/gras_simix_sg_transport.c index b316d32f09..ca88ffcf89 100644 --- a/src/gras_simix/Transport/gras_simix_sg_transport.c +++ b/src/gras_simix/Transport/gras_simix_sg_transport.c @@ -36,28 +36,29 @@ gras_socket_t gras_trp_select(double timeout) { gras_trp_plugin_t trp; gras_socket_t sock_iter; /* iterating over all sockets */ int cursor; + int i; - //gras_sg_portrec_t pr; /* iterating to find the chanel of expeditor */ -// int r_pid; gras_hostdata_t *remote_hd; - + gras_hostdata_t *local_hd; + + DEBUG0("Trying to get the lock pd, trp_select"); SIMIX_mutex_lock(pd->mutex); DEBUG3("select on %s@%s with timeout=%f", SIMIX_process_get_name(SIMIX_process_self()), SIMIX_host_get_name(SIMIX_host_self()), timeout); - //MSG_channel_select_from((m_channel_t) pd->chan, timeout, &r_pid); - SIMIX_cond_wait_timeout(pd->cond,pd->mutex,timeout); if (pd->active_socket == NULL) { - DEBUG0("TIMEOUT"); - THROW0(timeout_error,0,"Timeout"); + /* message doesn't arrive yet, wait */ + SIMIX_cond_wait_timeout(pd->cond,pd->mutex,timeout); } -/* if (r_pid < 0) { + + if (pd->active_socket == NULL) { DEBUG0("TIMEOUT"); + SIMIX_mutex_unlock(pd->mutex); THROW0(timeout_error,0,"Timeout"); - }*/ - + } + /* Ok, got something. Open a socket back to the expeditor */ /* Try to reuse an already openned socket to that expeditor */ @@ -66,8 +67,15 @@ gras_socket_t gras_trp_select(double timeout) { if (sock_iter->meas || !sock_iter->outgoing) continue; - if (sock_iter->peer_port == pd->active_socket->port) + DEBUG4("sock_iter %p port %d active %p port %d",((gras_trp_sg_sock_data_t*)sock_iter->data)->to_process,sock_iter->peer_port, + ((gras_trp_sg_sock_data_t*)pd->active_socket->data)->from_process, pd->active_socket->port); + if ((sock_iter->peer_port == pd->active_socket->port) && + (((gras_trp_sg_sock_data_t*)sock_iter->data)->to_host == SIMIX_process_get_host(((gras_trp_sg_sock_data_t*)pd->active_socket->data)->from_process))) { + DEBUG0("\n\nAproveitou socket\n\n"); + pd->active_socket=NULL; + SIMIX_mutex_unlock(pd->mutex); return sock_iter; + } } @@ -87,13 +95,12 @@ gras_socket_t gras_trp_select(double timeout) { res->port = -1; sockdata = xbt_new(gras_trp_sg_sock_data_t,1); - //sockdata->from_PID = MSG_process_self_PID(); - //sockdata->to_PID = r_pid; sockdata->from_process = SIMIX_process_self(); sockdata->to_process = ((gras_trp_sg_sock_data_t*)(pd->active_socket->data))->from_process; /* complicated*/ sockdata->to_host = SIMIX_process_get_host(((gras_trp_sg_sock_data_t*)(pd->active_socket->data))->from_process); + res->data = sockdata; gras_trp_buf_init_sock(res); @@ -102,39 +109,23 @@ gras_socket_t gras_trp_select(double timeout) { remote_hd=(gras_hostdata_t *)SIMIX_host_get_data(sockdata->to_host); xbt_assert0(remote_hd,"Run gras_process_init!!"); - //sockdata->to_chan = -1; res->peer_port = pd->active_socket->port; - /* - for (cursor=0; cursorproc[cursor] == r_pid) { - sockdata->to_chan = cursor; - DEBUG2("Chan %d on %s is for my pal", - cursor,res->peer_name); - - xbt_dynar_foreach(remote_hd->ports, cpt, pr) { - if (sockdata->to_chan == pr.tochan) { - if (pr.meas) { - DEBUG0("Damn, it's for measurement"); - continue; - } - - res->peer_port = pr.port; - DEBUG1("Cool, it points to port %d", pr.port); - break; - } else { - DEBUG2("Wrong port (tochan=%d, looking for %d)\n", - pr.tochan,sockdata->to_chan); - } - } - if (res->peer_port == -10) { - sockdata->to_chan = -1; - } else { - break; - } - } + + /* search for a free port on the host */ + local_hd = (gras_hostdata_t *)SIMIX_host_get_data(SIMIX_host_self()); + for (i=1;i<65536;i++) { + if (local_hd->cond_port[i] == NULL) + break; } - */ - + if (i == 65536) { + SIMIX_mutex_unlock(pd->mutex); + THROW0(system_error,0,"No port free"); + } + res->port = i; + /*initialize the cond and mutex */ + local_hd->cond_port[i] = SIMIX_cond_init(); + local_hd->mutex_port[i] = SIMIX_mutex_init(); + DEBUG1("New socket: Peer port %d", res->peer_port); pd->active_socket=NULL; SIMIX_mutex_unlock(pd->mutex); diff --git a/src/gras_simix/Transport/gras_simix_transport.c b/src/gras_simix/Transport/gras_simix_transport.c index 134b418420..1407e26905 100644 --- a/src/gras_simix/Transport/gras_simix_transport.c +++ b/src/gras_simix/Transport/gras_simix_transport.c @@ -15,7 +15,7 @@ int gras_opt_trp_nomoredata_on_close=0; #include "xbt/ex.h" #include "xbt/peer.h" #include "portable.h" -#include "gras/Transport/transport_private.h" +#include "gras_simix/Transport/gras_simix_transport_private.h" XBT_LOG_NEW_DEFAULT_SUBCATEGORY(gras_trp,gras,"Conveying bytes over the network"); XBT_LOG_NEW_SUBCATEGORY(gras_trp_meas,gras_trp,"Conveying bytes over the network without formating for perf measurements"); @@ -573,7 +573,7 @@ static void *gras_trp_procdata_new() { res->name = xbt_strdup("gras_trp"); res->name_len = 0; - res->sockets = xbt_dynar_new(sizeof(gras_socket_t*), NULL); + res->sockets = xbt_dynar_new(sizeof(gras_socket_t*), NULL); res->myport = 0; return (void*)res; @@ -611,6 +611,7 @@ void gras_trp_socketset_dump(const char *name) { */ int gras_trp_libdata_id; void gras_trp_register() { + DEBUG0("\ntrp add\n"); gras_trp_libdata_id = gras_procdata_add("gras_trp",gras_trp_procdata_new, gras_trp_procdata_free); } diff --git a/src/gras_simix/Transport/gras_simix_transport_interface.h b/src/gras_simix/Transport/gras_simix_transport_interface.h index 189f605871..d6212afa86 100644 --- a/src/gras_simix/Transport/gras_simix_transport_interface.h +++ b/src/gras_simix/Transport/gras_simix_transport_interface.h @@ -94,8 +94,6 @@ gras_trp_plugin_get_by_name(const char *name); */ typedef struct { /* set headers */ - long int pid; - long int ppid; unsigned int ID; char *name; unsigned int name_len; @@ -108,6 +106,8 @@ typedef struct { smx_cond_t cond; smx_mutex_t mutex; gras_socket_t active_socket; + long int pid; + long int ppid; //int chan; /* Formated messages channel */ //int measChan; /* Unformated echange channel for performance measurement*/ diff --git a/src/gras_simix/Transport/gras_simix_transport_plugin_file.c b/src/gras_simix/Transport/gras_simix_transport_plugin_file.c index 1b559697a1..ba21c3c382 100644 --- a/src/gras_simix/Transport/gras_simix_transport_plugin_file.c +++ b/src/gras_simix/Transport/gras_simix_transport_plugin_file.c @@ -8,7 +8,7 @@ * under the terms of the license (GNU LGPL) which comes with this package. */ #include "portable.h" -#include "transport_private.h" +#include "gras_simix_transport_private.h" #include "xbt/ex.h" XBT_LOG_NEW_DEFAULT_SUBCATEGORY(gras_trp_file,gras_trp, diff --git a/src/gras_simix/Transport/gras_simix_transport_plugin_sg.c b/src/gras_simix/Transport/gras_simix_transport_plugin_sg.c index 23c384e34b..14e5674647 100644 --- a/src/gras_simix/Transport/gras_simix_transport_plugin_sg.c +++ b/src/gras_simix/Transport/gras_simix_transport_plugin_sg.c @@ -94,16 +94,17 @@ gras_trp_sg_setup(gras_trp_plugin_t plug) { void gras_trp_sg_socket_client(gras_trp_plugin_t self, /* OUT */ gras_socket_t sock){ - /* xbt_ex_t e; smx_host_t peer; gras_hostdata_t *hd; + gras_hostdata_t *local_hd; gras_trp_sg_sock_data_t *data; gras_sg_portrec_t pr; -*/ + int i; + /* make sure this socket will reach someone */ - /*if (!(peer=SIMIX_get_host_by_name(sock->peer_name))) + if (!(peer=SIMIX_host_get_by_name(sock->peer_name))) THROW1(mismatch_error,0,"Can't connect to %s: no such host.\n",sock->peer_name); if (!(hd=(gras_hostdata_t *)SIMIX_host_get_data(peer))) @@ -133,32 +134,37 @@ void gras_trp_sg_socket_client(gras_trp_plugin_t self, "can't connect to %s:%d in meas mode, the process listen " "in regular mode on this port",sock->peer_name,sock->peer_port); } -*/ /* create the socket */ - /*data = xbt_new(gras_trp_sg_sock_data_t,1); - //data->from_PID = gras_os_getpid(); + data = xbt_new(gras_trp_sg_sock_data_t,1); data->from_process = SIMIX_process_self(); data->to_process = pr.process; - //data->to_PID = hd->proc[ pr.tochan ]; - data->to_port = pr; data->to_host = peer; - //data->to_chan = pr.tochan; + + /* searches for a free port on this host */ + local_hd = (gras_hostdata_t *)SIMIX_host_get_data(SIMIX_host_self()); + for (i=1;i<65536;i++) { + if (local_hd->cond_port[i] == NULL) + break; + } + if (i == 65536) THROW0(system_error,0,"No port free"); + sock->port = i; + local_hd->cond_port[i] = SIMIX_cond_init(); + local_hd->mutex_port[i] = SIMIX_mutex_init(); sock->data = data; sock->incoming = 1; - DEBUG6("%s (PID %d) connects in %s mode to %s:%d (to_PID=%d)", - SIMIX_process_get_name(SIMIX_process_self()), gras_os_getpid, + DEBUG5("%s (PID %ld) connects in %s mode to %s:%d", + SIMIX_process_get_name(SIMIX_process_self()), gras_os_getpid(), sock->meas?"meas":"regular", - sock->peer_name,sock->peer_port,data->to_PID); -*/ + sock->peer_name,sock->peer_port); } void gras_trp_sg_socket_server(gras_trp_plugin_t self, gras_socket_t sock){ -/* + gras_hostdata_t *hd=(gras_hostdata_t *)SIMIX_host_get_data(SIMIX_host_self()); - gras_trp_procdata_t pd=(gras_trp_procdata_t)gras_libdata_by_id(gras_trp_libdata_id); + //gras_trp_procdata_t pd=(gras_trp_procdata_t)gras_libdata_by_id(gras_trp_libdata_id); gras_sg_portrec_t pr; gras_trp_sg_sock_data_t *data; volatile int found; @@ -169,8 +175,7 @@ void gras_trp_sg_socket_server(gras_trp_plugin_t self, xbt_assert0(hd,"Please run gras_process_init on each process"); - sock->accepting = 0;*/ /* no such nuisance in SG */ -/* + sock->accepting = 0; /* no such nuisance in SG */ found = 0; TRY { find_port(hd,sock->port,&pr); @@ -187,31 +192,30 @@ void gras_trp_sg_socket_server(gras_trp_plugin_t self, "can't listen on address %s:%d: port already in use.", host,sock->port); - //pr.tochan = sock->meas ? pd->measChan : pd->chan; pr.port = sock->port; pr.meas = sock->meas; pr.process = SIMIX_process_self(); xbt_dynar_push(hd->ports,&pr); - */ + + hd->cond_port[sock->port] = SIMIX_cond_init(); + hd->mutex_port[sock->port] = SIMIX_mutex_init(); + /* Create the socket */ - /*data = xbt_new(gras_trp_sg_sock_data_t,1); - data->from_process = NULL; - data->to_PID = SIMIX_process_self();; - //data->to_process = gras_os_getpid(); - data->to_host = SIMIX_host_self(); - data->to_port = pr; - //data->to_chan = pd->chan; + data = xbt_new(gras_trp_sg_sock_data_t,1); + data->from_process = SIMIX_process_self(); + data->to_process = NULL; + data->to_host = SIMIX_host_self(); sock->data = data; - VERB6("'%s' (%d) ears on %s:%d%s (%p)", + VERB6("'%s' (%ld) ears on %s:%d%s (%p)", SIMIX_process_get_name(SIMIX_process_self()), gras_os_getpid(), host,sock->port,sock->meas? " (mode meas)":"",sock); -*/ + } void gras_trp_sg_socket_close(gras_socket_t sock){ -/* + gras_hostdata_t *hd=(gras_hostdata_t *)SIMIX_host_get_data(SIMIX_host_self()); int cpt; @@ -220,14 +224,20 @@ void gras_trp_sg_socket_close(gras_socket_t sock){ XBT_IN1(" (sock=%p)",sock); if (!sock) return; + xbt_assert0(hd,"Please run gras_process_init on each process"); if (sock->data) free(sock->data); - if (sock->incoming && sock->port >= 0) {*/ + SIMIX_cond_destroy(hd->cond_port[sock->port]); + hd->cond_port[sock->port] = NULL; + SIMIX_mutex_destroy(hd->mutex_port[sock->port]); + hd->mutex_port[sock->port] = NULL; + + if (sock->incoming && sock->port >= 0) { /* server mode socket. Unregister it from 'OS' tables */ -/* xbt_dynar_foreach(hd->ports, cpt, pr) { + xbt_dynar_foreach(hd->ports, cpt, pr) { DEBUG2("Check pr %d of %lu", cpt, xbt_dynar_length(hd->ports)); if (pr.port == sock->port) { xbt_dynar_cursor_rm(hd->ports, &cpt); @@ -239,7 +249,7 @@ void gras_trp_sg_socket_close(gras_socket_t sock){ sock,sock->port); } XBT_OUT; -*/ + } typedef struct { diff --git a/src/gras_simix/Transport/gras_simix_transport_plugin_tcp.c b/src/gras_simix/Transport/gras_simix_transport_plugin_tcp.c index e5b5d4c466..2ab6cec48d 100644 --- a/src/gras_simix/Transport/gras_simix_transport_plugin_tcp.c +++ b/src/gras_simix/Transport/gras_simix_transport_plugin_tcp.c @@ -14,7 +14,7 @@ #include "xbt/misc.h" #include "xbt/sysdep.h" #include "xbt/ex.h" -#include "transport_private.h" +#include "gras_simix_transport_private.h" /* FIXME maybe READV is sometime a good thing? */ #undef HAVE_READV diff --git a/src/gras_simix/Virtu/gras_simix_process.c b/src/gras_simix/Virtu/gras_simix_process.c index 28ab76ca70..4511574b3e 100644 --- a/src/gras_simix/Virtu/gras_simix_process.c +++ b/src/gras_simix/Virtu/gras_simix_process.c @@ -80,13 +80,11 @@ void *gras_libdata_by_name(const char *name) { void *gras_libdata_by_name_from_procdata(const char*name, gras_procdata_t* pd) { void *res=NULL; xbt_ex_t e; - if (xbt_set_length(pd->libdata) < xbt_dynar_length(_gras_procdata_fabrics)) { /* Damn, some new modules were added since procdata_init(). Amok? */ /* Get 'em all */ gras_procdata_init(); } - TRY { res = xbt_set_get_by_name(pd->libdata, name); } CATCH(e) { @@ -152,7 +150,6 @@ gras_procdata_init() { WARN1("Module '%s' constructor is borken: it does not set elem->name_len", fab.name); } - xbt_set_add(pd->libdata, elem, fab.destructor); } } diff --git a/src/gras_simix/Virtu/gras_simix_rl_dns.c b/src/gras_simix/Virtu/gras_simix_rl_dns.c index 04e2f5408b..993204f8c0 100644 --- a/src/gras_simix/Virtu/gras_simix_rl_dns.c +++ b/src/gras_simix/Virtu/gras_simix_rl_dns.c @@ -7,7 +7,7 @@ /* This program is free software; you can redistribute it and/or modify it * under the terms of the license (GNU LGPL) which comes with this package. */ -#include "gras/Virtu/virtu_rl.h" +#include "gras_simix/Virtu/gras_simix_virtu_rl.h" #include "portable.h" /* A portable DNS resolver is a nightmare to do in a portable manner. diff --git a/src/gras_simix/Virtu/gras_simix_rl_emul.c b/src/gras_simix/Virtu/gras_simix_rl_emul.c index 908268d4fa..ee4d4139b8 100644 --- a/src/gras_simix/Virtu/gras_simix_rl_emul.c +++ b/src/gras_simix/Virtu/gras_simix_rl_emul.c @@ -8,7 +8,7 @@ * under the terms of the license (GNU LGPL) which comes with this package. */ #include "gras/emul.h" -#include "gras/Virtu/virtu_rl.h" +#include "gras_simix/Virtu/gras_simix_virtu_rl.h" #include "gras_modinter.h" XBT_LOG_NEW_SUBCATEGORY(gras_virtu_emul,gras_virtu,"Emulation support"); diff --git a/src/gras_simix/Virtu/gras_simix_rl_process.c b/src/gras_simix/Virtu/gras_simix_rl_process.c index 7351f8aff4..2c6329e10e 100644 --- a/src/gras_simix/Virtu/gras_simix_rl_process.c +++ b/src/gras_simix/Virtu/gras_simix_rl_process.c @@ -8,7 +8,7 @@ * under the terms of the license (GNU LGPL) which comes with this package. */ #include "gras_modinter.h" /* module initialization interface */ -#include "gras/Virtu/virtu_rl.h" +#include "gras_simix/Virtu/gras_simix_virtu_rl.h" #include "portable.h" /* globals */ diff --git a/src/gras_simix/Virtu/gras_simix_sg_emul.c b/src/gras_simix/Virtu/gras_simix_sg_emul.c index d07083f6dd..836353e1f0 100644 --- a/src/gras_simix/Virtu/gras_simix_sg_emul.c +++ b/src/gras_simix/Virtu/gras_simix_sg_emul.c @@ -169,4 +169,59 @@ int gras_if_SG(void) { return 1; } +void gras_global_init(int *argc,char **argv) { +return SIMIX_global_init(argc,argv); +} +void gras_create_environment(const char *file) { +return SIMIX_create_environment(file); +} +void gras_function_register(const char *name, void *code) { +return SIMIX_function_register(name, (smx_process_code_t)code); +} +void gras_main() { + smx_cond_t cond = NULL; + smx_action_t smx_action; + xbt_fifo_t actions_done = xbt_fifo_new(); + xbt_fifo_t actions_failed = xbt_fifo_new(); + + + /* Clean IO before the run */ + fflush(stdout); + fflush(stderr); + + + while (SIMIX_solve(actions_done, actions_failed) != -1.0) { + + while ( (smx_action = xbt_fifo_pop(actions_failed)) ) { + + + DEBUG1("** %s failed **",smx_action->name); + while ( (cond = xbt_fifo_pop(smx_action->cond_list)) ) { + SIMIX_cond_broadcast(cond); + } + /* action finished, destroy it */ + // SIMIX_action_destroy(smx_action); + } + + while ( (smx_action = xbt_fifo_pop(actions_done)) ) { + + DEBUG1("** %s done **",smx_action->name); + while ( (cond = xbt_fifo_pop(smx_action->cond_list)) ) { + SIMIX_cond_broadcast(cond); + } + /* action finished, destroy it */ + //SIMIX_action_destroy(smx_action); + } + } + xbt_fifo_free(actions_failed); + xbt_fifo_free(actions_done); + return; + +} +void gras_launch_application(const char *file) { +return SIMIX_launch_application(file); +} +void gras_clean() { +return SIMIX_clean(); +} diff --git a/src/gras_simix/Virtu/gras_simix_sg_process.c b/src/gras_simix/Virtu/gras_simix_sg_process.c index 7d8b43b9ab..e09f6fdd4f 100644 --- a/src/gras_simix/Virtu/gras_simix_sg_process.c +++ b/src/gras_simix/Virtu/gras_simix_sg_process.c @@ -19,6 +19,7 @@ static long int PID = 1; void gras_process_init() { + int i; gras_hostdata_t *hd=(gras_hostdata_t *)SIMIX_host_get_data(SIMIX_host_self()); gras_procdata_t *pd=xbt_new0(gras_procdata_t,1); gras_trp_procdata_t trp_pd; @@ -27,12 +28,7 @@ gras_process_init() { SIMIX_process_set_data(SIMIX_process_self(),(void*)pd); - trp_pd = (gras_trp_procdata_t)gras_libdata_by_name("gras_trp"); - trp_pd->pid = PID++; - if (SIMIX_process_self() != NULL ) { - trp_pd->ppid = gras_os_getpid(); - } - else trp_pd->ppid = -1; + gras_procdata_init(); @@ -43,12 +39,25 @@ gras_process_init() { hd->ports = xbt_dynar_new(sizeof(gras_sg_portrec_t),NULL); // memset(hd->proc, 0, sizeof(hd->proc[0]) * XBT_MAX_CHANNEL); - - SIMIX_host_set_data(SIMIX_host_self(),(void*)hd); + + for (i=0;i<65536;i++) { + hd->cond_port[i] =NULL; + hd->mutex_port[i] =NULL; + } + SIMIX_host_set_data(SIMIX_host_self(),(void*)hd); } else { hd->refcount++; } - + + trp_pd = (gras_trp_procdata_t)gras_libdata_by_name("gras_trp"); + trp_pd->pid = PID++; + if (SIMIX_process_self() != NULL ) { + trp_pd->ppid = gras_os_getpid(); + } + else trp_pd->ppid = -1; + trp_pd->mutex = SIMIX_mutex_init(); + trp_pd->cond = SIMIX_cond_init(); + trp_pd->active_socket = NULL; /* take a free channel for this process */ /* trp_pd = (gras_trp_procdata_t)gras_libdata_by_name("gras_trp"); diff --git a/src/gras_simix/Virtu/gras_simix_virtu_rl.h b/src/gras_simix/Virtu/gras_simix_virtu_rl.h index 0a30c4a19a..8df16c178f 100644 --- a/src/gras_simix/Virtu/gras_simix_virtu_rl.h +++ b/src/gras_simix/Virtu/gras_simix_virtu_rl.h @@ -10,6 +10,6 @@ #ifndef VIRTU_RL_H #define VIRTU_RL_H -#include "gras/Virtu/virtu_private.h" +#include "gras_simix/Virtu/gras_simix_virtu_private.h" #endif /* VIRTU_RL_H */ diff --git a/src/gras_simix/Virtu/gras_simix_virtu_sg.h b/src/gras_simix/Virtu/gras_simix_virtu_sg.h index 6db30cad7b..d2fb67d2d3 100644 --- a/src/gras_simix/Virtu/gras_simix_virtu_sg.h +++ b/src/gras_simix/Virtu/gras_simix_virtu_sg.h @@ -17,17 +17,16 @@ typedef struct { int port; /* list of ports used by a server socket */ -// int tochan; /* the channel it points to */ int meas; /* (boolean) the channel is for measurements or for messages */ + smx_process_t process; } gras_sg_portrec_t; /* Data for each host */ typedef struct { int refcount; -// int proc[XBT_MAX_CHANNEL]; /* PID of who's connected to each channel */ - /* If =0, then free */ smx_cond_t cond_port[65536]; + smx_mutex_t mutex_port[65536]; xbt_dynar_t ports; @@ -41,8 +40,7 @@ typedef struct { smx_process_t to_process; smx_host_t to_host; /* Who's on other side */ - gras_sg_portrec_t port; - //m_channel_t to_chan;/* Channel on which the other side is earing */ + } gras_trp_sg_sock_data_t; -- 2.20.1