下面列出了 java.nio.file.WatchEvent 类的 API 用法示例代码;您也可以点击链接前往 GitHub 查看完整源代码。
/**
 * Maps a native file-action code onto the matching standard watch event kind.
 *
 * @param action the native action code to translate
 * @return {@code ENTRY_MODIFY}, {@code ENTRY_CREATE} or {@code ENTRY_DELETE} for the
 *         known action codes, or {@code null} when the action is not recognized
 */
private WatchEvent.Kind<?> translateActionToEvent(int action) {
    if (action == FILE_ACTION_MODIFIED) {
        return StandardWatchEventKinds.ENTRY_MODIFY;
    }
    // An added entry and the new name of a renamed entry both surface as a creation.
    if (action == FILE_ACTION_ADDED || action == FILE_ACTION_RENAMED_NEW_NAME) {
        return StandardWatchEventKinds.ENTRY_CREATE;
    }
    // A removed entry and the old name of a renamed entry both surface as a deletion.
    if (action == FILE_ACTION_REMOVED || action == FILE_ACTION_RENAMED_OLD_NAME) {
        return StandardWatchEventKinds.ENTRY_DELETE;
    }
    // action not recognized
    return null;
}
/**
 * Handles a single file system event: when it is a create event, the created path
 * (resolved against the event's log directory) is passed on for evaluation and
 * registration together with the pod uid extracted from the log directory.
 * Events of any other kind are ignored.
 *
 * @param event the file system event to process
 * @throws Exception if extracting the pod uid or the registration step fails
 */
public void processFileSystemEvent(FileSystemEvent event) throws Exception {
    Path logDirectory = event.logDir();
    String podUid = extractPodUidFromPath(logDirectory,
        SingerSettings.getSingerConfig().getKubeConfig().getPodLogDirectory());

    WatchEvent.Kind<?> kind = event.event().kind();
    Path relativePath = (Path) event.event().context();
    // WatchService reports paths relative to the watched directory, so resolve them.
    Path resolvedPath = event.logDir().resolve(relativePath);

    if (!kind.equals(StandardWatchEventKinds.ENTRY_CREATE)) {
        return;
    }
    evaluateAndRegisterLogStreamOrWatcher(resolvedPath, podUid);
}
/**
 * Builds a predicate that accepts a {@link WatchEvent} when it concerns the given file.
 * <p>
 * Every event is accepted when {@code file} is a directory, and OVERFLOW events are
 * always accepted. Otherwise the event's context must be a {@link Path} which, placed
 * under the base path of {@code file}, has the same absolute path as {@code file}.
 *
 * @param file
 *            the file to restrict events to
 * @return predicate
 */
private final static Func1<WatchEvent<?>, Boolean> onlyRelatedTo(final File file) {
    return new Func1<WatchEvent<?>, Boolean>() {
        @Override
        public Boolean call(WatchEvent<?> event) {
            if (file.isDirectory()) {
                return true;
            }
            if (StandardWatchEventKinds.OVERFLOW.equals(event.kind())) {
                return true;
            }
            Object context = event.context();
            // instanceof is false for null, so no separate null check is needed
            if (!(context instanceof Path)) {
                return false;
            }
            Path base = getBasePath(file);
            File candidate = new File(base.toFile(), context.toString());
            return candidate.getAbsolutePath().equals(file.getAbsolutePath());
        }
    };
}
/**
 * Dispatches a watch event to the matching listener method.
 *
 * @param folder the watched folder, used to normalize the event's context name
 * @param event  the event to dispatch
 * @param l      listener notified of written, deleted and created files
 */
