下面列出了 io.netty.channel.ChannelPipeline#addLast() 的实例代码；也可以点击链接到 GitHub 查看源代码，或在右侧发表评论。
/**
 * Applies this object's configuration to {@code channel}: appends the configured
 * handler (when one is set) to the pipeline, then applies the channel options and
 * finally copies every configured attribute onto the channel.
 *
 * @param channel the channel being initialized
 * @throws Exception declared by the signature; anything thrown while configuring propagates
 */
private void init(Channel channel) throws Exception {
    final ChannelPipeline pipeline = channel.pipeline();

    // Read the field once so the null check and the add see the same value.
    final ChannelHandler configuredHandler = this.handler;
    if (configuredHandler != null) {
        pipeline.addLast(configuredHandler);
    }

    synchronized (options) {
        setChannelOptions(channel, options, logger);
    }

    synchronized (attrs) {
        for (Map.Entry<AttributeKey<?>, Object> entry : attrs.entrySet()) {
            // The wildcard key is widened to Object so the stored value can be set as-is.
            @SuppressWarnings("unchecked")
            final AttributeKey<Object> key = (AttributeKey<Object>) entry.getKey();
            channel.attr(key).set(entry.getValue());
        }
    }
}
/**
 * Assembles the HTTP server pipeline for a new channel.
 *
 * <p>Order: HTTP server codec, then (each only when configured) an object aggregator,
 * a chunked-write handler and a client-context binding handler, and finally the
 * application handler.
 *
 * @param ch the channel being initialized
 * @throws Exception declared by the overridden method; also fails fast through
 *         {@code Preconditions.checkNotNull} when no handler has been set
 */
@Override
protected void initChannel(Channel ch) throws Exception {
    Preconditions.checkNotNull(handler, "Must specify a channel handler");

    final ChannelPipeline p = ch.pipeline();
    p.addLast(new HttpServerCodec());

    // Aggregation is only installed for a positive size limit.
    if (maxRequestSizeBytes > 0) {
        p.addLast(new HttpObjectAggregator(maxRequestSizeBytes));
    }

    if (chunkedWrites) {
        p.addLast(new ChunkedWriteHandler());
    }

    if (clientFactory != null) {
        p.addLast(new BindClientContextHandler(cookieConfig, clientFactory, requestAuthorizer));
    }

    // The application handler always goes last.
    p.addLast(handler);
}
/**
 * Builds the HTTP pipeline for a new socket channel: optional TLS, request decoder,
 * response encoder, content compressor, the request dispatcher, and finally the
 * idle-state and heartbeat handlers.
 *
 * <p>No {@code HttpObjectAggregator} or logging handler is installed by this method.
 *
 * @param ch the socket channel being initialized
 */
@Override
public void initChannel(SocketChannel ch) {
    final ChannelPipeline pipeline = ch.pipeline();

    // TLS is only added when an SSL context has been configured.
    if (sslCtx != null) {
        pipeline.addLast(sslCtx.newHandler(ch.alloc()));
    }

    // NOTE(review): the idle-state handler sits after the dispatcher, so it only observes
    // inbound events the dispatcher forwards - confirm this ordering is intended.
    pipeline.addLast(new HttpRequestDecoder())
            .addLast(new HttpResponseEncoder())
            .addLast("http compressor", new HttpContentCompressor())
            .addLast("http dispatcher", reqDis)
            .addLast("idleStateHandler", new IdleStateHandler(10, 10, 0))
            .addLast("heartbeatHandler", new HeartbeatHandler());
}
/**
 * Builds the server-side pipeline for a Protobuf-framed connection: idle detection,
 * varint32 frame decoding, Protobuf decoding of {@code UserMsg.User}, varint32 length
 * prepending, Protobuf encoding, and the business handler.
 *
 * @param ch the socket channel being initialized
 * @throws Exception declared by the overridden method
 */
@Override
protected void initChannel(SocketChannel ch) throws Exception {
    ch.pipeline()
            // Arguments: reader idle time, writer idle time, all-idle time, time unit.
            .addLast(new IdleStateHandler(5, 0, 0, TimeUnit.SECONDS))
            // Decoders and encoders must match the client side; the wire protocol is Protobuf.
            .addLast(new ProtobufVarint32FrameDecoder())
            .addLast(new ProtobufDecoder(UserMsg.User.getDefaultInstance()))
            .addLast(new ProtobufVarint32LengthFieldPrepender())
            .addLast(new ProtobufEncoder())
            // Business logic implementation.
            .addLast("nettyServerHandler", new NettyServerHandler());
}
/**
 * Builds the client pipeline: length-field framing, Protobuf codec for
 * {@code MessageProtobuf.Msg}, then the login-auth, heartbeat and read handlers,
 * each registered under its class's simple name.
 *
 * @param channel the channel being initialized
 * @throws Exception declared by the overridden method
 */
