Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Kill old $Id$ command dating from CVS
[simgrid.git] / src / gras / Transport / transport_plugin_sg.c
1 /* file trp (transport) - send/receive a bunch of bytes in SG realm         */
2
3 /* Note that this is only used to debug other parts of GRAS since message   */
4 /*  exchange in SG realm is implemented directly without mimicing real life */
5 /*  This would be terribly unefficient.                                     */
6
7 /* Copyright (c) 2004 Martin Quinson. All rights reserved.                  */
8
9 /* This program is free software; you can redistribute it and/or modify it
10  * under the terms of the license (GNU LGPL) which comes with this package. */
11
12 #include "xbt/ex.h"
13
14 #include "gras/Msg/msg_private.h"
15 #include "gras/Transport/transport_private.h"
16 #include "gras/Virtu/virtu_sg.h"
17
18 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(gras_trp_sg, gras_trp,
19                                 "SimGrid pseudo-transport");
20
21 /***
22  *** Prototypes 
23  ***/
24
25 /* retrieve the port record associated to a numerical port on an host */
26 static void find_port(gras_hostdata_t * hd, int port,
27                       gras_sg_portrec_t * hpd);
28
29
30 void gras_trp_sg_socket_client(gras_trp_plugin_t self,
31                                /* OUT */ gras_socket_t sock);
32 void gras_trp_sg_socket_server(gras_trp_plugin_t self,
33                                /* OUT */ gras_socket_t sock);
34 void gras_trp_sg_socket_close(gras_socket_t sd);
35
36 void gras_trp_sg_chunk_send_raw(gras_socket_t sd,
37                                 const char *data, unsigned long int size);
38 void gras_trp_sg_chunk_send(gras_socket_t sd,
39                             const char *data,
40                             unsigned long int size, int stable_ignored);
41
42 int gras_trp_sg_chunk_recv(gras_socket_t sd,
43                            char *data, unsigned long int size);
44
45 /***
46  *** Specific plugin part
47  ***/
48 typedef struct {
49   int placeholder;              /* nothing plugin specific so far */
50 } gras_trp_sg_plug_data_t;
51
52
53 /***
54  *** Code
55  ***/
56 static void find_port(gras_hostdata_t * hd, int port, gras_sg_portrec_t * hpd)
57 {
58   unsigned int cpt;
59   gras_sg_portrec_t pr;
60
61   xbt_assert0(hd, "Please run gras_process_init on each process");
62
63   xbt_dynar_foreach(hd->ports, cpt, pr) {
64     if (pr.port == port) {
65       memcpy(hpd, &pr, sizeof(gras_sg_portrec_t));
66       return;
67     }
68   }
69   THROW1(mismatch_error, 0, "Unable to find any portrec for port #%d", port);
70 }
71
72
73 void gras_trp_sg_setup(gras_trp_plugin_t plug)
74 {
75
76   gras_trp_sg_plug_data_t *data = xbt_new(gras_trp_sg_plug_data_t, 1);
77
78   plug->data = data;
79
80   plug->socket_client = gras_trp_sg_socket_client;
81   plug->socket_server = gras_trp_sg_socket_server;
82   plug->socket_close = gras_trp_sg_socket_close;
83
84   plug->raw_send = gras_trp_sg_chunk_send_raw;
85   plug->send = gras_trp_sg_chunk_send;
86   plug->raw_recv = plug->recv = gras_trp_sg_chunk_recv;
87
88   plug->flush = NULL;           /* nothing cached */
89 }
90
91 void gras_trp_sg_socket_client(gras_trp_plugin_t self,
92                                /* OUT */ gras_socket_t sock)
93 {
94   xbt_ex_t e;
95
96   smx_host_t peer;
97   gras_hostdata_t *hd;
98   gras_trp_sg_sock_data_t *data;
99   gras_sg_portrec_t pr;
100
101   /* make sure this socket will reach someone */
102   if (!(peer = SIMIX_host_get_by_name(sock->peer_name)))
103     THROW1(mismatch_error, 0,
104            "Can't connect to %s: no such host.\n", sock->peer_name);
105
106   if (!(hd = (gras_hostdata_t *) SIMIX_host_get_data(peer)))
107     THROW1(mismatch_error, 0,
108            "can't connect to %s: no process on this host", sock->peer_name);
109
110   TRY {
111     find_port(hd, sock->peer_port, &pr);
112   }
113   CATCH(e) {
114     if (e.category == mismatch_error) {
115       xbt_ex_free(e);
116       THROW2(mismatch_error, 0,
117              "can't connect to %s:%d, no process listen on this port",
118              sock->peer_name, sock->peer_port);
119     }
120     RETHROW;
121   }
122
123   if (pr.meas && !sock->meas) {
124     THROW2(mismatch_error, 0,
125            "can't connect to %s:%d in regular mode, the process listen "
126            "in measurement mode on this port", sock->peer_name,
127            sock->peer_port);
128   }
129   if (!pr.meas && sock->meas) {
130     THROW2(mismatch_error, 0,
131            "can't connect to %s:%d in measurement mode, the process listen "
132            "in regular mode on this port", sock->peer_name, sock->peer_port);
133   }
134   /* create the socket */
135   data = xbt_new(gras_trp_sg_sock_data_t, 1);
136   data->from_process = SIMIX_process_self();
137   data->to_process = pr.process;
138   data->to_host = peer;
139
140   /* initialize mutex and condition of the socket */
141   data->mutex = SIMIX_mutex_init();
142   data->cond = SIMIX_cond_init();
143   data->to_socket = pr.socket;
144
145   sock->data = data;
146   sock->incoming = 1;
147
148   DEBUG5("%s (PID %d) connects in %s mode to %s:%d",
149          SIMIX_process_get_name(SIMIX_process_self()), gras_os_getpid(),
150          sock->meas ? "meas" : "regular", sock->peer_name, sock->peer_port);
151 }
152
153 void gras_trp_sg_socket_server(gras_trp_plugin_t self, gras_socket_t sock)
154 {
155
156   gras_hostdata_t *hd =
157     (gras_hostdata_t *) SIMIX_host_get_data(SIMIX_host_self());
158   gras_sg_portrec_t pr;
159   gras_trp_sg_sock_data_t *data;
160   volatile int found;
161
162   const char *host = SIMIX_host_get_name(SIMIX_host_self());
163
164   xbt_ex_t e;
165
166   xbt_assert0(hd, "Please run gras_process_init on each process");
167
168   sock->accepting = 0;          /* no such nuisance in SG */
169   found = 0;
170   TRY {
171     find_port(hd, sock->port, &pr);
172     found = 1;
173   } CATCH(e) {
174     if (e.category == mismatch_error)
175       xbt_ex_free(e);
176     else
177       RETHROW;
178   }
179
180   if (found)
181     THROW2(mismatch_error, 0,
182            "can't listen on address %s:%d: port already in use.",
183            host, sock->port);
184
185   pr.port = sock->port;
186   pr.meas = sock->meas;
187   pr.socket = sock;
188   pr.process = SIMIX_process_self();
189   xbt_dynar_push(hd->ports, &pr);
190
191   /* Create the socket */
192   data = xbt_new(gras_trp_sg_sock_data_t, 1);
193   data->from_process = SIMIX_process_self();
194   data->to_process = NULL;
195   data->to_host = SIMIX_host_self();
196
197   data->cond = SIMIX_cond_init();
198   data->mutex = SIMIX_mutex_init();
199
200   sock->data = data;
201
202   VERB6("'%s' (%d) ears on %s:%d%s (%p)",
203         SIMIX_process_get_name(SIMIX_process_self()), gras_os_getpid(),
204         host, sock->port, sock->meas ? " (mode meas)" : "", sock);
205
206 }
207
208 void gras_trp_sg_socket_close(gras_socket_t sock)
209 {
210   gras_hostdata_t *hd =
211     (gras_hostdata_t *) SIMIX_host_get_data(SIMIX_host_self());
212   unsigned int cpt;
213   gras_sg_portrec_t pr;
214
215   XBT_IN1(" (sock=%p)", sock);
216
217   if (!sock)
218     return;
219
220   xbt_assert0(hd, "Please run gras_process_init on each process");
221
222   if (sock->data) {
223     SIMIX_cond_destroy(((gras_trp_sg_sock_data_t *) sock->data)->cond);
224     SIMIX_mutex_destroy(((gras_trp_sg_sock_data_t *) sock->data)->mutex);
225     free(sock->data);
226   }
227
228   if (sock->incoming && !sock->outgoing && sock->port >= 0) {
229     /* server mode socket. Unregister it from 'OS' tables */
230     xbt_dynar_foreach(hd->ports, cpt, pr) {
231       DEBUG2("Check pr %d of %lu", cpt, xbt_dynar_length(hd->ports));
232       if (pr.port == sock->port) {
233         xbt_dynar_cursor_rm(hd->ports, &cpt);
234         XBT_OUT;
235         return;
236       }
237     }
238     WARN2("socket_close called on the unknown incoming socket %p (port=%d)",
239           sock, sock->port);
240   }
241   XBT_OUT;
242 }
243
244 typedef struct {
245   int size;
246   void *data;
247 } sg_task_data_t;
248
249 void gras_trp_sg_chunk_send(gras_socket_t sock,
250                             const char *data,
251                             unsigned long int size, int stable_ignored)
252 {
253   gras_trp_sg_chunk_send_raw(sock, data, size);
254 }
255
256 void gras_trp_sg_chunk_send_raw(gras_socket_t sock,
257                                 const char *data, unsigned long int size)
258 {
259   char name[256];
260   static unsigned int count = 0;
261
262   smx_action_t act;             /* simix action */
263   gras_trp_sg_sock_data_t *sock_data;
264   gras_trp_procdata_t trp_remote_proc;
265   gras_msg_procdata_t msg_remote_proc;
266   gras_msg_t msg;               /* message to send */
267
268   sock_data = (gras_trp_sg_sock_data_t *) sock->data;
269
270   xbt_assert0(sock->meas,
271               "SG chunk exchange shouldn't be used on non-measurement sockets");
272
273   SIMIX_mutex_lock(sock_data->mutex);
274   sprintf(name, "Chunk[%d]", count++);
275   /*initialize gras message */
276   msg = xbt_new(s_gras_msg_t, 1);
277   msg->expe = sock;
278   msg->payl_size = size;
279
280   if (data) {
281     msg->payl = (void *) xbt_malloc(size);
282     memcpy(msg->payl, data, size);
283   } else {
284     msg->payl = NULL;
285   }
286
287
288   /* put his socket on the selectable socket queue */
289   trp_remote_proc = (gras_trp_procdata_t)
290     gras_libdata_by_name_from_remote("gras_trp", sock_data->to_process);
291   xbt_queue_push(trp_remote_proc->meas_selectable_sockets, &sock);
292
293   /* put message on msg_queue */
294   msg_remote_proc = (gras_msg_procdata_t)
295     gras_libdata_by_name_from_remote("gras_msg", sock_data->to_process);
296
297   xbt_fifo_push(msg_remote_proc->msg_to_receive_queue_meas, msg);
298
299   /* wait for the receiver */
300   SIMIX_cond_wait(sock_data->cond, sock_data->mutex);
301
302   /* creates simix action and waits its ends, waits in the sender host
303      condition */
304   DEBUG5("send chunk %s from %s to  %s:%d (size=%ld)",
305          name, SIMIX_host_get_name(SIMIX_host_self()),
306          SIMIX_host_get_name(sock_data->to_host), sock->peer_port, size);
307
308   act = SIMIX_action_communicate(SIMIX_host_self(), sock_data->to_host,
309                                  name, size, -1);
310   SIMIX_register_action_to_condition(act, sock_data->cond);
311   SIMIX_cond_wait(sock_data->cond, sock_data->mutex);
312   SIMIX_unregister_action_to_condition(act, sock_data->cond);
313   /* error treatmeant (FIXME) */
314
315   /* cleanup structures */
316   SIMIX_action_destroy(act);
317
318   SIMIX_mutex_unlock(sock_data->mutex);
319 }
320
321 int gras_trp_sg_chunk_recv(gras_socket_t sock,
322                            char *data, unsigned long int size)
323 {
324   gras_trp_sg_sock_data_t *sock_data;
325   gras_trp_sg_sock_data_t *remote_sock_data;
326   gras_socket_t remote_socket = NULL;
327   gras_msg_t msg_got;
328   gras_msg_procdata_t msg_procdata =
329     (gras_msg_procdata_t) gras_libdata_by_name("gras_msg");
330   gras_trp_procdata_t trp_proc =
331     (gras_trp_procdata_t) gras_libdata_by_id(gras_trp_libdata_id);
332
333   xbt_assert0(sock->meas,
334               "SG chunk exchange shouldn't be used on non-measurement sockets");
335   xbt_queue_shift_timed(trp_proc->meas_selectable_sockets,
336                         &remote_socket, 60);
337
338   if (remote_socket == NULL) {
339     THROW0(timeout_error, 0, "Timeout");
340   }
341
342   remote_sock_data = (gras_trp_sg_sock_data_t *) remote_socket->data;
343   msg_got = xbt_fifo_shift(msg_procdata->msg_to_receive_queue_meas);
344
345   sock_data = (gras_trp_sg_sock_data_t *) sock->data;
346
347   /* ok, I'm here, you can continue the communication */
348   SIMIX_cond_signal(remote_sock_data->cond);
349
350   SIMIX_mutex_lock(remote_sock_data->mutex);
351   /* wait for communication end */
352   SIMIX_cond_wait(remote_sock_data->cond, remote_sock_data->mutex);
353
354   if (msg_got->payl_size != size)
355     THROW5(mismatch_error, 0,
356            "Got %d bytes when %ld where expected (in %s->%s:%d)",
357            msg_got->payl_size, size,
358            SIMIX_host_get_name(sock_data->to_host),
359            SIMIX_host_get_name(SIMIX_host_self()), sock->peer_port);
360
361   if (data)
362     memcpy(data, msg_got->payl, size);
363
364   if (msg_got->payl)
365     xbt_free(msg_got->payl);
366
367   xbt_free(msg_got);
368   SIMIX_mutex_unlock(remote_sock_data->mutex);
369   return 0;
370 }