3 /* file trp (transport) - send/receive a bunch of bytes in SG realm */
5 /* Note that this is only used to debug other parts of GRAS since message */
6 /* exchange in SG realm is implemented directly without mimicking real life */
7 /* This would be terribly inefficient. */
9 /* Copyright (c) 2004 Martin Quinson. All rights reserved. */
11 /* This program is free software; you can redistribute it and/or modify it
12 * under the terms of the license (GNU LGPL) which comes with this package. */
16 #include "gras/Msg/msg_private.h"
17 #include "gras/Transport/transport_private.h"
18 #include "gras/Virtu/virtu_sg.h"
/* Logging subcategory for this file: gras_trp_sg, declared under the parent
 * category gras_trp with the description string given below. */
20 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(gras_trp_sg, gras_trp,
21 "SimGrid pseudo-transport");
/* Forward declarations of the plugin callbacks. gras_trp_sg_setup() stores
 * each of them in the matching function-pointer slot of the plugin. */
/* Socket life cycle: client connect, server listen, close. */
28 void gras_trp_sg_socket_client(gras_trp_plugin_t self,
29 /* OUT */ gras_socket_t sock);
30 void gras_trp_sg_socket_server(gras_trp_plugin_t self,
31 /* OUT */ gras_socket_t sock);
32 void gras_trp_sg_socket_close(gras_socket_t sd);
/* Chunk exchange: raw send, regular send (forwards to the raw one), receive. */
34 void gras_trp_sg_chunk_send_raw(gras_socket_t sd,
35 const char *data, unsigned long int size);
/* NOTE(review): one parameter line of this prototype is not visible in this
 * excerpt; the definition uses a data argument between sd and size. */
36 void gras_trp_sg_chunk_send(gras_socket_t sd,
38 unsigned long int size, int stable_ignored);
40 int gras_trp_sg_chunk_recv(gras_socket_t sd,
41 char *data, unsigned long int size);
44 *** Specific plugin part
/* Private data of the SG transport plugin; the callbacks in this file read it
 * back by casting the data field of the plugin. The dictionary is keyed by a
 * host:port string (built with bprintf in the socket callbacks) and maps to
 * the server socket registered under that name. */
47 xbt_dict_t sockets; /* all known sockets */
48 } s_gras_trp_sg_plug_data_t,*gras_trp_sg_plug_data_t;
/* Plugin exit callback: releases the dictionary of known sockets held in the
 * private data of the plugin.
 * NOTE(review): the end of this function is not visible in this excerpt;
 * confirm that the private data structure itself is released as well. */
