下面列出了怎么用 io.netty.handler.codec.LengthFieldBasedFrameDecoder 的API类实例代码及写法,或者点击链接到github查看源代码。
/**
 * Populates the pipeline of a new channel with length-prefixed framing, protobuf
 * codecs and the client's handshake, heartbeat and read handlers, in that order.
 *
 * @param channel the channel being initialized
 * @throws Exception as permitted by the overridden initializer method
 */
@Override
protected void initChannel(Channel channel) throws Exception {
    channel.pipeline()
            // Netty's length-field framing deals with TCP packet splitting/coalescing:
            // outbound messages get a 2-byte length prefix, inbound frames are cut on
            // that prefix (at most 65535 bytes) and the 2 prefix bytes are stripped.
            .addLast("frameEncoder", new LengthFieldPrepender(2))
            .addLast("frameDecoder", new LengthFieldBasedFrameDecoder(65535, 0, 2, 0, 2))
            // Protobuf encode/decode support.
            .addLast(new ProtobufEncoder())
            .addLast(new ProtobufDecoder(MessageProtobuf.Msg.getDefaultInstance()))
            // Handles the response to the handshake/authentication message.
            .addLast(LoginAuthRespHandler.class.getSimpleName(), new LoginAuthRespHandler(imsClient))
            // Handles heartbeat responses.
            .addLast(HeartbeatRespHandler.class.getSimpleName(), new HeartbeatRespHandler(imsClient))
            // Handles received messages.
            .addLast(TCPReadHandler.class.getSimpleName(), new TCPReadHandler(imsClient));
}
/**
 * Configures the channel pipeline: idle detection, length-field framing, message
 * decoding, message processing and outbound encoding.
 *
 * @param ch the channel being initialized
 * @throws Exception as permitted by the overridden initializer method
 */
@Override
protected void initChannel(Channel ch) throws Exception {
    ch.pipeline()
            // Only the all-idle timeout (third argument) is enabled.
            .addLast(new IdleStateHandler(0, 0, 35))
            .addLast(new IdleStateTrigger())
            // Packet layout (reconstructed from a mis-encoded comment - TODO confirm):
            // 1-byte fixed header, 1-byte function code, 1 byte flagging whether a topic
            // field is present, 4-byte length field, 12-byte topic (optional), then the
            // remaining payload. The length field therefore starts at offset 3.
            .addLast(new LengthFieldBasedFrameDecoder(2048, 3, 4, 0, 0))
            .addLast(new MessageToPoDecoder())
            // The authentication handler is currently disabled:
            // .addLast("auth", new AuthenticationHandler())
            // Protocol/message processing handler.
            .addLast("message-process", new MessageProcessHandler())
            .addLast(new MessageEncoder());
}
/**
 * Creates the client bootstrap on a fresh NIO event loop group (stored in
 * {@code eventLoopGroup}) and installs the framing, codec and client handlers.
 *
 * @return the configured, not yet connected bootstrap
 */
private Bootstrap initClientBootstrap() {
    eventLoopGroup = new NioEventLoopGroup();

    // Built per connection: every new channel gets its own TokenClientHandler, which is
    // also published through the clientHandler field.
    ChannelInitializer<SocketChannel> pipelineSetup = new ChannelInitializer<SocketChannel>() {
        @Override
        public void initChannel(SocketChannel ch) throws Exception {
            clientHandler = new TokenClientHandler(currentState, disconnectCallback);

            ch.pipeline()
                    // Inbound: frames carry a 2-byte length prefix that is stripped.
                    .addLast(new LengthFieldBasedFrameDecoder(1024, 0, 2, 0, 2))
                    .addLast(new NettyResponseDecoder())
                    // Outbound: prepend a 2-byte length to every encoded request.
                    .addLast(new LengthFieldPrepender(2))
                    .addLast(new NettyRequestEncoder())
                    .addLast(clientHandler);
        }
    };

    Bootstrap bootstrap = new Bootstrap();
    bootstrap.group(eventLoopGroup)
            .channel(NioSocketChannel.class)
            .option(ChannelOption.TCP_NODELAY, true)
            .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
            .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, ClusterClientConfigManager.getConnectTimeout())
            .handler(pipelineSetup);
    return bootstrap;
}
/**
 * Creates an RPC client for the given node and configures the bootstrap with the
 * framing, RPC codec and handler chain.
 *
 * @param litchi   application context handed to the event, callback and response handlers
 * @param nodeType type of the remote node
 * @param nodeId   id of the remote node
 * @param host     remote host
 * @param port     remote port
 */
