下面列出了怎么用io.netty.channel.SimpleChannelInboundHandler的API类实例代码及写法,或者点击链接到github查看源代码。
@SuppressWarnings("deprecation")
private Channel setupServerChannel(Bootstrap sb, final byte[] bytes, final CountDownLatch latch)
throws Throwable {
sb.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(new SimpleChannelInboundHandler<DatagramPacket>() {
@Override
public void channelRead0(ChannelHandlerContext ctx, DatagramPacket msg) throws Exception {
ByteBuf buf = msg.content();
assertEquals(bytes.length, buf.readableBytes());
for (byte b : bytes) {
assertEquals(b, buf.readByte());
}
latch.countDown();
}
});
}
});
return sb.bind(newSocketAddress()).sync().channel();
}
TestReceiver(SpanBytesDecoder decoder) throws Exception {
channel = new Bootstrap()
.group(group)
.channel(NioDatagramChannel.class)
.option(ChannelOption.RCVBUF_ALLOCATOR, DEFAULT_RECV_BUF_ALLOCATOR)
.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) {
ch.pipeline().addLast(new SimpleChannelInboundHandler<DatagramPacket>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket msg) {
byte[] b = new byte[msg.content().readableBytes()];
msg.content().readBytes(b);
decoder.decode(b, queue);
}
});
}
})
.localAddress(localAddress(0))
.bind().sync().channel();
}
private Flux<HttpObject> channelRequestResponse(Channel channel, FullHttpRequest request) {
return Flux.create(sink -> {
channel.pipeline().addLast(new SimpleChannelInboundHandler<HttpObject>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
sink.next(msg);
if (msg instanceof DefaultHttpResponse) {
DefaultHttpResponse response = (DefaultHttpResponse) msg;
if (response.decoderResult().isFailure()) {
sink.error(response.decoderResult().cause());
}
}
if (msg instanceof LastHttpContent) {
sink.complete();
}
}
});
channel.writeAndFlush(request);
});
}
@Test
public void completesFutureOnlyAfterContentObservableIsCompleted() throws Exception {
EmbeddedChannel ch = new EmbeddedChannel(
new SimpleChannelInboundHandler<LiveHttpResponse>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, LiveHttpResponse response) throws Exception {
HttpResponseWriter writer = new HttpResponseWriter(ctx);
CompletableFuture<Void> future = writer.write(response);
assertThat(future.isDone(), is(false));
contentObservable.onNext(new Buffer("aaa", UTF_8));
assertThat(future.isDone(), is(false));
contentObservable.onComplete();
assertThat(future.isDone(), is(true));
channelRead.set(true);
}
}
);
ch.writeInbound(response(OK).body(new ByteStream(contentObservable)).build());
assertThat(channelRead.get(), is(true));
}
@Test
public void failsTheResultWhenResponseWriteFails() throws Exception {
EmbeddedChannel ch = new EmbeddedChannel(
new CaptureChannelArgumentsHandler(channelArgs),
new SimpleChannelInboundHandler<LiveHttpResponse>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, LiveHttpResponse response) throws Exception {
HttpResponseWriter writer = new HttpResponseWriter(ctx);
CompletableFuture<Void> future = writer.write(response);
assertThat(future.isDone(), is(false));
writeError(channelArgs);
assertThat(future.isDone(), is(true));
future.get(200, MILLISECONDS);
}
}
);
assertThrows(ExecutionException.class,
() -> ch.writeInbound(response(OK).body(new ByteStream(contentObservable)).build()));
}
public static Bootstrap createNettyHttpClientBootstrap() {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new HttpClientCodec());
p.addLast(new HttpObjectAggregator(Integer.MAX_VALUE));
p.addLast("clientResponseHandler", new SimpleChannelInboundHandler<HttpObject>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
throw new RuntimeException("Client response handler was not setup before the call");
}
});
}
});
return bootstrap;
}
public static CompletableFuture<NettyHttpClientResponse> setupNettyHttpClientResponseHandler(
Channel ch, Consumer<ChannelPipeline> pipelineAdjuster
) {
CompletableFuture<NettyHttpClientResponse> responseFromServerFuture = new CompletableFuture<>();
ch.pipeline().replace("clientResponseHandler", "clientResponseHandler", new SimpleChannelInboundHandler<HttpObject>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg)
throws Exception {
if (msg instanceof FullHttpResponse) {
// Store the proxyServer response for asserting on later.
responseFromServerFuture.complete(new NettyHttpClientResponse((FullHttpResponse) msg));
} else {
// Should never happen.
throw new RuntimeException("Received unexpected message type: " + msg.getClass());
}
}
});
if (pipelineAdjuster != null)
pipelineAdjuster.accept(ch.pipeline());
return responseFromServerFuture;
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
if (log.isDebugEnabled()) {
log.error("Unhandled exception caught within the pipeline {} for Channel {}, Id: {}", cause, ctx.channel(), ctx.channel().id());
if (ctx.channel().hasAttr(ChannelAttributes.LAST_REQUEST_SENT)) {
AbstractRequest request = ctx.channel().attr(ChannelAttributes.LAST_REQUEST_SENT).get();
if (request != null && SocketChannel.class.isAssignableFrom(ctx.channel().getClass())) {
Throwable ex = new ResponseException(request, cause);
SimpleChannelInboundHandler responseRouter = ctx.pipeline().get(SimpleChannelInboundHandler.class);
responseRouter.channelRead(ctx, ex);
return;
}
}
throw new TransportException(cause);
}
}
private CompletableFuture<Void> bootstrapServer() {
Bootstrap serverBootstrap = new Bootstrap()
.group(group)
.channelFactory(() -> new NioDatagramChannel(InternetProtocolFamily.IPv4))
.handler(new SimpleChannelInboundHandler<Object>() {
@Override
public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
// Nothing will be sent.
}
})
.option(ChannelOption.IP_MULTICAST_IF, iface)
.option(ChannelOption.SO_REUSEADDR, true);
CompletableFuture<Void> future = new CompletableFuture<>();
serverBootstrap.bind(localAddress).addListener((ChannelFutureListener) f -> {
if (f.isSuccess()) {
serverChannel = f.channel();
future.complete(null);
} else {
future.completeExceptionally(f.cause());
}
});
return future;
}
private CompletableFuture<Void> bootstrap() {
Bootstrap serverBootstrap = new Bootstrap()
.group(group)
.channel(NioDatagramChannel.class)
.handler(new SimpleChannelInboundHandler<DatagramPacket>() {
@Override
protected void channelRead0(ChannelHandlerContext context, DatagramPacket packet) throws Exception {
byte[] payload = new byte[packet.content().readInt()];
packet.content().readBytes(payload);
Message message = SERIALIZER.decode(payload);
Map<BiConsumer<Address, byte[]>, Executor> listeners = NettyUnicastService.this.listeners.get(message.subject());
if (listeners != null) {
listeners.forEach((consumer, executor) ->
executor.execute(() -> consumer.accept(message.source(), message.payload())));
}
}
})
.option(ChannelOption.RCVBUF_ALLOCATOR, new DefaultMaxBytesRecvByteBufAllocator())
.option(ChannelOption.SO_BROADCAST, true)
.option(ChannelOption.SO_REUSEADDR, true);
return bind(serverBootstrap);
}
@Test(expected = UploadTimeoutException.class, timeout = 30000)
public void uploadTimeout() throws Exception {
ServerChannel server = null;
try {
server =
testServer.start(
new SimpleChannelInboundHandler<FullHttpRequest>() {
@Override
protected void channelRead0(
ChannelHandlerContext channelHandlerContext, FullHttpRequest fullHttpRequest) {
// Don't respond and force a client timeout.
}
});
Credentials credentials = newCredentials();
HttpCacheClient blobStore = createHttpBlobStore(server, /* timeoutSeconds= */ 1, credentials);
byte[] data = "File Contents".getBytes(Charsets.US_ASCII);
getFromFuture(blobStore.uploadBlob(DIGEST_UTIL.compute(data), ByteString.copyFrom(data)));
fail("Exception expected");
} finally {
testServer.stop(server);
}
}
@Test(expected = DownloadTimeoutException.class, timeout = 30000)
public void downloadTimeout() throws Exception {
ServerChannel server = null;
try {
server =
testServer.start(
new SimpleChannelInboundHandler<FullHttpRequest>() {
@Override
protected void channelRead0(
ChannelHandlerContext channelHandlerContext, FullHttpRequest fullHttpRequest) {
// Don't respond and force a client timeout.
}
});
Credentials credentials = newCredentials();
HttpCacheClient blobStore = createHttpBlobStore(server, /* timeoutSeconds= */ 1, credentials);
getFromFuture(blobStore.downloadBlob(DIGEST, new ByteArrayOutputStream()));
fail("Exception expected");
} finally {
testServer.stop(server);
}
}
private ServerBootstrap createTCPListeningPoint(final SimpleChannelInboundHandler<SipMessageEvent> handler) {
final ServerBootstrap b = new ServerBootstrap();
b.group(this.bossGroup, this.workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(final SocketChannel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("decoder", new SipMessageStreamDecoder());
pipeline.addLast("encoder", new SipMessageEncoder());
pipeline.addLast("handler", handler);
}
})
.option(ChannelOption.SO_BACKLOG, 128)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childOption(ChannelOption.TCP_NODELAY, true);
return b;
}
@Before
public void setup() throws Exception {
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new HttpClientCodec());
p.addLast(new HttpContentDecompressor());
p.addLast(new SimpleChannelInboundHandler<HttpObject>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
response = prepareResponse(ctx, msg, response);
}
});
}
});
channel = b.connect(HOST, PORT)
.sync()
.channel();
}
public static void start(ModbusConfig cfg) throws Exception {
ModbusConstants.MASTER_SHOW_DEBUG_LOG = cfg.showDebugLog;
ModbusConstants.DEFAULT_UNIT_IDENTIFIER = cfg.unit_IDENTIFIER;
DeviceCommandPluginRegister.getInstance().reg(DeviceCommandV1PluginImpl.class.newInstance());
DeviceRepositoryPluginRegister.getInstance().reg(DeviceRepositoryV1PluginImpl.class.newInstance());
ModbusSetup setup = new ModbusSetup();
setup.setHandler(null, new CustomModbusMasterResponseHandler(cfg.transactionIdentifierOffset));
setup.setupServer4Master(cfg.port);
Collection<Channel> channels = setup.getModbusServer().getChannels();
UdpServer udpServer = new UdpServer();
SimpleChannelInboundHandler<DatagramPacket> handler = new UdpServerHandler4SendToServer(channels);
udpServer.setup(cfg.udpPort, handler);
int sleep = cfg.sleep;
if (cfg.autoSend) {
Thread.sleep(sleep);
ModbusMasterSchedule4DeviceId modbusMasterSchedule4DeviceId = new ModbusMasterSchedule4DeviceId();
modbusMasterSchedule4DeviceId.run(channels);
modbusMasterSchedule4DeviceId.schedule(channels, sleep * 5);
ModbusMasterSchedule4All modbusMasterSchedule4All = new ModbusMasterSchedule4All();
modbusMasterSchedule4All.schedule(channels, sleep);
}
Runnable runnable = () -> ConsoleUtil.clearConsole(true);
ScheduledUtil.scheduleWithFixedDelay(runnable, sleep * 5);
}
public void setup(int port, SimpleChannelInboundHandler<DatagramPacket> handler) throws InterruptedException {
Bootstrap b = new Bootstrap();
EventLoopGroup group = new NioEventLoopGroup();
b.group(group).channel(NioDatagramChannel.class).option(ChannelOption.SO_BROADCAST, true).handler(handler);
b.bind(port).sync();// .channel().closeFuture().await();
logger.info(String.format("UdpServer bind:%s", port));
}
@Override
protected void initChannel(Channel channel) throws Exception {
InetSocketAddress address = (InetSocketAddress) channel.remoteAddress();
Address clientAddress = new Address(address.getHostName(), address.getPort());
channel.pipeline().addLast(
master.proxyHandler(clientAddress),
new SimpleChannelInboundHandler<Object>() {
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, Object o)
throws Exception {
LOGGER.info("[Client ({})] => Unhandled inbound: {}", clientAddress, o);
}
});
}
@Test
public void completesFutureOnlyAfterAllWritesAreSuccessfullyCompleted() throws Exception {
EmbeddedChannel ch = new EmbeddedChannel(
new CaptureChannelArgumentsHandler(channelArgs),
new LoggingHandler(),
new SimpleChannelInboundHandler<LiveHttpResponse>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, LiveHttpResponse response) throws Exception {
HttpResponseWriter writer = new HttpResponseWriter(ctx);
CompletableFuture<Void> future = writer.write(response);
assertThat(future.isDone(), is(false));
contentObservable.onNext(new Buffer("aaa", UTF_8));
assertThat(future.isDone(), is(false));
contentObservable.onComplete();
assertThat(future.isDone(), is(false));
writeAck(channelArgs); // For response headers
writeAck(channelArgs); // For content chunk
writeAck(channelArgs); // For EMPTY_LAST_CHUNK
assertThat(future.isDone(), is(true));
channelRead.set(true);
}
}
);
ch.writeInbound(response(OK).body(new ByteStream(contentObservable)).build());
assertThat(channelRead.get(), is(true));
}
@Test
public void failsTheResultWhenContentWriteFails() throws Exception {
EmbeddedChannel ch = new EmbeddedChannel(
new CaptureChannelArgumentsHandler(channelArgs),
new SimpleChannelInboundHandler<LiveHttpResponse>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, LiveHttpResponse response) throws Exception {
HttpResponseWriter writer = new HttpResponseWriter(ctx);
CompletableFuture<Void> future = writer.write(response);
writeAck(channelArgs);
assertThat(future.isDone(), is(false));
contentObservable.onNext(new Buffer("aaa", UTF_8));
assertThat(future.isDone(), is(false));
contentObservable.onComplete();
assertThat(future.isDone(), is(false));
writeError(channelArgs);
assertThat(future.isDone(), is(true));
future.get(200, MILLISECONDS);
}
}
);
assertThrows(ExecutionException.class,
() -> ch.writeInbound(response(OK).body(new ByteStream(contentObservable)).build()));
}
@Test
public void sendsEmptyLastHttpContentWhenContentObservableCompletes() throws Exception {
CaptureHttpResponseWriteEventsHandler writeEventsCollector = new CaptureHttpResponseWriteEventsHandler();
EmbeddedChannel ch = new EmbeddedChannel(
new CaptureChannelArgumentsHandler(channelArgs),
writeEventsCollector,
new SimpleChannelInboundHandler<LiveHttpResponse>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, LiveHttpResponse response) throws Exception {
HttpResponseWriter writer = new HttpResponseWriter(ctx);
CompletableFuture<Void> future = writer.write(response);
writeAck(channelArgs);
assertThat(future.isDone(), is(false));
contentObservable.onComplete();
assertThat(future.isDone(), is(false));
writeAck(channelArgs);
assertThat(future.isDone(), is(true));
channelRead.set(true);
}
}
);
ch.writeInbound(response(OK).body(new ByteStream(contentObservable)).build());
assertThat(channelRead.get(), is(true));
List<Object> writeEvents = writeEventsCollector.writeEvents();
assertThat(writeEvents.get(0), instanceOf(DefaultHttpResponse.class));
assertThat(writeEvents.get(1), is(EMPTY_LAST_CONTENT));
}
@Test
public void unsubscribesFromContentWhenCancelled() throws Exception {
CaptureHttpResponseWriteEventsHandler writeEventsCollector = new CaptureHttpResponseWriteEventsHandler();
AtomicBoolean unsubscribed = new AtomicBoolean(false);
EmbeddedChannel ch = new EmbeddedChannel(
new CaptureChannelArgumentsHandler(channelArgs),
writeEventsCollector,
new SimpleChannelInboundHandler<LiveHttpResponse>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, LiveHttpResponse response) throws Exception {
HttpResponseWriter writer = new HttpResponseWriter(ctx);
CompletableFuture<Void> future = writer.write(response);
writeAck(channelArgs);
assertThat(future.isDone(), is(false));
future.cancel(false);
assertThat(unsubscribed.get(), is(true));
assertThat(future.isDone(), is(true));
channelRead.set(true);
}
}
);
ch.writeInbound(response(OK).body(new ByteStream(contentObservable.doOnCancel(() -> unsubscribed.set(true)))).build());
assertThat(channelRead.get(), is(true));
}
@Test
public void logsSentAndAcknowledgedBytes() {
EmbeddedChannel ch = new EmbeddedChannel(
new SimpleChannelInboundHandler<LiveHttpResponse>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, LiveHttpResponse response) throws Exception {
HttpResponseWriter writer = new HttpResponseWriter(ctx);
CompletableFuture<Void> future = writer.write(response);
assertThat(future.isDone(), is(false));
contentObservable.onNext(new Buffer("aaa", UTF_8));
assertThat(future.isDone(), is(false));
contentObservable.onNext(new Buffer("bbbb", UTF_8));
assertThat(future.isDone(), is(false));
contentObservable.onError(new TransportLostException(
new InetSocketAddress(getLoopbackAddress(), 5050),
newOriginBuilder("localhost", 5050).build()));
assertThat(future.isDone(), is(true));
channelRead.set(true);
}
}
);
ch.writeInbound(response(OK).body(new ByteStream(contentObservable)).build());
assertThat(LOGGER.lastMessage(), is(
loggingEvent(
Level.WARN,
"Content observable error. Written content bytes 7/7 \\(ackd/sent\\). Write events 3/3 \\(ackd/writes\\).*",
TransportLostException.class,
"Connection to origin lost. origin=\"generic-app:anonymous-origin:localhost:5050\", remoteAddress=\"localhost/127.0.0.1:5050.*")));
}
public UDPConsumingServer(
boolean enableEpoll,
int numThreads,
List<InetSocketAddress> addresses,
SimpleChannelInboundHandler<DatagramPacket> handler
) {
super(
enableEpoll,
numThreads,
addresses
);
this.handler = handler;
}
@Test
public void testSslRequest() throws InterruptedException {
CountDownLatch receivedResponse = new CountDownLatch(2);
final ConcurrentLinkedQueue<HttpObject> responses = new ConcurrentLinkedQueue<>();
ChannelHandler responseHandler =
new SimpleChannelInboundHandler<HttpObject>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) {
responses.add(msg);
receivedResponse.countDown();
}
};
ClientConfig config = ClientConfig.fromConfig("xio.h1TestClient");
XioClientBootstrap bootstrap =
new XioClientBootstrap(config)
.channelConfig(ChannelConfiguration.clientConfig(1))
.handler(responseHandler);
HttpClientBuilder builder = new HttpClientBuilder(bootstrap);
URL url = server.url("/hello-world").url();
HttpClient client = builder.endpointForUrl(url).build();
client.write(Http.get("/hello-world"));
Uninterruptibles.awaitUninterruptibly(receivedResponse); // block
// check request
RecordedRequest request1 = server.takeRequest();
assertEquals("/hello-world", request1.getPath());
// check response
assertEquals(HttpResponseStatus.OK, ((HttpResponse) responses.poll()).status());
}
private SimpleChannelInboundHandler<ChicagoMessage> newReader() {
return new SimpleChannelInboundHandler<ChicagoMessage>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, ChicagoMessage msg) throws Exception {
Promise<WriteResult> result = resultMap.get(msg.id);
if (result != null) {
System.out.println("Got result for id " + msg.id);
result.setSuccess(new WriteResult());
} else {
System.out.println("Couldn't find result for id " + msg.id);
}
}
};
}
public static ChannelFuture newBootstrapUDP(EventLoopGroup loopGroup, SimpleChannelInboundHandler<DatagramPacket> handler, int port){
return new Bootstrap().group(loopGroup)
.channel(NioDatagramChannel.class)
.option(ChannelOption.SO_BROADCAST, true)
.handler(handler)
.bind(port);
}
public static ChannelFuture newBootstrapUDP(EventLoopGroup loopGroup, SimpleChannelInboundHandler<DatagramPacket> handler, int port){
return new Bootstrap().group(loopGroup)
.channel(NioDatagramChannel.class)
.option(ChannelOption.SO_BROADCAST, true)
.handler(handler)
.bind(port);
}
public static ChannelFuture newBootstrapUDP(EventLoopGroup loopGroup, SimpleChannelInboundHandler<DatagramPacket> handler, int port){
return new Bootstrap().group(loopGroup)
.channel(NioDatagramChannel.class)
.option(ChannelOption.SO_BROADCAST, true)
.handler(handler)
.bind(port);
}
public static ChannelFuture newBootstrapUDP(EventLoopGroup loopGroup, SimpleChannelInboundHandler<DatagramPacket> handler, int port){
return new Bootstrap().group(loopGroup)
.channel(NioDatagramChannel.class)
.option(ChannelOption.SO_BROADCAST, true)
.handler(handler)
.bind(port);
}
public static ChannelFuture newBootstrapUDP(EventLoopGroup loopGroup, SimpleChannelInboundHandler<DatagramPacket> handler, int port){
return new Bootstrap().group(loopGroup)
.channel(NioDatagramChannel.class)
.option(ChannelOption.SO_BROADCAST, true)
.handler(handler)
.bind(port);
}