下面列出了io.netty.channel.ChannelInitializer#io.netty.channel.ChannelPipeline 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
 * Populates the pipeline of a newly created channel for the secure chat server.
 *
 * <p>Handlers are appended in this order: the SSL handler, a line-delimited frame
 * decoder (constructed with 8192), the string decoder and encoder, and finally the
 * chat business-logic handler.
 *
 * @param ch the channel whose pipeline is configured
 * @throws Exception declared by the method signature
 */
@Override
public void initChannel(SocketChannel ch) throws Exception {
    ch.pipeline()
            // The SSL handler comes first so that it encrypts and decrypts everything.
            // In this example, a bogus certificate is used on the server side and the
            // client side accepts any invalid certificate. You will need something
            // more complicated to identify both client and server in the real world.
            .addLast(sslCtx.newHandler(ch.alloc()))
            // The text line codec sits on top of the SSL handler.
            .addLast(new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()))
            .addLast(new StringDecoder())
            .addLast(new StringEncoder())
            // Business logic goes last.
            .addLast(new SecureChatServerHandler());
}
/**
 * Builds the HTTP client pipeline: optional SSL handler, HTTP client codec,
 * content decompressor, object aggregator and the client handler.
 *
 * @param ch the channel whose pipeline is configured
 */
@Override
public void initChannel(SocketChannel ch) {
    ChannelPipeline chain = ch.pipeline();

    // HTTPS is only enabled when an SSL context has been configured.
    if (sslCtx != null) {
        chain.addLast(sslCtx.newHandler(ch.alloc()));
    }

    chain.addLast(new HttpClientCodec())
            // Remove the following line if you don't want automatic content decompression.
            .addLast(new HttpContentDecompressor())
            // Aggregation is enabled here; remove the following line if you want to
            // handle individual HttpContents yourself.
            .addLast(new HttpObjectAggregator(1048576))
            .addLast(nettyHttpClientHandler);
}
/**
 * Starts the handshake by writing the bytes of {@code version} to the channel and,
 * once that write succeeds, installing the handshake decoder and encoder in front
 * of the {@code ProtocolHandshakeHandler} in the pipeline.
 *
 * <p>The given promise is failed when the write fails or when the pipeline does not
 * contain a {@code ProtocolHandshakeHandler}; otherwise it is completed successfully
 * after both codecs have been added.
 *
 * @param channel the channel to write the version bytes to
 * @param promise the promise notified about the outcome of the handshake start
 * @return the same {@code promise} that was passed in
 */
public final ChannelFuture handshake(Channel channel, final ChannelPromise promise) {
    channel.writeAndFlush(Unpooled.wrappedBuffer(version.getBytes())).addListener((ChannelFuture future) -> {
        if (!future.isSuccess()) {
            promise.setFailure(future.cause());
            return;
        }
        ChannelPipeline p = future.channel().pipeline();
        ChannelHandlerContext ctx = p.context(ProtocolHandshakeHandler.class);
        if (ctx == null) {
            // Without this check a NullPointerException would escape the listener and
            // the promise would never be completed, leaving callers waiting forever.
            promise.setFailure(new IllegalStateException(
                    "ChannelPipeline does not contain a ProtocolHandshakeHandler"));
            return;
        }
        // Both codecs are inserted directly before the handshake handler.
        p.addBefore(ctx.name(), "rfb-handshake-decoder", newRfbClientDecoder());
        p.addBefore(ctx.name(), "rfb-handshake-encoder", newRfbClientEncoder());
        promise.setSuccess();
    });
    return promise;
}
/**
 * Rewires an already populated server channel pipeline: wraps the existing network
 * manager, registers a connection for the channel's remote address, inserts the
 * additional decoding/logic/capture handlers and swaps several stock handlers for
 * wrapped replacements.
 *
 * <p>The statements rely on handlers already being present under the
 * {@code SpigotChannelHandlers} names and under {@code "legacy_query"}; the order of
 * the operations below matters because later calls address handlers by name.
 *
 * @param channel the channel whose pipeline is modified
 */
