下面列出了 io.netty.channel.ChannelDuplexHandler 类的 API 用法示例代码及写法,也可以点击链接到 GitHub 查看源代码。
/**
 * With the HTTP codec, the HTTP traffic handler and the reactive bridge already installed,
 * registers a byte decoder plus its extractor and checks the resulting handler order.
 */
@Test
public void addByteDecoderWhenFullReactorPipeline() {
    ChannelHandler lineDecoder = new LineBasedFrameDecoder(12);

    // Pre-populate the pipeline one handler at a time.
    channel.pipeline().addLast(NettyPipeline.HttpCodec, new HttpServerCodec());
    channel.pipeline().addLast(NettyPipeline.HttpTrafficHandler, new ChannelDuplexHandler());
    channel.pipeline().addLast(NettyPipeline.ReactiveBridge, new ChannelHandlerAdapter() {
    });

    testContext.addHandlerLast("decoder", lineDecoder)
               .addHandlerFirst("decoder$extract", NettyPipeline.inboundHandler(ADD_EXTRACTOR));

    // The extractor and the decoder must both sit between the traffic handler and the bridge.
    assertEquals(channel.pipeline().names(),
                 Arrays.asList(
                         NettyPipeline.HttpCodec,
                         NettyPipeline.HttpTrafficHandler,
                         "decoder$extract",
                         "decoder",
                         NettyPipeline.ReactiveBridge,
                         "DefaultChannelPipeline$TailContext#0"));
}
/**
 * Same pre-populated pipeline as the byte-decoder case, but the handler added last is a plain
 * {@code ChannelHandlerAdapter}; only the pipeline order is checked.
 */
@Test
public void addNonByteDecoderWhenFullReactorPipeline() {
    ChannelHandler plainDecoder = new ChannelHandlerAdapter() {
    };

    channel.pipeline().addLast(NettyPipeline.HttpCodec, new HttpServerCodec());
    channel.pipeline().addLast(NettyPipeline.HttpTrafficHandler, new ChannelDuplexHandler());
    channel.pipeline().addLast(NettyPipeline.ReactiveBridge, new ChannelHandlerAdapter() {
    });

    testContext.addHandlerLast("decoder", plainDecoder);

    // The handler is expected right before the reactive bridge.
    assertEquals(channel.pipeline().names(),
                 Arrays.asList(
                         NettyPipeline.HttpCodec,
                         NettyPipeline.HttpTrafficHandler,
                         "decoder",
                         NettyPipeline.ReactiveBridge,
                         "DefaultChannelPipeline$TailContext#0"));
}
/**
 * Adds a byte handler with {@code addHandlerFirst} to a pipeline that already contains the HTTP
 * codec, the HTTP traffic handler and the reactive bridge, then checks the handler order.
 */
@Test
public void addByteEncoderWhenFullReactorPipeline() {
    ChannelHandler byteEncoder = new LineBasedFrameDecoder(12);

    channel.pipeline().addLast(NettyPipeline.HttpCodec, new HttpServerCodec());
    channel.pipeline().addLast(NettyPipeline.HttpTrafficHandler, new ChannelDuplexHandler());
    channel.pipeline().addLast(NettyPipeline.ReactiveBridge, new ChannelHandlerAdapter() {
    });

    testContext.addHandlerFirst("encoder", byteEncoder);

    // "First" still means after the codec and the traffic handler.
    assertEquals(channel.pipeline().names(),
                 Arrays.asList(
                         NettyPipeline.HttpCodec,
                         NettyPipeline.HttpTrafficHandler,
                         "encoder",
                         NettyPipeline.ReactiveBridge,
                         "DefaultChannelPipeline$TailContext#0"));
}
/**
 * Adds a plain {@code ChannelHandlerAdapter} with {@code addHandlerFirst} to the fully
 * populated pipeline and checks where it ends up.
 */
