下面列出了怎么用org.apache.hadoop.hbase.util.CancelableProgressable的API类实例代码及写法,或者点击链接到github查看源代码。
private void recoverLease(final Configuration conf, final Path path) {
try {
final FileSystem dfs = CommonFSUtils.getWALFileSystem(conf);
RecoverLeaseFSUtils.recoverFileLease(dfs, path, conf, new CancelableProgressable() {
@Override
public boolean progress() {
LOG.debug("recover WAL lease: " + path);
return true;
}
});
} catch (IOException e) {
LOG.warn("unable to recover lease for WAL: " + path, e);
}
}
/**
* Submit a log split task to executor service
* @param curTask task to submit
* @param curTaskZKVersion current version of task
*/
void submitTask(final String curTask, final int curTaskZKVersion, final int reportPeriod) {
final MutableInt zkVersion = new MutableInt(curTaskZKVersion);
CancelableProgressable reporter = new CancelableProgressable() {
private long last_report_at = 0;
@Override
public boolean progress() {
long t = EnvironmentEdgeManager.currentTime();
if ((t - last_report_at) > reportPeriod) {
last_report_at = t;
int latestZKVersion =
attemptToOwnTask(false, watcher, server.getServerName(), curTask,
zkVersion.intValue());
if (latestZKVersion < 0) {
LOG.warn("Failed to heartbeat the task" + curTask);
return false;
}
zkVersion.setValue(latestZKVersion);
}
return true;
}
};
ZkSplitLogWorkerCoordination.ZkSplitTaskDetails splitTaskDetails =
new ZkSplitLogWorkerCoordination.ZkSplitTaskDetails();
splitTaskDetails.setTaskNode(curTask);
splitTaskDetails.setCurTaskZKVersion(zkVersion);
WALSplitterHandler hsh =
new WALSplitterHandler(server, this, splitTaskDetails, reporter,
this.tasksInProgress, splitTaskExecutor);
server.getExecutorService().submit(hsh);
}
private static void recoverLease(final Configuration conf, final Path path) {
try {
final FileSystem dfs = CommonFSUtils.getCurrentFileSystem(conf);
RecoverLeaseFSUtils.recoverFileLease(dfs, path, conf, new CancelableProgressable() {
@Override
public boolean progress() {
LOG.debug("Still trying to recover WAL lease: " + path);
return true;
}
});
} catch (IOException e) {
LOG.warn("unable to recover lease for WAL: " + path, e);
}
}
/**
* Splits a WAL file.
* @return false if it is interrupted by the progress-able.
*/
public static boolean splitLogFile(Path walDir, FileStatus logfile, FileSystem walFS,
Configuration conf, CancelableProgressable reporter, LastSequenceId idChecker,
SplitLogWorkerCoordination splitLogWorkerCoordination, WALFactory factory,
RegionServerServices rsServices) throws IOException {
Path rootDir = CommonFSUtils.getRootDir(conf);
FileSystem rootFS = rootDir.getFileSystem(conf);
WALSplitter s = new WALSplitter(factory, conf, walDir, walFS, rootDir, rootFS, idChecker,
splitLogWorkerCoordination, rsServices);
return s.splitLogFile(logfile, reporter);
}
@Override
public void recoverFileLease(final FileSystem fs, final Path path) throws IOException {
final Configuration conf = master.getConfiguration();
RecoverLeaseFSUtils.recoverFileLease(fs, path, conf, new CancelableProgressable() {
@Override
public boolean progress() {
LOG.debug("Recover Procedure Store log lease: " + path);
return isRunning();
}
});
}
/**
* @return Instance of HRegion if successful open else null.
*/
private HRegion openRegion() {
HRegion region = null;
try {
// Instantiate the region. This also periodically tickles OPENING
// state so master doesn't timeout this region in transition.
region = HRegion.openHRegion(this.regionInfo, this.htd,
this.rsServices.getWAL(this.regionInfo),
this.server.getConfiguration(),
this.rsServices,
new CancelableProgressable() {
@Override
public boolean progress() {
if (!isRegionStillOpening()) {
LOG.warn("Open region aborted since it isn't opening any more");
return false;
}
return true;
}
});
} catch (Throwable t) {
// We failed open. Our caller will see the 'null' return value
// and transition the node back to FAILED_OPEN. If that fails,
// we rely on the Timeout Monitor in the master to reassign.
LOG.error(
"Failed open of region=" + this.regionInfo.getRegionNameAsString(), t);
}
return region;
}
public WALSplitterHandler(final Server server, SplitLogWorkerCoordination coordination,
SplitLogWorkerCoordination.SplitTaskDetails splitDetails, CancelableProgressable reporter,
AtomicInteger inProgressTasks, TaskExecutor splitTaskExecutor) {
super(server, EventType.RS_LOG_REPLAY);
this.splitTaskDetails = splitDetails;
this.coordination = coordination;
this.reporter = reporter;
this.inProgressTasks = inProgressTasks;
this.inProgressTasks.incrementAndGet();
this.serverName = server.getServerName();
this.splitTaskExecutor = splitTaskExecutor;
}
@Test
public void testTerminationAskedByReporter() throws IOException, CorruptedLogFileException {
generateWALs(1, 10, -1);
FileStatus logfile = fs.listStatus(WALDIR)[0];
useDifferentDFSClient();
final AtomicInteger count = new AtomicInteger();
CancelableProgressable localReporter
= new CancelableProgressable() {
@Override
public boolean progress() {
count.getAndIncrement();
return false;
}
};
FileSystem spiedFs = Mockito.spy(fs);
Mockito.doAnswer(new Answer<FSDataInputStream>() {
@Override
public FSDataInputStream answer(InvocationOnMock invocation) throws Throwable {
Thread.sleep(1500); // Sleep a while and wait report status invoked
return (FSDataInputStream)invocation.callRealMethod();
}
}).when(spiedFs).open(Mockito.<Path>any(), Mockito.anyInt());
try {
conf.setInt("hbase.splitlog.report.period", 1000);
boolean ret = WALSplitter.splitLogFile(HBASEDIR, logfile, spiedFs, conf, localReporter, null,
null, wals, null);
assertFalse("Log splitting should failed", ret);
assertTrue(count.get() > 0);
} catch (IOException e) {
fail("There shouldn't be any exception but: " + e.toString());
} finally {
// reset it back to its default value
conf.setInt("hbase.splitlog.report.period", 59000);
}
}
@Override
public Status exec(String name, CancelableProgressable p) {
while (true) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
return Status.PREEMPTED;
}
if (!p.progress()) {
return Status.PREEMPTED;
}
}
}
/**
* The close method when error occurred. Now we just call recoverFileLease.
*/
@Override
public void recoverAndClose(CancelableProgressable reporter) throws IOException {
if (buf != null) {
buf.release();
buf = null;
}
datanodeList.forEach(ch -> ch.close());
datanodeList.forEach(ch -> ch.closeFuture().awaitUninterruptibly());
endFileLease(client, fileId);
RecoverLeaseFSUtils.recoverFileLease(dfs, new Path(src), conf,
reporter == null ? new CancelOnClose(client) : reporter);
}
/**
* Open HRegion. Calls initialize and sets sequenceid to both regular WAL and trx WAL.
*
* @param reporter
* @return Returns <code>this</code>
* @throws IOException
*/
@Override
protected HRegion openHRegion(final CancelableProgressable reporter) throws IOException {
super.openHRegion(reporter);
if (this.transactionLog != null) {
this.transactionLog.setSequenceNumber(super.getLog().getSequenceNumber());
}
return this;
}
public Reader createReader(final FileSystem fs, final Path path) throws IOException {
return createReader(fs, path, (CancelableProgressable)null);
}
void setReporter(CancelableProgressable reporter) {
this.reporter = reporter;
}
@Override
public void recoverAndClose(CancelableProgressable reporter) throws IOException {
executor.shutdown();
out.close();
}
/**
* Create a reader for the WAL. If you are reading from a file that's being written to and need
* to reopen it multiple times, use {@link WAL.Reader#reset()} instead of this method
* then just seek back to the last known good position.
* @return A WAL reader. Close when done with it.
* @throws IOException
*/
public Reader createReader(final FileSystem fs, final Path path,
CancelableProgressable reporter) throws IOException {
return createReader(fs, path, reporter, true);
}
/**
* Create a reader for the given path, accept custom reader classes from conf.
* If you already have a WALFactory, you should favor the instance method.
* @return a WAL Reader, caller must close.
*/
static Reader createReader(final FileSystem fs, final Path path,
final Configuration configuration, final CancelableProgressable reporter) throws IOException {
return getInstance(configuration).createReader(fs, path, reporter);
}
/**
* Create a new {@link Reader} for reading logs to split.
* @return new Reader instance, caller should close
*/
protected Reader getReader(Path curLogFile, CancelableProgressable reporter) throws IOException {
return walFactory.createReader(walFS, curLogFile, reporter);
}
/**
* The close method when error occurred.
*/
void recoverAndClose(CancelableProgressable reporter) throws IOException;
Status exec(String name, CancelableProgressable p);