casacore
threadeddyscocolumn.h (source listing)
1#ifndef DYSCO_THREADED_DYSCO_COLUMN_H
2#define DYSCO_THREADED_DYSCO_COLUMN_H
3
4#include <casacore/tables/DataMan/DataManError.h>
5
6#include <casacore/casa/Arrays/IPosition.h>
7#include <casacore/tables/Tables/ScalarColumn.h>
8
9#include <condition_variable>
10#include <cstdint>
11#include <map>
12#include <memory>
13#include <mutex>
14#include <random>
15
16#include "dyscostmancol.h"
17#include "serializable.h"
18#include "stochasticencoder.h"
19#include "threadgroup.h"
20#include "timeblockbuffer.h"
21
22namespace dyscostman {
23
24class DyscoStMan;
25
31template <typename DataType>
33 public:
34 typedef DataType data_t;
35
41
43
/** Copy assignment is deleted: a column cannot be assigned from another column. */
44 void operator=(const ThreadedDyscoColumn &source) = delete;
47
/**
 * Set the dimensions of values in this column.
 * Overrides a base-class virtual; only declared in this header.
 * @param shape The dimensions to use for the values of this column.
 */
49 virtual void setShapeColumn(const casacore::IPosition &shape) override;
50
/**
 * Get the dimensions of the values in a particular row.
 * The row number argument is unused: the stored _shape is returned for
 * every row.
 * @returns A copy of the stored column shape.
 */
53 virtual casacore::IPosition shape(casacore::rownr_t /*rownr*/) override {
54 return _shape;
55 }
56
/**
 * Read the values for a particular row.
 * This generic version forwards to DyscoStManColumn::getArrayV() with the
 * same row number and array.
 * NOTE(review): the line declaring the first parameter (rowNr) is absent from
 * this listing; the member index further down shows the full signature.
 */
63 virtual void getArrayV(
65 casacore::ArrayBase &dataPtr) override {
66 return DyscoStManColumn::getArrayV(rowNr, dataPtr);
67 }
68
/**
 * Write values into a particular row.
 * This generic version forwards to DyscoStManColumn::putArrayV() with the
 * same row number and array.
 * NOTE(review): the line declaring the first parameter (rowNr) is absent from
 * this listing; the member index further down shows the full signature.
 */
76 virtual void putArrayV(
78 const casacore::ArrayBase &dataPtr) override {
79 return DyscoStManColumn::putArrayV(rowNr, dataPtr);
80 }
81
/**
 * Prepare the column using the given distribution and normalization settings.
 * Overrides a base-class virtual; only declared in this header.
 * NOTE(review): studentsTNu and distributionTruncation are presumably
 * parameters of the selected distribution; confirm against the implementation.
 */
82 virtual void Prepare(DyscoDistribution distribution,
83 Normalization normalization, double studentsTNu,
84 double distributionTruncation) override;
85
/**
 * Prepare this column for reading/writing.
 * Overrides a base-class virtual; only declared in this header.
 */
89 virtual void InitializeAfterNRowsPerBlockIsKnown() override;
90
/**
 * Set the bits per symbol.
 * @param bitsPerSymbol The value to store in _bitsPerSymbol; it is stored
 * as given, without validation.
 */
95 void SetBitsPerSymbol(unsigned bitsPerSymbol) {
96 _bitsPerSymbol = bitsPerSymbol;
97 }
98
/**
 * Calculate the size of a block holding the given number of rows and antennae.
 * Marked final, so subclasses cannot override it; only declared in this header.
 * NOTE(review): the unit of the returned size (presumably bytes) is not
 * visible here; confirm against the implementation.
 */
99 virtual size_t CalculateBlockSize(size_t nRowsInBlock,
100 size_t nAntennae) const final override;
101
/**
 * Get number of bytes needed for column header of this column.
 * @returns The value of Header::Size().
 */
102 virtual size_t ExtraHeaderSize() const override { return Header::Size(); }
103
/**
 * Write this column's extra header to the given output stream.
 * Marked final; only declared in this header.
 * NOTE(review): presumably writes the Header fields, i.e. ExtraHeaderSize()
 * bytes; confirm against the implementation.
 */
104 virtual void SerializeExtraHeader(std::ostream &stream) const final override;
105
/**
 * Read this column's extra header from the given input stream.
 * Marked final; only declared in this header.
 * NOTE(review): presumably the inverse of SerializeExtraHeader(); confirm
 * against the implementation.
 */
106 virtual void UnserializeExtraHeader(std::istream &stream) final override;
107
108 protected:
110 public:
111 virtual ~ThreadDataBase(){};
112 };
113
115
117 const float *metaBuffer, size_t nRow,
118 size_t nAntennae) = 0;
119
/**
 * Pure virtual decoding hook, to be implemented by derived column classes.
 * NOTE(review): presumably decodes the symbols in `data` for row `blockRow`
 * and the antenna pair (a1, a2) into `buffer`; confirm against an
 * implementation.
 */
