下面列出了 io.grpc.Metadata 类的 API 用法示例代码,也可以点击链接到 GitHub 查看完整源代码。
/**
 * Answers a stream with a plain-text HTTP error that also carries the gRPC status.
 *
 * <p>The gRPC status and message are put into a {@link Metadata}, serialized, and copied into the
 * response headers. The message is then written as a UTF-8 DATA frame that ends the stream.
 *
 * @param ctx context used to allocate the body buffer and the write promises
 * @param streamId id of the HTTP/2 stream being answered
 * @param code HTTP status code to send
 * @param statusCode gRPC status code to report in the headers
 * @param msg error message, sent both as the gRPC message header and as the response body
 */
private void respondWithHttpError(
    ChannelHandlerContext ctx, int streamId, int code, Status.Code statusCode, String msg) {
  Metadata metadata = new Metadata();
  metadata.put(InternalStatus.CODE_KEY, statusCode.toStatus());
  metadata.put(InternalStatus.MESSAGE_KEY, msg);
  byte[][] serialized = InternalMetadata.serialize(metadata);

  // The media-type parameter that names the character set is "charset"; "encoding" is not a
  // defined parameter of text/plain, so the body's UTF-8 encoding was not actually declared.
  Http2Headers headers = new DefaultHttp2Headers(true, serialized.length / 2)
      .status(Integer.toString(code))
      .set(CONTENT_TYPE_HEADER, "text/plain; charset=utf-8");
  // The serialized array is consumed as consecutive (name, value) pairs.
  for (int i = 0; i < serialized.length; i += 2) {
    headers.add(new AsciiString(serialized[i], false), new AsciiString(serialized[i + 1], false));
  }
  encoder().writeHeaders(ctx, streamId, headers, 0, false, ctx.newPromise());

  ByteBuf msgBuf = ByteBufUtil.writeUtf8(ctx.alloc(), msg);
  encoder().writeData(ctx, streamId, msgBuf, 0, true, ctx.newPromise());
}
/**
 * Verifies that {@code reprocess()} with no pending stream leaves the picker untouched, and that
 * the same picker then serves a stream created afterwards.
 */
@Test
public void reprocess_NoPendingStream() {
  SubchannelPicker savedPicker = mock(SubchannelPicker.class);
  AbstractSubchannel readySubchannel = mock(AbstractSubchannel.class);
  when(readySubchannel.obtainActiveTransport()).thenReturn(mockRealTransport);
  when(savedPicker.pickSubchannel(any(PickSubchannelArgs.class)))
      .thenReturn(PickResult.withSubchannel(readySubchannel));
  when(mockRealTransport.newStream(
          any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class)))
      .thenReturn(mockRealStream);

  delayedTransport.reprocess(savedPicker);
  // Nothing was pending, so neither the picker nor the transport listener has been touched.
  verifyNoMoreInteractions(savedPicker);
  verifyNoMoreInteractions(transportListener);

  // The picker was not used above, but it is retained and serves streams created from now on.
  ClientStream createdStream =
      delayedTransport.newStream(method, headers, CallOptions.DEFAULT);
  PickSubchannelArgsImpl expectedArgs =
      new PickSubchannelArgsImpl(method, headers, CallOptions.DEFAULT);
  verify(savedPicker).pickSubchannel(expectedArgs);
  verify(readySubchannel).obtainActiveTransport();
  assertSame(mockRealStream, createdStream);
}
/**
 * Feeds a gRPC request HEADERS frame for {@code /foo/bar} into the channel and stores the server
 * stream reported to the transport listener in {@code stream}.
 */
private void createStream() throws Exception {
  Http2Headers requestHeaders = new DefaultHttp2Headers();
  requestHeaders.method(HTTP_METHOD);
  requestHeaders.set(CONTENT_TYPE_HEADER, CONTENT_TYPE_GRPC);
  requestHeaders.set(TE_HEADER, TE_TRAILERS);
  requestHeaders.path(new AsciiString("/foo/bar"));
  channelRead(headersFrame(STREAM_ID, requestHeaders));

  // Capture whatever the handler handed to the transport listener for the new stream.
  ArgumentCaptor<NettyServerStream> createdStreamCaptor =
      ArgumentCaptor.forClass(NettyServerStream.class);
  ArgumentCaptor<String> methodNameCaptor = ArgumentCaptor.forClass(String.class);
  verify(transportListener).streamCreated(
      createdStreamCaptor.capture(), methodNameCaptor.capture(), any(Metadata.class));
  stream = createdStreamCaptor.getValue();
}
/**
 * Rebuilds the request headers with the gRPC encoding, accept-encoding and timeout headers plus
 * the caller's metadata, then installs the resulting request on the context.
 *
 * @param compressor compressor in use; its encoding is advertised unless it is
 *     {@code Identity.NONE}
 * @param metadata caller-supplied metadata copied into the headers
 */