public NettyRpcClient(Litchi litchi, String nodeType, String nodeId, String host, int port) {
    this(nodeType, nodeId, host, port);

    bootstrap.group(bossGroup)
            .channel(NioSocketChannel.class)
            .handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline()
                            // 4-byte length field at offset 0; nothing is stripped, so the
                            // length bytes are still part of the frame passed downstream.
                            .addLast(new LengthFieldBasedFrameDecoder(DEFAULT_MAX_FRAME_LEN, 0, 4, 0, 0))
                            .addLast(new RpcDecoder())
                            .addLast(new RpcEncoder())
                            .addLast(new GameEventHandler(litchi))
                            .addLast(new RpcCallbackHandler(litchi, futureContext))
                            .addLast(new ResponseHandler(litchi));
                }
            })
            .option(ChannelOption.TCP_NODELAY, true)
            .option(ChannelOption.SO_KEEPALIVE, true);
}
/**
 * Verifies that a fail-slow decoder ({@code failFast = false}) reports an over-long
 * frame only once the whole frame has arrived, and afterwards decodes the next frame
 * normally. The scenario is run twice to show that the decoder keeps recovering.
 */
@Test
public void testFailSlowTooLongFrameRecovery() throws Exception {
    EmbeddedChannel ch = new EmbeddedChannel(
            new LengthFieldBasedFrameDecoder(5, 0, 4, 0, 4, false));

    for (int i = 0; i < 2; i++) {
        // Header announcing a 2-byte body: 4 + 2 bytes exceeds the 5-byte limit, but in
        // fail-slow mode nothing is raised until the body has been received.
        assertFalse(ch.writeInbound(Unpooled.wrappedBuffer(new byte[] { 0, 0, 0, 2 })));
        try {
            assertTrue(ch.writeInbound(Unpooled.wrappedBuffer(new byte[] { 0, 0 })));
            // Name the exception this test actually requires (the catch below accepts
            // nothing broader than TooLongFrameException).
            fail(TooLongFrameException.class.getSimpleName() + " must be raised.");
        } catch (TooLongFrameException expected) {
            // Expected
        }

        // A valid 1-byte frame must decode normally after the failure.
        ch.writeInbound(Unpooled.wrappedBuffer(new byte[] { 0, 0, 0, 1, 'A' }));
        ByteBuf buf = ch.readInbound();
        assertEquals("A", buf.toString(CharsetUtil.ISO_8859_1));
        buf.release();
    }
}
/**
 * Verifies that a fail-fast decoder (the default) rejects an over-long frame as soon as
 * its length header is read, discards the rejected frame's body when it arrives, and
 * then decodes the next frame normally. The scenario is run twice.
 */
@Test
public void testFailFastTooLongFrameRecovery() throws Exception {
    EmbeddedChannel ch = new EmbeddedChannel(
            new LengthFieldBasedFrameDecoder(5, 0, 4, 0, 4));

    for (int i = 0; i < 2; i++) {
        try {
            // Header announcing a 2-byte body: 4 + 2 bytes exceeds the 5-byte limit.
            assertTrue(ch.writeInbound(Unpooled.wrappedBuffer(new byte[] { 0, 0, 0, 2 })));
            // Name the exception this test actually requires (the catch below accepts
            // nothing broader than TooLongFrameException).
            fail(TooLongFrameException.class.getSimpleName() + " must be raised.");
        } catch (TooLongFrameException expected) {
            // Expected
        }

        // The first two bytes are the body of the rejected frame and must be skipped;
        // the rest is a valid 1-byte frame containing 'A'.
        ch.writeInbound(Unpooled.wrappedBuffer(new byte[] { 0, 0, 0, 0, 0, 1, 'A' }));
        ByteBuf buf = ch.readInbound();
        assertEquals("A", buf.toString(CharsetUtil.ISO_8859_1));
        buf.release();
    }
}
/**
 * Builds the client pipeline for the protocol named in the client configuration
 * (line-based strings or length-prefixed strings) and appends the client handler.
 *
 * @param sc the channel being initialized
 * @throws NotFoundProtocolException if the configured protocol is neither of the two
 */
@Override
protected void initChannel(SocketChannel sc) {
    ChannelPipeline pipeline = sc.pipeline();
    ClientConfig config = clientContext.getConfig();

    boolean lineProtocol = ServerConst.SERVER_PROTOCOL_STRING_LINE.equals(config.getProtocol());
    // Reject unknown protocols up front, before anything is added to the pipeline.
    if (!lineProtocol && !ServerConst.SERVER_PROTOCOL_LENGTH_FIELD.equals(config.getProtocol())) {
        throw new NotFoundProtocolException(config.getProtocol());
    }

    if (lineProtocol) {
        pipeline.addLast(new StringPasswordLineDecoder(config.getMsgLength(), config.getCharset(), config.getPassword()))
                .addLast(new StringPasswordLineEncoder(config.getCharset(), config.getPassword()));
    } else {
        // Length-field protocol: 4-byte length prefix, stripped inbound and prepended outbound.
        pipeline.addLast(new LengthFieldBasedFrameDecoder(config.getMsgLength(), 0, 4, 0, 4))
                .addLast(new LengthFieldPrepender(4))
                .addLast(new StringPasswordDecoder(config.getCharset(), config.getPassword()))
                .addLast(new StringPasswordEncoder(config.getCharset(), config.getPassword()));
    }

    pipeline.addLast(new ClientStringHandler(clientContext));
}
/**
 * Builds a client bootstrap backed by a newly created NIO event loop group, which is
 * kept in the {@code eventLoopGroup} field.
 *
 * @return the configured bootstrap; no connection is attempted here
 */
