17 #include <boost/algorithm/string.hpp>
18 #include <boost/bind.hpp>
19 #include <boost/foreach.hpp>
20 #include <boost/mem_fn.hpp>
21 #include <gutil/strings/substitute.h>
33 using namespace strings;
// Command-line flag bounding how long, in milliseconds, a request may wait to be
// admitted before it times out. The default is 60 * 1000 ms (one minute).
// Where the flag is read, the value is passed through max(0L, ...), so a negative
// setting is treated as 0.
35 DEFINE_int64(queue_wait_timeout_ms, 60 * 1000,
"Maximum amount of time (in "
36 "milliseconds) that a request will wait to be admitted before timing out.");
// Topic name used by the admission controller for its request-queue updates.
// NOTE(review): presumably this is the topic registered with the statestore
// subscriber during Init() and carrying the per-pool stats -- confirm against
// the full file.
40 const string AdmissionController::IMPALA_REQUEST_QUEUE_TOPIC(
"impala-request-queue");
51 "admission-controller.$0.local-admitted";
53 "admission-controller.$0.local-queued";
55 "admission-controller.$0.local-dequeued";
57 "admission-controller.$0.local-rejected";
59 "admission-controller.$0.local-timed-out";
61 "admission-controller.$0.local-completed";
63 "admission-controller.$0.local-time-in-queue-ms";
65 "admission-controller.$0.cluster-num-running";
67 "admission-controller.$0.cluster-in-queue";
69 "admission-controller.$0.cluster-mem-usage";
71 "admission-controller.$0.cluster-mem-estimate";
73 "admission-controller.$0.local-num-running";
75 "admission-controller.$0.local-in-queue";
77 "admission-controller.$0.local-mem-usage";
79 "admission-controller.$0.local-mem-estimate";
99 "request memory estimate $0 is greater than pool limit $1.\n\n"
100 "If the memory estimate appears to be too high, use the MEM_LIMIT query option to "
101 "override the memory estimates in admission decisions. Check the explain plan for "
102 "warnings about missing stats. Running COMPUTE STATS may help. You may also "
103 "consider using query hints to manually improve the plan.\n\n"
104 "See the Impala documentation for more details regarding the MEM_LIMIT query "
105 "option, table stats, and query hints. If the memory estimate is still too high, "
106 "consider modifying the query to reduce the memory impact or increasing the "
114 "memory estimate $1 is over pool memory limit $2";
125 string* backend_id) {
127 if (pos == string::npos || pos >= topic_key.size() - 1) {
128 VLOG_QUERY <<
"Invalid topic key for pool: " << topic_key;
131 *pool_name = topic_key.substr(0, pos);
132 *backend_id = topic_key.substr(pos + 1);
139 const string& backend_id) {
149 const TPoolStats* total_stats,
150 const TPoolStats* local_stats) {
152 ss <<
"pool=" << pool_name;
153 if (total_stats != NULL) {
155 ss <<
"num_running=" << total_stats->num_running <<
", ";
156 ss <<
"num_queued=" << total_stats->num_queued <<
", ";
157 ss <<
"mem_usage=" <<
158 PrettyPrinter::Print(total_stats->mem_usage, TUnit::BYTES) <<
", ";
159 ss <<
"mem_estimate=" <<
160 PrettyPrinter::Print(total_stats->mem_estimate, TUnit::BYTES);
163 if (local_stats != NULL) {
165 ss <<
"num_running=" << local_stats->num_running <<
", ";
166 ss <<
"num_queued=" << local_stats->num_queued <<
", ";
167 ss <<
"mem_usage=" <<
168 PrettyPrinter::Print(local_stats->mem_usage, TUnit::BYTES) <<
", ";
169 ss <<
"mem_estimate=" <<
170 PrettyPrinter::Print(local_stats->mem_estimate, TUnit::BYTES);
178 : request_pool_service_(request_pool_service),
180 backend_id_(backend_id),
181 thrift_serializer_(false),
206 status.
AddDetail(
"AdmissionController failed to register request queue topic");
212 const int64_t max_requests,
const int64_t mem_limit,
const QuerySchedule& schedule,
213 bool admit_from_queue) {
215 DCHECK_GE(total_stats.mem_usage, 0);
216 DCHECK_GE(total_stats.mem_estimate, 0);
218 const int64_t current_cluster_estimate_mem =
219 max(total_stats.mem_usage, total_stats.mem_estimate);
221 const int64_t cluster_estimated_memory = query_total_estimated_mem +
222 current_cluster_estimate_mem;
223 DCHECK_GE(cluster_estimated_memory, 0);
229 if (max_requests >= 0 && total_stats.num_running >= max_requests) {
232 }
else if (mem_limit >= 0 && cluster_estimated_memory >= mem_limit) {
237 }
else if (!admit_from_queue && total_stats.num_queued > 0) {
244 const int64_t max_requests,
const int64_t mem_limit,
const int64_t max_queued,
248 string reject_reason;
249 if (max_requests == 0) {
251 }
else if (mem_limit == 0) {
253 }
else if (mem_limit > 0 && expected_mem_usage >= mem_limit) {
257 }
else if (total_stats->num_queued >= max_queued) {
258 reject_reason = Substitute(
REASON_QUEUE_FULL, max_queued, total_stats->num_queued);
267 TPoolConfigResult pool_config;
269 const int64_t max_requests = pool_config.max_requests;
270 const int64_t max_queued = pool_config.max_queued;
271 const int64_t mem_limit = pool_config.mem_limit;
288 <<
" in pool_name=" << pool_name <<
" PoolConfig(max_requests="
289 << max_requests <<
" max_queued=" << max_queued
291 <<
") query cluster_mem_estimate="
295 admitStatus =
CanAdmitRequest(pool_name, max_requests, mem_limit, *schedule,
false);
296 if (admitStatus.
ok()) {
301 DCHECK_EQ(local_stats->num_queued, 0);
305 ++total_stats->num_running;
306 ++local_stats->num_running;
308 local_stats->mem_estimate += mem_estimate;
309 total_stats->mem_estimate += mem_estimate;
310 if (pool_metrics != NULL) {
322 if (!rejectStatus.
ok()) {
326 if (pool_metrics != NULL) pool_metrics->
local_rejected->Increment(1L);
332 DCHECK_LT(total_stats->num_queued, max_queued);
333 DCHECK(max_requests > 0 || mem_limit > 0);
335 ++local_stats->num_queued;
336 ++total_stats->num_queued;
338 if (pool_metrics != NULL) pool_metrics->
local_queued->Increment(1L);
342 int64_t queue_wait_timeout_ms = max(0L, FLAGS_queue_wait_timeout_ms);
361 if (pool_metrics != NULL) {
370 queue->
Remove(&queue_node);
375 --local_stats->num_queued;
376 --total_stats->num_queued;
377 if (pool_metrics != NULL) pool_metrics->
local_timed_out->Increment(1L);
384 DCHECK(!queue->
Contains(&queue_node));
388 if (pool_metrics != NULL) pool_metrics->
local_admitted->Increment(1L);
402 DCHECK_GT(total_stats->num_running, 0);
403 DCHECK_GT(local_stats->num_running, 0);
404 --total_stats->num_running;
405 --local_stats->num_running;
408 local_stats->mem_estimate -= mem_estimate;
409 total_stats->mem_estimate -= mem_estimate;
411 if (pool_metrics != NULL) {
430 vector<TTopicDelta>* subscriber_topic_updates) {
438 StatestoreSubscriber::TopicDeltaMap::const_iterator topic =
440 if (topic != incoming_topic_deltas.end()) {
441 const TTopicDelta& delta = topic->second;
445 if (!delta.is_delta) {
446 VLOG_ROW <<
"Full impala-request-queue stats update";
460 BOOST_FOREACH(
const TTopicItem& item, topic_updates) {
462 string topic_backend_id;
468 TPoolStats pool_update;
469 uint32_t len = item.value.size();
471 item.value.data()), &len,
false, &pool_update);
473 VLOG_QUERY <<
"Error deserializing pool update with key: " << item.key;
479 if (pool_map.find(topic_backend_id) != pool_map.end()) {
480 VLOG_ROW <<
"Stats update for key=" << item.key <<
" previous: "
483 VLOG_ROW <<
"Stats update for key=" << item.key <<
" updated: "
486 pool_map[topic_backend_id] = pool_update;
488 pool_update.num_running);
490 pool_update.num_queued);
495 BOOST_FOREACH(
const string& topic_key, topic_deletions) {
497 string topic_backend_id;
500 VLOG_ROW <<
"Deleting stats for key=" << topic_key <<
" "
502 pool_map.erase(topic_backend_id);
511 TPoolStats total_stats;
512 BOOST_FOREACH(
const PoolStatsMap::value_type& entry, pool_map) {
516 DCHECK_GE(entry.second.num_running, 0);
517 DCHECK_GE(entry.second.num_queued, 0);
518 DCHECK_GE(entry.second.mem_usage, 0);
519 DCHECK_GE(entry.second.mem_estimate, 0);
520 total_stats.num_running += entry.second.num_running;
521 total_stats.num_queued += entry.second.num_queued;
522 total_stats.mem_usage += entry.second.mem_usage;
523 total_stats.mem_estimate += entry.second.mem_estimate;
525 total_stats.num_running += local_stats.num_running;
526 total_stats.num_queued += local_stats.num_queued;
527 total_stats.mem_usage += local_stats.mem_usage;
528 total_stats.mem_estimate += local_stats.mem_estimate;
530 DCHECK_GE(total_stats.num_running, 0);
531 DCHECK_GE(total_stats.num_queued, 0);
532 DCHECK_GE(total_stats.mem_usage, 0);
533 DCHECK_GE(total_stats.mem_estimate, 0);
534 DCHECK_GE(total_stats.num_running, local_stats.num_running);
535 DCHECK_GE(total_stats.num_queued, local_stats.num_queued);
539 if (pool_metrics != NULL) {
547 VLOG_ROW <<
"Recomputed stats, previous: "
549 VLOG_ROW <<
"Recomputed stats, updated: "
557 const int64_t current_usage = tracker == NULL ? 0L : tracker->
consumption();
558 if (current_usage != stats->mem_usage) {
559 stats->mem_usage = current_usage;
562 if (pool_metrics != NULL) {
570 topic_updates->push_back(TTopicDelta());
571 TTopicDelta& topic_delta = topic_updates->back();
577 topic_delta.topic_entries.push_back(TTopicItem());
578 TTopicItem& topic_item = topic_delta.topic_entries.back();
582 LOG(WARNING) <<
"Failed to serialize query pool stats: " << status.
GetDetail();
583 topic_updates->pop_back();
586 if (pool_metrics != NULL) {
601 const string& pool_name = entry.first;
602 TPoolStats* local_stats = &entry.second;
606 const TPoolConfigResult& pool_config = it->second;
608 const int64_t max_requests = pool_config.max_requests;
609 const int64_t mem_limit = pool_config.mem_limit;
614 if (max_requests == 0 || mem_limit == 0 || (max_requests < 0 && mem_limit < 0)) {
615 DCHECK_EQ(local_stats->num_queued, 0);
618 if (local_stats->num_queued == 0)
continue;
619 DCHECK(max_requests > 0 || mem_limit > 0);
622 DCHECK_GT(local_stats->num_queued, 0);
623 DCHECK_GE(total_stats->num_queued, local_stats->num_queued);
628 int64_t max_to_dequeue = 0;
629 if (max_requests > 0) {
630 const int64_t total_available = max_requests - total_stats->num_running;
631 if (total_available <= 0)
continue;
632 double queue_size_ratio =
static_cast<double>(local_stats->num_queued) /
633 static_cast<double>(total_stats->num_queued);
640 max_to_dequeue = min(local_stats->num_queued,
641 max(1L, static_cast<int64_t>(queue_size_ratio * total_available)));
643 max_to_dequeue = local_stats->num_queued;
647 VLOG_RPC <<
"Dequeue thread will try to admit " << max_to_dequeue <<
" requests"
648 <<
", pool=" << pool_name <<
", num_queued=" << local_stats->num_queued;
651 while (max_to_dequeue > 0 && !queue.
empty()) {
653 DCHECK(queue_node != NULL);
658 if (!admitStatus.
ok()) {
660 <<
" reason: " << admitStatus.
GetDetail();
664 --local_stats->num_queued;
665 --total_stats->num_queued;
666 ++local_stats->num_running;
667 ++total_stats->num_running;
668 int64_t mem_estimate = schedule.GetClusterMemoryEstimate();
669 local_stats->mem_estimate += mem_estimate;
670 total_stats->mem_estimate += mem_estimate;
671 if (pool_metrics != NULL) {
SimpleMetric< T, TMetricKind::COUNTER > * AddCounter(const std::string &key, const T &value, const TUnit::type unit=TUnit::UNIT, const std::string &description="")
IntCounter * local_rejected
const string QUEUED_NUM_RUNNING
PerBackendPoolStatsMap per_backend_pool_stats_map_
void HandleTopicUpdates(const std::vector< TTopicItem > &topic_updates)
IntGauge * local_mem_estimate
The sum of planner memory estimates for requests that were started locally.
const string LOCAL_TIMED_OUT_METRIC_KEY_FORMAT
int64_t consumption() const
Returns the memory consumed in bytes.
const std::string GetDetail() const
RuntimeProfile::EventSequence * query_events()
Promise< bool > is_admitted
void UpdatePoolStats(const StatestoreSubscriber::TopicDeltaMap &incoming_topic_deltas, std::vector< TTopicDelta > *subscriber_topic_updates)
Statestore subscriber callback that updates the pool stats state.
const string LOCAL_MEM_USAGE_METRIC_KEY_FORMAT
IntGauge * local_num_running
The total number of queries currently running that were initiated locally.
void AddInfoString(const std::string &key, const std::string &value)
IntCounter * local_queued
Status GetPoolConfig(const std::string &pool_name, TPoolConfigResult *pool_config)
const string LOCAL_MEM_ESTIMATE_METRIC_KEY_FORMAT
void HandleTopicDeletions(const std::vector< std::string > &topic_deletions)
boost::scoped_ptr< Thread > dequeue_thread_
Thread dequeuing and admitting queries.
IntCounter * local_admitted
TODO: Consider allowing fragment IDs as category parameters.
const string CLUSTER_MEM_USAGE_METRIC_KEY_FORMAT
RequestQueueMap request_queue_map_
#define RETURN_IF_ERROR(stmt)
some generally useful macros
const string CLUSTER_MEM_ESTIMATE_METRIC_KEY_FORMAT
MetricGroups may be organised hierarchically as a tree.
const string LOCAL_QUEUED_METRIC_KEY_FORMAT
const string STATUS_REJECTED
const string STATUS_TIME_OUT
IntGauge * cluster_in_queue
The estimated total number of requests currently queued across the cluster.
const string LOCAL_ADMITTED_METRIC_KEY_FORMAT
const string LOCAL_TIME_IN_QUEUE_METRIC_KEY_FORMAT
const string QUERY_EVENT_COMPLETED_ADMISSION
void AddDetail(const std::string &msg)
Add a detail string. Calling this method is only defined on a non-OK message.
static bool ParsePoolTopicKey(const string &topic_key, string *pool_name, string *backend_id)
const string REASON_DISABLED_MEM_LIMIT
static std::string Print(bool value, TUnit::type ignored, bool verbose=false)
Status ReleaseQuery(QuerySchedule *schedule)
IntGauge * local_mem_usage
The total amount of memory used by this pool locally.
Status Serialize(T *obj, std::vector< uint8_t > *result)
Serializes obj into result. Result will contain a copy of the memory.
DEFINE_int64(queue_wait_timeout_ms, 60 *1000,"Maximum amount of time (in ""milliseconds) that a request will wait to be admitted before timing out.")
const TUniqueId & query_id() const
const std::string backend_id_
Unique id for this impalad, used to construct topic keys.
static const std::string IMPALA_REQUEST_QUEUE_TOPIC
void MarkEvent(const std::string &label)
const string QUEUED_MEM_LIMIT
boost::condition_variable dequeue_cv_
void UpdateLocalMemUsage(const std::string &pool_name)
boost::function< void(const TopicDeltaMap &state, std::vector< TTopicDelta > *topic_updates)> UpdateCallback
void DequeueLoop()
Dequeues and admits queued queries when notified by dequeue_cv_.
static string DebugPoolStats(const string &pool_name, const TPoolStats *total_stats, const TPoolStats *local_stats)
IntGauge * cluster_mem_estimate
The sum of planner memory estimates for requests across the cluster.
const string CLUSTER_NUM_RUNNING_METRIC_KEY_FORMAT
const string PROFILE_INFO_VAL_ADMIT_IMMEDIATELY
PoolConfigMap pool_config_cache_
RuntimeProfile * summary_profile()
bool Contains(const T *target) const
const string PROFILE_INFO_VAL_ADMIT_QUEUED
ThriftSerializer thrift_serializer_
Serializes/deserializes TPoolStats when sending and receiving topic updates.
IntGauge * cluster_mem_usage
Approximate total amount of memory used by this pool across the cluster.
Status AdmitQuery(QuerySchedule *schedule)
void AddPoolUpdates(std::vector< TTopicDelta > *subscriber_topic_updates)
const string LOCAL_NUM_RUNNING_METRIC_KEY_FORMAT
void set_is_admitted(bool is_admitted)
boost::unordered_map< std::string, TPoolStats > PoolStatsMap
Map of pool names to pool statistics.
int64_t MonotonicMillis()
std::map< Statestore::TopicId, TTopicDelta > TopicDeltaMap
A TopicDeltaMap is passed to each callback. See UpdateCallback for more details.
void Enqueue(T *n)
Enqueue node onto the queue's tail. This is O(1).
const string REASON_REQ_OVER_MEM_LIMIT
const string PROFILE_INFO_VAL_REJECTED
This class is thread-safe.
Status RejectRequest(const std::string &pool, const int64_t max_requests, const int64_t mem_limit, const int64_t max_queued, const QuerySchedule &schedule)
bool IsSet()
Returns whether the value is set.
static string MakePoolTopicKey(const string &pool_name, const string &backend_id)
bool done_
If true, tear down the dequeuing thread. This only happens in unit tests.
static Status Expected(const std::string &error_msg)
Create a status instance that represents an expected error and will not be logged.
IntGauge * cluster_num_running
int64_t GetClusterMemoryEstimate() const
Total estimated memory for all nodes. set_num_hosts() must be set before calling. ...
static MemTracker * GetRequestPoolMemTracker(const std::string &pool_name, MemTracker *parent)
Status CanAdmitRequest(const std::string &pool, const int64_t max_requests, const int64_t mem_limit, const QuerySchedule &schedule, bool admit_from_queue)
T must be a subclass of InternalQueue::Node.
const string REASON_QUEUE_FULL
SimpleMetric< T > * AddGauge(const std::string &key, const T &value, const TUnit::type unit=TUnit::NONE, const std::string &description="")
Create a gauge metric object with given key and initial value (owned by this object) ...
IntGauge * local_in_queue
The total number of requests currently queued locally.
void UpdateClusterAggregates(const std::string &pool_name)
const std::string & request_pool() const
PoolStatsMap cluster_pool_stats_
const char TOPIC_KEY_DELIMITER
Utility class to mark an event when the object is destroyed.
PoolSet pools_for_updates_
IntCounter * local_completed
const string PROFILE_INFO_KEY_ADMISSION_RESULT
IntCounter * local_dequeued
The total number of requests that have been dequeued locally.
RequestPoolService * request_pool_service_
Status DeserializeThriftMsg(JNIEnv *env, jbyteArray serialized_msg, T *deserialized_msg)
const string QUEUED_QUEUE_NOT_EMPTY
boost::mutex admission_ctrl_lock_
PoolMetrics * GetPoolMetrics(const std::string &pool_name)
const string LOCAL_COMPLETED_METRIC_KEY_FORMAT
MetricGroup * metrics_
Metrics subsystem access.
Status Init(StatestoreSubscriber *subscriber)
Registers with the subscription manager.
const string LOCAL_DEQUEUED_METRIC_KEY_FORMAT
IntCounter * local_timed_out
The total number of requests that timed out while waiting for admission locally.
const string REASON_DISABLED_REQUESTS_LIMIT
PoolMetricsMap pool_metrics_map_
const QuerySchedule & schedule
IntCounter * local_time_in_queue_ms
Status AddTopic(const Statestore::TopicId &topic_id, bool is_transient, const UpdateCallback &callback)
const string PROFILE_INFO_VAL_TIME_OUT
const string QUERY_EVENT_SUBMIT_FOR_ADMISSION
const string LOCAL_REJECTED_METRIC_KEY_FORMAT
const string LOCAL_IN_QUEUE_METRIC_KEY_FORMAT
const string CLUSTER_IN_QUEUE_METRIC_KEY_FORMAT
PoolStatsMap local_pool_stats_