下面列出了 io.netty.channel.ChannelHandler 类的 API 用法示例代码，也可以点击链接到 GitHub 查看源代码。
/**
 * Builds the negotiation handler for the given gRPC connection handler.
 *
 * <p>The authority reported by {@code handler} is parsed into a host/port pair. A small
 * installer handler is created which, once added to a pipeline, creates an SSL engine for that
 * host/port and replaces itself with an {@link SslHandler}. The installer and {@code handler}
 * are then handed to a new {@code BufferUntilTlsNegotiatedHandler}.
 *
 * @param handler the gRPC HTTP/2 connection handler whose authority is used for the engine
 * @return the handler created from the installer and {@code handler}
 */
@Override
public Handler newHandler(GrpcHttp2ConnectionHandler handler) {
    final HostPort authority = parseAuthority(handler.getAuthority());

    ChannelHandler tlsInstaller = new ChannelHandlerAdapter() {
        @Override
        public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
            SSLEngine engine = sslContext.newEngine(ctx.alloc(), authority.host, authority.port);

            // Set the endpoint identification algorithm to "HTTPS" on the engine's parameters.
            SSLParameters params = engine.getSSLParameters();
            params.setEndpointIdentificationAlgorithm("HTTPS");
            engine.setSSLParameters(params);

            // Swap this installer for the real SSL handler.
            SslHandler tlsHandler = new SslHandler(engine, false);
            ctx.pipeline().replace(this, null, tlsHandler);
        }
    };

    return new BufferUntilTlsNegotiatedHandler(tlsInstaller, handler);
}
/**
 * Fires a successful handshake event while the pipeline's SSL handler reports the application
 * protocol {@code "badprotocol"}, then expects the channel to be closed and the gRPC handler
 * to be absent from the pipeline.
 */
@Test
public void tlsHandler_userEventTriggeredSslEvent_unsupportedProtocol() throws Exception {
    // SSL handler stub whose negotiated application protocol is always "badprotocol".
    SslHandler unsupportedProtocolSslHandler = new SslHandler(engine, false) {
        @Override
        public String applicationProtocol() {
            return "badprotocol";
        }
    };

    ChannelHandler tlsHandler = new ServerTlsHandler(sslContext, grpcHandler);
    pipeline.addLast(tlsHandler);
    pipeline.replace(SslHandler.class, null, unsupportedProtocolSslHandler);
    channelHandlerCtx = pipeline.context(tlsHandler);

    pipeline.fireUserEventTriggered(SslHandshakeCompletionEvent.SUCCESS);

    // No h2 protocol was specified, so this should be closed.
    assertFalse(channel.isOpen());
    assertNull(pipeline.context(grpcHandler));
}
/**
 * Creates and configures a {@link ServerBootstrap}.
 *
 * <p>The epoll server channel class is used when {@code Epoll.isAvailable()} returns true,
 * otherwise the NIO one. Socket options are read from {@code config}; both the server channel
 * and its child channels use {@code PooledByteBufAllocator.DEFAULT}.
 *
 * @param channelHandler   handler installed as the child handler
 * @param acceptEventGroup event loop group passed as the first (parent) group
 * @param ioEventGroup     event loop group passed as the second (child) group
 * @return the configured bootstrap (not yet bound)
 */