@Override
protected void initChannel(Channel channel) {
ChannelPipeline pipeline = channel.pipeline();
// Wrap the network manager that is already registered in the pipeline.
NetworkManagerWrapper networkmanager = new SpigotNetworkManagerWrapper(channel, (NetworkManager) pipeline.get(SpigotChannelHandlers.NETWORK_MANAGER));
networkmanager.setPacketListener(new SpigotFakePacketListener(networkmanager));
// Create the connection object and register it under the channel's remote address.
ConnectionImpl connection = new ConnectionImpl(networkmanager);
ProtocolStorage.addConnection(channel.remoteAddress(), connection);
// Insert the initial decoder right after the read timeout handler and the logic
// handler right before the network manager.
pipeline.addAfter(SpigotChannelHandlers.READ_TIMEOUT, ChannelHandlers.INITIAL_DECODER, new InitialPacketDecoder());
pipeline.addBefore(SpigotChannelHandlers.NETWORK_MANAGER, ChannelHandlers.LOGIC, new LogicHandler(connection, Packet.class));
pipeline.remove("legacy_query");
// Replace stock handlers in place, keeping their original names.
// NOTE(review): 30 is presumably a timeout in seconds - confirm against SimpleReadTimeoutHandler.
pipeline.replace(SpigotChannelHandlers.READ_TIMEOUT, SpigotChannelHandlers.READ_TIMEOUT, new SimpleReadTimeoutHandler(30));
pipeline.replace(SpigotChannelHandlers.SPLITTER, SpigotChannelHandlers.SPLITTER, new SpigotWrappedSplitter());
pipeline.replace(SpigotChannelHandlers.PREPENDER, SpigotChannelHandlers.PREPENDER, new SpigotWrappedPrepender());
// Raw data capture handlers are attached next to the (replaced) prepender and splitter.
pipeline.addAfter(SpigotChannelHandlers.PREPENDER, ChannelHandlers.RAW_CAPTURE_SEND, new RawPacketDataCaptureSend(connection));
pipeline.addAfter(SpigotChannelHandlers.SPLITTER, ChannelHandlers.RAW_CAPTURE_RECEIVE, new RawPacketDataCaptureReceive(connection));
if (replaceDecoderEncoder) {
// Only swap the decoder/encoder when they are exactly the v1_16_R1 classes;
// any other class found under these names is left untouched.
if (pipeline.get(SpigotChannelHandlers.DECODER).getClass().equals(net.minecraft.server.v1_16_R1.PacketDecoder.class)) {
pipeline.replace(SpigotChannelHandlers.DECODER, SpigotChannelHandlers.DECODER, new SpigotPacketDecoder());
}
if (pipeline.get(SpigotChannelHandlers.ENCODER).getClass().equals(net.minecraft.server.v1_16_R1.PacketEncoder.class)) {
pipeline.replace(SpigotChannelHandlers.ENCODER, SpigotChannelHandlers.ENCODER, new SpigotPacketEncoder());
}
}
}
/**
 * Configures the pipeline with an optional SSL handler, an optional throughput
 * counter, a fresh message decoder/encoder pair and the shared request handler.
 *
 * @param ch the channel whose pipeline is configured
 * @throws Exception declared by the method signature
 */
@Override
protected void initChannel(SocketChannel ch) throws Exception {
    ChannelPipeline chain = ch.pipeline();

    if (sslCtx != null) {
        chain.addLast(sslCtx.newHandler(ch.alloc()));
    }

    // A new codec pair is created for every channel.
    MixMessageEncoder messageEncoder = new MixMessageEncoder();
    MixMessageDecoder messageDecoder = new MixMessageDecoder();

    // When configured, the throughput counter sits in front of the codecs.
    if (throughputCounter != null) {
        chain.addLast(throughputCounter);
    }
    chain.addLast(messageDecoder, messageEncoder, requestHandler);
}
/**
 * Initializes the given channel: appends the configured handler to its pipeline,
 * applies the channel options and copies the configured attributes onto it.
 *
 * @param channel the channel to initialize
 * @throws Exception declared by the method signature
 */