120 virtual void decode(TimeBlockBuffer<data_t> *buffer, const symbol_t *data,
121 size_t blockRow, size_t a1, size_t a2) = 0;
122
/**
 * Pure virtual: create a ThreadDataBase-derived object, owned by the caller
 * through the returned unique_ptr.
 * NOTE(review): presumably called once per encoding thread, with the result
 * handed to encode() as its threadData argument; confirm.
 */
123 virtual std::unique_ptr<ThreadDataBase> initializeEncodeThread() = 0;
124
/**
 * Pure virtual encoding hook, to be implemented by derived column classes.
 * NOTE(review): presumably encodes the contents of `buffer` into
 * `metaBuffer` and `symbolBuffer`, using `threadData` as per-thread state;
 * confirm against an implementation.
 */
125 virtual void encode(ThreadDataBase *threadData,
126 TimeBlockBuffer<data_t> *buffer, float *metaBuffer,
127 symbol_t *symbolBuffer, size_t nAntennae) = 0;
128
/**
 * Pure virtual: number of meta data floats for the given block dimensions.
 * NOTE(review): presumably the number of float values stored as meta data
 * per block (cf. the float meta buffers of encode()/initializeDecode());
 * confirm against an implementation.
 */
129 virtual size_t metaDataFloatCount(size_t nRow, size_t nPolarizations,
130 size_t nChannels,
131 size_t nAntennae) const = 0;
132
/**
 * Pure virtual: number of symbols for the given block dimensions.
 * NOTE(review): presumably the number of encoded symbols stored per block;
 * confirm against an implementation.
 */
133 virtual size_t symbolCount(size_t nRowsInBlock, size_t nPolarizations,
134 size_t nChannels) const = 0;
135
/**
 * To be called before destructing the class.
 * Marked final, so subclasses cannot override it; only declared in this header.
 */
136 virtual void shutdown() override final;
137
/**
 * Thread count used by default; virtual, so subclasses may override it.
 * The value returned by this class's own version is also used by
 * maxCacheSize() to size the cache.
 */
138 virtual size_t defaultThreadCount() const;
139
/** @returns The current value of _bitsPerSymbol (see SetBitsPerSymbol()). */
140 size_t getBitsPerSymbol() const { return _bitsPerSymbol; }
141
/**
 * Row-independent accessor for the column shape.
 * @returns A const reference to the stored _shape (no copy is made).
 */
142 const casacore::IPosition &shape() const { return _shape; }
143
144 private:
/**
 * Entry of the block cache: owns a TimeBlockBuffer and carries an
 * isBeingWritten flag.
 * NOTE(review): the declaration of the isBeingWritten member is absent from
 * this listing; it is only visible through the constructor's initializer.
 */
145 struct CacheItem {
/** Takes ownership of the given buffer; isBeingWritten starts out false. */
146 CacheItem(std::unique_ptr<TimeBlockBuffer<data_t>> &&encoder_)
147 : encoder(std::move(encoder_)), isBeingWritten(false) {}
148
149 std::unique_ptr<TimeBlockBuffer<data_t>> encoder;
151 };
152
/**
 * Extra column header, consisting of two 32-bit unsigned fields.
 * NOTE(review): the bodies of Serialize() and Unserialize() are absent from
 * this listing; presumably they write/read blockSize and antennaCount as
 * 32-bit unsigned integers. Confirm against the original header.
 */
157 struct Header : public Serializable {
158 uint32_t blockSize;
159 uint32_t antennaCount;
160
/** Size reported for the header: 8, i.e. the two uint32_t fields above. */
161 static uint32_t Size() { return 8; }
162
163 virtual void Serialize(std::ostream &stream) const override {
166 }
167
168 virtual void Unserialize(std::istream &stream) override {
171 }
172 };
173
174 typedef std::map<size_t, CacheItem *> cache_t;
175
178
/**
 * Private helper; only declared in this header.
 * NOTE(review): presumably encodes the cache item belonging to blockIndex,
 * using the two caller-provided symbol buffers as scratch space, and writes
 * the result; confirm against the implementation.
 */
