casacore
threadeddyscocolumn.h
Go to the documentation of this file.
1 #ifndef DYSCO_THREADED_DYSCO_COLUMN_H
2 #define DYSCO_THREADED_DYSCO_COLUMN_H
3 
4 #include <casacore/tables/DataMan/DataManError.h>
5 
6 #include <casacore/casa/Arrays/IPosition.h>
7 #include <casacore/tables/Tables/ScalarColumn.h>
8 
9 #include <condition_variable>
10 #include <cstdint>
11 #include <map>
12 #include <memory>
13 #include <mutex>
14 #include <random>
15 
16 #include "dyscostmancol.h"
17 #include "serializable.h"
18 #include "stochasticencoder.h"
19 #include "threadgroup.h"
20 #include "timeblockbuffer.h"
21 
22 namespace dyscostman {
23 
24 class DyscoStMan;
25 
31 template <typename DataType>
33  public:
34  typedef DataType data_t;
35 
41 
/** Copy construction is disabled: the constructor is explicitly deleted. */
42  ThreadedDyscoColumn(const ThreadedDyscoColumn &source) = delete;
43 
/** Copy assignment is disabled: the operator is explicitly deleted. */
44  void operator=(const ThreadedDyscoColumn &source) = delete;
47 
/**
 * Set the dimensions of values in this column.
 * Declared here only; the definition is out of line.
 * @param shape The dimensions to use for the values of this column.
 */
49  virtual void setShapeColumn(const casacore::IPosition &shape) override;
50 
/**
 * Get the dimensions of the values in a particular row.
 * The row number parameter is unnamed and unused: the same stored
 * _shape is returned (by value) for every row.
 * @returns A copy of the column-wide shape.
 */
53  virtual casacore::IPosition shape(casacore::rownr_t /*rownr*/) override {
54  return _shape;
55  }
56 
/**
 * Read the values for a particular row.
 * This generic template body only forwards to
 * DyscoStManColumn::getArrayV().
 * NOTE(review): the `template <>` bodies after the class call
 * getValues() with a typed casacore::Array; their signature lines are
 * not visible in this listing, but they look like explicit
 * specializations of this method -- confirm.
 * @param rowNr The row number of the values to read.
 * @param dataPtr The array that receives the values.
 */
63  virtual void getArrayV(
64  casacore::rownr_t rowNr,
65  casacore::ArrayBase &dataPtr) override {
66  return DyscoStManColumn::getArrayV(rowNr, dataPtr);
67  }
68 
/**
 * Write values into a particular row.
 * This generic template body only forwards to
 * DyscoStManColumn::putArrayV().
 * NOTE(review): the `template <>` bodies after the class call
 * putValues() with a typed casacore::Array; their signature lines are
 * not visible in this listing, but they look like explicit
 * specializations of this method -- confirm.
 * @param rowNr The row number to write the values to.
 * @param dataPtr The array holding the values to write.
 */
76  virtual void putArrayV(
77  casacore::rownr_t rowNr,
78  const casacore::ArrayBase &dataPtr) override {
79  return DyscoStManColumn::putArrayV(rowNr, dataPtr);
80  }
81 
/**
 * Override of the Prepare() hook; declared here only, the definition is
 * out of line.
 * NOTE(review): the parameters look like the settings of the encoder
 * (value distribution, normalization mode, Student's t "nu" parameter
 * and truncation of the distribution) -- confirm against the
 * definition, which is not visible here.
 */
82  virtual void Prepare(DyscoDistribution distribution,
83  Normalization normalization, double studentsTNu,
84  double distributionTruncation) override;
85 
/**
 * Prepare this column for reading/writing.
 * Declared here only; the definition is out of line.
 */
89  virtual void InitializeAfterNRowsPerBlockIsKnown() override;
90 
/**
 * Set the bits per symbol.
 * Only stores the value in _bitsPerSymbol; no validation is done here.
 * @param bitsPerSymbol New number of bits per symbol.
 */
95  void SetBitsPerSymbol(unsigned bitsPerSymbol) {
96  _bitsPerSymbol = bitsPerSymbol;
97  }
98 
/**
 * Calculate the size of a block for the given number of rows and
 * antennae. Marked `final`, so further-derived classes cannot override
 * it. Declared here only; the definition is out of line.
 * NOTE(review): presumably the result is a size in bytes -- confirm
 * against the definition.
 */
99  virtual size_t CalculateBlockSize(size_t nRowsInBlock,
100  size_t nAntennae) const final override;
101 
/**
 * Get number of bytes needed for column header of this column.
 * @returns Header::Size(), which the nested Header struct defines as 8.
 */
102  virtual size_t ExtraHeaderSize() const override { return Header::Size(); }
103 
/**
 * Write the extra column header to the given stream. Marked `final`;
 * declared here only, the definition is out of line.
 * NOTE(review): presumably this serializes the nested Header struct,
 * whose size ExtraHeaderSize() reports -- confirm against the
 * definition.
 */
104  virtual void SerializeExtraHeader(std::ostream &stream) const final override;
105 
/**
 * Read the extra column header from the given stream. Marked `final`;
 * declared here only, the definition is out of line.
 * NOTE(review): presumably this is the counterpart of
 * SerializeExtraHeader() and fills the nested Header struct -- confirm
 * against the definition.
 */
106  virtual void UnserializeExtraHeader(std::istream &stream) final override;
107 
108  protected:
110  public:
111  virtual ~ThreadDataBase(){};
112  };
113 
115 
117  const float *metaBuffer, size_t nRow,
118  size_t nAntennae) = 0;
119 
/**
 * Pure virtual decoding hook that derived classes must implement.
 * NOTE(review): from the signature this appears to decode the symbols
 * in `data` for one row of a block (index `blockRow`, antenna pair
 * `a1`/`a2`) into `buffer` -- confirm against an implementation.
 */
