类 io.netty.handler.codec.http.websocketx.WebSocketClientProtocolHandler 源码实例Demo

下面列出了 io.netty.handler.codec.http.websocketx.WebSocketClientProtocolHandler 类的 API 使用示例代码及写法，也可以点击链接到 GitHub 查看源代码。


/**
 * Handles user events travelling through the pipeline.
 *
 * <p>When the WebSocket client handshake has completed, the negotiated
 * subprotocol is mapped to a {@code WampSerialization}, the matching
 * serialization handlers are installed right after this handler and a
 * {@code ConnectionEstablishedEvent} is fired. Every other event is
 * forwarded unchanged.
 *
 * @param ctx context of this handler
 * @param evt the user event that was triggered
 * @throws Exception a {@code WampError} if the negotiated subprotocol maps to
 *         {@code WampSerialization.Invalid}
 */
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
    if (evt != WebSocketClientProtocolHandler.ClientHandshakeStateEvent.HANDSHAKE_COMPLETE) {
        // Not a handshake-complete notification: hand it to the next handler.
        ctx.fireUserEventTriggered(evt);
        return;
    }

    // Derive the serialization from the subprotocol agreed on during the handshake.
    serialization = WampSerialization.fromString(handshaker.actualSubprotocol());
    if (serialization == WampSerialization.Invalid) {
        throw new WampError("Invalid Websocket Protocol");
    }

    // Both handlers are inserted directly after this handler; because the
    // serializer is added second it ends up between this handler and the
    // deserializer.
    final String anchor = ctx.name();
    ctx.pipeline().addAfter(anchor, "wamp-deserializer", new WampDeserializationHandler(serialization));
    ctx.pipeline().addAfter(anchor, "wamp-serializer", new WampSerializationHandler(serialization));

    // Tell downstream handlers that the connection is ready for use.
    ctx.fireUserEventTriggered(new ConnectionEstablishedEvent(serialization));
}
 

/**
 * Installs the WAMP serialization handlers once the WebSocket client
 * handshake has completed; any other user event is passed along untouched.
 *
 * @param ctx context of this handler
 * @param evt the user event that was triggered
 * @throws Exception a {@code WampError} when the negotiated subprotocol
 *         resolves to {@code WampSerialization.Invalid}
 */
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
    final boolean handshakeCompleted =
            evt == WebSocketClientProtocolHandler.ClientHandshakeStateEvent.HANDSHAKE_COMPLETE;
    if (!handshakeCompleted) {
        ctx.fireUserEventTriggered(evt);
        return;
    }

    // The subprotocol chosen in the handshake decides the serialization.
    final String negotiatedProtocol = handshaker.actualSubprotocol();
    serialization = WampSerialization.fromString(negotiatedProtocol);
    if (serialization == WampSerialization.Invalid) {
        throw new WampError("Invalid Websocket Protocol");
    }

    // Each handler is placed immediately after this one, so the serializer
    // (added last) sits closest to this handler.
    final String ownName = ctx.name();
    ctx.pipeline().addAfter(
            ownName, "wamp-deserializer", new WampDeserializationHandler(serialization));
    ctx.pipeline().addAfter(
            ownName, "wamp-serializer", new WampSerializationHandler(serialization));

    // Announce the established connection to the rest of the pipeline.
    ctx.fireUserEventTriggered(new ConnectionEstablishedEvent(serialization));
}
 

/**
 * Appends the handlers that provide WebSocket protocol support to the pipeline.
 *
 * @param pipeline the pipeline the WebSocket handlers are appended to
 * @throws URISyntaxException if {@code websocketUrl} is not a valid URI
 */
private void appendWebsocketCodec(ChannelPipeline pipeline) throws URISyntaxException {
    final URI serverUri = new URI(websocketUrl);

    // --- WebSocket decoding path ---
    // No subprotocol is requested (null), extensions are allowed (true), and
    // the frame size limit comes from the session configuration.
    final WebSocketClientProtocolHandler protocolHandler = new WebSocketClientProtocolHandler(
            serverUri,
            WebSocketVersion.V13,
            null,
            true,
            new DefaultHttpHeaders(),
            sessionConfig.maxFrameLength());
    pipeline.addLast(protocolHandler);
    pipeline.addLast(new BinaryWebSocketFrameToBytesDecoder());

    // --- WebSocket encoding path ---
    // Web socket clients must set this to true to mask payload.
    // Server implementations must set this to false.
    final boolean maskPayload = true;
    pipeline.addLast(new WebSocket13FrameEncoder(maskPayload));
    // Wraps outgoing ByteBufs into binary WebSocket frames.
    pipeline.addLast(new BytesToBinaryWebSocketFrameEncoder());
}
 

