Impala
Impala is the open source, native analytic database for Apache Hadoop.
hdfs-parquet-table-writer.cc
1 // Copyright 2012 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "exec/hdfs-parquet-table-writer.h"
16 
17 #include "common/version.h"
18 #include "exprs/expr.h"
19 #include "exprs/expr-context.h"
20 #include "runtime/decimal-value.h"
21 #include "runtime/raw-value.h"
22 #include "runtime/row-batch.h"
23 #include "runtime/runtime-state.h"
24 #include "runtime/string-value.inline.h"
25 #include "util/bit-stream-utils.h"
26 #include "util/bit-util.h"
27 #include "util/buffer-builder.h"
28 #include "util/compress.h"
29 #include "util/debug-util.h"
30 #include "util/dict-encoding.h"
31 #include "util/hdfs-util.h"
32 #include "util/rle-encoding.h"
33 #include "rpc/thrift-util.h"
34 
35 #include <sstream>
36 
37 #include "gen-cpp/ImpalaService_types.h"
38 
39 #include "common/names.h"
40 using namespace impala;
41 using namespace parquet;
42 using namespace apache::thrift;
43 
44 // Managing file sizes: We need to estimate how big the files being buffered
45 // are in order to split them correctly in HDFS. Having a file that is too big
46 // will cause remote reads (parquet files are non-splittable).
47 // It's too expensive to compute the exact file sizes as the rows are buffered
48 // since the values in the current pages are only encoded/compressed when the page
49 // is full. Once the page is full, we encode and compress it, at which point we know
50 // the exact on-file size.
51 // The current buffered pages (one for each column) can have a very poor estimate.
52 // To adjust for this, we aim for a slightly smaller file size than the ideal.
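// (Added note, not in the original source: the running estimate is kept in
// file_size_estimate_, which grows as values are encoded and pages are finalized;
// AppendRowBatch() switches to a new file once the estimate exceeds file_size_limit_.)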
53 
54 // The maximum entries in the dictionary before giving up and switching to
55 // plain encoding.
56 // TODO: more complicated heuristic?
57 static const int MAX_DICTIONARY_ENTRIES = (1 << 16) - 1;
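// Added note: (1 << 16) - 1 = 65,535 entries, which keeps dictionary indices within 16 bits.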
58 
59 // Class that encapsulates all the state for writing a single column. This contains
60 // all the buffered pages as well as the metadata (e.g. byte sizes, num values, etc).
61 // This is intended to be created once per writer per column and reused across
62 // row groups.
63 // We currently accumulate all the data pages for an entire row group per column
64 // before flushing them. This can be pretty large (hundreds of MB) but we can't
65 // fix this without collocated files in HDFS. With collocated files, the minimum
66 // we'd need to buffer is 1 page per column so on the order of 1MB (although we might
67 // decide to buffer a few pages for better HDFS write performance).
68 // Pages are reused between flushes. They are created on demand as necessary and
69 // recycled after a flush.
70 // As rows come in, we accumulate the encoded values into the values_ and def_levels_
71 // buffers. When we've accumulated a page's worth of data, we combine values_ and
72 // def_levels_ into a single buffer that would be the exact bytes (with no gaps) in
73 // the file. The combined buffer is compressed if compression is enabled and we
74 // keep the combined/compressed buffer until we need to flush the file. The
75 // values_ and def_levels_ are then reused for the next page.
76 //
77 // TODO: For codegen, we would codegen the AppendRow() function for each column.
78 // This codegen is specific to the column expr (and type) and encoding. The
79 // parent writer object would combine all the generated AppendRow from all
80 // the columns and run that function over row batches.
81 // TODO: we need to pass in the compression from the FE/metadata
82 
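// Orientation sketch (added; not part of the original source) of the per-column write
// path implemented below:
//   AppendRow()           - buffers one definition level and, for non-NULL values, one
//                           encoded value into def_levels_/values_buffer_.
//   FinalizeCurrentPage() - combines levels and values into one buffer, compresses it if
//                           a codec is set, and fills in the page header.
//   Flush()               - writes the dictionary page (if any) and all data pages.
//   Reset()               - recycles pages and metadata for the next row group.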
83 namespace impala {
84 
85 // Base class for column writers. This contains most of the logic except for
86 // the type specific functions which are implemented in the subclasses.
87 class HdfsParquetTableWriter::BaseColumnWriter {
88  public:
89  // expr - the expression to generate output values for this column.
90  BaseColumnWriter(HdfsParquetTableWriter* parent, ExprContext* expr_ctx,
91      const THdfsCompression::type& codec)
92  : parent_(parent), expr_ctx_(expr_ctx), codec_(codec),
93  page_size_(DEFAULT_DATA_PAGE_SIZE), current_page_(NULL), num_values_(0),
94  total_compressed_byte_size_(0),
95  total_uncompressed_byte_size_(0),
96  dict_encoder_base_(NULL),
97  def_levels_(NULL),
98  values_buffer_len_(DEFAULT_DATA_PAGE_SIZE) {
99  Codec::CreateCompressor(NULL, false, codec, &compressor_);
100 
101  def_levels_ = parent_->state_->obj_pool()->Add(
102  new RleEncoder(parent_->reusable_col_mem_pool_->Allocate(DEFAULT_DATA_PAGE_SIZE),
103  DEFAULT_DATA_PAGE_SIZE, 1));
104  values_buffer_ = parent_->reusable_col_mem_pool_->Allocate(values_buffer_len_);
105  }
106 
107  virtual ~BaseColumnWriter() {}
108 
109  // Appends the row to this column. This buffers the value into a data page. Returns
110  // an error if the space needed for the encoded value is larger than the data page size.
111  // TODO: this needs to be batch based, instead of row based for better performance.
112  // This is a bit trickier to handle the case where only a partial row batch can be
113  // output to the current file because it reaches the max file size. Enabling codegen
114  // would also solve this problem.
115  Status AppendRow(TupleRow* row);
116 
117  // Flushes all buffered data pages to the file.
118  // *file_pos is an output parameter and will be incremented by
119  // the number of bytes needed to write all the data pages for this column.
120  // first_data_page and first_dictionary_page are also out parameters and
121  // will contain the byte offset for the data page and dictionary page. They
122  // will be set to -1 if the column does not contain that type of page.
123  Status Flush(int64_t* file_pos, int64_t* first_data_page,
124  int64_t* first_dictionary_page);
125 
126  // Resets all the data accumulated for this column. Memory can now be reused for
127  // the next row group
128  // Any data for previous row groups must be reset (e.g. dictionaries).
129  // Subclasses must call this if they override this function.
130  virtual void Reset() {
131  num_data_pages_ = 0;
132  current_page_ = NULL;
133  num_values_ = 0;
134  total_compressed_byte_size_ = 0;
135  current_encoding_ = Encoding::PLAIN;
136  }
137 
138  // Close this writer. This is only called after Flush() and no more rows will
139  // be added.
140  void Close() {
141  if (compressor_.get() != NULL) compressor_->Close();
142  if (dict_encoder_base_ != NULL) dict_encoder_base_->ClearIndices();
143  }
144 
145  const ColumnType& type() const { return expr_ctx_->root()->type(); }
146  uint64_t num_values() const { return num_values_; }
147  uint64_t total_compressed_size() const { return total_compressed_byte_size_; }
148  uint64_t total_uncompressed_size() const { return total_uncompressed_byte_size_; }
149  parquet::CompressionCodec::type codec() const {
150  return IMPALA_TO_PARQUET_CODEC[codec_];
151  }
152 
153  protected:
154  friend class HdfsParquetTableWriter;
155 
156  // Encode value into the current page output buffer. Returns true if the value fits
157  // on the current page. If this function returned false, the caller should create a
158  // new page and try again with the same value.
159  // *bytes_needed will contain the (estimated) number of bytes needed to successfully
160  // encode the value in the page.
161  // Implemented in the subclass.
162  virtual bool EncodeValue(void* value, int64_t* bytes_needed) = 0;
163 
164  // Encodes out all data for the current page and updates the metadata.
165  virtual void FinalizeCurrentPage();
166 
167  // Update current_page_ to a new page, reusing pages allocated if possible.
168  void NewPage();
169 
170  // Writes out the dictionary encoded data buffered in dict_encoder_.
171  void WriteDictDataPage();
172 
173  struct DataPage {
174  // Page header. This is a union of all page types.
175  PageHeader header;
176 
177  // Number of bytes needed to store definition levels.
178  int num_def_bytes;
179 
180  // This is the payload for the data page. This includes the definition/repetition
181  // levels data and the encoded values. If compression is enabled, this is the
182  // compressed data.
183  uint8_t* data;
184 
185  // If true, this data page has been finalized. All sizes are computed, header is
186  // fully populated and any compression is done.
187  bool finalized;
188 
189  // Number of non-null values
190  int num_non_null;
191  };
192 
193  HdfsParquetTableWriter* parent_;
194  ExprContext* expr_ctx_;
195 
196  THdfsCompression::type codec_;
197 
198  // Compression codec for this column. If NULL, this column will not be compressed.
199  scoped_ptr<Codec> compressor_;
200 
201  vector<DataPage> pages_;
202 
203  // Number of pages in 'pages_' that are used. 'pages_' is reused between flushes
204  // so this number can be less than pages_.size().
205  int num_data_pages_;
206 
207  // Size of newly created pages. Defaults to DEFAULT_DATA_PAGE_SIZE and is increased
208  // when pages are not big enough. This only happens when there are enough unique values
209  // such that we switch from PLAIN_DICTIONARY to PLAIN encoding and then have very
210  // large values (i.e. greater than DEFAULT_DATA_PAGE_SIZE).
211  // TODO: Consider removing and only creating a single large page as necessary.
212  int64_t page_size_;
213 
214  DataPage* current_page_;
215  int64_t num_values_; // Total number of values across all pages, including NULLs.
216  int64_t total_compressed_byte_size_;
217  int64_t total_uncompressed_byte_size_;
218  Encoding::type current_encoding_;
219 
220  // Created and set by the subclass.
221  DictEncoderBase* dict_encoder_base_;
222 
223  // Rle encoder object for storing definition levels. For non-nested schemas,
224  // this always uses 1 bit per row.
225  // This is reused across pages since the underlying buffer is copied out when
226  // the page is finalized.
227  RleEncoder* def_levels_;
228 
229  // Data for buffered values. This is reused across pages.
230  uint8_t* values_buffer_;
231  // The size of values_buffer_.
232  int values_buffer_len_;
233 };
234 
235 // Per type column writer.
236 template<typename T>
237 class HdfsParquetTableWriter::ColumnWriter :
238     public HdfsParquetTableWriter::BaseColumnWriter {
239  public:
240  ColumnWriter(HdfsParquetTableWriter* parent, ExprContext* ctx,
241      const THdfsCompression::type& codec) : BaseColumnWriter(parent, ctx, codec),
242  num_values_since_dict_size_check_(0) {
243  DCHECK_NE(ctx->root()->type().type, TYPE_BOOLEAN);
244  encoded_value_size_ = ParquetPlainEncoder::ByteSize(ctx->root()->type());
245  }
246 
247  virtual void Reset() {
248  BaseColumnWriter::Reset();
249  // Default to dictionary encoding. If the cardinality ends up being too high,
250  // it will fall back to plain.
251  current_encoding_ = Encoding::PLAIN_DICTIONARY;
252  dict_encoder_.reset(
253  new DictEncoder<T>(parent_->per_file_mem_pool_.get(), encoded_value_size_));
254  dict_encoder_base_ = dict_encoder_.get();
255  }
256 
257  protected:
258  virtual bool EncodeValue(void* value, int64_t* bytes_needed) {
259  if (current_encoding_ == Encoding::PLAIN_DICTIONARY) {
260  if (UNLIKELY(num_values_since_dict_size_check_ >=
261  DICTIONARY_DATA_PAGE_SIZE_CHECK_PERIOD)) {
262  num_values_since_dict_size_check_ = 0;
263  if (dict_encoder_->EstimatedDataEncodedSize() >= page_size_) return false;
264  }
265  ++num_values_since_dict_size_check_;
266  *bytes_needed = dict_encoder_->Put(*CastValue(value));
267  // If the dictionary contains the maximum number of values, switch to plain
268  // encoding. The current dictionary encoded page is written out.
269  if (UNLIKELY(*bytes_needed < 0)) {
270  FinalizeCurrentPage();
271  current_encoding_ = Encoding::PLAIN;
272  return false;
273  }
274  parent_->file_size_estimate_ += *bytes_needed;
275  } else if (current_encoding_ == Encoding::PLAIN) {
276  T* v = CastValue(value);
277  *bytes_needed = encoded_value_size_ < 0 ?
278  ParquetPlainEncoder::ByteSize<T>(*v) : encoded_value_size_;
279  if (current_page_->header.uncompressed_page_size + *bytes_needed > page_size_) {
280  return false;
281  }
282  uint8_t* dst_ptr = values_buffer_ + current_page_->header.uncompressed_page_size;
283  int64_t written_len =
284  ParquetPlainEncoder::Encode(dst_ptr, encoded_value_size_, *v);
285  DCHECK_EQ(*bytes_needed, written_len);
286  current_page_->header.uncompressed_page_size += written_len;
287  } else {
288  // TODO: support other encodings here
289  DCHECK(false);
290  }
291  return true;
292  }
293 
294  private:
295  // The period, in # of rows, to check the estimated dictionary page size against
296  // the data page size. We want to start a new data page when the estimated size
297  // is at least that big. The estimated size computation is not very cheap and
298  // we can tolerate going over the data page size by some amount.
299  // The expected byte size per dictionary value is < 1B and at most 2 bytes so the
300  // error is pretty low.
301  // TODO: is there a better way?
302  static const int DICTIONARY_DATA_PAGE_SIZE_CHECK_PERIOD = 100;
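 // Illustrative bound (added, not in the original source): with a check period of 100
 // values and at most ~2 bytes of dictionary-encoded data per value, the estimate can
 // drift past page_size_ by only about 200 bytes between checks.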
303 
304  // Encoder for dictionary encoding for different columns. Only one is set.
305  scoped_ptr<DictEncoder<T> > dict_encoder_;
306 
307  // The number of values added since we last checked the dictionary.
308  int num_values_since_dict_size_check_;
309 
310  // Size of each encoded value. -1 if the type is variable-length.
311  int64_t encoded_value_size_;
312 
313  // Temporary string value to hold CHAR(N)
314  StringValue temp_;
315 
316  // Converts a slot pointer to a raw value suitable for encoding
317  inline T* CastValue(void* value) {
318  return reinterpret_cast<T*>(value);
319  }
320 };
321 
322 template<>
323 inline StringValue* HdfsParquetTableWriter::ColumnWriter<StringValue>::CastValue(
324     void* value) {
325  if (type().type == TYPE_CHAR) {
326  temp_.ptr = StringValue::CharSlotToPtr(value, type());
327  temp_.len = StringValue::UnpaddedCharLength(temp_.ptr, type().len);
328  return &temp_;
329  }
330  return reinterpret_cast<StringValue*>(value);
331 }
332 
333 // Bools are encoded a bit differently so subclass it explicitly.
334 class HdfsParquetTableWriter::BoolColumnWriter :
335     public HdfsParquetTableWriter::BaseColumnWriter {
336  public:
337  BoolColumnWriter(HdfsParquetTableWriter* parent, ExprContext* ctx,
338      const THdfsCompression::type& codec) : BaseColumnWriter(parent, ctx, codec) {
339  DCHECK_EQ(ctx->root()->type().type, TYPE_BOOLEAN);
340  bool_values_ = parent_->state_->obj_pool()->Add(
341  new BitWriter(values_buffer_, values_buffer_len_));
342  // Dictionary encoding doesn't make sense for bools and is not allowed by
343  // the format.
344  current_encoding_ = Encoding::PLAIN;
345  dict_encoder_base_ = NULL;
346  }
347 
348  protected:
349  virtual bool EncodeValue(void* value, int64_t* bytes_needed) {
350  return bool_values_->PutValue(*reinterpret_cast<bool*>(value), 1);
351  }
352 
353  virtual void FinalizeCurrentPage() {
354  DCHECK(current_page_ != NULL);
355  if (current_page_->finalized) return;
356  bool_values_->Flush();
357  int num_bytes = bool_values_->bytes_written();
358  current_page_->header.uncompressed_page_size += num_bytes;
359  // Call into superclass to handle the rest.
360  BaseColumnWriter::FinalizeCurrentPage();
361  bool_values_->Clear();
362  }
363 
364  private:
365  // Used to encode bools as single bit values. This is reused across pages.
366  BitWriter* bool_values_;
367 };
368 
369 }
370 
371 inline Status HdfsParquetTableWriter::BaseColumnWriter::AppendRow(TupleRow* row) {
372  ++num_values_;
373  void* value = expr_ctx_->GetValue(row);
374  if (current_page_ == NULL) NewPage();
375 
376  // We might need to try again if the current page is not big enough.
377  while (true) {
378  if (!def_levels_->Put(value != NULL)) {
379  FinalizeCurrentPage();
380  NewPage();
381  bool ret = def_levels_->Put(value != NULL);
382  DCHECK(ret);
383  }
384 
385  // Nulls don't get encoded.
386  if (value == NULL) break;
387  ++current_page_->num_non_null;
388 
389  int64_t bytes_needed = 0;
390  if (EncodeValue(value, &bytes_needed)) break;
391 
392  // Value didn't fit on page, try again on a new page.
393  FinalizeCurrentPage();
394 
395  // Check how much space is needed to write this value. If that is larger than the
396  // page size then increase page size and try again.
397  if (UNLIKELY(bytes_needed > page_size_)) {
398  page_size_ = bytes_needed;
399  if (page_size_ > MAX_DATA_PAGE_SIZE) {
400  stringstream ss;
401  ss << "Cannot write value of size "
402  << PrettyPrinter::Print(bytes_needed, TUnit::BYTES) << " bytes to a Parquet "
403  << "data page that exceeds the max page limit "
404  << PrettyPrinter::Print(MAX_DATA_PAGE_SIZE, TUnit::BYTES) << ".";
405  return Status(ss.str());
406  }
407  values_buffer_len_ = page_size_;
408  values_buffer_ = parent_->reusable_col_mem_pool_->Allocate(values_buffer_len_);
409  }
410  NewPage();
411  }
412  ++current_page_->header.data_page_header.num_values;
413  return Status::OK;
414 }
415 
416 inline void HdfsParquetTableWriter::BaseColumnWriter::WriteDictDataPage() {
417  DCHECK(dict_encoder_base_ != NULL);
418  DCHECK_EQ(current_page_->header.uncompressed_page_size, 0);
419  if (current_page_->num_non_null == 0) return;
420  int len = dict_encoder_base_->WriteData(values_buffer_, values_buffer_len_);
421  while (UNLIKELY(len < 0)) {
422  // len < 0 indicates the data doesn't fit into a data page. Allocate a larger data
423  // page.
424  values_buffer_len_ *= 2;
425  values_buffer_ = parent_->reusable_col_mem_pool_->Allocate(values_buffer_len_);
426  len = dict_encoder_base_->WriteData(values_buffer_, values_buffer_len_);
427  }
428  dict_encoder_base_->ClearIndices();
429  current_page_->header.uncompressed_page_size = len;
430 }
431 
432 Status HdfsParquetTableWriter::BaseColumnWriter::Flush(int64_t* file_pos,
433     int64_t* first_data_page, int64_t* first_dictionary_page) {
434  if (current_page_ == NULL) {
435  // This column/file is empty
436  *first_data_page = *file_pos;
437  *first_dictionary_page = -1;
438  return Status::OK;
439  }
440 
441  FinalizeCurrentPage();
442 
443  *first_dictionary_page = -1;
444  // First write the dictionary page before any of the data pages.
445  if (dict_encoder_base_ != NULL) {
446  *first_dictionary_page = *file_pos;
447  // Write dictionary page header
448  DictionaryPageHeader dict_header;
449  dict_header.num_values = dict_encoder_base_->num_entries();
450  dict_header.encoding = Encoding::PLAIN_DICTIONARY;
451 
452  PageHeader header;
453  header.type = PageType::DICTIONARY_PAGE;
454  header.uncompressed_page_size = dict_encoder_base_->dict_encoded_size();
455  header.__set_dictionary_page_header(dict_header);
456 
457  // Write the dictionary page data, compressing it if necessary.
458  uint8_t* dict_buffer = parent_->per_file_mem_pool_->Allocate(
459  header.uncompressed_page_size);
460  dict_encoder_base_->WriteDict(dict_buffer);
461  if (compressor_.get() != NULL) {
462  SCOPED_TIMER(parent_->parent_->compress_timer());
463  int64_t max_compressed_size =
464  compressor_->MaxOutputLen(header.uncompressed_page_size);
465  DCHECK_GT(max_compressed_size, 0);
466  uint8_t* compressed_data =
467  parent_->per_file_mem_pool_->Allocate(max_compressed_size);
468  header.compressed_page_size = max_compressed_size;
469  compressor_->ProcessBlock32(true, header.uncompressed_page_size, dict_buffer,
470  &header.compressed_page_size, &compressed_data);
471  dict_buffer = compressed_data;
472  // We allocated the output based on the guessed size, return the extra allocated
473  // bytes back to the mem pool.
474  parent_->per_file_mem_pool_->ReturnPartialAllocation(
475  max_compressed_size - header.compressed_page_size);
476  } else {
477  header.compressed_page_size = header.uncompressed_page_size;
478  }
479 
480  uint8_t* header_buffer;
481  uint32_t header_len;
482  RETURN_IF_ERROR(parent_->thrift_serializer_->Serialize(
483  &header, &header_len, &header_buffer));
484  RETURN_IF_ERROR(parent_->Write(header_buffer, header_len));
485  *file_pos += header_len;
486  total_compressed_byte_size_ += header_len;
487  total_uncompressed_byte_size_ += header_len;
488 
489  RETURN_IF_ERROR(parent_->Write(dict_buffer, header.compressed_page_size));
490  *file_pos += header.compressed_page_size;
491  total_compressed_byte_size_ += header.compressed_page_size;
492  total_uncompressed_byte_size_ += header.uncompressed_page_size;
493  }
494 
495  *first_data_page = *file_pos;
496  // Write data pages
497  for (int i = 0; i < num_data_pages_; ++i) {
498  DataPage& page = pages_[i];
499 
500  // Last page might be empty
501  if (page.header.data_page_header.num_values == 0) {
502  DCHECK_EQ(page.header.compressed_page_size, 0);
503  DCHECK_EQ(i, num_data_pages_ - 1);
504  continue;
505  }
506 
507  // Write data page header
508  uint8_t* buffer = NULL;
509  uint32_t len = 0;
510  RETURN_IF_ERROR(
511      parent_->thrift_serializer_->Serialize(&page.header, &len, &buffer));
512  RETURN_IF_ERROR(parent_->Write(buffer, len));
513  *file_pos += len;
514 
515  // Write the page data
516  RETURN_IF_ERROR(parent_->Write(page.data, page.header.compressed_page_size));
517  *file_pos += page.header.compressed_page_size;
518  }
519  return Status::OK;
520 }
521 
522 void HdfsParquetTableWriter::BaseColumnWriter::FinalizeCurrentPage() {
523  DCHECK(current_page_ != NULL);
524  if (current_page_->finalized) return;
525 
526  // If the entire page was NULL, encode it as PLAIN since there is no
527  // data anyway. We don't output a useless dictionary page and it works
528  // around a parquet MR bug (see IMPALA-759 for more details).
529  if (current_page_->num_non_null == 0) current_encoding_ = Encoding::PLAIN;
530 
531  if (current_encoding_ == Encoding::PLAIN_DICTIONARY) WriteDictDataPage();
532 
533  PageHeader& header = current_page_->header;
534  header.data_page_header.encoding = current_encoding_;
535 
536  // Compute size of definition bits
537  def_levels_->Flush();
538  current_page_->num_def_bytes = sizeof(int32_t) + def_levels_->len();
539  header.uncompressed_page_size += current_page_->num_def_bytes;
540 
541  // At this point we know all the data for the data page. Combine them into one buffer.
542  uint8_t* uncompressed_data = NULL;
543  if (compressor_.get() == NULL) {
544  uncompressed_data =
545  parent_->per_file_mem_pool_->Allocate(header.uncompressed_page_size);
546  } else {
547  // We have compression. Combine into the staging buffer.
548  parent_->compression_staging_buffer_.resize(
549  header.uncompressed_page_size);
550  uncompressed_data = &parent_->compression_staging_buffer_[0];
551  }
552 
553  BufferBuilder buffer(uncompressed_data, header.uncompressed_page_size);
554 
555  // Copy the definition (null) data
556  int num_def_level_bytes = def_levels_->len();
557 
558  buffer.Append(num_def_level_bytes);
559  buffer.Append(def_levels_->buffer(), num_def_level_bytes);
560  // TODO: copy repetition data when we support nested types.
561  buffer.Append(values_buffer_, buffer.capacity() - buffer.size());
562 
563  // Apply compression if necessary
564  if (compressor_.get() == NULL) {
565  current_page_->data = reinterpret_cast<uint8_t*>(uncompressed_data);
566  header.compressed_page_size = header.uncompressed_page_size;
567  } else {
568  SCOPED_TIMER(parent_->parent_->compress_timer());
569  int64_t max_compressed_size =
570  compressor_->MaxOutputLen(header.uncompressed_page_size);
571  DCHECK_GT(max_compressed_size, 0);
572  uint8_t* compressed_data = parent_->per_file_mem_pool_->Allocate(max_compressed_size);
573  header.compressed_page_size = max_compressed_size;
574  compressor_->ProcessBlock32(true, header.uncompressed_page_size, uncompressed_data,
575  &header.compressed_page_size, &compressed_data);
576  current_page_->data = compressed_data;
577 
578  // We allocated the output based on the guessed size, return the extra allocated
579  // bytes back to the mem pool.
580  parent_->per_file_mem_pool_->ReturnPartialAllocation(
581  max_compressed_size - header.compressed_page_size);
582  }
583 
584  // Add the size of the data page header
585  uint8_t* header_buffer;
586  uint32_t header_len = 0;
587  parent_->thrift_serializer_->Serialize(
588  &current_page_->header, &header_len, &header_buffer);
589 
590  current_page_->finalized = true;
591  total_compressed_byte_size_ += header_len + header.compressed_page_size;
592  total_uncompressed_byte_size_ += header_len + header.uncompressed_page_size;
593  parent_->file_size_estimate_ += header_len + header.compressed_page_size;
594  def_levels_->Clear();
595 }
596 
597 void HdfsParquetTableWriter::BaseColumnWriter::NewPage() {
598  if (num_data_pages_ < pages_.size()) {
599  // Reuse an existing page
600  current_page_ = &pages_[num_data_pages_++];
601  current_page_->header.data_page_header.num_values = 0;
602  current_page_->header.compressed_page_size = 0;
603  current_page_->header.uncompressed_page_size = 0;
604  } else {
605  pages_.push_back(DataPage());
606  current_page_ = &pages_[num_data_pages_++];
607 
608  DataPageHeader header;
609  header.num_values = 0;
610  header.definition_level_encoding = Encoding::RLE;
611  header.repetition_level_encoding = Encoding::BIT_PACKED;
612  current_page_->header.__set_data_page_header(header);
613  }
614  current_page_->finalized = false;
615  current_page_->num_non_null = 0;
616 }
617 
618 HdfsParquetTableWriter::HdfsParquetTableWriter(HdfsTableSink* parent, RuntimeState* state,
619     OutputPartition* output, const HdfsPartitionDescriptor* part_desc,
620  const HdfsTableDescriptor* table_desc, const vector<ExprContext*>& output_expr_ctxs)
621  : HdfsTableWriter(
622  parent, state, output, part_desc, table_desc, output_expr_ctxs),
623  thrift_serializer_(new ThriftSerializer(true)),
624  current_row_group_(NULL),
625  row_count_(0),
626  file_size_limit_(0),
627  reusable_col_mem_pool_(new MemPool(parent_->mem_tracker())),
628  per_file_mem_pool_(new MemPool(parent_->mem_tracker())),
629  row_idx_(0) {
630 }
631 
632 HdfsParquetTableWriter::~HdfsParquetTableWriter() {
633 }
634 
635 Status HdfsParquetTableWriter::Init() {
636  // Initialize file metadata
637  file_metadata_.version = PARQUET_CURRENT_VERSION;
638 
639  stringstream created_by;
640  created_by << "impala version " << IMPALA_BUILD_VERSION
641  << " (build " << IMPALA_BUILD_HASH << ")";
642  file_metadata_.__set_created_by(created_by.str());
643 
644  // Default to snappy compressed
645  THdfsCompression::type codec = THdfsCompression::SNAPPY;
646 
647  const TQueryOptions& query_options = state_->query_options();
648  if (query_options.__isset.compression_codec) {
649  codec = query_options.compression_codec;
650  }
651  if (!(codec == THdfsCompression::NONE ||
652  codec == THdfsCompression::GZIP ||
653  codec == THdfsCompression::SNAPPY)) {
654  stringstream ss;
655  ss << "Invalid parquet compression codec " << Codec::GetCodecName(codec);
656  return Status(ss.str());
657  }
658 
659  VLOG_FILE << "Using compression codec: " << codec;
660 
660 
661  columns_.resize(table_desc_->num_cols() - table_desc_->num_clustering_cols());
662  // Initialize each column structure.
663  for (int i = 0; i < columns_.size(); ++i) {
664  BaseColumnWriter* writer = NULL;
665  const ColumnType& type = output_expr_ctxs_[i]->root()->type();
666  switch (type.type) {
667  case TYPE_BOOLEAN:
668  writer = new BoolColumnWriter(
669  this, output_expr_ctxs_[i], codec);
670  break;
671  case TYPE_TINYINT:
672  writer = new ColumnWriter<int8_t>(
673  this, output_expr_ctxs_[i], codec);
674  break;
675  case TYPE_SMALLINT:
676  writer = new ColumnWriter<int16_t>(
677  this, output_expr_ctxs_[i], codec);
678  break;
679  case TYPE_INT:
680  writer = new ColumnWriter<int32_t>(
681  this, output_expr_ctxs_[i], codec);
682  break;
683  case TYPE_BIGINT:
684  writer = new ColumnWriter<int64_t>(
685  this, output_expr_ctxs_[i], codec);
686  break;
687  case TYPE_FLOAT:
688  writer = new ColumnWriter<float>(
689  this, output_expr_ctxs_[i], codec);
690  break;
691  case TYPE_DOUBLE:
692  writer = new ColumnWriter<double>(
693  this, output_expr_ctxs_[i], codec);
694  break;
695  case TYPE_TIMESTAMP:
696  writer = new ColumnWriter<TimestampValue>(
697  this, output_expr_ctxs_[i], codec);
698  break;
699  case TYPE_VARCHAR:
700  case TYPE_STRING:
701  case TYPE_CHAR:
702  writer = new ColumnWriter<StringValue>(
703  this, output_expr_ctxs_[i], codec);
704  break;
705  case TYPE_DECIMAL:
706  switch (output_expr_ctxs_[i]->root()->type().GetByteSize()) {
707  case 4:
708  writer = new ColumnWriter<Decimal4Value>(
709  this, output_expr_ctxs_[i], codec);
710  break;
711  case 8:
712  writer = new ColumnWriter<Decimal8Value>(
713  this, output_expr_ctxs_[i], codec);
714  break;
715  case 16:
716  writer = new ColumnWriter<Decimal16Value>(
717  this, output_expr_ctxs_[i], codec);
718  break;
719  default:
720  DCHECK(false);
721  }
722  break;
723  default:
724  DCHECK(false);
725  }
726  columns_[i] = state_->obj_pool()->Add(writer);
727  columns_[i]->Reset();
728  }
730  return Status::OK;
731 }
732 
733 Status HdfsParquetTableWriter::CreateSchema() {
734  int num_clustering_cols = table_desc_->num_clustering_cols();
735 
736  // Create flattened tree with a single root.
737  file_metadata_.schema.resize(columns_.size() + 1);
738  file_metadata_.schema[0].__set_num_children(columns_.size());
739  file_metadata_.schema[0].name = "schema";
740 
741  for (int i = 0; i < columns_.size(); ++i) {
742  parquet::SchemaElement& node = file_metadata_.schema[i + 1];
743  node.name = table_desc_->col_names()[i + num_clustering_cols];
744  node.__set_type(IMPALA_TO_PARQUET_TYPES[output_expr_ctxs_[i]->root()->type().type]);
745  node.__set_repetition_type(FieldRepetitionType::OPTIONAL);
746  const ColumnType& type = output_expr_ctxs_[i]->root()->type();
747  if (type.type == TYPE_DECIMAL) {
748  // This column is type decimal. Update the file metadata to include the
749  // additional fields:
750  // 1) converted_type: indicate this is really a decimal column.
751  // 2) type_length: the number of bytes used per decimal value in the data
752  // 3) precision/scale
753  node.__set_converted_type(ConvertedType::DECIMAL);
754  node.__set_type_length(
755      ParquetPlainEncoder::DecimalSize(output_expr_ctxs_[i]->root()->type()));
756  node.__set_scale(output_expr_ctxs_[i]->root()->type().scale);
757  node.__set_precision(output_expr_ctxs_[i]->root()->type().precision);
758  } else if (type.type == TYPE_VARCHAR || type.type == TYPE_CHAR) {
759  node.__set_converted_type(ConvertedType::UTF8);
760  }
761  }
762 
763  return Status::OK;
764 }
765 
766 Status HdfsParquetTableWriter::AddRowGroup() {
767  if (current_row_group_ != NULL) RETURN_IF_ERROR(FlushCurrentRowGroup());
768  file_metadata_.row_groups.push_back(RowGroup());
769  current_row_group_ = &file_metadata_.row_groups[file_metadata_.row_groups.size() - 1];
770 
771  // Initialize new row group metadata.
772  int num_clustering_cols = table_desc_->num_clustering_cols();
773  current_row_group_->columns.resize(columns_.size());
774  for (int i = 0; i < columns_.size(); ++i) {
775  ColumnMetaData metadata;
776  metadata.type = IMPALA_TO_PARQUET_TYPES[columns_[i]->expr_ctx_->root()->type().type];
777  // Add all encodings that were used in this file. Currently we use PLAIN and
778  // PLAIN_DICTIONARY for data values and RLE for the definition levels.
779  metadata.encodings.push_back(Encoding::RLE);
780  // Columns are initially dictionary encoded
781  // TODO: we might not have PLAIN encoding in this case
782  metadata.encodings.push_back(Encoding::PLAIN_DICTIONARY);
783  metadata.encodings.push_back(Encoding::PLAIN);
784  metadata.path_in_schema.push_back(table_desc_->col_names()[i + num_clustering_cols]);
785  metadata.codec = columns_[i]->codec();
786  current_row_group_->columns[i].__set_meta_data(metadata);
787  }
788 
789  return Status::OK;
790 }
791 
792 int64_t HdfsParquetTableWriter::MinBlockSize() const {
793  // See file_size_limit_ calculation in InitNewFile().
794  return 3 * DEFAULT_DATA_PAGE_SIZE * columns_.size();
795 }
796 
797 uint64_t HdfsParquetTableWriter::default_block_size() const {
798  int64_t block_size;
799  if (state_->query_options().__isset.parquet_file_size &&
800  state_->query_options().parquet_file_size > 0) {
801  // If the user specified a value explicitly, use it. InitNewFile() will verify that
802  // the actual file's block size is sufficient.
803  block_size = state_->query_options().parquet_file_size;
804  } else {
805  block_size = HDFS_BLOCK_SIZE;
806  // Blocks are usually HDFS_BLOCK_SIZE bytes, unless there are many columns, in
807  // which case a per-column minimum kicks in.
808  block_size = max(block_size, MinBlockSize());
809  }
810  // HDFS does not like block sizes that are not aligned
811  return BitUtil::RoundUp(block_size, HDFS_BLOCK_ALIGNMENT);
812 }
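// Worked example (added; uses the 8 KB data page figure quoted in the comment inside
// InitNewFile() below): for a 50-column table, MinBlockSize() is 3 * 8 KB * 50 = 1.2 MB,
// so a smaller explicit PARQUET_FILE_SIZE is rejected by InitNewFile(), and the value
// returned by default_block_size() is rounded up to HDFS_BLOCK_ALIGNMENT.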
813 
814 Status HdfsParquetTableWriter::InitNewFile() {
815  DCHECK(current_row_group_ == NULL);
816 
817  per_file_mem_pool_->Clear();
818 
819  // Get the file limit
820  RETURN_IF_ERROR(HdfsTableSink::GetFileBlockSize(output_, &file_size_limit_));
821  if (file_size_limit_ < HDFS_MIN_FILE_SIZE) {
822  stringstream ss;
823  ss << "Hdfs file size (" << file_size_limit_ << ") is too small.";
824  return Status(ss.str());
825  }
826 
827  // We want to output HDFS files that are no more than file_size_limit_. If we
828  // go over the limit, HDFS will split the file into multiple blocks which
829  // is undesirable. If we are under the limit, we potentially end up with more
830  // files than necessary. Either way, it is not going to generate an invalid
831  // file.
832  // With arbitrary encoding schemes, it is not possible to know if appending
833  // a new row will push us over the limit until after encoding it. Rolling back
834  // a row can be tricky as well so instead we will stop the file when it is
835  // 2 * DEFAULT_DATA_PAGE_SIZE * num_cols short of the limit. e.g. 50 cols with 8K data
836  // pages means we stop 800KB shy of the limit.
837  // Data pages calculate their size precisely when they are complete so having
838  // a two page buffer guarantees we will never go over (unless there are huge values
839  // that require increasing the page size).
840  // TODO: this should be made dynamic based on the size of rows seen so far.
841  // This would for example, let us account for very long string columns.
842  if (file_size_limit_ < MinBlockSize()) {
843  stringstream ss;
844  ss << "Parquet file size " << file_size_limit_ << " bytes is too small for "
845  << "a table with " << columns_.size() << " columns. Set query option "
846  << "PARQUET_FILE_SIZE to at least " << MinBlockSize() << ".";
847  return Status(ss.str());
848  }
850  DCHECK_GE(file_size_limit_, DEFAULT_DATA_PAGE_SIZE * columns_.size());
851  file_pos_ = 0;
852  row_count_ = 0;
853  file_size_estimate_ = 0;
854 
855  file_metadata_.row_groups.clear();
856  RETURN_IF_ERROR(WriteFileHeader());
857  RETURN_IF_ERROR(AddRowGroup());
858 
859  return Status::OK;
860 }
861 
862 Status HdfsParquetTableWriter::AppendRowBatch(RowBatch* batch,
863     const vector<int32_t>& row_group_indices, bool* new_file) {
864  SCOPED_TIMER(parent_->encode_timer());
865  *new_file = false;
866  int limit;
867  if (row_group_indices.empty()) {
868  limit = batch->num_rows();
869  } else {
870  limit = row_group_indices.size();
871  }
872 
873  bool all_rows = row_group_indices.empty();
874  for (; row_idx_ < limit;) {
875  TupleRow* current_row = all_rows ?
876  batch->GetRow(row_idx_) : batch->GetRow(row_group_indices[row_idx_]);
877  for (int j = 0; j < columns_.size(); ++j) {
878  RETURN_IF_ERROR(columns_[j]->AppendRow(current_row));
879  }
880  ++row_idx_;
881  ++row_count_;
882  ++output_->num_rows;
883 
884  if (file_size_estimate_ > file_size_limit_) {
885  // This file is full. We need a new file.
886  *new_file = true;
887  return Status::OK;
888  }
889  }
890 
891  // Reset the row_idx_ when we exhaust the batch. We can exit before exhausting
892  // the batch if we run out of file space and will continue from the last index.
893  row_idx_ = 0;
894  return Status::OK;
895 }
896 
897 Status HdfsParquetTableWriter::Finalize() {
899 
900  // At this point we write out the rest of the file. We first update the file
901  // metadata, now that all the values have been seen.
902  file_metadata_.num_rows = row_count_;
903  RETURN_IF_ERROR(FlushCurrentRowGroup());
904  RETURN_IF_ERROR(WriteFileFooter());
905  stats_.__set_parquet_stats(parquet_stats_);
906  COUNTER_ADD(parent_->rows_inserted_counter(), row_count_);
907  return Status::OK;
908 }
909 
910 void HdfsParquetTableWriter::Close() {
911  // Release all accumulated memory
912  for (int i = 0; i < columns_.size(); ++i) {
913  columns_[i]->Close();
914  }
915  reusable_col_mem_pool_->FreeAll();
916  per_file_mem_pool_->FreeAll();
918 }
919 
920 Status HdfsParquetTableWriter::WriteFileHeader() {
921  DCHECK_EQ(file_pos_, 0);
922  RETURN_IF_ERROR(Write(PARQUET_VERSION_NUMBER, sizeof(PARQUET_VERSION_NUMBER)));
923  file_pos_ += sizeof(PARQUET_VERSION_NUMBER);
924  file_size_estimate_ += sizeof(PARQUET_VERSION_NUMBER);
925  return Status::OK;
926 }
927 
928 Status HdfsParquetTableWriter::FlushCurrentRowGroup() {
929  if (current_row_group_ == NULL) return Status::OK;
930 
931  int num_clustering_cols = table_desc_->num_clustering_cols();
932  for (int i = 0; i < columns_.size(); ++i) {
933  int64_t data_page_offset, dict_page_offset;
934  // Flush this column. This updates the final metadata sizes for this column.
935  RETURN_IF_ERROR(columns_[i]->Flush(&file_pos_, &data_page_offset, &dict_page_offset));
936  DCHECK_GT(data_page_offset, 0);
937 
938  current_row_group_->columns[i].meta_data.data_page_offset = data_page_offset;
939  if (dict_page_offset >= 0) {
940  current_row_group_->columns[i].meta_data.__set_dictionary_page_offset(
941  dict_page_offset);
942  }
943 
944  current_row_group_->columns[i].meta_data.num_values = columns_[i]->num_values();
945  current_row_group_->columns[i].meta_data.total_uncompressed_size =
946  columns_[i]->total_uncompressed_size();
947  current_row_group_->columns[i].meta_data.total_compressed_size =
948  columns_[i]->total_compressed_size();
949  current_row_group_->total_byte_size += columns_[i]->total_compressed_size();
950  current_row_group_->num_rows = columns_[i]->num_values();
951  current_row_group_->columns[i].file_offset = file_pos_;
952  const string& col_name = table_desc_->col_names()[i + num_clustering_cols];
953  parquet_stats_.per_column_size[col_name] += columns_[i]->total_compressed_size();
954 
955  // Since we don't support complex schemas, all columns should have the same
956  // number of values.
957  DCHECK_EQ(current_row_group_->columns[0].meta_data.num_values,
958  columns_[i]->num_values());
959 
960  // Metadata for this column is complete, write it out to file. The column metadata
961  // goes at the end so that when we have collocated files, the column data can be
962  // written without buffering.
963  uint8_t* buffer = NULL;
964  uint32_t len = 0;
965  RETURN_IF_ERROR(
966      thrift_serializer_->Serialize(&current_row_group_->columns[i], &len, &buffer));
967  RETURN_IF_ERROR(Write(buffer, len));
968  file_pos_ += len;
969 
970  columns_[i]->Reset();
971  }
972 
973  current_row_group_ = NULL;
974  return Status::OK;
975 }
976 
977 Status HdfsParquetTableWriter::WriteFileFooter() {
978  // Write file_meta_data
979  uint32_t file_metadata_len = 0;
980  uint8_t* buffer = NULL;
981  RETURN_IF_ERROR(
982      thrift_serializer_->Serialize(&file_metadata_, &file_metadata_len, &buffer));
983  RETURN_IF_ERROR(Write(buffer, file_metadata_len));
984 
985  // Write footer
986  RETURN_IF_ERROR(Write<uint32_t>(file_metadata_len));
987  RETURN_IF_ERROR(Write(PARQUET_VERSION_NUMBER, sizeof(PARQUET_VERSION_NUMBER)));
988  return Status::OK;
989 }
990 