下面列出了 org.apache.commons.io.input.Tailer API 类的使用示例代码及写法,也可以点击链接到 GitHub 查看源代码。
/**
 * Creates a CsvStreamReader with supplied separator and quote char.
 *
 * <p>A {@link Tailer} is created on the source file and feeds a
 * {@code CsvContentListener} backed by {@code contentQueue}. The constructor
 * then sleeps for {@code DEFAULT_MONITOR_DELAY} so the tailer has a chance to
 * capture data before the first read.
 *
 * @param source The file to an underlying CSV source
 * @param separator The delimiter to use for separating entries
 * @param quoteChar The character to use for quoted elements
 * @param escape The character to use for escaping a separator or quote
 * @param line The line number to skip for start reading
 * @param strictQuotes Sets if characters outside the quotes are ignored
 * @param ignoreLeadingWhiteSpace If true, parser should ignore
 *                                white space before a quote in a field
 * @throws RuntimeException if the thread is interrupted during the initial
 *                          wait; the interrupt flag is restored and the
 *                          tailer is stopped before the exception is thrown
 */
private CsvStreamReader(Source source, char separator, char quoteChar,
        char escape, int line, boolean strictQuotes,
        boolean ignoreLeadingWhiteSpace) {
    super(new StringReader("")); // dummy call to base constructor
    contentQueue = new ArrayDeque<>();
    TailerListener listener = new CsvContentListener(contentQueue);
    tailer = Tailer.create(source.file(), listener, DEFAULT_MONITOR_DELAY,
            false, true, 4096);
    this.parser = new CSVParser(separator, quoteChar, escape, strictQuotes,
            ignoreLeadingWhiteSpace);
    this.skipLines = line;
    try {
        // wait for tailer to capture data
        Thread.sleep(DEFAULT_MONITOR_DELAY);
    } catch (InterruptedException e) {
        // Construction is failing: stop the tailer so it does not keep
        // running with no owner, and restore the interrupt flag so callers
        // further up the stack can still observe the interruption.
        tailer.stop();
        Thread.currentThread().interrupt();
        throw new RuntimeException(e);
    }
}
/**
 * Follows {@code logsFile} on the calling thread, printing every line to
 * {@code out}.
 *
 * <p>When {@code fromNumberOfLines} is not {@code -1}, the last
 * {@code fromNumberOfLines} lines are printed once from the listener's
 * {@code init} callback and the same condition is passed as the tailer's
 * "start at end" flag. A listener exception prints its stack trace and
 * exits the JVM with status 1.
 *
 * @param logsFile          the file to follow
 * @param fromNumberOfLines number of trailing lines to print first, or
 *                          {@code -1} to print none
 * @throws IOException declared by the original signature
 */
private void readAndTail(File logsFile, int fromNumberOfLines) throws IOException {
    final boolean printTailFirst = fromNumberOfLines != -1;

    TailerListenerAdapter printer = new TailerListenerAdapter() {
        @SneakyThrows
        @Override
        public void init(Tailer tailer) {
            super.init(tailer);
            if (printTailFirst) {
                out.println(LauncherUtils.readLastLines(logsFile, fromNumberOfLines));
            }
        }

        @Override
        public void handle(String line) {
            out.println(line);
        }

        @Override
        public void handle(Exception ex) {
            ex.printStackTrace();
            System.exit(1);
        }
    };

    Tailer follower = new Tailer(logsFile, StandardCharsets.UTF_8, printer,
            100, printTailFirst, false, 4096);
    // Runs on the current thread; does not return while the tailer is active.
    follower.run();
}
/**
 * Recreates the log file as an empty file and starts a daemon thread that
 * echoes every line appended to it to standard output, prefixed with
 * {@code testName}.
 *
 * <p>An {@link IOException} while recreating the file is only printed; the
 * tailer is started regardless.
 */
private void startTailingLog() {
    // Tail the log
    File logFile = new File(getBasedir() + File.separator + getLogFileName());
    try {
        if (logFile.exists()) {
            logFile.delete();
        }
        logFile.createNewFile();
    } catch (IOException e) {
        e.printStackTrace();
    }

    TailerListener echo = new TailerListenerAdapter() {
        @Override
        public void handle(String line) {
            System.out.println(testName + ": " + line);
        }
    };

    Thread worker = new Thread(new Tailer(logFile, echo, TAIL_DELAY_MILLIS));
    worker.setDaemon(true);
    worker.start();
}
/**
 * Activates the component: validates the mandatory {@code type} and
 * {@code path} properties, stores the configuration, and starts a thread
 * tailing the file at {@code path} with this object as the listener.
 *
 * <p>All listener state, including the compiled regex, is set up before
 * the tailer thread starts. This keeps the listener callbacks from seeing a
 * half-initialised component, and means an invalid {@code regex} fails
 * activation without leaving a tailer thread behind.
 *
 * @param context the component context supplying the configuration properties
 * @throws ConfigurationException if {@code type} or {@code path} is missing
 */