private void callback(final Local folder, final WatchEvent<?> event, final FileWatcherListener l) {
    final WatchEvent.Kind<?> kind = event.kind();
    if(log.isInfoEnabled()) {
        log.info(String.format("Process file system event %s for %s", kind.name(), event.context()));
    }
    if(ENTRY_MODIFY == kind) {
        l.fileWritten(this.normalize(folder, event.context().toString()));
        return;
    }
    if(ENTRY_DELETE == kind) {
        l.fileDeleted(this.normalize(folder, event.context().toString()));
        return;
    }
    if(ENTRY_CREATE == kind) {
        l.fileCreated(this.normalize(folder, event.context().toString()));
        return;
    }
    // Any other kind is only logged.
    log.debug(String.format("Ignored file system event %s for %s", kind.name(), event.context()));
}
/**
* Creates a new instance.
*
* @param fileToWatch path of the file to watch
* @param callback the callback to invoke when a modification to the {@code fileToWatch} is detected
* @param loopContinuously whether to keep continue to look for file modification after one iteration. This option
* is useful for testing only.
* @throws InvalidPathException if {@code fileToWatch} is invalid
* @throws FileNotFoundException when a file at specified path {@code fileToWatch} does not exist
* @throws NullPointerException if either {@code fileToWatch} or {@code callback} is null
*/
@VisibleForTesting
FileModificationEventWatcher(@NonNull Path fileToWatch, @NonNull Consumer<WatchEvent<?>> callback,
boolean loopContinuously, boolean checkForFileExistence)
throws FileNotFoundException {
// Set the name for this object/thread for identification purposes.
super("pravega-file-watcher-" + THREAD_NUM.incrementAndGet());
Exceptions.checkNotNullOrEmpty(fileToWatch.toString(), "fileToWatch");
if (checkForFileExistence && !fileToWatch.toFile().exists()) {
throw new FileNotFoundException(String.format("File [%s] does not exist.", fileToWatch));
}
this.watchedFilePath = fileToWatch;
this.callback = callback;
this.loopContinuously = loopContinuously;
setUncaughtExceptionHandler(uncaughtExceptionalHandler);
}
/**
 * Stores the supplied values in the corresponding fields of this key and returns
 * the key itself so the call can be chained.
 *
 * @param handle            value stored in {@code this.handle}
 *                          (presumably the native directory handle; confirm against caller)
 * @param events            the event kinds stored in {@code this.events}
 * @param watchSubtree      value stored in {@code this.watchSubtree}
 * @param buffer            the native buffer stored in {@code this.buffer}
 * @param countAddress      value stored in {@code this.countAddress}
 * @param overlappedAddress value stored in {@code this.overlappedAddress}
 * @param completionKey     value stored in {@code this.completionKey}
 * @return this key
 */
WindowsWatchKey init(long handle,
Set<? extends WatchEvent.Kind<?>> events,
boolean watchSubtree,
NativeBuffer buffer,
long countAddress,
long overlappedAddress,
int completionKey)
{
this.handle = handle;
this.events = events;
this.watchSubtree = watchSubtree;
this.buffer = buffer;
this.countAddress = countAddress;
this.overlappedAddress = overlappedAddress;
this.completionKey = completionKey;
return this;
}
/**
 * Watches {@code WATCH_DIR} for created, deleted and modified entries and prints
 * one line per event to standard output.
 * <p>
 * The loop ends once the watch key can no longer be reset (for example because the
 * watched directory is no longer accessible); the watch service is closed on exit.
 *
 * @param args unused
 * @throws InterruptedException if interrupted while waiting for the next key
 * @throws IOException          if the watch service cannot be created, registered or closed
 */
public static void main(final String[] args) throws InterruptedException, IOException {
    try (WatchService watchService = FileSystems.getDefault().newWatchService()) {
        Paths.get(WATCH_DIR).register(
            watchService,
            StandardWatchEventKinds.ENTRY_CREATE,
            StandardWatchEventKinds.ENTRY_DELETE,
            StandardWatchEventKinds.ENTRY_MODIFY);
        for (;;) {
            final WatchKey key = watchService.take();
            for (final WatchEvent<?> event : key.pollEvents()) {
                final WatchEvent.Kind<?> kind = event.kind();
                // TODO: an overflow means events may have been lost; currently just skipped
                if (kind == StandardWatchEventKinds.OVERFLOW) {
                    continue;
                }
                System.out.format("%s: %s\n", kind.name(), cast(event).context());
            }
            // An invalid key will never be queued again, so waiting on take() would block forever.
            if (!key.reset()) {
                break;
            }
        }
    }
}
/**
 * Resolves the entry named by a watch event against the directory registered for
 * its key.
 *
 * @param key   the key the event was delivered for
 * @param event the event whose context is the file name of the affected entry
 * @return the resolved path, or {@code null} (after logging a warning) when no
 *         directory is registered for {@code key}
 */
private Path resolvePath(WatchKey key, WatchEvent<?> event) {
    WatchEvent<Path> pathEvent = cast(event);
    // Context for directory entry event is the file name of entry.
    Path entryName = pathEvent.context();

    final Path sourceRoot;
    final Path parentDirectory;
    synchronized (this) {
        sourceRoot = keyToService.get(key).getSourcePath();
        parentDirectory = registeredKeys.get(key);
    }

    if (parentDirectory == null) {
        logger.warn(
            "Detected invalid WatchEvent '{}' and key '{}' for entry '{}' in not registered file or directory of '{}'",
            event, key, entryName, sourceRoot);
        return null;
    }
    // The entry name is relative to the directory that was registered with the watch service.
    return parentDirectory.resolve(entryName);
}
/**
 * Waits for the next signalled watch key and processes every event queued on it.
 * <p>
 * An OVERFLOW event only sets the {@code overflowed} flag. For any other event on a
 * known key, the affected file is looked up and, if it counts as modified, its
 * record is notified. The key is reset afterwards.
 *
 * @throws InterruptedException if interrupted while waiting
 */
