hdfs-parquet-scanner.cc
1 // Copyright 2012 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
16 
17 #include <limits> // for std::numeric_limits
18 
19 #include <boost/algorithm/string.hpp>
20 #include <gflags/gflags.h>
21 #include <gutil/strings/substitute.h>
22 
23 #include "common/object-pool.h"
24 #include "common/logging.h"
25 #include "exec/hdfs-scan-node.h"
27 #include "exec/read-write-util.h"
28 #include "exprs/expr.h"
29 #include "runtime/descriptors.h"
30 #include "runtime/runtime-state.h"
31 #include "runtime/mem-pool.h"
32 #include "runtime/raw-value.h"
33 #include "runtime/row-batch.h"
34 #include "runtime/tuple-row.h"
35 #include "runtime/tuple.h"
36 #include "runtime/string-value.h"
37 #include "util/bitmap.h"
38 #include "util/bit-util.h"
39 #include "util/decompress.h"
40 #include "util/debug-util.h"
41 #include "util/error-util.h"
42 #include "util/dict-encoding.h"
43 #include "util/rle-encoding.h"
44 #include "util/runtime-profile.h"
45 #include "rpc/thrift-util.h"
46 
47 #include "common/names.h"
48 
49 using boost::algorithm::is_any_of;
50 using boost::algorithm::split;
51 using boost::algorithm::token_compress_on;
52 using namespace impala;
53 using namespace strings;
54 
55 // Provide a workaround for IMPALA-1658.
56 DEFINE_bool(convert_legacy_hive_parquet_utc_timestamps, false,
57  "When true, TIMESTAMPs read from files written by Parquet-MR (used by Hive) will "
58  "be converted from UTC to local time. Writes are unaffected.");
59 
60 // Max data page header size in bytes. This is an estimate and only needs to be an upper
61 // bound. It is theoretically possible to have a page header of any size due to string
62 // value statistics, but in practice we'll have trouble reading string values this large.
63 const int MAX_PAGE_HEADER_SIZE = 8 * 1024 * 1024;
64 
65 // Max dictionary page header size in bytes. This is an estimate and only needs to be an
66 // upper bound.
67 const int MAX_DICT_HEADER_SIZE = 100;
68 
69 #define LOG_OR_ABORT(error_msg, runtime_state) \
70  if (runtime_state->abort_on_error()) { \
71  return Status(error_msg); \
72  } else { \
73  runtime_state->LogError(error_msg); \
74  return Status::OK; \
75  }
76 
77 #define LOG_OR_RETURN_ON_ERROR(error_msg, runtime_state) \
78  if (runtime_state->abort_on_error()) { \
79  return Status(error_msg.msg()); \
80  } \
81  runtime_state->LogError(error_msg);
82 
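// ---------------------------------------------------------------------------
// Illustrative sketch (added commentary, not part of the original file): a
// minimal, self-contained model of what LOG_OR_RETURN_ON_ERROR does. If
// abort_on_error() is true, the enclosing function returns a non-OK status
// built from the message; otherwise the error is only logged and execution
// continues. MiniStatus and MiniState are hypothetical stand-ins, not Impala's
// real Status/RuntimeState classes.
#include <iostream>
#include <string>

struct MiniStatus {
  std::string msg;  // empty string means OK
  bool ok() const { return msg.empty(); }
};

struct MiniState {
  bool abort_on_error;
  void LogError(const std::string& m) { std::cerr << "WARNING: " << m << std::endl; }
};

// Behaves like a function body that expands LOG_OR_RETURN_ON_ERROR(msg, state).
static MiniStatus HandleParseError(MiniState* state, const std::string& msg) {
  if (state->abort_on_error) return MiniStatus{msg};  // abort: the query fails
  state->LogError(msg);                               // warn: the query continues
  return MiniStatus{};
}
// ---------------------------------------------------------------------------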
84  const std::vector<HdfsFileDesc*>& files) {
85  vector<DiskIoMgr::ScanRange*> footer_ranges;
86  for (int i = 0; i < files.size(); ++i) {
87  for (int j = 0; j < files[i]->splits.size(); ++j) {
88  DiskIoMgr::ScanRange* split = files[i]->splits[j];
89 
90  // Since Parquet scanners always read entire files, only read a file if we're
91  // assigned the first split to avoid reading multi-block files with multiple
92  // scanners.
93  // We only process the split that starts at offset 0.
94  if (split->offset() != 0) {
95  // We are expecting each file to be one hdfs block (so all the scan range offsets
 96  // should be 0). This is not an error, but we will issue a warning.
97  scan_node->runtime_state()->LogError(
98  ErrorMsg(TErrorCode::PARQUET_MULTIPLE_BLOCKS, files[i]->filename));
 99  // We assign the entire file to one scan range, so mark all splits other than
 100  // the first one as complete
101  scan_node->RangeComplete(THdfsFileFormat::PARQUET, THdfsCompression::NONE);
102  continue;
103  }
104 
105  // Compute the offset of the file footer
106  DCHECK_GT(files[i]->file_length, 0);
107  int64_t footer_size = min(static_cast<int64_t>(FOOTER_SIZE), files[i]->file_length);
108  int64_t footer_start = files[i]->file_length - footer_size;
109 
110  ScanRangeMetadata* metadata =
111  reinterpret_cast<ScanRangeMetadata*>(split->meta_data());
112  DiskIoMgr::ScanRange* footer_range = scan_node->AllocateScanRange(
113  files[i]->fs, files[i]->filename.c_str(), footer_size, footer_start,
114  metadata->partition_id, split->disk_id(), split->try_cache(),
115  split->expected_local(), files[i]->mtime);
116  footer_ranges.push_back(footer_range);
117  }
118  }
119  RETURN_IF_ERROR(scan_node->AddDiskIoRanges(footer_ranges));
120  return Status::OK;
121 }
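// ---------------------------------------------------------------------------
// Illustrative sketch (added commentary, not part of the original file): the
// arithmetic IssueInitialRanges() uses for the footer range. FOOTER_SIZE is a
// fixed guess declared in the scanner's header and is not visible in this
// listing, so it is passed in as a parameter here rather than assumed.
#include <algorithm>
#include <cstdint>

static void ExampleFooterRange(int64_t file_length, int64_t footer_size_guess,
    int64_t* footer_start, int64_t* footer_len) {
  // Read min(guess, file_length) bytes at the end of the file.
  *footer_len = std::min(footer_size_guess, file_length);
  *footer_start = file_length - *footer_len;
}
// e.g. with a 102,400-byte guess, a 50,000-byte file is read in full
// (start 0, len 50,000), while a 10 MB file contributes only its last
// 102,400 bytes to the footer range.
// ---------------------------------------------------------------------------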
122 
123 namespace impala {
124 
126  : HdfsScanner(scan_node, state),
127  metadata_range_(NULL),
128  dictionary_pool_(new MemPool(scan_node->mem_tracker())),
129  assemble_rows_timer_(scan_node_->materialize_tuple_timer()) {
130  assemble_rows_timer_.Stop();
131 }
132 
134 }
135 
136 // Reader for a single column from the parquet file. It's associated with a
137 // ScannerContext::Stream and is responsible for decoding the data.
138 // Super class for per-type column readers. This contains most of the logic;
139 // the type-specific functions must be implemented in the subclass.
141  public:
142  virtual ~BaseColumnReader() {}
143 
144  // This is called once for each row group in the file.
145  Status Reset(const parquet::ColumnMetaData* metadata, ScannerContext::Stream* stream) {
146  DCHECK_NOTNULL(stream);
147  DCHECK_NOTNULL(metadata);
148 
150  data_ = NULL;
151  stream_ = stream;
152  metadata_ = metadata;
153  dict_decoder_base_ = NULL;
154  num_values_read_ = 0;
155  if (metadata_->codec != parquet::CompressionCodec::UNCOMPRESSED) {
157  NULL, false, PARQUET_TO_IMPALA_CODEC[metadata_->codec], &decompressor_));
158  }
159  return Status::OK;
160  }
161 
162  // Called once when the scanner is complete for final cleanup.
163  void Close() {
164  if (decompressor_.get() != NULL) decompressor_->Close();
165  }
166 
167  int64_t total_len() const { return metadata_->total_compressed_size; }
168  const SlotDescriptor* slot_desc() const { return node_.slot_desc; }
169  const parquet::SchemaElement& schema_element() const { return *node_.element; }
170  int col_idx() const { return node_.col_idx; }
171  int max_def_level() const { return node_.max_def_level; }
172  THdfsCompression::type codec() const {
173  if (metadata_ == NULL) return THdfsCompression::NONE;
174  return PARQUET_TO_IMPALA_CODEC[metadata_->codec];
175  }
176 
177  // Read the next value into tuple for this column. Returns false if there are no
178  // more values in the file.
179  // *conjuncts_failed is an in/out parameter. If true, it means this row has already
180  // been filtered out (i.e. ReadValue() is really a SkipValue()); it should be set to
181  // true if ReadValue() itself can filter out this row.
182  // TODO: this is the function that needs to be codegen'd (e.g. CodegenReadValue())
183  // The codegened functions from all the materialized cols will then be combined
184  // into one function.
185  // TODO: another option is to materialize col by col for the entire row batch in
186  // one call. e.g. MaterializeCol would write out 1024 values. Our row batches
187  // are currently dense so we'll need to figure out something there.
188  bool ReadValue(MemPool* pool, Tuple* tuple, bool* conjuncts_failed);
189 
190  // TODO: Some encodings might benefit a lot from a SkipValues(int num_rows) if
191  // we know rows can be skipped. This could be very useful with stats, where big
192  // sections can be skipped. Implement that when we can benefit from it.
193 
194  protected:
195  friend class HdfsParquetScanner;
196 
199 
200  const parquet::ColumnMetaData* metadata_;
201  scoped_ptr<Codec> decompressor_;
203 
204  // Pool to allocate decompression buffers from.
205  boost::scoped_ptr<MemPool> decompressed_data_pool_;
206 
207  // Header for current data page.
208  parquet::PageHeader current_page_header_;
209 
210  // Num values remaining in the current data page
212 
213  // Pointer to start of next value in data page
214  uint8_t* data_;
215 
216  // Decoder for definition levels. Only one of these is valid at a time, depending on
217  // the data page metadata.
220 
221  // Decoder for dictionary-encoded columns. Set by the subclass.
223 
224  // The number of values seen so far. Updated per data page.
226 
227  // Cache of the bitmap_filter_ (if any) for this slot.
229  // Cache of hash_seed_ to use with bitmap_filter_.
230  uint32_t hash_seed_;
231 
232  // Bitmap filters are optional (i.e. they can be ignored and the results will be
233  // correct). Keep track of stats to determine if the filter is not effective. If
234  // the number of rows filtered out is too low, this is not worth the cost.
235  // TODO: this should be cost based taking into account how much we save when we
236  // filter a row.
237  int64_t rows_returned_;
239 
241  : parent_(parent),
242  node_(node),
243  metadata_(NULL),
244  stream_(NULL),
245  decompressed_data_pool_(new MemPool(parent->scan_node_->mem_tracker())),
247  num_values_read_(0) {
248  DCHECK_NOTNULL(node.slot_desc);
249  DCHECK_GE(node.col_idx, 0);
250  DCHECK_GE(node.max_def_level, 0);
251 
253  bitmap_filter_ = state->GetBitmapFilter(slot_desc()->id());
254  hash_seed_ = state->fragment_hash_seed();
255  rows_returned_ = 0;
257  }
258 
259  // Read the next data page. If a dictionary page is encountered, that will
260  // be read and this function will continue reading until it finds the next data page.
262 
263  // Returns the definition level for the next value.
264  // Returns -1 if there was an error parsing it.
265  int ReadDefinitionLevel();
266 
267  // Creates a dictionary decoder from values/size. Subclass must implement this
268  // and set dict_decoder_base_.
269  virtual void CreateDictionaryDecoder(uint8_t* values, int size) = 0;
270 
271  // Initializes the reader with the data contents. This is the content for
272  // the entire decompressed data page. Decoders can initialize state from
273  // here.
274  virtual Status InitDataPage(uint8_t* data, int size) = 0;
275 
276  // Writes the next value into *slot using pool if necessary.
277  // Returns false if there was an error.
278  // Subclass must implement this.
279  // TODO: we need to remove this with codegen.
280  virtual bool ReadSlot(void* slot, MemPool* pool, bool* conjuncts_failed) = 0;
281 };
282 
283 // Per column type reader.
284 template<typename T>
286  public:
288  : BaseColumnReader(parent, node) {
289  DCHECK_NE(slot_desc()->type().type, TYPE_BOOLEAN);
290  if (slot_desc()->type().type == TYPE_DECIMAL) {
292  } else if (slot_desc()->type().type == TYPE_VARCHAR) {
294  } else {
295  fixed_len_size_ = -1;
296  }
298  // TODO: Add logic to detect file versions that have unconverted TIMESTAMP
299  // values. Currently all versions have converted values.
300  (FLAGS_convert_legacy_hive_parquet_utc_timestamps &&
301  slot_desc()->type().type == TYPE_TIMESTAMP &&
302  parent->file_version_.application == "parquet-mr");
303  }
304 
305  protected:
306  virtual void CreateDictionaryDecoder(uint8_t* values, int size) {
307  dict_decoder_.reset(new DictDecoder<T>(values, size, fixed_len_size_));
309  }
310 
311  virtual Status InitDataPage(uint8_t* data, int size) {
312  if (current_page_header_.data_page_header.encoding ==
313  parquet::Encoding::PLAIN_DICTIONARY) {
314  if (dict_decoder_.get() == NULL) {
315  return Status("File corrupt. Missing dictionary page.");
316  }
317  dict_decoder_->SetData(data, size);
318  }
319 
320  // Check if we should disable the bitmap filter. We'll do this if the filter
321  // is not removing a lot of rows.
322  // TODO: how to pick the selectivity?
323  if (bitmap_filter_ != NULL && rows_returned_ > 10000 &&
325  bitmap_filter_ = NULL;
326  }
327  return Status::OK;
328  }
329 
330  virtual bool ReadSlot(void* slot, MemPool* pool, bool* conjuncts_failed) {
331  parquet::Encoding::type page_encoding =
332  current_page_header_.data_page_header.encoding;
333  bool result = true;
334  T val;
335  T* val_ptr = needs_conversion_ ? &val : reinterpret_cast<T*>(slot);
336  if (page_encoding == parquet::Encoding::PLAIN_DICTIONARY) {
337  result = dict_decoder_->GetValue(val_ptr);
338  } else {
339  DCHECK(page_encoding == parquet::Encoding::PLAIN);
340  data_ += ParquetPlainEncoder::Decode<T>(data_, fixed_len_size_, val_ptr);
341  }
342  if (needs_conversion_) ConvertSlot(&val, reinterpret_cast<T*>(slot), pool);
343  ++rows_returned_;
344  if (!*conjuncts_failed && bitmap_filter_ != NULL) {
345  uint32_t h = RawValue::GetHashValue(slot, slot_desc()->type(), hash_seed_);
346  *conjuncts_failed = !bitmap_filter_->Get<true>(h);
348  }
349  return result;
350  }
351 
352  private:
353  void CopySlot(T* slot, MemPool* pool) {
354  // no-op for non-string columns.
355  }
356 
357  // Converts and writes src into dst based on desc_->type()
358  void ConvertSlot(const T* src, T* dst, MemPool* pool) {
359  DCHECK(false);
360  }
361 
362  scoped_ptr<DictDecoder<T> > dict_decoder_;
363 
364  // True if decoded values must be converted before being written to an output tuple.
366 
367  // The size of this column with plain encoding for FIXED_LEN_BYTE_ARRAY, or
368  // the max length for VARCHAR columns. Unused otherwise.
370 };
371 
372 template<>
374  StringValue* slot, MemPool* pool) {
375  if (slot->len == 0) return;
376  uint8_t* buffer = pool->Allocate(slot->len);
377  memcpy(buffer, slot->ptr, slot->len);
378  slot->ptr = reinterpret_cast<char*>(buffer);
379 }
380 
381 template<>
383  const StringValue* src, StringValue* dst, MemPool* pool) {
384  DCHECK(slot_desc()->type().type == TYPE_CHAR);
385  int len = slot_desc()->type().len;
386  StringValue sv;
387  sv.len = len;
388  if (slot_desc()->type().IsVarLen()) {
389  sv.ptr = reinterpret_cast<char*>(pool->Allocate(len));
390  } else {
391  sv.ptr = reinterpret_cast<char*>(dst);
392  }
393  int unpadded_len = min(len, src->len);
394  memcpy(sv.ptr, src->ptr, unpadded_len);
395  StringValue::PadWithSpaces(sv.ptr, len, unpadded_len);
396 
397  if (slot_desc()->type().IsVarLen()) *dst = sv;
398 }
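// ---------------------------------------------------------------------------
// Illustrative sketch (added commentary, not part of the original file): the
// effect of the CHAR conversion above in isolation. At most 'len' source bytes
// are copied and the remainder is padded with spaces, so a CHAR(5) slot
// holding "abc" ends up as "abc  ". This helper mirrors the memcpy-plus-pad
// logic using std::string only; it is not the scanner's actual code path.
#include <algorithm>
#include <string>

static std::string PadToCharLength(const std::string& src, size_t len) {
  std::string out = src.substr(0, std::min(src.size(), len));  // truncate if longer
  out.resize(len, ' ');                                        // pad with spaces
  return out;  // PadToCharLength("abc", 5) == "abc  "
}
// ---------------------------------------------------------------------------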
399 
400 template<>
402  const TimestampValue* src, TimestampValue* dst, MemPool* pool) {
403  // Conversion should only happen when this flag is enabled.
404  DCHECK(FLAGS_convert_legacy_hive_parquet_utc_timestamps);
405  *dst = *src;
406  if (dst->HasDateAndTime()) dst->UtcToLocal();
407 }
408 
410  public:
412  : BaseColumnReader(parent, node) {
413  DCHECK_EQ(slot_desc()->type().type, TYPE_BOOLEAN);
414  }
415 
416  protected:
417  virtual void CreateDictionaryDecoder(uint8_t* values, int size) {
418  DCHECK(false) << "Dictionary encoding is not supported for bools. Should never "
419  << "have gotten this far.";
420  }
421 
422  virtual Status InitDataPage(uint8_t* data, int size) {
423  // Initialize bool decoder
424  bool_values_ = BitReader(data, size);
425  return Status::OK;
426  }
427 
428  virtual bool ReadSlot(void* slot, MemPool* pool, bool* conjuncts_failed) {
429  bool valid = bool_values_.GetValue(1, reinterpret_cast<bool*>(slot));
430  if (!valid) parent_->parse_status_ = Status("Invalid bool column.");
431  return valid;
432  }
433 
434  private:
436 };
437 
438 }
439 
440 Status HdfsParquetScanner::Prepare(ScannerContext* context) {
443  ADD_COUNTER(scan_node_->runtime_profile(), "NumColumns", TUnit::UNIT);
444 
446  return Status::OK;
447 }
448 
450  vector<THdfsCompression::type> compression_types;
451  for (int i = 0; i < column_readers_.size(); ++i) {
452  if (column_readers_[i]->decompressed_data_pool_.get() != NULL) {
453  // No need to commit the row batches with the AttachPool() calls
454  // since AddFinalRowBatch() already does that below.
455  AttachPool(column_readers_[i]->decompressed_data_pool_.get(), false);
456  }
457  column_readers_[i]->Close();
458  compression_types.push_back(column_readers_[i]->codec());
459  }
460  AttachPool(dictionary_pool_.get(), false);
462 
463  // If this was a metadata only read (i.e. count(*)), there are no columns.
464  if (compression_types.empty()) compression_types.push_back(THdfsCompression::NONE);
465  scan_node_->RangeComplete(THdfsFileFormat::PARQUET, compression_types);
466  assemble_rows_timer_.Stop();
467  assemble_rows_timer_.ReleaseCounter();
468 
470 }
471 
473  const SchemaNode& node) {
474  BaseColumnReader* reader = NULL;
475  switch (node.slot_desc->type().type) {
476  case TYPE_BOOLEAN:
477  reader = new BoolColumnReader(this, node);
478  break;
479  case TYPE_TINYINT:
480  reader = new ColumnReader<int8_t>(this, node);
481  break;
482  case TYPE_SMALLINT:
483  reader = new ColumnReader<int16_t>(this, node);
484  break;
485  case TYPE_INT:
486  reader = new ColumnReader<int32_t>(this, node);
487  break;
488  case TYPE_BIGINT:
489  reader = new ColumnReader<int64_t>(this, node);
490  break;
491  case TYPE_FLOAT:
492  reader = new ColumnReader<float>(this, node);
493  break;
494  case TYPE_DOUBLE:
495  reader = new ColumnReader<double>(this, node);
496  break;
497  case TYPE_TIMESTAMP:
498  reader = new ColumnReader<TimestampValue>(this, node);
499  break;
500  case TYPE_STRING:
501  case TYPE_VARCHAR:
502  case TYPE_CHAR:
503  reader = new ColumnReader<StringValue>(this, node);
504  break;
505  case TYPE_DECIMAL:
506  switch (node.slot_desc->type().GetByteSize()) {
507  case 4:
508  reader = new ColumnReader<Decimal4Value>(this, node);
509  break;
510  case 8:
511  reader = new ColumnReader<Decimal8Value>(this, node);
512  break;
513  case 16:
514  reader = new ColumnReader<Decimal16Value>(this, node);
515  break;
516  }
517  break;
518  default:
519  DCHECK(false);
520  }
521  return scan_node_->runtime_state()->obj_pool()->Add(reader);
522 }
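// ---------------------------------------------------------------------------
// Illustrative sketch (added commentary, not part of the original file): the
// decimal cases above dispatch on the slot's in-memory byte size. The mapping
// from precision to byte size lives in Impala's type code, not in this file;
// the thresholds below are stated as an assumption for the example only.
static int AssumedDecimalByteSize(int precision) {
  if (precision <= 9) return 4;    // would select ColumnReader<Decimal4Value>
  if (precision <= 18) return 8;   // would select ColumnReader<Decimal8Value>
  return 16;                       // would select ColumnReader<Decimal16Value>
}
// ---------------------------------------------------------------------------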
523 
524 // In 1.1, we had a bug where the dictionary page metadata was not set. Returns true
525 // if this matches those versions and compatibility workarounds need to be used.
528  if (v.application != "impala") return false;
529  return v.VersionEq(1,1,0) || (v.VersionEq(1,2,0) && v.is_impala_internal);
530 }
531 
533  Status status;
534  uint8_t* buffer;
535 
536  // We're about to move to the next data page. The previous data page is
537  // now complete, pass along the memory allocated for it.
539 
540  // Read the next data page, skipping page types we don't care about.
541  // We break out of this loop on the non-error case (a data page was found or we read all
542  // the pages).
543  while (true) {
544  DCHECK_EQ(num_buffered_values_, 0);
545  if (num_values_read_ >= metadata_->num_values) {
546  // No more pages to read
547  DCHECK_EQ(num_values_read_, metadata_->num_values);
548  break;
549  }
550 
551  int64_t buffer_size;
552  RETURN_IF_ERROR(stream_->GetBuffer(true, &buffer, &buffer_size));
553  if (buffer_size == 0) {
554  DCHECK(stream_->eosr());
555  ErrorMsg msg(TErrorCode::PARQUET_COLUMN_METADATA_INVALID,
556  metadata_->num_values, num_values_read_,
557  slot_desc()->col_pos() - parent_->scan_node_->num_partition_keys());
559  }
560 
561  // We don't know the actual header size until the thrift object is deserialized. Loop
562  // until we successfully deserialize the header or exceed the maximum header size.
563  uint32_t header_size;
564  while (true) {
565  header_size = buffer_size;
566  status = DeserializeThriftMsg(
567  buffer, &header_size, true, &current_page_header_);
568  if (status.ok()) break;
569 
570  if (buffer_size >= MAX_PAGE_HEADER_SIZE) {
571  stringstream ss;
572  ss << "ParquetScanner: could not read data page because page header exceeded "
573  << "maximum size of "
574  << PrettyPrinter::Print(MAX_PAGE_HEADER_SIZE, TUnit::BYTES);
575  status.AddDetail(ss.str());
576  return status;
577  }
578 
579  // Didn't read entire header, increase buffer size and try again
580  Status status;
581  int64_t new_buffer_size = max(buffer_size * 2, 1024L);
582  bool success = stream_->GetBytes(
583  new_buffer_size, &buffer, &new_buffer_size, &status, /* peek */ true);
584  if (!success) {
585  DCHECK(!status.ok());
586  return status;
587  }
588  DCHECK(status.ok());
589 
590  if (buffer_size == new_buffer_size) {
591  DCHECK_NE(new_buffer_size, 0);
592  ErrorMsg msg(TErrorCode::PARQUET_HEADER_EOF);
594  }
595  DCHECK_GT(new_buffer_size, buffer_size);
596  buffer_size = new_buffer_size;
597  }
598 
599  // Successfully deserialized current_page_header_
600  if (!stream_->SkipBytes(header_size, &status)) return status;
601 
602  int data_size = current_page_header_.compressed_page_size;
603  int uncompressed_size = current_page_header_.uncompressed_page_size;
604 
605  if (current_page_header_.type == parquet::PageType::DICTIONARY_PAGE) {
606  if (dict_decoder_base_ != NULL) {
607  return Status("Column chunk should not contain two dictionary pages.");
608  }
609  if (slot_desc()->type().type == TYPE_BOOLEAN) {
610  return Status("Unexpected dictionary page. Dictionary page is not"
611  " supported for booleans.");
612  }
613  const parquet::DictionaryPageHeader* dict_header = NULL;
614  if (current_page_header_.__isset.dictionary_page_header) {
615  dict_header = &current_page_header_.dictionary_page_header;
616  } else {
618  return Status("Dictionary page does not have dictionary header set.");
619  }
620  }
621  if (dict_header != NULL &&
622  dict_header->encoding != parquet::Encoding::PLAIN &&
623  dict_header->encoding != parquet::Encoding::PLAIN_DICTIONARY) {
624  return Status("Only PLAIN and PLAIN_DICTIONARY encodings are supported "
625  "for dictionary pages.");
626  }
627 
628  if (!stream_->ReadBytes(data_size, &data_, &status)) return status;
629 
630  uint8_t* dict_values = NULL;
631  if (decompressor_.get() != NULL) {
632  dict_values = parent_->dictionary_pool_->Allocate(uncompressed_size);
633  RETURN_IF_ERROR(decompressor_->ProcessBlock32(true, data_size, data_,
634  &uncompressed_size, &dict_values));
635  VLOG_FILE << "Decompressed " << data_size << " to " << uncompressed_size;
636  data_size = uncompressed_size;
637  } else {
638  DCHECK_EQ(data_size, current_page_header_.uncompressed_page_size);
639  // Copy dictionary from io buffer (which will be recycled as we read
640  // more data) to a new buffer
641  dict_values = parent_->dictionary_pool_->Allocate(data_size);
642  memcpy(dict_values, data_, data_size);
643  }
644 
645  CreateDictionaryDecoder(dict_values, data_size);
646  if (dict_header != NULL &&
647  dict_header->num_values != dict_decoder_base_->num_entries()) {
648  return Status(Substitute(
649  "Invalid dictionary. Expected $0 entries but data contained $1 entries",
650  dict_header->num_values, dict_decoder_base_->num_entries()));
651  }
652  // Done with dictionary page, read next page
653  continue;
654  }
655 
656  if (current_page_header_.type != parquet::PageType::DATA_PAGE) {
657  // We can safely skip non-data pages
658  if (!stream_->SkipBytes(data_size, &status)) return status;
659  continue;
660  }
661 
662  // Read Data Page
663  if (!stream_->ReadBytes(data_size, &data_, &status)) return status;
664  num_buffered_values_ = current_page_header_.data_page_header.num_values;
666 
667  if (decompressor_.get() != NULL) {
669  uint8_t* decompressed_buffer = decompressed_data_pool_->Allocate(uncompressed_size);
670  RETURN_IF_ERROR(decompressor_->ProcessBlock32(true,
671  current_page_header_.compressed_page_size, data_, &uncompressed_size,
672  &decompressed_buffer));
673  VLOG_FILE << "Decompressed " << current_page_header_.compressed_page_size
674  << " to " << uncompressed_size;
675  DCHECK_EQ(current_page_header_.uncompressed_page_size, uncompressed_size);
676  data_ = decompressed_buffer;
677  data_size = current_page_header_.uncompressed_page_size;
678  } else {
679  DCHECK_EQ(metadata_->codec, parquet::CompressionCodec::UNCOMPRESSED);
680  DCHECK_EQ(current_page_header_.compressed_page_size, uncompressed_size);
681  }
682 
683  if (max_def_level() > 0) {
684  // Initialize the definition level data
685  int32_t num_definition_bytes = 0;
686  switch (current_page_header_.data_page_header.definition_level_encoding) {
687  case parquet::Encoding::RLE: {
688  if (!ReadWriteUtil::Read(&data_, &data_size, &num_definition_bytes, &status)) {
689  return status;
690  }
691  int bit_width = BitUtil::Log2(max_def_level() + 1);
692  rle_def_levels_ = RleDecoder(data_, num_definition_bytes, bit_width);
693  break;
694  }
695  case parquet::Encoding::BIT_PACKED:
696  num_definition_bytes = BitUtil::Ceil(num_buffered_values_, 8);
697  bit_packed_def_levels_ = BitReader(data_, num_definition_bytes);
698  break;
699  default: {
700  stringstream ss;
701  ss << "Unsupported definition level encoding: "
702  << current_page_header_.data_page_header.definition_level_encoding;
703  return Status(ss.str());
704  }
705  }
706  DCHECK_GT(num_definition_bytes, 0);
707  data_ += num_definition_bytes;
708  data_size -= num_definition_bytes;
709  }
710 
711  // Data can be empty if the column contains all NULLs
712  if (data_size != 0) RETURN_IF_ERROR(InitDataPage(data_, data_size));
713  break;
714  }
715 
716  return Status::OK;
717 }
718 
719 // TODO More codegen here as well.
721  if (max_def_level() == 0) {
722  // This column and any containing structs are required so there is nothing encoded for
723  // the definition levels.
724  return 1;
725  }
726 
727  uint8_t definition_level;
728  bool valid = false;
729  switch (current_page_header_.data_page_header.definition_level_encoding) {
730  case parquet::Encoding::RLE:
731  valid = rle_def_levels_.Get(&definition_level);
732  break;
733  case parquet::Encoding::BIT_PACKED: {
734  valid = bit_packed_def_levels_.GetValue(1, &definition_level);
735  break;
736  }
737  default:
738  DCHECK(false);
739  }
740  if (!valid) return -1;
741  return definition_level;
742 }
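// ---------------------------------------------------------------------------
// Illustrative sketch (added commentary, not part of the original file): the
// definition-level bit width. ReadDataPage() sizes the RLE decoder with
// BitUtil::Log2(max_def_level() + 1), i.e. the number of bits needed to store
// the values 0..max_def_level. A standalone way to compute the same width:
static int DefLevelBitWidth(int max_def_level) {
  int bits = 0;
  while ((1 << bits) < max_def_level + 1) ++bits;
  return bits;  // max_def_level 1 -> 1 bit, 2..3 -> 2 bits, 4..7 -> 3 bits
}
// For a flat schema where every materialized column is a top-level OPTIONAL
// field, max_def_level is 1, so each definition level is a single bit
// (level 0 = NULL, level 1 = value present), which is what ReadValue() checks.
// ---------------------------------------------------------------------------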
743 
745  MemPool* pool, Tuple* tuple, bool* conjuncts_failed) {
746  if (num_buffered_values_ == 0) {
747  parent_->assemble_rows_timer_.Stop();
748  parent_->parse_status_ = ReadDataPage();
749  // We don't return Status objects as parameters because they are too
750  // expensive for per row/per col calls. If ReadDataPage failed,
751  // return false to indicate this column reader is done.
752  if (num_buffered_values_ == 0 || !parent_->parse_status_.ok()) return false;
753  parent_->assemble_rows_timer_.Start();
754  }
755 
756  --num_buffered_values_;
757  int definition_level = ReadDefinitionLevel();
758  if (definition_level < 0) return false;
759 
760  if (definition_level != max_def_level()) {
761  // Null value
762  DCHECK_LT(definition_level, max_def_level());
763  tuple->SetNull(slot_desc()->null_indicator_offset());
764  return true;
765  }
766  return ReadSlot(tuple->GetSlot(slot_desc()->tuple_offset()), pool, conjuncts_failed);
767 }
768 
770  // First process the file metadata in the footer
771  bool eosr;
773  if (eosr) return Status::OK;
774 
775  // We've processed the metadata and there are columns that need to be materialized.
777  COUNTER_SET(num_cols_counter_, static_cast<int64_t>(column_readers_.size()));
778 
779  // The scanner-wide stream was used only to read the file footer. Each column has added
780  // its own stream.
781  stream_ = NULL;
782 
783  // Iterate through each row group in the file and read all the materialized columns
784  // per row group. Row groups are independent, so this could be parallelized.
785  // However, having multiple row groups per file should be seen as an edge case and
786  // we can do better by parallelizing across files instead.
787  // TODO: not really an edge case since MR writes multiple row groups
788  for (int i = 0; i < file_metadata_.row_groups.size(); ++i) {
789  // Attach any resources and clear the streams before starting a new row group. These
790  // streams could either be just the footer stream or streams for the previous row
791  // group.
792  context_->ReleaseCompletedResources(batch_, /* done */ true);
793  // Commit the rows to flush the row batch from the previous row group
794  CommitRows(0);
795 
798  }
799 
800  return Status::OK;
801 }
802 
803 // TODO: this needs to be codegen'd. The ReadValue function needs to be codegen'd,
804 // specific to type and encoding and then inlined into AssembleRows().
806  assemble_rows_timer_.Start();
807  // Read at most as many rows as stated in the metadata
808  int64_t expected_rows_in_group = file_metadata_.row_groups[row_group_idx].num_rows;
809  int64_t rows_read = 0;
810  bool reached_limit = scan_node_->ReachedLimit();
811  bool cancelled = context_->cancelled();
812  int num_column_readers = column_readers_.size();
813  MemPool* pool;
814 
815  while (!reached_limit && !cancelled && rows_read < expected_rows_in_group) {
816  Tuple* tuple;
817  TupleRow* row;
818  int64_t row_mem_limit = static_cast<int64_t>(GetMemory(&pool, &tuple, &row));
819  int64_t expected_rows_to_read = expected_rows_in_group - rows_read;
820  int64_t num_rows = std::min(expected_rows_to_read, row_mem_limit);
821 
822  int num_to_commit = 0;
823  if (num_column_readers > 0) {
824  for (int i = 0; i < num_rows; ++i) {
825  bool conjuncts_failed = false;
826  InitTuple(template_tuple_, tuple);
827  for (int c = 0; c < num_column_readers; ++c) {
828  if (!column_readers_[c]->ReadValue(pool, tuple, &conjuncts_failed)) {
829  assemble_rows_timer_.Stop();
830  // This column is complete and has no more data. This indicates
831  // we are done with this row group.
832  // For correctly formed files, this should be the first column we
833  // are reading.
834  DCHECK(c == 0 || !parse_status_.ok())
835  << "c=" << c << " " << parse_status_.GetDetail();
837  RETURN_IF_ERROR(CommitRows(num_to_commit));
838 
839  // If we reach this point, it means that we reached the end of file for
840  // this column. Test if the expected number of rows from metadata matches
841  // the actual number of rows in the file.
842  rows_read += i;
843  if (rows_read != expected_rows_in_group) {
845  DCHECK_NOTNULL(reader->stream_);
846 
847  ErrorMsg msg(TErrorCode::PARQUET_GROUP_ROW_COUNT_ERROR,
848  reader->stream_->filename(), row_group_idx,
849  expected_rows_in_group, rows_read);
851  }
852  return parse_status_;
853  }
854  }
855  if (conjuncts_failed) continue;
856  row->SetTuple(scan_node_->tuple_idx(), tuple);
857  if (EvalConjuncts(row)) {
858  row = next_row(row);
859  tuple = next_tuple(tuple);
860  ++num_to_commit;
861  }
862  }
863  } else {
864  // Special case when there is no data for the accessed column(s) in the file.
865  // This can happen, for example, due to schema evolution (alter table add column).
866  // Since all the tuples are the same, we evaluate the conjuncts only for the first tuple.
867  DCHECK_GT(num_rows, 0);
868  InitTuple(template_tuple_, tuple);
869  row->SetTuple(scan_node_->tuple_idx(), tuple);
870  if (EvalConjuncts(row)) {
871  row = next_row(row);
872  tuple = next_tuple(tuple);
873 
874  for (int i = 1; i < num_rows; ++i) {
875  InitTuple(template_tuple_, tuple);
876  row->SetTuple(scan_node_->tuple_idx(), tuple);
877  row = next_row(row);
878  tuple = next_tuple(tuple);
879  }
880  num_to_commit += num_rows;
881  }
882  }
883  rows_read += num_rows;
885  RETURN_IF_ERROR(CommitRows(num_to_commit));
886 
887  reached_limit = scan_node_->ReachedLimit();
888  cancelled = context_->cancelled();
889  }
890 
891  if (!reached_limit && !cancelled && (num_column_readers > 0)) {
892  // If we get to this point, it means that we have read as many rows as the metadata
893  // told us we should read. Attempt to read one more row and if that succeeds report
894  // the error.
895  DCHECK_EQ(rows_read, expected_rows_in_group);
896  uint8_t dummy_tuple_mem[tuple_byte_size_];
897  Tuple* dummy_tuple = reinterpret_cast<Tuple*>(&dummy_tuple_mem);
898  InitTuple(template_tuple_, dummy_tuple);
899  bool conjuncts_failed = false;
900  if (column_readers_[0]->ReadValue(pool, dummy_tuple, &conjuncts_failed)) {
901  // If another tuple is successfully read, it means that there are still values
902  // in the file.
904  DCHECK_NOTNULL(reader->stream_);
905  ErrorMsg msg(TErrorCode::PARQUET_GROUP_ROW_COUNT_OVERFLOW,
906  reader->stream_->filename(), row_group_idx,
907  expected_rows_in_group);
909  }
910  }
911 
912  assemble_rows_timer_.Stop();
913  return parse_status_;
914 }
915 
917  *eosr = false;
918  uint8_t* buffer;
919  int64_t len;
920 
921  RETURN_IF_ERROR(stream_->GetBuffer(false, &buffer, &len));
922  DCHECK(stream_->eosr());
923 
924  // Number of bytes in buffer after the fixed size footer is accounted for.
925  int remaining_bytes_buffered = len - sizeof(int32_t) - sizeof(PARQUET_VERSION_NUMBER);
926 
927  // Make sure footer has enough bytes to contain the required information.
928  if (remaining_bytes_buffered < 0) {
929  return Status(Substitute("File $0 is invalid. Missing metadata.",
930  stream_->filename()));
931  }
932 
933  // Validate magic file bytes are correct
934  uint8_t* magic_number_ptr = buffer + len - sizeof(PARQUET_VERSION_NUMBER);
935  if (memcmp(magic_number_ptr,
937  return Status(Substitute("File $0 is invalid. Invalid file footer: $1",
938  stream_->filename(),
939  string((char*)magic_number_ptr, sizeof(PARQUET_VERSION_NUMBER))));
940  }
941 
942  // The size of the metadata is encoded as a 4 byte little endian value before
943  // the magic number
944  uint8_t* metadata_size_ptr = magic_number_ptr - sizeof(int32_t);
945  uint32_t metadata_size = *reinterpret_cast<uint32_t*>(metadata_size_ptr);
946  uint8_t* metadata_ptr = metadata_size_ptr - metadata_size;
947  // If the metadata was too big, we need to stitch it before deserializing it.
948  // In that case, we stitch the data in this buffer.
949  vector<uint8_t> metadata_buffer;
951 
952  if (UNLIKELY(metadata_size > remaining_bytes_buffered)) {
953  // In this case, the metadata is bigger than our guess, meaning there are
954  // not enough bytes in the footer range from IssueInitialRanges().
955  // We'll just issue more ranges to the IoMgr to read the actual metadata.
956  const HdfsFileDesc* file_desc = scan_node_->GetFileDesc(metadata_range_->file());
957  DCHECK_NOTNULL(file_desc);
958  // The start of the metadata is:
959  // file_length - 4-byte metadata size field - 4-byte magic number - metadata size
960  int64_t metadata_start = file_desc->file_length -
961  sizeof(int32_t) - sizeof(PARQUET_VERSION_NUMBER) - metadata_size;
962  int64_t metadata_bytes_to_read = metadata_size;
963  if (metadata_start < 0) {
964  return Status(Substitute("File $0 is invalid. Invalid metadata size in file "
965  "footer: $1 bytes. File size: $2 bytes.", stream_->filename(), metadata_size,
966  file_desc->file_length));
967  }
968  // IoMgr can only do a fixed size Read(). The metadata could be larger
969  // so we stitch it here.
970  // TODO: consider moving this stitching into the scanner context. The scanner
971  // context usually handles the stitching but no other scanner needs this logic
972  // now.
973  metadata_buffer.resize(metadata_size);
974  metadata_ptr = &metadata_buffer[0];
975  int64_t copy_offset = 0;
976  DiskIoMgr* io_mgr = scan_node_->runtime_state()->io_mgr();
977 
978  while (metadata_bytes_to_read > 0) {
979  int64_t to_read = ::min(static_cast<int64_t>(io_mgr->max_read_buffer_size()),
980  metadata_bytes_to_read);
982  metadata_range_->fs(), metadata_range_->file(), to_read,
983  metadata_start + copy_offset, -1, metadata_range_->disk_id(),
985  file_desc->mtime);
986 
987  DiskIoMgr::BufferDescriptor* io_buffer = NULL;
988  RETURN_IF_ERROR(io_mgr->Read(scan_node_->reader_context(), range, &io_buffer));
989  memcpy(metadata_ptr + copy_offset, io_buffer->buffer(), io_buffer->len());
990  io_buffer->Return();
991 
992  metadata_bytes_to_read -= to_read;
993  copy_offset += to_read;
994  }
995  DCHECK_EQ(metadata_bytes_to_read, 0);
996  }
997  // Deserialize file header
998  // TODO: this takes ~7ms for a 1000-column table, figure out how to reduce this.
999  Status status =
1000  DeserializeThriftMsg(metadata_ptr, &metadata_size, true, &file_metadata_);
1001  if (!status.ok()) {
1002  return Status(Substitute("File $0 has invalid file metadata at file offset $1. "
1003  "Error = $2.", stream_->filename(),
1004  metadata_size + sizeof(PARQUET_VERSION_NUMBER) + sizeof(uint32_t),
1005  status.GetDetail()));
1006  }
1007 
1009 
1010  // Tell the scan node this file has been taken care of.
1013 
1014  // Parse file schema
1016 
1017  if (scan_node_->materialized_slots().empty()) {
1018  // No materialized columns. We can serve this query from just the metadata. We
1019  // don't need to read the column data.
1020  int64_t num_tuples = file_metadata_.num_rows;
1021  COUNTER_ADD(scan_node_->rows_read_counter(), num_tuples);
1022 
1023  while (num_tuples > 0) {
1024  MemPool* pool;
1025  Tuple* tuple;
1026  TupleRow* current_row;
1027  int max_tuples = GetMemory(&pool, &tuple, &current_row);
1028  max_tuples = min(static_cast<int64_t>(max_tuples), num_tuples);
1029  num_tuples -= max_tuples;
1030 
1031  int num_to_commit = WriteEmptyTuples(context_, current_row, max_tuples);
1032  RETURN_IF_ERROR(CommitRows(num_to_commit));
1033  }
1034 
1035  *eosr = true;
1036  return Status::OK;
1037  } else if (file_metadata_.num_rows == 0) {
1038  // Empty file
1039  *eosr = true;
1040  return Status::OK;
1041  }
1042 
1043  if (file_metadata_.row_groups.empty()) {
1044  return Status(Substitute("Invalid file. This file: $0 has no row groups",
1045  stream_->filename()));
1046  }
1047  return Status::OK;
1048 }
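// ---------------------------------------------------------------------------
// Illustrative sketch (added commentary, not part of the original file): the
// trailer layout ProcessFooter() relies on. Standard Parquet files end with
//
//   ... column data ... | file metadata (thrift) | metadata size (4 bytes LE) | 4-byte magic
//
// so the metadata begins at file_length - 4 - 4 - metadata_size, matching the
// 'metadata_start' arithmetic above.
#include <cstdint>

static int64_t ExampleMetadataStart(int64_t file_length, uint32_t metadata_size) {
  const int64_t size_field_bytes = sizeof(int32_t);  // 4-byte little-endian length field
  const int64_t magic_bytes = 4;                     // the 4-byte magic ("PAR1" in standard files)
  return file_length - size_field_bytes - magic_bytes - metadata_size;
}
// e.g. a 1,000,000-byte file with 20,000 bytes of metadata: the metadata starts
// at byte 979,992 and the final 8 bytes hold the length field plus the magic.
// ---------------------------------------------------------------------------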
1049 
1051  DCHECK(column_readers_.empty());
1052  for (int i = 0; i < scan_node_->materialized_slots().size(); ++i) {
1053  SlotDescriptor* slot_desc = scan_node_->materialized_slots()[i];
1054  const vector<int>& path = slot_desc->col_path();
1055  SchemaNode* node = &schema_;
1056  // Traverse path and resolve node to this slot's SchemaNode, or NULL if this slot
1057  // doesn't exist in this file's schema
1058  for (int j = 0; j < path.size(); ++j) {
1059  int idx = j > 0 ? path[j] : path[j] - scan_node_->num_partition_keys();
1060  if (node->children.size() <= idx) {
1061  // The selected column is not in the file
1062  VLOG_FILE << Substitute("File $0 does not contain path $1",
1063  stream_->filename(), PrintPath(path));
1064  node = NULL;
1065  break;
1066  }
1067  node = &node->children[idx];
1068  }
1069 
1070  if (node != NULL && node->children.size() > 0) {
1071  string error = Substitute("Path $0 is not a supported type in file $1",
1072  PrintPath(path), stream_->filename());
1073  VLOG_QUERY << error << endl << schema_.DebugString();
1074  return Status(error);
1075  }
1076 
1077  if (node == NULL) {
1078  // In this case, we are selecting a column that is not in the file.
1079  // Update the template tuple to put a NULL in this slot.
1080  if (template_tuple_ == NULL) {
1082  }
1084  continue;
1085  }
1086  node->slot_desc = slot_desc;
1087 
1088  column_readers_.push_back(CreateReader(*node));
1089  }
1090  return Status::OK;
1091 }
1092 
1094  const HdfsFileDesc* file_desc = scan_node_->GetFileDesc(metadata_range_->file());
1095  DCHECK_NOTNULL(file_desc);
1096  parquet::RowGroup& row_group = file_metadata_.row_groups[row_group_idx];
1097 
1098  // All the scan ranges (one for each column).
1099  vector<DiskIoMgr::ScanRange*> col_ranges;
1100 
1101  for (int i = 0; i < column_readers_.size(); ++i) {
1102  const parquet::ColumnChunk& col_chunk =
1103  row_group.columns[column_readers_[i]->col_idx()];
1104  int64_t col_start = col_chunk.meta_data.data_page_offset;
1105  RETURN_IF_ERROR(ValidateColumn(*column_readers_[i], row_group_idx));
1106 
1107  // If there is a dictionary page, the file format requires it to come before
1108  // any data pages, so we need to start reading the column from the dictionary page.
1109  if (col_chunk.meta_data.__isset.dictionary_page_offset) {
1110  if (col_chunk.meta_data.dictionary_page_offset >= col_start) {
1111  stringstream ss;
1112  ss << "File " << file_desc->filename << ": metadata is corrupt. "
1113  << "Dictionary page (offset=" << col_chunk.meta_data.dictionary_page_offset
1114  << ") must come before any data pages (offset=" << col_start << ").";
1115  return Status(ss.str());
1116  }
1117  col_start = col_chunk.meta_data.dictionary_page_offset;
1118  }
1119  int64_t col_len = col_chunk.meta_data.total_compressed_size;
1120  int64_t col_end = col_start + col_len;
1121  if (col_end <= 0 || col_end > file_desc->file_length) {
1122  stringstream ss;
1123  ss << "File " << file_desc->filename << ": metadata is corrupt. "
1124  << "Column " << column_readers_[i]->col_idx() << " has invalid column offsets "
1125  << "(offset=" << col_start << ", size=" << col_len << ", "
1126  << "file_size=" << file_desc->file_length << ").";
1127  return Status(ss.str());
1128  }
1129  if (file_version_.application == "parquet-mr" && file_version_.VersionLt(1, 2, 9)) {
1130  // The Parquet MR writer had a bug in 1.2.8 and below where it didn't include the
1131  // dictionary page header size in total_compressed_size and total_uncompressed_size
1132  // (see IMPALA-694). We pad col_len to compensate.
1133  int64_t bytes_remaining = file_desc->file_length - col_end;
1134  int64_t pad = min(static_cast<int64_t>(MAX_DICT_HEADER_SIZE), bytes_remaining);
1135  col_len += pad;
1136  }
1137 
1138  // TODO: this will need to change when we have co-located files and the columns
1139  // are in different files.
1140  if (!col_chunk.file_path.empty()) {
1141  DCHECK_EQ(col_chunk.file_path, string(metadata_range_->file()));
1142  }
1143 
1145  metadata_range_->fs(), metadata_range_->file(), col_len, col_start,
1146  column_readers_[i]->col_idx(), metadata_range_->disk_id(),
1148  file_desc->mtime);
1149  col_ranges.push_back(col_range);
1150 
1151  // Get the stream that will be used for this column
1152  ScannerContext::Stream* stream = context_->AddStream(col_range);
1153  DCHECK(stream != NULL);
1154 
1155  RETURN_IF_ERROR(column_readers_[i]->Reset(&col_chunk.meta_data, stream));
1156 
1157  if (!scan_node_->materialized_slots()[i]->type().IsStringType() ||
1158  col_chunk.meta_data.codec != parquet::CompressionCodec::UNCOMPRESSED) {
1159  // Non-string types are always compact. Compressed columns don't reference data
1160  // in the io buffers after tuple materialization. In both cases, the stream does not
1161  // contain tuple data, so io buffers can be recycled more promptly.
1162  stream->set_contains_tuple_data(false);
1163  }
1164  }
1165  DCHECK_EQ(col_ranges.size(), column_readers_.size());
1166  DCHECK_GE(scan_node_->materialized_slots().size(), column_readers_.size());
1167 
1168  // Issue all the column chunks to the io mgr and have them scheduled immediately.
1169  // This means these ranges aren't returned via DiskIoMgr::GetNextRange and
1170  // instead are scheduled to be read immediately.
1172  scan_node_->reader_context(), col_ranges, true));
1173 
1174  return Status::OK;
1175 }
1176 
1177 Status HdfsParquetScanner::CreateSchemaTree(const vector<parquet::SchemaElement>& schema,
1178  HdfsParquetScanner::SchemaNode* node) const {
1179  int max_def_level = 0;
1180  int idx = 0;
1181  int col_idx = 0;
1182  return CreateSchemaTree(schema, max_def_level, &idx, &col_idx, node);
1183 }
1184 
1186  const vector<parquet::SchemaElement>& schema, int max_def_level, int* idx,
1187  int* col_idx, HdfsParquetScanner::SchemaNode* node) const {
1188  if (*idx >= schema.size()) {
1189  return Status(Substitute("File $0 corrupt: could not reconstruct schema tree from "
1190  "flattened schema in file metadata", stream_->filename()));
1191  }
1192  node->element = &schema[*idx];
1193  ++(*idx);
1194 
1195  if (node->element->num_children == 0) {
1196  // node is a leaf node, meaning it's materialized in the file and appears in
1197  // file_metadata_.row_groups.columns
1198  node->col_idx = *col_idx;
1199  ++(*col_idx);
1200  }
1201 
1202  if (node->element->repetition_type == parquet::FieldRepetitionType::OPTIONAL) {
1203  ++max_def_level;
1204  }
1205  node->max_def_level = max_def_level;
1206 
1207  node->children.resize(node->element->num_children);
1208  for (int i = 0; i < node->element->num_children; ++i) {
1210  CreateSchemaTree(schema, max_def_level, idx, col_idx, &node->children[i]));
1211  }
1212  return Status::OK;
1213 }
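// ---------------------------------------------------------------------------
// Illustrative sketch (added commentary, not part of the original file): how
// CreateSchemaTree() rebuilds the tree from the flattened schema. Elements are
// consumed depth-first in file order, leaves (num_children == 0) receive
// consecutive col_idx values, and max_def_level counts the OPTIONAL nodes on
// the path from the root, including the element itself. MiniElement/MiniNode
// below are hypothetical stand-ins, not the parquet thrift structs.
#include <vector>

struct MiniElement { int num_children; bool optional; };
struct MiniNode {
  int col_idx = -1;
  int max_def_level = 0;
  std::vector<MiniNode> children;
};

static void BuildTree(const std::vector<MiniElement>& flat, int def_level,
    int* idx, int* col_idx, MiniNode* node) {
  const MiniElement& e = flat[(*idx)++];
  if (e.optional) ++def_level;
  node->max_def_level = def_level;
  if (e.num_children == 0) node->col_idx = (*col_idx)++;
  node->children.resize(e.num_children);
  for (int i = 0; i < e.num_children; ++i) {
    BuildTree(flat, def_level, idx, col_idx, &node->children[i]);
  }
}
// e.g. flat = { root with 2 children, REQUIRED "id", OPTIONAL "name" } yields
// id -> col_idx 0 / max_def_level 0 and name -> col_idx 1 / max_def_level 1.
// ---------------------------------------------------------------------------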
1214 
1215 HdfsParquetScanner::FileVersion::FileVersion(const string& created_by) {
1216  string created_by_lower = created_by;
1217  std::transform(created_by_lower.begin(), created_by_lower.end(),
1218  created_by_lower.begin(), ::tolower);
1219  is_impala_internal = false;
1220 
1221  vector<string> tokens;
1222  split(tokens, created_by_lower, is_any_of(" "), token_compress_on);
1223  // Boost always creates at least one token
1224  DCHECK_GT(tokens.size(), 0);
1225  application = tokens[0];
1226 
1227  if (tokens.size() >= 3 && tokens[1] == "version") {
1228  string version_string = tokens[2];
1229  // Ignore any trailing non-numeric characters
1230  int n = version_string.find_first_not_of("0123456789.");
1231  string version_string_trimmed = version_string.substr(0, n);
1232 
1233  vector<string> version_tokens;
1234  split(version_tokens, version_string_trimmed, is_any_of("."));
1235  version.major = version_tokens.size() >= 1 ? atoi(version_tokens[0].c_str()) : 0;
1236  version.minor = version_tokens.size() >= 2 ? atoi(version_tokens[1].c_str()) : 0;
1237  version.patch = version_tokens.size() >= 3 ? atoi(version_tokens[2].c_str()) : 0;
1238 
1239  if (application == "impala") {
1240  if (version_string.find("-internal") != string::npos) is_impala_internal = true;
1241  }
1242  } else {
1243  version.major = 0;
1244  version.minor = 0;
1245  version.patch = 0;
1246  }
1247 }
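// ---------------------------------------------------------------------------
// Illustrative sketch (added commentary, not part of the original file):
// example created_by strings of the form this constructor parses (the strings
// themselves are illustrative, not taken from a real file):
//   "parquet-mr version 1.2.8 (build abc123)"   -> application "parquet-mr",
//       version 1.2.8, is_impala_internal = false
//   "impala version 1.4.0-internal (build ...)" -> application "impala",
//       version 1.4.0, is_impala_internal = true
// A trimmed, standard-library-only version of the same tokenizing:
#include <sstream>
#include <string>
#include <vector>

static std::vector<std::string> SplitCreatedBy(const std::string& created_by) {
  std::istringstream in(created_by);
  std::vector<std::string> tokens;
  for (std::string t; in >> t;) tokens.push_back(t);
  return tokens;  // tokens[0] = application; tokens[2] = version string, if present
}
// ---------------------------------------------------------------------------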
1248 
1249 bool HdfsParquetScanner::FileVersion::VersionLt(int major, int minor, int patch) const {
1250  if (version.major < major) return true;
1251  if (version.major > major) return false;
1252  DCHECK_EQ(version.major, major);
1253  if (version.minor < minor) return true;
1254  if (version.minor > minor) return false;
1255  DCHECK_EQ(version.minor, minor);
1256  return version.patch < patch;
1257 }
1258 
1259 bool HdfsParquetScanner::FileVersion::VersionEq(int major, int minor, int patch) const {
1260  return version.major == major && version.minor == minor && version.patch == patch;
1261 }
1262 
1264  if (file_metadata_.version > PARQUET_CURRENT_VERSION) {
1265  stringstream ss;
1266  ss << "File: " << stream_->filename() << " is of an unsupported version. "
1267  << "file version: " << file_metadata_.version;
1268  return Status(ss.str());
1269  }
1270 
1271  // Parse out the created by application version string
1272  if (file_metadata_.__isset.created_by) {
1274  }
1275  return Status::OK;
1276 }
1277 
1278 bool IsEncodingSupported(parquet::Encoding::type e) {
1279  switch (e) {
1281  case parquet::Encoding::PLAIN_DICTIONARY:
1282  case parquet::Encoding::BIT_PACKED:
1283  case parquet::Encoding::RLE:
1284  return true;
1285  default:
1286  return false;
1287  }
1288 }
1289 
1291  const BaseColumnReader& col_reader, int row_group_idx) {
1292  const SlotDescriptor* slot_desc = col_reader.slot_desc();
1293  int col_idx = col_reader.col_idx();
1294  const parquet::SchemaElement& schema_element = col_reader.schema_element();
1295  parquet::ColumnChunk& file_data =
1296  file_metadata_.row_groups[row_group_idx].columns[col_idx];
1297 
1298  // Check the encodings are supported
1299  vector<parquet::Encoding::type>& encodings = file_data.meta_data.encodings;
1300  for (int i = 0; i < encodings.size(); ++i) {
1301  if (!IsEncodingSupported(encodings[i])) {
1302  stringstream ss;
1303  ss << "File '" << metadata_range_->file() << "' uses an unsupported encoding: "
1304  << PrintEncoding(encodings[i]) << " for column '" << schema_element.name
1305  << "'.";
1306  return Status(ss.str());
1307  }
1308  }
1309 
1310  // Check the compression is supported
1311  if (file_data.meta_data.codec != parquet::CompressionCodec::UNCOMPRESSED &&
1312  file_data.meta_data.codec != parquet::CompressionCodec::SNAPPY &&
1313  file_data.meta_data.codec != parquet::CompressionCodec::GZIP) {
1314  stringstream ss;
1315  ss << "File '" << metadata_range_->file() << "' uses an unsupported compression: "
1316  << file_data.meta_data.codec << " for column '" << schema_element.name
1317  << "'.";
1318  return Status(ss.str());
1319  }
1320 
1321  // Check the type in the file is compatible with the catalog metadata.
1322  parquet::Type::type type = IMPALA_TO_PARQUET_TYPES[slot_desc->type().type];
1323  if (type != file_data.meta_data.type) {
1324  stringstream ss;
1325  ss << "File '" << metadata_range_->file() << "' has an incompatible type with the"
1326  << " table schema for column '" << schema_element.name << "'. Expected type: "
1327  << type << ". Actual type: " << file_data.meta_data.type;
1328  return Status(ss.str());
1329  }
1330 
1331  // Check that this column is optional or required
1332  if (schema_element.repetition_type != parquet::FieldRepetitionType::OPTIONAL &&
1333  schema_element.repetition_type != parquet::FieldRepetitionType::REQUIRED) {
1334  stringstream ss;
1335  ss << "File '" << metadata_range_->file() << "' column '" << schema_element.name
1336  << "' contains an unsupported column repetition type: "
1337  << schema_element.repetition_type;
1338  return Status(ss.str());
1339  }
1340 
1341  // Check the decimal scale in the file matches the metastore scale and precision.
1342  // We fail the query if the metadata makes it impossible for us to safely read
1343  // the file. For metadata that is not strictly required to read the file, we fail
1344  // the query if abort_on_error is true; otherwise we just log a warning.
1345  bool is_converted_type_decimal = schema_element.__isset.converted_type &&
1346  schema_element.converted_type == parquet::ConvertedType::DECIMAL;
1347  if (slot_desc->type().type == TYPE_DECIMAL) {
1348  // We require that the scale and byte length be set.
1349  if (schema_element.type != parquet::Type::FIXED_LEN_BYTE_ARRAY) {
1350  stringstream ss;
1351  ss << "File '" << metadata_range_->file() << "' column '" << schema_element.name
1352  << "' should be a decimal column encoded using FIXED_LEN_BYTE_ARRAY.";
1353  return Status(ss.str());
1354  }
1355 
1356  if (!schema_element.__isset.type_length) {
1357  stringstream ss;
1358  ss << "File '" << metadata_range_->file() << "' column '" << schema_element.name
1359  << "' does not have type_length set.";
1360  return Status(ss.str());
1361  }
1362 
1363  int expected_len = ParquetPlainEncoder::DecimalSize(slot_desc->type());
1364  if (schema_element.type_length != expected_len) {
1365  stringstream ss;
1366  ss << "File '" << metadata_range_->file() << "' column '" << schema_element.name
1367  << "' has an invalid type length. Expecting: " << expected_len
1368  << " len in file: " << schema_element.type_length;
1369  return Status(ss.str());
1370  }
1371 
1372  if (!schema_element.__isset.scale) {
1373  stringstream ss;
1374  ss << "File '" << metadata_range_->file() << "' column '" << schema_element.name
1375  << "' does not have the scale set.";
1376  return Status(ss.str());
1377  }
1378 
1379  if (schema_element.scale != slot_desc->type().scale) {
1380  // TODO: we could allow a mismatch and do a conversion at this step.
1381  stringstream ss;
1382  ss << "File '" << metadata_range_->file() << "' column '" << schema_element.name
1383  << "' has a scale that does not match the table metadata scale."
1384  << " File metadata scale: " << schema_element.scale
1385  << " Table metadata scale: " << slot_desc->type().scale;
1386  return Status(ss.str());
1387  }
1388 
1389  // The other decimal metadata should be there but we don't need it.
1390  if (!schema_element.__isset.precision) {
1391  ErrorMsg msg(TErrorCode::PARQUET_MISSING_PRECISION,
1392  metadata_range_->file(), schema_element.name);
1394  } else {
1395  if (schema_element.precision != slot_desc->type().precision) {
1396  // TODO: we could allow a mismatch and do a conversion at this step.
1397  ErrorMsg msg(TErrorCode::PARQUET_WRONG_PRECISION,
1398  metadata_range_->file(), schema_element.name,
1399  schema_element.precision, slot_desc->type().precision);
1401  }
1402  }
1403 
1404  if (!is_converted_type_decimal) {
1405  // TODO: is this validation useful? It is not required at all to read the data and
1406  // might only serve to reject otherwise perfectly readable files.
1407  ErrorMsg msg(TErrorCode::PARQUET_BAD_CONVERTED_TYPE,
1408  metadata_range_->file(), schema_element.name);
1410  }
1411  } else if (schema_element.__isset.scale || schema_element.__isset.precision ||
1412  is_converted_type_decimal) {
1413  ErrorMsg msg(TErrorCode::PARQUET_INCOMPATIBLE_DECIMAL,
1414  metadata_range_->file(), schema_element.name, slot_desc->type().DebugString());
1416  }
1417  return Status::OK;
1418 }
1419 
1420 string PrintRepetitionType(const parquet::FieldRepetitionType::type& t) {
1421  switch (t) {
1422  case parquet::FieldRepetitionType::REQUIRED: return "required";
1423  case parquet::FieldRepetitionType::OPTIONAL: return "optional";
1424  case parquet::FieldRepetitionType::REPEATED: return "repeated";
1425  default: return "<unknown>";
1426  }
1427 }
1428 
1429 string PrintParquetType(const parquet::Type::type& t) {
1430  switch (t) {
1431  case parquet::Type::BOOLEAN: return "boolean";
1432  case parquet::Type::INT32: return "int32";
1433  case parquet::Type::INT64: return "int64";
1434  case parquet::Type::INT96: return "int96";
1435  case parquet::Type::FLOAT: return "float";
1436  case parquet::Type::DOUBLE: return "double";
1437  case parquet::Type::BYTE_ARRAY: return "byte_array";
1438  case parquet::Type::FIXED_LEN_BYTE_ARRAY: return "fixed_len_byte_array";
1439  default: return "<unknown>";
1440  }
1441 }
1442 
1444  stringstream ss;
1445  for (int i = 0; i < indent; ++i) ss << " ";
1446  ss << PrintRepetitionType(element->repetition_type) << " ";
1447  if (element->num_children > 0) {
1448  ss << "struct";
1449  } else {
1450  ss << PrintParquetType(element->type);
1451  }
1452  ss << " " << element->name << " [i:" << col_idx << " d:" << max_def_level << "]";
1453  if (element->num_children > 0) {
1454  ss << " {" << endl;
1455  for (int i = 0; i < element->num_children; ++i) {
1456  ss << children[i].DebugString(indent + 2) << endl;
1457  }
1458  for (int i = 0; i < indent; ++i) ss << " ";
1459  ss << "}";
1460  }
1461  return ss.str();
1462 }
void InitTuple(Tuple *template_tuple, Tuple *tuple)
Definition: hdfs-scanner.h:355
int GetMemory(MemPool **pool, Tuple **tuple_mem, TupleRow **tuple_row_mem)
Status AddDiskIoRanges(const std::vector< DiskIoMgr::ScanRange * > &ranges)
Adds ranges to the io mgr queue and starts up new scanner threads if possible.
static bool Read(uint8_t **buf, int *buf_len, T *val, Status *status)
HdfsParquetScanner(HdfsScanNode *scan_node, RuntimeState *state)
const Bitmap * GetBitmapFilter(SlotId slot)
const ColumnType & type() const
Definition: descriptors.h:78
ObjectPool * obj_pool() const
Definition: runtime-state.h:92
ObjectPool pool
int GetByteSize() const
Returns the byte size of this type. Returns 0 for variable length types.
Definition: types.h:178
virtual Status InitDataPage(uint8_t *data, int size)=0
bool SkipBytes(int64_t length, Status *)
Skip over the next length bytes in the specified HDFS file.
Status ValidateColumn(const BaseColumnReader &col_reader, int row_group_idx)
Status CommitRows(int num_rows)
Status InitColumns(int row_group_idx)
#define ADD_COUNTER(profile, name, unit)
uint32_t fragment_hash_seed() const
int len
Only set if type == TYPE_CHAR or type == TYPE_VARCHAR.
Definition: types.h:62
RuntimeState * runtime_state()
virtual void CreateDictionaryDecoder(uint8_t *values, int size)
Status Reset(const parquet::ColumnMetaData *metadata, ScannerContext::Stream *stream)
DiskIoMgr::RequestContext * reader_context()
static int Ceil(int value, int divisor)
Returns the ceil of value/divisor.
Definition: bit-util.h:32
RuntimeProfile::Counter * num_cols_counter_
Number of cols that need to be read.
DEFINE_bool(convert_legacy_hive_parquet_utc_timestamps, false,"When true, TIMESTAMPs read from files written by Parquet-MR (used by Hive) will ""be converted from UTC to local time. Writes are unaffected.")
#define COUNTER_SET(c, v)
bool IR_ALWAYS_INLINE EvalConjuncts(TupleRow *row)
Definition: hdfs-scanner.h:266
void AttachPool(MemPool *pool, bool commit_batch)
Definition: hdfs-scanner.h:256
const parquet::SchemaElement * element
The corresponding schema element defined in the file metadata.
void SetTuple(int tuple_idx, Tuple *tuple)
Definition: tuple-row.h:34
bool IsEncodingSupported(parquet::Encoding::type e)
#define UNLIKELY(expr)
Definition: compiler-util.h:33
virtual bool ReadSlot(void *slot, MemPool *pool, bool *conjuncts_failed)=0
std::string PrintEncoding(const parquet::Encoding::type &type)
const parquet::SchemaElement & schema_element() const
BaseColumnReader * CreateReader(const SchemaNode &node)
DiskIoMgr::ScanRange * AllocateScanRange(hdfsFS fs, const char *file, int64_t len, int64_t offset, int64_t partition_id, int disk_id, bool try_cache, bool expected_local, int64_t mtime)
static const Status OK
Definition: status.h:87
const DiskIoMgr::ScanRange * scan_range()
Stream * AddStream(DiskIoMgr::ScanRange *range)
Status DeserializeThriftMsg(JNIEnv *env, jbyteArray serialized_msg, T *deserialized_msg)
virtual int num_entries() const =0
const uint32_t PARQUET_CURRENT_VERSION
#define VLOG_FILE
Definition: logging.h:58
int tuple_idx() const
void ConvertSlot(const T *src, T *dst, MemPool *pool)
int max_read_buffer_size() const
Returns the maximum read buffer size.
Definition: disk-io-mgr.h:590
bool ok() const
Definition: status.h:172
RuntimeProfile::Counter * decompress_timer_
Time spent decompressing bytes.
Definition: hdfs-scanner.h:208
bool ReadBytes(int64_t length, uint8_t **buf, Status *, bool peek=false)
DiskIoMgr * io_mgr()
static Status IssueInitialRanges(HdfsScanNode *scan_node, const std::vector< HdfsFileDesc * > &files)
bool HasDateAndTime() const
#define LOG_OR_ABORT(error_msg, runtime_state)
bool ReadValue(MemPool *pool, Tuple *tuple, bool *conjuncts_failed)
std::string DebugString(int indent=0) const
Decoder class for RLE encoded data.
Definition: rle-encoding.h:77
void IncNumScannersCodegenDisabled()
ScannerContext::Stream * stream_
The first stream for context_.
Definition: hdfs-scanner.h:150
uint8_t * Allocate(int size)
Definition: mem-pool.h:92
bool is_impala_internal
If true, this file was generated by an Impala internal release.
Tuple * next_tuple(Tuple *t) const
Definition: hdfs-scanner.h:363
static uint32_t GetHashValue(const void *v, const ColumnType &type, uint32_t seed=0)
Definition: raw-value.h:168
virtual Status Prepare(ScannerContext *context)
One-time initialisation of state that is constant across scan ranges.
Definition: hdfs-scanner.cc:71
RuntimeProfile * runtime_profile()
Definition: exec-node.h:161
bool VersionLt(int major, int minor=0, int patch=0) const
Returns true if version is strictly less than <major>.<minor>.<patch>
Status CreateSchemaTree(const std::vector< parquet::SchemaElement > &schema, SchemaNode *node) const
static int Log2(uint64_t x)
Definition: bit-util.h:135
bool GetValue(int num_bits, T *v)