下面列出了 org.apache.hadoop.io.retry.RetryPolicies#RETRY_FOREVER 的实例代码，或者点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Wraps a callable that returns a fixed string in a {@code RetriableTask}
 * configured with {@code RetryPolicies.RETRY_FOREVER} and checks that
 * {@code call()} yields that same string.
 */
@Test
public void returnsSuccessfulResult() throws Exception {
  final String expected = "bilbo";
  final RetriableTask<String> retriable =
      new RetriableTask<>(RetryPolicies.RETRY_FOREVER, "test", () -> expected);

  final String actual = retriable.call();

  assertEquals(expected, actual);
}
/**
 * Wraps a callable that throws on its first two invocations and returns a
 * fixed string afterwards in a {@code RetriableTask} configured with
 * {@code RetryPolicies.RETRY_FOREVER}, then checks that {@code call()}
 * yields that string.
 */
@Test
public void returnsSuccessfulResultAfterFailures() throws Exception {
  final String expected = "gandalf";
  final AtomicInteger invocations = new AtomicInteger();

  final RetriableTask<String> retriable = new RetriableTask<>(
      RetryPolicies.RETRY_FOREVER, "test",
      () -> {
        // Invocations 1 and 2 fail; invocation 3 onwards succeeds.
        final int attempt = invocations.incrementAndGet();
        if (attempt > 2) {
          return expected;
        }
        throw new Exception("testing");
      });

  final String actual = retriable.call();

  assertEquals(expected, actual);
}
/**
 * Builds a retry policy from two settings looked up in {@code conf}.
 *
 * <p>When the maximum wait time resolves to {@code -1} the method returns
 * {@code RetryPolicies.RETRY_FOREVER} directly, without validating the retry
 * interval. Otherwise both values must be strictly positive. They are
 * interpreted as milliseconds and used to build a fixed-sleep,
 * bounded-total-time policy. That policy is registered for a fixed set of
 * exception types, and {@code RetryPolicies.TRY_ONCE_THEN_FAIL} is passed to
 * {@code RetryPolicies.retryByException} as the default policy.
 *
 * @param conf configuration to read both settings from
 * @param maxWaitTimeStr configuration key holding the maximum wait time
 * @param defMaxWaitTime value used when {@code maxWaitTimeStr} is not set
 * @param connectRetryIntervalStr configuration key holding the retry interval
 * @param defRetryInterval value used when {@code connectRetryIntervalStr} is
 *     not set
 * @return the retry policy described above
 */
protected static RetryPolicy createRetryPolicy(Configuration conf,
    String maxWaitTimeStr, long defMaxWaitTime,
    String connectRetryIntervalStr, long defRetryInterval) {
  long maxWaitTime = conf.getLong(maxWaitTimeStr, defMaxWaitTime);
  long retryIntervalMS =
      conf.getLong(connectRetryIntervalStr, defRetryInterval);

  if (maxWaitTime == -1) {
    // -1 is the sentinel for "wait forever".
    return RetryPolicies.RETRY_FOREVER;
  }

  // Any other value must be strictly positive; checkArgument rejects the rest.
  Preconditions.checkArgument(maxWaitTime > 0, "Invalid Configuration. "
      + maxWaitTimeStr + " should be a positive value.");
  // The leading space keeps the key name separated from the sentence.
  Preconditions.checkArgument(retryIntervalMS > 0, "Invalid Configuration. "
      + connectRetryIntervalStr + " should be a positive value.");

  RetryPolicy retryPolicy =
      RetryPolicies.retryUpToMaximumTimeWithFixedSleep(maxWaitTime,
          retryIntervalMS, TimeUnit.MILLISECONDS);

  // Exception types that get the bounded retry policy.
  Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap =
      new HashMap<>();
  exceptionToPolicyMap.put(EOFException.class, retryPolicy);
  exceptionToPolicyMap.put(ConnectException.class, retryPolicy);
  exceptionToPolicyMap.put(NoRouteToHostException.class, retryPolicy);
  exceptionToPolicyMap.put(UnknownHostException.class, retryPolicy);
  exceptionToPolicyMap.put(RetriableException.class, retryPolicy);
  exceptionToPolicyMap.put(SocketException.class, retryPolicy);
  exceptionToPolicyMap.put(NMNotYetReadyException.class, retryPolicy);

  return RetryPolicies.retryByException(RetryPolicies.TRY_ONCE_THEN_FAIL,
      exceptionToPolicyMap);
}
/**
 * Builds a retry policy from two settings looked up in {@code conf}.
 *
 * <p>When the maximum wait time resolves to {@code -1} the method returns
 * {@code RetryPolicies.RETRY_FOREVER} directly, without validating the retry
 * interval. Otherwise both values must be strictly positive. They are
 * interpreted as milliseconds and used to build a fixed-sleep,
 * bounded-total-time policy. That policy is registered for a fixed set of
 * exception types, and {@code RetryPolicies.TRY_ONCE_THEN_FAIL} is passed to
 * {@code RetryPolicies.retryByException} as the default policy.
 *
 * @param conf configuration to read both settings from
 * @param maxWaitTimeStr configuration key holding the maximum wait time
 * @param defMaxWaitTime value used when {@code maxWaitTimeStr} is not set
 * @param connectRetryIntervalStr configuration key holding the retry interval
 * @param defRetryInterval value used when {@code connectRetryIntervalStr} is
 *     not set
 * @return the retry policy described above
 */