120  virtual void decode(TimeBlockBuffer<data_t> *buffer, const symbol_t *data,
121  size_t blockRow, size_t a1, size_t a2) = 0;
122 
/**
 * Pure virtual hook that derived classes must implement. Returns an
 * owning pointer to a ThreadDataBase object.
 * NOTE(review): presumably called once per encoding thread, with the
 * result later handed to encode() as `threadData` -- confirm.
 */
123  virtual std::unique_ptr<ThreadDataBase> initializeEncodeThread() = 0;
124 
/**
 * Pure virtual encoding hook that derived classes must implement.
 * NOTE(review): from the signature this appears to encode the contents
 * of `buffer` into `symbolBuffer`, writing per-block meta data into
 * `metaBuffer`; `threadData` is presumably the object created by
 * initializeEncodeThread() -- confirm against an implementation.
 */
125  virtual void encode(ThreadDataBase *threadData,
126  TimeBlockBuffer<data_t> *buffer, float *metaBuffer,
127  symbol_t *symbolBuffer, size_t nAntennae) = 0;
128 
/**
 * Pure virtual hook that derived classes must implement.
 * NOTE(review): the name suggests it returns the number of float meta
 * data values stored per block for the given dimensions -- confirm
 * against an implementation.
 */
129  virtual size_t metaDataFloatCount(size_t nRow, size_t nPolarizations,
130  size_t nChannels,
131  size_t nAntennae) const = 0;
132 
/**
 * Pure virtual hook that derived classes must implement.
 * NOTE(review): the name suggests it returns the number of symbols
 * stored per block for the given dimensions -- confirm against an
 * implementation.
 */
133  virtual size_t symbolCount(size_t nRowsInBlock, size_t nPolarizations,
134  size_t nChannels) const = 0;
135 
/**
 * To be called before destructing the class.
 * Marked `final`, so further-derived classes cannot override it.
 * Declared here only; the definition is out of line.
 */
136  virtual void shutdown() override final;
137 
/**
 * Virtual (non-pure) getter for the default number of threads; derived
 * classes may override it. maxCacheSize() derives the cache limit from
 * this class's own version. Declared here only; the definition is out
 * of line.
 */
138  virtual size_t defaultThreadCount() const;
139 
/**
 * Get the bits per symbol, as stored by SetBitsPerSymbol().
 * The `unsigned` member is returned as a size_t.
 */
140  size_t getBitsPerSymbol() const { return _bitsPerSymbol; }
141 
/**
 * Get the column-wide shape without copying it. Unlike the public
 * shape(rownr_t) overload this returns a const reference to _shape,
 * which is only valid as long as this column object lives.
 */
