Impala
Impala is the open source, native analytic database for Apache Hadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
rle-encoding.h
Go to the documentation of this file.
1 // Copyright 2012 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef IMPALA_RLE_ENCODING_H
16 #define IMPALA_RLE_ENCODING_H
17 
18 #include <math.h>
19 
20 #include "common/compiler-util.h"
22 #include "util/bit-util.h"
23 
24 namespace impala {
25 
42 //
47 //
57 //
63 //
69 //
74 //
75 
// Decoder class for RLE encoded data.
77 class RleDecoder {
78  public:
 // Creates a decoder that reads from 'buffer' ('buffer_len' bytes) through
 // bit_reader_. 'bit_width' is stored as the width of each value and is
 // DCHECKed to lie in [0, 64]. All run state starts at zero, so the first
 // Get() call reads a run indicator.
81  RleDecoder(uint8_t* buffer, int buffer_len, int bit_width)
82  : bit_reader_(buffer, buffer_len),
83  bit_width_(bit_width),
84  current_value_(0),
85  repeat_count_(0),
86  literal_count_(0) {
87  DCHECK_GE(bit_width_, 0);
88  DCHECK_LE(bit_width_, 64);
89  }
90 
92 
 // Gets the next value. Returns false if there are no more.
94  template<typename T>
95  bool Get(T* val);
96 
97  private:
 // NOTE(review): listing lines 98-100 are absent from this extract. The
 // cross-reference section places 'BitReader bit_reader_' at line 98 and
 // 'uint64_t current_value_' at line 100; line 99 is presumably the
 // declaration of bit_width_ -- confirm against the original header.

 // Number of values still to be returned from the current repeated run.
101  uint32_t repeat_count_;
 // Number of values still to be returned from the current literal run.
102  uint32_t literal_count_;
103 };
104 
// Encoder counterpart of RleDecoder: values handed to Put() are written to the
// caller-supplied buffer through bit_writer_ as repeated runs or literal runs.
111 class RleEncoder {
112  public:
 // Creates an encoder writing into 'buffer' ('buffer_len' bytes). 'bit_width'
 // is DCHECKed to lie in [1, 64], and 'buffer_len' is DCHECKed to be at least
 // MinBufferSize(bit_width). Finishes by calling Clear() to reset run state.
119  RleEncoder(uint8_t* buffer, int buffer_len, int bit_width)
120  : bit_width_(bit_width),
121  bit_writer_(buffer, buffer_len) {
122  DCHECK_GE(bit_width_, 1);
123  DCHECK_LE(bit_width_, 64);
124  max_run_byte_size_ = MinBufferSize(bit_width);
125  DCHECK_GE(buffer_len, max_run_byte_size_) << "Input buffer not big enough.";
126  Clear();
127  }
128 
 // Returns the larger of the literal-run bound and the repeated-run bound.
 // The constructor uses the result as max_run_byte_size_, the maximum byte
 // size a single run can take.
 // NOTE(review): listing lines 133 and 135-136 are absent from this extract,
 // so the max_literal_run_size expression below is truncated after '1 +'.
 // Restore it from the original header; do not guess the operand.
132  static int MinBufferSize(int bit_width) {
134  int max_literal_run_size = 1 +
 // A repeated run is a VLQ-encoded indicator followed by the value rounded up
 // to whole bytes.
137  int max_repeated_run_size = BitReader::MAX_VLQ_BYTE_LEN + BitUtil::Ceil(bit_width, 8);
138  return std::max(max_literal_run_size, max_repeated_run_size);
139  }
140 
 // Returns the maximum byte size it could take to encode 'num_values'.
 // The bound is computed as if every value were written in literal runs of
 // MAX_VALUES_PER_LITERAL_RUN values, each with one extra byte, and is never
 // smaller than MinBufferSize(bit_width).
 // NOTE(review): the cross-reference section lists Ceil as
 // 'static int Ceil(int value, int divisor)', so the 8.0 below is implicitly
 // converted to int; a plain 8 would express the same thing.
142  static int MaxBufferSize(int bit_width, int num_values) {
143  int bytes_per_run = BitUtil::Ceil(bit_width * MAX_VALUES_PER_LITERAL_RUN, 8.0);
144  int num_runs = BitUtil::Ceil(num_values, MAX_VALUES_PER_LITERAL_RUN);
145  int literal_max_size = num_runs + num_runs * bytes_per_run;
146  return std::max(MinBufferSize(bit_width), literal_max_size);
147  }
148 
 // Encodes 'value'. Returns false, without encoding anything, once the buffer
 // has been marked full; returns true otherwise.
151  bool Put(uint64_t value);
152 
 // Writes out any pending run, flushes bit_writer_ and returns
 // bit_writer_.bytes_written().
155  int Flush();
156 
 // Resets all the state in the encoder.
158  void Clear();
159 
 // Returns pointer to underlying buffer.
161  uint8_t* buffer() { return bit_writer_.buffer(); }
 // Returns bit_writer_.bytes_written().
162  int32_t len() { return bit_writer_.bytes_written(); }
163 
164  private:
 // Called from Put() once 8 values are buffered. Either folds them into the
 // current repeated run or hands them to FlushLiteralRun(). 'done' is passed
 // through as the update_indicator_byte argument when the literal run still
 // has room.
173  void FlushBufferedValues(bool done);
174 
 // Writes the buffered values bit-packed. If 'update_indicator_byte' is true,
 // also writes the literal run's indicator byte and ends the run.
177  void FlushLiteralRun(bool update_indicator_byte);
178 
 // Flushes a repeated run to the underlying buffer.
180  void FlushRepeatedRun();
181 
 // Sets buffer_full_ when fewer than max_run_byte_size_ bytes remain in the
 // buffer.
184  void CheckBufferFull();
185 
 // (1 << 6) groups of 8 values, i.e. 512 values.
188  static const int MAX_VALUES_PER_LITERAL_RUN = (1 << 6) * 8;
189 
 // Number of bits needed to encode the value.
191  const int bit_width_;
192 
 // NOTE(review): several member declarations are absent from this extract.
 // The cross-reference section places 'BitWriter bit_writer_' at line 194,
 // 'bool buffer_full_' at 197, 'int max_run_byte_size_' at 200,
 // 'int num_buffered_values_' at 208 and 'uint8_t* literal_indicator_byte_'
 // at 225. The method bodies also use repeat_count_ and literal_count_, whose
 // encoder-side declarations are not visible here.
195 
198 
201 
 // Values received by Put() that have not been written yet; flushed once 8
 // have accumulated.
205  int64_t buffered_values_[8];
206 
209 
 // Value of the run currently being tracked; Put() compares each incoming
 // value against it.
 // NOTE(review): this is int64_t while Put() takes uint64_t, so that
 // comparison mixes signed and unsigned operands -- confirm this is intended.
214  int64_t current_value_;
216 
221 
226 };
227 
228 template<typename T>
229 inline bool RleDecoder::Get(T* val) {
230  if (UNLIKELY(literal_count_ == 0 && repeat_count_ == 0)) {
231  // Read the next run's indicator int, it could be a literal or repeated run
232  // The int is encoded as a vlq-encoded value.
233  int32_t indicator_value = 0;
234  bool result = bit_reader_.GetVlqInt(&indicator_value);
235  if (!result) return false;
236 
237  // lsb indicates if it is a literal run or repeated run
238  bool is_literal = indicator_value & 1;
239  if (is_literal) {
240  literal_count_ = (indicator_value >> 1) * 8;
241  } else {
242  repeat_count_ = indicator_value >> 1;
243  bool result = bit_reader_.GetAligned<T>(
244  BitUtil::Ceil(bit_width_, 8), reinterpret_cast<T*>(&current_value_));
245  DCHECK(result);
246  }
247  }
248 
249  if (LIKELY(repeat_count_ > 0)) {
250  *val = current_value_;
251  --repeat_count_;
252  } else {
253  DCHECK(literal_count_ > 0);
254  bool result = bit_reader_.GetValue(bit_width_, val);
255  DCHECK(result);
256  --literal_count_;
257  }
258 
259  return true;
260 }
261 
// Encodes 'value'. Returns false, without touching any state, if the buffer
// has been marked full (buffer_full_); returns true otherwise.
// 'value' is DCHECKed to fit in bit_width_ bits (any value is accepted when
// bit_width_ is 64).
264 inline bool RleEncoder::Put(uint64_t value) {
265  DCHECK(bit_width_ == 64 || value < (1LL << bit_width_));
266  if (UNLIKELY(buffer_full_)) return false;
267 
 // Same value as the run being tracked: extend the repeat count.
268  if (LIKELY(current_value_ == value)) {
269  ++repeat_count_;
270  if (repeat_count_ > 8) {
271  // This is just a continuation of the current run, no need to buffer the
272  // values.
273  // Note that this is the fast path for long repeated runs.
274  return true;
275  }
276  } else {
277  if (repeat_count_ >= 8) {
278  // We had a run that was long enough but it has ended. Flush the
279  // current repeated run.
280  DCHECK_EQ(literal_count_, 0);
 // NOTE(review): listing line 281 is absent from this extract. Going by the
 // comment above it is presumably the FlushRepeatedRun() call -- confirm
 // against the original header.
282  }
 // Start tracking a new candidate run with this value.
283  repeat_count_ = 1;
284  current_value_ = value;
285  }
286 
 // NOTE(review): listing line 287 is absent from this extract. It presumably
 // stores 'value' into buffered_values_ before the counter below is
 // incremented -- confirm against the original header.
 // Values are handed on in groups of 8.
288  if (++num_buffered_values_ == 8) {
289  DCHECK_EQ(literal_count_ % 8, 0);
290  FlushBufferedValues(false);
291  }
292  return true;
293 }
294 
// Writes the first num_buffered_values_ entries of buffered_values_ as
// bit-packed literals and resets num_buffered_values_ to 0. If
// 'update_indicator_byte' is true, also writes the indicator byte for the
// literal run, resets literal_count_ and calls CheckBufferFull().
295 inline void RleEncoder::FlushLiteralRun(bool update_indicator_byte) {
296  if (literal_indicator_byte_ == NULL) {
297  // The literal indicator byte has not been reserved yet, get one now.
 // NOTE(review): listing line 298 is absent from this extract. It presumably
 // assigns literal_indicator_byte_ from the writer (the cross-reference
 // section lists GetNextBytePtr) -- confirm against the original header.
299  DCHECK(literal_indicator_byte_ != NULL);
300  }
301 
302  // Write all the buffered values as bit packed literals
303  for (int i = 0; i < num_buffered_values_; ++i) {
304  bool success = bit_writer_.PutValue(buffered_values_[i], bit_width_);
305  DCHECK(success) << "There is a bug in using CheckBufferFull()";
306  }
307  num_buffered_values_ = 0;
308 
309  if (update_indicator_byte) {
310  // At this point we need to write the indicator byte for the literal run.
311  // We only reserve one byte, to allow for streaming writes of literal values.
312  // The logic makes sure we flush literal runs often enough to not overrun
313  // the 1 byte.
314  DCHECK_EQ(literal_count_ % 8, 0);
315  int num_groups = literal_count_ / 8;
 // The indicator holds the number of 8-value groups shifted left by one, with
 // the lsb set to 1 to mark a literal run.
316  int32_t indicator_value = (num_groups << 1) | 1;
 // The indicator must fit in the single reserved byte.
317  DCHECK_EQ(indicator_value & 0xFFFFFF00, 0);
318  *literal_indicator_byte_ = indicator_value;
 // NOTE(review): listing line 319 is absent from this extract. It presumably
 // resets literal_indicator_byte_ so the next literal run reserves a fresh
 // byte -- confirm against the original header.
320  literal_count_ = 0;
321  CheckBufferFull();
322  }
323 }
324 
// Flushes a repeated run to the underlying buffer.
// NOTE(review): the signature line (listing line 325) is absent from this
// extract. The cross-reference section gives it as
// 'void FlushRepeatedRun()' defined at rle-encoding.h:325.
326  DCHECK_GT(repeat_count_, 0);
327  bool result = true;
328  // The lsb of 0 indicates this is a repeated run
 // NOTE(review): repeat_count_ is shifted into an int32_t here; whether very
 // long runs can exceed that range depends on its declared type, which is not
 // visible in this extract.
329  int32_t indicator_value = repeat_count_ << 1 | 0;
330  result &= bit_writer_.PutVlqInt(indicator_value);
 // NOTE(review): listing lines 331 and 333 are absent from this extract. Line
 // 331 presumably writes the repeated value itself (the cross-reference
 // section lists PutAligned) -- confirm against the original header.
332  DCHECK(result);
334  repeat_count_ = 0;
335  CheckBufferFull();
336 }
337 
// Decides what to do with the values buffered by Put().
// If the current repeat count has reached 8, the values belong to a repeated
// run: any open literal run has its indicator byte written and nothing is
// emitted as literals. Otherwise the values are written as literals, and the
// literal run is closed when its indicator byte cannot hold another group.
// 'done' is passed through to FlushLiteralRun() when the run still has room.
340 inline void RleEncoder::FlushBufferedValues(bool done) {
341  if (repeat_count_ >= 8) {
342  // Clear the buffered values. They are part of the repeated run now and we
343  // don't want to flush them out as literals.
 // NOTE(review): listing line 344 is absent from this extract. Going by the
 // comment above it presumably resets num_buffered_values_ -- confirm against
 // the original header.
345  if (literal_count_ != 0) {
346  // There was a current literal run. All the values in it have been flushed
347  // but we still need to update the indicator byte.
348  DCHECK_EQ(literal_count_ % 8, 0);
349  DCHECK_EQ(repeat_count_, 8);
350  FlushLiteralRun(true);
351  }
352  DCHECK_EQ(literal_count_, 0);
353  return;
354  }
355 
 // NOTE(review): listing line 356 is absent from this extract. It presumably
 // adds the buffered values to literal_count_ -- confirm against the original
 // header.
357  DCHECK_EQ(literal_count_ % 8, 0);
358  int num_groups = literal_count_ / 8;
359  if (num_groups + 1 >= (1 << 6)) {
360  // We need to start a new literal run because the indicator byte we've reserved
361  // cannot store more values.
362  DCHECK(literal_indicator_byte_ != NULL);
363  FlushLiteralRun(true);
364  } else {
365  FlushLiteralRun(done);
366  }
 // The buffered values went out as literals, so no repeated run is pending.
367  repeat_count_ = 0;
368 }
369 
// Writes out whatever run is still pending, flushes bit_writer_ and returns
// bit_writer_.bytes_written(). Afterwards num_buffered_values_,
// literal_count_ and repeat_count_ are all DCHECKed to be 0.
370 inline int RleEncoder::Flush() {
371  if (literal_count_ > 0 || repeat_count_ > 0 || num_buffered_values_ > 0) {
 // NOTE(review): listing line 373 is absent from this extract, so the
 // all_repeat expression below is truncated after '&&'. Restore the second
 // operand from the original header; do not guess it.
372  bool all_repeat = literal_count_ == 0 &&
374  // There is something pending, figure out if it's a repeated or literal run
375  if (repeat_count_ > 0 && all_repeat) {
 // NOTE(review): listing line 376 is absent from this extract. This branch
 // presumably flushes the repeated run -- confirm against the original header.
377  } else {
378  DCHECK_EQ(literal_count_ % 8, 0);
379  // Buffer the last group of literals to 8 by padding with 0s.
 // NOTE(review): listing lines 381-382 (the rest of the for statement and its
 // body) and line 384 are absent from this extract -- restore them from the
 // original header.
380  for (; num_buffered_values_ != 0 && num_buffered_values_ < 8;
383  }
385  FlushLiteralRun(true);
386  repeat_count_ = 0;
387  }
388  }
389  bit_writer_.Flush();
390  DCHECK_EQ(num_buffered_values_, 0);
391  DCHECK_EQ(literal_count_, 0);
392  DCHECK_EQ(repeat_count_, 0);
393 
394  return bit_writer_.bytes_written();
395 }
396 
// Marks the buffer as full when the bytes written so far plus
// max_run_byte_size_ (the maximum byte size a single run can take) would
// exceed the buffer length. Once buffer_full_ is set, Put() returns false.
// NOTE(review): the signature line (listing line 397) is absent from this
// extract. The class declares 'void CheckBufferFull();', so it is presumably
// 'inline void RleEncoder::CheckBufferFull() {' -- confirm against the
// original header.
398  int bytes_written = bit_writer_.bytes_written();
399  if (bytes_written + max_run_byte_size_ > bit_writer_.buffer_len()) {
400  buffer_full_ = true;
401  }
402 }
403 
// Resets all the state in the encoder: clears the full flag, the tracked value
// and the run counters, then clears bit_writer_.
404 inline void RleEncoder::Clear() {
405  buffer_full_ = false;
406  current_value_ = 0;
407  repeat_count_ = 0;
 // NOTE(review): listing lines 408 and 410 are absent from this extract. They
 // presumably reset the remaining members (num_buffered_values_ and
 // literal_indicator_byte_) -- confirm against the original header.
409  literal_count_ = 0;
411  bit_writer_.Clear();
412 }
413 
414 }
415 #endif
bool GetAligned(int num_bytes, T *v)
bool PutValue(uint64_t v, int num_bits)
int num_buffered_values_
Number of values in buffered_values_.
Definition: rle-encoding.h:208
static const int MAX_VALUES_PER_LITERAL_RUN
Definition: rle-encoding.h:188
BitWriter bit_writer_
Underlying buffer.
Definition: rle-encoding.h:194
BitReader bit_reader_
Definition: rle-encoding.h:98
const int bit_width_
Number of bits needed to encode the value.
Definition: rle-encoding.h:191
int bytes_written() const
uint8_t * buffer() const
uint8_t * buffer()
Returns pointer to underlying buffer.
Definition: rle-encoding.h:161
RleEncoder(uint8_t *buffer, int buffer_len, int bit_width)
Definition: rle-encoding.h:119
void FlushBufferedValues(bool done)
Definition: rle-encoding.h:340
void FlushLiteralRun(bool update_indicator_byte)
Definition: rle-encoding.h:295
uint32_t repeat_count_
Definition: rle-encoding.h:101
uint8_t * GetNextBytePtr(int num_bytes=1)
bool PutAligned(T v, int num_bytes)
bool GetVlqInt(int32_t *v)
void Flush(bool align=false)
static int Ceil(int value, int divisor)
Returns the ceil of value/divisor.
Definition: bit-util.h:32
int max_run_byte_size_
The maximum byte size a single run can take.
Definition: rle-encoding.h:200
bool buffer_full_
If true, the buffer is full and subsequent Put()'s will fail.
Definition: rle-encoding.h:197
uint64_t current_value_
Definition: rle-encoding.h:100
static int MinBufferSize(int bit_width)
Definition: rle-encoding.h:132
RleDecoder(uint8_t *buffer, int buffer_len, int bit_width)
Definition: rle-encoding.h:81
int buffer_len() const
#define UNLIKELY(expr)
Definition: compiler-util.h:33
bool Put(uint64_t value)
Definition: rle-encoding.h:264
#define LIKELY(expr)
Definition: compiler-util.h:32
void FlushRepeatedRun()
Flushes a repeated run to the underlying buffer.
Definition: rle-encoding.h:325
bool Get(T *val)
Gets the next value. Returns false if there are no more.
Definition: rle-encoding.h:229
uint32_t literal_count_
Definition: rle-encoding.h:102
static int MaxBufferSize(int bit_width, int num_values)
Returns the maximum byte size it could take to encode 'num_values'.
Definition: rle-encoding.h:142
void Clear()
Resets all the state in the encoder.
Definition: rle-encoding.h:404
static const int MAX_VLQ_BYTE_LEN
Maximum byte length of a vlq encoded int.
Decoder class for RLE encoded data.
Definition: rle-encoding.h:77
uint8_t * literal_indicator_byte_
Definition: rle-encoding.h:225
bool GetValue(int num_bits, T *v)
int64_t buffered_values_[8]
Definition: rle-encoding.h:205