protected static RetryPolicy createRetryPolicy(Configuration conf,
    String maxWaitTimeStr, long defMaxWaitTime,
    String connectRetryIntervalStr, long defRetryInterval) {
  long maxWaitTime = conf.getLong(maxWaitTimeStr, defMaxWaitTime);
  long retryIntervalMS =
      conf.getLong(connectRetryIntervalStr, defRetryInterval);

  if (maxWaitTime == -1) {
    // -1 is the sentinel for "wait forever".
    return RetryPolicies.RETRY_FOREVER;
  }

  // Any other value must be strictly positive; checkArgument rejects the rest.
  Preconditions.checkArgument(maxWaitTime > 0, "Invalid Configuration. "
      + maxWaitTimeStr + " should be a positive value.");
  // The leading space keeps the key name separated from the sentence.
  Preconditions.checkArgument(retryIntervalMS > 0, "Invalid Configuration. "
      + connectRetryIntervalStr + " should be a positive value.");

  RetryPolicy retryPolicy =
      RetryPolicies.retryUpToMaximumTimeWithFixedSleep(maxWaitTime,
          retryIntervalMS, TimeUnit.MILLISECONDS);

  // Exception types that get the bounded retry policy.
  Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap =
      new HashMap<>();
  exceptionToPolicyMap.put(EOFException.class, retryPolicy);
  exceptionToPolicyMap.put(ConnectException.class, retryPolicy);
  exceptionToPolicyMap.put(NoRouteToHostException.class, retryPolicy);
  exceptionToPolicyMap.put(UnknownHostException.class, retryPolicy);
  exceptionToPolicyMap.put(RetriableException.class, retryPolicy);
  exceptionToPolicyMap.put(SocketException.class, retryPolicy);
  exceptionToPolicyMap.put(NMNotYetReadyException.class, retryPolicy);

  return RetryPolicies.retryByException(RetryPolicies.TRY_ONCE_THEN_FAIL,
      exceptionToPolicyMap);
}
/**
 * Fetch retry policy from Configuration.
 *
 * <p>Reads the ResourceManager connect max-wait and retry-interval settings
 * (both used as milliseconds). A max-wait of {@code -1} means "retry
 * forever"; any other negative value is rejected. If the max-wait is smaller
 * than the retry interval, a warning is logged and the max-wait is set to
 * {@code 0}.
 *
 * <p>When HA is enabled, the method returns a failover policy built with
 * {@code RetryPolicies.failoverOnNetworkException}. The failover sleep base
 * and sleep max default to the retry interval. If the maximum number of
 * failover attempts is {@code -1} (its default), it is derived as
 * {@code Integer.MAX_VALUE} when waiting forever, otherwise as max-wait
 * divided by the failover sleep base, capped at {@code Integer.MAX_VALUE}.
 *
 * <p>When HA is not enabled, the retry interval must be non-negative. A
 * forever or fixed-sleep bounded-time policy is registered for a fixed set of
 * exception types, with {@code RetryPolicies.TRY_ONCE_THEN_FAIL} passed to
 * {@code RetryPolicies.retryByException} as the default policy.
 *
 * @param conf configuration to read the settings from
 * @return the retry policy described above
 * @throws YarnRuntimeException if the max-wait is negative but not
 *     {@code -1}; if, with HA enabled, the attempt count has to be derived
 *     and the failover sleep base is {@code 0}; or if, with HA disabled, the
 *     retry interval is negative
 */
