Impala
Impala is the open source, native analytic database for Apache Hadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
data-sink.cc
Go to the documentation of this file.
1 // Copyright 2012 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "exec/data-sink.h"
16 
17 #include <string>
18 #include <map>
19 
20 #include "common/logging.h"
21 #include "exec/hdfs-table-sink.h"
22 #include "exec/hbase-table-sink.h"
23 #include "exec/exec-node.h"
24 #include "exprs/expr.h"
25 #include "gen-cpp/ImpalaInternalService_types.h"
26 #include "gen-cpp/ImpalaInternalService_constants.h"
28 #include "util/container-util.h"
29 
30 #include "common/names.h"
31 
32 namespace impala {
33 
35  const TDataSink& thrift_sink, const vector<TExpr>& output_exprs,
36  const TPlanFragmentExecParams& params,
37  const RowDescriptor& row_desc, scoped_ptr<DataSink>* sink) {
38  DataSink* tmp_sink = NULL;
39  switch (thrift_sink.type) {
40  case TDataSinkType::DATA_STREAM_SINK:
41  if (!thrift_sink.__isset.stream_sink) {
42  return Status("Missing data stream sink.");
43  }
44 
45  // TODO: figure out good buffer size based on size of output row
46  tmp_sink = new DataStreamSender(pool,
47  params.sender_id, row_desc, thrift_sink.stream_sink,
48  params.destinations, 16 * 1024);
49  sink->reset(tmp_sink);
50  break;
51 
52  case TDataSinkType::TABLE_SINK:
53  if (!thrift_sink.__isset.table_sink) return Status("Missing table sink.");
54  switch (thrift_sink.table_sink.type) {
55  case TTableSinkType::HDFS:
56  tmp_sink = new HdfsTableSink(row_desc, output_exprs, thrift_sink);
57  sink->reset(tmp_sink);
58  break;
59  case TTableSinkType::HBASE:
60  tmp_sink = new HBaseTableSink(row_desc, output_exprs, thrift_sink);
61  sink->reset(tmp_sink);
62  break;
63  default:
64  stringstream error_msg;
65  const char* str = "Unknown table sink";
66  map<int, const char*>::const_iterator i =
67  _TTableSinkType_VALUES_TO_NAMES.find(thrift_sink.table_sink.type);
68  if (i != _TTableSinkType_VALUES_TO_NAMES.end()) {
69  str = i->second;
70  }
71  error_msg << str << " not implemented.";
72  return Status(error_msg.str());
73  }
74 
75  break;
76  default:
77  stringstream error_msg;
78  map<int, const char*>::const_iterator i =
79  _TDataSinkType_VALUES_TO_NAMES.find(thrift_sink.type);
80  const char* str = "Unknown data sink type ";
81  if (i != _TDataSinkType_VALUES_TO_NAMES.end()) {
82  str = i->second;
83  }
84  error_msg << str << " not implemented.";
85  return Status(error_msg.str());
86  }
87  return Status::OK;
88 }
89 
90 void DataSink::MergeInsertStats(const TInsertStats& src_stats,
91  TInsertStats* dst_stats) {
92  dst_stats->bytes_written += src_stats.bytes_written;
93  if (src_stats.__isset.parquet_stats) {
94  if (dst_stats->__isset.parquet_stats) {
95  MergeMapValues<string, int64_t>(src_stats.parquet_stats.per_column_size,
96  &dst_stats->parquet_stats.per_column_size);
97  } else {
98  dst_stats->__set_parquet_stats(src_stats.parquet_stats);
99  }
100  }
101 }
102 
104  const string& prefix) {
105  const char* indent = " ";
106  stringstream ss;
107  ss << prefix;
108  bool first = true;
109  BOOST_FOREACH(const PartitionStatusMap::value_type& val, stats) {
110  if (!first) ss << endl;
111  first = false;
112  ss << "Partition: ";
113 
114  const string& partition_key = val.first;
115  if (partition_key == g_ImpalaInternalService_constants.ROOT_PARTITION_KEY) {
116  ss << "Default" << endl;
117  } else {
118  ss << partition_key << endl;
119  }
120  const TInsertStats& stats = val.second.stats;
121  ss << indent << "BytesWritten: "
122  << PrettyPrinter::Print(stats.bytes_written, TUnit::BYTES);
123  if (stats.__isset.parquet_stats) {
124  const TParquetInsertStats& parquet_stats = stats.parquet_stats;
125  ss << endl << indent << "Per Column Sizes:";
126  for (map<string, int64_t>::const_iterator i = parquet_stats.per_column_size.begin();
127  i != parquet_stats.per_column_size.end(); ++i) {
128  ss << endl << indent << indent << i->first << ": "
129  << PrettyPrinter::Print(i->second, TUnit::BYTES);
130  }
131  }
132  }
133  return ss.str();
134 }
135 
137  expr_mem_tracker_.reset(
138  new MemTracker(-1, -1, "Data sink", state->instance_mem_tracker(), false));
139  return Status::OK;
140 }
141 
142 } // namespace impala
boost::scoped_ptr< MemTracker > expr_mem_tracker_
Definition: data-sink.h:85
static std::string Print(bool value, TUnit::type ignored, bool verbose=false)
Superclass of all data sinks.
Definition: data-sink.h:39
std::map< std::string, TInsertPartitionStatus > PartitionStatusMap
Definition: runtime-state.h:51
ObjectPool pool
static std::string OutputInsertStats(const PartitionStatusMap &stats, const std::string &prefix="")
Outputs the insert stats contained in the map of insert partition updates to a string.
Definition: data-sink.cc:103
static void MergeInsertStats(const TInsertStats &src_stats, TInsertStats *dst_stats)
Definition: data-sink.cc:90
This class is thread-safe.
Definition: mem-tracker.h:61
const RowDescriptor & row_desc() const
MemTracker * instance_mem_tracker()
static const Status OK
Definition: status.h:87
virtual Status Prepare(RuntimeState *state)
Definition: data-sink.cc:136
static Status CreateDataSink(ObjectPool *pool, const TDataSink &thrift_sink, const std::vector< TExpr > &output_exprs, const TPlanFragmentExecParams &params, const RowDescriptor &row_desc, boost::scoped_ptr< DataSink > *sink)
Definition: data-sink.cc:34