15 package com.cloudera.impala.planner;
17 import java.io.IOException;
18 import java.nio.ByteBuffer;
19 import java.util.ArrayList;
20 import java.util.Arrays;
21 import java.util.List;
24 import org.apache.hadoop.conf.Configuration;
25 import org.apache.hadoop.hbase.HBaseConfiguration;
26 import org.apache.hadoop.hbase.HConstants;
27 import org.apache.hadoop.hbase.HRegionLocation;
28 import org.apache.hadoop.hbase.client.HTable;
29 import org.apache.hadoop.hbase.filter.CompareFilter;
30 import org.apache.hadoop.hbase.util.Bytes;
31 import org.slf4j.Logger;
32 import org.slf4j.LoggerFactory;
45 import com.cloudera.impala.common.Pair;
47 import com.cloudera.impala.thrift.TColumnValue;
48 import com.cloudera.impala.thrift.TExplainLevel;
49 import com.cloudera.impala.thrift.THBaseFilter;
50 import com.cloudera.impala.thrift.THBaseKeyRange;
51 import com.cloudera.impala.thrift.THBaseScanNode;
52 import com.cloudera.impala.thrift.TNetworkAddress;
53 import com.cloudera.impala.thrift.TPlanNode;
54 import com.cloudera.impala.thrift.TPlanNodeType;
55 import com.cloudera.impala.thrift.TQueryOptions;
56 import com.cloudera.impala.thrift.TScanRange;
57 import com.cloudera.impala.thrift.TScanRangeLocation;
58 import com.cloudera.impala.thrift.TScanRangeLocations;
59 import com.google.common.base.Objects;
60 import com.google.common.base.Preconditions;
61 import com.google.common.collect.Lists;
62 import com.google.common.collect.Maps;
69 private final static Logger
LOG = LoggerFactory.getLogger(HBaseScanNode.class);
80 private byte[]
startKey_ = HConstants.EMPTY_START_ROW;
81 private byte[]
stopKey_ = HConstants.EMPTY_END_ROW;
89 private final List<THBaseFilter>
filters_ =
new ArrayList<THBaseFilter>();
103 private static Configuration
hbaseConf_ = HBaseConfiguration.create();
106 super(
id, desc,
"SCAN HBASE");
111 Preconditions.checkNotNull(keyRanges);
143 Preconditions.checkState(keyRanges_.size() == 1);
146 if (rowRange != null) {
148 Preconditions.checkState(rowRange.getLowerBound().isConstant());
149 Preconditions.checkState(
150 rowRange.getLowerBound().getType().equals(
Type.STRING));
151 TColumnValue val = FeSupport.EvalConstExpr(rowRange.getLowerBound(),
152 analyzer.getQueryCtx());
153 if (!val.isSetString_val()) {
159 !rowRange.getLowerBoundInclusive());
163 Preconditions.checkState(rowRange.getUpperBound().isConstant());
164 Preconditions.checkState(
165 rowRange.getUpperBound().getType().equals(
Type.STRING));
166 TColumnValue val = FeSupport.EvalConstExpr(rowRange.getUpperBound(),
167 analyzer.getQueryCtx());
168 if (!val.isSetString_val()) {
174 rowRange.getUpperBoundInclusive());
179 boolean endKeyIsEndOfTable = Bytes.equals(
stopKey_, HConstants.EMPTY_END_ROW);
180 if ((Bytes.compareTo(
startKey_, stopKey_) > 0) && !endKeyIsEndOfTable) {
191 super.computeStats(analyzer);
197 }
else if (rowRange != null && rowRange.
isEqRange()) {
204 if (estimate.second.longValue() > 0) {
214 LOG.debug(
"computeStats HbaseScan: cardinality=" + Long.toString(
cardinality_));
218 LOG.debug(
"computeStats HbaseScan: #nodes=" + Integer.toString(
numNodes_));
224 return Objects.toStringHelper(
this)
226 .add(
"hiveTblName", tbl.getFullName())
227 .add(
"hbaseTblName", tbl.getHBaseTableName())
228 .add(
"startKey", ByteBuffer.wrap(
startKey_).toString())
229 .add(
"stopKey", ByteBuffer.wrap(
stopKey_).toString())
231 .addValue(super.debugString())
247 BinaryPredicate bp = (BinaryPredicate) e;
250 if (hbaseOp == null)
continue;
256 Expr bindingExpr = bp.getSlotBinding(slot.getId());
257 if (bindingExpr == null || !(bindingExpr instanceof
StringLiteral))
continue;
259 StringLiteral literal = (StringLiteral) bindingExpr;
261 filters_.add(
new THBaseFilter(
263 (byte) hbaseOp.ordinal(), literal.
getValue()));
264 analyzer.materializeSlots(Lists.newArrayList(e));
271 msg.node_type = TPlanNodeType.HBASE_SCAN_NODE;
273 msg.hbase_scan_node =
276 msg.hbase_scan_node.setFilters(
filters_);
294 HTable hbaseTbl = null;
295 List<HRegionLocation> regionsLoc;
299 }
catch (IOException e) {
300 throw new RuntimeException(
308 Map<String, List<HRegionLocation>> locationMap = Maps.newHashMap();
309 for (HRegionLocation regionLoc: regionsLoc) {
310 String locHostPort = regionLoc.getHostnamePort();
311 if (locationMap.containsKey(locHostPort)) {
312 locationMap.get(locHostPort).add(regionLoc);
314 locationMap.put(locHostPort, Lists.newArrayList(regionLoc));
318 for (Map.Entry<String, List<HRegionLocation>> locEntry: locationMap.entrySet()) {
322 THBaseKeyRange keyRange = null;
323 byte[] prevEndKey = null;
324 for (HRegionLocation regionLoc: locEntry.getValue()) {
325 byte[] curRegStartKey = regionLoc.getRegionInfo().getStartKey();
326 byte[] curRegEndKey = regionLoc.getRegionInfo().getEndKey();
327 if (prevEndKey != null &&
328 Bytes.compareTo(prevEndKey, curRegStartKey) == 0) {
335 keyRange =
new THBaseKeyRange();
339 TScanRangeLocations scanRangeLocation =
new TScanRangeLocations();
341 scanRangeLocation.addToLocations(
342 new TScanRangeLocation(analyzer.
getHostIndex().getIndex(networkAddress)));
343 scanRanges_.add(scanRangeLocation);
345 TScanRange scanRange =
new TScanRange();
346 scanRange.setHbase_key_range(keyRange);
347 scanRangeLocation.setScan_range(scanRange);
349 prevEndKey = curRegEndKey;
360 keyRange.unsetStartKey();
362 if (!Bytes.equals(rangeStartKey, HConstants.EMPTY_START_ROW) ||
363 !Bytes.equals(
startKey_, HConstants.EMPTY_START_ROW)) {
364 byte[] partStart = (Bytes.compareTo(rangeStartKey,
startKey_) < 0) ?
365 startKey_ : rangeStartKey;
366 keyRange.setStartKey(Bytes.toString(partStart));
376 keyRange.unsetStopKey();
378 if (!Bytes.equals(rangeEndKey, HConstants.EMPTY_END_ROW) ||
379 !Bytes.equals(
stopKey_, HConstants.EMPTY_END_ROW)) {
380 if (Bytes.equals(stopKey_, HConstants.EMPTY_END_ROW)) {
381 keyRange.setStopKey(Bytes.toString(rangeEndKey));
382 }
else if (Bytes.equals(rangeEndKey, HConstants.EMPTY_END_ROW)) {
383 keyRange.setStopKey(Bytes.toString(
stopKey_));
385 byte[] partEnd = (Bytes.compareTo(rangeEndKey,
stopKey_) < 0) ?
387 keyRange.setStopKey(Bytes.toString(partEnd));
394 TExplainLevel detailLevel) {
396 StringBuilder output =
new StringBuilder();
398 output.append(prefix +
"empty scan node\n");
399 return output.toString();
401 String aliasStr =
"";
402 if (!table.
getFullName().equalsIgnoreCase(desc_.getAlias()) &&
404 aliasStr =
" " + desc_.getAlias();
406 output.append(String.format(
"%s%s:%s [%s%s]\n", prefix, id_.toString(),
408 if (detailLevel.ordinal() >= TExplainLevel.STANDARD.ordinal()) {
409 if (!Bytes.equals(
startKey_, HConstants.EMPTY_START_ROW)) {
412 if (!Bytes.equals(
stopKey_, HConstants.EMPTY_END_ROW)) {
416 output.append(detailPrefix +
"hbase filters:");
418 THBaseFilter filter = filters_.get(0);
419 output.append(
" " + filter.family +
":" + filter.qualifier +
" " +
420 CompareFilter.CompareOp.values()[filter.op_ordinal].toString() +
" " +
421 "'" + filter.filter_constant +
"'");
423 for (
int i = 0; i < filters_.size(); ++i) {
424 THBaseFilter filter = filters_.get(i);
425 output.append(
"\n " + filter.family +
":" + filter.qualifier +
" " +
426 CompareFilter.CompareOp.values()[filter.op_ordinal].toString() +
" " +
427 "'" + filter.filter_constant +
"'");
437 if (detailLevel.ordinal() >= TExplainLevel.EXTENDED.ordinal()) {
441 return output.toString();
448 byte[] keyBytes = Bytes.toBytes(rowKey);
453 return Arrays.copyOf(keyBytes, keyBytes.length + 1);
462 StringBuilder result =
new StringBuilder();
463 for (
int i = 0; i < key.length; ++i) {
464 if (!Character.isISOControl(key[i])) {
465 result.append((char) key[i]);
468 result.append(Integer.toOctalString(key[i]));
471 return result.toString();
477 case EQ:
return CompareFilter.CompareOp.EQUAL;
478 case NE:
return CompareFilter.CompareOp.NOT_EQUAL;
479 case GT:
return CompareFilter.CompareOp.GREATER;
480 case GE:
return CompareFilter.CompareOp.GREATER_OR_EQUAL;
481 case LT:
return CompareFilter.CompareOp.LESS;
482 case LE:
return CompareFilter.CompareOp.LESS_OR_EQUAL;
484 default:
throw new IllegalArgumentException(
485 "HBase: Unsupported Impala compare operator: " + impalaOp);
501 return 1024L * 1024L * 1024L;
static long getPerHostMemUpperBound()
static TNetworkAddress addressToTNetworkAddress(String address)
final List< THBaseFilter > filters_
static final int MAX_HBASE_FETCH_BATCH_SIZE
void assignConjuncts(Analyzer analyzer)
void createHBaseFilters(Analyzer analyzer)
static final ScalarType STRING
static String printKey(byte[] key)
String getStatsExplainString(String prefix, TExplainLevel detailLevel)
String getExplainString()
void computeStats(Analyzer analyzer)
void setKeyRangeStart(THBaseKeyRange keyRange, byte[] rangeStartKey)
static final int DEFAULT_SUGGESTED_CACHING
void computeScanRangeLocations(Analyzer analyzer)
void setStartStopKey(Analyzer analyzer)
void setKeyRangeEnd(THBaseKeyRange keyRange, byte[] rangeEndKey)
String getHBaseTableName(org.apache.hadoop.hive.metastore.api.Table tbl)
ArrayList< SlotDescriptor > getSlots()
List< ValueRange > keyRanges_
void toThrift(TPlanNode msg)
final TupleDescriptor desc_
void init(Analyzer analyzer)
void computeCosts(TQueryOptions queryOptions)
void computeMemLayout(Analyzer analyzer)
long capAtLimit(long cardinality)
static Configuration hbaseConf_
double computeSelectivity()
void setKeyRanges(List< ValueRange > keyRanges)
List< TScanRangeLocations > scanRanges_
byte[] convertToBytes(String rowKey, boolean nextKey)
String getNodeExplainString(String prefix, String detailPrefix, TExplainLevel detailLevel)
ListMap< TNetworkAddress > getHostIndex()
HBaseScanNode(PlanNodeId id, TupleDescriptor desc)
static CompareFilter.CompareOp impalaOpToHBaseOp(BinaryPredicate.Operator impalaOp)