下面列出了怎么用 io.netty.handler.codec.compression.JdkZlibEncoder 的API类实例代码及写法,或者点击链接到github查看源代码。
/**
 * Switches stream compression on or off for this connection.
 *
 * <p>Does nothing when the connection is already closed. Otherwise the flag is
 * forwarded to the superclass and the two pipeline slots named
 * {@code "compressionEncoder"} and {@code "compressionDecoder"} are replaced in
 * place: with zlib codecs when enabling, with pass-through adapters when disabling.
 *
 * @param compressionEnabled {@code true} to install zlib codecs, {@code false}
 *                           to install no-op adapters
 */
@Override
public void setCompressionEnabled(boolean compressionEnabled) {
    if (isClosed()) {
        return;
    }

    super.setCompressionEnabled(compressionEnabled);

    // The slots keep their names so that later toggles can find them again.
    if (compressionEnabled) {
        channel.pipeline().replace("compressionEncoder", "compressionEncoder", new JdkZlibEncoder());
        channel.pipeline().replace("compressionDecoder", "compressionDecoder", new JdkZlibDecoder());
    } else {
        channel.pipeline().replace("compressionEncoder", "compressionEncoder",
                new ChannelOutboundHandlerAdapter());
        channel.pipeline().replace("compressionDecoder", "compressionDecoder",
                new ChannelInboundHandlerAdapter());
    }
}
/**
 * Builds a remote Rx server from the supplied builder.
 *
 * <p>The constructor records the port, indexes every configured observable by
 * name, creates a fresh {@link RxMetrics} instance and assembles the RxNetty TCP
 * server with its channel pipeline. The server is only built here.
 *
 * @param builder source of the port, the configured observables and the ingress policy
 */
@SuppressWarnings("rawtypes")
public RemoteRxServer(Builder builder) {
    port = builder.getPort();

    // Index each served observable by its name for the connection handler.
    Map<String, ServeConfig> observablesByName = new HashMap<String, ServeConfig>();
    for (ServeConfig serveConfig : builder.getObservablesConfigured()) {
        String name = serveConfig.getName();
        logger.debug("RemoteRxServer configured with remote observable: " + name);
        observablesByName.put(name, serveConfig);
    }

    metrics = new RxMetrics();

    // Transport-level handlers; each group is installed only when its flag is set.
    PipelineConfigurator<RemoteRxEvent, RemoteRxEvent> transportConfigurator =
            new PipelineConfigurator<RemoteRxEvent, RemoteRxEvent>() {
                @Override
                public void configureNewPipeline(ChannelPipeline pipeline) {
                    if (enableNettyLogging) {
                        pipeline.addFirst(new LoggingHandler(LogLevel.ERROR));
                    }
                    if (enableHeartBeating) {
                        pipeline.addLast("idleStateHandler", new IdleStateHandler(10, 2, 0));
                        pipeline.addLast("heartbeat", new HeartbeatHandler());
                    }
                    if (enableCompression) {
                        // NOTE(review): the names look swapped (the encoder is registered as
                        // "gzipInflater"); they are runtime identifiers and are kept as-is.
                        pipeline.addLast("gzipInflater", new JdkZlibEncoder(ZlibWrapper.GZIP));
                        pipeline.addLast("gzipDeflater", new JdkZlibDecoder(ZlibWrapper.GZIP));
                    }
                    // Length-prefixed framing with a 4-byte length field.
                    pipeline.addLast("frameEncoder", new LengthFieldPrepender(4));
                    pipeline.addLast("frameDecoder",
                            new LengthFieldBasedFrameDecoder(maxFrameLength, 0, 4, 0, 4));
                }
            };

    RxServer<RemoteRxEvent, List<RemoteRxEvent>> builtServer =
            RxNetty.newTcpServerBuilder(
                            port,
                            new RemoteObservableConnectionHandler(
                                    observablesByName, builder.getIngressPolicy(), metrics, writeBufferTimeMSec))
                    .pipelineConfigurator(
                            new PipelineConfiguratorComposite<RemoteRxEvent, List<RemoteRxEvent>>(
                                    transportConfigurator, new BatchedRxEventPipelineConfigurator()))
                    .channelOption(
                            ChannelOption.WRITE_BUFFER_WATER_MARK,
                            new WriteBufferWaterMark(1024 * 1024, 5 * 1024 * 1024))
                    .build();

    this.server = builtServer;
    logger.info("RemoteRxServer started on port: " + port);
}
/**
 * Opens a TCP connection to the remote observable described by {@code params} and
 * exposes the events received over it as an {@code Observable<T>}.
 *
 * <p>After connecting, a subscribe event carrying the observable name and the
 * subscribe parameters is written to the server and the connection is handed to
 * {@code remoteUnsubscribe}. Incoming {@code RemoteRxEvent}s are converted to
 * notifications (next / error / completed), counted in {@code metrics}, and
 * dematerialized into the returned stream. Connection completion and connection
 * errors both invoke {@code connectionDisconnectCallback}; a connection error is
 * converted into normal completion.
 *
 * @param params                       host, port, name, subscribe parameters and decoder of the remote observable
 * @param remoteUnsubscribe            receives the established connection via {@code setConnection}
 * @param metrics                      counters incremented per received next/error/completed event
 * @param connectionDisconnectCallback invoked when the connection completes or fails
 * @param closeTrigger                 the event stream is cut off once this observable emits (via {@code takeUntil})
 * @return an observable of decoded values from the remote side
 */
