From 745934a3a7f8924055f3ecdb3491884dd60d194a Mon Sep 17 00:00:00 2001 From: Frederic Suter Date: Thu, 30 Nov 2017 23:01:23 +0100 Subject: [PATCH] the plugin is now complete \o/ --- ChangeLog | 2 +- include/simgrid/msg.h | 3 - include/simgrid/plugins/file_system.h | 4 + src/msg/msg_io.cpp | 93 ---------------------- src/plugins/file_system/s4u_FileSystem.cpp | 58 +++++++++++++- 5 files changed, 60 insertions(+), 100 deletions(-) diff --git a/ChangeLog b/ChangeLog index b6d0e20465..22c53c2315 100644 --- a/ChangeLog +++ b/ChangeLog @@ -25,7 +25,7 @@ SimGrid (3.18) NOT RELEASED YET (target: December 24 2017) PLUGINS: - New link_energy plugin for the consumption of the links. - - Most of the operations on files and storage contents have been + - All of the operations on files and storage contents have been packaged into a plugin (src/plugins/file_system). The current public interface can be found in include/simgrid/plugins/file_system.h diff --git a/include/simgrid/msg.h b/include/simgrid/msg.h index e84fc8d911..068e469e54 100644 --- a/include/simgrid/msg.h +++ b/include/simgrid/msg.h @@ -216,9 +216,6 @@ XBT_ATTRIB_DEPRECATED_v319("Use MSG_zone_get_hosts() instead: v3.19 will remove return res; } -/************************** File handling ***********************************/ -XBT_PUBLIC(sg_size_t) MSG_file_read(msg_file_t fd, sg_size_t size); -XBT_PUBLIC(sg_size_t) MSG_file_write(msg_file_t fd, sg_size_t size); /************************** Storage handling ***********************************/ XBT_PUBLIC(const char *) MSG_storage_get_name(msg_storage_t storage); XBT_PUBLIC(msg_storage_t) MSG_storage_get_by_name(const char *name); diff --git a/include/simgrid/plugins/file_system.h b/include/simgrid/plugins/file_system.h index 2430be6cd4..4d7390dec4 100644 --- a/include/simgrid/plugins/file_system.h +++ b/include/simgrid/plugins/file_system.h @@ -14,6 +14,8 @@ SG_BEGIN_DECL() XBT_PUBLIC(void) sg_storage_file_system_init(); XBT_PUBLIC(sg_file_t) sg_file_open(const char* fullpath, void* data); +XBT_PUBLIC(sg_size_t) sg_file_read(sg_file_t fd, sg_size_t size); +XBT_PUBLIC(sg_size_t) sg_file_write(sg_file_t fd, sg_size_t size); XBT_PUBLIC(void) sg_file_close(sg_file_t fd); XBT_PUBLIC(const char*) sg_file_get_name(sg_file_t fd); @@ -38,6 +40,8 @@ XBT_PUBLIC(xbt_dict_t) sg_storage_get_content(sg_storage_t storage); XBT_PUBLIC(xbt_dict_t) sg_host_get_storage_content(sg_host_t host); #define MSG_file_open(fullpath, data) sg_file_open(fullpath, data) +#define MSG_file_read(fd, size) sg_file_read(fd, size) +#define MSG_file_write(fd, size) sg_file_write(fd, size) #define MSG_file_close(fd) sg_file_close(fd) #define MSG_file_get_name(fd) sg_file_get_name(fd) #define MSG_file_get_size(fd) sg_file_get_size(fd) diff --git a/src/msg/msg_io.cpp b/src/msg/msg_io.cpp index 8a8e8fba30..aff1939d78 100644 --- a/src/msg/msg_io.cpp +++ b/src/msg/msg_io.cpp @@ -13,99 +13,6 @@ XBT_LOG_NEW_DEFAULT_SUBCATEGORY(msg_io, msg, "Logging specific to MSG (io)"); extern "C" { -/** @addtogroup msg_file - * (#msg_file_t) and the functions for managing it. - * - * \see #msg_file_t - */ - -/** \ingroup msg_file - * \brief Read a file (local or remote) - * - * \param size of the file to read - * \param fd is a the file descriptor - * \return the number of bytes successfully read or -1 if an error occurred - */ -sg_size_t MSG_file_read(msg_file_t fd, sg_size_t size) -{ - sg_size_t read_size; - - if (fd->size() == 0) /* Nothing to read, return */ - return 0; - - /* Find the host where the file is physically located and read it */ - msg_storage_t storage_src = fd->localStorage; - msg_host_t attached_host = storage_src->getHost(); - read_size = fd->read(size); - - if (strcmp(attached_host->getCname(), MSG_host_self()->getCname())) { - /* the file is hosted on a remote host, initiate a communication between src and dest hosts for data transfer */ - XBT_DEBUG("File is on %s remote host, initiate data transfer of %llu bytes.", attached_host->getCname(), read_size); - msg_host_t m_host_list[] = {MSG_host_self(), attached_host}; - double flops_amount[] = {0, 0}; - double bytes_amount[] = {0, 0, static_cast(read_size), 0}; - - msg_task_t task = MSG_parallel_task_create("file transfer for read", 2, m_host_list, flops_amount, bytes_amount, - nullptr); - msg_error_t transfer = MSG_parallel_task_execute(task); - MSG_task_destroy(task); - - if(transfer != MSG_OK){ - if (transfer == MSG_HOST_FAILURE) - XBT_WARN("Transfer error, %s remote host just turned off!", attached_host->getCname()); - if (transfer == MSG_TASK_CANCELED) - XBT_WARN("Transfer error, task has been canceled!"); - - return -1; - } - } - return read_size; -} - -/** \ingroup msg_file - * \brief Write into a file (local or remote) - * - * \param size of the file to write - * \param fd is a the file descriptor - * \return the number of bytes successfully write or -1 if an error occurred - */ -sg_size_t MSG_file_write(msg_file_t fd, sg_size_t size) -{ - if (size == 0) /* Nothing to write, return */ - return 0; - - /* Find the host where the file is physically located (remote or local)*/ - msg_storage_t storage_src = fd->localStorage; - msg_host_t attached_host = storage_src->getHost(); - - if (strcmp(attached_host->getCname(), MSG_host_self()->getCname())) { - /* the file is hosted on a remote host, initiate a communication between src and dest hosts for data transfer */ - XBT_DEBUG("File is on %s remote host, initiate data transfer of %llu bytes.", attached_host->getCname(), size); - msg_host_t m_host_list[] = {MSG_host_self(), attached_host}; - double flops_amount[] = {0, 0}; - double bytes_amount[] = {0, static_cast(size), 0, 0}; - - msg_task_t task = MSG_parallel_task_create("file transfer for write", 2, m_host_list, flops_amount, bytes_amount, - nullptr); - msg_error_t transfer = MSG_parallel_task_execute(task); - MSG_task_destroy(task); - - if(transfer != MSG_OK){ - if (transfer == MSG_HOST_FAILURE) - XBT_WARN("Transfer error, %s remote host just turned off!", attached_host->getCname()); - if (transfer == MSG_TASK_CANCELED) - XBT_WARN("Transfer error, task has been canceled!"); - - return -1; - } - } - /* Write file on local or remote host */ - sg_size_t write_size = fd->write(size); - - return write_size; -} - - /********************************* Storage **************************************/ /** @addtogroup msg_storage_management * (#msg_storage_t) and the functions for managing it. diff --git a/src/plugins/file_system/s4u_FileSystem.cpp b/src/plugins/file_system/s4u_FileSystem.cpp index 8f71c8bf4e..2fc34265c9 100644 --- a/src/plugins/file_system/s4u_FileSystem.cpp +++ b/src/plugins/file_system/s4u_FileSystem.cpp @@ -96,16 +96,53 @@ void File::dump() sg_size_t File::read(sg_size_t size) { + if (size_ == 0) /* Nothing to read, return */ + return 0; + /* Find the host where the file is physically located and read it */ + Host* host = localStorage->getHost(); XBT_DEBUG("READ %s on disk '%s'", getPath(), localStorage->getCname()); // if the current position is close to the end of the file, we may not be able to read the requested size sg_size_t read_size = localStorage->read(std::min(size, size_ - current_position_)); current_position_ += read_size; + + if (strcmp(host->getCname(), Host::current()->getCname())) { + /* the file is hosted on a remote host, initiate a communication between src and dest hosts for data transfer */ + XBT_DEBUG("File is on %s remote host, initiate data transfer of %llu bytes.", host->getCname(), read_size); + Host* m_host_list[] = {Host::current(), host}; + double* flops_amount = new double[2]{0, 0}; + double* bytes_amount = new double[4]{0, 0, static_cast(read_size), 0}; + + this_actor::parallel_execute(2, m_host_list, flops_amount, bytes_amount); + } + return read_size; } +/** \brief Write into a file (local or remote) + * + * \param size of the file to write + * \param fd is a the file descriptor + * \return the number of bytes successfully write or -1 if an error occurred + */ sg_size_t File::write(sg_size_t size) { + if (size == 0) /* Nothing to write, return */ + return 0; + + /* Find the host where the file is physically located (remote or local)*/ + Host* host = localStorage->getHost(); + + if (strcmp(host->getCname(), Host::current()->getCname())) { + /* the file is hosted on a remote host, initiate a communication between src and dest hosts for data transfer */ + XBT_DEBUG("File is on %s remote host, initiate data transfer of %llu bytes.", host->getCname(), size); + Host* m_host_list[] = {Host::current(), host}; + double* flops_amount = new double[2]{0, 0}; + double* bytes_amount = new double[4]{0, static_cast(size), 0, 0}; + + this_actor::parallel_execute(2, m_host_list, flops_amount, bytes_amount); + } + XBT_DEBUG("WRITE %s on disk '%s'. size '%llu/%llu'", getPath(), localStorage->getCname(), size, size_); // If the storage is full before even starting to write if (sg_storage_get_size_used(localStorage) >= sg_storage_get_size(localStorage)) @@ -203,7 +240,10 @@ int File::remoteCopy(sg_host_t host, const char* fullpath) Storage* storage_src = localStorage; Host* src_host = storage_src->getHost(); seek(0, SEEK_SET); - sg_size_t read_size = read(size_); + XBT_DEBUG("READ %s on disk '%s'", getPath(), localStorage->getCname()); + // if the current position is close to the end of the file, we may not be able to read the requested size + sg_size_t read_size = localStorage->read(size_); + current_position_ += read_size; /* Find the host that owns the storage where the file has to be copied */ Storage* storage_dest = nullptr; @@ -229,7 +269,7 @@ int File::remoteCopy(sg_host_t host, const char* fullpath) XBT_DEBUG("Initiate data transfer of %llu bytes between %s and %s.", read_size, src_host->getCname(), storage_dest->getHost()->getCname()); - sg_host_t m_host_list[] = {src_host, dst_host}; + Host* m_host_list[] = {src_host, dst_host}; double* flops_amount = new double[2]{0, 0}; double* bytes_amount = new double[4]{0, static_cast(read_size), 0, 0}; @@ -237,7 +277,9 @@ int File::remoteCopy(sg_host_t host, const char* fullpath) /* Create file on remote host, write it and close it */ File* fd = new File(fullpath, dst_host, nullptr); - fd->write(read_size); + sg_size_t write_size = fd->localStorage->write(read_size); + fd->localStorage->extension()->incrUsedSize(write_size); + (*(fd->localStorage->extension()->getContent()))[path_] = size_; delete fd; return 0; } @@ -329,6 +371,16 @@ sg_file_t sg_file_open(const char* fullpath, void* data) return new simgrid::s4u::File(fullpath, data); } +sg_size_t sg_file_read(sg_file_t fd, sg_size_t size) +{ + return fd->read(size); +} + +sg_size_t sg_file_write(sg_file_t fd, sg_size_t size) +{ + return fd->write(size); +} + void sg_file_close(sg_file_t fd) { delete fd; -- 2.20.1