private Bootstrap initClientBootstrap() {
    eventLoopGroup = new NioEventLoopGroup();
    return new Bootstrap()
            .group(eventLoopGroup)
            .channel(NioSocketChannel.class)
            .option(ChannelOption.TCP_NODELAY, true)
            .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
            .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, ClusterClientConfigManager.getConnectTimeout())
            .handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    // A new handler instance per channel, also stored in the field.
                    clientHandler = new TokenClientHandler(currentState, disconnectCallback);

                    ch.pipeline()
                            // Responses: 2-byte length prefix, at most 1024 bytes per frame.
                            .addLast(new LengthFieldBasedFrameDecoder(1024, 0, 2, 0, 2))
                            .addLast(new NettyResponseDecoder())
                            // Requests: encoded, then prefixed with a 2-byte length.
                            .addLast(new LengthFieldPrepender(2))
                            .addLast(new NettyRequestEncoder())
                            .addLast(clientHandler);
                }
            });
}
/**
 * Initializes a client {@link Bootstrap} that exchanges byte arrays framed by a 4-byte
 * length prefix.
 *
 * <p>NOTE(review): every call creates a new {@code NioEventLoopGroup} that this method
 * never shuts down - confirm that callers release it. The frame limit is
 * {@code Integer.MAX_VALUE}, i.e. effectively unbounded; consider a tighter limit if
 * the peer is not trusted.
 *
 * @return the configured bootstrap
 */
public static final Bootstrap getBootstrap() {
    Bootstrap bootstrap = new Bootstrap();
    bootstrap.group(new NioEventLoopGroup())
            .channel(NioSocketChannel.class)
            .handler(new ChannelInitializer<Channel>() {
                @Override
                protected void initChannel(Channel ch) throws Exception {
                    ch.pipeline()
                            .addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4))
                            .addLast("frameEncoder", new LengthFieldPrepender(4))
                            .addLast("decoder", new ByteArrayDecoder())
                            .addLast("encoder", new ByteArrayEncoder())
                            .addLast("handler", new TcpClientHandler());
                }
            });
    return bootstrap;
}
/**
 * Starts the TCP server on {@code PORT} and blocks until the server channel is closed.
 *
 * <p>Frames use a 4-byte length prefix and are exchanged with the handler as byte arrays.
 *
 * @throws Exception if binding fails or the calling thread is interrupted while waiting
 */
protected static void run() throws Exception {
    ServerBootstrap b = new ServerBootstrap();
    b.group(bossGroup, workerGroup);
    b.channel(NioServerSocketChannel.class);
    b.childHandler(new ChannelInitializer<SocketChannel>() {
        @Override
        public void initChannel(SocketChannel ch) throws Exception {
            ChannelPipeline pipeline = ch.pipeline();
            pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
            pipeline.addLast("frameEncoder", new LengthFieldPrepender(4));
            pipeline.addLast("decoder", new ByteArrayDecoder());
            pipeline.addLast("encoder", new ByteArrayEncoder());
            pipeline.addLast(new TcpServerHandler());
        }
    });

    ChannelFuture f = b.bind(PORT).sync();
    // Report startup as soon as the bind has completed. This used to be printed after
    // closeFuture().sync(), i.e. only once the server channel had already been closed.
    System.out.println("TCP服务器已启动");

    // Block the caller for as long as the server channel stays open.
    f.channel().closeFuture().sync();
}
/**
 * Assembles the RPC client pipeline: 4-byte length framing, object serialization,
 * RPC command codec, then the retry, socket-manager and answer handlers.
 *
 * @param ch the channel being initialized
 * @throws Exception as permitted by the overridden initializer method
 */
@Override
protected void initChannel(Channel ch) throws Exception {
    ch.pipeline()
            // Outbound length prefix (4 bytes, not counting the prefix itself).
            .addLast(new LengthFieldPrepender(4, false))
            // Inbound: split on the 4-byte prefix and strip it.
            .addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4))
            .addLast(new ObjectSerializerEncoder())
            .addLast(new ObjectSerializerDecoder())
            .addLast(rpcCmdDecoder)
            .addLast(new RpcCmdEncoder())
            .addLast(nettyClientRetryHandler)
            .addLast(socketManagerInitHandler)
            .addLast(rpcAnswerHandler);
}
/**
 * Assembles the manager-side RPC pipeline: 4-byte length framing, idle detection,
 * object serialization, RPC command codec, then the socket-manager and answer handlers.
 *
 * @param ch the channel being initialized
 * @throws Exception as permitted by the overridden initializer method
 */
