Impala
Impala is the open source, native analytic database for Apache Hadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
admission-controller.h
Go to the documentation of this file.
1 // Copyright 2014 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 
16 #ifndef SCHEDULING_ADMISSION_CONTROLLER_H
17 #define SCHEDULING_ADMISSION_CONTROLLER_H
18 
19 #include <vector>
20 #include <string>
21 #include <list>
22 
23 #include <boost/unordered_map.hpp>
24 #include <boost/unordered_set.hpp>
25 #include <boost/thread/mutex.hpp>
26 
27 #include "common/status.h"
31 #include "util/internal-queue.h"
32 #include "util/thread.h"
33 
34 namespace impala {
35 
36 class QuerySchedule;
37 class ExecEnv;
38 
51 //
66 //
82  public:
83  AdmissionController(RequestPoolService* request_pool_service, MetricGroup* metrics,
84  const std::string& backend_id);
86 
92  Status AdmitQuery(QuerySchedule* schedule);
93 
100 
// Registers with the subscription manager. Returns a Status.
102  Status Init(StatestoreSubscriber* subscriber);
103 
104  private:
105  static const std::string IMPALA_REQUEST_QUEUE_TOPIC;
106 
// Node type for an InternalQueue<QueueNode>; derives from that queue's Node base.
109  struct QueueNode : public InternalQueue<QueueNode>::Node {
// Initializes the `schedule` member from the given QuerySchedule.
110  QueueNode(const QuerySchedule& query_schedule) : schedule(query_schedule) { }
111 
118 
// NOTE(review): the member declarations (including `schedule`) are not shown in this listing.
123  };
124 
// Per-pool metrics record; PoolMetricsMap maps pool names to values of this type.
127  struct PoolMetrics {
// NOTE(review): the member declarations are not shown in this listing.
150 
168  };
169 
173 
176 
// Thread dequeuing and admitting queries.
178  boost::scoped_ptr<Thread> dequeue_thread_;
179 
// Unique id for this impalad, used to construct topic keys.
181  const std::string backend_id_;
182 
185 
189  boost::mutex admission_ctrl_lock_;
190 
// Map of pool names to pool statistics.
192  typedef boost::unordered_map<std::string, TPoolStats> PoolStatsMap;
193 
197 
// A set of pool names.
199  typedef boost::unordered_set<std::string> PoolSet;
200 
204 
211  typedef boost::unordered_map<std::string, PoolStatsMap> PerBackendPoolStatsMap;
213 
221 
226 
// Map of pool names to request queues (RequestQueue is InternalQueue<QueueNode>).
228  typedef boost::unordered_map<std::string, RequestQueue> RequestQueueMap;
230 
// Map of pool names to pool metrics.
232  typedef boost::unordered_map<std::string, PoolMetrics> PoolMetricsMap;
234 
238  typedef boost::unordered_map<std::string, TPoolConfigResult> PoolConfigMap;
240 
243  boost::condition_variable dequeue_cv_;
244 
// If true, tear down the dequeuing thread. This only happens in unit tests.
246  bool done_;
247 
// Statestore subscriber callback that updates the pool stats state.
// Receives the incoming topic deltas and a vector of outgoing TTopicDelta updates.
// NOTE(review): presumably subscriber_topic_updates is an output parameter — confirm.
249  void UpdatePoolStats(
250  const StatestoreSubscriber::TopicDeltaMap& incoming_topic_deltas,
251  std::vector<TTopicDelta>* subscriber_topic_updates);
252 
255  void HandleTopicUpdates(const std::vector<TTopicItem>& topic_updates);
256 
259  void HandleTopicDeletions(const std::vector<std::string>& topic_deletions);
260 
264  void UpdateClusterAggregates(const std::string& pool_name);
265 
269  void UpdateLocalMemUsage(const std::string& pool_name);
270 
274  void AddPoolUpdates(std::vector<TTopicDelta>* subscriber_topic_updates);
275 
// Dequeues and admits queued queries when notified by dequeue_cv_.
277  void DequeueLoop();
278 
282  Status CanAdmitRequest(const std::string& pool, const int64_t max_requests,
283  const int64_t mem_limit, const QuerySchedule& schedule, bool admit_from_queue);
284 
287  Status RejectRequest(const std::string& pool, const int64_t max_requests,
288  const int64_t mem_limit, const int64_t max_queued, const QuerySchedule& schedule);
289 
293  PoolMetrics* GetPoolMetrics(const std::string& pool_name);
294 };
295 
296 }
297 
298 #endif // SCHEDULING_ADMISSION_CONTROLLER_H
InternalQueue< QueueNode > RequestQueue
PerBackendPoolStatsMap per_backend_pool_stats_map_
void HandleTopicUpdates(const std::vector< TTopicItem > &topic_updates)
IntGauge * local_mem_estimate
The sum of planner memory estimates for requests that were started locally.
void UpdatePoolStats(const StatestoreSubscriber::TopicDeltaMap &incoming_topic_deltas, std::vector< TTopicDelta > *subscriber_topic_updates)
Statestore subscriber callback that updates the pool stats state.
IntGauge * local_num_running
The total number of queries currently running that were initiated locally.
void HandleTopicDeletions(const std::vector< std::string > &topic_deletions)
boost::scoped_ptr< Thread > dequeue_thread_
Thread dequeuing and admitting queries.
class SimpleMetric< int64_t, TMetricKind::COUNTER > IntCounter
Definition: metrics.h:320
MetricGroups may be organised hierarchically as a tree.
Definition: metrics.h:200
IntGauge * cluster_in_queue
The estimated total number of requests currently queued across the cluster.
Status ReleaseQuery(QuerySchedule *schedule)
IntGauge * local_mem_usage
The total amount of memory used by this pool locally.
boost::unordered_map< std::string, PoolMetrics > PoolMetricsMap
Map of pool names to pool metrics.
const std::string backend_id_
Unique id for this impalad, used to construct topic keys.
static const std::string IMPALA_REQUEST_QUEUE_TOPIC
boost::condition_variable dequeue_cv_
void UpdateLocalMemUsage(const std::string &pool_name)
QueueNode(const QuerySchedule &query_schedule)
void DequeueLoop()
Dequeues and admits queued queries when notified by dequeue_cv_.
IntGauge * cluster_mem_estimate
The sum of planner memory estimates for requests across the cluster.
boost::unordered_set< std::string > PoolSet
A set of pool names.
ObjectPool pool
ThriftSerializer thrift_serializer_
Serializes/deserializes TPoolStats when sending and receiving topic updates.
IntGauge * cluster_mem_usage
Approximate total amount of memory used by this pool across the cluster.
Status AdmitQuery(QuerySchedule *schedule)
void AddPoolUpdates(std::vector< TTopicDelta > *subscriber_topic_updates)
boost::unordered_map< std::string, TPoolStats > PoolStatsMap
Map of pool names to pool statistics.
std::map< Statestore::TopicId, TTopicDelta > TopicDeltaMap
A TopicDeltaMap is passed to each callback. See UpdateCallback for more details.
boost::unordered_map< std::string, RequestQueue > RequestQueueMap
Map of pool names to request queues.
Status RejectRequest(const std::string &pool, const int64_t max_requests, const int64_t mem_limit, const int64_t max_queued, const QuerySchedule &schedule)
bool done_
If true, tear down the dequeuing thread. This only happens in unit tests.
Status CanAdmitRequest(const std::string &pool, const int64_t max_requests, const int64_t mem_limit, const QuerySchedule &schedule, bool admit_from_queue)
T must be a subclass of InternalQueue::Node.
IntGauge * local_in_queue
The total number of requests currently queued locally.
void UpdateClusterAggregates(const std::string &pool_name)
IntCounter * local_dequeued
The total number of requests that have been dequeued locally.
RequestPoolService * request_pool_service_
boost::unordered_map< std::string, PoolStatsMap > PerBackendPoolStatsMap
PoolMetrics * GetPoolMetrics(const std::string &pool_name)
AdmissionController(RequestPoolService *request_pool_service, MetricGroup *metrics, const std::string &backend_id)
MetricGroup * metrics_
Metrics subsystem access.
Status Init(StatestoreSubscriber *subscriber)
Registers with the subscription manager.
IntCounter * local_timed_out
The total number of requests that timed out while waiting for admission locally.
boost::unordered_map< std::string, TPoolConfigResult > PoolConfigMap