17 #include <boost/foreach.hpp>
18 #include <boost/uuid/uuid.hpp>
19 #include <boost/uuid/uuid_generators.hpp>
20 #include <gutil/strings/substitute.h>
32 using boost::uuids::random_generator;
33 using boost::uuids::uuid;
34 using namespace impala;
35 using namespace strings;
39 DEFINE_double(max_vcore_oversubscription_ratio, 2.5,
"(Advanced) The maximum ratio "
40 "allowed between running threads and acquired VCore resources for a query's fragments"
45 CreateLocalLlamaNodeMapping(unique_hosts);
50 TNetworkAddress* dest) {
52 *dest = impalad_to_dn_[src];
54 dest->hostname = src.hostname;
60 const unordered_set<TNetworkAddress>& unique_hosts) {
62 const vector<string>& llama_nodes =
64 DCHECK(!llama_nodes.empty());
65 int llama_node_ix = 0;
66 BOOST_FOREACH(
const TNetworkAddress& host, unique_hosts) {
68 impalad_to_dn_[host] = dn_hostport;
69 dn_to_impalad_[dn_hostport] = host;
70 LOG(INFO) <<
"Mapping Datanode " << dn_hostport <<
" to Impalad: " << host;
72 llama_node_ix = (llama_node_ix + 1) % llama_nodes.size();
77 const TNetworkAddress& local_resource_location,
const TUniqueId&
query_id)
78 : reservation_id_(reservation_id),
query_id_(query_id),
79 local_resource_location_(local_resource_location), exit_(false), callback_count_(0),
80 threads_running_(0), vcores_(0) {
85 LOG(INFO) <<
"Initialising vcore acquisition thread for query " <<
PrintId(
query_id_)
86 <<
" (" << init_vcores <<
" initial vcores)";
88 <<
"Double initialisation of QueryResourceMgr::InitCpuAcquisition()";
108 TResourceBrokerExpansionRequest* request) {
109 DCHECK(request != NULL);
110 DCHECK(memory_mb > 0 || vcores > 0);
111 DCHECK(
reservation_id_ != TUniqueId()) <<
"Expansion requires existing reservation";
113 unordered_set<TNetworkAddress> hosts;
116 llama::TResource res;
117 res.memory_mb = memory_mb;
118 res.v_cpu_cores = vcores;
119 TNetworkAddress res_address;
123 random_generator uuid_generator;
124 uuid
id = uuid_generator();
125 res.client_resource_id.hi = *
reinterpret_cast<uint64_t*
>(&
id.data[0]);
126 res.client_resource_id.lo = *
reinterpret_cast<uint64_t*
>(&
id.data[8]);
127 res.enforcement = llama::TLocationEnforcement::MUST;
129 request->__set_resource(res);
156 CallbackMap::iterator it =
callbacks_.find(callback_id);
157 DCHECK(it !=
callbacks_.end()) <<
"Could not find callback with id: " << callback_id;
167 VLOG_QUERY <<
"Starting Vcore acquisition for: " << reservation_id;
177 TResourceBrokerExpansionRequest request;
179 TResourceBrokerExpansionResponse response;
183 thread_in_expand->FetchAndUpdate(1L);
186 thread_in_expand->FetchAndUpdate(-1L);
189 if (early_exit->FetchAndUpdate(0L) != 0) {
190 VLOG_QUERY <<
"Fragment finished during Expand(): " << reservation_id;
195 <<
", reservation: " <<
PrintId(reservation_id_) <<
". Error was: "
204 const llama::TAllocatedResource& resource =
205 response.allocated_resources.begin()->second;
206 DCHECK(resource.v_cpu_cores == 1)
207 <<
"Asked for 1 core, got: " << resource.v_cpu_cores;
208 vcores_ += resource.v_cpu_cores;
211 const string& cgroup =
226 VLOG_QUERY <<
"Leaving VCore acquisition thread: " << reservation_id;
void GetResourceHostport(const TNetworkAddress &src, TNetworkAddress *dst)
TNetworkAddress local_resource_location_
int64_t vcores_
The number of VCores acquired for this node for this query.
boost::shared_ptr< AtomicInt< int16_t > > thread_in_expand_
int32_t AddVcoreAvailableCb(const VcoreAvailableCb &callback)
const std::string GetDetail() const
string TNetworkAddressToString(const TNetworkAddress &address)
Utility method to print address as address:port.
int32_t VirtualCoresToCpuShares(int16_t v_cpu_cores)
DEFINE_double(max_vcore_oversubscription_ratio, 2.5,"(Advanced) The maximum ratio ""allowed between running threads and acquired VCore resources for a query's fragments"" on a single node")
ResourceResolver(const boost::unordered_set< TNetworkAddress > &unique_hosts)
bool AboveVcoreSubscriptionThreshold()
boost::mutex exit_lock_
Used to control shutdown of AcquireVcoreResources().
TODO: Consider allowing fragment IDs as category parameters.
const TUniqueId & query_id() const
std::string UniqueIdToCgroup(const std::string &unique_id) const
boost::condition_variable threads_changed_cv_
Waited on by AcquireVcoreResources(), and notified by NotifyThreadUsageChange().
void CreateLocalLlamaNodeMapping(const boost::unordered_set< TNetworkAddress > &unique_hosts)
Status SetCpuShares(const std::string &cgroup, int32_t num_shares)
TNetworkAddress MakeNetworkAddress(const string &hostname, int port)
string PrintId(const TUniqueId &id, const string &separator)
int64_t threads_running_
The number of threads we know to be running on behalf of this query.
void SleepForMs(const int64_t duration_ms)
Sleeps the current thread for at least duration_ms milliseconds.
QueryResourceMgr(const TUniqueId &reservation_id, const TNetworkAddress &local_resource_location, const TUniqueId &query_id)
ResourceBroker * resource_broker()
TUniqueId reservation_id_
ID of the single reservation corresponding to this query.
void ClearRequests(const TUniqueId &reservation_id, bool include_reservation)
CallbackMap::iterator callbacks_it_
Round-robin iterator to notify callbacks about new VCores one at a time.
boost::function< void()> VcoreAvailableCb
CgroupsMgr * cgroups_mgr()
boost::mutex threads_running_lock_
Protects threads_running_, threads_changed_cv_ and vcores_.
~QueryResourceMgr()
Waits for the VCore acquisition thread to stop.
TUniqueId query_id_
Query ID of the query this class manages resources for.
void RemoveVcoreAvailableCb(int32_t callback_id)
Removes the callback with the given ID.
boost::scoped_ptr< Thread > acquire_vcore_thread_
Runs AcquireVcoreResources() after InitVcoreAcquisition() is called.
static ExecEnv * GetInstance()
void AcquireVcoreResources(boost::shared_ptr< AtomicInt< int16_t > > thread_in_expand, boost::shared_ptr< AtomicInt< int16_t > > early_exit)
const int64_t DEFAULT_EXPANSION_REQUEST_TIMEOUT_MS
void NotifyThreadUsageChange(int delta)
Status Expand(const TResourceBrokerExpansionRequest &request, TResourceBrokerExpansionResponse *response)
void InitVcoreAcquisition(int32_t init_vcores)
boost::shared_ptr< AtomicInt< int16_t > > early_exit_
boost::mutex callbacks_lock_
Protects callbacks_ and callbacks_it_.
const std::vector< std::string > & llama_nodes()
Status CreateExpansionRequest(int64_t memory_mb, int64_t vcores, TResourceBrokerExpansionRequest *request)
float max_vcore_oversubscription_ratio_
bool ShouldExit()
Notifies acquire_vcore_thread_ that it should terminate. Does not block.