Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Restructure surf++ storage
[simgrid.git] / src / surf / storage_n11.cpp
1 #include "storage_n11.hpp"
2 #include "surf_private.h"
3
4 #define __STDC_FORMAT_MACROS
5
6 XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(surf_storage);
7
8
9 static int storage_selective_update = 0;
10 static xbt_swag_t storage_running_action_set_that_does_not_need_being_checked = NULL;
11
12 /*************
13  * CallBacks *
14  *************/
15
16 static XBT_INLINE void routing_storage_type_free(void *r)
17 {
18   storage_type_t stype = (storage_type_t) r;
19   free(stype->model);
20   free(stype->type_id);
21   free(stype->content);
22   free(stype->content_type);
23   xbt_dict_free(&(stype->properties));
24   xbt_dict_free(&(stype->properties));
25   free(stype);
26 }
27
28 static XBT_INLINE void surf_storage_resource_free(void *r)
29 {
30   // specific to storage
31   StoragePtr storage = dynamic_cast<StoragePtr>(static_cast<ResourcePtr>(r));
32   xbt_dict_free(&storage->p_content);
33   xbt_dynar_free(&storage->p_writeActions);
34   free(storage->p_typeId);
35   free(storage->p_contentType);
36   // generic resource
37   delete storage;
38 }
39
40 static XBT_INLINE void routing_storage_host_free(void *r)
41 {
42   xbt_dynar_t dyn = (xbt_dynar_t) r;
43   xbt_dynar_free(&dyn);
44 }
45
46 static void parse_storage_init(sg_platf_storage_cbarg_t storage)
47 {
48   void* stype = xbt_lib_get_or_null(storage_type_lib,
49                                     storage->type_id,
50                                     ROUTING_STORAGE_TYPE_LEVEL);
51   if(!stype) xbt_die("No storage type '%s'",storage->type_id);
52
53   // if storage content is not specified use the content of storage_type if exist
54   if(!strcmp(storage->content,"") && strcmp(((storage_type_t) stype)->content,"")){
55     storage->content = ((storage_type_t) stype)->content;
56     storage->content_type = ((storage_type_t) stype)->content_type;
57     XBT_DEBUG("For disk '%s' content is empty, inherit the content (of type %s) from storage type '%s' ",
58         storage->id,((storage_type_t) stype)->content_type,
59         ((storage_type_t) stype)->type_id);
60   }
61
62   XBT_DEBUG("SURF storage create resource\n\t\tid '%s'\n\t\ttype '%s' "
63       "\n\t\tmodel '%s' \n\t\tcontent '%s'\n\t\tcontent_type '%s' "
64       "\n\t\tproperties '%p'\n",
65       storage->id,
66       ((storage_type_t) stype)->model,
67       ((storage_type_t) stype)->type_id,
68       storage->content,
69       storage->content_type,
70       ((storage_type_t) stype)->properties);
71
72   surf_storage_model->createResource(storage->id,
73                                      ((storage_type_t) stype)->type_id,
74                                      storage->content,
75                                      storage->content_type,
76                                      storage->properties);
77 }
78
79 static void parse_mstorage_init(sg_platf_mstorage_cbarg_t /*mstorage*/)
80 {
81   XBT_DEBUG("parse_mstorage_init");
82 }
83
84 static void parse_storage_type_init(sg_platf_storage_type_cbarg_t /*storagetype_*/)
85 {
86   XBT_DEBUG("parse_storage_type_init");
87 }
88
89 static void parse_mount_init(sg_platf_mount_cbarg_t /*mount*/)
90 {
91   XBT_DEBUG("parse_mount_init");
92 }
93
94 static void storage_parse_storage(sg_platf_storage_cbarg_t storage)
95 {
96   xbt_assert(!xbt_lib_get_or_null(storage_lib, storage->id,ROUTING_STORAGE_LEVEL),
97                "Reading a storage, processing unit \"%s\" already exists", storage->id);
98
99   // Verification of an existing type_id
100 #ifndef NDEBUG
101   void* storage_type = xbt_lib_get_or_null(storage_type_lib, storage->type_id,ROUTING_STORAGE_TYPE_LEVEL);
102 #endif
103   xbt_assert(storage_type,"Reading a storage, type id \"%s\" does not exists", storage->type_id);
104
105   XBT_DEBUG("ROUTING Create a storage name '%s' with type_id '%s' and content '%s'",
106       storage->id,
107       storage->type_id,
108       storage->content);
109
110   xbt_lib_set(storage_lib,
111       storage->id,
112       ROUTING_STORAGE_LEVEL,
113       (void *) xbt_strdup(storage->type_id));
114 }
115
116 static void storage_parse_storage_type(sg_platf_storage_type_cbarg_t storage_type)
117 {
118   xbt_assert(!xbt_lib_get_or_null(storage_type_lib, storage_type->id,ROUTING_STORAGE_TYPE_LEVEL),
119                "Reading a storage type, processing unit \"%s\" already exists", storage_type->id);
120
121   storage_type_t stype = xbt_new0(s_storage_type_t, 1);
122   stype->model = xbt_strdup(storage_type->model);
123   stype->properties = storage_type->properties;
124   stype->content = xbt_strdup(storage_type->content);
125   stype->content_type = xbt_strdup(storage_type->content_type);
126   stype->type_id = xbt_strdup(storage_type->id);
127   stype->size = storage_type->size;
128
129   XBT_DEBUG("ROUTING Create a storage type id '%s' with model '%s', "
130       "content '%s', and content_type '%s'",
131       stype->type_id,
132       stype->model,
133       storage_type->content,
134       storage_type->content_type);
135
136   xbt_lib_set(storage_type_lib,
137       stype->type_id,
138       ROUTING_STORAGE_TYPE_LEVEL,
139       (void *) stype);
140 }
141
142 static void storage_parse_mstorage(sg_platf_mstorage_cbarg_t /*mstorage*/)
143 {
144   THROW_UNIMPLEMENTED;
145 //  mount_t mnt = xbt_new0(s_mount_t, 1);
146 //  mnt->id = xbt_strdup(mstorage->type_id);
147 //  mnt->name = xbt_strdup(mstorage->name);
148 //
149 //  if(!mount_list){
150 //    XBT_DEBUG("Creata a Mount list for %s",A_surfxml_host_id);
151 //    mount_list = xbt_dynar_new(sizeof(char *), NULL);
152 //  }
153 //  xbt_dynar_push(mount_list,(void *) mnt);
154 //  free(mnt->id);
155 //  free(mnt->name);
156 //  xbt_free(mnt);
157 //  XBT_DEBUG("ROUTING Mount a storage name '%s' with type_id '%s'",mstorage->name, mstorage->id);
158 }
159
160 static void mount_free(void *p)
161 {
162   mount_t mnt = (mount_t) p;
163   xbt_free(mnt->name);
164 }
165
166 static void storage_parse_mount(sg_platf_mount_cbarg_t mount)
167 {
168   // Verification of an existing storage
169 #ifndef NDEBUG
170   void* storage = xbt_lib_get_or_null(storage_lib, mount->storageId, ROUTING_STORAGE_LEVEL);
171 #endif
172   xbt_assert(storage,"Disk id \"%s\" does not exists", mount->storageId);
173
174   XBT_DEBUG("ROUTING Mount '%s' on '%s'",mount->storageId, mount->name);
175
176   s_mount_t mnt;
177   mnt.storage = surf_storage_resource_priv(surf_storage_resource_by_name(mount->storageId));
178   mnt.name = xbt_strdup(mount->name);
179
180   if(!mount_list){
181     XBT_DEBUG("Create a Mount list for %s",A_surfxml_host_id);
182     mount_list = xbt_dynar_new(sizeof(s_mount_t), mount_free);
183   }
184   xbt_dynar_push(mount_list, &mnt);
185 }
186
187 static void storage_define_callbacks()
188 {
189   sg_platf_storage_add_cb(parse_storage_init);
190   sg_platf_storage_type_add_cb(parse_storage_type_init);
191   sg_platf_mstorage_add_cb(parse_mstorage_init);
192   sg_platf_mount_add_cb(parse_mount_init);
193 }
194
195 void storage_register_callbacks() {
196
197   ROUTING_STORAGE_LEVEL = xbt_lib_add_level(storage_lib,xbt_free);
198   ROUTING_STORAGE_HOST_LEVEL = xbt_lib_add_level(storage_lib, routing_storage_host_free);
199   ROUTING_STORAGE_TYPE_LEVEL = xbt_lib_add_level(storage_type_lib, routing_storage_type_free);
200   SURF_STORAGE_LEVEL = xbt_lib_add_level(storage_lib, surf_storage_resource_free);
201
202   sg_platf_storage_add_cb(storage_parse_storage);
203   sg_platf_mstorage_add_cb(storage_parse_mstorage);
204   sg_platf_storage_type_add_cb(storage_parse_storage_type);
205   sg_platf_mount_add_cb(storage_parse_mount);
206 }
207
208 /*********
209  * Model *
210  *********/
211
212 void surf_storage_model_init_default(void)
213 {
214   surf_storage_model = new StorageN11Model();
215   storage_define_callbacks();
216   xbt_dynar_push(model_list, &surf_storage_model);
217 }
218
219 StorageN11Model::StorageN11Model() : StorageModel() {
220   StorageN11ActionLmm action;
221
222   XBT_DEBUG("surf_storage_model_init_internal");
223
224   storage_running_action_set_that_does_not_need_being_checked =
225       xbt_swag_new(xbt_swag_offset(action, p_stateHookup));
226
227   if (!p_maxminSystem) {
228     p_maxminSystem = lmm_system_new(storage_selective_update);
229   }
230 }
231
232 StorageN11Model::~StorageN11Model(){
233   xbt_swag_free(storage_running_action_set_that_does_not_need_being_checked);
234   storage_running_action_set_that_does_not_need_being_checked = NULL;
235 }
236
237 StoragePtr StorageN11Model::createResource(const char* id, const char* type_id,
238                 const char* content_name, const char* content_type, xbt_dict_t properties)
239 {
240
241   xbt_assert(!surf_storage_resource_priv(surf_storage_resource_by_name(id)),
242               "Storage '%s' declared several times in the platform file",
243               id);
244
245   storage_type_t storage_type = (storage_type_t) xbt_lib_get_or_null(storage_type_lib, type_id,ROUTING_STORAGE_TYPE_LEVEL);
246
247   double Bread  = surf_parse_get_bandwidth((char*)xbt_dict_get(storage_type->properties, "Bread"));
248   double Bwrite = surf_parse_get_bandwidth((char*)xbt_dict_get(storage_type->properties, "Bwrite"));
249   double Bconnection   = surf_parse_get_bandwidth((char*)xbt_dict_get(storage_type->properties, "Bconnection"));
250
251   StoragePtr storage = new StorageN11Lmm(this, id, properties, p_maxminSystem,
252                   Bread, Bwrite, Bconnection,
253                   type_id, (char *)content_name, xbt_strdup(content_type), storage_type->size);
254
255   xbt_lib_set(storage_lib, id, SURF_STORAGE_LEVEL, static_cast<ResourcePtr>(storage));
256
257   XBT_DEBUG("SURF storage create resource\n\t\tid '%s'\n\t\ttype '%s' \n\t\tmodel '%s' \n\t\tproperties '%p'\n\t\tBread '%f'\n",
258       id,
259       this,
260       type_id,
261       storage_type->properties,
262       Bread);
263
264   if(!p_storageList)
265         p_storageList = xbt_dynar_new(sizeof(char *),NULL);
266   xbt_dynar_push(p_storageList, &storage);
267
268   return storage;
269 }
270
271 double StorageN11Model::shareResources(double now)
272 {
273   XBT_DEBUG("storage_share_resources %f", now);
274   unsigned int i, j;
275   StoragePtr storage;
276   void *_write_action;
277   StorageActionLmmPtr write_action;
278
279   double min_completion = shareResourcesMaxMin(p_runningActionSet,
280       p_maxminSystem, lmm_solve);
281
282   double rate;
283   // Foreach disk
284   xbt_dynar_foreach(p_storageList,i,storage)
285   {
286     rate = 0;
287     // Foreach write action on disk
288     xbt_dynar_foreach(storage->p_writeActions, j, _write_action)
289     {
290       write_action = dynamic_cast<StorageActionLmmPtr>(static_cast<ActionPtr>(_write_action));
291       rate += lmm_variable_getvalue(write_action->p_variable);
292     }
293     if(rate > 0)
294       min_completion = MIN(min_completion, (storage->m_size-storage->m_usedSize)/rate);
295   }
296
297   return min_completion;
298 }
299
300 void StorageN11Model::updateActionsState(double /*now*/, double delta)
301 {
302   void *_action, *_next_action;
303   StorageActionLmmPtr action = NULL;
304
305   xbt_swag_foreach_safe(_action, _next_action, p_runningActionSet) {
306         action = dynamic_cast<StorageActionLmmPtr>(static_cast<ActionPtr>(_action));
307     if(action->m_type == WRITE)
308     {
309       // Update the disk usage
310      // Update the file size
311      // For each action of type write
312       double rate = lmm_variable_getvalue(action->p_variable);
313       /* Hack to avoid rounding differences between x86 and x86_64
314        * (note that the next sizes are of type sg_size_t). */
315       long incr = delta * rate + MAXMIN_PRECISION;
316       action->p_storage->m_usedSize += (incr - action->p_file->size); // disk usage
317       action->p_file->size = incr; // file size
318
319       sg_size_t *psize = xbt_new(sg_size_t,1);
320       *psize = action->p_file->size;
321
322       xbt_dict_t content_dict = action->p_storage->p_content;
323       xbt_dict_set(content_dict, action->p_file->name, psize, NULL);
324     }
325
326     double_update(&action->m_remains,
327                   lmm_variable_getvalue(action->p_variable) * delta);
328
329     if (action->m_maxDuration != NO_MAX_DURATION)
330       double_update(&action->m_maxDuration, delta);
331
332     if(action->m_remains > 0 &&
333         lmm_get_variable_weight(action->p_variable) > 0 &&
334         action->p_storage->m_usedSize == action->p_storage->m_size)
335     {
336       action->m_finish = surf_get_clock();
337       action->setState(SURF_ACTION_FAILED);
338     } else if ((action->m_remains <= 0) &&
339         (lmm_get_variable_weight(action->p_variable) > 0))
340     {
341       action->m_finish = surf_get_clock();
342       action->setState(SURF_ACTION_DONE);
343     } else if ((action->m_maxDuration != NO_MAX_DURATION) &&
344                (action->m_maxDuration <= 0))
345     {
346       action->m_finish = surf_get_clock();
347       action->setState(SURF_ACTION_DONE);
348     }
349   }
350
351   return;
352 }
353
354 /************
355  * Resource *
356  ************/
357
358 StorageN11Lmm::StorageN11Lmm(StorageModelPtr model, const char* name, xbt_dict_t properties,
359              lmm_system_t maxminSystem, double bread, double bwrite, double bconnection,
360              const char* type_id, char *content_name, char *content_type, sg_size_t size)
361  :  Resource(model, name, properties),
362     StorageLmm(maxminSystem, bread, bwrite, bconnection, type_id, content_name, content_type, size) {
363   XBT_DEBUG("Create resource with Bconnection '%f' Bread '%f' Bwrite '%f' and Size '%llu'", bconnection, bread, bwrite, size);
364
365   p_stateCurrent = SURF_RESOURCE_ON;
366   m_usedSize = 0;
367   m_size = 0;
368
369   p_content = parseContent(content_name);
370   p_contentType = content_type;
371   p_constraint = lmm_constraint_new(maxminSystem, this, bconnection);
372   p_constraintRead  = lmm_constraint_new(maxminSystem, this, bread);
373   p_constraintWrite = lmm_constraint_new(maxminSystem, this, bwrite);
374   m_size = size;
375   p_typeId = xbt_strdup(type_id);
376 }
377
378 StorageActionPtr StorageN11Lmm::ls(const char* path)
379 {
380   StorageActionLmmPtr action = new StorageN11ActionLmm(p_model, 0, p_stateCurrent != SURF_RESOURCE_ON, this, LS);
381
382   action->p_lsDict = NULL;
383   xbt_dict_t ls_dict = xbt_dict_new_homogeneous(xbt_free);
384
385   char* key;
386   sg_size_t size = 0;
387   xbt_dict_cursor_t cursor = NULL;
388
389   xbt_dynar_t dyn = NULL;
390   char* file = NULL;
391
392   // for each file in the storage content
393   xbt_dict_foreach(p_content,cursor,key,size){
394     // Search if file start with the prefix 'path'
395     if(xbt_str_start_with(key,path)){
396       file = &key[strlen(path)];
397
398       // Split file with '/'
399       dyn = xbt_str_split(file,"/");
400       file = xbt_dynar_get_as(dyn,0,char*);
401
402       // file
403       if(xbt_dynar_length(dyn) == 1){
404         sg_size_t *psize = xbt_new(sg_size_t, 1);
405         *psize=size;
406         xbt_dict_set(ls_dict, file, psize, NULL);
407       }
408       // Directory
409       else
410       {
411         // if directory does not exist yet in the dictionary
412         if(!xbt_dict_get_or_null(ls_dict,file))
413           xbt_dict_set(ls_dict,file,NULL,NULL);
414       }
415       xbt_dynar_free(&dyn);
416     }
417   }
418
419   action->p_lsDict = ls_dict;
420   return action;
421 }
422
423 StorageActionPtr StorageN11Lmm::open(const char* mount, const char* path)
424 {
425   XBT_DEBUG("\tOpen file '%s'",path);
426   sg_size_t size, *psize;
427   psize = (sg_size_t*) xbt_dict_get_or_null(p_content, path);
428   // if file does not exist create an empty file
429   if(psize)
430     size = *psize;
431   else {
432         psize = xbt_new(sg_size_t,1);
433     size = 0;
434     *psize = size;
435     xbt_dict_set(p_content, path, psize, NULL);
436     XBT_DEBUG("File '%s' was not found, file created.",path);
437   }
438   surf_file_t file = xbt_new0(s_surf_file_t,1);
439   file->name = xbt_strdup(path);
440   file->size = size;
441   file->mount = xbt_strdup(mount);
442   file->current_position = 0;
443
444   StorageActionLmmPtr action = new StorageN11ActionLmm(p_model, 0, p_stateCurrent != SURF_RESOURCE_ON, this, OPEN);
445   action->p_file = file;
446   return action;
447 }
448
449 StorageActionPtr StorageN11Lmm::close(surf_file_t fd)
450 {
451   char *filename = fd->name;
452   XBT_DEBUG("\tClose file '%s' size '%llu'", filename, fd->size);
453   // unref write actions from storage
454   void *_write_action;
455   StorageActionLmmPtr write_action;
456   unsigned int i;
457   xbt_dynar_foreach(p_writeActions, i, _write_action) {
458         write_action = dynamic_cast<StorageActionLmmPtr>(static_cast<ActionPtr>(_write_action));
459     if ((write_action->p_file) == fd) {
460       xbt_dynar_cursor_rm(p_writeActions, &i);
461       write_action->unref();
462     }
463   }
464   free(fd->name);
465   free(fd->mount);
466   xbt_free(fd);
467   StorageActionLmmPtr action = new StorageN11ActionLmm(p_model, 0, p_stateCurrent != SURF_RESOURCE_ON, this, CLOSE);
468   return action;
469 }
470
471 StorageActionPtr StorageN11Lmm::read(surf_file_t fd, sg_size_t size)
472 {
473   if(size > fd->size){
474     size = fd->size;
475     fd->current_position = fd->size;
476   }
477   else
478         fd->current_position += size;
479
480   StorageActionLmmPtr action = new StorageN11ActionLmm(p_model, size, p_stateCurrent != SURF_RESOURCE_ON, this, READ);
481   return action;
482 }
483
484 StorageActionPtr StorageN11Lmm::write(surf_file_t fd, sg_size_t size)
485 {
486   char *filename = fd->name;
487   XBT_DEBUG("\tWrite file '%s' size '%llu/%llu'",filename,size,fd->size);
488
489   StorageActionLmmPtr action = new StorageN11ActionLmm(p_model, size, p_stateCurrent != SURF_RESOURCE_ON, this, WRITE);
490   action->p_file = fd;
491   fd->current_position += size;
492   // If the storage is full
493   if(m_usedSize==m_size) {
494     action->setState(SURF_ACTION_FAILED);
495   }
496   return action;
497 }
498
499 void StorageN11Lmm::rename(const char *src, const char *dest)
500 {
501   sg_size_t *psize, *new_psize;
502   psize = (sg_size_t*) xbt_dict_get_or_null(p_content,src);
503   new_psize = xbt_new(sg_size_t, 1);
504   *new_psize = *psize;
505   if (psize){// src file exists
506     xbt_dict_remove(p_content, src);
507     xbt_dict_set(p_content, dest, new_psize,NULL);
508     XBT_DEBUG("Change file name from %s to %s, size '%llu'",src, dest, *psize);
509   }
510   else
511     XBT_DEBUG("File %s doesn't exist",src);
512 }
513
514 xbt_dict_t StorageN11Lmm::getContent()
515 {
516   /* For the moment this action has no cost, but in the future we could take in account access latency of the disk */
517   /*surf_action_t action = storage_action_execute(storage,0, LS);*/
518
519   xbt_dict_t content_dict = xbt_dict_new_homogeneous(NULL);
520   xbt_dict_cursor_t cursor = NULL;
521   char *file;
522   sg_size_t *psize;
523
524   xbt_dict_foreach(p_content, cursor, file, psize){
525     xbt_dict_set(content_dict,file,psize,NULL);
526   }
527   return content_dict;
528 }
529
530 sg_size_t StorageN11Lmm::getSize(){
531   return m_size;
532 }
533
534 /**********
535  * Action *
536  **********/
537
538 StorageN11ActionLmm::StorageN11ActionLmm(ModelPtr model, double cost, bool failed, StorageLmmPtr storage, e_surf_action_storage_type_t type)
539   : Action(model, cost, failed),
540     StorageActionLmm(storage, type) {
541   XBT_IN("(%s,%g", storage->m_name, cost);
542   p_variable = lmm_variable_new(p_model->p_maxminSystem, this, 1.0, -1.0 , 3);
543
544   // Must be less than the max bandwidth for all actions
545   lmm_expand(p_model->p_maxminSystem, storage->p_constraint, p_variable, 1.0);
546   switch(type) {
547   case OPEN:
548   case CLOSE:
549   case STAT:
550   case LS:
551     break;
552   case READ:
553     lmm_expand(p_model->p_maxminSystem, storage->p_constraintRead,
554                p_variable, 1.0);
555     break;
556   case WRITE:
557     lmm_expand(p_model->p_maxminSystem, storage->p_constraintWrite,
558                p_variable, 1.0);
559     ActionPtr action = this;
560     xbt_dynar_push(storage->p_writeActions, &action);
561     ref();
562     break;
563   }
564   XBT_OUT();
565 }
566
567 int StorageN11ActionLmm::unref()
568 {
569   m_refcount--;
570   if (!m_refcount) {
571     xbt_swag_remove(static_cast<ActionPtr>(this), p_stateSet);
572     if (p_variable)
573       lmm_variable_free(p_model->p_maxminSystem, p_variable);
574 #ifdef HAVE_TRACING
575     xbt_free(p_category);
576 #endif
577     delete this;
578     return 1;
579   }
580   return 0;
581 }
582
583 void StorageN11ActionLmm::cancel()
584 {
585   setState(SURF_ACTION_FAILED);
586   return;
587 }
588
589 void StorageN11ActionLmm::suspend()
590 {
591   XBT_IN("(%p)", this);
592   if (m_suspended != 2) {
593     lmm_update_variable_weight(p_model->p_maxminSystem,
594                                p_variable,
595                                0.0);
596     m_suspended = 1;
597   }
598   XBT_OUT();
599 }
600
601 void StorageN11ActionLmm::resume()
602 {
603   THROW_UNIMPLEMENTED;
604 }
605
606 bool StorageN11ActionLmm::isSuspended()
607 {
608   return m_suspended == 1;
609 }
610
611 void StorageN11ActionLmm::setMaxDuration(double /*duration*/)
612 {
613   THROW_UNIMPLEMENTED;
614 }
615
616 void StorageN11ActionLmm::setPriority(double /*priority*/)
617 {
618   THROW_UNIMPLEMENTED;
619 }
620