@SuppressWarnings("unchecked")
private void processModifiedFiles() throws InterruptedException {
    WatchKey signalled = watcher.take();
    for (WatchEvent<?> rawEvent : signalled.pollEvents()) {
        if (rawEvent.kind() == StandardWatchEventKinds.OVERFLOW) {
            overflowed = true;
            continue;
        }
        if (!watchKeys.containsKey(signalled)) {
            continue;
        }
        WatchEvent<Path> pathEvent = (WatchEvent<Path>) rawEvent;
        // <watched directory><separator><entry name>
        String filename = watchKeys.get(signalled).toString()
            + File.separator
            + pathEvent.context().toString();
        File changed = new File(filename);
        if (fileIsModified(changed)) {
            fileRecords.get(changed).onEvent();
        }
    }
    signalled.reset();
}
/**
 * Routes a directory entry event to the handler for its kind.
 *
 * @param dir   the watched directory the event was reported for
 * @param event the event; its context is the file name of the affected entry
 * @throws AssertionError if the event kind is not modify, create or delete
 */
private void processEvent(Path dir, WatchEvent<Path> event) {
    // Context for directory entry event is the file name of entry
    Path child = dir.resolve(event.context());
    Kind<Path> kind = event.kind();

    if (kind == ENTRY_MODIFY) {
        handleModification(child, externalInitiator);
        return;
    }
    if (kind == ENTRY_CREATE) {
        handleCreation(child, externalInitiator);
        return;
    }
    if (kind == ENTRY_DELETE) {
        model.delete(child, externalInitiator);
        return;
    }
    throw new AssertionError("unreachable code");
}
/**
 * Resolves the entry named by a watch event against the directory registered for
 * its key.
 *
 * @param key   the key the event was delivered for
 * @param event the event whose context is the file name of the affected entry
 * @return the resolved path, or {@code null} (after logging a warning) when no
 *         directory is registered for {@code key}
 */
private Path resolvePath(WatchKey key, WatchEvent<?> event) {
    WatchEvent<Path> pathEvent = cast(event);
    // Context for directory entry event is the file name of entry.
    Path entryName = pathEvent.context();

    final Path sourceRoot;
    final Path parentDirectory;
    synchronized (this) {
        sourceRoot = keyToService.get(key).getSourcePath();
        parentDirectory = registeredKeys.get(key);
    }

    if (parentDirectory == null) {
        logger.warn(
            "Detected invalid WatchEvent '{}' and key '{}' for entry '{}' in not registered file or directory of '{}'",
            event, key, entryName, sourceRoot);
        return null;
    }
    // The entry name is relative to the directory that was registered with the watch service.
    return parentDirectory.resolve(entryName);
}
/**
 * Stores the supplied values in the corresponding fields of this key and returns
 * the key itself so the call can be chained.
 *
 * @param handle            value stored in {@code this.handle}
 *                          (presumably the native directory handle; confirm against caller)
 * @param events            the event kinds stored in {@code this.events}
 * @param watchSubtree      value stored in {@code this.watchSubtree}
 * @param buffer            the native buffer stored in {@code this.buffer}
 * @param countAddress      value stored in {@code this.countAddress}
 * @param overlappedAddress value stored in {@code this.overlappedAddress}
 * @param completionKey     value stored in {@code this.completionKey}
 * @return this key
 */
WindowsWatchKey init(long handle,
Set<? extends WatchEvent.Kind<?>> events,
boolean watchSubtree,
NativeBuffer buffer,
long countAddress,
long overlappedAddress,
int completionKey)
{
this.handle = handle;
this.events = events;
this.watchSubtree = watchSubtree;
this.buffer = buffer;
this.countAddress = countAddress;
this.overlappedAddress = overlappedAddress;
this.completionKey = completionKey;
return this;
}
/**
 * Posts ten more events than the key's queue can hold and verifies that polling
 * yields a full queue of ENTRY_CREATE events followed by a single OVERFLOW event
 * whose count equals the number of surplus events.
 */