protected ServerBootstrap newBootstrap(ChannelHandler channelHandler, EventLoopGroup acceptEventGroup, EventLoopGroup ioEventGroup) throws Exception {
    ServerBootstrap result = new ServerBootstrap();

    if (Epoll.isAvailable()) {
        result.channel(EpollServerSocketChannel.class);
    } else {
        result.channel(NioServerSocketChannel.class);
    }
    result.group(acceptEventGroup, ioEventGroup);
    result.childHandler(channelHandler);

    // Options applied to the server channel.
    result.option(ChannelOption.SO_REUSEADDR, config.isReuseAddress());
    result.option(ChannelOption.SO_RCVBUF, config.getSocketBufferSize());
    result.option(ChannelOption.SO_BACKLOG, config.getBacklog());
    result.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);

    // Options applied to each child channel.
    result.childOption(ChannelOption.SO_SNDBUF, config.getSocketBufferSize());
    result.childOption(ChannelOption.TCP_NODELAY, config.isTcpNoDelay());
    result.childOption(ChannelOption.SO_KEEPALIVE, config.isKeepAlive());
    result.childOption(ChannelOption.SO_LINGER, config.getSoLinger());
    result.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);

    return result;
}
/**
 * Connects to {@code address} and returns the resulting active channel.
 *
 * <p>The I/O event loop group and the bootstrap are created lazily on first use. The method
 * blocks (while holding this object's monitor) until the connect attempt completes or
 * {@code connectionTimeout} elapses. A connect attempt that times out or fails is cleaned up
 * before the exception is thrown, so no half-open channel is left behind.
 *
 * @param address           remote address to connect to; must not be {@code null}
 * @param connectionTimeout maximum time to wait, passed to {@code ChannelFuture.await(long)}
 *                          (milliseconds per the Netty API)
 * @return the connected channel, which has also been added to {@code channels}
 * @throws IllegalArgumentException if {@code address} is {@code null}
 * @throws TimeoutException         if the connect attempt did not complete in time
 * @throws IllegalStateException    if the attempt completed but the channel is not active; the
 *                                  connect failure, if any, is attached as the cause
 * @throws InterruptedException     if the waiting thread is interrupted
 */
private synchronized Channel createChannel(SocketAddress address, long connectionTimeout) throws InterruptedException, TimeoutException {
    if (address == null) {
        throw new IllegalArgumentException("address must not be null!");
    }
    if (ioEventGroup == null) {
        ioEventGroup = newIoEventGroup();
    }
    if (bootstrap == null) {
        ChannelHandler channelHandlerPipeline = newChannelHandlerPipeline();
        bootstrap = newBootstrap(channelHandlerPipeline, ioEventGroup);
    }

    ChannelFuture channelFuture = bootstrap.connect(address);
    if (!channelFuture.await(connectionTimeout)) {
        // Give up on the pending attempt. Closing the channel as well covers the case where the
        // connect completes between the timeout and the cancel.
        channelFuture.cancel(true);
        Channel pending = channelFuture.channel();
        if (pending != null) {
            pending.close();
        }
        throw new TimeoutException();
    }

    Channel channel = channelFuture.channel();
    if (channel == null || !channel.isActive()) {
        if (channel != null) {
            // Release the socket of the failed attempt.
            channel.close();
        }
        throw new IllegalStateException("Failed to connect to " + address, channelFuture.cause());
    }

    channels.add(channel);
    return channel;
}
/**
 * Populates the pipeline of a newly created socket channel.
 *
 * <p>Handlers are appended in this order: an SSL handler (only when {@code sslCtx} is set),
 * the HTTP server codec, an HTTP object aggregator (65536), the WebSocket compression handler,
 * an idle state handler (0, 0, 60), the WebSocket protocol handler for
 * {@code WEB_SOCKET_PATH}, the {@code WebSocketHandler}, and finally every handler in
 * {@code handlers} in iteration order.
 *
 * @param ch the channel being initialized
 */
@Override
public void initChannel(SocketChannel ch) {
    ChannelPipeline pipeline = ch.pipeline();

    if (sslCtx != null) {
        pipeline.addLast(sslCtx.newHandler(ch.alloc()));
    }

    pipeline.addLast(new HttpServerCodec())
            .addLast(new HttpObjectAggregator(65536))
            .addLast(new WebSocketServerCompressionHandler())
            .addLast(new IdleStateHandler(0, 0, 60))
            .addLast(new WebSocketServerProtocolHandler(WEB_SOCKET_PATH, null, true))
            .addLast(new WebSocketHandler(litchi));

    // Extra handlers supplied by the owner of this initializer go last.
    for (ChannelHandler extra : handlers) {
        pipeline.addLast(extra);
    }
}
/**
 * Redirects the {@code ChannelPipeline.addBefore} call made in {@code setCompressionThreshold}
 * so the new handler is placed relative to the {@code CommonTransformer} decoder/encoder
 * handler when that handler is present in the pipeline.
 *
 * @param instance   the pipeline the original call targeted
 * @param base       name of the handler the original call wanted to insert before
 * @param newHandler name for the handler being inserted
 * @param handler    the handler being inserted
 * @return the result of {@code instance.addBefore} with the possibly substituted base name
 */
