下面列出了 io.grpc.Codec API 类的用法示例代码;也可以点击链接到 GitHub 查看完整源代码。
/**
 * Rewrites the compression-related entries of {@code headers} in place.
 *
 * <p>Existing values under the message-encoding, message-accept-encoding, content-encoding and
 * content-accept-encoding keys are always discarded. New values are added only when the
 * arguments call for them.
 *
 * @param headers metadata to modify
 * @param decompressorRegistry registry whose raw advertised message encodings are published
 * @param compressor outgoing compressor; {@code Codec.Identity.NONE} adds no encoding header
 * @param fullStreamDecompression when true, the full-stream decompression encodings are
 *     advertised under the content-accept-encoding key
 */
@VisibleForTesting
static void prepareHeaders(
    Metadata headers,
    DecompressorRegistry decompressorRegistry,
    Compressor compressor,
    boolean fullStreamDecompression) {
  // Per-message encoding: only written for a non-identity compressor.
  headers.discardAll(MESSAGE_ENCODING_KEY);
  boolean usesRealCompressor = compressor != Codec.Identity.NONE;
  if (usesRealCompressor) {
    headers.put(MESSAGE_ENCODING_KEY, compressor.getMessageEncoding());
  }

  // Per-message accept-encoding: skipped when the registry advertises nothing.
  headers.discardAll(MESSAGE_ACCEPT_ENCODING_KEY);
  byte[] rawAcceptEncodings =
      InternalDecompressorRegistry.getRawAdvertisedMessageEncodings(decompressorRegistry);
  if (rawAcceptEncodings.length > 0) {
    headers.put(MESSAGE_ACCEPT_ENCODING_KEY, rawAcceptEncodings);
  }

  // Full-stream headers: content-encoding is never set here, only cleared.
  headers.discardAll(CONTENT_ENCODING_KEY);
  headers.discardAll(CONTENT_ACCEPT_ENCODING_KEY);
  if (!fullStreamDecompression) {
    return;
  }
  headers.put(CONTENT_ACCEPT_ENCODING_KEY, FULL_STREAM_DECOMPRESSION_ENCODINGS);
}
/**
 * Opens a decompressing stream over {@code nextFrame}, wrapped so that
 * {@code maxInboundMessageSize} is enforced on the decompressed bytes.
 *
 * @return the size-enforcing stream over the decompressed frame
 * @throws RuntimeException built from {@code Status.INTERNAL} when the configured decompressor
 *     is {@code Codec.Identity.NONE}, or wrapping an {@link IOException} raised while opening
 *     the stream
 */
private InputStream getCompressedBody() {
  boolean compressionConfigured = decompressor != Codec.Identity.NONE;
  if (!compressionConfigured) {
    throw Status.INTERNAL
        .withDescription("Can't decode compressed gRPC message as compression not configured")
        .asRuntimeException();
  }

  try {
    InputStream rawFrame = ReadableBuffers.openStream(nextFrame, true);
    InputStream inflated = decompressor.decompress(rawFrame);
    // The limit applies to the inflated bytes, not to the compressed frame.
    return new SizeEnforcingInputStream(inflated, maxInboundMessageSize, statsTraceCtx);
  } catch (IOException e) {
    throw new RuntimeException(e);
  }
}
/**
 * Pre-populates all four reserved encoding headers and checks that {@code prepareHeaders}
 * strips every one of them when called with an empty registry, the identity codec and
 * full-stream decompression disabled.
 */
@Test
public void prepareHeaders_removeReservedHeaders() {
  Metadata metadata = new Metadata();
  metadata.put(GrpcUtil.MESSAGE_ENCODING_KEY, "gzip");
  metadata.put(GrpcUtil.MESSAGE_ACCEPT_ENCODING_KEY, "gzip".getBytes(GrpcUtil.US_ASCII));
  metadata.put(GrpcUtil.CONTENT_ENCODING_KEY, "gzip");
  metadata.put(GrpcUtil.CONTENT_ACCEPT_ENCODING_KEY, "gzip".getBytes(GrpcUtil.US_ASCII));

  DecompressorRegistry noDecompressors = DecompressorRegistry.emptyInstance();
  ClientCallImpl.prepareHeaders(metadata, noDecompressors, Codec.Identity.NONE, false);

  assertNull(metadata.get(GrpcUtil.MESSAGE_ENCODING_KEY));
  assertNull(metadata.get(GrpcUtil.MESSAGE_ACCEPT_ENCODING_KEY));
  assertNull(metadata.get(GrpcUtil.CONTENT_ENCODING_KEY));
  assertNull(metadata.get(GrpcUtil.CONTENT_ACCEPT_ENCODING_KEY));
}
/**
 * Inbound headers carrying both a content-encoding and a message-encoding must be rejected when
 * full-stream decompression is enabled: the listener sees nothing and the deframe failure is an
 * INTERNAL status with a fixed description.
 */
