下面列出了 io.netty.channel.ChannelPipeline#addAfter() 的实例代码,或者点击链接到 GitHub 查看源代码,也可以在右侧发表评论。
/**
 * Populates the pipeline of the given channel with the MQTT handlers.
 *
 * <p>The idle-state handler is placed at the head with the timeout handler directly
 * behind it. The optional auto-flush handler, decoder, encoder, message logger,
 * optional metrics handler and the server handler are then appended in that order.
 *
 * @param channel the channel whose pipeline is populated
 * @throws Exception if any handler cannot be created or added
 */
@Override
public void addPipeline(Channel channel) throws Exception {
    final ChannelPipeline handlers = channel.pipeline();

    // Idle detection goes first; the handler reacting to idle events sits right after it.
    handlers.addFirst("idleStateHandler", new IdleStateHandler(nettyReaderIdleTimeSeconds, 0, 0));
    handlers.addAfter("idleStateHandler", "idleEventHandler", timeoutHandler);

    // Auto-flush is only installed when a positive idle time is configured.
    if (autoFlushIdleTime > 0) {
        handlers.addLast("autoflush", new MqttAutoFlushChannelHandler(autoFlushIdleTime, TimeUnit.SECONDS));
    }

    handlers.addLast("decoder", new MqttDecoder(messageMaxLength));
    handlers.addLast("encoder", MqttEncoder.INSTANCE);
    handlers.addLast("messageLogger", mqttMessageLoggerChannelHandler);

    if (isEnableMetrics()) {
        // The metrics handler is created and initialised on first use, then reused.
        if (mqttDropWizardMetricsChannelHandler == null) {
            mqttDropWizardMetricsChannelHandler = new MqttDropWizardMetricsChannelHandler();
            mqttDropWizardMetricsChannelHandler.init(metricsLibratoEmail, metricsLibratoToken, metricsLibratoSource);
        }
        handlers.addLast("wizardMetrics", mqttDropWizardMetricsChannelHandler);
    }

    handlers.addLast("handler", mqttServerChannelHandler);
}
/**
 * Ensures an HTTP basic-auth handler is positioned after the authenticator before a
 * non-WebSocket HTTP message is passed on; everything else is delegated to the parent.
 *
 * @param ctx the context of this handler
 * @param obj the inbound message
 * @throws Exception if the parent implementation fails
 */
@Override
public void channelRead(final ChannelHandlerContext ctx, final Object obj)
        throws Exception {
    final boolean plainHttp = obj instanceof HttpMessage && !isWebSocket((HttpMessage) obj);
    if (!plainHttp) {
        super.channelRead(ctx, obj);
        return;
    }

    final ChannelPipeline handlers = ctx.pipeline();
    // Reuse an already installed auth handler (removing it first so it can be
    // re-inserted at the right position); otherwise create a new one.
    final ChannelHandler httpAuth = handlers.get(HTTP_AUTH) != null
            ? handlers.remove(HTTP_AUTH)
            : new HttpBasicAuthHandler(this.authenticator, this.authenticationSettings);
    handlers.addAfter(AUTHENTICATOR, HTTP_AUTH, httpAuth);
    ctx.fireChannelRead(obj);
}
/**
 * Prepares the connection for the given protocol version, builds the matching pipeline
 * and then replays the buffered data from the head of the pipeline.
 *
 * @param channel the channel being configured
 * @param version the protocol version to set up
 */
