Impala
Impala is the open source, native analytic database for Apache Hadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
impala-beeswax-server.cc
Go to the documentation of this file.
1 // Copyright 2012 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "service/impala-server.h"
16 
17 #include <algorithm>
18 #include <boost/algorithm/string/join.hpp>
19 #include <boost/date_time/posix_time/posix_time_types.hpp>
20 #include <boost/unordered_set.hpp>
21 #include <jni.h>
22 #include <thrift/protocol/TDebugProtocol.h>
23 #include <gtest/gtest.h>
24 #include <boost/foreach.hpp>
25 #include <boost/bind.hpp>
26 #include <boost/algorithm/string.hpp>
27 #include <google/heap-profiler.h>
28 #include <google/malloc_extension.h>
29 
30 #include "codegen/llvm-codegen.h"
31 #include "common/logging.h"
32 #include "common/version.h"
33 #include "exec/exec-node.h"
34 #include "exec/hdfs-table-sink.h"
35 #include "exec/scan-node.h"
36 #include "exprs/expr.h"
38 #include "runtime/client-cache.h"
39 #include "runtime/descriptors.h"
41 #include "runtime/row-batch.h"
43 #include "runtime/hdfs-fs-cache.h"
44 #include "runtime/exec-env.h"
45 #include "runtime/raw-value.h"
48 #include "service/query-options.h"
50 #include "util/container-util.h"
51 #include "util/debug-util.h"
52 #include "util/impalad-metrics.h"
53 #include "util/string-parser.h"
54 #include "rpc/thrift-util.h"
55 #include "rpc/thrift-server.h"
56 #include "util/jni-util.h"
57 #include "util/webserver.h"
58 #include "gen-cpp/Types_types.h"
59 #include "gen-cpp/ImpalaService.h"
60 #include "gen-cpp/DataSinks_types.h"
61 #include "gen-cpp/Types_types.h"
62 #include "gen-cpp/ImpalaService.h"
63 #include "gen-cpp/ImpalaService_types.h"
64 #include "gen-cpp/ImpalaInternalService.h"
65 #include "gen-cpp/Frontend_types.h"
66 
67 #include "common/names.h"
68 
69 using boost::adopt_lock_t;
70 using boost::algorithm::join;
71 using namespace apache::thrift;
72 using namespace apache::hive::service::cli::thrift;
73 using namespace beeswax;
74 
// Evaluates 'stmt' (an expression yielding a Status) and, if the result is not OK,
// raises a BeeswaxException carrying the status detail and the SQLSTATE 'ex_type'.
// The temporary uses an ordinary identifier: names containing a double underscore
// (such as the former '__status__') are reserved for the C++ implementation.
#define RAISE_IF_ERROR(stmt, ex_type) \
  do { \
    Status raise_if_error_status_ = (stmt); \
    if (UNLIKELY(!raise_if_error_status_.ok())) { \
      RaiseBeeswaxException(raise_if_error_status_.GetDetail(), ex_type); \
    } \
  } while (false)
