Impala
Impala is the open source, native analytic database for Apache Hadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
child-query.cc
Go to the documentation of this file.
1 // Copyright 2012 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "service/child-query.h"
18 #include "util/debug-util.h"
19 
20 #include "common/names.h"
21 
22 using namespace impala;
23 using namespace apache::hive::service::cli::thrift;
24 
25 namespace impala {
26 
27 const string ChildQuery::PARENT_QUERY_OPT = "impala.parent_query_id";
28 
29 // To detect cancellation of the parent query this function checks IsCancelled() before
30 // any HS2 "RPC" into the impala server. It is important not to hold any locks (in
31 // particular the parent query's lock_) while invoking HS2 functions to avoid deadlock.
33  const TUniqueId& session_id = parent_exec_state_->session_id();
34  VLOG_QUERY << "Executing child query: " << query_ << " in session "
35  << PrintId(session_id);
36 
37  // Create HS2 request and response structs.
38  Status status;
39  TExecuteStatementResp exec_stmt_resp;
40  TExecuteStatementReq exec_stmt_req;
41  ImpalaServer::TUniqueIdToTHandleIdentifier(session_id, session_id,
42  &exec_stmt_req.sessionHandle.sessionId);
43  exec_stmt_req.__set_statement(query_);
44  SetQueryOptions(parent_exec_state_->exec_request().query_options, &exec_stmt_req);
45  exec_stmt_req.confOverlay[PARENT_QUERY_OPT] = PrintId(parent_exec_state_->query_id());
46 
47  // Starting execution of the child query and setting is_running_ are not made atomic
48  // because holding a lock while calling into the parent_server_ may result in deadlock.
49  // Cancellation is checked immediately after setting is_running_ below.
50  // The order of the following three steps is important:
51  // 1. Start query execution before setting is_running_ to ensure that
52  // a concurrent Cancel() initiated by the parent is a no-op.
53  // 2. Set the hs2_handle_ before is_running_ to ensure there is a proper handle
54  // for Cancel() to use.
55  // 3. Set is_running_ to true. Once is_running_ is set, the child query
56  // can be cancelled via Cancel().
57  RETURN_IF_ERROR(IsCancelled());
58  parent_server_->ExecuteStatement(exec_stmt_resp, exec_stmt_req);
59  hs2_handle_ = exec_stmt_resp.operationHandle;
60  {
61  lock_guard<mutex> l(lock_);
62  is_running_ = true;
63  }
64  status = exec_stmt_resp.status;
65  RETURN_IF_ERROR(status);
66 
67  TGetResultSetMetadataReq meta_req;
68  meta_req.operationHandle = exec_stmt_resp.operationHandle;
69  RETURN_IF_ERROR(IsCancelled());
70  parent_server_->GetResultSetMetadata(meta_resp_, meta_req);
71  status = meta_resp_.status;
72  RETURN_IF_ERROR(status);
73 
74  // Fetch all results.
75  TFetchResultsReq fetch_req;
76  fetch_req.operationHandle = exec_stmt_resp.operationHandle;
77  fetch_req.maxRows = 1024;
78  do {
79  RETURN_IF_ERROR(IsCancelled());
80  parent_server_->FetchResults(fetch_resp_, fetch_req);
81  status = fetch_resp_.status;
82  } while (status.ok() && fetch_resp_.hasMoreRows);
83  RETURN_IF_ERROR(IsCancelled());
84 
85  TCloseOperationResp close_resp;
86  TCloseOperationReq close_req;
87  close_req.operationHandle = exec_stmt_resp.operationHandle;
88  parent_server_->CloseOperation(close_resp, close_req);
89  {
90  lock_guard<mutex> l(lock_);
91  is_running_ = false;
92  }
93  RETURN_IF_ERROR(IsCancelled());
94 
95  // Don't overwrite error from fetch. A failed fetch unregisters the query and we want to
96  // preserve the original error status (e.g., CANCELLED).
97  if (status.ok()) status = close_resp.status;
98  return status;
99 }
100 
101 template <typename T>
102 void SetQueryOption(TImpalaQueryOptions::type opt, const T& opt_val,
103  TExecuteStatementReq* exec_stmt_req) {
104  stringstream opt_val_ss;
105  opt_val_ss << opt_val;
106  map<int, const char*>::const_iterator it =
107  _TImpalaQueryOptions_VALUES_TO_NAMES.find(opt);
108  if (it == _TImpalaQueryOptions_VALUES_TO_NAMES.end()) return;
109  exec_stmt_req->confOverlay[it->second] = opt_val_ss.str();
110  exec_stmt_req->__isset.confOverlay = true;
111 }
112 
// Forwards the query option NAME to the child request, but only if it is marked as set
// in 'parent_options'. ENUM is the matching TImpalaQueryOptions enumerator.
// Must be expanded in a scope that provides 'parent_options' and 'exec_stmt_req'.
// The body is wrapped in do/while(false) so that an invocation followed by ';' forms
// exactly one statement and can be used as the body of an unbraced if/else.
#define SET_QUERY_OPTION(NAME, ENUM)\
  do {\
    if (parent_options.__isset.NAME) {\
      SetQueryOption(TImpalaQueryOptions::ENUM,\
          parent_options.NAME, exec_stmt_req);\
    }\
  } while (false)
