Impala
Impala is the open source, native analytic database for Apache Hadoop.
HdfsTable.java
1 // Copyright 2012 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 package com.cloudera.impala.catalog;
16 
17 import static com.cloudera.impala.thrift.ImpalaInternalServiceConstants.DEFAULT_PARTITION_ID;
18 
19 import java.io.IOException;
20 import java.io.InputStream;
21 import java.net.URI;
22 import java.net.URL;
23 import java.util.ArrayList;
24 import java.util.Arrays;
25 import java.util.Collections;
26 import java.util.HashMap;
27 import java.util.HashSet;
28 import java.util.List;
29 import java.util.Map;
30 import java.util.Set;
31 import java.util.TreeMap;
32 
33 import org.apache.commons.io.IOUtils;
34 import org.apache.hadoop.conf.Configuration;
35 import org.apache.hadoop.fs.BlockLocation;
36 import org.apache.hadoop.fs.BlockStorageLocation;
37 import org.apache.hadoop.fs.FileStatus;
38 import org.apache.hadoop.fs.FileSystem;
39 import org.apache.hadoop.fs.Path;
40 import org.apache.hadoop.fs.VolumeId;
41 import org.apache.hadoop.hbase.util.Bytes;
42 import org.apache.hadoop.hdfs.DFSConfigKeys;
43 import org.apache.hadoop.hdfs.DistributedFileSystem;
44 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
45 import org.apache.hadoop.hive.metastore.api.FieldSchema;
46 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
47 import org.apache.hadoop.hive.serde.serdeConstants;
48 import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils;
49 import org.apache.hadoop.util.StringUtils;
50 import org.slf4j.Logger;
51 import org.slf4j.LoggerFactory;
52 
63 import com.cloudera.impala.thrift.ImpalaInternalServiceConstants;
64 import com.cloudera.impala.thrift.TAccessLevel;
65 import com.cloudera.impala.thrift.TCatalogObjectType;
66 import com.cloudera.impala.thrift.TColumn;
67 import com.cloudera.impala.thrift.THdfsFileBlock;
68 import com.cloudera.impala.thrift.THdfsPartition;
69 import com.cloudera.impala.thrift.THdfsTable;
70 import com.cloudera.impala.thrift.TNetworkAddress;
71 import com.cloudera.impala.thrift.TPartitionKeyValue;
72 import com.cloudera.impala.thrift.TResultRow;
73 import com.cloudera.impala.thrift.TResultSet;
74 import com.cloudera.impala.thrift.TResultSetMetadata;
75 import com.cloudera.impala.thrift.TTable;
76 import com.cloudera.impala.thrift.TTableDescriptor;
77 import com.cloudera.impala.thrift.TTableType;
81 import com.cloudera.impala.util.ListMap;
85 import com.google.common.base.Joiner;
86 import com.google.common.base.Preconditions;
87 import com.google.common.collect.Lists;
88 import com.google.common.collect.Maps;
89 import com.google.common.collect.Sets;
90 
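// Summary: an HdfsTable is the catalog's in-memory representation of an HDFS-backed
// table. It loads column definitions and partitions from the Hive Metastore, file and
// block metadata from the filesystem, and serializes the result to Thrift for the
// planner and backends.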
104 public class HdfsTable extends Table {
105  // hive's default value for table property 'serialization.null.format'
106  private static final String DEFAULT_NULL_COLUMN_VALUE = "\\N";
107 
108  // Number of times to retry fetching the partitions from the HMS should an error occur.
109  private final static int NUM_PARTITION_FETCH_RETRIES = 5;
110 
111  // An invalid network address, which will always be treated as remote.
112  private final static TNetworkAddress REMOTE_NETWORK_ADDRESS =
113  new TNetworkAddress("remote*addr", 0);
114 
115  // Minimum block size in bytes allowed for synthetic file blocks (other than the last
116  // block, which may be shorter).
117  private final static long MIN_SYNTHETIC_BLOCK_SIZE = 1024 * 1024;
118 
119  // string to indicate NULL. set in load() from table properties
120  private String nullColumnValue_;
121 
122  // hive uses this string for NULL partition keys. Set in load().
123  private String nullPartitionKeyValue_;
124 
125  // Avro schema of this table if this is an Avro table, otherwise null. Set in load().
126  private String avroSchema_ = null;
127 
128  // True if this table's metadata is marked as cached. Does not necessarily mean the
129  // data is cached or that all/any partitions are cached.
130  private boolean isMarkedCached_ = false;
131 
132  private static boolean hasLoggedDiskIdFormatWarning_ = false;
133 
134  private final List<HdfsPartition> partitions_; // these are only non-empty partitions
135 
136  // Array of sorted maps storing the association between partition values and
137  // partition ids. There is one sorted map per partition key.
138  private final ArrayList<TreeMap<LiteralExpr, HashSet<Long>>> partitionValuesMap_ =
139  Lists.newArrayList();
140 
141  // Array of partition id sets that correspond to partitions with null values
142  // in the partition keys; one set per partition key.
143  private final ArrayList<HashSet<Long>> nullPartitionIds_ = Lists.newArrayList();
144 
145  // Map of partition ids to HdfsPartitions. Used for speeding up partition
146  // pruning.
147  private final HashMap<Long, HdfsPartition> partitionMap_ = Maps.newHashMap();
148 
149  // Store all the partition ids of an HdfsTable.
150  private final HashSet<Long> partitionIds_ = Sets.newHashSet();
151 
152  // Flag to indicate if the HdfsTable has the partition metadata populated.
153  private boolean hasPartitionMd_ = false;
154 
155  // Bi-directional map between an integer index and a unique datanode
156  // TNetworkAddresses, each of which contains blocks of 1 or more
157  // files in this table. The network addresses are stored using IP
158  // address as the host name. Each FileBlock specifies a list of
159  // indices within this hostIndex_ to specify which nodes contain
160  // replicas of the block.
161  private final ListMap<TNetworkAddress> hostIndex_ = new ListMap<TNetworkAddress>();
162 
163  // Map of parent directory (partition location) to list of files (FileDescriptors)
164  // under that directory. Used to look up/index all files in the table.
165  private final Map<String, List<FileDescriptor>> fileDescMap_ = Maps.newHashMap();
166 
167  // Total number of Hdfs files in this table. Set in load().
168  private long numHdfsFiles_;
169 
170  // Sum of sizes of all Hdfs files in this table. Set in load().
171  private long totalHdfsBytes_;
172 
173  // True iff the table's partitions are located on more than one filesystem.
174  private boolean multipleFileSystems_ = false;
175 
176  // Base Hdfs directory where files of this table are stored.
177  // For unpartitioned tables it is simply the path where all files live.
178  // For partitioned tables it is the root directory
179  // under which partition dirs are placed.
180  protected String hdfsBaseDir_;
181 
182  private final static Logger LOG = LoggerFactory.getLogger(HdfsTable.class);
183 
184  // Caching this configuration object makes calls to getFileSystem much quicker
185  // (saves ~50ms on a standard plan)
186  // TODO(henry): confirm that this is thread safe - cursory inspection of the class
187  // and its usage in getFileSystem suggests it should be.
188  private static final Configuration CONF = new Configuration();
189 
190  private static final boolean SUPPORTS_VOLUME_ID;
191 
192  // Wrapper around a FileSystem object to hash based on the underlying FileSystem's
193  // scheme and authority.
194  private static class FsKey {
195  FileSystem filesystem;
196 
197  public FsKey(FileSystem fs) { filesystem = fs; }
198 
199  @Override
200  public int hashCode() { return filesystem.getUri().hashCode(); }
201 
202  @Override
203  public boolean equals(Object o) {
204  if (o == this) return true;
205  if (o != null && o instanceof FsKey) {
206  URI uri = filesystem.getUri();
207  URI otherUri = ((FsKey)o).filesystem.getUri();
208  return uri.equals(otherUri);
209  }
210  return false;
211  }
212 
213  @Override
214  public String toString() { return filesystem.getUri().toString(); }
215  }
216 
217  // Keeps track of newly added THdfsFileBlock metadata and its corresponding
218  // BlockLocation. For each i, blocks.get(i) corresponds to locations.get(i). Once
219  // all the new file blocks are collected, the disk volume IDs are retrieved in one
220  // batched DFS call.
221  private static class FileBlocksInfo {
222  final List<THdfsFileBlock> blocks = Lists.newArrayList();
223  final List<BlockLocation> locations = Lists.newArrayList();
224 
225  public void addBlocks(List<THdfsFileBlock> b, List<BlockLocation> l) {
226  Preconditions.checkState(b.size() == l.size());
227  blocks.addAll(b);
228  locations.addAll(l);
229  }
230  }
231 
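// Initializes SUPPORTS_VOLUME_ID from DFS_HDFS_BLOCKS_METADATA_ENABLED: when the
// datanodes do not expose block-volume metadata, loadDiskIds() is skipped and all
// disk IDs remain unknown.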
232  static {
233  SUPPORTS_VOLUME_ID =
234  CONF.getBoolean(DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED,
235  DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT);
236  }
237 
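// Decodes an HDFS VolumeId into a 0-based disk index by parsing its hex toString()
// form as a 4-byte big-endian integer. For example, assuming toString() yields
// "00000002", the decoded disk id is 2. Returns -1 if the id cannot be decoded.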
243  private static int getDiskId(VolumeId hdfsVolumeId) {
244  // Initialize the diskId as -1 to indicate it is unknown
245  int diskId = -1;
246 
247  if (hdfsVolumeId != null) {
248  // TODO: this is a hack and we'll have to address this by getting the
249  // public API. Also, we need to be very mindful of this when we change
250  // the version of HDFS.
251  String volumeIdString = hdfsVolumeId.toString();
252  // This is the hacky part. The toString is currently the underlying id
253  // encoded as hex.
254  byte[] volumeIdBytes = StringUtils.hexStringToByte(volumeIdString);
255  if (volumeIdBytes != null && volumeIdBytes.length == 4) {
256  diskId = Bytes.toInt(volumeIdBytes);
257  } else if (!hasLoggedDiskIdFormatWarning_) {
258  LOG.warn("wrong disk id format: " + volumeIdString);
259  hasLoggedDiskIdFormatWarning_ = true;
260  }
261  }
262  return diskId;
263  }
264 
265  public Map<String, List<FileDescriptor>> getFileDescMap() { return fileDescMap_; }
266 
267  public boolean spansMultipleFileSystems() { return multipleFileSystems_; }
268 
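// Queries the filesystem for the block locations of a single file and records, per
// block, its offset, length, and replica hosts (as indices into hostIndex_), plus
// which replicas are cached. Falls back to synthesizeBlockMetadata() for filesystems
// that do not expose block locations.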
275  private void loadBlockMetadata(FileSystem fs, FileStatus file, FileDescriptor fd,
276  HdfsFileFormat fileFormat, Map<FsKey, FileBlocksInfo> perFsFileBlocks) {
277  Preconditions.checkNotNull(fd);
278  Preconditions.checkNotNull(perFsFileBlocks);
279  Preconditions.checkArgument(!file.isDirectory());
280  LOG.debug("load block md for " + name_ + " file " + fd.getFileName());
281 
282  if (!FileSystemUtil.hasGetFileBlockLocations(fs)) {
283  synthesizeBlockMetadata(fs, fd, fileFormat);
284  return;
285  }
286  try {
287  BlockLocation[] locations = fs.getFileBlockLocations(file, 0, file.getLen());
288  Preconditions.checkNotNull(locations);
289 
290  // Loop over all blocks in the file.
291  for (BlockLocation loc: locations) {
292  Preconditions.checkNotNull(loc);
293  // Get the location of all block replicas in ip:port format.
294  String[] blockHostPorts = loc.getNames();
295  // Get the hostnames for all block replicas. Used to resolve which hosts
296  // contain cached data. The results are returned in the same order as
297  // block.getNames() so it allows us to match a host specified as ip:port to
298  // corresponding hostname using the same array index.
299  String[] blockHostNames = loc.getHosts();
300  Preconditions.checkState(blockHostNames.length == blockHostPorts.length);
301  // Get the hostnames that contain cached replicas of this block.
302  Set<String> cachedHosts =
303  Sets.newHashSet(Arrays.asList(loc.getCachedHosts()));
304  Preconditions.checkState(cachedHosts.size() <= blockHostNames.length);
305 
306  // Now enumerate all replicas of the block, adding any unknown hosts
307  // to hostMap_/hostList_. The host ID (index in to the hostList_) for each
308  // replica is stored in replicaHostIdxs.
309  List<BlockReplica> replicas = Lists.newArrayListWithExpectedSize(
310  blockHostPorts.length);
311  for (int i = 0; i < blockHostPorts.length; ++i) {
312  TNetworkAddress networkAddress = BlockReplica.parseLocation(blockHostPorts[i]);
313  Preconditions.checkState(networkAddress != null);
314  replicas.add(new BlockReplica(hostIndex_.getIndex(networkAddress),
315  cachedHosts.contains(blockHostNames[i])));
316  }
317  fd.addFileBlock(new FileBlock(loc.getOffset(), loc.getLength(), replicas));
318  }
319  // Remember the THdfsFileBlocks and corresponding BlockLocations. Once all the
320  // blocks are collected, the disk IDs will be queried in one batch per filesystem.
321  addPerFsFileBlocks(perFsFileBlocks, fs, fd.getFileBlocks(),
322  Arrays.asList(locations));
323  } catch (IOException e) {
324  throw new RuntimeException("couldn't determine block locations for path '" +
325  file.getPath() + "':\n" + e.getMessage(), e);
326  }
327  }
328 
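// Fabricates block metadata for filesystems that cannot report real block locations:
// the file is split into blockSize-sized synthetic blocks (a single block for
// non-splittable formats), each with one 'remote' replica.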
335  private void synthesizeBlockMetadata(FileSystem fs, FileDescriptor fd,
336  HdfsFileFormat fileFormat) {
337  long start = 0;
338  long remaining = fd.getFileLength();
339  // Workaround HADOOP-11584 by using the filesystem default block size rather than
340  // the block size from the FileStatus.
341  // TODO: after HADOOP-11584 is resolved, get the block size from the FileStatus.
342  long blockSize = fs.getDefaultBlockSize();
343  if (blockSize < MIN_SYNTHETIC_BLOCK_SIZE) blockSize = MIN_SYNTHETIC_BLOCK_SIZE;
344  if (!fileFormat.isSplittable(HdfsCompression.fromFileName(fd.getFileName()))) {
345  blockSize = remaining;
346  }
347  while (remaining > 0) {
348  long len = Math.min(remaining, blockSize);
349  List<BlockReplica> replicas = Lists.newArrayList(
350  new BlockReplica(hostIndex_.getIndex(REMOTE_NETWORK_ADDRESS), false));
351  fd.addFileBlock(new FileBlock(start, len, replicas));
352  remaining -= len;
353  start += len;
354  }
355  }
356 
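// For each DistributedFileSystem key, issues one batched getFileBlockStorageLocations()
// call for all collected blocks and attaches the decoded 0-based disk IDs to the
// corresponding THdfsFileBlocks. Skipped entirely if SUPPORTS_VOLUME_ID is false.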
363  private void loadDiskIds(Map<FsKey, FileBlocksInfo> perFsFileBlocks) {
364  if (!SUPPORTS_VOLUME_ID) return;
365  // Loop over each filesystem. If the filesystem is DFS, retrieve the volume IDs
366  // for all the blocks.
367  for (FsKey fsKey: perFsFileBlocks.keySet()) {
368  FileSystem fs = fsKey.filesystem;
369  // Only DistributedFileSystem has getFileBlockStorageLocations(). It's not even
370  // part of the FileSystem interface, so we'll need to downcast.
371  if (!(fs instanceof DistributedFileSystem)) continue;
372 
373  LOG.trace("Loading disk ids for: " + getFullName() + ". nodes: " + getNumNodes() +
374  ". filesystem: " + fsKey);
375  DistributedFileSystem dfs = (DistributedFileSystem)fs;
376  FileBlocksInfo blockLists = perFsFileBlocks.get(fsKey);
377  Preconditions.checkNotNull(blockLists);
378  BlockStorageLocation[] storageLocs = null;
379  try {
380  // Get the BlockStorageLocations for all the blocks
381  storageLocs = dfs.getFileBlockStorageLocations(blockLists.locations);
382  } catch (IOException e) {
383  LOG.error("Couldn't determine block storage locations for filesystem " +
384  fs + ":\n" + e.getMessage());
385  continue;
386  }
387  if (storageLocs == null || storageLocs.length == 0) {
388  LOG.warn("Attempted to get block locations for filesystem " + fs +
389  " but the call returned no results");
390  continue;
391  }
392  if (storageLocs.length != blockLists.locations.size()) {
393  // Block locations and storage locations didn't match up.
394  LOG.error("Number of block storage locations not equal to number of blocks: "
395  + "#storage locations=" + Long.toString(storageLocs.length)
396  + " #blocks=" + Long.toString(blockLists.locations.size()));
397  continue;
398  }
399  long unknownDiskIdCount = 0;
400  // Attach volume IDs given by the storage location to the corresponding
401  // THdfsFileBlocks.
402  for (int locIdx = 0; locIdx < storageLocs.length; ++locIdx) {
403  VolumeId[] volumeIds = storageLocs[locIdx].getVolumeIds();
404  THdfsFileBlock block = blockLists.blocks.get(locIdx);
405  // Convert opaque VolumeId to 0 based ids.
406  // TODO: the diskId should be eventually retrievable from Hdfs when the
407  // community agrees this API is useful.
408  int[] diskIds = new int[volumeIds.length];
409  for (int i = 0; i < volumeIds.length; ++i) {
410  diskIds[i] = getDiskId(volumeIds[i]);
411  if (diskIds[i] < 0) ++unknownDiskIdCount;
412  }
413  FileBlock.setDiskIds(diskIds, block);
414  }
415  if (unknownDiskIdCount > 0) {
416  LOG.warn("Unknown disk id count for filesystem " + fs + ":" + unknownDiskIdCount);
417  }
418  }
419  }
420 
421  public HdfsTable(TableId id, org.apache.hadoop.hive.metastore.api.Table msTbl,
422  Db db, String name, String owner) {
423  super(id, msTbl, db, name, owner);
424  this.partitions_ = Lists.newArrayList();
425  }
426 
427  @Override
428  public TCatalogObjectType getCatalogObjectType() { return TCatalogObjectType.TABLE; }
429  public List<HdfsPartition> getPartitions() {
430  return new ArrayList<HdfsPartition>(partitions_);
431  }
432  public boolean isMarkedCached() { return isMarkedCached_; }
433 
434  public HashMap<Long, HdfsPartition> getPartitionMap() { return partitionMap_; }
435  public HashSet<Long> getNullPartitionIds(int i) { return nullPartitionIds_.get(i); }
436  public HashSet<Long> getPartitionIds() { return partitionIds_; }
437  public TreeMap<LiteralExpr, HashSet<Long>> getPartitionValueMap(int i) {
438  return partitionValuesMap_.get(i);
439  }
440 
446  public String getNullColumnValue() { return nullColumnValue_; }
447 
448  /*
449  * Returns the storage location (HDFS path) of this table.
450  */
451  public String getLocation() { return super.getMetaStoreTable().getSd().getLocation(); }
452 
453  public List<FieldSchema> getFieldSchemas() { return fields_; }
454  public List<FieldSchema> getNonPartitionFieldSchemas() {
455  return fields_.subList(getNumClusteringCols(), fields_.size());
456  }
457 
458  // True if Impala has HDFS write permissions on the hdfsBaseDir (for an unpartitioned
459  // table) or if Impala has write permissions on all partition directories (for
460  // a partitioned table).
461  public boolean hasWriteAccess() {
462  return TAccessLevelUtil.impliesWriteAccess(accessLevel_);
463  }
464 
470  public String getFirstLocationWithoutWriteAccess() {
471  if (getMetaStoreTable() == null) return null;
472 
473  if (getMetaStoreTable().getPartitionKeysSize() == 0) {
474  if (!TAccessLevelUtil.impliesWriteAccess(accessLevel_)) {
475  return hdfsBaseDir_;
476  }
477  } else {
478  for (HdfsPartition partition: partitions_) {
479  if (!TAccessLevelUtil.impliesWriteAccess(partition.getAccessLevel())) {
480  return partition.getLocation();
481  }
482  }
483  }
484  return null;
485  }
486 
491  public HdfsPartition getPartition(List<PartitionKeyValue> partitionSpec) {
492  List<TPartitionKeyValue> partitionKeyValues = Lists.newArrayList();
493  for (PartitionKeyValue kv: partitionSpec) {
494  String value = PartitionKeyValue.getPartitionKeyValueString(
495  kv.getLiteralValue(), getNullPartitionKeyValue());
496  partitionKeyValues.add(new TPartitionKeyValue(kv.getColName(), value));
497  }
498  return getPartitionFromThriftPartitionSpec(partitionKeyValues);
499  }
500 
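// Looks up the partition whose key values match the given Thrift partition spec
// (case-insensitively); returns null if the spec is incomplete, repeats a key, or
// no partition matches.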
505  public HdfsPartition getPartitionFromThriftPartitionSpec(
506  List<TPartitionKeyValue> partitionSpec) {
507  // First, build a list of the partition values to search for in the same order they
508  // are defined in the table.
509  List<String> targetValues = Lists.newArrayList();
510  Set<String> keys = Sets.newHashSet();
511  for (FieldSchema fs: getMetaStoreTable().getPartitionKeys()) {
512  for (TPartitionKeyValue kv: partitionSpec) {
513  if (fs.getName().toLowerCase().equals(kv.getName().toLowerCase())) {
514  targetValues.add(kv.getValue().toLowerCase());
515  // Same key was specified twice
516  if (!keys.add(kv.getName().toLowerCase())) {
517  return null;
518  }
519  }
520  }
521  }
522 
523  // Make sure the number of values match up and that some values were found.
524  if (targetValues.size() == 0 ||
525  (targetValues.size() != getMetaStoreTable().getPartitionKeysSize())) {
526  return null;
527  }
528 
529  // Now search through all the partitions and check if their partition key values match
530  // the values being searched for.
531  for (HdfsPartition partition: getPartitions()) {
532  if (partition.getId() == ImpalaInternalServiceConstants.DEFAULT_PARTITION_ID) {
533  continue;
534  }
535  List<LiteralExpr> partitionValues = partition.getPartitionValues();
536  Preconditions.checkState(partitionValues.size() == targetValues.size());
537  boolean matchFound = true;
538  for (int i = 0; i < targetValues.size(); ++i) {
539  String value;
540  if (partitionValues.get(i) instanceof NullLiteral) {
541  value = getNullPartitionKeyValue();
542  } else {
543  value = partitionValues.get(i).getStringValue();
544  Preconditions.checkNotNull(value);
545  // See IMPALA-252: we deliberately map empty strings on to
546  // NULL when they're in partition columns. This is for
547  // backwards compatibility with Hive, and is clearly broken.
548  if (value.isEmpty()) value = getNullPartitionKeyValue();
549  }
550  if (!targetValues.get(i).equals(value.toLowerCase())) {
551  matchFound = false;
552  break;
553  }
554  }
555  if (matchFound) {
556  return partition;
557  }
558  }
559  return null;
560  }
561 
567  private void loadColumns(List<FieldSchema> fieldSchemas, HiveMetaStoreClient client)
568  throws TableLoadingException {
569  int pos = 0;
570  for (FieldSchema s: fieldSchemas) {
571  Type type = parseColumnType(s);
572  // Check if we support partitioning on columns of such a type.
573  if (pos < numClusteringCols_ && !type.supportsTablePartitioning()) {
574  throw new TableLoadingException(
575  String.format("Failed to load metadata for table '%s' because of " +
576  "unsupported partition-column type '%s' in partition column '%s'",
577  getFullName(), type.toString(), s.getName()));
578  }
579 
580  Column col = new Column(s.getName(), type, s.getComment(), pos);
581  addColumn(col);
582  ++pos;
583  }
584  fields_ = fieldSchemas == null ? new ArrayList<FieldSchema>() : fieldSchemas;
585  loadAllColumnStats(client);
586  }
587 
591  private void populatePartitionMd() {
592  if (hasPartitionMd_) return;
593  for (HdfsPartition partition: partitions_) {
594  updatePartitionMdAndColStats(partition);
595  }
596  hasPartitionMd_ = true;
597  }
598 
602  private void resetPartitionMd() {
603  partitionIds_.clear();
604  partitionMap_.clear();
605  partitionValuesMap_.clear();
606  nullPartitionIds_.clear();
607  // Initialize partitionValuesMap_ and nullPartitionIds_. Also reset column stats.
608  for (int i = 0; i < numClusteringCols_; ++i) {
609  getColumns().get(i).getStats().setNumNulls(0);
610  getColumns().get(i).getStats().setNumDistinctValues(0);
611  partitionValuesMap_.add(Maps.<LiteralExpr, HashSet<Long>>newTreeMap());
612  nullPartitionIds_.add(Sets.<Long>newHashSet());
613  }
614  hasPartitionMd_ = false;
615  }
616 
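// Creates HdfsPartition objects for every metastore partition (or a single implicit
// partition for unpartitioned tables), reusing FileDescriptors from oldFileDescMap
// when file length and modification time are unchanged, then loads disk IDs for any
// new or modified blocks in one pass.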
629  private void loadPartitions(
630  List<org.apache.hadoop.hive.metastore.api.Partition> msPartitions,
631  org.apache.hadoop.hive.metastore.api.Table msTbl,
632  Map<String, List<FileDescriptor>> oldFileDescMap) throws IOException,
633  CatalogException {
635  partitions_.clear();
636  hdfsBaseDir_ = msTbl.getSd().getLocation();
637 
638  // Map of filesystem to the file blocks for new/modified FileDescriptors. Blocks in
639  // this map will have their disk volume IDs information (re)loaded. This is used to
640  // speed up the incremental refresh of a table's metadata by skipping unmodified,
641  // previously loaded blocks.
642  Map<FsKey, FileBlocksInfo> blocksToLoad = Maps.newHashMap();
643 
644  // INSERT statements need to refer to this if they try to write to new partitions
645  // Scans don't refer to this because by definition all partitions they refer to
646  // exist.
647  addDefaultPartition(msTbl.getSd());
648 
649  // We silently ignore cache directives that no longer exist in HDFS, and remove
650  // non-existing cache directives from the parameters.
651  isMarkedCached_ = HdfsCachingUtil.validateCacheParams(msTbl.getParameters());
652 
653  if (msTbl.getPartitionKeysSize() == 0) {
654  Preconditions.checkArgument(msPartitions == null || msPartitions.isEmpty());
655  // This table has no partition key, which means it has no declared partitions.
656  // We model partitions slightly differently to Hive - every file must exist in a
657  // partition, so add a single partition with no keys which will get all the
658  // files in the table's root directory.
659  HdfsPartition part = createPartition(msTbl.getSd(), null, oldFileDescMap,
660  blocksToLoad);
661  addPartition(part);
662  if (isMarkedCached_) part.markCached();
663  Path location = new Path(hdfsBaseDir_);
664  FileSystem fs = location.getFileSystem(CONF);
665  if (fs.exists(location)) {
666  accessLevel_ = getAvailableAccessLevel(fs, location);
667  }
668  } else {
669  for (org.apache.hadoop.hive.metastore.api.Partition msPartition: msPartitions) {
670  HdfsPartition partition = createPartition(msPartition.getSd(), msPartition,
671  oldFileDescMap, blocksToLoad);
672  addPartition(partition);
673  // If the partition is null, its HDFS path does not exist, and it was not added to
674  // this table's partition list. Skip the partition.
675  if (partition == null) continue;
676  if (msPartition.getParameters() != null) {
677  partition.setNumRows(getRowCount(msPartition.getParameters()));
678  }
679  if (!TAccessLevelUtil.impliesWriteAccess(partition.getAccessLevel())) {
680  // TODO: READ_ONLY isn't exactly correct because it's possible the
681  // partition does not have READ permissions either. When we start checking
682  // whether we can READ from a table, this should be updated to set the
683  // table's access level to the "lowest" effective level across all
684  // partitions. That is, if one partition has READ_ONLY and another has
685  // WRITE_ONLY the table's access level should be NONE.
686  accessLevel_ = TAccessLevel.READ_ONLY;
687  }
688  }
689  }
690  loadDiskIds(blocksToLoad);
691  }
692 
699  private TAccessLevel getAvailableAccessLevel(FileSystem fs, Path location)
700  throws IOException {
701  FsPermissionChecker permissionChecker = FsPermissionChecker.getInstance();
702  while (location != null) {
703  if (fs.exists(location)) {
704  FsPermissionChecker.Permissions perms =
705  permissionChecker.getPermissions(fs, location);
706  if (perms.canReadAndWrite()) {
707  return TAccessLevel.READ_WRITE;
708  } else if (perms.canRead()) {
709  return TAccessLevel.READ_ONLY;
710  } else if (perms.canWrite()) {
711  return TAccessLevel.WRITE_ONLY;
712  }
713  return TAccessLevel.NONE;
714  }
715  location = location.getParent();
716  }
717  // Should never get here.
718  Preconditions.checkNotNull(location, "Error: no path ancestor exists");
719  return TAccessLevel.NONE;
720  }
721 
735  public HdfsPartition createPartition(StorageDescriptor storageDescriptor,
736  org.apache.hadoop.hive.metastore.api.Partition msPartition)
737  throws CatalogException {
738  Map<FsKey, FileBlocksInfo> blocksToLoad = Maps.newHashMap();
739  HdfsPartition hdfsPartition = createPartition(storageDescriptor, msPartition,
740  fileDescMap_, blocksToLoad);
741  loadDiskIds(blocksToLoad);
742  return hdfsPartition;
743  }
744 
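// Builds one HdfsPartition from a metastore StorageDescriptor/Partition: parses the
// partition key literals, lists the partition directory, reuses unchanged
// FileDescriptors from oldFileDescMap, and collects new blocks into perFsFileBlocks
// so their disk IDs can be loaded later.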
764  private HdfsPartition createPartition(StorageDescriptor storageDescriptor,
765  org.apache.hadoop.hive.metastore.api.Partition msPartition,
766  Map<String, List<FileDescriptor>> oldFileDescMap,
767  Map<FsKey, FileBlocksInfo> perFsFileBlocks)
768  throws CatalogException {
769  HdfsStorageDescriptor fileFormatDescriptor =
770  HdfsStorageDescriptor.fromStorageDescriptor(this.name_, storageDescriptor);
771  Path partDirPath = new Path(storageDescriptor.getLocation());
772  List<FileDescriptor> fileDescriptors = Lists.newArrayList();
773  // If the partition is marked as cached, the block location metadata must be
774  // reloaded, even if the file times have not changed.
775  boolean isMarkedCached = isMarkedCached_;
776  List<LiteralExpr> keyValues = Lists.newArrayList();
777  if (msPartition != null) {
778  isMarkedCached = HdfsCachingUtil.validateCacheParams(msPartition.getParameters());
779  // Load key values
780  for (String partitionKey: msPartition.getValues()) {
781  Type type = getColumns().get(keyValues.size()).getType();
782  // Deal with Hive's special NULL partition key.
783  if (partitionKey.equals(nullPartitionKeyValue_)) {
784  keyValues.add(NullLiteral.create(type));
785  } else {
786  try {
787  keyValues.add(LiteralExpr.create(partitionKey, type));
788  } catch (Exception ex) {
789  LOG.warn("Failed to create literal expression of type: " + type, ex);
790  throw new CatalogException("Invalid partition key value of type: " + type,
791  ex);
792  }
793  }
794  }
795  try {
796  Expr.analyze(keyValues, null);
797  } catch (AnalysisException e) {
798  // should never happen
799  throw new IllegalStateException(e);
800  }
801  }
802  try {
803  // Each partition could reside on a different filesystem.
804  FileSystem fs = partDirPath.getFileSystem(CONF);
805  multipleFileSystems_ = multipleFileSystems_ ||
806  !FileSystemUtil.isPathOnFileSystem(new Path(getLocation()), fs);
807  if (fs.exists(partDirPath)) {
808  // FileSystem does not have an API that takes in a timestamp and returns a list
809  // of files that have been added/changed since. Therefore, we are calling
810  // fs.listStatus() to list all the files.
811  for (FileStatus fileStatus: fs.listStatus(partDirPath)) {
812  String fileName = fileStatus.getPath().getName().toString();
813  if (fileStatus.isDirectory() || FileSystemUtil.isHiddenFile(fileName) ||
814  HdfsCompression.fromFileName(fileName) == HdfsCompression.LZO_INDEX) {
815  // Ignore directory, hidden file starting with . or _, and LZO index files
816  // If a directory is erroneously created as a subdirectory of a partition dir
817  // we should ignore it and move on. Hive will not recurse into directories.
818  // Skip index files, these are read by the LZO scanner directly.
819  continue;
820  }
821 
822  String partitionDir = fileStatus.getPath().getParent().toString();
823  FileDescriptor fd = null;
824  // Search for a FileDescriptor with the same partition dir and file name. If one
825  // is found, it will be chosen as a candidate to reuse.
826  if (oldFileDescMap != null && oldFileDescMap.get(partitionDir) != null) {
827  for (FileDescriptor oldFileDesc: oldFileDescMap.get(partitionDir)) {
828  if (oldFileDesc.getFileName().equals(fileName)) {
829  fd = oldFileDesc;
830  break;
831  }
832  }
833  }
834 
835  // Check if this FileDescriptor has been modified since last loading its block
836  // location information. If it has not been changed, the previously loaded
837  // value can be reused.
838  if (fd == null || isMarkedCached || fd.getFileLength() != fileStatus.getLen()
839  || fd.getModificationTime() != fileStatus.getModificationTime()) {
840  // Create a new file descriptor and load the file block metadata,
841  // collecting the block metadata into perFsFileBlocks. The disk IDs for
842  // all the blocks of each filesystem will be loaded by loadDiskIds().
843  fd = new FileDescriptor(fileName, fileStatus.getLen(),
844  fileStatus.getModificationTime());
845  loadBlockMetadata(fs, fileStatus, fd, fileFormatDescriptor.getFileFormat(),
846  perFsFileBlocks);
847  }
848 
849  List<FileDescriptor> fds = fileDescMap_.get(partitionDir);
850  if (fds == null) {
851  fds = Lists.newArrayList();
852  fileDescMap_.put(partitionDir, fds);
853  }
854  fds.add(fd);
855 
856  // Add to the list of FileDescriptors for this partition.
857  fileDescriptors.add(fd);
858  }
859  numHdfsFiles_ += fileDescriptors.size();
860  }
861  HdfsPartition partition = new HdfsPartition(this, msPartition, keyValues,
862  fileFormatDescriptor, fileDescriptors,
863  getAvailableAccessLevel(fs, partDirPath));
864  partition.checkWellFormed();
865  return partition;
866  } catch (Exception e) {
867  throw new CatalogException("Failed to create partition: ", e);
868  }
869  }
870 
875  private void addPerFsFileBlocks(Map<FsKey, FileBlocksInfo> fsToBlocks, FileSystem fs,
876  List<THdfsFileBlock> blocks, List<BlockLocation> locations) {
877  FsKey fsKey = new FsKey(fs);
878  FileBlocksInfo infos = fsToBlocks.get(fsKey);
879  if (infos == null) {
880  infos = new FileBlocksInfo();
881  fsToBlocks.put(fsKey, infos);
882  }
883  infos.addBlocks(blocks, locations);
884  }
885 
892  public void addPartition(HdfsPartition partition) {
893  if (partitions_.contains(partition)) return;
894  partitions_.add(partition);
895  totalHdfsBytes_ += partition.getSize();
896  updatePartitionMdAndColStats(partition);
897  }
898 
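// Registers a partition in the pruning structures (partitionIds_, partitionMap_,
// partitionValuesMap_, nullPartitionIds_) and updates the per-key-column null and
// distinct-value counts. Partitions without a full set of key values (e.g. the
// default partition) are ignored.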
904  private void updatePartitionMdAndColStats(HdfsPartition partition) {
905  if (partition.getPartitionValues().size() != numClusteringCols_) return;
906 
907  partitionIds_.add(partition.getId());
908  partitionMap_.put(partition.getId(), partition);
909  for (int i = 0; i < partition.getPartitionValues().size(); ++i) {
910  ColumnStats stats = getColumns().get(i).getStats();
911  LiteralExpr literal = partition.getPartitionValues().get(i);
912  // Store partitions with null partition values separately
913  if (literal instanceof NullLiteral) {
914  stats.setNumNulls(stats.getNumNulls() + 1);
915  if (nullPartitionIds_.get(i).isEmpty()) {
916  stats.setNumDistinctValues(stats.getNumDistinctValues() + 1);
917  }
918  nullPartitionIds_.get(i).add(partition.getId());
919  continue;
920  }
921  HashSet<Long> partitionIds = partitionValuesMap_.get(i).get(literal);
922  if (partitionIds == null) {
923  partitionIds = Sets.newHashSet();
924  partitionValuesMap_.get(i).put(literal, partitionIds);
925  stats.setNumDistinctValues(stats.getNumDistinctValues() + 1);
926  }
927  partitionIds.add(partition.getId());
928  }
929  }
930 
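// Removes the partition matching the given spec from the in-memory state and
// reverses the bookkeeping done by updatePartitionMdAndColStats(); returns the
// dropped partition, or null if no such partition exists.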
942  public HdfsPartition dropPartition(List<TPartitionKeyValue> partitionSpec) {
943  HdfsPartition partition = getPartitionFromThriftPartitionSpec(partitionSpec);
944  // Check if the partition does not exist.
945  if (partition == null || !partitions_.remove(partition)) return null;
946  totalHdfsBytes_ -= partition.getSize();
947  Preconditions.checkArgument(partition.getPartitionValues().size() ==
948  numClusteringCols_);
949  Long partitionId = partition.getId();
950  // Remove the partition id from the list of partition ids and other mappings.
951  partitionIds_.remove(partitionId);
952  partitionMap_.remove(partitionId);
953  for (int i = 0; i < partition.getPartitionValues().size(); ++i) {
954  ColumnStats stats = getColumns().get(i).getStats();
955  LiteralExpr literal = partition.getPartitionValues().get(i);
956  // Check if this is a null literal.
957  if (literal instanceof NullLiteral) {
958  nullPartitionIds_.get(i).remove(partitionId);
959  stats.setNumNulls(stats.getNumNulls() - 1);
960  if (nullPartitionIds_.get(i).isEmpty()) {
961  stats.setNumDistinctValues(stats.getNumDistinctValues() - 1);
962  }
963  continue;
964  }
965  HashSet<Long> partitionIds = partitionValuesMap_.get(i).get(literal);
966  // If there are multiple partition ids corresponding to a literal, remove
967  // only this id. Otherwise, remove the <literal, id> pair.
968  if (partitionIds.size() > 1) partitionIds.remove(partitionId);
969  else {
970  partitionValuesMap_.get(i).remove(literal);
971  stats.setNumDistinctValues(stats.getNumDistinctValues() - 1);
972  }
973  }
974  return partition;
975  }
976 
977  private void addDefaultPartition(StorageDescriptor storageDescriptor)
978  throws CatalogException {
979  // Default partition has no files and is not referred to by scan nodes. Data sinks
980  // refer to this to understand how to create new partitions.
981  HdfsStorageDescriptor hdfsStorageDescriptor =
982  HdfsStorageDescriptor.fromStorageDescriptor(this.name_, storageDescriptor);
983  HdfsPartition partition = HdfsPartition.defaultPartition(this, hdfsStorageDescriptor);
984  partitions_.add(partition);
985  }
986 
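// Loads the table from the Hive Metastore: null-format and null-partition-key
// defaults, the (possibly Avro-derived) column schema, partitions (reusing
// unmodified ones from cachedEntry when possible), and table-level row-count stats.
// All failures are wrapped in TableLoadingException.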
987  @Override
1003  public void load(Table cachedEntry, HiveMetaStoreClient client,
1004  org.apache.hadoop.hive.metastore.api.Table msTbl) throws TableLoadingException {
1005  numHdfsFiles_ = 0;
1006  totalHdfsBytes_ = 0;
1007  LOG.debug("load table: " + db_.getName() + "." + name_);
1008 
1009  // turn all exceptions into TableLoadingException
1010  try {
1011  // set nullPartitionKeyValue from the hive conf.
1012  nullPartitionKeyValue_ = client.getConfigValue(
1013  "hive.exec.default.partition.name", "__HIVE_DEFAULT_PARTITION__");
1014 
1015  // set NULL indicator string from table properties
1016  nullColumnValue_ =
1017  msTbl.getParameters().get(serdeConstants.SERIALIZATION_NULL_FORMAT);
1018  if (nullColumnValue_ == null) nullColumnValue_ = DEFAULT_NULL_COLUMN_VALUE;
1019 
1020  // populate with both partition keys and regular columns
1021  List<FieldSchema> partKeys = msTbl.getPartitionKeys();
1022  List<FieldSchema> tblFields = Lists.newArrayList();
1023  String inputFormat = msTbl.getSd().getInputFormat();
1024  if (HdfsFileFormat.fromJavaClassName(inputFormat) == HdfsFileFormat.AVRO) {
1025  // Look for the schema in TBLPROPERTIES and in SERDEPROPERTIES, with the latter
1026  // taking precedence.
1027  List<Map<String, String>> schemaSearchLocations = Lists.newArrayList();
1028  schemaSearchLocations.add(
1029  getMetaStoreTable().getSd().getSerdeInfo().getParameters());
1030  schemaSearchLocations.add(getMetaStoreTable().getParameters());
1031 
1032  avroSchema_ =
1033  HdfsTable.getAvroSchema(schemaSearchLocations, getFullName());
1034  String serdeLib = msTbl.getSd().getSerdeInfo().getSerializationLib();
1035  if (serdeLib == null ||
1036  serdeLib.equals("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")) {
1037  // If the SerDe library is null or set to LazySimpleSerDe, it
1038  // indicates there is an issue with the table metadata since Avro tables need a
1039  // non-native serde. Instead of failing to load the table, fall back to
1040  // using the fields from the storage descriptor (same as Hive).
1041  tblFields.addAll(msTbl.getSd().getCols());
1042  } else {
1043  // Load the fields from the Avro schema.
1044  // Since Avro does not include meta-data for CHAR or VARCHAR, an Avro type of
1045  // "string" is used for CHAR, VARCHAR and STRING. Default back to the storage
1046  // descriptor to determine the type for "string".
1047  List<FieldSchema> sdTypes = msTbl.getSd().getCols();
1048  int i = 0;
1049  List<Column> avroTypeList = AvroSchemaParser.parse(avroSchema_);
1050  boolean canFallBack = sdTypes.size() == avroTypeList.size();
1051  for (Column parsedCol: avroTypeList) {
1052  FieldSchema fs = new FieldSchema();
1053  fs.setName(parsedCol.getName());
1054  String avroType = parsedCol.getType().toSql();
1055  if (avroType.toLowerCase().equals("string") && canFallBack) {
1056  fs.setType(sdTypes.get(i).getType());
1057  } else {
1058  fs.setType(avroType);
1059  }
1060  fs.setComment("from deserializer");
1061  tblFields.add(fs);
1062  i++;
1063  }
1064  }
1065  } else {
1066  tblFields.addAll(msTbl.getSd().getCols());
1067  }
1068  List<FieldSchema> fieldSchemas = new ArrayList<FieldSchema>(
1069  partKeys.size() + tblFields.size());
1070  fieldSchemas.addAll(partKeys);
1071  fieldSchemas.addAll(tblFields);
1072  // The number of clustering columns is the number of partition keys.
1073  numClusteringCols_ = partKeys.size();
1074  loadColumns(fieldSchemas, client);
1075 
1076  // Collect the list of partitions to use for the table. Partitions may be reused
1077  // from the existing cached table entry (if one exists), read from the metastore,
1078  // or a mix of both. Whether or not a partition is reused depends on whether
1079  // the table or partition has been modified.
1080  List<org.apache.hadoop.hive.metastore.api.Partition> msPartitions =
1081  Lists.newArrayList();
1082  if (cachedEntry == null || !(cachedEntry instanceof HdfsTable) ||
1083  cachedEntry.lastDdlTime_ != lastDdlTime_) {
1084  msPartitions.addAll(MetaStoreUtil.fetchAllPartitions(
1085  client, db_.getName(), name_, NUM_PARTITION_FETCH_RETRIES));
1086  } else {
1087  // The table was already in the metadata cache and it has not been modified.
1088  Preconditions.checkArgument(cachedEntry instanceof HdfsTable);
1089  HdfsTable cachedHdfsTableEntry = (HdfsTable) cachedEntry;
1090  // Set of partition names that have been modified. Partitions in this Set need to
1091  // be reloaded from the metastore.
1092  Set<String> modifiedPartitionNames = Sets.newHashSet();
1093 
1094  // If these are not the exact same object, look up the set of partition names in
1095  // the metastore. This is to support the special case of CTAS which creates a
1096  // "temp" table that doesn't actually exist in the metastore.
1097  if (cachedEntry != this) {
1098  // Since the table has not been modified, we might be able to reuse some of the
1099  // old partition metadata if the individual partitions have not been modified.
1100  // First get a list of all the partition names for this table from the
1101  // metastore, this is much faster than listing all the Partition objects.
1102  modifiedPartitionNames.addAll(
1103  client.listPartitionNames(db_.getName(), name_, (short) -1));
1104  }
1105 
1106  int totalPartitions = modifiedPartitionNames.size();
1107  // Get all the partitions from the cached entry that have not been modified.
1108  for (HdfsPartition cachedPart: cachedHdfsTableEntry.getPartitions()) {
1109  // Skip the default partition and any partitions that have been modified.
1110  if (cachedPart.isDirty() || cachedPart.isDefaultPartition()) {
1111  continue;
1112  }
1113 
1114  org.apache.hadoop.hive.metastore.api.Partition cachedMsPart =
1115  cachedPart.toHmsPartition();
1116  if (cachedMsPart == null) continue;
1117 
1118  // This is a partition we already know about and it hasn't been modified.
1119  // No need to reload the metadata.
1120  String cachedPartName = cachedPart.getPartitionName();
1121  if (modifiedPartitionNames.contains(cachedPartName)) {
1122  msPartitions.add(cachedMsPart);
1123  modifiedPartitionNames.remove(cachedPartName);
1124  }
1125  }
1126  LOG.info(String.format("Incrementally refreshing %d/%d partitions.",
1127  modifiedPartitionNames.size(), totalPartitions));
1128 
1129  // No need to make the metastore call if no partitions are to be updated.
1130  if (modifiedPartitionNames.size() > 0) {
1131  // Now reload the remaining partitions.
1132  msPartitions.addAll(MetaStoreUtil.fetchPartitionsByName(client,
1133  Lists.newArrayList(modifiedPartitionNames), db_.getName(), name_));
1134  }
1135  }
1136 
1137  Map<String, List<FileDescriptor>> oldFileDescMap = null;
1138  if (cachedEntry != null && cachedEntry instanceof HdfsTable) {
1139  HdfsTable cachedHdfsTable = (HdfsTable) cachedEntry;
1140  oldFileDescMap = cachedHdfsTable.fileDescMap_;
1141  hostIndex_.populate(cachedHdfsTable.hostIndex_.getList());
1142  }
1143  loadPartitions(msPartitions, msTbl, oldFileDescMap);
1144 
1145  // load table stats
1146  numRows_ = getRowCount(msTbl.getParameters());
1147  LOG.debug("table #rows=" + Long.toString(numRows_));
1148 
1149  // For unpartitioned tables set the numRows in its partitions
1150  // to the table's numRows.
1151  if (numClusteringCols_ == 0 && !partitions_.isEmpty()) {
1152  // Unpartitioned tables have a 'dummy' partition and a default partition.
1153  // Temp tables used in CTAS statements have one partition.
1154  Preconditions.checkState(partitions_.size() == 2 || partitions_.size() == 1);
1155  for (HdfsPartition p: partitions_) {
1156  p.setNumRows(numRows_);
1157  }
1158  }
1159  } catch (TableLoadingException e) {
1160  throw e;
1161  } catch (Exception e) {
1162  throw new TableLoadingException(
1163  "Failed to load metadata for table: " + getFullName(), e);
1164  }
1165  }
1166 
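// Returns the table's Avro schema: each search location is checked in order for an
// avro.schema.literal (returned directly) or an avro.schema.url (fetched over HTTP
// or from the filesystem). The caller passes SERDEPROPERTIES before TBLPROPERTIES,
// so serde-level settings win.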
1177  public static String getAvroSchema(List<Map<String, String>> schemaSearchLocations,
1178  String tableName) throws TableLoadingException {
1179  String url = null;
1180  // Search all locations and break out on the first valid schema found.
1181  for (Map<String, String> schemaLocation: schemaSearchLocations) {
1182  if (schemaLocation == null) continue;
1183 
1184  String literal = schemaLocation.get(AvroSerdeUtils.SCHEMA_LITERAL);
1185  if (literal != null && !literal.equals(AvroSerdeUtils.SCHEMA_NONE)) return literal;
1186 
1187  url = schemaLocation.get(AvroSerdeUtils.SCHEMA_URL);
1188  if (url != null) {
1189  url = url.trim();
1190  break;
1191  }
1192  }
1193  if (url == null || url.equals(AvroSerdeUtils.SCHEMA_NONE)) {
1194  throw new TableLoadingException(String.format("No Avro schema provided in " +
1195  "SERDEPROPERTIES or TBLPROPERTIES for table: %s ", tableName));
1196  }
1197  String schema = null;
1198  if (url.toLowerCase().startsWith("http://")) {
1199  InputStream urlStream = null;
1200  try {
1201  urlStream = new URL(url).openStream();
1202  schema = IOUtils.toString(urlStream);
1203  } catch (IOException e) {
1204  throw new TableLoadingException("Problem reading Avro schema from: " + url, e);
1205  } finally {
1206  IOUtils.closeQuietly(urlStream);
1207  }
1208  } else {
1209  Path path = new Path(url);
1210  FileSystem fs = null;
1211  try {
1212  fs = path.getFileSystem(FileSystemUtil.getConfiguration());
1213  } catch (Exception e) {
1214  throw new TableLoadingException(String.format(
1215  "Invalid avro.schema.url: %s. %s", path, e.getMessage()));
1216  }
1217  StringBuilder errorMsg = new StringBuilder();
1218  if (!FileSystemUtil.isPathReachable(path, fs, errorMsg)) {
1219  throw new TableLoadingException(String.format(
1220  "Invalid avro.schema.url: %s. %s", path, errorMsg));
1221  }
1222  try {
1223  schema = FileSystemUtil.readFile(path);
1224  } catch (IOException e) {
1225  throw new TableLoadingException(
1226  "Problem reading Avro schema at: " + url, e);
1227  }
1228  }
1229  return schema;
1230  }
1231 
1232  @Override
1233  protected List<String> getColumnNamesWithHmsStats() {
1234  List<String> ret = Lists.newArrayList();
1235  // Only non-partition columns have column stats in the HMS.
1236  for (Column column: getColumns().subList(numClusteringCols_, getColumns().size())) {
1237  ret.add(column.getName().toLowerCase());
1238  }
1239  return ret;
1240  }
1241 
1242  @Override
1243  protected void loadFromThrift(TTable thriftTable) throws TableLoadingException {
1244  super.loadFromThrift(thriftTable);
1245  THdfsTable hdfsTable = thriftTable.getHdfs_table();
1246  hdfsBaseDir_ = hdfsTable.getHdfsBaseDir();
1247  nullColumnValue_ = hdfsTable.nullColumnValue;
1248  nullPartitionKeyValue_ = hdfsTable.nullPartitionKeyValue;
1249  multipleFileSystems_ = hdfsTable.multiple_filesystems;
1250  hostIndex_.populate(hdfsTable.getNetwork_addresses());
1251  resetPartitionMd();
1252 
1253  numHdfsFiles_ = 0;
1254  totalHdfsBytes_ = 0;
1255  for (Map.Entry<Long, THdfsPartition> part: hdfsTable.getPartitions().entrySet()) {
1256  HdfsPartition hdfsPart =
1257  HdfsPartition.fromThrift(this, part.getKey(), part.getValue());
1258  numHdfsFiles_ += hdfsPart.getFileDescriptors().size();
1259  totalHdfsBytes_ += hdfsPart.getSize();
1260  partitions_.add(hdfsPart);
1261  }
1262  avroSchema_ = hdfsTable.isSetAvroSchema() ? hdfsTable.getAvroSchema() : null;
1263  isMarkedCached_ = HdfsCachingUtil.getCacheDirectiveId(
1264  getMetaStoreTable().getParameters()) != null;
1265  populatePartitionMd();
1266  }
1267 
1268  @Override
1269  public TTableDescriptor toThriftDescriptor(Set<Long> referencedPartitions) {
1270  // Create thrift descriptors to send to the BE. The BE does not
1271  // need any information below the THdfsPartition level.
1272  TTableDescriptor tableDesc = new TTableDescriptor(id_.asInt(), TTableType.HDFS_TABLE,
1273  getColumns().size(), numClusteringCols_, name_, db_.getName());
1274  tableDesc.setHdfsTable(getTHdfsTable(false, referencedPartitions));
1275  tableDesc.setColNames(getColumnNames());
1276  return tableDesc;
1277  }
1278 
1279  @Override
1280  public TTable toThrift() {
1281  // Send all metadata between the catalog service and the FE.
1282  TTable table = super.toThrift();
1283  table.setTable_type(TTableType.HDFS_TABLE);
1284  table.setHdfs_table(getTHdfsTable(true, null));
1285  return table;
1286  }
1287 
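// Serializes this table to a THdfsTable. When includeFileDesc is true (full catalog
// updates) all partitions, their file descriptors, and the host index are sent;
// otherwise only the partitions in refPartitions are included, without file
// descriptors, which is all the backend needs for planning.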
1295  private THdfsTable getTHdfsTable(boolean includeFileDesc, Set<Long> refPartitions) {
1296  // includeFileDesc implies all partitions should be included (refPartitions == null).
1297  Preconditions.checkState(!includeFileDesc || refPartitions == null);
1298  Map<Long, THdfsPartition> idToPartition = Maps.newHashMap();
1299  for (HdfsPartition partition: partitions_) {
1300  long id = partition.getId();
1301  if (refPartitions == null || refPartitions.contains(id)) {
1302  idToPartition.put(id, partition.toThrift(includeFileDesc));
1303  }
1304  }
1305  THdfsTable hdfsTable = new THdfsTable(hdfsBaseDir_, getColumnNames(),
1306  nullPartitionKeyValue_, nullColumnValue_, idToPartition);
1307  hdfsTable.setAvroSchema(avroSchema_);
1308  hdfsTable.setMultiple_filesystems(multipleFileSystems_);
1309  if (includeFileDesc) {
1310  // Network addresses are used only by THdfsFileBlocks which are inside
1311  // THdfsFileDesc, so include network addresses only when including THdfsFileDesc.
1312  hdfsTable.setNetwork_addresses(hostIndex_.getList());
1313  }
1314  return hdfsTable;
1315  }
1316 
1317  public long getNumHdfsFiles() { return numHdfsFiles_; }
1318  public long getTotalHdfsBytes() { return totalHdfsBytes_; }
1319  public String getHdfsBaseDir() { return hdfsBaseDir_; }
1320  public boolean isAvroTable() { return avroSchema_ != null; }
1321 
1322  @Override
1323  public int getNumNodes() { return hostIndex_.size(); }
1324 
1328  public ListMap<TNetworkAddress> getHostIndex() { return hostIndex_; }
1329 
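// Returns the file format used by the largest number of this table's partitions.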
1333  public HdfsFileFormat getMajorityFormat() {
1334  Map<HdfsFileFormat, Integer> numPartitionsByFormat = Maps.newHashMap();
1335  for (HdfsPartition partition: partitions_) {
1336  HdfsFileFormat format = partition.getInputFormatDescriptor().getFileFormat();
1337  Integer numPartitions = numPartitionsByFormat.get(format);
1338  if (numPartitions == null) {
1339  numPartitions = Integer.valueOf(1);
1340  } else {
1341  numPartitions = Integer.valueOf(numPartitions.intValue() + 1);
1342  }
1343  numPartitionsByFormat.put(format, numPartitions);
1344  }
1345 
1346  int maxNumPartitions = Integer.MIN_VALUE;
1347  HdfsFileFormat majorityFormat = null;
1348  for (Map.Entry<HdfsFileFormat, Integer> entry: numPartitionsByFormat.entrySet()) {
1349  if (entry.getValue().intValue() > maxNumPartitions) {
1350  majorityFormat = entry.getKey();
1351  maxNumPartitions = entry.getValue().intValue();
1352  }
1353  }
1354  Preconditions.checkNotNull(majorityFormat);
1355  return majorityFormat;
1356  }
1357 
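// Builds a result set of per-partition statistics (e.g. backing SHOW TABLE STATS):
// one row per partition with row count, file count, size, cache status, format,
// incremental-stats flag and location, plus a 'Total' summary row for partitioned
// tables.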
1363  public TResultSet getTableStats() {
1364  TResultSet result = new TResultSet();
1365  TResultSetMetadata resultSchema = new TResultSetMetadata();
1366  result.setSchema(resultSchema);
1367 
1368  for (int i = 0; i < numClusteringCols_; ++i) {
1369  // Add the partition-key values as strings for simplicity.
1370  Column partCol = getColumns().get(i);
1371  TColumn colDesc = new TColumn(partCol.getName(), Type.STRING.toThrift());
1372  resultSchema.addToColumns(colDesc);
1373  }
1374 
1375  resultSchema.addToColumns(new TColumn("#Rows", Type.BIGINT.toThrift()));
1376  resultSchema.addToColumns(new TColumn("#Files", Type.BIGINT.toThrift()));
1377  resultSchema.addToColumns(new TColumn("Size", Type.STRING.toThrift()));
1378  resultSchema.addToColumns(new TColumn("Bytes Cached", Type.STRING.toThrift()));
1379  resultSchema.addToColumns(new TColumn("Cache Replication", Type.STRING.toThrift()));
1380  resultSchema.addToColumns(new TColumn("Format", Type.STRING.toThrift()));
1381  resultSchema.addToColumns(new TColumn("Incremental stats", Type.STRING.toThrift()));
1382  resultSchema.addToColumns(new TColumn("Location", Type.STRING.toThrift()));
1383 
1384  // Pretty print partitions and their stats.
1385  ArrayList<HdfsPartition> orderedPartitions = Lists.newArrayList(partitions_);
1386  Collections.sort(orderedPartitions);
1387 
1388  long totalCachedBytes = 0L;
1389  for (HdfsPartition p: orderedPartitions) {
1390  // Ignore dummy default partition.
1391  if (p.getId() == ImpalaInternalServiceConstants.DEFAULT_PARTITION_ID) continue;
1392  TResultRowBuilder rowBuilder = new TResultRowBuilder();
1393 
1394  // Add the partition-key values (as strings for simplicity).
1395  for (LiteralExpr expr: p.getPartitionValues()) {
1396  rowBuilder.add(expr.getStringValue());
1397  }
1398 
1399  // Add number of rows, files, bytes, cache stats, and file format.
1400  rowBuilder.add(p.getNumRows()).add(p.getFileDescriptors().size())
1401  .addBytes(p.getSize());
1402  if (!p.isMarkedCached()) {
1403  // Helps to differentiate partitions that have 0B cached versus partitions
1404  // that are not marked as cached.
1405  rowBuilder.add("NOT CACHED");
1406  rowBuilder.add("NOT CACHED");
1407  } else {
1408  // Calculate the number of bytes that are cached.
1409  long cachedBytes = 0L;
1410  for (FileDescriptor fd: p.getFileDescriptors()) {
1411  for (THdfsFileBlock fb: fd.getFileBlocks()) {
1412  if (fb.getIs_replica_cached().contains(true)) {
1413  cachedBytes += fb.getLength();
1414  }
1415  }
1416  }
1417  totalCachedBytes += cachedBytes;
1418  rowBuilder.addBytes(cachedBytes);
1419 
1420  // Extract cache replication factor from the parameters of the table
1421  // if the table is not partitioned or directly from the partition.
1422  Short rep = HdfsCachingUtil.getCachedCacheReplication(
1423  numClusteringCols_ == 0 ?
1424  p.getTable().getMetaStoreTable().getParameters() :
1425  p.getParameters());
1426  rowBuilder.add(rep.toString());
1427  }
1428  rowBuilder.add(p.getInputFormatDescriptor().getFileFormat().toString());
1429 
1430  rowBuilder.add(String.valueOf(p.hasIncrementalStats()));
1431  rowBuilder.add(p.getLocation());
1432  result.addToRows(rowBuilder.get());
1433  }
1434 
1435  // For partitioned tables add a summary row at the bottom.
1436  if (numClusteringCols_ > 0) {
1437  TResultRowBuilder rowBuilder = new TResultRowBuilder();
1438  int numEmptyCells = numClusteringCols_ - 1;
1439  rowBuilder.add("Total");
1440  for (int i = 0; i < numEmptyCells; ++i) {
1441  rowBuilder.add("");
1442  }
1443 
1444  // Total num rows, files, and bytes (leave format empty).
1445  rowBuilder.add(numRows_).add(numHdfsFiles_).addBytes(totalHdfsBytes_)
1446  .addBytes(totalCachedBytes).add("").add("").add("").add("");
1447  result.addToRows(rowBuilder.get());
1448  }
1449  return result;
1450  }
1451 
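// Returns one row per data file (path, size, owning partition), either for the whole
// table or, when a partition spec is given, for that single partition (e.g. for
// SHOW FILES output).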
1456  public TResultSet getFiles(List<TPartitionKeyValue> partitionSpec) throws CatalogException {
1457  TResultSet result = new TResultSet();
1458  TResultSetMetadata resultSchema = new TResultSetMetadata();
1459  result.setSchema(resultSchema);
1460  resultSchema.addToColumns(new TColumn("path", Type.STRING.toThrift()));
1461  resultSchema.addToColumns(new TColumn("size", Type.STRING.toThrift()));
1462  resultSchema.addToColumns(new TColumn("partition", Type.STRING.toThrift()));
1463  result.setRows(Lists.<TResultRow>newArrayList());
1464 
1465  List<HdfsPartition> partitions = null;
1466  if (partitionSpec == null) {
1467  partitions = partitions_;
1468  } else {
1469  // Get the HdfsPartition object for the given partition spec.
1470  HdfsPartition partition = getPartitionFromThriftPartitionSpec(partitionSpec);
1471  Preconditions.checkState(partition != null);
1472  partitions = Lists.newArrayList(partition);
1473  }
1474 
1475  for (HdfsPartition p: partitions) {
1476  for (FileDescriptor fd: p.getFileDescriptors()) {
1477  TResultRowBuilder rowBuilder = new TResultRowBuilder();
1478  rowBuilder.add(p.getLocation() + "/" + fd.getFileName());
1479  rowBuilder.add(PrintUtils.printBytes(fd.getFileLength()));
1480  rowBuilder.add(p.getPartitionName());
1481  result.addToRows(rowBuilder.get());
1482  }
1483  }
1484  return result;
1485  }
1486 }