X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/blobdiff_plain/d8d9b58131bfeed28a5c1458ea2bee892121e3a6..b6f58f8266d8c00c00ccce419b7e08e8eb8006b9:/src/plugins/file_system/s4u_FileSystem.cpp diff --git a/src/plugins/file_system/s4u_FileSystem.cpp b/src/plugins/file_system/s4u_FileSystem.cpp index b5460bf847..bcb6879503 100644 --- a/src/plugins/file_system/s4u_FileSystem.cpp +++ b/src/plugins/file_system/s4u_FileSystem.cpp @@ -5,6 +5,7 @@ #include "xbt/log.h" +#include "simgrid/s4u/Actor.hpp" #include "simgrid/s4u/Host.hpp" #include "simgrid/s4u/Storage.hpp" #include "simgrid/simix.hpp" @@ -16,12 +17,15 @@ #include #include #include +#include XBT_LOG_NEW_DEFAULT_CATEGORY(s4u_file, "S4U files"); +int sg_storage_max_file_descriptors = 1024; namespace simgrid { namespace s4u { -simgrid::xbt::Extension FileSystemStorageExt::EXTENSION_ID; +simgrid::xbt::Extension FileSystemStorageExt::EXTENSION_ID; +simgrid::xbt::Extension FileDescriptorHostExt::EXTENSION_ID; File::File(std::string fullpath, void* userdata) : File(fullpath, Host::current(), userdata){}; @@ -50,6 +54,16 @@ File::File(std::string fullpath, sg_host_t host, void* userdata) : fullpath_(ful localStorage = st; + // assign a file descriptor id to the newly opened File + FileDescriptorHostExt* ext = host->extension(); + if (ext->file_descriptor_table == nullptr) { + ext->file_descriptor_table = new std::vector(sg_storage_max_file_descriptors); + std::iota(ext->file_descriptor_table->rbegin(), ext->file_descriptor_table->rend(), 0); // Fill with ..., 1, 0. + } + xbt_assert(not ext->file_descriptor_table->empty(), "Too much files are opened! Some have to be closed."); + desc_id = ext->file_descriptor_table->back(); + ext->file_descriptor_table->pop_back(); + XBT_DEBUG("\tOpen file '%s'", path_.c_str()); std::map* content = localStorage->extension()->getContent(); // if file does not exist create an empty file @@ -63,6 +77,11 @@ File::File(std::string fullpath, sg_host_t host, void* userdata) : fullpath_(ful } } +File::~File() +{ + Host::current()->extension()->file_descriptor_table->push_back(desc_id); +} + void File::dump() { XBT_INFO("File Descriptor information:\n" @@ -77,15 +96,53 @@ void File::dump() sg_size_t File::read(sg_size_t size) { + if (size_ == 0) /* Nothing to read, return */ + return 0; + + /* Find the host where the file is physically located and read it */ + Host* host = localStorage->getHost(); XBT_DEBUG("READ %s on disk '%s'", getPath(), localStorage->getCname()); // if the current position is close to the end of the file, we may not be able to read the requested size sg_size_t read_size = localStorage->read(std::min(size, size_ - current_position_)); current_position_ += read_size; + + if (strcmp(host->getCname(), Host::current()->getCname())) { + /* the file is hosted on a remote host, initiate a communication between src and dest hosts for data transfer */ + XBT_DEBUG("File is on %s remote host, initiate data transfer of %llu bytes.", host->getCname(), read_size); + Host* m_host_list[] = {Host::current(), host}; + double* flops_amount = new double[2]{0, 0}; + double* bytes_amount = new double[4]{0, 0, static_cast(read_size), 0}; + + this_actor::parallel_execute(2, m_host_list, flops_amount, bytes_amount); + } + return read_size; } +/** \brief Write into a file (local or remote) + * + * \param size of the file to write + * \param fd is a the file descriptor + * \return the number of bytes successfully write or -1 if an error occurred + */ sg_size_t File::write(sg_size_t size) { + if (size == 0) /* Nothing to write, return */ + return 0; + + /* Find the host where the file is physically located (remote or local)*/ + Host* host = localStorage->getHost(); + + if (strcmp(host->getCname(), Host::current()->getCname())) { + /* the file is hosted on a remote host, initiate a communication between src and dest hosts for data transfer */ + XBT_DEBUG("File is on %s remote host, initiate data transfer of %llu bytes.", host->getCname(), size); + Host* m_host_list[] = {Host::current(), host}; + double* flops_amount = new double[2]{0, 0}; + double* bytes_amount = new double[4]{0, static_cast(size), 0, 0}; + + this_actor::parallel_execute(2, m_host_list, flops_amount, bytes_amount); + } + XBT_DEBUG("WRITE %s on disk '%s'. size '%llu/%llu'", getPath(), localStorage->getCname(), size, size_); // If the storage is full before even starting to write if (sg_storage_get_size_used(localStorage) >= sg_storage_get_size(localStorage)) @@ -177,6 +234,63 @@ int File::unlink() } } +int File::remoteCopy(sg_host_t host, const char* fullpath) +{ + /* Find the host where the file is physically located and read it */ + Storage* storage_src = localStorage; + Host* src_host = storage_src->getHost(); + seek(0, SEEK_SET); + XBT_DEBUG("READ %s on disk '%s'", getPath(), localStorage->getCname()); + // if the current position is close to the end of the file, we may not be able to read the requested size + sg_size_t read_size = localStorage->read(size_); + current_position_ += read_size; + + /* Find the host that owns the storage where the file has to be copied */ + Storage* storage_dest = nullptr; + Host* dst_host; + size_t longest_prefix_length = 0; + + for (auto const& elm : host->getMountedStorages()) { + std::string mount_point = std::string(fullpath).substr(0, elm.first.size()); + if (mount_point == elm.first && elm.first.length() > longest_prefix_length) { + /* The current mount name is found in the full path and is bigger than the previous*/ + longest_prefix_length = elm.first.length(); + storage_dest = elm.second; + } + } + + if (storage_dest != nullptr) { + /* Mount point found, retrieve the host the storage is attached to */ + dst_host = storage_dest->getHost(); + } else { + XBT_WARN("Can't find mount point for '%s' on destination host '%s'", fullpath, host->getCname()); + return -1; + } + + XBT_DEBUG("Initiate data transfer of %llu bytes between %s and %s.", read_size, src_host->getCname(), + storage_dest->getHost()->getCname()); + Host* m_host_list[] = {src_host, dst_host}; + double* flops_amount = new double[2]{0, 0}; + double* bytes_amount = new double[4]{0, static_cast(read_size), 0, 0}; + + this_actor::parallel_execute(2, m_host_list, flops_amount, bytes_amount); + + /* Create file on remote host, write it and close it */ + File* fd = new File(fullpath, dst_host, nullptr); + sg_size_t write_size = fd->localStorage->write(read_size); + fd->localStorage->extension()->incrUsedSize(write_size); + (*(fd->localStorage->extension()->getContent()))[path_] = size_; + delete fd; + return 0; +} + +int File::remoteMove(sg_host_t host, const char* fullpath) +{ + int res = remoteCopy(host, fullpath); + unlink(); + return res; +} + FileSystemStorageExt::FileSystemStorageExt(simgrid::s4u::Storage* ptr) { content_ = parseContent(ptr->getImpl()->content_name); @@ -218,6 +332,7 @@ std::map* FileSystemStorageExt::parseContent(std::string } using simgrid::s4u::FileSystemStorageExt; +using simgrid::s4u::FileDescriptorHostExt; static void onStorageCreation(simgrid::s4u::Storage& st) { @@ -229,18 +344,46 @@ static void onStorageDestruction(simgrid::s4u::Storage& st) delete st.extension(); } +static void onHostCreation(simgrid::s4u::Host& host) +{ + host.extension_set(new FileDescriptorHostExt()); +} + /* **************************** Public interface *************************** */ SG_BEGIN_DECL() void sg_storage_file_system_init() { - if (FileSystemStorageExt::EXTENSION_ID.valid()) - return; + if (not FileSystemStorageExt::EXTENSION_ID.valid()) { + FileSystemStorageExt::EXTENSION_ID = simgrid::s4u::Storage::extension_create(); + simgrid::s4u::Storage::onCreation.connect(&onStorageCreation); + simgrid::s4u::Storage::onDestruction.connect(&onStorageDestruction); + } + + if (not FileDescriptorHostExt::EXTENSION_ID.valid()) { + FileDescriptorHostExt::EXTENSION_ID = simgrid::s4u::Host::extension_create(); + simgrid::s4u::Host::onCreation.connect(&onHostCreation); + } +} + +sg_file_t sg_file_open(const char* fullpath, void* data) +{ + return new simgrid::s4u::File(fullpath, data); +} + +sg_size_t sg_file_read(sg_file_t fd, sg_size_t size) +{ + return fd->read(size); +} - FileSystemStorageExt::EXTENSION_ID = simgrid::s4u::Storage::extension_create(); +sg_size_t sg_file_write(sg_file_t fd, sg_size_t size) +{ + return fd->write(size); +} - simgrid::s4u::Storage::onCreation.connect(&onStorageCreation); - simgrid::s4u::Storage::onDestruction.connect(&onStorageDestruction); +void sg_file_close(sg_file_t fd) +{ + delete fd; } const char* sg_file_get_name(sg_file_t fd) @@ -300,6 +443,30 @@ void sg_file_unlink(sg_file_t fd) delete fd; } +/** + * \brief Copy a file to another location on a remote host. + * \param file : the file to move + * \param host : the remote host where the file has to be copied + * \param fullpath : the complete path destination on the remote host + * \return If successful, the function returns 0. Otherwise, it returns -1. + */ +int sg_file_rcopy(sg_file_t file, sg_host_t host, const char* fullpath) +{ + return file->remoteCopy(host, fullpath); +} + +/** + * \brief Move a file to another location on a remote host. + * \param file : the file to move + * \param host : the remote host where the file has to be moved + * \param fullpath : the complete path destination on the remote host + * \return If successful, the function returns 0. Otherwise, it returns -1. + */ +int sg_file_rmove(sg_file_t file, sg_host_t host, const char* fullpath) +{ + return file->remoteMove(host, fullpath); +} + sg_size_t sg_storage_get_size_free(sg_storage_t st) { return st->extension()->getSize() - st->extension()->getUsedSize();