18 #include <gutil/strings/substitute.h>
31 #include "gen-cpp/CatalogService.h"
32 #include "gen-cpp/CatalogService_types.h"
34 #include <thrift/Thrift.h>
38 using boost::algorithm::join;
39 using namespace apache::hive::service::cli::thrift;
40 using namespace apache::thrift;
41 using namespace beeswax;
42 using namespace strings;
57 ImpalaServer::QueryExecState::QueryExecState(
61 last_active_time_(numeric_limits<int64_t>::max()),
67 result_cache_max_size_(-1),
68 profile_(&profile_pool_,
"Query"),
69 server_profile_(&profile_pool_,
"ImpalaServer"),
70 summary_profile_(&profile_pool_,
"Summary"),
72 query_state_(beeswax::QueryState::CREATED),
74 current_batch_row_(0),
78 parent_server_(server),
91 Substitute(
"V$0", 1 + session->hs2_version));
103 lexical_cast<string>(
session_->network_address));
111 DCHECK(wait_thread_.get() == NULL) <<
"BlockOnWait() needs to be called!";
116 lock_guard<mutex> l(
lock_);
117 DCHECK(result_cache_ == NULL);
118 result_cache_.reset(cache);
119 if (max_size > FLAGS_max_result_cache_size) {
121 Substitute(
"Requested result-cache size of $0 exceeds Impala's maximum of $1.",
122 max_size, FLAGS_max_result_cache_size));
124 result_cache_max_size_ = max_size;
130 exec_request_ = *exec_request;
132 profile_.AddChild(&server_profile_);
133 summary_profile_.AddInfoString(
"Query Type",
PrintTStmtType(stmt_type()));
134 summary_profile_.AddInfoString(
"Query State",
PrintQueryState(query_state_));
136 switch (exec_request->stmt_type) {
137 case TStmtType::QUERY:
139 DCHECK(exec_request_.__isset.query_exec_request);
140 return ExecQueryOrDmlRequest(exec_request_.query_exec_request);
141 case TStmtType::EXPLAIN: {
142 request_result_set_.reset(
new vector<TResultRow>(
143 exec_request_.explain_result.results));
146 case TStmtType::DDL: {
147 DCHECK(exec_request_.__isset.catalog_op_request);
148 return ExecDdlRequest();
150 case TStmtType::LOAD: {
151 DCHECK(exec_request_.__isset.load_data_request);
152 TLoadDataResp response;
154 frontend_->LoadData(exec_request_.load_data_request, &response));
155 request_result_set_.reset(
new vector<TResultRow>);
156 request_result_set_->push_back(response.load_summary);
159 TCatalogOpRequest reset_req;
160 reset_req.__set_op_type(TCatalogOpType::RESET_METADATA);
161 reset_req.__set_reset_metadata_params(TResetMetadataRequest());
162 reset_req.reset_metadata_params.__set_header(TCatalogServiceRequestHeader());
163 reset_req.reset_metadata_params.__set_is_refresh(
true);
164 reset_req.reset_metadata_params.__set_table_name(
165 exec_request_.load_data_request.table_name);
166 catalog_op_executor_.reset(
170 *catalog_op_executor_->update_catalog_result(),
171 exec_request_.query_options.sync_ddl));
174 case TStmtType::SET: {
175 DCHECK(exec_request_.__isset.set_query_option_request);
176 lock_guard<mutex> l(session_->lock);
177 if (exec_request_.set_query_option_request.__isset.key) {
179 DCHECK(exec_request_.set_query_option_request.__isset.value);
181 exec_request_.set_query_option_request.key,
182 exec_request_.set_query_option_request.value,
183 &session_->default_query_options));
186 map<string, string> config;
188 session_->default_query_options, &config);
189 vector<string> keys, values;
190 map<string, string>::const_iterator itr = config.begin();
191 for (; itr != config.end(); ++itr) {
192 keys.push_back(itr->first);
193 values.push_back(itr->second);
195 SetResultSet(keys, values);
201 errmsg <<
"Unknown exec request stmt type: " << exec_request_.stmt_type;
202 return Status(errmsg.str());
207 const TCatalogOpRequest& catalog_op) {
208 switch (catalog_op.op_type) {
209 case TCatalogOpType::USE: {
210 lock_guard<mutex> l(session_->lock);
211 session_->database = exec_request_.catalog_op_request.use_db_params.db;
214 case TCatalogOpType::SHOW_TABLES: {
215 const TShowTablesParams* params = &catalog_op.show_tables_params;
219 const string* table_name =
220 params->__isset.show_pattern ? &(params->show_pattern) : NULL;
221 TGetTablesResult table_names;
224 SetResultSet(table_names.tables);
227 case TCatalogOpType::SHOW_DBS: {
228 const TShowDbsParams* params = &catalog_op.show_dbs_params;
229 TGetDbsResult db_names;
230 const string* db_pattern =
231 params->__isset.show_pattern ? (&params->show_pattern) : NULL;
233 frontend_->GetDbNames(db_pattern, &
query_ctx_.session, &db_names));
234 SetResultSet(db_names.dbs);
237 case TCatalogOpType::SHOW_DATA_SRCS: {
238 const TShowDataSrcsParams* params = &catalog_op.show_data_srcs_params;
239 TGetDataSrcsResult result;
240 const string* pattern =
241 params->__isset.show_pattern ? (&params->show_pattern) : NULL;
243 frontend_->GetDataSrcMetadata(pattern, &result));
244 SetResultSet(result.data_src_names, result.locations, result.class_names,
245 result.api_versions);
248 case TCatalogOpType::SHOW_STATS: {
249 const TShowStatsParams& params = catalog_op.show_stats_params;
253 request_result_set_.reset(
new vector<TResultRow>(response.rows));
254 result_metadata_ = response.schema;
257 case TCatalogOpType::SHOW_FUNCTIONS: {
258 const TShowFunctionsParams* params = &catalog_op.show_fns_params;
259 TGetFunctionsResult functions;
260 const string* fn_pattern =
261 params->__isset.show_pattern ? (&params->show_pattern) : NULL;
263 params->category, params->db, fn_pattern, &
query_ctx_.session, &functions));
264 SetResultSet(functions.fn_ret_types, functions.fn_signatures);
267 case TCatalogOpType::SHOW_ROLES: {
268 const TShowRolesParams& params = catalog_op.show_roles_params;
269 if (params.is_admin_op) {
275 TSentryAdminCheckRequest req;
276 req.__set_header(TCatalogServiceRequestHeader());
277 req.header.__set_requesting_user(effective_user());
283 TShowRolesResult result;
285 SetResultSet(result.role_names);
288 case TCatalogOpType::SHOW_GRANT_ROLE: {
289 const TShowGrantRoleParams& params = catalog_op.show_grant_role_params;
290 if (params.is_admin_op) {
296 TSentryAdminCheckRequest req;
297 req.__set_header(TCatalogServiceRequestHeader());
298 req.header.__set_requesting_user(effective_user());
305 request_result_set_.reset(
new vector<TResultRow>(response.rows));
306 result_metadata_ = response.schema;
309 case TCatalogOpType::DESCRIBE: {
310 TDescribeTableResult response;
311 RETURN_IF_ERROR(frontend_->DescribeTable(catalog_op.describe_table_params,
314 request_result_set_.reset(
new vector<TResultRow>(response.results));
317 case TCatalogOpType::SHOW_CREATE_TABLE: {
319 RETURN_IF_ERROR(frontend_->ShowCreateTable(catalog_op.show_create_table_params,
321 SetResultSet(vector<string>(1, response));
324 case TCatalogOpType::SHOW_FILES: {
326 RETURN_IF_ERROR(frontend_->GetTableFiles(catalog_op.show_files_params, &response));
328 request_result_set_.reset(
new vector<TResultRow>(response.rows));
329 result_metadata_ = response.schema;
334 ss <<
"Unexpected TCatalogOpType: " << catalog_op.op_type;
341 const TQueryExecRequest& query_exec_request) {
343 DCHECK_GT(query_exec_request.fragments.size(), 0);
345 if (query_exec_request.__isset.query_plan) {
346 stringstream plan_ss;
349 plan_ss <<
"\n----------------\n"
350 << query_exec_request.query_plan
351 <<
"----------------";
352 summary_profile_.AddInfoString(
"Plan", plan_ss.str());
355 if (query_exec_request.__isset.per_host_mem_req) {
357 ss << query_exec_request.per_host_mem_req;
360 if (query_exec_request.__isset.per_host_vcores) {
362 ss << query_exec_request.per_host_vcores;
365 if (!query_exec_request.query_ctx.__isset.parent_query_id &&
366 query_exec_request.query_ctx.__isset.tables_missing_stats &&
367 !query_exec_request.query_ctx.tables_missing_stats.empty()) {
369 const vector<TTableName>& tbls = query_exec_request.query_ctx.tables_missing_stats;
370 for (
int i = 0; i < tbls.size(); ++i) {
371 if (i != 0) ss <<
",";
372 ss << tbls[i].db_name <<
"." << tbls[i].table_name;
381 bool has_coordinator_fragment =
382 query_exec_request.fragments[0].partition.type == TPartitionType::UNPARTITIONED;
383 DCHECK(has_coordinator_fragment || query_exec_request.__isset.desc_tbl);
385 if (FLAGS_enable_rm) {
389 exec_request_.query_options, effective_user(), &summary_profile_,
query_events_));
392 summary_profile_.AddInfoString(
"Request Pool", schedule_->request_pool());
393 if (FLAGS_enable_rm) {
395 stringstream reservation_request_ss;
396 reservation_request_ss << schedule_->reservation_request();
397 summary_profile_.AddInfoString(
"Resource reservation request",
398 reservation_request_ss.str());
403 lock_guard<mutex> l(
lock_);
407 if (FLAGS_enable_rm && schedule_->HasReservation()) {
409 stringstream reservation_ss;
410 reservation_ss << *schedule_->reservation();
411 summary_profile_.AddInfoString(
"Granted resource reservation", reservation_ss.str());
414 status = coord_->Exec(*schedule_, &output_expr_ctxs_);
416 lock_guard<mutex> l(
lock_);
420 profile_.AddChild(coord_->query_profile());
425 string op_type = catalog_op_type() == TCatalogOpType::DDL ?
427 summary_profile_.AddInfoString(
"DDL Type", op_type);
429 if (catalog_op_type() != TCatalogOpType::DDL &&
430 catalog_op_type() != TCatalogOpType::RESET_METADATA) {
431 Status status = ExecLocalCatalogOp(exec_request_.catalog_op_request);
432 lock_guard<mutex> l(
lock_);
433 return UpdateQueryStatus(status);
436 if (ddl_type() == TDdlType::COMPUTE_STATS) {
437 TComputeStatsParams& compute_stats_params =
438 exec_request_.catalog_op_request.ddl_params.compute_stats_params;
440 if (compute_stats_params.__isset.tbl_stats_query) {
441 child_queries_.push_back(
442 ChildQuery(compute_stats_params.tbl_stats_query,
this, parent_server_));
444 if (compute_stats_params.__isset.col_stats_query) {
445 child_queries_.push_back(
446 ChildQuery(compute_stats_params.col_stats_query,
this, parent_server_));
448 if (child_queries_.size() > 0) ExecChildQueriesAsync();
454 Status status = catalog_op_executor_->Exec(exec_request_.catalog_op_request);
456 lock_guard<mutex> l(
lock_);
464 if (catalog_op_type() == TCatalogOpType::DDL &&
465 ddl_type() == TDdlType::CREATE_TABLE_AS_SELECT &&
466 !catalog_op_executor_->ddl_exec_response()->new_table_created) {
467 DCHECK(exec_request_.catalog_op_request.
468 ddl_params.create_table_params.if_not_exists);
474 *catalog_op_executor_->update_catalog_result(),
475 exec_request_.query_options.sync_ddl));
477 if (catalog_op_type() == TCatalogOpType::DDL &&
478 ddl_type() == TDdlType::CREATE_TABLE_AS_SELECT) {
483 DCHECK(exec_request_.__isset.query_exec_request);
484 RETURN_IF_ERROR(ExecQueryOrDmlRequest(exec_request_.query_exec_request));
494 unique_lock<mutex> l(
lock_);
496 summary_profile_.AddInfoString(
"End Time", end_time().
DebugString());
497 summary_profile_.AddInfoString(
"Query State",
PrintQueryState(query_state_));
500 if (coord_.get() != NULL) {
501 Expr::Close(output_expr_ctxs_, coord_->runtime_state());
505 LOG(WARNING) <<
"Failed to release resources of query " << schedule_->query_id()
506 <<
" because of error: " << status.
GetDetail();
515 TResultSet metadata_op_result;
517 summary_profile_.AddInfoString(
"Query Type",
PrintTStmtType(TStmtType::DDL));
518 summary_profile_.AddInfoString(
"Query State",
PrintQueryState(query_state_));
520 &metadata_op_result));
521 result_metadata_ = metadata_op_result.schema;
522 request_result_set_.reset(
new vector<TResultRow>(metadata_op_result.rows));
527 wait_thread_.reset(
new Thread(
532 if (wait_thread_.get() != NULL) {
533 wait_thread_->Join();
534 wait_thread_.reset();
540 Status status = WaitInternal();
542 lock_guard<mutex> l(
lock_);
543 if (returns_result_set()) {
544 query_events()->MarkEvent(
"Rows available");
546 query_events()->MarkEvent(
"Request finished");
548 UpdateQueryStatus(status);
551 UpdateQueryState(QueryState::FINISHED);
557 if (exec_request_.stmt_type == TStmtType::EXPLAIN) {
563 if (coord_.get() != NULL) {
569 if (ddl_type() == TDdlType::COMPUTE_STATS && child_queries_.size() > 0) {
573 if (!returns_result_set()) {
578 if (ddl_type() == TDdlType::CREATE_TABLE_AS_SELECT) {
579 SetCreateTableAsSelectResultSet();
595 UpdateQueryStatus(FetchRowsInternal(max_rows, fetched_rows));
603 if (result_cache_max_size_ <= 0) {
605 "Restarting of fetch requires enabling of query result caching."));
608 if (result_cache_.get() == NULL) {
610 ss <<
"The query result cache exceeded its limit of " << result_cache_max_size_
611 <<
" rows. Restarting the fetch is not possible.";
616 num_rows_fetched_ = 0;
621 lock_guard<mutex> l(
lock_);
622 if (query_state_ < query_state) query_state_ = query_state;
628 query_state_ = QueryState::EXCEPTION;
638 DCHECK(query_state_ != QueryState::EXCEPTION);
642 if (request_result_set_ != NULL) {
643 query_state_ = QueryState::FINISHED;
645 const vector<TResultRow>& all_rows = (*(request_result_set_.get()));
647 while ((num_rows < max_rows || max_rows <= 0)
648 && num_rows_fetched_ < all_rows.size()) {
649 fetched_rows->
AddOneRow(all_rows[num_rows_fetched_]);
653 eos_ = (num_rows_fetched_ == all_rows.size());
657 int32_t num_rows_fetched_from_cache = 0;
658 if (result_cache_max_size_ > 0 && result_cache_ != NULL) {
660 int cache_fetch_size = (max_rows <= 0) ? result_cache_->size() : max_rows;
661 num_rows_fetched_from_cache =
662 fetched_rows->
AddRows(result_cache_.get(), num_rows_fetched_, cache_fetch_size);
663 num_rows_fetched_ += num_rows_fetched_from_cache;
664 if (num_rows_fetched_from_cache >= max_rows)
return Status::OK;
668 vector<void*> result_row;
669 result_row.resize(output_expr_ctxs_.size());
673 scales.resize(result_row.size());
675 if (coord_ == NULL) {
677 query_state_ = QueryState::FINISHED;
682 query_state_ = QueryState::FINISHED;
684 if (current_batch_ == NULL || current_batch_row_ >= current_batch_->num_rows()) {
687 if (current_batch_ == NULL)
return Status::OK;
690 int32_t max_coord_rows = max_rows;
692 DCHECK_LE(num_rows_fetched_from_cache, max_rows);
693 max_coord_rows = max_rows - num_rows_fetched_from_cache;
698 int available = current_batch_->num_rows() - current_batch_row_;
699 int fetched_count = available;
701 if (max_coord_rows > 0 && max_coord_rows < available) fetched_count = max_coord_rows;
702 for (
int i = 0; i < fetched_count; ++i) {
703 TupleRow* row = current_batch_->GetRow(current_batch_row_);
707 ++current_batch_row_;
713 if (result_cache_max_size_ > 0 && result_cache_.get() != NULL) {
714 int rows_fetched_from_coord = fetched_rows->
size() - num_rows_fetched_from_cache;
715 if (result_cache_->size() + rows_fetched_from_coord > result_cache_max_size_) {
731 int64_t before = result_cache_->ByteSize();
734 int64_t delta_bytes =
735 fetched_rows->
ByteSize(num_rows_fetched_from_cache, fetched_rows->
size());
738 if (!query_mem_tracker->
TryConsume(delta_bytes)) {
739 return coord_->runtime_state()->SetMemLimitExceeded(
740 query_mem_tracker, delta_bytes);
743 int num_rows_added = result_cache_->AddRows(
744 fetched_rows, num_rows_fetched_from_cache, fetched_rows->
size());
746 int64_t after = result_cache_->ByteSize();
749 DCHECK_GE(before + delta_bytes, after)
750 <<
"Combined result sets consume more memory than both individually "
751 << Substitute(
"(before: $0, delta_bytes: $1, after: $2)",
752 before, delta_bytes, after);
755 if (before + delta_bytes > after) {
756 query_mem_tracker->
Release(before + delta_bytes - after);
757 delta_bytes = after - before;
769 vector<int>* scales) {
770 DCHECK(result->size() >= output_expr_ctxs_.size());
771 for (
int i = 0; i < output_expr_ctxs_.size(); ++i) {
772 (*result)[i] = output_expr_ctxs_[i]->GetValue(row);
773 (*scales)[i] = output_expr_ctxs_[i]->root()->output_scale();
780 BOOST_FOREACH(
ChildQuery& child_query, child_queries_) {
785 if (eos_ || query_state_ == QueryState::EXCEPTION)
return;
788 UpdateQueryStatus(*cause);
790 query_state_ = QueryState::EXCEPTION;
792 if (coord_.get() != NULL) coord_->Cancel(cause);
796 if (!exec_request().__isset.query_exec_request ||
797 exec_request().query_exec_request.stmt_type != TStmtType::DML) {
804 TQueryExecRequest query_exec_request = exec_request().query_exec_request;
805 if (query_exec_request.__isset.finalize_params) {
806 const TFinalizeParams& finalize_params = query_exec_request.finalize_params;
807 TUpdateCatalogRequest catalog_update;
808 catalog_update.__set_header(TCatalogServiceRequestHeader());
809 catalog_update.header.__set_requesting_user(effective_user());
811 VLOG_QUERY <<
"No partitions altered, not updating metastore (query id: "
817 VLOG_QUERY <<
"Updating metastore with " << catalog_update.created_partitions.size()
818 <<
" altered partitions ("
819 << join (catalog_update.created_partitions,
", ") <<
")";
821 catalog_update.target_table = finalize_params.table_name;
822 catalog_update.db_name = finalize_params.table_db;
825 const TNetworkAddress& address =
831 VLOG_QUERY <<
"Executing FinalizeDml() using CatalogService";
832 TUpdateCatalogResponse resp;
834 client.DoRpc(&CatalogServiceClient::UpdateCatalog, catalog_update, &resp));
836 Status status(resp.result.status);
837 if (!status.ok()) LOG(ERROR) <<
"ERROR Finalizing DML: " << status.GetDetail();
839 RETURN_IF_ERROR(parent_server_->ProcessCatalogUpdateResult(resp.result,
840 exec_request_.query_options.sync_ddl));
849 DCHECK(coord_.get() != NULL);
854 Status status = coord_->GetNext(&current_batch_, coord_->runtime_state());
856 if (!status.
ok())
return status;
860 current_batch_ = NULL;
864 current_batch_row_ = 0;
865 eos_ = current_batch_ == NULL;
870 request_result_set_.reset(
new vector<TResultRow>);
871 request_result_set_->resize(results.size());
872 for (
int i = 0; i < results.size(); ++i) {
873 (*request_result_set_.get())[i].__isset.colVals =
true;
874 (*request_result_set_.get())[i].colVals.resize(1);
875 (*request_result_set_.get())[i].colVals[0].__set_string_val(results[i]);
880 const vector<string>& col2) {
881 DCHECK_EQ(col1.size(), col2.size());
883 request_result_set_.reset(
new vector<TResultRow>);
884 request_result_set_->resize(col1.size());
885 for (
int i = 0; i < col1.size(); ++i) {
886 (*request_result_set_.get())[i].__isset.colVals =
true;
887 (*request_result_set_.get())[i].colVals.resize(2);
888 (*request_result_set_.get())[i].colVals[0].__set_string_val(col1[i]);
889 (*request_result_set_.get())[i].colVals[1].__set_string_val(col2[i]);
894 const vector<string>& col2,
const vector<string>& col3,
const vector<string>& col4) {
895 DCHECK_EQ(col1.size(), col2.size());
896 DCHECK_EQ(col1.size(), col3.size());
897 DCHECK_EQ(col1.size(), col4.size());
899 request_result_set_.reset(
new vector<TResultRow>);
900 request_result_set_->resize(col1.size());
901 for (
int i = 0; i < col1.size(); ++i) {
902 (*request_result_set_.get())[i].__isset.colVals =
true;
903 (*request_result_set_.get())[i].colVals.resize(4);
904 (*request_result_set_.get())[i].colVals[0].__set_string_val(col1[i]);
905 (*request_result_set_.get())[i].colVals[1].__set_string_val(col2[i]);
906 (*request_result_set_.get())[i].colVals[2].__set_string_val(col3[i]);
907 (*request_result_set_.get())[i].colVals[3].__set_string_val(col4[i]);
912 DCHECK(ddl_type() == TDdlType::CREATE_TABLE_AS_SELECT);
913 int64_t total_num_rows_inserted = 0;
916 if (catalog_op_executor_->ddl_exec_response()->new_table_created) {
917 DCHECK(coord_.get());
919 const PartitionStatusMap::value_type& p, coord_->per_partition_status()) {
920 total_num_rows_inserted += p.second.num_appended_rows;
923 const string& summary_msg = Substitute(
"Inserted $0 row(s)", total_num_rows_inserted);
925 vector<string> results(1, summary_msg);
926 SetResultSet(results);
930 client_wait_sw_.Start();
931 lock_guard<mutex> l(expiration_data_lock_);
933 DCHECK(ref_count_ > 0) <<
"Invalid MarkInactive()";
938 client_wait_sw_.Stop();
939 int64_t elapsed_time = client_wait_sw_.ElapsedTime();
940 client_wait_timer_->Set(elapsed_time);
941 lock_guard<mutex> l(expiration_data_lock_);
947 DCHECK_GE(child_queries_.size(), 1);
948 DCHECK_LE(child_queries_.size(), 2);
949 catalog_op_executor_.reset(
954 TTableSchema col_stats_schema;
955 TRowSet col_stats_data;
956 if (child_queries_.size() > 1) {
957 col_stats_schema = child_queries_[1].result_schema();
958 col_stats_data = child_queries_[1].result_data();
961 Status status = catalog_op_executor_->ExecComputeStats(
962 exec_request_.catalog_op_request.ddl_params.compute_stats_params,
963 child_queries_[0].result_schema(),
964 child_queries_[0].result_data(),
968 lock_guard<mutex> l(
lock_);
972 *catalog_op_executor_->update_catalog_result(),
973 exec_request_.query_options.sync_ddl));
976 const TDdlExecResponse* ddl_resp = catalog_op_executor_->ddl_exec_response();
977 if (ddl_resp != NULL && ddl_resp->__isset.result_set) {
978 result_metadata_ = ddl_resp->result_set.schema;
979 request_result_set_.reset(
new vector<TResultRow>);
980 request_result_set_->assign(
981 ddl_resp->result_set.rows.begin(), ddl_resp->result_set.rows.end());
989 DCHECK(child_queries_thread_.get() == NULL);
990 child_queries_thread_.reset(
new Thread(
"query-exec-state",
"async child queries",
995 for (
int i = 0; i < child_queries_.size(); ++i) {
996 if (!child_queries_status_.ok())
return;
997 child_queries_status_ = child_queries_[i].ExecAndFetch();
1002 if (child_queries_thread_.get() == NULL)
return Status::OK;
1003 child_queries_thread_->Join();
1005 lock_guard<mutex> l(
lock_);
1014 if (result_cache_ == NULL)
return;
1017 int64_t total_bytes = result_cache_->ByteSize();
1019 if (coord_ != NULL) {
1020 DCHECK_NOTNULL(coord_->query_mem_tracker());
1021 coord_->query_mem_tracker()->Release(total_bytes);
1023 result_cache_.reset(NULL);
Status FetchRows(const int32_t max_rows, QueryResultSet *fetched_rows)
EventSequence * AddEventSequence(const std::string &key)
void TQueryOptionsToMap(const TQueryOptions &query_options, std::map< std::string, std::string > *configuration)
Converts a TQueryOptions struct into a map of key, value pairs.
void SetCreateTableAsSelectResultSet()
static IntGauge * RESULTSET_CACHE_TOTAL_NUM_ROWS
const std::string GetDetail() const
string TNetworkAddressToString(const TNetworkAddress &address)
Utility method to print address as address:port.
Status ExecLocalCatalogOp(const TCatalogOpRequest &catalog_op)
void AddInfoString(const std::string &key, const std::string &value)
DECLARE_int32(catalog_service_port)
TODO: Consider allowing fragment IDs as category parameters.
bool TryConsume(int64_t bytes)
static Status Open(const std::vector< ExprContext * > &ctxs, RuntimeState *state)
Convenience function for opening multiple expr trees.
const TUniqueId & query_id() const
string GetVersionString(bool compact)
Returns "<program short name> version <GetBuildVersion(compact)>".
#define RETURN_IF_ERROR(stmt)
some generally useful macros
const TUniqueId & session_id() const
std::string PrintQueryState(const beeswax::QueryState::type &type)
boost::mutex lock_
protects all fields below
#define ADD_TIMER(profile, name)
const std::string & connected_user() const
void UpdateQueryState(beeswax::QueryState::type query_state)
virtual size_t size()=0
Returns the size of this result set in number of rows.
std::string PrintTCatalogOpType(const TCatalogOpType::type &type)
beeswax::QueryState::type query_state_
virtual Status AddOneRow(const std::vector< void * > &row, const std::vector< int > &scales)=0
TNetworkAddress MakeNetworkAddress(const string &hostname, int port)
Status UpdateTableAndColumnStats()
string PrintId(const TUniqueId &id, const string &separator)
const std::string & default_db() const
void WaitAsync()
Calls Wait() asynchronously in a thread and returns immediately.
ResourceBroker * resource_broker()
std::string PrintTSessionType(const TSessionType::type &type)
static IntGauge * RESULTSET_CACHE_TOTAL_BYTES
boost::shared_ptr< SessionState > session_
Session that this query is from.
static void Close(const std::vector< ExprContext * > &ctxs, RuntimeState *state)
Convenience function for closing multiple expr trees.
bool PrepareCatalogUpdate(TUpdateCatalogRequest *catalog_update)
void MarkEvent(const std::string &label)
static const string PER_HOST_MEM_KEY
const std::string & effective_user() const
void Start()
Starts the timer without resetting it.
RuntimeProfile::Counter * row_materialization_timer_
CatalogServiceClientCache * catalogd_client_cache()
static const string TABLES_MISSING_STATS_KEY
const TNetworkAddress & backend_address() const
Status WaitForChildQueries()
std::string DebugString(const T &val)
Status SetResultCache(QueryResultSet *cache, int64_t max_size)
virtual Status Schedule(Coordinator *coord, QuerySchedule *schedule)=0
void SetResultSet(const std::vector< std::string > &results)
Status GetRowValue(TupleRow *row, std::vector< void * > *result, std::vector< int > *scales)
virtual int AddRows(const QueryResultSet *other, int start_idx, int num_rows)=0
std::string PrintTStmtType(const TStmtType::type &type)
Status FetchRowsInternal(const int32_t max_rows, QueryResultSet *fetched_rows)
const TimestampValue & start_time() const
This class is thread-safe.
void Release(int64_t bytes)
Decreases consumption of this tracker and its ancestors by 'bytes'.
const TUniqueId & query_id() const
Status ExecQueryOrDmlRequest(const TQueryExecRequest &query_exec_request)
Status WaitInternal()
Core logic of Wait(). Does not update query_state_/status_.
RuntimeProfile summary_profile_
void FreeLocalAllocations()
RuntimeProfile::Counter * client_wait_timer_
Tracks how long we are idle waiting for a client to fetch rows.
std::string PrintTDdlType(const TDdlType::type &type)
int64_t ByteSize()
Returns the approximate size of this result set in bytes.
RuntimeProfile::EventSequence * query_events_
static const string PER_HOST_VCORES_KEY
Status UpdateQueryStatus(const Status &status)
const std::string & do_as_user() const
Status Exec(TExecRequest *exec_request)
RuntimeProfile::EventSequence * query_events_
Event timeline for this query. Unowned.
const TQueryCtx query_ctx_
ExecEnv * exec_env_
global, per-server state
void Cancel(const Status *cause=NULL)
DECLARE_string(catalog_service_host)
void AddChild(RuntimeProfile *child, bool indent=true, RuntimeProfile *location=NULL)
DECLARE_int64(max_result_cache_size)
void SetQueryOption(TImpalaQueryOptions::type opt, const T &opt_val, TExecuteStatementReq *exec_stmt_req)
Status UpdateCatalog()
Gather and publish all required updates to the metastore.
RuntimeProfile server_profile_
MemTracker * query_mem_tracker()
void set_name(const std::string &name)
static TimestampValue LocalTime()
virtual Status Release(QuerySchedule *schedule)=0
Releases the reserved resources (if any) from the given schedule.
TSessionType::type session_type() const
void ExecChildQueriesAsync()