@Private
@VisibleForTesting
public static RetryPolicy createRetryPolicy(Configuration conf) {
  long rmConnectWaitMS =
      conf.getLong(
          YarnConfiguration.RESOURCEMANAGER_CONNECT_MAX_WAIT_MS,
          YarnConfiguration.DEFAULT_RESOURCEMANAGER_CONNECT_MAX_WAIT_MS);
  long rmConnectionRetryIntervalMS =
      conf.getLong(
          YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS,
          YarnConfiguration
              .DEFAULT_RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS);

  boolean waitForEver = (rmConnectWaitMS == -1);
  if (!waitForEver) {
    if (rmConnectWaitMS < 0) {
      throw new YarnRuntimeException("Invalid Configuration. "
          + YarnConfiguration.RESOURCEMANAGER_CONNECT_MAX_WAIT_MS
          + " can be -1, but can not be other negative numbers");
    }

    // try connect once
    if (rmConnectWaitMS < rmConnectionRetryIntervalMS) {
      LOG.warn(YarnConfiguration.RESOURCEMANAGER_CONNECT_MAX_WAIT_MS
          + " is smaller than "
          + YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS
          + ". Only try connect once.");
      rmConnectWaitMS = 0;
    }
  }

  // Handle HA case first
  if (HAUtil.isHAEnabled(conf)) {
    final long failoverSleepBaseMs = conf.getLong(
        YarnConfiguration.CLIENT_FAILOVER_SLEEPTIME_BASE_MS,
        rmConnectionRetryIntervalMS);

    final long failoverSleepMaxMs = conf.getLong(
        YarnConfiguration.CLIENT_FAILOVER_SLEEPTIME_MAX_MS,
        rmConnectionRetryIntervalMS);

    int maxFailoverAttempts = conf.getInt(
        YarnConfiguration.CLIENT_FAILOVER_MAX_ATTEMPTS, -1);

    if (maxFailoverAttempts == -1) {
      if (waitForEver) {
        maxFailoverAttempts = Integer.MAX_VALUE;
      } else {
        // A zero sleep base would make the division below throw a bare
        // ArithmeticException; report it as a configuration error instead.
        if (failoverSleepBaseMs == 0) {
          throw new YarnRuntimeException("Invalid Configuration. "
              + YarnConfiguration.CLIENT_FAILOVER_SLEEPTIME_BASE_MS
              + " (which defaults to "
              + YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS
              + ") should not be 0 when "
              + YarnConfiguration.CLIENT_FAILOVER_MAX_ATTEMPTS
              + " has to be derived from "
              + YarnConfiguration.RESOURCEMANAGER_CONNECT_MAX_WAIT_MS + ".");
        }
        // Cap before narrowing so a large quotient cannot wrap around.
        maxFailoverAttempts = (int) Math.min(Integer.MAX_VALUE,
            rmConnectWaitMS / failoverSleepBaseMs);
      }
    }

    return RetryPolicies.failoverOnNetworkException(
        RetryPolicies.TRY_ONCE_THEN_FAIL, maxFailoverAttempts,
        failoverSleepBaseMs, failoverSleepMaxMs);
  }

  if (rmConnectionRetryIntervalMS < 0) {
    throw new YarnRuntimeException("Invalid Configuration. " +
        YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS +
        " should not be negative.");
  }

  RetryPolicy retryPolicy = null;
  if (waitForEver) {
    retryPolicy = RetryPolicies.RETRY_FOREVER;
  } else {
    retryPolicy =
        RetryPolicies.retryUpToMaximumTimeWithFixedSleep(rmConnectWaitMS,
            rmConnectionRetryIntervalMS, TimeUnit.MILLISECONDS);
  }

  // Exception types that get the retry policy chosen above.
  Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap =
      new HashMap<>();
  exceptionToPolicyMap.put(EOFException.class, retryPolicy);
  exceptionToPolicyMap.put(ConnectException.class, retryPolicy);
  exceptionToPolicyMap.put(NoRouteToHostException.class, retryPolicy);
  exceptionToPolicyMap.put(UnknownHostException.class, retryPolicy);
  exceptionToPolicyMap.put(ConnectTimeoutException.class, retryPolicy);
  exceptionToPolicyMap.put(RetriableException.class, retryPolicy);
  exceptionToPolicyMap.put(SocketException.class, retryPolicy);

  return RetryPolicies.retryByException(
      RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap);
}
/**
 * Fetch retry policy from Configuration.
 *
 * <p>Reads the ResourceManager connect max-wait and retry-interval settings
 * (both used as milliseconds). A max-wait of {@code -1} means "retry
 * forever"; any other negative value is rejected. If the max-wait is smaller
 * than the retry interval, a warning is logged and the max-wait is set to
 * {@code 0}.
 *
 * <p>When HA is enabled, the method returns a failover policy built with
 * {@code RetryPolicies.failoverOnNetworkException}. The failover sleep base
 * and sleep max default to the retry interval. If the maximum number of
 * failover attempts is {@code -1} (its default), it is derived as
 * {@code Integer.MAX_VALUE} when waiting forever, otherwise as max-wait
 * divided by the failover sleep base, capped at {@code Integer.MAX_VALUE}.
 *
 * <p>When HA is not enabled, the retry interval must be non-negative. A
 * forever or fixed-sleep bounded-time policy is registered for a fixed set of
 * exception types, with {@code RetryPolicies.TRY_ONCE_THEN_FAIL} passed to
 * {@code RetryPolicies.retryByException} as the default policy.
 *
 * @param conf configuration to read the settings from
 * @return the retry policy described above
 * @throws YarnRuntimeException if the max-wait is negative but not
 *     {@code -1}; if, with HA enabled, the attempt count has to be derived
 *     and the failover sleep base is {@code 0}; or if, with HA disabled, the
 *     retry interval is negative
 */
