15 package com.cloudera.impala.planner;
17 import java.util.ArrayList;
18 import java.util.List;
21 import org.slf4j.Logger;
22 import org.slf4j.LoggerFactory;
24 import com.cloudera.impala.thrift.TQueryOptions;
25 import com.google.common.base.Preconditions;
26 import com.google.common.collect.Lists;
27 import com.google.common.collect.Sets;
// SLF4J logger bound to PipelinedPlanNodeSet. In this file it is used to emit
// warnings about invalid per-host memory requirements of plan nodes and sinks.
42 private final static Logger
LOG = LoggerFactory.getLogger(PipelinedPlanNodeSet.class);
// PlanNode instances held by this PipelinedPlanNodeSet; starts out empty.
// Walked by index (0 .. planNodes.size() - 1) during resource estimation.
// NOTE(review): the code that appends to this list is not visible here —
// presumably an add method that null-checks node.getFragment(); confirm.
50 private final ArrayList<PlanNode>
planNodes = Lists.newArrayList();
// DataSink instances held by this PipelinedPlanNodeSet; starts out empty.
// Walked by index (0 .. dataSinks.size() - 1) during resource estimation.
// NOTE(review): presumably populated through addSink(DataSink), which appears
// to null-check its argument — the method body is not fully visible; confirm.
54 private final List<DataSink>
dataSinks = Lists.newArrayList();
62 Preconditions.checkNotNull(node.getFragment());
67 Preconditions.checkNotNull(sink);
78 TQueryOptions queryOptions) {
79 Set<PlanFragment> uniqueFragments = Sets.newHashSet();
86 long perHostHbaseScanMem = 0L;
87 long perHostHdfsScanMem = 0L;
88 long perHostNonScanMem = 0L;
90 for (
int i = 0; i < planNodes.size(); ++i) {
93 if (!fragment.isPartitioned() && excludeUnpartitionedFragments)
continue;
94 node.computeCosts(queryOptions);
95 uniqueFragments.add(fragment);
97 LOG.warn(String.format(
"Invalid per-host memory requirement %s of node %s.\n" +
98 "PlanNode stats are: numNodes_=%s ", node.getPerHostMemCost(),
99 node.getClass().getSimpleName(), node.
getNumNodes()));
102 perHostHbaseScanMem += node.getPerHostMemCost();
104 perHostHdfsScanMem += node.getPerHostMemCost();
106 perHostNonScanMem += node.getPerHostMemCost();
114 perHostHbaseScanMem =
115 Math.min(perHostHbaseScanMem, HBaseScanNode.getPerHostMemUpperBound());
117 Math.min(perHostHdfsScanMem, HdfsScanNode.getPerHostMemUpperBound());
119 long perHostDataSinkMem = 0L;
120 for (
int i = 0; i < dataSinks.size(); ++i) {
123 if (!fragment.isPartitioned() && excludeUnpartitionedFragments)
continue;
125 Preconditions.checkState(uniqueFragments.contains(fragment));
128 LOG.warn(String.format(
"Invalid per-host memory requirement %s of sink %s.\n",
129 sink.getPerHostMemCost(), sink.getClass().getSimpleName()));
131 perHostDataSinkMem += sink.getPerHostMemCost();
135 long perHostMem = perHostHdfsScanMem + perHostHbaseScanMem + perHostNonScanMem +
143 if (perHostMem >= 0 && perHostVcores >= 0) {
158 ArrayList<PipelinedPlanNodeSet> planNodeSets =
177 lhsSet.addSink(node.getFragment().getSink());
182 if (rhsSet == null) {
184 planNodeSets.add(rhsSet);
199 planNodeSets.add(lhsSet);
208 for (
PlanNode child: node.getChildren()) {
final List< DataSink > dataSinks
static ArrayList< PipelinedPlanNodeSet > computePlanNodeSets(PlanNode root)
static final long MIN_PER_HOST_MEM
final ArrayList< PlanNode > planNodes
boolean computeResourceEstimates(boolean excludeUnpartitionedFragments, TQueryOptions queryOptions)
void addSink(DataSink sink)
PlanFragment getFragment()
static final int MIN_PER_HOST_VCORES
static void computePlanNodeSets(PlanNode node, PipelinedPlanNodeSet lhsSet, PipelinedPlanNodeSet rhsSet, ArrayList< PipelinedPlanNodeSet > planNodeSets)