下面列出了io.netty.channel.ChannelHandlerContext#pipeline ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Override
public void channelRead(final ChannelHandlerContext ctx, final Object obj) {
final ChannelPipeline pipeline = ctx.pipeline();
if (obj instanceof HttpMessage && !WebSocketHandlerUtil.isWebSocket((HttpMessage)obj)) {
if (null != pipeline.get(PIPELINE_AUTHENTICATOR)) {
pipeline.remove(PIPELINE_REQUEST_HANDLER);
final ChannelHandler authenticator = pipeline.get(PIPELINE_AUTHENTICATOR);
pipeline.remove(PIPELINE_AUTHENTICATOR);
pipeline.addAfter(PIPELINE_HTTP_RESPONSE_ENCODER, PIPELINE_AUTHENTICATOR, authenticator);
pipeline.addAfter(PIPELINE_AUTHENTICATOR, PIPELINE_REQUEST_HANDLER, this.httpGremlinEndpointHandler);
} else {
pipeline.remove(PIPELINE_REQUEST_HANDLER);
pipeline.addAfter(PIPELINE_HTTP_RESPONSE_ENCODER, PIPELINE_REQUEST_HANDLER, this.httpGremlinEndpointHandler);
}
}
ctx.fireChannelRead(obj);
}
public static void main(String[] args) throws Exception {
InputStream ins=TestWssClient.class.getResourceAsStream("cloud.jueb.net.pfx");
String strPassword="xxxxxx";
SslContext sslc=NettyServerSslUtil.buildSslContext_P12_Pfx(ins, strPassword);
NettyServerConfig nc=new NettyServerConfig();
NettyServer ns=new NettyServer(nc, "0.0.0.0", 1191,new WebSocketServerInitializer("/test",sslc) {
@Override
protected void webSocketHandComplete(ChannelHandlerContext ctx) {
ChannelPipeline p=ctx.pipeline();
p.addLast(new WebSocketTextFrameStringAdapter());//消息解码器
p.addLast(new DefaultIdleListenerHandler<String>(new Listener()));//心跳适配器
//为新加的handler手动触发必要事件
ctx.fireChannelRegistered();
ctx.fireChannelActive();
}
});
ns.start();
new Scanner(System.in).nextLine();
}
@Override
protected void decode(ChannelHandlerContext ctx, HAProxyMessage msg, List<Object> out)
throws Exception {
if (logger.isDebugEnabled()) {
logger.debug("PROXY message {}: {}:{} -> {}:{} (next: {})",
msg.protocolVersion().name(),
msg.sourceAddress(), msg.sourcePort(),
msg.destinationAddress(), msg.destinationPort(),
proxiedCandidates);
}
final ChannelPipeline p = ctx.pipeline();
final InetAddress src = InetAddress.getByAddress(
NetUtil.createByteArrayFromIpAddressString(msg.sourceAddress()));
final InetAddress dst = InetAddress.getByAddress(
NetUtil.createByteArrayFromIpAddressString(msg.destinationAddress()));
final ProxiedAddresses proxiedAddresses =
ProxiedAddresses.of(new InetSocketAddress(src, msg.sourcePort()),
new InetSocketAddress(dst, msg.destinationPort()));
configurePipeline(p, proxiedCandidates, proxiedAddresses);
p.remove(this);
}
@Override
public void channelRead(final ChannelHandlerContext ctx, final Object obj)
throws Exception {
if (obj instanceof HttpMessage && !isWebSocket((HttpMessage) obj)) {
ChannelPipeline pipeline = ctx.pipeline();
ChannelHandler authHandler = pipeline.get(HTTP_AUTH);
if (authHandler != null) {
authHandler = pipeline.remove(HTTP_AUTH);
} else {
authHandler = new HttpBasicAuthHandler(
this.authenticator, this.authenticationSettings);
}
pipeline.addAfter(AUTHENTICATOR, HTTP_AUTH, authHandler);
ctx.fireChannelRead(obj);
} else {
super.channelRead(ctx, obj);
}
}
public static void main(String[] args) throws Exception {
SslContext sslc=SslContextBuilder.forClient().build();
NettyClientConfig nc=new NettyClientConfig();
URI uri=new URI("wss://cloud.jueb.net:1191/test");
NettyClient ns=new NettyClient(nc, "192.168.0.223", 1191,new WebSocketClientInitializer(uri,sslc) {
@Override
protected void webSocketHandComplete(ChannelHandlerContext ctx) {
ChannelPipeline p=ctx.pipeline();
p.addLast(new WebSocketTextFrameStringAdapter());//消息解码器
p.addLast(new DefaultIdleListenerHandler<String>(new Listener()));//心跳适配器
//为新加的handler手动触发必要事件
ctx.fireChannelRegistered();
ctx.fireChannelActive();
}
});
ns.start();
new Scanner(System.in).nextLine();
}
private void createSessionAndReregister(ChannelHandlerContext ctx,
User user, DashBoard dash, Device device, int msgId) {
HardwareStateHolder hardwareStateHolder = new HardwareStateHolder(user, dash, device);
ChannelPipeline pipeline = ctx.pipeline();
pipeline.replace(this, "HHArdwareHandler", new HardwareHandler(holder, hardwareStateHolder));
Session session = holder.sessionDao.getOrCreateSessionByUser(
hardwareStateHolder.userKey, ctx.channel().eventLoop());
if (session.isSameEventLoop(ctx)) {
completeLogin(ctx.channel(), session, user, dash, device, msgId);
} else {
log.debug("Re registering hard channel. {}", ctx.channel());
ReregisterChannelUtil.reRegisterChannel(ctx, session, channelFuture ->
completeLogin(channelFuture.channel(), session, user, dash, device, msgId));
}
}
private boolean upgradeToWebSocket(ChannelHandlerContext ctx, FullHttpRequest request) {
HttpHeaders headers = request.headers();
if ("Upgrade".equalsIgnoreCase(headers.get(HttpHeaderNames.CONNECTION)) &&
"WebSocket".equalsIgnoreCase(headers.get(HttpHeaderNames.UPGRADE))) {
ChannelPipeline pipeline = ctx.pipeline();
// 将http升级为WebSocket
pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
pipeline.addLast(FrameCodec.getInstance());
pipeline.addLast(FrameHandler.getInstance(channelListener));
pipeline.remove(this);
// 将channelActive事件传递到FrameHandler
ctx.fireChannelActive();
return true;
}
return false;
}
/**
* Add all {@link ChannelHandler}'s that are needed for SPDY with the given version.
*/
protected void addSpdyHandlers(ChannelHandlerContext ctx, SpdyVersion version) {
ChannelPipeline pipeline = ctx.pipeline();
pipeline.addLast("spdyFrameCodec", new SpdyFrameCodec(version));
pipeline.addLast("spdySessionHandler", new SpdySessionHandler(version, true));
pipeline.addLast("spdyHttpEncoder", new SpdyHttpEncoder(version));
pipeline.addLast("spdyHttpDecoder", new SpdyHttpDecoder(version, maxSpdyContentLength));
pipeline.addLast("spdyStreamIdHandler", new SpdyHttpResponseStreamIdHandler());
pipeline.addLast("httpRequestHandler", createHttpRequestHandlerForSpdy());
}
@Override
public void channelRegistered(ChannelHandlerContext channelHandlerContext) throws Exception {
try {
initChannel(channelHandlerContext.channel());
} catch (Throwable t) {
exceptionCaught(channelHandlerContext, t);
} finally {
ChannelPipeline pipeline = channelHandlerContext.pipeline();
if (pipeline.context(this) != null) {
pipeline.remove(this);
}
}
channelHandlerContext.pipeline().fireChannelRegistered();
}
private void enableGzip(ChannelHandlerContext ctx) {
ChannelPipeline p = ctx.pipeline();
p.addLast("gzipdeflater", ZlibCodecFactory.newZlibEncoder(ZlibWrapper.GZIP));
p.addLast("gzipinflater", ZlibCodecFactory.newZlibDecoder(ZlibWrapper.GZIP));
p.addLast("unificationB", new PortUnificationServerHandler(sslCtx, detectSsl, false));
p.remove(this);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof ProtocolVersion) {
handleServerVersion(ctx, (ProtocolVersion) msg);
return;
}
if (msg instanceof SecurityTypesEvent) {
handleSecurityTypes(ctx, (SecurityTypesEvent) msg);
return;
}
if (msg instanceof RfbSecurityMessage) {
handleSecurityMessage(ctx, (RfbSecurityMessage) msg);
return;
}
if (msg instanceof SecurityResultEvent) {
handleSecurityResult(ctx, (SecurityResultEvent) msg);
return;
}
if (msg instanceof ServerInitEvent) {
handshaker.finishHandshake(ctx.channel(), config.versionProperty().get());
ChannelPipeline cp = ctx.pipeline();
cp.fireUserEventTriggered(ProtocolState.HANDSHAKE_COMPLETE);
cp.remove(this);
cp.fireChannelRead(msg);
return;
}
throw new ProtocolException("unknown message occurred: " + msg);
}
/**
* Invoke fire and get object response command handler.
*
* @param ctx the ctx
*/
private void invokeFireAndGetObjectResponseCommandHandler(
ChannelHandlerContext ctx) {
ChannelPipeline p = ctx.pipeline();
p.addLast(DECODER, new ObjectDecoder(ClassResolvers.cacheDisabled(null)));
p.addLast("commandHandler", new CommandAsObjectResponser());
p.addLast(ENCODER, new ObjectEncoder());
p.remove(this);
}
@Override
protected void addCodec(ChannelHandlerContext ctx) throws Exception {
ChannelPipeline p = ctx.pipeline();
String name = ctx.name();
Socks4ClientDecoder decoder = new Socks4ClientDecoder();
p.addBefore(name, null, decoder);
decoderName = p.context(decoder).name();
encoderName = decoderName + ".encoder";
p.addBefore(name, encoderName, Socks4ClientEncoder.INSTANCE);
}
/**
* Invoke log files receive handler.
*
* @param ctx the ctx
*/
private void invokeLogFilesReceiveHandlerForHA(ChannelHandlerContext ctx) {
ChannelPipeline p = ctx.pipeline();
EventExecutor e1 = new DefaultEventExecutorGroup(1).next();
p.addLast("stringDecoder", new StringDecoder());
p.addLast("delegator", new Delegator(receiveDirectory));
p.addLast(HEARTBEAT_HANDLER, new HeartbeatHandler(JumbuneAgent.getHeartBeatMillis(),
JumbuneAgent.getHeartBeatMillis(), JumbuneAgent.getHeartBeatMillis()));
p.addLast("stringEncoder", new StringEncoder());
p.addLast(e1, new LogFilesEncoder());
p.remove(this);
}
@Override
protected void removeDecoder(ChannelHandlerContext ctx) throws Exception {
ChannelPipeline p = ctx.pipeline();
if (p.context(decoderName) != null) {
p.remove(decoderName);
}
}
private void setHttpInterceptor(ChannelHandlerContext ctx, boolean ssl) {
ctx.pipeline().addLast("http-codec", new HttpServerCodec());
ctx.pipeline().addLast(new HttpServerExpectContinueHandler());
ctx.pipeline().addLast("replay-handler", new ReplayHandler(outboundChannel));
outboundChannel.pipeline().addLast("http-codec", new HttpClientCodec());
var httpUpgradeHandler = new HttpUpgradeHandler(ssl, address, messageListener, ctx.pipeline());
outboundChannel.pipeline().addLast("http-upgrade-handler", httpUpgradeHandler);
var httpInterceptor = new HttpInterceptor(ssl, address, messageListener);
outboundChannel.pipeline().addLast("http-interceptor", httpInterceptor);
outboundChannel.pipeline().addLast("replay-handler", new ReplayHandler(ctx.channel()));
}
@Override
public void proxyToServerConnectionSucceeded(final ChannelHandlerContext serverCtx) {
ChannelPipeline pipeline = serverCtx.pipeline();
//当没有修改getMaximumResponseBufferSizeInBytes中buffer默认的大小时,下面两个handler是不存在的
if (pipeline.get("inflater") != null) {
pipeline.remove("inflater");
}
if (pipeline.get("aggregator") != null) {
pipeline.remove("aggregator");
}
super.proxyToServerConnectionSucceeded(serverCtx);
}
/**
* Invoke fire and forget command handler.
*
* @param ctx the ctx
*/
private void invokeFireAndForgetCommandHandler(ChannelHandlerContext ctx) {
ChannelPipeline p = ctx.pipeline();
p.addLast(DECODER, new ObjectDecoder(ClassResolvers.cacheDisabled(null)));
p.addLast("commandExecutor", new CommandDelegator());
p.addLast(ENCODER, new StringEncoder());
p.remove(this);
}
@Override
protected void decode(final ChannelHandlerContext ctx, Object msg, List<Object> out) throws Exception {
if (msg instanceof ImageRect) {
final ImageRect rect = (ImageRect) msg;
render.render(rect);
return;
}
if (msg instanceof ServerDecoderEvent) {
final ServerDecoderEvent event = (ServerDecoderEvent) msg;
render.eventReceived(event);
return;
}
if (!(msg instanceof ServerInitEvent)) {
logger.error("unknown message: {}", msg);
ctx.fireChannelRead(msg);
return;
}
serverInit = (ServerInitEvent) msg;
logger.debug("handshake completed with {}", serverInit);
FrameDecoderHandler frameHandler = new FrameDecoderHandler(serverInit.getPixelFormat());
if (!frameHandler.isPixelFormatSupported()) {
ProtocolException e = new ProtocolException(String.format("pixelformat: (%s bpp) not supported yet", serverInit.getPixelFormat().getBitPerPixel()));
exceptionCaught(ctx, e);
return;
}
ChannelPipeline cp = ctx.pipeline();
cp.addBefore(ctx.name(), "rfb-encoding-encoder", new PreferedEncodingEncoder());
PreferedEncoding prefEncodings = getPreferedEncodings(frameHandler.getSupportedEncodings());
ctx.write(prefEncodings);
cp.addBefore(ctx.name(), "rfb-pixelformat-encoder", new PixelFormatEncoder());
ctx.write(serverInit.getPixelFormat());
ctx.flush();
cp.addBefore(ctx.name(), "rfb-frame-handler", frameHandler);
cp.addBefore(ctx.name(), "rfb-keyevent-encoder", new KeyButtonEventEncoder());
cp.addBefore(ctx.name(), "rfb-pointerevent-encoder", new PointerEventEncoder());
cp.addBefore(ctx.name(), "rfb-cuttext-encoder", new ClientCutTextEncoder());
render.eventReceived(getConnectInfoEvent(ctx, prefEncodings));
render.registerInputEventListener(event -> ctx.writeAndFlush(event, ctx.voidPromise()));
logger.debug("request full framebuffer update");
sendFramebufferUpdateRequest(ctx, false, 0, 0, serverInit.getFrameBufferWidth(), serverInit.getFrameBufferHeight());
logger.trace("channel pipeline: {}", cp.toMap().keySet());
}
private void addHttp2Handlers(ChannelHandlerContext ctx) {
final ChannelPipeline p = ctx.pipeline();
p.addLast(newHttp2ConnectionHandler(p, SCHEME_HTTPS));
p.addLast(new HttpServerHandler(config, gracefulShutdownSupport, null, H2, proxiedAddresses));
}