@Override
protected void initChannel(Channel channel) throws Exception {
    channel.pipeline()
            // Netty's length-field codec, used to handle TCP packet splitting/coalescing:
            // a 2-byte length prefix is written on the way out and stripped on the way in.
            .addLast("frameEncoder", new LengthFieldPrepender(2))
            .addLast("frameDecoder", new LengthFieldBasedFrameDecoder(65535, 0, 2, 0, 2))
            // Protobuf encoding/decoding support.
            .addLast(new ProtobufEncoder())
            .addLast(new ProtobufDecoder(MessageProtobuf.Msg.getDefaultInstance()))
            // Handles responses to the handshake/authentication message.
            .addLast(LoginAuthRespHandler.class.getSimpleName(), new LoginAuthRespHandler(imsClient))
            // Handles responses to heartbeat messages.
            .addLast(HeartbeatRespHandler.class.getSimpleName(), new HeartbeatRespHandler(imsClient))
            // Handles received messages.
            .addLast(TCPReadHandler.class.getSimpleName(), new TCPReadHandler(imsClient));
}
/**
 * Builds the factorial client pipeline: optional TLS towards
 * {@code FactorialClient.HOST}:{@code FactorialClient.PORT}, GZIP stream compression,
 * the number codec, and the client business handler.
 *
 * @param ch the socket channel being initialized
 */
@Override
public void initChannel(SocketChannel ch) {
    final ChannelPipeline p = ch.pipeline();

    if (sslCtx != null) {
        p.addLast(sslCtx.newHandler(ch.alloc(), FactorialClient.HOST, FactorialClient.PORT));
    }

    // Stream compression; these two handlers can be removed if compression is unnecessary.
    p.addLast(ZlibCodecFactory.newZlibEncoder(ZlibWrapper.GZIP))
            .addLast(ZlibCodecFactory.newZlibDecoder(ZlibWrapper.GZIP))
            // Number codec next,
            .addLast(new BigIntegerDecoder())
            .addLast(new NumberEncoder())
            // followed by the business logic.
            .addLast(new FactorialClientHandler());
}
/**
 * Installs the Synapse packet decoder, packet encoder and client handler, in that order.
 * No delimiter-based frame decoder is installed by this method.
 *
 * @param ch the socket channel being initialized
 * @throws Exception declared by the overridden method
 */
@Override
public void initChannel(SocketChannel ch) throws Exception {
    ch.pipeline()
            .addLast(new SynapsePacketDecoder())
            .addLast(new SynapsePacketEncoder())
            .addLast(new SynapseClientHandler(this.synapseClient));
}
/**
 * Builds a line-oriented text pipeline: line-delimited framing (frames up to 8192 bytes),
 * the shared {@code DECODER} and {@code ENCODER} instances, then a new
 * {@code GraphiteHandler} for this channel.
 *
 * @param ch the socket channel being initialized
 * @throws Exception declared by the overridden method
 */
@Override
public void initChannel(SocketChannel ch) throws Exception {
    ch.pipeline()
            // Text line framing comes first.
            .addLast(new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()))
            // The decoder and encoder are static because they are sharable.
            .addLast(DECODER)
            .addLast(ENCODER)
            // Business logic goes last.
            .addLast(new GraphiteHandler(m_repository, this));
}
/**
 * Builds an object-serialization pipeline: {@code ObjectDecoder}, {@code ObjectEncoder},
 * then the echo handler registered under the name {@code "handler"}.
 *
 * @param ch the socket channel being initialized
 * @throws Exception declared by the overridden method
 */
@Override
protected void initChannel(SocketChannel ch) throws Exception {
    final ChannelPipeline p = ch.pipeline();

    // NOTE(review): ObjectDecoder deserializes Java objects sent by the peer - confirm
    // that only trusted peers can connect before relying on this codec.
    p.addLast(new ObjectDecoder(ClassResolvers.weakCachingConcurrentResolver(null)))
            .addLast(new ObjectEncoder())
            .addLast("handler", new EchoServerHandler());
}
/**
 * Starts the FTP server on 127.0.0.1:2121 with a file-backed data receiver.
 *
 * @param args command-line arguments (unused)
 * @throws Exception declared by the signature; anything thrown during startup propagates
 */
