类org.apache.zookeeper.AsyncCallback源码实例Demo

下面列出了怎么用org.apache.zookeeper.AsyncCallback的API类实例代码及写法,或者点击链接到github查看源代码。

源代码1 项目: opensharding-spi-impl   文件: BaseClientTest.java
protected final void asyncGet(final IZookeeperClient client) throws KeeperException, InterruptedException {
    final CountDownLatch ready = new CountDownLatch(1);
    String key = "a/b";
    String value = "bbb11";
    client.createAllNeedPath(key, value, CreateMode.PERSISTENT);
    AsyncCallback.DataCallback callback = new AsyncCallback.DataCallback() {
        @Override
        public void processResult(final int rc, final String path, final Object ctx, final byte[] data, final Stat stat) {
            assertThat(new String(data), is(ctx));
            ready.countDown();
        }
    };
    client.getData(key, callback, value);
    ready.await();
    client.deleteCurrentBranch("a/b");
}
 
源代码2 项目: xian   文件: BackgroundSyncImpl.java
@Override
public void performBackgroundOperation(final OperationAndData<String> operationAndData) throws Exception
{
    final OperationTrace trace = client.getZookeeperClient().startAdvancedTracer("BackgroundSyncImpl");
    final String data = operationAndData.getData();
    client.getZooKeeper().sync
    (
        data,
        new AsyncCallback.VoidCallback()
        {
            @Override
            public void processResult(int rc, String path, Object ctx)
            {
                trace.setReturnCode(rc).setRequestBytesLength(data).commit();
                CuratorEventImpl event = new CuratorEventImpl(client, CuratorEventType.SYNC, rc, path, null, ctx, null, null, null, null, null);
                client.processBackgroundOperation(operationAndData, event);
            }
        },
        context
    );
}
 
源代码3 项目: xian   文件: GetACLBuilderImpl.java
@Override
public void performBackgroundOperation(final OperationAndData<String> operationAndData) throws Exception
{
    try
    {
        final OperationTrace             trace = client.getZookeeperClient().startAdvancedTracer("GetACLBuilderImpl-Background");
        AsyncCallback.ACLCallback   callback = new AsyncCallback.ACLCallback()
        {
            @Override
            public void processResult(int rc, String path, Object ctx, List<ACL> acl, Stat stat)
            {
                trace.setReturnCode(rc).setPath(path).setStat(stat).commit();
                CuratorEventImpl event = new CuratorEventImpl(client, CuratorEventType.GET_ACL, rc, path, null, ctx, stat, null, null, null, acl);
                client.processBackgroundOperation(operationAndData, event);
            }
        };
        client.getZooKeeper().getACL(operationAndData.getData(), responseStat, callback, backgrounding.getContext());
    }
    catch ( Throwable e )
    {
        backgrounding.checkError(e);
    }
}
 
源代码4 项目: distributedlog   文件: Utils.java
/**
 * Asynchronously create zookeeper path recursively and optimistically.
 *
 * @param zkc Zookeeper client
 * @param pathToCreate  Zookeeper full path
 * @param parentPathShouldNotCreate zookeeper parent path should not be created
 * @param data Zookeeper data
 * @param acl Acl of the zk path
 * @param createMode Create mode of zk path
 */
public static CompletableFuture<Void> zkAsyncCreateFullPathOptimistic(
    final ZooKeeperClient zkc,
    final String pathToCreate,
    final Optional<String> parentPathShouldNotCreate,
    final byte[] data,
    final List<ACL> acl,
    final CreateMode createMode) {
    final CompletableFuture<Void> result = new CompletableFuture<Void>();

    zkAsyncCreateFullPathOptimisticRecursive(zkc, pathToCreate, parentPathShouldNotCreate,
            data, acl, createMode, new AsyncCallback.StringCallback() {
        @Override
        public void processResult(int rc, String path, Object ctx, String name) {
            handleKeeperExceptionCode(rc, path, result);
        }
    }, result);

    return result;
}
 
