Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
6cd4c728abd58c0635db23c73558dc8d5d71786f
[simgrid.git] / src / gras / Transport / transport_plugin_sg.c
1 /* file trp (transport) - send/receive a bunch of bytes in SG realm         */
2
3 /* Note that this is only used to debug other parts of GRAS since message   */
4 /*  exchange in SG realm is implemented directly without mimicing real life */
5 /*  This would be terribly unefficient.                                     */
6
7 /* Copyright (c) 2004, 2005, 2006, 2007, 2009, 2010. The SimGrid Team.
8  * All rights reserved.                                                     */
9
10 /* This program is free software; you can redistribute it and/or modify it
11  * under the terms of the license (GNU LGPL) which comes with this package. */
12
13 #include "xbt/ex.h"
14
15 #include "simix/simix.h"
16 #include "gras/Msg/msg_private.h"
17 #include "gras/Transport/transport_private.h"
18 #include "gras/Virtu/virtu_sg.h"
19
20 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(gras_trp_sg, gras_trp,
21                                 "SimGrid pseudo-transport");
22
23 /***
24  *** Prototypes 
25  ***/
26
27 /* retrieve the port record associated to a numerical port on an host */
28 static gras_sg_portrec_t find_port(gras_hostdata_t * hd, int port);
29
30 void gras_trp_sg_socket_client(gras_trp_plugin_t self,
31                                const char*host,
32                                int port,
33                                /* OUT */ gras_socket_t sock);
34 void gras_trp_sg_socket_server(gras_trp_plugin_t self,
35                                int port,
36                                /* OUT */ gras_socket_t sock);
37 void gras_trp_sg_socket_close(gras_socket_t sd);
38
39 void gras_trp_sg_chunk_send_raw(gras_socket_t sd,
40                                 const char *data, unsigned long int size);
41 void gras_trp_sg_chunk_send(gras_socket_t sd,
42                             const char *data,
43                             unsigned long int size, int stable_ignored);
44
45 int gras_trp_sg_chunk_recv(gras_socket_t sd,
46                            char *data, unsigned long int size);
47
48 /***
49  *** Specific plugin part
50  ***/
51 typedef struct {
52   int placeholder;              /* nothing plugin specific so far */
53 } gras_trp_sg_plug_data_t;
54
55
56 /***
57  *** Code
58  ***/
59 static gras_sg_portrec_t find_port(gras_hostdata_t * hd, int port)
60 {
61   unsigned int cpt;
62   gras_sg_portrec_t pr;
63
64   xbt_assert0(hd, "Please run gras_process_init on each process");
65
66   xbt_dynar_foreach(hd->ports, cpt, pr) {
67     if (pr->port == port)
68       return pr;
69   }
70   return NULL;
71 }
72
73 /***
74  *** Info about who's speaking
75  ***/
76 static int gras_trp_sg_my_port(gras_socket_t s) {
77   gras_trp_sg_sock_data_t sockdata = s->data;
78   if (sockdata->rdv_client == NULL) /* Master socket, I'm server */
79     return sockdata->server_port;
80   else
81     return sockdata->client_port;
82 }
83 static int gras_trp_sg_peer_port(gras_socket_t s) {
84   gras_trp_sg_sock_data_t sockdata = s->data;
85   if (sockdata->server == SIMIX_process_self())
86     return sockdata->client_port;
87   else
88     return sockdata->server_port;
89 }
90 static const char* gras_trp_sg_peer_name(gras_socket_t s) {
91   gras_trp_sg_sock_data_t sockdata = s->data;
92   if (sockdata->server == SIMIX_process_self())
93     return SIMIX_host_get_name(SIMIX_process_get_host(sockdata->client));
94   else {
95     if (sockdata->client!=SIMIX_process_self()) {
96       /* THAT'S BAD! I should be either client or server of the sockets I get messages on!! */
97       /* This is where the bug is visible. Try to die as loudly as possible */
98       xbt_backtrace_display_current();
99       ((char*)s)[sizeof(*s)+1] = '0'; /* Try to make valgrind angry to see where that damn socket comes from */
100       xbt_die(bprintf("I'm not the client in socket %p (comm:%p, rdvser=%p, rdvcli=%p) to %s, that's %s",
101           socket,sockdata->comm_recv,sockdata->rdv_server,sockdata->rdv_client,
102           SIMIX_host_get_name(SIMIX_process_get_host(sockdata->server)),
103           SIMIX_host_get_name(SIMIX_process_get_host(sockdata->client))));
104     }
105     xbt_assert(sockdata->client_port==gras_os_myport());
106     return SIMIX_host_get_name(SIMIX_process_get_host(sockdata->server));
107   }
108 }
109 static const char* gras_trp_sg_peer_proc(gras_socket_t s) {
110   THROW_UNIMPLEMENTED;
111 }
112 static void gras_trp_sg_peer_proc_set(gras_socket_t s,char *name) {
113   THROW_UNIMPLEMENTED;
114 }
115
116 void gras_trp_sg_setup(gras_trp_plugin_t plug)
117 {
118
119   plug->my_port = gras_trp_sg_my_port;
120   plug->peer_port = gras_trp_sg_peer_port;
121   plug->peer_name = gras_trp_sg_peer_name;
122   plug->peer_proc = gras_trp_sg_peer_proc;
123   plug->peer_proc_set = gras_trp_sg_peer_proc_set;
124
125   gras_trp_sg_plug_data_t *data = xbt_new(gras_trp_sg_plug_data_t, 1);
126
127   plug->data = data;
128
129   plug->socket_client = gras_trp_sg_socket_client;
130   plug->socket_server = gras_trp_sg_socket_server;
131   plug->socket_close = gras_trp_sg_socket_close;
132
133   plug->raw_send = gras_trp_sg_chunk_send_raw;
134   plug->send = gras_trp_sg_chunk_send;
135   plug->raw_recv = plug->recv = gras_trp_sg_chunk_recv;
136
137   plug->flush = NULL;           /* nothing cached */
138 }
139
140 void gras_trp_sg_socket_client(gras_trp_plugin_t self,
141                                const char*host,
142                                int port,
143                                /* OUT */ gras_socket_t sock)
144 {
145
146   smx_host_t peer;
147   gras_hostdata_t *hd;
148   gras_trp_sg_sock_data_t data;
149   gras_sg_portrec_t pr;
150
151   /* make sure this socket will reach someone */
152   if (!(peer = SIMIX_host_get_by_name(host)))
153     THROW1(mismatch_error, 0,
154            "Can't connect to %s: no such host.\n", host);
155
156   if (!(hd = (gras_hostdata_t *) SIMIX_host_get_data(peer)))
157     THROW1(mismatch_error, 0,
158            "can't connect to %s: no process on this host",
159            host);
160
161   pr = find_port(hd, port);
162
163   if (pr == NULL) {
164     THROW2(mismatch_error, 0,
165            "can't connect to %s:%d, no process listen on this port",
166            host, port);
167   }
168
169   /* Ensure that the listener is expecting the kind of stuff we want to send */
170   if (pr->meas && !sock->meas) {
171     THROW2(mismatch_error, 0,
172            "can't connect to %s:%d in regular mode, the process listen "
173            "in measurement mode on this port", host,
174            port);
175   }
176   if (!pr->meas && sock->meas) {
177     THROW2(mismatch_error, 0,
178            "can't connect to %s:%d in measurement mode, the process listen "
179            "in regular mode on this port", host,
180            port);
181   }
182
183   /* create simulation data of the socket */
184   data = xbt_new0(s_gras_trp_sg_sock_data_t, 1);
185   data->client = SIMIX_process_self();
186   data->server = pr->server;
187   data->server_port = port;
188   data->client_port = gras_os_myport();
189
190   /* initialize synchronization stuff on the socket */
191   data->rdv_server = pr->rdv;
192   data->rdv_client = SIMIX_rdv_create(NULL);
193   data->comm_recv = SIMIX_network_irecv(data->rdv_client, NULL, 0);
194
195   /* connect that simulation data to the socket */
196   sock->data = data;
197   sock->incoming = 1;
198
199   DEBUG8("%s (PID %d) connects in %s mode to %s:%d (rdv_ser:%p, rdv_cli:%p, comm:%p)",
200          SIMIX_process_get_name(SIMIX_process_self()), gras_os_getpid(),
201          sock->meas ? "meas" : "regular", host, port,
202          data->rdv_server,data->rdv_client,data->comm_recv);
203 }
204
205 void gras_trp_sg_socket_server(gras_trp_plugin_t self, int port, gras_socket_t sock)
206 {
207
208   gras_hostdata_t *hd =
209       (gras_hostdata_t *) SIMIX_host_get_data(SIMIX_host_self());
210   gras_sg_portrec_t pr;
211   gras_trp_sg_sock_data_t data;
212
213   xbt_assert0(hd, "Please run gras_process_init on each process");
214
215   sock->accepting = 1;
216
217   /* Check whether a server is already listening on that port or not */
218   pr = find_port(hd, port);
219
220   if (pr)
221     THROW2(mismatch_error, 0,
222            "can't listen on address %s:%d: port already in use.",
223            SIMIX_host_get_name(SIMIX_host_self()), port);
224
225   /* This port is free, let's take it */
226   pr = xbt_new(s_gras_sg_portrec_t, 1);
227   pr->port = port;
228   pr->meas = sock->meas;
229   pr->server = SIMIX_process_self();
230   pr->rdv = SIMIX_rdv_create(NULL);
231   xbt_dynar_push(hd->ports, &pr);
232
233   /* Create the socket */
234   data = xbt_new0(s_gras_trp_sg_sock_data_t, 1);
235   data->server = SIMIX_process_self();
236   data->server_port = port;
237   data->client = NULL;
238   data->rdv_server = pr->rdv;
239   data->rdv_client = NULL;
240   data->comm_recv = SIMIX_network_irecv(pr->rdv, NULL, 0);
241
242   sock->data = data;
243
244   VERB10
245       ("'%s' (%d) ears on %s:%d%s (%p; data:%p); Here rdv: %p; Remote rdv: %p; Comm %p",
246        SIMIX_process_get_name(SIMIX_process_self()), gras_os_getpid(),
247        SIMIX_host_get_name(SIMIX_host_self()), port,
248        sock->meas ? " (mode meas)" : "", sock, data,
249        (data->server ==
250         SIMIX_process_self())? data->rdv_server : data->rdv_client,
251        (data->server ==
252         SIMIX_process_self())? data->rdv_client : data->rdv_server,
253        data->comm_recv);
254
255 }
256
257 void gras_trp_sg_socket_close(gras_socket_t sock)
258 {
259   gras_hostdata_t *hd =
260       (gras_hostdata_t *) SIMIX_host_get_data(SIMIX_host_self());
261   unsigned int cpt;
262   gras_sg_portrec_t pr;
263
264   XBT_IN1(" (sock=%p)", sock);
265
266   if (!sock)
267     return;
268
269   xbt_assert0(hd, "Please run gras_process_init on each process");
270
271   gras_trp_sg_sock_data_t sockdata = sock->data;
272
273   if (sock->incoming && !sock->outgoing && sockdata->server_port >= 0) {
274     /* server mode socket. Unregister it from 'OS' tables */
275     xbt_dynar_foreach(hd->ports, cpt, pr) {
276       DEBUG2("Check pr %d of %lu", cpt, xbt_dynar_length(hd->ports));
277       if (pr->port == sockdata->server_port) {
278         xbt_dynar_cursor_rm(hd->ports, &cpt);
279         XBT_OUT;
280         return;
281       }
282     }
283     WARN2
284         ("socket_close called on the unknown incoming socket %p (port=%d)",
285          sock, sockdata->server_port);
286   }
287   if (sock->data) {
288     /* FIXME: kill the rdv point if receiver side */
289     free(sock->data);
290   }
291   XBT_OUT;
292 }
293
294 typedef struct {
295   int size;
296   void *data;
297 } sg_task_data_t;
298
299 void gras_trp_sg_chunk_send(gras_socket_t sock,
300                             const char *data,
301                             unsigned long int size, int stable_ignored)
302 {
303   gras_trp_sg_chunk_send_raw(sock, data, size);
304 }
305
306 void gras_trp_sg_chunk_send_raw(gras_socket_t sock,
307                                 const char *data, unsigned long int size)
308 {
309 #ifdef KILLME
310   char name[256];
311   static unsigned int count = 0;
312
313   smx_action_t act;             /* simix action */
314   gras_trp_procdata_t trp_remote_proc;
315   gras_msg_procdata_t msg_remote_proc;
316   gras_msg_t msg;               /* message to send */
317
318   //gras_trp_sg_sock_data_t sock_data = (gras_trp_sg_sock_data_t) sock->data;
319   xbt_assert0(sock->meas,
320               "SG chunk exchange shouldn't be used on non-measurement sockets");
321
322
323   /* creates simix action and waits its ends, waits in the sender host
324      condition */
325   /*
326   if (XBT_LOG_ISENABLED(gras_trp_sg, xbt_log_priority_debug)) {
327     smx_process_t remote_dude =
328         (sock_data->server ==
329          SIMIX_process_self())? (sock_data->client) : (sock_data->server);
330     smx_host_t remote_host = SIMIX_process_get_host(remote_dude);
331   }
332   */
333   //SIMIX_network_send(sock_data->rdv,size,1,-1,NULL,0,NULL,NULL);
334 #endif
335   THROW_UNIMPLEMENTED;
336 }
337
338 int gras_trp_sg_chunk_recv(gras_socket_t sock,
339                            char *data, unsigned long int size)
340 {
341   //gras_trp_sg_sock_data_t *sock_data =
342   //    (gras_trp_sg_sock_data_t *) sock->data;
343
344   //SIMIX_network_recv(sock_data->rdv,-1,NULL,0,NULL);
345   THROW_UNIMPLEMENTED;
346 #ifdef KILLME
347   gras_trp_sg_sock_data_t *remote_sock_data;
348   gras_socket_t remote_socket = NULL;
349   gras_msg_t msg_got;
350   gras_msg_procdata_t msg_procdata =
351       (gras_msg_procdata_t) gras_libdata_by_name("gras_msg");
352   gras_trp_procdata_t trp_proc =
353       (gras_trp_procdata_t) gras_libdata_by_id(gras_trp_libdata_id);
354
355   xbt_assert0(sock->meas,
356               "SG chunk exchange shouldn't be used on non-measurement sockets");
357   xbt_queue_shift_timed(trp_proc->meas_selectable_sockets,
358                         &remote_socket, 60);
359
360   if (remote_socket == NULL) {
361     THROW0(timeout_error, 0, "Timeout");
362   }
363
364   remote_sock_data = (gras_trp_sg_sock_data_t *) remote_socket->data;
365   msg_got = xbt_fifo_shift(msg_procdata->msg_to_receive_queue_meas);
366
367   sock_data = (gras_trp_sg_sock_data_t *) sock->data;
368
369   /* ok, I'm here, you can continue the communication */
370   SIMIX_cond_signal(remote_sock_data->cond);
371
372   SIMIX_mutex_lock(remote_sock_data->mutex);
373   /* wait for communication end */
374   SIMIX_cond_wait(remote_sock_data->cond, remote_sock_data->mutex);
375
376   if (msg_got->payl_size != size)
377     THROW5(mismatch_error, 0,
378            "Got %d bytes when %ld where expected (in %s->%s:%d)",
379            msg_got->payl_size, size,
380            SIMIX_host_get_name(sock_data->to_host),
381            SIMIX_host_get_name(SIMIX_host_self()), sock->peer_port);
382
383   if (data)
384     memcpy(data, msg_got->payl, size);
385
386   if (msg_got->payl)
387     xbt_free(msg_got->payl);
388
389   xbt_free(msg_got);
390   SIMIX_mutex_unlock(remote_sock_data->mutex);
391 #endif
392   return 0;
393 }