Frontend.java
// Copyright 2012 Cloudera Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.cloudera.impala.service;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hive.service.cli.thrift.TGetColumnsReq;
import org.apache.hive.service.cli.thrift.TGetFunctionsReq;
import org.apache.hive.service.cli.thrift.TGetSchemasReq;
import org.apache.hive.service.cli.thrift.TGetTablesReq;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.cloudera.impala.thrift.TCatalogOpRequest;
import com.cloudera.impala.thrift.TCatalogOpType;
import com.cloudera.impala.thrift.TCatalogServiceRequestHeader;
import com.cloudera.impala.thrift.TColumn;
import com.cloudera.impala.thrift.TColumnValue;
import com.cloudera.impala.thrift.TCreateDropRoleParams;
import com.cloudera.impala.thrift.TDdlExecRequest;
import com.cloudera.impala.thrift.TDdlType;
import com.cloudera.impala.thrift.TDescribeTableOutputStyle;
import com.cloudera.impala.thrift.TDescribeTableResult;
import com.cloudera.impala.thrift.TErrorCode;
import com.cloudera.impala.thrift.TExecRequest;
import com.cloudera.impala.thrift.TExplainLevel;
import com.cloudera.impala.thrift.TExplainResult;
import com.cloudera.impala.thrift.TFinalizeParams;
import com.cloudera.impala.thrift.TFunctionCategory;
import com.cloudera.impala.thrift.TGrantRevokePrivParams;
import com.cloudera.impala.thrift.TGrantRevokeRoleParams;
import com.cloudera.impala.thrift.TLoadDataReq;
import com.cloudera.impala.thrift.TLoadDataResp;
import com.cloudera.impala.thrift.TMetadataOpRequest;
import com.cloudera.impala.thrift.TPlanFragment;
import com.cloudera.impala.thrift.TQueryCtx;
import com.cloudera.impala.thrift.TQueryExecRequest;
import com.cloudera.impala.thrift.TResetMetadataRequest;
import com.cloudera.impala.thrift.TResultRow;
import com.cloudera.impala.thrift.TResultSet;
import com.cloudera.impala.thrift.TResultSetMetadata;
import com.cloudera.impala.thrift.TShowFilesParams;
import com.cloudera.impala.thrift.TStatus;
import com.cloudera.impala.thrift.TStmtType;
import com.cloudera.impala.thrift.TTableName;
import com.cloudera.impala.thrift.TUpdateCatalogCacheRequest;
import com.cloudera.impala.thrift.TUpdateCatalogCacheResponse;
import com.cloudera.impala.util.EventSequence;
import com.cloudera.impala.util.PatternMatcher;
import com.cloudera.impala.util.TResultRowBuilder;
import com.cloudera.impala.util.TSessionStateUtil;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicates;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;

public class Frontend {
  private final static Logger LOG = LoggerFactory.getLogger(Frontend.class);
  // Time to wait for missing tables to be loaded before timing out.
  private final long MISSING_TBL_LOAD_WAIT_TIMEOUT_MS = 2 * 60 * 1000;

  // Max time to wait for a catalog update notification.
  private final long MAX_CATALOG_UPDATE_WAIT_TIME_MS = 2 * 1000;

  // TODO: Make the reload interval configurable.
  private static final int AUTHORIZATION_POLICY_RELOAD_INTERVAL_SECS = 5 * 60;

  private ImpaladCatalog impaladCatalog_;
  private final AuthorizationConfig authzConfig_;
  private final AtomicReference<AuthorizationChecker> authzChecker_;
  private final ScheduledExecutorService policyReader_ =
      Executors.newScheduledThreadPool(1);

  public Frontend(AuthorizationConfig authorizationConfig) {
    this(authorizationConfig, new ImpaladCatalog());
  }

  public Frontend(AuthorizationConfig authorizationConfig, ImpaladCatalog catalog) {
    authzConfig_ = authorizationConfig;
    impaladCatalog_ = catalog;
    authzChecker_ = new AtomicReference<AuthorizationChecker>(
        new AuthorizationChecker(authzConfig_, impaladCatalog_.getAuthPolicy()));
    // If authorization is enabled, reload the policy on a regular basis.
    if (authzConfig_.isEnabled() && authzConfig_.isFileBasedPolicy()) {
      // Stagger the reads across nodes.
      Random randomGen = new Random(UUID.randomUUID().hashCode());
      int delay = AUTHORIZATION_POLICY_RELOAD_INTERVAL_SECS + randomGen.nextInt(60);

      policyReader_.scheduleAtFixedRate(new AuthorizationPolicyReader(authzConfig_),
          delay, AUTHORIZATION_POLICY_RELOAD_INTERVAL_SECS, TimeUnit.SECONDS);
    }
  }
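
  // The constructor above staggers periodic policy reloads with a random delay so
  // that a fleet of impalads does not all read the policy file at once. A minimal,
  // self-contained sketch of the same pattern (illustrative only; 'reloadTask' and
  // the interval value are hypothetical):
  //
  //   ScheduledExecutorService exec = Executors.newScheduledThreadPool(1);
  //   int intervalSecs = 300;
  //   int initialDelaySecs = intervalSecs + new Random().nextInt(60);  // jitter
  //   exec.scheduleAtFixedRate(reloadTask, initialDelaySecs, intervalSecs,
  //       TimeUnit.SECONDS);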

  private class AuthorizationPolicyReader implements Runnable {
    private final AuthorizationConfig config_;

    public AuthorizationPolicyReader(AuthorizationConfig config) {
      config_ = config;
    }

    public void run() {
      try {
        LOG.info("Reloading authorization policy file from: " + config_.getPolicyFile());
        authzChecker_.set(new AuthorizationChecker(config_,
            getCatalog().getAuthPolicy()));
      } catch (Exception e) {
        LOG.error("Error reloading policy file: ", e);
      }
    }
  }

  public ImpaladCatalog getCatalog() { return impaladCatalog_; }
  public AuthorizationChecker getAuthzChecker() { return authzChecker_.get(); }

  public TUpdateCatalogCacheResponse updateCatalogCache(
      TUpdateCatalogCacheRequest req) throws CatalogException {
    ImpaladCatalog catalog = impaladCatalog_;

    // If this is not a delta, this update should replace the current
    // Catalog contents, so create a new catalog and populate it.
    if (!req.is_delta) catalog = new ImpaladCatalog();

    TUpdateCatalogCacheResponse response = catalog.updateCatalog(req);

    if (!req.is_delta) {
      // This was not a delta update. Now that the catalog has been updated,
      // replace the references to impaladCatalog_/authzChecker_ to ensure
      // clients don't see the catalog disappear.
      impaladCatalog_ = catalog;
      authzChecker_.set(new AuthorizationChecker(authzConfig_,
          impaladCatalog_.getAuthPolicy()));
    }
    return response;
  }
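
  // Note on the non-delta path above: the freshly built catalog is fully populated
  // before impaladCatalog_ is re-pointed at it, and authzChecker_ is an
  // AtomicReference, so concurrent readers always observe either the old snapshot
  // or the new one, never a partially built catalog. Reader-side sketch
  // (illustrative; 'user' and 'request' are hypothetical):
  //
  //   AuthorizationChecker checker = authzChecker_.get();  // one consistent snapshot
  //   boolean allowed = checker.hasAccess(user, request);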

  private void createCatalogOpRequest(AnalysisContext.AnalysisResult analysis,
      TExecRequest result) {
    TCatalogOpRequest ddl = new TCatalogOpRequest();
    TResultSetMetadata metadata = new TResultSetMetadata();
    if (analysis.isUseStmt()) {
      ddl.op_type = TCatalogOpType.USE;
      ddl.setUse_db_params(analysis.getUseStmt().toThrift());
      metadata.setColumns(Collections.<TColumn>emptyList());
    } else if (analysis.isShowTablesStmt()) {
      ddl.op_type = TCatalogOpType.SHOW_TABLES;
      ddl.setShow_tables_params(analysis.getShowTablesStmt().toThrift());
      metadata.setColumns(Arrays.asList(
          new TColumn("name", Type.STRING.toThrift())));
    } else if (analysis.isShowDbsStmt()) {
      ddl.op_type = TCatalogOpType.SHOW_DBS;
      ddl.setShow_dbs_params(analysis.getShowDbsStmt().toThrift());
      metadata.setColumns(Arrays.asList(
          new TColumn("name", Type.STRING.toThrift())));
    } else if (analysis.isShowDataSrcsStmt()) {
      ddl.op_type = TCatalogOpType.SHOW_DATA_SRCS;
      ddl.setShow_data_srcs_params(analysis.getShowDataSrcsStmt().toThrift());
      metadata.setColumns(Arrays.asList(
          new TColumn("name", Type.STRING.toThrift()),
          new TColumn("location", Type.STRING.toThrift()),
          new TColumn("class name", Type.STRING.toThrift()),
          new TColumn("api version", Type.STRING.toThrift())));
    } else if (analysis.isShowStatsStmt()) {
      ddl.op_type = TCatalogOpType.SHOW_STATS;
      ddl.setShow_stats_params(analysis.getShowStatsStmt().toThrift());
      metadata.setColumns(Arrays.asList(
          new TColumn("name", Type.STRING.toThrift())));
    } else if (analysis.isShowFunctionsStmt()) {
      ddl.op_type = TCatalogOpType.SHOW_FUNCTIONS;
      ShowFunctionsStmt stmt = (ShowFunctionsStmt) analysis.getStmt();
      ddl.setShow_fns_params(stmt.toThrift());
      metadata.setColumns(Arrays.asList(
          new TColumn("return type", Type.STRING.toThrift()),
          new TColumn("signature", Type.STRING.toThrift())));
    } else if (analysis.isShowCreateTableStmt()) {
      ddl.op_type = TCatalogOpType.SHOW_CREATE_TABLE;
      ddl.setShow_create_table_params(analysis.getShowCreateTableStmt().toThrift());
      metadata.setColumns(Arrays.asList(
          new TColumn("result", Type.STRING.toThrift())));
    } else if (analysis.isShowFilesStmt()) {
      ddl.op_type = TCatalogOpType.SHOW_FILES;
      ddl.setShow_files_params(analysis.getShowFilesStmt().toThrift());
      metadata.setColumns(Collections.<TColumn>emptyList());
    } else if (analysis.isDescribeStmt()) {
      ddl.op_type = TCatalogOpType.DESCRIBE;
      ddl.setDescribe_table_params(analysis.getDescribeStmt().toThrift());
      metadata.setColumns(Arrays.asList(
          new TColumn("name", Type.STRING.toThrift()),
          new TColumn("type", Type.STRING.toThrift()),
          new TColumn("comment", Type.STRING.toThrift())));
    } else if (analysis.isAlterTableStmt()) {
      ddl.op_type = TCatalogOpType.DDL;
      TDdlExecRequest req = new TDdlExecRequest();
      req.setDdl_type(TDdlType.ALTER_TABLE);
      req.setAlter_table_params(analysis.getAlterTableStmt().toThrift());
      ddl.setDdl_params(req);
      metadata.setColumns(Collections.<TColumn>emptyList());
    } else if (analysis.isAlterViewStmt()) {
      ddl.op_type = TCatalogOpType.DDL;
      TDdlExecRequest req = new TDdlExecRequest();
      req.setDdl_type(TDdlType.ALTER_VIEW);
      req.setAlter_view_params(analysis.getAlterViewStmt().toThrift());
      ddl.setDdl_params(req);
      metadata.setColumns(Collections.<TColumn>emptyList());
    } else if (analysis.isCreateTableStmt()) {
      ddl.op_type = TCatalogOpType.DDL;
      TDdlExecRequest req = new TDdlExecRequest();
      req.setDdl_type(TDdlType.CREATE_TABLE);
      req.setCreate_table_params(analysis.getCreateTableStmt().toThrift());
      ddl.setDdl_params(req);
      metadata.setColumns(Collections.<TColumn>emptyList());
    } else if (analysis.isCreateTableAsSelectStmt()) {
      ddl.op_type = TCatalogOpType.DDL;
      TDdlExecRequest req = new TDdlExecRequest();
      req.setDdl_type(TDdlType.CREATE_TABLE_AS_SELECT);
      req.setCreate_table_params(
          analysis.getCreateTableAsSelectStmt().getCreateStmt().toThrift());
      ddl.setDdl_params(req);
      metadata.setColumns(Arrays.asList(
          new TColumn("summary", Type.STRING.toThrift())));
    } else if (analysis.isCreateTableLikeStmt()) {
      ddl.op_type = TCatalogOpType.DDL;
      TDdlExecRequest req = new TDdlExecRequest();
      req.setDdl_type(TDdlType.CREATE_TABLE_LIKE);
      req.setCreate_table_like_params(analysis.getCreateTableLikeStmt().toThrift());
      ddl.setDdl_params(req);
      metadata.setColumns(Collections.<TColumn>emptyList());
    } else if (analysis.isCreateViewStmt()) {
      ddl.op_type = TCatalogOpType.DDL;
      TDdlExecRequest req = new TDdlExecRequest();
      req.setDdl_type(TDdlType.CREATE_VIEW);
      req.setCreate_view_params(analysis.getCreateViewStmt().toThrift());
      ddl.setDdl_params(req);
      metadata.setColumns(Collections.<TColumn>emptyList());
    } else if (analysis.isCreateDbStmt()) {
      ddl.op_type = TCatalogOpType.DDL;
      TDdlExecRequest req = new TDdlExecRequest();
      req.setDdl_type(TDdlType.CREATE_DATABASE);
      req.setCreate_db_params(analysis.getCreateDbStmt().toThrift());
      ddl.setDdl_params(req);
      metadata.setColumns(Collections.<TColumn>emptyList());
    } else if (analysis.isCreateUdfStmt()) {
      ddl.op_type = TCatalogOpType.DDL;
      CreateUdfStmt stmt = (CreateUdfStmt) analysis.getStmt();
      TDdlExecRequest req = new TDdlExecRequest();
      req.setDdl_type(TDdlType.CREATE_FUNCTION);
      req.setCreate_fn_params(stmt.toThrift());
      ddl.setDdl_params(req);
      metadata.setColumns(Collections.<TColumn>emptyList());
    } else if (analysis.isCreateUdaStmt()) {
      ddl.op_type = TCatalogOpType.DDL;
      TDdlExecRequest req = new TDdlExecRequest();
      req.setDdl_type(TDdlType.CREATE_FUNCTION);
      CreateUdaStmt stmt = (CreateUdaStmt) analysis.getStmt();
      req.setCreate_fn_params(stmt.toThrift());
      ddl.setDdl_params(req);
      metadata.setColumns(Collections.<TColumn>emptyList());
    } else if (analysis.isCreateDataSrcStmt()) {
      ddl.op_type = TCatalogOpType.DDL;
      TDdlExecRequest req = new TDdlExecRequest();
      req.setDdl_type(TDdlType.CREATE_DATA_SOURCE);
      CreateDataSrcStmt stmt = (CreateDataSrcStmt) analysis.getStmt();
      req.setCreate_data_source_params(stmt.toThrift());
      ddl.setDdl_params(req);
      metadata.setColumns(Collections.<TColumn>emptyList());
    } else if (analysis.isComputeStatsStmt()) {
      ddl.op_type = TCatalogOpType.DDL;
      TDdlExecRequest req = new TDdlExecRequest();
      req.setDdl_type(TDdlType.COMPUTE_STATS);
      req.setCompute_stats_params(analysis.getComputeStatsStmt().toThrift());
      ddl.setDdl_params(req);
      metadata.setColumns(Collections.<TColumn>emptyList());
    } else if (analysis.isDropDbStmt()) {
      ddl.op_type = TCatalogOpType.DDL;
      TDdlExecRequest req = new TDdlExecRequest();
      req.setDdl_type(TDdlType.DROP_DATABASE);
      req.setDrop_db_params(analysis.getDropDbStmt().toThrift());
      ddl.setDdl_params(req);
      metadata.setColumns(Collections.<TColumn>emptyList());
    } else if (analysis.isDropTableOrViewStmt()) {
      ddl.op_type = TCatalogOpType.DDL;
      TDdlExecRequest req = new TDdlExecRequest();
      DropTableOrViewStmt stmt = analysis.getDropTableOrViewStmt();
      req.setDdl_type(stmt.isDropTable() ? TDdlType.DROP_TABLE : TDdlType.DROP_VIEW);
      req.setDrop_table_or_view_params(stmt.toThrift());
      ddl.setDdl_params(req);
      metadata.setColumns(Collections.<TColumn>emptyList());
    } else if (analysis.isDropFunctionStmt()) {
      ddl.op_type = TCatalogOpType.DDL;
      TDdlExecRequest req = new TDdlExecRequest();
      req.setDdl_type(TDdlType.DROP_FUNCTION);
      DropFunctionStmt stmt = (DropFunctionStmt) analysis.getStmt();
      req.setDrop_fn_params(stmt.toThrift());
      ddl.setDdl_params(req);
      metadata.setColumns(Collections.<TColumn>emptyList());
    } else if (analysis.isDropDataSrcStmt()) {
      ddl.op_type = TCatalogOpType.DDL;
      TDdlExecRequest req = new TDdlExecRequest();
      req.setDdl_type(TDdlType.DROP_DATA_SOURCE);
      DropDataSrcStmt stmt = (DropDataSrcStmt) analysis.getStmt();
      req.setDrop_data_source_params(stmt.toThrift());
      ddl.setDdl_params(req);
      metadata.setColumns(Collections.<TColumn>emptyList());
    } else if (analysis.isDropStatsStmt()) {
      ddl.op_type = TCatalogOpType.DDL;
      TDdlExecRequest req = new TDdlExecRequest();
      req.setDdl_type(TDdlType.DROP_STATS);
      DropStatsStmt stmt = (DropStatsStmt) analysis.getStmt();
      req.setDrop_stats_params(stmt.toThrift());
      ddl.setDdl_params(req);
      metadata.setColumns(Collections.<TColumn>emptyList());
    } else if (analysis.isResetMetadataStmt()) {
      ddl.op_type = TCatalogOpType.RESET_METADATA;
      ResetMetadataStmt resetMetadataStmt = (ResetMetadataStmt) analysis.getStmt();
      TResetMetadataRequest req = resetMetadataStmt.toThrift();
      ddl.setReset_metadata_params(req);
      metadata.setColumns(Collections.<TColumn>emptyList());
    } else if (analysis.isShowRolesStmt()) {
      ddl.op_type = TCatalogOpType.SHOW_ROLES;
      ShowRolesStmt showRolesStmt = (ShowRolesStmt) analysis.getStmt();
      ddl.setShow_roles_params(showRolesStmt.toThrift());
      Set<String> groupNames =
          getAuthzChecker().getUserGroups(analysis.getAnalyzer().getUser());
      // Check if the user is part of the group (case-sensitive) this SHOW ROLES
      // statement is targeting. If they are already a member of the group,
      // the admin requirement can be removed.
      Preconditions.checkState(ddl.getShow_roles_params().isSetIs_admin_op());
      if (ddl.getShow_roles_params().isSetGrant_group() &&
          groupNames.contains(ddl.getShow_roles_params().getGrant_group())) {
        ddl.getShow_roles_params().setIs_admin_op(false);
      }
      metadata.setColumns(Arrays.asList(
          new TColumn("role_name", Type.STRING.toThrift())));
    } else if (analysis.isShowGrantRoleStmt()) {
      ddl.op_type = TCatalogOpType.SHOW_GRANT_ROLE;
      ShowGrantRoleStmt showGrantRoleStmt = (ShowGrantRoleStmt) analysis.getStmt();
      ddl.setShow_grant_role_params(showGrantRoleStmt.toThrift());
      Set<String> groupNames =
          getAuthzChecker().getUserGroups(analysis.getAnalyzer().getUser());
      // User must be an admin to execute this operation if they have not been granted
      // this role.
      ddl.getShow_grant_role_params().setIs_admin_op(Sets.intersection(groupNames,
          showGrantRoleStmt.getRole().getGrantGroups()).isEmpty());
      metadata.setColumns(Arrays.asList(
          new TColumn("name", Type.STRING.toThrift())));
    } else if (analysis.isCreateDropRoleStmt()) {
      CreateDropRoleStmt createDropRoleStmt = (CreateDropRoleStmt) analysis.getStmt();
      TCreateDropRoleParams params = createDropRoleStmt.toThrift();
      TDdlExecRequest req = new TDdlExecRequest();
      req.setDdl_type(params.isIs_drop() ? TDdlType.DROP_ROLE : TDdlType.CREATE_ROLE);
      req.setCreate_drop_role_params(params);
      ddl.op_type = TCatalogOpType.DDL;
      ddl.setDdl_params(req);
      metadata.setColumns(Collections.<TColumn>emptyList());
    } else if (analysis.isGrantRevokeRoleStmt()) {
      GrantRevokeRoleStmt grantRoleStmt = (GrantRevokeRoleStmt) analysis.getStmt();
      TGrantRevokeRoleParams params = grantRoleStmt.toThrift();
      TDdlExecRequest req = new TDdlExecRequest();
      req.setDdl_type(params.isIs_grant() ? TDdlType.GRANT_ROLE : TDdlType.REVOKE_ROLE);
      req.setGrant_revoke_role_params(params);
      ddl.op_type = TCatalogOpType.DDL;
      ddl.setDdl_params(req);
      metadata.setColumns(Collections.<TColumn>emptyList());
    } else if (analysis.isGrantRevokePrivStmt()) {
      GrantRevokePrivStmt grantRevokePrivStmt = (GrantRevokePrivStmt) analysis.getStmt();
      TGrantRevokePrivParams params = grantRevokePrivStmt.toThrift();
      TDdlExecRequest req = new TDdlExecRequest();
      req.setDdl_type(params.isIs_grant() ?
          TDdlType.GRANT_PRIVILEGE : TDdlType.REVOKE_PRIVILEGE);
      req.setGrant_revoke_priv_params(params);
      ddl.op_type = TCatalogOpType.DDL;
      ddl.setDdl_params(req);
      metadata.setColumns(Collections.<TColumn>emptyList());
    } else {
      throw new IllegalStateException("Unexpected CatalogOp statement type.");
    }

    result.setResult_set_metadata(metadata);
    result.setCatalog_op_request(ddl);
    if (ddl.getOp_type() == TCatalogOpType.DDL) {
      TCatalogServiceRequestHeader header = new TCatalogServiceRequestHeader();
      header.setRequesting_user(analysis.getAnalyzer().getUser().getName());
      ddl.getDdl_params().setHeader(header);
    }
  }

  public TLoadDataResp loadTableData(TLoadDataReq request) throws ImpalaException,
      IOException {
    TableName tableName = TableName.fromThrift(request.getTable_name());

    // Get the destination for the load. If the load is targeting a partition,
    // this is the partition location. Otherwise this is the table location.
    String destPathString = null;
    if (request.isSetPartition_spec()) {
      destPathString = impaladCatalog_.getHdfsPartition(tableName.getDb(),
          tableName.getTbl(), request.getPartition_spec()).getLocation();
    } else {
      destPathString = impaladCatalog_.getTable(tableName.getDb(), tableName.getTbl())
          .getMetaStoreTable().getSd().getLocation();
    }

    Path destPath = new Path(destPathString);
    FileSystem fs = destPath.getFileSystem(FileSystemUtil.getConfiguration());

    // Create a temporary directory within the final destination directory to stage the
    // file move.
    Path tmpDestPath = FileSystemUtil.makeTmpSubdirectory(destPath);

    Path sourcePath = new Path(request.source_path);
    int filesLoaded = 0;
    if (fs.isDirectory(sourcePath)) {
      filesLoaded = FileSystemUtil.relocateAllVisibleFiles(sourcePath, tmpDestPath);
    } else {
      FileSystemUtil.relocateFile(sourcePath, tmpDestPath, true);
      filesLoaded = 1;
    }

    // If this is an OVERWRITE, delete all files in the destination.
    if (request.isOverwrite()) {
      FileSystemUtil.deleteAllVisibleFiles(destPath);
    }

    // Move the files from the temporary location to the final destination.
    FileSystemUtil.relocateAllVisibleFiles(tmpDestPath, destPath);
    // Cleanup the tmp directory.
    fs.delete(tmpDestPath, true);
    TLoadDataResp response = new TLoadDataResp();
    TColumnValue col = new TColumnValue();
    String loadMsg = String.format(
        "Loaded %d file(s). Total files in destination location: %d",
        filesLoaded, FileSystemUtil.getTotalNumVisibleFiles(destPath));
    col.setString_val(loadMsg);
    response.setLoad_summary(new TResultRow(Lists.newArrayList(col)));
    return response;
  }
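
  // Caller-side sketch for the method above (illustrative only; the table and path
  // values, and the thrift-generated setters, are assumptions):
  //
  //   TLoadDataReq req = new TLoadDataReq();
  //   req.setTable_name(new TTableName("functional", "alltypes"));
  //   req.setSource_path("/tmp/staging/part-00000");
  //   req.setOverwrite(false);
  //   TLoadDataResp resp = frontend.loadTableData(req);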

  public String getExplainString(TQueryCtx queryCtx) throws ImpalaException {
    StringBuilder stringBuilder = new StringBuilder();
    createExecRequest(queryCtx, stringBuilder);
    return stringBuilder.toString();
  }

  public List<String> getTableNames(String dbName, String tablePattern, User user)
      throws ImpalaException {
    List<String> tblNames = impaladCatalog_.getTableNames(dbName, tablePattern);
    if (authzConfig_.isEnabled()) {
      Iterator<String> iter = tblNames.iterator();
      while (iter.hasNext()) {
        PrivilegeRequest privilegeRequest = new PrivilegeRequestBuilder()
            .allOf(Privilege.ANY).onTable(dbName, iter.next()).toRequest();
        if (!authzChecker_.get().hasAccess(user, privilegeRequest)) {
          iter.remove();
        }
      }
    }
    return tblNames;
  }

  public List<String> getDbNames(String dbPattern, User user) {
    List<String> dbNames = impaladCatalog_.getDbNames(dbPattern);
    // If authorization is enabled, filter out the databases the user does not
    // have permissions on.
    if (authzConfig_.isEnabled()) {
      Iterator<String> iter = dbNames.iterator();
      while (iter.hasNext()) {
        String dbName = iter.next();
        // Default DB should always be shown.
        if (dbName.toLowerCase().equals(Catalog.DEFAULT_DB.toLowerCase())) continue;
        PrivilegeRequest request = new PrivilegeRequestBuilder()
            .any().onAnyTable(dbName).toRequest();
        if (!authzChecker_.get().hasAccess(user, request)) {
          iter.remove();
        }
      }
    }
    return dbNames;
  }

  public List<DataSource> getDataSrcs(String pattern) {
    return impaladCatalog_.getDataSources(pattern);
  }

  public TResultSet getColumnStats(String dbName, String tableName)
      throws ImpalaException {
    Table table = impaladCatalog_.getTable(dbName, tableName);
    TResultSet result = new TResultSet();
    TResultSetMetadata resultSchema = new TResultSetMetadata();
    result.setSchema(resultSchema);
    resultSchema.addToColumns(new TColumn("Column", Type.STRING.toThrift()));
    resultSchema.addToColumns(new TColumn("Type", Type.STRING.toThrift()));
    resultSchema.addToColumns(
        new TColumn("#Distinct Values", Type.BIGINT.toThrift()));
    resultSchema.addToColumns(new TColumn("#Nulls", Type.BIGINT.toThrift()));
    resultSchema.addToColumns(new TColumn("Max Size", Type.INT.toThrift()));
    resultSchema.addToColumns(new TColumn("Avg Size", Type.DOUBLE.toThrift()));

    for (Column c: table.getColumnsInHiveOrder()) {
      TResultRowBuilder rowBuilder = new TResultRowBuilder();
      // Add name, type, NDVs, numNulls, max size and avg size.
      rowBuilder.add(c.getName()).add(c.getType().toSql())
          .add(c.getStats().getNumDistinctValues()).add(c.getStats().getNumNulls())
          .add(c.getStats().getMaxSize()).add(c.getStats().getAvgSize());
      result.addToRows(rowBuilder.get());
    }
    return result;
  }

  public TResultSet getTableStats(String dbName, String tableName)
      throws ImpalaException {
    Table table = impaladCatalog_.getTable(dbName, tableName);
    if (table instanceof HdfsTable) {
      return ((HdfsTable) table).getTableStats();
    } else if (table instanceof HBaseTable) {
      return ((HBaseTable) table).getTableStats();
    } else if (table instanceof DataSourceTable) {
      return ((DataSourceTable) table).getTableStats();
    } else {
      throw new InternalException("Invalid table class: " + table.getClass());
    }
  }

  public List<Function> getFunctions(TFunctionCategory category,
      String dbName, String fnPattern)
      throws DatabaseNotFoundException {
    Db db = impaladCatalog_.getDb(dbName);
    if (db == null) {
      throw new DatabaseNotFoundException("Database '" + dbName + "' not found");
    }
    List<Function> fns = db.getFunctions(
        category, PatternMatcher.createHivePatternMatcher(fnPattern));
    Collections.sort(fns,
        new Comparator<Function>() {
          public int compare(Function f1, Function f2) {
            return f1.signatureString().compareTo(f2.signatureString());
          }
        });
    return fns;
  }

  public TDescribeTableResult describeTable(String dbName, String tableName,
      TDescribeTableOutputStyle outputStyle) throws ImpalaException {
    Table table = impaladCatalog_.getTable(dbName, tableName);
    return DescribeResultFactory.buildDescribeTableResult(table, outputStyle);
  }

  private Set<TableName> getMissingTbls(Set<TableName> tableNames) {
    Set<TableName> missingTbls = new HashSet<TableName>();
    for (TableName tblName: tableNames) {
      Db db = getCatalog().getDb(tblName.getDb());
      if (db == null) continue;
      Table tbl = db.getTable(tblName.getTbl());
      if (tbl == null) continue;
      if (!tbl.isLoaded()) missingTbls.add(tblName);
    }
    return missingTbls;
  }

  private boolean requestTblLoadAndWait(Set<TableName> requestedTbls, long timeoutMs)
      throws InternalException {
    Set<TableName> missingTbls = getMissingTbls(requestedTbls);
    // If there are no missing tables, return and avoid making an RPC to the
    // CatalogServer.
    if (missingTbls.isEmpty()) return true;

    // Call into the CatalogServer and request the required tables be loaded.
    LOG.info(String.format("Requesting prioritized load of table(s): %s",
        Joiner.on(", ").join(missingTbls)));
    TStatus status = FeSupport.PrioritizeLoad(missingTbls);
    if (status.getStatus_code() != TErrorCode.OK) {
      throw new InternalException("Error requesting prioritized load: " +
          Joiner.on("\n").join(status.getError_msgs()));
    }

    long startTimeMs = System.currentTimeMillis();
    // Wait until all the required tables are loaded in the Impalad's catalog cache.
    while (!missingTbls.isEmpty()) {
      // Check if the timeout has been reached.
      if (timeoutMs > 0 && System.currentTimeMillis() - startTimeMs > timeoutMs) {
        return false;
      }

      LOG.trace(String.format("Waiting for table(s) to complete loading: %s",
          Joiner.on(", ").join(missingTbls)));
      getCatalog().waitForCatalogUpdate(MAX_CATALOG_UPDATE_WAIT_TIME_MS);
      missingTbls = getMissingTbls(missingTbls);
      // TODO: Check for query cancellation here.
    }
    return true;
  }
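
  // The loop above is a poll-until-loaded pattern: request the load once, then
  // repeatedly block on catalog-update notifications (at most
  // MAX_CATALOG_UPDATE_WAIT_TIME_MS per iteration) and re-check the missing set.
  // Caller-side sketch (illustrative; the table name is hypothetical):
  //
  //   Set<TableName> tbls = Sets.newHashSet(new TableName("tpch", "lineitem"));
  //   if (!requestTblLoadAndWait(tbls, 60 * 1000)) {
  //     LOG.warn("Table(s) still not loaded after 60s; giving up.");
  //   }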

  public boolean requestTblLoadAndWait(Set<TableName> requestedTbls)
      throws InternalException {
    // A non-positive timeout means wait indefinitely.
    return requestTblLoadAndWait(requestedTbls, 0);
  }

  private AnalysisContext.AnalysisResult analyzeStmt(TQueryCtx queryCtx)
      throws ImpalaException {
    AnalysisContext analysisCtx = new AnalysisContext(impaladCatalog_, queryCtx,
        authzConfig_);
    LOG.debug("analyze query " + queryCtx.request.stmt);

    // Run analysis in a loop until any of the following events occurs:
    // 1) Analysis completes successfully.
    // 2) Analysis fails with an AnalysisException AND there are no missing tables.
    // 3) Analysis fails with an AuthorizationException.
    try {
      while (true) {
        try {
          analysisCtx.analyze(queryCtx.request.stmt);
          Preconditions.checkState(analysisCtx.getAnalyzer().getMissingTbls().isEmpty());
          return analysisCtx.getAnalysisResult();
        } catch (AnalysisException e) {
          Set<TableName> missingTbls = analysisCtx.getAnalyzer().getMissingTbls();
          // Only re-throw the AnalysisException if there were no missing tables.
          if (missingTbls.isEmpty()) throw e;

          // Some tables/views were missing, request and wait for them to load.
          if (!requestTblLoadAndWait(missingTbls, MISSING_TBL_LOAD_WAIT_TIMEOUT_MS)) {
            LOG.info(String.format("Missing tables were not received in %dms. Load " +
                "request will be retried.", MISSING_TBL_LOAD_WAIT_TIMEOUT_MS));
          }
        }
      }
    } finally {
      // Authorize all accesses.
      // AuthorizationExceptions must take precedence over any AnalysisException
      // that has been thrown, so perform the authorization first.
      analysisCtx.getAnalyzer().authorize(getAuthzChecker());
    }
  }

  public TExecRequest createExecRequest(TQueryCtx queryCtx, StringBuilder explainString)
      throws ImpalaException {
    // Analyze the statement
    AnalysisContext.AnalysisResult analysisResult = analyzeStmt(queryCtx);
    EventSequence timeline = analysisResult.getAnalyzer().getTimeline();
    timeline.markEvent("Analysis finished");
    Preconditions.checkNotNull(analysisResult.getStmt());
    TExecRequest result = new TExecRequest();
    result.setQuery_options(queryCtx.request.getQuery_options());
    result.setAccess_events(analysisResult.getAccessEvents());
    result.analysis_warnings = analysisResult.getAnalyzer().getWarnings();

    if (analysisResult.isCatalogOp()) {
      result.stmt_type = TStmtType.DDL;
      createCatalogOpRequest(analysisResult, result);
      String jsonLineageGraph = analysisResult.getJsonLineageGraph();
      if (jsonLineageGraph != null && !jsonLineageGraph.isEmpty()) {
        result.catalog_op_request.setLineage_graph(jsonLineageGraph);
      }
      // All DDL operations except for CTAS are done with analysis at this point.
      if (!analysisResult.isCreateTableAsSelectStmt()) return result;
    } else if (analysisResult.isLoadDataStmt()) {
      result.stmt_type = TStmtType.LOAD;
      result.setResult_set_metadata(new TResultSetMetadata(Arrays.asList(
          new TColumn("summary", Type.STRING.toThrift()))));
      result.setLoad_data_request(analysisResult.getLoadDataStmt().toThrift());
      return result;
    } else if (analysisResult.isSetStmt()) {
      result.stmt_type = TStmtType.SET;
      result.setResult_set_metadata(new TResultSetMetadata(Arrays.asList(
          new TColumn("option", Type.STRING.toThrift()),
          new TColumn("value", Type.STRING.toThrift()))));
      result.setSet_query_option_request(analysisResult.getSetStmt().toThrift());
      return result;
    }

    // create TQueryExecRequest
    Preconditions.checkState(analysisResult.isQueryStmt() || analysisResult.isDmlStmt()
        || analysisResult.isCreateTableAsSelectStmt());

    TQueryExecRequest queryExecRequest = new TQueryExecRequest();
    // create plan
    LOG.debug("create plan");
    Planner planner = new Planner(analysisResult, queryCtx);
    ArrayList<PlanFragment> fragments = planner.createPlan();

    List<ScanNode> scanNodes = Lists.newArrayList();
    // map from fragment to its index in queryExecRequest.fragments; needed for
    // queryExecRequest.dest_fragment_idx
    Map<PlanFragment, Integer> fragmentIdx = Maps.newHashMap();

    for (int fragmentId = 0; fragmentId < fragments.size(); ++fragmentId) {
      PlanFragment fragment = fragments.get(fragmentId);
      Preconditions.checkNotNull(fragment.getPlanRoot());
      fragment.getPlanRoot().collect(Predicates.instanceOf(ScanNode.class), scanNodes);
      fragmentIdx.put(fragment, fragmentId);
    }

    // set fragment destinations
    for (int i = 1; i < fragments.size(); ++i) {
      PlanFragment dest = fragments.get(i).getDestFragment();
      Integer idx = fragmentIdx.get(dest);
      Preconditions.checkState(idx != null);
      queryExecRequest.addToDest_fragment_idx(idx.intValue());
    }

    // Set scan ranges/locations for scan nodes.
    // Also assemble a list of table names missing stats for a warning message.
    LOG.debug("get scan range locations");
    Set<TTableName> tablesMissingStats = Sets.newTreeSet();
    for (ScanNode scanNode: scanNodes) {
      queryExecRequest.putToPer_node_scan_ranges(
          scanNode.getId().asInt(),
          scanNode.getScanRangeLocations());
      if (scanNode.isTableMissingStats()) {
        tablesMissingStats.add(scanNode.getTupleDesc().getTableName().toThrift());
      }
    }
    queryExecRequest.setHost_list(analysisResult.getAnalyzer().getHostIndex().getList());
    for (TTableName tableName: tablesMissingStats) {
      queryCtx.addToTables_missing_stats(tableName);
    }

    // Optionally disable spilling in the backend. Allow spilling if there are plan
    // hints or if all tables have stats.
    if (queryCtx.request.query_options.isDisable_unsafe_spills()
        && !tablesMissingStats.isEmpty()
        && !analysisResult.getAnalyzer().hasPlanHints()) {
      queryCtx.setDisable_spilling(true);
    }

    // Compute resource requirements after scan range locations because the cost
    // estimates of scan nodes rely on them.
    try {
      planner.computeResourceReqs(fragments, true, queryExecRequest);
    } catch (Exception e) {
      // Turn exceptions into a warning to allow the query to execute.
      LOG.error("Failed to compute resource requirements for query\n" +
          queryCtx.request.getStmt(), e);
    }

    // The fragments at this point have all their state set; serialize them to thrift.
    for (PlanFragment fragment: fragments) {
      TPlanFragment thriftFragment = fragment.toThrift();
      queryExecRequest.addToFragments(thriftFragment);
    }

    // Use VERBOSE by default for all non-explain statements.
    TExplainLevel explainLevel = TExplainLevel.VERBOSE;
    // Use the query option for explain stmts and tests (e.g., planner tests).
    if (analysisResult.isExplainStmt() || RuntimeEnv.INSTANCE.isTestEnv()) {
      explainLevel = queryCtx.request.query_options.getExplain_level();
    }

    // Global query parameters to be set in each TPlanExecRequest.
    queryExecRequest.setQuery_ctx(queryCtx);

    explainString.append(
        planner.getExplainString(fragments, queryExecRequest, explainLevel));
    queryExecRequest.setQuery_plan(explainString.toString());
    queryExecRequest.setDesc_tbl(analysisResult.getAnalyzer().getDescTbl().toThrift());

    String jsonLineageGraph = analysisResult.getJsonLineageGraph();
    if (jsonLineageGraph != null && !jsonLineageGraph.isEmpty()) {
      queryExecRequest.setLineage_graph(jsonLineageGraph);
    }

    if (analysisResult.isExplainStmt()) {
      // Return the EXPLAIN request
      createExplainRequest(explainString.toString(), result);
      return result;
    }

    result.setQuery_exec_request(queryExecRequest);

    if (analysisResult.isQueryStmt()) {
      // fill in the metadata
      LOG.debug("create result set metadata");
      result.stmt_type = TStmtType.QUERY;
      result.query_exec_request.stmt_type = result.stmt_type;
      TResultSetMetadata metadata = new TResultSetMetadata();
      QueryStmt queryStmt = analysisResult.getQueryStmt();
      int colCnt = queryStmt.getColLabels().size();
      for (int i = 0; i < colCnt; ++i) {
        TColumn colDesc = new TColumn();
        colDesc.columnName = queryStmt.getColLabels().get(i);
        colDesc.columnType = queryStmt.getResultExprs().get(i).getType().toThrift();
        metadata.addToColumns(colDesc);
      }
      result.setResult_set_metadata(metadata);
    } else {
      Preconditions.checkState(analysisResult.isInsertStmt() ||
          analysisResult.isCreateTableAsSelectStmt());

      // For CTAS the overall TExecRequest statement type is DDL, but the
      // query_exec_request should be DML.
      result.stmt_type =
          analysisResult.isCreateTableAsSelectStmt() ? TStmtType.DDL : TStmtType.DML;
      result.query_exec_request.stmt_type = TStmtType.DML;

      // create finalization params of insert stmt
      InsertStmt insertStmt = analysisResult.getInsertStmt();
      if (insertStmt.getTargetTable() instanceof HdfsTable) {
        TFinalizeParams finalizeParams = new TFinalizeParams();
        finalizeParams.setIs_overwrite(insertStmt.isOverwrite());
        finalizeParams.setTable_name(insertStmt.getTargetTableName().getTbl());
        finalizeParams.setTable_id(insertStmt.getTargetTable().getId().asInt());
        String db = insertStmt.getTargetTableName().getDb();
        finalizeParams.setTable_db(db == null ? queryCtx.session.database : db);
        HdfsTable hdfsTable = (HdfsTable) insertStmt.getTargetTable();
        finalizeParams.setHdfs_base_dir(hdfsTable.getHdfsBaseDir());
        finalizeParams.setStaging_dir(
            hdfsTable.getHdfsBaseDir() + "/_impala_insert_staging");
        queryExecRequest.setFinalize_params(finalizeParams);
      }
    }

    timeline.markEvent("Planning finished");
    result.setTimeline(analysisResult.getAnalyzer().getTimeline().toThrift());
    return result;
  }

  private void createExplainRequest(String explainString, TExecRequest result) {
    // update the metadata - one string column
    TColumn colDesc = new TColumn("Explain String", Type.STRING.toThrift());
    TResultSetMetadata metadata = new TResultSetMetadata(Lists.newArrayList(colDesc));
    result.setResult_set_metadata(metadata);

    // create the explain result set - split the explain string into one line per row
    String[] explainStringArray = explainString.split("\n");
    TExplainResult explainResult = new TExplainResult();
    explainResult.results = Lists.newArrayList();
    for (int i = 0; i < explainStringArray.length; ++i) {
      TColumnValue col = new TColumnValue();
      col.setString_val(explainStringArray[i]);
      TResultRow row = new TResultRow(Lists.newArrayList(col));
      explainResult.results.add(row);
    }
    result.setExplain_result(explainResult);
    result.stmt_type = TStmtType.EXPLAIN;
  }

  public TResultSet execHiveServer2MetadataOp(TMetadataOpRequest request)
      throws ImpalaException {
    User user = request.isSetSession() ?
        new User(TSessionStateUtil.getEffectiveUser(request.session)) :
        ImpalaInternalAdminUser.getInstance();
    switch (request.opcode) {
      case GET_TYPE_INFO: return MetadataOp.getTypeInfo();
      case GET_SCHEMAS:
      {
        TGetSchemasReq req = request.getGet_schemas_req();
        return MetadataOp.getSchemas(this, req.getCatalogName(),
            req.getSchemaName(), user);
      }
      case GET_TABLES:
      {
        TGetTablesReq req = request.getGet_tables_req();
        return MetadataOp.getTables(this, req.getCatalogName(),
            req.getSchemaName(), req.getTableName(), req.getTableTypes(), user);
      }
      case GET_COLUMNS:
      {
        TGetColumnsReq req = request.getGet_columns_req();
        return MetadataOp.getColumns(this, req.getCatalogName(),
            req.getSchemaName(), req.getTableName(), req.getColumnName(), user);
      }
      case GET_CATALOGS: return MetadataOp.getCatalogs();
      case GET_TABLE_TYPES: return MetadataOp.getTableTypes();
      case GET_FUNCTIONS:
      {
        TGetFunctionsReq req = request.getGet_functions_req();
        return MetadataOp.getFunctions(this, req.getCatalogName(),
            req.getSchemaName(), req.getFunctionName(), user);
      }
      default:
        throw new NotImplementedException(request.opcode + " has not been implemented.");
    }
  }
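
  // Caller-side sketch for the method above (illustrative only; the opcode enum
  // type name and the thrift-generated setters are assumptions):
  //
  //   TMetadataOpRequest req = new TMetadataOpRequest();
  //   req.opcode = TMetadataOpcode.GET_TABLES;
  //   TGetTablesReq tablesReq = new TGetTablesReq();
  //   tablesReq.setSchemaName("functional");
  //   tablesReq.setTableName("%");       // HiveServer2-style pattern
  //   req.setGet_tables_req(tablesReq);
  //   TResultSet tables = frontend.execHiveServer2MetadataOp(req);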

  public TResultSet getTableFiles(TShowFilesParams request)
      throws ImpalaException {
    Table table = impaladCatalog_.getTable(request.getTable_name().getDb_name(),
        request.getTable_name().getTable_name());
    if (table instanceof HdfsTable) {
      return ((HdfsTable) table).getFiles(request.getPartition_spec());
    } else {
      throw new InternalException("SHOW FILES only supports HDFS tables. " +
          "Unsupported table class: " + table.getClass());
    }
  }
}
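
// Usage sketch (illustrative; not part of the original file). A minimal embedding
// of this Frontend, assuming an auth-disabled AuthorizationConfig can be
// constructed and that the backend supplies a populated TQueryCtx:
//
//   Frontend fe = new Frontend(authDisabledConfig);   // hypothetical config
//   StringBuilder explain = new StringBuilder();
//   TExecRequest execReq = fe.createExecRequest(queryCtx, explain);
//   switch (execReq.stmt_type) {
//     case QUERY: /* hand query_exec_request to the coordinator */ break;
//     case DDL:   /* hand catalog_op_request to the catalog server */ break;
//     default:    break;
//   }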