源代码5 项目: distributedlog   文件: ZKSubscriptionsStore.java
@Override
public CompletableFuture<Map<String, DLSN>> getLastCommitPositions() {
    final CompletableFuture<Map<String, DLSN>> result = new CompletableFuture<Map<String, DLSN>>();
    try {
        this.zkc.get().getChildren(this.zkPath, false, new AsyncCallback.Children2Callback() {
            @Override
            public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
                if (KeeperException.Code.NONODE.intValue() == rc) {
                    result.complete(new HashMap<String, DLSN>());
                } else if (KeeperException.Code.OK.intValue() != rc) {
                    result.completeExceptionally(KeeperException.create(KeeperException.Code.get(rc), path));
                } else {
                    getLastCommitPositions(result, children);
                }
            }
        }, null);
    } catch (ZooKeeperClient.ZooKeeperConnectionException zkce) {
        result.completeExceptionally(zkce);
    } catch (InterruptedException ie) {
        result.completeExceptionally(new DLInterruptedException("getLastCommitPositions was interrupted", ie));
    }
    return result;
}
 
源代码6 项目: distributedlog   文件: Utils.java
/**
 * Asynchronously create zookeeper path recursively and optimistically
 *
 * @param zkc Zookeeper client
 * @param pathToCreate  Zookeeper full path
 * @param parentPathShouldNotCreate zookeeper parent path should not be created
 * @param data Zookeeper data
 * @param acl Acl of the zk path
 * @param createMode Create mode of zk path
 */
public static Future<BoxedUnit> zkAsyncCreateFullPathOptimistic(
    final ZooKeeperClient zkc,
    final String pathToCreate,
    final Optional<String> parentPathShouldNotCreate,
    final byte[] data,
    final List<ACL> acl,
    final CreateMode createMode) {
    final Promise<BoxedUnit> result = new Promise<BoxedUnit>();

    zkAsyncCreateFullPathOptimisticRecursive(zkc, pathToCreate, parentPathShouldNotCreate,
            data, acl, createMode, new AsyncCallback.StringCallback() {
        @Override
        public void processResult(int rc, String path, Object ctx, String name) {
            handleKeeperExceptionCode(rc, path, result);
        }
    }, result);

    return result;
}
 
源代码7 项目: hbase   文件: TestReadOnlyZKClient.java
@Test
public void testNotCloseZkWhenPending() throws Exception {
  ZooKeeper mockedZK = mock(ZooKeeper.class);
  Exchanger<AsyncCallback.DataCallback> exchanger = new Exchanger<>();
  doAnswer(i -> {
    exchanger.exchange(i.getArgument(2));
    return null;
  }).when(mockedZK).getData(anyString(), anyBoolean(),
    any(AsyncCallback.DataCallback.class), any());
  doAnswer(i -> null).when(mockedZK).close();
  when(mockedZK.getState()).thenReturn(ZooKeeper.States.CONNECTED);
  RO_ZK.zookeeper = mockedZK;
  CompletableFuture<byte[]> future = RO_ZK.get(PATH);
  AsyncCallback.DataCallback callback = exchanger.exchange(null);
  // 2 * keep alive time to ensure that we will not close the zk when there are pending requests
  Thread.sleep(6000);
  assertNotNull(RO_ZK.zookeeper);
  verify(mockedZK, never()).close();
  callback.processResult(Code.OK.intValue(), PATH, null, DATA, null);
  assertArrayEquals(DATA, future.get());
  // now we will close the idle connection.
  waitForIdleConnectionClosed();
  verify(mockedZK, times(1)).close();
}
 
源代码8 项目: curator   文件: CuratorMultiTransactionImpl.java
@Override
public void performBackgroundOperation(final OperationAndData<CuratorMultiTransactionRecord> operationAndData) throws Exception
{
    try
    {
        final TimeTrace trace = client.getZookeeperClient().startTracer("CuratorMultiTransactionImpl-Background");
        AsyncCallback.MultiCallback callback = new AsyncCallback.MultiCallback()
        {
            @Override
            public void processResult(int rc, String path, Object ctx, List<OpResult> opResults)
            {
                trace.commit();
                List<CuratorTransactionResult> curatorResults = (opResults != null) ? CuratorTransactionImpl.wrapResults(client, opResults, operationAndData.getData()) : null;
                CuratorEvent event = new CuratorEventImpl(client, CuratorEventType.TRANSACTION, rc, path, null, ctx, null, null, null, null, null, curatorResults);
                client.processBackgroundOperation(operationAndData, event);
            }
        };
        client.getZooKeeper().multi(operationAndData.getData(), callback, backgrounding.getContext());
    }
    catch ( Throwable e )
    {
        backgrounding.checkError(e, null);
    }
}
 
