下面列出了 io.grpc.Status#isOk() 的实例代码，或者点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Builds an {@link Answer} that replays a canned outcome onto the {@link StreamObserver} found at
 * argument index 1 of the intercepted invocation.
 *
 * @param op the operation to emit through {@code onNext}; nothing is emitted when {@code null}
 * @param status the terminal status; an OK status completes the observer, any other status is
 *     delivered through {@code onError} as a runtime exception
 * @return an answer whose result is always {@code null}
 */
private Answer<Void> answerWith(@Nullable Operation op, Status status) {
  return invocation -> {
    Object[] callArgs = invocation.getArguments();
    @SuppressWarnings("unchecked")
    StreamObserver<Operation> observer = (StreamObserver<Operation>) callArgs[1];
    if (op != null) {
      observer.onNext(op);
    }
    if (!status.isOk()) {
      observer.onError(status.asRuntimeException());
      return null;
    }
    observer.onCompleted();
    return null;
  };
}
public WatchTargetChange(
WatchTargetChangeType changeType,
List<Integer> targetIds,
ByteString resumeToken,
@Nullable Status cause) {
// cause != null implies removal
hardAssert(
cause == null || changeType == WatchTargetChangeType.Removed,
"Got cause for a target change that was not a removal");
this.changeType = changeType;
this.targetIds = targetIds;
this.resumeToken = resumeToken;
// We can get a cause that is considered ok, but everywhere we assume that
// any non-null cause is an error.
if (cause != null && !cause.isOk()) {
this.cause = cause;
} else {
this.cause = null;
}
}
/**
 * Drops every query registered for {@code targetId}, reports a non-OK {@code status} to the sync
 * engine listener for each of them, and releases the limbo document references held for the
 * target.
 *
 * @param targetId the target being removed
 * @param status the status the target was removed with; listeners are only notified when it is
 *     not OK
 */
private void removeAndCleanupTarget(int targetId, Status status) {
  for (Query affected : queriesByTarget.get(targetId)) {
    queryViewsByQuery.remove(affected);
    if (status.isOk()) {
      continue;
    }
    syncEngineListener.onError(affected, status);
    logErrorIfInteresting(status, "Listen for %s failed", affected);
  }
  queriesByTarget.remove(targetId);

  // Snapshot the keys referenced by this target before the references are dropped.
  ImmutableSortedSet<DocumentKey> releasedKeys = limboDocumentRefs.referencesForId(targetId);
  limboDocumentRefs.removeReferencesForId(targetId);
  for (DocumentKey releasedKey : releasedKeys) {
    if (limboDocumentRefs.containsKey(releasedKey)) {
      continue;
    }
    // That was the last reference to this key.
    removeLimboTarget(releasedKey);
  }
}
/**
 * Recomputes the balancing state and picker from the current subchannels.
 *
 * <p>When {@code filterNonFailingSubchannels} yields at least one subchannel, the state becomes
 * READY with a {@code ReadyPicker} over that list. Otherwise the state is CONNECTING if any
 * subchannel is CONNECTING or IDLE, else TRANSIENT_FAILURE, with an {@code EmptyPicker} carrying
 * the aggregated status.
 */
@SuppressWarnings("ReferenceEquality")
private void updateBalancingState() {
  List<Subchannel> usable = filterNonFailingSubchannels(getSubchannels());
  if (!usable.isEmpty()) {
    // Start from a random index so that frequent picker replacement does not skew subchannel
    // selection.
    int firstIndex = random.nextInt(usable.size());
    updateBalancingState(READY, new ReadyPicker(usable, firstIndex));
    return;
  }

  // Nothing usable: derive the aggregate connectivity state and error status.
  boolean anyConnecting = false;
  Status aggregate = EMPTY_OK;
  for (Subchannel candidate : getSubchannels()) {
    ConnectivityStateInfo info = getSubchannelStateInfoRef(candidate).value;
    // An IDLE subchannel here is not caused by the channel IDLE_TIMEOUT (the LB would already be
    // shut down in that case); RRLB requests a connection immediately on subchannel IDLE.
    if (info.getState() == CONNECTING || info.getState() == IDLE) {
      anyConnecting = true;
    }
    // Keep replacing the aggregate while it is still the sentinel (compared by reference) or a
    // non-OK status.
    if (aggregate == EMPTY_OK || !aggregate.isOk()) {
      aggregate = info.getStatus();
    }
  }
  // If all subchannels are TRANSIENT_FAILURE, the picker carries the Status of an arbitrary
  // subchannel; otherwise it carries OK.
  updateBalancingState(
      anyConnecting ? CONNECTING : TRANSIENT_FAILURE, new EmptyPicker(aggregate));
}
/**
 * Converts a server-generated {@link Status} into an {@link RpcResponse}.
 *
 * @param status the status produced by the server
 * @param message the response payload used when {@code status} is OK; may be {@code null}
 * @return a response wrapping {@code message} for an OK status, otherwise a failed response
 *     carrying {@code status.asException()}
 */