@Override
@SuppressWarnings("unchecked")
void init(Channel channel) throws Exception {
    channel.pipeline().addLast(config.handler());

    // The option map is locked while its contents are applied to the channel.
    final Map<ChannelOption<?>, Object> channelOptions = options0();
    synchronized (channelOptions) {
        setChannelOptions(channel, channelOptions, logger);
    }

    // The attribute map is locked while every entry is copied onto the channel.
    final Map<AttributeKey<?>, Object> channelAttrs = attrs0();
    synchronized (channelAttrs) {
        for (Entry<AttributeKey<?>, Object> attr : channelAttrs.entrySet()) {
            AttributeKey<Object> key = (AttributeKey<Object>) attr.getKey();
            channel.attr(key).set(attr.getValue());
        }
    }
}
/**
 * Initializes the {@code SocketChannel}.
 *
 * <p>Called for each new channel created by the {@code ServerBootstrap}.
 *
 * <p>The default implementation creates a remote connection, registers a close
 * listener on it and configures a default pipeline that handles message
 * encoding/decoding, handshaking, an optional read timeout and error handling,
 * based on the {@code RpcConfig} instance provided at construction time.
 *
 * <p>On each call to this method, every handler added must be a new instance. As of
 * now, the handlers cannot be shared across connections.
 *
 * <p>Subclasses can override it to add extra handlers if needed.
 *
 * @param ch the socket channel
 * @throws SSLException declared by the method signature
 */
protected void initChannel(final SocketChannel ch) throws SSLException {
    final C remote = initRemoteConnection(ch);
    remote.setChannelCloseHandler(newCloseListener(ch, remote));

    final ChannelPipeline chain = ch.pipeline()
            .addLast(PROTOCOL_ENCODER, new RpcEncoder("s-" + rpcConfig.getName()))
            .addLast(MESSAGE_DECODER, newDecoder(remote.getAllocator()))
            .addLast(HANDSHAKE_HANDLER, newHandshakeHandler(remote));

    // The timeout handler is only installed when the configuration asks for one.
    if (rpcConfig.hasTimeout()) {
        chain.addLast(TIMEOUT_HANDLER, new LoggingReadTimeoutHandler(remote, rpcConfig.getTimeout()));
    }

    chain.addLast(MESSAGE_HANDLER, new InboundHandler(remote))
            .addLast(EXCEPTION_HANDLER, new RpcExceptionHandler<>(remote));
}
/**
 * Configures the client pipeline: an idle-state handler, the Protobuf
 * decoding/encoding handlers and the client business-logic handler.
 *
 * @param ch the channel whose pipeline is configured
 * @throws Exception declared by the method signature
 */
@Override
protected void initChannel(SocketChannel ch) throws Exception {
    // Decoding and encoding must be kept consistent with the server side.
    ch.pipeline()
            // Arguments: reader idle time, writer idle time, all idle time, time unit.
            .addLast(new IdleStateHandler(0, 4, 0, TimeUnit.SECONDS))
            // Transport protocol: Protobuf.
            .addLast(new ProtobufVarint32FrameDecoder())
            .addLast(new ProtobufDecoder(UserMsg.getDefaultInstance()))
            .addLast(new ProtobufVarint32LengthFieldPrepender())
            .addLast(new ProtobufEncoder())
            // Business logic implementation.
            .addLast("nettyClientHandler", nettyClientHandler);
}
/**
 * Configures the pipeline with idle detection, length-field based framing, message
 * decoding, the protocol processing handler and the message encoder.
 *
 * @param ch the channel whose pipeline is configured
 * @throws Exception declared by the method signature
 */
@Override
protected void initChannel(Channel ch) throws Exception {
    ch.pipeline()
            // Idle detection: reader and writer idle disabled (0), all-idle set to 35.
            .addLast(new IdleStateHandler(0, 0, 35))
            .addLast(new IdleStateTrigger())
            // NOTE(review): the original comment here was mis-encoded. It appears to
            // describe the packet layout as: 1-byte fixed header, 1-byte function code,
            // 1-byte flag (whether a topic field is present), 4-byte length field,
            // 12-byte fixed topic (optional), then the payload - confirm against the
            // protocol specification. The decoder below is constructed with a maximum
            // frame length of 2048, length field offset 3 and length field size 4.
            .addLast(new LengthFieldBasedFrameDecoder(2048, 3, 4, 0, 0))
            .addLast(new MessageToPoDecoder())
            // An AuthenticationHandler named "auth" is currently disabled:
            //.addLast("auth", new AuthenticationHandler())
            // Protocol message processing.
            .addLast("message-process", new MessageProcessHandler())
            .addLast(new MessageEncoder());
    // A StringDecoder(CharsetUtil.UTF_8) is also disabled in this pipeline.
}
/**
 * Configures the HTTP file server pipeline with named handlers: response encoder,
 * request decoder, aggregator, chunked writer and the file server handler.
 *
 * @param channel the channel whose pipeline is configured
 * @throws Exception declared by the method signature
 */
