Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
739a2a3c080850c3f8f9ffd6bd1faa8b648a8dfd
[simgrid.git] / src / gras / Transport / transport_plugin_sg.c
1 /* file trp (transport) - send/receive a bunch of bytes in SG realm         */
2
3 /* Note that this is only used to debug other parts of GRAS since message   */
4 /*  exchange in SG realm is implemented directly without mimicing real life */
5 /*  This would be terribly unefficient.                                     */
6
7 /* Copyright (c) 2004, 2005, 2006, 2007, 2009, 2010. The SimGrid Team.
8  * All rights reserved.                                                     */
9
10 /* This program is free software; you can redistribute it and/or modify it
11  * under the terms of the license (GNU LGPL) which comes with this package. */
12
13 #include "xbt/ex.h"
14
15 #include "gras/Msg/msg_private.h"
16 #include "gras/Transport/transport_private.h"
17 #include "gras/Virtu/virtu_sg.h"
18
19 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(gras_trp_sg, gras_trp,
20                                 "SimGrid pseudo-transport");
21
22 /***
23  *** Prototypes 
24  ***/
25
26 /* retrieve the port record associated to a numerical port on an host */
27 static gras_sg_portrec_t find_port(gras_hostdata_t * hd, int port);
28
29 void gras_trp_sg_socket_client(gras_trp_plugin_t self,
30                                /* OUT */ gras_socket_t sock);
31 void gras_trp_sg_socket_server(gras_trp_plugin_t self,
32                                /* OUT */ gras_socket_t sock);
33 void gras_trp_sg_socket_close(gras_socket_t sd);
34
35 void gras_trp_sg_chunk_send_raw(gras_socket_t sd,
36                                 const char *data, unsigned long int size);
37 void gras_trp_sg_chunk_send(gras_socket_t sd,
38                             const char *data,
39                             unsigned long int size, int stable_ignored);
40
41 int gras_trp_sg_chunk_recv(gras_socket_t sd,
42                            char *data, unsigned long int size);
43
44 /***
45  *** Specific plugin part
46  ***/
47 typedef struct {
48   int placeholder;              /* nothing plugin specific so far */
49 } gras_trp_sg_plug_data_t;
50
51
52 /***
53  *** Code
54  ***/
55 static gras_sg_portrec_t find_port(gras_hostdata_t * hd, int port)
56 {
57   unsigned int cpt;
58   gras_sg_portrec_t pr;
59
60   xbt_assert0(hd, "Please run gras_process_init on each process");
61
62   xbt_dynar_foreach(hd->ports, cpt, pr) {
63     if (pr->port == port)
64       return pr;
65   }
66   return NULL;
67 }
68
69
70 void gras_trp_sg_setup(gras_trp_plugin_t plug)
71 {
72
73   gras_trp_sg_plug_data_t *data = xbt_new(gras_trp_sg_plug_data_t, 1);
74
75   plug->data = data;
76
77   plug->socket_client = gras_trp_sg_socket_client;
78   plug->socket_server = gras_trp_sg_socket_server;
79   plug->socket_close = gras_trp_sg_socket_close;
80
81   plug->raw_send = gras_trp_sg_chunk_send_raw;
82   plug->send = gras_trp_sg_chunk_send;
83   plug->raw_recv = plug->recv = gras_trp_sg_chunk_recv;
84
85   plug->flush = NULL;           /* nothing cached */
86 }
87
88 void gras_trp_sg_socket_client(gras_trp_plugin_t self,
89                                /* OUT */ gras_socket_t sock)
90 {
91
92   smx_host_t peer;
93   gras_hostdata_t *hd;
94   gras_trp_sg_sock_data_t data;
95   gras_sg_portrec_t pr;
96
97   /* make sure this socket will reach someone */
98   if (!(peer = SIMIX_host_get_by_name(sock->peer_name)))
99     THROW1(mismatch_error, 0,
100            "Can't connect to %s: no such host.\n", sock->peer_name);
101
102   if (!(hd = (gras_hostdata_t *) SIMIX_host_get_data(peer)))
103     THROW1(mismatch_error, 0,
104            "can't connect to %s: no process on this host",
105            sock->peer_name);
106
107   pr = find_port(hd, sock->peer_port);
108
109   if (pr == NULL) {
110     THROW2(mismatch_error, 0,
111            "can't connect to %s:%d, no process listen on this port",
112            sock->peer_name, sock->peer_port);
113   }
114
115   /* Ensure that the listener is expecting the kind of stuff we want to send */
116   if (pr->meas && !sock->meas) {
117     THROW2(mismatch_error, 0,
118            "can't connect to %s:%d in regular mode, the process listen "
119            "in measurement mode on this port", sock->peer_name,
120            sock->peer_port);
121   }
122   if (!pr->meas && sock->meas) {
123     THROW2(mismatch_error, 0,
124            "can't connect to %s:%d in measurement mode, the process listen "
125            "in regular mode on this port", sock->peer_name,
126            sock->peer_port);
127   }
128
129   /* create simulation data of the socket */
130   data = xbt_new(s_gras_trp_sg_sock_data_t, 1);
131   data->client = SIMIX_process_self();
132   data->server = pr->server;
133
134   /* initialize synchronization stuff on the socket */
135   data->rdv_server = pr->rdv;
136   data->rdv_client = SIMIX_rdv_create(NULL);
137   data->comm_recv = SIMIX_network_irecv(data->rdv_client, NULL, 0);
138
139   /* connect that simulation data to the socket */
140   sock->data = data;
141   sock->incoming = 1;
142
143   DEBUG5("%s (PID %d) connects in %s mode to %s:%d",
144          SIMIX_process_get_name(SIMIX_process_self()), gras_os_getpid(),
145          sock->meas ? "meas" : "regular", sock->peer_name,
146          sock->peer_port);
147 }
148
149 void gras_trp_sg_socket_server(gras_trp_plugin_t self, gras_socket_t sock)
150 {
151
152   gras_hostdata_t *hd =
153       (gras_hostdata_t *) SIMIX_host_get_data(SIMIX_host_self());
154   gras_sg_portrec_t pr;
155   gras_trp_sg_sock_data_t data;
156
157   xbt_assert0(hd, "Please run gras_process_init on each process");
158
159   sock->accepting = 1;
160
161   /* Check whether a server is already listening on that port or not */
162   pr = find_port(hd, sock->port);
163
164   if (pr)
165     THROW2(mismatch_error, 0,
166            "can't listen on address %s:%d: port already in use.",
167            SIMIX_host_get_name(SIMIX_host_self()), sock->port);
168
169   /* This port is free, let's take it */
170   pr = xbt_new(s_gras_sg_portrec_t, 1);
171   pr->port = sock->port;
172   pr->meas = sock->meas;
173   pr->server = SIMIX_process_self();
174   pr->rdv = SIMIX_rdv_create(NULL);
175   xbt_dynar_push(hd->ports, &pr);
176
177   /* Create the socket */
178   data = xbt_new(s_gras_trp_sg_sock_data_t, 1);
179   data->server = SIMIX_process_self();
180   data->client = NULL;
181   data->rdv_server = pr->rdv;
182   data->rdv_client = NULL;
183   data->comm_recv = SIMIX_network_irecv(pr->rdv, NULL, 0);
184
185   sock->data = data;
186
187   VERB10
188       ("'%s' (%d) ears on %s:%d%s (%p; data:%p); Here rdv: %p; Remote rdv: %p; Comm %p",
189        SIMIX_process_get_name(SIMIX_process_self()), gras_os_getpid(),
190        SIMIX_host_get_name(SIMIX_host_self()), sock->port,
191        sock->meas ? " (mode meas)" : "", sock, data,
192        (data->server ==
193         SIMIX_process_self())? data->rdv_server : data->rdv_client,
194        (data->server ==
195         SIMIX_process_self())? data->rdv_client : data->rdv_server,
196        data->comm_recv);
197
198 }
199
200 void gras_trp_sg_socket_close(gras_socket_t sock)
201 {
202   gras_hostdata_t *hd =
203       (gras_hostdata_t *) SIMIX_host_get_data(SIMIX_host_self());
204   unsigned int cpt;
205   gras_sg_portrec_t pr;
206
207   XBT_IN1(" (sock=%p)", sock);
208
209   if (!sock)
210     return;
211
212   xbt_assert0(hd, "Please run gras_process_init on each process");
213
214   if (sock->data) {
215     /* FIXME: kill the rdv point if receiver side */
216     free(sock->data);
217   }
218
219   if (sock->incoming && !sock->outgoing && sock->port >= 0) {
220     /* server mode socket. Unregister it from 'OS' tables */
221     xbt_dynar_foreach(hd->ports, cpt, pr) {
222       DEBUG2("Check pr %d of %lu", cpt, xbt_dynar_length(hd->ports));
223       if (pr->port == sock->port) {
224         xbt_dynar_cursor_rm(hd->ports, &cpt);
225         XBT_OUT;
226         return;
227       }
228     }
229     WARN2
230         ("socket_close called on the unknown incoming socket %p (port=%d)",
231          sock, sock->port);
232   }
233   XBT_OUT;
234 }
235
236 typedef struct {
237   int size;
238   void *data;
239 } sg_task_data_t;
240
241 void gras_trp_sg_chunk_send(gras_socket_t sock,
242                             const char *data,
243                             unsigned long int size, int stable_ignored)
244 {
245   gras_trp_sg_chunk_send_raw(sock, data, size);
246 }
247
248 void gras_trp_sg_chunk_send_raw(gras_socket_t sock,
249                                 const char *data, unsigned long int size)
250 {
251 #ifdef KILLME
252   char name[256];
253   static unsigned int count = 0;
254
255   smx_action_t act;             /* simix action */
256   gras_trp_procdata_t trp_remote_proc;
257   gras_msg_procdata_t msg_remote_proc;
258   gras_msg_t msg;               /* message to send */
259 #endif
260
261   gras_trp_sg_sock_data_t sock_data = (gras_trp_sg_sock_data_t) sock->data;
262   xbt_assert0(sock->meas,
263               "SG chunk exchange shouldn't be used on non-measurement sockets");
264
265
266   /* creates simix action and waits its ends, waits in the sender host
267      condition */
268   if (XBT_LOG_ISENABLED(gras_trp_sg, xbt_log_priority_debug)) {
269     smx_process_t remote_dude =
270         (sock_data->server ==
271          SIMIX_process_self())? (sock_data->client) : (sock_data->server);
272     smx_host_t remote_host = SIMIX_process_get_host(remote_dude);
273     DEBUG4("send chunk from %s to  %s:%d (size=%ld)",
274            SIMIX_host_get_name(SIMIX_host_self()),
275            SIMIX_host_get_name(remote_host), sock->peer_port, size);
276   }
277   //SIMIX_network_send(sock_data->rdv,size,1,-1,NULL,0,NULL,NULL);
278   THROW_UNIMPLEMENTED;
279 }
280
281 int gras_trp_sg_chunk_recv(gras_socket_t sock,
282                            char *data, unsigned long int size)
283 {
284   //gras_trp_sg_sock_data_t *sock_data =
285   //    (gras_trp_sg_sock_data_t *) sock->data;
286
287   //SIMIX_network_recv(sock_data->rdv,-1,NULL,0,NULL);
288   THROW_UNIMPLEMENTED;
289 #ifdef KILLME
290   gras_trp_sg_sock_data_t *remote_sock_data;
291   gras_socket_t remote_socket = NULL;
292   gras_msg_t msg_got;
293   gras_msg_procdata_t msg_procdata =
294       (gras_msg_procdata_t) gras_libdata_by_name("gras_msg");
295   gras_trp_procdata_t trp_proc =
296       (gras_trp_procdata_t) gras_libdata_by_id(gras_trp_libdata_id);
297
298   xbt_assert0(sock->meas,
299               "SG chunk exchange shouldn't be used on non-measurement sockets");
300   xbt_queue_shift_timed(trp_proc->meas_selectable_sockets,
301                         &remote_socket, 60);
302
303   if (remote_socket == NULL) {
304     THROW0(timeout_error, 0, "Timeout");
305   }
306
307   remote_sock_data = (gras_trp_sg_sock_data_t *) remote_socket->data;
308   msg_got = xbt_fifo_shift(msg_procdata->msg_to_receive_queue_meas);
309
310   sock_data = (gras_trp_sg_sock_data_t *) sock->data;
311
312   /* ok, I'm here, you can continue the communication */
313   SIMIX_cond_signal(remote_sock_data->cond);
314
315   SIMIX_mutex_lock(remote_sock_data->mutex);
316   /* wait for communication end */
317   SIMIX_cond_wait(remote_sock_data->cond, remote_sock_data->mutex);
318
319   if (msg_got->payl_size != size)
320     THROW5(mismatch_error, 0,
321            "Got %d bytes when %ld where expected (in %s->%s:%d)",
322            msg_got->payl_size, size,
323            SIMIX_host_get_name(sock_data->to_host),
324            SIMIX_host_get_name(SIMIX_host_self()), sock->peer_port);
325
326   if (data)
327     memcpy(data, msg_got->payl, size);
328
329   if (msg_got->payl)
330     xbt_free(msg_got->payl);
331
332   xbt_free(msg_got);
333   SIMIX_mutex_unlock(remote_sock_data->mutex);
334 #endif
335   return 0;
336 }