下面列出了 org.apache.zookeeper.KeeperException#create() 的实例代码，也可以点击链接到 GitHub 查看源代码，或在右侧发表评论。
/**
 * Waits on {@code hasReceivedEvent} for at most the given timeout.
 *
 * <p>If the wait times out, the failure is logged, {@code zk} is closed and a
 * {@code KeeperException} built from {@code Code.CONNECTIONLOSS} is thrown.
 *
 * @param connectionTimeoutMs zookeeper connection timeout in milliseconds
 * @throws KeeperException if the wait times out (ConnectionLoss code)
 * @throws IOException if an {@code InterruptedException} is raised inside the
 *         wait/close sequence; the thread's interrupt flag is restored first
 */
private void waitForZKConnectionEvent(int connectionTimeoutMs)
    throws KeeperException, IOException {
  try {
    boolean eventArrived =
        hasReceivedEvent.await(connectionTimeoutMs, TimeUnit.MILLISECONDS);
    if (eventArrived) {
      return;
    }
    // Timed out: report, release the client, and signal connection loss.
    LOG.error("Connection timed out: couldn't connect to ZooKeeper in "
        + connectionTimeoutMs + " milliseconds");
    zk.close();
    throw KeeperException.create(Code.CONNECTIONLOSS);
  } catch (InterruptedException interrupted) {
    // Preserve the interrupt status for callers before translating.
    Thread.currentThread().interrupt();
    throw new IOException(
        "Interrupted when connecting to zookeeper server", interrupted);
  }
}
/**
 * Notifies the listener, if one is set, that this op was aborted.
 *
 * <p>The reported cause is {@code t} unless {@code opResult} carries a
 * non-OK error code, in which case a {@code KeeperException} created from
 * that code is reported instead.
 *
 * @param t the throwable supplied by the caller
 * @param opResult the per-op result, or {@code null} if none is available
 */
@Override
protected void abortOpResult(Throwable t,
                             @Nullable OpResult opResult) {
    // Default to the caller-supplied throwable; override only on a real error code.
    Throwable cause = t;
    if (null != opResult) {
        assert (opResult instanceof OpResult.ErrorResult);
        int errorCode = ((OpResult.ErrorResult) opResult).getErr();
        if (KeeperException.Code.OK.intValue() != errorCode) {
            cause = KeeperException.create(KeeperException.Code.get(errorCode));
        }
    }
    if (null == listener) {
        return;
    }
    listener.onAbort(cause);
}
/**
 * Aborting with a {@code null} op result must hand the listener the very
 * same throwable instance that was passed to {@code abortOpResult}.
 */
@Test(timeout = 60000)
public void testAbortNullOpResult() throws Exception {
    final AtomicReference<Throwable> captured = new AtomicReference<Throwable>();
    final CountDownLatch aborted = new CountDownLatch(1);
    Op mockedOp = mock(Op.class);
    Transaction.OpListener<Version> recorder = new Transaction.OpListener<Version>() {
        @Override
        public void onCommit(Version r) {
            // no-op
        }

        @Override
        public void onAbort(Throwable t) {
            // Record the cause, then release the waiting test thread.
            captured.set(t);
            aborted.countDown();
        }
    };
    ZKVersionedSetOp versionedSetOp = new ZKVersionedSetOp(mockedOp, recorder);

    KeeperException expected = KeeperException.create(KeeperException.Code.SESSIONEXPIRED);
    versionedSetOp.abortOpResult(expected, null);

    aborted.await();
    assertTrue(expected == captured.get());
}
/**
 * Aborting with an error op result (NONODE) must report a
 * {@code KeeperException.NoNodeException} to the listener rather than the
 * throwable passed in.
 */