private void prepareHeaders(Compressor compressor, Metadata metadata) {
  final RequestHeadersBuilder headersBuilder = req.headers().toBuilder();

  final boolean compressed = compressor != Identity.NONE;
  if (compressed) {
    headersBuilder.set(GrpcHeaderNames.GRPC_ENCODING, compressor.getMessageEncoding());
  }
  if (!advertisedEncodingsHeader.isEmpty()) {
    headersBuilder.add(GrpcHeaderNames.GRPC_ACCEPT_ENCODING, advertisedEncodingsHeader);
  }

  // The context tracks the response timeout in milliseconds; the header helper takes nanoseconds.
  final long timeoutNanos = TimeUnit.MILLISECONDS.toNanos(ctx.responseTimeoutMillis());
  headersBuilder.add(GrpcHeaderNames.GRPC_TIMEOUT, TimeoutHeaderUtil.toHeaderValue(timeoutNanos));

  MetadataUtil.fillHeaders(metadata, headersBuilder);
  ctx.updateRequest(req.withHeaders(headersBuilder));
}
/**
 * Regression test for https://github.com/grpc/grpc-java/issues/1795: a frame that arrives after
 * the deframer has been closed is released and produces no message.
 */
@Test
public void frameShouldBeIgnoredAfterDeframerClosed() {
  final Queue<InputStream> deliveredMessages = new LinkedList<>();
  stream.transportState().setListener(new ServerStreamListenerBase() {
    @Override
    public void messagesAvailable(MessageProducer producer) {
      // Drain the producer until it reports that no more messages are available.
      for (InputStream next = producer.next(); next != null; next = producer.next()) {
        deliveredMessages.add(next);
      }
    }
  });
  ReadableBuffer lateFrame = mock(ReadableBuffer.class);

  // Close the deframer.
  stream.close(Status.OK, new Metadata());
  stream.transportState().complete();

  // A frame received after the deframer closed must be ignored without raising an exception.
  stream.transportState().inboundDataReceived(lateFrame, true);

  verify(lateFrame).close();
  assertNull("no message expected", deliveredMessages.poll());
}
/**
 * A substream closed with {@code DROPPED} progress must commit immediately and must not trigger a
 * retry, even when its status code is a retriable one.
 */
@Test
public void droppedShouldNeverRetry() {
  ClientStream firstAttempt = mock(ClientStream.class);
  ClientStream secondAttempt = mock(ClientStream.class);
  doReturn(firstAttempt).when(retriableStreamRecorder).newSubstream(0);
  doReturn(secondAttempt).when(retriableStreamRecorder).newSubstream(1);

  // Start the call and grab the listener installed on the first substream.
  retriableStream.start(masterListener);
  verify(retriableStreamRecorder).newSubstream(0);
  ArgumentCaptor<ClientStreamListener> firstListenerCaptor =
      ArgumentCaptor.forClass(ClientStreamListener.class);
  verify(firstAttempt).start(firstListenerCaptor.capture());
  ClientStreamListener firstListener = firstListenerCaptor.getValue();

  // Report the attempt as dropped, then check that no retry was attempted.
  Status droppedStatus = Status.fromCode(RETRIABLE_STATUS_CODE_1);
  firstListener.closed(droppedStatus, DROPPED, new Metadata());

  verifyNoMoreInteractions(firstAttempt, secondAttempt);
  verify(retriableStreamRecorder).postCommit();
  verify(masterListener).closed(same(droppedStatus), any(Metadata.class));
}
/**
 * Checks the query counts recorded through {@link LoadRecordingStreamTracerFactory}, including
 * that two factories sharing one counter have their stats aggregated.
 */
@Test
public void loadRecordingStreamTracerFactory_clientSideQueryCountsAggregation() {
  LoadRecordingStreamTracerFactory firstFactory =
      new LoadRecordingStreamTracerFactory(counter, NOOP_CLIENT_STREAM_TRACER_FACTORY);

  // Snapshot while the first stream is still open.
  ClientStreamTracer openTracer = firstFactory.newClientStreamTracer(STREAM_INFO, new Metadata());
  assertQueryCounts(counter.snapshot(), 0, 1, 0, 1);

  // Snapshot after the first stream closed with OK.
  openTracer.streamClosed(Status.OK);
  assertQueryCounts(counter.snapshot(), 1, 0, 0, 0);

  // A second LoadRecordingStreamTracerFactory built on the same counter: stats are aggregated
  // together.
  LoadRecordingStreamTracerFactory secondFactory =
      new LoadRecordingStreamTracerFactory(counter, NOOP_CLIENT_STREAM_TRACER_FACTORY);
  ClientStreamTracer abortedTracer =
      firstFactory.newClientStreamTracer(STREAM_INFO, new Metadata());
  abortedTracer.streamClosed(Status.ABORTED);
  ClientStreamTracer cancelledTracer =
      secondFactory.newClientStreamTracer(STREAM_INFO, new Metadata());
  cancelledTracer.streamClosed(Status.CANCELLED);

  assertQueryCounts(counter.snapshot(), 0, 0, 2, 2);
}
/**
 * Builds the response trailers for a gRPC status.
 *
 * <p>The trailers hold the status code and description, the supplied metadata, optionally the
 * serialized cause (when verbose responses are enabled), and any additional response trailers
 * registered on the context. Those additional trailers are cleared from the context.
 *
 * @param ctx the request context supplying configuration and additional trailers
 * @param status the gRPC status to encode
 * @param metadata metadata copied into the trailers
 * @param headersSent forwarded to {@code GrpcTrailersUtil.statusToTrailers}
 * @return the assembled trailers
 */
