Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Add new entry in Release_Notes.
[simgrid.git] / src / smpi / mpi / smpi_file.cpp
index 5318322..47c73eb 100644 (file)
@@ -1,4 +1,4 @@
-/* Copyright (c) 2007-2022. The SimGrid Team. All rights reserved.          */
+/* Copyright (c) 2007-2023. The SimGrid Team. All rights reserved.          */
 
 /* This program is free software; you can redistribute it and/or modify it
  * under the terms of the license (GNU LGPL) which comes with this package. */
 #include "simgrid/s4u/Host.hpp"
 #include "simgrid/plugins/file_system.h"
 
+#include <mutex> // std::scoped_lock
+
 #define FP_SIZE sizeof(MPI_Offset)
 
 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_io, smpi, "Logging specific to SMPI (RMA operations)");
 
 MPI_Errhandler SMPI_default_File_Errhandler =  _smpi_cfg_default_errhandler_is_error ? MPI_ERRORS_ARE_FATAL : MPI_ERRORS_RETURN;;
 
-namespace simgrid{
-namespace smpi{
+namespace simgrid::smpi {
 
 File::File(MPI_Comm comm, const char* filename, int amode, MPI_Info info) : comm_(comm), flags_(amode), info_(info)
 {
@@ -34,10 +35,9 @@ File::File(MPI_Comm comm, const char* filename, int amode, MPI_Info info) : comm
   xbt_assert(not simgrid::s4u::Host::current()->get_disks().empty(),
              "SMPI/IO : Trying to open file on a diskless host ! Add one to your platform file");
 
-  size_t found = fullname.find('/');
   // in case no fullpath is provided ... just pick the first mountpoint.
-  if (found == std::string::npos || fullname.rfind("./", 1) != std::string::npos) {
-    auto disk = simgrid::s4u::Host::current()->get_disks().front();
+  if (size_t found = fullname.find('/'); found == std::string::npos || fullname.rfind("./", 1) != std::string::npos) {
+    const auto* disk = simgrid::s4u::Host::current()->get_disks().front();
     std::string mount;
     if (disk->get_host() != simgrid::s4u::Host::current())
       mount = disk->extension<simgrid::s4u::FileSystemDiskExt>()->get_mount_point(disk->get_host());
@@ -51,15 +51,17 @@ File::File(MPI_Comm comm, const char* filename, int amode, MPI_Info info) : comm
       fullname.insert(0, mount);
     }
   }
-
+  XBT_DEBUG("Opening %s", fullname.c_str());
   file_ = simgrid::s4u::File::open(fullname, nullptr);
   list_ = nullptr;
+  disp_ = 0;
+  etype_ = MPI_BYTE;
+  atomicity_ = true;
   if (comm_->rank() == 0) {
     int size    = comm_->size() + FP_SIZE;
-    list_       = new char[size];
+    list_       = new char[size]();
     errhandler_ = SMPI_default_File_Errhandler;
     errhandler_->ref();
-    memset(list_, 0, size);
     shared_file_pointer_  = new MPI_Offset();
     shared_mutex_         = s4u::Mutex::create();
     *shared_file_pointer_ = 0;
@@ -110,15 +112,14 @@ int File::del(const char* filename, const Info*)
 
 int File::get_position(MPI_Offset* offset) const
 {
-  *offset = file_->tell();
+  *offset = file_->tell()/etype_->get_extent();
   return MPI_SUCCESS;
 }
 
 int File::get_position_shared(MPI_Offset* offset) const
 {
-  shared_mutex_->lock();
-  *offset = *shared_file_pointer_;
-  shared_mutex_->unlock();
+  const std::scoped_lock lock(*shared_mutex_);
+  *offset = *shared_file_pointer_/etype_->get_extent();
   return MPI_SUCCESS;
 }
 
@@ -145,10 +146,9 @@ int File::seek(MPI_Offset offset, int whence)
 
 int File::seek_shared(MPI_Offset offset, int whence)
 {
-  shared_mutex_->lock();
+  const std::scoped_lock lock(*shared_mutex_);
   seek(offset, whence);
   *shared_file_pointer_ = file_->tell();
-  shared_mutex_->unlock();
   return MPI_SUCCESS;
 }
 
@@ -158,9 +158,9 @@ int File::read(MPI_File fh, void* /*buf*/, int count, const Datatype* datatype,
   MPI_Offset position = fh->file_->tell();
   MPI_Offset movesize = datatype->get_extent() * count;
   MPI_Offset readsize = datatype->size() * count;
-  XBT_DEBUG("Position before read in MPI_File %s : %llu", fh->file_->get_path(), fh->file_->tell());
+  XBT_DEBUG("Position before read in MPI_File %s : %llu, size %llu", fh->file_->get_path(), fh->file_->tell(), fh->file_->size());
   MPI_Offset read = fh->file_->read(readsize);
-  XBT_VERB("Read in MPI_File %s, %lld bytes read, readsize %lld bytes, movesize %lld", fh->file_->get_path(), read,
+  XBT_VERB("Read in MPI_File %s, %lld bytes read, count %d, readsize %lld bytes, movesize %lld", fh->file_->get_path(), read, count,
            readsize, movesize);
   if (readsize != movesize) {
     fh->file_->seek(position + movesize, SEEK_SET);
@@ -183,11 +183,12 @@ int File::read(MPI_File fh, void* /*buf*/, int count, const Datatype* datatype,
 /* }*/
 int File::read_shared(MPI_File fh, void* buf, int count, const Datatype* datatype, MPI_Status* status)
 {
-  fh->shared_mutex_->lock();
+  if (const std::scoped_lock lock(*fh->shared_mutex_); true) {
+    fh->seek(*(fh->shared_file_pointer_), MPI_SEEK_SET);
+    read(fh, buf, count, datatype, status);
+    *(fh->shared_file_pointer_) = fh->file_->tell();
+  }
   fh->seek(*(fh->shared_file_pointer_), MPI_SEEK_SET);
-  read(fh, buf, count, datatype, status);
-  *(fh->shared_file_pointer_) = fh->file_->tell();
-  fh->shared_mutex_->unlock();
   return MPI_SUCCESS;
 }
 
@@ -203,15 +204,17 @@ int File::read_ordered(MPI_File fh, void* buf, int count, const Datatype* dataty
 
   MPI_Offset result;
   simgrid::smpi::colls::scan(&val, &result, 1, MPI_OFFSET, MPI_SUM, fh->comm_);
+  MPI_Offset prev;
+  fh->get_position(&prev);
   fh->seek(result, MPI_SEEK_SET);
   int ret = fh->op_all<simgrid::smpi::File::read>(buf, count, datatype, status);
   if (fh->comm_->rank() == fh->comm_->size() - 1) {
-    fh->shared_mutex_->lock();
+    const std::scoped_lock lock(*fh->shared_mutex_);
     *(fh->shared_file_pointer_)=fh->file_->tell();
-    fh->shared_mutex_->unlock();
   }
   char c;
   simgrid::smpi::colls::bcast(&c, 1, MPI_BYTE, fh->comm_->size() - 1, fh->comm_);
+  fh->seek(prev, MPI_SEEK_SET);
   return ret;
 }
 
@@ -221,10 +224,10 @@ int File::write(MPI_File fh, void* /*buf*/, int count, const Datatype* datatype,
   MPI_Offset position  = fh->file_->tell();
   MPI_Offset movesize  = datatype->get_extent() * count;
   MPI_Offset writesize = datatype->size() * count;
-  XBT_DEBUG("Position before write in MPI_File %s : %llu", fh->file_->get_path(), fh->file_->tell());
+  XBT_DEBUG("Position before write in MPI_File %s : %llu, size %llu", fh->file_->get_path(), fh->file_->tell(), fh->file_->size());
   MPI_Offset write = fh->file_->write(writesize, true);
-  XBT_VERB("Write in MPI_File %s, %lld bytes written, readsize %lld bytes, movesize %lld", fh->file_->get_path(), write,
-           writesize, movesize);
+  XBT_VERB("Write in MPI_File %s, %lld bytes written, count %d, writesize %lld bytes, movesize %lld", fh->file_->get_path(), write,
+           count, writesize, movesize);
   if (writesize != movesize) {
     fh->file_->seek(position + movesize, SEEK_SET);
   }
@@ -236,13 +239,13 @@ int File::write(MPI_File fh, void* /*buf*/, int count, const Datatype* datatype,
 
 int File::write_shared(MPI_File fh, const void* buf, int count, const Datatype* datatype, MPI_Status* status)
 {
-  fh->shared_mutex_->lock();
+  const std::scoped_lock lock(*fh->shared_mutex_);
   XBT_DEBUG("Write shared on %s - Shared ptr before : %lld", fh->file_->get_path(), *(fh->shared_file_pointer_));
   fh->seek(*(fh->shared_file_pointer_), MPI_SEEK_SET);
   write(fh, const_cast<void*>(buf), count, datatype, status);
   *(fh->shared_file_pointer_) = fh->file_->tell();
   XBT_DEBUG("Write shared on %s - Shared ptr after : %lld", fh->file_->get_path(), *(fh->shared_file_pointer_));
-  fh->shared_mutex_->unlock();
+  fh->seek(*(fh->shared_file_pointer_), MPI_SEEK_SET);
   return MPI_SUCCESS;
 }
 
@@ -257,29 +260,39 @@ int File::write_ordered(MPI_File fh, const void* buf, int count, const Datatype*
   }
   MPI_Offset result;
   simgrid::smpi::colls::scan(&val, &result, 1, MPI_OFFSET, MPI_SUM, fh->comm_);
+  MPI_Offset prev;
+  fh->get_position(&prev);
   fh->seek(result, MPI_SEEK_SET);
   int ret = fh->op_all<simgrid::smpi::File::write>(const_cast<void*>(buf), count, datatype, status);
   if (fh->comm_->rank() == fh->comm_->size() - 1) {
-    fh->shared_mutex_->lock();
+    const std::scoped_lock lock(*fh->shared_mutex_);
     *(fh->shared_file_pointer_)=fh->file_->tell();
-    fh->shared_mutex_->unlock();
   }
   char c;
   simgrid::smpi::colls::bcast(&c, 1, MPI_BYTE, fh->comm_->size() - 1, fh->comm_);
+  fh->seek(prev, MPI_SEEK_SET);
   return ret;
 }
 
-int File::set_view(MPI_Offset /*disp*/, MPI_Datatype etype, MPI_Datatype filetype, const char* datarep, const Info*)
+int File::set_view(MPI_Offset disp, MPI_Datatype etype, MPI_Datatype filetype, const char* datarep, const Info*)
 {
   etype_    = etype;
   filetype_ = filetype;
-  datarep_  = std::string(datarep);
-  seek_shared(0, MPI_SEEK_SET);
+  datarep_  = datarep;
+  disp_     = disp;
+  if (comm_->rank() == 0){
+    if(disp != MPI_DISPLACEMENT_CURRENT)
+      seek_shared(disp, MPI_SEEK_SET);
+    else
+      seek_shared(0, MPI_SEEK_CUR);
+  }
+  sync();
   return MPI_SUCCESS;
 }
 
-int File::get_view(MPI_Offset* /*disp*/, MPI_Datatype* etype, MPI_Datatype* filetype, char* datarep) const
+int File::get_view(MPI_Offset* disp, MPI_Datatype* etype, MPI_Datatype* filetype, char* datarep) const
 {
+  *disp     = disp_;
   *etype    = etype_;
   *filetype = filetype_;
   snprintf(datarep, MPI_MAX_NAME_STRING + 1, "%s", datarep_.c_str());
@@ -301,6 +314,11 @@ int File::flags() const
   return flags_;
 }
 
+MPI_Datatype File::etype() const
+{
+  return etype_;
+}
+
 int File::sync()
 {
   // no idea
@@ -346,5 +364,14 @@ File* File::f2c(int id)
 {
   return static_cast<File*>(F2C::f2c(id));
 }
-} // namespace smpi
-} // namespace simgrid
+
+void File::set_atomicity(bool a){
+  atomicity_ = a;
+}
+
+bool File::get_atomicity() const
+{
+  return atomicity_;
+}
+
+} // namespace simgrid::smpi