@SuppressWarnings("unchecked")
@Activate
public void activate(ComponentContext context) throws ConfigurationException {
    properties = context.getProperties();
    if (properties.get("type") == null) {
        throw new ConfigurationException("type", "type property is mandatory");
    }
    String type = (String) properties.get("type");
    if (properties.get("path") == null) {
        throw new ConfigurationException("path", "path property is mandatory");
    }
    String path = (String) properties.get("path");

    this.type = type;
    this.path = path;
    this.regex = (String) properties.get("regex");
    if (regex != null) {
        compiledRegex = Pattern.compile(regex);
    }

    LOGGER.debug("Starting tail on {}", path);
    tailer = new Tailer(new File(path), this, 1000, true, true);
    Thread thread = new Thread(tailer, "File tailer for " + path);
    // Start last: from here on the listener callbacks may run concurrently.
    thread.start();
}
/**
 * Creates a CsvStreamReader with supplied separator and quote char.
 *
 * <p>A {@link Tailer} is created on the source file and feeds a
 * {@code CsvContentListener} backed by {@code contentQueue}. The constructor
 * then sleeps for {@code DEFAULT_MONITOR_DELAY} so the tailer has a chance to
 * capture data before the first read.
 *
 * @param source The file to an underlying CSV source
 * @param separator The delimiter to use for separating entries
 * @param quoteChar The character to use for quoted elements
 * @param escape The character to use for escaping a separator or quote
 * @param line The line number to skip for start reading
 * @param strictQuotes Sets if characters outside the quotes are ignored
 * @param ignoreLeadingWhiteSpace If true, parser should ignore
 *                                white space before a quote in a field
 * @throws RuntimeException if the thread is interrupted during the initial
 *                          wait; the interrupt flag is restored and the
 *                          tailer is stopped before the exception is thrown
 */
private CsvStreamReader(Source source, char separator, char quoteChar,
        char escape, int line, boolean strictQuotes,
        boolean ignoreLeadingWhiteSpace) {
    super(new StringReader("")); // dummy call to base constructor
    contentQueue = new ArrayDeque<>();
    TailerListener listener = new CsvContentListener(contentQueue);
    tailer = Tailer.create(source.file(), listener, DEFAULT_MONITOR_DELAY,
            false, true, 4096);
    this.parser = new CSVParser(separator, quoteChar, escape, strictQuotes,
            ignoreLeadingWhiteSpace);
    this.skipLines = line;
    try {
        // wait for tailer to capture data
        Thread.sleep(DEFAULT_MONITOR_DELAY);
    } catch (InterruptedException e) {
        // Construction is failing: stop the tailer so it does not keep
        // running with no owner, and restore the interrupt flag so callers
        // further up the stack can still observe the interruption.
        tailer.stop();
        Thread.currentThread().interrupt();
        throw new RuntimeException(e);
    }
}
/**
 * Waits up to {@code timeForFileToBeCreated} milliseconds for {@code toRead}
 * to exist and be readable, then creates (but does not start) a
 * {@link Tailer} for it with this object as the listener.
 *
 * <p>If the waiting thread is interrupted, the wait ends early and the
 * interrupt flag is restored; the file is then checked one last time.
 *
 * @param toRead                 the file to tail
 * @param timeForFileToBeCreated maximum time to wait for the file, in milliseconds
 * @throws IOException           if the canonical path of the file cannot be resolved
 * @throws IllegalStateException if the file is still missing or unreadable
 *                               once waiting has finished
 */
