下面列出了java.io.InterruptedIOException#initCause ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
* Retrieves a task from the queue with the given timeout.
*
* @param useTimeout whether to use a timeout.
* @param timeoutNano Time to wait, in nanoseconds.
* @return A non-{@code null} Runnable from the queue.
* @throws InterruptedIOException
*/
private Runnable take(boolean useTimeout, long timeoutNano) throws InterruptedIOException {
Runnable task = null;
try {
if (!useTimeout) {
task = mQueue.take(); // Blocks if the queue is empty.
} else {
// poll returns null upon timeout.
task = mQueue.poll(timeoutNano, TimeUnit.NANOSECONDS);
}
} catch (InterruptedException e) {
InterruptedIOException exception = new InterruptedIOException();
exception.initCause(e);
throw exception;
}
if (task == null) {
// This will terminate the loop.
throw new SocketTimeoutException("loop timeout");
}
return task;
}
protected InternalScanner postCreateCoprocScanner(final CompactionRequest request,
final ScanType scanType, final InternalScanner scanner, User user) throws IOException {
if (store.getCoprocessorHost() == null) return scanner;
if (user == null) {
return store.getCoprocessorHost().preCompact(store, scanner, scanType, null, request, null);
} else {
try {
return user.getUGI().doAs(new PrivilegedExceptionAction<InternalScanner>() {
@Override
public InternalScanner run() throws Exception {
return store.getCoprocessorHost().preCompact(store, scanner, scanType, null, request, null);
}
});
} catch (InterruptedException ie) {
InterruptedIOException iioe = new InterruptedIOException();
iioe.initCause(ie);
throw iioe;
}
}
}
@Override
public int read() throws IOException {
synchronized(synchronizer) {
while(read_pos == write_pos) {
if(exception != null) {
throw exception;
}
if(isEOF) {
return -1;
}
try {
synchronizer.wait();
}
catch(InterruptedException e) {
final InterruptedIOException f = new InterruptedIOException(e.getMessage());
f.initCause(e);
throw f;
}
}
return buffer[read_pos++] & 0xff;
}
}
/**
* @return an InterruptedIOException if t was an interruption, null otherwise
*/
public static InterruptedIOException asInterrupt(Throwable t) {
if (t instanceof SocketTimeoutException) {
return null;
}
if (t instanceof InterruptedIOException) {
return (InterruptedIOException) t;
}
if (t instanceof InterruptedException || t instanceof ClosedByInterruptException) {
InterruptedIOException iie =
new InterruptedIOException("Origin: " + t.getClass().getSimpleName());
iie.initCause(t);
return iie;
}
return null;
}
protected ScanInfo preCreateCoprocScanner(final CompactionRequest request,
final ScanType scanType,
final long earliestPutTs,
final List<StoreFileScanner> scanners,
User user) throws IOException {
if (store.getCoprocessorHost() == null) return null;
if (user == null) {
return store.getCoprocessorHost().preCompactScannerOpen(store, scanType, null, request, null);
} else {
try {
return user.getUGI().doAs(new PrivilegedExceptionAction<ScanInfo>() {
@Override
public ScanInfo run() throws Exception {
return store.getCoprocessorHost().preCompactScannerOpen(store, scanType, null, request, null);
}
});
} catch (InterruptedException ie) {
InterruptedIOException iioe = new InterruptedIOException();
iioe.initCause(ie);
throw iioe;
}
}
}
/**
* Returns the stream's response headers, blocking if necessary if they
* have not been received yet.
*/
public synchronized List<String> getResponseHeaders() throws IOException {
try {
while (responseHeaders == null && errorCode == null) {
wait();
}
if (responseHeaders != null) {
return responseHeaders;
}
throw new IOException("stream was reset: " + errorCode);
} catch (InterruptedException e) {
InterruptedIOException rethrow = new InterruptedIOException();
rethrow.initCause(e);
throw rethrow;
}
}
/**
* Returns the stream's response headers, blocking if necessary if they
* have not been received yet.
*/
public synchronized List<String> getResponseHeaders() throws IOException {
long remaining = 0;
long start = 0;
if (readTimeoutMillis != 0) {
start = (System.nanoTime() / 1000000);
remaining = readTimeoutMillis;
}
try {
while (responseHeaders == null && errorCode == null) {
if (readTimeoutMillis == 0) { // No timeout configured.
wait();
} else if (remaining > 0) {
wait(remaining);
remaining = start + readTimeoutMillis - (System.nanoTime() / 1000000);
} else {
throw new SocketTimeoutException("Read response header timeout. readTimeoutMillis: "
+ readTimeoutMillis);
}
}
if (responseHeaders != null) {
return responseHeaders;
}
throw new IOException("stream was reset: " + errorCode);
} catch (InterruptedException e) {
InterruptedIOException rethrow = new InterruptedIOException();
rethrow.initCause(e);
throw rethrow;
}
}
/**
* Returns an {@link IOException} to represent a timeout. By default this
* method returns {@link java.io.InterruptedIOException}. If {@code cause}
* is non-null it is set as the cause of the returned exception.
*/
protected IOException newTimeoutException(IOException cause) {
InterruptedIOException e = new InterruptedIOException("timeout");
if (cause != null) {
e.initCause(cause);
}
return e;
}
/**
* Returns the stream's response headers, blocking if necessary if they
* have not been received yet.
*/
public synchronized List<String> getResponseHeaders() throws IOException {
long remaining = 0;
long start = 0;
if (readTimeoutMillis != 0) {
start = (System.nanoTime() / 1000000);
remaining = readTimeoutMillis;
}
try {
while (responseHeaders == null && errorCode == null) {
if (readTimeoutMillis == 0) { // No timeout configured.
wait();
} else if (remaining > 0) {
wait(remaining);
remaining = start + readTimeoutMillis - (System.nanoTime() / 1000000);
} else {
throw new SocketTimeoutException("Read response header timeout. readTimeoutMillis: "
+ readTimeoutMillis);
}
}
if (responseHeaders != null) {
return responseHeaders;
}
throw new IOException("stream was reset: " + errorCode);
} catch (InterruptedException e) {
InterruptedIOException rethrow = new InterruptedIOException();
rethrow.initCause(e);
throw rethrow;
}
}
/**
* Returns the parameter passed to {@link #run(Object)} or {@code null} if a null value was
* passed. When used asynchronously, this method will block until the {@link #run(Object)}
* method has been called.
* @return the response object or {@code null} if no response was passed
*/
public synchronized R get() throws IOException {
while (!resultSet) {
try {
this.wait();
} catch (InterruptedException ie) {
InterruptedIOException exception = new InterruptedIOException(ie.getMessage());
exception.initCause(ie);
throw exception;
}
}
return result;
}
/**
* Returns the stream's response headers, blocking if necessary if they
* have not been received yet.
*/
public synchronized List<String> getResponseHeaders() throws IOException {
long remaining = 0;
long start = 0;
if (readTimeoutMillis != 0) {
start = (System.nanoTime() / 1000000);
remaining = readTimeoutMillis;
}
try {
while (responseHeaders == null && errorCode == null) {
if (readTimeoutMillis == 0) { // No timeout configured.
wait();
} else if (remaining > 0) {
wait(remaining);
remaining = start + readTimeoutMillis - (System.nanoTime() / 1000000);
} else {
throw new SocketTimeoutException("Read response header timeout. readTimeoutMillis: "
+ readTimeoutMillis);
}
}
if (responseHeaders != null) {
return responseHeaders;
}
throw new IOException("stream was reset: " + errorCode);
} catch (InterruptedException e) {
InterruptedIOException rethrow = new InterruptedIOException();
rethrow.initCause(e);
throw rethrow;
}
}
/**
* Returns the stream's response headers, blocking if necessary if they
* have not been received yet.
*/
public synchronized List<String> getResponseHeaders() throws IOException {
long remaining = 0;
long start = 0;
if (readTimeoutMillis != 0) {
start = (System.nanoTime() / 1000000);
remaining = readTimeoutMillis;
}
try {
while (responseHeaders == null && errorCode == null) {
if (readTimeoutMillis == 0) { // No timeout configured.
wait();
} else if (remaining > 0) {
wait(remaining);
remaining = start + readTimeoutMillis - (System.nanoTime() / 1000000);
} else {
throw new SocketTimeoutException("Read response header timeout. readTimeoutMillis: "
+ readTimeoutMillis);
}
}
if (responseHeaders != null) {
return responseHeaders;
}
throw new IOException("stream was reset: " + errorCode);
} catch (InterruptedException e) {
InterruptedIOException rethrow = new InterruptedIOException();
rethrow.initCause(e);
throw rethrow;
}
}
/**
* Returns the stream's response headers, blocking if necessary if they
* have not been received yet.
*/
public synchronized List<String> getResponseHeaders() throws IOException {
long remaining = 0;
long start = 0;
if (readTimeoutMillis != 0) {
start = (System.nanoTime() / 1000000);
remaining = readTimeoutMillis;
}
try {
while (responseHeaders == null && errorCode == null) {
if (readTimeoutMillis == 0) { // No timeout configured.
wait();
} else if (remaining > 0) {
wait(remaining);
remaining = start + readTimeoutMillis - (System.nanoTime() / 1000000);
} else {
throw new SocketTimeoutException("Read response header timeout. readTimeoutMillis: "
+ readTimeoutMillis);
}
}
if (responseHeaders != null) {
return responseHeaders;
}
throw new IOException("stream was reset: " + errorCode);
} catch (InterruptedException e) {
InterruptedIOException rethrow = new InterruptedIOException();
rethrow.initCause(e);
throw rethrow;
}
}
/**
* Returns an {@link IOException} to represent a timeout. By default this method returns
* {@link java.io.InterruptedIOException}. If {@code cause} is non-null it is set as the cause of
* the returned exception.
*/
protected IOException newTimeoutException(IOException cause) {
InterruptedIOException e = new InterruptedIOException("timeout");
if (cause != null) {
e.initCause(cause);
}
return e;
}
/**
* Returns the parameter passed to {@link #run(Object)} or {@code null} if a null value was
* passed. When used asynchronously, this method will block until the {@link #run(Object)} method
* has been called.
* @return the response object or {@code null} if no response was passed
*/
public synchronized R get() throws IOException {
while (!resultSet) {
try {
this.wait();
} catch (InterruptedException ie) {
InterruptedIOException exception = new InterruptedIOException(ie.getMessage());
exception.initCause(ie);
throw exception;
}
}
return result;
}
void processLine(String line) throws IOException {
// Does it look like a data of json structured output?
Matcher matcher = LOG_LINE_REGEX.matcher(line);
if (!matcher.matches()) {
return;
}
String time = matcher.group(1);
String data = matcher.group(2);
try {
@SuppressWarnings("unchecked")
Map<String, Object> json = JsonUtils.reader().forType(HashMap.class).readValue(data); //NOPMD
// are the required fields set?
String exchange = validate((String) json.remove("exchange"));
if (exchange == null) {
// This log entry is not valid json format
return;
}
long keyTimeMillis = KeyGenerator.getKeyTimeMillis(exchange);
long until = now() - logsController.getRetentionTime().toMillis();
if (keyTimeMillis < until) {
// This log entry is too old.. don't process it..
return;
}
InflightData inflightData = getInflightData(exchange, time);
String id = validate((String) json.remove("id"));
String step = (String) json.remove("step");
if (step == null) {
// Looks like an exchange level logging event.
processLogLineExchange(json, inflightData, exchange, time);
} else {
// Looks like a step level logging event.
processLogLineStep(json, inflightData, step, id);
}
} catch (JsonDBException | ClassCastException | IOException ignored) {
/// log record not in the expected format.
} catch (InterruptedException e) {
final InterruptedIOException rethrow = new InterruptedIOException(e.getMessage());
rethrow.initCause(e);
throw rethrow;
}
}
private static boolean recoverDFSFileLease(final DistributedFileSystem dfs, final Path p,
final Configuration conf, final CancelableProgressable reporter) throws IOException {
LOG.info("Recover lease on dfs file " + p);
long startWaiting = EnvironmentEdgeManager.currentTime();
// Default is 15 minutes. It's huge, but the idea is that if we have a major issue, HDFS
// usually needs 10 minutes before marking the nodes as dead. So we're putting ourselves
// beyond that limit 'to be safe'.
long recoveryTimeout = conf.getInt("hbase.lease.recovery.timeout", 900000) + startWaiting;
// This setting should be a little bit above what the cluster dfs heartbeat is set to.
long firstPause = conf.getInt("hbase.lease.recovery.first.pause", 4000);
// This should be set to how long it'll take for us to timeout against primary datanode if it
// is dead. We set it to 64 seconds, 4 second than the default READ_TIMEOUT in HDFS, the
// default value for DFS_CLIENT_SOCKET_TIMEOUT_KEY. If recovery is still failing after this
// timeout, then further recovery will take liner backoff with this base, to avoid endless
// preemptions when this value is not properly configured.
long subsequentPauseBase = conf.getLong("hbase.lease.recovery.dfs.timeout", 64 * 1000);
Method isFileClosedMeth = null;
// whether we need to look for isFileClosed method
boolean findIsFileClosedMeth = true;
boolean recovered = false;
// We break the loop if we succeed the lease recovery, timeout, or we throw an exception.
for (int nbAttempt = 0; !recovered; nbAttempt++) {
recovered = recoverLease(dfs, nbAttempt, p, startWaiting);
if (recovered) {
break;
}
checkIfCancelled(reporter);
if (checkIfTimedout(conf, recoveryTimeout, nbAttempt, p, startWaiting)) {
break;
}
try {
// On the first time through wait the short 'firstPause'.
if (nbAttempt == 0) {
Thread.sleep(firstPause);
} else {
// Cycle here until (subsequentPause * nbAttempt) elapses. While spinning, check
// isFileClosed if available (should be in hadoop 2.0.5... not in hadoop 1 though.
long localStartWaiting = EnvironmentEdgeManager.currentTime();
while ((EnvironmentEdgeManager.currentTime() - localStartWaiting) < subsequentPauseBase *
nbAttempt) {
Thread.sleep(conf.getInt("hbase.lease.recovery.pause", 1000));
if (findIsFileClosedMeth) {
try {
isFileClosedMeth =
dfs.getClass().getMethod("isFileClosed", new Class[] { Path.class });
} catch (NoSuchMethodException nsme) {
LOG.debug("isFileClosed not available");
} finally {
findIsFileClosedMeth = false;
}
}
if (isFileClosedMeth != null && isFileClosed(dfs, isFileClosedMeth, p)) {
recovered = true;
break;
}
checkIfCancelled(reporter);
}
}
} catch (InterruptedException ie) {
InterruptedIOException iioe = new InterruptedIOException();
iioe.initCause(ie);
throw iioe;
}
}
return recovered;
}
/**
* Lock the row or throw otherwise
* @param rowKey the row key
* @return RowLock used to eventually release the lock
* @throws TimeoutIOException if the lock could not be acquired within the
* allowed rowLockWaitDuration and InterruptedException if interrupted while
* waiting to acquire lock.
*/
public RowLock lockRow(ImmutableBytesPtr rowKey, int waitDuration) throws IOException {
RowLockContext rowLockContext = null;
RowLockImpl result = null;
TraceScope traceScope = null;
// If we're tracing start a span to show how long this took.
if (Trace.isTracing()) {
traceScope = Trace.startSpan("LockManager.getRowLock");
traceScope.getSpan().addTimelineAnnotation("Getting a lock");
}
boolean success = false;
try {
// Keep trying until we have a lock or error out.
// TODO: do we need to add a time component here?
while (result == null) {
// Try adding a RowLockContext to the lockedRows.
// If we can add it then there's no other transactions currently running.
rowLockContext = new RowLockContext(rowKey);
RowLockContext existingContext = lockedRows.putIfAbsent(rowKey, rowLockContext);
// if there was a running transaction then there's already a context.
if (existingContext != null) {
rowLockContext = existingContext;
}
result = rowLockContext.newRowLock();
}
if (!result.getLock().tryLock(waitDuration, TimeUnit.MILLISECONDS)) {
if (traceScope != null) {
traceScope.getSpan().addTimelineAnnotation("Failed to get row lock");
}
throw new TimeoutIOException("Timed out waiting for lock for row: " + rowKey);
}
rowLockContext.setThreadName(Thread.currentThread().getName());
success = true;
return result;
} catch (InterruptedException ie) {
LOGGER.warn("Thread interrupted waiting for lock on row: " + rowKey);
InterruptedIOException iie = new InterruptedIOException();
iie.initCause(ie);
if (traceScope != null) {
traceScope.getSpan().addTimelineAnnotation("Interrupted exception getting row lock");
}
Thread.currentThread().interrupt();
throw iie;
} finally {
// On failure, clean up the counts just in case this was the thing keeping the context alive.
if (!success && rowLockContext != null) rowLockContext.cleanUp();
if (traceScope != null) {
traceScope.close();
}
}
}
static InterruptedIOException toInterruptedIOException(
String message, InterruptedException e) {
final InterruptedIOException iioe = new InterruptedIOException(message);
iioe.initCause(e);
return iioe;
}
@Override
public void close() throws IOException {
synchronized (this) {
if (closed) {
return;
}
closed = true;
// Don't allow any new request
channelAssociation.shutdown();
// First close the channel and connection
if (strategy != null) {
StreamUtils.safeClose(strategy);
strategy = null;
}
// Then the endpoint
final Endpoint endpoint = this.endpoint;
if (endpoint != null) {
this.endpoint = null;
try {
endpoint.closeAsync();
} catch (UnsupportedOperationException ignored) {
}
}
// Cancel all still active operations
channelAssociation.shutdownNow();
try {
channelAssociation.awaitCompletion(1, TimeUnit.SECONDS);
} catch (InterruptedException ignore) {
Thread.currentThread().interrupt();
} finally {
StreamUtils.safeClose(clientConfiguration);
}
// Per WFCORE-1573 remoting endpoints should be closed asynchronously, however consumers of this client
// likely need to wait until the endpoints are fully shutdown.
if (endpoint != null) try {
endpoint.awaitClosed();
} catch (InterruptedException e) {
final InterruptedIOException cause = new InterruptedIOException(e.getLocalizedMessage());
cause.initCause(e);
throw cause;
}
}
}