Impala
Impala is the open source, native analytic database for Apache Hadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
AnalyticExpr.java
Go to the documentation of this file.
1 // Copyright 2014 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 package com.cloudera.impala.analysis;
16 
17 import java.math.BigDecimal;
18 import java.math.BigInteger;
19 import java.util.ArrayList;
20 import java.util.List;
21 
22 import org.slf4j.Logger;
23 import org.slf4j.LoggerFactory;
24 
32 import com.cloudera.impala.common.TreeNode;
34 import com.cloudera.impala.thrift.TColumnValue;
35 import com.cloudera.impala.thrift.TExprNode;
37 import com.google.common.base.Joiner;
38 import com.google.common.base.Objects;
39 import com.google.common.base.Preconditions;
40 import com.google.common.collect.Lists;
41 
60 public class AnalyticExpr extends Expr {
61  private final static Logger LOG = LoggerFactory.getLogger(AnalyticExpr.class);
62 
64  private final List<Expr> partitionExprs_;
65  // These elements are modified to point to the corresponding child exprs to keep them
66  // in sync through expr substitutions.
67  private List<OrderByElement> orderByElements_ = Lists.newArrayList();
69 
70  // If set, requires the window to be set to null in resetAnalysisState(). Required for
71  // proper substitution/cloning because standardization may set a window that is illegal
72  // in SQL, and hence, will fail analysis().
73  private boolean resetWindow_ = false;
74 
75  // SQL string of this AnalyticExpr before standardization. Returned in toSqlImpl().
76  private String sqlString_;
77 
78  private static String LEAD = "lead";
79  private static String LAG = "lag";
80  private static String FIRSTVALUE = "first_value";
81  private static String LASTVALUE = "last_value";
82  private static String RANK = "rank";
83  private static String DENSERANK = "dense_rank";
84  private static String ROWNUMBER = "row_number";
85  private static String MIN = "min";
86  private static String MAX = "max";
87 
88  // Internal function used to implement FIRST_VALUE with a window rewrite and
89  // additional null handling in the backend.
90  public static String FIRST_VALUE_REWRITE = "first_value_rewrite";
91 
92  public AnalyticExpr(FunctionCallExpr fnCall, List<Expr> partitionExprs,
93  List<OrderByElement> orderByElements, AnalyticWindow window) {
94  Preconditions.checkNotNull(fnCall);
95  fnCall_ = fnCall;
96  partitionExprs_ = partitionExprs != null ? partitionExprs : new ArrayList<Expr>();
97  if (orderByElements != null) orderByElements_.addAll(orderByElements);
98  window_ = window;
99  setChildren();
100  }
101 
105  protected AnalyticExpr(AnalyticExpr other) {
106  super(other);
108  for (OrderByElement e: other.orderByElements_) {
109  orderByElements_.add(e.clone());
110  }
111  partitionExprs_ = Expr.cloneList(other.partitionExprs_);
112  window_ = (other.window_ != null ? other.window_.clone() : null);
113  resetWindow_ = other.resetWindow_;
114  sqlString_ = other.sqlString_;
115  setChildren();
116  }
117 
118  public FunctionCallExpr getFnCall() { return fnCall_; }
119  public List<Expr> getPartitionExprs() { return partitionExprs_; }
120  public List<OrderByElement> getOrderByElements() { return orderByElements_; }
121  public AnalyticWindow getWindow() { return window_; }
122 
123  @Override
124  public boolean equals(Object obj) {
125  if (!super.equals(obj)) return false;
126  AnalyticExpr o = (AnalyticExpr)obj;
127  if (!fnCall_.equals(o.getFnCall())) return false;
128  if ((window_ == null) != (o.window_ == null)) return false;
129  if (window_ != null) {
130  if (!window_.equals(o.window_)) return false;
131  }
132  return orderByElements_.equals(o.orderByElements_);
133  }
134 
138  @Override
139  public boolean isConstant() { return false; }
140 
141  @Override
142  public Expr clone() { return new AnalyticExpr(this); }
143 
144  @Override
145  public String toSqlImpl() {
146  if (sqlString_ != null) return sqlString_;
147  StringBuilder sb = new StringBuilder();
148  sb.append(fnCall_.toSql()).append(" OVER (");
149  boolean needsSpace = false;
150  if (!partitionExprs_.isEmpty()) {
151  sb.append("PARTITION BY ").append(Expr.toSql(partitionExprs_));
152  needsSpace = true;
153  }
154  if (!orderByElements_.isEmpty()) {
155  List<String> orderByStrings = Lists.newArrayList();
157  orderByStrings.add(e.toSql());
158  }
159  if (needsSpace) sb.append(" ");
160  sb.append("ORDER BY ").append(Joiner.on(", ").join(orderByStrings));
161  needsSpace = true;
162  }
163  if (window_ != null) {
164  if (needsSpace) sb.append(" ");
165  sb.append(window_.toSql());
166  }
167  sb.append(")");
168  return sb.toString();
169  }
170 
171  @Override
172  public String debugString() {
173  return Objects.toStringHelper(this)
174  .add("fn", getFnCall())
175  .add("window", window_)
176  .addValue(super.debugString())
177  .toString();
178  }
179 
180  @Override
181  protected void toThrift(TExprNode msg) {
182  }
183 
184  public static boolean isAnalyticFn(Function fn) {
185  return fn instanceof AggregateFunction
186  && ((AggregateFunction) fn).isAnalyticFn();
187  }
188 
189  public static boolean isAggregateFn(Function fn) {
190  return fn instanceof AggregateFunction
191  && ((AggregateFunction) fn).isAggregateFn();
192  }
193 
194  static private boolean isOffsetFn(Function fn) {
195  if (!isAnalyticFn(fn)) return false;
196  return fn.functionName().equals(LEAD) || fn.functionName().equals(LAG);
197  }
198 
199  static private boolean isMinMax(Function fn) {
200  if (!isAnalyticFn(fn)) return false;
201  return fn.functionName().equals(MIN) || fn.functionName().equals(MAX);
202  }
203 
204  static private boolean isRankingFn(Function fn) {
205  if (!isAnalyticFn(fn)) return false;
206  return fn.functionName().equals(RANK)
207  || fn.functionName().equals(DENSERANK)
208  || fn.functionName().equals(ROWNUMBER);
209  }
210 
215  private void checkRangeOffsetBoundaryExpr(AnalyticWindow.Boundary boundary)
216  throws AnalysisException {
217  Preconditions.checkState(boundary.getType().isOffset());
218  if (orderByElements_.size() > 1) {
219  throw new AnalysisException("Only one ORDER BY expression allowed if used with "
220  + "a RANGE window with PRECEDING/FOLLOWING: " + toSql());
221  }
222  Expr rangeExpr = boundary.getExpr();
224  rangeExpr.getType(), orderByElements_.get(0).getExpr().getType())) {
225  throw new AnalysisException(
226  "The value expression of a PRECEDING/FOLLOWING clause of a RANGE window must "
227  + "be implicitly convertable to the ORDER BY expression's type: "
228  + rangeExpr.toSql() + " cannot be implicitly converted to "
229  + orderByElements_.get(0).getExpr().getType().toSql());
230  }
231  }
232 
236  void checkOffset(Analyzer analyzer) throws AnalysisException {
237  Preconditions.checkState(isOffsetFn(getFnCall().getFn()));
238  Preconditions.checkState(getFnCall().getChildren().size() > 1);
239  Expr offset = getFnCall().getChild(1);
240  Preconditions.checkState(offset.getType().isIntegerType());
241  boolean isPosConstant = true;
242  if (!offset.isConstant()) {
243  isPosConstant = false;
244  } else {
245  try {
246  TColumnValue val = FeSupport.EvalConstExpr(offset, analyzer.getQueryCtx());
247  if (TColumnValueUtil.getNumericVal(val) <= 0) isPosConstant = false;
248  } catch (InternalException exc) {
249  throw new AnalysisException(
250  "Couldn't evaluate LEAD/LAG offset: " + exc.getMessage());
251  }
252  }
253  if (!isPosConstant) {
254  throw new AnalysisException(
255  "The offset parameter of LEAD/LAG must be a constant positive integer: "
256  + getFnCall().toSql());
257  }
258  }
259 
260  @Override
261  public void analyze(Analyzer analyzer) throws AnalysisException {
262  if (isAnalyzed_) return;
263  fnCall_.analyze(analyzer);
264  super.analyze(analyzer);
265  type_ = getFnCall().getType();
266 
267  for (Expr e: partitionExprs_) {
268  if (e.isConstant()) {
269  throw new AnalysisException(
270  "Expressions in the PARTITION BY clause must not be constant: "
271  + e.toSql() + " (in " + toSql() + ")");
272  }
273  }
275  if (e.getExpr().isConstant()) {
276  throw new AnalysisException(
277  "Expressions in the ORDER BY clause must not be constant: "
278  + e.getExpr().toSql() + " (in " + toSql() + ")");
279  }
280  }
281 
282  if (getFnCall().getParams().isDistinct()) {
283  throw new AnalysisException(
284  "DISTINCT not allowed in analytic function: " + getFnCall().toSql());
285  }
286 
287  // check for correct composition of analytic expr
288  Function fn = getFnCall().getFn();
289  if (!(fn instanceof AggregateFunction)) {
290  throw new AnalysisException(
291  "OVER clause requires aggregate or analytic function: "
292  + getFnCall().toSql());
293  }
294 
295  // check for non-analytic aggregate functions
296  if (!isAnalyticFn(fn)) {
297  throw new AnalysisException(
298  String.format("Aggregate function '%s' not supported with OVER clause.",
299  getFnCall().toSql()));
300  }
301 
302  if (isAnalyticFn(fn) && !isAggregateFn(fn)) {
303  if (orderByElements_.isEmpty()) {
304  throw new AnalysisException(
305  "'" + getFnCall().toSql() + "' requires an ORDER BY clause");
306  }
307  if ((isRankingFn(fn) || isOffsetFn(fn)) && window_ != null) {
308  throw new AnalysisException(
309  "Windowing clause not allowed with '" + getFnCall().toSql() + "'");
310  }
311  if (isOffsetFn(fn) && getFnCall().getChildren().size() > 1) {
312  checkOffset(analyzer);
313  // check the default, which needs to be a constant at the moment
314  // TODO: remove this check when the backend can handle non-constants
315  if (getFnCall().getChildren().size() > 2) {
316  if (!getFnCall().getChild(2).isConstant()) {
317  throw new AnalysisException(
318  "The default parameter (parameter 3) of LEAD/LAG must be a constant: "
319  + getFnCall().toSql());
320  }
321  }
322  }
323  }
324 
325  if (window_ != null) {
326  if (orderByElements_.isEmpty()) {
327  throw new AnalysisException("Windowing clause requires ORDER BY clause: "
328  + toSql());
329  }
330  window_.analyze(analyzer);
331 
332  if (!orderByElements_.isEmpty()
333  && window_.getType() == AnalyticWindow.Type.RANGE) {
334  // check that preceding/following ranges match ordering
335  if (window_.getLeftBoundary().getType().isOffset()) {
337  }
338  if (window_.getRightBoundary() != null
339  && window_.getRightBoundary().getType().isOffset()) {
341  }
342  }
343  }
344 
345  // check nesting
346  if (TreeNode.contains(getChildren(), AnalyticExpr.class)) {
347  throw new AnalysisException(
348  "Nesting of analytic expressions is not allowed: " + toSql());
349  }
350  sqlString_ = toSql();
351 
352  standardize(analyzer);
353 
354  // min/max is not currently supported on sliding windows (i.e. start bound is not
355  // unbounded).
356  if (window_ != null && isMinMax(fn) &&
358  throw new AnalysisException(
359  "'" + getFnCall().toSql() + "' is only supported with an "
360  + "UNBOUNDED PRECEDING start bound.");
361  }
362 
363  setChildren();
364  }
365 
// Rewrites this analytic expr in place after analysis. As far as the visible code
// shows: row_number() gets a ROWS window ending at CURRENT_ROW; lead()/lag() get their
// default arguments made explicit plus a ROWS window whose right boundary carries the
// offset; first_value() is, under some window conditions, replaced by last_value or by
// the internal first_value_rewrite function; ordering and window are reversed in some
// cases (swapping first_value/last_value); and a default window is installed when
// there is an ORDER BY but no window. resetWindow_ is set whenever a window is
// installed that was not given by the user.
// NOTE(review): this listing is missing several source lines (the embedded listing
// numbers skip 411, 453, 466, 469, 486-487, 502-503, 526-527 and 534). Each gap is
// flagged below; restore them from the upstream file before compiling.
404  private void standardize(Analyzer analyzer) {
405  FunctionName analyticFnName = getFnCall().getFnName();
406 
407  // Set a window from UNBOUNDED PRECEDING to CURRENT_ROW for row_number().
408  if (analyticFnName.getFunction().equals(ROWNUMBER)) {
409  Preconditions.checkState(window_ == null, "Unexpected window set for row_numer()");
410  window_ = new AnalyticWindow(AnalyticWindow.Type.ROWS,
// NOTE(review): listing line 411 is missing; presumably the left (start) boundary
// argument, UNBOUNDED PRECEDING according to the comment above -- confirm.
412  new Boundary(BoundaryType.CURRENT_ROW, null));
413  resetWindow_ = true;
414  return;
415  }
416 
417  // Explicitly set the default arguments to lead()/lag() for BE simplicity.
418  // Set a window for lead(): UNBOUNDED PRECEDING to OFFSET FOLLOWING,
419  // Set a window for lag(): UNBOUNDED PRECEDING to OFFSET PRECEDING.
420  if (isOffsetFn(getFnCall().getFn())) {
421  Preconditions.checkState(window_ == null);
422 
423  // If necessary, create a new fn call with the default args explicitly set.
424  List<Expr> newExprParams = null;
425  if (getFnCall().getChildren().size() == 1) {
426  newExprParams = Lists.newArrayListWithExpectedSize(3);
427  newExprParams.addAll(getFnCall().getChildren());
428  // Default offset is 1.
429  newExprParams.add(new NumericLiteral(BigDecimal.valueOf(1)));
430  // Default default value is NULL.
431  newExprParams.add(new NullLiteral());
432  } else if (getFnCall().getChildren().size() == 2) {
433  newExprParams = Lists.newArrayListWithExpectedSize(3);
434  newExprParams.addAll(getFnCall().getChildren());
435  // Default default value is NULL.
436  newExprParams.add(new NullLiteral());
437  } else {
438  Preconditions.checkState(getFnCall().getChildren().size() == 3);
439  }
440  if (newExprParams != null) {
441  fnCall_ = new FunctionCallExpr(getFnCall().getFnName(),
442  new FunctionParams(newExprParams));
443  fnCall_.setIsAnalyticFnCall(true);
444  fnCall_.analyzeNoThrow(analyzer);
445  }
446 
447  // Set the window.
448  BoundaryType rightBoundaryType = BoundaryType.FOLLOWING;
449  if (analyticFnName.getFunction().equals(LAG)) {
450  rightBoundaryType = BoundaryType.PRECEDING;
451  }
452  window_ = new AnalyticWindow(AnalyticWindow.Type.ROWS,
// NOTE(review): listing line 453 is missing; presumably the left (start) boundary
// argument, UNBOUNDED PRECEDING according to the comment at the top of this branch --
// confirm.
454  new Boundary(rightBoundaryType, getOffsetExpr(getFnCall())));
455  try {
456  window_.analyze(analyzer);
457  } catch (AnalysisException e) {
// The window was built by this method, so a failure here is treated as an internal
// error rather than a user-facing analysis error.
458  throw new IllegalStateException(e);
459  }
460  resetWindow_ = true;
461  return;
462  }
463 
464  if (analyticFnName.getFunction().equals(FIRSTVALUE)
465  && window_ != null
// NOTE(review): listing line 466 is missing; it must hold the remainder of this
// condition and the opening brace. Its content cannot be determined from this
// listing -- restore from upstream.
467  if (window_.getLeftBoundary().getType() != BoundaryType.PRECEDING) {
468  window_ = new AnalyticWindow(window_.getType(), window_.getLeftBoundary(),
// NOTE(review): listing line 469 is missing; presumably the final constructor
// argument (the right boundary) -- restore from upstream.
470  fnCall_ = new FunctionCallExpr(new FunctionName("last_value"),
471  getFnCall().getParams());
472  } else {
473  List<Expr> paramExprs = Expr.cloneList(getFnCall().getParams().exprs());
474  if (window_.getRightBoundary().getType() == BoundaryType.PRECEDING) {
475  // The number of rows preceding for the end bound determines the number of
476  // rows at the beginning of each partition that should have a NULL value.
477  paramExprs.add(new NumericLiteral(window_.getRightBoundary().getOffsetValue(),
478  Type.BIGINT));
479  } else {
480  // -1 indicates that no NULL values are inserted even though we set the end
481  // bound to the start bound (which is PRECEDING) below; this is different from
482  // the default behavior of windows with an end bound PRECEDING.
483  paramExprs.add(new NumericLiteral(BigInteger.valueOf(-1), Type.BIGINT));
484  }
485 
// NOTE(review): listing lines 486-487 are missing; presumably the start of a
// "window_ = new AnalyticWindow(...)" statement whose last argument is the line
// below -- restore from upstream.
488  window_.getLeftBoundary());
489  fnCall_ = new FunctionCallExpr(new FunctionName("first_value_rewrite"),
490  new FunctionParams(paramExprs));
491  fnCall_.setIsInternalFnCall(true);
492  }
493  fnCall_.setIsAnalyticFnCall(true);
494  fnCall_.analyzeNoThrow(analyzer);
495  type_ = fnCall_.getReturnType();
496  analyticFnName = getFnCall().getFnName();
497  }
498 
499  // Reverse the ordering and window for windows ending with UNBOUNDED FOLLOWING,
500  // and not starting with UNBOUNDED PRECEDING.
501  if (window_ != null
// NOTE(review): listing lines 502-503 are missing; presumably the two boundary
// conditions described in the comment above, plus the opening brace -- confirm.
504  orderByElements_ = OrderByElement.reverse(orderByElements_);
505  window_ = window_.reverse();
506 
507  // Also flip first_value()/last_value(). For other analytic functions there is no
508  // need to also change the function.
509  FunctionName reversedFnName = null;
510  if (analyticFnName.getFunction().equals(FIRSTVALUE)) {
511  reversedFnName = new FunctionName(LASTVALUE);
512  } else if (analyticFnName.getFunction().equals(LASTVALUE)) {
513  reversedFnName = new FunctionName(FIRSTVALUE);
514  }
515  if (reversedFnName != null) {
516  fnCall_ = new FunctionCallExpr(reversedFnName, getFnCall().getParams());
517  fnCall_.setIsAnalyticFnCall(true);
518  fnCall_.analyzeNoThrow(analyzer);
519  }
520  analyticFnName = getFnCall().getFnName();
521  }
522 
523  // Set the upper boundary to CURRENT_ROW for first_value() if the lower boundary
524  // is UNBOUNDED_PRECEDING.
525  if (window_ != null
// NOTE(review): listing lines 526-527 are missing; presumably the lower-boundary
// check described in the comment above and one further condition -- restore from
// upstream.
528  && analyticFnName.getFunction().equals(FIRSTVALUE)) {
529  window_.setRightBoundary(new Boundary(BoundaryType.CURRENT_ROW, null));
530  }
531 
532  // Set the default window.
533  if (!orderByElements_.isEmpty() && window_ == null) {
// NOTE(review): listing line 534 is missing; presumably the assignment of the default
// window to window_ -- restore from upstream.
535  resetWindow_ = true;
536  }
537  }
538 
542  private Expr getOffsetExpr(FunctionCallExpr offsetFnCall) {
543  Preconditions.checkState(isOffsetFn(getFnCall().getFn()));
544  if (offsetFnCall.getChild(1) != null) return offsetFnCall.getChild(1);
545  // The default offset is 1.
546  return new NumericLiteral(BigDecimal.valueOf(1));
547  }
548 
552  private void syncWithChildren() {
553  int numArgs = fnCall_.getChildren().size();
554  for (int i = 0; i < numArgs; ++i) {
555  fnCall_.setChild(i, getChild(i));
556  }
557  int numPartitionExprs = partitionExprs_.size();
558  for (int i = 0; i < numPartitionExprs; ++i) {
559  partitionExprs_.set(i, getChild(numArgs + i));
560  }
561  for (int i = 0; i < orderByElements_.size(); ++i) {
562  orderByElements_.get(i).setExpr(getChild(numArgs + numPartitionExprs + i));
563  }
564  }
565 
569  private void setChildren() {
570  getChildren().clear();
571  addChildren(fnCall_.getChildren());
572  addChildren(partitionExprs_);
574  addChild(e.getExpr());
575  }
576  if (window_ != null) {
577  if (window_.getLeftBoundary().getExpr() != null) {
578  addChild(window_.getLeftBoundary().getExpr());
579  }
580  if (window_.getRightBoundary() != null
581  && window_.getRightBoundary().getExpr() != null) {
582  addChild(window_.getRightBoundary().getExpr());
583  }
584  }
585  }
586 
587  @Override
588  protected void resetAnalysisState() {
589  super.resetAnalysisState();
590  fnCall_.resetAnalysisState();
591  if (resetWindow_) window_ = null;
592  resetWindow_ = false;
593  // sync with children, now that they've been reset
595  }
596 
597  @Override
599  throws AnalysisException {
600  Expr e = super.substituteImpl(smap, analyzer);
601  if (!(e instanceof AnalyticExpr)) return e;
602  // Re-sync state after possible child substitution.
603  ((AnalyticExpr) e).syncWithChildren();
604  return e;
605  }
606 }
AnalyticExpr(FunctionCallExpr fnCall, List< Expr > partitionExprs, List< OrderByElement > orderByElements, AnalyticWindow window)
static final ScalarType BIGINT
Definition: Type.java:50
static boolean isAggregateFn(Function fn)
static final AnalyticWindow DEFAULT_WINDOW
Expr getOffsetExpr(FunctionCallExpr offsetFnCall)
static boolean isImplicitlyCastable(Type t1, Type t2)
Definition: Type.java:259
List< OrderByElement > getOrderByElements()
static boolean isOffsetFn(Function fn)
static boolean isMinMax(Function fn)
static double getNumericVal(TColumnValue val)
static boolean isRankingFn(Function fn)
uint8_t offset[7 *64-sizeof(uint64_t)]
Expr substituteImpl(ExprSubstitutionMap smap, Analyzer analyzer)
void checkRangeOffsetBoundaryExpr(AnalyticWindow.Boundary boundary)
static boolean isAnalyticFn(Function fn)