protected LogTailer(File toRead, long timeForFileToBeCreated) throws IOException {
    long deadline = System.currentTimeMillis() + timeForFileToBeCreated;
    while ((System.currentTimeMillis() < deadline) && !(toRead.exists() && toRead.canRead())) {
        try {
            Thread.sleep(10);
        } catch (InterruptedException e) {
            // Stop waiting and keep the interruption visible to the caller
            // instead of silently clearing it and sleeping again.
            Thread.currentThread().interrupt();
            break;
        }
    }
    if (!toRead.exists() || !toRead.canRead()) {
        throw new IllegalStateException("Couldn't read "+toRead.getCanonicalPath()+" in the configured timeout");
    }
    logger.debug("Initialising Tailer for "+toRead.getCanonicalPath());
    tailer = new Tailer(toRead, this, DELAY, false);
}
/**
 * Clears previously collected logs, launches a daemon thread tailing
 * {@code carbonLogFile}, and waits (polling every 10 ms, at most 5 s)
 * until {@code hasThreadStarted} reports that the thread is running.
 */
public void start() {
    clearLogs();
    tailer = new Tailer(carbonLogFile, carbonLogTailer, 1000, startReadingFromEndOfFile);

    Thread tailerThread = new Thread(tailer);
    tailerThread.setDaemon(true);
    tailerThread.start();

    Awaitility.await()
            .pollInterval(10, TimeUnit.MILLISECONDS)
            .atMost(5, TimeUnit.SECONDS)
            .until(hasThreadStarted(tailerThread));
}
/**
 * Setup a file tailer from an address string.
 *
 * <p>The first seven characters of the address are discarded (the sample
 * address uses the {@code file://} scheme). The remainder is split on
 * {@code &}: the first piece is the file name and each further piece must
 * be a {@code key=value} pair. Recognised keys are {@code whence}
 * ({@code bof} starts at the beginning of the file, anything else at the
 * end), {@code delay}, {@code reopen} and {@code bufsize}; other keys are
 * ignored.
 *
 * @param buf String. The Address buffer, e.g.
 *            {@code file://log.txt&whence=eof&delay=500&reopen=false&bufsize=4096}.
 * @throws IllegalArgumentException if the address is {@code null} or shorter
 *                                  than the seven-character prefix
 * @throws Exception on setup exception, including a parameter that is not
 *                   of the form {@code a=b}.
 */
void processLogTailer(String buf) throws Exception {
    // Fail with a descriptive error instead of an NPE or a
    // StringIndexOutOfBoundsException from substring(7).
    if (buf == null || buf.length() < 7) {
        throw new IllegalArgumentException("Address too short to contain a scheme and file name: " + buf);
    }

    String[] parts = buf.substring(7).split("&");
    String fileName = parts[0];

    for (int i = 1; i < parts.length; i++) {
        String[] pair = parts[i].trim().split("=");
        if (pair.length != 2) {
            throw new Exception("Not a proper parameter (a=b)");
        }
        String key = pair[0].trim();
        String value = pair[1].trim();
        switch (key) {
            case "whence":
                end = !value.equalsIgnoreCase("bof");
                break;
            case "delay":
                delay = Integer.parseInt(value);
                break;
            case "reopen":
                reopen = Boolean.parseBoolean(value);
                break;
            case "bufsize":
                bufsize = Integer.parseInt(value);
                break;
            default:
                // Unknown keys are ignored, as before.
                break;
        }
    }

    topic = fileName;
    tailer = Tailer.create(new File(fileName), this, delay, end, reopen, bufsize);
}
/**
 * Starts the file watch service: configures {@code tailListener}, then
 * tails {@code filePath} on a daemon thread and keeps the tailer in the
 * {@code tailer} field.
 */
public void startService() {
    tailListener.setParams(datastore, collection, interpreter, filePath);

    Tailer fileTailer = new Tailer(new File(filePath), tailListener, defaultTime, startFromEnd);
    Thread worker = new Thread(fileTailer);
    worker.setDaemon(true);
    worker.start();

    this.tailer = fileTailer;
}
/**
 * Wraps {@code innerOutputWriter} and tails {@code filePath}, handing the
 * tailed content to an {@code ExportListener} built from the stream taken
 * from {@code kinesisConfig}.
 *
 * @param innerOutputWriter the writer being wrapped
 * @param filePath          the file to tail
 * @param kinesisConfig     supplies the stream passed to the listener
 */
FileToStreamOutputWriter(OutputWriter innerOutputWriter, Path filePath, KinesisConfig kinesisConfig) {
    this.innerOutputWriter = innerOutputWriter;
    this.filePath = filePath;

    this.stream = kinesisConfig.stream();
    this.listener = new ExportListener(this.stream);

    // Tailer.create both builds the tailer and starts it.
    this.tailer = Tailer.create(this.filePath.toFile(), this.listener);
}
/**
 * Starts reading the source file in the background unless a reading or
 * tailing thread is already alive.
 *
 * <p>A daemon thread polls once per second until {@code sourceFile} is set
 * and refers to an existing, readable, regular file (or until
 * {@code readingStatus} becomes false). It then starts a daemon
 * {@link Tailer} thread on that file. Status text is pushed to
 * {@code statusLabel} through {@code Platform.runLater}.
 */
