Impala
Impala is the open source, native analytic database for Apache Hadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
resource-broker.h
Go to the documentation of this file.
1 // Copyright 2012 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef RESOURCE_BROKER_H_
16 #define RESOURCE_BROKER_H_
17 
18 #include <boost/unordered_map.hpp>
19 #include <boost/scoped_ptr.hpp>
20 #include <boost/uuid/uuid.hpp>
21 
22 #include "runtime/client-cache.h"
24 #include "util/promise.h"
25 #include "gen-cpp/LlamaAMService.h"
26 #include "gen-cpp/LlamaNotificationService.h"
27 #include "gen-cpp/ResourceBrokerService_types.h"
28 
29 namespace impala {
30 
31 class QueryResourceMgr;
32 class Status;
33 class MetricGroup;
34 class Scheduler;
35 class ResourceBrokerNotificationServiceClient;
36 
45  public:
46  ResourceBroker(const std::vector<TNetworkAddress>& llama_addresses,
47  const TNetworkAddress& llama_callback_address, MetricGroup* metrics);
48 
52  Status Init();
53 
55  void Close();
56 
58  Status Reserve(const TResourceBrokerReservationRequest& request,
59  TResourceBrokerReservationResponse* response);
60 
63  Status Expand(const TResourceBrokerExpansionRequest& request,
64  TResourceBrokerExpansionResponse* response);
65 
71  void ClearRequests(const TUniqueId& reservation_id, bool include_reservation);
72 
76  Status Release(const TResourceBrokerReleaseRequest& request,
77  TResourceBrokerReleaseResponse* response);
78 
81  void AMNotification(const llama::TLlamaAMNotificationRequest& request,
82  llama::TLlamaAMNotificationResponse& response);
83 
88  void NMNotification(const llama::TLlamaNMNotificationRequest& request,
89  llama::TLlamaNMNotificationResponse& response);
90 
91  const std::vector<std::string>& llama_nodes() { return llama_nodes_; }
92 
95 
96  void set_scheduler(Scheduler* scheduler) { scheduler_ = scheduler; };
97 
101  bool GetQueryResourceMgr(const TUniqueId& query_id,
102  const TUniqueId& reservation_id, const TNetworkAddress& local_resource_address,
103  QueryResourceMgr** res_mgr);
104 
108  void UnregisterQueryResourceMgr(const TUniqueId& query_id);
109 
110  private:
111  typedef std::map<TNetworkAddress, llama::TAllocatedResource> ResourceMap;
112 
113  bool has_standby_llama() { return llama_addresses_.size() > 1; }
114 
122 
128  template <typename LlamaReqType, typename LlamaRespType>
129  Status LlamaRpc(LlamaReqType* request, LlamaRespType* response,
130  StatsMetric<double>* rpc_time_metric);
131 
134  template <typename LlamaReqType, typename LlamaRespType>
136  const LlamaReqType& request, LlamaRespType* response);
137 
141  template <typename LlamaReqType, typename LlamaRespType>
142  Status ReRegisterWithLlama(const LlamaReqType& request, LlamaRespType* response);
143 
145  bool LlamaHasRestarted(const llama::TStatus& status) const;
146 
148  void CreateLlamaReservationRequest(const TResourceBrokerReservationRequest& src,
149  llama::TLlamaAMReservationRequest& dest);
150 
152  void CreateLlamaReleaseRequest(const TResourceBrokerReleaseRequest& src,
153  llama::TLlamaAMReleaseRequest& dest);
154 
155  class PendingRequest;
160  bool WaitForNotification(int64_t timeout, ResourceMap* resources, bool* timed_out,
161  PendingRequest* reservation);
162 
164  std::vector<TNetworkAddress> llama_addresses_;
165 
169 
172  TNetworkAddress llama_callback_address_;
173 
175 
177 
181 
185 
189 
195 
198 
201 
205 
208 
212 
216 
222 
225 
228 
232 
235 
239 
242 
245 
248 
255  boost::uuids::uuid llama_client_id_;
256 
258  boost::shared_ptr<llama::LlamaNotificationServiceIf> llama_callback_thrift_iface_;
259  boost::scoped_ptr<ThriftServer> llama_callback_server_;
260 
262  boost::scoped_ptr<ClientCache<llama::LlamaAMServiceClient> > llama_client_cache_;
263 
267 
269  llama::TUniqueId llama_handle_;
270 
272  std::vector<std::string> llama_nodes_;
273 
278  //
285  public:
286  PendingRequest(const llama::TUniqueId& reservation_id,
287  const llama::TUniqueId& request_id, bool is_expansion)
288  : reservation_id_(reservation_id), request_id_(request_id),
289  is_expansion_(is_expansion) {
290  DCHECK(is_expansion || reservation_id == request_id);
291  }
292 
297 
301  void GetResources(ResourceMap* resources);
302 
305  void SetResources(const std::vector<llama::TAllocatedResource>& resources);
306 
307  const llama::TUniqueId& request_id() const { return request_id_; }
308  const llama::TUniqueId& reservation_id() const { return reservation_id_; }
309 
310  bool is_expansion() const { return is_expansion_; }
311 
312  private:
315 
319  std::vector<llama::TAllocatedResource> allocated_resources_;
320 
323  llama::TUniqueId reservation_id_;
324 
327  llama::TUniqueId request_id_;
328 
331  };
332 
335 
339  typedef boost::unordered_map<llama::TUniqueId, PendingRequest*> PendingRequestMap;
341 
345  public:
346  AllocatedRequest(const llama::TUniqueId& reservation_id,
347  uint64_t memory_mb, uint32_t vcpus, bool is_expansion)
348  : reservation_id_(reservation_id), memory_mb_(memory_mb), vcpus_(vcpus),
349  is_expansion_(is_expansion) { }
350 
351  const llama::TUniqueId reservation_id() const { return reservation_id_; }
352  uint64_t memory_mb() const { return memory_mb_; }
353  uint32_t vcpus() const { return vcpus_; }
354  bool is_expansion() const { return is_expansion_; }
355 
356  private:
358  llama::TUniqueId reservation_id_;
359 
362 
364  uint32_t vcpus_;
365 
368  };
369 
372 
376  typedef boost::unordered_map<llama::TUniqueId, std::vector<AllocatedRequest> >
379 
382  typedef boost::unordered_map<TUniqueId, std::pair<int32_t, QueryResourceMgr*> >
384 
389 };
390 
391 std::ostream& operator<<(std::ostream& os,
392  const TResourceBrokerReservationRequest& request);
393 
394 std::ostream& operator<<(std::ostream& os,
395  const TResourceBrokerReservationResponse& reservation);
396 
397 std::ostream& operator<<(std::ostream& os,
398  const TResourceBrokerExpansionRequest& request);
399 
400 std::ostream& operator<<(std::ostream& os,
401  const TResourceBrokerExpansionResponse& expansion);
402 }
403 
404 #endif
const llama::TUniqueId & request_id() const
const llama::TUniqueId & reservation_id() const
uint64_t memory_mb_
The total memory allocated to this request.
TNetworkAddress llama_callback_address_
Promise< bool > promise_
Promise object that WaitForNotification() waits on and AMNotification() signals.
class SimpleMetric< std::string, TMetricKind::PROPERTY > StringProperty
Definition: metrics.h:323
AllocatedRequestMap allocated_requests_
void AMNotification(const llama::TLlamaAMNotificationRequest &request, llama::TLlamaAMNotificationResponse &response)
std::vector< llama::TAllocatedResource > allocated_resources_
IntCounter * reservation_requests_total_metric_
Total number of reservation requests.
IntCounter * reservation_requests_timedout_metric_
boost::mutex llama_registration_lock_
UIntGauge * allocated_memory_metric_
Total amount of memory currently allocated by Llama to this node.
const TUniqueId & query_id() const
Definition: coordinator.h:152
Status Reserve(const TResourceBrokerReservationRequest &request, TResourceBrokerReservationResponse *response)
Requests resources from Llama. Blocks until the request has been granted or denied.
boost::unordered_map< llama::TUniqueId, std::vector< AllocatedRequest > > AllocatedRequestMap
class SimpleMetric< int64_t, TMetricKind::COUNTER > IntCounter
Definition: metrics.h:320
IntCounter * requests_released_metric_
Total number of fulfilled reservation requests that have been released.
StatsMetric< double > * reservation_rpc_time_metric_
void Close()
Closes the llama_client_cache_ and joins the llama_callback_server_.
IntCounter * expansion_requests_total_metric_
Total number of expansion requests.
MetricGroups may be organised hierarchically as a tree.
Definition: metrics.h:200
StatsMetric< double > * expansion_rpc_time_metric_
boost::shared_ptr< llama::LlamaNotificationServiceIf > llama_callback_thrift_iface_
Thrift API implementation which proxies Llama notifications onto this ResourceBroker.
bool WaitForNotification(int64_t timeout, ResourceMap *resources, bool *timed_out, PendingRequest *reservation)
bool GetQueryResourceMgr(const TUniqueId &query_id, const TUniqueId &reservation_id, const TNetworkAddress &local_resource_address, QueryResourceMgr **res_mgr)
llama::TUniqueId reservation_id_
The reservation ID for this request. Expansions all share the same reservation ID.
UIntGauge * allocated_vcpus_metric_
Total number of vcpu cores currently allocated by Llama to this node.
IntCounter * reservation_requests_failed_metric_
StringProperty * active_llama_metric_
void SendLlamaRpc(ClientConnection< llama::LlamaAMServiceClient > *llama_client, const LlamaReqType &request, LlamaRespType *response)
void ClearRequests(const TUniqueId &reservation_id, bool include_reservation)
boost::mutex pending_requests_lock_
Protects pending_requests_.
IntCounter * expansion_requests_rejected_metric_
Number of well-formed expansion requests rejected by the central scheduler.
bool LlamaHasRestarted(const llama::TStatus &status) const
Detects Llama restarts from the given return status of a Llama RPC.
uint32_t vcpus_
The number of VCPUs allocated to this request.
IntCounter * expansion_requests_failed_metric_
Status Release(const TResourceBrokerReleaseRequest &request, TResourceBrokerReleaseResponse *response)
AllocatedRequest(const llama::TUniqueId &reservation_id, uint64_t memory_mb, uint32_t vcpus, bool is_expansion)
IntCounter * reservation_requests_fulfilled_metric_
Number of fulfilled reservation requests.
PendingRequest(const llama::TUniqueId &reservation_id, const llama::TUniqueId &request_id, bool is_expansion)
PendingRequestMap pending_requests_
void CreateLlamaReleaseRequest(const TResourceBrokerReleaseRequest &src, llama::TLlamaAMReleaseRequest &dest)
Creates a Llama release request from a resource broker release request.
void UnregisterQueryResourceMgr(const TUniqueId &query_id)
StatsMetric< double > * expansion_response_time_metric_
boost::mutex allocated_requests_lock_
Protects allocated_requests_.
boost::uuids::uuid llama_client_id_
void set_scheduler(Scheduler *scheduler)
ResourceBroker(const std::vector< TNetworkAddress > &llama_addresses, const TNetworkAddress &llama_callback_address, MetricGroup *metrics)
bool is_expansion_
True if this is an expansion request, false if it is a reservation request.
IntCounter * expansion_requests_fulfilled_metric_
Number of fulfilled expansion requests.
Status RefreshLlamaNodes()
Retrieves the nodes known to Llama and stores them in llama_nodes_.
StringProperty * active_llama_handle_metric_
boost::unordered_map< llama::TUniqueId, PendingRequest * > PendingRequestMap
IntCounter * expansion_requests_timedout_metric_
llama::TUniqueId llama_handle_
Handle received from Llama during registration. Set in RegisterWithLlama().
std::vector< TNetworkAddress > llama_addresses_
Llama availability group.
Status ReRegisterWithLlama(const LlamaReqType &request, LlamaRespType *response)
bool is_expansion_
True if this is an expansion request, false if it is a reservation request.
boost::scoped_ptr< ThriftServer > llama_callback_server_
Status Expand(const TResourceBrokerExpansionRequest &request, TResourceBrokerExpansionResponse *response)
boost::unordered_map< TUniqueId, std::pair< int32_t, QueryResourceMgr * > > QueryResourceMgrsMap
QueryResourceMgrsMap query_resource_mgrs_
boost::mutex query_resource_mgrs_lock_
Protects query_resource_mgrs_.
std::vector< std::string > llama_nodes_
List of nodes registered with Llama. Set in RefreshLlamaNodes().
IntCounter * reservation_requests_rejected_metric_
Number of well-formed reservation requests rejected by the central scheduler.
Only CPU-heavy threads need be managed using this class.
ostream & operator<<(ostream &os, const map< TNetworkAddress, llama::TAllocatedResource > &resources)
std::map< TNetworkAddress, llama::TAllocatedResource > ResourceMap
void GetResources(ResourceMap *resources)
void NMNotification(const llama::TLlamaNMNotificationRequest &request, llama::TLlamaNMNotificationResponse &response)
StatsMetric< double > * reservation_response_time_metric_
const std::vector< std::string > & llama_nodes()
Status LlamaRpc(LlamaReqType *request, LlamaRespType *response, StatsMetric< double > *rpc_time_metric)
const llama::TUniqueId reservation_id() const
boost::scoped_ptr< ClientCache< llama::LlamaAMServiceClient > > llama_client_cache_
Cache of Llama client connections.
void SetResources(const std::vector< llama::TAllocatedResource > &resources)
void CreateLlamaReservationRequest(const TResourceBrokerReservationRequest &src, llama::TLlamaAMReservationRequest &dest)
Creates a Llama reservation request from a resource broker reservation request.