Impala
Impala is the open source, native analytic database for Apache Hadoop.
impala-server.cc
1 // Copyright 2012 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "service/impala-server.h"
16 
17 #include <algorithm>
18 #include <exception>
19 #include <boost/algorithm/string/join.hpp>
20 #include <boost/algorithm/string/replace.hpp>
21 #include <boost/filesystem.hpp>
22 #include <boost/date_time/posix_time/posix_time_types.hpp>
23 #include <boost/unordered_set.hpp>
24 #include <boost/foreach.hpp>
25 #include <boost/bind.hpp>
26 #include <boost/algorithm/string.hpp>
27 #include <boost/lexical_cast.hpp>
28 #include <google/malloc_extension.h>
29 #include <gutil/strings/substitute.h>
30 #include <openssl/evp.h>
31 #include <openssl/err.h>
32 #include <rapidjson/rapidjson.h>
33 #include <rapidjson/stringbuffer.h>
34 #include <rapidjson/writer.h>
35 #include <sys/types.h>
36 #include <sys/socket.h>
37 #include <netdb.h>
38 #include <unistd.h>
39 
40 #include "catalog/catalog-server.h"
41 #include "catalog/catalog-util.h"
42 #include "common/logging.h"
43 #include "common/version.h"
44 #include "rpc/authentication.h"
45 #include "rpc/thrift-util.h"
46 #include "rpc/thrift-thread.h"
47 #include "rpc/rpc-trace.h"
48 #include "runtime/client-cache.h"
49 #include "runtime/data-stream-mgr.h"
50 #include "runtime/exec-env.h"
51 #include "runtime/lib-cache.h"
52 #include "runtime/timestamp-value.h"
53 #include "runtime/tmp-file-mgr.h"
57 #include "service/query-options.h"
59 #include "util/bit-util.h"
60 #include "util/cgroups-mgr.h"
61 #include "util/container-util.h"
62 #include "util/debug-util.h"
63 #include "util/error-util.h"
64 #include "util/impalad-metrics.h"
65 #include "util/network-util.h"
66 #include "util/parse-util.h"
67 #include "util/redactor.h"
68 #include "util/string-parser.h"
69 #include "util/summary-util.h"
70 #include "util/uid-util.h"
71 
72 #include "gen-cpp/Types_types.h"
73 #include "gen-cpp/ImpalaService.h"
74 #include "gen-cpp/DataSinks_types.h"
75 #include "gen-cpp/ImpalaService_types.h"
76 #include "gen-cpp/ImpalaInternalService.h"
77 
78 #include "common/names.h"
79 
80 using boost::adopt_lock_t;
81 using boost::algorithm::is_any_of;
82 using boost::algorithm::istarts_with;
83 using boost::algorithm::replace_all_copy;
84 using boost::algorithm::split;
85 using boost::algorithm::token_compress_on;
86 using boost::uuids::random_generator;
87 using boost::uuids::uuid;
88 using namespace apache::thrift;
89 using namespace beeswax;
90 using namespace strings;
91 
92 DECLARE_int32(be_port);
93 DECLARE_string(nn);
94 DECLARE_int32(nn_port);
95 DECLARE_string(authorized_proxy_user_config);
96 DECLARE_bool(abort_on_config_error);
97 DECLARE_bool(disk_spill_encryption);
98 
99 DEFINE_int32(beeswax_port, 21000, "port on which Beeswax client requests are served");
100 DEFINE_int32(hs2_port, 21050, "port on which HiveServer2 client requests are served");
101 
102 DEFINE_int32(fe_service_threads, 64,
103  "number of threads available to serve client requests");
104 DEFINE_int32(be_service_threads, 64,
105  "(Advanced) number of threads available to serve backend execution requests");
106 DEFINE_string(default_query_options, "", "key=value pairs of default query options for"
107  " impalad, separated by ','");
108 DEFINE_int32(query_log_size, 25, "Number of queries to retain in the query log. If -1, "
109  "the query log has unbounded size.");
110 DEFINE_bool(log_query_to_file, true, "if true, logs completed query profiles to file.");
111 
112 DEFINE_int64(max_result_cache_size, 100000L, "Maximum number of query results a client "
113  "may request to be cached on a per-query basis to support restarting fetches. This "
114  "option guards against unreasonably large result caches requested by clients. "
115  "Requests exceeding this maximum will be rejected.");
116 
117 DEFINE_int32(max_audit_event_log_file_size, 5000, "The maximum size (in queries) of the "
118  "audit event log file before a new one is created (if event logging is enabled)");
119 DEFINE_string(audit_event_log_dir, "", "The directory in which audit event log files are "
120  "written. Setting this flag will enable audit event logging.");
121 DEFINE_bool(abort_on_failed_audit_event, true, "Shutdown Impala if there is a problem "
122  "recording an audit event.");
123 
124 DEFINE_int32(max_lineage_log_file_size, 5000, "The maximum size (in queries) of "
125  "the lineage event log file before a new one is created (if lineage logging is "
126  "enabled)");
127 DEFINE_string(lineage_event_log_dir, "", "The directory in which lineage event log "
128  "files are written. Setting this flag will enable lineage logging.");
129 DEFINE_bool(abort_on_failed_lineage_event, true, "Shutdown Impala if there is a problem "
130  "recording a lineage record.");
131 
132 DEFINE_string(profile_log_dir, "", "The directory in which profile log files are"
133  " written. If blank, defaults to <log_file_dir>/profiles");
134 DEFINE_int32(max_profile_log_file_size, 5000, "The maximum size (in queries) of the "
135  "profile log file before a new one is created");
136 
137 DEFINE_int32(cancellation_thread_pool_size, 5,
138  "(Advanced) Size of the thread-pool processing cancellations due to node failure");
139 
140 DEFINE_string(ssl_server_certificate, "", "The full path to the SSL certificate file used"
141  " to authenticate Impala to clients. If set, both Beeswax and HiveServer2 ports will "
142  "only accept SSL connections");
143 DEFINE_string(ssl_private_key, "", "The full path to the private key used as a "
144  "counterpart to the public key contained in --ssl_server_certificate. If "
145  "--ssl_server_certificate is set, this option must be set as well.");
146 DEFINE_string(ssl_client_ca_certificate, "", "(Advanced) The full path to a certificate "
147  "used by Thrift clients to check the validity of a server certificate. May either be "
148  "a certificate for a third-party Certificate Authority, or a copy of the certificate "
149  "the client expects to receive from the server.");
150 
151 DEFINE_int32(idle_session_timeout, 0, "The time, in seconds, that a session may be idle"
152  " for before it is closed (and all running queries cancelled) by Impala. If 0, idle"
153  " sessions are never expired.");
154 DEFINE_int32(idle_query_timeout, 0, "The time, in seconds, that a query may be idle for"
155  " (i.e. no processing work is done and no updates are received from the client) "
156  "before it is cancelled. If 0, idle queries are never expired. The query option "
157  "QUERY_TIMEOUT_S overrides this setting, but, if set, --idle_query_timeout represents"
158  " the maximum allowable timeout.");
159 
160 DEFINE_string(local_nodemanager_url, "", "The URL of the local Yarn Node Manager's HTTP "
161  "interface, used to detect if the Node Manager fails");
162 DECLARE_bool(enable_rm);
163 DECLARE_bool(compact_catalog_topic);
164 
165 namespace impala {
166 
167 // Prefix of profile, event and lineage log filenames. The version number is
168 // internal, and does not correspond to an Impala release - it should
169 // be changed only when the file format changes.
170 //
171 // In the 1.0 version of the profile log, the timestamp at the beginning of each entry
172 // was relative to the local time zone. In log version 1.1, this was changed to be
173 // relative to UTC. The same time zone change was made for the audit log, but the
174 // version was kept at 1.0 because there is no known consumer of the timestamp.
175 const string PROFILE_LOG_FILE_PREFIX = "impala_profile_log_1.1-";
176 const string AUDIT_EVENT_LOG_FILE_PREFIX = "impala_audit_event_log_1.0-";
177 const string LINEAGE_LOG_FILE_PREFIX = "impala_lineage_log_1.0-";
178 
179 const uint32_t MAX_CANCELLATION_QUEUE_SIZE = 65536;
180 
181 const string BEESWAX_SERVER_NAME = "beeswax-frontend";
182 const string HS2_SERVER_NAME = "hiveserver2-frontend";
183 
184 const char* ImpalaServer::SQLSTATE_SYNTAX_ERROR_OR_ACCESS_VIOLATION = "42000";
185 const char* ImpalaServer::SQLSTATE_GENERAL_ERROR = "HY000";
186 const char* ImpalaServer::SQLSTATE_OPTIONAL_FEATURE_NOT_IMPLEMENTED = "HYC00";
187 const int ImpalaServer::ASCII_PRECISION = 16; // print 16 digits for double/float
188 
190 
191 // Work item for ImpalaServer::cancellation_thread_pool_.
192 class CancellationWork {
193  public:
194  CancellationWork(const TUniqueId& query_id, const Status& cause, bool unregister)
195  : query_id_(query_id), cause_(cause), unregister_(unregister) {
196  }
197 
198  CancellationWork() {
199  }
200 
201  const TUniqueId& query_id() const { return query_id_; }
202  const Status& cause() const { return cause_; }
203  const bool unregister() const { return unregister_; }
204 
205  bool operator<(const CancellationWork& other) const {
206  return query_id_ < other.query_id_;
207  }
208 
209  bool operator==(const CancellationWork& other) const {
210  return query_id_ == other.query_id_;
211  }
212 
213  private:
214  // Id of query to be canceled.
215  TUniqueId query_id_;
216 
217  // Error status containing a list of failed impalads causing the cancellation.
218  Status cause_;
219 
220  // If true, unregister the query rather than cancelling it. Calling UnregisterQuery()
221  // does call CancelInternal eventually, but also ensures that the query is torn down and
222  // archived.
223  bool unregister_;
224 };
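// Usage sketch (assumed from the pool set-up in the constructor and the Offer() call in
// MembershipCallback() below): producers enqueue work items and CancelFromThreadPool()
// drains them, e.g.
//   cancellation_thread_pool_->Offer(
//       CancellationWork(query_id, Status("Cancelled due to unreachable impalad(s)"), false));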
225 
226 ImpalaServer::ImpalaServer(ExecEnv* exec_env)
227  : exec_env_(exec_env) {
228  // Initialize default config
229  InitializeConfigVariables();
230 
231  Status status = exec_env_->frontend()->ValidateSettings();
232  if (!status.ok()) {
233  LOG(ERROR) << status.GetDetail();
234  if (FLAGS_abort_on_config_error) {
235  LOG(ERROR) << "Aborting Impala Server startup due to improper configuration";
236  exit(1);
237  }
238  }
239 
240  status = TmpFileMgr::Init();
241  if (!status.ok()) {
242  LOG(ERROR) << status.GetDetail();
243  if (FLAGS_abort_on_config_error) {
244  LOG(ERROR) << "Aborting Impala Server startup due to improperly "
245  << "configured scratch directories.";
246  exit(1);
247  }
248  }
249 
250  if (!InitProfileLogging().ok()) {
251  LOG(ERROR) << "Query profile archival is disabled";
252  FLAGS_log_query_to_file = false;
253  }
254 
255  if (!InitAuditEventLogging().ok()) {
256  LOG(ERROR) << "Aborting Impala Server startup due to failure initializing "
257  << "audit event logging";
258  exit(1);
259  }
260 
261  if (!InitLineageLogging().ok()) {
262  LOG(ERROR) << "Aborting Impala Server startup due to failure initializing "
263  << "lineage logging";
264  exit(1);
265  }
266 
267  if (!FLAGS_authorized_proxy_user_config.empty()) {
268  // Parse the proxy user configuration using the format:
269  // <proxy user>=<comma separated list of users they are allowed to delegate>
270  // See FLAGS_authorized_proxy_user_config for more details.
271  vector<string> proxy_user_config;
272  split(proxy_user_config, FLAGS_authorized_proxy_user_config, is_any_of(";"),
273  token_compress_on);
274  if (proxy_user_config.size() > 0) {
275  BOOST_FOREACH(const string& config, proxy_user_config) {
276  size_t pos = config.find("=");
277  if (pos == string::npos) {
278  LOG(ERROR) << "Invalid proxy user configuration. No mapping value specified "
279  << "for the proxy user. For more information review usage of the "
280  << "--authorized_proxy_user_config flag: " << config;
281  exit(1);
282  }
283  string proxy_user = config.substr(0, pos);
284  string config_str = config.substr(pos + 1);
285  vector<string> parsed_allowed_users;
286  split(parsed_allowed_users, config_str, is_any_of(","), token_compress_on);
287  unordered_set<string> allowed_users(parsed_allowed_users.begin(),
288  parsed_allowed_users.end());
289  authorized_proxy_user_config_.insert(make_pair(proxy_user, allowed_users));
290  }
291  }
292  }
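// Example (illustrative value, not from the original file): the parsing above turns
//   --authorized_proxy_user_config="hue=alice,bob;admin=*"
// into {"hue" -> {"alice", "bob"}, "admin" -> {"*"}}; AuthorizeProxyUser() below treats
// "*" as permission to delegate to any user.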
293 
294  if (FLAGS_disk_spill_encryption) {
295  // Initialize OpenSSL for spilling encryption. This is not thread-safe so we
296  // initialize it once on startup.
297  // TODO: Set OpenSSL callbacks to provide locking to make the library thread-safe.
298  OpenSSL_add_all_algorithms();
299  ERR_load_crypto_strings();
300  }
301 
303 
304  // Initialize impalad metrics
305  ImpaladMetrics::CreateMetrics(exec_env->metrics()->GetChildGroup("impala-server"));
306  ImpaladMetrics::IMPALA_SERVER_START_TIME->set_value(
307  TimestampValue::LocalTime().DebugString());
308 
309  // Register the membership callback if required
310  if (exec_env->subscriber() != NULL) {
311  StatestoreSubscriber::UpdateCallback cb =
312  bind<void>(mem_fn(&ImpalaServer::MembershipCallback), this, _1, _2);
313  exec_env->subscriber()->AddTopic(SimpleScheduler::IMPALA_MEMBERSHIP_TOPIC, true, cb);
314 
315  StatestoreSubscriber::UpdateCallback catalog_cb =
316  bind<void>(mem_fn(&ImpalaServer::CatalogUpdateCallback), this, _1, _2);
317  exec_env->subscriber()->AddTopic(
318  CatalogServer::IMPALA_CATALOG_TOPIC, true, catalog_cb);
319  }
320 
322 
323  // Initialise the cancellation thread pool with 5 (by default) threads. The max queue
324  // size is deliberately set so high that it should never fill; if it does the
325  // cancellations will get ignored and retried on the next statestore heartbeat.
326  cancellation_thread_pool_.reset(new ThreadPool<CancellationWork>(
327  "impala-server", "cancellation-worker",
328  FLAGS_cancellation_thread_pool_size, MAX_CANCELLATION_QUEUE_SIZE,
329  bind<void>(&ImpalaServer::CancelFromThreadPool, this, _1, _2)));
330 
331  if (FLAGS_idle_session_timeout > 0) {
332  session_timeout_thread_.reset(new Thread("impala-server", "session-expirer",
333  bind<void>(&ImpalaServer::ExpireSessions, this)));
334  }
335 
336  query_expiration_thread_.reset(new Thread("impala-server", "query-expirer",
337  bind<void>(&ImpalaServer::ExpireQueries, this)));
338 
339  is_offline_ = false;
340  if (FLAGS_enable_rm) {
341  nm_failure_detection_thread_.reset(new Thread("impala-server", "nm-failure-detector",
342  bind<void>(&ImpalaServer::DetectNmFailures, this)));
343  }
344 
345  exec_env_->SetImpalaServer(this);
346 }
347 
348 Status ImpalaServer::LogLineageRecord(const TExecRequest& request) {
349  if (!request.__isset.query_exec_request && !request.__isset.catalog_op_request) {
350  return Status::OK;
351  }
352  string lineage_graph;
353  if (request.__isset.query_exec_request &&
354  request.query_exec_request.__isset.lineage_graph) {
355  lineage_graph = request.query_exec_request.lineage_graph;
356  } else if (request.__isset.catalog_op_request &&
357  request.catalog_op_request.__isset.lineage_graph) {
358  lineage_graph = request.catalog_op_request.lineage_graph;
359  } else {
360  return Status::OK;
361  }
362  DCHECK(!lineage_graph.empty());
363  Status status = lineage_logger_->AppendEntry(lineage_graph);
364  if (!status.ok()) {
365  LOG(ERROR) << "Unable to record query lineage record: " << status.GetDetail();
366  if (FLAGS_abort_on_failed_lineage_event) {
367  LOG(ERROR) << "Shutting down Impala Server due to abort_on_failed_lineage_event=true";
368  exit(1);
369  }
370  }
371  return status;
372 }
373 
374 bool ImpalaServer::IsLineageLoggingEnabled() {
375  return !FLAGS_lineage_event_log_dir.empty();
376 }
377 
378 Status ImpalaServer::InitLineageLogging() {
379  if (!IsLineageLoggingEnabled()) {
380  LOG(INFO) << "Lineage logging is disabled";
381  return Status::OK;
382  }
383  lineage_logger_.reset(new SimpleLogger(FLAGS_lineage_event_log_dir,
384  LINEAGE_LOG_FILE_PREFIX, FLAGS_max_lineage_log_file_size));
385  RETURN_IF_ERROR(lineage_logger_->Init());
386  lineage_logger_flush_thread_.reset(new Thread("impala-server",
387  "lineage-log-flush", &ImpalaServer::LineageLoggerFlushThread, this));
388  return Status::OK;
389 }
390 
391 Status ImpalaServer::LogAuditRecord(const QueryExecState& exec_state,
392  const TExecRequest& request) {
393  stringstream ss;
394  rapidjson::StringBuffer buffer;
395  rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
396 
397  writer.StartObject();
398  // Each log entry is a timestamp mapped to a JSON object
399  ss << UnixMillis();
400  writer.String(ss.str().c_str());
401  writer.StartObject();
402  writer.String("query_id");
403  writer.String(PrintId(exec_state.query_id()).c_str());
404  writer.String("session_id");
405  writer.String(PrintId(exec_state.session_id()).c_str());
406  writer.String("start_time");
407  writer.String(exec_state.start_time().DebugString().c_str());
408  writer.String("authorization_failure");
409  writer.Bool(Frontend::IsAuthorizationError(exec_state.query_status()));
410  writer.String("status");
411  writer.String(exec_state.query_status().GetDetail().c_str());
412  writer.String("user");
413  writer.String(exec_state.effective_user().c_str());
414  writer.String("impersonator");
415  if (exec_state.do_as_user().empty()) {
416  // If do_as_user() is empty, the "impersonator" field should be Null.
417  writer.Null();
418  } else {
419  // Otherwise, the delegator is the current connected user.
420  writer.String(exec_state.connected_user().c_str());
421  }
422  writer.String("statement_type");
423  if (request.stmt_type == TStmtType::DDL) {
424  if (request.catalog_op_request.op_type == TCatalogOpType::DDL) {
425  writer.String(
426  PrintTDdlType(request.catalog_op_request.ddl_params.ddl_type).c_str());
427  } else {
428  writer.String(PrintTCatalogOpType(request.catalog_op_request.op_type).c_str());
429  }
430  } else {
431  writer.String(PrintTStmtType(request.stmt_type).c_str());
432  }
433  writer.String("network_address");
434  writer.String(
435  lexical_cast<string>(exec_state.session()->network_address).c_str());
436  writer.String("sql_statement");
437  string stmt = replace_all_copy(exec_state.sql_stmt(), "\n", " ");
438  Redact(&stmt);
439  writer.String(stmt.c_str());
440  writer.String("catalog_objects");
441  writer.StartArray();
442  BOOST_FOREACH(const TAccessEvent& event, request.access_events) {
443  writer.StartObject();
444  writer.String("name");
445  writer.String(event.name.c_str());
446  writer.String("object_type");
447  writer.String(PrintTCatalogObjectType(event.object_type).c_str());
448  writer.String("privilege");
449  writer.String(event.privilege.c_str());
450  writer.EndObject();
451  }
452  writer.EndArray();
453  writer.EndObject();
454  writer.EndObject();
455  Status status = audit_event_logger_->AppendEntry(buffer.GetString());
456  if (!status.ok()) {
457  LOG(ERROR) << "Unable to record audit event record: " << status.GetDetail();
458  if (FLAGS_abort_on_failed_audit_event) {
459  LOG(ERROR) << "Shutting down Impala Server due to abort_on_failed_audit_event=true";
460  exit(1);
461  }
462  }
463  return status;
464 }
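// A sketch of one audit entry as serialized above (all field values invented for
// illustration; the real entry is a single line keyed by UnixMillis()):
//   {"1423000000000": {"query_id": "d4e1f0a5b2c3:9876543210", "session_id": "...",
//    "start_time": "2015-02-03 10:00:00", "authorization_failure": false,
//    "status": "", "user": "alice", "impersonator": null, "statement_type": "QUERY",
//    "network_address": "127.0.0.1:21000", "sql_statement": "select 1",
//    "catalog_objects": [{"name": "default.t", "object_type": "TABLE",
//      "privilege": "SELECT"}]}}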
465 
466 bool ImpalaServer::IsAuditEventLoggingEnabled() {
467  return !FLAGS_audit_event_log_dir.empty();
468 }
469 
470 Status ImpalaServer::InitAuditEventLogging() {
471  if (!IsAuditEventLoggingEnabled()) {
472  LOG(INFO) << "Event logging is disabled";
473  return Status::OK;
474  }
475  audit_event_logger_.reset(new SimpleLogger(FLAGS_audit_event_log_dir,
476  AUDIT_EVENT_LOG_FILE_PREFIX, FLAGS_max_audit_event_log_file_size));
477  RETURN_IF_ERROR(audit_event_logger_->Init());
478  audit_event_logger_flush_thread_.reset(new Thread("impala-server",
479  "audit-event-log-flush", &ImpalaServer::AuditEventLoggerFlushThread, this));
480  return Status::OK;
481 }
482 
483 void ImpalaServer::LogQueryEvents(QueryExecState& exec_state) {
484  Status status = exec_state.query_status();
485  bool log_events = true;
486  switch (exec_state.stmt_type()) {
487  case TStmtType::QUERY: {
488  // If the query didn't finish, log audit and lineage events only if
489  // the client issued at least one fetch.
490  if (!status.ok() && !exec_state.fetched_rows()) log_events = false;
491  break;
492  }
493  case TStmtType::DML: {
494  if (!status.ok()) log_events = false;
495  break;
496  }
497  case TStmtType::DDL: {
498  if (exec_state.catalog_op_type() == TCatalogOpType::DDL) {
499  // For a DDL operation, log audit and lineage events only if the
500  // operation finished.
501  if (!status.ok()) log_events = false;
502  } else {
503  // This case covers local catalog operations such as SHOW and DESCRIBE.
504  if (!status.ok() && !exec_state.fetched_rows()) log_events = false;
505  }
506  break;
507  }
508  case TStmtType::EXPLAIN:
509  case TStmtType::LOAD:
510  case TStmtType::SET:
511  default:
512  break;
513  }
514  // Log audit events that are due to an AuthorizationException.
515  if (IsAuditEventLoggingEnabled() &&
516  (Frontend::IsAuthorizationError(exec_state.query_status()) || log_events)) {
517  LogAuditRecord(exec_state, exec_state.exec_request());
518  }
519  if (IsLineageLoggingEnabled() && log_events) {
520  LogLineageRecord(exec_state.exec_request());
521  }
522 }
523 
524 Status ImpalaServer::InitProfileLogging() {
525  if (!FLAGS_log_query_to_file) return Status::OK;
526 
527  if (FLAGS_profile_log_dir.empty()) {
528  stringstream ss;
529  ss << FLAGS_log_dir << "/profiles/";
530  FLAGS_profile_log_dir = ss.str();
531  }
532  profile_logger_.reset(new SimpleLogger(FLAGS_profile_log_dir,
533  PROFILE_LOG_FILE_PREFIX, FLAGS_max_profile_log_file_size));
534  RETURN_IF_ERROR(profile_logger_->Init());
535  profile_log_file_flush_thread_.reset(new Thread("impala-server", "log-flush-thread",
536  &ImpalaServer::LogFileFlushThread, this));
537 
538  return Status::OK;
539 }
540 
541 Status ImpalaServer::GetRuntimeProfileStr(const TUniqueId& query_id,
542  bool base64_encoded, stringstream* output) {
543  DCHECK(output != NULL);
544  // Search for the query id in the active query map
545  {
546  lock_guard<mutex> l(query_exec_state_map_lock_);
547  QueryExecStateMap::const_iterator exec_state = query_exec_state_map_.find(query_id);
548  if (exec_state != query_exec_state_map_.end()) {
549  if (base64_encoded) {
550  exec_state->second->profile().SerializeToArchiveString(output);
551  } else {
552  exec_state->second->profile().PrettyPrint(output);
553  }
554  return Status::OK;
555  }
556  }
557 
558  // The query was not found in the active query map, so search the query log.
559  {
560  lock_guard<mutex> l(query_log_lock_);
561  QueryLogIndex::const_iterator query_record = query_log_index_.find(query_id);
562  if (query_record == query_log_index_.end()) {
563  stringstream ss;
564  ss << "Query id " << PrintId(query_id) << " not found.";
565  return Status(ss.str());
566  }
567  if (base64_encoded) {
568  (*output) << query_record->second->encoded_profile_str;
569  } else {
570  (*output) << query_record->second->profile_str;
571  }
572  }
573  return Status::OK;
574 }
575 
576 Status ImpalaServer::GetExecSummary(const TUniqueId& query_id, TExecSummary* result) {
577  // Search for the query id in the active query map
578  {
579  shared_ptr<QueryExecState> exec_state = GetQueryExecState(query_id, true);
580  if (exec_state != NULL) {
581  lock_guard<mutex> l(*exec_state->lock(), adopt_lock_t());
582  if (exec_state->coord() != NULL) {
583  lock_guard<SpinLock> lock(exec_state->coord()->GetExecSummaryLock());
584  *result = exec_state->coord()->exec_summary();
585  return Status::OK;
586  }
587  }
588  }
589 
590  // Look for the query in completed query log.
591  {
592  lock_guard<mutex> l(query_log_lock_);
593  QueryLogIndex::const_iterator query_record = query_log_index_.find(query_id);
594  if (query_record == query_log_index_.end()) {
595  stringstream ss;
596  ss << "Query id " << PrintId(query_id) << " not found.";
597  return Status(ss.str());
598  }
599  *result = query_record->second->exec_summary;
600  }
601  return Status::OK;
602 }
603 
604 void ImpalaServer::LogFileFlushThread() {
605  while (true) {
606  sleep(5);
607  profile_logger_->Flush();
608  }
609 }
610 
611 void ImpalaServer::AuditEventLoggerFlushThread() {
612  while (true) {
613  sleep(5);
614  Status status = audit_event_logger_->Flush();
615  if (!status.ok()) {
616  LOG(ERROR) << "Error flushing audit event log: " << status.GetDetail();
617  if (FLAGS_abort_on_failed_audit_event) {
618  LOG(ERROR) << "Shutting down Impala Server due to "
619  << "abort_on_failed_audit_event=true";
620  exit(1);
621  }
622  }
623  }
624 }
625 
626 void ImpalaServer::LineageLoggerFlushThread() {
627  while (true) {
628  sleep(5);
629  Status status = lineage_logger_->Flush();
630  if (!status.ok()) {
631  LOG(ERROR) << "Error flushing lineage event log: " << status.GetDetail();
632  if (FLAGS_abort_on_failed_lineage_event) {
633  LOG(ERROR) << "Shutting down Impala Server due to "
634  << "abort_on_failed_lineage_event=true";
635  exit(1);
636  }
637  }
638  }
639 }
640 
641 void ImpalaServer::ArchiveQuery(const QueryExecState& query) {
642  const string& encoded_profile_str = query.profile().SerializeToArchiveString();
643 
644  // If there was an error initialising archival (e.g. directory is not writeable),
645  // FLAGS_log_query_to_file will have been set to false
646  if (FLAGS_log_query_to_file) {
647  stringstream ss;
648  ss << UnixMillis() << " " << query.query_id() << " " << encoded_profile_str;
649  Status status = profile_logger_->AppendEntry(ss.str());
650  if (!status.ok()) {
651  LOG_EVERY_N(WARNING, 1000) << "Could not write to profile log file ("
652  << google::COUNTER << " attempts failed): "
653  << status.GetDetail();
654  LOG_EVERY_N(WARNING, 1000)
655  << "Disable query logging with --log_query_to_file=false";
656  }
657  }
658 
659  if (FLAGS_query_log_size == 0) return;
660  QueryStateRecord record(query, true, encoded_profile_str);
661  if (query.coord() != NULL) {
662  lock_guard<SpinLock> lock(query.coord()->GetExecSummaryLock());
663  record.exec_summary = query.coord()->exec_summary();
664  }
665  {
666  lock_guard<mutex> l(query_log_lock_);
667  // Add record to the beginning of the log, and to the lookup index.
668  query_log_index_[query.query_id()] = query_log_.insert(query_log_.begin(), record);
669 
670  if (FLAGS_query_log_size > -1 && FLAGS_query_log_size < query_log_.size()) {
671  DCHECK_EQ(query_log_.size() - FLAGS_query_log_size, 1);
672  query_log_index_.erase(query_log_.back().id);
673  query_log_.pop_back();
674  }
675  }
676 }
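// Each line appended to the profile log above has the space-separated form (values
// invented for illustration):
//   <UnixMillis> <query_id> <base64-encoded profile>
// e.g. "1423000000000 d4e1f0a5b2c3:9876543210 TVRNeU16UTFOVFk3..."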
677 
678 ImpalaServer::~ImpalaServer() {}
679 
680 Status ImpalaServer::Execute(TQueryCtx* query_ctx,
681  shared_ptr<SessionState> session_state,
682  shared_ptr<QueryExecState>* exec_state) {
683  PrepareQueryContext(query_ctx);
684  bool registered_exec_state;
685  ImpaladMetrics::IMPALA_SERVER_NUM_QUERIES->Increment(1L);
686 
687  // Redact the SQL stmt and update the query context
688  string stmt = replace_all_copy(query_ctx->request.stmt, "\n", " ");
689  Redact(&stmt);
690  query_ctx->request.__set_redacted_stmt((const string) stmt);
691 
692  Status status = ExecuteInternal(*query_ctx, session_state, &registered_exec_state,
693  exec_state);
694  if (!status.ok() && registered_exec_state) {
695  UnregisterQuery((*exec_state)->query_id(), false, &status);
696  }
697  return status;
698 }
699 
700 Status ImpalaServer::ExecuteInternal(
701  const TQueryCtx& query_ctx,
702  shared_ptr<SessionState> session_state,
703  bool* registered_exec_state,
704  shared_ptr<QueryExecState>* exec_state) {
705  DCHECK(session_state != NULL);
706  *registered_exec_state = false;
707  if (IsOffline()) {
708  return Status("This Impala server is offline. Please retry your query later.");
709  }
710  exec_state->reset(new QueryExecState(query_ctx, exec_env_, exec_env_->frontend(),
711  this, session_state));
712 
713  (*exec_state)->query_events()->MarkEvent("Start execution");
714 
715  TExecRequest result;
716  {
717  // Keep a lock on exec_state so that registration and setting
718  // result_metadata are atomic.
719  //
720  // Note: this acquires the exec_state lock *before* the
721  // query_exec_state_map_ lock. This is the opposite of
722  // GetQueryExecState(..., true), and therefore looks like a
723  // candidate for deadlock. The reason this works here is that
724  // GetQueryExecState cannot find exec_state (under the exec state
725  // map lock) and take its lock until RegisterQuery has
726  // finished. By that point, the exec state map lock will have been
727  // given up, so the classic deadlock interleaving is not possible.
728  lock_guard<mutex> l(*(*exec_state)->lock());
729 
730  // register exec state as early as possible so that queries that
731  // take a long time to plan show up, and to handle incoming status
732  // reports before execution starts.
733  RETURN_IF_ERROR(RegisterQuery(session_state, *exec_state));
734  *registered_exec_state = true;
735 
736  RETURN_IF_ERROR((*exec_state)->UpdateQueryStatus(
737  exec_env_->frontend()->GetExecRequest(query_ctx, &result)));
738  (*exec_state)->query_events()->MarkEvent("Planning finished");
739  (*exec_state)->summary_profile()->AddEventSequence(
740  result.timeline.name, result.timeline);
741  if (result.__isset.result_set_metadata) {
742  (*exec_state)->set_result_metadata(result.result_set_metadata);
743  }
744  }
745  VLOG(2) << "Execution request: " << ThriftDebugString(result);
746 
747  // start execution of query; also starts fragment status reports
748  RETURN_IF_ERROR((*exec_state)->Exec(&result));
749  if (result.stmt_type == TStmtType::DDL) {
750  Status status = UpdateCatalogMetrics();
751  if (!status.ok()) {
752  VLOG_QUERY << "Couldn't update catalog metrics: " << status.GetDetail();
753  }
754  }
755 
756  if ((*exec_state)->coord() != NULL) {
757  const unordered_set<TNetworkAddress>& unique_hosts =
758  (*exec_state)->schedule()->unique_hosts();
759  if (!unique_hosts.empty()) {
760  lock_guard<mutex> l(query_locations_lock_);
761  BOOST_FOREACH(const TNetworkAddress& port, unique_hosts) {
762  query_locations_[port].insert((*exec_state)->query_id());
763  }
764  }
765  }
766  return Status::OK;
767 }
768 
769 void ImpalaServer::PrepareQueryContext(TQueryCtx* query_ctx) {
770  query_ctx->__set_pid(getpid());
771  query_ctx->__set_now_string(TimestampValue::LocalTime().DebugString());
772  query_ctx->__set_coord_address(MakeNetworkAddress(FLAGS_hostname, FLAGS_be_port));
773 
774  // Creating a random_generator every time is not free, but
775  // benchmarks show it to be slightly cheaper than contending for a
776  // single generator under a lock (since random_generator is not
777  // thread-safe).
778  random_generator uuid_generator;
779  uuid query_uuid = uuid_generator();
780  UUIDToTUniqueId(query_uuid, &query_ctx->query_id);
781 }
782 
783 Status ImpalaServer::RegisterQuery(shared_ptr<SessionState> session_state,
784  const shared_ptr<QueryExecState>& exec_state) {
785  lock_guard<mutex> l2(session_state->lock);
786  // The session wasn't expired at the time it was checked out and it isn't allowed to
787  // expire while checked out, so it must not be expired.
788  DCHECK(session_state->ref_count > 0 && !session_state->expired);
789  // The session may have been closed after it was checked out.
790  if (session_state->closed) return Status("Session has been closed, ignoring query.");
791  const TUniqueId& query_id = exec_state->query_id();
792  {
793  lock_guard<mutex> l(query_exec_state_map_lock_);
794  QueryExecStateMap::iterator entry = query_exec_state_map_.find(query_id);
795  if (entry != query_exec_state_map_.end()) {
796  // There shouldn't be an active query with that same id.
797  // (query_id is globally unique)
798  stringstream ss;
799  ss << "query id " << PrintId(query_id) << " already exists";
800  return Status(ErrorMsg(TErrorCode::INTERNAL_ERROR, ss.str()));
801  }
802  query_exec_state_map_.insert(make_pair(query_id, exec_state));
803  }
804  return Status::OK;
805 }
806 
807 Status ImpalaServer::SetQueryInflight(shared_ptr<SessionState> session_state,
808  const shared_ptr<QueryExecState>& exec_state) {
809  const TUniqueId& query_id = exec_state->query_id();
810  lock_guard<mutex> l(session_state->lock);
811  // The session wasn't expired at the time it was checked out and it isn't allowed to
812  // expire while checked out, so it must not be expired.
813  DCHECK_GT(session_state->ref_count, 0);
814  DCHECK(!session_state->expired);
815  // The session may have been closed after it was checked out.
816  if (session_state->closed) return Status("Session closed");
817  // Add query to the set that will be unregistered if the session is closed.
818  session_state->inflight_queries.insert(query_id);
819  // Set query expiration.
820  int32_t timeout_s = exec_state->query_options().query_timeout_s;
821  if (FLAGS_idle_query_timeout > 0 && timeout_s > 0) {
822  timeout_s = min(FLAGS_idle_query_timeout, timeout_s);
823  } else {
824  // Use a non-zero timeout, if one exists
825  timeout_s = max(FLAGS_idle_query_timeout, timeout_s);
826  }
827  if (timeout_s > 0) {
828  lock_guard<mutex> l2(query_expiration_lock_);
829  VLOG_QUERY << "Query " << PrintId(query_id) << " has timeout of "
830  << PrettyPrinter::Print(timeout_s * 1000L * 1000L * 1000L,
831  TUnit::TIME_NS);
832  queries_by_timestamp_.insert(
833  make_pair(UnixMillis() + (1000L * timeout_s), query_id));
834  }
835  return Status::OK;
836 }
837 
838 Status ImpalaServer::UnregisterQuery(const TUniqueId& query_id, bool check_inflight,
839  const Status* cause) {
840  VLOG_QUERY << "UnregisterQuery(): query_id=" << query_id;
841 
842  RETURN_IF_ERROR(CancelInternal(query_id, check_inflight, cause));
843 
844  shared_ptr<QueryExecState> exec_state;
845  {
846  lock_guard<mutex> l(query_exec_state_map_lock_);
847  QueryExecStateMap::iterator entry = query_exec_state_map_.find(query_id);
848  if (entry == query_exec_state_map_.end()) {
849  return Status("Invalid or unknown query handle");
850  } else {
851  exec_state = entry->second;
852  }
853  query_exec_state_map_.erase(entry);
854  }
855 
856  exec_state->Done();
857  LogQueryEvents(*exec_state.get());
858 
859  {
860  lock_guard<mutex> l(exec_state->session()->lock);
861  exec_state->session()->inflight_queries.erase(query_id);
862  }
863 
864  if (exec_state->coord() != NULL) {
865  string exec_summary;
866  {
867  lock_guard<SpinLock> lock(exec_state->coord()->GetExecSummaryLock());
868  const TExecSummary& summary = exec_state->coord()->exec_summary();
869  exec_summary = PrintExecSummary(summary);
870  }
871  exec_state->summary_profile()->AddInfoString("ExecSummary", exec_summary);
872 
873  const unordered_set<TNetworkAddress>& unique_hosts =
874  exec_state->schedule()->unique_hosts();
875  if (!unique_hosts.empty()) {
876  lock_guard<mutex> l(query_locations_lock_);
877  BOOST_FOREACH(const TNetworkAddress& hostport, unique_hosts) {
878  // Query may have been removed already by cancellation path. In particular, if
879  // the failed node was the last sender to an exchange, the coordinator will realise and
880  // fail the query at the same time the failure detection path does the same
881  // thing. They will harmlessly race to remove the query from this map.
882  QueryLocations::iterator it = query_locations_.find(hostport);
883  if (it != query_locations_.end()) {
884  it->second.erase(exec_state->query_id());
885  }
886  }
887  }
888  }
889  ArchiveQuery(*exec_state);
890  return Status::OK;
891 }
892 
893 Status ImpalaServer::UpdateCatalogMetrics() {
894  TGetDbsResult db_names;
895  RETURN_IF_ERROR(exec_env_->frontend()->GetDbNames(NULL, NULL, &db_names));
896  ImpaladMetrics::CATALOG_NUM_DBS->set_value(db_names.dbs.size());
897  ImpaladMetrics::CATALOG_NUM_TABLES->set_value(0L);
898  BOOST_FOREACH(const string& db, db_names.dbs) {
899  TGetTablesResult table_names;
900  RETURN_IF_ERROR(exec_env_->frontend()->GetTableNames(db, NULL, NULL, &table_names));
901  ImpaladMetrics::CATALOG_NUM_TABLES->Increment(table_names.tables.size());
902  }
903 
904  return Status::OK;
905 }
906 
907 Status ImpalaServer::CancelInternal(const TUniqueId& query_id, bool check_inflight,
908  const Status* cause) {
909  VLOG_QUERY << "Cancel(): query_id=" << PrintId(query_id);
910  shared_ptr<QueryExecState> exec_state = GetQueryExecState(query_id, true);
911  if (exec_state == NULL) return Status("Invalid or unknown query handle");
912  lock_guard<mutex> l(*exec_state->lock(), adopt_lock_t());
913  if (check_inflight) {
914  lock_guard<mutex> l2(exec_state->session()->lock);
915  if (exec_state->session()->inflight_queries.find(query_id) ==
916  exec_state->session()->inflight_queries.end()) {
917  return Status("Query not yet running");
918  }
919  }
920  // TODO: can we call Coordinator::Cancel() here while holding lock?
921  exec_state->Cancel(cause);
922  return Status::OK;
923 }
924 
925 Status ImpalaServer::CloseSessionInternal(const TUniqueId& session_id,
926  bool ignore_if_absent) {
927  // Find the session_state and remove it from the map.
928  shared_ptr<SessionState> session_state;
929  {
930  lock_guard<mutex> l(session_state_map_lock_);
931  SessionStateMap::iterator entry = session_state_map_.find(session_id);
932  if (entry == session_state_map_.end()) {
933  if (ignore_if_absent) {
934  return Status::OK;
935  } else {
936  return Status("Invalid session ID");
937  }
938  }
939  session_state = entry->second;
940  session_state_map_.erase(session_id);
941  }
942  DCHECK(session_state != NULL);
943  if (session_state->session_type == TSessionType::BEESWAX) {
944  ImpaladMetrics::IMPALA_SERVER_NUM_OPEN_BEESWAX_SESSIONS->Increment(-1L);
945  } else {
946  ImpaladMetrics::IMPALA_SERVER_NUM_OPEN_HS2_SESSIONS->Increment(-1L);
947  }
948  unordered_set<TUniqueId> inflight_queries;
949  {
950  lock_guard<mutex> l(session_state->lock);
951  DCHECK(!session_state->closed);
952  session_state->closed = true;
953  // Since closed is true, no more queries will be added to the inflight list.
954  inflight_queries.insert(session_state->inflight_queries.begin(),
955  session_state->inflight_queries.end());
956  }
957  // Unregister all open queries from this session.
958  Status status("Session closed");
959  BOOST_FOREACH(const TUniqueId& query_id, inflight_queries) {
960  UnregisterQuery(query_id, false, &status);
961  }
962  return Status::OK;
963 }
964 
965 Status ImpalaServer::GetSessionState(const TUniqueId& session_id,
966  shared_ptr<SessionState>* session_state, bool mark_active) {
967  lock_guard<mutex> l(session_state_map_lock_);
968  SessionStateMap::iterator i = session_state_map_.find(session_id);
969  if (i == session_state_map_.end()) {
970  *session_state = boost::shared_ptr<SessionState>();
971  return Status("Invalid session id");
972  } else {
973  if (mark_active) {
974  lock_guard<mutex> session_lock(i->second->lock);
975  if (i->second->expired) {
976  stringstream ss;
977  ss << "Client session expired due to more than " << FLAGS_idle_session_timeout
978  << "s of inactivity (last activity was at: "
979  << TimestampValue(i->second->last_accessed_ms / 1000).DebugString() << ").";
980  return Status(ss.str());
981  }
982  if (i->second->closed) return Status("Session is closed");
983  ++i->second->ref_count;
984  }
985  *session_state = i->second;
986  return Status::OK;
987  }
988 }
989 
990 void ImpalaServer::ReportExecStatus(
991  TReportExecStatusResult& return_val, const TReportExecStatusParams& params) {
992  VLOG_FILE << "ReportExecStatus() query_id=" << params.query_id
993  << " backend#=" << params.backend_num
994  << " instance_id=" << params.fragment_instance_id
995  << " done=" << (params.done ? "true" : "false");
996  // TODO: implement something more efficient here, we're currently
997  // acquiring/releasing the map lock and doing a map lookup for
998  // every report (assign each query a local int32_t id and use that to index into a
999  // vector of QueryExecStates, w/o lookup or locking?)
1000  shared_ptr<QueryExecState> exec_state = GetQueryExecState(params.query_id, false);
1001  // TODO: This is expected occasionally (since a report RPC might be in flight while
1002  // cancellation is happening), but repeated instances for the same query are a bug
1003  // (which we have occasionally seen). Consider keeping query exec states around for a
1004  // little longer (until all reports have been received).
1005  if (exec_state.get() == NULL) {
1006  return_val.status.__set_status_code(TErrorCode::INTERNAL_ERROR);
1007  const string& err = Substitute("ReportExecStatus(): Received report for unknown "
1008  "query ID (probably closed or cancelled). (query_id: $0, backend: $1, instance:"
1009  " $2 done: $3)", PrintId(params.query_id), params.backend_num,
1010  PrintId(params.fragment_instance_id), params.done);
1011  return_val.status.error_msgs.push_back(err);
1012  VLOG_QUERY << err;
1013  return;
1014  }
1015  exec_state->coord()->UpdateFragmentExecStatus(params).SetTStatus(&return_val);
1016 }
1017 
1018 void ImpalaServer::TransmitData(
1019  TTransmitDataResult& return_val, const TTransmitDataParams& params) {
1020  VLOG_ROW << "TransmitData(): instance_id=" << params.dest_fragment_instance_id
1021  << " node_id=" << params.dest_node_id
1022  << " #rows=" << params.row_batch.num_rows
1023  << "sender_id=" << params.sender_id
1024  << " eos=" << (params.eos ? "true" : "false");
1025  // TODO: fix Thrift so we can simply take ownership of thrift_batch instead
1026  // of having to copy its data
1027  if (params.row_batch.num_rows > 0) {
1028  Status status = exec_env_->stream_mgr()->AddData(
1029  params.dest_fragment_instance_id, params.dest_node_id, params.row_batch,
1030  params.sender_id);
1031  status.SetTStatus(&return_val);
1032  if (!status.ok()) {
1033  // should we close the channel here as well?
1034  return;
1035  }
1036  }
1037 
1038  if (params.eos) {
1039  exec_env_->stream_mgr()->CloseSender(
1040  params.dest_fragment_instance_id, params.dest_node_id,
1041  params.sender_id).SetTStatus(&return_val);
1042  }
1043 }
1044 
1045 void ImpalaServer::InitializeConfigVariables() {
1046  Status status = ParseQueryOptions(FLAGS_default_query_options, &default_query_options_);
1047  if (!status.ok()) {
1048  // Log error and exit if the default query options are invalid.
1049  LOG(ERROR) << "Invalid default query options. Please check -default_query_options.\n"
1050  << status.GetDetail();
1051  exit(1);
1052  }
1053  LOG(INFO) << "Default query options:" << ThriftDebugString(default_query_options_);
1054 
1055  map<string, string> string_map;
1056  TQueryOptionsToMap(default_query_options_, &string_map);
1057  map<string, string>::const_iterator itr = string_map.begin();
1058  for (; itr != string_map.end(); ++itr) {
1059  ConfigVariable option;
1060  option.__set_key(itr->first);
1061  option.__set_value(itr->second);
1062  default_configs_.push_back(option);
1063  }
1064  ConfigVariable support_start_over;
1065  support_start_over.__set_key("support_start_over");
1066  support_start_over.__set_value("false");
1067  default_configs_.push_back(support_start_over);
1068 }
1069 
1070 void ImpalaServer::SessionState::ToThrift(const TUniqueId& session_id,
1071  TSessionState* state) {
1072  lock_guard<mutex> l(lock);
1073  state->session_id = session_id;
1074  state->session_type = session_type;
1075  state->database = database;
1076  state->connected_user = connected_user;
1077  // The do_as_user will only be set if delegation is enabled and the
1078  // proxy user is authorized to delegate as this user.
1079  if (!do_as_user.empty()) state->__set_delegated_user(do_as_user);
1080  state->network_address = network_address;
1081 }
1082 
1083 void ImpalaServer::CancelFromThreadPool(uint32_t thread_id,
1084  const CancellationWork& cancellation_work) {
1085  if (cancellation_work.unregister()) {
1086  Status status = UnregisterQuery(cancellation_work.query_id(), true,
1087  &cancellation_work.cause());
1088  if (!status.ok()) {
1089  VLOG_QUERY << "Query de-registration (" << cancellation_work.query_id()
1090  << ") failed";
1091  }
1092  } else {
1093  Status status = CancelInternal(cancellation_work.query_id(), true,
1094  &cancellation_work.cause());
1095  if (!status.ok()) {
1096  VLOG_QUERY << "Query cancellation (" << cancellation_work.query_id()
1097  << ") did not succeed: " << status.GetDetail();
1098  }
1099  }
1100 }
1101 
1102 Status ImpalaServer::AuthorizeProxyUser(const string& user, const string& do_as_user) {
1103  if (user.empty()) {
1104  return Status("Unable to delegate using empty proxy username.");
1105  } else if (do_as_user.empty()) {
1106  return Status("Unable to delegate using empty doAs username.");
1107  }
1108 
1109  stringstream error_msg;
1110  error_msg << "User '" << user << "' is not authorized to delegate to '"
1111  << do_as_user << "'.";
1112  if (authorized_proxy_user_config_.size() == 0) {
1113  error_msg << " User delegation is disabled.";
1114  return Status(error_msg.str());
1115  }
1116 
1117  // Get the short version of the user name (the user name up to the first '/' or '@')
1118  // from the full principal name.
1119  size_t end_idx = min(user.find("/"), user.find("@"));
1120  // If neither are found (or are found at the beginning of the user name),
1121  // return the username. Otherwise, return the username up to the matching character.
1122  string short_user(
1123  end_idx == string::npos || end_idx == 0 ? user : user.substr(0, end_idx));
1124 
1125  // Check if the proxy user exists. If it does, then check whether it is allowed
1126  // to delegate to the do_as_user.
1127  ProxyUserMap::const_iterator proxy_user =
1128  authorized_proxy_user_config_.find(short_user);
1129  if (proxy_user != authorized_proxy_user_config_.end()) {
1130  BOOST_FOREACH(const string& user, proxy_user->second) {
1131  if (user == "*" || user == do_as_user) return Status::OK;
1132  }
1133  }
1134  return Status(error_msg.str());
1135 }
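// Example (hedged): for a Kerberos principal "hue/host1.example.com@EXAMPLE.COM" the
// short name computed above is "hue"; a plain "alice" is unchanged. Delegation then
// succeeds only if the configured set for that short name contains do_as_user or "*".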
1136 
1137 void ImpalaServer::CatalogUpdateCallback(
1138  const StatestoreSubscriber::TopicDeltaMap& incoming_topic_deltas,
1139  vector<TTopicDelta>* subscriber_topic_updates) {
1140  StatestoreSubscriber::TopicDeltaMap::const_iterator topic =
1141  incoming_topic_deltas.find(CatalogServer::IMPALA_CATALOG_TOPIC);
1142  if (topic == incoming_topic_deltas.end()) return;
1143  const TTopicDelta& delta = topic->second;
1144 
1145 
1146  // Process any updates
1147  if (delta.topic_entries.size() != 0 || delta.topic_deletions.size() != 0) {
1148  TUpdateCatalogCacheRequest update_req;
1149  update_req.__set_is_delta(delta.is_delta);
1150  // Process all Catalog updates (new and modified objects) and determine what the
1151  // new catalog version will be.
1152  int64_t new_catalog_version = catalog_update_info_.catalog_version;
1153  BOOST_FOREACH(const TTopicItem& item, delta.topic_entries) {
1154  uint32_t len = item.value.size();
1155  TCatalogObject catalog_object;
1156  Status status = DeserializeThriftMsg(reinterpret_cast<const uint8_t*>(
1157  item.value.data()), &len, FLAGS_compact_catalog_topic, &catalog_object);
1158  if (!status.ok()) {
1159  LOG(ERROR) << "Error deserializing item: " << status.GetDetail();
1160  continue;
1161  }
1162  if (catalog_object.type == TCatalogObjectType::CATALOG) {
1163  update_req.__set_catalog_service_id(catalog_object.catalog.catalog_service_id);
1164  new_catalog_version = catalog_object.catalog_version;
1165  }
1166 
1167  // Refresh the lib cache entries of any added functions and data sources
1168  if (catalog_object.type == TCatalogObjectType::FUNCTION) {
1169  DCHECK(catalog_object.__isset.fn);
1170  LibCache::instance()->SetNeedsRefresh(catalog_object.fn.hdfs_location);
1171  }
1172  if (catalog_object.type == TCatalogObjectType::DATA_SOURCE) {
1173  DCHECK(catalog_object.__isset.data_source);
1174  LibCache::instance()->SetNeedsRefresh(catalog_object.data_source.hdfs_location);
1175  }
1176 
1177  update_req.updated_objects.push_back(catalog_object);
1178  }
1179 
1180  // We need to look up the dropped functions and data sources and remove them
1181  // from the library cache. The data sent from the catalog service does not
1182  // contain all the function metadata so we'll ask our local frontend for it. We
1183  // need to do this before updating the catalog.
1184  vector<TCatalogObject> dropped_objects;
1185 
1186  // Process all Catalog deletions (dropped objects). We only know the keys (object
1187  // names) so must parse each key to determine the TCatalogObject.
1188  BOOST_FOREACH(const string& key, delta.topic_deletions) {
1189  LOG(INFO) << "Catalog topic entry deletion: " << key;
1190  TCatalogObject catalog_object;
1191  Status status = TCatalogObjectFromEntryKey(key, &catalog_object);
1192  if (!status.ok()) {
1193  LOG(ERROR) << "Error parsing catalog topic entry deletion key: " << key << " "
1194  << "Error: " << status.GetDetail();
1195  continue;
1196  }
1197  update_req.removed_objects.push_back(catalog_object);
1198  if (catalog_object.type == TCatalogObjectType::FUNCTION ||
1199  catalog_object.type == TCatalogObjectType::DATA_SOURCE) {
1200  TCatalogObject dropped_object;
1201  if (exec_env_->frontend()->GetCatalogObject(
1202  catalog_object, &dropped_object).ok()) {
1203  // This object may have been dropped and re-created. To avoid removing the
1204  // re-created object's entry from the cache verify the existing object has a
1205  // catalog version <= the catalog version included in this statestore heartbeat.
1206  if (dropped_object.catalog_version <= new_catalog_version) {
1207  if (catalog_object.type == TCatalogObjectType::FUNCTION ||
1208  catalog_object.type == TCatalogObjectType::DATA_SOURCE) {
1209  dropped_objects.push_back(dropped_object);
1210  }
1211  }
1212  }
1213  // Nothing to do in error case.
1214  }
1215  }
1216 
1217  // Call the FE to apply the changes to the Impalad Catalog.
1218  TUpdateCatalogCacheResponse resp;
1219  Status s = exec_env_->frontend()->UpdateCatalogCache(update_req, &resp);
1220  if (!s.ok()) {
1221  LOG(ERROR) << "There was an error processing the impalad catalog update. Requesting"
1222  << " a full topic update to recover: " << s.GetDetail();
1223  subscriber_topic_updates->push_back(TTopicDelta());
1224  TTopicDelta& update = subscriber_topic_updates->back();
1225  update.topic_name = CatalogServer::IMPALA_CATALOG_TOPIC;
1226  update.__set_from_version(0L);
1227  ImpaladMetrics::CATALOG_READY->set_value(false);
1228  // Drop all cached lib files (this behaves as if all functions and data
1229  // sources are dropped).
1230  LibCache::instance()->DropCache();
1231  } else {
1232  {
1233  unique_lock<mutex> unique_lock(catalog_version_lock_);
1234  catalog_update_info_.catalog_version = new_catalog_version;
1235  catalog_update_info_.catalog_topic_version = delta.to_version;
1236  catalog_update_info_.catalog_service_id = resp.catalog_service_id;
1237  }
1238  ImpaladMetrics::CATALOG_READY->set_value(new_catalog_version > 0);
1239  UpdateCatalogMetrics();
1240  // Remove all dropped objects from the library cache.
1241  // TODO: is this expensive? We'd like to process heartbeats promptly.
1242  BOOST_FOREACH(TCatalogObject& object, dropped_objects) {
1243  if (object.type == TCatalogObjectType::FUNCTION) {
1244  LibCache::instance()->RemoveEntry(object.fn.hdfs_location);
1245  } else if (object.type == TCatalogObjectType::DATA_SOURCE) {
1246  LibCache::instance()->RemoveEntry(object.data_source.hdfs_location);
1247  } else {
1248  DCHECK(false);
1249  }
1250  }
1251  }
1252  }
1253 
1254  // Always update the minimum subscriber version for the catalog topic.
1255  {
1256  unique_lock<mutex> unique_lock(catalog_version_lock_);
1257  min_subscriber_catalog_topic_version_ = delta.min_subscriber_topic_version;
1258  }
1259  catalog_version_update_cv_.notify_all();
1260 }
1261 
1262 Status ImpalaServer::ProcessCatalogUpdateResult(
1263  const TCatalogUpdateResult& catalog_update_result, bool wait_for_all_subscribers) {
1264  // If this update result contains a catalog object to add or remove, directly apply
1265  // the update to the local impalad's catalog cache. Otherwise, wait for a statestore
1266  // heartbeat that contains this update version.
1267  if ((catalog_update_result.__isset.updated_catalog_object ||
1268  catalog_update_result.__isset.removed_catalog_object)) {
1269  TUpdateCatalogCacheRequest update_req;
1270  update_req.__set_is_delta(true);
1271  update_req.__set_catalog_service_id(catalog_update_result.catalog_service_id);
1272 
1273  if (catalog_update_result.__isset.updated_catalog_object) {
1274  update_req.updated_objects.push_back(catalog_update_result.updated_catalog_object);
1275  }
1276  if (catalog_update_result.__isset.removed_catalog_object) {
1277  update_req.removed_objects.push_back(catalog_update_result.removed_catalog_object);
1278  }
1279  // Apply the changes to the local catalog cache.
1280  TUpdateCatalogCacheResponse resp;
1281  Status status = exec_env_->frontend()->UpdateCatalogCache(update_req, &resp);
1282  if (!status.ok()) LOG(ERROR) << status.GetDetail();
1283  RETURN_IF_ERROR(status);
1284  if (!wait_for_all_subscribers) return Status::OK;
1285  }
1286 
1287  unique_lock<mutex> unique_lock(catalog_version_lock_);
1288  int64_t min_req_catalog_version = catalog_update_result.version;
1289  const TUniqueId& catalog_service_id = catalog_update_result.catalog_service_id;
1290 
1291  // Wait for the update to be processed locally.
1292  // TODO: What about query cancellation?
1293  VLOG_QUERY << "Waiting for catalog version: " << min_req_catalog_version
1294  << " current version: " << catalog_update_info_.catalog_version;
1295  while (catalog_update_info_.catalog_version < min_req_catalog_version &&
1296  catalog_update_info_.catalog_service_id == catalog_service_id) {
1297  catalog_version_update_cv_.wait(unique_lock);
1298  }
1299 
1300  if (!wait_for_all_subscribers) return Status::OK;
1301 
1302  // Now wait for this update to be propagated to all catalog topic subscribers.
1303  // If we make it here it implies the first condition was met (the update was processed
1304  // locally or the catalog service id has changed).
1305  int64_t min_req_subscriber_topic_version = catalog_update_info_.catalog_topic_version;
1306 
1307  VLOG_QUERY << "Waiting for min subscriber topic version: "
1308  << min_req_subscriber_topic_version << " current version: "
1309  << min_subscriber_catalog_topic_version_;
1310  while (min_subscriber_catalog_topic_version_ < min_req_subscriber_topic_version &&
1311  catalog_update_info_.catalog_service_id == catalog_service_id) {
1312  catalog_version_update_cv_.wait(unique_lock);
1313  }
1314  return Status::OK;
1315 }
1316 
1317 void ImpalaServer::MembershipCallback(
1318  const StatestoreSubscriber::TopicDeltaMap& incoming_topic_deltas,
1319  vector<TTopicDelta>* subscriber_topic_updates) {
1320  // TODO: Consider rate-limiting this. In the short term, best to have
1321  // statestore heartbeat less frequently.
1322  StatestoreSubscriber::TopicDeltaMap::const_iterator topic =
1323  incoming_topic_deltas.find(SimpleScheduler::IMPALA_MEMBERSHIP_TOPIC);
1324 
1325  if (topic != incoming_topic_deltas.end()) {
1326  const TTopicDelta& delta = topic->second;
1327  // If this is not a delta, the update should include all entries in the topic so
1328  // clear the saved mapping of known backends.
1329  if (!delta.is_delta) known_backends_.clear();
1330 
1331  // Process membership additions.
1332  BOOST_FOREACH(const TTopicItem& item, delta.topic_entries) {
1333  uint32_t len = item.value.size();
1334  TBackendDescriptor backend_descriptor;
1335  Status status = DeserializeThriftMsg(reinterpret_cast<const uint8_t*>(
1336  item.value.data()), &len, false, &backend_descriptor);
1337  if (!status.ok()) {
1338  VLOG(2) << "Error deserializing topic item with key: " << item.key;
1339  continue;
1340  }
1341  // This is a new item - add it to the map of known backends.
1342  known_backends_.insert(make_pair(item.key, backend_descriptor.address));
1343  }
1344  // Process membership deletions.
1345  BOOST_FOREACH(const string& backend_id, delta.topic_deletions) {
1346  known_backends_.erase(backend_id);
1347  }
1348 
1349  // Create a set of known backend network addresses. Used to test for cluster
1350  // membership by network address.
1351  set<TNetworkAddress> current_membership;
1352  BOOST_FOREACH(const BackendAddressMap::value_type& backend, known_backends_) {
1353  current_membership.insert(backend.second);
1354  }
1355 
1356  // Maps from query id (to be cancelled) to a list of failed Impalads that are
1357  // the cause of the cancellation.
1358  map<TUniqueId, vector<TNetworkAddress> > queries_to_cancel;
1359  {
1360  // Build a list of queries that are running on failed hosts (as evidenced by their
1361  // absence from the membership list).
1362  // TODO: crash-restart failures can give false negatives for failed Impala daemons.
1363  lock_guard<mutex> l(query_locations_lock_);
1364  QueryLocations::const_iterator loc_entry = query_locations_.begin();
1365  while (loc_entry != query_locations_.end()) {
1366  if (current_membership.find(loc_entry->first) == current_membership.end()) {
1367  unordered_set<TUniqueId>::const_iterator query_id = loc_entry->second.begin();
1368  // Add failed backend locations to all queries that ran on that backend.
1369  for(; query_id != loc_entry->second.end(); ++query_id) {
1370  vector<TNetworkAddress>& failed_hosts = queries_to_cancel[*query_id];
1371  failed_hosts.push_back(loc_entry->first);
1372  }
1373  exec_env_->impalad_client_cache()->CloseConnections(loc_entry->first);
1374  // We can remove the location wholesale once we know the backend has failed. To do
1375  // so safely during iteration, we have to be careful not to invalidate the current
1376  // iterator, so copy the iterator to do the erase(..) and advance the original.
1377  QueryLocations::const_iterator failed_backend = loc_entry;
1378  ++loc_entry;
1379  query_locations_.erase(failed_backend);
1380  } else {
1381  ++loc_entry;
1382  }
1383  }
1384  }
1385 
1386  if (cancellation_thread_pool_->GetQueueSize() + queries_to_cancel.size() >
1387  MAX_CANCELLATION_QUEUE_SIZE) {
1388  // Ignore the cancellations - we'll be able to process them on the next heartbeat
1389  // instead.
1390  LOG_EVERY_N(WARNING, 60) << "Cancellation queue is full";
1391  } else {
1392  // Since we are the only producer for this pool, we know that this cannot block
1393  // indefinitely since the queue is large enough to accept all new cancellation
1394  // requests.
1395  map<TUniqueId, vector<TNetworkAddress> >::iterator cancellation_entry;
1396  for (cancellation_entry = queries_to_cancel.begin();
1397  cancellation_entry != queries_to_cancel.end();
1398  ++cancellation_entry) {
1399  stringstream cause_msg;
1400  cause_msg << "Cancelled due to unreachable impalad(s): ";
1401  for (int i = 0; i < cancellation_entry->second.size(); ++i) {
1402  cause_msg << cancellation_entry->second[i];
1403  if (i + 1 != cancellation_entry->second.size()) cause_msg << ", ";
1404  }
1405  cancellation_thread_pool_->Offer(
1406  CancellationWork(cancellation_entry->first, Status(cause_msg.str()), false));
1407  }
1408  }
1409  }
1410 }
1411 
1412 ImpalaServer::QueryStateRecord::QueryStateRecord(const QueryExecState& exec_state,
1413  bool copy_profile, const string& encoded_profile) {
1414  id = exec_state.query_id();
1415  const TExecRequest& request = exec_state.exec_request();
1416 
1417  const string* plan_str = exec_state.summary_profile().GetInfoString("Plan");
1418  if (plan_str != NULL) plan = *plan_str;
1419  stmt = exec_state.sql_stmt();
1420  stmt_type = request.stmt_type;
1421  effective_user = exec_state.effective_user();
1422  default_db = exec_state.default_db();
1423  start_time = exec_state.start_time();
1424  end_time = exec_state.end_time();
1425  has_coord = false;
1426 
1427  Coordinator* coord = exec_state.coord();
1428  if (coord != NULL) {
1429  num_complete_fragments = coord->progress().num_complete();
1430  total_fragments = coord->progress().total();
1431  has_coord = true;
1432  }
1433  query_state = exec_state.query_state();
1434  num_rows_fetched = exec_state.num_rows_fetched();
1435  query_status = exec_state.query_status();
1436 
1437  exec_state.query_events()->ToThrift(&event_sequence);
1438 
1439  if (copy_profile) {
1440  stringstream ss;
1441  exec_state.profile().PrettyPrint(&ss);
1442  profile_str = ss.str();
1443  if (encoded_profile.empty()) {
1444  encoded_profile_str = exec_state.profile().SerializeToArchiveString();
1445  } else {
1446  encoded_profile_str = encoded_profile;
1447  }
1448  }
1449 
1450  // Save the query fragments so that the plan can be visualised.
1451  fragments = exec_state.exec_request().query_exec_request.fragments;
1452 }
1453 
1454 bool ImpalaServer::QueryStateRecord::operator() (
1455  const QueryStateRecord& lhs, const QueryStateRecord& rhs) const {
1456  if (lhs.start_time == rhs.start_time) return lhs.id < rhs.id;
1457  return lhs.start_time < rhs.start_time;
1458 }
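// Since QueryStateRecord doubles as its own comparator, an archive ordered by
// start time can be kept in a std::set. A sketch, where 'record' is assumed to be
// an already-populated QueryStateRecord:
//
//   std::set<ImpalaServer::QueryStateRecord,
//       ImpalaServer::QueryStateRecord> sorted_records;
//   sorted_records.insert(record);
//   // Iterating sorted_records now runs from earliest start_time to latest,
//   // with ties broken by query id.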
1459 
1460 void ImpalaServer::ConnectionStart(
1461  const ThriftServer::ConnectionContext& connection_context) {
1462  if (connection_context.server_name == BEESWAX_SERVER_NAME) {
1463  // Beeswax only allows for one session per connection, so we can share the session ID
1464  // with the connection ID
1465  const TUniqueId& session_id = connection_context.connection_id;
1466  shared_ptr<SessionState> session_state;
1467  session_state.reset(new SessionState);
1468  session_state->closed = false;
1469  session_state->start_time = TimestampValue::LocalTime();
1470  session_state->last_accessed_ms = UnixMillis();
1471  session_state->database = "default";
1472  session_state->session_type = TSessionType::BEESWAX;
1473  session_state->network_address = connection_context.network_address;
1474  session_state->default_query_options = default_query_options_;
1475  // If the username was set by a lower-level transport, use it.
1476  if (!connection_context.username.empty()) {
1477  session_state->connected_user = connection_context.username;
1478  }
1479 
1480  {
1481  lock_guard<mutex> l(session_state_map_lock_);
1482  bool success =
1483  session_state_map_.insert(make_pair(session_id, session_state)).second;
1484  // The session should not have already existed.
1485  DCHECK(success);
1486  }
1487  {
1488  lock_guard<mutex> l(connection_to_sessions_map_lock_);
1489  connection_to_sessions_map_[connection_context.connection_id].push_back(session_id);
1490  }
1491  ImpaladMetrics::IMPALA_SERVER_NUM_OPEN_BEESWAX_SESSIONS->Increment(1L);
1492  }
1493 }
1494 
1495 void ImpalaServer::ConnectionEnd(
1496  const ThriftServer::ConnectionContext& connection_context) {
1497  unique_lock<mutex> l(connection_to_sessions_map_lock_);
1498  ConnectionToSessionMap::iterator it =
1499  connection_to_sessions_map_.find(connection_context.connection_id);
1500 
1501  // Not every connection has an associated session.
1502  if (it == connection_to_sessions_map_.end()) return;
1503 
1504  LOG(INFO) << "Connection from client " << connection_context.network_address
1505  << " closed, closing " << it->second.size() << " associated session(s)";
1506 
1507  BOOST_FOREACH(const TUniqueId& session_id, it->second) {
1508  Status status = CloseSessionInternal(session_id, true);
1509  if (!status.ok()) {
1510  LOG(WARNING) << "Error closing session " << session_id << ": "
1511  << status.GetDetail();
1512  }
1513  }
1514  connection_to_sessions_map_.erase(it);
1515 }
1516 
1517 void ImpalaServer::ExpireSessions() {
1518  while (true) {
1519  // Sleep for half the session timeout; the maximum delay between a session expiring
1520  // and this method picking it up is equal to the size of this sleep.
1521  SleepForMs(FLAGS_idle_session_timeout * 500);
1522  lock_guard<mutex> l(session_state_map_lock_);
1523  int64_t now = UnixMillis();
1524  VLOG(3) << "Session expiration thread waking up";
1525  // TODO: If holding session_state_map_lock_ for the duration of this loop is too
1526  // expensive, consider a priority queue.
1527  BOOST_FOREACH(SessionStateMap::value_type& session_state, session_state_map_) {
1528  unordered_set<TUniqueId> inflight_queries;
1529  {
1530  lock_guard<mutex> l(session_state.second->lock);
1531  if (session_state.second->ref_count > 0) continue;
1532  // A session closed by other means is in the process of being removed, and it's
1533  // best not to interfere.
1534  if (session_state.second->closed || session_state.second->expired) continue;
1535  int64_t last_accessed_ms = session_state.second->last_accessed_ms;
1536  if (now - last_accessed_ms <= (FLAGS_idle_session_timeout * 1000)) continue;
1537  LOG(INFO) << "Expiring session: " << session_state.first << ", user:"
1538  << session_state.second->connected_user << ", last active: "
1539  << TimestampValue(last_accessed_ms / 1000).DebugString();
1540  session_state.second->expired = true;
1541  ImpaladMetrics::NUM_SESSIONS_EXPIRED->Increment(1L);
1542  // Since expired is true, no more queries will be added to the inflight list.
1543  inflight_queries.insert(session_state.second->inflight_queries.begin(),
1544  session_state.second->inflight_queries.end());
1545  }
1546  // Unregister all open queries from this session.
1547  Status status("Session expired due to inactivity");
1548  BOOST_FOREACH(const TUniqueId& query_id, inflight_queries) {
1549  cancellation_thread_pool_->Offer(CancellationWork(query_id, status, true));
1550  }
1551  }
1552  }
1553 }
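// Worked example of the timing bound above: with --idle_session_timeout=60 the
// thread sleeps 60 * 500ms = 30s per iteration, so a session that crosses the 60s
// idle threshold is expired at most ~30s later, i.e. within roughly 90s of its
// last access.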
1554 
1555 void ImpalaServer::ExpireQueries() {
1556  while (true) {
1557  // The following block accomplishes three things:
1558  //
1559  // 1. Update the ordered list of queries by checking the 'idle_time' parameter in
1560  // query_exec_state. We are able to avoid doing this for *every* query in flight
1561  // thanks to the observation that expiry times never move backwards, only
1562  // forwards. Therefore once we find a query that a) hasn't changed its idle time and
1563  // b) has not yet expired we can stop moving through the list. If the idle time has
1564  // changed, we need to re-insert the query in the right place in queries_by_timestamp_
1565  //
1566  // 2. Remove any queries that would have expired but have already been closed for any
1567  // reason.
1568  //
1569  // 3. Compute the next time a query *might* expire, so that the sleep at the end of
1570  // this loop has an accurate duration to wait. If the list of queries is empty, the
1571  // default sleep duration is half the idle query timeout.
1572  int64_t now;
1573  {
1574  lock_guard<mutex> l(query_expiration_lock_);
1575  ExpirationQueue::iterator expiration_event = queries_by_timestamp_.begin();
1576  now = UnixMillis();
1577  while (expiration_event != queries_by_timestamp_.end()) {
1578  // If the last-observed expiration time for this query is still in the future, we
1579  // know that the true expiration time will be at least that far off. So we can
1580  // break here and sleep.
1581  if (expiration_event->first > now) break;
1582  shared_ptr<QueryExecState> query_state =
1583  GetQueryExecState(expiration_event->second, false);
1584  if (query_state.get() == NULL) {
1585  // Query was deleted some other way.
1586  queries_by_timestamp_.erase(expiration_event++);
1587  continue;
1588  }
1589  // First, check the actual expiration time in case the query has updated it
1590  // since the last time we looked.
1591  int32_t timeout_s = query_state->query_options().query_timeout_s;
1592  if (FLAGS_idle_query_timeout > 0 && timeout_s > 0) {
1593  timeout_s = min(FLAGS_idle_query_timeout, timeout_s);
1594  } else {
1595  // Use a non-zero timeout, if one exists
1596  timeout_s = max(FLAGS_idle_query_timeout, timeout_s);
1597  }
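      // Net effect of the min/max pair above, assuming queries with no timeout at
      // all never receive expiration events:
      //   FLAGS_idle_query_timeout  query_timeout_s  effective timeout_s
      //            0                      30                 30
      //           60                       0                 60
      //           60                      30                 30
      // i.e. the tighter of the two non-zero timeouts always wins.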
1598  int64_t expiration = query_state->last_active() + (timeout_s * 1000L);
1599  if (now < expiration) {
1600  // If the real expiration date is in the future we may need to re-insert the
1601  // query's expiration event at its correct location.
1602  if (expiration == expiration_event->first) {
1603  // The query hasn't been updated since it was inserted, so we know (by the
1604  // fact that queries are inserted in-expiration-order initially) that it is
1605  // still the next query to expire. No need to re-insert it.
1606  break;
1607  } else {
1608  // Erase and re-insert with an updated expiration time.
1609  TUniqueId query_id = expiration_event->second;
1610  queries_by_timestamp_.erase(expiration_event++);
1611  queries_by_timestamp_.insert(make_pair(expiration, query_id));
1612  }
1613  } else if (!query_state->is_active()) {
1614  // Otherwise time to expire this query
1615  VLOG_QUERY << "Expiring query due to client inactivity: "
1616  << expiration_event->second << ", last activity was at: "
1617  << TimestampValue(query_state->last_active()).DebugString();
1618  const string& err_msg = Substitute(
1619  "Query $0 expired due to client inactivity (timeout is $1)",
1620  PrintId(expiration_event->second),
1621  PrettyPrinter::Print(timeout_s * 1000000000L, TUnit::TIME_NS));
1622 
1623  cancellation_thread_pool_->Offer(
1624  CancellationWork(expiration_event->second, Status(err_msg), false));
1625  queries_by_timestamp_.erase(expiration_event++);
1626  ImpaladMetrics::NUM_QUERIES_EXPIRED->Increment(1L);
1627  } else {
1628  // Iterator is moved on in every other branch.
1629  ++expiration_event;
1630  }
1631  }
1632  }
1633  // Since we only allow timeouts to be 1s or greater, the earliest that any new query
1634  // could expire is in 1s time. An existing query may expire sooner, but we are
1635  // comfortable with a maximum error of 1s as a trade-off for not frequently waking
1636  // this thread.
1637  SleepForMs(1000L);
1638  }
1639 }
1640 
1641 Status CreateImpalaServer(ExecEnv* exec_env, int beeswax_port, int hs2_port, int be_port,
1642  ThriftServer** beeswax_server, ThriftServer** hs2_server, ThriftServer** be_server,
1643  ImpalaServer** impala_server) {
1644  DCHECK((beeswax_port == 0) == (beeswax_server == NULL));
1645  DCHECK((hs2_port == 0) == (hs2_server == NULL));
1646  DCHECK((be_port == 0) == (be_server == NULL));
1647 
1648  shared_ptr<ImpalaServer> handler(new ImpalaServer(exec_env));
1649 
1650  if (beeswax_port != 0 && beeswax_server != NULL) {
1651  // Beeswax FE must be a TThreadPoolServer because ODBC and Hue only support
1652  // TThreadPoolServer.
1653  shared_ptr<TProcessor> beeswax_processor(new ImpalaServiceProcessor(handler));
1654  shared_ptr<TProcessorEventHandler> event_handler(
1655  new RpcEventHandler("beeswax", exec_env->metrics()));
1656  beeswax_processor->setEventHandler(event_handler);
1657  *beeswax_server = new ThriftServer(BEESWAX_SERVER_NAME, beeswax_processor,
1658  beeswax_port, AuthManager::GetInstance()->GetExternalAuthProvider(),
1659  exec_env->metrics(), FLAGS_fe_service_threads, ThriftServer::ThreadPool);
1660 
1661  (*beeswax_server)->SetConnectionHandler(handler.get());
1662  if (!FLAGS_ssl_server_certificate.empty()) {
1663  LOG(INFO) << "Enabling SSL for Beeswax";
1664  RETURN_IF_ERROR((*beeswax_server)->EnableSsl(
1665  FLAGS_ssl_server_certificate, FLAGS_ssl_private_key));
1666  }
1667 
1668  LOG(INFO) << "Impala Beeswax Service listening on " << beeswax_port;
1669  }
1670 
1671  if (hs2_port != 0 && hs2_server != NULL) {
1672  // HiveServer2 JDBC driver does not support non-blocking server.
1673  shared_ptr<TProcessor> hs2_fe_processor(
1674  new ImpalaHiveServer2ServiceProcessor(handler));
1675  shared_ptr<TProcessorEventHandler> event_handler(
1676  new RpcEventHandler("hs2", exec_env->metrics()));
1677  hs2_fe_processor->setEventHandler(event_handler);
1678 
1679  *hs2_server = new ThriftServer(HS2_SERVER_NAME, hs2_fe_processor, hs2_port,
1680  AuthManager::GetInstance()->GetExternalAuthProvider(), exec_env->metrics(),
1681  FLAGS_fe_service_threads, ThriftServer::ThreadPool);
1682 
1683  (*hs2_server)->SetConnectionHandler(handler.get());
1684  if (!FLAGS_ssl_server_certificate.empty()) {
1685  LOG(INFO) << "Enabling SSL for HiveServer2";
1686  RETURN_IF_ERROR((*hs2_server)->EnableSsl(
1687  FLAGS_ssl_server_certificate, FLAGS_ssl_private_key));
1688  }
1689 
1690  LOG(INFO) << "Impala HiveServer2 Service listening on " << hs2_port;
1691  }
1692 
1693  if (be_port != 0 && be_server != NULL) {
1694  shared_ptr<FragmentMgr> fragment_mgr(new FragmentMgr());
1695  shared_ptr<ImpalaInternalService> thrift_if(
1696  new ImpalaInternalService(handler, fragment_mgr));
1697  shared_ptr<TProcessor> be_processor(new ImpalaInternalServiceProcessor(thrift_if));
1698  shared_ptr<TProcessorEventHandler> event_handler(
1699  new RpcEventHandler("backend", exec_env->metrics()));
1700  be_processor->setEventHandler(event_handler);
1701 
1702  *be_server = new ThriftServer("backend", be_processor, be_port, NULL,
1703  exec_env->metrics(), FLAGS_be_service_threads);
1704 
1705  LOG(INFO) << "ImpalaInternalService listening on " << be_port;
1706  }
1707  if (impala_server != NULL) *impala_server = handler.get();
1708 
1709  return Status::OK;
1710 }
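// A minimal usage sketch for CreateImpalaServer() (hypothetical caller, loosely
// mirroring impalad startup; error handling elided):
//
//   ExecEnv exec_env;
//   ThriftServer* beeswax_server = NULL;
//   ThriftServer* hs2_server = NULL;
//   ThriftServer* be_server = NULL;
//   ImpalaServer* server = NULL;
//   EXIT_IF_ERROR(CreateImpalaServer(&exec_env, FLAGS_beeswax_port, FLAGS_hs2_port,
//       FLAGS_be_port, &beeswax_server, &hs2_server, &be_server, &server));
//   EXIT_IF_ERROR(be_server->Start());
//   EXIT_IF_ERROR(beeswax_server->Start());
//   EXIT_IF_ERROR(hs2_server->Start());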
1711 
1712 bool ImpalaServer::GetSessionIdForQuery(const TUniqueId& query_id,
1713  TUniqueId* session_id) {
1714  DCHECK(session_id != NULL);
1715  lock_guard<mutex> l(query_exec_state_map_lock_);
1716  QueryExecStateMap::iterator i = query_exec_state_map_.find(query_id);
1717  if (i == query_exec_state_map_.end()) {
1718  return false;
1719  } else {
1720  *session_id = i->second->session_id();
1721  return true;
1722  }
1723 }
1724 
1725 shared_ptr<ImpalaServer::QueryExecState> ImpalaServer::GetQueryExecState(
1726  const TUniqueId& query_id, bool lock) {
1727  lock_guard<mutex> l(query_exec_state_map_lock_);
1728  QueryExecStateMap::iterator i = query_exec_state_map_.find(query_id);
1729  if (i == query_exec_state_map_.end()) {
1730  return shared_ptr<QueryExecState>();
1731  } else {
1732  if (lock) i->second->lock()->lock();
1733  return i->second;
1734  }
1735 }
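// When 'lock' is true the returned exec state's lock is already held, so a caller
// typically adopts it rather than locking again. A sketch:
//
//   shared_ptr<QueryExecState> exec_state = GetQueryExecState(query_id, true);
//   if (exec_state.get() != NULL) {
//     lock_guard<mutex> l(*exec_state->lock(), adopt_lock_t());
//     // ... exec_state can be inspected safely here ...
//   }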
1736 
1737 void ImpalaServer::SetOffline(bool is_offline) {
1738  lock_guard<mutex> l(is_offline_lock_);
1739  is_offline_ = is_offline;
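  // The ready metric is the logical inverse of the offline flag.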
1740  ImpaladMetrics::IMPALA_SERVER_READY->set_value(!is_offline);
1741 }
1742 
1743 void ImpalaServer::DetectNmFailures() {
1744  DCHECK(FLAGS_enable_rm);
1745  if (FLAGS_local_nodemanager_url.empty()) {
1746  LOG(WARNING) << "No NM address set (--nm_addr is empty), no NM failure detection "
1747  << "thread started";
1748  return;
1749  }
1750  // We only want a network address to open a socket to, for now. Get rid of http(s)://
1751  // prefix, and split the string into hostname:port.
1752  if (istarts_with(FLAGS_local_nodemanager_url, "http://")) {
1753  FLAGS_local_nodemanager_url =
1754  FLAGS_local_nodemanager_url.substr(string("http://").size());
1755  } else if (istarts_with(FLAGS_local_nodemanager_url, "https://")) {
1756  FLAGS_local_nodemanager_url =
1757  FLAGS_local_nodemanager_url.substr(string("https://").size());
1758  }
1759  vector<string> components;
1760  split(components, FLAGS_local_nodemanager_url, is_any_of(":"));
1761  if (components.size() < 2) {
1762  LOG(ERROR) << "Could not parse network address from --local_nodemanager_url, no NM"
1763  << " failure detection thread started";
1764  return;
1765  }
1766  DCHECK_GE(components.size(), 2);
1767  TNetworkAddress nm_addr =
1768  MakeNetworkAddress(components[0], atoi(components[1].c_str()));
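  // Worked example: --local_nodemanager_url=http://nm-host:8042 is stripped to
  // "nm-host:8042", so components = {"nm-host", "8042"} and nm_addr = nm-host:8042.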
1769 
1770  MissedHeartbeatFailureDetector failure_detector(MAX_NM_MISSED_HEARTBEATS,
1771  MAX_NM_MISSED_HEARTBEATS / 2);
1772  struct addrinfo* addr;
1773  if (getaddrinfo(nm_addr.hostname.c_str(), components[1].c_str(), NULL, &addr)) {
1774  LOG(WARNING) << "Could not resolve NM address: " << nm_addr << ". Error was: "
1775  << GetStrErrMsg();
1776  return;
1777  }
1778  LOG(INFO) << "Starting NM failure-detection thread, NM at: " << nm_addr;
1779  // True if the last time through the loop Impala had failed, otherwise false. Used to
1780  // only change the offline status when there's a change in state.
1781  bool last_failure_state = false;
1782  while (true) {
1783  int sockfd = socket(AF_INET, SOCK_STREAM, 0);
1784  if (sockfd >= 0) {
1785  if (connect(sockfd, addr->ai_addr, sizeof(sockaddr)) < 0) {
1786  failure_detector.UpdateHeartbeat(FLAGS_local_nodemanager_url, false);
1787  } else {
1788  failure_detector.UpdateHeartbeat(FLAGS_local_nodemanager_url, true);
1789  }
1790  ::close(sockfd);
1791  } else {
1792  LOG(ERROR) << "Could not create socket! Error was: " << GetStrErrMsg();
1793  }
1794  bool is_failed = (failure_detector.GetPeerState(FLAGS_local_nodemanager_url) ==
1795  FailureDetector::FAILED);
1796  if (is_failed != last_failure_state) {
1797  if (is_failed) {
1798  LOG(WARNING) <<
1799  "ImpalaServer is going offline while local node-manager connectivity is bad";
1800  } else {
1801  LOG(WARNING) <<
1802  "Node-manager connectivity has been restored. ImpalaServer is now online";
1803  }
1804  SetOffline(is_failed);
1805  }
1806  last_failure_state = is_failed;
1807  SleepForMs(2000);
1808  }
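  // Note: the while loop above never exits, so this cleanup call is unreachable as
  // written.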
1809  freeaddrinfo(addr);
1810 }
1811 
1812 }