protected void setProtocol(Channel channel, ProtocolVersion version) {
    final ConnectionImpl conn = prepare(channel, version);
    final IPipeLineBuilder pipelineBuilder = InitialPacketDecoder.BUILDERS.get(conn.getVersion());
    pipelineBuilder.buildBungeeClientCodec(channel, conn);

    if (encapsulatedinfo != null) {
        final ChannelPipeline handlers = channel.pipeline();
        handlers.replace(PipelineUtils.FRAME_DECODER, PipelineUtils.FRAME_DECODER, new VarIntFrameDecoder());
        if (encapsulatedinfo.hasCompression()) {
            handlers.addAfter(PipelineUtils.FRAME_DECODER, "decompress", new PacketDecompressor());
            handlers.addAfter(PipelineUtils.FRAME_PREPENDER, "compress", new PacketCompressor(256));
        }
        // The encapsulated address is only applied when the raw peer address is loopback.
        if (encapsulatedinfo.getAddress() != null) {
            if (conn.getRawAddress().getAddress().isLoopbackAddress()) {
                conn.changeAddress(encapsulatedinfo.getAddress());
            }
        }
    } else {
        pipelineBuilder.buildBungeeClientPipeLine(channel, conn);
    }

    // Rewind the buffer and feed it through the pipeline from the first handler.
    buffer.readerIndex(0);
    channel.pipeline().firstContext().fireChannelRead(buffer.unwrap());
}
/**
 * Creates the packet decoder and encoder for the connection, inserts them after the
 * raw capture handlers and registers them with the connection.
 *
 * @param channel the channel whose pipeline receives the codec handlers
 * @param connection the connection the codec is built for
 */
@Override
public void buildCodec(Channel channel, ConnectionImpl connection) {
    final PacketDecoder packetDecoder = new PacketDecoder(connection);
    final PacketEncoder packetEncoder = new PacketEncoder(connection);
    channel.pipeline()
            .addAfter(ChannelHandlers.RAW_CAPTURE_RECEIVE, ChannelHandlers.DECODER_TRANSFORMER, packetDecoder)
            .addAfter(ChannelHandlers.RAW_CAPTURE_SEND, ChannelHandlers.ENCODER_TRANSFORMER, packetEncoder);
    connection.initCodec(PacketCodec.instance, packetEncoder, packetDecoder);
}
/**
 * Creates the packet decoder and encoder for the connection, inserts them after the
 * raw capture handlers and registers them with the connection.
 *
 * @param channel the channel whose pipeline receives the codec handlers
 * @param connection the connection the codec is built for
 */
@Override
public void buildCodec(Channel channel, ConnectionImpl connection) {
    final PacketDecoder packetDecoder = new PacketDecoder(connection);
    final PacketEncoder packetEncoder = new PacketEncoder(connection);
    channel.pipeline()
            .addAfter(ChannelHandlers.RAW_CAPTURE_RECEIVE, ChannelHandlers.DECODER_TRANSFORMER, packetDecoder)
            .addAfter(ChannelHandlers.RAW_CAPTURE_SEND, ChannelHandlers.ENCODER_TRANSFORMER, packetEncoder);
    connection.initCodec(PacketCodec.instance, packetEncoder, packetDecoder);
}
/**
 * Removes the origin response receiver from the connection's pipeline and installs an
 * idle-state handler after the passport outbound handler.
 *
 * @param conn the pooled connection whose pipeline is adjusted
 */
protected void releaseHandlers(PooledConnection conn) {
    final ChannelPipeline handlers = conn.getChannel().pipeline();
    removeHandlerFromPipeline(OriginResponseReceiver.CHANNEL_HANDLER_NAME, handlers);

    // The Outbound handler is always after the inbound handler, so anchor on it.
    final String anchorName =
            handlers.context(PassportStateHttpClientHandler.OutboundHandler.class).name();
    final IdleStateHandler idleWatcher =
            new IdleStateHandler(0, 0, connPoolConfig.getIdleTimeout(), TimeUnit.MILLISECONDS);
    handlers.addAfter(anchorName, IDLE_STATE_HANDLER_NAME, idleWatcher);
}
/**
 * Handles a WebSocket upgrade request so that RPC can subsequently happen over WebSocket.
 *
 * <p>Performs the handshake and, unless a {@code WebsocketFrameHandler} is already
 * installed, adds one after this handler, removes any {@code AwsProxyProtocolDecoder}
 * and finally removes this handler from the pipeline.
 *
 * @param ctx the context of the current handler
 * @param request the HTTP request asking for the upgrade
 */