源代码9 项目: curator   文件: BackgroundSyncImpl.java
@Override
public void performBackgroundOperation(final OperationAndData<String> operationAndData) throws Exception
{
    final OperationTrace trace = client.getZookeeperClient().startAdvancedTracer("BackgroundSyncImpl");
    final String data = operationAndData.getData();
    client.getZooKeeper().sync
    (
        data,
        new AsyncCallback.VoidCallback()
        {
            @Override
            public void processResult(int rc, String path, Object ctx)
            {
                trace.setReturnCode(rc).setRequestBytesLength(data).commit();
                CuratorEventImpl event = new CuratorEventImpl(client, CuratorEventType.SYNC, rc, path, null, ctx, null, null, null, null, null, null);
                client.processBackgroundOperation(operationAndData, event);
            }
        },
        context
    );
}
 
源代码10 项目: curator   文件: GetACLBuilderImpl.java
@Override
public void performBackgroundOperation(final OperationAndData<String> operationAndData) throws Exception
{
    try
    {
        final OperationTrace             trace = client.getZookeeperClient().startAdvancedTracer("GetACLBuilderImpl-Background");
        AsyncCallback.ACLCallback   callback = new AsyncCallback.ACLCallback()
        {
            @Override
            public void processResult(int rc, String path, Object ctx, List<ACL> acl, Stat stat)
            {
                trace.setReturnCode(rc).setPath(path).setStat(stat).commit();
                CuratorEventImpl event = new CuratorEventImpl(client, CuratorEventType.GET_ACL, rc, path, null, ctx, stat, null, null, null, acl, null);
                client.processBackgroundOperation(operationAndData, event);
            }
        };
        client.getZooKeeper().getACL(operationAndData.getData(), responseStat, callback, backgrounding.getContext());
    }
    catch ( Throwable e )
    {
        backgrounding.checkError(e, null);
    }
}
 
源代码11 项目: opensharding-spi-impl   文件: ContentionStrategy.java
@Override
public final void deleteOnlyCurrent(final String key, final AsyncCallback.VoidCallback callback, final Object ctx) throws KeeperException, InterruptedException {
    getProvider().executeContention(new LeaderElection() {
        
        @Override
        public void action() throws KeeperException, InterruptedException {
            getProvider().delete(getProvider().getRealPath(key), callback, ctx);
        }
    });
}
 
源代码12 项目: opensharding-spi-impl   文件: UsualClient.java
@Override
public void deleteOnlyCurrent(final String key, final AsyncCallback.VoidCallback callback, final Object ctx) throws KeeperException, InterruptedException {
    if (getRootNode().equals(key)) {
        deleteNamespace();
        return;
    }
    execStrategy.deleteOnlyCurrent(key, callback, ctx);
}
 
源代码13 项目: xian   文件: CreateBuilderImpl.java
@Override
public void performBackgroundOperation(final OperationAndData<PathAndBytes> operationAndData) throws Exception
{
    try
    {
        final OperationTrace trace = client.getZookeeperClient().startAdvancedTracer("CreateBuilderImpl-Background");
        final byte[] data = operationAndData.getData().getData();
        client.getZooKeeper().create
            (
                operationAndData.getData().getPath(),
                data,
                acling.getAclList(operationAndData.getData().getPath()),
                createMode,
                new AsyncCallback.StringCallback()
                {
                    @Override
                    public void processResult(int rc, String path, Object ctx, String name)
                    {
                        trace.setReturnCode(rc).setRequestBytesLength(data).setPath(path).commit();

                        if ( (rc == KeeperException.Code.NONODE.intValue()) && createParentsIfNeeded )
                        {
                            backgroundCreateParentsThenNode(client, operationAndData, operationAndData.getData().getPath(), backgrounding, createParentsAsContainers);
                        }
                        else
                        {
                            sendBackgroundResponse(rc, path, ctx, name, operationAndData);
                        }
                    }
                },
                backgrounding.getContext()
            );
    }
    catch ( Throwable e )
    {
        backgrounding.checkError(e);
    }
}
 
