X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/blobdiff_plain/7cd237f9659b2df47fca65a4340ca7b5407f46a0..HEAD:/src/smpi/include/smpi_file.hpp

diff --git a/src/smpi/include/smpi_file.hpp b/src/smpi/include/smpi_file.hpp
index 3e38aa4c65..41c20d9c3d 100644
--- a/src/smpi/include/smpi_file.hpp
+++ b/src/smpi/include/smpi_file.hpp
@@ -1,4 +1,4 @@
-/* Copyright (c) 2010-2019. The SimGrid Team.
+/* Copyright (c) 2010-2023. The SimGrid Team.
  * All rights reserved. */

 /* This program is free software; you can redistribute it and/or modify it
@@ -14,11 +14,10 @@
 #include "smpi_info.hpp"
 #include <string>

-XBT_LOG_EXTERNAL_CATEGORY(smpi_pmpi);
+XBT_LOG_EXTERNAL_CATEGORY(smpi_io);

-namespace simgrid{
-namespace smpi{
-class File{
+namespace simgrid::smpi {
+class File : public F2C{
   MPI_Comm comm_;
   int flags_;
   simgrid::s4u::File* file_;
@@ -31,173 +30,181 @@ class File{
   MPI_Datatype etype_;
   MPI_Datatype filetype_;
   std::string datarep_;
+  MPI_Offset disp_;
+  bool atomicity_;

 public:
   File(MPI_Comm comm, const char *filename, int amode, MPI_Info info);
   File(const File&) = delete;
   File& operator=(const File&) = delete;
-  ~File();
-  int size();
-  int get_position(MPI_Offset* offset);
-  int get_position_shared(MPI_Offset* offset);
-  int flags();
-  MPI_Comm comm();
+  ~File() override;
+  int size() const;
+  int get_position(MPI_Offset* offset) const;
+  int get_position_shared(MPI_Offset* offset) const;
+  int flags() const;
+  MPI_Datatype etype() const;
+  MPI_Comm comm() const;
+  std::string name() const override { return file_ ? "MPI_File: " + std::string(file_->get_path()) : "MPI_File"; }
+  int sync();
   int seek(MPI_Offset offset, int whence);
   int seek_shared(MPI_Offset offset, int whence);
   int set_view(MPI_Offset disp, MPI_Datatype etype, MPI_Datatype filetype, const char* datarep, const Info* info);
-  int get_view(MPI_Offset *disp, MPI_Datatype *etype, MPI_Datatype *filetype, char *datarep);
+  int get_view(MPI_Offset* disp, MPI_Datatype* etype, MPI_Datatype* filetype, char* datarep) const;
   MPI_Info info();
   void set_info( MPI_Info info);
-  static int read(MPI_File fh, void *buf, int count,MPI_Datatype datatype, MPI_Status *status);
-  static int read_shared(MPI_File fh, void *buf, int count,MPI_Datatype datatype, MPI_Status *status);
-  static int read_ordered(MPI_File fh, void *buf, int count,MPI_Datatype datatype, MPI_Status *status);
-  static int write(MPI_File fh, void *buf, int count,MPI_Datatype datatype, MPI_Status *status);
-  static int write_shared(MPI_File fh, const void *buf, int count,MPI_Datatype datatype, MPI_Status *status);
-  static int write_ordered(MPI_File fh, const void *buf, int count,MPI_Datatype datatype, MPI_Status *status);
-  template <int (*T)(MPI_File, void*, int, MPI_Datatype, MPI_Status*)> int op_all(void *buf, int count,MPI_Datatype datatype, MPI_Status *status);
+  void set_size(int size);
+  static int read(MPI_File fh, void* buf, int count, const Datatype* datatype, MPI_Status* status);
+  static int read_shared(MPI_File fh, void* buf, int count, const Datatype* datatype, MPI_Status* status);
+  static int read_ordered(MPI_File fh, void* buf, int count, const Datatype* datatype, MPI_Status* status);
+  static int write(MPI_File fh, void* buf, int count, const Datatype* datatype, MPI_Status* status);
+  static int write_shared(MPI_File fh, const void* buf, int count, const Datatype* datatype, MPI_Status* status);
+  static int write_ordered(MPI_File fh, const void* buf, int count, const Datatype* datatype, MPI_Status* status);
+  template <int (*T)(MPI_File, void*, int, const Datatype*, MPI_Status*)>
+  int op_all(void* buf, int count, const Datatype* datatype, MPI_Status* status);
   static int close(MPI_File *fh);
   static int del(const char* filename, const Info* info);
   MPI_Errhandler errhandler();
   void set_errhandler( MPI_Errhandler errhandler);
+  void set_atomicity(bool a);
+  bool get_atomicity() const;
+  static File* f2c(int id);
 };

-  /* Read_all, Write_all : loosely based on */
-  /* @article{Thakur:1996:ETM:245875.245879,*/
-  /* author = {Thakur, Rajeev and Choudhary, Alok},*/
-  /* title = {An Extended Two-phase Method for Accessing Sections of Out-of-core Arrays},*/
-  /* journal = {Sci. Program.},*/
-  /* issue_date = {Winter 1996},*/
-  /* pages = {301--317},*/
-  /* }*/
-  template <int (*T)(MPI_File, void*, int, MPI_Datatype, MPI_Status*)>
-  int File::op_all(void *buf, int count, MPI_Datatype datatype, MPI_Status *status){
-    //get min and max offsets from everyone.
-    int size = comm_->size();
-    int rank = comm_-> rank();
-    MPI_Offset min_offset = file_->tell();
-    MPI_Offset max_offset = min_offset + count * datatype->get_extent();//cheating, as we don't care about exact data location, we can skip extent
-    MPI_Offset* min_offsets = new MPI_Offset[size];
-    MPI_Offset* max_offsets = new MPI_Offset[size];
-    simgrid::smpi::colls::allgather(&min_offset, 1, MPI_OFFSET, min_offsets, 1, MPI_OFFSET, comm_);
-    simgrid::smpi::colls::allgather(&max_offset, 1, MPI_OFFSET, max_offsets, 1, MPI_OFFSET, comm_);
-    MPI_Offset min=min_offset;
-    MPI_Offset max=max_offset;
-    MPI_Offset tot= 0;
-    int empty=1;
-    for(int i=0;i<size;i++){
-      if(min_offsets[i]!=max_offsets[i])
-        empty=0;
-      tot+=(max_offsets[i]-min_offsets[i]);
-      if(min_offsets[i]<min)
-        min=min_offsets[i];
-      if(max_offsets[i]>max)
-        max=max_offsets[i];
-    }
-    XBT_CDEBUG(smpi_pmpi, "my offsets to read : %lld:%lld, global min and max %lld:%lld", min_offset, max_offset, min, max);
-    if(empty==1){
-      delete[] min_offsets;
-      delete[] max_offsets;
-      status->count=0;
-      return MPI_SUCCESS;
-    }
-    MPI_Offset total = max-min;
-    if(total==tot && (datatype->flags() & DT_FLAG_CONTIGUOUS)){
-      delete[] min_offsets;
-      delete[] max_offsets;
-      //contiguous. Just have each proc perform its read
-      if(status != MPI_STATUS_IGNORE)
-        status->count=count * datatype->size();
-      return T(this,buf,count,datatype, status);
-    }
+/* Read_all, Write_all : loosely based on */
+/* @article{Thakur:1996:ETM:245875.245879,*/
+/* author = {Thakur, Rajeev and Choudhary, Alok},*/
+/* title = {An Extended Two-phase Method for Accessing Sections of Out-of-core Arrays},*/
+/* journal = {Sci. Program.},*/
+/* issue_date = {Winter 1996},*/
+/* pages = {301--317},*/
+/* }*/
+template <int (*T)(MPI_File, void*, int, const Datatype*, MPI_Status*)>
+int File::op_all(void* buf, int count, const Datatype* datatype, MPI_Status* status)
+{
+  // get min and max offsets from everyone.
+  int size = comm_->size();
+  int rank = comm_->rank();
+  MPI_Offset min_offset = file_->tell();
+  MPI_Offset max_offset =
+      min_offset +
+      count * datatype->get_extent(); // cheating, as we don't care about exact data location, we can skip extent
+  std::vector<MPI_Offset> min_offsets(size);
+  std::vector<MPI_Offset> max_offsets(size);
+  simgrid::smpi::colls::allgather(&min_offset, 1, MPI_OFFSET, min_offsets.data(), 1, MPI_OFFSET, comm_);
+  simgrid::smpi::colls::allgather(&max_offset, 1, MPI_OFFSET, max_offsets.data(), 1, MPI_OFFSET, comm_);
+  MPI_Offset min = min_offset;
+  MPI_Offset max = max_offset;
+  MPI_Offset tot = 0;
+  int empty = 1;
+  for (int i = 0; i < size; i++) {
+    if (min_offsets[i] != max_offsets[i])
+      empty = 0;
+    tot += (max_offsets[i] - min_offsets[i]);
+    if (min_offsets[i] < min)
+      min = min_offsets[i];
+    if (max_offsets[i] > max)
+      max = max_offsets[i];
+  }

-    //Interleaved case : How much do I need to read, and whom to send it ?
-    MPI_Offset my_chunk_start=(max-min+1)/size*rank;
-    MPI_Offset my_chunk_end=((max-min+1)/size*(rank+1));
-    XBT_CDEBUG(smpi_pmpi, "my chunks to read : %lld:%lld", my_chunk_start, my_chunk_end);
-    int* send_sizes = new int[size];
-    int* recv_sizes = new int[size];
-    int* send_disps = new int[size];
-    int* recv_disps = new int[size];
-    int total_sent=0;
-    for(int i=0;i<size;i++){
-      send_sizes[i]=0;
-      send_disps[i]=0;
-      //cheat to avoid issues when send>recv as we use recv buffer
-      if((my_chunk_start>=min_offsets[i] && my_chunk_start < max_offsets[i])||
-          ((my_chunk_end<=max_offsets[i]) && my_chunk_end> min_offsets[i])){
-        send_sizes[i]=(std::min(max_offsets[i]-1, my_chunk_end-1)-std::max(min_offsets[i], my_chunk_start));
-        // store min and max offset to actually read
-        min_offset=std::min(min_offset, min_offsets[i]);
-        total_sent+=send_sizes[i];
-        XBT_CDEBUG(smpi_pmpi, "will have to send %d bytes to %d", send_sizes[i], i);
-      }
+  XBT_CDEBUG(smpi_io, "my offsets to read : %lld:%lld, global min and max %lld:%lld", min_offset, max_offset, min,
+             max);
+  if (empty == 1) {
+    if (status != MPI_STATUS_IGNORE)
+      status->count = 0;
+    return MPI_SUCCESS;
+  }
+  XBT_CDEBUG(smpi_io, "min:max : %lld:%lld, tot %lld contig %u", min, max, tot, (datatype->flags() & DT_FLAG_CONTIGUOUS));
+  if ( size==1 || (max - min == tot && (datatype->flags() & DT_FLAG_CONTIGUOUS))) {
+    // contiguous. Just have each proc perform its read
+    if (status != MPI_STATUS_IGNORE)
+      status->count = count * datatype->size();
+    int ret = T(this, buf, count, datatype, status);
+    seek(max_offset, MPI_SEEK_SET);
+    return ret;
+  }
+
+  // Interleaved case : How much do I need to read, and whom to send it ?
+  MPI_Offset my_chunk_start = min + (max - min + 1) / size * rank;
+  MPI_Offset my_chunk_end = min + ((max - min + 1) / size * (rank + 1)) +1;
+  XBT_CDEBUG(smpi_io, "my chunks to read : %lld:%lld", my_chunk_start, my_chunk_end);
+  std::vector<int> send_sizes(size);
+  std::vector<int> recv_sizes(size);
+  std::vector<int> send_disps(size);
+  std::vector<int> recv_disps(size);
+  int total_sent = 0;
+  for (int i = 0; i < size; i++) {
+    send_sizes[i] = 0;
+    send_disps[i] = 0; // cheat to avoid issues when send>recv as we use recv buffer
+    if ((my_chunk_start >= min_offsets[i] && my_chunk_start < max_offsets[i]) ||
+        ((my_chunk_end <= max_offsets[i]) && my_chunk_end > min_offsets[i])) {
+      send_sizes[i] = (std::min(max_offsets[i], my_chunk_end) - std::max(min_offsets[i], my_chunk_start));
+      //we want to send only useful data, so let's pretend we pack it
+      send_sizes[i]=send_sizes[i]/datatype->get_extent()*datatype->size();
+      // store min and max offset to actually read
+
+      min_offset = std::min(min_offset, min_offsets[i]);
+      total_sent += send_sizes[i];
+      XBT_CDEBUG(smpi_io, "will have to send %d bytes to %d", send_sizes[i], i);
     }
-    min_offset=std::max(min_offset, my_chunk_start);
+  }
+  min_offset = std::max(min_offset, my_chunk_start);

-    //merge the ranges of every process
-    std::vector<std::pair<MPI_Offset, MPI_Offset>> ranges;
-    for(int i=0; i<size; i++)
-      ranges.push_back(std::make_pair(min_offsets[i], max_offsets[i]));
-    std::sort(ranges.begin(), ranges.end());
-    std::vector<std::pair<MPI_Offset, MPI_Offset>> chunks;
-    chunks.push_back(ranges[0]);
+  // merge the ranges of every process
+  std::vector<std::pair<MPI_Offset, MPI_Offset>> ranges;
+  for (int i = 0; i < size; ++i)
+    ranges.emplace_back(min_offsets[i], max_offsets[i]);
+  std::sort(ranges.begin(), ranges.end());
+  std::vector<std::pair<MPI_Offset, MPI_Offset>> chunks;
+  chunks.push_back(ranges[0]);

-    unsigned int nchunks=0;
-    unsigned int i=1;
-    while(i < ranges.size()){
-      if(ranges[i].second>chunks[nchunks].second){
-        // else range included - ignore
-        if(ranges[i].first>chunks[nchunks].second){
-          //new disjoint range
-          chunks.push_back(ranges[i]);
-          nchunks++;
-        } else {
-          //merge ranges
-          chunks[nchunks].second=ranges[i].second;
-        }
+  unsigned int nchunks = 0;
+  for (unsigned i = 1; i < ranges.size(); i++) {
+    if (ranges[i].second > chunks[nchunks].second) {
+      // else range included - ignore
+      if (ranges[i].first > chunks[nchunks].second) {
+        // new disjoint range
+        chunks.push_back(ranges[i]);
+        nchunks++;
+      } else {
+        // merge ranges
+        chunks[nchunks].second = ranges[i].second;
       }
-      i++;
-    }
-    //what do I need to read ?
-    MPI_Offset totreads=0;
-    for(i=0; i<chunks.size();i++){
-      if(chunks[i].second < my_chunk_start)
-        continue;
-      else if (chunks[i].first > my_chunk_end)
-        continue;
-      else
-        totreads += (std::min(chunks[i].second, my_chunk_end-1)-std::max(chunks[i].first, my_chunk_start));
     }
-    XBT_CDEBUG(smpi_pmpi, "will have to access %lld from my chunk", totreads);
+  }
+  // what do I need to read ?
+  MPI_Offset totreads = 0;
+  for (auto const& [chunk_start, chunk_end] : chunks) {
+    if (chunk_end < my_chunk_start)
+      continue;
+    else if (chunk_start > my_chunk_end)
+      continue;
+    else
+      totreads += (std::min(chunk_end, my_chunk_end) - std::max(chunk_start, my_chunk_start));
+  }
+  XBT_CDEBUG(smpi_io, "will have to access %lld from my chunk", totreads);

-    unsigned char* sendbuf = smpi_get_tmp_sendbuffer(total_sent);
+  unsigned char* sendbuf = smpi_get_tmp_sendbuffer(total_sent);

-    if(totreads>0){
-      seek(min_offset, MPI_SEEK_SET);
-      T(this,sendbuf,totreads/datatype->size(),datatype, status);
-    }
-    simgrid::smpi::colls::alltoall(send_sizes, 1, MPI_INT, recv_sizes, 1, MPI_INT, comm_);
-    int total_recv=0;
-    for(int i=0;i<size;i++){
-      recv_disps[i]=total_recv;
-      total_recv+=recv_sizes[i];
-    }
-    //Set buf value to avoid copying dumb data
-    simgrid::smpi::colls::alltoallv(sendbuf, send_sizes, send_disps, MPI_BYTE, buf, recv_sizes, recv_disps, MPI_BYTE, comm_);
-    if(status != MPI_STATUS_IGNORE)
-      status->count=count * datatype->size();
-    smpi_free_tmp_buffer(sendbuf);
-    delete[] send_sizes;
-    delete[] recv_sizes;
-    delete[] send_disps;
-    delete[] recv_disps;
-    delete[] min_offsets;
-    delete[] max_offsets;
-    return MPI_SUCCESS;
+  if (totreads > 0) {
+    seek(min_offset, MPI_SEEK_SET);
+    T(this, sendbuf, totreads / datatype->get_extent(), datatype, status);
+    seek(max_offset, MPI_SEEK_SET);
   }
+  simgrid::smpi::colls::alltoall(send_sizes.data(), 1, MPI_INT, recv_sizes.data(), 1, MPI_INT, comm_);
+  int total_recv = 0;
+  for (int i = 0; i < size; i++) {
+    recv_disps[i] = total_recv;
+    total_recv += recv_sizes[i];
+  }
+  // Set buf value to avoid copying dumb data
+  simgrid::smpi::colls::alltoallv(sendbuf, send_sizes.data(), send_disps.data(), MPI_BYTE, buf, recv_sizes.data(),
+                                  recv_disps.data(), MPI_BYTE, comm_);
+  if (status != MPI_STATUS_IGNORE)
+    status->count = count * datatype->size();
+  smpi_free_tmp_buffer(sendbuf);
+  return MPI_SUCCESS;
 }
-}
+} // namespace simgrid::smpi
 #endif
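
Illustration only, not part of the diff above: op_all implements the two-phase collective scheme from the Thakur & Choudhary article cited in the header, with the per-rank I/O primitive passed as the template argument. A minimal sketch of how it is presumably instantiated, assuming MPI_File is simgrid::smpi::File* and using a hypothetical wrapper name (only the op_all and File::read signatures come from the header shown above):

    // Hypothetical wrapper, for illustration: run the two-phase collective
    // algorithm with File::read as the per-rank I/O operation.
    static int example_read_all(MPI_File fh, void* buf, int count, const simgrid::smpi::Datatype* datatype,
                                MPI_Status* status)
    {
      return fh->op_all<simgrid::smpi::File::read>(buf, count, datatype, status);
    }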