static HttpHeaders statusToTrailers(
    ServiceRequestContext ctx, Status status, Metadata metadata, boolean headersSent) {
  final int grpcCode = status.getCode().value();
  final HttpHeadersBuilder builder =
      GrpcTrailersUtil.statusToTrailers(grpcCode, status.getDescription(), headersSent);
  MetadataUtil.fillHeaders(metadata, builder);

  if (ctx.config().verboseResponses()) {
    final Throwable cause = status.getCause();
    if (cause != null) {
      final ThrowableProto causeProto = GrpcStatus.serializeThrowable(cause);
      final String encodedCause = Base64.getEncoder().encodeToString(causeProto.toByteArray());
      builder.add(GrpcHeaderNames.ARMERIA_GRPC_THROWABLEPROTO_BIN, encodedCause);
    }
  }

  // Read the additional trailers before clearing them on the context.
  final HttpHeaders extraTrailers = ctx.additionalResponseTrailers();
  ctx.mutateAdditionalResponseTrailers(HttpHeadersBuilder::clear);
  builder.add(extraTrailers);
  return builder.build();
}
/**
 * Wraps the outgoing call so that its response listener is replaced by a forwarding listener
 * which stores {@code System.nanoTime()} in {@code callTime} each time its {@code delegate()}
 * is invoked.
 */
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method,
    CallOptions callOptions, Channel next) {
  final ClientCall<ReqT, RespT> wrappedCall = next.newCall(method, callOptions);
  return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(wrappedCall) {
    @Override
    public void start(Listener<RespT> responseListener, Metadata headers) {
      final Listener<RespT> timestampingListener =
          new ForwardingClientCallListener.SimpleForwardingClientCallListener<RespT>(
              responseListener) {
            @Override
            protected Listener<RespT> delegate() {
              // Record the time of the most recent delegate lookup.
              callTime = System.nanoTime();
              return super.delegate();
            }
          };
      super.start(timestampingListener, headers);
    }
  };
}
/**
 * A user interceptor that throws from {@code close()} must surface to the client as a
 * {@link StatusRuntimeException}, and the server span must carry the thrown message as its error.
 */
@Test public void userInterceptor_throwsOnClose() throws IOException {
  ServerInterceptor throwingInterceptor = new ServerInterceptor() {
    @Override public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
        ServerCall<ReqT, RespT> call, Metadata metadata, ServerCallHandler<ReqT, RespT> next) {
      ServerCall<ReqT, RespT> failingCall = new SimpleForwardingServerCall<ReqT, RespT>(call) {
        @Override public void close(Status status, Metadata trailers) {
          throw new IllegalStateException("I'm a bad interceptor.");
        }
      };
      return next.startCall(failingCall, metadata);
    }
  };
  init(throwingInterceptor);

  assertThatThrownBy(() -> GreeterGrpc.newBlockingStub(client).sayHello(HELLO_REQUEST))
      .isInstanceOf(StatusRuntimeException.class);

  testSpanHandler.takeRemoteSpanWithErrorMessage(Span.Kind.SERVER, "I'm a bad interceptor.");
}
/**
 * Invoked by subclasses when the terminal trailer metadata of a stream arrives.
 *
 * <p>If no headers were received before, the trailers are validated as initial metadata first.
 * Any transport error (old or new) fails the stream; otherwise the status is extracted from the
 * trailers and delivered.
 *
 * @param trailers the received terminal trailer metadata
 */
protected void transportTrailersReceived(Metadata trailers) {
  Preconditions.checkNotNull(trailers, "trailers");

  // Trailers without preceding headers also have to pass the initial-metadata validation.
  if (transportError == null && !headersReceived) {
    transportError = validateInitialMetadata(trailers);
    if (transportError != null) {
      transportErrorMetadata = trailers;
    }
  }

  if (transportError == null) {
    Status trailerStatus = statusFromTrailers(trailers);
    stripTransportDetails(trailers);
    inboundTrailersReceived(trailers, trailerStatus);
    return;
  }

  transportError = transportError.augmentDescription("trailers: " + trailers);
  http2ProcessingFailed(transportError, false, transportErrorMetadata);
}
/**
 * Issues the SayHello call directly through {@link ClientCall} instead of a generated stub. This
 * is the more advanced route and is not normally needed; it is shown here for reference.
 *
 * @throws RuntimeException if the call has not closed within one second
 */