@Test
public void inboundHeadersReceived_disallowsContentAndMessageEncoding() {
  AbstractClientStream stream =
      new BaseAbstractClientStream(allocator, statsTraceCtx, transportTracer);
  stream.start(mockListener);
  Metadata headers = new Metadata();
  headers.put(GrpcUtil.CONTENT_ENCODING_KEY, "gzip");
  headers.put(GrpcUtil.MESSAGE_ENCODING_KEY, new Codec.Gzip().getMessageEncoding());
  stream.setFullStreamDecompression(true);

  stream.transportState().inboundHeadersReceived(headers);

  verifyNoMoreInteractions(mockListener);
  Throwable t = ((BaseTransportState) stream.transportState()).getDeframeFailedCause();
  Status failure = Status.fromThrowable(t);
  assertEquals(Status.INTERNAL.getCode(), failure.getCode());
  // assertEquals (rather than assertTrue on equals()) reports the actual description on
  // failure and copes with a null description instead of throwing a NullPointerException.
  assertEquals(
      "unexpected deframe failed description",
      "Full stream and gRPC message encoding cannot both be set",
      failure.getDescription());
}
/**
 * Feeds a single byte into a deframer that uses a full-stream {@code GzipInflatingBuffer},
 * requests close-when-complete, and expects exactly one {@code deframerClosed(true)} callback.
 * Skipped unless the suite runs with {@code useGzipInflatingBuffer}.
 */
@Test
public void endOfStreamWithInvalidGzipBlockShouldNotifyDeframerClosedWithPartialMessage() {
  assumeTrue("test only valid for full-stream compression", useGzipInflatingBuffer);

  // A dedicated deframer lets the test write bytes straight into the GzipInflatingBuffer.
  MessageDeframer gzipDeframer =
      new MessageDeframer(
          listener, Codec.Identity.NONE, DEFAULT_MAX_MESSAGE_SIZE, statsTraceCtx, transportTracer);
  gzipDeframer.setFullStreamDecompressor(new GzipInflatingBuffer());
  gzipDeframer.request(1);
  fakeClock.forwardTime(10, TimeUnit.MILLISECONDS);

  byte[] singleByte = new byte[1];
  gzipDeframer.deframe(buffer(singleByte));
  gzipDeframer.closeWhenComplete();

  verify(listener).deframerClosed(true);
  verifyNoMoreInteractions(listener);
  checkStats(tracer, transportTracer.getStats(), fakeClock);
}
/**
 * Writes 1000 zero bytes through a gzip-enabled framer and checks that two frames are
 * delivered: a header flagged as compressed, then a payload whose size matches the header's
 * length field and is smaller than the input.
 */
@Test
public void compressed() throws Exception {
  allocator = new BytesWritableBufferAllocator(100, Integer.MAX_VALUE);
  // Message compression is left at its default, which should be "enabled".
  framer = new MessageFramer(sink, allocator, statsTraceCtx).setCompressor(new Codec.Gzip());

  writeKnownLength(framer, new byte[1000]);
  framer.flush();

  // The gRPC header goes out first in its own frame; the message count only increases once
  // the message has been written completely.
  verify(sink).deliverFrame(frameCaptor.capture(), eq(false), eq(false), eq(0));
  verify(sink).deliverFrame(frameCaptor.capture(), eq(false), eq(true), eq(1));

  // Header layout checked here: byte 0 is the compressed flag, bytes 1-4 the big-endian length.
  ByteWritableBuffer headerFrame = frameCaptor.getAllValues().get(0);
  assertEquals(0x1, headerFrame.data[0]);
  int length = ByteBuffer.wrap(headerFrame.data, 1, 4).order(ByteOrder.BIG_ENDIAN).getInt();

  // Compression must actually have shrunk the payload.
  assertTrue(length < 1000);
  assertEquals(frameCaptor.getAllValues().get(1).size(), length);
  checkStats(length, 1000);
}
/**
 * Configures a gzip compressor but switches message compression off, then checks that the
 * 1000-byte message is delivered uncompressed: flag byte 0, length field 1000, and a buffer
 * that holds exactly the 5-byte header plus the payload.
 */
