Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
[surf] Remove sg_platf_mstorage_cb
[simgrid.git] / src / surf / storage_n11.cpp
1 /* Copyright (c) 2013-2015. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "storage_n11.hpp"
8 #include "surf_private.h"
9 #include <math.h> /*ceil*/
10 XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(surf_storage);
11
12 static int storage_selective_update = 0;
13 static xbt_swag_t storage_running_action_set_that_does_not_need_being_checked = NULL;
14
15 /*************
16  * CallBacks *
17  *************/
18
19 static XBT_INLINE void routing_storage_type_free(void *r)
20 {
21   storage_type_t stype = (storage_type_t) r;
22   free(stype->model);
23   free(stype->type_id);
24   free(stype->content);
25   free(stype->content_type);
26   xbt_dict_free(&(stype->properties));
27   xbt_dict_free(&(stype->model_properties));
28   free(stype);
29 }
30
31 static XBT_INLINE void surf_storage_resource_free(void *r)
32 {
33   // specific to storage
34   Storage *storage = static_cast<Storage*>(r);
35   // generic resource
36   delete storage;
37 }
38
39 static XBT_INLINE void routing_storage_host_free(void *r)
40 {
41   xbt_dynar_t dyn = (xbt_dynar_t) r;
42   xbt_dynar_free(&dyn);
43 }
44
45 static void mount_free(void *p)
46 {
47   mount_t mnt = (mount_t) p;
48   xbt_free(mnt->name);
49 }
50
51 static void storage_parse_mount(sg_platf_mount_cbarg_t mount)
52 {
53   // Verification of an existing storage
54 #ifndef NDEBUG
55   void* storage = xbt_lib_get_or_null(storage_lib, mount->storageId, ROUTING_STORAGE_LEVEL);
56 #endif
57   xbt_assert(storage,"Disk id \"%s\" does not exists", mount->storageId);
58
59   XBT_DEBUG("ROUTING Mount '%s' on '%s'",mount->storageId, mount->name);
60
61   s_mount_t mnt;
62   mnt.storage = surf_storage_resource_priv(surf_storage_resource_by_name(mount->storageId));
63   mnt.name = xbt_strdup(mount->name);
64
65   if(!mount_list){
66     XBT_DEBUG("Create a Mount list for %s",A_surfxml_host_id);
67     mount_list = xbt_dynar_new(sizeof(s_mount_t), mount_free);
68   }
69   xbt_dynar_push(mount_list, &mnt);
70 }
71
72 void storage_register_callbacks() {
73
74   ROUTING_STORAGE_LEVEL = xbt_lib_add_level(storage_lib, xbt_free_f);
75   ROUTING_STORAGE_HOST_LEVEL = xbt_lib_add_level(storage_lib, routing_storage_host_free);
76   ROUTING_STORAGE_TYPE_LEVEL = xbt_lib_add_level(storage_type_lib, routing_storage_type_free);
77   SURF_STORAGE_LEVEL = xbt_lib_add_level(storage_lib, surf_storage_resource_free);
78
79   sg_platf_mount_add_cb(storage_parse_mount);
80 }
81
82 /*********
83  * Model *
84  *********/
85
86 void surf_storage_model_init_default(void)
87 {
88   surf_storage_model = new StorageN11Model();
89   xbt_dynar_push(all_existing_models, &surf_storage_model);
90 }
91
92 StorageN11Model::StorageN11Model() : StorageModel() {
93   Action *action = NULL;
94
95   XBT_DEBUG("surf_storage_model_init_internal");
96
97   storage_running_action_set_that_does_not_need_being_checked =
98       xbt_swag_new(xbt_swag_offset(*action, p_stateHookup));
99   if (!p_maxminSystem) {
100     p_maxminSystem = lmm_system_new(storage_selective_update);
101   }
102 }
103
104 StorageN11Model::~StorageN11Model(){
105   xbt_swag_free(storage_running_action_set_that_does_not_need_being_checked);
106   storage_running_action_set_that_does_not_need_being_checked = NULL;
107 }
108
109 Storage *StorageN11Model::createStorage(const char* id, const char* type_id,
110     const char* content_name, const char* content_type, xbt_dict_t properties,
111     const char* attach)
112 {
113
114   xbt_assert(!surf_storage_resource_priv(surf_storage_resource_by_name(id)),
115               "Storage '%s' declared several times in the platform file",
116               id);
117
118   storage_type_t storage_type = (storage_type_t) xbt_lib_get_or_null(storage_type_lib, type_id,ROUTING_STORAGE_TYPE_LEVEL);
119
120   double Bread  = surf_parse_get_bandwidth((char*)xbt_dict_get(storage_type->model_properties, "Bread"));
121   double Bwrite = surf_parse_get_bandwidth((char*)xbt_dict_get(storage_type->model_properties, "Bwrite"));
122   double Bconnection   = surf_parse_get_bandwidth((char*)xbt_dict_get(storage_type->model_properties, "Bconnection"));
123
124   Storage *storage = new StorageN11(this, id, properties, p_maxminSystem,
125       Bread, Bwrite, Bconnection, type_id, (char *)content_name,
126       xbt_strdup(content_type), storage_type->size, (char *) attach);
127   surf_callback_emit(storageCreatedCallbacks, storage);
128   xbt_lib_set(storage_lib, id, SURF_STORAGE_LEVEL, storage);
129
130   XBT_DEBUG("SURF storage create resource\n\t\tid '%s'\n\t\ttype '%s'\n\t\tproperties '%p'\n\t\tBread '%f'\n",
131       id,
132       type_id,
133       properties,
134       Bread);
135
136   if(!p_storageList)
137     p_storageList = xbt_dynar_new(sizeof(char *),NULL);
138   xbt_dynar_push(p_storageList, &storage);
139
140   return storage;
141 }
142
143 double StorageN11Model::shareResources(double now)
144 {
145   XBT_DEBUG("storage_share_resources %f", now);
146   unsigned int i, j;
147   Storage *storage;
148   void *_write_action;
149   StorageAction *write_action;
150
151   double min_completion = shareResourcesMaxMin(getRunningActionSet(),
152       p_maxminSystem, lmm_solve);
153
154   double rate;
155   // Foreach disk
156   xbt_dynar_foreach(p_storageList,i,storage)
157   {
158     rate = 0;
159     // Foreach write action on disk
160     xbt_dynar_foreach(storage->p_writeActions, j, _write_action)
161     {
162       write_action = static_cast<StorageAction*>(_write_action);
163       rate += lmm_variable_getvalue(write_action->getVariable());
164     }
165     if(rate > 0)
166       min_completion = MIN(min_completion, (storage->m_size-storage->m_usedSize)/rate);
167   }
168
169   return min_completion;
170 }
171
172 void StorageN11Model::updateActionsState(double /*now*/, double delta)
173 {
174   StorageAction *action = NULL;
175
176   ActionList *actionSet = getRunningActionSet();
177   for(ActionList::iterator it(actionSet->begin()), itNext=it, itend(actionSet->end())
178      ; it != itend ; it=itNext) {
179     ++itNext;
180     action = static_cast<StorageAction*>(&*it);
181
182     if(action->m_type == WRITE)
183     {
184       // Update the disk usage
185       // Update the file size
186       // For each action of type write
187       volatile double current_progress =
188           delta * lmm_variable_getvalue(action->getVariable());
189       long int incr = current_progress;
190
191       XBT_DEBUG("%s:\n\t progress =  %.2f, current_progress = %.2f, "
192                 "incr = %ld, lrint(1) = %ld, lrint(2) = %ld",
193                 action->p_file->name,
194                 action->progress,  current_progress, incr,
195                 lrint(action->progress + current_progress),
196                 lrint(action->progress)+ incr);
197
198       /* take care of rounding error accumulation */
199       if (lrint(action->progress + current_progress) >
200           lrint(action->progress)+ incr)
201         incr++;
202
203       action->progress +=current_progress;
204
205       action->p_storage->m_usedSize += incr; // disk usage
206       action->p_file->current_position+= incr; // current_position
207       //  which becomes the new file size
208       action->p_file->size = action->p_file->current_position ;
209
210       sg_size_t *psize = xbt_new(sg_size_t,1);
211       *psize = action->p_file->size;
212       xbt_dict_t content_dict = action->p_storage->p_content;
213       xbt_dict_set(content_dict, action->p_file->name, psize, NULL);
214     }
215
216     action->updateRemains(lmm_variable_getvalue(action->getVariable()) * delta);
217
218     if (action->getMaxDuration() != NO_MAX_DURATION)
219       action->updateMaxDuration(delta);
220
221     if(action->getRemainsNoUpdate() > 0 &&
222         lmm_get_variable_weight(action->getVariable()) > 0 &&
223         action->p_storage->m_usedSize == action->p_storage->m_size)
224     {
225       action->finish();
226       action->setState(SURF_ACTION_FAILED);
227     } else if ((action->getRemainsNoUpdate() <= 0) &&
228         (lmm_get_variable_weight(action->getVariable()) > 0))
229     {
230       action->finish();
231       action->setState(SURF_ACTION_DONE);
232     } else if ((action->getMaxDuration() != NO_MAX_DURATION) &&
233                (action->getMaxDuration() <= 0))
234     {
235       action->finish();
236       action->setState(SURF_ACTION_DONE);
237     }
238   }
239   return;
240 }
241
242 /************
243  * Resource *
244  ************/
245
246 StorageN11::StorageN11(StorageModel *model, const char* name,
247     xbt_dict_t properties, lmm_system_t maxminSystem, double bread,
248     double bwrite, double bconnection, const char* type_id, char *content_name,
249     char *content_type, sg_size_t size, char *attach)
250  : Storage(model, name, properties,
251            maxminSystem, bread, bwrite, bconnection, type_id, content_name, content_type, size, attach) {
252   XBT_DEBUG("Create resource with Bconnection '%f' Bread '%f' Bwrite '%f' and Size '%llu'", bconnection, bread, bwrite, size);
253 }
254
255 StorageAction *StorageN11::open(const char* mount, const char* path)
256 {
257   XBT_DEBUG("\tOpen file '%s'",path);
258
259   sg_size_t size, *psize;
260   psize = (sg_size_t*) xbt_dict_get_or_null(p_content, path);
261   // if file does not exist create an empty file
262   if(psize)
263     size = *psize;
264   else {
265     psize = xbt_new(sg_size_t,1);
266     size = 0;
267     *psize = size;
268     xbt_dict_set(p_content, path, psize, NULL);
269     XBT_DEBUG("File '%s' was not found, file created.",path);
270   }
271   surf_file_t file = xbt_new0(s_surf_file_t,1);
272   file->name = xbt_strdup(path);
273   file->size = size;
274   file->mount = xbt_strdup(mount);
275   file->current_position = 0;
276
277   StorageAction *action = new StorageN11Action(getModel(), 0, getState() != SURF_RESOURCE_ON, this, OPEN);
278   action->p_file = file;
279
280   return action;
281 }
282
283 StorageAction *StorageN11::close(surf_file_t fd)
284 {
285   char *filename = fd->name;
286   XBT_DEBUG("\tClose file '%s' size '%llu'", filename, fd->size);
287   // unref write actions from storage
288   void *_write_action;
289   StorageAction *write_action;
290   unsigned int i;
291   xbt_dynar_foreach(p_writeActions, i, _write_action) {
292         write_action = static_cast<StorageAction*>(_write_action);
293     if ((write_action->p_file) == fd) {
294       xbt_dynar_cursor_rm(p_writeActions, &i);
295       write_action->unref();
296     }
297   }
298   free(fd->name);
299   free(fd->mount);
300   xbt_free(fd);
301   StorageAction *action = new StorageN11Action(getModel(), 0, getState() != SURF_RESOURCE_ON, this, CLOSE);
302   return action;
303 }
304
305 StorageAction *StorageN11::read(surf_file_t fd, sg_size_t size)
306 {
307   if(fd->current_position + size > fd->size){
308     if (fd->current_position > fd->size){
309       size = 0;
310     } else {
311       size = fd->size - fd->current_position;
312     }
313     fd->current_position = fd->size;
314   }
315   else
316     fd->current_position += size;
317
318   StorageAction *action = new StorageN11Action(getModel(), size, getState() != SURF_RESOURCE_ON, this, READ);
319   return action;
320 }
321
322 StorageAction *StorageN11::write(surf_file_t fd, sg_size_t size)
323 {
324   char *filename = fd->name;
325   XBT_DEBUG("\tWrite file '%s' size '%llu/%llu'",filename,size,fd->size);
326
327   StorageAction *action = new StorageN11Action(getModel(), size, getState() != SURF_RESOURCE_ON, this, WRITE);
328   action->p_file = fd;
329   /* Substract the part of the file that might disappear from the used sized on
330    * the storage element */
331   m_usedSize -= (fd->size - fd->current_position);
332   // If the storage is full before even starting to write
333   if(m_usedSize==m_size) {
334     action->setState(SURF_ACTION_FAILED);
335   }
336   return action;
337 }
338
339 /**********
340  * Action *
341  **********/
342
343 StorageN11Action::StorageN11Action(Model *model, double cost, bool failed, Storage *storage, e_surf_action_storage_type_t type)
344   : StorageAction(model, cost, failed,
345                       lmm_variable_new(model->getMaxminSystem(), this, 1.0, -1.0 , 3),
346                       storage, type) {
347   XBT_IN("(%s,%g", storage->getName(), cost);
348
349   // Must be less than the max bandwidth for all actions
350   lmm_expand(model->getMaxminSystem(), storage->getConstraint(), getVariable(), 1.0);
351   switch(type) {
352   case OPEN:
353   case CLOSE:
354   case STAT:
355     break;
356   case READ:
357     lmm_expand(model->getMaxminSystem(), storage->p_constraintRead,
358                getVariable(), 1.0);
359     break;
360   case WRITE:
361     lmm_expand(model->getMaxminSystem(), storage->p_constraintWrite,
362                getVariable(), 1.0);
363
364 //TODO there is something annoying with what's below. Have to sort it out...
365 //    Action *action = this;
366 //    xbt_dynar_push(storage->p_writeActions, &action);
367 //    ref();
368     break;
369   }
370   XBT_OUT();
371 }
372
373 int StorageN11Action::unref()
374 {
375   m_refcount--;
376   if (!m_refcount) {
377         if (action_hook.is_linked())
378           p_stateSet->erase(p_stateSet->iterator_to(*this));
379     if (getVariable())
380       lmm_variable_free(getModel()->getMaxminSystem(), getVariable());
381     xbt_free(getCategory());
382     delete this;
383     return 1;
384   }
385   return 0;
386 }
387
388 void StorageN11Action::cancel()
389 {
390   setState(SURF_ACTION_FAILED);
391   return;
392 }
393
394 void StorageN11Action::suspend()
395 {
396   XBT_IN("(%p)", this);
397   if (m_suspended != 2) {
398     lmm_update_variable_weight(getModel()->getMaxminSystem(),
399                                getVariable(),
400                                0.0);
401     m_suspended = 1;
402   }
403   XBT_OUT();
404 }
405
406 void StorageN11Action::resume()
407 {
408   THROW_UNIMPLEMENTED;
409 }
410
411 bool StorageN11Action::isSuspended()
412 {
413   return m_suspended == 1;
414 }
415
416 void StorageN11Action::setMaxDuration(double /*duration*/)
417 {
418   THROW_UNIMPLEMENTED;
419 }
420
421 void StorageN11Action::setPriority(double /*priority*/)
422 {
423   THROW_UNIMPLEMENTED;
424 }
425