void advancedAsyncCall() {
  final CountDownLatch closed = new CountDownLatch(1);
  ClientCall.Listener<HelloReply> closeListener = new ClientCall.Listener<HelloReply>() {
    @Override
    public void onClose(Status status, Metadata trailers) {
      Verify.verify(status.getCode() == Status.Code.INTERNAL);
      Verify.verify(status.getDescription().contains("Narwhal"));
      // The cause is not transmitted over the wire, so there is nothing more to check.
      closed.countDown();
    }
  };

  ClientCall<HelloRequest, HelloReply> sayHelloCall =
      channel.newCall(GreeterGrpc.getSayHelloMethod(), CallOptions.DEFAULT);
  sayHelloCall.start(closeListener, new Metadata());
  sayHelloCall.sendMessage(HelloRequest.newBuilder().setName("Marge").build());
  sayHelloCall.halfClose();

  boolean finishedInTime = Uninterruptibles.awaitUninterruptibly(closed, 1, TimeUnit.SECONDS);
  if (!finishedInTime) {
    throw new RuntimeException("timeout!");
  }
}
/**
 * Answers a stream with a plain-text HTTP error that also carries the gRPC status.
 *
 * <p>The gRPC status and message are put into a {@link Metadata}, serialized, and copied into the
 * response headers. The message is then written as a UTF-8 DATA frame that ends the stream.
 *
 * @param ctx context used to allocate the body buffer and the write promises
 * @param streamId id of the HTTP/2 stream being answered
 * @param code HTTP status code to send
 * @param statusCode gRPC status code to report in the headers
 * @param msg error message, sent both as the gRPC message header and as the response body
 */
private void respondWithHttpError(
    ChannelHandlerContext ctx, int streamId, int code, Status.Code statusCode, String msg) {
  Metadata metadata = new Metadata();
  metadata.put(InternalStatus.CODE_KEY, statusCode.toStatus());
  metadata.put(InternalStatus.MESSAGE_KEY, msg);
  byte[][] serialized = InternalMetadata.serialize(metadata);

  // The media-type parameter that names the character set is "charset"; "encoding" is not a
  // defined parameter of text/plain, so the body's UTF-8 encoding was not actually declared.
  Http2Headers headers = new DefaultHttp2Headers(true, serialized.length / 2)
      .status(Integer.toString(code))
      .set(CONTENT_TYPE_HEADER, "text/plain; charset=utf-8");
  // The serialized array is consumed as consecutive (name, value) pairs.
  for (int i = 0; i < serialized.length; i += 2) {
    headers.add(new AsciiString(serialized[i], false), new AsciiString(serialized[i + 1], false));
  }
  encoder().writeHeaders(ctx, streamId, headers, 0, false, ctx.newPromise());

  ByteBuf msgBuf = ByteBufUtil.writeUtf8(ctx.alloc(), msg);
  encoder().writeData(ctx, streamId, msgBuf, 0, true, ctx.newPromise());
}
/**
 * Starting the transport with a custom user agent must put the gRPC user-agent string built from
 * it into the headers of the outgoing stream.
 */
@Test
public void overrideDefaultUserAgent() throws Exception {
  startTransport(3, null, true, DEFAULT_MAX_MESSAGE_SIZE, INITIAL_WINDOW_SIZE, "fakeUserAgent");
  MockStreamListener streamListener = new MockStreamListener();
  OkHttpClientStream clientStream =
      clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT);
  clientStream.start(streamListener);

  Header authorityHeader = new Header(Header.TARGET_AUTHORITY, "notarealauthority:80");
  Header pathHeader = new Header(Header.TARGET_PATH, "/" + method.getFullMethodName());
  Header userAgentHeader = new Header(
      GrpcUtil.USER_AGENT_KEY.name(), GrpcUtil.getGrpcUserAgent("okhttp", "fakeUserAgent"));
  List<Header> expectedHeaders = Arrays.asList(
      HTTP_SCHEME_HEADER,
      METHOD_HEADER,
      authorityHeader,
      pathHeader,
      userAgentHeader,
      CONTENT_TYPE_HEADER,
      TE_HEADER);

  verify(frameWriter, timeout(TIME_OUT_MS))
      .synStream(eq(false), eq(false), eq(3), eq(0), eq(expectedHeaders));
  getStream(3).cancel(Status.CANCELLED);
  shutdownAndVerify();
}
/**
 * A request whose content type is {@code application/grpc+json} must still result in a stream
 * being reported to the transport listener.
 */