@Test
public void dontCompressIfNotRequested() throws Exception {
  allocator = new BytesWritableBufferAllocator(100, Integer.MAX_VALUE);
  framer =
      new MessageFramer(sink, allocator, statsTraceCtx)
          .setCompressor(new Codec.Gzip())
          .setMessageCompression(false);

  writeKnownLength(framer, new byte[1000]);
  framer.flush();

  // Only one delivery is expected here; the captured buffer carries header and payload.
  verify(sink).deliverFrame(frameCaptor.capture(), eq(false), eq(true), eq(1));

  ByteWritableBuffer frame = frameCaptor.getAllValues().get(0);
  // Flag byte 0 means the message was not compressed.
  assertEquals(0x0, frame.data[0]);
  // Bytes 1-4 hold the big-endian payload length, which must equal the input size.
  int length = ByteBuffer.wrap(frame.data, 1, 4).order(ByteOrder.BIG_ENDIAN).getInt();
  assertEquals(1000, length);
  assertEquals(frame.data.length - 5 , length);
  checkStats(1000, 1000);
}
/**
 * Builds a {@link DecompressorRegistry} bean from the default instance plus every discovered
 * codec whose type is marked for decompression.
 *
 * @param codecDiscoverer source of the codec definitions to register
 * @return the default registry extended with the matching codecs
 */
@ConditionalOnBean(GrpcCodecDiscoverer.class)
@ConditionalOnMissingBean
@Bean
public DecompressorRegistry defaultDecompressorRegistry(final GrpcCodecDiscoverer codecDiscoverer) {
    log.debug("Found GrpcCodecDiscoverer -> Creating custom DecompressorRegistry");
    DecompressorRegistry result = DecompressorRegistry.getDefaultInstance();
    for (final GrpcCodecDefinition candidate : codecDiscoverer.findGrpcCodecs()) {
        // Compression-only codecs are not registered here.
        if (!candidate.getCodecType().isForDecompression()) {
            continue;
        }
        final Codec decompressor = candidate.getCodec();
        final boolean advertise = candidate.isAdvertised();
        final String advertisedLabel = advertise ? "advertised" : "";
        log.debug("Registering {} decompressor: '{}' ({})",
                advertisedLabel, decompressor.getMessageEncoding(), decompressor.getClass().getName());
        // with(...) returns a registry, so the result has to be re-assigned on every step.
        result = result.with(decompressor, advertise);
    }
    return result;
}
/**
 * Returns the codec definitions for all beans annotated with {@code GrpcCodec}.
 *
 * <p>The lookup runs on the first call only; the result is stored in {@code definitions} and
 * returned as-is afterwards. No synchronization is performed in this method.
 *
 * @return the cached collection of discovered codec definitions
 */
@Override
public Collection<GrpcCodecDefinition> findGrpcCodecs() {
    if (this.definitions != null) {
        return this.definitions;
    }

    log.debug("Searching for codecs...");
    final String[] annotatedBeans = this.applicationContext.getBeanNamesForAnnotation(GrpcCodec.class);
    final ImmutableList.Builder<GrpcCodecDefinition> found = ImmutableList.builder();
    for (final String name : annotatedBeans) {
        final Codec bean = this.applicationContext.getBean(name, Codec.class);
        final GrpcCodec meta = this.applicationContext.findAnnotationOnBean(name, GrpcCodec.class);
        final GrpcCodecDefinition definition =
                new GrpcCodecDefinition(bean, meta.advertised(), meta.codecType());
        found.add(definition);
        log.debug("Found gRPC codec: {}, bean: {}, class: {}",
                bean.getMessageEncoding(), name, bean.getClass().getName());
    }
    this.definitions = found.build();
    log.debug("Done");
    return this.definitions;
}
/**
 * Builds a {@link DecompressorRegistry} bean from the default instance plus every discovered
 * codec whose type is marked for decompression.
 *
 * @param codecDiscoverer source of the codec definitions to register
 * @return the default registry extended with the matching codecs
 */