源代码14 项目: xian   文件: SetDataBuilderImpl.java
@Override
public void performBackgroundOperation(final OperationAndData<PathAndBytes> operationAndData) throws Exception
{
    try
    {
        final OperationTrace   trace = client.getZookeeperClient().startAdvancedTracer("SetDataBuilderImpl-Background");
        final byte[] data = operationAndData.getData().getData();
        client.getZooKeeper().setData
        (
            operationAndData.getData().getPath(),
            data,
            version,
            new AsyncCallback.StatCallback()
            {
                @SuppressWarnings({"unchecked"})
                @Override
                public void processResult(int rc, String path, Object ctx, Stat stat)
                {
                    trace.setReturnCode(rc).setRequestBytesLength(data).setPath(path).setStat(stat).commit();
                    CuratorEvent event = new CuratorEventImpl(client, CuratorEventType.SET_DATA, rc, path, null, ctx, stat, null, null, null, null);
                    client.processBackgroundOperation(operationAndData, event);
                }
            },
            backgrounding.getContext()
        );
    }
    catch ( Throwable e )
    {
        backgrounding.checkError(e);
    }
}
 
源代码15 项目: xian   文件: DeleteBuilderImpl.java
@Override
public void performBackgroundOperation(final OperationAndData<String> operationAndData) throws Exception
{
    try
    {
        final OperationTrace trace = client.getZookeeperClient().startAdvancedTracer("DeleteBuilderImpl-Background");
        client.getZooKeeper().delete
            (
                operationAndData.getData(),
                version,
                new AsyncCallback.VoidCallback()
                {
                    @Override
                    public void processResult(int rc, String path, Object ctx)
                    {
                        trace.setReturnCode(rc).setPath(path).commit();
                        if ( (rc == KeeperException.Code.NOTEMPTY.intValue()) && deletingChildrenIfNeeded )
                        {
                            backgroundDeleteChildrenThenNode(operationAndData);
                        }
                        else
                        {
                            CuratorEvent event = new CuratorEventImpl(client, CuratorEventType.DELETE, rc, path, null, ctx, null, null, null, null, null);
                            client.processBackgroundOperation(operationAndData, event);
                        }
                    }
                },
                backgrounding.getContext()
            );
    }
    catch ( Throwable e )
    {
        backgrounding.checkError(e);
    }
}
 
源代码16 项目: xian   文件: GetChildrenBuilderImpl.java
@Override
public void performBackgroundOperation(final OperationAndData<String> operationAndData) throws Exception
{
    try
    {
        final OperationTrace       trace = client.getZookeeperClient().startAdvancedTracer("GetChildrenBuilderImpl-Background");
        AsyncCallback.Children2Callback callback = new AsyncCallback.Children2Callback()
        {
            @Override
            public void processResult(int rc, String path, Object o, List<String> strings, Stat stat)
            {
                trace.setReturnCode(rc).setPath(path).setWithWatcher(watching.getWatcher() != null).setStat(stat).commit();
                if ( strings == null )
                {
                    strings = Lists.newArrayList();
                }
                CuratorEventImpl event = new CuratorEventImpl(client, CuratorEventType.CHILDREN, rc, path, null, o, stat, null, strings, null, null);
                client.processBackgroundOperation(operationAndData, event);
            }
        };
        if ( watching.isWatched() )
        {
            client.getZooKeeper().getChildren(operationAndData.getData(), true, callback, backgrounding.getContext());
        }
        else
        {
            client.getZooKeeper().getChildren(operationAndData.getData(), watching.getWatcher(), callback, backgrounding.getContext());
        }
    }
    catch ( Throwable e )
    {
        backgrounding.checkError(e);
    }
}
 