142  const casacore::IPosition &shape() const { return _shape; }
143 
144  private:
145  struct CacheItem {
146  CacheItem(std::unique_ptr<TimeBlockBuffer<data_t>> &&encoder_)
147  : encoder(std::move(encoder_)), isBeingWritten(false) {}
148 
149  std::unique_ptr<TimeBlockBuffer<data_t>> encoder;
151  };
152 
154  void operator()();
156  };
157  struct Header : public Serializable {
158  uint32_t blockSize;
159  uint32_t antennaCount;
160 
161  static uint32_t Size() { return 8; }
162 
163  virtual void Serialize(std::ostream &stream) const override {
164  SerializeToUInt32(stream, blockSize);
166  }
167 
168  virtual void Unserialize(std::istream &stream) override {
169  blockSize = UnserializeUInt32(stream);
171  }
172  };
173 
/**
 * Ordered map from a size_t key to a raw CacheItem pointer.
 * NOTE(review): the key is presumably the block index (compare
 * encodeAndWrite(), which takes a block index and a CacheItem), and the
 * raw pointers are presumably owned by this class -- confirm against
 * the definitions, which are not visible here.
 */
174  typedef std::map<size_t, CacheItem *> cache_t;
175 
178 
/*
 * Private helpers; all are declared here only and defined out of line.
 * NOTE(review): the descriptions below are inferred from the names and
 * signatures only -- confirm against the definitions.
 */
// Presumably stops the encoding threads.
179  void stopThreads();
// Presumably encodes the cached block `item` with index `blockIndex` and
// writes the result, using the two caller-provided scratch buffers and the
// per-thread data `threadUserData`.
180  void encodeAndWrite(size_t blockIndex, const CacheItem &item,
181  unsigned char *packedSymbolBuffer,
182  unsigned int *unpackedSymbolBuffer,
183  ThreadDataBase *threadUserData);
// Returns whether a cache item is available for writing; `i` is an in/out
// iterator into the cache (passed by non-const reference).
184  bool isWriteItemAvailable(typename cache_t::iterator &i);
// Presumably loads the block with the given index for reading.
185  void loadBlock(size_t blockIndex);
// Presumably stores the block that is currently being filled.
186  void storeBlock();
/**
 * Upper limit used for the cache size: 1.2 times the default thread
 * count (integer arithmetic, so the product is truncated), plus one.
 * The call is class-qualified, so this class's defaultThreadCount() is
 * used even when a derived class overrides it.
 */