@ConditionalOnBean(GrpcCodecDiscoverer.class)
@ConditionalOnMissingBean
@Bean
public DecompressorRegistry defaultDecompressorRegistry(final GrpcCodecDiscoverer codecDiscoverer) {
    log.debug("Found GrpcCodecDiscoverer -> Creating custom DecompressorRegistry");
    DecompressorRegistry result = DecompressorRegistry.getDefaultInstance();
    for (final GrpcCodecDefinition candidate : codecDiscoverer.findGrpcCodecs()) {
        // Compression-only codecs are not registered here.
        if (!candidate.getCodecType().isForDecompression()) {
            continue;
        }
        final Codec decompressor = candidate.getCodec();
        final boolean advertise = candidate.isAdvertised();
        final String advertisedLabel = advertise ? "advertised" : "";
        log.debug("Registering {} decompressor: '{}' ({})",
                advertisedLabel, decompressor.getMessageEncoding(), decompressor.getClass().getName());
        // with(...) returns a registry, so the result has to be re-assigned on every step.
        result = result.with(decompressor, advertise);
    }
    return result;
}
/**
 * Returns the codec definitions for all beans annotated with {@code GrpcCodec}.
 *
 * <p>The lookup runs on the first call only; the result is stored in {@code definitions} and
 * returned as-is afterwards. No synchronization is performed in this method.
 *
 * @return the cached collection of discovered codec definitions
 */
@Override
public Collection<GrpcCodecDefinition> findGrpcCodecs() {
    if (this.definitions != null) {
        return this.definitions;
    }

    log.debug("Searching for codecs...");
    final String[] annotatedBeans = this.applicationContext.getBeanNamesForAnnotation(GrpcCodec.class);
    final ImmutableList.Builder<GrpcCodecDefinition> found = ImmutableList.builder();
    for (final String name : annotatedBeans) {
        final Codec bean = this.applicationContext.getBean(name, Codec.class);
        final GrpcCodec meta = this.applicationContext.findAnnotationOnBean(name, GrpcCodec.class);
        final GrpcCodecDefinition definition =
                new GrpcCodecDefinition(bean, meta.advertised(), meta.codecType());
        found.add(definition);
        log.debug("Found gRPC codec: {}, bean: {}, class: {}",
                bean.getMessageEncoding(), name, bean.getClass().getName());
    }
    this.definitions = found.build();
    log.debug("Done");
    return this.definitions;
}
/**
 * Calls an endpoint that sets message compression through a channel whose decompressor registry
 * contains only the (non-advertised) identity codec, and checks that the call still returns the
 * expected response and is recorded correctly in the request log.
 */
@Test
void uncompressedClient_compressedEndpoint() throws Exception {
    final ManagedChannel nonDecompressingChannel =
            ManagedChannelBuilder.forAddress("127.0.0.1", server.httpPort())
                                 .decompressorRegistry(
                                         DecompressorRegistry.emptyInstance()
                                                             .with(Codec.Identity.NONE, false))
                                 .usePlaintext()
                                 .build();
    try {
        final UnitTestServiceBlockingStub client = UnitTestServiceGrpc.newBlockingStub(
                nonDecompressingChannel);
        assertThat(client.staticUnaryCallSetsMessageCompression(REQUEST_MESSAGE))
                .isEqualTo(RESPONSE_MESSAGE);
    } finally {
        // Shut down even when the call or the assertion throws, so the channel is not leaked.
        nonDecompressingChannel.shutdownNow();
    }

    checkRequestLog((rpcReq, rpcRes, grpcStatus) -> {
        assertThat(rpcReq.method()).isEqualTo(
                "armeria.grpc.testing.UnitTestService/StaticUnaryCallSetsMessageCompression");
        assertThat(rpcReq.params()).containsExactly(REQUEST_MESSAGE);
        assertThat(rpcRes.get()).isEqualTo(RESPONSE_MESSAGE);
    });
}
/**
 * Rewrites the compression-related entries of {@code headers} in place.
 *
 * <p>Existing values under the message-encoding, message-accept-encoding, content-encoding and
 * content-accept-encoding keys are always discarded. New values are added only when the
 * arguments call for them.
 *
 * @param headers metadata to modify
 * @param decompressorRegistry registry whose raw advertised message encodings are published
 * @param compressor outgoing compressor; {@code Codec.Identity.NONE} adds no encoding header
 * @param fullStreamDecompression when true, the full-stream decompression encodings are
 *     advertised under the content-accept-encoding key
 */
