类org.apache.curator.RetryLoop源码实例Demo

下面列出了怎么用org.apache.curator.RetryLoop的API类实例代码及写法,或者点击链接到github查看源代码。

源代码1 项目: xian   文件: DistributedAtomicValue.java
private void tryOptimistic(MutableAtomicValue<byte[]> result, MakeValue makeValue) throws Exception
{
    long            startMs = System.currentTimeMillis();
    int             retryCount = 0;

    boolean         done = false;
    while ( !done )
    {
        result.stats.incrementOptimisticTries();
        if ( tryOnce(result, makeValue) )
        {
            result.succeeded = true;
            done = true;
        }
        else
        {
            if ( !retryPolicy.allowRetry(retryCount++, System.currentTimeMillis() - startMs, RetryLoop.getDefaultRetrySleeper()) )
            {
                done = true;
            }
        }
    }

    result.stats.setOptimisticTimeMs(System.currentTimeMillis() - startMs);
}
 
源代码2 项目: xian   文件: EnsurePath.java
@Override
public synchronized void ensure(final CuratorZookeeperClient client, final String path, final boolean makeLastNode) throws Exception
{
    if ( !isSet )
    {
        RetryLoop.callWithRetry
            (
                client,
                new Callable<Object>()
                {
                    @Override
                    public Object call() throws Exception
                    {
                        ZKPaths.mkdirs(client.getZooKeeper(), path, makeLastNode, aclProvider, asContainers());
                        helper.set(doNothingHelper);
                        isSet = true;
                        return null;
                    }
                }
            );
    }
}
 
源代码3 项目: xian   文件: CreateBuilderImpl.java
static <T> void backgroundCreateParentsThenNode(final CuratorFrameworkImpl client, final OperationAndData<T> mainOperationAndData, final String path, Backgrounding backgrounding, final boolean createParentsAsContainers)
{
    BackgroundOperation<T> operation = new BackgroundOperation<T>()
    {
        @Override
        public void performBackgroundOperation(OperationAndData<T> dummy) throws Exception
        {
            try
            {
                ZKPaths.mkdirs(client.getZooKeeper(), path, false, client.getAclProvider(), createParentsAsContainers);
            }
            catch ( KeeperException e )
            {
                if ( !RetryLoop.isRetryException(e) )
                {
                    throw e;
                }
                // otherwise safe to ignore as it will get retried
            }
            client.queueOperation(mainOperationAndData);
        }
    };
    OperationAndData<T> parentOperation = new OperationAndData<T>(operation, mainOperationAndData.getData(), null, null, backgrounding.getContext());
    client.queueOperation(parentOperation);
}
 
源代码4 项目: xian   文件: TempGetDataBuilderImpl.java
@Override
public byte[] forPath(String path) throws Exception
{
    final String    localPath = client.fixForNamespace(path);

    OperationTrace       trace = client.getZookeeperClient().startAdvancedTracer("GetDataBuilderImpl-Foreground");
    byte[]          responseData = RetryLoop.callWithRetry
    (
        client.getZookeeperClient(),
        new Callable<byte[]>()
        {
            @Override
            public byte[] call() throws Exception
            {
                return client.getZooKeeper().getData(localPath, false, responseStat);
            }
        }
    );
    trace.setResponseBytesLength(responseData).setPath(path).setStat(responseStat).commit();

    return decompress ? client.getCompressionProvider().decompress(path, responseData) : responseData;
}
 
源代码5 项目: xian   文件: SetDataBuilderImpl.java
private Stat pathInForeground(final String path, final byte[] data) throws Exception
{
    OperationTrace   trace = client.getZookeeperClient().startAdvancedTracer("SetDataBuilderImpl-Foreground");
    Stat        resultStat = RetryLoop.callWithRetry
    (
        client.getZookeeperClient(),
        new Callable<Stat>()
        {
            @Override
            public Stat call() throws Exception
            {
                return client.getZooKeeper().setData(path, data, version);
            }
        }
    );
    trace.setRequestBytesLength(data).setPath(path).setStat(resultStat).commit();
    return resultStat;
}
 
