下面列出了怎么用io.grpc.DecompressorRegistry的API类实例代码及写法,或者点击链接到github查看源代码。
@VisibleForTesting
static void prepareHeaders(
Metadata headers,
DecompressorRegistry decompressorRegistry,
Compressor compressor,
boolean fullStreamDecompression) {
headers.discardAll(MESSAGE_ENCODING_KEY);
if (compressor != Codec.Identity.NONE) {
headers.put(MESSAGE_ENCODING_KEY, compressor.getMessageEncoding());
}
headers.discardAll(MESSAGE_ACCEPT_ENCODING_KEY);
byte[] advertisedEncodings =
InternalDecompressorRegistry.getRawAdvertisedMessageEncodings(decompressorRegistry);
if (advertisedEncodings.length != 0) {
headers.put(MESSAGE_ACCEPT_ENCODING_KEY, advertisedEncodings);
}
headers.discardAll(CONTENT_ENCODING_KEY);
headers.discardAll(CONTENT_ACCEPT_ENCODING_KEY);
if (fullStreamDecompression) {
headers.put(CONTENT_ACCEPT_ENCODING_KEY, FULL_STREAM_DECOMPRESSION_ENCODINGS);
}
}
@Test
public void prepareHeaders_removeReservedHeaders() {
Metadata m = new Metadata();
m.put(GrpcUtil.MESSAGE_ENCODING_KEY, "gzip");
m.put(GrpcUtil.MESSAGE_ACCEPT_ENCODING_KEY, "gzip".getBytes(GrpcUtil.US_ASCII));
m.put(GrpcUtil.CONTENT_ENCODING_KEY, "gzip");
m.put(GrpcUtil.CONTENT_ACCEPT_ENCODING_KEY, "gzip".getBytes(GrpcUtil.US_ASCII));
ClientCallImpl.prepareHeaders(
m, DecompressorRegistry.emptyInstance(), Codec.Identity.NONE, false);
assertNull(m.get(GrpcUtil.MESSAGE_ENCODING_KEY));
assertNull(m.get(GrpcUtil.MESSAGE_ACCEPT_ENCODING_KEY));
assertNull(m.get(GrpcUtil.CONTENT_ENCODING_KEY));
assertNull(m.get(GrpcUtil.CONTENT_ACCEPT_ENCODING_KEY));
}
private void sendMessage_serverSendsOne_closeOnSecondCall(
MethodDescriptor<Long, Long> method) {
ServerCallImpl<Long, Long> serverCall = new ServerCallImpl<Long, Long>(
stream,
method,
requestHeaders,
context,
DecompressorRegistry.getDefaultInstance(),
CompressorRegistry.getDefaultInstance(),
serverCallTracer);
serverCall.sendHeaders(new Metadata());
serverCall.sendMessage(1L);
verify(stream, times(1)).writeMessage(any(InputStream.class));
verify(stream, never()).close(any(Status.class), any(Metadata.class));
// trying to send a second message causes gRPC to close the underlying stream
serverCall.sendMessage(1L);
verify(stream, times(1)).writeMessage(any(InputStream.class));
ArgumentCaptor<Status> statusCaptor = ArgumentCaptor.forClass(Status.class);
verify(stream, times(1)).cancel(statusCaptor.capture());
assertEquals(Status.Code.INTERNAL, statusCaptor.getValue().getCode());
assertEquals(ServerCallImpl.TOO_MANY_RESPONSES, statusCaptor.getValue().getDescription());
}
private void sendMessage_serverSendsOne_closeOnSecondCall_appRunToCompletion(
MethodDescriptor<Long, Long> method) {
ServerCallImpl<Long, Long> serverCall = new ServerCallImpl<Long, Long>(
stream,
method,
requestHeaders,
context,
DecompressorRegistry.getDefaultInstance(),
CompressorRegistry.getDefaultInstance(),
serverCallTracer);
serverCall.sendHeaders(new Metadata());
serverCall.sendMessage(1L);
serverCall.sendMessage(1L);
verify(stream, times(1)).writeMessage(any(InputStream.class));
verify(stream, times(1)).cancel(any(Status.class));
// App runs to completion but everything is ignored
serverCall.sendMessage(1L);
serverCall.close(Status.OK, new Metadata());
try {
serverCall.close(Status.OK, new Metadata());
fail("calling a second time should still cause an error");
} catch (IllegalStateException expected) {
// noop
}
}
private void serverSendsOne_okFailsOnMissingResponse(
MethodDescriptor<Long, Long> method) {
ServerCallImpl<Long, Long> serverCall = new ServerCallImpl<Long, Long>(
stream,
method,
requestHeaders,
context,
DecompressorRegistry.getDefaultInstance(),
CompressorRegistry.getDefaultInstance(),
serverCallTracer);
serverCall.close(Status.OK, new Metadata());
ArgumentCaptor<Status> statusCaptor = ArgumentCaptor.forClass(Status.class);
verify(stream, times(1)).cancel(statusCaptor.capture());
assertEquals(Status.Code.INTERNAL, statusCaptor.getValue().getCode());
assertEquals(ServerCallImpl.MISSING_RESPONSE, statusCaptor.getValue().getDescription());
}
@Test
public void setDecompressorRegistryTest() {
DecompressorRegistry decompressor =
DecompressorRegistry.emptyInstance().with(new Decompressor() {
@Override
public String getMessageEncoding() {
return "some-encoding";
}
@Override
public InputStream decompress(InputStream is) throws IOException {
return is;
}
}, true);
forward.setDecompressorRegistry(decompressor);
verify(mock).setDecompressorRegistry(same(decompressor));
}
@ConditionalOnBean(GrpcCodecDiscoverer.class)
@ConditionalOnMissingBean
@Bean
public DecompressorRegistry defaultDecompressorRegistry(final GrpcCodecDiscoverer codecDiscoverer) {
log.debug("Found GrpcCodecDiscoverer -> Creating custom DecompressorRegistry");
DecompressorRegistry registry = DecompressorRegistry.getDefaultInstance();
for (final GrpcCodecDefinition definition : codecDiscoverer.findGrpcCodecs()) {
if (definition.getCodecType().isForDecompression()) {
final Codec codec = definition.getCodec();
final boolean isAdvertised = definition.isAdvertised();
log.debug("Registering {} decompressor: '{}' ({})",
isAdvertised ? "advertised" : "", codec.getMessageEncoding(), codec.getClass().getName());
registry = registry.with(codec, isAdvertised);
}
}
return registry;
}
@ConditionalOnBean(GrpcCodecDiscoverer.class)
@ConditionalOnMissingBean
@Bean
public DecompressorRegistry defaultDecompressorRegistry(final GrpcCodecDiscoverer codecDiscoverer) {
log.debug("Found GrpcCodecDiscoverer -> Creating custom DecompressorRegistry");
DecompressorRegistry registry = DecompressorRegistry.getDefaultInstance();
for (final GrpcCodecDefinition definition : codecDiscoverer.findGrpcCodecs()) {
if (definition.getCodecType().isForDecompression()) {
final Codec codec = definition.getCodec();
final boolean isAdvertised = definition.isAdvertised();
log.debug("Registering {} decompressor: '{}' ({})",
isAdvertised ? "advertised" : "", codec.getMessageEncoding(), codec.getClass().getName());
registry = registry.with(codec, isAdvertised);
}
}
return registry;
}
/**
* Constructs a new {@link GrpcService} that can be bound to
* {@link ServerBuilder}. It is recommended to bind the service to a server using
* {@linkplain ServerBuilder#service(HttpServiceWithRoutes, Function[])
* ServerBuilder.service(HttpServiceWithRoutes)} to mount all service paths
* without interfering with other services.
*/
public GrpcService build() {
final HandlerRegistry handlerRegistry = registryBuilder.build();
final GrpcService grpcService = new FramedGrpcService(
handlerRegistry,
handlerRegistry
.methods()
.keySet()
.stream()
.map(path -> Route.builder().exact('/' + path).build())
.collect(ImmutableSet.toImmutableSet()),
firstNonNull(decompressorRegistry, DecompressorRegistry.getDefaultInstance()),
firstNonNull(compressorRegistry, CompressorRegistry.getDefaultInstance()),
supportedSerializationFormats,
jsonMarshallerFactory,
protoReflectionServiceInterceptor,
maxOutboundMessageSizeBytes,
useBlockingTaskExecutor,
unsafeWrapRequestBuffers,
useClientTimeoutHeader,
maxInboundMessageSizeBytes);
return enableUnframedRequests ? new UnframedGrpcService(grpcService) : grpcService;
}
ArmeriaChannel(ClientBuilderParams params,
HttpClient httpClient,
MeterRegistry meterRegistry,
SessionProtocol sessionProtocol,
SerializationFormat serializationFormat,
@Nullable GrpcJsonMarshaller jsonMarshaller) {
this.params = params;
this.httpClient = PooledHttpClient.of(httpClient);
this.meterRegistry = meterRegistry;
this.sessionProtocol = sessionProtocol;
this.serializationFormat = serializationFormat;
this.jsonMarshaller = jsonMarshaller;
advertisedEncodingsHeader = String.join(
",", DecompressorRegistry.getDefaultInstance().getAdvertisedMessageEncodings());
}
@Test
public void messageRead_wrappedByteBuf() {
tearDown();
call = new ArmeriaServerCall<>(
HttpHeaders.of(),
TestServiceGrpc.getUnaryCallMethod(),
CompressorRegistry.getDefaultInstance(),
DecompressorRegistry.getDefaultInstance(),
res,
MAX_MESSAGE_BYTES,
MAX_MESSAGE_BYTES,
ctx,
GrpcSerializationFormats.PROTO,
new DefaultJsonMarshaller(MessageMarshaller.builder().build()),
true,
false,
ResponseHeaders.builder(HttpStatus.OK)
.contentType(GrpcSerializationFormats.PROTO.mediaType())
.build());
final ByteBuf buf = GrpcTestUtil.requestByteBuf();
call.messageRead(new DeframedMessage(buf, 0));
verify(buffersAttr).put(any(), same(buf));
}
@Test
void uncompressedClient_compressedEndpoint() throws Exception {
final ManagedChannel nonDecompressingChannel =
ManagedChannelBuilder.forAddress("127.0.0.1", server.httpPort())
.decompressorRegistry(
DecompressorRegistry.emptyInstance()
.with(Codec.Identity.NONE, false))
.usePlaintext()
.build();
final UnitTestServiceBlockingStub client = UnitTestServiceGrpc.newBlockingStub(
nonDecompressingChannel);
assertThat(client.staticUnaryCallSetsMessageCompression(REQUEST_MESSAGE))
.isEqualTo(RESPONSE_MESSAGE);
nonDecompressingChannel.shutdownNow();
checkRequestLog((rpcReq, rpcRes, grpcStatus) -> {
assertThat(rpcReq.method()).isEqualTo(
"armeria.grpc.testing.UnitTestService/StaticUnaryCallSetsMessageCompression");
assertThat(rpcReq.params()).containsExactly(REQUEST_MESSAGE);
assertThat(rpcRes.get()).isEqualTo(RESPONSE_MESSAGE);
});
}
@VisibleForTesting
static void prepareHeaders(
Metadata headers,
DecompressorRegistry decompressorRegistry,
Compressor compressor,
boolean fullStreamDecompression) {
headers.discardAll(MESSAGE_ENCODING_KEY);
if (compressor != Codec.Identity.NONE) {
headers.put(MESSAGE_ENCODING_KEY, compressor.getMessageEncoding());
}
headers.discardAll(MESSAGE_ACCEPT_ENCODING_KEY);
byte[] advertisedEncodings =
InternalDecompressorRegistry.getRawAdvertisedMessageEncodings(decompressorRegistry);
if (advertisedEncodings.length != 0) {
headers.put(MESSAGE_ACCEPT_ENCODING_KEY, advertisedEncodings);
}
headers.discardAll(CONTENT_ENCODING_KEY);
headers.discardAll(CONTENT_ACCEPT_ENCODING_KEY);
if (fullStreamDecompression) {
headers.put(CONTENT_ACCEPT_ENCODING_KEY, FULL_STREAM_DECOMPRESSION_ENCODINGS);
}
}
@Test
public void prepareHeaders_removeReservedHeaders() {
Metadata m = new Metadata();
m.put(GrpcUtil.MESSAGE_ENCODING_KEY, "gzip");
m.put(GrpcUtil.MESSAGE_ACCEPT_ENCODING_KEY, "gzip".getBytes(GrpcUtil.US_ASCII));
m.put(GrpcUtil.CONTENT_ENCODING_KEY, "gzip");
m.put(GrpcUtil.CONTENT_ACCEPT_ENCODING_KEY, "gzip".getBytes(GrpcUtil.US_ASCII));
ClientCallImpl.prepareHeaders(
m, DecompressorRegistry.emptyInstance(), Codec.Identity.NONE, false);
assertNull(m.get(GrpcUtil.MESSAGE_ENCODING_KEY));
assertNull(m.get(GrpcUtil.MESSAGE_ACCEPT_ENCODING_KEY));
assertNull(m.get(GrpcUtil.CONTENT_ENCODING_KEY));
assertNull(m.get(GrpcUtil.CONTENT_ACCEPT_ENCODING_KEY));
}
private void sendMessage_serverSendsOne_closeOnSecondCall(
MethodDescriptor<Long, Long> method) {
ServerCallImpl<Long, Long> serverCall = new ServerCallImpl<>(
stream,
method,
requestHeaders,
context,
DecompressorRegistry.getDefaultInstance(),
CompressorRegistry.getDefaultInstance(),
serverCallTracer,
PerfMark.createTag());
serverCall.sendHeaders(new Metadata());
serverCall.sendMessage(1L);
verify(stream, times(1)).writeMessage(any(InputStream.class));
verify(stream, never()).close(any(Status.class), any(Metadata.class));
// trying to send a second message causes gRPC to close the underlying stream
serverCall.sendMessage(1L);
verify(stream, times(1)).writeMessage(any(InputStream.class));
ArgumentCaptor<Status> statusCaptor = ArgumentCaptor.forClass(Status.class);
verify(stream, times(1)).cancel(statusCaptor.capture());
assertEquals(Status.Code.INTERNAL, statusCaptor.getValue().getCode());
assertEquals(ServerCallImpl.TOO_MANY_RESPONSES, statusCaptor.getValue().getDescription());
}
private void serverSendsOne_okFailsOnMissingResponse(
MethodDescriptor<Long, Long> method) {
ServerCallImpl<Long, Long> serverCall = new ServerCallImpl<>(
stream,
method,
requestHeaders,
context,
DecompressorRegistry.getDefaultInstance(),
CompressorRegistry.getDefaultInstance(),
serverCallTracer,
PerfMark.createTag());
serverCall.close(Status.OK, new Metadata());
ArgumentCaptor<Status> statusCaptor = ArgumentCaptor.forClass(Status.class);
verify(stream, times(1)).cancel(statusCaptor.capture());
assertEquals(Status.Code.INTERNAL, statusCaptor.getValue().getCode());
assertEquals(ServerCallImpl.MISSING_RESPONSE, statusCaptor.getValue().getDescription());
}
@Test
public void setDecompressorRegistryTest() {
DecompressorRegistry decompressor =
DecompressorRegistry.emptyInstance().with(new Decompressor() {
@Override
public String getMessageEncoding() {
return "some-encoding";
}
@Override
public InputStream decompress(InputStream is) throws IOException {
return is;
}
}, true);
forward.setDecompressorRegistry(decompressor);
verify(mock).setDecompressorRegistry(same(decompressor));
}
@Override
public final T decompressorRegistry(DecompressorRegistry registry) {
if (registry != null) {
this.decompressorRegistry = registry;
} else {
this.decompressorRegistry = DEFAULT_DECOMPRESSOR_REGISTRY;
}
return thisT();
}
ServerCallImpl(ServerStream stream, MethodDescriptor<ReqT, RespT> method,
Metadata inboundHeaders, Context.CancellableContext context,
DecompressorRegistry decompressorRegistry, CompressorRegistry compressorRegistry,
CallTracer serverCallTracer) {
this.stream = stream;
this.method = method;
this.context = context;
this.messageAcceptEncoding = inboundHeaders.get(MESSAGE_ACCEPT_ENCODING_KEY);
this.decompressorRegistry = decompressorRegistry;
this.compressorRegistry = compressorRegistry;
this.serverCallTracer = serverCallTracer;
this.serverCallTracer.reportCallStarted();
}
@Override
public void setDecompressorRegistry(final DecompressorRegistry decompressorRegistry) {
checkNotNull(decompressorRegistry, "decompressorRegistry");
delayOrExecute(new Runnable() {
@Override
public void run() {
realStream.setDecompressorRegistry(decompressorRegistry);
}
});
}
@Override
public final void setDecompressorRegistry(final DecompressorRegistry decompressorRegistry) {
class DecompressorRegistryEntry implements BufferEntry {
@Override
public void runWith(Substream substream) {
substream.stream.setDecompressorRegistry(decompressorRegistry);
}
}
delayOrExecute(new DecompressorRegistryEntry());
}
@Override
public final T decompressorRegistry(DecompressorRegistry registry) {
if (registry != null) {
decompressorRegistry = registry;
} else {
decompressorRegistry = DEFAULT_DECOMPRESSOR_REGISTRY;
}
return thisT();
}
@Test
public void setStream_sendsAllMessages() {
stream.start(listener);
stream.setCompressor(Codec.Identity.NONE);
stream.setDecompressorRegistry(DecompressorRegistry.getDefaultInstance());
stream.setMessageCompression(true);
InputStream message = new ByteArrayInputStream(new byte[]{'a'});
stream.writeMessage(message);
stream.setMessageCompression(false);
stream.writeMessage(message);
stream.setStream(realStream);
verify(realStream).setCompressor(Codec.Identity.NONE);
verify(realStream).setDecompressorRegistry(DecompressorRegistry.getDefaultInstance());
verify(realStream).setMessageCompression(true);
verify(realStream).setMessageCompression(false);
verify(realStream, times(2)).writeMessage(message);
verify(realStream).start(listenerCaptor.capture());
stream.writeMessage(message);
verify(realStream, times(3)).writeMessage(message);
verifyNoMoreInteractions(listener);
listenerCaptor.getValue().onReady();
verify(listener).onReady();
}
@Before
public void setUp() {
MockitoAnnotations.initMocks(this);
context = Context.ROOT.withCancellation();
call = new ServerCallImpl<Long, Long>(stream, UNARY_METHOD, requestHeaders, context,
DecompressorRegistry.getDefaultInstance(), CompressorRegistry.getDefaultInstance(),
serverCallTracer);
}
private void callTracer0(Status status) {
CallTracer tracer = CallTracer.getDefaultFactory().create();
ServerStats.Builder beforeBuilder = new ServerStats.Builder();
tracer.updateBuilder(beforeBuilder);
ServerStats before = beforeBuilder.build();
assertEquals(0, before.callsStarted);
assertEquals(0, before.lastCallStartedNanos);
call = new ServerCallImpl<Long, Long>(stream, UNARY_METHOD, requestHeaders, context,
DecompressorRegistry.getDefaultInstance(), CompressorRegistry.getDefaultInstance(),
tracer);
// required boilerplate
call.sendHeaders(new Metadata());
call.sendMessage(123L);
// end: required boilerplate
call.close(status, new Metadata());
ServerStats.Builder afterBuilder = new ServerStats.Builder();
tracer.updateBuilder(afterBuilder);
ServerStats after = afterBuilder.build();
assertEquals(1, after.callsStarted);
if (status.isOk()) {
assertEquals(1, after.callsSucceeded);
} else {
assertEquals(1, after.callsFailed);
}
}
@Test
public void decompressorRegistry_normal() {
DecompressorRegistry decompressorRegistry = DecompressorRegistry.emptyInstance();
assertNotEquals(decompressorRegistry, builder.decompressorRegistry);
assertEquals(builder, builder.decompressorRegistry(decompressorRegistry));
assertEquals(decompressorRegistry, builder.decompressorRegistry);
}
@Test
public void decompressorRegistry_null() {
DecompressorRegistry defaultValue = builder.decompressorRegistry;
assertEquals(builder, builder.decompressorRegistry(DecompressorRegistry.emptyInstance()));
assertNotEquals(defaultValue, builder.decompressorRegistry);
builder.decompressorRegistry(null);
assertEquals(defaultValue, builder.decompressorRegistry);
}
private static ManagedChannel createChannel(BotSystemConfig config) throws Exception {
Security.addProvider(new BouncyCastleProvider());
NettyChannelBuilder nettyChannelBuilder = (NettyChannelBuilder) ManagedChannelBuilder
.forAddress(config.getHost(), config.getPort())
.idleTimeout(15, SECONDS)
.keepAliveTime(30, SECONDS);
if (config.getCertPath() != null && config.getCertPassword() != null) {
File certFile = new File(config.getCertPath());
SslContext sslContext = GrpcSslContexts.forClient()
.keyManager(NetUtils.createKeyFactory(certFile, config.getCertPassword()))
.build();
nettyChannelBuilder.sslContext(sslContext);
}
if (!config.isSecure()) {
nettyChannelBuilder.usePlaintext();
}
if (!config.isCompression()) {
nettyChannelBuilder.decompressorRegistry(DecompressorRegistry.emptyInstance());
}
return nettyChannelBuilder.build();
}
@Nullable
private static Decompressor clientDecompressor(HttpHeaders headers, DecompressorRegistry registry) {
final String encoding = headers.get(GrpcHeaderNames.GRPC_ENCODING);
if (encoding == null) {
return ForwardingDecompressor.forGrpc(Identity.NONE);
}
final io.grpc.Decompressor decompressor = registry.lookupDecompressor(encoding);
if (decompressor != null) {
return ForwardingDecompressor.forGrpc(decompressor);
}
return ForwardingDecompressor.forGrpc(Identity.NONE);
}
public HttpStreamReader(DecompressorRegistry decompressorRegistry,
ArmeriaMessageDeframer deframer,
TransportStatusListener transportStatusListener) {
this.decompressorRegistry = requireNonNull(decompressorRegistry, "decompressorRegistry");
this.deframer = requireNonNull(deframer, "deframer");
this.transportStatusListener = requireNonNull(transportStatusListener, "transportStatusListener");
}