19 #include <boost/algorithm/string/join.hpp>
20 #include <boost/algorithm/string/replace.hpp>
21 #include <boost/filesystem.hpp>
22 #include <boost/date_time/posix_time/posix_time_types.hpp>
23 #include <boost/unordered_set.hpp>
24 #include <boost/foreach.hpp>
25 #include <boost/bind.hpp>
26 #include <boost/algorithm/string.hpp>
27 #include <boost/lexical_cast.hpp>
28 #include <google/malloc_extension.h>
29 #include <gutil/strings/substitute.h>
30 #include <openssl/evp.h>
31 #include <openssl/err.h>
32 #include <rapidjson/rapidjson.h>
33 #include <rapidjson/stringbuffer.h>
34 #include <rapidjson/writer.h>
35 #include <sys/types.h>
36 #include <sys/socket.h>
72 #include "gen-cpp/Types_types.h"
73 #include "gen-cpp/ImpalaService.h"
74 #include "gen-cpp/DataSinks_types.h"
75 #include "gen-cpp/ImpalaService_types.h"
76 #include "gen-cpp/ImpalaInternalService.h"
80 using boost::adopt_lock_t;
81 using boost::algorithm::is_any_of;
82 using boost::algorithm::istarts_with;
83 using boost::algorithm::replace_all_copy;
84 using boost::algorithm::split;
85 using boost::algorithm::token_compress_on;
86 using boost::uuids::random_generator;
87 using boost::uuids::uuid;
88 using namespace apache::thrift;
89 using namespace beeswax;
90 using namespace strings;
99 DEFINE_int32(beeswax_port, 21000, "port on which Beeswax client requests are served");
100 DEFINE_int32(hs2_port, 21050, "port on which HiveServer2 client requests are served");
103     "number of threads available to serve client requests");
105     "(Advanced) number of threads available to serve backend execution requests");
106 DEFINE_string(default_query_options, "", "key=value pair of default query options for"
107     " impalad, separated by ','");
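// Illustrative usage (option names here are examples only, not an exhaustive list):
//   impalad -default_query_options='MEM_LIMIT=2147483648,EXPLAIN_LEVEL=2'
// Each comma-separated KEY=VALUE pair presumably ends up in the server-wide default
// TQueryOptions and in the ConfigVariable list built in InitializeConfigVariables() below.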
108 DEFINE_int32(query_log_size, 25, "Number of queries to retain in the query log. If -1, "
109     "the query log has unbounded size.");
110 DEFINE_bool(log_query_to_file, true, "if true, logs completed query profiles to file.");
112 DEFINE_int64(max_result_cache_size, 100000L, "Maximum number of query results a client "
113     "may request to be cached on a per-query basis to support restarting fetches. This "
114     "option guards against unreasonably large result caches requested by clients. "
115     "Requests exceeding this maximum will be rejected.");
117 DEFINE_int32(max_audit_event_log_file_size, 5000, "The maximum size (in queries) of the "
118     "audit event log file before a new one is created (if event logging is enabled)");
119 DEFINE_string(audit_event_log_dir, "", "The directory in which audit event log files are "
120     "written. Setting this flag will enable audit event logging.");
121 DEFINE_bool(abort_on_failed_audit_event, true, "Shutdown Impala if there is a problem "
122     "recording an audit event.");
124 DEFINE_int32(max_lineage_log_file_size, 5000, "The maximum size (in queries) of "
125     "the lineage event log file before a new one is created (if lineage logging is "
127 DEFINE_string(lineage_event_log_dir, "", "The directory in which lineage event log "
128     "files are written. Setting this flag will enable lineage logging.");
129 DEFINE_bool(abort_on_failed_lineage_event, true, "Shutdown Impala if there is a problem "
130     "recording a lineage record.");
132 DEFINE_string(profile_log_dir, "", "The directory in which profile log files are"
133     " written. If blank, defaults to <log_file_dir>/profiles");
134 DEFINE_int32(max_profile_log_file_size, 5000, "The maximum size (in queries) of the "
135     "profile log file before a new one is created");
138 "(Advanced) Size of the thread-pool processing cancellations due to node failure");
140 DEFINE_string(ssl_server_certificate, "", "The full path to the SSL certificate file used"
141     " to authenticate Impala to clients. If set, both Beeswax and HiveServer2 ports will "
142     "only accept SSL connections");
143 DEFINE_string(ssl_private_key, "", "The full path to the private key used as a "
144     "counterpart to the public key contained in --ssl_server_certificate. If "
145     "--ssl_server_certificate is set, this option must be set as well.");
146 DEFINE_string(ssl_client_ca_certificate, "", "(Advanced) The full path to a certificate "
147     "used by Thrift clients to check the validity of a server certificate. May either be "
148     "a certificate for a third-party Certificate Authority, or a copy of the certificate "
149     "the client expects to receive from the server.");
151 DEFINE_int32(idle_session_timeout, 0, "The time, in seconds, that a session may be idle"
152     " for before it is closed (and all running queries cancelled) by Impala. If 0, idle"
153     " sessions are never expired.");
154 DEFINE_int32(idle_query_timeout, 0, "The time, in seconds, that a query may be idle for"
155     " (i.e. no processing work is done and no updates are received from the client) "
156     "before it is cancelled. If 0, idle queries are never expired. The query option "
157     "QUERY_TIMEOUT_S overrides this setting, but, if set, --idle_query_timeout represents"
158     " the maximum allowable timeout.");
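// The interaction with the QUERY_TIMEOUT_S query option is visible in SetQueryInflight()
// and ExpireQueries() below: when both values are non-zero the smaller one wins (e.g.
// --idle_query_timeout=600 with QUERY_TIMEOUT_S=60 yields a 60s idle timeout); otherwise
// the non-zero value, if any, is used.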
160 DEFINE_string(local_nodemanager_url, "", "The URL of the local Yarn Node Manager's HTTP "
161     "interface, used to detect if the Node Manager fails");
184 const char* ImpalaServer::SQLSTATE_SYNTAX_ERROR_OR_ACCESS_VIOLATION = "42000";
185 const char* ImpalaServer::SQLSTATE_GENERAL_ERROR = "HY000";
186 const char* ImpalaServer::SQLSTATE_OPTIONAL_FEATURE_NOT_IMPLEMENTED = "HYC00";
187 const int ImpalaServer::ASCII_PRECISION = 16;
195     : query_id_(query_id), cause_(cause), unregister_(unregister) {
234 if (FLAGS_abort_on_config_error) {
235 LOG(ERROR) << "Aborting Impala Server startup due to improper configuration";
243 if (FLAGS_abort_on_config_error) {
244 LOG(ERROR) << "Aborting Impala Server startup due to improperly "
245     << "configured scratch directories.";
251 LOG(ERROR) << "Query profile archival is disabled";
252 FLAGS_log_query_to_file = false;
256 LOG(ERROR) << "Aborting Impala Server startup due to failure initializing "
257     << "audit event logging";
262 LOG(ERROR) << "Aborting Impala Server startup due to failure initializing "
263     << "lineage logging";
267 if (!FLAGS_authorized_proxy_user_config.empty()) {
271 vector<string> proxy_user_config;
272 split(proxy_user_config, FLAGS_authorized_proxy_user_config, is_any_of(";"),
274 if (proxy_user_config.size() > 0) {
275 BOOST_FOREACH(const string& config, proxy_user_config) {
276 size_t pos = config.find("=");
277 if (pos == string::npos) {
278 LOG(ERROR) << "Invalid proxy user configuration. No mapping value specified "
279     << "for the proxy user. For more information review usage of the "
280     << "--authorized_proxy_user_config flag: " << config;
283 string proxy_user = config.substr(0, pos);
284 string config_str = config.substr(pos + 1);
285 vector<string> parsed_allowed_users;
286 split(parsed_allowed_users, config_str, is_any_of(","), token_compress_on);
287 unordered_set<string> allowed_users(parsed_allowed_users.begin(),
288     parsed_allowed_users.end());
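// The flag therefore has the shape (user names purely illustrative):
//   --authorized_proxy_user_config="hue=alice,bob;admin=*"
// i.e. ';'-separated proxy users, each mapped with '=' to a ','-separated list of users
// they may delegate for, where '*' allows any user (checked in AuthorizeProxyUser() below).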
294 if (FLAGS_disk_spill_encryption) {
298 OpenSSL_add_all_algorithms();
299 ERR_load_crypto_strings();
327     "impala-server", "cancellation-worker",
331 if (FLAGS_idle_session_timeout > 0) {
340 if (FLAGS_enable_rm) {
349 if (!request.__isset.query_exec_request && !request.__isset.catalog_op_request) {
352 string lineage_graph;
353 if (request.__isset.query_exec_request &&
354 request.query_exec_request.__isset.lineage_graph) {
355 lineage_graph = request.query_exec_request.lineage_graph;
356 } else if (request.__isset.catalog_op_request &&
357     request.catalog_op_request.__isset.lineage_graph) {
358 lineage_graph = request.catalog_op_request.lineage_graph;
362 DCHECK(!lineage_graph.empty());
365 LOG(ERROR) << "Unable to record query lineage record: " << status.GetDetail();
366 if (FLAGS_abort_on_failed_lineage_event) {
367 LOG(ERROR) << "Shutting down Impala Server due to abort_on_failed_lineage_event=true";
375 return !FLAGS_lineage_event_log_dir.empty();
380 LOG(INFO) << "Lineage logging is disabled";
392 const TExecRequest& request) {
394 rapidjson::StringBuffer buffer;
395 rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
397 writer.StartObject();
400 writer.String(ss.str().c_str());
401 writer.StartObject();
402 writer.String("query_id");
404 writer.String("session_id");
406 writer.String("start_time");
408 writer.String("authorization_failure");
410 writer.String("status");
412 writer.String("user");
414 writer.String("impersonator");
422 writer.String("statement_type");
423 if (request.stmt_type == TStmtType::DDL) {
424 if (request.catalog_op_request.op_type == TCatalogOpType::DDL) {
426 PrintTDdlType(request.catalog_op_request.ddl_params.ddl_type).c_str());
433 writer.String("network_address");
436 writer.String("sql_statement");
437 string stmt = replace_all_copy(exec_state.sql_stmt(), "\n", " ");
439 writer.String(stmt.c_str());
440 writer.String("catalog_objects");
442 BOOST_FOREACH(const TAccessEvent& event, request.access_events) {
443 writer.StartObject();
444 writer.String("name");
445 writer.String(event.name.c_str());
446 writer.String("object_type");
448 writer.String("privilege");
449 writer.String(event.privilege.c_str());
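// Each TAccessEvent is thus emitted as a JSON object with "name", "object_type" and
// "privilege" fields under the "catalog_objects" key of the audit record.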
457 LOG(ERROR) << "Unable to record audit event record: " << status.GetDetail();
458 if (FLAGS_abort_on_failed_audit_event) {
459 LOG(ERROR) << "Shutting down Impala Server due to abort_on_failed_audit_event=true";
467 return !FLAGS_audit_event_log_dir.empty();
472 LOG(INFO) << "Event logging is disabled";
485 bool log_events = true;
487 case TStmtType::QUERY: {
490 if (!status.ok() && !exec_state.fetched_rows()) log_events = false;
493 case TStmtType::DML: {
494 if (!status.ok()) log_events = false;
497 case TStmtType::DDL: {
501 if (!status.ok()) log_events = false;
504 if (!status.ok() && !exec_state.fetched_rows()) log_events = false;
508 case TStmtType::EXPLAIN:
509 case TStmtType::LOAD:
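// Net effect of the checks above: audit/lineage events are skipped for queries and
// catalog operations that failed before any rows were fetched, and for DML whose status
// is not OK; the EXPLAIN and LOAD case bodies are elided from this excerpt.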
525 if (!FLAGS_log_query_to_file) return Status::OK;
527 if (FLAGS_profile_log_dir.empty()) {
529 ss << FLAGS_log_dir << "/profiles/";
530 FLAGS_profile_log_dir = ss.str();
542 bool base64_encoded, stringstream* output) {
543 DCHECK(output != NULL);
549 if (base64_encoded) {
550 exec_state->second->profile().SerializeToArchiveString(output);
552 exec_state->second->profile().PrettyPrint(output);
561 QueryLogIndex::const_iterator query_record = query_log_index_.find(query_id);
564 ss << "Query id " << PrintId(query_id) << " not found.";
567 if (base64_encoded) {
568 (*output) << query_record->second->encoded_profile_str;
570 (*output) << query_record->second->profile_str;
580 if (exec_state != NULL) {
581 lock_guard<mutex> l(*exec_state->lock(), adopt_lock_t());
582 if (exec_state->coord() != NULL) {
583 lock_guard<SpinLock> lock(exec_state->coord()->GetExecSummaryLock());
584 *result = exec_state->coord()->exec_summary();
593 QueryLogIndex::const_iterator query_record = query_log_index_.find(query_id);
596 ss << "Query id " << PrintId(query_id) << " not found.";
599 *result = query_record->second->exec_summary;
616 LOG(ERROR) << "Error flushing audit event log: " << status.GetDetail();
617 if (FLAGS_abort_on_failed_audit_event) {
618 LOG(ERROR) << "Shutting down Impala Server due to "
619     << "abort_on_failed_audit_event=true";
631 LOG(ERROR) << "Error flushing lineage event log: " << status.GetDetail();
632 if (FLAGS_abort_on_failed_lineage_event) {
633 LOG(ERROR) << "Shutting down Impala Server due to "
634     << "abort_on_failed_lineage_event=true";
646 if (FLAGS_log_query_to_file) {
651 LOG_EVERY_N(WARNING, 1000) << "Could not write to profile log file ("
652     << google::COUNTER << " attempts failed): "
654 LOG_EVERY_N(WARNING, 1000)
655     << "Disable query logging with --log_query_to_file=false";
659 if (FLAGS_query_log_size == 0) return;
661 if (query.coord() != NULL) {
662 lock_guard<SpinLock> lock(query.coord()->GetExecSummaryLock());
670 if (FLAGS_query_log_size > -1 && FLAGS_query_log_size < query_log_.size()) {
671 DCHECK_EQ(query_log_.size() - FLAGS_query_log_size, 1);
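// ArchiveQuery() keeps at most FLAGS_query_log_size records: the DCHECK above asserts the
// log can only ever be one entry over the limit, after which the oldest record is
// presumably evicted (the eviction itself is elided here). A value of 0 disables
// archiving via the early return, and -1 leaves the log unbounded per the flag help text.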
681 shared_ptr<SessionState> session_state,
682 shared_ptr<QueryExecState>* exec_state) {
684 bool registered_exec_state;
688 string stmt = replace_all_copy(query_ctx->request.stmt, "\n", " ");
690 query_ctx->request.__set_redacted_stmt((const string) stmt);
694 if (!status.ok() && registered_exec_state) {
701 const TQueryCtx& query_ctx,
702 shared_ptr<SessionState> session_state,
703 bool* registered_exec_state,
704 shared_ptr<QueryExecState>* exec_state) {
705 DCHECK(session_state != NULL);
706 *registered_exec_state = false;
708 return Status("This Impala server is offline. Please retry your query later.");
711 this, session_state));
713 (*exec_state)->query_events()->MarkEvent("Start execution");
728 lock_guard<mutex> l(*(*exec_state)->lock());
734 *registered_exec_state = true;
738 (*exec_state)->query_events()->MarkEvent("Planning finished");
739 (*exec_state)->summary_profile()->AddEventSequence(
740 result.timeline.name, result.timeline);
741 if (result.__isset.result_set_metadata) {
742 (*exec_state)->set_result_metadata(result.result_set_metadata);
745 VLOG(2) << "Execution request: " << ThriftDebugString(result);
749 if (result.stmt_type == TStmtType::DDL) {
756 if ((*exec_state)->coord() != NULL) {
757 const unordered_set<TNetworkAddress>& unique_hosts =
758 (*exec_state)->schedule()->unique_hosts();
759 if (!unique_hosts.empty()) {
761 BOOST_FOREACH(const TNetworkAddress& port, unique_hosts) {
770 query_ctx->__set_pid(getpid());
778 random_generator uuid_generator;
779 uuid query_uuid = uuid_generator();
784 const shared_ptr<QueryExecState>& exec_state) {
785 lock_guard<mutex> l2(session_state->lock);
788 DCHECK(session_state->ref_count > 0 && !session_state->expired);
790 if (session_state->closed) return Status("Session has been closed, ignoring query.");
791 const TUniqueId& query_id = exec_state->query_id();
799 ss << "query id " << PrintId(query_id) << " already exists";
808 const shared_ptr<QueryExecState>& exec_state) {
809 const TUniqueId& query_id = exec_state->query_id();
810 lock_guard<mutex> l(session_state->lock);
813 DCHECK_GT(session_state->ref_count, 0);
814 DCHECK(!session_state->expired);
816 if (session_state->closed) return Status("Session closed");
818 session_state->inflight_queries.insert(query_id);
820 int32_t timeout_s = exec_state->query_options().query_timeout_s;
821 if (FLAGS_idle_query_timeout > 0 && timeout_s > 0) {
822 timeout_s = min(FLAGS_idle_query_timeout, timeout_s);
825 timeout_s = max(FLAGS_idle_query_timeout, timeout_s);
833     make_pair(UnixMillis() + (1000L * timeout_s), query_id));
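// Worked example of the timeout resolution above: with --idle_query_timeout=600 and the
// query option QUERY_TIMEOUT_S=60, timeout_s = min(600, 60) = 60; if either value is 0,
// max() simply picks whichever is non-zero (0 meaning "never expire"). The query is then
// queued with an absolute expiration timestamp of UnixMillis() + 1000 * timeout_s.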
844 shared_ptr<QueryExecState> exec_state;
849 return Status("Invalid or unknown query handle");
851 exec_state = entry->second;
860 lock_guard<mutex> l(exec_state->session()->lock);
861 exec_state->session()->inflight_queries.erase(query_id);
864 if (exec_state->coord() != NULL) {
867 lock_guard<SpinLock> lock(exec_state->coord()->GetExecSummaryLock());
868 const TExecSummary& summary = exec_state->coord()->exec_summary();
871 exec_state->summary_profile()->AddInfoString("ExecSummary", exec_summary);
873 const unordered_set<TNetworkAddress>& unique_hosts =
874     exec_state->schedule()->unique_hosts();
875 if (!unique_hosts.empty()) {
877 BOOST_FOREACH(const TNetworkAddress& hostport, unique_hosts) {
884 it->second.erase(exec_state->query_id());
894 TGetDbsResult db_names;
898 BOOST_FOREACH(const string& db, db_names.dbs) {
899 TGetTablesResult table_names;
911 if (exec_state == NULL) return Status("Invalid or unknown query handle");
912 lock_guard<mutex> l(*exec_state->lock(), adopt_lock_t());
913 if (check_inflight) {
914 lock_guard<mutex> l2(exec_state->session()->lock);
915 if (exec_state->session()->inflight_queries.find(query_id) ==
916     exec_state->session()->inflight_queries.end()) {
917 return Status("Query not yet running");
921 exec_state->Cancel(cause);
926 bool ignore_if_absent) {
928 shared_ptr<SessionState> session_state;
933 if (ignore_if_absent) {
936 return Status("Invalid session ID");
939 session_state = entry->second;
942 DCHECK(session_state != NULL);
943 if (session_state->session_type == TSessionType::BEESWAX) {
948 unordered_set<TUniqueId> inflight_queries;
950 lock_guard<mutex> l(session_state->lock);
951 DCHECK(!session_state->closed);
952 session_state->closed = true;
954 inflight_queries.insert(session_state->inflight_queries.begin(),
955     session_state->inflight_queries.end());
958 Status status("Session closed");
959 BOOST_FOREACH(const TUniqueId& query_id, inflight_queries) {
966     shared_ptr<SessionState>* session_state, bool mark_active) {
970 *session_state = boost::shared_ptr<SessionState>();
971 return Status("Invalid session id");
974 lock_guard<mutex> session_lock(i->second->lock);
975 if (i->second->expired) {
977 ss << "Client session expired due to more than " << FLAGS_idle_session_timeout
978     << "s of inactivity (last activity was at: "
982 if (i->second->closed) return Status("Session is closed");
983 ++i->second->ref_count;
985 *session_state = i->second;
991     TReportExecStatusResult& return_val, const TReportExecStatusParams& params) {
992 VLOG_FILE << "ReportExecStatus() query_id=" << params.query_id
993     << " backend#=" << params.backend_num
994     << " instance_id=" << params.fragment_instance_id
995     << " done=" << (params.done ? "true" : "false");
1000 shared_ptr<QueryExecState> exec_state = GetQueryExecState(params.query_id, false);
1005 if (exec_state.get() == NULL) {
1006 return_val.status.__set_status_code(TErrorCode::INTERNAL_ERROR);
1007 const string& err = Substitute("ReportExecStatus(): Received report for unknown "
1008     "query ID (probably closed or cancelled). (query_id: $0, backend: $1, instance:"
1009     " $2 done: $3)", PrintId(params.query_id), params.backend_num,
1010     PrintId(params.fragment_instance_id), params.done);
1011 return_val.status.error_msgs.push_back(err);
1015 exec_state->coord()->UpdateFragmentExecStatus(params).SetTStatus(&return_val);
1019     TTransmitDataResult& return_val, const TTransmitDataParams& params) {
1020 VLOG_ROW << "TransmitData(): instance_id=" << params.dest_fragment_instance_id
1021     << " node_id=" << params.dest_node_id
1022     << " #rows=" << params.row_batch.num_rows
1023     << " sender_id=" << params.sender_id
1024     << " eos=" << (params.eos ? "true" : "false");
1027 if (params.row_batch.num_rows > 0) {
1029 params.dest_fragment_instance_id, params.dest_node_id, params.row_batch,
1040 params.dest_fragment_instance_id, params.dest_node_id,
1049 LOG(ERROR) << "Invalid default query options. Please check -default_query_options.\n"
1055 map<string, string> string_map;
1057 map<string, string>::const_iterator itr = string_map.begin();
1058 for (; itr != string_map.end(); ++itr) {
1059 ConfigVariable option;
1060 option.__set_key(itr->first);
1061 option.__set_value(itr->second);
1064 ConfigVariable support_start_over;
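// The ConfigVariable set up below advertises "support_start_over" = "false" to Beeswax
// clients, i.e. result fetches cannot be restarted from the beginning; it is presumably
// appended to default_configs_ together with the options parsed above.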
1065 support_start_over.__set_key("support_start_over");
1066 support_start_over.__set_value("false");
1071 TSessionState* state) {
1072 lock_guard<mutex> l(lock);
1073 state->session_id = session_id;
1087     &cancellation_work.cause());
1094     &cancellation_work.cause());
1097     << ") did not succeed: " << status.GetDetail();
1104 return Status("Unable to delegate using empty proxy username.");
1105 } else if (user.empty()) {
1106 return Status("Unable to delegate using empty doAs username.");
1109 stringstream error_msg;
1110 error_msg << "User '" << user << "' is not authorized to delegate to '"
1111     << do_as_user << "'.";
1113 error_msg << " User delegation is disabled.";
1114 return Status(error_msg.str());
1119 size_t end_idx = min(user.find("/"), user.find("@"));
1123 end_idx == string::npos || end_idx == 0 ? user : user.substr(0, end_idx));
1127 ProxyUserMap::const_iterator proxy_user =
1130 BOOST_FOREACH(const string& user, proxy_user->second) {
1131 if (user == "*" || user == do_as_user) return Status::OK;
1134 return Status(error_msg.str());
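// Summary of the check above: the short user name (everything before the first '/' or
// '@', which strips a Kerberos principal's host/realm part) is looked up in
// authorized_proxy_user_config_, and delegation succeeds only if the doAs user appears in
// that entry's allowed list or the list contains "*".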
1139 vector<TTopicDelta>* subscriber_topic_updates) {
1140 StatestoreSubscriber::TopicDeltaMap::const_iterator topic =
1142 if (topic == incoming_topic_deltas.end()) return;
1143 const TTopicDelta& delta = topic->second;
1147 if (delta.topic_entries.size() != 0 || delta.topic_deletions.size() != 0) {
1148 TUpdateCatalogCacheRequest update_req;
1149 update_req.__set_is_delta(delta.is_delta);
1153 BOOST_FOREACH(const TTopicItem& item, delta.topic_entries) {
1154 uint32_t len = item.value.size();
1155 TCatalogObject catalog_object;
1157 item.value.data()), &len, FLAGS_compact_catalog_topic, &catalog_object);
1159 LOG(ERROR) << "Error deserializing item: " << status.GetDetail();
1162 if (catalog_object.type == TCatalogObjectType::CATALOG) {
1163 update_req.__set_catalog_service_id(catalog_object.catalog.catalog_service_id);
1164 new_catalog_version = catalog_object.catalog_version;
1168 if (catalog_object.type == TCatalogObjectType::FUNCTION) {
1169 DCHECK(catalog_object.__isset.fn);
1172 if (catalog_object.type == TCatalogObjectType::DATA_SOURCE) {
1173 DCHECK(catalog_object.__isset.data_source);
1177 update_req.updated_objects.push_back(catalog_object);
1184 vector<TCatalogObject> dropped_objects;
1188 BOOST_FOREACH(const string& key, delta.topic_deletions) {
1189 LOG(INFO) << "Catalog topic entry deletion: " << key;
1190 TCatalogObject catalog_object;
1193 LOG(ERROR) << "Error parsing catalog topic entry deletion key: " << key << " "
1197 update_req.removed_objects.push_back(catalog_object);
1198 if (catalog_object.type == TCatalogObjectType::FUNCTION ||
1199 catalog_object.type == TCatalogObjectType::DATA_SOURCE) {
1200 TCatalogObject dropped_object;
1202     catalog_object, &dropped_object).ok()) {
1206 if (dropped_object.catalog_version <= new_catalog_version) {
1207 if (catalog_object.type == TCatalogObjectType::FUNCTION ||
1208 catalog_object.type == TCatalogObjectType::DATA_SOURCE) {
1209 dropped_objects.push_back(dropped_object);
1218 TUpdateCatalogCacheResponse resp;
1221 LOG(ERROR) << "There was an error processing the impalad catalog update. Requesting"
1222     << " a full topic update to recover: " << s.GetDetail();
1223 subscriber_topic_updates->push_back(TTopicDelta());
1224 TTopicDelta& update = subscriber_topic_updates->back();
1226 update.__set_from_version(0L);
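// Pushing an empty TTopicDelta with from_version = 0 asks the statestore for a full
// (non-delta) catalog topic update on a subsequent heartbeat, presumably so the local
// catalog cache can be rebuilt after the failed incremental update.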
1242 BOOST_FOREACH(TCatalogObject& object, dropped_objects) {
1243 if (object.type == TCatalogObjectType::FUNCTION) {
1245 } else if (object.type == TCatalogObjectType::DATA_SOURCE) {
1263     const TCatalogUpdateResult& catalog_update_result, bool wait_for_all_subscribers) {
1267 if ((catalog_update_result.__isset.updated_catalog_object ||
1268 catalog_update_result.__isset.removed_catalog_object)) {
1269 TUpdateCatalogCacheRequest update_req;
1270 update_req.__set_is_delta(true);
1271 update_req.__set_catalog_service_id(catalog_update_result.catalog_service_id);
1273 if (catalog_update_result.__isset.updated_catalog_object) {
1274 update_req.updated_objects.push_back(catalog_update_result.updated_catalog_object);
1276 if (catalog_update_result.__isset.removed_catalog_object) {
1277 update_req.removed_objects.push_back(catalog_update_result.removed_catalog_object);
1280 TUpdateCatalogCacheResponse resp;
1282 if (!status.ok()) LOG(ERROR) << status.GetDetail();
1284 if (!wait_for_all_subscribers) return Status::OK;
1288 int64_t min_req_catalog_version = catalog_update_result.version;
1289 const TUniqueId& catalog_service_id = catalog_update_result.catalog_service_id;
1293 VLOG_QUERY << "Waiting for catalog version: " << min_req_catalog_version
1300 if (!wait_for_all_subscribers) return Status::OK;
1307 VLOG_QUERY << "Waiting for min subscriber topic version: "
1308     << min_req_subscriber_topic_version << " current version: "
1310 while (min_subscriber_catalog_topic_version_ < min_req_subscriber_topic_version &&
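// When wait_for_all_subscribers is true, ProcessCatalogUpdateResult() therefore blocks
// twice: first until this impalad has applied the required catalog version, then (in the
// loop above) until the statestore reports that every subscriber has reached
// min_req_subscriber_topic_version for the catalog topic.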
1319 vector<TTopicDelta>* subscriber_topic_updates) {
1322 StatestoreSubscriber::TopicDeltaMap::const_iterator topic =
1325 if (topic != incoming_topic_deltas.end()) {
1326 const TTopicDelta& delta = topic->second;
1332 BOOST_FOREACH(const TTopicItem& item, delta.topic_entries) {
1333 uint32_t len = item.value.size();
1334 TBackendDescriptor backend_descriptor;
1336     item.value.data()), &len, false, &backend_descriptor);
1338 VLOG(2) << "Error deserializing topic item with key: " << item.key;
1342 known_backends_.insert(make_pair(item.key, backend_descriptor.address));
1345 BOOST_FOREACH(const string& backend_id, delta.topic_deletions) {
1351 set<TNetworkAddress> current_membership;
1352 BOOST_FOREACH(const BackendAddressMap::value_type& backend, known_backends_) {
1353 current_membership.insert(backend.second);
1358 map<TUniqueId, vector<TNetworkAddress> > queries_to_cancel;
1366 if (current_membership.find(loc_entry->first) == current_membership.end()) {
1367 unordered_set<TUniqueId>::const_iterator query_id = loc_entry->second.begin();
1369 for (; query_id != loc_entry->second.end(); ++query_id) {
1370 vector<TNetworkAddress>& failed_hosts = queries_to_cancel[*query_id];
1371 failed_hosts.push_back(loc_entry->first);
1377 QueryLocations::const_iterator failed_backend = loc_entry;
1390 LOG_EVERY_N(WARNING, 60) << "Cancellation queue is full";
1395 map<TUniqueId, vector<TNetworkAddress> >::iterator cancellation_entry;
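// The loop below builds, for every query that had a fragment scheduled on a backend which
// disappeared from the membership topic, a cause message naming the unreachable host(s);
// the resulting CancellationWork is presumably handed to cancellation_thread_pool_ (the
// "Cancellation queue is full" warning above fires when that queue refuses new work).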
1396 for (cancellation_entry = queries_to_cancel.begin();
1397 cancellation_entry != queries_to_cancel.end();
1398 ++cancellation_entry) {
1399 stringstream cause_msg;
1400 cause_msg << "Cancelled due to unreachable impalad(s): ";
1401 for (int i = 0; i < cancellation_entry->second.size(); ++i) {
1402 cause_msg << cancellation_entry->second[i];
1403 if (i + 1 != cancellation_entry->second.size()) cause_msg << ", ";
1413     bool copy_profile, const string& encoded_profile) {
1414 id = exec_state.query_id();
1415 const TExecRequest& request = exec_state.exec_request();
1417 const string* plan_str = exec_state.summary_profile().GetInfoString("Plan");
1418 if (plan_str != NULL) plan = *plan_str;
1419 stmt = exec_state.sql_stmt();
1420 stmt_type = request.stmt_type;
1421 effective_user = exec_state.effective_user();
1422 default_db = exec_state.default_db();
1423 start_time = exec_state.start_time();
1424 end_time = exec_state.end_time();
1427 Coordinator* coord = exec_state.coord();
1428 if (coord != NULL) {
1429 num_complete_fragments = coord->progress().num_complete();
1430 total_fragments = coord->progress().total();
1433 query_state = exec_state.query_state();
1434 num_rows_fetched = exec_state.num_rows_fetched();
1435 query_status = exec_state.query_status();
1437 exec_state.query_events()->ToThrift(&event_sequence);
1441 exec_state.profile().PrettyPrint(&ss);
1442 profile_str = ss.str();
1443 if (encoded_profile.empty()) {
1444 encoded_profile_str = exec_state.profile().SerializeToArchiveString();
1446 encoded_profile_str = encoded_profile;
1451 fragments = exec_state.exec_request().query_exec_request.fragments;
1465 const TUniqueId& session_id = connection_context.connection_id;
1466 shared_ptr<SessionState> session_state;
1468 session_state->closed = false;
1470 session_state->last_accessed_ms = UnixMillis();
1471 session_state->database = "default";
1472 session_state->session_type = TSessionType::BEESWAX;
1476 if (!connection_context.username.empty()) {
1477 session_state->connected_user = connection_context.username;
1498 ConnectionToSessionMap::iterator it =
1504 LOG(INFO) << "Connection from client " << connection_context.network_address
1505     << " closed, closing " << it->second.size() << " associated session(s)";
1507 BOOST_FOREACH(const TUniqueId& session_id, it->second) {
1510 LOG(WARNING) << "Error closing session " << session_id << ": "
1521 SleepForMs(FLAGS_idle_session_timeout * 500);
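// The SleepForMs() call above pauses for FLAGS_idle_session_timeout * 500 ms between
// sweeps, i.e. roughly half the configured idle timeout, so in the worst case an idle
// session is closed about 1.5x the timeout after its last recorded activity.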
1524 VLOG(3) << "Session expiration thread waking up";
1528 unordered_set<TUniqueId> inflight_queries;
1530 lock_guard<mutex> l(session_state.second->lock);
1531 if (session_state.second->ref_count > 0) continue;
1534 if (session_state.second->closed || session_state.second->expired) continue;
1535 int64_t last_accessed_ms = session_state.second->last_accessed_ms;
1536 if (now - last_accessed_ms <= (FLAGS_idle_session_timeout * 1000)) continue;
1537 LOG(INFO) << "Expiring session: " << session_state.first << ", user:"
1538     << session_state.second->connected_user << ", last active: "
1540 session_state.second->expired = true;
1543 inflight_queries.insert(session_state.second->inflight_queries.begin(),
1544 session_state.second->inflight_queries.end());
1547 Status status("Session expired due to inactivity");
1548 BOOST_FOREACH(const TUniqueId& query_id, inflight_queries) {
1581 if (expiration_event->first > now) break;
1582 shared_ptr<QueryExecState> query_state =
1584 if (query_state.get() == NULL) {
1591 int32_t timeout_s = query_state->query_options().query_timeout_s;
1592 if (FLAGS_idle_query_timeout > 0 && timeout_s > 0) {
1593 timeout_s = min(FLAGS_idle_query_timeout, timeout_s);
1596 timeout_s = max(FLAGS_idle_query_timeout, timeout_s);
1598 int64_t expiration = query_state->last_active() + (timeout_s * 1000L);
1599 if (now < expiration) {
1602 if (expiration == expiration_event->first) {
1609 TUniqueId query_id = expiration_event->second;
1613 } else if (!query_state->is_active()) {
1615 VLOG_QUERY << "Expiring query due to client inactivity: "
1616     << expiration_event->second << ", last activity was at: "
1618 const string& err_msg = Substitute(
1619 "Query $0 expired due to client inactivity (timeout is $1)",
1620 PrintId(expiration_event->second),
1644 DCHECK((beeswax_port == 0) == (beeswax_server == NULL));
1645 DCHECK((hs2_port == 0) == (hs2_server == NULL));
1646 DCHECK((be_port == 0) == (be_server == NULL));
1648 shared_ptr<ImpalaServer> handler(new ImpalaServer(exec_env));
1650 if (beeswax_port != 0 && beeswax_server != NULL) {
1653 shared_ptr<TProcessor> beeswax_processor(new ImpalaServiceProcessor(handler));
1654 shared_ptr<TProcessorEventHandler> event_handler(
1656 beeswax_processor->setEventHandler(event_handler);
1662 if (!FLAGS_ssl_server_certificate.empty()) {
1663 LOG(INFO) << "Enabling SSL for Beeswax";
1665 FLAGS_ssl_server_certificate, FLAGS_ssl_private_key));
1668 LOG(INFO) << "Impala Beeswax Service listening on " << beeswax_port;
1671 if (hs2_port != 0 && hs2_server != NULL) {
1673 shared_ptr<TProcessor> hs2_fe_processor(
1674 new ImpalaHiveServer2ServiceProcessor(handler));
1675 shared_ptr<TProcessorEventHandler> event_handler(
1677 hs2_fe_processor->setEventHandler(event_handler);
1684 if (!FLAGS_ssl_server_certificate.empty()) {
1685 LOG(INFO) << "Enabling SSL for HiveServer2";
1687 FLAGS_ssl_server_certificate, FLAGS_ssl_private_key));
1690 LOG(INFO) << "Impala HiveServer2 Service listening on " << hs2_port;
1693 if (be_port != 0 && be_server != NULL) {
1694 shared_ptr<FragmentMgr> fragment_mgr(new FragmentMgr());
1695 shared_ptr<ImpalaInternalService> thrift_if(
1697 shared_ptr<TProcessor> be_processor(new ImpalaInternalServiceProcessor(thrift_if));
1698 shared_ptr<TProcessorEventHandler> event_handler(
1700 be_processor->setEventHandler(event_handler);
1702 *be_server = new ThriftServer("backend", be_processor, be_port, NULL,
1703     exec_env->metrics(), FLAGS_be_service_threads);
1705 LOG(INFO) << "ImpalaInternalService listening on " << be_port;
1707 if (impala_server != NULL) *impala_server = handler.get();
1713 TUniqueId* session_id) {
1714 DCHECK(session_id != NULL);
1720 *session_id = i->second->session_id();
1726     const TUniqueId& query_id, bool lock) {
1730 return shared_ptr<QueryExecState>();
1732 if (lock) i->second->lock()->lock();
1744 DCHECK(FLAGS_enable_rm);
1745 if (FLAGS_local_nodemanager_url.empty()) {
1746 LOG(WARNING) << "No NM address set (--nm_addr is empty), no NM failure detection "
1747     << "thread started";
1752 if (istarts_with(FLAGS_local_nodemanager_url, "http://")) {
1753 FLAGS_local_nodemanager_url =
1754     FLAGS_local_nodemanager_url.substr(string("http://").size());
1755 } else if (istarts_with(FLAGS_local_nodemanager_url, "https://")) {
1756 FLAGS_local_nodemanager_url =
1757     FLAGS_local_nodemanager_url.substr(string("https://").size());
1759 vector<string> components;
1760 split(components, FLAGS_local_nodemanager_url, is_any_of(":"));
1761 if (components.size() < 2) {
1762 LOG(ERROR) << "Could not parse network address from --local_nodemanager_url, no NM"
1763     << " failure detection thread started";
1766 DCHECK_GE(components.size(), 2);
1767 TNetworkAddress nm_addr =
1772 struct addrinfo* addr;
1773 if (getaddrinfo(nm_addr.hostname.c_str(), components[1].c_str(), NULL, &addr)) {
1774 LOG(WARNING) << "Could not resolve NM address: " << nm_addr << ". Error was: "
1778 LOG(INFO) << "Starting NM failure-detection thread, NM at: " << nm_addr;
1781 bool last_failure_state = false;
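// The (partially elided) loop that follows probes the Node Manager with a plain TCP
// connect() to the resolved address; each attempt feeds the failure detector, and when
// GetPeerState() flips between healthy and FAILED the server logs the offline/online
// messages further down and presumably toggles its own availability via SetOffline().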
1783 int sockfd = socket(AF_INET, SOCK_STREAM, 0);
1785 if (connect(sockfd, addr->ai_addr, sizeof(sockaddr)) < 0) {
1792 LOG(ERROR) << "Could not create socket! Error was: " << GetStrErrMsg();
1794 bool is_failed = (failure_detector.GetPeerState(FLAGS_local_nodemanager_url) ==
1796 if (is_failed != last_failure_state) {
1799 "ImpalaServer is going offline while local node-manager connectivity is bad";
1802 "Node-manager connectivity has been restored. ImpalaServer is now online";
1806 last_failure_state = is_failed;