package com.cloudera.impala.catalog;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hive.hbase.HBaseSerDe;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.log4j.Logger;

import com.cloudera.impala.common.Pair;
import com.cloudera.impala.thrift.TCatalogObjectType;
import com.cloudera.impala.thrift.TColumn;
import com.cloudera.impala.thrift.THBaseTable;
import com.cloudera.impala.thrift.TResultSet;
import com.cloudera.impala.thrift.TResultSetMetadata;
import com.cloudera.impala.thrift.TTable;
import com.cloudera.impala.thrift.TTableDescriptor;
import com.cloudera.impala.thrift.TTableType;
import com.cloudera.impala.util.StatsHelper;
import com.cloudera.impala.util.TResultRowBuilder;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
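
/**
 * Impala representation of an HBase table (a Hive metastore table backed by
 * the HBase storage handler). Columns are derived from the table's
 * hbase.columns.mapping property; row count and row size statistics are
 * estimated by sampling regions of the underlying HBase table.
 */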
public class HBaseTable extends Table {
  private static final Logger LOG = Logger.getLogger(HBaseTable.class);

  // Prefix of the Hive default database; stripped from implicitly derived
  // HBase table names.
  private static final String DEFAULT_PREFIX = "default.";

  // Pseudo column family assigned to the row key by parseColumnMapping().
  private static final String ROW_KEY_COLUMN_FAMILY = ":key";

  // Input format class for HBase tables read by Hive.
  private static final String HBASE_INPUT_FORMAT =
      "org.apache.hadoop.hive.hbase.HiveHBaseTableInputFormat";

  // Serialization library for HBase tables set in the metastore.
  private static final String HBASE_SERIALIZATION_LIB =
      "org.apache.hadoop.hive.hbase.HBaseSerDe";

  // Storage handler class for HBase tables read by Hive.
  private static final String HBASE_STORAGE_HANDLER =
      "org.apache.hadoop.hive.hbase.HBaseStorageHandler";

  // Maximum number of rows fetched from a region when sampling for row stats.
  private static final int ROW_COUNT_ESTIMATE_BATCH_SIZE = 10;

  // Shared HBase configuration for all HBaseTable instances.
  private static final Configuration hbaseConf_ = HBaseConfiguration.create();

  // Name of the underlying HBase table (may differ from the Hive table name).
  private String hbaseTableName_;

  // Handle to the underlying HBase table; opened in load().
  private HTable hTable_;

  // Cached column family descriptors, used to detect compression.
  private HColumnDescriptor[] columnFamilies_ = null;
  protected HBaseTable(TableId id, org.apache.hadoop.hive.metastore.api.Table msTbl,
      Db db, String name, String owner) {
    super(id, msTbl, db, name, owner);
  }
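
  /**
   * Parses the HBase columns-mapping property (hbase.columns.mapping) of a
   * Hive table and appends the derived column family, column qualifier, and
   * binary encoding flag of each column to the supplied output lists. Modeled
   * on Hive's HBaseSerDe.parseColumnsMapping().
   */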
  public static void parseColumnMapping(boolean tableDefaultStorageIsBinary,
      String columnsMappingSpec, List<FieldSchema> fieldSchemas,
      List<String> columnFamilies, List<String> columnQualifiers,
      List<Boolean> colIsBinaryEncoded) throws SerDeException {
    if (columnsMappingSpec == null) {
      throw new SerDeException(
          "Error: hbase.columns.mapping missing for this HBase table.");
    }

    if (columnsMappingSpec.equals("") ||
        columnsMappingSpec.equals(HBaseSerDe.HBASE_KEY_COL)) {
      throw new SerDeException("Error: hbase.columns.mapping specifies only " +
          "the HBase table row key. A valid Hive-HBase table must specify at " +
          "least one additional column.");
    }

    int rowKeyIndex = -1;
    String[] columnSpecs = columnsMappingSpec.split(",");
    // The row key may be mapped implicitly (no :key entry), in which case the
    // field schemas contain one more entry than the mapping.
    int fsStartIdxOffset = fieldSchemas.size() - columnSpecs.length;
    if (fsStartIdxOffset != 0 && fsStartIdxOffset != 1) {
      throw new SerDeException(String.format("Number of entries in " +
          "'hbase.columns.mapping' does not match the number of columns in the " +
          "table: %d != %d (counting the key if implicit)",
          columnSpecs.length, fieldSchemas.size()));
    }

    for (int i = 0; i < columnSpecs.length; ++i) {
      String mappingSpec = columnSpecs[i];
      String[] mapInfo = mappingSpec.split("#");
      // Trim so that whitespace in the mapping spec is tolerated.
      String colInfo = mapInfo[0].trim();

      // A well-formed entry is either ":key", "family", or "family:qualifier",
      // i.e. it contains at most one ':' separator.
      int idxFirst = colInfo.indexOf(":");
      int idxLast = colInfo.lastIndexOf(":");
      if (idxFirst < 0 || !(idxFirst == idxLast)) {
        throw new SerDeException("Error: the HBase columns mapping contains a " +
            "badly formed column family, column qualifier specification.");
      }

      if (colInfo.equals(HBaseSerDe.HBASE_KEY_COL)) {
        Preconditions.checkState(fsStartIdxOffset == 0);
        rowKeyIndex = i;
        columnFamilies.add(colInfo);
        columnQualifiers.add(null);
      } else {
        String[] parts = colInfo.split(":");
        Preconditions.checkState(parts.length > 0 && parts.length <= 2);
        columnFamilies.add(parts[0]);
        if (parts.length == 2) {
          columnQualifiers.add(parts[1]);
        } else {
          columnQualifiers.add(null);
        }
      }

      // Determine the binary encoding of this column from the column-level
      // storage specification (the part after '#'), falling back to the table
      // level default.
      FieldSchema fieldSchema = fieldSchemas.get(i + fsStartIdxOffset);
      boolean supportsBinaryEncoding = supportsBinaryEncoding(fieldSchema);
      if (mapInfo.length == 1) {
        // No column-level storage specification; use the table-level default.
        colIsBinaryEncoded.add(
            new Boolean(tableDefaultStorageIsBinary && supportsBinaryEncoding));
      } else if (mapInfo.length == 2) {
        String storageOption = mapInfo[1];
        if (!(storageOption.equals("-") ||
            "string".startsWith(storageOption) ||
            "binary".startsWith(storageOption))) {
          throw new SerDeException("Error: A column storage specification is one of" +
              " the following: '-', a prefix of 'string', or a prefix of 'binary'. " +
              storageOption + " is not a valid storage option specification for " +
              fieldSchema.getName());
        }

        boolean isBinaryEncoded = false;
        if ("-".equals(storageOption)) {
          isBinaryEncoded = tableDefaultStorageIsBinary;
        } else if ("binary".startsWith(storageOption)) {
          isBinaryEncoded = true;
        }
        if (isBinaryEncoded && !supportsBinaryEncoding) {
          // Binary encoding was requested but the column type does not support
          // it; fall back to string encoding.
          LOG.warn("Column storage specification for column " + fieldSchema.getName() +
              " is binary" + " but the column type " + fieldSchema.getType() +
              " does not support binary encoding. Fallback to string format.");
          isBinaryEncoded = false;
        }
        colIsBinaryEncoded.add(isBinaryEncoded);
      } else {
        throw new SerDeException("Error: " + HBaseSerDe.HBASE_COLUMNS_MAPPING +
            " storage specification " + mappingSpec + " is not valid for column: " +
            fieldSchema.getName());
      }
    }

    if (rowKeyIndex == -1) {
      // The row key was mapped implicitly; prepend it.
      columnFamilies.add(0, HBaseSerDe.HBASE_KEY_COL);
      columnQualifiers.add(0, null);
      colIsBinaryEncoded.add(0,
          supportsBinaryEncoding(fieldSchemas.get(0)) && tableDefaultStorageIsBinary);
    }
  }

  // Returns true if the given field's type supports binary HBase encoding:
  // only boolean, integer, and floating point types do.
  private static boolean supportsBinaryEncoding(FieldSchema fs) {
    try {
      Type colType = parseColumnType(fs);
      return colType.isBoolean() || colType.isIntegerType()
          || colType.isFloatingPointType();
    } catch (TableLoadingException e) {
      return false;
    }
  }
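
  /**
   * Loads the table metadata from the Hive metastore and the HBase cluster:
   * resolves hbase.columns.mapping, parses the column types, and arranges the
   * columns with the row key first, followed by the remaining columns sorted
   * by column family and qualifier.
   */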
  @Override
  public void load(Table oldValue, HiveMetaStoreClient client,
      org.apache.hadoop.hive.metastore.api.Table msTbl) throws TableLoadingException {
    try {
      // ... (elided: record the metastore table and open the underlying HTable)
      Map<String, String> serdeParams =
          msTbl.getSd().getSerdeInfo().getParameters();
      String hbaseColumnsMapping = serdeParams.get(HBaseSerDe.HBASE_COLUMNS_MAPPING);
      if (hbaseColumnsMapping == null) {
        throw new MetaException("No hbase.columns.mapping defined in Serde.");
      }

      String hbaseTableDefaultStorageType = msTbl.getParameters().get(
          HBaseSerDe.HBASE_TABLE_DEFAULT_STORAGE_TYPE);
      boolean tableDefaultStorageIsBinary = false;
      if (hbaseTableDefaultStorageType != null &&
          !hbaseTableDefaultStorageType.isEmpty()) {
        if (hbaseTableDefaultStorageType.equalsIgnoreCase("binary")) {
          tableDefaultStorageIsBinary = true;
        } else if (!hbaseTableDefaultStorageType.equalsIgnoreCase("string")) {
          throw new SerDeException("Error: " +
              HBaseSerDe.HBASE_TABLE_DEFAULT_STORAGE_TYPE +
              " parameter must be specified as" +
              " 'string' or 'binary'; '" + hbaseTableDefaultStorageType +
              "' is not a valid specification for this table/serde property.");
        }
      }

      // Parse the column mapping into parallel lists of families, qualifiers,
      // and binary encodings.
      List<FieldSchema> fieldSchemas = msTbl.getSd().getCols();
      List<String> hbaseColumnFamilies = new ArrayList<String>();
      List<String> hbaseColumnQualifiers = new ArrayList<String>();
      List<Boolean> hbaseColumnBinaryEncodings = new ArrayList<Boolean>();
      parseColumnMapping(tableDefaultStorageIsBinary, hbaseColumnsMapping, fieldSchemas,
          hbaseColumnFamilies, hbaseColumnQualifiers, hbaseColumnBinaryEncodings);
      Preconditions.checkState(
          hbaseColumnFamilies.size() == hbaseColumnQualifiers.size());
      Preconditions.checkState(fieldSchemas.size() == hbaseColumnFamilies.size());

      // Populate the columns; the row key column is tracked separately so
      // that it can be placed first.
      List<HBaseColumn> tmpCols = Lists.newArrayList();
      HBaseColumn keyCol = null;
      for (int i = 0; i < fieldSchemas.size(); ++i) {
        FieldSchema s = fieldSchemas.get(i);
        Type t = Type.INVALID;
        try {
          t = parseColumnType(s);
        } catch (TableLoadingException e) {
          // ... (elided: handling of unsupported column types)
        }
        HBaseColumn col = new HBaseColumn(s.getName(), hbaseColumnFamilies.get(i),
            hbaseColumnQualifiers.get(i), hbaseColumnBinaryEncodings.get(i),
            t, s.getComment(), -1);
        if (col.getColumnFamily().equals(ROW_KEY_COLUMN_FAMILY)) {
          keyCol = col;
        } else {
          tmpCols.add(col);
        }
      }
      Preconditions.checkState(keyCol != null);

      // Sort by column family/qualifier and assign positions, key column
      // first.
      Collections.sort(tmpCols);
      keyCol.setPosition(0);
      addColumn(keyCol);
      for (int i = 0; i < tmpCols.size(); ++i) {
        HBaseColumn col = tmpCols.get(i);
        col.setPosition(i + 1);
        addColumn(col);
      }
      // ... (elided: load column stats and resolve the HBase table name)
    } catch (Exception e) {
      throw new TableLoadingException("Failed to load metadata for HBase table: " +
          name_, e);
    }
  }
  @Override
  protected void loadFromThrift(TTable table) throws TableLoadingException {
    super.loadFromThrift(table);
    try {
      // ... (elided: re-derive the HBase table name and re-open the HTable)
    } catch (Exception e) {
      throw new TableLoadingException("Failed to load metadata for HBase " +
          "thrift table: " + name_, e);
    }
  }
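
  /**
   * Returns the name of the HBase table mapped by the given metastore table:
   * the hbase.table.name property from the table properties, then from the
   * serde properties, and finally the fully-qualified Hive name with the
   * "default." prefix stripped.
   */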
  private static String getHBaseTableName(org.apache.hadoop.hive.metastore.api.Table tbl) {
    String tableName = tbl.getParameters().get(HBaseSerDe.HBASE_TABLE_NAME);
    if (tableName == null) {
      tableName = tbl.getSd().getSerdeInfo().getParameters().get(
          HBaseSerDe.HBASE_TABLE_NAME);
    }
    if (tableName == null) {
      tableName = tbl.getDbName() + "." + tbl.getTableName();
      if (tableName.startsWith(DEFAULT_PREFIX)) {
        tableName = tableName.substring(DEFAULT_PREFIX.length());
      }
    }
    return tableName;
  }
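
  /**
   * Estimates the number of rows in a single region and the average row size
   * by scanning a small batch of rows and dividing the region's on-disk size
   * by the observed bytes per row. When the table is compressed, the on-disk
   * size is scaled up by a constant factor.
   */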
  private Pair<Long, Long> getEstimatedRowStatsForRegion(HRegionLocation location,
      boolean isCompressed) throws IOException {
    HRegionInfo info = location.getRegionInfo();
    // Scan from the start of the region, fetching all versions so that the
    // sampled sizes reflect what is actually stored.
    Scan s = new Scan(info.getStartKey());
    s.setMaxVersions(Short.MAX_VALUE);
    // Do not pollute the block cache with this sampling scan.
    s.setCacheBlocks(false);
    ResultScanner rs = hTable_.getScanner(s);

    long currentRowSize = 0;
    long currentRowCount = 0;
    try {
      // Sample up to ROW_COUNT_ESTIMATE_BATCH_SIZE rows.
      for (int i = 0; i < ROW_COUNT_ESTIMATE_BATCH_SIZE; ++i) {
        Result r = rs.next();
        if (r == null) break;
        ++currentRowCount;
        // Sum the KeyValue sizes the same way HBase accounts for them
        // internally.
        Cell[] cells = r.rawCells();
        for (Cell c : cells) {
          if (c instanceof KeyValue) {
            currentRowSize += KeyValue.getKeyValueDataStructureSize(c.getRowLength(),
                c.getFamilyLength(), c.getQualifierLength(), c.getValueLength(),
                c.getTagsLength());
          } else {
            throw new IllegalStateException("Celltype " + c.getClass().getName() +
                " not supported.");
          }
        }
      }
    } finally {
      rs.close();
    }
    if (currentRowCount == 0) return new Pair<Long, Long>(0L, 0L);

    // Extrapolate from the sampled bytes per row to the region's on-disk
    // size, assuming a 2x ratio when compression is enabled.
    long currentHdfsSize = getHdfsSize(info);
    double bytesPerRow = currentRowSize / (double) currentRowCount;
    long estimatedRowCount =
        (long) ((isCompressed ? 2 : 1) * (currentHdfsSize / bytesPerRow));
    return new Pair<Long, Long>(estimatedRowCount, (long) bytesPerRow);
  }
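
  /**
   * Estimates the row count and average row size for the key range
   * [startRowKey, endRowKey) by sampling regions in random order until the
   * estimate stabilizes. Returns (-1, -1) if the estimate fails.
   */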
  public synchronized Pair<Long, Long> getEstimatedRowStats(byte[] startRowKey,
      byte[] endRowKey) {
    Preconditions.checkNotNull(startRowKey);
    Preconditions.checkNotNull(endRowKey);

    boolean isCompressed = false;
    long rowCount = -1;
    long rowSize = -1;
    try {
      // Check whether any column family is compressed; compression skews the
      // on-disk size used for extrapolation.
      // ... (elided: fetch the column family descriptors into columnFamilies_)
      for (HColumnDescriptor desc : columnFamilies_) {
        isCompressed |= desc.getCompression() != Compression.Algorithm.NONE;
      }

      // Fetch all regions covering the key range and sample them in random
      // order.
      List<HRegionLocation> locations =
          getRegionsInRange(hTable_, startRowKey, endRowKey);
      Collections.shuffle(locations);
      StatsHelper<Long> statsCount = new StatsHelper<Long>();
      StatsHelper<Long> statsSize = new StatsHelper<Long>();
      // Sample regions until the estimate has converged (the elided part of
      // the condition is based on MIN_NUM_REGIONS_TO_CHECK and
      // DELTA_FROM_AVERAGE) or all regions have been visited.
      while (/* ... && */ statsSize.count() < locations.size()) {
        Pair<Long, Long> tmp = getEstimatedRowStatsForRegion(
            locations.get((int) statsCount.count()), isCompressed);
        statsCount.addSample(tmp.first);
        statsSize.addSample(tmp.second);
      }
      // Extrapolate the sampled average row size over the table's total size.
      rowCount = (long) (getHdfsSize(null) / statsSize.mean());
      rowSize = (long) statsSize.mean();
    } catch (IOException ioe) {
      // Log and return a sentinel; this is only an estimate.
      LOG.error("Error computing HBase row count estimate", ioe);
      return new Pair<Long, Long>(-1L, -1L);
    }
    return new Pair<Long, Long>(rowCount, rowSize);
  }
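
  /**
   * Returns the on-disk size in bytes of the given region, or of the entire
   * table when info is null.
   */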
  public long getHdfsSize(HRegionInfo info) throws IOException {
    Path tableDir = HTableDescriptor.getTableDir(
        getRootDir(hbaseConf_), Bytes.toBytes(hbaseTableName_));
    FileSystem fs = tableDir.getFileSystem(hbaseConf_);
    if (info != null) {
      // Size of a single region.
      Path regionDir = tableDir.suffix("/" + info.getEncodedName());
      return fs.getContentSummary(regionDir).getLength();
    }
    // Size of the whole table.
    return fs.getContentSummary(tableDir).getLength();
  }
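
  /**
   * Returns the qualified HBase root directory (hbase.rootdir) from the
   * given configuration, equivalent to HBase's FSUtils.getRootDir().
   */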
  public static Path getRootDir(final Configuration c) throws IOException {
    Path p = new Path(c.get(HConstants.HBASE_DIR));
    FileSystem fs = p.getFileSystem(c);
    return p.makeQualified(fs);
  }
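
  /**
   * Returns a thrift descriptor used by the backend to identify this table
   * as an HBase table and locate its columns.
   */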
  @Override
  public TTableDescriptor toThriftDescriptor(Set<Long> referencedPartitions) {
    TTableDescriptor tableDescriptor =
        new TTableDescriptor(id_.asInt(), TTableType.HBASE_TABLE,
            getColumns().size() /* ... (elided: remaining constructor args) */);
    // ... (elided: attach column names and the THBaseTable representation)
    return tableDescriptor;
  }
  @Override
  public TCatalogObjectType getCatalogObjectType() {
    return TCatalogObjectType.TABLE;
  }
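
  /**
   * Thrift serialization: the generic TTable is tagged as an HBase table and
   * carries a THBaseTable listing the family, qualifier, and binary encoding
   * of each column.
   */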
  @Override
  public TTable toThrift() {
    TTable table = super.toThrift();
    table.setTable_type(TTableType.HBASE_TABLE);
    table.setHbase_table(getTHBaseTable());
    return table;
  }

  private THBaseTable getTHBaseTable() {
    THBaseTable tHbaseTable = new THBaseTable();
    tHbaseTable.setTableName(hbaseTableName_);
    for (Column c : getColumns()) {
      HBaseColumn hbaseCol = (HBaseColumn) c;
      tHbaseTable.addToFamilies(hbaseCol.getColumnFamily());
      if (hbaseCol.getColumnQualifier() != null) {
        tHbaseTable.addToQualifiers(hbaseCol.getColumnQualifier());
      } else {
        // Unqualified columns (e.g. the row key) get an empty qualifier.
        tHbaseTable.addToQualifiers("");
      }
      tHbaseTable.addToBinary_encoded(hbaseCol.isBinaryEncoded());
    }
    return tHbaseTable;
  }
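
  /**
   * Returns the list of regions that overlap the key range
   * [startKey, endKey), walking region boundaries starting from startKey.
   */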
  public static List<HRegionLocation> getRegionsInRange(HTable hbaseTbl,
      final byte[] startKey, final byte[] endKey) throws IOException {
    final boolean endKeyIsEndOfTable = Bytes.equals(endKey, HConstants.EMPTY_END_ROW);
    if ((Bytes.compareTo(startKey, endKey) > 0) && !endKeyIsEndOfTable) {
      throw new IllegalArgumentException("Invalid range: " +
          Bytes.toStringBinary(startKey) + " > " + Bytes.toStringBinary(endKey));
    }
    final List<HRegionLocation> regionList = new ArrayList<HRegionLocation>();
    byte[] currentKey = startKey;
    // Make sure only one thread is accessing the hbaseTbl.
    synchronized (hbaseTbl) {
      do {
        // Always reload the region location info.
        HRegionLocation regionLocation = hbaseTbl.getRegionLocation(currentKey, true);
        regionList.add(regionLocation);
        currentKey = regionLocation.getRegionInfo().getEndKey();
      } while (!Bytes.equals(currentKey, HConstants.EMPTY_END_ROW) &&
          (endKeyIsEndOfTable || Bytes.compareTo(currentKey, endKey) < 0));
    }
    return regionList;
  }
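
  /**
   * Produces a result set describing per-region statistics: one row per
   * region with its location, start row key, estimated row count, and size,
   * plus a total row when the table spans multiple regions.
   */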
  public TResultSet getTableStats() {
    TResultSet result = new TResultSet();
    TResultSetMetadata resultSchema = new TResultSetMetadata();
    result.setSchema(resultSchema);
    resultSchema.addToColumns(
        new TColumn("Region Location", Type.STRING.toThrift()));
    resultSchema.addToColumns(new TColumn("Start RowKey",
        Type.STRING.toThrift()));
    resultSchema.addToColumns(new TColumn("Est. #Rows", Type.BIGINT.toThrift()));
    resultSchema.addToColumns(new TColumn("Size", Type.STRING.toThrift()));

    try {
      long totalNumRows = 0;
      long totalHdfsSize = 0;
      List<HRegionLocation> regions = HBaseTable.getRegionsInRange(hTable_,
          HConstants.EMPTY_END_ROW, HConstants.EMPTY_START_ROW);
      for (HRegionLocation region : regions) {
        TResultRowBuilder rowBuilder = new TResultRowBuilder();
        HRegionInfo regionInfo = region.getRegionInfo();
        Pair<Long, Long> estRowStats =
            getEstimatedRowStatsForRegion(region, false);
        long numRows = estRowStats.first.longValue();
        long hdfsSize = getHdfsSize(regionInfo);
        totalNumRows += numRows;
        totalHdfsSize += hdfsSize;
        rowBuilder.add(String.valueOf(region.getHostname()))
            .add(Bytes.toString(regionInfo.getStartKey())).add(numRows)
            .addBytes(hdfsSize);
        result.addToRows(rowBuilder.get());
      }
      // Add a summary row when the table spans more than one region.
      if (regions.size() > 1) {
        TResultRowBuilder rowBuilder = new TResultRowBuilder();
        rowBuilder.add("Total").add("").add(totalNumRows).addBytes(totalHdfsSize);
        result.addToRows(rowBuilder.get());
      }
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
    return result;
  }
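
  /**
   * Returns true if the given metastore table represents an HBase table,
   * based on its storage handler property, input format, or serialization
   * library.
   */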
  public static boolean isHBaseTable(
      org.apache.hadoop.hive.metastore.api.Table msTbl) {
    // The metastore records the storage handler under the "storage_handler"
    // table property.
    if (msTbl.getParameters() != null &&
        HBASE_STORAGE_HANDLER.equals(msTbl.getParameters().get("storage_handler"))) {
      return true;
    }
    StorageDescriptor sd = msTbl.getSd();
    if (sd == null) return false;
    if (sd.getInputFormat() != null &&
        sd.getInputFormat().equals(HBASE_INPUT_FORMAT)) {
      return true;
    } else if (sd.getSerdeInfo() != null &&
        sd.getSerdeInfo().getSerializationLib() != null &&
        sd.getSerdeInfo().getSerializationLib().equals(HBASE_SERIALIZATION_LIB)) {
      return true;
    }
    return false;
  }
}