@Test
public void headersSupportExtensionContentType() throws Exception {
  manualSetUp();

  Http2Headers requestHeaders = new DefaultHttp2Headers();
  requestHeaders.method(HTTP_METHOD);
  requestHeaders.set(CONTENT_TYPE_HEADER, new AsciiString("application/grpc+json", UTF_8));
  requestHeaders.set(TE_HEADER, TE_TRAILERS);
  requestHeaders.path(new AsciiString("/foo/bar"));
  channelRead(headersFrame(STREAM_ID, requestHeaders));

  ArgumentCaptor<NettyServerStream> createdStreamCaptor =
      ArgumentCaptor.forClass(NettyServerStream.class);
  ArgumentCaptor<String> methodNameCaptor = ArgumentCaptor.forClass(String.class);
  verify(transportListener).streamCreated(
      createdStreamCaptor.capture(), methodNameCaptor.capture(), any(Metadata.class));
  stream = createdStreamCaptor.getValue();
}
/**
 * Creates the stream and selects its framer.
 *
 * <p>With {@code useGet} the headers are handed to a {@code GetFramer}; otherwise a
 * {@code MessageFramer} is created and the headers are kept on this stream.
 *
 * @param bufferAllocator allocator passed to the message framer
 * @param statsTraceCtx stats/trace context passed to the framer
 * @param transportTracer transport tracer; must not be null
 * @param headers request headers; must not be null
 * @param callOptions call options, consulted for in-use accounting
 * @param useGet whether the GET framer is used
 */
protected AbstractClientStream(
    WritableBufferAllocator bufferAllocator,
    StatsTraceContext statsTraceCtx,
    TransportTracer transportTracer,
    Metadata headers,
    CallOptions callOptions,
    boolean useGet) {
  checkNotNull(headers, "headers");
  this.transportTracer = checkNotNull(transportTracer, "transportTracer");
  this.shouldBeCountedForInUse = GrpcUtil.shouldBeCountedForInUse(callOptions);
  this.useGet = useGet;
  if (useGet) {
    // The GET framer takes ownership of the headers, so they are not stored on the stream.
    framer = new GetFramer(headers, statsTraceCtx);
  } else {
    framer = new MessageFramer(this, bufferAllocator, statsTraceCtx);
    this.headers = headers;
  }
}
/**
 * Closes the call: logs the response content, aborts the request, cancels the response reader,
 * notifies the listener with the request context pushed, and finally notifies the executor.
 *
 * <p>A {@code CANCELLED} status seen once the call deadline has expired is reported as
 * {@code DEADLINE_EXCEEDED} with empty metadata.
 *
 * @param status the status the call finished with
 * @param metadata the trailers that accompany the status
 */
private void close(Status status, Metadata metadata) {
  final Deadline deadline = callOptions.getDeadline();
  final boolean cancelledPastDeadline =
      status.getCode() == Code.CANCELLED && deadline != null && deadline.isExpired();
  if (cancelledPastDeadline) {
    status = Status.DEADLINE_EXCEEDED.augmentDescription(
        "ClientCall was cancelled at or after deadline.");
    // Use fresh trailers so that the status and the trailers do not come from mixed sources.
    metadata = new Metadata();
  }

  ctx.logBuilder().responseContent(GrpcLogUtil.rpcResponse(status, firstResponse), null);

  if (!status.isOk()) {
    req.abort(status.asRuntimeException(metadata));
  } else {
    req.abort();
  }
  responseReader.cancel();

  try (SafeCloseable ignored = ctx.push()) {
    assert listener != null;
    listener.onClose(status, metadata);
  }
  notifyExecutor();
}
/**
 * Factory method for the client side.
 *
 * @param callOptions call options supplying the stream tracer factories
 * @param transportAttrs transport attributes recorded in the stream info given to each factory
 * @param headers request headers given to each factory
 * @return {@code NOOP} when no tracer factory is configured, otherwise a context holding one
 *     tracer per factory
 */
public static StatsTraceContext newClientContext(
    final CallOptions callOptions, final Attributes transportAttrs, Metadata headers) {
  final List<ClientStreamTracer.Factory> tracerFactories =
      callOptions.getStreamTracerFactories();
  if (tracerFactories.isEmpty()) {
    return NOOP;
  }

  final ClientStreamTracer.StreamInfo streamInfo = ClientStreamTracer.StreamInfo.newBuilder()
      .setTransportAttrs(transportAttrs)
      .setCallOptions(callOptions)
      .build();

  // The tracers are iterated several times per RPC. A plain array is used rather than a
  // Collection so that a for-each over it does not allocate an Iterator each time.
  final int tracerCount = tracerFactories.size();
  final StreamTracer[] streamTracers = new StreamTracer[tracerCount];
  for (int idx = 0; idx < tracerCount; idx++) {
    streamTracers[idx] = tracerFactories.get(idx).newClientStreamTracer(streamInfo, headers);
  }
  return new StatsTraceContext(streamTracers);
}
/**
 * Reports the close to the client listener, or stores the status and trailers for later when the
 * client receive queue is not empty. {@code clientStream.serverClosed()} must be called before
 * this method.
 *
 * @param status the status to report; its cause is stripped before it reaches the client
 * @param trailers the trailers to report with the status
 */