@VisibleForTesting
static void prepareHeaders(
    Metadata headers,
    DecompressorRegistry decompressorRegistry,
    Compressor compressor,
    boolean fullStreamDecompression) {
  // Per-message encoding: only written for a non-identity compressor.
  headers.discardAll(MESSAGE_ENCODING_KEY);
  boolean usesRealCompressor = compressor != Codec.Identity.NONE;
  if (usesRealCompressor) {
    headers.put(MESSAGE_ENCODING_KEY, compressor.getMessageEncoding());
  }

  // Per-message accept-encoding: skipped when the registry advertises nothing.
  headers.discardAll(MESSAGE_ACCEPT_ENCODING_KEY);
  byte[] rawAcceptEncodings =
      InternalDecompressorRegistry.getRawAdvertisedMessageEncodings(decompressorRegistry);
  if (rawAcceptEncodings.length > 0) {
    headers.put(MESSAGE_ACCEPT_ENCODING_KEY, rawAcceptEncodings);
  }

  // Full-stream headers: content-encoding is never set here, only cleared.
  headers.discardAll(CONTENT_ENCODING_KEY);
  headers.discardAll(CONTENT_ACCEPT_ENCODING_KEY);
  if (!fullStreamDecompression) {
    return;
  }
  headers.put(CONTENT_ACCEPT_ENCODING_KEY, FULL_STREAM_DECOMPRESSION_ENCODINGS);
}
/**
 * Opens a decompressing stream over {@code nextFrame}, wrapped so that
 * {@code maxInboundMessageSize} is enforced on the decompressed bytes.
 *
 * @return the size-enforcing stream over the decompressed frame
 * @throws RuntimeException built from {@code Status.INTERNAL} when the configured decompressor
 *     is {@code Codec.Identity.NONE}, or wrapping an {@link IOException} raised while opening
 *     the stream
 */
private InputStream getCompressedBody() {
  boolean compressionConfigured = decompressor != Codec.Identity.NONE;
  if (!compressionConfigured) {
    throw Status.INTERNAL
        .withDescription("Can't decode compressed gRPC message as compression not configured")
        .asRuntimeException();
  }

  try {
    InputStream rawFrame = ReadableBuffers.openStream(nextFrame, true);
    InputStream inflated = decompressor.decompress(rawFrame);
    // The limit applies to the inflated bytes, not to the compressed frame.
    return new SizeEnforcingInputStream(inflated, maxInboundMessageSize, statsTraceCtx);
  } catch (IOException e) {
    throw new RuntimeException(e);
  }
}
/**
 * Pre-populates all four reserved encoding headers and checks that {@code prepareHeaders}
 * strips every one of them when called with an empty registry, the identity codec and
 * full-stream decompression disabled.
 */
@Test
public void prepareHeaders_removeReservedHeaders() {
  Metadata metadata = new Metadata();
  metadata.put(GrpcUtil.MESSAGE_ENCODING_KEY, "gzip");
  metadata.put(GrpcUtil.MESSAGE_ACCEPT_ENCODING_KEY, "gzip".getBytes(GrpcUtil.US_ASCII));
  metadata.put(GrpcUtil.CONTENT_ENCODING_KEY, "gzip");
  metadata.put(GrpcUtil.CONTENT_ACCEPT_ENCODING_KEY, "gzip".getBytes(GrpcUtil.US_ASCII));

  DecompressorRegistry noDecompressors = DecompressorRegistry.emptyInstance();
  ClientCallImpl.prepareHeaders(metadata, noDecompressors, Codec.Identity.NONE, false);

  assertNull(metadata.get(GrpcUtil.MESSAGE_ENCODING_KEY));
  assertNull(metadata.get(GrpcUtil.MESSAGE_ACCEPT_ENCODING_KEY));
  assertNull(metadata.get(GrpcUtil.CONTENT_ENCODING_KEY));
  assertNull(metadata.get(GrpcUtil.CONTENT_ACCEPT_ENCODING_KEY));
}
/**
 * Inbound headers carrying both a content-encoding and a message-encoding must be rejected when
 * full-stream decompression is enabled: the listener sees nothing and the deframe failure is an
 * INTERNAL status with a fixed description.
 */
@Test
public void inboundHeadersReceived_disallowsContentAndMessageEncoding() {
  AbstractClientStream stream =
      new BaseAbstractClientStream(allocator, statsTraceCtx, transportTracer);
  stream.start(mockListener);
  Metadata headers = new Metadata();
  headers.put(GrpcUtil.CONTENT_ENCODING_KEY, "gzip");
  headers.put(GrpcUtil.MESSAGE_ENCODING_KEY, new Codec.Gzip().getMessageEncoding());
  stream.setFullStreamDecompression(true);

  stream.transportState().inboundHeadersReceived(headers);

  verifyNoMoreInteractions(mockListener);
  Throwable t = ((BaseTransportState) stream.transportState()).getDeframeFailedCause();
  Status failure = Status.fromThrowable(t);
  assertEquals(Status.INTERNAL.getCode(), failure.getCode());
  // assertEquals (rather than assertTrue on equals()) reports the actual description on
  // failure and copes with a null description instead of throwing a NullPointerException.
  assertEquals(
      "unexpected deframe failed description",
      "Full stream and gRPC message encoding cannot both be set",
      failure.getDescription());
}
/**
 * Feeds a single byte into a deframer that uses a full-stream {@code GzipInflatingBuffer},
 * requests close-when-complete, and expects exactly one {@code deframerClosed(true)} callback.
 * Skipped unless the suite runs with {@code useGzipInflatingBuffer}.
 */
