下面列出了org.apache.zookeeper.KeeperException#ConnectionLossException ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
* Enter the barrier and block until all members have entered or the timeout has
* elapsed
*
* @param maxWait max time to block
* @param unit time unit
* @return true if the entry was successful, false if the timeout elapsed first
* @throws Exception interruptions, errors, etc.
*/
public boolean enter(long maxWait, TimeUnit unit) throws Exception
{
long startMs = System.currentTimeMillis();
boolean hasMaxWait = (unit != null);
long maxWaitMs = hasMaxWait ? TimeUnit.MILLISECONDS.convert(maxWait, unit) : Long.MAX_VALUE;
boolean readyPathExists = (client.checkExists().usingWatcher(watcher).forPath(readyPath) != null);
client.create().creatingParentContainersIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(ourPath);
boolean result = (readyPathExists || internalEnter(startMs, hasMaxWait, maxWaitMs));
if ( connectionLost.get() )
{
throw new KeeperException.ConnectionLossException();
}
return result;
}
@Test
public void testFailure() throws Exception
{
CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), 100, 100, new RetryOneTime(1));
client.start();
try
{
client.checkExists().forPath("/hey");
client.checkExists().inBackground().forPath("/hey");
server.stop();
client.checkExists().forPath("/hey");
Assert.fail();
}
catch ( KeeperException.ConnectionLossException e )
{
// correct
}
finally
{
CloseableUtils.closeQuietly(client);
}
}
/**
* Test for a bug encountered during development of HADOOP-8163:
* ensureBaseNode() should throw an exception if it has to retry
* more than 3 times to create any part of the path.
*/
@Test
public void testEnsureBaseNodeFails() throws Exception {
Mockito.doThrow(new KeeperException.ConnectionLossException())
.when(mockZK).create(
Mockito.eq(ZK_PARENT_NAME), Mockito.<byte[]>any(),
Mockito.eq(Ids.OPEN_ACL_UNSAFE), Mockito.eq(CreateMode.PERSISTENT));
try {
elector.ensureParentZNode();
Assert.fail("Did not throw!");
} catch (IOException ioe) {
if (!(ioe.getCause() instanceof KeeperException.ConnectionLossException)) {
throw ioe;
}
}
// Should have tried three times
Mockito.verify(mockZK, Mockito.times(3)).create(
Mockito.eq(ZK_PARENT_NAME), Mockito.<byte[]>any(),
Mockito.eq(Ids.OPEN_ACL_UNSAFE), Mockito.eq(CreateMode.PERSISTENT));
}
/**
* Test for a bug encountered during development of HADOOP-8163:
* ensureBaseNode() should throw an exception if it has to retry
* more than 3 times to create any part of the path.
*/
@Test
public void testEnsureBaseNodeFails() throws Exception {
Mockito.doThrow(new KeeperException.ConnectionLossException())
.when(mockZK).create(
Mockito.eq(ZK_PARENT_NAME), Mockito.<byte[]>any(),
Mockito.eq(Ids.OPEN_ACL_UNSAFE), Mockito.eq(CreateMode.PERSISTENT));
try {
elector.ensureParentZNode();
Assert.fail("Did not throw!");
} catch (IOException ioe) {
if (!(ioe.getCause() instanceof KeeperException.ConnectionLossException)) {
throw ioe;
}
}
// Should have tried three times
Mockito.verify(mockZK, Mockito.times(3)).create(
Mockito.eq(ZK_PARENT_NAME), Mockito.<byte[]>any(),
Mockito.eq(Ids.OPEN_ACL_UNSAFE), Mockito.eq(CreateMode.PERSISTENT));
}
/**
* Perform the given operation, retrying if the connection fails
*/
@SuppressWarnings("unchecked")
public <T> T retryOperation(ZkOperation operation)
throws KeeperException, InterruptedException {
KeeperException exception = null;
for (int i = 0; i < retryCount; i++) {
try {
if (i > 0 && isClosed()) {
throw new AlreadyClosedException();
}
return (T) operation.execute();
} catch (KeeperException.ConnectionLossException e) {
if (exception == null) {
exception = e;
}
if (Thread.currentThread().isInterrupted()) {
Thread.currentThread().interrupt();
throw new InterruptedException();
}
if (i != retryCount -1) {
retryDelay(i);
}
}
}
throw exception;
}
/**
* Enter the barrier and block until all members have entered or the timeout has
* elapsed
*
* @param maxWait max time to block
* @param unit time unit
* @return true if the entry was successful, false if the timeout elapsed first
* @throws Exception interruptions, errors, etc.
*/
public boolean enter(long maxWait, TimeUnit unit) throws Exception
{
long startMs = System.currentTimeMillis();
boolean hasMaxWait = (unit != null);
long maxWaitMs = hasMaxWait ? TimeUnit.MILLISECONDS.convert(maxWait, unit) : Long.MAX_VALUE;
boolean readyPathExists = (client.checkExists().usingWatcher(watcher).forPath(readyPath) != null);
client.create().creatingParentContainersIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(ourPath);
boolean result = (readyPathExists || internalEnter(startMs, hasMaxWait, maxWaitMs));
if ( connectionLost.get() )
{
throw new KeeperException.ConnectionLossException();
}
return result;
}
@Test
public void testReconnectWithoutExpiration() throws Exception
{
Assert.assertEquals(states.poll(timing.milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.CONNECTED);
server.stop();
try
{
client.checkExists().forPath("/"); // any API call that will invoke the retry policy, etc.
}
catch ( KeeperException.ConnectionLossException ignore )
{
}
Assert.assertEquals(states.poll(timing.milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.SUSPENDED);
server.restart();
client.checkExists().forPath("/");
Assert.assertEquals(states.poll(timing.milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.RECONNECTED);
}
private void processNodeChange(final String path) {
try {
String value = provider.getDataString(path);
put(path, value);
} catch (final KeeperException | InterruptedException ex) {
if (ex instanceof KeeperException.NoNodeException || ex instanceof KeeperException.ConnectionLossException) {
log.debug(ex.getMessage());
return;
}
log.error("PathTree put error : " + ex.getMessage());
}
}
private synchronized void checkTimeouts() throws Exception
{
int minTimeout = Math.min(sessionTimeoutMs, connectionTimeoutMs);
long elapsed = System.currentTimeMillis() - connectionStartMs;
if ( elapsed >= minTimeout )
{
if ( zooKeeper.hasNewConnectionString() )
{
handleNewConnectionString();
}
else
{
int maxTimeout = Math.max(sessionTimeoutMs, connectionTimeoutMs);
if ( elapsed > maxTimeout )
{
if ( !Boolean.getBoolean(DebugUtils.PROPERTY_DONT_LOG_CONNECTION_ISSUES) )
{
log.warn(String.format("Connection attempt unsuccessful after %d (greater than max timeout of %d). Resetting connection and trying again with a new connection.", elapsed, maxTimeout));
}
reset();
}
else
{
KeeperException.ConnectionLossException connectionLossException = new CuratorConnectionLossException();
if ( !Boolean.getBoolean(DebugUtils.PROPERTY_DONT_LOG_CONNECTION_ISSUES) )
{
log.error(String.format("Connection timed out for connection string (%s) and timeout (%d) / elapsed (%d)", zooKeeper.getConnectionString(), connectionTimeoutMs, elapsed), connectionLossException);
}
new EventTrace("connections-timed-out", tracer.get(), getSessionId()).commit();
throw connectionLossException;
}
}
}
}
private String protectedPathInForeground(String adjustedPath, byte[] data) throws Exception
{
try
{
return pathInForeground(adjustedPath, data);
}
catch ( Exception e)
{
ThreadUtils.checkInterrupted(e);
if ( ( e instanceof KeeperException.ConnectionLossException ||
!( e instanceof KeeperException )) && protectedId != null )
{
/*
* CURATOR-45 + CURATOR-79: we don't know if the create operation was successful or not,
* register the znode to be sure it is deleted later.
*/
new FindAndDeleteProtectedNodeInBackground(client, ZKPaths.getPathAndNode(adjustedPath).getPath(), protectedId).execute();
/*
* The current UUID is scheduled to be deleted, it is not safe to use it again.
* If this builder is used again later create a new UUID
*/
protectedId = UUID.randomUUID().toString();
}
throw e;
}
}
@Test
public void testProtectedCreateNodeDeletion() throws Exception
{
CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), 1, new RetryNTimes(0, 0));
try
{
client.start();
for ( int i = 0; i < 2; ++i )
{
CuratorFramework localClient = (i == 0) ? client : client.usingNamespace("nm");
localClient.create().forPath("/parent");
Assert.assertEquals(localClient.getChildren().forPath("/parent").size(), 0);
CreateBuilderImpl createBuilder = (CreateBuilderImpl)localClient.create();
createBuilder.failNextCreateForTesting = true;
FindAndDeleteProtectedNodeInBackground.debugInsertError.set(true);
try
{
createBuilder.withProtection().forPath("/parent/test");
Assert.fail("failNextCreateForTesting should have caused a ConnectionLossException");
}
catch ( KeeperException.ConnectionLossException e )
{
// ignore, correct
}
timing.sleepABit();
List<String> children = localClient.getChildren().forPath("/parent");
Assert.assertEquals(children.size(), 0, children.toString()); // protected mode should have deleted the node
localClient.delete().forPath("/parent");
}
}
finally
{
CloseableUtils.closeQuietly(client);
}
}
private String protectedPathInForeground(String adjustedPath, byte[] data, List<ACL> aclList) throws Exception
{
try
{
return pathInForeground(adjustedPath, data, aclList);
}
catch ( Exception e)
{
ThreadUtils.checkInterrupted(e);
if ( ( e instanceof KeeperException.ConnectionLossException ||
!( e instanceof KeeperException )) && protectedMode.doProtected() )
{
/*
* CURATOR-45 + CURATOR-79: we don't know if the create operation was successful or not,
* register the znode to be sure it is deleted later.
*/
new FindAndDeleteProtectedNodeInBackground(client, ZKPaths.getPathAndNode(adjustedPath).getPath(), protectedMode.protectedId()).execute();
/*
* The current UUID is scheduled to be deleted, it is not safe to use it again.
* If this builder is used again later create a new UUID
*/
protectedMode.resetProtectedId();
}
throw e;
}
}
void logError(String reason, final Throwable e)
{
if ( (reason == null) || (reason.length() == 0) )
{
reason = "n/a";
}
if ( !Boolean.getBoolean(DebugUtils.PROPERTY_DONT_LOG_CONNECTION_ISSUES) || !(e instanceof KeeperException) )
{
if ( e instanceof KeeperException.ConnectionLossException )
{
if ( LOG_ALL_CONNECTION_ISSUES_AS_ERROR_LEVEL || logAsErrorConnectionErrors.compareAndSet(true, false) )
{
log.error(reason, e);
}
else
{
log.debug(reason, e);
}
}
else
{
log.error(reason, e);
}
}
final String localReason = reason;
unhandledErrorListeners.forEach(l -> l.unhandledError(localReason, e));
if ( debugUnhandledErrorListener != null )
{
debugUnhandledErrorListener.unhandledError(reason, e);
}
}
@Test
public void testProtectedCreateNodeDeletion() throws Exception
{
CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), 1, new RetryNTimes(0, 0));
try
{
client.start();
for ( int i = 0; i < 2; ++i )
{
CuratorFramework localClient = (i == 0) ? client : client.usingNamespace("nm");
localClient.create().forPath("/parent");
Assert.assertEquals(localClient.getChildren().forPath("/parent").size(), 0);
CreateBuilderImpl createBuilder = (CreateBuilderImpl)localClient.create();
createBuilder.failNextCreateForTesting = true;
FindAndDeleteProtectedNodeInBackground.debugInsertError.set(true);
try
{
createBuilder.withProtection().forPath("/parent/test");
Assert.fail("failNextCreateForTesting should have caused a ConnectionLossException");
}
catch ( KeeperException.ConnectionLossException e )
{
// ignore, correct
}
timing.sleepABit();
List<String> children = localClient.getChildren().forPath("/parent");
Assert.assertEquals(children.size(), 0, children.toString()); // protected mode should have deleted the node
localClient.delete().forPath("/parent");
}
}
finally
{
CloseableUtils.closeQuietly(client);
}
}
private boolean internalLeave(long startMs, boolean hasMaxWait, long maxWaitMs) throws Exception
{
String ourPathName = ZKPaths.getNodeFromPath(ourPath);
boolean ourNodeShouldExist = true;
boolean result = true;
for(;;)
{
if ( connectionLost.get() )
{
throw new KeeperException.ConnectionLossException();
}
List<String> children;
try
{
children = client.getChildren().forPath(barrierPath);
}
catch ( KeeperException.NoNodeException dummy )
{
children = Lists.newArrayList();
}
children = filterAndSortChildren(children);
if ( (children == null) || (children.size() == 0) )
{
break;
}
int ourIndex = children.indexOf(ourPathName);
if ( (ourIndex < 0) && ourNodeShouldExist )
{
if ( connectionLost.get() )
{
break; // connection was lost but we've reconnected. However, our ephemeral node is gone
}
else
{
throw new IllegalStateException(String.format("Our path (%s) is missing", ourPathName));
}
}
if ( children.size() == 1 )
{
if ( ourNodeShouldExist && !children.get(0).equals(ourPathName) )
{
throw new IllegalStateException(String.format("Last path (%s) is not ours (%s)", children.get(0), ourPathName));
}
checkDeleteOurPath(ourNodeShouldExist);
break;
}
Stat stat;
boolean IsLowestNode = (ourIndex == 0);
if ( IsLowestNode )
{
String highestNodePath = ZKPaths.makePath(barrierPath, children.get(children.size() - 1));
stat = client.checkExists().usingWatcher(watcher).forPath(highestNodePath);
}
else
{
String lowestNodePath = ZKPaths.makePath(barrierPath, children.get(0));
stat = client.checkExists().usingWatcher(watcher).forPath(lowestNodePath);
checkDeleteOurPath(ourNodeShouldExist);
ourNodeShouldExist = false;
}
if ( stat != null )
{
if ( hasMaxWait )
{
long elapsed = System.currentTimeMillis() - startMs;
long thisWaitMs = maxWaitMs - elapsed;
if ( thisWaitMs <= 0 )
{
result = false;
}
else
{
wait(thisWaitMs);
}
}
else
{
wait();
}
}
}
try
{
client.delete().forPath(readyPath);
}
catch ( KeeperException.NoNodeException ignore )
{
// ignore
}
return result;
}
@Test
public void testServerCrash() throws Exception
{
final int TIMEOUT = 1000;
final CuratorFramework client = CuratorFrameworkFactory.builder().connectString(server.getConnectString()).connectionTimeoutMs(TIMEOUT).retryPolicy(new RetryOneTime(1)).build();
try
{
client.start();
final DistributedBarrier barrier = new DistributedBarrier(client, "/barrier");
barrier.setBarrier();
final ExecutorService service = Executors.newSingleThreadExecutor();
Future<Object> future = service.submit
(
new Callable<Object>()
{
@Override
public Object call() throws Exception
{
Thread.sleep(TIMEOUT / 2);
server.stop();
return null;
}
}
);
barrier.waitOnBarrier(TIMEOUT * 2, TimeUnit.SECONDS);
future.get();
Assert.fail();
}
catch ( KeeperException.ConnectionLossException expected )
{
// expected
}
finally
{
client.close();
}
}
void logError(String reason, final Throwable e)
{
if ( (reason == null) || (reason.length() == 0) )
{
reason = "n/a";
}
if ( !Boolean.getBoolean(DebugUtils.PROPERTY_DONT_LOG_CONNECTION_ISSUES) || !(e instanceof KeeperException) )
{
if ( e instanceof KeeperException.ConnectionLossException )
{
if ( LOG_ALL_CONNECTION_ISSUES_AS_ERROR_LEVEL || logAsErrorConnectionErrors.compareAndSet(true, false) )
{
log.error(reason, e);
}
else
{
log.debug(reason, e);
}
}
else
{
log.error(reason, e);
}
}
final String localReason = reason;
unhandledErrorListeners.forEach(new Function<UnhandledErrorListener, Void>()
{
@Override
public Void apply(UnhandledErrorListener listener)
{
listener.unhandledError(localReason, e);
return null;
}
});
if ( debugUnhandledErrorListener != null )
{
debugUnhandledErrorListener.unhandledError(reason, e);
}
}
private static boolean isIgnoredException(final Throwable cause) {
return cause instanceof KeeperException.ConnectionLossException || cause instanceof KeeperException.NoNodeException || cause instanceof KeeperException.NodeExistsException;
}
@Test
public void testGuaranteedRemoveWatch() throws Exception {
Timing timing = new Timing();
CuratorFramework client = CuratorFrameworkFactory.builder().
connectString(server.getConnectString()).
retryPolicy(new RetryOneTime(1)).
build();
try
{
client.start();
AtomicReference<ConnectionState> stateRef = registerConnectionStateListener(client);
String path = "/";
CountDownLatch removeLatch = new CountDownLatch(1);
Watcher watcher = new CountDownWatcher(path, removeLatch, EventType.DataWatchRemoved);
client.checkExists().usingWatcher(watcher).forPath(path);
server.stop();
Assert.assertTrue(blockUntilDesiredConnectionState(stateRef, timing, ConnectionState.SUSPENDED));
//Remove the watch while we're not connected
try
{
client.watches().remove(watcher).guaranteed().forPath(path);
Assert.fail();
}
catch(KeeperException.ConnectionLossException e)
{
//Expected
}
server.restart();
timing.awaitLatch(removeLatch);
}
finally
{
CloseableUtils.closeQuietly(client);
}
}
@Test
public void testServerCrash() throws Exception
{
final int TIMEOUT = 1000;
final CuratorFramework client = CuratorFrameworkFactory.builder().connectString(server.getConnectString()).connectionTimeoutMs(TIMEOUT).retryPolicy(new RetryOneTime(1)).build();
try
{
client.start();
final DistributedBarrier barrier = new DistributedBarrier(client, "/barrier");
barrier.setBarrier();
final ExecutorService service = Executors.newSingleThreadExecutor();
Future<Object> future = service.submit
(
new Callable<Object>()
{
@Override
public Object call() throws Exception
{
Thread.sleep(TIMEOUT / 2);
server.stop();
return null;
}
}
);
barrier.waitOnBarrier(TIMEOUT * 2, TimeUnit.SECONDS);
future.get();
Assert.fail();
}
catch ( KeeperException.ConnectionLossException expected )
{
// expected
}
finally
{
client.close();
}
}