package com.cloudera.impala.planner;

import static org.junit.Assert.fail;

import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.hadoop.fs.Path;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.cloudera.impala.analysis.ColumnLineageGraph;
import com.cloudera.impala.authorization.AuthorizationConfig;
import com.cloudera.impala.catalog.CatalogException;
import com.cloudera.impala.common.AnalysisException;
import com.cloudera.impala.common.ImpalaException;
import com.cloudera.impala.common.InternalException;
import com.cloudera.impala.common.NotImplementedException;
import com.cloudera.impala.common.RuntimeEnv;
import com.cloudera.impala.service.Frontend;
import com.cloudera.impala.testutil.ImpaladTestCatalog;
import com.cloudera.impala.testutil.TestFileParser;
import com.cloudera.impala.testutil.TestFileParser.Section;
import com.cloudera.impala.testutil.TestFileParser.TestCase;
import com.cloudera.impala.testutil.TestUtils;
import com.cloudera.impala.thrift.ImpalaInternalServiceConstants;
import com.cloudera.impala.thrift.TDescriptorTable;
import com.cloudera.impala.thrift.TExecRequest;
import com.cloudera.impala.thrift.TExplainLevel;
import com.cloudera.impala.thrift.THBaseKeyRange;
import com.cloudera.impala.thrift.THdfsFileSplit;
import com.cloudera.impala.thrift.THdfsPartition;
import com.cloudera.impala.thrift.THdfsScanNode;
import com.cloudera.impala.thrift.THdfsTable;
import com.cloudera.impala.thrift.TNetworkAddress;
import com.cloudera.impala.thrift.TPlanFragment;
import com.cloudera.impala.thrift.TPlanNode;
import com.cloudera.impala.thrift.TQueryCtx;
import com.cloudera.impala.thrift.TQueryExecRequest;
import com.cloudera.impala.thrift.TQueryOptions;
import com.cloudera.impala.thrift.TScanRangeLocations;
import com.cloudera.impala.thrift.TTableDescriptor;
import com.cloudera.impala.thrift.TTupleDescriptor;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;

/**
 * Runs the planner test files: for each test case, produces the single-node
 * plan, the distributed plan, and the column lineage output, and compares them
 * against the expected sections of the test file.
 */
public class PlannerTest {
  private final static Logger LOG = LoggerFactory.getLogger(PlannerTest.class);

  // If true, write the actual output of each test file to a file under outDir_.
  private static final boolean GENERATE_OUTPUT_FILE = true;

  // Frontend used to create the exec requests. Assumed initialization
  // (auth-disabled test frontend); the original declaration was elided.
  private static Frontend frontend_ = new Frontend(
      AuthorizationConfig.createAuthDisabledConfig(), new ImpaladTestCatalog());

  // Location of the planner test files, relative to the testdata workloads dir.
  private final String testDir_ = "functional-planner/queries/PlannerTest";
  // Output directory for the actual test output files.
  private final String outDir_ = "/tmp/PlannerTest/";

  // Map from plan node ID to the plan node with that ID.
  private final Map<Integer, TPlanNode> planMap_ = Maps.newHashMap();
  // Map from tuple ID to the tuple descriptor with that ID.
  private final Map<Integer, TTupleDescriptor> tupleMap_ = Maps.newHashMap();
  // Map from table ID to the table descriptor with that ID.
  private final Map<Integer, TTableDescriptor> tableMap_ = Maps.newHashMap();
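
  /**
   * Sets up the runtime environment shared by all planner tests; runs once
   * before the test class, with a matching teardown after it.
   */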
  @BeforeClass
  public static void setUp() throws Exception {
    // Use 8 cores for resource estimation.
    RuntimeEnv.INSTANCE.setNumCores(8);
    // Mark this as a test environment to keep explain output deterministic.
    RuntimeEnv.INSTANCE.setTestEnv(true);
  }

  @AfterClass
  public static void cleanUp() {
    RuntimeEnv.INSTANCE.reset();
  }
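
  /**
   * Populates planMap_, tupleMap_ and tableMap_ from the given exec request.
   */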
  private void buildMaps(TQueryExecRequest execRequest) {
    // Populate planMap_.
    for (TPlanFragment frag: execRequest.fragments) {
      for (TPlanNode node: frag.plan.nodes) {
        planMap_.put(node.node_id, node);
      }
    }
    // Populate tupleMap_ and tableMap_.
    if (execRequest.isSetDesc_tbl()) {
      TDescriptorTable descTbl = execRequest.desc_tbl;
      for (TTupleDescriptor tupleDesc: descTbl.tupleDescriptors) {
        tupleMap_.put(tupleDesc.id, tupleDesc);
      }
      if (descTbl.isSetTableDescriptors()) {
        for (TTableDescriptor tableDesc: descTbl.tableDescriptors) {
          tableMap_.put(tableDesc.id, tableDesc);
        }
      }
    }
  }
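
  /**
   * Looks up the THdfsPartition that the given file split belongs to, starting
   * from the scan node identified by nodeId and walking the descriptor table.
   */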
  private THdfsPartition findPartition(int nodeId, THdfsFileSplit split) {
    TPlanNode node = planMap_.get(nodeId);
    Preconditions.checkNotNull(node);
    Preconditions.checkState(node.node_id == nodeId && node.isSetHdfs_scan_node());
    THdfsScanNode scanNode = node.getHdfs_scan_node();
    int tupleId = scanNode.getTuple_id();
    TTupleDescriptor tupleDesc = tupleMap_.get(tupleId);
    Preconditions.checkNotNull(tupleDesc);
    Preconditions.checkState(tupleDesc.id == tupleId);
    TTableDescriptor tableDesc = tableMap_.get(tupleDesc.tableId);
    Preconditions.checkNotNull(tableDesc);
    Preconditions.checkState(tableDesc.id == tupleDesc.tableId &&
        tableDesc.isSetHdfsTable());
    THdfsTable hdfsTable = tableDesc.getHdfsTable();
    THdfsPartition partition = hdfsTable.getPartitions().get(split.partition_id);
    Preconditions.checkNotNull(partition);
    Preconditions.checkState(partition.id == split.partition_id);
    return partition;
  }
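
  /**
   * Verifies that every THdfsPartition in the descriptor table is referenced by
   * at least one scan range, except for partitions of the insert target table.
   */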
  private void testHdfsPartitionsReferenced(TQueryExecRequest execRequest,
      String query, StringBuilder errorLog) {
    long insertTableId = -1;
    // Collect all partitions that are referenced by a scan range.
    Set<THdfsPartition> scanRangePartitions = Sets.newHashSet();
    if (execRequest.per_node_scan_ranges != null) {
      for (Map.Entry<Integer, List<TScanRangeLocations>> entry:
          execRequest.per_node_scan_ranges.entrySet()) {
        if (entry.getValue() == null) continue;
        for (TScanRangeLocations locations: entry.getValue()) {
          if (locations.scan_range.isSetHdfs_file_split()) {
            THdfsFileSplit split = locations.scan_range.getHdfs_file_split();
            THdfsPartition partition = findPartition(entry.getKey(), split);
            scanRangePartitions.add(partition);
          }
        }
      }
    }
    if (execRequest.isSetFinalize_params()) {
      insertTableId = execRequest.getFinalize_params().getTable_id();
    }
    boolean first = true;
    // Report every HdfsTable partition that no scan range references.
    if (execRequest.isSetDesc_tbl() && execRequest.desc_tbl.isSetTableDescriptors()) {
      for (TTableDescriptor tableDesc: execRequest.desc_tbl.tableDescriptors) {
        // Partitions of the insert target table are expectedly unreferenced.
        if (tableDesc.getId() == insertTableId) continue;
        if (!tableDesc.isSetHdfsTable()) continue;
        THdfsTable hdfsTable = tableDesc.getHdfsTable();
        for (Map.Entry<Long, THdfsPartition> e :
            hdfsTable.getPartitions().entrySet()) {
          THdfsPartition partition = e.getValue();
          if (!scanRangePartitions.contains(partition)) {
            if (first) errorLog.append("query:\n" + query + "\n");
            errorLog.append(
                " unreferenced partition: HdfsTable: " + tableDesc.getId() +
                " HdfsPartition: " + partition.getId() + "\n");
            first = false;
          }
        }
      }
    }
  }
  private StringBuilder PrintScanRangeLocations(TQueryExecRequest execRequest) {
    StringBuilder result = new StringBuilder();
    if (execRequest.per_node_scan_ranges == null) return result;
    for (Map.Entry<Integer, List<TScanRangeLocations>> entry:
        execRequest.per_node_scan_ranges.entrySet()) {
      result.append("NODE " + entry.getKey().toString() + ":\n");
      if (entry.getValue() == null) continue;
      for (TScanRangeLocations locations: entry.getValue()) {
        if (locations.scan_range.isSetHdfs_file_split()) {
          THdfsFileSplit split = locations.scan_range.getHdfs_file_split();
          THdfsPartition partition = findPartition(entry.getKey(), split);
          Path filePath = new Path(partition.getLocation(), split.file_name);
          filePath = cleanseFilePath(filePath);
          result.append("HDFS SPLIT " + filePath.toString() + " "
              + Long.toString(split.offset) + ":" + Long.toString(split.length));
        }
        if (locations.scan_range.isSetHbase_key_range()) {
          THBaseKeyRange keyRange = locations.scan_range.getHbase_key_range();
          Integer hostIdx = locations.locations.get(0).host_idx;
          TNetworkAddress networkAddress = execRequest.getHost_list().get(hostIdx);
          result.append("HBASE KEYRANGE ");
          result.append("port=" + networkAddress.port + " ");
          if (keyRange.isSetStartKey()) {
            result.append(HBaseScanNode.printKey(keyRange.getStartKey().getBytes()));
          } else {
            result.append("<unbounded>");
          }
          result.append(":");
          if (keyRange.isSetStopKey()) {
            result.append(HBaseScanNode.printKey(keyRange.getStopKey().getBytes()));
          } else {
            result.append("<unbounded>");
          }
        }
        result.append("\n");
      }
    }
    return result;
  }
  private Path cleanseFilePath(Path path) {
    String fileName = path.getName();
    Pattern pattern = Pattern.compile("\\w{16}-\\w{16}_\\d+_data");
    Matcher matcher = pattern.matcher(fileName);
    fileName = matcher.replaceFirst("<UID>_data");
    return new Path(path.getParent(), fileName);
  }
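
  /**
   * Extracts the expected error message from the first line of expectedPlan if
   * it starts with "not implemented"; returns null otherwise. The accepted
   * format is "not implemented: <expected error message>".
   */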
  private String getExpectedErrorMessage(ArrayList<String> expectedPlan) {
    if (expectedPlan.isEmpty()) return null;
    if (!expectedPlan.get(0).toLowerCase().startsWith("not implemented")) return null;
    // Find the first ':' and extract the message on its right-hand side.
    int ix = expectedPlan.get(0).indexOf(":");
    if (ix == -1) return "";
    return expectedPlan.get(0).substring(ix + 1).trim();
  }
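
  /**
   * Appends a "not implemented" entry to actualOutput and, if the exception's
   * message does not match expectedErrorMsg, records the mismatch in errorLog.
   */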
  private void handleNotImplException(String query, String expectedErrorMsg,
      StringBuilder errorLog, StringBuilder actualOutput, Throwable e) {
    boolean isImplemented = expectedErrorMsg == null;
    actualOutput.append("not implemented: " + e.getMessage() + "\n");
    if (isImplemented) {
      errorLog.append("query:\n" + query + "\nPLAN not implemented: "
          + e.getMessage() + "\n");
    } else {
      // Compare actual and expected error messages.
      if (expectedErrorMsg != null && !expectedErrorMsg.isEmpty()) {
        if (!e.getMessage().toLowerCase().equals(expectedErrorMsg.toLowerCase())) {
          errorLog.append("query:\n" + query + "\nExpected error message: '"
              + expectedErrorMsg + "'\nActual error message: '"
              + e.getMessage() + "'\n");
        }
      }
    }
  }
  private TQueryOptions mergeQueryOptions(TQueryOptions a, TQueryOptions b) {
    for (TQueryOptions._Fields f : TQueryOptions._Fields.values()) {
      if (b.isSet(f)) {
        a.setFieldValue(f, b.getFieldValue(f));
      }
    }
    return a;
  }
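
  /**
   * Returns the query options used by default in planner tests.
   */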
  private TQueryOptions defaultQueryOptions() {
    TQueryOptions options = new TQueryOptions();
    options.setExplain_level(TExplainLevel.STANDARD);
    options.setAllow_unsupported_formats(true);
    options.setExec_single_node_rows_threshold(0);
    return options;
  }
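
  /**
   * Runs a single test case against the given database: produces the
   * single-node plan, the distributed plan, and the column lineage output, and
   * compares each against the corresponding section of the test case.
   */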
  private void RunTestCase(TestCase testCase, StringBuilder errorLog,
      StringBuilder actualOutput, String dbName, TQueryOptions options)
      throws CatalogException {
    if (options == null) {
      options = defaultQueryOptions();
    } else {
      options = mergeQueryOptions(defaultQueryOptions(), options);
    }

    String query = testCase.getQuery();
    LOG.info("running query " + query);
    if (query.isEmpty()) {
      throw new IllegalStateException("Cannot plan empty query in line: " +
          testCase.getStartingLineNum());
    }
    TQueryCtx queryCtx = TestUtils.createQueryContext(
        dbName, System.getProperty("user.name"));
    queryCtx.request.query_options = options;
    testSingleNodePlan(testCase, queryCtx, errorLog, actualOutput);
    testDistributedPlan(testCase, queryCtx, errorLog, actualOutput);
    testColumnLineageOutput(testCase, queryCtx, errorLog, actualOutput);
  }
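
  /**
   * Produces the single-node plan for the test case's query and compares it,
   * along with the printed scan range locations, to the expected PLAN and
   * SCANRANGELOCATIONS sections.
   */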
  private void testSingleNodePlan(TestCase testCase, TQueryCtx queryCtx,
      StringBuilder errorLog, StringBuilder actualOutput) throws CatalogException {
    ArrayList<String> expectedPlan = testCase.getSectionContents(Section.PLAN);
    // The test case has no expected single-node plan. Do not test it.
    if (expectedPlan == null || expectedPlan.isEmpty()) return;
    String query = testCase.getQuery();
    String expectedErrorMsg = getExpectedErrorMessage(expectedPlan);
    queryCtx.request.getQuery_options().setNum_nodes(1);
    queryCtx.request.setStmt(query);
    boolean isImplemented = expectedErrorMsg == null;
    StringBuilder explainBuilder = new StringBuilder();

    TExecRequest execRequest = null;
    String locationsStr = null;
    actualOutput.append(Section.PLAN.getHeader() + "\n");
    try {
      execRequest = frontend_.createExecRequest(queryCtx, explainBuilder);
      buildMaps(execRequest.query_exec_request);
      String explainStr = removeExplainHeader(explainBuilder.toString());
      actualOutput.append(explainStr);
      if (!isImplemented) {
        errorLog.append(
            "query produced PLAN\nquery=" + query + "\nplan=\n" + explainStr);
      } else {
        LOG.info("single-node plan: " + explainStr);
        String result = TestUtils.compareOutput(
            Lists.newArrayList(explainStr.split("\n")), expectedPlan, true);
        if (!result.isEmpty()) {
          errorLog.append("section " + Section.PLAN.toString() + " of query:\n" + query
              + "\n" + result);
        }
        // Query exec request may not be set for DDL operations.
        if (execRequest.isSetQuery_exec_request()) {
          testHdfsPartitionsReferenced(execRequest.query_exec_request, query, errorLog);
          locationsStr =
              PrintScanRangeLocations(execRequest.query_exec_request).toString();
        }
      }
    } catch (ImpalaException e) {
      if (e instanceof AnalysisException) {
        errorLog.append(
            "query:\n" + query + "\nanalysis error: " + e.getMessage() + "\n");
        return;
      } else if (e instanceof InternalException) {
        errorLog.append(
            "query:\n" + query + "\ninternal error: " + e.getMessage() + "\n");
        return;
      } else if (e instanceof NotImplementedException) {
        handleNotImplException(query, expectedErrorMsg, errorLog, actualOutput, e);
      } else if (e instanceof CatalogException) {
        throw (CatalogException) e;
      } else {
        errorLog.append(
            "query:\n" + query + "\nunhandled exception: " + e.getMessage() + "\n");
      }
    }

    // Compare scan range locations, then append them to actualOutput with each
    // node's ranges sorted to make the output deterministic.
    LOG.info("scan range locations: " + locationsStr);
    ArrayList<String> expectedLocations =
        testCase.getSectionContents(Section.SCANRANGELOCATIONS);
    if (expectedLocations.size() > 0 && locationsStr != null) {
      String result = TestUtils.compareOutput(
          Lists.newArrayList(locationsStr.split("\n")), expectedLocations, false);
      if (!result.isEmpty()) {
        errorLog.append("section " + Section.SCANRANGELOCATIONS + " of query:\n"
            + query + "\n" + result);
      }
      actualOutput.append(Section.SCANRANGELOCATIONS.getHeader() + "\n");
      ArrayList<String> locations = Lists.newArrayList(locationsStr.split("\n"));
      ArrayList<String> perNodeLocations = Lists.newArrayList();

      for (int i = 0; i < locations.size(); ++i) {
        if (locations.get(i).startsWith("NODE")) {
          // Sort and flush the previous node's locations.
          if (!perNodeLocations.isEmpty()) {
            Collections.sort(perNodeLocations);
            actualOutput.append(Joiner.on("\n").join(perNodeLocations)).append("\n");
            perNodeLocations.clear();
          }
          actualOutput.append(locations.get(i)).append("\n");
        } else {
          perNodeLocations.add(locations.get(i));
        }
      }

      if (!perNodeLocations.isEmpty()) {
        Collections.sort(perNodeLocations);
        actualOutput.append(Joiner.on("\n").join(perNodeLocations)).append("\n");
      }
    }
  }
  private void testColumnLineageOutput(TestCase testCase, TQueryCtx queryCtx,
      StringBuilder errorLog, StringBuilder actualOutput) throws CatalogException {
    ArrayList<String> expectedLineage = testCase.getSectionContents(Section.LINEAGE);
    if (expectedLineage == null || expectedLineage.isEmpty()) return;
    String query = testCase.getQuery();
    queryCtx.request.getQuery_options().setNum_nodes(1);
    queryCtx.request.setStmt(query);
    StringBuilder explainBuilder = new StringBuilder();
    TExecRequest execRequest = null;
    String lineageGraph = null;
    try {
      execRequest = frontend_.createExecRequest(queryCtx, explainBuilder);
      if (execRequest.isSetQuery_exec_request()) {
        lineageGraph = execRequest.query_exec_request.lineage_graph;
      } else if (execRequest.isSetCatalog_op_request()) {
        lineageGraph = execRequest.catalog_op_request.lineage_graph;
      }
    } catch (ImpalaException e) {
      if (e instanceof AnalysisException) {
        errorLog.append(
            "query:\n" + query + "\nanalysis error: " + e.getMessage() + "\n");
        return;
      } else if (e instanceof InternalException) {
        errorLog.append(
            "query:\n" + query + "\ninternal error: " + e.getMessage() + "\n");
        return;
      } else if (e instanceof CatalogException) {
        throw (CatalogException) e;
      } else {
        errorLog.append(
            "query:\n" + query + "\nunhandled exception: " + e.getMessage() + "\n");
      }
    }
    LOG.info("lineage graph: " + lineageGraph);
    ArrayList<String> expected = testCase.getSectionContents(Section.LINEAGE);
    if (expected.size() > 0 && lineageGraph != null) {
      String serializedGraph = Joiner.on("\n").join(expected);
      ColumnLineageGraph expectedGraph =
          ColumnLineageGraph.createFromJSON(serializedGraph);
      ColumnLineageGraph outputGraph =
          ColumnLineageGraph.createFromJSON(lineageGraph);
      if (expectedGraph == null || outputGraph == null ||
          !outputGraph.equals(expectedGraph)) {
        StringBuilder lineageError = new StringBuilder();
        lineageError.append("section " + Section.LINEAGE + " of query:\n"
            + query + "\n");
        lineageError.append("Output:\n");
        lineageError.append(lineageGraph + "\n");
        lineageError.append("Expected:\n");
        lineageError.append(serializedGraph + "\n");
        errorLog.append(lineageError.toString());
      }
      actualOutput.append(Section.LINEAGE.getHeader());
      actualOutput.append(TestUtils.prettyPrintJson(lineageGraph) + "\n");
    }
  }
  private void testDistributedPlan(TestCase testCase, TQueryCtx queryCtx,
      StringBuilder errorLog, StringBuilder actualOutput) throws CatalogException {
    ArrayList<String> expectedPlan =
        testCase.getSectionContents(Section.DISTRIBUTEDPLAN);
    // The test case has no expected distributed plan. Do not test it.
    if (expectedPlan == null || expectedPlan.isEmpty()) return;
    String query = testCase.getQuery();
    String expectedErrorMsg = getExpectedErrorMessage(expectedPlan);
    queryCtx.request.getQuery_options().setNum_nodes(
        ImpalaInternalServiceConstants.NUM_NODES_ALL);
    queryCtx.request.setStmt(query);
    boolean isImplemented = expectedErrorMsg == null;
    StringBuilder explainBuilder = new StringBuilder();
    actualOutput.append(Section.DISTRIBUTEDPLAN.getHeader() + "\n");
    TExecRequest execRequest = null;
    try {
      execRequest = frontend_.createExecRequest(queryCtx, explainBuilder);
      String explainStr = removeExplainHeader(explainBuilder.toString());
      actualOutput.append(explainStr);
      if (!isImplemented) {
        errorLog.append(
            "query produced DISTRIBUTEDPLAN\nquery=" + query + "\nplan=\n"
            + explainStr);
      } else {
        LOG.info("distributed plan: " + explainStr);
        String result = TestUtils.compareOutput(
            Lists.newArrayList(explainStr.split("\n")), expectedPlan, true);
        if (!result.isEmpty()) {
          errorLog.append("section " + Section.DISTRIBUTEDPLAN.toString()
              + " of query:\n" + query + "\n" + result);
        }
      }
    } catch (ImpalaException e) {
      if (e instanceof AnalysisException) {
        errorLog.append(
            "query:\n" + query + "\nanalysis error: " + e.getMessage() + "\n");
        return;
      } else if (e instanceof InternalException) {
        errorLog.append(
            "query:\n" + query + "\ninternal error: " + e.getMessage() + "\n");
        return;
      } else if (e instanceof NotImplementedException) {
        handleNotImplException(query, expectedErrorMsg, errorLog, actualOutput, e);
      } else if (e instanceof CatalogException) {
        throw (CatalogException) e;
      } else {
        errorLog.append(
            "query:\n" + query + "\nunhandled exception: " + e.getMessage() + "\n");
      }
    } catch (IllegalStateException e) {
      errorLog.append(
          "query:\n" + query + "\nunhandled exception: " + e.getMessage() + "\n");
    }
  }
  private String removeExplainHeader(String explain) {
    String[] lines = explain.split("\n");
    // Find the first empty line - the end of the header.
    for (int i = 0; i < lines.length - 1; ++i) {
      if (lines[i].isEmpty()) {
        return Joiner.on("\n").join(Arrays.copyOfRange(lines, i + 1, lines.length))
            + "\n";
      }
    }
    return explain;
  }
  private void runPlannerTestFile(String testFile, String dbName,
      TQueryOptions options) {
    String fileName = testDir_ + "/" + testFile + ".test";
    TestFileParser queryFileParser = new TestFileParser(fileName);
    StringBuilder actualOutput = new StringBuilder();

    queryFileParser.parseFile();
    StringBuilder errorLog = new StringBuilder();
    for (TestCase testCase : queryFileParser.getTestCases()) {
      actualOutput.append(testCase.getSectionAsString(Section.QUERY, true, "\n"));
      actualOutput.append("\n");
      try {
        RunTestCase(testCase, errorLog, actualOutput, dbName, options);
      } catch (CatalogException e) {
        errorLog.append(String.format("Failed to plan query\n%s\n%s",
            testCase.getQuery(), e.getMessage()));
      }
      actualOutput.append("====\n");
    }

    // Write the actual output to a file to ease updating the expected output.
    if (GENERATE_OUTPUT_FILE) {
      try {
        File outDirFile = new File(outDir_);
        outDirFile.mkdirs();
        FileWriter fw = new FileWriter(outDir_ + testFile + ".test");
        fw.write(actualOutput.toString());
        fw.close();
      } catch (IOException e) {
        errorLog.append("Unable to create output file: " + e.getMessage());
      }
    }

    if (errorLog.length() != 0) {
      fail(errorLog.toString());
    }
  }
}