@Test
public void testOverflow() throws IOException {
    final int capacity = AbstractWatchService.Key.MAX_QUEUE_SIZE;
    final int surplus = 10;

    AbstractWatchService.Key key =
        watcher.register(new StubWatchable(), ImmutableSet.of(ENTRY_CREATE));
    for (int posted = 0; posted < capacity + surplus; posted++) {
        key.post(new AbstractWatchService.Event<>(ENTRY_CREATE, 1, null));
    }
    key.signal();

    List<WatchEvent<?>> events = key.pollEvents();
    assertThat(events).hasSize(capacity + 1);
    for (WatchEvent<?> queued : events.subList(0, capacity)) {
        assertThat(queued.kind()).isEqualTo(ENTRY_CREATE);
    }

    WatchEvent<?> lastEvent = events.get(capacity);
    assertThat(lastEvent.kind()).isEqualTo(OVERFLOW);
    assertThat(lastEvent.count()).isEqualTo(surplus);
}
/**
 * Watches the plugin directory for modified configuration files and re-initializes
 * the plugin that belongs to each modified file.
 * <p>
 * Does nothing when no watch service could be obtained. The loop stops on the first
 * failure; if that failure is an interruption, the thread's interrupt status is
 * restored so the owner of the thread can still observe it.
 */
@Override
public void run() {
    WatchService watchService = WatchServiceUtil.watchModify(pluginDir);
    if (watchService == null) {
        return;
    }
    while (true) {
        try {
            WatchKey key = watchService.take();
            for (WatchEvent<?> watchEvent : key.pollEvents()) {
                if (watchEvent.kind() != ENTRY_MODIFY) {
                    continue;
                }
                String fileName = watchEvent.context() == null ? "" : watchEvent.context().toString();
                Plugin plugin = PluginLibraryHelper.getPluginByConfigFileName(fileName);
                if (plugin != null) {
                    plugin.init(PluginLibraryHelper.getPluginConfig(plugin));
                    log.info("已完成插件{}的配置重新加载",plugin.pluginName());
                }
            }
            key.reset();
        } catch (InterruptedException e) {
            // take() clears the interrupt flag when it throws; set it again before leaving.
            Thread.currentThread().interrupt();
            log.error("插件配置文件监听异常",e);
            break;
        } catch (Exception e) {
            log.error("插件配置文件监听异常",e);
            break;
        }
    }
}
/**
 * Waits for watch keys and synchronizes the trusted certificates for every
 * non-OVERFLOW event delivered on the trusted key.
 * <p>
 * The loop ends when a key can no longer be reset or when the thread is
 * interrupted. On interruption the interrupt status is restored before returning.
 */
@Override
public void run() {
    while (true) {
        try {
            WatchKey key = watchService.take();
            if (key == trustedKey) {
                for (WatchEvent<?> watchEvent : key.pollEvents()) {
                    WatchEvent.Kind<?> kind = watchEvent.kind();
                    if (kind != StandardWatchEventKinds.OVERFLOW) {
                        synchronizeTrustedCertificates();
                    }
                }
            }
            if (!key.reset()) {
                break;
            }
        } catch (InterruptedException e) {
            logger.error("Watcher interrupted.", e);
            // Continuing here would block in take() again and make the thread
            // impossible to stop via interruption; restore the flag and exit.
            Thread.currentThread().interrupt();
            break;
        }
    }
}
/**
 * Registers this object with the given watch service for the given event kinds.
 *
 * @param watcher the watch service; must be an {@code AbstractWatchService}
 * @param events  the event kinds to register for
 * @return the key representing the registration
 * @throws IllegalArgumentException if {@code watcher} is not an {@code AbstractWatchService}
 * @throws IOException              if the registration fails
 */
@Override
public WatchKey register(WatchService watcher, WatchEvent.Kind<?>... events) throws IOException {
    checkNotNull(watcher);
    checkNotNull(events);
    if (watcher instanceof AbstractWatchService) {
        return ((AbstractWatchService) watcher).register(this, Arrays.asList(events));
    }
    throw new IllegalArgumentException(
        "watcher (" + watcher + ") is not associated with this file system");
}
/**
 * Fetches one event from the event fetcher and reacts to it:
 * <ul>
 * <li>create event for something that is not a regular file and whose name does not
 * start with ".": the name is recorded as a new pod and its watchers are updated;</li>
 * <li>overflow event: a warning is logged and pod names are re-synchronized from the
 * file system;</li>
 * <li>delete event: the name is logged unless it starts with ".".</li>
 * </ul>
 * All other events are ignored.
 *
 * @throws InterruptedException
 */
