18 #include <boost/algorithm/string/join.hpp>
19 #include <boost/foreach.hpp>
20 #include <boost/uuid/uuid.hpp>
21 #include <boost/uuid/uuid_generators.hpp>
33 using boost::uuids::random_generator;
34 using boost::uuids::uuid;
35 using namespace impala;
// Boolean flag rm_always_use_defaults, default false. Per its help text, when
// true every query uses the same initial resource requests regardless of its
// computed estimates, and the flag only matters if --enable_rm is set.
// Read in this file as FLAGS_rm_always_use_defaults when choosing the per-host
// memory and the per-host vcore count.
37 DEFINE_bool(rm_always_use_defaults,
false,
"If true, all queries use the same initial"
38 " resource requests regardless of their computed resource estimates. Only meaningful "
39 "if --enable_rm is set.");
// String flag rm_default_memory, default "4G". Per its help text, this is the
// initial per-node memory reservation used when a query has no estimate or
// when --rm_always_use_defaults is set.
// NOTE(review): the value is a memory-spec string; presumably it is parsed
// with ParseMemSpec() at the use site, which is not visible here -- confirm.
40 DEFINE_string(rm_default_memory,
"4G",
"The initial amount of memory that"
41 " a query should reserve on each node if either it does not have an available "
42 "estimate, or if --rm_always_use_defaults is set.");
// 32-bit integer flag rm_default_cpu_vcores, default 2. Per its help text,
// this is the initial per-node virtual-core reservation used when a query has
// no estimate or when --rm_always_use_defaults is set.
// Where this file reads it, the value is assigned to an int16_t
// (v_cpu_cores), so a setting outside the int16_t range would not survive
// that conversion.
43 DEFINE_int32(rm_default_cpu_vcores, 2,
"The initial number of virtual cores that"
44 " a query should reserve on each node if either it does not have an available "
45 "estimate, or if --rm_always_use_defaults is set.");
60 const TQueryExecRequest& request,
const TQueryOptions& query_options,
65 query_options_(query_options),
66 effective_user_(effective_user),
67 summary_profile_(summary_profile),
75 vector<PlanNodeId> per_node_fragment_idx;
76 for (
int i = 0; i < request.fragments.size(); ++i) {
77 BOOST_FOREACH(
const TPlanNode& node, request.fragments[i].plan.nodes) {
89 DCHECK_GE(total_cluster_mem, 0);
90 return total_cluster_mem;
100 int64_t query_option_memory_limit = numeric_limits<int64_t>::max();
101 bool has_query_option =
false;
104 has_query_option =
true;
107 int64_t estimate_limit = numeric_limits<int64_t>::max();
108 bool has_estimate =
false;
110 estimate_limit =
request_.per_host_mem_req;
114 int64_t per_host_mem = 0L;
117 }
else if (FLAGS_rm_always_use_defaults) {
121 }
else if (has_query_option) {
122 per_host_mem = query_option_memory_limit;
123 }
else if (has_estimate) {
124 per_host_mem = estimate_limit;
142 int16_t v_cpu_cores = FLAGS_rm_default_cpu_vcores;
143 if (!FLAGS_rm_always_use_defaults &&
query_options_.__isset.v_cpu_cores &&
152 TNetworkAddress* dst) {
155 <<
"resource_resolver_ is NULL, didn't call SetUniqueHosts()?";
191 if (memory_mb == 0 && v_cpu_cores == 0)
return;
194 <<
"resource_resolver_ is NULL, didn't call SetUniqueHosts()?";
195 random_generator uuid_generator;
199 uuid
id = uuid_generator();
200 resource.client_resource_id.hi = *
reinterpret_cast<uint64_t*
>(&
id.data[0]);
201 resource.client_resource_id.lo = *
reinterpret_cast<uint64_t*
>(&
id.data[8]);
202 resource.enforcement = llama::TLocationEnforcement::MUST;
204 TNetworkAddress resource_hostport;
207 ss << resource_hostport;
208 resource.askedLocation = ss.str();
209 resource.memory_mb = memory_mb;
210 resource.v_cpu_cores = v_cpu_cores;
216 vector<TNetworkAddress> hosts_missing_resources;
219 BOOST_FOREACH(
const TNetworkAddress& host, params.
hosts) {
222 TNetworkAddress resource_hostport;
224 if (
reservation_.allocated_resources.find(resource_hostport) ==
226 hosts_missing_resources.push_back(host);
230 if (!hosts_missing_resources.empty()) {
232 ss <<
"Failed to validate reservation " <<
reservation_.reservation_id <<
"." << endl
233 <<
"Missing resources for hosts [";
234 for (
int i = 0; i < hosts_missing_resources.size(); ++i) {
235 ss << hosts_missing_resources[i];
236 if (i + 1 != hosts_missing_resources.size()) ss <<
", ";
void GetResourceHostport(const TNetworkAddress &src, TNetworkAddress *dst)
const TQueryOptions & query_options_
const TUniqueId & query_id() const
const int64_t DEFAULT_REQUEST_TIMEOUT_MS
std::vector< TNetworkAddress > hosts
std::vector< int32_t > plan_node_to_fragment_idx_
Maps from plan node id to its fragment index. Filled in c'tor.
static int64_t physical_mem()
Get total physical memory in bytes (ignores cgroups memory limits).
DEFINE_string(rm_default_memory,"4G","The initial amount of memory that"" a query should reserve on each node if either it does not have an available ""estimate, or if --rm_always_use_defaults is set.")
static int64_t ParseMemSpec(const std::string &mem_spec_str, bool *is_percent, int64_t relative_reference)
Status ValidateReservation()
int64_t GetPerHostMemoryEstimate() const
bool HasReservation() const
void PrepareReservationRequest(const std::string &pool, const std::string &user)
void GetResourceHostport(const TNetworkAddress &src, TNetworkAddress *dst)
QuerySchedule(const TUniqueId &query_id, const TQueryExecRequest &request, const TQueryOptions &query_options, const std::string &effective_user, RuntimeProfile *summary_profile, RuntimeProfile::EventSequence *query_events)
std::vector< FragmentExecParams > fragment_exec_params_
const TQueryExecRequest & request_
void SetUniqueHosts(const boost::unordered_set< TNetworkAddress > &unique_hosts)
DEFINE_bool(rm_always_use_defaults, false,"If true, all queries use the same initial"" resource requests regardless of their computed resource estimates. Only meaningful ""if --enable_rm is set.")
int16_t GetPerHostVCores() const
int64_t GetClusterMemoryEstimate() const
Total estimated memory for all nodes. set_num_hosts() must be called before calling this. ...
const boost::unordered_set< TNetworkAddress > & unique_hosts() const
RuntimeProfile::EventSequence * query_events_
Event timeline for this query. Unowned.
TResourceBrokerReservationResponse reservation_
Fulfilled reservation request. Populated by scheduler.
DEFINE_int32(periodic_counter_update_period_ms, 500,"Period to update rate counters and"" sampling counters in ms")
TResourceBrokerReservationRequest reservation_request_
Reservation request to be submitted to Llama. Set in PrepareReservationRequest(). ...
boost::unordered_set< TNetworkAddress > unique_hosts_
The set of hosts that the query will run on excluding the coordinator.
string GetShortName(const string &user)
boost::scoped_ptr< ResourceResolver > resource_resolver_