public static RpcResponse rpcResponse(Status status, @Nullable Object message) {
  if (!status.isOk()) {
    return RpcResponse.ofFailure(status.asException());
  }
  return RpcResponse.of(message);
}
/**
 * Records the terminal outcome of the call and releases the latch.
 *
 * <p>A non-OK status is stored as a runtime exception. An OK status without a previously stored
 * result is stored as an INTERNAL "No value received for unary call" exception.
 */
@Override
public synchronized void onClose(Status status, Metadata trailers) {
  if (!status.isOk()) {
    result = status.asRuntimeException(trailers);
  } else if (!resultSet) {
    Status missingValue = Status.INTERNAL.withDescription("No value received for unary call");
    result = missingValue.asRuntimeException(trailers);
  }
  resultSet = true;
  countDown.countDown();
}
/**
 * Completes {@code closedFuture} according to the final status of the stream.
 *
 * <p>An OK status completes {@code closedFuture} with {@code null}. Any other status fails both
 * {@code closedFuture} and {@code responseFuture} with the same {@link StatusException}.
 */
@Override
public void closed(Status status, RpcProgress rpcProgress, Metadata trailers) {
  if (status.isOk()) {
    closedFuture.set(null);
    return;
  }
  StatusException failure = status.asException();
  closedFuture.setException(failure);
  responseFuture.setException(failure);
}
/**
 * Delivers the closure notification to the listener and then cancels the context.
 *
 * @param status the final status; OK triggers {@code onComplete}, anything else marks the call
 *     as cancelled and triggers {@code onCancel}
 */
private void closedInternal(Status status) {
  try {
    if (!status.isOk()) {
      call.cancelled = true;
      listener.onCancel();
    } else {
      listener.onComplete();
    }
  } finally {
    // The context is cancelled only after the listener has been notified, so the application
    // can clean up based on whether onComplete or onCancel was called.
    context.cancel(null);
  }
}
/**
 * Forwards the end of the call to the observer: completion for an OK status, otherwise an error
 * built from the status and trailers.
 */
@Override
public void onClose(Status status, Metadata trailers) {
  if (!status.isOk()) {
    observer.onError(status.asRuntimeException(trailers));
    return;
  }
  observer.onCompleted();
}
/**
 * Completes {@code responseFuture} with the outcome of the unary call.
 *
 * <ul>
 *   <li>Non-OK status: the future fails with the status converted to a runtime exception.
 *   <li>OK status but no value received: the future fails with an INTERNAL status.
 *   <li>OK status with a value: the future completes with that value.
 * </ul>
 *
 * <p>Exactly one completion call is made per invocation. The previous version called
 * {@code set(value)} even after {@code setException(...)} and relied on the second call being
 * ignored by an already-completed future.
 */
@Override
public void onClose(Status status, Metadata trailers) {
  if (!status.isOk()) {
    responseFuture.setException(status.asRuntimeException(trailers));
    return;
  }
  if (value == null) {
    // No value received so mark the future as an error.
    responseFuture.setException(
        Status.INTERNAL.withDescription("No value received for unary call")
            .asRuntimeException(trailers));
  } else {
    responseFuture.set(value);
  }
}
/**
 * Appends the terminal marker for the call to {@code buffer} and flags the call as done.
 *
 * <p>An OK status enqueues the enclosing {@code BlockingResponseStream} instance; any other
 * status enqueues the corresponding runtime exception.
 *
 * @throws IllegalStateException if the call was already closed
 */
