下面列出了 org.apache.zookeeper.OpResult 类的 API 使用示例代码及写法,也可以点击链接到 GitHub 查看源代码。
/**
 * Reports an abort to the listener, if one is set.
 *
 * <p>The reported cause is {@code t} when {@code opResult} is {@code null} or carries the
 * {@code OK} code; otherwise it is a {@link KeeperException} created from the result's
 * error code.
 *
 * @param t the failure supplied by the caller
 * @param opResult the result for this op, or {@code null} when none is available
 */
@Override
protected void abortOpResult(Throwable t,
                             @Nullable OpResult opResult) {
    // Start from the caller's failure; replace it only if the result has a real error code.
    Throwable cause = t;
    if (null != opResult) {
        assert (opResult instanceof OpResult.ErrorResult);
        int errorCode = ((OpResult.ErrorResult) opResult).getErr();
        if (KeeperException.Code.OK.intValue() != errorCode) {
            cause = KeeperException.create(KeeperException.Code.get(errorCode));
        }
    }
    if (null == listener) {
        return;
    }
    listener.onAbort(cause);
}
/**
 * Checks that aborting with a NONODE error result hands the listener a
 * {@code NoNodeException}, not the SESSIONEXPIRED exception passed to the abort call.
 */
@Test(timeout = 60000)
public void testAbortOpResult() throws Exception {
    final CountDownLatch abortLatch = new CountDownLatch(1);
    final AtomicReference<Throwable> abortCause = new AtomicReference<Throwable>();
    // Listener that records the abort cause and releases the latch.
    Transaction.OpListener<Version> recordingListener = new Transaction.OpListener<Version>() {
        @Override
        public void onCommit(Version r) {
            // no-op
        }

        @Override
        public void onAbort(Throwable t) {
            abortCause.set(t);
            abortLatch.countDown();
        }
    };
    ZKVersionedSetOp versionedSetOp = new ZKVersionedSetOp(mock(Op.class), recordingListener);
    OpResult noNodeResult = new OpResult.ErrorResult(KeeperException.Code.NONODE.intValue());
    KeeperException sessionExpired = KeeperException.create(KeeperException.Code.SESSIONEXPIRED);
    versionedSetOp.abortOpResult(sessionExpired, noNodeResult);
    abortLatch.await();
    assertTrue(abortCause.get() instanceof KeeperException.NoNodeException);
}
/**
 * Reports an abort to the listener.
 *
 * <p>The reported cause is {@code t} when {@code opResult} is {@code null} or carries the
 * {@code OK} code; otherwise it is a {@link KeeperException} created from the result's
 * error code.
 *
 * @param t the failure supplied by the caller
 * @param opResult the result for this op, or {@code null} when none is available
 */
@Override
protected void abortOpResult(Throwable t,
                             @Nullable OpResult opResult) {
    Throwable cause = t;
    if (null != opResult) {
        assert (opResult instanceof OpResult.ErrorResult);
        int errorCode = ((OpResult.ErrorResult) opResult).getErr();
        // Only a non-OK code overrides the caller-supplied failure.
        if (KeeperException.Code.OK.intValue() != errorCode) {
            cause = KeeperException.create(KeeperException.Code.get(errorCode));
        }
    }
    listener.onAbort(cause);
}
/**
 * Aborts a versioned set op with a NONODE error result and expects the listener to
 * receive a {@code NoNodeException} instead of the SESSIONEXPIRED exception passed in.
 */
@Test(timeout = 60000)
public void testAbortOpResult() throws Exception {
    final AtomicReference<Throwable> observed = new AtomicReference<Throwable>();
    final CountDownLatch aborted = new CountDownLatch(1);
    Transaction.OpListener<Version> listener = new Transaction.OpListener<Version>() {
        @Override
        public void onCommit(Version r) {
            // no-op
        }

        @Override
        public void onAbort(Throwable t) {
            // Capture the cause before signalling so the assertion sees it.
            observed.set(t);
            aborted.countDown();
        }
    };
    ZKVersionedSetOp versionedSetOp = new ZKVersionedSetOp(mock(Op.class), listener);
    KeeperException ke = KeeperException.create(KeeperException.Code.SESSIONEXPIRED);
    int noNodeCode = KeeperException.Code.NONODE.intValue();
    versionedSetOp.abortOpResult(ke, new OpResult.ErrorResult(noNodeCode));
    aborted.await();
    assertTrue(observed.get() instanceof KeeperException.NoNodeException);
}
/**
 * Runs the given operations as a single multi call, looping until it succeeds or throws.
 *
 * <p>{@code CONNECTIONLOSS} and {@code OPERATIONTIMEOUT} failures are passed to
 * {@code retryOrThrow}; if that returns, the loop sleeps until the next retry and tries
 * again. Any other {@link KeeperException} is rethrown immediately.
 *
 * @param ops the operations to execute
 * @return the per-operation results of the multi call
 * @throws KeeperException if the call fails with a code that is not retried, or is
 *         rethrown by {@code retryOrThrow}
 * @throws InterruptedException declared by the underlying calls
 */
