Impala
Impala is the open source, native analytic database for Apache Hadoop.
LoadDataStmt.java
// Copyright 2013 Cloudera Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.cloudera.impala.analysis;

import java.io.FileNotFoundException;
import java.io.IOException;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;

// The com.cloudera.impala imports below were elided from the original listing;
// they are restored here so the file compiles (package paths are assumptions
// based on the class names used in this file).
import com.cloudera.impala.authorization.Privilege;
import com.cloudera.impala.catalog.HdfsFileFormat;
import com.cloudera.impala.catalog.HdfsPartition;
import com.cloudera.impala.catalog.HdfsTable;
import com.cloudera.impala.catalog.Table;
import com.cloudera.impala.common.AnalysisException;
import com.cloudera.impala.common.FileSystemUtil;
import com.cloudera.impala.thrift.TLoadDataReq;
import com.cloudera.impala.thrift.TTableName;
import com.cloudera.impala.util.FsPermissionChecker;
import com.cloudera.impala.util.TAccessLevelUtil;
import com.google.common.base.Preconditions;
38 
/*
 * Represents a LOAD DATA statement for moving data into an existing table:
 * LOAD DATA INPATH 'filepath' [OVERWRITE] INTO TABLE <table name>
 * [PARTITION (partcol1=val1, partcol2=val2 ...)]
 *
 * The LOAD DATA operation supports loading (moving) a single file or all files in a
 * given source directory to a table or partition location. If OVERWRITE is true, all
 * existing files in the destination are removed before the new data is moved in.
 * If OVERWRITE is false, existing files are preserved. If there are any file name
 * conflicts, the new files are uniquified by inserting a UUID into the file name
 * (preserving the extension).
 * Loading hidden files is not supported, and any hidden files in the source or
 * destination are preserved, even if OVERWRITE is true.
 */
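// Illustrative statements this class represents (table and path names below are
// hypothetical):
//
//   LOAD DATA INPATH '/user/impala/staging' INTO TABLE mydb.mytable
//
//   LOAD DATA INPATH '/user/impala/staging/data.txt' OVERWRITE
//     INTO TABLE mydb.mytable PARTITION (year=2013, month=7)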
public class LoadDataStmt extends StatementBase {
  private final TableName tableName_;
  private final HdfsUri sourceDataPath_;
  // The optional PARTITION clause (restored; this declaration was elided from
  // the original listing but is assigned in the constructor below).
  private final PartitionSpec partitionSpec_;
  private final boolean overwrite_;

  // Set during analysis
  private String dbName_;
61 
  public LoadDataStmt(TableName tableName, HdfsUri sourceDataPath, boolean overwrite,
      PartitionSpec partitionSpec) {
    Preconditions.checkNotNull(tableName);
    Preconditions.checkNotNull(sourceDataPath);
    this.tableName_ = tableName;
    this.sourceDataPath_ = sourceDataPath;
    this.overwrite_ = overwrite;
    this.partitionSpec_ = partitionSpec;
  }

  public String getTbl() {
    return tableName_.getTbl();
  }

  public String getDb() {
    Preconditions.checkNotNull(dbName_);
    return dbName_;
  }
80 
  /*
   * Prints SQL syntax corresponding to this node, e.g. (illustrative):
   *   LOAD DATA INPATH '/user/impala/staging' OVERWRITE INTO TABLE mydb.mytable
   * @see com.cloudera.impala.parser.ParseNode#toSql()
   */
  @Override
  public String toSql() {
    StringBuilder sb = new StringBuilder("LOAD DATA INPATH '");
    sb.append(sourceDataPath_ + "' ");
    if (overwrite_) sb.append("OVERWRITE ");
    sb.append("INTO TABLE " + tableName_.toString());
    if (partitionSpec_ != null) sb.append(" " + partitionSpec_.toSql());
    return sb.toString();
  }
94 
  @Override
  public void analyze(Analyzer analyzer) throws AnalysisException {
    dbName_ = analyzer.getTargetDbName(tableName_);
    Table table = analyzer.getTable(tableName_, Privilege.INSERT);
    if (!(table instanceof HdfsTable)) {
      throw new AnalysisException("LOAD DATA only supported for HDFS tables: " +
          dbName_ + "." + getTbl());
    }

    // Analyze the partition spec, if one was specified.
    if (partitionSpec_ != null) {
      partitionSpec_.setTableName(tableName_);
      partitionSpec_.setPartitionShouldExist();
      partitionSpec_.setPrivilegeRequirement(Privilege.INSERT);
      partitionSpec_.analyze(analyzer);
    } else {
      if (table.getMetaStoreTable().getPartitionKeysSize() > 0) {
        throw new AnalysisException("Table is partitioned but no partition spec was " +
            "specified: " + dbName_ + "." + getTbl());
      }
    }
    analyzePaths(analyzer, (HdfsTable) table);
  }
118 
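  /*
   * Checks that the source location exists and contains loadable files, that
   * Impala has the filesystem permissions the move requires, and that the files
   * being loaded are in a format Impala understands. Violations are raised as
   * AnalysisExceptions.
   */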
  private void analyzePaths(Analyzer analyzer, HdfsTable hdfsTable)
      throws AnalysisException {
    // The user must have permission to access the source location. Since the files
    // will be moved from this location, the user needs to have all permissions.
    sourceDataPath_.analyze(analyzer, Privilege.ALL);

    // Catch all exceptions thrown by accessing files, and rethrow as
    // AnalysisExceptions.
    try {
      Path source = sourceDataPath_.getPath();
      FileSystem fs = source.getFileSystem(FileSystemUtil.getConfiguration());
      if (!(fs instanceof DistributedFileSystem)) {
        throw new AnalysisException(String.format("INPATH location '%s' " +
            "must point to an HDFS filesystem.", sourceDataPath_));
      }
      if (!fs.exists(source)) {
        throw new AnalysisException(String.format(
            "INPATH location '%s' does not exist.", sourceDataPath_));
      }

      // If the source is a directory, we must be able to read from and write to it.
      // If the source is a file, we must be able to read from it, and write to its
      // parent directory (in order to delete the file as part of the move operation).
      FsPermissionChecker checker = FsPermissionChecker.getInstance();

      if (fs.isDirectory(source)) {
        if (FileSystemUtil.getTotalNumVisibleFiles(source) == 0) {
          throw new AnalysisException(String.format(
              "INPATH location '%s' contains no visible files.", sourceDataPath_));
        }
        if (FileSystemUtil.containsSubdirectory(source)) {
          throw new AnalysisException(String.format(
              "INPATH location '%s' cannot contain subdirectories.", sourceDataPath_));
        }
        if (!checker.getPermissions(fs, source).checkPermissions(
            FsAction.READ_WRITE)) {
          throw new AnalysisException(String.format("Unable to LOAD DATA from %s " +
              "because Impala does not have READ and WRITE permissions on this " +
              "directory", source));
        }
      } else {
        // INPATH names a file.
        if (FileSystemUtil.isHiddenFile(source.getName())) {
          throw new AnalysisException(String.format(
              "INPATH location '%s' points to a hidden file.", source));
        }

        if (!checker.getPermissions(fs, source.getParent()).checkPermissions(
            FsAction.WRITE)) {
          throw new AnalysisException(String.format("Unable to LOAD DATA from %s " +
              "because Impala does not have WRITE permissions on its parent " +
              "directory %s", source, source.getParent()));
        }

        if (!checker.getPermissions(fs, source).checkPermissions(
            FsAction.READ)) {
          throw new AnalysisException(String.format("Unable to LOAD DATA from %s " +
              "because Impala does not have READ permissions on this file", source));
        }
      }

      String noWriteAccessErrorMsg = String.format("Unable to LOAD DATA into " +
          "target table (%s) because Impala does not have WRITE access to HDFS " +
          "location: ", hdfsTable.getFullName());

      HdfsPartition partition;
      String location;
      if (partitionSpec_ != null) {
        partition = hdfsTable.getPartition(partitionSpec_.getPartitionSpecKeyValues());
        location = partition.getLocation();
        // Restored guard (this line was elided from the original listing); assumed
        // to mirror the table-level hasWriteAccess() check in the else branch.
        if (!TAccessLevelUtil.impliesWriteAccess(partition.getAccessLevel())) {
          throw new AnalysisException(noWriteAccessErrorMsg + partition.getLocation());
        }
      } else {
        // "default" partition
        partition = hdfsTable.getPartitions().get(0);
        location = hdfsTable.getLocation();
        if (!hdfsTable.hasWriteAccess()) {
          throw new AnalysisException(noWriteAccessErrorMsg + hdfsTable.getLocation());
        }
      }
      Preconditions.checkNotNull(partition);

      // Until Frontend.loadTableData() can handle cross-filesystem moves and
      // filesystems other than HDFS, require that the source and destination are on
      // the same HDFS filesystem.
      if (!FileSystemUtil.isPathOnFileSystem(new Path(location), fs)) {
        throw new AnalysisException(String.format(
            "Unable to LOAD DATA into target table (%s) because source path (%s) and " +
            "destination %s (%s) are on different filesystems.",
            hdfsTable.getFullName(),
            source, partitionSpec_ == null ? "table" : "partition",
            partition.getLocation()));
      }

      // Verify the files being loaded are supported.
      for (FileStatus fStatus: fs.listStatus(source)) {
        if (fs.isDirectory(fStatus.getPath())) continue;
        StringBuilder errorMsg = new StringBuilder();
        HdfsFileFormat fileFormat = partition.getInputFormatDescriptor().getFileFormat();
        if (!fileFormat.isFileCompressionTypeSupported(fStatus.getPath().toString(),
            errorMsg)) {
          throw new AnalysisException(errorMsg.toString());
        }
      }
    } catch (FileNotFoundException e) {
      throw new AnalysisException("File not found: " + e.getMessage(), e);
    } catch (IOException e) {
      throw new AnalysisException("Error accessing filesystem: " + e.getMessage(), e);
    }
  }
233 
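  // Serializes this statement to its Thrift representation, in which the load
  // request is passed on for execution.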
  public TLoadDataReq toThrift() {
    TLoadDataReq loadDataReq = new TLoadDataReq();
    loadDataReq.setTable_name(new TTableName(getDb(), getTbl()));
    loadDataReq.setSource_path(sourceDataPath_.toString());
    loadDataReq.setOverwrite(overwrite_);
    if (partitionSpec_ != null) {
      loadDataReq.setPartition_spec(partitionSpec_.toThrift());
    }
    return loadDataReq;
  }
}
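A minimal usage sketch (hypothetical driver code, not part of this file; the
TableName and HdfsUri constructor signatures and the pre-built Analyzer are
assumptions):

  // Hypothetical: construct, analyze, and serialize a LOAD DATA statement.
  TableName tbl = new TableName("mydb", "mytable");
  HdfsUri src = new HdfsUri("/user/impala/staging");
  LoadDataStmt stmt = new LoadDataStmt(tbl, src, /* overwrite */ false,
      /* partitionSpec */ null);
  stmt.analyze(analyzer);              // validates table, paths, and permissions
  TLoadDataReq req = stmt.toThrift();  // Thrift request handed off for execution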