@Test(timeout = 60000)
public void testAbortOpResult() throws Exception {
    final AtomicReference<Throwable> captured = new AtomicReference<Throwable>();
    final CountDownLatch aborted = new CountDownLatch(1);
    Op mockedOp = mock(Op.class);
    Transaction.OpListener<Version> recorder = new Transaction.OpListener<Version>() {
        @Override
        public void onCommit(Version r) {
            // no-op
        }

        @Override
        public void onAbort(Throwable t) {
            // Record the cause, then release the waiting test thread.
            captured.set(t);
            aborted.countDown();
        }
    };
    ZKVersionedSetOp versionedSetOp = new ZKVersionedSetOp(mockedOp, recorder);

    KeeperException txnFailure = KeeperException.create(KeeperException.Code.SESSIONEXPIRED);
    OpResult noNodeResult = new OpResult.ErrorResult(KeeperException.Code.NONODE.intValue());
    versionedSetOp.abortOpResult(txnFailure, noNodeResult);

    aborted.await();
    assertTrue(captured.get() instanceof KeeperException.NoNodeException);
}
/**
 * Opens a second ZooKeeper handle using the session id and password of
 * {@code zk}, waits for it to report {@code SyncConnected}, and then closes
 * that second handle.
 *
 * <p>The second handle is closed on every exit path, including the timeout
 * and interruption paths, so its resources are not leaked.
 *
 * @param zk client whose session id and password are reused
 * @param timeout session timeout for the new handle, also used as the
 *        maximum wait (in milliseconds) for the connected event
 * @throws IOException if the second handle cannot be created
 * @throws InterruptedException if interrupted while waiting or closing
 * @throws KeeperException with code CONNECTIONLOSS if the connected event
 *         does not arrive within {@code timeout} milliseconds
 */
private void expireZooKeeperSession(ZooKeeper zk, int timeout)
        throws IOException, InterruptedException, KeeperException {
    final CountDownLatch latch = new CountDownLatch(1);
    ZooKeeper newZk = new ZooKeeper(zkServers, timeout, new Watcher() {
        @Override
        public void process(WatchedEvent event) {
            if (event.getType() == EventType.None && event.getState() == KeeperState.SyncConnected) {
                latch.countDown();
            }
        }},
        zk.getSessionId(),
        zk.getSessionPasswd());
    try {
        if (!latch.await(timeout, TimeUnit.MILLISECONDS)) {
            throw KeeperException.create(KeeperException.Code.CONNECTIONLOSS);
        }
    } finally {
        // Always release the duplicate handle, even when the wait failed.
        newZk.close();
    }
}
/**
 * Reports the abort of this op to the listener.
 *
 * <p>When {@code opResult} is present and its error code is not OK, the
 * listener receives a {@code KeeperException} created from that code;
 * otherwise it receives {@code t}.
 *
 * @param t the throwable supplied by the caller
 * @param opResult the per-op result, or {@code null} if none is available
 */
@Override
protected void abortOpResult(Throwable t,
                             @Nullable OpResult opResult) {
    Throwable reported = t;
    if (opResult != null) {
        assert (opResult instanceof OpResult.ErrorResult);
        OpResult.ErrorResult failure = (OpResult.ErrorResult) opResult;
        boolean hasErrorCode = KeeperException.Code.OK.intValue() != failure.getErr();
        if (hasErrorCode) {
            // The op-specific error code takes precedence over the supplied throwable.
            reported = KeeperException.create(KeeperException.Code.get(failure.getErr()));
        }
    }
    listener.onAbort(reported);
}
/**
 * With no op result, the listener's {@code onAbort} must receive the exact
 * exception instance given to {@code abortOpResult}.
 */
@Test(timeout = 60000)
public void testAbortNullOpResult() throws Exception {
    final CountDownLatch abortSignal = new CountDownLatch(1);
    final AtomicReference<Throwable> observed = new AtomicReference<Throwable>();
    ZKVersionedSetOp op = new ZKVersionedSetOp(
        mock(Op.class),
        new Transaction.OpListener<Version>() {
            @Override
            public void onCommit(Version r) {
                // no-op
            }

            @Override
            public void onAbort(Throwable t) {
                observed.set(t);
                abortSignal.countDown();
            }
        });
    KeeperException sessionExpired =
        KeeperException.create(KeeperException.Code.SESSIONEXPIRED);

    op.abortOpResult(sessionExpired, null);
    abortSignal.await();

    // Identity, not equality: the same object must be passed through.
    assertTrue(sessionExpired == observed.get());
}
/**
 * With a NONODE error result, the listener's {@code onAbort} must receive a
 * {@code KeeperException.NoNodeException}.
 */