@Test
public void endOfStreamWithInvalidGzipBlockShouldNotifyDeframerClosedWithPartialMessage() {
  assumeTrue("test only valid for full-stream compression", useGzipInflatingBuffer);

  // A dedicated deframer lets the test write bytes straight into the GzipInflatingBuffer.
  MessageDeframer gzipDeframer =
      new MessageDeframer(
          listener, Codec.Identity.NONE, DEFAULT_MAX_MESSAGE_SIZE, statsTraceCtx, transportTracer);
  gzipDeframer.setFullStreamDecompressor(new GzipInflatingBuffer());
  gzipDeframer.request(1);
  fakeClock.forwardTime(10, TimeUnit.MILLISECONDS);

  byte[] singleByte = new byte[1];
  gzipDeframer.deframe(buffer(singleByte));
  gzipDeframer.closeWhenComplete();

  verify(listener).deframerClosed(true);
  verifyNoMoreInteractions(listener);
  checkStats(tracer, transportTracer.getStats(), fakeClock);
}
/**
 * Writes 1000 zero bytes through a gzip-enabled framer and checks that two frames are
 * delivered: a header flagged as compressed, then a payload whose size matches the header's
 * length field and is smaller than the input.
 */
@Test
public void compressed() {
  allocator = new BytesWritableBufferAllocator(100, Integer.MAX_VALUE);
  // Message compression is left at its default, which should be "enabled".
  framer = new MessageFramer(sink, allocator, statsTraceCtx).setCompressor(new Codec.Gzip());

  writeKnownLength(framer, new byte[1000]);
  framer.flush();

  // The gRPC header goes out first in its own frame; the message count only increases once
  // the message has been written completely.
  verify(sink).deliverFrame(frameCaptor.capture(), eq(false), eq(false), eq(0));
  verify(sink).deliverFrame(frameCaptor.capture(), eq(false), eq(true), eq(1));

  // Header layout checked here: byte 0 is the compressed flag, bytes 1-4 the big-endian length.
  ByteWritableBuffer headerFrame = frameCaptor.getAllValues().get(0);
  assertEquals(0x1, headerFrame.data[0]);
  int length = ByteBuffer.wrap(headerFrame.data, 1, 4).order(ByteOrder.BIG_ENDIAN).getInt();

  // Compression must actually have shrunk the payload.
  assertTrue(length < 1000);
  assertEquals(frameCaptor.getAllValues().get(1).size(), length);
  checkStats(length, 1000);
}
/**
 * Configures a gzip compressor but switches message compression off, then checks that the
 * 1000-byte message is delivered uncompressed: flag byte 0, length field 1000, and a buffer
 * that holds exactly the 5-byte header plus the payload.
 */
@Test
public void dontCompressIfNotRequested() {
  allocator = new BytesWritableBufferAllocator(100, Integer.MAX_VALUE);
  framer =
      new MessageFramer(sink, allocator, statsTraceCtx)
          .setCompressor(new Codec.Gzip())
          .setMessageCompression(false);

  writeKnownLength(framer, new byte[1000]);
  framer.flush();

  // Only one delivery is expected here; the captured buffer carries header and payload.
  verify(sink).deliverFrame(frameCaptor.capture(), eq(false), eq(true), eq(1));

  ByteWritableBuffer frame = frameCaptor.getAllValues().get(0);
  // Flag byte 0 means the message was not compressed.
  assertEquals(0x0, frame.data[0]);
  // Bytes 1-4 hold the big-endian payload length, which must equal the input size.
  int length = ByteBuffer.wrap(frame.data, 1, 4).order(ByteOrder.BIG_ENDIAN).getInt();
  assertEquals(1000, length);
  assertEquals(frame.data.length - 5 , length);
  checkStats(1000, 1000);
}
/**
 * Finalizes the compression headers on {@code headers} and writes them to the stream.
 *
 * <p>The configured compressor is kept only if an accept-encoding value is present and lists
 * the compressor's message encoding; in every other case the identity codec is used. The
 * message-encoding header is always written, and the message-accept-encoding header is written
 * when the decompressor registry advertises at least one encoding.
 *
 * @param headers metadata to modify in place and send
 * @throws IllegalStateException if headers were already sent or the call is closed
 *     (assuming {@code checkState} is Guava's {@code Preconditions.checkState})
 */