源代码17 项目: distributedlog   文件: LedgerAllocatorPool.java
private void createAllocators(int numAllocators) throws InterruptedException, IOException {
    final AtomicInteger numPendings = new AtomicInteger(numAllocators);
    final AtomicInteger numFailures = new AtomicInteger(0);
    final CountDownLatch latch = new CountDownLatch(1);
    AsyncCallback.StringCallback createCallback = new AsyncCallback.StringCallback() {
        @Override
        public void processResult(int rc, String path, Object ctx, String name) {
            if (KeeperException.Code.OK.intValue() != rc) {
                numFailures.incrementAndGet();
                latch.countDown();
                return;
            }
            if (numPendings.decrementAndGet() == 0 && numFailures.get() == 0) {
                latch.countDown();
            }
        }
    };
    for (int i = 0; i < numAllocators; i++) {
        zkc.get().create(poolPath + "/A", new byte[0],
                         zkc.getDefaultACL(),
                         CreateMode.PERSISTENT_SEQUENTIAL,
                         createCallback, null);
    }
    latch.await();
    if (numFailures.get() > 0) {
        throw new IOException("Failed to create " + numAllocators + " allocators.");
    }
}
 
源代码18 项目: distributedlog   文件: ZKSessionLock.java
/**
 * Get client id and its ephemeral owner.
 *
 * @param zkClient
 *          zookeeper client
 * @param lockPath
 *          lock path
 * @param nodeName
 *          node name
 * @return client id and its ephemeral owner.
 */
static CompletableFuture<Pair<String, Long>> asyncParseClientID(
        ZooKeeper zkClient, String lockPath, String nodeName) {
    String[] parts = nodeName.split("_");
    // member_<clientid>_s<owner_session>_
    if (4 == parts.length && parts[2].startsWith("s")) {
        long sessionOwner = Long.parseLong(parts[2].substring(1));
        String clientId;
        try {
            clientId = URLDecoder.decode(parts[1], UTF_8.name());
            return FutureUtils.value(Pair.of(clientId, sessionOwner));
        } catch (UnsupportedEncodingException e) {
            // if failed to parse client id, we have to get client id by zookeeper#getData.
        }
    }
    final CompletableFuture<Pair<String, Long>> promise = new CompletableFuture<Pair<String, Long>>();
    zkClient.getData(lockPath + "/" + nodeName, false, new AsyncCallback.DataCallback() {
        @Override
        public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
            if (KeeperException.Code.OK.intValue() != rc) {
                promise.completeExceptionally(KeeperException.create(KeeperException.Code.get(rc)));
            } else {
                promise.complete(Pair.of(deserializeClientId(data), stat.getEphemeralOwner()));
            }
        }
    }, null);
    return promise;
}
 
源代码19 项目: distributedlog   文件: ZKSessionLock.java
private void deleteLockNode(final CompletableFuture<Void> promise) {
    if (null == currentNode) {
        promise.complete(null);
        return;
    }

    zk.delete(currentNode, -1, new AsyncCallback.VoidCallback() {
        @Override
        public void processResult(final int rc, final String path, Object ctx) {
            lockStateExecutor.submit(lockPath, new SafeRunnable() {
                @Override
                public void safeRun() {
                    if (KeeperException.Code.OK.intValue() == rc) {
                        LOG.info("Deleted lock node {} for {} successfully.", path, lockId);
                    } else if (KeeperException.Code.NONODE.intValue() == rc
                            || KeeperException.Code.SESSIONEXPIRED.intValue() == rc) {
                        LOG.info("Delete node failed. Node already gone for node {} id {}, rc = {}",
                                new Object[] { path, lockId, KeeperException.Code.get(rc) });
                    } else {
                        LOG.error("Failed on deleting lock node {} for {} : {}",
                                new Object[] { path, lockId, KeeperException.Code.get(rc) });
                    }

                    FailpointUtils.checkFailPointNoThrow(FailpointUtils.FailPointName.FP_LockUnlockCleanup);
                    promise.complete(null);
                }
            });
        }
    }, null);
}
 