@Test(timeout = 60000)
public void testAbortOpResult() throws Exception {
    final CountDownLatch abortSignal = new CountDownLatch(1);
    final AtomicReference<Throwable> observed = new AtomicReference<Throwable>();
    ZKVersionedSetOp op = new ZKVersionedSetOp(
        mock(Op.class),
        new Transaction.OpListener<Version>() {
            @Override
            public void onCommit(Version r) {
                // no-op
            }

            @Override
            public void onAbort(Throwable t) {
                observed.set(t);
                abortSignal.countDown();
            }
        });
    KeeperException sessionExpired =
        KeeperException.create(KeeperException.Code.SESSIONEXPIRED);
    OpResult noNode =
        new OpResult.ErrorResult(KeeperException.Code.NONODE.intValue());

    op.abortOpResult(sessionExpired, noNode);
    abortSignal.await();

    assertTrue(observed.get() instanceof KeeperException.NoNodeException);
}
/**
 * Creates a second ZooKeeper handle with the session id and password taken
 * from {@code zk}, waits until that handle reports {@code SyncConnected},
 * and closes it again.
 *
 * <p>The second handle is closed in a {@code finally} block so that it is
 * released even when the wait times out or is interrupted.
 *
 * @param zk client whose session id and password are reused
 * @param timeout session timeout for the new handle, also used as the
 *        maximum wait (in milliseconds) for the connected event
 * @throws IOException if the second handle cannot be created
 * @throws InterruptedException if interrupted while waiting or closing
 * @throws KeeperException with code CONNECTIONLOSS if the connected event
 *         does not arrive within {@code timeout} milliseconds
 */
private void expireZooKeeperSession(ZooKeeper zk, int timeout)
        throws IOException, InterruptedException, KeeperException {
    final CountDownLatch latch = new CountDownLatch(1);
    ZooKeeper newZk = new ZooKeeper(zkServers, timeout, new Watcher() {
        @Override
        public void process(WatchedEvent event) {
            if (event.getType() == EventType.None && event.getState() == KeeperState.SyncConnected) {
                latch.countDown();
            }
        }},
        zk.getSessionId(),
        zk.getSessionPasswd());
    try {
        if (!latch.await(timeout, TimeUnit.MILLISECONDS)) {
            throw KeeperException.create(KeeperException.Code.CONNECTIONLOSS);
        }
    } finally {
        // Release the duplicate handle on success, timeout, and interruption alike.
        newZk.close();
    }
}
/**
 * Creates {@code znode} with the given data as a persistent node, then
 * calls {@code exists} on it with {@code zkw} as the watcher.
 *
 * <p>The ACL applied is whatever {@code createACL(zkw, znode)} returns.
 *
 * <p>If {@code exists} reports no node right after creation, a SYSTEMERROR
 * {@code KeeperException} is thrown. If the calling thread is interrupted,
 * the interruption is passed to {@code zkw.interruptedException} and
 * {@code -1} is returned.
 *
 * @param zkw zk reference
 * @param znode path of node to create
 * @param data data of node to create
 * @return version of the created node, or {@code -1} after an interruption
 * @throws KeeperException if unexpected zookeeper exception
 * @throws KeeperException.NodeExistsException if node already exists
 */
public static int createAndWatch(ZKWatcher zkw,
        String znode, byte [] data)
        throws KeeperException, KeeperException.NodeExistsException {
    try {
        zkw.getRecoverableZooKeeper().create(
            znode, data, createACL(zkw, znode), CreateMode.PERSISTENT);
        Stat created = zkw.getRecoverableZooKeeper().exists(znode, zkw);
        if (created != null) {
            return created.getVersion();
        }
        // Likely a race condition. Someone deleted the znode.
        throw KeeperException.create(KeeperException.Code.SYSTEMERROR,
            "ZK.exists returned null (i.e.: znode does not exist) for znode=" + znode);
    } catch (InterruptedException interrupted) {
        zkw.interruptedException(interrupted);
        return -1;
    }
}
/**
 * Decides whether a failed background operation should be retried.
 *
 * <p>If the client's retry policy allows another attempt, returns
 * {@code true} and does nothing else. Otherwise the operation's error
 * callback (if any) is told that retries are exhausted, the event is sent to
 * the operation's callback (if any), the connection is validated against
 * the state derived from the result code, and the failure is logged.
 *
 * @param operationAndData the background operation and its retry bookkeeping
 * @param event the event carrying the result code of the failed attempt
 * @return {@code true} if the retry policy allows a retry, {@code false} otherwise
 */
