下面列出了怎么用io.netty.channel.ChannelHandlerAdapter的API类实例代码及写法,或者点击链接到github查看源代码。
@Override
public Handler newHandler(GrpcHttp2ConnectionHandler handler) {
final HostPort hostPort = parseAuthority(handler.getAuthority());
ChannelHandler sslBootstrap = new ChannelHandlerAdapter() {
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
SSLEngine sslEngine = sslContext.newEngine(ctx.alloc(), hostPort.host, hostPort.port);
SSLParameters sslParams = sslEngine.getSSLParameters();
sslParams.setEndpointIdentificationAlgorithm("HTTPS");
sslEngine.setSSLParameters(sslParams);
ctx.pipeline().replace(this, null, new SslHandler(sslEngine, false));
}
};
return new BufferUntilTlsNegotiatedHandler(sslBootstrap, handler);
}
@Test
public void testPromiseSendTimeout() {
//use an extra handler
EmbeddedChannel channel = new EmbeddedChannel(new WriteTimeoutHandler(1), new ChannelHandlerAdapter() {});
Flux<String> flux = Flux.range(0, 257).map(count -> count + "");
Mono<Void> m = MonoSendMany.objectSource(flux, channel, b -> false);
StepVerifier.create(m)
.then(() -> {
channel.runPendingTasks(); //run flush
for (int i = 0; i < 257; i++) {
assertThat(channel.<String>readOutbound()).isEqualTo(i + "");
}
})
.verifyComplete();
}
@Test
public void cleanupFuseableSyncCloseFuture() {
//use an extra handler
EmbeddedChannel channel = new EmbeddedChannel(new ChannelHandlerAdapter() {});
Mono<Void> m = MonoSendMany.objectSource(Flux.fromArray(new String[]{"test", "test2"}), channel, b -> false);
List<WeakReference<Subscription>> _w = new ArrayList<>(1);
StepVerifier.create(m)
.consumeSubscriptionWith(s -> _w.add(new WeakReference<>(s)))
.then(() -> {
channel.runPendingTasks();
assertThat(channel.<String>readOutbound()).isEqualToIgnoringCase("test");
assertThat(channel.<String>readOutbound()).isEqualToIgnoringCase("test2");
})
.verifyComplete();
System.gc();
wait(_w.get(0));
}
@Test
public void cleanupFuseableAsyncCloseFuture() {
//use an extra handler
EmbeddedChannel channel = new EmbeddedChannel(new ChannelHandlerAdapter() {});
Mono<Void> m = MonoSendMany.objectSource(Flux.fromArray(new String[]{"test", "test2"}).limitRate(10), channel, b -> false);
List<WeakReference<Subscription>> _w = new ArrayList<>(1);
StepVerifier.create(m)
.consumeSubscriptionWith(s -> _w.add(new WeakReference<>(s)))
.then(() -> {
channel.runPendingTasks();
assertThat(channel.<String>readOutbound()).isEqualToIgnoringCase("test");
assertThat(channel.<String>readOutbound()).isEqualToIgnoringCase("test2");
})
.verifyComplete();
System.gc();
wait(_w.get(0));
}
@Test
public void cleanupFuseableErrorCloseFuture() {
//use an extra handler
EmbeddedChannel channel = new EmbeddedChannel(new ChannelHandlerAdapter() {});
Mono<Void> m = MonoSendMany.objectSource(Flux.fromArray(new String[]{"test", "test2"}).concatWith(Mono.error(new Exception("boo"))).limitRate(10), channel, b -> false);
List<WeakReference<Subscription>> _w = new ArrayList<>(1);
StepVerifier.create(m)
.consumeSubscriptionWith(s -> _w.add(new WeakReference<>(s)))
.then(() -> {
channel.runPendingTasks();
assertThat(channel.<String>readOutbound()).isEqualToIgnoringCase("test");
assertThat(channel.<String>readOutbound()).isEqualToIgnoringCase("test2");
})
.verifyErrorMessage("boo");
System.gc();
wait(_w.get(0));
}
@Test
public void cleanupCancelCloseFuture() {
//use an extra handler
EmbeddedChannel channel = new EmbeddedChannel(new ChannelHandlerAdapter() {});
Mono<Void> m = MonoSendMany.objectSource(Flux.fromArray(new String[]{"test", "test2"}).concatWith(Mono.never()), channel, b -> false);
List<WeakReference<Subscription>> _w = new ArrayList<>(1);
StepVerifier.create(m)
.consumeSubscriptionWith(s -> _w.add(new WeakReference<>(s)))
.then(channel::runPendingTasks)
.thenCancel()
.verify();
System.gc();
wait(_w.get(0));
}
@Test
public void cleanupErrorCloseFuture() {
//use an extra handler
EmbeddedChannel channel = new EmbeddedChannel(new ChannelHandlerAdapter() {});
Mono<Void> m = MonoSendMany.objectSource(Mono.error(new Exception("boo")), channel, b -> false);
List<WeakReference<Subscription>> _w = new ArrayList<>(1);
StepVerifier.create(m)
.consumeSubscriptionWith(s -> _w.add(new WeakReference<>(s)))
.then(channel::runPendingTasks)
.verifyErrorMessage("boo");
System.gc();
wait(_w.get(0));
}
@Test
public void addByteDecoderWhenNoLeft() {
channel.pipeline()
.addLast(NettyPipeline.ReactiveBridge, new ChannelHandlerAdapter() {
});
ChannelHandler decoder = new LineBasedFrameDecoder(12);
testContext.addHandlerLast("decoder", decoder)
.addHandlerFirst("decoder$extract",
NettyPipeline.inboundHandler(ADD_EXTRACTOR));
assertEquals(channel.pipeline()
.names(),
Arrays.asList("decoder$extract",
"decoder",
NettyPipeline.ReactiveBridge,
"DefaultChannelPipeline$TailContext#0"));
}
@Test
public void addByteDecoderWhenNoRight() {
channel.pipeline()
.addLast(NettyPipeline.HttpCodec, new ChannelHandlerAdapter() {
});
ChannelHandler decoder = new LineBasedFrameDecoder(12);
testContext.addHandlerLast("decoder", decoder)
.addHandlerFirst("decoder$extract",
NettyPipeline.inboundHandler(ADD_EXTRACTOR));
assertEquals(channel.pipeline()
.names(),
Arrays.asList(NettyPipeline.HttpCodec,
"decoder$extract",
"decoder",
"DefaultChannelPipeline$TailContext#0"));
}
@Test
public void addByteDecoderWhenFullReactorPipeline() {
channel.pipeline()
.addLast(NettyPipeline.HttpCodec, new HttpServerCodec())
.addLast(NettyPipeline.HttpTrafficHandler, new ChannelDuplexHandler())
.addLast(NettyPipeline.ReactiveBridge, new ChannelHandlerAdapter() {
});
ChannelHandler decoder = new LineBasedFrameDecoder(12);
testContext.addHandlerLast("decoder", decoder)
.addHandlerFirst("decoder$extract",
NettyPipeline.inboundHandler(ADD_EXTRACTOR));
assertEquals(channel.pipeline()
.names(),
Arrays.asList(NettyPipeline.HttpCodec,
NettyPipeline.HttpTrafficHandler,
"decoder$extract",
"decoder",
NettyPipeline.ReactiveBridge,
"DefaultChannelPipeline$TailContext#0"));
}
@Test
public void addNonByteDecoderWhenNoLeft() {
channel.pipeline()
.addLast(NettyPipeline.ReactiveBridge, new ChannelHandlerAdapter() {
});
ChannelHandler decoder = new ChannelHandlerAdapter() {
};
testContext.addHandlerLast("decoder", decoder);
assertEquals(channel.pipeline()
.names(),
Arrays.asList("decoder",
NettyPipeline.ReactiveBridge,
"DefaultChannelPipeline$TailContext#0"));
}
@Test
public void addNonByteDecoderWhenNoRight() {
channel.pipeline()
.addLast(NettyPipeline.HttpCodec, new ChannelHandlerAdapter() {
});
ChannelHandler decoder = new ChannelHandlerAdapter() {
};
testContext.addHandlerLast("decoder", decoder);
assertEquals(channel.pipeline()
.names(),
Arrays.asList(NettyPipeline.HttpCodec,
"decoder",
"DefaultChannelPipeline$TailContext#0"));
}
@Test
public void addNonByteDecoderWhenFullReactorPipeline() {
channel.pipeline()
.addLast(NettyPipeline.HttpCodec, new HttpServerCodec())
.addLast(NettyPipeline.HttpTrafficHandler, new ChannelDuplexHandler())
.addLast(NettyPipeline.ReactiveBridge, new ChannelHandlerAdapter() {
});
ChannelHandler decoder = new ChannelHandlerAdapter() {
};
testContext.addHandlerLast("decoder", decoder);
assertEquals(channel.pipeline()
.names(),
Arrays.asList(NettyPipeline.HttpCodec,
NettyPipeline.HttpTrafficHandler,
"decoder",
NettyPipeline.ReactiveBridge,
"DefaultChannelPipeline$TailContext#0"));
}
@Test
public void addByteEncoderWhenNoLeft() {
channel.pipeline()
.addLast(NettyPipeline.ReactiveBridge, new ChannelHandlerAdapter() {
});
ChannelHandler encoder = new LineBasedFrameDecoder(12);
testContext.addHandlerFirst("encoder", encoder);
assertEquals(channel.pipeline()
.names(),
Arrays.asList("encoder",
NettyPipeline.ReactiveBridge,
"DefaultChannelPipeline$TailContext#0"));
}
@Test
public void addByteEncoderWhenNoRight() {
channel.pipeline()
.addLast(NettyPipeline.HttpCodec, new ChannelHandlerAdapter() {
});
ChannelHandler encoder = new LineBasedFrameDecoder(12);
testContext.addHandlerFirst("encoder", encoder);
assertEquals(channel.pipeline()
.names(),
Arrays.asList(NettyPipeline.HttpCodec,
"encoder",
"DefaultChannelPipeline$TailContext#0"));
}
@Test
public void addByteEncoderWhenFullReactorPipeline() {
channel.pipeline()
.addLast(NettyPipeline.HttpCodec, new HttpServerCodec())
.addLast(NettyPipeline.HttpTrafficHandler, new ChannelDuplexHandler())
.addLast(NettyPipeline.ReactiveBridge, new ChannelHandlerAdapter() {
});
ChannelHandler encoder = new LineBasedFrameDecoder(12);
testContext.addHandlerFirst("encoder", encoder);
assertEquals(channel.pipeline()
.names(),
Arrays.asList(NettyPipeline.HttpCodec,
NettyPipeline.HttpTrafficHandler,
"encoder",
NettyPipeline.ReactiveBridge,
"DefaultChannelPipeline$TailContext#0"));
}
@Test
public void addNonByteEncoderWhenNoLeft() {
channel.pipeline()
.addLast(NettyPipeline.ReactiveBridge, new ChannelHandlerAdapter() {
});
ChannelHandler encoder = new ChannelHandlerAdapter() {
};
testContext.addHandlerFirst("encoder", encoder);
assertEquals(channel.pipeline()
.names(),
Arrays.asList("encoder",
NettyPipeline.ReactiveBridge,
"DefaultChannelPipeline$TailContext#0"));
}
@Test
public void addNonByteEncoderWhenNoRight() {
channel.pipeline()
.addLast(NettyPipeline.HttpCodec, new ChannelHandlerAdapter() {
});
ChannelHandler encoder = new ChannelHandlerAdapter() {
};
testContext.addHandlerFirst("encoder", encoder);
assertEquals(channel.pipeline()
.names(),
Arrays.asList(NettyPipeline.HttpCodec,
"encoder",
"DefaultChannelPipeline$TailContext#0"));
}
@Test
public void addNonByteEncoderWhenFullReactorPipeline() {
channel.pipeline()
.addLast(NettyPipeline.HttpCodec, new HttpServerCodec())
.addLast(NettyPipeline.HttpTrafficHandler, new ChannelDuplexHandler())
.addLast(NettyPipeline.ReactiveBridge, new ChannelHandlerAdapter() {
});
ChannelHandler encoder = new ChannelHandlerAdapter() {
};
testContext.addHandlerFirst("encoder", encoder);
assertEquals(channel.pipeline()
.names(),
Arrays.asList(NettyPipeline.HttpCodec,
NettyPipeline.HttpTrafficHandler,
"encoder",
NettyPipeline.ReactiveBridge,
"DefaultChannelPipeline$TailContext#0"));
}
@Test
public void addSeveralByteEncodersWhenCodec() {
ChannelHandler encoder1 = new LineBasedFrameDecoder(12);
ChannelHandler encoder2 = new LineBasedFrameDecoder(13);
channel.pipeline()
.addLast(NettyPipeline.HttpCodec, new HttpServerCodec())
.addLast(NettyPipeline.HttpTrafficHandler, new ChannelDuplexHandler())
.addLast(NettyPipeline.ReactiveBridge, new ChannelHandlerAdapter() {
});
testContext.addHandlerFirst("encoder1", encoder1)
.addHandlerFirst("encoder2", encoder2);
assertEquals(channel.pipeline()
.names(),
Arrays.asList(NettyPipeline.HttpCodec,
NettyPipeline.HttpTrafficHandler,
"encoder2",
"encoder1",
NettyPipeline.ReactiveBridge,
"DefaultChannelPipeline$TailContext#0"));
}
ClientSdsHandler(
GrpcHttp2ConnectionHandler grpcHandler, UpstreamTlsContext upstreamTlsContext) {
super(
// superclass (InternalProtocolNegotiators.ProtocolNegotiationHandler) expects 'next'
// handler but we don't have a next handler _yet_. So we "disable" superclass's behavior
// here and then manually add 'next' when we call fireProtocolNegotiationEvent()
new ChannelHandlerAdapter() {
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
ctx.pipeline().remove(this);
}
});
checkNotNull(grpcHandler, "grpcHandler");
this.grpcHandler = grpcHandler;
this.upstreamTlsContext = upstreamTlsContext;
}
ServerSdsHandler(
GrpcHttp2ConnectionHandler grpcHandler,
DownstreamTlsContext downstreamTlsContext,
ProtocolNegotiator fallbackProtocolNegotiator) {
super(
// superclass (InternalProtocolNegotiators.ProtocolNegotiationHandler) expects 'next'
// handler but we don't have a next handler _yet_. So we "disable" superclass's behavior
// here and then manually add 'next' when we call fireProtocolNegotiationEvent()
new ChannelHandlerAdapter() {
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
ctx.pipeline().remove(this);
}
});
checkNotNull(grpcHandler, "grpcHandler");
this.grpcHandler = grpcHandler;
this.downstreamTlsContext = downstreamTlsContext;
this.fallbackProtocolNegotiator = fallbackProtocolNegotiator;
}
@Test
public void connectionFailuresPropagated() throws Exception {
WriteBufferingAndExceptionHandler handler =
new WriteBufferingAndExceptionHandler(new ChannelHandlerAdapter() {});
ChannelFuture cf = new Bootstrap()
.channel(LocalChannel.class)
.handler(handler)
.group(group)
.register();
chan = cf.channel();
cf.sync();
// Write before connect. In the event connect fails, the pipeline is torn down and the handler
// won't be able to fail the writes with the correct exception.
ChannelFuture wf = chan.writeAndFlush(new Object());
chan.connect(new LocalAddress("bogus"));
try {
wf.sync();
fail();
} catch (Exception e) {
assertThat(e).isInstanceOf(ConnectException.class);
assertThat(e).hasMessageThat().contains("connection refused");
}
}
/**
* Create a server plaintext handler for gRPC.
*/
public static ProtocolNegotiator serverPlaintext() {
return new ProtocolNegotiator() {
@Override
public Handler newHandler(final GrpcHttp2ConnectionHandler handler) {
class PlaintextHandler extends ChannelHandlerAdapter implements Handler {
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
// Set sttributes before replace to be sure we pass it before accepting any requests.
handler.handleProtocolNegotiationCompleted(Attributes.newBuilder()
.set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, ctx.channel().remoteAddress())
.set(Grpc.TRANSPORT_ATTR_LOCAL_ADDR, ctx.channel().localAddress())
.build(),
/*securityInfo=*/ null);
// Just replace this handler with the gRPC handler.
ctx.pipeline().replace(this, null, handler);
}
@Override
public AsciiString scheme() {
return Utils.HTTP;
}
}
return new PlaintextHandler();
}
@Override
public void close() {}
};
}
private static ChannelHandler checkSharable(ChannelHandler handler) {
if ((handler instanceof ChannelHandlerAdapter && !((ChannelHandlerAdapter) handler).isSharable()) &&
!handler.getClass().isAnnotationPresent(ChannelHandler.Sharable.class)) {
throw new IllegalArgumentException("The handler must be Sharable");
}
return handler;
}
@Test(timeout = 5000)
public void testHandlerRegister() throws Exception {
final CountDownLatch latch = new CountDownLatch(1);
final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
LocalEventLoopGroup group = new LocalEventLoopGroup(1);
try {
ServerBootstrap sb = new ServerBootstrap();
sb.channel(LocalServerChannel.class)
.group(group)
.childHandler(new ChannelInboundHandlerAdapter())
.handler(new ChannelHandlerAdapter() {
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
try {
assertTrue(ctx.executor().inEventLoop());
} catch (Throwable cause) {
error.set(cause);
} finally {
latch.countDown();
}
}
});
sb.register().syncUninterruptibly();
latch.await();
assertNull(error.get());
} finally {
group.shutdownGracefully();
}
}
@Before
public void setUp() throws Exception {
master = mock(NitmProxyMaster.class);
when(master.config()).thenReturn(new NitmProxyConfig());
when(master.handler(any(), any(), any())).thenAnswer(m -> new ChannelHandlerAdapter() {
});
inboundChannel = new EmbeddedChannel();
}
@Override
protected void initChannel(final SocketChannel channel) {
final ChannelPipeline pipeline = channel.pipeline();
final ChannelHandlerAdapter handler;
final boolean degzip;
if (Handlers.isActive("capture")) {
degzip = true;
handler = new DefaultResponseLocatorCapturingHandler(api);
} else if (Handlers.isActive("passthrough")) {
degzip = false;
handler = new PassthroughHandler(api);
} else {
degzip = true;
handler = new ServingProxyHandler(api);
}
pipeline
.addLast("logging", new LoggingHandler(LogLevel.valueOf(api.getLogLevel())))
.addLast("http-decoder", new HttpRequestDecoder());
if (degzip) {
pipeline.addLast("gzip-decompressor", new HttpContentDecompressor());
}
pipeline
.addLast("http-encoder", new HttpResponseEncoder())
.addLast("gzip-compressor", new HttpContentCompressor())
.addLast("aggregator", new HttpObjectAggregator(Integer.MAX_VALUE))
.addLast("chunked-writer", new ChunkedWriteHandler())
.addLast("talend-junit-api-server", handler);
}
@Test
public void addNonByteDecoderWhenEmptyPipeline() {
ChannelHandler decoder = new ChannelHandlerAdapter() {
};
testContext.addHandlerLast("decoder", decoder);
assertEquals(channel.pipeline()
.names(),
Arrays.asList("decoder", "DefaultChannelPipeline$TailContext#0"));
}
@Test
public void addSeveralByteDecodersWhenCodec() {
ChannelHandler decoder1 = new LineBasedFrameDecoder(12);
ChannelHandler decoder2 = new LineBasedFrameDecoder(13);
channel.pipeline()
.addLast(NettyPipeline.HttpCodec, new HttpServerCodec())
.addLast(NettyPipeline.HttpTrafficHandler, new ChannelDuplexHandler())
.addLast(NettyPipeline.ReactiveBridge, new ChannelHandlerAdapter() {
});
testContext.addHandlerLast("decoder1$extract",
NettyPipeline.inboundHandler(ADD_EXTRACTOR))
.addHandlerLast("decoder1", decoder1)
.addHandlerLast("decoder2$extract",
NettyPipeline.inboundHandler(ADD_EXTRACTOR))
.addHandlerLast("decoder2", decoder2);
assertEquals(channel.pipeline()
.names(),
Arrays.asList(NettyPipeline.HttpCodec,
NettyPipeline.HttpTrafficHandler,
"decoder1$extract",
"decoder1",
"decoder2$extract",
"decoder2",
NettyPipeline.ReactiveBridge,
"DefaultChannelPipeline$TailContext#0"));
}