public void checkAndProcessFsEvents() throws InterruptedException {
    // process events from fsEventFetcher
    FileSystemEvent event = fsEventFetcher.getEvent();
    WatchEvent.Kind<?> kind = event.event().kind();
    Path file = (Path) event.event().context();

    // should be NO use case for FS Modify
    if (kind.equals(StandardWatchEventKinds.ENTRY_CREATE)) {
        if (file.toFile().isFile()) {
            // ignore all events that are not directory create events
            return;
        }
        String podName = file.toFile().getName();
        if (podName.startsWith(".")) {
            // ignore tombstone files
            return;
        }
        LOG.info("New pod directory discovered by FSM:" + event.logDir() + " " + podLogDirectory
            + " podname:" + podName);
        Stats.incr(SingerMetrics.PODS_CREATED);
        Stats.incr(SingerMetrics.NUMBER_OF_PODS);
        activePodSet.add(podName);
        updatePodWatchers(podName, false);
        return;
    }

    if (kind.equals(StandardWatchEventKinds.OVERFLOW)) {
        LOG.warn("Received overflow watch event from filesystem: Events may have been lost");
        // perform a full sync on pod names from file system
        updatePodNamesFromFileSystem();
        return;
    }

    if (kind.equals(StandardWatchEventKinds.ENTRY_DELETE)) {
        String deletedName = file.toFile().getName();
        // ignore the . files
        if (!deletedName.startsWith(".")) {
            LOG.info("File deleted:" + deletedName);
        }
    }
}
/**
 * Starts a new watcher thread that polls the given key and publishes each
 * non-OVERFLOW event through {@code lastEvent} and the current latch.
 * <p>
 * A previously started watcher thread is interrupted first. The thread checks at the
 * start of every polling round (and before handling each event) whether it has been
 * replaced or interrupted, so a superseded thread terminates even when no further
 * events arrive on its key.
 *
 * @param key the watch key to poll
 */
private static void watch(final WatchKey key) {
    if (watcherThread != null) {
        // tell the old watcher thread to shut down
        watcherThread.interrupt();
    }
    watcherThread = new Thread("ArchivingTest.watch") {
        @Override
        public void run() {
            for (; ; ) {
                // Without this check a replaced thread would only notice the shutdown
                // request when another event happens to arrive.
                if (watcherThread != this || isInterrupted()) {
                    return;
                }
                for (final WatchEvent<?> event : key.pollEvents()) {
                    final WatchEvent.Kind<?> kind = event.kind();
                    if (kind == StandardWatchEventKinds.OVERFLOW) {
                        continue;
                    }
                    if (watcherThread != this || isInterrupted()) {
                        return;
                    }
                    lastEvent.set(event);
                    latch.get().countDown();
                }
                final boolean valid = key.reset();
                if (!valid) {
                    System.out.println("ArchivingTest.watch terminated");
                    break;
                }
            }
        }
    };
    watcherThread.start();
}
/**
 * Logs the event and, when the {@code dynamic.prop.changed} property equals
 * {@code DYNAMIC_PROP_UPLOAD}, uploads the properties to ZooKeeper.
 *
 * @param event the watch event that triggered this call
 */
@Override
public void dealWithEvent(WatchEvent<?> event) {
    _log.debug(event.context() + ":\t " + event.kind() + " event.");

    String dynamicPolicy = PropUtils.getInstance().getProperty("dynamic.prop.changed");
    _log.debug("DynamicPolicy is {}", dynamicPolicy);

    if (StrUtils.isEmpty(dynamicPolicy) || !DYNAMIC_PROP_UPLOAD.equals(dynamicPolicy)) {
        return;
    }
    uploadProp2ZK();
}
/**
 * Registers the directory belonging to {@code p} with the watch service for modify,
 * create and delete events and remembers {@code callback} under {@code p}.
 * <p>
 * If {@code p} is not a directory, its parent directory is watched instead. For a
 * path without a parent component (a bare relative file name) the parent of the
 * absolute form of the path is used. A failed registration is reported on standard
 * error; the callback is stored regardless.
 *
 * @param p        the file or directory of interest
 * @param callback the callback to store for {@code p}
 */