/**
 * Sets up the WebSocket client handlers when the channel is registered.
 *
 * <p>Handler order: optional TLS handler, HTTP client codec, chunked writer,
 * HTTP aggregator (64 KiB limit), WebSocket protocol handler and finally the
 * handler that listens for the connection being established.
 *
 * @param ch the channel being initialized
 */
@Override
protected final void initChannel(Channel ch) throws Exception {
	final ChannelPipeline p = ch.pipeline();

	// TLS is only installed when an SSL context has been configured.
	if (sslCtx != null) {
		p.addLast(sslCtx.newHandler(ch.alloc(), host, port));
	}

	// HTTP layer needed for the WebSocket upgrade handshake.
	p.addLast(new HttpClientCodec())
		.addLast(new ChunkedWriteHandler())
		.addLast(new HttpObjectAggregator(64 * 1024));

	// WebSocket protocol handling (version 13, extensions not allowed).
	p.addLast(
		new WebSocketClientProtocolHandler(
			WebSocketClientHandshakerFactory.newHandshaker(
				webSocketURL,
				WebSocketVersion.V13,
				subprotocol,
				false,
				new DefaultHttpHeaders())));

	// Listener that reacts to the connection having been established.
	p.addLast(new WebSocketConnectedClientHandler());
}
 

/**
 * Waits for the WebSocket client handshake to complete, then runs
 * {@code webSocketHandComplete} and removes this handler from the pipeline.
 * All other events are delegated to the superclass.
 *
 * @param ctx context of this handler
 * @param evt the user event that was triggered
 */
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt)
		throws Exception {
	final boolean handshakeDone =
			evt == WebSocketClientProtocolHandler.ClientHandshakeStateEvent.HANDSHAKE_COMPLETE;
	if (!handshakeDone) {
		super.userEventTriggered(ctx, evt);
		return;
	}

	log.debug("excute webSocketHandComplete……");
	webSocketHandComplete(ctx);

	// This handler has served its purpose once the handshake callback ran.
	ctx.pipeline().remove(this);

	// Log the pipeline layout as it looks after the removal.
	final String pipelineLayout = ctx.pipeline().toMap().toString();
	log.debug("excuted webSocketHandComplete:" + pipelineLayout);
}
 

/**
 * Adds the HTTP and WebSocket client handlers to the given pipeline.
 *
 * <p>Handlers are registered under the names {@code http-codec},
 * {@code aggregator}, {@code websocket-protocol-handler} and
 * {@code websocket-frame-codec}, in that order.
 *
 * @param pipeline the pipeline to extend
 */
@Override
public void addToPipeline(final ChannelPipeline pipeline) {
    // HTTP layer used for the upgrade handshake; aggregated messages are capped at 8192 bytes.
    pipeline.addLast("http-codec", new HttpClientCodec())
            .addLast("aggregator", new HttpObjectAggregator(8192));

    // Version 13 handshaker: extensions are not allowed and the frame payload
    // length limit is Integer.MAX_VALUE.
    final WebSocketClientHandshaker wsHandshaker = new WhiteSpaceInPathWebSocketClientHandshaker13(
            serverUri,
            WebSocketVersion.V13,
            PROTOCOL,
            false,
            createHttpHeaders(httpHeaders),
            Integer.MAX_VALUE);

    pipeline.addLast("websocket-protocol-handler", new WebSocketClientProtocolHandler(wsHandshaker))
            .addLast("websocket-frame-codec", new ByteBufToWebSocketFrameCodec());
}
 
源代码7 项目: arthas   文件: TunnelClient.java

/**
 * Connects to the tunnel server over WebSocket and starts the agent registration.
 *
 * <p>The registration URI is built from {@code tunnelServerUrl} with the
 * query parameter {@code method=agentRegister} and, when {@code id} is set,
 * {@code id}. A missing scheme defaults to {@code ws}, a missing host to
 * {@code 127.0.0.1} and a missing port to 80 ({@code ws}) or 443 ({@code wss}).
 * The method blocks until the connect attempt has finished.
 *
 * @param reconnect when {@code true}, a failed connect attempt is additionally logged
 * @return the registration future exposed by the socket client handler
 * @throws IllegalArgumentException if the scheme is neither {@code ws} nor {@code wss}
 * @throws SSLException if the client SSL context cannot be built
 * @throws URISyntaxException if the registration URI cannot be built
 * @throws InterruptedException if the thread is interrupted while waiting for the connect
 */
