下面列出了如何使用 io.netty.handler.codec.LineBasedFrameDecoder 的 API 类示例代码及写法，也可以点击链接到 GitHub 查看源代码。
/**
 * Connects to the device address built from {@code PORT} over an RXTX channel and
 * blocks until that channel is closed.
 *
 * <p>Each channel gets a line-based frame decoder (frames up to 32768 bytes), string
 * encoder/decoder, and an {@code RxtxClientHandler}. The event loop group is shut down
 * when the method exits, whether normally or with an exception.
 *
 * @param args command-line arguments (not read)
 * @throws Exception if connecting or waiting for the channel to close fails
 */
public static void main(String[] args) throws Exception {
    EventLoopGroup group = new OioEventLoopGroup();
    try {
        Bootstrap bootstrap = new Bootstrap()
                .group(group)
                .channel(RxtxChannel.class)
                .handler(new ChannelInitializer<RxtxChannel>() {
                    @Override
                    public void initChannel(RxtxChannel ch) throws Exception {
                        ch.pipeline().addLast(
                                new LineBasedFrameDecoder(32768),
                                new StringEncoder(),
                                new StringDecoder(),
                                new RxtxClientHandler());
                    }
                });

        // Wait for the connection to be established, then for the channel to close.
        bootstrap.connect(new RxtxDeviceAddress(PORT))
                .sync()
                .channel()
                .closeFuture()
                .sync();
    } finally {
        group.shutdownGracefully();
    }
}
/**
 * With only the reactive bridge registered in the pipeline, adding "decoder" last and
 * "decoder$extract" first must yield: extractor, decoder, bridge, tail.
 */
@Test
public void addByteDecoderWhenNoLeft() {
    ChannelHandler bridge = new ChannelHandlerAdapter() {
    };
    channel.pipeline().addLast(NettyPipeline.ReactiveBridge, bridge);

    ChannelHandler lineDecoder = new LineBasedFrameDecoder(12);
    testContext.addHandlerLast("decoder", lineDecoder)
               .addHandlerFirst("decoder$extract", NettyPipeline.inboundHandler(ADD_EXTRACTOR));

    assertEquals(channel.pipeline().names(),
            Arrays.asList(
                    "decoder$extract",
                    "decoder",
                    NettyPipeline.ReactiveBridge,
                    "DefaultChannelPipeline$TailContext#0"));
}
/**
 * With only a handler named {@code NettyPipeline.HttpCodec} registered, adding "decoder"
 * last and "decoder$extract" first must yield: codec, extractor, decoder, tail.
 */
@Test
public void addByteDecoderWhenNoRight() {
    ChannelHandler codecPlaceholder = new ChannelHandlerAdapter() {
    };
    channel.pipeline().addLast(NettyPipeline.HttpCodec, codecPlaceholder);

    ChannelHandler lineDecoder = new LineBasedFrameDecoder(12);
    testContext.addHandlerLast("decoder", lineDecoder)
               .addHandlerFirst("decoder$extract", NettyPipeline.inboundHandler(ADD_EXTRACTOR));

    assertEquals(channel.pipeline().names(),
            Arrays.asList(
                    NettyPipeline.HttpCodec,
                    "decoder$extract",
                    "decoder",
                    "DefaultChannelPipeline$TailContext#0"));
}
/**
 * With codec, traffic handler and reactive bridge all registered, adding "decoder" last
 * and "decoder$extract" first must place both between the traffic handler and the bridge.
 */
@Test
public void addByteDecoderWhenFullReactorPipeline() {
    ChannelHandler bridge = new ChannelHandlerAdapter() {
    };
    channel.pipeline().addLast(NettyPipeline.HttpCodec, new HttpServerCodec());
    channel.pipeline().addLast(NettyPipeline.HttpTrafficHandler, new ChannelDuplexHandler());
    channel.pipeline().addLast(NettyPipeline.ReactiveBridge, bridge);

    ChannelHandler lineDecoder = new LineBasedFrameDecoder(12);
    testContext.addHandlerLast("decoder", lineDecoder)
               .addHandlerFirst("decoder$extract", NettyPipeline.inboundHandler(ADD_EXTRACTOR));

    assertEquals(channel.pipeline().names(),
            Arrays.asList(
                    NettyPipeline.HttpCodec,
                    NettyPipeline.HttpTrafficHandler,
                    "decoder$extract",
                    "decoder",
                    NettyPipeline.ReactiveBridge,
                    "DefaultChannelPipeline$TailContext#0"));
}
/**
 * With only the reactive bridge registered, a handler added first under the name
 * "encoder" must end up ahead of the bridge.
 *
 * <p>The handler instance used here is a {@code LineBasedFrameDecoder}; only its
 * position in the pipeline is checked.
 */