82 
83 namespace impala {
84 
85 // Ascii result set for Beeswax.
86 // Beeswax returns rows in ascii, using "\t" as column delimiter.
88  public:
89  // Rows are added into rowset.
90  AsciiQueryResultSet(const TResultSetMetadata& metadata, vector<string>* rowset)
91  : metadata_(metadata), result_set_(rowset), owned_result_set_(NULL) {
92  }
93 
94  // Rows are added into a new rowset that is owned by this result set.
95  AsciiQueryResultSet(const TResultSetMetadata& metadata)
96  : metadata_(metadata), result_set_(new vector<string>()),
97  owned_result_set_(result_set_) {
98  }
99 
100  virtual ~AsciiQueryResultSet() { }
101 
102  // Convert expr values (col_values) to ASCII using "\t" as column delimiter and store
103  // it in this result set.
104  // TODO: Handle complex types.
105  virtual Status AddOneRow(const vector<void*>& col_values, const vector<int>& scales) {
106  int num_col = col_values.size();
107  DCHECK_EQ(num_col, metadata_.columns.size());
108  stringstream out_stream;
109  out_stream.precision(ASCII_PRECISION);
110  for (int i = 0; i < num_col; ++i) {
111  // ODBC-187 - ODBC can only take "\t" as the delimiter
112  out_stream << (i > 0 ? "\t" : "");
113  DCHECK_EQ(1, metadata_.columns[i].columnType.types.size());
114  RawValue::PrintValue(col_values[i],
115  ColumnType(metadata_.columns[i].columnType),
116  scales[i], &out_stream);
117  }
118  result_set_->push_back(out_stream.str());
119  return Status::OK;
120  }
121 
122  // Convert TResultRow to ASCII using "\t" as column delimiter and store it in this
123  // result set.
124  virtual Status AddOneRow(const TResultRow& row) {
125  int num_col = row.colVals.size();
126  DCHECK_EQ(num_col, metadata_.columns.size());
127  stringstream out_stream;
128  out_stream.precision(ASCII_PRECISION);
129  for (int i = 0; i < num_col; ++i) {
130  // ODBC-187 - ODBC can only take "\t" as the delimiter
131  out_stream << (i > 0 ? "\t" : "");
132  out_stream << row.colVals[i];
133  }
134  result_set_->push_back(out_stream.str());
135  return Status::OK;
136  }
137 
138  virtual int AddRows(const QueryResultSet* other, int start_idx, int num_rows) {
139  const AsciiQueryResultSet* o = static_cast<const AsciiQueryResultSet*>(other);
140  if (start_idx >= o->result_set_->size()) return 0;
141  const int rows_added =
142  min(static_cast<size_t>(num_rows), o->result_set_->size() - start_idx);
143  result_set_->insert(result_set_->end(), o->result_set_->begin() + start_idx,
144  o->result_set_->begin() + start_idx + rows_added);
145  return rows_added;
146  }
147 
148  virtual int64_t ByteSize(int start_idx, int num_rows) {
149  int64_t bytes = 0;
150  const int end = min(static_cast<size_t>(num_rows), result_set_->size() - start_idx);
151  for (int i = start_idx; i < start_idx + end; ++i) {
152  bytes += sizeof(result_set_[i]) + result_set_[i].capacity();
153  }
154  return bytes;
155  }
156 
157  virtual size_t size() { return result_set_->size(); }
158 
159  private:
160  // Metadata of the result set
161  const TResultSetMetadata& metadata_;
162 
163  // Points to the result set to be filled. The result set this points to may be owned by
164  // this object, in which case owned_result_set_ is set.
165  vector<string>* result_set_;
166 
167  // Set to result_set_ if result_set_ is owned.
168  scoped_ptr<vector<string> > owned_result_set_;
169 };
170 
171 void ImpalaServer::query(QueryHandle& query_handle, const Query& query) {
172  VLOG_QUERY << "query(): query=" << query.query;
173  ScopedSessionState session_handle(this);
174  shared_ptr<SessionState> session;
176  session_handle.WithSession(ThriftServer::GetThreadConnectionId(), &session),
177  SQLSTATE_GENERAL_ERROR);
178  TQueryCtx query_ctx;
179  // raise general error for request conversion error;
180  RAISE_IF_ERROR(QueryToTQueryContext(query, &query_ctx), SQLSTATE_GENERAL_ERROR);
181 
182  // raise Syntax error or access violation; it's likely to be syntax/analysis error
183  // TODO: that may not be true; fix this
184  shared_ptr<QueryExecState> exec_state;
185  RAISE_IF_ERROR(Execute(&query_ctx, session, &exec_state),
186  SQLSTATE_SYNTAX_ERROR_OR_ACCESS_VIOLATION);
187 
188  exec_state->UpdateQueryState(QueryState::RUNNING);
189  // start thread to wait for results to become available, which will allow
190  // us to advance query state to FINISHED or EXCEPTION
191  exec_state->WaitAsync();
192  // Once the query is running do a final check for session closure and add it to the
193  // set of in-flight queries.
194  Status status = SetQueryInflight(session, exec_state);
195  if (!status.ok()) {
196  UnregisterQuery(exec_state->query_id(), false, &status);
197  RaiseBeeswaxException(status.GetDetail(), SQLSTATE_GENERAL_ERROR);
198  }
199  TUniqueIdToQueryHandle(exec_state->query_id(), &query_handle);
200 }
201 
202 void ImpalaServer::executeAndWait(QueryHandle& query_handle, const Query& query,
203  const LogContextId& client_ctx) {
204  VLOG_QUERY << "executeAndWait(): query=" << query.query;
205  ScopedSessionState session_handle(this);
206  shared_ptr<SessionState> session;
208  session_handle.WithSession(ThriftServer::GetThreadConnectionId(), &session),
209  SQLSTATE_GENERAL_ERROR);
210  TQueryCtx query_ctx;
211  // raise general error for request conversion error;
212  RAISE_IF_ERROR(QueryToTQueryContext(query, &query_ctx), SQLSTATE_GENERAL_ERROR);
213 
214  shared_ptr<QueryExecState> exec_state;
215  DCHECK(session != NULL); // The session should exist.
216  {
217  // The session is created when the client connects. Depending on the underlying
218  // transport, the username may be known at that time. If the username hasn't been set
219  // yet, set it now.
220  lock_guard<mutex> l(session->lock);
221  if (session->connected_user.empty()) session->connected_user = query.hadoop_user;
222  }
223 
224  // raise Syntax error or access violation; it's likely to be syntax/analysis error
225  // TODO: that may not be true; fix this
226  RAISE_IF_ERROR(Execute(&query_ctx, session, &exec_state),
227  SQLSTATE_SYNTAX_ERROR_OR_ACCESS_VIOLATION);
228 
229  exec_state->UpdateQueryState(QueryState::RUNNING);
230  // Once the query is running do a final check for session closure and add it to the
231  // set of in-flight queries.
232  Status status = SetQueryInflight(session, exec_state);
233  if (!status.ok()) {
234  UnregisterQuery(exec_state->query_id(), false, &status);
235  RaiseBeeswaxException(status.GetDetail(), SQLSTATE_GENERAL_ERROR);
236  }
237  // block until results are ready
238  exec_state->Wait();
239  status = exec_state->query_status();
240  if (!status.ok()) {
241  UnregisterQuery(exec_state->query_id(), false, &status);
242  RaiseBeeswaxException(status.GetDetail(), SQLSTATE_GENERAL_ERROR);
243  }
244 
245  exec_state->UpdateQueryState(QueryState::FINISHED);
246  TUniqueIdToQueryHandle(exec_state->query_id(), &query_handle);
247 
248  // If the input log context id is an empty string, then create a new number and
249  // set it to _return. Otherwise, set _return with the input log context
250  query_handle.log_context = client_ctx.empty() ? query_handle.id : client_ctx;
251 }
252 
253 void ImpalaServer::explain(QueryExplanation& query_explanation, const Query& query) {
254  // Translate Beeswax Query to Impala's QueryRequest and then set the explain plan bool
255  // before shipping to FE
256  VLOG_QUERY << "explain(): query=" << query.query;
257  ScopedSessionState session_handle(this);
258  RAISE_IF_ERROR(session_handle.WithSession(ThriftServer::GetThreadConnectionId()),
259  SQLSTATE_GENERAL_ERROR);
260 
261  TQueryCtx query_ctx;
262  RAISE_IF_ERROR(QueryToTQueryContext(query, &query_ctx), SQLSTATE_GENERAL_ERROR);
263 
265  exec_env_->frontend()->GetExplainPlan(query_ctx, &query_explanation.textual),
266  SQLSTATE_SYNTAX_ERROR_OR_ACCESS_VIOLATION);
267  query_explanation.__isset.textual = true;
268  VLOG_QUERY << "explain():\nstmt=" << query_ctx.request.stmt
269  << "\nplan: " << query_explanation.textual;
270 }
271 
272 void ImpalaServer::fetch(Results& query_results, const QueryHandle& query_handle,
273  const bool start_over, const int32_t fetch_size) {
274  ScopedSessionState session_handle(this);
275  RAISE_IF_ERROR(session_handle.WithSession(ThriftServer::GetThreadConnectionId()),
276  SQLSTATE_GENERAL_ERROR);
277 
278  if (start_over) {
279  // We can't start over. Raise "Optional feature not implemented"
280  RaiseBeeswaxException(
281  "Does not support start over", SQLSTATE_OPTIONAL_FEATURE_NOT_IMPLEMENTED);
282  }
283 
284  TUniqueId query_id;
285  QueryHandleToTUniqueId(query_handle, &query_id);
286  VLOG_ROW << "fetch(): query_id=" << PrintId(query_id) << " fetch_size=" << fetch_size;
287 
288  Status status = FetchInternal(query_id, start_over, fetch_size, &query_results);
289  VLOG_ROW << "fetch result: #results=" << query_results.data.size()
290  << " has_more=" << (query_results.has_more ? "true" : "false");
291  if (!status.ok()) {
292  UnregisterQuery(query_id, false, &status);
293  RaiseBeeswaxException(status.GetDetail(), SQLSTATE_GENERAL_ERROR);
294  }
295 }
296 
297 // TODO: Handle complex types.
298 void ImpalaServer::get_results_metadata(ResultsMetadata& results_metadata,
299  const QueryHandle& handle) {
300  ScopedSessionState session_handle(this);
301  RAISE_IF_ERROR(session_handle.WithSession(ThriftServer::GetThreadConnectionId()),
302  SQLSTATE_GENERAL_ERROR);
303 
304  // Convert QueryHandle to TUniqueId and get the query exec state.
305  TUniqueId query_id;
306  QueryHandleToTUniqueId(handle, &query_id);
307  VLOG_QUERY << "get_results_metadata(): query_id=" << PrintId(query_id);
308  shared_ptr<QueryExecState> exec_state = GetQueryExecState(query_id, true);
309  if (exec_state.get() == NULL) {
310  RaiseBeeswaxException("Invalid query handle", SQLSTATE_GENERAL_ERROR);
311  }
312 
313  {
314  // make sure we release the lock on exec_state if we see any error
315  lock_guard<mutex> l(*exec_state->lock(), adopt_lock_t());
316 
317  // Convert TResultSetMetadata to Beeswax.ResultsMetadata
318  const TResultSetMetadata* result_set_md = exec_state->result_metadata();
319  results_metadata.__isset.schema = true;
320  results_metadata.schema.__isset.fieldSchemas = true;
321  results_metadata.schema.fieldSchemas.resize(result_set_md->columns.size());
322  for (int i = 0; i < results_metadata.schema.fieldSchemas.size(); ++i) {
323  const TColumnType& type = result_set_md->columns[i].columnType;
324  DCHECK_EQ(1, type.types.size());
325  DCHECK_EQ(TTypeNodeType::SCALAR, type.types[0].type);
326  DCHECK(type.types[0].__isset.scalar_type);
327  TPrimitiveType::type col_type = type.types[0].scalar_type.type;
328  results_metadata.schema.fieldSchemas[i].__set_type(
329  TypeToOdbcString(ThriftToType(col_type)));
330 
331  // Fill column name
332  results_metadata.schema.fieldSchemas[i].__set_name(
333  result_set_md->columns[i].columnName);
334  }
335  }
336 
337  // ODBC-187 - ODBC can only take "\t" as the delimiter and ignores whatever is set here.
338  results_metadata.__set_delim("\t");
339 
340  // results_metadata.table_dir and in_tablename are not applicable.
341 }
342 
343 void ImpalaServer::close(const QueryHandle& handle) {
344  ScopedSessionState session_handle(this);
345  RAISE_IF_ERROR(session_handle.WithSession(ThriftServer::GetThreadConnectionId()),
346  SQLSTATE_GENERAL_ERROR);
347  TUniqueId query_id;
348  QueryHandleToTUniqueId(handle, &query_id);
349  VLOG_QUERY << "close(): query_id=" << PrintId(query_id);
350  // TODO: do we need to raise an exception if the query state is EXCEPTION?
351  // TODO: use timeout to get rid of unwanted exec_state.
352  RAISE_IF_ERROR(UnregisterQuery(query_id, true), SQLSTATE_GENERAL_ERROR);
353 }
354 
355 QueryState::type ImpalaServer::get_state(const QueryHandle& handle) {
356  ScopedSessionState session_handle(this);
357  RAISE_IF_ERROR(session_handle.WithSession(ThriftServer::GetThreadConnectionId()),
358  SQLSTATE_GENERAL_ERROR);
359  TUniqueId query_id;
360  QueryHandleToTUniqueId(handle, &query_id);
361  VLOG_ROW << "get_state(): query_id=" << PrintId(query_id);
362 
363  lock_guard<mutex> l(query_exec_state_map_lock_);
364  QueryExecStateMap::iterator entry = query_exec_state_map_.find(query_id);
365  if (entry != query_exec_state_map_.end()) {
366  return entry->second->query_state();
367  } else {
368  VLOG_QUERY << "ImpalaServer::get_state invalid handle";
369  RaiseBeeswaxException("Invalid query handle", SQLSTATE_GENERAL_ERROR);
370  }
371  // dummy to keep compiler happy
372  return QueryState::FINISHED;
373 }
374 
375 void ImpalaServer::echo(string& echo_string, const string& input_string) {
376  ScopedSessionState session_handle(this);
377  RAISE_IF_ERROR(session_handle.WithSession(ThriftServer::GetThreadConnectionId()),
378  SQLSTATE_GENERAL_ERROR);
379  echo_string = input_string;
380 }
381 
382 void ImpalaServer::clean(const LogContextId& log_context) {
383 }
384 
385 void ImpalaServer::get_log(string& log, const LogContextId& context) {
386  ScopedSessionState session_handle(this);
387  RAISE_IF_ERROR(session_handle.WithSession(ThriftServer::GetThreadConnectionId()),
388  SQLSTATE_GENERAL_ERROR);
389  // LogContextId is the same as QueryHandle.id
390  QueryHandle handle;
391  handle.__set_id(context);
392  TUniqueId query_id;
393  QueryHandleToTUniqueId(handle, &query_id);
394 
395  shared_ptr<QueryExecState> exec_state = GetQueryExecState(query_id, false);
396  if (exec_state.get() == NULL) {
397  stringstream str;
398  str << "unknown query id: " << query_id;
399  LOG(ERROR) << str.str();
400  return;
401  }
402  stringstream error_log_ss;
403  // If the query status is !ok, include the status error message at the top of the log.
404  if (!exec_state->query_status().ok()) {
405  error_log_ss << exec_state->query_status().GetDetail() << "\n";
406  }
407 
408  // Add warnings from analysis
409  error_log_ss << join(exec_state->GetAnalysisWarnings(), "\n");
410 
411  // Add warnings from execution
412  if (exec_state->coord() != NULL) {
413  if (!exec_state->query_status().ok()) error_log_ss << "\n\n";
414  error_log_ss << exec_state->coord()->GetErrorLog();
415  }
416  log = error_log_ss.str();
417 }
418 
419 void ImpalaServer::get_default_configuration(vector<ConfigVariable> &configurations,
420  const bool include_hadoop) {
421  ScopedSessionState session_handle(this);
422  RAISE_IF_ERROR(session_handle.WithSession(ThriftServer::GetThreadConnectionId()),
423  SQLSTATE_GENERAL_ERROR);
424  configurations.insert(configurations.end(), default_configs_.begin(),
425  default_configs_.end());
426 }
427 
428 void ImpalaServer::dump_config(string& config) {
429  ScopedSessionState session_handle(this);
430  RAISE_IF_ERROR(session_handle.WithSession(ThriftServer::GetThreadConnectionId()),
431  SQLSTATE_GENERAL_ERROR);
432  config = "";
433 }
434 
435 void ImpalaServer::Cancel(impala::TStatus& tstatus,
436  const beeswax::QueryHandle& query_handle) {
437  ScopedSessionState session_handle(this);
438  RAISE_IF_ERROR(session_handle.WithSession(ThriftServer::GetThreadConnectionId()),
439  SQLSTATE_GENERAL_ERROR);
440  // Convert QueryHandle to TUniqueId and get the query exec state.
441  TUniqueId query_id;
442  QueryHandleToTUniqueId(query_handle, &query_id);
443  RAISE_IF_ERROR(CancelInternal(query_id, true), SQLSTATE_GENERAL_ERROR);
444  tstatus.status_code = TErrorCode::OK;
445 }
446 
447 void ImpalaServer::CloseInsert(TInsertResult& insert_result,
448  const QueryHandle& query_handle) {
449  ScopedSessionState session_handle(this);
450  RAISE_IF_ERROR(session_handle.WithSession(ThriftServer::GetThreadConnectionId()),
451  SQLSTATE_GENERAL_ERROR);
452  TUniqueId query_id;
453  QueryHandleToTUniqueId(query_handle, &query_id);
454  VLOG_QUERY << "CloseInsert(): query_id=" << PrintId(query_id);
455 
456  Status status = CloseInsertInternal(query_id, &insert_result);
457  if (!status.ok()) {
458  RaiseBeeswaxException(status.GetDetail(), SQLSTATE_GENERAL_ERROR);
459  }
460 }
461 
462 // Gets the runtime profile string for the given query handle and stores the result in
463 // the profile_output parameter. Raises a BeeswaxException if there are any errors
464 // getting the profile, such as no matching queries found.
465 void ImpalaServer::GetRuntimeProfile(string& profile_output, const QueryHandle& handle) {
466  ScopedSessionState session_handle(this);
467  RAISE_IF_ERROR(session_handle.WithSession(ThriftServer::GetThreadConnectionId()),
468  SQLSTATE_GENERAL_ERROR);
469 
470  TUniqueId query_id;
471  QueryHandleToTUniqueId(handle, &query_id);
472  VLOG_RPC << "GetRuntimeProfile(): query_id=" << PrintId(query_id);
473  stringstream ss;
474  Status status = GetRuntimeProfileStr(query_id, false, &ss);
475  if (!status.ok()) {
476  ss << "GetRuntimeProfile error: " << status.GetDetail();
477  RaiseBeeswaxException(ss.str(), SQLSTATE_GENERAL_ERROR);
478  }
479  profile_output = ss.str();
480 }
481 
482 void ImpalaServer::GetExecSummary(impala::TExecSummary& result,
483  const beeswax::QueryHandle& handle) {
484  ScopedSessionState session_handle(this);
485  RAISE_IF_ERROR(session_handle.WithSession(ThriftServer::GetThreadConnectionId()),
486  SQLSTATE_GENERAL_ERROR);
487  TUniqueId query_id;
488  QueryHandleToTUniqueId(handle, &query_id);
489  VLOG_RPC << "GetExecSummary(): query_id=" << PrintId(query_id);
490  Status status = GetExecSummary(query_id, &result);
491  if (!status.ok()) RaiseBeeswaxException(status.GetDetail(), SQLSTATE_GENERAL_ERROR);
492 }
493 
494 void ImpalaServer::PingImpalaService(TPingImpalaServiceResp& return_val) {
495  ScopedSessionState session_handle(this);
496  RAISE_IF_ERROR(session_handle.WithSession(ThriftServer::GetThreadConnectionId()),
497  SQLSTATE_GENERAL_ERROR);
498 
499  VLOG_RPC << "PingImpalaService()";
500  return_val.version = GetVersionString(true);
501  VLOG_RPC << "PingImpalaService(): return_val=" << ThriftDebugString(return_val);
502 }
503 
504 void ImpalaServer::ResetCatalog(impala::TStatus& status) {
505  Status::DEPRECATED_RPC.ToThrift(&status);
506 }
507 
508 void ImpalaServer::ResetTable(impala::TStatus& status, const TResetTableReq& request) {
509  Status::DEPRECATED_RPC.ToThrift(&status);
510 }
511 
512 Status ImpalaServer::QueryToTQueryContext(const Query& query,
513  TQueryCtx* query_ctx) {
514  query_ctx->request.stmt = query.query;
515  VLOG_QUERY << "query: " << ThriftDebugString(query);
516  {
517  shared_ptr<SessionState> session;
518  const TUniqueId& session_id = ThriftServer::GetThreadConnectionId();
519  RETURN_IF_ERROR(GetSessionState(session_id, &session));
520  DCHECK(session != NULL);
521  {
522  // The session is created when the client connects. Depending on the underlying
523  // transport, the username may be known at that time. If the username hasn't been
524  // set yet, set it now.
525  lock_guard<mutex> l(session->lock);
526  if (session->connected_user.empty()) session->connected_user = query.hadoop_user;
527  query_ctx->request.query_options = session->default_query_options;
528  }
529  session->ToThrift(session_id, &query_ctx->session);
530  }
531 
532  // Override default query options with Query.Configuration
533  if (query.__isset.configuration) {
534  BOOST_FOREACH(const string& option, query.configuration) {
535  RETURN_IF_ERROR(ParseQueryOptions(option, &query_ctx->request.query_options));
536  }
537  VLOG_QUERY << "TClientRequest.queryOptions: "
538  << ThriftDebugString(query_ctx->request.query_options);
539  }
540 
541  return Status::OK;
542 }
543 
544 inline void ImpalaServer::TUniqueIdToQueryHandle(const TUniqueId& query_id,
545  QueryHandle* handle) {
546  string query_id_str = PrintId(query_id);
547  handle->__set_id(query_id_str);
548  handle->__set_log_context(query_id_str);
549 }
550 
551 inline void ImpalaServer::QueryHandleToTUniqueId(const QueryHandle& handle,
552  TUniqueId* query_id) {
553  ParseId(handle.id, query_id);
554 }
555 
556 void ImpalaServer::RaiseBeeswaxException(const string& msg, const char* sql_state) {
557  BeeswaxException exc;
558  exc.__set_message(msg);
559  exc.__set_SQLState(sql_state);
560  throw exc;
561 }
562 
563 Status ImpalaServer::FetchInternal(const TUniqueId& query_id,
564  const bool start_over, const int32_t fetch_size, beeswax::Results* query_results) {
565  shared_ptr<QueryExecState> exec_state = GetQueryExecState(query_id, false);
566  if (exec_state == NULL) return Status("Invalid query handle");
567 
568  // Make sure QueryExecState::Wait() has completed before fetching rows. Wait() ensures
569  // that rows are ready to be fetched (e.g., Wait() opens QueryExecState::output_exprs_,
570  // which are evaluated in QueryExecState::FetchRows() below).
571  exec_state->BlockOnWait();
572 
573  lock_guard<mutex> frl(*exec_state->fetch_rows_lock());
574  lock_guard<mutex> l(*exec_state->lock());
575 
576  if (exec_state->num_rows_fetched() == 0) {
577  exec_state->query_events()->MarkEvent("First row fetched");
578  exec_state->set_fetched_rows();
579  }
580 
581  // Check for cancellation or an error.
582  RETURN_IF_ERROR(exec_state->query_status());
583 
584  // ODBC-190: set Beeswax's Results.columns to work around bug ODBC-190;
585  // TODO: remove the block of code when ODBC-190 is resolved.
586  const TResultSetMetadata* result_metadata = exec_state->result_metadata();
587  query_results->columns.resize(result_metadata->columns.size());
588  for (int i = 0; i < result_metadata->columns.size(); ++i) {
589  // TODO: As of today, the ODBC driver does not support boolean and timestamp data
590  // type but it should. This is tracked by ODBC-189. We should verify that our
591  // boolean and timestamp type are correctly recognized when ODBC-189 is closed.
592  // TODO: Handle complex types.
593  const TColumnType& type = result_metadata->columns[i].columnType;
594  DCHECK_EQ(1, type.types.size());
595  DCHECK_EQ(TTypeNodeType::SCALAR, type.types[0].type);
596  DCHECK(type.types[0].__isset.scalar_type);
597  TPrimitiveType::type col_type = type.types[0].scalar_type.type;
598  query_results->columns[i] = TypeToOdbcString(ThriftToType(col_type));
599  }
600  query_results->__isset.columns = true;
601 
602  // Results are always ready because we're blocking.
603  query_results->__set_ready(true);
604  // It's likely that ODBC doesn't care about start_row, but Hue needs it. For Hue,
605  // start_row starts from zero, not one.
606  query_results->__set_start_row(exec_state->num_rows_fetched());
607 
608  Status fetch_rows_status;
609  query_results->data.clear();
610  if (!exec_state->eos()) {
611  AsciiQueryResultSet result_set(*(exec_state->result_metadata()),
612  &(query_results->data));
613  fetch_rows_status = exec_state->FetchRows(fetch_size, &result_set);
614  }
615  query_results->__set_has_more(!exec_state->eos());
616  query_results->__isset.data = true;
617 
618  return fetch_rows_status;
619 }
620 
621 Status ImpalaServer::CloseInsertInternal(const TUniqueId& query_id,
622  TInsertResult* insert_result) {
623  shared_ptr<QueryExecState> exec_state = GetQueryExecState(query_id, true);
624  if (exec_state == NULL) return Status("Invalid query handle");
625  Status query_status;
626  {
627  lock_guard<mutex> l(*exec_state->lock(), adopt_lock_t());
628  query_status = exec_state->query_status();
629  if (query_status.ok()) {
630  // Coord may be NULL for a SELECT with LIMIT 0.
631  // Note that when IMPALA-87 is fixed (INSERT without FROM clause) we might
632  // need to revisit this, since that might lead us to insert a row without a
633  // coordinator, depending on how we choose to drive the table sink.
634  if (exec_state->coord() != NULL) {
635  BOOST_FOREACH(const PartitionStatusMap::value_type& v,
636  exec_state->coord()->per_partition_status()) {
637  const pair<string, TInsertPartitionStatus> partition_status = v;
638  insert_result->rows_appended[partition_status.first] =
639  partition_status.second.num_appended_rows;
640  }
641  }
642  }
643  }
644  RETURN_IF_ERROR(UnregisterQuery(query_id, true));
645  return query_status;
646 }
647 
648 }
const std::string GetDetail() const
Definition: status.cc:184
bool ParseId(const string &s, TUniqueId *id)
Definition: debug-util.cc:112
const TUniqueId & query_id() const
Definition: coordinator.h:152
string GetVersionString(bool compact)
Returns "<program short name> version <GetBuildVersion(compact)>".
Definition: debug-util.cc:239
#define RETURN_IF_ERROR(stmt)
some generally useful macros
Definition: status.h:242
PrimitiveType ThriftToType(TPrimitiveType::type ttype)
Definition: types.cc:27
string PrintId(const TUniqueId &id, const string &separator)
Definition: debug-util.cc:97
string TypeToOdbcString(PrimitiveType t)
Definition: types.cc:96
Frontend * frontend()
Definition: exec-env.h:91
void Cancel(const Status *cause=NULL)
Status GetExplainPlan(const TQueryCtx &query_ctx, std::string *explain_string)
Call FE to get explain plan.
Definition: frontend.cc:178
#define VLOG_QUERY
Definition: logging.h:57
Status WithSession(const TUniqueId &session_id, boost::shared_ptr< SessionState > *session=NULL)
virtual size_t size()
Returns the size of this result set in number of rows.
AsciiQueryResultSet(const TResultSetMetadata &metadata)
Status ParseQueryOptions(const std::string &options, TQueryOptions *query_options)
#define VLOG_ROW
Definition: logging.h:59
virtual int64_t ByteSize(int start_idx, int num_rows)
Returns the approximate size of the given range of rows in bytes.
virtual Status AddOneRow(const TResultRow &row)
AsciiQueryResultSet(const TResultSetMetadata &metadata, vector< string > *rowset)
#define VLOG_RPC
Definition: logging.h:56
ExecEnv * exec_env_
Definition: coordinator.h:193
scoped_ptr< vector< string > > owned_result_set_
virtual int AddRows(const QueryResultSet *other, int start_idx, int num_rows)
void CancelInternal()
Runs cancel logic. Assumes that lock_ is held.
bool ok() const
Definition: status.h:172
#define RAISE_IF_ERROR(stmt, ex_type)
virtual Status AddOneRow(const vector< void * > &col_values, const vector< int > &scales)