@Test
public void addNonByteEncoderWhenFullReactorPipeline() {
    ChannelHandler plainEncoder = new ChannelHandlerAdapter() {
    };

    channel.pipeline().addLast(NettyPipeline.HttpCodec, new HttpServerCodec());
    channel.pipeline().addLast(NettyPipeline.HttpTrafficHandler, new ChannelDuplexHandler());
    channel.pipeline().addLast(NettyPipeline.ReactiveBridge, new ChannelHandlerAdapter() {
    });

    testContext.addHandlerFirst("encoder", plainEncoder);

    // Expected position: directly after the traffic handler.
    assertEquals(channel.pipeline().names(),
                 Arrays.asList(
                         NettyPipeline.HttpCodec,
                         NettyPipeline.HttpTrafficHandler,
                         "encoder",
                         NettyPipeline.ReactiveBridge,
                         "DefaultChannelPipeline$TailContext#0"));
}
/**
 * Adds two byte handlers with {@code addHandlerFirst}, one after the other, and checks that
 * the one added second ends up ahead of the one added first.
 */
@Test
public void addSeveralByteEncodersWhenCodec() {
    ChannelHandler firstEncoder = new LineBasedFrameDecoder(12);
    ChannelHandler secondEncoder = new LineBasedFrameDecoder(13);

    channel.pipeline().addLast(NettyPipeline.HttpCodec, new HttpServerCodec());
    channel.pipeline().addLast(NettyPipeline.HttpTrafficHandler, new ChannelDuplexHandler());
    channel.pipeline().addLast(NettyPipeline.ReactiveBridge, new ChannelHandlerAdapter() {
    });

    testContext.addHandlerFirst("encoder1", firstEncoder)
               .addHandlerFirst("encoder2", secondEncoder);

    assertEquals(channel.pipeline().names(),
                 Arrays.asList(
                         NettyPipeline.HttpCodec,
                         NettyPipeline.HttpTrafficHandler,
                         "encoder2",
                         "encoder1",
                         NettyPipeline.ReactiveBridge,
                         "DefaultChannelPipeline$TailContext#0"));
}
/**
 * Assigns {@code serverChannelHandler}. The handler casts every message it reads to a
 * {@link Channel}, appends a duplex handler to that channel's pipeline which reports reads to
 * {@code onChannelRead} and writes to {@code onChannelWrite} before delegating to the default
 * behavior, and then forwards the original message.
 */
private void createServerChannelHandler() {
    serverChannelHandler = new ChannelInboundHandlerAdapter() {
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            final Channel accepted = (Channel) msg;

            final ChannelDuplexHandler interceptor = new ChannelDuplexHandler() {
                @Override
                public void channelRead(ChannelHandlerContext childCtx, Object inbound) throws Exception {
                    // Notify first, then let the message continue through the pipeline.
                    onChannelRead(childCtx, inbound);
                    super.channelRead(childCtx, inbound);
                }

                @Override
                public void write(ChannelHandlerContext childCtx, Object outbound, ChannelPromise promise)
                        throws Exception {
                    onChannelWrite(childCtx, outbound, promise);
                    super.write(childCtx, outbound, promise);
                }
            };

            accepted.pipeline().addLast(interceptor);
            ctx.fireChannelRead(msg);
        }
    };
}
/**
 * After the handshake, puts a handler whose {@code write} always throws at the head of the
 * pipeline, writes a message one character at a time and verifies, after a flush, that every
 * write future completed unsuccessfully.
 */
@Test
public void flushShouldFailAllPromises() throws Exception {
    doHandshake();

    ChannelDuplexHandler failingWriter =
        new ChannelDuplexHandler() {
          @Override
          public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
              throws Exception {
            throw new Exception("Fake exception");
          }
        };
    channel.pipeline().addFirst(failingWriter);

    final AtomicInteger failures = new AtomicInteger();
    // One listener instance is enough: it only bumps the shared failure counter.
    ChannelFutureListener countFailure =
        new ChannelFutureListener() {
          @Override
          public void operationComplete(ChannelFuture future) throws Exception {
            if (!future.isSuccess()) {
              failures.incrementAndGet();
            }
          }
        };

    // Write the message 1 character at a time.
    String message = "hello";
    int offset = 0;
    while (offset < message.length()) {
      ByteBuf in = Unpooled.copiedBuffer(message, offset, 1, UTF_8);
      @SuppressWarnings("unused") // go/futurereturn-lsc
      Future<?> possiblyIgnoredError = channel.write(in).addListener(countFailure);
      ++offset;
    }
    channel.flush();

    // Verify that the promises fail.
    assertEquals(message.length(), failures.get());
}
/**
 * With auto reading switched on (and left on), the {@link FlowControlHandler}
 * is expected to pass every message straight through.
 */