@Test
public void addByteEncoderWhenNoLeft() {
    ChannelHandler bridge = new ChannelHandlerAdapter() {
    };
    channel.pipeline().addLast(NettyPipeline.ReactiveBridge, bridge);

    ChannelHandler handler = new LineBasedFrameDecoder(12);
    testContext.addHandlerFirst("encoder", handler);

    assertEquals(channel.pipeline().names(),
            Arrays.asList(
                    "encoder",
                    NettyPipeline.ReactiveBridge,
                    "DefaultChannelPipeline$TailContext#0"));
}
/**
 * With only a handler named {@code NettyPipeline.HttpCodec} registered, a handler added
 * first under the name "encoder" must end up right after the codec.
 *
 * <p>The handler instance used here is a {@code LineBasedFrameDecoder}; only its
 * position in the pipeline is checked.
 */
@Test
public void addByteEncoderWhenNoRight() {
    ChannelHandler codecPlaceholder = new ChannelHandlerAdapter() {
    };
    channel.pipeline().addLast(NettyPipeline.HttpCodec, codecPlaceholder);

    ChannelHandler handler = new LineBasedFrameDecoder(12);
    testContext.addHandlerFirst("encoder", handler);

    assertEquals(channel.pipeline().names(),
            Arrays.asList(
                    NettyPipeline.HttpCodec,
                    "encoder",
                    "DefaultChannelPipeline$TailContext#0"));
}
/**
 * With codec, traffic handler and reactive bridge all registered, a handler added first
 * under the name "encoder" must land between the traffic handler and the bridge.
 */
@Test
public void addByteEncoderWhenFullReactorPipeline() {
    ChannelHandler bridge = new ChannelHandlerAdapter() {
    };
    channel.pipeline().addLast(NettyPipeline.HttpCodec, new HttpServerCodec());
    channel.pipeline().addLast(NettyPipeline.HttpTrafficHandler, new ChannelDuplexHandler());
    channel.pipeline().addLast(NettyPipeline.ReactiveBridge, bridge);

    ChannelHandler handler = new LineBasedFrameDecoder(12);
    testContext.addHandlerFirst("encoder", handler);

    assertEquals(channel.pipeline().names(),
            Arrays.asList(
                    NettyPipeline.HttpCodec,
                    NettyPipeline.HttpTrafficHandler,
                    "encoder",
                    NettyPipeline.ReactiveBridge,
                    "DefaultChannelPipeline$TailContext#0"));
}
/**
 * Adding two handlers with {@code addHandlerFirst} on a full pipeline must place the
 * one added later ("encoder2") ahead of the one added earlier ("encoder1"), both
 * between the traffic handler and the reactive bridge.
 */
@Test
public void addSeveralByteEncodersWhenCodec() {
    ChannelHandler first = new LineBasedFrameDecoder(12);
    ChannelHandler second = new LineBasedFrameDecoder(13);
    ChannelHandler bridge = new ChannelHandlerAdapter() {
    };

    channel.pipeline().addLast(NettyPipeline.HttpCodec, new HttpServerCodec());
    channel.pipeline().addLast(NettyPipeline.HttpTrafficHandler, new ChannelDuplexHandler());
    channel.pipeline().addLast(NettyPipeline.ReactiveBridge, bridge);

    testContext.addHandlerFirst("encoder1", first)
               .addHandlerFirst("encoder2", second);

    assertEquals(channel.pipeline().names(),
            Arrays.asList(
                    NettyPipeline.HttpCodec,
                    NettyPipeline.HttpTrafficHandler,
                    "encoder2",
                    "encoder1",
                    NettyPipeline.ReactiveBridge,
                    "DefaultChannelPipeline$TailContext#0"));
}
/**
 * Runs the shared line-feed scenario against the echo server with a client that
 * installs an 8 KiB {@code LineBasedFrameDecoder} named "codec" on connect.
 *
 * <p>Despite the test name, the provider currently in use is
 * {@code ConnectionProvider.newConnection()}; the fixed-pool variant is disabled below.
 */
@Test
public void tcpClientHandlesLineFeedDataFixedPool() throws InterruptedException {
    // Disabled alternative:
    // ConnectionProvider p = ConnectionProvider.fixed
    //         ("tcpClientHandlesLineFeedDataFixedPool", 1);
    ConnectionProvider provider = ConnectionProvider.newConnection();

    Consumer<? super Connection> installCodec =
            connection -> connection.addHandler("codec", new LineBasedFrameDecoder(8 * 1024));

    tcpClientHandlesLineFeedData(
            TcpClient.create(provider)
                     .host("localhost")
                     .port(echoServerPort)
                     .doOnConnected(installCodec));
}
/**
 * Supplies the ordered list of handler providers bound under {@code @WhoisProtocol}.
 *
 * <p>The returned list keeps the providers in this order: proxy protocol, read timeout,
 * line-based frame decoder, whois service, frontend metrics, whois quota, relay.
 *
 * @return an immutable list of the seven providers, in the order above
 */
