下面列出了怎么用 io.netty.handler.codec.compression.ZlibWrapper 的API类实例代码及写法,或者点击链接到github查看源代码。
@Override
public void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
if (sslCtx != null) {
pipeline.addLast(sslCtx.newHandler(ch.alloc(), FactorialClient.HOST, FactorialClient.PORT));
}
// Enable stream compression (you can remove these two if unnecessary)
pipeline.addLast(ZlibCodecFactory.newZlibEncoder(ZlibWrapper.GZIP));
pipeline.addLast(ZlibCodecFactory.newZlibDecoder(ZlibWrapper.GZIP));
// Add the number codec first,
pipeline.addLast(new BigIntegerDecoder());
pipeline.addLast(new NumberEncoder());
// and then business logic.
pipeline.addLast(new FactorialClientHandler());
}
@Override
public void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
if (sslCtx != null) {
pipeline.addLast(sslCtx.newHandler(ch.alloc()));
}
// Enable stream compression (you can remove these two if unnecessary)
pipeline.addLast(ZlibCodecFactory.newZlibEncoder(ZlibWrapper.GZIP));
pipeline.addLast(ZlibCodecFactory.newZlibDecoder(ZlibWrapper.GZIP));
// Add the number codec first,
pipeline.addLast(new BigIntegerDecoder());
pipeline.addLast(new NumberEncoder());
// and then business logic.
// Please note we create a handler for every new channel
// because it has stateful properties.
pipeline.addLast(new FactorialServerHandler());
}
@Override
protected EmbeddedChannel newContentDecoder(String contentEncoding) throws Exception {
if (GZIP.contentEqualsIgnoreCase(contentEncoding) ||
X_GZIP.contentEqualsIgnoreCase(contentEncoding)) {
return new EmbeddedChannel(ctx.channel().id(), ctx.channel().metadata().hasDisconnect(),
ctx.channel().config(), ZlibCodecFactory.newZlibDecoder(ZlibWrapper.GZIP));
}
if (DEFLATE.contentEqualsIgnoreCase(contentEncoding) ||
X_DEFLATE.contentEqualsIgnoreCase(contentEncoding)) {
final ZlibWrapper wrapper = strict ? ZlibWrapper.ZLIB : ZlibWrapper.ZLIB_OR_NONE;
// To be strict, 'deflate' means ZLIB, but some servers were not implemented correctly.严格来说,“deflate”是指ZLIB,但是有些服务器没有正确实现。
return new EmbeddedChannel(ctx.channel().id(), ctx.channel().metadata().hasDisconnect(),
ctx.channel().config(), ZlibCodecFactory.newZlibDecoder(wrapper));
}
// 'identity' or unsupported“身份”或不受支持的
return null;
}
@Test
public void testCompressedEmptyFrame() {
EmbeddedChannel encoderChannel = new EmbeddedChannel(
ZlibCodecFactory.newZlibEncoder(ZlibWrapper.NONE, 9, 15, 8));
EmbeddedChannel decoderChannel = new EmbeddedChannel(new PerFrameDeflateDecoder(false));
encoderChannel.writeOutbound(Unpooled.EMPTY_BUFFER);
ByteBuf compressedPayload = encoderChannel.readOutbound();
BinaryWebSocketFrame compressedFrame =
new BinaryWebSocketFrame(true, WebSocketExtension.RSV1 | WebSocketExtension.RSV3, compressedPayload);
// execute
decoderChannel.writeInbound(compressedFrame);
BinaryWebSocketFrame uncompressedFrame = decoderChannel.readInbound();
// test
assertNotNull(uncompressedFrame);
assertNotNull(uncompressedFrame.content());
assertTrue(uncompressedFrame instanceof BinaryWebSocketFrame);
assertEquals(WebSocketExtension.RSV3, uncompressedFrame.rsv());
assertEquals(0, uncompressedFrame.content().readableBytes());
uncompressedFrame.release();
}
/**
* Returns a new {@link EmbeddedChannel} that decodes the HTTP2 message content encoded in the specified
* {@code contentEncoding}.
*
* @param contentEncoding the value of the {@code content-encoding} header
* @return a new {@link ByteToMessageDecoder} if the specified encoding is supported. {@code null} otherwise
* (alternatively, you can throw a {@link Http2Exception} to block unknown encoding).
* @throws Http2Exception If the specified encoding is not not supported and warrants an exception
*/
protected EmbeddedChannel newContentDecompressor(final ChannelHandlerContext ctx, CharSequence contentEncoding)
throws Http2Exception {
if (GZIP.contentEqualsIgnoreCase(contentEncoding) || X_GZIP.contentEqualsIgnoreCase(contentEncoding)) {
return new EmbeddedChannel(ctx.channel().id(), ctx.channel().metadata().hasDisconnect(),
ctx.channel().config(), ZlibCodecFactory.newZlibDecoder(ZlibWrapper.GZIP));
}
if (DEFLATE.contentEqualsIgnoreCase(contentEncoding) || X_DEFLATE.contentEqualsIgnoreCase(contentEncoding)) {
final ZlibWrapper wrapper = strict ? ZlibWrapper.ZLIB : ZlibWrapper.ZLIB_OR_NONE;
// To be strict, 'deflate' means ZLIB, but some servers were not implemented correctly.
return new EmbeddedChannel(ctx.channel().id(), ctx.channel().metadata().hasDisconnect(),
ctx.channel().config(), ZlibCodecFactory.newZlibDecoder(wrapper));
}
// 'identity' or unsupported
return null;
}
@Override
public void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
if (sslCtx != null) {
pipeline.addLast(sslCtx.newHandler(ch.alloc(), FactorialClient.HOST, FactorialClient.PORT));
}
// Enable stream compression (you can remove these two if unnecessary)
pipeline.addLast(ZlibCodecFactory.newZlibEncoder(ZlibWrapper.GZIP));
pipeline.addLast(ZlibCodecFactory.newZlibDecoder(ZlibWrapper.GZIP));
// Add the number codec first,
pipeline.addLast(new BigIntegerDecoder());
pipeline.addLast(new NumberEncoder());
// and then business logic.
pipeline.addLast(new FactorialClientHandler());
}
@Override
public void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
if (sslCtx != null) {
pipeline.addLast(sslCtx.newHandler(ch.alloc()));
}
// Enable stream compression (you can remove these two if unnecessary)
pipeline.addLast(ZlibCodecFactory.newZlibEncoder(ZlibWrapper.GZIP));
pipeline.addLast(ZlibCodecFactory.newZlibDecoder(ZlibWrapper.GZIP));
// Add the number codec first,
pipeline.addLast(new BigIntegerDecoder());
pipeline.addLast(new NumberEncoder());
// and then business logic.
// Please note we create a handler for every new channel
// because it has stateful properties.
pipeline.addLast(new FactorialServerHandler());
}
@Override
protected EmbeddedChannel newContentDecoder(String contentEncoding) throws Exception {
if ("gzip".equalsIgnoreCase(contentEncoding) || "x-gzip".equalsIgnoreCase(contentEncoding)) {
return new EmbeddedChannel(ZlibCodecFactory.newZlibDecoder(ZlibWrapper.GZIP));
}
if ("deflate".equalsIgnoreCase(contentEncoding) || "x-deflate".equalsIgnoreCase(contentEncoding)) {
ZlibWrapper wrapper;
if (strict) {
wrapper = ZlibWrapper.ZLIB;
} else {
wrapper = ZlibWrapper.ZLIB_OR_NONE;
}
// To be strict, 'deflate' means ZLIB, but some servers were not implemented correctly.
return new EmbeddedChannel(ZlibCodecFactory.newZlibDecoder(wrapper));
}
// 'identity' or unsupported
return null;
}
private void enableGzip(ChannelHandlerContext ctx) {
ChannelPipeline p = ctx.pipeline();
p.addLast("gzipdeflater", ZlibCodecFactory.newZlibEncoder(ZlibWrapper.GZIP));
p.addLast("gzipinflater", ZlibCodecFactory.newZlibDecoder(ZlibWrapper.GZIP));
p.addLast("unificationB", new PortUnificationServerHandler(sslCtx, detectSsl, false));
p.remove(this);
}
@Override
protected Result beginEncode(HttpResponse headers, String acceptEncoding) throws Exception {
String contentEncoding = headers.headers().get(HttpHeaderNames.CONTENT_ENCODING);
if (contentEncoding != null) {
// Content-Encoding was set, either as something specific or as the IDENTITY encoding
// Therefore, we should NOT encode here// Content-Encoding被设置为特定的内容编码或标识编码
//因此,我们不应该在这里编码
return null;
}
ZlibWrapper wrapper = determineWrapper(acceptEncoding);
if (wrapper == null) {
return null;
}
String targetContentEncoding;
switch (wrapper) {
case GZIP:
targetContentEncoding = "gzip";
break;
case ZLIB:
targetContentEncoding = "deflate";
break;
default:
throw new Error();
}
return new Result(
targetContentEncoding,
new EmbeddedChannel(ctx.channel().id(), ctx.channel().metadata().hasDisconnect(),
ctx.channel().config(), ZlibCodecFactory.newZlibEncoder(
wrapper, compressionLevel, windowBits, memLevel)));
}
@Test
public void testCompressedFrame() {
EmbeddedChannel encoderChannel = new EmbeddedChannel(new PerMessageDeflateEncoder(9, 15, false));
EmbeddedChannel decoderChannel = new EmbeddedChannel(
ZlibCodecFactory.newZlibDecoder(ZlibWrapper.NONE));
// initialize
byte[] payload = new byte[300];
random.nextBytes(payload);
BinaryWebSocketFrame frame = new BinaryWebSocketFrame(true,
WebSocketExtension.RSV3, Unpooled.wrappedBuffer(payload));
// execute
encoderChannel.writeOutbound(frame);
BinaryWebSocketFrame compressedFrame = encoderChannel.readOutbound();
// test
assertNotNull(compressedFrame);
assertNotNull(compressedFrame.content());
assertTrue(compressedFrame instanceof BinaryWebSocketFrame);
assertEquals(WebSocketExtension.RSV1 | WebSocketExtension.RSV3, compressedFrame.rsv());
decoderChannel.writeInbound(compressedFrame.content());
decoderChannel.writeInbound(DeflateDecoder.FRAME_TAIL);
ByteBuf uncompressedPayload = decoderChannel.readInbound();
assertEquals(300, uncompressedPayload.readableBytes());
byte[] finalPayload = new byte[300];
uncompressedPayload.readBytes(finalPayload);
assertTrue(Arrays.equals(finalPayload, payload));
uncompressedPayload.release();
}
@Test
public void testCompressedFrame() {
EmbeddedChannel encoderChannel = new EmbeddedChannel(
ZlibCodecFactory.newZlibEncoder(ZlibWrapper.NONE, 9, 15, 8));
EmbeddedChannel decoderChannel = new EmbeddedChannel(new PerFrameDeflateDecoder(false));
// initialize
byte[] payload = new byte[300];
random.nextBytes(payload);
encoderChannel.writeOutbound(Unpooled.wrappedBuffer(payload));
ByteBuf compressedPayload = encoderChannel.readOutbound();
BinaryWebSocketFrame compressedFrame = new BinaryWebSocketFrame(true,
WebSocketExtension.RSV1 | WebSocketExtension.RSV3,
compressedPayload.slice(0, compressedPayload.readableBytes() - 4));
// execute
decoderChannel.writeInbound(compressedFrame);
BinaryWebSocketFrame uncompressedFrame = decoderChannel.readInbound();
// test
assertNotNull(uncompressedFrame);
assertNotNull(uncompressedFrame.content());
assertTrue(uncompressedFrame instanceof BinaryWebSocketFrame);
assertEquals(WebSocketExtension.RSV3, uncompressedFrame.rsv());
assertEquals(300, uncompressedFrame.content().readableBytes());
byte[] finalPayload = new byte[300];
uncompressedFrame.content().readBytes(finalPayload);
assertTrue(Arrays.equals(finalPayload, payload));
uncompressedFrame.release();
}
@Test
public void testCompressedFrame() {
EmbeddedChannel encoderChannel = new EmbeddedChannel(
ZlibCodecFactory.newZlibEncoder(ZlibWrapper.NONE, 9, 15, 8));
EmbeddedChannel decoderChannel = new EmbeddedChannel(new PerMessageDeflateDecoder(false));
// initialize
byte[] payload = new byte[300];
random.nextBytes(payload);
encoderChannel.writeOutbound(Unpooled.wrappedBuffer(payload));
ByteBuf compressedPayload = encoderChannel.readOutbound();
BinaryWebSocketFrame compressedFrame = new BinaryWebSocketFrame(true,
WebSocketExtension.RSV1 | WebSocketExtension.RSV3,
compressedPayload.slice(0, compressedPayload.readableBytes() - 4));
// execute
decoderChannel.writeInbound(compressedFrame);
BinaryWebSocketFrame uncompressedFrame = decoderChannel.readInbound();
// test
assertNotNull(uncompressedFrame);
assertNotNull(uncompressedFrame.content());
assertTrue(uncompressedFrame instanceof BinaryWebSocketFrame);
assertEquals(WebSocketExtension.RSV3, uncompressedFrame.rsv());
assertEquals(300, uncompressedFrame.content().readableBytes());
byte[] finalPayload = new byte[300];
uncompressedFrame.content().readBytes(finalPayload);
assertTrue(Arrays.equals(finalPayload, payload));
uncompressedFrame.release();
}
@Test
public void testCompressedFrame() {
EmbeddedChannel encoderChannel = new EmbeddedChannel(new PerFrameDeflateEncoder(9, 15, false));
EmbeddedChannel decoderChannel = new EmbeddedChannel(
ZlibCodecFactory.newZlibDecoder(ZlibWrapper.NONE));
// initialize
byte[] payload = new byte[300];
random.nextBytes(payload);
BinaryWebSocketFrame frame = new BinaryWebSocketFrame(true,
WebSocketExtension.RSV3, Unpooled.wrappedBuffer(payload));
// execute
encoderChannel.writeOutbound(frame);
BinaryWebSocketFrame compressedFrame = encoderChannel.readOutbound();
// test
assertNotNull(compressedFrame);
assertNotNull(compressedFrame.content());
assertTrue(compressedFrame instanceof BinaryWebSocketFrame);
assertEquals(WebSocketExtension.RSV1 | WebSocketExtension.RSV3, compressedFrame.rsv());
decoderChannel.writeInbound(compressedFrame.content());
decoderChannel.writeInbound(DeflateDecoder.FRAME_TAIL);
ByteBuf uncompressedPayload = decoderChannel.readInbound();
assertEquals(300, uncompressedPayload.readableBytes());
byte[] finalPayload = new byte[300];
uncompressedPayload.readBytes(finalPayload);
assertTrue(Arrays.equals(finalPayload, payload));
uncompressedPayload.release();
}
@Test
public void testGetTargetContentEncoding() throws Exception {
HttpContentCompressor compressor = new HttpContentCompressor();
String[] tests = {
// Accept-Encoding -> Content-Encoding
"", null,
"*", "gzip",
"*;q=0.0", null,
"gzip", "gzip",
"compress, gzip;q=0.5", "gzip",
"gzip; q=0.5, identity", "gzip",
"gzip ; q=0.1", "gzip",
"gzip; q=0, deflate", "deflate",
" deflate ; q=0 , *;q=0.5", "gzip",
};
for (int i = 0; i < tests.length; i += 2) {
String acceptEncoding = tests[i];
String contentEncoding = tests[i + 1];
ZlibWrapper targetWrapper = compressor.determineWrapper(acceptEncoding);
String targetEncoding = null;
if (targetWrapper != null) {
switch (targetWrapper) {
case GZIP:
targetEncoding = "gzip";
break;
case ZLIB:
targetEncoding = "deflate";
break;
default:
fail();
}
}
assertEquals(contentEncoding, targetEncoding);
}
}
/**
* Returns a new {@link EmbeddedChannel} that encodes the HTTP2 message content encoded in the specified
* {@code contentEncoding}.
*
* @param ctx the context.
* @param contentEncoding the value of the {@code content-encoding} header
* @return a new {@link ByteToMessageDecoder} if the specified encoding is supported. {@code null} otherwise
* (alternatively, you can throw a {@link Http2Exception} to block unknown encoding).
* @throws Http2Exception If the specified encoding is not not supported and warrants an exception
*/
protected EmbeddedChannel newContentCompressor(ChannelHandlerContext ctx, CharSequence contentEncoding)
throws Http2Exception {
if (GZIP.contentEqualsIgnoreCase(contentEncoding) || X_GZIP.contentEqualsIgnoreCase(contentEncoding)) {
return newCompressionChannel(ctx, ZlibWrapper.GZIP);
}
if (DEFLATE.contentEqualsIgnoreCase(contentEncoding) || X_DEFLATE.contentEqualsIgnoreCase(contentEncoding)) {
return newCompressionChannel(ctx, ZlibWrapper.ZLIB);
}
// 'identity' or unsupported
return null;
}
private void enableGzip(ChannelHandlerContext ctx) {
ChannelPipeline p = ctx.pipeline();
p.addLast("gzipdeflater", ZlibCodecFactory.newZlibEncoder(ZlibWrapper.GZIP));
p.addLast("gzipinflater", ZlibCodecFactory.newZlibDecoder(ZlibWrapper.GZIP));
p.addLast("unificationB", new PortUnificationServerHandler(sslCtx, detectSsl, false));
p.remove(this);
}
private void enableGzip(ChannelHandlerContext ctx) {
ChannelPipeline p = ctx.pipeline();
p.addLast("gzipdeflater", ZlibCodecFactory.newZlibEncoder(ZlibWrapper.GZIP));
p.addLast("gzipinflater", ZlibCodecFactory.newZlibDecoder(ZlibWrapper.GZIP));
p.addLast("unificationB", new PortUnificationServerHandler(sslCtx, detectSsl, false));
p.remove(this);
}
@Override
protected Result beginEncode(HttpResponse headers, String acceptEncoding) throws Exception {
String contentEncoding = headers.headers().get(HttpHeaders.Names.CONTENT_ENCODING);
if (contentEncoding != null &&
!HttpHeaders.Values.IDENTITY.equalsIgnoreCase(contentEncoding)) {
return null;
}
ZlibWrapper wrapper = determineWrapper(acceptEncoding);
if (wrapper == null) {
return null;
}
String targetContentEncoding;
switch (wrapper) {
case GZIP:
targetContentEncoding = "gzip";
break;
case ZLIB:
targetContentEncoding = "deflate";
break;
default:
throw new Error();
}
return new Result(
targetContentEncoding,
new EmbeddedChannel(ZlibCodecFactory.newZlibEncoder(
wrapper, compressionLevel, windowBits, memLevel)));
}
@Test
public void testGetTargetContentEncoding() throws Exception {
HttpContentCompressor compressor = new HttpContentCompressor();
String[] tests = {
// Accept-Encoding -> Content-Encoding
"", null,
"*", "gzip",
"*;q=0.0", null,
"gzip", "gzip",
"compress, gzip;q=0.5", "gzip",
"gzip; q=0.5, identity", "gzip",
"gzip ; q=0.1", "gzip",
"gzip; q=0, deflate", "deflate",
" deflate ; q=0 , *;q=0.5", "gzip",
};
for (int i = 0; i < tests.length; i += 2) {
String acceptEncoding = tests[i];
String contentEncoding = tests[i + 1];
ZlibWrapper targetWrapper = compressor.determineWrapper(acceptEncoding);
String targetEncoding = null;
if (targetWrapper != null) {
switch (targetWrapper) {
case GZIP:
targetEncoding = "gzip";
break;
case ZLIB:
targetEncoding = "deflate";
break;
default:
fail();
}
}
assertEquals(contentEncoding, targetEncoding);
}
}
@SuppressWarnings("rawtypes")
public RemoteRxServer(Builder builder) {
port = builder.getPort();
// setup configuration state for server
Map<String, ServeConfig> configuredObservables = new HashMap<String, ServeConfig>();
// add configs
for (ServeConfig config : builder.getObservablesConfigured()) {
String observableName = config.getName();
logger.debug("RemoteRxServer configured with remote observable: " + observableName);
configuredObservables.put(observableName, config);
}
metrics = new RxMetrics();
// create server
RxServer<RemoteRxEvent, List<RemoteRxEvent>> server
= RxNetty.newTcpServerBuilder(port, new RemoteObservableConnectionHandler(configuredObservables, builder.getIngressPolicy(),
metrics, writeBufferTimeMSec))
.pipelineConfigurator(new PipelineConfiguratorComposite<RemoteRxEvent, List<RemoteRxEvent>>(
new PipelineConfigurator<RemoteRxEvent, RemoteRxEvent>() {
@Override
public void configureNewPipeline(ChannelPipeline pipeline) {
if (enableNettyLogging) {
pipeline.addFirst(new LoggingHandler(LogLevel.ERROR)); // uncomment to enable debug logging
}
if (enableHeartBeating) {
pipeline.addLast("idleStateHandler", new IdleStateHandler(10, 2, 0));
pipeline.addLast("heartbeat", new HeartbeatHandler());
}
if (enableCompression) {
pipeline.addLast("gzipInflater", new JdkZlibEncoder(ZlibWrapper.GZIP));
pipeline.addLast("gzipDeflater", new JdkZlibDecoder(ZlibWrapper.GZIP));
}
pipeline.addLast("frameEncoder", new LengthFieldPrepender(4)); // 4 bytes to encode length
pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(maxFrameLength, 0, 4, 0, 4)); // max frame = half MB
}
}, new BatchedRxEventPipelineConfigurator()))
.channelOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(1024 * 1024, 5 * 1024 * 1024))
.build();
this.server = server;
logger.info("RemoteRxServer started on port: " + port);
}
private static <T> Observable<T> createTcpConnectionToServer(final ConnectToObservable<T> params,
final RemoteUnsubscribe remoteUnsubscribe, final RxMetrics metrics,
final Action0 connectionDisconnectCallback, Observable<Integer> closeTrigger) {
final Decoder<T> decoder = params.getDecoder();
loadFastProperties();
return
RxNetty.createTcpClient(params.getHost(), params.getPort(), new PipelineConfiguratorComposite<RemoteRxEvent, List<RemoteRxEvent>>(
new PipelineConfigurator<RemoteRxEvent, List<RemoteRxEvent>>() {
@Override
public void configureNewPipeline(ChannelPipeline pipeline) {
if (enableNettyLogging) {
pipeline.addFirst(new LoggingHandler(LogLevel.ERROR)); // uncomment to enable debug logging
}
if (enableHeartBeating) {
pipeline.addLast("idleStateHandler", new IdleStateHandler(10, 2, 0));
pipeline.addLast("heartbeat", new HeartbeatHandler());
}
if (enableCompression) {
pipeline.addLast("gzipInflater", new JdkZlibEncoder(ZlibWrapper.GZIP));
pipeline.addLast("gzipDeflater", new JdkZlibDecoder(ZlibWrapper.GZIP));
}
pipeline.addLast("frameEncoder", new LengthFieldPrepender(4)); // 4 bytes to encode length
pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(maxFrameLength, 0, 4, 0, 4)); // max frame = half MB
}
}, new BatchedRxEventPipelineConfigurator()))
.connect()
// send subscription request, get input stream
.flatMap(new Func1<ObservableConnection<RemoteRxEvent, List<RemoteRxEvent>>, Observable<RemoteRxEvent>>() {
@Override
public Observable<RemoteRxEvent> call(final ObservableConnection<RemoteRxEvent, List<RemoteRxEvent>> connection) {
connection.writeAndFlush(RemoteRxEvent.subscribed(params.getName(), params.getSubscribeParameters())); // send subscribe event to server
remoteUnsubscribe.setConnection(connection);
return connection.getInput()
.lift(new DropOperator<RemoteRxEvent>("incoming_" + RemoteObservable.class.getCanonicalName() + "_createTcpConnectionToServer"));
}
})
.doOnCompleted(new Action0() {
@Override
public void call() {
// connection completed
logger.warn("Detected connection completed when trying to connect to host: " + params.getHost() + " port: " + params.getPort());
connectionDisconnectCallback.call();
}
})
.onErrorResumeNext(new Func1<Throwable, Observable<RemoteRxEvent>>() {
@Override
public Observable<RemoteRxEvent> call(Throwable t1) {
logger.warn("Detected connection error when trying to connect to host: " + params.getHost() + " port: " + params.getPort(), t1);
connectionDisconnectCallback.call();
// complete if error occurs
return Observable.empty();
}
})
.takeUntil(closeTrigger)
.map(new Func1<RemoteRxEvent, Notification<T>>() {
@Override
public Notification<T> call(RemoteRxEvent rxEvent) {
if (rxEvent.getType() == RemoteRxEvent.Type.next) {
metrics.incrementNextCount();
return Notification.createOnNext(decoder.decode(rxEvent.getData()));
} else if (rxEvent.getType() == RemoteRxEvent.Type.error) {
metrics.incrementErrorCount();
return Notification.createOnError(fromBytesToThrowable(rxEvent.getData()));
} else if (rxEvent.getType() == RemoteRxEvent.Type.completed) {
metrics.incrementCompletedCount();
return Notification.createOnCompleted();
} else {
throw new RuntimeException("RemoteRxEvent of type: " + rxEvent.getType() + ", not supported.");
}
}
})
.<T>dematerialize()
.doOnEach(new Observer<T>() {
@Override
public void onCompleted() {
logger.info("RemoteRxEvent: " + params.getName() + " onCompleted()");
}
@Override
public void onError(Throwable e) {
logger.error("RemoteRxEvent: " + params.getName() + " onError()", e);
}
@Override
public void onNext(T t) {
if (logger.isDebugEnabled()) {
logger.debug("RemoteRxEvent: " + params.getName() + " onNext(): " + t);
}
}
});
}
static boolean isCompressionEnabled(String acceptEncoding) {
if (acceptEncoding == null) {
return false;
}
return instance.determineWrapper(acceptEncoding) != ZlibWrapper.NONE;
}
@SuppressWarnings("FloatingPointEquality")
protected ZlibWrapper determineWrapper(String acceptEncoding) {
float starQ = -1.0f;
float gzipQ = -1.0f;
float deflateQ = -1.0f;
for (String encoding : acceptEncoding.split(",")) {
float q = 1.0f;
int equalsPos = encoding.indexOf('=');
if (equalsPos != -1) {
try {
q = Float.parseFloat(encoding.substring(equalsPos + 1));
} catch (NumberFormatException e) {
// Ignore encoding
q = 0.0f;
}
}
if (encoding.contains("*")) {
starQ = q;
} else if (encoding.contains("gzip") && q > gzipQ) {
gzipQ = q;
} else if (encoding.contains("deflate") && q > deflateQ) {
deflateQ = q;
}
}
if (gzipQ > 0.0f || deflateQ > 0.0f) {
if (gzipQ >= deflateQ) {
return ZlibWrapper.GZIP;
} else {
return ZlibWrapper.ZLIB;
}
}
if (starQ > 0.0f) {
if (gzipQ == -1.0f) {
return ZlibWrapper.GZIP;
}
if (deflateQ == -1.0f) {
return ZlibWrapper.ZLIB;
}
}
return null;
}
@Test
public void testFramementedFrame() {
EmbeddedChannel encoderChannel = new EmbeddedChannel(new PerMessageDeflateEncoder(9, 15, false));
EmbeddedChannel decoderChannel = new EmbeddedChannel(
ZlibCodecFactory.newZlibDecoder(ZlibWrapper.NONE));
// initialize
byte[] payload1 = new byte[100];
random.nextBytes(payload1);
byte[] payload2 = new byte[100];
random.nextBytes(payload2);
byte[] payload3 = new byte[100];
random.nextBytes(payload3);
BinaryWebSocketFrame frame1 = new BinaryWebSocketFrame(false,
WebSocketExtension.RSV3, Unpooled.wrappedBuffer(payload1));
ContinuationWebSocketFrame frame2 = new ContinuationWebSocketFrame(false,
WebSocketExtension.RSV3, Unpooled.wrappedBuffer(payload2));
ContinuationWebSocketFrame frame3 = new ContinuationWebSocketFrame(true,
WebSocketExtension.RSV3, Unpooled.wrappedBuffer(payload3));
// execute
encoderChannel.writeOutbound(frame1);
encoderChannel.writeOutbound(frame2);
encoderChannel.writeOutbound(frame3);
BinaryWebSocketFrame compressedFrame1 = encoderChannel.readOutbound();
ContinuationWebSocketFrame compressedFrame2 = encoderChannel.readOutbound();
ContinuationWebSocketFrame compressedFrame3 = encoderChannel.readOutbound();
// test
assertNotNull(compressedFrame1);
assertNotNull(compressedFrame2);
assertNotNull(compressedFrame3);
assertEquals(WebSocketExtension.RSV1 | WebSocketExtension.RSV3, compressedFrame1.rsv());
assertEquals(WebSocketExtension.RSV3, compressedFrame2.rsv());
assertEquals(WebSocketExtension.RSV3, compressedFrame3.rsv());
assertFalse(compressedFrame1.isFinalFragment());
assertFalse(compressedFrame2.isFinalFragment());
assertTrue(compressedFrame3.isFinalFragment());
decoderChannel.writeInbound(compressedFrame1.content());
ByteBuf uncompressedPayload1 = decoderChannel.readInbound();
byte[] finalPayload1 = new byte[100];
uncompressedPayload1.readBytes(finalPayload1);
assertTrue(Arrays.equals(finalPayload1, payload1));
uncompressedPayload1.release();
decoderChannel.writeInbound(compressedFrame2.content());
ByteBuf uncompressedPayload2 = decoderChannel.readInbound();
byte[] finalPayload2 = new byte[100];
uncompressedPayload2.readBytes(finalPayload2);
assertTrue(Arrays.equals(finalPayload2, payload2));
uncompressedPayload2.release();
decoderChannel.writeInbound(compressedFrame3.content());
decoderChannel.writeInbound(DeflateDecoder.FRAME_TAIL);
ByteBuf uncompressedPayload3 = decoderChannel.readInbound();
byte[] finalPayload3 = new byte[100];
uncompressedPayload3.readBytes(finalPayload3);
assertTrue(Arrays.equals(finalPayload3, payload3));
uncompressedPayload3.release();
}
@Test
public void testFramementedFrame() {
EmbeddedChannel encoderChannel = new EmbeddedChannel(
ZlibCodecFactory.newZlibEncoder(ZlibWrapper.NONE, 9, 15, 8));
EmbeddedChannel decoderChannel = new EmbeddedChannel(new PerMessageDeflateDecoder(false));
// initialize
byte[] payload = new byte[300];
random.nextBytes(payload);
encoderChannel.writeOutbound(Unpooled.wrappedBuffer(payload));
ByteBuf compressedPayload = encoderChannel.readOutbound();
compressedPayload = compressedPayload.slice(0, compressedPayload.readableBytes() - 4);
int oneThird = compressedPayload.readableBytes() / 3;
BinaryWebSocketFrame compressedFrame1 = new BinaryWebSocketFrame(false,
WebSocketExtension.RSV1 | WebSocketExtension.RSV3,
compressedPayload.slice(0, oneThird));
ContinuationWebSocketFrame compressedFrame2 = new ContinuationWebSocketFrame(false,
WebSocketExtension.RSV3, compressedPayload.slice(oneThird, oneThird));
ContinuationWebSocketFrame compressedFrame3 = new ContinuationWebSocketFrame(true,
WebSocketExtension.RSV3, compressedPayload.slice(oneThird * 2,
compressedPayload.readableBytes() - oneThird * 2));
// execute
decoderChannel.writeInbound(compressedFrame1.retain());
decoderChannel.writeInbound(compressedFrame2.retain());
decoderChannel.writeInbound(compressedFrame3);
BinaryWebSocketFrame uncompressedFrame1 = decoderChannel.readInbound();
ContinuationWebSocketFrame uncompressedFrame2 = decoderChannel.readInbound();
ContinuationWebSocketFrame uncompressedFrame3 = decoderChannel.readInbound();
// test
assertNotNull(uncompressedFrame1);
assertNotNull(uncompressedFrame2);
assertNotNull(uncompressedFrame3);
assertEquals(WebSocketExtension.RSV3, uncompressedFrame1.rsv());
assertEquals(WebSocketExtension.RSV3, uncompressedFrame2.rsv());
assertEquals(WebSocketExtension.RSV3, uncompressedFrame3.rsv());
ByteBuf finalPayloadWrapped = Unpooled.wrappedBuffer(uncompressedFrame1.content(),
uncompressedFrame2.content(), uncompressedFrame3.content());
assertEquals(300, finalPayloadWrapped.readableBytes());
byte[] finalPayload = new byte[300];
finalPayloadWrapped.readBytes(finalPayload);
assertTrue(Arrays.equals(finalPayload, payload));
finalPayloadWrapped.release();
}
@Test
public void testMultiCompressedPayloadWithinFrame() {
EmbeddedChannel encoderChannel = new EmbeddedChannel(
ZlibCodecFactory.newZlibEncoder(ZlibWrapper.NONE, 9, 15, 8));
EmbeddedChannel decoderChannel = new EmbeddedChannel(new PerMessageDeflateDecoder(false));
// initialize
byte[] payload1 = new byte[100];
random.nextBytes(payload1);
byte[] payload2 = new byte[100];
random.nextBytes(payload2);
encoderChannel.writeOutbound(Unpooled.wrappedBuffer(payload1));
ByteBuf compressedPayload1 = encoderChannel.readOutbound();
encoderChannel.writeOutbound(Unpooled.wrappedBuffer(payload2));
ByteBuf compressedPayload2 = encoderChannel.readOutbound();
BinaryWebSocketFrame compressedFrame = new BinaryWebSocketFrame(true,
WebSocketExtension.RSV1 | WebSocketExtension.RSV3,
Unpooled.wrappedBuffer(
compressedPayload1,
compressedPayload2.slice(0, compressedPayload2.readableBytes() - 4)));
// execute
decoderChannel.writeInbound(compressedFrame);
BinaryWebSocketFrame uncompressedFrame = decoderChannel.readInbound();
// test
assertNotNull(uncompressedFrame);
assertNotNull(uncompressedFrame.content());
assertTrue(uncompressedFrame instanceof BinaryWebSocketFrame);
assertEquals(WebSocketExtension.RSV3, uncompressedFrame.rsv());
assertEquals(200, uncompressedFrame.content().readableBytes());
byte[] finalPayload1 = new byte[100];
uncompressedFrame.content().readBytes(finalPayload1);
assertTrue(Arrays.equals(finalPayload1, payload1));
byte[] finalPayload2 = new byte[100];
uncompressedFrame.content().readBytes(finalPayload2);
assertTrue(Arrays.equals(finalPayload2, payload2));
uncompressedFrame.release();
}
@Test
public void testFramementedFrame() {
EmbeddedChannel encoderChannel = new EmbeddedChannel(new PerFrameDeflateEncoder(9, 15, false));
EmbeddedChannel decoderChannel = new EmbeddedChannel(
ZlibCodecFactory.newZlibDecoder(ZlibWrapper.NONE));
// initialize
byte[] payload1 = new byte[100];
random.nextBytes(payload1);
byte[] payload2 = new byte[100];
random.nextBytes(payload2);
byte[] payload3 = new byte[100];
random.nextBytes(payload3);
BinaryWebSocketFrame frame1 = new BinaryWebSocketFrame(false,
WebSocketExtension.RSV3, Unpooled.wrappedBuffer(payload1));
ContinuationWebSocketFrame frame2 = new ContinuationWebSocketFrame(false,
WebSocketExtension.RSV3, Unpooled.wrappedBuffer(payload2));
ContinuationWebSocketFrame frame3 = new ContinuationWebSocketFrame(true,
WebSocketExtension.RSV3, Unpooled.wrappedBuffer(payload3));
// execute
encoderChannel.writeOutbound(frame1);
encoderChannel.writeOutbound(frame2);
encoderChannel.writeOutbound(frame3);
BinaryWebSocketFrame compressedFrame1 = encoderChannel.readOutbound();
ContinuationWebSocketFrame compressedFrame2 = encoderChannel.readOutbound();
ContinuationWebSocketFrame compressedFrame3 = encoderChannel.readOutbound();
// test
assertNotNull(compressedFrame1);
assertNotNull(compressedFrame2);
assertNotNull(compressedFrame3);
assertEquals(WebSocketExtension.RSV1 | WebSocketExtension.RSV3, compressedFrame1.rsv());
assertEquals(WebSocketExtension.RSV1 | WebSocketExtension.RSV3, compressedFrame2.rsv());
assertEquals(WebSocketExtension.RSV1 | WebSocketExtension.RSV3, compressedFrame3.rsv());
assertFalse(compressedFrame1.isFinalFragment());
assertFalse(compressedFrame2.isFinalFragment());
assertTrue(compressedFrame3.isFinalFragment());
decoderChannel.writeInbound(compressedFrame1.content());
decoderChannel.writeInbound(Unpooled.wrappedBuffer(DeflateDecoder.FRAME_TAIL));
ByteBuf uncompressedPayload1 = decoderChannel.readInbound();
byte[] finalPayload1 = new byte[100];
uncompressedPayload1.readBytes(finalPayload1);
assertTrue(Arrays.equals(finalPayload1, payload1));
uncompressedPayload1.release();
decoderChannel.writeInbound(compressedFrame2.content());
decoderChannel.writeInbound(Unpooled.wrappedBuffer(DeflateDecoder.FRAME_TAIL));
ByteBuf uncompressedPayload2 = decoderChannel.readInbound();
byte[] finalPayload2 = new byte[100];
uncompressedPayload2.readBytes(finalPayload2);
assertTrue(Arrays.equals(finalPayload2, payload2));
uncompressedPayload2.release();
decoderChannel.writeInbound(compressedFrame3.content());
decoderChannel.writeInbound(Unpooled.wrappedBuffer(DeflateDecoder.FRAME_TAIL));
ByteBuf uncompressedPayload3 = decoderChannel.readInbound();
byte[] finalPayload3 = new byte[100];
uncompressedPayload3.readBytes(finalPayload3);
assertTrue(Arrays.equals(finalPayload3, payload3));
uncompressedPayload3.release();
}
/**
* Generate a new instance of an {@link EmbeddedChannel} capable of compressing data
* @param ctx the context.
* @param wrapper Defines what type of encoder should be used
*/
private EmbeddedChannel newCompressionChannel(final ChannelHandlerContext ctx, ZlibWrapper wrapper) {
return new EmbeddedChannel(ctx.channel().id(), ctx.channel().metadata().hasDisconnect(),
ctx.channel().config(), ZlibCodecFactory.newZlibEncoder(wrapper, compressionLevel, windowBits,
memLevel));
}
@Substitute
public static ZlibEncoder newZlibEncoder(ZlibWrapper wrapper) {
return new JdkZlibEncoder(wrapper);
}