Impala
Impalaistheopensource,nativeanalyticdatabaseforApacheHadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
FileWatchService.java
Go to the documentation of this file.
1 // Copyright 2014 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 package com.cloudera.impala.util;
16 
17 import java.io.File;
18 import java.util.concurrent.Executors;
19 import java.util.concurrent.ScheduledExecutorService;
20 import java.util.concurrent.ScheduledFuture;
21 import java.util.concurrent.TimeUnit;
22 import java.util.concurrent.atomic.AtomicBoolean;
23 
24 import org.slf4j.Logger;
25 import org.slf4j.LoggerFactory;
26 
27 import com.google.common.annotations.VisibleForTesting;
28 import com.google.common.base.Preconditions;
29 import com.google.common.util.concurrent.ThreadFactoryBuilder;
30 
36 public class FileWatchService {
37  final static Logger LOG = LoggerFactory.getLogger(FileWatchService.class);
38 
39  // Default time to wait between checking the file.
40  static final long DEFAULT_CHECK_INTERVAL_MS = 10 * 1000;
41 
42  // Time between checking for changes. Mutable for unit tests.
44 
45  // Future returned by scheduleAtFixedRate(), needed to stop the checking thread.
46  private ScheduledFuture<?> fileCheckFuture_;
47 
48  private final AtomicBoolean running_;
49  private final FileChangeListener changeListener_; // Used to notify when changes occur.
50  private final File file_; // The file to check for changes.
51  private boolean alreadyWarned_; // Avoid repeatedly warning if the file is missing
52  private long prevChange_; // Time of the last observed change
53 
57  public interface FileChangeListener {
58 
62  void onFileChange();
63  }
64 
65  public FileWatchService(File file, FileChangeListener listener) {
66  Preconditions.checkNotNull(file);
67  Preconditions.checkNotNull(listener);
68  Preconditions.checkArgument(file.exists());
69  running_ = new AtomicBoolean(false);
70  file_ = file;
71  changeListener_ = listener;
72  prevChange_ = 0L;
73  alreadyWarned_ = false;
74  }
75 
80  @VisibleForTesting
81  public void setCheckIntervalMs(long checkIntervalMs) {
82  checkIntervalMs_ = checkIntervalMs;
83  }
84 
89  private void checkFile() {
90  if (file_.exists()) {
91  long lastChange = file_.lastModified();
92  if (lastChange > prevChange_) {
93  changeListener_.onFileChange();
94  prevChange_ = lastChange;
95  alreadyWarned_ = false;
96  }
97  } else {
98  if (!alreadyWarned_) {
99  LOG.warn("File does not exist: {}", file_.getPath());
100  alreadyWarned_ = true;
101  }
102  }
103  }
104 
109  public synchronized void start() {
110  Preconditions.checkState(!running_.get());
111  running_.set(true);
112 
113  ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(
114  new ThreadFactoryBuilder()
115  .setDaemon(true)
116  .setNameFormat("FileWatchThread(" + file_.getPath() + ")-%d")
117  .build());
118  fileCheckFuture_ = executor.scheduleAtFixedRate(new Runnable() {
119  public void run() {
120  try {
121  checkFile();
122  } catch (SecurityException e) {
123  LOG.warn("Not allowed to check read file existence: " + file_.getPath(), e);
124  }
125  }
126  }, 0L, checkIntervalMs_, TimeUnit.MILLISECONDS);
127  }
128 
132  public synchronized void stop() {
133  Preconditions.checkState(running_.get());
134  running_.set(false);
135  fileCheckFuture_.cancel(false);
136  }
137 }
FileWatchService(File file, FileChangeListener listener)
void setCheckIntervalMs(long checkIntervalMs)