下面列出了怎么用 io.netty.handler.codec.DelimiterBasedFrameDecoder 的API类实例代码及写法,或者点击链接到github查看源代码。
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// Add SSL handler first to encrypt and decrypt everything.
// In this example, we use a bogus certificate in the server side
// and accept any invalid certificates in the client side.
// You will need something more complicated to identify both
// and server in the real world.
pipeline.addLast(sslCtx.newHandler(ch.alloc()));
// On top of the SSL handler, add the text line codec.
pipeline.addLast(new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
pipeline.addLast(new StringDecoder());
pipeline.addLast(new StringEncoder());
// and then business logic.
pipeline.addLast(new SecureChatServerHandler());
}
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// Add SSL handler first to encrypt and decrypt everything.
// In this example, we use a bogus certificate in the server side
// and accept any invalid certificates in the client side.
// You will need something more complicated to identify both
// and server in the real world.
pipeline.addLast(sslCtx.newHandler(ch.alloc(), SecureChatClient.HOST, SecureChatClient.PORT));
// On top of the SSL handler, add the text line codec.
pipeline.addLast(new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
pipeline.addLast(new StringDecoder());
pipeline.addLast(new StringEncoder());
// and then business logic.
pipeline.addLast(new SecureChatClientHandler());
}
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
if (sslCtx != null) {
pipeline.addLast(sslCtx.newHandler(ch.alloc()));
}
// Add the text line codec combination first,
pipeline.addLast(new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
// the encoder and decoder are static as these are sharable
pipeline.addLast(DECODER);
pipeline.addLast(ENCODER);
// and then business logic.
pipeline.addLast(SERVER_HANDLER);
}
@Override
public void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
if (sslCtx != null) {
pipeline.addLast(sslCtx.newHandler(ch.alloc(), TelnetClient.HOST, TelnetClient.PORT));
}
// Add the text line codec combination first,
pipeline.addLast(new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
pipeline.addLast(DECODER);
pipeline.addLast(ENCODER);
// and then business logic.
pipeline.addLast(CLIENT_HANDLER);
}
@Test
public void testFailSlowTooLongFrameRecovery() throws Exception {
EmbeddedChannel ch = new EmbeddedChannel(
new DelimiterBasedFrameDecoder(1, true, false, Delimiters.nulDelimiter()));
for (int i = 0; i < 2; i ++) {
ch.writeInbound(Unpooled.wrappedBuffer(new byte[] { 1, 2 }));
try {
assertTrue(ch.writeInbound(Unpooled.wrappedBuffer(new byte[] { 0 })));
fail(DecoderException.class.getSimpleName() + " must be raised.");
} catch (TooLongFrameException e) {
// Expected
}
ch.writeInbound(Unpooled.wrappedBuffer(new byte[] { 'A', 0 }));
ByteBuf buf = ch.readInbound();
assertEquals("A", buf.toString(CharsetUtil.ISO_8859_1));
buf.release();
}
}
@Test
public void testFailFastTooLongFrameRecovery() throws Exception {
EmbeddedChannel ch = new EmbeddedChannel(
new DelimiterBasedFrameDecoder(1, Delimiters.nulDelimiter()));
for (int i = 0; i < 2; i ++) {
try {
assertTrue(ch.writeInbound(Unpooled.wrappedBuffer(new byte[] { 1, 2 })));
fail(DecoderException.class.getSimpleName() + " must be raised.");
} catch (TooLongFrameException e) {
// Expected
}
ch.writeInbound(Unpooled.wrappedBuffer(new byte[] { 0, 'A', 0 }));
ByteBuf buf = ch.readInbound();
assertEquals("A", buf.toString(CharsetUtil.ISO_8859_1));
buf.release();
}
}
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// 添加SSL用于加密每一个步骤.
// 在本次演示中我们在服务端使用了一张虚拟的证书,可以接收任何有效的客户端证书.
// 但在真实场景中你需要一个更复杂的客户端和服务端标记.
pipeline.addLast(sslCtx.newHandler(ch.alloc()));
// SSL之上添加编解码处理.
pipeline.addLast(new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
pipeline.addLast(new StringDecoder());
pipeline.addLast(new StringEncoder());
// 处理业务逻辑.
pipeline.addLast(new SecureChatServerHandler());
}
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// 添加SSL用于加密每一个步骤.
// 在本次演示中我们在服务端使用了一张虚拟的证书,可以接收任何有效的客户端证书.
// 但在真实场景中你需要一个更复杂的客户端和服务端标记.
pipeline.addLast(sslCtx.newHandler(ch.alloc(), SecureChatClient.HOST, SecureChatClient.PORT));
// SSL之上添加编解码处理.
pipeline.addLast(new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
pipeline.addLast(new StringDecoder());
pipeline.addLast(new StringEncoder());
// 处理业务逻辑.
pipeline.addLast(new SecureChatClientHandler());
}
@Override
public void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
if (sslCtx != null) {
pipeline.addLast(sslCtx.newHandler(ch.alloc(), HOST, PORT));
}
pipeline.addLast(new DelimiterBasedFrameDecoder(Integer.MAX_VALUE, new ByteBuf[]{
Unpooled.wrappedBuffer(Constant.RPC_DELIMITER.getBytes())
}));
pipeline.addLast(DECODER);
pipeline.addLast(ENCODER);
// and then unit logic.
pipeline.addLast(new RpcClientHandler(nodeId)/*CLIENT_HANDLER*/);
pipeline.addLast(new RpcClientDecoder());
pipeline.addLast(new StreamRpcClientHandler());
pipeline.addLast(new RpcClientUnitHandler());
}
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
if (sslCtx != null) {
pipeline.addLast(sslCtx.newHandler(ch.alloc()));
}
// Add the text line codec combination first,
pipeline.addLast(new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
// the encoder and decoder are static as these are sharable
pipeline.addLast(DECODER);
pipeline.addLast(ENCODER);
// and then business logic.
pipeline.addLast(SERVER_HANDLER);
}
@Override
public void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
if (sslCtx != null) {
pipeline.addLast(sslCtx.newHandler(ch.alloc(), TelnetClient.HOST, TelnetClient.PORT));
}
// Add the text line codec combination first,
pipeline.addLast(new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
pipeline.addLast(DECODER);
pipeline.addLast(ENCODER);
// and then business logic.
pipeline.addLast(CLIENT_HANDLER);
}
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// Add SSL handler first to encrypt and decrypt everything.
// In this example, we use a bogus certificate in the server side
// and accept any invalid certificates in the client side.
// You will need something more complicated to identify both
// and server in the real world.
pipeline.addLast(createSslHandler(initSSLContext()));
// On top of the SSL handler, add the text line codec.
pipeline.addLast(new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
pipeline.addLast(new StringDecoder());
pipeline.addLast(new StringEncoder());
// and then business logic.
pipeline.addLast(new SecureChatServerHandler());
}
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// Add SSL handler first to encrypt and decrypt everything.
// In this example, we use a bogus certificate in the server side
// and accept any invalid certificates in the client side.
// You will need something more complicated to identify both
// and server in the real world.
pipeline.addLast(createSslHandler(getClientSSLContext()));
// On top of the SSL handler, add the text line codec.
pipeline.addLast(new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
pipeline.addLast(new StringDecoder());
pipeline.addLast(new StringEncoder());
// and then business logic.
pipeline.addLast(new SecureChatClientHandler());
}
@Override
public ChannelFuture bind(SocketAddress localAddress) {
ServerBootstrap boot = bootstrap();
initChannelFactory();
boot.childHandler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(
new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()),
new StringDecoder(StandardCharsets.UTF_8),
encoder,
handler);
}
});
setOptions();
return boot.bind(localAddress);
}
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// Add SSL handler first to encrypt and decrypt everything.
// In this example, we use a bogus certificate in the server side
// and accept any invalid certificates in the client side.
// You will need something more complicated to identify both
// and server in the real world.
pipeline.addLast(sslCtx.newHandler(ch.alloc()));
// On top of the SSL handler, add the text line codec.
pipeline.addLast(new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
pipeline.addLast(new StringDecoder());
pipeline.addLast(new StringEncoder());
// and then business logic.
pipeline.addLast(new SecureChatServerHandler());
}
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// Add SSL handler first to encrypt and decrypt everything.
// In this example, we use a bogus certificate in the server side
// and accept any invalid certificates in the client side.
// You will need something more complicated to identify both
// and server in the real world.
pipeline.addLast(sslCtx.newHandler(ch.alloc(), SecureChatClient.HOST, SecureChatClient.PORT));
// On top of the SSL handler, add the text line codec.
pipeline.addLast(new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
pipeline.addLast(new StringDecoder());
pipeline.addLast(new StringEncoder());
// and then business logic.
pipeline.addLast(new SecureChatClientHandler());
}
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
if (sslCtx != null) {
pipeline.addLast(sslCtx.newHandler(ch.alloc()));
}
// Add the text line codec combination first,
pipeline.addLast(new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
// the encoder and decoder are static as these are sharable
pipeline.addLast(DECODER);
pipeline.addLast(ENCODER);
// and then business logic.
pipeline.addLast(SERVER_HANDLER);
}
@Override
public void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
if (sslCtx != null) {
pipeline.addLast(sslCtx.newHandler(ch.alloc(), TelnetClient.HOST, TelnetClient.PORT));
}
// Add the text line codec combination first,
pipeline.addLast(new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
pipeline.addLast(DECODER);
pipeline.addLast(ENCODER);
// and then business logic.
pipeline.addLast(CLIENT_HANDLER);
}
@Test
public void testFailSlowTooLongFrameRecovery() throws Exception {
EmbeddedChannel ch = new EmbeddedChannel(
new DelimiterBasedFrameDecoder(1, true, false, Delimiters.nulDelimiter()));
for (int i = 0; i < 2; i ++) {
ch.writeInbound(Unpooled.wrappedBuffer(new byte[] { 1, 2 }));
try {
assertTrue(ch.writeInbound(Unpooled.wrappedBuffer(new byte[] { 0 })));
fail(DecoderException.class.getSimpleName() + " must be raised.");
} catch (TooLongFrameException e) {
// Expected
}
ch.writeInbound(Unpooled.wrappedBuffer(new byte[] { 'A', 0 }));
ByteBuf buf = releaseLater((ByteBuf) ch.readInbound());
assertEquals("A", buf.toString(CharsetUtil.ISO_8859_1));
}
}
@Test
public void testFailFastTooLongFrameRecovery() throws Exception {
EmbeddedChannel ch = new EmbeddedChannel(
new DelimiterBasedFrameDecoder(1, Delimiters.nulDelimiter()));
for (int i = 0; i < 2; i ++) {
try {
assertTrue(ch.writeInbound(Unpooled.wrappedBuffer(new byte[] { 1, 2 })));
fail(DecoderException.class.getSimpleName() + " must be raised.");
} catch (TooLongFrameException e) {
// Expected
}
ch.writeInbound(Unpooled.wrappedBuffer(new byte[] { 0, 'A', 0 }));
ByteBuf buf = releaseLater((ByteBuf) ch.readInbound());
assertEquals("A", buf.toString(CharsetUtil.ISO_8859_1));
}
}
public void run() throws InterruptedException {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new DelimiterBasedFrameDecoder(MAX_FRAME_LENGTH, false, Delimiters.lineDelimiter()));
p.addLast(new CarbonServerHandler(bus, configuration.getCarbon().getBaseRollup()));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
logger.error(cause);
super.exceptionCaught(ctx, cause);
}
});
// Start the server.
b.bind(configuration.getCarbon().getBind(), configuration.getCarbon().getPort()).sync();
}
private void bind() throws Exception {
this.bossGroup = new NioEventLoopGroup();
this.workerGroup = new NioEventLoopGroup();
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup);
serverBootstrap.channel(NioServerSocketChannel.class);
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast("idleStateHandler", new IdleStateHandler(30, 0, 0, TimeUnit.MINUTES));
// 1024表示单条消息的最大长度,解码器在查找分隔符的时候,达到该长度还没找到的话会抛异常
ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, Unpooled.wrappedBuffer(new byte[]{delimiter}), Unpooled.wrappedBuffer(new byte[]{delimiter, delimiter})));
ch.pipeline().addLast(inboundHandler);
}
});
serverBootstrap.option(ChannelOption.SO_BACKLOG, 128);
serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
this.log.info("TCP服务启动完毕,port={}", this.port);
ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
channelFuture.channel().closeFuture().sync();
}
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(
new IdleStateHandler(readTimeOut, 0, 0, TimeUnit.MINUTES));
// jt808协议 包头最大长度16+ 包体最大长度1023+分隔符2+转义字符最大姑且算60 = 1100
pipeline.addLast(
new DelimiterBasedFrameDecoder(1100, Unpooled.copiedBuffer(new byte[]{JT808Const.PKG_DELIMITER}),
Unpooled.copiedBuffer(new byte[]{JT808Const.PKG_DELIMITER, JT808Const.PKG_DELIMITER})));
pipeline.addLast(new JT808Decoder());
pipeline.addLast(new JT808Encoder());
pipeline.addLast(heartBeatMsgHandler);
pipeline.addLast(businessGroup, locationMsgHandler);//因为locationMsgHandler中涉及到数据库操作,所以放入businessGroup
pipeline.addLast(authMsgHandler);
pipeline.addLast(registerMsgHandler);
pipeline.addLast(logOutMsgHandler);
}
@Override
public void run() {
final Bootstrap boot = new Bootstrap();
final ThreadFactory clientFactory = new DefaultThreadFactory("client");
final NioEventLoopGroup connectGroup = new NioEventLoopGroup(1,
clientFactory, NioUdtProvider.BYTE_PROVIDER);
try {
boot.group(connectGroup)
.channelFactory(NioUdtProvider.BYTE_CONNECTOR)
.handler(new ChannelInitializer<UdtChannel>() {
@Override
protected void initChannel(final UdtChannel ch)
throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("framer",
new DelimiterBasedFrameDecoder(8192,
Delimiters.lineDelimiter()));
pipeline.addLast("decoder", new StringDecoder(
CharsetUtil.UTF_8));
pipeline.addLast("encoder", new StringEncoder(
CharsetUtil.UTF_8));
pipeline.addLast("handler", new ClientHandler());
}
});
channel = boot.connect(address).sync().channel();
isRunning = true;
log.info("Client ready.");
waitForRunning(false);
log.info("Client closing...");
channel.close().sync();
isShutdown = true;
log.info("Client is done.");
} catch (final Throwable e) {
log.error("Client failed.", e);
} finally {
connectGroup.shutdownGracefully().syncUninterruptibly();
}
}
public static void start() {
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap b = new Bootstrap();
clientHandler=new ClientHandler();
b.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE,true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes());
ChannelPipeline p = ch.pipeline();
p.addLast(new DelimiterBasedFrameDecoder(1024, delimiter));
p.addLast("decoder", new StringDecoder(StandardCharsets.UTF_8));
p.addLast("encoder", new StringEncoder(Charset.forName("GBK")));
p.addLast(new IdleStateHandler(1,0,0,TimeUnit.SECONDS));
p.addLast(clientHandler);
}
});
//���ӷ����
ChannelFuture connect = b.connect(NETTY_SERVER_IP, NETTY_SERVER_PORT);
//��������
connect.addListener((ChannelFutureListener) channelFuture -> {
if (!channelFuture.isSuccess()) {
final EventLoop loop = channelFuture.channel().eventLoop();
loop.schedule(() -> {
try {
log.error("��������Ӳ��ϣ���ʼ��������...");
start();
} catch (Exception ignored) {
}
}, 1L, TimeUnit.SECONDS);
} else {
channel = channelFuture.channel();
log.info("��������ӳɹ�...");
}
});
}
@PostConstruct
public void init() throws Exception {
log.info("Starting Netty Server");
bossGroup = new NioEventLoopGroup(bossGroupThreadCount);
workerGroup = new NioEventLoopGroup(workerGroupThreadCount);
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast(new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
pipeline.addLast("decoder", new StringDecoder());
pipeline.addLast("encoder", new StringEncoder());
NettyServerHandler nettyServerHandler = new NettyServerHandler(deviceService);
pipeline.addLast(nettyServerHandler);
}
});
serverChannel = b.bind(host, port).sync().channel();
log.info("Netty Server started!");
}
@Override
public void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
pipeline.addLast(DECODER);
pipeline.addLast(ENCODER);
pipeline.addLast(CLIENT_HANDLER);
}
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// Add the text line codec combination first,
pipeline.addLast(new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
// the encoder and decoder are static as these are sharable
pipeline.addLast(DECODER);
pipeline.addLast(ENCODER);
// and then business logic.
pipeline.addLast(SERVER_HANDLER);
}
@Override
protected void initChannel(SocketChannel channel) throws Exception {
ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast(new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
pipeline.addLast(ENCODER);
pipeline.addLast(DECODER);
pipeline.addLast(new NettyTelnetHandler());
}
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
pipeline.addLast("decoder", new StringDecoder());
pipeline.addLast("encoder", new StringEncoder());
pipeline.addLast("handler", new SimpleChatServerHandler());
System.out.println("SimpleChatServerHandler: " + ch.remoteAddress() + "已连接;");
}