Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Ongoing work to port GRAS to smx_network. Not working yet
[simgrid.git] / src / gras / Msg / sg_msg.c
1 /* messaging - Function related to messaging code specific to SG            */
2
3 /* Copyright (c) 2005, 2006, 2007, 2008, 2009, 2010. The SimGrid Team.
4  * All rights reserved.                                                     */
5
6 /* This program is free software; you can redistribute it and/or modify it
7  * under the terms of the license (GNU LGPL) which comes with this package. */
8
9 #include "xbt/ex.h"
10
11 #include "gras/Virtu/virtu_sg.h"
12
13 #include "gras/Msg/msg_private.h"
14
15 #include "gras/DataDesc/datadesc_interface.h"
16 #include "gras/Transport/transport_interface.h" /* gras_trp_chunk_send/recv */
17 #include "gras/Transport/transport_private.h"   /* sock->data */
18
19 XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(gras_msg);
20
21 typedef void *gras_trp_bufdata_;
22
23 gras_msg_t gras_msg_recv_any(void) {
24   gras_trp_procdata_t trp_proc =
25       (gras_trp_procdata_t) gras_libdata_by_name("gras_trp");
26   gras_msg_t msg;
27   /* Build a dynar of all communications I could get something from */
28   xbt_dynar_t comms = xbt_dynar_new(sizeof(smx_comm_t),NULL);
29   unsigned int cursor;
30   gras_socket_t sock;
31   gras_trp_sg_sock_data_t *sock_data;
32   xbt_dynar_foreach(trp_proc->sockets,cursor,sock) {
33     sock_data = (gras_trp_sg_sock_data_t *) sock->data;
34     if (sock_data->comm_recv) {
35       INFO2("Copy %p of size %d",sock_data->comm_recv,sizeof(smx_comm_t));
36       xbt_dynar_push(comms,&(sock_data->comm_recv));
37     }
38   }
39   VERB1("Wait on %ld 'sockets'",xbt_dynar_length(comms));
40   /* Wait for the end of any of these communications */
41   int got = SIMIX_network_waitany(comms);
42   smx_comm_t comm;
43
44   /* retrieve the message sent in that communication */
45   xbt_dynar_get_cpy(comms,got,&(comm));
46   msg=SIMIX_communication_get_data(comm);
47   VERB1("Got something. Communication %p's over",comm);
48
49   /* Reinstall a waiting communication on that rdv */
50   /* Get the sock again */
51   xbt_dynar_foreach(trp_proc->sockets,cursor,sock) {
52     sock_data = (gras_trp_sg_sock_data_t *) sock->data;
53     if (sock_data->comm_recv && sock_data->comm_recv == comm)
54       break;
55   }
56   sock_data = (gras_trp_sg_sock_data_t *) sock->data;
57   sock_data->comm_recv = SIMIX_network_irecv(
58       sock_data->im_server?sock_data->rdv_server:sock_data->rdv_client,
59       NULL,0);
60   SIMIX_communication_destroy(comm);
61
62   return msg;
63 }
64
65
66 void gras_msg_send_ext(gras_socket_t sock,
67                        e_gras_msg_kind_t kind,
68                        unsigned long int ID,
69                        gras_msgtype_t msgtype, void *payload)
70 {
71   int whole_payload_size = 0;   /* msg->payload_size is used to memcpy the payload.
72                                    This is used to report the load onto the simulator. It also counts the size of pointed stuff */
73   gras_msg_t msg;               /* message to send */
74
75   /*initialize gras message */
76   msg = xbt_new(s_gras_msg_t, 1);
77   msg->expe = sock;
78   msg->kind = kind;
79   msg->type = msgtype;
80   msg->ID = ID;
81   if (kind == e_gras_msg_kind_rpcerror) {
82     /* error on remote host, careful, payload is an exception */
83     msg->payl_size = gras_datadesc_size(gras_datadesc_by_name("ex_t"));
84     msg->payl = xbt_malloc(msg->payl_size);
85     whole_payload_size = gras_datadesc_memcpy(gras_datadesc_by_name("ex_t"),
86                                               payload, msg->payl);
87   } else if (kind == e_gras_msg_kind_rpcanswer) {
88     msg->payl_size = gras_datadesc_size(msgtype->answer_type);
89     if (msg->payl_size)
90       msg->payl = xbt_malloc(msg->payl_size);
91     else
92       msg->payl = NULL;
93
94     if (msgtype->answer_type)
95       whole_payload_size = gras_datadesc_memcpy(msgtype->answer_type,
96                                                 payload, msg->payl);
97   } else {
98     msg->payl_size = gras_datadesc_size(msgtype->ctn_type);
99     msg->payl = msg->payl_size ? xbt_malloc(msg->payl_size) : NULL;
100     if (msgtype->ctn_type)
101       whole_payload_size = gras_datadesc_memcpy(msgtype->ctn_type,
102                                                 payload, msg->payl);
103   }
104   gras_trp_sg_sock_data_t *sock_data = (gras_trp_sg_sock_data_t *) sock->data;
105   smx_comm_t comm;
106   SIMIX_network_send(sock_data->im_server ? sock_data->rdv_client : sock_data->rdv_client,
107       whole_payload_size,-1,-1,&msg,sizeof(void*),&comm,msg);
108
109 #ifdef KILLME
110   smx_action_t act;             /* simix action */
111   gras_hostdata_t *hd;
112   gras_trp_procdata_t trp_remote_proc;
113   gras_msg_procdata_t msg_remote_proc;
114
115   sock_data = (gras_trp_sg_sock_data_t *) sock->data;
116
117   hd = (gras_hostdata_t *) SIMIX_host_get_data(SIMIX_host_self());
118
119   xbt_assert1(!gras_socket_is_meas(sock),
120               "Asked to send a message on the measurement socket %p", sock);
121
122
123   /* put the selectable socket on the queue */
124   trp_remote_proc = (gras_trp_procdata_t)
125     gras_libdata_by_name_from_remote("gras_trp", sock_data->to_process);
126
127   xbt_queue_push(trp_remote_proc->msg_selectable_sockets, &sock);
128
129   /* put message on msg_queue */
130   msg_remote_proc = (gras_msg_procdata_t)
131     gras_libdata_by_name_from_remote("gras_msg", sock_data->to_process);
132   xbt_fifo_push(msg_remote_proc->msg_to_receive_queue, msg);
133
134   /* wait for the receiver */
135   SIMIX_cond_wait(sock_data->cond, sock_data->mutex);
136
137   /* creates simix action and waits its ends, waits in the sender host
138      condition */
139   act = SIMIX_action_communicate(SIMIX_host_self(),
140                                  sock_data->to_host, msgtype->name,
141                                  (double) whole_payload_size, -1);
142   SIMIX_register_action_to_condition(act, sock_data->cond);
143
144   VERB5("Sending to %s(%s) a message type '%s' kind '%s' ID %lu",
145         SIMIX_host_get_name(sock_data->to_host),
146         SIMIX_process_get_name(sock_data->to_process),
147         msg->type->name, e_gras_msg_kind_names[msg->kind], msg->ID);
148
149   SIMIX_cond_wait(sock_data->cond, sock_data->mutex);
150   SIMIX_unregister_action_to_condition(act, sock_data->cond);
151   /* error treatmeant (FIXME) */
152
153   /* cleanup structures */
154   SIMIX_action_destroy(act);
155   SIMIX_mutex_unlock(sock_data->mutex);
156 #endif
157   VERB0("Message sent");
158
159 }
160
161 #ifdef KILLMETOO
162 /*
163  * receive the next message on the given socket.
164  */
165 void gras_msg_recv(gras_socket_t sock, gras_msg_t msg)
166 {
167
168   gras_trp_sg_sock_data_t *sock_data =
169        (gras_trp_sg_sock_data_t *) sock->data;
170   gras_msg_t msg_got;
171   size_t size_got = sizeof(void*);
172
173   xbt_assert1(!gras_socket_is_meas(sock),
174               "Asked to receive a message on the measurement socket %p",
175               sock);
176
177   SIMIX_network_recv(sock_data->rdv,-1,&msg_got,&size_got,NULL);
178 #ifdef KILLME
179   gras_trp_sg_sock_data_t *remote_sock_data;
180   gras_hostdata_t *remote_hd;
181   gras_msg_procdata_t msg_procdata =
182     (gras_msg_procdata_t) gras_libdata_by_name("gras_msg");
183
184   xbt_assert0(msg, "msg is an out parameter of gras_msg_recv...");
185
186   sock_data = (gras_trp_sg_sock_data_t *) sock->data;
187   remote_sock_data =
188     ((gras_trp_sg_sock_data_t *) sock->data)->to_socket->data;
189   DEBUG3("Remote host %s, Remote Port: %d Local port %d",
190          SIMIX_host_get_name(sock_data->to_host), sock->peer_port,
191          sock->port);
192   remote_hd = (gras_hostdata_t *) SIMIX_host_get_data(sock_data->to_host);
193
194   if (xbt_fifo_size(msg_procdata->msg_to_receive_queue) == 0) {
195     THROW_IMPOSSIBLE;
196   }
197   DEBUG1("Size msg_to_receive buffer: %d",
198          xbt_fifo_size(msg_procdata->msg_to_receive_queue));
199   msg_got = xbt_fifo_shift(msg_procdata->msg_to_receive_queue);
200
201   SIMIX_mutex_lock(remote_sock_data->mutex);
202   /* ok, I'm here, you can continuate the communication */
203   SIMIX_cond_signal(remote_sock_data->cond);
204
205   /* wait for communication end */
206   SIMIX_cond_wait(remote_sock_data->cond, remote_sock_data->mutex);
207
208   msg_got->expe = msg->expe;
209   memcpy(msg, msg_got, sizeof(s_gras_msg_t));
210   xbt_free(msg_got);
211   SIMIX_mutex_unlock(remote_sock_data->mutex);
212 #endif
213   VERB3("Received a message type '%s' kind '%s' ID %lu",        // from %s",
214         msg->type->name, e_gras_msg_kind_names[msg->kind], msg->ID);
215 }
216 #endif