下面列出了怎么用 io.netty.handler.codec.http.websocketx.WebSocketClientProtocolHandler 的API类实例代码及写法,或者点击链接到github查看源代码。
/**
 * Handles user events. When the WebSocket client handshake has completed, the negotiated
 * subprotocol is mapped to a WAMP serialization, the matching serializer and deserializer
 * handlers are inserted right after this handler, and a {@code ConnectionEstablishedEvent}
 * is fired. Every other event is forwarded unchanged.
 *
 * @param ctx context of this handler within the pipeline
 * @param evt the user event that was triggered
 * @throws Exception a {@code WampError} is thrown when the negotiated subprotocol resolves
 *         to {@code WampSerialization.Invalid}
 */
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
    if (evt != WebSocketClientProtocolHandler.ClientHandshakeStateEvent.HANDSHAKE_COMPLETE) {
        // Not the handshake-complete signal: pass it along untouched.
        ctx.fireUserEventTriggered(evt);
        return;
    }

    // The subprotocol agreed on during the handshake selects the serialization.
    serialization = WampSerialization.fromString(handshaker.actualSubprotocol());
    if (serialization == WampSerialization.Invalid) {
        throw new WampError("Invalid Websocket Protocol");
    }

    // Both handlers are inserted directly after this handler; the serializer is added
    // last and therefore ends up between this handler and the deserializer.
    final String anchor = ctx.name();
    ctx.pipeline()
            .addAfter(anchor, "wamp-deserializer", new WampDeserializationHandler(serialization))
            .addAfter(anchor, "wamp-serializer", new WampSerializationHandler(serialization));

    // Tell the rest of the pipeline that the connection is ready.
    ctx.fireUserEventTriggered(new ConnectionEstablishedEvent(serialization));
}
/**
 * Waits for the WebSocket handshake to finish and then prepares the pipeline for WAMP
 * traffic: resolves the serialization from the negotiated subprotocol, installs the
 * serialization and deserialization handlers after this handler and fires a
 * {@code ConnectionEstablishedEvent}. Events other than handshake completion are
 * propagated as-is.
 *
 * @param ctx context of this handler within the pipeline
 * @param evt the user event that was triggered
 * @throws Exception a {@code WampError} is thrown if the subprotocol maps to
 *         {@code WampSerialization.Invalid}
 */
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
    final Object handshakeComplete =
            WebSocketClientProtocolHandler.ClientHandshakeStateEvent.HANDSHAKE_COMPLETE;

    if (evt == handshakeComplete) {
        final String negotiatedProtocol = handshaker.actualSubprotocol();
        serialization = WampSerialization.fromString(negotiatedProtocol);
        if (serialization == WampSerialization.Invalid) {
            throw new WampError("Invalid Websocket Protocol");
        }

        // Install the deserializer first, then the serializer; each goes directly
        // behind this handler.
        final String selfName = ctx.name();
        final WampDeserializationHandler deserializer = new WampDeserializationHandler(serialization);
        ctx.pipeline().addAfter(selfName, "wamp-deserializer", deserializer);
        final WampSerializationHandler serializer = new WampSerializationHandler(serialization);
        ctx.pipeline().addAfter(selfName, "wamp-serializer", serializer);

        // Announce that the connection is established.
        ctx.fireUserEventTriggered(new ConnectionEstablishedEvent(serialization));
    } else {
        ctx.fireUserEventTriggered(evt);
    }
}
/**
 * Adds WebSocket protocol support to the given pipeline.
 *
 * @param pipeline the pipeline the WebSocket handlers are appended to
 * @throws URISyntaxException if {@code websocketUrl} is not a valid URI
 */
private void appendWebsocketCodec(ChannelPipeline pipeline) throws URISyntaxException {
    final URI endpoint = new URI(websocketUrl);

    pipeline
            // Decoding path: protocol handler, then binary frame -> bytes.
            .addLast(new WebSocketClientProtocolHandler(
                    endpoint,
                    WebSocketVersion.V13,
                    null,
                    true,
                    new DefaultHttpHeaders(),
                    sessionConfig.maxFrameLength()))
            .addLast(new BinaryWebSocketFrameToBytesDecoder())
            // Encoding path. WebSocket clients must pass true so the payload is masked;
            // server implementations must pass false.
            .addLast(new WebSocket13FrameEncoder(true))
            // Converts a ByteBuf into a binary WebSocket frame.
            .addLast(new BytesToBinaryWebSocketFrameEncoder());
}
/**
 * Sets up the WebSocket client handlers when the channel is initialized: an optional
 * SSL handler, the HTTP codec, chunked-write support, an HTTP aggregator (64 KiB limit),
 * the WebSocket protocol handler and finally the handler that listens for a successful
 * connection.
 *
 * @param ch the channel whose pipeline is configured
 * @throws Exception if building any of the handlers fails
 */