private static <T> Observable<T> createTcpConnectionToServer(final ConnectToObservable<T> params,
final RemoteUnsubscribe remoteUnsubscribe, final RxMetrics metrics,
final Action0 connectionDisconnectCallback, Observable<Integer> closeTrigger) {
final Decoder<T> decoder = params.getDecoder();
// NOTE(review): presumably refreshes the enable*/maxFrameLength settings read below — confirm.
loadFastProperties();
return
RxNetty.createTcpClient(params.getHost(), params.getPort(), new PipelineConfiguratorComposite<RemoteRxEvent, List<RemoteRxEvent>>(
new PipelineConfigurator<RemoteRxEvent, List<RemoteRxEvent>>() {
@Override
public void configureNewPipeline(ChannelPipeline pipeline) {
if (enableNettyLogging) {
pipeline.addFirst(new LoggingHandler(LogLevel.ERROR)); // installed only when enableNettyLogging is set; logs at ERROR level
}
if (enableHeartBeating) {
// IdleStateHandler arguments: reader idle 10 s, writer idle 2 s, all-idle disabled (0)
pipeline.addLast("idleStateHandler", new IdleStateHandler(10, 2, 0));
pipeline.addLast("heartbeat", new HeartbeatHandler());
}
if (enableCompression) {
// NOTE(review): handler names look swapped relative to their roles (the encoder is named "gzipInflater")
pipeline.addLast("gzipInflater", new JdkZlibEncoder(ZlibWrapper.GZIP));
pipeline.addLast("gzipDeflater", new JdkZlibDecoder(ZlibWrapper.GZIP));
}
pipeline.addLast("frameEncoder", new LengthFieldPrepender(4)); // 4 bytes to encode length
pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(maxFrameLength, 0, 4, 0, 4)); // 4-byte length field at offset 0, stripped from the decoded frame; limit is maxFrameLength
}
}, new BatchedRxEventPipelineConfigurator()))
.connect()
// send subscription request, get input stream
.flatMap(new Func1<ObservableConnection<RemoteRxEvent, List<RemoteRxEvent>>, Observable<RemoteRxEvent>>() {
@Override
public Observable<RemoteRxEvent> call(final ObservableConnection<RemoteRxEvent, List<RemoteRxEvent>> connection) {
connection.writeAndFlush(RemoteRxEvent.subscribed(params.getName(), params.getSubscribeParameters())); // send subscribe event to server
// hand the live connection to the unsubscribe helper
remoteUnsubscribe.setConnection(connection);
return connection.getInput()
.lift(new DropOperator<RemoteRxEvent>("incoming_" + RemoteObservable.class.getCanonicalName() + "_createTcpConnectionToServer"));
}
})
.doOnCompleted(new Action0() {
@Override
public void call() {
// connection completed
logger.warn("Detected connection completed when trying to connect to host: " + params.getHost() + " port: " + params.getPort());
connectionDisconnectCallback.call();
}
})
.onErrorResumeNext(new Func1<Throwable, Observable<RemoteRxEvent>>() {
@Override
public Observable<RemoteRxEvent> call(Throwable t1) {
logger.warn("Detected connection error when trying to connect to host: " + params.getHost() + " port: " + params.getPort(), t1);
connectionDisconnectCallback.call();
// complete if error occurs
return Observable.empty();
}
})
// stop consuming events as soon as closeTrigger emits
.takeUntil(closeTrigger)
// translate each wire event into the matching Rx notification and count it
.map(new Func1<RemoteRxEvent, Notification<T>>() {
@Override
public Notification<T> call(RemoteRxEvent rxEvent) {
if (rxEvent.getType() == RemoteRxEvent.Type.next) {
metrics.incrementNextCount();
return Notification.createOnNext(decoder.decode(rxEvent.getData()));
} else if (rxEvent.getType() == RemoteRxEvent.Type.error) {
metrics.incrementErrorCount();
return Notification.createOnError(fromBytesToThrowable(rxEvent.getData()));
} else if (rxEvent.getType() == RemoteRxEvent.Type.completed) {
metrics.incrementCompletedCount();
return Notification.createOnCompleted();
} else {
// any other event type is unexpected on this stream
throw new RuntimeException("RemoteRxEvent of type: " + rxEvent.getType() + ", not supported.");
}
}
})
// turn the notifications back into onNext/onError/onCompleted signals
.<T>dematerialize()
// logging only: this observer does not alter the stream
.doOnEach(new Observer<T>() {
@Override
public void onCompleted() {
logger.info("RemoteRxEvent: " + params.getName() + " onCompleted()");
}
@Override
public void onError(Throwable e) {
logger.error("RemoteRxEvent: " + params.getName() + " onError()", e);
}
@Override
public void onNext(T t) {
// guard avoids building the message string when debug logging is off
if (logger.isDebugEnabled()) {
logger.debug("RemoteRxEvent: " + params.getName() + " onNext(): " + t);
}
}
});
}
/**
 * Starts the Netty server for the given application entity, unless it is already
 * reported as started.
 *
 * <p>The bind is performed on a short-lived single-thread executor and this method
 * blocks until that task finishes. On success the {@code start} flag is set. On
 * failure the error is logged, the event loop group is released and the method
 * returns normally without setting the flag.
 *
 * @param appEntity supplies the host (used for logging) and the port to bind
 * @throws Exception if waiting for the bootstrap task is interrupted or the task
 *                   itself fails unexpectedly
 */