源代码6 项目: xian   文件: SetACLBuilderImpl.java
private Stat pathInForeground(final String path) throws Exception
{
    OperationTrace   trace = client.getZookeeperClient().startAdvancedTracer("SetACLBuilderImpl-Foreground");
    Stat        resultStat = RetryLoop.callWithRetry
    (
        client.getZookeeperClient(),
        new Callable<Stat>()
        {
            @Override
            public Stat call() throws Exception
            {
                return client.getZooKeeper().setACL(path, acling.getAclList(path), version);
            }
        }
    );
    trace.setPath(path).setStat(resultStat).commit();
    return resultStat;
}
 
源代码7 项目: xian   文件: GetACLBuilderImpl.java
private List<ACL> pathInForeground(final String path) throws Exception
{
    OperationTrace    trace = client.getZookeeperClient().startAdvancedTracer("GetACLBuilderImpl-Foreground");
    List<ACL>    result = RetryLoop.callWithRetry
    (
        client.getZookeeperClient(),
        new Callable<List<ACL>>()
        {
            @Override
            public List<ACL> call() throws Exception
            {
                return client.getZooKeeper().getACL(path, responseStat);
            }
        }
    );
    trace.setPath(path).setStat(responseStat).commit();
    return result;
}
 
源代码8 项目: exhibitor   文件: S3BackupProvider.java
private PartETag uploadChunkWithRetry(byte[] buffer, int bytesRead, InitiateMultipartUploadResult initResponse, int index, RetryPolicy retryPolicy) throws Exception
{
    long            startMs = System.currentTimeMillis();
    int             retries = 0;
    for(;;)
    {
        try
        {
            return uploadChunk(buffer, bytesRead, initResponse, index);
        }
        catch ( Exception e )
        {
            if ( !retryPolicy.allowRetry(retries++, System.currentTimeMillis() - startMs, RetryLoop.getDefaultRetrySleeper()) )
            {
                throw e;
            }
        }
    }
}
 
源代码9 项目: curator   文件: CuratorMultiTransactionImpl.java
private List<CuratorTransactionResult> forOperationsInForeground(final CuratorMultiTransactionRecord record) throws Exception
{
    TimeTrace trace = client.getZookeeperClient().startTracer("CuratorMultiTransactionImpl-Foreground");
    List<OpResult> responseData = RetryLoop.callWithRetry
    (
        client.getZookeeperClient(),
        new Callable<List<OpResult>>()
        {
            @Override
            public List<OpResult> call() throws Exception
            {
                return client.getZooKeeper().multi(record);
            }
        }
    );
    trace.commit();

    return CuratorTransactionImpl.wrapResults(client, responseData, record);
}
 
源代码10 项目: curator   文件: TempGetDataBuilderImpl.java
@Override
public byte[] forPath(String path) throws Exception
{
    final String    localPath = client.fixForNamespace(path);

    OperationTrace       trace = client.getZookeeperClient().startAdvancedTracer("GetDataBuilderImpl-Foreground");
    byte[]          responseData = RetryLoop.callWithRetry
    (
        client.getZookeeperClient(),
        new Callable<byte[]>()
        {
            @Override
            public byte[] call() throws Exception
            {
                return client.getZooKeeper().getData(localPath, false, responseStat);
            }
        }
    );
    trace.setResponseBytesLength(responseData).setPath(path).setStat(responseStat).commit();

    return decompress ? client.getCompressionProvider().decompress(path, responseData) : responseData;
}
 
源代码11 项目: curator   文件: SetDataBuilderImpl.java
private Stat pathInForeground(final String path, final byte[] data) throws Exception
{
    OperationTrace   trace = client.getZookeeperClient().startAdvancedTracer("SetDataBuilderImpl-Foreground");
    Stat        resultStat = RetryLoop.callWithRetry
    (
        client.getZookeeperClient(),
        new Callable<Stat>()
        {
            @Override
            public Stat call() throws Exception
            {
                return client.getZooKeeper().setData(path, data, version);
            }
        }
    );
    trace.setRequestBytesLength(data).setPath(path).setStat(resultStat).commit();
    return resultStat;
}
 
