Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Fix an external ref
[simgrid.git] / src / gras / Transport / transport_plugin_sg.c
1 /* file trp (transport) - send/receive a bunch of bytes in SG realm         */
2
3 /* Note that this is only used to debug other parts of GRAS since message   */
4 /*  exchange in SG realm is implemented directly without mimicing real life */
5 /*  This would be terribly unefficient.                                     */
6
7 /* Copyright (c) 2004, 2005, 2006, 2007, 2009, 2010. The SimGrid Team.
8  * All rights reserved.                                                     */
9
10 /* This program is free software; you can redistribute it and/or modify it
11  * under the terms of the license (GNU LGPL) which comes with this package. */
12
13 #include "xbt/ex.h"
14
15 #include "gras/Msg/msg_private.h"
16 #include "gras/Transport/transport_private.h"
17 #include "gras/Virtu/virtu_sg.h"
18
19 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(gras_trp_sg, gras_trp,
20                                 "SimGrid pseudo-transport");
21
22 /***
23  *** Prototypes 
24  ***/
25
26 /* retrieve the port record associated to a numerical port on an host */
27 static gras_sg_portrec_t find_port(gras_hostdata_t * hd, int port);
28
29 void gras_trp_sg_socket_client(gras_trp_plugin_t self,
30                                /* OUT */ gras_socket_t sock);
31 void gras_trp_sg_socket_server(gras_trp_plugin_t self,
32                                /* OUT */ gras_socket_t sock);
33 void gras_trp_sg_socket_close(gras_socket_t sd);
34
35 void gras_trp_sg_chunk_send_raw(gras_socket_t sd,
36                                 const char *data, unsigned long int size);
37 void gras_trp_sg_chunk_send(gras_socket_t sd,
38                             const char *data,
39                             unsigned long int size, int stable_ignored);
40
41 int gras_trp_sg_chunk_recv(gras_socket_t sd,
42                            char *data, unsigned long int size);
43
44 /***
45  *** Specific plugin part
46  ***/
47 typedef struct {
48   int placeholder;              /* nothing plugin specific so far */
49 } gras_trp_sg_plug_data_t;
50
51
52 /***
53  *** Code
54  ***/
55 static gras_sg_portrec_t find_port(gras_hostdata_t * hd, int port)
56 {
57   unsigned int cpt;
58   gras_sg_portrec_t pr;
59
60   xbt_assert0(hd, "Please run gras_process_init on each process");
61
62   xbt_dynar_foreach(hd->ports, cpt, pr) {
63     if (pr->port == port)
64       return pr;
65   }
66   return NULL;
67 }
68
69
70 void gras_trp_sg_setup(gras_trp_plugin_t plug)
71 {
72
73   gras_trp_sg_plug_data_t *data = xbt_new(gras_trp_sg_plug_data_t, 1);
74
75   plug->data = data;
76
77   plug->socket_client = gras_trp_sg_socket_client;
78   plug->socket_server = gras_trp_sg_socket_server;
79   plug->socket_close = gras_trp_sg_socket_close;
80
81   plug->raw_send = gras_trp_sg_chunk_send_raw;
82   plug->send = gras_trp_sg_chunk_send;
83   plug->raw_recv = plug->recv = gras_trp_sg_chunk_recv;
84
85   plug->flush = NULL;           /* nothing cached */
86 }
87
88 void gras_trp_sg_socket_client(gras_trp_plugin_t self,
89                                /* OUT */ gras_socket_t sock) {
90
91   smx_host_t peer;
92   gras_hostdata_t *hd;
93   gras_trp_sg_sock_data_t data;
94   gras_sg_portrec_t pr;
95
96   /* make sure this socket will reach someone */
97   if (!(peer = SIMIX_host_get_by_name(sock->peer_name)))
98     THROW1(mismatch_error, 0,
99            "Can't connect to %s: no such host.\n", sock->peer_name);
100
101   if (!(hd = (gras_hostdata_t *) SIMIX_host_get_data(peer)))
102     THROW1(mismatch_error, 0,
103            "can't connect to %s: no process on this host", sock->peer_name);
104
105   pr = find_port(hd, sock->peer_port);
106
107   if (pr == NULL) {
108     THROW2(mismatch_error, 0,
109         "can't connect to %s:%d, no process listen on this port",
110         sock->peer_name, sock->peer_port);
111   }
112
113   /* Ensure that the listener is expecting the kind of stuff we want to send */
114   if (pr->meas && !sock->meas) {
115     THROW2(mismatch_error, 0,
116            "can't connect to %s:%d in regular mode, the process listen "
117            "in measurement mode on this port", sock->peer_name,
118            sock->peer_port);
119   }
120   if (!pr->meas && sock->meas) {
121     THROW2(mismatch_error, 0,
122            "can't connect to %s:%d in measurement mode, the process listen "
123            "in regular mode on this port", sock->peer_name, sock->peer_port);
124   }
125
126   /* create simulation data of the socket */
127   data = xbt_new(s_gras_trp_sg_sock_data_t, 1);
128   data->client = SIMIX_process_self();
129   data->server = pr->server;
130
131   /* initialize synchronization stuff on the socket */
132   data->rdv_server = pr->rdv;
133   data->rdv_client = SIMIX_rdv_create(NULL);
134   data->comm_recv = SIMIX_network_irecv(data->rdv_client,NULL,0);
135
136   /* connect that simulation data to the socket */
137   sock->data = data;
138   sock->incoming = 1;
139
140   DEBUG5("%s (PID %d) connects in %s mode to %s:%d",
141          SIMIX_process_get_name(SIMIX_process_self()), gras_os_getpid(),
142          sock->meas ? "meas" : "regular", sock->peer_name, sock->peer_port);
143 }
144
145 void gras_trp_sg_socket_server(gras_trp_plugin_t self, gras_socket_t sock)
146 {
147
148   gras_hostdata_t *hd =
149     (gras_hostdata_t *) SIMIX_host_get_data(SIMIX_host_self());
150   gras_sg_portrec_t pr;
151   gras_trp_sg_sock_data_t data;
152
153   xbt_assert0(hd, "Please run gras_process_init on each process");
154
155   sock->accepting = 1;
156
157   /* Check whether a server is already listening on that port or not */
158   pr = find_port(hd, sock->port);
159
160   if (pr)
161     THROW2(mismatch_error, 0,
162            "can't listen on address %s:%d: port already in use.",
163            SIMIX_host_get_name(SIMIX_host_self()), sock->port);
164
165   /* This port is free, let's take it */
166   pr = xbt_new(s_gras_sg_portrec_t,1);
167   pr->port = sock->port;
168   pr->meas = sock->meas;
169   pr->server = SIMIX_process_self();
170   pr->rdv = SIMIX_rdv_create(NULL);
171   xbt_dynar_push(hd->ports, &pr);
172
173   /* Create the socket */
174   data = xbt_new(s_gras_trp_sg_sock_data_t, 1);
175   data->server = SIMIX_process_self();
176   data->client = NULL;
177   data->rdv_server = pr->rdv;
178   data->rdv_client = NULL;
179   data->comm_recv = SIMIX_network_irecv(pr->rdv,NULL,0);
180
181   sock->data = data;
182
183   VERB10("'%s' (%d) ears on %s:%d%s (%p; data:%p); Here rdv: %p; Remote rdv: %p; Comm %p",
184         SIMIX_process_get_name(SIMIX_process_self()), gras_os_getpid(),
185         SIMIX_host_get_name(SIMIX_host_self()), sock->port,
186         sock->meas ? " (mode meas)" : "", sock,data,
187         (data->server==SIMIX_process_self())?data->rdv_server:data->rdv_client,
188         (data->server==SIMIX_process_self())?data->rdv_client:data->rdv_server,
189         data->comm_recv);
190
191 }
192
193 void gras_trp_sg_socket_close(gras_socket_t sock)
194 {
195   gras_hostdata_t *hd =
196     (gras_hostdata_t *) SIMIX_host_get_data(SIMIX_host_self());
197   unsigned int cpt;
198   gras_sg_portrec_t pr;
199
200   XBT_IN1(" (sock=%p)", sock);
201
202   if (!sock)
203     return;
204
205   xbt_assert0(hd, "Please run gras_process_init on each process");
206
207   if (sock->data) {
208     /* FIXME: kill the rdv point if receiver side */
209     free(sock->data);
210   }
211
212   if (sock->incoming && !sock->outgoing && sock->port >= 0) {
213     /* server mode socket. Unregister it from 'OS' tables */
214     xbt_dynar_foreach(hd->ports, cpt, pr) {
215       DEBUG2("Check pr %d of %lu", cpt, xbt_dynar_length(hd->ports));
216       if (pr->port == sock->port) {
217         xbt_dynar_cursor_rm(hd->ports, &cpt);
218         XBT_OUT;
219         return;
220       }
221     }
222     WARN2("socket_close called on the unknown incoming socket %p (port=%d)",
223           sock, sock->port);
224   }
225   XBT_OUT;
226 }
227
228 typedef struct {
229   int size;
230   void *data;
231 } sg_task_data_t;
232
233 void gras_trp_sg_chunk_send(gras_socket_t sock,
234                             const char *data,
235                             unsigned long int size, int stable_ignored)
236 {
237   gras_trp_sg_chunk_send_raw(sock, data, size);
238 }
239
240 void gras_trp_sg_chunk_send_raw(gras_socket_t sock,
241                                 const char *data, unsigned long int size)
242 {
243 #ifdef KILLME
244   char name[256];
245   static unsigned int count = 0;
246
247   smx_action_t act;             /* simix action */
248   gras_trp_procdata_t trp_remote_proc;
249   gras_msg_procdata_t msg_remote_proc;
250   gras_msg_t msg;               /* message to send */
251 #endif
252
253   gras_trp_sg_sock_data_t sock_data = (gras_trp_sg_sock_data_t) sock->data;
254   xbt_assert0(sock->meas,
255               "SG chunk exchange shouldn't be used on non-measurement sockets");
256
257
258   /* creates simix action and waits its ends, waits in the sender host
259      condition */
260   if (XBT_LOG_ISENABLED(gras_trp_sg,xbt_log_priority_debug)) {
261     smx_process_t remote_dude = (sock_data->server==SIMIX_process_self())?(sock_data->client):(sock_data->server);
262     smx_host_t remote_host = SIMIX_process_get_host(remote_dude);
263     DEBUG4("send chunk from %s to  %s:%d (size=%ld)",
264         SIMIX_host_get_name(SIMIX_host_self()),
265         SIMIX_host_get_name(remote_host),
266         sock->peer_port, size);
267   }
268   //SIMIX_network_send(sock_data->rdv,size,1,-1,NULL,0,NULL,NULL);
269   THROW_UNIMPLEMENTED;
270 }
271
272 int gras_trp_sg_chunk_recv(gras_socket_t sock,
273                            char *data, unsigned long int size)
274 {
275   //gras_trp_sg_sock_data_t *sock_data =
276   //    (gras_trp_sg_sock_data_t *) sock->data;
277
278   //SIMIX_network_recv(sock_data->rdv,-1,NULL,0,NULL);
279   THROW_UNIMPLEMENTED;
280 #ifdef KILLME
281   gras_trp_sg_sock_data_t *remote_sock_data;
282   gras_socket_t remote_socket = NULL;
283   gras_msg_t msg_got;
284   gras_msg_procdata_t msg_procdata =
285     (gras_msg_procdata_t) gras_libdata_by_name("gras_msg");
286   gras_trp_procdata_t trp_proc =
287     (gras_trp_procdata_t) gras_libdata_by_id(gras_trp_libdata_id);
288
289   xbt_assert0(sock->meas,
290               "SG chunk exchange shouldn't be used on non-measurement sockets");
291   xbt_queue_shift_timed(trp_proc->meas_selectable_sockets,
292                         &remote_socket, 60);
293
294   if (remote_socket == NULL) {
295     THROW0(timeout_error, 0, "Timeout");
296   }
297
298   remote_sock_data = (gras_trp_sg_sock_data_t *) remote_socket->data;
299   msg_got = xbt_fifo_shift(msg_procdata->msg_to_receive_queue_meas);
300
301   sock_data = (gras_trp_sg_sock_data_t *) sock->data;
302
303   /* ok, I'm here, you can continue the communication */
304   SIMIX_cond_signal(remote_sock_data->cond);
305
306   SIMIX_mutex_lock(remote_sock_data->mutex);
307   /* wait for communication end */
308   SIMIX_cond_wait(remote_sock_data->cond, remote_sock_data->mutex);
309
310   if (msg_got->payl_size != size)
311     THROW5(mismatch_error, 0,
312            "Got %d bytes when %ld where expected (in %s->%s:%d)",
313            msg_got->payl_size, size,
314            SIMIX_host_get_name(sock_data->to_host),
315            SIMIX_host_get_name(SIMIX_host_self()), sock->peer_port);
316
317   if (data)
318     memcpy(data, msg_got->payl, size);
319
320   if (msg_got->payl)
321     xbt_free(msg_got->payl);
322
323   xbt_free(msg_got);
324   SIMIX_mutex_unlock(remote_sock_data->mutex);
325 #endif
326   return 0;
327 }