Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
9ebfd79a75b10f67b50266f27091b6662320e101
[simgrid.git] / src / gras / Transport / transport_plugin_sg.c
1 /* $Id$ */
2
3 /* file trp (transport) - send/receive a bunch of bytes in SG realm         */
4
5 /* Note that this is only used to debug other parts of GRAS since message   */
6 /*  exchange in SG realm is implemented directly without mimicing real life */
7 /*  This would be terribly unefficient.                                     */
8
9 /* Copyright (c) 2004 Martin Quinson. All rights reserved.                  */
10
11 /* This program is free software; you can redistribute it and/or modify it
12  * under the terms of the license (GNU LGPL) which comes with this package. */
13
14 #include "xbt/ex.h"
15
16 #include "gras/Msg/msg_private.h"
17 #include "gras/Transport/transport_private.h"
18 #include "gras/Virtu/virtu_sg.h"
19
20 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(gras_trp_sg, gras_trp,
21                                 "SimGrid pseudo-transport");
22
23 /***
24  *** Prototypes 
25  ***/
26
27 /* retrieve the port record associated to a numerical port on an host */
28 static void find_port(gras_hostdata_t * hd, int port,
29                       gras_sg_portrec_t * hpd);
30
31
32 void gras_trp_sg_socket_client(gras_trp_plugin_t self,
33                                /* OUT */ gras_socket_t sock);
34 void gras_trp_sg_socket_server(gras_trp_plugin_t self,
35                                /* OUT */ gras_socket_t sock);
36 void gras_trp_sg_socket_close(gras_socket_t sd);
37
38 void gras_trp_sg_chunk_send_raw(gras_socket_t sd,
39                                 const char *data, unsigned long int size);
40 void gras_trp_sg_chunk_send(gras_socket_t sd,
41                             const char *data,
42                             unsigned long int size, int stable_ignored);
43
44 int gras_trp_sg_chunk_recv(gras_socket_t sd,
45                            char *data, unsigned long int size);
46
47 /***
48  *** Specific plugin part
49  ***/
50 typedef struct {
51   int placeholder;              /* nothing plugin specific so far */
52 } gras_trp_sg_plug_data_t;
53
54
55 /***
56  *** Code
57  ***/
58 static void find_port(gras_hostdata_t * hd, int port, gras_sg_portrec_t * hpd)
59 {
60   unsigned int cpt;
61   gras_sg_portrec_t pr;
62
63   xbt_assert0(hd, "Please run gras_process_init on each process");
64
65   xbt_dynar_foreach(hd->ports, cpt, pr) {
66     if (pr.port == port) {
67       memcpy(hpd, &pr, sizeof(gras_sg_portrec_t));
68       return;
69     }
70   }
71   THROW1(mismatch_error, 0, "Unable to find any portrec for port #%d", port);
72 }
73
74
75 void gras_trp_sg_setup(gras_trp_plugin_t plug)
76 {
77
78   gras_trp_sg_plug_data_t *data = xbt_new(gras_trp_sg_plug_data_t, 1);
79
80   plug->data = data;
81
82   plug->socket_client = gras_trp_sg_socket_client;
83   plug->socket_server = gras_trp_sg_socket_server;
84   plug->socket_close = gras_trp_sg_socket_close;
85
86   plug->raw_send = gras_trp_sg_chunk_send_raw;
87   plug->send = gras_trp_sg_chunk_send;
88   plug->raw_recv = plug->recv = gras_trp_sg_chunk_recv;
89
90   plug->flush = NULL;           /* nothing cached */
91 }
92
93 void gras_trp_sg_socket_client(gras_trp_plugin_t self,
94                                /* OUT */ gras_socket_t sock)
95 {
96   xbt_ex_t e;
97
98   smx_host_t peer;
99   gras_hostdata_t *hd;
100   gras_trp_sg_sock_data_t *data;
101   gras_sg_portrec_t pr;
102
103   /* make sure this socket will reach someone */
104   if (!(peer = SIMIX_host_get_by_name(sock->peer_name)))
105     THROW1(mismatch_error, 0,
106            "Can't connect to %s: no such host.\n", sock->peer_name);
107
108   if (!(hd = (gras_hostdata_t *) SIMIX_host_get_data(peer)))
109     THROW1(mismatch_error, 0,
110            "can't connect to %s: no process on this host", sock->peer_name);
111
112   TRY {
113     find_port(hd, sock->peer_port, &pr);
114   }
115   CATCH(e) {
116     if (e.category == mismatch_error) {
117       xbt_ex_free(e);
118       THROW2(mismatch_error, 0,
119              "can't connect to %s:%d, no process listen on this port",
120              sock->peer_name, sock->peer_port);
121     }
122     RETHROW;
123   }
124
125   if (pr.meas && !sock->meas) {
126     THROW2(mismatch_error, 0,
127            "can't connect to %s:%d in regular mode, the process listen "
128            "in measurement mode on this port", sock->peer_name,
129            sock->peer_port);
130   }
131   if (!pr.meas && sock->meas) {
132     THROW2(mismatch_error, 0,
133            "can't connect to %s:%d in measurement mode, the process listen "
134            "in regular mode on this port", sock->peer_name, sock->peer_port);
135   }
136   /* create the socket */
137   data = xbt_new(gras_trp_sg_sock_data_t, 1);
138   data->from_process = SIMIX_process_self();
139   data->to_process = pr.process;
140   data->to_host = peer;
141
142   /* initialize mutex and condition of the socket */
143   data->mutex = SIMIX_mutex_init();
144   data->cond = SIMIX_cond_init();
145   data->to_socket = pr.socket;
146
147   sock->data = data;
148   sock->incoming = 1;
149
150   DEBUG5("%s (PID %d) connects in %s mode to %s:%d",
151          SIMIX_process_get_name(SIMIX_process_self()), gras_os_getpid(),
152          sock->meas ? "meas" : "regular", sock->peer_name, sock->peer_port);
153 }
154
155 void gras_trp_sg_socket_server(gras_trp_plugin_t self, gras_socket_t sock)
156 {
157
158   gras_hostdata_t *hd =
159     (gras_hostdata_t *) SIMIX_host_get_data(SIMIX_host_self());
160   gras_sg_portrec_t pr;
161   gras_trp_sg_sock_data_t *data;
162   volatile int found;
163
164   const char *host = SIMIX_host_get_name(SIMIX_host_self());
165
166   xbt_ex_t e;
167
168   xbt_assert0(hd, "Please run gras_process_init on each process");
169
170   sock->accepting = 0;          /* no such nuisance in SG */
171   found = 0;
172   TRY {
173     find_port(hd, sock->port, &pr);
174     found = 1;
175   } CATCH(e) {
176     if (e.category == mismatch_error)
177       xbt_ex_free(e);
178     else
179       RETHROW;
180   }
181
182   if (found)
183     THROW2(mismatch_error, 0,
184            "can't listen on address %s:%d: port already in use.",
185            host, sock->port);
186
187   pr.port = sock->port;
188   pr.meas = sock->meas;
189   pr.socket = sock;
190   pr.process = SIMIX_process_self();
191   xbt_dynar_push(hd->ports, &pr);
192
193   /* Create the socket */
194   data = xbt_new(gras_trp_sg_sock_data_t, 1);
195   data->from_process = SIMIX_process_self();
196   data->to_process = NULL;
197   data->to_host = SIMIX_host_self();
198
199   data->cond = SIMIX_cond_init();
200   data->mutex = SIMIX_mutex_init();
201
202   sock->data = data;
203
204   VERB6("'%s' (%d) ears on %s:%d%s (%p)",
205         SIMIX_process_get_name(SIMIX_process_self()), gras_os_getpid(),
206         host, sock->port, sock->meas ? " (mode meas)" : "", sock);
207
208 }
209
210 void gras_trp_sg_socket_close(gras_socket_t sock)
211 {
212   gras_hostdata_t *hd =
213     (gras_hostdata_t *) SIMIX_host_get_data(SIMIX_host_self());
214   unsigned int cpt;
215   gras_sg_portrec_t pr;
216
217   XBT_IN1(" (sock=%p)", sock);
218
219   if (!sock)
220     return;
221
222   xbt_assert0(hd, "Please run gras_process_init on each process");
223
224   if (sock->data) {
225     SIMIX_cond_destroy(((gras_trp_sg_sock_data_t *) sock->data)->cond);
226     SIMIX_mutex_destroy(((gras_trp_sg_sock_data_t *) sock->data)->mutex);
227     free(sock->data);
228   }
229
230   if (sock->incoming && !sock->outgoing && sock->port >= 0) {
231     /* server mode socket. Unregister it from 'OS' tables */
232     xbt_dynar_foreach(hd->ports, cpt, pr) {
233       DEBUG2("Check pr %d of %lu", cpt, xbt_dynar_length(hd->ports));
234       if (pr.port == sock->port) {
235         xbt_dynar_cursor_rm(hd->ports, &cpt);
236         XBT_OUT;
237         return;
238       }
239     }
240     WARN2("socket_close called on the unknown incoming socket %p (port=%d)",
241           sock, sock->port);
242   }
243   XBT_OUT;
244 }
245
246 typedef struct {
247   int size;
248   void *data;
249 } sg_task_data_t;
250
251 void gras_trp_sg_chunk_send(gras_socket_t sock,
252                             const char *data,
253                             unsigned long int size, int stable_ignored)
254 {
255   gras_trp_sg_chunk_send_raw(sock, data, size);
256 }
257
258 void gras_trp_sg_chunk_send_raw(gras_socket_t sock,
259                                 const char *data, unsigned long int size)
260 {
261   char name[256];
262   static unsigned int count = 0;
263
264   smx_action_t act;             /* simix action */
265   gras_trp_sg_sock_data_t *sock_data;
266   gras_trp_procdata_t trp_remote_proc;
267   gras_msg_procdata_t msg_remote_proc;
268   gras_msg_t msg;               /* message to send */
269
270   sock_data = (gras_trp_sg_sock_data_t *) sock->data;
271
272   xbt_assert0(sock->meas,
273               "SG chunk exchange shouldn't be used on non-measurement sockets");
274
275   SIMIX_mutex_lock(sock_data->mutex);
276   sprintf(name, "Chunk[%d]", count++);
277   /*initialize gras message */
278   msg = xbt_new(s_gras_msg_t, 1);
279   msg->expe = sock;
280   msg->payl_size = size;
281
282   if (data) {
283     msg->payl = (void *) xbt_malloc(size);
284     memcpy(msg->payl, data, size);
285   } else {
286     msg->payl = NULL;
287   }
288
289
290   /* put his socket on the selectable socket queue */
291   trp_remote_proc = (gras_trp_procdata_t)
292     gras_libdata_by_name_from_remote("gras_trp", sock_data->to_process);
293   xbt_queue_push(trp_remote_proc->meas_selectable_sockets, &sock);
294
295   /* put message on msg_queue */
296   msg_remote_proc = (gras_msg_procdata_t)
297     gras_libdata_by_name_from_remote("gras_msg", sock_data->to_process);
298
299   xbt_fifo_push(msg_remote_proc->msg_to_receive_queue_meas, msg);
300
301   /* wait for the receiver */
302   SIMIX_cond_wait(sock_data->cond, sock_data->mutex);
303
304   /* creates simix action and waits its ends, waits in the sender host
305      condition */
306   DEBUG5("send chunk %s from %s to  %s:%d (size=%ld)",
307          name, SIMIX_host_get_name(SIMIX_host_self()),
308          SIMIX_host_get_name(sock_data->to_host), sock->peer_port, size);
309
310   act = SIMIX_action_communicate(SIMIX_host_self(), sock_data->to_host,
311                                  name, size, -1);
312   SIMIX_register_action_to_condition(act, sock_data->cond);
313   SIMIX_cond_wait(sock_data->cond, sock_data->mutex);
314   SIMIX_unregister_action_to_condition(act, sock_data->cond);
315   /* error treatmeant (FIXME) */
316
317   /* cleanup structures */
318   SIMIX_action_destroy(act);
319
320   SIMIX_mutex_unlock(sock_data->mutex);
321 }
322
323 int gras_trp_sg_chunk_recv(gras_socket_t sock,
324                            char *data, unsigned long int size)
325 {
326   gras_trp_sg_sock_data_t *sock_data;
327   gras_trp_sg_sock_data_t *remote_sock_data;
328   gras_socket_t remote_socket = NULL;
329   gras_msg_t msg_got;
330   gras_msg_procdata_t msg_procdata =
331     (gras_msg_procdata_t) gras_libdata_by_name("gras_msg");
332   gras_trp_procdata_t trp_proc =
333     (gras_trp_procdata_t) gras_libdata_by_id(gras_trp_libdata_id);
334
335   xbt_assert0(sock->meas,
336               "SG chunk exchange shouldn't be used on non-measurement sockets");
337   xbt_queue_shift_timed(trp_proc->meas_selectable_sockets,
338                         &remote_socket, 60);
339
340   if (remote_socket == NULL) {
341     THROW0(timeout_error, 0, "Timeout");
342   }
343
344   remote_sock_data = (gras_trp_sg_sock_data_t *) remote_socket->data;
345   msg_got = xbt_fifo_shift(msg_procdata->msg_to_receive_queue_meas);
346
347   sock_data = (gras_trp_sg_sock_data_t *) sock->data;
348
349   /* ok, I'm here, you can continue the communication */
350   SIMIX_cond_signal(remote_sock_data->cond);
351
352   SIMIX_mutex_lock(remote_sock_data->mutex);
353   /* wait for communication end */
354   SIMIX_cond_wait(remote_sock_data->cond, remote_sock_data->mutex);
355
356   if (msg_got->payl_size != size)
357     THROW5(mismatch_error, 0,
358            "Got %d bytes when %ld where expected (in %s->%s:%d)",
359            msg_got->payl_size, size,
360            SIMIX_host_get_name(sock_data->to_host),
361            SIMIX_host_get_name(SIMIX_host_self()), sock->peer_port);
362
363   if (data)
364     memcpy(data, msg_got->payl, size);
365
366   if (msg_got->payl)
367     xbt_free(msg_got->payl);
368
369   xbt_free(msg_got);
370   SIMIX_mutex_unlock(remote_sock_data->mutex);
371   return 0;
372 }