Impala
Impala is the open source, native analytic database for Apache Hadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
simple-scheduler.h
Go to the documentation of this file.
1 // Copyright 2012 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 
16 #ifndef STATESTORE_SIMPLE_SCHEDULER_H
17 #define STATESTORE_SIMPLE_SCHEDULER_H
18 
19 #include <vector>
20 #include <string>
21 #include <list>
22 #include <boost/unordered_map.hpp>
23 #include <boost/thread/mutex.hpp>
24 
25 #include "common/status.h"
26 #include "statestore/scheduler.h"
28 #include "statestore/statestore.h"
29 #include "util/metrics.h"
31 #include "gen-cpp/Types_types.h" // for TNetworkAddress
32 #include "gen-cpp/ResourceBrokerService_types.h"
33 #include "rapidjson/rapidjson.h"
34 
35 namespace impala {
36 
37 class ResourceBroker;
38 class Coordinator;
39 
43 //
46 class SimpleScheduler : public Scheduler {
47  public:
48  static const std::string IMPALA_MEMBERSHIP_TOPIC;
49 
54  SimpleScheduler(StatestoreSubscriber* subscriber, const std::string& backend_id,
55  const TNetworkAddress& backend_address, MetricGroup* metrics, Webserver* webserver,
56  ResourceBroker* resource_broker, RequestPoolService* request_pool_service);
57 
60  SimpleScheduler(const std::vector<TNetworkAddress>& backends, MetricGroup* metrics,
61  Webserver* webserver, ResourceBroker* resource_broker,
62  RequestPoolService* request_pool_service);
63 
71  virtual Status GetBackends(const std::vector<TNetworkAddress>& data_locations,
73 
76  virtual impala::Status GetBackend(const TNetworkAddress& data_location,
77  TBackendDescriptor* backend);
78 
80 
81  virtual bool HasLocalBackend(const TNetworkAddress& data_location) {
82  boost::lock_guard<boost::mutex> l(backend_map_lock_);
83  BackendMap::iterator entry = backend_map_.find(data_location.hostname);
84  return (entry != backend_map_.end() && entry->second.size() > 0);
85  }
86 
88  virtual impala::Status Init();
89 
90  virtual Status Schedule(Coordinator* coord, QuerySchedule* schedule);
91  virtual Status Release(QuerySchedule* schedule);
92  virtual void HandlePreemptedReservation(const TUniqueId& reservation_id);
93  virtual void HandlePreemptedResource(const TUniqueId& client_resource_id);
94  virtual void HandleLostResource(const TUniqueId& client_resource_id);
95 
96  private:
100  boost::mutex backend_map_lock_;
101 
103  typedef boost::unordered_map<std::string, std::list<TBackendDescriptor> > BackendMap;
105 
108  typedef boost::unordered_map<std::string, std::string> BackendIpAddressMap;
110 
116  typedef boost::unordered_map<std::string, TBackendDescriptor> BackendIdMap;
118 
121 
124 
126  BackendMap::iterator next_nonlocal_backend_entry_;
127 
132 
134  const std::string backend_id_;
135 
137  TBackendDescriptor backend_descriptor_;
138 
140 
144 
149 
151  uint32_t update_count_;
152 
155 
160  typedef boost::unordered_map<TUniqueId, Coordinator*> ActiveReservationsMap;
162 
167  typedef boost::unordered_map<TUniqueId, Coordinator*> ActiveClientResourcesMap;
169 
173 
177 
179  boost::scoped_ptr<AdmissionController> admission_controller_;
180 
184  const TResourceBrokerReservationResponse& reservation, Coordinator* coord);
185 
189  const TResourceBrokerReservationResponse& reservation);
190 
192  void UpdateMembership(const StatestoreSubscriber::TopicDeltaMap& incoming_topic_deltas,
193  std::vector<TTopicDelta>* subscriber_topic_updates);
194 
201  rapidjson::Document* document);
202 
204  Status GetRequestPool(const std::string& user,
205  const TQueryOptions& query_options, std::string* pool) const;
206 
210  Status ComputeScanRangeAssignment(const TQueryExecRequest& exec_request,
211  QuerySchedule* schedule);
212 
217  const std::vector<TScanRangeLocations>& locations,
218  const std::vector<TNetworkAddress>& host_list, bool exec_at_coord,
219  const TQueryOptions& query_options, FragmentScanRangeAssignment* assignment);
220 
222  void ComputeFragmentExecParams(const TQueryExecRequest& exec_request,
223  QuerySchedule* schedule);
224 
227  void ComputeFragmentHosts(const TQueryExecRequest& exec_request,
228  QuerySchedule* schedule);
229 
233  const TPlan& plan, const std::vector<TPlanNodeType::type>& types);
234 
239  int fragment_idx, const TQueryExecRequest& exec_request);
240 
242  void GetScanHosts(TPlanNodeId scan_id, const TQueryExecRequest& exec_request,
243  const FragmentExecParams& params, std::vector<TNetworkAddress>* scan_hosts);
244 
246  bool ContainsNode(const TPlan& plan, TPlanNodeType::type type);
247 
249  void FindNodes(const TPlan& plan,
250  const std::vector<TPlanNodeType::type>& types, std::vector<TPlanNodeId>* results);
251 
254  int FindSenderFragment(TPlanNodeId exch_id, int fragment_idx,
255  const TQueryExecRequest& exec_request);
256 };
257 
258 }
259 
260 #endif
boost::mutex active_resources_lock_
Protects active_reservations_ and active_client_resources_.
void RemoveFromActiveResourceMaps(const TResourceBrokerReservationResponse &reservation)
IntGauge * num_backends_metric_
Current number of backends.
int PlanNodeId
Definition: global-types.h:26
virtual void GetAllKnownBackends(BackendList *backends)
Return a list of all backends known to the scheduler.
void FindNodes(const TPlan &plan, const std::vector< TPlanNodeType::type > &types, std::vector< TPlanNodeId > *results)
Returns all ids of nodes in 'plan' of any of the given types.
static list< string > backends
const std::string backend_id_
Identifier for this Impala backend that is unique across the cluster.
virtual void HandleLostResource(const TUniqueId &client_resource_id)
PlanNodeId FindLeftmostNode(const TPlan &plan, const std::vector< TPlanNodeType::type > &types)
IntCounter * total_local_assignments_
class SimpleMetric< int64_t, TMetricKind::COUNTER > IntCounter
Definition: metrics.h:320
void GetScanHosts(TPlanNodeId scan_id, const TQueryExecRequest &exec_request, const FragmentExecParams &params, std::vector< TNetworkAddress > *scan_hosts)
Adds all hosts the given scan is executed on to scan_hosts.
MetricGroups may be organised hierarchically as a tree.
Definition: metrics.h:200
ActiveReservationsMap active_reservations_
boost::unordered_map< TNetworkAddress, PerNodeScanRanges > FragmentScanRangeAssignment
void AddToActiveResourceMaps(const TResourceBrokerReservationResponse &reservation, Coordinator *coord)
uint32_t update_count_
Counts the number of UpdateMembership invocations, to help throttle the logging.
MetricGroup * metrics_
MetricGroup subsystem access.
virtual Status GetBackends(const std::vector< TNetworkAddress > &data_locations, BackendList *backends)
std::map< std::string, std::string > ArgumentMap
Definition: webserver.h:36
bool ContainsNode(const TPlan &plan, TPlanNodeType::type type)
Returns true if 'plan' contains a node of the given type.
RequestPoolService * request_pool_service_
static const std::string IMPALA_MEMBERSHIP_TOPIC
Webserver * webserver_
Webserver for /backends. Not owned by us.
boost::unordered_map< TUniqueId, Coordinator * > ActiveReservationsMap
ObjectPool pool
boost::unordered_map< TUniqueId, Coordinator * > ActiveClientResourcesMap
virtual bool HasLocalBackend(const TNetworkAddress &data_location)
Return true if there is a backend located on the given data_location.
void UpdateMembership(const StatestoreSubscriber::TopicDeltaMap &incoming_topic_deltas, std::vector< TTopicDelta > *subscriber_topic_updates)
Called asynchronously when an update is received from the subscription manager.
virtual void HandlePreemptedResource(const TUniqueId &client_resource_id)
TBackendDescriptor backend_descriptor_
Describes this backend, including the Impalad service address.
Status GetRequestPool(const std::string &user, const TQueryOptions &query_options, std::string *pool) const
Determines the pool for a user and query options via request_pool_service_.
std::map< Statestore::TopicId, TTopicDelta > TopicDeltaMap
A TopicDeltaMap is passed to each callback. See UpdateCallback for more details.
Status ComputeScanRangeAssignment(const TQueryExecRequest &exec_request, QuerySchedule *schedule)
ActiveClientResourcesMap active_client_resources_
BackendIdMap current_membership_
boost::scoped_ptr< AdmissionController > admission_controller_
Used to make admission decisions in 'Schedule()'.
class SimpleMetric< bool, TMetricKind::PROPERTY > BooleanProperty
Definition: metrics.h:322
virtual impala::Status Init()
Registers with the subscription manager if required.
SimpleScheduler(StatestoreSubscriber *subscriber, const std::string &backend_id, const TNetworkAddress &backend_address, MetricGroup *metrics, Webserver *webserver, ResourceBroker *resource_broker, RequestPoolService *request_pool_service)
boost::unordered_map< std::string, std::string > BackendIpAddressMap
ResourceBroker * resource_broker_
BackendMap::iterator next_nonlocal_backend_entry_
Round-robin entry in BackendMap for non-local host assignment.
boost::unordered_map< std::string, std::list< TBackendDescriptor > > BackendMap
Map from a datanode's IP address to a list of backend addresses running on that node.
virtual void HandlePreemptedReservation(const TUniqueId &reservation_id)
StatestoreSubscriber * statestore_subscriber_
IntCounter * total_assignments_
Locality metrics.
virtual impala::Status GetBackend(const TNetworkAddress &data_location, TBackendDescriptor *backend)
void ComputeFragmentHosts(const TQueryExecRequest &exec_request, QuerySchedule *schedule)
int FindLeftmostInputFragment(int fragment_idx, const TQueryExecRequest &exec_request)
int FindSenderFragment(TPlanNodeId exch_id, int fragment_idx, const TQueryExecRequest &exec_request)
BackendIpAddressMap backend_ip_map_
std::vector< TBackendDescriptor > BackendList
List of server descriptors.
Definition: scheduler.h:45
virtual Status Schedule(Coordinator *coord, QuerySchedule *schedule)
virtual Status Release(QuerySchedule *schedule)
Releases the reserved resources (if any) from the given schedule.
boost::unordered_map< std::string, TBackendDescriptor > BackendIdMap
void BackendsUrlCallback(const Webserver::ArgumentMap &args, rapidjson::Document *document)
void ComputeFragmentExecParams(const TQueryExecRequest &exec_request, QuerySchedule *schedule)
Populates fragment_exec_params_ in schedule.
ThriftSerializer thrift_serializer_
BooleanProperty * initialised_
Initialisation metric.