@Override
protected void initChannel(Channel ch) throws Exception {
    ch.pipeline()
            .addLast(new LengthFieldPrepender(4, false))
            .addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4))
            // Reader, writer and all-idle timeouts all use the configured check time (ms).
            .addLast(new IdleStateHandler(
                    managerProperties.getCheckTime(),
                    managerProperties.getCheckTime(),
                    managerProperties.getCheckTime(),
                    TimeUnit.MILLISECONDS))
            .addLast(new ObjectSerializerEncoder())
            .addLast(new ObjectSerializerDecoder())
            .addLast(rpcCmdDecoder)
            .addLast(new RpcCmdEncoder())
            .addLast(socketManagerInitHandler)
            .addLast(rpcAnswerHandler);
}
/**
 * Connects to the configured server and blocks until the connection is closed.
 *
 * <p>{@code status} is set to {@code START} once connected and to {@code STOP} when this
 * method returns, whether the connection ended normally or with an error. The event
 * loop group created for this run is shut down before returning so that its threads do
 * not accumulate across runs.
 */
public void run() {
    // Keep a local reference so the finally block releases exactly the group created by
    // this invocation, even if the field is reassigned by a later run.
    NioEventLoopGroup group = new NioEventLoopGroup();
    workerGroup = group;
    try {
        Bootstrap b = new Bootstrap();
        b.group(group);
        b.channel(NioSocketChannel.class);
        b.handler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel ch) throws Exception {
                ChannelPipeline pipeline = ch.pipeline();
                // 4-byte length prefix: stripped inbound, prepended outbound.
                pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(65536, 0, 4, 0, 4));
                pipeline.addLast("frameEncoder", new LengthFieldPrepender(4));
                pipeline.addLast("decoder", new MsgPackDecode());
                pipeline.addLast("encoder", new MsgPackEncode());
                pipeline.addLast(ClientConfiguration.clientHandler());
            }
        });

        channel = b.connect(clientProperties.getServerHost(), clientProperties.getServerPort()).sync().channel();
        status = Status.START;
        // Block until the connection is closed.
        channel.closeFuture().sync();
    } catch (InterruptedException e) {
        // Restore the interrupt flag so the owner of this thread can still observe it.
        Thread.currentThread().interrupt();
        e.printStackTrace();
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        // Release the event loop threads; previously the group was never shut down.
        group.shutdownGracefully();
    }
    status = Status.STOP;
}
/**
 * Starts the node server after construction: binds the configured node port and
 * installs a length-prefixed UTF-8 string pipeline on every accepted connection.
 *
 * @throws Exception if the bind fails or the calling thread is interrupted
 */
@PostConstruct
public void start() throws Exception {
    ServerBootstrap bootstrap = new ServerBootstrap();
    bootstrap.group(bossGroup, workGroup)
            .channelFactory(NioServerSocketChannel::new)
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel socketChannel) throws Exception {
                    ChannelPipeline pipeline = socketChannel.pipeline();
                    // 4-byte length prefix: stripped inbound, prepended outbound.
                    pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
                    pipeline.addLast("stringDecoder", new StringDecoder(CharsetUtil.UTF_8));
                    pipeline.addLast("frameEncoder", new LengthFieldPrepender(4));
                    pipeline.addLast("stringEncoder", new StringEncoder(CharsetUtil.UTF_8));
                    // Reader-idle detection only (300 seconds).
                    pipeline.addLast("idleStateHandler", new IdleStateHandler(300, 0, 0));
                    pipeline.addLast("handler", nodeChannelInBoundHandler);
                }
            })
            // TCP_NODELAY is a per-connection socket option, so it belongs on the accepted
            // (child) channels; set via option() it targeted the listening channel, which
            // does not support it.
            .childOption(ChannelOption.TCP_NODELAY, true)
            .childOption(ChannelOption.SO_REUSEADDR, true)
            // NOTE(review): SO_SNDBUF on the listening channel looks equally ineffective;
            // left unchanged because moving a 2048-byte send buffer onto the connections
            // would change throughput - confirm the intent.
            .option(ChannelOption.SO_SNDBUF, 2048)
            .option(ChannelOption.SO_RCVBUF, 1024);

    bootstrap.bind(goPushNodeServerConfig.getNodePort()).sync();
    log.info("Node server start successful! listening port: {}", goPushNodeServerConfig.getNodePort());
}
/**
 * Builds the pipeline from the builder's settings: length-field framing, message
 * codec, message executor and finally any extra handlers supplied by the builder.
 *
 * @param ch the channel being initialized
 */
