Impala
Impala is the open source, native analytic database for Apache Hadoop.
|
#include <hdfs-table-sink.h>
Public Member Functions | |
HdfsTableSink (const RowDescriptor &row_desc, const std::vector< TExpr > &select_list_texprs, const TDataSink &tsink) | |
virtual Status | Prepare (RuntimeState *state) |
Prepares output_exprs and partition_key_exprs, and connects to HDFS. | |
virtual Status | Open (RuntimeState *state) |
virtual Status | Send (RuntimeState *state, RowBatch *batch, bool eos) |
Append all rows in batch to the temporary Hdfs files corresponding to partitions. | |
virtual void | Close (RuntimeState *state) |
virtual RuntimeProfile * | profile () |
Returns the runtime profile for the sink. | |
const HdfsTableDescriptor & | TableDesc () |
MemTracker * | mem_tracker () |
RuntimeProfile::Counter * | rows_inserted_counter () |
RuntimeProfile::Counter * | bytes_written_counter () |
RuntimeProfile::Counter * | encode_timer () |
RuntimeProfile::Counter * | hdfs_write_timer () |
RuntimeProfile::Counter * | compress_timer () |
std::string | DebugString () const |
Static Public Member Functions | |
static Status | GetFileBlockSize (OutputPartition *output_partition, int64_t *size) |
static Status | CreateDataSink (ObjectPool *pool, const TDataSink &thrift_sink, const std::vector< TExpr > &output_exprs, const TPlanFragmentExecParams &params, const RowDescriptor &row_desc, boost::scoped_ptr< DataSink > *sink) |
static void | MergeInsertStats (const TInsertStats &src_stats, TInsertStats *dst_stats) |
static std::string | OutputInsertStats (const PartitionStatusMap &stats, const std::string &prefix="") |
Outputs the insert stats contained in the map of insert partition updates to a string. | |
Protected Attributes | |
bool | closed_ |
boost::scoped_ptr< MemTracker > | expr_mem_tracker_ |
Private Types | |
typedef std::pair < OutputPartition *, std::vector< int32_t > > | PartitionPair |
typedef boost::unordered_map < std::string, PartitionPair > | PartitionMap |
typedef boost::unordered_map < std::string, HdfsPartitionDescriptor * > | PartitionDescriptorMap |
Private Member Functions | |
Status | InitOutputPartition (RuntimeState *state, const HdfsPartitionDescriptor &partition_descriptor, OutputPartition *output_partition) |
Initialises the filenames of a given output partition, and opens the temporary file. | |
Status | CreateNewTmpFile (RuntimeState *state, OutputPartition *output_partition) |
void | GetHashTblKey (const std::vector< ExprContext * > &ctxs, std::string *key) |
Status | GetOutputPartition (RuntimeState *state, const std::string &key, PartitionPair **partition_pair) |
Status | PrepareExprs (RuntimeState *state) |
Initialise and prepare select and partition key expressions. | |
void | BuildHdfsFileNames (const HdfsPartitionDescriptor &partition_descriptor, OutputPartition *output) |
Status | FinalizePartitionFile (RuntimeState *state, OutputPartition *partition) |
void | ClosePartitionFile (RuntimeState *state, OutputPartition *partition) |
Closes the hdfs file for this partition as well as the writer. | |
The sink consumes all row batches of its child execution tree, and writes the evaluated output_exprs into temporary Hdfs files. The query coordinator moves the temporary files into their final locations after the sinks have finished executing. This sink supports static and dynamic partition inserts (Hive terminology), as well as inserting into unpartitioned tables, and optional overwriting of partitions/tables.
Files and partitions: This sink writes one or more Hdfs files per output partition, corresponding to an Hdfs directory. The Hdfs file names depend on unique_id, so we rely on the global uniqueness of unique_id, i.e., no two HdfsTableSinks may be constructed with the same unique_id. For each row, its target partition is computed based on the partition_key_exprs from tsink. A map of opened Hdfs files (corresponding to partitions) is maintained. Each row may belong to a different partition than the one before it.
Failure behavior: In Exec() all data is written to Hdfs files in a temporary directory. In Close() all temporary Hdfs files are moved to their final locations, while also removing the original files if overwrite was specified, as follows:
Definition at line 122 of file hdfs-table-sink.h.
|
private |
Map from row key (i.e. concatenated non-constant partition keys) to partition descriptor. We don't own the HdfsPartitionDescriptors, they belong to the table descriptor. The key is generated by GetHashTblKey() from the keys in a row.
Definition at line 266 of file hdfs-table-sink.h.
|
private |
Definition at line 177 of file hdfs-table-sink.h.
|
private |
Key is the concatenation of the evaluated dynamic_partition_key_exprs_ generated by GetHashTblKey(). Maps to a pair of an OutputPartition, which is owned by the object pool, and a vector of rows to insert into this partition from the current row batch.
Definition at line 176 of file hdfs-table-sink.h.
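The shape of this map can be pictured with a small standalone sketch. It is not Impala's code: SketchPartition, SketchPartitionMap and GroupRowsByKey are hypothetical names, std containers replace the boost ones, and plain strings stand in for the keys that GetHashTblKey() would produce. The sketch only shows one entry per key, holding a partition object plus the indices of the rows in the current batch that belong to it.

```cpp
#include <cstdint>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

// Hypothetical stand-in for OutputPartition; only a name is tracked here.
// (The real PartitionPair stores an OutputPartition pointer instead.)
struct SketchPartition {
  std::string partition_name;
};

// Mirrors the shape of PartitionPair / PartitionMap using std containers.
using SketchPartitionPair = std::pair<SketchPartition, std::vector<int32_t>>;
using SketchPartitionMap = std::unordered_map<std::string, SketchPartitionPair>;

// Assigns each row index of a batch to the entry for its partition key,
// creating the entry the first time a key is seen. row_keys[i] is assumed
// to be the precomputed partition key of row i.
inline void GroupRowsByKey(const std::vector<std::string>& row_keys,
                           SketchPartitionMap* partitions) {
  for (int32_t i = 0; i < static_cast<int32_t>(row_keys.size()); ++i) {
    auto it = partitions->find(row_keys[i]);
    if (it == partitions->end()) {
      SketchPartitionPair entry{SketchPartition{row_keys[i]},
                                std::vector<int32_t>{}};
      it = partitions->emplace(row_keys[i], entry).first;
    }
    it->second.second.push_back(i);
  }
}
```

Calling GroupRowsByKey with the keys of one batch leaves each entry's vector holding the row indices destined for that partition, which a writer could then consume in one pass per partition.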
impala::HdfsTableSink::HdfsTableSink | ( | const RowDescriptor & | row_desc, |
const std::vector< TExpr > & | select_list_texprs, | ||
const TDataSink & | tsink | ||
) |
|
private |
Sets hdfs_file_name and tmp_hdfs_file_name of given output partition. The Hdfs directory is created from the target table's base Hdfs dir, the partition_key_names_ and the evaluated partition_key_exprs_. The Hdfs file name is the unique_id_str_.
Definition at line 240 of file hdfs-table-sink.cc.
References impala::OutputPartition::final_hdfs_file_name_prefix, impala::HdfsTableDescriptor::hdfs_base_dir(), impala::HdfsPartitionDescriptor::location(), impala::OutputPartition::num_files, impala::OutputPartition::partition_name, staging_dir_, table_desc_, impala::OutputPartition::tmp_hdfs_dir_name, impala::OutputPartition::tmp_hdfs_file_name_prefix, and unique_id_str_.
Referenced by InitOutputPartition().
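As an illustration of the naming scheme described for BuildHdfsFileNames(), the following sketch assembles a partition directory and file name from their parts. It assumes the Hive-style "name=value" directory layout, which the text above does not spell out, and it ignores the staging directory and URL encoding entirely. SketchFileNames and BuildSketchFileNames are hypothetical names, not part of Impala.

```cpp
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical result type; the real OutputPartition has more fields.
struct SketchFileNames {
  std::string partition_dir;  // directory under the table's base dir
  std::string file_name;      // file name inside that directory
};

// Builds "<base_dir>/<name>=<value>/..." from parallel vectors of partition
// key names and already-evaluated key values. The "name=value" layout is an
// assumption of this sketch. The file name is the unique id string.
inline SketchFileNames BuildSketchFileNames(
    const std::string& base_dir, const std::vector<std::string>& key_names,
    const std::vector<std::string>& key_values,
    const std::string& unique_id_str) {
  SketchFileNames out;
  out.partition_dir = base_dir;
  for (std::size_t i = 0; i < key_names.size() && i < key_values.size(); ++i) {
    out.partition_dir += "/" + key_names[i] + "=" + key_values[i];
  }
  out.file_name = unique_id_str;
  return out;
}
```

For an unpartitioned table both vectors are empty and the directory is simply the base directory.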
|
inline |
Definition at line 152 of file hdfs-table-sink.h.
References bytes_written_counter_.
Referenced by impala::HdfsTableWriter::Write().
|
virtual |
Move temporary Hdfs files to final locations. Remove original Hdfs files if overwrite was specified. Closes output_exprs and partition_key_exprs.
Implements impala::DataSink.
Definition at line 602 of file hdfs-table-sink.cc.
References impala::Expr::Close(), impala::DataSink::closed_, impala::HdfsPartitionDescriptor::CloseExprs(), ClosePartitionFile(), output_expr_ctxs_, impala::HdfsTableDescriptor::partition_descriptors(), partition_key_expr_ctxs_, partition_keys_to_output_partitions_, runtime_profile_, SCOPED_TIMER, table_desc_, and impala::RuntimeProfile::total_time_counter().
|
private |
Closes the hdfs file for this partition as well as the writer.
Definition at line 589 of file hdfs-table-sink.cc.
References impala::OutputPartition::current_file_name, impala::GetHdfsErrorMsg(), hdfs_connection_, impala::RuntimeState::LogError(), impala::ImpaladMetrics::NUM_FILES_OPEN_FOR_INSERT, impala::OutputPartition::tmp_hdfs_file, and VLOG_FILE.
Referenced by Close(), CreateNewTmpFile(), and FinalizePartitionFile().
|
inline |
Definition at line 155 of file hdfs-table-sink.h.
References compress_timer_.
Referenced by impala::HdfsSequenceTableWriter::ConsumeRow(), impala::HdfsTextTableWriter::Flush(), impala::HdfsAvroTableWriter::Flush(), and impala::HdfsSequenceTableWriter::WriteCompressedBlock().
|
static inherited |
Creates a new data sink from thrift_sink. A pointer to the new sink is written to *sink, and is owned by the caller.
Definition at line 34 of file data-sink.cc.
References impala::Status::OK.
Referenced by impala::PlanFragmentExecutor::Prepare().
|
private |
Add a temporary file to an output partition. Files are created in a temporary directory and then moved to the real partition directory by the coordinator in a finalization step. The temporary file's current location and final destination are recorded in the state parameter. If this function fails, the tmp file is cleaned up.
Definition at line 279 of file hdfs-table-sink.cc.
References ADD_TIMER, impala::HdfsPartitionDescriptor::block_size(), ClosePartitionFile(), COUNTER_ADD, impala::OutputPartition::current_file_name, files_created_counter_, impala::OutputPartition::final_hdfs_file_name_prefix, impala::GetHdfsErrorMsg(), hdfs_connection_, impala::RuntimeState::hdfs_files_to_move(), impala::OutputPartition::num_files, impala::ImpaladMetrics::NUM_FILES_OPEN_FOR_INSERT, impala::OutputPartition::num_rows, impala::Status::ok(), impala::OutputPartition::partition_descriptor, profile(), SCOPED_TIMER, impala::OutputPartition::tmp_hdfs_file, impala::OutputPartition::tmp_hdfs_file_name_prefix, VLOG_FILE, and impala::OutputPartition::writer.
Referenced by InitOutputPartition(), and Send().
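The bookkeeping described for CreateNewTmpFile() (pick a temporary file, remember where it must finally go) might look roughly like the sketch below. No HDFS calls are made. SketchOutputPartition, FilesToMove and AddSketchTmpFile are hypothetical names, the map stands in for the state object's record of files to move, and the "<prefix>.<n>" numbering is an assumption made for illustration.

```cpp
#include <map>
#include <string>

// Hypothetical slice of OutputPartition: the two name prefixes plus a
// counter of files created so far for this partition.
struct SketchOutputPartition {
  std::string tmp_file_name_prefix;
  std::string final_file_name_prefix;
  int num_files = 0;
  std::string current_file_name;
};

// Stand-in for the per-query record of temporary path -> final path.
using FilesToMove = std::map<std::string, std::string>;

// Picks the next temporary file name, records its final destination, and
// bumps the file counter. The "<prefix>.<n>" naming is an assumption.
inline void AddSketchTmpFile(SketchOutputPartition* part,
                             FilesToMove* to_move) {
  const std::string suffix = "." + std::to_string(part->num_files);
  part->current_file_name = part->tmp_file_name_prefix + suffix;
  (*to_move)[part->current_file_name] = part->final_file_name_prefix + suffix;
  ++part->num_files;
}
```

A later finalization step can then walk the map and move each temporary file to its recorded destination.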
string impala::HdfsTableSink::DebugString | ( | ) | const |
Definition at line 643 of file hdfs-table-sink.cc.
References impala::Expr::DebugString(), impala::HdfsTableDescriptor::DebugString(), output_expr_ctxs_, overwrite_, partition_key_expr_ctxs_, and table_desc_.
Referenced by impala::HdfsSequenceTableWriter::AppendRowBatch(), impala::HdfsTextTableWriter::AppendRowBatch(), impala::HdfsSequenceTableWriter::EncodeRow(), impala::HdfsTableWriter::HdfsTableWriter(), and PrepareExprs().
|
inline |
Definition at line 153 of file hdfs-table-sink.h.
References encode_timer_.
Referenced by impala::HdfsSequenceTableWriter::AppendRowBatch(), impala::HdfsTextTableWriter::AppendRowBatch(), impala::HdfsParquetTableWriter::AppendRowBatch(), and impala::HdfsAvroTableWriter::AppendRowBatch().
|
private |
Updates runtime stats of HDFS with rows written, then closes the file associated with the partition by calling ClosePartitionFile().
Definition at line 565 of file hdfs-table-sink.cc.
References ADD_TIMER, ClosePartitionFile(), impala::DataSink::MergeInsertStats(), impala::OutputPartition::num_rows, impala::Status::OK, overwrite_, impala::OutputPartition::partition_name, impala::RuntimeState::per_partition_status(), profile(), RETURN_IF_ERROR, SCOPED_TIMER, impala::OutputPartition::tmp_hdfs_file, and impala::OutputPartition::writer.
Referenced by Send().
|
static |
Get the block size of the current file opened for this partition. This is a utility routine that can be called by specific table writers. Currently used by the parquet writer.
Definition at line 628 of file hdfs-table-sink.cc.
References impala::OutputPartition::current_file_name, impala::GetHdfsErrorMsg(), impala::OutputPartition::hdfs_connection, and impala::Status::OK.
Referenced by impala::HdfsParquetTableWriter::InitNewFile().
|
private |
Generates string key for hash_tbl_ as a concatenation of all evaluated exprs, evaluated against current_row_. The generated string is much shorter than the full Hdfs file name.
Definition at line 436 of file hdfs-table-sink.cc.
References current_row_, and impala::RawValue::PrintValueAsBytes().
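To make the idea behind GetHashTblKey() concrete, here is a minimal sketch of building a lookup key by concatenating values. It assumes the partition key values have already been evaluated and rendered as strings, and the '/' separator is a choice made for this sketch. MakeSketchPartitionKey is a hypothetical name.

```cpp
#include <string>
#include <vector>

// Concatenates already-evaluated partition key values into one lookup key.
// A separator after every value keeps ("ab", "c") distinct from ("a", "bc");
// the choice of '/' is an assumption of this sketch.
inline std::string MakeSketchPartitionKey(
    const std::vector<std::string>& values) {
  std::string key;
  for (const std::string& v : values) {
    key += v;
    key += '/';
  }
  return key;
}
```

Values that themselves contain the separator could still collide in this simplified form, so a production key would need escaping or length prefixes.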
|
inline private |
Given a hashed partition key, get the output partition structure from the partition_keys_to_output_partitions_.
Definition at line 447 of file hdfs-table-sink.cc.
References impala::ObjectPool::Add(), default_partition_, has_empty_input_batch_, impala::RuntimeState::hdfs_files_to_move(), impala::HdfsPartitionDescriptor::id(), InitOutputPartition(), impala::RuntimeState::obj_pool(), impala::Status::OK, impala::Status::ok(), partition_descriptor_map_, partition_keys_to_output_partitions_, impala::OutputPartition::partition_name, impala::RuntimeState::per_partition_status(), impala::OutputPartition::tmp_hdfs_dir_name, and impala::OutputPartition::writer.
Referenced by Send().
|
inline |
Definition at line 154 of file hdfs-table-sink.h.
References hdfs_write_timer_.
Referenced by impala::HdfsParquetTableWriter::Finalize(), impala::HdfsTextTableWriter::Flush(), impala::HdfsSequenceTableWriter::Flush(), and impala::HdfsAvroTableWriter::Flush().
|
private |
Initialises the filenames of a given output partition, and opens the temporary file.
Definition at line 325 of file hdfs-table-sink.cc.
References BuildHdfsFileNames(), impala::TableDescriptor::col_names(), COUNTER_ADD, CreateNewTmpFile(), current_row_, impala::HdfsPartitionDescriptor::file_format(), has_empty_input_batch_, impala::OutputPartition::hdfs_connection, hdfs_connection_, impala::HdfsTableDescriptor::null_partition_key_value(), impala::Status::OK, output_expr_ctxs_, impala::OutputPartition::partition_descriptor, partition_key_expr_ctxs_, impala::OutputPartition::partition_name, partitions_created_counter_, impala::RuntimeState::query_options(), RETURN_IF_ERROR, table_desc_, impala::UrlEncode(), and impala::OutputPartition::writer.
Referenced by GetOutputPartition().
|
inline |
Definition at line 149 of file hdfs-table-sink.h.
References mem_tracker_.
Referenced by impala::HdfsTextTableWriter::Close(), impala::HdfsAvroTableWriter::HdfsAvroTableWriter(), impala::HdfsSequenceTableWriter::HdfsSequenceTableWriter(), and impala::HdfsTextTableWriter::Init().
|
static inherited |
Merges one update to the insert stats for a partition. dst_stats will have the combined stats of src_stats and dst_stats after this method returns.
Definition at line 90 of file data-sink.cc.
Referenced by FinalizePartitionFile().
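The merge contract described for MergeInsertStats() (the destination ends up holding the combination of both inputs) can be sketched with a stand-in type. SketchInsertStats and its single bytes_written field are hypothetical and chosen only for illustration. The fields of the real TInsertStats are not listed on this page.

```cpp
#include <cstdint>

// Hypothetical stand-in for TInsertStats with a single additive counter.
struct SketchInsertStats {
  int64_t bytes_written = 0;
};

// Folds src into dst so that dst holds the combined totals afterwards.
inline void MergeSketchInsertStats(const SketchInsertStats& src,
                                   SketchInsertStats* dst) {
  dst->bytes_written += src.bytes_written;
}
```

Because the merge is additive, the order in which per-file updates are folded into the per-partition total does not matter in this sketch.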
|
virtual |
Opens output_exprs and partition_key_exprs, prepares the single output partition for static inserts, and populates partition_descriptor_map_.
Implements impala::DataSink.
Definition at line 160 of file hdfs-table-sink.cc.
References default_partition_, impala::RawValue::Eq(), GetHashTblKey(), impala::Status::OK, impala::Expr::Open(), impala::HdfsPartitionDescriptor::OpenExprs(), output_expr_ctxs_, partition_descriptor_map_, impala::HdfsTableDescriptor::partition_descriptors(), partition_key_expr_ctxs_, impala::HdfsPartitionDescriptor::partition_key_value_ctxs(), RETURN_IF_ERROR, and table_desc_.
|
static inherited |
Outputs the insert stats contained in the map of insert partition updates to a string.
Definition at line 103 of file data-sink.cc.
References impala::PrettyPrinter::Print().
|
virtual |
Prepares output_exprs and partition_key_exprs, and connects to HDFS.
Reimplemented from impala::DataSink.
Definition at line 107 of file hdfs-table-sink.cc.
References impala::ObjectPool::Add(), ADD_COUNTER, ADD_TIMER, bytes_written_counter_, compress_timer_, impala::Expr::CreateExprTrees(), impala::RuntimeState::desc_tbl(), encode_timer_, files_created_counter_, impala::RuntimeState::fragment_instance_id(), impala::DescriptorTbl::GetTableDescriptor(), impala::HdfsTableDescriptor::hdfs_base_dir(), hdfs_connection_, hdfs_write_timer_, impala::HdfsFsCache::instance(), impala::RuntimeState::instance_mem_tracker(), mem_tracker_, impala::name, impala::RuntimeState::obj_pool(), impala::Status::OK, output_expr_ctxs_, partition_key_expr_ctxs_, partition_key_texprs_, partitions_created_counter_, impala::DataSink::Prepare(), PrepareExprs(), impala::PrintId(), profile(), impala::RuntimeState::query_id(), RETURN_IF_ERROR, rows_inserted_counter_, runtime_profile_, SCOPED_TIMER, select_list_texprs_, staging_dir_, table_desc_, table_id_, unique_id_str_, and VLOG_QUERY.
|
private |
Initialise and prepare select and partition key expressions.
Definition at line 72 of file hdfs-table-sink.cc.
References impala::TableDescriptor::col_names(), DebugString(), dynamic_partition_key_expr_ctxs_, impala::DataSink::expr_mem_tracker_, impala::TableDescriptor::num_clustering_cols(), impala::TableDescriptor::num_cols(), impala::Status::OK, output_expr_ctxs_, impala::HdfsTableDescriptor::partition_descriptors(), partition_key_expr_ctxs_, impala::Expr::Prepare(), impala::HdfsPartitionDescriptor::PrepareExprs(), RETURN_IF_ERROR, row_desc_, and table_desc_.
Referenced by Prepare().
|
inline virtual |
Returns the runtime profile for the sink.
Implements impala::DataSink.
Definition at line 147 of file hdfs-table-sink.h.
References runtime_profile_.
Referenced by CreateNewTmpFile(), FinalizePartitionFile(), and Prepare().
|
inline |
Definition at line 151 of file hdfs-table-sink.h.
References rows_inserted_counter_.
Referenced by impala::HdfsSequenceTableWriter::AppendRowBatch(), impala::HdfsTextTableWriter::AppendRowBatch(), impala::HdfsAvroTableWriter::AppendRowBatch(), and impala::HdfsParquetTableWriter::Finalize().
|
virtual |
Append all rows in batch to the temporary Hdfs files corresponding to partitions.
Implements impala::DataSink.
Definition at line 495 of file hdfs-table-sink.cc.
References impala::RuntimeState::CheckQueryState(), CreateNewTmpFile(), current_row_, dynamic_partition_key_expr_ctxs_, FinalizePartitionFile(), impala::ExprContext::FreeLocalAllocations(), GetHashTblKey(), GetOutputPartition(), impala::RowBatch::GetRow(), has_empty_input_batch_, impala::RowBatch::num_rows(), impala::Status::OK, output_expr_ctxs_, partition_key_expr_ctxs_, partition_keys_to_output_partitions_, RETURN_IF_ERROR, impala::ROOT_PARTITION_KEY, runtime_profile_, SCOPED_TIMER, impala::RuntimeProfile::total_time_counter(), and impala::OutputPartition::writer.
|
inline |
Definition at line 148 of file hdfs-table-sink.h.
References table_desc_.
|
private |
Definition at line 276 of file hdfs-table-sink.h.
Referenced by bytes_written_counter(), and Prepare().
|
protected inherited |
Set to true after Close() has been called. Subclasses should check and set this in Close().
Definition at line 83 of file data-sink.h.
Referenced by Close().
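The "check and set" convention for closed_ amounts to making Close() idempotent. A minimal sketch of that pattern follows. SketchSink and SketchFileSink are hypothetical classes, and a counter stands in for the real cleanup work.

```cpp
// Hypothetical base class holding the flag, as DataSink does with closed_.
class SketchSink {
 public:
  virtual ~SketchSink() = default;
  virtual void Close() = 0;
  bool closed() const { return closed_; }

 protected:
  bool closed_ = false;
};

// Subclass that checks the flag first and sets it once cleanup has run,
// so a second Close() call is a no-op.
class SketchFileSink : public SketchSink {
 public:
  void Close() override {
    if (closed_) return;  // already closed: nothing to do
    ++cleanup_runs_;      // stands in for releasing files and exprs
    closed_ = true;
  }
  int cleanup_runs() const { return cleanup_runs_; }

 private:
  int cleanup_runs_ = 0;
};
```

With this guard in place, callers on error paths can call Close() unconditionally without risking double cleanup.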
|
private |
Time spent compressing data.
Definition at line 283 of file hdfs-table-sink.h.
Referenced by compress_timer(), and Prepare().
|
private |
Current row from the current RowBatch to output.
Definition at line 217 of file hdfs-table-sink.h.
Referenced by GetHashTblKey(), InitOutputPartition(), and Send().
|
private |
Currently this is the default partition since we don't support multi-format sinks.
Definition at line 211 of file hdfs-table-sink.h.
Referenced by GetOutputPartition(), and Open().
|
private |
Subset of partition_key_expr_ctxs_ which are not constant. Set in Prepare(). Used for generating the string key of hash_tbl_.
Definition at line 259 of file hdfs-table-sink.h.
Referenced by PrepareExprs(), and Send().
|
private |
Time spent converting tuple to on disk format.
Definition at line 279 of file hdfs-table-sink.h.
Referenced by encode_timer(), and Prepare().
|
protected inherited |
Definition at line 85 of file data-sink.h.
Referenced by impala::DataSink::Prepare(), and PrepareExprs().
|
private |
Definition at line 274 of file hdfs-table-sink.h.
Referenced by CreateNewTmpFile(), and Prepare().
|
private |
Flag to indicate the current input batch passed in Send() is empty. It implies that we must not initialize the OutputPartition writer of a static partition insert.
Definition at line 287 of file hdfs-table-sink.h.
Referenced by GetOutputPartition(), InitOutputPartition(), and Send().
|
private |
Connection to hdfs, established in Prepare() and closed in Close().
Definition at line 220 of file hdfs-table-sink.h.
Referenced by ClosePartitionFile(), CreateNewTmpFile(), InitOutputPartition(), and Prepare().
|
private |
Time spent writing to hdfs.
Definition at line 281 of file hdfs-table-sink.h.
Referenced by hdfs_write_timer(), and Prepare().
|
private |
Definition at line 269 of file hdfs-table-sink.h.
Referenced by mem_tracker(), and Prepare().
|
private |
Exprs that materialize output values.
Definition at line 214 of file hdfs-table-sink.h.
Referenced by Close(), DebugString(), InitOutputPartition(), Open(), Prepare(), PrepareExprs(), and Send().
|
private |
Indicates whether the existing partitions should be overwritten.
Definition at line 240 of file hdfs-table-sink.h.
Referenced by DebugString(), and FinalizePartitionFile().
|
private |
Definition at line 267 of file hdfs-table-sink.h.
Referenced by GetOutputPartition(), and Open().
|
private |
Exprs of partition keys.
Definition at line 237 of file hdfs-table-sink.h.
Referenced by Close(), DebugString(), InitOutputPartition(), Open(), Prepare(), PrepareExprs(), and Send().
|
private |
Thrift representation of partition keys, saved in the constructor to be used to initialise partition_key_exprs_ in Init.
Definition at line 234 of file hdfs-table-sink.h.
Referenced by Prepare().
|
private |
Hash table of generated output partitions. Maps from a string representation of the dynamic_partition_key_exprs_ generated by GetHashTblKey() to its corresponding OutputPartition. If there are no partitions (and no partition keys) we store a single OutputPartition in the map to simplify the code.
Definition at line 255 of file hdfs-table-sink.h.
Referenced by Close(), GetOutputPartition(), and Send().
|
private |
Definition at line 273 of file hdfs-table-sink.h.
Referenced by InitOutputPartition(), and Prepare().
|
private |
Row descriptor of row batches passed in Send(). Set in c'tor.
Definition at line 223 of file hdfs-table-sink.h.
Referenced by PrepareExprs().
|
private |
Definition at line 275 of file hdfs-table-sink.h.
Referenced by Prepare(), and rows_inserted_counter().
|
private |
|
private |
Thrift representation of select list exprs, saved in the constructor to be used to initialise output_exprs_ in Init.
Definition at line 230 of file hdfs-table-sink.h.
Referenced by Prepare().
|
private |
The directory in which to write intermediate results. Set to <hdfs_table_base_dir>/_impala_insert_staging/ during Prepare().
Definition at line 244 of file hdfs-table-sink.h.
Referenced by BuildHdfsFileNames(), and Prepare().
|
private |
Descriptor of target table. Set in Prepare().
Definition at line 208 of file hdfs-table-sink.h.
Referenced by BuildHdfsFileNames(), Close(), DebugString(), InitOutputPartition(), Open(), Prepare(), PrepareExprs(), and TableDesc().
|
private |
Table id resolved in Prepare() to set tuple_desc_.
Definition at line 226 of file hdfs-table-sink.h.
Referenced by Prepare().
|
private |
String representation of the unique fragment instance id. Used for per-partition Hdfs file names, and for tmp Hdfs directories. Set in Prepare().
Definition at line 248 of file hdfs-table-sink.h.
Referenced by BuildHdfsFileNames(), and Prepare().