@Test
public void testFlowAutoReadOn() throws Exception {
    final CountDownLatch received = new CountDownLatch(3);

    // Counts down once for every message it reads.
    ChannelInboundHandlerAdapter countingHandler = new ChannelDuplexHandler() {
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            received.countDown();
        }
    };

    FlowControlHandler flowControl = new FlowControlHandler();
    Channel server = newServer(true, flowControl, countingHandler);
    Channel client = newClient(server.localAddress());

    try {
        // Send the message and wait for the write to complete.
        client.writeAndFlush(newOneMessage()).syncUninterruptibly();

        // Three messages should arrive within a second, leaving nothing queued.
        boolean allArrived = received.await(1L, SECONDS);
        assertTrue(allArrived);
        assertTrue(flowControl.isQueueEmpty());
    } finally {
        client.close();
        server.close();
    }
}
/**
 * Populates the pipeline of a new socket channel: APDU decoder and encoder, optional trace
 * logging, the message channel, every registered server module and, last, a handler that logs
 * and closes the connection when an exception was not handled earlier.
 *
 * @param ch
 *            the socket channel whose pipeline is set up
 */
protected void handleInitChannel ( final SocketChannel ch )
{
    // APCI/APDU codec
    ch.pipeline ().addLast ( new APDUDecoder () );
    ch.pipeline ().addLast ( new APDUEncoder () );

    // trace logging is opt-in through a boolean system property
    final boolean traceEnabled = Boolean.getBoolean ( "org.eclipse.scada.protocol.iec60870.trace" );
    if ( traceEnabled )
    {
        ch.pipeline ().addLast ( new LoggingHandler ( LogLevel.TRACE ) );
    }

    // the same message channel instance is handed to every module below
    final MessageChannel msgChannel = new MessageChannel ( this.options, this.manager );
    ch.pipeline ().addLast ( msgChannel );

    for ( final ServerModule serverModule : this.modules )
    {
        serverModule.initializeChannel ( ch, msgChannel );
    }

    // appended after everything else so it only sees what earlier handlers let through
    final ChannelDuplexHandler exceptionCatcher = new ChannelDuplexHandler () {
        @Override
        public void exceptionCaught ( final ChannelHandlerContext ctx, final Throwable cause ) throws Exception
        {
            logger.warn ( "Close connection due to uncaught exception", cause );
            ctx.close ();
        }
    };
    ch.pipeline ().addLast ( exceptionCatcher );
}
/**
 * Writes three integers outbound on an embedded channel and checks that
 * {@code EmbeddedChannelSupport.outbound} returns them as a list in the same order.
 */
@Test
public void convertsEmbeddedChannelOutboundToList() {
    EmbeddedChannel embedded = new EmbeddedChannel(new ChannelDuplexHandler());
    embedded.writeOutbound(1, 2, 3);

    List<Object> outboundMessages = EmbeddedChannelSupport.outbound(embedded);

    assertThat(outboundMessages, contains(1, 2, 3));
}
/**
 * Creates a {@code HandlerSubscriber} installed in an embedded channel and returns it wrapped
 * together with that channel.
 */
@Override
public Subscriber<Long> createSubscriber() {
    // Chicken-and-egg: the embedded channel wants a handler at construction time, while the
    // HandlerSubscriber wants the channel's event loop. Start with a placeholder and swap it out.
    ChannelHandler placeholder = new ChannelDuplexHandler();
    EmbeddedChannel embedded = new EmbeddedChannel(placeholder);

    HandlerSubscriber<Long> handlerSubscriber = new HandlerSubscriber<>(embedded.eventLoop(), 2, 4);
    embedded.pipeline().replace(placeholder, "subscriber", handlerSubscriber);

    return new SubscriberWithChannel<>(embedded, handlerSubscriber);
}
/**
 * Inserts the MQTT encoder/decoder and the CONNECT-timeout handling into the pipeline, each
 * placed before the handler named {@code "handler"}.
 *
 * @param pipeline the pipeline to extend
 */