@SuppressWarnings({"ThrowableResultOfMethodCallIgnored"})
private <DATA_TYPE> boolean checkBackgroundRetry(OperationAndData<DATA_TYPE> operationAndData, CuratorEvent event)
{
    boolean doRetry = false;
    if ( client.getRetryPolicy().allowRetry(operationAndData.getThenIncrementRetryCount(), operationAndData.getElapsedTimeMs(), operationAndData) )
    {
        doRetry = true;
    }
    else
    {
        if ( operationAndData.getErrorCallback() != null )
        {
            operationAndData.getErrorCallback().retriesExhausted(operationAndData);
        }
        if ( operationAndData.getCallback() != null )
        {
            sendToBackgroundCallback(operationAndData, event);
        }
        KeeperException.Code code = KeeperException.Code.get(event.getResultCode());
        Exception e = null;
        try
        {
            e = (code != null) ? KeeperException.create(code) : null;
        }
        catch ( Throwable t )
        {
            ThreadUtils.checkInterrupted(t);
        }
        if ( e == null )
        {
            // Include the actual numeric code so the log entry is actionable.
            e = new Exception("Unknown result code: " + event.getResultCode());
        }
        // NOTE(review): code may be null here when the result code is unknown;
        // confirm that codeToState tolerates a null argument.
        validateConnection(codeToState(code));
        logError("Background operation retry gave up", e);
    }
    return doRetry;
}
/**
 * Waits on {@code latch} for up to the configured ZooKeeper async timeout.
 *
 * @param latch latch to wait on
 * @param path path included in the failure message and in the cause
 * @throws InterruptedException if interrupted while waiting
 * @throws IllegalStateException if the wait times out; the cause is a
 *         {@code KeeperException} created with {@code Code.OPERATIONTIMEOUT}
 */
private void checkLatch(CountDownLatch latch, String path) throws InterruptedException {
    boolean released =
        latch.await(configuration.getZookeeperAsyncTimeout(), TimeUnit.MILLISECONDS);
    if (released) {
        return;
    }
    String message = String.format(
        "Timed out waiting response for objects from %s, waited %s millis",
        path,
        configuration.getZookeeperAsyncTimeout());
    KeeperException timeoutCause = KeeperException.create(Code.OPERATIONTIMEOUT, path);
    throw new IllegalStateException(message, timeoutCause);
}
/**
 * Handles the result of a transaction.
 *
 * <p>On an OK return code every op is committed with its matching result
 * and {@code result} is completed with {@code null}. On any other code
 * every op is aborted with a {@code KeeperException} created from the code
 * (plus its per-op result when {@code results} is non-null) and
 * {@code result} is completed exceptionally with that exception.
 *
 * @param rc return code of the transaction
 * @param path not used by this method
 * @param ctx not used by this method
 * @param results per-op results; may be {@code null} on failure
 */
@Override
public void processResult(int rc, String path, Object ctx, List<OpResult> results) {
    boolean failed = KeeperException.Code.OK.intValue() != rc;
    if (failed) {
        KeeperException failure = KeeperException.create(KeeperException.Code.get(rc));
        for (int idx = 0; idx < ops.size(); idx++) {
            ops.get(idx).abortOpResult(failure, null == results ? null : results.get(idx));
        }
        FutureUtils.completeExceptionally(result, failure);
        return;
    }
    // transaction succeed
    for (int idx = 0; idx < ops.size(); idx++) {
        ops.get(idx).commitOpResult(results.get(idx));
    }
    FutureUtils.complete(result, null);
}
/**
 * Handles the result of a transaction.
 *
 * <p>A non-OK return code aborts every op with a {@code KeeperException}
 * created from the code (and the op's own result when {@code results} is
 * non-null) and sets that exception on {@code result}. An OK return code
 * commits every op with its matching result and sets {@code null} as the
 * value of {@code result}.
 *
 * @param rc return code of the transaction
 * @param path not used by this method
 * @param ctx not used by this method
 * @param results per-op results; may be {@code null} on failure
 */
@Override
public void processResult(int rc, String path, Object ctx, List<OpResult> results) {
    if (KeeperException.Code.OK.intValue() != rc) {
        KeeperException txnError = KeeperException.create(KeeperException.Code.get(rc));
        int opIndex = 0;
        while (opIndex < ops.size()) {
            ops.get(opIndex).abortOpResult(txnError, null != results ? results.get(opIndex) : null);
            opIndex++;
        }
        FutureUtils.setException(result, txnError);
    } else { // transaction succeed
        int opIndex = 0;
        while (opIndex < ops.size()) {
            ops.get(opIndex).commitOpResult(results.get(opIndex));
            opIndex++;
        }
        FutureUtils.setValue(result, null);
    }
}
/**
 * Maps <i>throwable</i> onto the exception type reported for zookeeper
 * operations.
 *
 * <ul>
 *   <li>a {@code KeeperException} is returned unchanged;</li>
 *   <li>a {@code ZooKeeperClient.ZooKeeperConnectionException} becomes a
 *       CONNECTIONLOSS {@code KeeperException} for <i>path</i>;</li>
 *   <li>an {@code InterruptedException} is wrapped in a
 *       {@code DLInterruptedException};</li>
 *   <li>anything else is wrapped in an {@code UnexpectedException}.</li>
 * </ul>
 *
 * @param throwable cause
 * @param path zookeeper path
 * @return zookeeper related exceptions
 */