@Override
protected final void initChannel(Channel ch) throws Exception {
    final ChannelPipeline handlers = ch.pipeline();

    // SSL is only installed when a context has been configured.
    if (sslCtx != null) {
        handlers.addLast(sslCtx.newHandler(ch.alloc(), host, port));
    }

    handlers
            .addLast(new HttpClientCodec())
            .addLast(new ChunkedWriteHandler())
            .addLast(new HttpObjectAggregator(64 * 1024))
            .addLast(new WebSocketClientProtocolHandler(
                    WebSocketClientHandshakerFactory.newHandshaker(
                            webSocketURL,
                            WebSocketVersion.V13,
                            subprotocol,
                            false,
                            new DefaultHttpHeaders())))
            // Handler that listens for the connection being established.
            .addLast(new WebSocketConnectedClientHandler());
}
/**
 * Runs the handshake-complete callback once the WebSocket client handshake has finished
 * and then removes this handler from the pipeline. The handshake-complete event itself is
 * not forwarded; every other event is delegated to the superclass.
 *
 * @param ctx context of this handler within the pipeline
 * @param evt the user event that was triggered
 * @throws Exception if the callback or the superclass handling fails
 */
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt)
        throws Exception {
    if (evt != WebSocketClientProtocolHandler.ClientHandshakeStateEvent.HANDSHAKE_COMPLETE) {
        super.userEventTriggered(ctx, evt);
        return;
    }

    log.debug("excute webSocketHandComplete……");
    webSocketHandComplete(ctx);

    // This handler is only needed until the handshake is done.
    ctx.pipeline().remove(this);

    final String pipelineSnapshot = ctx.pipeline().toMap().toString();
    log.debug("excuted webSocketHandComplete:" + pipelineSnapshot);
}
/**
 * Appends the HTTP codec, an HTTP aggregator (8192 byte limit), the WebSocket protocol
 * handler and the ByteBuf/WebSocket-frame codec to the pipeline, in that order.
 *
 * @param pipeline the pipeline to extend
 */
@Override
public void addToPipeline(final ChannelPipeline pipeline) {
    pipeline
            .addLast("http-codec", new HttpClientCodec())
            .addLast("aggregator", new HttpObjectAggregator(8192));

    // Version 13 handshaker for serverUri; extensions are disabled and the maximum
    // frame payload length is Integer.MAX_VALUE.
    final WebSocketClientHandshaker handshaker = new WhiteSpaceInPathWebSocketClientHandshaker13(
            serverUri,
            WebSocketVersion.V13,
            PROTOCOL,
            false,
            createHttpHeaders(httpHeaders),
            Integer.MAX_VALUE);

    pipeline
            .addLast("websocket-protocol-handler", new WebSocketClientProtocolHandler(handshaker))
            .addLast("websocket-frame-codec", new ByteBufToWebSocketFrameCodec());
}
/**
 * Connects to the tunnel server over WebSocket in order to register this agent.
 *
 * <p>The registration URI is {@code tunnelServerUrl} with {@code method=agentRegister}
 * (and {@code id}, when set) appended as query parameters. A missing scheme defaults to
 * {@code ws}, a missing host to {@code 127.0.0.1}, and a missing port to 80 (ws) or
 * 443 (wss). The call blocks until the connection attempt has completed.
 *
 * <p>NOTE(review): for {@code wss} the SSL context is built with
 * {@code InsecureTrustManagerFactory}, so the server certificate is not verified.
 *
 * @param reconnect when {@code true}, a listener is attached that logs a failed
 *                  connection attempt
 * @return the register future exposed by the tunnel client handler
 * @throws SSLException             if the client SSL context cannot be built
 * @throws URISyntaxException       if the registration URI cannot be built
 * @throws InterruptedException     if interrupted while waiting for the connection
 * @throws IllegalArgumentException if the scheme is neither {@code ws} nor {@code wss}
 */
