1 package com.cloudera.impala.planner;
3 import java.util.ArrayList;
4 import java.util.Collections;
7 import org.slf4j.Logger;
8 import org.slf4j.LoggerFactory;
19 import com.cloudera.impala.thrift.TExplainLevel;
20 import com.cloudera.impala.thrift.TQueryCtx;
21 import com.cloudera.impala.thrift.TQueryExecRequest;
22 import com.cloudera.impala.thrift.TTableName;
24 import com.google.common.base.Joiner;
25 import com.google.common.base.Preconditions;
26 import com.google.common.collect.Lists;
/** SLF4J logger obtained from LoggerFactory for Planner.class. */
32 private final static Logger
LOG = LoggerFactory.getLogger(Planner.class);
47 PlanNode singleNodePlan = singleNodePlanner.createSingleNodePlan();
48 ctx_.getRootAnalyzer().getTimeline().markEvent(
"Single node plan created");
49 ArrayList<PlanFragment> fragments = null;
53 singleNodePlan.accept(visitor);
54 long maxRowsProcessed = visitor.get() == -1 ? Long.MAX_VALUE : visitor.
get();
55 boolean isSmallQuery =
56 maxRowsProcessed < ctx_.getQueryOptions().exec_single_node_rows_threshold;
59 ctx_.getQueryOptions().setNum_nodes(1);
60 ctx_.getQueryOptions().setDisable_codegen(
true);
62 maxRowsProcessed < 1024 && ctx_.getQueryOptions().batch_size == 0) {
64 ctx_.getQueryOptions().setNum_scanner_threads(1);
74 fragments = distributedPlanner.createPlanFragments(singleNodePlan);
77 PlanFragment rootFragment = fragments.get(fragments.size() - 1);
79 InsertStmt insertStmt = ctx_.getAnalysisResult().getInsertStmt();
82 rootFragment = distributedPlanner.createInsertFragment(
83 rootFragment, insertStmt, ctx_.getRootAnalyzer(), fragments);
86 rootFragment.setSink(insertStmt.createDataSink());
90 List<Expr> resultExprs = null;
91 Table targetTable = null;
93 InsertStmt insertStmt = ctx_.getAnalysisResult().getInsertStmt();
94 resultExprs = insertStmt.getResultExprs();
95 targetTable = insertStmt.getTargetTable();
96 graph.addTargetColumnLabels(targetTable);
98 resultExprs = ctx_.getQueryStmt().getResultExprs();
99 graph.addTargetColumnLabels(ctx_.getQueryStmt().getColLabels());
101 resultExprs = Expr.substituteList(resultExprs,
102 rootFragment.getPlanRoot().getOutputSmap(), ctx_.getRootAnalyzer(),
true);
103 rootFragment.setOutputExprs(resultExprs);
104 LOG.debug(
"desctbl: " + ctx_.getRootAnalyzer().getDescTbl().debugString());
105 LOG.debug(
"resultexprs: " + Expr.debugString(rootFragment.getOutputExprs()));
106 LOG.debug(
"finalize plan fragments");
108 fragment.finalize(ctx_.getRootAnalyzer());
111 Collections.reverse(fragments);
112 ctx_.getRootAnalyzer().getTimeline().markEvent(
"Distributed plan created");
117 Preconditions.checkNotNull(targetTable);
118 List<Expr> exprs = Lists.newArrayList();
120 exprs.addAll(resultExprs);
122 exprs.addAll(ctx_.getAnalysisResult().getInsertStmt().getPartitionKeyExprs());
123 exprs.addAll(resultExprs.subList(0,
124 targetTable.getNonClusteringColumns().size()));
126 graph.computeLineageGraph(exprs, ctx_.getRootAnalyzer());
128 graph.computeLineageGraph(resultExprs, ctx_.getRootAnalyzer());
130 LOG.trace(
"lineage: " + graph.debugString());
131 ctx_.getRootAnalyzer().getTimeline().markEvent(
"Lineage info computed");
142 TQueryExecRequest request, TExplainLevel explainLevel) {
143 StringBuilder str =
new StringBuilder();
144 boolean hasHeader =
false;
145 if (request.isSetPer_host_mem_req() && request.isSetPer_host_vcores()) {
147 String.format(
"Estimated Per-Host Requirements: Memory=%s VCores=%s\n",
148 PrintUtils.printBytes(request.getPer_host_mem_req()),
149 request.per_host_vcores));
154 if (!request.query_ctx.isSetParent_query_id() &&
155 request.query_ctx.isSetTables_missing_stats() &&
156 !request.query_ctx.getTables_missing_stats().isEmpty()) {
157 List<String> tableNames = Lists.newArrayList();
158 for (TTableName tableName: request.query_ctx.getTables_missing_stats()) {
159 tableNames.add(tableName.db_name +
"." + tableName.table_name);
161 str.append(
"WARNING: The following tables are missing relevant table " +
162 "and/or column statistics.\n" + Joiner.on(
", ").join(tableNames) +
"\n");
165 if (request.query_ctx.isDisable_spilling()) {
166 str.append(
"WARNING: Spilling is disabled for this query as a safety guard.\n" +
167 "Reason: Query option disable_unsafe_spills is set, at least one table\n" +
168 "is missing relevant stats, and no plan hints were given.\n");
171 if (hasHeader) str.append(
"\n");
173 if (explainLevel.ordinal() < TExplainLevel.VERBOSE.ordinal()) {
178 for (
int i = 0; i < fragments.size(); ++i) {
180 str.append(fragment.getExplainString(explainLevel));
181 if (explainLevel == TExplainLevel.VERBOSE && i + 1 != fragments.size()) {
186 return str.toString();
197 boolean excludeUnpartitionedFragments,
198 TQueryExecRequest request) {
199 Preconditions.checkState(!fragments.isEmpty());
200 Preconditions.checkNotNull(request);
203 ArrayList<PipelinedPlanNodeSet> planNodeSets =
204 PipelinedPlanNodeSet.computePlanNodeSets(fragments.get(0).getPlanRoot());
208 long maxPerHostMem = Long.MIN_VALUE;
209 int maxPerHostVcores = Integer.MIN_VALUE;
211 if (!planNodeSet.computeResourceEstimates(
215 long perHostMem = planNodeSet.getPerHostMem();
216 int perHostVcores = planNodeSet.getPerHostVcores();
217 if (perHostMem > maxPerHostMem) maxPerHostMem = perHostMem;
218 if (perHostVcores > maxPerHostVcores) maxPerHostVcores = perHostVcores;
222 maxPerHostVcores = Math.min(maxPerHostVcores, RuntimeEnv.INSTANCE.getNumCores());
226 if (maxPerHostMem == Long.MIN_VALUE || maxPerHostVcores == Integer.MIN_VALUE) {
227 boolean allUnpartitioned =
true;
229 if (fragment.isPartitioned()) {
230 allUnpartitioned =
false;
234 if (allUnpartitioned && excludeUnpartitionedFragments) {
236 maxPerHostVcores = 0;
240 if (maxPerHostMem < 0 || maxPerHostMem == Long.MIN_VALUE) {
241 LOG.warn(
"Invalid per-host memory requirement: " + maxPerHostMem);
243 if (maxPerHostVcores < 0 || maxPerHostVcores == Integer.MIN_VALUE) {
244 LOG.warn(
"Invalid per-host virtual cores requirement: " + maxPerHostVcores);
246 request.setPer_host_mem_req(maxPerHostMem);
247 request.setPer_host_vcores((short) maxPerHostVcores);
249 LOG.debug(
"Estimated per-host peak memory requirement: " + maxPerHostMem);
250 LOG.debug(
"Estimated per-host virtual cores requirement: " + maxPerHostVcores);
void computeResourceReqs(List< PlanFragment > fragments, boolean excludeUnpartitionedFragments, TQueryExecRequest request)
boolean isSingleNodeExec()
static final DataPartition UNPARTITIONED
static RuntimeEnv INSTANCE
final PlannerContext ctx_
TQueryOptions getQueryOptions()
Planner(AnalysisContext.AnalysisResult analysisResult, TQueryCtx queryCtx)
PlanFragmentId getNextFragmentId()
ArrayList< PlanFragment > createPlan()
String getExplainString(ArrayList< PlanFragment > fragments, TQueryExecRequest request, TExplainLevel explainLevel)