private void notifyClientClose(Status status, Metadata trailers) {
  final Status causeFreeStatus = stripCause(status);
  synchronized (this) {
    if (closed) {
      return;
    }
    if (!clientReceiveQueue.isEmpty()) {
      // Entries are still queued: remember the outcome instead of notifying now.
      clientNotifyStatus = causeFreeStatus;
      clientNotifyTrailers = trailers;
    } else {
      closed = true;
      clientStream.statsTraceCtx.streamClosed(causeFreeStatus);
      clientStreamListener.closed(causeFreeStatus, trailers);
    }
  }
  streamClosed();
}
/**
 * Creates a SPNEGO token for the server's Kerberos principal and attaches it to the stub as an
 * {@code authorization} header of the form {@code Negotiate <base64 token>}.
 *
 * @throws RuntimeException if the token cannot be created; the channel is shut down first and the
 *     originating {@link GSSException} is attached as the cause
 */
public void setSPNEGOToken() {
  String encodedToken;
  try {
    encodedToken = BaseEncoding.base64().encode(SpnegoUtils.newSPNEGOToken(serverInfo.getKerberosPrincipal()));
  } catch (GSSException e) {
    // Clean up the channel before re-throwing the exception
    managedChannel.shutdownNow();
    // Keep the GSSException as the cause so its stack trace and error codes are not lost.
    throw new RuntimeException(
        "Failed creating a SPNEGO token. Make sure that you have run kinit and that your Kerberos configuration is correct. See the full Kerberos error message: " + e.getMessage(),
        e);
  }
  // Set the 'authorization' header with the SPNEGO token
  Metadata metadata = new Metadata();
  Metadata.Key<String> key = Metadata.Key.of("authorization", Metadata.ASCII_STRING_MARSHALLER);
  metadata.put(key, "Negotiate " + encodedToken);
  stub = MetadataUtils.attachHeaders(stub, metadata);
}
/**
 * The authority seen by the server stream must equal the test authority of the server the client
 * transport was created for.
 */
@Test
@SuppressWarnings("deprecation")
public void authorityPropagation() throws Exception {
  server.start(serverListener);
  client = newClientTransport(server);
  startTransport(client, mockClientTransportListener);
  MockServerTransportListener acceptedTransportListener =
      serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);

  // Open a stream from the client side.
  Metadata requestHeaders = new Metadata();
  ClientStream outboundStream = client.newStream(methodDescriptor, requestHeaders, callOptions);
  ClientStreamListenerBase outboundListener = new ClientStreamListenerBase();
  outboundStream.start(outboundListener);

  // Inspect the matching stream on the server side.
  StreamCreation inboundCreation =
      acceptedTransportListener.takeStreamOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
  ServerStream inboundStream = inboundCreation.stream;
  assertEquals(testAuthority(server), inboundStream.getAuthority());
}
/**
 * With a maximum message size of one byte, receiving a longer message must close the stream with
 * {@code RESOURCE_EXHAUSTED}.
 */
@Test
public void maxMessageSizeShouldBeEnforced() throws Exception {
  // Allow response payloads of up to 1 byte.
  startTransport(3, null, true, 1, INITIAL_WINDOW_SIZE, null);

  MockStreamListener streamListener = new MockStreamListener();
  OkHttpClientStream clientStream =
      clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT);
  clientStream.start(streamListener);
  clientStream.request(1);
  assertContainStream(3);

  frameHandler().headers(false, false, 3, 0, grpcResponseHeaders(), HeadersMode.HTTP_20_HEADERS);
  assertNotNull(streamListener.headers);

  // Deliver a message that is larger than the configured limit.
  final String oversizedMessage = "Hello Client";
  Buffer messageFrame = createMessageFrame(oversizedMessage);
  frameHandler().data(false, 3, messageFrame, (int) messageFrame.size());

  streamListener.waitUntilStreamClosed();
  assertEquals(Code.RESOURCE_EXHAUSTED, streamListener.status.getCode());
  shutdownAndVerify();
}
/**
 * Builds the RLS request headers from the call metadata.
 *
 * <p>For each key builder entry, the first header name of its matcher that has a value in
 * {@code metadata} supplies the value stored under the entry's key.
 *
 * @param metadata the call metadata to read header values from
 * @param keyBuilder maps each RLS key to the matcher listing its candidate header names
 * @return the RLS key to header value mapping
 * @throws StatusRuntimeException with {@code INVALID_ARGUMENT} if a non-optional matcher finds no
 *     value
 */
