hdf5serie  2.0.0
HDF5 Serie
file.h
1/* Copyright (C) 2009 Markus Friedrich
2 *
3 * This library is free software; you can redistribute it and/or
4 * modify it under the terms of the GNU Lesser General Public
5 * License as published by the Free Software Foundation; either
6 * version 2.1 of the License, or (at your option) any later version.
7 *
8 * This library is distributed in the hope that it will be useful,
9 * but WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
11 * Lesser General Public License for more details.
12 *
13 * You should have received a copy of the GNU Lesser General Public
14 * License along with this library; if not, write to the Free Software
15 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16 *
17 * Contact:
18 * friedrich.at.gc@googlemail.com
19 *
20 */
21
22#ifndef _HDF5SERIE_FILE_H_
23#define _HDF5SERIE_FILE_H_
24
25#include <hdf5serie/group.h>
26#include <boost/filesystem.hpp>
27#include <boost/interprocess/sync/interprocess_mutex.hpp>
28#include <boost/interprocess/sync/scoped_lock.hpp>
29#ifdef _WIN32
30 #include <boost/interprocess/windows_shared_memory.hpp>
31#else
32 #include <boost/interprocess/shared_memory_object.hpp>
33#endif
34#include <boost/interprocess/mapped_region.hpp>
35#include <boost/container/static_vector.hpp>
36#include <boost/uuid/uuid.hpp>
37#include <boost/date_time/posix_time/posix_time.hpp>
38#include <thread>
39#include <boost/thread/thread.hpp>
40
41namespace H5 {
42
43 class File;
44
45 namespace Internal {
46#ifdef _WIN32
47 using SharedMemory = boost::interprocess::windows_shared_memory;
48 inline void SharedMemoryRemove(const char* shmName) { // intentionally empty on Windows; presumably the OS releases the segment once the last handle is closed -- TODO confirm against the Boost.Interprocess docs
49 }
50 inline SharedMemory SharedMemoryCreate(const char *shmName, boost::interprocess::mode_t mode,
51 size_t size) {
52 auto shm=SharedMemory(boost::interprocess::create_only, shmName, mode, size); // Windows variant: the size is handed to the constructor, no separate truncate call is made
53 return shm;
54 }
55#else
56 using SharedMemory = boost::interprocess::shared_memory_object;
57 inline void SharedMemoryRemove(const char* shmName) {
58 SharedMemory::remove(shmName); // any result reported by remove() is discarded here
59 }
60 inline SharedMemory SharedMemoryCreate(const char *shmName, boost::interprocess::mode_t mode,
61 size_t size) {
62 auto shm=SharedMemory(boost::interprocess::create_only, shmName, mode);
63 shm.truncate(size); // the object is constructed without a size argument, so it is set to `size` explicitly
64 return shm;
65 }
66#endif
67
68 class ScopedLock;
69
70 // A simple robust condition variable,
71 // since boost::interprocess::interprocess_condition does not provide a robust condition on Linux.
72 // A non-robust condition variable blocks in notify_all or in the dtor if a process waiting on the condition has crashed!
73 // Robustness is simply achieved by polling at constant time intervals instead of putting the thread to sleep and waking it up.
74 // This polling delay is no problem in this scope since it is only used for user-visible things.
75 // So a polling delay of 1/25 second (25 frames per second) is used, which equals the human-visible reaction time.
76 // The interface mirrors boost::interprocess::interprocess_condition but does not provide all of its member functions.
77 // However, at most N threads (which may be from different processes) can be waiting.
78 // This class must be implemented in an address-free way since it is usually placed in shared memory.
79 // The implementation is similar to
80 // https://cseweb.ucsd.edu/classes/sp17/cse120-a/applications/ln/lecture7.html
81 template<int N>
83 friend class H5::File; // to allow File::dumpSharedMemory to access the private members
84 public:
85 void wait(boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> &externLock,
86 const std::function<bool()> &pred);
87 bool wait_for(boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> &externLock,
88 const std::chrono::milliseconds& relTime,
89 const std::function<bool()> &pred);
90 void notify_all();
91 private:
92 boost::container::static_vector<boost::uuids::uuid, N> waiter;
93 boost::interprocess::interprocess_mutex waiterMutex;
94 };
95 }
96
97 class Dataset;
98
99 /* A wrapper around a HDF5 file.
100 * It automatically handles HDF5 SWMR access using a shared memory based inter-process communication.
101 * This ensures that:
102 * - only one writer is active (more writers are blocked until the writer closes the file)
103 * - readers are blocked until no writer is active or the writer is in SWMR mode
104 * - if readers are active and a writer wants to start writing the readers are notified by a reopen request.
105 */
106 class File : public GroupBase {
107 friend class Internal::ScopedLock;
108 public:
111 write,
119 };
132 File(const boost::filesystem::path &filename_, FileAccess type_,
133 const std::function<void()> &closeRequestCallback_={},
134 const std::function<void()> &refreshCallback_={},
135 const std::function<void()> &renameAtomicFunc_={});
137 ~File() override;
140 void enableSWMR() override;
141
142 static int getDefaultCompression() { return defaultCompression; }
143 static void setDefaultCompression(int comp) { defaultCompression=comp; }
144 static int getDefaultChunkSize() { return defaultChunkSize; }
145 static void setDefaultChunkSize(int chunk) { defaultChunkSize=chunk; }
146 static int getDefaultCacheSize() { return defaultCacheSize; }
147 static void setDefaultCacheSize(int cache) { defaultCacheSize=cache; }
148
150 void refresh() override;
153 void requestFlush();
157 void flushIfRequested();
158
161 static void dumpSharedMemory(const boost::filesystem::path &filename);
163 static void removeSharedMemory(const boost::filesystem::path &filename);
164
165 FileAccess getType(bool originalType=false); // returns write for write and writeWithRename and read for read
166
167 private:
168 static int defaultCompression;
169 static int defaultChunkSize;
170 static int defaultCacheSize;
171
172 void close() override;
173
175 boost::filesystem::path filename;
176 boost::filesystem::path getFilename(bool originalFilename=false); // returns the filename depending on the current preSWMR flag
177
182 bool preSWMR { false };
183
185 enum class WriterState {
186 none, //!< no writer is currently active
187 writeRequest, //!< a writer wants to write the file but readers still exist
188 active, //!< a writer exists and is currently in dataset/attribute creation mode
189 swmr, //!< a writer exists and is currently in SWMR mode (readers can use the file using SWMR)
190 };
192 constexpr static size_t MAXREADERS { 100 };
194 constexpr static size_t MAXWRITERS { 10 };
196 struct ProcessInfo {
197 boost::uuids::uuid processUUID;
198 boost::posix_time::ptime lastAliveTime;
200 };
205 // the following member is only used for life-time handling of the shared memory object itself
206 size_t shmUseCount { 0 }; //!< the number of users of this shared memory object
207 // the following members are used to synchronize the writer and all readers.
208 boost::interprocess::interprocess_mutex mutex; //!< mutex for synchronization handling.
209 Internal::ConditionVariable<MAXREADERS+MAXWRITERS> cond; //!< a condition variable for signaling state changes.
210 // the following members represent the state of the writer and readers
211 // after setting any of these variables sharedData->cond.notify_all() must be called to notify all waiting processes about the change
212 WriterState writerState { WriterState::none }; //!< the current state of the writer of this file.
213 size_t activeReaders { 0 }; //!< the number of active readers on this file.
214 // the following members are only used for still-alive/crash detection handling
215 boost::container::static_vector<ProcessInfo, MAXREADERS+1> processes; //!< a list of all processes accessing the shared memory
216 // the following members are only used for flush/refresh handling
217 bool flushRequest { false }; //!< Is set to true by a reader if a flush of the writer should be done. The writer resets it to false after a flush.
218 };
219
221 const std::function<void()> closeRequestCallback;
223 const std::function<void()> refreshCallback;
224 const std::function<void()> renameAtomicFunc;
225
227 std::string shmName;
229 const boost::uuids::uuid processUUID;
230
233 Internal::SharedMemory shm;
236 boost::interprocess::mapped_region region;
237
240
242 bool flushRequested { false };
244 WriterState lastWriterState { WriterState::none };
245
247 static std::string createShmName(const boost::filesystem::path &filename);
248
250 void preOpenReader();
252 void openReader();
254 void closeReader();
255 void postCloseReader();
257 void preOpenWriter();
259 void openWriter();
261 void closeWriter();
262 void postCloseWriter();
263
269 void wait(Internal::ScopedLock &lock, const std::chrono::milliseconds& relTime,
270 std::string_view blockingMsg, const std::function<bool()> &pred);
271
272 // still-alive pings use boost::thread since boost interruption points, which are not available for std::thread, are used
273 boost::thread stillAlivePingThread; // the thread for still-alive pings
274 void stillAlivePing(); // the worker function
275
281 void listenForRequest();
283 bool exitThread { false }; // access to this object is guarded by sharedData->mutex (interprocess wide)
284
286 void initProcessInfo();
288 void deinitProcessInfo();
289
291 static void openOrCreateShm(const boost::filesystem::path &filename, File *self,
292 std::string &shmName, Internal::SharedMemory &shm, boost::interprocess::mapped_region &region,
293 SharedMemObject *&sharedData);
294 static void deinitShm(SharedMemObject *sharedData, const boost::filesystem::path &filename, File *self, const std::string &shmName);
295 };
296}
297
298#endif
Definition: interface.h:292
Definition: file.h:106
static constexpr size_t MAXREADERS
the maximal number of readers which can access the file simultaneously
Definition: file.h:192
FileAccess
Definition: file.h:109
@ writeWithRename
Definition: file.h:113
@ read
Open file for reading with SWMR reading mode enabled.
Definition: file.h:110
@ write
Definition: file.h:111
const std::function< void()> closeRequestCallback
This callback is called when a writer requested a close of all readers.
Definition: file.h:221
void deinitProcessInfo()
Remove process information of this process from the shared memory.
Definition: file.cc:476
bool exitThread
Flag which is set to true to enforce the thread to exit (on the next condition notify signal)
Definition: file.h:283
void closeReader()
Helper function to close the file as a reader.
Definition: file.cc:656
static constexpr size_t MAXWRITERS
the maximal number of writers. Just needed since only a fixed amount of readers+writers can wait for th...
Definition: file.h:194
void openReader()
Helper function to open the file as a reader. preOpenReader must be called before.
Definition: file.cc:551
void listenForRequest()
The worker function for the thread listenForRequestThread.
Definition: file.cc:803
void flushIfRequested()
Definition: file.cc:688
static std::string createShmName(const boost::filesystem::path &filename)
transform filename to a valid boost interprocess name.
Definition: file.cc:909
void preOpenWriter()
Helper function to prepare for openWriter.
Definition: file.cc:415
WriterState lastWriterState
The last writerState known by this object.
Definition: file.h:244
void enableSWMR() override
Definition: file.cc:713
void closeWriter()
Helper function to close the file as a writer.
Definition: file.cc:639
std::string shmName
Name of the shared memory.
Definition: file.h:227
static void dumpSharedMemory(const boost::filesystem::path &filename)
Definition: file.cc:853
bool flushRequested
True if this reader has requested a flush.
Definition: file.h:242
static void openOrCreateShm(const boost::filesystem::path &filename, File *self, std::string &shmName, Internal::SharedMemory &shm, boost::interprocess::mapped_region &region, SharedMemObject *&sharedData)
open or create the shared memory atomically (guarded by a global named mutex)
Definition: file.cc:364
WriterState
A writer can be in the following state:
Definition: file.h:185
bool preSWMR
Definition: file.h:182
void wait(Internal::ScopedLock &lock, const std::chrono::milliseconds &relTime, std::string_view blockingMsg, const std::function< bool()> &pred)
Definition: file.cc:782
void refresh() override
Refresh the dataset of a reader.
Definition: file.cc:672
FileAccess type
Flag if this instance is a writer or reader.
Definition: file.h:179
void initProcessInfo()
Write process information of this process to the shared memory.
Definition: file.cc:463
Internal::SharedMemory shm
Definition: file.h:233
static void removeSharedMemory(const boost::filesystem::path &filename)
Internal helper function which removes the shared memory associated with filename,...
Definition: file.cc:914
void requestFlush()
Definition: file.cc:679
~File() override
Closes the HDF5 file.
Definition: file.cc:588
void openWriter()
Helper function to open the file as a writer. preOpenWriter must be called before.
Definition: file.cc:440
File(const boost::filesystem::path &filename_, FileAccess type_, const std::function< void()> &closeRequestCallback_={}, const std::function< void()> &refreshCallback_={}, const std::function< void()> &renameAtomicFunc_={})
Definition: file.cc:321
std::thread listenForRequestThread
Definition: file.h:279
boost::interprocess::mapped_region region
Definition: file.h:236
SharedMemObject * sharedData
Pointer to the shared memory object.
Definition: file.h:239
boost::filesystem::path filename
The name of the file.
Definition: file.h:175
void preOpenReader()
Helper function to prepare for openReader.
Definition: file.cc:532
const std::function< void()> refreshCallback
This callback is called when a writer has flushed.
Definition: file.h:223
Definition: group.h:31
Definition: file.h:82
Definition: file.cc:190
Information about a process accessing the shared memory (a process means here an instance of a File c...
Definition: file.h:196
boost::posix_time::ptime lastAliveTime
the last still alive timestamp of the process
Definition: file.h:198
boost::uuids::uuid processUUID
a globally unique identifier for each process
Definition: file.h:197
FileAccess type
the type of the process read or write
Definition: file.h:199
Definition: file.h:204