Impala
Impala is the open source, native analytic database for Apache Hadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
query-schedule.cc
Go to the documentation of this file.
1 // Copyright 2012 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
16 
17 #include <sstream>
18 #include <boost/algorithm/string/join.hpp>
19 #include <boost/foreach.hpp>
20 #include <boost/uuid/uuid.hpp>
21 #include <boost/uuid/uuid_generators.hpp>
22 
23 #include "util/container-util.h"
24 #include "util/mem-info.h"
25 #include "util/network-util.h"
26 #include "util/uid-util.h"
27 #include "util/debug-util.h"
28 #include "util/parse-util.h"
29 #include "util/llama-util.h"
30 
31 #include "common/names.h"
32 
33 using boost::uuids::random_generator;
34 using boost::uuids::uuid;
35 using namespace impala;
36 
37 DEFINE_bool(rm_always_use_defaults, false, "If true, all queries use the same initial"
38  " resource requests regardless of their computed resource estimates. Only meaningful "
39  "if --enable_rm is set.");
40 DEFINE_string(rm_default_memory, "4G", "The initial amount of memory that"
41  " a query should reserve on each node if either it does not have an available "
42  "estimate, or if --rm_always_use_defaults is set.");
43 DEFINE_int32(rm_default_cpu_vcores, 2, "The initial number of virtual cores that"
44  " a query should reserve on each node if either it does not have an available "
45  "estimate, or if --rm_always_use_defaults is set.");
46 
47 
48 namespace impala {
49 
50 // Default value for the request_timeout in a reservation request. The timeout is the
51 // max time in milliseconds to wait for a resource request to be fulfilled by Llama.
52 // The default value of five minutes was determined to be reasonable based on
53 // experiments on a 20-node cluster with TPCDS 15TB and 8 concurrent clients.
54 // Over 30% of queries timed out with a reservation timeout of 1 minute but only less
55 // than 5% timed out when using 5 minutes. Still, the default value is somewhat
56 // arbitrary and a good value is workload dependent.
57 const int64_t DEFAULT_REQUEST_TIMEOUT_MS = 5 * 60 * 1000;
58 
60  const TQueryExecRequest& request, const TQueryOptions& query_options,
61  const string& effective_user, RuntimeProfile* summary_profile,
62  RuntimeProfile::EventSequence* query_events)
63  : query_id_(query_id),
64  request_(request),
65  query_options_(query_options),
66  effective_user_(effective_user),
67  summary_profile_(summary_profile),
68  query_events_(query_events),
69  num_backends_(0),
70  num_hosts_(0),
71  num_scan_ranges_(0),
72  is_admitted_(false) {
73  fragment_exec_params_.resize(request.fragments.size());
74  // map from plan node id to fragment index in exec_request.fragments
75  vector<PlanNodeId> per_node_fragment_idx;
76  for (int i = 0; i < request.fragments.size(); ++i) {
77  BOOST_FOREACH(const TPlanNode& node, request.fragments[i].plan.nodes) {
78  if (plan_node_to_fragment_idx_.size() < node.node_id + 1) {
79  plan_node_to_fragment_idx_.resize(node.node_id + 1);
80  }
81  plan_node_to_fragment_idx_[node.node_id] = i;
82  }
83  }
84 }
85 
87  DCHECK_GT(num_hosts_, 0);
88  const int64_t total_cluster_mem = GetPerHostMemoryEstimate() * num_hosts_;
89  DCHECK_GE(total_cluster_mem, 0); // Assume total cluster memory fits in an int64_t.
90  return total_cluster_mem;
91 }
92 
94  // Precedence of different estimate sources is:
95  // user-supplied RM query option >
96  // server-side defaults (if rm_always_use_defaults == true) >
97  // query option limit >
98  // estimate >
99  // server-side defaults (if rm_always_use_defaults == false)
100  int64_t query_option_memory_limit = numeric_limits<int64_t>::max();
101  bool has_query_option = false;
102  if (query_options_.__isset.mem_limit && query_options_.mem_limit > 0) {
103  query_option_memory_limit = query_options_.mem_limit;
104  has_query_option = true;
105  }
106 
107  int64_t estimate_limit = numeric_limits<int64_t>::max();
108  bool has_estimate = false;
109  if (request_.__isset.per_host_mem_req && request_.per_host_mem_req > 0) {
110  estimate_limit = request_.per_host_mem_req;
111  has_estimate = true;
112  }
113 
114  int64_t per_host_mem = 0L;
115  if (query_options_.__isset.rm_initial_mem && query_options_.rm_initial_mem > 0) {
116  per_host_mem = query_options_.rm_initial_mem;
117  } else if (FLAGS_rm_always_use_defaults) {
118  bool ignored;
119  per_host_mem = ParseUtil::ParseMemSpec(FLAGS_rm_default_memory,
120  &ignored, 0);
121  } else if (has_query_option) {
122  per_host_mem = query_option_memory_limit;
123  } else if (has_estimate) {
124  per_host_mem = estimate_limit;
125  } else {
126  // If no estimate or query option, use the server-side limits anyhow.
127  bool ignored;
128  per_host_mem = ParseUtil::ParseMemSpec(FLAGS_rm_default_memory,
129  &ignored, 0);
130  }
131  // Cap the memory estimate at the amount of physical memory available. The user's
132  // provided value or the estimate from planning can each be unreasonable.
133  // TODO: Get this limit from Llama (Yarn sets it).
134  return min(per_host_mem, MemInfo::physical_mem());
135 }
136 
138  // Precedence of different estimate sources is:
139  // server-side defaults (if rm_always_use_defaults == true) >
140  // computed estimates >
141  // server-side defaults (if rm_always_use_defaults == false)
142  int16_t v_cpu_cores = FLAGS_rm_default_cpu_vcores;
143  if (!FLAGS_rm_always_use_defaults && query_options_.__isset.v_cpu_cores &&
144  query_options_.v_cpu_cores > 0) {
145  v_cpu_cores = query_options_.v_cpu_cores;
146  }
147 
148  return v_cpu_cores;
149 }
150 
// Resolves 'src' to its resource host/port by delegating to
// resource_resolver_->GetResourceHostport(), forwarding 'dst' as the destination
// pointer. Both 'dst' and resource_resolver_ are DCHECKed to be non-NULL; per the
// DCHECK message, a NULL resolver suggests SetUniqueHosts() has not been called yet.
151 void QuerySchedule::GetResourceHostport(const TNetworkAddress& src,
152  TNetworkAddress* dst) {
153  DCHECK(dst != NULL);
154  DCHECK(resource_resolver_.get() != NULL)
155  << "resource_resolver_ is NULL, didn't call SetUniqueHosts()?";
156  resource_resolver_->GetResourceHostport(src, dst);
157 }
158 
159 void QuerySchedule::SetUniqueHosts(const unordered_set<TNetworkAddress>& unique_hosts) {
162 }
163 
// Fills in reservation_request_ for submission: clears any previously added
// resources, sets the version, queue ('pool') and gang flag, sets the request
// timeout, and then appends one llama::TResource per entry in unique_hosts_ carrying
// the per-host memory (in MB) and vcore estimates. If both estimates are zero, no
// resources are added and the function returns early.
164 void QuerySchedule::PrepareReservationRequest(const string& pool, const string& user) {
165  reservation_request_.resources.clear();
166  reservation_request_.version = TResourceBrokerServiceVersion::V1;
167  reservation_request_.queue = pool;
168  reservation_request_.gang = true;
169  // Convert the user name to a short name (e.g. 'user1@domain' to 'user1') because
170  // Llama checks group membership based on the short name of the principal.
172 
173  // Set optional request timeout from query options.
 // NOTE(review): this conditional set looks redundant. The block below reads the
 // same query option and unconditionally calls __set_request_timeout(), so the only
 // effect unique to this block is the DCHECK_GT on the option value.
174  if (query_options_.__isset.reservation_request_timeout) {
175  DCHECK_GT(query_options_.reservation_request_timeout, 0);
176  reservation_request_.__set_request_timeout(
177  query_options_.reservation_request_timeout);
178  }
179 
180  // Set the reservation timeout from the query options or use a default.
181  int64_t timeout = DEFAULT_REQUEST_TIMEOUT_MS;
182  if (query_options_.__isset.reservation_request_timeout) {
183  timeout = query_options_.reservation_request_timeout;
184  }
185  reservation_request_.__set_request_timeout(timeout);
186 
 // Integer division turns the per-host estimate into whole MB (truncating, so an
 // estimate below 1 MB becomes 0) and the result is narrowed into an int32_t.
187  int32_t memory_mb = GetPerHostMemoryEstimate() / 1024 / 1024;
188  int32_t v_cpu_cores = GetPerHostVCores();
189  // The memory_mb and v_cpu_cores estimates may legitimately be zero,
190  // e.g., for constant selects. Do not reserve any resources in those cases.
191  if (memory_mb == 0 && v_cpu_cores == 0) return;
192 
193  DCHECK(resource_resolver_.get() != NULL)
194  << "resource_resolver_ is NULL, didn't call SetUniqueHosts()?";
195  random_generator uuid_generator;
 // One resource entry per host in unique_hosts_, each tagged with a fresh random
 // uuid as its client resource id.
196  BOOST_FOREACH(const TNetworkAddress& host, unique_hosts_) {
197  reservation_request_.resources.push_back(llama::TResource());
198  llama::TResource& resource = reservation_request_.resources.back();
199  uuid id = uuid_generator();
 // The uuid bytes are split into two 64-bit halves: bytes starting at offset 0 go
 // to 'hi', bytes starting at offset 8 go to 'lo'.
 // NOTE(review): this reads the byte array through a uint64_t* via
 // reinterpret_cast; a memcpy into a uint64_t would avoid aliasing/alignment
 // concerns -- confirm before changing.
200  resource.client_resource_id.hi = *reinterpret_cast<uint64_t*>(&id.data[0]);
201  resource.client_resource_id.lo = *reinterpret_cast<uint64_t*>(&id.data[8]);
202  resource.enforcement = llama::TLocationEnforcement::MUST;
203 
 // The asked location is the resolver-translated address of 'host', formatted by
 // streaming the TNetworkAddress into a stringstream.
204  TNetworkAddress resource_hostport;
205  resource_resolver_->GetResourceHostport(host, &resource_hostport);
206  stringstream ss;
207  ss << resource_hostport;
208  resource.askedLocation = ss.str();
209  resource.memory_mb = memory_mb;
210  resource.v_cpu_cores = v_cpu_cores;
211  }
212 }
213 
215  if (!HasReservation()) return Status("Query schedule does not have a reservation.");
216  vector<TNetworkAddress> hosts_missing_resources;
218  BOOST_FOREACH(const FragmentExecParams& params, fragment_exec_params_) {
219  BOOST_FOREACH(const TNetworkAddress& host, params.hosts) {
220  // Ignore the coordinator host which is not contained in unique_hosts_.
221  if (unique_hosts_.find(host) == unique_hosts_.end()) continue;
222  TNetworkAddress resource_hostport;
223  resolver.GetResourceHostport(host, &resource_hostport);
224  if (reservation_.allocated_resources.find(resource_hostport) ==
225  reservation_.allocated_resources.end()) {
226  hosts_missing_resources.push_back(host);
227  }
228  }
229  }
230  if (!hosts_missing_resources.empty()) {
231  stringstream ss;
232  ss << "Failed to validate reservation " << reservation_.reservation_id << "." << endl
233  << "Missing resources for hosts [";
234  for (int i = 0; i < hosts_missing_resources.size(); ++i) {
235  ss << hosts_missing_resources[i];
236  if (i + 1 != hosts_missing_resources.size()) ss << ", ";
237  }
238  ss << "]";
239  return Status(ss.str());
240  }
241  return Status::OK;
242 }
243 
244 }
void GetResourceHostport(const TNetworkAddress &src, TNetworkAddress *dst)
const TQueryOptions & query_options_
TUniqueId query_id_
Definition: coordinator.h:194
const TUniqueId & query_id() const
Definition: coordinator.h:152
const int64_t DEFAULT_REQUEST_TIMEOUT_MS
std::vector< TNetworkAddress > hosts
std::vector< int32_t > plan_node_to_fragment_idx_
Maps from plan node id to its fragment index. Filled in c'tor.
static int64_t physical_mem()
Get total physical memory in bytes (ignores cgroups memory limits).
Definition: mem-info.h:36
DEFINE_string(rm_default_memory,"4G","The initial amount of memory that"" a query should reserve on each node if either it does not have an available ""estimate, or if --rm_always_use_defaults is set.")
static int64_t ParseMemSpec(const std::string &mem_spec_str, bool *is_percent, int64_t relative_reference)
Definition: parse-util.cc:23
int64_t GetPerHostMemoryEstimate() const
bool HasReservation() const
void PrepareReservationRequest(const std::string &pool, const std::string &user)
void GetResourceHostport(const TNetworkAddress &src, TNetworkAddress *dst)
QuerySchedule(const TUniqueId &query_id, const TQueryExecRequest &request, const TQueryOptions &query_options, const std::string &effective_user, RuntimeProfile *summary_profile, RuntimeProfile::EventSequence *query_events)
std::vector< FragmentExecParams > fragment_exec_params_
const TQueryExecRequest & request_
ObjectPool pool
void SetUniqueHosts(const boost::unordered_set< TNetworkAddress > &unique_hosts)
DEFINE_bool(rm_always_use_defaults, false,"If true, all queries use the same initial"" resource requests regardless of their computed resource estimates. Only meaningful ""if --enable_rm is set.")
int16_t GetPerHostVCores() const
int64_t GetClusterMemoryEstimate() const
Total estimated memory for all nodes. set_num_hosts() must be set before calling. ...
const boost::unordered_set< TNetworkAddress > & unique_hosts() const
static const Status OK
Definition: status.h:87
RuntimeProfile::EventSequence * query_events_
Event timeline for this query. Unowned.
Definition: coordinator.h:309
TResourceBrokerReservationResponse reservation_
Fulfilled reservation request. Populated by scheduler.
DEFINE_int32(periodic_counter_update_period_ms, 500,"Period to update rate counters and"" sampling counters in ms")
TResourceBrokerReservationRequest reservation_request_
Reservation request to be submitted to Llama. Set in PrepareReservationRequest(). ...
boost::unordered_set< TNetworkAddress > unique_hosts_
The set of hosts that the query will run on excluding the coordinator.
string GetShortName(const string &user)
Definition: llama-util.cc:135
boost::scoped_ptr< ResourceResolver > resource_resolver_