15 package com.cloudera.impala.util;
18 import java.io.IOException;
19 import java.net.MalformedURLException;
21 import java.util.concurrent.atomic.AtomicBoolean;
22 import java.util.concurrent.atomic.AtomicReference;
24 import org.apache.hadoop.conf.Configuration;
25 import org.apache.hadoop.security.UserGroupInformation;
26 import org.apache.hadoop.yarn.api.records.QueueACL;
27 import org.apache.hadoop.yarn.conf.YarnConfiguration;
28 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.AllocationConfiguration;
29 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.AllocationFileLoaderService;
30 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration;
31 import org.apache.thrift.TException;
32 import org.apache.thrift.TSerializer;
33 import org.apache.thrift.protocol.TBinaryProtocol;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
42 import com.cloudera.impala.thrift.TErrorCode;
43 import com.cloudera.impala.thrift.TPoolConfigParams;
44 import com.cloudera.impala.thrift.TPoolConfigResult;
45 import com.cloudera.impala.thrift.TResolveRequestPoolParams;
46 import com.cloudera.impala.thrift.TResolveRequestPoolResult;
47 import com.cloudera.impala.thrift.TStatus;
49 import com.google.common.annotations.VisibleForTesting;
50 import com.google.common.base.Preconditions;
51 import com.google.common.base.Strings;
52 import com.google.common.collect.Lists;
76 final static Logger
LOG = LoggerFactory.getLogger(RequestPoolService.class);
79 new TBinaryProtocol.Factory();
87 "llama.am.throttling.maximum.placed.reservations";
97 "llama.am.throttling.maximum.queued.reservations";
137 LOG.info(
"Loading Llama configuration: " + llamaConfUrl_.getFile());
138 Configuration conf =
new Configuration();
152 Preconditions.checkNotNull(fsAllocationPath);
153 running_ =
new AtomicBoolean(
false);
155 URL fsAllocationURL =
getURL(fsAllocationPath);
156 if (fsAllocationURL == null) {
157 throw new IllegalArgumentException(
158 "Unable to find allocation configuration file: " + fsAllocationPath);
160 Configuration allocConf =
new Configuration(
false);
161 allocConf.set(FairSchedulerConfiguration.ALLOCATION_FILE, fsAllocationURL.getPath());
163 allocLoader_.init(allocConf);
165 if (!Strings.isNullOrEmpty(llamaSitePath)) {
168 throw new IllegalArgumentException(
169 "Unable to find Llama configuration file: " + llamaSitePath);
186 Preconditions.checkNotNull(
path);
187 File file =
new File(path);
188 file = file.getAbsoluteFile();
189 if (!file.exists()) {
190 LOG.error(
"Unable to find specified file: " +
path);
194 return file.toURI().toURL();
195 }
catch (MalformedURLException ex) {
196 LOG.error(
"Unable to construct URL for file: " +
path, ex);
206 Preconditions.checkState(!running_.get());
207 allocLoader_.setReloadListener(
new AllocationFileLoaderService.Listener() {
209 public void onReload(AllocationConfiguration info) {
210 allocationConf_.set(info);
213 allocLoader_.start();
215 allocLoader_.reloadAllocations();
216 }
catch (Exception ex) {
219 }
catch (Exception stopEx) {
220 LOG.error(
"Unable to stop AllocationFileLoaderService after failed start.",
223 throw new RuntimeException(ex);
233 Preconditions.checkState(running_.get());
257 TResolveRequestPoolParams resolvePoolParams =
new TResolveRequestPoolParams();
259 thriftResolvePoolParams);
261 LOG.trace(
"resolveRequestPool(pool={}, user={}): resolved_pool={}, has_access={}",
262 new Object[] { resolvePoolParams.getRequested_pool(), resolvePoolParams.getUser(),
263 result.resolved_pool, result.has_access });
265 return new TSerializer(protocolFactory_).serialize(result);
266 }
catch (TException e) {
273 TResolveRequestPoolParams resolvePoolParams) {
274 String requestedPool = resolvePoolParams.getRequested_pool();
275 String user = resolvePoolParams.getUser();
276 TResolveRequestPoolResult result =
new TResolveRequestPoolResult();
277 String errorMessage = null;
281 }
catch (IOException ex) {
282 errorMessage = ex.getMessage();
283 if (errorMessage.startsWith(
"No groups found for user")) {
286 errorMessage = String.format(
287 "Failed to resolve user '%s' to a pool while evaluating the " +
288 "'primaryGroup' or 'secondaryGroup' queue placement rules because no " +
289 "groups were found for the user. This is likely because the user does not " +
290 "exist on the local operating system.", resolvePoolParams.getUser());
292 LOG.warn(String.format(
"Error assigning to pool. requested='%s', user='%s', msg=%s",
293 requestedPool, user, errorMessage), ex);
296 if (errorMessage == null) {
299 result.setStatus(
new TStatus(TErrorCode.OK, Lists.<String>newArrayList()));
303 new TStatus(TErrorCode.INTERNAL_ERROR, Lists.newArrayList(errorMessage)));
306 result.setResolved_pool(
pool);
307 result.setHas_access(
hasAccess(pool, user));
308 result.setStatus(
new TStatus(TErrorCode.OK, Lists.<String>newArrayList()));
320 Preconditions.checkState(running_.get());
321 TPoolConfigParams poolConfigParams =
new TPoolConfigParams();
323 thriftPoolConfigParams);
324 TPoolConfigResult result =
getPoolConfig(poolConfigParams.getPool());
326 return new TSerializer(protocolFactory_).serialize(result);
327 }
catch (TException e) {
334 TPoolConfigResult result =
new TPoolConfigResult();
335 int maxMemoryMb = allocationConf_.get().getMaxResources(pool).getMemory();
337 maxMemoryMb == Integer.MAX_VALUE ? -1 : (long) maxMemoryMb *
ByteUnits.
MEGABYTE);
351 LOG.trace(
"getPoolConfig(pool={}): mem_limit={}, max_requests={}, max_queued={}",
352 new Object[] {
pool, result.mem_limit, result.max_requests, result.max_queued });
366 conf.getInt(key, defaultValue));
381 Preconditions.checkState(running_.get());
382 Preconditions.checkNotNull(requestedPool);
383 Preconditions.checkArgument(!Strings.isNullOrEmpty(user));
387 String shortName =
new User(user).getShortName();
388 return allocationConf_.get().getPlacementPolicy().assignAppToQueue(
389 requestedPool.isEmpty() ? YarnConfiguration.DEFAULT_QUEUE_NAME : requestedPool,
403 Preconditions.checkState(running_.get());
404 Preconditions.checkArgument(!Strings.isNullOrEmpty(
pool));
405 Preconditions.checkArgument(!Strings.isNullOrEmpty(user));
409 String shortName =
new User(user).getShortName();
410 UserGroupInformation ugi = UserGroupInformation.createRemoteUser(shortName);
411 return allocationConf_.get().
hasAccess(pool, QueueACL.SUBMIT_APPLICATIONS, ugi);
final AllocationFileLoaderService allocLoader_
static final int LLAMA_MAX_QUEUED_RESERVATIONS_DEFAULT
static final String LLAMA_PER_POOL_CONFIG_KEY_FORMAT
string path("/usr/lib/sasl2:/usr/lib64/sasl2:/usr/local/lib/sasl2:/usr/lib/x86_64-linux-gnu/sasl2")
byte[] resolveRequestPool(byte[] thriftResolvePoolParams)
TResolveRequestPoolResult resolveRequestPool(TResolveRequestPoolParams resolvePoolParams)
int getLlamaPoolConfigValue(Configuration conf, String pool, String key, int defaultValue)
RequestPoolService(final String fsAllocationPath, final String llamaSitePath)
byte[] getPoolConfig(byte[] thriftPoolConfigParams)
final AtomicBoolean running_
static final String LLAMA_MAX_PLACED_RESERVATIONS_KEY
static URL getURL(String path)
final AtomicReference< AllocationConfiguration > allocationConf_
boolean hasAccess(String pool, String user)
static final long MEGABYTE
volatile Configuration llamaConf_
final FileWatchService llamaConfWatcher_
static final int LLAMA_MAX_PLACED_RESERVATIONS_DEFAULT
TPoolConfigResult getPoolConfig(String pool)
String assignToPool(String requestedPool, String user)
static final String LLAMA_MAX_QUEUED_RESERVATIONS_KEY
static final TBinaryProtocol.Factory protocolFactory_