casacore
Loading...
Searching...
No Matches
threadeddyscocolumn.h
Go to the documentation of this file.
1#ifndef DYSCO_THREADED_DYSCO_COLUMN_H
2#define DYSCO_THREADED_DYSCO_COLUMN_H
3
4#include <casacore/tables/DataMan/DataManError.h>
5
6#include <casacore/casa/Arrays/IPosition.h>
7#include <casacore/tables/Tables/ScalarColumn.h>
8
9#include <condition_variable>
10#include <cstdint>
11#include <map>
12#include <memory>
13#include <mutex>
14#include <random>
15
16#include "dyscostmancol.h"
17#include "serializable.h"
18#include "stochasticencoder.h"
19#include "threadgroup.h"
20#include "timeblockbuffer.h"
21
22namespace dyscostman {
23
24class DyscoStMan;
25
31template <typename DataType>
33 public:
34 typedef DataType data_t;
35
41
43
44 void operator=(const ThreadedDyscoColumn &source) = delete;
45
48
50 virtual void setShapeColumn(const casacore::IPosition &shape) override;
51
54 virtual casacore::IPosition shape(casacore::rownr_t /*rownr*/) override {
55 return _shape;
56 }
57
64 virtual void getArrayV(
66 casacore::ArrayBase &dataPtr) override {
67 return DyscoStManColumn::getArrayV(rowNr, dataPtr);
68 }
69
77 virtual void putArrayV(
79 const casacore::ArrayBase &dataPtr) override {
80 return DyscoStManColumn::putArrayV(rowNr, dataPtr);
81 }
82
83 virtual void Prepare(DyscoDistribution distribution,
84 Normalization normalization, double studentsTNu,
85 double distributionTruncation) override;
86
90 virtual void InitializeAfterNRowsPerBlockIsKnown() override;
91
96 void SetBitsPerSymbol(unsigned bitsPerSymbol) {
97 _bitsPerSymbol = bitsPerSymbol;
98 }
99
100 virtual size_t CalculateBlockSize(size_t nRowsInBlock,
101 size_t nAntennae) const final override;
102
103 virtual size_t ExtraHeaderSize() const override { return Header::Size(); }
104
105 virtual void SerializeExtraHeader(std::ostream &stream) const final override;
106
107 virtual void UnserializeExtraHeader(std::istream &stream) final override;
108
109 protected:
111 public:
112 virtual ~ThreadDataBase(){};
113 };
114
116
118 const float *metaBuffer, size_t nRow,
119 size_t nAntennae) = 0;
120
121 virtual void decode(TimeBlockBuffer<data_t> *buffer, const symbol_t *data,
122 size_t blockRow, size_t a1, size_t a2) = 0;
123
124 virtual std::unique_ptr<ThreadDataBase> initializeEncodeThread() = 0;
125
126 virtual void encode(ThreadDataBase *threadData,
127 TimeBlockBuffer<data_t> *buffer, float *metaBuffer,
128 symbol_t *symbolBuffer, size_t nAntennae) = 0;
129
130 virtual size_t metaDataFloatCount(size_t nRow, size_t nPolarizations,
131 size_t nChannels,
132 size_t nAntennae) const = 0;
133
134 virtual size_t symbolCount(size_t nRowsInBlock, size_t nPolarizations,
135 size_t nChannels) const = 0;
136
137 virtual void shutdown() override final;
138
139 virtual size_t defaultThreadCount() const;
140
141 size_t getBitsPerSymbol() const { return _bitsPerSymbol; }
142
143 const casacore::IPosition &shape() const { return _shape; }
144
145 private:
146 struct CacheItem {
147 CacheItem(std::unique_ptr<TimeBlockBuffer<data_t>> &&encoder_)
148 : encoder(std::move(encoder_)), isBeingWritten(false) {}
149
150 std::unique_ptr<TimeBlockBuffer<data_t>> encoder;
152 };
153
158 struct Header : public Serializable {
159 uint32_t blockSize;
160 uint32_t antennaCount;
161
162 static uint32_t Size() { return 8; }
163
164 virtual void Serialize(std::ostream &stream) const override {
167 }
168
169 virtual void Unserialize(std::istream &stream) override {
172 }
173 };
174
175 typedef std::map<size_t, CacheItem *> cache_t;
176
179
181 void encodeAndWrite(size_t blockIndex, const CacheItem &item,
182 unsigned char *packedSymbolBuffer,
183 unsigned int *unpackedSymbolBuffer,
184 ThreadDataBase *threadUserData);
185 bool isWriteItemAvailable(typename cache_t::iterator &i);
186 void loadBlock(size_t blockIndex);
188 size_t maxCacheSize() const {
189 return ThreadedDyscoColumn::defaultThreadCount() * 12 / 10 + 1;
190 }
191
194 std::unique_ptr<casacore::ScalarColumn<int>> _ant1Col, _ant2Col, _fieldCol,
196 std::unique_ptr<casacore::ScalarColumn<double>> _timeCol;
203 std::mutex _mutex;
205 std::condition_variable _cacheChangedCondition;
210
211 std::unique_ptr<TimeBlockBuffer<data_t>> _timeBlockBuffer;
212};
213
214template <>
216 casacore::rownr_t rowNr, casacore::ArrayBase &dataPtr) {
217 getValues(rowNr, static_cast<casacore::Array<std::complex<float>>*>(&dataPtr));
218}
219template <>
221 casacore::rownr_t rowNr, const casacore::ArrayBase &dataPtr) {
222 putValues(rowNr, static_cast<const casacore::Array<std::complex<float>>*>(&dataPtr));
223}
224template <>
226 casacore::rownr_t rowNr, casacore::ArrayBase &dataPtr) {
227 getValues(rowNr, static_cast<casacore::Array<float>*>(&dataPtr));
228}
229template <>
231 casacore::rownr_t rowNr, const casacore::ArrayBase &dataPtr) {
232 putValues(rowNr, static_cast<const casacore::Array<float>*>(&dataPtr));
233}
234
235extern template class ThreadedDyscoColumn<std::complex<float>>;
236extern template class ThreadedDyscoColumn<float>;
237
238} // namespace dyscostman
239
240#endif
static uint32_t UnserializeUInt32(std::istream &stream)
static void SerializeToUInt32(std::ostream &stream, T value)
A container similar to std::vector, but one that allows construction without initializing its element...
Definition uvector.h:74
Non-templated base class for templated Array class.
Definition ArrayBase.h:71
virtual void putArrayV(rownr_t rownr, const ArrayBase &data)
Put the array value into the given row.
virtual void getArrayV(rownr_t rownr, ArrayBase &dataPtr)
Get the array value in the given row.
Base class for columns of the DyscoStMan.
The main class for the Dysco storage manager.
Definition dyscostman.h:46
A column for storing compressed values in a threaded way, tailored for the data and weight columns th...
virtual void InitializeAfterNRowsPerBlockIsKnown() override
Prepare this column for reading/writing.
TimeBlockBuffer< data_t >::symbol_t symbol_t
std::unique_ptr< TimeBlockBuffer< data_t > > _timeBlockBuffer
std::map< size_t, CacheItem * > cache_t
virtual ~ThreadedDyscoColumn()
Destructor.
std::condition_variable _cacheChangedCondition
virtual void encode(ThreadDataBase *threadData, TimeBlockBuffer< data_t > *buffer, float *metaBuffer, symbol_t *symbolBuffer, size_t nAntennae)=0
virtual void setShapeColumn(const casacore::IPosition &shape) override
Set the dimensions of values in this column.
bool isWriteItemAvailable(typename cache_t::iterator &i)
void getValues(casacore::rownr_t rowNr, casacore::Array< data_t > *dataPtr)
std::unique_ptr< casacore::ScalarColumn< int > > _fieldCol
const casacore::IPosition & shape() const
std::unique_ptr< casacore::ScalarColumn< int > > _dataDescIdCol
virtual std::unique_ptr< ThreadDataBase > initializeEncodeThread()=0
ThreadedDyscoColumn(DyscoStMan *parent, int dtype)
Create a new column.
virtual void Prepare(DyscoDistribution distribution, Normalization normalization, double studentsTNu, double distributionTruncation) override
virtual size_t CalculateBlockSize(size_t nRowsInBlock, size_t nAntennae) const final override
virtual size_t symbolCount(size_t nRowsInBlock, size_t nPolarizations, size_t nChannels) const =0
ThreadedDyscoColumn(const ThreadedDyscoColumn &source)=delete
void SetBitsPerSymbol(unsigned bitsPerSymbol)
Set the bits per symbol.
virtual void decode(TimeBlockBuffer< data_t > *buffer, const symbol_t *data, size_t blockRow, size_t a1, size_t a2)=0
virtual void UnserializeExtraHeader(std::istream &stream) final override
ao::uvector< unsigned char > _packedBlockReadBuffer
void operator=(const ThreadedDyscoColumn &source)=delete
void putValues(casacore::rownr_t rowNr, const casacore::Array< data_t > *dataPtr)
virtual void initializeDecode(TimeBlockBuffer< data_t > *buffer, const float *metaBuffer, size_t nRow, size_t nAntennae)=0
std::unique_ptr< casacore::ScalarColumn< int > > _ant2Col
virtual size_t defaultThreadCount() const
std::unique_ptr< casacore::ScalarColumn< int > > _ant1Col
virtual casacore::IPosition shape(casacore::rownr_t) override
Get the dimensions of the values in a particular row.
virtual void shutdown() override final
To be called before destructing the class.
virtual size_t ExtraHeaderSize() const override
Get number of bytes needed for column header of this column.
void loadBlock(size_t blockIndex)
virtual size_t metaDataFloatCount(size_t nRow, size_t nPolarizations, size_t nChannels, size_t nAntennae) const =0
virtual void putArrayV(casacore::rownr_t rowNr, const casacore::ArrayBase &dataPtr) override
Write values into a particular row.
std::unique_ptr< casacore::ScalarColumn< double > > _timeCol
ao::uvector< unsigned int > _unpackedSymbolReadBuffer
virtual void getArrayV(casacore::rownr_t rowNr, casacore::ArrayBase &dataPtr) override
Read the values for a particular row.
void encodeAndWrite(size_t blockIndex, const CacheItem &item, unsigned char *packedSymbolBuffer, unsigned int *unpackedSymbolBuffer, ThreadDataBase *threadUserData)
virtual void SerializeExtraHeader(std::ostream &stream) const final override
Group of threads.
Definition threadgroup.h:13
uInt64 rownr_t
Define the type of a row number in a table.
Definition aipsxtype.h:44
Define real & complex conjugation for non-complex types and put comparisons into std namespace.
Definition Complex.h:350
std::unique_ptr< TimeBlockBuffer< data_t > > encoder
CacheItem(std::unique_ptr< TimeBlockBuffer< data_t > > &&encoder_)
virtual void Serialize(std::ostream &stream) const override
virtual void Unserialize(std::istream &stream) override