15 package com.cloudera.impala.service;
17 import java.io.BufferedReader;
19 import java.io.IOException;
20 import java.io.InputStreamReader;
23 import java.net.URLConnection;
24 import java.util.Collections;
25 import java.util.Enumeration;
26 import java.util.List;
28 import java.util.Random;
30 import java.util.regex.Matcher;
31 import java.util.regex.Pattern;
33 import org.apache.hadoop.conf.Configuration;
34 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
35 import org.apache.hadoop.fs.FileSystem;
36 import org.apache.hadoop.fs.Path;
37 import org.apache.hadoop.hdfs.DFSConfigKeys;
38 import org.apache.hadoop.hdfs.DFSUtil;
39 import org.apache.hadoop.hdfs.DistributedFileSystem;
40 import org.apache.hadoop.hdfs.HAUtil;
41 import org.apache.log4j.Appender;
42 import org.apache.log4j.FileAppender;
43 import org.apache.thrift.TException;
44 import org.apache.thrift.TSerializer;
45 import org.apache.thrift.protocol.TBinaryProtocol;
46 import org.slf4j.Logger;
47 import org.slf4j.LoggerFactory;
60 import com.cloudera.impala.thrift.TCatalogObject;
61 import com.cloudera.impala.thrift.TDescribeTableParams;
62 import com.cloudera.impala.thrift.TDescribeTableResult;
63 import com.cloudera.impala.thrift.TExecRequest;
64 import com.cloudera.impala.thrift.TGetAllHadoopConfigsResponse;
65 import com.cloudera.impala.thrift.TGetDataSrcsParams;
66 import com.cloudera.impala.thrift.TGetDataSrcsResult;
67 import com.cloudera.impala.thrift.TGetDbsParams;
68 import com.cloudera.impala.thrift.TGetDbsResult;
69 import com.cloudera.impala.thrift.TGetFunctionsParams;
70 import com.cloudera.impala.thrift.TGetFunctionsResult;
71 import com.cloudera.impala.thrift.TGetHadoopConfigRequest;
72 import com.cloudera.impala.thrift.TGetHadoopConfigResponse;
73 import com.cloudera.impala.thrift.TGetTablesParams;
74 import com.cloudera.impala.thrift.TGetTablesResult;
75 import com.cloudera.impala.thrift.TLoadDataReq;
76 import com.cloudera.impala.thrift.TLoadDataResp;
77 import com.cloudera.impala.thrift.TLogLevel;
78 import com.cloudera.impala.thrift.TMetadataOpRequest;
79 import com.cloudera.impala.thrift.TQueryCtx;
80 import com.cloudera.impala.thrift.TResultSet;
81 import com.cloudera.impala.thrift.TShowFilesParams;
82 import com.cloudera.impala.thrift.TShowGrantRoleParams;
83 import com.cloudera.impala.thrift.TShowRolesParams;
84 import com.cloudera.impala.thrift.TShowRolesResult;
85 import com.cloudera.impala.thrift.TShowStatsParams;
86 import com.cloudera.impala.thrift.TTableName;
87 import com.cloudera.impala.thrift.TUpdateCatalogCacheRequest;
90 import com.google.common.base.Preconditions;
91 import com.google.common.collect.Lists;
92 import com.google.common.collect.Maps;
93 import com.google.common.collect.Sets;
100 private final static Logger
LOG = LoggerFactory.getLogger(JniFrontend.class);
102 new TBinaryProtocol.Factory();
113 public JniFrontend(
boolean lazy, String serverName, String authorizationPolicyFile,
114 String sentryConfigFile, String authPolicyProviderClass,
int impalaLogLevel,
116 GlogAppender.Install(TLogLevel.values()[impalaLogLevel],
117 TLogLevel.values()[otherLogLevel]);
122 authorizationPolicyFile, sentryConfigFile, authPolicyProviderClass);
123 authConfig.validateConfig();
124 if (authConfig.isEnabled()) {
125 LOG.info(String.format(
"Authorization is 'ENABLED' using %s",
126 authConfig.isFileBasedPolicy() ?
" file based policy from: " +
127 authConfig.getPolicyFile() :
" using Sentry Policy Service."));
129 LOG.info(
"Authorization is 'DISABLED'.");
131 LOG.info(JniUtil.getJavaVersion());
142 TQueryCtx queryCtx =
new TQueryCtx();
145 StringBuilder explainString =
new StringBuilder();
146 TExecRequest result = frontend_.createExecRequest(queryCtx, explainString);
147 if (explainString.length() > 0)
LOG.debug(explainString.toString());
150 TSerializer serializer =
new TSerializer(protocolFactory_);
152 return serializer.serialize(result);
153 }
catch (TException e) {
159 TUpdateCatalogCacheRequest req =
new TUpdateCatalogCacheRequest();
161 TSerializer serializer =
new TSerializer(protocolFactory_);
163 return serializer.serialize(frontend_.updateCatalogCache(req));
164 }
catch (TException e) {
177 TLoadDataReq request =
new TLoadDataReq();
178 JniUtil.deserializeThrift(
protocolFactory_, request, thriftLoadTableDataParams);
179 TLoadDataResp response = frontend_.loadTableData(request);
180 TSerializer serializer =
new TSerializer(protocolFactory_);
182 return serializer.serialize(response);
183 }
catch (TException e) {
193 TQueryCtx queryCtx =
new TQueryCtx();
195 String plan = frontend_.getExplainString(queryCtx);
196 LOG.debug(
"Explain plan: " + plan);
208 TGetTablesParams params =
new TGetTablesParams();
211 User user = params.isSetSession() ?
213 ImpalaInternalAdminUser.getInstance();
215 Preconditions.checkState(!params.isSetSession() || user != null );
216 List<String> tables = frontend_.getTableNames(params.db, params.pattern, user);
218 TGetTablesResult result =
new TGetTablesResult();
219 result.setTables(tables);
221 TSerializer serializer =
new TSerializer(protocolFactory_);
223 return serializer.serialize(result);
224 }
catch (TException e) {
236 TShowFilesParams params =
new TShowFilesParams();
238 TResultSet result = frontend_.getTableFiles(params);
240 TSerializer serializer =
new TSerializer(protocolFactory_);
242 return serializer.serialize(result);
243 }
catch (TException e) {
255 TGetDbsParams params =
new TGetDbsParams();
258 User user = params.isSetSession() ?
260 ImpalaInternalAdminUser.getInstance();
261 List<String> dbs = frontend_.getDbNames(params.pattern, user);
263 TGetDbsResult result =
new TGetDbsResult();
266 TSerializer serializer =
new TSerializer(protocolFactory_);
268 return serializer.serialize(result);
269 }
catch (TException e) {
281 TGetDataSrcsParams params =
new TGetDataSrcsParams();
284 TGetDataSrcsResult result =
new TGetDataSrcsResult();
285 List<DataSource> dataSources = frontend_.getDataSrcs(params.pattern);
286 result.setData_src_names(Lists.<String>newArrayListWithCapacity(dataSources.size()));
287 result.setLocations(Lists.<String>newArrayListWithCapacity(dataSources.size()));
288 result.setClass_names(Lists.<String>newArrayListWithCapacity(dataSources.size()));
289 result.setApi_versions(Lists.<String>newArrayListWithCapacity(dataSources.size()));
291 result.addToData_src_names(dataSource.getName());
292 result.addToLocations(dataSource.getLocation());
293 result.addToClass_names(dataSource.getClassName());
294 result.addToApi_versions(dataSource.getApiVersion());
296 TSerializer serializer =
new TSerializer(protocolFactory_);
298 return serializer.serialize(result);
299 }
catch (TException e) {
305 TShowStatsParams params =
new TShowStatsParams();
307 Preconditions.checkState(params.isSetTable_name());
309 if (params.isIs_show_col_stats()) {
310 result = frontend_.getColumnStats(params.getTable_name().getDb_name(),
311 params.getTable_name().getTable_name());
313 result = frontend_.getTableStats(params.getTable_name().getDb_name(),
314 params.getTable_name().getTable_name());
316 TSerializer serializer =
new TSerializer(protocolFactory_);
318 return serializer.serialize(result);
319 }
catch (TException e) {
331 TGetFunctionsParams params =
new TGetFunctionsParams();
332 JniUtil.deserializeThrift(
protocolFactory_, params, thriftGetFunctionsParams);
334 TGetFunctionsResult result =
new TGetFunctionsResult();
335 List<String> signatures = Lists.newArrayList();
336 List<String> retTypes = Lists.newArrayList();
337 List<Function> fns = frontend_.getFunctions(params.category, params.db, params.pattern);
339 signatures.add(fn.signatureString());
340 retTypes.add(fn.getReturnType().toString());
342 result.setFn_signatures(signatures);
343 result.setFn_ret_types(retTypes);
344 TSerializer serializer =
new TSerializer(protocolFactory_);
346 return serializer.serialize(result);
347 }
catch (TException e) {
357 TCatalogObject objectDescription =
new TCatalogObject();
358 JniUtil.deserializeThrift(
protocolFactory_, objectDescription, thriftParams);
359 TSerializer serializer =
new TSerializer(protocolFactory_);
360 return serializer.serialize(
361 frontend_.getCatalog().getTCatalogObject(objectDescription));
371 TDescribeTableParams params =
new TDescribeTableParams();
372 JniUtil.deserializeThrift(
protocolFactory_, params, thriftDescribeTableParams);
374 TDescribeTableResult result = frontend_.describeTable(
375 params.getDb(), params.getTable_name(), params.getOutput_style());
377 TSerializer serializer =
new TSerializer(protocolFactory_);
379 return serializer.serialize(result);
380 }
catch (TException e) {
390 TTableName params =
new TTableName();
392 return ToSqlUtils.getCreateTableSql(frontend_.getCatalog().getTable(
393 params.getDb_name(), params.getTable_name()));
400 TShowRolesParams params =
new TShowRolesParams();
402 TShowRolesResult result =
new TShowRolesResult();
404 List<Role> roles = Lists.newArrayList();
405 if (params.isIs_show_current_roles() || params.isSetGrant_group()) {
406 User user =
new User(params.getRequesting_user());
407 Set<String> groupNames;
408 if (params.isIs_show_current_roles()) {
409 groupNames = frontend_.getAuthzChecker().getUserGroups(user);
411 Preconditions.checkState(params.isSetGrant_group());
412 groupNames = Sets.newHashSet(params.getGrant_group());
414 for (String groupName: groupNames) {
415 roles.addAll(frontend_.getCatalog().getAuthPolicy().getGrantedRoles(groupName));
418 Preconditions.checkState(!params.isIs_show_current_roles());
419 roles = frontend_.getCatalog().getAuthPolicy().getAllRoles();
422 result.setRole_names(Lists.<String>newArrayListWithExpectedSize(roles.size()));
423 for (
Role role: roles) {
424 result.getRole_names().add(role.getName());
427 Collections.sort(result.getRole_names());
428 TSerializer serializer =
new TSerializer(protocolFactory_);
430 return serializer.serialize(result);
431 }
catch (TException e) {
437 TShowGrantRoleParams params =
new TShowGrantRoleParams();
439 TResultSet result = frontend_.getCatalog().getAuthPolicy().getRolePrivileges(
440 params.getRole_name(), params.getPrivilege());
441 TSerializer serializer =
new TSerializer(protocolFactory_);
443 return serializer.serialize(result);
444 }
catch (TException e) {
454 TMetadataOpRequest params =
new TMetadataOpRequest();
456 TResultSet result = frontend_.execHiveServer2MetadataOp(params);
458 TSerializer serializer =
new TSerializer(protocolFactory_);
460 return serializer.serialize(result);
461 }
catch (TException e) {
467 frontend_.getCatalog().setIsReady();
471 private static final Configuration
CONF =
new Configuration();
478 Map<String, String> configs = Maps.newHashMap();
479 for (Map.Entry<String, String> e:
CONF) {
480 configs.put(e.getKey(), e.getValue());
482 TGetAllHadoopConfigsResponse result =
new TGetAllHadoopConfigsResponse();
483 result.setConfigs(configs);
486 return serializer.serialize(result);
487 }
catch (TException e) {
498 TGetHadoopConfigRequest request =
new TGetHadoopConfigRequest();
500 TGetHadoopConfigResponse result =
new TGetHadoopConfigResponse();
501 result.setValue(CONF.get(request.getName()));
502 TSerializer serializer =
new TSerializer(protocolFactory_);
504 return serializer.serialize(result);
505 }
catch (TException e) {
/**
 * Parses a "major.minor" version string (e.g. "4.1") into the major/minor
 * fields of this CdhVersion.
 *
 * @throws IllegalArgumentException if the string is not exactly two
 *         dot-separated integer components
 */
public CdhVersion(String versionString) throws IllegalArgumentException {
  String[] version = versionString.split("\\.");
  if (version.length != 2) {
    throw new IllegalArgumentException("Invalid version string:" + versionString);
  }
  try {
    major = Integer.parseInt(version[0]);
    minor = Integer.parseInt(version[1]);
  } catch (NumberFormatException e) {
    // Re-throw with a caller-friendly message; the original cause is implied
    // by the message content.
    throw new IllegalArgumentException("Invalid version string:" + versionString);
  }
}
547 StringBuilder output =
new StringBuilder();
552 if (guessedCdhVersion == null) {
554 LOG.warn(
"Cannot detect CDH version. Skipping Hadoop configuration checks");
555 return output.toString();
558 if (guessedCdhVersion.
compareTo(cdh41) == 0) {
560 }
else if (guessedCdhVersion.
compareTo(cdh42) >= 0) {
563 output.append(guessedCdhVersion)
564 .append(
" is detected but Impala requires CDH 4.1 or above.");
568 return output.toString();
576 org.apache.log4j.Logger l4jRootLogger = org.apache.log4j.Logger.getRootLogger();
577 Enumeration appenders = l4jRootLogger.getAllAppenders();
578 while (appenders.hasMoreElements()) {
579 Appender appender = (Appender) appenders.nextElement();
580 if (appender instanceof FileAppender) {
581 if (((FileAppender) appender).getFile() == null) {
585 return "Impala does not have permission to write to the log file specified " +
586 "in log4j.properties.";
600 Random randomGenerator =
new Random();
601 Thread.sleep(randomGenerator.nextInt(2000));
602 }
catch (Exception e) {
607 if (nnUri == null)
return null;
608 URL nnWebUi =
new URL(nnUri.toURL(),
"/dfshealth.jsp");
609 URLConnection conn = nnWebUi.openConnection();
610 BufferedReader in =
new BufferedReader(
611 new InputStreamReader(conn.getInputStream()));
613 while ((inputLine = in.readLine()) != null) {
614 if (inputLine.contains(
"Version:")) {
616 Pattern cdhVersionPattern = Pattern.compile(
"cdh\\d\\.\\d");
617 Matcher versionMatcher = cdhVersionPattern.matcher(inputLine);
618 if (versionMatcher.find()) {
620 return new CdhVersion(versionMatcher.group().substring(3));
625 }
catch (Exception e) {
626 LOG.info(e.toString());
640 fs = FileSystem.get(
CONF);
641 if (!(fs instanceof DistributedFileSystem)) {
642 LOG.error(
"FileSystem is " + fs.getUri());
645 return DFSUtil.getInfoServer(HAUtil.getAddressOfActive(fs),
CONF,
"http");
653 StringBuilder output =
new StringBuilder();
654 String errorMessage =
"ERROR: short-circuit local reads is disabled because\n";
655 String prefix =
" - ";
656 StringBuilder errorCause =
new StringBuilder();
659 String domainSocketPath = conf.getTrimmed(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
660 DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_DEFAULT);
661 if (domainSocketPath.isEmpty()) {
662 errorCause.append(prefix);
663 errorCause.append(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY);
664 errorCause.append(
" is not configured.\n");
667 File socketFile =
new File(domainSocketPath);
668 File socketDir = socketFile.getParentFile();
669 if (socketDir == null || !socketDir.canRead() || !socketDir.canExecute()) {
670 errorCause.append(prefix);
671 errorCause.append(
"Impala cannot read or execute the parent directory of ");
672 errorCause.append(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY);
673 errorCause.append(
"\n");
678 if (!conf.getBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY,
679 DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_DEFAULT)) {
680 errorCause.append(prefix);
681 errorCause.append(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY);
682 errorCause.append(
" is not enabled.\n");
686 if (conf.getBoolean(DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL,
687 DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL_DEFAULT)) {
688 errorCause.append(prefix);
689 errorCause.append(DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL);
690 errorCause.append(
" should not be enabled.\n");
693 if (errorCause.length() > 0) {
694 output.append(errorMessage);
695 output.append(errorCause);
698 return output.toString();
707 StringBuilder output =
new StringBuilder();
708 String errorMessage =
"ERROR: short-circuit local reads is disabled because\n";
709 String prefix =
" - ";
710 StringBuilder errorCause =
new StringBuilder();
714 if (!conf.getBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY,
715 DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_DEFAULT)) {
716 errorCause.append(prefix);
717 errorCause.append(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY);
718 errorCause.append(
" is not enabled.\n");
722 if (!conf.getBoolean(DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL,
723 DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL_DEFAULT)) {
724 errorCause.append(prefix);
725 errorCause.append(DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL);
726 errorCause.append(
" is not enabled.\n");
735 if (errorCause.length() > 0) {
736 output.append(errorMessage);
737 output.append(errorCause);
740 return output.toString();
751 String dnWebUiAddr = CONF.get(DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_KEY,
752 DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_DEFAULT);
753 URL dnWebUiUrl = null;
755 dnWebUiUrl =
new URL(
"http://" + dnWebUiAddr +
"/conf");
756 }
catch (Exception e) {
757 LOG.info(e.toString());
759 Configuration dnConf =
new Configuration(
false);
760 dnConf.addResource(dnWebUiUrl);
763 int permissionInt = 0;
765 String permission = dnConf.get(DFSConfigKeys.DFS_DATANODE_DATA_DIR_PERMISSION_KEY,
766 DFSConfigKeys.DFS_DATANODE_DATA_DIR_PERMISSION_DEFAULT);
767 permissionInt = Integer.parseInt(permission);
768 }
catch (Exception e) {
770 if (permissionInt < 750) {
771 errorCause.append(prefix);
772 errorCause.append(
"Data node configuration ");
773 errorCause.append(DFSConfigKeys.DFS_DATANODE_DATA_DIR_PERMISSION_KEY);
774 errorCause.append(
" is not properly set. It should be set to 750.\n");
779 String accessUser = dnConf.get(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY);
780 if (accessUser == null || !accessUser.contains(System.getProperty(
"user.name"))) {
781 errorCause.append(prefix);
782 errorCause.append(
"Data node configuration ");
783 errorCause.append(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY);
784 errorCause.append(
" is not properly set. It should contain ");
785 errorCause.append(System.getProperty(
"user.name"));
786 errorCause.append(
"\n");
795 StringBuilder output =
new StringBuilder();
796 String errorMessage =
"ERROR: block location tracking is not properly enabled " +
798 String prefix =
" - ";
799 StringBuilder errorCause =
new StringBuilder();
800 if (!conf.getBoolean(DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED,
801 DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT)) {
802 errorCause.append(prefix);
803 errorCause.append(DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED);
804 errorCause.append(
" is not enabled.\n");
808 int dfsClientFileBlockStorageLocationsTimeoutMs = conf.getInt(
809 DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT_MS,
810 DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT_MS_DEFAULT);
811 if (dfsClientFileBlockStorageLocationsTimeoutMs <
813 errorCause.append(prefix);
814 errorCause.append(DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT_MS);
815 errorCause.append(
" is too low. It should be at least 10 seconds.\n");
818 if (errorCause.length() > 0) {
819 output.append(errorMessage);
820 output.append(errorCause);
823 return output.toString();
833 FileSystem fs = FileSystem.get(
CONF);
834 if (!(fs instanceof DistributedFileSystem)) {
835 return "Unsupported default filesystem. The default filesystem must be " +
836 "a DistributedFileSystem but the configured default filesystem is " +
837 fs.getClass().getSimpleName() +
". " +
838 CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY +
839 " (" + CONF.get(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY) +
")" +
840 " might be set incorrectly.";
842 }
catch (IOException e) {
843 return "couldn't retrieve FileSystem:\n" + e.getMessage();
847 FileSystemUtil.getTotalNumVisibleFiles(
new Path(
"/"));
848 }
catch (IOException e) {
849 return "Could not read the HDFS root directory at " +
850 CONF.get(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY) +
851 ". Error was: \n" + e.getMessage();
String checkLogFilePermission()
int compareTo(CdhVersion o)
String checkConfiguration()
byte[] getCatalogObject(byte[] thriftParams)
JniFrontend(boolean lazy, String serverName, String authorizationPolicyFile, String sentryConfigFile, String authPolicyProviderClass, int impalaLogLevel, int otherLogLevel)
static final TBinaryProtocol.Factory protocolFactory_
URI getCurrentNameNodeAddress()
String showCreateTable(byte[] thriftTableName)
byte[] getRoles(byte[] showRolesParams)
byte[] execHiveServer2MetadataOp(byte[] metadataOpsParams)
byte[] getDataSrcMetadata(byte[] thriftParams)
byte[] getRolePrivileges(byte[] showGrantRolesParams)
String checkFileSystem(Configuration conf)
CdhVersion(String versionString)
byte[] getFunctions(byte[] thriftGetFunctionsParams)
static final long MIN_DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT_MS
byte[] getAllHadoopConfigs()
byte[] createExecRequest(byte[] thriftQueryContext)
byte[] updateCatalogCache(byte[] thriftCatalogUpdate)
byte[] describeTable(byte[] thriftDescribeTableParams)
byte[] getHadoopConfig(byte[] serializedRequest)
byte[] getStats(byte[] thriftShowStatsParams)
String checkBlockLocationTracking(Configuration conf)
String checkShortCircuitRead(Configuration conf)
byte[] getTableFiles(byte[] thriftShowFilesParams)
void setCatalogInitialized()
static final Configuration CONF
byte[] loadTableData(byte[] thriftLoadTableDataParams)
CdhVersion guessCdhVersionFromNnWebUi()
String getExplainPlan(byte[] thriftQueryContext)
byte[] getTableNames(byte[] thriftGetTablesParams)
String checkShortCircuitReadCdh41(Configuration conf)
byte[] getDbNames(byte[] thriftGetTablesParams)
static String getEffectiveUser(TSessionState session)
void cdh41ShortCircuitReadDatanodeCheck(StringBuilder errorCause, String prefix)