#include <boost/algorithm/string.hpp>
#include <boost/algorithm/string/join.hpp>
#include <boost/bind.hpp>
#include <boost/mem_fn.hpp>
#include <boost/foreach.hpp>
#include <gutil/strings/substitute.h>

#include "gen-cpp/Types_types.h"
#include "gen-cpp/ImpalaInternalService_constants.h"
#include "gen-cpp/ResourceBrokerService_types.h"

using boost::algorithm::join;
using namespace apache::thrift;
using namespace rapidjson;
using namespace strings;
DEFINE_bool(disable_admission_control, true, "Disables admission control.");

DEFINE_bool(require_username, false, "Requires that a user be provided in order to "
    "schedule requests. If enabled and a user is not provided, requests will be "
    "rejected, otherwise requests without a username will be submitted with the "
    "username 'default'.");
static const string ASSIGNMENTS_KEY("simple-scheduler.assignments.total");

const string SimpleScheduler::IMPALA_MEMBERSHIP_TOPIC("impala-membership");

static const string ERROR_USER_TO_POOL_MAPPING_NOT_FOUND(
    "No mapping found for request from user '$0' with requested pool '$1'");
static const string ERROR_USER_NOT_ALLOWED_IN_POOL("Request from user '$0' with "
    "requested pool '$1' denied access to assigned pool '$2'");
static const string ERROR_USER_NOT_SPECIFIED("User must be specified because "
    "-require_username=true.");
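// Constructor used when cluster membership comes from the statestore: records the
// local backend descriptor and, unless admission control is disabled, creates an
// AdmissionController.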
SimpleScheduler::SimpleScheduler(StatestoreSubscriber* subscriber,
    const string& backend_id, const TNetworkAddress& backend_address,
    MetricGroup* metrics, Webserver* webserver, ResourceBroker* resource_broker,
    RequestPoolService* request_pool_service)
  : metrics_(metrics->GetChildGroup("scheduler")),
    webserver_(webserver),
    statestore_subscriber_(subscriber),
    backend_id_(backend_id),
    thrift_serializer_(false),
    total_assignments_(NULL),
    total_local_assignments_(NULL),
    resource_broker_(resource_broker),
    request_pool_service_(request_pool_service) {
  backend_descriptor_.address = backend_address;
  next_nonlocal_backend_entry_ = backend_map_.begin();
  if (FLAGS_disable_admission_control) LOG(INFO) << "Admission control is disabled.";
  if (!FLAGS_disable_admission_control) {
    admission_controller_.reset(
        new AdmissionController(request_pool_service_, metrics, backend_id_));
  }

  if (FLAGS_enable_rm) {
    if (FLAGS_rm_default_cpu_vcores <= 0) {
      LOG(ERROR) << "Bad value for --rm_default_cpu_vcores (must be positive): "
                 << FLAGS_rm_default_cpu_vcores;
    }

    if (mem_bytes <= 1024 * 1024) {
      LOG(ERROR) << "Bad value for --rm_default_memory (must be larger than 1M): "
                 << FLAGS_rm_default_memory;
    } else if (is_percent) {
      LOG(ERROR) << "Must use absolute value for --rm_default_memory: "
                 << FLAGS_rm_default_memory;
    }
  }
}
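// Constructor used with a fixed backend list (single-node and test setups): no
// statestore subscriber is registered, and the given backends are resolved and
// inserted into the backend map up front.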
SimpleScheduler::SimpleScheduler(const vector<TNetworkAddress>& backends,
    MetricGroup* metrics, Webserver* webserver, ResourceBroker* resource_broker,
    RequestPoolService* request_pool_service)
  : metrics_(metrics->GetChildGroup("scheduler")),
    webserver_(webserver),
    statestore_subscriber_(NULL),
    thrift_serializer_(false),
    total_assignments_(NULL),
    total_local_assignments_(NULL),
    resource_broker_(resource_broker),
    request_pool_service_(request_pool_service) {
  DCHECK(backends.size() > 0);
  if (FLAGS_disable_admission_control) LOG(INFO) << "Admission control is disabled.";
  if (request_pool_service_ != NULL && !FLAGS_disable_admission_control) {
    admission_controller_.reset(
        new AdmissionController(request_pool_service_, metrics, backend_id_));
  }

  for (int i = 0; i < backends.size(); ++i) {
    vector<string> ipaddrs;
    Status status = HostnameToIpAddrs(backends[i].hostname, &ipaddrs);
    if (!status.ok()) {
      VLOG(1) << "Failed to resolve " << backends[i].hostname << ": "
              << status.GetDetail();
      continue;
    }
    string ipaddr = ipaddrs[0];
    if (!FindFirstNonLocalhost(ipaddrs, &ipaddr)) {
      VLOG(1) << "Only localhost addresses found for " << backends[i].hostname;
    }

    BackendMap::iterator it = backend_map_.find(ipaddr);
    if (it == backend_map_.end()) {
      it = backend_map_.insert(
          make_pair(ipaddr, list<TBackendDescriptor>())).first;
      backend_ip_map_[backends[i].hostname] = ipaddr;
    }

    TBackendDescriptor descriptor;
    descriptor.address = MakeNetworkAddress(ipaddr, backends[i].port);
    it->second.push_back(descriptor);
  }
  next_nonlocal_backend_entry_ = backend_map_.begin();
}
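// One-time initialization: registers the backends web page and the statestore
// membership topic, creates scheduler metrics, and resolves the local hostname
// into backend_descriptor_.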
Status SimpleScheduler::Init() {
  LOG(INFO) << "Starting simple scheduler";

  if (webserver_ != NULL) {
    Webserver::UrlCallback backends_callback =
        bind<void>(mem_fn(&SimpleScheduler::BackendsUrlCallback), this, _1, _2);
  }

  if (statestore_subscriber_ != NULL) {
    StatestoreSubscriber::UpdateCallback cb =
        bind<void>(mem_fn(&SimpleScheduler::UpdateMembership), this, _1, _2);
    Status status = statestore_subscriber_->AddTopic(IMPALA_MEMBERSHIP_TOPIC, true, cb);
    if (!status.ok()) {
      status.AddDetail("SimpleScheduler failed to register membership topic");
      return status;
    }
  }

  if (!FLAGS_disable_admission_control) {
  }

  if (metrics_ != NULL) {
    num_backends_metric_ = metrics_->AddGauge<int64_t>(
        NUM_BACKENDS_KEY, backend_map_.size());
  }

  if (statestore_subscriber_ != NULL) {
    vector<string> ipaddrs;
    const string& hostname = backend_descriptor_.address.hostname;
    Status status = HostnameToIpAddrs(hostname, &ipaddrs);
    if (!status.ok()) {
      VLOG(1) << "Failed to resolve " << hostname << ": " << status.GetDetail();
      status.AddDetail("SimpleScheduler failed to start");
      return status;
    }

    string ipaddr = ipaddrs[0];
    if (!FindFirstNonLocalhost(ipaddrs, &ipaddr)) {
      VLOG(3) << "Only localhost addresses found for " << hostname;
    }

    backend_descriptor_.ip_address = ipaddr;
    LOG(INFO) << "Simple-scheduler using " << ipaddr << " as IP address";

    if (webserver_ != NULL) {
      const TNetworkAddress& webserver_address = webserver_->http_address();
      if (IsWildcardAddress(webserver_address.hostname)) {
        backend_descriptor_.__set_debug_http_address(
            MakeNetworkAddress(backend_descriptor_.ip_address, webserver_address.port));
      } else {
        backend_descriptor_.__set_debug_http_address(webserver_address);
      }
      backend_descriptor_.__set_secure_webserver(webserver_->IsSecure());
    }
  }
bool TBackendDescriptorComparator(const TBackendDescriptor& a,
    const TBackendDescriptor& b) {
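// Webserver callback that renders the list of all known backends as a JSON array
// under the "backends" key.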
void SimpleScheduler::BackendsUrlCallback(const Webserver::ArgumentMap& args,
    Document* document) {
  BackendList backends;
  GetAllKnownBackends(&backends);
  Value backends_list(kArrayType);
  BOOST_FOREACH(const BackendList::value_type& backend, backends) {
    Value str(TNetworkAddressToString(backend.address).c_str(), document->GetAllocator());
    backends_list.PushBack(str, document->GetAllocator());
  }

  document->AddMember("backends", backends_list, document->GetAllocator());
}
void SimpleScheduler::UpdateMembership(
    const StatestoreSubscriber::TopicDeltaMap& incoming_topic_deltas,
    vector<TTopicDelta>* subscriber_topic_updates) {
  StatestoreSubscriber::TopicDeltaMap::const_iterator topic =
      incoming_topic_deltas.find(IMPALA_MEMBERSHIP_TOPIC);
  if (topic != incoming_topic_deltas.end()) {
    const TTopicDelta& delta = topic->second;

    lock_guard<mutex> lock(backend_map_lock_);
    if (!delta.is_delta) {
      current_membership_.clear();
      backend_map_.clear();
      backend_ip_map_.clear();
    }

    BOOST_FOREACH(const TTopicItem& item, delta.topic_entries) {
      TBackendDescriptor be_desc;
      uint32_t len = item.value.size();
      Status status = DeserializeThriftMsg(reinterpret_cast<const uint8_t*>(
          item.value.data()), &len, false, &be_desc);
      if (!status.ok()) {
        VLOG(2) << "Error deserializing membership topic item with key: " << item.key;
        continue;
      }
      if (item.key == backend_id_ && be_desc.address != backend_descriptor_.address) {
        LOG_EVERY_N(WARNING, 30) << "Duplicate subscriber registration from address: "
            << be_desc.address;
      }

      list<TBackendDescriptor>* be_descs = &backend_map_[be_desc.ip_address];
      if (find(be_descs->begin(), be_descs->end(), be_desc) == be_descs->end()) {
        backend_map_[be_desc.ip_address].push_back(be_desc);
      }
      backend_ip_map_[be_desc.address.hostname] = be_desc.ip_address;
      current_membership_.insert(make_pair(item.key, be_desc));
    }

    BOOST_FOREACH(const string& backend_id, delta.topic_deletions) {
      if (current_membership_.find(backend_id) != current_membership_.end()) {
        const TBackendDescriptor& be_desc = current_membership_[backend_id];
        backend_ip_map_.erase(be_desc.address.hostname);
        list<TBackendDescriptor>* be_descs = &backend_map_[be_desc.ip_address];
        be_descs->erase(
            remove(be_descs->begin(), be_descs->end(), be_desc), be_descs->end());
        if (be_descs->empty()) backend_map_.erase(be_desc.ip_address);
        current_membership_.erase(backend_id);
      }
    }
    next_nonlocal_backend_entry_ = backend_map_.begin();
  }

  bool is_offline = ExecEnv::GetInstance()->impala_server()->IsOffline();
  if (!is_offline &&
      current_membership_.find(backend_id_) == current_membership_.end()) {
    VLOG(1) << "Registering local backend with statestore";
    subscriber_topic_updates->push_back(TTopicDelta());
    TTopicDelta& update = subscriber_topic_updates->back();
    update.topic_name = IMPALA_MEMBERSHIP_TOPIC;
    update.topic_entries.push_back(TTopicItem());

    TTopicItem& item = update.topic_entries.back();
    item.key = backend_id_;
    Status status = thrift_serializer_.Serialize(&backend_descriptor_, &item.value);
    if (!status.ok()) {
      LOG(WARNING) << "Failed to serialize Impala backend address for statestore topic: "
          << status.GetDetail();
      subscriber_topic_updates->pop_back();
    }
  } else if (is_offline &&
      current_membership_.find(backend_id_) != current_membership_.end()) {
    LOG(WARNING) << "Removing offline ImpalaServer from statestore";
    subscriber_topic_updates->push_back(TTopicDelta());
    TTopicDelta& update = subscriber_topic_updates->back();
    update.topic_name = IMPALA_MEMBERSHIP_TOPIC;
    update.topic_deletions.push_back(backend_id_);
  }
  if (metrics_ != NULL) num_backends_metric_->set_value(current_membership_.size());
}
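// Maps each data location to a backend by calling GetBackend() for every entry.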
void SimpleScheduler::GetBackends(
    const vector<TNetworkAddress>& data_locations, BackendList* backendports) {
  backendports->clear();
  for (int i = 0; i < data_locations.size(); ++i) {
    TBackendDescriptor backend;
    GetBackend(data_locations[i], &backend);
    backendports->push_back(backend);
  }
  DCHECK_EQ(data_locations.size(), backendports->size());
}
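// Returns the backend that should read 'data_location': a backend on the same
// host if one is registered (looked up by IP address, then by hostname),
// otherwise the next backend in round-robin order. The chosen backend is rotated
// to the back of its host's list so that repeated lookups spread load.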
Status SimpleScheduler::GetBackend(const TNetworkAddress& data_location,
    TBackendDescriptor* backend) {
  lock_guard<mutex> lock(backend_map_lock_);
  if (backend_map_.size() == 0) {
    return Status("No backends configured");
  }

  bool local_assignment = false;
  BackendMap::iterator entry = backend_map_.find(data_location.hostname);

  if (entry == backend_map_.end()) {
    BackendIpAddressMap::const_iterator itr =
        backend_ip_map_.find(data_location.hostname);
    if (itr != backend_ip_map_.end()) {
      entry = backend_map_.find(itr->second);
    }
  }

  if (entry == backend_map_.end()) {
    entry = next_nonlocal_backend_entry_;
    ++next_nonlocal_backend_entry_;
    if (next_nonlocal_backend_entry_ == backend_map_.end()) {
      next_nonlocal_backend_entry_ = backend_map_.begin();
    }
  } else {
    local_assignment = true;
  }
  DCHECK(!entry->second.empty());

  *backend = entry->second.front();
  entry->second.pop_front();
  entry->second.push_back(*backend);

  if (metrics_ != NULL) {
    total_assignments_->Increment(1);
    if (local_assignment) {
      total_local_assignments_->Increment(1L);
    }
  }

  stringstream s;
  s << "(" << data_location;
  s << " -> " << backend->address << ")";
  VLOG_FILE << "SimpleScheduler assignment (data->backend): " << s.str();
  return Status::OK;
}
void SimpleScheduler::GetAllKnownBackends(BackendList* backends) {
  lock_guard<mutex> lock(backend_map_lock_);
  backends->clear();
  BOOST_FOREACH(const BackendMap::value_type& backend_list, backend_map_) {
    backends->insert(backends->end(), backend_list.second.begin(),
        backend_list.second.end());
  }
}
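// Computes a scan range assignment for every scan node in the request by calling
// the per-node overload below with that node's locations.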
Status SimpleScheduler::ComputeScanRangeAssignment(const TQueryExecRequest& exec_request,
    QuerySchedule* schedule) {
  map<TPlanNodeId, vector<TScanRangeLocations> >::const_iterator entry;
  for (entry = exec_request.per_node_scan_ranges.begin();
      entry != exec_request.per_node_scan_ranges.end(); ++entry) {
    int fragment_idx = schedule->GetFragmentIdx(entry->first);
    const TPlanFragment& fragment = exec_request.fragments[fragment_idx];
    bool exec_at_coord = (fragment.partition.type == TPartitionType::UNPARTITIONED);
    FragmentScanRangeAssignment* assignment =
        &(*schedule->exec_params())[fragment_idx].scan_range_assignment;
    RETURN_IF_ERROR(ComputeScanRangeAssignment(
        entry->first, entry->second, exec_request.host_list, exec_at_coord,
        schedule->query_options(), assignment));
  }
  return Status::OK;
}
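// Assigns each scan range of a single scan node to a host. Cached replicas are
// preferred when cached reads are enabled; otherwise the replica whose host has
// the fewest bytes assigned so far wins, and a range is marked remote when no
// replica has a local backend.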
Status SimpleScheduler::ComputeScanRangeAssignment(
    PlanNodeId node_id, const vector<TScanRangeLocations>& locations,
    const vector<TNetworkAddress>& host_list, bool exec_at_coord,
    const TQueryOptions& query_options, FragmentScanRangeAssignment* assignment) {
  bool schedule_with_caching = !query_options.disable_cached_reads;

  unordered_map<TNetworkAddress, uint64_t> assigned_bytes_per_host;
  unordered_set<TNetworkAddress> remote_hosts;
  int64_t remote_bytes = 0L;
  int64_t local_bytes = 0L;
  int64_t cached_bytes = 0L;

  BOOST_FOREACH(const TScanRangeLocations& scan_range_locations, locations) {
    uint64_t min_assigned_bytes = numeric_limits<uint64_t>::max();
    const TNetworkAddress* data_host = NULL;
    int volume_id = -1;
    bool is_cached = false;

    vector<const TScanRangeLocation*> cached_locations;
    if (schedule_with_caching) {
      BOOST_FOREACH(const TScanRangeLocation& location, scan_range_locations.locations) {
        if (location.is_cached && HasLocalBackend(host_list[location.host_idx])) {
          cached_locations.push_back(&location);
        }
      }
    }
    if (cached_locations.size() == 0) {
      BOOST_FOREACH(const TScanRangeLocation& location, scan_range_locations.locations) {
        DCHECK_LT(location.host_idx, host_list.size());
        const TNetworkAddress& replica_host = host_list[location.host_idx];
        uint64_t initial_bytes =
            HasLocalBackend(replica_host) ? 0L : numeric_limits<int64_t>::max();
        uint64_t* assigned_bytes =
            FindOrInsert(&assigned_bytes_per_host, replica_host, initial_bytes);
        if (*assigned_bytes < min_assigned_bytes) {
          min_assigned_bytes = *assigned_bytes;
          data_host = &replica_host;
          volume_id = location.volume_id;
        }
      }
    } else {
      size_t rand_host = rand() % cached_locations.size();
      const TNetworkAddress& replica_host = host_list[cached_locations[rand_host]->host_idx];
      uint64_t initial_bytes = 0L;
      min_assigned_bytes =
          *FindOrInsert(&assigned_bytes_per_host, replica_host, initial_bytes);
      data_host = &replica_host;
      volume_id = cached_locations[rand_host]->volume_id;
      is_cached = true;
    }

    int64_t scan_range_length = 0;
    if (scan_range_locations.scan_range.__isset.hdfs_file_split) {
      scan_range_length = scan_range_locations.scan_range.hdfs_file_split.length;
    }
    bool remote_read = min_assigned_bytes >= numeric_limits<int64_t>::max();
    if (remote_read) {
      remote_bytes += scan_range_length;
      remote_hosts.insert(*data_host);
    } else {
      local_bytes += scan_range_length;
      if (is_cached) cached_bytes += scan_range_length;
    }
    assigned_bytes_per_host[*data_host] += scan_range_length;

    DCHECK(data_host != NULL);

    TNetworkAddress exec_hostport;
    if (!exec_at_coord) {
      TBackendDescriptor backend;
      RETURN_IF_ERROR(GetBackend(*data_host, &backend));
      exec_hostport = backend.address;
    }

    PerNodeScanRanges* scan_ranges =
        FindOrInsert(assignment, exec_hostport, PerNodeScanRanges());
    vector<TScanRangeParams>* scan_range_params_list =
        FindOrInsert(scan_ranges, node_id, vector<TScanRangeParams>());

    TScanRangeParams scan_range_params;
    scan_range_params.scan_range = scan_range_locations.scan_range;
    scan_range_params.__set_volume_id(volume_id);
    scan_range_params.__set_is_cached(is_cached);
    scan_range_params.__set_is_remote(remote_read);
    scan_range_params_list->push_back(scan_range_params);
  }

  VLOG_FILE << "Total remote scan volume = "
      << PrettyPrinter::Print(remote_bytes, TUnit::BYTES);
  VLOG_FILE << "Total local scan volume = "
      << PrettyPrinter::Print(local_bytes, TUnit::BYTES);
  VLOG_FILE << "Total cached scan volume = "
      << PrettyPrinter::Print(cached_bytes, TUnit::BYTES);
  if (remote_hosts.size() > 0) {
    stringstream remote_node_log;
    remote_node_log << "Remote data node list: ";
    BOOST_FOREACH(const TNetworkAddress& remote_host, remote_hosts) {
      remote_node_log << remote_host << " ";
    }
  }

  BOOST_FOREACH(FragmentScanRangeAssignment::value_type& entry, *assignment) {
    VLOG_FILE << "ScanRangeAssignment: server=" << ThriftDebugString(entry.first);
    BOOST_FOREACH(PerNodeScanRanges::value_type& per_node_scan_ranges, entry.second) {
      stringstream str;
      BOOST_FOREACH(TScanRangeParams& params, per_node_scan_ranges.second) {
        str << ThriftDebugString(params) << " ";
      }
      VLOG_FILE << "node_id=" << per_node_scan_ranges.first << " ranges=" << str.str();
    }
  }
  return Status::OK;
}
void SimpleScheduler::ComputeFragmentExecParams(const TQueryExecRequest& exec_request,
    QuerySchedule* schedule) {
  vector<FragmentExecParams>* fragment_exec_params = schedule->exec_params();

  int64_t num_backends = 0;
  BOOST_FOREACH(FragmentExecParams& params, *fragment_exec_params) {
    for (int j = 0; j < params.hosts.size(); ++j) {
      int instance_num = num_backends + j;
      TUniqueId instance_id;
      instance_id.hi = schedule->query_id().hi;
      DCHECK_LT(
          schedule->query_id().lo, numeric_limits<int64_t>::max() - instance_num - 1);
      instance_id.lo = schedule->query_id().lo + instance_num + 1;
      params.instance_ids.push_back(instance_id);
    }
    num_backends += params.hosts.size();
  }
  if (exec_request.fragments[0].partition.type == TPartitionType::UNPARTITIONED) {
  }

  for (int i = 1; i < fragment_exec_params->size(); ++i) {
    FragmentExecParams& params = (*fragment_exec_params)[i];
    int dest_fragment_idx = exec_request.dest_fragment_idx[i - 1];
    DCHECK_LT(dest_fragment_idx, fragment_exec_params->size());
    FragmentExecParams& dest_params = (*fragment_exec_params)[dest_fragment_idx];

    DCHECK(exec_request.fragments[i].output_sink.__isset.stream_sink);
    const TDataStreamSink& sink = exec_request.fragments[i].output_sink.stream_sink;
    DCHECK(sink.output_partition.type == TPartitionType::UNPARTITIONED
        || sink.output_partition.type == TPartitionType::HASH_PARTITIONED
        || sink.output_partition.type == TPartitionType::RANDOM);
    PlanNodeId exch_id = sink.dest_node_id;
    params.sender_id_base = dest_params.per_exch_num_senders[exch_id];
    dest_params.per_exch_num_senders[exch_id] += params.hosts.size();

    params.destinations.resize(dest_params.hosts.size());
    for (int j = 0; j < dest_params.hosts.size(); ++j) {
      TPlanFragmentDestination& dest = params.destinations[j];
      dest.fragment_instance_id = dest_params.instance_ids[j];
      dest.server = dest_params.hosts[j];
      VLOG_RPC << "dest for fragment " << i << ":"
               << " instance_id=" << dest.fragment_instance_id
               << " server=" << dest.server;
    }
  }
}
void SimpleScheduler::ComputeFragmentHosts(const TQueryExecRequest& exec_request,
    QuerySchedule* schedule) {
  vector<FragmentExecParams>* fragment_exec_params = schedule->exec_params();
  DCHECK_EQ(fragment_exec_params->size(), exec_request.fragments.size());
  vector<TPlanNodeType::type> scan_node_types;
  scan_node_types.push_back(TPlanNodeType::HDFS_SCAN_NODE);
  scan_node_types.push_back(TPlanNodeType::HBASE_SCAN_NODE);
  scan_node_types.push_back(TPlanNodeType::DATA_SOURCE_NODE);

  for (int i = exec_request.fragments.size() - 1; i >= 0; --i) {
    const TPlanFragment& fragment = exec_request.fragments[i];
    FragmentExecParams& params = (*fragment_exec_params)[i];
    if (fragment.partition.type == TPartitionType::UNPARTITIONED) {
      params.hosts.push_back(coord);
      continue;
    }

    if (ContainsNode(fragment.plan, TPlanNodeType::UNION_NODE)) {
      vector<TPlanNodeId> scan_nodes;
      FindNodes(fragment.plan, scan_node_types, &scan_nodes);
      vector<TPlanNodeId> exch_nodes;
      FindNodes(fragment.plan,
          vector<TPlanNodeType::type>(1, TPlanNodeType::EXCHANGE_NODE),
          &exch_nodes);

      vector<TNetworkAddress> scan_hosts;
      for (int j = 0; j < scan_nodes.size(); ++j) {
        GetScanHosts(scan_nodes[j], exec_request, params, &scan_hosts);
      }
      unordered_set<TNetworkAddress> hosts(scan_hosts.begin(), scan_hosts.end());

      for (int j = 0; j < exch_nodes.size(); ++j) {
        int input_fragment_idx = FindSenderFragment(exch_nodes[j], i, exec_request);
        const vector<TNetworkAddress>& input_fragment_hosts =
            (*fragment_exec_params)[input_fragment_idx].hosts;
        hosts.insert(input_fragment_hosts.begin(), input_fragment_hosts.end());
      }
      DCHECK(!hosts.empty()) << "no hosts for fragment " << i << " with a UnionNode";

      params.hosts.assign(hosts.begin(), hosts.end());
      continue;
    }

    PlanNodeId leftmost_scan_id = FindLeftmostNode(fragment.plan, scan_node_types);
    if (leftmost_scan_id == g_ImpalaInternalService_constants.INVALID_PLAN_NODE_ID) {
      int input_fragment_idx = FindLeftmostInputFragment(i, exec_request);
      DCHECK_GE(input_fragment_idx, 0);
      DCHECK_LT(input_fragment_idx, fragment_exec_params->size());
      params.hosts = (*fragment_exec_params)[input_fragment_idx].hosts;
      continue;
    }

    GetScanHosts(leftmost_scan_id, exec_request, params, &params.hosts);
  }

  unordered_set<TNetworkAddress> unique_hosts;
  BOOST_FOREACH(const FragmentExecParams& exec_params, *fragment_exec_params) {
    unique_hosts.insert(exec_params.hosts.begin(), exec_params.hosts.end());
  }
  schedule->SetUniqueHosts(unique_hosts);
}
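// Returns the id of the leftmost leaf of 'plan' if its type is one of 'types',
// otherwise INVALID_PLAN_NODE_ID.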
PlanNodeId SimpleScheduler::FindLeftmostNode(
    const TPlan& plan, const vector<TPlanNodeType::type>& types) {
  int node_idx = 0;
  while (node_idx < plan.nodes.size() && plan.nodes[node_idx].num_children != 0) {
    ++node_idx;
  }
  if (node_idx == plan.nodes.size()) {
    return g_ImpalaInternalService_constants.INVALID_PLAN_NODE_ID;
  }
  const TPlanNode& node = plan.nodes[node_idx];

  for (int i = 0; i < types.size(); ++i) {
    if (node.node_type == types[i]) return node.node_id;
  }
  return g_ImpalaInternalService_constants.INVALID_PLAN_NODE_ID;
}
bool SimpleScheduler::ContainsNode(const TPlan& plan, TPlanNodeType::type type) {
  for (int i = 0; i < plan.nodes.size(); ++i) {
    if (plan.nodes[i].node_type == type) return true;
  }
  return false;
}
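// Collects the ids of all nodes in 'plan' whose type appears in 'types'.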
void SimpleScheduler::FindNodes(const TPlan& plan,
    const vector<TPlanNodeType::type>& types, vector<TPlanNodeId>* results) {
  for (int i = 0; i < plan.nodes.size(); ++i) {
    for (int j = 0; j < types.size(); ++j) {
      if (plan.nodes[i].node_type == types[j]) {
        results->push_back(plan.nodes[i].node_id);
        break;
      }
    }
  }
}
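// Returns the hosts that were assigned scan ranges for 'scan_id' in this
// fragment's scan range assignment.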
void SimpleScheduler::GetScanHosts(TPlanNodeId scan_id,
    const TQueryExecRequest& exec_request, const FragmentExecParams& params,
    vector<TNetworkAddress>* scan_hosts) {
  map<TPlanNodeId, vector<TScanRangeLocations> >::const_iterator entry =
      exec_request.per_node_scan_ranges.find(scan_id);
  if (entry == exec_request.per_node_scan_ranges.end() || entry->second.empty()) {
  }

  BOOST_FOREACH(const FragmentScanRangeAssignment::value_type& scan_range_assignment,
      params.scan_range_assignment) {
    scan_hosts->push_back(scan_range_assignment.first);
  }
}
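// Returns the index of the fragment that sends to the leftmost exchange node of
// 'fragment_idx', or INVALID_PLAN_NODE_ID if the fragment has no exchange node.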
int SimpleScheduler::FindLeftmostInputFragment(
    int fragment_idx, const TQueryExecRequest& exec_request) {
  vector<TPlanNodeType::type> exch_node_type;
  exch_node_type.push_back(TPlanNodeType::EXCHANGE_NODE);
  PlanNodeId exch_id =
      FindLeftmostNode(exec_request.fragments[fragment_idx].plan, exch_node_type);
  if (exch_id == g_ImpalaInternalService_constants.INVALID_PLAN_NODE_ID) {
    return g_ImpalaInternalService_constants.INVALID_PLAN_NODE_ID;
  }
  return FindSenderFragment(exch_id, fragment_idx, exec_request);
}
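// Returns the index of the fragment whose stream sink sends to exchange node
// 'exch_id' of fragment 'fragment_idx'.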
int SimpleScheduler::FindSenderFragment(TPlanNodeId exch_id, int fragment_idx,
    const TQueryExecRequest& exec_request) {
  for (int i = 0; i < exec_request.dest_fragment_idx.size(); ++i) {
    if (exec_request.dest_fragment_idx[i] != fragment_idx) continue;
    const TPlanFragment& input_fragment = exec_request.fragments[i + 1];
    DCHECK(input_fragment.__isset.output_sink);
    DCHECK(input_fragment.output_sink.__isset.stream_sink);
    if (input_fragment.output_sink.stream_sink.dest_node_id == exch_id) return i + 1;
  }
  DCHECK(false) << "no fragment sends to exch id " << exch_id;
  return g_ImpalaInternalService_constants.INVALID_PLAN_NODE_ID;
}
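// Resolves the request pool for 'user' via the RequestPoolService and checks
// that the user has access to the resolved pool.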
Status SimpleScheduler::GetRequestPool(const string& user,
    const TQueryOptions& query_options, string* pool) const {
  TResolveRequestPoolResult resolve_pool_result;
  const string& configured_pool = query_options.request_pool;
  RETURN_IF_ERROR(request_pool_service_->ResolveRequestPool(configured_pool, user,
      &resolve_pool_result));

    return Status(join(resolve_pool_result.status.error_msgs, "; "));

  if (resolve_pool_result.resolved_pool.empty()) {
    return Status(Substitute(ERROR_USER_TO_POOL_MAPPING_NOT_FOUND, user,
        configured_pool));
  }
  if (!resolve_pool_result.has_access) {
    return Status(Substitute(ERROR_USER_NOT_ALLOWED_IN_POOL, user,
        configured_pool, resolve_pool_result.resolved_pool));
  }
  *pool = resolve_pool_result.resolved_pool;
  return Status::OK;
}
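// Main scheduling entry point: determines the effective user, applies admission
// control, computes fragment hosts and exec params, and, when resource
// management is enabled, reserves resources through the resource broker before
// recording the reservation in the active resource maps.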
Status SimpleScheduler::Schedule(Coordinator* coord, QuerySchedule* schedule) {

    VLOG(2) << "No user specified: using user=default";

  VLOG(3) << "user='" << user << "'";

  schedule->set_num_hosts(max(num_backends_metric_->value(), 1L));

  if (!FLAGS_disable_admission_control) {
  }
  if (ExecEnv::GetInstance()->impala_server()->IsOffline()) {
    return Status("This Impala server is offline. Please retry your query later.");
  }

  ComputeFragmentHosts(schedule->request(), schedule);
  ComputeFragmentExecParams(schedule->request(), schedule);

  const TResourceBrokerReservationRequest& reservation_request =
      schedule->reservation_request();
  if (!reservation_request.resources.empty()) {
    Status status = resource_broker_->Reserve(
        reservation_request, schedule->reservation());

    const TQueryCtx& query_ctx = schedule->request().query_ctx;
    if (!query_ctx.__isset.parent_query_id &&
        query_ctx.__isset.tables_missing_stats &&
        !query_ctx.tables_missing_stats.empty()) {
    }

    AddToActiveResourceMaps(*schedule->reservation(), coord);
  }
  return Status::OK;
}
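// Releases a completed query's resources: the admission control path runs when
// enabled, and the reservation is released back to the resource broker and
// removed from the active resource maps.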
Status SimpleScheduler::Release(QuerySchedule* schedule) {
  if (!FLAGS_disable_admission_control) {
  }

  DCHECK(resource_broker_ != NULL);
  TResourceBrokerReleaseRequest request;
  TResourceBrokerReleaseResponse response;
  request.reservation_id = schedule->reservation()->reservation_id;
  resource_broker_->Release(request, &response);

  RemoveFromActiveResourceMaps(*schedule->reservation());

    return Status(join(response.status.error_msgs, ", "));

  return Status::OK;
}
void SimpleScheduler::AddToActiveResourceMaps(
    const TResourceBrokerReservationResponse& reservation, Coordinator* coord) {
  lock_guard<mutex> l(active_resources_lock_);
  active_reservations_[reservation.reservation_id] = coord;
  map<TNetworkAddress, llama::TAllocatedResource>::const_iterator iter;
  for (iter = reservation.allocated_resources.begin();
      iter != reservation.allocated_resources.end();
      ++iter) {
    TUniqueId client_resource_id;
    client_resource_id << iter->second.client_resource_id;
    active_client_resources_[client_resource_id] = coord;
  }
}
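// Removes a reservation and its allocated client resources from the maps
// maintained for preemption and loss handling.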
void SimpleScheduler::RemoveFromActiveResourceMaps(
    const TResourceBrokerReservationResponse& reservation) {
  lock_guard<mutex> l(active_resources_lock_);
  active_reservations_.erase(reservation.reservation_id);
  map<TNetworkAddress, llama::TAllocatedResource>::const_iterator iter;
  for (iter = reservation.allocated_resources.begin();
      iter != reservation.allocated_resources.end();
      ++iter) {
    TUniqueId client_resource_id;
    client_resource_id << iter->second.client_resource_id;
    active_client_resources_.erase(client_resource_id);
  }
}
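// Called when the resource broker reports that an entire reservation was
// preempted: the owning coordinator, if any, is found and its query cancelled.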
void SimpleScheduler::HandlePreemptedReservation(const TUniqueId& reservation_id) {
  Coordinator* coord = NULL;
  {
    lock_guard<mutex> l(active_resources_lock_);
    ActiveReservationsMap::iterator it = active_reservations_.find(reservation_id);
    if (it != active_reservations_.end()) coord = it->second;
  }
  if (coord == NULL) {
    LOG(WARNING) << "Ignoring preempted reservation id " << reservation_id
        << " because no active query using it was found.";
  } else {
    stringstream err_msg;
    err_msg << "Reservation " << reservation_id << " was preempted";
    Status status(err_msg.str());
    coord->Cancel(&status);
  }
}
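// Called when a single client resource is preempted; cancels the query that was
// using it, if one is found.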
void SimpleScheduler::HandlePreemptedResource(const TUniqueId& client_resource_id) {
  Coordinator* coord = NULL;
  {
    lock_guard<mutex> l(active_resources_lock_);
    ActiveClientResourcesMap::iterator it =
        active_client_resources_.find(client_resource_id);
    if (it != active_client_resources_.end()) coord = it->second;
  }
  if (coord == NULL) {
    LOG(WARNING) << "Ignoring preempted client resource id " << client_resource_id
        << " because no active query using it was found.";
  } else {
    stringstream err_msg;
    err_msg << "Resource " << client_resource_id << " was preempted";
    Status status(err_msg.str());
    coord->Cancel(&status);
  }
}
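// Called when a client resource is reported lost; cancels the query that was
// using it, if one is found.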
void SimpleScheduler::HandleLostResource(const TUniqueId& client_resource_id) {
  Coordinator* coord = NULL;
  {
    lock_guard<mutex> l(active_resources_lock_);
    ActiveClientResourcesMap::iterator it =
        active_client_resources_.find(client_resource_id);
    if (it != active_client_resources_.end()) coord = it->second;
  }
  if (coord == NULL) {
    LOG(WARNING) << "Ignoring lost client resource id " << client_resource_id
        << " because no active query using it was found.";
  } else {
    stringstream err_msg;
    err_msg << "Resource " << client_resource_id << " was lost";
    Status status(err_msg.str());
    coord->Cancel(&status);
  }
}