15 package com.cloudera.impala.analysis;
17 import java.text.SimpleDateFormat;
18 import java.util.Collection;
19 import java.util.Date;
20 import java.util.LinkedHashMap;
21 import java.util.List;
24 import java.util.TreeSet;
26 import org.json.simple.JSONArray;
27 import org.json.simple.JSONObject;
28 import org.json.simple.JSONValue;
29 import org.json.simple.parser.JSONParser;
30 import org.json.simple.parser.ParseException;
31 import org.slf4j.Logger;
32 import org.slf4j.LoggerFactory;
35 import com.cloudera.impala.common.Id;
36 import com.cloudera.impala.common.IdGenerator;
37 import com.cloudera.impala.thrift.TQueryCtx;
38 import com.google.common.base.Joiner;
39 import com.google.common.base.Preconditions;
40 import com.google.common.collect.Lists;
41 import com.google.common.collect.Maps;
42 import com.google.common.collect.Sets;
43 import com.google.common.hash.Hasher;
44 import com.google.common.hash.Hashing;
// Vertex type tag; written under the "vertexType" key when the vertex is
// serialized. Always "COLUMN" for vertices of this class.
55 private final String
type_ =
"COLUMN";
// Vertex(VertexId id, String label): both the id and the column label are
// required; a null for either fails fast via Preconditions.checkNotNull.
// NOTE(review): the assignments to id_/label_ are not visible in this excerpt.
61 Preconditions.checkNotNull(id);
62 Preconditions.checkNotNull(label);
// Presumably the body of Vertex.toJson(): builds the serialized form of this
// vertex. A LinkedHashMap keeps insertion order, so keys come out as:
//   "id"         -> numeric vertex id (id_.asInt())
//   "vertexType" -> type_
//   "vertexId"   -> the column label (label_)
// Raw Map/LinkedHashMap types are used, as is common with json-simple.
// NOTE(review): the return of obj is not visible in this excerpt.
78 Map obj =
new LinkedHashMap();
79 obj.put(
"id", id_.asInt());
80 obj.put(
"vertexType",
type_);
81 obj.put(
"vertexId",
label_);
// fromJsonObj(JSONObject obj): reads back the "id" and "vertexId" keys written
// when a vertex is serialized.
// The (Long) cast assumes the JSON parser yields integral numbers as Long; the
// value is narrowed to int. A missing "id" key throws NullPointerException on
// unboxing.
89 int id = ((Long) obj.get(
"id")).intValue();
90 String label = (String) obj.get(
"vertexId");
// Vertex.equals(Object): null and objects of a different runtime class are never
// equal. For same-class objects the comparison starts with id_; the rest of the
// conjunction continues beyond this excerpt.
// NOTE(review): the cast of obj to `vertex` is not visible in this excerpt.
96 if (obj == null)
return false;
97 if (obj.getClass() != this.getClass())
return false;
99 return this.id_.equals(vertex.id_) &&
// createGenerator(): returns an anonymous IdGenerator<VertexId>; the anonymous
// class body is outside this excerpt.
117 return new IdGenerator<VertexId>() {
// MultiEdge.toString(): debug rendering of the edge, one line each for sources
// and targets:
//   Sources: [<v>,<v>,...]
//   Targets: [<v>,<v>,...]
// Vertices are joined with "," (no spaces).
148 StringBuilder builder =
new StringBuilder();
149 Joiner joiner = Joiner.on(
",");
150 builder.append(
"Sources: [");
151 builder.append(joiner.join(
sources_) +
"]\n");
152 builder.append(
"Targets: [");
153 builder.append(joiner.join(
targets_) +
"]\n");
155 return builder.toString();
// Presumably the body of MultiEdge.toJson(): serializes the edge as an
// insertion-ordered map with keys:
//   "sources"  -> JSON array of source vertex ids
//   "targets"  -> JSON array of target vertex ids
//   "edgeType" -> edgeType_.toString()
// NOTE(review): the loops feeding sourceIds/targetIds are not visible here;
// `vertex` presumably iterates sources_ and then targets_.
// NOTE(review): the arrays hold whatever getVertexId() returns. The reader side
// casts each entry to Long, so confirm that this id type serializes as a bare
// JSON number.
162 Map obj =
new LinkedHashMap();
164 JSONArray sourceIds =
new JSONArray();
166 sourceIds.add(vertex.getVertexId());
168 obj.put(
"sources", sourceIds);
170 JSONArray targetIds =
new JSONArray();
172 targetIds.add(vertex.getVertexId());
174 obj.put(
"targets", targetIds);
175 obj.put(
"edgeType", edgeType_.toString());
// MultiEdge.equals(Object): null and other runtime classes are never equal.
// Otherwise two edges are equal when their source sets are equal, their target
// sets are equal, and they have the same edge type (enum identity, ==).
// NOTE(review): the cast of obj to `edge` is not visible in this excerpt.
181 if (obj == null)
return false;
182 if (obj.getClass() != this.getClass())
return false;
184 return edge.sources_.equals(this.sources_) &&
185 edge.
targets_.equals(
this.targets_) &&
186 edge.edgeType_ == this.edgeType_;
// Class logger (SLF4J).
213 private final static Logger
LOG = LoggerFactory.getLogger(ColumnLineageGraph.class);
// Multi-edges of the lineage graph.
222 private final List<MultiEdge>
edges_ = Lists.newArrayList();
// Vertices keyed by their column label. This is a HashMap, so iteration order is
// unspecified; serialization sorts the vertices first.
228 private final Map<String, Vertex>
vertices_ = Maps.newHashMap();
// setVertices(Set<Vertex> vertices): registers each vertex in both lookup maps,
// by label in vertices_ and by id in idToVertexMap_.
// NOTE(review): whether either map is cleared first is not visible in this excerpt.
257 for (
Vertex vertex: vertices) {
258 vertices_.put(vertex.getLabel(), vertex);
259 idToVertexMap_.put(vertex.getVertexId(), vertex);
// createMultiEdge(targets, sources, type): collects the vertices for the given
// target and source labels into fresh hash sets.
// NOTE(review): the loop bodies are not visible here; presumably each label is
// turned into a vertex via createVertex(label). Confirm against the full source.
270 Set<Vertex> targetVertices = Sets.newHashSet();
271 for (String target: targets) {
274 Set<Vertex> sourceVertices = Sets.newHashSet();
275 for (String source: sources) {
// createVertex(String label): returns the vertex already registered under `label`
// in vertices_ if there is one. Otherwise it registers the new vertex in vertices_
// (by label) and idToVertexMap_ (by id).
// NOTE(review): as excerpted, newVertex is still null when it reaches the put below.
// The embedded numbering skips 290, where the new Vertex is presumably constructed;
// without that line this path throws NullPointerException.
288 Vertex newVertex = vertices_.get(label);
289 if (newVertex != null)
return newVertex;
291 vertices_.put(newVertex.getLabel(), newVertex);
292 idToVertexMap_.put(newVertex.getVertexId(), newVertex);
// init(Analyzer analyzer): initializes graph metadata from the root analyzer's
// query context.
// - Requires a non-null analyzer that is the root analyzer.
// - Uses the redacted statement text when the request carries one.
// - Parses queryCtx.now_string with pattern "yyyy-MM-dd HH:mm:ss" into epoch
//   seconds. On a parse failure the visible catch only logs an error.
// - Records the analyzer user's name.
// NOTE(review): the non-redacted branch and the `try {` opener are not visible here.
// NOTE(review): SimpleDateFormat parses in the JVM default time zone. Confirm this
//   matches how now_string is produced.
// NOTE(review): the log call drops the exception; passing `e` to LOG.error would
//   keep the stack trace.
310 Preconditions.checkNotNull(analyzer);
311 Preconditions.checkState(analyzer.isRootAnalyzer());
312 TQueryCtx queryCtx = analyzer.getQueryCtx();
313 if (queryCtx.request.isSetRedacted_stmt()) {
314 queryStr_ = queryCtx.request.redacted_stmt;
319 SimpleDateFormat df =
new SimpleDateFormat(
"yyyy-MM-dd HH:mm:ss");
// Date.getTime() is in milliseconds; timestamp_ is stored in seconds.
321 timestamp_ = df.parse(queryCtx.now_string).getTime() / 1000;
322 }
catch (java.text.ParseException e) {
323 LOG.error(
"Error parsing timestamp value: " + queryCtx.now_string +
324 " " + e.getMessage());
328 user_ = analyzer.getUser().getName();
// computeProjectionDependencies(List<Expr> resultExprs): requires a non-null,
// non-empty list of result exprs. For the i-th result expr, the target set holds
// the single label targetColumnLabels_.get(i). sourceBaseCols and dependentExprs
// collect what the expr depends on (presumably filled by getSourceBaseCols; the
// call is not visible). When dependent exprs exist, their base columns are
// gathered separately into predicateBaseCols.
// NOTE(review): assumes targetColumnLabels_ has at least resultExprs.size()
// entries; get(i) throws otherwise.
332 Preconditions.checkNotNull(resultExprs);
333 Preconditions.checkState(!resultExprs.isEmpty());
335 for (
int i = 0; i < resultExprs.size(); ++i) {
336 Expr expr = resultExprs.get(i);
337 Set<String> sourceBaseCols = Sets.newHashSet();
338 List<Expr> dependentExprs = Lists.newArrayList();
340 Set<String> targets = Sets.newHashSet(targetColumnLabels_.get(i));
342 if (!dependentExprs.isEmpty()) {
348 Set<String> predicateBaseCols = Sets.newHashSet();
349 for (
Expr dependentExpr: dependentExprs) {
// Presumably computeResultPredicateDependencies(Analyzer analyzer): every conjunct
// registered with the analyzer, except auxiliary exprs, is recorded in
// resultDependencyPredicates_. Their base columns are then gathered into
// predicateBaseCols; the method returns early when there are none.
363 List<Expr> conjuncts = analyzer.getConjuncts();
364 for (
Expr expr: conjuncts) {
365 if (expr.isAuxExpr())
continue;
366 resultDependencyPredicates_.add(expr);
368 Set<String> predicateBaseCols = Sets.newHashSet();
372 if (predicateBaseCols.isEmpty())
return;
388 List<Expr> directPredDeps,
boolean traversePredDeps) {
// getSourceBaseCols(expr, sourceBaseCols, directPredDeps, traversePredDeps):
// gathers the base-table columns that an expression reads.
// - Predicate dependency exprs are appended to directPredDeps when it is non-null.
//   They are also traversed only when traversePredDeps is true.
// - Slot ids referenced by the traversed exprs are collected. For a slot with no
//   source exprs that maps to a column, "<table name>.<column name>" is added to
//   sourceBaseCols.
// - A slot's source exprs are walked in a loop whose body is not visible here.
// NOTE(review): the declarations of predicateDepExprs, exprsToTraverse and
// slotDesc are not visible in this excerpt.
391 if (directPredDeps != null) directPredDeps.addAll(predicateDepExprs);
392 if (traversePredDeps) exprsToTraverse.addAll(predicateDepExprs);
393 List<SlotId> slotIds = Lists.newArrayList();
394 for (
Expr e: exprsToTraverse) {
395 e.getIds(null, slotIds);
397 for (
SlotId slotId: slotIds) {
399 List<Expr> sourceExprs = slotDesc.getSourceExprs();
401 if (sourceExprs.isEmpty() && slotDesc.getColumn() != null) {
// A column-backed slot must belong to a materialized tuple with a table.
403 Preconditions.checkState(slotDesc.getParent().isMaterialized()
404 && slotDesc.getParent().getTable() != null
405 && slotDesc.getColumn() != null);
406 String colName = slotDesc.getParent().getTableName() +
"." +
407 slotDesc.getColumn().getName();
408 sourceBaseCols.add(colName);
410 for (
Expr sourceExpr: sourceExprs) {
// getProjectionDeps(Expr e): for an AnalyticExpr, returns its first
// getFnCall().getParams().size() children, presumably the analytic function's
// arguments.
// NOTE(review): any guard before the cast is not visible here. The cast throws
// ClassCastException for a non-analytic expr.
423 Preconditions.checkNotNull(e);
424 List<Expr> outputExprs = Lists.newArrayList();
426 AnalyticExpr analytic = (AnalyticExpr) e;
427 outputExprs.addAll(analytic.getChildren().subList(0,
428 analytic.
getFnCall().getParams().size()));
// getPredicateDeps(Expr e): for an AnalyticExpr, returns its partition exprs
// followed by the expr of each order-by element.
// NOTE(review): the guard before the cast and the loop header over the order-by
// elements are not visible in this excerpt.
440 Preconditions.checkNotNull(e);
441 List<Expr> outputExprs = Lists.newArrayList();
443 AnalyticExpr analyticExpr = (AnalyticExpr) e;
444 outputExprs.addAll(analyticExpr.getPartitionExprs());
446 outputExprs.add(orderByElem.getExpr());
// addDependencyPredicates(Collection<Expr> exprs): appends all given exprs to
// resultDependencyPredicates_.
453 resultDependencyPredicates_.addAll(exprs);
// ColumnLineageGraph.toJson(): serializes the whole graph to a JSON string.
// The LinkedHashMap keeps keys in insertion order. The visible puts are "user",
// "edges" and "vertices". The other metadata keys (presumably "queryText", "hash",
// "timestamp", which createFromJSON reads) are put in lines not visible here.
// Each edge and vertex serializes itself via its own toJson().
// NOTE(review): the loop header over the edges is not visible in this excerpt.
461 Map obj =
new LinkedHashMap();
464 obj.put(
"user",
user_);
467 JSONArray edges =
new JSONArray();
469 edges.add(edge.toJson());
471 obj.put(
"edges", edges);
// vertices_ is a HashMap. Sorting via a TreeSet (Vertex.compareTo) makes the
// vertex order independent of hash iteration order.
473 TreeSet<Vertex> sortedVertices = Sets.newTreeSet(vertices_.values());
474 JSONArray vertices =
new JSONArray();
475 for (
Vertex vertex: sortedVertices) {
476 vertices.add(vertex.toJson());
478 obj.put(
"vertices", vertices);
479 return JSONValue.toJSONString(obj);
483 Hasher hasher = Hashing.md5().newHasher();
484 hasher.putString(queryStr);
485 return hasher.hash().toString();
// createFromJSON(String json): rebuilds a ColumnLineageGraph from its serialized
// JSON form.
// - Returns null for null/empty input and when the parsed value is not a JSON
//   object.
// - A ParseException is logged; what follows the log call is not visible here.
// - Reads keys "queryText", "hash", "user", "timestamp", "vertices" and "edges".
//   All vertices are registered before the edges are rebuilt.
// NOTE(review): the declarations of obj and graph, and the `try {` opener, are not
// visible in this excerpt.
493 if (json == null || json.isEmpty())
return null;
494 JSONParser
parser =
new JSONParser();
497 obj = parser.parse(json);
498 }
catch (ParseException e) {
499 LOG.error(
"Error parsing serialized column lineage graph: " + e.getMessage());
502 if (!(obj instanceof JSONObject))
return null;
503 JSONObject jsonObj = (JSONObject) obj;
504 String stmt = (String) jsonObj.get(
"queryText");
505 String
hash = (String) jsonObj.get(
"hash");
506 String user = (String) jsonObj.get(
"user");
// NOTE(review): a missing "timestamp" key makes this Long -> long unboxing throw
// NullPointerException. Missing "vertices"/"edges" arrays likewise throw on
// .size() below. Consider validating these keys before use.
507 long timestamp = (Long) jsonObj.get(
"timestamp");
509 JSONArray serializedVertices = (JSONArray) jsonObj.get(
"vertices");
510 Set<Vertex> vertices = Sets.newHashSet();
511 for (
int i = 0; i < serializedVertices.size(); ++i) {
512 Vertex v = Vertex.fromJsonObj((JSONObject) serializedVertices.get(i));
// Vertices must be registered first: edges are rebuilt by resolving vertex ids.
515 graph.setVertices(vertices);
516 JSONArray serializedEdges = (JSONArray) jsonObj.get(
"edges");
517 for (
int i = 0; i < serializedEdges.size(); ++i) {
519 graph.createMultiEdgeFromJSONObj((JSONObject) serializedEdges.get(i));
// createMultiEdgeFromJSONObj(JSONObject jsonEdge): rebuilds an edge from the
// "sources"/"targets" id arrays and the "edgeType" name. EdgeType.valueOf throws
// for an unknown or missing name.
// NOTE(review): sourceVertices/targetVertices are declared in lines not visible
// here, presumably via getVerticesFromJSONArray(sources/targets).
526 Preconditions.checkNotNull(jsonEdge);
527 JSONArray sources = (JSONArray) jsonEdge.get(
"sources");
529 JSONArray targets = (JSONArray) jsonEdge.get(
"targets");
531 MultiEdge.EdgeType type =
532 MultiEdge.EdgeType.valueOf((String) jsonEdge.get(
"edgeType"));
533 return new MultiEdge(sourceVertices, targetVertices, type);
// getVerticesFromJSONArray(JSONArray vertexIdArray): resolves each id in the array
// to a vertex. Ids are read as Long and narrowed to int. Every id must resolve;
// checkNotNull fails otherwise.
// NOTE(review): the lookup producing sourceVertex is not visible here; presumably
// it uses idToVertexMap_, so vertices must already be registered.
537 Set<Vertex> vertices = Sets.newHashSet();
538 for (
int i = 0; i < vertexIdArray.size(); ++i) {
539 int sourceId = ((Long) vertexIdArray.get(i)).intValue();
541 Preconditions.checkNotNull(sourceVertex);
542 vertices.add(sourceVertex);
// ColumnLineageGraph.equals(Object): null and other runtime classes are never
// equal. Otherwise graph contents are compared; the visible tail of the condition
// compares edges_.
// NOTE(review): the cast to `g` and the leading comparisons are not visible here.
549 if (obj == null)
return false;
550 if (obj.getClass() != this.getClass())
return false;
553 !this.edges_.equals(g.edges_)) {
// ColumnLineageGraph.toString(): appends each edge's toString() followed by "\n".
// NOTE(review): the loop header (presumably over edges_) is not visible here.
560 StringBuilder builder =
new StringBuilder();
562 builder.append(edge.toString() +
"\n");
565 return builder.toString();
569 Preconditions.checkNotNull(columnLabels);
570 targetColumnLabels_.addAll(columnLabels);
// addTargetColumnLabels(Table dstTable): adds one "<table full name>.<column name>"
// label per destination column to targetColumnLabels_.
// NOTE(review): the loop header that binds columnName is not visible in this excerpt.
574 Preconditions.checkNotNull(dstTable);
575 String tblFullName = dstTable.getFullName();
577 targetColumnLabels_.add(tblFullName +
"." + columnName);
final Set< Vertex > targets_
Set< Vertex > getVerticesFromJSONArray(JSONArray vertexIdArray)
final IdGenerator< VertexId > vertexIdGenerator
int compareTo(Vertex cmp)
List< String > getColumnNames()
final List< MultiEdge > edges_
MultiEdge createMultiEdge(Set< String > targets, Set< String > sources, MultiEdge.EdgeType type)
final Map< String, Vertex > vertices_
static IdGenerator< VertexId > createGenerator()
void addTargetColumnLabels(Table dstTable)
const StringSearch UrlParser::hash_search & hash
boolean equals(Object obj)
void init(Analyzer analyzer)
final Set< Vertex > sources_
void computeLineageGraph(List< Expr > resultExprs, Analyzer rootAnalyzer)
void addDependencyPredicates(Collection< Expr > exprs)
MultiEdge createMultiEdgeFromJSONObj(JSONObject jsonEdge)
List< Expr > getPredicateDeps(Expr e)
Vertex createVertex(String label)
ColumnLineageGraph(String stmt, String user, long timestamp)
List< OrderByElement > getOrderByElements()
void computeProjectionDependencies(List< Expr > resultExprs)
List< Expr > getProjectionDeps(Expr e)
MultiEdge(Set< Vertex > sources, Set< Vertex > targets, EdgeType type)
boolean equals(Object obj)
void computeResultPredicateDependencies(Analyzer analyzer)
void addTargetColumnLabels(Collection< String > columnLabels)
final List< Expr > resultDependencyPredicates_
Vertex(VertexId id, String label)
FunctionCallExpr getFnCall()
String getQueryHash(String queryStr)
final Map< VertexId, Vertex > idToVertexMap_
final List< String > targetColumnLabels_
void getSourceBaseCols(Expr expr, Set< String > sourceBaseCols, List< Expr > directPredDeps, boolean traversePredDeps)
static ColumnLineageGraph createFromJSON(String json)
void setVertices(Set< Vertex > vertices)
boolean equals(Object obj)
static Vertex fromJsonObj(JSONObject obj)