15 #ifndef RESOURCE_BROKER_H_
16 #define RESOURCE_BROKER_H_
18 #include <boost/unordered_map.hpp>
19 #include <boost/scoped_ptr.hpp>
20 #include <boost/uuid/uuid.hpp>
25 #include "gen-cpp/LlamaAMService.h"
26 #include "gen-cpp/LlamaNotificationService.h"
27 #include "gen-cpp/ResourceBrokerService_types.h"
31 class QueryResourceMgr;
35 class ResourceBrokerNotificationServiceClient;
46 ResourceBroker(
const std::vector<TNetworkAddress>& llama_addresses,
47 const TNetworkAddress& llama_callback_address,
MetricGroup* metrics);
58 Status Reserve(
const TResourceBrokerReservationRequest& request,
59 TResourceBrokerReservationResponse* response);
63 Status Expand(
const TResourceBrokerExpansionRequest& request,
64 TResourceBrokerExpansionResponse* response);
71 void ClearRequests(
const TUniqueId& reservation_id,
bool include_reservation);
77 TResourceBrokerReleaseResponse* response);
81 void AMNotification(
const llama::TLlamaAMNotificationRequest& request,
82 llama::TLlamaAMNotificationResponse& response);
88 void NMNotification(
const llama::TLlamaNMNotificationRequest& request,
89 llama::TLlamaNMNotificationResponse& response);
102 const TUniqueId& reservation_id,
const TNetworkAddress& local_resource_address,
111 typedef std::map<TNetworkAddress, llama::TAllocatedResource>
ResourceMap;
128 template <
typename LlamaReqType,
typename LlamaRespType>
134 template <
typename LlamaReqType,
typename LlamaRespType>
136 const LlamaReqType& request, LlamaRespType* response);
141 template <
typename LlamaReqType,
typename LlamaRespType>
149 llama::TLlamaAMReservationRequest& dest);
153 llama::TLlamaAMReleaseRequest& dest);
155 class PendingRequest;
161 PendingRequest* reservation);
290 DCHECK(is_expansion || reservation_id == request_id);
305 void SetResources(
const std::vector<llama::TAllocatedResource>& resources);
376 typedef boost::unordered_map<llama::TUniqueId, std::vector<AllocatedRequest> >
382 typedef boost::unordered_map<TUniqueId, std::pair<int32_t, QueryResourceMgr*> >
392 const TResourceBrokerReservationRequest& request);
395 const TResourceBrokerReservationResponse& reservation);
398 const TResourceBrokerExpansionRequest& request);
401 const TResourceBrokerExpansionResponse& expansion);
const llama::TUniqueId & request_id() const
const llama::TUniqueId & reservation_id() const
uint64_t memory_mb_
The total memory allocated to this request.
TNetworkAddress llama_callback_address_
Promise< bool > promise_
Promise object that WaitForNotification() waits on and AMNotification() signals.
class SimpleMetric< std::string, TMetricKind::PROPERTY > StringProperty
AllocatedRequestMap allocated_requests_
void AMNotification(const llama::TLlamaAMNotificationRequest &request, llama::TLlamaAMNotificationResponse &response)
std::vector< llama::TAllocatedResource > allocated_resources_
IntCounter * reservation_requests_total_metric_
Total number of reservation requests.
IntCounter * reservation_requests_timedout_metric_
boost::mutex llama_registration_lock_
UIntGauge * allocated_memory_metric_
Total amount of memory currently allocated by Llama to this node.
const TUniqueId & query_id() const
Status Reserve(const TResourceBrokerReservationRequest &request, TResourceBrokerReservationResponse *response)
Requests resources from Llama. Blocks until the request has been granted or denied.
uint64_t memory_mb() const
boost::unordered_map< llama::TUniqueId, std::vector< AllocatedRequest > > AllocatedRequestMap
class SimpleMetric< int64_t, TMetricKind::COUNTER > IntCounter
IntCounter * requests_released_metric_
Total number of fulfilled reservation requests that have been released.
StatsMetric< double > * reservation_rpc_time_metric_
void Close()
Closes the llama_client_cache_ and joins the llama_callback_server_.
IntCounter * expansion_requests_total_metric_
Total number of expansion requests.
MetricGroups may be organised hierarchically as a tree.
StatsMetric< double > * expansion_rpc_time_metric_
boost::shared_ptr< llama::LlamaNotificationServiceIf > llama_callback_thrift_iface_
Thrift API implementation which proxies Llama notifications onto this ResourceBroker.
bool is_expansion() const
bool WaitForNotification(int64_t timeout, ResourceMap *resources, bool *timed_out, PendingRequest *reservation)
bool GetQueryResourceMgr(const TUniqueId &query_id, const TUniqueId &reservation_id, const TNetworkAddress &local_resource_address, QueryResourceMgr **res_mgr)
llama::TUniqueId reservation_id_
The reservation ID for this request. Expansions all share the same reservation ID.
UIntGauge * allocated_vcpus_metric_
Total number of vcpu cores currently allocated by Llama to this node.
IntCounter * reservation_requests_failed_metric_
StringProperty * active_llama_metric_
void SendLlamaRpc(ClientConnection< llama::LlamaAMServiceClient > *llama_client, const LlamaReqType &request, LlamaRespType *response)
void ClearRequests(const TUniqueId &reservation_id, bool include_reservation)
boost::mutex pending_requests_lock_
Protects pending_requests_.
IntCounter * expansion_requests_rejected_metric_
Number of well-formed expansion requests rejected by the central scheduler.
bool LlamaHasRestarted(const llama::TStatus &status) const
Detects Llama restarts from the given return status of a Llama RPC.
uint32_t vcpus_
The number of VCPUs allocated to this request.
IntCounter * expansion_requests_failed_metric_
llama::TUniqueId request_id_
Status Release(const TResourceBrokerReleaseRequest &request, TResourceBrokerReleaseResponse *response)
AllocatedRequest(const llama::TUniqueId &reservation_id, uint64_t memory_mb, uint32_t vcpus, bool is_expansion)
IntCounter * reservation_requests_fulfilled_metric_
Number of fulfilled reservation requests.
PendingRequest(const llama::TUniqueId &reservation_id, const llama::TUniqueId &request_id, bool is_expansion)
int active_llama_addr_idx_
PendingRequestMap pending_requests_
llama::TUniqueId reservation_id_
void CreateLlamaReleaseRequest(const TResourceBrokerReleaseRequest &src, llama::TLlamaAMReleaseRequest &dest)
Creates a Llama release request from a resource broker release request.
void UnregisterQueryResourceMgr(const TUniqueId &query_id)
StatsMetric< double > * expansion_response_time_metric_
Promise< bool > * promise()
boost::mutex allocated_requests_lock_
Protects allocated_requests_.
boost::uuids::uuid llama_client_id_
void set_scheduler(Scheduler *scheduler)
ResourceBroker(const std::vector< TNetworkAddress > &llama_addresses, const TNetworkAddress &llama_callback_address, MetricGroup *metrics)
bool is_expansion_
True if this is an expansion request, false if it is a reservation request.
IntCounter * expansion_requests_fulfilled_metric_
Number of fulfilled expansion requests.
Status RefreshLlamaNodes()
Retrieves the nodes known to Llama and stores them in llama_nodes_.
StringProperty * active_llama_handle_metric_
boost::unordered_map< llama::TUniqueId, PendingRequest * > PendingRequestMap
IntCounter * expansion_requests_timedout_metric_
Status RegisterWithLlama()
llama::TUniqueId llama_handle_
Handle received from Llama during registration. Set in RegisterWithLlama().
std::vector< TNetworkAddress > llama_addresses_
Llama availability group.
Status ReRegisterWithLlama(const LlamaReqType &request, LlamaRespType *response)
bool is_expansion_
True if this is an expansion request, false if it is a reservation request.
boost::scoped_ptr< ThriftServer > llama_callback_server_
Status Expand(const TResourceBrokerExpansionRequest &request, TResourceBrokerExpansionResponse *response)
boost::unordered_map< TUniqueId, std::pair< int32_t, QueryResourceMgr * > > QueryResourceMgrsMap
QueryResourceMgrsMap query_resource_mgrs_
boost::mutex query_resource_mgrs_lock_
Protects query_resource_mgrs_.
std::vector< std::string > llama_nodes_
List of nodes registered with Llama. Set in RefreshLlamaNodes().
IntCounter * reservation_requests_rejected_metric_
Number of well-formed reservation requests rejected by the central scheduler.
Only CPU-heavy threads need be managed using this class.
bool is_expansion() const
ostream & operator<<(ostream &os, const map< TNetworkAddress, llama::TAllocatedResource > &resources)
std::map< TNetworkAddress, llama::TAllocatedResource > ResourceMap
void GetResources(ResourceMap *resources)
void NMNotification(const llama::TLlamaNMNotificationRequest &request, llama::TLlamaNMNotificationResponse &response)
StatsMetric< double > * reservation_response_time_metric_
const std::vector< std::string > & llama_nodes()
Status LlamaRpc(LlamaReqType *request, LlamaRespType *response, StatsMetric< double > *rpc_time_metric)
const llama::TUniqueId reservation_id() const
boost::scoped_ptr< ClientCache< llama::LlamaAMServiceClient > > llama_client_cache_
Cache of Llama client connections.
void SetResources(const std::vector< llama::TAllocatedResource > &resources)
void CreateLlamaReservationRequest(const TResourceBrokerReservationRequest &src, llama::TLlamaAMReservationRequest &dest)
Creates a Llama reservation request from a resource broker reservation request.