@Provides
@WhoisProtocol
static ImmutableList<Provider<? extends ChannelHandler>> provideHandlerProviders(
        Provider<ProxyProtocolHandler> proxyProtocolHandlerProvider,
        @WhoisProtocol Provider<ReadTimeoutHandler> readTimeoutHandlerProvider,
        Provider<LineBasedFrameDecoder> lineBasedFrameDecoderProvider,
        Provider<WhoisServiceHandler> whoisServiceHandlerProvider,
        Provider<FrontendMetricsHandler> frontendMetricsHandlerProvider,
        Provider<WhoisQuotaHandler> whoisQuotaHandlerProvider,
        Provider<FullHttpRequestRelayHandler> relayHandlerProvider) {
    return ImmutableList.<Provider<? extends ChannelHandler>>builder()
            .add(proxyProtocolHandlerProvider)
            .add(readTimeoutHandlerProvider)
            .add(lineBasedFrameDecoderProvider)
            .add(whoisServiceHandlerProvider)
            .add(frontendMetricsHandlerProvider)
            .add(whoisQuotaHandlerProvider)
            .add(relayHandlerProvider)
            .build();
}
/**
 * Opens an RXTX channel to the device address built from {@code PORT} and blocks until
 * the channel closes.
 *
 * <p>The channel pipeline consists of a line-based frame decoder (frames up to 32768
 * bytes), a string encoder, a string decoder and an {@code RxtxClientHandler}. The
 * event loop group is always shut down on exit.
 *
 * @param args command-line arguments (not read)
 * @throws Exception if connecting or waiting for the channel to close fails
 */
public static void main(String[] args) throws Exception {
    EventLoopGroup group = new OioEventLoopGroup();
    try {
        Bootstrap bootstrap = new Bootstrap()
                .group(group)
                .channel(RxtxChannel.class)
                .handler(new ChannelInitializer<RxtxChannel>() {
                    @Override
                    public void initChannel(RxtxChannel ch) throws Exception {
                        ch.pipeline().addLast(
                                new LineBasedFrameDecoder(32768),
                                new StringEncoder(),
                                new StringDecoder(),
                                new RxtxClientHandler());
                    }
                });

        ChannelFuture connected = bootstrap.connect(new RxtxDeviceAddress(PORT)).sync();
        connected.channel().closeFuture().sync();
    } finally {
        group.shutdownGracefully();
    }
}
/**
 * Looks at the first readable byte to choose between an HTTP pipeline and a line-based
 * text (telnet) pipeline, installs the chosen handlers, and removes this handler.
 *
 * <p>The byte is read by index ({@code getByte(readerIndex)}). A
 * {@code LocalHostPermitHandler} is added in both cases before the protocol-specific
 * handlers.
 *
 * @param ctx context whose pipeline is reconfigured
 * @param in  inbound buffer; nothing happens while it has no readable bytes
 * @param out decoded message list (never written to by this method)
 */
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
    if (in.readableBytes() < 1) {
        return;
    }

    // First byte is used to guess the protocol.
    final int magic = in.getByte(in.readerIndex());

    ChannelPipeline p = ctx.pipeline();
    p.addLast(new LocalHostPermitHandler(acceptForeignIp));

    if (!isHttp(magic)) {
        // Line-oriented text protocol.
        p.addLast(new LineBasedFrameDecoder(2048));
        p.addLast(new StringDecoder(CharsetUtil.UTF_8));
        p.addLast(new StringEncoder(CharsetUtil.UTF_8));
        p.addLast(new IdleStateHandler(0, 0, 5 * 60));
        p.addLast(new TelnetProcessHandler());
        p.remove(this);
        return;
    }

    // HTTP gets no welcome output: cancel the pending welcome if that is still possible.
    if (welcomeFuture != null && welcomeFuture.isCancellable()) {
        welcomeFuture.cancel(false);
    }
    p.addLast(new HttpServerCodec());
    p.addLast(new HttpObjectAggregator(1048576));
    p.addLast(new HttpProcessHandler());
    p.remove(this);
}
/**
 * Extends the parent initialisation with the server-side handlers: line framing
 * (max 1024 bytes per line), UTF-8 string encoder/decoder, {@code LogDispatchHandler}
 * and {@code SimpleServerHandler}, appended in that order.
 *
 * @param ch the socket channel being initialised
 */