@Override
public void startReading() {
    System.out.println("TailingReader:StartReading() called");
    if (tailingThread != null && tailingThread.isAlive()) {
        return;
    }
    if (readingThread != null && readingThread.isAlive()) {
        return;
    }
    Task<Void> readingTask = new Task<Void>() {
        @Override
        public Void call() {
            try {
                // make sure the file exists
                while (readingStatus.getValue()
                        && (sourceFile == null || !sourceFile.exists() || !sourceFile.canRead() || !sourceFile.isFile())) {
                    Thread.sleep(1000);
                    // sourceFile may legitimately still be null here, so build
                    // the message without dereferencing it; it is computed once
                    // so the console and the label show the same text.
                    final String waitMessage =
                            "Waiting for " + (sourceFile == null ? null : sourceFile.getPath());
                    System.out.println(waitMessage);
                    Platform.runLater(() -> statusLabel.setText(waitMessage));
                }
                if (readingStatus.getValue()) {
                    final String readingMessage = "Reading file: " + sourceFile.getPath();
                    Platform.runLater(() -> statusLabel.setText(readingMessage));
                    MyHandler listener = new MyHandler();
                    tailer = new Tailer(sourceFile, listener, 1000, Boolean.FALSE, Boolean.TRUE);
                    tailingThread = new Thread(tailer);
                    tailingThread.setDaemon(true); // optional
                    tailingThread.start();
                    readingStatus.setValue(Boolean.TRUE);
                }
            } catch (InterruptedException ex) {
                Logger.getLogger(TailingReader.class.getName()).log(Level.SEVERE, null, ex);
                // Keep the interruption visible to whoever owns this thread.
                Thread.currentThread().interrupt();
            }
            return null;
        }
    };
    readingThread = new Thread(readingTask);
    readingThread.setDaemon(true); // optional
    readingThread.start();
}
/**
 * Prepares this component for tailing: records the absolute path of the
 * file to parse, allocates the bounded queue of parsed lines, resets the
 * line counter, falls back to a {@code NoOpLogLineParser} when no parser is
 * configured, and creates the tailer with this object as its listener.
 *
 * @param stormConf configuration map; not read by this method
 */
public void open(Map stormConf) {
    fileToParseName = fileToParse.getAbsolutePath();
    parsedLineQueue = new LinkedBlockingQueue<Map>(maxQueueSize);
    currentLine = 0;

    if (null == lineParser) {
        lineParser = new NoOpLogLineParser();
    }

    tailer = Tailer.create(fileToParse, this, tailerDelayMs);
    log.info("Started tailing " + fileToParseName);
}
/**
 * Tails the file at {@code fileLocation} on the calling thread, polling
 * every 10 ms and delivering lines to {@code listener}. This call blocks
 * for as long as the tailer runs.
 *
 * <p>The tailer is built with the constructor rather than
 * {@code Tailer.create}: {@code create} already starts the tailer on its
 * own thread, so calling {@code run()} on its result would start a second
 * loop over the same file and deliver every line to the listener twice.
 *
 * @param listener receives the tailed lines and tailer events
 */
public void tail(TailerListener listener) {
    File file = new File(fileLocation);
    new Tailer(file, listener, 10).run();
}
/**
 * Tails the file resolved for {@code alias} on the calling thread, polling
 * every 10 ms and delivering lines to {@code listener}. When no lookup is
 * configured, or the lookup yields no file, the file at
 * {@code fileLocation} is used. This call blocks for as long as the tailer
 * runs.
 *
 * <p>The tailer is built with the constructor rather than
 * {@code Tailer.create}: {@code create} already starts the tailer on its
 * own thread, so calling {@code run()} on its result would start a second
 * loop over the same file and deliver every line to the listener twice.
 *
 * @param listener receives the tailed lines and tailer events
 * @param alias    name handed to the log lookup to select the file
 */