public List<OpResult> multi(Iterable<Op> ops)
    throws KeeperException, InterruptedException {
    try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.multi")) {
        RetryCounter retryCounter = retryCounterFactory.create();
        Iterable<Op> multiOps = prepareZKMulti(ops);
        for (;;) {
            try {
                return checkZk().multi(multiOps);
            } catch (KeeperException e) {
                switch (e.code()) {
                    // Both codes take the same retry path.
                    case CONNECTIONLOSS:
                    case OPERATIONTIMEOUT:
                        retryOrThrow(retryCounter, e, "multi");
                        break;
                    default:
                        throw e;
                }
            }
            retryCounter.sleepUntilNextRetry();
        }
    }
}
/**
 * Submits the multi-transaction record through the asynchronous ZooKeeper multi call.
 *
 * <p>When the callback fires, the tracer is committed, the raw results (if any) are wrapped
 * into Curator results, and a {@code TRANSACTION} event is handed to the client's
 * background processing. Any {@link Throwable} raised while submitting is passed to
 * {@code backgrounding.checkError}.
 *
 * @param operationAndData carries the transaction record to submit
 */
@Override
public void performBackgroundOperation(final OperationAndData<CuratorMultiTransactionRecord> operationAndData) throws Exception
{
    try
    {
        final TimeTrace tracer = client.getZookeeperClient().startTracer("CuratorMultiTransactionImpl-Background");
        AsyncCallback.MultiCallback multiCallback = new AsyncCallback.MultiCallback()
        {
            @Override
            public void processResult(int rc, String path, Object ctx, List<OpResult> opResults)
            {
                tracer.commit();
                // A null result list is forwarded as a null Curator result list.
                List<CuratorTransactionResult> wrapped = null;
                if ( opResults != null )
                {
                    wrapped = CuratorTransactionImpl.wrapResults(client, opResults, operationAndData.getData());
                }
                CuratorEvent event = new CuratorEventImpl(client, CuratorEventType.TRANSACTION, rc, path, null, ctx, null, null, null, null, null, wrapped);
                client.processBackgroundOperation(operationAndData, event);
            }
        };
        client.getZooKeeper().multi(operationAndData.getData(), multiCallback, backgrounding.getContext());
    }
    catch ( Throwable e )
    {
        backgrounding.checkError(e, null);
    }
}
/**
 * Executes the transaction record synchronously inside Curator's retry loop and wraps
 * the raw ZooKeeper results into Curator results.
 *
 * @param record the multi-transaction record to submit
 * @return one Curator result per raw result
 * @throws Exception whatever the retry loop propagates
 */
private List<CuratorTransactionResult> forOperationsInForeground(final CuratorMultiTransactionRecord record) throws Exception
{
    TimeTrace tracer = client.getZookeeperClient().startTracer("CuratorMultiTransactionImpl-Foreground");
    Callable<List<OpResult>> multiCall = new Callable<List<OpResult>>()
    {
        @Override
        public List<OpResult> call() throws Exception
        {
            return client.getZooKeeper().multi(record);
        }
    };
    List<OpResult> opResults = RetryLoop.callWithRetry(client.getZookeeperClient(), multiCall);
    tracer.commit();
    return CuratorTransactionImpl.wrapResults(client, opResults, record);
}
/**
 * Commits the transaction once, running the operation inside Curator's retry loop.
 *
 * @return the wrapped results, one per operation
 * @throws IllegalStateException if the transaction was already committed, or if the
 *         number of results differs from the number of recorded operations
 * @throws Exception whatever the retry loop propagates
 */
