下面列出了 io.grpc.Decompressor 的 API 使用示例代码及写法，也可以点击链接到 GitHub 查看源代码。
/**
 * Checks that a registry passed to {@code forward.setDecompressorRegistry} is relayed to
 * {@code mock} as the very same instance.
 */
@Test
public void setDecompressorRegistryTest() {
  DecompressorRegistry emptyRegistry = DecompressorRegistry.emptyInstance();
  // Pass-through codec: reports a fixed encoding name and hands the stream back untouched.
  Decompressor passThrough = new Decompressor() {
    @Override
    public String getMessageEncoding() {
      return "some-encoding";
    }

    @Override
    public InputStream decompress(InputStream is) throws IOException {
      return is;
    }
  };
  DecompressorRegistry registry = emptyRegistry.with(passThrough, true);

  forward.setDecompressorRegistry(registry);

  verify(mock).setDecompressorRegistry(same(registry));
}
/**
 * Checks that a registry passed to {@code forward.setDecompressorRegistry} is relayed to
 * {@code mock} as the very same instance.
 */
@Test
public void setDecompressorRegistryTest() {
  DecompressorRegistry emptyRegistry = DecompressorRegistry.emptyInstance();
  // Pass-through codec: reports a fixed encoding name and hands the stream back untouched.
  Decompressor passThrough = new Decompressor() {
    @Override
    public String getMessageEncoding() {
      return "some-encoding";
    }

    @Override
    public InputStream decompress(InputStream is) throws IOException {
      return is;
    }
  };
  DecompressorRegistry registry = emptyRegistry.with(passThrough, true);

  forward.setDecompressorRegistry(registry);

  verify(mock).setDecompressorRegistry(same(registry));
}
/**
 * Create a deframer.
 *
 * @param listener listener for deframer events; must not be null.
 * @param decompressor the compression used if a compressed frame is encountered, with
 *     {@code NONE} meaning unsupported; must not be null.
 * @param maxMessageSize the maximum allowed size for received messages.
 * @param statsTraceCtx the stats/trace context stored by this deframer; must not be null.
 * @param transportTracer the transport tracer stored by this deframer; must not be null.
 */
public MessageDeframer(
    Listener listener,
    Decompressor decompressor,
    int maxMessageSize,
    StatsTraceContext statsTraceCtx,
    TransportTracer transportTracer) {
  // Each null check is labelled with the real parameter name so a failure points at the
  // argument the caller actually passed (the listener check used to say "sink").
  this.listener = checkNotNull(listener, "listener");
  this.decompressor = checkNotNull(decompressor, "decompressor");
  this.maxInboundMessageSize = maxMessageSize;
  this.statsTraceCtx = checkNotNull(statsTraceCtx, "statsTraceCtx");
  this.transportTracer = checkNotNull(transportTracer, "transportTracer");
}
/**
 * Create a deframer.
 *
 * @param listener listener for deframer events; must not be null.
 * @param decompressor the compression used if a compressed frame is encountered, with
 *     {@code NONE} meaning unsupported; must not be null.
 * @param maxMessageSize the maximum allowed size for received messages.
 * @param statsTraceCtx the stats/trace context stored by this deframer; must not be null.
 * @param transportTracer the transport tracer stored by this deframer; must not be null.
 */
public MessageDeframer(
    Listener listener,
    Decompressor decompressor,
    int maxMessageSize,
    StatsTraceContext statsTraceCtx,
    TransportTracer transportTracer) {
  // Each null check is labelled with the real parameter name so a failure points at the
  // argument the caller actually passed (the listener check used to say "sink").
  this.listener = checkNotNull(listener, "listener");
  this.decompressor = checkNotNull(decompressor, "decompressor");
  this.maxInboundMessageSize = maxMessageSize;
  this.statsTraceCtx = checkNotNull(statsTraceCtx, "statsTraceCtx");
  this.transportTracer = checkNotNull(transportTracer, "transportTracer");
}
/**
 * No-op implementation: the supplied decompressor is ignored.
 *
 * @param decompressor unused by this implementation.
 */
@Override
public void setDecompressor(Decompressor decompressor) {}
/**
 * Hands the decompressor to the object returned by {@code transportState()}.
 *
 * @param decompressor the decompressor to forward; it is passed through
 *     {@code Preconditions.checkNotNull} before being forwarded.
 */
