Impala
Impala is the open source, native analytic database for Apache Hadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
admission-controller.cc
Go to the documentation of this file.
1 // Copyright 2014 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
16 
17 #include <boost/algorithm/string.hpp>
18 #include <boost/bind.hpp>
19 #include <boost/foreach.hpp>
20 #include <boost/mem_fn.hpp>
21 #include <gutil/strings/substitute.h>
22 
23 #include "common/logging.h"
25 #include "runtime/exec-env.h"
26 #include "runtime/mem-tracker.h"
27 #include "util/debug-util.h"
28 #include "util/time.h"
29 #include "util/runtime-profile.h"
30 
31 #include "common/names.h"
32 
33 using namespace strings;
34 
35 DEFINE_int64(queue_wait_timeout_ms, 60 * 1000, "Maximum amount of time (in "
36  "milliseconds) that a request will wait to be admitted before timing out.");
37 
38 namespace impala {
39 
// Name of the topic on which per-pool admission stats are exchanged: it is the
// topic registered with the statestore subscriber and the topic name stamped on
// outgoing pool stats updates.
const string AdmissionController::IMPALA_REQUEST_QUEUE_TOPIC("impala-request-queue");
41 
// Delimiter used for topic keys of the form "<pool_name><delimiter><backend_id>".
// "!" is used because the backend id contains a colon, but it should not contain "!".
// The pool name, however, may contain the delimiter, so a topic key must always be
// split at the LAST occurrence of the delimiter (see ParsePoolTopicKey()).
const char TOPIC_KEY_DELIMITER = '!';
47 
48 // Define metric key format strings for metrics in PoolMetrics
49 // '$0' is replaced with the pool name by strings::Substitute
51  "admission-controller.$0.local-admitted";
53  "admission-controller.$0.local-queued";
55  "admission-controller.$0.local-dequeued";
57  "admission-controller.$0.local-rejected";
59  "admission-controller.$0.local-timed-out";
61  "admission-controller.$0.local-completed";
63  "admission-controller.$0.local-time-in-queue-ms";
65  "admission-controller.$0.cluster-num-running";
67  "admission-controller.$0.cluster-in-queue";
69  "admission-controller.$0.cluster-mem-usage";
71  "admission-controller.$0.cluster-mem-estimate";
73  "admission-controller.$0.local-num-running";
75  "admission-controller.$0.local-in-queue";
77  "admission-controller.$0.local-mem-usage";
79  "admission-controller.$0.local-mem-estimate";
80 
// Names of the events recorded in the query's event sequence around admission.
const string QUERY_EVENT_SUBMIT_FOR_ADMISSION("Submit for admission");
const string QUERY_EVENT_COMPLETED_ADMISSION("Completed admission");

// Key of the profile info string that records the admission outcome, followed by
// the possible values for that key.
const string PROFILE_INFO_KEY_ADMISSION_RESULT("Admission result");
const string PROFILE_INFO_VAL_ADMIT_IMMEDIATELY("Admitted immediately");
const string PROFILE_INFO_VAL_ADMIT_QUEUED("Admitted (queued)");
const string PROFILE_INFO_VAL_REJECTED("Rejected");
const string PROFILE_INFO_VAL_TIME_OUT("Timed out (queued)");
91 
// Format strings for rejection errors (placeholders are filled by
// strings::Substitute).
// STATUS_REJECTED: $0 = pool name, $1 = one of the REASON_* strings.
const string STATUS_REJECTED("Rejected query from pool $0 : $1");
const string REASON_DISABLED_MEM_LIMIT("disabled by mem limit set to 0");
const string REASON_DISABLED_REQUESTS_LIMIT("disabled by requests limit set to 0");
// REASON_QUEUE_FULL: $0 = maximum queue size, $1 = number currently queued.
const string REASON_QUEUE_FULL("queue full, limit=$0, num_queued=$1");
99  "request memory estimate $0 is greater than pool limit $1.\n\n"
100  "If the memory estimate appears to be too high, use the MEM_LIMIT query option to "
101  "override the memory estimates in admission decisions. Check the explain plan for "
102  "warnings about missing stats. Running COMPUTE STATS may help. You may also "
103  "consider using query hints to manually improve the plan.\n\n"
104  "See the Impala documentation for more details regarding the MEM_LIMIT query "
105  "option, table stats, and query hints. If the memory estimate is still too high, "
106  "consider modifying the query to reduce the memory impact or increasing the "
107  "available memory.";
108 
// Format strings describing why a request was queued instead of admitted
// (placeholders are filled by strings::Substitute).
// QUEUED_NUM_RUNNING: $0 = number of running queries, $1 = running queries limit.
const string QUEUED_NUM_RUNNING("number of running queries $0 is over limit $1");
// QUEUED_MEM_LIMIT: $0 = query estimate, $1 = current pool memory estimate,
// $2 = pool memory limit.
const string QUEUED_MEM_LIMIT(
    "query memory estimate $0 plus current pool memory estimate $1 "
    "is over pool memory limit $2");
// QUEUED_QUEUE_NOT_EMPTY: $0 = queue size.
const string QUEUED_QUEUE_NOT_EMPTY(
    "queue is not empty (size $0); queued queries are executed first");
// STATUS_TIME_OUT: $0 = timeout in milliseconds, $1 = queue detail (one of the
// QUEUED_* strings above).
const string STATUS_TIME_OUT(
    "Admission for query exceeded timeout $0ms. Queued reason: $1");
121 
122 // Parses the pool name and backend_id from the topic key if it is valid.
123 // Returns true if the topic key is valid and pool_name and backend_id are set.
124 static inline bool ParsePoolTopicKey(const string& topic_key, string* pool_name,
125  string* backend_id) {
126  size_t pos = topic_key.find_last_of(TOPIC_KEY_DELIMITER);
127  if (pos == string::npos || pos >= topic_key.size() - 1) {
128  VLOG_QUERY << "Invalid topic key for pool: " << topic_key;
129  return false;
130  }
131  *pool_name = topic_key.substr(0, pos);
132  *backend_id = topic_key.substr(pos + 1);
133  return true;
134 }
135 
136 // Returns the topic key for the pool at this backend, i.e. a string of the
137 // form: "<pool_name><delimiter><backend_id>".
138 static inline string MakePoolTopicKey(const string& pool_name,
139  const string& backend_id) {
140  // Ensure the backend_id does not contain the delimiter to ensure that the topic key
141  // can be parsed properly by finding the last instance of the delimiter.
142  DCHECK_EQ(backend_id.find(TOPIC_KEY_DELIMITER), string::npos);
143  return Substitute("$0$1$2", pool_name, TOPIC_KEY_DELIMITER, backend_id);
144 }
145 
146 // Returns a debug string for the given local and total pool stats. Either
147 // 'total_stats' or 'local_stats' may be NULL to skip writing those stats.
148 static string DebugPoolStats(const string& pool_name,
149  const TPoolStats* total_stats,
150  const TPoolStats* local_stats) {
151  stringstream ss;
152  ss << "pool=" << pool_name;
153  if (total_stats != NULL) {
154  ss << " Total(";
155  ss << "num_running=" << total_stats->num_running << ", ";
156  ss << "num_queued=" << total_stats->num_queued << ", ";
157  ss << "mem_usage=" <<
158  PrettyPrinter::Print(total_stats->mem_usage, TUnit::BYTES) << ", ";
159  ss << "mem_estimate=" <<
160  PrettyPrinter::Print(total_stats->mem_estimate, TUnit::BYTES);
161  ss << ")";
162  }
163  if (local_stats != NULL) {
164  ss << " Local(";
165  ss << "num_running=" << local_stats->num_running << ", ";
166  ss << "num_queued=" << local_stats->num_queued << ", ";
167  ss << "mem_usage=" <<
168  PrettyPrinter::Print(local_stats->mem_usage, TUnit::BYTES) << ", ";
169  ss << "mem_estimate=" <<
170  PrettyPrinter::Print(local_stats->mem_estimate, TUnit::BYTES);
171  ss << ")";
172  }
173  return ss.str();
174 }
175 
176 AdmissionController::AdmissionController(RequestPoolService* request_pool_service,
177  MetricGroup* metrics, const string& backend_id)
178  : request_pool_service_(request_pool_service),
179  metrics_(metrics),
180  backend_id_(backend_id),
181  thrift_serializer_(false),
182  done_(false) {
183  dequeue_thread_.reset(new Thread("scheduling", "admission-thread",
185 }
186 
188  // The AdmissionController should live for the lifetime of the impalad, but
189  // for unit tests we need to ensure that no thread is waiting on the
190  // condition variable. This notifies the dequeue thread to stop and waits
191  // for it to finish.
192  {
193  // Lock to ensure the dequeue thread will see the update to done_
194  lock_guard<mutex> l(admission_ctrl_lock_);
195  done_ = true;
196  dequeue_cv_.notify_one();
197  }
198  dequeue_thread_->Join();
199 }
200 
203  bind<void>(mem_fn(&AdmissionController::UpdatePoolStats), this, _1, _2);
204  Status status = subscriber->AddTopic(IMPALA_REQUEST_QUEUE_TOPIC, true, cb);
205  if (!status.ok()) {
206  status.AddDetail("AdmissionController failed to register request queue topic");
207  }
208  return status;
209 }
210 
212  const int64_t max_requests, const int64_t mem_limit, const QuerySchedule& schedule,
213  bool admit_from_queue) {
214  const TPoolStats& total_stats = cluster_pool_stats_[pool_name];
215  DCHECK_GE(total_stats.mem_usage, 0);
216  DCHECK_GE(total_stats.mem_estimate, 0);
217  const int64_t query_total_estimated_mem = schedule.GetClusterMemoryEstimate();
218  const int64_t current_cluster_estimate_mem =
219  max(total_stats.mem_usage, total_stats.mem_estimate);
220  // The estimated total memory footprint for the query cluster-wise after admitting
221  const int64_t cluster_estimated_memory = query_total_estimated_mem +
222  current_cluster_estimate_mem;
223  DCHECK_GE(cluster_estimated_memory, 0);
224 
225  // Can't admit if:
226  // (a) Already over the maximum number of requests
227  // (b) Request will go over the mem limit
228  // (c) This is not admitting from the queue and there are already queued requests
229  if (max_requests >= 0 && total_stats.num_running >= max_requests) {
230  return Status::Expected(Substitute(QUEUED_NUM_RUNNING, total_stats.num_running,
231  max_requests));
232  } else if (mem_limit >= 0 && cluster_estimated_memory >= mem_limit) {
233  return Status::Expected(Substitute(QUEUED_MEM_LIMIT,
234  PrettyPrinter::Print(query_total_estimated_mem, TUnit::BYTES),
235  PrettyPrinter::Print(current_cluster_estimate_mem, TUnit::BYTES),
236  PrettyPrinter::Print(mem_limit, TUnit::BYTES)));
237  } else if (!admit_from_queue && total_stats.num_queued > 0) {
238  return Status::Expected(Substitute(QUEUED_QUEUE_NOT_EMPTY, total_stats.num_queued));
239  }
240  return Status::OK;
241 }
242 
244  const int64_t max_requests, const int64_t mem_limit, const int64_t max_queued,
245  const QuerySchedule& schedule) {
246  TPoolStats* total_stats = &cluster_pool_stats_[pool_name];
247  const int64_t expected_mem_usage = schedule.GetClusterMemoryEstimate();
248  string reject_reason;
249  if (max_requests == 0) {
250  reject_reason = REASON_DISABLED_REQUESTS_LIMIT;
251  } else if (mem_limit == 0) {
252  reject_reason = REASON_DISABLED_MEM_LIMIT;
253  } else if (mem_limit > 0 && expected_mem_usage >= mem_limit) {
254  reject_reason = Substitute(REASON_REQ_OVER_MEM_LIMIT,
255  PrettyPrinter::Print(expected_mem_usage, TUnit::BYTES),
256  PrettyPrinter::Print(mem_limit, TUnit::BYTES));
257  } else if (total_stats->num_queued >= max_queued) {
258  reject_reason = Substitute(REASON_QUEUE_FULL, max_queued, total_stats->num_queued);
259  } else {
260  return Status::OK; // Not rejected
261  }
262  return Status(Substitute(STATUS_REJECTED, pool_name, reject_reason));
263 }
264 
266  const string& pool_name = schedule->request_pool();
267  TPoolConfigResult pool_config;
268  RETURN_IF_ERROR(request_pool_service_->GetPoolConfig(pool_name, &pool_config));
269  const int64_t max_requests = pool_config.max_requests;
270  const int64_t max_queued = pool_config.max_queued;
271  const int64_t mem_limit = pool_config.mem_limit;
272 
273  // Note the queue_node will not exist in the queue when this method returns.
274  QueueNode queue_node(*schedule);
275  Status admitStatus; // An error status specifies why query is not admitted
276 
278  ScopedEvent completedEvent(schedule->query_events(), QUERY_EVENT_COMPLETED_ADMISSION);
279  {
280  lock_guard<mutex> lock(admission_ctrl_lock_);
281  RequestQueue* queue = &request_queue_map_[pool_name];
282  pool_config_cache_[pool_name] = pool_config;
283  PoolMetrics* pool_metrics = GetPoolMetrics(pool_name);
284  TPoolStats* total_stats = &cluster_pool_stats_[pool_name];
285  TPoolStats* local_stats = &local_pool_stats_[pool_name];
286  const int64_t cluster_mem_estimate = schedule->GetClusterMemoryEstimate();
287  VLOG_QUERY << "Schedule for id=" << schedule->query_id()
288  << " in pool_name=" << pool_name << " PoolConfig(max_requests="
289  << max_requests << " max_queued=" << max_queued
290  << " mem_limit=" << PrettyPrinter::Print(mem_limit, TUnit::BYTES)
291  << ") query cluster_mem_estimate="
292  << PrettyPrinter::Print(cluster_mem_estimate, TUnit::BYTES);
293  VLOG_QUERY << "Stats: " << DebugPoolStats(pool_name, total_stats, local_stats);
294 
295  admitStatus = CanAdmitRequest(pool_name, max_requests, mem_limit, *schedule, false);
296  if (admitStatus.ok()) {
297  // Execute immediately
298  pools_for_updates_.insert(pool_name);
299  // The local and total stats get incremented together when we queue so if
300  // there were any locally queued queries we should not admit immediately.
301  DCHECK_EQ(local_stats->num_queued, 0);
302  schedule->set_is_admitted(true);
305  ++total_stats->num_running;
306  ++local_stats->num_running;
307  int64_t mem_estimate = schedule->GetClusterMemoryEstimate();
308  local_stats->mem_estimate += mem_estimate;
309  total_stats->mem_estimate += mem_estimate;
310  if (pool_metrics != NULL) {
311  pool_metrics->local_admitted->Increment(1L);
312  pool_metrics->local_mem_estimate->Increment(mem_estimate);
313  pool_metrics->cluster_mem_estimate->Increment(mem_estimate);
314  }
315  VLOG_QUERY << "Admitted query id=" << schedule->query_id();
316  VLOG_RPC << "Final: " << DebugPoolStats(pool_name, total_stats, local_stats);
317  return Status::OK;
318  }
319 
320  Status rejectStatus = RejectRequest(pool_name, max_requests, mem_limit, max_queued,
321  *schedule);
322  if (!rejectStatus.ok()) {
323  schedule->set_is_admitted(false);
326  if (pool_metrics != NULL) pool_metrics->local_rejected->Increment(1L);
327  return rejectStatus;
328  }
329 
330  // We cannot immediately admit but do not need to reject, so queue the request
331  VLOG_QUERY << "Queuing, query id=" << schedule->query_id();
332  DCHECK_LT(total_stats->num_queued, max_queued);
333  DCHECK(max_requests > 0 || mem_limit > 0);
334  pools_for_updates_.insert(pool_name);
335  ++local_stats->num_queued;
336  ++total_stats->num_queued;
337  queue->Enqueue(&queue_node);
338  if (pool_metrics != NULL) pool_metrics->local_queued->Increment(1L);
339  }
340 
341  int64_t wait_start_ms = MonotonicMillis();
342  int64_t queue_wait_timeout_ms = max(0L, FLAGS_queue_wait_timeout_ms);
343  // We just call Get() to block until the result is set or it times out. Note that we
344  // don't hold the admission_ctrl_lock_ while we wait on this promise so we need to
345  // check the state after acquiring the lock in order to avoid any races because it is
346  // Set() by the dequeuing thread while holding admission_ctrl_lock_.
347  // TODO: handle cancellation
348  bool timed_out;
349  queue_node.is_admitted.Get(queue_wait_timeout_ms, &timed_out);
350  int64_t wait_time_ms = MonotonicMillis() - wait_start_ms;
351 
352  // Take the lock in order to check the result of is_admitted as there could be a race
353  // with the timeout. If the Get() timed out, then we need to dequeue the request.
354  // Otherwise, the request was admitted and we update the number of running queries
355  // stats.
356  {
357  lock_guard<mutex> lock(admission_ctrl_lock_);
358  RequestQueue* queue = &request_queue_map_[pool_name];
359  PoolMetrics* pool_metrics = GetPoolMetrics(pool_name);
360  pools_for_updates_.insert(pool_name);
361  if (pool_metrics != NULL) {
362  pool_metrics->local_time_in_queue_ms->Increment(wait_time_ms);
363  }
364  // Now that we have the lock, check again if the query was actually admitted (i.e.
365  // if the promise still hasn't been set), in which case we just admit the query.
366  timed_out = !queue_node.is_admitted.IsSet();
367  TPoolStats* total_stats = &cluster_pool_stats_[pool_name];
368  TPoolStats* local_stats = &local_pool_stats_[pool_name];
369  if (timed_out) {
370  queue->Remove(&queue_node);
371  queue_node.is_admitted.Set(false);
372  schedule->set_is_admitted(false);
375  --local_stats->num_queued;
376  --total_stats->num_queued;
377  if (pool_metrics != NULL) pool_metrics->local_timed_out->Increment(1L);
378  return Status(Substitute(STATUS_TIME_OUT, queue_wait_timeout_ms,
379  admitStatus.GetDetail()));
380  }
381  // The dequeue thread updates the stats (to avoid a race condition) so we do
382  // not change them here.
383  DCHECK(queue_node.is_admitted.Get());
384  DCHECK(!queue->Contains(&queue_node));
385  schedule->set_is_admitted(true);
388  if (pool_metrics != NULL) pool_metrics->local_admitted->Increment(1L);
389  VLOG_QUERY << "Admitted queued query id=" << schedule->query_id();
390  VLOG_RPC << "Final: " << DebugPoolStats(pool_name, total_stats, local_stats);
391  return Status::OK;
392  }
393 }
394 
396  if (!schedule->is_admitted()) return Status::OK; // No-op if query was not admitted
397  const string& pool_name = schedule->request_pool();
398  {
399  lock_guard<mutex> lock(admission_ctrl_lock_);
400  TPoolStats* total_stats = &cluster_pool_stats_[pool_name];
401  TPoolStats* local_stats = &local_pool_stats_[pool_name];
402  DCHECK_GT(total_stats->num_running, 0);
403  DCHECK_GT(local_stats->num_running, 0);
404  --total_stats->num_running;
405  --local_stats->num_running;
406 
407  int64_t mem_estimate = schedule->GetClusterMemoryEstimate();
408  local_stats->mem_estimate -= mem_estimate;
409  total_stats->mem_estimate -= mem_estimate;
410  PoolMetrics* pool_metrics = GetPoolMetrics(pool_name);
411  if (pool_metrics != NULL) {
412  pool_metrics->local_completed->Increment(1L);
413  pool_metrics->local_mem_estimate->Increment(-1 * mem_estimate);
414  pool_metrics->cluster_mem_estimate->Increment(-1 * mem_estimate);
415  }
416  pools_for_updates_.insert(pool_name);
417  VLOG_RPC << "Released query id=" << schedule->query_id() << " "
418  << DebugPoolStats(pool_name, total_stats, local_stats);
419  }
420  dequeue_cv_.notify_one();
421  return Status::OK;
422 }
423 
424 // Statestore subscriber callback for IMPALA_REQUEST_QUEUE_TOPIC. First, add any local
425 // pool stats updates. Then, per_backend_pool_stats_map_ is updated with the updated
426 // stats from any topic deltas that are received and we recompute the cluster-wide
427 // aggregate stats.
429  const StatestoreSubscriber::TopicDeltaMap& incoming_topic_deltas,
430  vector<TTopicDelta>* subscriber_topic_updates) {
431  {
432  lock_guard<mutex> lock(admission_ctrl_lock_);
433  BOOST_FOREACH(PoolStatsMap::value_type& entry, local_pool_stats_) {
434  UpdateLocalMemUsage(entry.first);
435  }
436  AddPoolUpdates(subscriber_topic_updates);
437 
438  StatestoreSubscriber::TopicDeltaMap::const_iterator topic =
439  incoming_topic_deltas.find(IMPALA_REQUEST_QUEUE_TOPIC);
440  if (topic != incoming_topic_deltas.end()) {
441  const TTopicDelta& delta = topic->second;
442  // Delta and non-delta updates are handled the same way, except for a full update
443  // we first clear the per_backend_pool_stats_map_. We then update the global map
444  // and then re-compute the pool stats for any pools that changed.
445  if (!delta.is_delta) {
446  VLOG_ROW << "Full impala-request-queue stats update";
448  }
449  HandleTopicUpdates(delta.topic_entries);
450  HandleTopicDeletions(delta.topic_deletions);
451  }
452  BOOST_FOREACH(PoolStatsMap::value_type& entry, local_pool_stats_) {
453  UpdateClusterAggregates(entry.first);
454  }
455  }
456  dequeue_cv_.notify_one(); // Dequeue and admit queries on the dequeue thread
457 }
458 
459 void AdmissionController::HandleTopicUpdates(const vector<TTopicItem>& topic_updates) {
460  BOOST_FOREACH(const TTopicItem& item, topic_updates) {
461  string pool_name;
462  string topic_backend_id;
463  if (!ParsePoolTopicKey(item.key, &pool_name, &topic_backend_id)) continue;
464  // The topic entry from this subscriber is handled specially; the stats coming
465  // from the statestore are likely already outdated.
466  if (topic_backend_id == backend_id_) continue;
467  local_pool_stats_[pool_name]; // Create an entry in the local map if it doesn't exist
468  TPoolStats pool_update;
469  uint32_t len = item.value.size();
470  Status status = DeserializeThriftMsg(reinterpret_cast<const uint8_t*>(
471  item.value.data()), &len, false, &pool_update);
472  if (!status.ok()) {
473  VLOG_QUERY << "Error deserializing pool update with key: " << item.key;
474  continue;
475  }
476  PoolStatsMap& pool_map = per_backend_pool_stats_map_[pool_name];
477 
478  // Debug logging
479  if (pool_map.find(topic_backend_id) != pool_map.end()) {
480  VLOG_ROW << "Stats update for key=" << item.key << " previous: "
481  << DebugPoolStats(pool_name, NULL, &pool_map[topic_backend_id]);
482  }
483  VLOG_ROW << "Stats update for key=" << item.key << " updated: "
484  << DebugPoolStats(pool_name, NULL, &pool_update);
485 
486  pool_map[topic_backend_id] = pool_update;
487  DCHECK(per_backend_pool_stats_map_[pool_name][topic_backend_id].num_running ==
488  pool_update.num_running);
489  DCHECK(per_backend_pool_stats_map_[pool_name][topic_backend_id].num_queued ==
490  pool_update.num_queued);
491  }
492 }
493 
494 void AdmissionController::HandleTopicDeletions(const vector<string>& topic_deletions) {
495  BOOST_FOREACH(const string& topic_key, topic_deletions) {
496  string pool_name;
497  string topic_backend_id;
498  if (!ParsePoolTopicKey(topic_key, &pool_name, &topic_backend_id)) continue;
499  PoolStatsMap& pool_map = per_backend_pool_stats_map_[pool_name];
500  VLOG_ROW << "Deleting stats for key=" << topic_key << " "
501  << DebugPoolStats(pool_name, NULL, &pool_map[topic_backend_id]);
502  pool_map.erase(topic_backend_id);
503  DCHECK(per_backend_pool_stats_map_[pool_name].find(topic_backend_id) ==
504  per_backend_pool_stats_map_[pool_name].end());
505  }
506 }
507 
508 void AdmissionController::UpdateClusterAggregates(const string& pool_name) {
509  const TPoolStats& local_stats = local_pool_stats_[pool_name];
510  const PoolStatsMap& pool_map = per_backend_pool_stats_map_[pool_name];
511  TPoolStats total_stats;
512  BOOST_FOREACH(const PoolStatsMap::value_type& entry, pool_map) {
513  // Skip an update from this subscriber as the information may be outdated.
514  // The current local_stats will be added below.
515  if (entry.first == backend_id_) continue;
516  DCHECK_GE(entry.second.num_running, 0);
517  DCHECK_GE(entry.second.num_queued, 0);
518  DCHECK_GE(entry.second.mem_usage, 0);
519  DCHECK_GE(entry.second.mem_estimate, 0);
520  total_stats.num_running += entry.second.num_running;
521  total_stats.num_queued += entry.second.num_queued;
522  total_stats.mem_usage += entry.second.mem_usage;
523  total_stats.mem_estimate += entry.second.mem_estimate;
524  }
525  total_stats.num_running += local_stats.num_running;
526  total_stats.num_queued += local_stats.num_queued;
527  total_stats.mem_usage += local_stats.mem_usage;
528  total_stats.mem_estimate += local_stats.mem_estimate;
529 
530  DCHECK_GE(total_stats.num_running, 0);
531  DCHECK_GE(total_stats.num_queued, 0);
532  DCHECK_GE(total_stats.mem_usage, 0);
533  DCHECK_GE(total_stats.mem_estimate, 0);
534  DCHECK_GE(total_stats.num_running, local_stats.num_running);
535  DCHECK_GE(total_stats.num_queued, local_stats.num_queued);
536 
537  cluster_pool_stats_[pool_name] = total_stats;
538  PoolMetrics* pool_metrics = GetPoolMetrics(pool_name);
539  if (pool_metrics != NULL) {
540  pool_metrics->cluster_num_running->set_value(total_stats.num_running);
541  pool_metrics->cluster_in_queue->set_value(total_stats.num_queued);
542  pool_metrics->cluster_mem_usage->set_value(total_stats.mem_usage);
543  pool_metrics->cluster_mem_estimate->set_value(total_stats.mem_estimate);
544  }
545 
546  if (cluster_pool_stats_[pool_name] != total_stats) {
547  VLOG_ROW << "Recomputed stats, previous: "
548  << DebugPoolStats(pool_name, &cluster_pool_stats_[pool_name], NULL);
549  VLOG_ROW << "Recomputed stats, updated: "
550  << DebugPoolStats(pool_name, &total_stats, NULL);
551  }
552 }
553 
554 void AdmissionController::UpdateLocalMemUsage(const string& pool_name) {
555  TPoolStats* stats = &local_pool_stats_[pool_name];
557  const int64_t current_usage = tracker == NULL ? 0L : tracker->consumption();
558  if (current_usage != stats->mem_usage) {
559  stats->mem_usage = current_usage;
560  pools_for_updates_.insert(pool_name);
561  PoolMetrics* pool_metrics = GetPoolMetrics(pool_name);
562  if (pool_metrics != NULL) {
563  pool_metrics->local_mem_usage->set_value(current_usage);
564  }
565  }
566 }
567 
568 void AdmissionController::AddPoolUpdates(vector<TTopicDelta>* topic_updates) {
569  if (pools_for_updates_.empty()) return;
570  topic_updates->push_back(TTopicDelta());
571  TTopicDelta& topic_delta = topic_updates->back();
572  topic_delta.topic_name = IMPALA_REQUEST_QUEUE_TOPIC;
573  BOOST_FOREACH(const string& pool_name, pools_for_updates_) {
574  DCHECK(local_pool_stats_.find(pool_name) != local_pool_stats_.end());
575  TPoolStats& pool_stats = local_pool_stats_[pool_name];
576  VLOG_ROW << "Sending topic update " << DebugPoolStats(pool_name, NULL, &pool_stats);
577  topic_delta.topic_entries.push_back(TTopicItem());
578  TTopicItem& topic_item = topic_delta.topic_entries.back();
579  topic_item.key = MakePoolTopicKey(pool_name, backend_id_);
580  Status status = thrift_serializer_.Serialize(&pool_stats, &topic_item.value);
581  if (!status.ok()) {
582  LOG(WARNING) << "Failed to serialize query pool stats: " << status.GetDetail();
583  topic_updates->pop_back();
584  }
585  PoolMetrics* pool_metrics = GetPoolMetrics(pool_name);
586  if (pool_metrics != NULL) {
587  pool_metrics->local_num_running->set_value(pool_stats.num_running);
588  pool_metrics->local_in_queue->set_value(pool_stats.num_queued);
589  pool_metrics->local_mem_usage->set_value(pool_stats.mem_usage);
590  }
591  }
592  pools_for_updates_.clear();
593 }
594 
596  while (true) {
597  unique_lock<mutex> lock(admission_ctrl_lock_);
598  if (done_) break;
599  dequeue_cv_.wait(lock);
600  BOOST_FOREACH(PoolStatsMap::value_type& entry, local_pool_stats_) {
601  const string& pool_name = entry.first;
602  TPoolStats* local_stats = &entry.second;
603 
604  PoolConfigMap::iterator it = pool_config_cache_.find(pool_name);
605  if (it == pool_config_cache_.end()) continue; // No local requests in this pool
606  const TPoolConfigResult& pool_config = it->second;
607 
608  const int64_t max_requests = pool_config.max_requests;
609  const int64_t mem_limit = pool_config.mem_limit;
610 
611  // We should never have queued any requests in pools where either limit is 0 as no
612  // requests should ever be admitted or when both limits are less than 0, i.e.
613  // unlimited requests can be admitted and should never be queued.
614  if (max_requests == 0 || mem_limit == 0 || (max_requests < 0 && mem_limit < 0)) {
615  DCHECK_EQ(local_stats->num_queued, 0);
616  }
617 
618  if (local_stats->num_queued == 0) continue; // Nothing to dequeue
619  DCHECK(max_requests > 0 || mem_limit > 0);
620  TPoolStats* total_stats = &cluster_pool_stats_[pool_name];
621 
622  DCHECK_GT(local_stats->num_queued, 0);
623  DCHECK_GE(total_stats->num_queued, local_stats->num_queued);
624 
625  // Determine the maximum number of requests that can possibly be dequeued based
626  // on the max_requests limit and the current queue size. We will attempt to
627  // dequeue up to this number of requests until reaching the per-pool memory limit.
628  int64_t max_to_dequeue = 0;
629  if (max_requests > 0) {
630  const int64_t total_available = max_requests - total_stats->num_running;
631  if (total_available <= 0) continue;
632  double queue_size_ratio = static_cast<double>(local_stats->num_queued) /
633  static_cast<double>(total_stats->num_queued);
634  // The maximum number of requests that can possibly be dequeued is the total
635  // number of available requests scaled by the ratio of the size of the local
636  // queue to the size of the total queue. We attempt to dequeue at least one
637  // request and at most the size of the local queue.
638  // TODO: Use a simple heuristic rather than a lower bound of 1 to avoid admitting
639  // too many requests globally when only a single request can be admitted.
640  max_to_dequeue = min(local_stats->num_queued,
641  max(1L, static_cast<int64_t>(queue_size_ratio * total_available)));
642  } else {
643  max_to_dequeue = local_stats->num_queued; // No limit on num running requests
644  }
645 
646  RequestQueue& queue = request_queue_map_[pool_name];
647  VLOG_RPC << "Dequeue thread will try to admit " << max_to_dequeue << " requests"
648  << ", pool=" << pool_name << ", num_queued=" << local_stats->num_queued;
649 
650  PoolMetrics* pool_metrics = GetPoolMetrics(pool_name);
651  while (max_to_dequeue > 0 && !queue.empty()) {
652  QueueNode* queue_node = queue.head();
653  DCHECK(queue_node != NULL);
654  DCHECK(!queue_node->is_admitted.IsSet());
655  const QuerySchedule& schedule = queue_node->schedule;
656  Status admitStatus = CanAdmitRequest(pool_name, max_requests, mem_limit,
657  schedule, true);
658  if (!admitStatus.ok()) {
659  VLOG_RPC << "Could not dequeue query id=" << queue_node->schedule.query_id()
660  << " reason: " << admitStatus.GetDetail();
661  break;
662  }
663  queue.Dequeue();
664  --local_stats->num_queued;
665  --total_stats->num_queued;
666  ++local_stats->num_running;
667  ++total_stats->num_running;
668  int64_t mem_estimate = schedule.GetClusterMemoryEstimate();
669  local_stats->mem_estimate += mem_estimate;
670  total_stats->mem_estimate += mem_estimate;
671  if (pool_metrics != NULL) {
672  pool_metrics->local_dequeued->Increment(1L);
673  pool_metrics->local_mem_estimate->Increment(mem_estimate);
674  pool_metrics->cluster_mem_estimate->Increment(mem_estimate);
675  }
676  VLOG_ROW << "Dequeuing query id=" << queue_node->schedule.query_id();
677  queue_node->is_admitted.Set(true);
678  --max_to_dequeue;
679  }
680  pools_for_updates_.insert(pool_name);
681  }
682  }
683 }
684 
686 AdmissionController::GetPoolMetrics(const string& pool_name) {
687  if (metrics_ == NULL) return NULL;
688  PoolMetricsMap::iterator it = pool_metrics_map_.find(pool_name);
689  if (it != pool_metrics_map_.end()) return &it->second;
690 
691  PoolMetrics* pool_metrics = &pool_metrics_map_[pool_name];
692  pool_metrics->local_admitted = metrics_->AddCounter(
693  Substitute(LOCAL_ADMITTED_METRIC_KEY_FORMAT, pool_name), 0L);
694  pool_metrics->local_queued = metrics_->AddCounter(
695  Substitute(LOCAL_QUEUED_METRIC_KEY_FORMAT, pool_name), 0L);
696  pool_metrics->local_dequeued = metrics_->AddCounter(
697  Substitute(LOCAL_DEQUEUED_METRIC_KEY_FORMAT, pool_name), 0L);
698  pool_metrics->local_rejected = metrics_->AddCounter(
699  Substitute(LOCAL_REJECTED_METRIC_KEY_FORMAT, pool_name), 0L);
700  pool_metrics->local_timed_out = metrics_->AddCounter(
701  Substitute(LOCAL_TIMED_OUT_METRIC_KEY_FORMAT, pool_name), 0L);
702  pool_metrics->local_completed = metrics_->AddCounter(
703  Substitute(LOCAL_COMPLETED_METRIC_KEY_FORMAT, pool_name), 0L);
704  pool_metrics->local_time_in_queue_ms = metrics_->AddCounter(
705  Substitute(LOCAL_TIME_IN_QUEUE_METRIC_KEY_FORMAT, pool_name), 0L);
706 
707  pool_metrics->cluster_num_running = metrics_->AddGauge(
708  Substitute(CLUSTER_NUM_RUNNING_METRIC_KEY_FORMAT, pool_name), 0L);
709 
710  pool_metrics->cluster_in_queue = metrics_->AddGauge(
711  Substitute(CLUSTER_IN_QUEUE_METRIC_KEY_FORMAT, pool_name), 0L);
712  pool_metrics->cluster_mem_usage = metrics_->AddGauge(
713  Substitute(CLUSTER_MEM_USAGE_METRIC_KEY_FORMAT, pool_name), 0L);
714  pool_metrics->cluster_mem_estimate = metrics_->AddGauge(
715  Substitute(CLUSTER_MEM_ESTIMATE_METRIC_KEY_FORMAT, pool_name), 0L);
716 
717  pool_metrics->local_num_running = metrics_->AddGauge(
718  Substitute(LOCAL_NUM_RUNNING_METRIC_KEY_FORMAT, pool_name), 0L);
719 
720  pool_metrics->local_in_queue = metrics_->AddGauge(
721  Substitute(LOCAL_IN_QUEUE_METRIC_KEY_FORMAT, pool_name), 0L);
722 
723  pool_metrics->local_mem_usage = metrics_->AddGauge(
724  Substitute(LOCAL_MEM_USAGE_METRIC_KEY_FORMAT, pool_name), 0L);
725  pool_metrics->local_mem_estimate = metrics_->AddGauge(
726  Substitute(LOCAL_MEM_ESTIMATE_METRIC_KEY_FORMAT, pool_name), 0L);
727  return pool_metrics;
728 }
729 }
SimpleMetric< T, TMetricKind::COUNTER > * AddCounter(const std::string &key, const T &value, const TUnit::type unit=TUnit::UNIT, const std::string &description="")
Definition: metrics.h:239
const string QUEUED_NUM_RUNNING
PerBackendPoolStatsMap per_backend_pool_stats_map_
void HandleTopicUpdates(const std::vector< TTopicItem > &topic_updates)
IntGauge * local_mem_estimate
The sum of planner memory estimates for requests that were started locally.
const string LOCAL_TIMED_OUT_METRIC_KEY_FORMAT
int64_t consumption() const
Returns the memory consumed in bytes.
Definition: mem-tracker.h:298
const std::string GetDetail() const
Definition: status.cc:184
RuntimeProfile::EventSequence * query_events()
void UpdatePoolStats(const StatestoreSubscriber::TopicDeltaMap &incoming_topic_deltas, std::vector< TTopicDelta > *subscriber_topic_updates)
Statestore subscriber callback that updates the pool stats state.
const string LOCAL_MEM_USAGE_METRIC_KEY_FORMAT
IntGauge * local_num_running
The total number of queries currently running that were initiated locally.
void AddInfoString(const std::string &key, const std::string &value)
Status GetPoolConfig(const std::string &pool_name, TPoolConfigResult *pool_config)
const string LOCAL_MEM_ESTIMATE_METRIC_KEY_FORMAT
void HandleTopicDeletions(const std::vector< std::string > &topic_deletions)
boost::scoped_ptr< Thread > dequeue_thread_
Thread dequeuing and admitting queries.
MemTracker tracker
TODO: Consider allowing fragment IDs as category parameters.
Definition: thread.h:45
void Set(const T &val)
Definition: promise.h:38
const string CLUSTER_MEM_USAGE_METRIC_KEY_FORMAT
#define RETURN_IF_ERROR(stmt)
some generally useful macros
Definition: status.h:242
const string CLUSTER_MEM_ESTIMATE_METRIC_KEY_FORMAT
MetricGroups may be organised hierarchically as a tree.
Definition: metrics.h:200
const string LOCAL_QUEUED_METRIC_KEY_FORMAT
const string STATUS_REJECTED
const string STATUS_TIME_OUT
IntGauge * cluster_in_queue
The estimated total number of requests currently queued across the cluster.
const string LOCAL_ADMITTED_METRIC_KEY_FORMAT
const string LOCAL_TIME_IN_QUEUE_METRIC_KEY_FORMAT
const string QUERY_EVENT_COMPLETED_ADMISSION
void AddDetail(const std::string &msg)
Add a detail string. Calling this method is only defined on a non-OK message.
Definition: status.cc:166
static bool ParsePoolTopicKey(const string &topic_key, string *pool_name, string *backend_id)
const string REASON_DISABLED_MEM_LIMIT
static std::string Print(bool value, TUnit::type ignored, bool verbose=false)
Status ReleaseQuery(QuerySchedule *schedule)
IntGauge * local_mem_usage
The total amount of memory used by this pool locally.
Status Serialize(T *obj, std::vector< uint8_t > *result)
Serializes obj into result. Result will contain a copy of the memory.
Definition: thrift-util.h:48
DEFINE_int64(queue_wait_timeout_ms, 60 *1000,"Maximum amount of time (in ""milliseconds) that a request will wait to be admitted before timing out.")
const TUniqueId & query_id() const
const std::string backend_id_
Unique id for this impalad, used to construct topic keys.
static const std::string IMPALA_REQUEST_QUEUE_TOPIC
void MarkEvent(const std::string &label)
#define VLOG_QUERY
Definition: logging.h:57
const string QUEUED_MEM_LIMIT
boost::condition_variable dequeue_cv_
void UpdateLocalMemUsage(const std::string &pool_name)
boost::function< void(const TopicDeltaMap &state, std::vector< TTopicDelta > *topic_updates)> UpdateCallback
void DequeueLoop()
Dequeues and admits queued queries when notified by dequeue_cv_.
static string DebugPoolStats(const string &pool_name, const TPoolStats *total_stats, const TPoolStats *local_stats)
IntGauge * cluster_mem_estimate
The sum of planner memory estimates for requests across the cluster.
const string CLUSTER_NUM_RUNNING_METRIC_KEY_FORMAT
const string PROFILE_INFO_VAL_ADMIT_IMMEDIATELY
RuntimeProfile * summary_profile()
bool Contains(const T *target) const
const string PROFILE_INFO_VAL_ADMIT_QUEUED
ThriftSerializer thrift_serializer_
Serializes/deserializes TPoolStats when sending and receiving topic updates.
IntGauge * cluster_mem_usage
Approximate total amount of memory used by this pool across the cluster.
Status AdmitQuery(QuerySchedule *schedule)
void AddPoolUpdates(std::vector< TTopicDelta > *subscriber_topic_updates)
const string LOCAL_NUM_RUNNING_METRIC_KEY_FORMAT
void set_is_admitted(bool is_admitted)
boost::unordered_map< std::string, TPoolStats > PoolStatsMap
Map of pool names to pool statistics.
int64_t MonotonicMillis()
Definition: time.h:35
std::map< Statestore::TopicId, TTopicDelta > TopicDeltaMap
A TopicDeltaMap is passed to each callback. See UpdateCallback for more details.
void Enqueue(T *n)
Enqueue node onto the queue's tail. This is O(1).
#define VLOG_ROW
Definition: logging.h:59
const string REASON_REQ_OVER_MEM_LIMIT
const string PROFILE_INFO_VAL_REJECTED
This class is thread-safe.
Definition: mem-tracker.h:61
Status RejectRequest(const std::string &pool, const int64_t max_requests, const int64_t mem_limit, const int64_t max_queued, const QuerySchedule &schedule)
bool IsSet()
Returns whether the value is set.
Definition: promise.h:92
static string MakePoolTopicKey(const string &pool_name, const string &backend_id)
bool done_
If true, tear down the dequeuing thread. This only happens in unit tests.
const T & Get()
Definition: promise.h:59
static Status Expected(const std::string &error_msg)
Create a status instance that represents an expected error and will not be logged.
Definition: status.cc:162
int64_t GetClusterMemoryEstimate() const
Total estimated memory for all nodes. set_num_hosts() must be called before this. ...
static MemTracker * GetRequestPoolMemTracker(const std::string &pool_name, MemTracker *parent)
Definition: mem-tracker.cc:134
Status CanAdmitRequest(const std::string &pool, const int64_t max_requests, const int64_t mem_limit, const QuerySchedule &schedule, bool admit_from_queue)
T must be a subclass of InternalQueue::Node.
const string REASON_QUEUE_FULL
SimpleMetric< T > * AddGauge(const std::string &key, const T &value, const TUnit::type unit=TUnit::NONE, const std::string &description="")
Create a gauge metric object with given key and initial value (owned by this object) ...
Definition: metrics.h:223
IntGauge * local_in_queue
The total number of requests currently queued locally.
void UpdateClusterAggregates(const std::string &pool_name)
static const Status OK
Definition: status.h:87
const std::string & request_pool() const
#define VLOG_RPC
Definition: logging.h:56
const char TOPIC_KEY_DELIMITER
Utility class to mark an event when the object is destroyed.
const string PROFILE_INFO_KEY_ADMISSION_RESULT
IntCounter * local_dequeued
The total number of requests that have been dequeued locally.
RequestPoolService * request_pool_service_
Status DeserializeThriftMsg(JNIEnv *env, jbyteArray serialized_msg, T *deserialized_msg)
const string QUEUED_QUEUE_NOT_EMPTY
PoolMetrics * GetPoolMetrics(const std::string &pool_name)
const string LOCAL_COMPLETED_METRIC_KEY_FORMAT
bool ok() const
Definition: status.h:172
MetricGroup * metrics_
Metrics subsystem access.
Status Init(StatestoreSubscriber *subscriber)
Registers with the subscription manager.
const string LOCAL_DEQUEUED_METRIC_KEY_FORMAT
IntCounter * local_timed_out
The total number of requests that timed out while waiting for admission locally.
const string REASON_DISABLED_REQUESTS_LIMIT
Status AddTopic(const Statestore::TopicId &topic_id, bool is_transient, const UpdateCallback &callback)
const string PROFILE_INFO_VAL_TIME_OUT
const string QUERY_EVENT_SUBMIT_FOR_ADMISSION
const string LOCAL_REJECTED_METRIC_KEY_FORMAT
const string LOCAL_IN_QUEUE_METRIC_KEY_FORMAT
const string CLUSTER_IN_QUEUE_METRIC_KEY_FORMAT
bool is_admitted() const