源代码12 项目: curator   文件: ReconfigBuilderImpl.java
private byte[] ensembleInForeground() throws Exception
{
    TimeTrace trace = client.getZookeeperClient().startTracer("ReconfigBuilderImpl-Foreground");
    byte[] responseData = RetryLoop.callWithRetry
        (
            client.getZookeeperClient(),
            new Callable<byte[]>()
            {
                @Override
                public byte[] call() throws Exception
                {
                    return ((ZooKeeperAdmin)client.getZooKeeper()).reconfigure(joining, leaving, newMembers, fromConfig, responseStat);
                }
            }
        );
    trace.commit();
    return responseData;
}
 
源代码13 项目: curator   文件: CuratorTransactionImpl.java
@Override
public Collection<CuratorTransactionResult> commit() throws Exception
{
    Preconditions.checkState(!isCommitted, "transaction already committed");
    isCommitted = true;

    List<OpResult> resultList = RetryLoop.callWithRetry
        (
            client.getZookeeperClient(),
            new Callable<List<OpResult>>()
            {
                @Override
                public List<OpResult> call() throws Exception
                {
                    return doOperation();
                }
            }
        );

    if ( resultList.size() != transaction.metadataSize() )
    {
        throw new IllegalStateException(String.format("Result size (%d) doesn't match input size (%d)", resultList.size(), transaction.metadataSize()));
    }

    return wrapResults(client, resultList, transaction);
}
 
源代码14 项目: curator   文件: SetACLBuilderImpl.java
private Stat pathInForeground(final String path, final List<ACL> aclList) throws Exception
{
    OperationTrace   trace = client.getZookeeperClient().startAdvancedTracer("SetACLBuilderImpl-Foreground");
    Stat        resultStat = RetryLoop.callWithRetry
    (
        client.getZookeeperClient(),
        new Callable<Stat>()
        {
            @Override
            public Stat call() throws Exception
            {
                return client.getZooKeeper().setACL(path, aclList, version);
            }
        }
    );
    trace.setPath(path).setStat(resultStat).commit();
    return resultStat;
}
 
源代码15 项目: curator   文件: AddWatchBuilderImpl.java
private void pathInForeground(final String path) throws Exception
{
    final String fixedPath = client.fixForNamespace(path);
    OperationTrace trace = client.getZookeeperClient().startAdvancedTracer("AddWatchBuilderImpl-Foreground");
    RetryLoop.callWithRetry
    (
        client.getZookeeperClient(), () -> {
            if ( watching.isWatched() )
            {
                client.getZooKeeper().addWatch(fixedPath, mode);
            }
            else
            {
                client.getZooKeeper().addWatch(fixedPath, watching.getWatcher(path), mode);
            }
            return null;
        });
    trace.setPath(fixedPath).setWithWatcher(true).commit();
}
 
源代码16 项目: curator   文件: GetACLBuilderImpl.java
private List<ACL> pathInForeground(final String path) throws Exception
{
    OperationTrace    trace = client.getZookeeperClient().startAdvancedTracer("GetACLBuilderImpl-Foreground");
    List<ACL>    result = RetryLoop.callWithRetry
    (
        client.getZookeeperClient(),
        new Callable<List<ACL>>()
        {
            @Override
            public List<ACL> call() throws Exception
            {
                return client.getZooKeeper().getACL(path, responseStat);
            }
        }
    );
    trace.setPath(path).setStat(responseStat).commit();
    return result;
}
 
源代码17 项目: curator   文件: DistributedAtomicValue.java
private void tryOptimistic(MutableAtomicValue<byte[]> result, MakeValue makeValue) throws Exception
{
    long            startMs = System.currentTimeMillis();
    int             retryCount = 0;

    boolean         done = false;
    while ( !done )
    {
        result.stats.incrementOptimisticTries();
        if ( tryOnce(result, makeValue) )
        {
            result.succeeded = true;
            done = true;
        }
        else
        {
            if ( !retryPolicy.allowRetry(retryCount++, System.currentTimeMillis() - startMs, RetryLoop.getDefaultRetrySleeper()) )
            {
                done = true;
            }
        }
    }

    result.stats.setOptimisticTimeMs(System.currentTimeMillis() - startMs);
}
 
