25 #include "gen-cpp/CatalogService.h"
26 #include "gen-cpp/CatalogService_types.h"
27 #include "gen-cpp/CatalogObjects_types.h"
29 #include <thrift/protocol/TDebugProtocol.h>
30 #include <thrift/Thrift.h>
31 #include <gutil/strings/substitute.h>
34 using namespace impala;
35 using namespace apache::hive::service::cli::thrift;
36 using namespace apache::thrift;
37 using strings::Substitute;
// Body of the catalog-op execution entry point. Its header and several lines
// (client connection setup, case terminators, the default label, the final
// returns) are not visible in this chunk. `request` is presumably the
// TCatalogOpRequest being executed (the cross-reference list declares
// `Status Exec(const TCatalogOpRequest&)`) -- TODO confirm.
// Dispatches on request.op_type:
//   - DDL: sends request.ddl_params to the catalog service's ExecDdl RPC.
//   - RESET_METADATA: sends request.reset_metadata_params over an RPC whose
//     call line is not visible here.
//   - any other op type: returns an error Status naming the op type.
44 DCHECK_NOTNULL(profile_);
47 const TNetworkAddress& address =
51 switch (request.op_type) {
52 case TCatalogOpType::DDL: {
// COMPUTE STATS must not be routed through here; presumably it goes through
// ExecComputeStats(), which builds its own ALTER_TABLE request.
54 DCHECK(request.ddl_params.ddl_type != TDdlType::COMPUTE_STATS);
// The DDL response is kept in the member exec_response_ rather than a local.
56 exec_response_.reset(
new TDdlExecResponse());
// A failed RPC returns immediately via RETURN_IF_ERROR; a DDL-level failure is
// instead carried in exec_response_->result.status and picked up below.
57 RETURN_IF_ERROR(client.DoRpc(&CatalogServiceClient::ExecDdl, request.ddl_params,
58 exec_response_.get()));
// Keep a copy of the update result: HandleDropFunction() and
// HandleDropDataSource() compare catalog versions against it.
59 catalog_update_result_.reset(
60 new TCatalogUpdateResult(exec_response_.get()->result));
61 Status status(exec_response_->result.status);
// Dropping a function or a data source needs local follow-up work after the
// catalog service has processed the DDL. The line(s) between here and the
// status declaration are not visible, so whether this is conditional on
// `status` (and how `status` is returned) cannot be seen from this chunk.
63 if (request.ddl_params.ddl_type == TDdlType::DROP_FUNCTION) {
64 HandleDropFunction(request.ddl_params.drop_fn_params);
65 }
else if (request.ddl_params.ddl_type == TDdlType::DROP_DATA_SOURCE) {
66 HandleDropDataSource(request.ddl_params.drop_data_source_params);
71 case TCatalogOpType::RESET_METADATA: {
// The response is local here; only its update result is retained.
72 TResetMetadataResponse response;
74 request.reset_metadata_params, &response));
75 catalog_update_result_.reset(
new TCatalogUpdateResult(response.result));
// Report the status carried inside the reset-metadata response.
76 return Status(response.result.status);
// Any other op type cannot be executed against the catalog service.
79 return Status(Substitute(
"TCatalogOpType: $0 does not support execution against the"
80 " CatalogService", request.op_type));
86 const TComputeStatsParams& compute_stats_params,
87 const TTableSchema& tbl_stats_schema,
const TRowSet& tbl_stats_data,
88 const TTableSchema& col_stats_schema,
const TRowSet& col_stats_data) {
// Body of ExecComputeStats (header not visible in this chunk). Translates the
// table-stats and column-stats result sets into one catalog request with
// op_type DDL, ddl_type ALTER_TABLE and alter_type UPDATE_STATS.
//   compute_stats_params: target table name, incremental flag,
//       expect_all_partitions, existing partition stats, expected partitions
//       and the number of partition columns.
//   tbl_stats_schema / tbl_stats_data: row counts per partition (see
//       SetTableStats).
//   col_stats_schema / col_stats_data: column statistics; an empty
//       col_stats_schema skips the column-stats update.
// How the finished request is sent is not visible in this chunk.
90 TCatalogOpRequest catalog_op_req;
// The nested request structs are filled in place through references, so
// their __isset flags are set by hand instead of via the __set_*() setters.
91 catalog_op_req.__isset.ddl_params =
true;
92 catalog_op_req.__set_op_type(TCatalogOpType::DDL);
93 TDdlExecRequest& update_stats_req = catalog_op_req.ddl_params;
94 update_stats_req.__set_ddl_type(TDdlType::ALTER_TABLE);
// Alias for the nested update-stats struct; every write through it lands in
// catalog_op_req.
96 TAlterTableUpdateStatsParams& update_stats_params =
97 update_stats_req.alter_table_params.update_stats_params;
98 update_stats_req.__isset.alter_table_params =
true;
99 update_stats_req.alter_table_params.__set_alter_type(TAlterTableType::UPDATE_STATS);
100 update_stats_req.alter_table_params.__set_table_name(compute_stats_params.table_name);
101 update_stats_req.alter_table_params.__isset.update_stats_params =
true;
102 update_stats_params.__set_table_name(compute_stats_params.table_name);
103 update_stats_params.__set_expect_all_partitions(
104 compute_stats_params.expect_all_partitions);
105 update_stats_params.__set_is_incremental(compute_stats_params.is_incremental);
// Row counts: the new per-partition counts plus the existing partitions'
// counts form the table total (see SetTableStats).
108 SetTableStats(tbl_stats_schema, tbl_stats_data,
109 compute_stats_params.existing_part_stats, &update_stats_params);
// Column stats are only written when col_stats_schema has columns.
111 if (!col_stats_schema.columns.empty()) {
// Incremental: existing partition stats, expected partitions and the number of
// partition columns are passed to FinalizePartitionedColumnStats (the first
// line of that call is not visible). Otherwise the single-row result is
// applied directly via SetColumnStats.
112 if (compute_stats_params.is_incremental) {
// Adds a timer named "FinalizeIncrementalStatsTimer" to profile_ via
// ADD_TIMER; whatever starts/stops it is not visible in this chunk.
114 ADD_TIMER(profile_,
"FinalizeIncrementalStatsTimer");
117 compute_stats_params.existing_part_stats,
118 compute_stats_params.expected_partitions,
119 col_stats_data, compute_stats_params.num_partition_cols, &update_stats_params);
121 SetColumnStats(col_stats_schema, col_stats_data, &update_stats_params);
// Follow-up for a DROP FUNCTION DDL, called from the DDL branch of the catalog
// op executor. Only parts of the body are visible in this chunk (the header,
// the declarations of `obj` and `fn`, and the body of the final branch are
// missing); the cross-reference list declares it as
// `void HandleDropFunction(const TDropFunctionParams&)`.
// Visible steps:
//   1. Look up the dropped function's catalog object through fe_, identified
//      by name, argument types and signature.
//   2. Log the request at VLOG_QUERY when the lookup fails (the condition
//      itself is not visible).
//   3. Compare the function's catalog_version with the version recorded in
//      catalog_update_result_.
// NOTE(review): the step-3 branch body is not visible; since
// LibCache::RemoveEntry(hdfs_lib_file) appears in the cross-reference list, it
// presumably evicts the function's library from the lib cache -- confirm.
// fe_ is needed for the catalog lookup below.
131 DCHECK(fe_ != NULL) <<
"FE tests should not be calling this";
// Set by the DDL branch before this handler runs; used in step 3.
133 DCHECK(catalog_update_result_ != NULL);
137 obj.type = TCatalogObjectType::FUNCTION;
138 obj.fn.name = request.fn_name;
139 obj.fn.arg_types = request.arg_types;
140 obj.fn.signature = request.signature;
// Nested fields are assigned directly, so their __isset flags are set by hand.
141 obj.__isset.fn =
true;
142 obj.fn.__isset.signature =
true;
145 Status status = fe_->GetCatalogObject(obj, &fn);
// Lookup failure (the enclosing condition is not visible): log the request.
148 VLOG_QUERY <<
"Could not lookup function catalog object: "
149 << apache::thrift::ThriftDebugString(request);
// Act only if the looked-up function is not newer than the catalog version
// produced by this DROP (presumably so that a function re-created by a later
// catalog update is left alone).
156 if (fn.catalog_version <= catalog_update_result_->version) {
// Follow-up for a DROP DATA SOURCE DDL, called from the DDL branch of the
// catalog op executor. Only parts of the body are visible in this chunk (the
// header, the declarations of `obj` and `ds`, and the body of the final branch
// are missing); the cross-reference list declares it as
// `void HandleDropDataSource(const TDropDataSourceParams&)`.
// Visible steps:
//   1. Look up the dropped data source's catalog object through fe_ by name.
//   2. Log the request at VLOG_QUERY when the lookup fails (the condition
//      itself is not visible).
//   3. Compare the data source's catalog_version with the version recorded in
//      catalog_update_result_.
// NOTE(review): the step-3 branch body is not visible; it presumably drops
// locally cached state for the data source (e.g. a lib cache entry) --
// confirm.
// fe_ is needed for the catalog lookup below.
162 DCHECK(fe_ != NULL) <<
"FE tests should not be calling this";
// Set by the DDL branch before this handler runs; used in step 3.
164 DCHECK(catalog_update_result_ != NULL);
168 obj.type = TCatalogObjectType::DATA_SOURCE;
169 obj.data_source.name = request.data_source;
// The nested field is assigned directly, so its __isset flag is set by hand.
170 obj.__isset.data_source =
true;
173 Status status = fe_->GetCatalogObject(obj, &ds);
// Lookup failure (the enclosing condition is not visible): log the request.
176 VLOG_QUERY <<
"Could not lookup data source catalog object: "
177 << apache::thrift::ThriftDebugString(request);
// Act only if the looked-up data source is not newer than the catalog version
// produced by this DROP (presumably so that a data source re-created by a later
// catalog update is left alone).
185 if (ds.catalog_version <= catalog_update_result_->version) {
191 const TRowSet& tbl_stats_data,
const vector<TPartitionStats>& existing_part_stats,
192 TAlterTableUpdateStatsParams* params) {
// Body of SetTableStats (first header line not visible). Fills
// params->partition_stats and params->table_stats from the table-stats
// result set:
//   - Each row carries its row count in column 0 (an i64, DCHECKed) and the
//     partition key values in the remaining columns.
//   - The count is stored under that key vector in params->partition_stats.
//     A row with only the count column maps to an empty key vector
//     (presumably the unpartitioned case).
//   - The table-level num_rows is the sum of those counts plus the num_rows
//     of every entry in existing_part_stats.
// NOTE(review): `long` is only 32 bits on LLP64 platforms (e.g. Windows).
// int64_t, as used for num_rows below, would avoid a possible overflow of
// the sum.
194 long total_num_rows = 0;
196 BOOST_FOREACH(
const TRow& row, tbl_stats_data.rows) {
197 DCHECK_GT(row.colVals.size(), 0);
199 DCHECK(row.colVals[0].__isset.i64Val);
200 int64_t num_rows = row.colVals[0].i64Val.value;
// Stringified partition key values for this row. The per-value conversion
// lines are not visible here; PrintTColumnValue, listed in the
// cross-reference list, presumably does it -- confirm. reserve() allocates
// one slot more than needed because column 0 is the count.
202 vector<string> partition_key_vals;
203 partition_key_vals.reserve(row.colVals.size());
// NOTE(review): `int j` vs. size() is a signed/unsigned comparison; size_t
// would silence -Wsign-compare.
204 for (
int j = 1; j < row.colVals.size(); ++j) {
207 partition_key_vals.push_back(ss.str());
209 params->partition_stats[partition_key_vals].stats.__set_num_rows(num_rows);
210 total_num_rows += num_rows;
// Existing partition stats also contribute to the table total (presumably the
// partitions not recomputed by this run).
213 BOOST_FOREACH(
const TPartitionStats& existing_stats, existing_part_stats) {
214 total_num_rows += existing_stats.stats.num_rows;
// partition_stats entries were added through operator[], so mark the map set.
217 params->__isset.partition_stats =
true;
220 params->table_stats.__set_num_rows(total_num_rows);
// The setter above only flags num_rows inside table_stats; the outer flag is
// set by hand.
221 params->__isset.table_stats =
true;
225 const TRowSet& col_stats_data, TAlterTableUpdateStatsParams* params) {
// Body of SetColumnStats (first header line not visible). Fills
// params->column_stats from a single-row result set (DCHECKed).
// The row is read in groups of four values per column:
//   num_distinct_values (i64), num_nulls (i64), max_size (i32),
//   avg_size (double).
// Each entry is keyed by the name of the schema column at the group's first
// index. This assumes that column is labelled with the table column's name;
// TODO confirm against the query that produces these stats.
227 DCHECK_EQ(1, col_stats_data.rows.size());
228 const TRow& col_stats_row = col_stats_data.rows[0];
// NOTE(review): the loop assumes colVals.size() is a multiple of 4; otherwise
// colVals[i + 1 .. i + 3] would read past the end. A DCHECK on
// colVals.size() % 4 == 0 would make that explicit. `int i` vs. size() is
// also a signed/unsigned comparison.
234 for (
int i = 0; i < col_stats_row.colVals.size(); i += 4) {
235 TColumnStats col_stats;
236 col_stats.__set_num_distinct_values(col_stats_row.colVals[i].i64Val.value);
237 col_stats.__set_num_nulls(col_stats_row.colVals[i + 1].i64Val.value);
238 col_stats.__set_max_size(col_stats_row.colVals[i + 2].i32Val.value);
239 col_stats.__set_avg_size(col_stats_row.colVals[i + 3].doubleVal.value);
240 params->column_stats[col_stats_schema.columns[i].columnName] = col_stats;
// Entries were added through operator[], so the __isset flag is set by hand.
242 params->__isset.column_stats =
true;
246 TCatalogObject* result) {
// Body of GetCatalogObject (first header line not visible; declared in the
// cross-reference list as
// `Status GetCatalogObject(const TCatalogObject&, TCatalogObject*)`).
// Asks the catalog service for the object matching object_desc via the
// GetCatalogObject RPC, then copies response.catalog_object into *result.
// The client setup, the first line of the RPC call and the final return are
// not visible in this chunk.
247 const TNetworkAddress& address =
253 TGetCatalogObjectRequest request;
254 request.__set_object_desc(object_desc);
256 TGetCatalogObjectResponse response;
258 client.DoRpc(&CatalogServiceClient::GetCatalogObject, request, &response));
259 *result = response.catalog_object;
264 TPrioritizeLoadResponse* result) {
// Body of PrioritizeLoad (first header line not visible). Forwards req to the
// catalog service's PrioritizeLoad RPC. The response is written directly into
// *result, and an RPC failure is returned via RETURN_IF_ERROR. The client
// setup and the final return are not visible in this chunk.
265 const TNetworkAddress& address =
270 RETURN_IF_ERROR(client.DoRpc(&CatalogServiceClient::PrioritizeLoad, req, result));
// Body of SentryAdminCheck (header not visible; declared in the
// cross-reference list as
// `Status SentryAdminCheck(const TSentryAdminCheckRequest&)`).
// Sends req to the catalog service's SentryAdminCheck RPC. An RPC failure is
// returned directly; otherwise the Status carried in the response is returned.
275 const TNetworkAddress& address =
280 TSentryAdminCheckResponse resp;
281 RETURN_IF_ERROR(client.DoRpc(&CatalogServiceClient::SentryAdminCheck, req, &resp));
282 return Status(resp.status);
Status GetCatalogObject(const TCatalogObject &object_desc, TCatalogObject *result)
Status Exec(const TCatalogOpRequest &catalog_op)
Executes the given catalog operation against the catalog server.
#define RETURN_IF_ERROR(stmt)
some generally useful macros
#define ADD_TIMER(profile, name)
void PrintTColumnValue(const apache::hive::service::cli::thrift::TColumnValue &colval, std::stringstream *out)
TNetworkAddress MakeNetworkAddress(const string &hostname, int port)
Status Exec(QuerySchedule &schedule, std::vector< ExprContext * > *output_expr_ctxs)
static void SetTableStats(const apache::hive::service::cli::thrift::TTableSchema &tbl_stats_schema, const apache::hive::service::cli::thrift::TRowSet &tbl_stats_data, const std::vector< TPartitionStats > &existing_part_stats, TAlterTableUpdateStatsParams *params)
void HandleDropFunction(const TDropFunctionParams &)
void RemoveEntry(const std::string &hdfs_lib_file)
Removes the cache entry for 'hdfs_lib_file'.
DECLARE_int32(catalog_service_port)
Status SentryAdminCheck(const TSentryAdminCheckRequest &re)
static LibCache * instance()
static void SetColumnStats(const apache::hive::service::cli::thrift::TTableSchema &col_stats_schema, const apache::hive::service::cli::thrift::TRowSet &col_stats_data, TAlterTableUpdateStatsParams *params)
Status ExecComputeStats(const TComputeStatsParams &compute_stats_params, const apache::hive::service::cli::thrift::TTableSchema &tbl_stats_schema, const apache::hive::service::cli::thrift::TRowSet &tbl_stats_data, const apache::hive::service::cli::thrift::TTableSchema &col_stats_schema, const apache::hive::service::cli::thrift::TRowSet &col_stats_data)
Status PrioritizeLoad(const TPrioritizeLoadRequest &req, TPrioritizeLoadResponse *result)
void HandleDropDataSource(const TDropDataSourceParams &)
DECLARE_string(catalog_service_host)
void FinalizePartitionedColumnStats(const TTableSchema &col_stats_schema, const vector< TPartitionStats > &existing_part_stats, const vector< vector< string > > &expected_partitions, const TRowSet &rowset, int32_t num_partition_cols, TAlterTableUpdateStatsParams *params)