@Override
protected void initChannel(Channel ch) {
    final int maxFrameLength = 1048576;
    final int lengthFieldOffset = 0;
    final int lengthFieldLength = 4;
    // The outbound prepender below counts the 4-byte length field in the length value
    // (second argument true), so the decoder subtracts those 4 bytes again.
    final int lengthAdjustment = -4;
    // Drop the length field from the decoded frame.
    final int initialBytesToStrip = lengthFieldLength;

    ChannelPipeline pipeline = ch.pipeline();
    pipeline.addLast(new LengthFieldBasedFrameDecoder(
                    maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip))
            .addLast(new MessageDecoder(builder.getImessageandhandler()))
            .addLast(new LengthFieldPrepender(4, true))
            .addLast(new MessageEncoder(builder.getImessageandhandler()))
            .addLast(new MessageExecutor(builder.getConsumer(), builder.getListener()));

    for (ChannelHandler extraHandler : builder.getExtraHandlers()) {
        pipeline.addLast(extraHandler);
    }
}
/**
 * Initializes a connection: optionally installs a TLS handler, then the frame decoder
 * and the {@code ServerConnection} handler.
 *
 * @param ch the channel being initialized
 * @throws Exception as permitted by the overridden initializer method
 */
@Override
protected void initChannel(SocketChannel ch) throws Exception {
    // TLS is only set up when it is enabled and a context refresher is available.
    boolean tlsActive = sslCtxRefresher != null && this.enableTls;
    if (tlsActive && this.tlsEnabledWithKeyStore) {
        // Key-store based TLS: create the engine from the auto-refreshing SSL context.
        ch.pipeline().addLast(TLS_HANDLER,
                new SslHandler(nettySSLContextAutoRefreshBuilder.get().createSSLEngine()));
    } else if (tlsActive) {
        SslContext refreshedContext = sslCtxRefresher.get();
        // No TLS handler is installed if the refresher yields no context.
        if (refreshedContext != null) {
            ch.pipeline().addLast(TLS_HANDLER, refreshedContext.newHandler(ch.alloc()));
        }
    }

    // 4-byte length prefix, stripped before the frame reaches the handler.
    ch.pipeline()
            .addLast("frameDecoder", new LengthFieldBasedFrameDecoder(
                    Commands.DEFAULT_MAX_MESSAGE_SIZE + Commands.MESSAGE_SIZE_FRAME_PADDING, 0, 4, 0, 4))
            .addLast("handler", new ServerConnection(discoveryService));
}
/**
 * Verifies that a fail-fast decoder (the default) rejects an over-long frame as soon as
 * its length header is read, discards the rejected frame's body when it arrives, and
 * then decodes the next frame normally. The scenario is run twice.
 */
@Test
public void testFailFastTooLongFrameRecovery() throws Exception {
    EmbeddedChannel ch = new EmbeddedChannel(
            new LengthFieldBasedFrameDecoder(5, 0, 4, 0, 4));

    for (int i = 0; i < 2; i++) {
        try {
            // Header announcing a 2-byte body: 4 + 2 bytes exceeds the 5-byte limit.
            assertTrue(ch.writeInbound(Unpooled.wrappedBuffer(new byte[] { 0, 0, 0, 2 })));
            // Name the exception this test actually requires.
            fail(TooLongFrameException.class.getSimpleName() + " must be raised.");
        } catch (TooLongFrameException expected) {
            // Expected
        }

        // The first two bytes are the body of the rejected frame and must be skipped;
        // the rest is a valid 1-byte frame containing 'A'.
        ch.writeInbound(Unpooled.wrappedBuffer(new byte[] { 0, 0, 0, 0, 0, 1, 'A' }));
        // The buffer is released explicitly below, so it is no longer also handed to
        // releaseLater(...), which would have attempted a second release.
        ByteBuf buf = (ByteBuf) ch.readInbound();
        assertEquals("A", buf.toString(CharsetUtil.ISO_8859_1));
        buf.release();
    }
}
/**
 * Starts the mock broker: a server on an ephemeral port (bind to 0) whose connections
 * are handled by {@code MockServerCnx}. The listening channel is stored in
 * {@code listenChannel}.
 *
 * @throws Exception if the server cannot be started
 */
public void startMockBrokerService() throws Exception {
    final int threadCount = 2;
    final int maxMessageSize = 5 * 1024 * 1024;
    ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("mock-pulsar-%s").build();

    // Failures simply propagate to the caller.
    workerGroup = EventLoopUtil.newEventLoopGroup(threadCount, threadFactory);

    ServerBootstrap bootstrap = new ServerBootstrap();
    // The same group serves as both acceptor and worker group.
    bootstrap.group(workerGroup, workerGroup);
    bootstrap.channel(EventLoopUtil.getServerSocketChannelClass(workerGroup));
    bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
        @Override
        public void initChannel(SocketChannel ch) throws Exception {
            ch.pipeline()
                    .addLast("frameDecoder", new LengthFieldBasedFrameDecoder(maxMessageSize, 0, 4, 0, 4))
                    .addLast("handler", new MockServerCnx());
        }
    });

    // Bind and start to accept incoming connections.
    listenChannel = bootstrap.bind(0).sync().channel();
}
/**
 * Initializes a client connection: optional TLS handler, the {@code ByteBufPair}
 * encoder (copying variant when TLS is on), the frame decoder and the connection
 * handler obtained from {@code clientCnxSupplier}.
 *
 * @param ch the channel being initialized
 * @throws Exception as permitted by the overridden initializer method
 */