@Override
protected void initChannel(Channel channel) throws Exception {
    // Uncomment the following lines if you want HTTPS
    //SSLEngine engine = SecureChatSslContextFactory.getServerContext().createSSLEngine();
    //engine.setUseClientMode(false);
    //channel.pipeline().addLast("ssl", new SslHandler(engine));
    channel.pipeline()
            .addLast("encoder", new HttpResponseEncoder())
            .addLast("decoder", new HttpRequestDecoder())
            .addLast("aggregator", new HttpObjectAggregator(65536))
            .addLast("chunkedWriter", new ChunkedWriteHandler())
            .addLast("handler", new HttpFileServerHandler());
}
/**
 * Configures the HTTP upload server pipeline: optional SSL handler, request decoder,
 * response encoder, content compressor and the upload handler.
 *
 * @param ch the channel whose pipeline is configured
 */
@Override
public void initChannel(SocketChannel ch) {
    ChannelPipeline chain = ch.pipeline();

    // TLS is only installed when an SSL context is available.
    if (sslCtx != null) {
        chain.addLast(sslCtx.newHandler(ch.alloc()));
    }

    chain.addLast(new HttpRequestDecoder())
            .addLast(new HttpResponseEncoder())
            // Remove the following line if you don't want automatic content compression.
            .addLast(new HttpContentCompressor())
            .addLast(new HttpUploadServerHandler());
}
/**
 * Creates the client {@code Bootstrap}, assigning a new {@code NioEventLoopGroup} to
 * the {@code eventLoopGroup} field and registering a channel initializer that builds
 * the codec pipeline.
 *
 * <p>Every time a channel is initialized, a new {@code TokenClientHandler} is created
 * and stored in the {@code clientHandler} field before being added to the pipeline.
 *
 * @return the configured bootstrap
 */
private Bootstrap initClientBootstrap() {
    Bootstrap bootstrap = new Bootstrap();
    eventLoopGroup = new NioEventLoopGroup();
    return bootstrap.group(eventLoopGroup)
            .channel(NioSocketChannel.class)
            .option(ChannelOption.TCP_NODELAY, true)
            .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
            .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, ClusterClientConfigManager.getConnectTimeout())
            .handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    // The field is refreshed for each newly initialized channel.
                    clientHandler = new TokenClientHandler(currentState, disconnectCallback);

                    ch.pipeline()
                            // Inbound framing uses a 2-byte length field at offset 0,
                            // which is stripped from the frame.
                            .addLast(new LengthFieldBasedFrameDecoder(1024, 0, 2, 0, 2))
                            .addLast(new NettyResponseDecoder())
                            // Outbound messages are prefixed with a 2-byte length field.
                            .addLast(new LengthFieldPrepender(2))
                            .addLast(new NettyRequestEncoder())
                            .addLast(clientHandler);
                }
            });
}
/**
 * Sets up the HTTP server side of the channel: an SSL handler when a context is
 * configured, followed by the HTTP request decoder, response encoder, content
 * compressor and upload handler.
 *
 * @param ch the channel whose pipeline is configured
 */
@Override
public void initChannel(SocketChannel ch) {
    ChannelPipeline handlers = ch.pipeline();

    if (sslCtx != null) {
        handlers.addLast(sslCtx.newHandler(ch.alloc()));
    }

    handlers.addLast(new HttpRequestDecoder())
            .addLast(new HttpResponseEncoder())
            // Remove the following line if you don't want automatic content compression.
            .addLast(new HttpContentCompressor())
            .addLast(new HttpUploadServerHandler());
}
/**
 * Configures the server pipeline with an optional TLS handler followed by the
 * Thrift protocol detection handler.
 *
 * <p>When an SSL context supplier is present, an {@code OptionalSslHandler} is used
 * if plain text is allowed; otherwise a handler created by the SSL context is used.
 *
 * @param channel the channel whose pipeline is configured
 */