@Override
public void start(AppEntity appEntity) throws Exception {
    if (started(appEntity)) {
        return;
    }
    final String host = appEntity.getHost();
    final int port = appEntity.getPort();

    // Fully qualified so that no additional import is required by this block.
    final java.util.concurrent.ExecutorService bootstrapExecutor = Executors.newSingleThreadExecutor();
    try {
        bootstrapExecutor.submit(new Callable<Object>() {
            @Override
            public Object call() {
                EventLoopGroup group = new NioEventLoopGroup(1);
                try {
                    ServerBootstrap b = new ServerBootstrap();
                    b.group(group)
                            .channel(NioServerSocketChannel.class)
                            // Per-connection socket options must be child options; they are
                            // not valid on the listening server channel.
                            .childOption(ChannelOption.TCP_NODELAY, true)
                            .childOption(ChannelOption.SO_KEEPALIVE, true)
                            .localAddress(new InetSocketAddress(port))
                            .option(ChannelOption.RCVBUF_ALLOCATOR, AdaptiveRecvByteBufAllocator.DEFAULT)
                            .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                            .childHandler(new ChannelInitializer<SocketChannel>() {
                                @Override
                                public void initChannel(SocketChannel ch) throws Exception {
                                    // NOTE(review): the zlib codecs sit after the object codecs, so
                                    // they likely never see raw bytes and compression is probably
                                    // not applied. The order is left unchanged because altering it
                                    // changes the wire format peers must agree on.
                                    ch.pipeline()
                                            .addLast(new NettyObjectDecoder(1024))
                                            .addLast(new NettyObjectEncoder())
                                            .addLast(new JdkZlibDecoder())
                                            .addLast(new JdkZlibEncoder())
                                            .addLast(new NettyServerHandler(getExcutorContainer()));
                                }
                            });
                    ChannelFuture f = b.bind().sync();
                    start.set(true);
                    LOG.info("netty server started successfully on {}:{}", host, port);
                    System.out.println("netty server started and listening for connections on " + f.channel().localAddress());
                    return f;
                } catch (Exception e) {
                    LOG.error("netty server failed to start on " + host + ":" + port, e);
                    // Release the event loop threads; nothing will ever use them after a failed bind.
                    group.shutdownGracefully();
                }
                return null;
            }
        }).get();
    } finally {
        // The executor exists only for the bootstrap task; without shutdown its
        // non-daemon worker thread would stay alive indefinitely.
        bootstrapExecutor.shutdown();
    }
}
/**
 * Creates a zlib encoder backed by {@link JdkZlibEncoder} with the given compression level.
 *
 * <p>NOTE(review): {@code @Substitute} suggests this replaces a factory method with the
 * same signature in a class not visible here — confirm against the enclosing class.
 *
 * @param compressionLevel compression level passed to the {@link JdkZlibEncoder} constructor
 * @return a new {@link JdkZlibEncoder}
 */
