下面列出了 org.apache.hadoop.hbase.exceptions.TimeoutIOException 类的 API 使用实例代码及写法，也可以点击链接到 GitHub 查看源代码。
/**
 * Blocks until this future is done or the given timeout has elapsed.
 * <p>
 * Each wait is capped at one second and at the time still remaining, so a timeout shorter than a
 * second is honoured rather than being stretched to the full wait slice. Completion is always
 * re-checked before the deadline is evaluated, so a result that arrived during the final wait is
 * returned instead of being reported as a timeout.
 *
 * @param timeoutNs maximum time to wait, in nanoseconds; a non-positive value fails immediately
 *          when the future is not already done
 * @return the value of {@code doneTxid} once the future is done
 * @throws InterruptedException if the calling thread is interrupted while waiting
 * @throws ExecutionException wrapping {@code throwable} when it is non-null after completion
 * @throws TimeoutIOException if the future is not done within {@code timeoutNs}
 */
synchronized long get(long timeoutNs) throws InterruptedException,
    ExecutionException, TimeoutIOException {
  final long startNs = System.nanoTime();
  while (!isDone()) {
    // Work with elapsed time: differences of nanoTime() values are overflow-safe, whereas
    // "nanoTime() + timeoutNs" can wrap for very large timeouts.
    final long elapsedNs = System.nanoTime() - startNs;
    if (elapsedNs >= timeoutNs) {
      throw new TimeoutIOException(
        "Failed to get sync result after " + TimeUnit.NANOSECONDS.toMillis(timeoutNs)
          + " ms for txid=" + this.txid + ", WAL system stuck?");
    }
    // Round up by one millisecond so we never call wait(0), which would block indefinitely.
    final long remainingMs = TimeUnit.NANOSECONDS.toMillis(timeoutNs - elapsedNs) + 1;
    wait(Math.min(1000L, remainingMs));
  }
  if (this.throwable != null) {
    throw new ExecutionException(this.throwable);
  }
  return this.doneTxid;
}
/**
 * Halts the calling thread until the given sync future yields an answer, translating the
 * future's failure modes into {@link IOException}s.
 *
 * @param syncFuture the future to wait on; {@code null} makes this call a no-op
 * @throws IOException if the WAL is closed, the wait times out, the thread is interrupted, or
 *           the sync itself failed
 */
protected final void blockOnSync(SyncFuture syncFuture) throws IOException {
  // Nothing was published, so there is nothing to wait for.
  if (syncFuture == null) {
    return;
  }
  if (closed) {
    throw new IOException("WAL has been closed");
  }
  try {
    syncFuture.get(walSyncTimeoutNs);
  } catch (TimeoutIOException timedOut) {
    // Sync futures are reused per thread. After a timeout the ringbuffer still refers to this
    // one, so drop the cached instance; reusing it next time could yield a wrong result.
    this.cachedSyncFutures.remove();
    throw timedOut;
  } catch (InterruptedException interrupted) {
    LOG.warn("Interrupted", interrupted);
    throw convertInterruptedExceptionToIOException(interrupted);
  } catch (ExecutionException failed) {
    throw ensureIOException(failed.getCause());
  }
}
/**
 * Verifies that a locate call with a 500 ms timeout fails with a {@link TimeoutIOException}
 * within the expected time window, and that a later locate call succeeds.
 */
@Test
public void testTimeout() throws InterruptedException, ExecutionException {
  // NOTE(review): SLEEP_MS is presumably the artificial delay injected into the locate path;
  // confirm against the test fixture.
  SLEEP_MS = 1000;
  final long beginNs = System.nanoTime();
  try {
    LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW, RegionLocateType.CURRENT,
      TimeUnit.MILLISECONDS.toNanos(500)).get();
    fail();
  } catch (ExecutionException expected) {
    Throwable rootCause = expected.getCause();
    assertThat(rootCause, instanceOf(TimeoutIOException.class));
  }
  final long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - beginNs);
  // The failure must arrive no earlier than the timeout and before the 1000 ms mark.
  assertTrue(elapsedMs >= 500);
  assertTrue(elapsedMs < 1000);
  // Give the background task time to finish.
  Thread.sleep(2000);
  // The location should now be cached, so meta is not visited again.
  HRegionLocation cachedLocation = LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW,
    RegionLocateType.CURRENT, TimeUnit.MILLISECONDS.toNanos(500)).get();
  assertEquals(cachedLocation.getServerName(),
    TEST_UTIL.getHBaseCluster().getRegionServer(0).getServerName());
}
/**
 * Arms a timeout on the given future: if it is still pending after {@code timeoutNs}, it is
 * completed exceptionally with a {@link TimeoutIOException}.
 * <p>
 * The scheduled timeout task is cancelled as soon as the future completes for any other reason,
 * success included, so the timer does not keep holding the future and the message supplier
 * until the deadline.
 *
 * @param future the future to guard; returned unchanged
 * @param timeoutNs timeout in nanoseconds; a non-positive value disables the timeout
 * @param timeoutMsg supplies the exception message, evaluated only if the timeout fires
 * @return the same {@code future} instance that was passed in
 */