public void tail(TailerListener listener, String alias) {
    File file = logLookup.map(l -> l.lookup(alias))
            .orElse(new File(fileLocation));
    new Tailer(file, listener, 10).run();
}
/**
 * Builds an observable that tails {@code file} when subscribed.
 *
 * <p>On subscription a listener is created for the emitter and a
 * {@link Tailer} is run on the subscribing thread; anything thrown by the
 * tailer is forwarded to the emitter as an error.
 *
 * @param pollIntervalMs delay between file checks, in milliseconds
 * @return an observable backed by the tailer described above
 */
public Observable<String> getStream(final long pollIntervalMs) {
    ObservableOnSubscribe<String> source = new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> emitter) throws Exception {
            TailerListener lineListener = createListener(emitter);
            final Tailer fileTailer = new Tailer(file, lineListener, pollIntervalMs);
            try {
                fileTailer.run();
            } catch (Throwable failure) {
                emitter.onError(failure);
            }
        }
    };
    return Observable.create(source);
}
/**
 * Creates the service: opens the GeoIP city database when the configured
 * file exists, then starts one {@link Tailer} per configured access log on
 * a fixed thread pool.
 *
 * <p>Blank entries in {@code accessLogs} (for example from a trailing
 * comma) are skipped instead of producing a tailer on an empty path, and
 * the pool always has at least one thread so an empty list cannot make
 * {@code newFixedThreadPool} reject a size of zero.
 *
 * @param cityFile          path of the GeoIP2 city database; may be {@code null}
 * @param accessLogs        comma-separated list of access log paths
 * @param messagingTemplate messaging operations stored for later use
 */
@Autowired
public TailService(@Value("${geoip2.cityfile}") String cityFile,
        @Value("${access.logs}") String accessLogs,
        SimpMessageSendingOperations messagingTemplate) {
    this.messagingTemplate = messagingTemplate;

    if (cityFile != null) {
        Path database = Paths.get(cityFile);
        if (Files.exists(database)) {
            try {
                this.reader = new DatabaseReader.Builder(database.toFile()).build();
            }
            catch (IOException e) {
                LoggerFactory.getLogger(getClass()).error("GeoIPCityService init", e);
            }
        }
    }

    this.tailers = new ArrayList<>();
    for (String logFile : accessLogs.split(",")) {
        String trimmed = logFile.trim();
        if (trimmed.isEmpty()) {
            continue;
        }
        Path p = Paths.get(trimmed);
        this.tailers.add(new Tailer(p.toFile(), new ListenerAdapter()));
    }

    this.executor = Executors.newFixedThreadPool(Math.max(1, this.tailers.size()));
    for (Tailer tailer : this.tailers) {
        this.executor.execute(tailer);
    }
}
/**
 * Shutdown hook: stops every tailer that was created and then shuts the
 * executor down. Both steps are skipped when the corresponding field was
 * never initialised.
 */
@PreDestroy
public void preDestroy() {
    if (null != this.tailers) {
        for (Tailer running : this.tailers) {
            running.stop();
        }
    }

    if (null != this.executor) {
        this.executor.shutdown();
    }
}
/**
* Stores the supplied tailer in the {@code tailer} field.
*
* @param tailer the tailer handed to this listener
*/
@Override
public void init(Tailer tailer) {
this.tailer = tailer;
}
/**
 * Runs the task class named in {@code testParams} in the background, tails
 * its output file until the results adapter signals completion, and then
 * asserts the results (and the labels, when a label file is expected).
 *
 * <p>The task's {@code main} method is invoked reflectively with arguments
 * built from the CLI template. A failure to launch it is logged with the
 * full exception, so the underlying cause is kept even when the exception
 * has no {@code getCause()}.
 *
 * @param testParams parameters describing the task and the expected results
 * @throws InterruptedException if interrupted while waiting for the task output
 * @throws IOException if the temporary files cannot be created
 */
