Impala
Impala is the open source, native analytic database for Apache Hadoop.
impala::HdfsTableSink Class Reference

#include <hdfs-table-sink.h>


Public Member Functions

 HdfsTableSink (const RowDescriptor &row_desc, const std::vector< TExpr > &select_list_texprs, const TDataSink &tsink)
 
virtual Status Prepare (RuntimeState *state)
 Prepares output_exprs and partition_key_exprs, and connects to HDFS.

virtual Status Open (RuntimeState *state)

virtual Status Send (RuntimeState *state, RowBatch *batch, bool eos)
 Appends all rows in batch to the temporary Hdfs files of their corresponding partitions.

virtual void Close (RuntimeState *state)

virtual RuntimeProfile * profile ()
 Returns the runtime profile for the sink.

const HdfsTableDescriptor & TableDesc ()

MemTracker * mem_tracker ()

RuntimeProfile::Counter * rows_inserted_counter ()

RuntimeProfile::Counter * bytes_written_counter ()

RuntimeProfile::Counter * encode_timer ()

RuntimeProfile::Counter * hdfs_write_timer ()

RuntimeProfile::Counter * compress_timer ()
 
std::string DebugString () const
 

Static Public Member Functions

static Status GetFileBlockSize (OutputPartition *output_partition, int64_t *size)
 
static Status CreateDataSink (ObjectPool *pool, const TDataSink &thrift_sink, const std::vector< TExpr > &output_exprs, const TPlanFragmentExecParams &params, const RowDescriptor &row_desc, boost::scoped_ptr< DataSink > *sink)
 
static void MergeInsertStats (const TInsertStats &src_stats, TInsertStats *dst_stats)
 
static std::string OutputInsertStats (const PartitionStatusMap &stats, const std::string &prefix="")
 Outputs the insert stats contained in the map of insert partition updates to a string.
 

Protected Attributes

bool closed_
 
boost::scoped_ptr< MemTracker > expr_mem_tracker_
 

Private Types

typedef std::pair< OutputPartition *, std::vector< int32_t > > PartitionPair

typedef boost::unordered_map< std::string, PartitionPair > PartitionMap

typedef boost::unordered_map< std::string, HdfsPartitionDescriptor * > PartitionDescriptorMap
 

Private Member Functions

Status InitOutputPartition (RuntimeState *state, const HdfsPartitionDescriptor &partition_descriptor, OutputPartition *output_partition)
 Initialises the file names of the given output partition and opens the temporary file.
 
Status CreateNewTmpFile (RuntimeState *state, OutputPartition *output_partition)
 
void GetHashTblKey (const std::vector< ExprContext * > &ctxs, std::string *key)
 
Status GetOutputPartition (RuntimeState *state, const std::string &key, PartitionPair **partition_pair)
 
Status PrepareExprs (RuntimeState *state)
 Initialises and prepares the select list and partition key expressions.
 
void BuildHdfsFileNames (const HdfsPartitionDescriptor &partition_descriptor, OutputPartition *output)
 
Status FinalizePartitionFile (RuntimeState *state, OutputPartition *partition)
 
void ClosePartitionFile (RuntimeState *state, OutputPartition *partition)
 Closes the hdfs file for this partition as well as the writer.
 

Private Attributes

const HdfsTableDescriptor * table_desc_
 Descriptor of target table. Set in Prepare().

const HdfsPartitionDescriptor * default_partition_
 Currently this is the default partition since we don't support multi-format sinks.

std::vector< ExprContext * > output_expr_ctxs_
 Exprs that materialize output values.

TupleRow * current_row_
 Current row from the current RowBatch to output.

hdfsFS hdfs_connection_
 Connection to hdfs, established in Open() and closed in Close().

const RowDescriptor & row_desc_
 Row descriptor of row batches passed in Send(). Set in the constructor.

TableId table_id_
 Table id resolved in Prepare() to set tuple_desc_.

const std::vector< TExpr > & select_list_texprs_

const std::vector< TExpr > & partition_key_texprs_

std::vector< ExprContext * > partition_key_expr_ctxs_
 Exprs of partition keys.

bool overwrite_
 Indicates whether the existing partitions should be overwritten.

std::string staging_dir_

std::string unique_id_str_

PartitionMap partition_keys_to_output_partitions_

std::vector< ExprContext * > dynamic_partition_key_expr_ctxs_

PartitionDescriptorMap partition_descriptor_map_

boost::scoped_ptr< MemTracker > mem_tracker_

RuntimeProfile * runtime_profile_
 Allocated from runtime state's pool.

RuntimeProfile::Counter * partitions_created_counter_

RuntimeProfile::Counter * files_created_counter_

RuntimeProfile::Counter * rows_inserted_counter_

RuntimeProfile::Counter * bytes_written_counter_

RuntimeProfile::Counter * encode_timer_
 Time spent converting tuples to the on-disk format.

RuntimeProfile::Counter * hdfs_write_timer_
 Time spent writing to hdfs.

RuntimeProfile::Counter * compress_timer_
 Time spent compressing data.
 
bool has_empty_input_batch_
 

Detailed Description

The sink consumes all row batches of its child execution tree and writes the evaluated output_exprs into temporary Hdfs files. The query coordinator moves the temporary files into their final locations after the sinks have finished executing. This sink supports static and dynamic partition inserts (Hive terminology), inserts into unpartitioned tables, and optional overwriting of partitions/tables.

Files and partitions: This sink writes one or more Hdfs files per output partition, and each output partition corresponds to an Hdfs directory. The Hdfs file names depend on unique_id, so we rely on the global uniqueness of unique_id, i.e., no two HdfsTableSinks may be constructed with the same unique_id. For each row, its target partition is computed from the partition_key_exprs in tsink. A map of open Hdfs files (one per partition) is maintained, because each row may belong to a different partition than the one before it.

Failure behavior: In Exec() all data is written to Hdfs files in a temporary directory. In Close() all temporary Hdfs files are moved to their final locations, and the original files are removed if overwrite was specified, as follows:

  1. We move all temporary files to their final destinations.
  2. After all tmp files have been moved, we delete the original files if overwrite was specified.

There is a possibility of data inconsistency, e.g., if a failure occurs while moving the Hdfs files. The temporary directory is <unique_id.hi>-<unique_id.lo>_data so that an external tool can easily clean up incomplete inserts. This is consistent with Hive's behavior.
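The two-step finalization above can be modelled in a few lines. This is a minimal sketch, not Impala's code: `FakeFs`, `PendingMove` and `FinalizeInsert` are hypothetical names, a `std::map` stands in for HDFS, and a real implementation would issue HDFS rename and delete calls and propagate errors. It shows why the ordering matters: if the process stops between or during the steps, the table can be left holding a mix of moved and unmoved files, or both old and new files, which is the inconsistency described above.

```cpp
#include <cassert>
#include <map>
#include <string>
#include <vector>

// Hypothetical in-memory stand-in for a file system: path -> file contents.
using FakeFs = std::map<std::string, std::string>;

// A temporary file written by a sink and the path it should end up at.
struct PendingMove {
  std::string tmp_path;
  std::string final_path;
};

// Sketch of the two-step finalization: first move every temporary file to its
// final destination, then (only if overwrite was requested) delete the files
// that existed before the insert. Assumes final paths never coincide with
// pre-existing files, which unique_id-based file names are meant to ensure.
void FinalizeInsert(FakeFs* fs, const std::vector<PendingMove>& moves,
                    const std::vector<std::string>& originals, bool overwrite) {
  // Step 1: move all temporary files.
  for (const PendingMove& m : moves) {
    auto it = fs->find(m.tmp_path);
    if (it == fs->end()) continue;  // the sketch skips missing files silently
    std::string contents = it->second;
    fs->erase(it);
    (*fs)[m.final_path] = contents;
  }
  // Step 2: only after every move has been done, remove the originals.
  if (overwrite) {
    for (const std::string& path : originals) fs->erase(path);
  }
}
```

Deleting the originals last means a failure during the moves never loses the pre-existing data, at the cost of possibly leaving duplicate or partial results behind.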

Definition at line 122 of file hdfs-table-sink.h.

Member Typedef Documentation

typedef boost::unordered_map<std::string, HdfsPartitionDescriptor*> impala::HdfsTableSink::PartitionDescriptorMap
private

Map from row key (i.e., concatenated non-constant partition keys) to partition descriptor. We don't own the HdfsPartitionDescriptors; they belong to the table descriptor. The key is generated by GetHashTblKey() from the keys in a row.

Definition at line 266 of file hdfs-table-sink.h.

typedef boost::unordered_map<std::string, PartitionPair> impala::HdfsTableSink::PartitionMap
private

Definition at line 177 of file hdfs-table-sink.h.

typedef std::pair<OutputPartition*, std::vector<int32_t> > impala::HdfsTableSink::PartitionPair
private

Key is the concatenation of the evaluated dynamic_partition_key_exprs_, generated by GetHashTblKey(). It maps to an OutputPartition, which is owned by the object pool, and to a vector of the rows (as int32_t indices) from the current row batch to insert into this partition.

Definition at line 176 of file hdfs-table-sink.h.
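To make the PartitionMap / PartitionPair layout concrete, here is a sketch of bucketing one batch's rows by partition key. It assumes the keys have already been computed (as GetHashTblKey() would do) and uses hypothetical stub types (`OutputPartitionStub`, `GroupRowsByPartition`) and `std::unordered_map` instead of Impala's OutputPartition and `boost::unordered_map`; it is an illustration, not the sink's actual Send() logic.

```cpp
#include <cassert>
#include <cstdint>
#include <memory>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

// Hypothetical stand-in for impala::OutputPartition; only the name is modelled.
struct OutputPartitionStub {
  std::string partition_name;
};

// Same shape as PartitionPair / PartitionMap, with std::unordered_map in place
// of boost::unordered_map.
using PartitionPairStub = std::pair<OutputPartitionStub*, std::vector<int32_t>>;
using PartitionMapStub = std::unordered_map<std::string, PartitionPairStub>;

// Buckets the rows of one batch by partition key. row_keys[i] is the key of
// row i; `owner` plays the role of the object pool that owns the partitions.
void GroupRowsByPartition(const std::vector<std::string>& row_keys,
                          std::vector<std::unique_ptr<OutputPartitionStub>>* owner,
                          PartitionMapStub* map) {
  for (int32_t i = 0; i < static_cast<int32_t>(row_keys.size()); ++i) {
    auto it = map->find(row_keys[i]);
    if (it == map->end()) {
      // First row for this key: create the partition and an empty row list.
      owner->push_back(std::make_unique<OutputPartitionStub>(OutputPartitionStub{row_keys[i]}));
      it = map->emplace(row_keys[i],
                        PartitionPairStub(owner->back().get(), std::vector<int32_t>()))
               .first;
    }
    it->second.second.push_back(i);
  }
}
```

Because the row lists describe only the current batch, a real implementation would clear them after the batch has been written. For an unpartitioned table every row would carry the same key, so the map ends up with a single entry.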

Constructor & Destructor Documentation

impala::HdfsTableSink::HdfsTableSink ( const RowDescriptor & row_desc,
const std::vector< TExpr > &  select_list_texprs,
const TDataSink &  tsink 
)

Member Function Documentation

void impala::HdfsTableSink::BuildHdfsFileNames ( const HdfsPartitionDescriptor & partition_descriptor,
OutputPartition * output 
)
private

Sets hdfs_file_name and tmp_hdfs_file_name of the given output partition. The Hdfs directory is created from the target table's base Hdfs dir, the partition_key_names_ and the evaluated partition_key_exprs_. The Hdfs file name is the unique_id_str_.

Definition at line 240 of file hdfs-table-sink.cc.

References impala::OutputPartition::final_hdfs_file_name_prefix, impala::HdfsTableDescriptor::hdfs_base_dir(), impala::HdfsPartitionDescriptor::location(), impala::OutputPartition::num_files, impala::OutputPartition::partition_name, staging_dir_, table_desc_, impala::OutputPartition::tmp_hdfs_dir_name, impala::OutputPartition::tmp_hdfs_file_name_prefix, and unique_id_str_.

Referenced by InitOutputPartition().
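The naming scheme described for BuildHdfsFileNames() can be sketched as plain string assembly. The layout below is an assumption for illustration only: the `_dir` suffix, the `PartitionNames` struct and `BuildNames` are hypothetical, the staging directory is taken to end in a slash (as in the staging_dir_ description), and custom partition locations (`HdfsPartitionDescriptor::location()`) are ignored.

```cpp
#include <cassert>
#include <string>

// Hypothetical subset of the naming fields of impala::OutputPartition.
struct PartitionNames {
  std::string partition_name;               // e.g. "year=2014"; empty if unpartitioned
  std::string tmp_hdfs_dir_name;            // staging directory for this partition
  std::string tmp_hdfs_file_name_prefix;    // prefix of the temporary files
  std::string final_hdfs_file_name_prefix;  // prefix of the files after the move
};

// Sketch only: joins the table base dir, staging dir, unique id and partition
// name. The "_dir" suffix and the joining rules are assumptions.
PartitionNames BuildNames(const std::string& base_dir, const std::string& staging_dir,
                          const std::string& unique_id_str,
                          const std::string& partition_name) {
  PartitionNames n;
  n.partition_name = partition_name;
  // Unpartitioned tables write directly under the base (or staging) dir.
  const std::string sub = partition_name.empty() ? "" : partition_name + "/";
  n.tmp_hdfs_dir_name = staging_dir + unique_id_str + "_dir/" + sub;
  n.tmp_hdfs_file_name_prefix = n.tmp_hdfs_dir_name + unique_id_str;
  n.final_hdfs_file_name_prefix = base_dir + "/" + sub + unique_id_str;
  return n;
}
```

Embedding the unique id in both the staging directory and the file prefix is what lets concurrent sinks write into the same partition without clobbering each other's files.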

RuntimeProfile::Counter* impala::HdfsTableSink::bytes_written_counter ( )
inline

Definition at line 152 of file hdfs-table-sink.h.

References bytes_written_counter_.

Referenced by impala::HdfsTableWriter::Write().

void impala::HdfsTableSink::Close ( RuntimeState * state)
virtual

void impala::HdfsTableSink::ClosePartitionFile ( RuntimeState * state,
OutputPartition * partition 
)
private

Status impala::DataSink::CreateDataSink ( ObjectPool * pool,
const TDataSink &  thrift_sink,
const std::vector< TExpr > &  output_exprs,
const TPlanFragmentExecParams &  params,
const RowDescriptor & row_desc,
boost::scoped_ptr< DataSink > *  sink 
)
static, inherited

Creates a new data sink from thrift_sink. A pointer to the new sink is written to *sink, and is owned by the caller.

Definition at line 34 of file data-sink.cc.

References impala::Status::OK.

Referenced by impala::PlanFragmentExecutor::Prepare().
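CreateDataSink() is a factory: it chooses a concrete sink from the Thrift description and hands ownership to the caller through the out-parameter. A minimal sketch of that pattern, with a hypothetical `SinkKind` enum, stub classes and `CreateSink`, and `std::unique_ptr` in place of `boost::scoped_ptr`:

```cpp
#include <cassert>
#include <memory>
#include <string>

// Hypothetical sink kinds; the real factory inspects the Thrift TDataSink.
enum class SinkKind { kTable, kStream };

class SinkBase {
 public:
  virtual ~SinkBase() = default;
  virtual std::string Name() const = 0;
};

class TableSinkStub : public SinkBase {
 public:
  std::string Name() const override { return "table"; }
};

class StreamSinkStub : public SinkBase {
 public:
  std::string Name() const override { return "stream"; }
};

// Same pattern as CreateDataSink: pick the concrete class from the description
// and give ownership to the caller through the smart-pointer out-parameter.
// Returns false for an unrecognized kind (the real method returns a Status).
bool CreateSink(SinkKind kind, std::unique_ptr<SinkBase>* sink) {
  switch (kind) {
    case SinkKind::kTable:
      sink->reset(new TableSinkStub());
      return true;
    case SinkKind::kStream:
      sink->reset(new StreamSinkStub());
      return true;
  }
  return false;
}
```

Passing a pointer to the caller's smart pointer keeps the return value free for an error status, which matches the Status-returning signature documented above.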

Status impala::HdfsTableSink::CreateNewTmpFile ( RuntimeState * state,
OutputPartition * output_partition 
)
private

Status impala::HdfsTableSink::FinalizePartitionFile ( RuntimeState * state,
OutputPartition * partition 
)
private

Status impala::HdfsTableSink::GetFileBlockSize ( OutputPartition * output_partition,
int64_t *  size 
)
static

Get the block size of the current file opened for this partition. This is a utility routine that can be called by specific table writers. Currently used by the parquet writer.

Definition at line 628 of file hdfs-table-sink.cc.

References impala::OutputPartition::current_file_name, impala::GetHdfsErrorMsg(), impala::OutputPartition::hdfs_connection, and impala::Status::OK.

Referenced by impala::HdfsParquetTableWriter::InitNewFile().

void impala::HdfsTableSink::GetHashTblKey ( const std::vector< ExprContext * > &  ctxs,
std::string *  key 
)
private

Generates the string key for hash_tbl_ by concatenating the values of all exprs, evaluated against current_row_. The generated string is much shorter than the full Hdfs file name.

Definition at line 436 of file hdfs-table-sink.cc.

References current_row_, and impala::RawValue::PrintValueAsBytes().

Referenced by Open(), and Send().
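The key construction in GetHashTblKey() amounts to appending a byte representation of each evaluated value. In the sketch below the values are passed in already evaluated, `KeyValue` and `BuildHashTblKey` are hypothetical, and strings get a length prefix so that different value tuples cannot produce the same key. That prefix is a choice made for the sketch; the real method builds the key with `RawValue::PrintValueAsBytes()`, and whether it guards against such collisions is not stated here.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <variant>
#include <vector>

// Hypothetical evaluated partition-key value: an integer or a string.
using KeyValue = std::variant<int64_t, std::string>;

// Clears *key, then appends the raw bytes of each value in order. Strings are
// preceded by their length so that ("ab", "c") and ("a", "bc") differ.
// Integers are copied in host byte order, which is fine for an in-process key.
void BuildHashTblKey(const std::vector<KeyValue>& values, std::string* key) {
  key->clear();
  for (const KeyValue& v : values) {
    if (const int64_t* i = std::get_if<int64_t>(&v)) {
      key->append(reinterpret_cast<const char*>(i), sizeof(*i));
    } else {
      const std::string& s = std::get<std::string>(v);
      const uint64_t len = s.size();
      key->append(reinterpret_cast<const char*>(&len), sizeof(len));
      key->append(s);
    }
  }
}
```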

void impala::DataSink::MergeInsertStats ( const TInsertStats &  src_stats,
TInsertStats *  dst_stats 
)
static, inherited

Merges one update to the insert stats for a partition. dst_stats will have the combined stats of src_stats and dst_stats after this method returns.

Definition at line 90 of file data-sink.cc.

Referenced by FinalizePartitionFile().
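MergeInsertStats() combines two per-partition updates into dst_stats. The sketch assumes the stats consist of a byte count plus optional per-column sizes (loosely modelled on Parquet insert stats); `InsertStatsStub` and `MergeStats` are hypothetical, and the real Thrift field names may differ.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <optional>
#include <string>

// Hypothetical mirror of the insert stats: total bytes written plus optional
// per-column sizes. Field names are assumptions, not the Thrift definition.
struct InsertStatsStub {
  int64_t bytes_written = 0;
  std::optional<std::map<std::string, int64_t>> per_column_size;
};

// Folds src into dst: byte counts are added, and per-column sizes are summed
// column by column, creating dst's map the first time it is needed.
void MergeStats(const InsertStatsStub& src, InsertStatsStub* dst) {
  dst->bytes_written += src.bytes_written;
  if (!src.per_column_size) return;
  if (!dst->per_column_size) dst->per_column_size.emplace();
  for (const auto& [column, size] : *src.per_column_size) {
    (*dst->per_column_size)[column] += size;
  }
}
```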

Status impala::HdfsTableSink::Open ( RuntimeState * state)
virtual

string impala::DataSink::OutputInsertStats ( const PartitionStatusMap & stats,
const std::string &  prefix = "" 
)
static, inherited

Outputs the insert stats contained in the map of insert partition updates to a string.

Definition at line 103 of file data-sink.cc.

References impala::PrettyPrinter::Print().
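OutputInsertStats() turns the per-partition map into readable text, with prefix prepended to each line. The sketch below uses a hypothetical `PartitionStatusStub` holding a row count and a byte count, and its output layout is invented; the real method formats values with `PrettyPrinter::Print()`.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <sstream>
#include <string>

// Hypothetical per-partition status: rows appended and bytes written.
struct PartitionStatusStub {
  int64_t num_appended_rows = 0;
  int64_t bytes_written = 0;
};

// Renders one line per partition, each starting with `prefix`. The layout is
// made up for this sketch; std::map keeps partitions in sorted order.
std::string FormatInsertStats(const std::map<std::string, PartitionStatusStub>& stats,
                              const std::string& prefix = "") {
  std::ostringstream out;
  for (const auto& [partition, s] : stats) {
    out << prefix << "Partition: " << partition << " rows=" << s.num_appended_rows
        << " bytes=" << s.bytes_written << "\n";
  }
  return out.str();
}
```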

virtual RuntimeProfile* impala::HdfsTableSink::profile ( )
inline, virtual

Returns the runtime profile for the sink.

Implements impala::DataSink.

Definition at line 147 of file hdfs-table-sink.h.

References runtime_profile_.

Referenced by CreateNewTmpFile(), FinalizePartitionFile(), and Prepare().

const HdfsTableDescriptor& impala::HdfsTableSink::TableDesc ( )
inline

Definition at line 148 of file hdfs-table-sink.h.

References table_desc_.

Member Data Documentation

RuntimeProfile::Counter* impala::HdfsTableSink::bytes_written_counter_
private

Definition at line 276 of file hdfs-table-sink.h.

Referenced by bytes_written_counter(), and Prepare().

bool impala::DataSink::closed_
protected, inherited

Set to true after Close() has been called. Subclasses should check and set this in Close().

Definition at line 83 of file data-sink.h.

Referenced by Close().
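The closed_ convention makes Close() idempotent: check the flag, release resources once, then set it. A minimal sketch with a hypothetical `SinkWithClose` class in which a counter stands in for the actual cleanup (closing files, connections and expressions):

```cpp
#include <cassert>

// Hypothetical sink illustrating the closed_ check-and-set pattern.
class SinkWithClose {
 public:
  void Close() {
    if (closed_) return;  // a second Close() is a no-op
    ++release_count_;     // stands in for releasing files, connections, exprs
    closed_ = true;
  }
  int release_count() const { return release_count_; }
  bool closed() const { return closed_; }

 private:
  bool closed_ = false;
  int release_count_ = 0;
};
```

This matters when Close() can be reached from several paths, for example a normal shutdown and an error-handling path, since resources must not be released twice.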

RuntimeProfile::Counter* impala::HdfsTableSink::compress_timer_
private

Time spent compressing data.

Definition at line 283 of file hdfs-table-sink.h.

Referenced by compress_timer(), and Prepare().

TupleRow* impala::HdfsTableSink::current_row_
private

Current row from the current RowBatch to output.

Definition at line 217 of file hdfs-table-sink.h.

Referenced by GetHashTblKey(), InitOutputPartition(), and Send().

const HdfsPartitionDescriptor* impala::HdfsTableSink::default_partition_
private

Currently this is the default partition since we don't support multi-format sinks.

Definition at line 211 of file hdfs-table-sink.h.

Referenced by GetOutputPartition(), and Open().

std::vector<ExprContext*> impala::HdfsTableSink::dynamic_partition_key_expr_ctxs_
private

Subset of partition_key_expr_ctxs_ which are not constant. Set in Prepare(). Used for generating the string key of hash_tbl_.

Definition at line 259 of file hdfs-table-sink.h.

Referenced by PrepareExprs(), and Send().
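Selecting dynamic_partition_key_expr_ctxs_ is a filter over the partition key exprs. A constant key has the same value for every row, so leaving it out of the hash-table key does not change which rows end up together. The sketch uses a hypothetical `KeyExprStub` with an explicit `is_constant` flag in place of Impala's ExprContext.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical partition-key expression: its text and whether it is constant.
struct KeyExprStub {
  std::string text;
  bool is_constant;
};

// Returns pointers to the non-constant expressions, preserving their order.
// The pointers refer into `all`, which must outlive the result.
std::vector<const KeyExprStub*> SelectDynamicKeys(const std::vector<KeyExprStub>& all) {
  std::vector<const KeyExprStub*> dynamic;
  for (const KeyExprStub& e : all) {
    if (!e.is_constant) dynamic.push_back(&e);
  }
  return dynamic;
}
```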

RuntimeProfile::Counter* impala::HdfsTableSink::encode_timer_
private

Time spent converting tuples to the on-disk format.

Definition at line 279 of file hdfs-table-sink.h.

Referenced by encode_timer(), and Prepare().

boost::scoped_ptr<MemTracker> impala::DataSink::expr_mem_tracker_
protected, inherited

Definition at line 85 of file data-sink.h.

Referenced by impala::DataSink::Prepare(), and PrepareExprs().

RuntimeProfile::Counter* impala::HdfsTableSink::files_created_counter_
private

Definition at line 274 of file hdfs-table-sink.h.

Referenced by CreateNewTmpFile(), and Prepare().

bool impala::HdfsTableSink::has_empty_input_batch_
private

Flag indicating that the current input batch passed to Send() is empty. In that case we must not initialize the OutputPartition writer of a static partition insert.

Definition at line 287 of file hdfs-table-sink.h.

Referenced by GetOutputPartition(), InitOutputPartition(), and Send().

hdfsFS impala::HdfsTableSink::hdfs_connection_
private

Connection to hdfs, established in Open() and closed in Close().

Definition at line 220 of file hdfs-table-sink.h.

Referenced by ClosePartitionFile(), CreateNewTmpFile(), InitOutputPartition(), and Prepare().

RuntimeProfile::Counter* impala::HdfsTableSink::hdfs_write_timer_
private

Time spent writing to hdfs.

Definition at line 281 of file hdfs-table-sink.h.

Referenced by hdfs_write_timer(), and Prepare().

boost::scoped_ptr<MemTracker> impala::HdfsTableSink::mem_tracker_
private

Definition at line 269 of file hdfs-table-sink.h.

Referenced by mem_tracker(), and Prepare().

std::vector<ExprContext*> impala::HdfsTableSink::output_expr_ctxs_
private

Exprs that materialize output values.

Definition at line 214 of file hdfs-table-sink.h.

Referenced by Close(), DebugString(), InitOutputPartition(), Open(), Prepare(), PrepareExprs(), and Send().

bool impala::HdfsTableSink::overwrite_
private

Indicates whether the existing partitions should be overwritten.

Definition at line 240 of file hdfs-table-sink.h.

Referenced by DebugString(), and FinalizePartitionFile().

PartitionDescriptorMap impala::HdfsTableSink::partition_descriptor_map_
private

Definition at line 267 of file hdfs-table-sink.h.

Referenced by GetOutputPartition(), and Open().

std::vector<ExprContext*> impala::HdfsTableSink::partition_key_expr_ctxs_
private

Exprs of partition keys.

Definition at line 237 of file hdfs-table-sink.h.

Referenced by Close(), DebugString(), InitOutputPartition(), Open(), Prepare(), PrepareExprs(), and Send().

const std::vector<TExpr>& impala::HdfsTableSink::partition_key_texprs_
private

Thrift representation of partition keys, saved in the constructor to be used to initialise partition_key_exprs_ in Init.

Definition at line 234 of file hdfs-table-sink.h.

Referenced by Prepare().

PartitionMap impala::HdfsTableSink::partition_keys_to_output_partitions_
private

Hash table of generated output partitions. Maps from a string representation of the dynamic_partition_key_exprs_ generated by GetHashTblKey() to its corresponding OutputPartition. If there are no partitions (and no partition keys) we store a single OutputPartition in the map to simplify the code.

Definition at line 255 of file hdfs-table-sink.h.

Referenced by Close(), GetOutputPartition(), and Send().

RuntimeProfile::Counter* impala::HdfsTableSink::partitions_created_counter_
private

Definition at line 273 of file hdfs-table-sink.h.

Referenced by InitOutputPartition(), and Prepare().

const RowDescriptor& impala::HdfsTableSink::row_desc_
private

Row descriptor of row batches passed in Send(). Set in the constructor.

Definition at line 223 of file hdfs-table-sink.h.

Referenced by PrepareExprs().

RuntimeProfile::Counter* impala::HdfsTableSink::rows_inserted_counter_
private

Definition at line 275 of file hdfs-table-sink.h.

Referenced by Prepare(), and rows_inserted_counter().

RuntimeProfile* impala::HdfsTableSink::runtime_profile_
private

Allocated from runtime state's pool.

Definition at line 272 of file hdfs-table-sink.h.

Referenced by Close(), Prepare(), profile(), and Send().

const std::vector<TExpr>& impala::HdfsTableSink::select_list_texprs_
private

Thrift representation of select list exprs, saved in the constructor to be used to initialise output_exprs_ in Init.

Definition at line 230 of file hdfs-table-sink.h.

Referenced by Prepare().

std::string impala::HdfsTableSink::staging_dir_
private

The directory in which to write intermediate results. Set to <hdfs_table_base_dir>/_impala_insert_staging/ during Prepare().

Definition at line 244 of file hdfs-table-sink.h.

Referenced by BuildHdfsFileNames(), and Prepare().

const HdfsTableDescriptor* impala::HdfsTableSink::table_desc_
private

Descriptor of target table. Set in Prepare().

Definition at line 208 of file hdfs-table-sink.h.

Referenced by BuildHdfsFileNames(), Close(), DebugString(), InitOutputPartition(), Open(), Prepare(), PrepareExprs(), and TableDesc().

TableId impala::HdfsTableSink::table_id_
private

Table id resolved in Prepare() to set tuple_desc_.

Definition at line 226 of file hdfs-table-sink.h.

Referenced by Prepare().

std::string impala::HdfsTableSink::unique_id_str_
private

String representation of the unique fragment instance id. Used for per-partition Hdfs file names and for tmp Hdfs directories. Set in Prepare().

Definition at line 248 of file hdfs-table-sink.h.

Referenced by BuildHdfsFileNames(), and Prepare().


The documentation for this class was generated from the following files: