Impala
Impala is the open source, native analytic database for Apache Hadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
CreateTableLikeFileStmt.java
Go to the documentation of this file.
1 // Copyright 2012 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 package com.cloudera.impala.analysis;
16 
17 import java.io.FileNotFoundException;
18 import java.io.IOException;
19 import java.util.ArrayList;
20 import java.util.List;
21 import java.util.Map;
22 
23 import org.apache.hadoop.fs.FileSystem;
24 import org.apache.hadoop.fs.Path;
25 import org.apache.hadoop.fs.permission.FsAction;
26 
27 import parquet.hadoop.ParquetFileReader;
28 import parquet.hadoop.metadata.ParquetMetadata;
29 import parquet.schema.OriginalType;
31 
40 import com.cloudera.impala.thrift.THdfsFileFormat;
41 import com.google.common.base.Preconditions;
42 import com.google.common.collect.Lists;
43 
44 
51  private final HdfsUri schemaLocation_;
52  private final THdfsFileFormat schemaFileFormat_;
53 
54  public CreateTableLikeFileStmt(TableName tableName, THdfsFileFormat schemaFileFormat,
55  HdfsUri schemaLocation, List<ColumnDef> partitionColumnDescs,
56  boolean isExternal, String comment, RowFormat rowFormat,
57  THdfsFileFormat fileFormat, HdfsUri location, HdfsCachingOp cachingOp,
58  boolean ifNotExists, Map<String, String> tblProperties,
59  Map<String, String> serdeProperties) {
60  super(tableName, new ArrayList<ColumnDef>(), partitionColumnDescs,
61  isExternal, comment, rowFormat,
62  fileFormat, location, cachingOp, ifNotExists, tblProperties, serdeProperties);
63  schemaLocation_ = schemaLocation;
64  schemaFileFormat_ = schemaFileFormat;
65  }
66 
72  private static parquet.schema.MessageType loadParquetSchema(Path pathToFile)
73  throws AnalysisException {
74  try {
75  FileSystem fs = pathToFile.getFileSystem(FileSystemUtil.getConfiguration());
76  if (!fs.isFile(pathToFile)) {
77  throw new AnalysisException("Cannot infer schema, path is not a file: " +
78  pathToFile);
79  }
80  } catch (IOException e) {
81  throw new AnalysisException("Failed to connect to filesystem:" + e);
82  } catch (IllegalArgumentException e) {
83  throw new AnalysisException(e.getMessage());
84  }
85  ParquetMetadata readFooter = null;
86  try {
87  readFooter = ParquetFileReader.readFooter(FileSystemUtil.getConfiguration(),
88  pathToFile);
89  } catch (FileNotFoundException e) {
90  throw new AnalysisException("File not found: " + e);
91  } catch (IOException e) {
92  throw new AnalysisException("Failed to open file as a parquet file: " + e);
93  } catch (RuntimeException e) {
94  // Parquet throws a generic RuntimeException when reading a non-parquet file
95  if (e.toString().contains("is not a Parquet file")) {
96  throw new AnalysisException("File is not a parquet file: " + pathToFile);
97  }
98  // otherwise, who knows what we caught, throw it back up
99  throw e;
100  }
101  return readFooter.getFileMetaData().getSchema();
102  }
103 
108  private static Type convertPrimitiveParquetType(parquet.schema.Type parquetType)
109  throws AnalysisException {
110  Preconditions.checkState(parquetType.isPrimitive());
111  PrimitiveType prim = parquetType.asPrimitiveType();
112  switch (prim.getPrimitiveTypeName()) {
113  case BINARY:
114  return Type.STRING;
115  case BOOLEAN:
116  return Type.BOOLEAN;
117  case DOUBLE:
118  return Type.DOUBLE;
119  case FIXED_LEN_BYTE_ARRAY:
120  throw new AnalysisException(
121  "Unsupported parquet type FIXED_LEN_BYTE_ARRAY for field " +
122  parquetType.getName());
123  case FLOAT:
124  return Type.FLOAT;
125  case INT32:
126  return Type.INT;
127  case INT64:
128  return Type.BIGINT;
129  case INT96:
130  return Type.TIMESTAMP;
131  default:
132  Preconditions.checkState(false, "Unexpected parquet primitive type: " +
133  prim.getPrimitiveTypeName());
134  return null;
135  }
136  }
137 
144  private static Type convertLogicalParquetType(parquet.schema.Type parquetType)
145  throws AnalysisException {
146  PrimitiveType prim = parquetType.asPrimitiveType();
147  OriginalType orig = parquetType.getOriginalType();
148  if (prim.getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.BINARY &&
149  orig == OriginalType.UTF8) {
150  // UTF8 is the type annotation Parquet uses for strings
151  // We check to make sure it applies to BINARY to avoid errors if there is a bad
152  // annotation.
153  return Type.STRING;
154  }
155  if (orig == OriginalType.DECIMAL) {
156  return ScalarType.createDecimalType(prim.getDecimalMetadata().getPrecision(),
157  prim.getDecimalMetadata().getScale());
158  }
159 
160  throw new AnalysisException(
161  "Unsupported logical parquet type " + orig + " (primitive type is " +
162  prim.getPrimitiveTypeName().name() + ") for field " +
163  parquetType.getName());
164  }
165 
171  private static List<ColumnDef> extractParquetSchema(HdfsUri location)
172  throws AnalysisException {
173  parquet.schema.MessageType parquetSchema = loadParquetSchema(location.getPath());
174  List<parquet.schema.Type> fields = parquetSchema.getFields();
175  List<ColumnDef> schema = new ArrayList<ColumnDef>();
176 
177  for (parquet.schema.Type field: fields) {
178  Type type = null;
179 
180  if (field.getOriginalType() != null) {
181  type = convertLogicalParquetType(field);
182  } else if (field.isPrimitive()) {
183  type = convertPrimitiveParquetType(field);
184  } else {
185  throw new AnalysisException("Unsupported parquet type for field " +
186  field.getName());
187  }
188 
189  String colName = field.getName();
190  schema.add(new ColumnDef(colName, new TypeDef(type),
191  "inferred from: " + field.toString()));
192  }
193  return schema;
194  }
195 
196  @Override
197  public String toSql() {
198  ArrayList<String> colsSql = Lists.newArrayList();
199  ArrayList<String> partitionColsSql = Lists.newArrayList();
200  HdfsCompression compression = HdfsCompression.fromFileName(
201  schemaLocation_.toString());
202  String s = ToSqlUtils.getCreateTableSql(getDb(),
203  getTbl() + " __LIKE_FILEFORMAT__ ", getComment(), colsSql, partitionColsSql,
205  getRowFormat(), HdfsFileFormat.fromThrift(getFileFormat()),
206  compression, null, getLocation().toString());
207  s = s.replace("__LIKE_FILEFORMAT__", "LIKE " + schemaFileFormat_ + " " +
208  schemaLocation_.toString());
209  return s;
210  }
211 
212  @Override
213  public void analyze(Analyzer analyzer) throws AnalysisException {
214  schemaLocation_.analyze(analyzer, Privilege.ALL, FsAction.READ_WRITE);
215  switch (schemaFileFormat_) {
216  case PARQUET:
218  break;
219  default:
220  throw new AnalysisException("Unsupported file type for schema inference: "
222  }
223  super.analyze(analyzer);
224  }
225 
226 }
static final ScalarType BIGINT
Definition: Type.java:50
static parquet.schema.MessageType loadParquetSchema(Path pathToFile)
static final ScalarType STRING
Definition: Type.java:53
static final ScalarType BOOLEAN
Definition: Type.java:46
static final ScalarType FLOAT
Definition: Type.java:51
PrimitiveType
Definition: types.h:27
static final ScalarType DOUBLE
Definition: Type.java:52
static Type convertLogicalParquetType(parquet.schema.Type parquetType)
static final ScalarType INT
Definition: Type.java:49
static List< ColumnDef > extractParquetSchema(HdfsUri location)
static Type convertPrimitiveParquetType(parquet.schema.Type parquetType)
CreateTableLikeFileStmt(TableName tableName, THdfsFileFormat schemaFileFormat, HdfsUri schemaLocation, List< ColumnDef > partitionColumnDescs, boolean isExternal, String comment, RowFormat rowFormat, THdfsFileFormat fileFormat, HdfsUri location, HdfsCachingOp cachingOp, boolean ifNotExists, Map< String, String > tblProperties, Map< String, String > serdeProperties)
static final ScalarType TIMESTAMP
Definition: Type.java:55