protected void handleWebSocket(final ChannelHandlerContext ctx, FullHttpRequest request) {
    if (logger.isDebugEnabled()) {
        logger.debug("handleWebSocket request: uri={}", request.uri());
    }

    // Handshake
    final WebSocketServerHandshaker handshaker =
            new WebSocketServerHandshakerFactory(request.uri(), null, true).newHandshaker(request);
    if (handshaker == null) {
        // No handshaker is available for the requested WebSocket version.
        WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
        return;
    }

    final ChannelFutureListener onHandshake = websocketHandshakeListener(ctx, request);
    final ChannelFuture handshakeFuture = handshaker.handshake(ctx.channel(), request);
    if (onHandshake != null) {
        handshakeFuture.addListener(onHandshake);
    }

    final ChannelPipeline handlers = ctx.pipeline();
    if (handlers.get(WebsocketFrameHandler.class) != null) {
        // A frame handler is already installed; leave the pipeline untouched.
        return;
    }

    handlers.addAfter(ctx.name(), "wsFrameHandler", new WebsocketFrameHandler(handshaker));
    final ChannelHandler awsProxyDecoder = handlers.get(AwsProxyProtocolDecoder.class);
    if (awsProxyDecoder != null) {
        handlers.remove(awsProxyDecoder);
    }
    // Remove the current handler.
    handlers.remove(ctx.name());
}
/**
 * Configures the given pipeline for HTTP/1.1 client traffic.
 *
 * <p>Adds the HTTP client codec before the reactive bridge, optionally a content
 * decompressor after the codec, and optionally a client metrics handler before the
 * reactive bridge.
 *
 * @param p the pipeline to configure
 * @param acceptGzip whether a content decompressor should be installed
 * @param decoder the response decoder settings used to build the codec
 * @param metricsRecorder supplier of the metrics recorder, or {@code null} for no metrics
 * @param uriTagValue function passed to the metrics handler, may be {@code null}
 */
static void configureHttp11Pipeline(ChannelPipeline p,
        boolean acceptGzip,
        HttpResponseDecoderSpec decoder,
        @Nullable Supplier<? extends ChannelMetricsRecorder> metricsRecorder,
        @Nullable Function<String, String> uriTagValue) {
    final HttpClientCodec codec = new HttpClientCodec(
            decoder.maxInitialLineLength(),
            decoder.maxHeaderSize(),
            decoder.maxChunkSize(),
            decoder.failOnMissingResponse,
            decoder.validateHeaders(),
            decoder.initialBufferSize(),
            decoder.parseHttpAfterConnectRequest);
    p.addBefore(NettyPipeline.ReactiveBridge, NettyPipeline.HttpCodec, codec);

    if (acceptGzip) {
        p.addAfter(NettyPipeline.HttpCodec, NettyPipeline.HttpDecompressor, new HttpContentDecompressor());
    }

    if (metricsRecorder == null) {
        return;
    }
    final ChannelMetricsRecorder recorder = metricsRecorder.get();
    // Only HTTP-client-aware recorders get a dedicated metrics handler.
    if (recorder instanceof HttpClientMetricsRecorder) {
        p.addBefore(NettyPipeline.ReactiveBridge,
                NettyPipeline.HttpMetricsHandler,
                new HttpClientMetricsHandler((HttpClientMetricsRecorder) recorder, uriTagValue));
    }
}
/**
 * Creates the packet decoder and encoder for the connection, inserts them after the
 * raw capture handlers and registers them with the connection.
 *
 * @param channel the channel whose pipeline receives the codec handlers
 * @param connection the connection the codec is built for
 */
@Override
public void buildCodec(Channel channel, ConnectionImpl connection) {
    final PacketDecoder packetDecoder = new PacketDecoder(connection);
    final PacketEncoder packetEncoder = new PacketEncoder(connection);
    channel.pipeline()
            .addAfter(ChannelHandlers.RAW_CAPTURE_RECEIVE, ChannelHandlers.DECODER_TRANSFORMER, packetDecoder)
            .addAfter(ChannelHandlers.RAW_CAPTURE_SEND, ChannelHandlers.ENCODER_TRANSFORMER, packetEncoder);
    connection.initCodec(PacketCodec.instance, packetEncoder, packetDecoder);
}
/**
 * Creates the packet decoder and encoder for the connection, inserts them after the
 * raw capture handlers and registers them with the connection.
 *
 * @param channel the channel whose pipeline receives the codec handlers
 * @param connection the connection the codec is built for
 */