@Redirect(method = "setCompressionThreshold", at = @At(
        value = "INVOKE",
        remap = false,
        target = "Lio/netty/channel/ChannelPipeline;addBefore(Ljava/lang/String;Ljava/lang/String;Lio/netty/channel/ChannelHandler;)Lio/netty/channel/ChannelPipeline;"
))
private ChannelPipeline decodeEncodePlacement(ChannelPipeline instance, String base, String newHandler, ChannelHandler handler) {
    // Fixes the handler order
    String anchor = base;
    if (base.equals("decoder")) {
        if (instance.get(CommonTransformer.HANDLER_DECODER_NAME) != null) {
            anchor = CommonTransformer.HANDLER_DECODER_NAME;
        }
    } else if (base.equals("encoder")) {
        if (instance.get(CommonTransformer.HANDLER_ENCODER_NAME) != null) {
            anchor = CommonTransformer.HANDLER_ENCODER_NAME;
        }
    }
    return instance.addBefore(anchor, newHandler, handler);
}
/**
 * Creates a channel initializer that installs an SSL handler and, optionally, a further handler.
 *
 * @param context  SSL context used to create the {@link SslHandler} for each channel
 * @param response OCSP response set on the handler's engine, or {@code null} to set none; when
 *                 non-null the engine is cast to {@code ReferenceCountedOpenSslEngine}
 * @param handler  handler appended after the SSL handler, or {@code null} for none
 * @return the initializer
 */
private static ChannelHandler newServerHandler(final SslContext context,
        final byte[] response, final ChannelHandler handler) {
    return new ChannelInitializer<Channel>() {
        @Override
        protected void initChannel(Channel ch) throws Exception {
            SslHandler tls = context.newHandler(ch.alloc());

            if (response != null) {
                ((ReferenceCountedOpenSslEngine) tls.engine()).setOcspResponse(response);
            }

            ch.pipeline().addLast(tls);
            if (handler != null) {
                ch.pipeline().addLast(handler);
            }
        }
    };
}
/**
 * Creates and configures a client {@link Bootstrap}.
 *
 * <p>The epoll socket channel class is used when {@code Epoll.isAvailable()} returns true,
 * otherwise the NIO one. Reuse-address and receive-buffer settings are read from
 * {@code config}; the allocator is {@code PooledByteBufAllocator.DEFAULT}.
 *
 * @param channelHandler handler installed on the bootstrap
 * @param ioEventGroup   event loop group used by the bootstrap
 * @return the configured bootstrap (not yet connected)
 */
protected Bootstrap newBootstrap(ChannelHandler channelHandler, EventLoopGroup ioEventGroup) {
    Bootstrap result = new Bootstrap();

    if (Epoll.isAvailable()) {
        result.channel(EpollSocketChannel.class);
    } else {
        result.channel(NioSocketChannel.class);
    }
    result.group(ioEventGroup);
    result.handler(channelHandler);

    result.option(ChannelOption.SO_REUSEADDR, config.isReuseAddress());
    result.option(ChannelOption.SO_RCVBUF, config.getSocketBufferSize());
    result.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);

    return result;
}
/**
 * Creates the channel initializer for this transport.
 *
 * <p>One {@code DefaultCommandDispatcher} is created per call and shared by every channel the
 * returned initializer sets up. Each channel gets, in order: decoder, encoder, transport event
 * handler, exception handler, and command invocation handler.
 *
 * @return the initializer
 */