@Override
protected void initChannel(SocketChannel channel)
{
    ChannelPipeline handlers = channel.pipeline();

    if (sslContextSupplier.isPresent()) {
        handlers.addLast(allowPlainText
                ? new OptionalSslHandler(sslContextSupplier.get().get())
                : sslContextSupplier.get().get().newHandler(channel.alloc()));
    }

    // The server handler is wrapped by the protocol detection handler.
    ThriftServerHandler serverHandler = new ThriftServerHandler(methodInvoker, requestTimeout, timeoutExecutor);
    handlers.addLast(new ThriftProtocolDetection(serverHandler, maxFrameSize, assumeClientsSupportOutOfOrderResponses));
}
/**
 * Handles an exception raised while reading: forwards any readable bytes that were
 * already received, signals read completion, propagates the exception through the
 * pipeline and decides whether the channel is closed for reading.
 *
 * @param pipeline the pipeline to fire events on
 * @param byteBuf  the buffer being read into, or {@code null} if there is none
 * @param cause    the exception that occurred
 * @param close    whether the caller requested closing regardless of the cause
 * @return {@code true} if {@code closeOnRead} was invoked, {@code false} otherwise
 */
private boolean handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause, boolean close) {
    if (byteBuf != null) {
        if (!byteBuf.isReadable()) {
            // Nothing to deliver; release the buffer here.
            byteBuf.release();
        } else {
            // Deliver the partial data that was read before the failure.
            readPending = false;
            pipeline.fireChannelRead(byteBuf);
        }
    }

    pipeline.fireChannelReadComplete();
    pipeline.fireExceptionCaught(cause);

    // Close when explicitly requested or when the cause is an IOException.
    final boolean shouldClose = close || cause instanceof IOException;
    if (shouldClose) {
        closeOnRead(pipeline);
    }
    return shouldClose;
}
/**
 * Starts a server through {@code BootstrapTemplate} whose channels encode and decode
 * strings, print each received message to standard output and then close.
 *
 * @param args command line arguments (not used)
 * @throws Exception declared by the method signature
 */
public static void main(String[] args) throws Exception {
    BootstrapTemplate.newServerBootstrap(HOST, PORT, new ChannelInitializer<SocketChannel>() {
        @Override
        public void initChannel(SocketChannel ch) throws Exception {
            // Prints every inbound message and closes the channel afterwards.
            ChannelInboundHandlerAdapter printAndClose = new ChannelInboundHandlerAdapter() {
                @Override
                public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                    System.out.println(msg);
                    ctx.close();
                }
            };

            ch.pipeline()
                    .addLast(new StringEncoder())
                    .addLast(new StringDecoder())
                    .addLast(printAndClose);
        }
    });
}
/**
 * Installs a configurer on the server that adds a handler named
 * {@code "connectListenerHandler"} before {@code "idleEventHandler"}; once a channel
 * becomes active, that handler writes and flushes a new message created by the
 * server's message factory with type {@code 0x800}.
 *
 * @param server the server to configure
 */
@Override
protected void configureServer(Iso8583Server<IsoMessage> server) {
    server.setConfigurer(new ConnectorConfigurer<>() {
        @Override
        public void configurePipeline(ChannelPipeline pipeline, ServerConfiguration configuration) {
            ChannelInboundHandlerAdapter connectListener = new ChannelInboundHandlerAdapter() {
                @Override
                public void channelActive(ChannelHandlerContext ctx) throws Exception {
                    // Let the default behaviour run before sending the message.
                    super.channelActive(ctx);
                    ctx.writeAndFlush(server.getIsoMessageFactory().newMessage(0x800));
                }
            };
            pipeline.addBefore("idleEventHandler", "connectListenerHandler", connectListener);
        }
    });
}
/**
 * Appends the encoder/decoder pair matching the given serialization protocol to the
 * pipeline. Kryo is used for {@code KRYO} as well as for any value without a
 * dedicated case.
 *
 * @param serializeProtocol the serialization protocol to use
 * @param pipeline          the pipeline the codec handlers are appended to
 */
