Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
a80c33118bdafca1ef27ddc42206a3870541fd11
[simgrid.git] / src / gras / Transport / transport_plugin_sg.c
1 /* $Id$ */
2
3 /* file trp (transport) - send/receive a bunch of bytes in SG realm         */
4
5 /* Note that this is only used to debug other parts of GRAS since message   */
6 /*  exchange in SG realm is implemented directly without mimicing real life */
7 /*  This would be terribly unefficient.                                     */
8
9 /* Copyright (c) 2004 Martin Quinson. All rights reserved.                  */
10
11 /* This program is free software; you can redistribute it and/or modify it
12  * under the terms of the license (GNU LGPL) which comes with this package. */
13
14 #include "xbt/ex.h"
15
16 #include "gras/Msg/msg_private.h"
17 #include "gras/Transport/transport_private.h"
18 #include "gras/Virtu/virtu_sg.h"
19
20 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(gras_trp_sg, gras_trp,
21                                 "SimGrid pseudo-transport");
22
23 /***
24  *** Prototypes 
25  ***/
26
27
28 void gras_trp_sg_socket_client(gras_trp_plugin_t self,
29                                /* OUT */ gras_socket_t sock);
30 void gras_trp_sg_socket_server(gras_trp_plugin_t self,
31                                /* OUT */ gras_socket_t sock);
32 void gras_trp_sg_socket_close(gras_socket_t sd);
33
34 void gras_trp_sg_chunk_send_raw(gras_socket_t sd,
35                                 const char *data, unsigned long int size);
36 void gras_trp_sg_chunk_send(gras_socket_t sd,
37                             const char *data,
38                             unsigned long int size, int stable_ignored);
39
40 int gras_trp_sg_chunk_recv(gras_socket_t sd,
41                            char *data, unsigned long int size);
42
43 /***
44  *** Specific plugin part
45  ***/
46 typedef struct {
47   xbt_dict_t sockets; /* all known sockets */
48 } s_gras_trp_sg_plug_data_t,*gras_trp_sg_plug_data_t;
49
50
51 /***
52  *** Code
53  ***/
54
55 static void gras_trp_sg_exit(gras_trp_plugin_t plug){
56   gras_trp_sg_plug_data_t mydata = (gras_trp_sg_plug_data_t) plug->data;
57   xbt_dict_free(&(mydata->sockets));
58   xbt_free(plug->data);
59 }
60 void gras_trp_sg_setup(gras_trp_plugin_t plug)
61 {
62   gras_trp_sg_plug_data_t data = xbt_new(s_gras_trp_sg_plug_data_t, 1);
63
64   plug->data = data;
65   data->sockets = xbt_dict_new();
66
67   plug->exit = gras_trp_sg_exit;
68
69   plug->socket_client = gras_trp_sg_socket_client;
70   plug->socket_server = gras_trp_sg_socket_server;
71   plug->socket_close = gras_trp_sg_socket_close;
72
73   plug->raw_send = gras_trp_sg_chunk_send_raw;
74   plug->send = gras_trp_sg_chunk_send;
75   plug->raw_recv = plug->recv = gras_trp_sg_chunk_recv;
76
77   plug->flush = NULL;           /* nothing cached */
78 }
79
80 void gras_trp_sg_socket_client(gras_trp_plugin_t self,
81                                /* OUT */ gras_socket_t sock) {
82
83   smx_host_t peer;
84   gras_trp_sg_sock_data_t *data;
85   gras_trp_procdata_t pd= (gras_trp_procdata_t) gras_libdata_by_id(gras_trp_libdata_id);
86
87
88   if (!(peer = SIMIX_host_get_by_name(sock->peer_name)))
89     THROW1(mismatch_error, 0,
90            "Can't connect to %s: no such host", sock->peer_name);
91
92   /* make sure this socket will reach someone */
93   xbt_dict_t all_sockets = ((gras_trp_sg_plug_data_t)self->data)->sockets;
94   char *sock_name=bprintf("%s:%d",sock->peer_name,sock->peer_port);
95   gras_socket_t server = xbt_dict_get_or_null(all_sockets,sock_name);
96   free(sock_name);
97
98   if (!server)
99     THROW2(mismatch_error, 0,
100            "can't connect to %s:%d, no process listen on this port",
101            sock->peer_name, sock->peer_port);
102
103   if (server->meas && !sock->meas) {
104     THROW2(mismatch_error, 0,
105            "can't connect to %s:%d in regular mode, the process listen "
106            "in measurement mode on this port", sock->peer_name,
107            sock->peer_port);
108   }
109   if (!server->meas && sock->meas) {
110     THROW2(mismatch_error, 0,
111            "can't connect to %s:%d in measurement mode, the process listen "
112            "in regular mode on this port", sock->peer_name, sock->peer_port);
113   }
114   /* create the socket */
115   data = xbt_new0(gras_trp_sg_sock_data_t, 1);
116   data->rdv = ((gras_trp_sg_sock_data_t *)server->data)->rdv;
117   data->from_process = SIMIX_process_self();
118
119   sock->data = data;
120   sock->incoming = 1;
121
122   /* Create a smx comm object about this socket */
123   data->ongoing_msg_size = sizeof(s_gras_msg_t);
124   smx_comm_t comm = SIMIX_network_irecv(data->rdv,&(data->ongoing_msg),&(data->ongoing_msg_size));
125   xbt_dynar_push(pd->comms,&comm);
126
127   DEBUG5("%s (PID %d) connects in %s mode to %s:%d",
128          SIMIX_process_get_name(SIMIX_process_self()), gras_os_getpid(),
129          sock->meas ? "meas" : "regular", sock->peer_name, sock->peer_port);
130 }
131
132 void gras_trp_sg_socket_server(gras_trp_plugin_t self, gras_socket_t sock) {
133   gras_trp_procdata_t pd= (gras_trp_procdata_t) gras_libdata_by_id(gras_trp_libdata_id);
134   xbt_dict_t all_sockets = ((gras_trp_sg_plug_data_t)self->data)->sockets;
135
136   /* Make sure that this socket was not opened so far */
137   char *sock_name=bprintf("%s:%d",gras_os_myname(),sock->port);
138   gras_socket_t old = xbt_dict_get_or_null(all_sockets,sock_name);
139   if (old)
140     THROW1(mismatch_error, 0,
141            "can't listen on address %s: port already in use.",
142            sock_name);
143
144
145   /* Create the data associated to the socket */
146   gras_trp_sg_sock_data_t *data = xbt_new0(gras_trp_sg_sock_data_t, 1);
147   data->rdv = SIMIX_rdv_create(sock_name);
148   data->from_process = SIMIX_process_self();
149   SIMIX_rdv_set_data(data->rdv,sock);
150
151   sock->is_master = 1;
152   sock->incoming = 1;
153
154   sock->data = data;
155
156   /* Register the socket to the set of sockets known simulation-wide */
157   xbt_dict_set(all_sockets,sock_name,sock,NULL); /* FIXME: add a function to raise a warning at simulation end for non-closed sockets */
158
159   /* Create a smx comm object about this socket */
160   data->ongoing_msg_size = sizeof(s_gras_msg_t);
161   smx_comm_t comm = SIMIX_network_irecv(data->rdv,&(data->ongoing_msg),&(data->ongoing_msg_size));
162   INFO2("irecv comm %p onto %p",comm,&(data->ongoing_msg));
163   xbt_dynar_push(pd->comms,&comm);
164
165   VERB6("'%s' (%d) ears on %s:%d%s (%p)",
166         SIMIX_process_get_name(SIMIX_process_self()), gras_os_getpid(),
167         gras_os_myname(), sock->port, sock->meas ? " (mode meas)" : "", sock);
168   free(sock_name);
169 }
170
171 void gras_trp_sg_socket_close(gras_socket_t sock) {
172   gras_trp_procdata_t pd= (gras_trp_procdata_t) gras_libdata_by_id(gras_trp_libdata_id);
173   if (sock->is_master) {
174     /* server mode socket. Unregister it from 'OS' tables */
175     char *sock_name=bprintf("%s:%d",gras_os_myname(),sock->port);
176
177     xbt_dict_t sockets = ((gras_trp_sg_plug_data_t)sock->plugin->data)->sockets;
178     gras_socket_t old = xbt_dict_get_or_null(sockets,sock_name);
179     if (!old)
180       WARN2("socket_close called on the unknown server socket %p (port=%d)",
181             sock, sock->port);
182     xbt_dict_remove(sockets,sock_name);
183     free(sock_name);
184
185   }
186
187   if (sock->data) {
188     SIMIX_rdv_destroy(((gras_trp_sg_sock_data_t *) sock->data)->rdv);
189     free(sock->data);
190   }
191
192   xbt_dynar_push(pd->sockets_to_close,&sock);
193   gras_msg_listener_awake();
194
195   XBT_OUT;
196 }
197
198 typedef struct {
199   int size;
200   void *data;
201 } sg_task_data_t;
202
203 void gras_trp_sg_chunk_send(gras_socket_t sock,
204                             const char *data,
205                             unsigned long int size, int stable_ignored)
206 {
207   gras_trp_sg_chunk_send_raw(sock, data, size);
208 }
209
210 void gras_trp_sg_chunk_send_raw(gras_socket_t sock,
211                                 const char *data, unsigned long int size)
212 {
213   char name[256];
214   static unsigned int count = 0;
215
216   gras_trp_sg_sock_data_t *sock_data;
217   gras_msg_t msg;               /* message to send */
218
219   sock_data = (gras_trp_sg_sock_data_t *) sock->data;
220
221   xbt_assert0(sock->meas,
222               "SG chunk exchange shouldn't be used on non-measurement sockets");
223
224   sprintf(name, "Chunk[%d]", count++);
225   /*initialize gras message */
226   msg = xbt_new(s_gras_msg_t, 1);
227   msg->expe = sock;
228   msg->payl_size = size;
229
230   if (data) {
231     msg->payl = (void *) xbt_malloc(size);
232     memcpy(msg->payl, data, size);
233   } else {
234     msg->payl = NULL;
235   }
236   SIMIX_network_send(sock_data->rdv,size,-1.,-1.,&msg,sizeof(msg),(smx_comm_t*)&(msg->comm),msg);
237 }
238
239 int gras_trp_sg_chunk_recv(gras_socket_t sock,
240                            char *data, unsigned long int size)
241 {
242   gras_trp_sg_sock_data_t *sock_data;
243   gras_msg_t msg_got;
244   smx_comm_t comm;
245
246   xbt_assert0(sock->meas,
247               "SG chunk exchange shouldn't be used on non-measurement sockets");
248
249   sock_data = (gras_trp_sg_sock_data_t *) sock->data;
250
251   SIMIX_network_recv(sock_data->rdv,-1.,&msg_got,NULL,&comm);
252
253   if (msg_got->payl_size != size)
254     THROW5(mismatch_error, 0,
255            "Got %d bytes when %ld where expected (in %s->%s:%d)",
256            msg_got->payl_size, size,
257            SIMIX_host_get_name(sock_data->to_host),
258            SIMIX_host_get_name(SIMIX_host_self()), sock->peer_port);
259
260   if (data)
261     memcpy(data, msg_got->payl, size);
262
263   if (msg_got->payl)
264     xbt_free(msg_got->payl);
265
266   xbt_free(msg_got);
267   return 0;
268 }