Impala
Impala is the open source, native analytic database for Apache Hadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
query-exec-state.cc
Go to the documentation of this file.
1 // Copyright 2013 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
16 
17 #include <limits>
18 #include <gutil/strings/substitute.h>
19 
20 #include "exprs/expr.h"
21 #include "exprs/expr-context.h"
22 #include "runtime/row-batch.h"
23 #include "runtime/runtime-state.h"
24 #include "service/impala-server.h"
25 #include "service/frontend.h"
26 #include "service/query-options.h"
27 #include "util/debug-util.h"
28 #include "util/impalad-metrics.h"
29 #include "util/time.h"
30 
31 #include "gen-cpp/CatalogService.h"
32 #include "gen-cpp/CatalogService_types.h"
33 
34 #include <thrift/Thrift.h>
35 
36 #include "common/names.h"
37 
38 using boost::algorithm::join;
39 using namespace apache::hive::service::cli::thrift;
40 using namespace apache::thrift;
41 using namespace beeswax;
42 using namespace strings;
43 
44 DECLARE_int32(catalog_service_port);
45 DECLARE_string(catalog_service_host);
46 DECLARE_bool(enable_rm);
47 DECLARE_int64(max_result_cache_size);
48 
49 namespace impala {
50 
51 // Keys into the info string map of the runtime profile referring to specific
52 // items used by CM for monitoring purposes.
// Key under which the request's per_host_mem_req value is added to the summary
// profile (only when that field is set on the query exec request).
53 static const string PER_HOST_MEM_KEY = "Estimated Per-Host Mem";
// Key under which the request's per_host_vcores value is added to the summary
// profile (only when that field is set on the query exec request).
54 static const string PER_HOST_VCORES_KEY = "Estimated Per-Host VCores";
// Key under which a comma-separated list of "db.table" names taken from
// query_ctx.tables_missing_stats is added to the summary profile.
55 static const string TABLES_MISSING_STATS_KEY = "Tables Missing Stats";
56 
// Builds the execution state for a single query.
//
// Stores the query context and the collaborating objects (exec env, frontend,
// owning server, session), puts all fetch/bookkeeping members into their initial
// state, registers two timers on server_profile_, and seeds summary_profile_ with
// the session- and statement-level info strings that are known at this point.
//
// query_ctx: copied into query_ctx_.
// exec_env, frontend, server: stored as raw pointers; not dereferenced here.
// session: stored in session_; its hs2_version and network_address are read below.
57 ImpalaServer::QueryExecState::QueryExecState(
58  const TQueryCtx& query_ctx, ExecEnv* exec_env, Frontend* frontend,
59  ImpalaServer* server, shared_ptr<SessionState> session)
60  : query_ctx_(query_ctx),
61  last_active_time_(numeric_limits<int64_t>::max()),
62  ref_count_(0L),
63  exec_env_(exec_env),
64  session_(session),
65  schedule_(NULL),
66  coord_(NULL),
   // -1 here; code later in this file treats a value <= 0 as "result caching is
   // not enabled for this query".
67  result_cache_max_size_(-1),
68  profile_(&profile_pool_, "Query"), // assign name w/ id after planning
69  server_profile_(&profile_pool_, "ImpalaServer"),
70  summary_profile_(&profile_pool_, "Summary"),
71  eos_(false),
72  query_state_(beeswax::QueryState::CREATED),
73  current_batch_(NULL),
74  current_batch_row_(0),
75  num_rows_fetched_(0),
76  fetched_rows_(false),
77  frontend_(frontend),
78  parent_server_(server),
79  start_time_(TimestampValue::LocalTime()) {
   // Both timers are registered on server_profile_.
80  row_materialization_timer_ = ADD_TIMER(&server_profile_, "RowMaterializationTimer");
81  client_wait_timer_ = ADD_TIMER(&server_profile_, "ClientFetchWaitTimer");
85 
   // Replace the placeholder name given in the initializer list with one that
   // carries the query id.
86  profile_.set_name("Query (id=" + PrintId(query_id()) + ")");
   // HiveServer2 sessions only: record the protocol version as "V<n>", where
   // n is session->hs2_version + 1.
89  if (session_type() == TSessionType::HIVESERVER2) {
90  summary_profile_.AddInfoString("HiveServer2 Protocol Version",
91  Substitute("V$0", 1 + session->hs2_version));
92  }
   // Initial values for "End Time", "Query Type" and "Query Status"; other
   // methods in this file call AddInfoString again with the same keys once the
   // real values are known.
94  summary_profile_.AddInfoString("End Time", "");
95  summary_profile_.AddInfoString("Query Type", "N/A");
97  summary_profile_.AddInfoString("Query Status", "OK");
98  summary_profile_.AddInfoString("Impala Version", GetVersionString(/* compact */ true));
100  summary_profile_.AddInfoString("Connected User", connected_user());
101  summary_profile_.AddInfoString("Delegated User", do_as_user());
102  summary_profile_.AddInfoString("Network Address",
103  lexical_cast<string>(session_->network_address));
104  summary_profile_.AddInfoString("Default Db", default_db());
105  summary_profile_.AddInfoString("Sql Statement", query_ctx_.request.stmt);
   // NOTE(review): the value argument of the "Coordinator" entry is not visible
   // in this listing (a line appears to be missing) -- confirm against the
   // original source file.
106  summary_profile_.AddInfoString("Coordinator",
108 }
109 
111  DCHECK(wait_thread_.get() == NULL) << "BlockOnWait() needs to be called!";
112 }
113 
115  int64_t max_size) {
116  lock_guard<mutex> l(lock_);
117  DCHECK(result_cache_ == NULL);
118  result_cache_.reset(cache);
119  if (max_size > FLAGS_max_result_cache_size) {
120  return Status(
121  Substitute("Requested result-cache size of $0 exceeds Impala's maximum of $1.",
122  max_size, FLAGS_max_result_cache_size));
123  }
124  result_cache_max_size_ = max_size;
125  return Status::OK;
126 }
127 
// Starts execution of an already-planned request, dispatching on its statement
// type.
//
// exec_request: copied into exec_request_; apart from the switch condition, the
//   code below reads the copy rather than the argument.
// Returns Status::OK on success, the first error produced by a callee wrapped in
// RETURN_IF_ERROR, or an error Status naming the statement type if it is not one
// of the handled cases.
128 Status ImpalaServer::QueryExecState::Exec(TExecRequest* exec_request) {
129  MarkActive();
130  exec_request_ = *exec_request;
131 
   // Attach the server profile and publish the statement type and the current
   // query state in the summary profile.
132  profile_.AddChild(&server_profile_);
133  summary_profile_.AddInfoString("Query Type", PrintTStmtType(stmt_type()));
134  summary_profile_.AddInfoString("Query State", PrintQueryState(query_state_));
135 
136  switch (exec_request->stmt_type) {
   // QUERY and DML share one path and both require query_exec_request to be set.
137  case TStmtType::QUERY:
138  case TStmtType::DML:
139  DCHECK(exec_request_.__isset.query_exec_request);
140  return ExecQueryOrDmlRequest(exec_request_.query_exec_request);
   // EXPLAIN: the result rows are already part of the request; copy them into
   // the request-level result set.
141  case TStmtType::EXPLAIN: {
142  request_result_set_.reset(new vector<TResultRow>(
143  exec_request_.explain_result.results));
144  return Status::OK;
145  }
146  case TStmtType::DDL: {
147  DCHECK(exec_request_.__isset.catalog_op_request);
148  return ExecDdlRequest();
149  }
150  case TStmtType::LOAD: {
151  DCHECK(exec_request_.__isset.load_data_request);
152  TLoadDataResp response;
   // NOTE(review): the opening of the statement that wraps this LoadData call
   // is not visible in this listing (a line appears to be missing).
154  frontend_->LoadData(exec_request_.load_data_request, &response));
   // The load summary row becomes the single row of the result set.
155  request_result_set_.reset(new vector<TResultRow>);
156  request_result_set_->push_back(response.load_summary);
157 
158  // Now refresh the table metadata.
   // Builds a RESET_METADATA catalog op with is_refresh=true for the table
   // that was loaded, runs it through a fresh CatalogOpExecutor, and passes
   // the resulting catalog update to the parent server together with the
   // sync_ddl query option.
159  TCatalogOpRequest reset_req;
160  reset_req.__set_op_type(TCatalogOpType::RESET_METADATA);
161  reset_req.__set_reset_metadata_params(TResetMetadataRequest());
162  reset_req.reset_metadata_params.__set_header(TCatalogServiceRequestHeader());
163  reset_req.reset_metadata_params.__set_is_refresh(true);
164  reset_req.reset_metadata_params.__set_table_name(
165  exec_request_.load_data_request.table_name);
166  catalog_op_executor_.reset(
167  new CatalogOpExecutor(exec_env_, frontend_, &server_profile_));
168  RETURN_IF_ERROR(catalog_op_executor_->Exec(reset_req));
169  RETURN_IF_ERROR(parent_server_->ProcessCatalogUpdateResult(
170  *catalog_op_executor_->update_catalog_result(),
171  exec_request_.query_options.sync_ddl));
172  return Status::OK;
173  }
174  case TStmtType::SET: {
175  DCHECK(exec_request_.__isset.set_query_option_request);
   // session_->lock is held for the rest of this case, covering both the
   // update and the read of session_->default_query_options.
176  lock_guard<mutex> l(session_->lock);
177  if (exec_request_.set_query_option_request.__isset.key) {
178  // "SET key=value" updates the session query options.
179  DCHECK(exec_request_.set_query_option_request.__isset.value);
   // NOTE(review): the call that receives key, value and
   // &session_->default_query_options is not visible in this listing (a line
   // appears to be missing).
181  exec_request_.set_query_option_request.key,
182  exec_request_.set_query_option_request.value,
183  &session_->default_query_options));
184  } else {
185  // "SET" returns a table of all query options.
186  map<string, string> config;
   // NOTE(review): the call that fills 'config' from
   // session_->default_query_options is not visible in this listing (a line
   // appears to be missing).
188  session_->default_query_options, &config);
   // Split the map into parallel key and value columns, in map iteration
   // order, and return them as a two-column result set.