private void initChannel(ChannelPipeline pipeline) {
    pipeline.addBefore("handler", "mqttEncoder", MqttEncoder.INSTANCE);

    // With no maximum message size configured, the Netty MQTT codec default is used.
    final MqttDecoder mqttDecoder = this.options.getMaxMessageSize() > 0
        ? new MqttDecoder(this.options.getMaxMessageSize())
        : new MqttDecoder();
    pipeline.addBefore("handler", "mqttDecoder", mqttDecoder);

    // Idle detection that provides the timeout on the CONNECT packet.
    pipeline.addBefore("handler", "idle", new IdleStateHandler(this.options.timeoutOnConnect(), 0, 0));
    pipeline.addBefore("handler", "timeoutOnConnect", new ChannelDuplexHandler() {
        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
            // Every user event stops here; nothing is forwarded further down the pipeline.
            if (!(evt instanceof IdleStateEvent)) {
                return;
            }
            // As MQTT 3.1.1 describes, if no packet is sent after a "reasonable" time
            // (here the CONNECT timeout) the connection is closed.
            if (((IdleStateEvent) evt).state() == IdleState.READER_IDLE) {
                ctx.channel().close();
            }
        }
    });
}
/**
 * Inserts the MQTT encoder/decoder and, when automatic keep-alive is enabled with a non-zero
 * interval, a writer-idle detector that triggers {@code ping()}. Everything is placed before
 * the handler named {@code "handler"}.
 *
 * @param pipeline the pipeline to extend
 */
private void initChannel(ChannelPipeline pipeline) {
    // Netty's MQTT encoder and decoder.
    pipeline.addBefore("handler", "mqttEncoder", MqttEncoder.INSTANCE);

    // With no maximum message size configured, the Netty MQTT codec default is used.
    final MqttDecoder mqttDecoder = this.options.getMaxMessageSize() > 0
        ? new MqttDecoder(this.options.getMaxMessageSize())
        : new MqttDecoder();
    pipeline.addBefore("handler", "mqttDecoder", mqttDecoder);

    // Keep-alive handling is optional; bail out when it is disabled or has no interval.
    if (!this.options.isAutoKeepAlive() || this.options.getKeepAliveTimeSeconds() == 0) {
        return;
    }

    pipeline.addBefore("handler", "idle",
        new IdleStateHandler(0, this.options.getKeepAliveTimeSeconds(), 0));
    pipeline.addBefore("handler", "keepAliveHandler", new ChannelDuplexHandler() {
        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
            // Every user event stops here; nothing is forwarded further down the pipeline.
            if (!(evt instanceof IdleStateEvent)) {
                return;
            }
            if (((IdleStateEvent) evt).state() == IdleState.WRITER_IDLE) {
                ping();
            }
        }
    });
}
/**
 * Adds two byte decoders, each preceded by its own extractor, with {@code addHandlerLast} and
 * checks that they appear in insertion order ahead of the reactive bridge.
 */
@Test
public void addSeveralByteDecodersWhenCodec() {
    ChannelHandler firstDecoder = new LineBasedFrameDecoder(12);
    ChannelHandler secondDecoder = new LineBasedFrameDecoder(13);

    channel.pipeline().addLast(NettyPipeline.HttpCodec, new HttpServerCodec());
    channel.pipeline().addLast(NettyPipeline.HttpTrafficHandler, new ChannelDuplexHandler());
    channel.pipeline().addLast(NettyPipeline.ReactiveBridge, new ChannelHandlerAdapter() {
    });

    testContext.addHandlerLast("decoder1$extract", NettyPipeline.inboundHandler(ADD_EXTRACTOR))
               .addHandlerLast("decoder1", firstDecoder)
               .addHandlerLast("decoder2$extract", NettyPipeline.inboundHandler(ADD_EXTRACTOR))
               .addHandlerLast("decoder2", secondDecoder);

    assertEquals(channel.pipeline().names(),
                 Arrays.asList(
                         NettyPipeline.HttpCodec,
                         NettyPipeline.HttpTrafficHandler,
                         "decoder1$extract",
                         "decoder1",
                         "decoder2$extract",
                         "decoder2",
                         NettyPipeline.ReactiveBridge,
                         "DefaultChannelPipeline$TailContext#0"));
}
/**
 * Returns a {@code HandlerSubscriber} that lives in a fresh embedded channel, wrapped together
 * with that channel.
 */
