Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
131ee9f7b18e80704ae710719cb4ff0c089aede5
[simgrid.git] / src / gras / Msg / sg_msg.c
1 /* $Id$ */
2
3 /* messaging - Function related to messaging code specific to SG            */
4
5 /* Copyright (c) 2003-2005 Martin Quinson. All rights reserved.             */
6
7 /* This program is free software; you can redistribute it and/or modify it
8  * under the terms of the license (GNU LGPL) which comes with this package. */
9
10 #include "xbt/ex.h"
11
12 #include "gras/Virtu/virtu_sg.h"
13
14 #include "gras/Msg/msg_private.h"
15
16 #include "gras/DataDesc/datadesc_interface.h"
17 #include "gras/Transport/transport_interface.h" /* gras_trp_chunk_send/recv */
18 #include "gras/Transport/transport_private.h"   /* sock->data */
19
20 XBT_LOG_EXTERNAL_CATEGORY(gras_msg);
21 XBT_LOG_DEFAULT_CATEGORY(gras_msg);
22
23 typedef void *gras_trp_bufdata_;
24
25 void gras_msg_send_ext(gras_socket_t sock,
26                        e_gras_msg_kind_t kind,
27                        unsigned long int ID,
28                        gras_msgtype_t msgtype, void *payload)
29 {
30   gras_trp_sg_sock_data_t *sock_data;
31   gras_msg_t msg;               /* message to send */
32   int whole_payload_size = 0;   /* msg->payload_size is used to memcpy the payload.
33                                    This is used to report the load onto the simulator. It also counts the size of pointed stuff */
34
35   sock_data = (gras_trp_sg_sock_data_t *) sock->data;
36
37   xbt_assert1(!gras_socket_is_meas(sock),
38               "Asked to send a message on the measurement socket %p", sock);
39
40   /*initialize gras message */
41   msg = xbt_new(s_gras_msg_t, 1);
42   msg->expe = sock;
43   msg->kind = kind;
44   msg->type = msgtype;
45   msg->ID = ID;
46   if (kind == e_gras_msg_kind_rpcerror) {
47     /* error on remote host, careful, payload is an exception */
48     msg->payl_size = gras_datadesc_size(gras_datadesc_by_name("ex_t"));
49     msg->payl = xbt_malloc(msg->payl_size);
50     whole_payload_size = gras_datadesc_memcpy(gras_datadesc_by_name("ex_t"),
51                                               payload, msg->payl);
52   } else if (kind == e_gras_msg_kind_rpcanswer) {
53     msg->payl_size = gras_datadesc_size(msgtype->answer_type);
54     if (msg->payl_size)
55       msg->payl = xbt_malloc(msg->payl_size);
56     else
57       msg->payl = NULL;
58
59     if (msgtype->answer_type)
60       whole_payload_size = gras_datadesc_memcpy(msgtype->answer_type,
61                                                 payload, msg->payl);
62   } else {
63     msg->payl_size = gras_datadesc_size(msgtype->ctn_type);
64     msg->payl = msg->payl_size ? xbt_malloc(msg->payl_size) : NULL;
65     if (msgtype->ctn_type)
66       whole_payload_size = gras_datadesc_memcpy(msgtype->ctn_type,
67                                                 payload, msg->payl);
68   }
69
70
71
72   VERB5("Sending to %s(%s) a message type '%s' kind '%s' ID %lu",
73         sock->peer_name,sock->peer_proc,
74         msg->type->name, e_gras_msg_kind_names[msg->kind], msg->ID);
75   SIMIX_network_send(sock_data->rdv,whole_payload_size,-1.,-1.,msg,sizeof(s_gras_msg_t),(smx_comm_t*)&(msg->comm),&msg);
76
77   VERB0("Message sent");
78 }
79
80 /*
81  * receive the next message on the given socket.
82  */
83 void gras_msg_recv(gras_socket_t sock, gras_msg_t msg) {
84   gras_trp_procdata_t pd =
85     (gras_trp_procdata_t) gras_libdata_by_id(gras_trp_libdata_id);
86   gras_trp_sg_sock_data_t *sock_data;
87
88   xbt_assert1(!gras_socket_is_meas(sock),
89               "Asked to receive a message on the measurement socket %p",
90               sock);
91
92   xbt_assert0(msg, "msg is an out parameter of gras_msg_recv...");
93
94   sock_data = (gras_trp_sg_sock_data_t *) sock->data;
95
96   /* The message was already received while emulating the select, so simply copy it here */
97   memcpy(msg,&(sock_data->ongoing_msg),sizeof(s_gras_msg_t));
98   msg->expe = sock;
99   VERB1("Using %p as a msg",&(sock_data->ongoing_msg));
100   VERB5("Received a message type '%s' kind '%s' ID %lu from %s(%s)",
101         msg->type->name, e_gras_msg_kind_names[msg->kind], msg->ID,
102         sock->peer_name,sock->peer_proc);
103
104   /* Recreate another comm object to replace the one which just terminated */
105   int rank = xbt_dynar_search(pd->sockets,&sock);
106   xbt_assert0(rank>=0,"Socket not found in my array");
107   sock_data->ongoing_msg_size = sizeof(s_gras_msg_t);
108   smx_comm_t comm = SIMIX_network_irecv(sock_data->rdv,&(sock_data->ongoing_msg),&(sock_data->ongoing_msg_size));
109   xbt_dynar_set(pd->comms,rank,&comm);
110
111 #if 0 /* KILLME */
112   SIMIX_network_recv(sock_data->rdv,-1.,&msg_got,NULL,&comm);
113
114
115   remote_sock_data =
116     ((gras_trp_sg_sock_data_t *) sock->data)->to_socket->data;
117   remote_hd = (gras_hostdata_t *) SIMIX_host_get_data(sock_data->to_host);
118
119   if (xbt_fifo_size(msg_procdata->msg_to_receive_queue) == 0) {
120     THROW_IMPOSSIBLE;
121   }
122   DEBUG1("Size msg_to_receive buffer: %d",
123          xbt_fifo_size(msg_procdata->msg_to_receive_queue));
124   msg_got = xbt_fifo_shift(msg_procdata->msg_to_receive_queue);
125
126   SIMIX_mutex_lock(remote_sock_data->mutex);
127   /* ok, I'm here, you can continuate the communication */
128   SIMIX_cond_signal(remote_sock_data->cond);
129
130   /* wait for communication end */
131   INFO2("Wait communication (from %s) termination on %p",sock->peer_name,sock_data->cond);
132
133   SIMIX_cond_wait(remote_sock_data->cond, remote_sock_data->mutex);
134
135   msg_got->expe = msg->expe;
136   memcpy(msg, msg_got, sizeof(s_gras_msg_t));
137   xbt_free(msg_got);
138   SIMIX_mutex_unlock(remote_sock_data->mutex);
139 #endif
140 }