@Override
public Collection<CuratorTransactionResult> commit() throws Exception
{
    Preconditions.checkState(!isCommitted, "transaction already committed");
    isCommitted = true;

    Callable<List<OpResult>> operation = new Callable<List<OpResult>>()
    {
        @Override
        public List<OpResult> call() throws Exception
        {
            return doOperation();
        }
    };
    List<OpResult> opResults = RetryLoop.callWithRetry(client.getZookeeperClient(), operation);

    int actualSize = opResults.size();
    int expectedSize = transaction.metadataSize();
    if ( actualSize != expectedSize )
    {
        throw new IllegalStateException(String.format("Result size (%d) doesn't match input size (%d)", actualSize, expectedSize));
    }

    return wrapResults(client, opResults, transaction);
}
/**
 * Commits the transaction once, running the operation inside Curator's retry loop, and
 * converts each raw result into a Curator result using the metadata at the same index.
 *
 * @return an immutable list of results, one per operation
 * @throws IllegalStateException if the transaction was already committed, or if the
 *         number of results differs from the number of recorded operations
 * @throws Exception whatever the retry loop propagates
 */
@Override
public Collection<CuratorTransactionResult> commit() throws Exception
{
    Preconditions.checkState(!isCommitted, "transaction already committed");
    isCommitted = true;

    // Shared with doOperation across attempts of the retry loop.
    final AtomicBoolean firstTime = new AtomicBoolean(true);
    Callable<List<OpResult>> operation = new Callable<List<OpResult>>()
    {
        @Override
        public List<OpResult> call() throws Exception
        {
            return doOperation(firstTime);
        }
    };
    List<OpResult> opResults = RetryLoop.callWithRetry(client.getZookeeperClient(), operation);

    int actualSize = opResults.size();
    int expectedSize = transaction.metadataSize();
    if ( actualSize != expectedSize )
    {
        throw new IllegalStateException(String.format("Result size (%d) doesn't match input size (%d)", actualSize, expectedSize));
    }

    ImmutableList.Builder<CuratorTransactionResult> wrapped = ImmutableList.builder();
    int index = 0;
    for ( OpResult opResult : opResults )
    {
        CuratorMultiTransactionRecord.TypeAndPath metadata = transaction.getMetadata(index++);
        wrapped.add(makeCuratorResult(opResult, metadata));
    }
    return wrapped.build();
}
/**
 * Builds a Curator transaction result from a raw ZooKeeper result.
 *
 * <p>Create results contribute the created path with the namespace removed; set-data
 * results contribute the returned {@code Stat}. All other result types leave both
 * fields {@code null}.
 *
 * @param opResult the raw result of one operation
 * @param metadata the operation type and path recorded for that operation
 * @return the combined result
 */
private CuratorTransactionResult makeCuratorResult(OpResult opResult, CuratorMultiTransactionRecord.TypeAndPath metadata)
{
    String resultPath = null;
    Stat resultStat = null;
    switch ( opResult.getType() )
    {
        case ZooDefs.OpCode.create:
        {
            resultPath = client.unfixForNamespace(((OpResult.CreateResult)opResult).getPath());
            break;
        }

        case ZooDefs.OpCode.setData:
        {
            resultStat = ((OpResult.SetDataResult)opResult).getStat();
            break;
        }

        default:
        {
            // NOP
            break;
        }
    }

    return new CuratorTransactionResult(metadata.type, metadata.forPath, resultPath, resultStat);
}
/**
 * Executes the operations as one multi call.
 *
 * @param ops the operations to execute
 * @param retryOnConnLoss when {@code true}, the call goes through
 *        {@code zkCmdExecutor.retryOperation}; otherwise it is issued once directly
 * @return the per-operation results
 */
public List<OpResult> multi(final Iterable<Op> ops, boolean retryOnConnLoss) throws InterruptedException, KeeperException {
    if (!retryOnConnLoss) {
        return keeper.multi(ops);
    }
    return zkCmdExecutor.retryOperation(() -> keeper.multi(ops));
}
/**
 * Reports a successful set-data op to the listener, if one is set, passing the new
 * version taken from the result's stat as a {@code LongVersion}.
 *
 * @param opResult the result of this op; expected to be a {@code SetDataResult}
 */