private <T> CompletableFuture<T> withTimeout(CompletableFuture<T> future, long timeoutNs,
    Supplier<String> timeoutMsg) {
  if (future.isDone() || timeoutNs <= 0) {
    return future;
  }
  Timeout timeoutTask = retryTimer.newTimeout(t -> {
    if (future.isDone()) {
      return;
    }
    future.completeExceptionally(new TimeoutIOException(timeoutMsg.get()));
  }, timeoutNs, TimeUnit.NANOSECONDS);
  FutureUtils.addListener(future, (loc, error) -> {
    // Cancel the timeout task unless it is what completed the future. This covers normal
    // completion (error == null) as well as failures other than the timeout itself.
    if (error == null || error.getClass() != TimeoutIOException.class) {
      timeoutTask.cancel();
    }
  });
  return future;
}
/**
 * Checks that {@code IPCUtil.wrapException} keeps the original exception type for every
 * connection exception type, except that a {@link TimeoutException} is expected to come back as
 * a {@link TimeoutIOException}. See HBASE-21862 for why preserving the type matters.
 */
@Test
public void testWrapConnectionException() throws Exception {
  // Instantiate every connection exception type up front, before asserting anything.
  List<Throwable> samples = new ArrayList<>();
  for (Class<? extends Throwable> type : ClientExceptionsUtil.getConnectionExceptionTypes()) {
    samples.add(create(type));
  }
  InetSocketAddress addr = InetSocketAddress.createUnresolved("127.0.0.1", 12345);
  for (Throwable sample : samples) {
    Class<?> expectedType =
      sample instanceof TimeoutException ? TimeoutIOException.class : sample.getClass();
    assertThat(IPCUtil.wrapException(addr, sample), instanceOf(expectedType));
  }
}
/**
 * Polls {@code predicate} until it yields a result that is neither {@code null} nor equal to
 * {@link Boolean#FALSE}, sleeping between attempts.
 *
 * @param env environment whose {@code isRunning()} state is checked after every attempt
 * @param waitTime total time budget, added to the current time to form the deadline
 * @param waitingTimeForEvents sleep between two evaluations, in milliseconds
 * @param purpose description of what is being waited for, used in log and error messages
 * @param predicate the condition to evaluate
 * @return the first accepted result of {@code predicate}
 * @throws InterruptedIOException if the thread is interrupted while sleeping
 * @throws TimeoutIOException if the deadline passes or {@code env} stops running first
 * @throws IOException declared for failures propagated from the predicate
 */
public static <T> T waitFor(MasterProcedureEnv env, long waitTime, long waitingTimeForEvents,
    String purpose, Predicate<T> predicate) throws IOException {
  final long rawDeadline = EnvironmentEdgeManager.currentTime() + waitTime;
  // A non-positive sum means the addition overflowed, usually because Long.MAX_VALUE was
  // passed as waitTime; treat that as "no deadline".
  final long deadline = rawDeadline <= 0 ? Long.MAX_VALUE : rawDeadline;
  boolean debugLogged = false;
  while (true) {
    T outcome = predicate.evaluate();
    boolean accepted = outcome != null && !outcome.equals(Boolean.FALSE);
    if (accepted) {
      return outcome;
    }
    try {
      Thread.sleep(waitingTimeForEvents);
    } catch (InterruptedException interrupted) {
      LOG.warn("Interrupted while sleeping, waiting on " + purpose);
      InterruptedIOException wrapped = new InterruptedIOException();
      wrapped.initCause(interrupted);
      throw wrapped;
    }
    // Trace logs every round; debug logs only the first one.
    if (LOG.isTraceEnabled()) {
      LOG.trace("waitFor " + purpose);
    } else if (!debugLogged) {
      LOG.debug("waitFor " + purpose);
    }
    debugLogged = true;
    if (EnvironmentEdgeManager.currentTime() >= deadline || !env.isRunning()) {
      break;
    }
  }
  throw new TimeoutIOException("Timed out while waiting on " + purpose);
}
/**
 * A completed future returns its txid from {@code get}; after {@code reset} the same call is
 * expected to fail with {@link TimeoutIOException}.
 */