@Private
@VisibleForTesting
public static RetryPolicy createRetryPolicy(Configuration conf) {
  long rmConnectWaitMS =
      conf.getLong(
          YarnConfiguration.RESOURCEMANAGER_CONNECT_MAX_WAIT_MS,
          YarnConfiguration.DEFAULT_RESOURCEMANAGER_CONNECT_MAX_WAIT_MS);
  long rmConnectionRetryIntervalMS =
      conf.getLong(
          YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS,
          YarnConfiguration
              .DEFAULT_RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS);

  boolean waitForEver = (rmConnectWaitMS == -1);
  if (!waitForEver) {
    if (rmConnectWaitMS < 0) {
      throw new YarnRuntimeException("Invalid Configuration. "
          + YarnConfiguration.RESOURCEMANAGER_CONNECT_MAX_WAIT_MS
          + " can be -1, but can not be other negative numbers");
    }

    // try connect once
    if (rmConnectWaitMS < rmConnectionRetryIntervalMS) {
      LOG.warn(YarnConfiguration.RESOURCEMANAGER_CONNECT_MAX_WAIT_MS
          + " is smaller than "
          + YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS
          + ". Only try connect once.");
      rmConnectWaitMS = 0;
    }
  }

  // Handle HA case first
  if (HAUtil.isHAEnabled(conf)) {
    final long failoverSleepBaseMs = conf.getLong(
        YarnConfiguration.CLIENT_FAILOVER_SLEEPTIME_BASE_MS,
        rmConnectionRetryIntervalMS);

    final long failoverSleepMaxMs = conf.getLong(
        YarnConfiguration.CLIENT_FAILOVER_SLEEPTIME_MAX_MS,
        rmConnectionRetryIntervalMS);

    int maxFailoverAttempts = conf.getInt(
        YarnConfiguration.CLIENT_FAILOVER_MAX_ATTEMPTS, -1);

    if (maxFailoverAttempts == -1) {
      if (waitForEver) {
        maxFailoverAttempts = Integer.MAX_VALUE;
      } else {
        // A zero sleep base would make the division below throw a bare
        // ArithmeticException; report it as a configuration error instead.
        if (failoverSleepBaseMs == 0) {
          throw new YarnRuntimeException("Invalid Configuration. "
              + YarnConfiguration.CLIENT_FAILOVER_SLEEPTIME_BASE_MS
              + " (which defaults to "
              + YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS
              + ") should not be 0 when "
              + YarnConfiguration.CLIENT_FAILOVER_MAX_ATTEMPTS
              + " has to be derived from "
              + YarnConfiguration.RESOURCEMANAGER_CONNECT_MAX_WAIT_MS + ".");
        }
        // Cap before narrowing so a large quotient cannot wrap around.
        maxFailoverAttempts = (int) Math.min(Integer.MAX_VALUE,
            rmConnectWaitMS / failoverSleepBaseMs);
      }
    }

    return RetryPolicies.failoverOnNetworkException(
        RetryPolicies.TRY_ONCE_THEN_FAIL, maxFailoverAttempts,
        failoverSleepBaseMs, failoverSleepMaxMs);
  }

  if (rmConnectionRetryIntervalMS < 0) {
    throw new YarnRuntimeException("Invalid Configuration. " +
        YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS +
        " should not be negative.");
  }

  RetryPolicy retryPolicy = null;
  if (waitForEver) {
    retryPolicy = RetryPolicies.RETRY_FOREVER;
  } else {
    retryPolicy =
        RetryPolicies.retryUpToMaximumTimeWithFixedSleep(rmConnectWaitMS,
            rmConnectionRetryIntervalMS, TimeUnit.MILLISECONDS);
  }

  // Exception types that get the retry policy chosen above.
  Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap =
      new HashMap<>();
  exceptionToPolicyMap.put(EOFException.class, retryPolicy);
  exceptionToPolicyMap.put(ConnectException.class, retryPolicy);
  exceptionToPolicyMap.put(NoRouteToHostException.class, retryPolicy);
  exceptionToPolicyMap.put(UnknownHostException.class, retryPolicy);
  exceptionToPolicyMap.put(ConnectTimeoutException.class, retryPolicy);
  exceptionToPolicyMap.put(RetriableException.class, retryPolicy);
  exceptionToPolicyMap.put(SocketException.class, retryPolicy);

  return RetryPolicies.retryByException(
      RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap);
}