Impala
Impala is the open source, native analytic database for Apache Hadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
hbase-table-sink.cc
Go to the documentation of this file.
1 // Copyright 2012 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "exec/hbase-table-sink.h"
16 
17 #include <vector>
18 
19 #include "common/logging.h"
20 #include "exprs/expr.h"
21 #include "exprs/expr-context.h"
22 #include "gen-cpp/ImpalaInternalService_constants.h"
23 
24 #include "common/names.h"
25 
26 namespace impala {
27 
28 const static string& ROOT_PARTITION_KEY =
29  g_ImpalaInternalService_constants.ROOT_PARTITION_KEY;
30 
32  const vector<TExpr>& select_list_texprs,
33  const TDataSink& tsink)
34  : table_id_(tsink.table_sink.target_table_id),
35  table_desc_(NULL),
36  hbase_table_writer_(NULL),
37  row_desc_(row_desc),
38  select_list_texprs_(select_list_texprs) {
39 }
40 
41 Status HBaseTableSink::PrepareExprs(RuntimeState* state) {
42  // From the thrift expressions create the real exprs.
43  RETURN_IF_ERROR(Expr::CreateExprTrees(state->obj_pool(), select_list_texprs_,
44  &output_expr_ctxs_));
45  // Prepare the exprs to run.
47  Expr::Prepare(output_expr_ctxs_, state, row_desc_, expr_mem_tracker_.get()));
48  return Status::OK;
49 }
50 
51 Status HBaseTableSink::Prepare(RuntimeState* state) {
52  RETURN_IF_ERROR(DataSink::Prepare(state));
53  runtime_profile_ = state->obj_pool()->Add(
54  new RuntimeProfile(state->obj_pool(), "HbaseTableSink"));
55  SCOPED_TIMER(runtime_profile_->total_time_counter());
56 
57  // Get the hbase table descriptor. The table name will be used.
58  table_desc_ = static_cast<HBaseTableDescriptor*>(
59  state->desc_tbl().GetTableDescriptor(table_id_));
60  // Prepare the expressions.
61  RETURN_IF_ERROR(PrepareExprs(state));
62  // Now that expressions are ready to materialize tuples, create the writer.
63  hbase_table_writer_.reset(
64  new HBaseTableWriter(table_desc_, output_expr_ctxs_, runtime_profile_));
65 
66  // Try and init the table writer. This can create connections to HBase and
67  // to zookeeper.
68  RETURN_IF_ERROR(hbase_table_writer_->Init(state));
69 
70  // Add a 'root partition' status in which to collect insert statistics
71  TInsertPartitionStatus root_status;
72  root_status.__set_num_appended_rows(0L);
73  root_status.__set_stats(TInsertStats());
74  root_status.__set_id(-1L);
75  state->per_partition_status()->insert(make_pair(ROOT_PARTITION_KEY, root_status));
76 
77  return Status::OK;
78 }
79 
80 Status HBaseTableSink::Open(RuntimeState* state) {
81  return Expr::Open(output_expr_ctxs_, state);
82 }
83 
84 Status HBaseTableSink::Send(RuntimeState* state, RowBatch* batch, bool eos) {
85  SCOPED_TIMER(runtime_profile_->total_time_counter());
86  ExprContext::FreeLocalAllocations(output_expr_ctxs_);
88  // Since everything is set up just forward everything to the writer.
89  RETURN_IF_ERROR(hbase_table_writer_->AppendRowBatch(batch));
90  (*state->per_partition_status())[ROOT_PARTITION_KEY].num_appended_rows +=
91  batch->num_rows();
92  return Status::OK;
93 }
94 
95 void HBaseTableSink::Close(RuntimeState* state) {
96  if (closed_) return;
97  SCOPED_TIMER(runtime_profile_->total_time_counter());
98 
99  if (hbase_table_writer_.get() != NULL) {
100  hbase_table_writer_->Close(state);
101  hbase_table_writer_.reset(NULL);
102  }
103  Expr::Close(output_expr_ctxs_, state);
104  closed_ = true;
105 }
106 
107 } // namespace impala
int num_rows() const
Definition: row-batch.h:215
TableDescriptor * GetTableDescriptor(TableId id) const
Definition: descriptors.cc:427
#define RETURN_IF_ERROR(stmt)
some generally useful macros
Definition: status.h:242
PartitionStatusMap * per_partition_status()
#define SCOPED_TIMER(c)
HBaseTableSink(const RowDescriptor &row_desc, const std::vector< TExpr > &select_list_texprs, const TDataSink &tsink)
ObjectPool * obj_pool() const
Definition: runtime-state.h:92
static const string & ROOT_PARTITION_KEY
const DescriptorTbl & desc_tbl() const
Definition: runtime-state.h:93
const RowDescriptor & row_desc() const
const RowDescriptor * row_desc_
owned by plan root, which resides in runtime_state_'s pool
Definition: coordinator.h:255
double Send(ThriftClient< NetworkTestServiceClient > *client, int64_t bytes)