Impala
Impalaistheopensource,nativeanalyticdatabaseforApacheHadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
request-pool-service.cc
Go to the documentation of this file.
1 // Copyright 2014 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
16 
17 #include <list>
18 #include <string>
19 
20 #include "common/logging.h"
21 #include "rpc/jni-thrift-util.h"
22 #include "util/mem-info.h"
23 #include "util/parse-util.h"
24 #include "util/time.h"
25 
26 #include "common/names.h"
27 
28 using namespace impala;
29 
30 DEFINE_string(fair_scheduler_allocation_path, "", "Path to the fair scheduler "
31  "allocation file (fair-scheduler.xml).");
32 DEFINE_string(llama_site_path, "", "Path to the Llama configuration file "
33  "(llama-site.xml). If set, fair_scheduler_allocation_path must also be set.");
34 
35 // The default_pool parameters are used if fair scheduler allocation and Llama
36 // configuration files are not provided. The default values for this 'default pool'
37 // are the same as the default values for pools defined via the fair scheduler
38 // allocation file and Llama configurations.
39 DEFINE_int64(default_pool_max_requests, 200, "Maximum number of concurrent outstanding "
40  "requests allowed to run before queueing incoming requests. A negative value "
41  "indicates no limit. 0 indicates no requests will be admitted. Ignored if "
42  "fair_scheduler_config_path and llama_site_path are set.");
43 DEFINE_string(default_pool_mem_limit, "", "Maximum amount of memory that all "
44  "outstanding requests in this pool may use before new requests to this pool"
45  " are queued. Specified as a number of bytes ('<int>[bB]?'), megabytes "
46  "('<float>[mM]'), gigabytes ('<float>[gG]'), or percentage of the physical memory "
47  "('<int>%'). 0 or not setting indicates no limit. Defaults to bytes if no unit is "
48  "given. Ignored if fair_scheduler_config_path and llama_site_path are set.");
49 DEFINE_int64(default_pool_max_queued, 200, "Maximum number of requests allowed to be "
50  "queued before rejecting requests. A negative value or 0 indicates requests "
51  "will always be rejected once the maximum number of concurrent requests are "
52  "executing. Ignored if fair_scheduler_config_path and "
53  "llama_site_path are set.");
54 
55 // Flags to disable the pool limits for all pools.
56 DEFINE_bool(disable_pool_mem_limits, false, "Disables all per-pool mem limits.");
57 DEFINE_bool(disable_pool_max_requests, false, "Disables all per-pool limits on the "
58  "maximum number of running requests.");
59 
60 DECLARE_bool(enable_rm);
61 
62 // Pool name used when the configuration files are not specified.
63 const string DEFAULT_POOL_NAME = "default-pool";
64 
65 const string RESOLVE_POOL_METRIC_NAME = "request-pool-service.resolve-pool-duration-ms";
66 
68  metrics_(metrics), resolve_pool_ms_metric_(NULL) {
69  DCHECK(metrics_ != NULL);
71  new StatsMetric<double>(RESOLVE_POOL_METRIC_NAME, TUnit::TIME_MS));
72 
73  if (FLAGS_fair_scheduler_allocation_path.empty() &&
74  FLAGS_llama_site_path.empty()) {
75  if (FLAGS_enable_rm) {
76  LOG(ERROR) << "If resource management is enabled, -fair_scheduler_allocation_path "
77  << "is required.";
78  exit(1);
79  }
80  default_pool_only_ = true;
81  bool is_percent; // not used
82  int64_t bytes_limit = ParseUtil::ParseMemSpec(FLAGS_default_pool_mem_limit,
83  &is_percent, MemInfo::physical_mem());
84  // -1 indicates an error occurred
85  if (bytes_limit < 0) {
86  LOG(ERROR) << "Unable to parse default pool mem limit from '"
87  << FLAGS_default_pool_mem_limit << "'.";
88  exit(1);
89  }
90  // 0 indicates no limit or not set
91  if (bytes_limit == 0) {
93  } else {
94  default_pool_mem_limit_ = bytes_limit;
95  }
96  return;
97  }
98  default_pool_only_ = false;
99 
100  jmethodID start_id; // RequestPoolService.start(), only called in this method.
101  JniMethodDescriptor methods[] = {
102  {"<init>", "(Ljava/lang/String;Ljava/lang/String;)V", &ctor_},
103  {"start", "()V", &start_id},
104  {"resolveRequestPool", "([B)[B", &resolve_request_pool_id_},
105  {"getPoolConfig", "([B)[B", &get_pool_config_id_}};
106 
107  JNIEnv* jni_env = getJNIEnv();
109  jni_env->FindClass("com/cloudera/impala/util/RequestPoolService");
110  EXIT_IF_EXC(jni_env);
111  uint32_t num_methods = sizeof(methods) / sizeof(methods[0]);
112  for (int i = 0; i < num_methods; ++i) {
114  &(methods[i])));
115  }
116 
117  jstring fair_scheduler_config_path =
118  jni_env->NewStringUTF(FLAGS_fair_scheduler_allocation_path.c_str());
119  EXIT_IF_EXC(jni_env);
120  jstring llama_site_path =
121  jni_env->NewStringUTF(FLAGS_llama_site_path.c_str());
122  EXIT_IF_EXC(jni_env);
123 
124  jobject request_pool_service = jni_env->NewObject(request_pool_service_class_, ctor_,
125  fair_scheduler_config_path, llama_site_path);
126  EXIT_IF_EXC(jni_env);
127  EXIT_IF_ERROR(JniUtil::LocalToGlobalRef(jni_env, request_pool_service,
129  jni_env->CallObjectMethod(request_pool_service_, start_id);
130  EXIT_IF_EXC(jni_env);
131 }
132 
133 Status RequestPoolService::ResolveRequestPool(const string& requested_pool_name,
134  const string& user, TResolveRequestPoolResult* resolved_pool) {
135  if (default_pool_only_) {
136  resolved_pool->__set_resolved_pool(DEFAULT_POOL_NAME);
137  resolved_pool->__set_has_access(true);
138  return Status::OK;
139  }
140 
141  TResolveRequestPoolParams params;
142  params.__set_user(user);
143  params.__set_requested_pool(requested_pool_name);
144 
145  int64_t start_time = MonotonicMillis();
147  params, resolved_pool);
149  return status;
150 }
151 
153  TPoolConfigResult* pool_config) {
154  if (default_pool_only_) {
155  pool_config->__set_max_requests(
156  FLAGS_disable_pool_max_requests ? -1 : FLAGS_default_pool_max_requests);
157  pool_config->__set_mem_limit(
158  FLAGS_disable_pool_mem_limits ? -1 : default_pool_mem_limit_);
159  pool_config->__set_max_queued(FLAGS_default_pool_max_queued);
160  return Status::OK;
161  }
162 
163  TPoolConfigParams params;
164  params.__set_pool(pool_name);
166  request_pool_service_, get_pool_config_id_, params, pool_config));
167  if (FLAGS_disable_pool_max_requests) pool_config->__set_max_requests(-1);
168  if (FLAGS_disable_pool_mem_limits) pool_config->__set_mem_limit(-1);
169  return Status::OK;
170 }
RequestPoolService(MetricGroup *metrics)
Status GetPoolConfig(const std::string &pool_name, TPoolConfigResult *pool_config)
DEFINE_int64(default_pool_max_requests, 200,"Maximum number of concurrent outstanding ""requests allowed to run before queueing incoming requests. A negative value ""indicates no limit. 0 indicates no requests will be admitted. Ignored if ""fair_scheduler_config_path and llama_site_path are set.")
jobject request_pool_service_
Instance of com.cloudera.impala.util.RequestPoolService.
#define RETURN_IF_ERROR(stmt)
some generally useful macros
Definition: status.h:242
static Status LoadJniMethod(JNIEnv *jni_env, const jclass &jni_class, JniMethodDescriptor *descriptor)
Definition: jni-util.cc:193
M * RegisterMetric(M *metric)
Definition: metrics.h:211
static int64_t physical_mem()
Get total physical memory in bytes (ignores cgroups memory limits).
Definition: mem-info.h:36
#define EXIT_IF_EXC(env)
Definition: jni-util.h:85
MetricGroups may be organised hierarchically as a tree.
Definition: metrics.h:200
static int64_t ParseMemSpec(const std::string &mem_spec_str, bool *is_percent, int64_t relative_reference)
Definition: parse-util.cc:23
DEFINE_string(fair_scheduler_allocation_path,"","Path to the fair scheduler ""allocation file (fair-scheduler.xml).")
StatsMetric< double > * resolve_pool_ms_metric_
Metric measuring the time ResolveRequestPool() takes, in milliseconds.
static Status LocalToGlobalRef(JNIEnv *env, jobject local_ref, jobject *global_ref)
Definition: jni-util.cc:67
MetricGroup * metrics_
Metrics subsystem access.
DEFINE_bool(disable_pool_mem_limits, false,"Disables all per-pool mem limits.")
void Update(const T &value)
#define EXIT_IF_ERROR(stmt)
Definition: status.h:248
int64_t MonotonicMillis()
Definition: time.h:35
const string RESOLVE_POOL_METRIC_NAME
Describes one method to look up in a Java object.
Definition: jni-util.h:149
static const Status OK
Definition: status.h:87
Status ResolveRequestPool(const std::string &requested_pool_name, const std::string &user, TResolveRequestPoolResult *resolved_pool)
const string DEFAULT_POOL_NAME
static Status CallJniMethod(const jobject &obj, const jmethodID &method, const T &arg)
Definition: jni-util.h:231
DECLARE_bool(enable_rm)
JNIEnv * getJNIEnv(void)
C linkage for helper functions in hdfsJniHelper.h.