-/* Data exchange over raw sockets */
-gras_error_t gras_socket_raw_exchange(gras_socket_t peer,
- int sender,
- unsigned int timeout,
- unsigned long int expSize,
- unsigned long int msgSize) {
- unsigned int bytesTotal;
- static unsigned int count=0;
- m_task_t task=NULL;
- char name[256];
- gras_trp_sg_sock_data_t *sock_data = (gras_trp_sg_sock_data_t *)peer->data;
-
- gras_procdata_t *pd=gras_procdata_get();
- double startTime;
-
- startTime=gras_os_time(); /* used only in sender mode */
-
- for(bytesTotal = 0; bytesTotal < expSize; bytesTotal += msgSize) {
-
- if (sender) {
-
- sprintf(name,"Raw data[%d]",count++);
-
- task=MSG_task_create(name,0,((double)msgSize)/(1024.0*1024.0),NULL);
-
- DEBUG5("%f:%s: gras_socket_raw_send(%f %s -> %s) BEGIN",
- gras_os_time(), MSG_process_get_name(MSG_process_self()),
- ((double)msgSize)/(1024.0*1024.0),
- MSG_host_get_name( MSG_host_self()), peer->peer_name);
-
- if (MSG_task_put(task, sock_data->to_host,sock_data->to_chan) != MSG_OK)
- RAISE0(system_error,"Problem during the MSG_task_put()");
-
- DEBUG5("%f:%s: gras_socket_raw_send(%f %s -> %s) END",
- gras_os_time(), MSG_process_get_name(MSG_process_self()),
- ((double)msgSize)/(1024.0*1024.0),
- MSG_host_get_name( MSG_host_self()), peer->peer_name);
-
- } else { /* we are receiver, simulate a select */
-
- task=NULL;
- DEBUG2("%f:%s: gras_socket_raw_recv() BEGIN\n",
- gras_os_time(), MSG_process_get_name(MSG_process_self()));
- do {
- if (MSG_task_Iprobe((m_channel_t) pd->rawChan)) {
- if (MSG_task_get(&task, (m_channel_t) pd->rawChan) != MSG_OK) {
- fprintf(stderr,"GRAS: Error in MSG_task_get()\n");
- return unknown_error;
- }
-
- if (MSG_task_destroy(task) != MSG_OK) {
- fprintf(stderr,"GRAS: Error in MSG_task_destroy()\n");
- return unknown_error;
- }
-
- DEBUG2("%f:%s: gras_socket_raw_recv() END\n",
- gras_os_time(), MSG_process_get_name(MSG_process_self()));
- break;
- } else {
- MSG_process_sleep(0.0001);
- }
-
- } while (gras_os_time() - startTime < timeout);
-
- if (gras_os_time() - startTime > timeout)
- return timeout_error;
- } /* receiver part */
- } /* foreach msg */
-
- return no_error;
+ remote_sock_data = (gras_trp_sg_sock_data_t *) remote_socket->data;
+ msg_got = xbt_fifo_shift(msg_procdata->msg_to_receive_queue_meas);
+
+ sock_data = (gras_trp_sg_sock_data_t *) sock->data;
+
+ /* ok, I'm here, you can continue the communication */
+ SIMIX_cond_signal(remote_sock_data->cond);
+
+ SIMIX_mutex_lock(remote_sock_data->mutex);
+ /* wait for the communication to end */
+ SIMIX_cond_wait(remote_sock_data->cond, remote_sock_data->mutex);
+
+ if (msg_got->payl_size != size)
+ THROW5(mismatch_error, 0,
+ "Got %d bytes when %ld where expected (in %s->%s:%d)",
+ msg_got->payl_size, size,
+ SIMIX_host_get_name(sock_data->to_host),
+ SIMIX_host_get_name(SIMIX_host_self()), sock->peer_port);
+
+ if (data)
+ memcpy(data, msg_got->payl, size);
+
+ if (msg_got->payl)
+ xbt_free(msg_got->payl);
+
+ xbt_free(msg_got);
+ SIMIX_mutex_unlock(remote_sock_data->mutex);
+#endif
+ return 0;