private Map<String, String> createRequestHeaders(
    Metadata metadata, Map<String, NameMatcher> keyBuilder) {
  final Map<String, String> collected = new HashMap<>();
  for (Map.Entry<String, NameMatcher> builderEntry : keyBuilder.entrySet()) {
    final String rlsKey = builderEntry.getKey();
    final NameMatcher matcher = builderEntry.getValue();

    // Take the value of the first candidate header that is present.
    String headerValue = null;
    for (String candidateName : matcher.names()) {
      Metadata.Key<String> headerKey =
          Metadata.Key.of(candidateName, Metadata.ASCII_STRING_MARSHALLER);
      headerValue = metadata.get(headerKey);
      if (headerValue != null) {
        break;
      }
    }

    if (headerValue == null) {
      if (!matcher.isOptional()) {
        throw new StatusRuntimeException(
            Status.INVALID_ARGUMENT.withDescription(
                String.format("Missing mandatory metadata(%s) not found", rlsKey)));
      }
      continue;
    }
    collected.put(rlsKey, headerValue);
  }
  return collected;
}
/**
 * Shuts the transport down and then, under the lock, removes every active stream and reports
 * {@code reason} to it, reports {@code reason} to every pending stream, clears the pending list,
 * and stops the transport if necessary.
 *
 * @param reason the status reported to all streams
 */
@Override
public void shutdownNow(Status reason) {
  shutdown(reason);
  synchronized (lock) {
    // Remove each active stream through the iterator before reporting the status to it.
    for (Iterator<Map.Entry<Integer, OkHttpClientStream>> activeIt =
            streams.entrySet().iterator();
        activeIt.hasNext(); ) {
      Map.Entry<Integer, OkHttpClientStream> active = activeIt.next();
      activeIt.remove();
      active.getValue().transportState().transportReportStatus(reason, false, new Metadata());
      maybeClearInUse(active.getValue());
    }

    for (OkHttpClientStream pending : pendingStreams) {
      pending.transportState().transportReportStatus(reason, true, new Metadata());
      maybeClearInUse(pending);
    }
    pendingStreams.clear();

    stopIfNecessary();
  }
}
/**
 * Reads the peer's authentication from the certificate it presented on the SSL session.
 *
 * @param call the server call whose transport attributes hold the SSL session
 * @param metadata the request metadata (not consulted by this reader)
 * @return the authentication derived from the peer's own certificate, or {@code null} if the
 *     call has no SSL session, the peer is unverified, or no certificate is available
 */
@Override
public Authentication readAuthentication(final ServerCall<?, ?> call, final Metadata metadata) {
  final SSLSession sslSession = call.getAttributes().get(Grpc.TRANSPORT_ATTR_SSL_SESSION);
  if (sslSession == null) {
    log.trace("Peer not verified via SSL");
    return null;
  }
  final Certificate[] certs;
  try {
    certs = sslSession.getPeerCertificates();
  } catch (final SSLPeerUnverifiedException e) {
    log.trace("Peer not verified via certificate", e);
    return null;
  }
  // Guard against an empty chain instead of failing with an index error.
  if (certs == null || certs.length == 0) {
    log.trace("Peer did not present any certificate");
    return null;
  }
  // SSLSession orders the chain with the peer's own certificate first, followed by any
  // certificate authorities. The last element is therefore an issuer, not the caller, so the
  // identity must be taken from index 0.
  return fromCertificate(certs[0]);
}
/**
 * Updating an OOB channel with an address list that still contains the existing address must not
 * produce a new transport for the next call.
 */
@Test
public void updateOobChannelAddresses_existingAddressDoesNotConnect() {
  ClientCall<String, Integer> initialCall = channel.newCall(method, CallOptions.DEFAULT);
  initialCall.start(mockCallListener, new Metadata()); // Create LB
  ArgumentCaptor<Helper> lbHelperCaptor = ArgumentCaptor.forClass(null);
  verify(mockLoadBalancerProvider).newLoadBalancer(lbHelperCaptor.capture());
  Helper lbHelper = lbHelperCaptor.getValue();

  // Create the OOB channel and mark the transport of its first call as ready.
  ManagedChannel oob = lbHelper.createOobChannel(servers.get(0), "localhost");
  oob.newCall(method, CallOptions.DEFAULT).start(mockCallListener, new Metadata());
  MockClientTransportInfo firstTransport = newTransports.poll();
  firstTransport.listener.transportReady();

  // Keep the existing addresses and append one more.
  List<SocketAddress> extendedAddresses = new ArrayList<>(servers.get(0).getAddresses());
  extendedAddresses.add(new FakeSocketAddress("aDifferentServer"));
  lbHelper.updateOobChannelAddresses(oob, new EquivalentAddressGroup(extendedAddresses));

  oob.newCall(method, CallOptions.DEFAULT).start(mockCallListener, new Metadata());
  assertNull(newTransports.poll());
}
/**
 * A status reported by the transport while a message is only partially received must close the
 * stream with exactly that status code and description.
 */
