Impala
Impala is the open source, native analytic database for Apache Hadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
RequestPoolService.java
Go to the documentation of this file.
1 // Copyright 2014 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 package com.cloudera.impala.util;
16 
17 import java.io.File;
18 import java.io.IOException;
19 import java.net.MalformedURLException;
20 import java.net.URL;
21 import java.util.concurrent.atomic.AtomicBoolean;
22 import java.util.concurrent.atomic.AtomicReference;
23 
24 import org.apache.hadoop.conf.Configuration;
25 import org.apache.hadoop.security.UserGroupInformation;
26 import org.apache.hadoop.yarn.api.records.QueueACL;
27 import org.apache.hadoop.yarn.conf.YarnConfiguration;
28 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.AllocationConfiguration;
29 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.AllocationFileLoaderService;
30 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration;
31 import org.apache.thrift.TException;
32 import org.apache.thrift.TSerializer;
33 import org.apache.thrift.protocol.TBinaryProtocol;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
36 
42 import com.cloudera.impala.thrift.TErrorCode;
43 import com.cloudera.impala.thrift.TPoolConfigParams;
44 import com.cloudera.impala.thrift.TPoolConfigResult;
45 import com.cloudera.impala.thrift.TResolveRequestPoolParams;
46 import com.cloudera.impala.thrift.TResolveRequestPoolResult;
47 import com.cloudera.impala.thrift.TStatus;
49 import com.google.common.annotations.VisibleForTesting;
50 import com.google.common.base.Preconditions;
51 import com.google.common.base.Strings;
52 import com.google.common.collect.Lists;
53 
75 public class RequestPoolService {
76  final static Logger LOG = LoggerFactory.getLogger(RequestPoolService.class);
77 
78  private final static TBinaryProtocol.Factory protocolFactory_ =
79  new TBinaryProtocol.Factory();
80  // Used to ensure start() has been called before any other methods can be used.
81  private final AtomicBoolean running_;
82 
83  // Key for the default maximum number of running queries ("placed reservations")
84  // property. The per-pool key name is this key with the pool name appended, e.g.
85  // "{key}.{pool}".
86  final static String LLAMA_MAX_PLACED_RESERVATIONS_KEY =
87  "llama.am.throttling.maximum.placed.reservations";
88 
89  // Default value for the maximum.placed.reservations property. Note that this value
90  // differs from the current Llama default of 10000 which is too high.
91  final static int LLAMA_MAX_PLACED_RESERVATIONS_DEFAULT = 200;
92 
93  // Key for the default maximum number of queued requests ("queued reservations")
94  // property. The per-pool key name is this key with the pool name appended, e.g.
95  // "{key}.{pool}".
96  final static String LLAMA_MAX_QUEUED_RESERVATIONS_KEY =
97  "llama.am.throttling.maximum.queued.reservations";
98 
99  // Default value for the maximum.queued.reservations property. Note that this value
100  // differs from the current Llama default of 0 which disables queuing.
102 
103  // String format for a per-pool configuration key. First parameter is the key for the
104  // default, e.g. LLAMA_MAX_PLACED_RESERVATIONS_KEY, and the second parameter is the
105  // pool name.
106  final static String LLAMA_PER_POOL_CONFIG_KEY_FORMAT = "%s.%s";
107 
108  // Watches for changes to the fair scheduler allocation file.
109  @VisibleForTesting
110  final AllocationFileLoaderService allocLoader_;
111 
112  // Provides access to the fair scheduler allocation file. An AtomicReference because it
113  // is reset when the allocation configuration file changes and other threads access it.
114  private final AtomicReference<AllocationConfiguration> allocationConf_;
115 
116  // Watches the Llama configuration file for changes.
117  @VisibleForTesting
119 
120  // Used by this class to access to the configs provided by the Llama configuration.
121  // This is replaced when the Llama configuration file changes.
122  private volatile Configuration llamaConf_;
123 
124  // URL of the Llama configuration file.
125  private final URL llamaConfUrl_;
126 
133  private final class LlamaConfWatcher implements FileChangeListener {
134  public void onFileChange() {
135  // If llamaConfUrl_ is null the watcher should not have been created.
136  Preconditions.checkNotNull(llamaConfUrl_);
137  LOG.info("Loading Llama configuration: " + llamaConfUrl_.getFile());
138  Configuration conf = new Configuration();
139  conf.addResource(llamaConfUrl_);
140  llamaConf_ = conf;
141  }
142  }
143 
151  public RequestPoolService(final String fsAllocationPath, final String llamaSitePath) {
152  Preconditions.checkNotNull(fsAllocationPath);
153  running_ = new AtomicBoolean(false);
154  allocationConf_ = new AtomicReference<AllocationConfiguration>();
155  URL fsAllocationURL = getURL(fsAllocationPath);
156  if (fsAllocationURL == null) {
157  throw new IllegalArgumentException(
158  "Unable to find allocation configuration file: " + fsAllocationPath);
159  }
160  Configuration allocConf = new Configuration(false);
161  allocConf.set(FairSchedulerConfiguration.ALLOCATION_FILE, fsAllocationURL.getPath());
162  allocLoader_ = new AllocationFileLoaderService();
163  allocLoader_.init(allocConf);
164 
165  if (!Strings.isNullOrEmpty(llamaSitePath)) {
166  llamaConfUrl_ = getURL(llamaSitePath);
167  if (llamaConfUrl_ == null) {
168  throw new IllegalArgumentException(
169  "Unable to find Llama configuration file: " + llamaSitePath);
170  }
171  llamaConf_ = new Configuration(false);
172  llamaConf_.addResource(llamaConfUrl_);
173  llamaConfWatcher_ = new FileWatchService(new File(llamaConfUrl_.getPath()),
174  new LlamaConfWatcher());
175  } else {
176  llamaConfWatcher_ = null;
177  llamaConfUrl_ = null;
178  }
179  }
180 
184  @VisibleForTesting
185  static URL getURL(String path) {
186  Preconditions.checkNotNull(path);
187  File file = new File(path);
188  file = file.getAbsoluteFile();
189  if (!file.exists()) {
190  LOG.error("Unable to find specified file: " + path);
191  return null;
192  }
193  try {
194  return file.toURI().toURL();
195  } catch (MalformedURLException ex) {
196  LOG.error("Unable to construct URL for file: " + path, ex);
197  return null;
198  }
199  }
200 
205  public void start() {
206  Preconditions.checkState(!running_.get());
207  allocLoader_.setReloadListener(new AllocationFileLoaderService.Listener() {
208  @Override
209  public void onReload(AllocationConfiguration info) {
210  allocationConf_.set(info);
211  }
212  });
213  allocLoader_.start();
214  try {
215  allocLoader_.reloadAllocations();
216  } catch (Exception ex) {
217  try {
218  stopInternal();
219  } catch (Exception stopEx) {
220  LOG.error("Unable to stop AllocationFileLoaderService after failed start.",
221  stopEx);
222  }
223  throw new RuntimeException(ex);
224  }
225  if (llamaConfWatcher_ != null) llamaConfWatcher_.start();
226  running_.set(true);
227  }
228 
232  public void stop() {
233  Preconditions.checkState(running_.get());
234  stopInternal();
235  }
236 
242  private void stopInternal() {
243  running_.set(false);
244  if (llamaConfWatcher_ != null) llamaConfWatcher_.stop();
245  allocLoader_.stop();
246  }
247 
255  public byte[] resolveRequestPool(byte[] thriftResolvePoolParams)
256  throws ImpalaException {
257  TResolveRequestPoolParams resolvePoolParams = new TResolveRequestPoolParams();
258  JniUtil.deserializeThrift(protocolFactory_, resolvePoolParams,
259  thriftResolvePoolParams);
260  TResolveRequestPoolResult result = resolveRequestPool(resolvePoolParams);
261  LOG.trace("resolveRequestPool(pool={}, user={}): resolved_pool={}, has_access={}",
262  new Object[] { resolvePoolParams.getRequested_pool(), resolvePoolParams.getUser(),
263  result.resolved_pool, result.has_access });
264  try {
265  return new TSerializer(protocolFactory_).serialize(result);
266  } catch (TException e) {
267  throw new InternalException(e.getMessage());
268  }
269  }
270 
271  @VisibleForTesting
272  TResolveRequestPoolResult resolveRequestPool(
273  TResolveRequestPoolParams resolvePoolParams) {
274  String requestedPool = resolvePoolParams.getRequested_pool();
275  String user = resolvePoolParams.getUser();
276  TResolveRequestPoolResult result = new TResolveRequestPoolResult();
277  String errorMessage = null;
278  String pool = null;
279  try {
280  pool = assignToPool(requestedPool, user);
281  } catch (IOException ex) {
282  errorMessage = ex.getMessage();
283  if (errorMessage.startsWith("No groups found for user")) {
284  // The error thrown when using the 'primaryGroup' or 'secondaryGroup' rules and
285  // the user does not exist are not helpful.
286  errorMessage = String.format(
287  "Failed to resolve user '%s' to a pool while evaluating the " +
288  "'primaryGroup' or 'secondaryGroup' queue placement rules because no " +
289  "groups were found for the user. This is likely because the user does not " +
290  "exist on the local operating system.", resolvePoolParams.getUser());
291  }
292  LOG.warn(String.format("Error assigning to pool. requested='%s', user='%s', msg=%s",
293  requestedPool, user, errorMessage), ex);
294  }
295  if (pool == null) {
296  if (errorMessage == null) {
297  // This occurs when assignToPool returns null (not an error), i.e. if the pool
298  // cannot be resolved according to the policy.
299  result.setStatus(new TStatus(TErrorCode.OK, Lists.<String>newArrayList()));
300  } else {
301  // If Yarn throws an exception, return an error status.
302  result.setStatus(
303  new TStatus(TErrorCode.INTERNAL_ERROR, Lists.newArrayList(errorMessage)));
304  }
305  } else {
306  result.setResolved_pool(pool);
307  result.setHas_access(hasAccess(pool, user));
308  result.setStatus(new TStatus(TErrorCode.OK, Lists.<String>newArrayList()));
309  }
310  return result;
311  }
312 
319  public byte[] getPoolConfig(byte[] thriftPoolConfigParams) throws ImpalaException {
320  Preconditions.checkState(running_.get());
321  TPoolConfigParams poolConfigParams = new TPoolConfigParams();
322  JniUtil.deserializeThrift(protocolFactory_, poolConfigParams,
323  thriftPoolConfigParams);
324  TPoolConfigResult result = getPoolConfig(poolConfigParams.getPool());
325  try {
326  return new TSerializer(protocolFactory_).serialize(result);
327  } catch (TException e) {
328  throw new InternalException(e.getMessage());
329  }
330  }
331 
332  @VisibleForTesting
333  TPoolConfigResult getPoolConfig(String pool) {
334  TPoolConfigResult result = new TPoolConfigResult();
335  int maxMemoryMb = allocationConf_.get().getMaxResources(pool).getMemory();
336  result.setMem_limit(
337  maxMemoryMb == Integer.MAX_VALUE ? -1 : (long) maxMemoryMb * ByteUnits.MEGABYTE);
338  if (llamaConf_ == null) {
339  result.setMax_requests(LLAMA_MAX_PLACED_RESERVATIONS_DEFAULT);
340  result.setMax_queued(LLAMA_MAX_QUEUED_RESERVATIONS_DEFAULT);
341  } else {
342  // Capture the current llamaConf_ in case it changes while we're using it.
343  Configuration currentLlamaConf = llamaConf_;
344  result.setMax_requests(getLlamaPoolConfigValue(currentLlamaConf, pool,
347  result.setMax_queued(getLlamaPoolConfigValue(currentLlamaConf, pool,
350  }
351  LOG.trace("getPoolConfig(pool={}): mem_limit={}, max_requests={}, max_queued={}",
352  new Object[] { pool, result.mem_limit, result.max_requests, result.max_queued });
353  return result;
354  }
355 
363  private int getLlamaPoolConfigValue(Configuration conf, String pool, String key,
364  int defaultValue) {
365  return conf.getInt(String.format(LLAMA_PER_POOL_CONFIG_KEY_FORMAT, key, pool),
366  conf.getInt(key, defaultValue));
367  }
368 
378  @VisibleForTesting
379  String assignToPool(String requestedPool, String user)
380  throws IOException {
381  Preconditions.checkState(running_.get());
382  Preconditions.checkNotNull(requestedPool);
383  Preconditions.checkArgument(!Strings.isNullOrEmpty(user));
384  // Convert the user name to a short name (e.g. 'user1@domain' to 'user1') because
385  // assignAppToQueue() will check group membership which should always be done on
386  // the short name of the principal.
387  String shortName = new User(user).getShortName();
388  return allocationConf_.get().getPlacementPolicy().assignAppToQueue(
389  requestedPool.isEmpty() ? YarnConfiguration.DEFAULT_QUEUE_NAME : requestedPool,
390  shortName);
391  }
392 
401  @VisibleForTesting
402  boolean hasAccess(String pool, String user) {
403  Preconditions.checkState(running_.get());
404  Preconditions.checkArgument(!Strings.isNullOrEmpty(pool));
405  Preconditions.checkArgument(!Strings.isNullOrEmpty(user));
406  // Convert the user name to a short name (e.g. 'user1@domain' to 'user1') because
407  // the UserGroupInformation will check group membership which should always be done
408  // on the short name of the principal.
409  String shortName = new User(user).getShortName();
410  UserGroupInformation ugi = UserGroupInformation.createRemoteUser(shortName);
411  return allocationConf_.get().hasAccess(pool, QueueACL.SUBMIT_APPLICATIONS, ugi);
412  }
413 }
final AllocationFileLoaderService allocLoader_
string path("/usr/lib/sasl2:/usr/lib64/sasl2:/usr/local/lib/sasl2:/usr/lib/x86_64-linux-gnu/sasl2")
byte[] resolveRequestPool(byte[] thriftResolvePoolParams)
TResolveRequestPoolResult resolveRequestPool(TResolveRequestPoolParams resolvePoolParams)
int getLlamaPoolConfigValue(Configuration conf, String pool, String key, int defaultValue)
RequestPoolService(final String fsAllocationPath, final String llamaSitePath)
byte[] getPoolConfig(byte[] thriftPoolConfigParams)
final AtomicReference< AllocationConfiguration > allocationConf_
boolean hasAccess(String pool, String user)
ObjectPool pool
TPoolConfigResult getPoolConfig(String pool)
String assignToPool(String requestedPool, String user)
static final TBinaryProtocol.Factory protocolFactory_