@Override
public void onClose(Status status, Metadata trailers) {
  Preconditions.checkState(!done, "ClientCall already closed");
  if (!status.isOk()) {
    buffer.add(status.asRuntimeException(trailers));
  } else {
    buffer.add(BlockingResponseStream.this);
  }
  done = true;
}
/**
 * Reacts to the write stream closing.
 *
 * <p>For an OK status, asserts that the stream was not still needed. For a non-OK status with
 * pending writes, routes the error to the write or handshake error handler depending on whether
 * the handshake had completed. Finally restarts the stream if it is still needed.
 *
 * <p>The OK check uses {@link Status#isOk()} rather than {@code Status.OK.equals(status)}:
 * gRPC statuses do not define value equality, so an OK status carrying a description or cause
 * would not have been recognised as a graceful stop.
 *
 * @param status the status the write stream closed with
 */
private void handleWriteStreamClose(Status status) {
  if (status.isOk()) {
    // Graceful stop (due to stop() or idle timeout). Make sure that's desirable.
    hardAssert(
        !shouldStartWriteStream(), "Write stream was stopped gracefully while still needed.");
  } else if (!writePipeline.isEmpty()) {
    // The write stream closed due to an error while writes are pending: invoke the error
    // callbacks.
    // TODO: handle UNAUTHENTICATED status, see go/firestore-client-errors
    if (writeStream.isHandshakeComplete()) {
      // This error affects the actual writes
      handleWriteError(status);
    } else {
      // If there was an error before the handshake has finished, it's possible that the server is
      // unable to process the stream token we're sending. (Perhaps it's too old?)
      handleWriteHandshakeError(status);
    }
  }
  // The write stream may have already been restarted by refilling the write pipeline for failed
  // writes. In that case, we don't want to start the write stream again.
  if (shouldStartWriteStream()) {
    startWriteStream();
  }
}
/**
 * Signals the end of the call to the observer.
 *
 * @param status the final call status; OK maps to {@code onCompleted}, anything else to
 *     {@code onError}
 * @param trailers the trailing metadata attached to the error, if any
 */
@Override
public void onClose(Status status, Metadata trailers) {
  boolean succeeded = status.isOk();
  if (succeeded) {
    observer.onCompleted();
  } else {
    observer.onError(status.asRuntimeException(trailers));
  }
}
/**
 * Stores the remote address of the call in the custom call option and then dispatches to
 * {@code statusOk} or {@code statusError}.
 *
 * <p>The dispatch sits in a {@code finally} block, so it runs even if recording the address
 * throws.
 */
@Override
public void onClose(Status status, Metadata trailers) {
  try {
    SocketAddress peerAddress =
        clientCall.getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR);
    callOptions
        .getOption(GrpcCallOptions.CALLOPTIONS_CUSTOME_KEY)
        .put(GrpcCallOptions.GRPC_CURRENT_ADDR_KEY, peerAddress);
  } finally {
    if (!status.isOk()) {
      statusError(status, trailers);
    } else {
      statusOk(trailers);
    }
  }
}
/**
 * Handles closure of the call: for a non-OK status a {@code ContextCloser} is submitted to
 * {@code cancelExecutor}, and in every case a task that invokes {@code getListener().closed(status)}
 * is submitted to {@code callExecutor}.
 *
 * @param status the final status of the call
 */