public ChannelFuture connect(boolean reconnect) throws SSLException, URISyntaxException, InterruptedException {
    // Build the registration URI, e.g. ws://127.0.0.1:7777/ws?method=agentRegister
    QueryStringEncoder registerQuery = new QueryStringEncoder(this.tunnelServerUrl);
    registerQuery.addParam("method", "agentRegister");
    if (id != null) {
        registerQuery.addParam("id", id);
    }
    final URI agentRegisterURI = registerQuery.toUri();

    logger.info("Try to register arthas agent, uri: {}", agentRegisterURI);

    // Resolve scheme, host and port, filling in defaults for missing parts.
    final String declaredScheme = agentRegisterURI.getScheme();
    final String scheme = declaredScheme == null ? "ws" : declaredScheme;
    final boolean plain = "ws".equalsIgnoreCase(scheme);
    final boolean ssl = "wss".equalsIgnoreCase(scheme);

    final String declaredHost = agentRegisterURI.getHost();
    final String host = declaredHost == null ? "127.0.0.1" : declaredHost;

    final int declaredPort = agentRegisterURI.getPort();
    final int port;
    if (declaredPort != -1) {
        port = declaredPort;
    } else if (plain) {
        port = 80;
    } else if (ssl) {
        port = 443;
    } else {
        port = -1;
    }

    if (!(plain || ssl)) {
        throw new IllegalArgumentException("Only WS(S) is supported. tunnelServerUrl: " + tunnelServerUrl);
    }

    final SslContext sslCtx = ssl
            ? SslContextBuilder.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE).build()
            : null;

    WebSocketClientHandshaker newHandshaker = WebSocketClientHandshakerFactory.newHandshaker(
            agentRegisterURI, WebSocketVersion.V13, null, true, new DefaultHttpHeaders());
    final WebSocketClientProtocolHandler websocketClientHandler =
            new WebSocketClientProtocolHandler(newHandshaker);
    final TunnelClientSocketClientHandler handler = new TunnelClientSocketClientHandler(TunnelClient.this);

    Bootstrap bs = new Bootstrap();
    bs.group(eventLoopGroup);
    bs.channel(NioSocketChannel.class);
    bs.remoteAddress(host, port);
    bs.handler(new ChannelInitializer<SocketChannel>() {
        @Override
        protected void initChannel(SocketChannel ch) {
            ChannelPipeline p = ch.pipeline();
            if (sslCtx != null) {
                p.addLast(sslCtx.newHandler(ch.alloc(), host, port));
            }
            p.addLast(new HttpClientCodec(), new HttpObjectAggregator(8192), websocketClientHandler,
                    handler);
        }
    });

    ChannelFuture connectFuture = bs.connect();
    if (reconnect) {
        // Only log the failure here; the sync() below still surfaces it to the caller.
        connectFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                Throwable failure = future.cause();
                if (failure != null) {
                    logger.error("connect to tunnel server error, uri: {}", tunnelServerUrl, failure);
                }
            }
        });
    }

    channel = connectFuture.sync().channel();
    return handler.registerFuture();
}
/**
 * Opens a WebSocket connection to the tunnel server and installs the handler that
 * forwards traffic to {@code localServerURI}.
 *
 * <p>A missing scheme defaults to {@code ws}, a missing host to {@code 127.0.0.1}, and a
 * missing port to 80 (ws) or 443 (wss). If the scheme is neither {@code ws} nor
 * {@code wss}, an error is logged and the method returns without connecting. The call
 * blocks until the connection attempt has completed.
 *
 * <p>NOTE(review): for {@code wss} the SSL context is built with
 * {@code InsecureTrustManagerFactory}, so the server certificate is not verified.
 *
 * @throws URISyntaxException   declared for callers; not raised by the visible code
 * @throws SSLException         if the client SSL context cannot be built
 * @throws InterruptedException if interrupted while waiting for the connection
 */