@Test(expected = TimeoutIOException.class)
public void testGet() throws Exception {
  final long waitBudget = 5000;
  final long expectedTxid = 100000;
  SyncFuture future = new SyncFuture().reset(expectedTxid);
  // Mark it done with no error, so get() must hand back the txid.
  future.done(expectedTxid, null);
  assertEquals(expectedTxid, future.get(waitBudget));
  // After reset nothing is done yet, so this get() should throw the expected exception.
  future.reset(expectedTxid).get(waitBudget);
}
/**
 * Asserts that the given procedure failed and that the failure obtained from
 * {@code assertProcFailed} is a {@link TimeoutIOException}.
 *
 * @param result the procedure to inspect
 */
public static void assertIsTimeoutException(final Procedure<?> result) {
  final Throwable failure = assertProcFailed(result);
  final String message = "expected TimeoutIOException, got " + failure;
  final boolean isTimeout = failure instanceof TimeoutIOException;
  assertTrue(message, isTimeout);
}
/**
 * Acquires the lock for a row, waiting up to the given duration.
 *
 * @param rowKey the row key
 * @param waitDuration maximum time to wait for the lock, in milliseconds
 * @return the RowLock the caller later uses to release the lock
 * @throws TimeoutIOException if the lock could not be acquired within {@code waitDuration}
 * @throws InterruptedIOException if the thread is interrupted while waiting; the thread's
 *           interrupt flag is set again before the exception is thrown
 * @throws IOException declared supertype of the failures above
 */
public RowLock lockRow(ImmutableBytesPtr rowKey, int waitDuration) throws IOException {
  // When tracing is active, open a span to show how long the acquisition took.
  final TraceScope span =
    Trace.isTracing() ? Trace.startSpan("LockManager.getRowLock") : null;
  if (span != null) {
    span.getSpan().addTimelineAnnotation("Getting a lock");
  }
  RowLockContext context = null;
  boolean acquired = false;
  try {
    RowLockImpl lock;
    // Keep trying until a context hands out a lock or an error is thrown.
    // TODO: do we need to add a time component here?
    do {
      // Try to register a fresh context. If that works, no other transaction holds the row.
      context = new RowLockContext(rowKey);
      RowLockContext alreadyRegistered = lockedRows.putIfAbsent(rowKey, context);
      if (alreadyRegistered != null) {
        // A transaction is already running on this row; share its context.
        context = alreadyRegistered;
      }
      lock = context.newRowLock();
    } while (lock == null);
    boolean locked = lock.getLock().tryLock(waitDuration, TimeUnit.MILLISECONDS);
    if (!locked) {
      if (span != null) {
        span.getSpan().addTimelineAnnotation("Failed to get row lock");
      }
      throw new TimeoutIOException("Timed out waiting for lock for row: " + rowKey);
    }
    context.setThreadName(Thread.currentThread().getName());
    acquired = true;
    return lock;
  } catch (InterruptedException interrupted) {
    LOGGER.warn("Thread interrupted waiting for lock on row: " + rowKey);
    InterruptedIOException wrapped = new InterruptedIOException();
    wrapped.initCause(interrupted);
    if (span != null) {
      span.getSpan().addTimelineAnnotation("Interrupted exception getting row lock");
    }
    // Restore the interrupt flag for callers further up the stack.
    Thread.currentThread().interrupt();
    throw wrapped;
  } finally {
    // On failure, clean up the counts in case this attempt was what kept the context alive.
    if (!acquired && context != null) {
      context.cleanUp();
    }
    if (span != null) {
      span.close();
    }
  }
}
/**
 * Invoked by the ProcedureExecutor once the timeout configured through setTimeout() has expired.
 * <p>
 * This hook can also be used to implement retrying: a procedure sets its state to
 * {@code WAITING_TIMEOUT} via {@code setState}, throws a {@link ProcedureSuspendedException} to
 * halt its execution, and calls {@link #setTimeout(int)} to arm the timeout. It must then
 * override this method to wake itself up and return false, which tells the ProcedureExecutor
 * that the timeout event has already been handled.
 * @return true to let the framework treat the timeout as an abort, false if the procedure
 *         handled the timeout itself.
 */
protected synchronized boolean setTimeoutFailure(TEnvironment env) {
  // Only a procedure that is actually waiting on a timeout is failed here.
  if (state != ProcedureState.WAITING_TIMEOUT) {
    return false;
  }
  final long elapsed = EnvironmentEdgeManager.currentTime() - lastUpdate;
  final TimeoutIOException timeout =
    new TimeoutIOException("Operation timed out after " + StringUtils.humanTimeDiff(elapsed));
  setFailure("ProcedureExecutor", timeout);
  return true;
}