@Override
public void sendHeaders(Metadata headers) {
  checkState(!sendHeadersCalled, "sendHeaders has already been called");
  checkState(!closeCalled, "call is closed");

  headers.discardAll(MESSAGE_ENCODING_KEY);
  if (compressor == null || messageAcceptEncoding == null) {
    // Nothing requested, or nothing accepted: fall back to identity.
    compressor = Codec.Identity.NONE;
  } else {
    // TODO(carl-mastrangelo): remove the string allocation.
    String acceptedEncodings = new String(messageAcceptEncoding, GrpcUtil.US_ASCII);
    boolean encodingAccepted =
        GrpcUtil.iterableContains(
            ACCEPT_ENCODING_SPLITTER.split(acceptedEncodings), compressor.getMessageEncoding());
    if (!encodingAccepted) {
      // resort to using no compression.
      compressor = Codec.Identity.NONE;
    }
  }

  // Always put compressor, even if it's identity.
  headers.put(MESSAGE_ENCODING_KEY, compressor.getMessageEncoding());
  stream.setCompressor(compressor);

  headers.discardAll(MESSAGE_ACCEPT_ENCODING_KEY);
  byte[] rawAcceptEncodings =
      InternalDecompressorRegistry.getRawAdvertisedMessageEncodings(decompressorRegistry);
  if (rawAcceptEncodings.length > 0) {
    headers.put(MESSAGE_ACCEPT_ENCODING_KEY, rawAcceptEncodings);
  }

  // Don't check if sendMessage has been called, since it requires that sendHeaders was already
  // called.
  sendHeadersCalled = true;
  stream.writeHeaders(headers);
}
protected TransportState(
int maxMessageSize,
StatsTraceContext statsTraceCtx,
TransportTracer transportTracer) {
this.statsTraceCtx = checkNotNull(statsTraceCtx, "statsTraceCtx");
this.transportTracer = checkNotNull(transportTracer, "transportTracer");
deframer = new MessageDeframer(
this,
Codec.Identity.NONE,
maxMessageSize,
statsTraceCtx,
transportTracer);
}
/**
 * Installs a full-stream decompressor and clears {@code unprocessed}.
 *
 * <p>Rejected when a per-message decompressor other than the identity codec is configured, or
 * when a full-stream decompressor was already installed.
 *
 * @param fullStreamDecompressor the inflating buffer to use; must not be null
 */
@Override
public void setFullStreamDecompressor(GzipInflatingBuffer fullStreamDecompressor) {
  checkState(decompressor == Codec.Identity.NONE, "per-message decompressor already set");
  checkState(this.fullStreamDecompressor == null, "full stream decompressor already set");
  checkNotNull(fullStreamDecompressor, "Can't pass a null full stream decompressor");
  this.fullStreamDecompressor = fullStreamDecompressor;
  unprocessed = null;
}
/**
 * Calls made on {@code stream} before {@code setStream} must all reach the real stream once it
 * is set, and later calls must be passed straight through, including listener callbacks.
 */
@Test
public void setStream_sendsAllMessages() {
  stream.start(listener);
  stream.setCompressor(Codec.Identity.NONE);
  stream.setDecompressorRegistry(DecompressorRegistry.getDefaultInstance());

  // Two writes with different compression settings, both before the real stream exists.
  InputStream payload = new ByteArrayInputStream(new byte[]{'a'});
  stream.setMessageCompression(true);
  stream.writeMessage(payload);
  stream.setMessageCompression(false);
  stream.writeMessage(payload);

  stream.setStream(realStream);

  verify(realStream).setCompressor(Codec.Identity.NONE);
  verify(realStream).setDecompressorRegistry(DecompressorRegistry.getDefaultInstance());
  verify(realStream).setMessageCompression(true);
  verify(realStream).setMessageCompression(false);
  verify(realStream, times(2)).writeMessage(payload);
  verify(realStream).start(listenerCaptor.capture());

  // A write after setStream reaches the real stream as well.
  stream.writeMessage(payload);
  verify(realStream, times(3)).writeMessage(payload);

  // Callbacks on the captured listener are forwarded to the original listener.
  verifyNoMoreInteractions(listener);
  listenerCaptor.getValue().onReady();
  verify(listener).onReady();
}
/**
 * A caller-supplied user-agent entry must still be present after {@code prepareHeaders}.
 */