@Test
public void rstStreamClosesStream() {
  AbstractClientStream clientStream =
      new BaseAbstractClientStream(allocator, statsTraceCtx, transportTracer);
  clientStream.start(mockListener);
  // The application calls request() when it is waiting for a message.
  clientStream.request(1);
  // Deliver only the first byte of a 2 byte message.
  byte[] partialFrame = new byte[] {0, 0, 0, 0, 2, 1};
  clientStream.transportState().deframe(ReadableBuffers.wrap(partialFrame));

  // Simulate getting a reset.
  Status resetStatus = Status.INTERNAL.withDescription("rst___stream");
  clientStream.transportState()
      .transportReportStatus(resetStatus, false /*stop delivery*/, new Metadata());

  ArgumentCaptor<Status> closedStatusCaptor = ArgumentCaptor.forClass(Status.class);
  verify(mockListener)
      .closed(closedStatusCaptor.capture(), any(RpcProgress.class), any(Metadata.class));
  Status closedStatus = closedStatusCaptor.getValue();
  assertSame(Status.Code.INTERNAL, closedStatus.getCode());
  assertEquals("rst___stream", closedStatus.getDescription());
}
/**
 * Cancelling a stream that is still pending must remove it from the pending list and close it
 * with the status of the first cancel; a second cancel has no effect.
 */
@Test
public void pendingStreamCancelled() throws Exception {
  initTransport();
  // With zero concurrent streams allowed, the new stream has to stay pending.
  setMaxConcurrentStreams(0);
  MockStreamListener streamListener = new MockStreamListener();
  OkHttpClientStream pendingStream =
      clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT);
  pendingStream.start(streamListener);
  waitForStreamPending(1);

  pendingStream.cancel(Status.CANCELLED);
  // The second cancel should be a no-op.
  pendingStream.cancel(Status.UNKNOWN);

  streamListener.waitUntilStreamClosed();
  assertEquals(0, clientTransport.getPendingStreamSize());
  assertEquals(Status.CANCELLED.getCode(), streamListener.status.getCode());
  shutdownAndVerify();
}
/**
 * When the Cronet stream succeeds but the trailers carry {@code PERMISSION_DENIED}, the client
 * listener must be closed with that non-OK status.
 */
@Test
public void streamSucceededWithGrpcError() {
  ArgumentCaptor<BidirectionalStream.Callback> cronetCallbackCaptor =
      ArgumentCaptor.forClass(BidirectionalStream.Callback.class);
  verify(factory)
      .newBidirectionalStreamBuilder(
          isA(String.class), cronetCallbackCaptor.capture(), isA(Executor.class));
  BidirectionalStream.Callback cronetCallback = cronetCallbackCaptor.getValue();

  cronetCallback.onStreamReady(cronetStream);
  verify(cronetStream, times(0)).write(isA(ByteBuffer.class), isA(Boolean.class));
  clientStream.abstractClientStreamSink().writeFrame(null, true, true, 1);
  verify(cronetStream, times(1)).write(isA(ByteBuffer.class), isA(Boolean.class));
  verify(cronetStream, times(1)).flush();

  // Receive the response header.
  clientStream.request(2);
  UrlResponseInfo responseInfo =
      new UrlResponseInfoImpl(
          new ArrayList<String>(), 200, "", responseHeader("200"), false, "", "");
  cronetCallback.onResponseHeadersReceived(cronetStream, responseInfo);
  verify(cronetStream, times(1)).read(isA(ByteBuffer.class));

  // Receive the trailer.
  cronetCallback.onReadCompleted(cronetStream, null, ByteBuffer.allocate(0), true);
  CronetClientStream.BidirectionalStreamCallback streamCallback =
      (CronetClientStream.BidirectionalStreamCallback) cronetCallback;
  streamCallback.processTrailers(trailers(Status.PERMISSION_DENIED.getCode().value()));
  cronetCallback.onSucceeded(cronetStream, responseInfo);

  // Verify the error status handed to the client listener.
  ArgumentCaptor<Status> closedStatusCaptor = ArgumentCaptor.forClass(Status.class);
  verify(clientListener)
      .closed(closedStatusCaptor.capture(), isA(RpcProgress.class), isA(Metadata.class));
  Status closedStatus = closedStatusCaptor.getValue();
  assertFalse(closedStatus.isOk());
  assertEquals(Status.PERMISSION_DENIED.getCode(), closedStatus.getCode());
}
/**
 * Processes the trailers and status from the server.
 *
 * <p>If a status has already been reported for this stream, the late trailers are only logged
 * and otherwise ignored. Otherwise they are stored together with the status and the deframer is
 * closed.
 *
 * @param trailers the received trailers
 * @param status the status extracted from the trailers
 */
protected void inboundTrailersReceived(Metadata trailers, Status status) {
  checkNotNull(status, "status");
  checkNotNull(trailers, "trailers");
  if (statusReported) {
    // Log parameters are substituted with zero-based indices: {0} is the status and {1} the
    // trailers. The former {1}/{2} placeholders dropped the status and printed "{2}" literally.
    log.log(Level.INFO, "Received trailers on closed stream:\n {0}\n {1}",
        new Object[]{status, trailers});
    return;
  }
  this.trailers = trailers;
  trailerStatus = status;
  closeDeframer(false);
}