+ bufsize -= status;
+ got += status;
+ } else {
+ THROWF(system_error, errno,
+ "Socket closed by remote side (got %d bytes before this)",
+ got);
+ }
+ }
+
+ return got;
+}
+
+static int gras_trp_tcp_recv(gras_socket_t sock,
+ char *data, unsigned long int size)
+{
+ return gras_trp_tcp_recv_withbuffer(sock, data, size, size);
+
+}
+
+/*******************************************/
+/****[ end of UNBUFFERED DATA EXCHANGE ]****/
+/*******************************************/
+
+/**********************************/
+/****[ BUFFERED DATA EXCHANGE ]****/
+/**********************************/
+
+/* Make sure the data is sent */
+static void gras_trp_bufiov_flush(gras_socket_t sock)
+{
+#ifdef HAVE_READV
+ xbt_dynar_t vect;
+ int size;
+#endif
+ gras_trp_bufdata_t *data = sock->bufdata;
+ XBT_IN("");
+
+ XBT_DEBUG("Flush");
+ if (data->out == buffering_buf) {
+ if (XBT_LOG_ISENABLED(gras_trp_tcp, xbt_log_priority_debug))
+ hexa_print("chunk to send ",
+ (unsigned char *) data->out_buf.data, data->out_buf.size);
+ if ((data->out_buf.size - data->out_buf.pos) != 0) {
+ XBT_DEBUG("Send the chunk (size=%d) to %s:%d", data->out_buf.size,
+ gras_socket_peer_name(sock), gras_socket_peer_port(sock));
+ gras_trp_tcp_send(sock, data->out_buf.data, data->out_buf.size);
+ XBT_VERB("Chunk sent (size=%d)", data->out_buf.size);
+ data->out_buf.size = 0;
+ }
+ }
+#ifdef HAVE_READV
+ if (data->out == buffering_iov) {
+ XBT_DEBUG("Flush out iov");
+ vect = sock->bufdata->out_buf_v;
+ if ((size = xbt_dynar_length(vect))) {
+ XBT_DEBUG("Flush %d chunks out of this socket", size);
+ writev(sock->sd, xbt_dynar_get_ptr(vect, 0), size);
+ xbt_dynar_reset(vect);
+ }
+ data->out_buf.size = 0; /* reset the buffer containing non-stable data */
+ }
+
+ if (data->in == buffering_iov) {
+ XBT_DEBUG("Flush in iov");
+ vect = sock->bufdata->in_buf_v;
+ if ((size = xbt_dynar_length(vect))) {
+ XBT_DEBUG("Get %d chunks from of this socket", size);
+ readv(sock->sd, xbt_dynar_get_ptr(vect, 0), size);
+ xbt_dynar_reset(vect);
+ }
+ }
+#endif
+}
+
+static void
+gras_trp_buf_send(gras_socket_t sock,
+ const char *chunk,
+ unsigned long int size, int stable_ignored)
+{
+
+ gras_trp_bufdata_t *data = (gras_trp_bufdata_t *) sock->bufdata;
+ int chunk_pos = 0;
+
+ XBT_IN("");
+
+ while (chunk_pos < size) {
+ /* size of the chunk to receive in that shot */
+ long int thissize =
+ min(size - chunk_pos, data->buffsize - data->out_buf.size);
+ XBT_DEBUG("Set the chars %d..%ld into the buffer; size=%ld, ctn=(%s)",
+ (int) data->out_buf.size,
+ ((int) data->out_buf.size) + thissize - 1, size,
+ hexa_str((unsigned char *) chunk, thissize, 0));
+
+ memcpy(data->out_buf.data + data->out_buf.size, chunk + chunk_pos,
+ thissize);
+
+ data->out_buf.size += thissize;
+ chunk_pos += thissize;
+ XBT_DEBUG("New pos = %d; Still to send = %ld of %ld; ctn sofar=(%s)",
+ data->out_buf.size, size - chunk_pos, size,
+ hexa_str((unsigned char *) chunk, chunk_pos, 0));
+
+ if (data->out_buf.size == data->buffsize) /* out of space. Flush it */
+ gras_trp_bufiov_flush(sock);
+ }
+
+ XBT_OUT();
+}
+
+static int
+gras_trp_buf_recv(gras_socket_t sock, char *chunk, unsigned long int size)
+{
+
+ gras_trp_bufdata_t *data = sock->bufdata;
+ long int chunk_pos = 0;
+
+ XBT_IN("");
+
+ while (chunk_pos < size) {
+ /* size of the chunk to receive in that shot */
+ long int thissize;
+
+ if (data->in_buf.size == data->in_buf.pos) { /* out of data. Get more */
+
+ XBT_DEBUG("Get more data (size=%d,bufsize=%d)",
+ (int) MIN(size - chunk_pos, data->buffsize),
+ (int) data->buffsize);
+
+
+ data->in_buf.size =
+ gras_trp_tcp_recv_withbuffer(sock, data->in_buf.data,
+ MIN(size - chunk_pos,
+ data->buffsize),
+ data->buffsize);
+
+ data->in_buf.pos = 0;
+ }
+
+ thissize = min(size - chunk_pos, data->in_buf.size - data->in_buf.pos);
+ memcpy(chunk + chunk_pos, data->in_buf.data + data->in_buf.pos,
+ thissize);
+
+ data->in_buf.pos += thissize;
+ chunk_pos += thissize;
+ XBT_DEBUG("New pos = %d; Still to receive = %ld of %ld. Ctn so far=(%s)",
+ data->in_buf.pos, size - chunk_pos, size,
+ hexa_str((unsigned char *) chunk, chunk_pos, 0));
+ }
+ /* indicate on need to the gras_select function that there is more to read on this socket so that it does not actually select */
+ sock->moredata = (data->in_buf.size > data->in_buf.pos);
+ XBT_DEBUG("There is %smore data", (sock->moredata ? "" : "no "));
+
+ XBT_OUT();
+ return chunk_pos;
+}
+
+/*****************************************/
+/****[ end of BUFFERED DATA EXCHANGE ]****/
+/*****************************************/
+
+/********************************/
+/****[ VECTOR DATA EXCHANGE ]****/
+/********************************/
+#ifdef HAVE_READV
+static void
+gras_trp_iov_send(gras_socket_t sock,
+ const char *chunk, unsigned long int size, int stable)
+{
+ struct iovec elm;
+ gras_trp_bufdata_t *data = (gras_trp_bufdata_t *) sock->bufdata;
+
+
+ XBT_DEBUG("Buffer one chunk to be sent later (%s)",
+ hexa_str((char *) chunk, size, 0));
+
+ elm.iov_len = (size_t) size;
+
+ if (!stable) {
+ /* data storage won't last until flush. Save it in a buffer if we can */
+
+ if (size > data->buffsize - data->out_buf.size) {
+ /* buffer too small:
+ flush the socket, using data in its actual storage */
+ elm.iov_base = (void *) chunk;
+ xbt_dynar_push(data->out_buf_v, &elm);
+
+ gras_trp_bufiov_flush(sock);
+ return;