@Override
public Subscriber<Long> createSubscriber() {
    // An EmbeddedChannel cannot be created without a handler, yet the HandlerSubscriber can only
    // be created once the channel's event loop exists: install a stand-in first, replace it after.
    ChannelHandler standIn = new ChannelDuplexHandler();
    EmbeddedChannel testChannel = new EmbeddedChannel(standIn);

    HandlerSubscriber<Long> realSubscriber = new HandlerSubscriber<>(testChannel.eventLoop(), 2, 4);
    testChannel.pipeline().replace(standIn, "subscriber", realSubscriber);

    return new SubscriberWithChannel<>(testChannel, realSubscriber);
}
/**
 * Attaches a new {@code PlayerIO} session to the channel (unless one is already set) and
 * installs the pipeline handlers.
 *
 * <p>The idle-state handler is installed first. {@link IdleStateHandler} reports idleness by
 * firing a user event toward the tail of the pipeline, so only handlers placed after it can
 * observe that event. Appended last, as it used to be, the event passed no handler added here
 * and the input timeout could never be acted upon.
 *
 * @param ch the channel being initialized
 */
@Override
protected void initChannel(SocketChannel ch) throws Exception {
    // Initialize our session Object when the channel is initialized, attach
    // it to the channel.
    ch.attr(NetworkConstants.SESSION_KEY).setIfAbsent(new PlayerIO(ch));

    // Initialize the pipeline channel handlers.
    ChannelDuplexHandler timeout = new IdleStateHandler(NetworkConstants.INPUT_TIMEOUT, 0, 0);
    ByteToMessageDecoder loginHandshakeHandler = new LoginHandshakeHandler();

    // "timeout" goes ahead of the other handlers so its idle events reach them.
    // NOTE(review): presumably channelHandler reacts to IdleStateEvent in userEventTriggered;
    // confirm against its implementation.
    ch.pipeline().addLast("timeout", timeout);
    ch.pipeline().addLast("login-handshake", loginHandshakeHandler);
    ch.pipeline().addLast("channel-handler", channelHandler);
}
/**
 * Wraps a handler whose {@code close} fires an exception back into the pipeline and verifies,
 * after connecting a local client to a local server and closing it, that {@code close} on the
 * wrapped handler ran exactly once.
 */
@Test
public void uncaughtException_closeAtMostOnce() throws Exception {
    final AtomicInteger closeCount = new AtomicInteger();

    ChannelDuplexHandler loopingNext = new ChannelDuplexHandler() {
        @Override
        public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
            closeCount.getAndIncrement();
            // Simulates a loop between this handler and the WriteBufferingAndExceptionHandler.
            ctx.fireExceptionCaught(Status.ABORTED.withDescription("zap").asRuntimeException());
            super.close(ctx, promise);
        }
    };
    WriteBufferingAndExceptionHandler handler = new WriteBufferingAndExceptionHandler(loopingNext);

    LocalAddress localAddress = new LocalAddress("local");

    // Client side: register first, connect once the server is bound.
    ChannelFuture clientRegistration = new Bootstrap()
        .channel(LocalChannel.class)
        .handler(handler)
        .group(group)
        .register();
    chan = clientRegistration.channel();
    clientRegistration.sync();

    // Server side: a no-op child handler is enough.
    ChannelFuture serverBind = new ServerBootstrap()
        .channel(LocalServerChannel.class)
        .childHandler(new ChannelHandlerAdapter() {})
        .group(group)
        .bind(localAddress);
    server = serverBind.channel();
    serverBind.sync();

    chan.connect(localAddress).sync();
    chan.close().sync();

    assertEquals(1, closeCount.get());
}
/**
 * Builds the embedded channel used by the tests: an exception-recording handler followed by
 * the ALTS server negotiation handler wrapped in a {@code KickNegotiationHandler}.
 */