public static void main(String... args) throws Exception {
    DataReceiver receiver = new FileReceiver();
    final DefaultCommandExecutionTemplate template = new DefaultCommandExecutionTemplate(receiver);

    // Each accepted connection gets a CRLF string decoder followed by the FTP handler.
    ChannelInitializer<SocketChannel> channelInitializer = new ChannelInitializer<SocketChannel>() {
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            ch.pipeline()
                    .addLast(new CrlfStringDecoder())
                    .addLast(new FtpServerHandler(template));
        }
    };

    BootstrapTemplate.newServerBootstrap("127.0.0.1", 2121, channelInitializer);
}
/**
 * Reconfigures the pipeline of {@code ctx}: appends an archive decoder, an
 * acknowledgement responder and a string encoder, then removes this handler.
 *
 * @param ctx the context whose pipeline is modified
 */
private void invokeJarSendHandler(ChannelHandlerContext ctx) {
    final ChannelPipeline pipeline = ctx.pipeline();

    // 10 * 1024 * 1024 == 10485760.
    // NOTE(review): presumably a size limit in bytes - confirm against ArchiveDecoder.
    pipeline.addLast(STREAMER, new ArchiveDecoder(10 * 1024 * 1024, receiveDirectory))
            .addLast(ACK_RESPONSER, new AckResponser())
            .addLast(ENCODER, new StringEncoder());

    // This handler takes itself out once the replacement handlers are in place.
    pipeline.remove(this);
}
/**
 * Builds a line-oriented pipeline: line-delimited framing (frames up to 8192 bytes),
 * the shared {@code ENCODER} and {@code DECODER} instances, then a new
 * {@code NettyTelnetHandler}.
 *
 * @param channel the socket channel being initialized
 * @throws Exception declared by the overridden method
 */
@Override
protected void initChannel(SocketChannel channel) throws Exception {
    channel.pipeline()
            .addLast(new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()))
            .addLast(ENCODER)
            .addLast(DECODER)
            .addLast(new NettyTelnetHandler());
}
/**
 * Installs three named handlers: {@code "encoder"} (HTTP response encoder),
 * {@code "decoder"} (HTTP request decoder) and {@code "handler"} (the hello handler
 * bound to {@code service}).
 *
 * @param ch the socket channel being initialized
 * @throws Exception declared by the overridden method
 */
@Override
public void initChannel(SocketChannel ch) throws Exception {
    ch.pipeline()
            .addLast("encoder", new HttpResponseEncoder())
            // Decoder arguments: max initial line length, max header size, max chunk size,
            // header validation flag.
            .addLast("decoder", new HttpRequestDecoder(4096, 8192, 8192, false))
            .addLast("handler", new HelloServerHandler(service));
}
/**
 * Initializes this channel object and populates {@code pipeline} with the read-timeout
 * handler, the TCP stats handler, the varint32 length prepender, the frame decoder and
 * the handshake handler, then wires this channel into its collaborating components.
 *
 * @param pipeline       the pipeline to populate
 * @param remoteId       id of the remote peer; the channel is marked active only when
 *                       this is neither {@code null} nor empty
 * @param discoveryMode  not read by this method
 * @param channelManager the manager stored on this channel
 */
public void init(ChannelPipeline pipeline, String remoteId, boolean discoveryMode,
        ChannelManager channelManager) {
    this.channelManager = channelManager;
    isActive = !(remoteId == null || remoteId.isEmpty());
    startTime = System.currentTimeMillis();

    //TODO: use config here
    pipeline.addLast("readTimeoutHandler", new ReadTimeoutHandler(60, TimeUnit.SECONDS))
            .addLast(stats.tcp)
            .addLast("protoPender", new ProtobufVarint32LengthFieldPrepender())
            .addLast("lengthDecode", new GSCProtobufVarint32FrameDecoder(this))
            // The handshake handler is the last one installed here.
            .addLast("handshakeHandler", handshakeHandler);

    // Bind every collaborator to this channel.
    messageCodec.setChannel(this);
    msgQueue.setChannel(this);
    handshakeHandler.setChannel(this, remoteId);
    p2pHandler.setChannel(this);
    gscNetHandler.setChannel(this);

    // Both message handlers share the same queue.
    p2pHandler.setMsgQueue(msgQueue);
    gscNetHandler.setMsgQueue(msgQueue);
}
/**
 * Builds an HTTP server pipeline: server codec, a 65536-byte object aggregator,
 * a chunked-write handler, then the classpath server handler.
 *
 * @param ch the socket channel being initialized
 */
