#include "gen-cpp/ImpalaService_types.h"

using namespace impala;
using namespace parquet;
using namespace apache::thrift;
BaseColumnWriter(HdfsParquetTableWriter* parent, ExprContext* expr_ctx,
    const THdfsCompression::type& codec)
  : parent_(parent), expr_ctx_(expr_ctx), codec_(codec),
    page_size_(DEFAULT_DATA_PAGE_SIZE), current_page_(NULL), num_values_(0),
    total_compressed_byte_size_(0),
    total_uncompressed_byte_size_(0),
    dict_encoder_base_(NULL),
    values_buffer_len_(DEFAULT_DATA_PAGE_SIZE) {
  // Definition levels are encoded as 1-bit RLE; the encoder writes into a buffer
  // drawn from the column's reusable memory pool.
  def_levels_ = parent_->state_->obj_pool()->Add(
      new RleEncoder(parent_->reusable_col_mem_pool_->Allocate(DEFAULT_DATA_PAGE_SIZE),
          DEFAULT_DATA_PAGE_SIZE, 1));
  values_buffer_ = parent_->reusable_col_mem_pool_->Allocate(values_buffer_len_);
}
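// --- Illustrative sketch (not part of the original source) ------------------------
// How the 1-bit RLE definition levels above are produced: each appended value records
// a single bit (non-NULL = 1, NULL = 0). Only RleEncoder calls that already appear in
// this file (Put/Flush/len/buffer) are used; the buffer size and sample values are
// made up for the example.
static void ExampleDefinitionLevels() {
  uint8_t scratch[64];
  RleEncoder def_levels(scratch, sizeof(scratch), 1 /* bit width */);
  const bool non_null[] = { true, true, false, true };  // third value is NULL
  for (int i = 0; i < 4; ++i) def_levels.Put(non_null[i]);
  def_levels.Flush();
  // def_levels.len() bytes starting at def_levels.buffer() now hold the encoded
  // levels; FinalizeCurrentPage() below copies them into the page behind a 4-byte
  // length prefix.
}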
// Flushes all buffered pages for this column. *file_pos is advanced by the bytes
// written; *first_data_page and *first_dictionary_page receive the file offsets of
// those pages (-1 if the column has no dictionary page).
Status Flush(int64_t* file_pos, int64_t* first_data_page,
    int64_t* first_dictionary_page);

// In Reset(): clear the per-row-group state so the buffers can be reused.
current_page_ = NULL;
total_compressed_byte_size_ = 0;

// In Close(): release the compressor and any accumulated dictionary indices.
if (compressor_.get() != NULL) compressor_->Close();
if (dict_encoder_base_ != NULL) dict_encoder_base_->ClearIndices();
parquet::CompressionCodec::type codec() const {
  return IMPALA_TO_PARQUET_CODEC[codec_];
}

// Encodes 'value' into the current page. Returns false if the page is full and the
// caller must start a new page; *bytes_needed is set to the encoded size of 'value'.
virtual bool EncodeValue(void* value, int64_t* bytes_needed) = 0;

virtual void FinalizeCurrentPage();

void WriteDictDataPage();
ColumnWriter(HdfsParquetTableWriter* parent, ExprContext* ctx,
    const THdfsCompression::type& codec)
  : BaseColumnWriter(parent, ctx, codec),
    num_values_since_dict_size_check_(0) {
  // (constructor body elided)
}

virtual void Reset() {
  BaseColumnWriter::Reset();
  // Start each row group with dictionary encoding; EncodeValue() falls back to plain
  // encoding if the dictionary grows too large.
  current_encoding_ = Encoding::PLAIN_DICTIONARY;
  dict_encoder_.reset(
      new DictEncoder<T>(parent_->per_file_mem_pool_.get(), encoded_value_size_));
  dict_encoder_base_ = dict_encoder_.get();
}
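// --- Illustrative sketch (not part of the original source) ------------------------
// Dictionary-encoding life cycle for one row group, using only DictEncoder calls that
// appear in this file (Put/WriteData/WriteDict/ClearIndices). The MemPool argument,
// buffer sizes, and sample values are made up for the example.
static void ExampleDictionaryColumn(MemPool* pool) {
  DictEncoder<int32_t> dict(pool, sizeof(int32_t));
  int32_t values[] = { 7, 7, 42, 7 };
  for (int i = 0; i < 4; ++i) dict.Put(values[i]);  // buffers dictionary indices

  // When a data page is finalized (WriteDictDataPage() below), the buffered indices
  // are materialized into the page's values buffer:
  uint8_t page_buffer[1024];
  int data_len = dict.WriteData(page_buffer, sizeof(page_buffer));
  (void)data_len;  // a negative length would mean the buffer is too small

  // At Flush() time the dictionary itself is written once, ahead of the data pages;
  // dict_encoded_size() bytes are needed and num_entries() is 2 in this example.
  uint8_t dict_buffer[1024];
  dict.WriteDict(dict_buffer);
  dict.ClearIndices();
}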
// In EncodeValue(): dictionary-encode the value while the dictionary is still viable,
// otherwise plain-encode it directly into the values buffer.
if (current_encoding_ == Encoding::PLAIN_DICTIONARY) {
  // Periodically check whether the dictionary-encoded data has outgrown the target
  // page size; if so, tell the caller to finalize this page.
  if (UNLIKELY(num_values_since_dict_size_check_ >=
      DICTIONARY_DATA_PAGE_SIZE_CHECK_PERIOD)) {
    num_values_since_dict_size_check_ = 0;
    if (dict_encoder_->EstimatedDataEncodedSize() >= page_size_) return false;
  }
  ++num_values_since_dict_size_check_;
  *bytes_needed = dict_encoder_->Put(*CastValue(value));
  // If the dictionary is full, finalize the dictionary-encoded page and fall back to
  // plain encoding for the rest of the column.
  if (UNLIKELY(*bytes_needed < 0)) {
    FinalizeCurrentPage();
    current_encoding_ = Encoding::PLAIN;
    return false;
  }
  parent_->file_size_estimate_ += *bytes_needed;
} else if (current_encoding_ == Encoding::PLAIN) {
  T* v = CastValue(value);
  *bytes_needed = encoded_value_size_ < 0 ?
      ParquetPlainEncoder::ByteSize<T>(*v) : encoded_value_size_;
  if (current_page_->header.uncompressed_page_size + *bytes_needed > page_size_) {
    return false;
  }
  uint8_t* dst_ptr = values_buffer_ + current_page_->header.uncompressed_page_size;
  int64_t written_len =
      ParquetPlainEncoder::Encode(dst_ptr, encoded_value_size_, *v);
  DCHECK_EQ(*bytes_needed, written_len);
  current_page_->header.uncompressed_page_size += written_len;
}
return true;
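// --- Illustrative sketch (not part of the original source) ------------------------
// Plain encoding as used above: fixed-width types use encoded_value_size_, while
// variable-length types (encoded_value_size_ < 0) ask ParquetPlainEncoder::ByteSize()
// per value; ParquetPlainEncoder::Encode() then writes the value at the page's
// current end. The int32 example below assumes a 4-byte fixed encoding.
static int64_t ExamplePlainEncode(uint8_t* dst, int32_t v) {
  int bytes = ParquetPlainEncoder::ByteSize<int32_t>(v);          // 4 for int32
  int64_t written = ParquetPlainEncoder::Encode(dst, bytes, v);
  return written;  // equals 'bytes'
}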
// Check the estimated dictionary-encoded data size every this many values.
static const int DICTIONARY_DATA_PAGE_SIZE_CHECK_PERIOD = 100;

// Reinterpret the untyped slot pointer as the column's native type.
T* CastValue(void* value) {
  return reinterpret_cast<T*>(value);
}
BoolColumnWriter(HdfsParquetTableWriter* parent, ExprContext* ctx,
    const THdfsCompression::type& codec)
  : BaseColumnWriter(parent, ctx, codec) {
  // Booleans are bit-packed directly into the values buffer.
  bool_values_ = parent_->state_->obj_pool()->Add(
      new BitWriter(values_buffer_, values_buffer_len_));
  // Dictionary encoding is not used for booleans.
  dict_encoder_base_ = NULL;
}

// In EncodeValue(): append the boolean as a single bit.
return bool_values_->PutValue(*reinterpret_cast<bool*>(value), 1);

// In FinalizeCurrentPage(): flush the bit writer into the page, then let the base
// class finish the page.
DCHECK(current_page_ != NULL);
if (current_page_->finalized) return;
bool_values_->Flush();
int num_bytes = bool_values_->bytes_written();
current_page_->header.uncompressed_page_size += num_bytes;
BaseColumnWriter::FinalizeCurrentPage();
bool_values_->Clear();
// In AppendRow(): evaluate the output expression for this column and encode the
// value, retrying on a fresh page whenever the current one fills up.
void* value = expr_ctx_->GetValue(row);
if (current_page_ == NULL) NewPage();

while (true) {
  // Record the definition level (non-NULL = 1, NULL = 0). If the level buffer is
  // full, finalize this page and start a new one.
  if (!def_levels_->Put(value != NULL)) {
    FinalizeCurrentPage();
    NewPage();
    bool ret = def_levels_->Put(value != NULL);
    DCHECK(ret);
  }

  // NULLs are represented by the definition level alone; no value is encoded.
  if (value == NULL) break;
  ++current_page_->num_non_null;

  int64_t bytes_needed = 0;
  if (EncodeValue(value, &bytes_needed)) break;

  // The value did not fit on the current page; finalize it and retry.
  FinalizeCurrentPage();

  // If a single value is larger than the target page size, grow the page (up to the
  // hard maximum) and reallocate the values buffer before retrying.
  if (UNLIKELY(bytes_needed > page_size_)) {
    page_size_ = bytes_needed;
    if (page_size_ > MAX_DATA_PAGE_SIZE) {
      stringstream ss;
      ss << "Cannot write value of size "                   // << <value size> (elided)
         << "data page that exceeds the max page limit ";   // << <max size> (elided)
      return Status(ss.str());
    }
    values_buffer_len_ = page_size_;
    values_buffer_ = parent_->reusable_col_mem_pool_->Allocate(values_buffer_len_);
  }
  NewPage();
}

++current_page_->header.data_page_header.num_values;
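// --- Illustrative sketch (not part of the original source) ------------------------
// The page-growth policy of the retry loop above, isolated as a pure function: a
// value larger than the current target page size grows the page, but never beyond
// the hard MAX_DATA_PAGE_SIZE cap. The function name is made up for illustration.
static bool ExampleGrowPageSize(int64_t bytes_needed, int64_t* page_size) {
  if (bytes_needed <= *page_size) return true;           // fits in a fresh page as-is
  if (bytes_needed > MAX_DATA_PAGE_SIZE) return false;   // caller must report an error
  *page_size = bytes_needed;                             // grow the page and retry
  return true;
}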
// In WriteDictDataPage(): materialize the buffered dictionary indices into the
// values buffer, doubling the buffer until the encoded data fits.
DCHECK(dict_encoder_base_ != NULL);
DCHECK_EQ(current_page_->header.uncompressed_page_size, 0);
if (current_page_->num_non_null == 0) return;
int len = dict_encoder_base_->WriteData(values_buffer_, values_buffer_len_);
while (UNLIKELY(len < 0)) {
  // A negative length means the buffer was too small; grow it and retry.
  values_buffer_len_ *= 2;
  values_buffer_ = parent_->reusable_col_mem_pool_->Allocate(values_buffer_len_);
  len = dict_encoder_base_->WriteData(values_buffer_, values_buffer_len_);
}
dict_encoder_base_->ClearIndices();
current_page_->header.uncompressed_page_size = len;
Status HdfsParquetTableWriter::BaseColumnWriter::Flush(int64_t* file_pos,
    int64_t* first_data_page, int64_t* first_dictionary_page) {
  if (current_page_ == NULL) {
    // This column (and file) is empty.
    *first_data_page = *file_pos;
    *first_dictionary_page = -1;
    return Status::OK;
  }

  FinalizeCurrentPage();

  *first_dictionary_page = -1;
  // Write the dictionary page (if any) ahead of the data pages.
  if (dict_encoder_base_ != NULL) {
    *first_dictionary_page = *file_pos;

    DictionaryPageHeader dict_header;
    dict_header.num_values = dict_encoder_base_->num_entries();
    dict_header.encoding = Encoding::PLAIN_DICTIONARY;

    PageHeader header;
    header.type = PageType::DICTIONARY_PAGE;
    header.uncompressed_page_size = dict_encoder_base_->dict_encoded_size();
    header.__set_dictionary_page_header(dict_header);

    // Materialize the dictionary, compressing it if a codec is configured.
    uint8_t* dict_buffer = parent_->per_file_mem_pool_->Allocate(
        header.uncompressed_page_size);
    dict_encoder_base_->WriteDict(dict_buffer);
    if (compressor_.get() != NULL) {
      int64_t max_compressed_size =
          compressor_->MaxOutputLen(header.uncompressed_page_size);
      DCHECK_GT(max_compressed_size, 0);
      uint8_t* compressed_data =
          parent_->per_file_mem_pool_->Allocate(max_compressed_size);
      header.compressed_page_size = max_compressed_size;
      compressor_->ProcessBlock32(true, header.uncompressed_page_size, dict_buffer,
          &header.compressed_page_size, &compressed_data);
      dict_buffer = compressed_data;
      // The output was allocated at the guessed maximum size; return the unused
      // bytes to the memory pool.
      parent_->per_file_mem_pool_->ReturnPartialAllocation(
          max_compressed_size - header.compressed_page_size);
    } else {
      header.compressed_page_size = header.uncompressed_page_size;
    }

    // Serialize and write the dictionary page header, then the dictionary data.
    uint8_t* header_buffer;
    uint32_t header_len;
    RETURN_IF_ERROR(parent_->thrift_serializer_->Serialize(
        &header, &header_len, &header_buffer));
    RETURN_IF_ERROR(parent_->Write(header_buffer, header_len));
    *file_pos += header_len;
    total_compressed_byte_size_ += header_len;
    total_uncompressed_byte_size_ += header_len;

    RETURN_IF_ERROR(parent_->Write(dict_buffer, header.compressed_page_size));
    *file_pos += header.compressed_page_size;
    total_compressed_byte_size_ += header.compressed_page_size;
    total_uncompressed_byte_size_ += header.uncompressed_page_size;
  }

  *first_data_page = *file_pos;
  // Write the data pages.
  for (int i = 0; i < num_data_pages_; ++i) {
    DataPage& page = pages_[i];

    // The last page can be empty; skip it.
    if (page.header.data_page_header.num_values == 0) {
      DCHECK_EQ(page.header.compressed_page_size, 0);
      DCHECK_EQ(i, num_data_pages_ - 1);
      continue;
    }

    // Serialize and write the page header, followed by the page data.
    uint8_t* buffer = NULL;
    uint32_t len = 0;
    RETURN_IF_ERROR(
        parent_->thrift_serializer_->Serialize(&page.header, &len, &buffer));
    RETURN_IF_ERROR(parent_->Write(buffer, len));
    *file_pos += len;

    RETURN_IF_ERROR(parent_->Write(page.data, page.header.compressed_page_size));
    *file_pos += page.header.compressed_page_size;
  }
  return Status::OK;
}
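// --- Illustrative sketch (not part of the original source) ------------------------
// Byte layout produced by Flush() for one column chunk and how *file_pos advances:
// an optional dictionary page first, then the data pages, each written as a thrift
// PageHeader followed by the page bytes. The sizes below are made up.
static int64_t ExampleColumnChunkLayout(int64_t file_pos) {
  const int64_t dict_header_len = 14, dict_page_len = 120;    // hypothetical sizes
  const int64_t data_header_len = 20, data_page_len = 4000;

  int64_t first_dictionary_page = file_pos;                   // dictionary page offset
  file_pos += dict_header_len + dict_page_len;

  int64_t first_data_page = file_pos;                         // first data page offset
  file_pos += data_header_len + data_page_len;                // repeated per data page

  (void)first_dictionary_page; (void)first_data_page;
  return file_pos;
}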
// In FinalizeCurrentPage(): assemble the definition levels and values into one
// uncompressed page image, compress it if needed, and account for the page header.
DCHECK(current_page_ != NULL);
if (current_page_->finalized) return;

// A page containing only NULLs is written as PLAIN; there is no point emitting a
// dictionary page for it.
if (current_page_->num_non_null == 0) current_encoding_ = Encoding::PLAIN;

if (current_encoding_ == Encoding::PLAIN_DICTIONARY) WriteDictDataPage();

PageHeader& header = current_page_->header;
header.data_page_header.encoding = current_encoding_;

// Account for the definition-level bytes (a 4-byte length plus the RLE data).
def_levels_->Flush();
current_page_->num_def_bytes = sizeof(int32_t) + def_levels_->len();
header.uncompressed_page_size += current_page_->num_def_bytes;

// Combine the definition levels and values into a single buffer: allocate it from
// the per-file pool if the page is written uncompressed, otherwise stage it in the
// shared compression buffer.
uint8_t* uncompressed_data = NULL;
if (compressor_.get() == NULL) {
  uncompressed_data =
      parent_->per_file_mem_pool_->Allocate(header.uncompressed_page_size);
} else {
  parent_->compression_staging_buffer_.resize(
      header.uncompressed_page_size);
  uncompressed_data = &parent_->compression_staging_buffer_[0];
}

BufferBuilder buffer(uncompressed_data, header.uncompressed_page_size);

// Copy the definition (null) data: a 4-byte length followed by the RLE levels.
int num_def_level_bytes = def_levels_->len();
buffer.Append(num_def_level_bytes);
buffer.Append(def_levels_->buffer(), num_def_level_bytes);
// ... the encoded values from values_buffer_ are appended next (elided) ...

// Apply compression if necessary.
if (compressor_.get() == NULL) {
  current_page_->data = reinterpret_cast<uint8_t*>(uncompressed_data);
  header.compressed_page_size = header.uncompressed_page_size;
} else {
  int64_t max_compressed_size =
      compressor_->MaxOutputLen(header.uncompressed_page_size);
  DCHECK_GT(max_compressed_size, 0);
  uint8_t* compressed_data =
      parent_->per_file_mem_pool_->Allocate(max_compressed_size);
  header.compressed_page_size = max_compressed_size;
  compressor_->ProcessBlock32(true, header.uncompressed_page_size, uncompressed_data,
      &header.compressed_page_size, &compressed_data);
  current_page_->data = compressed_data;

  // Return the unused part of the over-allocated compression output buffer.
  parent_->per_file_mem_pool_->ReturnPartialAllocation(
      max_compressed_size - header.compressed_page_size);
}

// Serialize the page header so its size can be added to the running totals.
uint8_t* header_buffer;
uint32_t header_len = 0;
parent_->thrift_serializer_->Serialize(
    &current_page_->header, &header_len, &header_buffer);

current_page_->finalized = true;
total_compressed_byte_size_ += header_len + header.compressed_page_size;
total_uncompressed_byte_size_ += header_len + header.uncompressed_page_size;
parent_->file_size_estimate_ += header_len + header.compressed_page_size;
def_levels_->Clear();
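// --- Illustrative sketch (not part of the original source) ------------------------
// The uncompressed data-page image assembled in FinalizeCurrentPage() above: a 4-byte
// definition-level length, the RLE definition levels, then the encoded values. Only
// the BufferBuilder::Append() calls used above are assumed; the parameters and
// function name are made up for the example.
static void ExampleDataPageImage(const uint8_t* def_level_data, int num_def_level_bytes,
    const uint8_t* value_data, int value_bytes, uint8_t* out, int out_len) {
  BufferBuilder buffer(out, out_len);
  buffer.Append(num_def_level_bytes);                   // 4-byte length prefix
  buffer.Append(def_level_data, num_def_level_bytes);   // RLE-encoded def levels
  buffer.Append(value_data, value_bytes);               // encoded column values
}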
// In NewPage(): reuse an already-allocated page if one is available, otherwise
// append a new one, then reset its header fields.
if (num_data_pages_ < pages_.size()) {
  // Reuse an existing page.
  current_page_ = &pages_[num_data_pages_++];
  current_page_->header.data_page_header.num_values = 0;
  current_page_->header.compressed_page_size = 0;
  current_page_->header.uncompressed_page_size = 0;
} else {
  pages_.push_back(DataPage());
  current_page_ = &pages_[num_data_pages_++];

  DataPageHeader header;
  header.num_values = 0;
  header.definition_level_encoding = Encoding::RLE;
  header.repetition_level_encoding = Encoding::BIT_PACKED;
  current_page_->header.__set_data_page_header(header);
}
current_page_->finalized = false;
current_page_->num_non_null = 0;
// HdfsParquetTableWriter constructor (some member initializers elided).
HdfsParquetTableWriter::HdfsParquetTableWriter(HdfsTableSink* parent,
    RuntimeState* state, OutputPartition* output, const HdfsPartitionDescriptor* part_desc,
    const HdfsTableDescriptor* table_desc, const vector<ExprContext*>& output_expr_ctxs)
  : HdfsTableWriter(
        parent, state, output, part_desc, table_desc, output_expr_ctxs),
    current_row_group_(NULL),
    reusable_col_mem_pool_(new MemPool(parent_->mem_tracker())),
    per_file_mem_pool_(new MemPool(parent_->mem_tracker())) {
}
// In Init(): record the writer version, pick the compression codec, and create one
// column writer per output column.
stringstream created_by;
// ... created_by is filled with the Impala build version and hash (elided) ...

// Default to Snappy compression; the query option can override it.
THdfsCompression::type codec = THdfsCompression::SNAPPY;
const TQueryOptions& query_options = state_->query_options();
if (query_options.__isset.compression_codec) {
  codec = query_options.compression_codec;
}
// Parquet files written by Impala support only these codecs.
if (!(codec == THdfsCompression::NONE ||
      codec == THdfsCompression::GZIP ||
      codec == THdfsCompression::SNAPPY)) {
  // ... return an error Status naming the unsupported codec (elided) ...
}

VLOG_FILE << "Using compression codec: " << codec;

for (int i = 0; i < columns_.size(); ++i) {
  // A switch on the column type (elided) constructs the matching writer class; every
  // case passes the same arguments: (this, output_expr_ctxs_[i], codec).
}
// Building the Parquet schema: one SchemaElement per materialized column; every
// column is written as OPTIONAL. Type-specific annotations are applied in branches
// that are elided here.
for (int i = 0; i < columns_.size(); ++i) {
  node.__set_repetition_type(FieldRepetitionType::OPTIONAL);

  // For DECIMAL columns: record the converted type and the fixed byte width of each
  // value (precision and scale are also set, elided).
  node.__set_converted_type(ConvertedType::DECIMAL);
  node.__set_type_length(
      ParquetPlainEncoder::DecimalSize(output_expr_ctxs_[i]->root()->type()));

  // For string columns: annotate with the UTF8 converted type.
  node.__set_converted_type(ConvertedType::UTF8);
}
// Per-column metadata for a new row group: definition levels are RLE encoded and
// values default to dictionary encoding; the codec comes from the column writer.
for (int i = 0; i < columns_.size(); ++i) {
  ColumnMetaData metadata;
  metadata.encodings.push_back(Encoding::RLE);
  metadata.encodings.push_back(Encoding::PLAIN_DICTIONARY);
  metadata.codec = columns_[i]->codec();
}
// Error reported when the configured Parquet file size cannot hold even the minimum
// block for this many columns.
ss << "Parquet file size " << file_size_limit_ << " bytes is too small for "
   << "a table with " << columns_.size() << " columns. Set query option "
   << "PARQUET_FILE_SIZE to at least " << MinBlockSize() << ".";
// AppendRowBatch(): append either every row of the batch or only the rows selected
// by 'row_group_indices'.
Status AppendRowBatch(RowBatch* batch,
    const vector<int32_t>& row_group_indices, bool* new_file) {
  int limit;
  if (row_group_indices.empty()) {
    limit = batch->num_rows();
  } else {
    limit = row_group_indices.size();
  }
  bool all_rows = row_group_indices.empty();
  // For each selected row (outer loop elided), append the row to every column writer.
  for (int j = 0; j < columns_.size(); ++j) {
    // ... columns_[j]->AppendRow(...) ...
  }
  // ... (rest of AppendRowBatch elided) ...
}

// Another per-column pass (body elided).
for (int i = 0; i < columns_.size(); ++i) {
}

// Flushing the current row group: each column is flushed and its page offsets and
// byte totals are recorded in the row-group metadata.
for (int i = 0; i < columns_.size(); ++i) {
  int64_t data_page_offset, dict_page_offset;
  // ... columns_[i]->Flush(...) sets the offsets (elided) ...
  DCHECK_GT(data_page_offset, 0);
  if (dict_page_offset >= 0) {
    // Only dictionary-encoded columns record a dictionary page offset (elided).
  }
  current_row_group_->columns[i].meta_data.total_uncompressed_size =
      columns_[i]->total_uncompressed_size();
  current_row_group_->columns[i].meta_data.total_compressed_size =
      columns_[i]->total_compressed_size();
}
// Scratch variables for thrift serialization when writing metadata (fragment).
uint8_t* buffer = NULL;

// In WriteFileFooter(): serialize file_metadata_ into a buffer and write it, followed
// by its 4-byte length and the Parquet magic number (writes elided).
uint32_t file_metadata_len = 0;
uint8_t* buffer = NULL;
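// --- Illustrative sketch (not part of the original source) ------------------------
// The Parquet footer written by WriteFileFooter(): the thrift-serialized FileMetaData,
// its 4-byte length, and the magic bytes (PARQUET_VERSION_NUMBER, "PAR1"). The helper
// below only illustrates the ordering; 'write' is a made-up stand-in for the writer's
// Write() call.
static void ExampleFooterLayout(const uint8_t* metadata, uint32_t metadata_len,
    void (*write)(const uint8_t* data, int len)) {
  write(metadata, metadata_len);                                  // FileMetaData thrift
  write(reinterpret_cast<const uint8_t*>(&metadata_len), 4);      // footer length
  write(PARQUET_VERSION_NUMBER, sizeof(PARQUET_VERSION_NUMBER));  // magic bytes
}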