From 8f429af6d19d72c513cbed65a6b6797b16589ecb Mon Sep 17 00:00:00 2001 From: degomme Date: Mon, 15 Apr 2019 16:02:38 +0200 Subject: [PATCH] read, seek + init TODO: properly init plugin, only when needed. --- src/smpi/bindings/smpi_mpi.cpp | 4 +-- src/smpi/bindings/smpi_pmpi_file.cpp | 45 +++++++++++++++++++++++-- src/smpi/include/smpi_file.hpp | 4 ++- src/smpi/internals/instr_smpi.cpp | 4 ++- src/smpi/internals/smpi_global.cpp | 3 +- src/smpi/mpi/smpi_file.cpp | 49 +++++++++++++++++++++++++--- 6 files changed, 97 insertions(+), 12 deletions(-) diff --git a/src/smpi/bindings/smpi_mpi.cpp b/src/smpi/bindings/smpi_mpi.cpp index 0305b3958e..83386adba5 100644 --- a/src/smpi/bindings/smpi_mpi.cpp +++ b/src/smpi/bindings/smpi_mpi.cpp @@ -366,7 +366,7 @@ UNIMPLEMENTED_WRAPPED_PMPI_CALL(int, MPI_File_iread_at,(MPI_File fh, MPI_Offset UNIMPLEMENTED_WRAPPED_PMPI_CALL(int, MPI_File_iwrite_at,(MPI_File fh, MPI_Offset offset, void *buf,int count, MPI_Datatype datatype, MPI_Request *request), (fh, offset, buf, count, datatype, request)) UNIMPLEMENTED_WRAPPED_PMPI_CALL(int, MPI_File_iread_at_all,(MPI_File fh, MPI_Offset offset, void *buf, int count, MPI_Datatype datatype, MPI_Request *request), (fh, offset, buf, count, datatype, request)) UNIMPLEMENTED_WRAPPED_PMPI_CALL(int, MPI_File_iwrite_at_all,(MPI_File fh, MPI_Offset offset, void *buf,int count, MPI_Datatype datatype, MPI_Request *request), (fh, offset, buf, count, datatype, request)) -UNIMPLEMENTED_WRAPPED_PMPI_CALL(int, MPI_File_read,(MPI_File fh, void *buf, int count,MPI_Datatype datatype, MPI_Status *status), (fh, buf, count, datatype, status)) +WRAPPED_PMPI_CALL(int, MPI_File_read,(MPI_File fh, void *buf, int count,MPI_Datatype datatype, MPI_Status *status), (fh, buf, count, datatype, status)) UNIMPLEMENTED_WRAPPED_PMPI_CALL(int, MPI_File_read_all,(MPI_File fh, void *buf, int count, MPI_Datatype datatype, MPI_Status *status), (fh, buf, count, datatype, status)) UNIMPLEMENTED_WRAPPED_PMPI_CALL(int, MPI_File_write,(MPI_File fh, void *buf, int count, MPI_Datatype datatype, MPI_Status *status), (fh, buf, count, datatype, status)) UNIMPLEMENTED_WRAPPED_PMPI_CALL(int, MPI_File_write_all,(MPI_File fh, void *buf, int count,MPI_Datatype datatype, MPI_Status *status), (fh, buf, count, datatype, status)) @@ -374,7 +374,7 @@ UNIMPLEMENTED_WRAPPED_PMPI_CALL(int, MPI_File_iread,(MPI_File fh, void *buf, int UNIMPLEMENTED_WRAPPED_PMPI_CALL(int, MPI_File_iwrite,(MPI_File fh, void *buf, int count, MPI_Datatype datatype, MPI_Request *request), (fh, buf, count, datatype, request)) UNIMPLEMENTED_WRAPPED_PMPI_CALL(int, MPI_File_iread_all,(MPI_File fh, void *buf, int count, MPI_Datatype datatype, MPI_Request *request), (fh, buf, count, datatype, request)) UNIMPLEMENTED_WRAPPED_PMPI_CALL(int, MPI_File_iwrite_all,(MPI_File fh, void *buf, int count, MPI_Datatype datatype, MPI_Request *request), (fh, buf, count, datatype, request)) -UNIMPLEMENTED_WRAPPED_PMPI_CALL(int, MPI_File_seek,(MPI_File fh, MPI_Offset offset, int whenace), (fh, offset, whenace)) +WRAPPED_PMPI_CALL(int, MPI_File_seek,(MPI_File fh, MPI_Offset offset, int whenace), (fh, offset, whenace)) UNIMPLEMENTED_WRAPPED_PMPI_CALL(int, MPI_File_get_position,(MPI_File fh, MPI_Offset *offset), (fh, offset)) UNIMPLEMENTED_WRAPPED_PMPI_CALL(int, MPI_File_get_byte_offset,(MPI_File fh, MPI_Offset offset, MPI_Offset *disp), (fh, offset, disp)) UNIMPLEMENTED_WRAPPED_PMPI_CALL(int, MPI_File_read_shared,(MPI_File fh, void *buf, int count, MPI_Datatype datatype, MPI_Status *status), (fh, buf, count, datatype, status)) diff --git a/src/smpi/bindings/smpi_pmpi_file.cpp b/src/smpi/bindings/smpi_pmpi_file.cpp index 8af99f1d3b..5c14cdf5cb 100644 --- a/src/smpi/bindings/smpi_pmpi_file.cpp +++ b/src/smpi/bindings/smpi_pmpi_file.cpp @@ -5,6 +5,7 @@ #include "private.hpp" #include "smpi_file.hpp" +#include "smpi_datatype.hpp" XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(smpi_pmpi); @@ -19,10 +20,13 @@ int PMPI_File_open(MPI_Comm comm, char *filename, int amode, MPI_Info info, MPI_ smpi_bench_end(); *fh = new simgrid::smpi::File(comm, filename, amode, info); smpi_bench_begin(); - if((*fh)->size()==0 && not amode & MPI_MODE_CREATE){ + if (((*fh)->size() == 0 && (not amode & MPI_MODE_CREATE)) || + ((*fh)->size() != 0 && (amode & MPI_MODE_EXCL))){ delete fh; return MPI_ERR_AMODE; } + if(amode & MPI_MODE_APPEND) + (*fh)->seek(0,MPI_SEEK_END); return MPI_SUCCESS; } } @@ -39,6 +43,42 @@ int PMPI_File_close(MPI_File *fh){ } } +int PMPI_File_seek(MPI_File fh, MPI_Offset offset, int whence){ + if (fh==MPI_FILE_NULL){ + return MPI_ERR_FILE; + } else { + smpi_bench_end(); + int ret = fh->seek(offset,whence); + smpi_bench_begin(); + return ret; + } +} + +int PMPI_File_read(MPI_File fh, void *buf, int count,MPI_Datatype datatype, MPI_Status *status){ + if (fh==MPI_FILE_NULL){ + return MPI_ERR_FILE; + } else if (buf==nullptr && count > 0){ + return MPI_ERR_BUFFER; + } else if ( count < 0){ + return MPI_ERR_COUNT; + } else if ( datatype == MPI_DATATYPE_NULL && count > 0){ + return MPI_ERR_TYPE; + } else if (status == nullptr){ + return MPI_ERR_ARG; + } else if (fh->flags() & MPI_MODE_SEQUENTIAL){ + return MPI_ERR_AMODE; + } else { + smpi_bench_end(); + int rank_traced = simgrid::s4u::this_actor::get_pid(); + TRACE_smpi_comm_in(rank_traced, __func__, new simgrid::instr::CpuTIData("IO - read", static_cast(count*datatype->size()))); + int ret = fh->read(buf, count, datatype, status); + TRACE_smpi_comm_out(rank_traced); + smpi_bench_begin(); + return ret; + } +} + + int PMPI_File_delete(char *filename, MPI_Info info){ if (filename == nullptr) { return MPI_ERR_FILE; @@ -48,4 +88,5 @@ int PMPI_File_delete(char *filename, MPI_Info info){ smpi_bench_begin(); return ret; } -} \ No newline at end of file +} + diff --git a/src/smpi/include/smpi_file.hpp b/src/smpi/include/smpi_file.hpp index 923674705f..917207130f 100644 --- a/src/smpi/include/smpi_file.hpp +++ b/src/smpi/include/smpi_file.hpp @@ -22,9 +22,11 @@ class File{ int size(); int flags(); int sync(); + int seek(MPI_Offset offset, int whence); + int read(void *buf, int count,MPI_Datatype datatype, MPI_Status *status); static int close(MPI_File *fh); static int del(char *filename, MPI_Info info); }; } } -#endif \ No newline at end of file +#endif diff --git a/src/smpi/internals/instr_smpi.cpp b/src/smpi/internals/instr_smpi.cpp index 4d3ab5a2b4..50112e0ede 100644 --- a/src/smpi/internals/instr_smpi.cpp +++ b/src/smpi/internals/instr_smpi.cpp @@ -74,7 +74,9 @@ static std::map smpi_colors = {{"recv", "1 0 0"}, {"win_flush", "1 0 0.3"}, {"win_flush_local", "1 0 0.8"}, {"win_flush_all", "1 0.8 0"}, - {"win_flush_local_all", "1 0 0.3"} + {"win_flush_local_all", "1 0 0.3"}, + + {"file_read", "1 1 0.3"} }; static const char* instr_find_color(const char* c_state) diff --git a/src/smpi/internals/smpi_global.cpp b/src/smpi/internals/smpi_global.cpp index 9ad0c6d881..5f0e9887db 100644 --- a/src/smpi/internals/smpi_global.cpp +++ b/src/smpi/internals/smpi_global.cpp @@ -5,6 +5,7 @@ #include "mc/mc.h" #include "simgrid/s4u/Engine.hpp" +#include "simgrid/plugins/file_system.h" #include "smpi_coll.hpp" #include "smpi_f2c.hpp" #include "smpi_host.hpp" @@ -663,7 +664,7 @@ int smpi_main(const char* executable, int argc, char* argv[]) SIMIX_global_init(&argc, argv); SMPI_switch_data_segment = &smpi_switch_data_segment; - + sg_storage_file_system_init(); // parse the platform file: get the host list simgrid::s4u::Engine::get_instance()->load_platform(argv[1]); SIMIX_comm_set_copy_data_callback(smpi_comm_copy_buffer_callback); diff --git a/src/smpi/mpi/smpi_file.cpp b/src/smpi/mpi/smpi_file.cpp index 4cfbe05a87..35e36d4e4c 100644 --- a/src/smpi/mpi/smpi_file.cpp +++ b/src/smpi/mpi/smpi_file.cpp @@ -6,6 +6,7 @@ #include "smpi_comm.hpp" #include "smpi_coll.hpp" +#include "smpi_datatype.hpp" #include "smpi_info.hpp" #include "smpi_file.hpp" #include "simgrid/plugins/file_system.h" @@ -23,31 +24,69 @@ namespace smpi{ File::~File(){ delete file_; } - + int File::close(MPI_File *fh){ + XBT_DEBUG("Closing MPI_File %s", (*fh)->file_->get_path()); (*fh)->sync(); if((*fh)->flags() & MPI_MODE_DELETE_ON_CLOSE) (*fh)->file_->unlink(); - delete fh; + delete (*fh); return MPI_SUCCESS; } - + int File::del(char *filename, MPI_Info info){ + //get the file with MPI_MODE_DELETE_ON_CLOSE and then close it File* f = new File(MPI_COMM_SELF,filename,MPI_MODE_DELETE_ON_CLOSE|MPI_MODE_RDWR, nullptr); close(&f); return MPI_SUCCESS; } + + int File::seek(MPI_Offset offset, int whence){ + switch(whence){ + case(MPI_SEEK_SET): + XBT_DEBUG("Seeking in MPI_File %s, setting offset %lld", file_->get_path(), offset); + file_->seek(offset,SEEK_SET); + break; + case(MPI_SEEK_CUR): + XBT_DEBUG("Seeking in MPI_File %s, current offset + %lld", file_->get_path(), offset); + file_->seek(offset,SEEK_CUR); + break; + case(MPI_SEEK_END): + XBT_DEBUG("Seeking in MPI_File %s, end offset + %lld", file_->get_path(), offset); + file_->seek(offset,SEEK_END); + break; + default: + return MPI_ERR_FILE; + } + return MPI_SUCCESS; + } + int File::read(void *buf, int count, MPI_Datatype datatype, MPI_Status *status){ + //get position first as we may be doing non contiguous reads and it will probably be updated badly + MPI_Offset position = file_->tell(); + MPI_Offset movesize = datatype->get_extent()*count; + MPI_Offset readsize = datatype->size()*count; + XBT_DEBUG("Position before read in MPI_File %s : %llu",file_->get_path(),file_->tell()); + MPI_Offset read = file_->read(readsize); + XBT_DEBUG("Read in MPI_File %s, %lld bytes read, readsize %lld bytes, movesize %lld", file_->get_path(), read, readsize, movesize); + if(readsize!=movesize){ + file_->seek(position+movesize, SEEK_SET); + } + XBT_DEBUG("Position after read in MPI_File %s : %llu",file_->get_path(), file_->tell()); + return MPI_SUCCESS; + } + int File::size(){ return file_->size(); } - + int File::flags(){ return flags_; } + int File::sync(){ //no idea return simgrid::smpi::Colls::barrier(comm_); } } -} \ No newline at end of file +} -- 2.20.1