private void closedInternal(final Status status) {
// For cancellations, promptly inform any users of the context that their work should be
// aborted. Otherwise, we can wait until pending work is done.
if (!status.isOk()) {
// The callExecutor might be busy doing user work. To avoid waiting, use an executor that
// is not serializing.
cancelExecutor.execute(new ContextCloser(context, status.getCause()));
}
// Captured here and linked inside the task below via PerfMark.linkIn.
final Link link = PerfMark.linkOut();
// Runnable that delivers the closed notification to the listener, wrapped in a PerfMark task.
final class Closed extends ContextRunnable {
Closed() {
super(context);
}
@Override
public void runInContext() {
PerfMark.startTask("ServerCallListener(app).closed", tag);
PerfMark.linkIn(link);
try {
getListener().closed(status);
} finally {
// Always close the PerfMark task, even if the listener throws.
PerfMark.stopTask("ServerCallListener(app).closed", tag);
}
}
}
callExecutor.execute(new Closed());
}
/**
 * Records the measurements of a finished stream, using the current time as the end time.
 *
 * <p>Can be called from any thread without synchronization. Only the first call has an effect;
 * later calls return immediately.
 *
 * @param status the final status of the stream; a non-OK status also records an error count
 */
@Override
public void streamClosed(Status status) {
  if (streamClosedUpdater == null) {
    // No updater available: fall back to a plain check-then-set on the field.
    if (streamClosed != 0) {
      return;
    }
    streamClosed = 1;
  } else if (streamClosedUpdater.getAndSet(this, 1) != 0) {
    return;
  }
  if (!module.recordFinishedRpcs) {
    return;
  }

  stopwatch.stop();
  long serverLatencyNanos = stopwatch.elapsed(TimeUnit.NANOSECONDS);
  MeasureMap measurements =
      module
          .statsRecorder
          .newMeasureMap()
          // TODO(songya): remove the deprecated measure constants once they are completed removed.
          .put(DeprecatedCensusConstants.RPC_SERVER_FINISHED_COUNT, 1)
          // The latency is double value
          .put(
              DeprecatedCensusConstants.RPC_SERVER_SERVER_LATENCY,
              serverLatencyNanos / NANOS_PER_MILLI)
          .put(DeprecatedCensusConstants.RPC_SERVER_RESPONSE_COUNT, outboundMessageCount)
          .put(DeprecatedCensusConstants.RPC_SERVER_REQUEST_COUNT, inboundMessageCount)
          .put(DeprecatedCensusConstants.RPC_SERVER_RESPONSE_BYTES, outboundWireSize)
          .put(DeprecatedCensusConstants.RPC_SERVER_REQUEST_BYTES, inboundWireSize)
          .put(
              DeprecatedCensusConstants.RPC_SERVER_UNCOMPRESSED_RESPONSE_BYTES,
              outboundUncompressedSize)
          .put(
              DeprecatedCensusConstants.RPC_SERVER_UNCOMPRESSED_REQUEST_BYTES,
              inboundUncompressedSize);
  if (!status.isOk()) {
    measurements.put(DeprecatedCensusConstants.RPC_SERVER_ERROR_COUNT, 1);
  }

  // Tag the recorded measurements with the status code name.
  TagValue codeTag = TagValue.create(status.getCode().toString());
  measurements.record(
      module
          .tagger
          .toBuilder(parentCtx)
          .putLocal(RpcMeasureConstants.GRPC_SERVER_STATUS, codeTag)
          .build());
}
/**
 * Records the measurements of a finished call, using the current time as the end time.
 *
 * <p>Can be called from any thread without synchronization. Only the first call has an effect;
 * later calls return immediately.
 *
 * @param status the final status of the call; a non-OK status also records an error count
 */