public ChannelFuture connect(boolean reconnect) throws SSLException, URISyntaxException, InterruptedException {
    QueryStringEncoder registerQuery = new QueryStringEncoder(this.tunnelServerUrl);
    registerQuery.addParam("method", "agentRegister");
    if (id != null) {
        registerQuery.addParam("id", id);
    }
    // e.g. ws://127.0.0.1:7777/ws?method=agentRegister
    final URI agentRegisterURI = registerQuery.toUri();

    logger.info("Try to register arthas agent, uri: {}", agentRegisterURI);

    // Resolve the scheme first; nothing else is needed if it is unsupported.
    final String uriScheme = agentRegisterURI.getScheme();
    final String scheme = uriScheme == null ? "ws" : uriScheme;
    final boolean plain = "ws".equalsIgnoreCase(scheme);
    final boolean ssl = "wss".equalsIgnoreCase(scheme);
    if (!plain && !ssl) {
        throw new IllegalArgumentException("Only WS(S) is supported. tunnelServerUrl: " + tunnelServerUrl);
    }

    final String uriHost = agentRegisterURI.getHost();
    final String host = uriHost == null ? "127.0.0.1" : uriHost;

    // Fall back to the scheme's default port when the URI does not name one.
    final int uriPort = agentRegisterURI.getPort();
    final int port;
    if (uriPort != -1) {
        port = uriPort;
    } else {
        port = ssl ? 443 : 80;
    }

    // NOTE(review): InsecureTrustManagerFactory accepts any server certificate
    // without verification.
    final SslContext sslCtx = ssl
            ? SslContextBuilder.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE).build()
            : null;

    WebSocketClientHandshaker newHandshaker = WebSocketClientHandshakerFactory.newHandshaker(agentRegisterURI,
            WebSocketVersion.V13, null, true, new DefaultHttpHeaders());
    final WebSocketClientProtocolHandler websocketClientHandler = new WebSocketClientProtocolHandler(newHandshaker);
    final TunnelClientSocketClientHandler handler = new TunnelClientSocketClientHandler(TunnelClient.this);

    Bootstrap bs = new Bootstrap();
    bs.group(eventLoopGroup);
    bs.channel(NioSocketChannel.class);
    bs.remoteAddress(host, port);
    bs.handler(new ChannelInitializer<SocketChannel>() {
        @Override
        protected void initChannel(SocketChannel ch) {
            ChannelPipeline p = ch.pipeline();
            // TLS goes first so that everything behind it sees plain HTTP/WebSocket traffic.
            if (sslCtx != null) {
                p.addLast(sslCtx.newHandler(ch.alloc(), host, port));
            }

            p.addLast(new HttpClientCodec(), new HttpObjectAggregator(8192), websocketClientHandler,
                    handler);
        }
    });

    ChannelFuture connectFuture = bs.connect();
    if (reconnect) {
        connectFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                Throwable failure = future.cause();
                if (failure != null) {
                    logger.error("connect to tunnel server error, uri: {}", tunnelServerUrl, failure);
                }
            }
        });
    }

    // Block until the connect attempt has finished; sync() rethrows a connect failure.
    connectFuture.sync();
    channel = connectFuture.channel();

    return handler.registerFuture();
}
 
源代码8 项目: arthas   文件: ForwardClient.java

/**
 * Connects this forward client to the tunnel server over WebSocket.
 *
 * <p>A missing scheme defaults to {@code ws}, a missing host to
 * {@code 127.0.0.1} and a missing port to 80 ({@code ws}) or 443
 * ({@code wss}). If the scheme is neither {@code ws} nor {@code wss} an error
 * is logged and the method returns without connecting. Otherwise the method
 * blocks until the connection has been established.
 *
 * @throws URISyntaxException declared for callers; not thrown by the code in this method
 * @throws SSLException if the client SSL context cannot be built
 * @throws InterruptedException if the thread is interrupted while waiting for the connect
 */
public void start() throws URISyntaxException, SSLException, InterruptedException {
    String scheme = tunnelServerURI.getScheme() == null ? "ws" : tunnelServerURI.getScheme();
    final String host = tunnelServerURI.getHost() == null ? "127.0.0.1" : tunnelServerURI.getHost();
    final int port;
    if (tunnelServerURI.getPort() == -1) {
        if ("ws".equalsIgnoreCase(scheme)) {
            port = 80;
        } else if ("wss".equalsIgnoreCase(scheme)) {
            port = 443;
        } else {
            port = -1;
        }
    } else {
        port = tunnelServerURI.getPort();
    }

    if (!"ws".equalsIgnoreCase(scheme) && !"wss".equalsIgnoreCase(scheme)) {
        logger.error("Only WS(S) is supported, uri: {}", tunnelServerURI);
        return;
    }

    final boolean ssl = "wss".equalsIgnoreCase(scheme);
    final SslContext sslCtx;
    if (ssl) {
        // NOTE(review): InsecureTrustManagerFactory accepts any server certificate
        // without verification.
        sslCtx = SslContextBuilder.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE).build();
    } else {
        sslCtx = null;
    }

    // WebSocket handshake towards the tunnel server
    WebSocketClientHandshaker newHandshaker = WebSocketClientHandshakerFactory.newHandshaker(tunnelServerURI,
            WebSocketVersion.V13, null, true, new DefaultHttpHeaders());
    final WebSocketClientProtocolHandler websocketClientHandler = new WebSocketClientProtocolHandler(newHandshaker);

    // Handler constructed with the local server URI
    final ForwardClientSocketClientHandler forwardClientSocketClientHandler = new ForwardClientSocketClientHandler(
            localServerURI);

    Bootstrap b = new Bootstrap();
    b.group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {
        @Override
        protected void initChannel(SocketChannel ch) {
            ChannelPipeline p = ch.pipeline();
            if (sslCtx != null) {
                p.addLast(sslCtx.newHandler(ch.alloc(), host, port));
            }
            p.addLast(new HttpClientCodec(), new HttpObjectAggregator(8192), websocketClientHandler,
                    forwardClientSocketClientHandler);
        }
    });

    // Connect with the resolved host (which carries the 127.0.0.1 fallback)
    // rather than tunnelServerURI.getHost(), which may be null and would
    // disagree with the host handed to the TLS handler above.
    channel = b.connect(host, port).sync().channel();
    logger.info("forward client connect to server success, uri: {}", tunnelServerURI);
}
 