@Override
public void buildCodec(Channel channel, ConnectionImpl connection) {
    final PacketDecoder packetDecoder = new PacketDecoder(connection);
    final PacketEncoder packetEncoder = new PacketEncoder(connection);
    channel.pipeline()
            .addAfter(ChannelHandlers.RAW_CAPTURE_RECEIVE, ChannelHandlers.DECODER_TRANSFORMER, packetDecoder)
            .addAfter(ChannelHandlers.RAW_CAPTURE_SEND, ChannelHandlers.ENCODER_TRANSFORMER, packetEncoder);
    connection.initCodec(PacketCodec.instance, packetEncoder, packetDecoder);
}
/**
 * Creates the packet decoder and encoder for the connection, inserts them after the
 * raw capture handlers and registers them with the connection.
 *
 * @param channel the channel whose pipeline receives the codec handlers
 * @param connection the connection the codec is built for
 */
@Override
public void buildCodec(Channel channel, ConnectionImpl connection) {
    final PacketDecoder packetDecoder = new PacketDecoder(connection);
    final PacketEncoder packetEncoder = new PacketEncoder(connection);
    channel.pipeline()
            .addAfter(ChannelHandlers.RAW_CAPTURE_RECEIVE, ChannelHandlers.DECODER_TRANSFORMER, packetDecoder)
            .addAfter(ChannelHandlers.RAW_CAPTURE_SEND, ChannelHandlers.ENCODER_TRANSFORMER, packetEncoder);
    connection.initCodec(PacketCodec.instance, packetEncoder, packetDecoder);
}
/**
 * Creates the packet decoder and encoder for the connection, inserts them after the
 * raw capture handlers and registers them with the connection.
 *
 * @param channel the channel whose pipeline receives the codec handlers
 * @param connection the connection the codec is built for
 */
@Override
public void buildCodec(Channel channel, ConnectionImpl connection) {
    final PacketDecoder packetDecoder = new PacketDecoder(connection);
    final PacketEncoder packetEncoder = new PacketEncoder(connection);
    channel.pipeline()
            .addAfter(ChannelHandlers.RAW_CAPTURE_RECEIVE, ChannelHandlers.DECODER_TRANSFORMER, packetDecoder)
            .addAfter(ChannelHandlers.RAW_CAPTURE_SEND, ChannelHandlers.ENCODER_TRANSFORMER, packetEncoder);
    connection.initCodec(PacketCodec.instance, packetEncoder, packetDecoder);
}
/**
 * Creates the packet decoder and encoder for the connection, inserts them after the
 * raw capture handlers and registers them with the connection.
 *
 * @param channel the channel whose pipeline receives the codec handlers
 * @param connection the connection the codec is built for
 */
@Override
public void buildCodec(Channel channel, ConnectionImpl connection) {
    final PacketDecoder packetDecoder = new PacketDecoder(connection);
    final PacketEncoder packetEncoder = new PacketEncoder(connection);
    channel.pipeline()
            .addAfter(ChannelHandlers.RAW_CAPTURE_RECEIVE, ChannelHandlers.DECODER_TRANSFORMER, packetDecoder)
            .addAfter(ChannelHandlers.RAW_CAPTURE_SEND, ChannelHandlers.ENCODER_TRANSFORMER, packetEncoder);
    connection.initCodec(PacketCodec.instance, packetEncoder, packetDecoder);
}
/**
 * Creates the packet decoder and encoder for the connection, inserts them after the
 * raw capture handlers and registers them with the connection.
 *
 * @param channel the channel whose pipeline receives the codec handlers
 * @param connection the connection the codec is built for
 */
@Override
public void buildCodec(Channel channel, ConnectionImpl connection) {
    final PacketDecoder packetDecoder = new PacketDecoder(connection);
    final PacketEncoder packetEncoder = new PacketEncoder(connection);
    channel.pipeline()
            .addAfter(ChannelHandlers.RAW_CAPTURE_RECEIVE, ChannelHandlers.DECODER_TRANSFORMER, packetDecoder)
            .addAfter(ChannelHandlers.RAW_CAPTURE_SEND, ChannelHandlers.ENCODER_TRANSFORMER, packetEncoder);
    connection.initCodec(PacketCodec.instance, packetEncoder, packetDecoder);
}
/**
 * Creates the packet decoder and encoder for the connection, inserts them after the
 * raw capture handlers and registers them with the connection.
 *
 * @param channel the channel whose pipeline receives the codec handlers
 * @param connection the connection the codec is built for
 */