@Override
public final void setDecompressor(Decompressor decompressor) {
transportState().setDecompressor(Preconditions.checkNotNull(decompressor, "decompressor"));
}
/**
 * Called by transport implementations when they receive headers.
 *
 * <p>Before the headers are handed to the listener, the decompression for the stream is chosen:
 * <ul>
 *   <li>If full-stream decompression is enabled and {@code CONTENT_ENCODING_KEY} is present,
 *       {@code gzip} installs a {@code GzipInflatingBuffer}, {@code identity} is accepted
 *       unchanged, and any other value is reported through {@code deframeFailed} with
 *       {@code Status.INTERNAL}.</li>
 *   <li>If {@code MESSAGE_ENCODING_KEY} is present, the decompressor is looked up in the
 *       registry. An unknown encoding, or a non-identity encoding combined with full-stream
 *       gzip, is reported through {@code deframeFailed} with {@code Status.INTERNAL}.</li>
 * </ul>
 * After any such failure this method returns without calling {@code headersRead}.
 *
 * @param headers the parsed headers
 */
protected void inboundHeadersReceived(Metadata headers) {
  checkState(!statusReported, "Received headers on closed stream");
  statsTraceCtx.clientInboundHeaders();

  // Full-stream (content-encoding) handling.
  boolean compressedStream = false;
  String streamEncoding = headers.get(CONTENT_ENCODING_KEY);
  if (fullStreamDecompression && streamEncoding != null) {
    if (streamEncoding.equalsIgnoreCase("gzip")) {
      setFullStreamDecompressor(new GzipInflatingBuffer());
      compressedStream = true;
    } else if (!streamEncoding.equalsIgnoreCase("identity")) {
      deframeFailed(
          Status.INTERNAL
              .withDescription(
                  String.format("Can't find full stream decompressor for %s", streamEncoding))
              .asRuntimeException());
      return;
    }
  }

  // Per-message (message-encoding) handling.
  String messageEncoding = headers.get(MESSAGE_ENCODING_KEY);
  if (messageEncoding != null) {
    Decompressor decompressor = decompressorRegistry.lookupDecompressor(messageEncoding);
    if (decompressor == null) {
      deframeFailed(
          Status.INTERNAL
              .withDescription(String.format("Can't find decompressor for %s", messageEncoding))
              .asRuntimeException());
      return;
    } else if (decompressor != Codec.Identity.NONE) {
      if (compressedStream) {
        // Plain literal: the message takes no format arguments, so String.format was redundant.
        deframeFailed(
            Status.INTERNAL
                .withDescription("Full stream and gRPC message encoding cannot both be set")
                .asRuntimeException());
        return;
      }
      setDecompressor(decompressor);
    }
  }

  listener().headersRead(headers);
}
/**
 * Forwards the decompressor to {@code deframer}.
 *
 * @param decompressor the decompressor passed on to {@code deframer.setDecompressor}.
 */
@Override
public void setDecompressor(Decompressor decompressor) {
deframer.setDecompressor(decompressor);
}
/**
 * Forwards the decompressor to {@code deframer}.
 *
 * @param decompressor the decompressor passed on to {@code deframer.setDecompressor}.
 */
protected final void setDecompressor(Decompressor decompressor) {
deframer.setDecompressor(decompressor);
}
/**
 * Stores the decompressor on this object.
 *
 * <p>The state check fails when a full stream decompressor has already been set; the argument
 * itself is then null-checked before it is stored.
 *
 * @param decompressor the decompressor to store
 */
@Override
public void setDecompressor(Decompressor decompressor) {
  final boolean fullStreamDecompressorUnset = (fullStreamDecompressor == null);
  checkState(fullStreamDecompressorUnset, "Already set full stream decompressor");

  final Decompressor verified = checkNotNull(decompressor, "Can't pass an empty decompressor");
  this.decompressor = verified;
}
/**
 * Handles one object from the HTTP stream.
 *
 * <p>An {@code HttpHeaders} object is inspected for the HTTP status (leading headers only),
 * {@code grpc-status} and {@code grpc-encoding}; any other object is cast to {@code HttpData}
 * and passed to the deframer. Failures are reported through {@code transportStatusListener}.
 * Nothing is done once {@code cancelled} is set.
 *
 * @param obj the headers or data object to process
 */
