#include <boost/algorithm/string/join.hpp>
#include <boost/date_time/posix_time/posix_time_types.hpp>
#include <boost/unordered_set.hpp>
#include <thrift/protocol/TDebugProtocol.h>
#include <gtest/gtest.h>
#include <boost/foreach.hpp>
#include <boost/bind.hpp>
#include <boost/algorithm/string.hpp>
#include <google/heap-profiler.h>
#include <google/malloc_extension.h>

58 #include "gen-cpp/Types_types.h"
59 #include "gen-cpp/ImpalaService.h"
60 #include "gen-cpp/DataSinks_types.h"
61 #include "gen-cpp/Types_types.h"
62 #include "gen-cpp/ImpalaService.h"
63 #include "gen-cpp/ImpalaService_types.h"
64 #include "gen-cpp/ImpalaInternalService.h"
65 #include "gen-cpp/Frontend_types.h"
using boost::adopt_lock_t;
using boost::algorithm::join;
using namespace apache::thrift;
using namespace apache::hive::service::cli::thrift;
using namespace beeswax;

#define RAISE_IF_ERROR(stmt, ex_type)                          \
  do {                                                         \
    Status __status__ = (stmt);                                \
    if (UNLIKELY(!__status__.ok())) {                          \
      RaiseBeeswaxException(__status__.GetDetail(), ex_type);  \
    }                                                          \
  } while (false)

namespace impala {

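// Ascii result set for Beeswax. Rows are returned in ASCII text encoding, using "\t"
// as the column delimiter.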
class ImpalaServer::AsciiQueryResultSet : public ImpalaServer::QueryResultSet {
 public:
  // Rows are added to 'rowset', which is owned by the caller.
  AsciiQueryResultSet(const TResultSetMetadata& metadata, vector<string>* rowset)
    : metadata_(metadata), result_set_(rowset), owned_result_set_(NULL) {
  }

  // Rows are added to a new rowset that is owned by this result set.
  AsciiQueryResultSet(const TResultSetMetadata& metadata)
    : metadata_(metadata), result_set_(new vector<string>()),
      owned_result_set_(result_set_) {
  }

  virtual ~AsciiQueryResultSet() { }

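  // Convert one row of expr values to ASCII, using "\t" as the column delimiter, and
  // append it to the result set.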
  virtual Status AddOneRow(const vector<void*>& col_values, const vector<int>& scales) {
    int num_col = col_values.size();
    DCHECK_EQ(num_col, metadata_.columns.size());
    stringstream out_stream;
    out_stream.precision(ASCII_PRECISION);
    for (int i = 0; i < num_col; ++i) {
      // ODBC can only take "\t" as the delimiter.
      out_stream << (i > 0 ? "\t" : "");
      DCHECK_EQ(1, metadata_.columns[i].columnType.types.size());
      RawValue::PrintValue(col_values[i],
          ColumnType::FromThrift(metadata_.columns[i].columnType),
          scales[i], &out_stream);
    }
    result_set_->push_back(out_stream.str());
    return Status::OK;
  }

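  // Convert one TResultRow to ASCII, using "\t" as the column delimiter, and append it
  // to the result set.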
  virtual Status AddOneRow(const TResultRow& row) {
    int num_col = row.colVals.size();
    DCHECK_EQ(num_col, metadata_.columns.size());
    stringstream out_stream;
    out_stream.precision(ASCII_PRECISION);
    for (int i = 0; i < num_col; ++i) {
      // ODBC can only take "\t" as the delimiter.
      out_stream << (i > 0 ? "\t" : "");
      out_stream << row.colVals[i];
    }
    result_set_->push_back(out_stream.str());
    return Status::OK;
  }

  virtual int AddRows(const QueryResultSet* other, int start_idx, int num_rows) {
    const AsciiQueryResultSet* o = static_cast<const AsciiQueryResultSet*>(other);
    if (start_idx >= o->result_set_->size()) return 0;
    const int rows_added =
        min(static_cast<size_t>(num_rows), o->result_set_->size() - start_idx);
    result_set_->insert(result_set_->end(), o->result_set_->begin() + start_idx,
        o->result_set_->begin() + start_idx + rows_added);
    return rows_added;
  }

  // Returns the approximate size of the given range of rows in bytes.
  virtual int64_t ByteSize(int start_idx, int num_rows) {
    int64_t bytes = 0;
    const int end = min(static_cast<size_t>(num_rows), result_set_->size() - start_idx);
    for (int i = start_idx; i < start_idx + end; ++i) {
      // Count the string object itself plus its allocated buffer.
      bytes += sizeof((*result_set_)[i]) + (*result_set_)[i].capacity();
    }
    return bytes;
  }

  // Returns the size of this result set in number of rows.
  virtual size_t size() { return result_set_->size(); }

 private:
  // Metadata of the result set.
  const TResultSetMetadata& metadata_;

  // Points to the result set to be filled. May be owned by this object, in which case
  // owned_result_set_ is set.
  vector<string>* result_set_;

  // Set to result_set_ if the result set is owned by this object.
  scoped_ptr<vector<string> > owned_result_set_;
};

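// Beeswax RPC implementations. Each RPC validates the caller's session via
// ScopedSessionState::WithSession() before doing any work, and maps internal Status
// errors to BeeswaxException via RAISE_IF_ERROR / RaiseBeeswaxException.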
void ImpalaServer::query(QueryHandle& query_handle, const Query& query) {
  VLOG_QUERY << "query(): query=" << query.query;
  ScopedSessionState session_handle(this);
  shared_ptr<SessionState> session;
  RAISE_IF_ERROR(
      session_handle.WithSession(ThriftServer::GetThreadConnectionId(), &session),
      SQLSTATE_GENERAL_ERROR);
  TQueryCtx query_ctx;
  // Raise a general error for request conversion errors.
  RAISE_IF_ERROR(QueryToTQueryContext(query, &query_ctx), SQLSTATE_GENERAL_ERROR);

  // Errors at this point are most likely syntax or analysis errors.
  shared_ptr<QueryExecState> exec_state;
  RAISE_IF_ERROR(Execute(&query_ctx, session, &exec_state),
      SQLSTATE_SYNTAX_ERROR_OR_ACCESS_VIOLATION);

  exec_state->UpdateQueryState(QueryState::RUNNING);
  // Start a thread to wait for results to become available, which will allow us to
  // advance the query state to FINISHED or EXCEPTION.
  exec_state->WaitAsync();
  // Once the query is running, do a final check for session closure and add it to the
  // set of in-flight queries.
  Status status = SetQueryInflight(session, exec_state);
  if (!status.ok()) {
    UnregisterQuery(exec_state->query_id(), false, &status);
    RaiseBeeswaxException(status.GetDetail(), SQLSTATE_GENERAL_ERROR);
  }
  TUniqueIdToQueryHandle(exec_state->query_id(), &query_handle);
}

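// Like query(), but blocks until the query result is ready before returning the handle
// to the client.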
void ImpalaServer::executeAndWait(QueryHandle& query_handle, const Query& query,
    const LogContextId& client_ctx) {
  VLOG_QUERY << "executeAndWait(): query=" << query.query;
  ScopedSessionState session_handle(this);
  shared_ptr<SessionState> session;
  RAISE_IF_ERROR(
      session_handle.WithSession(ThriftServer::GetThreadConnectionId(), &session),
      SQLSTATE_GENERAL_ERROR);
  TQueryCtx query_ctx;
  RAISE_IF_ERROR(QueryToTQueryContext(query, &query_ctx), SQLSTATE_GENERAL_ERROR);

  shared_ptr<QueryExecState> exec_state;
  DCHECK(session != NULL);  // The session should exist for the current connection.
  {
    // If the transport has not established a username yet, take it from the query.
    lock_guard<mutex> l(session->lock);
    if (session->connected_user.empty()) session->connected_user = query.hadoop_user;
  }

  RAISE_IF_ERROR(Execute(&query_ctx, session, &exec_state),
      SQLSTATE_SYNTAX_ERROR_OR_ACCESS_VIOLATION);

  exec_state->UpdateQueryState(QueryState::RUNNING);
  // Once the query is running, do a final check for session closure and add it to the
  // set of in-flight queries.
  Status status = SetQueryInflight(session, exec_state);
  if (!status.ok()) {
    UnregisterQuery(exec_state->query_id(), false, &status);
    RaiseBeeswaxException(status.GetDetail(), SQLSTATE_GENERAL_ERROR);
  }

  // Block until results are ready.
  exec_state->Wait();
  status = exec_state->query_status();
  if (!status.ok()) {
    UnregisterQuery(exec_state->query_id(), false, &status);
    RaiseBeeswaxException(status.GetDetail(), SQLSTATE_GENERAL_ERROR);
  }

  exec_state->UpdateQueryState(QueryState::FINISHED);
  TUniqueIdToQueryHandle(exec_state->query_id(), &query_handle);

  // If the client supplied a log context id, use it; otherwise use the query handle id.
  query_handle.log_context = client_ctx.empty() ? query_handle.id : client_ctx;
}

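// Converts the Beeswax Query into a TQueryCtx and asks the frontend for the explain
// plan text instead of executing the statement.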
void ImpalaServer::explain(QueryExplanation& query_explanation, const Query& query) {
  VLOG_QUERY << "explain(): query=" << query.query;
  ScopedSessionState session_handle(this);
  RAISE_IF_ERROR(session_handle.WithSession(ThriftServer::GetThreadConnectionId()),
      SQLSTATE_GENERAL_ERROR);

  TQueryCtx query_ctx;
  RAISE_IF_ERROR(QueryToTQueryContext(query, &query_ctx), SQLSTATE_GENERAL_ERROR);

  RAISE_IF_ERROR(
      frontend_->GetExplainPlan(query_ctx, &query_explanation.textual),
      SQLSTATE_SYNTAX_ERROR_OR_ACCESS_VIOLATION);
  query_explanation.__isset.textual = true;
  VLOG_QUERY << "explain():\nstmt=" << query_ctx.request.stmt
             << "\nplan: " << query_explanation.textual;
}

void ImpalaServer::fetch(Results& query_results, const QueryHandle& query_handle,
    const bool start_over, const int32_t fetch_size) {
  ScopedSessionState session_handle(this);
  RAISE_IF_ERROR(session_handle.WithSession(ThriftServer::GetThreadConnectionId()),
      SQLSTATE_GENERAL_ERROR);

  if (start_over) {
    // We can't start over. Raise "Optional feature not implemented".
    RaiseBeeswaxException(
        "Does not support start over", SQLSTATE_OPTIONAL_FEATURE_NOT_IMPLEMENTED);
  }

  TUniqueId query_id;
  QueryHandleToTUniqueId(query_handle, &query_id);
  VLOG_ROW << "fetch(): query_id=" << PrintId(query_id) << " fetch_size=" << fetch_size;

  Status status = FetchInternal(query_id, start_over, fetch_size, &query_results);
  VLOG_ROW << "fetch result: #results=" << query_results.data.size()
           << " has_more=" << (query_results.has_more ? "true" : "false");
  if (!status.ok()) {
    UnregisterQuery(query_id, false, &status);
    RaiseBeeswaxException(status.GetDetail(), SQLSTATE_GENERAL_ERROR);
  }
}

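// Converts the query's TResultSetMetadata into Beeswax ResultsMetadata, mapping each
// column to its ODBC type string and advertising "\t" as the field delimiter.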
void ImpalaServer::get_results_metadata(ResultsMetadata& results_metadata,
    const QueryHandle& handle) {
  ScopedSessionState session_handle(this);
  RAISE_IF_ERROR(session_handle.WithSession(ThriftServer::GetThreadConnectionId()),
      SQLSTATE_GENERAL_ERROR);

  TUniqueId query_id;
  QueryHandleToTUniqueId(handle, &query_id);
  shared_ptr<QueryExecState> exec_state = GetQueryExecState(query_id, true);
  if (exec_state.get() == NULL) {
    RaiseBeeswaxException("Invalid query handle", SQLSTATE_GENERAL_ERROR);
  }

  {
    // Make sure we release the lock on exec_state if we see any error.
    lock_guard<mutex> l(*exec_state->lock(), adopt_lock_t());

    // Convert TResultSetMetadata to Beeswax.ResultsMetadata.
    const TResultSetMetadata* result_set_md = exec_state->result_metadata();
    results_metadata.__isset.schema = true;
    results_metadata.schema.__isset.fieldSchemas = true;
    results_metadata.schema.fieldSchemas.resize(result_set_md->columns.size());
    for (int i = 0; i < results_metadata.schema.fieldSchemas.size(); ++i) {
      const TColumnType& type = result_set_md->columns[i].columnType;
      DCHECK_EQ(1, type.types.size());
      DCHECK_EQ(TTypeNodeType::SCALAR, type.types[0].type);
      DCHECK(type.types[0].__isset.scalar_type);
      TPrimitiveType::type col_type = type.types[0].scalar_type.type;
      results_metadata.schema.fieldSchemas[i].__set_type(
          TypeToOdbcString(ThriftToType(col_type)));
      results_metadata.schema.fieldSchemas[i].__set_name(
          result_set_md->columns[i].columnName);
    }
  }

  // ODBC can only take "\t" as the delimiter.
  results_metadata.__set_delim("\t");
}

void ImpalaServer::close(const QueryHandle& handle) {
  ScopedSessionState session_handle(this);
  RAISE_IF_ERROR(session_handle.WithSession(ThriftServer::GetThreadConnectionId()),
      SQLSTATE_GENERAL_ERROR);
  TUniqueId query_id;
  QueryHandleToTUniqueId(handle, &query_id);
  RAISE_IF_ERROR(UnregisterQuery(query_id, true), SQLSTATE_GENERAL_ERROR);
}

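// Looks up the query in query_exec_state_map_ and returns its Beeswax QueryState;
// raises a Beeswax exception for unknown handles.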
QueryState::type ImpalaServer::get_state(const QueryHandle& handle) {
  ScopedSessionState session_handle(this);
  RAISE_IF_ERROR(session_handle.WithSession(ThriftServer::GetThreadConnectionId()),
      SQLSTATE_GENERAL_ERROR);
  TUniqueId query_id;
  QueryHandleToTUniqueId(handle, &query_id);

  lock_guard<mutex> l(query_exec_state_map_lock_);
  QueryExecStateMap::iterator entry = query_exec_state_map_.find(query_id);
  if (entry != query_exec_state_map_.end()) {
    return entry->second->query_state();
  } else {
    VLOG_QUERY << "ImpalaServer::get_state invalid handle";
    RaiseBeeswaxException("Invalid query handle", SQLSTATE_GENERAL_ERROR);
  }
  // Dummy return to keep the compiler happy; RaiseBeeswaxException() throws.
  return QueryState::FINISHED;
}

void ImpalaServer::echo(string& echo_string, const string& input_string) {
  ScopedSessionState session_handle(this);
  RAISE_IF_ERROR(session_handle.WithSession(ThriftServer::GetThreadConnectionId()),
      SQLSTATE_GENERAL_ERROR);
  echo_string = input_string;
}

void ImpalaServer::clean(const LogContextId& log_context) {
  // Nothing to clean up.
}

void ImpalaServer::get_log(string& log, const LogContextId& context) {
  ScopedSessionState session_handle(this);
  RAISE_IF_ERROR(session_handle.WithSession(ThriftServer::GetThreadConnectionId()),
      SQLSTATE_GENERAL_ERROR);
  // LogContextId is the same as QueryHandle.id.
  QueryHandle handle;
  handle.__set_id(context);
  TUniqueId query_id;
  QueryHandleToTUniqueId(handle, &query_id);

  shared_ptr<QueryExecState> exec_state = GetQueryExecState(query_id, false);
  if (exec_state.get() == NULL) {
    stringstream str;
    str << "unknown query id: " << query_id;
    LOG(ERROR) << str.str();
    return;
  }
  stringstream error_log_ss;
  // If the query status is !ok, include the status error message at the top of the log.
  if (!exec_state->query_status().ok()) {
    error_log_ss << exec_state->query_status().GetDetail() << "\n";
  }

  // Add warnings from analysis.
  error_log_ss << join(exec_state->GetAnalysisWarnings(), "\n");

  // Add warnings from execution.
  if (exec_state->coord() != NULL) {
    if (!exec_state->query_status().ok()) error_log_ss << "\n\n";
    error_log_ss << exec_state->coord()->GetErrorLog();
  }
  log = error_log_ss.str();
}

void ImpalaServer::get_default_configuration(vector<ConfigVariable>& configurations,
    const bool include_hadoop) {
  ScopedSessionState session_handle(this);
  RAISE_IF_ERROR(session_handle.WithSession(ThriftServer::GetThreadConnectionId()),
      SQLSTATE_GENERAL_ERROR);
  configurations.insert(configurations.end(), default_configs_.begin(),
      default_configs_.end());
}

void ImpalaServer::dump_config(string& config) {
  ScopedSessionState session_handle(this);
  RAISE_IF_ERROR(session_handle.WithSession(ThriftServer::GetThreadConnectionId()),
      SQLSTATE_GENERAL_ERROR);
  config = "";
}

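// The remaining RPCs are Impala-specific extensions to Beeswax, defined in
// ImpalaService.thrift.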
void ImpalaServer::Cancel(impala::TStatus& tstatus,
    const beeswax::QueryHandle& query_handle) {
  ScopedSessionState session_handle(this);
  RAISE_IF_ERROR(session_handle.WithSession(ThriftServer::GetThreadConnectionId()),
      SQLSTATE_GENERAL_ERROR);
  // Convert QueryHandle to TUniqueId and cancel the query.
  TUniqueId query_id;
  QueryHandleToTUniqueId(query_handle, &query_id);
  RAISE_IF_ERROR(CancelInternal(query_id, true), SQLSTATE_GENERAL_ERROR);
  tstatus.status_code = TErrorCode::OK;
}

void ImpalaServer::CloseInsert(TInsertResult& insert_result,
    const QueryHandle& query_handle) {
  ScopedSessionState session_handle(this);
  RAISE_IF_ERROR(session_handle.WithSession(ThriftServer::GetThreadConnectionId()),
      SQLSTATE_GENERAL_ERROR);
  TUniqueId query_id;
  QueryHandleToTUniqueId(query_handle, &query_id);

  Status status = CloseInsertInternal(query_id, &insert_result);
  if (!status.ok()) {
    RaiseBeeswaxException(status.GetDetail(), SQLSTATE_GENERAL_ERROR);
  }
}

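// Returns the query's runtime profile rendered as a human-readable string.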
void ImpalaServer::GetRuntimeProfile(string& profile_output, const QueryHandle& handle) {
  ScopedSessionState session_handle(this);
  RAISE_IF_ERROR(session_handle.WithSession(ThriftServer::GetThreadConnectionId()),
      SQLSTATE_GENERAL_ERROR);

  TUniqueId query_id;
  QueryHandleToTUniqueId(handle, &query_id);
  stringstream ss;
  Status status = GetRuntimeProfileStr(query_id, false, &ss);
  if (!status.ok()) {
    ss << "GetRuntimeProfile error: " << status.GetDetail();
    RaiseBeeswaxException(ss.str(), SQLSTATE_GENERAL_ERROR);
  }
  profile_output = ss.str();
}

void ImpalaServer::GetExecSummary(impala::TExecSummary& result,
    const beeswax::QueryHandle& handle) {
  ScopedSessionState session_handle(this);
  RAISE_IF_ERROR(session_handle.WithSession(ThriftServer::GetThreadConnectionId()),
      SQLSTATE_GENERAL_ERROR);
  TUniqueId query_id;
  QueryHandleToTUniqueId(handle, &query_id);
  Status status = GetExecSummary(query_id, &result);
  if (!status.ok()) RaiseBeeswaxException(status.GetDetail(), SQLSTATE_GENERAL_ERROR);
}

void ImpalaServer::PingImpalaService(TPingImpalaServiceResp& return_val) {
  ScopedSessionState session_handle(this);
  RAISE_IF_ERROR(session_handle.WithSession(ThriftServer::GetThreadConnectionId()),
      SQLSTATE_GENERAL_ERROR);

  return_val.version = GetVersionString(true);
  VLOG_RPC << "PingImpalaService(): return_val=" << ThriftDebugString(return_val);
}

void ImpalaServer::ResetCatalog(impala::TStatus& status) {
  Status::DEPRECATED_RPC.ToThrift(&status);
}

void ImpalaServer::ResetTable(impala::TStatus& status, const TResetTableReq& request) {
  Status::DEPRECATED_RPC.ToThrift(&status);
}

Status ImpalaServer::QueryToTQueryContext(const Query& query, TQueryCtx* query_ctx) {
  query_ctx->request.stmt = query.query;
  VLOG_QUERY << "query: " << ThriftDebugString(query);
  {
    shared_ptr<SessionState> session;
    const TUniqueId& session_id = ThriftServer::GetThreadConnectionId();
    RETURN_IF_ERROR(GetSessionState(session_id, &session));
    DCHECK(session != NULL);
    {
      // If the transport has not established a username yet, take it from the query.
      lock_guard<mutex> l(session->lock);
      if (session->connected_user.empty()) session->connected_user = query.hadoop_user;
      query_ctx->request.query_options = session->default_query_options;
    }
    session->ToThrift(session_id, &query_ctx->session);
  }

  // Override the default query options with Query.Configuration.
  if (query.__isset.configuration) {
    BOOST_FOREACH(const string& option, query.configuration) {
      RETURN_IF_ERROR(ParseQueryOptions(option, &query_ctx->request.query_options));
    }
    VLOG_QUERY << "TClientRequest.queryOptions: "
               << ThriftDebugString(query_ctx->request.query_options);
  }

  return Status::OK;
}

inline void ImpalaServer::TUniqueIdToQueryHandle(const TUniqueId& query_id,
    QueryHandle* handle) {
  string query_id_str = PrintId(query_id);
  handle->__set_id(query_id_str);
  handle->__set_log_context(query_id_str);
}

inline void ImpalaServer::QueryHandleToTUniqueId(const QueryHandle& handle,
    TUniqueId* query_id) {
  ParseId(handle.id, query_id);
}

void ImpalaServer::RaiseBeeswaxException(const string& msg, const char* sql_state) {
  BeeswaxException exc;
  exc.__set_message(msg);
  exc.__set_SQLState(sql_state);
  throw exc;
}

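// Helper for fetch(): blocks until rows are ready, then materializes up to fetch_size
// rows into query_results as tab-delimited ASCII text via AsciiQueryResultSet.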
Status ImpalaServer::FetchInternal(const TUniqueId& query_id,
    const bool start_over, const int32_t fetch_size, beeswax::Results* query_results) {
  shared_ptr<QueryExecState> exec_state = GetQueryExecState(query_id, false);
  if (exec_state == NULL) return Status("Invalid query handle");

  // Make sure QueryExecState::Wait() has completed before fetching rows.
  exec_state->BlockOnWait();

  lock_guard<mutex> frl(*exec_state->fetch_rows_lock());
  lock_guard<mutex> l(*exec_state->lock());

  if (exec_state->num_rows_fetched() == 0) {
    exec_state->query_events()->MarkEvent("First row fetched");
    exec_state->set_fetched_rows();
  }

  // If the query failed, return the error so the caller can surface it.
  RETURN_IF_ERROR(exec_state->query_status());

  // Set Beeswax's Results.columns from the result metadata.
  const TResultSetMetadata* result_metadata = exec_state->result_metadata();
  query_results->columns.resize(result_metadata->columns.size());
  for (int i = 0; i < result_metadata->columns.size(); ++i) {
    const TColumnType& type = result_metadata->columns[i].columnType;
    DCHECK_EQ(1, type.types.size());
    DCHECK_EQ(TTypeNodeType::SCALAR, type.types[0].type);
    DCHECK(type.types[0].__isset.scalar_type);
    TPrimitiveType::type col_type = type.types[0].scalar_type.type;
    query_results->columns[i] = TypeToOdbcString(ThriftToType(col_type));
  }
  query_results->__isset.columns = true;

  // Results are always ready because we're blocking.
  query_results->__set_ready(true);
  // start_row starts from zero, not one.
  query_results->__set_start_row(exec_state->num_rows_fetched());

  Status fetch_rows_status;
  query_results->data.clear();
  if (!exec_state->eos()) {
    AsciiQueryResultSet result_set(*(exec_state->result_metadata()),
        &(query_results->data));
    fetch_rows_status = exec_state->FetchRows(fetch_size, &result_set);
  }
  query_results->__set_has_more(!exec_state->eos());
  query_results->__isset.data = true;

  return fetch_rows_status;
}

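// Helper for CloseInsert(): gathers per-partition row counts from the coordinator into
// insert_result, then unregisters the query.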
Status ImpalaServer::CloseInsertInternal(const TUniqueId& query_id,
    TInsertResult* insert_result) {
  shared_ptr<QueryExecState> exec_state = GetQueryExecState(query_id, true);
  if (exec_state == NULL) return Status("Invalid query handle");
  Status query_status;
  {
    lock_guard<mutex> l(*exec_state->lock(), adopt_lock_t());
    query_status = exec_state->query_status();
    if (query_status.ok()) {
      // coord() may be NULL, e.g. for a statement with LIMIT 0.
      if (exec_state->coord() != NULL) {
        BOOST_FOREACH(const PartitionStatusMap::value_type& v,
            exec_state->coord()->per_partition_status()) {
          const pair<string, TInsertPartitionStatus> partition_status = v;
          insert_result->rows_appended[partition_status.first] =
              partition_status.second.num_appended_rows;
        }
      }
    }
  }
  RETURN_IF_ERROR(UnregisterQuery(query_id, true));
  return query_status;
}

}