public void addRegistration(Path p, BiConsumer<Path, WatchEvent.Kind<?>> callback) {
    Path toWatch = p;
    if (!Files.isDirectory(toWatch)) {
        toWatch = toWatch.getParent();
        if (toWatch == null) {
            // getParent() returns null for a single-element relative path such as "a.txt"
            toWatch = p.toAbsolutePath().getParent();
        }
    }
    try {
        toWatch.register(service,
            StandardWatchEventKinds.ENTRY_MODIFY,
            StandardWatchEventKinds.ENTRY_CREATE,
            StandardWatchEventKinds.ENTRY_DELETE);
    } catch (IOException e) {
        // Best effort: the failure is reported and the callback is still recorded below.
        e.printStackTrace();
    }
    callbacks.put(p, callback);
}
/**
 * Schedules forwarding of the event to the listeners (if applicable).
 * <p>
 * By delaying the forwarding, duplicate modification events and those where the actual file-content is not
 * consistent or empty in between will get skipped and the file system gets a chance to "settle" before the
 * framework is going to act on it.
 * <p>
 * Also, modification events are received for meta-data changes (e.g. last modification timestamp or file
 * permissions). They are filtered out by comparing the checksums of the file's content.
 * <p>
 * See also
 * <a href=
 * "https://stackoverflow.com/questions/16777869/java-7-watchservice-ignoring-multiple-occurrences-of-the-same-event">this
 * discussion</a> on Stack Overflow.
 *
 *
 * @param key the watch key the event was delivered for; together with {@code resolvedPath} it identifies
 *            the scheduled job
 * @param event the modification event to forward once the delay has elapsed
 * @param resolvedPath the path of the modified file
 * @param service the service whose {@code processWatchEvent} is invoked when the content check passes
 */
private void processModificationEvent(WatchKey key, WatchEvent<?> event, Path resolvedPath,
AbstractWatchService service) {
// All bookkeeping of scheduled jobs happens while holding the lock on 'futures'.
synchronized (futures) {
logger.trace("Modification event for {} ", resolvedPath);
// A job that is still pending for the same key/path is replaced by the new one.
ScheduledFuture<?> previousFuture = removeScheduledJob(key, resolvedPath);
if (previousFuture != null) {
previousFuture.cancel(true);
logger.trace("Cancelled previous for {} ", resolvedPath);
}
ScheduledFuture<?> future = scheduler.schedule(() -> {
logger.trace("Executing job for {}", resolvedPath);
// The job drops its own bookkeeping entry before doing the actual work.
ScheduledFuture<?> res = removeScheduledJob(key, resolvedPath);
if (res != null) {
logger.trace("Job removed itself for {}", resolvedPath);
} else {
logger.trace("Job couldn't find itself for {}", resolvedPath);
}
// Forward the event only when the content check reports a change.
if (checkAndTrackContent(service, resolvedPath)) {
service.processWatchEvent(event, event.kind(), resolvedPath);
} else {
logger.trace("File content '{}' has not changed, skipping modification event", resolvedPath);
}
}, PROCESSING_DELAY, TimeUnit.MILLISECONDS);
logger.trace("Scheduled processing of {}", resolvedPath);
rememberScheduledJob(key, resolvedPath, future);
}
}
/**
 * Registers a folder for file system events by creating an FSEvents stream for it
 * and starting a run loop for that stream on a new thread.
 * <p>
 * The call waits (uninterruptibly) on a latch handed to the run loop before the run
 * loop and the stream callback are recorded under the new key.
 *
 * @param folder    the folder to watch; its string form is used as the watched path
 * @param events    the event kinds the key is created for
 * @param modifiers not referenced by this implementation
 * @return the key representing the registration
 * @throws IOException declared by the overridden method
 */
@Override
public WatchKey register(final Watchable folder,
                         final WatchEvent.Kind<?>[] events,
                         final WatchEvent.Modifier... modifiers)
        throws IOException {
    if(log.isInfoEnabled()) {
        log.info(String.format("Register file %s for events %s", folder, Arrays.toString(events)));
    }
    final String watchedPath = folder.toString();
    final Pointer[] pathPointers = {
        CFStringRef.toCFString(watchedPath).getPointer()};
    final MacOSXWatchKey watchKey = new MacOSXWatchKey(folder, this, events);
    final Map<File, Long> lastModified = createLastModifiedMap(new File(watchedPath));
    final FSEvents.FSEventStreamCallback streamCallback = new Callback(watchKey, lastModified);
    final double latencySeconds = 1.0; // Latency in seconds
    final FSEventStreamRef stream = library.FSEventStreamCreate(
        Pointer.NULL, streamCallback, Pointer.NULL,
        library.CFArrayCreate(null, pathPointers, CFIndex.valueOf(1), null),
        -1, latencySeconds,
        kFSEventStreamCreateFlagNoDefer);

    final CountDownLatch started = new CountDownLatch(1);
    final CFRunLoop runLoop = new CFRunLoop(started, stream);
    threadFactory.newThread(runLoop).start();
    // presumably released by the run loop once it is running (verify in CFRunLoop)
    Uninterruptibles.awaitUninterruptibly(started);

    loops.put(watchKey, runLoop);
    callbacks.put(watchKey, streamCallback);
    return watchKey;
}
/**
 * Walks the chain of notification records in the key's native buffer and signals an
 * event on the key for every record whose translated kind is among the key's
 * registered events.
 *
 * @param key  the key whose buffer holds the records and which receives the events
 * @param size not referenced in this method body
 */
