Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
[surf] Move host creation callbacks of Infiniband to hostCreatedCallbacks
[simgrid.git] / src / surf / network_ib.cpp
1 /* Copyright (c) 2014-2015. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "network_ib.hpp"
8 #include "simgrid/sg_config.h"
9 #include "maxmin_private.hpp"
10 #include "src/surf/host_interface.hpp"
11
12 XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(surf_network);
13
14 static void IB_create_host_callback(Host* host){
15   
16   static int id=0;
17 // pour t->id -> rajouter une nouvelle struct dans le dict, pour stocker les comms actives
18   if(((NetworkIBModel*)surf_network_model)->active_nodes==NULL)
19     ((NetworkIBModel*)surf_network_model)->active_nodes=xbt_dict_new();
20   
21   IBNode* act = new IBNode(id);
22
23   id++;
24   xbt_dict_set(((NetworkIBModel*)surf_network_model)->active_nodes,
25     host->getName(), act, NULL);
26  
27 }
28
29 static void IB_action_state_changed_callback(NetworkAction *action, e_surf_action_state_t statein, e_surf_action_state_t stateout){
30  if(statein!=SURF_ACTION_RUNNING|| stateout!=SURF_ACTION_DONE)
31     return;
32   std::pair<IBNode*,IBNode*> pair = ((NetworkIBModel*)surf_network_model)->active_comms[action];
33   XBT_DEBUG("IB callback - action %p finished", action);
34  
35  ((NetworkIBModel*)surf_network_model)->updateIBfactors(action, pair.first, pair.second, 1);
36
37   ((NetworkIBModel*)surf_network_model)->active_comms.erase(action);
38   
39 }
40
41
42 static void IB_action_init_callback(NetworkAction *action,RoutingEdge *src, RoutingEdge *dst, double size, double rate){
43   if(((NetworkIBModel*)surf_network_model)->active_nodes==NULL)
44     xbt_die("IB comm added, without any node connected !");
45   
46   IBNode* act_src= (IBNode*) xbt_dict_get_or_null(((NetworkIBModel*)surf_network_model)->active_nodes, src->getName());
47   if(act_src==NULL)
48     xbt_die("could not find src node active comms !");
49   //act_src->rate=rate;
50   
51   IBNode* act_dst= (IBNode*) xbt_dict_get_or_null(((NetworkIBModel*)surf_network_model)->active_nodes, dst->getName());
52   if(act_dst==NULL)
53     xbt_die("could not find dst node active comms !");  
54  // act_dst->rate=rate;
55   
56   ((NetworkIBModel*)surf_network_model)->active_comms[action]=make_pair(act_src, act_dst);
57   //post the action in the second dist, to retrieve in the other callback
58   XBT_DEBUG("IB callback - action %p init", action);
59
60   ((NetworkIBModel*)surf_network_model)->updateIBfactors(action, act_src, act_dst, 0);
61   
62 }
63
64
65
66 /*********
67  * Model *
68  *********/
69
70 /************************************************************************/
71 /* New model based on MPI contention model for Infiniband platforms */
72 /************************************************************************/
73 /* @Inproceedings{mescal_vienne_phd, */
74 /*  author={Jérôme Vienne}, */
75 /*  title={prédiction de performances d’applications de calcul haute performance sur réseau Infiniband}, */
76 /*  address={Grenoble FRANCE}, */
77 /*  month=june, */
78 /*  year={2010} */
79 /*  } */
80 void surf_network_model_init_IB(void)
81 {
82
83   if (surf_network_model)
84     return;
85   surf_network_model = new NetworkIBModel();
86   net_define_callbacks();
87   xbt_dynar_push(all_existing_models, &surf_network_model);
88   surf_callback_connect(networkActionStateChangedCallbacks, IB_action_state_changed_callback);
89   surf_callback_connect(networkCommunicateCallbacks, IB_action_init_callback);
90
91   hostCreatedCallbacks.connect(IB_create_host_callback);
92   xbt_cfg_setdefault_double(_sg_cfg_set, "network/weight_S", 8775);
93   
94 }
95
96 NetworkIBModel::NetworkIBModel()
97  : NetworkSmpiModel() {
98   m_haveGap=false;
99   active_nodes=NULL;
100     
101   const char* IB_factors_string=sg_cfg_get_string("smpi/IB_penalty_factors");
102   xbt_dynar_t radical_elements = xbt_str_split(IB_factors_string, ";");
103   
104   if(xbt_dynar_length(radical_elements)!=3)
105     surf_parse_error("smpi/IB_penalty_factors should be provided and contain 3 elements, semi-colon separated : for example 0.965;0.925;1.35");
106   
107   Be = atof(xbt_dynar_get_as(radical_elements, 0, char *));
108   Bs = atof(xbt_dynar_get_as(radical_elements, 1, char *));
109   ys = atof(xbt_dynar_get_as(radical_elements, 2, char *));
110
111   xbt_dynar_free(&radical_elements);
112 }
113
114 NetworkIBModel::~NetworkIBModel()
115 {
116   xbt_dict_cursor_t cursor = NULL;
117   IBNode* instance = NULL;
118   char *name = NULL;
119   xbt_dict_foreach(active_nodes, cursor, name, instance)
120     delete instance;
121   xbt_dict_free(&active_nodes);
122 }
123
124 void NetworkIBModel::computeIBfactors(IBNode *root) {
125   double penalized_bw=0.0;
126   double num_comm_out = (double) root->ActiveCommsUp.size();
127   double max_penalty_out=0.0;
128   //first, compute all outbound penalties to get their max
129   for (std::vector<ActiveComm*>::iterator it= root->ActiveCommsUp.begin(); it != root->ActiveCommsUp.end(); ++it) {
130     double my_penalty_out = 1.0;
131
132     if(num_comm_out!=1){
133       if((*it)->destination->nbActiveCommsDown > 2)//number of comms sent to the receiving node
134         my_penalty_out = num_comm_out * Bs * ys;
135       else
136         my_penalty_out = num_comm_out * Bs;
137     }
138
139     max_penalty_out = max(max_penalty_out,my_penalty_out);
140   }
141
142   for (std::vector<ActiveComm*>::iterator it= root->ActiveCommsUp.begin(); it != root->ActiveCommsUp.end(); ++it) {
143
144     //compute inbound penalty
145     double my_penalty_in = 1.0;
146     int nb_comms = (*it)->destination->nbActiveCommsDown;//total number of incoming comms
147     if(nb_comms!=1)
148       my_penalty_in = ((*it)->destination->ActiveCommsDown)[root] //number of comm sent to dest by root node
149                       * Be 
150                       * (*it)->destination->ActiveCommsDown.size();//number of different nodes sending to dest
151     
152     double penalty=max(my_penalty_in,max_penalty_out);
153     
154     double rate_before_update = (*it)->action->getBound();
155     //save initial rate of the action
156     if((*it)->init_rate==-1) 
157       (*it)->init_rate= rate_before_update;
158     
159     penalized_bw= ! num_comm_out ? (*it)->init_rate : (*it)->init_rate /penalty;
160     
161     if (!double_equals(penalized_bw, rate_before_update, sg_surf_precision)){
162       XBT_DEBUG("%d->%d action %p penalty updated : bw now %f, before %f , initial rate %f", root->id,(*it)->destination->id,(*it)->action,penalized_bw, (*it)->action->getBound(), (*it)->init_rate );
163       lmm_update_variable_bound(p_maxminSystem, (*it)->action->getVariable(), penalized_bw);
164     }else{
165       XBT_DEBUG("%d->%d action %p penalty not updated : bw %f, initial rate %f", root->id,(*it)->destination->id,(*it)->action,penalized_bw, (*it)->init_rate );
166     }
167
168   }
169   XBT_DEBUG("Finished computing IB penalties");
170 }
171
172 void NetworkIBModel::updateIBfactors_rec(IBNode *root, bool* updatedlist) {
173   if(updatedlist[root->id]==0){
174     XBT_DEBUG("IB - Updating rec %d", root->id);
175     computeIBfactors(root);
176     updatedlist[root->id]=1;
177     for (std::vector<ActiveComm*>::iterator it= root->ActiveCommsUp.begin(); it != root->ActiveCommsUp.end(); ++it) {
178         if(updatedlist[(*it)->destination->id]!=1)
179           updateIBfactors_rec((*it)->destination, updatedlist);
180     }
181     for (std::map<IBNode*, int>::iterator it= root->ActiveCommsDown.begin(); it != root->ActiveCommsDown.end(); ++it) {
182         if(updatedlist[it->first->id]!=1)
183           updateIBfactors_rec(it->first, updatedlist);
184     }
185   }
186 }
187
188
189 void NetworkIBModel::updateIBfactors(NetworkAction *action, IBNode *from, IBNode * to, int remove) {
190   if (from == to)//disregard local comms (should use loopback)
191     return;
192   
193   bool* updated=(bool*)xbt_malloc0(xbt_dict_size(active_nodes)*sizeof(bool));
194   ActiveComm* comm=NULL;
195   if(remove){
196     if(to->ActiveCommsDown[from]==1)
197       to->ActiveCommsDown.erase(from);
198     else
199       to->ActiveCommsDown[from]-=1;
200
201     to->nbActiveCommsDown--;
202     for (std::vector<ActiveComm*>::iterator it= from->ActiveCommsUp.begin(); 
203          it != from->ActiveCommsUp.end(); ++it) {
204       if((*it)->action==action){
205         comm=(*it);
206         from->ActiveCommsUp.erase(it);
207         break;
208       }
209     }
210     action->unref();
211
212   }else{
213     action->ref();
214     ActiveComm* comm=new ActiveComm();
215     comm->action=action;
216     comm->destination=to;
217     from->ActiveCommsUp.push_back(comm);
218
219     to->ActiveCommsDown[from]+=1;
220     to->nbActiveCommsDown++;
221   }
222   XBT_DEBUG("IB - Updating %d", from->id);
223   updateIBfactors_rec(from, updated);
224   XBT_DEBUG("IB - Finished updating %d", from->id);
225   if(comm)
226     delete comm;
227   xbt_free(updated);
228 }