casacore
Loading...
Searching...
No Matches
BufferedColumnarFile.h
Go to the documentation of this file.
1#ifndef CASACORE_BUFFERED_COLUMNAR_FILE_H_
2#define CASACORE_BUFFERED_COLUMNAR_FILE_H_
3
4#include <cassert>
5#include <complex>
6#include <cstdint>
7#include <string>
8#include <vector>
9
10#include "BitPacking.h"
11#include "RowBasedFile.h"
12
13namespace casacore {
14
32template <uint64_t BufferSize = 100 * 1024>
34 public:
43
44 VarBufferedColumnarFile() noexcept = default;
45
48 : packed_buffer_(std::move(rhs.packed_buffer_)),
52 block_buffer_(std::move(rhs.block_buffer_)) {
53 rhs.block_changed_ = false;
54 rhs.active_block_ = 0;
55 rhs.rows_per_block_ = 0;
56 }
57
59 if (IsOpen()) {
60 if (block_changed_) {
61 const uint64_t start_row = active_block_ * rows_per_block_;
62 const size_t n_rows_to_write =
63 std::min(rows_per_block_, std::max(NRows(), start_row) - start_row);
64 Seek(start_row * Stride() + DataLocation(), SEEK_SET);
65 WriteData(block_buffer_.data(), n_rows_to_write * Stride());
66 block_changed_ = false;
67 }
68 }
69 }
70
72 Close();
73 RowBasedFile::operator=(std::move(rhs));
74 std::swap(packed_buffer_, rhs.packed_buffer_);
75 std::swap(block_changed_, rhs.block_changed_);
76 std::swap(active_block_, rhs.active_block_);
77 std::swap(rows_per_block_, rhs.rows_per_block_);
78 std::swap(block_buffer_, rhs.block_buffer_);
79 return *this;
80 }
81
87 void Close() {
88 if (IsOpen()) {
89 if (block_changed_) {
90 try {
92 } catch (...) {
93 // Even if writing fails, the file should still be closed, to
94 // prevent it from remaining open after destruction.
96 throw;
97 }
98 }
100 }
101 }
102
108 static VarBufferedColumnarFile CreateNew(const std::string& filename,
109 uint64_t header_size,
110 uint64_t stride) {
111 return VarBufferedColumnarFile(filename, header_size, stride);
112 }
113
120 static VarBufferedColumnarFile OpenExisting(const std::string& filename,
121 size_t header_size) {
122 return VarBufferedColumnarFile(filename, header_size);
123 }
124
134 void Read(uint64_t row, uint64_t column_offset, float* data, uint64_t n) {
135 ReadImplementation(row, column_offset, data, n);
136 }
137
141 void Read(uint64_t row, uint64_t column_offset, double* data, uint64_t n) {
142 ReadImplementation(row, column_offset, data, n);
143 }
144
148 void Read(uint64_t row, uint64_t column_offset, int32_t* data, uint64_t n) {
149 ReadImplementation(row, column_offset, data, n);
150 }
151
155 void Read(uint64_t row, uint64_t column_offset, std::complex<float>* data,
156 uint64_t n) {
157 ReadImplementation(row, column_offset, data, n);
158 }
159
164 void Read(uint64_t row, uint64_t column_offset, bool* data, uint64_t n) {
165 const size_t byte_size = (n + 7) / 8;
166 assert(column_offset + byte_size <= Stride());
167 ActivateBlock(row);
168 if (row >= NRows()) {
169 std::fill_n(data, n, false);
170 } else {
171 Seek(row * Stride() + column_offset + DataLocation(), SEEK_SET);
172 ReadData(packed_buffer_.data(), byte_size);
173 UnpackBoolArray(data, packed_buffer_.data(), n);
174 }
175 }
180 void Write(uint64_t row, uint64_t column_offset, const float* data,
181 uint64_t n) {
182 WriteImplementation(row, column_offset, data, n);
183 }
184
188 void Write(uint64_t row, uint64_t column_offset, const double* data,
189 uint64_t n) {
190 WriteImplementation(row, column_offset, data, n);
191 }
192
196 void Write(uint64_t row, uint64_t column_offset, const int32_t* data,
197 uint64_t n) {
198 WriteImplementation(row, column_offset, data, n);
199 }
200
204 void Write(uint64_t row, uint64_t column_offset,
205 const std::complex<float>* data, uint64_t n) {
206 WriteImplementation(row, column_offset, data, n);
207 }
208
212 void Write(uint64_t row, uint64_t column_offset,
213 const std::complex<double>* data, uint64_t n) {
214 WriteImplementation(row, column_offset, data, n);
215 }
216
221 void Write(uint64_t row, uint64_t column_offset, const bool* data,
222 uint64_t n) {
223 const size_t byte_size = (n + 7) / 8;
224 assert(column_offset + byte_size <= Stride());
225 ActivateBlock(row);
226 PackBoolArray(packed_buffer_.data(), data, n);
227 Seek(row * Stride() + column_offset + DataLocation(), SEEK_SET);
228 WriteData(packed_buffer_.data(), byte_size);
229 SetNRows(std::max(row + 1, NRows()));
230 }
231
236 void SetStride(uint64_t new_stride) {
237 RowBasedFile::SetStride(new_stride);
238 packed_buffer_.resize(new_stride);
239 active_block_ = std::numeric_limits<uint64_t>::max();
241 new_stride == 0 ? 0 : std::max<size_t>(1, BufferSize / new_stride);
242 block_buffer_.resize(rows_per_block_ * new_stride);
243 block_changed_ = false;
244 }
245
246 private:
247 // Create or overwrite a new columnar file on disk
248 VarBufferedColumnarFile(const std::string& filename, uint64_t header_size,
249 uint64_t stride)
250 : RowBasedFile(filename, header_size, stride),
251 packed_buffer_(stride),
252 rows_per_block_(stride == 0 ? 0
253 : std::max<size_t>(1, BufferSize / stride)),
254 block_buffer_(rows_per_block_ * stride) {}
255
256 // Open an existing columnar file
257 VarBufferedColumnarFile(const std::string& filename, size_t header_size)
258 : RowBasedFile(filename, header_size) {
259 if (Stride() != 0) {
260 packed_buffer_.resize(Stride());
261 rows_per_block_ = std::max<size_t>(1, BufferSize / Stride());
263 }
264 active_block_ = std::numeric_limits<uint64_t>::max();
265 }
266
267 void ActivateBlock(uint64_t row) {
268 const uint64_t block = row / rows_per_block_;
269 if (active_block_ != block) {
270 if (block_changed_) {
272 }
273
274 const uint64_t start_row = block * rows_per_block_;
275 const size_t n_rows_to_read =
276 std::min(rows_per_block_, std::max(NRows(), start_row) - start_row);
277 Seek(start_row * Stride() + DataLocation(), SEEK_SET);
278 ReadData(block_buffer_.data(), n_rows_to_read * Stride());
279 // Fill the remainder of block_buffer_ with zeroes. Doing it here makes
280 // the code robust and avoids the need for inserting zeroes when adding
281 // rows out-of-order, e.g., when adding row 5 while NRows() is 2.
282 std::fill(block_buffer_.begin() + n_rows_to_read * Stride(),
283 block_buffer_.end(), 0);
284
285 active_block_ = block;
286 }
287 }
288
289 template <typename ValueType>
290 void ReadImplementation(uint64_t row, uint64_t column_offset, ValueType* data,
291 uint64_t n) {
292 assert(column_offset + n * sizeof(ValueType) <= Stride());
293 if (row >= NRows()) {
294 std::fill_n(data, n, ValueType());
295 } else {
296 ActivateBlock(row);
297 const uint64_t block_row = active_block_ * rows_per_block_;
298 const unsigned char* position =
299 block_buffer_.data() + (row - block_row) * Stride() + column_offset;
300 std::copy_n(position, n * sizeof(ValueType),
301 reinterpret_cast<unsigned char*>(data));
302 }
303 }
304
305 template <typename ValueType>
306 void WriteImplementation(uint64_t row, uint64_t column_offset,
307 const ValueType* data, uint64_t n) {
308 assert(column_offset + n * sizeof(ValueType) <= Stride());
309 ActivateBlock(row);
310 const uint64_t block_row = active_block_ * rows_per_block_;
311 unsigned char* position =
312 block_buffer_.data() + (row - block_row) * Stride() + column_offset;
313 std::copy_n(reinterpret_cast<const unsigned char*>(data),
314 n * sizeof(ValueType), position);
315 SetNRows(std::max(row + 1, NRows()));
316 block_changed_ = true;
317 }
318
320 const uint64_t start_row = active_block_ * rows_per_block_;
321 const size_t n_rows_to_write =
322 std::min(rows_per_block_, std::max(NRows(), start_row) - start_row);
323 Seek(start_row * Stride() + DataLocation(), SEEK_SET);
324 WriteData(block_buffer_.data(), n_rows_to_write * Stride());
325 block_changed_ = false;
326 }
327
332 std::vector<unsigned char> packed_buffer_;
333
334 bool block_changed_ = false;
335 uint64_t active_block_ = 0;
336 uint64_t rows_per_block_ = 0;
337 std::vector<unsigned char> block_buffer_;
338};
339
341
342} // namespace casacore
343
344#endif
void UnpackBoolArray(bool *output, const unsigned char *packed_input, size_t n)
Definition BitPacking.h:29
void PackBoolArray(unsigned char *packed_buffer, const bool *input, size_t n)
Definition BitPacking.h:6
void WriteData(const unsigned char *data, uint64_t size)
void Seek(off_t pos, int seek_direction)
uint64_t DataLocation() const
Offset of the first row in the file.
void SetStride(uint64_t new_stride)
Set the number of bytes per row for this file.
void ReadHeader(unsigned char *data)
Read an optional extra header from the file.
const std::string & Filename() const
RowBasedFile & operator=(RowBasedFile &&rhs)
void WriteHeader(const unsigned char *data)
Write an optional extra header to the file.
void AddRows(uint64_t n_rows)
Adds a given number of rows to the back of the file.
uint64_t NRows() const
Total number of rows stored in this file.
void ReadData(unsigned char *data, uint64_t size)
void DeleteRow()
Deletes the last row.
uint64_t Stride() const
Total number of bytes in one row.
void SetNRows(uint64_t new_n_rows)
void Close()
Close the file.
Class that provides binary table I/O.
static VarBufferedColumnarFile CreateNew(const std::string &filename, uint64_t header_size, uint64_t stride)
Create a new file on disk.
static VarBufferedColumnarFile OpenExisting(const std::string &filename, size_t header_size)
Open an existing file from disk.
void Write(uint64_t row, uint64_t column_offset, const std::complex< double > *data, uint64_t n)
Write an array of complex doubles.
void Write(uint64_t row, uint64_t column_offset, const bool *data, uint64_t n)
Write an array of bools.
void ReadImplementation(uint64_t row, uint64_t column_offset, ValueType *data, uint64_t n)
void Write(uint64_t row, uint64_t column_offset, const double *data, uint64_t n)
Write an array of doubles.
void Read(uint64_t row, uint64_t column_offset, int32_t *data, uint64_t n)
Read array of int32_t.
void Read(uint64_t row, uint64_t column_offset, std::complex< float > *data, uint64_t n)
Read array of complex floats.
void SetStride(uint64_t new_stride)
Set the number of bytes per row for this file.
VarBufferedColumnarFile() noexcept=default
VarBufferedColumnarFile(const std::string &filename, size_t header_size)
Open an existing columnar file.
void Read(uint64_t row, uint64_t column_offset, double *data, uint64_t n)
Read array of doubles.
void WriteImplementation(uint64_t row, uint64_t column_offset, const ValueType *data, uint64_t n)
VarBufferedColumnarFile(const std::string &filename, uint64_t header_size, uint64_t stride)
Create or overwrite a new columnar file on disk.
void Write(uint64_t row, uint64_t column_offset, const int32_t *data, uint64_t n)
Write an array of int32_t.
std::vector< unsigned char > block_buffer_
VarBufferedColumnarFile & operator=(VarBufferedColumnarFile &&rhs)
void Write(uint64_t row, uint64_t column_offset, const std::complex< float > *data, uint64_t n)
Write an array of complex floats.
void Read(uint64_t row, uint64_t column_offset, bool *data, uint64_t n)
Read an array of bools.
uint64_t NRows() const
Total number of rows stored in this file.
void Read(uint64_t row, uint64_t column_offset, float *data, uint64_t n)
Read one cell containing an array of floats.
void Write(uint64_t row, uint64_t column_offset, const float *data, uint64_t n)
Write one cell containing an array of floats.
std::vector< unsigned char > packed_buffer_
This buffer is used temporarily for (un)packing booleans.
uint64_t Stride() const
Total number of bytes in one row.
this file contains all the compiler specific defines
Definition mainpage.dox:28
LatticeExprNode max(const LatticeExprNode &left, const LatticeExprNode &right)
Define real & complex conjugation for non-complex types and put comparisons into std namespace.
Definition Complex.h:350