void callEnded(Status status) {
  if (callEndedUpdater == null) {
    // No updater available: fall back to a plain check-then-set on the field.
    if (callEnded != 0) {
      return;
    }
    callEnded = 1;
  } else if (callEndedUpdater.getAndSet(this, 1) != 0) {
    return;
  }
  if (!recordFinishedRpcs) {
    return;
  }

  stopwatch.stop();
  long totalNanos = stopwatch.elapsed(TimeUnit.NANOSECONDS);
  // Read the field once; when no stream tracer was set, the shared blank tracer supplies the
  // counters.
  ClientTracer current = streamTracer;
  ClientTracer tracer = current != null ? current : BLANK_CLIENT_TRACER;
  MeasureMap measurements =
      module
          .statsRecorder
          .newMeasureMap()
          // TODO(songya): remove the deprecated measure constants once they are completed removed.
          .put(DeprecatedCensusConstants.RPC_CLIENT_FINISHED_COUNT, 1)
          // The latency is double value
          .put(
              DeprecatedCensusConstants.RPC_CLIENT_ROUNDTRIP_LATENCY,
              totalNanos / NANOS_PER_MILLI)
          .put(DeprecatedCensusConstants.RPC_CLIENT_REQUEST_COUNT, tracer.outboundMessageCount)
          .put(DeprecatedCensusConstants.RPC_CLIENT_RESPONSE_COUNT, tracer.inboundMessageCount)
          .put(DeprecatedCensusConstants.RPC_CLIENT_REQUEST_BYTES, tracer.outboundWireSize)
          .put(DeprecatedCensusConstants.RPC_CLIENT_RESPONSE_BYTES, tracer.inboundWireSize)
          .put(
              DeprecatedCensusConstants.RPC_CLIENT_UNCOMPRESSED_REQUEST_BYTES,
              tracer.outboundUncompressedSize)
          .put(
              DeprecatedCensusConstants.RPC_CLIENT_UNCOMPRESSED_RESPONSE_BYTES,
              tracer.inboundUncompressedSize);
  if (!status.isOk()) {
    measurements.put(DeprecatedCensusConstants.RPC_CLIENT_ERROR_COUNT, 1);
  }

  // Tag the recorded measurements with the status code name.
  TagValue codeTag = TagValue.create(status.getCode().toString());
  measurements.record(
      module
          .tagger
          .toBuilder(startCtx)
          .put(DeprecatedCensusConstants.RPC_STATUS, codeTag)
          .build());
}
/**
 * Records the measurements of a finished call, using the current time as the end time.
 *
 * <p>Can be called from any thread without synchronization. Only the first call has an effect;
 * later calls return immediately.
 *
 * @param status the final status of the call; a non-OK status also records an error count
 */
void callEnded(Status status) {
  if (callEndedUpdater == null) {
    // No updater available: fall back to a plain check-then-set on the field.
    if (callEnded != 0) {
      return;
    }
    callEnded = 1;
  } else if (callEndedUpdater.getAndSet(this, 1) != 0) {
    return;
  }
  if (!module.recordFinishedRpcs) {
    return;
  }

  stopwatch.stop();
  long totalNanos = stopwatch.elapsed(TimeUnit.NANOSECONDS);
  // Read the field once; when no stream tracer was set, a fresh tracer supplies the counters.
  ClientTracer current = streamTracer;
  ClientTracer tracer = current != null ? current : new ClientTracer(module, startCtx);
  MeasureMap measurements =
      module
          .statsRecorder
          .newMeasureMap()
          // TODO(songya): remove the deprecated measure constants once they are completed removed.
          .put(DeprecatedCensusConstants.RPC_CLIENT_FINISHED_COUNT, 1)
          // The latency is double value
          .put(
              DeprecatedCensusConstants.RPC_CLIENT_ROUNDTRIP_LATENCY,
              totalNanos / NANOS_PER_MILLI)
          .put(DeprecatedCensusConstants.RPC_CLIENT_REQUEST_COUNT, tracer.outboundMessageCount)
          .put(DeprecatedCensusConstants.RPC_CLIENT_RESPONSE_COUNT, tracer.inboundMessageCount)
          .put(DeprecatedCensusConstants.RPC_CLIENT_REQUEST_BYTES, tracer.outboundWireSize)
          .put(DeprecatedCensusConstants.RPC_CLIENT_RESPONSE_BYTES, tracer.inboundWireSize)
          .put(
              DeprecatedCensusConstants.RPC_CLIENT_UNCOMPRESSED_REQUEST_BYTES,
              tracer.outboundUncompressedSize)
          .put(
              DeprecatedCensusConstants.RPC_CLIENT_UNCOMPRESSED_RESPONSE_BYTES,
              tracer.inboundUncompressedSize);
  if (!status.isOk()) {
    measurements.put(DeprecatedCensusConstants.RPC_CLIENT_ERROR_COUNT, 1);
  }

  // Tag the recorded measurements with the status code name.
  TagValue codeTag = TagValue.create(status.getCode().toString());
  measurements.record(
      module
          .tagger
          .toBuilder(startCtx)
          .putLocal(RpcMeasureConstants.GRPC_CLIENT_STATUS, codeTag)
          .build());
}
/**
 * Closes the stream and cleans up as necessary:
 *
 * <ul>
 *   <li>closes the underlying GRPC stream;
 *   <li>calls the onClose handler with the given 'status';
 *   <li>sets internal stream state to 'finalState';
 *   <li>adjusts the backoff timer based on status
 * </ul>
 *
 * <p>A new stream can be opened by calling {@link #start}.
 *
 * <p>The precondition that only error states may carry an error is checked with
 * {@link Status#isOk()}, matching the half-close check later in this method. gRPC statuses do
 * not define value equality, so comparing against {@code Status.OK} with {@code equals} would
 * reject an OK status that carries a description or cause.
 *
 * @param finalState the intended state of the stream after closing.
 * @param status the status to emit to the listener.
 */
