Impala
Impala is the open source, native analytic database for Apache Hadoop.
HBaseScanNode.java
// Copyright 2012 Cloudera Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.cloudera.impala.planner;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.cloudera.impala.analysis.Analyzer;
import com.cloudera.impala.analysis.BinaryPredicate;
import com.cloudera.impala.analysis.Expr;
import com.cloudera.impala.analysis.SlotDescriptor;
import com.cloudera.impala.analysis.StringLiteral;
import com.cloudera.impala.analysis.TupleDescriptor;
import com.cloudera.impala.catalog.HBaseColumn;
import com.cloudera.impala.catalog.HBaseTable;
import com.cloudera.impala.catalog.PrimitiveType;
import com.cloudera.impala.catalog.Type;
import com.cloudera.impala.common.InternalException;
import com.cloudera.impala.common.Pair;
import com.cloudera.impala.service.FeSupport;
import com.cloudera.impala.thrift.TColumnValue;
import com.cloudera.impala.thrift.TExplainLevel;
import com.cloudera.impala.thrift.THBaseFilter;
import com.cloudera.impala.thrift.THBaseKeyRange;
import com.cloudera.impala.thrift.THBaseScanNode;
import com.cloudera.impala.thrift.TNetworkAddress;
import com.cloudera.impala.thrift.TPlanNode;
import com.cloudera.impala.thrift.TPlanNodeType;
import com.cloudera.impala.thrift.TQueryOptions;
import com.cloudera.impala.thrift.TScanRange;
import com.cloudera.impala.thrift.TScanRangeLocation;
import com.cloudera.impala.thrift.TScanRangeLocations;
import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;

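/**
 * Full scan of an HBase table.
 * Only column families/qualifiers mapped to slots of the tuple descriptor are
 * retrieved by the backend.
 */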
public class HBaseScanNode extends ScanNode {
  private final static Logger LOG = LoggerFactory.getLogger(HBaseScanNode.class);
  private final TupleDescriptor desc_;

  // One range per clustering column. The range bounds are expected to be constants.
  // A null entry means there's no range restriction for that particular key.
  // If keyRanges_ is non-null, it always contains as many entries as there are
  // clustering columns.
  private List<ValueRange> keyRanges_;

  // Derived from keyRanges_; empty means unbounded.
  // startKey_/stopKey_ are initialized to be unbounded.
  private byte[] startKey_ = HConstants.EMPTY_START_ROW;
  private byte[] stopKey_ = HConstants.EMPTY_END_ROW;

  // True if this scan node is not going to scan anything. If the row key filter
  // evaluates to null, or if the lower bound > upper bound, then this scan node
  // won't scan at all.
  private boolean isEmpty_ = false;

  // List of HBase filters for generating the thrift message. Filled in finalize().
  private final List<THBaseFilter> filters_ = new ArrayList<THBaseFilter>();

  // The suggested value for "hbase.client.scan.setCaching", which batches
  // suggestedCaching_ rows per fetch request to the HBase region server. If the
  // value is too high, the region server will have a hard time (GC pressure and
  // long response times). If the value is too small, there will be extra round
  // trips to the region server.
  // Defaults to 1024; updated from the row size estimate so that each batch
  // won't exceed MAX_HBASE_FETCH_BATCH_SIZE bytes.
  private final static int MAX_HBASE_FETCH_BATCH_SIZE = 500 * 1024 * 1024;
  private final static int DEFAULT_SUGGESTED_CACHING = 1024;
  private int suggestedCaching_ = DEFAULT_SUGGESTED_CACHING;

  // HBase config; common across all object instances.
  private static Configuration hbaseConf_ = HBaseConfiguration.create();

  public HBaseScanNode(PlanNodeId id, TupleDescriptor desc) {
    super(id, desc, "SCAN HBASE");
    desc_ = desc;
  }

  public void setKeyRanges(List<ValueRange> keyRanges) {
    Preconditions.checkNotNull(keyRanges);
    keyRanges_ = keyRanges;
  }

  @Override
  public void init(Analyzer analyzer) throws InternalException {
    assignConjuncts(analyzer);
    setStartStopKey(analyzer);
    // Convert predicates to HBase filters_.
    createHBaseFilters(analyzer);

    // Materialize slots in remaining conjuncts_.
    analyzer.materializeSlots(conjuncts_);
    computeMemLayout(analyzer);
    computeScanRangeLocations(analyzer);

    // Call computeStats() after materializing slots and computing the mem layout.
    computeStats(analyzer);
  }

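  /**
   * Converts keyRanges_ to startKey_ and stopKey_ by evaluating the constant
   * range bounds. Analysis guarantees the bounds are constant string exprs. If a
   * bound evaluates to NULL, the scan is marked empty, since an HBase row key
   * cannot be NULL. A null ValueRange means there is no predicate on the row key.
   */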
  private void setStartStopKey(Analyzer analyzer) throws InternalException {
    Preconditions.checkNotNull(keyRanges_);
    Preconditions.checkState(keyRanges_.size() == 1);

    ValueRange rowRange = keyRanges_.get(0);
    if (rowRange != null) {
      if (rowRange.getLowerBound() != null) {
        Preconditions.checkState(rowRange.getLowerBound().isConstant());
        Preconditions.checkState(
            rowRange.getLowerBound().getType().equals(Type.STRING));
        TColumnValue val = FeSupport.EvalConstExpr(rowRange.getLowerBound(),
            analyzer.getQueryCtx());
        if (!val.isSetString_val()) {
          // Lower bound is null.
          isEmpty_ = true;
          return;
        } else {
          startKey_ = convertToBytes(val.getString_val(),
              !rowRange.getLowerBoundInclusive());
        }
      }
      if (rowRange.getUpperBound() != null) {
        Preconditions.checkState(rowRange.getUpperBound().isConstant());
        Preconditions.checkState(
            rowRange.getUpperBound().getType().equals(Type.STRING));
        TColumnValue val = FeSupport.EvalConstExpr(rowRange.getUpperBound(),
            analyzer.getQueryCtx());
        if (!val.isSetString_val()) {
          // Upper bound is null.
          isEmpty_ = true;
          return;
        } else {
          stopKey_ = convertToBytes(val.getString_val(),
              rowRange.getUpperBoundInclusive());
        }
      }
    }

    boolean endKeyIsEndOfTable = Bytes.equals(stopKey_, HConstants.EMPTY_END_ROW);
    if ((Bytes.compareTo(startKey_, stopKey_) > 0) && !endKeyIsEndOfTable) {
      // Lower bound is greater than upper bound.
      isEmpty_ = true;
    }
  }

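  /**
   * Sets cardinality_ based on the row-key range and, for non-trivial ranges,
   * derives suggestedCaching_ from the estimated row size.
   */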
  @Override
  public void computeStats(Analyzer analyzer) {
    super.computeStats(analyzer);
    HBaseTable tbl = (HBaseTable) desc_.getTable();

    ValueRange rowRange = keyRanges_.get(0);
    if (isEmpty_) {
      cardinality_ = 0;
    } else if (rowRange != null && rowRange.isEqRange()) {
      cardinality_ = 1;
    } else {
      // Set suggestedCaching_ so that each fetch from hbase won't return a batch
      // of more than MAX_HBASE_FETCH_BATCH_SIZE bytes.
      Pair<Long, Long> estimate = tbl.getEstimatedRowStats(startKey_, stopKey_);
      cardinality_ = estimate.first.longValue();
      if (estimate.second.longValue() > 0) {
        suggestedCaching_ = (int)
            Math.max(MAX_HBASE_FETCH_BATCH_SIZE / estimate.second.longValue(), 1);
      }
    }

    cardinality_ = Math.max(0, cardinality_);
    cardinality_ = capAtLimit(cardinality_);
    LOG.debug("computeStats HbaseScan: cardinality=" + Long.toString(cardinality_));

    // TODO: take actual regions into account
    numNodes_ = desc_.getTable().getNumNodes();
    LOG.debug("computeStats HbaseScan: #nodes=" + Integer.toString(numNodes_));
  }

  @Override
  protected String debugString() {
    HBaseTable tbl = (HBaseTable) desc_.getTable();
    return Objects.toStringHelper(this)
        .add("tid", desc_.getId().asInt())
        .add("hiveTblName", tbl.getFullName())
        .add("hbaseTblName", tbl.getHBaseTableName())
        .add("startKey", ByteBuffer.wrap(startKey_).toString())
        .add("stopKey", ByteBuffer.wrap(stopKey_).toString())
        .add("isEmpty", isEmpty_)
        .addValue(super.debugString())
        .toString();
  }

  // We convert predicates of the form <slotref> op <constant>, where slotref is
  // of type string, to HBase filters. All these predicates are also evaluated at
  // the HBaseScanNode. To properly filter out NULL values, HBaseScanNode treats
  // all predicates as disjunctive, thereby requiring re-evaluation when there are
  // multiple attributes. We explicitly materialize the referenced slots; otherwise
  // our hbase scans don't return correct data.
  // TODO: expand this to generate nested filter lists for arbitrary conjunctions
  // and disjunctions.
  private void createHBaseFilters(Analyzer analyzer) {
    for (Expr e: conjuncts_) {
      // We only consider binary predicates.
      if (!(e instanceof BinaryPredicate)) continue;
      BinaryPredicate bp = (BinaryPredicate) e;
      CompareFilter.CompareOp hbaseOp = impalaOpToHBaseOp(bp.getOp());
      // Ignore unsupported ops.
      if (hbaseOp == null) continue;

      for (SlotDescriptor slot: desc_.getSlots()) {
        // Only push down predicates on string columns.
        if (slot.getType().getPrimitiveType() != PrimitiveType.STRING) continue;

        Expr bindingExpr = bp.getSlotBinding(slot.getId());
        if (bindingExpr == null || !(bindingExpr instanceof StringLiteral)) continue;

        StringLiteral literal = (StringLiteral) bindingExpr;
        HBaseColumn col = (HBaseColumn) slot.getColumn();
        filters_.add(new THBaseFilter(
            col.getColumnFamily(), col.getColumnQualifier(),
            (byte) hbaseOp.ordinal(), literal.getValue()));
        analyzer.materializeSlots(Lists.newArrayList(e));
      }
    }
  }

  @Override
  protected void toThrift(TPlanNode msg) {
    msg.node_type = TPlanNodeType.HBASE_SCAN_NODE;
    HBaseTable tbl = (HBaseTable) desc_.getTable();
    msg.hbase_scan_node =
        new THBaseScanNode(desc_.getId().asInt(), tbl.getHBaseTableName());
    if (!filters_.isEmpty()) {
      msg.hbase_scan_node.setFilters(filters_);
    }
    msg.hbase_scan_node.setSuggested_max_caching(suggestedCaching_);
  }

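  /**
   * Computes scan ranges covering [startKey_, stopKey_), grouping adjacent
   * regions hosted on the same region server into a single key range so the
   * backend re-initializes as few result scanners as possible.
   */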
  private void computeScanRangeLocations(Analyzer analyzer) {
    scanRanges_ = Lists.newArrayList();

    // For an empty scan node, return an empty list.
    if (isEmpty_) return;

    // Retrieve the relevant HBase regions and their region servers.
    HBaseTable tbl = (HBaseTable) desc_.getTable();
    HTable hbaseTbl = null;
    List<HRegionLocation> regionsLoc;
    try {
      hbaseTbl = new HTable(hbaseConf_, tbl.getHBaseTableName());
      regionsLoc = HBaseTable.getRegionsInRange(hbaseTbl, startKey_, stopKey_);
    } catch (IOException e) {
      throw new RuntimeException(
          "couldn't retrieve HBase table (" + tbl.getHBaseTableName() + ") info:\n"
          + e.getMessage());
    }

    // Convert the list of HRegionLocation to Map<hostport, List<HRegionLocation>>.
    // The List<HRegionLocation>s end up sorted by start key/end key, because
    // regionsLoc is sorted that way.
    Map<String, List<HRegionLocation>> locationMap = Maps.newHashMap();
    for (HRegionLocation regionLoc: regionsLoc) {
      String locHostPort = regionLoc.getHostnamePort();
      if (locationMap.containsKey(locHostPort)) {
        locationMap.get(locHostPort).add(regionLoc);
      } else {
        locationMap.put(locHostPort, Lists.newArrayList(regionLoc));
      }
    }

    for (Map.Entry<String, List<HRegionLocation>> locEntry: locationMap.entrySet()) {
      // HBaseTableScanner (backend) initializes a result scanner for each key
      // range. To minimize the number of result scanner re-inits, create only a
      // single THBaseKeyRange for all adjacent regions on this server.
      THBaseKeyRange keyRange = null;
      byte[] prevEndKey = null;
      for (HRegionLocation regionLoc: locEntry.getValue()) {
        byte[] curRegStartKey = regionLoc.getRegionInfo().getStartKey();
        byte[] curRegEndKey = regionLoc.getRegionInfo().getEndKey();
        if (prevEndKey != null &&
            Bytes.compareTo(prevEndKey, curRegStartKey) == 0) {
          // The current region starts where the previous one left off;
          // extend the key range.
          setKeyRangeEnd(keyRange, curRegEndKey);
        } else {
          // Create a new THBaseKeyRange (and a TScanRange/TScanRangeLocations to
          // go with it).
          keyRange = new THBaseKeyRange();
          setKeyRangeStart(keyRange, curRegStartKey);
          setKeyRangeEnd(keyRange, curRegEndKey);

          TScanRangeLocations scanRangeLocation = new TScanRangeLocations();
          TNetworkAddress networkAddress = addressToTNetworkAddress(locEntry.getKey());
          scanRangeLocation.addToLocations(
              new TScanRangeLocation(analyzer.getHostIndex().getIndex(networkAddress)));
          scanRanges_.add(scanRangeLocation);

          TScanRange scanRange = new TScanRange();
          scanRange.setHbase_key_range(keyRange);
          scanRangeLocation.setScan_range(scanRange);
        }
        prevEndKey = curRegEndKey;
      }
    }
  }

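  /**
   * Sets the start key of keyRange to the maximum of rangeStartKey and the scan's
   * global startKey_, leaving it unset if both are unbounded.
   */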
  private void setKeyRangeStart(THBaseKeyRange keyRange, byte[] rangeStartKey) {
    keyRange.unsetStartKey();
    // Use max(startKey_, rangeStartKey) as the scan start.
    if (!Bytes.equals(rangeStartKey, HConstants.EMPTY_START_ROW) ||
        !Bytes.equals(startKey_, HConstants.EMPTY_START_ROW)) {
      byte[] partStart = (Bytes.compareTo(rangeStartKey, startKey_) < 0) ?
          startKey_ : rangeStartKey;
      keyRange.setStartKey(Bytes.toString(partStart));
    }
  }

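  /**
   * Sets the stop key of keyRange to the minimum of rangeEndKey and the scan's
   * global stopKey_, treating an empty key as "end of table".
   */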
  private void setKeyRangeEnd(THBaseKeyRange keyRange, byte[] rangeEndKey) {
    keyRange.unsetStopKey();
    // Use min(stopKey_, rangeEndKey) as the scan stop.
    if (!Bytes.equals(rangeEndKey, HConstants.EMPTY_END_ROW) ||
        !Bytes.equals(stopKey_, HConstants.EMPTY_END_ROW)) {
      if (Bytes.equals(stopKey_, HConstants.EMPTY_END_ROW)) {
        keyRange.setStopKey(Bytes.toString(rangeEndKey));
      } else if (Bytes.equals(rangeEndKey, HConstants.EMPTY_END_ROW)) {
        keyRange.setStopKey(Bytes.toString(stopKey_));
      } else {
        byte[] partEnd = (Bytes.compareTo(rangeEndKey, stopKey_) < 0) ?
            rangeEndKey : stopKey_;
        keyRange.setStopKey(Bytes.toString(partEnd));
      }
    }
  }

  @Override
  protected String getNodeExplainString(String prefix, String detailPrefix,
      TExplainLevel detailLevel) {
    HBaseTable table = (HBaseTable) desc_.getTable();
    StringBuilder output = new StringBuilder();
    if (isEmpty_) {
      output.append(prefix + "empty scan node\n");
      return output.toString();
    }
    String aliasStr = "";
    if (!table.getFullName().equalsIgnoreCase(desc_.getAlias()) &&
        !table.getName().equalsIgnoreCase(desc_.getAlias())) {
      aliasStr = " " + desc_.getAlias();
    }
    output.append(String.format("%s%s:%s [%s%s]\n", prefix, id_.toString(),
        displayName_, table.getFullName(), aliasStr));
    if (detailLevel.ordinal() >= TExplainLevel.STANDARD.ordinal()) {
      if (!Bytes.equals(startKey_, HConstants.EMPTY_START_ROW)) {
        output.append(detailPrefix + "start key: " + printKey(startKey_) + "\n");
      }
      if (!Bytes.equals(stopKey_, HConstants.EMPTY_END_ROW)) {
        output.append(detailPrefix + "stop key: " + printKey(stopKey_) + "\n");
      }
      if (!filters_.isEmpty()) {
        output.append(detailPrefix + "hbase filters:");
        if (filters_.size() == 1) {
          THBaseFilter filter = filters_.get(0);
          output.append(" " + filter.family + ":" + filter.qualifier + " " +
              CompareFilter.CompareOp.values()[filter.op_ordinal].toString() + " " +
              "'" + filter.filter_constant + "'");
        } else {
          for (int i = 0; i < filters_.size(); ++i) {
            THBaseFilter filter = filters_.get(i);
            output.append("\n " + filter.family + ":" + filter.qualifier + " " +
                CompareFilter.CompareOp.values()[filter.op_ordinal].toString() + " " +
                "'" + filter.filter_constant + "'");
          }
        }
        output.append('\n');
      }
      if (!conjuncts_.isEmpty()) {
        output.append(
            detailPrefix + "predicates: " + getExplainString(conjuncts_) + "\n");
      }
    }
    if (detailLevel.ordinal() >= TExplainLevel.EXTENDED.ordinal()) {
      output.append(getStatsExplainString(detailPrefix, detailLevel));
      output.append("\n");
    }
    return output.toString();
  }

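  /**
   * Converts a row key string to bytes. If nextKey is true, returns the immediate
   * successor of rowKey in byte order by appending a zero byte; callers use this
   * to adapt exclusive lower bounds and inclusive upper bounds to HBase's
   * inclusive-start/exclusive-stop convention.
   */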
  private byte[] convertToBytes(String rowKey, boolean nextKey) {
    byte[] keyBytes = Bytes.toBytes(rowKey);
    if (!nextKey) {
      return keyBytes;
    } else {
      // Append \0.
      return Arrays.copyOf(keyBytes, keyBytes.length + 1);
    }
  }

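  /**
   * Prints a row key for EXPLAIN output, escaping non-printable (ISO control)
   * bytes as octal sequences.
   */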
  public static String printKey(byte[] key) {
    StringBuilder result = new StringBuilder();
    for (int i = 0; i < key.length; ++i) {
      if (!Character.isISOControl(key[i])) {
        result.append((char) key[i]);
      } else {
        result.append("\\");
        result.append(Integer.toOctalString(key[i]));
      }
    }
    return result.toString();
  }

  private static CompareFilter.CompareOp impalaOpToHBaseOp(
      BinaryPredicate.Operator impalaOp) {
    switch(impalaOp) {
      case EQ: return CompareFilter.CompareOp.EQUAL;
      case NE: return CompareFilter.CompareOp.NOT_EQUAL;
      case GT: return CompareFilter.CompareOp.GREATER;
      case GE: return CompareFilter.CompareOp.GREATER_OR_EQUAL;
      case LT: return CompareFilter.CompareOp.LESS;
      case LE: return CompareFilter.CompareOp.LESS_OR_EQUAL;
      // TODO: Add support for pushing LIKE/REGEX down to HBase with a different
      // Filter.
      default: throw new IllegalArgumentException(
          "HBase: Unsupported Impala compare operator: " + impalaOp);
    }
  }

  @Override
  public void computeCosts(TQueryOptions queryOptions) {
    // TODO: What's a good estimate of memory consumption?
    perHostMemCost_ = 1024L * 1024L * 1024L;
  }

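  /**
   * Returns the per-host upper bound of memory reserved for HBase scans;
   * currently a fixed 1GB placeholder (see TODO below).
   */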
  public static long getPerHostMemUpperBound() {
    // TODO: What's a good estimate of memory consumption?
    return 1024L * 1024L * 1024L;
  }
}