@Override
public void buildCodec(Channel channel, ConnectionImpl connection) {
    final PacketDecoder packetDecoder = new PacketDecoder(connection);
    final PacketEncoder packetEncoder = new PacketEncoder(connection);
    channel.pipeline()
            .addAfter(ChannelHandlers.RAW_CAPTURE_RECEIVE, ChannelHandlers.DECODER_TRANSFORMER, packetDecoder)
            .addAfter(ChannelHandlers.RAW_CAPTURE_SEND, ChannelHandlers.ENCODER_TRANSFORMER, packetEncoder);
    connection.initCodec(PacketCodec.instance, packetEncoder, packetDecoder);
}
/**
 * Creates an SSL handler for the channel and installs it, together with an SSL read
 * handler, into the channel's pipeline.
 *
 * <p>When the remote address is an {@link InetSocketAddress}, its host string and port
 * are passed to the SSL context when creating the handler. The SSL handler goes after
 * the proxy handler if one is present, otherwise at the head of the pipeline.
 *
 * @param channel the channel to secure
 * @param remoteAddress the remote peer address, may be {@code null}
 * @param sslDebug whether to add an SSL logging handler; only honoured when a logging
 *     handler is already present in the pipeline
 */
public void addSslHandler(Channel channel, @Nullable SocketAddress remoteAddress, boolean sslDebug) {
    final SslHandler tlsHandler;
    if (!(remoteAddress instanceof InetSocketAddress)) {
        tlsHandler = getSslContext().newHandler(channel.alloc());
        if (log.isDebugEnabled()) {
            log.debug(format(channel, "SSL enabled using engine {}"),
                    tlsHandler.engine().getClass().getSimpleName());
        }
    }
    else {
        final InetSocketAddress peer = (InetSocketAddress) remoteAddress;
        tlsHandler = getSslContext().newHandler(channel.alloc(), peer.getHostString(), peer.getPort());
        if (log.isDebugEnabled()) {
            log.debug(format(channel, "SSL enabled using engine {} and SNI {}"),
                    tlsHandler.engine().getClass().getSimpleName(), peer);
        }
    }

    configure(tlsHandler);

    final ChannelPipeline handlers = channel.pipeline();
    if (handlers.get(NettyPipeline.ProxyHandler) == null) {
        handlers.addFirst(NettyPipeline.SslHandler, tlsHandler);
    }
    else {
        handlers.addAfter(NettyPipeline.ProxyHandler, NettyPipeline.SslHandler, tlsHandler);
    }

    if (handlers.get(NettyPipeline.LoggingHandler) == null) {
        // Without a logging handler the reader goes directly behind the SSL handler.
        handlers.addAfter(NettyPipeline.SslHandler, NettyPipeline.SslReader, new SslReadHandler());
        return;
    }

    handlers.addAfter(NettyPipeline.LoggingHandler, NettyPipeline.SslReader, new SslReadHandler());
    if (sslDebug) {
        handlers.addBefore(NettyPipeline.SslHandler,
                NettyPipeline.SslLoggingHandler,
                new LoggingHandler("reactor.netty.tcp.ssl"));
    }
}
/**
 * Configures the given pipeline for a cleartext client connection that starts as
 * HTTP/1.1 and may be upgraded to HTTP/2 (h2c).
 *
 * <p>The HTTP client codec, the h2c upgrade handler and the HTTP traffic handler are
 * added before the reactive bridge. A content decompressor is optionally added after
 * the codec, and a client metrics handler is optionally added before the reactive
 * bridge.
 *
 * <p>The codec is registered under {@code NettyPipeline.HttpCodec}. It used to be added
 * with a {@code null} name, so Netty generated a name for it. The decompressor is
 * anchored on {@code NettyPipeline.HttpCodec}, so that insertion could not find its base
 * handler and failed with {@code NoSuchElementException} whenever {@code acceptGzip}
 * was {@code true}.
 *
 * @param p the pipeline to configure
 * @param acceptGzip whether a content decompressor should be installed
 * @param decoder the response decoder settings used for the HTTP/1.1 codec, header
 *     validation and the h2c maximum content length
 * @param http2Settings the initial HTTP/2 settings
 * @param metricsRecorder supplier of the metrics recorder, or {@code null} for no metrics
 * @param observer the connection observer handed to the HTTP traffic handler
 * @param opsFactory the operations factory handed to the h2c codec
 * @param uriTagValue function passed to the metrics handler, may be {@code null}
 */