public void start() throws URISyntaxException, SSLException, InterruptedException {
    String scheme = tunnelServerURI.getScheme() == null ? "ws" : tunnelServerURI.getScheme();
    final String host = tunnelServerURI.getHost() == null ? "127.0.0.1" : tunnelServerURI.getHost();
    final int port;
    if (tunnelServerURI.getPort() == -1) {
        if ("ws".equalsIgnoreCase(scheme)) {
            port = 80;
        } else if ("wss".equalsIgnoreCase(scheme)) {
            port = 443;
        } else {
            port = -1;
        }
    } else {
        port = tunnelServerURI.getPort();
    }

    if (!"ws".equalsIgnoreCase(scheme) && !"wss".equalsIgnoreCase(scheme)) {
        logger.error("Only WS(S) is supported, uri: {}", tunnelServerURI);
        return;
    }

    final boolean ssl = "wss".equalsIgnoreCase(scheme);
    final SslContext sslCtx;
    if (ssl) {
        sslCtx = SslContextBuilder.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE).build();
    } else {
        sslCtx = null;
    }

    // Handshaker for the tunnel server; the forward handler is given the local server URI.
    WebSocketClientHandshaker newHandshaker = WebSocketClientHandshakerFactory.newHandshaker(tunnelServerURI,
            WebSocketVersion.V13, null, true, new DefaultHttpHeaders());
    final WebSocketClientProtocolHandler websocketClientHandler = new WebSocketClientProtocolHandler(newHandshaker);
    final ForwardClientSocketClientHandler forwardClientSocketClientHandler = new ForwardClientSocketClientHandler(
            localServerURI);

    Bootstrap b = new Bootstrap();
    b.group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {
        @Override
        protected void initChannel(SocketChannel ch) {
            ChannelPipeline p = ch.pipeline();
            if (sslCtx != null) {
                p.addLast(sslCtx.newHandler(ch.alloc(), host, port));
            }
            p.addLast(new HttpClientCodec(), new HttpObjectAggregator(8192), websocketClientHandler,
                    forwardClientSocketClientHandler);
        }
    });

    // Connect with the defaulted host: tunnelServerURI.getHost() may be null, in which
    // case the "127.0.0.1" fallback computed above must be used instead.
    channel = b.connect(host, port).sync().channel();
    logger.info("forward client connect to server success, uri: {}", tunnelServerURI);
}
/**
 * Connects the WebSocket client to the host and port entered in the UI.
 *
 * <p>Does nothing except log a warning when the client is already connected. Otherwise
 * the connection is established on a background {@code Task}; its running state, message
 * and progress are bound to the status controls. On success the channel is stored and
 * {@code connected} is set; on failure the error is logged and shown in an alert, and the
 * event loop group created for this attempt is shut down.
 *
 * @throws URISyntaxException    if the entered host does not form a valid WebSocket URI
 * @throws NumberFormatException if the entered port is not a valid integer
 */
@FXML
public void connect() throws URISyntaxException {

    if( connected.get() ) {
        if( logger.isWarnEnabled() ) {
            logger.warn("client already connected; skipping connect");
        }
        return;  // already connected; should be prevented with disabled
    }

    String host = tfHost.getText();
    int port = Integer.parseInt(tfPort.getText());

    // Build the handshake URI with the port so the handshake targets the same endpoint
    // as the socket. This is done before creating the event loop group so that an
    // invalid URI does not leave an unused group behind.
    final WebSocketClientProtocolHandler handler =
            new WebSocketClientProtocolHandler(
                    WebSocketClientHandshakerFactory.newHandshaker(
                            new URI("ws://" + host + ":" + port + "/ws"), WebSocketVersion.V13, null, false, new DefaultHttpHeaders()));

    // Keep a local reference so the failure callback releases exactly the group that
    // was created for this attempt.
    final NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();
    group = eventLoopGroup;

    Task<Channel> task = new Task<Channel>() {

        @Override
        protected Channel call() throws Exception {

            updateMessage("Bootstrapping");
            updateProgress(0.1d, 1.0d);

            Bootstrap b = new Bootstrap();
            b
                .group(eventLoopGroup)
                .channel(NioSocketChannel.class)
                .remoteAddress( new InetSocketAddress(host, port) )
                .handler( new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ChannelPipeline p = ch.pipeline();
                        p.addLast(new HttpClientCodec());
                        p.addLast(new HttpObjectAggregator(8192));
                        p.addLast(handler);
                        p.addLast(new EchoClientHandlerWS(receivingMessageModel));
                    }
                });

            updateMessage("Connecting");
            updateProgress(0.2d, 1.0d);

            // Block this background thread until the connection attempt completes.
            ChannelFuture f = b.connect();
            f.sync();
            Channel chn = f.channel();

            if( logger.isDebugEnabled() ) {
                logger.debug("[CONNECT] channel active=" + chn.isActive() + ", open=" + chn.isOpen() + ", register=" + chn.isRegistered() + ", writeable=" + chn.isWritable());
            }

            return chn;
        }

        @Override
        protected void succeeded() {
            channel = getValue();
            connected.set(true);
        }

        @Override
        protected void failed() {
            Throwable exc = getException();
            logger.error( "client connect error", exc );

            // The connection never came up, so release the event loop threads that
            // were started for this attempt.
            eventLoopGroup.shutdownGracefully();

            Alert alert = new Alert(AlertType.ERROR);
            alert.setTitle("Client");
            alert.setHeaderText( exc.getClass().getName() );
            alert.setContentText( exc.getMessage() );
            alert.showAndWait();

            connected.set(false);
        }
    };

    hboxStatus.visibleProperty().bind( task.runningProperty() );
    lblStatus.textProperty().bind( task.messageProperty() );
    piStatus.progressProperty().bind(task.progressProperty());

    new Thread(task).start();
}