#include <boost/algorithm/string/join.hpp>
#include <boost/algorithm/string/replace.hpp>
#include <boost/filesystem.hpp>
#include <boost/date_time/posix_time/posix_time_types.hpp>
#include <boost/unordered_set.hpp>
#include <boost/foreach.hpp>
#include <boost/bind.hpp>
#include <boost/algorithm/string.hpp>
#include <boost/lexical_cast.hpp>
#include <google/malloc_extension.h>
#include <gutil/strings/substitute.h>
#include <openssl/evp.h>
#include <openssl/err.h>
#include <rapidjson/rapidjson.h>
#include <rapidjson/stringbuffer.h>
#include <rapidjson/writer.h>
#include <sys/types.h>
#include <sys/socket.h>

#include "gen-cpp/Types_types.h"
#include "gen-cpp/ImpalaService.h"
#include "gen-cpp/DataSinks_types.h"
#include "gen-cpp/ImpalaService_types.h"
#include "gen-cpp/ImpalaInternalService.h"

using boost::adopt_lock_t;
using boost::algorithm::is_any_of;
using boost::algorithm::istarts_with;
using boost::algorithm::replace_all_copy;
using boost::algorithm::split;
using boost::algorithm::token_compress_on;
using boost::uuids::random_generator;
using boost::uuids::uuid;
using namespace apache::thrift;
using namespace beeswax;
using namespace strings;
 
DEFINE_int32(beeswax_port, 21000, "port on which Beeswax client requests are served");
DEFINE_int32(hs2_port, 21050, "port on which HiveServer2 client requests are served");
    "number of threads available to serve client requests");
    "(Advanced) number of threads available to serve backend execution requests");
DEFINE_string(default_query_options, "", "key=value pair of default query options for"
    " impalad, separated by ','");
DEFINE_int32(query_log_size, 25, "Number of queries to retain in the query log. If -1, "
    "the query log has unbounded size.");
DEFINE_bool(log_query_to_file, true, "if true, logs completed query profiles to file.");
DEFINE_int64(max_result_cache_size, 100000L, "Maximum number of query results a client "
    "may request to be cached on a per-query basis to support restarting fetches. This "
    "option guards against unreasonably large result caches requested by clients. "
    "Requests exceeding this maximum will be rejected.");
DEFINE_int32(max_audit_event_log_file_size, 5000, "The maximum size (in queries) of the "
    "audit event log file before a new one is created (if event logging is enabled)");
DEFINE_string(audit_event_log_dir, "", "The directory in which audit event log files are "
    "written. Setting this flag will enable audit event logging.");
DEFINE_bool(abort_on_failed_audit_event, true, "Shutdown Impala if there is a problem "
    "recording an audit event.");
DEFINE_int32(max_lineage_log_file_size, 5000, "The maximum size (in queries) of "
    "the lineage event log file before a new one is created (if lineage logging is "
    "enabled)");
DEFINE_string(lineage_event_log_dir, "", "The directory in which lineage event log "
    "files are written. Setting this flag will enable lineage logging.");
DEFINE_bool(abort_on_failed_lineage_event, true, "Shutdown Impala if there is a problem "
    "recording a lineage record.");
DEFINE_string(profile_log_dir, "", "The directory in which profile log files are"
    " written. If blank, defaults to <log_file_dir>/profiles");
DEFINE_int32(max_profile_log_file_size, 5000, "The maximum size (in queries) of the "
    "profile log file before a new one is created");
    "(Advanced) Size of the thread-pool processing cancellations due to node failure");
DEFINE_string(ssl_server_certificate, "", "The full path to the SSL certificate file used"
    " to authenticate Impala to clients. If set, both Beeswax and HiveServer2 ports will "
    "only accept SSL connections");
DEFINE_string(ssl_private_key, "", "The full path to the private key used as a "
    "counterpart to the public key contained in --ssl_server_certificate. If "
    "--ssl_server_certificate is set, this option must be set as well.");
DEFINE_string(ssl_client_ca_certificate, "", "(Advanced) The full path to a certificate "
    "used by Thrift clients to check the validity of a server certificate. May either be "
    "a certificate for a third-party Certificate Authority, or a copy of the certificate "
    "the client expects to receive from the server.");
DEFINE_int32(idle_session_timeout, 0, "The time, in seconds, that a session may be idle"
    " for before it is closed (and all running queries cancelled) by Impala. If 0, idle"
    " sessions are never expired.");
DEFINE_int32(idle_query_timeout, 0, "The time, in seconds, that a query may be idle for"
    " (i.e. no processing work is done and no updates are received from the client) "
    "before it is cancelled. If 0, idle queries are never expired. The query option "
    "QUERY_TIMEOUT_S overrides this setting, but, if set, --idle_query_timeout represents"
    " the maximum allowable timeout.");
DEFINE_string(local_nodemanager_url, "", "The URL of the local Yarn Node Manager's HTTP "
    "interface, used to detect if the Node Manager fails");
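// Illustrative usage (values are examples, not taken from this file): these are ordinary
// gflags, so a deployment might start the daemon with something like
//   impalad --beeswax_port=21000 --hs2_port=21050 \
//           --default_query_options='MEM_LIMIT=2g,SYNC_DDL=1' \
//           --audit_event_log_dir=/var/log/impala/audit --idle_session_timeout=1800
// --default_query_options takes ','-separated key=value pairs, per its help text above.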
 
const char* ImpalaServer::SQLSTATE_SYNTAX_ERROR_OR_ACCESS_VIOLATION = "42000";
const char* ImpalaServer::SQLSTATE_GENERAL_ERROR = "HY000";
const char* ImpalaServer::SQLSTATE_OPTIONAL_FEATURE_NOT_IMPLEMENTED = "HYC00";
const int ImpalaServer::ASCII_PRECISION = 16;
 
      : query_id_(query_id), cause_(cause), unregister_(unregister) {
 
    if (FLAGS_abort_on_config_error) {
      LOG(ERROR) << "Aborting Impala Server startup due to improper configuration";

    if (FLAGS_abort_on_config_error) {
      LOG(ERROR) << "Aborting Impala Server startup due to improperly "
                 << "configured scratch directories.";

    LOG(ERROR) << "Query profile archival is disabled";
    FLAGS_log_query_to_file = false;

    LOG(ERROR) << "Aborting Impala Server startup due to failure initializing "
               << "audit event logging";

    LOG(ERROR) << "Aborting Impala Server startup due to failure initializing "
               << "lineage logging";

  if (!FLAGS_authorized_proxy_user_config.empty()) {
    vector<string> proxy_user_config;
    split(proxy_user_config, FLAGS_authorized_proxy_user_config, is_any_of(";"),
    if (proxy_user_config.size() > 0) {
      BOOST_FOREACH(const string& config, proxy_user_config) {
        size_t pos = config.find("=");
        if (pos == string::npos) {
          LOG(ERROR) << "Invalid proxy user configuration. No mapping value specified "
                     << "for the proxy user. For more information review usage of the "
                     << "--authorized_proxy_user_config flag: " << config;
        string proxy_user = config.substr(0, pos);
        string config_str = config.substr(pos + 1);
        vector<string> parsed_allowed_users;
        split(parsed_allowed_users, config_str, is_any_of(","), token_compress_on);
        unordered_set<string> allowed_users(parsed_allowed_users.begin(),
            parsed_allowed_users.end());
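// The parsing above implies a flag of the form (values illustrative):
//   --authorized_proxy_user_config=hue=alice,bob;admin=*
// Entries are ';'-separated; each maps one proxy user to a ','-separated list of users
// it may delegate for. The '*' wildcard is honoured later, in AuthorizeProxyUser().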
 
  if (FLAGS_disk_spill_encryption) {
    OpenSSL_add_all_algorithms();
    ERR_load_crypto_strings();

          "impala-server", "cancellation-worker",

  if (FLAGS_idle_session_timeout > 0) {

  if (FLAGS_enable_rm) {
 
  if (!request.__isset.query_exec_request && !request.__isset.catalog_op_request) {

  string lineage_graph;
  if (request.__isset.query_exec_request &&
      request.query_exec_request.__isset.lineage_graph) {
    lineage_graph = request.query_exec_request.lineage_graph;
  } else if (request.__isset.catalog_op_request &&
      request.catalog_op_request.__isset.lineage_graph) {
    lineage_graph = request.catalog_op_request.lineage_graph;

  DCHECK(!lineage_graph.empty());

    LOG(ERROR) << "Unable to record query lineage record: " << status.GetDetail();
    if (FLAGS_abort_on_failed_lineage_event) {
      LOG(ERROR) << "Shutting down Impala Server due to abort_on_failed_lineage_event=true";

  return !FLAGS_lineage_event_log_dir.empty();

    LOG(INFO) << "Lineage logging is disabled";
 
    const TExecRequest& request) {
  rapidjson::StringBuffer buffer;
  rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
  writer.StartObject();
  writer.String(ss.str().c_str());
  writer.StartObject();
  writer.String("query_id");
  writer.String("session_id");
  writer.String("start_time");
  writer.String("authorization_failure");
  writer.String("status");
  writer.String("user");
  writer.String("impersonator");
  writer.String("statement_type");
  if (request.stmt_type == TStmtType::DDL) {
    if (request.catalog_op_request.op_type == TCatalogOpType::DDL) {
          PrintTDdlType(request.catalog_op_request.ddl_params.ddl_type).c_str());
  writer.String("network_address");
  writer.String("sql_statement");
  string stmt = replace_all_copy(exec_state.sql_stmt(), "\n", " ");
  writer.String(stmt.c_str());
  writer.String("catalog_objects");
  BOOST_FOREACH(const TAccessEvent& event, request.access_events) {
    writer.StartObject();
    writer.String("name");
    writer.String(event.name.c_str());
    writer.String("object_type");
    writer.String("privilege");
    writer.String(event.privilege.c_str());

    LOG(ERROR) << "Unable to record audit event record: " << status.GetDetail();
    if (FLAGS_abort_on_failed_audit_event) {
      LOG(ERROR) << "Shutting down Impala Server due to abort_on_failed_audit_event=true";
 
  return !FLAGS_audit_event_log_dir.empty();

    LOG(INFO) << "Event logging is disabled";
 
  bool log_events = true;
    case TStmtType::QUERY: {
      if (!status.ok() && !exec_state.fetched_rows()) log_events = false;
    case TStmtType::DML: {
      if (!status.ok()) log_events = false;
    case TStmtType::DDL: {
        if (!status.ok()) log_events = false;
        if (!status.ok() && !exec_state.fetched_rows()) log_events = false;
    case TStmtType::EXPLAIN:
    case TStmtType::LOAD:
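// Net effect of the switch above: audit/lineage events are suppressed for QUERY and DDL
// statements that failed before any rows were fetched, and for DML statements that
// failed outright.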
 
  if (!FLAGS_log_query_to_file) return Status::OK;
  if (FLAGS_profile_log_dir.empty()) {
    ss << FLAGS_log_dir << "/profiles/";
    FLAGS_profile_log_dir = ss.str();
 
    bool base64_encoded, stringstream* output) {
  DCHECK(output != NULL);
      if (base64_encoded) {
        exec_state->second->profile().SerializeToArchiveString(output);
        exec_state->second->profile().PrettyPrint(output);
    QueryLogIndex::const_iterator query_record = query_log_index_.find(query_id);
      ss << "Query id " << PrintId(query_id) << " not found.";
    if (base64_encoded) {
      (*output) << query_record->second->encoded_profile_str;
      (*output) << query_record->second->profile_str;
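// Profile lookup order implied above: a still-registered query is read straight from its
// exec state (base64 archive string vs. pretty-printed text, depending on
// 'base64_encoded'); otherwise the archived query log is consulted through
// query_log_index_, and an unknown id yields "Query id <id> not found.".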
 
    if (exec_state != NULL) {
      lock_guard<mutex> l(*exec_state->lock(), adopt_lock_t());
      if (exec_state->coord() != NULL) {
        lock_guard<SpinLock> lock(exec_state->coord()->GetExecSummaryLock());
        *result = exec_state->coord()->exec_summary();
    QueryLogIndex::const_iterator query_record = query_log_index_.find(query_id);
      ss << "Query id " << PrintId(query_id) << " not found.";
    *result = query_record->second->exec_summary;
 
      LOG(ERROR) << "Error flushing audit event log: " << status.GetDetail();
      if (FLAGS_abort_on_failed_audit_event) {
        LOG(ERROR) << "Shutting down Impala Server due to "
                   << "abort_on_failed_audit_event=true";

      LOG(ERROR) << "Error flushing lineage event log: " << status.GetDetail();
      if (FLAGS_abort_on_failed_lineage_event) {
        LOG(ERROR) << "Shutting down Impala Server due to "
                   << "abort_on_failed_lineage_event=true";
 
  if (FLAGS_log_query_to_file) {
      LOG_EVERY_N(WARNING, 1000) << "Could not write to profile log file ("
                                 << google::COUNTER << " attempts failed): "
      LOG_EVERY_N(WARNING, 1000)
          << "Disable query logging with --log_query_to_file=false";

  if (FLAGS_query_log_size == 0) return;
  if (query.coord() != NULL) {
    lock_guard<SpinLock> lock(query.coord()->GetExecSummaryLock());
    if (FLAGS_query_log_size > -1 && FLAGS_query_log_size < query_log_.size()) {
      DCHECK_EQ(query_log_.size() - FLAGS_query_log_size, 1);
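// Query log sizing implied above: --query_log_size=0 disables archiving entirely, -1
// keeps an unbounded in-memory log, and any other value evicts the oldest archived
// record once the log exceeds the limit (the DCHECK asserts the overflow is exactly one
// entry at a time).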
 
    shared_ptr<SessionState> session_state,
    shared_ptr<QueryExecState>* exec_state) {
  bool registered_exec_state;
  string stmt = replace_all_copy(query_ctx->request.stmt, "\n", " ");
  query_ctx->request.__set_redacted_stmt((const string) stmt);
  if (!status.ok() && registered_exec_state) {
 
    const TQueryCtx& query_ctx,
    shared_ptr<SessionState> session_state,
    bool* registered_exec_state,
    shared_ptr<QueryExecState>* exec_state) {
  DCHECK(session_state != NULL);
  *registered_exec_state = false;
    return Status("This Impala server is offline. Please retry your query later.");
      this, session_state));
  (*exec_state)->query_events()->MarkEvent("Start execution");
    lock_guard<mutex> l(*(*exec_state)->lock());
    *registered_exec_state = true;
    (*exec_state)->query_events()->MarkEvent("Planning finished");
    (*exec_state)->summary_profile()->AddEventSequence(
        result.timeline.name, result.timeline);
    if (result.__isset.result_set_metadata) {
      (*exec_state)->set_result_metadata(result.result_set_metadata);
  VLOG(2) << "Execution request: " << ThriftDebugString(result);
  if (result.stmt_type == TStmtType::DDL) {
  if ((*exec_state)->coord() != NULL) {
    const unordered_set<TNetworkAddress>& unique_hosts =
        (*exec_state)->schedule()->unique_hosts();
    if (!unique_hosts.empty()) {
      BOOST_FOREACH(const TNetworkAddress& port, unique_hosts) {
 
  query_ctx->__set_pid(getpid());
  random_generator uuid_generator;
  uuid query_uuid = uuid_generator();
 
    const shared_ptr<QueryExecState>& exec_state) {
  lock_guard<mutex> l2(session_state->lock);
  DCHECK(session_state->ref_count > 0 && !session_state->expired);
  if (session_state->closed) return Status("Session has been closed, ignoring query.");
  const TUniqueId& query_id = exec_state->query_id();
      ss << "query id " << PrintId(query_id) << " already exists";
 
    const shared_ptr<QueryExecState>& exec_state) {
  const TUniqueId& query_id = exec_state->query_id();
  lock_guard<mutex> l(session_state->lock);
  DCHECK_GT(session_state->ref_count, 0);
  DCHECK(!session_state->expired);
  if (session_state->closed) return Status("Session closed");
  session_state->inflight_queries.insert(query_id);
  int32_t timeout_s = exec_state->query_options().query_timeout_s;
  if (FLAGS_idle_query_timeout > 0 && timeout_s > 0) {
    timeout_s = min(FLAGS_idle_query_timeout, timeout_s);
    timeout_s = max(FLAGS_idle_query_timeout, timeout_s);
        make_pair(UnixMillis() + (1000L * timeout_s), query_id));
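// Effective idle timeout: when both --idle_query_timeout and the QUERY_TIMEOUT_S query
// option are > 0 the smaller wins, otherwise whichever one is set applies (the max of
// the two). For example, --idle_query_timeout=600 with QUERY_TIMEOUT_S=120 expires the
// query after 120s of inactivity; the query is then queued for expiration at
// UnixMillis() + 1000 * timeout_s.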
 
  shared_ptr<QueryExecState> exec_state;
      return Status("Invalid or unknown query handle");
      exec_state = entry->second;
    lock_guard<mutex> l(exec_state->session()->lock);
    exec_state->session()->inflight_queries.erase(query_id);
  if (exec_state->coord() != NULL) {
      lock_guard<SpinLock> lock(exec_state->coord()->GetExecSummaryLock());
      const TExecSummary& summary = exec_state->coord()->exec_summary();
    exec_state->summary_profile()->AddInfoString("ExecSummary", exec_summary);
    const unordered_set<TNetworkAddress>& unique_hosts =
        exec_state->schedule()->unique_hosts();
    if (!unique_hosts.empty()) {
      BOOST_FOREACH(const TNetworkAddress& hostport, unique_hosts) {
          it->second.erase(exec_state->query_id());
 
  TGetDbsResult db_names;
  BOOST_FOREACH(const string& db, db_names.dbs) {
    TGetTablesResult table_names;
 
  if (exec_state == NULL) return Status("Invalid or unknown query handle");
  lock_guard<mutex> l(*exec_state->lock(), adopt_lock_t());
  if (check_inflight) {
    lock_guard<mutex> l2(exec_state->session()->lock);
    if (exec_state->session()->inflight_queries.find(query_id) ==
        exec_state->session()->inflight_queries.end()) {
      return Status("Query not yet running");
  exec_state->Cancel(cause);
 
    bool ignore_if_absent) {
  shared_ptr<SessionState> session_state;
      if (ignore_if_absent) {
        return Status("Invalid session ID");
    session_state = entry->second;
  DCHECK(session_state != NULL);
  if (session_state->session_type == TSessionType::BEESWAX) {
  unordered_set<TUniqueId> inflight_queries;
    lock_guard<mutex> l(session_state->lock);
    DCHECK(!session_state->closed);
    session_state->closed = true;
    inflight_queries.insert(session_state->inflight_queries.begin(),
        session_state->inflight_queries.end());
  Status status("Session closed");
  BOOST_FOREACH(const TUniqueId& query_id, inflight_queries) {
 
    shared_ptr<SessionState>* session_state, bool mark_active) {
    *session_state = boost::shared_ptr<SessionState>();
    return Status("Invalid session id");
      lock_guard<mutex> session_lock(i->second->lock);
      if (i->second->expired) {
        ss << "Client session expired due to more than " << FLAGS_idle_session_timeout
           << "s of inactivity (last activity was at: "
      if (i->second->closed) return Status("Session is closed");
      ++i->second->ref_count;
    *session_state = i->second;
 
    TReportExecStatusResult& return_val, const TReportExecStatusParams& params) {
  VLOG_FILE << "ReportExecStatus() query_id=" << params.query_id
            << " backend#=" << params.backend_num
            << " instance_id=" << params.fragment_instance_id
            << " done=" << (params.done ? "true" : "false");
  shared_ptr<QueryExecState> exec_state = GetQueryExecState(params.query_id, false);
  if (exec_state.get() == NULL) {
    return_val.status.__set_status_code(TErrorCode::INTERNAL_ERROR);
    const string& err = Substitute("ReportExecStatus(): Received report for unknown "
        "query ID (probably closed or cancelled). (query_id: $0, backend: $1, instance:"
        " $2 done: $3)", PrintId(params.query_id), params.backend_num,
        PrintId(params.fragment_instance_id), params.done);
    return_val.status.error_msgs.push_back(err);
  exec_state->coord()->UpdateFragmentExecStatus(params).SetTStatus(&return_val);
 
    TTransmitDataResult& return_val, const TTransmitDataParams& params) {
  VLOG_ROW << "TransmitData(): instance_id=" << params.dest_fragment_instance_id
           << " node_id=" << params.dest_node_id
           << " #rows=" << params.row_batch.num_rows
           << " sender_id=" << params.sender_id
           << " eos=" << (params.eos ? "true" : "false");
  if (params.row_batch.num_rows > 0) {
        params.dest_fragment_instance_id, params.dest_node_id, params.row_batch,
        params.dest_fragment_instance_id, params.dest_node_id,
 
    LOG(ERROR) << "Invalid default query options. Please check -default_query_options.\n"
  map<string, string> string_map;
  map<string, string>::const_iterator itr = string_map.begin();
  for (; itr != string_map.end(); ++itr) {
    ConfigVariable option;
    option.__set_key(itr->first);
    option.__set_value(itr->second);
  ConfigVariable support_start_over;
  support_start_over.__set_key("support_start_over");
  support_start_over.__set_value("false");
 
    TSessionState* state) {
  lock_guard<mutex> l(lock);
  state->session_id = session_id;
 
        &cancellation_work.cause());
        &cancellation_work.cause());
                 << ") did not succeed: " << status.GetDetail();
 
    return Status("Unable to delegate using empty proxy username.");
  } else if (user.empty()) {
    return Status("Unable to delegate using empty doAs username.");
  stringstream error_msg;
  error_msg << "User '" << user << "' is not authorized to delegate to '"
            << do_as_user << "'.";
    error_msg << " User delegation is disabled.";
    return Status(error_msg.str());
  size_t end_idx = min(user.find("/"), user.find("@"));
      end_idx == string::npos || end_idx == 0 ? user : user.substr(0, end_idx));
  ProxyUserMap::const_iterator proxy_user =
    BOOST_FOREACH(const string& user, proxy_user->second) {
      if (user == "*" || user == do_as_user) return Status::OK;
  return Status(error_msg.str());
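// Before the lookup, the connected user name is truncated at the first '/' or '@', so a
// Kerberos-style principal such as "alice/host@REALM" (illustrative) is checked as
// "alice". A mapping whose user list contains "*" authorizes delegation to any doAs
// user.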
 
    vector<TTopicDelta>* subscriber_topic_updates) {
  StatestoreSubscriber::TopicDeltaMap::const_iterator topic =
  if (topic == incoming_topic_deltas.end()) return;
  const TTopicDelta& delta = topic->second;
  if (delta.topic_entries.size() != 0 || delta.topic_deletions.size() != 0)  {
    TUpdateCatalogCacheRequest update_req;
    update_req.__set_is_delta(delta.is_delta);
    BOOST_FOREACH(const TTopicItem& item, delta.topic_entries) {
      uint32_t len = item.value.size();
      TCatalogObject catalog_object;
          item.value.data()), &len, FLAGS_compact_catalog_topic, &catalog_object);
        LOG(ERROR) << "Error deserializing item: " << status.GetDetail();
      if (catalog_object.type == TCatalogObjectType::CATALOG) {
        update_req.__set_catalog_service_id(catalog_object.catalog.catalog_service_id);
        new_catalog_version = catalog_object.catalog_version;
      if (catalog_object.type == TCatalogObjectType::FUNCTION) {
        DCHECK(catalog_object.__isset.fn);
      if (catalog_object.type == TCatalogObjectType::DATA_SOURCE) {
        DCHECK(catalog_object.__isset.data_source);
      update_req.updated_objects.push_back(catalog_object);
    vector<TCatalogObject> dropped_objects;
    BOOST_FOREACH(const string& key, delta.topic_deletions) {
      LOG(INFO) << "Catalog topic entry deletion: " << key;
      TCatalogObject catalog_object;
        LOG(ERROR) << "Error parsing catalog topic entry deletion key: " << key << " "
      update_req.removed_objects.push_back(catalog_object);
      if (catalog_object.type == TCatalogObjectType::FUNCTION ||
          catalog_object.type == TCatalogObjectType::DATA_SOURCE) {
        TCatalogObject dropped_object;
                catalog_object, &dropped_object).ok()) {
          if (dropped_object.catalog_version <= new_catalog_version) {
            if (catalog_object.type == TCatalogObjectType::FUNCTION ||
                catalog_object.type == TCatalogObjectType::DATA_SOURCE) {
              dropped_objects.push_back(dropped_object);
    TUpdateCatalogCacheResponse resp;
      LOG(ERROR) << "There was an error processing the impalad catalog update. Requesting"
                 << " a full topic update to recover: " << s.GetDetail();
      subscriber_topic_updates->push_back(TTopicDelta());
      TTopicDelta& update = subscriber_topic_updates->back();
      update.__set_from_version(0L);
      BOOST_FOREACH(TCatalogObject& object, dropped_objects) {
        if (object.type == TCatalogObjectType::FUNCTION) {
        } else if (object.type == TCatalogObjectType::DATA_SOURCE) {
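// Recovery path implied above: if applying the catalog delta fails, the callback pushes
// an empty TTopicDelta with from_version = 0 into subscriber_topic_updates, which asks
// the statestore for a full (non-delta) catalog topic update. Deleted functions and data
// sources whose version does not exceed the incoming catalog version are collected in
// dropped_objects and handled per type after the cache update.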
 
    const TCatalogUpdateResult& catalog_update_result, bool wait_for_all_subscribers) {
  if ((catalog_update_result.__isset.updated_catalog_object ||
      catalog_update_result.__isset.removed_catalog_object)) {
    TUpdateCatalogCacheRequest update_req;
    update_req.__set_is_delta(true);
    update_req.__set_catalog_service_id(catalog_update_result.catalog_service_id);
    if (catalog_update_result.__isset.updated_catalog_object) {
      update_req.updated_objects.push_back(catalog_update_result.updated_catalog_object);
    if (catalog_update_result.__isset.removed_catalog_object) {
      update_req.removed_objects.push_back(catalog_update_result.removed_catalog_object);
    TUpdateCatalogCacheResponse resp;
    if (!status.ok()) LOG(ERROR) << status.GetDetail();
    if (!wait_for_all_subscribers) return Status::OK;
  int64_t min_req_catalog_version = catalog_update_result.version;
  const TUniqueId& catalog_service_id = catalog_update_result.catalog_service_id;
  VLOG_QUERY << "Waiting for catalog version: " << min_req_catalog_version
  if (!wait_for_all_subscribers) return Status::OK;
  VLOG_QUERY << "Waiting for min subscriber topic version: "
             << min_req_subscriber_topic_version << " current version: "
  while (min_subscriber_catalog_topic_version_ < min_req_subscriber_topic_version &&
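// ProcessCatalogUpdateResult above applies a DDL result's updated/removed objects to the
// local catalog cache as a delta, then waits until this impalad has caught up to the
// result's catalog version and, when wait_for_all_subscribers is true, until every
// statestore subscriber has reached the required catalog topic version.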
 
    vector<TTopicDelta>* subscriber_topic_updates) {
  StatestoreSubscriber::TopicDeltaMap::const_iterator topic =
  if (topic != incoming_topic_deltas.end()) {
    const TTopicDelta& delta = topic->second;
    BOOST_FOREACH(const TTopicItem& item, delta.topic_entries) {
      uint32_t len = item.value.size();
      TBackendDescriptor backend_descriptor;
          item.value.data()), &len, false, &backend_descriptor);
        VLOG(2) << "Error deserializing topic item with key: " << item.key;
      known_backends_.insert(make_pair(item.key, backend_descriptor.address));
    BOOST_FOREACH(const string& backend_id, delta.topic_deletions) {
    set<TNetworkAddress> current_membership;
    BOOST_FOREACH(const BackendAddressMap::value_type& backend, known_backends_) {
      current_membership.insert(backend.second);
    map<TUniqueId, vector<TNetworkAddress> > queries_to_cancel;
        if (current_membership.find(loc_entry->first) == current_membership.end()) {
          unordered_set<TUniqueId>::const_iterator query_id = loc_entry->second.begin();
          for (; query_id != loc_entry->second.end(); ++query_id) {
            vector<TNetworkAddress>& failed_hosts = queries_to_cancel[*query_id];
            failed_hosts.push_back(loc_entry->first);
          QueryLocations::const_iterator failed_backend = loc_entry;
      LOG_EVERY_N(WARNING, 60) << "Cancellation queue is full";
      map<TUniqueId, vector<TNetworkAddress> >::iterator cancellation_entry;
      for (cancellation_entry = queries_to_cancel.begin();
          cancellation_entry != queries_to_cancel.end();
          ++cancellation_entry) {
        stringstream cause_msg;
        cause_msg << "Cancelled due to unreachable impalad(s): ";
        for (int i = 0; i < cancellation_entry->second.size(); ++i) {
          cause_msg << cancellation_entry->second[i];
          if (i + 1 != cancellation_entry->second.size()) cause_msg << ", ";
 
    bool copy_profile, const string& encoded_profile) {
  id = exec_state.query_id();
  const TExecRequest& request = exec_state.exec_request();
  const string* plan_str = exec_state.summary_profile().GetInfoString("Plan");
  if (plan_str != NULL) plan = *plan_str;
  stmt = exec_state.sql_stmt();
  stmt_type = request.stmt_type;
  effective_user = exec_state.effective_user();
  default_db = exec_state.default_db();
  start_time = exec_state.start_time();
  end_time = exec_state.end_time();
  Coordinator* coord = exec_state.coord();
  if (coord != NULL) {
    num_complete_fragments = coord->progress().num_complete();
    total_fragments = coord->progress().total();
  query_state = exec_state.query_state();
  num_rows_fetched = exec_state.num_rows_fetched();
  query_status = exec_state.query_status();
  exec_state.query_events()->ToThrift(&event_sequence);
    exec_state.profile().PrettyPrint(&ss);
    profile_str = ss.str();
    if (encoded_profile.empty()) {
      encoded_profile_str = exec_state.profile().SerializeToArchiveString();
      encoded_profile_str = encoded_profile;
  fragments = exec_state.exec_request().query_exec_request.fragments;
 
    const TUniqueId& session_id = connection_context.connection_id;
    shared_ptr<SessionState> session_state;
    session_state->closed = false;
    session_state->last_accessed_ms = UnixMillis();
    session_state->database = "default";
    session_state->session_type = TSessionType::BEESWAX;
    if (!connection_context.username.empty()) {
      session_state->connected_user = connection_context.username;

  ConnectionToSessionMap::iterator it =
  LOG(INFO) << "Connection from client " << connection_context.network_address
            << " closed, closing " << it->second.size() << " associated session(s)";
  BOOST_FOREACH(const TUniqueId& session_id, it->second) {
      LOG(WARNING) << "Error closing session " << session_id << ": "
 1521     SleepForMs(FLAGS_idle_session_timeout * 500);
 
 1524     VLOG(3) << 
"Session expiration thread waking up";
 
 1528       unordered_set<TUniqueId> inflight_queries;
 
 1530         lock_guard<mutex> l(session_state.second->lock);
 
 1531         if (session_state.second->ref_count > 0) 
continue;
 
 1534         if (session_state.second->closed || session_state.second->expired) 
continue;
 
 1535         int64_t last_accessed_ms = session_state.second->last_accessed_ms;
 
 1536         if (now - last_accessed_ms <= (FLAGS_idle_session_timeout * 1000)) 
continue;
 
 1537         LOG(INFO) << 
"Expiring session: " << session_state.first << 
", user:" 
 1538                   << session_state.second->connected_user << 
", last active: " 
 1540         session_state.second->expired = 
true;
 
 1543         inflight_queries.insert(session_state.second->inflight_queries.begin(),
 
 1544             session_state.second->inflight_queries.end());
 
 1547       Status status(
"Session expired due to inactivity");
 
 1548       BOOST_FOREACH(
const TUniqueId& 
query_id, inflight_queries) {
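// The session-expiration loop above sleeps for half the idle timeout (hence
// FLAGS_idle_session_timeout * 500 ms), skips sessions that are referenced, closed, or
// already expired, marks the rest expired once they have been inactive for more than
// --idle_session_timeout seconds, and then cancels their in-flight queries with status
// "Session expired due to inactivity".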
 
        if (expiration_event->first > now) break;
        shared_ptr<QueryExecState> query_state =
        if (query_state.get() == NULL) {
        int32_t timeout_s = query_state->query_options().query_timeout_s;
        if (FLAGS_idle_query_timeout > 0 && timeout_s > 0) {
          timeout_s = min(FLAGS_idle_query_timeout, timeout_s);
          timeout_s = max(FLAGS_idle_query_timeout, timeout_s);
        int64_t expiration = query_state->last_active() + (timeout_s * 1000L);
        if (now < expiration) {
          if (expiration == expiration_event->first) {
            TUniqueId query_id = expiration_event->second;
        } else if (!query_state->is_active()) {
          VLOG_QUERY << "Expiring query due to client inactivity: "
                     << expiration_event->second << ", last activity was at: "
          const string& err_msg = Substitute(
              "Query $0 expired due to client inactivity (timeout is $1)",
              PrintId(expiration_event->second),
 
  DCHECK((beeswax_port == 0) == (beeswax_server == NULL));
  DCHECK((hs2_port == 0) == (hs2_server == NULL));
  DCHECK((be_port == 0) == (be_server == NULL));
  shared_ptr<ImpalaServer> handler(new ImpalaServer(exec_env));

  if (beeswax_port != 0 && beeswax_server != NULL) {
    shared_ptr<TProcessor> beeswax_processor(new ImpalaServiceProcessor(handler));
    shared_ptr<TProcessorEventHandler> event_handler(
    beeswax_processor->setEventHandler(event_handler);
    if (!FLAGS_ssl_server_certificate.empty()) {
      LOG(INFO) << "Enabling SSL for Beeswax";
              FLAGS_ssl_server_certificate, FLAGS_ssl_private_key));
    LOG(INFO) << "Impala Beeswax Service listening on " << beeswax_port;

  if (hs2_port != 0 && hs2_server != NULL) {
    shared_ptr<TProcessor> hs2_fe_processor(
        new ImpalaHiveServer2ServiceProcessor(handler));
    shared_ptr<TProcessorEventHandler> event_handler(
    hs2_fe_processor->setEventHandler(event_handler);
    if (!FLAGS_ssl_server_certificate.empty()) {
      LOG(INFO) << "Enabling SSL for HiveServer2";
              FLAGS_ssl_server_certificate, FLAGS_ssl_private_key));
    LOG(INFO) << "Impala HiveServer2 Service listening on " << hs2_port;

  if (be_port != 0 && be_server != NULL) {
    shared_ptr<FragmentMgr> fragment_mgr(new FragmentMgr());
    shared_ptr<ImpalaInternalService> thrift_if(
    shared_ptr<TProcessor> be_processor(new ImpalaInternalServiceProcessor(thrift_if));
    shared_ptr<TProcessorEventHandler> event_handler(
    be_processor->setEventHandler(event_handler);
    *be_server = new ThriftServer("backend", be_processor, be_port, NULL,
        exec_env->metrics(), FLAGS_be_service_threads);
    LOG(INFO) << "ImpalaInternalService listening on " << be_port;

  if (impala_server != NULL) *impala_server = handler.get();
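// CreateImpalaServer wires up to three Thrift services around one ImpalaServer handler:
// the Beeswax front end on beeswax_port, HiveServer2 on hs2_port, and the
// ImpalaInternalService backend on be_port. SSL is enabled for the two client-facing
// services when --ssl_server_certificate (with its --ssl_private_key counterpart) is set.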
 
    TUniqueId* session_id) {
  DCHECK(session_id != NULL);
    *session_id = i->second->session_id();

    const TUniqueId& query_id, bool lock) {
    return shared_ptr<QueryExecState>();
    if (lock) i->second->lock()->lock();
 
  DCHECK(FLAGS_enable_rm);
  if (FLAGS_local_nodemanager_url.empty()) {
    LOG(WARNING) << "No NM address set (--local_nodemanager_url is empty), no NM failure "
                 << "detection thread started";

  if (istarts_with(FLAGS_local_nodemanager_url, "http://")) {
    FLAGS_local_nodemanager_url =
        FLAGS_local_nodemanager_url.substr(string("http://").size());
  } else if (istarts_with(FLAGS_local_nodemanager_url, "https://")) {
    FLAGS_local_nodemanager_url =
        FLAGS_local_nodemanager_url.substr(string("https://").size());

  vector<string> components;
  split(components, FLAGS_local_nodemanager_url, is_any_of(":"));
  if (components.size() < 2) {
    LOG(ERROR) << "Could not parse network address from --local_nodemanager_url, no NM"
               << " failure detection thread started";
  DCHECK_GE(components.size(), 2);
  TNetworkAddress nm_addr =
  struct addrinfo* addr;
  if (getaddrinfo(nm_addr.hostname.c_str(), components[1].c_str(), NULL, &addr)) {
    LOG(WARNING) << "Could not resolve NM address: " << nm_addr << ". Error was: "
  LOG(INFO) << "Starting NM failure-detection thread, NM at: " << nm_addr;
  bool last_failure_state = false;
    int sockfd = socket(AF_INET, SOCK_STREAM, 0);
      if (connect(sockfd, addr->ai_addr, sizeof(sockaddr)) < 0) {
      LOG(ERROR) << "Could not create socket! Error was: " << GetStrErrMsg();
    bool is_failed = (failure_detector.GetPeerState(FLAGS_local_nodemanager_url) ==
    if (is_failed != last_failure_state) {
            "ImpalaServer is going offline while local node-manager connectivity is bad";
            "Node-manager connectivity has been restored. ImpalaServer is now online";
    last_failure_state = is_failed;
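// DetectNmFailures sketch, as implied above: strip an "http://" or "https://" prefix from
// --local_nodemanager_url (e.g. the illustrative "http://nm-host:8042" becomes
// "nm-host:8042"), split host and port on ':', resolve with getaddrinfo(), then repeatedly
// probe the Node Manager over a TCP connect(). When the failure detector's view of the
// peer flips, the server logs that it is going offline or coming back online accordingly.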
 