static void configureHttp11OrH2CleartextPipeline(
        ChannelPipeline p,
        boolean acceptGzip,
        HttpResponseDecoderSpec decoder,
        Http2Settings http2Settings,
        @Nullable Supplier<? extends ChannelMetricsRecorder> metricsRecorder,
        ConnectionObserver observer,
        ChannelOperations.OnSetup opsFactory,
        @Nullable Function<String, String> uriTagValue) {
    HttpClientCodec httpClientCodec =
            new HttpClientCodec(
                    decoder.maxInitialLineLength(),
                    decoder.maxHeaderSize(),
                    decoder.maxChunkSize(),
                    decoder.failOnMissingResponse,
                    decoder.validateHeaders(),
                    decoder.initialBufferSize(),
                    decoder.parseHttpAfterConnectRequest);

    Http2FrameCodecBuilder http2FrameCodecBuilder =
            Http2FrameCodecBuilder.forClient()
                    .validateHeaders(decoder.validateHeaders())
                    .initialSettings(http2Settings);

    // HTTP/2 frame logging is only enabled when a logging handler is already installed.
    if (p.get(NettyPipeline.LoggingHandler) != null) {
        http2FrameCodecBuilder.frameLogger(new Http2FrameLogger(LogLevel.DEBUG,
                "reactor.netty.http.client.h2"));
    }

    Http2FrameCodec http2FrameCodec = http2FrameCodecBuilder.build();

    Http2ClientUpgradeCodec upgradeCodec =
            new Http2ClientUpgradeCodec(http2FrameCodec, new H2CleartextCodec(http2FrameCodec, opsFactory));

    HttpClientUpgradeHandler upgradeHandler =
            new HttpClientUpgradeHandler(httpClientCodec, upgradeCodec, decoder.h2cMaxContentLength());

    // Register the codec under its well-known name so later insertions can anchor on it.
    p.addBefore(NettyPipeline.ReactiveBridge, NettyPipeline.HttpCodec, httpClientCodec)
     .addBefore(NettyPipeline.ReactiveBridge, NettyPipeline.H2CUpgradeHandler, upgradeHandler)
     .addBefore(NettyPipeline.ReactiveBridge, NettyPipeline.HttpTrafficHandler, new HttpTrafficHandler(observer));

    if (acceptGzip) {
        p.addAfter(NettyPipeline.HttpCodec, NettyPipeline.HttpDecompressor, new HttpContentDecompressor());
    }

    if (metricsRecorder != null) {
        ChannelMetricsRecorder channelMetricsRecorder = metricsRecorder.get();
        // Only HTTP-client-aware recorders get a dedicated metrics handler.
        if (channelMetricsRecorder instanceof HttpClientMetricsRecorder) {
            p.addBefore(NettyPipeline.ReactiveBridge,
                    NettyPipeline.HttpMetricsHandler,
                    new HttpClientMetricsHandler((HttpClientMetricsRecorder) channelMetricsRecorder, uriTagValue));
        }
    }
}
/**
 * Configures the given pipeline for HTTP/1.1 server traffic.
 *
 * <p>Adds the HTTP server codec and the HTTP traffic handler before the reactive
 * bridge, then optionally an access-log handler, a compression handler and a server
 * metrics handler.
 *
 * @param p the pipeline to configure
 * @param compressPredicate predicate handed to the traffic handler, may be {@code null}
 * @param cookieDecoder the cookie decoder handed to the traffic handler
 * @param cookieEncoder the cookie encoder handed to the traffic handler
 * @param decoder the request decoder settings used to build the codec
 * @param forwarded flag handed to the traffic handler
 * @param listener the connection observer handed to the traffic handler
 * @param metricsRecorder supplier of the metrics recorder, or {@code null} for no metrics
 * @param minCompressionSize minimum compression size; {@code 0} together with a
 *     {@code null} predicate installs the compression handler
 * @param uriTagValue function passed to the metrics handler, may be {@code null}
 */
