下面列出了io.grpc.Metadata#put ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Test
public void inboundHeadersReceived_disallowsContentAndMessageEncoding() {
AbstractClientStream stream =
new BaseAbstractClientStream(allocator, statsTraceCtx, transportTracer);
stream.start(mockListener);
Metadata headers = new Metadata();
headers.put(GrpcUtil.CONTENT_ENCODING_KEY, "gzip");
headers.put(GrpcUtil.MESSAGE_ENCODING_KEY, new Codec.Gzip().getMessageEncoding());
stream.setFullStreamDecompression(true);
stream.transportState().inboundHeadersReceived(headers);
verifyNoMoreInteractions(mockListener);
Throwable t = ((BaseTransportState) stream.transportState()).getDeframeFailedCause();
assertEquals(Status.INTERNAL.getCode(), Status.fromThrowable(t).getCode());
assertTrue(
"unexpected deframe failed description",
Status.fromThrowable(t)
.getDescription()
.equals("Full stream and gRPC message encoding cannot both be set"));
}
@Test
@SuppressWarnings("UndefinedEquals") // AsciiString.equals
public void convertServerHeaders_sanitizes() {
Metadata metaData = new Metadata();
// Intentionally being explicit here rather than relying on any pre-defined lists of headers,
// since the goal of this test is to validate the correctness of such lists in the first place.
metaData.put(GrpcUtil.CONTENT_TYPE_KEY, "to-be-removed");
metaData.put(GrpcUtil.TE_HEADER, "to-be-removed");
metaData.put(GrpcUtil.USER_AGENT_KEY, "to-be-removed");
metaData.put(userKey, userValue);
Http2Headers output = Utils.convertServerHeaders(metaData);
DefaultHttp2Headers headers = new DefaultHttp2Headers();
for (Map.Entry<CharSequence, CharSequence> entry : output) {
headers.add(entry.getKey(), entry.getValue());
}
// 2 reserved headers, 1 user header
assertEquals(2 + 1, headers.size());
assertEquals(Utils.CONTENT_TYPE_GRPC, headers.get(GrpcUtil.CONTENT_TYPE_KEY.name()));
}
@Test
public void exchangeMetadataUnaryCall() throws Exception {
TestServiceGrpc.TestServiceBlockingStub stub = blockingStub;
// Capture the metadata exchange
Metadata fixedHeaders = new Metadata();
// Send a context proto (as it's in the default extension registry)
Messages.SimpleContext contextValue =
Messages.SimpleContext.newBuilder().setValue("dog").build();
fixedHeaders.put(Util.METADATA_KEY, contextValue);
stub = MetadataUtils.attachHeaders(stub, fixedHeaders);
// .. and expect it to be echoed back in trailers
AtomicReference<Metadata> trailersCapture = new AtomicReference<>();
AtomicReference<Metadata> headersCapture = new AtomicReference<>();
stub = MetadataUtils.captureMetadata(stub, headersCapture, trailersCapture);
assertNotNull(stub.emptyCall(EMPTY));
// Assert that our side channel object is echoed back in both headers and trailers
Assert.assertEquals(contextValue, headersCapture.get().get(Util.METADATA_KEY));
Assert.assertEquals(contextValue, trailersCapture.get().get(Util.METADATA_KEY));
}
@Test
public void transportTrailersReceived_afterHeaders() {
BaseTransportState state = new BaseTransportState(transportTracer);
state.setListener(mockListener);
Metadata headers = new Metadata();
headers.put(testStatusMashaller, "200");
headers.put(Metadata.Key.of("content-type", Metadata.ASCII_STRING_MARSHALLER),
"application/grpc");
state.transportHeadersReceived(headers);
Metadata trailers = new Metadata();
trailers.put(Metadata.Key.of("grpc-status", Metadata.ASCII_STRING_MARSHALLER), "0");
state.transportTrailersReceived(trailers);
verify(mockListener).headersRead(headers);
verify(mockListener).closed(Status.OK, PROCESSED, trailers);
}
@Test
public void inboundHeadersReceived_failsOnUnrecognizedMessageEncoding() {
AbstractClientStream stream =
new BaseAbstractClientStream(allocator, statsTraceCtx, transportTracer);
stream.start(mockListener);
Metadata headers = new Metadata();
headers.put(GrpcUtil.MESSAGE_ENCODING_KEY, "not-a-real-compression-method");
stream.transportState().inboundHeadersReceived(headers);
verifyNoMoreInteractions(mockListener);
Throwable t = ((BaseTransportState) stream.transportState()).getDeframeFailedCause();
assertEquals(Status.INTERNAL.getCode(), Status.fromThrowable(t).getCode());
assertTrue(
"unexpected deframe failed description",
Status.fromThrowable(t).getDescription().startsWith("Can't find decompressor for"));
}
@Test
public void transportHeadersReceived_twice() {
BaseTransportState state = new BaseTransportState(transportTracer);
state.setListener(mockListener);
Metadata headers = new Metadata();
headers.put(testStatusMashaller, "200");
headers.put(Metadata.Key.of("content-type", Metadata.ASCII_STRING_MARSHALLER),
"application/grpc");
state.transportHeadersReceived(headers);
Metadata headersAgain = new Metadata();
state.transportHeadersReceived(headersAgain);
state.transportDataReceived(ReadableBuffers.empty(), true);
verify(mockListener).headersRead(headers);
verify(mockListener).closed(statusCaptor.capture(), same(PROCESSED), same(headersAgain));
assertEquals(Code.INTERNAL, statusCaptor.getValue().getCode());
assertTrue(statusCaptor.getValue().getDescription().contains("twice"));
}
@Test
// https://tools.ietf.org/html/rfc7231#section-3.1.2.1
public void inboundHeadersReceived_contentEncodingIsCaseInsensitive() {
AbstractClientStream stream =
new BaseAbstractClientStream(allocator, statsTraceCtx, transportTracer);
stream.start(mockListener);
Metadata headers = new Metadata();
headers.put(GrpcUtil.CONTENT_ENCODING_KEY, "gZIp");
stream.setFullStreamDecompression(true);
stream.transportState().inboundHeadersReceived(headers);
verify(mockListener).headersRead(headers);
}
@Override
public PickResult picked(Metadata headers) {
headers.discardAll(GrpclbConstants.TOKEN_METADATA_KEY);
if (token != null) {
headers.put(GrpclbConstants.TOKEN_METADATA_KEY, token);
}
return result;
}
@Test
public void stickinessEnabled_withStickyHeader() {
Map<String, Object> serviceConfig = new HashMap<String, Object>();
serviceConfig.put("stickinessMetadataKey", "my-sticky-key");
Attributes attributes = Attributes.newBuilder()
.set(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG, serviceConfig).build();
loadBalancer.handleResolvedAddressGroups(servers, attributes);
for (Subchannel subchannel : subchannels.values()) {
loadBalancer.handleSubchannelState(subchannel, ConnectivityStateInfo.forNonError(READY));
}
verify(mockHelper, times(4))
.updateBalancingState(stateCaptor.capture(), pickerCaptor.capture());
SubchannelPicker picker = pickerCaptor.getValue();
Key<String> stickinessKey = Key.of("my-sticky-key", Metadata.ASCII_STRING_MARSHALLER);
Metadata headerWithStickinessValue = new Metadata();
headerWithStickinessValue.put(stickinessKey, "my-sticky-value");
doReturn(headerWithStickinessValue).when(mockArgs).getHeaders();
Subchannel sc1 = picker.pickSubchannel(mockArgs).getSubchannel();
assertEquals(sc1, picker.pickSubchannel(mockArgs).getSubchannel());
assertEquals(sc1, picker.pickSubchannel(mockArgs).getSubchannel());
assertEquals(sc1, picker.pickSubchannel(mockArgs).getSubchannel());
assertEquals(sc1, picker.pickSubchannel(mockArgs).getSubchannel());
verify(mockArgs, atLeast(4)).getHeaders();
assertNotNull(loadBalancer.getStickinessMapForTest());
assertThat(loadBalancer.getStickinessMapForTest()).hasSize(1);
}
@Test
void exchangeHeadersUnaryCall_grpcMetadata() throws Exception {
final Metadata metadata = new Metadata();
metadata.put(TestServiceImpl.EXTRA_HEADER_KEY, "dog");
TestServiceBlockingStub stub = MetadataUtils.attachHeaders(blockingStub, metadata);
final AtomicReference<Metadata> headers = new AtomicReference<>();
final AtomicReference<Metadata> trailers = new AtomicReference<>();
stub = MetadataUtils.captureMetadata(stub, headers, trailers);
assertThat(stub.emptyCall(EMPTY)).isNotNull();
final HttpHeaders clientHeaders = CLIENT_HEADERS_CAPTURE.get();
assertThat(clientHeaders.get(HttpHeaderNames.TE))
.isEqualTo(HttpHeaderValues.TRAILERS.toString());
// Assert that our side channel object is echoed back in both headers and trailers
assertThat(clientHeaders.get(TestServiceImpl.EXTRA_HEADER_NAME)).isEqualTo("dog");
assertThat(SERVER_TRAILERS_CAPTURE.get().get(TestServiceImpl.EXTRA_HEADER_NAME)).isEqualTo("dog");
assertThat(headers.get()).isNull();
assertThat(trailers.get().get(TestServiceImpl.EXTRA_HEADER_KEY)).isEqualTo("dog");
assertThat(trailers.get().getAll(TestServiceImpl.STRING_VALUE_KEY)).containsExactly(
StringValue.newBuilder().setValue("hello").build(),
StringValue.newBuilder().setValue("world").build());
checkRequestLog((rpcReq, rpcRes, grpcStatus) -> {
assertThat(rpcReq.params()).containsExactly(EMPTY);
assertThat(rpcRes.get()).isEqualTo(EMPTY);
});
}
@Test
public void inboundHeadersReceived_acceptsGzipMessageEncoding() {
AbstractClientStream stream =
new BaseAbstractClientStream(allocator, statsTraceCtx, transportTracer);
stream.start(mockListener);
Metadata headers = new Metadata();
headers.put(GrpcUtil.MESSAGE_ENCODING_KEY, new Codec.Gzip().getMessageEncoding());
stream.transportState().inboundHeadersReceived(headers);
verify(mockListener).headersRead(headers);
}
/** Sends a cacheable unary rpc using GET. Requires that the server is behind a caching proxy. */
public void cacheableUnary() {
// Set safe to true.
MethodDescriptor<SimpleRequest, SimpleResponse> safeCacheableUnaryCallMethod =
TestServiceGrpc.getCacheableUnaryCallMethod().toBuilder().setSafe(true).build();
// Set fake user IP since some proxies (GFE) won't cache requests from localhost.
Metadata.Key<String> userIpKey = Metadata.Key.of("x-user-ip", Metadata.ASCII_STRING_MARSHALLER);
Metadata metadata = new Metadata();
metadata.put(userIpKey, "1.2.3.4");
Channel channelWithUserIpKey =
ClientInterceptors.intercept(channel, MetadataUtils.newAttachHeadersInterceptor(metadata));
SimpleRequest requests1And2 =
SimpleRequest.newBuilder()
.setPayload(
Payload.newBuilder()
.setBody(ByteString.copyFromUtf8(String.valueOf(System.nanoTime()))))
.build();
SimpleRequest request3 =
SimpleRequest.newBuilder()
.setPayload(
Payload.newBuilder()
.setBody(ByteString.copyFromUtf8(String.valueOf(System.nanoTime()))))
.build();
SimpleResponse response1 =
ClientCalls.blockingUnaryCall(
channelWithUserIpKey, safeCacheableUnaryCallMethod, CallOptions.DEFAULT, requests1And2);
SimpleResponse response2 =
ClientCalls.blockingUnaryCall(
channelWithUserIpKey, safeCacheableUnaryCallMethod, CallOptions.DEFAULT, requests1And2);
SimpleResponse response3 =
ClientCalls.blockingUnaryCall(
channelWithUserIpKey, safeCacheableUnaryCallMethod, CallOptions.DEFAULT, request3);
assertEquals(response1, response2);
assertNotEquals(response1, response3);
}
@Test
public void transportHeadersReceived_notifiesListener() {
BaseTransportState state = new BaseTransportState(transportTracer);
state.setListener(mockListener);
Metadata headers = new Metadata();
headers.put(testStatusMashaller, "200");
headers.put(Metadata.Key.of("content-type", Metadata.ASCII_STRING_MARSHALLER),
"application/grpc");
state.transportHeadersReceived(headers);
verify(mockListener, never()).closed(any(Status.class), same(PROCESSED), any(Metadata.class));
verify(mockListener).headersRead(headers);
}
private static ApiException getApiExceptionForVersion(
Metadata.Key<byte[]> failureKey, byte[] data) {
Metadata trailers = new Metadata();
if (data != null) {
trailers.put(failureKey, data);
}
return new ApiException(
new StatusException(Status.UNKNOWN, trailers), GrpcStatusCode.of(Code.UNKNOWN), false);
}
@Test
public void inboundHeadersReceived_acceptsGzipContentEncoding() {
AbstractClientStream stream =
new BaseAbstractClientStream(allocator, statsTraceCtx, transportTracer);
stream.start(mockListener);
Metadata headers = new Metadata();
headers.put(GrpcUtil.CONTENT_ENCODING_KEY, "gzip");
stream.setFullStreamDecompression(true);
stream.transportState().inboundHeadersReceived(headers);
verify(mockListener).headersRead(headers);
}
private static final Metadata toHeaders(Map<String, List<String>> metadata) {
Metadata headers = new Metadata();
if (metadata != null) {
for (String key : metadata.keySet()) {
Metadata.Key<String> headerKey = Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER);
for (String value : metadata.get(key)) {
headers.put(headerKey, value);
}
}
}
return headers;
}
@Test(expected = StatusRuntimeException.class)
public void extract_fail_agentId() {
Metadata metadata = newMetadata();
metadata.put(Header.AGENT_ID_KEY, "!!agentId");
reader.extract(metadata);
}
private static Metadata tokenHeader(AuthenticateResponse authResponse) {
Metadata header = new Metadata();
header.put(TOKEN_KEY, authResponse.getToken());
return header;
}
/**
* Converts an internal exception thrown by Conductor into an StatusException
* that uses modern "Status" metadata for GRPC.
*
* Note that this is trickier than it ought to be because the GRPC APIs have
* not been upgraded yet. Here's a quick breakdown of how this works in practice:
*
* Reporting a "status" result back to a client with GRPC is pretty straightforward.
* GRPC implementations simply serialize the status into several HTTP/2 trailer headers that
* are sent back to the client before shutting down the HTTP/2 stream.
*
* - 'grpc-status', which is a string representation of a {@link com.google.rpc.Code}
* - 'grpc-message', which is the description of the returned status
* - 'grpc-status-details-bin' (optional), which is an arbitrary payload with a serialized
* ProtoBuf object, containing an accurate description of the error in case the status is not
* successful.
*
* By convention, Google provides a default set of ProtoBuf messages for the most common
* error cases. Here, we'll be using {@link DebugInfo}, as we're reporting an internal
* Java exception which we couldn't properly handle.
*
* Now, how do we go about sending all those headers _and_ the {@link DebugInfo} payload
* using the Java GRPC API?
*
* The only way we can return an error with the Java API is by passing an instance of
* {@link io.grpc.StatusException} or {@link io.grpc.StatusRuntimeException} to
* {@link StreamObserver#onError(Throwable)}. The easiest way to create either of these
* exceptions is by using the {@link Status} class and one of its predefined code
* identifiers (in this case, {@link Status#INTERNAL} because we're reporting an internal
* exception). The {@link Status} class has setters to set its most relevant attributes,
* namely those that will be automatically serialized into the 'grpc-status' and 'grpc-message'
* trailers in the response. There is, however, no setter to pass an arbitrary ProtoBuf message
* to be serialized into a `grpc-status-details-bin` trailer. This feature exists in the other
* language implementations but it hasn't been brought to Java yet.
*
* Fortunately, {@link Status#asException(Metadata)} exists, allowing us to pass any amount
* of arbitrary trailers before we close the response. So we're using this API to manually
* craft the 'grpc-status-detail-bin' trailer, in the same way that the GRPC server implementations
* for Go and C++ craft and serialize the header. This will allow us to access the metadata
* cleanly from Go and C++ clients by using the 'details' method which _has_ been implemented
* in those two clients.
*
* @param t The exception to convert
* @return an instance of {@link StatusException} which will properly serialize all its
* headers into the response.
*/
private StatusException throwableToStatusException(Throwable t) {
String[] frames = ExceptionUtils.getStackFrames(t);
Metadata metadata = new Metadata();
metadata.put(STATUS_DETAILS_KEY,
DebugInfo.newBuilder()
.addAllStackEntries(Arrays.asList(frames))
.setDetail(ExceptionUtils.getMessage(t))
.build()
);
return Status.INTERNAL
.withDescription(t.getMessage())
.withCause(t)
.asException(metadata);
}
/**
* Creates a new call credential with the given token for bearer auth.
*
* <p>
* <b>Note:</b> This method uses experimental grpc-java-API features.
* </p>
*
* @param token the bearer token to use
* @return The newly created bearer auth credentials.
*/
public static CallCredentials bearerAuth(final String token) {
final Metadata extraHeaders = new Metadata();
extraHeaders.put(AUTHORIZATION_HEADER, BEARER_AUTH_PREFIX + token);
return new StaticSecurityHeaderCallCredentials(extraHeaders);
}