187  size_t maxCacheSize() const {
188  return ThreadedDyscoColumn::defaultThreadCount() * 12 / 10 + 1;
189  }
190 
191  unsigned _bitsPerSymbol;
193  std::unique_ptr<casacore::ScalarColumn<int>> _ant1Col, _ant2Col, _fieldCol,
195  std::unique_ptr<casacore::ScalarColumn<double>> _timeCol;
202  std::mutex _mutex;
204  std::condition_variable _cacheChangedCondition;
207  size_t _blockSize;
209 
210  std::unique_ptr<TimeBlockBuffer<data_t>> _timeBlockBuffer;
211 };
212 
213 template <>
215  casacore::rownr_t rowNr, casacore::ArrayBase &dataPtr) {
216  getValues(rowNr, static_cast<casacore::Array<std::complex<float>>*>(&dataPtr));
217 }
218 template <>
220  casacore::rownr_t rowNr, const casacore::ArrayBase &dataPtr) {
221  putValues(rowNr, static_cast<const casacore::Array<std::complex<float>>*>(&dataPtr));
222 }
223 template <>
225  casacore::rownr_t rowNr, casacore::ArrayBase &dataPtr) {
226  getValues(rowNr, static_cast<casacore::Array<float>*>(&dataPtr));
227 }
228 template <>
230  casacore::rownr_t rowNr, const casacore::ArrayBase &dataPtr) {
231  putValues(rowNr, static_cast<const casacore::Array<float>*>(&dataPtr));
232 }
233 
// Explicit instantiation declarations: translation units that include this
// header do not implicitly instantiate the template for these two data types
// and expect the explicit instantiation definitions to be provided elsewhere
// (NOTE(review): presumably in the matching source file -- confirm).
234 extern template class ThreadedDyscoColumn<std::complex<float>>;
235 extern template class ThreadedDyscoColumn<float>;
236 
237 } // namespace dyscostman
238 
239 #endif
static uint32_t UnserializeUInt32(std::istream &stream)
Definition: serializable.h:76
static void SerializeToUInt32(std::ostream &stream, T value)
Definition: serializable.h:22
Non-templated base class for templated Array class.
Definition: ArrayBase.h:73
virtual void putArrayV(rownr_t rownr, const ArrayBase &data)
Put the array value into the given row.
virtual void getArrayV(rownr_t rownr, ArrayBase &dataPtr)
Get the array value in the given row.
Base class for columns of the DyscoStMan.
Definition: dyscostmancol.h:22
The main class for the Dysco storage manager.
Definition: dyscostman.h:46
A column for storing compressed values in a threaded way, tailored for the data and weight columns that use a threaded approach for encoding.
virtual void InitializeAfterNRowsPerBlockIsKnown() override
Prepare this column for reading/writing.
TimeBlockBuffer< data_t >::symbol_t symbol_t
std::unique_ptr< TimeBlockBuffer< data_t > > _timeBlockBuffer
std::map< size_t, CacheItem * > cache_t
virtual ~ThreadedDyscoColumn()
Destructor.
std::condition_variable _cacheChangedCondition
virtual void encode(ThreadDataBase *threadData, TimeBlockBuffer< data_t > *buffer, float *metaBuffer, symbol_t *symbolBuffer, size_t nAntennae)=0
virtual void setShapeColumn(const casacore::IPosition &shape) override
Set the dimensions of values in this column.
bool isWriteItemAvailable(typename cache_t::iterator &i)
void getValues(casacore::rownr_t rowNr, casacore::Array< data_t > *dataPtr)
std::unique_ptr< casacore::ScalarColumn< int > > _fieldCol
std::unique_ptr< casacore::ScalarColumn< int > > _dataDescIdCol
ThreadedDyscoColumn(DyscoStMan *parent, int dtype)
Create a new column.
virtual void Prepare(DyscoDistribution distribution, Normalization normalization, double studentsTNu, double distributionTruncation) override
virtual size_t CalculateBlockSize(size_t nRowsInBlock, size_t nAntennae) const final override
virtual size_t symbolCount(size_t nRowsInBlock, size_t nPolarizations, size_t nChannels) const =0
const casacore::IPosition & shape() const
ThreadedDyscoColumn(const ThreadedDyscoColumn &source)=delete
void SetBitsPerSymbol(unsigned bitsPerSymbol)
Set the bits per symbol.
virtual void decode(TimeBlockBuffer< data_t > *buffer, const symbol_t *data, size_t blockRow, size_t a1, size_t a2)=0
virtual void UnserializeExtraHeader(std::istream &stream) final override
ao::uvector< unsigned char > _packedBlockReadBuffer
virtual std::unique_ptr< ThreadDataBase > initializeEncodeThread()=0
void operator=(const ThreadedDyscoColumn &source)=delete
void putValues(casacore::rownr_t rowNr, const casacore::Array< data_t > *dataPtr)
virtual void initializeDecode(TimeBlockBuffer< data_t > *buffer, const float *metaBuffer, size_t nRow, size_t nAntennae)=0
std::unique_ptr< casacore::ScalarColumn< int > > _ant2Col
virtual size_t defaultThreadCount() const
std::unique_ptr< casacore::ScalarColumn< int > > _ant1Col
virtual casacore::IPosition shape(casacore::rownr_t) override
Get the dimensions of the values in a particular row.
virtual void shutdown() override final
To be called before destructing the class.
virtual size_t ExtraHeaderSize() const override
Get number of bytes needed for column header of this column.
void loadBlock(size_t blockIndex)
virtual size_t metaDataFloatCount(size_t nRow, size_t nPolarizations, size_t nChannels, size_t nAntennae) const =0
virtual void putArrayV(casacore::rownr_t rowNr, const casacore::ArrayBase &dataPtr) override
Write values into a particular row.
std::unique_ptr< casacore::ScalarColumn< double > > _timeCol
ao::uvector< unsigned int > _unpackedSymbolReadBuffer
virtual void getArrayV(casacore::rownr_t rowNr, casacore::ArrayBase &dataPtr) override
Read the values for a particular row.
void encodeAndWrite(size_t blockIndex, const CacheItem &item, unsigned char *packedSymbolBuffer, unsigned int *unpackedSymbolBuffer, ThreadDataBase *threadUserData)
virtual void SerializeExtraHeader(std::ostream &stream) const final override
Group of threads.
Definition: threadgroup.h:13
uInt64 rownr_t
Define the type of a row number in a table.
Definition: aipsxtype.h:46
Define real & complex conjugation for non-complex types and put comparisons into std namespace.
Definition: Complex.h:352
CacheItem(std::unique_ptr< TimeBlockBuffer< data_t >> &&encoder_)
std::unique_ptr< TimeBlockBuffer< data_t > > encoder
virtual void Serialize(std::ostream &stream) const override
virtual void Unserialize(std::istream &stream) override