public static Throwable zkException(Throwable throwable, String path) {
    if (throwable instanceof KeeperException) {
        return throwable;
    }
    if (throwable instanceof ZooKeeperClient.ZooKeeperConnectionException) {
        return KeeperException.create(KeeperException.Code.CONNECTIONLOSS, path);
    }
    if (throwable instanceof InterruptedException) {
        return new DLInterruptedException("Interrupted on operating " + path, throwable);
    }
    return new UnexpectedException("Encountered unexpected exception on operatiing " + path, throwable);
}
/**
 * Issues an asynchronous {@code sync} for {@code path} on {@code globalZk()}
 * and blocks until its callback has fired.
 *
 * @param path path to sync
 * @throws Exception a {@code KeeperException} created from the callback's
 *         return code when that code is not OK, or any exception raised
 *         while issuing the call or waiting
 */
protected void zkSync(String path) throws Exception {
    final int okCode = KeeperException.Code.OK.intValue();
    final CountDownLatch done = new CountDownLatch(1);
    final AtomicInteger syncRc = new AtomicInteger(okCode);
    globalZk().sync(path, (resultCode, syncedPath, context) -> {
        // Only a failure overwrites the initial OK value.
        if (resultCode != okCode) {
            syncRc.set(resultCode);
        }
        done.countDown();
    }, null);
    done.await();
    final int finalRc = syncRc.get();
    if (finalRc != okCode) {
        throw KeeperException.create(KeeperException.Code.get(finalRc));
    }
}
/**
 * Returns the children of {@code path} as obtained from
 * {@code cache.getChildrenAsync}, blocking until the lookup finishes.
 *
 * @param path path whose children are requested
 * @return the set of children; never {@code null}
 * @throws KeeperException with code NONODE if the lookup yields {@code null}
 * @throws InterruptedException declared by the signature
 */
public Set<String> get(String path) throws KeeperException, InterruptedException {
    if (LOG.isDebugEnabled()) {
        LOG.debug("getChildren called at: {}", path);
    }
    final Set<String> found = cache.getChildrenAsync(path, this).join();
    if (found != null) {
        return found;
    }
    throw KeeperException.create(KeeperException.Code.NONODE);
}
/**
 * Looks up the children of {@code path} through
 * {@code cache.getChildrenAsync} and waits for the answer.
 *
 * @param path path whose children are requested
 * @return the set of children; never {@code null}
 * @throws KeeperException with code NONODE if the lookup yields {@code null}
 * @throws InterruptedException declared by the signature
 */
public Set<String> get(String path) throws KeeperException, InterruptedException {
    if (LOG.isDebugEnabled()) {
        LOG.debug("getChildren called at: {}", path);
    }
    final Set<String> childNames = cache.getChildrenAsync(path, this).join();
    final boolean missing = (null == childNames);
    if (missing) {
        // A null result is surfaced to callers as "no such node".
        throw KeeperException.create(KeeperException.Code.NONODE);
    }
    return childNames;
}
/**
 * Wraps the {@code KeeperException} for {@code code} and {@code path} in the
 * matching metadata-store exception type.
 *
 * @param code zookeeper result code
 * @param path path passed to {@code KeeperException.create}
 * @return a {@code BadVersionException} for BADVERSION, a
 *         {@code NotFoundException} for NONODE, otherwise a plain
 *         {@code MetadataStoreException}
 */
private static MetadataStoreException getException(Code code, String path) {
    final KeeperException cause = KeeperException.create(code, path);
    final MetadataStoreException translated;
    switch (code) {
        case BADVERSION:
            translated = new BadVersionException(cause);
            break;
        case NONODE:
            translated = new NotFoundException(cause);
            break;
        default:
            translated = new MetadataStoreException(cause);
            break;
    }
    return translated;
}
/**
 * Runs {@code checkException()} and then fails if {@code code} is not OK.
 *
 * @throws RuntimeException wrapping a {@code KeeperException} created from
 *         {@code code} when {@code code} is not {@code KeeperException.Code.OK}
 */
@Override
public void checkError()
{
    checkException();
    if ( code == KeeperException.Code.OK )
    {
        return;
    }
    KeeperException failure = KeeperException.create(code);
    throw new RuntimeException(failure);
}