@Override
public void initChannel(SocketChannel ch) {
    ch.pipeline()
            .addLast(new HttpServerCodec())
            .addLast(new HttpObjectAggregator(65536))
            .addLast(new ChunkedWriteHandler())
            .addLast(new HttpClasspathServerHandler());
}
/**
 * Records the new channel in {@code channels} and builds its pipeline: TLS handler,
 * HTTP server codec, then {@code MightReturn500ChannelHandler}.
 *
 * @param ch the socket channel being initialized
 * @throws Exception declared by the overridden method
 */
@Override
protected void initChannel(SocketChannel ch) throws Exception {
    // Track the channel before any handler is installed.
    channels.add(ch);

    ch.pipeline()
            .addLast(sslCtx.newHandler(ch.alloc()))
            .addLast(new HttpServerCodec())
            .addLast(new MightReturn500ChannelHandler());
}
/**
 * Builds an HTTP server pipeline: server codec, a 65536-byte object aggregator, then
 * the handler returned by {@code getPushMessageSender(pushConnectionRegistry)}.
 *
 * @param ch the channel being initialized
 * @throws Exception declared by the overridden method
 */
@Override
protected void initChannel(Channel ch) throws Exception {
    ch.pipeline()
            .addLast(new HttpServerCodec())
            .addLast(new HttpObjectAggregator(65536))
            .addLast(getPushMessageSender(pushConnectionRegistry));
}
/**
 * Builds an HTTP server pipeline with CORS handling: optional TLS, response encoder,
 * request decoder, a 65536-byte aggregator, a chunked-write handler, the CORS handler
 * and finally {@code OkResponseHandler}.
 *
 * @param ch the socket channel being initialized
 */
@Override
public void initChannel(SocketChannel ch) {
    // The CORS policy accepts any origin, including the null origin, and allows credentials.
    final CorsConfig cors = CorsConfigBuilder.forAnyOrigin()
            .allowNullOrigin()
            .allowCredentials()
            .build();

    final ChannelPipeline p = ch.pipeline();
    if (sslCtx != null) {
        p.addLast(sslCtx.newHandler(ch.alloc()));
    }

    p.addLast(new HttpResponseEncoder())
            .addLast(new HttpRequestDecoder())
            .addLast(new HttpObjectAggregator(65536))
            .addLast(new ChunkedWriteHandler())
            .addLast(new CorsHandler(cors))
            .addLast(new OkResponseHandler());
}
/**
 * Appends an HTTP response decoder, an HTTP request encoder and an 8192-byte object
 * aggregator to {@code pipeline}.
 *
 * @param pipeline the pipeline to extend
 */
@Override
protected void addAdditionalHandlers(ChannelPipeline pipeline) {
    /*
     * A separate HttpRequestEncoder and HttpResponseDecoder are used here instead of a single
     * HttpClientCodec. If an HttpProxyHandler is in the pipeline, it adds its own HttpClientCodec
     * for talking to the proxy. When the WebSocketClientHandshaker later swaps codecs in the
     * pipeline, it would confuse the two HttpRequestEncoders and replace the wrong one.
     * HttpRequestEncoder and HttpResponseDecoder have precedence over HttpClientCodec, so the
     * handshaker removes the handlers added here and leaves the proxy handler's
     * HttpClientCodec alone.
     */
    pipeline.addLast(new HttpResponseDecoder())
            .addLast(new HttpRequestEncoder())
            .addLast(new HttpObjectAggregator(8192));
}
/**
 * Builds the Pistachio server pipeline: optional TLS, DEBUG-level logging, a read
 * timeout, varint32-framed Protobuf decoding of {@code NettyPistachioProtocol.Request},
 * varint32 length prepending, Protobuf encoding, and the server handler.
 *
 * @param ch the socket channel being initialized
 * @throws Exception declared by the overridden method
 */
@Override
public void initChannel(SocketChannel ch) throws Exception {
    final ChannelPipeline pipeline = ch.pipeline();

    if (sslCtx != null) {
        pipeline.addLast(sslCtx.newHandler(ch.alloc()));
    }

    pipeline.addLast(new LoggingHandler(LogLevel.DEBUG))
            // Read timeout comes from "Network.Netty.ServerReadTimeoutMillis", default 10000 ms.
            .addLast(new ReadTimeoutHandler(
                    ConfigurationManager.getConfiguration().getInt("Network.Netty.ServerReadTimeoutMillis", 10000),
                    TimeUnit.MILLISECONDS))
            .addLast(new ProtobufVarint32FrameDecoder())
            .addLast(new ProtobufDecoder(NettyPistachioProtocol.Request.getDefaultInstance()))
            .addLast(new ProtobufVarint32LengthFieldPrepender())
            .addLast(new ProtobufEncoder())
            .addLast(new NettyPistachioServerHandler(handler));
}