Impala
Impala is the open source, native analytic database for Apache Hadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
impala-hs2-server.cc
Go to the documentation of this file.
1 // Copyright 2012 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "service/impala-server.h"
17 
18 #include <algorithm>
19 #include <boost/algorithm/string/join.hpp>
20 #include <boost/date_time/posix_time/posix_time_types.hpp>
21 #include <boost/unordered_set.hpp>
22 #include <jni.h>
23 #include <thrift/protocol/TDebugProtocol.h>
24 #include <gtest/gtest.h>
25 #include <boost/foreach.hpp>
26 #include <boost/bind.hpp>
27 #include <boost/algorithm/string.hpp>
28 #include <google/heap-profiler.h>
29 #include <google/malloc_extension.h>
30 #include <gutil/strings/substitute.h>
31 
32 #include "common/logging.h"
33 #include "common/version.h"
34 #include "exprs/expr.h"
35 #include "runtime/raw-value.h"
37 #include "service/query-options.h"
38 #include "util/debug-util.h"
39 #include "rpc/thrift-util.h"
40 #include "util/impalad-metrics.h"
41 #include "service/hs2-util.h"
42 
43 #include "common/names.h"
44 
45 using boost::adopt_lock_t;
46 using boost::algorithm::join;
47 using boost::uuids::uuid;
48 using namespace apache::hive::service::cli::thrift;
49 using namespace apache::hive::service::cli;
50 using namespace apache::thrift;
51 using namespace beeswax; // Converting QueryState
52 using namespace strings;
53 
// Newest HiveServer2 protocol version this server will negotiate. OpenSession() stores
// the smaller of this value and the client's requested protocol version in the session.
54 const TProtocolVersion::type MAX_SUPPORTED_HS2_VERSION =
55  TProtocolVersion::HIVE_CLI_SERVICE_PROTOCOL_V6;
56 
// Fills the 'status' member of the HiveServer2 response 'return_val' with an error
// status code, the message 'error_msg' and the SQLSTATE 'error_state', then returns
// from the enclosing function. Only usable inside functions that return void.
#define HS2_RETURN_ERROR(return_val, error_msg, error_state) \
  do { \
    (return_val).status.__set_statusCode(thrift::TStatusCode::ERROR_STATUS); \
    (return_val).status.__set_errorMessage((error_msg)); \
    (return_val).status.__set_sqlState((error_state)); \
    return; \
  } while (false)
65 
// If 'status' is not OK, reports its detail text through HS2_RETURN_ERROR (which also
// returns from the enclosing void function); otherwise does nothing.
// 'status' is evaluated exactly once and bound to a const reference, so it is safe to
// pass a call expression with side effects (e.g. a function returning a Status).
#define HS2_RETURN_IF_ERROR(return_val, status, error_state) \
  do { \
    const Status& hs2_status_ = (status); \
    if (UNLIKELY(!hs2_status_.ok())) { \
      HS2_RETURN_ERROR(return_val, hs2_status_.GetDetail(), (error_state)); \
    } \
  } while (false)