@Override
protected void initSocketChannel(SocketChannel ch) {
    super.initSocketChannel(ch);

    Charset utf8 = Charset.forName("UTF-8");
    ch.pipeline()
            .addLast(new LineBasedFrameDecoder(1024))
            .addLast(new StringEncoder(utf8))
            .addLast(new StringDecoder(utf8))
            .addLast(new LogDispatchHandler())
            .addLast(new SimpleServerHandler());
}
/**
 * Extends the parent initialisation with the client-side handlers: line framing
 * (max 1024 bytes per line), UTF-8 string encoder/decoder, {@code LogDispatchHandler}
 * and {@code ClientSimpleHandler}, appended in that order.
 *
 * @param ch the socket channel being initialised
 */
@Override
protected void initSocketChannel(SocketChannel ch) {
    super.initSocketChannel(ch);

    Charset utf8 = Charset.forName("UTF-8");
    ch.pipeline()
            .addLast(new LineBasedFrameDecoder(1024))
            .addLast(new StringEncoder(utf8))
            .addLast(new StringDecoder(utf8))
            // Disabled alternative framing, originally placed after the string decoder:
            // .addLast(new DelimiterBasedFrameDecoder(1024, Delimiters.lineDelimiter()))
            .addLast(new LogDispatchHandler())
            .addLast(new ClientSimpleHandler());
    // Disabled alternative business handler:
    // ch.pipeline().addLast(new ClientSimpleHandlerX());
}
/**
 * Success scenario: connects to {@code destination} through {@code clientHandlers},
 * waits up to 10 seconds for the channel to close, and then checks that exactly the
 * lines "0".."3" were received, no exception was recorded, and the event count matches
 * {@code expectedEventCount}.
 *
 * <p>{@code AUTO_READ} is set to a random boolean on each run.
 */
@Override
protected void test() throws Exception {
    final SuccessTestHandler testHandler = new SuccessTestHandler();

    Bootstrap b = new Bootstrap()
            .group(group)
            .channel(NioSocketChannel.class)
            .option(ChannelOption.AUTO_READ, ThreadLocalRandom.current().nextBoolean())
            .resolver(NoopAddressResolverGroup.INSTANCE)
            .handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ChannelPipeline p = ch.pipeline();
                    p.addLast(clientHandlers);
                    p.addLast(new LineBasedFrameDecoder(64));
                    p.addLast(testHandler);
                }
            });

    boolean closedInTime = b.connect(destination)
            .channel()
            .closeFuture()
            .await(10, TimeUnit.SECONDS);

    logger.debug("Received messages: {}", testHandler.received);

    if (!testHandler.exceptions.isEmpty()) {
        for (Throwable recorded : testHandler.exceptions) {
            logger.debug("Recorded exception on the client side: {}", recorded);
        }
    } else {
        logger.debug("No recorded exceptions on the client side.");
    }

    assertProxyHandlers(true);

    assertThat(testHandler.received.toArray(), is(new Object[] { "0", "1", "2", "3" }));
    assertThat(testHandler.exceptions.toArray(), is(EmptyArrays.EMPTY_OBJECTS));
    assertThat(testHandler.eventCount, is(expectedEventCount));
    assertThat(closedInTime, is(true));
}
/**
 * Failure scenario: connects to {@code destination} through {@code clientHandlers} and
 * expects exactly one recorded {@code ProxyConnectException} whose string form contains
 * {@code expectedMessage}.
 *
 * <p>Both the channel close and the handler latch are awaited for up to 10 seconds
 * each; both must complete in time.
 */
@Override
protected void test() throws Exception {
    final FailureTestHandler testHandler = new FailureTestHandler();

    Bootstrap b = new Bootstrap()
            .group(group)
            .channel(NioSocketChannel.class)
            .resolver(NoopAddressResolverGroup.INSTANCE)
            .handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ChannelPipeline p = ch.pipeline();
                    p.addLast(clientHandlers);
                    p.addLast(new LineBasedFrameDecoder(64));
                    p.addLast(testHandler);
                }
            });

    // Both waits always run; the combined result is asserted at the very end.
    boolean channelClosed = b.connect(destination)
            .channel()
            .closeFuture()
            .await(10, TimeUnit.SECONDS);
    boolean latchReleased = testHandler.latch.await(10, TimeUnit.SECONDS);

    logger.debug("Recorded exceptions: {}", testHandler.exceptions);

    assertProxyHandlers(false);

    assertThat(testHandler.exceptions.size(), is(1));
    Throwable recorded = testHandler.exceptions.poll();
    assertThat(recorded, is(instanceOf(ProxyConnectException.class)));
    assertThat(String.valueOf(recorded), containsString(expectedMessage));
    assertThat(channelClosed & latchReleased, is(true));
}
/**
 * Timeout scenario: sets a 2000 ms connect timeout on every {@code ProxyHandler} in
 * {@code clientHandlers}, connects to {@code DESTINATION}, and expects exactly one
 * recorded {@code ProxyConnectException} whose string form contains "timeout".
 *
 * <p>The channel close and the handler latch are each awaited for twice the timeout.
 */