@Override
public void initChannel(SocketChannel ch) throws Exception {
    if (!tlsEnabled) {
        ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.ENCODER);
    } else {
        SslHandler tlsHandler;
        if (tlsEnabledWithKeyStore) {
            // Key-store based TLS: create the engine from the auto-refreshing SSL context.
            tlsHandler = new SslHandler(nettySSLContextAutoRefreshBuilder.get().createSSLEngine());
        } else if (StringUtils.isNotBlank(sniHostName)) {
            // Pass the SNI host/port to the handler when an SNI host name is configured.
            tlsHandler = sslContextSupplier.get().newHandler(ch.alloc(), sniHostName, sniHostPort);
        } else {
            tlsHandler = sslContextSupplier.get().newHandler(ch.alloc());
        }
        ch.pipeline().addLast(TLS_HANDLER, tlsHandler);
        ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.COPYING_ENCODER);
    }

    // 4-byte length prefix, stripped before the frame reaches the handler.
    int maxFrameLength = Commands.DEFAULT_MAX_MESSAGE_SIZE + Commands.MESSAGE_SIZE_FRAME_PADDING;
    ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(maxFrameLength, 0, 4, 0, 4));
    ch.pipeline().addLast("handler", clientCnxSupplier.get());
}
/**
 * {@link Provides} the ordered list of {@link ChannelHandler} providers used for the EPP
 * protocol.
 *
 * <p>The returned order is: SSL client initializer, length-field frame decoder,
 * length-field prepender, EPP message handler, EPP action handler.
 *
 * @param sslClientInitializerProvider provider of the SSL client initializer
 * @param lengthFieldBasedFrameDecoderProvider provider of the inbound frame decoder
 * @param lengthFieldPrependerProvider provider of the outbound length prepender
 * @param eppMessageHandlerProvider provider of the EPP message handler
 * @param eppActionHandlerProvider provider of the EPP action handler
 * @return an immutable list of the given providers, in the order listed above
 */
@Provides
@EppProtocol
static ImmutableList<Provider<? extends ChannelHandler>> provideEppHandlerProviders(
        @EppProtocol Provider<SslClientInitializer<NioSocketChannel>> sslClientInitializerProvider,
        Provider<LengthFieldBasedFrameDecoder> lengthFieldBasedFrameDecoderProvider,
        Provider<LengthFieldPrepender> lengthFieldPrependerProvider,
        Provider<EppMessageHandler> eppMessageHandlerProvider,
        Provider<EppActionHandler> eppActionHandlerProvider) {
    ImmutableList<Provider<? extends ChannelHandler>> eppHandlerProviders =
            ImmutableList.of(
                    sslClientInitializerProvider,
                    lengthFieldBasedFrameDecoderProvider,
                    lengthFieldPrependerProvider,
                    eppMessageHandlerProvider,
                    eppActionHandlerProvider);
    return eppHandlerProviders;
}
/**
 * Creates the initializer for accepted channels: 2-byte length framing, the shared
 * prepender and GPB codec handlers, and the server handler.
 *
 * @return a channel initializer that installs the pipeline described above
 */
@Override
protected ChannelHandler newChannelInitializer() {
    return new ChannelInitializer<NioSocketChannel>() {
        @Override
        protected void initChannel(NioSocketChannel ch) throws Exception {
            ch.pipeline()
                    // 2-byte length prefix, stripped from the decoded frame.
                    .addLast("frame", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 2, 0, 2))
                    .addLast("prepender", FRAME_PREPENDER)
                    .addLast("decoder", GPB_DECODER_HANDLER)
                    .addLast("encoder", GPB_ENCODER_HANDLER)
                    // The same serverHandler instance is installed on every channel.
                    .addLast("handler", serverHandler);
        }
    };
}
/**
 * Verifies that a fail-slow decoder ({@code failFast = false}) reports an over-long
 * frame only once the whole frame has arrived, and afterwards decodes the next frame
 * normally. The scenario is run twice to show that the decoder keeps recovering.
 */
@Test
public void testFailSlowTooLongFrameRecovery() throws Exception {
    EmbeddedChannel ch = new EmbeddedChannel(
            new LengthFieldBasedFrameDecoder(5, 0, 4, 0, 4, false));

    for (int i = 0; i < 2; i++) {
        // Header announcing a 2-byte body: 4 + 2 bytes exceeds the 5-byte limit, but in
        // fail-slow mode nothing is raised until the body has been received.
        assertFalse(ch.writeInbound(Unpooled.wrappedBuffer(new byte[] { 0, 0, 0, 2 })));
        try {
            assertTrue(ch.writeInbound(Unpooled.wrappedBuffer(new byte[] { 0, 0 })));
            // Name the exception this test actually requires.
            fail(TooLongFrameException.class.getSimpleName() + " must be raised.");
        } catch (TooLongFrameException expected) {
            // Expected
        }

        // A valid 1-byte frame must decode normally after the failure.
        ch.writeInbound(Unpooled.wrappedBuffer(new byte[] { 0, 0, 0, 1, 'A' }));
        // The buffer is released explicitly below, so it is no longer also handed to
        // releaseLater(...), which would have attempted a second release.
        ByteBuf buf = (ByteBuf) ch.readInbound();
        assertEquals("A", buf.toString(CharsetUtil.ISO_8859_1));
        buf.release();
    }
}
/**
 * Sets up a server-side connection: read timeout, 4-byte length framing and the server
 * socket codec, all parameterized by the port's session configuration.
 *
 * @param ch the channel being initialized
 * @throws Exception as permitted by the overridden initializer method
 */