private void processEvents(WindowsWatchKey key, int size) {
// start at the first record in the buffer
long address = key.buffer().address();
int nextOffset;
do {
int action = UNSAFE.getInt(address + OFFSETOF_ACTION);
// map action to event
WatchEvent.Kind<?> kind = translateActionToEvent(action);
if (key.events().contains(kind)) {
// copy the name
int nameLengthInBytes = UNSAFE.getInt(address + OFFSETOF_FILENAMELENGTH);
// the name is copied into a char[] below, so its byte length must be even
if ((nameLengthInBytes % 2) != 0) {
throw new AssertionError("FileNameLength is not a multiple of 2");
}
char[] nameAsArray = new char[nameLengthInBytes/2];
UNSAFE.copyMemory(null, address + OFFSETOF_FILENAME, nameAsArray,
Unsafe.ARRAY_CHAR_BASE_OFFSET, nameLengthInBytes);
// create FileName and queue event
WindowsPath name = WindowsPath
.createFromNormalizedPath(fs, new String(nameAsArray));
key.signalEvent(kind, name);
}
// next event
nextOffset = UNSAFE.getInt(address + OFFSETOF_NEXTENTRYOFFSET);
address += (long)nextOffset;
// an offset of 0 ends the loop
} while (nextOffset != 0);
}
/**
 * Blocks until the watch service signals a key, then runs the callback for the first
 * event that refers to the watched file and still exists on disk. OVERFLOW events
 * are skipped, failures of the callback are logged, and the key is reset afterwards.
 *
 * @throws InterruptedException if interrupted while waiting for a key
 */
private void handleNextWatchNotification() throws InterruptedException {
    log.debug("Watching file change: " + file);
    // wait for key to be signalled
    WatchKey signalled = watchService.take();
    log.info("Watch Key notified");

    for (WatchEvent<?> rawEvent : signalled.pollEvents()) {
        if (rawEvent.kind() == StandardWatchEventKinds.OVERFLOW) {
            log.debug("Watch event is OVERFLOW");
            continue;
        }
        WatchEvent<Path> pathEvent = (WatchEvent<Path>)rawEvent;
        Path changed = this.file.getParent().resolve(pathEvent.context());
        log.info("Watch file change: " + pathEvent.context() + "=>" + changed);

        // Need to use path equals than isSameFile
        boolean isWatchedFile = Files.exists(changed) && changed.equals(this.file);
        if (!isWatchedFile) {
            continue;
        }
        log.debug("Watch matching file: " + file);
        try {
            callback.run();
        } catch (Exception e) {
            log.warn("Hit error callback on file change", e);
        }
        // one callback per notification is enough
        break;
    }
    signalled.reset();
}
/**
 * Takes the next key from the watcher and asserts that its events match
 * {@code expected}; if the number of events differs from {@code expected} and
 * {@code alternate} is non-empty, the events are compared against {@code alternate}
 * instead. The key is reset afterwards.
 *
 * @param expected  the primary list of expected events
 * @param alternate an acceptable alternative list; may be empty
 * @throws InterruptedException if interrupted while waiting for a key
 */
private void assertWatcherHasEvents(List<WatchEvent<?>> expected, List<WatchEvent<?>> alternate)
    throws InterruptedException {
    ensureTimeToPoll(); // otherwise we could read 1 event but not all the events we're expecting

    WatchKey key = watcher.take();
    List<WatchEvent<?>> actual = key.pollEvents();

    boolean compareWithExpected = actual.size() == expected.size() || alternate.isEmpty();
    List<WatchEvent<?>> reference = compareWithExpected ? expected : alternate;
    assertThat(actual).containsExactlyElementsIn(reference);

    key.reset();
}
/**
 * Drains every key currently available from the WatchService, processing each of its
 * events (and marking the "events" meter once per event), then reports whether there
 * is anything left to read.
 *
 * <p>Handling of the individual event, including deletions, is done by
 * {@code processEvent}.
 *
 * @return {@code true} if there are current lines or queued entries, {@code false} otherwise
 */