@Override
protected ChannelHandler newChannelHandlerPipeline() {
    final CommandDispatcher dispatcher = new DefaultCommandDispatcher(requestBarrier, requestHandler, responseHandler);

    return new ChannelInitializer<Channel>() {
        @Override
        protected void initChannel(Channel ch) throws Exception {
            ch.pipeline().addLast(new NettyDecoder(codec));
            ch.pipeline().addLast(new NettyEncoder(codec));
            ch.pipeline().addLast(new TransportEventHandler(requestBarrier, transportEventBus));
            ch.pipeline().addLast(new ExceptionChannelHandler(exceptionHandler, requestBarrier));
            ch.pipeline().addLast(new CommandInvocation(dispatcher));
        }
    };
}
/**
 * Registers the transport listener and wires the gRPC handler into the channel.
 *
 * <p>A termination callback is attached both to {@code channelUnused} and to the channel's close
 * future; whichever completes first triggers {@code notifyTerminated}, and the second
 * completion is ignored.
 *
 * @param listener the listener to register; only one listener may ever be registered
 * @throws IllegalStateException if a listener has already been registered
 */
public void start(ServerTransportListener listener) {
    Preconditions.checkState(this.listener == null, "Handler already registered");
    this.listener = listener;

    // Create the Netty handler for the pipeline.
    grpcHandler = createHandler(listener, channelUnused);
    NettyHandlerSettings.setAutoWindow(grpcHandler);

    // Notify when the channel closes. The same listener instance is attached to two futures,
    // so it remembers whether it has already fired.
    ChannelFutureListener notifyOnce = new ChannelFutureListener() {
        private boolean notified;

        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
            if (notified) {
                return;
            }
            notified = true;
            notifyTerminated(grpcHandler.connectionError());
        }
    };
    channelUnused.addListener(notifyOnce);
    channel.closeFuture().addListener(notifyOnce);

    ChannelHandler negotiator = protocolNegotiator.newHandler(grpcHandler);
    channel.pipeline().addLast(negotiator);
}
/**
 * Starts the server: creates the accept and I/O event loop groups, builds the bootstrap, and
 * binds it.
 *
 * <p>The instance fields are only assigned once every step has succeeded. If any step throws
 * (for example the bind fails), the event loop groups created so far are shut down before the
 * exception propagates, so their threads are not leaked.
 *
 * @throws Exception if creating the groups, building the bootstrap, or binding fails
 */
@Override
protected void doStart() throws Exception {
    EventLoopGroup acceptEventGroup = null;
    EventLoopGroup ioEventGroup = null;
    boolean started = false;
    try {
        acceptEventGroup = newAcceptEventGroup();
        ioEventGroup = newIoEventGroup();
        ChannelHandler channelHandlerPipeline = newChannelHandlerPipeline();
        ServerBootstrap serverBootstrap = newBootstrap(channelHandlerPipeline, acceptEventGroup, ioEventGroup);
        Channel channel = doBind(serverBootstrap);

        this.acceptEventGroup = acceptEventGroup;
        this.ioEventGroup = ioEventGroup;
        this.serverBootstrap = serverBootstrap;
        this.channel = channel;
        started = true;
    } finally {
        if (!started) {
            // Startup failed part-way: release the groups, which nothing else references yet.
            // NOTE(review): assumes newAcceptEventGroup()/newIoEventGroup() return groups owned
            // by this instance rather than shared ones - confirm.
            if (ioEventGroup != null) {
                ioEventGroup.shutdownGracefully();
            }
            if (acceptEventGroup != null) {
                acceptEventGroup.shutdownGracefully();
            }
        }
    }
}
/**
 * Adding a {@code ServerTlsHandler} to the pipeline must leave an {@link SslHandler} as the
 * pipeline's first handler.
 */
@Test
public void tlsHandler_handlerAddedAddsSslHandler() throws Exception {
    ChannelHandler tlsHandler = new ServerTlsHandler(sslContext, grpcHandler);

    pipeline.addLast(tlsHandler);

    ChannelHandler head = pipeline.first();
    assertTrue(head instanceof SslHandler);
}
/**
 * Firing a user event that is not an SSL event must not put the gRPC handler into the pipeline.
 */