@Override
protected void commitOpResult(OpResult opResult) {
    assert (opResult instanceof OpResult.SetDataResult);
    // Cast before the listener check, matching the original ordering.
    OpResult.SetDataResult committed = (OpResult.SetDataResult) opResult;
    if (null == listener) {
        return;
    }
    listener.onCommit(new LongVersion(committed.getStat().getVersion()));
}
/**
 * Multi callback: dispatches the transaction outcome to every op and completes the
 * result future.
 *
 * <p>On a non-OK {@code rc} each op is aborted with a {@link KeeperException} for that
 * code plus its own result (or {@code null} when no result list was delivered), and the
 * future is completed exceptionally. On OK each op is committed with its result and the
 * future is completed with {@code null}.
 */
@Override
public void processResult(int rc, String path, Object ctx, List<OpResult> results) {
    if (KeeperException.Code.OK.intValue() != rc) {
        KeeperException ke = KeeperException.create(KeeperException.Code.get(rc));
        for (int i = 0; i < ops.size(); i++) {
            OpResult opResult = (null == results) ? null : results.get(i);
            ops.get(i).abortOpResult(ke, opResult);
        }
        FutureUtils.completeExceptionally(result, ke);
        return;
    }
    // transaction succeed
    for (int i = 0; i < ops.size(); i++) {
        ops.get(i).commitOpResult(results.get(i));
    }
    FutureUtils.complete(result, null);
}
/**
 * Multi callback: dispatches the transaction outcome to every op and settles the
 * result future.
 *
 * <p>On a non-OK {@code rc} each op is aborted with a {@link KeeperException} for that
 * code plus its own result (or {@code null} when no result list was delivered), and the
 * exception is set on the future. On OK each op is committed with its result and the
 * future's value is set to {@code null}.
 */
@Override
public void processResult(int rc, String path, Object ctx, List<OpResult> results) {
    if (KeeperException.Code.OK.intValue() != rc) {
        KeeperException ke = KeeperException.create(KeeperException.Code.get(rc));
        for (int i = 0; i < ops.size(); i++) {
            OpResult opResult = (null == results) ? null : results.get(i);
            ops.get(i).abortOpResult(ke, opResult);
        }
        FutureUtils.setException(result, ke);
        return;
    }
    // transaction succeed
    for (int i = 0; i < ops.size(); i++) {
        ops.get(i).commitOpResult(results.get(i));
    }
    FutureUtils.setValue(result, null);
}
/**
 * Converts raw ZooKeeper results into Curator results, pairing each result with the
 * transaction metadata at the same index.
 *
 * @param client used to remove the namespace from created paths
 * @param resultList the raw results, in operation order
 * @param transaction the record supplying per-operation metadata
 * @return an immutable list with one Curator result per raw result
 */
static List<CuratorTransactionResult> wrapResults(CuratorFrameworkImpl client, List<OpResult> resultList, CuratorMultiTransactionRecord transaction)
{
    ImmutableList.Builder<CuratorTransactionResult> wrapped = ImmutableList.builder();
    int index = 0;
    for ( OpResult opResult : resultList )
    {
        TypeAndPath metadata = transaction.getMetadata(index++);
        wrapped.add(makeCuratorResult(client, opResult, metadata));
    }
    return wrapped.build();
}
/**
 * Builds a Curator transaction result from a raw ZooKeeper result.
 *
 * <p>Create results contribute the created path with the namespace removed, set-data
 * results contribute the returned {@code Stat}, and error results contribute their error
 * code. Fields not supplied by the result type keep their defaults ({@code null} / 0).
 *
 * @param client used to remove the namespace from created paths
 * @param opResult the raw result of one operation
 * @param metadata the operation type and path recorded for that operation
 * @return the combined result
 */
static CuratorTransactionResult makeCuratorResult(CuratorFrameworkImpl client, OpResult opResult, TypeAndPath metadata)
{
    String resultPath = null;
    Stat resultStat = null;
    int error = 0;

    final int opType = opResult.getType();
    if ( opType == ZooDefs.OpCode.create )
    {
        resultPath = client.unfixForNamespace(((OpResult.CreateResult)opResult).getPath());
    }
    else if ( opType == ZooDefs.OpCode.setData )
    {
        resultStat = ((OpResult.SetDataResult)opResult).getStat();
    }
    else if ( opType == ZooDefs.OpCode.error )
    {
        error = ((OpResult.ErrorResult)opResult).getErr();
    }
    // any other type: NOP

    return new CuratorTransactionResult(metadata.getType(), metadata.getForPath(), resultPath, resultStat, error);
}
/**
 * Executes the operations as one multi call on the current connection, via
 * {@code retryUntilConnected}.
 *
 * @param ops the operations to execute; must not be {@code null}
 * @return the per-operation results
 * @throws NullPointerException if {@code ops} is {@code null}
 */