源代码18 项目: curator   文件: TestThreadLocalRetryLoop.java
private Void doOperation(CuratorFramework client) throws Exception
{
    try
    {
        RetryLoop.callWithRetry(client.getZookeeperClient(), () -> {
            client.checkExists().forPath("/hey");
            return null;
        });
        Assert.fail("Should have thrown an exception");
    }
    catch ( KeeperException dummy )
    {
        // correct
    }
    return null;
}
 
源代码19 项目: curator   文件: EnsurePath.java
@Override
public synchronized void ensure(final CuratorZookeeperClient client, final String path, final boolean makeLastNode) throws Exception
{
    if ( !isSet )
    {
        RetryLoop.callWithRetry
            (
                client,
                new Callable<Object>()
                {
                    @Override
                    public Object call() throws Exception
                    {
                        ZKPaths.mkdirs(client.getZooKeeper(), path, makeLastNode, aclProvider, asContainers());
                        helper.set(doNothingHelper);
                        isSet = true;
                        return null;
                    }
                }
            );
    }
}
 
源代码20 项目: xian   文件: DistributedAtomicValue.java
private void tryWithMutex(MutableAtomicValue<byte[]> result, MakeValue makeValue) throws Exception
 {
     long            startMs = System.currentTimeMillis();
     int             retryCount = 0;

     if ( mutex.acquire(promotedToLock.getMaxLockTime(), promotedToLock.getMaxLockTimeUnit()) )
     {
         try
         {
             boolean         done = false;
             while ( !done )
             {
                 result.stats.incrementPromotedTries();
                 if ( tryOnce(result, makeValue) )
                 {
                     result.succeeded = true;
                     done = true;
                 }
                 else
                 {
                     if ( !promotedToLock.getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMs, RetryLoop.getDefaultRetrySleeper()) )
                     {
                         done = true;
                     }
                 }
             }
         }
         finally
         {
             mutex.release();
         }
     }

     result.stats.setPromotedTimeMs(System.currentTimeMillis() - startMs);
}
 
源代码21 项目: xian   文件: CreateBuilderImpl.java
private String findProtectedNodeInForeground(final String path) throws Exception
{
    OperationTrace trace = client.getZookeeperClient().startAdvancedTracer("CreateBuilderImpl-findProtectedNodeInForeground");

    String returnPath = RetryLoop.callWithRetry
        (
            client.getZookeeperClient(),
            new Callable<String>()
            {
                @Override
                public String call() throws Exception
                {
                    String foundNode = null;
                    try
                    {
                        final ZKPaths.PathAndNode pathAndNode = ZKPaths.getPathAndNode(path);
                        List<String> children = client.getZooKeeper().getChildren(pathAndNode.getPath(), false);

                        foundNode = findNode(children, pathAndNode.getPath(), protectedId);
                    }
                    catch ( KeeperException.NoNodeException ignore )
                    {
                        // ignore
                    }
                    return foundNode;
                }
            }
        );

    trace.setPath(path).commit();
    return returnPath;
}
 
源代码22 项目: xian   文件: CuratorFrameworkImpl.java
<DATA_TYPE> void processBackgroundOperation(OperationAndData<DATA_TYPE> operationAndData, CuratorEvent event)
{
    boolean isInitialExecution = (event == null);
    if ( isInitialExecution )
    {
        performBackgroundOperation(operationAndData);
        return;
    }

    boolean doQueueOperation = false;
    do
    {
        if ( RetryLoop.shouldRetry(event.getResultCode()) )
        {
            doQueueOperation = checkBackgroundRetry(operationAndData, event);
            break;
        }

        if ( operationAndData.getCallback() != null )
        {
            sendToBackgroundCallback(operationAndData, event);
            break;
        }

        processEvent(event);
    }
    while ( false );

    if ( doQueueOperation )
    {
        queueOperation(operationAndData);
    }
}
 