@Test
public void tlsHandler_userEventTriggeredNonSslEvent() throws Exception {
    ChannelHandler tlsHandler = new ServerTlsHandler(sslContext, grpcHandler);
    pipeline.addLast(tlsHandler);
    channelHandlerCtx = pipeline.context(tlsHandler);

    Object unrelatedEvent = new Object();
    pipeline.fireUserEventTriggered(unrelatedEvent);

    // A non ssl event should not cause the grpcHandler to be in the pipeline yet.
    assertNull(pipeline.context(grpcHandler));
}
/**
 * Stores the given collaborators and creates a fresh {@code DefaultHttp2FrameReader}.
 *
 * @param handlerName       value stored in the {@code handlerName} field
 * @param connectionHandler value stored in the {@code connectionHandler} field
 * @param handlers          handlers stored as-is (the array is not copied)
 */
private Http2ServerUpgradeCodec(String handlerName, Http2ConnectionHandler connectionHandler,
        ChannelHandler... handlers) {
    this.frameReader = new DefaultHttp2FrameReader();
    this.handlers = handlers;
    this.connectionHandler = connectionHandler;
    this.handlerName = handlerName;
}
/**
 * Creates the handler, passing {@code negotiationhandlers} to the superclass constructor and
 * keeping a reference to {@code grpcHandler}.
 *
 * @param grpcHandler         the gRPC handler to retain
 * @param negotiationhandlers handlers forwarded unchanged to the superclass constructor
 */
BufferUntilAltsNegotiatedHandler(
GrpcHttp2ConnectionHandler grpcHandler, ChannelHandler... negotiationhandlers) {
super(negotiationhandlers);
// Save the gRPC handler. The ALTS handler doesn't support buffering before the handshake
// completes, so we wait until the handshake was successful before adding the grpc handler.
this.grpcHandler = grpcHandler;
}
public MqttOverWebsocketProtocolHandlerPipeline(Protocol protocol, ChannelHandler channelHandler, BrokerContext brokerContext) {
super(brokerContext);
this.protocol = protocol;
this.brokerContext = brokerContext;
if (channelHandler instanceof DefaultProtocolHandlerPipeline) {
DefaultProtocolHandlerPipeline handlerPipeline = (DefaultProtocolHandlerPipeline) channelHandler;
// todo
}
}
/**
 * Creates the codec, forwarding the encoder, decoder and initial settings to the superclass
 * constructor and keeping a reference to {@code inboundStreamHandler}.
 *
 * @param encoder              forwarded to the superclass constructor
 * @param decoder              forwarded to the superclass constructor
 * @param initialSettings      forwarded to the superclass constructor
 * @param inboundStreamHandler handler stored in the {@code inboundStreamHandler} field
 */
Http2MultiplexCodec(Http2ConnectionEncoder encoder,
Http2ConnectionDecoder decoder,
Http2Settings initialSettings,
ChannelHandler inboundStreamHandler) {
super(encoder, decoder, initialSettings);
this.inboundStreamHandler = inboundStreamHandler;
}
/**
 * Creates a server-mode {@link SslHandler} for the given channel.
 *
 * <p>A new SSL context is created on every call via {@code createSSLContext}. The engine is
 * created for the channel's remote host string and port, so the channel must already have a
 * remote address.
 *
 * @param channel         channel whose allocator and remote address are used
 * @param useClientCA     when true, client authentication is required on the engine; also
 *                        forwarded to {@code createSSLContext}
 * @param sslKeyStoreType forwarded to {@code createSSLContext}
 * @param sslKeyFilePath  forwarded to {@code createSSLContext}
 * @param sslManagerPwd   forwarded to {@code createSSLContext}
 * @param sslStorePwd     forwarded to {@code createSSLContext}
 * @return the new SSL handler
 */
