下面列出了org.apache.hadoop.io.retry.RetryPolicies#failoverOnNetworkException ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
private DFSClient genClientWithDummyHandler() throws IOException {
URI nnUri = dfs.getUri();
FailoverProxyProvider<ClientProtocol> failoverProxyProvider =
NameNodeProxies.createFailoverProxyProvider(conf,
nnUri, ClientProtocol.class, true, null);
InvocationHandler dummyHandler = new DummyRetryInvocationHandler(
failoverProxyProvider, RetryPolicies
.failoverOnNetworkException(RetryPolicies.TRY_ONCE_THEN_FAIL,
Integer.MAX_VALUE,
DFSConfigKeys.DFS_CLIENT_FAILOVER_SLEEPTIME_BASE_DEFAULT,
DFSConfigKeys.DFS_CLIENT_FAILOVER_SLEEPTIME_MAX_DEFAULT));
ClientProtocol proxy = (ClientProtocol) Proxy.newProxyInstance(
failoverProxyProvider.getInterface().getClassLoader(),
new Class[] { ClientProtocol.class }, dummyHandler);
DFSClient client = new DFSClient(null, proxy, conf, null);
return client;
}
private DFSClient genClientWithDummyHandler() throws IOException {
URI nnUri = dfs.getUri();
FailoverProxyProvider<ClientProtocol> failoverProxyProvider =
NameNodeProxies.createFailoverProxyProvider(conf,
nnUri, ClientProtocol.class, true, null);
InvocationHandler dummyHandler = new DummyRetryInvocationHandler(
failoverProxyProvider, RetryPolicies
.failoverOnNetworkException(RetryPolicies.TRY_ONCE_THEN_FAIL,
Integer.MAX_VALUE,
DFSConfigKeys.DFS_CLIENT_FAILOVER_SLEEPTIME_BASE_DEFAULT,
DFSConfigKeys.DFS_CLIENT_FAILOVER_SLEEPTIME_MAX_DEFAULT));
ClientProtocol proxy = (ClientProtocol) Proxy.newProxyInstance(
failoverProxyProvider.getInterface().getClassLoader(),
new Class[] { ClientProtocol.class }, dummyHandler);
DFSClient client = new DFSClient(null, proxy, conf, null);
return client;
}
/**
* Fetch retry policy from Configuration
*/
@Private
@VisibleForTesting
public static RetryPolicy createRetryPolicy(Configuration conf) {
long rmConnectWaitMS =
conf.getLong(
YarnConfiguration.RESOURCEMANAGER_CONNECT_MAX_WAIT_MS,
YarnConfiguration.DEFAULT_RESOURCEMANAGER_CONNECT_MAX_WAIT_MS);
long rmConnectionRetryIntervalMS =
conf.getLong(
YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS,
YarnConfiguration
.DEFAULT_RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS);
boolean waitForEver = (rmConnectWaitMS == -1);
if (!waitForEver) {
if (rmConnectWaitMS < 0) {
throw new YarnRuntimeException("Invalid Configuration. "
+ YarnConfiguration.RESOURCEMANAGER_CONNECT_MAX_WAIT_MS
+ " can be -1, but can not be other negative numbers");
}
// try connect once
if (rmConnectWaitMS < rmConnectionRetryIntervalMS) {
LOG.warn(YarnConfiguration.RESOURCEMANAGER_CONNECT_MAX_WAIT_MS
+ " is smaller than "
+ YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS
+ ". Only try connect once.");
rmConnectWaitMS = 0;
}
}
// Handle HA case first
if (HAUtil.isHAEnabled(conf)) {
final long failoverSleepBaseMs = conf.getLong(
YarnConfiguration.CLIENT_FAILOVER_SLEEPTIME_BASE_MS,
rmConnectionRetryIntervalMS);
final long failoverSleepMaxMs = conf.getLong(
YarnConfiguration.CLIENT_FAILOVER_SLEEPTIME_MAX_MS,
rmConnectionRetryIntervalMS);
int maxFailoverAttempts = conf.getInt(
YarnConfiguration.CLIENT_FAILOVER_MAX_ATTEMPTS, -1);
if (maxFailoverAttempts == -1) {
if (waitForEver) {
maxFailoverAttempts = Integer.MAX_VALUE;
} else {
maxFailoverAttempts = (int) (rmConnectWaitMS / failoverSleepBaseMs);
}
}
return RetryPolicies.failoverOnNetworkException(
RetryPolicies.TRY_ONCE_THEN_FAIL, maxFailoverAttempts,
failoverSleepBaseMs, failoverSleepMaxMs);
}
if (rmConnectionRetryIntervalMS < 0) {
throw new YarnRuntimeException("Invalid Configuration. " +
YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS +
" should not be negative.");
}
RetryPolicy retryPolicy = null;
if (waitForEver) {
retryPolicy = RetryPolicies.RETRY_FOREVER;
} else {
retryPolicy =
RetryPolicies.retryUpToMaximumTimeWithFixedSleep(rmConnectWaitMS,
rmConnectionRetryIntervalMS, TimeUnit.MILLISECONDS);
}
Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap =
new HashMap<Class<? extends Exception>, RetryPolicy>();
exceptionToPolicyMap.put(EOFException.class, retryPolicy);
exceptionToPolicyMap.put(ConnectException.class, retryPolicy);
exceptionToPolicyMap.put(NoRouteToHostException.class, retryPolicy);
exceptionToPolicyMap.put(UnknownHostException.class, retryPolicy);
exceptionToPolicyMap.put(ConnectTimeoutException.class, retryPolicy);
exceptionToPolicyMap.put(RetriableException.class, retryPolicy);
exceptionToPolicyMap.put(SocketException.class, retryPolicy);
return RetryPolicies.retryByException(
RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap);
}
/**
* Generate a dummy namenode proxy instance that utilizes our hacked
* {@link LossyRetryInvocationHandler}. Proxy instance generated using this
* method will proactively drop RPC responses. Currently this method only
* support HA setup. null will be returned if the given configuration is not
* for HA.
*
* @param config the configuration containing the required IPC
* properties, client failover configurations, etc.
* @param nameNodeUri the URI pointing either to a specific NameNode
* or to a logical nameservice.
* @param xface the IPC interface which should be created
* @param numResponseToDrop The number of responses to drop for each RPC call
* @param fallbackToSimpleAuth set to true or false during calls to indicate if
* a secure client falls back to simple auth
* @return an object containing both the proxy and the associated
* delegation token service it corresponds to. Will return null of the
* given configuration does not support HA.
* @throws IOException if there is an error creating the proxy
*/
@SuppressWarnings("unchecked")
public static <T> ProxyAndInfo<T> createProxyWithLossyRetryHandler(
Configuration config, URI nameNodeUri, Class<T> xface,
int numResponseToDrop, AtomicBoolean fallbackToSimpleAuth)
throws IOException {
Preconditions.checkArgument(numResponseToDrop > 0);
AbstractNNFailoverProxyProvider<T> failoverProxyProvider =
createFailoverProxyProvider(config, nameNodeUri, xface, true,
fallbackToSimpleAuth);
if (failoverProxyProvider != null) { // HA case
int delay = config.getInt(
DFS_CLIENT_FAILOVER_SLEEPTIME_BASE_KEY,
DFS_CLIENT_FAILOVER_SLEEPTIME_BASE_DEFAULT);
int maxCap = config.getInt(
DFS_CLIENT_FAILOVER_SLEEPTIME_MAX_KEY,
DFS_CLIENT_FAILOVER_SLEEPTIME_MAX_DEFAULT);
int maxFailoverAttempts = config.getInt(
DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY,
DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_DEFAULT);
int maxRetryAttempts = config.getInt(
DFS_CLIENT_RETRY_MAX_ATTEMPTS_KEY,
DFS_CLIENT_RETRY_MAX_ATTEMPTS_DEFAULT);
InvocationHandler dummyHandler = new LossyRetryInvocationHandler<T>(
numResponseToDrop, failoverProxyProvider,
RetryPolicies.failoverOnNetworkException(
RetryPolicies.TRY_ONCE_THEN_FAIL, maxFailoverAttempts,
Math.max(numResponseToDrop + 1, maxRetryAttempts), delay,
maxCap));
T proxy = (T) Proxy.newProxyInstance(
failoverProxyProvider.getInterface().getClassLoader(),
new Class[] { xface }, dummyHandler);
Text dtService;
if (failoverProxyProvider.useLogicalURI()) {
dtService = HAUtil.buildTokenServiceForLogicalUri(nameNodeUri,
HdfsConstants.HDFS_URI_SCHEME);
} else {
dtService = SecurityUtil.buildTokenService(
NameNode.getAddress(nameNodeUri));
}
return new ProxyAndInfo<T>(proxy, dtService,
NameNode.getAddress(nameNodeUri));
} else {
LOG.warn("Currently creating proxy using " +
"LossyRetryInvocationHandler requires NN HA setup");
return null;
}
}
@Override
public synchronized void initialize(URI uri, Configuration conf
) throws IOException {
super.initialize(uri, conf);
setConf(conf);
/** set user pattern based on configuration file */
UserParam.setUserPattern(conf.get(
DFSConfigKeys.DFS_WEBHDFS_USER_PATTERN_KEY,
DFSConfigKeys.DFS_WEBHDFS_USER_PATTERN_DEFAULT));
connectionFactory = URLConnectionFactory
.newDefaultURLConnectionFactory(conf);
ugi = UserGroupInformation.getCurrentUser();
this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
this.nnAddrs = resolveNNAddr();
boolean isHA = HAUtil.isClientFailoverConfigured(conf, this.uri);
boolean isLogicalUri = isHA && HAUtil.isLogicalUri(conf, this.uri);
// In non-HA or non-logical URI case, the code needs to call
// getCanonicalUri() in order to handle the case where no port is
// specified in the URI
this.tokenServiceName = isLogicalUri ?
HAUtil.buildTokenServiceForLogicalUri(uri, getScheme())
: SecurityUtil.buildTokenService(getCanonicalUri());
if (!isHA) {
this.retryPolicy =
RetryUtils.getDefaultRetryPolicy(
conf,
DFSConfigKeys.DFS_HTTP_CLIENT_RETRY_POLICY_ENABLED_KEY,
DFSConfigKeys.DFS_HTTP_CLIENT_RETRY_POLICY_ENABLED_DEFAULT,
DFSConfigKeys.DFS_HTTP_CLIENT_RETRY_POLICY_SPEC_KEY,
DFSConfigKeys.DFS_HTTP_CLIENT_RETRY_POLICY_SPEC_DEFAULT,
SafeModeException.class);
} else {
int maxFailoverAttempts = conf.getInt(
DFSConfigKeys.DFS_HTTP_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY,
DFSConfigKeys.DFS_HTTP_CLIENT_FAILOVER_MAX_ATTEMPTS_DEFAULT);
int maxRetryAttempts = conf.getInt(
DFSConfigKeys.DFS_HTTP_CLIENT_RETRY_MAX_ATTEMPTS_KEY,
DFSConfigKeys.DFS_HTTP_CLIENT_RETRY_MAX_ATTEMPTS_DEFAULT);
int failoverSleepBaseMillis = conf.getInt(
DFSConfigKeys.DFS_HTTP_CLIENT_FAILOVER_SLEEPTIME_BASE_KEY,
DFSConfigKeys.DFS_HTTP_CLIENT_FAILOVER_SLEEPTIME_BASE_DEFAULT);
int failoverSleepMaxMillis = conf.getInt(
DFSConfigKeys.DFS_HTTP_CLIENT_FAILOVER_SLEEPTIME_MAX_KEY,
DFSConfigKeys.DFS_HTTP_CLIENT_FAILOVER_SLEEPTIME_MAX_DEFAULT);
this.retryPolicy = RetryPolicies
.failoverOnNetworkException(RetryPolicies.TRY_ONCE_THEN_FAIL,
maxFailoverAttempts, maxRetryAttempts, failoverSleepBaseMillis,
failoverSleepMaxMillis);
}
this.workingDir = getHomeDirectory();
this.canRefreshDelegationToken = UserGroupInformation.isSecurityEnabled();
this.disallowFallbackToInsecureCluster = !conf.getBoolean(
CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY,
CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT);
this.delegationToken = null;
}
/**
* Fetch retry policy from Configuration
*/
@Private
@VisibleForTesting
public static RetryPolicy createRetryPolicy(Configuration conf) {
long rmConnectWaitMS =
conf.getLong(
YarnConfiguration.RESOURCEMANAGER_CONNECT_MAX_WAIT_MS,
YarnConfiguration.DEFAULT_RESOURCEMANAGER_CONNECT_MAX_WAIT_MS);
long rmConnectionRetryIntervalMS =
conf.getLong(
YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS,
YarnConfiguration
.DEFAULT_RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS);
boolean waitForEver = (rmConnectWaitMS == -1);
if (!waitForEver) {
if (rmConnectWaitMS < 0) {
throw new YarnRuntimeException("Invalid Configuration. "
+ YarnConfiguration.RESOURCEMANAGER_CONNECT_MAX_WAIT_MS
+ " can be -1, but can not be other negative numbers");
}
// try connect once
if (rmConnectWaitMS < rmConnectionRetryIntervalMS) {
LOG.warn(YarnConfiguration.RESOURCEMANAGER_CONNECT_MAX_WAIT_MS
+ " is smaller than "
+ YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS
+ ". Only try connect once.");
rmConnectWaitMS = 0;
}
}
// Handle HA case first
if (HAUtil.isHAEnabled(conf)) {
final long failoverSleepBaseMs = conf.getLong(
YarnConfiguration.CLIENT_FAILOVER_SLEEPTIME_BASE_MS,
rmConnectionRetryIntervalMS);
final long failoverSleepMaxMs = conf.getLong(
YarnConfiguration.CLIENT_FAILOVER_SLEEPTIME_MAX_MS,
rmConnectionRetryIntervalMS);
int maxFailoverAttempts = conf.getInt(
YarnConfiguration.CLIENT_FAILOVER_MAX_ATTEMPTS, -1);
if (maxFailoverAttempts == -1) {
if (waitForEver) {
maxFailoverAttempts = Integer.MAX_VALUE;
} else {
maxFailoverAttempts = (int) (rmConnectWaitMS / failoverSleepBaseMs);
}
}
return RetryPolicies.failoverOnNetworkException(
RetryPolicies.TRY_ONCE_THEN_FAIL, maxFailoverAttempts,
failoverSleepBaseMs, failoverSleepMaxMs);
}
if (rmConnectionRetryIntervalMS < 0) {
throw new YarnRuntimeException("Invalid Configuration. " +
YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS +
" should not be negative.");
}
RetryPolicy retryPolicy = null;
if (waitForEver) {
retryPolicy = RetryPolicies.RETRY_FOREVER;
} else {
retryPolicy =
RetryPolicies.retryUpToMaximumTimeWithFixedSleep(rmConnectWaitMS,
rmConnectionRetryIntervalMS, TimeUnit.MILLISECONDS);
}
Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap =
new HashMap<Class<? extends Exception>, RetryPolicy>();
exceptionToPolicyMap.put(EOFException.class, retryPolicy);
exceptionToPolicyMap.put(ConnectException.class, retryPolicy);
exceptionToPolicyMap.put(NoRouteToHostException.class, retryPolicy);
exceptionToPolicyMap.put(UnknownHostException.class, retryPolicy);
exceptionToPolicyMap.put(ConnectTimeoutException.class, retryPolicy);
exceptionToPolicyMap.put(RetriableException.class, retryPolicy);
exceptionToPolicyMap.put(SocketException.class, retryPolicy);
return RetryPolicies.retryByException(
RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap);
}
/**
* Generate a dummy namenode proxy instance that utilizes our hacked
* {@link LossyRetryInvocationHandler}. Proxy instance generated using this
* method will proactively drop RPC responses. Currently this method only
* support HA setup. null will be returned if the given configuration is not
* for HA.
*
* @param config the configuration containing the required IPC
* properties, client failover configurations, etc.
* @param nameNodeUri the URI pointing either to a specific NameNode
* or to a logical nameservice.
* @param xface the IPC interface which should be created
* @param numResponseToDrop The number of responses to drop for each RPC call
* @param fallbackToSimpleAuth set to true or false during calls to indicate if
* a secure client falls back to simple auth
* @return an object containing both the proxy and the associated
* delegation token service it corresponds to. Will return null of the
* given configuration does not support HA.
* @throws IOException if there is an error creating the proxy
*/
@SuppressWarnings("unchecked")
public static <T> ProxyAndInfo<T> createProxyWithLossyRetryHandler(
Configuration config, URI nameNodeUri, Class<T> xface,
int numResponseToDrop, AtomicBoolean fallbackToSimpleAuth)
throws IOException {
Preconditions.checkArgument(numResponseToDrop > 0);
AbstractNNFailoverProxyProvider<T> failoverProxyProvider =
createFailoverProxyProvider(config, nameNodeUri, xface, true,
fallbackToSimpleAuth);
if (failoverProxyProvider != null) { // HA case
int delay = config.getInt(
DFS_CLIENT_FAILOVER_SLEEPTIME_BASE_KEY,
DFS_CLIENT_FAILOVER_SLEEPTIME_BASE_DEFAULT);
int maxCap = config.getInt(
DFS_CLIENT_FAILOVER_SLEEPTIME_MAX_KEY,
DFS_CLIENT_FAILOVER_SLEEPTIME_MAX_DEFAULT);
int maxFailoverAttempts = config.getInt(
DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY,
DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_DEFAULT);
int maxRetryAttempts = config.getInt(
DFS_CLIENT_RETRY_MAX_ATTEMPTS_KEY,
DFS_CLIENT_RETRY_MAX_ATTEMPTS_DEFAULT);
InvocationHandler dummyHandler = new LossyRetryInvocationHandler<T>(
numResponseToDrop, failoverProxyProvider,
RetryPolicies.failoverOnNetworkException(
RetryPolicies.TRY_ONCE_THEN_FAIL, maxFailoverAttempts,
Math.max(numResponseToDrop + 1, maxRetryAttempts), delay,
maxCap));
T proxy = (T) Proxy.newProxyInstance(
failoverProxyProvider.getInterface().getClassLoader(),
new Class[] { xface }, dummyHandler);
Text dtService;
if (failoverProxyProvider.useLogicalURI()) {
dtService = HAUtil.buildTokenServiceForLogicalUri(nameNodeUri,
HdfsConstants.HDFS_URI_SCHEME);
} else {
dtService = SecurityUtil.buildTokenService(
NameNode.getAddress(nameNodeUri));
}
return new ProxyAndInfo<T>(proxy, dtService,
NameNode.getAddress(nameNodeUri));
} else {
LOG.warn("Currently creating proxy using " +
"LossyRetryInvocationHandler requires NN HA setup");
return null;
}
}
@Override
public synchronized void initialize(URI uri, Configuration conf
) throws IOException {
super.initialize(uri, conf);
setConf(conf);
/** set user pattern based on configuration file */
UserParam.setUserPattern(conf.get(
DFSConfigKeys.DFS_WEBHDFS_USER_PATTERN_KEY,
DFSConfigKeys.DFS_WEBHDFS_USER_PATTERN_DEFAULT));
connectionFactory = URLConnectionFactory
.newDefaultURLConnectionFactory(conf);
ugi = UserGroupInformation.getCurrentUser();
this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
this.nnAddrs = resolveNNAddr();
boolean isHA = HAUtil.isClientFailoverConfigured(conf, this.uri);
boolean isLogicalUri = isHA && HAUtil.isLogicalUri(conf, this.uri);
// In non-HA or non-logical URI case, the code needs to call
// getCanonicalUri() in order to handle the case where no port is
// specified in the URI
this.tokenServiceName = isLogicalUri ?
HAUtil.buildTokenServiceForLogicalUri(uri, getScheme())
: SecurityUtil.buildTokenService(getCanonicalUri());
if (!isHA) {
this.retryPolicy =
RetryUtils.getDefaultRetryPolicy(
conf,
DFSConfigKeys.DFS_HTTP_CLIENT_RETRY_POLICY_ENABLED_KEY,
DFSConfigKeys.DFS_HTTP_CLIENT_RETRY_POLICY_ENABLED_DEFAULT,
DFSConfigKeys.DFS_HTTP_CLIENT_RETRY_POLICY_SPEC_KEY,
DFSConfigKeys.DFS_HTTP_CLIENT_RETRY_POLICY_SPEC_DEFAULT,
SafeModeException.class);
} else {
int maxFailoverAttempts = conf.getInt(
DFSConfigKeys.DFS_HTTP_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY,
DFSConfigKeys.DFS_HTTP_CLIENT_FAILOVER_MAX_ATTEMPTS_DEFAULT);
int maxRetryAttempts = conf.getInt(
DFSConfigKeys.DFS_HTTP_CLIENT_RETRY_MAX_ATTEMPTS_KEY,
DFSConfigKeys.DFS_HTTP_CLIENT_RETRY_MAX_ATTEMPTS_DEFAULT);
int failoverSleepBaseMillis = conf.getInt(
DFSConfigKeys.DFS_HTTP_CLIENT_FAILOVER_SLEEPTIME_BASE_KEY,
DFSConfigKeys.DFS_HTTP_CLIENT_FAILOVER_SLEEPTIME_BASE_DEFAULT);
int failoverSleepMaxMillis = conf.getInt(
DFSConfigKeys.DFS_HTTP_CLIENT_FAILOVER_SLEEPTIME_MAX_KEY,
DFSConfigKeys.DFS_HTTP_CLIENT_FAILOVER_SLEEPTIME_MAX_DEFAULT);
this.retryPolicy = RetryPolicies
.failoverOnNetworkException(RetryPolicies.TRY_ONCE_THEN_FAIL,
maxFailoverAttempts, maxRetryAttempts, failoverSleepBaseMillis,
failoverSleepMaxMillis);
}
this.workingDir = getHomeDirectory();
this.canRefreshDelegationToken = UserGroupInformation.isSecurityEnabled();
this.disallowFallbackToInsecureCluster = !conf.getBoolean(
CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY,
CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT);
this.delegationToken = null;
}