25 #include "gen-cpp/ImpalaInternalService_types.h"
26 #include "gen-cpp/ImpalaInternalService_constants.h"
35 const TDataSink& thrift_sink,
const vector<TExpr>& output_exprs,
36 const TPlanFragmentExecParams& params,
39 switch (thrift_sink.type) {
40 case TDataSinkType::DATA_STREAM_SINK:
41 if (!thrift_sink.__isset.stream_sink) {
42 return Status(
"Missing data stream sink.");
47 params.sender_id, row_desc, thrift_sink.stream_sink,
48 params.destinations, 16 * 1024);
49 sink->reset(tmp_sink);
52 case TDataSinkType::TABLE_SINK:
53 if (!thrift_sink.__isset.table_sink)
return Status(
"Missing table sink.");
54 switch (thrift_sink.table_sink.type) {
55 case TTableSinkType::HDFS:
56 tmp_sink =
new HdfsTableSink(row_desc, output_exprs, thrift_sink);
57 sink->reset(tmp_sink);
59 case TTableSinkType::HBASE:
60 tmp_sink =
new HBaseTableSink(row_desc, output_exprs, thrift_sink);
61 sink->reset(tmp_sink);
64 stringstream error_msg;
65 const char* str =
"Unknown table sink";
66 map<int, const char*>::const_iterator i =
67 _TTableSinkType_VALUES_TO_NAMES.find(thrift_sink.table_sink.type);
68 if (i != _TTableSinkType_VALUES_TO_NAMES.end()) {
71 error_msg << str <<
" not implemented.";
72 return Status(error_msg.str());
77 stringstream error_msg;
78 map<int, const char*>::const_iterator i =
79 _TDataSinkType_VALUES_TO_NAMES.find(thrift_sink.type);
80 const char* str =
"Unknown data sink type ";
81 if (i != _TDataSinkType_VALUES_TO_NAMES.end()) {
84 error_msg << str <<
" not implemented.";
85 return Status(error_msg.str());
91 TInsertStats* dst_stats) {
92 dst_stats->bytes_written += src_stats.bytes_written;
93 if (src_stats.__isset.parquet_stats) {
94 if (dst_stats->__isset.parquet_stats) {
95 MergeMapValues<string, int64_t>(src_stats.parquet_stats.per_column_size,
96 &dst_stats->parquet_stats.per_column_size);
98 dst_stats->__set_parquet_stats(src_stats.parquet_stats);
104 const string& prefix) {
105 const char* indent =
" ";
109 BOOST_FOREACH(
const PartitionStatusMap::value_type& val, stats) {
110 if (!first) ss << endl;
114 const string& partition_key = val.first;
115 if (partition_key == g_ImpalaInternalService_constants.ROOT_PARTITION_KEY) {
116 ss <<
"Default" << endl;
118 ss << partition_key << endl;
120 const TInsertStats& stats = val.second.stats;
121 ss << indent <<
"BytesWritten: "
123 if (stats.__isset.parquet_stats) {
124 const TParquetInsertStats& parquet_stats = stats.parquet_stats;
125 ss << endl << indent <<
"Per Column Sizes:";
126 for (map<string, int64_t>::const_iterator i = parquet_stats.per_column_size.begin();
127 i != parquet_stats.per_column_size.end(); ++i) {
128 ss << endl << indent << indent << i->first <<
": "
boost::scoped_ptr< MemTracker > expr_mem_tracker_
static std::string Print(bool value, TUnit::type ignored, bool verbose=false)
Superclass of all data sinks.
std::map< std::string, TInsertPartitionStatus > PartitionStatusMap
static std::string OutputInsertStats(const PartitionStatusMap &stats, const std::string &prefix="")
Outputs the insert stats contained in the map of insert partition updates to a string.
static void MergeInsertStats(const TInsertStats &src_stats, TInsertStats *dst_stats)
This class is thread-safe.
const RowDescriptor & row_desc() const
MemTracker * instance_mem_tracker()
virtual Status Prepare(RuntimeState *state)
static Status CreateDataSink(ObjectPool *pool, const TDataSink &thrift_sink, const std::vector< TExpr > &output_exprs, const TPlanFragmentExecParams &params, const RowDescriptor &row_desc, boost::scoped_ptr< DataSink > *sink)