下面列出了怎么用org.apache.curator.RetryLoop的API类实例代码及写法,或者点击链接到github查看源代码。
private void tryOptimistic(MutableAtomicValue<byte[]> result, MakeValue makeValue) throws Exception
{
long startMs = System.currentTimeMillis();
int retryCount = 0;
boolean done = false;
while ( !done )
{
result.stats.incrementOptimisticTries();
if ( tryOnce(result, makeValue) )
{
result.succeeded = true;
done = true;
}
else
{
if ( !retryPolicy.allowRetry(retryCount++, System.currentTimeMillis() - startMs, RetryLoop.getDefaultRetrySleeper()) )
{
done = true;
}
}
}
result.stats.setOptimisticTimeMs(System.currentTimeMillis() - startMs);
}
@Override
public synchronized void ensure(final CuratorZookeeperClient client, final String path, final boolean makeLastNode) throws Exception
{
if ( !isSet )
{
RetryLoop.callWithRetry
(
client,
new Callable<Object>()
{
@Override
public Object call() throws Exception
{
ZKPaths.mkdirs(client.getZooKeeper(), path, makeLastNode, aclProvider, asContainers());
helper.set(doNothingHelper);
isSet = true;
return null;
}
}
);
}
}
static <T> void backgroundCreateParentsThenNode(final CuratorFrameworkImpl client, final OperationAndData<T> mainOperationAndData, final String path, Backgrounding backgrounding, final boolean createParentsAsContainers)
{
BackgroundOperation<T> operation = new BackgroundOperation<T>()
{
@Override
public void performBackgroundOperation(OperationAndData<T> dummy) throws Exception
{
try
{
ZKPaths.mkdirs(client.getZooKeeper(), path, false, client.getAclProvider(), createParentsAsContainers);
}
catch ( KeeperException e )
{
if ( !RetryLoop.isRetryException(e) )
{
throw e;
}
// otherwise safe to ignore as it will get retried
}
client.queueOperation(mainOperationAndData);
}
};
OperationAndData<T> parentOperation = new OperationAndData<T>(operation, mainOperationAndData.getData(), null, null, backgrounding.getContext());
client.queueOperation(parentOperation);
}
@Override
public byte[] forPath(String path) throws Exception
{
final String localPath = client.fixForNamespace(path);
OperationTrace trace = client.getZookeeperClient().startAdvancedTracer("GetDataBuilderImpl-Foreground");
byte[] responseData = RetryLoop.callWithRetry
(
client.getZookeeperClient(),
new Callable<byte[]>()
{
@Override
public byte[] call() throws Exception
{
return client.getZooKeeper().getData(localPath, false, responseStat);
}
}
);
trace.setResponseBytesLength(responseData).setPath(path).setStat(responseStat).commit();
return decompress ? client.getCompressionProvider().decompress(path, responseData) : responseData;
}
private Stat pathInForeground(final String path, final byte[] data) throws Exception
{
OperationTrace trace = client.getZookeeperClient().startAdvancedTracer("SetDataBuilderImpl-Foreground");
Stat resultStat = RetryLoop.callWithRetry
(
client.getZookeeperClient(),
new Callable<Stat>()
{
@Override
public Stat call() throws Exception
{
return client.getZooKeeper().setData(path, data, version);
}
}
);
trace.setRequestBytesLength(data).setPath(path).setStat(resultStat).commit();
return resultStat;
}
private Stat pathInForeground(final String path) throws Exception
{
OperationTrace trace = client.getZookeeperClient().startAdvancedTracer("SetACLBuilderImpl-Foreground");
Stat resultStat = RetryLoop.callWithRetry
(
client.getZookeeperClient(),
new Callable<Stat>()
{
@Override
public Stat call() throws Exception
{
return client.getZooKeeper().setACL(path, acling.getAclList(path), version);
}
}
);
trace.setPath(path).setStat(resultStat).commit();
return resultStat;
}
private List<ACL> pathInForeground(final String path) throws Exception
{
OperationTrace trace = client.getZookeeperClient().startAdvancedTracer("GetACLBuilderImpl-Foreground");
List<ACL> result = RetryLoop.callWithRetry
(
client.getZookeeperClient(),
new Callable<List<ACL>>()
{
@Override
public List<ACL> call() throws Exception
{
return client.getZooKeeper().getACL(path, responseStat);
}
}
);
trace.setPath(path).setStat(responseStat).commit();
return result;
}
private PartETag uploadChunkWithRetry(byte[] buffer, int bytesRead, InitiateMultipartUploadResult initResponse, int index, RetryPolicy retryPolicy) throws Exception
{
long startMs = System.currentTimeMillis();
int retries = 0;
for(;;)
{
try
{
return uploadChunk(buffer, bytesRead, initResponse, index);
}
catch ( Exception e )
{
if ( !retryPolicy.allowRetry(retries++, System.currentTimeMillis() - startMs, RetryLoop.getDefaultRetrySleeper()) )
{
throw e;
}
}
}
}
private List<CuratorTransactionResult> forOperationsInForeground(final CuratorMultiTransactionRecord record) throws Exception
{
TimeTrace trace = client.getZookeeperClient().startTracer("CuratorMultiTransactionImpl-Foreground");
List<OpResult> responseData = RetryLoop.callWithRetry
(
client.getZookeeperClient(),
new Callable<List<OpResult>>()
{
@Override
public List<OpResult> call() throws Exception
{
return client.getZooKeeper().multi(record);
}
}
);
trace.commit();
return CuratorTransactionImpl.wrapResults(client, responseData, record);
}
@Override
public byte[] forPath(String path) throws Exception
{
final String localPath = client.fixForNamespace(path);
OperationTrace trace = client.getZookeeperClient().startAdvancedTracer("GetDataBuilderImpl-Foreground");
byte[] responseData = RetryLoop.callWithRetry
(
client.getZookeeperClient(),
new Callable<byte[]>()
{
@Override
public byte[] call() throws Exception
{
return client.getZooKeeper().getData(localPath, false, responseStat);
}
}
);
trace.setResponseBytesLength(responseData).setPath(path).setStat(responseStat).commit();
return decompress ? client.getCompressionProvider().decompress(path, responseData) : responseData;
}
private Stat pathInForeground(final String path, final byte[] data) throws Exception
{
OperationTrace trace = client.getZookeeperClient().startAdvancedTracer("SetDataBuilderImpl-Foreground");
Stat resultStat = RetryLoop.callWithRetry
(
client.getZookeeperClient(),
new Callable<Stat>()
{
@Override
public Stat call() throws Exception
{
return client.getZooKeeper().setData(path, data, version);
}
}
);
trace.setRequestBytesLength(data).setPath(path).setStat(resultStat).commit();
return resultStat;
}
private byte[] ensembleInForeground() throws Exception
{
TimeTrace trace = client.getZookeeperClient().startTracer("ReconfigBuilderImpl-Foreground");
byte[] responseData = RetryLoop.callWithRetry
(
client.getZookeeperClient(),
new Callable<byte[]>()
{
@Override
public byte[] call() throws Exception
{
return ((ZooKeeperAdmin)client.getZooKeeper()).reconfigure(joining, leaving, newMembers, fromConfig, responseStat);
}
}
);
trace.commit();
return responseData;
}
@Override
public Collection<CuratorTransactionResult> commit() throws Exception
{
Preconditions.checkState(!isCommitted, "transaction already committed");
isCommitted = true;
List<OpResult> resultList = RetryLoop.callWithRetry
(
client.getZookeeperClient(),
new Callable<List<OpResult>>()
{
@Override
public List<OpResult> call() throws Exception
{
return doOperation();
}
}
);
if ( resultList.size() != transaction.metadataSize() )
{
throw new IllegalStateException(String.format("Result size (%d) doesn't match input size (%d)", resultList.size(), transaction.metadataSize()));
}
return wrapResults(client, resultList, transaction);
}
private Stat pathInForeground(final String path, final List<ACL> aclList) throws Exception
{
OperationTrace trace = client.getZookeeperClient().startAdvancedTracer("SetACLBuilderImpl-Foreground");
Stat resultStat = RetryLoop.callWithRetry
(
client.getZookeeperClient(),
new Callable<Stat>()
{
@Override
public Stat call() throws Exception
{
return client.getZooKeeper().setACL(path, aclList, version);
}
}
);
trace.setPath(path).setStat(resultStat).commit();
return resultStat;
}
private void pathInForeground(final String path) throws Exception
{
final String fixedPath = client.fixForNamespace(path);
OperationTrace trace = client.getZookeeperClient().startAdvancedTracer("AddWatchBuilderImpl-Foreground");
RetryLoop.callWithRetry
(
client.getZookeeperClient(), () -> {
if ( watching.isWatched() )
{
client.getZooKeeper().addWatch(fixedPath, mode);
}
else
{
client.getZooKeeper().addWatch(fixedPath, watching.getWatcher(path), mode);
}
return null;
});
trace.setPath(fixedPath).setWithWatcher(true).commit();
}
private List<ACL> pathInForeground(final String path) throws Exception
{
OperationTrace trace = client.getZookeeperClient().startAdvancedTracer("GetACLBuilderImpl-Foreground");
List<ACL> result = RetryLoop.callWithRetry
(
client.getZookeeperClient(),
new Callable<List<ACL>>()
{
@Override
public List<ACL> call() throws Exception
{
return client.getZooKeeper().getACL(path, responseStat);
}
}
);
trace.setPath(path).setStat(responseStat).commit();
return result;
}
private void tryOptimistic(MutableAtomicValue<byte[]> result, MakeValue makeValue) throws Exception
{
long startMs = System.currentTimeMillis();
int retryCount = 0;
boolean done = false;
while ( !done )
{
result.stats.incrementOptimisticTries();
if ( tryOnce(result, makeValue) )
{
result.succeeded = true;
done = true;
}
else
{
if ( !retryPolicy.allowRetry(retryCount++, System.currentTimeMillis() - startMs, RetryLoop.getDefaultRetrySleeper()) )
{
done = true;
}
}
}
result.stats.setOptimisticTimeMs(System.currentTimeMillis() - startMs);
}
private Void doOperation(CuratorFramework client) throws Exception
{
try
{
RetryLoop.callWithRetry(client.getZookeeperClient(), () -> {
client.checkExists().forPath("/hey");
return null;
});
Assert.fail("Should have thrown an exception");
}
catch ( KeeperException dummy )
{
// correct
}
return null;
}
@Override
public synchronized void ensure(final CuratorZookeeperClient client, final String path, final boolean makeLastNode) throws Exception
{
if ( !isSet )
{
RetryLoop.callWithRetry
(
client,
new Callable<Object>()
{
@Override
public Object call() throws Exception
{
ZKPaths.mkdirs(client.getZooKeeper(), path, makeLastNode, aclProvider, asContainers());
helper.set(doNothingHelper);
isSet = true;
return null;
}
}
);
}
}
private void tryWithMutex(MutableAtomicValue<byte[]> result, MakeValue makeValue) throws Exception
{
long startMs = System.currentTimeMillis();
int retryCount = 0;
if ( mutex.acquire(promotedToLock.getMaxLockTime(), promotedToLock.getMaxLockTimeUnit()) )
{
try
{
boolean done = false;
while ( !done )
{
result.stats.incrementPromotedTries();
if ( tryOnce(result, makeValue) )
{
result.succeeded = true;
done = true;
}
else
{
if ( !promotedToLock.getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMs, RetryLoop.getDefaultRetrySleeper()) )
{
done = true;
}
}
}
}
finally
{
mutex.release();
}
}
result.stats.setPromotedTimeMs(System.currentTimeMillis() - startMs);
}
private String findProtectedNodeInForeground(final String path) throws Exception
{
OperationTrace trace = client.getZookeeperClient().startAdvancedTracer("CreateBuilderImpl-findProtectedNodeInForeground");
String returnPath = RetryLoop.callWithRetry
(
client.getZookeeperClient(),
new Callable<String>()
{
@Override
public String call() throws Exception
{
String foundNode = null;
try
{
final ZKPaths.PathAndNode pathAndNode = ZKPaths.getPathAndNode(path);
List<String> children = client.getZooKeeper().getChildren(pathAndNode.getPath(), false);
foundNode = findNode(children, pathAndNode.getPath(), protectedId);
}
catch ( KeeperException.NoNodeException ignore )
{
// ignore
}
return foundNode;
}
}
);
trace.setPath(path).commit();
return returnPath;
}
<DATA_TYPE> void processBackgroundOperation(OperationAndData<DATA_TYPE> operationAndData, CuratorEvent event)
{
boolean isInitialExecution = (event == null);
if ( isInitialExecution )
{
performBackgroundOperation(operationAndData);
return;
}
boolean doQueueOperation = false;
do
{
if ( RetryLoop.shouldRetry(event.getResultCode()) )
{
doQueueOperation = checkBackgroundRetry(operationAndData, event);
break;
}
if ( operationAndData.getCallback() != null )
{
sendToBackgroundCallback(operationAndData, event);
break;
}
processEvent(event);
}
while ( false );
if ( doQueueOperation )
{
queueOperation(operationAndData);
}
}
private <DATA_TYPE> void handleBackgroundOperationException(OperationAndData<DATA_TYPE> operationAndData, Throwable e)
{
do
{
if ( (operationAndData != null) && RetryLoop.isRetryException(e) )
{
if ( !Boolean.getBoolean(DebugUtils.PROPERTY_DONT_LOG_CONNECTION_ISSUES) )
{
log.debug("Retry-able exception received", e);
}
if ( client.getRetryPolicy().allowRetry(operationAndData.getThenIncrementRetryCount(), operationAndData.getElapsedTimeMs(), operationAndData) )
{
if ( !Boolean.getBoolean(DebugUtils.PROPERTY_DONT_LOG_CONNECTION_ISSUES) )
{
log.debug("Retrying operation");
}
backgroundOperations.offer(operationAndData);
break;
}
else
{
if ( !Boolean.getBoolean(DebugUtils.PROPERTY_DONT_LOG_CONNECTION_ISSUES) )
{
log.debug("Retry policy did not allow retry");
}
if ( operationAndData.getErrorCallback() != null )
{
operationAndData.getErrorCallback().retriesExhausted(operationAndData);
}
}
}
logError("Background exception was not retry-able or retry gave up", e);
}
while ( false );
}
String fixForNamespace(String path, boolean isSequential)
{
if ( ensurePathNeeded.get() )
{
try
{
final CuratorZookeeperClient zookeeperClient = client.getZookeeperClient();
RetryLoop.callWithRetry
(
zookeeperClient,
new Callable<Object>()
{
@Override
public Object call() throws Exception
{
ZKPaths.mkdirs(zookeeperClient.getZooKeeper(), ZKPaths.makePath("/", namespace), true, client.getAclProvider(), true);
return null;
}
}
);
ensurePathNeeded.set(false);
}
catch ( Exception e )
{
ThreadUtils.checkInterrupted(e);
client.logError("Ensure path threw exception", e);
}
}
return ZKPaths.fixForNamespace(namespace, path, isSequential);
}
@Override
public Collection<CuratorTransactionResult> commit() throws Exception
{
Preconditions.checkState(!isCommitted, "transaction already committed");
isCommitted = true;
final AtomicBoolean firstTime = new AtomicBoolean(true);
List<OpResult> resultList = RetryLoop.callWithRetry
(
client.getZookeeperClient(),
new Callable<List<OpResult>>()
{
@Override
public List<OpResult> call() throws Exception
{
return doOperation(firstTime);
}
}
);
if ( resultList.size() != transaction.metadataSize() )
{
throw new IllegalStateException(String.format("Result size (%d) doesn't match input size (%d)", resultList.size(), transaction.metadataSize()));
}
ImmutableList.Builder<CuratorTransactionResult> builder = ImmutableList.builder();
for ( int i = 0; i < resultList.size(); ++i )
{
OpResult opResult = resultList.get(i);
CuratorMultiTransactionRecord.TypeAndPath metadata = transaction.getMetadata(i);
CuratorTransactionResult curatorResult = makeCuratorResult(opResult, metadata);
builder.add(curatorResult);
}
return builder.build();
}
private String findProtectedNodeInForeground(final String path) throws Exception
{
OperationTrace trace = client.getZookeeperClient().startAdvancedTracer("CreateBuilderImpl-findProtectedNodeInForeground");
String returnPath = RetryLoop.callWithRetry
(
client.getZookeeperClient(),
new Callable<String>()
{
@Override
public String call() throws Exception
{
String foundNode = null;
try
{
final ZKPaths.PathAndNode pathAndNode = ZKPaths.getPathAndNode(path);
List<String> children = client.getZooKeeper().getChildren(pathAndNode.getPath(), false);
foundNode = findNode(children, pathAndNode.getPath(), protectedMode.protectedId());
log.debug("Protected mode findNode result: {}", foundNode);
foundNode = protectedMode.validateFoundNode(client, createMode, foundNode);
}
catch ( KeeperException.NoNodeException ignore )
{
// ignore
}
return foundNode;
}
}
);
trace.setPath(path).commit();
return returnPath;
}
private byte[] configInForeground() throws Exception
{
TimeTrace trace = client.getZookeeperClient().startTracer("GetConfigBuilderImpl-Foreground");
try
{
return RetryLoop.callWithRetry
(
client.getZookeeperClient(),
new Callable<byte[]>()
{
@Override
public byte[] call() throws Exception
{
if ( watching.isWatched() )
{
return client.getZooKeeper().getConfig(true, stat);
}
byte[] config = client.getZooKeeper().getConfig(watching.getWatcher(ZooDefs.CONFIG_NODE), stat);
watching.commitWatcher(KeeperException.NoNodeException.Code.OK.intValue(), false);
return config;
}
}
);
}
finally
{
trace.commit();
}
}
String fixForNamespace(String path, boolean isSequential)
{
if ( ensurePathNeeded.get() )
{
try
{
final CuratorZookeeperClient zookeeperClient = client.getZookeeperClient();
RetryLoop.callWithRetry
(
zookeeperClient,
new Callable<Object>()
{
@Override
public Object call() throws Exception
{
ZKPaths.mkdirs(zookeeperClient.getZooKeeper(), ZKPaths.makePath("/", namespace), true, client.getAclProvider(), true);
return null;
}
}
);
ensurePathNeeded.set(false);
}
catch ( Exception e )
{
ThreadUtils.checkInterrupted(e);
client.logError("Ensure path threw exception", e);
}
}
return ZKPaths.fixForNamespace(namespace, path, isSequential);
}
private void tryWithMutex(MutableAtomicValue<byte[]> result, MakeValue makeValue) throws Exception
{
long startMs = System.currentTimeMillis();
int retryCount = 0;
if ( mutex.acquire(promotedToLock.getMaxLockTime(), promotedToLock.getMaxLockTimeUnit()) )
{
try
{
boolean done = false;
while ( !done )
{
result.stats.incrementPromotedTries();
if ( tryOnce(result, makeValue) )
{
result.succeeded = true;
done = true;
}
else
{
if ( !promotedToLock.getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMs, RetryLoop.getDefaultRetrySleeper()) )
{
done = true;
}
}
}
}
finally
{
mutex.release();
}
}
result.stats.setPromotedTimeMs(System.currentTimeMillis() - startMs);
}
/**
* Call to get the current retry loop. If there isn't one, one is allocated
* via {@code newRetryLoopSupplier}.
*
* @param newRetryLoopSupplier supply a new retry loop when needed. Normally you should use {@link org.apache.curator.CuratorZookeeperClient#newRetryLoop()}
* @return retry loop to use
*/
public RetryLoop getRetryLoop(Supplier<RetryLoop> newRetryLoopSupplier)
{
Entry entry = threadLocal.get();
if ( entry == null )
{
entry = new Entry(new WrappedRetryLoop(newRetryLoopSupplier.get()));
threadLocal.set(entry);
}
++entry.counter;
return entry.retryLoop;
}