@Override
protected void initChannel(SocketChannel ch) throws Exception {
    ch.pipeline()
            // Read-timeout control. Note: Netty's EventLoop does support scheduled tasks,
            // but they are unfriendly to the EventLoop, so keep such tasks to a minimum.
            .addLast(NetUtils.READ_TIMEOUT_HANDLER_NAME,
                    new ReadTimeoutHandler(portExtraInfo.getSessionConfig().readTimeout()))
            .addLast(new LengthFieldBasedFrameDecoder(
                    portExtraInfo.getSessionConfig().maxFrameLength(), 0, 4, 0, 4))
            .addLast(new ServerSocketCodec(portExtraInfo.getSessionConfig().serializer(), portExtraInfo));
}
/**
 * Sets up a client-side connection: read timeout, 4-byte length framing and the client
 * socket codec, all parameterized by {@code config}.
 *
 * @param ch the channel being initialized
 * @throws Exception as permitted by the overridden initializer method
 */
@Override
protected void initChannel(SocketChannel ch) throws Exception {
    ch.pipeline()
            // Read-timeout control. Note: Netty's EventLoop does support scheduled tasks,
            // but they are unfriendly to the EventLoop, so keep such tasks to a minimum.
            .addLast(NetUtils.READ_TIMEOUT_HANDLER_NAME, new ReadTimeoutHandler(config.readTimeout()))
            .addLast(new LengthFieldBasedFrameDecoder(config.maxFrameLength(), 0, 4, 0, 4))
            .addLast(new ClientSocketCodec(config.serializer(), sessionId, netEventLoop));
}
/**
 * Initializes a connection: optional TLS handler, 4-byte length framing in both
 * directions, and the {@code KafkaRequestHandler}.
 *
 * @param ch the channel being initialized
 * @throws Exception as permitted by the overridden initializer method
 */
@Override
protected void initChannel(SocketChannel ch) throws Exception {
    if (this.enableTls) {
        SslHandler tlsHandler = new SslHandler(SSLUtils.createSslEngine(sslContextFactory));
        ch.pipeline().addLast(TLS_HANDLER, tlsHandler);
    }

    ch.pipeline()
            // Outbound responses are prefixed with their 4-byte length.
            .addLast(new LengthFieldPrepender(4))
            // Inbound requests are split on the 4-byte length prefix, which is stripped.
            .addLast("frameDecoder", new LengthFieldBasedFrameDecoder(MAX_FRAME_LENGTH, 0, 4, 0, 4))
            .addLast("handler",
                    new KafkaRequestHandler(pulsarService, kafkaConfig, groupCoordinator, enableTls));
}
/**
 * Builds the RPC client pipeline: request encoder, length-field frame decoder,
 * response decoder and the client handler.
 *
 * @param socketChannel the channel being initialized
 * @throws Exception as permitted by the overridden initializer method
 */
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
    socketChannel.pipeline()
            .addLast(new RpcEncoder(RpcRequest.class))
            // 4-byte length field at offset 0; nothing is stripped, so the length bytes
            // are still part of the frame handed to the decoder.
            .addLast(new LengthFieldBasedFrameDecoder(65536, 0, 4, 0, 0))
            .addLast(new RpcDecoder(RpcResponse.class))
            .addLast(new RpcClientHandler());
}
/**
 * Builds the RPC server pipeline: response encoder, length-field frame decoder,
 * request decoder and the server handler backed by {@code handleMap}.
 *
 * @param socketChannel the channel being initialized
 * @throws Exception as permitted by the overridden initializer method
 */
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
    ChannelPipeline pipeline = socketChannel.pipeline();
    pipeline.addLast(new RpcEncoder(RpcResponse.class));
    // 4-byte length field at offset 0; nothing is stripped, so the length bytes are
    // still part of the frame handed to the decoder.
    pipeline.addLast(new LengthFieldBasedFrameDecoder(65536, 0, 4, 0, 0));
    pipeline.addLast(new RpcDecoder(RpcRequest.class));
    pipeline.addLast(new RpcServerHandler(this.handleMap));
}
/**
 * Creates the initializer for accepted channels: 2-byte length framing, string codec
 * and an echo handler that counts every received message in {@code COUNT}.
 *
 * @return a channel initializer that installs the pipeline described above
 */