118 
119 void ChildQuery::SetQueryOptions(const TQueryOptions& parent_options,
120  TExecuteStatementReq* exec_stmt_req) {
121  // If this DCHECK is hit then handle the missing query option below.
122  DCHECK_EQ(_TImpalaQueryOptions_VALUES_TO_NAMES.size(),
123  TImpalaQueryOptions::EXEC_SINGLE_NODE_ROWS_THRESHOLD + 1);
124  SET_QUERY_OPTION(abort_on_default_limit_exceeded, ABORT_ON_DEFAULT_LIMIT_EXCEEDED);
125  SET_QUERY_OPTION(abort_on_error, ABORT_ON_ERROR);
126  SET_QUERY_OPTION(allow_unsupported_formats, ALLOW_UNSUPPORTED_FORMATS);
127  SET_QUERY_OPTION(batch_size, BATCH_SIZE);
128  // Ignore debug actions on child queries because they may cause deadlock.
129  SET_QUERY_OPTION(default_order_by_limit, DEFAULT_ORDER_BY_LIMIT);
130  SET_QUERY_OPTION(disable_cached_reads, DISABLE_CACHED_READS);
131  SET_QUERY_OPTION(disable_outermost_topn, DISABLE_OUTERMOST_TOPN);
132  SET_QUERY_OPTION(disable_codegen, DISABLE_CODEGEN);
133  SET_QUERY_OPTION(explain_level, EXPLAIN_LEVEL);
134  SET_QUERY_OPTION(hbase_cache_blocks, HBASE_CACHE_BLOCKS);
135  SET_QUERY_OPTION(hbase_caching, HBASE_CACHING);
136  SET_QUERY_OPTION(max_errors, MAX_ERRORS);
137  SET_QUERY_OPTION(max_io_buffers, MAX_IO_BUFFERS);
138  SET_QUERY_OPTION(max_scan_range_length, MAX_SCAN_RANGE_LENGTH);
139  SET_QUERY_OPTION(mem_limit, MEM_LIMIT);
140  SET_QUERY_OPTION(num_nodes, NUM_NODES);
141  SET_QUERY_OPTION(num_scanner_threads, NUM_SCANNER_THREADS);
142  SET_QUERY_OPTION(compression_codec, COMPRESSION_CODEC);
143  SET_QUERY_OPTION(parquet_file_size, PARQUET_FILE_SIZE);
144  SET_QUERY_OPTION(request_pool, REQUEST_POOL);
145  SET_QUERY_OPTION(reservation_request_timeout, RESERVATION_REQUEST_TIMEOUT);
146  SET_QUERY_OPTION(sync_ddl, SYNC_DDL);
147  SET_QUERY_OPTION(v_cpu_cores, V_CPU_CORES);
148  SET_QUERY_OPTION(rm_initial_mem, RM_INITIAL_MEM);
149  SET_QUERY_OPTION(query_timeout_s, QUERY_TIMEOUT_S);
150  SET_QUERY_OPTION(max_block_mgr_memory, MAX_BLOCK_MGR_MEMORY);
151  SET_QUERY_OPTION(appx_count_distinct, APPX_COUNT_DISTINCT);
152  SET_QUERY_OPTION(disable_unsafe_spills, DISABLE_UNSAFE_SPILLS);
153  SET_QUERY_OPTION(seq_compression_mode, SEQ_COMPRESSION_MODE);
154  SET_QUERY_OPTION(exec_single_node_rows_threshold,
155  EXEC_SINGLE_NODE_ROWS_THRESHOLD);
156 }
157 
159  // Do not hold lock_ while calling into parent_server_ to avoid deadlock.
160  {
161  lock_guard<mutex> l(lock_);
162  is_cancelled_ = true;
163  if (!is_running_) return;
164  is_running_ = false;
165  }
166  VLOG_QUERY << "Cancelling and closing child query with operation id: "
167  << hs2_handle_.operationId.guid;
168  // Ignore return statuses because they are not actionable.
169  TCancelOperationResp cancel_resp;
170  TCancelOperationReq cancel_req;
171  cancel_req.operationHandle = hs2_handle_;
172  parent_server_->CancelOperation(cancel_resp, cancel_req);
173  TCloseOperationResp close_resp;
174  TCloseOperationReq close_req;
175  close_req.operationHandle = hs2_handle_;
176  parent_server_->CloseOperation(close_resp, close_req);
177 }
178 
180  lock_guard<mutex> l(lock_);
181  if (!is_cancelled_) return Status::OK;
182  return Status::CANCELLED;
183 }
184 
185 }
#define RETURN_IF_ERROR(stmt)
some generally useful macros
Definition: status.h:242
boost::mutex lock_
protects all fields below
Definition: coordinator.h:233
string PrintId(const TUniqueId &id, const string &separator)
Definition: debug-util.cc:97
const int BATCH_SIZE
void SetQueryOptions(const TQueryOptions &parent_options, apache::hive::service::cli::thrift::TExecuteStatementReq *exec_stmt_req)
Definition: child-query.cc:119
static void TUniqueIdToTHandleIdentifier(const TUniqueId &unique_id, const TUniqueId &secret, apache::hive::service::cli::thrift::THandleIdentifier *handle)
#define VLOG_QUERY
Definition: logging.h:57
Status ExecAndFetch()
Executes this child query through HiveServer2 and fetches all its results.
Definition: child-query.cc:32
static const Status CANCELLED
Definition: status.h:88
static const Status OK
Definition: status.h:87
#define SET_QUERY_OPTION(NAME, ENUM)
Definition: child-query.cc:113
void SetQueryOption(TImpalaQueryOptions::type opt, const T &opt_val, TExecuteStatementReq *exec_stmt_req)
Definition: child-query.cc:102
bool ok() const
Definition: status.h:172
static const string PARENT_QUERY_OPT
Definition: child-query.h:102