@Test
public void prepareHeaders_userAgentIgnored() {
  Metadata headers = new Metadata();
  headers.put(GrpcUtil.USER_AGENT_KEY, "batmobile");

  ClientCallImpl.prepareHeaders(headers, decompressorRegistry, Codec.Identity.NONE, false);

  // prepareHeaders leaves the user agent alone. NOTE(review): the original comment says the
  // transport removes and re-sets it later; that step is not visible in this test.
  assertThat(headers.get(GrpcUtil.USER_AGENT_KEY)).isNotNull();
}
/** With the identity codec as compressor, no message-encoding header may be written. */
@Test
public void prepareHeaders_ignoreIdentityEncoding() {
  Metadata headers = new Metadata();

  ClientCallImpl.prepareHeaders(headers, decompressorRegistry, Codec.Identity.NONE, false);

  assertNull(headers.get(GrpcUtil.MESSAGE_ENCODING_KEY));
}
/**
 * With full-stream decompression disabled, no content-accept-encoding header may be written.
 */
@Test
public void prepareHeaders_noAcceptedContentEncodingsWithoutFullStreamDecompressionEnabled() {
  Metadata headers = new Metadata();

  ClientCallImpl.prepareHeaders(headers, decompressorRegistry, Codec.Identity.NONE, false);

  assertNull(headers.get(GrpcUtil.CONTENT_ACCEPT_ENCODING_KEY));
}
/** Inbound headers that declare the gzip message encoding are passed on to the listener. */
@Test
public void inboundHeadersReceived_acceptsGzipMessageEncoding() {
  AbstractClientStream stream =
      new BaseAbstractClientStream(allocator, statsTraceCtx, transportTracer);
  stream.start(mockListener);

  String gzipEncoding = new Codec.Gzip().getMessageEncoding();
  Metadata headers = new Metadata();
  headers.put(GrpcUtil.MESSAGE_ENCODING_KEY, gzipEncoding);
  stream.transportState().inboundHeadersReceived(headers);

  verify(mockListener).headersRead(headers);
}
/** Inbound headers that declare the identity message encoding are passed on to the listener. */
@Test
public void inboundHeadersReceived_acceptsIdentityMessageEncoding() {
  AbstractClientStream stream =
      new BaseAbstractClientStream(allocator, statsTraceCtx, transportTracer);
  stream.start(mockListener);

  String identityEncoding = Codec.Identity.NONE.getMessageEncoding();
  Metadata headers = new Metadata();
  headers.put(GrpcUtil.MESSAGE_ENCODING_KEY, identityEncoding);
  stream.transportState().inboundHeadersReceived(headers);

  verify(mockListener).headersRead(headers);
}
/**
 * Deframes one gzip-compressed message of 1000 zero bytes and checks that the listener
 * receives the original 1000 bytes.
 */
@Test
public void compressed() {
  deframer =
      new MessageDeframer(
          listener, new Codec.Gzip(), DEFAULT_MAX_MESSAGE_SIZE, statsTraceCtx, transportTracer);
  deframer.request(1);

  byte[] compressedPayload = compress(new byte[1000]);
  // The length is written into a single header byte below, so it has to stay small.
  assertTrue(compressedPayload.length < 100);
  // Header: compressed flag set, followed by the 4-byte big-endian payload length.
  byte[] frameHeader = new byte[]{1, 0, 0, 0, (byte) compressedPayload.length};
  byte[] frame = Bytes.concat(frameHeader, compressedPayload);
  deframer.deframe(buffer(frame));

  verify(listener).messagesAvailable(producer.capture());
  assertEquals(Bytes.asList(new byte[1000]), bytes(producer.getValue().next()));
  verify(listener, atLeastOnce()).bytesRead(anyInt());
  verifyNoMoreInteractions(listener);
}
/**
 * An empty message written with gzip and message compression enabled must go out as a
 * five-zero-byte frame, i.e. with the compressed flag cleared and a length of zero.
 */
@Test
public void zeroLengthCompressibleMessageIsNotCompressed() {
  framer.setCompressor(new Codec.Gzip());
  framer.setMessageCompression(true);

  byte[] emptyMessage = new byte[]{};
  writeKnownLength(framer, emptyMessage);
  framer.flush();

  byte[] expectedFrame = new byte[] {0, 0, 0, 0, 0};
  verify(sink).deliverFrame(toWriteBuffer(expectedFrame), false, true, 1);
  checkStats(0, 0);
}