hdf5serie  2.0.0
HDF5 Serie
file.h
1/* Copyright (C) 2009 Markus Friedrich
2 *
3 * This library is free software; you can redistribute it and/or
4 * modify it under the terms of the GNU Lesser General Public
5 * License as published by the Free Software Foundation; either
6 * version 2.1 of the License, or (at your option) any later version.
7 *
8 * This library is distributed in the hope that it will be useful,
9 * but WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
11 * Lesser General Public License for more details.
12 *
13 * You should have received a copy of the GNU Lesser General Public
14 * License along with this library; if not, write to the Free Software
15 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16 *
17 * Contact:
18 * friedrich.at.gc@googlemail.com
19 *
20 */
21
22#ifndef _HDF5SERIE_FILE_H_
23#define _HDF5SERIE_FILE_H_
24
25#include <hdf5serie/group.h>
26#include <boost/filesystem.hpp>
27#include <boost/interprocess/sync/interprocess_mutex.hpp>
28#include <boost/interprocess/sync/scoped_lock.hpp>
29#ifdef _WIN32
30 #include <boost/interprocess/windows_shared_memory.hpp>
31#else
32 #include <boost/interprocess/shared_memory_object.hpp>
33#endif
34#include <boost/interprocess/mapped_region.hpp>
35#include <boost/container/static_vector.hpp>
36#include <boost/uuid/uuid.hpp>
37#include <boost/date_time/posix_time/posix_time.hpp>
38#include <thread>
39#include <boost/thread/thread.hpp>
40
41namespace H5 {
42
43 namespace Internal {
44#ifdef _WIN32
45 using SharedMemory = boost::interprocess::windows_shared_memory;
// Windows variant of SharedMemoryRemove: the body is empty, so calling it does nothing.
// NOTE(review): presumably no explicit removal is required for windows_shared_memory
// (the non-Windows variant calls SharedMemory::remove instead) - confirm against Boost.Interprocess.
46 inline void SharedMemoryRemove(const char* shmName) {
47 }
// Windows variant of SharedMemoryCreate: constructs a SharedMemory (windows_shared_memory)
// with the create_only tag, passing the name, the access mode and the size directly to the
// constructor, and returns the resulting object.
48 inline SharedMemory SharedMemoryCreate(const char *shmName, boost::interprocess::mode_t mode,
49 size_t size) {
50 auto shm=SharedMemory(boost::interprocess::create_only, shmName, mode, size);
51 return shm;
52 }
53#else
54 using SharedMemory = boost::interprocess::shared_memory_object;
// Non-Windows variant of SharedMemoryRemove: forwards shmName to the static
// SharedMemory::remove (SharedMemory is boost::interprocess::shared_memory_object here).
// The return value of remove is ignored.
55 inline void SharedMemoryRemove(const char* shmName) {
56 SharedMemory::remove(shmName);
57 }
// Non-Windows variant of SharedMemoryCreate: constructs a SharedMemory (shared_memory_object)
// with the create_only tag from the name and access mode, then sets its size with truncate(size)
// (the size is not a constructor argument in this variant) and returns the object.
58 inline SharedMemory SharedMemoryCreate(const char *shmName, boost::interprocess::mode_t mode,
59 size_t size) {
60 auto shm=SharedMemory(boost::interprocess::create_only, shmName, mode);
61 shm.truncate(size);
62 return shm;
63 }
64#endif
65
66 class ScopedLock;
67
68 // A simple robust condition variable.
69 // The robustness is simply achieved by polling in constant time intervals instead of putting the thread to sleep and waking it up.
70 // This polling delay is no problem in this scope since it is only used for user visible things.
71 // So a polling delay of 1/25 second (25 frames per second) is used, which equals the human visible reaction time.
72 // The interface equals boost::interprocess::interprocess_condition but not with all member functions.
73 // However, only N threads (which may be from different processes) can be waiting.
74 // This class must be implemented in an address-free way since it is usually placed in shared memory.
75 // The implementation is similar to
76 // https://cseweb.ucsd.edu/classes/sp17/cse120-a/applications/ln/lecture7.html
77 template<int N>
79 public:
80 void wait(boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> &externLock,
81 const std::function<bool()> &pred);
82 void notify_all();
83 private:
84 boost::container::static_vector<boost::uuids::uuid, N> waiter;
85 boost::interprocess::interprocess_mutex waiterMutex;
86 };
87 }
88
89 class Dataset;
90
91 /* A wrapper around a HDF5 file.
92 * It automatically handles HDF5 SWMR access using a shared memory based inter-process communication:
93 * This ensures that:
94 * - only one writer is active (more writers are blocked until the writer closes the file)
95 * - readers are blocked until no writer is active or the writer is in SWMR mode
96 * - if readers are active and a writer wants to start writing the readers are notified by a reopen request.
97 */
98 class File : public GroupBase {
99 friend class Internal::ScopedLock;
100 public:
104 };
114 File(const boost::filesystem::path &filename_, FileAccess type_,
115 const std::function<void()> &closeRequestCallback_=std::function<void()>(),
116 const std::function<void()> &refreshCallback_=std::function<void()>());
118 ~File() override;
121 void enableSWMR() override;
122
123 static int getDefaultCompression() { return defaultCompression; }
124 static void setDefaultCompression(int comp) { defaultCompression=comp; }
125 static int getDefaultChunkSize() { return defaultChunkSize; }
126 static void setDefaultChunkSize(int chunk) { defaultChunkSize=chunk; }
127 static int getDefaultCacheSize() { return defaultCacheSize; }
128 static void setDefaultCacheSize(int cache) { defaultCacheSize=cache; }
129
131 void refresh() override;
134 void requestFlush();
138 void flushIfRequested();
139
142 static void dumpSharedMemory(const boost::filesystem::path &filename);
144 static void removeSharedMemory(const boost::filesystem::path &filename);
145
146 private:
147 static int defaultCompression;
148 static int defaultChunkSize;
149 static int defaultCacheSize;
150
151 void close() override;
152
154 const boost::filesystem::path filename;
158 const std::function<void()> closeRequestCallback;
160 const std::function<void()> refreshCallback;
161
163 std::string shmName;
165 const boost::uuids::uuid processUUID;
166
168 enum class WriterState {
169 none, //<! no writer is currently active
170 writeRequest, //<! a writer wants to write the file but readers still exist
171 active, //<! a writer exists and is currently in dataset/attribute creation mode
172 swmr, //<! a writer exists and is currently in SWMR mode (readers can use the file using SWMR)
173 };
175 constexpr static size_t MAXREADERS { 100 };
177 constexpr static size_t MAXWRITERS { 10 };
179 struct ProcessInfo {
180 boost::uuids::uuid processUUID;
181 boost::posix_time::ptime lastAliveTime;
183 };
188 // the following member is only used for life-time handling of the shared memory object itself
189 size_t shmUseCount { 0 }; //<! the number of users of this shared memory object
190 // the following members are used to synchronize the writer and all readers.
191 boost::interprocess::interprocess_mutex mutex; //<! mutex for synchronization handling.
192 Internal::ConditionVariable<MAXREADERS+MAXWRITERS> cond; //<! a condition variable for signaling state changes.
193 // the following members represent the state of the writer and readers
194 // after setting any of these variables sharedData->cond.notify_all() must be called to notify all waiting processes about the change
195 WriterState writerState { WriterState::none }; //<! the current state of the writer of this file.
196 size_t activeReaders { 0 }; //<! the number of active readers on this file.
197 // the following members are only used for still-alive/crash detection handling
198 boost::container::static_vector<ProcessInfo, MAXREADERS+1> processes; //<! a list of all processes accessing the shared memory
199 // the following members are only used for flush/refresh handling
200 bool flushRequest { false }; //<! Is set to true by reader if a flush of the writer should be done. The writer resets to false after a flush.
201 };
202
205 Internal::SharedMemory shm;
208 boost::interprocess::mapped_region region;
209
212
214 bool flushRequested { false };
216 WriterState lastWriterState { WriterState::none };
217
219 static std::string createShmName(const boost::filesystem::path &filename);
220
222 void openReader();
224 void closeReader();
226 void openWriter();
228 void closeWriter();
229
234 void wait(Internal::ScopedLock &lock,
235 std::string_view blockingMsg, const std::function<bool()> &pred);
236
237 // still alive pings use boost::thread since boost interruption points, which are not available for std::thread, are used
238 boost::thread stillAlivePingThread; // the thread for still alive pings
239 void stillAlivePing(); // the worker function
240
246 void listenForRequest();
248 bool exitThread { false }; // access to this object is guarded by sharedData->mutex (interprocess wide)
249
251 void initProcessInfo();
253 void deinitProcessInfo();
254
256 void openOrCreateShm();
257 };
258}
259
260#endif
Definition: interface.h:284
Definition: file.h:98
static constexpr size_t MAXREADERS
the maximal number of readers which can access the file simultaneously
Definition: file.h:175
FileAccess
Definition: file.h:101
@ read
open file for reading
Definition: file.h:102
@ write
open file for writing
Definition: file.h:103
const FileAccess type
Flag if this instance is a writer or reader.
Definition: file.h:156
const std::function< void()> closeRequestCallback
This callback is called when a writer requested a close of all readers.
Definition: file.h:158
void deinitProcessInfo()
Remove process information of this process from the shared memory.
Definition: file.cc:300
bool exitThread
Flag which is set to true to enforce the thread to exit (on the next condition notify signal)
Definition: file.h:248
void closeReader()
Helper function to close the file as a reader.
Definition: file.cc:450
static constexpr size_t MAXWRITERS
the maximal number of writers. Just need since only a fixed amount of readers+writers can wait for th...
Definition: file.h:177
void openReader()
Helper function to open the file as a reader.
Definition: file.cc:353
void listenForRequest()
The worker function for the thread listenForRequestThread.
Definition: file.cc:548
File(const boost::filesystem::path &filename_, FileAccess type_, const std::function< void()> &closeRequestCallback_=std::function< void()>(), const std::function< void()> &refreshCallback_=std::function< void()>())
Definition: file.cc:160
void flushIfRequested()
Definition: file.cc:490
static std::string createShmName(const boost::filesystem::path &filename)
transform filename to a valid boost interprocess name.
Definition: file.cc:649
void openOrCreateShm()
open or create the shared memory atomically (process with using file lock)
Definition: file.cc:185
WriterState lastWriterState
The last writerState known by this object.
Definition: file.h:216
void enableSWMR() override
Definition: file.cc:515
void closeWriter()
Helper function to close the file as a writer.
Definition: file.cc:434
std::string shmName
Name of the shared memory.
Definition: file.h:163
static void dumpSharedMemory(const boost::filesystem::path &filename)
Definition: file.cc:598
bool flushRequested
True if this reader has requested a flush.
Definition: file.h:214
WriterState
A writer can be in the following state:
Definition: file.h:168
void refresh() override
Refresh the dataset of a reader.
Definition: file.cc:472
const boost::filesystem::path filename
The name of the file.
Definition: file.h:154
void initProcessInfo()
Write process information of this process to the shared memory.
Definition: file.cc:288
Internal::SharedMemory shm
Definition: file.h:205
static void removeSharedMemory(const boost::filesystem::path &filename)
Internal helper function which removes the shared memory associated with filename,...
Definition: file.cc:663
void requestFlush()
Definition: file.cc:479
~File() override
Closes the HDF5 file.
Definition: file.cc:392
void openWriter()
Helper function to open the file as a writer.
Definition: file.cc:240
std::thread listenForRequestThread
Definition: file.h:244
void wait(Internal::ScopedLock &lock, std::string_view blockingMsg, const std::function< bool()> &pred)
Definition: file.cc:535
boost::interprocess::mapped_region region
Definition: file.h:208
SharedMemObject * sharedData
Pointer to the shared memory object.
Definition: file.h:211
const std::function< void()> refreshCallback
This callback is called when a writer has flushed.
Definition: file.h:160
Definition: group.h:31
Definition: file.h:78
Definition: file.cc:59
Information about a process accessing the shared memory (a process means here an instance of a File c...
Definition: file.h:179
boost::posix_time::ptime lastAliveTime
the last still alive timestamp of the process
Definition: file.h:181
boost::uuids::uuid processUUID
a globally unique identifier for each process
Definition: file.h:180
FileAccess type
the type of the process read or write
Definition: file.h:182
Definition: file.h:187