下面列出了org.apache.hadoop.fs.s3.S3Exception#org.apache.hadoop.io.retry.RetryPolicies 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
* Creates an Endpoint class for testing purpose.
*
* @param conf - Conf
* @param address - InetAddres
* @param rpcTimeout - rpcTimeOut
* @return EndPoint
* @throws Exception
*/
public static EndpointStateMachine createEndpoint(Configuration conf,
InetSocketAddress address, int rpcTimeout) throws Exception {
RPC.setProtocolEngine(conf, StorageContainerDatanodeProtocolPB.class,
ProtobufRpcEngine.class);
long version =
RPC.getProtocolVersion(StorageContainerDatanodeProtocolPB.class);
StorageContainerDatanodeProtocolPB rpcProxy = RPC.getProtocolProxy(
StorageContainerDatanodeProtocolPB.class, version,
address, UserGroupInformation.getCurrentUser(), conf,
NetUtils.getDefaultSocketFactory(conf), rpcTimeout,
RetryPolicies.TRY_ONCE_THEN_FAIL).getProxy();
StorageContainerDatanodeProtocolClientSideTranslatorPB rpcClient =
new StorageContainerDatanodeProtocolClientSideTranslatorPB(rpcProxy);
return new EndpointStateMachine(address, rpcClient,
new LegacyHadoopConfigurationSource(conf));
}
/**
* Create a scm security client.
* @param conf - Ozone configuration.
*
* @return {@link SCMSecurityProtocol}
* @throws IOException
*/
public static SCMSecurityProtocolClientSideTranslatorPB getScmSecurityClient(
OzoneConfiguration conf) throws IOException {
RPC.setProtocolEngine(conf, SCMSecurityProtocolPB.class,
ProtobufRpcEngine.class);
long scmVersion =
RPC.getProtocolVersion(ScmBlockLocationProtocolPB.class);
InetSocketAddress address =
getScmAddressForSecurityProtocol(conf);
RetryPolicy retryPolicy =
RetryPolicies.retryForeverWithFixedSleep(
1000, TimeUnit.MILLISECONDS);
return new SCMSecurityProtocolClientSideTranslatorPB(
RPC.getProtocolProxy(SCMSecurityProtocolPB.class, scmVersion,
address, UserGroupInformation.getCurrentUser(),
conf, NetUtils.getDefaultSocketFactory(conf),
Client.getRpcTimeout(conf), retryPolicy).getProxy());
}
private static NamenodeProtocol createNNProxyWithNamenodeProtocol(
InetSocketAddress address, Configuration conf, UserGroupInformation ugi,
boolean withRetries) throws IOException {
NamenodeProtocolPB proxy = (NamenodeProtocolPB) createNameNodeProxy(
address, conf, ugi, NamenodeProtocolPB.class);
if (withRetries) { // create the proxy with retries
RetryPolicy timeoutPolicy = RetryPolicies.exponentialBackoffRetry(5, 200,
TimeUnit.MILLISECONDS);
Map<String, RetryPolicy> methodNameToPolicyMap
= new HashMap<String, RetryPolicy>();
methodNameToPolicyMap.put("getBlocks", timeoutPolicy);
methodNameToPolicyMap.put("getAccessKeys", timeoutPolicy);
NamenodeProtocol translatorProxy =
new NamenodeProtocolTranslatorPB(proxy);
return (NamenodeProtocol) RetryProxy.create(
NamenodeProtocol.class, translatorProxy, methodNameToPolicyMap);
} else {
return new NamenodeProtocolTranslatorPB(proxy);
}
}
private DFSClient genClientWithDummyHandler() throws IOException {
URI nnUri = dfs.getUri();
FailoverProxyProvider<ClientProtocol> failoverProxyProvider =
NameNodeProxies.createFailoverProxyProvider(conf,
nnUri, ClientProtocol.class, true, null);
InvocationHandler dummyHandler = new DummyRetryInvocationHandler(
failoverProxyProvider, RetryPolicies
.failoverOnNetworkException(RetryPolicies.TRY_ONCE_THEN_FAIL,
Integer.MAX_VALUE,
DFSConfigKeys.DFS_CLIENT_FAILOVER_SLEEPTIME_BASE_DEFAULT,
DFSConfigKeys.DFS_CLIENT_FAILOVER_SLEEPTIME_MAX_DEFAULT));
ClientProtocol proxy = (ClientProtocol) Proxy.newProxyInstance(
failoverProxyProvider.getInterface().getClassLoader(),
new Class[] { ClientProtocol.class }, dummyHandler);
DFSClient client = new DFSClient(null, proxy, conf, null);
return client;
}
private static NativeFileSystemStore createDefaultStore(Configuration conf) {
NativeFileSystemStore store = new Jets3tNativeFileSystemStore();
RetryPolicy basePolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep(
conf.getInt("fs.s3.maxRetries", 4),
conf.getLong("fs.s3.sleepTimeSeconds", 10), TimeUnit.SECONDS);
Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap =
new HashMap<Class<? extends Exception>, RetryPolicy>();
exceptionToPolicyMap.put(IOException.class, basePolicy);
exceptionToPolicyMap.put(S3Exception.class, basePolicy);
RetryPolicy methodPolicy = RetryPolicies.retryByException(
RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap);
Map<String, RetryPolicy> methodNameToPolicyMap =
new HashMap<String, RetryPolicy>();
methodNameToPolicyMap.put("storeFile", methodPolicy);
methodNameToPolicyMap.put("rename", methodPolicy);
return (NativeFileSystemStore)
RetryProxy.create(NativeFileSystemStore.class, store,
methodNameToPolicyMap);
}
private static FileSystemStore createDefaultStore(Configuration conf) {
FileSystemStore store = new Jets3tFileSystemStore();
RetryPolicy basePolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep(
conf.getInt("fs.s3.maxRetries", 4),
conf.getLong("fs.s3.sleepTimeSeconds", 10), TimeUnit.SECONDS);
Map<Class<? extends Exception>,RetryPolicy> exceptionToPolicyMap =
new HashMap<Class<? extends Exception>, RetryPolicy>();
exceptionToPolicyMap.put(IOException.class, basePolicy);
exceptionToPolicyMap.put(S3Exception.class, basePolicy);
RetryPolicy methodPolicy = RetryPolicies.retryByException(
RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap);
Map<String,RetryPolicy> methodNameToPolicyMap = new HashMap<String,RetryPolicy>();
methodNameToPolicyMap.put("storeBlock", methodPolicy);
methodNameToPolicyMap.put("retrieveBlock", methodPolicy);
return (FileSystemStore) RetryProxy.create(FileSystemStore.class,
store, methodNameToPolicyMap);
}
/**
* Returns a ConnectionId object.
* @param addr Remote address for the connection.
* @param protocol Protocol for RPC.
* @param ticket UGI
* @param rpcTimeout timeout
* @param conf Configuration object
* @return A ConnectionId instance
* @throws IOException
*/
static ConnectionId getConnectionId(InetSocketAddress addr,
Class<?> protocol, UserGroupInformation ticket, int rpcTimeout,
RetryPolicy connectionRetryPolicy, Configuration conf) throws IOException {
if (connectionRetryPolicy == null) {
final int max = conf.getInt(
CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY,
CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_DEFAULT);
final int retryInterval = conf.getInt(
CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_RETRY_INTERVAL_KEY,
CommonConfigurationKeysPublic
.IPC_CLIENT_CONNECT_RETRY_INTERVAL_DEFAULT);
connectionRetryPolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep(
max, retryInterval, TimeUnit.MILLISECONDS);
}
return new ConnectionId(addr, protocol, ticket, rpcTimeout,
connectionRetryPolicy, conf);
}
private static NamenodeProtocol createNNProxyWithNamenodeProtocol(
InetSocketAddress address, Configuration conf, UserGroupInformation ugi,
boolean withRetries) throws IOException {
NamenodeProtocolPB proxy = (NamenodeProtocolPB) createNameNodeProxy(
address, conf, ugi, NamenodeProtocolPB.class);
if (withRetries) { // create the proxy with retries
RetryPolicy timeoutPolicy = RetryPolicies.exponentialBackoffRetry(5, 200,
TimeUnit.MILLISECONDS);
Map<String, RetryPolicy> methodNameToPolicyMap
= new HashMap<String, RetryPolicy>();
methodNameToPolicyMap.put("getBlocks", timeoutPolicy);
methodNameToPolicyMap.put("getAccessKeys", timeoutPolicy);
NamenodeProtocol translatorProxy =
new NamenodeProtocolTranslatorPB(proxy);
return (NamenodeProtocol) RetryProxy.create(
NamenodeProtocol.class, translatorProxy, methodNameToPolicyMap);
} else {
return new NamenodeProtocolTranslatorPB(proxy);
}
}
private DFSClient genClientWithDummyHandler() throws IOException {
URI nnUri = dfs.getUri();
FailoverProxyProvider<ClientProtocol> failoverProxyProvider =
NameNodeProxies.createFailoverProxyProvider(conf,
nnUri, ClientProtocol.class, true, null);
InvocationHandler dummyHandler = new DummyRetryInvocationHandler(
failoverProxyProvider, RetryPolicies
.failoverOnNetworkException(RetryPolicies.TRY_ONCE_THEN_FAIL,
Integer.MAX_VALUE,
DFSConfigKeys.DFS_CLIENT_FAILOVER_SLEEPTIME_BASE_DEFAULT,
DFSConfigKeys.DFS_CLIENT_FAILOVER_SLEEPTIME_MAX_DEFAULT));
ClientProtocol proxy = (ClientProtocol) Proxy.newProxyInstance(
failoverProxyProvider.getInterface().getClassLoader(),
new Class[] { ClientProtocol.class }, dummyHandler);
DFSClient client = new DFSClient(null, proxy, conf, null);
return client;
}
private static NativeFileSystemStore createDefaultStore(Configuration conf) {
NativeFileSystemStore store = new Jets3tNativeFileSystemStore();
RetryPolicy basePolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep(
conf.getInt("fs.s3.maxRetries", 4),
conf.getLong("fs.s3.sleepTimeSeconds", 10), TimeUnit.SECONDS);
Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap =
new HashMap<Class<? extends Exception>, RetryPolicy>();
exceptionToPolicyMap.put(IOException.class, basePolicy);
exceptionToPolicyMap.put(S3Exception.class, basePolicy);
RetryPolicy methodPolicy = RetryPolicies.retryByException(
RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap);
Map<String, RetryPolicy> methodNameToPolicyMap =
new HashMap<String, RetryPolicy>();
methodNameToPolicyMap.put("storeFile", methodPolicy);
methodNameToPolicyMap.put("rename", methodPolicy);
return (NativeFileSystemStore)
RetryProxy.create(NativeFileSystemStore.class, store,
methodNameToPolicyMap);
}
private static FileSystemStore createDefaultStore(Configuration conf) {
FileSystemStore store = new Jets3tFileSystemStore();
RetryPolicy basePolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep(
conf.getInt("fs.s3.maxRetries", 4),
conf.getLong("fs.s3.sleepTimeSeconds", 10), TimeUnit.SECONDS);
Map<Class<? extends Exception>,RetryPolicy> exceptionToPolicyMap =
new HashMap<Class<? extends Exception>, RetryPolicy>();
exceptionToPolicyMap.put(IOException.class, basePolicy);
exceptionToPolicyMap.put(S3Exception.class, basePolicy);
RetryPolicy methodPolicy = RetryPolicies.retryByException(
RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap);
Map<String,RetryPolicy> methodNameToPolicyMap = new HashMap<String,RetryPolicy>();
methodNameToPolicyMap.put("storeBlock", methodPolicy);
methodNameToPolicyMap.put("retrieveBlock", methodPolicy);
return (FileSystemStore) RetryProxy.create(FileSystemStore.class,
store, methodNameToPolicyMap);
}
/**
* Returns a ConnectionId object.
* @param addr Remote address for the connection.
* @param protocol Protocol for RPC.
* @param ticket UGI
* @param rpcTimeout timeout
* @param conf Configuration object
* @return A ConnectionId instance
* @throws IOException
*/
static ConnectionId getConnectionId(InetSocketAddress addr,
Class<?> protocol, UserGroupInformation ticket, int rpcTimeout,
RetryPolicy connectionRetryPolicy, Configuration conf) throws IOException {
if (connectionRetryPolicy == null) {
final int max = conf.getInt(
CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY,
CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_DEFAULT);
final int retryInterval = conf.getInt(
CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_RETRY_INTERVAL_KEY,
CommonConfigurationKeysPublic
.IPC_CLIENT_CONNECT_RETRY_INTERVAL_DEFAULT);
connectionRetryPolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep(
max, retryInterval, TimeUnit.MILLISECONDS);
}
return new ConnectionId(addr, protocol, ticket, rpcTimeout,
connectionRetryPolicy, conf);
}
private static RaidProtocol createRaidnode(RaidProtocol rpcRaidnode)
throws IOException {
RetryPolicy createPolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep(
5, 5000, TimeUnit.MILLISECONDS);
Map<Class<? extends Exception>,RetryPolicy> remoteExceptionToPolicyMap =
new HashMap<Class<? extends Exception>, RetryPolicy>();
Map<Class<? extends Exception>,RetryPolicy> exceptionToPolicyMap =
new HashMap<Class<? extends Exception>, RetryPolicy>();
exceptionToPolicyMap.put(RemoteException.class,
RetryPolicies.retryByRemoteException(
RetryPolicies.TRY_ONCE_THEN_FAIL, remoteExceptionToPolicyMap));
RetryPolicy methodPolicy = RetryPolicies.retryByException(
RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap);
Map<String,RetryPolicy> methodNameToPolicyMap = new HashMap<String,RetryPolicy>();
methodNameToPolicyMap.put("create", methodPolicy);
return (RaidProtocol) RetryProxy.create(RaidProtocol.class,
rpcRaidnode, methodNameToPolicyMap);
}
private static SnapshotProtocol createSnapshotNode(SnapshotProtocol rpcSnapshotNode)
throws IOException {
RetryPolicy createPolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep(
5, 5000, TimeUnit.MILLISECONDS);
Map<Class<? extends Exception>,RetryPolicy> remoteExceptionToPolicyMap =
new HashMap<Class<? extends Exception>, RetryPolicy>();
Map<Class<? extends Exception>,RetryPolicy> exceptionToPolicyMap =
new HashMap<Class<? extends Exception>, RetryPolicy>();
exceptionToPolicyMap.put(RemoteException.class,
RetryPolicies.retryByRemoteException(
RetryPolicies.TRY_ONCE_THEN_FAIL, remoteExceptionToPolicyMap));
RetryPolicy methodPolicy = RetryPolicies.retryByException(
RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap);
Map<String,RetryPolicy> methodNameToPolicyMap = new HashMap<String,RetryPolicy>();
methodNameToPolicyMap.put("create", methodPolicy);
return (SnapshotProtocol) RetryProxy.create(SnapshotProtocol.class,
rpcSnapshotNode, methodNameToPolicyMap);
}
private static AvatarProtocol createAvatarnode(AvatarProtocol rpcAvatarnode)
throws IOException {
RetryPolicy createPolicy = RetryPolicies
.retryUpToMaximumCountWithFixedSleep(5, 5000, TimeUnit.MILLISECONDS);
Map<Class<? extends Exception>, RetryPolicy> remoteExceptionToPolicyMap = new HashMap<Class<? extends Exception>, RetryPolicy>();
Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap = new HashMap<Class<? extends Exception>, RetryPolicy>();
exceptionToPolicyMap.put(RemoteException.class, RetryPolicies
.retryByRemoteException(RetryPolicies.TRY_ONCE_THEN_FAIL,
remoteExceptionToPolicyMap));
RetryPolicy methodPolicy = RetryPolicies.retryByException(
RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap);
Map<String, RetryPolicy> methodNameToPolicyMap = new HashMap<String, RetryPolicy>();
methodNameToPolicyMap.put("create", methodPolicy);
return (AvatarProtocol) RetryProxy.create(AvatarProtocol.class,
rpcAvatarnode, methodNameToPolicyMap);
}
private static AvatarProtocol createAvatarnode(AvatarProtocol rpcAvatarnode)
throws IOException {
RetryPolicy createPolicy = RetryPolicies
.retryUpToMaximumCountWithFixedSleep(5, 5000, TimeUnit.MILLISECONDS);
Map<Class<? extends Exception>, RetryPolicy> remoteExceptionToPolicyMap = new HashMap<Class<? extends Exception>, RetryPolicy>();
Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap = new HashMap<Class<? extends Exception>, RetryPolicy>();
exceptionToPolicyMap.put(RemoteException.class, RetryPolicies
.retryByRemoteException(RetryPolicies.TRY_ONCE_THEN_FAIL,
remoteExceptionToPolicyMap));
RetryPolicy methodPolicy = RetryPolicies.retryByException(
RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap);
Map<String, RetryPolicy> methodNameToPolicyMap = new HashMap<String, RetryPolicy>();
methodNameToPolicyMap.put("create", methodPolicy);
return (AvatarProtocol) RetryProxy.create(AvatarProtocol.class,
rpcAvatarnode, methodNameToPolicyMap);
}
private static NativeFileSystemStore createDefaultStore(Configuration conf) {
NativeFileSystemStore store = new Jets3tNativeFileSystemStore();
RetryPolicy basePolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep(
conf.getInt("fs.s3.maxRetries", 4),
conf.getLong("fs.s3.sleepTimeSeconds", 10), TimeUnit.SECONDS);
Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap =
new HashMap<Class<? extends Exception>, RetryPolicy>();
exceptionToPolicyMap.put(IOException.class, basePolicy);
exceptionToPolicyMap.put(S3Exception.class, basePolicy);
RetryPolicy methodPolicy = RetryPolicies.retryByException(
RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap);
Map<String, RetryPolicy> methodNameToPolicyMap =
new HashMap<String, RetryPolicy>();
methodNameToPolicyMap.put("storeFile", methodPolicy);
return (NativeFileSystemStore)
RetryProxy.create(NativeFileSystemStore.class, store,
methodNameToPolicyMap);
}
private static FileSystemStore createDefaultStore(Configuration conf) {
FileSystemStore store = new Jets3tFileSystemStore();
RetryPolicy basePolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep(
conf.getInt("fs.s3.maxRetries", 4),
conf.getLong("fs.s3.sleepTimeSeconds", 10), TimeUnit.SECONDS);
Map<Class<? extends Exception>,RetryPolicy> exceptionToPolicyMap =
new HashMap<Class<? extends Exception>, RetryPolicy>();
exceptionToPolicyMap.put(IOException.class, basePolicy);
exceptionToPolicyMap.put(S3Exception.class, basePolicy);
RetryPolicy methodPolicy = RetryPolicies.retryByException(
RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap);
Map<String,RetryPolicy> methodNameToPolicyMap = new HashMap<String,RetryPolicy>();
methodNameToPolicyMap.put("storeBlock", methodPolicy);
methodNameToPolicyMap.put("retrieveBlock", methodPolicy);
return (FileSystemStore) RetryProxy.create(FileSystemStore.class,
store, methodNameToPolicyMap);
}
private static ClientProtocol createNamenode(ClientProtocol rpcNamenode,
Configuration conf)
throws IOException {
long sleepTime = conf.getLong("dfs.client.rpc.retry.sleep",
LEASE_SOFTLIMIT_PERIOD);
RetryPolicy createPolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep(
5, sleepTime, TimeUnit.MILLISECONDS);
Map<Class<? extends Exception>,RetryPolicy> remoteExceptionToPolicyMap =
new HashMap<Class<? extends Exception>, RetryPolicy>();
remoteExceptionToPolicyMap.put(AlreadyBeingCreatedException.class, createPolicy);
Map<Class<? extends Exception>,RetryPolicy> exceptionToPolicyMap =
new HashMap<Class<? extends Exception>, RetryPolicy>();
exceptionToPolicyMap.put(RemoteException.class,
RetryPolicies.retryByRemoteException(
RetryPolicies.TRY_ONCE_THEN_FAIL, remoteExceptionToPolicyMap));
RetryPolicy methodPolicy = RetryPolicies.retryByException(
RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap);
Map<String,RetryPolicy> methodNameToPolicyMap = new HashMap<String,RetryPolicy>();
methodNameToPolicyMap.put("create", methodPolicy);
return (ClientProtocol) RetryProxy.create(ClientProtocol.class,
rpcNamenode, methodNameToPolicyMap);
}
private static HighTideProtocol createHighTidenode(HighTideProtocol rpcHighTidenode)
throws IOException {
RetryPolicy createPolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep(
5, 5000, TimeUnit.MILLISECONDS);
Map<Class<? extends Exception>,RetryPolicy> remoteExceptionToPolicyMap =
new HashMap<Class<? extends Exception>, RetryPolicy>();
Map<Class<? extends Exception>,RetryPolicy> exceptionToPolicyMap =
new HashMap<Class<? extends Exception>, RetryPolicy>();
exceptionToPolicyMap.put(RemoteException.class,
RetryPolicies.retryByRemoteException(
RetryPolicies.TRY_ONCE_THEN_FAIL, remoteExceptionToPolicyMap));
RetryPolicy methodPolicy = RetryPolicies.retryByException(
RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap);
Map<String,RetryPolicy> methodNameToPolicyMap = new HashMap<String,RetryPolicy>();
methodNameToPolicyMap.put("create", methodPolicy);
return (HighTideProtocol) RetryProxy.create(HighTideProtocol.class,
rpcHighTidenode, methodNameToPolicyMap);
}
private static NativeFileSystemStore createDefaultStore(Configuration conf) {
NativeFileSystemStore store = new Jets3tNativeFileSystemStore();
RetryPolicy basePolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep(
conf.getInt("fs.s3.maxRetries", 4),
conf.getLong("fs.s3.sleepTimeSeconds", 10), TimeUnit.SECONDS);
Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap =
new HashMap<Class<? extends Exception>, RetryPolicy>();
exceptionToPolicyMap.put(IOException.class, basePolicy);
exceptionToPolicyMap.put(S3Exception.class, basePolicy);
RetryPolicy methodPolicy = RetryPolicies.retryByException(
RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap);
Map<String, RetryPolicy> methodNameToPolicyMap =
new HashMap<String, RetryPolicy>();
methodNameToPolicyMap.put("storeFile", methodPolicy);
return (NativeFileSystemStore)
RetryProxy.create(NativeFileSystemStore.class, store,
methodNameToPolicyMap);
}
private static FileSystemStore createDefaultStore(Configuration conf) {
FileSystemStore store = new Jets3tFileSystemStore();
RetryPolicy basePolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep(
conf.getInt("fs.s3.maxRetries", 4),
conf.getLong("fs.s3.sleepTimeSeconds", 10), TimeUnit.SECONDS);
Map<Class<? extends Exception>,RetryPolicy> exceptionToPolicyMap =
new HashMap<Class<? extends Exception>, RetryPolicy>();
exceptionToPolicyMap.put(IOException.class, basePolicy);
exceptionToPolicyMap.put(S3Exception.class, basePolicy);
RetryPolicy methodPolicy = RetryPolicies.retryByException(
RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap);
Map<String,RetryPolicy> methodNameToPolicyMap = new HashMap<String,RetryPolicy>();
methodNameToPolicyMap.put("storeBlock", methodPolicy);
methodNameToPolicyMap.put("retrieveBlock", methodPolicy);
return (FileSystemStore) RetryProxy.create(FileSystemStore.class,
store, methodNameToPolicyMap);
}
private static ClientProtocol createNamenode(ClientProtocol rpcNamenode)
throws IOException {
RetryPolicy createPolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep(
5, LEASE_SOFTLIMIT_PERIOD, TimeUnit.MILLISECONDS);
Map<Class<? extends Exception>,RetryPolicy> remoteExceptionToPolicyMap =
new HashMap<Class<? extends Exception>, RetryPolicy>();
remoteExceptionToPolicyMap.put(AlreadyBeingCreatedException.class, createPolicy);
Map<Class<? extends Exception>,RetryPolicy> exceptionToPolicyMap =
new HashMap<Class<? extends Exception>, RetryPolicy>();
exceptionToPolicyMap.put(RemoteException.class,
RetryPolicies.retryByRemoteException(
RetryPolicies.TRY_ONCE_THEN_FAIL, remoteExceptionToPolicyMap));
RetryPolicy methodPolicy = RetryPolicies.retryByException(
RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap);
Map<String,RetryPolicy> methodNameToPolicyMap = new HashMap<String,RetryPolicy>();
methodNameToPolicyMap.put("create", methodPolicy);
return (ClientProtocol) RetryProxy.create(ClientProtocol.class,
rpcNamenode, methodNameToPolicyMap);
}
/**
* Adds a new Recon server to the set of endpoints.
* @param address Recon address.
* @throws IOException
*/
public void addReconServer(InetSocketAddress address) throws IOException {
LOG.info("Adding Recon Server : {}", address.toString());
writeLock();
try {
if (scmMachines.containsKey(address)) {
LOG.warn("Trying to add an existing SCM Machine to Machines group. " +
"Ignoring the request.");
return;
}
Configuration hadoopConfig =
LegacyHadoopConfigurationSource.asHadoopConfiguration(this.conf);
RPC.setProtocolEngine(hadoopConfig, ReconDatanodeProtocolPB.class,
ProtobufRpcEngine.class);
long version =
RPC.getProtocolVersion(ReconDatanodeProtocolPB.class);
RetryPolicy retryPolicy =
RetryPolicies.retryUpToMaximumCountWithFixedSleep(10,
60000, TimeUnit.MILLISECONDS);
ReconDatanodeProtocolPB rpcProxy = RPC.getProtocolProxy(
ReconDatanodeProtocolPB.class, version,
address, UserGroupInformation.getCurrentUser(), hadoopConfig,
NetUtils.getDefaultSocketFactory(hadoopConfig), getRpcTimeout(),
retryPolicy).getProxy();
StorageContainerDatanodeProtocolClientSideTranslatorPB rpcClient =
new StorageContainerDatanodeProtocolClientSideTranslatorPB(rpcProxy);
EndpointStateMachine endPoint =
new EndpointStateMachine(address, rpcClient, conf);
endPoint.setPassive(true);
scmMachines.put(address, endPoint);
} finally {
writeUnlock();
}
}
@Test
public void returnsSuccessfulResult() throws Exception {
String result = "bilbo";
RetriableTask<String> task = new RetriableTask<>(
RetryPolicies.RETRY_FOREVER, "test", () -> result);
assertEquals(result, task.call());
}
@Test
public void returnsSuccessfulResultAfterFailures() throws Exception {
String result = "gandalf";
AtomicInteger attempts = new AtomicInteger();
RetriableTask<String> task = new RetriableTask<>(
RetryPolicies.RETRY_FOREVER, "test",
() -> {
if (attempts.incrementAndGet() <= 2) {
throw new Exception("testing");
}
return result;
});
assertEquals(result, task.call());
}
@Test
public void respectsRetryPolicy() {
int expectedAttempts = 3;
AtomicInteger attempts = new AtomicInteger();
RetryPolicy retryPolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep(
expectedAttempts, 1, TimeUnit.MILLISECONDS);
RetriableTask<String> task = new RetriableTask<>(retryPolicy, "thr", () -> {
attempts.incrementAndGet();
throw new ZipException("testing");
});
IOException e = assertThrows(IOException.class, task::call);
assertEquals(ZipException.class, e.getCause().getClass());
assertEquals(expectedAttempts, attempts.get());
}
/**
* A constructor for testing purpose only.
*/
@VisibleForTesting
public KeyOutputStream() {
closed = false;
this.retryPolicyMap = HddsClientUtils.getExceptionList()
.stream()
.collect(Collectors.toMap(Function.identity(),
e -> RetryPolicies.TRY_ONCE_THEN_FAIL));
retryCount = 0;
offset = 0;
blockOutputStreamEntryPool = new BlockOutputStreamEntryPool();
}
protected static RetryPolicy createRetryPolicy(Configuration conf,
String maxWaitTimeStr, long defMaxWaitTime,
String connectRetryIntervalStr, long defRetryInterval) {
long maxWaitTime = conf.getLong(maxWaitTimeStr, defMaxWaitTime);
long retryIntervalMS =
conf.getLong(connectRetryIntervalStr, defRetryInterval);
if (maxWaitTime == -1) {
// wait forever.
return RetryPolicies.RETRY_FOREVER;
}
Preconditions.checkArgument(maxWaitTime > 0, "Invalid Configuration. "
+ maxWaitTimeStr + " should be a positive value.");
Preconditions.checkArgument(retryIntervalMS > 0, "Invalid Configuration. "
+ connectRetryIntervalStr + "should be a positive value.");
RetryPolicy retryPolicy =
RetryPolicies.retryUpToMaximumTimeWithFixedSleep(maxWaitTime,
retryIntervalMS, TimeUnit.MILLISECONDS);
Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap =
new HashMap<Class<? extends Exception>, RetryPolicy>();
exceptionToPolicyMap.put(EOFException.class, retryPolicy);
exceptionToPolicyMap.put(ConnectException.class, retryPolicy);
exceptionToPolicyMap.put(NoRouteToHostException.class, retryPolicy);
exceptionToPolicyMap.put(UnknownHostException.class, retryPolicy);
exceptionToPolicyMap.put(RetriableException.class, retryPolicy);
exceptionToPolicyMap.put(SocketException.class, retryPolicy);
exceptionToPolicyMap.put(NMNotYetReadyException.class, retryPolicy);
return RetryPolicies.retryByException(RetryPolicies.TRY_ONCE_THEN_FAIL,
exceptionToPolicyMap);
}
/**
* Creates the namenode proxy with the passed protocol. This will handle
* creation of either HA- or non-HA-enabled proxy objects, depending upon
* if the provided URI is a configured logical URI.
*
* @param conf the configuration containing the required IPC
* properties, client failover configurations, etc.
* @param nameNodeUri the URI pointing either to a specific NameNode
* or to a logical nameservice.
* @param xface the IPC interface which should be created
* @param fallbackToSimpleAuth set to true or false during calls to indicate if
* a secure client falls back to simple auth
* @return an object containing both the proxy and the associated
* delegation token service it corresponds to
* @throws IOException if there is an error creating the proxy
**/
@SuppressWarnings("unchecked")
public static <T> ProxyAndInfo<T> createProxy(Configuration conf,
URI nameNodeUri, Class<T> xface, AtomicBoolean fallbackToSimpleAuth)
throws IOException {
AbstractNNFailoverProxyProvider<T> failoverProxyProvider =
createFailoverProxyProvider(conf, nameNodeUri, xface, true,
fallbackToSimpleAuth);
if (failoverProxyProvider == null) {
// Non-HA case
return createNonHAProxy(conf, NameNode.getAddress(nameNodeUri), xface,
UserGroupInformation.getCurrentUser(), true, fallbackToSimpleAuth);
} else {
// HA case
Conf config = new Conf(conf);
T proxy = (T) RetryProxy.create(xface, failoverProxyProvider,
RetryPolicies.failoverOnNetworkException(
RetryPolicies.TRY_ONCE_THEN_FAIL, config.maxFailoverAttempts,
config.maxRetryAttempts, config.failoverSleepBaseMillis,
config.failoverSleepMaxMillis));
Text dtService;
if (failoverProxyProvider.useLogicalURI()) {
dtService = HAUtil.buildTokenServiceForLogicalUri(nameNodeUri,
HdfsConstants.HDFS_URI_SCHEME);
} else {
dtService = SecurityUtil.buildTokenService(
NameNode.getAddress(nameNodeUri));
}
return new ProxyAndInfo<T>(proxy, dtService,
NameNode.getAddress(nameNodeUri));
}
}