Impala
Impala is the open source, native analytic database for Apache Hadoop.
catalog-op-executor.cc
// Copyright 2012 Cloudera Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "exec/catalog-op-executor.h"

#include <sstream>

#include "exec/incr-stats-util.h"
#include "common/status.h"
#include "runtime/lib-cache.h"
#include "service/impala-server.h"
#include "service/hs2-util.h"
#include "util/string-parser.h"
#include "gen-cpp/CatalogService.h"
#include "gen-cpp/CatalogService_types.h"
#include "gen-cpp/CatalogObjects_types.h"

#include <thrift/protocol/TDebugProtocol.h>
#include <thrift/Thrift.h>
#include <gutil/strings/substitute.h>

#include "common/names.h"
using namespace impala;
using namespace apache::hive::service::cli::thrift;
using namespace apache::thrift;
using strings::Substitute;

DECLARE_int32(catalog_service_port);
DECLARE_string(catalog_service_host);

Status CatalogOpExecutor::Exec(const TCatalogOpRequest& request) {
  Status status;
  DCHECK_NOTNULL(profile_);
  RuntimeProfile::Counter* exec_timer = ADD_TIMER(profile_, "CatalogOpExecTimer");
  SCOPED_TIMER(exec_timer);
  const TNetworkAddress& address =
      MakeNetworkAddress(FLAGS_catalog_service_host, FLAGS_catalog_service_port);
  CatalogServiceConnection client(env_->catalogd_client_cache(), address, &status);
  RETURN_IF_ERROR(status);
  switch (request.op_type) {
    case TCatalogOpType::DDL: {
      // Compute stats stmts must be executed via ExecComputeStats().
      DCHECK(request.ddl_params.ddl_type != TDdlType::COMPUTE_STATS);

      exec_response_.reset(new TDdlExecResponse());
      RETURN_IF_ERROR(client.DoRpc(&CatalogServiceClient::ExecDdl, request.ddl_params,
          exec_response_.get()));
      catalog_update_result_.reset(
          new TCatalogUpdateResult(exec_response_.get()->result));
      Status status(exec_response_->result.status);
      if (status.ok()) {
        if (request.ddl_params.ddl_type == TDdlType::DROP_FUNCTION) {
          HandleDropFunction(request.ddl_params.drop_fn_params);
        } else if (request.ddl_params.ddl_type == TDdlType::DROP_DATA_SOURCE) {
          HandleDropDataSource(request.ddl_params.drop_data_source_params);
        }
      }
      return status;
    }
    case TCatalogOpType::RESET_METADATA: {
      TResetMetadataResponse response;
      RETURN_IF_ERROR(client.DoRpc(&CatalogServiceClient::ResetMetadata,
          request.reset_metadata_params, &response));
      catalog_update_result_.reset(new TCatalogUpdateResult(response.result));
      return Status(response.result.status);
    }
    default: {
      return Status(Substitute("TCatalogOpType: $0 does not support execution against the"
          " CatalogService", request.op_type));
    }
  }
}
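
// Example (a sketch, not part of this file): a caller might drive Exec() for a
// direct DDL operation roughly as follows. The CatalogOpExecutor constructor
// arguments are assumed from the members used above (env_, fe_, profile_); see
// catalog-op-executor.h for the actual signature.
//
//   TCatalogOpRequest request;
//   request.__set_op_type(TCatalogOpType::DDL);
//   request.__isset.ddl_params = true;
//   request.ddl_params.__set_ddl_type(TDdlType::DROP_FUNCTION);
//   // ... populate request.ddl_params.drop_fn_params ...
//   CatalogOpExecutor executor(exec_env, frontend, profile);
//   RETURN_IF_ERROR(executor.Exec(request));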

Status CatalogOpExecutor::ExecComputeStats(
    const TComputeStatsParams& compute_stats_params,
    const TTableSchema& tbl_stats_schema, const TRowSet& tbl_stats_data,
    const TTableSchema& col_stats_schema, const TRowSet& col_stats_data) {
  // Create a new DDL request to alter the table's statistics.
  TCatalogOpRequest catalog_op_req;
  catalog_op_req.__isset.ddl_params = true;
  catalog_op_req.__set_op_type(TCatalogOpType::DDL);
  TDdlExecRequest& update_stats_req = catalog_op_req.ddl_params;
  update_stats_req.__set_ddl_type(TDdlType::ALTER_TABLE);

  TAlterTableUpdateStatsParams& update_stats_params =
      update_stats_req.alter_table_params.update_stats_params;
  update_stats_req.__isset.alter_table_params = true;
  update_stats_req.alter_table_params.__set_alter_type(TAlterTableType::UPDATE_STATS);
  update_stats_req.alter_table_params.__set_table_name(compute_stats_params.table_name);
  update_stats_req.alter_table_params.__isset.update_stats_params = true;
  update_stats_params.__set_table_name(compute_stats_params.table_name);
  update_stats_params.__set_expect_all_partitions(
      compute_stats_params.expect_all_partitions);
  update_stats_params.__set_is_incremental(compute_stats_params.is_incremental);

  // Fill the alteration request based on the child-query results.
  SetTableStats(tbl_stats_schema, tbl_stats_data,
      compute_stats_params.existing_part_stats, &update_stats_params);
  // col_stats_schema and col_stats_data will be empty if there was no column stats query.
  if (!col_stats_schema.columns.empty()) {
    if (compute_stats_params.is_incremental) {
      RuntimeProfile::Counter* incremental_finalize_timer =
          ADD_TIMER(profile_, "FinalizeIncrementalStatsTimer");
      SCOPED_TIMER(incremental_finalize_timer);
      FinalizePartitionedColumnStats(col_stats_schema,
          compute_stats_params.existing_part_stats,
          compute_stats_params.expected_partitions,
          col_stats_data, compute_stats_params.num_partition_cols, &update_stats_params);
    } else {
      SetColumnStats(col_stats_schema, col_stats_data, &update_stats_params);
    }
  }

  // Execute the 'alter table update stats' request.
  RETURN_IF_ERROR(Exec(catalog_op_req));
  return Status::OK;
}

void CatalogOpExecutor::HandleDropFunction(const TDropFunctionParams& request) {
  DCHECK(fe_ != NULL) << "FE tests should not be calling this";
  // Can only be called after successfully processing a catalog update.
  DCHECK(catalog_update_result_ != NULL);

  // Lookup in the local catalog the metadata for the function.
  TCatalogObject obj;
  obj.type = TCatalogObjectType::FUNCTION;
  obj.fn.name = request.fn_name;
  obj.fn.arg_types = request.arg_types;
  obj.fn.signature = request.signature;
  obj.__isset.fn = true;
  obj.fn.__isset.signature = true;

  TCatalogObject fn;
  Status status = fe_->GetCatalogObject(obj, &fn);
  if (!status.ok()) {
    // This can happen if the function was dropped by another impalad.
    VLOG_QUERY << "Could not lookup function catalog object: "
               << apache::thrift::ThriftDebugString(request);
    return;
  }
  // This function may have been dropped and re-created. To avoid removing the re-created
  // function's entry from the cache verify the existing function has a catalog
  // version <= the dropped version. This may happen if the update from the statestore
  // gets applied *before* the result of a direct-DDL drop function command.
  if (fn.catalog_version <= catalog_update_result_->version) {
    LibCache::instance()->RemoveEntry(fn.fn.hdfs_location);
  }
}

void CatalogOpExecutor::HandleDropDataSource(const TDropDataSourceParams& request) {
  DCHECK(fe_ != NULL) << "FE tests should not be calling this";
  // Can only be called after successfully processing a catalog update.
  DCHECK(catalog_update_result_ != NULL);

  // Lookup in the local catalog the metadata for the data source.
  TCatalogObject obj;
  obj.type = TCatalogObjectType::DATA_SOURCE;
  obj.data_source.name = request.data_source;
  obj.__isset.data_source = true;

  TCatalogObject ds;
  Status status = fe_->GetCatalogObject(obj, &ds);
  if (!status.ok()) {
    // This can happen if the data source was dropped by another impalad.
    VLOG_QUERY << "Could not lookup data source catalog object: "
               << apache::thrift::ThriftDebugString(request);
    return;
  }
  // This data source may have been dropped and re-created. To avoid removing the
  // re-created data source's entry from the cache verify the existing data source has a
  // catalog version <= the dropped version. This may happen if the update from the
  // statestore gets applied *before* the result of a direct-DDL drop data source
  // command.
  if (ds.catalog_version <= catalog_update_result_->version) {
    LibCache::instance()->RemoveEntry(ds.data_source.hdfs_location);
  }
}

void CatalogOpExecutor::SetTableStats(const TTableSchema& tbl_stats_schema,
    const TRowSet& tbl_stats_data, const vector<TPartitionStats>& existing_part_stats,
    TAlterTableUpdateStatsParams* params) {
  // Accumulate total number of rows in the table.
  long total_num_rows = 0;
  // Set per-partition stats.
  BOOST_FOREACH(const TRow& row, tbl_stats_data.rows) {
    DCHECK_GT(row.colVals.size(), 0);
    // The first column is the COUNT(*) expr of the original query.
    DCHECK(row.colVals[0].__isset.i64Val);
    int64_t num_rows = row.colVals[0].i64Val.value;
    // The remaining columns are partition columns that the results are grouped by.
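    // For example, for a table partitioned by (year, month), each result row has
    // the form (COUNT(*), <year value>, <month value>).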
    vector<string> partition_key_vals;
    partition_key_vals.reserve(row.colVals.size());
    for (int j = 1; j < row.colVals.size(); ++j) {
      stringstream ss;
      PrintTColumnValue(row.colVals[j], &ss);
      partition_key_vals.push_back(ss.str());
    }
    params->partition_stats[partition_key_vals].stats.__set_num_rows(num_rows);
    total_num_rows += num_rows;
  }

  BOOST_FOREACH(const TPartitionStats& existing_stats, existing_part_stats) {
    total_num_rows += existing_stats.stats.num_rows;
  }

  params->__isset.partition_stats = true;

  // Set per-table stats.
  params->table_stats.__set_num_rows(total_num_rows);
  params->__isset.table_stats = true;
}

void CatalogOpExecutor::SetColumnStats(const TTableSchema& col_stats_schema,
    const TRowSet& col_stats_data, TAlterTableUpdateStatsParams* params) {
  // Expect exactly one result row.
  DCHECK_EQ(1, col_stats_data.rows.size());
  const TRow& col_stats_row = col_stats_data.rows[0];

  // Set per-column stats. For a column at position i in its source table,
  // the NDVs and the number of NULLs are at position i and i + 1 of the
  // col_stats_row, respectively. Positions i + 2 and i + 3 contain the max/avg
  // length for string columns, and -1 for non-string columns.
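  // For example, for a source table with two columns (c1, c2), the single result
  // row holds eight values, laid out as
  //   [NDV(c1), #NULLs(c1), max_size(c1), avg_size(c1),
  //    NDV(c2), #NULLs(c2), max_size(c2), avg_size(c2)],
  // which the loop below consumes with a stride of 4.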
  for (int i = 0; i < col_stats_row.colVals.size(); i += 4) {
    TColumnStats col_stats;
    col_stats.__set_num_distinct_values(col_stats_row.colVals[i].i64Val.value);
    col_stats.__set_num_nulls(col_stats_row.colVals[i + 1].i64Val.value);
    col_stats.__set_max_size(col_stats_row.colVals[i + 2].i32Val.value);
    col_stats.__set_avg_size(col_stats_row.colVals[i + 3].doubleVal.value);
    params->column_stats[col_stats_schema.columns[i].columnName] = col_stats;
  }
  params->__isset.column_stats = true;
}

Status CatalogOpExecutor::GetCatalogObject(const TCatalogObject& object_desc,
    TCatalogObject* result) {
  const TNetworkAddress& address =
      MakeNetworkAddress(FLAGS_catalog_service_host, FLAGS_catalog_service_port);
  Status status;
  CatalogServiceConnection client(env_->catalogd_client_cache(), address, &status);
  RETURN_IF_ERROR(status);

  TGetCatalogObjectRequest request;
  request.__set_object_desc(object_desc);

  TGetCatalogObjectResponse response;
  RETURN_IF_ERROR(
      client.DoRpc(&CatalogServiceClient::GetCatalogObject, request, &response));
  *result = response.catalog_object;
  return Status::OK;
}

Status CatalogOpExecutor::PrioritizeLoad(const TPrioritizeLoadRequest& req,
    TPrioritizeLoadResponse* result) {
  const TNetworkAddress& address =
      MakeNetworkAddress(FLAGS_catalog_service_host, FLAGS_catalog_service_port);
  Status status;
  CatalogServiceConnection client(env_->catalogd_client_cache(), address, &status);
  RETURN_IF_ERROR(status);
  RETURN_IF_ERROR(client.DoRpc(&CatalogServiceClient::PrioritizeLoad, req, result));
  return Status::OK;
}

Status CatalogOpExecutor::SentryAdminCheck(const TSentryAdminCheckRequest& req) {
  const TNetworkAddress& address =
      MakeNetworkAddress(FLAGS_catalog_service_host, FLAGS_catalog_service_port);
  Status cnxn_status;
  CatalogServiceConnection client(env_->catalogd_client_cache(), address, &cnxn_status);
  RETURN_IF_ERROR(cnxn_status);
  TSentryAdminCheckResponse resp;
  RETURN_IF_ERROR(client.DoRpc(&CatalogServiceClient::SentryAdminCheck, req, &resp));
  return Status(resp.status);
}