@Override
protected void test() throws Exception {
    final long timeoutMillis = 2000;
    for (ChannelHandler handler : clientHandlers) {
        if (handler instanceof ProxyHandler) {
            ((ProxyHandler) handler).setConnectTimeoutMillis(timeoutMillis);
        }
    }

    final FailureTestHandler testHandler = new FailureTestHandler();

    Bootstrap b = new Bootstrap()
            .group(group)
            .channel(NioSocketChannel.class)
            .resolver(NoopAddressResolverGroup.INSTANCE)
            .handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ChannelPipeline p = ch.pipeline();
                    p.addLast(clientHandlers);
                    p.addLast(new LineBasedFrameDecoder(64));
                    p.addLast(testHandler);
                }
            });

    ChannelFuture closeFuture = b.connect(DESTINATION).channel().closeFuture();
    // Both waits always run; the combined result is asserted at the very end.
    boolean channelClosed = closeFuture.await(timeoutMillis * 2, TimeUnit.MILLISECONDS);
    boolean latchReleased = testHandler.latch.await(timeoutMillis * 2, TimeUnit.MILLISECONDS);

    logger.debug("Recorded exceptions: {}", testHandler.exceptions);

    assertProxyHandlers(false);

    assertThat(testHandler.exceptions.size(), is(1));
    Throwable recorded = testHandler.exceptions.poll();
    assertThat(recorded, is(instanceOf(ProxyConnectException.class)));
    assertThat(String.valueOf(recorded), containsString("timeout"));
    assertThat(channelClosed & latchReleased, is(true));
}
/**
 * Prepares the pipeline for tunnelling after an HTTP {@code CONNECT} request and checks
 * the request's {@code Proxy-Authorization} header against the configured credentials.
 *
 * <p>Pipeline changes, performed regardless of the authentication outcome: a
 * {@code LineBasedFrameDecoder} named "lineDecoder" is inserted before this handler
 * unless the test mode is {@code INTERMEDIARY}; the {@code HttpObjectAggregator} is
 * removed; the inbound half of the {@code HttpServerCodec} is removed.
 *
 * <p>Authentication succeeds when no {@code username} is configured, or when the header
 * is {@code "Basic <base64>"} and the decoded value equals {@code username:password}.
 * A missing header, a header without a scheme/credentials separator, or a non-Basic
 * scheme yields {@code false} instead of an exception.
 *
 * @param ctx context of this handler
 * @param req the received request; its method is asserted to be {@code CONNECT}
 * @return {@code true} if the request is authorised
 */
private boolean authenticate(ChannelHandlerContext ctx, FullHttpRequest req) {
    assertThat(req.method(), is(HttpMethod.CONNECT));

    if (testMode != TestMode.INTERMEDIARY) {
        ctx.pipeline().addBefore(ctx.name(), "lineDecoder", new LineBasedFrameDecoder(64, false, true));
    }

    ctx.pipeline().remove(HttpObjectAggregator.class);
    ctx.pipeline().get(HttpServerCodec.class).removeInboundHandler();

    if (username == null) {
        // No credentials configured: every request is accepted.
        return true;
    }

    CharSequence authz = req.headers().get(HttpHeaderNames.PROXY_AUTHORIZATION);
    if (authz == null) {
        return false;
    }

    String[] authzParts = authz.toString().split(" ", 2);
    // Guard against a header with no space (single token) before touching authzParts[1].
    if (authzParts.length < 2 || !"Basic".equals(authzParts[0])) {
        return false;
    }

    ByteBuf authzBuf64 = Unpooled.copiedBuffer(authzParts[1], CharsetUtil.US_ASCII);
    try {
        ByteBuf authzBuf = Base64.decode(authzBuf64);
        try {
            String expectedAuthz = username + ':' + password;
            return expectedAuthz.equals(authzBuf.toString(CharsetUtil.US_ASCII));
        } finally {
            authzBuf.release();
        }
    } finally {
        // Released even when decoding throws, so the copied buffer cannot leak.
        authzBuf64.release();
    }
}
/**
 * Prepares the pipeline after a SOCKS4 {@code CONNECT} request and checks the request's
 * user id against the configured {@code username}.
 *
 * <p>Unless the test mode is {@code INTERMEDIARY}, a {@code LineBasedFrameDecoder}
 * named "lineDecoder" is inserted before this handler.
 *
 * @param ctx context of this handler
 * @param req the received request; its type is asserted to be {@code CONNECT}
 * @return {@code true} if no username is configured or it equals the request's user id
 */
