X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/blobdiff_plain/53f32ad3574aa3e8c7f17b3721208313c8e8d69d..7d0421bd87d5d3ceb1345ce3ecf1e21cc4753060:/src/surf/ns3/ns3_simulator.cc diff --git a/src/surf/ns3/ns3_simulator.cc b/src/surf/ns3/ns3_simulator.cc index 0a2e4a58b3..8e25eba209 100644 --- a/src/surf/ns3/ns3_simulator.cc +++ b/src/surf/ns3/ns3_simulator.cc @@ -7,18 +7,18 @@ #include "surf/ns3/ns3_simulator.h" #include "xbt/dict.h" #include "xbt/log.h" +#include "xbt/sysdep.h" using namespace ns3; using namespace std; -static const uint32_t writeSize = 1024; // limit the amout of data to write -uint8_t data[writeSize]; xbt_dict_t dict_socket = NULL; NS3Sim SimulatorNS3; static void receive_callback(Ptr localSocket); static void send_callback(Ptr localSocket, uint32_t txSpace); +static void datasent_callback(Ptr localSocket, uint32_t dataSent); static void StartFlow(Ptr sock, const char *to, uint16_t port_number); @@ -42,7 +42,7 @@ NS3Sim::~NS3Sim(){ * port_number: The port number to use * start: the time the communication start * addr: ip address - * TotalBytes: number of bytes to transmit + * totalBytes: number of bytes to transmit */ void NS3Sim::create_flow_NS3( Ptr src, @@ -50,17 +50,22 @@ void NS3Sim::create_flow_NS3( uint16_t port_number, double start, const char *addr, - uint32_t TotalBytes, + uint32_t totalBytes, void * action) { if(!dict_socket) dict_socket = xbt_dict_new(); - PacketSinkHelper sink ("ns3::TcpSocketFactory", InetSocketAddress (Ipv4Address::GetAny(), port_number)); + + PacketSinkHelper sink ("ns3::TcpSocketFactory", + InetSocketAddress (Ipv4Address::GetAny(), + port_number)); sink.Install (dst); - Ptr sock = Socket::CreateSocket (src, TypeId::LookupByName ("ns3::TcpSocketFactory")); + Ptr sock = Socket::CreateSocket (src, + TcpSocketFactory::GetTypeId()); + MySocket *mysocket = new MySocket(); - mysocket->TotalBytes = TotalBytes; - mysocket->remaining = TotalBytes; - mysocket->last_amount_sent = 0; + mysocket->totalBytes = totalBytes; + mysocket->remaining = totalBytes; + mysocket->bufferedBytes = 0; mysocket->sentBytes = 0; mysocket->finished = 0; mysocket->action = action; @@ -81,12 +86,8 @@ double NS3Sim::get_remains_from_socket(void *socket){ return ((MySocket *)socket)->remaining; } -double NS3Sim::get_last_amount_sent_from_socket(void *socket){ - return ((MySocket *)socket)->last_amount_sent; -} - -void NS3Sim::reset_last_amount_sent_from_socket(void *socket){ - ((MySocket *)socket)->last_amount_sent = 0; +double NS3Sim::get_sent_from_socket(void *socket){ + return ((MySocket *)socket)->sentBytes; } void NS3Sim::simulator_stop(double min){ @@ -102,47 +103,69 @@ void NS3Sim::simulator_start(void){ } static void receive_callback(Ptr localSocket){ - Address addr; - localSocket->GetSockName (addr); - InetSocketAddress iaddr = InetSocketAddress::ConvertFrom (addr); MySocket* mysocket = (MySocket*)xbt_dict_get_or_null(dict_socket,(char*)&localSocket); - mysocket->finished = 1; - //cout << "[" << Simulator::Now ().GetSeconds() << "] " << "Received [" << mysocket->TotalBytes << "bytes], from: " << iaddr.GetIpv4 () << " port: " << iaddr.GetPort () << endl; - std::stringstream sstream; - sstream << Simulator::Now ().GetSeconds(); - std::string s = sstream.str(); - size_t size = s.size() + 1; - char * time_sec = new char[ size ]; - strncpy( time_sec, s.c_str(), size ); - XBT_DEBUG("Stop simulator at %s seconds",time_sec); - Simulator::Stop(); + if (mysocket->finished == 0){ + mysocket->finished = 1; +// cout << "[" << Simulator::Now ().GetSeconds() << "] " << "recv_cb of F[" << mysocket->totalBytes << "] " << endl; + XBT_DEBUG("Stop simulator at %f seconds", Simulator::Now().GetSeconds()); + Simulator::Stop(Seconds(0.0)); + Simulator::Run(); + } } static void send_callback(Ptr localSocket, uint32_t txSpace){ - - Address addr; - localSocket->GetSockName (addr); - InetSocketAddress iaddr = InetSocketAddress::ConvertFrom (addr); MySocket* mysocket = (MySocket*)xbt_dict_get_or_null(dict_socket,(char*)&localSocket); - uint32_t totalBytes = mysocket->TotalBytes; - while ((mysocket->sentBytes) < totalBytes && localSocket->GetTxAvailable () > 0){ - uint32_t toWrite = min ((mysocket->remaining), writeSize); - toWrite = min (toWrite, localSocket->GetTxAvailable ()); - int amountSent = localSocket->Send (&data[0], toWrite, 0); - - if(amountSent < 0) - return; - - (mysocket->last_amount_sent) += amountSent; - (mysocket->sentBytes) += amountSent; - (mysocket->remaining) -= amountSent; - //cout << "[" << Simulator::Now ().GetSeconds() << "] " << "Send one packet, remaining "<< mysocket->remaining << " bytes!" << endl; - } - if ((mysocket->sentBytes) >= totalBytes){ - localSocket->Close(); + + if (mysocket->remaining == 0){ + //all data was already buffered (and socket was already closed), just return + return; } + uint32_t toWrite = min (mysocket->remaining, txSpace); + uint8_t *data = (uint8_t*)malloc(sizeof(uint8_t)*toWrite); + int amountSent = localSocket->Send (&data[0], toWrite, 0); + free (data); + if (amountSent > 0){ + mysocket->bufferedBytes += amountSent; + mysocket->remaining -= amountSent; + } +// cout << "[" << Simulator::Now ().GetSeconds() << "] " << "send_cb of F[" << mysocket->totalBytes << "] ("<< mysocket->remaining << " / " << mysocket->totalBytes << ") " << amountSent << " buffered." << endl; + + if (mysocket->remaining == 0){ + //everything was buffered to send, tell NS3 to close the socket + localSocket->Close(); + } + return; +} + +static void datasent_callback(Ptr localSocket, uint32_t dataSent){ + MySocket* mysocket = (MySocket*)xbt_dict_get_or_null(dict_socket,(char*)&localSocket); + mysocket->sentBytes += dataSent; +// cout << "[" << Simulator::Now ().GetSeconds() << "] " << "datasent_cb of F[" << mysocket->totalBytes << "] " << dataSent << " sent." << endl; +} + +static void normalClose_callback(Ptr localSocket){ + MySocket* mysocket = (MySocket*)xbt_dict_get_or_null(dict_socket,(char*)&localSocket); +// cout << "[" << Simulator::Now ().GetSeconds() << "] " << "normalClose_cb of F[" << mysocket->totalBytes << "]" << endl; + receive_callback (localSocket); +} + +static void errorClose_callback(Ptr localSocket){ + MySocket* mysocket = (MySocket*)xbt_dict_get_or_null(dict_socket,(char*)&localSocket); +// cout << "[" << Simulator::Now ().GetSeconds() << "] " << "errorClose_cb of F[" << mysocket->totalBytes << "]" << endl; + xbt_die("NS3: a socket was closed anormally"); +} + +static void succeededConnect_callback(Ptr localSocket){ + MySocket* mysocket = (MySocket*)xbt_dict_get_or_null(dict_socket,(char*)&localSocket); +// cout << "[" << Simulator::Now ().GetSeconds() << "] " << "succeededConnect_cb of F[" << mysocket->totalBytes << "]" << endl; +} + +static void failedConnect_callback(Ptr localSocket){ + MySocket* mysocket = (MySocket*)xbt_dict_get_or_null(dict_socket,(char*)&localSocket); +// cout << "[" << Simulator::Now ().GetSeconds() << "] " << "failedConnect_cb of F[" << mysocket->totalBytes << "]" << endl; + xbt_die("NS3: a socket failed to connect"); } static void StartFlow(Ptr sock, @@ -151,9 +174,13 @@ static void StartFlow(Ptr sock, { InetSocketAddress serverAddr (to, port_number); - //cout << "[" << Simulator::Now().GetSeconds() << "] Starting flow to " << to << " using port " << port_number << endl; - sock->Connect(serverAddr); sock->SetSendCallback (MakeCallback (&send_callback)); sock->SetRecvCallback (MakeCallback (&receive_callback)); + sock->SetDataSentCallback (MakeCallback (&datasent_callback)); + sock->SetConnectCallback (MakeCallback (&succeededConnect_callback), MakeCallback (&failedConnect_callback)); + sock->SetCloseCallbacks (MakeCallback (&normalClose_callback), MakeCallback (&errorClose_callback)); + + MySocket* mysocket = (MySocket*)xbt_dict_get_or_null(dict_socket,(char*)&sock); +// cout << "[" << Simulator::Now().GetSeconds() << "] Starting flow to " << to << " using port " << port_number << endl; }