源代码20 项目: distributedlog   文件: ZKSessionLock.java
/**
 * Check Lock Owner Phase 1 : Get all lock waiters.
 *
 * @param lockWatcher
 *          lock watcher.
 * @param wait
 *          whether to wait for ownership.
 * @param promise
 *          promise to satisfy with current lock owner
 */
private void checkLockOwnerAndWaitIfPossible(final LockWatcher lockWatcher,
                                             final boolean wait,
                                             final CompletableFuture<String> promise) {
    zk.getChildren(lockPath, false, new AsyncCallback.Children2Callback() {
        @Override
        public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
            processLockWaiters(lockWatcher, wait, rc, children, promise);
        }
    }, null);
}
 
源代码21 项目: distributedlog   文件: ZKSubscriptionStateStore.java
CompletableFuture<DLSN> getLastCommitPositionFromZK() {
    final CompletableFuture<DLSN> result = new CompletableFuture<DLSN>();
    try {
        logger.debug("Reading last commit position from path {}", zkPath);
        zooKeeperClient.get().getData(zkPath, false, new AsyncCallback.DataCallback() {
            @Override
            public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
                logger.debug("Read last commit position from path {}: rc = {}", zkPath, rc);
                if (KeeperException.Code.NONODE.intValue() == rc) {
                    result.complete(DLSN.NonInclusiveLowerBound);
                } else if (KeeperException.Code.OK.intValue() != rc) {
                    result.completeExceptionally(KeeperException.create(KeeperException.Code.get(rc), path));
                } else {
                    try {
                        DLSN dlsn = DLSN.deserialize(new String(data, Charsets.UTF_8));
                        result.complete(dlsn);
                    } catch (Exception t) {
                        logger.warn("Invalid last commit position found from path {}", zkPath, t);
                        // invalid dlsn recorded in subscription state store
                        result.complete(DLSN.NonInclusiveLowerBound);
                    }
                }
            }
        }, null);
    } catch (ZooKeeperClient.ZooKeeperConnectionException zkce) {
        result.completeExceptionally(zkce);
    } catch (InterruptedException ie) {
        result.completeExceptionally(new DLInterruptedException("getLastCommitPosition was interrupted", ie));
    }
    return result;
}
 
源代码22 项目: distributedlog   文件: ZKSubscriptionStateStore.java
Future<DLSN> getLastCommitPositionFromZK() {
    final Promise<DLSN> result = new Promise<DLSN>();
    try {
        logger.debug("Reading last commit position from path {}", zkPath);
        zooKeeperClient.get().getData(zkPath, false, new AsyncCallback.DataCallback() {
            @Override
            public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
                logger.debug("Read last commit position from path {}: rc = {}", zkPath, rc);
                if (KeeperException.Code.NONODE.intValue() == rc) {
                    result.setValue(DLSN.NonInclusiveLowerBound);
                } else if (KeeperException.Code.OK.intValue() != rc) {
                    result.setException(KeeperException.create(KeeperException.Code.get(rc), path));
                } else {
                    try {
                        DLSN dlsn = DLSN.deserialize(new String(data, Charsets.UTF_8));
                        result.setValue(dlsn);
                    } catch (Exception t) {
                        logger.warn("Invalid last commit position found from path {}", zkPath, t);
                        // invalid dlsn recorded in subscription state store
                        result.setValue(DLSN.NonInclusiveLowerBound);
                    }
                }
            }
        }, null);
    } catch (ZooKeeperClient.ZooKeeperConnectionException zkce) {
        result.setException(zkce);
    } catch (InterruptedException ie) {
        result.setException(new DLInterruptedException("getLastCommitPosition was interrupted", ie));
    }
    return result;
}
 
源代码23 项目: distributedlog   文件: ZKSessionLock.java
/**
 * Get client id and its ephemeral owner.
 *
 * @param zkClient
 *          zookeeper client
 * @param lockPath
 *          lock path
 * @param nodeName
 *          node name
 * @return client id and its ephemeral owner.
 */
