git-svn-id: svn+ssh://scm.gforge.inria.fr/svn/simgrid/simgrid/trunk@1360
48e7efb5-ca39-0410-a469-
dd3cf9ba447f
DEBUG2("accepted=%p,&accepted=%p",accepted,&accepted);
TRY((sock_iter->plugin->socket_accept)(sock_iter,&accepted));
DEBUG2("accepted=%p,&accepted=%p",accepted,&accepted);
TRY((sock_iter->plugin->socket_accept)(sock_iter,&accepted));
- accepted->raw = sock_iter->raw;
+ accepted->meas = sock_iter->meas;
} else {
#if 0
FIXME: this fails of files. quite logical
} else {
#if 0
FIXME: this fails of files. quite logical
DEBUG1("Consider %p as outgoing socket to expeditor",sock_iter);
sockdata = sock_iter->data;
DEBUG1("Consider %p as outgoing socket to expeditor",sock_iter);
sockdata = sock_iter->data;
- if (sock_iter->raw || !sock_iter->outgoing)
+ if (sock_iter->meas || !sock_iter->outgoing)
continue;
if (sockdata->from_PID == r_pid) {
continue;
if (sockdata->from_PID == r_pid) {
xbt_dynar_foreach(remote_hd->ports, cpt, pr) {
if (sockdata->to_chan == pr.tochan) {
xbt_dynar_foreach(remote_hd->ports, cpt, pr) {
if (sockdata->to_chan == pr.tochan) {
- if (pr.raw) {
- DEBUG0("Damn, it's raw");
+ if (pr.meas) {
+ DEBUG0("Damn, it's for measurement");
}
}
if ((*dst)->peer_port == -10) {
}
}
if ((*dst)->peer_port == -10) {
+ /* was for measurement */
sockdata->to_chan = -1;
} else {
sockdata->to_chan = -1;
} else {
- /* found it, don't let it override by raw */
+ /* found it, don't let it be overridden by meas */
#include "gras/Transport/transport_private.h"
XBT_LOG_NEW_DEFAULT_SUBCATEGORY(transport,gras,"Conveying bytes over the network");
#include "gras/Transport/transport_private.h"
XBT_LOG_NEW_DEFAULT_SUBCATEGORY(transport,gras,"Conveying bytes over the network");
-XBT_LOG_NEW_SUBCATEGORY(raw_trp,transport,"Conveying bytes over the network without formating");
+XBT_LOG_NEW_SUBCATEGORY(meas_trp,transport,"Conveying bytes over the network without formating for perf measurements");
static short int _gras_trp_started = 0;
static xbt_dict_t _gras_trp_plugins; /* All registered plugins */
static short int _gras_trp_started = 0;
static xbt_dict_t _gras_trp_plugins; /* All registered plugins */
sock->port = -1;
sock->peer_port = -1;
sock->peer_name = NULL;
sock->port = -1;
sock->peer_port = -1;
sock->peer_name = NULL;
gras_socket_server_ext(unsigned short port,
unsigned long int bufSize,
gras_socket_server_ext(unsigned short port,
unsigned long int bufSize,
/* OUT */ gras_socket_t *dst) {
/* OUT */ gras_socket_t *dst) {
sock->plugin= trp;
sock->port=port;
sock->bufSize = bufSize;
sock->plugin= trp;
sock->port=port;
sock->bufSize = bufSize;
+ sock->meas = measurement;
/* Call plugin socket creation function */
DEBUG1("Prepare socket with plugin (fct=%p)",trp->socket_server);
/* Call plugin socket creation function */
DEBUG1("Prepare socket with plugin (fct=%p)",trp->socket_server);
unsigned short port,
unsigned long int bufSize,
unsigned short port,
unsigned long int bufSize,
/* OUT */ gras_socket_t *dst) {
/* OUT */ gras_socket_t *dst) {
sock->peer_port = port;
sock->peer_name = (char*)strdup(host?host:"localhost");
sock->bufSize = bufSize;
sock->peer_port = port;
sock->peer_name = (char*)strdup(host?host:"localhost");
sock->bufSize = bufSize;
/* plugin-specific */
errcode= (*trp->socket_client)(trp, sock);
/* plugin-specific */
errcode= (*trp->socket_client)(trp, sock);
return sock->peer_name;
}
return sock->peer_name;
}
-xbt_error_t gras_socket_raw_send(gras_socket_t peer,
+xbt_error_t gras_socket_meas_send(gras_socket_t peer,
unsigned int timeout,
unsigned long int exp_size,
unsigned long int msg_size) {
unsigned int timeout,
unsigned long int exp_size,
unsigned long int msg_size) {
char *chunk = xbt_malloc(msg_size);
int exp_sofar;
char *chunk = xbt_malloc(msg_size);
int exp_sofar;
- xbt_assert0(peer->raw,"Asked to send raw data on a regular socket");
+ XBT_IN;
+ xbt_assert0(peer->meas,"Asked to send measurement data on a regular socket");
for (exp_sofar=0; exp_sofar < exp_size; exp_sofar += msg_size) {
for (exp_sofar=0; exp_sofar < exp_size; exp_sofar += msg_size) {
- CDEBUG5(raw_trp,"Sent %d of %lu (msg_size=%ld) to %s:%d",
+ CDEBUG5(meas_trp,"Sent %d of %lu (msg_size=%ld) to %s:%d",
exp_sofar,exp_size,msg_size,
gras_socket_peer_name(peer), gras_socket_peer_port(peer));
TRY(gras_trp_chunk_send(peer,chunk,msg_size));
}
exp_sofar,exp_size,msg_size,
gras_socket_peer_name(peer), gras_socket_peer_port(peer));
TRY(gras_trp_chunk_send(peer,chunk,msg_size));
}
- CDEBUG5(raw_trp,"Sent %d of %lu (msg_size=%ld) to %s:%d",
+ CDEBUG5(meas_trp,"Sent %d of %lu (msg_size=%ld) to %s:%d",
exp_sofar,exp_size,msg_size,
gras_socket_peer_name(peer), gras_socket_peer_port(peer));
free(chunk);
exp_sofar,exp_size,msg_size,
gras_socket_peer_name(peer), gras_socket_peer_port(peer));
free(chunk);
- return no_error;/* gras_socket_raw_exchange(peer,1,timeout,expSize,msgSize); */
+ XBT_OUT;
+ return no_error;/* gras_socket_meas_exchange(peer,1,timeout,expSize,msgSize); */
-xbt_error_t gras_socket_raw_recv(gras_socket_t peer,
+xbt_error_t gras_socket_meas_recv(gras_socket_t peer,
unsigned int timeout,
unsigned long int exp_size,
unsigned long int msg_size){
unsigned int timeout,
unsigned long int exp_size,
unsigned long int msg_size){
char *chunk = xbt_malloc(msg_size);
int exp_sofar;
char *chunk = xbt_malloc(msg_size);
int exp_sofar;
- xbt_assert0(peer->raw,"Asked to recveive raw data on a regular socket\n");
+ XBT_IN;
+ xbt_assert0(peer->meas,"Asked to receive measurement data on a regular socket\n");
for (exp_sofar=0; exp_sofar < exp_size; exp_sofar += msg_size) {
for (exp_sofar=0; exp_sofar < exp_size; exp_sofar += msg_size) {
- CDEBUG5(raw_trp,"Recvd %d of %lu (msg_size=%ld) from %s:%d",
+ CDEBUG5(meas_trp,"Recvd %d of %lu (msg_size=%ld) from %s:%d",
exp_sofar,exp_size,msg_size,
gras_socket_peer_name(peer), gras_socket_peer_port(peer));
TRY(gras_trp_chunk_recv(peer,chunk,msg_size));
}
exp_sofar,exp_size,msg_size,
gras_socket_peer_name(peer), gras_socket_peer_port(peer));
TRY(gras_trp_chunk_recv(peer,chunk,msg_size));
}
- CDEBUG5(raw_trp,"Recvd %d of %lu (msg_size=%ld) from %s:%d",
+ CDEBUG5(meas_trp,"Recvd %d of %lu (msg_size=%ld) from %s:%d",
exp_sofar,exp_size,msg_size,
gras_socket_peer_name(peer), gras_socket_peer_port(peer));
free(chunk);
exp_sofar,exp_size,msg_size,
gras_socket_peer_name(peer), gras_socket_peer_port(peer));
free(chunk);
- return no_error;/* gras_socket_raw_exchange(peer,0,timeout,expSize,msgSize); */
+ XBT_OUT;
+ return no_error;/* gras_socket_meas_exchange(peer,0,timeout,expSize,msgSize); */
typedef struct {
/* SG only elements. In RL, they are part of the OS ;) */
int chan; /* Formated messages channel */
typedef struct {
/* SG only elements. In RL, they are part of the OS ;) */
int chan; /* Formated messages channel */
- int rawChan; /* Unformated echange channel */
+ int measChan; /* Unformatted exchange channel for performance measurement */
xbt_dynar_t sockets; /* all sockets known to this process */
} s_gras_trp_procdata_t,*gras_trp_procdata_t;
xbt_dynar_t sockets; /* all sockets known to this process */
} s_gras_trp_procdata_t,*gras_trp_procdata_t;
sock->peer_name,sock->peer_port);
}
sock->peer_name,sock->peer_port);
}
- if (pr.raw && !sock->raw) {
+ if (pr.meas && !sock->meas) {
RAISE2(mismatch_error,
"can't connect to %s:%d in regular mode, the process listen "
RAISE2(mismatch_error,
"can't connect to %s:%d in regular mode, the process listen "
- "in raw mode on this port",sock->peer_name,sock->peer_port);
+ "in meas mode on this port",sock->peer_name,sock->peer_port);
- if (!pr.raw && sock->raw) {
+ if (!pr.meas && sock->meas) {
- "can't connect to %s:%d in raw mode, the process listen "
+ "can't connect to %s:%d in meas mode, the process listen "
"in regular mode on this port",sock->peer_name,sock->peer_port);
}
"in regular mode on this port",sock->peer_name,sock->peer_port);
}
DEBUG6("%s (PID %d) connects in %s mode to %s:%d (to_PID=%d)",
MSG_process_get_name(MSG_process_self()), MSG_process_self_PID(),
DEBUG6("%s (PID %d) connects in %s mode to %s:%d (to_PID=%d)",
MSG_process_get_name(MSG_process_self()), MSG_process_self_PID(),
- sock->raw?"RAW":"regular",
+ sock->meas?"meas":"regular",
sock->peer_name,sock->peer_port,data->to_PID);
return no_error;
sock->peer_name,sock->peer_port,data->to_PID);
return no_error;
break;
case mismatch_error: /* Port not used so far. Do it */
break;
case mismatch_error: /* Port not used so far. Do it */
- pr.tochan = sock->raw ? pd->rawChan : pd->chan;
+ pr.tochan = sock->meas ? pd->measChan : pd->chan;
xbt_dynar_push(hd->ports,&pr);
break;
xbt_dynar_push(hd->ports,&pr);
break;
VERB6("'%s' (%d) ears on %s:%d%s (%p)",
MSG_process_get_name(MSG_process_self()), MSG_process_self_PID(),
VERB6("'%s' (%d) ears on %s:%d%s (%p)",
MSG_process_get_name(MSG_process_self()), MSG_process_self_PID(),
- host,sock->port,sock->raw? " (mode RAW)":"",sock);
+ host,sock->port,sock->meas? " (mode meas)":"",sock);
DEBUG4("recv chunk on %s -> %s:%d (size=%ld)",
MSG_host_get_name(sock_data->to_host),
MSG_host_get_name(MSG_host_self()), sock_data->to_chan, size);
DEBUG4("recv chunk on %s -> %s:%d (size=%ld)",
MSG_host_get_name(sock_data->to_host),
MSG_host_get_name(MSG_host_self()), sock_data->to_chan, size);
- if (MSG_task_get(&task, (sock->raw ? pd->rawChan : pd->chan)) != MSG_OK)
+ if (MSG_task_get(&task, (sock->meas ? pd->measChan : pd->chan)) != MSG_OK)
RAISE0(unknown_error,"Error in MSG_task_get()");
DEBUG1("Got chuck %s",MSG_task_get_name(task));
RAISE0(unknown_error,"Error in MSG_task_get()");
DEBUG1("Got chuck %s",MSG_task_get_name(task));
-/* Data exchange over raw sockets */
-xbt_error_t gras_socket_raw_exchange(gras_socket_t peer,
+#if 0
+/* Data exchange over measurement sockets */
+xbt_error_t gras_socket_meas_exchange(gras_socket_t peer,
int sender,
unsigned int timeout,
unsigned long int expSize,
int sender,
unsigned int timeout,
unsigned long int expSize,
- sprintf(name,"Raw data[%d]",count++);
+ sprintf(name,"meas data[%d]",count++);
task=MSG_task_create(name,0,((double)msgSize)/(1024.0*1024.0),NULL);
task=MSG_task_create(name,0,((double)msgSize)/(1024.0*1024.0),NULL);
- DEBUG5("%f:%s: gras_socket_raw_send(%f %s -> %s) BEGIN",
+ DEBUG5("%f:%s: gras_socket_meas_send(%f %s -> %s) BEGIN",
gras_os_time(), MSG_process_get_name(MSG_process_self()),
((double)msgSize)/(1024.0*1024.0),
MSG_host_get_name( MSG_host_self()), peer->peer_name);
gras_os_time(), MSG_process_get_name(MSG_process_self()),
((double)msgSize)/(1024.0*1024.0),
MSG_host_get_name( MSG_host_self()), peer->peer_name);
if (MSG_task_put(task, sock_data->to_host,sock_data->to_chan) != MSG_OK)
RAISE0(system_error,"Problem during the MSG_task_put()");
if (MSG_task_put(task, sock_data->to_host,sock_data->to_chan) != MSG_OK)
RAISE0(system_error,"Problem during the MSG_task_put()");
- DEBUG5("%f:%s: gras_socket_raw_send(%f %s -> %s) END",
+ DEBUG5("%f:%s: gras_socket_meas_send(%f %s -> %s) END",
gras_os_time(), MSG_process_get_name(MSG_process_self()),
((double)msgSize)/(1024.0*1024.0),
MSG_host_get_name( MSG_host_self()), peer->peer_name);
gras_os_time(), MSG_process_get_name(MSG_process_self()),
((double)msgSize)/(1024.0*1024.0),
MSG_host_get_name( MSG_host_self()), peer->peer_name);
} else { /* we are receiver, simulate a select */
task=NULL;
} else { /* we are receiver, simulate a select */
task=NULL;
- DEBUG2("%f:%s: gras_socket_raw_recv() BEGIN\n",
+ DEBUG2("%f:%s: gras_socket_meas_recv() BEGIN\n",
gras_os_time(), MSG_process_get_name(MSG_process_self()));
do {
gras_os_time(), MSG_process_get_name(MSG_process_self()));
do {
- if (MSG_task_Iprobe((m_channel_t) pd->rawChan)) {
- if (MSG_task_get(&task, (m_channel_t) pd->rawChan) != MSG_OK) {
+ if (MSG_task_Iprobe((m_channel_t) pd->measChan)) {
+ if (MSG_task_get(&task, (m_channel_t) pd->measChan) != MSG_OK) {
fprintf(stderr,"GRAS: Error in MSG_task_get()\n");
return unknown_error;
}
fprintf(stderr,"GRAS: Error in MSG_task_get()\n");
return unknown_error;
}
- DEBUG2("%f:%s: gras_socket_raw_recv() END\n",
+ DEBUG2("%f:%s: gras_socket_meas_recv() END\n",
gras_os_time(), MSG_process_get_name(MSG_process_self()));
break;
} else {
gras_os_time(), MSG_process_get_name(MSG_process_self()));
break;
} else {
typedef struct {
fd_set msg_socks;
typedef struct {
fd_set msg_socks;
} gras_trp_tcp_plug_data_t;
/***
} gras_trp_tcp_plug_data_t;
/***
gras_trp_tcp_plug_data_t *data = xbt_new(gras_trp_tcp_plug_data_t,1);
FD_ZERO(&(data->msg_socks));
gras_trp_tcp_plug_data_t *data = xbt_new(gras_trp_tcp_plug_data_t,1);
FD_ZERO(&(data->msg_socks));
- FD_ZERO(&(data->raw_socks));
+ FD_ZERO(&(data->meas_socks));
plug->socket_client = gras_trp_tcp_socket_client;
plug->socket_server = gras_trp_tcp_socket_server;
plug->socket_client = gras_trp_tcp_socket_client;
plug->socket_server = gras_trp_tcp_socket_server;
RAISE2(system_error,"Cannot listen to port %d: %s",sock->port,sock_errstr);
}
RAISE2(system_error,"Cannot listen to port %d: %s",sock->port,sock_errstr);
}
- if (sock->raw)
- FD_SET(sock->sd, &(tcp->raw_socks));
+ if (sock->meas)
+ FD_SET(sock->sd, &(tcp->meas_socks));
else
FD_SET(sock->sd, &(tcp->msg_socks));
else
FD_SET(sock->sd, &(tcp->msg_socks));
/* forget about the socket
... but not when using winsock since accept'ed socket can not fit
into the fd_set*/
/* forget about the socket
... but not when using winsock since accept'ed socket can not fit
into the fd_set*/
- if (sock->raw){
- FD_CLR(sock->sd, &(tcp->raw_socks));
+ if (sock->meas){
+ FD_CLR(sock->sd, &(tcp->meas_socks));
} else {
FD_CLR(sock->sd, &(tcp->msg_socks));
}
} else {
FD_CLR(sock->sd, &(tcp->msg_socks));
}
-/* Data exchange over raw sockets. Placing this in there is a kind of crude hack.
- It means that the only possible raw are TCP where we may want to do UDP for them.
+#if 0 /* KILLME */
+/* Data exchange over measurement sockets. Placing this here is a kind of crude hack.
+ It means that the only possible measurement sockets are TCP, whereas we may want to use UDP for them.
But I fail to find a good internal organization for now. We may want to split
But I fail to find a good internal organization for now. We may want to split
- raw and regular sockets more efficiently.
+ meas and regular sockets more efficiently.
-xbt_error_t gras_socket_raw_exchange(gras_socket_t peer,
+xbt_error_t gras_socket_meas_exchange(gras_socket_t peer,
int sender,
unsigned int timeout,
unsigned long int exp_size,
int sender,
unsigned int timeout,
unsigned long int exp_size,
free(chunk);
return no_error;
}
free(chunk);
return no_error;
}
#ifdef HAVE_WINSOCK_H
#define RETSTR( x ) case x: return #x
#ifdef HAVE_WINSOCK_H
#define RETSTR( x ) case x: return #x
int incoming :1; /* true if we can read from this sock */
int outgoing :1; /* true if we can write on this sock */
int accepting :1; /* true if master incoming sock in tcp */
int incoming :1; /* true if we can read from this sock */
int outgoing :1; /* true if we can write on this sock */
int accepting :1; /* true if master incoming sock in tcp */
- int raw :1; /* true if this is an experiment socket instead of messaging */
+ int meas :1; /* true if this is an experiment socket instead of messaging */
unsigned long int bufSize; /* what to say to the OS. field here to remember it when accepting */
unsigned long int bufSize; /* what to say to the OS. field here to remember it when accepting */
void gras_trp_buf_init_sock(gras_socket_t sock);
void gras_trp_buf_init_sock(gras_socket_t sock);
-/* Data exchange over raw sockets */
-xbt_error_t gras_socket_raw_exchange(gras_socket_t peer,
+/* Data exchange over measurement sockets */ /* FIXME: KILLME */
+xbt_error_t gras_socket_meas_exchange(gras_socket_t peer,
int sender,
unsigned int timeout,
unsigned long int expSize,
int sender,
unsigned int timeout,
unsigned long int expSize,
gras_hostdata_t *hd=(gras_hostdata_t *)MSG_host_get_data(MSG_host_self());
gras_procdata_t *pd=xbt_new(gras_procdata_t,1);
gras_trp_procdata_t trp_pd;
gras_hostdata_t *hd=(gras_hostdata_t *)MSG_host_get_data(MSG_host_self());
gras_procdata_t *pd=xbt_new(gras_procdata_t,1);
gras_trp_procdata_t trp_pd;
- gras_sg_portrec_t prraw,pr;
+ gras_sg_portrec_t prmeas,pr;
int i;
if (MSG_process_set_data(MSG_process_self(),(void*)pd) != MSG_OK)
int i;
if (MSG_process_set_data(MSG_process_self(),(void*)pd) != MSG_OK)
/* regiter it to the ports structure */
pr.port = -1;
pr.tochan = i;
/* regiter it to the ports structure */
pr.port = -1;
pr.tochan = i;
xbt_dynar_push(hd->ports,&pr);
xbt_dynar_push(hd->ports,&pr);
- /* take a free RAW channel for this process */
+ /* take a free meas channel for this process */
for (i=0; i<XBT_MAX_CHANNEL && hd->proc[i]; i++);
if (i == XBT_MAX_CHANNEL) {
RAISE2(system_error,
"GRAS: Can't add a new process on %s, because all channel are already in use. Please increase MAX CHANNEL (which is %d for now) and recompile GRAS\n.",
MSG_host_get_name(MSG_host_self()),XBT_MAX_CHANNEL);
}
for (i=0; i<XBT_MAX_CHANNEL && hd->proc[i]; i++);
if (i == XBT_MAX_CHANNEL) {
RAISE2(system_error,
"GRAS: Can't add a new process on %s, because all channel are already in use. Please increase MAX CHANNEL (which is %d for now) and recompile GRAS\n.",
MSG_host_get_name(MSG_host_self()),XBT_MAX_CHANNEL);
}
hd->proc[ i ] = MSG_process_self_PID();
/* register it to the ports structure */
hd->proc[ i ] = MSG_process_self_PID();
/* register it to the ports structure */
- prraw.port = -1;
- prraw.tochan = i;
- prraw.raw = 1;
- xbt_dynar_push(hd->ports,&prraw);
+ prmeas.port = -1;
+ prmeas.tochan = i;
+ prmeas.meas = 1;
+ xbt_dynar_push(hd->ports,&prmeas);
VERB2("Creating process '%s' (%d)",
MSG_process_get_name(MSG_process_self()),
VERB2("Creating process '%s' (%d)",
MSG_process_get_name(MSG_process_self()),
hd->proc[cpt] = 0;
xbt_dynar_foreach(hd->ports, cpt, pr) {
hd->proc[cpt] = 0;
xbt_dynar_foreach(hd->ports, cpt, pr) {
- if (pr.port == trp_pd->chan || pr.port == trp_pd->rawChan) {
+ if (pr.port == trp_pd->chan || pr.port == trp_pd->measChan) {
xbt_dynar_cursor_rm(hd->ports, &cpt);
}
}
xbt_dynar_cursor_rm(hd->ports, &cpt);
}
}
typedef struct {
int port; /* list of ports used by a server socket */
int tochan; /* the channel it points to */
typedef struct {
int port; /* list of ports used by a server socket */
int tochan; /* the channel it points to */
- int raw; /* (boolean) the channel is in raw mode or for messages */
+ int meas; /* (boolean) the channel is for measurements or for messages */
} gras_sg_portrec_t;
/* Data for each host */
} gras_sg_portrec_t;
/* Data for each host */