public static ChannelHandler getSslHandler(SocketChannel channel, boolean useClientCA, String sslKeyStoreType, String sslKeyFilePath, String sslManagerPwd, String sslStorePwd) {
    SslContext context = createSSLContext(useClientCA, sslKeyStoreType, sslKeyFilePath, sslManagerPwd, sslStorePwd);

    String peerHost = channel.remoteAddress().getHostString();
    int peerPort = channel.remoteAddress().getPort();
    SSLEngine engine = context.newEngine(channel.alloc(), peerHost, peerPort);

    engine.setUseClientMode(false); // server mode
    if (useClientCA) {
        engine.setNeedClientAuth(true);
    }

    return new SslHandler(engine);
}
/**
 * Creates a client {@link Bootstrap} using the epoll socket channel class when
 * {@code Epoll.isAvailable()} returns true (NIO otherwise) and
 * {@code PooledByteBufAllocator.DEFAULT} as allocator.
 *
 * @param channelHandler handler installed on the bootstrap
 * @param ioEventGroup   event loop group used by the bootstrap
 * @return the configured bootstrap (not yet connected)
 */
private Bootstrap newBootstrap(ChannelHandler channelHandler, EventLoopGroup ioEventGroup) {
    Bootstrap result = new Bootstrap();

    if (Epoll.isAvailable()) {
        result.channel(EpollSocketChannel.class);
    } else {
        result.channel(NioSocketChannel.class);
    }
    result.group(ioEventGroup);
    result.handler(channelHandler);
    result.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);

    return result;
}
/**
 * Starts the server on the given port.
 *
 * <p>The port and registry are recorded first, then the event loop groups are created, the
 * bootstrap is built, and the server is bound. If any of those steps throws (for example the
 * bind fails), the event loop groups created so far are shut down before the exception
 * propagates, so their threads are not leaked.
 *
 * @param requestHandlerRegistry registry stored in the {@code requestHandlerRegistry} field
 * @param port                   port stored in the {@code port} field
 * @throws Exception if creating the groups, building the bootstrap, or binding fails
 */
@Override
public void start(RequestHandlerRegistry requestHandlerRegistry, int port) throws Exception {
    this.port = port;
    this.requestHandlerRegistry = requestHandlerRegistry;

    EventLoopGroup acceptEventGroup = null;
    EventLoopGroup ioEventGroup = null;
    boolean started = false;
    try {
        acceptEventGroup = newEventLoopGroup();
        ioEventGroup = newEventLoopGroup();
        ChannelHandler channelHandlerPipeline = newChannelHandlerPipeline();
        ServerBootstrap serverBootstrap = newBootstrap(channelHandlerPipeline, acceptEventGroup, ioEventGroup);
        Channel channel = doBind(serverBootstrap);

        this.acceptEventGroup = acceptEventGroup;
        this.ioEventGroup = ioEventGroup;
        this.channel = channel;
        started = true;
    } finally {
        if (!started) {
            // Startup failed part-way: release the groups, which nothing else references yet.
            // NOTE(review): assumes newEventLoopGroup() returns a group owned by this instance
            // rather than a shared one - confirm.
            if (ioEventGroup != null) {
                ioEventGroup.shutdownGracefully();
            }
            if (acceptEventGroup != null) {
                acceptEventGroup.shutdownGracefully();
            }
        }
    }
}
/**
 * Creates the channel initializer used for every channel: request decoder, response encoder,
 * then a request invocation handler backed by {@code requestHandlerRegistry}.
 *
 * @return the initializer
 */
private ChannelHandler newChannelHandlerPipeline() {
    return new ChannelInitializer<Channel>() {
        @Override
        protected void initChannel(Channel ch) {
            ch.pipeline().addLast(new RequestDecoder());
            ch.pipeline().addLast(new ResponseEncoder());
            ch.pipeline().addLast(new RequestInvocation(requestHandlerRegistry));
        }
    };
}
/**
 * Creates the handler pipeline for a protocol.
 *
 * <p>A {@code DefaultProtocolHandlerPipeline} is always built. When the protocol is a
 * {@code ChannelHandlerProvider}, it is offered the default pipeline and may supply a
 * replacement; a {@code null} answer means the default pipeline is used.
 *
 * @param protocol the protocol to build a pipeline for
 * @return the protocol's custom pipeline if it provides one, otherwise the default pipeline
 */
