Impala
Impala is the open source, native analytic database for Apache Hadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
JniFrontend.java
Go to the documentation of this file.
1 // Copyright 2012 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 package com.cloudera.impala.service;
16 
17 import java.io.BufferedReader;
18 import java.io.File;
19 import java.io.IOException;
20 import java.io.InputStreamReader;
21 import java.net.URI;
22 import java.net.URL;
23 import java.net.URLConnection;
24 import java.util.Collections;
25 import java.util.Enumeration;
26 import java.util.List;
27 import java.util.Map;
28 import java.util.Random;
29 import java.util.Set;
30 import java.util.regex.Matcher;
31 import java.util.regex.Pattern;
32 
33 import org.apache.hadoop.conf.Configuration;
34 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
35 import org.apache.hadoop.fs.FileSystem;
36 import org.apache.hadoop.fs.Path;
37 import org.apache.hadoop.hdfs.DFSConfigKeys;
38 import org.apache.hadoop.hdfs.DFSUtil;
39 import org.apache.hadoop.hdfs.DistributedFileSystem;
40 import org.apache.hadoop.hdfs.HAUtil;
41 import org.apache.log4j.Appender;
42 import org.apache.log4j.FileAppender;
43 import org.apache.thrift.TException;
44 import org.apache.thrift.TSerializer;
45 import org.apache.thrift.protocol.TBinaryProtocol;
46 import org.slf4j.Logger;
47 import org.slf4j.LoggerFactory;
48 
60 import com.cloudera.impala.thrift.TCatalogObject;
61 import com.cloudera.impala.thrift.TDescribeTableParams;
62 import com.cloudera.impala.thrift.TDescribeTableResult;
63 import com.cloudera.impala.thrift.TExecRequest;
64 import com.cloudera.impala.thrift.TGetAllHadoopConfigsResponse;
65 import com.cloudera.impala.thrift.TGetDataSrcsParams;
66 import com.cloudera.impala.thrift.TGetDataSrcsResult;
67 import com.cloudera.impala.thrift.TGetDbsParams;
68 import com.cloudera.impala.thrift.TGetDbsResult;
69 import com.cloudera.impala.thrift.TGetFunctionsParams;
70 import com.cloudera.impala.thrift.TGetFunctionsResult;
71 import com.cloudera.impala.thrift.TGetHadoopConfigRequest;
72 import com.cloudera.impala.thrift.TGetHadoopConfigResponse;
73 import com.cloudera.impala.thrift.TGetTablesParams;
74 import com.cloudera.impala.thrift.TGetTablesResult;
75 import com.cloudera.impala.thrift.TLoadDataReq;
76 import com.cloudera.impala.thrift.TLoadDataResp;
77 import com.cloudera.impala.thrift.TLogLevel;
78 import com.cloudera.impala.thrift.TMetadataOpRequest;
79 import com.cloudera.impala.thrift.TQueryCtx;
80 import com.cloudera.impala.thrift.TResultSet;
81 import com.cloudera.impala.thrift.TShowFilesParams;
82 import com.cloudera.impala.thrift.TShowGrantRoleParams;
83 import com.cloudera.impala.thrift.TShowRolesParams;
84 import com.cloudera.impala.thrift.TShowRolesResult;
85 import com.cloudera.impala.thrift.TShowStatsParams;
86 import com.cloudera.impala.thrift.TTableName;
87 import com.cloudera.impala.thrift.TUpdateCatalogCacheRequest;
90 import com.google.common.base.Preconditions;
91 import com.google.common.collect.Lists;
92 import com.google.common.collect.Maps;
93 import com.google.common.collect.Sets;
94 
99 public class JniFrontend {
100  private final static Logger LOG = LoggerFactory.getLogger(JniFrontend.class);
101  private final static TBinaryProtocol.Factory protocolFactory_ =
102  new TBinaryProtocol.Factory();
103  private final Frontend frontend_;
104 
105  // Required minimum value (in milliseconds) for the HDFS config
106  // 'dfs.client.file-block-storage-locations.timeout.millis'
108  10 * 1000;
109 
113  public JniFrontend(boolean lazy, String serverName, String authorizationPolicyFile,
114  String sentryConfigFile, String authPolicyProviderClass, int impalaLogLevel,
115  int otherLogLevel) throws InternalException {
116  GlogAppender.Install(TLogLevel.values()[impalaLogLevel],
117  TLogLevel.values()[otherLogLevel]);
118 
119  // Validate the authorization configuration before initializing the Frontend.
120  // If there are any configuration problems Impala startup will fail.
121  AuthorizationConfig authConfig = new AuthorizationConfig(serverName,
122  authorizationPolicyFile, sentryConfigFile, authPolicyProviderClass);
123  authConfig.validateConfig();
124  if (authConfig.isEnabled()) {
125  LOG.info(String.format("Authorization is 'ENABLED' using %s",
126  authConfig.isFileBasedPolicy() ? " file based policy from: " +
127  authConfig.getPolicyFile() : " using Sentry Policy Service."));
128  } else {
129  LOG.info("Authorization is 'DISABLED'.");
130  }
131  LOG.info(JniUtil.getJavaVersion());
132 
133  frontend_ = new Frontend(authConfig);
134  }
135 
140  public byte[] createExecRequest(byte[] thriftQueryContext)
141  throws ImpalaException {
142  TQueryCtx queryCtx = new TQueryCtx();
143  JniUtil.deserializeThrift(protocolFactory_, queryCtx, thriftQueryContext);
144 
145  StringBuilder explainString = new StringBuilder();
146  TExecRequest result = frontend_.createExecRequest(queryCtx, explainString);
147  if (explainString.length() > 0) LOG.debug(explainString.toString());
148 
149  // TODO: avoid creating serializer for each query?
150  TSerializer serializer = new TSerializer(protocolFactory_);
151  try {
152  return serializer.serialize(result);
153  } catch (TException e) {
154  throw new InternalException(e.getMessage());
155  }
156  }
157 
158  public byte[] updateCatalogCache(byte[] thriftCatalogUpdate) throws ImpalaException {
159  TUpdateCatalogCacheRequest req = new TUpdateCatalogCacheRequest();
160  JniUtil.deserializeThrift(protocolFactory_, req, thriftCatalogUpdate);
161  TSerializer serializer = new TSerializer(protocolFactory_);
162  try {
163  return serializer.serialize(frontend_.updateCatalogCache(req));
164  } catch (TException e) {
165  throw new InternalException(e.getMessage());
166  }
167  }
168 
175  public byte[] loadTableData(byte[] thriftLoadTableDataParams)
176  throws ImpalaException, IOException {
177  TLoadDataReq request = new TLoadDataReq();
178  JniUtil.deserializeThrift(protocolFactory_, request, thriftLoadTableDataParams);
179  TLoadDataResp response = frontend_.loadTableData(request);
180  TSerializer serializer = new TSerializer(protocolFactory_);
181  try {
182  return serializer.serialize(response);
183  } catch (TException e) {
184  throw new InternalException(e.getMessage());
185  }
186  }
187 
192  public String getExplainPlan(byte[] thriftQueryContext) throws ImpalaException {
193  TQueryCtx queryCtx = new TQueryCtx();
194  JniUtil.deserializeThrift(protocolFactory_, queryCtx, thriftQueryContext);
195  String plan = frontend_.getExplainString(queryCtx);
196  LOG.debug("Explain plan: " + plan);
197  return plan;
198  }
199 
200 
207  public byte[] getTableNames(byte[] thriftGetTablesParams) throws ImpalaException {
208  TGetTablesParams params = new TGetTablesParams();
209  JniUtil.deserializeThrift(protocolFactory_, params, thriftGetTablesParams);
210  // If the session was not set it indicates this is an internal Impala call.
211  User user = params.isSetSession() ?
212  new User(TSessionStateUtil.getEffectiveUser(params.getSession())) :
213  ImpalaInternalAdminUser.getInstance();
214 
215  Preconditions.checkState(!params.isSetSession() || user != null );
216  List<String> tables = frontend_.getTableNames(params.db, params.pattern, user);
217 
218  TGetTablesResult result = new TGetTablesResult();
219  result.setTables(tables);
220 
221  TSerializer serializer = new TSerializer(protocolFactory_);
222  try {
223  return serializer.serialize(result);
224  } catch (TException e) {
225  throw new InternalException(e.getMessage());
226  }
227  }
228 
235  public byte[] getTableFiles(byte[] thriftShowFilesParams) throws ImpalaException {
236  TShowFilesParams params = new TShowFilesParams();
237  JniUtil.deserializeThrift(protocolFactory_, params, thriftShowFilesParams);
238  TResultSet result = frontend_.getTableFiles(params);
239 
240  TSerializer serializer = new TSerializer(protocolFactory_);
241  try {
242  return serializer.serialize(result);
243  } catch (TException e) {
244  throw new InternalException(e.getMessage());
245  }
246  }
247 
254  public byte[] getDbNames(byte[] thriftGetTablesParams) throws ImpalaException {
255  TGetDbsParams params = new TGetDbsParams();
256  JniUtil.deserializeThrift(protocolFactory_, params, thriftGetTablesParams);
257  // If the session was not set it indicates this is an internal Impala call.
258  User user = params.isSetSession() ?
259  new User(TSessionStateUtil.getEffectiveUser(params.getSession())) :
260  ImpalaInternalAdminUser.getInstance();
261  List<String> dbs = frontend_.getDbNames(params.pattern, user);
262 
263  TGetDbsResult result = new TGetDbsResult();
264  result.setDbs(dbs);
265 
266  TSerializer serializer = new TSerializer(protocolFactory_);
267  try {
268  return serializer.serialize(result);
269  } catch (TException e) {
270  throw new InternalException(e.getMessage());
271  }
272  }
273 
280  public byte[] getDataSrcMetadata(byte[] thriftParams) throws ImpalaException {
281  TGetDataSrcsParams params = new TGetDataSrcsParams();
282  JniUtil.deserializeThrift(protocolFactory_, params, thriftParams);
283 
284  TGetDataSrcsResult result = new TGetDataSrcsResult();
285  List<DataSource> dataSources = frontend_.getDataSrcs(params.pattern);
286  result.setData_src_names(Lists.<String>newArrayListWithCapacity(dataSources.size()));
287  result.setLocations(Lists.<String>newArrayListWithCapacity(dataSources.size()));
288  result.setClass_names(Lists.<String>newArrayListWithCapacity(dataSources.size()));
289  result.setApi_versions(Lists.<String>newArrayListWithCapacity(dataSources.size()));
290  for (DataSource dataSource: dataSources) {
291  result.addToData_src_names(dataSource.getName());
292  result.addToLocations(dataSource.getLocation());
293  result.addToClass_names(dataSource.getClassName());
294  result.addToApi_versions(dataSource.getApiVersion());
295  }
296  TSerializer serializer = new TSerializer(protocolFactory_);
297  try {
298  return serializer.serialize(result);
299  } catch (TException e) {
300  throw new InternalException(e.getMessage());
301  }
302  }
303 
304  public byte[] getStats(byte[] thriftShowStatsParams) throws ImpalaException {
305  TShowStatsParams params = new TShowStatsParams();
306  JniUtil.deserializeThrift(protocolFactory_, params, thriftShowStatsParams);
307  Preconditions.checkState(params.isSetTable_name());
308  TResultSet result;
309  if (params.isIs_show_col_stats()) {
310  result = frontend_.getColumnStats(params.getTable_name().getDb_name(),
311  params.getTable_name().getTable_name());
312  } else {
313  result = frontend_.getTableStats(params.getTable_name().getDb_name(),
314  params.getTable_name().getTable_name());
315  }
316  TSerializer serializer = new TSerializer(protocolFactory_);
317  try {
318  return serializer.serialize(result);
319  } catch (TException e) {
320  throw new InternalException(e.getMessage());
321  }
322  }
323 
330  public byte[] getFunctions(byte[] thriftGetFunctionsParams) throws ImpalaException {
331  TGetFunctionsParams params = new TGetFunctionsParams();
332  JniUtil.deserializeThrift(protocolFactory_, params, thriftGetFunctionsParams);
333 
334  TGetFunctionsResult result = new TGetFunctionsResult();
335  List<String> signatures = Lists.newArrayList();
336  List<String> retTypes = Lists.newArrayList();
337  List<Function> fns = frontend_.getFunctions(params.category, params.db, params.pattern);
338  for (Function fn: fns) {
339  signatures.add(fn.signatureString());
340  retTypes.add(fn.getReturnType().toString());
341  }
342  result.setFn_signatures(signatures);
343  result.setFn_ret_types(retTypes);
344  TSerializer serializer = new TSerializer(protocolFactory_);
345  try {
346  return serializer.serialize(result);
347  } catch (TException e) {
348  throw new InternalException(e.getMessage());
349  }
350  }
351 
355  public byte[] getCatalogObject(byte[] thriftParams) throws ImpalaException,
356  TException {
357  TCatalogObject objectDescription = new TCatalogObject();
358  JniUtil.deserializeThrift(protocolFactory_, objectDescription, thriftParams);
359  TSerializer serializer = new TSerializer(protocolFactory_);
360  return serializer.serialize(
361  frontend_.getCatalog().getTCatalogObject(objectDescription));
362  }
363 
370  public byte[] describeTable(byte[] thriftDescribeTableParams) throws ImpalaException {
371  TDescribeTableParams params = new TDescribeTableParams();
372  JniUtil.deserializeThrift(protocolFactory_, params, thriftDescribeTableParams);
373 
374  TDescribeTableResult result = frontend_.describeTable(
375  params.getDb(), params.getTable_name(), params.getOutput_style());
376 
377  TSerializer serializer = new TSerializer(protocolFactory_);
378  try {
379  return serializer.serialize(result);
380  } catch (TException e) {
381  throw new InternalException(e.getMessage());
382  }
383  }
384 
388  public String showCreateTable(byte[] thriftTableName)
389  throws ImpalaException {
390  TTableName params = new TTableName();
391  JniUtil.deserializeThrift(protocolFactory_, params, thriftTableName);
392  return ToSqlUtils.getCreateTableSql(frontend_.getCatalog().getTable(
393  params.getDb_name(), params.getTable_name()));
394  }
395 
399  public byte[] getRoles(byte[] showRolesParams) throws ImpalaException {
400  TShowRolesParams params = new TShowRolesParams();
401  JniUtil.deserializeThrift(protocolFactory_, params, showRolesParams);
402  TShowRolesResult result = new TShowRolesResult();
403 
404  List<Role> roles = Lists.newArrayList();
405  if (params.isIs_show_current_roles() || params.isSetGrant_group()) {
406  User user = new User(params.getRequesting_user());
407  Set<String> groupNames;
408  if (params.isIs_show_current_roles()) {
409  groupNames = frontend_.getAuthzChecker().getUserGroups(user);
410  } else {
411  Preconditions.checkState(params.isSetGrant_group());
412  groupNames = Sets.newHashSet(params.getGrant_group());
413  }
414  for (String groupName: groupNames) {
415  roles.addAll(frontend_.getCatalog().getAuthPolicy().getGrantedRoles(groupName));
416  }
417  } else {
418  Preconditions.checkState(!params.isIs_show_current_roles());
419  roles = frontend_.getCatalog().getAuthPolicy().getAllRoles();
420  }
421 
422  result.setRole_names(Lists.<String>newArrayListWithExpectedSize(roles.size()));
423  for (Role role: roles) {
424  result.getRole_names().add(role.getName());
425  }
426 
427  Collections.sort(result.getRole_names());
428  TSerializer serializer = new TSerializer(protocolFactory_);
429  try {
430  return serializer.serialize(result);
431  } catch (TException e) {
432  throw new InternalException(e.getMessage());
433  }
434  }
435 
436  public byte[] getRolePrivileges(byte[] showGrantRolesParams) throws ImpalaException {
437  TShowGrantRoleParams params = new TShowGrantRoleParams();
438  JniUtil.deserializeThrift(protocolFactory_, params, showGrantRolesParams);
439  TResultSet result = frontend_.getCatalog().getAuthPolicy().getRolePrivileges(
440  params.getRole_name(), params.getPrivilege());
441  TSerializer serializer = new TSerializer(protocolFactory_);
442  try {
443  return serializer.serialize(result);
444  } catch (TException e) {
445  throw new InternalException(e.getMessage());
446  }
447  }
448 
452  public byte[] execHiveServer2MetadataOp(byte[] metadataOpsParams)
453  throws ImpalaException {
454  TMetadataOpRequest params = new TMetadataOpRequest();
455  JniUtil.deserializeThrift(protocolFactory_, params, metadataOpsParams);
456  TResultSet result = frontend_.execHiveServer2MetadataOp(params);
457 
458  TSerializer serializer = new TSerializer(protocolFactory_);
459  try {
460  return serializer.serialize(result);
461  } catch (TException e) {
462  throw new InternalException(e.getMessage());
463  }
464  }
465 
466  public void setCatalogInitialized() {
467  frontend_.getCatalog().setIsReady();
468  }
469 
470  // Caching this saves ~50ms per call to getHadoopConfigAsHtml
471  private static final Configuration CONF = new Configuration();
472 
477  public byte[] getAllHadoopConfigs() throws ImpalaException {
478  Map<String, String> configs = Maps.newHashMap();
479  for (Map.Entry<String, String> e: CONF) {
480  configs.put(e.getKey(), e.getValue());
481  }
482  TGetAllHadoopConfigsResponse result = new TGetAllHadoopConfigsResponse();
483  result.setConfigs(configs);
484  TSerializer serializer = new TSerializer(protocolFactory_);
485  try {
486  return serializer.serialize(result);
487  } catch (TException e) {
488  throw new InternalException(e.getMessage());
489  }
490  }
491 
497  public byte[] getHadoopConfig(byte[] serializedRequest) throws ImpalaException {
498  TGetHadoopConfigRequest request = new TGetHadoopConfigRequest();
499  JniUtil.deserializeThrift(protocolFactory_, request, serializedRequest);
500  TGetHadoopConfigResponse result = new TGetHadoopConfigResponse();
501  result.setValue(CONF.get(request.getName()));
502  TSerializer serializer = new TSerializer(protocolFactory_);
503  try {
504  return serializer.serialize(result);
505  } catch (TException e) {
506  throw new InternalException(e.getMessage());
507  }
508  }
509 
510  public class CdhVersion implements Comparable<CdhVersion> {
511  private final int major;
512  private final int minor;
513 
514  public CdhVersion(String versionString) throws IllegalArgumentException {
515  String[] version = versionString.split("\\.");
516  if (version.length != 2) {
517  throw new IllegalArgumentException("Invalid version string:" + versionString);
518  }
519  try {
520  major = Integer.parseInt(version[0]);
521  minor = Integer.parseInt(version[1]);
522  } catch (NumberFormatException e) {
523  throw new IllegalArgumentException("Invalid version string:" + versionString);
524  }
525  }
526 
527  public int compareTo(CdhVersion o) {
528  return (this.major == o.major) ? (this.minor - o.minor) : (this.major - o.major);
529  }
530 
531  @Override
532  public String toString() {
533  return major + "." + minor;
534  }
535  }
536 
543  public String checkConfiguration() {
544  CdhVersion guessedCdhVersion = guessCdhVersionFromNnWebUi();
545  CdhVersion cdh41 = new CdhVersion("4.1");
546  CdhVersion cdh42 = new CdhVersion("4.2");
547  StringBuilder output = new StringBuilder();
548 
549  output.append(checkLogFilePermission());
550  output.append(checkFileSystem(CONF));
551 
552  if (guessedCdhVersion == null) {
553  // Do not run any additional checks because we cannot determine the CDH version
554  LOG.warn("Cannot detect CDH version. Skipping Hadoop configuration checks");
555  return output.toString();
556  }
557 
558  if (guessedCdhVersion.compareTo(cdh41) == 0) {
559  output.append(checkShortCircuitReadCdh41(CONF));
560  } else if (guessedCdhVersion.compareTo(cdh42) >= 0) {
561  output.append(checkShortCircuitRead(CONF));
562  } else {
563  output.append(guessedCdhVersion)
564  .append(" is detected but Impala requires CDH 4.1 or above.");
565  }
566  output.append(checkBlockLocationTracking(CONF));
567 
568  return output.toString();
569  }
570 
575  private String checkLogFilePermission() {
576  org.apache.log4j.Logger l4jRootLogger = org.apache.log4j.Logger.getRootLogger();
577  Enumeration appenders = l4jRootLogger.getAllAppenders();
578  while (appenders.hasMoreElements()) {
579  Appender appender = (Appender) appenders.nextElement();
580  if (appender instanceof FileAppender) {
581  if (((FileAppender) appender).getFile() == null) {
582  // If Impala does not have permission to write to the log file, the
583  // FileAppender will fail to initialize and logFile will be null.
584  // Unfortunately, we can't get the log file name here.
585  return "Impala does not have permission to write to the log file specified " +
586  "in log4j.properties.";
587  }
588  }
589  }
590  return "";
591  }
592 
598  try {
599  // On a large cluster, avoid hitting the name node at the same time
600  Random randomGenerator = new Random();
601  Thread.sleep(randomGenerator.nextInt(2000));
602  } catch (Exception e) {
603  }
604 
605  try {
606  URI nnUri = getCurrentNameNodeAddress();
607  if (nnUri == null) return null;
608  URL nnWebUi = new URL(nnUri.toURL(), "/dfshealth.jsp");
609  URLConnection conn = nnWebUi.openConnection();
610  BufferedReader in = new BufferedReader(
611  new InputStreamReader(conn.getInputStream()));
612  String inputLine;
613  while ((inputLine = in.readLine()) != null) {
614  if (inputLine.contains("Version:")) {
615  // Parse the version string cdh<major>.<minor>
616  Pattern cdhVersionPattern = Pattern.compile("cdh\\d\\.\\d");
617  Matcher versionMatcher = cdhVersionPattern.matcher(inputLine);
618  if (versionMatcher.find()) {
619  // Strip out "cdh" before passing to CdhVersion
620  return new CdhVersion(versionMatcher.group().substring(3));
621  }
622  return null;
623  }
624  }
625  } catch (Exception e) {
626  LOG.info(e.toString());
627  }
628  return null;
629  }
630 
637  private URI getCurrentNameNodeAddress() throws Exception {
638  // get the filesystem object to verify it is an HDFS system
639  FileSystem fs;
640  fs = FileSystem.get(CONF);
641  if (!(fs instanceof DistributedFileSystem)) {
642  LOG.error("FileSystem is " + fs.getUri());
643  return null;
644  }
645  return DFSUtil.getInfoServer(HAUtil.getAddressOfActive(fs), CONF, "http");
646  }
647 
652  private String checkShortCircuitRead(Configuration conf) {
653  StringBuilder output = new StringBuilder();
654  String errorMessage = "ERROR: short-circuit local reads is disabled because\n";
655  String prefix = " - ";
656  StringBuilder errorCause = new StringBuilder();
657 
658  // dfs.domain.socket.path must be set properly
659  String domainSocketPath = conf.getTrimmed(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
660  DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_DEFAULT);
661  if (domainSocketPath.isEmpty()) {
662  errorCause.append(prefix);
663  errorCause.append(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY);
664  errorCause.append(" is not configured.\n");
665  } else {
666  // The socket path parent directory must be readable and executable.
667  File socketFile = new File(domainSocketPath);
668  File socketDir = socketFile.getParentFile();
669  if (socketDir == null || !socketDir.canRead() || !socketDir.canExecute()) {
670  errorCause.append(prefix);
671  errorCause.append("Impala cannot read or execute the parent directory of ");
672  errorCause.append(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY);
673  errorCause.append("\n");
674  }
675  }
676 
677  // dfs.client.read.shortcircuit must be set to true.
678  if (!conf.getBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY,
679  DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_DEFAULT)) {
680  errorCause.append(prefix);
681  errorCause.append(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY);
682  errorCause.append(" is not enabled.\n");
683  }
684 
685  // dfs.client.use.legacy.blockreader.local must be set to false
686  if (conf.getBoolean(DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL,
687  DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL_DEFAULT)) {
688  errorCause.append(prefix);
689  errorCause.append(DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL);
690  errorCause.append(" should not be enabled.\n");
691  }
692 
693  if (errorCause.length() > 0) {
694  output.append(errorMessage);
695  output.append(errorCause);
696  }
697 
698  return output.toString();
699  }
700 
706  private String checkShortCircuitReadCdh41(Configuration conf) {
707  StringBuilder output = new StringBuilder();
708  String errorMessage = "ERROR: short-circuit local reads is disabled because\n";
709  String prefix = " - ";
710  StringBuilder errorCause = new StringBuilder();
711 
712  // Client side checks
713  // dfs.client.read.shortcircuit must be set to true.
714  if (!conf.getBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY,
715  DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_DEFAULT)) {
716  errorCause.append(prefix);
717  errorCause.append(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY);
718  errorCause.append(" is not enabled.\n");
719  }
720 
721  // dfs.client.use.legacy.blockreader.local must be set to true
722  if (!conf.getBoolean(DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL,
723  DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL_DEFAULT)) {
724  errorCause.append(prefix);
725  errorCause.append(DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL);
726  errorCause.append(" is not enabled.\n");
727  }
728 
729  // Server side checks
730  // Check data node server side configuration by reading the CONF from the data node
731  // web UI
732  // TODO: disabled for now
733  //cdh41ShortCircuitReadDatanodeCheck(errorCause, prefix);
734 
735  if (errorCause.length() > 0) {
736  output.append(errorMessage);
737  output.append(errorCause);
738  }
739 
740  return output.toString();
741  }
742 
749  private void cdh41ShortCircuitReadDatanodeCheck(StringBuilder errorCause,
750  String prefix) {
751  String dnWebUiAddr = CONF.get(DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_KEY,
752  DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_DEFAULT);
753  URL dnWebUiUrl = null;
754  try {
755  dnWebUiUrl = new URL("http://" + dnWebUiAddr + "/conf");
756  } catch (Exception e) {
757  LOG.info(e.toString());
758  }
759  Configuration dnConf = new Configuration(false);
760  dnConf.addResource(dnWebUiUrl);
761 
762  // dfs.datanode.data.dir.perm should be at least 750
763  int permissionInt = 0;
764  try {
765  String permission = dnConf.get(DFSConfigKeys.DFS_DATANODE_DATA_DIR_PERMISSION_KEY,
766  DFSConfigKeys.DFS_DATANODE_DATA_DIR_PERMISSION_DEFAULT);
767  permissionInt = Integer.parseInt(permission);
768  } catch (Exception e) {
769  }
770  if (permissionInt < 750) {
771  errorCause.append(prefix);
772  errorCause.append("Data node configuration ");
773  errorCause.append(DFSConfigKeys.DFS_DATANODE_DATA_DIR_PERMISSION_KEY);
774  errorCause.append(" is not properly set. It should be set to 750.\n");
775  }
776 
777  // dfs.block.local-path-access.user should contain the user account impala is running
778  // under
779  String accessUser = dnConf.get(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY);
780  if (accessUser == null || !accessUser.contains(System.getProperty("user.name"))) {
781  errorCause.append(prefix);
782  errorCause.append("Data node configuration ");
783  errorCause.append(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY);
784  errorCause.append(" is not properly set. It should contain ");
785  errorCause.append(System.getProperty("user.name"));
786  errorCause.append("\n");
787  }
788  }
789 
794  private String checkBlockLocationTracking(Configuration conf) {
795  StringBuilder output = new StringBuilder();
796  String errorMessage = "ERROR: block location tracking is not properly enabled " +
797  "because\n";
798  String prefix = " - ";
799  StringBuilder errorCause = new StringBuilder();
800  if (!conf.getBoolean(DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED,
801  DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT)) {
802  errorCause.append(prefix);
803  errorCause.append(DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED);
804  errorCause.append(" is not enabled.\n");
805  }
806 
807  // dfs.client.file-block-storage-locations.timeout.millis should be >= 10 seconds
808  int dfsClientFileBlockStorageLocationsTimeoutMs = conf.getInt(
809  DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT_MS,
810  DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT_MS_DEFAULT);
811  if (dfsClientFileBlockStorageLocationsTimeoutMs <
813  errorCause.append(prefix);
814  errorCause.append(DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT_MS);
815  errorCause.append(" is too low. It should be at least 10 seconds.\n");
816  }
817 
818  if (errorCause.length() > 0) {
819  output.append(errorMessage);
820  output.append(errorCause);
821  }
822 
823  return output.toString();
824  }
825 
831  private String checkFileSystem(Configuration conf) {
832  try {
833  FileSystem fs = FileSystem.get(CONF);
834  if (!(fs instanceof DistributedFileSystem)) {
835  return "Unsupported default filesystem. The default filesystem must be " +
836  "a DistributedFileSystem but the configured default filesystem is " +
837  fs.getClass().getSimpleName() + ". " +
838  CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY +
839  " (" + CONF.get(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY) + ")" +
840  " might be set incorrectly.";
841  }
842  } catch (IOException e) {
843  return "couldn't retrieve FileSystem:\n" + e.getMessage();
844  }
845 
846  try {
847  FileSystemUtil.getTotalNumVisibleFiles(new Path("/"));
848  } catch (IOException e) {
849  return "Could not read the HDFS root directory at " +
850  CONF.get(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY) +
851  ". Error was: \n" + e.getMessage();
852  }
853  return "";
854  }
855 }
byte[] getCatalogObject(byte[] thriftParams)
JniFrontend(boolean lazy, String serverName, String authorizationPolicyFile, String sentryConfigFile, String authPolicyProviderClass, int impalaLogLevel, int otherLogLevel)
static final TBinaryProtocol.Factory protocolFactory_
String showCreateTable(byte[] thriftTableName)
byte[] getRoles(byte[] showRolesParams)
byte[] execHiveServer2MetadataOp(byte[] metadataOpsParams)
byte[] getDataSrcMetadata(byte[] thriftParams)
byte[] getRolePrivileges(byte[] showGrantRolesParams)
String checkFileSystem(Configuration conf)
byte[] getFunctions(byte[] thriftGetFunctionsParams)
static final long MIN_DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT_MS
byte[] createExecRequest(byte[] thriftQueryContext)
byte[] updateCatalogCache(byte[] thriftCatalogUpdate)
byte[] describeTable(byte[] thriftDescribeTableParams)
byte[] getHadoopConfig(byte[] serializedRequest)
byte[] getStats(byte[] thriftShowStatsParams)
String checkBlockLocationTracking(Configuration conf)
String checkShortCircuitRead(Configuration conf)
byte[] getTableFiles(byte[] thriftShowFilesParams)
static final Configuration CONF
byte[] loadTableData(byte[] thriftLoadTableDataParams)
String getExplainPlan(byte[] thriftQueryContext)
byte[] getTableNames(byte[] thriftGetTablesParams)
String checkShortCircuitReadCdh41(Configuration conf)
byte[] getDbNames(byte[] thriftGetTablesParams)
static String getEffectiveUser(TSessionState session)
void cdh41ShortCircuitReadDatanodeCheck(StringBuilder errorCause, String prefix)