private boolean authenticate(ChannelHandlerContext ctx, Socks4CommandRequest req) {
    assertThat(req.type(), is(Socks4CommandType.CONNECT));

    if (testMode != TestMode.INTERMEDIARY) {
        ctx.pipeline().addBefore(ctx.name(), "lineDecoder", new LineBasedFrameDecoder(64, false, true));
    }

    return username == null || username.equals(req.userId());
}
/**
 * Starts the line-based TCP server on {@code port} and blocks until the server channel
 * is closed.
 *
 * <p>Each accepted connection gets a {@code LineBasedFrameDecoder} (max 2048 bytes per
 * line), a {@code StringDecoder} and a {@code NettyServerHandlerDemo2}.
 *
 * <p>Both event loop groups are shut down when this method returns, whether the channel
 * closed normally or startup failed. Failures are reported on standard error rather
 * than propagated, and an interrupt restores the thread's interrupt flag.
 */
public void start() {
    EventLoopGroup bossGroup = new NioEventLoopGroup(1);
    EventLoopGroup workerGroup = new NioEventLoopGroup();
    try {
        ServerBootstrap sbs = new ServerBootstrap()
                .group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .localAddress(new InetSocketAddress(port))
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ChannelPipeline p = ch.pipeline();
                        // Line framing; 2048 is the maximum number of bytes allowed in one line.
                        p.addLast(new LineBasedFrameDecoder(2048));
                        // Alternative framings, disabled:
                        // p.addLast(new FixedLengthFrameDecoder(100)); // fixed 100-byte frames
                        // p.addLast(new DelimiterBasedFrameDecoder(1024, Unpooled.copiedBuffer("~_~".getBytes()))); // split on "~_~"; must precede StringDecoder
                        p.addLast(new StringDecoder());
                        p.addLast(new NettyServerHandlerDemo2()); // application handler
                    }
                })
                .option(ChannelOption.SO_BACKLOG, 128)
                .childOption(ChannelOption.SO_KEEPALIVE, true);

        // Bind the port and start accepting incoming connections.
        ChannelFuture future = sbs.bind(port).sync();
        System.out.println("服务端启动成功,端口为 :" + port );
        future.channel().closeFuture().sync();
    } catch (InterruptedException e) {
        // Preserve the interrupt for callers further up the stack.
        Thread.currentThread().interrupt();
        System.err.println("Server on port " + port + " was interrupted: " + e);
    } catch (Exception e) {
        System.err.println("Server on port " + port + " stopped with an error: " + e);
    } finally {
        // Always release the event loop threads, not only on failure.
        bossGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();
    }
}
/**
 * Binds the shutdown-request listener to the port returned by
 * {@code m_config.getShutdownRequestPort()} without blocking.
 *
 * <p>Each accepted connection gets a {@code LineBasedFrameDecoder} (max 10 bytes per
 * line), a UTF-8 {@code StringDecoder}, an {@code IdleStateHandler} using
 * {@code MAX_IDLE_SECONDS}, and a {@code ShutdownRequestInboundHandler}.
 *
 * <p>The bind result is reported asynchronously through the logger; a failure is logged
 * together with its cause.
 */
public void start() {
    ServerBootstrap b = new ServerBootstrap();
    b.group(m_bossGroup, m_workerGroup)
            .channel(NioServerSocketChannel.class)
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(
                            new LineBasedFrameDecoder(10),
                            new StringDecoder(Charsets.UTF_8),
                            new IdleStateHandler(0, 0, MAX_IDLE_SECONDS),
                            new ShutdownRequestInboundHandler());
                }
            })
            .option(ChannelOption.SO_BACKLOG, 128) // TODO set tcp options
            .childOption(ChannelOption.SO_KEEPALIVE, true);

    // Bind and start to accept incoming connections.
    ChannelFuture f = b.bind(m_config.getShutdownRequestPort());
    f.addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
            if (future.isSuccess()) {
                log.info("Broker shutdown port is {}.", m_config.getShutdownRequestPort());
            } else {
                // Pass the cause as the trailing argument so the reason for the failure is logged.
                log.error("Failed to listen shutdown port {}.", m_config.getShutdownRequestPort(),
                        future.cause());
            }
        }
    });
}
/**
 * Configures the channel pipeline.
 * Also registers the handlers that process user traffic, using the beans registered in
 * Netty.Server.Configuration.NettyServerConfiguration.
 * Actual user requests are handled in Netty.Server.Handler.JsonHandler.
 *
 * <p>For {@code "websocket"} the HTTP/WebSocket handlers are appended first; the
 * line-based text handlers are then appended for every transfer type.
 *
 * @param channel the channel whose pipeline is being set up
 * @throws Exception if a handler cannot be created or added
 */