public static void serializePipeline(SerializeProtocolEnum serializeProtocol, ChannelPipeline pipeline) {
    switch (serializeProtocol) {
        case HESSIAN: {
            HessianCodecServiceImpl hessianCodec = new HessianCodecServiceImpl();
            pipeline.addLast(new HessianEncoder(hessianCodec));
            pipeline.addLast(new HessianDecoder(hessianCodec));
            break;
        }
        case PROTOSTUFF: {
            ProtostuffCodecServiceImpl protostuffCodec = new ProtostuffCodecServiceImpl();
            pipeline.addLast(new ProtostuffEncoder(protostuffCodec));
            pipeline.addLast(new ProtostuffDecoder(protostuffCodec));
            break;
        }
        case KRYO:
        default: {
            // KRYO and the fallback share the same codec setup.
            KryoCodecServiceImpl kryoCodec = new KryoCodecServiceImpl(KryoPoolFactory.getKryoPoolInstance());
            pipeline.addLast(new KryoEncoder(kryoCodec));
            pipeline.addLast(new KryoDecoder(kryoCodec));
            break;
        }
    }
}
/**
 * Prepares the pipeline of a channel towards a backend server: prepends the
 * encapsulated handshake sender, replaces the packet decoder and encoder, and
 * registers a listener that swaps a {@code DownstreamBridge} handler for an
 * {@code EntityRewriteDownstreamBridge}.
 *
 * @param channel    the channel whose pipeline is modified
 * @param connection the connection associated with the channel
 */
@Override
public void buildBungeeServer(Channel channel, Connection connection) {
    ChannelPipeline handlers = channel.pipeline();
    handlers.addFirst(new EncapsulatedHandshakeSender(null, false));

    NetworkDataCache dataCache = NetworkDataCache.getFrom(connection);
    handlers.replace(PipelineUtils.PACKET_DECODER, PipelineUtils.PACKET_DECODER, new FromServerPacketDecoder(connection, dataCache));
    handlers.replace(PipelineUtils.PACKET_ENCODER, PipelineUtils.PACKET_ENCODER, new ToServerPacketEncoder(connection, dataCache));

    handlers.get(CustomHandlerBoss.class).setHandlerChangeListener(handler -> {
        // Any handler other than a DownstreamBridge is passed through unchanged.
        if (!(handler instanceof DownstreamBridge)) {
            return handler;
        }
        try {
            // The bridge's "con" field is read reflectively to build the replacement.
            return new EntityRewriteDownstreamBridge(
                ReflectionUtils.getFieldValue(handler, "con"), connection.getVersion()
            );
        } catch (IllegalArgumentException | IllegalAccessException e) {
            throw new RuntimeException(e);
        }
    });
}
/**
 * Starts a server channel bound to the given port, with a boss group of one thread
 * and a worker group of two threads.
 *
 * <p>If binding fails (or the wait is interrupted) both event loop groups are shut
 * down before the exception propagates, so their threads are not leaked. After a
 * successful bind the groups are left running to serve the channel.
 *
 * @param port the port to bind to
 * @throws InterruptedException if the thread is interrupted while waiting for the bind
 */
private void createChannel(int port) throws InterruptedException {
    EventLoopGroup bossGroup = new NioEventLoopGroup(1);
    EventLoopGroup workerGroup = new NioEventLoopGroup(2);
    boolean bound = false;
    try {
        ServerBootstrap b = new ServerBootstrap();
        b.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .handler(new LoggingHandler(LogLevel.INFO))
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        ChannelPipeline p = ch.pipeline();
                        p.addLast(new LoggingHandler(LogLevel.DEBUG));
                        p.addLast(new NettySimpleMessageHandler());
                        p.addLast(new NettyMasterHandler(null, new CommandHandlerManager(), 1000 * 60 * 24));
                        // NOTE(review): the HAProxy decoder is appended after the other
                        // handlers, so inbound data reaches them first - confirm that
                        // this ordering is intended.
                        p.addLast(new HAProxyMessageDecoder());
                    }
                });
        b.bind(port).sync();
        bound = true;
    } finally {
        if (!bound) {
            // Nothing else holds a reference to these groups, so release them here.
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }
}
/**
 * Creates and starts the proxy server, binding it to the requested port.
 *
 * <p>Each accepted channel gets an HTTP server codec, an object aggregator
 * (constructed with 1048576) and a {@code ProxyClientHandler} using the given
 * filters. After binding, the actual local port is stored in {@code port}.
 *
 * <p>If startup fails, both event loop groups are shut down so that their threads
 * are not leaked by the half-constructed instance, and the thread's interrupt
 * status is restored when the failure was an interruption.
 *
 * @param requestedPort  the port to bind to
 * @param requestFilter  the filter passed to every {@code ProxyClientHandler}
 * @param responseFilter the filter passed to every {@code ProxyClientHandler}
 * @throws RuntimeException wrapping any exception raised while starting the server
 */
