下面列出了io.netty.channel.ChannelPipeline#remove ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
* 设置日志是否开启
*
* @param key 客户端关键字,须保证唯一
* @param enabled 是否开启,true为开启
*/
public void setLoggingEnabled(String key, boolean enabled, Class<?> channelManagerClass, String loggingName) {
if (!initialized) {
throw new IllegalArgumentException("服务没有初始化成功");
}
ClientEntry entry = clientEntries.get(key);
if (null == entry) {
throw new NullPointerException("根据[" + key + "]查找不到对应的ClientEntry对象,可能没有注册成功,请检查!");
}
Channel channel = entry.getChannel();
if (null == channel) {
LOG.debug("根据[{}]没有找到对应的channel/pipeline,退出处理!", key);
return;
}
ChannelPipeline pipeline = channel.pipeline();
if (enabled && pipeline.get(loggingName) == null) {
pipeline.addFirst(loggingName,
new LoggingHandler(channelManagerClass));
} else if (!enabled && pipeline.get(loggingName) != null) {
pipeline.remove(loggingName);
}
}
private void switchToHttp(ChannelHandlerContext ctx) {
ChannelPipeline p = ctx.pipeline();
p.addLast("http-decoder", new HttpRequestDecoder());
p.addLast("http-aggregator", new HttpObjectAggregator(Integer.MAX_VALUE));
p.addLast("http-encoder", new HttpResponseEncoder());
//create it lazily if and when we need it
if (httpKeepAliveRunnable == null) {
long httpServerScanPeriod = ConfigurationHelper.getLongProperty(TransportConstants.HTTP_SERVER_SCAN_PERIOD_PROP_NAME, TransportConstants.DEFAULT_HTTP_SERVER_SCAN_PERIOD, nettyAcceptor.getConfiguration());
httpKeepAliveRunnable = new HttpKeepAliveRunnable();
Future<?> future = scheduledThreadPool.scheduleAtFixedRate(httpKeepAliveRunnable, httpServerScanPeriod, httpServerScanPeriod, TimeUnit.MILLISECONDS);
httpKeepAliveRunnable.setFuture(future);
}
long httpResponseTime = ConfigurationHelper.getLongProperty(TransportConstants.HTTP_RESPONSE_TIME_PROP_NAME, TransportConstants.DEFAULT_HTTP_RESPONSE_TIME, nettyAcceptor.getConfiguration());
HttpAcceptorHandler httpHandler = new HttpAcceptorHandler(httpKeepAliveRunnable, httpResponseTime, ctx.channel());
ctx.pipeline().addLast("http-handler", httpHandler);
p.addLast(new ProtocolDecoder(false, true));
p.remove(this);
}
@Override
public void channelRead(final ChannelHandlerContext ctx, final Object obj) {
final ChannelPipeline pipeline = ctx.pipeline();
if (obj instanceof HttpMessage && !WebSocketHandlerUtil.isWebSocket((HttpMessage)obj)) {
if (null != pipeline.get(PIPELINE_AUTHENTICATOR)) {
pipeline.remove(PIPELINE_REQUEST_HANDLER);
final ChannelHandler authenticator = pipeline.get(PIPELINE_AUTHENTICATOR);
pipeline.remove(PIPELINE_AUTHENTICATOR);
pipeline.addAfter(PIPELINE_HTTP_RESPONSE_ENCODER, PIPELINE_AUTHENTICATOR, authenticator);
pipeline.addAfter(PIPELINE_AUTHENTICATOR, PIPELINE_REQUEST_HANDLER, this.httpGremlinEndpointHandler);
} else {
pipeline.remove(PIPELINE_REQUEST_HANDLER);
pipeline.addAfter(PIPELINE_HTTP_RESPONSE_ENCODER, PIPELINE_REQUEST_HANDLER, this.httpGremlinEndpointHandler);
}
}
ctx.fireChannelRead(obj);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof ByteBuf && ctx.channel().isActive()) {
boolean isHttpRequest = false;
ByteBuf buffer = (ByteBuf) msg;
final int len = 11;
if (buffer.readableBytes() > len) {
byte[] dst = new byte[len];
buffer.getBytes(buffer.readerIndex(), dst, 0, len);
int n = HttpMethodUtil.method(dst);
isHttpRequest = n > 2;
}
if (isHttpRequest) {
ChannelPipeline cp = ctx.pipeline();
String currentName = ctx.name();
cp.addAfter(currentName, "HttpRequestDecoder", new HttpRequestDecoder());
cp.addAfter("HttpRequestDecoder", "HttpResponseEncoder", new HttpResponseEncoder());
cp.addAfter("HttpResponseEncoder", "HttpObjectAggregator", new HttpObjectAggregator(512 * 1024));
ChannelHandler handler = serverDef.httpHandlerFactory.create(serverDef);
cp.addAfter("HttpObjectAggregator", "HttpThriftBufDecoder", handler);
cp.remove(currentName);
}
}
ctx.fireChannelRead(msg);
}
private boolean upgradeToWebSocket(ChannelHandlerContext ctx, FullHttpRequest request) {
HttpHeaders headers = request.headers();
if ("Upgrade".equalsIgnoreCase(headers.get(HttpHeaderNames.CONNECTION)) &&
"WebSocket".equalsIgnoreCase(headers.get(HttpHeaderNames.UPGRADE))) {
ChannelPipeline pipeline = ctx.pipeline();
// 将http升级为WebSocket
pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
pipeline.addLast(FrameCodec.getInstance());
pipeline.addLast(FrameHandler.getInstance(channelListener));
pipeline.remove(this);
// 将channelActive事件传递到FrameHandler
ctx.fireChannelActive();
return true;
}
return false;
}
static void configureH2Pipeline(ChannelPipeline p,
ServerCookieDecoder cookieDecoder,
ServerCookieEncoder cookieEncoder,
boolean forwarded,
Http2Settings http2Settings,
ConnectionObserver listener,
ChannelOperations.OnSetup opsFactory,
boolean validate) {
p.remove(NettyPipeline.ReactiveBridge);
Http2FrameCodecBuilder http2FrameCodecBuilder =
Http2FrameCodecBuilder.forServer()
.validateHeaders(validate)
.initialSettings(http2Settings);
if (p.get(NettyPipeline.LoggingHandler) != null) {
http2FrameCodecBuilder.frameLogger(new Http2FrameLogger(LogLevel.DEBUG,
"reactor.netty.http.server.h2"));
}
p.addLast(NettyPipeline.HttpCodec, http2FrameCodecBuilder.build())
.addLast(NettyPipeline.H2MultiplexHandler,
new Http2MultiplexHandler(new H2Codec(opsFactory, listener, forwarded, cookieEncoder, cookieDecoder)));
}
/**
* Invoke fire and forget command handler.
*
* @param ctx the ctx
*/
private void invokeAsyncFireAndForgetCommandHandler(ChannelHandlerContext ctx) {
ChannelPipeline p = ctx.pipeline();
p.addLast(DECODER, new ObjectDecoder(ClassResolvers.cacheDisabled(null)));
p.addLast("commandExecutor", new CommandAsyncDelegator());
p.addLast(ENCODER, new StringEncoder());
p.remove(this);
}
/**
* Invoke log files send handler.
*
* @param ctx the ctx
*/
private void invokeLogFilesSendHandler(ChannelHandlerContext ctx) {
ChannelPipeline p = ctx.pipeline();
p.addLast(STREAMER, new LogFilesDecoder(receiveDirectory));
p.addLast(ACK_RESPONSER, new AckResponser());
p.addLast(ENCODER, new StringEncoder());
p.remove(this);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf in = (ByteBuf) msg;
if (in.readableBytes() < 3) {
return;
}
if (detectTelnetFuture != null && detectTelnetFuture.isCancellable()) {
detectTelnetFuture.cancel(false);
}
byte[] bytes = new byte[3];
in.getBytes(0, bytes);
String httpHeader = new String(bytes);
ChannelPipeline pipeline = ctx.pipeline();
if (!"GET".equalsIgnoreCase(httpHeader)) { // telnet
channelGroup.add(ctx.channel());
TelnetChannelHandler handler = new TelnetChannelHandler(handlerFactory);
pipeline.addLast(handler);
ctx.fireChannelActive(); // trigger TelnetChannelHandler init
} else {
pipeline.addLast(new HttpServerCodec());
pipeline.addLast(new ChunkedWriteHandler());
pipeline.addLast(new HttpObjectAggregator(64 * 1024));
pipeline.addLast(workerGroup, "HttpRequestHandler", new HttpRequestHandler("/ws", new File("arthas-output")));
pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
pipeline.addLast(new TtyWebSocketFrameHandler(channelGroup, ttyConnectionFactory));
ctx.fireChannelActive();
}
pipeline.remove(this);
ctx.fireChannelRead(in);
}
private void switchToSocket(ChannelHandlerContext ctx) {
ChannelPipeline pipeline = ctx.pipeline();
pipeline.addLast("framer", new LengthFieldBasedFrameDecoder(400 * 1024, 0, 4, -4, 0));
pipeline.addLast("decoder", new PacketDecoder());
pipeline.addLast("encoder", new PacketEncoder());
pipeline.addLast(new LoggingHandler(LogLevel.DEBUG));
pipeline.addLast("handler", new MessageServerHandler(handlerManager));
pipeline.remove(this);
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
final int readerIndex = in.readerIndex();
if (in.writerIndex() == readerIndex) {
return;
}
ChannelPipeline p = ctx.pipeline();
final byte versionVal = in.getByte(readerIndex);
SocksVersion version = SocksVersion.valueOf(versionVal);
switch (version) {
case SOCKS4a:
logKnownVersion(ctx, version);
p.addAfter(ctx.name(), null, Socks4ServerEncoder.INSTANCE);
p.addAfter(ctx.name(), null, new Socks4ServerDecoder());
break;
case SOCKS5:
logKnownVersion(ctx, version);
p.addAfter(ctx.name(), null, socks5encoder);
p.addAfter(ctx.name(), null, new Socks5InitialRequestDecoder());
break;
default:
logUnknownVersion(ctx, versionVal);
in.skipBytes(in.readableBytes());
ctx.close();
return;
}
p.remove(this);
}
private void dispatchToPacket(ChannelHandlerContext ctx) {
ChannelPipeline pipeline = ctx.pipeline();
BaseConfig baseConfig = ConfigFactory.getConfig(BaseConfig.class);
pipeline.addLast(new IdleStateChecker(baseConfig.readerIdleTime()));
pipeline.addLast(new PacketCodec());
pipeline.addLast(PacketHandler.getInstance(channelListener));
// 将所有所需的ChannelHandler添加到pipeline之后,一定要将自身移除掉
// 否则该Channel之后的请求仍会重新执行协议的分发,而这是要避免的
pipeline.remove(this);
// 将channelActive事件传递到PacketHandler
ctx.fireChannelActive();
}
/**
* handle WebSocket request,then, the the RPC could happen in WebSocket.
*
* @param ctx
* @param request
*/
protected void handleWebSocket(final ChannelHandlerContext ctx, FullHttpRequest request) {
if (logger.isDebugEnabled()) {
logger.debug("handleWebSocket request: uri={}", request.uri());
}
// Handshake
WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(request.uri(), null, true);
WebSocketServerHandshaker handshaker = wsFactory.newHandshaker(request);
if (handshaker == null) {
WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
return;
}
ChannelFutureListener callback = websocketHandshakeListener(ctx, request);
ChannelFuture future = handshaker.handshake(ctx.channel(), request);
if (callback != null) {
future.addListener(callback);
}
ChannelPipeline pipe = ctx.pipeline();
if (pipe.get(WebsocketFrameHandler.class) == null) {
pipe.addAfter(ctx.name(), "wsFrameHandler", new WebsocketFrameHandler(handshaker));
ChannelHandler handlerAws = pipe.get(AwsProxyProtocolDecoder.class);
if (handlerAws != null) {
pipe.remove(handlerAws);
}
pipe.remove(ctx.name());// Remove current Handler
}
}
private void enableGzip(ChannelHandlerContext ctx) {
ChannelPipeline p = ctx.pipeline();
p.addLast("gzipdeflater", ZlibCodecFactory.newZlibEncoder(ZlibWrapper.GZIP));
p.addLast("gzipinflater", ZlibCodecFactory.newZlibDecoder(ZlibWrapper.GZIP));
p.addLast("unificationB", new PortUnificationServerHandler(sslCtx, detectSsl, false));
p.remove(this);
}
/**
* Invoke jar send handler.
*
* @param ctx the ctx
*/
private void invokeJarSendHandlerForHA(ChannelHandlerContext ctx) {
ChannelPipeline p = ctx.pipeline();
EventExecutor e1 = new DefaultEventExecutorGroup(1).next();
p.addLast(e1, new ArchiveDecoder(10485760, receiveDirectory));
p.addLast(ACK_RESPONSER, new AckResponser());
p.addLast(HEARTBEAT_HANDLER, new HeartbeatHandler(JumbuneAgent.getHeartBeatMillis(),
JumbuneAgent.getHeartBeatMillis(), JumbuneAgent.getHeartBeatMillis()));
p.addLast(ENCODER, new StringEncoder());
p.remove(this);
}
@Override
protected void initChannel(Channel channel) {
ChannelPipeline pipeline = channel.pipeline();
if (config.metricsRecorder != null) {
ChannelOperations.addMetricsHandler(channel,
Objects.requireNonNull(config.metricsRecorder.get(), "Metrics recorder supplier returned null"),
remoteAddress,
onServer);
ByteBufAllocator alloc = channel.alloc();
if (alloc instanceof PooledByteBufAllocator) {
ByteBufAllocatorMetrics.INSTANCE.registerMetrics("pooled", ((PooledByteBufAllocator) alloc).metric());
}
else if (alloc instanceof UnpooledByteBufAllocator) {
ByteBufAllocatorMetrics.INSTANCE.registerMetrics("unpooled", ((UnpooledByteBufAllocator) alloc).metric());
}
}
if (config.loggingHandler != null) {
pipeline.addFirst(NettyPipeline.LoggingHandler, config.loggingHandler);
}
ChannelOperations.addReactiveBridge(channel, config.channelOperationsProvider(), connectionObserver);
config.defaultOnChannelInit()
.then(config.doOnChannelInit)
.onChannelInit(connectionObserver, channel, remoteAddress);
pipeline.remove(this);
if (log.isDebugEnabled()) {
log.debug(format(channel, "Initialized pipeline {}"), pipeline.toString());
}
}
private void enableSsl(ChannelHandlerContext ctx) {
ChannelPipeline p = ctx.pipeline();
p.addLast("ssl", sslCtx.newHandler(ctx.alloc()));
p.addLast("unificationA", new PortUnificationServerHandler(sslCtx, false, detectGzip));
p.remove(this);
}
@Override
public void finalize(final ChannelPipeline pipeline) {
pipeline.remove(PIPELINE_OP_SELECTOR);
pipeline.remove(PIPELINE_OP_EXECUTOR);
}
private void enableSsl(ChannelHandlerContext ctx) {
ChannelPipeline p = ctx.pipeline();
p.addLast("ssl", sslCtx.newHandler(ctx.alloc()));
p.addLast("unificationA", new PortUnificationServerHandler(sslCtx, false, detectGzip));
p.remove(this);
}
public final void finishHandshake(Channel channel, ProtocolVersion response) {
setHandshakeComplete();
ChannelPipeline p = channel.pipeline();
p.remove("rfb-handshake-decoder");
p.remove("rfb-handshake-encoder");
logger.debug("server {} - client {}", version, response);
}