static Future<Pair<String, Long>> asyncParseClientID(ZooKeeper zkClient, String lockPath, String nodeName) {
    String[] parts = nodeName.split("_");
    // member_<clientid>_s<owner_session>_
    if (4 == parts.length && parts[2].startsWith("s")) {
        long sessionOwner = Long.parseLong(parts[2].substring(1));
        String clientId;
        try {
            clientId = URLDecoder.decode(parts[1], UTF_8.name());
            return Future.value(Pair.of(clientId, sessionOwner));
        } catch (UnsupportedEncodingException e) {
            // if failed to parse client id, we have to get client id by zookeeper#getData.
        }
    }
    final Promise<Pair<String, Long>> promise = new Promise<Pair<String, Long>>();
    zkClient.getData(lockPath + "/" + nodeName, false, new AsyncCallback.DataCallback() {
        @Override
        public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
            if (KeeperException.Code.OK.intValue() != rc) {
                promise.setException(KeeperException.create(KeeperException.Code.get(rc)));
            } else {
                promise.setValue(Pair.of(deserializeClientId(data), stat.getEphemeralOwner()));
            }
        }
    }, null);
    return promise;
}
 
源代码24 项目: distributedlog   文件: ZKSessionLock.java
private void deleteLockNode(final Promise<BoxedUnit> promise) {
    if (null == currentNode) {
        promise.setValue(BoxedUnit.UNIT);
        return;
    }

    zk.delete(currentNode, -1, new AsyncCallback.VoidCallback() {
        @Override
        public void processResult(final int rc, final String path, Object ctx) {
            lockStateExecutor.submit(lockPath, new SafeRunnable() {
                @Override
                public void safeRun() {
                    if (KeeperException.Code.OK.intValue() == rc) {
                        LOG.info("Deleted lock node {} for {} successfully.", path, lockId);
                    } else if (KeeperException.Code.NONODE.intValue() == rc ||
                            KeeperException.Code.SESSIONEXPIRED.intValue() == rc) {
                        LOG.info("Delete node failed. Node already gone for node {} id {}, rc = {}",
                                new Object[] { path, lockId, KeeperException.Code.get(rc) });
                    } else {
                        LOG.error("Failed on deleting lock node {} for {} : {}",
                                new Object[] { path, lockId, KeeperException.Code.get(rc) });
                    }

                    FailpointUtils.checkFailPointNoThrow(FailpointUtils.FailPointName.FP_LockUnlockCleanup);
                    promise.setValue(BoxedUnit.UNIT);
                }
            });
        }
    }, null);
}
 
源代码25 项目: ignite   文件: ZookeeperClient.java
/**
 * @param path Path.
 * @param data Data.
 * @param createMode Create mode.
 * @param cb Callback.
 */
private void createAsync(String path, byte[] data, CreateMode createMode, AsyncCallback.StringCallback cb) {
    if (data == null)
        data = EMPTY_BYTES;

    CreateOperation op = new CreateOperation(path, data, createMode, cb);

    zk.create(path, data, ZK_ACL, createMode, new CreateCallbackWrapper(op), null);
}
 
源代码26 项目: ignite   文件: ZookeeperClient.java
/**
 * @param path path.
 * @param data Data.
 * @param createMode Create mode.
 * @param cb Callback.
 */
CreateOperation(String path, byte[] data, CreateMode createMode, AsyncCallback.StringCallback cb) {
    this.path = path;
    this.data = data;
    this.createMode = createMode;
    this.cb = cb;
}
 
源代码27 项目: curator   文件: RemoveWatchesBuilderImpl.java
@Override
public void performBackgroundOperation(final OperationAndData<String> operationAndData)
        throws Exception
{
    try
    {
        final TimeTrace   trace = client.getZookeeperClient().startTracer("RemoteWatches-Background");

        AsyncCallback.VoidCallback callback = new AsyncCallback.VoidCallback()
        {
            @Override
            public void processResult(int rc, String path, Object ctx)
            {
                trace.commit();
                CuratorEvent event = new CuratorEventImpl(client, CuratorEventType.REMOVE_WATCHES, rc, path, null, ctx, null, null, null, null, null, null);
                client.processBackgroundOperation(operationAndData, event);
            }
        };

        ZooKeeper zkClient = client.getZooKeeper();
        NamespaceWatcher namespaceWatcher = makeNamespaceWatcher(operationAndData.getData());
        if(namespaceWatcher == null)
        {
            zkClient.removeAllWatches(operationAndData.getData(), watcherType, local, callback, operationAndData.getContext());
        }
        else
        {
            zkClient.removeWatches(operationAndData.getData(), namespaceWatcher, watcherType, local, callback, operationAndData.getContext());
        }
    }
    catch ( Throwable e )
    {
        backgrounding.checkError(e, null);
    }
}
 