@Before
public void setup() throws Exception {
    // Handshaker factory whose handshakers report the mocked peer and the mocked ALTS context.
    TsiHandshakerFactory handshakerFactory =
        new DelegatingTsiHandshakerFactory(FakeTsiHandshaker.clientHandshakerFactory()) {
          @Override
          public TsiHandshaker newHandshaker(String authority) {
            return new DelegatingTsiHandshaker(super.newHandshaker(authority)) {
              @Override
              public TsiPeer extractPeer() throws GeneralSecurityException {
                return mockedTsiPeer;
              }

              @Override
              public Object extractPeerObject() throws GeneralSecurityException {
                return mockedAltsContext;
              }
            };
          }
        };

    // Records the exception, lets the default handling run, then closes the channel.
    ChannelHandler uncaughtExceptionHandler =
        new ChannelDuplexHandler() {
          @Override
          public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            caughtException = cause;
            super.exceptionCaught(ctx, cause);
            ctx.close();
          }
        };

    ManagedChannel fakeChannel = NettyChannelBuilder.forTarget("localhost:8080").build();
    ObjectPool<Channel> fakeChannelPool = new FixedObjectPool<Channel>(fakeChannel);
    LazyChannel lazyFakeChannel = new LazyChannel(fakeChannelPool);

    ServerAltsProtocolNegotiator negotiator =
        new ServerAltsProtocolNegotiator(handshakerFactory, lazyFakeChannel);
    ChannelHandler altsServerHandler = negotiator.newHandler(grpcHandler);

    // On real server, WBAEH fires default ProtocolNegotiationEvent. KickNH provides this behavior.
    ChannelHandler kickingHandler = new KickNegotiationHandler(altsServerHandler);
    channel = new EmbeddedChannel(uncaughtExceptionHandler, kickingHandler);
}
/**
 * Completes the handshake, installs a handler at the head of the pipeline whose {@code write}
 * always throws, writes a message character by character and checks after the flush that
 * each of those writes was reported as failed.
 */
@Test
public void flushShouldFailAllPromises() throws Exception {
    doHandshake();

    ChannelDuplexHandler alwaysThrowingWriter =
        new ChannelDuplexHandler() {
          @Override
          public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
              throws Exception {
            throw new Exception("Fake exception");
          }
        };
    channel.pipeline().addFirst(alwaysThrowingWriter);

    // Write the message 1 character at a time.
    final String message = "hello";
    final int charCount = message.length();
    final AtomicInteger failures = new AtomicInteger();
    for (int index = 0; index < charCount; index++) {
      ByteBuf singleChar = Unpooled.copiedBuffer(message, index, 1, UTF_8);
      ChannelFutureListener failureCounter =
          new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
              // Only unsuccessful completions are counted.
              if (future.isSuccess()) {
                return;
              }
              failures.incrementAndGet();
            }
          };
      @SuppressWarnings("unused") // go/futurereturn-lsc
      Future<?> possiblyIgnoredError = channel.write(singleChar).addListener(failureCounter);
    }
    channel.flush();

    // Verify that the promises fail.
    assertEquals(charCount, failures.get());
}
/**
 * Queues a PUBLISH and a SUBSCRIBE behind the message barrier, then writes a refusing CONNACK
 * outbound and verifies that a counting handler at the head of the pipeline read nothing.
 */
@Test
public void test_messages_not_sent_on_connack_fail() {
    final Object connect = new CONNECT.Mqtt3Builder()
            .withProtocolVersion(ProtocolVersion.MQTTv3_1_1)
            .withClientIdentifier("clientID")
            .build();
    embeddedChannel.writeInbound(connect);
    embeddedChannel.writeInbound(TestMessageUtil.createMqtt3Publish());
    embeddedChannel.writeInbound(new SUBSCRIBE(ImmutableList.of(), 1));

    // Both messages sent after the CONNECT are held in the barrier queue.
    assertEquals(2, messageBarrier.getQueue().size());

    final AtomicInteger readCount = new AtomicInteger(0);
    final ChannelDuplexHandler readCounter = new ChannelDuplexHandler() {
        @Override
        public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
            readCount.incrementAndGet();
        }
    };
    embeddedChannel.pipeline().addFirst(readCounter);

    embeddedChannel.writeOutbound(ConnackMessages.REFUSED_NOT_AUTHORIZED);

    assertEquals(0, readCount.get());
}
/**
 * Builds the RPC server pipeline for a channel: optional TLS, delimiter-based framing, the
 * shared decoder/encoder, idle detection with a listener that closes fully idle connections,
 * and finally the RPC handlers.
 *
 * @param ch the channel whose pipeline is populated
 */