@Override
protected void initChannel(Channel channel) throws Exception {
    ChannelPipeline channelPipeline = channel.pipeline();
    switch (transferType) {
        case "websocket":
            channelPipeline.addLast(new HttpServerCodec());
            channelPipeline.addLast(new HttpObjectAggregator(65536));
            channelPipeline.addLast(new WebSocketServerCompressionHandler());
            channelPipeline.addLast(new WebSocketServerProtocolHandler(
                    transferWebsocketPath, transferWebsocketSubProtocol, transferWebsocketAllowExtensions));
            channelPipeline.addLast(new LoggingHandler(LogLevel.valueOf(logLevelPipeline)));
            channelPipeline.addLast(websocketHandler);
            // NOTE(review): there is no break here, so execution falls through and the tcp
            // handlers below are appended after the websocket ones — confirm this is intended.
        case "tcp":
        default:
            channelPipeline.addLast(new LineBasedFrameDecoder(Integer.MAX_VALUE));
            channelPipeline.addLast(STRING_DECODER);
            channelPipeline.addLast(STRING_ENCODER);
            channelPipeline.addLast(new LoggingHandler(LogLevel.valueOf(logLevelPipeline)));
            channelPipeline.addLast(jsonHandler);
    }
}
/**
 * On a pipeline with no pre-registered handlers, adding "decoder" last and
 * "decoder$extract" first must yield: extractor, decoder, tail.
 */
@Test
public void addByteDecoderWhenEmptyPipeline() {
    ChannelHandler lineDecoder = new LineBasedFrameDecoder(12);

    testContext.addHandlerLast("decoder", lineDecoder)
               .addHandlerFirst("decoder$extract", NettyPipeline.inboundHandler(ADD_EXTRACTOR));

    assertEquals(channel.pipeline().names(),
            Arrays.asList(
                    "decoder$extract",
                    "decoder",
                    "DefaultChannelPipeline$TailContext#0"));
}
/**
 * Adding two extractor/decoder pairs with {@code addHandlerLast} on a full pipeline must
 * keep them in insertion order, between the traffic handler and the reactive bridge.
 */
@Test
public void addSeveralByteDecodersWhenCodec() {
    ChannelHandler firstDecoder = new LineBasedFrameDecoder(12);
    ChannelHandler secondDecoder = new LineBasedFrameDecoder(13);
    ChannelHandler bridge = new ChannelHandlerAdapter() {
    };

    channel.pipeline().addLast(NettyPipeline.HttpCodec, new HttpServerCodec());
    channel.pipeline().addLast(NettyPipeline.HttpTrafficHandler, new ChannelDuplexHandler());
    channel.pipeline().addLast(NettyPipeline.ReactiveBridge, bridge);

    testContext.addHandlerLast("decoder1$extract", NettyPipeline.inboundHandler(ADD_EXTRACTOR))
               .addHandlerLast("decoder1", firstDecoder)
               .addHandlerLast("decoder2$extract", NettyPipeline.inboundHandler(ADD_EXTRACTOR))
               .addHandlerLast("decoder2", secondDecoder);

    assertEquals(channel.pipeline().names(),
            Arrays.asList(
                    NettyPipeline.HttpCodec,
                    NettyPipeline.HttpTrafficHandler,
                    "decoder1$extract",
                    "decoder1",
                    "decoder2$extract",
                    "decoder2",
                    NettyPipeline.ReactiveBridge,
                    "DefaultChannelPipeline$TailContext#0"));
}
/**
 * On a pipeline with no pre-registered handlers, a handler added first under the name
 * "encoder" must be followed only by the tail context.
 */
@Test
public void addByteEncoderWhenEmptyPipeline() {
    ChannelHandler handler = new LineBasedFrameDecoder(12);

    testContext.addHandlerFirst("encoder", handler);

    assertEquals(channel.pipeline().names(),
            Arrays.asList(
                    "encoder",
                    "DefaultChannelPipeline$TailContext#0"));
}
/**
 * When a handler named "foo" is already registered, {@code addHandlerFirst("foo", ...)}
 * must leave the pipeline untouched: the names are unchanged and "foo" is still the
 * original {@code Utf8FrameValidator}.
 */
