Impala
Impala is the open source, native analytic database for Apache Hadoop.
exec-env.cc
1 // Copyright 2012 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "runtime/exec-env.h"
16 
17 #include <vector>
18 
19 #include <boost/algorithm/string.hpp>
20 #include <gflags/gflags.h>
21 #include <gutil/strings/substitute.h>
22 
23 #include "common/logging.h"
25 #include "runtime/client-cache.h"
27 #include "runtime/disk-io-mgr.h"
29 #include "runtime/hdfs-fs-cache.h"
30 #include "runtime/lib-cache.h"
31 #include "runtime/mem-tracker.h"
34 #include "service/frontend.h"
37 #include "util/debug-util.h"
39 #include "util/mem-info.h"
40 #include "util/metrics.h"
41 #include "util/network-util.h"
42 #include "util/parse-util.h"
43 #include "util/memory-metrics.h"
44 #include "util/webserver.h"
45 #include "util/mem-info.h"
46 #include "util/debug-util.h"
47 #include "util/cgroups-mgr.h"
48 #include "util/memory-metrics.h"
49 #include "util/pretty-printer.h"
50 #include "gen-cpp/ImpalaInternalService.h"
51 #include "gen-cpp/CatalogService.h"
52 
53 #include "common/names.h"
54 
55 using boost::algorithm::is_any_of;
56 using boost::algorithm::split;
57 using boost::algorithm::to_lower;
58 using boost::algorithm::token_compress_on;
59 using namespace strings;
60 
61 DEFINE_bool(use_statestore, true,
62  "Use an external statestore process to manage cluster membership");
63 DEFINE_string(catalog_service_host, "localhost",
64  "hostname where CatalogService is running");
65 DEFINE_bool(enable_webserver, true, "If true, debug webserver is enabled");
66 DEFINE_string(state_store_host, "localhost",
67  "hostname where StatestoreService is running");
68 DEFINE_int32(state_store_subscriber_port, 23000,
69  "port where StatestoreSubscriberService should be exported");
70 DEFINE_int32(num_hdfs_worker_threads, 16,
71  "(Advanced) The number of threads in the global HDFS operation pool");
72 
73 DECLARE_int32(state_store_port);
74 DECLARE_int32(num_threads_per_core);
75 DECLARE_int32(num_cores);
76 DECLARE_int32(be_port);
77 DECLARE_string(mem_limit);
78 
79 DEFINE_bool(enable_rm, false, "Whether to enable resource management. If enabled, "
80  "-fair_scheduler_allocation_path is required.");
81 DEFINE_int32(llama_callback_port, 28000,
82  "Port where Llama notification callback should be started");
83 // TODO: Deprecate llama_host and llama_port in favor of the new llama_hostports.
84 // This needs to be coordinated with CM.
85 DEFINE_string(llama_host, "",
86  "Host of Llama service that the resource broker should connect to");
87 DEFINE_int32(llama_port, 15000,
88  "Port of Llama service that the resource broker should connect to");
89 DEFINE_string(llama_addresses, "",
90  "Llama availability group given as a comma-separated list of hostports.");
91 DEFINE_int64(llama_registration_timeout_secs, 30,
92  "Maximum number of seconds that Impala will attempt to (re-)register "
93  "with Llama before aborting the triggering action with an error "
94  "(e.g. Impalad startup or a Llama RPC request). "
95  "A setting of -1 means try indefinitely.");
96 DEFINE_int64(llama_registration_wait_secs, 3,
97  "Number of seconds to wait between attempts during Llama registration.");
98 DEFINE_int64(llama_max_request_attempts, 5,
99  "Maximum number of times a non-registration Llama RPC request "
100  "(reserve/expand/release, etc.) is retried until the request is aborted. "
101  "An attempt is counted once Impala is registered with Llama, i.e., a "
102  "request survives at most llama_max_request_attempts-1 re-registrations.");
103 DEFINE_string(cgroup_hierarchy_path, "", "If Resource Management is enabled, this must "
104  "be set to the Impala-writeable root of the cgroups hierarchy into which execution "
105  "threads are assigned.");
106 DEFINE_string(staging_cgroup, "impala_staging", "Name of the cgroup that a query's "
107  "execution threads are moved into once the query completes.");
108 
109 // Use a low default value because the reconnection logic is performed manually
110 // for the purpose of faster Llama failover (otherwise we may try to reconnect to the
111 // inactive Llama for a long time).
112 DEFINE_int32(resource_broker_cnxn_attempts, 1, "The number of times to retry an "
113  "RPC connection to Llama. A setting of 0 means retry indefinitely");
114 DEFINE_int32(resource_broker_cnxn_retry_interval_ms, 3000, "The interval, in ms, "
115  "to wait between attempts to make an RPC connection to the Llama.");
116 DEFINE_int32(resource_broker_send_timeout, 0, "Time to wait, in ms, "
117  "for the underlying socket of an RPC to Llama to successfully send data. "
118  "A setting of 0 means the socket will wait indefinitely.");
119 DEFINE_int32(resource_broker_recv_timeout, 0, "Time to wait, in ms, "
120  "for the underlying socket of an RPC to Llama to successfully receive data. "
121  "A setting of 0 means the socket will wait indefinitely.");
122 
123 // The key for a variable set in Impala's test environment only, to allow the
124 // resource-broker to correctly map node addresses into a form that Llama understands.
125 const static string PSEUDO_DISTRIBUTED_CONFIG_KEY =
126  "yarn.scheduler.include-port-in-node-name";
127 
128 namespace impala {
129 
130 ExecEnv* ExecEnv::exec_env_ = NULL;
131 
132 ExecEnv::ExecEnv()
133  : stream_mgr_(new DataStreamMgr()),
134  impalad_client_cache_(new ImpalaInternalServiceClientCache()),
135  catalogd_client_cache_(new CatalogServiceClientCache()),
136  htable_factory_(new HBaseTableFactory()),
137  disk_io_mgr_(new DiskIoMgr()),
138  webserver_(new Webserver()),
139  metrics_(new MetricGroup("impala-metrics")),
140  mem_tracker_(NULL),
141  thread_mgr_(new ThreadResourceMgr),
142  cgroups_mgr_(NULL),
143  hdfs_op_thread_pool_(
144  CreateHdfsOpThreadPool("hdfs-worker-pool", FLAGS_num_hdfs_worker_threads, 1024)),
145  request_pool_service_(new RequestPoolService(metrics_.get())),
146  frontend_(new Frontend()),
147  enable_webserver_(FLAGS_enable_webserver),
148  tz_database_(TimezoneDatabase()),
149  is_fe_tests_(false),
150  backend_address_(MakeNetworkAddress(FLAGS_hostname, FLAGS_be_port)),
151  is_pseudo_distributed_llama_(false) {
152  if (FLAGS_enable_rm) InitRm();
153  // Initialize the scheduler either dynamically (with a statestore) or statically (with
154  // a standalone single backend)
155  if (FLAGS_use_statestore) {
156  TNetworkAddress subscriber_address =
157  MakeNetworkAddress(FLAGS_hostname, FLAGS_state_store_subscriber_port);
158  TNetworkAddress statestore_address =
159  MakeNetworkAddress(FLAGS_state_store_host, FLAGS_state_store_port);
160 
161  statestore_subscriber_.reset(new StatestoreSubscriber(
162  Substitute("impalad@$0", TNetworkAddressToString(backend_address_)),
163  subscriber_address, statestore_address, metrics_.get()));
164 
165  scheduler_.reset(new SimpleScheduler(statestore_subscriber_.get(),
166  statestore_subscriber_->id(), backend_address_, metrics_.get(),
167  webserver_.get(), resource_broker_.get(), request_pool_service_.get()));
168  } else {
169  vector<TNetworkAddress> addresses;
170  addresses.push_back(MakeNetworkAddress(FLAGS_hostname, FLAGS_be_port));
171  scheduler_.reset(new SimpleScheduler(addresses, metrics_.get(), webserver_.get(),
172  resource_broker_.get(), request_pool_service_.get()));
173  }
174  if (exec_env_ == NULL) exec_env_ = this;
175  if (FLAGS_enable_rm) resource_broker_->set_scheduler(scheduler_.get());
176 }
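As an aside, the exec_env_ assignment at lines 130 and 174 follows a simple "first instance registers itself as the process-wide singleton" pattern. A minimal sketch of that pattern, with illustrative names that are not Impala's API:

// The first Env constructed becomes the global instance; later ones do not replace it.
#include <cassert>
#include <cstddef>

class Env {
 public:
  Env() { if (instance_ == NULL) instance_ = this; }
  static Env* GetInstance() { return instance_; }
 private:
  static Env* instance_;
};

Env* Env::instance_ = NULL;

int main() {
  Env first;   // registers itself as the singleton
  Env second;  // does not overwrite the existing singleton
  assert(Env::GetInstance() == &first);
  return 0;
}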
177 
178 ExecEnv::ExecEnv(const string& hostname, int backend_port, int subscriber_port,
179  int webserver_port, const string& statestore_host, int statestore_port)
180  : stream_mgr_(new DataStreamMgr()),
181  impalad_client_cache_(new ImpalaInternalServiceClientCache()),
182  catalogd_client_cache_(new CatalogServiceClientCache()),
183  htable_factory_(new HBaseTableFactory()),
184  disk_io_mgr_(new DiskIoMgr()),
185  webserver_(new Webserver(webserver_port)),
186  metrics_(new MetricGroup("impala-metrics")),
187  mem_tracker_(NULL),
188  thread_mgr_(new ThreadResourceMgr),
189  hdfs_op_thread_pool_(
190  CreateHdfsOpThreadPool("hdfs-worker-pool", FLAGS_num_hdfs_worker_threads, 1024)),
191  frontend_(new Frontend()),
192  enable_webserver_(FLAGS_enable_webserver && webserver_port > 0),
193  tz_database_(TimezoneDatabase()),
194  is_fe_tests_(false),
195  backend_address_(MakeNetworkAddress(FLAGS_hostname, FLAGS_be_port)),
196  is_pseudo_distributed_llama_(false) {
197  request_pool_service_.reset(new RequestPoolService(metrics_.get()));
198  if (FLAGS_enable_rm) InitRm();
199 
200  if (FLAGS_use_statestore && statestore_port > 0) {
201  TNetworkAddress subscriber_address =
202  MakeNetworkAddress(hostname, subscriber_port);
203  TNetworkAddress statestore_address =
204  MakeNetworkAddress(statestore_host, statestore_port);
205 
206  statestore_subscriber_.reset(new StatestoreSubscriber(
207  Substitute("impalad@$0", TNetworkAddressToString(backend_address_)),
208  subscriber_address, statestore_address, metrics_.get()));
209 
210  scheduler_.reset(new SimpleScheduler(statestore_subscriber_.get(),
211  statestore_subscriber_->id(), backend_address_, metrics_.get(),
212  webserver_.get(), resource_broker_.get(), request_pool_service_.get()));
213  } else {
214  vector<TNetworkAddress> addresses;
215  addresses.push_back(MakeNetworkAddress(hostname, backend_port));
216  scheduler_.reset(new SimpleScheduler(addresses, metrics_.get(), webserver_.get(),
217  resource_broker_.get(), request_pool_service_.get()));
218  }
219  if (exec_env_ == NULL) exec_env_ = this;
220  if (FLAGS_enable_rm) resource_broker_->set_scheduler(scheduler_.get());
221 }
222 
223 void ExecEnv::InitRm() {
224  // Unique addresses from FLAGS_llama_addresses and FLAGS_llama_host/FLAGS_llama_port.
225  vector<TNetworkAddress> llama_addresses;
226  if (!FLAGS_llama_addresses.empty()) {
227  vector<string> components;
228  split(components, FLAGS_llama_addresses, is_any_of(","), token_compress_on);
229  for (int i = 0; i < components.size(); ++i) {
230  to_lower(components[i]);
231  TNetworkAddress llama_address = MakeNetworkAddress(components[i]);
232  if (find(llama_addresses.begin(), llama_addresses.end(), llama_address)
233  == llama_addresses.end()) {
234  llama_addresses.push_back(llama_address);
235  }
236  }
237  }
238  // Add Llama hostport from deprecated flags (if it does not already exist).
239  if (!FLAGS_llama_host.empty()) {
240  to_lower(FLAGS_llama_host);
241  TNetworkAddress llama_address =
242  MakeNetworkAddress(FLAGS_llama_host, FLAGS_llama_port);
243  if (find(llama_addresses.begin(), llama_addresses.end(), llama_address)
244  == llama_addresses.end()) {
245  llama_addresses.push_back(llama_address);
246  }
247  }
248  for (int i = 0; i < llama_addresses.size(); ++i) {
249  LOG(INFO) << "Llama address " << i << ": " << llama_addresses[i];
250  }
251 
252  TNetworkAddress llama_callback_address =
253  MakeNetworkAddress(FLAGS_hostname, FLAGS_llama_callback_port);
254  resource_broker_.reset(new ResourceBroker(llama_addresses, llama_callback_address,
255  metrics_.get()));
256  cgroups_mgr_.reset(new CgroupsMgr(metrics_.get()));
257 
258  TGetHadoopConfigRequest config_request;
259  config_request.__set_name(PSEUDO_DISTRIBUTED_CONFIG_KEY);
260  TGetHadoopConfigResponse config_response;
261  frontend_->GetHadoopConfig(config_request, &config_response);
262  if (config_response.__isset.value) {
263  to_lower(config_response.value);
264  is_pseudo_distributed_llama_ = (config_response.value == "true");
265  } else {
266  is_pseudo_distributed_llama_ = false;
267  }
268  if (is_pseudo_distributed_llama_) {
269  LOG(INFO) << "Pseudo-distributed Llama cluster detected";
270  }
271 }
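The address handling in InitRm() above amounts to splitting a comma-separated list, lower-casing each entry, and dropping duplicates while preserving order. A standalone sketch of that pattern, using plain host:port strings in place of TNetworkAddress (an assumption for illustration only):

// Parse a comma-separated hostport list, normalize case, and de-duplicate in order.
#include <algorithm>
#include <iostream>
#include <string>
#include <vector>
#include <boost/algorithm/string.hpp>

int main() {
  std::string flag = "Llama1:15000, llama2:15000,llama1:15000";
  std::vector<std::string> components;
  boost::algorithm::split(components, flag, boost::algorithm::is_any_of(","),
      boost::algorithm::token_compress_on);
  std::vector<std::string> addresses;
  for (size_t i = 0; i < components.size(); ++i) {
    boost::algorithm::trim(components[i]);
    boost::algorithm::to_lower(components[i]);
    // Keep only the first occurrence of each hostport, preserving order.
    if (std::find(addresses.begin(), addresses.end(), components[i]) == addresses.end()) {
      addresses.push_back(components[i]);
    }
  }
  for (size_t i = 0; i < addresses.size(); ++i) {
    std::cout << "Llama address " << i << ": " << addresses[i] << std::endl;
  }
  return 0;
}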
272 
273 ExecEnv::~ExecEnv() {
274 }
275 
276 Status ExecEnv::InitForFeTests() {
277  mem_tracker_.reset(new MemTracker(-1, -1, "Process"));
278  is_fe_tests_ = true;
279  return Status::OK;
280 }
281 
282 Status ExecEnv::StartServices() {
283  LOG(INFO) << "Starting global services";
284 
285  if (FLAGS_enable_rm) {
286  // Initialize the resource broker to make sure the Llama is up and reachable.
287  DCHECK(resource_broker_.get() != NULL);
288  RETURN_IF_ERROR(resource_broker_->Init());
289  DCHECK(cgroups_mgr_.get() != NULL);
290  RETURN_IF_ERROR(
291  cgroups_mgr_->Init(FLAGS_cgroup_hierarchy_path, FLAGS_staging_cgroup));
292  }
293 
294  // Initialize global memory limit.
295  // Depending on the system configuration, we will have to calculate the process
296  // memory limit either based on the available physical memory, or if overcommitting
297  // is turned off, we use the memory commit limit from /proc/meminfo (see
298  // IMPALA-1690).
299  // --mem_limit="" means no memory limit
300  int64_t bytes_limit = 0;
301  bool is_percent;
302 
303  if (MemInfo::vm_overcommit() == 2 &&
304  MemInfo::physical_mem() > MemInfo::commit_limit()) {
305  bytes_limit = ParseUtil::ParseMemSpec(FLAGS_mem_limit, &is_percent,
306  MemInfo::commit_limit());
307  // On a misconfigured system where swap is disabled and overcommitting is
308  // turned off, the actual usable memory can be less than the available
309  // physical memory.
310  LOG(WARNING) << "This system shows a discrepancy between the available "
311  << "memory and the memory commit limit allowed by the "
312  << "operating system. ( Mem: " << MemInfo::physical_mem()
313  << "<=> CommitLimit: "
314  << MemInfo::commit_limit() << "). "
315  << "Impala will adhere to the smaller value by setting the "
316  << "process memory limit to " << bytes_limit << " "
317  << "Please verify the system configuration. Specifically, "
318  << "/proc/sys/vm/overcommit_memory and "
319  << "/proc/sys/vm/overcommit_ratio.";
320  } else {
321  bytes_limit = ParseUtil::ParseMemSpec(FLAGS_mem_limit, &is_percent,
322  MemInfo::physical_mem());
323  }
324 
325  if (bytes_limit < 0) {
326  return Status("Failed to parse mem limit from '" + FLAGS_mem_limit + "'.");
327  }
328  // Minimal IO Buffer requirements:
329  // IO buffer (8MB default) * number of IO buffers per thread (5) *
330  // number of threads per core * number of cores
331  int64_t min_requirement = disk_io_mgr_->max_read_buffer_size() *
332  DiskIoMgr::DEFAULT_QUEUE_CAPACITY *
333  FLAGS_num_threads_per_core * FLAGS_num_cores;
334  if (bytes_limit < min_requirement) {
335  LOG(WARNING) << "Memory limit "
336  << PrettyPrinter::Print(bytes_limit, TUnit::BYTES)
337  << " does not meet minimal memory requirement of "
338  << PrettyPrinter::Print(min_requirement, TUnit::BYTES);
339  }
340 
341  metrics_->Init(enable_webserver_ ? webserver_.get() : NULL);
342  impalad_client_cache_->InitMetrics(metrics_.get(), "impala-server.backends");
343  catalogd_client_cache_->InitMetrics(metrics_.get(), "catalog.server");
344  RETURN_IF_ERROR(RegisterMemoryMetrics(metrics_.get(), true));
345 
346 #ifndef ADDRESS_SANITIZER
347  // Limit of -1 means no memory limit.
348  mem_tracker_.reset(new MemTracker(TcmallocMetric::PHYSICAL_BYTES_RESERVED,
349  bytes_limit > 0 ? bytes_limit : -1, -1, "Process"));
350 
351  // Since tcmalloc does not free unused memory, we may exceed the process mem limit even
352  // if Impala is not actually using that much memory. Add a callback to free any unused
353  // memory if we hit the process limit.
354  mem_tracker_->AddGcFunction(boost::bind(&MallocExtension::ReleaseFreeMemory,
355  MallocExtension::instance()));
356 #else
357  // tcmalloc metrics aren't defined in ASAN builds, just use the default behavior to
358  // track process memory usage (sum of all children trackers).
359  mem_tracker_.reset(new MemTracker(bytes_limit > 0 ? bytes_limit : -1, -1, "Process"));
360 #endif
361 
362  mem_tracker_->RegisterMetrics(metrics_.get(), "mem-tracker.process");
363 
364  if (bytes_limit > MemInfo::physical_mem()) {
365  LOG(WARNING) << "Memory limit "
366  << PrettyPrinter::Print(bytes_limit, TUnit::BYTES)
367  << " exceeds physical memory of "
368  << PrettyPrinter::Print(MemInfo::physical_mem(), TUnit::BYTES);
369  }
370  LOG(INFO) << "Using global memory limit: "
371  << PrettyPrinter::Print(bytes_limit, TUnit::BYTES);
372 
374 
375  // Start services in order to ensure that dependencies between them are met
376  if (enable_webserver_) {
377  AddDefaultUrlCallbacks(webserver_.get(), mem_tracker_.get());
378  RETURN_IF_ERROR(webserver_->Start());
379  } else {
380  LOG(INFO) << "Not starting webserver";
381  }
382 
383  if (scheduler_ != NULL) RETURN_IF_ERROR(scheduler_->Init());
384 
385  // Must happen after all topic registrations / callbacks are done
386  if (statestore_subscriber_.get() != NULL) {
387  Status status = statestore_subscriber_->Start();
388  if (!status.ok()) {
389  status.AddDetail("State Store Subscriber did not start up.");
390  return status;
391  }
392  }
393 
394  return Status::OK;
395 }
396 
397 }
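For reference, the process memory limit logic in StartServices() boils down to: if vm.overcommit_memory is 2 and the commit limit is smaller than physical memory, resolve --mem_limit against the commit limit, otherwise against physical memory. The sketch below restates that decision with stand-in helpers; ParsePercentSpec and ResolveProcessMemLimit are illustrative assumptions (Impala's ParseUtil::ParseMemSpec also accepts absolute sizes such as "8g"):

// Standalone sketch of the overcommit-aware memory limit calculation.
#include <cstdint>
#include <iostream>
#include <string>

int64_t ParsePercentSpec(const std::string& spec, int64_t relative_reference) {
  // "80%" -> 80% of relative_reference; anything else is rejected with -1.
  if (spec.empty() || spec[spec.size() - 1] != '%') return -1;
  int64_t percent = std::stoll(spec.substr(0, spec.size() - 1));
  return relative_reference * percent / 100;
}

int64_t ResolveProcessMemLimit(const std::string& mem_limit_spec, int32_t vm_overcommit,
    int64_t physical_mem, int64_t commit_limit) {
  // Mirror of the branch in StartServices(): with overcommitting disabled (mode 2) and
  // a commit limit below physical memory, the smaller commit limit is the reference.
  if (vm_overcommit == 2 && physical_mem > commit_limit) {
    return ParsePercentSpec(mem_limit_spec, commit_limit);
  }
  return ParsePercentSpec(mem_limit_spec, physical_mem);
}

int main() {
  const int64_t kGiB = 1024LL * 1024 * 1024;
  // 80% of physical memory when overcommitting is allowed.
  std::cout << ResolveProcessMemLimit("80%", 0, 128 * kGiB, 64 * kGiB) / kGiB << " GiB\n";
  // 80% of the commit limit when overcommitting is disabled.
  std::cout << ResolveProcessMemLimit("80%", 2, 128 * kGiB, 64 * kGiB) / kGiB << " GiB\n";
  return 0;
}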