public List<OpResult> multi(final Iterable<Op> ops) throws ZkException {
    if (null == ops) {
        throw new NullPointerException("ops must not be null.");
    }

    Callable<List<OpResult>> multiCall = new Callable<List<OpResult>>() {
        @Override
        public List<OpResult> call() throws Exception {
            return getConnection().multi(ops);
        }
    };
    return retryUntilConnected(multiCall);
}
/**
 * Runs the superclass {@code multi} through this client's {@code execute} wrapper,
 * under the executor name {@code "multi"}.
 *
 * @param ops the operations to execute
 * @return the per-operation results
 */
@Override
public List<OpResult> multi(final Iterable<Op> ops) throws InterruptedException, KeeperException {
    ZKExecutor<List<OpResult>> multiExecutor = new ZKExecutor<List<OpResult>>("multi") {
        @Override
        List<OpResult> execute() throws KeeperException, InterruptedException {
            // Delegate to the enclosing client's superclass implementation.
            return ZooKeeperClient.super.multi(ops);
        }
    };
    return execute(multiExecutor);
}
/**
 * Commits the underlying ZooKeeper transaction.
 *
 * <p>The per-operation results of the commit are not used.
 *
 * @throws RegistryException with {@code ErrorCode.Unknown} wrapping the original
 *         {@code InterruptedException} or {@code KeeperException}; on interruption the
 *         calling thread's interrupt flag is set again before throwing
 */
@Override
public void commit() throws RegistryException {
    try {
        zkTransaction.commit();
    } catch (InterruptedException e) {
        // Restore the interrupt flag so callers further up can still observe it.
        Thread.currentThread().interrupt();
        throw new RegistryException(ErrorCode.Unknown, e);
    } catch (KeeperException e) {
        throw new RegistryException(ErrorCode.Unknown, e);
    }
}
/**
 * Commits the wrapped transaction.
 *
 * @return the per-operation results returned by the wrapped transaction
 */
@Override
public List<OpResult> commit() throws InterruptedException, KeeperException {
    List<OpResult> results = transaction.commit();
    return results;
}
/**
 * Forwards the multi call unchanged to {@code delegate}.
 *
 * @param ops the operations to execute
 * @return the per-operation results returned by the delegate
 */
@Override
public List<OpResult> multi(Iterable<Op> ops) throws BadVersionException, NoSuchElementException, AlreadyExistsException, IOException, KeeperException, InterruptedException {
    List<OpResult> results = delegate.multi(ops);
    return results;
}
/**
 * Multi is not supported by this implementation.
 *
 * @param ops ignored
 * @throws UnsupportedOperationException always, with the message {@code "multi"}
 */
@Override
public List<OpResult> multi(Iterable<Op> ops) throws BadVersionException, NoSuchElementException, AlreadyExistsException, IOException, KeeperException, InterruptedException {
    final String operation = "multi";
    throw new UnsupportedOperationException(operation);
}
/**
 * Commits this op with an untyped result by forwarding it to
 * {@code commitOpResult}.
 *
 * @param r the result of this op; expected to be an {@code OpResult}
 */
@Override
public void commit(Object r) {
    assert (r instanceof OpResult);
    OpResult opResult = (OpResult) r;
    commitOpResult(opResult);
}
/**
 * Aborts this op with an untyped result by forwarding it to {@code abortOpResult}.
 *
 * <p>A {@code null} result is accepted and forwarded as {@code null}, so that running
 * with assertions enabled does not reject a missing result.
 *
 * @param t the failure that caused the abort
 * @param r the result of this op: an {@code OpResult}, or {@code null} when none exists
 */
@Override
public void abort(Throwable t, Object r) {
    // `null instanceof OpResult` is false, so null has to be allowed explicitly.
    assert (null == r || r instanceof OpResult);
    abortOpResult(t, (OpResult) r);
}
/**
 * Reports the commit to the listener, if one is set, with a {@code null} value.
 *
 * @param opResult the result of this op; not inspected
 */
@Override
protected void commitOpResult(OpResult opResult) {
    if (null == listener) {
        return;
    }
    listener.onCommit(null);
}
/**
 * Reports the abort to the listener, if one is set, passing {@code t} through unchanged.
 *
 * @param t the failure that caused the abort
 * @param opResult the result of this op; not inspected
 */