static void configureHttp11Pipeline(ChannelPipeline p,
        @Nullable BiPredicate<HttpServerRequest, HttpServerResponse> compressPredicate,
        ServerCookieDecoder cookieDecoder,
        ServerCookieEncoder cookieEncoder,
        HttpRequestDecoderSpec decoder,
        boolean forwarded,
        ConnectionObserver listener,
        @Nullable Supplier<? extends ChannelMetricsRecorder> metricsRecorder,
        int minCompressionSize,
        @Nullable Function<String, String> uriTagValue) {
    final HttpServerCodec codec = new HttpServerCodec(
            decoder.maxInitialLineLength(),
            decoder.maxHeaderSize(),
            decoder.maxChunkSize(),
            decoder.validateHeaders(),
            decoder.initialBufferSize());
    p.addBefore(NettyPipeline.ReactiveBridge, NettyPipeline.HttpCodec, codec);

    final HttpTrafficHandler traffic =
            new HttpTrafficHandler(listener, forwarded, compressPredicate, cookieEncoder, cookieDecoder);
    p.addBefore(NettyPipeline.ReactiveBridge, NettyPipeline.HttpTrafficHandler, traffic);

    if (ACCESS_LOG) {
        p.addAfter(NettyPipeline.HttpCodec, NettyPipeline.AccessLogHandler, new AccessLogHandler());
    }

    // Unconditional compression: no predicate and no minimum size.
    if (compressPredicate == null && minCompressionSize == 0) {
        p.addBefore(NettyPipeline.HttpTrafficHandler, NettyPipeline.CompressionHandler, new SimpleCompressionHandler());
    }

    if (metricsRecorder == null) {
        return;
    }
    final ChannelMetricsRecorder recorder = metricsRecorder.get();
    if (!(recorder instanceof HttpServerMetricsRecorder)) {
        return;
    }
    p.addAfter(NettyPipeline.HttpTrafficHandler, NettyPipeline.HttpMetricsHandler,
            new HttpServerMetricsHandler((HttpServerMetricsRecorder) recorder, uriTagValue));
    if (recorder instanceof MicrometerHttpServerMetricsRecorder) {
        // MicrometerHttpServerMetricsRecorder does not implement metrics on protocol level
        // ChannelMetricsHandler will be removed from the pipeline
        p.remove(NettyPipeline.ChannelMetricsHandler);
    }
}
/**
 * Lets the handler configure the pipeline and then registers the handler itself after
 * the handler named by {@code PIPELINE_HTTP_REQUEST_DECODER}.
 *
 * @param pipeline the pipeline to configure
 */
