/***
*** Prototypes
***/
-void hexa_print(unsigned char *data, int size); /* in gras.c */
/* retrieve the port record associated to a numerical port on an host */
static void find_port(gras_hostdata_t *hd, int port, gras_sg_portrec_t *hpd);
/* OUT */ gras_socket_t sock);
void gras_trp_sg_socket_close(gras_socket_t sd);
+void gras_trp_sg_chunk_send_raw(gras_socket_t sd,
+ const char *data,
+ unsigned long int size);
void gras_trp_sg_chunk_send(gras_socket_t sd,
const char *data,
- unsigned long int size);
+ unsigned long int size,
+ int stable_ignored);
-void gras_trp_sg_chunk_recv(gras_socket_t sd,
+int gras_trp_sg_chunk_recv(gras_socket_t sd,
char *data,
unsigned long int size);
plug->socket_server = gras_trp_sg_socket_server;
plug->socket_close = gras_trp_sg_socket_close;
- plug->chunk_send = gras_trp_sg_chunk_send;
- plug->chunk_recv = gras_trp_sg_chunk_recv;
+ plug->raw_send = gras_trp_sg_chunk_send_raw;
+ plug->send = gras_trp_sg_chunk_send;
+ plug->raw_recv = plug->recv = gras_trp_sg_chunk_recv;
plug->flush = NULL; /* nothing cached */
}
gras_socket_t sock){
gras_hostdata_t *hd=(gras_hostdata_t *)MSG_host_get_data(MSG_host_self());
- gras_trp_procdata_t pd=(gras_trp_procdata_t)gras_libdata_get("gras_trp");
+ gras_trp_procdata_t pd=(gras_trp_procdata_t)gras_libdata_by_id(gras_trp_libdata_id);
gras_sg_portrec_t pr;
gras_trp_sg_sock_data_t *data;
int found;
void gras_trp_sg_chunk_send(gras_socket_t sock,
const char *data,
- unsigned long int size) {
+ unsigned long int size,
+ int stable_ignored) {
+ gras_trp_sg_chunk_send_raw(sock,data,size);
+}
+
+void gras_trp_sg_chunk_send_raw(gras_socket_t sock,
+ const char *data,
+ unsigned long int size) {
m_task_t task=NULL;
static unsigned int count=0;
char name[256];
gras_trp_sg_sock_data_t *sock_data = (gras_trp_sg_sock_data_t *)sock->data;
sg_task_data_t *task_data;
+ xbt_assert0(sock->meas, "SG chunk exchange shouldn't be used on non-measurement sockets");
+
sprintf(name,"Chunk[%d]",count++);
task_data=xbt_new(sg_task_data_t,1);
- task_data->data=(void*)xbt_malloc(size);
task_data->size = size;
- memcpy(task_data->data,data,size);
+ if (data) {
+ task_data->data=(void*)xbt_malloc(size);
+ memcpy(task_data->data,data,size);
+ } else {
+ task_data->data = NULL;
+ }
task=MSG_task_create(name,0,((double)size)/(1024.0*1024.0),task_data);
}
}
-void gras_trp_sg_chunk_recv(gras_socket_t sock,
+/* Receive one chunk of exactly `size` bytes on the socket `sock`.
+ *
+ * Asserts that `sock` is a measurement socket, then waits for a task with
+ * MSG_task_get_with_time_out() (timeout argument: 60) on the channel selected
+ * from the process data. The size recorded by the sender must equal `size`.
+ *
+ * data: destination buffer, or NULL to drop the payload. The payload is only
+ *       copied when both `data` and the received payload are non-NULL, since
+ *       the sending side may legitimately ship a NULL payload.
+ *
+ * Returns `size` (converted to int).
+ * Throws system_error when getting or destroying the task fails, and
+ * mismatch_error when the received size differs from `size`.
+ */
+int gras_trp_sg_chunk_recv(gras_socket_t sock,
char *data,
unsigned long int size){
- gras_trp_procdata_t pd=(gras_trp_procdata_t)gras_libdata_get("gras_trp");
+ gras_trp_procdata_t pd=(gras_trp_procdata_t)gras_libdata_by_id(gras_trp_libdata_id);
m_task_t task=NULL;
sg_task_data_t *task_data;
gras_trp_sg_sock_data_t *sock_data = sock->data;
+ xbt_assert0(sock->meas, "SG chunk exchange shouldn't be used on non-measurement sockets");
XBT_IN;
DEBUG4("recv chunk on %s -> %s:%d (size=%ld)",
MSG_host_get_name(sock_data->to_host),
MSG_host_get_name(MSG_host_self()), sock_data->to_chan, size);
- if (MSG_task_get(&task, (sock->meas ? pd->measChan : pd->chan)) != MSG_OK)
+ if (MSG_task_get_with_time_out(&task,
+ (sock->meas ? pd->measChan : pd->chan),
+ 60) != MSG_OK)
THROW0(system_error,0,"Error in MSG_task_get()");
DEBUG1("Got chuck %s",MSG_task_get_name(task));
task_data = MSG_task_get_data(task);
- if (size != -1) {
- if (task_data->size != size)
- THROW5(mismatch_error,0,
- "Got %d bytes when %ld where expected (in %s->%s:%d)",
- task_data->size, size,
- MSG_host_get_name(sock_data->to_host),
- MSG_host_get_name(MSG_host_self()), sock_data->to_chan);
- memcpy(data,task_data->data,size);
- } else {
- /* damn, the size is embeeded at the begining of the chunk */
- int netsize;
-
- memcpy((char*)&netsize,task_data->data,4);
- DEBUG1("netsize embeeded = %d",netsize);
-
- memcpy(data,task_data->data,netsize+4);
- }
- free(task_data->data);
+ if (task_data->size != size)
+ THROW5(mismatch_error,0,
+ "Got %d bytes when %ld where expected (in %s->%s:%d)",
+ task_data->size, size,
+ MSG_host_get_name(sock_data->to_host),
+ MSG_host_get_name(MSG_host_self()), sock_data->to_chan);
+ /* The payload may be NULL (size-only chunk): never memcpy from NULL. */
+ if (data && task_data->data)
+ memcpy(data,task_data->data,size);
+ /* free(NULL) is a no-op, so no guard is needed. */
+ free(task_data->data);
free(task_data);
if (MSG_task_destroy(task) != MSG_OK)
THROW0(system_error,0,"Error in MSG_task_destroy()");
XBT_OUT;
+ return size;
}