源代码28 项目: curator   文件: GetConfigBuilderImpl.java
@Override
public void performBackgroundOperation(final OperationAndData<Void> operationAndData) throws Exception
{
    try
    {
        final TimeTrace trace = client.getZookeeperClient().startTracer("GetDataBuilderImpl-Background");
        AsyncCallback.DataCallback callback = new AsyncCallback.DataCallback()
        {
            @Override
            public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat)
            {
                watching.commitWatcher(rc, false);
                trace.commit();
                CuratorEvent event = new CuratorEventImpl(client, CuratorEventType.GET_CONFIG, rc, path, null, ctx, stat, data, null, null, null, null);
                client.processBackgroundOperation(operationAndData, event);
            }
        };
        if ( watching.isWatched() )
        {
            client.getZooKeeper().getConfig(true, callback, backgrounding.getContext());
        }
        else
        {
            client.getZooKeeper().getConfig(watching.getWatcher(ZooDefs.CONFIG_NODE), callback, backgrounding.getContext());
        }
    }
    catch ( Throwable e )
    {
        backgrounding.checkError(e, watching);
    }
}
 
源代码29 项目: curator   文件: ExistsBuilderImpl.java
@Override
public void performBackgroundOperation(final OperationAndData<String> operationAndData) throws Exception
{
    try
    {
        final OperationTrace   trace = client.getZookeeperClient().startAdvancedTracer("ExistsBuilderImpl-Background");
        AsyncCallback.StatCallback callback = new AsyncCallback.StatCallback()
        {
            @Override
            public void processResult(int rc, String path, Object ctx, Stat stat)
            {
                watching.commitWatcher(rc, true);
                trace.setReturnCode(rc).setPath(path).setWithWatcher(watching.hasWatcher()).setStat(stat).commit();
                CuratorEvent event = new CuratorEventImpl(client, CuratorEventType.EXISTS, rc, path, null, ctx, stat, null, null, null, null, null);
                client.processBackgroundOperation(operationAndData, event);
            }
        };
        if ( watching.isWatched() )
        {
            client.getZooKeeper().exists(operationAndData.getData(), true, callback, backgrounding.getContext());
        }
        else
        {
            client.getZooKeeper().exists(operationAndData.getData(), watching.getWatcher(operationAndData.getData()), callback, backgrounding.getContext());
        }
    }
    catch ( Throwable e )
    {
        backgrounding.checkError(e, watching);
    }
}
 
源代码30 项目: curator   文件: ReconfigBuilderImpl.java
@Override
public void performBackgroundOperation(final OperationAndData<Void> data) throws Exception
{
    try
    {
        final TimeTrace trace = client.getZookeeperClient().startTracer("ReconfigBuilderImpl-Background");
        AsyncCallback.DataCallback callback = new AsyncCallback.DataCallback()
        {
            @Override
            public void processResult(int rc, String path, Object ctx, byte[] bytes, Stat stat)
            {
                trace.commit();
                if ( (responseStat != null) && (stat != null) )
                {
                    DataTree.copyStat(stat, responseStat);
                }
                CuratorEvent event = new CuratorEventImpl(client, CuratorEventType.RECONFIG, rc, path, null, ctx, stat, bytes, null, null, null, null);
                client.processBackgroundOperation(data, event);
            }
        };
        ((ZooKeeperAdmin)client.getZooKeeper()).reconfigure(joining, leaving, newMembers, fromConfig, callback, backgrounding.getContext());
    }
    catch ( Throwable e )
    {
        backgrounding.checkError(e, null);
    }
}
 
 类所在包
 同包方法