55 static void gras_trp_sg_exit(gras_trp_plugin_t plug){
56 gras_trp_sg_plug_data_t mydata = (gras_trp_sg_plug_data_t) plug->data;
57 xbt_dict_free(&(mydata->sockets));
/* Plugin setup: allocates the private data, creates the (empty) dictionary of
 * known sockets and fills in the callback slots of the plugin.
 * NOTE(review): the statement attaching the private data to the plugin is not
 * visible in this excerpt; the other callbacks read it from plug->data. */
60 void gras_trp_sg_setup(gras_trp_plugin_t plug)
62 gras_trp_sg_plug_data_t data = xbt_new(s_gras_trp_sg_plug_data_t, 1);
65 data->sockets = xbt_dict_new();
67 plug->exit = gras_trp_sg_exit;
/* socket life-cycle callbacks */
69 plug->socket_client = gras_trp_sg_socket_client;
70 plug->socket_server = gras_trp_sg_socket_server;
71 plug->socket_close = gras_trp_sg_socket_close;
/* the regular send simply forwards to the raw send (see gras_trp_sg_chunk_send) */
73 plug->raw_send = gras_trp_sg_chunk_send_raw;
74 plug->send = gras_trp_sg_chunk_send;
/* raw and regular receive share one implementation */
75 plug->raw_recv = plug->recv = gras_trp_sg_chunk_recv;
77 plug->flush = NULL; /* nothing cached */
/* Client-side socket creation.
 * Checks that the peer host exists, looks up the server socket registered
 * under peer_name:peer_port, checks that both ends agree on measurement vs
 * regular mode, then builds the socket data: it shares the rendez-vous point
 * of the server socket and an asynchronous receive is posted on it.
 * Throws mismatch_error when the host is unknown, when the lookup fails
 * (presumably; the guarding test is not visible in this excerpt) or when the
 * modes differ. */
80 void gras_trp_sg_socket_client(gras_trp_plugin_t self,
81 /* OUT */ gras_socket_t sock) {
84 gras_trp_sg_sock_data_t *data;
/* per-process transport data; its comms dynar collects the pending receives */
85 gras_trp_procdata_t pd= (gras_trp_procdata_t) gras_libdata_by_id(gras_trp_libdata_id);
/* resolve the peer host by name; peer is declared on a line not visible here */
88 if (!(peer = SIMIX_host_get_by_name(sock->peer_name)))
89 THROW1(mismatch_error, 0,
90 "Can't connect to %s: no such host", sock->peer_name);
92 /* make sure this socket will reach someone */
93 xbt_dict_t all_sockets = ((gras_trp_sg_plug_data_t)self->data)->sockets;
/* NOTE(review): sock_name is built by bprintf (presumably heap-allocated);
 * its release is not visible in this excerpt - confirm it is freed. */
94 char *sock_name=bprintf("%s:%d",sock->peer_name,sock->peer_port);
95 gras_socket_t server = xbt_dict_get_or_null(all_sockets,sock_name);
/* NOTE(review): the condition guarding this throw is not visible; presumably
 * it fires when the lookup above returned NULL. */
99 THROW2(mismatch_error, 0,
100 "can't connect to %s:%d, no process listen on this port",
101 sock->peer_name, sock->peer_port);
/* both ends must use the same mode (measurement or regular) */
103 if (server->meas && !sock->meas) {
104 THROW2(mismatch_error, 0,
105 "can't connect to %s:%d in regular mode, the process listen "
106 "in measurement mode on this port", sock->peer_name,
109 if (!server->meas && sock->meas) {
110 THROW2(mismatch_error, 0,
111 "can't connect to %s:%d in measurement mode, the process listen "
112 "in regular mode on this port", sock->peer_name, sock->peer_port);
114 /* create the socket */
115 data = xbt_new0(gras_trp_sg_sock_data_t, 1);
/* the client reuses the rendez-vous point owned by the server socket */
116 data->rdv = ((gras_trp_sg_sock_data_t *)server->data)->rdv;
117 data->from_process = SIMIX_process_self();
122 /* Create a smx comm object about this socket */
123 data->ongoing_msg_size = sizeof(s_gras_msg_t);
/* post an asynchronous receive into data->ongoing_msg and remember the comm */
124 smx_comm_t comm = SIMIX_network_irecv(data->rdv,&(data->ongoing_msg),&(data->ongoing_msg_size));
125 xbt_dynar_push(pd->comms,&comm);
127 DEBUG5("%s (PID %d) connects in %s mode to %s:%d",
128 SIMIX_process_get_name(SIMIX_process_self()), gras_os_getpid(),
129 sock->meas ? "meas" : "regular", sock->peer_name, sock->peer_port);
/* Server-side socket creation.
 * Builds the name myhost:port, creates a rendez-vous point carrying that name
 * with the socket attached as its data, registers the socket in the
 * simulation-wide dictionary and posts an asynchronous receive on the
 * rendez-vous point.
 * Throws mismatch_error with a port-already-in-use message; the guarding test
 * is not visible in this excerpt (presumably: a socket is already registered
 * under the same name). */
132 void gras_trp_sg_socket_server(gras_trp_plugin_t self, gras_socket_t sock) {
133 gras_trp_procdata_t pd= (gras_trp_procdata_t) gras_libdata_by_id(gras_trp_libdata_id);
134 xbt_dict_t all_sockets = ((gras_trp_sg_plug_data_t)self->data)->sockets;
136 /* Make sure that this socket was not opened so far */
137 char *sock_name=bprintf("%s:%d",gras_os_myname(),sock->port);
138 gras_socket_t old = xbt_dict_get_or_null(all_sockets,sock_name);
140 THROW1(mismatch_error, 0,
141 "can't listen on address %s: port already in use.",
145 /* Create the data associated to the socket */
146 gras_trp_sg_sock_data_t *data = xbt_new0(gras_trp_sg_sock_data_t, 1);
147 data->rdv = SIMIX_rdv_create(sock_name);
148 data->from_process = SIMIX_process_self();
/* attach the socket to the rendez-vous point as its user data */
149 SIMIX_rdv_set_data(data->rdv,sock);
156 /* Register the socket to the set of sockets known simulation-wide */
157 xbt_dict_set(all_sockets,sock_name,sock,NULL); /* FIXME: add a function to raise a warning at simulation end for non-closed sockets */
159 /* Create a smx comm object about this socket */
160 data->ongoing_msg_size = sizeof(s_gras_msg_t);
/* post an asynchronous receive into data->ongoing_msg and remember the comm */
161 smx_comm_t comm = SIMIX_network_irecv(data->rdv,&(data->ongoing_msg),&(data->ongoing_msg_size));
162 INFO2("irecv comm %p onto %p",comm,&(data->ongoing_msg));
163 xbt_dynar_push(pd->comms,&comm);
165 VERB6("'%s' (%d) ears on %s:%d%s (%p)",
166 SIMIX_process_get_name(SIMIX_process_self()), gras_os_getpid(),
167 gras_os_myname(), sock->port, sock->meas ? " (mode meas)" : "", sock);
/* Socket close callback.
 * For a master (server) socket, the entry registered under myhost:port is
 * looked up and removed from the dictionary of known sockets. The
 * rendez-vous point of the socket is destroyed, the socket is queued on the
 * sockets_to_close dynar of the process and the message listener is woken up.
 * NOTE(review): braces and guards are not visible in this excerpt, so which of
 * the later statements sit inside the is_master branch cannot be told from
 * here; the warning presumably fires when the lookup returned NULL. */
171 void gras_trp_sg_socket_close(gras_socket_t sock) {
172 gras_trp_procdata_t pd= (gras_trp_procdata_t) gras_libdata_by_id(gras_trp_libdata_id);
173 if (sock->is_master) {
174 /* server mode socket. Unregister it from 'OS' tables */
175 char *sock_name=bprintf("%s:%d",gras_os_myname(),sock->port);
177 xbt_dict_t sockets = ((gras_trp_sg_plug_data_t)sock->plugin->data)->sockets;
178 gras_socket_t old = xbt_dict_get_or_null(sockets,sock_name);
180 WARN2("socket_close called on the unknown server socket %p (port=%d)",
182 xbt_dict_remove(sockets,sock_name);
188 SIMIX_rdv_destroy(((gras_trp_sg_sock_data_t *) sock->data)->rdv);
/* hand the socket over to the listener, which is woken up right after */
192 xbt_dynar_push(pd->sockets_to_close,&sock);
193 gras_msg_listener_awake();
/* Regular send callback: forwards sock, data and size unchanged to
 * gras_trp_sg_chunk_send_raw(). The stable_ignored argument is not passed on. */
203 void gras_trp_sg_chunk_send(gras_socket_t sock,
205 unsigned long int size, int stable_ignored)
207 gras_trp_sg_chunk_send_raw(sock, data, size);
/* Raw send callback: sends size bytes from data over a measurement socket.
 * Asserts that the socket is in measurement mode, allocates a message object,
 * copies the payload into a freshly allocated buffer and hands the message to
 * SIMIX_network_send() on the rendez-vous point of the socket. */
210 void gras_trp_sg_chunk_send_raw(gras_socket_t sock,
211 const char *data, unsigned long int size)
/* running counter used to number the chunks in the name built below */
214 static unsigned int count = 0;
216 gras_trp_sg_sock_data_t *sock_data;
217 gras_msg_t msg; /* message to send */
219 sock_data = (gras_trp_sg_sock_data_t *) sock->data;
221 xbt_assert0(sock->meas,
222 "SG chunk exchange shouldn't be used on non-measurement sockets");
/* NOTE(review): count is unsigned but formatted with %d; the declaration of
 * name is not visible in this excerpt, so its capacity cannot be checked. */
224 sprintf(name, "Chunk[%d]", count++);
225 /*initialize gras message */
226 msg = xbt_new(s_gras_msg_t, 1);
228 msg->payl_size = size;
/* the payload is duplicated, so the buffer of the caller is not referenced later */
231 msg->payl = (void *) xbt_malloc(size);
232 memcpy(msg->payl, data, size);
/* &msg and sizeof(msg) describe the pointer variable itself, not the
 * structure it points to; gras_trp_sg_chunk_recv() dereferences the value it
 * gets back, so presumably the pointer is what travels - TODO confirm. */
236 SIMIX_network_send(sock_data->rdv,size,-1.,-1.,&msg,sizeof(msg),(smx_comm_t*)&(msg->comm),msg);
/* Receive callback (used for both raw and regular receive): gets one chunk
 * from the rendez-vous point of a measurement socket and copies its payload
 * into data.
 * Asserts that the socket is in measurement mode. Throws mismatch_error when
 * the payload size of the received message differs from size. The payload
 * buffer of the message is released after the copy.
 * NOTE(review): the declarations of msg_got and comm, the return statement and
 * any release of the message object itself are not visible in this excerpt. */
239 int gras_trp_sg_chunk_recv(gras_socket_t sock,
240 char *data, unsigned long int size)
242 gras_trp_sg_sock_data_t *sock_data;
246 xbt_assert0(sock->meas,
247 "SG chunk exchange shouldn't be used on non-measurement sockets");
249 sock_data = (gras_trp_sg_sock_data_t *) sock->data;
/* the -1. argument is presumably a timeout meaning wait forever - TODO confirm */
251 SIMIX_network_recv(sock_data->rdv,-1.,&msg_got,NULL,&comm);
/* NOTE(review): size is an unsigned long but is formatted with %ld; the type
 * of payl_size is not visible here, so the %d specifier should be checked too. */
253 if (msg_got->payl_size != size)
254 THROW5(mismatch_error, 0,
255 "Got %d bytes when %ld where expected (in %s->%s:%d)",
256 msg_got->payl_size, size,
257 SIMIX_host_get_name(sock_data->to_host),
258 SIMIX_host_get_name(SIMIX_host_self()), sock->peer_port);
261 memcpy(data, msg_got->payl, size);
264 xbt_free(msg_got->payl);