下面列出了怎么用io.netty.channel.ChannelOutboundHandlerAdapter的API类实例代码及写法,或者点击链接到github查看源代码。
@Test
public void testFlushOutbound() throws InterruptedException {
final CountDownLatch latch = new CountDownLatch(1);
EmbeddedChannel channel = new EmbeddedChannel(new ChannelOutboundHandlerAdapter() {
@Override
public void flush(ChannelHandlerContext ctx) throws Exception {
latch.countDown();
}
});
channel.flushOutbound();
if (!latch.await(1L, TimeUnit.SECONDS)) {
fail("Nobody called #flush() in time.");
}
}
public WriteStreamSubscriberFutureListenersTest() {
listeners = new LinkedBlockingQueue<>();
channel = new EmbeddedChannel(new ChannelOutboundHandlerAdapter() {
@Override
public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise)
throws Exception {
ChannelFutureListener listener = mock(ChannelFutureListener.class);
listeners.add(listener);
promise.addListener(listener);
super.write(ctx, msg, promise);
}
});
WriteDemandEstimator estimator = WriteDemandEstimators.newDefaultEstimator();
TestCompletableSubscriber completableSubscriber = new TestCompletableSubscriber();
subscriber = new WriteStreamSubscriber(channel, estimator, completableSubscriber,
UNSUPPORTED_PROTOCOL_CLOSE_HANDLER);
TestSubscription subscription = new TestSubscription();
subscriber.onSubscribe(subscription);
assertThat("No items requested.", subscription.requested(), greaterThan(0L));
}
@Override
protected void initChannel(Channel channel) {
// inbound
channel.pipeline().addLast("identificationHandler", new IdentificationHandler(nettyNetworking));
channel.pipeline().addLast("encryptionDecoder", new ChannelInboundHandlerAdapter());
channel.pipeline().addLast("byteStreamDecoder", new ByteStreamDecoder());
channel.pipeline().addLast("compressionDecoder", new ChannelInboundHandlerAdapter());
channel.pipeline().addLast("packetDataDecoder", new PacketDataDecoder(nettyNetworking.getProtocol()));
channel.pipeline().addLast("inboundPacketHandler", new InboundPacketHandler(nettyNetworking));
// outbound
channel.pipeline().addFirst("outboundPacketHandler", new OutboundPacketHandler(nettyNetworking));
channel.pipeline().addFirst("packetEncoder", new PacketEncoder(nettyNetworking.getProtocol()));
channel.pipeline().addFirst("compressionEncoder", new ChannelOutboundHandlerAdapter());
channel.pipeline().addFirst("byteStreamEncoder", new ByteStreamEncoder());
channel.pipeline().addFirst("encryptionEncoder", new ChannelOutboundHandlerAdapter());
}
private Future<Void> injectMqttClientPubAckBlocker(final AtomicBoolean outboundPubAckBlocked) {
// The vert.x MqttClient automatically sends a PubAck after having received a Qos 1 Publish message,
// as of now, there is no configuration option to prevent this (see https://github.com/vert-x3/vertx-mqtt/issues/120).
// Therefore the underlying NetSocket pipeline is used here to filter out the outbound PubAck messages.
try {
final Method connectionMethod = MqttClientImpl.class.getDeclaredMethod("connection");
connectionMethod.setAccessible(true);
final NetSocketInternal connection = (NetSocketInternal) connectionMethod.invoke(mqttClient);
connection.channelHandlerContext().pipeline().addBefore("handler", "OutboundPubAckBlocker",
new ChannelOutboundHandlerAdapter() {
@Override
public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise)
throws Exception {
if (outboundPubAckBlocked.get() && msg instanceof io.netty.handler.codec.mqtt.MqttPubAckMessage) {
LOGGER.debug("suppressing PubAck, message id: {}", ((MqttPubAckMessage) msg).variableHeader().messageId());
} else {
super.write(ctx, msg, promise);
}
}
});
return Future.succeededFuture();
} catch (final Exception e) {
LOGGER.error("failed to inject PubAck blocking handler");
return Future.failedFuture(new Exception("failed to inject PubAck blocking handler", e));
}
}
private void doTestSslEngineClosed(HttpClient client, AtomicInteger closeCount, Class<? extends Throwable> expectedExc, String expectedMsg) {
Mono<String> response =
client.doOnChannelInit(
(o, c, address) ->
c.pipeline()
.addFirst(new ChannelOutboundHandlerAdapter() {
@Override
public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress,
SocketAddress localAddress, ChannelPromise promise) throws Exception {
super.connect(ctx, remoteAddress, localAddress,
new TestPromise(ctx.channel(), promise, closeCount));
}
}))
.get()
.uri("/")
.responseContent()
.aggregate()
.asString();
StepVerifier.create(response)
.expectErrorMatches(t -> t.getClass().isAssignableFrom(expectedExc) && t.getMessage().startsWith(expectedMsg))
.verify(Duration.ofSeconds(30));
}
@Test
public void flushAfterCloseShouldWork() throws InterruptedException {
ByteBuf msg = Unpooled.copiedBuffer("message after handshake failed", CharsetUtil.UTF_8);
channel.write(msg);
channel.pipeline().addFirst(new ChannelOutboundHandlerAdapter() {
@Override
public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
// We have to call flush while doing a close, since close() tears down the pipeline
// immediately after.
channel.flush();
super.close(ctx, promise);
}
});
assertThat(channel.outboundMessages()).isEmpty();
channel.close().sync();
Object actual = channel.readOutbound();
assertWithMessage("pending write should be flushed on close").that(actual).isEqualTo(msg);
channel.checkException();
}
@Test(timeout = 5000)
public void test_read_subscribe_context_has_interceptors_timeouts_failure() throws Exception {
final ClientContextImpl clientContext = new ClientContextImpl(hiveMQExtensions, new ModifiableDefaultPermissionsImpl());
final List<SubscribeInboundInterceptor> isolatedInterceptors = getIsolatedInterceptor();
clientContext.addSubscribeInboundInterceptor(isolatedInterceptors.get(2));
channel.attr(ChannelAttributes.PLUGIN_CLIENT_CONTEXT).set(clientContext);
channel.attr(ChannelAttributes.MQTT_VERSION).set(ProtocolVersion.MQTTv5);
final CountDownLatch subackLatch = new CountDownLatch(1);
channel.pipeline().addFirst(new ChannelOutboundHandlerAdapter() {
@Override
public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) throws Exception {
if (msg instanceof SUBACK && ((SUBACK) msg).getReasonCodes().get(0).equals(Mqtt5SubAckReasonCode.UNSPECIFIED_ERROR)) {
subackLatch.countDown();
}
super.write(ctx, msg, promise);
}
});
when(hiveMQExtensions.getExtensionForClassloader(any(IsolatedPluginClassloader.class))).thenReturn(plugin);
channel.writeInbound(new SUBSCRIBE(1, new Topic("topic", QoS.AT_LEAST_ONCE, true, true, Mqtt5RetainHandling.SEND, 1)));
while (subackLatch.getCount() != 0) {
channel.runPendingTasks();
channel.runScheduledPendingTasks();
}
assertTrue(subackLatch.await(5, TimeUnit.SECONDS));
}
@Test(timeout = 5000)
public void test_read_subscribe_extension_null() throws Exception {
final ClientContextImpl clientContext = new ClientContextImpl(hiveMQExtensions, new ModifiableDefaultPermissionsImpl());
final List<SubscribeInboundInterceptor> isolatedInterceptors = getIsolatedInterceptor();
clientContext.addSubscribeInboundInterceptor(isolatedInterceptors.get(2));
channel.attr(ChannelAttributes.PLUGIN_CLIENT_CONTEXT).set(clientContext);
channel.attr(ChannelAttributes.MQTT_VERSION).set(ProtocolVersion.MQTTv5);
final CountDownLatch subackLatch = new CountDownLatch(1);
channel.pipeline().addFirst(new ChannelOutboundHandlerAdapter() {
@Override
public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) throws Exception {
if (msg instanceof SUBACK && ((SUBACK) msg).getReasonCodes().get(0).equals(Mqtt5SubAckReasonCode.GRANTED_QOS_1)) {
subackLatch.countDown();
}
super.write(ctx, msg, promise);
}
});
when(hiveMQExtensions.getExtensionForClassloader(any(IsolatedPluginClassloader.class))).thenReturn(null);
channel.writeInbound(new SUBSCRIBE(1, new Topic("topic", QoS.AT_LEAST_ONCE, true, true, Mqtt5RetainHandling.SEND, 1)));
while (messageAtomicReference.get() == null) {
channel.runPendingTasks();
channel.runScheduledPendingTasks();
}
final SUBSCRIBE message = (SUBSCRIBE) messageAtomicReference.get();
assertEquals("topic", message.getTopics().get(0).getTopic());
}
@Test(timeout = 5000)
public void test_read_publish_context_has_interceptors_timeouts_failure_mqtt5_success_ack() throws Exception {
final ClientContextImpl clientContext =
new ClientContextImpl(hiveMQExtensions, new ModifiableDefaultPermissionsImpl());
final List<PublishInboundInterceptor> isolatedInterceptors = getIsolatedInterceptor();
clientContext.addPublishInboundInterceptor(isolatedInterceptors.get(4));
channel.attr(ChannelAttributes.PLUGIN_CLIENT_CONTEXT).set(clientContext);
channel.attr(ChannelAttributes.MQTT_VERSION).set(ProtocolVersion.MQTTv5);
final CountDownLatch pubackLatch = new CountDownLatch(1);
channel.pipeline().addFirst(new ChannelOutboundHandlerAdapter() {
@Override
public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) throws Exception {
if (msg instanceof PUBACK) {
pubackLatch.countDown();
}
super.write(ctx, msg, promise);
}
});
when(hiveMQExtensions.getExtensionForClassloader(any(IsolatedPluginClassloader.class))).thenReturn(plugin);
channel.writeInbound(TestMessageUtil.createMqtt3Publish("topic", "payload".getBytes(), QoS.AT_LEAST_ONCE));
while (dropLatch.getCount() != 0) {
channel.runPendingTasks();
channel.runScheduledPendingTasks();
}
assertTrue(dropLatch.await(5, TimeUnit.SECONDS));
assertTrue(pubackLatch.await(5, TimeUnit.SECONDS));
}
private static ChannelInitializer trailerProtocolEndEventEmitter() {
return ch -> ch.pipeline()
.addLast(new ChannelOutboundHandlerAdapter() {
@Override
public void write(final ChannelHandlerContext ctx,
final Object msg,
final ChannelPromise promise) {
if (TRAILER.equals(msg)) {
ctx.pipeline().fireUserEventTriggered(CloseHandler.ProtocolPayloadEndEvent.OUTBOUND);
}
ctx.write(msg, promise);
}
});
}
public void setUp(Predicate<Integer> terminalPredicate) throws Exception {
channel = new EmbeddedChannel();
NettyConnection<Integer, Object> connection =
DefaultNettyConnection.<Integer, Object>initChannel(channel, DEFAULT_ALLOCATOR,
immediate(), terminalPredicate, UNSUPPORTED_PROTOCOL_CLOSE_HANDLER, defaultFlushStrategy(), null, channel ->
channel.pipeline().addLast(new ChannelOutboundHandlerAdapter() {
@Override
public void read(ChannelHandlerContext ctx) throws Exception {
readRequested = true;
super.read(ctx);
}
}), OFFLOAD_ALL_STRATEGY, mock(Protocol.class)).toFuture().get();
publisher = connection.read();
channel.config().setAutoRead(false);
}
private static Bootstrap buildBootstrap(EventLoopGroup group, Codec codec, SocketAddress collectorAddress,
@Nullable String loggerName) {
if (!(collectorAddress instanceof InetSocketAddress)) {
throw new IllegalArgumentException("collectorAddress " + collectorAddress +
" is invalid for UDP");
}
return new Bootstrap()
.group(group)
.channel(datagramChannel(group))
.option(RCVBUF_ALLOCATOR, DEFAULT_RECV_BUF_ALLOCATOR)
.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(final Channel ch) {
if (loggerName != null) {
ch.pipeline().addLast(new LoggingHandler(loggerName, LogLevel.TRACE));
}
ch.pipeline().addLast(new ChannelOutboundHandlerAdapter() {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
if (msg instanceof Span) {
byte[] bytes = codec.spanBytesEncoder().encode((Span) msg);
ByteBuf buf = ctx.alloc().buffer(bytes.length).writeBytes(bytes);
ctx.write(new DatagramPacket(buf, (InetSocketAddress) collectorAddress), promise);
} else {
ctx.write(msg, promise);
}
}
});
}
});
}
@Override
public void setEncryptionEnabled(boolean encryptionEnabled) {
if (isClosed()) return;
super.setEncryptionEnabled(encryptionEnabled);
channel.pipeline().replace("encryptionEncoder", "encryptionEncoder",
encryptionEnabled ? new EncryptionEncoder() : new ChannelOutboundHandlerAdapter());
channel.pipeline().replace("encryptionDecoder", "encryptionDecoder",
encryptionEnabled ? new EncryptionDecoder() : new ChannelInboundHandlerAdapter());
}
@Override
public void setCompressionEnabled(boolean compressionEnabled) {
if (isClosed()) return;
super.setCompressionEnabled(compressionEnabled);
channel.pipeline().replace("compressionEncoder", "compressionEncoder",
compressionEnabled ? new JdkZlibEncoder() : new ChannelOutboundHandlerAdapter());
channel.pipeline().replace("compressionDecoder", "compressionDecoder",
compressionEnabled ? new JdkZlibDecoder() : new ChannelInboundHandlerAdapter());
}
private void trackWritesForUdpClient(StatsdProtocol protocol, AtomicInteger writeCount) {
if (protocol == StatsdProtocol.UDP) {
await().until(() -> meterRegistry.client.get() != null);
((Connection) meterRegistry.client.get())
.addHandler(new LoggingHandler("udpclient", LogLevel.INFO))
.addHandler(new ChannelOutboundHandlerAdapter() {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
writeCount.incrementAndGet();
super.write(ctx, msg, promise);
}
});
}
}
@Test
public void invalid_http_call_that_causes_Netty_DecoderFailure_should_result_in_expected_400_error() throws Exception {
// given
// Normal request, but fiddle with the first chunk as it's going out to remove the HTTP version and make it an
// invalid HTTP call. This will cause Netty to mark the HttpRequest with a DecoderFailure.
NettyHttpClientRequestBuilder request = request()
.withMethod(HttpMethod.GET)
.withUri(BasicEndpoint.MATCHING_PATH)
.withPipelineAdjuster(
p -> p.addFirst(new ChannelOutboundHandlerAdapter() {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
String msgAsString = ((ByteBuf)msg).toString(CharsetUtil.UTF_8);
if (msgAsString.contains("HTTP/1.1")) {
msg = Unpooled.copiedBuffer(msgAsString.replace("HTTP/1.1", ""), CharsetUtil.UTF_8);
}
super.write(ctx, msg, promise);
}
})
);
// when
NettyHttpClientResponse response = request.execute(downstreamServerConfig.endpointsPort(), 3000);
// then
verifyErrorReceived(response.payload,
response.statusCode,
new ApiErrorWithMetadata(SampleCoreApiError.MALFORMED_REQUEST,
Pair.of("cause", "Invalid HTTP request"))
);
}
public XmppChannelHandler(XmppDeviceFactory deviceFactory) {
ChannelInboundHandlerAdapter inboundHandlerAdapter = new ChannelInboundHandlerAdapter();
ChannelOutboundHandlerAdapter outboundHandlerAdapter = new ChannelOutboundHandlerAdapter();
this.init(inboundHandlerAdapter, outboundHandlerAdapter);
this.state = ChannelState.IDLE;
this.deviceFactory = deviceFactory;
}
@Test
public void sslEarlyHandshakeFailure() throws Exception {
EmbeddedChannel clientChannel = new EmbeddedChannel();
SSLEngine clientEngine = SslContextBuilder.forClient().build().newEngine(clientChannel.alloc());
clientChannel.pipeline().addLast(new SslHandler(clientEngine));
EmbeddedChannel serverChannel = new EmbeddedChannel();
SelfSignedCertificate cert = new SelfSignedCertificate("localhorse");
SSLEngine serverEngine = SslContextBuilder.forServer(cert.key(), cert.cert()).build()
.newEngine(serverChannel.alloc());
serverChannel.pipeline().addLast(new ChannelOutboundHandlerAdapter() {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
// Simulate an early closure form the client.
ReferenceCountUtil.safeRelease(msg);
promise.setFailure(new ClosedChannelException());
}
});
serverChannel.pipeline().addLast(new SslHandler(serverEngine));
serverChannel.pipeline().addLast(new SslHandshakeInfoHandler());
Object clientHello = clientChannel.readOutbound();
assertNotNull(clientHello);
ReferenceCountUtil.retain(clientHello);
serverChannel.writeInbound(clientHello);
// Assert that the handler removes itself from the pipeline, since it was torn down.
assertNull(serverChannel.pipeline().context(SslHandshakeInfoHandler.class));
}
@Test(timeout = 5000)
public void test_read_subscribe_context_has_interceptors_throws_exception() throws Exception {
final ClientContextImpl clientContext = new ClientContextImpl(hiveMQExtensions, new ModifiableDefaultPermissionsImpl());
final List<SubscribeInboundInterceptor> isolatedInterceptors = getIsolatedInterceptor();
clientContext.addSubscribeInboundInterceptor(isolatedInterceptors.get(1));
channel.attr(ChannelAttributes.PLUGIN_CLIENT_CONTEXT).set(clientContext);
channel.attr(ChannelAttributes.MQTT_VERSION).set(ProtocolVersion.MQTTv5);
final CountDownLatch subackLatch = new CountDownLatch(1);
channel.pipeline().addFirst(new ChannelOutboundHandlerAdapter() {
@Override
public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) throws Exception {
if (msg instanceof SUBACK && ((SUBACK) msg).getReasonCodes().get(0).equals(Mqtt5SubAckReasonCode.UNSPECIFIED_ERROR)) {
subackLatch.countDown();
}
super.write(ctx, msg, promise);
}
});
when(hiveMQExtensions.getExtensionForClassloader(any(IsolatedPluginClassloader.class))).thenReturn(plugin);
channel.writeInbound(new SUBSCRIBE(1, new Topic("topic", QoS.AT_LEAST_ONCE, true, true, Mqtt5RetainHandling.SEND, 1)));
while (subackLatch.getCount() != 0) {
channel.runPendingTasks();
channel.runScheduledPendingTasks();
}
assertTrue(subackLatch.await(5, TimeUnit.SECONDS));
}
@Test(timeout = 5000)
public void test_read_subscribe_context_has_interceptors_timeouts_failure_mqtt3() throws Exception {
final ClientContextImpl clientContext = new ClientContextImpl(hiveMQExtensions, new ModifiableDefaultPermissionsImpl());
final List<SubscribeInboundInterceptor> isolatedInterceptors = getIsolatedInterceptor();
clientContext.addSubscribeInboundInterceptor(isolatedInterceptors.get(2));
channel.attr(ChannelAttributes.PLUGIN_CLIENT_CONTEXT).set(clientContext);
channel.attr(ChannelAttributes.MQTT_VERSION).set(ProtocolVersion.MQTTv3_1_1);
final CountDownLatch subackLatch = new CountDownLatch(1);
channel.pipeline().addFirst(new ChannelOutboundHandlerAdapter() {
@Override
public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) throws Exception {
if (msg instanceof SUBACK && ((SUBACK) msg).getReasonCodes().get(0).equals(Mqtt5SubAckReasonCode.UNSPECIFIED_ERROR)) {
subackLatch.countDown();
}
super.write(ctx, msg, promise);
}
});
when(hiveMQExtensions.getExtensionForClassloader(any(IsolatedPluginClassloader.class))).thenReturn(plugin);
channel.writeInbound(new SUBSCRIBE(1, new Topic("topic", QoS.AT_LEAST_ONCE, true, true, Mqtt5RetainHandling.SEND, 1)));
while (subackLatch.getCount() != 0) {
channel.runPendingTasks();
channel.runScheduledPendingTasks();
}
assertTrue(subackLatch.await(5, TimeUnit.SECONDS));
}
@Test(timeout = 5000)
public void test_read_publish_context_has_interceptors_timeouts_failure_mqtt3_success_ack() throws Exception {
final ClientContextImpl clientContext =
new ClientContextImpl(hiveMQExtensions, new ModifiableDefaultPermissionsImpl());
final List<PublishInboundInterceptor> isolatedInterceptors = getIsolatedInterceptor();
clientContext.addPublishInboundInterceptor(isolatedInterceptors.get(4));
channel.attr(ChannelAttributes.PLUGIN_CLIENT_CONTEXT).set(clientContext);
channel.attr(ChannelAttributes.MQTT_VERSION).set(ProtocolVersion.MQTTv3_1_1);
final CountDownLatch pubackLatch = new CountDownLatch(1);
channel.pipeline().addFirst(new ChannelOutboundHandlerAdapter() {
@Override
public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) throws Exception {
if (msg instanceof PUBACK) {
pubackLatch.countDown();
}
super.write(ctx, msg, promise);
}
});
when(hiveMQExtensions.getExtensionForClassloader(any(IsolatedPluginClassloader.class))).thenReturn(plugin);
channel.writeInbound(TestMessageUtil.createMqtt3Publish("topic", "payload".getBytes(), QoS.AT_LEAST_ONCE));
while (dropLatch.getCount() != 0) {
channel.runPendingTasks();
channel.runScheduledPendingTasks();
}
assertTrue(dropLatch.await(5, TimeUnit.SECONDS));
assertTrue(pubackLatch.await(5, TimeUnit.SECONDS));
}