Impala
Impala is the open source, native analytic database for Apache Hadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
Planner.java
Go to the documentation of this file.
1 package com.cloudera.impala.planner;
2 
3 import java.util.ArrayList;
4 import java.util.Collections;
5 import java.util.List;
6 
7 import org.slf4j.Logger;
8 import org.slf4j.LoggerFactory;
9 
19 import com.cloudera.impala.thrift.TExplainLevel;
20 import com.cloudera.impala.thrift.TQueryCtx;
21 import com.cloudera.impala.thrift.TQueryExecRequest;
22 import com.cloudera.impala.thrift.TTableName;
24 import com.google.common.base.Joiner;
25 import com.google.common.base.Preconditions;
26 import com.google.common.collect.Lists;
27 
/**
 * Creates an executable plan (a list of PlanFragments) from an analyzed parse tree
 * and query options. Drives the single-node planning phase and, when applicable,
 * the distributed planning phase.
 */
public class Planner {
  private final static Logger LOG = LoggerFactory.getLogger(Planner.class);

  // Shared planning state: holds the analysis result, query context/options and
  // other bookkeeping used by both SingleNodePlanner and DistributedPlanner.
  private final PlannerContext ctx_;

  /**
   * Constructs a planner for the given analysis result and query context.
   */
  public Planner(AnalysisContext.AnalysisResult analysisResult, TQueryCtx queryCtx) {
    ctx_ = new PlannerContext(analysisResult, queryCtx);
  }
44  public ArrayList<PlanFragment> createPlan() throws ImpalaException {
45  SingleNodePlanner singleNodePlanner = new SingleNodePlanner(ctx_);
46  DistributedPlanner distributedPlanner = new DistributedPlanner(ctx_);
47  PlanNode singleNodePlan = singleNodePlanner.createSingleNodePlan();
48  ctx_.getRootAnalyzer().getTimeline().markEvent("Single node plan created");
49  ArrayList<PlanFragment> fragments = null;
50 
51  // Determine the maximum number of rows processed by any node in the plan tree
53  singleNodePlan.accept(visitor);
54  long maxRowsProcessed = visitor.get() == -1 ? Long.MAX_VALUE : visitor.get();
55  boolean isSmallQuery =
56  maxRowsProcessed < ctx_.getQueryOptions().exec_single_node_rows_threshold;
57  if (isSmallQuery) {
58  // Execute on a single node and disable codegen for small results
59  ctx_.getQueryOptions().setNum_nodes(1);
60  ctx_.getQueryOptions().setDisable_codegen(true);
61  if (maxRowsProcessed < ctx_.getQueryOptions().batch_size ||
62  maxRowsProcessed < 1024 && ctx_.getQueryOptions().batch_size == 0) {
63  // Only one scanner thread for small queries
64  ctx_.getQueryOptions().setNum_scanner_threads(1);
65  }
66  }
67 
68  if (ctx_.isSingleNodeExec()) {
69  // create one fragment containing the entire single-node plan tree
70  fragments = Lists.newArrayList(new PlanFragment(
72  } else {
73  // create distributed plan
74  fragments = distributedPlanner.createPlanFragments(singleNodePlan);
75  }
76 
77  PlanFragment rootFragment = fragments.get(fragments.size() - 1);
78  if (ctx_.isInsertOrCtas()) {
79  InsertStmt insertStmt = ctx_.getAnalysisResult().getInsertStmt();
80  if (!ctx_.isSingleNodeExec()) {
81  // repartition on partition keys
82  rootFragment = distributedPlanner.createInsertFragment(
83  rootFragment, insertStmt, ctx_.getRootAnalyzer(), fragments);
84  }
85  // set up table sink for root fragment
86  rootFragment.setSink(insertStmt.createDataSink());
87  }
88 
89  ColumnLineageGraph graph = ctx_.getRootAnalyzer().getColumnLineageGraph();
90  List<Expr> resultExprs = null;
91  Table targetTable = null;
92  if (ctx_.isInsertOrCtas()) {
93  InsertStmt insertStmt = ctx_.getAnalysisResult().getInsertStmt();
94  resultExprs = insertStmt.getResultExprs();
95  targetTable = insertStmt.getTargetTable();
96  graph.addTargetColumnLabels(targetTable);
97  } else {
98  resultExprs = ctx_.getQueryStmt().getResultExprs();
99  graph.addTargetColumnLabels(ctx_.getQueryStmt().getColLabels());
100  }
101  resultExprs = Expr.substituteList(resultExprs,
102  rootFragment.getPlanRoot().getOutputSmap(), ctx_.getRootAnalyzer(), true);
103  rootFragment.setOutputExprs(resultExprs);
104  LOG.debug("desctbl: " + ctx_.getRootAnalyzer().getDescTbl().debugString());
105  LOG.debug("resultexprs: " + Expr.debugString(rootFragment.getOutputExprs()));
106  LOG.debug("finalize plan fragments");
107  for (PlanFragment fragment: fragments) {
108  fragment.finalize(ctx_.getRootAnalyzer());
109  }
110 
111  Collections.reverse(fragments);
112  ctx_.getRootAnalyzer().getTimeline().markEvent("Distributed plan created");
113 
114  if (RuntimeEnv.INSTANCE.computeLineage() || RuntimeEnv.INSTANCE.isTestEnv()) {
115  // Compute the column lineage graph
116  if (ctx_.isInsertOrCtas()) {
117  Preconditions.checkNotNull(targetTable);
118  List<Expr> exprs = Lists.newArrayList();
119  if (targetTable instanceof HBaseTable) {
120  exprs.addAll(resultExprs);
121  } else {
122  exprs.addAll(ctx_.getAnalysisResult().getInsertStmt().getPartitionKeyExprs());
123  exprs.addAll(resultExprs.subList(0,
124  targetTable.getNonClusteringColumns().size()));
125  }
126  graph.computeLineageGraph(exprs, ctx_.getRootAnalyzer());
127  } else {
128  graph.computeLineageGraph(resultExprs, ctx_.getRootAnalyzer());
129  }
130  LOG.trace("lineage: " + graph.debugString());
131  ctx_.getRootAnalyzer().getTimeline().markEvent("Lineage info computed");
132  }
133 
134  return fragments;
135  }
136 
141  public String getExplainString(ArrayList<PlanFragment> fragments,
142  TQueryExecRequest request, TExplainLevel explainLevel) {
143  StringBuilder str = new StringBuilder();
144  boolean hasHeader = false;
145  if (request.isSetPer_host_mem_req() && request.isSetPer_host_vcores()) {
146  str.append(
147  String.format("Estimated Per-Host Requirements: Memory=%s VCores=%s\n",
148  PrintUtils.printBytes(request.getPer_host_mem_req()),
149  request.per_host_vcores));
150  hasHeader = true;
151  }
152  // Append warning about tables missing stats except for child queries of
153  // 'compute stats'. The parent_query_id is only set for compute stats child queries.
154  if (!request.query_ctx.isSetParent_query_id() &&
155  request.query_ctx.isSetTables_missing_stats() &&
156  !request.query_ctx.getTables_missing_stats().isEmpty()) {
157  List<String> tableNames = Lists.newArrayList();
158  for (TTableName tableName: request.query_ctx.getTables_missing_stats()) {
159  tableNames.add(tableName.db_name + "." + tableName.table_name);
160  }
161  str.append("WARNING: The following tables are missing relevant table " +
162  "and/or column statistics.\n" + Joiner.on(", ").join(tableNames) + "\n");
163  hasHeader = true;
164  }
165  if (request.query_ctx.isDisable_spilling()) {
166  str.append("WARNING: Spilling is disabled for this query as a safety guard.\n" +
167  "Reason: Query option disable_unsafe_spills is set, at least one table\n" +
168  "is missing relevant stats, and no plan hints were given.\n");
169  hasHeader = true;
170  }
171  if (hasHeader) str.append("\n");
172 
173  if (explainLevel.ordinal() < TExplainLevel.VERBOSE.ordinal()) {
174  // Print the non-fragmented parallel plan.
175  str.append(fragments.get(0).getExplainString(explainLevel));
176  } else {
177  // Print the fragmented parallel plan.
178  for (int i = 0; i < fragments.size(); ++i) {
179  PlanFragment fragment = fragments.get(i);
180  str.append(fragment.getExplainString(explainLevel));
181  if (explainLevel == TExplainLevel.VERBOSE && i + 1 != fragments.size()) {
182  str.append("\n");
183  }
184  }
185  }
186  return str.toString();
187  }
188 
196  public void computeResourceReqs(List<PlanFragment> fragments,
197  boolean excludeUnpartitionedFragments,
198  TQueryExecRequest request) {
199  Preconditions.checkState(!fragments.isEmpty());
200  Preconditions.checkNotNull(request);
201 
202  // Compute pipelined plan node sets.
203  ArrayList<PipelinedPlanNodeSet> planNodeSets =
204  PipelinedPlanNodeSet.computePlanNodeSets(fragments.get(0).getPlanRoot());
205 
206  // Compute the max of the per-host mem and vcores requirement.
207  // Note that the max mem and vcores may come from different plan node sets.
208  long maxPerHostMem = Long.MIN_VALUE;
209  int maxPerHostVcores = Integer.MIN_VALUE;
210  for (PipelinedPlanNodeSet planNodeSet: planNodeSets) {
211  if (!planNodeSet.computeResourceEstimates(
212  excludeUnpartitionedFragments, ctx_.getQueryOptions())) {
213  continue;
214  }
215  long perHostMem = planNodeSet.getPerHostMem();
216  int perHostVcores = planNodeSet.getPerHostVcores();
217  if (perHostMem > maxPerHostMem) maxPerHostMem = perHostMem;
218  if (perHostVcores > maxPerHostVcores) maxPerHostVcores = perHostVcores;
219  }
220 
221  // Do not ask for more cores than are in the RuntimeEnv.
222  maxPerHostVcores = Math.min(maxPerHostVcores, RuntimeEnv.INSTANCE.getNumCores());
223 
224  // Legitimately set costs to zero if there are only unpartitioned fragments
225  // and excludeUnpartitionedFragments is true.
226  if (maxPerHostMem == Long.MIN_VALUE || maxPerHostVcores == Integer.MIN_VALUE) {
227  boolean allUnpartitioned = true;
228  for (PlanFragment fragment: fragments) {
229  if (fragment.isPartitioned()) {
230  allUnpartitioned = false;
231  break;
232  }
233  }
234  if (allUnpartitioned && excludeUnpartitionedFragments) {
235  maxPerHostMem = 0;
236  maxPerHostVcores = 0;
237  }
238  }
239 
240  if (maxPerHostMem < 0 || maxPerHostMem == Long.MIN_VALUE) {
241  LOG.warn("Invalid per-host memory requirement: " + maxPerHostMem);
242  }
243  if (maxPerHostVcores < 0 || maxPerHostVcores == Integer.MIN_VALUE) {
244  LOG.warn("Invalid per-host virtual cores requirement: " + maxPerHostVcores);
245  }
246  request.setPer_host_mem_req(maxPerHostMem);
247  request.setPer_host_vcores((short) maxPerHostVcores);
248 
249  LOG.debug("Estimated per-host peak memory requirement: " + maxPerHostMem);
250  LOG.debug("Estimated per-host virtual cores requirement: " + maxPerHostVcores);
251  }
252 }
void computeResourceReqs(List< PlanFragment > fragments, boolean excludeUnpartitionedFragments, TQueryExecRequest request)
Definition: Planner.java:196
static final DataPartition UNPARTITIONED
final PlannerContext ctx_
Definition: Planner.java:34
Planner(AnalysisContext.AnalysisResult analysisResult, TQueryCtx queryCtx)
Definition: Planner.java:36
ArrayList< PlanFragment > createPlan()
Definition: Planner.java:44
String getExplainString(ArrayList< PlanFragment > fragments, TQueryExecRequest request, TExplainLevel explainLevel)
Definition: Planner.java:141