15 package com.cloudera.impala.analysis;
17 import java.io.IOException;
18 import java.util.ArrayList;
19 import java.util.List;
22 import org.apache.hadoop.fs.Path;
23 import org.slf4j.Logger;
24 import org.slf4j.LoggerFactory;
37 import com.google.common.base.Joiner;
38 import com.google.common.base.Preconditions;
39 import com.google.common.collect.Lists;
40 import com.google.common.collect.Sets;
// Class-scoped SLF4J logger for InsertStmt; obtained once via LoggerFactory.
// NOTE(review): no logging call is visible in this fragment of the file — confirm usage elsewhere.
47 private final static Logger
LOG = LoggerFactory.getLogger(InsertStmt.class);
// Expressions produced for the INSERT's select list during analysis; populated from the
// analyzed query statement's exprs (or NullLiterals for unmentioned non-clustering columns,
// per the visible analyze fragments). Presumably one expr per target table column in
// table order — TODO confirm against the full prepareExpressions() implementation.
91 private final ArrayList<Expr>
resultExprs_ =
new ArrayList<Expr>();
114 List<PartitionKeyValue> partitionKeyValues, List<String> planHints,
115 QueryStmt queryStmt, List<String> columnPermutation) {
132 withClause_ = other.withClause_ != null ? other.withClause_.clone() : null;
138 queryStmt_ = other.queryStmt_ != null ? other.queryStmt_.clone() : null;
151 if (
withClause_ != null) withClause_.analyze(analyzer);
155 if (analyzer.getMissingTbls().isEmpty())
throw e;
158 List<Expr> selectListExprs = null;
164 queryStmt_.analyze(queryStmtAnalyzer);
166 if (analyzer.containsSubquery()) {
167 StmtRewriter.rewriteQueryStatement(
queryStmt_, queryStmtAnalyzer);
168 queryStmt_ = queryStmt_.clone();
169 queryStmtAnalyzer =
new Analyzer(analyzer);
170 queryStmt_.analyze(queryStmtAnalyzer);
173 selectListExprs = Expr.cloneList(queryStmt_.getBaseTblResultExprs());
175 if (analyzer.getMissingTbls().isEmpty())
throw e;
178 selectListExprs = Lists.newArrayList();
186 if (!analyzer.getMissingTbls().isEmpty()) {
191 int numClusteringCols = isHBaseTable ? 0 : table_.getNumClusteringCols();
216 if (analysisColumnPermutation == null) {
217 analysisColumnPermutation = Lists.newArrayList();
218 ArrayList<Column> tableColumns = table_.getColumns();
219 for (
int i = numClusteringCols; i < tableColumns.size(); ++i) {
220 analysisColumnPermutation.add(tableColumns.get(i).getName());
227 ArrayList<Column> selectExprTargetColumns = Lists.newArrayList();
231 Set<String> mentionedColumnNames = Sets.newHashSet();
232 for (String columnName: analysisColumnPermutation) {
233 Column column = table_.getColumn(columnName);
234 if (column == null) {
236 "Unknown column '" + columnName +
"' in column permutation");
239 if (!mentionedColumnNames.add(columnName)) {
241 "Duplicate column '" + columnName +
"' in column permutation");
243 selectExprTargetColumns.add(column);
246 int numStaticPartitionExprs = 0;
249 Column column = table_.getColumn(pkv.getColName());
250 if (column == null) {
252 "' in partition clause");
257 "Column '" + pkv.getColName() +
"' is not a partition column");
260 if (!mentionedColumnNames.add(pkv.getColName())) {
262 "Duplicate column '" + pkv.getColName() +
"' in partition clause");
264 if (!pkv.isDynamic()) {
265 numStaticPartitionExprs++;
267 selectExprTargetColumns.add(column);
274 selectListExprs.size(), numStaticPartitionExprs);
279 kv.analyze(analyzer);
306 new TableName(analyzer.getDefaultDb(), targetTableName_.getTbl());
312 analyzer.registerPrivReq(pb.onTable(table_.getDb().getName(), table_.getName())
319 String.format(
"Impala does not support inserting into views: %s",
325 int numClusteringCols = isHBaseTable ? 0 : table_.getNumClusteringCols();
340 HdfsTable hdfsTable = (HdfsTable)
table_;
342 throw new AnalysisException(String.format(
"Unable to INSERT into target table " +
343 "(%s) because Impala does not have WRITE access to at least one HDFS path" +
347 throw new AnalysisException(String.format(
"Unable to INSERT into target table " +
356 }
catch (IOException e) {
360 for (
int colIdx = 0; colIdx < numClusteringCols; ++colIdx) {
361 Column col = hdfsTable.getColumns().
get(colIdx);
368 "partition column (%s) is not supported: %s", col.
getName(),
375 throw new AnalysisException(
"HBase doesn't have a way to perform INSERT OVERWRITE");
379 analyzer.getDescTbl().addReferencedTable(
table_);
388 Set<String> mentionedColumnNames,
int numSelectListExprs,
391 int numClusteringCols = isHBaseTable ? 0 : table_.getNumClusteringCols();
393 if (selectExprTargetColumns.size() + numStaticPartitionExprs !=
394 table_.getColumns().size()) {
399 List<String> missingColumnNames = Lists.newArrayList();
401 if (!mentionedColumnNames.contains(column.getName())) {
405 if (isHBaseTable && column.getPosition() == 0) {
407 "' must be explicitly mentioned in column permutation.");
409 if (column.getPosition() < numClusteringCols) {
410 missingColumnNames.add(column.getName());
415 if (!missingColumnNames.isEmpty()) {
417 "Not enough partition columns mentioned in query. Missing columns are: " +
418 Joiner.on(
", ").join(missingColumnNames));
423 if (selectExprTargetColumns.size() != numSelectListExprs) {
425 (selectExprTargetColumns.size() < numSelectListExprs) ?
"fewer" :
"more";
426 String partitionClause =
434 int totalColumnsMentioned = numSelectListExprs + numStaticPartitionExprs;
436 "Target table '%s' has %s columns (%s) than the SELECT / VALUES clause %s" +
438 table_.getColumns().size(), partitionClause, totalColumnsMentioned));
440 String partitionPrefix =
443 "Column permutation %s %s columns (%s) than " +
444 "the SELECT / VALUES clause %s (%s)", partitionPrefix, comparator,
445 selectExprTargetColumns.size(), partitionClause, numSelectListExprs));
471 List<Expr> tmpPartitionKeyExprs =
new ArrayList<Expr>();
472 List<String> tmpPartitionKeyNames =
new ArrayList<String>();
474 int numClusteringCols = (tbl instanceof
HBaseTable) ? 0 : tbl.getNumClusteringCols();
477 for (
int i = 0; i < selectListExprs.size(); ++i) {
478 Column targetColumn = selectExprTargetColumns.get(i);
480 if (targetColumn.
getPosition() < numClusteringCols) {
482 tmpPartitionKeyExprs.add(compatibleExpr);
483 tmpPartitionKeyNames.add(targetColumn.getName());
485 selectListExprs.set(i, compatibleExpr);
492 if (pkv.isStatic()) {
494 Column tableColumn = table_.getColumn(pkv.getColName());
496 tmpPartitionKeyExprs.add(compatibleExpr);
497 tmpPartitionKeyNames.add(pkv.getColName());
506 for (
int j = 0; j < tmpPartitionKeyNames.size(); ++j) {
507 if (c.getName().equals(tmpPartitionKeyNames.get(j))) {
508 partitionKeyExprs_.add(tmpPartitionKeyExprs.get(j));
514 Preconditions.checkState(partitionKeyExprs_.size() == numClusteringCols);
517 expr.analyze(analyzer);
523 boolean matchFound =
false;
524 for (
int i = 0; i < selectListExprs.size(); ++i) {
525 if (selectExprTargetColumns.get(i).getName().equals(tblColumn.getName())) {
526 resultExprs_.add(selectListExprs.get(i));
535 if (tblColumn.getPosition() >= numClusteringCols) {
538 resultExprs_.add(NullLiteral.create(tblColumn.getType()));
545 List<SelectListItem> selectListItems = Lists.newArrayList();
551 queryStmt_.analyze(analyzer);
563 Type colType = column.getType();
564 Type exprType = expr.getType();
566 if (colType.equals(exprType) && !colType.isComplexType())
return expr;
568 Type compatibleType =
569 Type.getAssignmentCompatibleType(colType, exprType);
571 if (!compatibleType.
isValid()) {
574 "Target table '%s' is incompatible with SELECT / PARTITION expressions.\n" +
575 "Expression '%s' (type: %s) is not compatible with column '%s' (type: %s)",
580 if (!compatibleType.equals(colType) && !compatibleType.isNull()) {
582 String.format(
"Possible loss of precision for target table '%s'.\n" +
583 "Expression '%s' (type: %s) would need to be cast to %s" +
586 colType.toSql(), column.getName()));
589 return expr.castTo(compatibleType);
596 throw new AnalysisException(
"INSERT hints are only supported for inserting into " +
597 "partitioned Hdfs tables.");
600 if (hint.equalsIgnoreCase(
"SHUFFLE")) {
605 analyzer.setHasPlanHints();
606 }
else if (hint.equalsIgnoreCase(
"NOSHUFFLE")) {
611 analyzer.setHasPlanHints();
613 analyzer.addWarning(
"INSERT hint not recognized: " + hint);
635 Preconditions.checkState(
table_ != null);
641 StringBuilder strBuilder =
new StringBuilder();
643 if (
withClause_ != null) strBuilder.append(withClause_.toSql() +
" ");
645 strBuilder.append(
"INSERT ");
647 strBuilder.append(
"OVERWRITE ");
649 strBuilder.append(
"INTO ");
653 strBuilder.append(
"(");
655 strBuilder.append(
")");
658 List<String> values = Lists.newArrayList();
660 values.add(pkv.getColName() +
661 (pkv.getValue() != null ? (
"=" + pkv.getValue().
toSql()) :
""));
663 strBuilder.append(
" PARTITION (" + Joiner.on(
", ").join(values) +
")");
666 strBuilder.append(
" " + ToSqlUtils.getPlanHintsSql(
planHints_));
669 strBuilder.append(
" " + queryStmt_.toSql());
671 return strBuilder.toString();
ArrayList<Column> getColumnsInHiveOrder()
void setTargetTable(Table table)
Expr checkTypeCompatibility(Column column, Expr expr)
List<Expr> getPartitionKeyExprs()
void analyze(Analyzer analyzer)
void setQueryStmt(QueryStmt stmt)
void setTargetTable(Analyzer analyzer)
static final ScalarType BOOLEAN
InsertStmt(InsertStmt other)
TableName getTargetTableName()
ArrayList<Column> getColumns()
TableName targetTableName_
final boolean needsGeneratedQueryStatement_
static boolean isDistributedFileSystem(FileSystem fs)
void analyzePlanHints(Analyzer analyzer)
final List<String> columnPermutation_
final TableName originalTableName_
final ArrayList<Expr> resultExprs_
String getFirstLocationWithoutWriteAccess()
void checkColumnCoverage(ArrayList<Column> selectExprTargetColumns, Set<String> mentionedColumnNames, int numSelectListExprs, int numStaticPartitionExprs)
InsertStmt(WithClause withClause, TableName targetTable, boolean overwrite, List<PartitionKeyValue> partitionKeyValues, List<String> planHints, QueryStmt queryStmt, List<String> columnPermutation)
final List<Expr> partitionKeyExprs_
List<String> getPlanHints()
final List<String> planHints_
ArrayList<Expr> getResultExprs()
void prepareExpressions(List<Column> selectExprTargetColumns, List<Expr> selectListExprs, Table tbl, Analyzer analyzer)
final WithClause withClause_
boolean isFullyQualified()
DataSink createDataSink()
boolean spansMultipleFileSystems()
final List<PartitionKeyValue> partitionKeyValues_