@Substitute
public static ZlibEncoder newZlibEncoder(int compressionLevel) {
return new JdkZlibEncoder(compressionLevel);
}
/**
 * Creates a zlib encoder backed by {@link JdkZlibEncoder} for the given wrapper format.
 *
 * @param wrapper stream wrapper passed to the {@link JdkZlibEncoder} constructor
 * @return a new {@link JdkZlibEncoder}
 */
@Substitute
public static ZlibEncoder newZlibEncoder(ZlibWrapper wrapper) {
return new JdkZlibEncoder(wrapper);
}
/**
 * Creates a zlib encoder backed by {@link JdkZlibEncoder} for the given wrapper format
 * and compression level.
 *
 * @param wrapper          stream wrapper passed to the {@link JdkZlibEncoder} constructor
 * @param compressionLevel compression level passed to the {@link JdkZlibEncoder} constructor
 * @return a new {@link JdkZlibEncoder}
 */
@Substitute
public static ZlibEncoder newZlibEncoder(ZlibWrapper wrapper, int compressionLevel) {
return new JdkZlibEncoder(wrapper, compressionLevel);
}
/**
 * Creates a zlib encoder backed by {@link JdkZlibEncoder} for the given wrapper format
 * and compression level.
 *
 * <p>{@code windowBits} and {@code memLevel} are accepted for signature compatibility
 * but are not forwarded to the encoder.
 *
 * @param wrapper          stream wrapper passed to the {@link JdkZlibEncoder} constructor
 * @param compressionLevel compression level passed to the {@link JdkZlibEncoder} constructor
 * @param windowBits       ignored by this implementation
 * @param memLevel         ignored by this implementation
 * @return a new {@link JdkZlibEncoder}
 */
@Substitute
public static ZlibEncoder newZlibEncoder(ZlibWrapper wrapper, int compressionLevel, int windowBits, int memLevel) {
return new JdkZlibEncoder(wrapper, compressionLevel);
}
/**
 * Creates a zlib encoder backed by {@link JdkZlibEncoder} that uses a preset dictionary.
 *
 * @param dictionary preset dictionary passed to the {@link JdkZlibEncoder} constructor
 * @return a new {@link JdkZlibEncoder}
 */
@Substitute
public static ZlibEncoder newZlibEncoder(byte[] dictionary) {
return new JdkZlibEncoder(dictionary);
}
/**
 * Creates a zlib encoder backed by {@link JdkZlibEncoder} with the given compression
 * level and preset dictionary.
 *
 * @param compressionLevel compression level passed to the {@link JdkZlibEncoder} constructor
 * @param dictionary       preset dictionary passed to the {@link JdkZlibEncoder} constructor
 * @return a new {@link JdkZlibEncoder}
 */
@Substitute
public static ZlibEncoder newZlibEncoder(int compressionLevel, byte[] dictionary) {
return new JdkZlibEncoder(compressionLevel, dictionary);
}
/**
 * Creates a zlib encoder backed by {@link JdkZlibEncoder} with the given compression
 * level and preset dictionary.
 *
 * <p>{@code windowBits} and {@code memLevel} are accepted for signature compatibility
 * but are not forwarded to the encoder.
 *
 * @param compressionLevel compression level passed to the {@link JdkZlibEncoder} constructor
 * @param windowBits       ignored by this implementation
 * @param memLevel         ignored by this implementation
 * @param dictionary       preset dictionary passed to the {@link JdkZlibEncoder} constructor
 * @return a new {@link JdkZlibEncoder}
 */
@Substitute
public static ZlibEncoder newZlibEncoder(int compressionLevel, int windowBits, int memLevel, byte[] dictionary) {
return new JdkZlibEncoder(compressionLevel, dictionary);
}