180 void encodeAndWrite(size_t blockIndex, const CacheItem &item,
181 unsigned char *packedSymbolBuffer,
182 unsigned int *unpackedSymbolBuffer,
183 ThreadDataBase *threadUserData);
/**
 * Private helper; only declared in this header.
 * NOTE(review): the iterator is taken by non-const reference, so it is
 * presumably set to the cache entry found when true is returned; confirm.
 */
184 bool isWriteItemAvailable(typename cache_t::iterator &i);
/**
 * Private helper; only declared in this header.
 * NOTE(review): presumably reads and decodes the block with the given index
 * for use by subsequent row reads; confirm against the implementation.
 */
185 void loadBlock(size_t blockIndex);
/**
 * Cache size limit derived from the default thread count:
 * defaultThreadCount() * 12 / 10 + 1 in integer arithmetic, i.e. about 1.2
 * times the thread count (rounded down) plus one.
 * The call is qualified with the class name, so it always uses this class's
 * defaultThreadCount() rather than an override in a derived class.
 */
187 size_t maxCacheSize() const {
188 return ThreadedDyscoColumn::defaultThreadCount() * 12 / 10 + 1;
189 }
190
193 std::unique_ptr<casacore::ScalarColumn<int>> _ant1Col, _ant2Col, _fieldCol,
195 std::unique_ptr<casacore::ScalarColumn<double>> _timeCol;
202 std::mutex _mutex;
204 std::condition_variable _cacheChangedCondition;
209
210 std::unique_ptr<TimeBlockBuffer<data_t>> _timeBlockBuffer;
211};
212
/**
 * Explicit specialization for std::complex<float> values: casts the
 * type-erased ArrayBase to casacore::Array<std::complex<float>> and passes it
 * to getValues(). The static_cast is unchecked, so the caller must supply an
 * array of exactly that element type.
 * NOTE(review): the line naming the specialized member is absent from this
 * listing; presumably ThreadedDyscoColumn<std::complex<float>>::getArrayV.
 */
213template <>
215 casacore::rownr_t rowNr, casacore::ArrayBase &dataPtr) {
216 getValues(rowNr, static_cast<casacore::Array<std::complex<float>>*>(&dataPtr));
217}
/**
 * Explicit specialization for std::complex<float> values: casts the
 * type-erased ArrayBase to a const casacore::Array<std::complex<float>> and
 * passes it to putValues(). The static_cast is unchecked, so the caller must
 * supply an array of exactly that element type.
 * NOTE(review): the line naming the specialized member is absent from this
 * listing; presumably ThreadedDyscoColumn<std::complex<float>>::putArrayV.
 */
218template <>
220 casacore::rownr_t rowNr, const casacore::ArrayBase &dataPtr) {
221 putValues(rowNr, static_cast<const casacore::Array<std::complex<float>>*>(&dataPtr));
222}
/**
 * Explicit specialization for float values: casts the type-erased ArrayBase
 * to casacore::Array<float> and passes it to getValues(). The static_cast is
 * unchecked, so the caller must supply an array of exactly that element type.
 * NOTE(review): the line naming the specialized member is absent from this
 * listing; presumably ThreadedDyscoColumn<float>::getArrayV.
 */
223template <>
225 casacore::rownr_t rowNr, casacore::ArrayBase &dataPtr) {
226 getValues(rowNr, static_cast<casacore::Array<float>*>(&dataPtr));
227}
/**
 * Explicit specialization for float values: casts the type-erased ArrayBase
 * to a const casacore::Array<float> and passes it to putValues(). The
 * static_cast is unchecked, so the caller must supply an array of exactly
 * that element type.
 * NOTE(review): the line naming the specialized member is absent from this
 * listing; presumably ThreadedDyscoColumn<float>::putArrayV.
 */
