simple-scheduler.cc
1 // Copyright 2012 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "scheduling/simple-scheduler.h"
16 
17 #include <vector>
18 
19 #include <boost/algorithm/string.hpp>
20 #include <boost/algorithm/string/join.hpp>
21 #include <boost/bind.hpp>
22 #include <boost/mem_fn.hpp>
23 #include <boost/foreach.hpp>
24 #include <gutil/strings/substitute.h>
25 
26 #include "common/logging.h"
27 #include "util/metrics.h"
28 #include "runtime/exec-env.h"
29 #include "runtime/coordinator.h"
30 #include "service/impala-server.h"
31 
32 #include "statestore/statestore-subscriber.h"
33 #include "gen-cpp/Types_types.h"
34 #include "gen-cpp/ImpalaInternalService_constants.h"
35 
36 #include "util/network-util.h"
37 #include "util/uid-util.h"
38 #include "util/container-util.h"
39 #include "util/debug-util.h"
40 #include "util/error-util.h"
41 #include "util/llama-util.h"
42 #include "util/mem-info.h"
43 #include "util/parse-util.h"
44 #include "gen-cpp/ResourceBrokerService_types.h"
45 
46 #include "common/names.h"
47 
48 using boost::algorithm::join;
49 using namespace apache::thrift;
50 using namespace rapidjson;
51 using namespace strings;
52 
53 DECLARE_int32(be_port);
54 DECLARE_string(hostname);
55 DECLARE_bool(enable_rm);
56 DECLARE_int32(rm_default_cpu_vcores);
57 DECLARE_string(rm_default_memory);
58 
59 DEFINE_bool(disable_admission_control, true, "Disables admission control.");
60 
61 DEFINE_bool(require_username, false, "Requires that a user be provided in order to "
62  "schedule requests. If enabled and a user is not provided, requests will be "
63  "rejected, otherwise requests without a username will be submitted with the "
64  "username 'default'.");
65 
66 namespace impala {
67 
68 static const string LOCAL_ASSIGNMENTS_KEY("simple-scheduler.local-assignments.total");
69 static const string ASSIGNMENTS_KEY("simple-scheduler.assignments.total");
70 static const string SCHEDULER_INIT_KEY("simple-scheduler.initialized");
71 static const string NUM_BACKENDS_KEY("simple-scheduler.num-backends");
72 static const string DEFAULT_USER("default");
73 
74 static const string BACKENDS_WEB_PAGE = "/backends";
75 static const string BACKENDS_TEMPLATE = "backends.tmpl";
76 
77 const string SimpleScheduler::IMPALA_MEMBERSHIP_TOPIC("impala-membership");
78 
79 static const string ERROR_USER_TO_POOL_MAPPING_NOT_FOUND(
80  "No mapping found for request from user '$0' with requested pool '$1'");
81 static const string ERROR_USER_NOT_ALLOWED_IN_POOL("Request from user '$0' with "
82  "requested pool '$1' denied access to assigned pool '$2'");
83 static const string ERROR_USER_NOT_SPECIFIED("User must be specified because "
84  "-require_username=true.");
85 
86 SimpleScheduler::SimpleScheduler(StatestoreSubscriber* subscriber,
87  const string& backend_id, const TNetworkAddress& backend_address,
88  MetricGroup* metrics, Webserver* webserver, ResourceBroker* resource_broker,
89  RequestPoolService* request_pool_service)
90  : metrics_(metrics->GetChildGroup("scheduler")),
91  webserver_(webserver),
92  statestore_subscriber_(subscriber),
93  backend_id_(backend_id),
94  thrift_serializer_(false),
95  total_assignments_(NULL),
96  total_local_assignments_(NULL),
97  initialised_(NULL),
98  update_count_(0),
99  resource_broker_(resource_broker),
100  request_pool_service_(request_pool_service) {
101  backend_descriptor_.address = backend_address;
102  next_nonlocal_backend_entry_ = backend_map_.begin();
103  if (FLAGS_disable_admission_control) LOG(INFO) << "Admission control is disabled.";
104  if (!FLAGS_disable_admission_control) {
105  admission_controller_.reset(
106  new AdmissionController(request_pool_service_, metrics, backend_id_));
107  }
108 
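 // When resource management is enabled, validate the default per-query resources up
 // front: the default number of vcores must be positive, and the default memory must
 // be an absolute value (not a percentage) larger than 1MB.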
109  if (FLAGS_enable_rm) {
110  if (FLAGS_rm_default_cpu_vcores <= 0) {
111  LOG(ERROR) << "Bad value for --rm_default_cpu_vcores (must be positive): "
112  << FLAGS_rm_default_cpu_vcores;
113  exit(1);
114  }
115  bool is_percent;
116  int64_t mem_bytes =
117  ParseUtil::ParseMemSpec(FLAGS_rm_default_memory, &is_percent, MemInfo::physical_mem());
118  if (mem_bytes <= 1024 * 1024) {
119  LOG(ERROR) << "Bad value for --rm_default_memory (must be larger than 1M):"
120  << FLAGS_rm_default_memory;
121  exit(1);
122  } else if (is_percent) {
123  LOG(ERROR) << "Must use absolute value for --rm_default_memory: "
124  << FLAGS_rm_default_memory;
125  exit(1);
126  }
127  }
128 }
129 
130 SimpleScheduler::SimpleScheduler(const vector<TNetworkAddress>& backends,
131  MetricGroup* metrics, Webserver* webserver, ResourceBroker* resource_broker,
132  RequestPoolService* request_pool_service)
133  : metrics_(metrics),
134  webserver_(webserver),
135  statestore_subscriber_(NULL),
136  thrift_serializer_(false),
137  total_assignments_(NULL),
138  total_local_assignments_(NULL),
139  initialised_(NULL),
140  update_count_(0),
141  resource_broker_(resource_broker),
142  request_pool_service_(request_pool_service) {
143  DCHECK(backends.size() > 0);
144  if (FLAGS_disable_admission_control) LOG(INFO) << "Admission control is disabled.";
145  // request_pool_service_ may be null in unit tests
146  if (request_pool_service_ != NULL && !FLAGS_disable_admission_control) {
147  admission_controller_.reset(
148  new AdmissionController(request_pool_service_, metrics, backend_id_));
149  }
150 
151  for (int i = 0; i < backends.size(); ++i) {
152  vector<string> ipaddrs;
153  Status status = HostnameToIpAddrs(backends[i].hostname, &ipaddrs);
154  if (!status.ok()) {
155  VLOG(1) << "Failed to resolve " << backends[i].hostname << ": "
156  << status.GetDetail();
157  continue;
158  }
159 
160  // Try to find a non-localhost address, otherwise just use the
161  // first IP address returned.
162  string ipaddr = ipaddrs[0];
163  if (!FindFirstNonLocalhost(ipaddrs, &ipaddr)) {
164  VLOG(1) << "Only localhost addresses found for " << backends[i].hostname;
165  }
166 
167  BackendMap::iterator it = backend_map_.find(ipaddr);
168  if (it == backend_map_.end()) {
169  it = backend_map_.insert(
170  make_pair(ipaddr, list<TBackendDescriptor>())).first;
171  backend_ip_map_[backends[i].hostname] = ipaddr;
172  }
173 
174  TBackendDescriptor descriptor;
175  descriptor.address = MakeNetworkAddress(ipaddr, backends[i].port);
176  it->second.push_back(descriptor);
177  }
178  next_nonlocal_backend_entry_ = backend_map_.begin();
179 }
180 
181 Status SimpleScheduler::Init() {
182  LOG(INFO) << "Starting simple scheduler";
183 
184  if (webserver_ != NULL) {
185  Webserver::UrlCallback backends_callback =
186  bind<void>(mem_fn(&SimpleScheduler::BackendsUrlCallback), this, _1, _2);
187  webserver_->RegisterUrlCallback(BACKENDS_WEB_PAGE, BACKENDS_TEMPLATE,
188  backends_callback);
189  }
190 
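 // Register a callback for the Impala membership topic so that this scheduler is
 // notified via the statestore when backends join or leave the cluster.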
191  if (statestore_subscriber_ != NULL) {
192  StatestoreSubscriber::UpdateCallback cb =
193  bind<void>(mem_fn(&SimpleScheduler::UpdateMembership), this, _1, _2);
194  Status status = statestore_subscriber_->AddTopic(IMPALA_MEMBERSHIP_TOPIC, true, cb);
195  if (!status.ok()) {
196  status.AddDetail("SimpleScheduler failed to register membership topic");
197  return status;
198  }
199  if (!FLAGS_disable_admission_control) {
200  RETURN_IF_ERROR(admission_controller_->Init(statestore_subscriber_));
201  }
202  }
203  if (metrics_ != NULL) {
204  total_assignments_ = metrics_->AddCounter(ASSIGNMENTS_KEY, 0L);
205  total_local_assignments_ = metrics_->AddCounter(LOCAL_ASSIGNMENTS_KEY, 0L);
206  initialised_ = metrics_->AddProperty(SCHEDULER_INIT_KEY, true);
207  num_backends_metric_ = metrics_->AddGauge<int64_t>(
208  NUM_BACKENDS_KEY, backend_map_.size());
209  }
210 
211  if (statestore_subscriber_ != NULL) {
212  // Figure out what our IP address is, so that each subscriber
213  // doesn't have to resolve it on every heartbeat.
214  vector<string> ipaddrs;
215  const string& hostname = backend_descriptor_.address.hostname;
216  Status status = HostnameToIpAddrs(hostname, &ipaddrs);
217  if (!status.ok()) {
218  VLOG(1) << "Failed to resolve " << hostname << ": " << status.GetDetail();
219  status.AddDetail("SimpleScheduler failed to start");
220  return status;
221  }
222  // Find a non-localhost address for this host; if one can't be
223  // found use the first address returned by HostnameToIpAddrs
224  string ipaddr = ipaddrs[0];
225  if (!FindFirstNonLocalhost(ipaddrs, &ipaddr)) {
226  VLOG(3) << "Only localhost addresses found for " << hostname;
227  }
228 
229  backend_descriptor_.ip_address = ipaddr;
230  LOG(INFO) << "Simple-scheduler using " << ipaddr << " as IP address";
231 
232  if (webserver_ != NULL) {
233  const TNetworkAddress& webserver_address = webserver_->http_address();
234  if (IsWildcardAddress(webserver_address.hostname)) {
235  backend_descriptor_.__set_debug_http_address(
236  MakeNetworkAddress(ipaddr, webserver_address.port));
237  } else {
238  backend_descriptor_.__set_debug_http_address(webserver_address);
239  }
240  backend_descriptor_.__set_secure_webserver(webserver_->IsSecure());
241  }
242  }
243  return Status::OK;
244 }
245 
246 // Utility method to help sort backends by ascending network address
247 bool TBackendDescriptorComparator(const TBackendDescriptor& a,
248  const TBackendDescriptor& b) {
249  return TNetworkAddressComparator(a.address, b.address);
250 }
251 
252 void SimpleScheduler::BackendsUrlCallback(const Webserver::ArgumentMap& args,
253  Document* document) {
254  BackendList backends;
255  GetAllKnownBackends(&backends);
256  Value backends_list(kArrayType);
257  BOOST_FOREACH(const BackendList::value_type& backend, backends) {
258  Value str(TNetworkAddressToString(backend.address).c_str(), document->GetAllocator());
259  backends_list.PushBack(str, document->GetAllocator());
260  }
261 
262  document->AddMember("backends", backends_list, document->GetAllocator());
263 }
264 
265 void SimpleScheduler::UpdateMembership(
266  const StatestoreSubscriber::TopicDeltaMap& incoming_topic_deltas,
267  vector<TTopicDelta>* subscriber_topic_updates) {
268  ++update_count_;
269  // TODO: Work on a copy if possible, or at least do resolution as a separate step
270  // First look to see if the topic(s) we're interested in have an update
271  StatestoreSubscriber::TopicDeltaMap::const_iterator topic =
272  incoming_topic_deltas.find(IMPALA_MEMBERSHIP_TOPIC);
273 
274  if (topic != incoming_topic_deltas.end()) {
275  const TTopicDelta& delta = topic->second;
276 
277  // This function needs to handle both delta and non-delta updates. For delta
278  // updates, it is desirable to minimize the number of copies to only
279  // the added/removed items. To accomplish this, all updates are processed
280  // under a lock and applied to the shared backend maps (backend_map_ and
281  // backend_ip_map_) in place.
282  {
283  lock_guard<mutex> lock(backend_map_lock_);
284  if (!delta.is_delta) {
285  current_membership_.clear();
286  backend_map_.clear();
287  backend_ip_map_.clear();
288  }
289 
290  // Process new entries to the topic
291  BOOST_FOREACH(const TTopicItem& item, delta.topic_entries) {
292  TBackendDescriptor be_desc;
293  // Benchmarks have suggested that this method can deserialize
294  // ~10m messages per second, so no immediate need to consider optimisation.
295  uint32_t len = item.value.size();
296  Status status = DeserializeThriftMsg(reinterpret_cast<const uint8_t*>(
297  item.value.data()), &len, false, &be_desc);
298  if (!status.ok()) {
299  VLOG(2) << "Error deserializing membership topic item with key: " << item.key;
300  continue;
301  }
302  if (item.key == backend_id_ && be_desc.address != backend_descriptor_.address) {
303  // Someone else has registered this subscriber ID with a
304  // different address. We will try to re-register
305  // (i.e. overwrite their subscription), but there is likely
306  // a configuration problem.
307  LOG_EVERY_N(WARNING, 30) << "Duplicate subscriber registration from address: "
308  << be_desc.address;
309  }
310 
311  list<TBackendDescriptor>* be_descs = &backend_map_[be_desc.ip_address];
312  if (find(be_descs->begin(), be_descs->end(), be_desc) == be_descs->end()) {
313  backend_map_[be_desc.ip_address].push_back(be_desc);
314  }
315  backend_ip_map_[be_desc.address.hostname] = be_desc.ip_address;
316  current_membership_.insert(make_pair(item.key, be_desc));
317  }
318  // Process deletions from the topic
319  BOOST_FOREACH(const string& backend_id, delta.topic_deletions) {
320  if (current_membership_.find(backend_id) != current_membership_.end()) {
321  const TBackendDescriptor& be_desc = current_membership_[backend_id];
322  backend_ip_map_.erase(be_desc.address.hostname);
323  list<TBackendDescriptor>* be_descs = &backend_map_[be_desc.ip_address];
324  be_descs->erase(
325  remove(be_descs->begin(), be_descs->end(), be_desc), be_descs->end());
326  if (be_descs->empty()) backend_map_.erase(be_desc.ip_address);
327  current_membership_.erase(backend_id);
328  }
329  }
330  next_nonlocal_backend_entry_ = backend_map_.begin();
331  }
332 
333  // If this impalad is not in our view of the membership list, we should add it and
334  // tell the statestore.
335  bool is_offline = ExecEnv::GetInstance()->impala_server()->IsOffline();
336  if (!is_offline &&
337  current_membership_.find(backend_id_) == current_membership_.end()) {
338  VLOG(1) << "Registering local backend with statestore";
339  subscriber_topic_updates->push_back(TTopicDelta());
340  TTopicDelta& update = subscriber_topic_updates->back();
341  update.topic_name = IMPALA_MEMBERSHIP_TOPIC;
342  update.topic_entries.push_back(TTopicItem());
343 
344  TTopicItem& item = update.topic_entries.back();
345  item.key = backend_id_;
346  Status status = thrift_serializer_.Serialize(&backend_descriptor_, &item.value);
347  if (!status.ok()) {
348  LOG(WARNING) << "Failed to serialize Impala backend address for statestore topic: "
349  << status.GetDetail();
350  subscriber_topic_updates->pop_back();
351  }
352  } else if (is_offline &&
353  current_membership_.find(backend_id_) != current_membership_.end()) {
354  LOG(WARNING) << "Removing offline ImpalaServer from statestore";
355  subscriber_topic_updates->push_back(TTopicDelta());
356  TTopicDelta& update = subscriber_topic_updates->back();
357  update.topic_name = IMPALA_MEMBERSHIP_TOPIC;
358  update.topic_deletions.push_back(backend_id_);
359  }
360  if (metrics_ != NULL) num_backends_metric_->set_value(current_membership_.size());
361  }
362 }
363 
364 Status SimpleScheduler::GetBackends(
365  const vector<TNetworkAddress>& data_locations, BackendList* backendports) {
366  backendports->clear();
367  for (int i = 0; i < data_locations.size(); ++i) {
368  TBackendDescriptor backend;
369  GetBackend(data_locations[i], &backend);
370  backendports->push_back(backend);
371  }
372  DCHECK_EQ(data_locations.size(), backendports->size());
373  return Status::OK;
374 }
375 
376 Status SimpleScheduler::GetBackend(const TNetworkAddress& data_location,
377  TBackendDescriptor* backend) {
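 // Strategy: prefer a backend collocated with data_location (resolving its hostname
 // to an IP address if necessary); otherwise fall back to a round-robin pick across
 // all known hosts. Within a host, rotate among the impalads registered for it.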
378  lock_guard<mutex> lock(backend_map_lock_);
379  if (backend_map_.size() == 0) {
380  return Status("No backends configured");
381  }
382  bool local_assignment = false;
383  BackendMap::iterator entry = backend_map_.find(data_location.hostname);
384 
385  if (entry == backend_map_.end()) {
386  // backend_map_ maps ip address to backend but
387  // data_location.hostname might be a hostname.
388  // Find the ip address of the data_location from backend_ip_map_.
389  BackendIpAddressMap::const_iterator itr =
390  backend_ip_map_.find(data_location.hostname);
391  if (itr != backend_ip_map_.end()) {
392  entry = backend_map_.find(itr->second);
393  }
394  }
395 
396  if (entry == backend_map_.end()) {
397  // round robin the ipaddress
398  entry = next_nonlocal_backend_entry_;
399  ++next_nonlocal_backend_entry_;
400  if (next_nonlocal_backend_entry_ == backend_map_.end()) {
401  next_nonlocal_backend_entry_ = backend_map_.begin();
402  }
403  } else {
404  local_assignment = true;
405  }
406  DCHECK(!entry->second.empty());
407  // Round-robin between impalads on the same ipaddress.
408  // Pick the first one, then move it to the back of the queue
409  *backend = entry->second.front();
410  entry->second.pop_front();
411  entry->second.push_back(*backend);
412 
413  if (metrics_ != NULL) {
414  total_assignments_->Increment(1);
415  if (local_assignment) {
416  total_local_assignments_->Increment(1L);
417  }
418  }
419 
420  if (VLOG_FILE_IS_ON) {
421  stringstream s;
422  s << "(" << data_location;
423  s << " -> " << backend->address << ")";
424  VLOG_FILE << "SimpleScheduler assignment (data->backend): " << s.str();
425  }
426  return Status::OK;
427 }
428 
429 void SimpleScheduler::GetAllKnownBackends(BackendList* backends) {
430  lock_guard<mutex> lock(backend_map_lock_);
431  backends->clear();
432  BOOST_FOREACH(const BackendMap::value_type& backend_list, backend_map_) {
433  backends->insert(backends->end(), backend_list.second.begin(),
434  backend_list.second.end());
435  }
436 }
437 
438 Status SimpleScheduler::ComputeScanRangeAssignment(const TQueryExecRequest& exec_request,
439  QuerySchedule* schedule) {
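 // Compute a scan range assignment for every scan node in the request. Scan nodes in
 // unpartitioned fragments are executed by the coordinator, so their ranges are all
 // assigned to the coordinator's address.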
440  map<TPlanNodeId, vector<TScanRangeLocations> >::const_iterator entry;
441  for (entry = exec_request.per_node_scan_ranges.begin();
442  entry != exec_request.per_node_scan_ranges.end(); ++entry) {
443  int fragment_idx = schedule->GetFragmentIdx(entry->first);
444  const TPlanFragment& fragment = exec_request.fragments[fragment_idx];
445  bool exec_at_coord = (fragment.partition.type == TPartitionType::UNPARTITIONED);
446 
447  FragmentScanRangeAssignment* assignment =
448  &(*schedule->exec_params())[fragment_idx].scan_range_assignment;
449  RETURN_IF_ERROR(ComputeScanRangeAssignment(
450  entry->first, entry->second, exec_request.host_list, exec_at_coord,
451  schedule->query_options(), assignment));
452  schedule->AddScanRanges(entry->second.size());
453  }
454  return Status::OK;
455 }
456 
457 Status SimpleScheduler::ComputeScanRangeAssignment(
458  PlanNodeId node_id, const vector<TScanRangeLocations>& locations,
459  const vector<TNetworkAddress>& host_list, bool exec_at_coord,
460  const TQueryOptions& query_options, FragmentScanRangeAssignment* assignment) {
461  // If cached reads are enabled, we will always prefer cached replicas over non-cached
462  // replicas. Since it is likely that only one replica is cached, this could generate
463  // hotspots which is why this is controllable by a query option.
464  //
465  // We schedule greedily in this order:
466  // cached collocated replicas > collocated replicas > remote (cached or not) replicas.
467  // The query option to disable cached reads removes the first group.
468  bool schedule_with_caching = !query_options.disable_cached_reads;
469 
470  // map from datanode host to total assigned bytes;
471  // If the data node does not have a collocated impalad, the actual assigned bytes is
472  // "total assigned - numeric_limits<int64_t>::max()".
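 // Hosts without a collocated impalad start at numeric_limits<int64_t>::max(), so any
 // host that does have one (starting at 0) always wins the comparison below; a final
 // minimum at or above that sentinel therefore marks the scan range as a remote read.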
473  unordered_map<TNetworkAddress, uint64_t> assigned_bytes_per_host;
474  unordered_set<TNetworkAddress> remote_hosts;
475  int64_t remote_bytes = 0L;
476  int64_t local_bytes = 0L;
477  int64_t cached_bytes = 0L;
478 
479  BOOST_FOREACH(const TScanRangeLocations& scan_range_locations, locations) {
480  // assign this scan range to the host w/ the fewest assigned bytes
481  uint64_t min_assigned_bytes = numeric_limits<uint64_t>::max();
482  const TNetworkAddress* data_host = NULL; // data server; not necessarily backend
483  int volume_id = -1;
484  bool is_cached = false;
485 
486  // Separate cached replicas from non-cached replicas
487  vector<const TScanRangeLocation*> cached_locations;
488  if (schedule_with_caching) {
489  BOOST_FOREACH(const TScanRangeLocation& location, scan_range_locations.locations) {
490  // Adjust whether or not this replica should count as being cached based on
491  // the query option and whether it is collocated. If the DN is not collocated
492  // treat the replica as not cached (network transfer dominates anyway in this
493  // case).
494  // TODO: measure this in a cluster setup. Are remote reads better with caching?
495  if (location.is_cached && HasLocalBackend(host_list[location.host_idx])) {
496  cached_locations.push_back(&location);
497  }
498  }
499  }
500  // If no replicas are cached, pick a replica based on assigned bytes
501  if (cached_locations.size() == 0) {
502  BOOST_FOREACH(const TScanRangeLocation& location, scan_range_locations.locations) {
503  DCHECK_LT(location.host_idx, host_list.size());
504  const TNetworkAddress& replica_host = host_list[location.host_idx];
505  // Deprioritize non-collocated datanodes by assigning a very high initial bytes
506  uint64_t initial_bytes =
507  HasLocalBackend(replica_host) ? 0L : numeric_limits<int64_t>::max();
508  uint64_t* assigned_bytes =
509  FindOrInsert(&assigned_bytes_per_host, replica_host, initial_bytes);
510  // Update the assignment if this is a less busy host.
511  if (*assigned_bytes < min_assigned_bytes) {
512  min_assigned_bytes = *assigned_bytes;
513  data_host = &replica_host;
514  volume_id = location.volume_id;
515  is_cached = false;
516  }
517  }
518  } else {
519  // Randomly pick a cached host based on the extracted list of cached local hosts
520  size_t rand_host = rand() % cached_locations.size();
521  const TNetworkAddress& replica_host = host_list[cached_locations[rand_host]->host_idx];
522  uint64_t initial_bytes = 0L;
523  min_assigned_bytes = *FindOrInsert(&assigned_bytes_per_host, replica_host, initial_bytes);
524  data_host = &replica_host;
525  volume_id = cached_locations[rand_host]->volume_id;
526  is_cached = true;
527  }
528 
529  int64_t scan_range_length = 0;
530  if (scan_range_locations.scan_range.__isset.hdfs_file_split) {
531  scan_range_length = scan_range_locations.scan_range.hdfs_file_split.length;
532  }
533  bool remote_read = min_assigned_bytes >= numeric_limits<int64_t>::max();
534  if (remote_read) {
535  remote_bytes += scan_range_length;
536  remote_hosts.insert(*data_host);
537  } else {
538  local_bytes += scan_range_length;
539  if (is_cached) cached_bytes += scan_range_length;
540  }
541  assigned_bytes_per_host[*data_host] += scan_range_length;
542 
543  // translate data host to backend host
544  DCHECK(data_host != NULL);
545 
546  TNetworkAddress exec_hostport;
547  if (!exec_at_coord) {
548  TBackendDescriptor backend;
549  RETURN_IF_ERROR(GetBackend(*data_host, &backend));
550  exec_hostport = backend.address;
551  } else {
552  exec_hostport = MakeNetworkAddress(FLAGS_hostname, FLAGS_be_port);
553  }
554 
555  PerNodeScanRanges* scan_ranges =
556  FindOrInsert(assignment, exec_hostport, PerNodeScanRanges());
557  vector<TScanRangeParams>* scan_range_params_list =
558  FindOrInsert(scan_ranges, node_id, vector<TScanRangeParams>());
559  // add scan range
560  TScanRangeParams scan_range_params;
561  scan_range_params.scan_range = scan_range_locations.scan_range;
562  // Explicitly set the optional fields.
563  scan_range_params.__set_volume_id(volume_id);
564  scan_range_params.__set_is_cached(is_cached);
565  scan_range_params.__set_is_remote(remote_read);
566  scan_range_params_list->push_back(scan_range_params);
567  }
568 
569  if (VLOG_FILE_IS_ON) {
570  VLOG_FILE << "Total remote scan volume = " <<
571  PrettyPrinter::Print(remote_bytes, TUnit::BYTES);
572  VLOG_FILE << "Total local scan volume = " <<
573  PrettyPrinter::Print(local_bytes, TUnit::BYTES);
574  VLOG_FILE << "Total cached scan volume = " <<
575  PrettyPrinter::Print(cached_bytes, TUnit::BYTES);
576  if (remote_hosts.size() > 0) {
577  stringstream remote_node_log;
578  remote_node_log << "Remote data node list: ";
579  BOOST_FOREACH(const TNetworkAddress& remote_host, remote_hosts) {
580  remote_node_log << remote_host << " ";
581  }
 VLOG_FILE << remote_node_log.str();
582  }
583 
584  BOOST_FOREACH(FragmentScanRangeAssignment::value_type& entry, *assignment) {
585  VLOG_FILE << "ScanRangeAssignment: server=" << ThriftDebugString(entry.first);
586  BOOST_FOREACH(PerNodeScanRanges::value_type& per_node_scan_ranges, entry.second) {
587  stringstream str;
588  BOOST_FOREACH(TScanRangeParams& params, per_node_scan_ranges.second) {
589  str << ThriftDebugString(params) << " ";
590  }
591  VLOG_FILE << "node_id=" << per_node_scan_ranges.first << " ranges=" << str.str();
592  }
593  }
594  }
595 
596  return Status::OK;
597 }
598 
599 void SimpleScheduler::ComputeFragmentExecParams(const TQueryExecRequest& exec_request,
600  QuerySchedule* schedule) {
601  vector<FragmentExecParams>* fragment_exec_params = schedule->exec_params();
602  // assign instance ids
603  int64_t num_backends = 0;
604  BOOST_FOREACH(FragmentExecParams& params, *fragment_exec_params) {
605  for (int j = 0; j < params.hosts.size(); ++j) {
606  int instance_num = num_backends + j;
607  // we add instance_num to query_id.lo to create a globally-unique instance id
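 // (for example, with a hypothetical query_id.lo of 100 and three instances in total,
 // instance_num 0, 1 and 2 yield instance ids 101, 102 and 103)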
608  TUniqueId instance_id;
609  instance_id.hi = schedule->query_id().hi;
610  DCHECK_LT(
611  schedule->query_id().lo, numeric_limits<int64_t>::max() - instance_num - 1);
612  instance_id.lo = schedule->query_id().lo + instance_num + 1;
613  params.instance_ids.push_back(instance_id);
614  }
615  num_backends += params.hosts.size();
616  }
617  if (exec_request.fragments[0].partition.type == TPartitionType::UNPARTITIONED) {
618  // the root fragment is executed directly by the coordinator
619  --num_backends;
620  }
621  schedule->set_num_backends(num_backends);
622 
623  // compute destinations and # senders per exchange node
624  // (the root fragment doesn't have a destination)
625  for (int i = 1; i < fragment_exec_params->size(); ++i) {
626  FragmentExecParams& params = (*fragment_exec_params)[i];
627  int dest_fragment_idx = exec_request.dest_fragment_idx[i - 1];
628  DCHECK_LT(dest_fragment_idx, fragment_exec_params->size());
629  FragmentExecParams& dest_params = (*fragment_exec_params)[dest_fragment_idx];
630 
631  // set # of senders
632  DCHECK(exec_request.fragments[i].output_sink.__isset.stream_sink);
633  const TDataStreamSink& sink = exec_request.fragments[i].output_sink.stream_sink;
634  // we can only handle unpartitioned (= broadcast), random-partitioned or
635  // hash-partitioned output at the moment
636  DCHECK(sink.output_partition.type == TPartitionType::UNPARTITIONED
637  || sink.output_partition.type == TPartitionType::HASH_PARTITIONED
638  || sink.output_partition.type == TPartitionType::RANDOM);
639  PlanNodeId exch_id = sink.dest_node_id;
640  // we might have multiple fragments sending to this exchange node
641  // (distributed MERGE), which is why we need to add up the #senders
642  params.sender_id_base = dest_params.per_exch_num_senders[exch_id];
643  dest_params.per_exch_num_senders[exch_id] += params.hosts.size();
644 
645  // create one TPlanFragmentDestination per destination host
646  params.destinations.resize(dest_params.hosts.size());
647  for (int j = 0; j < dest_params.hosts.size(); ++j) {
648  TPlanFragmentDestination& dest = params.destinations[j];
649  dest.fragment_instance_id = dest_params.instance_ids[j];
650  dest.server = dest_params.hosts[j];
651  VLOG_RPC << "dest for fragment " << i << ":"
652  << " instance_id=" << dest.fragment_instance_id
653  << " server=" << dest.server;
654  }
655  }
656 }
657 
658 void SimpleScheduler::ComputeFragmentHosts(const TQueryExecRequest& exec_request,
659  QuerySchedule* schedule) {
660  vector<FragmentExecParams>* fragment_exec_params = schedule->exec_params();
661  TNetworkAddress coord = MakeNetworkAddress(FLAGS_hostname, FLAGS_be_port);
662  DCHECK_EQ(fragment_exec_params->size(), exec_request.fragments.size());
663  vector<TPlanNodeType::type> scan_node_types;
664  scan_node_types.push_back(TPlanNodeType::HDFS_SCAN_NODE);
665  scan_node_types.push_back(TPlanNodeType::HBASE_SCAN_NODE);
666  scan_node_types.push_back(TPlanNodeType::DATA_SOURCE_NODE);
667 
668  // compute hosts of producer fragment before those of consumer fragment(s),
669  // the latter might inherit the set of hosts from the former
670  for (int i = exec_request.fragments.size() - 1; i >= 0; --i) {
671  const TPlanFragment& fragment = exec_request.fragments[i];
672  FragmentExecParams& params = (*fragment_exec_params)[i];
673  if (fragment.partition.type == TPartitionType::UNPARTITIONED) {
674  // all single-node fragments run on the coordinator host
675  params.hosts.push_back(coord);
676  continue;
677  }
678 
679  // UnionNodes are special because they can consume multiple partitioned inputs,
680  // as well as execute multiple scans in the same fragment.
681  // Fragments containing a UnionNode are executed on the union of hosts of all
682  // scans in the fragment as well as the hosts of all its input fragments (s.t.
683  // a UnionNode with partitioned joins or grouping aggregates as children runs on
684  // at least as many hosts as the input to those children).
685  if (ContainsNode(fragment.plan, TPlanNodeType::UNION_NODE)) {
686  vector<TPlanNodeId> scan_nodes;
687  FindNodes(fragment.plan, scan_node_types, &scan_nodes);
688  vector<TPlanNodeId> exch_nodes;
689  FindNodes(fragment.plan,
690  vector<TPlanNodeType::type>(1, TPlanNodeType::EXCHANGE_NODE),
691  &exch_nodes);
692 
693  // Add hosts of scan nodes.
694  vector<TNetworkAddress> scan_hosts;
695  for (int j = 0; j < scan_nodes.size(); ++j) {
696  GetScanHosts(scan_nodes[j], exec_request, params, &scan_hosts);
697  }
698  unordered_set<TNetworkAddress> hosts(scan_hosts.begin(), scan_hosts.end());
699 
700  // Add hosts of input fragments.
701  for (int j = 0; j < exch_nodes.size(); ++j) {
702  int input_fragment_idx = FindSenderFragment(exch_nodes[j], i, exec_request);
703  const vector<TNetworkAddress>& input_fragment_hosts =
704  (*fragment_exec_params)[input_fragment_idx].hosts;
705  hosts.insert(input_fragment_hosts.begin(), input_fragment_hosts.end());
706  }
707  DCHECK(!hosts.empty()) << "no hosts for fragment " << i << " with a UnionNode";
708 
709  params.hosts.assign(hosts.begin(), hosts.end());
710  continue;
711  }
712 
713  PlanNodeId leftmost_scan_id = FindLeftmostNode(fragment.plan, scan_node_types);
714  if (leftmost_scan_id == g_ImpalaInternalService_constants.INVALID_PLAN_NODE_ID) {
715  // there is no leftmost scan; we assign the same hosts as those of our
716  // leftmost input fragment (so that a partitioned aggregation fragment
717  // runs on the hosts that provide the input data)
718  int input_fragment_idx = FindLeftmostInputFragment(i, exec_request);
719  DCHECK_GE(input_fragment_idx, 0);
720  DCHECK_LT(input_fragment_idx, fragment_exec_params->size());
721  params.hosts = (*fragment_exec_params)[input_fragment_idx].hosts;
722  // TODO: switch to unpartitioned/coord execution if our input fragment
723  // is executed that way (could have been downgraded from distributed)
724  continue;
725  }
726 
727  // This fragment is executed on those hosts that have scan ranges
728  // for the leftmost scan.
729  GetScanHosts(leftmost_scan_id, exec_request, params, &params.hosts);
730  }
731 
732  unordered_set<TNetworkAddress> unique_hosts;
733  BOOST_FOREACH(const FragmentExecParams& exec_params, *fragment_exec_params) {
734  unique_hosts.insert(exec_params.hosts.begin(), exec_params.hosts.end());
735  }
736 
737  schedule->SetUniqueHosts(unique_hosts);
738 }
739 
740 PlanNodeId SimpleScheduler::FindLeftmostNode(
741  const TPlan& plan, const vector<TPlanNodeType::type>& types) {
742  // the first node with num_children == 0 is the leftmost node
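 // (plan.nodes is expected to hold the plan tree in depth-first, pre-order order, so
 // the first leaf found while scanning forward is the tree's leftmost leaf)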
743  int node_idx = 0;
744  while (node_idx < plan.nodes.size() && plan.nodes[node_idx].num_children != 0) {
745  ++node_idx;
746  }
747  if (node_idx == plan.nodes.size()) {
748  return g_ImpalaInternalService_constants.INVALID_PLAN_NODE_ID;
749  }
750  const TPlanNode& node = plan.nodes[node_idx];
751 
752  for (int i = 0; i < types.size(); ++i) {
753  if (node.node_type == types[i]) return node.node_id;
754  }
755  return g_ImpalaInternalService_constants.INVALID_PLAN_NODE_ID;
756 }
757 
758 bool SimpleScheduler::ContainsNode(const TPlan& plan, TPlanNodeType::type type) {
759  for (int i = 0; i < plan.nodes.size(); ++i) {
760  if (plan.nodes[i].node_type == type) return true;
761  }
762  return false;
763 }
764 
765 void SimpleScheduler::FindNodes(const TPlan& plan,
766  const vector<TPlanNodeType::type>& types, vector<TPlanNodeId>* results) {
767  for (int i = 0; i < plan.nodes.size(); ++i) {
768  for (int j = 0; j < types.size(); ++j) {
769  if (plan.nodes[i].node_type == types[j]) {
770  results->push_back(plan.nodes[i].node_id);
771  break;
772  }
773  }
774  }
775 }
776 
777 void SimpleScheduler::GetScanHosts(TPlanNodeId scan_id,
778  const TQueryExecRequest& exec_request, const FragmentExecParams& params,
779  vector<TNetworkAddress>* scan_hosts) {
780  map<TPlanNodeId, vector<TScanRangeLocations> >::const_iterator entry =
781  exec_request.per_node_scan_ranges.find(scan_id);
782  if (entry == exec_request.per_node_scan_ranges.end() || entry->second.empty()) {
783  // this scan node doesn't have any scan ranges; run it on the coordinator
784  // TODO: we'll need to revisit this strategy once we can partition joins
785  // (in which case this fragment might be executing a right outer join
786  // with a large build table)
787  scan_hosts->push_back(MakeNetworkAddress(FLAGS_hostname, FLAGS_be_port));
788  return;
789  }
790 
791  // Get the list of impalad hosts from the fragment's scan range assignment
792  BOOST_FOREACH(const FragmentScanRangeAssignment::value_type& scan_range_assignment,
793  params.scan_range_assignment) {
794  scan_hosts->push_back(scan_range_assignment.first);
795  }
796 }
797 
798 int SimpleScheduler::FindLeftmostInputFragment(
799  int fragment_idx, const TQueryExecRequest& exec_request) {
800  // find the leftmost node, which we expect to be an exchange node
801  vector<TPlanNodeType::type> exch_node_type;
802  exch_node_type.push_back(TPlanNodeType::EXCHANGE_NODE);
803  PlanNodeId exch_id =
804  FindLeftmostNode(exec_request.fragments[fragment_idx].plan, exch_node_type);
805  if (exch_id == g_ImpalaInternalService_constants.INVALID_PLAN_NODE_ID) {
806  return g_ImpalaInternalService_constants.INVALID_PLAN_NODE_ID;
807  }
808  // find the fragment that sends to this exchange node
809  return FindSenderFragment(exch_id, fragment_idx, exec_request);
810 }
811 
812 int SimpleScheduler::FindSenderFragment(TPlanNodeId exch_id, int fragment_idx,
813  const TQueryExecRequest& exec_request) {
814  for (int i = 0; i < exec_request.dest_fragment_idx.size(); ++i) {
815  if (exec_request.dest_fragment_idx[i] != fragment_idx) continue;
816  const TPlanFragment& input_fragment = exec_request.fragments[i + 1];
817  DCHECK(input_fragment.__isset.output_sink);
818  DCHECK(input_fragment.output_sink.__isset.stream_sink);
819  if (input_fragment.output_sink.stream_sink.dest_node_id == exch_id) return i + 1;
820  }
821  // this shouldn't happen
822  DCHECK(false) << "no fragment sends to exch id " << exch_id;
823  return g_ImpalaInternalService_constants.INVALID_PLAN_NODE_ID;
824 }
825 
826 Status SimpleScheduler::GetRequestPool(const string& user,
827  const TQueryOptions& query_options, string* pool) const {
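 // Ask the RequestPoolService to resolve the (requested pool, user) pair to an actual
 // pool and to verify that the user may access it; failures are translated into
 // user-facing error messages.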
828  TResolveRequestPoolResult resolve_pool_result;
829  const string& configured_pool = query_options.request_pool;
830  RETURN_IF_ERROR(request_pool_service_->ResolveRequestPool(configured_pool, user,
831  &resolve_pool_result));
832  if (resolve_pool_result.status.status_code != TErrorCode::OK) {
833  return Status(join(resolve_pool_result.status.error_msgs, "; "));
834  }
835  if (resolve_pool_result.resolved_pool.empty()) {
836  return Status(Substitute(ERROR_USER_TO_POOL_MAPPING_NOT_FOUND, user,
837  configured_pool));
838  }
839  if (!resolve_pool_result.has_access) {
840  return Status(Substitute(ERROR_USER_NOT_ALLOWED_IN_POOL, user,
841  configured_pool, resolve_pool_result.resolved_pool));
842  }
843  *pool = resolve_pool_result.resolved_pool;
844  return Status::OK;
845 }
846 
847 Status SimpleScheduler::Schedule(Coordinator* coord, QuerySchedule* schedule) {
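 // Scheduling proceeds in stages: resolve the effective user (falling back to
 // 'default') and the request pool, run admission control unless it is disabled,
 // check that this server is not offline, compute the scan range assignment and the
 // per-fragment hosts and exec params, and finally, if resource management is
 // enabled, reserve resources from the resource broker.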
848  if (schedule->effective_user().empty()) {
849  if (FLAGS_require_username) return Status(ERROR_USER_NOT_SPECIFIED);
850  // Fall back to a 'default' user if not set so that queries can still run.
851  VLOG(2) << "No user specified: using user=default";
852  }
853  const string& user =
854  schedule->effective_user().empty() ? DEFAULT_USER : schedule->effective_user();
855  VLOG(3) << "user='" << user << "'";
856  string pool;
857  RETURN_IF_ERROR(GetRequestPool(user, schedule->query_options(), &pool));
858  schedule->set_request_pool(pool);
859  // Statestore topic may not have been updated yet if this is soon after startup, but
860  // there is always at least this backend.
861  schedule->set_num_hosts(max(num_backends_metric_->value(), 1L));
862 
863  if (!FLAGS_disable_admission_control) {
864  RETURN_IF_ERROR(admission_controller_->AdmitQuery(schedule));
865  }
866  if (ExecEnv::GetInstance()->impala_server()->IsOffline()) {
867  return Status("This Impala server is offline. Please retry your query later.");
868  }
869 
870  RETURN_IF_ERROR(ComputeScanRangeAssignment(schedule->request(), schedule));
871  ComputeFragmentHosts(schedule->request(), schedule);
872  ComputeFragmentExecParams(schedule->request(), schedule);
873  if (!FLAGS_enable_rm) return Status::OK;
874  schedule->PrepareReservationRequest(pool, user);
875  const TResourceBrokerReservationRequest& reservation_request =
876  schedule->reservation_request();
877  if (!reservation_request.resources.empty()) {
878  Status status = resource_broker_->Reserve(
879  reservation_request, schedule->reservation());
880  if (!status.ok()) {
881  // Warn about missing table and/or column stats if necessary.
882  const TQueryCtx& query_ctx = schedule->request().query_ctx;
883  if (!query_ctx.__isset.parent_query_id &&
884  query_ctx.__isset.tables_missing_stats &&
885  !query_ctx.tables_missing_stats.empty()) {
886  status.AddDetail(GetTablesMissingStatsWarning(query_ctx.tables_missing_stats));
887  }
888  return status;
889  }
890  }
891  AddToActiveResourceMaps(*schedule->reservation(), coord);
892  }
893  return Status::OK;
894 }
895 
896 Status SimpleScheduler::Release(QuerySchedule* schedule) {
897  if (!FLAGS_disable_admission_control) {
898  RETURN_IF_ERROR(admission_controller_->ReleaseQuery(schedule));
899  }
900  if (FLAGS_enable_rm && schedule->NeedsRelease()) {
901  DCHECK(resource_broker_ != NULL);
902  TResourceBrokerReleaseRequest request;
903  TResourceBrokerReleaseResponse response;
904  request.reservation_id = schedule->reservation()->reservation_id;
905  resource_broker_->Release(request, &response);
906  // Remove the reservation from the active-resource maps even if there was an error
907  // releasing the reservation because the query running in the reservation is done.
908  RemoveFromActiveResourceMaps(*schedule->reservation());
909  if (response.status.status_code != TErrorCode::OK) {
910  return Status(join(response.status.error_msgs, ", "));
911  }
912  }
913  return Status::OK;
914 }
915 
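 // The active-resource maps record which coordinator owns each reservation and client
 // resource, so that preemption and loss notifications from the resource broker
 // (Llama) can be routed back to, and cancel, the affected query.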
916 void SimpleScheduler::AddToActiveResourceMaps(
917  const TResourceBrokerReservationResponse& reservation, Coordinator* coord) {
918  lock_guard<mutex> l(active_resources_lock_);
919  active_reservations_[reservation.reservation_id] = coord;
920  map<TNetworkAddress, llama::TAllocatedResource>::const_iterator iter;
921  for (iter = reservation.allocated_resources.begin();
922  iter != reservation.allocated_resources.end();
923  ++iter) {
924  TUniqueId client_resource_id;
925  client_resource_id << iter->second.client_resource_id;
926  active_client_resources_[client_resource_id] = coord;
927  }
928 }
929 
930 void SimpleScheduler::RemoveFromActiveResourceMaps(
931  const TResourceBrokerReservationResponse& reservation) {
932  lock_guard<mutex> l(active_resources_lock_);
933  active_reservations_.erase(reservation.reservation_id);
934  map<TNetworkAddress, llama::TAllocatedResource>::const_iterator iter;
935  for (iter = reservation.allocated_resources.begin();
936  iter != reservation.allocated_resources.end();
937  ++iter) {
938  TUniqueId client_resource_id;
939  client_resource_id << iter->second.client_resource_id;
940  active_client_resources_.erase(client_resource_id);
941  }
942 }
943 
944 void SimpleScheduler::HandlePreemptedReservation(const TUniqueId& reservation_id) {
945  Coordinator* coord = NULL;
946  {
947  lock_guard<mutex> l(active_resources_lock_);
948  ActiveReservationsMap::iterator it = active_reservations_.find(reservation_id);
949  if (it != active_reservations_.end()) coord = it->second;
950  }
951  if (coord == NULL) {
952  LOG(WARNING) << "Ignoring preempted reservation id " << reservation_id
953  << " because no active query using it was found.";
954  } else {
955  stringstream err_msg;
956  err_msg << "Reservation " << reservation_id << " was preempted";
957  Status status(err_msg.str());
958  coord->Cancel(&status);
959  }
960 }
961 
962 void SimpleScheduler::HandlePreemptedResource(const TUniqueId& client_resource_id) {
963  Coordinator* coord = NULL;
964  {
965  lock_guard<mutex> l(active_resources_lock_);
966  ActiveClientResourcesMap::iterator it =
967  active_client_resources_.find(client_resource_id);
968  if (it != active_client_resources_.end()) coord = it->second;
969  }
970  if (coord == NULL) {
971  LOG(WARNING) << "Ignoring preempted client resource id " << client_resource_id
972  << " because no active query using it was found.";
973  } else {
974  stringstream err_msg;
975  err_msg << "Resource " << client_resource_id << " was preempted";
976  Status status(err_msg.str());
977  coord->Cancel(&status);
978  }
979 }
980 
981 void SimpleScheduler::HandleLostResource(const TUniqueId& client_resource_id) {
982  Coordinator* coord = NULL;
983  {
984  lock_guard<mutex> l(active_resources_lock_);
985  ActiveClientResourcesMap::iterator it =
986  active_client_resources_.find(client_resource_id);
987  if (it != active_client_resources_.end()) coord = it->second;
988  }
989  if (coord == NULL) {
990  LOG(WARNING) << "Ignoring lost client resource id " << client_resource_id
991  << " because no active query using it was found.";
992  } else {
993  stringstream err_msg;
994  err_msg << "Resource " << client_resource_id << " was lost";
995  Status status(err_msg.str());
996  coord->Cancel(&status);
997  }
998 }
999 
1000 }