#include <boost/algorithm/string.hpp>
#include <gflags/gflags.h>
#include <gutil/strings/substitute.h>

using boost::algorithm::is_any_of;
using boost::algorithm::split;
using boost::algorithm::token_compress_on;
using namespace impala;
using namespace strings;

DEFINE_bool(convert_legacy_hive_parquet_utc_timestamps, false,
    "When true, TIMESTAMPs read from files written by Parquet-MR (used by Hive) will "
    "be converted from UTC to local time. Writes are unaffected.");
#define LOG_OR_ABORT(error_msg, runtime_state) \
  if (runtime_state->abort_on_error()) { \
    return Status(error_msg); \
  } \
  runtime_state->LogError(error_msg);

#define LOG_OR_RETURN_ON_ERROR(error_msg, runtime_state) \
  if (runtime_state->abort_on_error()) { \
    return Status(error_msg.msg()); \
  } \
  runtime_state->LogError(error_msg);
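// Both macros assume an enclosing function that returns Status and a RuntimeState*
// in scope. A hypothetical call site, mirroring the ErrorMsg pattern used elsewhere
// in this file (not an actual line from the scanner):
//
//   ErrorMsg msg(TErrorCode::PARQUET_MULTIPLE_BLOCKS, files[i]->filename);
//   LOG_OR_RETURN_ON_ERROR(msg, state);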
Status HdfsParquetScanner::IssueInitialRanges(HdfsScanNode* scan_node,
    const std::vector<HdfsFileDesc*>& files) {
  vector<DiskIoMgr::ScanRange*> footer_ranges;
  for (int i = 0; i < files.size(); ++i) {
    for (int j = 0; j < files[i]->splits.size(); ++j) {
      if (split->offset() != 0) {
            ErrorMsg(TErrorCode::PARQUET_MULTIPLE_BLOCKS, files[i]->filename));
        scan_node->RangeComplete(THdfsFileFormat::PARQUET, THdfsCompression::NONE);
      }
    }

    DCHECK_GT(files[i]->file_length, 0);
    int64_t footer_size = min(static_cast<int64_t>(FOOTER_SIZE), files[i]->file_length);
    int64_t footer_start = files[i]->file_length - footer_size;
    DiskIoMgr::ScanRange* footer_range = scan_node->AllocateScanRange(
        files[i]->fs, files[i]->filename.c_str(), footer_size, footer_start,
    footer_ranges.push_back(footer_range);
  }
    metadata_range_(NULL),
    dictionary_pool_(new MemPool(scan_node->mem_tracker())),
    assemble_rows_timer_(scan_node_->materialize_tuple_timer()) {

  DCHECK_NOTNULL(stream);
  DCHECK_NOTNULL(metadata);
  if (metadata_->codec != parquet::CompressionCodec::UNCOMPRESSED) {

THdfsCompression::type codec() const {
  if (metadata_ == NULL) return THdfsCompression::NONE;
      (FLAGS_convert_legacy_hive_parquet_utc_timestamps &&

      parquet::Encoding::PLAIN_DICTIONARY) {
    return Status("File corrupt. Missing dictionary page.");

  parquet::Encoding::type page_encoding =
      current_page_header_.data_page_header.encoding;
  if (page_encoding == parquet::Encoding::PLAIN_DICTIONARY) {
  if (slot->len == 0) return;
  memcpy(buffer, slot->ptr, slot->len);
  slot->ptr = reinterpret_cast<char*>(buffer);
  DCHECK(slot_desc()->type().type == TYPE_CHAR);
  int len = slot_desc()->type().len;
  if (slot_desc()->type().IsVarLen()) {
    sv.ptr = reinterpret_cast<char*>(pool->Allocate(len));
  } else {
    sv.ptr = reinterpret_cast<char*>(dst);
  }
  int unpadded_len = min(len, src->len);
  memcpy(sv.ptr, src->ptr, unpadded_len);

  if (slot_desc()->type().IsVarLen()) *dst = sv;
  DCHECK(FLAGS_convert_legacy_hive_parquet_utc_timestamps);

  DCHECK(false) << "Dictionary encoding is not supported for bools. Should never "
                << "have gotten this far.";
  vector<THdfsCompression::type> compression_types;
  if (compression_types.empty()) compression_types.push_back(THdfsCompression::NONE);
  if (buffer_size == 0) {
    ErrorMsg msg(TErrorCode::PARQUET_COLUMN_METADATA_INVALID,

  uint32_t header_size;
    header_size = buffer_size;
    if (status.ok()) break;

      ss << "ParquetScanner: could not read data page because page header exceeded "
         << "maximum size of "
    int64_t new_buffer_size = max(buffer_size * 2, 1024L);
        new_buffer_size, &buffer, &new_buffer_size, &status, true);
      DCHECK(!status.ok());
    if (buffer_size == new_buffer_size) {
      DCHECK_NE(new_buffer_size, 0);
      ErrorMsg msg(TErrorCode::PARQUET_HEADER_EOF);
    DCHECK_GT(new_buffer_size, buffer_size);
    buffer_size = new_buffer_size;
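// Because a Thrift-encoded page header has no fixed length, the loop above peeks at
// the buffered bytes, tries to parse a page header from them, and on failure retries
// with a peek size doubled each iteration (and at least 1024 bytes), giving up once
// the header would exceed the scanner's maximum allowed page header size.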
    return Status("Column chunk should not contain two dictionary pages.");

    return Status("Unexpected dictionary page. Dictionary page is not"
        " supported for booleans.");

  const parquet::DictionaryPageHeader* dict_header = NULL;
    return Status("Dictionary page does not have dictionary header set.");

  if (dict_header != NULL &&
      dict_header->encoding != parquet::Encoding::PLAIN_DICTIONARY) {
    return Status("Only PLAIN and PLAIN_DICTIONARY encodings are supported "
        "for dictionary pages.");

  uint8_t* dict_values = NULL;
        &uncompressed_size, &dict_values));
    VLOG_FILE << "Decompressed " << data_size << " to " << uncompressed_size;
    data_size = uncompressed_size;
    memcpy(dict_values, data_, data_size);

  if (dict_header != NULL &&
        "Invalid dictionary. Expected $0 entries but data contained $1 entries",

        &decompressed_buffer));
        << " to " << uncompressed_size;
    data_ = decompressed_buffer;
    DCHECK_EQ(metadata_->codec, parquet::CompressionCodec::UNCOMPRESSED);
  int32_t num_definition_bytes = 0;
    case parquet::Encoding::RLE: {
    case parquet::Encoding::BIT_PACKED:
      ss << "Unsupported definition level encoding: "
  DCHECK_GT(num_definition_bytes, 0);
  data_ += num_definition_bytes;
  data_size -= num_definition_bytes;
  if (max_def_level() == 0) {

  uint8_t definition_level;
  bool valid = false;
  switch (current_page_header_.data_page_header.definition_level_encoding) {
    case parquet::Encoding::RLE:
      valid = rle_def_levels_.Get(&definition_level);
      break;
    case parquet::Encoding::BIT_PACKED: {
      valid = bit_packed_def_levels_.GetValue(1, &definition_level);

  if (!valid) return -1;
  return definition_level;
  if (num_buffered_values_ == 0) {
    parent_->assemble_rows_timer_.Stop();
    parent_->parse_status_ = ReadDataPage();
    if (num_buffered_values_ == 0 || !parent_->parse_status_.ok()) return false;
    parent_->assemble_rows_timer_.Start();
  }

  --num_buffered_values_;
  int definition_level = ReadDefinitionLevel();
  if (definition_level < 0) return false;

  if (definition_level != max_def_level()) {
    DCHECK_LT(definition_level, max_def_level());
    tuple->SetNull(slot_desc()->null_indicator_offset());
    return true;
  }
  return ReadSlot(tuple->GetSlot(slot_desc()->tuple_offset()), pool, conjuncts_failed);
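// Definition levels drive NULL handling here: for a flat (non-nested) column the
// maximum definition level is at most 1, a level equal to max_def_level() means the
// value is present, and a smaller level means the slot is NULL, so the tuple's null
// indicator is set instead of calling ReadSlot(). A small illustration (values
// assumed, not taken from a real file):
//
//   def levels:  1 0 1 1 0   (max_def_level() == 1)
//   column:      a _ b c _   ->  a, NULL, b, c, NULL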
  int64_t expected_rows_in_group = file_metadata_.row_groups[row_group_idx].num_rows;
  int64_t rows_read = 0;

  while (!reached_limit && !cancelled && rows_read < expected_rows_in_group) {
    int64_t row_mem_limit = static_cast<int64_t>(GetMemory(&pool, &tuple, &row));
    int64_t expected_rows_to_read = expected_rows_in_group - rows_read;
    int64_t num_rows = std::min(expected_rows_to_read, row_mem_limit);

    int num_to_commit = 0;
    if (num_column_readers > 0) {
      for (int i = 0; i < num_rows; ++i) {
        bool conjuncts_failed = false;
        for (int c = 0; c < num_column_readers; ++c) {

          if (rows_read != expected_rows_in_group) {
            DCHECK_NOTNULL(reader->stream_);
            ErrorMsg msg(TErrorCode::PARQUET_GROUP_ROW_COUNT_ERROR,
                expected_rows_in_group, rows_read);

        if (conjuncts_failed) continue;

      DCHECK_GT(num_rows, 0);
      for (int i = 1; i < num_rows; ++i) {
      num_to_commit += num_rows;
    rows_read += num_rows;

  if (!reached_limit && !cancelled && (num_column_readers > 0)) {
    DCHECK_EQ(rows_read, expected_rows_in_group);
    Tuple* dummy_tuple = reinterpret_cast<Tuple*>(&dummy_tuple_mem);
    bool conjuncts_failed = false;
    if (column_readers_[0]->ReadValue(pool, dummy_tuple, &conjuncts_failed)) {
      DCHECK_NOTNULL(reader->stream_);
      ErrorMsg msg(TErrorCode::PARQUET_GROUP_ROW_COUNT_OVERFLOW,
          expected_rows_in_group);
  if (remaining_bytes_buffered < 0) {
    return Status(Substitute("File $0 is invalid. Missing metadata.",

  if (memcmp(magic_number_ptr,
    return Status(Substitute("File $0 is invalid. Invalid file footer: $1",

  uint8_t* metadata_size_ptr = magic_number_ptr - sizeof(int32_t);
  uint32_t metadata_size = *reinterpret_cast<uint32_t*>(metadata_size_ptr);
  uint8_t* metadata_ptr = metadata_size_ptr - metadata_size;

  vector<uint8_t> metadata_buffer;
  if (UNLIKELY(metadata_size > remaining_bytes_buffered)) {
    DCHECK_NOTNULL(file_desc);
    int64_t metadata_bytes_to_read = metadata_size;
    if (metadata_start < 0) {
      return Status(Substitute("File $0 is invalid. Invalid metadata size in file "
          "footer: $1 bytes. File size: $2 bytes.",
          stream_->filename(), metadata_size,
    metadata_buffer.resize(metadata_size);
    metadata_ptr = &metadata_buffer[0];
    int64_t copy_offset = 0;

    while (metadata_bytes_to_read > 0) {
          metadata_bytes_to_read);
      memcpy(metadata_ptr + copy_offset, io_buffer->buffer(), io_buffer->len());
      metadata_bytes_to_read -= to_read;
      copy_offset += to_read;
    DCHECK_EQ(metadata_bytes_to_read, 0);

    return Status(Substitute("File $0 has invalid file metadata at file offset $1. "
  while (num_tuples > 0) {
    int max_tuples = GetMemory(&pool, &tuple, &current_row);
    max_tuples = min(static_cast<int64_t>(max_tuples), num_tuples);
    num_tuples -= max_tuples;
    return Status(Substitute("Invalid file. This file: $0 has no row groups",

  for (int j = 0; j < path.size(); ++j) {
      VLOG_FILE << Substitute("File $0 does not contain path $1",
    if (node != NULL && node->children.size() > 0) {
      string error = Substitute("Path $0 is not a supported type in file $1",

  DCHECK_NOTNULL(file_desc);
  parquet::RowGroup& row_group = file_metadata_.row_groups[row_group_idx];
  vector<DiskIoMgr::ScanRange*> col_ranges;
    const parquet::ColumnChunk& col_chunk =
    int64_t col_start = col_chunk.meta_data.data_page_offset;
    if (col_chunk.meta_data.__isset.dictionary_page_offset) {
      if (col_chunk.meta_data.dictionary_page_offset >= col_start) {
        ss << "File " << file_desc->filename << ": metadata is corrupt. "
           << "Dictionary page (offset=" << col_chunk.meta_data.dictionary_page_offset
           << ") must come before any data pages (offset=" << col_start << ").";
      col_start = col_chunk.meta_data.dictionary_page_offset;
    int64_t col_len = col_chunk.meta_data.total_compressed_size;
    int64_t col_end = col_start + col_len;
    if (col_end <= 0 || col_end > file_desc->file_length) {
      ss << "File " << file_desc->filename << ": metadata is corrupt. "
         << "Column " << column_readers_[i]->col_idx() << " has invalid column offsets "
         << "(offset=" << col_start << ", size=" << col_len << ", "
         << "file_size=" << file_desc->file_length << ").";

    int64_t bytes_remaining = file_desc->file_length - col_end;
    if (!col_chunk.file_path.empty()) {
    col_ranges.push_back(col_range);

    DCHECK(stream != NULL);
        col_chunk.meta_data.codec != parquet::CompressionCodec::UNCOMPRESSED) {
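// Each materialized column chunk becomes its own scan range covering
// [col_start, col_start + col_len). When a dictionary page offset is present it must
// precede the data pages, and col_start is pulled back to that offset so the
// dictionary is read before any values that reference it.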
  int max_def_level = 0;

    const vector<parquet::SchemaElement>& schema, int max_def_level, int* idx,
  if (*idx >= schema.size()) {
    return Status(Substitute("File $0 corrupt: could not reconstruct schema tree from "

  if (node->element->num_children == 0) {
  if (node->element->repetition_type == parquet::FieldRepetitionType::OPTIONAL) {
  for (int i = 0; i < node->element->num_children; ++i) {
  string created_by_lower = created_by;
  std::transform(created_by_lower.begin(), created_by_lower.end(),
      created_by_lower.begin(), ::tolower);
  is_impala_internal = false;

  vector<string> tokens;
  split(tokens, created_by_lower, is_any_of(" "), token_compress_on);
  DCHECK_GT(tokens.size(), 0);
  application = tokens[0];

  if (tokens.size() >= 3 && tokens[1] == "version") {
    string version_string = tokens[2];
    int n = version_string.find_first_not_of("0123456789.");
    string version_string_trimmed = version_string.substr(0, n);
    vector<string> version_tokens;
    split(version_tokens, version_string_trimmed, is_any_of("."));
    version.major = version_tokens.size() >= 1 ? atoi(version_tokens[0].c_str()) : 0;
    version.minor = version_tokens.size() >= 2 ? atoi(version_tokens[1].c_str()) : 0;
    version.patch = version_tokens.size() >= 3 ? atoi(version_tokens[2].c_str()) : 0;

    if (application == "impala") {
      if (version_string.find("-internal") != string::npos) is_impala_internal = true;
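// A worked example of the parsing above (the created_by string is illustrative; real
// writers embed their own build details). Given
//
//   created_by = "impala version 1.4.0-internal (build abc123)"
//
// the lower-cased tokens are {"impala", "version", "1.4.0-internal", ...}, so
// application becomes "impala", the version string is trimmed at the first character
// that is not a digit or '.', giving "1.4.0", version ends up as {1, 4, 0}, and
// is_impala_internal is set because the untrimmed token contains "-internal".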
  if (version.major < major) return true;
  if (version.major > major) return false;
  DCHECK_EQ(version.major, major);
  if (version.minor < minor) return true;
  if (version.minor > minor) return false;
  DCHECK_EQ(version.minor, minor);
  return version.patch < patch;

  return version.major == major && version.minor == minor && version.patch == patch;
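// VersionLt() orders (major, minor, patch) lexicographically and VersionEq() checks
// exact equality, so a hypothetical guard such as
//
//   if (file_version_.application == "impala" && file_version_.VersionLt(1, 2, 0)) { ... }
//
// would match any Impala-written file older than release 1.2.0.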
    ss << "File: " << stream_->filename() << " is of an unsupported version. "
    case parquet::Encoding::PLAIN_DICTIONARY:
    case parquet::Encoding::BIT_PACKED:
    case parquet::Encoding::RLE:
  int col_idx = col_reader.col_idx();
  const parquet::SchemaElement& schema_element = col_reader.schema_element();
  parquet::ColumnChunk& file_data =

  vector<parquet::Encoding::type>& encodings = file_data.meta_data.encodings;
  for (int i = 0; i < encodings.size(); ++i) {
         << PrintEncoding(encodings[i]) << " for column '" << schema_element.name

  if (file_data.meta_data.codec != parquet::CompressionCodec::UNCOMPRESSED &&
      file_data.meta_data.codec != parquet::CompressionCodec::SNAPPY &&
      file_data.meta_data.codec != parquet::CompressionCodec::GZIP) {
       << file_data.meta_data.codec << " for column '" << schema_element.name

  if (type != file_data.meta_data.type) {
       << " table schema for column '" << schema_element.name << "'. Expected type: "
       << type << ". Actual type: " << file_data.meta_data.type;

  if (schema_element.repetition_type != parquet::FieldRepetitionType::OPTIONAL &&
      schema_element.repetition_type != parquet::FieldRepetitionType::REQUIRED) {
       << "' contains an unsupported column repetition type: "
       << schema_element.repetition_type;

  bool is_converted_type_decimal = schema_element.__isset.converted_type &&
      schema_element.converted_type == parquet::ConvertedType::DECIMAL;

    if (schema_element.type != parquet::Type::FIXED_LEN_BYTE_ARRAY) {
         << "' should be a decimal column encoded using FIXED_LEN_BYTE_ARRAY.";
    if (!schema_element.__isset.type_length) {
         << "' does not have type_length set.";
    if (schema_element.type_length != expected_len) {
         << "' has an invalid type length. Expecting: " << expected_len
         << " len in file: " << schema_element.type_length;

    if (!schema_element.__isset.scale) {
         << "' does not have the scale set.";
    if (schema_element.scale != slot_desc->type().scale) {
         << "' has a scale that does not match the table metadata scale."
         << " File metadata scale: " << schema_element.scale
         << " Table metadata scale: " << slot_desc->type().scale;

    if (!schema_element.__isset.precision) {
      ErrorMsg msg(TErrorCode::PARQUET_MISSING_PRECISION,
    if (schema_element.precision != slot_desc->type().precision) {
      ErrorMsg msg(TErrorCode::PARQUET_WRONG_PRECISION,
    if (!is_converted_type_decimal) {
      ErrorMsg msg(TErrorCode::PARQUET_BAD_CONVERTED_TYPE,
  } else if (schema_element.__isset.scale || schema_element.__isset.precision ||
      is_converted_type_decimal) {
    ErrorMsg msg(TErrorCode::PARQUET_INCOMPATIBLE_DECIMAL,
    case parquet::FieldRepetitionType::REQUIRED: return "required";
    case parquet::FieldRepetitionType::OPTIONAL: return "optional";
    case parquet::FieldRepetitionType::REPEATED: return "repeated";
    default: return "<unknown>";

    case parquet::Type::BOOLEAN: return "boolean";
    case parquet::Type::INT32: return "int32";
    case parquet::Type::INT64: return "int64";
    case parquet::Type::INT96: return "int96";
    case parquet::Type::FLOAT: return "float";
    case parquet::Type::DOUBLE: return "double";
    case parquet::Type::BYTE_ARRAY: return "byte_array";
    case parquet::Type::FIXED_LEN_BYTE_ARRAY: return "fixed_len_byte_array";
    default: return "<unknown>";
  for (int i = 0; i < indent; ++i) ss << " ";
  if (element->num_children > 0) {
  ss << " " << element->name << " [i:" << col_idx << " d:" << max_def_level << "]";
  if (element->num_children > 0) {
    for (int i = 0; i < element->num_children; ++i) {
      ss << children[i].DebugString(indent + 2) << endl;
    for (int i = 0; i < indent; ++i) ss << " ";