#include <boost/algorithm/string/join.hpp>
#include <boost/foreach.hpp>
#include <boost/uuid/uuid.hpp>
#include <boost/uuid/uuid_generators.hpp>
#include <boost/uuid/uuid_io.hpp>
#include <boost/lexical_cast.hpp>
#include <gutil/strings/substitute.h>
#include <thrift/Thrift.h>

#include "gen-cpp/ResourceBrokerService.h"
#include "gen-cpp/Llama_types.h"
using boost::algorithm::join;
using boost::algorithm::to_lower;
using boost::uuids::random_generator;
using namespace ::apache::thrift::server;
using namespace ::apache::thrift;
using namespace impala;
using namespace strings;
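// Thrift handler that proxies Llama AM and NM notifications onto the ResourceBroker.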
  LlamaNotificationThriftIf(ResourceBroker* resource_broker)
    : resource_broker_(resource_broker) {}

  virtual void AMNotification(llama::TLlamaAMNotificationResponse& response,
      const llama::TLlamaAMNotificationRequest& request) {
    resource_broker_->AMNotification(request, response);
  }

  virtual void NMNotification(llama::TLlamaNMNotificationResponse& response,
      const llama::TLlamaNMNotificationRequest& request) {
    LOG(WARNING) << "Ignoring node-manager notification. Handling not yet implemented.";
  }
ResourceBroker::ResourceBroker(const vector<TNetworkAddress>& llama_addresses,
    const TNetworkAddress& llama_callback_address, MetricGroup* metrics) :
    llama_addresses_(llama_addresses),
    active_llama_addr_idx_(-1),
    llama_callback_address_(llama_callback_address),
    llama_client_cache_(new ClientCache<llama::LlamaAMServiceClient>(
        FLAGS_resource_broker_cnxn_attempts,
        FLAGS_resource_broker_cnxn_retry_interval_ms,
        FLAGS_resource_broker_send_timeout,
        FLAGS_resource_broker_recv_timeout,

  DCHECK(metrics != NULL);
105 "resource-broker.active-llama",
"none");
107 "resource-broker.active-llama-handle",
"none");
      TUnit::TIME_S, "The time, in seconds, that a Reserve() RPC takes to "

      TUnit::TIME_S, "The time, in seconds, that a reservation request takes "
      "to be fulfilled by Llama"));
118 "resource-broker.reservation-requests-total", 0, TUnit::UNIT,
119 "The total number of reservation requests made by this Impala daemon to Llama");
121 "resource-broker.reservation-requests-fulfilled", 0, TUnit::UNIT,
122 "The number of reservation requests made by this Impala daemon to Llama "
125 "resource-broker.reservation-requests-failed", 0, TUnit::UNIT,
126 "The number of reservation requests made by this Impala daemon to Llama which "
129 "resource-broker.reservation-requests-rejected", 0, TUnit::UNIT,
130 "The number of reservation requests made by this Impala daemon to Llama "
131 "which were rejected");
133 "resource-broker.reservation-requests-timedout", 0, TUnit::UNIT,
134 "The number of reservation requests made by this Impala daemon to Llama "
140 "The time, in seconds, that a Reserve() RPC takes to Llama"));
      TUnit::TIME_S, "The time, in seconds, that an expansion request takes "
      "to be fulfilled by Llama"));
146 "resource-broker.expansion-requests-total", 0, TUnit::UNIT,
147 "The total number of expansion requests made by this Impala daemon to Llama");
149 "resource-broker.expansion-requests-fulfilled", 0, TUnit::UNIT,
150 "The number of expansion requests made by this Impala daemon to Llama "
153 "resource-broker.expansion-requests-failed", 0, TUnit::UNIT,
154 "The number of expansion requests made by this Impala daemon to Llama which "
157 "resource-broker.expansion-requests-rejected", 0, TUnit::UNIT,
158 "The number of expansion requests made by this Impala daemon to Llama "
159 "which were rejected");
161 "resource-broker.expansion-requests-timedout", 0, TUnit::UNIT,
162 "The number of expansion requests made by this Impala daemon to Llama "
166 "resource-broker.requests-released", 0, TUnit::UNIT,
167 "The number of resource-release requests received from Llama");
170 "resource-broker.memory-resources-in-use", 0L, TUnit::BYTES,
"The total"
171 " number of bytes currently allocated to this Impala daemon by Llama");
174 "resource-broker.vcpu-resources-in-use", 0, TUnit::UNIT,
"The total number "
175 "of vcpus currently allocated to this Impala daemon by Llama");
  requests_released_metric_ = metrics->AddCounter<int64_t>(
      "resource-broker.requests-released", 0, TUnit::UNIT, "The total number of "
      "resource allocations released by this Impala daemon");
    return Status("No Llama addresses configured (see --llama_addresses)");

  shared_ptr<TProcessor> llama_callback_proc(

  random_generator uuid_generator;

  while (FLAGS_llama_registration_timeout_secs == -1 ||
      (now - start) < FLAGS_llama_registration_timeout_secs) {
        llama_address, &client_status);
    if (client_status.ok()) {
      llama::TLlamaAMRegisterRequest request;
      request.__set_version(llama::TLlamaServiceVersion::V1);
      llama::TUniqueId llama_uuid;
      request.__set_client_id(llama_uuid);
      llama::TNetworkAddress callback_address;
      request.__set_notification_callback_service(callback_address);
      llama::TLlamaAMRegisterResponse response;
      LOG(INFO) << "Registering Resource Broker with Llama at " << llama_address;
          llama_client.DoRpc(&llama::LlamaAMServiceClient::Register, request, &response);
      if (rpc_status.ok()) {
            response.status, "Failed to register Resource Broker with Llama."));
        LOG(INFO) << "Received Llama client handle " << response.am_handle
                  << ((response.am_handle == llama_handle_) ? " (same as old)" : "");

    LOG(INFO) << "Failed to connect to Llama at " << llama_address << "." << endl
              << "Error: " << client_status.GetDetail() << endl
              << "Retrying to connect to Llama at "
              << FLAGS_llama_registration_wait_secs << "s.";
    SleepForMs(FLAGS_llama_registration_wait_secs * 1000);

  DCHECK(FLAGS_llama_registration_timeout_secs != -1);
  if ((now - start) >= FLAGS_llama_registration_timeout_secs) {
    return Status("Failed to (re-)register Resource Broker with Llama.");
  for (int i = 0; i < status.error_msgs.size(); ++i) {
    string error_msg = status.error_msgs[i];
      LOG(INFO) << "Assuming Llama restart from error message: " << status.error_msgs[i];
void ResourceBroker::CreateLlamaReservationRequest(
    const TResourceBrokerReservationRequest& src,
    llama::TLlamaAMReservationRequest& dest) {
  dest.version = llama::TLlamaServiceVersion::V1;
  dest.gang = src.gang;
  dest.__set_queue(src.queue);
  dest.user = src.user;
  dest.resources = src.resources;
  random_generator uuid_generator;
  llama::TUniqueId request_id;
  dest.__set_reservation_id(request_id);
void ResourceBroker::CreateLlamaReleaseRequest(const TResourceBrokerReleaseRequest& src,
    llama::TLlamaAMReleaseRequest& dest) {
  dest.version = llama::TLlamaServiceVersion::V1;
  dest.reservation_id << src.reservation_id;
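// LlamaRpc(): sends an RPC to the currently active Llama, reopening the client
// connection on Thrift exceptions and re-registering with Llama when needed, for at
// most FLAGS_llama_max_request_attempts attempts.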
template <typename LlamaReqType, typename LlamaRespType>
Status ResourceBroker::LlamaRpc(LlamaReqType* request, LlamaRespType* response,
    StatsMetric<double>* rpc_time_metric) {

  bool register_with_llama = false;
  while (attempts < FLAGS_llama_max_request_attempts) {
    if (register_with_llama) {
      VLOG_RPC << "Retrying Llama RPC after re-registration: " << *request;
      register_with_llama = false;
    if (!rpc_status.ok()) {
      register_with_llama = true;
    } catch (const TException& e) {
      VLOG_RPC << "Reopening Llama client due to: " << e.what();
      rpc_status = llama_client.Reopen();
      if (!rpc_status.ok()) {
        register_with_llama = true;
      VLOG_RPC << "Retrying Llama RPC: " << *request;
    if (rpc_time_metric != NULL) {
      register_with_llama = true;
  if (attempts >= FLAGS_llama_max_request_attempts) {
      "Request aborted after $0 attempts due to connectivity issues with Llama.",
      FLAGS_llama_max_request_attempts));
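// SendLlamaRpc(): the unspecialized template must never be called; the explicit
// specializations below dispatch each request type to the matching Llama client
// method (GetNodes, Reserve, Expand, Release).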
template <typename LlamaReqType, typename LlamaRespType>
void ResourceBroker::SendLlamaRpc(
    ClientConnection<llama::LlamaAMServiceClient>* llama_client,
    const LlamaReqType& request, LlamaRespType* response) {
  DCHECK(false) << "SendLlamaRpc template function must be specialized.";
}

template <>
void ResourceBroker::SendLlamaRpc(
    ClientConnection<llama::LlamaAMServiceClient>* llama_client,
    const llama::TLlamaAMGetNodesRequest& request,
    llama::TLlamaAMGetNodesResponse* response) {
  DCHECK(response != NULL);
  (*llama_client)->GetNodes(*response, request);
}

template <>
void ResourceBroker::SendLlamaRpc(
    ClientConnection<llama::LlamaAMServiceClient>* llama_client,
    const llama::TLlamaAMReservationRequest& request,
    llama::TLlamaAMReservationResponse* response) {
  DCHECK(response != NULL);
  (*llama_client)->Reserve(*response, request);
}

template <>
void ResourceBroker::SendLlamaRpc(
    ClientConnection<llama::LlamaAMServiceClient>* llama_client,
    const llama::TLlamaAMReservationExpansionRequest& request,
    llama::TLlamaAMReservationExpansionResponse* response) {
  DCHECK(response != NULL);
  (*llama_client)->Expand(*response, request);
}

template <>
void ResourceBroker::SendLlamaRpc(
    ClientConnection<llama::LlamaAMServiceClient>* llama_client,
    const llama::TLlamaAMReleaseRequest& request,
    llama::TLlamaAMReleaseResponse* response) {
  DCHECK(response != NULL);
  (*llama_client)->Release(*response, request);
}
template <typename LlamaReqType, typename LlamaRespType>
Status ResourceBroker::ReRegisterWithLlama(const LlamaReqType& request,
    LlamaRespType* response) {

    llama::TLlamaAMGetNodesResponse* response) {
    (*resources)[host] = resource;
    VLOG_QUERY << "Getting allocated resource for reservation id "
    const vector<llama::TAllocatedResource>& resources) {
  BOOST_FOREACH(const llama::TAllocatedResource& resource, resources) {
    if (resource.reservation_id == request_id()) {
      allocated_resources_.push_back(resource);
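// WaitForNotification(): blocks on the pending request's promise until the request is
// granted, rejected, or the timeout expires; on success it tallies the granted memory
// and vcpus.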
  bool request_granted = pending_request->promise()->Get(timeout, timed_out);

  if (request_granted && !*timed_out) {
    int64_t total_memory_mb = 0L;
    int32_t total_vcpus = 0;
    BOOST_FOREACH(const ResourceMap::value_type& resource, *resources) {
      total_memory_mb += resource.second.memory_mb;
      total_vcpus += resource.second.v_cpu_cores;

  return request_granted;
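// Expand(): builds a Llama reservation-expansion request for an existing reservation,
// issues it, and waits for the corresponding notification, failing on timeout or
// rejection.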
Status ResourceBroker::Expand(const TResourceBrokerExpansionRequest& request,
    TResourceBrokerExpansionResponse* response) {
  VLOG_RPC << "Sending expansion request: " << request;
  llama::TLlamaAMReservationExpansionRequest ll_request;
  llama::TLlamaAMReservationExpansionResponse ll_response;

  ll_request.version = llama::TLlamaServiceVersion::V1;
  ll_request.expansion_of << request.reservation_id;
  random_generator uuid_generator;
  llama::TUniqueId request_id;
  ll_request.__set_expansion_id(request_id);
  ll_request.resource = request.resource;

      new PendingRequest(ll_request.expansion_of, ll_request.expansion_id, true);

  if (!request_status.ok()) {
    return request_status;

  bool timed_out = false;
      &response->allocated_resources, &timed_out, pending_request);

  if (request_granted) {
    response->__set_reservation_id(request.reservation_id);

    return Status(Substitute("Resource expansion request exceeded timeout of $0",

  if (!request_granted) {
    return Status("Resource expansion request was rejected.");

  VLOG_QUERY << "Fulfilled expansion for id: " << ll_response.expansion_id;
Status ResourceBroker::Reserve(const TResourceBrokerReservationRequest& request,
    TResourceBrokerReservationResponse* response) {
  VLOG_QUERY << "Sending reservation request: " << request;

  llama::TLlamaAMReservationRequest ll_request;
  llama::TLlamaAMReservationResponse ll_response;

      ll_request.reservation_id, false);

  if (!request_status.ok()) {
    return request_status;

  VLOG_RPC << "Received reservation response from Llama, waiting for notification on: "

  bool timed_out = false;
      &response->allocated_resources, &timed_out, pending_request);

  if (request_granted || timed_out) {
    response->__set_reservation_id(
        CastTUniqueId<llama::TUniqueId, TUniqueId>(pending_request->reservation_id()));

    return Status(Substitute("Resource reservation request exceeded timeout of $0",

  if (!request_granted) {
    return Status("Resource reservation request was rejected.");

  TUniqueId reservation_id;
  response->__set_reservation_id(reservation_id);
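// ClearRequests(): walks the allocated requests tracked for a reservation, optionally
// skipping the reservation itself, erases them, and tallies the memory and vcpus that
// were freed.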
void ResourceBroker::ClearRequests(const TUniqueId& reservation_id,
    bool include_reservation) {
  int64_t total_memory_bytes = 0L;
  int32_t total_vcpus = 0L;
  llama::TUniqueId llama_id = CastTUniqueId<TUniqueId, llama::TUniqueId>(reservation_id);

    vector<AllocatedRequest>::iterator request_it = it->second.begin();
    while (request_it != it->second.end()) {
      DCHECK(request_it->reservation_id() == llama_id);
      if (!request_it->is_expansion() && !include_reservation) {
      total_memory_bytes += (request_it->memory_mb() * 1024L * 1024L);
      total_vcpus += request_it->vcpus();
      request_it = it->second.erase(request_it);

            << " and " << total_vcpus << " cores for " << llama_id;
Status ResourceBroker::Release(const TResourceBrokerReleaseRequest& request,
    TResourceBrokerReleaseResponse* response) {
  VLOG_QUERY << "Releasing all resources for reservation: " << request.reservation_id;

  llama::TLlamaAMReleaseRequest llama_request;
  llama::TLlamaAMReleaseResponse llama_response;

  llama::TUniqueId reservation_id =
      CastTUniqueId<TUniqueId, llama::TUniqueId>(request.reservation_id);
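// AMNotification(): Llama AM callback. Notifications with a mismatched AM handle and
// pure heartbeats are ignored; allocated and rejected reservation ids resolve their
// pending requests' promises, and preempted or lost resource ids are converted to
// Impala TUniqueIds for further handling.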
void ResourceBroker::AMNotification(const llama::TLlamaAMNotificationRequest& request,
    llama::TLlamaAMNotificationResponse& response) {

    VLOG_QUERY << "Ignoring Llama AM notification with mismatched AM handle. "
               << "Known handle: " << llama_handle_ << ". Received handle: "
               << request.am_handle;

  if (request.heartbeat) return;
  VLOG_QUERY << "Received non-heartbeat AM notification";

  BOOST_FOREACH(const llama::TUniqueId& res_id, request.allocated_reservation_ids) {
      VLOG_RPC << "Allocation for " << res_id << " arrived after timeout";
    LOG(INFO) << "Received allocated resource for reservation id: " << res_id;
    it->second->SetResources(request.allocated_resources);
    it->second->promise()->Set(true);

  BOOST_FOREACH(const llama::TUniqueId& res_id, request.rejected_reservation_ids) {
      VLOG_RPC << "Rejection for " << res_id << " arrived after timeout";
    it->second->promise()->Set(false);

  BOOST_FOREACH(const llama::TUniqueId& res_id, request.preempted_reservation_ids) {
    TUniqueId impala_res_id;
    impala_res_id << res_id;

  BOOST_FOREACH(const llama::TUniqueId& res_id, request.preempted_client_resource_ids) {
    TUniqueId impala_res_id;
    impala_res_id << res_id;

  BOOST_FOREACH(const llama::TUniqueId& res_id, request.lost_client_resource_ids) {
    TUniqueId impala_res_id;
    impala_res_id << res_id;
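// NMNotification(): callback for Llama node-manager notifications.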
void ResourceBroker::NMNotification(const llama::TLlamaNMNotificationRequest& request,
    llama::TLlamaNMNotificationResponse& response) {
}
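// RefreshLlamaNodes(): fetches the list of nodes registered with Llama via a GetNodes
// RPC and stores it in llama_nodes_.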
Status ResourceBroker::RefreshLlamaNodes() {
  llama::TLlamaAMGetNodesRequest llama_request;
  llama_request.__set_version(llama::TLlamaServiceVersion::V1);
  llama::TLlamaAMGetNodesResponse llama_response;

  LOG(INFO) << "Llama Nodes [" << join(llama_nodes_, ", ") << "]";
    const TUniqueId& reservation_id, const TNetworkAddress& local_resource_address,

  if (entry->second == NULL) {

  *mgr = entry->second;
  return ++entry->first == 1L;

      << "UnregisterQueryResourceMgr() without corresponding GetQueryResourceMgr()";
  if (--it->second.first == 0) {
    it->second.second->Shutdown();
    delete it->second.second;
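// Debug stream operators for resource maps and resource-broker requests/responses,
// used by the logging statements above.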
ostream& operator<<(ostream& os,
    const map<TNetworkAddress, llama::TAllocatedResource>& resources) {
  typedef map<TNetworkAddress, llama::TAllocatedResource> ResourceMap;
  BOOST_FOREACH(const ResourceMap::value_type& resource, resources) {
    os << "(" << resource.first << "," << resource.second << ")";
    if (++count != resources.size()) os << ",";
ostream& operator<<(ostream& os, const TResourceBrokerReservationRequest& request) {
  os << "Reservation Request("
     << "queue=" << request.queue << " "
     << "user=" << request.user << " "
     << "gang=" << request.gang << " "
     << "request_timeout=" << request.request_timeout << " "
  for (int i = 0; i < request.resources.size(); ++i) {
    os << request.resources[i];
    if (i + 1 != request.resources.size()) os << ",";
ostream& operator<<(ostream& os, const TResourceBrokerReservationResponse& reservation) {
  os << "Granted Reservation("
     << "reservation id=" << reservation.reservation_id << " "
     << "resources=[" << reservation.allocated_resources << "])";
ostream& operator<<(ostream& os, const TResourceBrokerExpansionRequest& request) {
  os << "Expansion Request("
     << "reservation id=" << request.reservation_id << " "
     << "resource=" << request.resource << " "
     << "request_timeout=" << request.request_timeout << ")";
ostream& operator<<(ostream& os, const TResourceBrokerExpansionResponse& expansion) {
  os << "Expansion Response("
     << "reservation id=" << expansion.reservation_id << " "
     << "resources=[" << expansion.allocated_resources << "])";