@Override
protected void abortOpResult(Throwable t, OpResult opResult) {
    if (null == listener) {
        return;
    }
    listener.onAbort(t);
}
/**
 * Submits the given ops as one asynchronous multi call and completes {@code promise}
 * from the callback.
 *
 * <ul>
 *   <li>OK: the promise completes with one entry per index of {@code pathsToCreate}. A
 *       {@code null} entry keeps {@code metadatas.get(i)}; a non-null entry becomes a new
 *       {@code Versioned} holding that data with {@code LongVersion(0)}.</li>
 *   <li>NODEEXISTS: the promise fails with {@code LogExistsException}.</li>
 *   <li>Any other code: the promise fails with {@code ZKException} carrying that code.
 *       When debug logging is on, the per-op result codes are logged first.</li>
 * </ul>
 *
 * @param zk the ZooKeeper handle used to submit the transaction
 * @param zkOps the operations making up the transaction
 * @param pathsToCreate per-index data for created paths; {@code null} where nothing was created
 * @param metadatas the existing metadata, indexed like {@code pathsToCreate}
 * @param logRootPath the log path, used in failure messages
 * @param promise completed with the final metadata list or the failure
 */
private static void executeCreateMissingPathTxn(ZooKeeper zk,
                                                List<Op> zkOps,
                                                List<byte[]> pathsToCreate,
                                                List<Versioned<byte[]>> metadatas,
                                                String logRootPath,
                                                CompletableFuture<List<Versioned<byte[]>>> promise) {
    zk.multi(zkOps, new AsyncCallback.MultiCallback() {
        @Override
        public void processResult(int rc, String path, Object ctx, List<OpResult> resultList) {
            if (KeeperException.Code.OK.intValue() == rc) {
                List<Versioned<byte[]>> finalMetadatas =
                    Lists.newArrayListWithExpectedSize(metadatas.size());
                for (int i = 0; i < pathsToCreate.size(); i++) {
                    byte[] dataCreated = pathsToCreate.get(i);
                    if (null == dataCreated) {
                        finalMetadatas.add(metadatas.get(i));
                    } else {
                        finalMetadatas.add(new Versioned<byte[]>(dataCreated, new LongVersion(0)));
                    }
                }
                promise.complete(finalMetadatas);
            } else if (KeeperException.Code.NODEEXISTS.intValue() == rc) {
                promise.completeExceptionally(new LogExistsException("Someone just created log "
                    + logRootPath));
            } else {
                if (LOG.isDebugEnabled()) {
                    // The result list may be null or empty on failure. Build the code list
                    // defensively so a logging problem cannot stop the promise from
                    // being completed below.
                    StringBuilder builder = new StringBuilder();
                    if (null != resultList) {
                        for (OpResult result : resultList) {
                            if (builder.length() > 0) {
                                builder.append(",");
                            }
                            if (result instanceof OpResult.ErrorResult) {
                                OpResult.ErrorResult errorResult = (OpResult.ErrorResult) result;
                                builder.append(errorResult.getErr());
                            } else {
                                builder.append(0);
                            }
                        }
                    }
                    String resultCodeList = builder.toString();
                    LOG.debug("Failed to create log, full rc list = {}", resultCodeList);
                }
                promise.completeExceptionally(new ZKException("Failed to create log " + logRootPath,
                    KeeperException.Code.get(rc)));
            }
        }
    }, null);
}
/**
 * Counts down the commit latch; the result itself is not inspected.
 *
 * @param opResult the result of this op (unused)
 */
@Override
protected void commitOpResult(OpResult opResult) {
    commitLatch.countDown();
}
/**
 * Counts down the abort latch; neither argument is inspected.
 *
 * @param t the failure that caused the abort (unused)
 * @param opResult the result of this op, possibly {@code null} (unused)
 */
@Override
protected void abortOpResult(Throwable t, @Nullable OpResult opResult) {
    abortLatch.countDown();
}
/**
 * Reports a successful set-data op to the listener, passing the new version taken from
 * the result's stat as a {@code ZkVersion}.
 *
 * @param opResult the result of this op; expected to be a {@code SetDataResult}
 */
@Override
protected void commitOpResult(OpResult opResult) {
    assert (opResult instanceof OpResult.SetDataResult);
    OpResult.SetDataResult committed = (OpResult.SetDataResult) opResult;
    ZkVersion newVersion = new ZkVersion(committed.getStat().getVersion());
    listener.onCommit(newVersion);
}