73 
74 namespace impala {
75 
// Key looked up in a statement's confOverlay by ExecuteStatement(). Its value is parsed
// as an integer row count and, when positive, enables result caching for the query.
// TExecuteStatementReqToTQueryContext() skips this key instead of treating it as a query
// option.
76 const string IMPALA_RESULT_CACHING_OPT = "impala.resultset.cache.size";
77 
78 // Utility functions for computing the size of HS2 Thrift structs in bytes.
79 static inline
80 int64_t ByteSize(const thrift::TColumnValue& val) {
81  return sizeof(val) + val.stringVal.value.capacity();
82 }
83 
84 static int64_t ByteSize(const thrift::TRow& row) {
85  int64_t bytes = sizeof(row);
86  BOOST_FOREACH(const thrift::TColumnValue& c, row.colVals) {
87  bytes += ByteSize(c);
88  }
89  return bytes;
90 }
91 
92 // Returns the size, in bytes, of a Hive TColumn structure, only taking into account those
93 // values in the range [start_idx, end_idx).
94 static uint32_t TColumnByteSize(const thrift::TColumn& col, uint32_t start_idx,
95  uint32_t end_idx) {
96  DCHECK_LE(start_idx, end_idx);
97  uint32_t num_rows = end_idx - start_idx;
98  if (num_rows == 0) return 0L;
99 
100  if (col.__isset.boolVal) return (num_rows * sizeof(bool)) + col.boolVal.nulls.size();
101  if (col.__isset.byteVal) return num_rows + col.byteVal.nulls.size();
102  if (col.__isset.i16Val) return (num_rows * sizeof(int16_t)) + col.i16Val.nulls.size();
103  if (col.__isset.i32Val) return (num_rows * sizeof(int32_t)) + col.i32Val.nulls.size();
104  if (col.__isset.i64Val) return (num_rows * sizeof(int64_t)) + col.i64Val.nulls.size();
105  if (col.__isset.doubleVal) {
106  return (num_rows * sizeof(double)) + col.doubleVal.nulls.size();
107  }
108  if (col.__isset.stringVal) {
109  uint32_t bytes = 0;
110  for (int i = start_idx; i < end_idx; ++i) bytes += col.stringVal.values[i].size();
111  return bytes + col.stringVal.nulls.size();
112  }
113 
114  return 0;
115 }
116 
117 // Forward declaration of the helper that maps a Beeswax query state to the
// corresponding HiveServer2 operation state. The definition is not in this part of the
// file.
118 static TOperationState::type QueryStateToTOperationState(
119  const beeswax::QueryState::type& query_state);
120 
121 // Result set container for Hive protocol versions >= V6, where results are returned in
122 // column-orientation.
124  public:
125  HS2ColumnarResultSet(const TResultSetMetadata& metadata, TRowSet* rowset = NULL)
126  : metadata_(metadata), result_set_(rowset), num_rows_(0) {
127  if (rowset == NULL) {
128  owned_result_set_.reset(new TRowSet());
129  result_set_ = owned_result_set_.get();
130  }
131  InitColumns();
132  }
133 
// Virtual destructor with an empty body; there is no explicit cleanup here.
134  virtual ~HS2ColumnarResultSet() { }
135 
136  // Add a row of expr values
137  virtual Status AddOneRow(const vector<void*>& col_values, const vector<int>& scales) {
138  int num_col = col_values.size();
139  DCHECK_EQ(num_col, metadata_.columns.size());
140  for (int i = 0; i < num_col; ++i) {
141  ExprValueToHS2TColumn(col_values[i], metadata_.columns[i].columnType, num_rows_,
142  &(result_set_->columns[i]));
143  }
144  ++num_rows_;
145  return Status::OK;
146  }
147 
148  // Add a row from a TResultRow
149  virtual Status AddOneRow(const TResultRow& row) {
150  int num_col = row.colVals.size();
151  DCHECK_EQ(num_col, metadata_.columns.size());
152  for (int i = 0; i < num_col; ++i) {
153  TColumnValueToHS2TColumn(row.colVals[i], metadata_.columns[i].columnType, num_rows_,
154  &(result_set_->columns[i]));
155  }
156  ++num_rows_;
157  return Status::OK;
158  }
159 
160  // Copy all columns starting at 'start_idx' and proceeding for a maximum of 'num_rows'
161  // from 'other' into this result set
162  virtual int AddRows(const QueryResultSet* other, int start_idx, int num_rows) {
163  const HS2ColumnarResultSet* o = static_cast<const HS2ColumnarResultSet*>(other);
164  DCHECK_EQ(metadata_.columns.size(), o->metadata_.columns.size());
165  if (start_idx >= o->num_rows_) return 0;
166  const int rows_added =
167  min(static_cast<long>(num_rows), o->num_rows_ - start_idx);
168  for (int j = 0; j < metadata_.columns.size(); ++j) {
169  thrift::TColumn* from = &o->result_set_->columns[j];
170  thrift::TColumn* to = &result_set_->columns[j];
171  switch (metadata_.columns[j].columnType.types[0].scalar_type.type) {
172  case TPrimitiveType::NULL_TYPE:
173  case TPrimitiveType::BOOLEAN:
174  StitchNulls(num_rows_, rows_added, start_idx, from->boolVal.nulls,
175  &(to->boolVal.nulls));
176  to->boolVal.values.insert(
177  to->boolVal.values.end(),
178  from->boolVal.values.begin() + start_idx,
179  from->boolVal.values.begin() + start_idx + rows_added);
180  break;
181  case TPrimitiveType::TINYINT:
182  StitchNulls(num_rows_, rows_added, start_idx, from->byteVal.nulls,
183  &(to->byteVal.nulls));
184  to->byteVal.values.insert(
185  to->byteVal.values.end(),
186  from->byteVal.values.begin() + start_idx,
187  from->byteVal.values.begin() + start_idx + rows_added);
188  break;
189  case TPrimitiveType::SMALLINT:
190  StitchNulls(num_rows_, rows_added, start_idx, from->i16Val.nulls,
191  &(to->i16Val.nulls));
192  to->i16Val.values.insert(
193  to->i16Val.values.end(),
194  from->i16Val.values.begin() + start_idx,
195  from->i16Val.values.begin() + start_idx + rows_added);
196  break;
197  case TPrimitiveType::INT:
198  StitchNulls(num_rows_, rows_added, start_idx, from->i32Val.nulls,
199  &(to->i32Val.nulls));
200  to->i32Val.values.insert(
201  to->i32Val.values.end(),
202  from->i32Val.values.begin() + start_idx,
203  from->i32Val.values.begin() + start_idx + rows_added);
204  break;
205  case TPrimitiveType::BIGINT:
206  StitchNulls(num_rows_, rows_added, start_idx, from->i64Val.nulls,
207  &(to->i64Val.nulls));
208  to->i64Val.values.insert(
209  to->i64Val.values.end(),
210  from->i64Val.values.begin() + start_idx,
211  from->i64Val.values.begin() + start_idx + rows_added);
212  break;
213  case TPrimitiveType::FLOAT:
214  case TPrimitiveType::DOUBLE:
215  StitchNulls(num_rows_, rows_added, start_idx, from->doubleVal.nulls,
216  &(to->doubleVal.nulls));
217  to->doubleVal.values.insert(
218  to->doubleVal.values.end(),
219  from->doubleVal.values.begin() + start_idx,
220  from->doubleVal.values.begin() + start_idx + rows_added);
221  break;
222  case TPrimitiveType::TIMESTAMP:
223  case TPrimitiveType::DECIMAL:
224  case TPrimitiveType::STRING:
225  case TPrimitiveType::VARCHAR:
226  case TPrimitiveType::CHAR:
227  StitchNulls(num_rows_, rows_added, start_idx, from->stringVal.nulls,
228  &(to->stringVal.nulls));
229  to->stringVal.values.insert(to->stringVal.values.end(),
230  from->stringVal.values.begin() + start_idx,
231  from->stringVal.values.begin() + start_idx + rows_added);
232  break;
233  default:
234  DCHECK(false) << "Unsupported type: " << TypeToString(ThriftToType(
235  metadata_.columns[j].columnType.types[0].scalar_type.type));
236  break;
237  }
238  }
239  num_rows_ += rows_added;
240  return rows_added;
241  }
242 
243  virtual int64_t ByteSize(int start_idx, int num_rows) {
244  const int end = min(start_idx + num_rows, (int)size());
245  int64_t bytes = 0L;
246  BOOST_FOREACH(const thrift::TColumn& c, result_set_->columns) {
247  bytes += TColumnByteSize(c, start_idx, end);
248  }
249  return bytes;
250  }
251 
252  virtual size_t size() { return num_rows_; }
253 
254  private:
255  // Metadata of the result set. Held by reference rather than copied.
256  const TResultSetMetadata& metadata_;
257 
258  // Points to the TRowSet to be filled. The row set this points to may be owned by
259  // this object, in which case owned_result_set_ is set.
260  TRowSet* result_set_;
261 
262  // Set to result_set_ if result_set_ is owned.
263  scoped_ptr<TRowSet> owned_result_set_;
264 
// Number of rows added so far. Incremented by both AddOneRow() overloads and advanced
// by AddRows(); also used as the row position when appending a new row.
265  int64_t num_rows_;
266 
267  void InitColumns() {
268  result_set_->__isset.columns = true;
269  BOOST_FOREACH(const TColumn& col, metadata_.columns) {
270  DCHECK(col.columnType.types.size() == 1) <<
271  "Structured columns unsupported in HS2 interface";
272  thrift::TColumn column;
273  switch (col.columnType.types[0].scalar_type.type) {
274  case TPrimitiveType::NULL_TYPE:
275  case TPrimitiveType::BOOLEAN:
276  column.__isset.boolVal = true;
277  break;
278  case TPrimitiveType::TINYINT:
279  column.__isset.byteVal = true;
280  break;
281  case TPrimitiveType::SMALLINT:
282  column.__isset.i16Val = true;
283  break;
284  case TPrimitiveType::INT:
285  column.__isset.i32Val = true;
286  break;
287  case TPrimitiveType::BIGINT:
288  column.__isset.i64Val = true;
289  break;
290  case TPrimitiveType::FLOAT:
291  case TPrimitiveType::DOUBLE:
292  column.__isset.doubleVal = true;
293  break;
294  case TPrimitiveType::TIMESTAMP:
295  case TPrimitiveType::DECIMAL:
296  case TPrimitiveType::VARCHAR:
297  case TPrimitiveType::CHAR:
298  case TPrimitiveType::STRING:
299  column.__isset.stringVal = true;
300  break;
301  default:
302  DCHECK(false) << "Unhandled column type: "
303  << TypeToString(
304  ThriftToType(col.columnType.types[0].scalar_type.type));
305  }
306  result_set_->columns.push_back(column);
307  }
308  }
309 };
310 
311 // TRow result set for HiveServer2
313  public:
314  // Rows are added into rowset.
315  HS2RowOrientedResultSet(const TResultSetMetadata& metadata, TRowSet* rowset = NULL)
316  : metadata_(metadata), result_set_(rowset) {
317  if (rowset == NULL) {
318  owned_result_set_.reset(new TRowSet());
319  result_set_ = owned_result_set_.get();
320  }
321  }
322 
324 
325  // Convert expr value to HS2 TRow and store it in TRowSet.
326  virtual Status AddOneRow(const vector<void*>& col_values, const vector<int>& scales) {
327  int num_col = col_values.size();
328  DCHECK_EQ(num_col, metadata_.columns.size());
329  result_set_->rows.push_back(TRow());
330  TRow& trow = result_set_->rows.back();
331  trow.colVals.resize(num_col);
332  for (int i = 0; i < num_col; ++i) {
333  ExprValueToHS2TColumnValue(col_values[i],
334  metadata_.columns[i].columnType, &(trow.colVals[i]));
335  }
336  return Status::OK;
337  }
338 
339  // Convert TResultRow to HS2 TRow and store it in TRowSet.
340  virtual Status AddOneRow(const TResultRow& row) {
341  int num_col = row.colVals.size();
342  DCHECK_EQ(num_col, metadata_.columns.size());
343  result_set_->rows.push_back(TRow());
344  TRow& trow = result_set_->rows.back();
345  trow.colVals.resize(num_col);
346  for (int i = 0; i < num_col; ++i) {
347  TColumnValueToHS2TColumnValue(row.colVals[i], metadata_.columns[i].columnType,
348  &(trow.colVals[i]));
349  }
350  return Status::OK;
351  }
352 
353  virtual int AddRows(const QueryResultSet* other, int start_idx, int num_rows) {
354  const HS2RowOrientedResultSet* o = static_cast<const HS2RowOrientedResultSet*>(other);
355  if (start_idx >= o->result_set_->rows.size()) return 0;
356  const int rows_added =
357  min(static_cast<size_t>(num_rows), o->result_set_->rows.size() - start_idx);
358  for (int i = start_idx; i < start_idx + rows_added; ++i) {
359  result_set_->rows.push_back(o->result_set_->rows[i]);
360  }
361  return rows_added;
362  }
363 
364  virtual int64_t ByteSize(int start_idx, int num_rows) {
365  int64_t bytes = 0;
366  const int end =
367  min(static_cast<size_t>(num_rows), result_set_->rows.size() - start_idx);
368  for (int i = start_idx; i < start_idx + end; ++i) {
369  bytes += impala::ByteSize(result_set_->rows[i]);
370  }
371  return bytes;
372  }
373 
374  virtual size_t size() { return result_set_->rows.size(); }
375 
376  private:
377  // Metadata of the result set. Held by reference rather than copied.
378  const TResultSetMetadata& metadata_;
379 
380  // Points to the TRowSet to be filled. The row set this points to may be owned by
381  // this object, in which case owned_result_set_ is set.
382  TRowSet* result_set_;
383 
384  // Set to result_set_ if result_set_ is owned.
385  scoped_ptr<TRowSet> owned_result_set_;
386 };
387 
388 ImpalaServer::QueryResultSet* ImpalaServer::CreateHS2ResultSet(
389  TProtocolVersion::type version, const TResultSetMetadata& metadata,
390  TRowSet* rowset) {
391  if (version < TProtocolVersion::HIVE_CLI_SERVICE_PROTOCOL_V6) {
392  return new HS2RowOrientedResultSet(metadata, rowset);
393  } else {
394  return new HS2ColumnarResultSet(metadata, rowset);
395  }
396 }
397 
398 void ImpalaServer::ExecuteMetadataOp(const THandleIdentifier& session_handle,
399  TMetadataOpRequest* request, TOperationHandle* handle, thrift::TStatus* status) {
400  TUniqueId session_id;
401  TUniqueId secret;
402  Status unique_id_status =
403  THandleIdentifierToTUniqueId(session_handle, &session_id, &secret);
404  if (!unique_id_status.ok()) {
405  status->__set_statusCode(thrift::TStatusCode::ERROR_STATUS);
406  status->__set_errorMessage(unique_id_status.GetDetail());
407  status->__set_sqlState(SQLSTATE_GENERAL_ERROR);
408  return;
409  }
410  ScopedSessionState scoped_session(this);
411  shared_ptr<SessionState> session;
412  Status get_session_status = scoped_session.WithSession(session_id, &session);
413  if (!get_session_status.ok()) {
414  status->__set_statusCode(thrift::TStatusCode::ERROR_STATUS);
415  status->__set_errorMessage(get_session_status.GetDetail());
416  // TODO: (here and elsewhere) - differentiate between invalid session ID and timeout
417  // when setting the error code.
418  status->__set_sqlState(SQLSTATE_GENERAL_ERROR);
419  return;
420  }
421 
422  if (session == NULL) {
423  status->__set_statusCode(thrift::TStatusCode::ERROR_STATUS);
424  status->__set_errorMessage("Invalid session ID");
425  status->__set_sqlState(SQLSTATE_GENERAL_ERROR);
426  return;
427  }
428  TQueryCtx query_ctx;
429  PrepareQueryContext(&query_ctx);
430  session->ToThrift(session_id, &query_ctx.session);
431  request->__set_session(query_ctx.session);
432 
433  shared_ptr<QueryExecState> exec_state;
434  // There is no user-supplied query text available because this metadata operation comes
435  // from an RPC. As a best effort, we use the type of the operation.
436  map<int, const char*>::const_iterator query_text_it =
437  _TMetadataOpcode_VALUES_TO_NAMES.find(request->opcode);
438  const string& query_text = query_text_it == _TMetadataOpcode_VALUES_TO_NAMES.end() ?
439  "N/A" : query_text_it->second;
440  query_ctx.request.stmt = query_text;
441  exec_state.reset(new QueryExecState(query_ctx, exec_env_,
442  exec_env_->frontend(), this, session));
443  Status register_status = RegisterQuery(session, exec_state);
444  if (!register_status.ok()) {
445  status->__set_statusCode(thrift::TStatusCode::ERROR_STATUS);
446  status->__set_errorMessage(register_status.GetDetail());
447  status->__set_sqlState(SQLSTATE_GENERAL_ERROR);
448  return;
449  }
450 
451  Status exec_status = exec_state->Exec(*request);
452  if (!exec_status.ok()) {
453  UnregisterQuery(exec_state->query_id(), false, &exec_status);
454  status->__set_statusCode(thrift::TStatusCode::ERROR_STATUS);
455  status->__set_errorMessage(exec_status.GetDetail());
456  status->__set_sqlState(SQLSTATE_GENERAL_ERROR);
457  return;
458  }
459 
460  exec_state->UpdateQueryState(QueryState::FINISHED);
461 
462  Status inflight_status = SetQueryInflight(session, exec_state);
463  if (!inflight_status.ok()) {
464  UnregisterQuery(exec_state->query_id(), false, &inflight_status);
465  status->__set_statusCode(thrift::TStatusCode::ERROR_STATUS);
466  status->__set_errorMessage(inflight_status.GetDetail());
467  status->__set_sqlState(SQLSTATE_GENERAL_ERROR);
468  return;
469  }
470  handle->__set_hasResultSet(true);
471  // TODO: create secret for operationId
472  TUniqueId operation_id = exec_state->query_id();
473  TUniqueIdToTHandleIdentifier(operation_id, operation_id, &(handle->operationId));
474  status->__set_statusCode(thrift::TStatusCode::SUCCESS_STATUS);
475 }
476 
477 Status ImpalaServer::FetchInternal(const TUniqueId& query_id, int32_t fetch_size,
478  bool fetch_first, TFetchResultsResp* fetch_results) {
479  shared_ptr<QueryExecState> exec_state = GetQueryExecState(query_id, false);
480  if (exec_state == NULL) return Status("Invalid query handle");
481 
482  // FetchResults doesn't have an associated session handle, so we presume that this
483  // request should keep alive the same session that orignated the query.
484  ScopedSessionState session_handle(this);
485  const TUniqueId session_id = exec_state->session_id();
486  shared_ptr<SessionState> session;
487  RETURN_IF_ERROR(session_handle.WithSession(session_id, &session));
488 
489  // Make sure QueryExecState::Wait() has completed before fetching rows. Wait() ensures
490  // that rows are ready to be fetched (e.g., Wait() opens QueryExecState::output_exprs_,
491  // which are evaluated in QueryExecState::FetchRows() below).
492  exec_state->BlockOnWait();
493 
494  lock_guard<mutex> frl(*exec_state->fetch_rows_lock());
495  lock_guard<mutex> l(*exec_state->lock());
496 
497  // Check for cancellation or an error.
498  RETURN_IF_ERROR(exec_state->query_status());
499 
500  if (exec_state->num_rows_fetched() == 0) {
501  exec_state->query_events()->MarkEvent("First row fetched");
502  exec_state->set_fetched_rows();
503  }
504 
505  if (fetch_first) RETURN_IF_ERROR(exec_state->RestartFetch());
506 
507  fetch_results->results.__set_startRowOffset(exec_state->num_rows_fetched());
508 
509  // Child queries should always return their results in row-major format, rather than
510  // inheriting the parent session's setting.
511  bool is_child_query = exec_state->parent_query_id() != TUniqueId();
512  TProtocolVersion::type version = is_child_query ?
513  TProtocolVersion::HIVE_CLI_SERVICE_PROTOCOL_V1 : session->hs2_version;
514  scoped_ptr<QueryResultSet> result_set(CreateHS2ResultSet(version,
515  *(exec_state->result_metadata()), &(fetch_results->results)));
516  RETURN_IF_ERROR(exec_state->FetchRows(fetch_size, result_set.get()));
517  fetch_results->__isset.results = true;
518  fetch_results->__set_hasMoreRows(!exec_state->eos());
519  return Status::OK;
520 }
521 
522 Status ImpalaServer::TExecuteStatementReqToTQueryContext(
523  const TExecuteStatementReq execute_request, TQueryCtx* query_ctx) {
524  query_ctx->request.stmt = execute_request.statement;
525  VLOG_QUERY << "TExecuteStatementReq: " << ThriftDebugString(execute_request);
526  {
527  shared_ptr<SessionState> session_state;
528  TUniqueId session_id;
529  TUniqueId secret;
530  RETURN_IF_ERROR(THandleIdentifierToTUniqueId(execute_request.sessionHandle.sessionId,
531  &session_id, &secret));
532 
533  RETURN_IF_ERROR(GetSessionState(session_id, &session_state));
534  session_state->ToThrift(session_id, &query_ctx->session);
535  lock_guard<mutex> l(session_state->lock);
536  query_ctx->request.query_options = session_state->default_query_options;
537  }
538 
539  if (execute_request.__isset.confOverlay) {
540  map<string, string>::const_iterator conf_itr = execute_request.confOverlay.begin();
541  for (; conf_itr != execute_request.confOverlay.end(); ++conf_itr) {
542  if (conf_itr->first == IMPALA_RESULT_CACHING_OPT) continue;
543  if (conf_itr->first == ChildQuery::PARENT_QUERY_OPT) {
544  if (ParseId(conf_itr->second, &query_ctx->parent_query_id)) {
545  query_ctx->__isset.parent_query_id = true;
546  }
547  continue;
548  }
549  RETURN_IF_ERROR(SetQueryOption(conf_itr->first, conf_itr->second,
550  &query_ctx->request.query_options));
551  }
552  VLOG_QUERY << "TClientRequest.queryOptions: "
553  << ThriftDebugString(query_ctx->request.query_options);
554  }
555  return Status::OK;
556 }
557 
558 // HiveServer2 API
559 void ImpalaServer::OpenSession(TOpenSessionResp& return_val,
560  const TOpenSessionReq& request) {
561  // DO NOT log this Thrift struct in its entirety, in case a bad client sets the
562  // password.
563  VLOG_QUERY << "OpenSession(): username=" << request.username;
564 
565  // Generate session ID and the secret
566  TUniqueId session_id;
567  {
568  lock_guard<mutex> l(uuid_lock_);
569  uuid secret = uuid_generator_();
570  uuid session_uuid = uuid_generator_();
571  return_val.sessionHandle.sessionId.guid.assign(
572  session_uuid.begin(), session_uuid.end());
573  return_val.sessionHandle.sessionId.secret.assign(secret.begin(), secret.end());
574  DCHECK_EQ(return_val.sessionHandle.sessionId.guid.size(), 16);
575  DCHECK_EQ(return_val.sessionHandle.sessionId.secret.size(), 16);
576  return_val.__isset.sessionHandle = true;
577  UUIDToTUniqueId(session_uuid, &session_id);
578  }
579  // create a session state: initialize start time, session type, database and default
580  // query options.
581  // TODO: put secret in session state map and check it
582  // TODO: Fix duplication of code between here and ConnectionStart().
583  shared_ptr<SessionState> state(new SessionState());
584  state->closed = false;
585  state->start_time = TimestampValue::LocalTime();
586  state->session_type = TSessionType::HIVESERVER2;
587  state->network_address = ThriftServer::GetThreadConnectionContext()->network_address;
588  state->last_accessed_ms = UnixMillis();
589  state->hs2_version = min(MAX_SUPPORTED_HS2_VERSION, request.client_protocol);
590 
591  // If the username was set by a lower-level transport, use it.
592  const ThriftServer::Username& username =
593  ThriftServer::GetThreadConnectionContext()->username;
594  if (!username.empty()) {
595  state->connected_user = username;
596  } else {
597  state->connected_user = request.username;
598  }
599 
600  // TODO: request.configuration might specify database.
601  state->database = "default";
602 
603  // Convert request.configuration to session default query options.
604  state->default_query_options = default_query_options_;
605  if (request.__isset.configuration) {
606  map<string, string>::const_iterator conf_itr = request.configuration.begin();
607  for (; conf_itr != request.configuration.end(); ++conf_itr) {
608  // If the current user is a valid proxy user, he/she can optionally perform
609  // authorization requests on behalf of another user. This is done by setting the
610  // 'impala.doas.user' Hive Server 2 configuration property.
611  if (conf_itr->first == "impala.doas.user") {
612  state->do_as_user = conf_itr->second;
613  Status status = AuthorizeProxyUser(state->connected_user, state->do_as_user);
614  HS2_RETURN_IF_ERROR(return_val, status, SQLSTATE_GENERAL_ERROR);
615  continue;
616  }
617  // Ignore failure to set query options (will be logged)
618  SetQueryOption(conf_itr->first, conf_itr->second, &state->default_query_options);
619  }
620  }
621  TQueryOptionsToMap(state->default_query_options, &return_val.configuration);
622 
623  // Put the session state in session_state_map_
624  {
625  lock_guard<mutex> l(session_state_map_lock_);
626  session_state_map_.insert(make_pair(session_id, state));
627  }
628 
629  {
630  lock_guard<mutex> l(connection_to_sessions_map_lock_);
631  const TUniqueId& connection_id = ThriftServer::GetThreadConnectionId();
632  connection_to_sessions_map_[connection_id].push_back(session_id);
633  }
634 
635  ImpaladMetrics::IMPALA_SERVER_NUM_OPEN_HS2_SESSIONS->Increment(1L);
636 
637  return_val.__isset.configuration = true;
638  return_val.status.__set_statusCode(thrift::TStatusCode::SUCCESS_STATUS);
639  return_val.serverProtocolVersion = state->hs2_version;
640 }
641 
642 void ImpalaServer::CloseSession(TCloseSessionResp& return_val,
643  const TCloseSessionReq& request) {
644  VLOG_QUERY << "CloseSession(): request=" << ThriftDebugString(request);
645 
646  TUniqueId session_id;
647  TUniqueId secret;
648  HS2_RETURN_IF_ERROR(return_val, THandleIdentifierToTUniqueId(
649  request.sessionHandle.sessionId, &session_id, &secret), SQLSTATE_GENERAL_ERROR);
650  HS2_RETURN_IF_ERROR(return_val,
651  CloseSessionInternal(session_id, false), SQLSTATE_GENERAL_ERROR);
652  return_val.status.__set_statusCode(thrift::TStatusCode::SUCCESS_STATUS);
653 }
654 
655 void ImpalaServer::GetInfo(TGetInfoResp& return_val,
656  const TGetInfoReq& request) {
657  VLOG_QUERY << "GetInfo(): request=" << ThriftDebugString(request);
658 
659  TUniqueId session_id;
660  TUniqueId secret;
661  HS2_RETURN_IF_ERROR(return_val, THandleIdentifierToTUniqueId(
662  request.sessionHandle.sessionId, &session_id, &secret), SQLSTATE_GENERAL_ERROR);
663  ScopedSessionState session_handle(this);
664  shared_ptr<SessionState> session;
665  HS2_RETURN_IF_ERROR(return_val, session_handle.WithSession(session_id, &session),
666  SQLSTATE_GENERAL_ERROR);
667 
668  switch (request.infoType) {
669  case TGetInfoType::CLI_SERVER_NAME:
670  case TGetInfoType::CLI_DBMS_NAME:
671  return_val.infoValue.__set_stringValue("Impala");
672  break;
673  case TGetInfoType::CLI_DBMS_VER:
674  return_val.infoValue.__set_stringValue(IMPALA_BUILD_VERSION);
675  break;
676  default:
677  HS2_RETURN_ERROR(return_val, "Unsupported operation",
678  SQLSTATE_OPTIONAL_FEATURE_NOT_IMPLEMENTED);
679  }
680  return_val.status.__set_statusCode(thrift::TStatusCode::SUCCESS_STATUS);
681 }
682 
683 void ImpalaServer::ExecuteStatement(TExecuteStatementResp& return_val,
684  const TExecuteStatementReq& request) {
685  VLOG_QUERY << "ExecuteStatement(): request=" << ThriftDebugString(request);
686  // We ignore the runAsync flag here: Impala's queries will always run asynchronously,
687  // and will block on fetch. To the client, this looks like Hive's synchronous mode; the
688  // difference is that rows are not available when ExecuteStatement() returns.
689  TQueryCtx query_ctx;
690  Status status = TExecuteStatementReqToTQueryContext(request, &query_ctx);
691  HS2_RETURN_IF_ERROR(return_val, status, SQLSTATE_GENERAL_ERROR);
692 
693  TUniqueId session_id;
694  TUniqueId secret;
695  HS2_RETURN_IF_ERROR(return_val, THandleIdentifierToTUniqueId(
696  request.sessionHandle.sessionId, &session_id, &secret), SQLSTATE_GENERAL_ERROR);
697  ScopedSessionState session_handle(this);
698  shared_ptr<SessionState> session;
699  HS2_RETURN_IF_ERROR(return_val, session_handle.WithSession(session_id, &session),
700  SQLSTATE_GENERAL_ERROR);
701  if (session == NULL) {
703  return_val, Status("Invalid session ID"), SQLSTATE_GENERAL_ERROR);
704  }
705 
706  // Optionally enable result caching to allow restarting fetches.
707  int64_t cache_num_rows = -1;
708  if (request.__isset.confOverlay) {
709  map<string, string>::const_iterator iter =
710  request.confOverlay.find(IMPALA_RESULT_CACHING_OPT);
711  if (iter != request.confOverlay.end()) {
712  StringParser::ParseResult parse_result;
713  cache_num_rows = StringParser::StringToInt<int64_t>(
714  iter->second.c_str(), iter->second.size(), &parse_result);
715  if (parse_result != StringParser::PARSE_SUCCESS) {
717  return_val, Status(Substitute("Invalid value '$0' for '$1' option.",
718  iter->second, IMPALA_RESULT_CACHING_OPT)), SQLSTATE_GENERAL_ERROR);
719  }
720  }
721  }
722 
723  shared_ptr<QueryExecState> exec_state;
724  status = Execute(&query_ctx, session, &exec_state);
725  HS2_RETURN_IF_ERROR(return_val, status, SQLSTATE_GENERAL_ERROR);
726 
727  // Optionally enable result caching on the QueryExecState.
728  if (cache_num_rows > 0) {
729  status = exec_state->SetResultCache(CreateHS2ResultSet(session->hs2_version,
730  *exec_state->result_metadata()), cache_num_rows);
731  if (!status.ok()) {
732  UnregisterQuery(exec_state->query_id(), false, &status);
733  HS2_RETURN_ERROR(return_val, status.GetDetail(), SQLSTATE_GENERAL_ERROR);
734  }
735  }
736  exec_state->UpdateQueryState(QueryState::RUNNING);
737  // Start thread to wait for results to become available.
738  exec_state->WaitAsync();
739  // Once the query is running do a final check for session closure and add it to the
740  // set of in-flight queries.
741  status = SetQueryInflight(session, exec_state);
742  if (!status.ok()) {
743  UnregisterQuery(exec_state->query_id(), false, &status);
744  HS2_RETURN_ERROR(return_val, status.GetDetail(), SQLSTATE_GENERAL_ERROR);
745  }
746  return_val.__isset.operationHandle = true;
747  return_val.operationHandle.__set_operationType(TOperationType::EXECUTE_STATEMENT);
748  return_val.operationHandle.__set_hasResultSet(exec_state->returns_result_set());
749  // TODO: create secret for operationId and store the secret in exec_state
750  TUniqueIdToTHandleIdentifier(exec_state->query_id(), exec_state->query_id(),
751  &return_val.operationHandle.operationId);
752  return_val.status.__set_statusCode(
753  apache::hive::service::cli::thrift::TStatusCode::SUCCESS_STATUS);
754 
755  VLOG_QUERY << "ExecuteStatement(): return_val=" << ThriftDebugString(return_val);
756 }
757 
758 void ImpalaServer::GetTypeInfo(TGetTypeInfoResp& return_val,
759  const TGetTypeInfoReq& request) {
760  VLOG_QUERY << "GetTypeInfo(): request=" << ThriftDebugString(request);
761 
762  TMetadataOpRequest req;
763  req.__set_opcode(TMetadataOpcode::GET_TYPE_INFO);
764  req.__set_get_type_info_req(request);
765 
766  TOperationHandle handle;
767  thrift::TStatus status;
768  ExecuteMetadataOp(request.sessionHandle.sessionId, &req, &handle, &status);
769  handle.__set_operationType(TOperationType::GET_TYPE_INFO);
770  return_val.__set_operationHandle(handle);
771  return_val.__set_status(status);
772 
773  VLOG_QUERY << "GetTypeInfo(): return_val=" << ThriftDebugString(return_val);
774 }
775 
776 void ImpalaServer::GetCatalogs(TGetCatalogsResp& return_val,
777  const TGetCatalogsReq& request) {
778  VLOG_QUERY << "GetCatalogs(): request=" << ThriftDebugString(request);
779 
780  TMetadataOpRequest req;
781  req.__set_opcode(TMetadataOpcode::GET_CATALOGS);
782  req.__set_get_catalogs_req(request);
783 
784  TOperationHandle handle;
785  thrift::TStatus status;
786  ExecuteMetadataOp(request.sessionHandle.sessionId, &req, &handle, &status);
787  handle.__set_operationType(TOperationType::GET_CATALOGS);
788  return_val.__set_operationHandle(handle);
789  return_val.__set_status(status);
790 
791  VLOG_QUERY << "GetCatalogs(): return_val=" << ThriftDebugString(return_val);
792 }
793 
794 void ImpalaServer::GetSchemas(TGetSchemasResp& return_val,
795  const TGetSchemasReq& request) {
796  VLOG_QUERY << "GetSchemas(): request=" << ThriftDebugString(request);
797 
798  TMetadataOpRequest req;
799  req.__set_opcode(TMetadataOpcode::GET_SCHEMAS);
800  req.__set_get_schemas_req(request);
801 
802  TOperationHandle handle;
803  thrift::TStatus status;
804  ExecuteMetadataOp(request.sessionHandle.sessionId, &req, &handle, &status);
805  handle.__set_operationType(TOperationType::GET_SCHEMAS);
806  return_val.__set_operationHandle(handle);
807  return_val.__set_status(status);
808 
809  VLOG_QUERY << "GetSchemas(): return_val=" << ThriftDebugString(return_val);
810 }
811 
812 void ImpalaServer::GetTables(TGetTablesResp& return_val,
813  const TGetTablesReq& request) {
814  VLOG_QUERY << "GetTables(): request=" << ThriftDebugString(request);
815 
816  TMetadataOpRequest req;
817  req.__set_opcode(TMetadataOpcode::GET_TABLES);
818  req.__set_get_tables_req(request);
819 
820  TOperationHandle handle;
821  thrift::TStatus status;
822  ExecuteMetadataOp(request.sessionHandle.sessionId, &req, &handle, &status);
823  handle.__set_operationType(TOperationType::GET_TABLES);
824  return_val.__set_operationHandle(handle);
825  return_val.__set_status(status);
826 
827  VLOG_QUERY << "GetTables(): return_val=" << ThriftDebugString(return_val);
828 }
829 
830 void ImpalaServer::GetTableTypes(TGetTableTypesResp& return_val,
831  const TGetTableTypesReq& request) {
832  VLOG_QUERY << "GetTableTypes(): request=" << ThriftDebugString(request);
833 
834  TMetadataOpRequest req;
835  req.__set_opcode(TMetadataOpcode::GET_TABLE_TYPES);
836  req.__set_get_table_types_req(request);
837 
838  TOperationHandle handle;
839  thrift::TStatus status;
840  ExecuteMetadataOp(request.sessionHandle.sessionId, &req, &handle, &status);
841  handle.__set_operationType(TOperationType::GET_TABLE_TYPES);
842  return_val.__set_operationHandle(handle);
843  return_val.__set_status(status);
844 
845  VLOG_QUERY << "GetTableTypes(): return_val=" << ThriftDebugString(return_val);
846 
847 }
848 
849 void ImpalaServer::GetColumns(TGetColumnsResp& return_val,
850  const TGetColumnsReq& request) {
851  VLOG_QUERY << "GetColumns(): request=" << ThriftDebugString(request);
852 
853  TMetadataOpRequest req;
854  req.__set_opcode(TMetadataOpcode::GET_COLUMNS);
855  req.__set_get_columns_req(request);
856 
857  TOperationHandle handle;
858  thrift::TStatus status;
859  ExecuteMetadataOp(request.sessionHandle.sessionId, &req, &handle, &status);
860  handle.__set_operationType(TOperationType::GET_COLUMNS);
861  return_val.__set_operationHandle(handle);
862  return_val.__set_status(status);
863 
864  VLOG_QUERY << "GetColumns(): return_val=" << ThriftDebugString(return_val);
865 }
866 
867 void ImpalaServer::GetFunctions(TGetFunctionsResp& return_val,
868  const TGetFunctionsReq& request) {
869  VLOG_QUERY << "GetFunctions(): request=" << ThriftDebugString(request);
870 
871  TMetadataOpRequest req;
872  req.__set_opcode(TMetadataOpcode::GET_FUNCTIONS);
873  req.__set_get_functions_req(request);
874 
875  TOperationHandle handle;
876  thrift::TStatus status;
877  ExecuteMetadataOp(request.sessionHandle.sessionId, &req, &handle, &status);
878  handle.__set_operationType(TOperationType::GET_FUNCTIONS);
879  return_val.__set_operationHandle(handle);
880  return_val.__set_status(status);
881 
882  VLOG_QUERY << "GetFunctions(): return_val=" << ThriftDebugString(return_val);
883 }
884 
885 void ImpalaServer::GetOperationStatus(TGetOperationStatusResp& return_val,
886  const TGetOperationStatusReq& request) {
887  if (request.operationHandle.operationId.guid.size() == 0) {
888  // An empty operation handle identifier means no execution and no result for this
889  // query (USE <database>).
890  VLOG_ROW << "GetOperationStatus(): guid size 0";
891  return_val.operationState = TOperationState::FINISHED_STATE;
892  return_val.status.__set_statusCode(thrift::TStatusCode::SUCCESS_STATUS);
893  return;
894  }
895 
896  // TODO: check secret
897  TUniqueId query_id;
898  TUniqueId secret;
899  HS2_RETURN_IF_ERROR(return_val, THandleIdentifierToTUniqueId(
900  request.operationHandle.operationId, &query_id, &secret), SQLSTATE_GENERAL_ERROR);
901  VLOG_ROW << "GetOperationStatus(): query_id=" << PrintId(query_id);
902 
903  lock_guard<mutex> l(query_exec_state_map_lock_);
904  QueryExecStateMap::iterator entry = query_exec_state_map_.find(query_id);
905  if (entry != query_exec_state_map_.end()) {
906  QueryState::type query_state = entry->second->query_state();
907  TOperationState::type operation_state = QueryStateToTOperationState(query_state);
908  return_val.__set_operationState(operation_state);
909  return;
910  }
911 
912  // No handle was found
913  HS2_RETURN_ERROR(return_val, "Invalid query handle", SQLSTATE_GENERAL_ERROR);
914 }
915 
916 void ImpalaServer::CancelOperation(TCancelOperationResp& return_val,
917  const TCancelOperationReq& request) {
918  TUniqueId query_id;
919  TUniqueId secret;
920  HS2_RETURN_IF_ERROR(return_val, THandleIdentifierToTUniqueId(
921  request.operationHandle.operationId, &query_id, &secret), SQLSTATE_GENERAL_ERROR);
922  VLOG_QUERY << "CancelOperation(): query_id=" << PrintId(query_id);
923 
924  shared_ptr<QueryExecState> exec_state = GetQueryExecState(query_id, false);
925  if (exec_state.get() == NULL) {
926  // No handle was found
927  HS2_RETURN_ERROR(return_val, "Invalid query handle", SQLSTATE_GENERAL_ERROR);
928  }
929  ScopedSessionState session_handle(this);
930  const TUniqueId session_id = exec_state->session_id();
931  HS2_RETURN_IF_ERROR(return_val, session_handle.WithSession(session_id),
932  SQLSTATE_GENERAL_ERROR);
933  HS2_RETURN_IF_ERROR(return_val, CancelInternal(query_id, true), SQLSTATE_GENERAL_ERROR);
934  return_val.status.__set_statusCode(thrift::TStatusCode::SUCCESS_STATUS);
935 }
936 
937 void ImpalaServer::CloseOperation(TCloseOperationResp& return_val,
938  const TCloseOperationReq& request) {
939  TUniqueId query_id;
940  TUniqueId secret;
941  HS2_RETURN_IF_ERROR(return_val, THandleIdentifierToTUniqueId(
942  request.operationHandle.operationId, &query_id, &secret), SQLSTATE_GENERAL_ERROR);
943  VLOG_QUERY << "CloseOperation(): query_id=" << PrintId(query_id);
944 
945  shared_ptr<QueryExecState> exec_state = GetQueryExecState(query_id, false);
946  if (exec_state.get() == NULL) {
947  // No handle was found
948  HS2_RETURN_ERROR(return_val, "Invalid query handle", SQLSTATE_GENERAL_ERROR);
949  }
950  ScopedSessionState session_handle(this);
951  const TUniqueId session_id = exec_state->session_id();
952  HS2_RETURN_IF_ERROR(return_val, session_handle.WithSession(session_id),
953  SQLSTATE_GENERAL_ERROR);
954  // TODO: use timeout to get rid of unwanted exec_state.
955  HS2_RETURN_IF_ERROR(return_val, UnregisterQuery(query_id, true),
956  SQLSTATE_GENERAL_ERROR);
957  return_val.status.__set_statusCode(thrift::TStatusCode::SUCCESS_STATUS);
958 }
959 
960 void ImpalaServer::GetResultSetMetadata(TGetResultSetMetadataResp& return_val,
961  const TGetResultSetMetadataReq& request) {
962  // Convert Operation id to TUniqueId and get the query exec state.
963  // TODO: check secret
964  TUniqueId query_id;
965  TUniqueId secret;
966  HS2_RETURN_IF_ERROR(return_val, THandleIdentifierToTUniqueId(
967  request.operationHandle.operationId, &query_id, &secret), SQLSTATE_GENERAL_ERROR);
968  VLOG_QUERY << "GetResultSetMetadata(): query_id=" << PrintId(query_id);
969 
970  // Look up the session ID (which takes session_state_map_lock_) before taking the query
971  // exec state lock.
972  TUniqueId session_id;
973  if (!GetSessionIdForQuery(query_id, &session_id)) {
974  HS2_RETURN_ERROR(return_val, "Invalid query handle", SQLSTATE_GENERAL_ERROR);
975  }
976  ScopedSessionState session_handle(this);
977  HS2_RETURN_IF_ERROR(return_val, session_handle.WithSession(session_id),
978  SQLSTATE_GENERAL_ERROR);
979 
980  shared_ptr<QueryExecState> exec_state = GetQueryExecState(query_id, true);
981  if (exec_state.get() == NULL) {
982  VLOG_QUERY << "GetResultSetMetadata(): invalid query handle";
983  // No handle was found
984  HS2_RETURN_ERROR(return_val, "Invalid query handle", SQLSTATE_GENERAL_ERROR);
985  }
986  {
987  // make sure we release the lock on exec_state if we see any error
988  lock_guard<mutex> l(*exec_state->lock(), adopt_lock_t());
989 
990  // Convert TResultSetMetadata to TGetResultSetMetadataResp
991  const TResultSetMetadata* result_set_md = exec_state->result_metadata();
992  DCHECK(result_set_md != NULL);
993  if (result_set_md->columns.size() > 0) {
994  return_val.__isset.schema = true;
995  return_val.schema.columns.resize(result_set_md->columns.size());
996  for (int i = 0; i < result_set_md->columns.size(); ++i) {
997  return_val.schema.columns[i].__set_columnName(
998  result_set_md->columns[i].columnName);
999  return_val.schema.columns[i].position = i;
1000  return_val.schema.columns[i].typeDesc.types.resize(1);
1001  ColumnType t(result_set_md->columns[i].columnType);
1002  return_val.schema.columns[i].typeDesc.types[0] = t.ToHs2Type();
1003  }
1004  }
1005  }
1006 
1007  return_val.status.__set_statusCode(thrift::TStatusCode::SUCCESS_STATUS);
1008  VLOG_QUERY << "GetResultSetMetadata(): return_val=" << ThriftDebugString(return_val);
1009 }
1010 
1011 void ImpalaServer::FetchResults(TFetchResultsResp& return_val,
1012  const TFetchResultsReq& request) {
1013  if (request.orientation != TFetchOrientation::FETCH_NEXT
1014  && request.orientation != TFetchOrientation::FETCH_FIRST) {
1015  HS2_RETURN_ERROR(return_val, "Unsupported operation",
1016  SQLSTATE_OPTIONAL_FEATURE_NOT_IMPLEMENTED);
1017  }
1018  bool fetch_first = request.orientation == TFetchOrientation::FETCH_FIRST;
1019 
1020  // Convert Operation id to TUniqueId and get the query exec state.
1021  // TODO: check secret
1022  TUniqueId query_id;
1023  TUniqueId secret;
1024  HS2_RETURN_IF_ERROR(return_val, THandleIdentifierToTUniqueId(
1025  request.operationHandle.operationId, &query_id, &secret), SQLSTATE_GENERAL_ERROR);
1026  VLOG_ROW << "FetchResults(): query_id=" << PrintId(query_id)
1027  << " fetch_size=" << request.maxRows;
1028 
1029  // FetchInternal takes care of extending the session
1030  Status status = FetchInternal(query_id, request.maxRows, fetch_first, &return_val);
1031  VLOG_ROW << "FetchResults(): #results=" << return_val.results.rows.size()
1032  << " has_more=" << (return_val.hasMoreRows ? "true" : "false");
1033  if (!status.ok()) {
1034  // Only unregister the query if the underlying error is unrecoverable.
1035  // Clients are expected to understand that a failed FETCH_FIRST is recoverable,
1036  // and hence, the query must eventually be closed by the client.
1037  // It is important to ensure FETCH_NEXT does not return recoverable errors to
1038  // preserve compatibility with clients written against Impala versions < 1.3.
1039  if (status.IsRecoverableError()) {
1040  DCHECK(fetch_first);
1041  } else {
1042  UnregisterQuery(query_id, false, &status);
1043  }
1044  HS2_RETURN_ERROR(return_val, status.GetDetail(), SQLSTATE_GENERAL_ERROR);
1045  }
1046  return_val.status.__set_statusCode(thrift::TStatusCode::SUCCESS_STATUS);
1047 }
1048 
1049 void ImpalaServer::GetLog(TGetLogResp& return_val, const TGetLogReq& request) {
1050  TUniqueId query_id;
1051  TUniqueId secret;
1052  HS2_RETURN_IF_ERROR(return_val, THandleIdentifierToTUniqueId(
1053  request.operationHandle.operationId, &query_id, &secret), SQLSTATE_GENERAL_ERROR);
1054 
1055  shared_ptr<QueryExecState> exec_state = GetQueryExecState(query_id, false);
1056  if (exec_state.get() == NULL) {
1057  // No handle was found
1058  HS2_RETURN_ERROR(return_val, "Invalid query handle", SQLSTATE_GENERAL_ERROR);
1059  }
1060 
1061  // GetLog doesn't have an associated session handle, so we presume that this request
1062  // should keep alive the same session that orignated the query.
1063  ScopedSessionState session_handle(this);
1064  const TUniqueId session_id = exec_state->session_id();
1065  HS2_RETURN_IF_ERROR(return_val, session_handle.WithSession(session_id),
1066  SQLSTATE_GENERAL_ERROR);
1067 
1068  stringstream ss;
1069  if (exec_state->coord() != NULL) {
1070  // Report progress
1071  ss << exec_state->coord()->progress().ToString() << "\n";
1072  }
1073  // Report analysis errors
1074  ss << join(exec_state->GetAnalysisWarnings(), "\n");
1075  if (exec_state->coord() != NULL) {
1076  // Report execution errors
1077  ss << exec_state->coord()->GetErrorLog();
1078  }
1079  return_val.log = ss.str();
1080  return_val.status.__set_statusCode(thrift::TStatusCode::SUCCESS_STATUS);
1081 }
1082 
1083 void ImpalaServer::GetExecSummary(TGetExecSummaryResp& return_val,
1084  const TGetExecSummaryReq& request) {
1085  TUniqueId session_id;
1086  TUniqueId secret;
1087  HS2_RETURN_IF_ERROR(return_val, THandleIdentifierToTUniqueId(
1088  request.sessionHandle.sessionId, &session_id, &secret), SQLSTATE_GENERAL_ERROR);
1089  ScopedSessionState session_handle(this);
1090  shared_ptr<SessionState> session;
1091  HS2_RETURN_IF_ERROR(return_val, session_handle.WithSession(session_id, &session),
1092  SQLSTATE_GENERAL_ERROR);
1093 
1094  TUniqueId query_id;
1095  HS2_RETURN_IF_ERROR(return_val, THandleIdentifierToTUniqueId(
1096  request.operationHandle.operationId, &query_id, &secret), SQLSTATE_GENERAL_ERROR);
1097 
1098  TExecSummary summary;
1099  Status status = GetExecSummary(query_id, &summary);
1100  HS2_RETURN_IF_ERROR(return_val, status, SQLSTATE_GENERAL_ERROR);
1101  return_val.__set_summary(summary);
1102  return_val.status.__set_statusCode(thrift::TStatusCode::SUCCESS_STATUS);
1103 }
1104 
1105 void ImpalaServer::GetRuntimeProfile(TGetRuntimeProfileResp& return_val,
1106  const TGetRuntimeProfileReq& request) {
1107  TUniqueId session_id;
1108  TUniqueId secret;
1109  HS2_RETURN_IF_ERROR(return_val, THandleIdentifierToTUniqueId(
1110  request.sessionHandle.sessionId, &session_id, &secret), SQLSTATE_GENERAL_ERROR);
1111  ScopedSessionState session_handle(this);
1112  shared_ptr<SessionState> session;
1113  HS2_RETURN_IF_ERROR(return_val, session_handle.WithSession(session_id, &session),
1114  SQLSTATE_GENERAL_ERROR);
1115 
1116  TUniqueId query_id;
1117  HS2_RETURN_IF_ERROR(return_val, THandleIdentifierToTUniqueId(
1118  request.operationHandle.operationId, &query_id, &secret), SQLSTATE_GENERAL_ERROR);
1119 
1120  stringstream ss;
1121  HS2_RETURN_IF_ERROR(return_val, GetRuntimeProfileStr(query_id, false, &ss),
1122  SQLSTATE_GENERAL_ERROR);
1123  return_val.__set_profile(ss.str());
1124  return_val.status.__set_statusCode(thrift::TStatusCode::SUCCESS_STATUS);
1125 }
1126 
1127 void ImpalaServer::GetDelegationToken(TGetDelegationTokenResp& return_val,
1128  const TGetDelegationTokenReq& req) {
1129  return_val.status.__set_statusCode(thrift::TStatusCode::ERROR_STATUS);
1130  return_val.status.__set_errorMessage("Not implemented");
1131 }
1132 
1133 void ImpalaServer::CancelDelegationToken(TCancelDelegationTokenResp& return_val,
1134  const TCancelDelegationTokenReq& req) {
1135  return_val.status.__set_statusCode(thrift::TStatusCode::ERROR_STATUS);
1136  return_val.status.__set_errorMessage("Not implemented");
1137 }
1138 
1139 void ImpalaServer::RenewDelegationToken(TRenewDelegationTokenResp& return_val,
1140  const TRenewDelegationTokenReq& req) {
1141  return_val.status.__set_statusCode(thrift::TStatusCode::ERROR_STATUS);
1142  return_val.status.__set_errorMessage("Not implemented");
1143 }
1144 
1145 TOperationState::type QueryStateToTOperationState(const QueryState::type& query_state) {
1146  switch (query_state) {
1147  case QueryState::CREATED: return TOperationState::INITIALIZED_STATE;
1148  case QueryState::RUNNING: return TOperationState::RUNNING_STATE;
1149  case QueryState::FINISHED: return TOperationState::FINISHED_STATE;
1150  case QueryState::EXCEPTION: return TOperationState::ERROR_STATE;
1151  default: return TOperationState::UKNOWN_STATE;
1152  }
1153 }
1154 
1155 }
void TColumnValueToHS2TColumnValue(const TColumnValue &col_val, const TColumnType &type, apache::hive::service::cli::thrift::TColumnValue *hs2_col_val)
For V1->V5.
virtual size_t size()
Returns the size of this result set in number of rows.
void StitchNulls(uint32_t num_rows_before, uint32_t num_rows_added, uint32_t start_idx, const std::string &from, std::string *to)
TOperationState::type QueryStateToTOperationState(const QueryState::type &query_state)
void TQueryOptionsToMap(const TQueryOptions &query_options, std::map< std::string, std::string > *configuration)
Converts a TQueryOptions struct into a map of key, value pairs.
const std::string GetDetail() const
Definition: status.cc:184
const TProtocolVersion::type MAX_SUPPORTED_HS2_VERSION
bool ParseId(const string &s, TUniqueId *id)
Definition: debug-util.cc:112
static uint32_t TColumnByteSize(const thrift::TColumn &col, uint32_t start_idx, uint32_t end_idx)
const TUniqueId & query_id() const
Definition: coordinator.h:152
#define RETURN_IF_ERROR(stmt)
some generally useful macros
Definition: status.h:242
std::string Username
Username.
Definition: thrift-server.h:42
PrimitiveType ThriftToType(TPrimitiveType::type ttype)
Definition: types.cc:27
virtual Status AddOneRow(const TResultRow &row)
static boost::uuids::random_generator uuid_generator_
HS2RowOrientedResultSet(const TResultSetMetadata &metadata, TRowSet *rowset=NULL)
string PrintId(const TUniqueId &id, const string &separator)
Definition: debug-util.cc:97
Frontend * frontend()
Definition: exec-env.h:91
HS2ColumnarResultSet(const TResultSetMetadata &metadata, TRowSet *rowset=NULL)
virtual Status AddOneRow(const vector< void * > &col_values, const vector< int > &scales)
string TypeToString(PrimitiveType t)
Definition: types.cc:73
virtual Status AddOneRow(const vector< void * > &col_values, const vector< int > &scales)
int64_t UnixMillis()
Definition: time.h:51
void CloseOperation(apache::hive::service::cli::thrift::TFetchResultsReq &fetchResultsReq, TCLIServiceClient &impalaClient)
Method to close a fetch operation handle.
Definition: simba.cc:29
virtual size_t size()
Returns the size of this result set in number of rows.
#define VLOG_QUERY
Definition: logging.h:57
const string IMPALA_RESULT_CACHING_OPT
static int64_t ByteSize(const thrift::TRow &row)
Status WithSession(const TUniqueId &session_id, boost::shared_ptr< SessionState > *session=NULL)
static int64_t ByteSize(const thrift::TColumnValue &val)
virtual int AddRows(const QueryResultSet *other, int start_idx, int num_rows)
void TColumnValueToHS2TColumn(const TColumnValue &col_val, const TColumnType &type, uint32_t row_idx, apache::hive::service::cli::thrift::TColumn *column)
For V6->
apache::hive::service::cli::thrift::TTypeEntry ToHs2Type() const
Definition: types.cc:120
#define VLOG_ROW
Definition: logging.h:59
virtual Status AddOneRow(const TResultRow &row)
virtual int64_t ByteSize(int start_idx, int num_rows)
Returns the approximate size of the given range of rows in bytes.
void UUIDToTUniqueId(const boost::uuids::uuid &uuid, T *unique_id)
Definition: uid-util.h:38
void ExprValueToHS2TColumn(const void *value, const TColumnType &type, uint32_t row_idx, apache::hive::service::cli::thrift::TColumn *column)
For V6->
#define HS2_RETURN_ERROR(return_val, error_msg, error_state)
ExecEnv * exec_env_
Definition: coordinator.h:193
#define IMPALA_BUILD_VERSION
Definition: version.h:21
void CancelInternal()
Runs cancel logic. Assumes that lock_ is held.
void SetQueryOption(TImpalaQueryOptions::type opt, const T &opt_val, TExecuteStatementReq *exec_stmt_req)
Definition: child-query.cc:102
bool ok() const
Definition: status.h:172
virtual int64_t ByteSize(int start_idx, int num_rows)
Returns the approximate size of the given range of rows in bytes.
virtual int AddRows(const QueryResultSet *other, int start_idx, int num_rows)
#define HS2_RETURN_IF_ERROR(return_val, status, error_state)
void ExprValueToHS2TColumnValue(const void *value, const TColumnType &type, apache::hive::service::cli::thrift::TColumnValue *hs2_col_val)
For V1->V5.
bool IsRecoverableError() const
Definition: status.h:183