Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Reduce scope for temporary variables.
[simgrid.git] / src / smpi / mpi / smpi_file.cpp
1 /* Copyright (c) 2007-2022. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5 #include "private.hpp"
6
7 #include "smpi_comm.hpp"
8 #include "smpi_coll.hpp"
9 #include "smpi_datatype.hpp"
10 #include "smpi_info.hpp"
11 #include "smpi_win.hpp"
12 #include "smpi_request.hpp"
13
14 #include "smpi_file.hpp"
15 #include "smpi_status.hpp"
16 #include "simgrid/s4u/Disk.hpp"
17 #include "simgrid/s4u/Host.hpp"
18 #include "simgrid/plugins/file_system.h"
19
20 #define FP_SIZE sizeof(MPI_Offset)
21
22 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_io, smpi, "Logging specific to SMPI (RMA operations)");
23
24 MPI_Errhandler SMPI_default_File_Errhandler =  _smpi_cfg_default_errhandler_is_error ? MPI_ERRORS_ARE_FATAL : MPI_ERRORS_RETURN;;
25
26 namespace simgrid{
27 namespace smpi{
28
29 File::File(MPI_Comm comm, const char* filename, int amode, MPI_Info info) : comm_(comm), flags_(amode), info_(info)
30 {
31   if (info_ != MPI_INFO_NULL)
32     info_->ref();
33   std::string fullname = filename;
34   xbt_assert(not simgrid::s4u::Host::current()->get_disks().empty(),
35              "SMPI/IO : Trying to open file on a diskless host ! Add one to your platform file");
36
37   // in case no fullpath is provided ... just pick the first mountpoint.
38   if (size_t found = fullname.find('/'); found == std::string::npos || fullname.rfind("./", 1) != std::string::npos) {
39     auto disk = simgrid::s4u::Host::current()->get_disks().front();
40     std::string mount;
41     if (disk->get_host() != simgrid::s4u::Host::current())
42       mount = disk->extension<simgrid::s4u::FileSystemDiskExt>()->get_mount_point(disk->get_host());
43     else
44       mount = disk->extension<simgrid::s4u::FileSystemDiskExt>()->get_mount_point();
45     XBT_DEBUG("No absolute path given for file opening, use '%s'", mount.c_str());
46     if (fullname.rfind("./", 1) != std::string::npos)
47       fullname.replace(fullname.begin(), fullname.begin() + 1, mount);
48     else {
49       mount.append("/");
50       fullname.insert(0, mount);
51     }
52   }
53
54   file_ = simgrid::s4u::File::open(fullname, nullptr);
55   list_ = nullptr;
56   if (comm_->rank() == 0) {
57     int size    = comm_->size() + FP_SIZE;
58     list_       = new char[size];
59     errhandler_ = SMPI_default_File_Errhandler;
60     errhandler_->ref();
61     memset(list_, 0, size);
62     shared_file_pointer_  = new MPI_Offset();
63     shared_mutex_         = s4u::Mutex::create();
64     *shared_file_pointer_ = 0;
65     win_                  = new Win(list_, size, 1, MPI_INFO_NULL, comm_);
66   } else {
67     errhandler_ = MPI_ERRHANDLER_NULL;
68     win_        = new Win(list_, 0, 1, MPI_INFO_NULL, comm_);
69   }
70   simgrid::smpi::colls::bcast(&shared_file_pointer_, 1, MPI_AINT, 0, comm);
71   simgrid::smpi::colls::bcast(&shared_mutex_, 1, MPI_AINT, 0, comm);
72   if (comm_->rank() != 0)
73     intrusive_ptr_add_ref(&*shared_mutex_);
74   this->add_f();
75 }
76
77 File::~File()
78 {
79   if (comm_->rank() == 0) {
80     delete shared_file_pointer_;
81     delete[] list_;
82   }
83   simgrid::smpi::Win::del(win_);
84   file_->close();
85   F2C::free_f(this->f2c_id());
86   if (info_ != MPI_INFO_NULL)
87     simgrid::smpi::Info::unref(info_);
88   if (errhandler_ != MPI_ERRHANDLER_NULL)
89     simgrid::smpi::Errhandler::unref(errhandler_);
90 }
91
92 int File::close(MPI_File* fh)
93 {
94   XBT_DEBUG("Closing MPI_File %s", (*fh)->file_->get_path());
95   (*fh)->sync();
96   if ((*fh)->flags() & MPI_MODE_DELETE_ON_CLOSE)
97     (*fh)->file_->unlink();
98   delete (*fh);
99   return MPI_SUCCESS;
100 }
101
102 int File::del(const char* filename, const Info*)
103 {
104   // get the file with MPI_MODE_DELETE_ON_CLOSE and then close it
105   auto* f = new File(MPI_COMM_SELF, filename, MPI_MODE_DELETE_ON_CLOSE | MPI_MODE_RDWR, nullptr);
106   close(&f);
107   return MPI_SUCCESS;
108 }
109
110 int File::get_position(MPI_Offset* offset) const
111 {
112   *offset = file_->tell();
113   return MPI_SUCCESS;
114 }
115
116 int File::get_position_shared(MPI_Offset* offset) const
117 {
118   shared_mutex_->lock();
119   *offset = *shared_file_pointer_;
120   shared_mutex_->unlock();
121   return MPI_SUCCESS;
122 }
123
124 int File::seek(MPI_Offset offset, int whence)
125 {
126   switch (whence) {
127     case MPI_SEEK_SET:
128       XBT_VERB("Seeking in MPI_File %s, setting offset %lld", file_->get_path(), offset);
129       file_->seek(offset, SEEK_SET);
130       break;
131     case MPI_SEEK_CUR:
132       XBT_VERB("Seeking in MPI_File %s, current offset + %lld", file_->get_path(), offset);
133       file_->seek(offset, SEEK_CUR);
134       break;
135     case MPI_SEEK_END:
136       XBT_VERB("Seeking in MPI_File %s, end offset + %lld", file_->get_path(), offset);
137       file_->seek(offset, SEEK_END);
138       break;
139     default:
140       return MPI_ERR_FILE;
141   }
142   return MPI_SUCCESS;
143 }
144
145 int File::seek_shared(MPI_Offset offset, int whence)
146 {
147   shared_mutex_->lock();
148   seek(offset, whence);
149   *shared_file_pointer_ = file_->tell();
150   shared_mutex_->unlock();
151   return MPI_SUCCESS;
152 }
153
154 int File::read(MPI_File fh, void* /*buf*/, int count, const Datatype* datatype, MPI_Status* status)
155 {
156   // get position first as we may be doing non contiguous reads and it will probably be updated badly
157   MPI_Offset position = fh->file_->tell();
158   MPI_Offset movesize = datatype->get_extent() * count;
159   MPI_Offset readsize = datatype->size() * count;
160   XBT_DEBUG("Position before read in MPI_File %s : %llu", fh->file_->get_path(), fh->file_->tell());
161   MPI_Offset read = fh->file_->read(readsize);
162   XBT_VERB("Read in MPI_File %s, %lld bytes read, readsize %lld bytes, movesize %lld", fh->file_->get_path(), read,
163            readsize, movesize);
164   if (readsize != movesize) {
165     fh->file_->seek(position + movesize, SEEK_SET);
166   }
167   XBT_VERB("Position after read in MPI_File %s : %llu", fh->file_->get_path(), fh->file_->tell());
168   if (status != MPI_STATUS_IGNORE)
169     status->count = count * datatype->size();
170   return MPI_SUCCESS;
171 }
172
173 /*Ordered and Shared Versions, with RMA-based locks : Based on the model described in :*/
174 /* @InProceedings{10.1007/11557265_15,*/
175 /* author="Latham, Robert and Ross, Robert and Thakur, Rajeev and Toonen, Brian",*/
176 /* title="Implementing MPI-IO Shared File Pointers Without File System Support",*/
177 /* booktitle="Recent Advances in Parallel Virtual Machine and Message Passing Interface",*/
178 /* year="2005",*/
179 /* publisher="Springer Berlin Heidelberg",*/
180 /* address="Berlin, Heidelberg",*/
181 /* pages="84--93"*/
182 /* }*/
183 int File::read_shared(MPI_File fh, void* buf, int count, const Datatype* datatype, MPI_Status* status)
184 {
185   fh->shared_mutex_->lock();
186   fh->seek(*(fh->shared_file_pointer_), MPI_SEEK_SET);
187   read(fh, buf, count, datatype, status);
188   *(fh->shared_file_pointer_) = fh->file_->tell();
189   fh->shared_mutex_->unlock();
190   return MPI_SUCCESS;
191 }
192
193 int File::read_ordered(MPI_File fh, void* buf, int count, const Datatype* datatype, MPI_Status* status)
194 {
195   // 0 needs to get the shared pointer value
196   MPI_Offset val;
197   if (fh->comm_->rank() == 0) {
198     val = *(fh->shared_file_pointer_);
199   } else {
200     val = count * datatype->size();
201   }
202
203   MPI_Offset result;
204   simgrid::smpi::colls::scan(&val, &result, 1, MPI_OFFSET, MPI_SUM, fh->comm_);
205   fh->seek(result, MPI_SEEK_SET);
206   int ret = fh->op_all<simgrid::smpi::File::read>(buf, count, datatype, status);
207   if (fh->comm_->rank() == fh->comm_->size() - 1) {
208     fh->shared_mutex_->lock();
209     *(fh->shared_file_pointer_)=fh->file_->tell();
210     fh->shared_mutex_->unlock();
211   }
212   char c;
213   simgrid::smpi::colls::bcast(&c, 1, MPI_BYTE, fh->comm_->size() - 1, fh->comm_);
214   return ret;
215 }
216
217 int File::write(MPI_File fh, void* /*buf*/, int count, const Datatype* datatype, MPI_Status* status)
218 {
219   // get position first as we may be doing non contiguous reads and it will probably be updated badly
220   MPI_Offset position  = fh->file_->tell();
221   MPI_Offset movesize  = datatype->get_extent() * count;
222   MPI_Offset writesize = datatype->size() * count;
223   XBT_DEBUG("Position before write in MPI_File %s : %llu", fh->file_->get_path(), fh->file_->tell());
224   MPI_Offset write = fh->file_->write(writesize, true);
225   XBT_VERB("Write in MPI_File %s, %lld bytes written, readsize %lld bytes, movesize %lld", fh->file_->get_path(), write,
226            writesize, movesize);
227   if (writesize != movesize) {
228     fh->file_->seek(position + movesize, SEEK_SET);
229   }
230   XBT_VERB("Position after write in MPI_File %s : %llu", fh->file_->get_path(), fh->file_->tell());
231   if (status != MPI_STATUS_IGNORE)
232     status->count = count * datatype->size();
233   return MPI_SUCCESS;
234 }
235
236 int File::write_shared(MPI_File fh, const void* buf, int count, const Datatype* datatype, MPI_Status* status)
237 {
238   fh->shared_mutex_->lock();
239   XBT_DEBUG("Write shared on %s - Shared ptr before : %lld", fh->file_->get_path(), *(fh->shared_file_pointer_));
240   fh->seek(*(fh->shared_file_pointer_), MPI_SEEK_SET);
241   write(fh, const_cast<void*>(buf), count, datatype, status);
242   *(fh->shared_file_pointer_) = fh->file_->tell();
243   XBT_DEBUG("Write shared on %s - Shared ptr after : %lld", fh->file_->get_path(), *(fh->shared_file_pointer_));
244   fh->shared_mutex_->unlock();
245   return MPI_SUCCESS;
246 }
247
248 int File::write_ordered(MPI_File fh, const void* buf, int count, const Datatype* datatype, MPI_Status* status)
249 {
250   // 0 needs to get the shared pointer value
251   MPI_Offset val;
252   if (fh->comm_->rank() == 0) {
253     val = *(fh->shared_file_pointer_);
254   } else {
255     val = count * datatype->size();
256   }
257   MPI_Offset result;
258   simgrid::smpi::colls::scan(&val, &result, 1, MPI_OFFSET, MPI_SUM, fh->comm_);
259   fh->seek(result, MPI_SEEK_SET);
260   int ret = fh->op_all<simgrid::smpi::File::write>(const_cast<void*>(buf), count, datatype, status);
261   if (fh->comm_->rank() == fh->comm_->size() - 1) {
262     fh->shared_mutex_->lock();
263     *(fh->shared_file_pointer_)=fh->file_->tell();
264     fh->shared_mutex_->unlock();
265   }
266   char c;
267   simgrid::smpi::colls::bcast(&c, 1, MPI_BYTE, fh->comm_->size() - 1, fh->comm_);
268   return ret;
269 }
270
271 int File::set_view(MPI_Offset /*disp*/, MPI_Datatype etype, MPI_Datatype filetype, const char* datarep, const Info*)
272 {
273   etype_    = etype;
274   filetype_ = filetype;
275   datarep_  = std::string(datarep);
276   seek_shared(0, MPI_SEEK_SET);
277   return MPI_SUCCESS;
278 }
279
280 int File::get_view(MPI_Offset* /*disp*/, MPI_Datatype* etype, MPI_Datatype* filetype, char* datarep) const
281 {
282   *etype    = etype_;
283   *filetype = filetype_;
284   snprintf(datarep, MPI_MAX_NAME_STRING + 1, "%s", datarep_.c_str());
285   return MPI_SUCCESS;
286 }
287
288 int File::size() const
289 {
290   return file_->size();
291 }
292
293 void File::set_size(int size)
294 {
295   file_->write(size, true);
296 }
297
298 int File::flags() const
299 {
300   return flags_;
301 }
302
303 int File::sync()
304 {
305   // no idea
306   return simgrid::smpi::colls::barrier(comm_);
307 }
308
309 MPI_Info File::info()
310 {
311   return info_;
312 }
313
314 void File::set_info(MPI_Info info)
315 {
316   if (info_ != MPI_INFO_NULL)
317     simgrid::smpi::Info::unref(info_);
318   info_ = info;
319   if (info_ != MPI_INFO_NULL)
320     info_->ref();
321 }
322
323 MPI_Comm File::comm() const
324 {
325   return comm_;
326 }
327
328 MPI_Errhandler File::errhandler()
329 {
330   if (errhandler_ != MPI_ERRHANDLER_NULL)
331     errhandler_->ref();
332   return errhandler_;
333 }
334
335 void File::set_errhandler(MPI_Errhandler errhandler)
336 {
337   if (errhandler_ != MPI_ERRHANDLER_NULL)
338     simgrid::smpi::Errhandler::unref(errhandler_);
339   errhandler_ = errhandler;
340   if (errhandler_ != MPI_ERRHANDLER_NULL)
341     errhandler_->ref();
342 }
343
344 File* File::f2c(int id)
345 {
346   return static_cast<File*>(F2C::f2c(id));
347 }
348 } // namespace smpi
349 } // namespace simgrid