private void close(State finalState, Status status) {
  hardAssert(isStarted(), "Only started streams should be closed.");
  hardAssert(
      finalState == State.Error || status.isOk(),
      "Can't provide an error when not in an error state.");
  workerQueue.verifyIsCurrentThread();
  if (Datastore.isMissingSslCiphers(status)) {
    // The Android device is missing required SSL Ciphers. This error is non-recoverable and must
    // be addressed by the app developer (see https://bit.ly/2XFpdma).
    Util.crashMainThread(
        new IllegalStateException(SSL_DEPENDENCY_ERROR_MESSAGE, status.getCause()));
  }
  // Cancel any outstanding timers (they're guaranteed not to execute).
  cancelIdleCheck();
  this.backoff.cancel();
  // Invalidates any stream-related callbacks (e.g. from auth or the underlying stream),
  // guaranteeing they won't execute.
  this.closeCount++;
  Code code = status.getCode();
  if (code == Code.OK) {
    // If this is an intentional close ensure we don't delay our next connection attempt.
    backoff.reset();
  } else if (code == Code.RESOURCE_EXHAUSTED) {
    Logger.debug(
        getClass().getSimpleName(),
        "(%x) Using maximum backoff delay to prevent overloading the backend.",
        System.identityHashCode(this));
    backoff.resetToMax();
  } else if (code == Code.UNAUTHENTICATED) {
    // "unauthenticated" error means the token was rejected. Try force refreshing it in case it
    // just expired.
    firestoreChannel.invalidateToken();
  } else if (code == Code.UNAVAILABLE) {
    // This exception is thrown when the gRPC connection fails on the client side, To shorten
    // reconnect time, we can use a shorter max delay when reconnecting.
    if (status.getCause() instanceof java.net.UnknownHostException
        || status.getCause() instanceof java.net.ConnectException) {
      backoff.setTemporaryMaxDelay(BACKOFF_CLIENT_NETWORK_FAILURE_MAX_DELAY_MS);
    }
  }
  if (finalState != State.Error) {
    Logger.debug(
        getClass().getSimpleName(),
        "(%x) Performing stream teardown",
        System.identityHashCode(this));
    tearDown();
  }
  if (call != null) {
    // Clean up the underlying RPC. If this close() is in response to an error, don't attempt to
    // call half-close to avoid secondary failures.
    if (status.isOk()) {
      Logger.debug(
          getClass().getSimpleName(),
          "(%x) Closing stream client-side",
          System.identityHashCode(this));
      call.halfClose();
    }
    call = null;
  }
  // This state must be assigned before calling listener.onClose to allow the callback to
  // inhibit backoff or otherwise manipulate the state in its non-started state.
  this.state = finalState;
  // Notify the listener that the stream closed.
  listener.onClose(status);
}
/**
 * Tells whether {@code status} represents success.
 *
 * @param status the status to inspect; may be {@code null}
 * @return {@code true} only when {@code status} is non-null and OK
 */
private static boolean isSuccess(Status status) {
  if (status == null) {
    return false;
  }
  return status.isOk();
}