@Override
public ChannelHandler createPipeline(Protocol protocol) {
    CommandDispatcher dispatcher = commandDispatcherFactory.getCommandDispatcher(protocol);
    ChannelHandler defaultPipeline = new DefaultProtocolHandlerPipeline(protocol, dispatcher, transportEventHandler, connectionHandler);

    if (!(protocol instanceof ChannelHandlerProvider)) {
        return defaultPipeline;
    }

    ChannelHandler custom = ((ChannelHandlerProvider) protocol).getChannelHandler(defaultPipeline);
    return custom != null ? custom : defaultPipeline;
}
/**
 * Creates an embedded channel containing an HTTP server codec, an HTTP object aggregator
 * (8192), a WebSocket server protocol handler for path {@code "/test"} with subprotocols
 * {@code "test-proto-1, test-proto-2"}, and finally {@code handler}.
 *
 * @param handler the handler placed last in the channel
 * @return the new embedded channel
 */
private static EmbeddedChannel createServerChannel(ChannelHandler handler) {
    HttpServerCodec codec = new HttpServerCodec();
    HttpObjectAggregator aggregator = new HttpObjectAggregator(8192);
    WebSocketServerProtocolHandler protocolHandler =
            new WebSocketServerProtocolHandler("/test", "test-proto-1, test-proto-2", false);

    return new EmbeddedChannel(codec, aggregator, protocolHandler, handler);
}
/**
 * Creates a new instance by forwarding every argument, unchanged and in the same order, to the
 * superclass constructor.
 *
 * @param serialport forwarded to the superclass constructor
 * @param baudrate forwarded to the superclass constructor
 * @param clientEntries forwarded to the superclass constructor
 * @param channelSupplier forwarded to the superclass constructor
 * @param readTimeout forwarded to the superclass constructor
 * @param loggingInitially forwarded to the superclass constructor
 */
public RxtxServerChannelManager(String serialport, int baudrate,
Map<String, ClientEntry> clientEntries,
Supplier<List<ChannelHandler>> channelSupplier,
int readTimeout,
boolean loggingInitially) {
super(serialport, baudrate, clientEntries, channelSupplier, readTimeout, loggingInitially);
}
/**
 * Creates a new instance.
 *
 * @param connectionEventListener listener to store; must not be {@code null}
 * @param channelSupplier         supplier to store; must not be {@code null}
 * @param readTimeout             value stored in the {@code readTimeout} field
 * @param enableLogging           value stored in the {@code loggingEnabled} field
 */
public RxtxClientChannelManager(@Nonnull ConnectionEventListener connectionEventListener,
        Supplier<List<ChannelHandler>> channelSupplier,
        int readTimeout,
        boolean enableLogging) {
    // Validate both references up front, in the same order as they are declared.
    requireNonNull(connectionEventListener, "connEventListener");
    requireNonNull(channelSupplier, "channelSupplier");

    this.connectionEventListener = connectionEventListener;
    this.channelSupplier = channelSupplier;
    this.readTimeout = readTimeout;
    this.loggingEnabled = enableLogging;
}
/**
 * Creates the initializer.
 *
 * @param litchi   value stored in the {@code litchi} field
 * @param openSSL  when true, an SSL context is created via {@code createSSLContext()} and stored
 *                 in {@code sslCtx}; when false, {@code sslCtx} is not assigned here
 * @param handlers additional handlers, stored as-is (the array is not copied)
 */
public WebSocketServerInitializer(Litchi litchi, boolean openSSL, ChannelHandler... handlers) {
    this.litchi = litchi;
    this.handlers = handlers;

    if (openSSL) {
        this.sslCtx = createSSLContext();
    }
}
/**
 * Delegates to {@code delegate} when it is a {@code ChannelHandlerProvider}.
 *
 * @param channelHandler the handler passed on to the delegate
 * @return the delegate's answer, or {@code null} when the delegate is not a
 *         {@code ChannelHandlerProvider}
 */
@Override
public ChannelHandler getChannelHandler(ChannelHandler channelHandler) {
    if (!(delegate instanceof ChannelHandlerProvider)) {
        return null;
    }
    ChannelHandlerProvider provider = (ChannelHandlerProvider) delegate;
    return provider.getChannelHandler(channelHandler);
}
/**
 * Connects to {@code DESTINATION} through the client handlers after setting a 2000 ms connect
 * timeout on every {@code ProxyHandler} among them, and expects exactly one recorded
 * {@code ProxyConnectException} whose string form contains {@code "timeout"}.
 *
 * <p>Both the channel's close future and the test handler's latch are awaited for twice the
 * connect timeout; both waits must succeed.
 */
