X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/blobdiff_plain/ea6829292c6de875ade6f45c7146175a23a092e3..98755faee042e94d1ff52f6e9508b18015bb1ae5:/src/smpi/include/smpi_file.hpp

diff --git a/src/smpi/include/smpi_file.hpp b/src/smpi/include/smpi_file.hpp
index 72efd93580..3feb57f696 100644
--- a/src/smpi/include/smpi_file.hpp
+++ b/src/smpi/include/smpi_file.hpp
@@ -1,182 +1,196 @@
-/* Copyright (c) 2010-2019. The SimGrid Team.
- * All rights reserved.                                                     */
-
-/* This program is free software; you can redistribute it and/or modify it
- * under the terms of the license (GNU LGPL) which comes with this package. */
-
-#ifndef SMPI_FILE_HPP_INCLUDED
-#define SMPI_FILE_HPP_INCLUDED
-#include "simgrid/plugins/file_system.h"
-#include "smpi_comm.hpp"
-#include "smpi_coll.hpp"
-#include "smpi_datatype.hpp"
-#include "smpi_info.hpp"
-#include <vector>
-
-
-namespace simgrid{
-namespace smpi{
-class File{
-  MPI_Comm comm_;
-  int flags_;
-  simgrid::s4u::File* file_;
-  MPI_Info info_;
-  MPI_Offset* shared_file_pointer_;
-  s4u::MutexPtr shared_mutex_;
-  MPI_Win win_;
-  char* list_;
-  public:
-  File(MPI_Comm comm, char *filename, int amode, MPI_Info info);
-  ~File();
-  int size();
-  int get_position(MPI_Offset* offset);
-  int get_position_shared(MPI_Offset* offset);
-  int flags();
-  int sync();
-  int seek(MPI_Offset offset, int whence);
-  int seek_shared(MPI_Offset offset, int whence);
-  MPI_Info info();
-  void set_info( MPI_Info info);
-  static int read(MPI_File fh, void *buf, int count,MPI_Datatype datatype, MPI_Status *status);
-  static int read_shared(MPI_File fh, void *buf, int count,MPI_Datatype datatype, MPI_Status *status);
-  static int read_ordered(MPI_File fh, void *buf, int count,MPI_Datatype datatype, MPI_Status *status);
-  static int write(MPI_File fh, void *buf, int count,MPI_Datatype datatype, MPI_Status *status);
-  static int write_shared(MPI_File fh, void *buf, int count,MPI_Datatype datatype, MPI_Status *status);
-  static int write_ordered(MPI_File fh, void *buf, int count,MPI_Datatype datatype, MPI_Status *status);
-  template <int (*T)(MPI_File, void*, int, MPI_Datatype, MPI_Status*)> int op_all(void *buf, int count,MPI_Datatype datatype, MPI_Status *status);
-  static int close(MPI_File *fh);
-  static int del(char *filename, MPI_Info info);
-};
-
-  /* Read_all, Write_all : loosely based on */
-  /* @article{Thakur:1996:ETM:245875.245879,*/
-  /* author = {Thakur, Rajeev and Choudhary, Alok},*/
-  /* title = {An Extended Two-phase Method for Accessing Sections of Out-of-core Arrays},*/
-  /* journal = {Sci. Program.},*/
-  /* issue_date = {Winter 1996},*/
-  /* pages = {301--317},*/
-  /* }*/
-  template <int (*T)(MPI_File, void*, int, MPI_Datatype, MPI_Status*)>
-  int File::op_all(void *buf, int count, MPI_Datatype datatype, MPI_Status *status){
-    //get min and max offsets from everyone.
-    int size = comm_->size();
-    int rank = comm_-> rank();
-    MPI_Offset min_offset = file_->tell();
-    MPI_Offset max_offset = (min_offset + count * datatype->size());//cheating, as we don't care about exact data location, we can skip extent
-    MPI_Offset* min_offsets = xbt_new(MPI_Offset, size);
-    MPI_Offset* max_offsets = xbt_new(MPI_Offset, size);
-    simgrid::smpi::Colls::allgather(&min_offset, 1, MPI_OFFSET, min_offsets, 1, MPI_OFFSET, comm_);
-    simgrid::smpi::Colls::allgather(&max_offset, 1, MPI_OFFSET, max_offsets, 1, MPI_OFFSET, comm_);
-    MPI_Offset min=min_offset;
-    MPI_Offset max=max_offset;
-    MPI_Offset tot= 0;
-    int empty=1;
-    for(int i=0;i<size;i++){
-      if(min_offsets[i]!=max_offsets[i])
-        empty=0;
-      tot+=(max_offsets[i]-min_offsets[i]);
-      if(min_offsets[i]<min)
-        min=min_offsets[i];
-      if(max_offsets[i]>max)
-        max=max_offsets[i];
-    }
-
-    XBT_DEBUG("my offsets to read : %lld:%lld, global min and max %lld:%lld", min_offset, max_offset, min, max);
-    if(empty==1){
-      status->count=0;
-      return MPI_SUCCESS;
-    }
-    MPI_Offset total = max-min;
-    if(total==tot && (datatype->flags() & DT_FLAG_CONTIGUOUS)){
-      //contiguous. Just have each proc perform its read
-      status->count=count * datatype->size();
-      return T(this,buf,count,datatype, status);
-    }
-
-    //Interleaved case : How much do I need to read, and whom to send it ?
-    MPI_Offset my_chunk_start=(max-min+1)/size*rank;
-    MPI_Offset my_chunk_end=((max-min+1)/size*(rank+1));
-    XBT_DEBUG("my chunks to read : %lld:%lld", my_chunk_start, my_chunk_end);
-    int* send_sizes = xbt_new0(int, size);
-    int* recv_sizes = xbt_new(int, size);
-    int* send_disps = xbt_new(int, size);
-    int* recv_disps = xbt_new(int, size);
-    int total_sent=0;
-    for(int i=0;i<size;i++){
-      if((my_chunk_start>=min_offsets[i] && my_chunk_start < max_offsets[i])||
-          ((my_chunk_end<=max_offsets[i]) && my_chunk_end> min_offsets[i])){
-        send_sizes[i]=(std::min(max_offsets[i]-1, my_chunk_end-1)-std::max(min_offsets[i], my_chunk_start));
-        //store min and max offset to actually read
-        min_offset=std::min(min_offset, min_offsets[i]);
-        send_disps[i]=0;//send_sizes[i]; cheat to avoid issues when send>recv as we use recv buffer
-        total_sent+=send_sizes[i];
-        XBT_DEBUG("will have to send %d bytes to %d", send_sizes[i], i);
-      }
-    }
-    min_offset=std::max(min_offset, my_chunk_start);
-
-    //merge the ranges of every process
-    std::vector<std::pair<MPI_Offset, MPI_Offset>> ranges;
-    for(int i=0; i<size; i++)
-      ranges.push_back(std::make_pair(min_offsets[i],max_offsets[i]));
-    std::sort(ranges.begin(), ranges.end());
-    std::vector<std::pair<MPI_Offset, MPI_Offset>> chunks;
-    chunks.push_back(ranges[0]);
-
-    unsigned int nchunks=0;
-    unsigned int i=1;
-    while(i < ranges.size()){
-      if(ranges[i].second>chunks[nchunks].second){
-        // else range included - ignore
-        if(ranges[i].first>chunks[nchunks].second){
-          //new disjoint range
-          chunks.push_back(ranges[i]);
-          nchunks++;
-        } else {
-          //merge ranges
-          chunks[nchunks].second=ranges[i].second;
-        }
-      }
-      i++;
-    }
-    //what do I need to read ?
-    MPI_Offset totreads=0;
-    for(i=0; i<chunks.size(); i++){
-      if(chunks[i].second < my_chunk_start)
-        continue;
-      else if (chunks[i].first > my_chunk_end)
-        continue;
-      else
-        totreads += (std::min(chunks[i].second, my_chunk_end-1)-std::max(chunks[i].first, my_chunk_start));
-    }
-    XBT_DEBUG("will have to access %lld from my chunk", totreads);
-
-    char* sendbuf= static_cast<char*>(smpi_get_tmp_sendbuffer(totreads));
-
-    if(totreads>0){
-      seek(min_offset, MPI_SEEK_SET);
-      T(this,sendbuf,totreads/datatype->size(),datatype, status);
-    }
-    simgrid::smpi::Colls::alltoall(send_sizes, 1, MPI_INT, recv_sizes, 1, MPI_INT, comm_);
-    int total_recv=0;
-    for(int i=0;i<size;i++){
-      recv_disps[i]=total_recv;
-      total_recv+=recv_sizes[i];
-    }
-    simgrid::smpi::Colls::alltoallv(sendbuf, send_sizes, send_disps, MPI_BYTE, buf, recv_sizes, recv_disps, MPI_BYTE, comm_);
-    status->count=count * datatype->size();
-    smpi_free_tmp_buffer(sendbuf);
-    xbt_free(send_sizes);
-    xbt_free(recv_sizes);
-    xbt_free(send_disps);
-    xbt_free(recv_disps);
-    xbt_free(min_offsets);
-    xbt_free(max_offsets);
-    return MPI_SUCCESS;
-  }
-}
-}
-
-#endif
+/* Copyright (c) 2010-2019. The SimGrid Team.
+ * All rights reserved.                                                     */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
+#ifndef SMPI_FILE_HPP_INCLUDED
+#define SMPI_FILE_HPP_INCLUDED
+#include "simgrid/plugins/file_system.h"
+#include "smpi_comm.hpp"
+#include "smpi_coll.hpp"
+#include "smpi_datatype.hpp"
+#include "smpi_errhandler.hpp"
+#include "smpi_info.hpp"
+#include <vector>
+
+XBT_LOG_EXTERNAL_CATEGORY(smpi_pmpi);
+
+namespace simgrid{
+namespace smpi{
+class File{
+  MPI_Comm comm_;
+  int flags_;
+  simgrid::s4u::File* file_;
+  MPI_Info info_;
+  MPI_Offset* shared_file_pointer_;
+  s4u::MutexPtr shared_mutex_;
+  MPI_Win win_;
+  char* list_;
+  MPI_Errhandler errhandler_;
+
+  public:
+  File(MPI_Comm comm, const char *filename, int amode, MPI_Info info);
+  File(const File&) = delete;
+  File& operator=(const File&) = delete;
+  ~File();
+  int size();
+  int get_position(MPI_Offset* offset);
+  int get_position_shared(MPI_Offset* offset);
+  int flags();
+  MPI_Comm comm();
+  int sync();
+  int seek(MPI_Offset offset, int whence);
+  int seek_shared(MPI_Offset offset, int whence);
+  MPI_Info info();
+  void set_info( MPI_Info info);
+  static int read(MPI_File fh, void *buf, int count,MPI_Datatype datatype, MPI_Status *status);
+  static int read_shared(MPI_File fh, void *buf, int count,MPI_Datatype datatype, MPI_Status *status);
+  static int read_ordered(MPI_File fh, void *buf, int count,MPI_Datatype datatype, MPI_Status *status);
+  static int write(MPI_File fh, void *buf, int count,MPI_Datatype datatype, MPI_Status *status);
+  static int write_shared(MPI_File fh, const void *buf, int count,MPI_Datatype datatype, MPI_Status *status);
+  static int write_ordered(MPI_File fh, const void *buf, int count,MPI_Datatype datatype, MPI_Status *status);
+  template <int (*T)(MPI_File, void*, int, MPI_Datatype, MPI_Status*)> int op_all(void *buf, int count,MPI_Datatype datatype, MPI_Status *status);
+  static int close(MPI_File *fh);
+  static int del(const char *filename, MPI_Info info);
+  MPI_Errhandler errhandler();
+  void set_errhandler( MPI_Errhandler errhandler);
+};
+
+  /* Read_all, Write_all : loosely based on */
+  /* @article{Thakur:1996:ETM:245875.245879,*/
+  /* author = {Thakur, Rajeev and Choudhary, Alok},*/
+  /* title = {An Extended Two-phase Method for Accessing Sections of Out-of-core Arrays},*/
+  /* journal = {Sci. Program.},*/
+  /* issue_date = {Winter 1996},*/
+  /* pages = {301--317},*/
+  /* }*/
+  template <int (*T)(MPI_File, void*, int, MPI_Datatype, MPI_Status*)>
+  int File::op_all(void *buf, int count, MPI_Datatype datatype, MPI_Status *status){
+    //get min and max offsets from everyone.
+    int size = comm_->size();
+    int rank = comm_-> rank();
+    MPI_Offset min_offset = file_->tell();
+    MPI_Offset max_offset = (min_offset + count * datatype->size());//cheating, as we don't care about exact data location, we can skip extent
+    MPI_Offset* min_offsets = new MPI_Offset[size];
+    MPI_Offset* max_offsets = new MPI_Offset[size];
+    simgrid::smpi::Colls::allgather(&min_offset, 1, MPI_OFFSET, min_offsets, 1, MPI_OFFSET, comm_);
+    simgrid::smpi::Colls::allgather(&max_offset, 1, MPI_OFFSET, max_offsets, 1, MPI_OFFSET, comm_);
+    MPI_Offset min=min_offset;
+    MPI_Offset max=max_offset;
+    MPI_Offset tot= 0;
+    int empty=1;
+    for(int i=0;i<size;i++){
+      if(min_offsets[i]!=max_offsets[i])
+        empty=0;
+      tot+=(max_offsets[i]-min_offsets[i]);
+      if(min_offsets[i]<min)
+        min=min_offsets[i];
+      if(max_offsets[i]>max)
+        max=max_offsets[i];
+    }
+
+    XBT_CDEBUG(smpi_pmpi, "my offsets to read : %lld:%lld, global min and max %lld:%lld", min_offset, max_offset, min, max);
+    if(empty==1){
+      delete[] min_offsets;
+      delete[] max_offsets;
+      status->count=0;
+      return MPI_SUCCESS;
+    }
+    MPI_Offset total = max-min;
+    if(total==tot && (datatype->flags() & DT_FLAG_CONTIGUOUS)){
+      delete[] min_offsets;
+      delete[] max_offsets;
+      //contiguous. Just have each proc perform its read
+      status->count=count * datatype->size();
+      return T(this,buf,count,datatype, status);
+    }
+
+    //Interleaved case : How much do I need to read, and whom to send it ?
+    MPI_Offset my_chunk_start=(max-min+1)/size*rank;
+    MPI_Offset my_chunk_end=((max-min+1)/size*(rank+1));
+    XBT_CDEBUG(smpi_pmpi, "my chunks to read : %lld:%lld", my_chunk_start, my_chunk_end);
+    int* send_sizes = new int[size];
+    int* recv_sizes = new int[size];
+    int* send_disps = new int[size];
+    int* recv_disps = new int[size];
+    int total_sent=0;
+    for(int i=0;i<size;i++){
+      send_sizes[i]=0;
+      recv_sizes[i]=0;
+      send_disps[i]=0;//send_sizes[i]; cheat to avoid issues when send>recv as we use recv buffer
+      if((my_chunk_start>=min_offsets[i] && my_chunk_start < max_offsets[i])||
+          ((my_chunk_end<=max_offsets[i]) && my_chunk_end> min_offsets[i])){
+        send_sizes[i]=(std::min(max_offsets[i]-1, my_chunk_end-1)-std::max(min_offsets[i], my_chunk_start));
+        //store min and max offset to actually read
+        min_offset=std::min(min_offset, min_offsets[i]);
+        total_sent+=send_sizes[i];
+        XBT_CDEBUG(smpi_pmpi, "will have to send %d bytes to %d", send_sizes[i], i);
+      }
+    }
+    min_offset=std::max(min_offset, my_chunk_start);
+
+    //merge the ranges of every process
+    std::vector<std::pair<MPI_Offset, MPI_Offset>> ranges;
+    for(int i=0; i<size; i++)
+      ranges.push_back(std::make_pair(min_offsets[i],max_offsets[i]));
+    std::sort(ranges.begin(), ranges.end());
+    std::vector<std::pair<MPI_Offset, MPI_Offset>> chunks;
+    chunks.push_back(ranges[0]);
+
+    unsigned int nchunks=0;
+    unsigned int i=1;
+    while(i < ranges.size()){
+      if(ranges[i].second>chunks[nchunks].second){
+        // else range included - ignore
+        if(ranges[i].first>chunks[nchunks].second){
+          //new disjoint range
+          chunks.push_back(ranges[i]);
+          nchunks++;
+        } else {
+          //merge ranges
+          chunks[nchunks].second=ranges[i].second;
+        }
+      }
+      i++;
+    }
+    //what do I need to read ?
+    MPI_Offset totreads=0;
+    for(i=0; i<chunks.size(); i++){
+      if(chunks[i].second < my_chunk_start)
+        continue;
+      else if (chunks[i].first > my_chunk_end)
+        continue;
+      else
+        totreads += (std::min(chunks[i].second, my_chunk_end-1)-std::max(chunks[i].first, my_chunk_start));
+    }
+    XBT_CDEBUG(smpi_pmpi, "will have to access %lld from my chunk", totreads);
+
+    unsigned char* sendbuf = smpi_get_tmp_sendbuffer(total_sent);
+
+    if(totreads>0){
+      seek(min_offset, MPI_SEEK_SET);
+      T(this,sendbuf,totreads/datatype->size(),datatype, status);
+    }
+    simgrid::smpi::Colls::alltoall(send_sizes, 1, MPI_INT, recv_sizes, 1, MPI_INT, comm_);
+    int total_recv=0;
+    for(int i=0;i<size;i++){
+      recv_disps[i]=total_recv;
+      total_recv+=recv_sizes[i];
+    }
+    simgrid::smpi::Colls::alltoallv(sendbuf, send_sizes, send_disps, MPI_BYTE, buf, recv_sizes, recv_disps, MPI_BYTE, comm_);
+    status->count=count * datatype->size();
+    smpi_free_tmp_buffer(sendbuf);
+    delete[] send_sizes;
+    delete[] recv_sizes;
+    delete[] send_disps;
+    delete[] recv_disps;
+    delete[] min_offsets;
+    delete[] max_offsets;
+    return MPI_SUCCESS;
+  }
+}
+}
+
+#endif
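
Note for readers tracing the two-phase collective above: the "merge the ranges of every process" step is ordinary interval merging (sort the gathered per-rank [min,max) ranges, then sweep once, either extending the current chunk or appending a disjoint one), and the function-pointer template parameter int (*T)(MPI_File, void*, int, MPI_Datatype, MPI_Status*) lets the same body serve both the read and write collectives. The merge can be exercised standalone; below is a minimal sketch in plain C++, independent of SimGrid's types (merge_ranges is our own name, and long long stands in for MPI_Offset):

    #include <algorithm>
    #include <cstdio>
    #include <utility>
    #include <vector>

    using Range = std::pair<long long, long long>; // [first, second) byte range

    // Same sweep as op_all: skip ranges already covered by the current
    // chunk, extend the chunk on overlap, append on a disjoint range.
    static std::vector<Range> merge_ranges(std::vector<Range> ranges)
    {
      std::sort(ranges.begin(), ranges.end());
      std::vector<Range> chunks{ranges[0]};
      for (size_t i = 1; i < ranges.size(); i++) {
        if (ranges[i].second <= chunks.back().second)
          continue;                                // range included - ignore
        if (ranges[i].first > chunks.back().second)
          chunks.push_back(ranges[i]);             // new disjoint range
        else
          chunks.back().second = ranges[i].second; // merge ranges
      }
      return chunks;
    }

    int main()
    {
      // Three ranks: two overlapping views and one disjoint one.
      std::vector<Range> ranges = {{0, 100}, {50, 150}, {300, 400}};
      for (const Range& c : merge_ranges(ranges))
        std::printf("chunk %lld:%lld\n", c.first, c.second);
      // prints: chunk 0:150, then chunk 300:400
      return 0;
    }

The instantiation sites of op_all live in smpi_file.cpp and are outside this diff; presumably the collective wrappers dispatch along the lines of op_all<File::read> or op_all<File::write>, which is what the function-pointer template parameter is for.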