Impala
Impala is the open source, native analytic database for Apache Hadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
dict-encoding.h
Go to the documentation of this file.
1 // Copyright 2012 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef IMPALA_UTIL_DICT_ENCODING_H
16 #define IMPALA_UTIL_DICT_ENCODING_H
17 
18 #include <map>
19 
20 #include <boost/foreach.hpp>
21 #include <boost/scoped_ptr.hpp>
22 #include <boost/unordered_map.hpp>
23 
24 #include "exec/parquet-common.h"
25 #include "runtime/mem-pool.h"
26 #include "runtime/string-value.h"
27 #include "util/rle-encoding.h"
28 #include "util/runtime-profile.h"
29 
30 namespace impala {
31 
40 
49  public:
50  virtual ~DictEncoderBase() {
51  DCHECK(buffered_indices_.empty());
52  }
53 
56  virtual void WriteDict(uint8_t* buffer) = 0;
57 
59  virtual int num_entries() const = 0;
60 
62  void ClearIndices() { buffered_indices_.clear(); }
63 
68  }
69 
71  int bit_width() const {
72  if (UNLIKELY(num_entries() == 0)) return 0;
73  if (UNLIKELY(num_entries() == 1)) return 1;
74  return BitUtil::Log2(num_entries());
75  }
76 
82  int WriteData(uint8_t* buffer, int buffer_len);
83 
85 
86  protected:
88  : dict_encoded_size_(0), pool_(pool) {
89  }
90 
92  std::vector<int> buffered_indices_;
93 
96 
99 };
100 
101 template<typename T>
102 class DictEncoder : public DictEncoderBase {
103  public:
104  DictEncoder(MemPool* pool, int encoded_value_size) :
105  DictEncoderBase(pool), buckets_(HASH_TABLE_SIZE, Node::INVALID_INDEX),
106  encoded_value_size_(encoded_value_size) { }
107 
113  int Put(const T& value);
114 
115  virtual void WriteDict(uint8_t* buffer);
116 
117  virtual int num_entries() const { return nodes_.size(); }
118 
119  private:
121  enum { HASH_TABLE_SIZE = 1 << 16 };
122 
124  typedef uint16_t NodeIndex;
125 
129  std::vector<NodeIndex> buckets_;
130 
132  struct Node {
133  Node(const T& v, const NodeIndex& n) : value(v), next(n) { }
134 
136  T value;
137 
140 
143  enum { INVALID_INDEX = 40000 };
144  };
145 
148  std::vector<Node> nodes_;
149 
152 
154  inline uint32_t Hash(const T& value) const;
155 
160  int AddToTable(const T& value, NodeIndex* bucket);
161 };
162 
167  public:
169  void SetData(uint8_t* buffer, int buffer_len) {
170  DCHECK_GT(buffer_len, 0);
171  uint8_t bit_width = *buffer;
172  DCHECK_GE(bit_width, 0);
173  ++buffer;
174  --buffer_len;
175  data_decoder_.reset(new RleDecoder(buffer, buffer_len, bit_width));
176  }
177 
178  virtual ~DictDecoderBase() {}
179 
180  virtual int num_entries() const = 0;
181 
182  protected:
183  boost::scoped_ptr<RleDecoder> data_decoder_;
184 };
185 
186 template<typename T>
187 class DictDecoder : public DictDecoderBase {
188  public:
195  DictDecoder(uint8_t* dict_buffer, int dict_len, int fixed_len_size);
196 
197  virtual int num_entries() const { return dict_.size(); }
198 
202  bool GetValue(T* value);
203 
204  private:
205  std::vector<T> dict_;
206 };
207 
208 template<typename T>
209 inline int DictEncoder<T>::Put(const T& value) {
210  NodeIndex* bucket = &buckets_[Hash(value) & (HASH_TABLE_SIZE - 1)];
211  NodeIndex i = *bucket;
212  // Look for the value in the dictionary.
213  while (i != Node::INVALID_INDEX) {
214  const Node* n = &nodes_[i];
215  if (LIKELY(n->value == value)) {
216  // Value already in dictionary.
217  buffered_indices_.push_back(i);
218  return 0;
219  }
220  i = n->next;
221  }
222  // Value not found. Add it to the dictionary if there's space.
223  i = nodes_.size();
224  if (UNLIKELY(i >= Node::INVALID_INDEX)) return -1;
225  buffered_indices_.push_back(i);
226  return AddToTable(value, bucket);
227 }
228 
229 template<typename T>
230 inline uint32_t DictEncoder<T>::Hash(const T& value) const {
231  return HashUtil::Hash(&value, sizeof(value), 0);
232 }
233 
234 template<>
235 inline uint32_t DictEncoder<StringValue>::Hash(const StringValue& value) const {
236  return HashUtil::Hash(value.ptr, value.len, 0);
237 }
238 
239 template<typename T>
240 inline int DictEncoder<T>::AddToTable(const T& value, NodeIndex* bucket) {
241  DCHECK_GT(encoded_value_size_, 0);
242  // Prepend the new node to this bucket's chain.
243  nodes_.push_back(Node(value, *bucket));
244  *bucket = nodes_.size() - 1;
245  dict_encoded_size_ += encoded_value_size_;
246  return encoded_value_size_;
247 }
248 
249 template<>
251  NodeIndex* bucket) {
252  char* ptr_copy = reinterpret_cast<char*>(pool_->Allocate(value.len));
253  memcpy(ptr_copy, value.ptr, value.len);
254  StringValue sv(ptr_copy, value.len);
255  // Prepend the new node to this bucket's chain.
256  nodes_.push_back(Node(sv, *bucket));
257  *bucket = nodes_.size() - 1;
258  int bytes_added = ParquetPlainEncoder::ByteSize(sv);
259  dict_encoded_size_ += bytes_added;
260  return bytes_added;
261 }
262 
263 template<typename T>
264 inline bool DictDecoder<T>::GetValue(T* value) {
265  DCHECK(data_decoder_.get() != NULL);
266  int index;
267  bool result = data_decoder_->Get(&index);
268  if (!result) return false;
269  if (index >= dict_.size()) return false;
270  *value = dict_[index];
271  return true;
272 }
273 
274 template<>
276  DCHECK(data_decoder_.get() != NULL);
277  int index;
278  bool result = data_decoder_->Get(&index);
279  if (!result) return false;
280  if (index >= dict_.size()) return false;
281  // Workaround for IMPALA-959. Use memcpy instead of '=' so addresses
282  // do not need to be 16 byte aligned.
283  uint8_t* addr = reinterpret_cast<uint8_t*>(&dict_[0]);
284  addr = addr + index * sizeof(*value);
285  memcpy(value, addr, sizeof(*value));
286  return true;
287 }
288 
289 template<typename T>
290 inline void DictEncoder<T>::WriteDict(uint8_t* buffer) {
291  BOOST_FOREACH(const Node& node, nodes_) {
292  buffer += ParquetPlainEncoder::Encode(buffer, encoded_value_size_, node.value);
293  }
294 }
295 
296 inline int DictEncoderBase::WriteData(uint8_t* buffer, int buffer_len) {
297  // Write bit width in first byte
298  *buffer = bit_width();
299  ++buffer;
300  --buffer_len;
301 
302  RleEncoder encoder(buffer, buffer_len, bit_width());
303  BOOST_FOREACH(int index, buffered_indices_) {
304  if (!encoder.Put(index)) return -1;
305  }
306  encoder.Flush();
307  return 1 + encoder.len();
308 }
309 
310 template<typename T>
311 inline DictDecoder<T>::DictDecoder(uint8_t* dict_buffer, int dict_len,
312  int fixed_len_size) {
313  uint8_t* end = dict_buffer + dict_len;
314  while (dict_buffer < end) {
315  T value;
316  dict_buffer +=
317  ParquetPlainEncoder::Decode(dict_buffer, fixed_len_size, &value);
318  dict_.push_back(value);
319  }
320 }
321 
322 }
323 #endif
uint16_t NodeIndex
Dictates an upper bound on the capacity of the hash table.
boost::scoped_ptr< RleDecoder > data_decoder_
uint32_t Hash(const T &value) const
Hash function for mapping a value to a bucket.
int encoded_value_size_
Size of each encoded dictionary value. -1 for variable-length types.
MemPool * pool_
Pool to store StringValue data. Not owned.
Definition: dict-encoding.h:98
int WriteData(uint8_t *buffer, int buffer_len)
static int ByteSize(const T &v)
Returns the byte size of 'v'.
virtual int num_entries() const
The number of entries in the dictionary.
std::vector< NodeIndex > buckets_
static int Encode(uint8_t *buffer, int fixed_len_size, const T &t)
T value
The dictionary value.
std::vector< Node > nodes_
int bit_width() const
The minimum bit width required to encode the currently buffered indices.
Definition: dict-encoding.h:71
virtual int num_entries() const =0
The number of entries in the dictionary.
virtual void WriteDict(uint8_t *buffer)
static uint32_t Hash(const void *data, int32_t bytes, uint32_t seed)
Definition: hash-util.h:135
ObjectPool pool
int dict_encoded_size_
The number of bytes needed to encode the dictionary.
Definition: dict-encoding.h:95
static uint64_t Hash(const IntVal &v)
static int Decode(uint8_t *buffer, int fixed_len_size, T *v)
int AddToTable(const T &value, NodeIndex *bucket)
void ClearIndices()
Clears all the indices (but leaves the dictionary).
Definition: dict-encoding.h:62
Node in the chained hash table.
DictDecoder(uint8_t *dict_buffer, int dict_len, int fixed_len_size)
#define UNLIKELY(expr)
Definition: compiler-util.h:33
bool Put(uint64_t value)
Definition: rle-encoding.h:264
std::vector< T > dict_
#define LIKELY(expr)
Definition: compiler-util.h:32
virtual void WriteDict(uint8_t *buffer)=0
int Put(const T &value)
virtual int num_entries() const =0
DictEncoder(MemPool *pool, int encoded_value_size)
bool GetValue(T *value)
Node(const T &v, const NodeIndex &n)
void SetData(uint8_t *buffer, int buffer_len)
The rle encoded indices into the dictionary.
static int MaxBufferSize(int bit_width, int num_values)
Returns the maximum byte size it could take to encode 'num_values'.
Definition: rle-encoding.h:142
std::vector< int > buffered_indices_
Indices that have not yet been written out by WriteData().
Definition: dict-encoding.h:92
DictEncoderBase(MemPool *pool)
Definition: dict-encoding.h:87
virtual int num_entries() const
Decoder class for RLE encoded data.
Definition: rle-encoding.h:77
static int Log2(uint64_t x)
Definition: bit-util.h:135
NodeIndex next
Index into nodes_ for the next Node in the chain. INVALID_INDEX indicates end.