@Test
public void addDecoderSkipsIfExist() {
    channel.pipeline().addFirst("foo", new Utf8FrameValidator());

    testContext.addHandlerFirst("foo", new LineBasedFrameDecoder(10));

    assertEquals(channel.pipeline().names(),
            Arrays.asList("foo", "DefaultChannelPipeline$TailContext#0"));
    MatcherAssert.assertThat(channel.pipeline().get("foo"),
            is(instanceOf(Utf8FrameValidator.class)));
}
/**
 * When a handler named "foo" is already registered, adding another handler first under
 * the same name must be a no-op: the names are unchanged and "foo" is still the original
 * {@code Utf8FrameValidator}.
 */
@Test
public void addEncoderSkipsIfExist() {
    channel.pipeline().addFirst("foo", new Utf8FrameValidator());

    testContext.addHandlerFirst("foo", new LineBasedFrameDecoder(10));

    assertEquals(channel.pipeline().names(),
            Arrays.asList("foo", "DefaultChannelPipeline$TailContext#0"));
    MatcherAssert.assertThat(channel.pipeline().get("foo"),
            is(instanceOf(Utf8FrameValidator.class)));
}
/**
 * Sends 100 newline-terminated messages to the echo server and expects 100 lines back,
 * with an 8 KiB {@code LineBasedFrameDecoder} named "codec" installed on the client.
 *
 * <p>The latch is awaited before the failure message is built, so the message reports
 * what had actually been received when the wait ended. The client connection is
 * disposed even when an assertion fails.
 */
@Test
public void tcpClientHandlesLineFeedData() throws InterruptedException {
    final int messages = 100;
    final CountDownLatch latch = new CountDownLatch(messages);
    final List<String> strings = new ArrayList<>();

    Connection client =
            TcpClient.create()
                     .host("localhost")
                     .port(echoServerPort)
                     .doOnConnected(c -> c.addHandlerLast("codec",
                             new LineBasedFrameDecoder(8 * 1024)))
                     .handle((in, out) ->
                             out.sendString(Flux.range(1, messages)
                                                .map(i -> "Hello World!" + i + "\n")
                                                .subscribeOn(Schedulers.parallel()))
                                .then(in.receive()
                                        .asString()
                                        .take(messages)
                                        .flatMapIterable(s -> Arrays.asList(s.split("\\n")))
                                        .doOnNext(s -> {
                                            strings.add(s);
                                            latch.countDown();
                                        })
                                        .then()))
                     .wiretap(true)
                     .connectNow(Duration.ofSeconds(15));

    try {
        // Wait first: Java evaluates the message argument before the condition argument,
        // so inlining the await into assertTrue would snapshot `strings` before waiting.
        boolean allReceived = latch.await(15, TimeUnit.SECONDS);
        assertTrue("Expected messages not received. Received " + strings.size() + " messages: " + strings,
                allReceived);
        assertEquals(messages, strings.size());
    } finally {
        client.disposeNow();
    }
}
/**
 * Runs the shared line-feed scenario against the echo server using a connection
 * provider created with {@code Integer.MAX_VALUE} as its size argument, and a client
 * that installs an 8 KiB {@code LineBasedFrameDecoder} named "codec" on connect.
 */
@Test
public void tcpClientHandlesLineFeedDataElasticPool() throws InterruptedException {
    Consumer<? super Connection> installCodec =
            connection -> connection.addHandler("codec", new LineBasedFrameDecoder(8 * 1024));

    tcpClientHandlesLineFeedData(
            TcpClient.create(
                             ConnectionProvider.create("tcpClientHandlesLineFeedDataElasticPool", Integer.MAX_VALUE))
                     .host("localhost")
                     .port(echoServerPort)
                     .doOnConnected(installCodec));
}
/**
 * Serves 100 zero-padded ten-digit numbers, one per line, over HTTP and checks that a
 * client with a {@code LineBasedFrameDecoder(10)} installed receives exactly those 100
 * strings in order and then completes, within 30 seconds.
 */
@Test
public void flushOnComplete() {
    Flux<String> paddedNumbers = Flux.range(0, 100)
                                     .map(n -> String.format("%010d", n));

    List<String> expected = paddedNumbers.collectList().block();
    assertThat(expected).isNotNull();

    disposableServer =
            HttpServer.create()
                      .port(0)
                      .handle((req, resp) -> resp.sendString(paddedNumbers.map(s -> s + "\n")))
                      .wiretap(true)
                      .bindNow();

    Flux<String> responseLines =
            HttpClient.create()
                      .port(disposableServer.port())
                      .wiretap(true)
                      .doOnConnected(res -> res.addHandler(new LineBasedFrameDecoder(10)))
                      .get()
                      .uri("/")
                      .responseContent()
                      .asString();

    StepVerifier.create(responseLines)
                .expectNextSequence(expected)
                .expectComplete()
                .verify(Duration.ofSeconds(30));
}