21 #include "gen-cpp/ImpalaInternalService_constants.h"
36 #include <gutil/strings/substitute.h>
38 #include <boost/scoped_ptr.hpp>
39 #include <boost/date_time/posix_time/posix_time.hpp>
42 #include "gen-cpp/ImpalaInternalService_constants.h"
46 using boost::posix_time::microsec_clock;
47 using boost::posix_time::ptime;
48 using namespace strings;
53 g_ImpalaInternalService_constants.ROOT_PARTITION_KEY;
56 const vector<TExpr>& select_list_texprs,
57 const TDataSink& tsink)
59 table_id_(tsink.table_sink.target_table_id),
60 select_list_texprs_(select_list_texprs),
61 partition_key_texprs_(tsink.table_sink.hdfs_table_sink.partition_key_exprs),
62 overwrite_(tsink.table_sink.hdfs_table_sink.overwrite),
63 has_empty_input_batch_(false) {
64 DCHECK(tsink.__isset.table_sink);
67 OutputPartition::OutputPartition()
68 : hdfs_connection(NULL), tmp_hdfs_file(NULL), num_rows(0), num_files(0),
69 partition_descriptor(NULL) {
98 const HdfsTableDescriptor::PartitionIdToDescriptorMap::value_type& id_to_desc,
115 ptime now = microsec_clock::local_time();
116 long seed = (now.time_of_day().seconds() * 1000)
117 + (now.time_of_day().total_microseconds() / 1000);
131 stringstream error_msg(
"Failed to get table descriptor for table id: ");
133 return Status(error_msg.str());
165 const HdfsTableDescriptor::PartitionIdToDescriptorMap::value_type& id_to_desc,
176 const HdfsTableDescriptor::PartitionIdToDescriptorMap::value_type& id_to_desc,
178 if (id_to_desc.first == g_ImpalaInternalService_constants.DEFAULT_PARTITION_ID) {
191 bool relevant_partition =
true;
195 vector<ExprContext*> dynamic_partition_key_value_ctxs;
199 dynamic_partition_key_value_ctxs.push_back(
208 void* table_partition_key_value =
211 if (table_partition_key_value == NULL && target_partition_key_value == NULL) {
214 if (table_partition_key_value == NULL || target_partition_key_value == NULL
215 || !
RawValue::Eq(table_partition_key_value, target_partition_key_value,
217 relevant_partition =
false;
222 if (relevant_partition) {
229 <<
"Partitions with duplicate 'static' keys found during INSERT";
235 return Status(
"No default partition found for HdfsTextTableSink");
257 const string& query_suffix = Substitute(
"$0_$1_data",
unique_id_str_, rand());
265 if (partition_descriptor.
location().empty()) {
273 Substitute(
"$0/$1", partition_descriptor.
location(), query_suffix);
282 stringstream filename;
285 <<
"." << output_partition->
writer->file_extension();
288 const char* tmp_hdfs_file_name_cstr =
295 if (block_size == 0) block_size = output_partition->
writer->default_block_size();
298 tmp_hdfs_file_name_cstr, O_WRONLY, 0, 0, block_size);
299 VLOG_FILE <<
"hdfsOpenFile() file=" << tmp_hdfs_file_name_cstr;
312 <<
"." << output_partition->
writer->file_extension();
317 Status status = output_partition->
writer->InitNewFile();
330 stringstream partition_name_ss;
349 UrlEncode(value_str, &encoded_str,
true);
351 partition_name_ss << (encoded_str.empty() ?
354 partition_name_ss <<
"/";
364 bool allow_unsupported_formats =
367 if (!allow_unsupported_formats) {
368 if (partition_descriptor.
file_format() == THdfsFileFormat::SEQUENCE_FILE ||
369 partition_descriptor.
file_format() == THdfsFileFormat::AVRO) {
370 stringstream error_msg;
371 map<int, const char*>::const_iterator i =
372 _THdfsFileFormat_VALUES_TO_NAMES.find(partition_descriptor.
file_format());
373 error_msg <<
"Writing to table format " << i->second
374 <<
" is not supported. Use query option ALLOW_UNSUPPORTED_FORMATS"
376 return Status(error_msg.str());
378 if (partition_descriptor.
file_format() == THdfsFileFormat::TEXT &&
380 state->
query_options().compression_codec != THdfsCompression::NONE) {
381 stringstream error_msg;
382 error_msg <<
"Writing to compressed text table is not supported. "
383 "Use query option ALLOW_UNSUPPORTED_FORMATS to override.";
384 return Status(error_msg.str());
393 case THdfsFileFormat::TEXT:
394 output_partition->
writer.reset(
396 this, state, output_partition, &partition_descriptor,
table_desc_,
399 case THdfsFileFormat::PARQUET:
400 output_partition->
writer.reset(
402 this, state, output_partition, &partition_descriptor,
table_desc_,
405 case THdfsFileFormat::SEQUENCE_FILE:
406 output_partition->
writer.reset(
408 this, state, output_partition, &partition_descriptor,
table_desc_,
411 case THdfsFileFormat::AVRO:
412 output_partition->
writer.reset(
414 this, state, output_partition, &partition_descriptor,
table_desc_,
418 stringstream error_msg;
419 map<int, const char*>::const_iterator i =
420 _THdfsFileFormat_VALUES_TO_NAMES.find(partition_descriptor.
file_format());
421 if (i != _THdfsFileFormat_VALUES_TO_NAMES.end()) {
422 error_msg <<
"Cannot write to table with format " << i->second <<
". "
423 <<
"Impala only supports writing to TEXT and PARQUET.";
425 error_msg <<
"Cannot write to table. Impala only supports writing to TEXT"
426 <<
" and PARQUET tables. (Unknown file format: "
429 return Status(error_msg.str());
437 stringstream hash_table_key;
438 for (
int i = 0; i < ctxs.size(); ++i) {
440 ctxs[i]->GetValue(
current_row_), ctxs[i]->root()->type(), &hash_table_key);
442 hash_table_key <<
"/";
444 *key = hash_table_key.str();
449 PartitionMap::iterator existing_partition;
457 partition_descriptor = it->second;
466 if (partition->
writer.get() != NULL) partition->
writer->Close();
474 TInsertPartitionStatus partition_status;
475 partition_status.__set_num_appended_rows(0L);
476 partition_status.__set_id(partition_descriptor->
id());
477 partition_status.__set_stats(TInsertStats());
490 *partition_pair = &existing_partition->second;
500 DCHECK(eos || batch->
num_rows() > 0);
510 if (!has_empty_input_batch_) {
518 batch, partition_pair->second, &new_file));
526 for (
int i = 0; i < batch->
num_rows(); ++i) {
533 partition_pair->second.push_back(i);
538 if (partition->second.second.empty())
continue;
543 batch, partition->second.second, &new_file));
549 partition->second.second.clear();
555 for (PartitionMap::iterator cur_partition =
571 if (partition->
writer.get() != NULL) {
576 PartitionStatusMap::iterator it =
581 it->second.num_appended_rows += partition->
num_rows;
605 for (PartitionMap::iterator cur_partition =
609 if (cur_partition->second.first->writer.get() != NULL) {
610 cur_partition->second.first->writer->Close();
618 const HdfsTableDescriptor::PartitionIdToDescriptorMap::value_type& id_to_desc,
629 hdfsFileInfo* info = hdfsGetPathInfo(output_partition->
hdfs_connection,
637 *size = info->mBlockSize;
638 hdfsFreeFileInfo(info, 1);
645 out <<
"HdfsTableSink(overwrite=" << (
overwrite_ ?
"true" :
"false")
virtual std::string DebugString() const
static Status GetFileBlockSize(OutputPartition *output_partition, int64_t *size)
TableId table_id_
Table id resolved in Prepare() to set tuple_desc_;.
boost::scoped_ptr< MemTracker > expr_mem_tracker_
THdfsFileFormat::type file_format() const
std::vector< ExprContext * > output_expr_ctxs_
Exprs that materialize output values.
Status OpenExprs(RuntimeState *state)
const TUniqueId & query_id() const
std::string final_hdfs_file_name_prefix
virtual RuntimeProfile * profile()
Returns the runtime profile for the sink.
void BuildHdfsFileNames(const HdfsPartitionDescriptor &partition_descriptor, OutputPartition *output)
static void PrintValueAsBytes(const void *value, const ColumnType &type, std::stringstream *stream)
Writes the byte representation of a value to a stringstream character-by-character.
static bool Eq(const void *v1, const void *v2, const ColumnType &type)
Status PrepareExprs(RuntimeState *state)
static Status Open(const std::vector< ExprContext * > &ctxs, RuntimeState *state)
Convenience function for opening multiple expr trees.
FileMoveMap * hdfs_files_to_move()
Status PrepareExprs(RuntimeState *state)
Initialise and prepare select and partition key expressions.
std::string tmp_hdfs_dir_name
std::string current_file_name
TableDescriptor * GetTableDescriptor(TableId id) const
PartitionDescriptorMap partition_descriptor_map_
#define RETURN_IF_ERROR(stmt)
some generally useful macros
virtual void Close(RuntimeState *state)
TupleRow * GetRow(int row_idx)
RuntimeProfile::Counter * partitions_created_counter_
Status GetOutputPartition(RuntimeState *state, const std::string &key, PartitionPair **partition_pair)
#define ADD_TIMER(profile, name)
const std::string & hdfs_base_dir() const
PartitionStatusMap * per_partition_status()
int32_t num_files
Number of files created in this partition.
string PrintId(const TUniqueId &id, const string &separator)
#define COUNTER_ADD(c, v)
std::string partition_name
key1=val1/key2=val2/ etc. Used to identify partitions to the metastore.
std::string DebugString() const
const HdfsPartitionDescriptor * partition_descriptor
The descriptor for this partition.
static void Close(const std::vector< ExprContext * > &ctxs, RuntimeState *state)
Convenience function for closing multiple expr trees.
static void UrlEncode(const char *in, int in_len, string *out, bool hive_compat)
static HdfsFsCache * instance()
static IntGauge * NUM_FILES_OPEN_FOR_INSERT
RuntimeProfile::Counter * rows_inserted_counter_
const HdfsPartitionDescriptor * default_partition_
Currently this is the default partition since we don't support multi-format sinks.
hdfsFS hdfs_connection
Connection to hdfs.
hdfsFS hdfs_connection_
Connection to hdfs, established in Open() and closed in Close().
const HdfsTableDescriptor * table_desc_
Descriptor of target table. Set in Prepare().
bool LogError(const ErrorMsg &msg)
const TQueryOptions & query_options() const
void CloseExprs(RuntimeState *state)
void ClosePartitionFile(RuntimeState *state, OutputPartition *partition)
Closes the hdfs file for this partition as well as the writer.
const std::string & location() const
Status CreateNewTmpFile(RuntimeState *state, OutputPartition *output_partition)
std::vector< ExprContext * > partition_key_expr_ctxs_
Exprs of partition keys.
ObjectPool * obj_pool() const
RuntimeProfile::Counter * encode_timer_
Time spent converting tuple to on disk format.
int num_clustering_cols() const
const PartitionIdToDescriptorMap & partition_descriptors() const
virtual Status Prepare(RuntimeState *state)
Prepares output_exprs and partition_key_exprs, and connects to HDFS.
hdfsFile tmp_hdfs_file
Hdfs file at tmp_hdfs_file_name.
static const string & ROOT_PARTITION_KEY
bool overwrite_
Indicates whether the existing partitions should be overwritten.
#define ADD_COUNTER(profile, name, unit)
RuntimeProfile::Counter * files_created_counter_
RuntimeProfile * runtime_profile_
Allocated from runtime state's pool.
const TUniqueId & fragment_instance_id() const
std::pair< OutputPartition *, std::vector< int32_t > > PartitionPair
static void MergeInsertStats(const TInsertStats &src_stats, TInsertStats *dst_stats)
const DescriptorTbl & desc_tbl() const
This class is thread-safe.
void GetHashTblKey(const std::vector< ExprContext * > &ctxs, std::string *key)
const std::vector< std::string > & col_names() const
int64_t num_rows
Records number of rows appended to the current file in this partition.
boost::scoped_ptr< HdfsTableWriter > writer
Table format specific writer functions.
TupleRow * current_row_
Current row from the current RowBatch to output.
const RowDescriptor & row_desc() const
const std::vector< TExpr > & select_list_texprs_
MemTracker * instance_mem_tracker()
RuntimeProfile::Counter * compress_timer_
Time spent compressing data.
bool has_empty_input_batch_
void FreeLocalAllocations()
RuntimeProfile::Counter * hdfs_write_timer_
Time spent writing to hdfs.
virtual Status Send(RuntimeState *state, RowBatch *batch, bool eos)
Append all rows in batch to the temporary Hdfs files corresponding to partitions. ...
Status FinalizePartitionFile(RuntimeState *state, OutputPartition *partition)
const RowDescriptor * row_desc_
owned by plan root, which resides in runtime_state_'s pool
std::vector< ExprContext * > dynamic_partition_key_expr_ctxs_
Metadata for a single partition inside an Hdfs table.
std::string unique_id_str_
static Status CreateExprTrees(ObjectPool *pool, const std::vector< TExpr > &texprs, std::vector< ExprContext * > *ctxs)
boost::scoped_ptr< MemTracker > mem_tracker_
virtual Status Prepare(RuntimeState *state)
static Status Prepare(const std::vector< ExprContext * > &ctxs, RuntimeState *state, const RowDescriptor &row_desc, MemTracker *tracker)
Status InitOutputPartition(RuntimeState *state, const HdfsPartitionDescriptor &partition_descriptor, OutputPartition *output_partition)
Initialises the filenames of a given output partition, and opens the temporary file.
string GetHdfsErrorMsg(const string &prefix, const string &file)
virtual std::string DebugString() const
RuntimeProfile::Counter * bytes_written_counter_
const std::string & null_partition_key_value() const
std::string tmp_hdfs_file_name_prefix
virtual Status Open(RuntimeState *state)
PartitionMap partition_keys_to_output_partitions_
const std::vector< TExpr > & partition_key_texprs_
const std::vector< ExprContext * > & partition_key_value_ctxs() const
const RowDescriptor & row_desc_
Row descriptor of row batches passed in Send(). Set in c'tor.
Counter * total_time_counter()
Returns the counter for the total elapsed time.