228template <>
230 casacore::rownr_t rowNr, const casacore::ArrayBase &dataPtr) {
231 putValues(rowNr, static_cast<const casacore::Array<float>*>(&dataPtr));
232}
233
234extern template class ThreadedDyscoColumn<std::complex<float>>;
235extern template class ThreadedDyscoColumn<float>;
236
237} // namespace dyscostman
238
239#endif
static uint32_t UnserializeUInt32(std::istream &stream)
static void SerializeToUInt32(std::ostream &stream, T value)
A container similar to std::vector, but one that allows construction without initializing its elements.
Definition uvector.h:74
Non-templated base class for templated Array class.
Definition ArrayBase.h:73
virtual void putArrayV(rownr_t rownr, const ArrayBase &data)
Put the array value into the given row.
virtual void getArrayV(rownr_t rownr, ArrayBase &dataPtr)
Get the array value in the given row.
Base class for columns of the DyscoStMan.
The main class for the Dysco storage manager.
Definition dyscostman.h:46
A column for storing compressed values in a threaded way, tailored for the data and weight columns.
virtual void InitializeAfterNRowsPerBlockIsKnown() override
Prepare this column for reading/writing.
TimeBlockBuffer< data_t >::symbol_t symbol_t
std::unique_ptr< TimeBlockBuffer< data_t > > _timeBlockBuffer
std::map< size_t, CacheItem * > cache_t
virtual ~ThreadedDyscoColumn()
Destructor.
std::condition_variable _cacheChangedCondition
virtual void encode(ThreadDataBase *threadData, TimeBlockBuffer< data_t > *buffer, float *metaBuffer, symbol_t *symbolBuffer, size_t nAntennae)=0
virtual void setShapeColumn(const casacore::IPosition &shape) override
Set the dimensions of values in this column.
bool isWriteItemAvailable(typename cache_t::iterator &i)
void getValues(casacore::rownr_t rowNr, casacore::Array< data_t > *dataPtr)
std::unique_ptr< casacore::ScalarColumn< int > > _fieldCol
const casacore::IPosition & shape() const
std::unique_ptr< casacore::ScalarColumn< int > > _dataDescIdCol
virtual std::unique_ptr< ThreadDataBase > initializeEncodeThread()=0
ThreadedDyscoColumn(DyscoStMan *parent, int dtype)
Create a new column.
virtual void Prepare(DyscoDistribution distribution, Normalization normalization, double studentsTNu, double distributionTruncation) override
virtual size_t CalculateBlockSize(size_t nRowsInBlock, size_t nAntennae) const final override
virtual size_t symbolCount(size_t nRowsInBlock, size_t nPolarizations, size_t nChannels) const =0
ThreadedDyscoColumn(const ThreadedDyscoColumn &source)=delete
void SetBitsPerSymbol(unsigned bitsPerSymbol)
Set the bits per symbol.
virtual void decode(TimeBlockBuffer< data_t > *buffer, const symbol_t *data, size_t blockRow, size_t a1, size_t a2)=0
virtual void UnserializeExtraHeader(std::istream &stream) final override
ao::uvector< unsigned char > _packedBlockReadBuffer
void operator=(const ThreadedDyscoColumn &source)=delete
void putValues(casacore::rownr_t rowNr, const casacore::Array< data_t > *dataPtr)
virtual void initializeDecode(TimeBlockBuffer< data_t > *buffer, const float *metaBuffer, size_t nRow, size_t nAntennae)=0
std::unique_ptr< casacore::ScalarColumn< int > > _ant2Col
virtual size_t defaultThreadCount() const
std::unique_ptr< casacore::ScalarColumn< int > > _ant1Col
virtual casacore::IPosition shape(casacore::rownr_t) override
Get the dimensions of the values in a particular row.
virtual void shutdown() override final
To be called before destructing the class.
virtual size_t ExtraHeaderSize() const override
Get number of bytes needed for column header of this column.
void loadBlock(size_t blockIndex)
virtual size_t metaDataFloatCount(size_t nRow, size_t nPolarizations, size_t nChannels, size_t nAntennae) const =0
virtual void putArrayV(casacore::rownr_t rowNr, const casacore::ArrayBase &dataPtr) override
Write values into a particular row.
std::unique_ptr< casacore::ScalarColumn< double > > _timeCol
ao::uvector< unsigned int > _unpackedSymbolReadBuffer
virtual void getArrayV(casacore::rownr_t rowNr, casacore::ArrayBase &dataPtr) override
Read the values for a particular row.
void encodeAndWrite(size_t blockIndex, const CacheItem &item, unsigned char *packedSymbolBuffer, unsigned int *unpackedSymbolBuffer, ThreadDataBase *threadUserData)
virtual void SerializeExtraHeader(std::ostream &stream) const final override
Group of threads.
Definition threadgroup.h:13
uInt64 rownr_t
Define the type of a row number in a table.
Definition aipsxtype.h:46
Define real & complex conjugation for non-complex types and put comparisons into std namespace.
Definition Complex.h:352
std::unique_ptr< TimeBlockBuffer< data_t > > encoder
CacheItem(std::unique_ptr< TimeBlockBuffer< data_t > > &&encoder_)
virtual void Serialize(std::ostream &stream) const override
virtual void Unserialize(std::istream &stream) override