Impala
Impala is the open source, native analytic database for Apache Hadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
ColumnLineageGraph.java
Go to the documentation of this file.
1 // Copyright 2012 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 package com.cloudera.impala.analysis;
16 
17 import java.text.SimpleDateFormat;
18 import java.util.Collection;
19 import java.util.Date;
20 import java.util.LinkedHashMap;
21 import java.util.List;
22 import java.util.Map;
23 import java.util.Set;
24 import java.util.TreeSet;
25 
26 import org.json.simple.JSONArray;
27 import org.json.simple.JSONObject;
28 import org.json.simple.JSONValue;
29 import org.json.simple.parser.JSONParser;
30 import org.json.simple.parser.ParseException;
31 import org.slf4j.Logger;
32 import org.slf4j.LoggerFactory;
33 
35 import com.cloudera.impala.common.Id;
36 import com.cloudera.impala.common.IdGenerator;
37 import com.cloudera.impala.thrift.TQueryCtx;
38 import com.google.common.base.Joiner;
39 import com.google.common.base.Preconditions;
40 import com.google.common.collect.Lists;
41 import com.google.common.collect.Maps;
42 import com.google.common.collect.Sets;
43 import com.google.common.hash.Hasher;
44 import com.google.common.hash.Hashing;
45 
51 final class Vertex implements Comparable<Vertex> {
52  // Unique identifier of this vertex.
53  private final VertexId id_;
54 
55  private final String type_ = "COLUMN";
56 
57  // A fully-qualified column name or the label of a result expr
58  private final String label_;
59 
60  public Vertex(VertexId id, String label) {
61  Preconditions.checkNotNull(id);
62  Preconditions.checkNotNull(label);
63  id_ = id;
64  label_ = label;
65  }
66  public VertexId getVertexId() { return id_; }
67  public String getLabel() { return label_; }
68  public String getType() { return type_; }
69 
70  @Override
71  public String toString() { return "(" + id_ + ":" + type_ + ":" + label_ + ")"; }
72 
76  public Map toJson() {
77  // Use a LinkedHashMap to generate a strict ordering of elements.
78  Map obj = new LinkedHashMap();
79  obj.put("id", id_.asInt());
80  obj.put("vertexType", type_);
81  obj.put("vertexId", label_);
82  return obj;
83  }
84 
88  public static Vertex fromJsonObj(JSONObject obj) {
89  int id = ((Long) obj.get("id")).intValue();
90  String label = (String) obj.get("vertexId");
91  return new Vertex(new VertexId(id), label);
92  }
93 
94  @Override
95  public boolean equals(Object obj) {
96  if (obj == null) return false;
97  if (obj.getClass() != this.getClass()) return false;
98  Vertex vertex = (Vertex) obj;
99  return this.id_.equals(vertex.id_) &&
100  this.label_.equals(vertex.label_);
101  }
102 
103  public int compareTo(Vertex cmp) { return this.id_.compareTo(cmp.id_); }
104 
105  @Override
106  public int hashCode() { return id_.hashCode(); }
107 }
108 
112 class VertexId extends Id<VertexId> {
113  protected VertexId(int id) {
114  super(id);
115  }
116  public static IdGenerator<VertexId> createGenerator() {
117  return new IdGenerator<VertexId>() {
118  @Override
119  public VertexId getNextId() { return new VertexId(nextId_++); }
120  @Override
121  public VertexId getMaxId() { return new VertexId(nextId_ - 1); }
122  };
123  }
124 }
125 
132 final class MultiEdge {
133  public static enum EdgeType {
134  PROJECTION, PREDICATE
135  }
136  private final Set<Vertex> sources_;
137  private final Set<Vertex> targets_;
138  private final EdgeType edgeType_;
139 
140  public MultiEdge(Set<Vertex> sources, Set<Vertex> targets, EdgeType type) {
141  sources_ = sources;
142  targets_ = targets;
143  edgeType_ = type;
144  }
145 
146  @Override
147  public String toString() {
148  StringBuilder builder = new StringBuilder();
149  Joiner joiner = Joiner.on(",");
150  builder.append("Sources: [");
151  builder.append(joiner.join(sources_) + "]\n");
152  builder.append("Targets: [");
153  builder.append(joiner.join(targets_) + "]\n");
154  builder.append("Type: " + edgeType_);
155  return builder.toString();
156  }
157 
161  public Map toJson() {
162  Map obj = new LinkedHashMap();
163  // Add sources
164  JSONArray sourceIds = new JSONArray();
165  for (Vertex vertex: sources_) {
166  sourceIds.add(vertex.getVertexId());
167  }
168  obj.put("sources", sourceIds);
169  // Add targets
170  JSONArray targetIds = new JSONArray();
171  for (Vertex vertex: targets_) {
172  targetIds.add(vertex.getVertexId());
173  }
174  obj.put("targets", targetIds);
175  obj.put("edgeType", edgeType_.toString());
176  return obj;
177  }
178 
179  @Override
180  public boolean equals(Object obj) {
181  if (obj == null) return false;
182  if (obj.getClass() != this.getClass()) return false;
183  MultiEdge edge = (MultiEdge) obj;
184  return edge.sources_.equals(this.sources_) &&
185  edge.targets_.equals(this.targets_) &&
186  edge.edgeType_ == this.edgeType_;
187  }
188 }
189 
212 public class ColumnLineageGraph {
213  private final static Logger LOG = LoggerFactory.getLogger(ColumnLineageGraph.class);
214  // Query statement
215  private String queryStr_;
216 
217  // Name of the user that issued this query
218  private String user_;
219 
220  private final List<Expr> resultDependencyPredicates_ = Lists.newArrayList();
221 
222  private final List<MultiEdge> edges_ = Lists.newArrayList();
223 
224  // Timestamp in seconds since epoch (GMT) this query was submitted for execution.
225  private long timestamp_;
226 
227  // Map of Vertex labels to Vertex objects.
228  private final Map<String, Vertex> vertices_ = Maps.newHashMap();
229 
230  // Map of Vertex ids to Vertex objects. Used primarily during the construction of the
231  // ColumnLineageGraph from a serialized JSON object.
232  private final Map<VertexId, Vertex> idToVertexMap_ = Maps.newHashMap();
233 
234  // For an INSERT or a CTAS, these are the columns of the
235  // destination table plus any partitioning columns (when dynamic partitioning is used).
236  // For a SELECT stmt, they are the labels of the result exprs.
237  private final List<String> targetColumnLabels_ = Lists.newArrayList();
238 
239  // Repository for tuple and slot descriptors for this query. Use it to construct the
240  // column lineage graph.
242 
243  private final IdGenerator<VertexId> vertexIdGenerator = VertexId.createGenerator();
244 
245  public ColumnLineageGraph() { }
246 
250  private ColumnLineageGraph(String stmt, String user, long timestamp) {
251  queryStr_ = stmt;
252  user_ = user;
253  timestamp_ = timestamp;
254  }
255 
256  private void setVertices(Set<Vertex> vertices) {
257  for (Vertex vertex: vertices) {
258  vertices_.put(vertex.getLabel(), vertex);
259  idToVertexMap_.put(vertex.getVertexId(), vertex);
260  }
261  }
262 
268  private MultiEdge createMultiEdge(Set<String> targets, Set<String> sources,
269  MultiEdge.EdgeType type) {
270  Set<Vertex> targetVertices = Sets.newHashSet();
271  for (String target: targets) {
272  targetVertices.add(createVertex(target));
273  }
274  Set<Vertex> sourceVertices = Sets.newHashSet();
275  for (String source: sources) {
276  sourceVertices.add(createVertex(source));
277  }
278  MultiEdge edge = new MultiEdge(sourceVertices, targetVertices, type);
279  edges_.add(edge);
280  return edge;
281  }
282 
287  private Vertex createVertex(String label) {
288  Vertex newVertex = vertices_.get(label);
289  if (newVertex != null) return newVertex;
290  newVertex = new Vertex(vertexIdGenerator.getNextId(), label);
291  vertices_.put(newVertex.getLabel(), newVertex);
292  idToVertexMap_.put(newVertex.getVertexId(), newVertex);
293  return newVertex;
294  }
295 
300  public void computeLineageGraph(List<Expr> resultExprs, Analyzer rootAnalyzer) {
301  init(rootAnalyzer);
302  computeProjectionDependencies(resultExprs);
304  }
305 
309  private void init(Analyzer analyzer) {
310  Preconditions.checkNotNull(analyzer);
311  Preconditions.checkState(analyzer.isRootAnalyzer());
312  TQueryCtx queryCtx = analyzer.getQueryCtx();
313  if (queryCtx.request.isSetRedacted_stmt()) {
314  queryStr_ = queryCtx.request.redacted_stmt;
315  } else {
316  queryStr_ = queryCtx.request.stmt;
317  }
318  Preconditions.checkNotNull(queryStr_);
319  SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
320  try {
321  timestamp_ = df.parse(queryCtx.now_string).getTime() / 1000;
322  } catch (java.text.ParseException e) {
323  LOG.error("Error parsing timestamp value: " + queryCtx.now_string +
324  " " + e.getMessage());
325  timestamp_ = new Date().getTime() / 1000;
326  }
327  descTbl_ = analyzer.getDescTbl();
328  user_ = analyzer.getUser().getName();
329  }
330 
331  private void computeProjectionDependencies(List<Expr> resultExprs) {
332  Preconditions.checkNotNull(resultExprs);
333  Preconditions.checkState(!resultExprs.isEmpty());
334  Preconditions.checkState(resultExprs.size() == targetColumnLabels_.size());
335  for (int i = 0; i < resultExprs.size(); ++i) {
336  Expr expr = resultExprs.get(i);
337  Set<String> sourceBaseCols = Sets.newHashSet();
338  List<Expr> dependentExprs = Lists.newArrayList();
339  getSourceBaseCols(expr, sourceBaseCols, dependentExprs, false);
340  Set<String> targets = Sets.newHashSet(targetColumnLabels_.get(i));
341  createMultiEdge(targets, sourceBaseCols, MultiEdge.EdgeType.PROJECTION);
342  if (!dependentExprs.isEmpty()) {
343  // We have additional exprs that 'expr' has a predicate dependency on.
344  // Gather the transitive predicate dependencies of 'expr' based on its direct
345  // predicate dependencies. For each direct predicate dependency p, 'expr' is
346  // transitively predicate dependent on all exprs that p is projection and
347  // predicate dependent on.
348  Set<String> predicateBaseCols = Sets.newHashSet();
349  for (Expr dependentExpr: dependentExprs) {
350  getSourceBaseCols(dependentExpr, predicateBaseCols, null, true);
351  }
352  createMultiEdge(targets, predicateBaseCols, MultiEdge.EdgeType.PREDICATE);
353  }
354  }
355  }
356 
363  List<Expr> conjuncts = analyzer.getConjuncts();
364  for (Expr expr: conjuncts) {
365  if (expr.isAuxExpr()) continue;
366  resultDependencyPredicates_.add(expr);
367  }
368  Set<String> predicateBaseCols = Sets.newHashSet();
369  for (Expr expr: resultDependencyPredicates_) {
370  getSourceBaseCols(expr, predicateBaseCols, null, true);
371  }
372  if (predicateBaseCols.isEmpty()) return;
373  Set<String> targets = Sets.newHashSet(targetColumnLabels_);
374  createMultiEdge(targets, predicateBaseCols, MultiEdge.EdgeType.PREDICATE);
375  }
376 
387  private void getSourceBaseCols(Expr expr, Set<String> sourceBaseCols,
388  List<Expr> directPredDeps, boolean traversePredDeps) {
389  List<Expr> exprsToTraverse = getProjectionDeps(expr);
390  List<Expr> predicateDepExprs = getPredicateDeps(expr);
391  if (directPredDeps != null) directPredDeps.addAll(predicateDepExprs);
392  if (traversePredDeps) exprsToTraverse.addAll(predicateDepExprs);
393  List<SlotId> slotIds = Lists.newArrayList();
394  for (Expr e: exprsToTraverse) {
395  e.getIds(null, slotIds);
396  }
397  for (SlotId slotId: slotIds) {
398  SlotDescriptor slotDesc = descTbl_.getSlotDesc(slotId);
399  List<Expr> sourceExprs = slotDesc.getSourceExprs();
400  // TODO: Add lineage support for nested types.
401  if (sourceExprs.isEmpty() && slotDesc.getColumn() != null) {
402  // slot should correspond to a materialized base table column
403  Preconditions.checkState(slotDesc.getParent().isMaterialized()
404  && slotDesc.getParent().getTable() != null
405  && slotDesc.getColumn() != null);
406  String colName = slotDesc.getParent().getTableName() + "." +
407  slotDesc.getColumn().getName();
408  sourceBaseCols.add(colName);
409  } else {
410  for (Expr sourceExpr: sourceExprs) {
411  getSourceBaseCols(sourceExpr, sourceBaseCols, directPredDeps,
412  traversePredDeps);
413  }
414  }
415  }
416  }
417 
422  private List<Expr> getProjectionDeps(Expr e) {
423  Preconditions.checkNotNull(e);
424  List<Expr> outputExprs = Lists.newArrayList();
425  if (e instanceof AnalyticExpr) {
426  AnalyticExpr analytic = (AnalyticExpr) e;
427  outputExprs.addAll(analytic.getChildren().subList(0,
428  analytic.getFnCall().getParams().size()));
429  } else {
430  outputExprs.add(e);
431  }
432  return outputExprs;
433  }
434 
439  private List<Expr> getPredicateDeps(Expr e) {
440  Preconditions.checkNotNull(e);
441  List<Expr> outputExprs = Lists.newArrayList();
442  if (e instanceof AnalyticExpr) {
443  AnalyticExpr analyticExpr = (AnalyticExpr) e;
444  outputExprs.addAll(analyticExpr.getPartitionExprs());
445  for (OrderByElement orderByElem: analyticExpr.getOrderByElements()) {
446  outputExprs.add(orderByElem.getExpr());
447  }
448  }
449  return outputExprs;
450  }
451 
452  public void addDependencyPredicates(Collection<Expr> exprs) {
453  resultDependencyPredicates_.addAll(exprs);
454  }
455 
459  public String toJson() {
460  if (queryStr_ == null) return "";
461  Map obj = new LinkedHashMap();
462  obj.put("queryText", queryStr_);
463  obj.put("hash", getQueryHash(queryStr_));
464  obj.put("user", user_);
465  obj.put("timestamp", timestamp_);
466  // Add edges
467  JSONArray edges = new JSONArray();
468  for (MultiEdge edge: edges_) {
469  edges.add(edge.toJson());
470  }
471  obj.put("edges", edges);
472  // Add vertices
473  TreeSet<Vertex> sortedVertices = Sets.newTreeSet(vertices_.values());
474  JSONArray vertices = new JSONArray();
475  for (Vertex vertex: sortedVertices) {
476  vertices.add(vertex.toJson());
477  }
478  obj.put("vertices", vertices);
479  return JSONValue.toJSONString(obj);
480  }
481 
482  private String getQueryHash(String queryStr) {
483  Hasher hasher = Hashing.md5().newHasher();
484  hasher.putString(queryStr);
485  return hasher.hash().toString();
486  }
487 
492  public static ColumnLineageGraph createFromJSON(String json) {
493  if (json == null || json.isEmpty()) return null;
494  JSONParser parser = new JSONParser();
495  Object obj = null;
496  try {
497  obj = parser.parse(json);
498  } catch (ParseException e) {
499  LOG.error("Error parsing serialized column lineage graph: " + e.getMessage());
500  return null;
501  }
502  if (!(obj instanceof JSONObject)) return null;
503  JSONObject jsonObj = (JSONObject) obj;
504  String stmt = (String) jsonObj.get("queryText");
505  String hash = (String) jsonObj.get("hash");
506  String user = (String) jsonObj.get("user");
507  long timestamp = (Long) jsonObj.get("timestamp");
508  ColumnLineageGraph graph = new ColumnLineageGraph(stmt, user, timestamp);
509  JSONArray serializedVertices = (JSONArray) jsonObj.get("vertices");
510  Set<Vertex> vertices = Sets.newHashSet();
511  for (int i = 0; i < serializedVertices.size(); ++i) {
512  Vertex v = Vertex.fromJsonObj((JSONObject) serializedVertices.get(i));
513  vertices.add(v);
514  }
515  graph.setVertices(vertices);
516  JSONArray serializedEdges = (JSONArray) jsonObj.get("edges");
517  for (int i = 0; i < serializedEdges.size(); ++i) {
518  MultiEdge e =
519  graph.createMultiEdgeFromJSONObj((JSONObject) serializedEdges.get(i));
520  graph.edges_.add(e);
521  }
522  return graph;
523  }
524 
525  private MultiEdge createMultiEdgeFromJSONObj(JSONObject jsonEdge) {
526  Preconditions.checkNotNull(jsonEdge);
527  JSONArray sources = (JSONArray) jsonEdge.get("sources");
528  Set<Vertex> sourceVertices = getVerticesFromJSONArray(sources);
529  JSONArray targets = (JSONArray) jsonEdge.get("targets");
530  Set<Vertex> targetVertices = getVerticesFromJSONArray(targets);
531  MultiEdge.EdgeType type =
532  MultiEdge.EdgeType.valueOf((String) jsonEdge.get("edgeType"));
533  return new MultiEdge(sourceVertices, targetVertices, type);
534  }
535 
536  private Set<Vertex> getVerticesFromJSONArray(JSONArray vertexIdArray) {
537  Set<Vertex> vertices = Sets.newHashSet();
538  for (int i = 0; i < vertexIdArray.size(); ++i) {
539  int sourceId = ((Long) vertexIdArray.get(i)).intValue();
540  Vertex sourceVertex = idToVertexMap_.get(new VertexId(sourceId));
541  Preconditions.checkNotNull(sourceVertex);
542  vertices.add(sourceVertex);
543  }
544  return vertices;
545  }
546 
547  @Override
548  public boolean equals(Object obj) {
549  if (obj == null) return false;
550  if (obj.getClass() != this.getClass()) return false;
552  if (!this.vertices_.equals(g.vertices_) ||
553  !this.edges_.equals(g.edges_)) {
554  return false;
555  }
556  return true;
557  }
558 
559  public String debugString() {
560  StringBuilder builder = new StringBuilder();
561  for (MultiEdge edge: edges_) {
562  builder.append(edge.toString() + "\n");
563  }
564  builder.append(toJson());
565  return builder.toString();
566  }
567 
568  public void addTargetColumnLabels(Collection<String> columnLabels) {
569  Preconditions.checkNotNull(columnLabels);
570  targetColumnLabels_.addAll(columnLabels);
571  }
572 
573  public void addTargetColumnLabels(Table dstTable) {
574  Preconditions.checkNotNull(dstTable);
575  String tblFullName = dstTable.getFullName();
576  for (String columnName: dstTable.getColumnNames()) {
577  targetColumnLabels_.add(tblFullName + "." + columnName);
578  }
579  }
580 }
Set< Vertex > getVerticesFromJSONArray(JSONArray vertexIdArray)
List< String > getColumnNames()
Definition: Table.java:354
MultiEdge createMultiEdge(Set< String > targets, Set< String > sources, MultiEdge.EdgeType type)
static IdGenerator< VertexId > createGenerator()
const StringSearch UrlParser::hash_search & hash
Definition: url-parser.cc:41
void computeLineageGraph(List< Expr > resultExprs, Analyzer rootAnalyzer)
void addDependencyPredicates(Collection< Expr > exprs)
MultiEdge createMultiEdgeFromJSONObj(JSONObject jsonEdge)
ColumnLineageGraph(String stmt, String user, long timestamp)
List< OrderByElement > getOrderByElements()
void computeProjectionDependencies(List< Expr > resultExprs)
MultiEdge(Set< Vertex > sources, Set< Vertex > targets, EdgeType type)
void addTargetColumnLabels(Collection< String > columnLabels)
void getSourceBaseCols(Expr expr, Set< String > sourceBaseCols, List< Expr > directPredDeps, boolean traversePredDeps)
static ColumnLineageGraph createFromJSON(String json)
static Vertex fromJsonObj(JSONObject obj)