19 #include <boost/algorithm/string/join.hpp>
20 #include <boost/date_time/posix_time/posix_time_types.hpp>
21 #include <boost/unordered_set.hpp>
23 #include <thrift/protocol/TDebugProtocol.h>
24 #include <gtest/gtest.h>
25 #include <boost/foreach.hpp>
26 #include <boost/bind.hpp>
27 #include <boost/algorithm/string.hpp>
28 #include <google/heap-profiler.h>
29 #include <google/malloc_extension.h>
30 #include <gutil/strings/substitute.h>
45 using boost::adopt_lock_t;
46 using boost::algorithm::join;
47 using boost::uuids::uuid;
48 using namespace apache::hive::service::cli::thrift;
49 using namespace apache::hive::service::cli;
50 using namespace apache::thrift;
51 using namespace beeswax;
52 using namespace strings;
55 TProtocolVersion::HIVE_CLI_SERVICE_PROTOCOL_V6;
// Marks the HS2 'status' member of 'return_val' as failed: status code ERROR_STATUS,
// error message 'error_msg', SQL state 'error_state'.
// NOTE(review): the macro name suggests it also returns from the enclosing function; that
// part of the body is not visible in this view -- confirm.
58 #define HS2_RETURN_ERROR(return_val, error_msg, error_state) \
60 return_val.status.__set_statusCode(thrift::TStatusCode::ERROR_STATUS); \
61 return_val.status.__set_errorMessage(error_msg); \
62 return_val.status.__set_sqlState(error_state); \
// If 'status' is not ok(), reports status.GetDetail() through HS2_RETURN_ERROR on
// 'return_val' with SQL state 'error_state'. The failure test is wrapped in UNLIKELY.
66 #define HS2_RETURN_IF_ERROR(return_val, status, error_state) \
68 if (UNLIKELY(!status.ok())) { \
69 HS2_RETURN_ERROR(return_val, status.GetDetail(), error_state); \
// Byte-size estimate for a single HS2 column value: sizeof the struct plus the capacity
// of its stringVal payload. No other member contributes a variable-size part.
80 int64_t
ByteSize(
const thrift::TColumnValue& val) {
81 return sizeof(val) + val.stringVal.value.capacity();
// Byte-size estimate for an HS2 row. Starts from sizeof(row) and then visits every
// column value in row.colVals.
// NOTE(review): the loop body and the return statement are not visible in this view;
// presumably each value's ByteSize() is added to 'bytes' -- confirm.
84 static int64_t
ByteSize(
const thrift::TRow& row) {
85 int64_t bytes =
sizeof(row);
86 BOOST_FOREACH(
const thrift::TColumnValue& c, row.colVals) {
// Body of the per-column size estimate for the rows in [start_idx, end_idx) of 'col'.
// The column kind is picked from whichever __isset flag is set. Fixed-width kinds are
// costed as row count times element width; strings as the sum of the value sizes in the
// range. In every case the size of the column's whole 'nulls' string is added, not just
// the part that covers the requested range.
// NOTE(review): the signature line and the declaration of 'bytes' are not visible here.
96 DCHECK_LE(start_idx, end_idx);
97 uint32_t num_rows = end_idx - start_idx;
// Empty range: nothing to count.
98 if (num_rows == 0)
return 0L;
100 if (col.__isset.boolVal)
return (num_rows *
sizeof(
bool)) + col.boolVal.nulls.size();
// Byte values are costed at one byte per row.
101 if (col.__isset.byteVal)
return num_rows + col.byteVal.nulls.size();
102 if (col.__isset.i16Val)
return (num_rows *
sizeof(int16_t)) + col.i16Val.nulls.size();
103 if (col.__isset.i32Val)
return (num_rows *
sizeof(int32_t)) + col.i32Val.nulls.size();
104 if (col.__isset.i64Val)
return (num_rows *
sizeof(int64_t)) + col.i64Val.nulls.size();
105 if (col.__isset.doubleVal) {
106 return (num_rows *
sizeof(
double)) + col.doubleVal.nulls.size();
// Strings are variable-width, so the actual lengths in the range are summed.
108 if (col.__isset.stringVal) {
110 for (
int i = start_idx; i < end_idx; ++i) bytes += col.stringVal.values[i].size();
111 return bytes + col.stringVal.nulls.size();
119 const beeswax::QueryState::type& query_state);
// Constructor initializer list and body: binds 'metadata' and 'rowset' and starts with
// zero rows. If the caller passed no rowset, a new TRowSet is created, held in
// owned_result_set_, and used as the target instead.
// NOTE(review): the constructor's signature line is not visible in this view.
126 : metadata_(metadata), result_set_(rowset), num_rows_(0) {
127 if (rowset == NULL) {
128 owned_result_set_.reset(
new TRowSet());
129 result_set_ = owned_result_set_.get();
// Appends one row given as raw per-column value pointers ('col_values') with their
// 'scales'. The number of values must equal the number of metadata columns (DCHECK).
// Each value i is written into result_set_->columns[i].
// NOTE(review): the first line of the per-column conversion call is not visible here;
// only its final argument (the destination column) is.
137 virtual Status AddOneRow(
const vector<void*>& col_values,
const vector<int>& scales) {
138 int num_col = col_values.size();
139 DCHECK_EQ(num_col, metadata_.columns.size());
140 for (
int i = 0; i < num_col; ++i) {
142 &(result_set_->columns[i]));
// Body of the overload that appends one row whose values are in 'row.colVals'.
// The number of values must equal the number of metadata columns (DCHECK); value i is
// written into result_set_->columns[i].
// NOTE(review): the signature line and the start of the conversion call are not visible
// in this view.
150 int num_col = row.colVals.size();
151 DCHECK_EQ(num_col, metadata_.columns.size());
152 for (
int i = 0; i < num_col; ++i) {
154 &(result_set_->columns[i]));
// Body of AddRows: copies rows from another columnar result set 'o', beginning at
// 'start_idx', onto the end of this one. Both sets must have the same number of columns
// (DCHECK). For each column the typed value vector is chosen from the column's primitive
// type, the source range [start_idx, start_idx + rows_added) is appended to the
// destination vector, and StitchNulls() is called with the source and destination 'nulls'
// strings. num_rows_ is advanced by rows_added afterwards.
// NOTE(review): the signature, the definition of 'o', the per-case 'break' statements,
// the default case and the return are not visible in this view.
164 DCHECK_EQ(metadata_.columns.size(), o->
metadata_.columns.size());
// Never copy more rows than 'o' holds past start_idx.
166 const int rows_added =
167 min(static_cast<long>(num_rows), o->
num_rows_ - start_idx);
168 for (
int j = 0; j < metadata_.columns.size(); ++j) {
169 thrift::TColumn* from = &o->
result_set_->columns[j];
170 thrift::TColumn* to = &result_set_->columns[j];
171 switch (metadata_.columns[j].columnType.types[0].scalar_type.type) {
// NULL_TYPE shares the boolean column representation.
172 case TPrimitiveType::NULL_TYPE:
173 case TPrimitiveType::BOOLEAN:
174 StitchNulls(num_rows_, rows_added, start_idx, from->boolVal.nulls,
175 &(to->boolVal.nulls));
176 to->boolVal.values.insert(
177 to->boolVal.values.end(),
178 from->boolVal.values.begin() + start_idx,
179 from->boolVal.values.begin() + start_idx + rows_added);
181 case TPrimitiveType::TINYINT:
182 StitchNulls(num_rows_, rows_added, start_idx, from->byteVal.nulls,
183 &(to->byteVal.nulls));
184 to->byteVal.values.insert(
185 to->byteVal.values.end(),
186 from->byteVal.values.begin() + start_idx,
187 from->byteVal.values.begin() + start_idx + rows_added);
189 case TPrimitiveType::SMALLINT:
190 StitchNulls(num_rows_, rows_added, start_idx, from->i16Val.nulls,
191 &(to->i16Val.nulls));
192 to->i16Val.values.insert(
193 to->i16Val.values.end(),
194 from->i16Val.values.begin() + start_idx,
195 from->i16Val.values.begin() + start_idx + rows_added);
197 case TPrimitiveType::INT:
198 StitchNulls(num_rows_, rows_added, start_idx, from->i32Val.nulls,
199 &(to->i32Val.nulls));
200 to->i32Val.values.insert(
201 to->i32Val.values.end(),
202 from->i32Val.values.begin() + start_idx,
203 from->i32Val.values.begin() + start_idx + rows_added);
205 case TPrimitiveType::BIGINT:
206 StitchNulls(num_rows_, rows_added, start_idx, from->i64Val.nulls,
207 &(to->i64Val.nulls));
208 to->i64Val.values.insert(
209 to->i64Val.values.end(),
210 from->i64Val.values.begin() + start_idx,
211 from->i64Val.values.begin() + start_idx + rows_added);
// FLOAT and DOUBLE are both carried in the double column.
213 case TPrimitiveType::FLOAT:
214 case TPrimitiveType::DOUBLE:
215 StitchNulls(num_rows_, rows_added, start_idx, from->doubleVal.nulls,
216 &(to->doubleVal.nulls));
217 to->doubleVal.values.insert(
218 to->doubleVal.values.end(),
219 from->doubleVal.values.begin() + start_idx,
220 from->doubleVal.values.begin() + start_idx + rows_added);
// Timestamps, decimals and all character types are carried in the string column.
222 case TPrimitiveType::TIMESTAMP:
223 case TPrimitiveType::DECIMAL:
224 case TPrimitiveType::STRING:
225 case TPrimitiveType::VARCHAR:
226 case TPrimitiveType::CHAR:
227 StitchNulls(num_rows_, rows_added, start_idx, from->stringVal.nulls,
228 &(to->stringVal.nulls));
229 to->stringVal.values.insert(to->stringVal.values.end(),
230 from->stringVal.values.begin() + start_idx,
231 from->stringVal.values.begin() + start_idx + rows_added);
235 metadata_.columns[j].columnType.types[0].scalar_type.type));
239 num_rows_ += rows_added;
// Size estimate in bytes for 'num_rows' rows starting at 'start_idx'. The end index is
// clamped to the number of rows held (size()), then every column of the result set is
// visited.
// NOTE(review): the accumulator, the loop body and the return are not visible in this
// view; presumably each column contributes TColumnByteSize(c, start_idx, end) -- confirm.
243 virtual int64_t
ByteSize(
int start_idx,
int num_rows) {
244 const int end = min(start_idx + num_rows, (
int)size());
246 BOOST_FOREACH(
const thrift::TColumn& c, result_set_->columns) {
// Number of rows in this result set, as tracked by num_rows_.
252 virtual size_t size() {
return num_rows_; }
// Body of the column set-up routine: flags result_set_'s 'columns' member as set, then
// appends one thrift::TColumn per metadata column. Each new column has exactly the
// __isset flag that matches the metadata column's primitive type, using the same type
// grouping as the rest of this class (NULL_TYPE with BOOLEAN, FLOAT with DOUBLE,
// timestamp/decimal/character types with string).
// Only single-type (non-structured) columns are accepted (DCHECK).
// NOTE(review): the signature line, the per-case 'break' statements and the 'default'
// label before the "Unhandled column type" DCHECK are not visible in this view.
268 result_set_->__isset.columns =
true;
269 BOOST_FOREACH(
const TColumn& col, metadata_.columns) {
270 DCHECK(col.columnType.types.size() == 1) <<
271 "Structured columns unsupported in HS2 interface";
272 thrift::TColumn column;
273 switch (col.columnType.types[0].scalar_type.type) {
274 case TPrimitiveType::NULL_TYPE:
275 case TPrimitiveType::BOOLEAN:
276 column.__isset.boolVal =
true;
278 case TPrimitiveType::TINYINT:
279 column.__isset.byteVal =
true;
281 case TPrimitiveType::SMALLINT:
282 column.__isset.i16Val =
true;
284 case TPrimitiveType::INT:
285 column.__isset.i32Val =
true;
287 case TPrimitiveType::BIGINT:
288 column.__isset.i64Val =
true;
290 case TPrimitiveType::FLOAT:
291 case TPrimitiveType::DOUBLE:
292 column.__isset.doubleVal =
true;
294 case TPrimitiveType::TIMESTAMP:
295 case TPrimitiveType::DECIMAL:
296 case TPrimitiveType::VARCHAR:
297 case TPrimitiveType::CHAR:
298 case TPrimitiveType::STRING:
299 column.__isset.stringVal =
true;
302 DCHECK(
false) <<
"Unhandled column type: "
304 ThriftToType(col.columnType.types[0].scalar_type.type));
306 result_set_->columns.push_back(column);
// Constructor initializer list and body: binds 'metadata' and 'rowset'. If the caller
// passed no rowset, a new TRowSet is created, held in owned_result_set_, and used as the
// target instead.
// NOTE(review): the constructor's signature line is not visible in this view.
316 : metadata_(metadata), result_set_(rowset) {
317 if (rowset == NULL) {
318 owned_result_set_.reset(
new TRowSet());
319 result_set_ = owned_result_set_.get();
// Appends one row given as raw per-column value pointers ('col_values') with their
// 'scales'. The number of values must equal the number of metadata columns (DCHECK).
// A new TRow is pushed onto result_set_->rows and sized to hold one value per column;
// value i is then converted into trow.colVals[i] using metadata column i's type.
// NOTE(review): the first line of the per-column conversion call is not visible here.
326 virtual Status AddOneRow(
const vector<void*>& col_values,
const vector<int>& scales) {
327 int num_col = col_values.size();
328 DCHECK_EQ(num_col, metadata_.columns.size());
329 result_set_->rows.push_back(TRow());
330 TRow& trow = result_set_->rows.back();
331 trow.colVals.resize(num_col);
332 for (
int i = 0; i < num_col; ++i) {
334 metadata_.columns[i].columnType, &(trow.colVals[i]));
// Body of the overload that appends one row whose values are in 'row.colVals'.
// The number of values must equal the number of metadata columns (DCHECK). A new TRow is
// pushed onto result_set_->rows and sized to one value per column before the per-column
// loop runs.
// NOTE(review): the signature line and the loop body are not visible in this view.
341 int num_col = row.colVals.size();
342 DCHECK_EQ(num_col, metadata_.columns.size());
343 result_set_->rows.push_back(TRow());
344 TRow& trow = result_set_->rows.back();
345 trow.colVals.resize(num_col);
346 for (
int i = 0; i < num_col; ++i) {
// Body of AddRows: copies rows from another row-oriented result set 'o' onto the end of
// this one, beginning at 'start_idx'.
// NOTE(review): the signature, the definition of 'o' and the final return are not
// visible in this view.
// Nothing to copy when start_idx is at or past the end of 'o'; this guard also keeps the
// unsigned subtraction below from wrapping.
355 if (start_idx >= o->
result_set_->rows.size())
return 0;
// Never copy more rows than 'o' holds past start_idx.
356 const int rows_added =
357 min(static_cast<size_t>(num_rows), o->
result_set_->rows.size() - start_idx);
358 for (
int i = start_idx; i < start_idx + rows_added; ++i) {
359 result_set_->rows.push_back(o->
result_set_->rows[i]);
// Size estimate in bytes for 'num_rows' rows starting at 'start_idx'.
// The min() result is a row COUNT (num_rows clamped to the rows available past
// start_idx); the loop below uses it as 'end' and iterates over
// [start_idx, start_idx + end).
// NOTE(review): rows.size() - start_idx is an unsigned subtraction and no guard for
// start_idx > rows.size() is visible in this view -- confirm callers never pass one.
// NOTE(review): the declarations preceding min(), the loop body and the return are not
// visible here.
364 virtual int64_t
ByteSize(
int start_idx,
int num_rows) {
367 min(static_cast<size_t>(num_rows), result_set_->rows.size() - start_idx);
368 for (
int i = start_idx; i < start_idx + end; ++i) {
// Number of rows in this result set, i.e. the length of result_set_->rows.
374 virtual size_t size() {
return result_set_->rows.size(); }
// Fragment of the result-set factory: takes the HS2 protocol 'version' and the result
// 'metadata', and branches on whether the version is older than protocol V6.
// NOTE(review): the function name line, remaining parameters and both branches are not
// visible in this view; presumably pre-V6 clients get the row-oriented result set and
// V6+ clients the columnar one -- confirm.
389 TProtocolVersion::type version,
const TResultSetMetadata& metadata,
391 if (version < TProtocolVersion::HIVE_CLI_SERVICE_PROTOCOL_V6) {
// Runs the metadata operation described by 'request' for the session identified by
// 'session_handle' and reports the outcome through 'handle' and 'status'.
// Every failure path visible here fills 'status' with ERROR_STATUS, a message (the
// failing Status's detail text, or "Invalid session ID") and SQLSTATE_GENERAL_ERROR.
// On success 'handle' is flagged as having a result set, its operationId is derived from
// the query id, and 'status' is set to SUCCESS_STATUS.
// NOTE(review): several original lines (local declarations, the session lookup call, the
// early returns after each error and the creation of 'exec_state') are not visible in
// this view.
398 void ImpalaServer::ExecuteMetadataOp(
const THandleIdentifier& session_handle,
399 TMetadataOpRequest* request, TOperationHandle* handle, thrift::TStatus* status) {
400 TUniqueId session_id;
// Decode the HS2 handle into a session id and secret.
403 THandleIdentifierToTUniqueId(session_handle, &session_id, &secret);
404 if (!unique_id_status.
ok()) {
405 status->__set_statusCode(thrift::TStatusCode::ERROR_STATUS);
406 status->__set_errorMessage(unique_id_status.
GetDetail());
407 status->__set_sqlState(SQLSTATE_GENERAL_ERROR);
411 shared_ptr<SessionState> session;
413 if (!get_session_status.
ok()) {
414 status->__set_statusCode(thrift::TStatusCode::ERROR_STATUS);
415 status->__set_errorMessage(get_session_status.
GetDetail());
418 status->__set_sqlState(SQLSTATE_GENERAL_ERROR);
422 if (session == NULL) {
423 status->__set_statusCode(thrift::TStatusCode::ERROR_STATUS);
424 status->__set_errorMessage(
"Invalid session ID");
425 status->__set_sqlState(SQLSTATE_GENERAL_ERROR);
// Build a query context carrying this session's state and attach it to the request.
429 PrepareQueryContext(&query_ctx);
430 session->ToThrift(session_id, &query_ctx.session);
431 request->__set_session(query_ctx.session);
433 shared_ptr<QueryExecState> exec_state;
// Use the opcode's symbolic name as the statement text, or "N/A" for an unknown opcode.
436 map<int, const char*>::const_iterator query_text_it =
437 _TMetadataOpcode_VALUES_TO_NAMES.find(request->opcode);
438 const string& query_text = query_text_it == _TMetadataOpcode_VALUES_TO_NAMES.end() ?
439 "N/A" : query_text_it->second;
440 query_ctx.request.stmt = query_text;
443 Status register_status = RegisterQuery(session, exec_state);
444 if (!register_status.
ok()) {
445 status->__set_statusCode(thrift::TStatusCode::ERROR_STATUS);
446 status->__set_errorMessage(register_status.
GetDetail());
447 status->__set_sqlState(SQLSTATE_GENERAL_ERROR);
// From here on the query is registered, so failures also unregister it.
451 Status exec_status = exec_state->Exec(*request);
452 if (!exec_status.
ok()) {
453 UnregisterQuery(exec_state->query_id(),
false, &exec_status);
454 status->__set_statusCode(thrift::TStatusCode::ERROR_STATUS);
455 status->__set_errorMessage(exec_status.
GetDetail());
456 status->__set_sqlState(SQLSTATE_GENERAL_ERROR);
// The operation is marked FINISHED as soon as Exec() has succeeded.
460 exec_state->UpdateQueryState(QueryState::FINISHED);
462 Status inflight_status = SetQueryInflight(session, exec_state);
463 if (!inflight_status.
ok()) {
464 UnregisterQuery(exec_state->query_id(),
false, &inflight_status);
465 status->__set_statusCode(thrift::TStatusCode::ERROR_STATUS);
466 status->__set_errorMessage(inflight_status.
GetDetail());
467 status->__set_sqlState(SQLSTATE_GENERAL_ERROR);
470 handle->__set_hasResultSet(
true);
// The query id is passed as both arguments when building the operation handle id.
472 TUniqueId operation_id = exec_state->query_id();
473 TUniqueIdToTHandleIdentifier(operation_id, operation_id, &(handle->operationId));
474 status->__set_statusCode(thrift::TStatusCode::SUCCESS_STATUS);
// Fetches results of query 'query_id' into 'fetch_results'.
// Returns an "Invalid query handle" error Status when no exec state exists for the id.
// NOTE(review): the session lookup, the handling of 'fetch_size' / 'fetch_first', the
// actual row fetch and the final return are not visible in this view.
477 Status ImpalaServer::FetchInternal(
const TUniqueId&
query_id, int32_t fetch_size,
478 bool fetch_first, TFetchResultsResp* fetch_results) {
479 shared_ptr<QueryExecState> exec_state = GetQueryExecState(query_id,
false);
480 if (exec_state == NULL)
return Status(
"Invalid query handle");
484 ScopedSessionState session_handle(
this);
485 const TUniqueId session_id = exec_state->session_id();
486 shared_ptr<SessionState> session;
// BlockOnWait() is called before either of the two locks below is taken.
492 exec_state->BlockOnWait();
// Lock order: fetch_rows_lock() first, then lock().
494 lock_guard<mutex> frl(*exec_state->fetch_rows_lock());
495 lock_guard<mutex> l(*exec_state->lock());
// Record the "First row fetched" event only on the first fetch of this query.
500 if (exec_state->num_rows_fetched() == 0) {
501 exec_state->query_events()->MarkEvent(
"First row fetched");
502 exec_state->set_fetched_rows();
// The batch's start offset is the number of rows already fetched.
507 fetch_results->results.__set_startRowOffset(exec_state->num_rows_fetched());
// A query with a non-default parent_query_id is treated as a child query and always
// uses protocol V1; otherwise the session's negotiated HS2 version is used.
511 bool is_child_query = exec_state->parent_query_id() != TUniqueId();
512 TProtocolVersion::type version = is_child_query ?
513 TProtocolVersion::HIVE_CLI_SERVICE_PROTOCOL_V1 : session->hs2_version;
// The result set writes directly into fetch_results->results.
514 scoped_ptr<QueryResultSet> result_set(CreateHS2ResultSet(version,
515 *(exec_state->result_metadata()), &(fetch_results->results)));
517 fetch_results->__isset.results =
true;
518 fetch_results->__set_hasMoreRows(!exec_state->eos());
// Populates 'query_ctx' from an HS2 execute request: the statement text, the session's
// Thrift description, and the query options (starting from the session's default query
// options, then processing the request's confOverlay entries).
// Returns the error from THandleIdentifierToTUniqueId() if the session handle cannot be
// decoded.
// NOTE(review): 'execute_request' is declared as a const value, not a const reference,
// so the whole request is copied on every call.
// NOTE(review): the session lookup, the option-setting call for ordinary confOverlay
// entries and the final return are not visible in this view.
522 Status ImpalaServer::TExecuteStatementReqToTQueryContext(
523 const TExecuteStatementReq execute_request, TQueryCtx* query_ctx) {
524 query_ctx->request.stmt = execute_request.statement;
525 VLOG_QUERY <<
"TExecuteStatementReq: " << ThriftDebugString(execute_request);
527 shared_ptr<SessionState> session_state;
528 TUniqueId session_id;
530 RETURN_IF_ERROR(THandleIdentifierToTUniqueId(execute_request.sessionHandle.sessionId,
531 &session_id, &secret));
534 session_state->ToThrift(session_id, &query_ctx->session);
// session_state->lock is held while the session's default options are read.
535 lock_guard<mutex> l(session_state->lock);
536 query_ctx->request.query_options = session_state->default_query_options;
539 if (execute_request.__isset.confOverlay) {
540 map<string, string>::const_iterator conf_itr = execute_request.confOverlay.begin();
541 for (; conf_itr != execute_request.confOverlay.end(); ++conf_itr) {
// The parent-query entry is parsed into parent_query_id, which is flagged as set only
// when ParseId() succeeds.
543 if (conf_itr->first == ChildQuery::PARENT_QUERY_OPT) {
544 if (
ParseId(conf_itr->second, &query_ctx->parent_query_id)) {
545 query_ctx->__isset.parent_query_id =
true;
550 &query_ctx->request.query_options));
553 << ThriftDebugString(query_ctx->request.query_options);
// HS2 OpenSession RPC. Creates a new session, fills in its state from the connection and
// from 'request', registers it, and returns the session handle in 'return_val'.
// NOTE(review): the generation of the session uuid/secret, the allocation of 'state', the
// else-branch header for the username, the error handling after AuthorizeProxyUser()
// and the population of the returned configuration are not visible in this view.
559 void ImpalaServer::OpenSession(TOpenSessionResp& return_val,
560 const TOpenSessionReq& request) {
563 VLOG_QUERY <<
"OpenSession(): username=" << request.username;
566 TUniqueId session_id;
568 lock_guard<mutex> l(uuid_lock_);
// The handle's guid and secret are each expected to be 16 bytes long.
571 return_val.sessionHandle.sessionId.guid.assign(
572 session_uuid.begin(), session_uuid.end());
573 return_val.sessionHandle.sessionId.secret.assign(secret.begin(), secret.end());
574 DCHECK_EQ(return_val.sessionHandle.sessionId.guid.size(), 16);
575 DCHECK_EQ(return_val.sessionHandle.sessionId.secret.size(), 16);
576 return_val.__isset.sessionHandle =
true;
584 state->closed =
false;
585 state->start_time = TimestampValue::LocalTime();
586 state->session_type = TSessionType::HIVESERVER2;
587 state->network_address = ThriftServer::GetThreadConnectionContext()->network_address;
// A non-empty username from the connection context is used as the connected user;
// request.username is the alternative.
593 ThriftServer::GetThreadConnectionContext()->username;
594 if (!username.empty()) {
595 state->connected_user = username;
597 state->connected_user = request.username;
601 state->database =
"default";
// Session options start as a copy of the server-wide defaults; entries in the request's
// configuration are processed on top of that.
604 state->default_query_options = default_query_options_;
605 if (request.__isset.configuration) {
606 map<string, string>::const_iterator conf_itr = request.configuration.begin();
607 for (; conf_itr != request.configuration.end(); ++conf_itr) {
// "impala.doas.user" sets the delegated user and is checked with AuthorizeProxyUser().
611 if (conf_itr->first ==
"impala.doas.user") {
612 state->do_as_user = conf_itr->second;
613 Status status = AuthorizeProxyUser(state->connected_user, state->do_as_user);
618 SetQueryOption(conf_itr->first, conf_itr->second, &state->default_query_options);
// Register the session by id, then record it under the current connection. Each map is
// updated under its own lock.
625 lock_guard<mutex> l(session_state_map_lock_);
626 session_state_map_.insert(make_pair(session_id, state));
630 lock_guard<mutex> l(connection_to_sessions_map_lock_);
631 const TUniqueId& connection_id = ThriftServer::GetThreadConnectionId();
632 connection_to_sessions_map_[connection_id].push_back(session_id);
635 ImpaladMetrics::IMPALA_SERVER_NUM_OPEN_HS2_SESSIONS->Increment(1L);
637 return_val.__isset.configuration =
true;
638 return_val.status.__set_statusCode(thrift::TStatusCode::SUCCESS_STATUS);
639 return_val.serverProtocolVersion = state->hs2_version;
// HS2 CloseSession RPC. Logs the request, decodes the session handle into 'session_id'
// and closes that session via CloseSessionInternal(). Failures are reported with
// SQLSTATE_GENERAL_ERROR; otherwise the response status is SUCCESS_STATUS.
// NOTE(review): the opening lines of the two error-checked calls are not visible in this
// view; presumably both are wrapped in HS2_RETURN_IF_ERROR -- confirm.
642 void ImpalaServer::CloseSession(TCloseSessionResp& return_val,
643 const TCloseSessionReq& request) {
644 VLOG_QUERY <<
"CloseSession(): request=" << ThriftDebugString(request);
646 TUniqueId session_id;
649 request.sessionHandle.sessionId, &session_id, &secret), SQLSTATE_GENERAL_ERROR);
651 CloseSessionInternal(session_id,
false), SQLSTATE_GENERAL_ERROR);
652 return_val.status.__set_statusCode(thrift::TStatusCode::SUCCESS_STATUS);
// HS2 GetInfo RPC. Decodes the session handle, then answers by 'request.infoType'.
// CLI_SERVER_NAME and CLI_DBMS_NAME both yield the string "Impala".
// NOTE(review): the session lookup call, the CLI_DBMS_VER value, the per-case 'break's
// and the branch that uses SQLSTATE_OPTIONAL_FEATURE_NOT_IMPLEMENTED are not visible in
// this view; presumably the latter is the default case for unsupported info types.
655 void ImpalaServer::GetInfo(TGetInfoResp& return_val,
656 const TGetInfoReq& request) {
657 VLOG_QUERY <<
"GetInfo(): request=" << ThriftDebugString(request);
659 TUniqueId session_id;
662 request.sessionHandle.sessionId, &session_id, &secret), SQLSTATE_GENERAL_ERROR);
664 shared_ptr<SessionState> session;
666 SQLSTATE_GENERAL_ERROR);
668 switch (request.infoType) {
669 case TGetInfoType::CLI_SERVER_NAME:
670 case TGetInfoType::CLI_DBMS_NAME:
671 return_val.infoValue.__set_stringValue(
"Impala");
673 case TGetInfoType::CLI_DBMS_VER:
678 SQLSTATE_OPTIONAL_FEATURE_NOT_IMPLEMENTED);
680 return_val.status.__set_statusCode(thrift::TStatusCode::SUCCESS_STATUS);
// HS2 ExecuteStatement RPC. Converts 'request' into a query context, resolves the
// session, optionally enables result caching, starts the query and returns an operation
// handle derived from the query id in 'return_val'.
// NOTE(review): the status checks after each step, the confOverlay key lookup, and the
// session lookup call are not visible in this view.
683 void ImpalaServer::ExecuteStatement(TExecuteStatementResp& return_val,
684 const TExecuteStatementReq& request) {
685 VLOG_QUERY <<
"ExecuteStatement(): request=" << ThriftDebugString(request);
690 Status status = TExecuteStatementReqToTQueryContext(request, &query_ctx);
693 TUniqueId session_id;
696 request.sessionHandle.sessionId, &session_id, &secret), SQLSTATE_GENERAL_ERROR);
698 shared_ptr<SessionState> session;
700 SQLSTATE_GENERAL_ERROR);
701 if (session == NULL) {
703 return_val,
Status(
"Invalid session ID"), SQLSTATE_GENERAL_ERROR);
// -1 means no result caching was requested; a confOverlay entry can override it.
707 int64_t cache_num_rows = -1;
708 if (request.__isset.confOverlay) {
709 map<string, string>::const_iterator iter =
711 if (iter != request.confOverlay.end()) {
// The option value must parse as a 64-bit integer; anything else is reported as an
// error.
713 cache_num_rows = StringParser::StringToInt<int64_t>(
714 iter->second.c_str(), iter->second.size(), &parse_result);
715 if (parse_result != StringParser::PARSE_SUCCESS) {
717 return_val,
Status(Substitute(
"Invalid value '$0' for '$1' option.",
723 shared_ptr<QueryExecState> exec_state;
724 status = Execute(&query_ctx, session, &exec_state);
// A result cache is set up only for a positive row count. Its result set is created for
// the session's HS2 version.
728 if (cache_num_rows > 0) {
729 status = exec_state->SetResultCache(CreateHS2ResultSet(session->hs2_version,
730 *exec_state->result_metadata()), cache_num_rows);
732 UnregisterQuery(exec_state->query_id(),
false, &status);
// Mark the query RUNNING, call WaitAsync(), then register it as in-flight for the
// session.
736 exec_state->UpdateQueryState(QueryState::RUNNING);
738 exec_state->WaitAsync();
741 status = SetQueryInflight(session, exec_state);
743 UnregisterQuery(exec_state->query_id(),
false, &status);
746 return_val.__isset.operationHandle =
true;
747 return_val.operationHandle.__set_operationType(TOperationType::EXECUTE_STATEMENT);
748 return_val.operationHandle.__set_hasResultSet(exec_state->returns_result_set());
// The query id is passed as both arguments when building the operation handle id.
750 TUniqueIdToTHandleIdentifier(exec_state->query_id(), exec_state->query_id(),
751 &return_val.operationHandle.operationId);
752 return_val.status.__set_statusCode(
753 apache::hive::service::cli::thrift::TStatusCode::SUCCESS_STATUS);
755 VLOG_QUERY <<
"ExecuteStatement(): return_val=" << ThriftDebugString(return_val);
758 void ImpalaServer::GetTypeInfo(TGetTypeInfoResp& return_val,
759 const TGetTypeInfoReq& request) {
760 VLOG_QUERY <<
"GetTypeInfo(): request=" << ThriftDebugString(request);
762 TMetadataOpRequest req;
763 req.__set_opcode(TMetadataOpcode::GET_TYPE_INFO);
764 req.__set_get_type_info_req(request);
766 TOperationHandle handle;
767 thrift::TStatus status;
768 ExecuteMetadataOp(request.sessionHandle.sessionId, &req, &handle, &status);
769 handle.__set_operationType(TOperationType::GET_TYPE_INFO);
770 return_val.__set_operationHandle(handle);
771 return_val.__set_status(status);
773 VLOG_QUERY <<
"GetTypeInfo(): return_val=" << ThriftDebugString(return_val);
776 void ImpalaServer::GetCatalogs(TGetCatalogsResp& return_val,
777 const TGetCatalogsReq& request) {
778 VLOG_QUERY <<
"GetCatalogs(): request=" << ThriftDebugString(request);
780 TMetadataOpRequest req;
781 req.__set_opcode(TMetadataOpcode::GET_CATALOGS);
782 req.__set_get_catalogs_req(request);
784 TOperationHandle handle;
785 thrift::TStatus status;
786 ExecuteMetadataOp(request.sessionHandle.sessionId, &req, &handle, &status);
787 handle.__set_operationType(TOperationType::GET_CATALOGS);
788 return_val.__set_operationHandle(handle);
789 return_val.__set_status(status);
791 VLOG_QUERY <<
"GetCatalogs(): return_val=" << ThriftDebugString(return_val);
794 void ImpalaServer::GetSchemas(TGetSchemasResp& return_val,
795 const TGetSchemasReq& request) {
796 VLOG_QUERY <<
"GetSchemas(): request=" << ThriftDebugString(request);
798 TMetadataOpRequest req;
799 req.__set_opcode(TMetadataOpcode::GET_SCHEMAS);
800 req.__set_get_schemas_req(request);
802 TOperationHandle handle;
803 thrift::TStatus status;
804 ExecuteMetadataOp(request.sessionHandle.sessionId, &req, &handle, &status);
805 handle.__set_operationType(TOperationType::GET_SCHEMAS);
806 return_val.__set_operationHandle(handle);
807 return_val.__set_status(status);
809 VLOG_QUERY <<
"GetSchemas(): return_val=" << ThriftDebugString(return_val);
812 void ImpalaServer::GetTables(TGetTablesResp& return_val,
813 const TGetTablesReq& request) {
814 VLOG_QUERY <<
"GetTables(): request=" << ThriftDebugString(request);
816 TMetadataOpRequest req;
817 req.__set_opcode(TMetadataOpcode::GET_TABLES);
818 req.__set_get_tables_req(request);
820 TOperationHandle handle;
821 thrift::TStatus status;
822 ExecuteMetadataOp(request.sessionHandle.sessionId, &req, &handle, &status);
823 handle.__set_operationType(TOperationType::GET_TABLES);
824 return_val.__set_operationHandle(handle);
825 return_val.__set_status(status);
827 VLOG_QUERY <<
"GetTables(): return_val=" << ThriftDebugString(return_val);
830 void ImpalaServer::GetTableTypes(TGetTableTypesResp& return_val,
831 const TGetTableTypesReq& request) {
832 VLOG_QUERY <<
"GetTableTypes(): request=" << ThriftDebugString(request);
834 TMetadataOpRequest req;
835 req.__set_opcode(TMetadataOpcode::GET_TABLE_TYPES);
836 req.__set_get_table_types_req(request);
838 TOperationHandle handle;
839 thrift::TStatus status;
840 ExecuteMetadataOp(request.sessionHandle.sessionId, &req, &handle, &status);
841 handle.__set_operationType(TOperationType::GET_TABLE_TYPES);
842 return_val.__set_operationHandle(handle);
843 return_val.__set_status(status);
845 VLOG_QUERY <<
"GetTableTypes(): return_val=" << ThriftDebugString(return_val);
849 void ImpalaServer::GetColumns(TGetColumnsResp& return_val,
850 const TGetColumnsReq& request) {
851 VLOG_QUERY <<
"GetColumns(): request=" << ThriftDebugString(request);
853 TMetadataOpRequest req;
854 req.__set_opcode(TMetadataOpcode::GET_COLUMNS);
855 req.__set_get_columns_req(request);
857 TOperationHandle handle;
858 thrift::TStatus status;
859 ExecuteMetadataOp(request.sessionHandle.sessionId, &req, &handle, &status);
860 handle.__set_operationType(TOperationType::GET_COLUMNS);
861 return_val.__set_operationHandle(handle);
862 return_val.__set_status(status);
864 VLOG_QUERY <<
"GetColumns(): return_val=" << ThriftDebugString(return_val);
867 void ImpalaServer::GetFunctions(TGetFunctionsResp& return_val,
868 const TGetFunctionsReq& request) {
869 VLOG_QUERY <<
"GetFunctions(): request=" << ThriftDebugString(request);
871 TMetadataOpRequest req;
872 req.__set_opcode(TMetadataOpcode::GET_FUNCTIONS);
873 req.__set_get_functions_req(request);
875 TOperationHandle handle;
876 thrift::TStatus status;
877 ExecuteMetadataOp(request.sessionHandle.sessionId, &req, &handle, &status);
878 handle.__set_operationType(TOperationType::GET_FUNCTIONS);
879 return_val.__set_operationHandle(handle);
880 return_val.__set_status(status);
882 VLOG_QUERY <<
"GetFunctions(): return_val=" << ThriftDebugString(return_val);
// HS2 GetOperationStatus RPC. Reports the state of the operation named by
// 'request.operationHandle'.
// An operation handle with an empty guid is answered with FINISHED_STATE and
// SUCCESS_STATUS without any lookup. Otherwise the handle is decoded to a query id and
// looked up in query_exec_state_map_ under query_exec_state_map_lock_. If no entry is
// found the response is the error "Invalid query handle".
// NOTE(review): the early returns, the conversion of 'query_state' into
// 'operation_state' and the success-path status are not visible in this view.
885 void ImpalaServer::GetOperationStatus(TGetOperationStatusResp& return_val,
886 const TGetOperationStatusReq& request) {
887 if (request.operationHandle.operationId.guid.size() == 0) {
890 VLOG_ROW <<
"GetOperationStatus(): guid size 0";
891 return_val.operationState = TOperationState::FINISHED_STATE;
892 return_val.status.__set_statusCode(thrift::TStatusCode::SUCCESS_STATUS);
900 request.operationHandle.operationId, &query_id, &secret), SQLSTATE_GENERAL_ERROR);
903 lock_guard<mutex> l(query_exec_state_map_lock_);
904 QueryExecStateMap::iterator entry = query_exec_state_map_.find(query_id);
905 if (entry != query_exec_state_map_.end()) {
906 QueryState::type query_state = entry->second->query_state();
908 return_val.__set_operationState(operation_state);
913 HS2_RETURN_ERROR(return_val,
"Invalid query handle", SQLSTATE_GENERAL_ERROR);
// HS2 CancelOperation RPC. Decodes the operation handle to a query id and looks up its
// exec state; a missing exec state is reported as "Invalid query handle". On the
// success path the response status is SUCCESS_STATUS.
// NOTE(review): the session check and the actual cancellation call are not visible in
// this view; only their trailing SQLSTATE_GENERAL_ERROR argument is.
916 void ImpalaServer::CancelOperation(TCancelOperationResp& return_val,
917 const TCancelOperationReq& request) {
921 request.operationHandle.operationId, &query_id, &secret), SQLSTATE_GENERAL_ERROR);
924 shared_ptr<QueryExecState> exec_state = GetQueryExecState(query_id,
false);
925 if (exec_state.get() == NULL) {
927 HS2_RETURN_ERROR(return_val,
"Invalid query handle", SQLSTATE_GENERAL_ERROR);
930 const TUniqueId session_id = exec_state->session_id();
932 SQLSTATE_GENERAL_ERROR);
934 return_val.status.__set_statusCode(thrift::TStatusCode::SUCCESS_STATUS);
// Second signature line and body of the HS2 CloseOperation handler (the line above this
// comment, carrying the function name and the 'return_val' parameter, is not visible in
// this view).
// Decodes the operation handle to a query id and looks up its exec state; a missing exec
// state is reported as "Invalid query handle". On the success path the response status
// is SUCCESS_STATUS.
// NOTE(review): the session check and the call that actually closes the operation are
// not visible here; only their trailing SQLSTATE_GENERAL_ERROR arguments are.
938 const TCloseOperationReq& request) {
942 request.operationHandle.operationId, &query_id, &secret), SQLSTATE_GENERAL_ERROR);
945 shared_ptr<QueryExecState> exec_state = GetQueryExecState(query_id,
false);
946 if (exec_state.get() == NULL) {
948 HS2_RETURN_ERROR(return_val,
"Invalid query handle", SQLSTATE_GENERAL_ERROR);
951 const TUniqueId session_id = exec_state->session_id();
953 SQLSTATE_GENERAL_ERROR);
956 SQLSTATE_GENERAL_ERROR);
957 return_val.status.__set_statusCode(thrift::TStatusCode::SUCCESS_STATUS);
// HS2 GetResultSetMetadata RPC. Returns the result schema of the query named by
// 'request.operationHandle'.
// Both a failed session-id lookup and a missing exec state are reported as
// "Invalid query handle". When the result metadata has at least one column, the schema
// in 'return_val' is marked as set and gets one entry per column with its name, its
// zero-based position and a single HS2 type produced by ColumnType::ToHs2Type().
// NOTE(review): the session check between the two lookups is not visible in this view.
960 void ImpalaServer::GetResultSetMetadata(TGetResultSetMetadataResp& return_val,
961 const TGetResultSetMetadataReq& request) {
967 request.operationHandle.operationId, &query_id, &secret), SQLSTATE_GENERAL_ERROR);
972 TUniqueId session_id;
973 if (!GetSessionIdForQuery(query_id, &session_id)) {
974 HS2_RETURN_ERROR(return_val,
"Invalid query handle", SQLSTATE_GENERAL_ERROR);
978 SQLSTATE_GENERAL_ERROR);
980 shared_ptr<QueryExecState> exec_state = GetQueryExecState(query_id,
true);
981 if (exec_state.get() == NULL) {
982 VLOG_QUERY <<
"GetResultSetMetadata(): invalid query handle";
984 HS2_RETURN_ERROR(return_val,
"Invalid query handle", SQLSTATE_GENERAL_ERROR);
// adopt_lock_t: the guard takes over a mutex that is expected to be locked already and
// releases it on scope exit.
// NOTE(review): presumably GetQueryExecState(query_id, true) returns with the exec
// state's lock held -- confirm.
988 lock_guard<mutex> l(*exec_state->lock(), adopt_lock_t());
991 const TResultSetMetadata* result_set_md = exec_state->result_metadata();
992 DCHECK(result_set_md != NULL);
993 if (result_set_md->columns.size() > 0) {
994 return_val.__isset.schema =
true;
995 return_val.schema.columns.resize(result_set_md->columns.size());
996 for (
int i = 0; i < result_set_md->columns.size(); ++i) {
997 return_val.schema.columns[i].__set_columnName(
998 result_set_md->columns[i].columnName);
999 return_val.schema.columns[i].position = i;
1000 return_val.schema.columns[i].typeDesc.types.resize(1);
1001 ColumnType t(result_set_md->columns[i].columnType);
1002 return_val.schema.columns[i].typeDesc.types[0] = t.
ToHs2Type();
1007 return_val.status.__set_statusCode(thrift::TStatusCode::SUCCESS_STATUS);
1008 VLOG_QUERY <<
"GetResultSetMetadata(): return_val=" << ThriftDebugString(return_val);
// HS2 FetchResults RPC. Only the FETCH_NEXT and FETCH_FIRST orientations are accepted;
// any other orientation takes the branch that uses
// SQLSTATE_OPTIONAL_FEATURE_NOT_IMPLEMENTED. The fetch itself is delegated to
// FetchInternal() with 'request.maxRows' as the fetch size, and 'fetch_first' set when
// the orientation is FETCH_FIRST.
// NOTE(review): the error text for unsupported orientations, the handling of a failed
// FetchInternal() status (including the conditions guarding the DCHECK and the
// UnregisterQuery() call) are not visible in this view.
1011 void ImpalaServer::FetchResults(TFetchResultsResp& return_val,
1012 const TFetchResultsReq& request) {
1013 if (request.orientation != TFetchOrientation::FETCH_NEXT
1014 && request.orientation != TFetchOrientation::FETCH_FIRST) {
1016 SQLSTATE_OPTIONAL_FEATURE_NOT_IMPLEMENTED);
1018 bool fetch_first = request.orientation == TFetchOrientation::FETCH_FIRST;
1025 request.operationHandle.operationId, &query_id, &secret), SQLSTATE_GENERAL_ERROR);
1027 <<
" fetch_size=" << request.maxRows;
1030 Status status = FetchInternal(query_id, request.maxRows, fetch_first, &return_val);
// The row count logged here is return_val.results.rows.size().
1031 VLOG_ROW <<
"FetchResults(): #results=" << return_val.results.rows.size()
1032 <<
" has_more=" << (return_val.hasMoreRows ?
"true" :
"false");
1040 DCHECK(fetch_first);
1042 UnregisterQuery(query_id,
false, &status);
1046 return_val.status.__set_statusCode(thrift::TStatusCode::SUCCESS_STATUS);
// HS2 GetLog RPC. Builds a text log for the query named by 'request.operationHandle'
// and returns it in 'return_val.log'. A missing exec state is reported as
// "Invalid query handle".
// The log is assembled in this order: the coordinator's progress string followed by a
// newline (only if a coordinator exists), the analysis warnings joined by newlines, and
// the coordinator's error log (again only if a coordinator exists).
// NOTE(review): the session check and the declaration of 'ss' are not visible in this
// view.
1049 void ImpalaServer::GetLog(TGetLogResp& return_val,
const TGetLogReq& request) {
1053 request.operationHandle.operationId, &query_id, &secret), SQLSTATE_GENERAL_ERROR);
1055 shared_ptr<QueryExecState> exec_state = GetQueryExecState(query_id,
false);
1056 if (exec_state.get() == NULL) {
1058 HS2_RETURN_ERROR(return_val,
"Invalid query handle", SQLSTATE_GENERAL_ERROR);
1064 const TUniqueId session_id = exec_state->session_id();
1066 SQLSTATE_GENERAL_ERROR);
1069 if (exec_state->coord() != NULL) {
1071 ss << exec_state->coord()->progress().ToString() <<
"\n";
1074 ss << join(exec_state->GetAnalysisWarnings(),
"\n");
1075 if (exec_state->coord() != NULL) {
1077 ss << exec_state->coord()->GetErrorLog();
1079 return_val.log = ss.str();
1080 return_val.status.__set_statusCode(thrift::TStatusCode::SUCCESS_STATUS);
// HS2 GetExecSummary RPC. Decodes the session and operation handles from 'request',
// obtains the query's TExecSummary through the GetExecSummary(query_id, &summary)
// overload and returns it in 'return_val' with SUCCESS_STATUS.
// NOTE(review): the session lookup call and the check of 'status' after the summary is
// fetched are not visible in this view.
1083 void ImpalaServer::GetExecSummary(TGetExecSummaryResp& return_val,
1084 const TGetExecSummaryReq& request) {
1085 TUniqueId session_id;
1088 request.sessionHandle.sessionId, &session_id, &secret), SQLSTATE_GENERAL_ERROR);
1090 shared_ptr<SessionState> session;
1092 SQLSTATE_GENERAL_ERROR);
1096 request.operationHandle.operationId, &query_id, &secret), SQLSTATE_GENERAL_ERROR);
1098 TExecSummary summary;
1099 Status status = GetExecSummary(query_id, &summary);
1101 return_val.__set_summary(summary);
1102 return_val.status.__set_statusCode(thrift::TStatusCode::SUCCESS_STATUS);
// HS2 GetRuntimeProfile RPC. Decodes the session and operation handles from 'request'
// and returns the profile text accumulated in the stream 'ss' as 'return_val.profile',
// with SUCCESS_STATUS.
// NOTE(review): the session lookup call, the declaration of 'ss' and the call that
// writes the profile into it are not visible in this view.
1105 void ImpalaServer::GetRuntimeProfile(TGetRuntimeProfileResp& return_val,
1106 const TGetRuntimeProfileReq& request) {
1107 TUniqueId session_id;
1110 request.sessionHandle.sessionId, &session_id, &secret), SQLSTATE_GENERAL_ERROR);
1112 shared_ptr<SessionState> session;
1114 SQLSTATE_GENERAL_ERROR);
1118 request.operationHandle.operationId, &query_id, &secret), SQLSTATE_GENERAL_ERROR);
1122 SQLSTATE_GENERAL_ERROR);
1123 return_val.__set_profile(ss.str());
1124 return_val.status.__set_statusCode(thrift::TStatusCode::SUCCESS_STATUS);
1127 void ImpalaServer::GetDelegationToken(TGetDelegationTokenResp& return_val,
1128 const TGetDelegationTokenReq& req) {
1129 return_val.status.__set_statusCode(thrift::TStatusCode::ERROR_STATUS);
1130 return_val.status.__set_errorMessage(
"Not implemented");
1133 void ImpalaServer::CancelDelegationToken(TCancelDelegationTokenResp& return_val,
1134 const TCancelDelegationTokenReq& req) {
1135 return_val.status.__set_statusCode(thrift::TStatusCode::ERROR_STATUS);
1136 return_val.status.__set_errorMessage(
"Not implemented");
1139 void ImpalaServer::RenewDelegationToken(TRenewDelegationTokenResp& return_val,
1140 const TRenewDelegationTokenReq& req) {
1141 return_val.status.__set_statusCode(thrift::TStatusCode::ERROR_STATUS);
1142 return_val.status.__set_errorMessage(
"Not implemented");
// Body of the query-state translation: maps a beeswax QueryState to the HS2
// TOperationState reported to clients.
//   CREATED   -> INITIALIZED_STATE
//   RUNNING   -> RUNNING_STATE
//   FINISHED  -> FINISHED_STATE
//   EXCEPTION -> ERROR_STATE
//   any other -> UKNOWN_STATE
// NOTE(review): 'UKNOWN_STATE' is the identifier exactly as this code references it;
// check the enum definition before "correcting" the spelling.
// NOTE(review): the signature line is not visible in this view.
1146 switch (query_state) {
1147 case QueryState::CREATED:
return TOperationState::INITIALIZED_STATE;
1148 case QueryState::RUNNING:
return TOperationState::RUNNING_STATE;
1149 case QueryState::FINISHED:
return TOperationState::FINISHED_STATE;
1150 case QueryState::EXCEPTION:
return TOperationState::ERROR_STATE;
1151 default:
return TOperationState::UKNOWN_STATE;
void TColumnValueToHS2TColumnValue(const TColumnValue &col_val, const TColumnType &type, apache::hive::service::cli::thrift::TColumnValue *hs2_col_val)
For V1->V5.
virtual size_t size()
Returns the size of this result set in number of rows.
void StitchNulls(uint32_t num_rows_before, uint32_t num_rows_added, uint32_t start_idx, const std::string &from, std::string *to)
TOperationState::type QueryStateToTOperationState(const QueryState::type &query_state)
void TQueryOptionsToMap(const TQueryOptions &query_options, std::map< std::string, std::string > *configuration)
Converts a TQueryOptions struct into a map of key, value pairs.
const std::string GetDetail() const
const TProtocolVersion::type MAX_SUPPORTED_HS2_VERSION
bool ParseId(const string &s, TUniqueId *id)
static uint32_t TColumnByteSize(const thrift::TColumn &col, uint32_t start_idx, uint32_t end_idx)
const TUniqueId & query_id() const
#define RETURN_IF_ERROR(stmt)
some generally useful macros
std::string Username
Username.
PrimitiveType ThriftToType(TPrimitiveType::type ttype)
virtual Status AddOneRow(const TResultRow &row)
static boost::uuids::random_generator uuid_generator_
HS2RowOrientedResultSet(const TResultSetMetadata &metadata, TRowSet *rowset=NULL)
string PrintId(const TUniqueId &id, const string &separator)
const TResultSetMetadata & metadata_
HS2ColumnarResultSet(const TResultSetMetadata &metadata, TRowSet *rowset=NULL)
virtual Status AddOneRow(const vector< void * > &col_values, const vector< int > &scales)
string TypeToString(PrimitiveType t)
virtual Status AddOneRow(const vector< void * > &col_values, const vector< int > &scales)
void CloseOperation(apache::hive::service::cli::thrift::TFetchResultsReq &fetchResultsReq, TCLIServiceClient &impalaClient)
Method to close a fetch operation handle.
virtual size_t size()
Returns the size of this result set in number of rows.
const string IMPALA_RESULT_CACHING_OPT
static int64_t ByteSize(const thrift::TRow &row)
Status WithSession(const TUniqueId &session_id, boost::shared_ptr< SessionState > *session=NULL)
static int64_t ByteSize(const thrift::TColumnValue &val)
virtual int AddRows(const QueryResultSet *other, int start_idx, int num_rows)
void TColumnValueToHS2TColumn(const TColumnValue &col_val, const TColumnType &type, uint32_t row_idx, apache::hive::service::cli::thrift::TColumn *column)
For V6->
apache::hive::service::cli::thrift::TTypeEntry ToHs2Type() const
scoped_ptr< TRowSet > owned_result_set_
virtual Status AddOneRow(const TResultRow &row)
virtual int64_t ByteSize(int start_idx, int num_rows)
Returns the approximate size of the given range of rows in bytes.
void UUIDToTUniqueId(const boost::uuids::uuid &uuid, T *unique_id)
const TResultSetMetadata & metadata_
void ExprValueToHS2TColumn(const void *value, const TColumnType &type, uint32_t row_idx, apache::hive::service::cli::thrift::TColumn *column)
For V6->
#define HS2_RETURN_ERROR(return_val, error_msg, error_state)
#define IMPALA_BUILD_VERSION
virtual ~HS2ColumnarResultSet()
virtual ~HS2RowOrientedResultSet()
scoped_ptr< TRowSet > owned_result_set_
void CancelInternal()
Runs cancel logic. Assumes that lock_ is held.
void SetQueryOption(TImpalaQueryOptions::type opt, const T &opt_val, TExecuteStatementReq *exec_stmt_req)
virtual int64_t ByteSize(int start_idx, int num_rows)
Returns the approximate size of the given range of rows in bytes.
virtual int AddRows(const QueryResultSet *other, int start_idx, int num_rows)
#define HS2_RETURN_IF_ERROR(return_val, status, error_state)
void ExprValueToHS2TColumnValue(const void *value, const TColumnType &type, apache::hive::service::cli::thrift::TColumnValue *hs2_col_val)
For V1->V5.
bool IsRecoverableError() const