public static void test(final TestParams testParams) throws IOException, ClassNotFoundException,
        NoSuchMethodException, InvocationTargetException, IllegalAccessException, InterruptedException {
    final File tempFile = File.createTempFile("test", "test");
    final File labelFile = File.createTempFile("result", "result");
    LOG.info("Starting test, output file is {}, test config is \n{}", tempFile.getAbsolutePath(), testParams.toString());

    java.util.concurrent.ExecutorService taskRunner = Executors.newSingleThreadExecutor();
    taskRunner.submit(new Callable<Void>() {
        @Override
        public Void call() throws Exception {
            try {
                Class.forName(testParams.getTaskClassName())
                        .getMethod("main", String[].class)
                        .invoke(null, (Object) String.format(
                                testParams.getCliStringTemplate(),
                                tempFile.getAbsolutePath(),
                                testParams.getInputInstances(),
                                testParams.getSamplingSize(),
                                testParams.getInputDelayMicroSec(),
                                labelFile.getAbsolutePath(),
                                testParams.getLabelSamplingSize()
                        ).split("[ ]"));
            } catch (Exception e) {
                // Pass the exception itself: e.getCause() may be null (e.g. for
                // ClassNotFoundException), and dereferencing it would hide the failure.
                LOG.error("Cannot execute test {}", e.getMessage(), e);
            }
            return null;
        }
    });
    // No further tasks are submitted; let the worker thread end once the task finishes.
    taskRunner.shutdown();

    Thread.sleep(TimeUnit.SECONDS.toMillis(testParams.getPrePollWaitSeconds()));

    CountDownLatch signalComplete = new CountDownLatch(1);
    // Use the constructor, not Tailer.create: create() already starts the tailer,
    // so running it again on another thread would process every line twice.
    final Tailer tailer = new Tailer(tempFile, new TestResultsTailerAdapter(signalComplete), 1000);
    new Thread(tailer).start();

    signalComplete.await();
    tailer.stop();

    assertResults(tempFile, testParams);
    if (testParams.getLabelFileCreated()) {
        assertLabels(labelFile, testParams);
    }
}
/**
* No-op: this listener does nothing with the tailer it is given.
*
* @param tailer the tailer handed to this listener; unused
*/
@Override
public void init(Tailer tailer) {
}
/**
* Counts down {@code startupLatch} when called, so code waiting on that
* latch can proceed.
*
* @param tailer the tailer handed to this listener; unused
*/
@Override
public void init(Tailer tailer) {
startupLatch.countDown();
// logger.debug(System.currentTimeMillis()+": Started!");
}
/**
 * Creates a tailer on {@code fileToTail} with a 500 ms polling delay,
 * delivering events to a new {@code OutputFileListener} bound to this
 * object, and stores it in the {@code tailer} field.
 */
public void start () {
    tailer = Tailer.create(fileToTail, new OutputFileListener(this), 500);
}
/**
 * Runs the task class named in {@code testParams} in the background, tails
 * its output file until the results adapter signals completion, and then
 * asserts the results.
 *
 * <p>The task's {@code main} method is invoked reflectively with arguments
 * built from the CLI template. A failure to launch it is logged with the
 * full exception, so the underlying cause is kept even when the exception
 * has no {@code getCause()}.
 *
 * @param testParams parameters describing the task and the expected results
 * @throws InterruptedException if interrupted while waiting for the task output
 * @throws IOException if the temporary file cannot be created
 */
public static void test(final TestParams testParams) throws IOException, ClassNotFoundException, NoSuchMethodException, InvocationTargetException, IllegalAccessException, InterruptedException {
    final File tempFile = File.createTempFile("test", "test");
    LOG.info("Starting test, output file is {}, test config is \n{}", tempFile.getAbsolutePath(), testParams.toString());

    java.util.concurrent.ExecutorService taskRunner = Executors.newSingleThreadExecutor();
    taskRunner.submit(new Callable<Void>() {
        @Override
        public Void call() throws Exception {
            try {
                Class.forName(testParams.getTaskClassName())
                        .getMethod("main", String[].class)
                        .invoke(null, (Object) String.format(
                                testParams.getCliStringTemplate(),
                                tempFile.getAbsolutePath(),
                                testParams.getInputInstances(),
                                testParams.getSamplingSize(),
                                testParams.getInputDelayMicroSec()
                        ).split("[ ]"));
            } catch (Exception e) {
                // Pass the exception itself: e.getCause() may be null (e.g. for
                // ClassNotFoundException), and dereferencing it would hide the failure.
                LOG.error("Cannot execute test {}", e.getMessage(), e);
            }
            return null;
        }
    });
    // No further tasks are submitted; let the worker thread end once the task finishes.
    taskRunner.shutdown();

    Thread.sleep(TimeUnit.SECONDS.toMillis(testParams.getPrePollWaitSeconds()));

    CountDownLatch signalComplete = new CountDownLatch(1);
    // Use the constructor, not Tailer.create: create() already starts the tailer,
    // so running it again on another thread would process every line twice.
    final Tailer tailer = new Tailer(tempFile, new TestResultsTailerAdapter(signalComplete), 1000);
    new Thread(tailer).start();

    signalComplete.await();
    tailer.stop();

    assertResults(tempFile, testParams);
}