@Override
public void onNext(HttpObject obj) {
if (cancelled) {
return;
}
if (obj instanceof HttpHeaders) {
// Only clients will see headers from a stream. It doesn't hurt to share this logic between server
// and client though as everything else is identical.
final HttpHeaders headers = (HttpHeaders) obj;
// The HTTP status is validated only until the first non-informational headers have been seen.
if (!sawLeadingHeaders) {
final String statusText = headers.get(HttpHeaderNames.STATUS);
if (statusText == null) {
// Not allowed to have empty leading headers, kill the stream hard.
transportStatusListener.transportReportStatus(
Status.INTERNAL.withDescription("Missing HTTP status code"));
return;
}
if (ArmeriaHttpUtil.isInformational(statusText)) {
// Skip informational headers.
return;
}
sawLeadingHeaders = true;
final HttpStatus status = HttpStatus.valueOf(statusText);
// Any HTTP status other than OK is mapped to a gRPC status and reported.
if (!status.equals(HttpStatus.OK)) {
transportStatusListener.transportReportStatus(
GrpcStatus.httpStatusToGrpcStatus(status.code()));
return;
}
}
final String grpcStatus = headers.get(GrpcHeaderNames.GRPC_STATUS);
// NOTE(review): when grpc-status is present the status is reported, but execution still
// continues to the grpc-encoding lookup and requestHttpFrame() below. The next comment suggests
// that path is meant for headers without grpc-status; confirm the fall-through is intended.
if (grpcStatus != null) {
GrpcStatus.reportStatus(headers, this, transportStatusListener);
}
// Headers without grpc-status are the leading headers of a non-failing response, prepare to receive
// messages.
final String grpcEncoding = headers.get(GrpcHeaderNames.GRPC_ENCODING);
if (grpcEncoding != null) {
final Decompressor decompressor = decompressorRegistry.lookupDecompressor(grpcEncoding);
if (decompressor == null) {
transportStatusListener.transportReportStatus(Status.INTERNAL.withDescription(
"Can't find decompressor for " + grpcEncoding));
return;
}
// Anything thrown while installing the decompressor is converted to a status and reported.
try {
deframer.decompressor(ForwardingDecompressor.forGrpc(decompressor));
} catch (Throwable t) {
transportStatusListener.transportReportStatus(GrpcStatus.fromThrowable(t));
return;
}
}
requestHttpFrame();
return;
}
// Not headers: treat the object as a data frame and feed it to the deframer.
final HttpData data = (HttpData) obj;
try {
deframer.deframe(data, false);
} catch (Throwable cause) {
// Report the failure; the finally block closes the deframer even if reporting itself throws.
try {
transportStatusListener.transportReportStatus(GrpcStatus.fromThrowable(cause));
return;
} finally {
deframer.close();
}
}
requestHttpFrame();
}
/**
 * No-op implementation: the supplied decompressor is ignored.
 *
 * @param decompressor unused by this implementation.
 */
@Override
public void setDecompressor(Decompressor decompressor) {}
/**
 * Hands the decompressor to the object returned by {@code transportState()}.
 *
 * @param decompressor the decompressor to forward; it is passed through
 *     {@code Preconditions.checkNotNull} before being forwarded.
 */
@Override
public final void setDecompressor(Decompressor decompressor) {
transportState().setDecompressor(Preconditions.checkNotNull(decompressor, "decompressor"));
}
/**
 * Forwards the decompressor to {@code deframer}.
 *
 * @param decompressor the decompressor passed on to {@code deframer.setDecompressor}.
 */
@Override
public void setDecompressor(Decompressor decompressor) {
deframer.setDecompressor(decompressor);
}
/**
 * Called by transport implementations when they receive headers.
 *
 * <p>Before the headers are handed to the listener, the decompression for the stream is chosen:
 * <ul>
 *   <li>If full-stream decompression is enabled and {@code CONTENT_ENCODING_KEY} is present,
 *       {@code gzip} installs a {@code GzipInflatingBuffer}, {@code identity} is accepted
 *       unchanged, and any other value is reported through {@code deframeFailed} with
 *       {@code Status.INTERNAL}.</li>
 *   <li>If {@code MESSAGE_ENCODING_KEY} is present, the decompressor is looked up in the
 *       registry. An unknown encoding, or a non-identity encoding combined with full-stream
 *       gzip, is reported through {@code deframeFailed} with {@code Status.INTERNAL}.</li>
 * </ul>
 * After any such failure this method returns without calling {@code headersRead}.
 *
 * @param headers the parsed headers
 */