源代码23 项目: xian   文件: CuratorFrameworkImpl.java
private <DATA_TYPE> void handleBackgroundOperationException(OperationAndData<DATA_TYPE> operationAndData, Throwable e)
{
    do
    {
        if ( (operationAndData != null) && RetryLoop.isRetryException(e) )
        {
            if ( !Boolean.getBoolean(DebugUtils.PROPERTY_DONT_LOG_CONNECTION_ISSUES) )
            {
                log.debug("Retry-able exception received", e);
            }
            if ( client.getRetryPolicy().allowRetry(operationAndData.getThenIncrementRetryCount(), operationAndData.getElapsedTimeMs(), operationAndData) )
            {
                if ( !Boolean.getBoolean(DebugUtils.PROPERTY_DONT_LOG_CONNECTION_ISSUES) )
                {
                    log.debug("Retrying operation");
                }
                backgroundOperations.offer(operationAndData);
                break;
            }
            else
            {
                if ( !Boolean.getBoolean(DebugUtils.PROPERTY_DONT_LOG_CONNECTION_ISSUES) )
                {
                    log.debug("Retry policy did not allow retry");
                }
                if ( operationAndData.getErrorCallback() != null )
                {
                    operationAndData.getErrorCallback().retriesExhausted(operationAndData);
                }
            }
        }

        logError("Background exception was not retry-able or retry gave up", e);
    }
    while ( false );
}
 
源代码24 项目: xian   文件: NamespaceImpl.java
String    fixForNamespace(String path, boolean isSequential)
{
    if ( ensurePathNeeded.get() )
    {
        try
        {
            final CuratorZookeeperClient zookeeperClient = client.getZookeeperClient();
            RetryLoop.callWithRetry
            (
                zookeeperClient,
                new Callable<Object>()
                {
                    @Override
                    public Object call() throws Exception
                    {
                        ZKPaths.mkdirs(zookeeperClient.getZooKeeper(), ZKPaths.makePath("/", namespace), true, client.getAclProvider(), true);
                        return null;
                    }
                }
            );
            ensurePathNeeded.set(false);
        }
        catch ( Exception e )
        {
            ThreadUtils.checkInterrupted(e);
            client.logError("Ensure path threw exception", e);
        }
    }

    return ZKPaths.fixForNamespace(namespace, path, isSequential);
}
 
源代码25 项目: xian   文件: CuratorTransactionImpl.java
@Override
public Collection<CuratorTransactionResult> commit() throws Exception
{
    Preconditions.checkState(!isCommitted, "transaction already committed");
    isCommitted = true;

    final AtomicBoolean firstTime = new AtomicBoolean(true);
    List<OpResult>      resultList = RetryLoop.callWithRetry
    (
        client.getZookeeperClient(),
        new Callable<List<OpResult>>()
        {
            @Override
            public List<OpResult> call() throws Exception
            {
                return doOperation(firstTime);
            }
        }
    );
    
    if ( resultList.size() != transaction.metadataSize() )
    {
        throw new IllegalStateException(String.format("Result size (%d) doesn't match input size (%d)", resultList.size(), transaction.metadataSize()));
    }

    ImmutableList.Builder<CuratorTransactionResult>     builder = ImmutableList.builder();
    for ( int i = 0; i < resultList.size(); ++i )
    {
        OpResult                                    opResult = resultList.get(i);
        CuratorMultiTransactionRecord.TypeAndPath   metadata = transaction.getMetadata(i);
        CuratorTransactionResult                    curatorResult = makeCuratorResult(opResult, metadata);
        builder.add(curatorResult);
    }

    return builder.build();
}
 
源代码26 项目: curator   文件: CreateBuilderImpl.java
private String findProtectedNodeInForeground(final String path) throws Exception
{
    OperationTrace trace = client.getZookeeperClient().startAdvancedTracer("CreateBuilderImpl-findProtectedNodeInForeground");

    String returnPath = RetryLoop.callWithRetry
        (
            client.getZookeeperClient(),
            new Callable<String>()
            {
                @Override
                public String call() throws Exception
                {
                    String foundNode = null;
                    try
                    {
                        final ZKPaths.PathAndNode pathAndNode = ZKPaths.getPathAndNode(path);
                        List<String> children = client.getZooKeeper().getChildren(pathAndNode.getPath(), false);

                        foundNode = findNode(children, pathAndNode.getPath(), protectedMode.protectedId());
                        log.debug("Protected mode findNode result: {}", foundNode);

                        foundNode = protectedMode.validateFoundNode(client, createMode, foundNode);
                    }
                    catch ( KeeperException.NoNodeException ignore )
                    {
                        // ignore
                    }
                    return foundNode;
                }
            }
        );

    trace.setPath(path).commit();
    return returnPath;
}
 
