16 #ifndef STATESTORE_SIMPLE_SCHEDULER_H
17 #define STATESTORE_SIMPLE_SCHEDULER_H
22 #include <boost/unordered_map.hpp>
23 #include <boost/thread/mutex.hpp>
31 #include "gen-cpp/Types_types.h"
32 #include "gen-cpp/ResourceBrokerService_types.h"
33 #include "rapidjson/rapidjson.h"
77 TBackendDescriptor* backend);
83 BackendMap::iterator entry =
backend_map_.find(data_location.hostname);
84 return (entry !=
backend_map_.end() && entry->second.size() > 0);
103 typedef boost::unordered_map<std::string, std::list<TBackendDescriptor> >
BackendMap;
116 typedef boost::unordered_map<std::string, TBackendDescriptor>
BackendIdMap;
184 const TResourceBrokerReservationResponse& reservation, Coordinator* coord);
189 const TResourceBrokerReservationResponse& reservation);
193 std::vector<TTopicDelta>* subscriber_topic_updates);
201 rapidjson::Document* document);
205 const TQueryOptions& query_options, std::string*
pool)
const;
217 const std::vector<TScanRangeLocations>& locations,
218 const std::vector<TNetworkAddress>& host_list,
bool exec_at_coord,
233 const TPlan& plan,
const std::vector<TPlanNodeType::type>& types);
239 int fragment_idx,
const TQueryExecRequest& exec_request);
242 void GetScanHosts(TPlanNodeId scan_id,
const TQueryExecRequest& exec_request,
246 bool ContainsNode(
const TPlan& plan, TPlanNodeType::type type);
250 const std::vector<TPlanNodeType::type>& types, std::vector<TPlanNodeId>* results);
255 const TQueryExecRequest& exec_request);
boost::mutex active_resources_lock_
Protects active_reservations_ and active_client_resources_.
void RemoveFromActiveResourceMaps(const TResourceBrokerReservationResponse &reservation)
IntGauge * num_backends_metric_
Current number of backends.
virtual void GetAllKnownBackends(BackendList *backends)
Return a list of all backends known to the scheduler.
void FindNodes(const TPlan &plan, const std::vector< TPlanNodeType::type > &types, std::vector< TPlanNodeId > *results)
Returns all ids of nodes in 'plan' of any of the given types.
static list< string > backends
const std::string backend_id_
Unique - across the cluster - identifier for this impala backend.
virtual void HandleLostResource(const TUniqueId &client_resource_id)
PlanNodeId FindLeftmostNode(const TPlan &plan, const std::vector< TPlanNodeType::type > &types)
IntCounter * total_local_assignments_
class SimpleMetric< int64_t, TMetricKind::COUNTER > IntCounter
void GetScanHosts(TPlanNodeId scan_id, const TQueryExecRequest &exec_request, const FragmentExecParams &params, std::vector< TNetworkAddress > *scan_hosts)
Adds all hosts the given scan is executed on to scan_hosts.
MetricGroups may be organised hierarchically as a tree.
ActiveReservationsMap active_reservations_
boost::unordered_map< TNetworkAddress, PerNodeScanRanges > FragmentScanRangeAssignment
void AddToActiveResourceMaps(const TResourceBrokerReservationResponse &reservation, Coordinator *coord)
uint32_t update_count_
Counts the number of UpdateMembership invocations, to help throttle the logging.
MetricGroup * metrics_
MetricGroup subsystem access.
virtual Status GetBackends(const std::vector< TNetworkAddress > &data_locations, BackendList *backends)
std::map< std::string, std::string > ArgumentMap
bool ContainsNode(const TPlan &plan, TPlanNodeType::type type)
Returns true if 'plan' contains a node of the given type.
RequestPoolService * request_pool_service_
static const std::string IMPALA_MEMBERSHIP_TOPIC
Webserver * webserver_
Webserver for /backends. Not owned by us.
boost::unordered_map< TUniqueId, Coordinator * > ActiveReservationsMap
boost::unordered_map< TUniqueId, Coordinator * > ActiveClientResourcesMap
virtual bool HasLocalBackend(const TNetworkAddress &data_location)
Return true if there is a backend located on the given data_location.
void UpdateMembership(const StatestoreSubscriber::TopicDeltaMap &incoming_topic_deltas, std::vector< TTopicDelta > *subscriber_topic_updates)
Called asynchronously when an update is received from the subscription manager.
virtual void HandlePreemptedResource(const TUniqueId &client_resource_id)
TBackendDescriptor backend_descriptor_
Describes this backend, including the Impalad service address.
Status GetRequestPool(const std::string &user, const TQueryOptions &query_options, std::string *pool) const
Determines the pool for a user and query options via request_pool_service_.
std::map< Statestore::TopicId, TTopicDelta > TopicDeltaMap
A TopicDeltaMap is passed to each callback. See UpdateCallback for more details.
Status ComputeScanRangeAssignment(const TQueryExecRequest &exec_request, QuerySchedule *schedule)
ActiveClientResourcesMap active_client_resources_
BackendIdMap current_membership_
boost::scoped_ptr< AdmissionController > admission_controller_
Used to make admission decisions in 'Schedule()'.
class SimpleMetric< bool, TMetricKind::PROPERTY > BooleanProperty
virtual impala::Status Init()
Registers with the subscription manager if required.
SimpleScheduler(StatestoreSubscriber *subscriber, const std::string &backend_id, const TNetworkAddress &backend_address, MetricGroup *metrics, Webserver *webserver, ResourceBroker *resource_broker, RequestPoolService *request_pool_service)
boost::unordered_map< std::string, std::string > BackendIpAddressMap
ResourceBroker * resource_broker_
boost::mutex backend_map_lock_
BackendMap::iterator next_nonlocal_backend_entry_
Round-robin entry in BackendMap for non-local host assignment.
boost::unordered_map< std::string, std::list< TBackendDescriptor > > BackendMap
Map from a datanode's IP address to a list of backend addresses running on that node.
virtual void HandlePreemptedReservation(const TUniqueId &reservation_id)
StatestoreSubscriber * statestore_subscriber_
IntCounter * total_assignments_
Locality metrics.
virtual impala::Status GetBackend(const TNetworkAddress &data_location, TBackendDescriptor *backend)
void ComputeFragmentHosts(const TQueryExecRequest &exec_request, QuerySchedule *schedule)
int FindLeftmostInputFragment(int fragment_idx, const TQueryExecRequest &exec_request)
int FindSenderFragment(TPlanNodeId exch_id, int fragment_idx, const TQueryExecRequest &exec_request)
BackendIpAddressMap backend_ip_map_
std::vector< TBackendDescriptor > BackendList
List of server descriptors.
virtual Status Schedule(Coordinator *coord, QuerySchedule *schedule)
virtual Status Release(QuerySchedule *schedule)
Releases the reserved resources (if any) from the given schedule.
boost::unordered_map< std::string, TBackendDescriptor > BackendIdMap
void BackendsUrlCallback(const Webserver::ArgumentMap &args, rapidjson::Document *document)
void ComputeFragmentExecParams(const TQueryExecRequest &exec_request, QuerySchedule *schedule)
Populates fragment_exec_params_ in schedule.
ThriftSerializer thrift_serializer_
BooleanProperty * initialised_
Initialisation metric.