protected void inboundHeadersReceived(Metadata headers) {
  checkState(!statusReported, "Received headers on closed stream");
  statsTraceCtx.clientInboundHeaders();

  // Full-stream (content-encoding) handling.
  boolean compressedStream = false;
  String streamEncoding = headers.get(CONTENT_ENCODING_KEY);
  if (fullStreamDecompression && streamEncoding != null) {
    if (streamEncoding.equalsIgnoreCase("gzip")) {
      setFullStreamDecompressor(new GzipInflatingBuffer());
      compressedStream = true;
    } else if (!streamEncoding.equalsIgnoreCase("identity")) {
      deframeFailed(
          Status.INTERNAL
              .withDescription(
                  String.format("Can't find full stream decompressor for %s", streamEncoding))
              .asRuntimeException());
      return;
    }
  }

  // Per-message (message-encoding) handling.
  String messageEncoding = headers.get(MESSAGE_ENCODING_KEY);
  if (messageEncoding != null) {
    Decompressor decompressor = decompressorRegistry.lookupDecompressor(messageEncoding);
    if (decompressor == null) {
      deframeFailed(
          Status.INTERNAL
              .withDescription(String.format("Can't find decompressor for %s", messageEncoding))
              .asRuntimeException());
      return;
    } else if (decompressor != Codec.Identity.NONE) {
      if (compressedStream) {
        // Plain literal: the message takes no format arguments, so String.format was redundant.
        deframeFailed(
            Status.INTERNAL
                .withDescription("Full stream and gRPC message encoding cannot both be set")
                .asRuntimeException());
        return;
      }
      setDecompressor(decompressor);
    }
  }

  listener().headersRead(headers);
}
/**
 * Forwards the decompressor to {@code deframer}.
 *
 * @param decompressor the decompressor passed on to {@code deframer.setDecompressor}.
 */
@Override
public void setDecompressor(Decompressor decompressor) {
deframer.setDecompressor(decompressor);
}
/**
 * Forwards the decompressor to {@code deframer}.
 *
 * @param decompressor the decompressor passed on to {@code deframer.setDecompressor}.
 */
protected final void setDecompressor(Decompressor decompressor) {
deframer.setDecompressor(decompressor);
}
/**
 * Stores the decompressor on this object.
 *
 * <p>The state check fails when a full stream decompressor has already been set; the argument
 * itself is then null-checked before it is stored.
 *
 * @param decompressor the decompressor to store
 */
@Override
public void setDecompressor(Decompressor decompressor) {
  final boolean fullStreamDecompressorUnset = (fullStreamDecompressor == null);
  checkState(fullStreamDecompressorUnset, "Already set full stream decompressor");

  final Decompressor verified = checkNotNull(decompressor, "Can't pass an empty decompressor");
  this.decompressor = verified;
}
/**
 * Sets the decompressor on the deframer.
 *
 * <p>If the transport does not support compression, this may do nothing.
 *
 * @param decompressor the decompressor to use.
 */
void setDecompressor(Decompressor decompressor);
/**
 * Sets the decompressor available to use.
 *
 * <p>The message encoding for the stream comes later in time, and thus will not be available at
 * the time of construction. This should only be set once, since the compression codec cannot
 * change after the headers have been sent.
 *
 * @param decompressor the decompressing wrapper.
 */
void setDecompressor(Decompressor decompressor);
/**
 * Sets the decompressor on the deframer.
 *
 * <p>If the transport does not support compression, this may do nothing.
 *
 * @param decompressor the decompressor to use.
 */
void setDecompressor(Decompressor decompressor);
/**
 * Sets the decompressor available to use.
 *
 * <p>The message encoding for the stream comes later in time, and thus will not be available at
 * the time of construction. This should only be set once, since the compression codec cannot
 * change after the headers have been sent.
 *
 * @param decompressor the decompressing wrapper.
 */
void setDecompressor(Decompressor decompressor);