/**
 * Connects the WebSocket client to the host and port entered in the UI.
 *
 * <p>Does nothing (apart from a warning) when the client is already
 * connected. Otherwise a new event loop group is created and the connect is
 * run in a background {@code Task} whose running state, message and progress
 * are bound to the status controls. On success the channel is stored and
 * {@code connected} is set; on failure the error is logged and shown in an
 * alert, and the event loop group created for this attempt is shut down.
 *
 * @throws URISyntaxException if the WebSocket URI built from host and port is invalid
 * @throws NumberFormatException if the port field does not contain an integer
 */
@FXML
public void connect() throws URISyntaxException {

	if( connected.get() ) {
		if( logger.isWarnEnabled() ) {
			logger.warn("client already connected; skipping connect");
		}
		return;  // already connected; should be prevented with disabled
	}

	final String host = tfHost.getText();
	final int port = Integer.parseInt(tfPort.getText());

	group = new NioEventLoopGroup();

	// The handshake URI carries the port as well, so that it names the same
	// endpoint the socket connects to below (previously the port was omitted,
	// which made the URI refer to the default ws port).
	final WebSocketClientProtocolHandler handler =
			  new WebSocketClientProtocolHandler(
					  WebSocketClientHandshakerFactory.newHandshaker(
							  new URI("ws://" + host + ":" + port + "/ws"), WebSocketVersion.V13, null, false, new DefaultHttpHeaders()));

	Task<Channel> task = new Task<Channel>() {

		@Override
		protected Channel call() throws Exception {

			updateMessage("Bootstrapping");
			updateProgress(0.1d, 1.0d);

			Bootstrap b = new Bootstrap();
			b
				.group(group)
				.channel(NioSocketChannel.class)
				.remoteAddress( new InetSocketAddress(host, port) )
				.handler( new ChannelInitializer<SocketChannel>() {
					@Override
					protected void initChannel(SocketChannel ch) throws Exception {
						ChannelPipeline p = ch.pipeline();
						p.addLast(new HttpClientCodec());
						p.addLast(new HttpObjectAggregator(8192));
						p.addLast(handler);
						p.addLast(new EchoClientHandlerWS(receivingMessageModel));
					}
				});

			updateMessage("Connecting");
			updateProgress(0.2d, 1.0d);

			// Block this background thread until the connect attempt has
			// finished; a failure is rethrown and ends up in failed().
			ChannelFuture f = b.connect();
			f.sync();
			Channel chn = f.channel();

			if( logger.isDebugEnabled() ) {
				logger.debug("[CONNECT] channel active=" + chn.isActive() + ", open=" + chn.isOpen() + ", register=" + chn.isRegistered() + ", writeable=" + chn.isWritable());
			}

			return chn;
		}

		@Override
		protected void succeeded() {

			channel = getValue();
			connected.set(true);
		}

		@Override
		protected void failed() {

			Throwable exc = getException();
			logger.error( "client connect error", exc );

			// A fresh event loop group was created for this attempt; release
			// its threads since no connection was established. Done before
			// the alert because showAndWait() blocks.
			if( group != null ) {
				group.shutdownGracefully();
			}

			Alert alert = new Alert(AlertType.ERROR);
			alert.setTitle("Client");
			alert.setHeaderText( exc.getClass().getName() );
			alert.setContentText( exc.getMessage() );
			alert.showAndWait();

			connected.set(false);
		}
	};

	hboxStatus.visibleProperty().bind( task.runningProperty() );
	lblStatus.textProperty().bind( task.messageProperty() );
	piStatus.progressProperty().bind(task.progressProperty());

	new Thread(task).start();
}
 
 类方法
 同包方法