@Override
public boolean doHasNext() throws IOException, CollectionException {
    for (WatchKey key = watcher.poll(); key != null; key = watcher.poll()) {
        for (WatchEvent<?> event : key.pollEvents()) {
            processEvent(key, event);
            getMonitor().meter("events").mark();
        }
        key.reset();
    }
    return !(currLines.isEmpty() && queue.isEmpty());
}
/**
 * Watches the configured parent directory for newly created files and processes
 * every created file whose name starts with {@code SCRIPT_ERROR_FILE_PREFIX} as a
 * script error file.
 * <p>
 * Events without a {@link Path} context (such as OVERFLOW, whose context is
 * {@code null}) are skipped. If watching fails while the watcher is still marked as
 * running, the error is logged and the message server is shut down. An interruption
 * additionally restores the thread's interrupt status.
 */
public void run(){
    isRunning.set(true);
    clearErroredFiles();
    try (final WatchService watchService = FileSystems.getDefault().newWatchService()) {
        FileSystems.getDefault()
            .getPath(serverConfiguration.getParentDirectoryPath())
            .register(watchService,
                new WatchEvent.Kind<?>[]{StandardWatchEventKinds.ENTRY_CREATE},
                SensitivityWatchEventModifier.HIGH);
        while (isRunning.get()) {
            final WatchKey wk = watchService.take();
            for (WatchEvent<?> event : wk.pollEvents()) {
                // OVERFLOW can be delivered even though only ENTRY_CREATE was registered;
                // it carries no path, so there is nothing to inspect.
                if (event.kind() == StandardWatchEventKinds.OVERFLOW || !(event.context() instanceof Path)) {
                    continue;
                }
                final Path changed = (Path) event.context();
                if (changed.toFile().getName().startsWith(SCRIPT_ERROR_FILE_PREFIX)) {
                    processError(ErrorFileType.SCRIPT, changed.toFile());
                }
            }
            boolean valid = wk.reset();
            if (!valid) {
                ServerLogger.log(ServerLogger.Level.INFO, TAG, "The watcher key has been unregistered");
            }
        }
    }catch(Exception ex){
        if (ex instanceof InterruptedException) {
            // keep the interruption visible to whoever owns this thread
            Thread.currentThread().interrupt();
        }
        if (isRunning.get()) {
            ServerLogger.error(TAG, "An error occurred while watching for errors. Shutting down!", ex);
            messageServer.shutdown(-1, false);
        }
    }
}
/**
 * Body of the config watch thread: until the service state becomes {@code Stopped},
 * polls the watcher with a 100 ms timeout and reloads the matching provider entry
 * whenever its file is reported as modified.
 */
private void watchThreadFunc() {
    logger.info("Config watch thread is started");
    while (state != ServiceState.Stopped) {
        WatchKey polledKey = null;
        try {
            polledKey = watcher.poll(100, TimeUnit.MILLISECONDS);
        } catch (Throwable ignored) {
            // Failures while polling are ignored; the loop simply tries again.
        }
        if (polledKey == null) {
            Thread.yield();
            continue;
        }

        ConfigProviderEntry providerEntry = getEntryByFile(polledKey);
        if (providerEntry != null) {
            for (WatchEvent<?> rawEvent : polledKey.pollEvents()) {
                WatchEvent<Path> pathEvent = (WatchEvent<Path>) rawEvent;
                Path filename = pathEvent.context();
                boolean modified = rawEvent.kind() == java.nio.file.StandardWatchEventKinds.ENTRY_MODIFY;
                // Only a modification of the provider's own file triggers a reload.
                if (modified && filename.toString().equals(providerEntry.file.getName())) {
                    doReload(providerEntry);
                }
            }
        }
        polledKey.reset();
    }
    logger.info("Config watch thread is stopped");
}
/**
 * Registers the given path by handing the request to the poller unchanged.
 *
 * @param path      the path to register
 * @param events    the event kinds to register for
 * @param modifiers the modifiers, passed through to the poller
 * @return the key returned by the poller
 * @throws IOException if the poller's registration fails
 */
@Override
WatchKey register(Path path,
WatchEvent.Kind<?>[] events,
WatchEvent.Modifier... modifiers)
throws IOException
{
// delegate to poller
return poller.register(path, events, modifiers);
}
/**
 * Removes and returns all events currently queued on this key. When the overflow
 * counter is non-zero it is reset to zero and a single overflow event carrying that
 * count is appended to the result.
 *
 * @return an unmodifiable list of the drained events
 */
@Override
public List<WatchEvent<?>> pollEvents() {
    // note: polling again without calling reset() is fine; reset() only "returns" the key to
    // the watch service so that it can potentially be retrieved by another thread once the
    // caller is finished with it
    List<WatchEvent<?>> drained = new ArrayList<>(events.size());
    events.drainTo(drained);

    int dropped = overflow.getAndSet(0);
    if (dropped != 0) {
        drained.add(overflowEvent(dropped));
    }
    return Collections.unmodifiableList(drained);
}