下面列出了io.netty.channel.ChannelPipeline#addFirst ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Override protected void setupSSL(ChannelPipeline pipe,
ConnectionMultiListener.SSLHandshakeListener sslHandshakeListener) {
String peerHost = endpoint.getAddress();
int peerPort = endpoint.getUserPort();
SSLEngine sslEngine = sslConfig.createSSLEngine(allocator, peerHost, peerPort);
// Add SSL handler into pipeline
SslHandler sslHandler = new SslHandler(sslEngine);
sslHandler.setHandshakeTimeoutMillis(sslConfig.getHandshakeTimeout());
// Add a listener for SSL Handshake complete. The Drill client handshake will be enabled only
// after this is done.
sslHandler.handshakeFuture().addListener(sslHandshakeListener);
pipe.addFirst(RpcConstants.SSL_HANDLER, sslHandler);
logger.debug(sslConfig.toString());
}
public void setLoggingEnabled(boolean enabled) {
checkState(initialized, "Not initialized.");
if (channelFuture == null) {
LOG.debug("No channel future available, doing nothing.");
return;
}
ChannelPipeline pipeline = channelFuture.channel().pipeline();
if (enabled && pipeline.get(LOGGING_HANDLER_NAME) == null) {
pipeline.addFirst(LOGGING_HANDLER_NAME,
new LoggingHandler(RxtxClientChannelManager.this.getClass()));
} else if (!enabled && pipeline.get(LOGGING_HANDLER_NAME) != null) {
pipeline.remove(LOGGING_HANDLER_NAME);
}
}
/**
* 设置日志是否开启
*
* @param key 客户端关键字,须保证唯一
* @param enabled 是否开启,true为开启
*/
public void setLoggingEnabled(String key, boolean enabled, Class<?> channelManagerClass, String loggingName) {
if (!initialized) {
throw new IllegalArgumentException("服务没有初始化成功");
}
ClientEntry entry = clientEntries.get(key);
if (null == entry) {
throw new NullPointerException("根据[" + key + "]查找不到对应的ClientEntry对象,可能没有注册成功,请检查!");
}
Channel channel = entry.getChannel();
if (null == channel) {
LOG.debug("根据[{}]没有找到对应的channel/pipeline,退出处理!", key);
return;
}
ChannelPipeline pipeline = channel.pipeline();
if (enabled && pipeline.get(loggingName) == null) {
pipeline.addFirst(loggingName,
new LoggingHandler(channelManagerClass));
} else if (!enabled && pipeline.get(loggingName) != null) {
pipeline.remove(loggingName);
}
}
@Override
protected void initChannel( SocketChannel _channel ) throws Exception {
ChannelPipeline pipeline = _channel.pipeline();
// 是否使用客户端模式
if( CommonConfig.getBoolean( "jees.jsts.websocket.ssl.enable", false ) ){
SSLEngine engine = sslContext1.createSSLEngine();
// 是否需要验证客户端
engine.setUseClientMode(false);
// engine.setNeedClientAuth(false);
pipeline.addFirst("ssl", new SslHandler( engine ));
}
pipeline.addLast( new IdleStateHandler( 100 , 0 , 0 , TimeUnit.SECONDS ) );
pipeline.addLast( new HttpServerCodec() );
pipeline.addLast( new ChunkedWriteHandler() );
pipeline.addLast( new HttpObjectAggregator( 8192 ) );
pipeline.addLast( new WebSocketServerProtocolHandler( CommonConfig.getString( ISocketBase.Netty_WebSocket_Url, "/" ) ) );
pipeline.addLast( CommonContextHolder.getBean( WebSocketHandler.class ) );
}
@Override
public void buildBungeeServer(Channel channel, Connection connection) {
ChannelPipeline pipeline = channel.pipeline();
pipeline.addFirst(new EncapsulatedHandshakeSender(null, false));
NetworkDataCache cache = NetworkDataCache.getFrom(connection);
pipeline.replace(PipelineUtils.PACKET_DECODER, PipelineUtils.PACKET_DECODER, new FromServerPacketDecoder(connection, cache));
pipeline.replace(PipelineUtils.PACKET_ENCODER, PipelineUtils.PACKET_ENCODER, new ToServerPacketEncoder(connection, cache));
pipeline.get(CustomHandlerBoss.class).setHandlerChangeListener(handler -> {
try {
return (handler instanceof DownstreamBridge) ? new EntityRewriteDownstreamBridge(
ReflectionUtils.getFieldValue(handler, "con"), connection.getVersion()
) : handler;
} catch (IllegalArgumentException | IllegalAccessException e) {
throw new RuntimeException(e);
}
});
}
@Override
public void buildBungeeServer(Channel channel, Connection connection) {
ChannelPipeline pipeline = channel.pipeline();
pipeline.addFirst(new EncapsulatedHandshakeSender(null, false));
NetworkDataCache cache = NetworkDataCache.getFrom(connection);
pipeline.replace(PipelineUtils.PACKET_DECODER, PipelineUtils.PACKET_DECODER, new FromServerPacketDecoder(connection, cache));
pipeline.replace(PipelineUtils.PACKET_ENCODER, PipelineUtils.PACKET_ENCODER, new ToServerPacketEncoder(connection, cache));
pipeline.get(CustomHandlerBoss.class).setHandlerChangeListener(handler -> {
try {
return (handler instanceof DownstreamBridge) ? new EntityRewriteDownstreamBridge(
ReflectionUtils.getFieldValue(handler, "con"), connection.getVersion()
) : handler;
} catch (IllegalArgumentException | IllegalAccessException e) {
throw new RuntimeException(e);
}
});
}
@Override
public void buildBungeeServer(Channel channel, Connection connection) {
ChannelPipeline pipeline = channel.pipeline();
pipeline.addFirst(new EncapsulatedHandshakeSender(null, false));
NetworkDataCache cache = NetworkDataCache.getFrom(connection);
pipeline.replace(PipelineUtils.PACKET_DECODER, PipelineUtils.PACKET_DECODER, new FromServerPacketDecoder(connection, cache));
pipeline.replace(PipelineUtils.PACKET_ENCODER, PipelineUtils.PACKET_ENCODER, new ToServerPacketEncoder(connection, cache));
pipeline.get(CustomHandlerBoss.class).setHandlerChangeListener(handler -> {
try {
return (handler instanceof DownstreamBridge) ? new EntityRewriteDownstreamBridge(
ReflectionUtils.getFieldValue(handler, "con"), connection.getVersion()
) : handler;
} catch (IllegalArgumentException | IllegalAccessException e) {
throw new RuntimeException(e);
}
});
}
@Override
protected void setupSSL(ChannelPipeline pipe) {
SSLEngine sslEngine = sslConfig.createSSLEngine(config.getAllocator(), null, 0);
// Add SSL handler into pipeline
pipe.addFirst(RpcConstants.SSL_HANDLER, new SslHandler(sslEngine));
logger.debug("SSL communication between client and server is enabled.");
logger.debug(sslConfig.toString());
}
public void registerItems(ChannelPipeline pipeline, Netty4CorsConfig corsConfig) {
for (PipelineRegistry.ChannelPipelineItem item : addBeforeList) {
pipeline.addBefore(item.base, item.name, item.handlerFactory.apply(corsConfig));
}
if (sslContextProvider != null) {
SslContext sslContext = sslContextProvider.getSslContext();
if (sslContext != null) {
SslHandler sslHandler = sslContext.newHandler(pipeline.channel().alloc());
pipeline.addFirst(sslHandler);
}
}
}
@Override
public void configureNewPipeline(ChannelPipeline pipeline) {
final SslHandler sslHandler = new SslHandler(sslEngineFactory.createSSLEngine(pipeline.channel().alloc()));
if(proxy != null) {
pipeline.addAfter(Constants.Properties.HTTP_PROXY_HANDLER_NAME,Constants.Properties.SSL_HANDLER_NAME, sslHandler);
} else {
pipeline.addFirst(Constants.Properties.SSL_HANDLER_NAME, sslHandler);
}
pipeline.addAfter(Constants.Properties.SSL_HANDLER_NAME, Constants.Properties.SSL_COMPLETION_HANDLER_NAME,
new SslCompletionHandler(sslHandler.handshakeFuture()));
}
/**
* Called by @{ChannelPipeline} initializer after the current channel is registered to an event loop.
* <p>
* This method constructs this pipeline:
* <pre>{@code
* ChannelPipeline {
* (SslHandler#0 = io.netty.handler.ssl.SslHandler),
* (IdleTimeoutHandler#0 = io.netty.handler.timeout.IdleTimeoutHandler),
* (LoggingHandler#0 = io.netty.handler.logging.LoggingHandler), // iff RntbdClientChannelHandler.config.wireLogLevel != null
* (RntbdContextNegotiator#0 = com.microsoft.azure.cosmosdb.internal.directconnectivity.rntbd.RntbdContextNegotiator),
* (RntbdResponseDecoder#0 = com.microsoft.azure.cosmosdb.internal.directconnectivity.rntbd.RntbdResponseDecoder),
* (RntbdRequestEncoder#0 = com.microsoft.azure.cosmosdb.internal.directconnectivity.rntbd.RntbdRequestEncoder),
* (RntbdRequestManager#0 = com.microsoft.azure.cosmosdb.internal.directconnectivity.rntbd.RntbdRequestManager),
* }
* }</pre>
*
* @param channel a channel that was just registered with an event loop
*/
@Override
protected void initChannel(final Channel channel) {
checkNotNull(channel);
final RntbdRequestManager requestManager = new RntbdRequestManager(this.healthChecker, this.config.maxRequestsPerChannel());
final long readerIdleTime = this.config.receiveHangDetectionTimeInNanos();
final long writerIdleTime = this.config.sendHangDetectionTimeInNanos();
final long allIdleTime = this.config.idleConnectionTimeoutInNanos();
final ChannelPipeline pipeline = channel.pipeline();
pipeline.addFirst(
new RntbdContextNegotiator(requestManager, this.config.userAgent()),
new RntbdResponseDecoder(),
new RntbdRequestEncoder(),
requestManager
);
if (this.config.wireLogLevel() != null) {
pipeline.addFirst(new LoggingHandler(this.config.wireLogLevel()));
}
pipeline.addFirst(
this.config.sslContext().newHandler(channel.alloc()),
new IdleStateHandler(readerIdleTime, writerIdleTime, allIdleTime, TimeUnit.NANOSECONDS)
);
channel.attr(REQUEST_MANAGER).set(requestManager);
}
@Override
protected void initPipeline(ChannelPipeline pipeline) {
super.initPipeline(pipeline);
if (trafficShapingHandler != null) {
pipeline.addFirst(trafficShapingHandler);
}
}
/**
* Process receives incoming data from the Netty pipeline. It
* may request more data by returning the WAITING_FOR_INPUT
* state. The process method should return DONE when it has
* finished processing. It may add additional elements to the
* pipeline. The handler is responsible for to position the
* buffer read marker correctly such that successive readers
* see the correct data. The handler is expected to position the
* marker after the SSLRequest payload.
* @param buffer The buffer with incoming data
* @param pipeline The Netty pipeline which may be modified
* @return The state of the handler
*/
public State process(ByteBuf buffer, ChannelPipeline pipeline) {
if (buffer.readableBytes() < SSL_REQUEST_BYTE_LENGTH) {
return State.WAITING_FOR_INPUT;
}
// mark the buffer so we can jump back if we don't handle this startup
buffer.markReaderIndex();
// reads the total message length (int) and the SSL request code (int)
if (buffer.readInt() == SSL_REQUEST_BYTE_LENGTH && buffer.readInt() == SSL_REQUEST_CODE) {
final SslContext sslContext;
if (sslContextProvider != null) {
sslContext = sslContextProvider.getSslContext();
} else {
sslContext = null;
}
// received optional SSL negotiation pkg
if (sslContext != null) {
writeByteAndFlushMessage(pipeline.channel(), 'S');
SslHandler sslHandler = sslContext.newHandler(pipeline.channel().alloc());
pipeline.addFirst(sslHandler);
} else {
writeByteAndFlushMessage(pipeline.channel(), 'N');
}
buffer.markReaderIndex();
} else {
buffer.resetReaderIndex();
}
return State.DONE;
}
@Override
protected void initChannel(Channel channel) {
ChannelPipeline pipeline = channel.pipeline();
if (config.metricsRecorder != null) {
ChannelOperations.addMetricsHandler(channel,
Objects.requireNonNull(config.metricsRecorder.get(), "Metrics recorder supplier returned null"),
remoteAddress,
onServer);
ByteBufAllocator alloc = channel.alloc();
if (alloc instanceof PooledByteBufAllocator) {
ByteBufAllocatorMetrics.INSTANCE.registerMetrics("pooled", ((PooledByteBufAllocator) alloc).metric());
}
else if (alloc instanceof UnpooledByteBufAllocator) {
ByteBufAllocatorMetrics.INSTANCE.registerMetrics("unpooled", ((UnpooledByteBufAllocator) alloc).metric());
}
}
if (config.loggingHandler != null) {
pipeline.addFirst(NettyPipeline.LoggingHandler, config.loggingHandler);
}
ChannelOperations.addReactiveBridge(channel, config.channelOperationsProvider(), connectionObserver);
config.defaultOnChannelInit()
.then(config.doOnChannelInit)
.onChannelInit(connectionObserver, channel, remoteAddress);
pipeline.remove(this);
if (log.isDebugEnabled()) {
log.debug(format(channel, "Initialized pipeline {}"), pipeline.toString());
}
}
@Override
protected void addChannelHandlers(ChannelPipeline pipeline) {
logger.debug("Adding ssl handler to the pipeline");
SSLEngine engine = createSslEngine();
engine.setUseClientMode(false);
pipeline.addFirst("ssl", new SslHandler(engine));
}
private void setIdleTime(ChannelPipeline pipeline, int idleTime) {
if (pipeline.names().contains("idleStateHandler")) {
pipeline.remove("idleStateHandler");
}
pipeline.addFirst("idleStateHandler", new IdleStateHandler(idleTime, 0, 0));
}
private void setIdleTime(ChannelPipeline pipeline, int idleTime) {
if (pipeline.names().contains("idleStateHandler")) {
pipeline.remove("idleStateHandler");
}
pipeline.addFirst("idleStateHandler", new IdleStateHandler(idleTime, 0, 0));
}
public void addSslHandler(Channel channel, @Nullable SocketAddress remoteAddress, boolean sslDebug) {
SslHandler sslHandler;
if (remoteAddress instanceof InetSocketAddress) {
InetSocketAddress sniInfo = (InetSocketAddress) remoteAddress;
sslHandler = getSslContext()
.newHandler(channel.alloc(), sniInfo.getHostString(), sniInfo.getPort());
if (log.isDebugEnabled()) {
log.debug(format(channel, "SSL enabled using engine {} and SNI {}"),
sslHandler.engine().getClass().getSimpleName(), sniInfo);
}
}
else {
sslHandler = getSslContext().newHandler(channel.alloc());
if (log.isDebugEnabled()) {
log.debug(format(channel, "SSL enabled using engine {}"),
sslHandler.engine().getClass().getSimpleName());
}
}
configure(sslHandler);
ChannelPipeline pipeline = channel.pipeline();
if (pipeline.get(NettyPipeline.ProxyHandler) != null) {
pipeline.addAfter(NettyPipeline.ProxyHandler, NettyPipeline.SslHandler, sslHandler);
}
else {
pipeline.addFirst(NettyPipeline.SslHandler, sslHandler);
}
if (pipeline.get(NettyPipeline.LoggingHandler) != null) {
pipeline.addAfter(NettyPipeline.LoggingHandler, NettyPipeline.SslReader, new SslReadHandler());
if (sslDebug) {
pipeline.addBefore(NettyPipeline.SslHandler,
NettyPipeline.SslLoggingHandler,
new LoggingHandler("reactor.netty.tcp.ssl"));
}
}
else {
pipeline.addAfter(NettyPipeline.SslHandler, NettyPipeline.SslReader, new SslReadHandler());
}
}
/**
* Helps to add all the required security handler's after negotiation for encryption is completed.
* <p>Handler's before encryption is negotiated are:</p>
* <ul>
* <li> PROTOCOL_DECODER {@link ProtobufLengthDecoder} </li>
* <li> MESSAGE_DECODER {@link RpcDecoder} </li>
* <li> PROTOCOL_ENCODER {@link RpcEncoder} </li>
* <li> HANDSHAKE_HANDLER {@link org.apache.drill.exec.rpc.BasicClient.ClientHandshakeHandler} OR
* {@link org.apache.drill.exec.rpc.BasicServer.ServerHandshakeHandler} </li>
* <li> optional - IDLE_STATE_HANDLER {@link org.apache.drill.exec.rpc.BasicClient.IdlePingHandler} OR
* - TIMEOUT_HANDLER {@link org.apache.drill.exec.rpc.BasicServer.LoggingReadTimeoutHandler} </li>
* <li> MESSAGE_HANDLER {@link org.apache.drill.exec.rpc.RpcBus.InboundHandler} </li>
* <li> EXCEPTION_HANDLER {@link RpcExceptionHandler} </li>
* </ul>
* <p>Handler's after encryption is negotiated are:</p>
* <ul>
* <li> LENGTH_DECODER_HANDLER {@link LengthFieldBasedFrameDecoder}
* <li> SASL_DECRYPTION_HANDLER {@link SaslDecryptionHandler}
* <li> PROTOCOL_DECODER {@link ProtobufLengthDecoder}
* <li> MESSAGE_DECODER {@link RpcDecoder}
* <li> SASL_ENCRYPTION_HANDLER {@link SaslEncryptionHandler}
* <li> CHUNK_CREATION_HANDLER {@link ChunkCreationHandler}
* <li> PROTOCOL_ENCODER {@link RpcEncoder}
* <li> HANDSHAKE_HANDLER {@link org.apache.drill.exec.rpc.BasicClient.ClientHandshakeHandler} OR
* {@link org.apache.drill.exec.rpc.BasicServer.ServerHandshakeHandler}
* <li> optional - IDLE_STATE_HANDLER {@link org.apache.drill.exec.rpc.BasicClient.IdlePingHandler} OR
* - TIMEOUT_HANDLER {@link org.apache.drill.exec.rpc.BasicServer.LoggingReadTimeoutHandler}
* <li> MESSAGE_HANDLER {@link org.apache.drill.exec.rpc.RpcBus.InboundHandler}
* <li> EXCEPTION_HANDLER {@link RpcExceptionHandler}
* </ul>
* <p>
* If encryption is enabled ChunkCreationHandler is always added to divide the Rpc message into chunks of
* negotiated {@link EncryptionContextImpl#wrapSizeLimit} bytes. This helps to make a generic encryption handler.
* </p>
*/
@Override
public void addSecurityHandlers() {
final ChannelPipeline channelPipeline = getChannel().pipeline();
if (channelPipeline.names().contains(RpcConstants.SSL_HANDLER)) {
channelPipeline.addAfter(RpcConstants.SSL_HANDLER, RpcConstants.SASL_DECRYPTION_HANDLER,
new SaslDecryptionHandler(saslCodec, getMaxWrappedSize(), OutOfMemoryHandler.DEFAULT_INSTANCE));
channelPipeline.addAfter(RpcConstants.SSL_HANDLER, RpcConstants.LENGTH_DECODER_HANDLER,
new LengthFieldBasedFrameDecoder(ByteOrder.BIG_ENDIAN, Integer.MAX_VALUE,
RpcConstants.LENGTH_FIELD_OFFSET, RpcConstants.LENGTH_FIELD_LENGTH,
RpcConstants.LENGTH_ADJUSTMENT, RpcConstants.INITIAL_BYTES_TO_STRIP, true));
} else {
channelPipeline.addFirst(RpcConstants.SASL_DECRYPTION_HANDLER,
new SaslDecryptionHandler(saslCodec, getMaxWrappedSize(), OutOfMemoryHandler.DEFAULT_INSTANCE));
channelPipeline.addFirst(RpcConstants.LENGTH_DECODER_HANDLER,
new LengthFieldBasedFrameDecoder(ByteOrder.BIG_ENDIAN, Integer.MAX_VALUE,
RpcConstants.LENGTH_FIELD_OFFSET, RpcConstants.LENGTH_FIELD_LENGTH,
RpcConstants.LENGTH_ADJUSTMENT, RpcConstants.INITIAL_BYTES_TO_STRIP, true));
}
channelPipeline.addAfter(RpcConstants.MESSAGE_DECODER, RpcConstants.SASL_ENCRYPTION_HANDLER,
new SaslEncryptionHandler(saslCodec, getWrapSizeLimit(), OutOfMemoryHandler.DEFAULT_INSTANCE));
channelPipeline.addAfter(RpcConstants.SASL_ENCRYPTION_HANDLER, RpcConstants.CHUNK_CREATION_HANDLER,
new ChunkCreationHandler(getWrapSizeLimit()));
}
@Override
protected void initChannel(Channel ch) throws Exception {
SSLEngine sslEngine = sslContext.newEngine(ch.alloc());
sslEngine.setUseClientMode(false);
ChannelPipeline pipeline = ch.pipeline();
pipeline.addFirst(new SslHandler(sslEngine)) ;
//http 服务器请求的 编码器和解码器的聚合
pipeline.addLast(new HttpServerCodec()) ;
//写文件
pipeline.addLast(new ChunkedWriteHandler()) ;
pipeline.addLast(new HttpObjectAggregator(64 * 1024)) ;
//处理不是 /ws 的 http 请求
pipeline.addLast(new HttpRequestHandle("/ws")) ;
//处理 websocket 升级 handler
pipeline.addLast(new WebSocketServerProtocolHandler("/ws")) ;
//真正的 websocket 处理文本 handler
pipeline.addLast(new TextWebSocketFrameHandler(group)) ;
}