From 3415111c9b5acc19a1df8352138fe56d02e35eea Mon Sep 17 00:00:00 2001 From: velho Date: Mon, 9 Jun 2008 11:26:14 +0000 Subject: [PATCH] Added the feature of getting remaining communication from gtnets flows. git-svn-id: svn+ssh://scm.gforge.inria.fr/svn/simgrid/simgrid/trunk@5573 48e7efb5-ca39-0410-a469-dd3cf9ba447f --- examples/msg/gtnets/gtnets_onelink.c | 120 +++++++++++++++++++++++++++ include/msg/msg.h | 1 + src/msg/task.c | 14 ++++ src/surf/gtnets/gtnets_interface.cc | 6 ++ src/surf/gtnets/gtnets_interface.h | 2 + src/surf/gtnets/gtnets_simulator.cc | 21 +++++ src/surf/gtnets/gtnets_simulator.h | 9 +- 7 files changed, 170 insertions(+), 3 deletions(-) create mode 100644 examples/msg/gtnets/gtnets_onelink.c diff --git a/examples/msg/gtnets/gtnets_onelink.c b/examples/msg/gtnets/gtnets_onelink.c new file mode 100644 index 0000000000..f84936158a --- /dev/null +++ b/examples/msg/gtnets/gtnets_onelink.c @@ -0,0 +1,120 @@ +#include +#include +#include "msg/msg.h" +/* Create a log channel to have nice outputs. */ +#include "xbt/log.h" +#include "xbt/asserts.h" + +XBT_LOG_NEW_DEFAULT_CATEGORY(msg_test,"Messages specific for this msg example"); + +int master(int argc, char *argv[]); +int slave(int argc, char *argv[]); +MSG_error_t test_all(const char *platform_file, const char *application_file); + + +typedef enum { + PORT_22 = 0, + MAX_CHANNEL +} channel_t; + + +double time=0; +double task_comm_size=0.0; + +#define FINALIZE ((void*)221297) /* a magic number to tell people to stop working */ + +/** Master */ +int master(int argc, char *argv[]){ + char *slavename = NULL; + m_task_t todo = NULL; + m_host_t slave; + MSG_error_t a; + + if(argc != 3){ + xbt_assert1(0, "Invalid number of arguments in master process, expected 4 got %d\n", argc); + } + + /* data size */ + xbt_assert1(sscanf(argv[1],"%lg", &task_comm_size), + "Invalid argument %s\n", argv[1]); + INFO1("task_comm_size : %f\n", task_comm_size); + + /* slave name */ + slavename = argv[2]; + + + /* Task creation. */ + char sprintf_buffer[64] = "Task_0"; + todo = MSG_task_create(sprintf_buffer, 0, task_comm_size, NULL); + + /* Process organisation */ + slave = MSG_get_host_by_name(slavename); + + /* Time measurement */ + time = MSG_get_clock(); + + a = MSG_task_put(todo, slave, PORT_22); + xbt_assert0(!a,"MSG_task_put failure"); + + return 0; +} /* end_of_master */ + + +/** Receiver function */ +int slave(int argc, char *argv[]) +{ + m_task_t task = NULL; + int a; + + a = MSG_task_get(&(task), PORT_22); + xbt_assert0(!a, "MSG_task_get failure"); + + /* Elapsed time */ + time = MSG_get_clock() - time; + + INFO1("Total transmission time: %f", time); + INFO1("Hockney estimated bandwidth: %f", ((task_comm_size*8)/time)/1000); + + MSG_task_destroy(task); + return 0; +} /* end_of_slave */ + + + + +/** Test function */ +MSG_error_t test_all(const char *platform_file, + const char *application_file){ + MSG_error_t res = MSG_OK; + + /* Simulation setting */ + MSG_set_channel_number(MAX_CHANNEL); + MSG_paje_output("msg_test.trace"); + MSG_create_environment(platform_file); + + /* Application deployment */ + MSG_function_register("master", master); + MSG_function_register("slave", slave); + MSG_launch_application(application_file); + + res = MSG_main(); + return res; +} /* end_of_test_all */ + + +/** Main function */ +int main(int argc, char *argv[]){ + MSG_error_t res = MSG_OK; + + MSG_global_init(&argc,argv); + if (argc < 3) { + printf ("Usage: %s platform_file deployment_file\n",argv[0]); + exit(1); + } + res = test_all(argv[1],argv[2]); + + MSG_clean(); + + if(res==MSG_OK) return 0; + else return 1; +} /* end_of_main */ diff --git a/include/msg/msg.h b/include/msg/msg.h index 0cc7ca70a6..4fe9c1d18e 100644 --- a/include/msg/msg.h +++ b/include/msg/msg.h @@ -132,6 +132,7 @@ XBT_PUBLIC(MSG_error_t) MSG_get_errno(void); XBT_PUBLIC(double) MSG_task_get_compute_duration(m_task_t task); XBT_PUBLIC(double) MSG_task_get_remaining_computation(m_task_t task); +XBT_PUBLIC(double) MSG_task_get_remaining_communication(m_task_t task); XBT_PUBLIC(double) MSG_task_get_data_size(m_task_t task); diff --git a/src/msg/task.c b/src/msg/task.c index 62bcdac519..dea7d9ba31 100644 --- a/src/msg/task.c +++ b/src/msg/task.c @@ -212,6 +212,20 @@ double MSG_task_get_remaining_computation(m_task_t task) } } + + +/** \ingroup m_task_management + * \brief Returns the total amount received by a task #m_task_t. + * + */ +double MSG_task_get_remaining_communication(m_task_t task) +{ + xbt_assert0((task != NULL) + && (task->simdata != NULL), "Invalid parameter"); + + return SIMIX_action_get_remains(task->simdata->comm); +} + /** \ingroup m_task_management * \brief Returns the size of the data attached to a task #m_task_t. * diff --git a/src/surf/gtnets/gtnets_interface.cc b/src/surf/gtnets/gtnets_interface.cc index ab9b0aab39..8768db965d 100644 --- a/src/surf/gtnets/gtnets_interface.cc +++ b/src/surf/gtnets/gtnets_interface.cc @@ -61,6 +61,12 @@ int gtnets_run_until_next_flow_completion(void ***metadata, int *number_of_flows return gtnets_sim->run_until_next_flow_completion(metadata, number_of_flows); } +// get the total received in bytes using the TCPServer object totRx field +double gtnets_get_flow_rx(void *metadata){ + return gtnets_sim->gtnets_get_flow_rx(metadata); +} + + // run for a given time (double) int gtnets_run(Time_t deltat){ gtnets_sim->run(deltat); diff --git a/src/surf/gtnets/gtnets_interface.h b/src/surf/gtnets/gtnets_interface.h index aa3f2d2469..6d11d68e24 100644 --- a/src/surf/gtnets/gtnets_interface.h +++ b/src/surf/gtnets/gtnets_interface.h @@ -20,6 +20,8 @@ extern "C" { int gtnets_create_flow(int src, int dst, long datasize, void* metadata); double gtnets_get_time_to_next_flow_completion(); int gtnets_run_until_next_flow_completion(void*** metadata, int* number_of_flows); + double gtnets_get_flow_rx(void *metadata); + int gtnets_run(double delta); int gtnets_finalize(); diff --git a/src/surf/gtnets/gtnets_simulator.cc b/src/surf/gtnets/gtnets_simulator.cc index eac36f447b..4b3032426a 100644 --- a/src/surf/gtnets/gtnets_simulator.cc +++ b/src/surf/gtnets/gtnets_simulator.cc @@ -26,6 +26,7 @@ GTSim::GTSim(){ sim_ = new Simulator(); topo_ = new GTNETS_Topology(); + sim_->verbose=false; // Set default values. TCP::DefaultAdvWin(wsize); TCP::DefaultSegSize(1000); @@ -250,13 +251,25 @@ int GTSim::create_flow(int src, int dst, long datasize, void* metadata){ gtnets_servers_[nflow_] = (TCPServer*)gtnets_nodes_[dst_node]-> AddApplication(TCPServer(TCPReno())); + //added by arnaud in order to avoid TCPServer duplicates. + //It is not needed since we create a new TCPServer for + //each flow. Also we need to control this variable + //to proper set the value of remaining communication amount. + //See functions: + gtnets_servers_[nflow_]->copyOnConnect=false; gtnets_servers_[nflow_]->BindAndListen(80); gtnets_clients_[nflow_] = (TCPSend*)gtnets_nodes_[src_node]-> AddApplication(TCPSend(metadata, gtnets_nodes_[dst_node]->GetIPAddr(), 80, Constant(datasize), TCPReno())); + gtnets_clients_[nflow_]->copyOnConnect=false; gtnets_clients_[nflow_]->SetSendCallBack(tcp_sent_callback); gtnets_clients_[nflow_]->Start(0); + + //added by pedro in order to get statistics + //map an action from a gtnets flow id + //metadata is the action and flow is the id to gtnets + gtnets_action_to_flow_[metadata] = nflow_; nflow_++; return 0; @@ -267,6 +280,9 @@ Time_t GTSim::get_time_to_next_flow_completion(){ Time_t t1; int pfds[2]; meta_flg=0; + + //remain needs to be updated in the future + Count_t remain; pipe(pfds); @@ -284,6 +300,11 @@ Time_t GTSim::get_time_to_next_flow_completion(){ return t1; } +double GTSim::gtnets_get_flow_rx(void *metadata){ + int flow = gtnets_action_to_flow_[metadata]; + return ((TCPServer *)gtnets_servers_[flow])->totRx;//action_remain[flow]; +} + int GTSim::run_until_next_flow_completion(void ***metadata, int *number_of_flows){ meta_flows.clear(); meta_nflow = number_of_flows; diff --git a/src/surf/gtnets/gtnets_simulator.h b/src/surf/gtnets/gtnets_simulator.h index 4194383559..1b5f50efff 100644 --- a/src/surf/gtnets/gtnets_simulator.h +++ b/src/surf/gtnets/gtnets_simulator.h @@ -45,7 +45,8 @@ public: double get_time_to_next_flow_completion(); int run_until_next_flow_completion(void*** metadata, int* number_of_flows); int run(double deltat); - + //added by pedro, returns the total received by the TCPServer peer of the given action + double gtnets_get_flow_rx(void *metadata); void create_gtnets_topology(); private: void add_nodes(); @@ -60,10 +61,12 @@ private: int is_topology_; int nflow_; - map gtnets_links_; - map gtnets_nodes_; map gtnets_servers_; map gtnets_clients_; + map gtnets_links_; + map gtnets_nodes_; + //added by pedro in order to get statistics + map gtnets_action_to_flow_; map gtnets_metadata_; }; -- 2.20.1