源代码27 项目: curator   文件: GetConfigBuilderImpl.java
private byte[] configInForeground() throws Exception
{
    TimeTrace trace = client.getZookeeperClient().startTracer("GetConfigBuilderImpl-Foreground");
    try
    {
        return RetryLoop.callWithRetry
        (
            client.getZookeeperClient(),
            new Callable<byte[]>()
            {
                @Override
                public byte[] call() throws Exception
                {
                    if ( watching.isWatched() )
                    {
                        return client.getZooKeeper().getConfig(true, stat);
                    }
                    byte[] config = client.getZooKeeper().getConfig(watching.getWatcher(ZooDefs.CONFIG_NODE), stat);
                    watching.commitWatcher(KeeperException.NoNodeException.Code.OK.intValue(), false);
                    return config;
                }
            }
        );
    }
    finally
    {
        trace.commit();
    }
}
 
源代码28 项目: curator   文件: NamespaceImpl.java
String    fixForNamespace(String path, boolean isSequential)
{
    if ( ensurePathNeeded.get() )
    {
        try
        {
            final CuratorZookeeperClient zookeeperClient = client.getZookeeperClient();
            RetryLoop.callWithRetry
            (
                zookeeperClient,
                new Callable<Object>()
                {
                    @Override
                    public Object call() throws Exception
                    {
                        ZKPaths.mkdirs(zookeeperClient.getZooKeeper(), ZKPaths.makePath("/", namespace), true, client.getAclProvider(), true);
                        return null;
                    }
                }
            );
            ensurePathNeeded.set(false);
        }
        catch ( Exception e )
        {
            ThreadUtils.checkInterrupted(e);
            client.logError("Ensure path threw exception", e);
        }
    }

    return ZKPaths.fixForNamespace(namespace, path, isSequential);
}
 
源代码29 项目: curator   文件: DistributedAtomicValue.java
private void tryWithMutex(MutableAtomicValue<byte[]> result, MakeValue makeValue) throws Exception
 {
     long            startMs = System.currentTimeMillis();
     int             retryCount = 0;

     if ( mutex.acquire(promotedToLock.getMaxLockTime(), promotedToLock.getMaxLockTimeUnit()) )
     {
         try
         {
             boolean         done = false;
             while ( !done )
             {
                 result.stats.incrementPromotedTries();
                 if ( tryOnce(result, makeValue) )
                 {
                     result.succeeded = true;
                     done = true;
                 }
                 else
                 {
                     if ( !promotedToLock.getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMs, RetryLoop.getDefaultRetrySleeper()) )
                     {
                         done = true;
                     }
                 }
             }
         }
         finally
         {
             mutex.release();
         }
     }

     result.stats.setPromotedTimeMs(System.currentTimeMillis() - startMs);
}
 
源代码30 项目: curator   文件: ThreadLocalRetryLoop.java
/**
 * Call to get the current retry loop. If there isn't one, one is allocated
 * via {@code newRetryLoopSupplier}.
 *
 * @param newRetryLoopSupplier supply a new retry loop when needed. Normally you should use {@link org.apache.curator.CuratorZookeeperClient#newRetryLoop()}
 * @return retry loop to use
 */
public RetryLoop getRetryLoop(Supplier<RetryLoop> newRetryLoopSupplier)
{
    Entry entry = threadLocal.get();
    if ( entry == null )
    {
        entry = new Entry(new WrappedRetryLoop(newRetryLoopSupplier.get()));
        threadLocal.set(entry);
    }
    ++entry.counter;
    return entry.retryLoop;
}