@Override
protected ChannelHandler newChannelInitializer() {
    return new ChannelInitializer<NioSocketChannel>() {
        @Override
        protected void initChannel(NioSocketChannel ch) throws Exception {
            // Echoes each message back; when the channel is not writable the echo is
            // postponed by one second instead of being written immediately.
            SimpleChannelInboundHandler<String> echoHandler = new SimpleChannelInboundHandler<String>() {
                @Override
                protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
                    COUNT.getAndIncrement();
                    ChannelPromise voidPromise = ctx.voidPromise();
                    if (!ctx.channel().isWritable()) {
                        ctx.channel().eventLoop().schedule(() -> {
                            ctx.writeAndFlush(msg, voidPromise);
                        }, 1L, TimeUnit.SECONDS);
                        return;
                    }
                    ctx.writeAndFlush(msg, voidPromise);
                }
            };

            ch.pipeline()
                    .addLast("frame", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 2, 0, 2))
                    .addLast("prepender", FRAME_PREPENDER)
                    // Any other useful handler
                    .addLast("strDecoder", STRING_DECODER)
                    .addLast("strEncoder", STRING_ENCODER)
                    .addLast("handler", echoHandler);
        }
    };
}
/**
 * Builds the server pipeline for the protocol named in the server configuration:
 * line-based strings, WebSocket, length-prefixed strings or HTTP.
 *
 * @param sc the channel being initialized
 * @throws NotFoundProtocolException if the configured protocol is none of the above
 */
@Override
protected void initChannel(SocketChannel sc) {
    ChannelPipeline pipeline = sc.pipeline();
    ServerConfig config = serverContext.getConfig();

    if (ServerConst.SERVER_PROTOCOL_STRING_LINE.equals(config.getProtocol())) {
        pipeline.addLast(new StringPasswordLineDecoder(config.getMsgLength(), config.getCharset(), config.getPassword()))
                .addLast(new StringPasswordLineEncoder(config.getCharset(), config.getPassword()))
                .addLast(new ServerStringHandler(serverContext));
    } else if (ServerConst.SERVER_PROTOCOL_WEB_SOCKET.equals(config.getProtocol())) {
        // HTTP upgrade plumbing and WebSocket frame conversion first, then the same
        // length-prefixed string handling as the plain length-field protocol.
        pipeline.addLast(new HttpServerCodec())
                .addLast(new HttpObjectAggregator(config.getMsgLength()))
                .addLast(new ChunkedWriteHandler())
                .addLast(new WebSocketUriFilter(serverContext))
                .addLast(new WebSocketServerProtocolHandler("/" + config.getServerName()))
                .addLast(new WebSocketDecoder())
                .addLast(new WebSocketEncoder());
        addLengthFieldStringHandlers(pipeline, config);
    } else if (ServerConst.SERVER_PROTOCOL_LENGTH_FIELD.equals(config.getProtocol())) {
        addLengthFieldStringHandlers(pipeline, config);
    } else if (ServerConst.SERVER_PROTOCOL_HTTP.equals(config.getProtocol())) {
        pipeline.addLast(new HttpServerCodec())
                .addLast(new HttpObjectAggregator(config.getMsgLength()))
                .addLast(new ChunkedWriteHandler())
                .addLast(new WebRequestHandler(serverContext));
    } else {
        throw new NotFoundProtocolException(config.getProtocol());
    }
}

/**
 * Appends the handlers shared by the WebSocket and length-field protocols: 4-byte
 * length framing, the password string codec and the server string handler.
 */
private void addLengthFieldStringHandlers(ChannelPipeline pipeline, ServerConfig config) {
    pipeline.addLast(new LengthFieldBasedFrameDecoder(config.getMsgLength(), 0, 4, 0, 4))
            .addLast(new LengthFieldPrepender(4))
            .addLast(new StringPasswordDecoder(config.getCharset(), config.getPassword()))
            .addLast(new StringPasswordEncoder(config.getCharset(), config.getPassword()))
            .addLast(new ServerStringHandler(serverContext));
}
/**
 * Adds 4-byte length framing, the outbound message handler and the server handler
 * matching the configured protocol type (text or binary protocol) to the pipeline.
 *
 * @param pipeline the pipeline to populate
 */
@Override
public void buildChannelPipeline(ChannelPipeline pipeline) {
    boolean textProtocol =
            socketConfiguration.getProtocolType() == SocketConfiguration.ProtocolType.TEXT;

    pipeline.addLast("lengthEncode", new LengthFieldPrepender(4, false))
            // Frames are limited to 2000 bytes; the 4-byte length prefix is stripped.
            .addLast("lengthDecoder", new LengthFieldBasedFrameDecoder(2000, 0, 4, 0, 4))
            .addLast(MsgOutboundHandler.INSTANCE)
            .addLast(textProtocol
                    ? DefaultTextTcpSocketServerHandler.INSTANCE
                    : ProtocolTcpSocketServerHandler.INSTANCE);
}