19 #include <boost/algorithm/string.hpp>
20 #include <gflags/gflags.h>
21 #include <gutil/strings/substitute.h>
50 #include "gen-cpp/ImpalaInternalService.h"
51 #include "gen-cpp/CatalogService.h"
55 using boost::algorithm::is_any_of;
56 using boost::algorithm::split;
57 using boost::algorithm::to_lower;
58 using boost::algorithm::token_compress_on;
59 using namespace strings;
62 "Use an external statestore process to manage cluster membership");
64 "hostname where CatalogService is running");
65 DEFINE_bool(enable_webserver,
true,
"If true, debug webserver is enabled");
67 "hostname where StatestoreService is running");
69 "port where StatestoreSubscriberService should be exported");
71 "(Advanced) The number of threads in the global HDFS operation pool");
79 DEFINE_bool(enable_rm,
false,
"Whether to enable resource management. If enabled, "
80 "-fair_scheduler_allocation_path is required.");
82 "Port where Llama notification callback should be started");
86 "Host of Llama service that the resource broker should connect to");
88 "Port of Llama service that the resource broker should connect to");
90 "Llama availability group given as a comma-separated list of hostports.");
92 "Maximum number of seconds that Impala will attempt to (re-)register "
93 "with Llama before aborting the triggering action with an error "
94 "(e.g. Impalad startup or a Llama RPC request). "
95 "A setting of -1 means try indefinitely.");
97 "Number of seconds to wait between attempts during Llama registration.");
99 "Maximum number of times a non-registration Llama RPC request "
100 "(reserve/expand/release, etc.) is retried until the request is aborted. "
101 "An attempt is counted once Impala is registered with Llama, i.e., a "
102 "request survives at most llama_max_request_attempts-1 re-registrations.");
103 DEFINE_string(cgroup_hierarchy_path,
"",
"If Resource Management is enabled, this must "
104 "be set to the Impala-writeable root of the cgroups hierarchy into which execution "
105 "threads are assigned.");
106 DEFINE_string(staging_cgroup,
"impala_staging",
"Name of the cgroup that a query's "
107 "execution threads are moved into once the query completes.");
112 DEFINE_int32(resource_broker_cnxn_attempts, 1,
"The number of times to retry an "
113 "RPC connection to Llama. A setting of 0 means retry indefinitely");
114 DEFINE_int32(resource_broker_cnxn_retry_interval_ms, 3000,
"The interval, in ms, "
115 "to wait between attempts to make an RPC connection to the Llama.");
116 DEFINE_int32(resource_broker_send_timeout, 0,
"Time to wait, in ms, "
117 "for the underlying socket of an RPC to Llama to successfully send data. "
118 "A setting of 0 means the socket will wait indefinitely.");
119 DEFINE_int32(resource_broker_recv_timeout, 0,
"Time to wait, in ms, "
120 "for the underlying socket of an RPC to Llama to successfully receive data. "
121 "A setting of 0 means the socket will wait indefinitely.");
126 "yarn.scheduler.include-port-in-node-name";
143 hdfs_op_thread_pool_(
147 enable_webserver_(FLAGS_enable_webserver),
151 is_pseudo_distributed_llama_(false) {
152 if (FLAGS_enable_rm)
InitRm();
155 if (FLAGS_use_statestore) {
156 TNetworkAddress subscriber_address =
158 TNetworkAddress statestore_address =
163 subscriber_address, statestore_address,
metrics_.get()));
169 vector<TNetworkAddress> addresses;
178 ExecEnv::ExecEnv(
const string& hostname,
int backend_port,
int subscriber_port,
179 int webserver_port,
const string& statestore_host,
int statestore_port)
185 webserver_(new
Webserver(webserver_port)),
189 hdfs_op_thread_pool_(
192 enable_webserver_(FLAGS_enable_webserver && webserver_port > 0),
196 is_pseudo_distributed_llama_(false) {
198 if (FLAGS_enable_rm)
InitRm();
200 if (FLAGS_use_statestore && statestore_port > 0) {
201 TNetworkAddress subscriber_address =
203 TNetworkAddress statestore_address =
208 subscriber_address, statestore_address,
metrics_.get()));
214 vector<TNetworkAddress> addresses;
225 vector<TNetworkAddress> llama_addresses;
226 if (!FLAGS_llama_addresses.empty()) {
227 vector<string> components;
228 split(components, FLAGS_llama_addresses, is_any_of(
","), token_compress_on);
229 for (
int i = 0; i < components.size(); ++i) {
230 to_lower(components[i]);
232 if (find(llama_addresses.begin(), llama_addresses.end(), llama_address)
233 == llama_addresses.end()) {
234 llama_addresses.push_back(llama_address);
239 if (!FLAGS_llama_host.empty()) {
240 to_lower(FLAGS_llama_host);
241 TNetworkAddress llama_address =
243 if (find(llama_addresses.begin(), llama_addresses.end(), llama_address)
244 == llama_addresses.end()) {
245 llama_addresses.push_back(llama_address);
248 for (
int i = 0; i < llama_addresses.size(); ++i) {
249 LOG(INFO) <<
"Llama address " << i <<
": " << llama_addresses[i];
252 TNetworkAddress llama_callback_address =
258 TGetHadoopConfigRequest config_request;
260 TGetHadoopConfigResponse config_response;
261 frontend_->GetHadoopConfig(config_request, &config_response);
262 if (config_response.__isset.value) {
263 to_lower(config_response.value);
269 LOG(INFO) <<
"Pseudo-distributed Llama cluster detected";
283 LOG(INFO) <<
"Starting global services";
285 if (FLAGS_enable_rm) {
291 cgroups_mgr_->Init(FLAGS_cgroup_hierarchy_path, FLAGS_staging_cgroup));
300 int64_t bytes_limit = 0;
310 LOG(WARNING) <<
"This system shows a discrepancy between the available "
311 <<
"memory and the memory commit limit allowed by the "
313 <<
"<=> CommitLimit: "
315 <<
"Impala will adhere to the smaller value by setting the "
316 <<
"process memory limit to " << bytes_limit <<
" "
317 <<
"Please verify the system configuration. Specifically, "
318 <<
"/proc/sys/vm/overcommit_memory and "
319 <<
"/proc/sys/vm/overcommit_ratio.";
325 if (bytes_limit < 0) {
326 return Status(
"Failed to parse mem limit from '" + FLAGS_mem_limit +
"'.");
331 int64_t min_requirement =
disk_io_mgr_->max_read_buffer_size() *
333 FLAGS_num_threads_per_core * FLAGS_num_cores;
334 if (bytes_limit < min_requirement) {
335 LOG(WARNING) <<
"Memory limit "
337 <<
" does not meet minimal memory requirement of "
346 #ifndef ADDRESS_SANITIZER
349 bytes_limit > 0 ? bytes_limit : -1, -1,
"Process"));
354 mem_tracker_->AddGcFunction(boost::bind(&MallocExtension::ReleaseFreeMemory,
355 MallocExtension::instance()));
365 LOG(WARNING) <<
"Memory limit "
367 <<
" exceeds physical memory of "
370 LOG(INFO) <<
"Using global memory limit: "
380 LOG(INFO) <<
"Not starting webserver";
389 status.
AddDetail(
"State Store Subscriber did not start up.");
boost::scoped_ptr< DiskIoMgr > disk_io_mgr_
string TNetworkAddressToString(const TNetworkAddress &address)
Utility method to print address as address:port.
DEFINE_bool(use_statestore, true,"Use an external statestore process to manage cluster membership")
void InitRm()
Initialise cgroups manager, detect test RM environment and init resource broker.
boost::scoped_ptr< MemTracker > mem_tracker_
boost::scoped_ptr< RequestPoolService > request_pool_service_
TNetworkAddress backend_address_
Address of the Impala backend server instance.
Functions to load and access the timestamp database.
Status InitForFeTests()
Initializes the exec env for running FE tests.
boost::scoped_ptr< Scheduler > scheduler_
void AddDefaultUrlCallbacks(Webserver *webserver, MemTracker *process_mem_tracker=NULL)
boost::scoped_ptr< CgroupsMgr > cgroups_mgr_
#define RETURN_IF_ERROR(stmt)
some generally useful macros
static int64_t physical_mem()
Get total physical memory in bytes (ignores cgroups memory limits).
static ExecEnv * exec_env_
Status RegisterMemoryMetrics(MetricGroup *metrics, bool register_jvm_metrics)
DECLARE_int32(state_store_port)
MetricGroups may be organised hierarchically as a tree.
static int64_t ParseMemSpec(const std::string &mem_spec_str, bool *is_percent, int64_t relative_reference)
boost::scoped_ptr< ResourceBroker > resource_broker_
boost::scoped_ptr< Frontend > frontend_
TNetworkAddress MakeNetworkAddress(const string &hostname, int port)
void AddDetail(const std::string &msg)
Add a detail string. Calling this method is only defined on a non-OK message.
boost::scoped_ptr< ImpalaInternalServiceClientCache > impalad_client_cache_
static PhysicalBytesMetric * PHYSICAL_BYTES_RESERVED
boost::scoped_ptr< StatestoreSubscriber > statestore_subscriber_
static std::string Print(bool value, TUnit::type ignored, bool verbose=false)
DEFINE_int32(state_store_subscriber_port, 23000,"port where StatestoreSubscriberService should be exported")
bool is_pseudo_distributed_llama_
static int32_t vm_overcommit()
Returns the system's memory overcommit settings; typically the values are 0, 1, and 2...
static const int DEFAULT_QUEUE_CAPACITY
DEFINE_int64(llama_registration_timeout_secs, 30,"Maximum number of seconds that Impala will attempt to (re-)register ""with Llama before aborting the triggering action with an error ""(e.g. Impalad startup or a Llama RPC request). ""A setting of -1 means try indefinitely.")
boost::scoped_ptr< CatalogServiceClientCache > catalogd_client_cache_
DECLARE_string(mem_limit)
virtual Status StartServices()
Starts any dependent services in their correct order.
This class is thread-safe.
boost::scoped_ptr< MetricGroup > metrics_
static const string PSEUDO_DISTRIBUTED_CONFIG_KEY
DEFINE_string(catalog_service_host,"localhost","hostname where CatalogService is running")
HdfsOpThreadPool * CreateHdfsOpThreadPool(const std::string &name, uint32_t num_threads, uint32_t max_queue_length)
Creates a new HdfsOp-processing thread pool.
boost::scoped_ptr< Webserver > webserver_
static int64_t commit_limit()