@Override
protected void test() throws Exception {
    final long connectTimeoutMillis = 2000;
    final long waitMillis = connectTimeoutMillis * 2;

    for (ChannelHandler candidate : clientHandlers) {
        if (!(candidate instanceof ProxyHandler)) {
            continue;
        }
        ((ProxyHandler) candidate).setConnectTimeoutMillis(connectTimeoutMillis);
    }

    final FailureTestHandler failureRecorder = new FailureTestHandler();

    Bootstrap bootstrap = new Bootstrap();
    bootstrap.group(group);
    bootstrap.channel(NioSocketChannel.class);
    bootstrap.resolver(NoopAddressResolverGroup.INSTANCE);
    bootstrap.handler(new ChannelInitializer<SocketChannel>() {
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            ch.pipeline()
                    .addLast(clientHandlers)
                    .addLast(new LineBasedFrameDecoder(64))
                    .addLast(failureRecorder);
        }
    });

    ChannelFuture closeFuture = bootstrap.connect(DESTINATION).channel().closeFuture();

    // Both waits always run; the overall result requires both to have completed in time.
    boolean channelClosed = closeFuture.await(waitMillis, TimeUnit.MILLISECONDS);
    boolean latchReleased = failureRecorder.latch.await(waitMillis, TimeUnit.MILLISECONDS);
    boolean finished = channelClosed & latchReleased;

    logger.debug("Recorded exceptions: {}", failureRecorder.exceptions);

    assertProxyHandlers(false);

    assertThat(failureRecorder.exceptions.size(), is(1));
    Throwable recorded = failureRecorder.exceptions.poll();
    assertThat(recorded, is(instanceOf(ProxyConnectException.class)));
    assertThat(String.valueOf(recorded), containsString("timeout"));
    assertThat(finished, is(true));
}
/**
 * Creates an embedded channel containing an HTTP client codec, an HTTP object aggregator
 * (8192), a WebSocket client protocol handler for {@code ws://localhost:1234/test} using
 * version V13 and subprotocol {@code "test-proto-2"}, and finally {@code handler}.
 *
 * @param handler the handler placed last in the channel
 * @return the new embedded channel
 * @throws Exception if the URI cannot be created or the channel cannot be set up
 */
private static EmbeddedChannel createClientChannel(ChannelHandler handler) throws Exception {
    HttpClientCodec codec = new HttpClientCodec();
    HttpObjectAggregator aggregator = new HttpObjectAggregator(8192);

    URI endpoint = new URI("ws://localhost:1234/test");
    WebSocketClientProtocolHandler protocolHandler = new WebSocketClientProtocolHandler(
            endpoint, WebSocketVersion.V13, "test-proto-2", false, null, 65536);

    return new EmbeddedChannel(codec, aggregator, protocolHandler, handler);
}
/**
 * Rejects a {@code ChannelHandlerAdapter} that is not sharable.
 *
 * <p>Only handlers that extend {@code ChannelHandlerAdapter} are checked: such a handler is
 * rejected when its {@code isSharable()} returns false and its class does not carry the
 * {@code ChannelHandler.Sharable} annotation. Any other argument, including {@code null} and
 * handlers that do not extend the adapter, is returned without a check.
 *
 * @param handler the handler to check
 * @return {@code handler} itself
 * @throws IllegalArgumentException if the handler is a non-sharable adapter
 */
private static ChannelHandler checkSharable(ChannelHandler handler) {
    if (handler instanceof ChannelHandlerAdapter) {
        boolean reportsSharable = ((ChannelHandlerAdapter) handler).isSharable();
        if (!reportsSharable
                && !handler.getClass().isAnnotationPresent(ChannelHandler.Sharable.class)) {
            throw new IllegalArgumentException("The handler must be Sharable");
        }
    }
    return handler;
}