public ProxyServer(int requestedPort, RequestFilter requestFilter, ResponseFilter responseFilter) {
    bossGroup = new NioEventLoopGroup(1);
    workerGroup = new NioEventLoopGroup(8);
    try {
        ServerBootstrap b = new ServerBootstrap();
        b.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<Channel>() {
                    @Override
                    protected void initChannel(Channel c) {
                        ChannelPipeline p = c.pipeline();
                        p.addLast(new HttpServerCodec());
                        p.addLast(new HttpObjectAggregator(1048576));
                        p.addLast(new ProxyClientHandler(requestFilter, responseFilter));
                    }
                });
        channel = b.bind(requestedPort).sync().channel();
        InetSocketAddress isa = (InetSocketAddress) channel.localAddress();
        // The logged host is fixed; only the port is taken from the bound address.
        String host = "127.0.0.1"; //isa.getHostString();
        port = isa.getPort();
        logger.info("proxy server started - http://{}:{}", host, port);
    } catch (Exception e) {
        if (e instanceof InterruptedException) {
            // Preserve the interrupt for callers further up the stack.
            Thread.currentThread().interrupt();
        }
        // The instance never becomes visible to callers, so clean up here.
        workerGroup.shutdownGracefully();
        bossGroup.shutdownGracefully();
        throw new RuntimeException(e);
    }
}
/**
 * Configures the HTTP server pipeline: server codec, whatever {@code addAdvanced}
 * contributes, a chunked write handler and the controller dispatcher.
 *
 * @param ch the channel whose pipeline is configured
 */
@Override
public void initChannel(SocketChannel ch) {
    ChannelPipeline handlers = ch.pipeline();

    // HttpServerCodec is a combination of HttpRequestDecoder and HttpResponseEncoder;
    // it is used to encode/decode ByteBuf into httpRequest/httpResponse.
    handlers.addLast(new HttpServerCodec());

    addAdvanced(handlers);

    handlers.addLast(new ChunkedWriteHandler())
            // Route dispatcher.
            .addLast(new ControllerDispatcher());
}
/**
 * Configures the websocket client handlers when the channel is initialized.
 *
 * <p>Adds an optional SSL handler (created for {@code host} and {@code port}), the
 * HTTP client codec, a chunked write handler, an object aggregator, the websocket
 * client protocol handler and a handler listening for a successful connection.
 *
 * @param ch the channel whose pipeline is configured
 * @throws Exception declared by the method signature
 */
@Override
protected final void initChannel(Channel ch) throws Exception {
    ChannelPipeline handlers = ch.pipeline();

    if (sslCtx != null) {
        handlers.addLast(sslCtx.newHandler(ch.alloc(), host, port));
    }

    handlers.addLast(new HttpClientCodec())
            .addLast(new ChunkedWriteHandler())
            .addLast(new HttpObjectAggregator(64 * 1024))
            // The handshaker is created for version 13 with the configured subprotocol.
            .addLast(new WebSocketClientProtocolHandler(
                    WebSocketClientHandshakerFactory.newHandshaker(
                            webSocketURL, WebSocketVersion.V13, subprotocol, false, new DefaultHttpHeaders())))
            // Handler that listens for a successful connection.
            .addLast(new WebSocketConnectedClientHandler());
}
/**
 * Configures the peer channel pipeline: object decoder, the shared encoder, a
 * read-idle detector and the peer channel handler running on its own executor group.
 *
 * @param ch the channel whose pipeline is configured
 * @throws Exception declared by the method signature
 */
@Override
protected void initChannel(final SocketChannel ch) throws Exception {
    ch.pipeline()
            // NOTE(review): ObjectDecoder performs Java deserialization of inbound
            // data - confirm that peers on this channel are trusted.
            .addLast(new ObjectDecoder(ClassResolvers.cacheDisabled(null)))
            .addLast(encoder)
            // Only reader idleness is monitored; writer and all-idle are disabled (0).
            .addLast(new IdleStateHandler(config.getMaxReadIdleSeconds(), 0, 0))
            .addLast(peerChannelHandlerExecutorGroup, peerChannelHandler);
}
/**
 * Switches the channel to HTTP handling: appends the HTTP handlers, removes this
 * handler from the pipeline and re-fires the channel-active event.
 *
 * @param ctx the context of this handler
 */