@Override
public void configure(final ChannelPipeline pipeline) {
// The handler sets up the pipeline first ...
handler.configure(pipeline);
// ... and is then inserted under the name "WsAndHttpChannelizerHandler".
pipeline.addAfter(PIPELINE_HTTP_REQUEST_DECODER, "WsAndHttpChannelizerHandler", handler);
}
/**
 * Adds all the security handlers that are required once negotiation for encryption has
 * completed.
 * <p>Handlers before encryption is negotiated are:</p>
 * <ul>
 *  <li> PROTOCOL_DECODER {@link ProtobufLengthDecoder} </li>
 *  <li> MESSAGE_DECODER {@link RpcDecoder} </li>
 *  <li> PROTOCOL_ENCODER {@link RpcEncoder} </li>
 *  <li> HANDSHAKE_HANDLER {@link org.apache.drill.exec.rpc.BasicClient.ClientHandshakeHandler} OR
 *                         {@link org.apache.drill.exec.rpc.BasicServer.ServerHandshakeHandler} </li>
 *  <li> optional - IDLE_STATE_HANDLER {@link org.apache.drill.exec.rpc.BasicClient.IdlePingHandler} OR
 *                - TIMEOUT_HANDLER {@link org.apache.drill.exec.rpc.BasicServer.LoggingReadTimeoutHandler} </li>
 *  <li> MESSAGE_HANDLER {@link org.apache.drill.exec.rpc.RpcBus.InboundHandler} </li>
 *  <li> EXCEPTION_HANDLER {@link RpcExceptionHandler} </li>
 * </ul>
 * <p>Handlers after encryption is negotiated are:</p>
 * <ul>
 *  <li> LENGTH_DECODER_HANDLER {@link LengthFieldBasedFrameDecoder} </li>
 *  <li> SASL_DECRYPTION_HANDLER {@link SaslDecryptionHandler} </li>
 *  <li> PROTOCOL_DECODER {@link ProtobufLengthDecoder} </li>
 *  <li> MESSAGE_DECODER {@link RpcDecoder} </li>
 *  <li> SASL_ENCRYPTION_HANDLER {@link SaslEncryptionHandler} </li>
 *  <li> CHUNK_CREATION_HANDLER {@link ChunkCreationHandler} </li>
 *  <li> PROTOCOL_ENCODER {@link RpcEncoder} </li>
 *  <li> HANDSHAKE_HANDLER {@link org.apache.drill.exec.rpc.BasicClient.ClientHandshakeHandler} OR
 *                         {@link org.apache.drill.exec.rpc.BasicServer.ServerHandshakeHandler} </li>
 *  <li> optional - IDLE_STATE_HANDLER {@link org.apache.drill.exec.rpc.BasicClient.IdlePingHandler} OR
 *                - TIMEOUT_HANDLER {@link org.apache.drill.exec.rpc.BasicServer.LoggingReadTimeoutHandler} </li>
 *  <li> MESSAGE_HANDLER {@link org.apache.drill.exec.rpc.RpcBus.InboundHandler} </li>
 *  <li> EXCEPTION_HANDLER {@link RpcExceptionHandler} </li>
 * </ul>
 * <p>
 *  If encryption is enabled, ChunkCreationHandler is always added to divide the Rpc message into chunks of
 *  negotiated {@link EncryptionContextImpl#wrapSizeLimit} bytes. This helps to make a generic encryption handler.
 * </p>
 */
@Override
public void addSecurityHandlers() {
    final ChannelPipeline handlers = getChannel().pipeline();
    // With an SSL handler present the inbound security handlers go right behind it;
    // otherwise they go to the head of the pipeline.
    final boolean sslInstalled = handlers.names().contains(RpcConstants.SSL_HANDLER);

    final SaslDecryptionHandler decryptor =
            new SaslDecryptionHandler(saslCodec, getMaxWrappedSize(), OutOfMemoryHandler.DEFAULT_INSTANCE);
    if (sslInstalled) {
        handlers.addAfter(RpcConstants.SSL_HANDLER, RpcConstants.SASL_DECRYPTION_HANDLER, decryptor);
    } else {
        handlers.addFirst(RpcConstants.SASL_DECRYPTION_HANDLER, decryptor);
    }

    // Inserted second at the same anchor, so it ends up in front of the decryption handler.
    final LengthFieldBasedFrameDecoder frameDecoder =
            new LengthFieldBasedFrameDecoder(ByteOrder.BIG_ENDIAN, Integer.MAX_VALUE,
                    RpcConstants.LENGTH_FIELD_OFFSET, RpcConstants.LENGTH_FIELD_LENGTH,
                    RpcConstants.LENGTH_ADJUSTMENT, RpcConstants.INITIAL_BYTES_TO_STRIP, true);
    if (sslInstalled) {
        handlers.addAfter(RpcConstants.SSL_HANDLER, RpcConstants.LENGTH_DECODER_HANDLER, frameDecoder);
    } else {
        handlers.addFirst(RpcConstants.LENGTH_DECODER_HANDLER, frameDecoder);
    }

    // The encryption handler follows the message decoder, then the chunk creation handler.
    final SaslEncryptionHandler encryptor =
            new SaslEncryptionHandler(saslCodec, getWrapSizeLimit(), OutOfMemoryHandler.DEFAULT_INSTANCE);
    handlers.addAfter(RpcConstants.MESSAGE_DECODER, RpcConstants.SASL_ENCRYPTION_HANDLER, encryptor);

    final ChunkCreationHandler chunker = new ChunkCreationHandler(getWrapSizeLimit());
    handlers.addAfter(RpcConstants.SASL_ENCRYPTION_HANDLER, RpcConstants.CHUNK_CREATION_HANDLER, chunker);
}