@Override
public void initChannel(SocketChannel ch) throws Exception {
    ChannelPipeline pipeline = ch.pipeline();
    if (sslCtx != null) {
        pipeline.addLast(sslCtx.newHandler(ch.alloc()));
    }
    // Add the text line codec combination first; the frame delimiter is "\r\n$end!".
    // The delimiter bytes are produced with an explicit charset so the wire format does not
    // depend on the platform default charset of the JVM.
    // NOTE(review): Integer.MAX_VALUE puts no practical bound on the frame length; consider a
    // limit if peers are not trusted.
    pipeline.addLast(new DelimiterBasedFrameDecoder(Integer.MAX_VALUE,
            new ByteBuf[]{
                    Unpooled.wrappedBuffer(
                            Constant.RPC_DELIMITER.getBytes(java.nio.charset.StandardCharsets.UTF_8))
            }
    ));
    // the encoder and decoder are static as these are sharable
    pipeline.addLast(DECODER);
    pipeline.addLast(ENCODER);
    pipeline.addLast(/*IDLE_EVENT_HANDLER*/new RpcServerIdleStateHandler());
    pipeline.addLast(new ChannelDuplexHandler() {
        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
            if (evt instanceof IdleStateEvent) {
                IdleStateEvent e = (IdleStateEvent) evt;
                if (e.state() == IdleState.ALL_IDLE) {
                    // A plain JSONObject: double-brace initialization would create an extra
                    // anonymous subclass for this single log statement.
                    JSONObject idleLog = new JSONObject();
                    idleLog.put("type", "rpcIdle");
                    idleLog.put("description", String.format("关闭闲置连接: timeOut=%sms", RpcServerIdleStateHandler.IDLE_TIMEOUT_IN_MILLI));
                    idleLog.put("detail", ctx.channel().remoteAddress());
                    LOG.info(idleLog);
                    ctx.close();
                } else {
                    LOG.info("Nothing need to do: " + e.state().name());
                }
            }
        }
    });
    // NOTE: the read handlers must be added after the idle-event listener above;
    // otherwise the idle handling cannot be triggered.
    pipeline.addLast(new RpcServerJsonDecoder());
    pipeline.addLast(new RpcServerStreamHandler());
    pipeline.addLast(new RpcServerDefaultHandler());
}
/**
 * Creates a mock context that simply stores the given collaborators.
 *
 * @param channel the channel stored in this context
 * @param name the name stored in this context
 * @param handler the duplex handler stored in this context
 */
public MockChannelHandlerContext(Channel channel, String name, ChannelDuplexHandler handler) {
    this.handler = handler;
    this.name = name;
    this.channel = channel;
}
/**
 * Returns the {@code serverHandler} held by this object: a duplex handler that traces
 * {@link io.netty.handler.codec.http.HttpRequest} messages.
 *
 * @return the stored handler; the same reference is returned on every call
 */
public ChannelDuplexHandler serverHandler() {
return serverHandler;
}
/**
 * Queues a PUBLISH and a SUBSCRIBE behind the message barrier, then writes an accepting CONNACK
 * outbound and verifies that both messages reach a handler placed after the barrier and that
 * the barrier is no longer in the pipeline.
 */
@Test
public void test_messages_sent_on_connack_success() {
    final Object connect = new CONNECT.Mqtt3Builder()
            .withProtocolVersion(ProtocolVersion.MQTTv3_1_1)
            .withClientIdentifier("clientID")
            .build();
    embeddedChannel.writeInbound(connect);
    embeddedChannel.writeInbound(TestMessageUtil.createMqtt3Publish());
    embeddedChannel.writeInbound(new SUBSCRIBE(ImmutableList.of(), 1));

    // Both messages sent after the CONNECT are held in the barrier queue.
    assertEquals(2, messageBarrier.getQueue().size());

    final AtomicInteger released = new AtomicInteger(0);
    final ChannelDuplexHandler releaseCounter = new ChannelDuplexHandler() {
        @Override
        public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
            // Count only the two message types queued above.
            final boolean queuedType = msg instanceof PUBLISH || msg instanceof SUBSCRIBE;
            if (queuedType) {
                released.incrementAndGet();
            }
        }
    };
    embeddedChannel.pipeline().addAfter(MQTT_MESSAGE_BARRIER, "test", releaseCounter);

    embeddedChannel.writeOutbound(ConnackMessages.ACCEPTED_MSG_NO_SESS);

    assertEquals(2, released.get());
    assertFalse(embeddedChannel.pipeline().names().contains(MQTT_MESSAGE_BARRIER));
}