Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
merge back the master trunk into the smpi branch
[simgrid.git] / src / surf / ns3 / ns3_simulator.cc
1 /* Copyright (c) 2007, 2008, 2009, 2010. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "surf/ns3/ns3_simulator.h"
8 #include "xbt/dict.h"
9 #include "xbt/log.h"
10 #include "xbt/sysdep.h"
11
12 using namespace ns3;
13 using namespace std;
14
15 xbt_dict_t dict_socket = NULL;
16
17 NS3Sim SimulatorNS3;
18 static char socket_key[24];
19
20 static void receive_callback(Ptr<Socket> localSocket);
21 static void send_callback(Ptr<Socket> localSocket, uint32_t txSpace);
22 static void datasent_callback(Ptr<Socket> localSocket, uint32_t dataSent);
23 static void StartFlow(Ptr<Socket> sock,
24     const char *to,
25     uint16_t port_number);
26
27 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(simulator_ns3, surf,
28                                 "Logging specific to the SURF network NS3 module");
29
30 // Constructor.
31 NS3Sim::NS3Sim(){
32 }
33 //Destructor.
34 NS3Sim::~NS3Sim(){
35 }
36
37 static XBT_INLINE void transformSocketPtr (Ptr<Socket> localSocket){
38   std::stringstream sstream;
39   sstream << localSocket ;
40   std::string s = sstream.str();
41   sprintf(socket_key,"%s",s.c_str());
42 }
43
44 /*
45  * This function create a flow from src to dst
46  *
47  * Parameters
48  *              src: node source
49  *              dst: node destination
50  *              port_number: The port number to use
51  *              start: the time the communication start
52  *              addr:  ip address
53  *              totalBytes: number of bytes to transmit
54  */
55 void NS3Sim::create_flow_NS3(
56                 Ptr<Node> src,
57                 Ptr<Node> dst,
58                 uint16_t port_number,
59                 double start,
60                 const char *addr,
61                 uint32_t totalBytes,
62                 void * action)
63 {
64         if(!dict_socket) dict_socket = xbt_dict_new_homogeneous(free);
65
66         PacketSinkHelper sink ("ns3::TcpSocketFactory",
67                                                         InetSocketAddress (Ipv4Address::GetAny(),
68                                                         port_number));
69         sink.Install (dst);
70
71         Ptr<Socket> sock = Socket::CreateSocket (src,
72                                                         TcpSocketFactory::GetTypeId());
73
74         MySocket *mysocket = new MySocket();
75         mysocket->totalBytes = totalBytes;
76         mysocket->remaining = totalBytes;
77         mysocket->bufferedBytes = 0;
78         mysocket->sentBytes = 0;
79         mysocket->finished = 0;
80         mysocket->action = action;
81
82         transformSocketPtr(sock);
83         xbt_dict_set(dict_socket,socket_key, mysocket,NULL);
84
85         sock->Bind(InetSocketAddress(port_number));
86         XBT_DEBUG("Create flow starting to %fs + %fs = %fs",start-ns3_time(), ns3_time(), start);
87
88         Simulator::Schedule (Seconds(start-ns3_time()),&StartFlow, sock, addr, port_number);
89 //      Simulator::Schedule (Seconds(0.0),&StartFlow, sock, addr, port_number);
90
91 }
92
93 void* NS3Sim::get_action_from_socket(void *socket){
94         return ((MySocket *)socket)->action;
95 }
96
97 char NS3Sim::get_finished(void *socket){
98         return ((MySocket *)socket)->finished;
99 }
100
101 double NS3Sim::get_remains_from_socket(void *socket){
102         return ((MySocket *)socket)->remaining;
103 }
104
105 double NS3Sim::get_sent_from_socket(void *socket){
106   return ((MySocket *)socket)->sentBytes;
107 }
108
109 void NS3Sim::simulator_start(double min){
110   if(min > 0.0)
111     Simulator::Stop(Seconds(min));
112   XBT_DEBUG("Start simulator");
113   Simulator::Run ();
114 }
115
116 static MySocket* get_my_socket(Ptr<Socket> localSocket) {
117         transformSocketPtr(localSocket);
118         return (MySocket*)xbt_dict_get_or_null(dict_socket,socket_key);
119 }
120
121 static void receive_callback(Ptr<Socket> localSocket){
122   MySocket* mysocket = get_my_socket(localSocket);
123
124   if (mysocket->finished == 0){
125     mysocket->finished = 1;
126     XBT_DEBUG("recv_cb of F[%p, %p, %d]", mysocket, mysocket->action, mysocket->totalBytes);
127     XBT_DEBUG("Stop simulator at %f seconds", Simulator::Now().GetSeconds());
128     Simulator::Stop(Seconds(0.0));
129     Simulator::Run();
130   }
131 }
132
133 static void send_callback(Ptr<Socket> localSocket, uint32_t txSpace){
134         MySocket* mysocket = get_my_socket(localSocket);
135
136         if (mysocket->remaining == 0){
137                   //all data was already buffered (and socket was already closed), just return
138                   return;
139         }
140
141         uint8_t *data = (uint8_t*)malloc(sizeof(uint8_t)*txSpace);
142
143         while (mysocket->bufferedBytes < mysocket->totalBytes
144                         && localSocket->GetTxAvailable () > 0)
145         {
146       uint32_t toWrite = min ((mysocket->remaining), txSpace);
147       toWrite = min (toWrite, localSocket->GetTxAvailable ());
148       int amountSent = localSocket->Send (data, toWrite, 0);
149
150       if(amountSent < 0)
151           return;
152       (mysocket->bufferedBytes) += amountSent;
153       (mysocket->remaining) -= amountSent;
154       XBT_DEBUG("send_cb of F[%p, %p, %d] (%d/%d) %d buffered", mysocket, mysocket->action, mysocket->totalBytes, mysocket->remaining, mysocket->totalBytes, amountSent);
155
156     }
157
158         free(data);
159
160         if ((mysocket->bufferedBytes) >= mysocket->totalBytes){
161                 localSocket->Close();
162         }
163 }
164
165 static void datasent_callback(Ptr<Socket> localSocket, uint32_t dataSent){
166   MySocket* mysocket = get_my_socket(localSocket);
167   mysocket->sentBytes += dataSent;
168   XBT_DEBUG("datasent_cb of F[%p, %p, %d] %d sent", mysocket, mysocket->action, mysocket->totalBytes, dataSent);
169 }
170
171 static void normalClose_callback(Ptr<Socket> localSocket){
172   MySocket* mysocket = get_my_socket(localSocket);
173   XBT_DEBUG("normalClose_cb of F[%p, %p, %d]", mysocket, mysocket->action, mysocket->totalBytes);
174   receive_callback (localSocket);
175 }
176
177 static void errorClose_callback(Ptr<Socket> localSocket){
178   MySocket* mysocket = get_my_socket(localSocket);
179   XBT_DEBUG("errorClose_cb of F[%p, %p, %d]", mysocket, mysocket->action, mysocket->totalBytes);
180   xbt_die("NS3: a socket was closed anormally");
181 }
182
183 static void succeededConnect_callback(Ptr<Socket> localSocket){
184   MySocket* mysocket = get_my_socket(localSocket);
185   XBT_DEBUG("succeededConnect_cb of F[%p, %p, %d]", mysocket, mysocket->action, mysocket->totalBytes);
186 }
187
188 static void failedConnect_callback(Ptr<Socket> localSocket){
189   MySocket* mysocket = get_my_socket(localSocket);
190   XBT_DEBUG("failedConnect_cb of F[%p, %p, %d]", mysocket, mysocket->action, mysocket->totalBytes);
191   xbt_die("NS3: a socket failed to connect");
192 }
193
194 static void StartFlow(Ptr<Socket> sock,
195     const char *to,
196     uint16_t port_number)
197 {
198   InetSocketAddress serverAddr (to, port_number);
199
200   sock->Connect(serverAddr);
201   sock->SetSendCallback (MakeCallback (&send_callback));
202   sock->SetRecvCallback (MakeCallback (&receive_callback));
203   sock->SetDataSentCallback (MakeCallback (&datasent_callback));
204   sock->SetConnectCallback (MakeCallback (&succeededConnect_callback), MakeCallback (&failedConnect_callback));
205   sock->SetCloseCallbacks (MakeCallback (&normalClose_callback), MakeCallback (&errorClose_callback));
206
207   MySocket* mysocket = get_my_socket(sock);
208   XBT_DEBUG("startFlow_cb of F[%p, %p, %d] dest=%s port=%d", mysocket, mysocket->action, mysocket->totalBytes, to, port_number);
209 }