Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Reindent some code, no real change (should do it for all my code once for good)
[simgrid.git] / src / gras / Msg / sg_msg.c
1 /* $Id$ */
2
3 /* messaging - Function related to messaging code specific to SG            */
4
5 /* Copyright (c) 2003-2005 Martin Quinson. All rights reserved.             */
6
7 /* This program is free software; you can redistribute it and/or modify it
8  * under the terms of the license (GNU LGPL) which comes with this package. */
9
10 #include "xbt/ex.h"
11
12 #include "gras/Virtu/virtu_sg.h"
13
14 #include "gras/Msg/msg_private.h"
15
16 #include "gras/DataDesc/datadesc_interface.h"
17 #include "gras/Transport/transport_interface.h" /* gras_trp_chunk_send/recv */
18 #include "gras/Transport/transport_private.h" /* sock->data */
19
/* Logging in this file goes through the gras_msg category: it is declared
 * as external, then selected as the default category. */
XBT_LOG_EXTERNAL_CATEGORY(gras_msg);
XBT_LOG_DEFAULT_CATEGORY(gras_msg);

/* Untyped pointer alias. It is not referenced in the part of the file shown
 * here -- NOTE(review): confirm it is still needed. */
typedef void *gras_trp_bufdata_;
24
25 void gras_msg_send_ext(gras_socket_t   sock,
26                 e_gras_msg_kind_t kind,
27                 unsigned long int ID,
28                 gras_msgtype_t  msgtype,
29                 void           *payload) {
30
31         smx_action_t act; /* simix action */
32         gras_trp_sg_sock_data_t *sock_data;
33         gras_hostdata_t *hd;
34         gras_trp_procdata_t trp_remote_proc;
35         gras_msg_procdata_t msg_remote_proc;
36         gras_msg_t msg; /* message to send */
37         int whole_payload_size=0; /* msg->payload_size is used to memcpy the payload.
38                                This is used to report the load onto the simulator. It also counts the size of pointed stuff */
39
40         sock_data = (gras_trp_sg_sock_data_t *)sock->data;
41
42         hd = (gras_hostdata_t *)SIMIX_host_get_data(SIMIX_host_self());
43
44         xbt_assert1(!gras_socket_is_meas(sock),
45                         "Asked to send a message on the measurement socket %p", sock);
46
47         /*initialize gras message */
48         msg = xbt_new(s_gras_msg_t,1);
49         msg->expe = sock;
50         msg->kind = kind;
51         msg->type = msgtype;
52         msg->ID = ID;
53         if (kind == e_gras_msg_kind_rpcerror) {
54                 /* error on remote host, carfull, payload is an exception */
55                 msg->payl_size=gras_datadesc_size(gras_datadesc_by_name("ex_t"));
56                 msg->payl=xbt_malloc(msg->payl_size);
57                 whole_payload_size = gras_datadesc_memcpy(gras_datadesc_by_name("ex_t"),
58                                 payload,msg->payl);
59         } else if (kind == e_gras_msg_kind_rpcanswer) {
60                 msg->payl_size=gras_datadesc_size(msgtype->answer_type);
61                 if (msg->payl_size)
62                         msg->payl=xbt_malloc(msg->payl_size);
63                 else
64                         msg->payl=NULL;
65
66                 if (msgtype->answer_type)
67                         whole_payload_size = gras_datadesc_memcpy(msgtype->answer_type,
68                                         payload, msg->payl);
69         } else {
70                 msg->payl_size=gras_datadesc_size(msgtype->ctn_type);
71                 msg->payl=msg->payl_size?xbt_malloc(msg->payl_size):NULL;
72                 if (msgtype->ctn_type)
73                         whole_payload_size = gras_datadesc_memcpy(msgtype->ctn_type,
74                                         payload, msg->payl);
75         }
76
77         /* put the selectable socket on the queue */
78         trp_remote_proc = (gras_trp_procdata_t)
79         gras_libdata_by_name_from_remote("gras_trp",sock_data->to_process);
80
81         xbt_queue_push(trp_remote_proc->msg_selectable_sockets,&sock);
82
83         /* put message on msg_queue */
84         msg_remote_proc = (gras_msg_procdata_t)
85         gras_libdata_by_name_from_remote("gras_msg",sock_data->to_process);
86         xbt_fifo_push(msg_remote_proc->msg_to_receive_queue,msg);
87
88         /* wait for the receiver */
89         SIMIX_cond_wait(sock_data->cond, sock_data->mutex);
90
91         /* creates simix action and waits its ends, waits in the sender host
92      condition*/
93         act = SIMIX_action_communicate(SIMIX_host_self(),
94                         sock_data->to_host,msgtype->name,
95                         (double)whole_payload_size, -1);
96         SIMIX_register_action_to_condition(act,sock_data->cond);
97
98         VERB5("Sending to %s(%s) a message type '%s' kind '%s' ID %lu",
99                         SIMIX_host_get_name(sock_data->to_host),
100                         SIMIX_process_get_name(sock_data->to_process),
101                         msg->type->name,e_gras_msg_kind_names[msg->kind], msg->ID);
102
103         SIMIX_cond_wait(sock_data->cond, sock_data->mutex);
104         SIMIX_unregister_action_to_condition(act,sock_data->cond);
105         /* error treatmeant (FIXME)*/
106
107         /* cleanup structures */
108         SIMIX_action_destroy(act);
109         SIMIX_mutex_unlock(sock_data->mutex);
110
111         VERB0("Message sent");
112
113 }
114 /*
115  * receive the next message on the given socket.
116  */
117 void
118 gras_msg_recv(gras_socket_t    sock,
119                 gras_msg_t       msg) {
120
121         gras_trp_sg_sock_data_t *sock_data;
122         gras_trp_sg_sock_data_t *remote_sock_data;
123         gras_hostdata_t *remote_hd;
124         gras_msg_t msg_got;
125         gras_msg_procdata_t msg_procdata =
126                 (gras_msg_procdata_t)gras_libdata_by_name("gras_msg");
127
128         xbt_assert1(!gras_socket_is_meas(sock),
129                         "Asked to receive a message on the measurement socket %p", sock);
130
131         xbt_assert0(msg,"msg is an out parameter of gras_msg_recv...");
132
133         sock_data = (gras_trp_sg_sock_data_t *)sock->data;
134         remote_sock_data = ((gras_trp_sg_sock_data_t *)sock->data)->to_socket->data;
135         DEBUG3("Remote host %s, Remote Port: %d Local port %d",
136                         SIMIX_host_get_name(sock_data->to_host), sock->peer_port, sock->port);
137         remote_hd = (gras_hostdata_t *)SIMIX_host_get_data(sock_data->to_host);
138
139         if (xbt_fifo_size(msg_procdata->msg_to_receive_queue) == 0 ) {
140                 THROW_IMPOSSIBLE;
141         }
142         DEBUG1("Size msg_to_receive buffer: %d",
143                         xbt_fifo_size(msg_procdata->msg_to_receive_queue));
144         msg_got = xbt_fifo_shift(msg_procdata->msg_to_receive_queue);
145
146         SIMIX_mutex_lock(remote_sock_data->mutex);
147         /* ok, I'm here, you can continuate the communication */
148         SIMIX_cond_signal(remote_sock_data->cond);
149
150         /* wait for communication end */
151         SIMIX_cond_wait(remote_sock_data->cond,remote_sock_data->mutex);
152
153         msg_got->expe= msg->expe;
154         memcpy(msg,msg_got,sizeof(s_gras_msg_t));
155         xbt_free(msg_got);
156         SIMIX_mutex_unlock(remote_sock_data->mutex);
157
158         VERB3("Received a message type '%s' kind '%s' ID %lu",// from %s",
159                         msg->type->name,
160                         e_gras_msg_kind_names[msg->kind],
161                         msg->ID);
162 }