189  vector<string> keys, values;
190  map<string, string>::const_iterator itr = config.begin();
191  for (; itr != config.end(); ++itr) {
192  keys.push_back(itr->first);
193  values.push_back(itr->second);
194  }
195  SetResultSet(keys, values);
196  }
197  return Status::OK;
198  }
   // Any other statement type is reported as an error that includes its value.
199  default:
200  stringstream errmsg;
201  errmsg << "Unknown exec request stmt type: " << exec_request_.stmt_type;
202  return Status(errmsg.str());
203  }
204 }
205 
207  const TCatalogOpRequest& catalog_op) {
208  switch (catalog_op.op_type) {
209  case TCatalogOpType::USE: {
210  lock_guard<mutex> l(session_->lock);
211  session_->database = exec_request_.catalog_op_request.use_db_params.db;
212  return Status::OK;
213  }
214  case TCatalogOpType::SHOW_TABLES: {
215  const TShowTablesParams* params = &catalog_op.show_tables_params;
216  // A NULL pattern means match all tables. However, Thrift string types can't
217  // be NULL in C++, so we have to test if it's set rather than just blindly
218  // using the value.
219  const string* table_name =
220  params->__isset.show_pattern ? &(params->show_pattern) : NULL;
221  TGetTablesResult table_names;
222  RETURN_IF_ERROR(frontend_->GetTableNames(params->db, table_name,
223  &query_ctx_.session, &table_names));
224  SetResultSet(table_names.tables);
225  return Status::OK;
226  }
227  case TCatalogOpType::SHOW_DBS: {
228  const TShowDbsParams* params = &catalog_op.show_dbs_params;
229  TGetDbsResult db_names;
230  const string* db_pattern =
231  params->__isset.show_pattern ? (&params->show_pattern) : NULL;
233  frontend_->GetDbNames(db_pattern, &query_ctx_.session, &db_names));
234  SetResultSet(db_names.dbs);
235  return Status::OK;
236  }
237  case TCatalogOpType::SHOW_DATA_SRCS: {
238  const TShowDataSrcsParams* params = &catalog_op.show_data_srcs_params;
239  TGetDataSrcsResult result;
240  const string* pattern =
241  params->__isset.show_pattern ? (&params->show_pattern) : NULL;
243  frontend_->GetDataSrcMetadata(pattern, &result));
244  SetResultSet(result.data_src_names, result.locations, result.class_names,
245  result.api_versions);
246  return Status::OK;
247  }
248  case TCatalogOpType::SHOW_STATS: {
249  const TShowStatsParams& params = catalog_op.show_stats_params;
250  TResultSet response;
251  RETURN_IF_ERROR(frontend_->GetStats(params, &response));
252  // Set the result set and its schema from the response.
253  request_result_set_.reset(new vector<TResultRow>(response.rows));
254  result_metadata_ = response.schema;
255  return Status::OK;
256  }
257  case TCatalogOpType::SHOW_FUNCTIONS: {
258  const TShowFunctionsParams* params = &catalog_op.show_fns_params;
259  TGetFunctionsResult functions;
260  const string* fn_pattern =
261  params->__isset.show_pattern ? (&params->show_pattern) : NULL;
262  RETURN_IF_ERROR(frontend_->GetFunctions(
263  params->category, params->db, fn_pattern, &query_ctx_.session, &functions));
264  SetResultSet(functions.fn_ret_types, functions.fn_signatures);
265  return Status::OK;
266  }
267  case TCatalogOpType::SHOW_ROLES: {
268  const TShowRolesParams& params = catalog_op.show_roles_params;
269  if (params.is_admin_op) {
270  // Verify the user has privileges to perform this operation by checking against
271  // the Sentry Service (via the Catalog Server).
272  catalog_op_executor_.reset(new CatalogOpExecutor(exec_env_, frontend_,
273  &server_profile_));
274 
275  TSentryAdminCheckRequest req;
276  req.__set_header(TCatalogServiceRequestHeader());
277  req.header.__set_requesting_user(effective_user());
278  RETURN_IF_ERROR(catalog_op_executor_->SentryAdminCheck(req));
279  }
280 
281  // If we have made it here, the user has privileges to execute this operation.
282  // Return the results.
283  TShowRolesResult result;
284  RETURN_IF_ERROR(frontend_->ShowRoles(params, &result));
285  SetResultSet(result.role_names);
286  return Status::OK;
287  }
288  case TCatalogOpType::SHOW_GRANT_ROLE: {
289  const TShowGrantRoleParams& params = catalog_op.show_grant_role_params;
290  if (params.is_admin_op) {
291  // Verify the user has privileges to perform this operation by checking against
292  // the Sentry Service (via the Catalog Server).
293  catalog_op_executor_.reset(new CatalogOpExecutor(exec_env_, frontend_,
294  &server_profile_));
295 
296  TSentryAdminCheckRequest req;
297  req.__set_header(TCatalogServiceRequestHeader());
298  req.header.__set_requesting_user(effective_user());
299  RETURN_IF_ERROR(catalog_op_executor_->SentryAdminCheck(req));
300  }
301 
302  TResultSet response;
303  RETURN_IF_ERROR(frontend_->GetRolePrivileges(params, &response));
304  // Set the result set and its schema from the response.
305  request_result_set_.reset(new vector<TResultRow>(response.rows));
306  result_metadata_ = response.schema;
307  return Status::OK;
308  }
309  case TCatalogOpType::DESCRIBE: {
310  TDescribeTableResult response;
311  RETURN_IF_ERROR(frontend_->DescribeTable(catalog_op.describe_table_params,
312  &response));
313  // Set the result set
314  request_result_set_.reset(new vector<TResultRow>(response.results));
315  return Status::OK;
316  }
317  case TCatalogOpType::SHOW_CREATE_TABLE: {
318  string response;
319  RETURN_IF_ERROR(frontend_->ShowCreateTable(catalog_op.show_create_table_params,
320  &response));
321  SetResultSet(vector<string>(1, response));
322  return Status::OK;
323  }
324  case TCatalogOpType::SHOW_FILES: {
325  TResultSet response;
326  RETURN_IF_ERROR(frontend_->GetTableFiles(catalog_op.show_files_params, &response));
327  // Set the result set and its schema from the response.
328  request_result_set_.reset(new vector<TResultRow>(response.rows));
329  result_metadata_ = response.schema;
330  return Status::OK;
331  }
332  default: {
333  stringstream ss;
334  ss << "Unexpected TCatalogOpType: " << catalog_op.op_type;
335  return Status(ss.str());
336  }
337  }
338 }
339 
341  const TQueryExecRequest& query_exec_request) {
342  // we always need at least one plan fragment
343  DCHECK_GT(query_exec_request.fragments.size(), 0);
344 
345  if (query_exec_request.__isset.query_plan) {
346  stringstream plan_ss;
347  // Add some delimiters to make it clearer where the plan
348  // begins and the profile ends
349  plan_ss << "\n----------------\n"
350  << query_exec_request.query_plan
351  << "----------------";
352  summary_profile_.AddInfoString("Plan", plan_ss.str());
353  }
354  // Add info strings consumed by CM: Estimated mem/vcores and tables missing stats.
355  if (query_exec_request.__isset.per_host_mem_req) {
356  stringstream ss;
357  ss << query_exec_request.per_host_mem_req;
358  summary_profile_.AddInfoString(PER_HOST_MEM_KEY, ss.str());
359  }
360  if (query_exec_request.__isset.per_host_vcores) {
361  stringstream ss;
362  ss << query_exec_request.per_host_vcores;
363  summary_profile_.AddInfoString(PER_HOST_VCORES_KEY, ss.str());
364  }
365  if (!query_exec_request.query_ctx.__isset.parent_query_id &&
366  query_exec_request.query_ctx.__isset.tables_missing_stats &&
367  !query_exec_request.query_ctx.tables_missing_stats.empty()) {
368  stringstream ss;
369  const vector<TTableName>& tbls = query_exec_request.query_ctx.tables_missing_stats;
370  for (int i = 0; i < tbls.size(); ++i) {
371  if (i != 0) ss << ",";
372  ss << tbls[i].db_name << "." << tbls[i].table_name;
373  }
374  summary_profile_.AddInfoString(TABLES_MISSING_STATS_KEY, ss.str());
375  }
376 
377  // If desc_tbl is not set, query has SELECT with no FROM. In that
378  // case, the query can only have a single fragment, and that fragment needs to be
379  // executed by the coordinator. This check confirms that.
380  // If desc_tbl is set, the query may or may not have a coordinator fragment.
381  bool has_coordinator_fragment =
382  query_exec_request.fragments[0].partition.type == TPartitionType::UNPARTITIONED;
383  DCHECK(has_coordinator_fragment || query_exec_request.__isset.desc_tbl);
384 
385  if (FLAGS_enable_rm) {
386  DCHECK(exec_env_->resource_broker() != NULL);
387  }
388  schedule_.reset(new QuerySchedule(query_id(), query_exec_request,
389  exec_request_.query_options, effective_user(), &summary_profile_, query_events_));
390  coord_.reset(new Coordinator(exec_env_, query_events_));
391  Status status = exec_env_->scheduler()->Schedule(coord_.get(), schedule_.get());
392  summary_profile_.AddInfoString("Request Pool", schedule_->request_pool());
393  if (FLAGS_enable_rm) {
394  if (status.ok()) {
395  stringstream reservation_request_ss;
396  reservation_request_ss << schedule_->reservation_request();
397  summary_profile_.AddInfoString("Resource reservation request",
398  reservation_request_ss.str());
399  }
400  }
401 
402  {
403  lock_guard<mutex> l(lock_);
404  RETURN_IF_ERROR(UpdateQueryStatus(status));
405  }
406 
407  if (FLAGS_enable_rm && schedule_->HasReservation()) {
408  // Add the granted reservation to the query profile.
409  stringstream reservation_ss;
410  reservation_ss << *schedule_->reservation();
411  summary_profile_.AddInfoString("Granted resource reservation", reservation_ss.str());
412  query_events_->MarkEvent("Resources reserved");
413  }
414  status = coord_->Exec(*schedule_, &output_expr_ctxs_);
415  {
416  lock_guard<mutex> l(lock_);
417  RETURN_IF_ERROR(UpdateQueryStatus(status));
418  }
419 
420  profile_.AddChild(coord_->query_profile());
421  return Status::OK;
422 }
423 
425  string op_type = catalog_op_type() == TCatalogOpType::DDL ?
426  PrintTDdlType(ddl_type()) : PrintTCatalogOpType(catalog_op_type());
427  summary_profile_.AddInfoString("DDL Type", op_type);
428 
429  if (catalog_op_type() != TCatalogOpType::DDL &&
430  catalog_op_type() != TCatalogOpType::RESET_METADATA) {
431  Status status = ExecLocalCatalogOp(exec_request_.catalog_op_request);
432  lock_guard<mutex> l(lock_);
433  return UpdateQueryStatus(status);
434  }
435 
436  if (ddl_type() == TDdlType::COMPUTE_STATS) {
437  TComputeStatsParams& compute_stats_params =
438  exec_request_.catalog_op_request.ddl_params.compute_stats_params;
439  // Add child queries for computing table and column stats.
440  if (compute_stats_params.__isset.tbl_stats_query) {
441  child_queries_.push_back(
442  ChildQuery(compute_stats_params.tbl_stats_query, this, parent_server_));
443  }
444  if (compute_stats_params.__isset.col_stats_query) {
445  child_queries_.push_back(
446  ChildQuery(compute_stats_params.col_stats_query, this, parent_server_));
447  }
448  if (child_queries_.size() > 0) ExecChildQueriesAsync();
449  return Status::OK;
450  }
451 
452  catalog_op_executor_.reset(new CatalogOpExecutor(exec_env_, frontend_,
453  &server_profile_));
454  Status status = catalog_op_executor_->Exec(exec_request_.catalog_op_request);
455  {
456  lock_guard<mutex> l(lock_);
457  RETURN_IF_ERROR(UpdateQueryStatus(status));
458  }
459 
460  // If this is a CTAS request, there will usually be more work to do
461  // after executing the CREATE TABLE statement (the INSERT portion of the operation).
462  // The exception is if the user specified IF NOT EXISTS and the table already
463  // existed, in which case we do not execute the INSERT.
464  if (catalog_op_type() == TCatalogOpType::DDL &&
465  ddl_type() == TDdlType::CREATE_TABLE_AS_SELECT &&
466  !catalog_op_executor_->ddl_exec_response()->new_table_created) {
467  DCHECK(exec_request_.catalog_op_request.
468  ddl_params.create_table_params.if_not_exists);
469  return Status::OK;
470  }
471 
472  // Add newly created table to catalog cache.
473  RETURN_IF_ERROR(parent_server_->ProcessCatalogUpdateResult(
474  *catalog_op_executor_->update_catalog_result(),
475  exec_request_.query_options.sync_ddl));
476 
477  if (catalog_op_type() == TCatalogOpType::DDL &&
478  ddl_type() == TDdlType::CREATE_TABLE_AS_SELECT) {
479  // At this point, the remainder of the CTAS request executes
480  // like a normal DML request. As with other DML requests, it will
481  // wait for another catalog update if any partitions were altered as a result
482  // of the operation.
483  DCHECK(exec_request_.__isset.query_exec_request);
484  RETURN_IF_ERROR(ExecQueryOrDmlRequest(exec_request_.query_exec_request));
485  }
486  return Status::OK;
487 }
488 
490  MarkActive();
491  // Make sure we join on wait_thread_ before we finish (and especially before this object
492  // is destroyed).
493  BlockOnWait();
494  unique_lock<mutex> l(lock_);
495  end_time_ = TimestampValue::LocalTime();
496  summary_profile_.AddInfoString("End Time", end_time().DebugString());
497  summary_profile_.AddInfoString("Query State", PrintQueryState(query_state_));
498  query_events_->MarkEvent("Unregister query");
499 
500  if (coord_.get() != NULL) {
501  Expr::Close(output_expr_ctxs_, coord_->runtime_state());
502  // Release any reserved resources.
503  Status status = exec_env_->scheduler()->Release(schedule_.get());
504  if (!status.ok()) {
505  LOG(WARNING) << "Failed to release resources of query " << schedule_->query_id()
506  << " because of error: " << status.GetDetail();
507  }
508  }
509 
510  // Update result set cache metrics, and update mem limit accounting.
511  ClearResultCache();
512 }
513 
514 Status ImpalaServer::QueryExecState::Exec(const TMetadataOpRequest& exec_request) {
515  TResultSet metadata_op_result;
516  // Like the other Exec(), fill out as much profile information as we're able to.
517  summary_profile_.AddInfoString("Query Type", PrintTStmtType(TStmtType::DDL));
518  summary_profile_.AddInfoString("Query State", PrintQueryState(query_state_));
519  RETURN_IF_ERROR(frontend_->ExecHiveServer2MetadataOp(exec_request,
520  &metadata_op_result));
521  result_metadata_ = metadata_op_result.schema;
522  request_result_set_.reset(new vector<TResultRow>(metadata_op_result.rows));
523  return Status::OK;
524 }
525 
527  wait_thread_.reset(new Thread(
528  "query-exec-state", "wait-thread", &ImpalaServer::QueryExecState::Wait, this));
529 }
530 
532  if (wait_thread_.get() != NULL) {
533  wait_thread_->Join();
534  wait_thread_.reset();
535  }
536 }
537 
539  // block until results are ready
540  Status status = WaitInternal();
541  {
542  lock_guard<mutex> l(lock_);
543  if (returns_result_set()) {
544  query_events()->MarkEvent("Rows available");
545  } else {
546  query_events()->MarkEvent("Request finished");
547  }
548  UpdateQueryStatus(status);
549  }
550  if (status.ok()) {
551  UpdateQueryState(QueryState::FINISHED);
552  }
553 }
554 
556  // Explain requests have already populated the result set. Nothing to do here.
557  if (exec_request_.stmt_type == TStmtType::EXPLAIN) {
558  MarkInactive();
559  return Status::OK;
560  }
561 
562  RETURN_IF_ERROR(WaitForChildQueries());
563  if (coord_.get() != NULL) {
564  RETURN_IF_ERROR(coord_->Wait());
565  RETURN_IF_ERROR(Expr::Open(output_expr_ctxs_, coord_->runtime_state()));
566  RETURN_IF_ERROR(UpdateCatalog());
567  }
568 
569  if (ddl_type() == TDdlType::COMPUTE_STATS && child_queries_.size() > 0) {
570  RETURN_IF_ERROR(UpdateTableAndColumnStats());
571  }
572 
573  if (!returns_result_set()) {
574  // Queries that do not return a result are finished at this point. This includes
575  // DML operations and a subset of the DDL operations.
576  eos_ = true;
577  } else {
578  if (ddl_type() == TDdlType::CREATE_TABLE_AS_SELECT) {
579  SetCreateTableAsSelectResultSet();
580  }
581  }
582  // Rows are available now (for SELECT statement), so start the 'wait' timer that tracks
583  // how long Impala waits for the client to fetch rows. For other statements, track the
584  // time until a Close() is received.
585  MarkInactive();
586  return Status::OK;
587 }
588 
590  QueryResultSet* fetched_rows) {
591  // Pause the wait timer, since the client has instructed us to do work on its behalf.
592  MarkActive();
593 
594  // ImpalaServer::FetchInternal has already taken our lock_
595  UpdateQueryStatus(FetchRowsInternal(max_rows, fetched_rows));
596 
597  MarkInactive();
598  return query_status_;
599 }
600 
602  // No result caching for this query. Restart is invalid.
603  if (result_cache_max_size_ <= 0) {
604  return Status(ErrorMsg(TErrorCode::RECOVERABLE_ERROR,
605  "Restarting of fetch requires enabling of query result caching."));
606  }
607  // The cache overflowed on a previous fetch.
608  if (result_cache_.get() == NULL) {
609  stringstream ss;
610  ss << "The query result cache exceeded its limit of " << result_cache_max_size_
611  << " rows. Restarting the fetch is not possible.";
612  return Status(ErrorMsg(TErrorCode::RECOVERABLE_ERROR, ss.str()));
613  }
614  // Reset fetch state to start over.
615  eos_ = false;
616  num_rows_fetched_ = 0;
617  return Status::OK;
618 }
619 
620 void ImpalaServer::QueryExecState::UpdateQueryState(QueryState::type query_state) {
621  lock_guard<mutex> l(lock_);
622  if (query_state_ < query_state) query_state_ = query_state;
623 }
624 
626  // Preserve the first non-ok status
627  if (!status.ok() && query_status_.ok()) {
628  query_state_ = QueryState::EXCEPTION;
629  query_status_ = status;
630  summary_profile_.AddInfoString("Query Status", query_status_.GetDetail());
631  }
632 
633  return status;
634 }
635 
637  QueryResultSet* fetched_rows) {
638  DCHECK(query_state_ != QueryState::EXCEPTION);
639 
640  if (eos_) return Status::OK;
641 
642  if (request_result_set_ != NULL) {
643  query_state_ = QueryState::FINISHED;
644  int num_rows = 0;
645  const vector<TResultRow>& all_rows = (*(request_result_set_.get()));
646  // max_rows <= 0 means no limit
647  while ((num_rows < max_rows || max_rows <= 0)
648  && num_rows_fetched_ < all_rows.size()) {
649  fetched_rows->AddOneRow(all_rows[num_rows_fetched_]);
650  ++num_rows_fetched_;
651  ++num_rows;
652  }
653  eos_ = (num_rows_fetched_ == all_rows.size());
654  return Status::OK;
655  }
656 
657  int32_t num_rows_fetched_from_cache = 0;
658  if (result_cache_max_size_ > 0 && result_cache_ != NULL) {
659  // Satisfy the fetch from the result cache if possible.
660  int cache_fetch_size = (max_rows <= 0) ? result_cache_->size() : max_rows;
661  num_rows_fetched_from_cache =
662  fetched_rows->AddRows(result_cache_.get(), num_rows_fetched_, cache_fetch_size);
663  num_rows_fetched_ += num_rows_fetched_from_cache;
664  if (num_rows_fetched_from_cache >= max_rows) return Status::OK;
665  }
666 
667  // List of expr values to hold evaluated rows from the query
668  vector<void*> result_row;
669  result_row.resize(output_expr_ctxs_.size());
670 
671  // List of scales for floating point values in result_row
672  vector<int> scales;
673  scales.resize(result_row.size());
674 
675  if (coord_ == NULL) {
676  // Query with LIMIT 0.
677  query_state_ = QueryState::FINISHED;
678  eos_ = true;
679  return Status::OK;
680  }
681 
682  query_state_ = QueryState::FINISHED; // results will be ready after this call
683  // Fetch the next batch if we've returned the current batch entirely
684  if (current_batch_ == NULL || current_batch_row_ >= current_batch_->num_rows()) {
685  RETURN_IF_ERROR(FetchNextBatch());
686  }
687  if (current_batch_ == NULL) return Status::OK;
688 
689  // Maximum number of rows to be fetched from the coord.
690  int32_t max_coord_rows = max_rows;
691  if (max_rows > 0) {
692  DCHECK_LE(num_rows_fetched_from_cache, max_rows);
693  max_coord_rows = max_rows - num_rows_fetched_from_cache;
694  }
695  {
696  SCOPED_TIMER(row_materialization_timer_);
697  // Convert the available rows, limited by max_coord_rows
698  int available = current_batch_->num_rows() - current_batch_row_;
699  int fetched_count = available;
700  // max_coord_rows <= 0 means no limit
701  if (max_coord_rows > 0 && max_coord_rows < available) fetched_count = max_coord_rows;
702  for (int i = 0; i < fetched_count; ++i) {
703  TupleRow* row = current_batch_->GetRow(current_batch_row_);
704  RETURN_IF_ERROR(GetRowValue(row, &result_row, &scales));
705  RETURN_IF_ERROR(fetched_rows->AddOneRow(result_row, scales));
706  ++num_rows_fetched_;
707  ++current_batch_row_;
708  }
709  }
710  ExprContext::FreeLocalAllocations(output_expr_ctxs_);
711 
712  // Update the result cache if necessary.
713  if (result_cache_max_size_ > 0 && result_cache_.get() != NULL) {
714  int rows_fetched_from_coord = fetched_rows->size() - num_rows_fetched_from_cache;
715  if (result_cache_->size() + rows_fetched_from_coord > result_cache_max_size_) {
716  // Set the cache to NULL to indicate that adding the rows fetched from the coord
717  // would exceed the bound of the cache, and therefore, RestartFetch() should fail.
718  ClearResultCache();
719  return Status::OK;
720  }
721 
722  // We guess the size of the cache after adding fetched_rows by looking at the size of
723  // fetched_rows itself, and using this estimate to confirm that the memtracker will
724  // allow us to use this much extra memory. In fact, this might be an overestimate, as
725  // the size of two result sets combined into one is not always the size of both result
726  // sets added together (the best example is the null bitset for each column: it might
727  // have only one entry in each result set, and as a result consume two bytes, but when
728  // the result sets are combined, only one byte is needed). Therefore after we add the
729  // new result set into the cache, we need to fix up the memory consumption to the
730  // actual levels to ensure we don't 'leak' bytes that we aren't using.
731  int64_t before = result_cache_->ByteSize();
732 
733  // Upper-bound on memory required to add fetched_rows to the cache.
734  int64_t delta_bytes =
735  fetched_rows->ByteSize(num_rows_fetched_from_cache, fetched_rows->size());
736  MemTracker* query_mem_tracker = coord_->query_mem_tracker();
737  // Count the cached rows towards the mem limit.
738  if (!query_mem_tracker->TryConsume(delta_bytes)) {
739  return coord_->runtime_state()->SetMemLimitExceeded(
740  query_mem_tracker, delta_bytes);
741  }
742  // Append all rows fetched from the coordinator into the cache.
743  int num_rows_added = result_cache_->AddRows(
744  fetched_rows, num_rows_fetched_from_cache, fetched_rows->size());
745 
746  int64_t after = result_cache_->ByteSize();
747 
748  // Confirm that this was not an underestimate of the memory required.
749  DCHECK_GE(before + delta_bytes, after)
750  << "Combined result sets consume more memory than both individually "
751  << Substitute("(before: $0, delta_bytes: $1, after: $2)",
752  before, delta_bytes, after);
753 
754  // Fix up the tracked values
755  if (before + delta_bytes > after) {
756  query_mem_tracker->Release(before + delta_bytes - after);
757  delta_bytes = after - before;
758  }
759 
760  // Update result set cache metrics.
761  ImpaladMetrics::RESULTSET_CACHE_TOTAL_NUM_ROWS->Increment(num_rows_added);
762  ImpaladMetrics::RESULTSET_CACHE_TOTAL_BYTES->Increment(delta_bytes);
763  }
764 
765  return Status::OK;
766 }
767 
769  vector<int>* scales) {
770  DCHECK(result->size() >= output_expr_ctxs_.size());
771  for (int i = 0; i < output_expr_ctxs_.size(); ++i) {
772  (*result)[i] = output_expr_ctxs_[i]->GetValue(row);
773  (*scales)[i] = output_expr_ctxs_[i]->root()->output_scale();
774  }
775  return Status::OK;
776 }
777 
779  // Cancel and close child queries before cancelling parent.
780  BOOST_FOREACH(ChildQuery& child_query, child_queries_) {
781  child_query.Cancel();
782  }
783 
784  // If the query is completed or cancelled, no need to cancel.
785  if (eos_ || query_state_ == QueryState::EXCEPTION) return;
786 
787  if (cause != NULL) {
788  UpdateQueryStatus(*cause);
789  query_events_->MarkEvent("Cancelled");
790  query_state_ = QueryState::EXCEPTION;
791  }
792  if (coord_.get() != NULL) coord_->Cancel(cause);
793 }
794 
// NOTE(review): the signature line (listing line 795) is absent from this extract, as
// are listing lines 827 and 833 inside the body (presumably the declaration of 'client'
// and the opening of the statement wrapping client.DoRpc(...)) -- confirm against the
// original file before editing this function.
// Returns Status::OK immediately unless the exec request carries a query_exec_request
// whose stmt_type is DML. Otherwise marks "DML data written", times the rest of the
// function under "MetastoreUpdateTimer" and, when finalize_params is set, asks the
// coordinator to fill in a TUpdateCatalogRequest. If PrepareCatalogUpdate() reports
// altered partitions, the request is sent to the catalog service at
// FLAGS_catalog_service_host:FLAGS_catalog_service_port and the response is handed to
// parent_server_->ProcessCatalogUpdateResult(). Any non-OK Status along the way is
// returned to the caller; on success "DML Metastore update finished" is marked.
796  if (!exec_request().__isset.query_exec_request ||
797  exec_request().query_exec_request.stmt_type != TStmtType::DML) {
798  return Status::OK;
799  }
800 
801  query_events_->MarkEvent("DML data written");
802  SCOPED_TIMER(ADD_TIMER(&server_profile_, "MetastoreUpdateTimer"));
803 
804  TQueryExecRequest query_exec_request = exec_request().query_exec_request;
805  if (query_exec_request.__isset.finalize_params) {
806  const TFinalizeParams& finalize_params = query_exec_request.finalize_params;
807  TUpdateCatalogRequest catalog_update;
808  catalog_update.__set_header(TCatalogServiceRequestHeader());
809  catalog_update.header.__set_requesting_user(effective_user());
810  if (!coord()->PrepareCatalogUpdate(&catalog_update)) {
811  VLOG_QUERY << "No partitions altered, not updating metastore (query id: "
812  << query_id() << ")";
813  } else {
814  // TODO: We track partitions written to, not created, which means
815  // that we do more work than is necessary, because written-to
816  // partitions don't always require a metastore change.
817  VLOG_QUERY << "Updating metastore with " << catalog_update.created_partitions.size()
818  << " altered partitions ("
819  << join (catalog_update.created_partitions, ", ") << ")";
820 
821  catalog_update.target_table = finalize_params.table_name;
822  catalog_update.db_name = finalize_params.table_db;
823 
824  Status cnxn_status;
825  const TNetworkAddress& address =
826  MakeNetworkAddress(FLAGS_catalog_service_host, FLAGS_catalog_service_port);
828  exec_env_->catalogd_client_cache(), address, &cnxn_status);
829  RETURN_IF_ERROR(cnxn_status);
830 
831  VLOG_QUERY << "Executing FinalizeDml() using CatalogService";
832  TUpdateCatalogResponse resp;
834  client.DoRpc(&CatalogServiceClient::UpdateCatalog, catalog_update, &resp));
835 
836  Status status(resp.result.status);
837  if (!status.ok()) LOG(ERROR) << "ERROR Finalizing DML: " << status.GetDetail();
838  RETURN_IF_ERROR(status);
839  RETURN_IF_ERROR(parent_server_->ProcessCatalogUpdateResult(resp.result,
840  exec_request_.query_options.sync_ddl));
841  }
842  }
843  query_events_->MarkEvent("DML Metastore update finished");
844  return Status::OK;
845 }
846 
// NOTE(review): the signature line (listing line 847) is absent from this extract.
// Pulls the next row batch from the coordinator into current_batch_. Requires !eos_ and
// a non-NULL coord_ (both DCHECK'd). lock_ is released around coord_->GetNext() and
// re-acquired afterwards, so the caller presumably holds lock_ on entry -- confirm.
// Returns the GetNext() error if there is one; if query_status_ became non-OK in the
// meantime, clears current_batch_ and returns query_status_. Otherwise resets
// current_batch_row_ to 0 and sets eos_ when GetNext() produced no batch.
848  DCHECK(!eos_);
849  DCHECK(coord_.get() != NULL);
850 
851  // Temporarily release lock so calls to Cancel() are not blocked. fetch_rows_lock_
852  // ensures that we do not call coord_->GetNext() multiple times concurrently.
853  lock_.unlock();
854  Status status = coord_->GetNext(&current_batch_, coord_->runtime_state());
855  lock_.lock();
856  if (!status.ok()) return status;
857 
858  // Check if query status has changed during GetNext() call
859  if (!query_status_.ok()) {
860  current_batch_ = NULL;
861  return query_status_;
862  }
863 
864  current_batch_row_ = 0;
865  eos_ = current_batch_ == NULL;
866  return Status::OK;
867 }
868 
869 void ImpalaServer::QueryExecState::SetResultSet(const vector<string>& results) {
870  request_result_set_.reset(new vector<TResultRow>);
871  request_result_set_->resize(results.size());
872  for (int i = 0; i < results.size(); ++i) {
873  (*request_result_set_.get())[i].__isset.colVals = true;
874  (*request_result_set_.get())[i].colVals.resize(1);
875  (*request_result_set_.get())[i].colVals[0].__set_string_val(results[i]);
876  }
877 }
878 
879 void ImpalaServer::QueryExecState::SetResultSet(const vector<string>& col1,
880  const vector<string>& col2) {
881  DCHECK_EQ(col1.size(), col2.size());
882 
883  request_result_set_.reset(new vector<TResultRow>);
884  request_result_set_->resize(col1.size());
885  for (int i = 0; i < col1.size(); ++i) {
886  (*request_result_set_.get())[i].__isset.colVals = true;
887  (*request_result_set_.get())[i].colVals.resize(2);
888  (*request_result_set_.get())[i].colVals[0].__set_string_val(col1[i]);
889  (*request_result_set_.get())[i].colVals[1].__set_string_val(col2[i]);
890  }
891 }
892 
893 void ImpalaServer::QueryExecState::SetResultSet(const vector<string>& col1,
894  const vector<string>& col2, const vector<string>& col3, const vector<string>& col4) {
895  DCHECK_EQ(col1.size(), col2.size());
896  DCHECK_EQ(col1.size(), col3.size());
897  DCHECK_EQ(col1.size(), col4.size());
898 
899  request_result_set_.reset(new vector<TResultRow>);
900  request_result_set_->resize(col1.size());
901  for (int i = 0; i < col1.size(); ++i) {
902  (*request_result_set_.get())[i].__isset.colVals = true;
903  (*request_result_set_.get())[i].colVals.resize(4);
904  (*request_result_set_.get())[i].colVals[0].__set_string_val(col1[i]);
905  (*request_result_set_.get())[i].colVals[1].__set_string_val(col2[i]);
906  (*request_result_set_.get())[i].colVals[2].__set_string_val(col3[i]);
907  (*request_result_set_.get())[i].colVals[3].__set_string_val(col4[i]);
908  }
909 }
910 
// NOTE(review): the signature line (listing line 911) is absent from this extract.
// Builds a one-row result set reading "Inserted N row(s)" for a CREATE_TABLE_AS_SELECT
// statement (DCHECK'd). N is the sum of num_appended_rows over
// coord_->per_partition_status() when the DDL response reports new_table_created, and 0
// otherwise. The message is also written to VLOG_QUERY.
912  DCHECK(ddl_type() == TDdlType::CREATE_TABLE_AS_SELECT);
913  int64_t total_num_rows_inserted = 0;
914  // There will only be rows inserted in the case a new table was created as part of this
915  // operation.
916  if (catalog_op_executor_->ddl_exec_response()->new_table_created) {
917  DCHECK(coord_.get());
918  BOOST_FOREACH(
919  const PartitionStatusMap::value_type& p, coord_->per_partition_status()) {
920  total_num_rows_inserted += p.second.num_appended_rows;
921  }
922  }
923  const string& summary_msg = Substitute("Inserted $0 row(s)", total_num_rows_inserted);
924  VLOG_QUERY << summary_msg;
925  vector<string> results(1, summary_msg);
926  SetResultSet(results);
927 }
928 
// NOTE(review): the signature line (listing line 929) is absent from this extract; the
// DCHECK message below suggests this is MarkInactive().
// Starts client_wait_sw_, then, while holding expiration_data_lock_, stores the current
// UnixMillis() in last_active_time_ and decrements ref_count_ (which must be > 0).
930  client_wait_sw_.Start();
931  lock_guard<mutex> l(expiration_data_lock_);
932  last_active_time_ = UnixMillis();
933  DCHECK(ref_count_ > 0) << "Invalid MarkInactive()";
934  --ref_count_;
935 }
936 
// NOTE(review): the signature line (listing line 937) is absent from this extract.
// Stops client_wait_sw_ and publishes its elapsed time to client_wait_timer_, then,
// while holding expiration_data_lock_, stores the current UnixMillis() in
// last_active_time_ and increments ref_count_.
938  client_wait_sw_.Stop();
939  int64_t elapsed_time = client_wait_sw_.ElapsedTime();
940  client_wait_timer_->Set(elapsed_time);
941  lock_guard<mutex> l(expiration_data_lock_);
942  last_active_time_ = UnixMillis();
943  ++ref_count_;
944 }
945 
// NOTE(review): the signature line (listing line 946) is absent from this extract.
// Expects one or two child queries (DCHECK'd). Passes the result schema and data of
// child_queries_[0], plus those of child_queries_[1] when present (default-constructed
// thrift structs otherwise), to CatalogOpExecutor::ExecComputeStats(). The returned
// status is recorded via UpdateQueryStatus() while holding lock_, and an error there is
// returned to the caller. The catalog update result is then handed to
// parent_server_->ProcessCatalogUpdateResult(). If the DDL response carries a
// result_set, its schema and rows become result_metadata_ and request_result_set_.
947  DCHECK_GE(child_queries_.size(), 1);
948  DCHECK_LE(child_queries_.size(), 2);
949  catalog_op_executor_.reset(
950  new CatalogOpExecutor(exec_env_, frontend_, &server_profile_));
951 
952  // If there was no column stats query, pass in empty thrift structures to
953  // ExecComputeStats(). Otherwise pass in the column stats result.
954  TTableSchema col_stats_schema;
955  TRowSet col_stats_data;
956  if (child_queries_.size() > 1) {
957  col_stats_schema = child_queries_[1].result_schema();
958  col_stats_data = child_queries_[1].result_data();
959  }
960 
961  Status status = catalog_op_executor_->ExecComputeStats(
962  exec_request_.catalog_op_request.ddl_params.compute_stats_params,
963  child_queries_[0].result_schema(),
964  child_queries_[0].result_data(),
965  col_stats_schema,
966  col_stats_data);
967  {
968  lock_guard<mutex> l(lock_);
969  RETURN_IF_ERROR(UpdateQueryStatus(status));
970  }
971  RETURN_IF_ERROR(parent_server_->ProcessCatalogUpdateResult(
972  *catalog_op_executor_->update_catalog_result(),
973  exec_request_.query_options.sync_ddl));
974 
975  // Set the results to be reported to the client.
976  const TDdlExecResponse* ddl_resp = catalog_op_executor_->ddl_exec_response();
977  if (ddl_resp != NULL && ddl_resp->__isset.result_set) {
978  result_metadata_ = ddl_resp->result_set.schema;
979  request_result_set_.reset(new vector<TResultRow>);
980  request_result_set_->assign(
981  ddl_resp->result_set.rows.begin(), ddl_resp->result_set.rows.end());
982  }
983 
984  query_events_->MarkEvent("Metastore update finished");
985  return Status::OK;
986 }
987 
// NOTE(review): the signature line (listing line 988) and listing line 991 (the rest of
// the Thread constructor call) are absent from this extract, so the function the thread
// runs is not visible here.
// Creates child_queries_thread_, which must not already exist (DCHECK'd).
989  DCHECK(child_queries_thread_.get() == NULL);
990  child_queries_thread_.reset(new Thread("query-exec-state", "async child queries",
992 }
993 
// NOTE(review): the signature line (listing line 994) is absent from this extract.
// Runs ExecAndFetch() on each child query in index order, storing each result in
// child_queries_status_. Returns without running the next child query as soon as
// child_queries_status_ is not ok.
995  for (int i = 0; i < child_queries_.size(); ++i) {
996  if (!child_queries_status_.ok()) return;
997  child_queries_status_ = child_queries_[i].ExecAndFetch();
998  }
999 }
1000 
// NOTE(review): the signature line (listing line 1001) and listing line 1006 (inside
// the locked scope) are absent from this extract.
// Returns Status::OK immediately when no child-query thread exists. Otherwise joins the
// thread, records child_queries_status_ via UpdateQueryStatus() while holding lock_
// (returning on error), and marks the "Child queries finished" event.
1002  if (child_queries_thread_.get() == NULL) return Status::OK;
1003  child_queries_thread_->Join();
1004  {
1005  lock_guard<mutex> l(lock_);
1007  RETURN_IF_ERROR(UpdateQueryStatus(child_queries_status_));
1008  }
1009  query_events_->MarkEvent("Child queries finished");
1010  return Status::OK;
1011 }
1012 
// NOTE(review): the signature line (listing line 1013) is absent from this extract.
// Does nothing when result_cache_ is NULL. Otherwise subtracts the cache's row count and
// byte size from the RESULTSET_CACHE_TOTAL_NUM_ROWS / RESULTSET_CACHE_TOTAL_BYTES
// metrics, releases the same number of bytes from the coordinator's query mem tracker
// when a coordinator exists, and finally drops the cache.
1014  if (result_cache_ == NULL) return;
1015  // Update result set cache metrics and mem limit accounting.
1016  ImpaladMetrics::RESULTSET_CACHE_TOTAL_NUM_ROWS->Increment(-result_cache_->size());
1017  int64_t total_bytes = result_cache_->ByteSize();
1018  ImpaladMetrics::RESULTSET_CACHE_TOTAL_BYTES->Increment(-total_bytes);
1019  if (coord_ != NULL) {
1020  DCHECK_NOTNULL(coord_->query_mem_tracker());
1021  coord_->query_mem_tracker()->Release(total_bytes);
1022  }
1023  result_cache_.reset(NULL);
1024 }
1025 
1026 }
Status FetchRows(const int32_t max_rows, QueryResultSet *fetched_rows)
EventSequence * AddEventSequence(const std::string &key)
void TQueryOptionsToMap(const TQueryOptions &query_options, std::map< std::string, std::string > *configuration)
Converts a TQueryOptions struct into a map of key, value pairs.
static IntGauge * RESULTSET_CACHE_TOTAL_NUM_ROWS
const std::string GetDetail() const
Definition: status.cc:184
string TNetworkAddressToString(const TNetworkAddress &address)
Utility method to print address as address:port.
Status ExecLocalCatalogOp(const TCatalogOpRequest &catalog_op)
void AddInfoString(const std::string &key, const std::string &value)
DECLARE_int32(catalog_service_port)
TODO: Consider allowing fragment IDs as category parameters.
Definition: thread.h:45
bool TryConsume(int64_t bytes)
Definition: mem-tracker.h:163
static Status Open(const std::vector< ExprContext * > &ctxs, RuntimeState *state)
Convenience function for opening multiple expr trees.
const TUniqueId & query_id() const
Definition: coordinator.h:152
string GetVersionString(bool compact)
Returns "<program short name> version <GetBuildVersion(compact)>".
Definition: debug-util.cc:239
#define RETURN_IF_ERROR(stmt)
some generally useful macros
Definition: status.h:242
const TUniqueId & session_id() const
friend class ChildQuery
std::string PrintQueryState(const beeswax::QueryState::type &type)
boost::mutex lock_
protects all fields below
Definition: coordinator.h:233
#define ADD_TIMER(profile, name)
const std::string & connected_user() const
void UpdateQueryState(beeswax::QueryState::type query_state)
Scheduler * scheduler()
Definition: exec-env.h:96
virtual size_t size()=0
Returns the size of this result set in number of rows.
std::string PrintTCatalogOpType(const TCatalogOpType::type &type)
beeswax::QueryState::type query_state_
virtual Status AddOneRow(const std::vector< void * > &row, const std::vector< int > &scales)=0
TNetworkAddress MakeNetworkAddress(const string &hostname, int port)
Definition: network-util.cc:96
string PrintId(const TUniqueId &id, const string &separator)
Definition: debug-util.cc:97
const std::string & default_db() const
void WaitAsync()
Calls Wait() asynchronously in a thread and returns immediately.
#define SCOPED_TIMER(c)
ResourceBroker * resource_broker()
Definition: exec-env.h:95
std::string PrintTSessionType(const TSessionType::type &type)
static IntGauge * RESULTSET_CACHE_TOTAL_BYTES
boost::shared_ptr< SessionState > session_
Session that this query is from.
static void Close(const std::vector< ExprContext * > &ctxs, RuntimeState *state)
Convenience function for closing multiple expr trees.
bool PrepareCatalogUpdate(TUpdateCatalogRequest *catalog_update)
int64_t UnixMillis()
Definition: time.h:51
void MarkEvent(const std::string &label)
static const string PER_HOST_MEM_KEY
#define VLOG_QUERY
Definition: logging.h:57
Status query_status_
Definition: coordinator.h:237
const std::string & effective_user() const
void Start()
Starts the timer without resetting it.
RuntimeProfile::Counter * row_materialization_timer_
DECLARE_bool(enable_rm)
CatalogServiceClientCache * catalogd_client_cache()
Definition: exec-env.h:79
static const string TABLES_MISSING_STATS_KEY
const TNetworkAddress & backend_address() const
Definition: exec-env.h:99
std::string DebugString(const T &val)
Definition: udf-debug.h:27
Status SetResultCache(QueryResultSet *cache, int64_t max_size)
virtual Status Schedule(Coordinator *coord, QuerySchedule *schedule)=0
void SetResultSet(const std::vector< std::string > &results)
Status GetRowValue(TupleRow *row, std::vector< void * > *result, std::vector< int > *scales)
virtual int AddRows(const QueryResultSet *other, int start_idx, int num_rows)=0
std::string PrintTStmtType(const TStmtType::type &type)
Status FetchRowsInternal(const int32_t max_rows, QueryResultSet *fetched_rows)
const TimestampValue & start_time() const
This class is thread-safe.
Definition: mem-tracker.h:61
void Release(int64_t bytes)
Decreases consumption of this tracker and its ancestors by 'bytes'.
Definition: mem-tracker.h:209
const TUniqueId & query_id() const
Status ExecQueryOrDmlRequest(const TQueryExecRequest &query_exec_request)
Status WaitInternal()
Core logic of Wait(). Does not update query_state_/status_.
RuntimeProfile::Counter * client_wait_timer_
Tracks how long we are idle waiting for a client to fetch rows.
std::string PrintTDdlType(const TDdlType::type &type)
int64_t ByteSize()
Returns the approximate size of this result set in bytes.
RuntimeProfile::EventSequence * query_events_
static const string PER_HOST_VCORES_KEY
Status UpdateQueryStatus(const Status &status)
const std::string & do_as_user() const
static const Status OK
Definition: status.h:87
ExecEnv * exec_env_
Definition: coordinator.h:193
Status Exec(TExecRequest *exec_request)
RuntimeProfile::EventSequence * query_events_
Event timeline for this query. Unowned.
Definition: coordinator.h:309
ExecEnv * exec_env_
global, per-server state
void Cancel(const Status *cause=NULL)
DECLARE_string(catalog_service_host)
void AddChild(RuntimeProfile *child, bool indent=true, RuntimeProfile *location=NULL)
DECLARE_int64(max_result_cache_size)
void SetQueryOption(TImpalaQueryOptions::type opt, const T &opt_val, TExecuteStatementReq *exec_stmt_req)
Definition: child-query.cc:102
bool ok() const
Definition: status.h:172
Status UpdateCatalog()
Gather and publish all required updates to the metastore.
MemTracker * query_mem_tracker()
TQueryCtx query_ctx_
Definition: coordinator.h:198
void set_name(const std::string &name)
static TimestampValue LocalTime()
virtual Status Release(QuerySchedule *schedule)=0
Releases the reserved resources (if any) from the given schedule.
TSessionType::type session_type() const