private void dispatchToHttp(ChannelHandlerContext ctx) {
    ctx.pipeline()
            .addLast(new HttpServerCodec())
            .addLast(new ChunkedWriteHandler())
            // aggregate HttpRequest/HttpContent/LastHttpContent to FullHttpRequest
            .addLast(new HttpObjectAggregator(8096))
            .addLast(HttpHandler.getInstance(channelListener))
            .remove(this);

    // Pass the channelActive event on to the HttpHandler.
    ctx.fireChannelActive();
}
/**
 * Replaces this handler with the HTTP handler chain: request decoder, response
 * encoder, content compressor and the snoop server handler.
 *
 * @param ctx the context of this handler
 */
private void switchToHttp(ChannelHandlerContext ctx) {
    ctx.pipeline()
            .addLast("decoder", new HttpRequestDecoder())
            .addLast("encoder", new HttpResponseEncoder())
            .addLast("deflater", new HttpContentCompressor())
            .addLast("handler", new HttpSnoopServerHandler())
            // This handler is removed once the HTTP handlers are in place.
            .remove(this);
}
/**
 * Configures the search HTTP pipeline with named handlers: request decoder,
 * aggregator (limited by the configured client max body size), response encoder,
 * response handler, search query decoder and the search handler.
 *
 * @param ch the channel whose pipeline is configured
 * @throws Exception declared by the method signature
 */
@Override
protected void initChannel(SocketChannel ch) throws Exception {
    final ChannelPipeline handlers = ch.pipeline()
            .addLast("httpDecoder", new HttpRequestDecoder())
            .addLast("httpAggregator", new HttpObjectAggregator(conf.getClientMaxBodySize()))
            .addLast("httpResponseEncoder", new HttpResponseEncoder())
            .addLast("httpMyResponseHandler", new HttpSearchResponseHandler())
            .addLast("httpSearchDecoder", new SearchQueryDecoder());

    // The REST client bean is looked up for every channel before the search handler
    // is created.
    RestClient elasticClient = applicationContext.getBean("elasticRestClient", RestClient.class);
    handlers.addLast("httpSearchHandler", new HttpSearchHandler(this.executor, elasticClient));
}
/**
 * Configures the frontend pipeline with a packet codec and an inbound handler, both
 * built from the protocol engine for the database type of the first schema context.
 *
 * @param socketChannel the channel whose pipeline is configured
 */
@Override
protected void initChannel(final SocketChannel socketChannel) {
    // The database type is taken from the first schema context returned by the iterator.
    DatabaseType frontendDatabaseType = ProxySchemaContexts.getInstance()
            .getSchemaContexts()
            .getSchemaContexts()
            .values()
            .iterator()
            .next()
            .getSchema()
            .getDatabaseType();
    DatabaseProtocolFrontendEngine frontendEngine = DatabaseProtocolFrontendEngineFactory.newInstance(frontendDatabaseType);

    socketChannel.pipeline()
            .addLast(new PacketCodec(frontendEngine.getCodecEngine()))
            .addLast(new FrontendChannelInboundHandler(frontendEngine));
}
/**
 * Configures the HTTP server pipeline: read timeout, request decoder, response
 * encoder, aggregator, content compressor and the server business-logic handler.
 *
 * @param socketChannel the channel (representing one socket) whose pipeline is configured
 */
@Override
protected void initChannel(SocketChannel socketChannel) {
    socketChannel.pipeline()
            .addLast(new ReadTimeoutHandler(1))
            // HTTP request decoder: the server side decodes incoming requests.
            .addLast("decoder", new HttpRequestDecoder(8192, 8192, 8192))
            // HTTP response encoder: the server side encodes outgoing responses.
            .addLast("encoder", new HttpResponseEncoder())
            // Merges multiple message parts into a single FullHttpRequest or
            // FullHttpResponse.
            .addLast("aggregator", new HttpObjectAggregator(10 * 1024 * 1024))
            // Compression.
            .addLast(new HttpContentCompressor())
            // Handlers come in two kinds, inbound and outbound, processing incoming
            // and outgoing traffic respectively. This one is the server business logic.
            .addLast(new HttpServerHandler());
}
/**
 * Removes the read timeout handler from this channel's pipeline.
 *
 * <p>Intended for when the channel is released back to the pool: removing (and
 * therefore destroying) the handler keeps an unused channel from timing out.
 */
public void removeReadTimeoutHandler()
{
    removeHandlerFromPipeline(READ_TIMEOUT_HANDLER_NAME, getChannel().pipeline());
}