Impala
Impala is the open source, native analytic database for Apache Hadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
query-exec-state.h
Go to the documentation of this file.
1 // Copyright 2013 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef IMPALA_SERVICE_QUERY_EXEC_STATE_H
16 #define IMPALA_SERVICE_QUERY_EXEC_STATE_H
17 
18 #include "common/status.h"
20 #include "util/runtime-profile.h"
22 #include "service/child-query.h"
24 #include "gen-cpp/Frontend_types.h"
25 #include "service/impala-server.h"
26 #include "gen-cpp/Frontend_types.h"
27 
28 #include <boost/thread.hpp>
29 #include <boost/unordered_set.hpp>
30 #include <vector>
31 
32 namespace impala {
33 
34 class ExecEnv;
35 class Coordinator;
36 class RuntimeState;
37 class RowBatch;
38 class Expr;
39 class TupleRow;
40 class Frontend;
41 class QueryExecStateCleaner;
42 
56  public:
57  QueryExecState(const TQueryCtx& query_ctx, ExecEnv* exec_env, Frontend* frontend,
58  ImpalaServer* server, boost::shared_ptr<ImpalaServer::SessionState> session);
59 
61 
65  Status Exec(TExecRequest* exec_request);
66 
70  Status Exec(const TMetadataOpRequest& exec_request);
71 
75  void Wait();
76 
78  void WaitAsync();
79 
83  void BlockOnWait();
84 
91  Status FetchRows(const int32_t max_rows, QueryResultSet* fetched_rows);
92 
101 
104  void UpdateQueryState(beeswax::QueryState::type query_state);
105 
114  Status UpdateQueryStatus(const Status& status);
115 
121  void Cancel(const Status* cause = NULL);
122 
125  void Done();
126 
130  Status SetResultCache(QueryResultSet* cache, int64_t max_size);
131 
// Returns the raw pointer held by the session_ smart pointer (session_.get()).
// No ownership is transferred to the caller; session_ keeps its reference.
132  ImpalaServer::SessionState* session() const { return session_.get(); }
// Returns do_as_user() when that string is non-empty, otherwise falls back to
// connected_user(). The result is a reference to the string held by the
// session / query context, not a copy.
138  const std::string& effective_user() const {
139  return do_as_user().empty() ? connected_user() : do_as_user();
140  }
141  const std::string& connected_user() const { return query_ctx_.session.connected_user; }
// Returns a reference to the do_as_user string stored in the session.
// session_ is dereferenced without a null check, so it must be set when this
// is called.
142  const std::string& do_as_user() const { return session_->do_as_user; }
143  TSessionType::type session_type() const { return query_ctx_.session.session_type; }
144  const TUniqueId& session_id() const { return query_ctx_.session.session_id; }
145  const std::string& default_db() const { return query_ctx_.session.database; }
146  bool eos() const { return eos_; }
147  Coordinator* coord() const { return coord_.get(); }
148  QuerySchedule* schedule() { return schedule_.get(); }
149  int num_rows_fetched() const { return num_rows_fetched_; }
150  void set_fetched_rows() { fetched_rows_ = true; }
151  bool fetched_rows() const { return fetched_rows_; }
// True when result_metadata_ describes at least one column.
// NOTE(review): not declared const although the body only reads
// result_metadata_ -- confirm whether a const qualifier was intended.
152  bool returns_result_set() { return !result_metadata_.columns.empty(); }
153  const TResultSetMetadata* result_metadata() { return &result_metadata_; }
154  const TUniqueId& query_id() const { return query_ctx_.query_id; }
155  const TExecRequest& exec_request() const { return exec_request_; }
156  TStmtType::type stmt_type() const { return exec_request_.stmt_type; }
// Returns the op_type field of exec_request_.catalog_op_request.
// NOTE(review): presumably only meaningful when exec_request_ actually carries
// a catalog operation -- confirm against stmt_type() at the call sites.
157  TCatalogOpType::type catalog_op_type() const {
158  return exec_request_.catalog_op_request.op_type;
159  }
// Returns the ddl_type field of exec_request_.catalog_op_request.ddl_params.
// NOTE(review): presumably only meaningful when the catalog operation is a DDL
// statement -- confirm against catalog_op_type() at the call sites.
160  TDdlType::type ddl_type() const {
161  return exec_request_.catalog_op_request.ddl_params.ddl_type;
162  }
163  boost::mutex* lock() { return &lock_; }
164  boost::mutex* fetch_rows_lock() { return &fetch_rows_lock_; }
// Returns the current value of query_state_ by value. The read takes no lock
// itself.
// NOTE(review): the leading const applies to a by-value return; if the type is
// an enum (presumably, given the Thrift-style ::type name) it has no effect
// for callers -- confirm before relying on it.
165  const beeswax::QueryState::type query_state() const { return query_state_; }
166  void set_query_state(beeswax::QueryState::type state) { query_state_ = state; }
167  const Status& query_status() const { return query_status_; }
168  void set_result_metadata(const TResultSetMetadata& md) { result_metadata_ = md; }
169  const RuntimeProfile& profile() const { return profile_; }
171  const TimestampValue& start_time() const { return start_time_; }
172  const TimestampValue& end_time() const { return end_time_; }
173  const std::string& sql_stmt() const { return query_ctx_.request.stmt; }
174  const TQueryOptions& query_options() const { return query_ctx_.request.query_options; }
176  TUniqueId parent_query_id() const { return query_ctx_.parent_query_id; }
177 
// Returns a reference to the analysis_warnings vector stored in exec_request_;
// no copy is made.
178  const std::vector<std::string>& GetAnalysisWarnings() const {
179  return exec_request_.analysis_warnings;
180  }
181 
// Returns last_active_time_, read while holding expiration_data_lock_.
// The lock is declared mutable, which is what allows it to be acquired from
// this const method. The unit of the returned value is not visible here.
182  inline int64_t last_active() const {
183  boost::lock_guard<boost::mutex> l(expiration_data_lock_);
184  return last_active_time_;
185  }
186 
// Returns true if Impala is actively processing this query, i.e. ref_count_
// is greater than zero. ref_count_ is read while holding
// expiration_data_lock_.
188  inline bool is_active() const {
189  boost::lock_guard<boost::mutex> l(expiration_data_lock_);
190  return ref_count_ > 0;
191  }
192 
195 
196  private:
197  const TQueryCtx query_ctx_;
198 
203  boost::mutex fetch_rows_lock_;
204 
208  mutable boost::mutex expiration_data_lock_;
210 
214  uint32_t ref_count_;
215 
217  boost::scoped_ptr<Thread> wait_thread_;
218 
219  boost::mutex lock_; // protects all following fields
221 
223  boost::shared_ptr<SessionState> session_;
224 
226  boost::scoped_ptr<QuerySchedule> schedule_;
227 
229  boost::scoped_ptr<Coordinator> coord_;
230 
232  boost::scoped_ptr<CatalogOpExecutor> catalog_op_executor_;
233 
236  boost::scoped_ptr<std::vector<TResultRow> > request_result_set_;
237 
243  boost::scoped_ptr<QueryResultSet> result_cache_;
244 
247 
249  boost::scoped_ptr<RuntimeState> local_runtime_state_;
251 
260  //
268 
273 
275  std::vector<ExprContext*> output_expr_ctxs_;
276  bool eos_; // if true, there are no more rows to return
277  beeswax::QueryState::type query_state_;
279  TExecRequest exec_request_;
280 
281  TResultSetMetadata result_metadata_; // metadata for select query
282  RowBatch* current_batch_; // the current row batch; only applicable if coord is set
283  int current_batch_row_; // number of rows fetched within the current batch
284  int num_rows_fetched_; // number of rows fetched by client for the entire query
285 
289 
292 
296 
299 
301  std::vector<ChildQuery> child_queries_;
302 
308  boost::scoped_ptr<Thread> child_queries_thread_;
309 
312  Status ExecLocalCatalogOp(const TCatalogOpRequest& catalog_op);
313 
316  void MarkInactive();
317 
320  void MarkActive();
321 
327  Status ExecQueryOrDmlRequest(const TQueryExecRequest& query_exec_request);
328 
332 
335 
338 
341  Status FetchRowsInternal(const int32_t max_rows, QueryResultSet* fetched_rows);
342 
349 
353  Status GetRowValue(TupleRow* row, std::vector<void*>* result, std::vector<int>* scales);
354 
357 
360  void SetResultSet(const std::vector<std::string>& results);
361  void SetResultSet(const std::vector<std::string>& col1,
362  const std::vector<std::string>& col2);
363  void SetResultSet(const std::vector<std::string>& col1,
364  const std::vector<std::string>& col2, const std::vector<std::string>& col3,
365  const std::vector<std::string>& col4);
366 
371 
379 
382  void ExecChildQueriesAsync();
383 
388  void ExecChildQueries();
389 
395 
398  void ClearResultCache();
399 };
400 
401 }
402 #endif
const TimestampValue & end_time() const
Status FetchRows(const int32_t max_rows, QueryResultSet *fetched_rows)
const TExecRequest & exec_request() const
Status ExecLocalCatalogOp(const TCatalogOpRequest &catalog_op)
RuntimeProfile::EventSequence * query_events() const
boost::scoped_ptr< QuerySchedule > schedule_
Resource assignment determined by scheduler. Owned by obj_pool_.
const TQueryOptions & query_options() const
Frontend * frontend_
To get access to UpdateCatalog, LOAD, and DDL methods. Not owned.
void set_result_metadata(const TResultSetMetadata &md)
const std::vector< std::string > & GetAnalysisWarnings() const
const TUniqueId & session_id() const
std::vector< ExprContext * > output_expr_ctxs_
TUniqueId parent_query_id() const
Returns 0:0 if this is a root query.
const std::string & connected_user() const
void UpdateQueryState(beeswax::QueryState::type query_state)
beeswax::QueryState::type query_state_
const std::string & default_db() const
void WaitAsync()
Calls Wait() asynchronously in a thread and returns immediately.
boost::scoped_ptr< Thread > wait_thread_
Thread for asynchronously running Wait().
boost::scoped_ptr< Thread > child_queries_thread_
boost::shared_ptr< SessionState > session_
Session that this query is from.
void set_query_state(beeswax::QueryState::type state)
const std::string & effective_user() const
boost::scoped_ptr< RuntimeState > local_runtime_state_
local runtime_state_ in case we don't have a coord_
boost::scoped_ptr< Coordinator > coord_
not set for ddl queries, or queries with "limit 0"
int64_t result_cache_max_size_
Max size of the result_cache_ in number of rows. A value <= 0 means no caching.
RuntimeProfile::Counter * row_materialization_timer_
Status SetResultCache(QueryResultSet *cache, int64_t max_size)
void SetResultSet(const std::vector< std::string > &results)
Status GetRowValue(TupleRow *row, std::vector< void * > *result, std::vector< int > *scales)
boost::scoped_ptr< QueryResultSet > result_cache_
bool is_active() const
Returns true if Impala is actively processing this query.
Status FetchRowsInternal(const int32_t max_rows, QueryResultSet *fetched_rows)
const TimestampValue & start_time() const
const RuntimeProfile & summary_profile() const
const TUniqueId & query_id() const
Status ExecQueryOrDmlRequest(const TQueryExecRequest &query_exec_request)
Status ExecLoadDataRequest()
Executes a LOAD DATA.
ImpalaServer::SessionState * session() const
Status WaitInternal()
Core logic of Wait(). Does not update query_state_/status_.
const beeswax::QueryState::type query_state() const
RuntimeProfile::Counter * client_wait_timer_
Tracks how long we are idle waiting for a client to fetch rows.
RuntimeProfile::EventSequence * query_events_
Status UpdateQueryStatus(const Status &status)
const std::string & do_as_user() const
boost::scoped_ptr< std::vector< TResultRow > > request_result_set_
TStmtType::type stmt_type() const
const std::string & sql_stmt() const
const Status & query_status() const
Status Exec(TExecRequest *exec_request)
TimestampValue start_time_
Start/end time of the query.
const TResultSetMetadata * result_metadata()
std::vector< ChildQuery > child_queries_
List of child queries to be executed on behalf of this query.
void Cancel(const Status *cause=NULL)
boost::scoped_ptr< CatalogOpExecutor > catalog_op_executor_
Runs statements that query or modify the catalog via the CatalogService.
const RuntimeProfile & profile() const
Status UpdateCatalog()
Gather and publish all required updates to the metastore.
QueryExecState(const TQueryCtx &query_ctx, ExecEnv *exec_env, Frontend *frontend, ImpalaServer *server, boost::shared_ptr< ImpalaServer::SessionState > session)
MonotonicStopWatch client_wait_sw_
Timer to track idle time for the above counter.
TCatalogOpType::type catalog_op_type() const
TSessionType::type session_type() const