下面列出了怎么用org.apache.hadoop.io.retry.RetryPolicy.RetryAction的API类实例代码及写法,或者点击链接到github查看源代码。
/**
* The execute() method invokes doExecute() until either: 1. doExecute() succeeds, or 2. the command may no longer be
* retried (e.g. runs out of retry-attempts).
*
* @param arguments The list of arguments for the command.
* @return Generic "Object" from doExecute(), on success.
* @throws IOException, IOException, on complete failure.
*/
public T execute(Object... arguments) throws Exception {
Exception latestException;
int counter = 0;
while (true) {
try {
return doExecute(arguments);
} catch (Exception exception) {
LOG.error("Failure in Retriable command: {}", description, exception);
latestException = exception;
}
counter++;
RetryAction action = retryPolicy.shouldRetry(latestException, counter, 0, true);
if (action.action == RetryPolicy.RetryAction.RetryDecision.RETRY) {
ThreadUtil.sleepAtLeastIgnoreInterrupts(action.delayMillis);
} else {
break;
}
}
throw new IOException("Couldn't run retriable-command: " + description, latestException);
}
/**
* The execute() method invokes doExecute() until either:
* 1. doExecute() succeeds, or
* 2. the command may no longer be retried (e.g. runs out of retry-attempts).
* @param arguments The list of arguments for the command.
* @return Generic "Object" from doExecute(), on success.
* @throws Exception
*/
public Object execute(Object... arguments) throws Exception {
Exception latestException;
int counter = 0;
while (true) {
try {
return doExecute(arguments);
} catch(Exception exception) {
LOG.error("Failure in Retriable command: " + description, exception);
latestException = exception;
}
counter++;
RetryAction action = retryPolicy.shouldRetry(latestException, counter, 0, true);
if (action.action == RetryPolicy.RetryAction.RetryDecision.RETRY) {
ThreadUtil.sleepAtLeastIgnoreInterrupts(action.delayMillis);
} else {
break;
}
}
throw new IOException("Couldn't run retriable-command: " + description,
latestException);
}
/**
* The execute() method invokes doExecute() until either:
* 1. doExecute() succeeds, or
* 2. the command may no longer be retried (e.g. runs out of retry-attempts).
* @param arguments The list of arguments for the command.
* @return Generic "Object" from doExecute(), on success.
* @throws Exception
*/
public Object execute(Object... arguments) throws Exception {
Exception latestException;
int counter = 0;
while (true) {
try {
return doExecute(arguments);
} catch(Exception exception) {
LOG.error("Failure in Retriable command: " + description, exception);
latestException = exception;
}
counter++;
RetryAction action = retryPolicy.shouldRetry(latestException, counter, 0, true);
if (action.action == RetryPolicy.RetryAction.RetryDecision.RETRY) {
ThreadUtil.sleepAtLeastIgnoreInterrupts(action.delayMillis);
} else {
break;
}
}
throw new IOException("Couldn't run retriable-command: " + description,
latestException);
}
@Override
public Object invoke(Object proxy, Method method, Object[] args)
throws Throwable {
RetryPolicy policy = methodNameToPolicyMap.get(method.getName());
if (policy == null) {
policy = defaultPolicy;
}
// The number of times this method invocation has been failed over.
int invocationFailoverCount = 0;
final boolean isRpc = isRpcInvocation(currentProxy.proxy);
final int callId = isRpc? Client.nextCallId(): RpcConstants.INVALID_CALL_ID;
int retries = 0;
while (true) {
// The number of times this invocation handler has ever been failed over,
// before this method invocation attempt. Used to prevent concurrent
// failed method invocations from triggering multiple failover attempts.
long invocationAttemptFailoverCount;
synchronized (proxyProvider) {
invocationAttemptFailoverCount = proxyProviderFailoverCount;
}
if (isRpc) {
Client.setCallIdAndRetryCount(callId, retries);
}
try {
Object ret = invokeMethod(method, args);
hasMadeASuccessfulCall = true;
return ret;
} catch (Exception e) {
boolean isIdempotentOrAtMostOnce = proxyProvider.getInterface()
.getMethod(method.getName(), method.getParameterTypes())
.isAnnotationPresent(Idempotent.class);
if (!isIdempotentOrAtMostOnce) {
isIdempotentOrAtMostOnce = proxyProvider.getInterface()
.getMethod(method.getName(), method.getParameterTypes())
.isAnnotationPresent(AtMostOnce.class);
}
RetryAction action = policy.shouldRetry(e, retries++,
invocationFailoverCount, isIdempotentOrAtMostOnce);
if (action.action == RetryAction.RetryDecision.FAIL) {
if (action.reason != null) {
LOG.warn("Exception while invoking " + currentProxy.proxy.getClass()
+ "." + method.getName() + " over " + currentProxy.proxyInfo
+ ". Not retrying because " + action.reason, e);
}
throw e;
} else { // retry or failover
// avoid logging the failover if this is the first call on this
// proxy object, and we successfully achieve the failover without
// any flip-flopping
boolean worthLogging =
!(invocationFailoverCount == 0 && !hasMadeASuccessfulCall);
worthLogging |= LOG.isDebugEnabled();
if (action.action == RetryAction.RetryDecision.FAILOVER_AND_RETRY &&
worthLogging) {
String msg = "Exception while invoking " + method.getName()
+ " of class " + currentProxy.proxy.getClass().getSimpleName()
+ " over " + currentProxy.proxyInfo;
if (invocationFailoverCount > 0) {
msg += " after " + invocationFailoverCount + " fail over attempts";
}
msg += ". Trying to fail over " + formatSleepMessage(action.delayMillis);
LOG.info(msg, e);
} else {
if(LOG.isDebugEnabled()) {
LOG.debug("Exception while invoking " + method.getName()
+ " of class " + currentProxy.proxy.getClass().getSimpleName()
+ " over " + currentProxy.proxyInfo + ". Retrying "
+ formatSleepMessage(action.delayMillis), e);
}
}
if (action.delayMillis > 0) {
Thread.sleep(action.delayMillis);
}
if (action.action == RetryAction.RetryDecision.FAILOVER_AND_RETRY) {
// Make sure that concurrent failed method invocations only cause a
// single actual fail over.
synchronized (proxyProvider) {
if (invocationAttemptFailoverCount == proxyProviderFailoverCount) {
proxyProvider.performFailover(currentProxy.proxy);
proxyProviderFailoverCount++;
} else {
LOG.warn("A failover has occurred since the start of this method"
+ " invocation attempt.");
}
currentProxy = proxyProvider.getProxy();
}
invocationFailoverCount++;
}
}
}
}
}
@Override
public Object invoke(Object proxy, Method method, Object[] args)
throws Throwable {
RetryPolicy policy = methodNameToPolicyMap.get(method.getName());
if (policy == null) {
policy = defaultPolicy;
}
// The number of times this method invocation has been failed over.
int invocationFailoverCount = 0;
final boolean isRpc = isRpcInvocation(currentProxy.proxy);
final int callId = isRpc? Client.nextCallId(): RpcConstants.INVALID_CALL_ID;
int retries = 0;
while (true) {
// The number of times this invocation handler has ever been failed over,
// before this method invocation attempt. Used to prevent concurrent
// failed method invocations from triggering multiple failover attempts.
long invocationAttemptFailoverCount;
synchronized (proxyProvider) {
invocationAttemptFailoverCount = proxyProviderFailoverCount;
}
if (isRpc) {
Client.setCallIdAndRetryCount(callId, retries);
}
try {
Object ret = invokeMethod(method, args);
hasMadeASuccessfulCall = true;
return ret;
} catch (Exception e) {
boolean isIdempotentOrAtMostOnce = proxyProvider.getInterface()
.getMethod(method.getName(), method.getParameterTypes())
.isAnnotationPresent(Idempotent.class);
if (!isIdempotentOrAtMostOnce) {
isIdempotentOrAtMostOnce = proxyProvider.getInterface()
.getMethod(method.getName(), method.getParameterTypes())
.isAnnotationPresent(AtMostOnce.class);
}
RetryAction action = policy.shouldRetry(e, retries++,
invocationFailoverCount, isIdempotentOrAtMostOnce);
if (action.action == RetryAction.RetryDecision.FAIL) {
if (action.reason != null) {
LOG.warn("Exception while invoking " + currentProxy.proxy.getClass()
+ "." + method.getName() + " over " + currentProxy.proxyInfo
+ ". Not retrying because " + action.reason, e);
}
throw e;
} else { // retry or failover
// avoid logging the failover if this is the first call on this
// proxy object, and we successfully achieve the failover without
// any flip-flopping
boolean worthLogging =
!(invocationFailoverCount == 0 && !hasMadeASuccessfulCall);
worthLogging |= LOG.isDebugEnabled();
if (action.action == RetryAction.RetryDecision.FAILOVER_AND_RETRY &&
worthLogging) {
String msg = "Exception while invoking " + method.getName()
+ " of class " + currentProxy.proxy.getClass().getSimpleName()
+ " over " + currentProxy.proxyInfo;
if (invocationFailoverCount > 0) {
msg += " after " + invocationFailoverCount + " fail over attempts";
}
msg += ". Trying to fail over " + formatSleepMessage(action.delayMillis);
LOG.info(msg, e);
} else {
if(LOG.isDebugEnabled()) {
LOG.debug("Exception while invoking " + method.getName()
+ " of class " + currentProxy.proxy.getClass().getSimpleName()
+ " over " + currentProxy.proxyInfo + ". Retrying "
+ formatSleepMessage(action.delayMillis), e);
}
}
if (action.delayMillis > 0) {
Thread.sleep(action.delayMillis);
}
if (action.action == RetryAction.RetryDecision.FAILOVER_AND_RETRY) {
// Make sure that concurrent failed method invocations only cause a
// single actual fail over.
synchronized (proxyProvider) {
if (invocationAttemptFailoverCount == proxyProviderFailoverCount) {
proxyProvider.performFailover(currentProxy.proxy);
proxyProviderFailoverCount++;
} else {
LOG.warn("A failover has occurred since the start of this method"
+ " invocation attempt.");
}
currentProxy = proxyProvider.getProxy();
}
invocationFailoverCount++;
}
}
}
}
}